From b956811e40b94222b235d97cd115b31ac6f1ca63 Mon Sep 17 00:00:00 2001 From: "Lawrence, Rendall" Date: Tue, 21 Mar 2023 18:58:27 +0300 Subject: [PATCH] (untested) refactor code --- bittorrent/peer.go | 128 +++++------------- frontend/http/frontend_test.go | 8 +- frontend/http/parser.go | 1 - frontend/udp/parser.go | 2 - go.mod | 4 +- go.sum | 4 +- middleware/jwt/jwt.go | 4 +- .../torrentapproval/container/list/list.go | 2 +- .../torrentapproval/torrentapproval_test.go | 2 +- storage/keydb/storage.go | 10 +- storage/memory/storage.go | 16 +-- storage/pg/storage.go | 35 ++--- storage/redis/storage.go | 57 +++++++- 13 files changed, 129 insertions(+), 144 deletions(-) diff --git a/bittorrent/peer.go b/bittorrent/peer.go index 3227e42..751fb21 100644 --- a/bittorrent/peer.go +++ b/bittorrent/peer.go @@ -6,12 +6,10 @@ package bittorrent import ( "crypto/sha1" "crypto/sha256" - "encoding/binary" "encoding/hex" - "errors" "fmt" - "net" "net/netip" + "unsafe" "github.com/rs/zerolog" ) @@ -33,18 +31,22 @@ func NewPeerID(b []byte) (PeerID, error) { if len(b) != PeerIDLen { return p, ErrInvalidPeerIDSize } - copy(p[:], b) - return p, nil + return PeerID(b), nil } // String implements fmt.Stringer, returning the base16 encoded PeerID. func (p PeerID) String() string { - return hex.EncodeToString(p[:]) + return hex.EncodeToString(p.Bytes()) } // RawString returns a 20-byte string of the raw bytes of the ID. func (p PeerID) RawString() string { - return string(p[:]) + return unsafe.String(&p[0], PeerIDLen) +} + +// Bytes returns slice of bytes represents this PeerID +func (p PeerID) Bytes() []byte { + return p[:] } // InfoHash represents an infohash. @@ -55,16 +57,10 @@ const ( InfoHashV1Len = sha1.Size // InfoHashV2Len ... sha256.Size InfoHashV2Len = sha256.Size - // NoneInfoHash dummy invalid InfoHash - NoneInfoHash InfoHash = "" ) -var ( - // ErrInvalidHashType holds error about invalid InfoHash input type - ErrInvalidHashType = errors.New("info hash must be provided as byte slice or raw/hex string") - // ErrInvalidHashSize holds error about invalid InfoHash size - ErrInvalidHashSize = fmt.Errorf("info hash must be either %d (for torrent V1) or %d (V2) bytes", InfoHashV1Len, InfoHashV2Len) -) +// ErrInvalidHashSize holds error about invalid InfoHash size +var ErrInvalidHashSize = fmt.Errorf("info hash must be either %d (for torrent V1) or %d (V2) bytes or same sizes x2 (if HEX encoded)", InfoHashV1Len, InfoHashV2Len) // TruncateV1 returns truncated to 20-bytes length array of the corresponding InfoHash. // If InfoHash is V2 (32 bytes), it will be truncated to 20 bytes @@ -76,48 +72,32 @@ func (i InfoHash) TruncateV1() InfoHash { return i } -// NewInfoHash creates an InfoHash from a byte slice or raw/hex string. -func NewInfoHash(data any) (InfoHash, error) { - if data == nil { - return NoneInfoHash, ErrInvalidHashType - } - var ba []byte - switch t := data.(type) { - case [InfoHashV1Len]byte: - ba = t[:] - case [InfoHashV2Len]byte: - ba = t[:] - case []byte: - l := len(t) - if l == InfoHashV1Len*2 || l == InfoHashV2Len*2 { - ba = make([]byte, l/2) - if _, err := hex.Decode(ba, t); err != nil { - return NoneInfoHash, err - } - } else { - ba = t - } - case string: - l := len(t) - if l == InfoHashV1Len*2 || l == InfoHashV2Len*2 { - var err error - if ba, err = hex.DecodeString(t); err != nil { - return NoneInfoHash, err - } - } else { - ba = []byte(t) +// NewInfoHash creates an InfoHash from raw/hex byte slice. +func NewInfoHash(data []byte) (InfoHash, error) { + var ih InfoHash + switch l := len(data); l { + case InfoHashV1Len, InfoHashV2Len: + ih = InfoHash(data) + case InfoHashV1Len * 2, InfoHashV2Len * 2: + bb := make([]byte, l/2) + if _, err := hex.Decode(bb, data); err != nil { + return "", err } + ih = InfoHash(unsafe.String(&bb[0], len(bb))) + default: + return "", ErrInvalidHashSize } - l := len(ba) - if l != InfoHashV1Len && l != InfoHashV2Len { - return NoneInfoHash, ErrInvalidHashSize - } - return InfoHash(ba), nil + return ih, nil +} + +// NewInfoHashString creates an InfoHash from raw/hex string. +func NewInfoHashString(data string) (InfoHash, error) { + return NewInfoHash(unsafe.Slice(unsafe.StringData(data), len(data))) } // String implements fmt.Stringer, returning the base16 encoded InfoHash. func (i InfoHash) String() string { - return hex.EncodeToString([]byte(i)) + return hex.EncodeToString(i.Bytes()) } // RawString returns a string of the raw bytes of the InfoHash. @@ -125,6 +105,11 @@ func (i InfoHash) RawString() string { return string(i) } +// Bytes returns slice of bytes represents this InfoHash +func (i InfoHash) Bytes() []byte { + return unsafe.Slice(unsafe.StringData(string(i)), len(i)) +} + // Peer represents the connection details of a peer that is returned in an // announce response. type Peer struct { @@ -132,47 +117,6 @@ type Peer struct { netip.AddrPort } -// PeerMinimumLen is the least allowed length of string serialized Peer -const PeerMinimumLen = PeerIDLen + 2 + net.IPv4len - -// ErrInvalidPeerDataSize holds error about invalid Peer data size -var ErrInvalidPeerDataSize = fmt.Errorf("invalid peer data it must be at least %d bytes (InfoHash + Port + IPv4)", PeerMinimumLen) - -// NewPeer constructs Peer from serialized by Peer.RawString data: PeerID[20by]Port[2by]net.IP[4/16by] -func NewPeer(data string) (Peer, error) { - var peer Peer - if len(data) < PeerMinimumLen { - return peer, ErrInvalidPeerDataSize - } - b := []byte(data) - peerID, err := NewPeerID(b[:PeerIDLen]) - if err == nil { - if addr, isOk := netip.AddrFromSlice(b[PeerIDLen+2:]); isOk { - peer = Peer{ - ID: peerID, - AddrPort: netip.AddrPortFrom( - addr.Unmap(), - binary.BigEndian.Uint16(b[PeerIDLen:PeerIDLen+2]), - ), - } - } else { - err = ErrInvalidIP - } - } - - return peer, err -} - -// RawString generates concatenation of PeerID, net port and IP-address -func (p Peer) RawString() string { - ip := p.Addr() - b := make([]byte, PeerIDLen+2+(ip.BitLen()/8)) - copy(b[:PeerIDLen], p.ID[:]) - binary.BigEndian.PutUint16(b[PeerIDLen:PeerIDLen+2], p.Port()) - copy(b[PeerIDLen+2:], ip.AsSlice()) - return string(b) -} - // Addr returns unmapped peer's IP address func (p Peer) Addr() netip.Addr { return p.AddrPort.Addr().Unmap() diff --git a/frontend/http/frontend_test.go b/frontend/http/frontend_test.go index 0bf4779..1df4904 100644 --- a/frontend/http/frontend_test.go +++ b/frontend/http/frontend_test.go @@ -17,8 +17,6 @@ import ( "github.com/sot-tech/mochi/pkg/log" ) -const iterations = 10000 - var ( addr = fmt.Sprintf("127.0.0.1:%d", rand.Int63n(10000)+16384) hashes = make([]string, 10) @@ -87,7 +85,7 @@ func BenchmarkPing(b *testing.B) { Path: "ping", } us := u.String() - for i := 0; i < iterations; i++ { + for i := 0; i < b.N; i++ { if err := runGet(us, false); err != nil { b.Error(err) } @@ -95,7 +93,7 @@ func BenchmarkPing(b *testing.B) { } func BenchmarkAnnounce(b *testing.B) { - for i := 0; i < iterations; i++ { + for i := 0; i < b.N; i++ { u := url.URL{ Scheme: "http", Host: addr, @@ -119,7 +117,7 @@ func BenchmarkAnnounce(b *testing.B) { } func BenchmarkScrape(b *testing.B) { - for i := 0; i < iterations; i++ { + for i := 0; i < b.N; i++ { u := url.URL{ Scheme: "http", Host: addr, diff --git a/frontend/http/parser.go b/frontend/http/parser.go index 6542496..c96dd53 100644 --- a/frontend/http/parser.go +++ b/frontend/http/parser.go @@ -57,7 +57,6 @@ func parseAnnounce(r *fasthttp.RequestCtx, opts ParseOptions) (*bittorrent.Annou if len(infoHashes) > 1 { return nil, errMultipleInfoHashes } - // FIXME: make sure that we have a copy of InfoHash request.InfoHash = infoHashes[0] // Parse the PeerID from the request. diff --git a/frontend/udp/parser.go b/frontend/udp/parser.go index 2117392..6f229bb 100644 --- a/frontend/udp/parser.go +++ b/frontend/udp/parser.go @@ -71,7 +71,6 @@ func parseAnnounce(r Request, v6Action bool, opts frontend.ParseOptions) (*bitto // XXX: pure V2 hashes will cause invalid parsing, // but BEP-52 says, that V2 hashes SHOULD be truncated - // FIXME: make sure that we have a copy of InfoHash request.InfoHash, err = bittorrent.NewInfoHash(r.Packet[16:36]) if err != nil { return nil, errInvalidInfoHash @@ -178,7 +177,6 @@ func parseScrape(r Request, opts frontend.ParseOptions) (*bittorrent.ScrapeReque var request *bittorrent.ScrapeRequest for len(r.Packet) >= bittorrent.InfoHashV1Len { var ih bittorrent.InfoHash - // FIXME: make sure that we have a copy of InfoHash if ih, err = bittorrent.NewInfoHash(r.Packet[:bittorrent.InfoHashV1Len]); err == nil { infoHashes = append(infoHashes, ih) r.Packet = r.Packet[bittorrent.InfoHashV1Len:] diff --git a/go.mod b/go.mod index a3ba01b..087d932 100644 --- a/go.mod +++ b/go.mod @@ -1,11 +1,11 @@ module github.com/sot-tech/mochi -go 1.19 +go 1.20 require ( code.cloudfoundry.org/go-diodes v0.0.0-20230317203753-49f1af6d2f1a github.com/MicahParks/keyfunc v1.9.0 - github.com/anacrolix/torrent v1.48.0 + github.com/anacrolix/torrent v1.49.0 github.com/cespare/xxhash/v2 v2.2.0 github.com/golang-jwt/jwt/v4 v4.5.0 github.com/jackc/pgx/v5 v5.3.1 diff --git a/go.sum b/go.sum index c47f4d5..e23f1c2 100644 --- a/go.sum +++ b/go.sum @@ -40,8 +40,8 @@ github.com/anacrolix/stm v0.2.0/go.mod h1:zoVQRvSiGjGoTmbM0vSLIiaKjWtNPeTvXUSdJQ github.com/anacrolix/tagflag v0.0.0-20180109131632-2146c8d41bf0/go.mod h1:1m2U/K6ZT+JZG0+bdMK6qauP49QT4wE5pmhJXOKKCHw= github.com/anacrolix/tagflag v1.0.0/go.mod h1:1m2U/K6ZT+JZG0+bdMK6qauP49QT4wE5pmhJXOKKCHw= github.com/anacrolix/tagflag v1.1.0/go.mod h1:Scxs9CV10NQatSmbyjqmqmeQNwGzlNe0CMUMIxqHIG8= -github.com/anacrolix/torrent v1.48.0 h1:OQe1aQb8WnhDzpcI7r3yWoHzHWKyPbfhXGfO9Q/pvbY= -github.com/anacrolix/torrent v1.48.0/go.mod h1:3UtkJ8BnxXDRwvk+eT+uwiZalfFJ8YzAhvxe4QRPSJI= +github.com/anacrolix/torrent v1.49.0 h1:v/TAd8BKsZarYEYv7VkPNv8tY5zZCwQyxMMlKKbAF4I= +github.com/anacrolix/torrent v1.49.0/go.mod h1:qT3yS5oQwDUHnBXy+zf3nozLPudG7SFNDL3Jl/zQwFw= github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs= github.com/andybalholm/brotli v1.0.5/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig= github.com/apache/thrift v0.12.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ= diff --git a/middleware/jwt/jwt.go b/middleware/jwt/jwt.go index 65c6eaf..61142e0 100644 --- a/middleware/jwt/jwt.go +++ b/middleware/jwt/jwt.go @@ -160,7 +160,7 @@ func (h *hook) HandleAnnounce(ctx context.Context, req *bittorrent.AnnounceReque err = ErrInvalidJWT } else { var claimIH bittorrent.InfoHash - if claimIH, err = bittorrent.NewInfoHash(claims.InfoHash); err != nil { + if claimIH, err = bittorrent.NewInfoHashString(claims.InfoHash); err != nil { logger.Info(). Err(err). Object("source", req.RequestPeer). @@ -206,7 +206,7 @@ func (h *hook) HandleScrape(ctx context.Context, req *bittorrent.ScrapeRequest, } else { var claimIHs bittorrent.InfoHashes for _, s := range claims.InfoHashes { - if providedIh, err := bittorrent.NewInfoHash(s); err == nil { + if providedIh, err := bittorrent.NewInfoHashString(s); err == nil { claimIHs = append(claimIHs, providedIh) } else { logger.Info(). diff --git a/middleware/torrentapproval/container/list/list.go b/middleware/torrentapproval/container/list/list.go index cd61d25..f857b4a 100644 --- a/middleware/torrentapproval/container/list/list.go +++ b/middleware/torrentapproval/container/list/list.go @@ -56,7 +56,7 @@ func build(conf conf.MapConfig, st storage.DataStorage) (container.Container, er if len(c.HashList) > 0 { init := make([]storage.Entry, 0, len(c.HashList)) for _, hashString := range c.HashList { - ih, err := bittorrent.NewInfoHash(hashString) + ih, err := bittorrent.NewInfoHashString(hashString) if err != nil { return nil, fmt.Errorf("whitelist : %s : %w", hashString, err) } diff --git a/middleware/torrentapproval/torrentapproval_test.go b/middleware/torrentapproval/torrentapproval_test.go index 8bf69ba..a6ca2b9 100644 --- a/middleware/torrentapproval/torrentapproval_test.go +++ b/middleware/torrentapproval/torrentapproval_test.go @@ -84,7 +84,7 @@ func TestHandleAnnounce(t *testing.T) { req := &bittorrent.AnnounceRequest{} resp := &bittorrent.AnnounceResponse{} - hashinfo, err := bittorrent.NewInfoHash(tt.ih) + hashinfo, err := bittorrent.NewInfoHashString(tt.ih) require.Nil(t, err) req.InfoHash = hashinfo diff --git a/storage/keydb/storage.go b/storage/keydb/storage.go index bbc6365..ba5c1ea 100644 --- a/storage/keydb/storage.go +++ b/storage/keydb/storage.go @@ -114,19 +114,19 @@ func (s *store) delPeer(ctx context.Context, infoHashKey, peerID string) error { } func (s *store) PutSeeder(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error { - return s.addPeer(ctx, r.InfoHashKey(ih.RawString(), true, peer.Addr().Is6()), peer.RawString()) + return s.addPeer(ctx, r.InfoHashKey(ih.RawString(), true, peer.Addr().Is6()), r.PackPeer(peer)) } func (s *store) DeleteSeeder(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error { - return s.delPeer(ctx, r.InfoHashKey(ih.RawString(), true, peer.Addr().Is6()), peer.RawString()) + return s.delPeer(ctx, r.InfoHashKey(ih.RawString(), true, peer.Addr().Is6()), r.PackPeer(peer)) } func (s *store) PutLeecher(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error { - return s.addPeer(ctx, r.InfoHashKey(ih.RawString(), false, peer.Addr().Is6()), peer.RawString()) + return s.addPeer(ctx, r.InfoHashKey(ih.RawString(), false, peer.Addr().Is6()), r.PackPeer(peer)) } func (s *store) DeleteLeecher(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error { - return s.delPeer(ctx, r.InfoHashKey(ih.RawString(), false, peer.Addr().Is6()), peer.RawString()) + return s.delPeer(ctx, r.InfoHashKey(ih.RawString(), false, peer.Addr().Is6()), r.PackPeer(peer)) } func (s *store) GraduateLeecher(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) (err error) { @@ -134,7 +134,7 @@ func (s *store) GraduateLeecher(ctx context.Context, ih bittorrent.InfoHash, pee Stringer("infoHash", ih). Object("peer", peer). Msg("graduate leecher") - infoHash, peerID := ih.RawString(), peer.RawString() + infoHash, peerID := ih.RawString(), r.PackPeer(peer) ihSeederKey := r.InfoHashKey(infoHash, true, peer.Addr().Is6()) ihLeecherKey := r.InfoHashKey(infoHash, false, peer.Addr().Is6()) var moved bool diff --git a/storage/memory/storage.go b/storage/memory/storage.go index 8697b81..a43453c 100644 --- a/storage/memory/storage.go +++ b/storage/memory/storage.go @@ -177,14 +177,15 @@ func (p *peers) len() int { return len(p.m) } -func (p *peers) keys(fn func(k bittorrent.Peer) bool) { +func (p *peers) keys(fn func(k bittorrent.Peer) bool) bool { p.RLock() + defer p.RUnlock() for k := range p.m { if !fn(k) { - break + return false } } - p.RUnlock() + return true } func (p *peers) forEach(fn func(k bittorrent.Peer, v int64) bool) { @@ -268,7 +269,7 @@ func (ps *peerStore) shardIndex(infoHash bittorrent.InfoHash, v6 bool) uint32 { // There are twice the amount of shards specified by the user, the first // half is dedicated to IPv4 swarms and the second half is dedicated to // IPv6 swarms. - idx := binary.BigEndian.Uint32([]byte(infoHash[:4])) % (uint32(len(ps.shards)) / 2) + idx := binary.BigEndian.Uint32(infoHash.Bytes()[:4]) % (uint32(len(ps.shards)) / 2) if v6 { idx += uint32(len(ps.shards) / 2) } @@ -417,8 +418,7 @@ func (ps *peerStore) AnnouncePeers(_ context.Context, ih bittorrent.InfoHash, fo if forSeeder { sw.leechers.keys(rangeFn) } else { - sw.seeders.keys(rangeFn) - if numWant > 0 { + if sw.seeders.keys(rangeFn) { sw.leechers.keys(rangeFn) } } @@ -446,8 +446,8 @@ func (ps *peerStore) ScrapeSwarm(_ context.Context, ih bittorrent.InfoHash) (lee Stringer("infoHash", ih). Msg("scrape swarm") - leechers, seeders = ps.countPeers(ih, true) - l, s := ps.countPeers(ih, false) + leechers, seeders = ps.countPeers(ih, false) + l, s := ps.countPeers(ih, true) leechers, seeders = leechers+l, seeders+s return diff --git a/storage/pg/storage.go b/storage/pg/storage.go index 631acec..554c2af 100644 --- a/storage/pg/storage.go +++ b/storage/pg/storage.go @@ -359,14 +359,14 @@ func (s *store) ScheduleStatisticsCollection(reportInterval time.Duration) { }() } -func (s *store) putPeer(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer, seeder bool) (err error) { +func (s *store) putPeer(ctx context.Context, ih []byte, peer bittorrent.Peer, seeder bool) (err error) { logger.Trace(). - Stringer("infoHash", ih). + Hex("infoHash", ih). Object("peer", peer). Bool("seeder", seeder). Msg("put peer") _, err = s.Exec(ctx, s.Peer.AddQuery, pgx.NamedArgs{ - pInfoHash: []byte(ih), + pInfoHash: ih, pPeerID: peer.ID[:], pAddress: net.IP(peer.Addr().AsSlice()), pPort: peer.Port(), @@ -377,13 +377,13 @@ func (s *store) putPeer(ctx context.Context, ih bittorrent.InfoHash, peer bittor return } -func (s *store) delPeer(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer, seeder bool) (err error) { +func (s *store) delPeer(ctx context.Context, ih []byte, peer bittorrent.Peer, seeder bool) (err error) { logger.Trace(). - Stringer("infoHash", ih). + Hex("infoHash", ih). Object("peer", peer). Msg("del peer") _, err = s.Exec(ctx, s.Peer.DelQuery, pgx.NamedArgs{ - pInfoHash: []byte(ih), + pInfoHash: ih, pPeerID: peer.ID[:], pAddress: net.IP(peer.Addr().AsSlice()), pPort: peer.Port(), @@ -393,19 +393,19 @@ func (s *store) delPeer(ctx context.Context, ih bittorrent.InfoHash, peer bittor } func (s *store) PutSeeder(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error { - return s.putPeer(ctx, ih, peer, true) + return s.putPeer(ctx, ih.Bytes(), peer, true) } func (s *store) DeleteSeeder(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error { - return s.delPeer(ctx, ih, peer, true) + return s.delPeer(ctx, ih.Bytes(), peer, true) } func (s *store) PutLeecher(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error { - return s.putPeer(ctx, ih, peer, false) + return s.putPeer(ctx, ih.Bytes(), peer, false) } func (s *store) DeleteLeecher(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error { - return s.delPeer(ctx, ih, peer, false) + return s.delPeer(ctx, ih.Bytes(), peer, false) } func (s *store) GraduateLeecher(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error { @@ -414,7 +414,7 @@ func (s *store) GraduateLeecher(ctx context.Context, ih bittorrent.InfoHash, pee Object("peer", peer). Msg("graduate leecher") var batch pgx.Batch - ihb := []byte(ih) + ihb := ih.Bytes() batch.Queue(s.Peer.GraduateQuery, pgx.NamedArgs{ pInfoHash: ihb, pPeerID: peer.ID[:], @@ -425,10 +425,10 @@ func (s *store) GraduateLeecher(ctx context.Context, ih bittorrent.InfoHash, pee return s.txBatch(ctx, &batch) } -func (s *store) getPeers(ctx context.Context, ih bittorrent.InfoHash, seeders bool, maxCount int, isV6 bool) (peers []bittorrent.Peer, err error) { +func (s *store) getPeers(ctx context.Context, ih []byte, seeders bool, maxCount int, isV6 bool) (peers []bittorrent.Peer, err error) { var rows pgx.Rows if rows, err = s.Query(ctx, s.Announce.Query, pgx.NamedArgs{ - pInfoHash: []byte(ih), + pInfoHash: ih, pSeeder: seeders, pV6: isV6, pCount: maxCount, @@ -503,12 +503,13 @@ func (s *store) AnnouncePeers(ctx context.Context, ih bittorrent.InfoHash, forSe Int("numWant", numWant). Bool("v6", v6). Msg("announce peers") + ihb := ih.Bytes() if forSeeder { - peers, err = s.getPeers(ctx, ih, false, numWant, v6) + peers, err = s.getPeers(ctx, ihb, false, numWant, v6) } else { - if peers, err = s.getPeers(ctx, ih, true, numWant, v6); err == nil { + if peers, err = s.getPeers(ctx, ihb, true, numWant, v6); err == nil { var addPeers []bittorrent.Peer - addPeers, err = s.getPeers(ctx, ih, false, numWant-len(peers), v6) + addPeers, err = s.getPeers(ctx, ihb, false, numWant-len(peers), v6) peers = append(peers, addPeers...) } } @@ -568,7 +569,7 @@ func (s *store) ScrapeSwarm(ctx context.Context, ih bittorrent.InfoHash) (leeche logger.Trace(). Stringer("infoHash", ih). Msg("scrape swarm") - ihb := []byte(ih) + ihb := ih.Bytes() if seeders, leechers, err = s.countPeers(ctx, ihb); err != nil { return } diff --git a/storage/redis/storage.go b/storage/redis/storage.go index f46ffbf..64f12fc 100644 --- a/storage/redis/storage.go +++ b/storage/redis/storage.go @@ -24,11 +24,16 @@ package redis import ( "context" + "encoding/binary" "errors" + "fmt" + "net" + "net/netip" "strconv" "strings" "sync" "time" + "unsafe" "github.com/redis/go-redis/v9" @@ -389,20 +394,30 @@ func (ps *store) delPeer(ctx context.Context, infoHashKey, peerCountKey, peerID return err } +// PackPeer generates concatenation of PeerID, net port and IP-address +func PackPeer(p bittorrent.Peer) string { + ip := p.Addr() + b := make([]byte, bittorrent.PeerIDLen+2+(ip.BitLen()/8)) + copy(b[:bittorrent.PeerIDLen], p.ID.Bytes()) + binary.BigEndian.PutUint16(b[bittorrent.PeerIDLen:bittorrent.PeerIDLen+2], p.Port()) + copy(b[bittorrent.PeerIDLen+2:], ip.AsSlice()) + return unsafe.String(&b[0], len(b)) +} + func (ps *store) PutSeeder(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error { - return ps.putPeer(ctx, InfoHashKey(ih.RawString(), true, peer.Addr().Is6()), CountSeederKey, peer.RawString()) + return ps.putPeer(ctx, InfoHashKey(ih.RawString(), true, peer.Addr().Is6()), CountSeederKey, PackPeer(peer)) } func (ps *store) DeleteSeeder(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error { - return ps.delPeer(ctx, InfoHashKey(ih.RawString(), true, peer.Addr().Is6()), CountSeederKey, peer.RawString()) + return ps.delPeer(ctx, InfoHashKey(ih.RawString(), true, peer.Addr().Is6()), CountSeederKey, PackPeer(peer)) } func (ps *store) PutLeecher(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error { - return ps.putPeer(ctx, InfoHashKey(ih.RawString(), false, peer.Addr().Is6()), CountLeecherKey, peer.RawString()) + return ps.putPeer(ctx, InfoHashKey(ih.RawString(), false, peer.Addr().Is6()), CountLeecherKey, PackPeer(peer)) } func (ps *store) DeleteLeecher(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error { - return ps.delPeer(ctx, InfoHashKey(ih.RawString(), false, peer.Addr().Is6()), CountLeecherKey, peer.RawString()) + return ps.delPeer(ctx, InfoHashKey(ih.RawString(), false, peer.Addr().Is6()), CountLeecherKey, PackPeer(peer)) } func (ps *store) GraduateLeecher(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error { @@ -411,7 +426,7 @@ func (ps *store) GraduateLeecher(ctx context.Context, ih bittorrent.InfoHash, pe Object("peer", peer). Msg("graduate leecher") - infoHash, peerID, isV6 := ih.RawString(), peer.RawString(), peer.Addr().Is6() + infoHash, peerID, isV6 := ih.RawString(), PackPeer(peer), peer.Addr().Is6() ihSeederKey, ihLeecherKey := InfoHashKey(infoHash, true, isV6), InfoHashKey(infoHash, false, isV6) return ps.tx(ctx, func(tx redis.Pipeliner) error { @@ -438,12 +453,42 @@ func (ps *store) GraduateLeecher(ctx context.Context, ih bittorrent.InfoHash, pe }) } +// peerMinimumLen is the least allowed length of string serialized Peer +const peerMinimumLen = bittorrent.PeerIDLen + 2 + net.IPv4len + +var errInvalidPeerDataSize = fmt.Errorf("invalid peer data (must be at least %d bytes (InfoHash + Port + IPv4))", peerMinimumLen) + +// UnpackPeer constructs Peer from serialized by Peer.PackPeer data: PeerID[20by]Port[2by]net.IP[4/16by] +func UnpackPeer(data string) (bittorrent.Peer, error) { + var peer bittorrent.Peer + if len(data) < peerMinimumLen { + return peer, errInvalidPeerDataSize + } + b := unsafe.Slice(unsafe.StringData(data), len(data)) + peerID, err := bittorrent.NewPeerID(b[:bittorrent.PeerIDLen]) + if err == nil { + if addr, isOk := netip.AddrFromSlice(b[bittorrent.PeerIDLen+2:]); isOk { + peer = bittorrent.Peer{ + ID: peerID, + AddrPort: netip.AddrPortFrom( + addr.Unmap(), + binary.BigEndian.Uint16(b[bittorrent.PeerIDLen:bittorrent.PeerIDLen+2]), + ), + } + } else { + err = bittorrent.ErrInvalidIP + } + } + + return peer, err +} + func (ps *Connection) parsePeersList(peersResult *redis.StringSliceCmd) (peers []bittorrent.Peer, err error) { var peerIds []string peerIds, err = peersResult.Result() if err = NoResultErr(err); err == nil { for _, peerID := range peerIds { - if p, err := bittorrent.NewPeer(peerID); err == nil { + if p, err := UnpackPeer(peerID); err == nil { peers = append(peers, p) } else { logger.Error().Err(err).Str("peerID", peerID).Msg("unable to decode peer")