(partially tested) implement LMDB GC

* added AsyncWrite and NoMetaSync flags to increase write speed (if needed)
This commit is contained in:
Lawrence, Rendall
2024-06-21 18:32:06 +03:00
parent 8f2bea1de6
commit 13d5444ca4
4 changed files with 140 additions and 57 deletions

View File

@@ -56,15 +56,11 @@ storage:
# - collecting garbage less frequently, saving CPU time, but keeping old peers long, thus using more memory (higher value).
gc_interval: 3m
# The interval at which metrics about the number of infohashes and peers
# are collected and posted to Prometheus.
prometheus_reporting_interval: 1s
# The amount of time until a peer is considered stale.
# To avoid churn, keep this slightly larger than `announce_interval`
peer_lifetime: 31m
# Path to LMDB folder. Must exist
# Path to LMDB folder. Required.
Path: ""
# File mode of created database files, default is 0o640
@@ -79,5 +75,15 @@ storage:
# Maximum size of database, default is 1GiB
max_size: 0
# Set MDB_WRITEMAP and MDB_MAPASYNC flags to use asynchronous flushes to disk.
# The installation of the flag can highly speed up writes, but there is a risk of DB damage
# or loss of last committed data if the application crashes.
async_write: true
# Set MDB_NOMETASYNC flag. Omit the metadata flush on commit.
# Can a little accelerate writes if `async_write` not set, but last committed data
# bay be lost if the application crashes.
no_sync_meta: false
posthooks: []
prehooks: []

View File

