remove separate key for L/S count

This commit is contained in:
Lawrence, Rendall
2024-06-27 20:27:20 +03:00
parent 32193ee329
commit f010cabdb1
4 changed files with 74 additions and 70 deletions

View File

@@ -35,8 +35,7 @@ Fields:
* `<PEERID>` - 20 bytes of peer ID * `<PEERID>` - 20 bytes of peer ID
* `<IPADDRESS>` - 16 bytes of BE-encoded IP address (real IPv6 or IPv4-mapped IPv6 address) * `<IPADDRESS>` - 16 bytes of BE-encoded IP address (real IPv6 or IPv4-mapped IPv6 address)
* `<PORT>` - 2 bytes of BE-encoded port * `<PORT>` - 2 bytes of BE-encoded port
2. Key `<PREFIX>_<INFOHASH>_`, value - BE-encoded unsigned 32-bit integer. 2. Key `DC_<INFOHASH>_` - downloaded count of specified `<INFOHASH>` (20 or 32 bytes), value - BE-encoded unsigned 32-bit integer.
`<PREFIX>` is `LC`, `SC`, `DC` string for leechers, seeders or downloaded count of specified `<INFOHASH>` (20 or 32 bytes).
Write speed may be increased with `no_sync_meta` and `async_write` configuration options, Write speed may be increased with `no_sync_meta` and `async_write` configuration options,
but the risk of DB corruption is also increase. but the risk of DB corruption is also increase.

View File

@@ -102,8 +102,10 @@ func (h *hook) Close() (err error) {
if cl, isOk := h.hashContainer.(io.Closer); isOk { if cl, isOk := h.hashContainer.(io.Closer); isOk {
err = cl.Close() err = cl.Close()
} }
if stErr := h.providedStorage.Close(); stErr != nil { if h.providedStorage != nil {
err = errors.Join(err, stErr) if stErr := h.providedStorage.Close(); stErr != nil {
err = errors.Join(err, stErr)
}
} }
return err return err
} }

View File

