diff --git a/storage/keydb/storage.go b/storage/keydb/storage.go index b7b51fd..d5fddd0 100644 --- a/storage/keydb/storage.go +++ b/storage/keydb/storage.go @@ -126,19 +126,19 @@ func (s *store) delPeer(ctx context.Context, infoHashKey, peerID string) error { } func (s *store) PutSeeder(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error { - return s.addPeer(ctx, r.InfoHashKey(ih.RawString(), true, peer.Addr().Is6(), ""), r.PackPeer(peer)) + return s.addPeer(ctx, r.InfoHashKey(ih.RawString(), true, peer.Addr().Is6()), r.PackPeer(peer)) } func (s *store) DeleteSeeder(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error { - return s.delPeer(ctx, r.InfoHashKey(ih.RawString(), true, peer.Addr().Is6(), ""), r.PackPeer(peer)) + return s.delPeer(ctx, r.InfoHashKey(ih.RawString(), true, peer.Addr().Is6()), r.PackPeer(peer)) } func (s *store) PutLeecher(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error { - return s.addPeer(ctx, r.InfoHashKey(ih.RawString(), false, peer.Addr().Is6(), ""), r.PackPeer(peer)) + return s.addPeer(ctx, r.InfoHashKey(ih.RawString(), false, peer.Addr().Is6()), r.PackPeer(peer)) } func (s *store) DeleteLeecher(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error { - return s.delPeer(ctx, r.InfoHashKey(ih.RawString(), false, peer.Addr().Is6(), ""), r.PackPeer(peer)) + return s.delPeer(ctx, r.InfoHashKey(ih.RawString(), false, peer.Addr().Is6()), r.PackPeer(peer)) } func (s *store) GraduateLeecher(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) (err error) { @@ -147,8 +147,8 @@ func (s *store) GraduateLeecher(ctx context.Context, ih bittorrent.InfoHash, pee Object("peer", peer). Msg("graduate leecher") infoHash, peerID := ih.RawString(), r.PackPeer(peer) - ihSeederKey := r.InfoHashKey(infoHash, true, peer.Addr().Is6(), "") - ihLeecherKey := r.InfoHashKey(infoHash, false, peer.Addr().Is6(), "") + ihSeederKey := r.InfoHashKey(infoHash, true, peer.Addr().Is6()) + ihLeecherKey := r.InfoHashKey(infoHash, false, peer.Addr().Is6()) var moved bool if moved, err = s.SMove(ctx, ihLeecherKey, ihSeederKey, peerID).Result(); err == nil { if !moved { diff --git a/storage/mdb/storage.go b/storage/mdb/storage.go index 2cb3bed..43563b7 100644 --- a/storage/mdb/storage.go +++ b/storage/mdb/storage.go @@ -1,15 +1,18 @@ -//go:build lmdb && cgo +//go:build cgo package mdb import ( "context" + "encoding/binary" "errors" "github.com/bmatsuo/lmdb-go/lmdb" "github.com/sot-tech/mochi/bittorrent" "github.com/sot-tech/mochi/pkg/conf" "github.com/sot-tech/mochi/pkg/log" + "github.com/sot-tech/mochi/pkg/timecache" "github.com/sot-tech/mochi/storage" + "net/netip" "os" ) @@ -120,10 +123,9 @@ func newStorage(cfg config) (*mdb, error) { return } if len(cfg.PeersDBName) > 0 { - peersDB, err = txn.OpenDBI(cfg.PeersDBName, lmdb.Create|lmdb.DupSort|lmdb.DupFixed) - + peersDB, err = txn.CreateDBI(cfg.PeersDBName) } else { - peersDB, err = txn.OpenRoot(lmdb.DupSort | lmdb.DupFixed) + peersDB, err = txn.OpenRoot(0) } return }); err != nil { @@ -147,6 +149,20 @@ func (m *mdb) Close() (err error) { const keySeparator = '_' +func ignoreNotFound(err error) error { + if lmdb.IsNotFound(err) { + err = nil + } + return err +} + +func ignoreNotFoundData(data []byte, err error) ([]byte, error) { + if lmdb.IsNotFound(err) { + err = nil + } + return data, err +} + func composeKey(ctx, key string) []byte { ctxLen := len(ctx) res := make([]byte, ctxLen+len(key)+1) @@ -159,8 +175,12 @@ func composeKey(ctx, key string) []byte { func (m *mdb) Put(_ context.Context, storeCtx string, values ...storage.Entry) (err error) { if len(values) > 0 { err = m.Update(func(txn *lmdb.Txn) (err error) { + var data []byte for _, kv := range values { - if err = txn.Put(m.dataDB, composeKey(storeCtx, kv.Key), kv.Value, 0); err != nil { + vl := len(kv.Value) + if data, err = txn.PutReserve(m.dataDB, composeKey(storeCtx, kv.Key), vl, 0); err == nil { + copy(data, kv.Value) + } else { break } } @@ -183,16 +203,9 @@ func (m *mdb) Contains(_ context.Context, storeCtx string, key string) (contains return } -func ignoreNotFound(data []byte, err error) ([]byte, error) { - if err != nil && lmdb.IsNotFound(err) { - err = nil - } - return data, err -} - func (m *mdb) Load(_ context.Context, storeCtx string, key string) (v []byte, err error) { err = m.View(func(txn *lmdb.Txn) (err error) { - v, err = ignoreNotFound(txn.Get(m.dataDB, composeKey(storeCtx, key))) + v, err = ignoreNotFoundData(txn.Get(m.dataDB, composeKey(storeCtx, key))) return }) return @@ -202,7 +215,7 @@ func (m *mdb) Delete(_ context.Context, storeCtx string, keys ...string) (err er if len(keys) > 0 { err = m.Update(func(txn *lmdb.Txn) (err error) { for _, k := range keys { - if err = txn.Del(m.dataDB, composeKey(storeCtx, k), nil); err != nil { + if err = ignoreNotFound(txn.Del(m.dataDB, composeKey(storeCtx, k), nil)); err != nil { break } } @@ -212,9 +225,53 @@ func (m *mdb) Delete(_ context.Context, storeCtx string, keys ...string) (err er return } +const ( + ipLen = 16 + packedPeerLen = bittorrent.PeerIDLen + ipLen + 2 + seederPrefix = 'S' + leecherPrefix = 'L' +) + +func packPeer(peer bittorrent.Peer, out []byte) { + _ = out[packedPeerLen-1] + copy(out, peer.ID.Bytes()) + a := peer.Addr().As16() + copy(out[bittorrent.PeerIDLen:], a[:]) + binary.BigEndian.PutUint16(out[bittorrent.PeerIDLen+ipLen:], peer.Port()) + return +} + +func unpackPeer(arr []byte) (peer bittorrent.Peer) { + _ = arr[packedPeerLen-1] + peerID, _ := bittorrent.NewPeerID(arr[:bittorrent.PeerIDLen]) + peer = bittorrent.Peer{ + ID: peerID, + AddrPort: netip.AddrPortFrom(netip.AddrFrom16([ipLen]byte(arr[bittorrent.PeerIDLen:])).Unmap(), + binary.BigEndian.Uint16(arr[bittorrent.PeerIDLen+ipLen:])), + } + return +} + +func composeIHKey(ih bittorrent.InfoHash, peer bittorrent.Peer, seeder bool) (ihKey []byte) { + ihLen := len(ih) + ihKey = make([]byte, ihLen+3+packedPeerLen) + if seeder { + ihKey[0] = seederPrefix + } else { + ihKey[0] = leecherPrefix + } + ihKey[1], ihKey[ihLen+2] = keySeparator, keySeparator + copy(ihKey[2:], ih) + packPeer(peer, ihKey[ihLen+3:]) + return +} + func (m *mdb) PutSeeder(_ context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error { - //TODO implement me - panic("implement me") + return m.Update(func(txn *lmdb.Txn) error { + return txn.Put(m.peersDB, composeIHKey(ih, peer, true), + binary.BigEndian.AppendUint64(nil, uint64(timecache.NowUnixNano())), + 0) + }) } func (m *mdb) DeleteSeeder(_ context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error { @@ -223,8 +280,11 @@ func (m *mdb) DeleteSeeder(_ context.Context, ih bittorrent.InfoHash, peer bitto } func (m *mdb) PutLeecher(_ context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error { - //TODO implement me - panic("implement me") + return m.Update(func(txn *lmdb.Txn) error { + return txn.Put(m.peersDB, composeIHKey(ih, peer, false), + binary.BigEndian.AppendUint64(nil, uint64(timecache.NowUnixNano())), + 0) + }) } func (m *mdb) DeleteLeecher(_ context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error { diff --git a/storage/mdb/storage_disabled.go b/storage/mdb/storage_disabled.go index 8e3863a..7346ef5 100644 --- a/storage/mdb/storage_disabled.go +++ b/storage/mdb/storage_disabled.go @@ -1,3 +1,3 @@ -//go:build !lmdb || !cgo +//go:build !cgo package mdb diff --git a/storage/redis/storage.go b/storage/redis/storage.go index c6a6518..99fb1b5 100644 --- a/storage/redis/storage.go +++ b/storage/redis/storage.go @@ -343,7 +343,7 @@ func NoResultErr(err error) error { } // InfoHashKey generates redis key for provided hash and flags -func InfoHashKey(infoHash string, seeder, v6 bool, suffix string) (infoHashKey string) { +func InfoHashKey(infoHash string, seeder, v6 bool) (infoHashKey string) { var bm int if seeder { bm = 0b01 @@ -361,7 +361,7 @@ func InfoHashKey(infoHash string, seeder, v6 bool, suffix string) (infoHashKey s case 0b00: infoHashKey = IH4LeecherKey } - infoHashKey += infoHash + suffix + infoHashKey += infoHash return } @@ -411,19 +411,19 @@ func PackPeer(p bittorrent.Peer) string { } func (ps *store) PutSeeder(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error { - return ps.putPeer(ctx, InfoHashKey(ih.RawString(), true, peer.Addr().Is6(), ""), CountSeederKey, PackPeer(peer)) + return ps.putPeer(ctx, InfoHashKey(ih.RawString(), true, peer.Addr().Is6()), CountSeederKey, PackPeer(peer)) } func (ps *store) DeleteSeeder(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error { - return ps.delPeer(ctx, InfoHashKey(ih.RawString(), true, peer.Addr().Is6(), ""), CountSeederKey, PackPeer(peer)) + return ps.delPeer(ctx, InfoHashKey(ih.RawString(), true, peer.Addr().Is6()), CountSeederKey, PackPeer(peer)) } func (ps *store) PutLeecher(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error { - return ps.putPeer(ctx, InfoHashKey(ih.RawString(), false, peer.Addr().Is6(), ""), CountLeecherKey, PackPeer(peer)) + return ps.putPeer(ctx, InfoHashKey(ih.RawString(), false, peer.Addr().Is6()), CountLeecherKey, PackPeer(peer)) } func (ps *store) DeleteLeecher(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error { - return ps.delPeer(ctx, InfoHashKey(ih.RawString(), false, peer.Addr().Is6(), ""), CountLeecherKey, PackPeer(peer)) + return ps.delPeer(ctx, InfoHashKey(ih.RawString(), false, peer.Addr().Is6()), CountLeecherKey, PackPeer(peer)) } func (ps *store) GraduateLeecher(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error { @@ -433,7 +433,7 @@ func (ps *store) GraduateLeecher(ctx context.Context, ih bittorrent.InfoHash, pe Msg("graduate leecher") infoHash, peerID, isV6 := ih.RawString(), PackPeer(peer), peer.Addr().Is6() - ihSeederKey, ihLeecherKey := InfoHashKey(infoHash, true, isV6, ""), InfoHashKey(infoHash, false, isV6, "") + ihSeederKey, ihLeecherKey := InfoHashKey(infoHash, true, isV6), InfoHashKey(infoHash, false, isV6) return ps.tx(ctx, func(tx redis.Pipeliner) error { deleted, err := tx.HDel(ctx, ihLeecherKey, peerID).Uint64() @@ -462,31 +462,29 @@ func (ps *store) GraduateLeecher(ctx context.Context, ih bittorrent.InfoHash, pe // peerMinimumLen is the least allowed length of string serialized Peer const peerMinimumLen = bittorrent.PeerIDLen + 2 + net.IPv4len -var errInvalidPeerDataSize = fmt.Errorf("invalid peer data (must be at least %d bytes (InfoHash + Port + IPv4))", peerMinimumLen) +var errInvalidPeerDataSize = fmt.Errorf("invalid peer data (must be at least %d bytes (PeerID + Port + IPv4))", peerMinimumLen) // UnpackPeer constructs Peer from serialized by Peer.PackPeer data: PeerID[20by]Port[2by]net.IP[4/16by] -func UnpackPeer(data string) (bittorrent.Peer, error) { - var peer bittorrent.Peer +func UnpackPeer(data string) (peer bittorrent.Peer, err error) { if len(data) < peerMinimumLen { - return peer, errInvalidPeerDataSize + err = errInvalidPeerDataSize + return } b := str2bytes.StringToBytes(data) - peerID, err := bittorrent.NewPeerID(b[:bittorrent.PeerIDLen]) - if err == nil { - if addr, isOk := netip.AddrFromSlice(b[bittorrent.PeerIDLen+2:]); isOk { - peer = bittorrent.Peer{ - ID: peerID, - AddrPort: netip.AddrPortFrom( - addr.Unmap(), - binary.BigEndian.Uint16(b[bittorrent.PeerIDLen:bittorrent.PeerIDLen+2]), - ), - } - } else { - err = bittorrent.ErrInvalidIP + peerID, _ := bittorrent.NewPeerID(b[:bittorrent.PeerIDLen]) + if addr, isOk := netip.AddrFromSlice(b[bittorrent.PeerIDLen+2:]); isOk { + peer = bittorrent.Peer{ + ID: peerID, + AddrPort: netip.AddrPortFrom( + addr.Unmap(), + binary.BigEndian.Uint16(b[bittorrent.PeerIDLen:bittorrent.PeerIDLen+2]), + ), } + } else { + err = bittorrent.ErrInvalidIP } - return peer, err + return } func (ps *Connection) parsePeersList(peersResult *redis.StringSliceCmd) (peers []bittorrent.Peer, err error) { @@ -516,10 +514,10 @@ func (ps *Connection) GetPeers(ctx context.Context, ih bittorrent.InfoHash, forS infoHashKeys := make([]string, 1, 2) if forSeeder { - infoHashKeys[0] = InfoHashKey(infoHash, false, isV6, "") + infoHashKeys[0] = InfoHashKey(infoHash, false, isV6) } else { - infoHashKeys[0] = InfoHashKey(infoHash, true, isV6, "") - infoHashKeys = append(infoHashKeys, InfoHashKey(infoHash, false, isV6, "")) + infoHashKeys[0] = InfoHashKey(infoHash, true, isV6) + infoHashKeys = append(infoHashKeys, InfoHashKey(infoHash, false, isV6)) } for _, infoHashKey := range infoHashKeys { @@ -564,19 +562,19 @@ func (ps *Connection) ScrapeIH(ctx context.Context, ih bittorrent.InfoHash, coun infoHash := ih.RawString() var lc4, lc6, sc4, sc6, dc int64 - lc4, err = countFn(ctx, InfoHashKey(infoHash, false, false, "")).Result() + lc4, err = countFn(ctx, InfoHashKey(infoHash, false, false)).Result() if err = NoResultErr(err); err != nil { return } - lc6, err = countFn(ctx, InfoHashKey(infoHash, false, true, "")).Result() + lc6, err = countFn(ctx, InfoHashKey(infoHash, false, true)).Result() if err = NoResultErr(err); err != nil { return } - sc4, err = countFn(ctx, InfoHashKey(infoHash, true, false, "")).Result() + sc4, err = countFn(ctx, InfoHashKey(infoHash, true, false)).Result() if err = NoResultErr(err); err != nil { return } - sc6, err = countFn(ctx, InfoHashKey(infoHash, true, true, "")).Result() + sc6, err = countFn(ctx, InfoHashKey(infoHash, true, true)).Result() if err = NoResultErr(err); err != nil { return }