diff --git a/bittorrent/peer.go b/bittorrent/peer.go index 3d791da..5b55740 100644 --- a/bittorrent/peer.go +++ b/bittorrent/peer.go @@ -26,8 +26,6 @@ var ErrInvalidPeerIDSize = fmt.Errorf("peer ID must be %d bytes", PeerIDLen) var zeroPeerID PeerID // NewPeerID creates a PeerID from a byte slice. -// -// It panics if b is not 20 bytes long. func NewPeerID(b []byte) (PeerID, error) { if len(b) != PeerIDLen { return zeroPeerID, ErrInvalidPeerIDSize diff --git a/cmd/mochi/config.go b/cmd/mochi/config.go index aac39bd..1c1d55f 100644 --- a/cmd/mochi/config.go +++ b/cmd/mochi/config.go @@ -19,6 +19,7 @@ import ( // Imports to register storage drivers. _ "github.com/sot-tech/mochi/storage/keydb" + _ "github.com/sot-tech/mochi/storage/mdb" sm "github.com/sot-tech/mochi/storage/memory" _ "github.com/sot-tech/mochi/storage/pg" _ "github.com/sot-tech/mochi/storage/redis" diff --git a/dist/example_config_lmdb.yaml b/dist/example_config_lmdb.yaml new file mode 100644 index 0000000..f8aab3d --- /dev/null +++ b/dist/example_config_lmdb.yaml @@ -0,0 +1,83 @@ +# @formatter:off +# Note: see `example_config.yaml` for `frontends` and `*hooks` config description + + +announce_interval: 30m +min_announce_interval: 15m +metrics_addr: "" + +frontends: + - name: http + config: + addr: "0.0.0.0:6969" + tls: false + tls_cert_path: "" + tls_key_path: "" + reuse_port: true + read_timeout: 5s + write_timeout: 5s + enable_keepalive: false + idle_timeout: 30s + enable_request_timing: false + announce_routes: + - "/announce" + scrape_routes: + - "/scrape" + ping_routes: + - "/ping" + allow_ip_spoofing: false + filter_private_ips: false + real_ip_header: "x-real-ip" + max_numwant: 100 + default_numwant: 50 + max_scrape_infohashes: 50 + + - name: udp + config: + addr: "0.0.0.0:6969" + reuse_port: true + workers: 1 + max_clock_skew: 10s + private_key: "paste a random string here that will be used to hmac connection IDs" + enable_request_timing: false + allow_ip_spoofing: false + filter_private_ips: 
false + max_numwant: 100 + default_numwant: 50 + max_scrape_infohashes: 50 + +# This block defines configuration used for LMDB storage. +storage: + name: lmdb + config: + # The frequency at which stale peers are removed. + # This balances between + # - collecting garbage more often, potentially using more CPU time, but potentially using less memory (lower value) + # - collecting garbage less frequently, saving CPU time, but keeping old peers longer, thus using more memory (higher value). + gc_interval: 3m + + # The interval at which metrics about the number of infohashes and peers + # are collected and posted to Prometheus. + prometheus_reporting_interval: 1s + + # The amount of time until a peer is considered stale. + # To avoid churn, keep this slightly larger than `announce_interval` + peer_lifetime: 31m + + # Path to LMDB folder. Must exist + Path: "" + + # File mode of created database files, default is 0o640 + mode: 0 + + # Name of database to store KV data. If not provided, root DB is used (not recommended) + data_db: "" + + # Name of database to store peers data. 
If not provided, root DB is used (not recommended) + peers_db: "" + + # Maximum size of database, default is 1GiB + max_size: 0 + +posthooks: [] +prehooks: [] \ No newline at end of file diff --git a/go.mod b/go.mod index 05e45f1..3624e1f 100644 --- a/go.mod +++ b/go.mod @@ -35,7 +35,6 @@ require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/fsnotify/fsnotify v1.7.0 // indirect - github.com/google/pprof v0.0.0-20240327155427-868f304927ed // indirect github.com/huandu/xstrings v1.4.0 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgservicefile v0.0.0-20231201235250-de7065d80cb9 // indirect diff --git a/storage/mdb/storage.go b/storage/mdb/storage.go index 43563b7..e02066e 100644 --- a/storage/mdb/storage.go +++ b/storage/mdb/storage.go @@ -3,10 +3,12 @@ package mdb import ( + "bytes" "context" "encoding/binary" "errors" "github.com/bmatsuo/lmdb-go/lmdb" + "github.com/bmatsuo/lmdb-go/lmdbscan" "github.com/sot-tech/mochi/bittorrent" "github.com/sot-tech/mochi/pkg/conf" "github.com/sot-tech/mochi/pkg/log" @@ -14,13 +16,14 @@ import ( "github.com/sot-tech/mochi/storage" "net/netip" "os" + "time" ) const ( // Name - registered name of the storage Name = "lmdb" defaultMode = 0o640 - defaultMapSize = 1 << 28 + defaultMapSize = 1 << 30 ) var logger = log.NewLogger("storage/memory") @@ -33,6 +36,10 @@ func init() { type builder struct{} func (b builder) NewDataStorage(icfg conf.MapConfig) (storage.DataStorage, error) { + return b.NewPeerStorage(icfg) +} + +func (builder) NewPeerStorage(icfg conf.MapConfig) (storage.PeerStorage, error) { var cfg config if err := icfg.Unmarshal(&cfg); err != nil { return nil, err @@ -40,10 +47,6 @@ func (b builder) NewDataStorage(icfg conf.MapConfig) (storage.DataStorage, error return newStorage(cfg) } -func (builder) NewPeerStorage(_ conf.MapConfig) (storage.PeerStorage, error) { - panic("lmdb peer storage not 
implemented") -} - type config struct { Path string Mode uint32 @@ -76,7 +79,7 @@ func (cfg config) validate() (config, error) { Stringer("default", os.FileMode(validCfg.Mode)). Msg("falling back to default configuration") } - if cfg.MaxSize == 0 { + if cfg.MaxSize <= 0 { validCfg.MaxSize = defaultMapSize logger.Warn(). Str("name", "max_size"). @@ -226,10 +229,14 @@ func (m *mdb) Delete(_ context.Context, storeCtx string, keys ...string) (err er } const ( - ipLen = 16 - packedPeerLen = bittorrent.PeerIDLen + ipLen + 2 - seederPrefix = 'S' - leecherPrefix = 'L' + ipLen = 16 + packedPeerLen = bittorrent.PeerIDLen + ipLen + 2 // peer_id + ipv6 + port + seederPrefix = 'S' + leecherPrefix = 'L' + ipv4Prefix = '4' + ipv6Prefix = '6' + countPrefix = 'C' + downloadedPrefix = 'D' ) func packPeer(peer bittorrent.Peer, out []byte) { @@ -252,62 +259,227 @@ func unpackPeer(arr []byte) (peer bittorrent.Peer) { return } -func composeIHKey(ih bittorrent.InfoHash, peer bittorrent.Peer, seeder bool) (ihKey []byte) { +func composeIHKeyPrefix(ih bittorrent.InfoHash, seeder bool, v6 bool, suffixLen int) (ihKey []byte, suffixStart int) { ihLen := len(ih) - ihKey = make([]byte, ihLen+3+packedPeerLen) + ihKey = make([]byte, ihLen+4+suffixLen) // prefix{L/S} + prefix{4/6} + separator + infoHash + separator if seeder { ihKey[0] = seederPrefix } else { ihKey[0] = leecherPrefix } - ihKey[1], ihKey[ihLen+2] = keySeparator, keySeparator - copy(ihKey[2:], ih) - packPeer(peer, ihKey[ihLen+3:]) + if v6 { + ihKey[1] = ipv6Prefix + } else { + ihKey[1] = ipv4Prefix + } + ihKey[2], ihKey[ihLen+3] = keySeparator, keySeparator + copy(ihKey[3:], ih) + suffixStart = len(ihKey) - suffixLen return } -func (m *mdb) PutSeeder(_ context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error { - return m.Update(func(txn *lmdb.Txn) error { - return txn.Put(m.peersDB, composeIHKey(ih, peer, true), +func composeIHKey(ih bittorrent.InfoHash, peer bittorrent.Peer, seeder bool) (ihKey []byte) { + ihKey, start 
:= composeIHKeyPrefix(ih, seeder, peer.Addr().Is6(), packedPeerLen) + packPeer(peer, ihKey[start:]) + return +} + +func (m *mdb) incr(txn *lmdb.Txn, key []byte, inc int) (err error) { + var v int + var b []byte + if b, err = ignoreNotFoundData(txn.Get(m.peersDB, key)); err != nil { + return + } + if len(b) >= 4 { + v = int(binary.BigEndian.Uint32(b)) + } else { + b = make([]byte, 4) + } + v += inc + if v < 0 { + v = 0 + } + binary.BigEndian.PutUint32(b, uint32(v)) + return txn.Put(m.peersDB, key, b, 0) +} + +func (m *mdb) putPeer(ih bittorrent.InfoHash, peer bittorrent.Peer, seeder bool) error { + ihKey := composeIHKey(ih, peer, seeder) + return m.Update(func(txn *lmdb.Txn) (err error) { + if err = txn.Put(m.peersDB, ihKey, binary.BigEndian.AppendUint64(nil, uint64(timecache.NowUnixNano())), - 0) + 0); err == nil { + ihKey[1] = countPrefix + err = m.incr(txn, ihKey[:len(ihKey)-packedPeerLen], 1) + } + return }) } +func (m *mdb) delPeer(ih bittorrent.InfoHash, peer bittorrent.Peer, seeder bool) error { + ihKey := composeIHKey(ih, peer, seeder) + return m.Update(func(txn *lmdb.Txn) (err error) { + if err = ignoreNotFound(txn.Del(m.peersDB, ihKey, nil)); err == nil { + ihKey[1] = countPrefix + err = m.incr(txn, ihKey[:len(ihKey)-packedPeerLen], -1) + } + return + }) +} + +func (m *mdb) PutSeeder(_ context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error { + return m.putPeer(ih, peer, true) +} + func (m *mdb) DeleteSeeder(_ context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error { - //TODO implement me - panic("implement me") + return m.delPeer(ih, peer, true) } func (m *mdb) PutLeecher(_ context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error { - return m.Update(func(txn *lmdb.Txn) error { - return txn.Put(m.peersDB, composeIHKey(ih, peer, false), - binary.BigEndian.AppendUint64(nil, uint64(timecache.NowUnixNano())), - 0) - }) + return m.putPeer(ih, peer, false) } func (m *mdb) DeleteLeecher(_ context.Context, ih 
bittorrent.InfoHash, peer bittorrent.Peer) error { - //TODO implement me - panic("implement me") + return m.delPeer(ih, peer, false) } func (m *mdb) GraduateLeecher(_ context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error { - //TODO implement me - panic("implement me") + ihKey := composeIHKey(ih, peer, false) + return m.Update(func(txn *lmdb.Txn) (err error) { + if err = ignoreNotFound(txn.Del(m.peersDB, ihKey, nil)); err != nil { + return + } + ihKey[0] = seederPrefix + if err = txn.Put(m.peersDB, ihKey, binary.BigEndian.AppendUint64(nil, uint64(timecache.NowUnixNano())), 0); err != nil { + return + } + ihPrefix := ihKey[:len(ihKey)-packedPeerLen] + ihPrefix[1] = countPrefix + if err = m.incr(txn, ihPrefix, 1); err != nil { + return + } + ihPrefix[0] = leecherPrefix + if err = m.incr(txn, ihPrefix, -1); err != nil { + return + } + ihPrefix[0] = downloadedPrefix + err = m.incr(txn, ihPrefix, 1) + return + }) } -func (m *mdb) AnnouncePeers(_ context.Context, ih bittorrent.InfoHash, forSeeder bool, numWant int, v6 bool) (peers []bittorrent.Peer, err error) { - //TODO implement me - panic("implement me") +type scanAction int + +const ( + next scanAction = iota + stop + del +) + +func (m *mdb) scanPeers(ctx context.Context, prefix []byte, rw bool, fn func(k, v []byte) scanAction) (err error) { + prefixLen := len(prefix) + txFunc := func(txn *lmdb.Txn) (err error) { + txn.RawRead = true + scanner := lmdbscan.New(txn, m.peersDB) + defer scanner.Close() + if scanner.SetNext(prefix, nil, lmdb.SetRange, lmdb.Next) { + loop: + for scanner.Scan() { + select { + case <-ctx.Done(): + return ctx.Err() + default: + k := scanner.Key() + if !bytes.HasPrefix(k, prefix) { + break loop + } + if len(k) == prefixLen+packedPeerLen { + switch fn(k, scanner.Val()) { + case del: + if err = scanner.Cursor().Del(0); err != nil { + break loop + } + case stop: + break loop + } + } else { + logger.Warn().Int("expected", prefixLen+packedPeerLen).Int("got", len(k)). 
+ Msg("Invalid key length") + } + } + } + err = scanner.Err() + } + return + } + + if rw { + err = m.Update(txFunc) + } else { + err = m.View(txFunc) + } + + return +} + +func (m *mdb) AnnouncePeers(ctx context.Context, ih bittorrent.InfoHash, forSeeder bool, numWant int, v6 bool) (peers []bittorrent.Peer, err error) { + prefix, prefixLen := composeIHKeyPrefix(ih, false, v6, 0) + appendFn := func(k, _ []byte) scanAction { + peers = append(peers, unpackPeer(k[prefixLen:])) + numWant-- + res := next + if numWant == 0 { + res = stop + } + return res + } + if forSeeder { + err = m.scanPeers(ctx, prefix, false, appendFn) + } else { + prefix[0] = seederPrefix + if err = m.scanPeers(ctx, prefix, false, appendFn); err == nil && numWant > 0 { + prefix[0] = leecherPrefix + err = m.scanPeers(ctx, prefix, false, appendFn) + } + } + return } func (m *mdb) ScrapeSwarm(_ context.Context, ih bittorrent.InfoHash) (leechers uint32, seeders uint32, snatched uint32, err error) { + prefix, _ := composeIHKeyPrefix(ih, false, false, 0) + prefix[1] = countPrefix + var b []byte + err = m.View(func(txn *lmdb.Txn) (err error) { + if b, err = ignoreNotFoundData(txn.Get(m.peersDB, prefix)); err != nil { + return + } else if len(b) >= 4 { + leechers = binary.BigEndian.Uint32(b) + } + + prefix[0] = seederPrefix + if b, err = ignoreNotFoundData(txn.Get(m.peersDB, prefix)); err != nil { + return + } else if len(b) >= 4 { + seeders = binary.BigEndian.Uint32(b) + } + + prefix[0] = downloadedPrefix + if b, err = ignoreNotFoundData(txn.Get(m.peersDB, prefix)); err != nil { + return + } else if len(b) >= 4 { + snatched = binary.BigEndian.Uint32(b) + } + return + }) + return +} + +func (m *mdb) ScheduleGC(gcInterval, peerLifeTime time.Duration) { //TODO implement me panic("implement me") } func (m *mdb) Ping(_ context.Context) error { - //TODO implement me - panic("implement me") + _, err := m.Info() + return err } diff --git a/storage/mdb/storage_test.go b/storage/mdb/storage_test.go new file mode 
100644 index 0000000..890157b --- /dev/null +++ b/storage/mdb/storage_test.go @@ -0,0 +1,55 @@ +package mdb + +import ( + "fmt" + s "github.com/sot-tech/mochi/storage" + "github.com/sot-tech/mochi/storage/test" + "os" + "testing" +) + +var cfg = config{ + Path: "", + Mode: defaultMode, + DataDBName: "KV", + PeersDBName: "PEERS", + MaxSize: defaultMapSize, +} + +func createNew() s.PeerStorage { + var ps s.PeerStorage + var err error + ps, err = newStorage(cfg) + if err != nil { + panic(fmt.Sprint("Unable to open/create LMDB: ", err)) + } + return ps +} + +func TestStorage(t *testing.T) { + tmpDir, err := os.MkdirTemp("", "lmdb*") + if err != nil { + t.Error(err) + } + t.Cleanup(func() { + err := os.RemoveAll(tmpDir) + if err != nil { + } + }) + cfg.Path = tmpDir + test.RunTests(t, createNew()) +} + +func BenchmarkStorage(b *testing.B) { + tmpDir, err := os.MkdirTemp("", "lmdb*") + if err != nil { + b.Error(err) + } + b.Cleanup(func() { + err := os.RemoveAll(tmpDir) + if err != nil { + } + }) + cfg.Path = tmpDir + test.RunBenchmarks(b, createNew) +}