(tested) combine v4 and v6 peers response in memory store

This commit is contained in:
Lawrence, Rendall
2022-04-24 23:46:20 +03:00
parent 0d4a2a751e
commit 7471697c20
+58 -62
View File
@@ -6,7 +6,6 @@ import (
"encoding/binary"
"fmt"
"math"
"net/netip"
"reflect"
"runtime"
"sync"
@@ -174,12 +173,12 @@ func (ps *peerStore) getClock() int64 {
return timecache.NowUnixNano()
}
func (ps *peerStore) shardIndex(infoHash bittorrent.InfoHash, addr netip.Addr) uint32 {
func (ps *peerStore) shardIndex(infoHash bittorrent.InfoHash, v6 bool) uint32 {
// There are twice the amount of shards specified by the user, the first
// half is dedicated to IPv4 swarms and the second half is dedicated to
// IPv6 swarms.
idx := binary.BigEndian.Uint32([]byte(infoHash[:4])) % (uint32(len(ps.shards)) / 2)
if addr.Is6() {
if v6 {
idx += uint32(len(ps.shards) / 2)
}
return idx
@@ -194,7 +193,7 @@ func (ps *peerStore) PutSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) error
pk := p.RawString()
shard := ps.shards[ps.shardIndex(ih, p.Addr())]
shard := ps.shards[ps.shardIndex(ih, p.Addr().Is6())]
shard.Lock()
if _, ok := shard.swarms[ih]; !ok {
@@ -225,7 +224,7 @@ func (ps *peerStore) DeleteSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) err
pk := p.RawString()
shard := ps.shards[ps.shardIndex(ih, p.Addr())]
shard := ps.shards[ps.shardIndex(ih, p.Addr().Is6())]
shard.Lock()
if _, ok := shard.swarms[ih]; !ok {
@@ -258,7 +257,7 @@ func (ps *peerStore) PutLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error
pk := p.RawString()
shard := ps.shards[ps.shardIndex(ih, p.Addr())]
shard := ps.shards[ps.shardIndex(ih, p.Addr().Is6())]
shard.Lock()
if _, ok := shard.swarms[ih]; !ok {
@@ -289,7 +288,7 @@ func (ps *peerStore) DeleteLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) er
pk := p.RawString()
shard := ps.shards[ps.shardIndex(ih, p.Addr())]
shard := ps.shards[ps.shardIndex(ih, p.Addr().Is6())]
shard.Lock()
if _, ok := shard.swarms[ih]; !ok {
@@ -322,7 +321,7 @@ func (ps *peerStore) GraduateLeecher(ih bittorrent.InfoHash, p bittorrent.Peer)
pk := p.RawString()
shard := ps.shards[ps.shardIndex(ih, p.Addr())]
shard := ps.shards[ps.shardIndex(ih, p.Addr().Is6())]
shard.Lock()
if _, ok := shard.swarms[ih]; !ok {
@@ -350,6 +349,35 @@ func (ps *peerStore) GraduateLeecher(ih bittorrent.InfoHash, p bittorrent.Peer)
return nil
}
func parsePeers(peersMap map[string]int64, maxCount int, skipPeerID string) (peers []bittorrent.Peer) {
for pk := range peersMap {
if maxCount == 0 {
break
}
if pk != skipPeerID {
p, _ := bittorrent.NewPeer(pk)
peers = append(peers, p)
maxCount--
}
}
return
}
func (ps *peerStore) getPeers(shard *peerShard, ih bittorrent.InfoHash, maxCount int, leechersOnly bool, skipPeerID string) (peers []bittorrent.Peer) {
shard.RLock()
defer shard.RUnlock()
if swarm, ok := shard.swarms[ih]; ok {
if !leechersOnly {
peers = append(peers, parsePeers(swarm.seeders, maxCount, skipPeerID)...)
maxCount -= len(peers)
}
if maxCount > 0 {
peers = append(peers, parsePeers(swarm.leechers, maxCount, skipPeerID)...)
}
}
return
}
func (ps *peerStore) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant int, peer bittorrent.Peer) (peers []bittorrent.Peer, err error) {
select {
case <-ps.closed:
@@ -357,78 +385,46 @@ func (ps *peerStore) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant
default:
}
shard := ps.shards[ps.shardIndex(ih, peer.Addr())]
shard.RLock()
if _, ok := shard.swarms[ih]; !ok {
shard.RUnlock()
return nil, storage.ErrResourceDoesNotExist
}
peerID, isV6 := peer.RawString(), peer.Addr().Is6()
if seeder {
// Append leechers as possible.
leechers := shard.swarms[ih].leechers
for pk := range leechers {
if numWant == 0 {
break
}
p, _ := bittorrent.NewPeer(pk)
peers = append(peers, p)
numWant--
peers = ps.getPeers(ps.shards[ps.shardIndex(ih, isV6)], ih, numWant, true, peerID)
if numWant -= len(peers); numWant > 0 {
peers = append(peers, ps.getPeers(ps.shards[ps.shardIndex(ih, !isV6)], ih, numWant, true, peerID)...)
}
} else {
// Append as many seeders as possible.
seeders := shard.swarms[ih].seeders
for pk := range seeders {
if numWant == 0 {
break
}
p, _ := bittorrent.NewPeer(pk)
peers = append(peers, p)
numWant--
}
// Append leechers until we reach numWant.
if numWant > 0 {
leechers := shard.swarms[ih].leechers
announcerPK := peer.RawString()
for pk := range leechers {
if pk == announcerPK {
continue
}
if numWant == 0 {
break
}
p, _ := bittorrent.NewPeer(pk)
peers = append(peers, p)
numWant--
}
peers = ps.getPeers(ps.shards[ps.shardIndex(ih, isV6)], ih, numWant, false, peerID)
if numWant -= len(peers); numWant > 0 {
peers = append(peers, ps.getPeers(ps.shards[ps.shardIndex(ih, !isV6)], ih, numWant, false, peerID)...)
}
}
shard.RUnlock()
return
}
func (ps *peerStore) ScrapeSwarm(ih bittorrent.InfoHash, peer bittorrent.Peer) (leechers uint32, seeders uint32, snatched uint32) {
func (ps *peerStore) countPeers(ih bittorrent.InfoHash, v6 bool) (leechers uint32, seeders uint32) {
shard := ps.shards[ps.shardIndex(ih, v6)]
shard.RLock()
defer shard.RUnlock()
if swarm, ok := shard.swarms[ih]; ok {
leechers, seeders = uint32(len(swarm.leechers)), uint32(len(swarm.seeders))
}
return
}
func (ps *peerStore) ScrapeSwarm(ih bittorrent.InfoHash, _ bittorrent.Peer) (leechers uint32, seeders uint32, _ uint32) {
select {
case <-ps.closed:
panic("attempted to interact with stopped memory store")
default:
}
shard := ps.shards[ps.shardIndex(ih, peer.Addr())]
shard.RLock()
swarm, ok := shard.swarms[ih]
if !ok {
shard.RUnlock()
return
}
leechers, seeders = uint32(len(swarm.leechers)), uint32(len(swarm.seeders))
shard.RUnlock()
leechers, seeders = ps.countPeers(ih, true)
l, s := ps.countPeers(ih, false)
leechers, seeders = leechers+l, seeders+s
return
}