From 7471697c2066d9323f49975b351a86e378767f0f Mon Sep 17 00:00:00 2001 From: "Lawrence, Rendall" Date: Sun, 24 Apr 2022 23:46:20 +0300 Subject: [PATCH] (tested) combine v4 and v6 peers response in memory store --- storage/memory/storage.go | 120 ++++++++++++++++++-------------------- 1 file changed, 58 insertions(+), 62 deletions(-) diff --git a/storage/memory/storage.go b/storage/memory/storage.go index 0e305ca..b35b1c9 100644 --- a/storage/memory/storage.go +++ b/storage/memory/storage.go @@ -6,7 +6,6 @@ import ( "encoding/binary" "fmt" "math" - "net/netip" "reflect" "runtime" "sync" @@ -174,12 +173,12 @@ func (ps *peerStore) getClock() int64 { return timecache.NowUnixNano() } -func (ps *peerStore) shardIndex(infoHash bittorrent.InfoHash, addr netip.Addr) uint32 { +func (ps *peerStore) shardIndex(infoHash bittorrent.InfoHash, v6 bool) uint32 { // There are twice the amount of shards specified by the user, the first // half is dedicated to IPv4 swarms and the second half is dedicated to // IPv6 swarms. idx := binary.BigEndian.Uint32([]byte(infoHash[:4])) % (uint32(len(ps.shards)) / 2) - if addr.Is6() { + if v6 { idx += uint32(len(ps.shards) / 2) } return idx @@ -194,7 +193,7 @@ func (ps *peerStore) PutSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) error pk := p.RawString() - shard := ps.shards[ps.shardIndex(ih, p.Addr())] + shard := ps.shards[ps.shardIndex(ih, p.Addr().Is6())] shard.Lock() if _, ok := shard.swarms[ih]; !ok { @@ -225,7 +224,7 @@ func (ps *peerStore) DeleteSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) err pk := p.RawString() - shard := ps.shards[ps.shardIndex(ih, p.Addr())] + shard := ps.shards[ps.shardIndex(ih, p.Addr().Is6())] shard.Lock() if _, ok := shard.swarms[ih]; !ok { @@ -258,7 +257,7 @@ func (ps *peerStore) PutLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error pk := p.RawString() - shard := ps.shards[ps.shardIndex(ih, p.Addr())] + shard := ps.shards[ps.shardIndex(ih, p.Addr().Is6())] shard.Lock() if _, ok := shard.swarms[ih]; !ok { @@ -289,7 +288,7 @@ func (ps *peerStore) DeleteLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) er pk := p.RawString() - shard := ps.shards[ps.shardIndex(ih, p.Addr())] + shard := ps.shards[ps.shardIndex(ih, p.Addr().Is6())] shard.Lock() if _, ok := shard.swarms[ih]; !ok { @@ -322,7 +321,7 @@ func (ps *peerStore) GraduateLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) pk := p.RawString() - shard := ps.shards[ps.shardIndex(ih, p.Addr())] + shard := ps.shards[ps.shardIndex(ih, p.Addr().Is6())] shard.Lock() if _, ok := shard.swarms[ih]; !ok { @@ -350,6 +349,35 @@ func (ps *peerStore) GraduateLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) return nil } +func parsePeers(peersMap map[string]int64, maxCount int, skipPeerID string) (peers []bittorrent.Peer) { + for pk := range peersMap { + if maxCount == 0 { + break + } + if pk != skipPeerID { + p, _ := bittorrent.NewPeer(pk) + peers = append(peers, p) + maxCount-- + } + } + return +} + +func (ps *peerStore) getPeers(shard *peerShard, ih bittorrent.InfoHash, maxCount int, leechersOnly bool, skipPeerID string) (peers []bittorrent.Peer) { + shard.RLock() + defer shard.RUnlock() + if swarm, ok := shard.swarms[ih]; ok { + if !leechersOnly { + peers = append(peers, parsePeers(swarm.seeders, maxCount, skipPeerID)...) + maxCount -= len(peers) + } + if maxCount > 0 { + peers = append(peers, parsePeers(swarm.leechers, maxCount, skipPeerID)...) + } + } + return +} + func (ps *peerStore) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant int, peer bittorrent.Peer) (peers []bittorrent.Peer, err error) { select { case <-ps.closed: @@ -357,78 +385,46 @@ func (ps *peerStore) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant default: } - shard := ps.shards[ps.shardIndex(ih, peer.Addr())] - shard.RLock() - - if _, ok := shard.swarms[ih]; !ok { - shard.RUnlock() - return nil, storage.ErrResourceDoesNotExist - } + peerID, isV6 := peer.RawString(), peer.Addr().Is6() if seeder { // Append leechers as possible. - leechers := shard.swarms[ih].leechers - for pk := range leechers { - if numWant == 0 { - break - } - p, _ := bittorrent.NewPeer(pk) - peers = append(peers, p) - numWant-- + peers = ps.getPeers(ps.shards[ps.shardIndex(ih, isV6)], ih, numWant, true, peerID) + if numWant -= len(peers); numWant > 0 { + peers = append(peers, ps.getPeers(ps.shards[ps.shardIndex(ih, !isV6)], ih, numWant, true, peerID)...) } } else { // Append as many seeders as possible. - seeders := shard.swarms[ih].seeders - for pk := range seeders { - if numWant == 0 { - break - } - p, _ := bittorrent.NewPeer(pk) - peers = append(peers, p) - numWant-- - } - - // Append leechers until we reach numWant. - if numWant > 0 { - leechers := shard.swarms[ih].leechers - announcerPK := peer.RawString() - for pk := range leechers { - if pk == announcerPK { - continue - } - - if numWant == 0 { - break - } - p, _ := bittorrent.NewPeer(pk) - peers = append(peers, p) - numWant-- - } + peers = ps.getPeers(ps.shards[ps.shardIndex(ih, isV6)], ih, numWant, false, peerID) + if numWant -= len(peers); numWant > 0 { + peers = append(peers, ps.getPeers(ps.shards[ps.shardIndex(ih, !isV6)], ih, numWant, false, peerID)...) } } - shard.RUnlock() return } -func (ps *peerStore) ScrapeSwarm(ih bittorrent.InfoHash, peer bittorrent.Peer) (leechers uint32, seeders uint32, snatched uint32) { +func (ps *peerStore) countPeers(ih bittorrent.InfoHash, v6 bool) (leechers uint32, seeders uint32) { + shard := ps.shards[ps.shardIndex(ih, v6)] + shard.RLock() + defer shard.RUnlock() + + if swarm, ok := shard.swarms[ih]; ok { + leechers, seeders = uint32(len(swarm.leechers)), uint32(len(swarm.seeders)) + } + return +} + +func (ps *peerStore) ScrapeSwarm(ih bittorrent.InfoHash, _ bittorrent.Peer) (leechers uint32, seeders uint32, _ uint32) { select { case <-ps.closed: panic("attempted to interact with stopped memory store") default: } - shard := ps.shards[ps.shardIndex(ih, peer.Addr())] - shard.RLock() - - swarm, ok := shard.swarms[ih] - if !ok { - shard.RUnlock() - return - } - - leechers, seeders = uint32(len(swarm.leechers)), uint32(len(swarm.seeders)) - shard.RUnlock() + leechers, seeders = ps.countPeers(ih, true) + l, s := ps.countPeers(ih, false) + leechers, seeders = leechers+l, seeders+s return }