(WIP) implementing lmdb peer storage

This commit is contained in:
Lawrence, Rendall
2024-06-04 18:21:53 +03:00
parent b216e15f37
commit 2adbb5f8c7
4 changed files with 114 additions and 56 deletions
+6 -6
View File
@@ -126,19 +126,19 @@ func (s *store) delPeer(ctx context.Context, infoHashKey, peerID string) error {
}
func (s *store) PutSeeder(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error {
return s.addPeer(ctx, r.InfoHashKey(ih.RawString(), true, peer.Addr().Is6(), ""), r.PackPeer(peer))
return s.addPeer(ctx, r.InfoHashKey(ih.RawString(), true, peer.Addr().Is6()), r.PackPeer(peer))
}
func (s *store) DeleteSeeder(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error {
return s.delPeer(ctx, r.InfoHashKey(ih.RawString(), true, peer.Addr().Is6(), ""), r.PackPeer(peer))
return s.delPeer(ctx, r.InfoHashKey(ih.RawString(), true, peer.Addr().Is6()), r.PackPeer(peer))
}
func (s *store) PutLeecher(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error {
return s.addPeer(ctx, r.InfoHashKey(ih.RawString(), false, peer.Addr().Is6(), ""), r.PackPeer(peer))
return s.addPeer(ctx, r.InfoHashKey(ih.RawString(), false, peer.Addr().Is6()), r.PackPeer(peer))
}
func (s *store) DeleteLeecher(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error {
return s.delPeer(ctx, r.InfoHashKey(ih.RawString(), false, peer.Addr().Is6(), ""), r.PackPeer(peer))
return s.delPeer(ctx, r.InfoHashKey(ih.RawString(), false, peer.Addr().Is6()), r.PackPeer(peer))
}
func (s *store) GraduateLeecher(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) (err error) {
@@ -147,8 +147,8 @@ func (s *store) GraduateLeecher(ctx context.Context, ih bittorrent.InfoHash, pee
Object("peer", peer).
Msg("graduate leecher")
infoHash, peerID := ih.RawString(), r.PackPeer(peer)
ihSeederKey := r.InfoHashKey(infoHash, true, peer.Addr().Is6(), "")
ihLeecherKey := r.InfoHashKey(infoHash, false, peer.Addr().Is6(), "")
ihSeederKey := r.InfoHashKey(infoHash, true, peer.Addr().Is6())
ihLeecherKey := r.InfoHashKey(infoHash, false, peer.Addr().Is6())
var moved bool
if moved, err = s.SMove(ctx, ihLeecherKey, ihSeederKey, peerID).Result(); err == nil {
if !moved {
+78 -18
View File
@@ -1,15 +1,18 @@
//go:build lmdb && cgo
//go:build cgo
package mdb
import (
"context"
"encoding/binary"
"errors"
"github.com/bmatsuo/lmdb-go/lmdb"
"github.com/sot-tech/mochi/bittorrent"
"github.com/sot-tech/mochi/pkg/conf"
"github.com/sot-tech/mochi/pkg/log"
"github.com/sot-tech/mochi/pkg/timecache"
"github.com/sot-tech/mochi/storage"
"net/netip"
"os"
)
@@ -120,10 +123,9 @@ func newStorage(cfg config) (*mdb, error) {
return
}
if len(cfg.PeersDBName) > 0 {
peersDB, err = txn.OpenDBI(cfg.PeersDBName, lmdb.Create|lmdb.DupSort|lmdb.DupFixed)
peersDB, err = txn.CreateDBI(cfg.PeersDBName)
} else {
peersDB, err = txn.OpenRoot(lmdb.DupSort | lmdb.DupFixed)
peersDB, err = txn.OpenRoot(0)
}
return
}); err != nil {
@@ -147,6 +149,20 @@ func (m *mdb) Close() (err error) {
const keySeparator = '_'
func ignoreNotFound(err error) error {
if lmdb.IsNotFound(err) {
err = nil
}
return err
}
func ignoreNotFoundData(data []byte, err error) ([]byte, error) {
if lmdb.IsNotFound(err) {
err = nil
}
return data, err
}
func composeKey(ctx, key string) []byte {
ctxLen := len(ctx)
res := make([]byte, ctxLen+len(key)+1)
@@ -159,8 +175,12 @@ func composeKey(ctx, key string) []byte {
func (m *mdb) Put(_ context.Context, storeCtx string, values ...storage.Entry) (err error) {
if len(values) > 0 {
err = m.Update(func(txn *lmdb.Txn) (err error) {
var data []byte
for _, kv := range values {
if err = txn.Put(m.dataDB, composeKey(storeCtx, kv.Key), kv.Value, 0); err != nil {
vl := len(kv.Value)
if data, err = txn.PutReserve(m.dataDB, composeKey(storeCtx, kv.Key), vl, 0); err == nil {
copy(data, kv.Value)
} else {
break
}
}
@@ -183,16 +203,9 @@ func (m *mdb) Contains(_ context.Context, storeCtx string, key string) (contains
return
}
func ignoreNotFound(data []byte, err error) ([]byte, error) {
if err != nil && lmdb.IsNotFound(err) {
err = nil
}
return data, err
}
func (m *mdb) Load(_ context.Context, storeCtx string, key string) (v []byte, err error) {
err = m.View(func(txn *lmdb.Txn) (err error) {
v, err = ignoreNotFound(txn.Get(m.dataDB, composeKey(storeCtx, key)))
v, err = ignoreNotFoundData(txn.Get(m.dataDB, composeKey(storeCtx, key)))
return
})
return
@@ -202,7 +215,7 @@ func (m *mdb) Delete(_ context.Context, storeCtx string, keys ...string) (err er
if len(keys) > 0 {
err = m.Update(func(txn *lmdb.Txn) (err error) {
for _, k := range keys {
if err = txn.Del(m.dataDB, composeKey(storeCtx, k), nil); err != nil {
if err = ignoreNotFound(txn.Del(m.dataDB, composeKey(storeCtx, k), nil)); err != nil {
break
}
}
@@ -212,9 +225,53 @@ func (m *mdb) Delete(_ context.Context, storeCtx string, keys ...string) (err er
return
}
const (
ipLen = 16
packedPeerLen = bittorrent.PeerIDLen + ipLen + 2
seederPrefix = 'S'
leecherPrefix = 'L'
)
func packPeer(peer bittorrent.Peer, out []byte) {
_ = out[packedPeerLen-1]
copy(out, peer.ID.Bytes())
a := peer.Addr().As16()
copy(out[bittorrent.PeerIDLen:], a[:])
binary.BigEndian.PutUint16(out[bittorrent.PeerIDLen+ipLen:], peer.Port())
return
}
func unpackPeer(arr []byte) (peer bittorrent.Peer) {
_ = arr[packedPeerLen-1]
peerID, _ := bittorrent.NewPeerID(arr[:bittorrent.PeerIDLen])
peer = bittorrent.Peer{
ID: peerID,
AddrPort: netip.AddrPortFrom(netip.AddrFrom16([ipLen]byte(arr[bittorrent.PeerIDLen:])).Unmap(),
binary.BigEndian.Uint16(arr[bittorrent.PeerIDLen+ipLen:])),
}
return
}
func composeIHKey(ih bittorrent.InfoHash, peer bittorrent.Peer, seeder bool) (ihKey []byte) {
ihLen := len(ih)
ihKey = make([]byte, ihLen+3+packedPeerLen)
if seeder {
ihKey[0] = seederPrefix
} else {
ihKey[0] = leecherPrefix
}
ihKey[1], ihKey[ihLen+2] = keySeparator, keySeparator
copy(ihKey[2:], ih)
packPeer(peer, ihKey[ihLen+3:])
return
}
func (m *mdb) PutSeeder(_ context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error {
//TODO implement me
panic("implement me")
return m.Update(func(txn *lmdb.Txn) error {
return txn.Put(m.peersDB, composeIHKey(ih, peer, true),
binary.BigEndian.AppendUint64(nil, uint64(timecache.NowUnixNano())),
0)
})
}
func (m *mdb) DeleteSeeder(_ context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error {
@@ -223,8 +280,11 @@ func (m *mdb) DeleteSeeder(_ context.Context, ih bittorrent.InfoHash, peer bitto
}
func (m *mdb) PutLeecher(_ context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error {
//TODO implement me
panic("implement me")
return m.Update(func(txn *lmdb.Txn) error {
return txn.Put(m.peersDB, composeIHKey(ih, peer, false),
binary.BigEndian.AppendUint64(nil, uint64(timecache.NowUnixNano())),
0)
})
}
func (m *mdb) DeleteLeecher(_ context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error {
+1 -1
View File
@@ -1,3 +1,3 @@
//go:build !lmdb || !cgo
//go:build !cgo
package mdb
+29 -31
View File
@@ -343,7 +343,7 @@ func NoResultErr(err error) error {
}
// InfoHashKey generates redis key for provided hash and flags
func InfoHashKey(infoHash string, seeder, v6 bool, suffix string) (infoHashKey string) {
func InfoHashKey(infoHash string, seeder, v6 bool) (infoHashKey string) {
var bm int
if seeder {
bm = 0b01
@@ -361,7 +361,7 @@ func InfoHashKey(infoHash string, seeder, v6 bool, suffix string) (infoHashKey s
case 0b00:
infoHashKey = IH4LeecherKey
}
infoHashKey += infoHash + suffix
infoHashKey += infoHash
return
}
@@ -411,19 +411,19 @@ func PackPeer(p bittorrent.Peer) string {
}
func (ps *store) PutSeeder(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error {
return ps.putPeer(ctx, InfoHashKey(ih.RawString(), true, peer.Addr().Is6(), ""), CountSeederKey, PackPeer(peer))
return ps.putPeer(ctx, InfoHashKey(ih.RawString(), true, peer.Addr().Is6()), CountSeederKey, PackPeer(peer))
}
func (ps *store) DeleteSeeder(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error {
return ps.delPeer(ctx, InfoHashKey(ih.RawString(), true, peer.Addr().Is6(), ""), CountSeederKey, PackPeer(peer))
return ps.delPeer(ctx, InfoHashKey(ih.RawString(), true, peer.Addr().Is6()), CountSeederKey, PackPeer(peer))
}
func (ps *store) PutLeecher(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error {
return ps.putPeer(ctx, InfoHashKey(ih.RawString(), false, peer.Addr().Is6(), ""), CountLeecherKey, PackPeer(peer))
return ps.putPeer(ctx, InfoHashKey(ih.RawString(), false, peer.Addr().Is6()), CountLeecherKey, PackPeer(peer))
}
func (ps *store) DeleteLeecher(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error {
return ps.delPeer(ctx, InfoHashKey(ih.RawString(), false, peer.Addr().Is6(), ""), CountLeecherKey, PackPeer(peer))
return ps.delPeer(ctx, InfoHashKey(ih.RawString(), false, peer.Addr().Is6()), CountLeecherKey, PackPeer(peer))
}
func (ps *store) GraduateLeecher(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error {
@@ -433,7 +433,7 @@ func (ps *store) GraduateLeecher(ctx context.Context, ih bittorrent.InfoHash, pe
Msg("graduate leecher")
infoHash, peerID, isV6 := ih.RawString(), PackPeer(peer), peer.Addr().Is6()
ihSeederKey, ihLeecherKey := InfoHashKey(infoHash, true, isV6, ""), InfoHashKey(infoHash, false, isV6, "")
ihSeederKey, ihLeecherKey := InfoHashKey(infoHash, true, isV6), InfoHashKey(infoHash, false, isV6)
return ps.tx(ctx, func(tx redis.Pipeliner) error {
deleted, err := tx.HDel(ctx, ihLeecherKey, peerID).Uint64()
@@ -462,31 +462,29 @@ func (ps *store) GraduateLeecher(ctx context.Context, ih bittorrent.InfoHash, pe
// peerMinimumLen is the least allowed length of string serialized Peer
const peerMinimumLen = bittorrent.PeerIDLen + 2 + net.IPv4len
var errInvalidPeerDataSize = fmt.Errorf("invalid peer data (must be at least %d bytes (InfoHash + Port + IPv4))", peerMinimumLen)
var errInvalidPeerDataSize = fmt.Errorf("invalid peer data (must be at least %d bytes (PeerID + Port + IPv4))", peerMinimumLen)
// UnpackPeer constructs Peer from serialized by Peer.PackPeer data: PeerID[20by]Port[2by]net.IP[4/16by]
func UnpackPeer(data string) (bittorrent.Peer, error) {
var peer bittorrent.Peer
func UnpackPeer(data string) (peer bittorrent.Peer, err error) {
if len(data) < peerMinimumLen {
return peer, errInvalidPeerDataSize
err = errInvalidPeerDataSize
return
}
b := str2bytes.StringToBytes(data)
peerID, err := bittorrent.NewPeerID(b[:bittorrent.PeerIDLen])
if err == nil {
if addr, isOk := netip.AddrFromSlice(b[bittorrent.PeerIDLen+2:]); isOk {
peer = bittorrent.Peer{
ID: peerID,
AddrPort: netip.AddrPortFrom(
addr.Unmap(),
binary.BigEndian.Uint16(b[bittorrent.PeerIDLen:bittorrent.PeerIDLen+2]),
),
}
} else {
err = bittorrent.ErrInvalidIP
peerID, _ := bittorrent.NewPeerID(b[:bittorrent.PeerIDLen])
if addr, isOk := netip.AddrFromSlice(b[bittorrent.PeerIDLen+2:]); isOk {
peer = bittorrent.Peer{
ID: peerID,
AddrPort: netip.AddrPortFrom(
addr.Unmap(),
binary.BigEndian.Uint16(b[bittorrent.PeerIDLen:bittorrent.PeerIDLen+2]),
),
}
} else {
err = bittorrent.ErrInvalidIP
}
return peer, err
return
}
func (ps *Connection) parsePeersList(peersResult *redis.StringSliceCmd) (peers []bittorrent.Peer, err error) {
@@ -516,10 +514,10 @@ func (ps *Connection) GetPeers(ctx context.Context, ih bittorrent.InfoHash, forS
infoHashKeys := make([]string, 1, 2)
if forSeeder {
infoHashKeys[0] = InfoHashKey(infoHash, false, isV6, "")
infoHashKeys[0] = InfoHashKey(infoHash, false, isV6)
} else {
infoHashKeys[0] = InfoHashKey(infoHash, true, isV6, "")
infoHashKeys = append(infoHashKeys, InfoHashKey(infoHash, false, isV6, ""))
infoHashKeys[0] = InfoHashKey(infoHash, true, isV6)
infoHashKeys = append(infoHashKeys, InfoHashKey(infoHash, false, isV6))
}
for _, infoHashKey := range infoHashKeys {
@@ -564,19 +562,19 @@ func (ps *Connection) ScrapeIH(ctx context.Context, ih bittorrent.InfoHash, coun
infoHash := ih.RawString()
var lc4, lc6, sc4, sc6, dc int64
lc4, err = countFn(ctx, InfoHashKey(infoHash, false, false, "")).Result()
lc4, err = countFn(ctx, InfoHashKey(infoHash, false, false)).Result()
if err = NoResultErr(err); err != nil {
return
}
lc6, err = countFn(ctx, InfoHashKey(infoHash, false, true, "")).Result()
lc6, err = countFn(ctx, InfoHashKey(infoHash, false, true)).Result()
if err = NoResultErr(err); err != nil {
return
}
sc4, err = countFn(ctx, InfoHashKey(infoHash, true, false, "")).Result()
sc4, err = countFn(ctx, InfoHashKey(infoHash, true, false)).Result()
if err = NoResultErr(err); err != nil {
return
}
sc6, err = countFn(ctx, InfoHashKey(infoHash, true, true, "")).Result()
sc6, err = countFn(ctx, InfoHashKey(infoHash, true, true)).Result()
if err = NoResultErr(err); err != nil {
return
}