@@ -327,7 +327,7 @@ func unpackPeer(arr []byte) (peer bittorrent.Peer) {
return return
} }
func composeIHKeyPrefix(ih bittorrent.InfoHash, seeder bool, v6 bool, suffixLen int) (ihKey []byte, suffixStart int) { func composeIHKeyPrefix(ih []byte, seeder bool, v6 bool, suffixLen int) (ihKey []byte, suffixStart int) {
ihLen := len(ih) ihLen := len(ih)
ihKey = make([]byte, ihLen+4+suffixLen) // prefix{L/S} + prefix{4/6} + separator + infoHash + separator ihKey = make([]byte, ihLen+4+suffixLen) // prefix{L/S} + prefix{4/6} + separator + infoHash + separator
if seeder { if seeder {
@@ -347,37 +347,17 @@ func composeIHKeyPrefix(ih bittorrent.InfoHash, seeder bool, v6 bool, suffixLen
} }
func composeIHKey(ih bittorrent.InfoHash, peer bittorrent.Peer, seeder bool) (ihKey []byte) { func composeIHKey(ih bittorrent.InfoHash, peer bittorrent.Peer, seeder bool) (ihKey []byte) {
ihKey, start := composeIHKeyPrefix(ih, seeder, peer.Addr().Is6(), packedPeerLen) ihKey, start := composeIHKeyPrefix(ih.Bytes(), seeder, peer.Addr().Is6(), packedPeerLen)
packPeer(peer, ihKey[start:]) packPeer(peer, ihKey[start:])
return return
} }
func (m *mdb) incr(txn *lmdb.Txn, key []byte, inc int) (err error) {
var v int
var b []byte
if b, err = ignoreNotFoundData(txn.Get(m.peersDB, key)); err != nil {
return
}
if len(b) >= 4 {
v = int(binary.BigEndian.Uint32(b))
}
v += inc
if v <= 0 {
err = ignoreNotFound(txn.Del(m.peersDB, key, nil))
} else if b, err = txn.PutReserve(m.peersDB, key, 4, 0); err == nil {
binary.BigEndian.PutUint32(b, uint32(v))
}
return
}
func (m *mdb) putPeer(ih bittorrent.InfoHash, peer bittorrent.Peer, seeder bool) error { func (m *mdb) putPeer(ih bittorrent.InfoHash, peer bittorrent.Peer, seeder bool) error {
ihKey := composeIHKey(ih, peer, seeder) ihKey := composeIHKey(ih, peer, seeder)
return m.Update(func(txn *lmdb.Txn) (err error) { return m.Update(func(txn *lmdb.Txn) (err error) {
var b []byte var b []byte
if b, err = txn.PutReserve(m.peersDB, ihKey, 8, 0); err == nil { if b, err = txn.PutReserve(m.peersDB, ihKey, 8, 0); err == nil {
binary.BigEndian.PutUint64(b, uint64(timecache.NowUnix())) binary.BigEndian.PutUint64(b, uint64(timecache.NowUnix()))
ihKey[1] = countPrefix
err = m.incr(txn, ihKey[:len(ihKey)-packedPeerLen], 1)
} }
return return
}) })
@@ -385,12 +365,8 @@ func (m *mdb) putPeer(ih bittorrent.InfoHash, peer bittorrent.Peer, seeder bool)
func (m *mdb) delPeer(ih bittorrent.InfoHash, peer bittorrent.Peer, seeder bool) error { func (m *mdb) delPeer(ih bittorrent.InfoHash, peer bittorrent.Peer, seeder bool) error {
ihKey := composeIHKey(ih, peer, seeder) ihKey := composeIHKey(ih, peer, seeder)
return m.Update(func(txn *lmdb.Txn) (err error) { return m.Update(func(txn *lmdb.Txn) error {
if err = ignoreNotFound(txn.Del(m.peersDB, ihKey, nil)); err == nil { return ignoreNotFound(txn.Del(m.peersDB, ihKey, nil))
ihKey[1] = countPrefix
err = m.incr(txn, ihKey[:len(ihKey)-packedPeerLen], -1)
}
return
}) })
} }
@@ -422,17 +398,20 @@ func (m *mdb) GraduateLeecher(_ context.Context, ih bittorrent.InfoHash, peer bi
return return
} }
binary.BigEndian.PutUint64(b, uint64(timecache.NowUnixNano())) binary.BigEndian.PutUint64(b, uint64(timecache.NowUnixNano()))
ihPrefix := ihKey[:len(ihKey)-packedPeerLen] ihPrefix := ihKey[:len(ihKey)-packedPeerLen]
ihPrefix[1] = countPrefix ihPrefix[0], ihPrefix[1] = downloadedPrefix, countPrefix
if err = m.incr(txn, ihPrefix, 1); err != nil { var v int
if b, err = ignoreNotFoundData(txn.Get(m.peersDB, ihPrefix)); err != nil {
return return
} }
ihPrefix[0] = leecherPrefix if len(b) >= 4 {
if err = m.incr(txn, ihPrefix, -1); err != nil { v = int(binary.BigEndian.Uint32(b))
return }
v++
if b, err = txn.PutReserve(m.peersDB, ihPrefix, 4, 0); err == nil {
binary.BigEndian.PutUint32(b, uint32(v))
} }
ihPrefix[0] = downloadedPrefix
err = m.incr(txn, ihPrefix, 1)
return return
}) })
} }
@@ -480,7 +459,7 @@ func (m *mdb) scanPeers(ctx context.Context, prefix []byte, readRaw bool, fn fun
func (m *mdb) AnnouncePeers(ctx context.Context, ih bittorrent.InfoHash, forSeeder bool, numWant int, v6 bool) (peers []bittorrent.Peer, err error) { func (m *mdb) AnnouncePeers(ctx context.Context, ih bittorrent.InfoHash, forSeeder bool, numWant int, v6 bool) (peers []bittorrent.Peer, err error) {
peers = make([]bittorrent.Peer, 0, numWant) peers = make([]bittorrent.Peer, 0, numWant)
prefix, prefixLen := composeIHKeyPrefix(ih, false, v6, 0) prefix, prefixLen := composeIHKeyPrefix(ih.Bytes(), false, v6, 0)
appendFn := func(k, _ []byte) bool { appendFn := func(k, _ []byte) bool {
peers = append(peers, unpackPeer(k[prefixLen:])) peers = append(peers, unpackPeer(k[prefixLen:]))
numWant-- numWant--
@@ -498,28 +477,62 @@ func (m *mdb) AnnouncePeers(ctx context.Context, ih bittorrent.InfoHash, forSeed
return return
} }
func (m *mdb) ScrapeSwarm(_ context.Context, ih bittorrent.InfoHash) (leechers uint32, seeders uint32, snatched uint32, err error) { func (m *mdb) countPeers(ctx context.Context, scanPrefix []byte) (cnt uint32, err error) {
prefix, _ := composeIHKeyPrefix(ih, false, false, 0) m.wg.Add(1)
prefix[1] = countPrefix
var b []byte
err = m.View(func(txn *lmdb.Txn) (err error) { err = m.View(func(txn *lmdb.Txn) (err error) {
if b, err = ignoreNotFoundData(txn.Get(m.peersDB, prefix)); err != nil { txn.RawRead = true
return scanner := lmdbscan.New(txn, m.peersDB)
} else if len(b) >= 4 { if scanner.SetNext(scanPrefix, nil, lmdb.SetRange, lmdb.Next) {
leechers = binary.BigEndian.Uint32(b) var prevKey []byte
loop:
for scanner.Scan() {
select {
case <-ctx.Done():
return ctx.Err()
default:
k := scanner.Key()
if len(k) == len(scanPrefix)+packedPeerLen && bytes.HasPrefix(scanPrefix, k[:len(k)-packedPeerLen]) {
if !bytes.Equal(k, prevKey) {
cnt++
prevKey = k
}
} else if scanPrefix[1] == ipv4Prefix {
scanPrefix[1] = ipv6Prefix
if !scanner.SetNext(scanPrefix, nil, lmdb.SetRange, lmdb.Next) {
break loop
}
} else {
break loop
}
}
}
} }
err = scanner.Err()
scanner.Close()
return
})
m.wg.Done()
prefix[0] = seederPrefix return
if b, err = ignoreNotFoundData(txn.Get(m.peersDB, prefix)); err != nil { }
func (m *mdb) ScrapeSwarm(ctx context.Context, ih bittorrent.InfoHash) (leechers uint32, seeders uint32, snatched uint32, err error) {
scanPrefix, _ := composeIHKeyPrefix(ih.Bytes(), false, false, 0)
if leechers, err = m.countPeers(ctx, scanPrefix); err != nil {
return
}
scanPrefix[0], scanPrefix[1] = seederPrefix, ipv4Prefix
if seeders, err = m.countPeers(ctx, scanPrefix); err != nil {
return
}
scanPrefix[0], scanPrefix[1] = downloadedPrefix, countPrefix
err = m.View(func(txn *lmdb.Txn) (err error) {
var b []byte
if b, err = ignoreNotFoundData(txn.Get(m.peersDB, scanPrefix)); err != nil {
return return
} else if len(b) >= 4 {
seeders = binary.BigEndian.Uint32(b)
} }
if len(b) >= 4 {
prefix[0] = downloadedPrefix
if b, err = ignoreNotFoundData(txn.Get(m.peersDB, prefix)); err != nil {
return
} else if len(b) >= 4 {
snatched = binary.BigEndian.Uint32(b) snatched = binary.BigEndian.Uint32(b)
} }
return return
@@ -534,12 +547,12 @@ const (
func (m *mdb) gc(cutoff time.Time) { func (m *mdb) gc(cutoff time.Time) {
toDel := make([][]byte, 0, 50) toDel := make([][]byte, 0, 50)
cutoffNano := cutoff.Unix() cutoffUnix := cutoff.Unix()
err := m.scanPeers(context.Background(), nil, false, func(k, v []byte) bool { err := m.scanPeers(context.Background(), nil, false, func(k, v []byte) bool {
if l := len(k); (l == v1IHKeyLen || l == v2IHKeyPen) && if l := len(k); (l == v1IHKeyLen || l == v2IHKeyPen) &&
(k[0] == seederPrefix || k[0] == leecherPrefix) && (k[0] == seederPrefix || k[0] == leecherPrefix) &&
(k[1] == ipv4Prefix || k[1] == ipv6Prefix) && (k[1] == ipv4Prefix || k[1] == ipv6Prefix) &&
k[2] == keySeparator && len(v) >= 8 && cutoffNano >= int64(binary.BigEndian.Uint64(v)) { k[2] == keySeparator && len(v) >= 8 && cutoffUnix >= int64(binary.BigEndian.Uint64(v)) {
toDel = append(toDel, k) toDel = append(toDel, k)
} }
return true return true
@@ -547,14 +560,7 @@ func (m *mdb) gc(cutoff time.Time) {
if err == nil { if err == nil {
err = m.Update(func(txn *lmdb.Txn) (err error) { err = m.Update(func(txn *lmdb.Txn) (err error) {
for _, k := range toDel { for _, k := range toDel {
err = txn.Del(m.peersDB, k, nil) if err = txn.Del(m.peersDB, k, nil); err != nil {
if err == nil {
k[1] = countPrefix
err = m.incr(txn, k[:len(k)-packedPeerLen], -1)
} else if lmdb.IsNotFound(err) {
err = nil
}
if err != nil {
break break
} }
} }

View File

@@ -1,3 +0,0 @@
//go:build !cgo
package mdb