From f010cabdb1b764c98156b61758f9c85055f47603 Mon Sep 17 00:00:00 2001 From: "Lawrence, Rendall" Date: Thu, 27 Jun 2024 20:27:20 +0300 Subject: [PATCH] remove separate key for L/S count --- docs/storage/lmdb.md | 3 +- middleware/torrentapproval/torrentapproval.go | 6 +- storage/mdb/storage.go | 132 +++++++++--------- storage/mdb/storage_disabled_test.go | 3 - 4 files changed, 74 insertions(+), 70 deletions(-) delete mode 100644 storage/mdb/storage_disabled_test.go diff --git a/docs/storage/lmdb.md b/docs/storage/lmdb.md index 72efda4..15310fe 100644 --- a/docs/storage/lmdb.md +++ b/docs/storage/lmdb.md @@ -35,8 +35,7 @@ Fields: * `` - 20 bytes of peer ID * `` - 16 bytes of BE-encoded IP address (real IPv6 or IPv4-mapped IPv6 address) * `` - 2 bytes of BE-encoded port -2. Key `__`, value - BE-encoded unsigned 32-bit integer. -`` is `LC`, `SC`, `DC` string for leechers, seeders or downloaded count of specified `` (20 or 32 bytes). +2. Key `DC__` - downloaded count of specified `` (20 or 32 bytes), value - BE-encoded unsigned 32-bit integer. Write speed may be increased with `no_sync_meta` and `async_write` configuration options, but the risk of DB corruption is also increase. diff --git a/middleware/torrentapproval/torrentapproval.go b/middleware/torrentapproval/torrentapproval.go index e4fa142..80962b1 100644 --- a/middleware/torrentapproval/torrentapproval.go +++ b/middleware/torrentapproval/torrentapproval.go @@ -102,8 +102,10 @@ func (h *hook) Close() (err error) { if cl, isOk := h.hashContainer.(io.Closer); isOk { err = cl.Close() } - if stErr := h.providedStorage.Close(); stErr != nil { - err = errors.Join(err, stErr) + if h.providedStorage != nil { + if stErr := h.providedStorage.Close(); stErr != nil { + err = errors.Join(err, stErr) + } } return err } diff --git a/storage/mdb/storage.go b/storage/mdb/storage.go index 7654a96..b8b406c 100644 --- a/storage/mdb/storage.go +++ b/storage/mdb/storage.go @@ -327,7 +327,7 @@ func unpackPeer(arr []byte) (peer bittorrent.Peer) { return } -func composeIHKeyPrefix(ih bittorrent.InfoHash, seeder bool, v6 bool, suffixLen int) (ihKey []byte, suffixStart int) { +func composeIHKeyPrefix(ih []byte, seeder bool, v6 bool, suffixLen int) (ihKey []byte, suffixStart int) { ihLen := len(ih) ihKey = make([]byte, ihLen+4+suffixLen) // prefix{L/S} + prefix{4/6} + separator + infoHash + separator if seeder { @@ -347,37 +347,17 @@ func composeIHKeyPrefix(ih bittorrent.InfoHash, seeder bool, v6 bool, suffixLen } func composeIHKey(ih bittorrent.InfoHash, peer bittorrent.Peer, seeder bool) (ihKey []byte) { - ihKey, start := composeIHKeyPrefix(ih, seeder, peer.Addr().Is6(), packedPeerLen) + ihKey, start := composeIHKeyPrefix(ih.Bytes(), seeder, peer.Addr().Is6(), packedPeerLen) packPeer(peer, ihKey[start:]) return } -func (m *mdb) incr(txn *lmdb.Txn, key []byte, inc int) (err error) { - var v int - var b []byte - if b, err = ignoreNotFoundData(txn.Get(m.peersDB, key)); err != nil { - return - } - if len(b) >= 4 { - v = int(binary.BigEndian.Uint32(b)) - } - v += inc - if v <= 0 { - err = ignoreNotFound(txn.Del(m.peersDB, key, nil)) - } else if b, err = txn.PutReserve(m.peersDB, key, 4, 0); err == nil { - binary.BigEndian.PutUint32(b, uint32(v)) - } - return -} - func (m *mdb) putPeer(ih bittorrent.InfoHash, peer bittorrent.Peer, seeder bool) error { ihKey := composeIHKey(ih, peer, seeder) return m.Update(func(txn *lmdb.Txn) (err error) { var b []byte if b, err = txn.PutReserve(m.peersDB, ihKey, 8, 0); err == nil { binary.BigEndian.PutUint64(b, uint64(timecache.NowUnix())) - ihKey[1] = countPrefix - err = m.incr(txn, ihKey[:len(ihKey)-packedPeerLen], 1) } return }) @@ -385,12 +365,8 @@ func (m *mdb) putPeer(ih bittorrent.InfoHash, peer bittorrent.Peer, seeder bool) func (m *mdb) delPeer(ih bittorrent.InfoHash, peer bittorrent.Peer, seeder bool) error { ihKey := composeIHKey(ih, peer, seeder) - return m.Update(func(txn *lmdb.Txn) (err error) { - if err = ignoreNotFound(txn.Del(m.peersDB, ihKey, nil)); err == nil { - ihKey[1] = countPrefix - err = m.incr(txn, ihKey[:len(ihKey)-packedPeerLen], -1) - } - return + return m.Update(func(txn *lmdb.Txn) error { + return ignoreNotFound(txn.Del(m.peersDB, ihKey, nil)) }) } @@ -422,17 +398,20 @@ func (m *mdb) GraduateLeecher(_ context.Context, ih bittorrent.InfoHash, peer bi return } binary.BigEndian.PutUint64(b, uint64(timecache.NowUnixNano())) + ihPrefix := ihKey[:len(ihKey)-packedPeerLen] - ihPrefix[1] = countPrefix - if err = m.incr(txn, ihPrefix, 1); err != nil { + ihPrefix[0], ihPrefix[1] = downloadedPrefix, countPrefix + var v int + if b, err = ignoreNotFoundData(txn.Get(m.peersDB, ihPrefix)); err != nil { return } - ihPrefix[0] = leecherPrefix - if err = m.incr(txn, ihPrefix, -1); err != nil { - return + if len(b) >= 4 { + v = int(binary.BigEndian.Uint32(b)) + } + v++ + if b, err = txn.PutReserve(m.peersDB, ihPrefix, 4, 0); err == nil { + binary.BigEndian.PutUint32(b, uint32(v)) } - ihPrefix[0] = downloadedPrefix - err = m.incr(txn, ihPrefix, 1) return }) } @@ -480,7 +459,7 @@ func (m *mdb) scanPeers(ctx context.Context, prefix []byte, readRaw bool, fn fun func (m *mdb) AnnouncePeers(ctx context.Context, ih bittorrent.InfoHash, forSeeder bool, numWant int, v6 bool) (peers []bittorrent.Peer, err error) { peers = make([]bittorrent.Peer, 0, numWant) - prefix, prefixLen := composeIHKeyPrefix(ih, false, v6, 0) + prefix, prefixLen := composeIHKeyPrefix(ih.Bytes(), false, v6, 0) appendFn := func(k, _ []byte) bool { peers = append(peers, unpackPeer(k[prefixLen:])) numWant-- @@ -498,28 +477,62 @@ func (m *mdb) AnnouncePeers(ctx context.Context, ih bittorrent.InfoHash, forSeed return } -func (m *mdb) ScrapeSwarm(_ context.Context, ih bittorrent.InfoHash) (leechers uint32, seeders uint32, snatched uint32, err error) { - prefix, _ := composeIHKeyPrefix(ih, false, false, 0) - prefix[1] = countPrefix - var b []byte +func (m *mdb) countPeers(ctx context.Context, scanPrefix []byte) (cnt uint32, err error) { + m.wg.Add(1) err = m.View(func(txn *lmdb.Txn) (err error) { - if b, err = ignoreNotFoundData(txn.Get(m.peersDB, prefix)); err != nil { - return - } else if len(b) >= 4 { - leechers = binary.BigEndian.Uint32(b) + txn.RawRead = true + scanner := lmdbscan.New(txn, m.peersDB) + if scanner.SetNext(scanPrefix, nil, lmdb.SetRange, lmdb.Next) { + var prevKey []byte + loop: + for scanner.Scan() { + select { + case <-ctx.Done(): + return ctx.Err() + default: + k := scanner.Key() + if len(k) == len(scanPrefix)+packedPeerLen && bytes.HasPrefix(scanPrefix, k[:len(k)-packedPeerLen]) { + if !bytes.Equal(k, prevKey) { + cnt++ + prevKey = k + } + } else if scanPrefix[1] == ipv4Prefix { + scanPrefix[1] = ipv6Prefix + if !scanner.SetNext(scanPrefix, nil, lmdb.SetRange, lmdb.Next) { + break loop + } + } else { + break loop + } + } + } } + err = scanner.Err() + scanner.Close() + return + }) + m.wg.Done() - prefix[0] = seederPrefix - if b, err = ignoreNotFoundData(txn.Get(m.peersDB, prefix)); err != nil { + return +} + +func (m *mdb) ScrapeSwarm(ctx context.Context, ih bittorrent.InfoHash) (leechers uint32, seeders uint32, snatched uint32, err error) { + scanPrefix, _ := composeIHKeyPrefix(ih.Bytes(), false, false, 0) + if leechers, err = m.countPeers(ctx, scanPrefix); err != nil { + return + } + scanPrefix[0], scanPrefix[1] = seederPrefix, ipv4Prefix + if seeders, err = m.countPeers(ctx, scanPrefix); err != nil { + return + } + + scanPrefix[0], scanPrefix[1] = downloadedPrefix, countPrefix + err = m.View(func(txn *lmdb.Txn) (err error) { + var b []byte + if b, err = ignoreNotFoundData(txn.Get(m.peersDB, scanPrefix)); err != nil { return - } else if len(b) >= 4 { - seeders = binary.BigEndian.Uint32(b) } - - prefix[0] = downloadedPrefix - if b, err = ignoreNotFoundData(txn.Get(m.peersDB, prefix)); err != nil { - return - } else if len(b) >= 4 { + if len(b) >= 4 { snatched = binary.BigEndian.Uint32(b) } return @@ -534,12 +547,12 @@ const ( func (m *mdb) gc(cutoff time.Time) { toDel := make([][]byte, 0, 50) - cutoffNano := cutoff.Unix() + cutoffUnix := cutoff.Unix() err := m.scanPeers(context.Background(), nil, false, func(k, v []byte) bool { if l := len(k); (l == v1IHKeyLen || l == v2IHKeyPen) && (k[0] == seederPrefix || k[0] == leecherPrefix) && (k[1] == ipv4Prefix || k[1] == ipv6Prefix) && - k[2] == keySeparator && len(v) >= 8 && cutoffNano >= int64(binary.BigEndian.Uint64(v)) { + k[2] == keySeparator && len(v) >= 8 && cutoffUnix >= int64(binary.BigEndian.Uint64(v)) { toDel = append(toDel, k) } return true @@ -547,14 +560,7 @@ func (m *mdb) gc(cutoff time.Time) { if err == nil { err = m.Update(func(txn *lmdb.Txn) (err error) { for _, k := range toDel { - err = txn.Del(m.peersDB, k, nil) - if err == nil { - k[1] = countPrefix - err = m.incr(txn, k[:len(k)-packedPeerLen], -1) - } else if lmdb.IsNotFound(err) { - err = nil - } - if err != nil { + if err = txn.Del(m.peersDB, k, nil); err != nil { break } } diff --git a/storage/mdb/storage_disabled_test.go b/storage/mdb/storage_disabled_test.go deleted file mode 100644 index 7346ef5..0000000 --- a/storage/mdb/storage_disabled_test.go +++ /dev/null @@ -1,3 +0,0 @@ -//go:build !cgo - -package mdb