diff --git a/go.mod b/go.mod index 3624e1f..91c16e2 100644 --- a/go.mod +++ b/go.mod @@ -6,8 +6,8 @@ require ( code.cloudfoundry.org/go-diodes v0.0.0-20240515174142-71582f284718 github.com/MicahParks/jwkset v0.5.18 github.com/MicahParks/keyfunc/v3 v3.3.3 + github.com/PowerDNS/lmdb-go v1.9.2 github.com/anacrolix/torrent v1.56.0 - github.com/bmatsuo/lmdb-go v1.8.0 github.com/cespare/xxhash/v2 v2.3.0 github.com/golang-jwt/jwt/v5 v5.2.1 github.com/jackc/pgx/v5 v5.6.0 @@ -54,6 +54,7 @@ require ( github.com/valyala/bytebufferpool v1.0.0 // indirect golang.org/x/crypto v0.23.0 // indirect golang.org/x/exp v0.0.0-20240531132922-fd00a4e0eefc // indirect + golang.org/x/net v0.24.0 // indirect golang.org/x/sync v0.7.0 // indirect golang.org/x/sys v0.20.0 // indirect golang.org/x/text v0.15.0 // indirect diff --git a/go.sum b/go.sum index a46e5d2..8f8ebc0 100644 --- a/go.sum +++ b/go.sum @@ -9,6 +9,8 @@ github.com/MicahParks/jwkset v0.5.18 h1:WLdyMngF7rCrnstQxA7mpRoxeaWqGzPM/0z40PJU github.com/MicahParks/jwkset v0.5.18/go.mod h1:q8ptTGn/Z9c4MwbcfeCDssADeVQb3Pk7PnVxrvi+2QY= github.com/MicahParks/keyfunc/v3 v3.3.3 h1:c6j9oSu1YUo0k//KwF1miIQlEMtqNlj7XBFLB8jtEmY= github.com/MicahParks/keyfunc/v3 v3.3.3/go.mod h1:f/UMyXdKfkZzmBeBFUeYk+zu066J1Fcl48f7Wnl5Z48= +github.com/PowerDNS/lmdb-go v1.9.2 h1:Cmgerh9y3ZKBZGz1irxSShhfmFyRUh+Zdk4cZk7ZJvU= +github.com/PowerDNS/lmdb-go v1.9.2/go.mod h1:TE0l+EZK8Z1B4dx070ZxkWTlp8RG1mjN0/+FkFRQMtU= github.com/RoaringBitmap/roaring v0.4.7/go.mod h1:8khRDP4HmeXns4xIj9oGrKSz7XTQiJx2zgh7AcNke4w= github.com/RoaringBitmap/roaring v0.4.17/go.mod h1:D3qVegWTmfCaX4Bl5CrBE9hfrSrrXIr8KVNvRsDi1NI= github.com/RoaringBitmap/roaring v0.4.23/go.mod h1:D0gp8kJQgE1A4LQ5wFLggQEyvDi06Mq5mKs52e1TwOo= @@ -57,8 +59,6 @@ github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24 github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= -github.com/bmatsuo/lmdb-go v1.8.0 h1:ohf3Q4xjXZBKh4AayUY4bb2CXuhRAI8BYGlJq08EfNA= -github.com/bmatsuo/lmdb-go v1.8.0/go.mod h1:wWPZmKdOAZsl4qOqkowQ1aCrFie1HU8gWloHMCeAUdM= github.com/bradfitz/iter v0.0.0-20140124041915-454541ec3da2/go.mod h1:PyRFw1Lt2wKX4ZVSQ2mk+PeDa1rxyObEDlApuIsUKuo= github.com/bradfitz/iter v0.0.0-20190303215204-33e6a9893b0c/go.mod h1:PyRFw1Lt2wKX4ZVSQ2mk+PeDa1rxyObEDlApuIsUKuo= github.com/bradfitz/iter v0.0.0-20191230175014-e8f45d346db8 h1:GKTyiRCL6zVf5wWaqKnf+7Qs6GbEPfd4iMOitWzXJx8= diff --git a/storage/mdb/storage.go b/storage/mdb/storage.go index e02066e..d58f71e 100644 --- a/storage/mdb/storage.go +++ b/storage/mdb/storage.go @@ -7,23 +7,26 @@ import ( "context" "encoding/binary" "errors" - "github.com/bmatsuo/lmdb-go/lmdb" - "github.com/bmatsuo/lmdb-go/lmdbscan" + "github.com/PowerDNS/lmdb-go/exp/lmdbsync" + "net/netip" + "os" + "time" + + "github.com/PowerDNS/lmdb-go/lmdb" + "github.com/PowerDNS/lmdb-go/lmdbscan" "github.com/sot-tech/mochi/bittorrent" "github.com/sot-tech/mochi/pkg/conf" "github.com/sot-tech/mochi/pkg/log" "github.com/sot-tech/mochi/pkg/timecache" "github.com/sot-tech/mochi/storage" - "net/netip" - "os" - "time" ) const ( // Name - registered name of the storage - Name = "lmdb" - defaultMode = 0o640 - defaultMapSize = 1 << 30 + Name = "lmdb" + defaultMode = 0o640 + defaultMapSize = 1 << 30 + defaultMaxReaders = 126 ) var logger = log.NewLogger("storage/memory") @@ -52,7 +55,13 @@ type config struct { Mode uint32 DataDBName string `cfg:"data_db"` PeersDBName string `cfg:"peers_db"` - MaxSize int64 `cfg:"max_size"` + // MaxSize - size of the memory map to use for lmdb environment. + // The size should be a multiple of the OS page size. + // Mochi's default is 1GiB. + MaxSize int64 `cfg:"max_size"` + // MaxReaders - maximum number of threads/reader slots for the LMDB environment. + // LMDB library's default is 126. + MaxReaders int `cfg:"max_readers"` } var ( @@ -87,11 +96,19 @@ func (cfg config) validate() (config, error) { Int64("default", validCfg.MaxSize). Msg("falling back to default configuration") } + if cfg.MaxReaders <= 0 { + validCfg.MaxReaders = defaultMaxReaders + logger.Warn(). + Str("name", "max_readers"). + Int("provided", cfg.MaxReaders). + Int("default", 126). + Msg("falling back to default configuration") + } return validCfg, nil } type mdb struct { - *lmdb.Env + *lmdbsync.Env dataDB, peersDB lmdb.DBI } @@ -100,16 +117,29 @@ func newStorage(cfg config) (*mdb, error) { if cfg, err = cfg.validate(); err != nil { return nil, err } - env, err := lmdb.NewEnv() + lmEnv, err := lmdb.NewEnv() if err != nil { return nil, err } + var env *lmdbsync.Env + + if env, err = lmdbsync.NewEnv(lmEnv, + lmdbsync.MapResizedHandler(lmdbsync.MapResizedDefaultRetry, lmdbsync.MapResizedDefaultDelay), + ); err != nil { + return nil, err + } + if err = env.SetMaxDBs(2); err != nil { return nil, err } if err = env.SetMapSize(cfg.MaxSize); err != nil { return nil, err } + if cfg.MaxReaders > 0 { + if err = env.SetMaxReaders(cfg.MaxReaders); err != nil { + return nil, err + } + } if err = env.Open(cfg.Path, 0, os.FileMode(cfg.Mode)); err != nil { return nil, err @@ -292,23 +322,23 @@ func (m *mdb) incr(txn *lmdb.Txn, key []byte, inc int) (err error) { } if len(b) >= 4 { v = int(binary.BigEndian.Uint32(b)) - } else { - b = make([]byte, 4) } v += inc if v < 0 { v = 0 } - binary.BigEndian.PutUint32(b, uint32(v)) - return txn.Put(m.peersDB, key, b, 0) + if b, err = txn.PutReserve(m.peersDB, key, 4, 0); err == nil { + binary.BigEndian.PutUint32(b, uint32(v)) + } + return } func (m *mdb) putPeer(ih bittorrent.InfoHash, peer bittorrent.Peer, seeder bool) error { ihKey := composeIHKey(ih, peer, seeder) return m.Update(func(txn *lmdb.Txn) (err error) { - if err = txn.Put(m.peersDB, ihKey, - binary.BigEndian.AppendUint64(nil, uint64(timecache.NowUnixNano())), - 0); err == nil { + var b []byte + if b, err = txn.PutReserve(m.peersDB, ihKey, 8, 0); err == nil { + binary.BigEndian.PutUint64(b, uint64(timecache.NowUnixNano())) ihKey[1] = countPrefix err = m.incr(txn, ihKey[:len(ihKey)-packedPeerLen], 1) } @@ -350,9 +380,11 @@ func (m *mdb) GraduateLeecher(_ context.Context, ih bittorrent.InfoHash, peer bi return } ihKey[0] = seederPrefix - if err = txn.Put(m.peersDB, ihKey, binary.BigEndian.AppendUint64(nil, uint64(timecache.NowUnixNano())), 0); err != nil { + var b []byte + if b, err = txn.PutReserve(m.peersDB, ihKey, 8, 0); err != nil { return } + binary.BigEndian.PutUint64(b, uint64(timecache.NowUnixNano())) ihPrefix := ihKey[:len(ihKey)-packedPeerLen] ihPrefix[1] = countPrefix if err = m.incr(txn, ihPrefix, 1); err != nil { @@ -423,6 +455,7 @@ func (m *mdb) scanPeers(ctx context.Context, prefix []byte, rw bool, fn func(k, } func (m *mdb) AnnouncePeers(ctx context.Context, ih bittorrent.InfoHash, forSeeder bool, numWant int, v6 bool) (peers []bittorrent.Peer, err error) { + peers = make([]bittorrent.Peer, 0, numWant) prefix, prefixLen := composeIHKeyPrefix(ih, false, v6, 0) appendFn := func(k, _ []byte) scanAction { peers = append(peers, unpackPeer(k[prefixLen:])) diff --git a/storage/mdb/storage_test.go b/storage/mdb/storage_test.go index 890157b..8d37654 100644 --- a/storage/mdb/storage_test.go +++ b/storage/mdb/storage_test.go @@ -8,12 +8,15 @@ import ( "testing" ) +const tmpPath = "" + var cfg = config{ Path: "", Mode: defaultMode, DataDBName: "KV", PeersDBName: "PEERS", MaxSize: defaultMapSize, + MaxReaders: defaultMaxReaders, } func createNew() s.PeerStorage { @@ -27,7 +30,7 @@ func createNew() s.PeerStorage { } func TestStorage(t *testing.T) { - tmpDir, err := os.MkdirTemp("", "lmdb*") + tmpDir, err := os.MkdirTemp(tmpPath, "lmdb*") if err != nil { t.Error(err) } @@ -41,7 +44,7 @@ func TestStorage(t *testing.T) { } func BenchmarkStorage(b *testing.B) { - tmpDir, err := os.MkdirTemp("", "lmdb*") + tmpDir, err := os.MkdirTemp(tmpPath, "lmdb*") if err != nil { b.Error(err) }