From 22f459315b4f05106748d92437290e066bf1c068 Mon Sep 17 00:00:00 2001 From: "Lawrence, Rendall" Date: Sun, 24 Apr 2022 02:23:18 +0300 Subject: [PATCH] (wip) implement keydb store * make redis store reusable for keydb * replace redis HKeys calls in AnnouncePeers with HRandField * change signature of storage.ScrapeSwarm --- middleware/hooks.go | 13 ++-- storage/keydb/storage.go | 109 +++++++++++++++++++++++------- storage/keydb/storage_test.go | 33 +++++++++ storage/memory/storage.go | 13 +--- storage/redis/storage.go | 73 +++++++++++--------- storage/redis/storage_test.go | 2 +- storage/storage.go | 2 +- storage/test/storage_bench.go | 86 ++++++++++++----------- storage/test/storage_test_base.go | 20 +++--- 9 files changed, 227 insertions(+), 124 deletions(-) create mode 100644 storage/keydb/storage_test.go diff --git a/middleware/hooks.go b/middleware/hooks.go index 3120245..5550fa4 100644 --- a/middleware/hooks.go +++ b/middleware/hooks.go @@ -96,9 +96,7 @@ func (h *responseHook) HandleAnnounce(ctx context.Context, req *bittorrent.Annou } // Add the Scrape data to the response. - s := h.store.ScrapeSwarm(req.InfoHash, req.Peer) - resp.Incomplete = s.Incomplete - resp.Complete = s.Complete + resp.Incomplete, resp.Complete, _ = h.store.ScrapeSwarm(req.InfoHash, req.Peer) err = h.appendPeers(req, resp) return ctx, err @@ -161,7 +159,14 @@ func (h *responseHook) HandleScrape(ctx context.Context, req *bittorrent.ScrapeR } for _, infoHash := range req.InfoHashes { - resp.Files = append(resp.Files, h.store.ScrapeSwarm(infoHash, req.Peer)) + leechers, seeders, snatched := h.store.ScrapeSwarm(infoHash, req.Peer) + + resp.Files = append(resp.Files, bittorrent.Scrape{ + InfoHash: infoHash, + Snatches: snatched, + Complete: seeders, + Incomplete: leechers, + }) } return ctx, nil diff --git a/storage/keydb/storage.go b/storage/keydb/storage.go index 3ea4ae1..a1ad1c3 100644 --- a/storage/keydb/storage.go +++ b/storage/keydb/storage.go @@ -1,8 +1,12 @@ // Package keydb implements the storage interface. -// This storage mostly is the same as redis, but it collects peers -// not in hashes, but in sets and uses KeyDB-specific command -// `EXPIREMEMBER` and, so it does not need garbage collection. -// Note: this storage also does not support statistics collection +// This storage mostly is the same as redis, but +// uses KeyDB-specific command `EXPIREMEMBER`, so it +// does not need garbage collection. +// +// Storage uses redis.IHSeederKey and redis.IHLeecherKey, +// BUT they are NOT compatible with each other because of +// another structure (hash in redis and set in keydb). +// Note: this storage also does not support statistics collection. package keydb import ( @@ -66,7 +70,7 @@ func New(cfg r.Config) (*store, error) { var st *store if err == nil { - st = &store{rs, cfg.LogFields()} + st = &store{rs, cfg.LogFields(), uint(cfg.PeerLifetime.Seconds())} } return st, err @@ -75,41 +79,94 @@ func New(cfg r.Config) (*store, error) { type store struct { r.Connection logFields log.Fields + peerTTL uint } -func (s store) PutSeeder(infoHash bittorrent.InfoHash, peer bittorrent.Peer) error { - // TODO implement me - panic("implement me") +func (s store) setPeerTTL(infoHashKey, peerId string) error { + return s.Process(context.TODO(), redis.NewCmd(context.TODO(), expireMemberCmd, infoHashKey, peerId, s.peerTTL)) } -func (s store) DeleteSeeder(infoHash bittorrent.InfoHash, peer bittorrent.Peer) error { - // TODO implement me - panic("implement me") +func (s store) addPeer(infoHashKey, peerId string) (err error) { + log.Debug("storage: KeyDB: PutPeer", log.Fields{ + "InfoHashKey": infoHashKey, + "PeerId": peerId, + }) + if err = s.SAdd(context.TODO(), infoHashKey, peerId).Err(); err == nil { + err = s.setPeerTTL(infoHashKey, peerId) + } + return } -func (s store) PutLeecher(infoHash bittorrent.InfoHash, peer bittorrent.Peer) error { - // TODO implement me - panic("implement me") +func (s store) delPeer(infoHashKey, peerId string) error { + log.Debug("storage: KeyDB: DeletePeer", log.Fields{ + "InfoHashKey": infoHashKey, + "PeerId": peerId, + }) + deleted, err := s.SRem(context.TODO(), infoHashKey, peerId).Uint64() + err = r.AsNil(err) + if err == nil && deleted == 0 { + err = storage.ErrResourceDoesNotExist + } + + return err } -func (s store) DeleteLeecher(infoHash bittorrent.InfoHash, peer bittorrent.Peer) error { - // TODO implement me - panic("implement me") +func (s store) PutSeeder(ih bittorrent.InfoHash, peer bittorrent.Peer) error { + return s.addPeer(r.IHSeederKey+ih.RawString(), peer.RawString()) } -func (s store) GraduateLeecher(infoHash bittorrent.InfoHash, peer bittorrent.Peer) error { - // TODO implement me - panic("implement me") +func (s store) DeleteSeeder(ih bittorrent.InfoHash, peer bittorrent.Peer) error { + return s.delPeer(r.IHSeederKey+ih.RawString(), peer.RawString()) } -func (s store) AnnouncePeers(infoHash bittorrent.InfoHash, seeder bool, numWant int, peer bittorrent.Peer) (peers []bittorrent.Peer, err error) { - // TODO implement me - panic("implement me") +func (s store) PutLeecher(ih bittorrent.InfoHash, peer bittorrent.Peer) error { + return s.addPeer(r.IHLeecherKey+ih.RawString(), peer.RawString()) } -func (s store) ScrapeSwarm(infoHash bittorrent.InfoHash, peer bittorrent.Peer) bittorrent.Scrape { - // TODO implement me - panic("implement me") +func (s store) DeleteLeecher(ih bittorrent.InfoHash, peer bittorrent.Peer) error { + return s.delPeer(r.IHLeecherKey+ih.RawString(), peer.RawString()) +} + +func (s store) GraduateLeecher(ih bittorrent.InfoHash, peer bittorrent.Peer) (err error) { + log.Debug("storage: KeyDB: GraduateLeecher", log.Fields{ + "InfoHash": ih, + "Peer": peer, + }) + infoHash, peerId := ih.RawString(), peer.RawString() + ihSeederKey, ihLeecherKey := r.IHSeederKey+infoHash, r.IHLeecherKey+infoHash + var moved bool + if moved, err = s.SMove(context.TODO(), ihLeecherKey, ihSeederKey, peerId).Result(); err == nil { + if moved { + err = s.setPeerTTL(ihSeederKey, peerId) + } else { + err = s.addPeer(ihSeederKey, peerId) + } + } + return err +} + +// AnnouncePeers is the same function as redis.AnnouncePeers +func (s store) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant int, peer bittorrent.Peer) ([]bittorrent.Peer, error) { + log.Debug("storage: KeyDB: AnnouncePeers", log.Fields{ + "InfoHash": ih, + "Seeder": seeder, + "NumWant": numWant, + "Peer": peer, + }) + + return s.GetPeers(ih, seeder, numWant, peer, func(ctx context.Context, infoHashKey string) *redis.StringSliceCmd { + return s.SRandMemberN(context.TODO(), infoHashKey, int64(numWant)) + }) +} + +// ScrapeSwarm is the same function as redis.ScrapeSwarm except `SCard` call instead of `HLen` +func (s store) ScrapeSwarm(ih bittorrent.InfoHash, peer bittorrent.Peer) (leechers uint32, seeders uint32, snatched uint32) { + log.Debug("storage: KeyDB ScrapeSwarm", log.Fields{ + "InfoHash": ih, + "Peer": peer, + }) + leechers, seeders = s.CountPeers(ih, s.SCard) + return } func (s *store) Stop() stop.Result { diff --git a/storage/keydb/storage_test.go b/storage/keydb/storage_test.go new file mode 100644 index 0000000..023545e --- /dev/null +++ b/storage/keydb/storage_test.go @@ -0,0 +1,33 @@ +package keydb + +import ( + "fmt" + "testing" + "time" + + s "github.com/sot-tech/mochi/storage" + r "github.com/sot-tech/mochi/storage/redis" + "github.com/sot-tech/mochi/storage/test" +) + +var cfg = r.Config{ + Addresses: []string{"localhost:6379"}, + PeerLifetime: 30 * time.Minute, + ReadTimeout: 10 * time.Second, + WriteTimeout: 10 * time.Second, + ConnectTimeout: 10 * time.Second, +} + +func createNew() s.PeerStorage { + var ps s.PeerStorage + var err error + ps, err = New(cfg) + if err != nil { + panic(fmt.Sprint("Unable to create KeyDB connection: ", err, "\nThis driver needs real KeyDB instance")) + } + return ps +} + +func TestStorage(t *testing.T) { test.RunTests(t, createNew()) } + +func BenchmarkStorage(b *testing.B) { test.RunBenchmarks(b, createNew) } diff --git a/storage/memory/storage.go b/storage/memory/storage.go index f8823b3..0c23054 100644 --- a/storage/memory/storage.go +++ b/storage/memory/storage.go @@ -129,7 +129,7 @@ func (ps *peerStore) ScheduleGC(gcInterval, peerLifeTime time.Duration) { log.Debug("storage: Memory purging peers with no announces since", log.Fields{"before": before}) start := time.Now() ps.gc(before) - recordGCDuration(time.Since(start)) + storage.PromGCDurationMilliseconds.Observe(float64(time.Since(start).Nanoseconds()) / float64(time.Millisecond)) } } }() @@ -170,11 +170,6 @@ func (ps *peerStore) ScheduleStatisticsCollection(reportInterval time.Duration) }() } -// recordGCDuration records the duration of a GC sweep. -func recordGCDuration(duration time.Duration) { - storage.PromGCDurationMilliseconds.Observe(float64(duration.Nanoseconds()) / float64(time.Millisecond)) -} - func (ps *peerStore) getClock() int64 { return timecache.NowUnixNano() } @@ -416,14 +411,13 @@ func (ps *peerStore) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant return } -func (ps *peerStore) ScrapeSwarm(ih bittorrent.InfoHash, peer bittorrent.Peer) (resp bittorrent.Scrape) { +func (ps *peerStore) ScrapeSwarm(ih bittorrent.InfoHash, peer bittorrent.Peer) (leechers uint32, seeders uint32, snatched uint32) { select { case <-ps.closed: panic("attempted to interact with stopped memory store") default: } - resp.InfoHash = ih shard := ps.shards[ps.shardIndex(ih, peer.Addr())] shard.RLock() @@ -433,8 +427,7 @@ func (ps *peerStore) ScrapeSwarm(ih bittorrent.InfoHash, peer bittorrent.Peer) ( return } - resp.Incomplete = uint32(len(swarm.leechers)) - resp.Complete = uint32(len(swarm.seeders)) + leechers, seeders = uint32(len(swarm.leechers)), uint32(len(swarm.seeders)) shard.RUnlock() return diff --git a/storage/redis/storage.go b/storage/redis/storage.go index 276f408..c14d6d2 100644 --- a/storage/redis/storage.go +++ b/storage/redis/storage.go @@ -64,7 +64,7 @@ const ( var ErrSentinelAndClusterChecked = errors.New("unable to use both cluster and sentinel mode") func init() { - // Register the storage RedisDriver. + // Register the storage builder. storage.RegisterBuilder(Name, Builder) } @@ -426,51 +426,42 @@ func (ps *store) GraduateLeecher(ih bittorrent.InfoHash, peer bittorrent.Peer) e }) } -func (ps *store) getPeers(infoHashKey, except string, max int, v4Only bool) (peers []bittorrent.Peer, err error) { +func (ps Connection) parsePeersList(peersResult *redis.StringSliceCmd, skipPeerId string, v4Only bool) (peers []bittorrent.Peer, err error) { var peerIds []string - peerIds, err = ps.HKeys(context.TODO(), infoHashKey).Result() + peerIds, err = peersResult.Result() if err = AsNil(err); err == nil { for _, peerId := range peerIds { - if peerId != except { + if peerId != skipPeerId { if p, err := bittorrent.NewPeer(peerId); err == nil { // If peer from request is V4 only, it won't receive V6 peers from DB if !(v4Only && p.Addr().Is6()) { peers = append(peers, p) - max-- } } else { log.Error("storage: Redis: unable to decode leecher", log.Fields{"PeerId": peerId}) } } - if max == 0 { - break - } } } return } -func (ps *store) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant int, peer bittorrent.Peer) (peers []bittorrent.Peer, err error) { - log.Debug("storage: Redis: AnnouncePeers", log.Fields{ - "InfoHash": ih, - "Seeder": seeder, - "NumWant": numWant, - "Peer": peer, - }) +func (ps Connection) GetPeers(ih bittorrent.InfoHash, forSeeder bool, maxCount int, peer bittorrent.Peer, + membersFn func(context.Context, string) *redis.StringSliceCmd) (peers []bittorrent.Peer, err error) { infoHash, peerId, isV4 := ih.RawString(), peer.RawString(), peer.Addr().Is4() - if seeder { - peers, err = ps.getPeers(IHLeecherKey+infoHash, peerId, numWant, isV4) + if forSeeder { + peers, err = ps.parsePeersList(membersFn(context.TODO(), IHLeecherKey+infoHash), peerId, isV4) } else { // Append as many seeders as possible. - peers, err = ps.getPeers(IHSeederKey+infoHash, peerId, numWant, isV4) + peers, err = ps.parsePeersList(membersFn(context.TODO(), IHSeederKey+infoHash), peerId, isV4) if err != nil { return } - if numWant -= len(peers); numWant > 0 { - if leechers, err := ps.getPeers(IHLeecherKey+infoHash, peerId, numWant, isV4); err == nil { + if maxCount -= len(peers); maxCount > 0 { + if leechers, err := ps.parsePeersList(membersFn(context.TODO(), IHLeecherKey+infoHash), peerId, isV4); err == nil { peers = append(peers, leechers...) } else { log.Warn("storage: Redis: error occurred while receiving leechers", log.Fields{"InfoHash": ih, "Error": err}) @@ -485,37 +476,55 @@ func (ps *store) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant int, return } -func (ps *store) ScrapeSwarm(ih bittorrent.InfoHash, peer bittorrent.Peer) (resp bittorrent.Scrape) { - log.Debug("storage: RedisDriver ScrapeSwarm", log.Fields{ +func (ps *store) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant int, peer bittorrent.Peer) ([]bittorrent.Peer, error) { + log.Debug("storage: Redis: AnnouncePeers", log.Fields{ "InfoHash": ih, + "Seeder": seeder, + "NumWant": numWant, "Peer": peer, }) - resp.InfoHash = ih + + return ps.GetPeers(ih, seeder, numWant, peer, func(ctx context.Context, infoHashKey string) *redis.StringSliceCmd { + return ps.HRandField(ctx, infoHashKey, numWant, false) + }) +} + +func (ps Connection) CountPeers(ih bittorrent.InfoHash, countFn func(context.Context, string) *redis.IntCmd) (leechersCount, seedersCount uint32) { infoHash := ih.RawString() ihSeederKey, ihLeecherKey := IHSeederKey+infoHash, IHLeecherKey+infoHash - leechersLen, err := ps.HLen(context.TODO(), ihLeecherKey).Result() + count, err := countFn(context.TODO(), ihLeecherKey).Result() err = AsNil(err) if err != nil { - log.Error("storage: Redis: HLEN failure", log.Fields{ + log.Error("storage: Redis: key size calculation failure", log.Fields{ "InfoHashKey": ihLeecherKey, "Error": err, }) return } + leechersCount = uint32(count) - seedersLen, err := ps.HLen(context.TODO(), ihSeederKey).Result() + count, err = countFn(context.TODO(), ihSeederKey).Result() err = AsNil(err) if err != nil { - log.Error("storage: Redis: HLEN failure", log.Fields{ + log.Error("storage: Redis: key size calculation failure", log.Fields{ "InfoHashKey": ihSeederKey, "Error": err, }) return } + seedersCount = uint32(count) - resp.Incomplete = uint32(leechersLen) - resp.Complete = uint32(seedersLen) + return +} + +func (ps *store) ScrapeSwarm(ih bittorrent.InfoHash, peer bittorrent.Peer) (leechers uint32, seeders uint32, snatched uint32) { + log.Debug("storage: Redis ScrapeSwarm", log.Fields{ + "InfoHash": ih, + "Peer": peer, + }) + + leechers, seeders = ps.CountPeers(ih, ps.HLen) return } @@ -534,7 +543,7 @@ func (ps Connection) Put(ctx string, values ...storage.Entry) (err error) { err = ps.HSet(context.TODO(), PrefixKey+ctx, args...).Err() if err != nil { if strings.Contains(err.Error(), argNumErrorMsg) { - log.Warn("This RedisDriver version/implementation does not support variadic arguments for HSET") + log.Warn("This Redis version/implementation does not support variadic arguments for HSET") for _, p := range values { if err = ps.HSet(context.TODO(), PrefixKey+ctx, p.Key, p.Value).Err(); err != nil { break @@ -565,7 +574,7 @@ func (ps Connection) Delete(ctx string, keys ...string) (err error) { err = AsNil(ps.HDel(context.TODO(), PrefixKey+ctx, keys...).Err()) if err != nil { if strings.Contains(err.Error(), argNumErrorMsg) { - log.Warn("This RedisDriver version/implementation does not support variadic arguments for HDEL") + log.Warn("This Redis version/implementation does not support variadic arguments for HDEL") for _, k := range keys { if err = AsNil(ps.HDel(context.TODO(), PrefixKey+ctx, k).Err()); err != nil { break @@ -669,7 +678,7 @@ func (ps *store) gc(cutoff time.Time) { err = AsNil(err) if err != nil { if strings.Contains(err.Error(), argNumErrorMsg) { - log.Warn("This RedisDriver version/implementation does not support variadic arguments for HDEL") + log.Warn("This Redis version/implementation does not support variadic arguments for HDEL") for _, k := range peersToRemove { count, err := ps.HDel(context.Background(), infoHashKey, k).Result() err = AsNil(err) diff --git a/storage/redis/storage_test.go b/storage/redis/storage_test.go index bf53779..1561a78 100644 --- a/storage/redis/storage_test.go +++ b/storage/redis/storage_test.go @@ -24,7 +24,7 @@ func createNew() s.PeerStorage { var err error ps, err = New(cfg) if err != nil { - fmt.Println("unable to create real RedisDriver connection: ", err, " using simulator") + fmt.Println("unable to create real redis connection: ", err, " using simulator") var rs *miniredis.Miniredis rs, err = miniredis.Run() if err != nil { diff --git a/storage/storage.go b/storage/storage.go index 0214197..6c1801d 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -201,7 +201,7 @@ type PeerStorage interface { // filling the Snatches field is optional. // // If the Swarm does not exist, an empty Scrape and no error is returned. - ScrapeSwarm(infoHash bittorrent.InfoHash, peer bittorrent.Peer) bittorrent.Scrape + ScrapeSwarm(infoHash bittorrent.InfoHash, peer bittorrent.Peer) (leechers uint32, seeders uint32, snatched uint32) // Stopper is an interface that expects a Stop method to stop the PeerStorage. // For more details see the documentation in the stop package. diff --git a/storage/test/storage_bench.go b/storage/test/storage_bench.go index f3f6d8a..0bf26a5 100644 --- a/storage/test/storage_bench.go +++ b/storage/test/storage_bench.go @@ -16,22 +16,27 @@ import ( "github.com/sot-tech/mochi/storage" ) +const ( + ihCount = 1000 + peersCount = 1000 +) + type benchData struct { - infoHashes [1000]bittorrent.InfoHash - peers [1000]bittorrent.Peer + infoHashes [ihCount]bittorrent.InfoHash + peers [peersCount]bittorrent.Peer } -func generateInfoHashes() (a [1000]bittorrent.InfoHash) { +func generateInfoHashes() (a [ihCount]bittorrent.InfoHash) { for i := range a { - a[i] = randIH(rand.Int63()%2 == 0) + a[i] = randIH(i < ihCount/2) } return } -func generatePeers() (a [1000]bittorrent.Peer) { +func generatePeers() (a [peersCount]bittorrent.Peer) { for i := range a { var ip []byte - if rand.Int63()%2 == 0 { + if i < peersCount/2 { ip = make([]byte, net.IPv4len) } else { ip = make([]byte, net.IPv6len) @@ -41,7 +46,7 @@ func generatePeers() (a [1000]bittorrent.Peer) { if !ok { panic("unable to create ip from random bytes") } - port := uint16(rand.Uint32()) + port := uint16(rand.Int63()) a[i] = bittorrent.Peer{ ID: randPeerID(), AddrPort: netip.AddrPortFrom(addr, port), @@ -64,7 +69,7 @@ type benchHolder struct { func (bh *benchHolder) runBenchmark(b *testing.B, parallel bool, sf benchSetupFunc, ef benchExecFunc) { ps := bh.st() bd := &benchData{generateInfoHashes(), generatePeers()} - spacing := int32(1000 / runtime.NumCPU()) + spacing := int32(ihCount / runtime.NumCPU()) if sf != nil { err := sf(ps, bd) if err != nil { @@ -130,7 +135,7 @@ func (bh *benchHolder) Put(b *testing.B) { // Put1k can run in parallel. func (bh *benchHolder) Put1k(b *testing.B) { bh.runBenchmark(b, true, nil, func(i int, ps storage.PeerStorage, bd *benchData) error { - return ps.PutSeeder(bd.infoHashes[0], bd.peers[i%1000]) + return ps.PutSeeder(bd.infoHashes[0], bd.peers[i%peersCount]) }) } @@ -140,7 +145,7 @@ func (bh *benchHolder) Put1k(b *testing.B) { // Put1kInfoHash can run in parallel. func (bh *benchHolder) Put1kInfoHash(b *testing.B) { bh.runBenchmark(b, true, nil, func(i int, ps storage.PeerStorage, bd *benchData) error { - return ps.PutSeeder(bd.infoHashes[i%1000], bd.peers[0]) + return ps.PutSeeder(bd.infoHashes[i%ihCount], bd.peers[0]) }) } @@ -150,7 +155,7 @@ func (bh *benchHolder) Put1kInfoHash(b *testing.B) { // Put1kInfoHash1k can run in parallel. func (bh *benchHolder) Put1kInfoHash1k(b *testing.B) { bh.runBenchmark(b, true, nil, func(i int, ps storage.PeerStorage, bd *benchData) error { - err := ps.PutSeeder(bd.infoHashes[i%1000], bd.peers[(i*3)%1000]) + err := ps.PutSeeder(bd.infoHashes[i%ihCount], bd.peers[(i*3)%peersCount]) return err }) } @@ -175,11 +180,11 @@ func (bh *benchHolder) PutDelete(b *testing.B) { // PutDelete1k can not run in parallel. func (bh *benchHolder) PutDelete1k(b *testing.B) { bh.runBenchmark(b, false, nil, func(i int, ps storage.PeerStorage, bd *benchData) error { - err := ps.PutSeeder(bd.infoHashes[0], bd.peers[i%1000]) + err := ps.PutSeeder(bd.infoHashes[0], bd.peers[i%peersCount]) if err != nil { return err } - return ps.DeleteSeeder(bd.infoHashes[0], bd.peers[i%1000]) + return ps.DeleteSeeder(bd.infoHashes[0], bd.peers[i%peersCount]) }) } @@ -189,11 +194,11 @@ func (bh *benchHolder) PutDelete1k(b *testing.B) { // PutDelete1kInfoHash can not run in parallel. func (bh *benchHolder) PutDelete1kInfoHash(b *testing.B) { bh.runBenchmark(b, false, nil, func(i int, ps storage.PeerStorage, bd *benchData) error { - err := ps.PutSeeder(bd.infoHashes[i%1000], bd.peers[0]) + err := ps.PutSeeder(bd.infoHashes[i%ihCount], bd.peers[0]) if err != nil { return err } - return ps.DeleteSeeder(bd.infoHashes[i%1000], bd.peers[0]) + return ps.DeleteSeeder(bd.infoHashes[i%ihCount], bd.peers[0]) }) } @@ -203,11 +208,11 @@ func (bh *benchHolder) PutDelete1kInfoHash(b *testing.B) { // PutDelete1kInfoHash1k can not run in parallel. func (bh *benchHolder) PutDelete1kInfoHash1k(b *testing.B) { bh.runBenchmark(b, false, nil, func(i int, ps storage.PeerStorage, bd *benchData) error { - err := ps.PutSeeder(bd.infoHashes[i%1000], bd.peers[(i*3)%1000]) + err := ps.PutSeeder(bd.infoHashes[i%ihCount], bd.peers[(i*3)%peersCount]) if err != nil { return err } - err = ps.DeleteSeeder(bd.infoHashes[i%1000], bd.peers[(i*3)%1000]) + err = ps.DeleteSeeder(bd.infoHashes[i%ihCount], bd.peers[(i*3)%peersCount]) return err }) } @@ -229,7 +234,7 @@ func (bh *benchHolder) DeleteNonexist(b *testing.B) { // DeleteNonexist can run in parallel. func (bh *benchHolder) DeleteNonexist1k(b *testing.B) { bh.runBenchmark(b, true, nil, func(i int, ps storage.PeerStorage, bd *benchData) error { - _ = ps.DeleteSeeder(bd.infoHashes[0], bd.peers[i%1000]) + _ = ps.DeleteSeeder(bd.infoHashes[0], bd.peers[i%peersCount]) return nil }) } @@ -240,7 +245,7 @@ func (bh *benchHolder) DeleteNonexist1k(b *testing.B) { // DeleteNonexist1kInfoHash can run in parallel. func (bh *benchHolder) DeleteNonexist1kInfoHash(b *testing.B) { bh.runBenchmark(b, true, nil, func(i int, ps storage.PeerStorage, bd *benchData) error { - _ = ps.DeleteSeeder(bd.infoHashes[i%1000], bd.peers[0]) + _ = ps.DeleteSeeder(bd.infoHashes[i%ihCount], bd.peers[0]) return nil }) } @@ -251,7 +256,7 @@ func (bh *benchHolder) DeleteNonexist1kInfoHash(b *testing.B) { // DeleteNonexist1kInfoHash1k can run in parallel. func (bh *benchHolder) DeleteNonexist1kInfoHash1k(b *testing.B) { bh.runBenchmark(b, true, nil, func(i int, ps storage.PeerStorage, bd *benchData) error { - _ = ps.DeleteSeeder(bd.infoHashes[i%1000], bd.peers[(i*3)%1000]) + _ = ps.DeleteSeeder(bd.infoHashes[i%ihCount], bd.peers[(i*3)%peersCount]) return nil }) } @@ -273,7 +278,7 @@ func (bh *benchHolder) GradNonexist(b *testing.B) { // GradNonexist1k can run in parallel. func (bh *benchHolder) GradNonexist1k(b *testing.B) { bh.runBenchmark(b, true, nil, func(i int, ps storage.PeerStorage, bd *benchData) error { - _ = ps.GraduateLeecher(bd.infoHashes[0], bd.peers[i%1000]) + _ = ps.GraduateLeecher(bd.infoHashes[0], bd.peers[i%peersCount]) return nil }) } @@ -284,7 +289,7 @@ func (bh *benchHolder) GradNonexist1k(b *testing.B) { // GradNonexist1kInfoHash can run in parallel. func (bh *benchHolder) GradNonexist1kInfoHash(b *testing.B) { bh.runBenchmark(b, true, nil, func(i int, ps storage.PeerStorage, bd *benchData) error { - _ = ps.GraduateLeecher(bd.infoHashes[i%1000], bd.peers[0]) + _ = ps.GraduateLeecher(bd.infoHashes[i%ihCount], bd.peers[0]) return nil }) } @@ -296,7 +301,7 @@ func (bh *benchHolder) GradNonexist1kInfoHash(b *testing.B) { // GradNonexist1kInfoHash1k can run in parallel. func (bh *benchHolder) GradNonexist1kInfoHash1k(b *testing.B) { bh.runBenchmark(b, true, nil, func(i int, ps storage.PeerStorage, bd *benchData) error { - _ = ps.GraduateLeecher(bd.infoHashes[i%1000], bd.peers[(i*3)%1000]) + _ = ps.GraduateLeecher(bd.infoHashes[i%ihCount], bd.peers[(i*3)%peersCount]) return nil }) } @@ -325,15 +330,15 @@ func (bh *benchHolder) PutGradDelete(b *testing.B) { // PutGradDelete1k can not run in parallel. func (bh *benchHolder) PutGradDelete1k(b *testing.B) { bh.runBenchmark(b, false, nil, func(i int, ps storage.PeerStorage, bd *benchData) error { - err := ps.PutLeecher(bd.infoHashes[0], bd.peers[i%1000]) + err := ps.PutLeecher(bd.infoHashes[0], bd.peers[i%peersCount]) if err != nil { return err } - err = ps.GraduateLeecher(bd.infoHashes[0], bd.peers[i%1000]) + err = ps.GraduateLeecher(bd.infoHashes[0], bd.peers[i%peersCount]) if err != nil { return err } - return ps.DeleteSeeder(bd.infoHashes[0], bd.peers[i%1000]) + return ps.DeleteSeeder(bd.infoHashes[0], bd.peers[i%peersCount]) }) } @@ -343,15 +348,15 @@ func (bh *benchHolder) PutGradDelete1k(b *testing.B) { // PutGradDelete1kInfoHash can not run in parallel. func (bh *benchHolder) PutGradDelete1kInfoHash(b *testing.B) { bh.runBenchmark(b, false, nil, func(i int, ps storage.PeerStorage, bd *benchData) error { - err := ps.PutLeecher(bd.infoHashes[i%1000], bd.peers[0]) + err := ps.PutLeecher(bd.infoHashes[i%ihCount], bd.peers[0]) if err != nil { return err } - err = ps.GraduateLeecher(bd.infoHashes[i%1000], bd.peers[0]) + err = ps.GraduateLeecher(bd.infoHashes[i%ihCount], bd.peers[0]) if err != nil { return err } - return ps.DeleteSeeder(bd.infoHashes[i%1000], bd.peers[0]) + return ps.DeleteSeeder(bd.infoHashes[i%ihCount], bd.peers[0]) }) } @@ -361,27 +366,28 @@ func (bh *benchHolder) PutGradDelete1kInfoHash(b *testing.B) { // PutGradDelete1kInfoHash can not run in parallel. func (bh *benchHolder) PutGradDelete1kInfoHash1k(b *testing.B) { bh.runBenchmark(b, false, nil, func(i int, ps storage.PeerStorage, bd *benchData) error { - err := ps.PutLeecher(bd.infoHashes[i%1000], bd.peers[(i*3)%1000]) + err := ps.PutLeecher(bd.infoHashes[i%ihCount], bd.peers[(i*3)%peersCount]) if err != nil { return err } - err = ps.GraduateLeecher(bd.infoHashes[i%1000], bd.peers[(i*3)%1000]) + err = ps.GraduateLeecher(bd.infoHashes[i%ihCount], bd.peers[(i*3)%peersCount]) if err != nil { return err } - err = ps.DeleteSeeder(bd.infoHashes[i%1000], bd.peers[(i*3)%1000]) + err = ps.DeleteSeeder(bd.infoHashes[i%ihCount], bd.peers[(i*3)%peersCount]) return err }) } func putPeers(ps storage.PeerStorage, bd *benchData) error { - for i := 0; i < 1000; i++ { - for j := 0; j < 1000; j++ { + l := len(bd.peers) + for _, ih := range bd.infoHashes { + for i, peer := range bd.peers { var err error - if j < 1000/2 { - err = ps.PutLeecher(bd.infoHashes[i], bd.peers[j]) + if i < l/2 { + err = ps.PutLeecher(ih, peer) } else { - err = ps.PutSeeder(bd.infoHashes[i], bd.peers[j]) + err = ps.PutSeeder(ih, peer) } if err != nil { return err @@ -409,7 +415,7 @@ func (bh *benchHolder) AnnounceLeecher(b *testing.B) { // AnnounceLeecher1kInfoHash can run in parallel. func (bh *benchHolder) AnnounceLeecher1kInfoHash(b *testing.B) { bh.runBenchmark(b, true, putPeers, func(i int, ps storage.PeerStorage, bd *benchData) error { - _, err := ps.AnnouncePeers(bd.infoHashes[i%1000], false, 50, bd.peers[0]) + _, err := ps.AnnouncePeers(bd.infoHashes[i%ihCount], false, 50, bd.peers[0]) return err }) } @@ -431,7 +437,7 @@ func (bh *benchHolder) AnnounceSeeder(b *testing.B) { // AnnounceSeeder1kInfoHash can run in parallel. func (bh *benchHolder) AnnounceSeeder1kInfoHash(b *testing.B) { bh.runBenchmark(b, true, putPeers, func(i int, ps storage.PeerStorage, bd *benchData) error { - _, err := ps.AnnouncePeers(bd.infoHashes[i%1000], true, 50, bd.peers[0]) + _, err := ps.AnnouncePeers(bd.infoHashes[i%ihCount], true, 50, bd.peers[0]) return err }) } @@ -452,7 +458,7 @@ func (bh *benchHolder) ScrapeSwarm(b *testing.B) { // ScrapeSwarm1kInfoHash can run in parallel. func (bh *benchHolder) ScrapeSwarm1kInfoHash(b *testing.B) { bh.runBenchmark(b, true, putPeers, func(i int, ps storage.PeerStorage, bd *benchData) error { - ps.ScrapeSwarm(bd.infoHashes[i%1000], bd.peers[0]) + ps.ScrapeSwarm(bd.infoHashes[i%ihCount], bd.peers[0]) return nil }) } diff --git a/storage/test/storage_test_base.go b/storage/test/storage_test_base.go index 5b7dc77..3a2a9bd 100644 --- a/storage/test/storage_test_base.go +++ b/storage/test/storage_test_base.go @@ -68,10 +68,10 @@ func (th *testHolder) AnnouncePeers(t *testing.T) { func (th *testHolder) ScrapeSwarm(t *testing.T) { for _, c := range testData { - scrape := th.st.ScrapeSwarm(c.ih, c.peer) - require.Equal(t, uint32(0), scrape.Complete) - require.Equal(t, uint32(0), scrape.Incomplete) - require.Equal(t, uint32(0), scrape.Snatches) + l, s, n := th.st.ScrapeSwarm(c.ih, c.peer) + require.Equal(t, uint32(0), s) + require.Equal(t, uint32(0), l) + require.Equal(t, uint32(0), n) } } @@ -93,9 +93,9 @@ func (th *testHolder) LeecherPutAnnounceDeleteAnnounce(t *testing.T) { require.Nil(t, err) require.True(t, containsPeer(peers, c.peer)) - scrape := th.st.ScrapeSwarm(c.ih, c.peer) - require.Equal(t, uint32(2), scrape.Incomplete) - require.Equal(t, uint32(0), scrape.Complete) + l, s, _ := th.st.ScrapeSwarm(c.ih, c.peer) + require.Equal(t, uint32(2), l) + require.Equal(t, uint32(0), s) err = th.st.DeleteLeecher(c.ih, c.peer) require.Nil(t, err) @@ -123,9 +123,9 @@ func (th *testHolder) SeederPutAnnounceDeleteAnnounce(t *testing.T) { require.Nil(t, err) require.True(t, containsPeer(peers, c.peer)) - scrape := th.st.ScrapeSwarm(c.ih, c.peer) - require.Equal(t, uint32(1), scrape.Incomplete) - require.Equal(t, uint32(1), scrape.Complete) + l, s, _ := th.st.ScrapeSwarm(c.ih, c.peer) + require.Equal(t, uint32(1), l) + require.Equal(t, uint32(1), s) err = th.st.DeleteSeeder(c.ih, c.peer) require.Nil(t, err)