@@ -10,6 +10,7 @@ import (
"github.com/PowerDNS/lmdb-go/exp/lmdbsync"
"net/netip"
"os"
"sync"
"time"
"github.com/PowerDNS/lmdb-go/lmdb"
@@ -29,7 +30,7 @@ const (
defaultMaxReaders = 126
)
var logger = log.NewLogger("storage/memory")
var logger = log.NewLogger("storage/lmdb")
func init() {
// Register the storage driver.
@@ -62,6 +63,10 @@ type config struct {
// MaxReaders - maximum number of threads/reader slots for the LMDB environment.
// LMDB library's default is 126.
MaxReaders int `cfg:"max_readers"`
// AsyncWrite sets MDB_WRITEMAP and MDB_MAPASYNC flags to use asynchronous flushes to disk.
AsyncWrite bool `cfg:"async_write"`
// NoMetaSync sets MDB_NOMETASYNC flag, omit the metadata flush.
NoMetaSync bool `cfg:"no_sync_meta"`
}
var (
@@ -107,9 +112,24 @@ func (cfg config) validate() (config, error) {
return validCfg, nil
}
type lmdbEnv interface {
SetMaxDBs(int) error
SetMapSize(int64) error
SetMaxReaders(int) error
Open(string, uint, os.FileMode) error
View(lmdb.TxnOp) error
Update(lmdb.TxnOp) error
Close() error
Sync(bool) error
Info() (*lmdb.EnvInfo, error)
}
type mdb struct {
*lmdbsync.Env
lmdbEnv
dataDB, peersDB lmdb.DBI
onceCloser sync.Once
closed chan any
wg sync.WaitGroup
}
func newStorage(cfg config) (*mdb, error) {
@@ -121,7 +141,7 @@ func newStorage(cfg config) (*mdb, error) {
if err != nil {
return nil, err
}
var env *lmdbsync.Env
var env lmdbEnv
if env, err = lmdbsync.NewEnv(lmEnv,
lmdbsync.MapResizedHandler(lmdbsync.MapResizedDefaultRetry, lmdbsync.MapResizedDefaultDelay),
@@ -141,7 +161,15 @@ func newStorage(cfg config) (*mdb, error) {
}
}
if err = env.Open(cfg.Path, 0, os.FileMode(cfg.Mode)); err != nil {
var flags uint
if cfg.AsyncWrite {
flags |= lmdb.WriteMap | lmdb.MapAsync
}
if cfg.NoMetaSync {
flags |= lmdb.NoMetaSync
}
if err = env.Open(cfg.Path, flags, os.FileMode(cfg.Mode)); err != nil {
return nil, err
}
@@ -166,7 +194,12 @@ func newStorage(cfg config) (*mdb, error) {
return nil, err
}
return &mdb{env, dataDB, peersDB}, nil
return &mdb{
lmdbEnv: env,
dataDB: dataDB,
peersDB: peersDB,
closed: make(chan any),
}, nil
}
func (*mdb) Preservable() bool {
@@ -174,9 +207,15 @@ func (*mdb) Preservable() bool {
}
func (m *mdb) Close() (err error) {
if m.Env != nil {
err = m.Env.Close()
}
m.onceCloser.Do(func() {
if m.lmdbEnv != nil {
close(m.closed)
m.wg.Wait()
logger.Info().Msg("LMDB exiting. Flushing databases to disk")
_ = m.Sync(true)
err = m.lmdbEnv.Close()
}
})
return
}
@@ -324,10 +363,9 @@ func (m *mdb) incr(txn *lmdb.Txn, key []byte, inc int) (err error) {
v = int(binary.BigEndian.Uint32(b))
}
v += inc
if v < 0 {
v = 0
}
if b, err = txn.PutReserve(m.peersDB, key, 4, 0); err == nil {
if v <= 0 {
err = ignoreNotFound(txn.Del(m.peersDB, key, nil))
} else if b, err = txn.PutReserve(m.peersDB, key, 4, 0); err == nil {
binary.BigEndian.PutUint32(b, uint32(v))
}
return
@@ -338,7 +376,7 @@ func (m *mdb) putPeer(ih bittorrent.InfoHash, peer bittorrent.Peer, seeder bool)
return m.Update(func(txn *lmdb.Txn) (err error) {
var b []byte
if b, err = txn.PutReserve(m.peersDB, ihKey, 8, 0); err == nil {
binary.BigEndian.PutUint64(b, uint64(timecache.NowUnixNano()))
binary.BigEndian.PutUint64(b, uint64(timecache.NowUnix()))
ihKey[1] = countPrefix
err = m.incr(txn, ihKey[:len(ihKey)-packedPeerLen], 1)
}
@@ -400,21 +438,17 @@ func (m *mdb) GraduateLeecher(_ context.Context, ih bittorrent.InfoHash, peer bi
})
}
type scanAction int
const (
next scanAction = iota
stop
del
)
func (m *mdb) scanPeers(ctx context.Context, prefix []byte, rw bool, fn func(k, v []byte) scanAction) (err error) {
func (m *mdb) scanPeers(ctx context.Context, prefix []byte, readRaw bool, fn func(k, v []byte) bool) (err error) {
m.wg.Add(1)
prefixLen := len(prefix)
txFunc := func(txn *lmdb.Txn) (err error) {
txn.RawRead = true
err = m.View(func(txn *lmdb.Txn) (err error) {
txn.RawRead = readRaw
scanner := lmdbscan.New(txn, m.peersDB)
defer scanner.Close()
if scanner.SetNext(prefix, nil, lmdb.SetRange, lmdb.Next) {
var op uint = lmdb.SetRange
if prefixLen == 0 {
op = lmdb.First
}
if scanner.SetNext(prefix, nil, op, lmdb.Next) {
loop:
for scanner.Scan() {
select {
@@ -425,13 +459,8 @@ func (m *mdb) scanPeers(ctx context.Context, prefix []byte, rw bool, fn func(k,
if !bytes.HasPrefix(k, prefix) {
break loop
}
if len(k) == prefixLen+packedPeerLen {
switch fn(k, scanner.Val()) {
case del:
if err = scanner.Cursor().Del(0); err != nil {
break loop
}
case stop:
if prefixLen == 0 || len(k) == prefixLen+packedPeerLen {
if !fn(k, scanner.Val()) {
break loop
}
} else {
@@ -442,14 +471,10 @@ func (m *mdb) scanPeers(ctx context.Context, prefix []byte, rw bool, fn func(k,
}
err = scanner.Err()
}
scanner.Close()
return
}
if rw {
err = m.Update(txFunc)
} else {
err = m.View(txFunc)
}
})
m.wg.Done()
return
}
@@ -457,22 +482,18 @@ func (m *mdb) scanPeers(ctx context.Context, prefix []byte, rw bool, fn func(k,
func (m *mdb) AnnouncePeers(ctx context.Context, ih bittorrent.InfoHash, forSeeder bool, numWant int, v6 bool) (peers []bittorrent.Peer, err error) {
peers = make([]bittorrent.Peer, 0, numWant)
prefix, prefixLen := composeIHKeyPrefix(ih, false, v6, 0)
appendFn := func(k, _ []byte) scanAction {
appendFn := func(k, _ []byte) bool {
peers = append(peers, unpackPeer(k[prefixLen:]))
numWant--
res := next
if numWant == 0 {
res = stop
}
return res
return numWant > 0
}
if forSeeder {
err = m.scanPeers(ctx, prefix, false, appendFn)
err = m.scanPeers(ctx, prefix, true, appendFn)
} else {
prefix[0] = seederPrefix
if err = m.scanPeers(ctx, prefix, false, appendFn); err == nil && numWant > 0 {
if err = m.scanPeers(ctx, prefix, true, appendFn); err == nil && numWant > 0 {
prefix[0] = leecherPrefix
err = m.scanPeers(ctx, prefix, false, appendFn)
err = m.scanPeers(ctx, prefix, true, appendFn)
}
}
return
@@ -507,9 +528,63 @@ func (m *mdb) ScrapeSwarm(_ context.Context, ih bittorrent.InfoHash) (leechers u
return
}
const (
v1IHKeyLen = bittorrent.InfoHashV1Len + 4 + packedPeerLen
v2IHKeyPen = bittorrent.InfoHashV2Len + 4 + packedPeerLen
)
func (m *mdb) gc(cutoff time.Time) {
toDel := make([][]byte, 0, 50)
cutoffNano := cutoff.Unix()
err := m.scanPeers(context.Background(), nil, false, func(k, v []byte) bool {
if l := len(k); (l == v1IHKeyLen || l == v2IHKeyPen) &&
(k[0] == seederPrefix || k[0] == leecherPrefix) &&
(k[1] == ipv4Prefix || k[1] == ipv6Prefix) &&
k[2] == keySeparator && len(v) >= 8 && cutoffNano >= int64(binary.BigEndian.Uint64(v)) {
toDel = append(toDel, k)
}
return true
})
if err == nil {
err = m.Update(func(txn *lmdb.Txn) (err error) {
for _, k := range toDel {
err = txn.Del(m.peersDB, k, nil)
if err == nil {
k[1] = countPrefix
err = m.incr(txn, k[:len(k)-packedPeerLen], -1)
} else if lmdb.IsNotFound(err) {
err = nil
}
if err != nil {
break
}
}
return
})
}
if err == nil {
_ = m.Sync(true)
} else {
logger.Err(err).Msg("Error occurred while GC")
}
}
func (m *mdb) ScheduleGC(gcInterval, peerLifeTime time.Duration) {
//TODO implement me
panic("implement me")
m.wg.Add(1)
go func() {
defer m.wg.Done()
t := time.NewTimer(gcInterval)
defer t.Stop()
for {
select {
case <-m.closed:
return
case <-t.C:
m.gc(time.Now().Add(-peerLifeTime))
t.Reset(gcInterval)
}
}
}()
}
func (m *mdb) Ping(_ context.Context) error {

View File

@@ -17,6 +17,8 @@ var cfg = config{
PeersDBName: "PEERS",
MaxSize: defaultMapSize,
MaxReaders: defaultMaxReaders,
AsyncWrite: true,
NoMetaSync: false,
}
func createNew() s.PeerStorage {

View File

@@ -239,7 +239,7 @@ func RegisterDriver(name string, d Driver) {
func NewDataStorage(cfg conf.NamedMapConfig) (DataStorage, error) {
driversMU.RLock()
defer driversMU.RUnlock()
logger.Debug().Object("config", cfg).Msg("starting peer storage")
logger.Debug().Object("config", cfg).Msg("starting data storage")
var d Driver
d, ok := drivers[cfg.Name]