(wip) implement keydb store

* make redis store reusable for keydb
* replace redis HKeys calls in AnnouncePeers with HRandField
* change signature of storage.ScrapeSwarm
This commit is contained in:
Lawrence, Rendall
2022-04-24 02:23:18 +03:00
parent 4131c64e89
commit 22f459315b
9 changed files with 227 additions and 124 deletions
+9 -4
View File
@@ -96,9 +96,7 @@ func (h *responseHook) HandleAnnounce(ctx context.Context, req *bittorrent.Annou
}
// Add the Scrape data to the response.
s := h.store.ScrapeSwarm(req.InfoHash, req.Peer)
resp.Incomplete = s.Incomplete
resp.Complete = s.Complete
resp.Incomplete, resp.Complete, _ = h.store.ScrapeSwarm(req.InfoHash, req.Peer)
err = h.appendPeers(req, resp)
return ctx, err
@@ -161,7 +159,14 @@ func (h *responseHook) HandleScrape(ctx context.Context, req *bittorrent.ScrapeR
}
for _, infoHash := range req.InfoHashes {
resp.Files = append(resp.Files, h.store.ScrapeSwarm(infoHash, req.Peer))
leechers, seeders, snatched := h.store.ScrapeSwarm(infoHash, req.Peer)
resp.Files = append(resp.Files, bittorrent.Scrape{
InfoHash: infoHash,
Snatches: snatched,
Complete: seeders,
Incomplete: leechers,
})
}
return ctx, nil
+83 -26
View File
@@ -1,8 +1,12 @@
// Package keydb implements the storage interface.
// This storage mostly is the same as redis, but it collects peers
// not in hashes, but in sets and uses KeyDB-specific command
// `EXPIREMEMBER` and, so it does not need garbage collection.
// Note: this storage also does not support statistics collection
// This storage mostly is the same as redis, but
// uses KeyDB-specific command `EXPIREMEMBER`, so it
// does not need garbage collection.
//
// Storage uses redis.IHSeederKey and redis.IHLeecherKey,
// BUT they are NOT compatible with each other because of
// another structure (hash in redis and set in keydb).
// Note: this storage also does not support statistics collection.
package keydb
import (
@@ -66,7 +70,7 @@ func New(cfg r.Config) (*store, error) {
var st *store
if err == nil {
st = &store{rs, cfg.LogFields()}
st = &store{rs, cfg.LogFields(), uint(cfg.PeerLifetime.Seconds())}
}
return st, err
@@ -75,41 +79,94 @@ func New(cfg r.Config) (*store, error) {
type store struct {
r.Connection
logFields log.Fields
peerTTL uint
}
func (s store) PutSeeder(infoHash bittorrent.InfoHash, peer bittorrent.Peer) error {
// TODO implement me
panic("implement me")
func (s store) setPeerTTL(infoHashKey, peerId string) error {
return s.Process(context.TODO(), redis.NewCmd(context.TODO(), expireMemberCmd, infoHashKey, peerId, s.peerTTL))
}
func (s store) DeleteSeeder(infoHash bittorrent.InfoHash, peer bittorrent.Peer) error {
// TODO implement me
panic("implement me")
func (s store) addPeer(infoHashKey, peerId string) (err error) {
log.Debug("storage: KeyDB: PutPeer", log.Fields{
"InfoHashKey": infoHashKey,
"PeerId": peerId,
})
if err = s.SAdd(context.TODO(), infoHashKey, peerId).Err(); err == nil {
err = s.setPeerTTL(infoHashKey, peerId)
}
return
}
func (s store) PutLeecher(infoHash bittorrent.InfoHash, peer bittorrent.Peer) error {
// TODO implement me
panic("implement me")
func (s store) delPeer(infoHashKey, peerId string) error {
log.Debug("storage: KeyDB: DeletePeer", log.Fields{
"InfoHashKey": infoHashKey,
"PeerId": peerId,
})
deleted, err := s.SRem(context.TODO(), infoHashKey, peerId).Uint64()
err = r.AsNil(err)
if err == nil && deleted == 0 {
err = storage.ErrResourceDoesNotExist
}
return err
}
func (s store) DeleteLeecher(infoHash bittorrent.InfoHash, peer bittorrent.Peer) error {
// TODO implement me
panic("implement me")
func (s store) PutSeeder(ih bittorrent.InfoHash, peer bittorrent.Peer) error {
return s.addPeer(r.IHSeederKey+ih.RawString(), peer.RawString())
}
func (s store) GraduateLeecher(infoHash bittorrent.InfoHash, peer bittorrent.Peer) error {
// TODO implement me
panic("implement me")
func (s store) DeleteSeeder(ih bittorrent.InfoHash, peer bittorrent.Peer) error {
return s.delPeer(r.IHSeederKey+ih.RawString(), peer.RawString())
}
func (s store) AnnouncePeers(infoHash bittorrent.InfoHash, seeder bool, numWant int, peer bittorrent.Peer) (peers []bittorrent.Peer, err error) {
// TODO implement me
panic("implement me")
func (s store) PutLeecher(ih bittorrent.InfoHash, peer bittorrent.Peer) error {
return s.addPeer(r.IHLeecherKey+ih.RawString(), peer.RawString())
}
func (s store) ScrapeSwarm(infoHash bittorrent.InfoHash, peer bittorrent.Peer) bittorrent.Scrape {
// TODO implement me
panic("implement me")
func (s store) DeleteLeecher(ih bittorrent.InfoHash, peer bittorrent.Peer) error {
return s.delPeer(r.IHLeecherKey+ih.RawString(), peer.RawString())
}
func (s store) GraduateLeecher(ih bittorrent.InfoHash, peer bittorrent.Peer) (err error) {
log.Debug("storage: KeyDB: GraduateLeecher", log.Fields{
"InfoHash": ih,
"Peer": peer,
})
infoHash, peerId := ih.RawString(), peer.RawString()
ihSeederKey, ihLeecherKey := r.IHSeederKey+infoHash, r.IHLeecherKey+infoHash
var moved bool
if moved, err = s.SMove(context.TODO(), ihLeecherKey, ihSeederKey, peerId).Result(); err == nil {
if moved {
err = s.setPeerTTL(ihSeederKey, peerId)
} else {
err = s.addPeer(ihSeederKey, peerId)
}
}
return err
}
// AnnouncePeers is the same function as redis.AnnouncePeers
func (s store) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant int, peer bittorrent.Peer) ([]bittorrent.Peer, error) {
log.Debug("storage: KeyDB: AnnouncePeers", log.Fields{
"InfoHash": ih,
"Seeder": seeder,
"NumWant": numWant,
"Peer": peer,
})
return s.GetPeers(ih, seeder, numWant, peer, func(ctx context.Context, infoHashKey string) *redis.StringSliceCmd {
return s.SRandMemberN(context.TODO(), infoHashKey, int64(numWant))
})
}
// ScrapeSwarm is the same function as redis.ScrapeSwarm except `SCard` call instead of `HLen`
func (s store) ScrapeSwarm(ih bittorrent.InfoHash, peer bittorrent.Peer) (leechers uint32, seeders uint32, snatched uint32) {
log.Debug("storage: KeyDB ScrapeSwarm", log.Fields{
"InfoHash": ih,
"Peer": peer,
})
leechers, seeders = s.CountPeers(ih, s.SCard)
return
}
func (s *store) Stop() stop.Result {
+33
View File
@@ -0,0 +1,33 @@
package keydb
import (
"fmt"
"testing"
"time"
s "github.com/sot-tech/mochi/storage"
r "github.com/sot-tech/mochi/storage/redis"
"github.com/sot-tech/mochi/storage/test"
)
var cfg = r.Config{
Addresses: []string{"localhost:6379"},
PeerLifetime: 30 * time.Minute,
ReadTimeout: 10 * time.Second,
WriteTimeout: 10 * time.Second,
ConnectTimeout: 10 * time.Second,
}
func createNew() s.PeerStorage {
var ps s.PeerStorage
var err error
ps, err = New(cfg)
if err != nil {
panic(fmt.Sprint("Unable to create KeyDB connection: ", err, "\nThis driver needs real KeyDB instance"))
}
return ps
}
func TestStorage(t *testing.T) { test.RunTests(t, createNew()) }
func BenchmarkStorage(b *testing.B) { test.RunBenchmarks(b, createNew) }
+3 -10
View File
@@ -129,7 +129,7 @@ func (ps *peerStore) ScheduleGC(gcInterval, peerLifeTime time.Duration) {
log.Debug("storage: Memory purging peers with no announces since", log.Fields{"before": before})
start := time.Now()
ps.gc(before)
recordGCDuration(time.Since(start))
storage.PromGCDurationMilliseconds.Observe(float64(time.Since(start).Nanoseconds()) / float64(time.Millisecond))
}
}
}()
@@ -170,11 +170,6 @@ func (ps *peerStore) ScheduleStatisticsCollection(reportInterval time.Duration)
}()
}
// recordGCDuration records the duration of a GC sweep.
func recordGCDuration(duration time.Duration) {
storage.PromGCDurationMilliseconds.Observe(float64(duration.Nanoseconds()) / float64(time.Millisecond))
}
func (ps *peerStore) getClock() int64 {
return timecache.NowUnixNano()
}
@@ -416,14 +411,13 @@ func (ps *peerStore) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant
return
}
func (ps *peerStore) ScrapeSwarm(ih bittorrent.InfoHash, peer bittorrent.Peer) (resp bittorrent.Scrape) {
func (ps *peerStore) ScrapeSwarm(ih bittorrent.InfoHash, peer bittorrent.Peer) (leechers uint32, seeders uint32, snatched uint32) {
select {
case <-ps.closed:
panic("attempted to interact with stopped memory store")
default:
}
resp.InfoHash = ih
shard := ps.shards[ps.shardIndex(ih, peer.Addr())]
shard.RLock()
@@ -433,8 +427,7 @@ func (ps *peerStore) ScrapeSwarm(ih bittorrent.InfoHash, peer bittorrent.Peer) (
return
}
resp.Incomplete = uint32(len(swarm.leechers))
resp.Complete = uint32(len(swarm.seeders))
leechers, seeders = uint32(len(swarm.leechers)), uint32(len(swarm.seeders))
shard.RUnlock()
return
+41 -32
View File
@@ -64,7 +64,7 @@ const (
var ErrSentinelAndClusterChecked = errors.New("unable to use both cluster and sentinel mode")
func init() {
// Register the storage RedisDriver.
// Register the storage builder.
storage.RegisterBuilder(Name, Builder)
}
@@ -426,51 +426,42 @@ func (ps *store) GraduateLeecher(ih bittorrent.InfoHash, peer bittorrent.Peer) e
})
}
func (ps *store) getPeers(infoHashKey, except string, max int, v4Only bool) (peers []bittorrent.Peer, err error) {
func (ps Connection) parsePeersList(peersResult *redis.StringSliceCmd, skipPeerId string, v4Only bool) (peers []bittorrent.Peer, err error) {
var peerIds []string
peerIds, err = ps.HKeys(context.TODO(), infoHashKey).Result()
peerIds, err = peersResult.Result()
if err = AsNil(err); err == nil {
for _, peerId := range peerIds {
if peerId != except {
if peerId != skipPeerId {
if p, err := bittorrent.NewPeer(peerId); err == nil {
// If peer from request is V4 only, it won't receive V6 peers from DB
if !(v4Only && p.Addr().Is6()) {
peers = append(peers, p)
max--
}
} else {
log.Error("storage: Redis: unable to decode leecher", log.Fields{"PeerId": peerId})
}
}
if max == 0 {
break
}
}
}
return
}
func (ps *store) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant int, peer bittorrent.Peer) (peers []bittorrent.Peer, err error) {
log.Debug("storage: Redis: AnnouncePeers", log.Fields{
"InfoHash": ih,
"Seeder": seeder,
"NumWant": numWant,
"Peer": peer,
})
func (ps Connection) GetPeers(ih bittorrent.InfoHash, forSeeder bool, maxCount int, peer bittorrent.Peer,
membersFn func(context.Context, string) *redis.StringSliceCmd) (peers []bittorrent.Peer, err error) {
infoHash, peerId, isV4 := ih.RawString(), peer.RawString(), peer.Addr().Is4()
if seeder {
peers, err = ps.getPeers(IHLeecherKey+infoHash, peerId, numWant, isV4)
if forSeeder {
peers, err = ps.parsePeersList(membersFn(context.TODO(), IHLeecherKey+infoHash), peerId, isV4)
} else {
// Append as many seeders as possible.
peers, err = ps.getPeers(IHSeederKey+infoHash, peerId, numWant, isV4)
peers, err = ps.parsePeersList(membersFn(context.TODO(), IHSeederKey+infoHash), peerId, isV4)
if err != nil {
return
}
if numWant -= len(peers); numWant > 0 {
if leechers, err := ps.getPeers(IHLeecherKey+infoHash, peerId, numWant, isV4); err == nil {
if maxCount -= len(peers); maxCount > 0 {
if leechers, err := ps.parsePeersList(membersFn(context.TODO(), IHLeecherKey+infoHash), peerId, isV4); err == nil {
peers = append(peers, leechers...)
} else {
log.Warn("storage: Redis: error occurred while receiving leechers", log.Fields{"InfoHash": ih, "Error": err})
@@ -485,37 +476,55 @@ func (ps *store) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant int,
return
}
func (ps *store) ScrapeSwarm(ih bittorrent.InfoHash, peer bittorrent.Peer) (resp bittorrent.Scrape) {
log.Debug("storage: RedisDriver ScrapeSwarm", log.Fields{
func (ps *store) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant int, peer bittorrent.Peer) ([]bittorrent.Peer, error) {
log.Debug("storage: Redis: AnnouncePeers", log.Fields{
"InfoHash": ih,
"Seeder": seeder,
"NumWant": numWant,
"Peer": peer,
})
resp.InfoHash = ih
return ps.GetPeers(ih, seeder, numWant, peer, func(ctx context.Context, infoHashKey string) *redis.StringSliceCmd {
return ps.HRandField(ctx, infoHashKey, numWant, false)
})
}
func (ps Connection) CountPeers(ih bittorrent.InfoHash, countFn func(context.Context, string) *redis.IntCmd) (leechersCount, seedersCount uint32) {
infoHash := ih.RawString()
ihSeederKey, ihLeecherKey := IHSeederKey+infoHash, IHLeecherKey+infoHash
leechersLen, err := ps.HLen(context.TODO(), ihLeecherKey).Result()
count, err := countFn(context.TODO(), ihLeecherKey).Result()
err = AsNil(err)
if err != nil {
log.Error("storage: Redis: HLEN failure", log.Fields{
log.Error("storage: Redis: key size calculation failure", log.Fields{
"InfoHashKey": ihLeecherKey,
"Error": err,
})
return
}
leechersCount = uint32(count)
seedersLen, err := ps.HLen(context.TODO(), ihSeederKey).Result()
count, err = countFn(context.TODO(), ihSeederKey).Result()
err = AsNil(err)
if err != nil {
log.Error("storage: Redis: HLEN failure", log.Fields{
log.Error("storage: Redis: key size calculation failure", log.Fields{
"InfoHashKey": ihSeederKey,
"Error": err,
})
return
}
seedersCount = uint32(count)
resp.Incomplete = uint32(leechersLen)
resp.Complete = uint32(seedersLen)
return
}
func (ps *store) ScrapeSwarm(ih bittorrent.InfoHash, peer bittorrent.Peer) (leechers uint32, seeders uint32, snatched uint32) {
log.Debug("storage: Redis ScrapeSwarm", log.Fields{
"InfoHash": ih,
"Peer": peer,
})
leechers, seeders = ps.CountPeers(ih, ps.HLen)
return
}
@@ -534,7 +543,7 @@ func (ps Connection) Put(ctx string, values ...storage.Entry) (err error) {
err = ps.HSet(context.TODO(), PrefixKey+ctx, args...).Err()
if err != nil {
if strings.Contains(err.Error(), argNumErrorMsg) {
log.Warn("This RedisDriver version/implementation does not support variadic arguments for HSET")
log.Warn("This Redis version/implementation does not support variadic arguments for HSET")
for _, p := range values {
if err = ps.HSet(context.TODO(), PrefixKey+ctx, p.Key, p.Value).Err(); err != nil {
break
@@ -565,7 +574,7 @@ func (ps Connection) Delete(ctx string, keys ...string) (err error) {
err = AsNil(ps.HDel(context.TODO(), PrefixKey+ctx, keys...).Err())
if err != nil {
if strings.Contains(err.Error(), argNumErrorMsg) {
log.Warn("This RedisDriver version/implementation does not support variadic arguments for HDEL")
log.Warn("This Redis version/implementation does not support variadic arguments for HDEL")
for _, k := range keys {
if err = AsNil(ps.HDel(context.TODO(), PrefixKey+ctx, k).Err()); err != nil {
break
@@ -669,7 +678,7 @@ func (ps *store) gc(cutoff time.Time) {
err = AsNil(err)
if err != nil {
if strings.Contains(err.Error(), argNumErrorMsg) {
log.Warn("This RedisDriver version/implementation does not support variadic arguments for HDEL")
log.Warn("This Redis version/implementation does not support variadic arguments for HDEL")
for _, k := range peersToRemove {
count, err := ps.HDel(context.Background(), infoHashKey, k).Result()
err = AsNil(err)
+1 -1
View File
@@ -24,7 +24,7 @@ func createNew() s.PeerStorage {
var err error
ps, err = New(cfg)
if err != nil {
fmt.Println("unable to create real RedisDriver connection: ", err, " using simulator")
fmt.Println("unable to create real redis connection: ", err, " using simulator")
var rs *miniredis.Miniredis
rs, err = miniredis.Run()
if err != nil {
+1 -1
View File
@@ -201,7 +201,7 @@ type PeerStorage interface {
// filling the Snatches field is optional.
//
// If the Swarm does not exist, an empty Scrape and no error is returned.
ScrapeSwarm(infoHash bittorrent.InfoHash, peer bittorrent.Peer) bittorrent.Scrape
ScrapeSwarm(infoHash bittorrent.InfoHash, peer bittorrent.Peer) (leechers uint32, seeders uint32, snatched uint32)
// Stopper is an interface that expects a Stop method to stop the PeerStorage.
// For more details see the documentation in the stop package.
+46 -40
View File
@@ -16,22 +16,27 @@ import (
"github.com/sot-tech/mochi/storage"
)
const (
ihCount = 1000
peersCount = 1000
)
type benchData struct {
infoHashes [1000]bittorrent.InfoHash
peers [1000]bittorrent.Peer
infoHashes [ihCount]bittorrent.InfoHash
peers [peersCount]bittorrent.Peer
}
func generateInfoHashes() (a [1000]bittorrent.InfoHash) {
func generateInfoHashes() (a [ihCount]bittorrent.InfoHash) {
for i := range a {
a[i] = randIH(rand.Int63()%2 == 0)
a[i] = randIH(i < ihCount/2)
}
return
}
func generatePeers() (a [1000]bittorrent.Peer) {
func generatePeers() (a [peersCount]bittorrent.Peer) {
for i := range a {
var ip []byte
if rand.Int63()%2 == 0 {
if i < peersCount/2 {
ip = make([]byte, net.IPv4len)
} else {
ip = make([]byte, net.IPv6len)
@@ -41,7 +46,7 @@ func generatePeers() (a [1000]bittorrent.Peer) {
if !ok {
panic("unable to create ip from random bytes")
}
port := uint16(rand.Uint32())
port := uint16(rand.Int63())
a[i] = bittorrent.Peer{
ID: randPeerID(),
AddrPort: netip.AddrPortFrom(addr, port),
@@ -64,7 +69,7 @@ type benchHolder struct {
func (bh *benchHolder) runBenchmark(b *testing.B, parallel bool, sf benchSetupFunc, ef benchExecFunc) {
ps := bh.st()
bd := &benchData{generateInfoHashes(), generatePeers()}
spacing := int32(1000 / runtime.NumCPU())
spacing := int32(ihCount / runtime.NumCPU())
if sf != nil {
err := sf(ps, bd)
if err != nil {
@@ -130,7 +135,7 @@ func (bh *benchHolder) Put(b *testing.B) {
// Put1k can run in parallel.
func (bh *benchHolder) Put1k(b *testing.B) {
bh.runBenchmark(b, true, nil, func(i int, ps storage.PeerStorage, bd *benchData) error {
return ps.PutSeeder(bd.infoHashes[0], bd.peers[i%1000])
return ps.PutSeeder(bd.infoHashes[0], bd.peers[i%peersCount])
})
}
@@ -140,7 +145,7 @@ func (bh *benchHolder) Put1k(b *testing.B) {
// Put1kInfoHash can run in parallel.
func (bh *benchHolder) Put1kInfoHash(b *testing.B) {
bh.runBenchmark(b, true, nil, func(i int, ps storage.PeerStorage, bd *benchData) error {
return ps.PutSeeder(bd.infoHashes[i%1000], bd.peers[0])
return ps.PutSeeder(bd.infoHashes[i%ihCount], bd.peers[0])
})
}
@@ -150,7 +155,7 @@ func (bh *benchHolder) Put1kInfoHash(b *testing.B) {
// Put1kInfoHash1k can run in parallel.
func (bh *benchHolder) Put1kInfoHash1k(b *testing.B) {
bh.runBenchmark(b, true, nil, func(i int, ps storage.PeerStorage, bd *benchData) error {
err := ps.PutSeeder(bd.infoHashes[i%1000], bd.peers[(i*3)%1000])
err := ps.PutSeeder(bd.infoHashes[i%ihCount], bd.peers[(i*3)%peersCount])
return err
})
}
@@ -175,11 +180,11 @@ func (bh *benchHolder) PutDelete(b *testing.B) {
// PutDelete1k can not run in parallel.
func (bh *benchHolder) PutDelete1k(b *testing.B) {
bh.runBenchmark(b, false, nil, func(i int, ps storage.PeerStorage, bd *benchData) error {
err := ps.PutSeeder(bd.infoHashes[0], bd.peers[i%1000])
err := ps.PutSeeder(bd.infoHashes[0], bd.peers[i%peersCount])
if err != nil {
return err
}
return ps.DeleteSeeder(bd.infoHashes[0], bd.peers[i%1000])
return ps.DeleteSeeder(bd.infoHashes[0], bd.peers[i%peersCount])
})
}
@@ -189,11 +194,11 @@ func (bh *benchHolder) PutDelete1k(b *testing.B) {
// PutDelete1kInfoHash can not run in parallel.
func (bh *benchHolder) PutDelete1kInfoHash(b *testing.B) {
bh.runBenchmark(b, false, nil, func(i int, ps storage.PeerStorage, bd *benchData) error {
err := ps.PutSeeder(bd.infoHashes[i%1000], bd.peers[0])
err := ps.PutSeeder(bd.infoHashes[i%ihCount], bd.peers[0])
if err != nil {
return err
}
return ps.DeleteSeeder(bd.infoHashes[i%1000], bd.peers[0])
return ps.DeleteSeeder(bd.infoHashes[i%ihCount], bd.peers[0])
})
}
@@ -203,11 +208,11 @@ func (bh *benchHolder) PutDelete1kInfoHash(b *testing.B) {
// PutDelete1kInfoHash1k can not run in parallel.
func (bh *benchHolder) PutDelete1kInfoHash1k(b *testing.B) {
bh.runBenchmark(b, false, nil, func(i int, ps storage.PeerStorage, bd *benchData) error {
err := ps.PutSeeder(bd.infoHashes[i%1000], bd.peers[(i*3)%1000])
err := ps.PutSeeder(bd.infoHashes[i%ihCount], bd.peers[(i*3)%peersCount])
if err != nil {
return err
}
err = ps.DeleteSeeder(bd.infoHashes[i%1000], bd.peers[(i*3)%1000])
err = ps.DeleteSeeder(bd.infoHashes[i%ihCount], bd.peers[(i*3)%peersCount])
return err
})
}
@@ -229,7 +234,7 @@ func (bh *benchHolder) DeleteNonexist(b *testing.B) {
// DeleteNonexist can run in parallel.
func (bh *benchHolder) DeleteNonexist1k(b *testing.B) {
bh.runBenchmark(b, true, nil, func(i int, ps storage.PeerStorage, bd *benchData) error {
_ = ps.DeleteSeeder(bd.infoHashes[0], bd.peers[i%1000])
_ = ps.DeleteSeeder(bd.infoHashes[0], bd.peers[i%peersCount])
return nil
})
}
@@ -240,7 +245,7 @@ func (bh *benchHolder) DeleteNonexist1k(b *testing.B) {
// DeleteNonexist1kInfoHash can run in parallel.
func (bh *benchHolder) DeleteNonexist1kInfoHash(b *testing.B) {
bh.runBenchmark(b, true, nil, func(i int, ps storage.PeerStorage, bd *benchData) error {
_ = ps.DeleteSeeder(bd.infoHashes[i%1000], bd.peers[0])
_ = ps.DeleteSeeder(bd.infoHashes[i%ihCount], bd.peers[0])
return nil
})
}
@@ -251,7 +256,7 @@ func (bh *benchHolder) DeleteNonexist1kInfoHash(b *testing.B) {
// DeleteNonexist1kInfoHash1k can run in parallel.
func (bh *benchHolder) DeleteNonexist1kInfoHash1k(b *testing.B) {
bh.runBenchmark(b, true, nil, func(i int, ps storage.PeerStorage, bd *benchData) error {
_ = ps.DeleteSeeder(bd.infoHashes[i%1000], bd.peers[(i*3)%1000])
_ = ps.DeleteSeeder(bd.infoHashes[i%ihCount], bd.peers[(i*3)%peersCount])
return nil
})
}
@@ -273,7 +278,7 @@ func (bh *benchHolder) GradNonexist(b *testing.B) {
// GradNonexist1k can run in parallel.
func (bh *benchHolder) GradNonexist1k(b *testing.B) {
bh.runBenchmark(b, true, nil, func(i int, ps storage.PeerStorage, bd *benchData) error {
_ = ps.GraduateLeecher(bd.infoHashes[0], bd.peers[i%1000])
_ = ps.GraduateLeecher(bd.infoHashes[0], bd.peers[i%peersCount])
return nil
})
}
@@ -284,7 +289,7 @@ func (bh *benchHolder) GradNonexist1k(b *testing.B) {
// GradNonexist1kInfoHash can run in parallel.
func (bh *benchHolder) GradNonexist1kInfoHash(b *testing.B) {
bh.runBenchmark(b, true, nil, func(i int, ps storage.PeerStorage, bd *benchData) error {
_ = ps.GraduateLeecher(bd.infoHashes[i%1000], bd.peers[0])
_ = ps.GraduateLeecher(bd.infoHashes[i%ihCount], bd.peers[0])
return nil
})
}
@@ -296,7 +301,7 @@ func (bh *benchHolder) GradNonexist1kInfoHash(b *testing.B) {
// GradNonexist1kInfoHash1k can run in parallel.
func (bh *benchHolder) GradNonexist1kInfoHash1k(b *testing.B) {
bh.runBenchmark(b, true, nil, func(i int, ps storage.PeerStorage, bd *benchData) error {
_ = ps.GraduateLeecher(bd.infoHashes[i%1000], bd.peers[(i*3)%1000])
_ = ps.GraduateLeecher(bd.infoHashes[i%ihCount], bd.peers[(i*3)%peersCount])
return nil
})
}
@@ -325,15 +330,15 @@ func (bh *benchHolder) PutGradDelete(b *testing.B) {
// PutGradDelete1k can not run in parallel.
func (bh *benchHolder) PutGradDelete1k(b *testing.B) {
bh.runBenchmark(b, false, nil, func(i int, ps storage.PeerStorage, bd *benchData) error {
err := ps.PutLeecher(bd.infoHashes[0], bd.peers[i%1000])
err := ps.PutLeecher(bd.infoHashes[0], bd.peers[i%peersCount])
if err != nil {
return err
}
err = ps.GraduateLeecher(bd.infoHashes[0], bd.peers[i%1000])
err = ps.GraduateLeecher(bd.infoHashes[0], bd.peers[i%peersCount])
if err != nil {
return err
}
return ps.DeleteSeeder(bd.infoHashes[0], bd.peers[i%1000])
return ps.DeleteSeeder(bd.infoHashes[0], bd.peers[i%peersCount])
})
}
@@ -343,15 +348,15 @@ func (bh *benchHolder) PutGradDelete1k(b *testing.B) {
// PutGradDelete1kInfoHash can not run in parallel.
func (bh *benchHolder) PutGradDelete1kInfoHash(b *testing.B) {
bh.runBenchmark(b, false, nil, func(i int, ps storage.PeerStorage, bd *benchData) error {
err := ps.PutLeecher(bd.infoHashes[i%1000], bd.peers[0])
err := ps.PutLeecher(bd.infoHashes[i%ihCount], bd.peers[0])
if err != nil {
return err
}
err = ps.GraduateLeecher(bd.infoHashes[i%1000], bd.peers[0])
err = ps.GraduateLeecher(bd.infoHashes[i%ihCount], bd.peers[0])
if err != nil {
return err
}
return ps.DeleteSeeder(bd.infoHashes[i%1000], bd.peers[0])
return ps.DeleteSeeder(bd.infoHashes[i%ihCount], bd.peers[0])
})
}
@@ -361,27 +366,28 @@ func (bh *benchHolder) PutGradDelete1kInfoHash(b *testing.B) {
// PutGradDelete1kInfoHash can not run in parallel.
func (bh *benchHolder) PutGradDelete1kInfoHash1k(b *testing.B) {
bh.runBenchmark(b, false, nil, func(i int, ps storage.PeerStorage, bd *benchData) error {
err := ps.PutLeecher(bd.infoHashes[i%1000], bd.peers[(i*3)%1000])
err := ps.PutLeecher(bd.infoHashes[i%ihCount], bd.peers[(i*3)%peersCount])
if err != nil {
return err
}
err = ps.GraduateLeecher(bd.infoHashes[i%1000], bd.peers[(i*3)%1000])
err = ps.GraduateLeecher(bd.infoHashes[i%ihCount], bd.peers[(i*3)%peersCount])
if err != nil {
return err
}
err = ps.DeleteSeeder(bd.infoHashes[i%1000], bd.peers[(i*3)%1000])
err = ps.DeleteSeeder(bd.infoHashes[i%ihCount], bd.peers[(i*3)%peersCount])
return err
})
}
func putPeers(ps storage.PeerStorage, bd *benchData) error {
for i := 0; i < 1000; i++ {
for j := 0; j < 1000; j++ {
l := len(bd.peers)
for _, ih := range bd.infoHashes {
for i, peer := range bd.peers {
var err error
if j < 1000/2 {
err = ps.PutLeecher(bd.infoHashes[i], bd.peers[j])
if i < l/2 {
err = ps.PutLeecher(ih, peer)
} else {
err = ps.PutSeeder(bd.infoHashes[i], bd.peers[j])
err = ps.PutSeeder(ih, peer)
}
if err != nil {
return err
@@ -409,7 +415,7 @@ func (bh *benchHolder) AnnounceLeecher(b *testing.B) {
// AnnounceLeecher1kInfoHash can run in parallel.
func (bh *benchHolder) AnnounceLeecher1kInfoHash(b *testing.B) {
bh.runBenchmark(b, true, putPeers, func(i int, ps storage.PeerStorage, bd *benchData) error {
_, err := ps.AnnouncePeers(bd.infoHashes[i%1000], false, 50, bd.peers[0])
_, err := ps.AnnouncePeers(bd.infoHashes[i%ihCount], false, 50, bd.peers[0])
return err
})
}
@@ -431,7 +437,7 @@ func (bh *benchHolder) AnnounceSeeder(b *testing.B) {
// AnnounceSeeder1kInfoHash can run in parallel.
func (bh *benchHolder) AnnounceSeeder1kInfoHash(b *testing.B) {
bh.runBenchmark(b, true, putPeers, func(i int, ps storage.PeerStorage, bd *benchData) error {
_, err := ps.AnnouncePeers(bd.infoHashes[i%1000], true, 50, bd.peers[0])
_, err := ps.AnnouncePeers(bd.infoHashes[i%ihCount], true, 50, bd.peers[0])
return err
})
}
@@ -452,7 +458,7 @@ func (bh *benchHolder) ScrapeSwarm(b *testing.B) {
// ScrapeSwarm1kInfoHash can run in parallel.
func (bh *benchHolder) ScrapeSwarm1kInfoHash(b *testing.B) {
bh.runBenchmark(b, true, putPeers, func(i int, ps storage.PeerStorage, bd *benchData) error {
ps.ScrapeSwarm(bd.infoHashes[i%1000], bd.peers[0])
ps.ScrapeSwarm(bd.infoHashes[i%ihCount], bd.peers[0])
return nil
})
}
+10 -10
View File
@@ -68,10 +68,10 @@ func (th *testHolder) AnnouncePeers(t *testing.T) {
func (th *testHolder) ScrapeSwarm(t *testing.T) {
for _, c := range testData {
scrape := th.st.ScrapeSwarm(c.ih, c.peer)
require.Equal(t, uint32(0), scrape.Complete)
require.Equal(t, uint32(0), scrape.Incomplete)
require.Equal(t, uint32(0), scrape.Snatches)
l, s, n := th.st.ScrapeSwarm(c.ih, c.peer)
require.Equal(t, uint32(0), s)
require.Equal(t, uint32(0), l)
require.Equal(t, uint32(0), n)
}
}
@@ -93,9 +93,9 @@ func (th *testHolder) LeecherPutAnnounceDeleteAnnounce(t *testing.T) {
require.Nil(t, err)
require.True(t, containsPeer(peers, c.peer))
scrape := th.st.ScrapeSwarm(c.ih, c.peer)
require.Equal(t, uint32(2), scrape.Incomplete)
require.Equal(t, uint32(0), scrape.Complete)
l, s, _ := th.st.ScrapeSwarm(c.ih, c.peer)
require.Equal(t, uint32(2), l)
require.Equal(t, uint32(0), s)
err = th.st.DeleteLeecher(c.ih, c.peer)
require.Nil(t, err)
@@ -123,9 +123,9 @@ func (th *testHolder) SeederPutAnnounceDeleteAnnounce(t *testing.T) {
require.Nil(t, err)
require.True(t, containsPeer(peers, c.peer))
scrape := th.st.ScrapeSwarm(c.ih, c.peer)
require.Equal(t, uint32(1), scrape.Incomplete)
require.Equal(t, uint32(1), scrape.Complete)
l, s, _ := th.st.ScrapeSwarm(c.ih, c.peer)
require.Equal(t, uint32(1), l)
require.Equal(t, uint32(1), s)
err = th.st.DeleteSeeder(c.ih, c.peer)
require.Nil(t, err)