diff --git a/dist/example_config_lmdb.yaml b/dist/example_config_lmdb.yaml index f8aab3d..bc664bd 100644 --- a/dist/example_config_lmdb.yaml +++ b/dist/example_config_lmdb.yaml @@ -56,15 +56,11 @@ storage: # - collecting garbage less frequently, saving CPU time, but keeping old peers long, thus using more memory (higher value). gc_interval: 3m - # The interval at which metrics about the number of infohashes and peers - # are collected and posted to Prometheus. - prometheus_reporting_interval: 1s - # The amount of time until a peer is considered stale. # To avoid churn, keep this slightly larger than `announce_interval` peer_lifetime: 31m - # Path to LMDB folder. Must exist + # Path to LMDB folder. Required. Path: "" # File mode of created database files, default is 0o640 @@ -79,5 +75,15 @@ storage: # Maximum size of database, default is 1GiB max_size: 0 + # Set MDB_WRITEMAP and MDB_MAPASYNC flags to use asynchronous flushes to disk. + # Setting these flags can greatly speed up writes, but there is a risk of DB damage + # or loss of the last committed data if the application crashes. + async_write: true + + # Set MDB_NOMETASYNC flag. Omit the metadata flush on commit. + # Can slightly accelerate writes if `async_write` is not set, but the last committed data + # may be lost if the application crashes. + no_sync_meta: false + posthooks: [] prehooks: [] \ No newline at end of file diff --git a/storage/mdb/storage.go b/storage/mdb/storage.go index d58f71e..a67212f 100644 --- a/storage/mdb/storage.go +++ b/storage/mdb/storage.go @@ -10,6 +10,7 @@ import ( "github.com/PowerDNS/lmdb-go/exp/lmdbsync" "net/netip" "os" + "sync" "time" "github.com/PowerDNS/lmdb-go/lmdb" @@ -29,7 +30,7 @@ const ( defaultMaxReaders = 126 ) -var logger = log.NewLogger("storage/memory") +var logger = log.NewLogger("storage/lmdb") func init() { // Register the storage driver. @@ -62,6 +63,10 @@ type config struct { // MaxReaders - maximum number of threads/reader slots for the LMDB environment. 
// LMDB library's default is 126. MaxReaders int `cfg:"max_readers"` + // AsyncWrite sets MDB_WRITEMAP and MDB_MAPASYNC flags to use asynchronous flushes to disk. + AsyncWrite bool `cfg:"async_write"` + // NoMetaSync sets MDB_NOMETASYNC flag, omit the metadata flush. + NoMetaSync bool `cfg:"no_sync_meta"` } var ( @@ -107,9 +112,24 @@ func (cfg config) validate() (config, error) { return validCfg, nil } +type lmdbEnv interface { + SetMaxDBs(int) error + SetMapSize(int64) error + SetMaxReaders(int) error + Open(string, uint, os.FileMode) error + View(lmdb.TxnOp) error + Update(lmdb.TxnOp) error + Close() error + Sync(bool) error + Info() (*lmdb.EnvInfo, error) +} + type mdb struct { - *lmdbsync.Env + lmdbEnv dataDB, peersDB lmdb.DBI + onceCloser sync.Once + closed chan any + wg sync.WaitGroup } func newStorage(cfg config) (*mdb, error) { @@ -121,7 +141,7 @@ func newStorage(cfg config) (*mdb, error) { if err != nil { return nil, err } - var env *lmdbsync.Env + var env lmdbEnv if env, err = lmdbsync.NewEnv(lmEnv, lmdbsync.MapResizedHandler(lmdbsync.MapResizedDefaultRetry, lmdbsync.MapResizedDefaultDelay), @@ -141,7 +161,15 @@ func newStorage(cfg config) (*mdb, error) { } } - if err = env.Open(cfg.Path, 0, os.FileMode(cfg.Mode)); err != nil { + var flags uint + if cfg.AsyncWrite { + flags |= lmdb.WriteMap | lmdb.MapAsync + } + if cfg.NoMetaSync { + flags |= lmdb.NoMetaSync + } + + if err = env.Open(cfg.Path, flags, os.FileMode(cfg.Mode)); err != nil { return nil, err } @@ -166,7 +194,12 @@ func newStorage(cfg config) (*mdb, error) { return nil, err } - return &mdb{env, dataDB, peersDB}, nil + return &mdb{ + lmdbEnv: env, + dataDB: dataDB, + peersDB: peersDB, + closed: make(chan any), + }, nil } func (*mdb) Preservable() bool { @@ -174,9 +207,15 @@ func (*mdb) Preservable() bool { } func (m *mdb) Close() (err error) { - if m.Env != nil { - err = m.Env.Close() - } + m.onceCloser.Do(func() { + if m.lmdbEnv != nil { + close(m.closed) + m.wg.Wait() + logger.Info().Msg("LMDB 
exiting. Flushing databases to disk") + _ = m.Sync(true) + err = m.lmdbEnv.Close() + } + }) return } @@ -324,10 +363,9 @@ func (m *mdb) incr(txn *lmdb.Txn, key []byte, inc int) (err error) { v = int(binary.BigEndian.Uint32(b)) } v += inc - if v < 0 { - v = 0 - } - if b, err = txn.PutReserve(m.peersDB, key, 4, 0); err == nil { + if v <= 0 { + err = ignoreNotFound(txn.Del(m.peersDB, key, nil)) + } else if b, err = txn.PutReserve(m.peersDB, key, 4, 0); err == nil { binary.BigEndian.PutUint32(b, uint32(v)) } return @@ -338,7 +376,7 @@ func (m *mdb) putPeer(ih bittorrent.InfoHash, peer bittorrent.Peer, seeder bool) return m.Update(func(txn *lmdb.Txn) (err error) { var b []byte if b, err = txn.PutReserve(m.peersDB, ihKey, 8, 0); err == nil { - binary.BigEndian.PutUint64(b, uint64(timecache.NowUnixNano())) + binary.BigEndian.PutUint64(b, uint64(timecache.NowUnix())) ihKey[1] = countPrefix err = m.incr(txn, ihKey[:len(ihKey)-packedPeerLen], 1) } @@ -400,21 +438,17 @@ func (m *mdb) GraduateLeecher(_ context.Context, ih bittorrent.InfoHash, peer bi }) } -type scanAction int - -const ( - next scanAction = iota - stop - del -) - -func (m *mdb) scanPeers(ctx context.Context, prefix []byte, rw bool, fn func(k, v []byte) scanAction) (err error) { +func (m *mdb) scanPeers(ctx context.Context, prefix []byte, readRaw bool, fn func(k, v []byte) bool) (err error) { + m.wg.Add(1) prefixLen := len(prefix) - txFunc := func(txn *lmdb.Txn) (err error) { - txn.RawRead = true + err = m.View(func(txn *lmdb.Txn) (err error) { + txn.RawRead = readRaw scanner := lmdbscan.New(txn, m.peersDB) - defer scanner.Close() - if scanner.SetNext(prefix, nil, lmdb.SetRange, lmdb.Next) { + var op uint = lmdb.SetRange + if prefixLen == 0 { + op = lmdb.First + } + if scanner.SetNext(prefix, nil, op, lmdb.Next) { loop: for scanner.Scan() { select { @@ -425,13 +459,8 @@ func (m *mdb) scanPeers(ctx context.Context, prefix []byte, rw bool, fn func(k, if !bytes.HasPrefix(k, prefix) { break loop } - if len(k) == 
prefixLen+packedPeerLen { - switch fn(k, scanner.Val()) { - case del: - if err = scanner.Cursor().Del(0); err != nil { - break loop - } - case stop: + if prefixLen == 0 || len(k) == prefixLen+packedPeerLen { + if !fn(k, scanner.Val()) { break loop } } else { @@ -442,14 +471,10 @@ func (m *mdb) scanPeers(ctx context.Context, prefix []byte, rw bool, fn func(k, } err = scanner.Err() } + scanner.Close() return - } - - if rw { - err = m.Update(txFunc) - } else { - err = m.View(txFunc) - } + }) + m.wg.Done() return } @@ -457,22 +482,18 @@ func (m *mdb) scanPeers(ctx context.Context, prefix []byte, rw bool, fn func(k, func (m *mdb) AnnouncePeers(ctx context.Context, ih bittorrent.InfoHash, forSeeder bool, numWant int, v6 bool) (peers []bittorrent.Peer, err error) { peers = make([]bittorrent.Peer, 0, numWant) prefix, prefixLen := composeIHKeyPrefix(ih, false, v6, 0) - appendFn := func(k, _ []byte) scanAction { + appendFn := func(k, _ []byte) bool { peers = append(peers, unpackPeer(k[prefixLen:])) numWant-- - res := next - if numWant == 0 { - res = stop - } - return res + return numWant > 0 } if forSeeder { - err = m.scanPeers(ctx, prefix, false, appendFn) + err = m.scanPeers(ctx, prefix, true, appendFn) } else { prefix[0] = seederPrefix - if err = m.scanPeers(ctx, prefix, false, appendFn); err == nil && numWant > 0 { + if err = m.scanPeers(ctx, prefix, true, appendFn); err == nil && numWant > 0 { prefix[0] = leecherPrefix - err = m.scanPeers(ctx, prefix, false, appendFn) + err = m.scanPeers(ctx, prefix, true, appendFn) } } return @@ -507,9 +528,63 @@ func (m *mdb) ScrapeSwarm(_ context.Context, ih bittorrent.InfoHash) (leechers u return } +const ( + v1IHKeyLen = bittorrent.InfoHashV1Len + 4 + packedPeerLen + v2IHKeyPen = bittorrent.InfoHashV2Len + 4 + packedPeerLen +) + +func (m *mdb) gc(cutoff time.Time) { + toDel := make([][]byte, 0, 50) + cutoffNano := cutoff.Unix() + err := m.scanPeers(context.Background(), nil, false, func(k, v []byte) bool { + if l := len(k); (l == 
v1IHKeyLen || l == v2IHKeyPen) && + (k[0] == seederPrefix || k[0] == leecherPrefix) && + (k[1] == ipv4Prefix || k[1] == ipv6Prefix) && + k[2] == keySeparator && len(v) >= 8 && cutoffNano >= int64(binary.BigEndian.Uint64(v)) { + toDel = append(toDel, k) + } + return true + }) + if err == nil { + err = m.Update(func(txn *lmdb.Txn) (err error) { + for _, k := range toDel { + err = txn.Del(m.peersDB, k, nil) + if err == nil { + k[1] = countPrefix + err = m.incr(txn, k[:len(k)-packedPeerLen], -1) + } else if lmdb.IsNotFound(err) { + err = nil + } + if err != nil { + break + } + } + return + }) + } + if err == nil { + _ = m.Sync(true) + } else { + logger.Err(err).Msg("Error occurred while GC") + } +} + func (m *mdb) ScheduleGC(gcInterval, peerLifeTime time.Duration) { - //TODO implement me - panic("implement me") + m.wg.Add(1) + go func() { + defer m.wg.Done() + t := time.NewTimer(gcInterval) + defer t.Stop() + for { + select { + case <-m.closed: + return + case <-t.C: + m.gc(time.Now().Add(-peerLifeTime)) + t.Reset(gcInterval) + } + } + }() } func (m *mdb) Ping(_ context.Context) error { diff --git a/storage/mdb/storage_test.go b/storage/mdb/storage_test.go index 8d37654..6084997 100644 --- a/storage/mdb/storage_test.go +++ b/storage/mdb/storage_test.go @@ -17,6 +17,8 @@ var cfg = config{ PeersDBName: "PEERS", MaxSize: defaultMapSize, MaxReaders: defaultMaxReaders, + AsyncWrite: true, + NoMetaSync: false, } func createNew() s.PeerStorage { diff --git a/storage/storage.go b/storage/storage.go index d1b12dd..411d3bb 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -239,7 +239,7 @@ func RegisterDriver(name string, d Driver) { func NewDataStorage(cfg conf.NamedMapConfig) (DataStorage, error) { driversMU.RLock() defer driversMU.RUnlock() - logger.Debug().Object("config", cfg).Msg("starting peer storage") + logger.Debug().Object("config", cfg).Msg("starting data storage") var d Driver d, ok := drivers[cfg.Name]