(wip) pass context from frontend request down to store

This commit is contained in:
Lawrence, Rendall
2022-10-31 18:57:57 +03:00
parent c3b9615f21
commit d6de38bdbd
13 changed files with 285 additions and 267 deletions

View File

@@ -169,6 +169,17 @@ func injectRouteParamsToContext(ctx context.Context, ps httprouter.Params) conte
return context.WithValue(ctx, bittorrent.RouteParamsKey, rp)
}
// remapRouteParamsToBgContext copies the bittorrent route parameters from the
// request-scoped context into a fresh, non-request context so that background
// work started after the HTTP response is written (AfterAnnounce/AfterScrape)
// is not cancelled when the handler returns and r.Context() is done.
func remapRouteParamsToBgContext(inCtx context.Context) context.Context {
	rp, isOk := inCtx.Value(bittorrent.RouteParamsKey).(bittorrent.RouteParams)
	if !isOk {
		// BUG FIX: the warning belongs on the failure path — the original
		// logged it when the assertion succeeded and stayed silent on failure.
		logger.Warn().Msg("unable to fetch route parameters, probably jammed context")
		rp = bittorrent.RouteParams{}
	}
	// FIXME: cancelable context
	return context.WithValue(context.TODO(), bittorrent.RouteParamsKey, rp)
}
// announceRoute parses and responds to an Announce.
func (f *httpFE) announceRoute(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
var err error
@@ -189,7 +200,7 @@ func (f *httpFE) announceRoute(w http.ResponseWriter, r *http.Request, ps httpro
}
addr = req.GetFirst()
ctx := injectRouteParamsToContext(context.Background(), ps)
ctx := injectRouteParamsToContext(r.Context(), ps)
ctx, resp, err := f.logic.HandleAnnounce(ctx, req)
if err != nil {
WriteError(w, err)
@@ -203,6 +214,9 @@ func (f *httpFE) announceRoute(w http.ResponseWriter, r *http.Request, ps httpro
return
}
// next actions are background and should not be canceled after http writer closed
ctx = remapRouteParamsToBgContext(ctx)
go f.logic.AfterAnnounce(ctx, req, resp)
}
@@ -225,7 +239,7 @@ func (f *httpFE) scrapeRoute(w http.ResponseWriter, r *http.Request, ps httprout
}
addr = req.GetFirst()
ctx := injectRouteParamsToContext(context.Background(), ps)
ctx := injectRouteParamsToContext(r.Context(), ps)
ctx, resp, err := f.logic.HandleScrape(ctx, req)
if err != nil {
WriteError(w, err)
@@ -239,18 +253,27 @@ func (f *httpFE) scrapeRoute(w http.ResponseWriter, r *http.Request, ps httprout
return
}
// next actions are background and should not be canceled after http writer closed
ctx = remapRouteParamsToBgContext(ctx)
go f.logic.AfterScrape(ctx, req, resp)
}
func (f *httpFE) ping(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
var err error
status := http.StatusOK
ctx := r.Context()
if r.Method == http.MethodGet {
err = f.logic.Ping(context.TODO())
err = f.logic.Ping(ctx)
}
if err == nil {
w.WriteHeader(http.StatusOK)
} else {
if err != nil {
logger.Error().Err(err).Msg("ping completed with error")
w.WriteHeader(http.StatusServiceUnavailable)
status = http.StatusServiceUnavailable
}
if ctxErr := ctx.Err(); ctxErr == nil {
w.WriteHeader(status)
} else {
logger.Info().Err(ctxErr).Str("ip", r.RemoteAddr).Msg("ping request cancelled")
}
}

View File

@@ -46,17 +46,17 @@ func (h *swarmInteractionHook) HandleAnnounce(ctx context.Context, req *bittorre
return
}
var storeFn func(bittorrent.InfoHash, bittorrent.Peer) error
var storeFn func(context.Context, bittorrent.InfoHash, bittorrent.Peer) error
switch {
case req.Event == bittorrent.Stopped:
storeFn = func(hash bittorrent.InfoHash, peer bittorrent.Peer) error {
err = h.store.DeleteSeeder(hash, peer)
storeFn = func(ctx context.Context, hash bittorrent.InfoHash, peer bittorrent.Peer) error {
err = h.store.DeleteSeeder(ctx, hash, peer)
if err != nil && !errors.Is(err, storage.ErrResourceDoesNotExist) {
return err
}
err = h.store.DeleteLeecher(hash, peer)
err = h.store.DeleteLeecher(ctx, hash, peer)
if err != nil && !errors.Is(err, storage.ErrResourceDoesNotExist) {
return err
}
@@ -74,8 +74,8 @@ func (h *swarmInteractionHook) HandleAnnounce(ctx context.Context, req *bittorre
storeFn = h.store.PutLeecher
}
for _, p := range req.Peers() {
if err = storeFn(req.InfoHash, p); err == nil && len(req.InfoHash) == bittorrent.InfoHashV2Len {
err = storeFn(req.InfoHash.TruncateV1(), p)
if err = storeFn(ctx, req.InfoHash, p); err == nil && len(req.InfoHash) == bittorrent.InfoHashV2Len {
err = storeFn(ctx, req.InfoHash.TruncateV1(), p)
}
if err != nil {
break
@@ -102,10 +102,10 @@ type responseHook struct {
store storage.PeerStorage
}
func (h *responseHook) scrape(ih bittorrent.InfoHash) (leechers uint32, seeders uint32, snatched uint32) {
leechers, seeders, snatched = h.store.ScrapeSwarm(ih)
func (h *responseHook) scrape(ctx context.Context, ih bittorrent.InfoHash) (leechers uint32, seeders uint32, snatched uint32) {
leechers, seeders, snatched = h.store.ScrapeSwarm(ctx, ih)
if len(ih) == bittorrent.InfoHashV2Len {
l, s, n := h.store.ScrapeSwarm(ih.TruncateV1())
l, s, n := h.store.ScrapeSwarm(ctx, ih.TruncateV1())
leechers, seeders, snatched = leechers+l, seeders+s, snatched+n
}
return
@@ -117,9 +117,9 @@ func (h *responseHook) HandleAnnounce(ctx context.Context, req *bittorrent.Annou
}
// Add the Scrape data to the response.
resp.Incomplete, resp.Complete, _ = h.scrape(req.InfoHash)
resp.Incomplete, resp.Complete, _ = h.scrape(ctx, req.InfoHash)
err = h.appendPeers(req, resp)
err = h.appendPeers(ctx, req, resp)
return ctx, err
}
@@ -128,7 +128,7 @@ type fetchArgs struct {
v6 bool
}
func (h *responseHook) appendPeers(req *bittorrent.AnnounceRequest, resp *bittorrent.AnnounceResponse) (err error) {
func (h *responseHook) appendPeers(ctx context.Context, req *bittorrent.AnnounceRequest, resp *bittorrent.AnnounceResponse) (err error) {
seeding := req.Left == 0
max := int(req.NumWant)
peers := make([]bittorrent.Peer, 0, len(resp.IPv4Peers)+len(resp.IPv6Peers))
@@ -159,7 +159,7 @@ func (h *responseHook) appendPeers(req *bittorrent.AnnounceRequest, resp *bittor
break
}
var storePeers []bittorrent.Peer
storePeers, err = h.store.AnnouncePeers(a.ih, seeding, max, a.v6)
storePeers, err = h.store.AnnouncePeers(ctx, a.ih, seeding, max, a.v6)
if err != nil && !errors.Is(err, storage.ErrResourceDoesNotExist) {
return err
}
@@ -209,13 +209,13 @@ func (h *responseHook) HandleScrape(ctx context.Context, req *bittorrent.ScrapeR
for _, infoHash := range req.InfoHashes {
scr := bittorrent.Scrape{InfoHash: infoHash}
scr.Incomplete, scr.Complete, scr.Snatches = h.scrape(infoHash)
scr.Incomplete, scr.Complete, scr.Snatches = h.scrape(ctx, infoHash)
resp.Files = append(resp.Files, scr)
}
return ctx, nil
}
func (h *responseHook) Ping(_ context.Context) error {
return h.store.Ping()
func (h *responseHook) Ping(ctx context.Context) error {
return h.store.Ping(ctx)
}

View File

@@ -3,6 +3,7 @@
package container
import (
"context"
"errors"
"sync"
@@ -41,7 +42,7 @@ func Register(n string, c Builder) {
// Container holds InfoHash and checks if value approved or not
type Container interface {
Approved(bittorrent.InfoHash) bool
Approved(context.Context, bittorrent.InfoHash) bool
}
// GetContainer creates Container by its name and provided confBytes

View File

@@ -5,6 +5,7 @@
package directory
import (
"context"
"fmt"
"github.com/anacrolix/torrent/metainfo"
@@ -85,28 +86,23 @@ func build(conf conf.MapConfig, st storage.DataStorage) (container.Container, er
name = list.DUMMY
}
bName := []byte(name)
logger.Err(d.Storage.Put(d.StorageCtx,
storage.Entry{
Key: event.InfoHash.AsString(),
Value: bName,
}, storage.Entry{
Key: v2hash.RawString(),
Value: bName,
}, storage.Entry{
Key: v2hash.TruncateV1().RawString(),
Value: bName,
})).
logger.Err(d.Storage.Put(context.Background(), d.StorageCtx, storage.Entry{
Key: event.InfoHash.AsString(),
Value: bName,
}, storage.Entry{
Key: v2hash.RawString(),
Value: bName,
}, storage.Entry{
Key: v2hash.TruncateV1().RawString(),
Value: bName,
})).
Str("action", "add").
Str("file", event.TorrentFilePath).
Stringer("infoHash", event.InfoHash).
Stringer("infoHashV2", v2hash).
Msg("approval torrent watcher event")
case dirwatch.Removed:
logger.Err(d.Storage.Delete(c.StorageCtx,
event.InfoHash.AsString(),
v2hash.RawString(),
v2hash.TruncateV1().RawString(),
)).
logger.Err(d.Storage.Delete(context.Background(), c.StorageCtx, event.InfoHash.AsString(), v2hash.RawString(), v2hash.TruncateV1().RawString())).
Str("action", "delete").
Str("file", event.TorrentFilePath).
Stringer("infoHash", event.InfoHash).

View File

@@ -3,6 +3,7 @@
package list
import (
"context"
"fmt"
"github.com/sot-tech/mochi/bittorrent"
@@ -64,7 +65,7 @@ func build(conf conf.MapConfig, st storage.DataStorage) (container.Container, er
init = append(init, storage.Entry{Key: ih.TruncateV1().RawString(), Value: []byte(DUMMY)})
}
}
if err := l.Storage.Put(l.StorageCtx, init...); err != nil {
if err := l.Storage.Put(context.Background(), l.StorageCtx, init...); err != nil {
return nil, fmt.Errorf("unable to put initial data: %w", err)
}
}
@@ -84,11 +85,11 @@ type List struct {
// Approved checks if specified hash is approved or not.
// If List.Invert set to true and hash found in storage, function will return false,
// that means that hash is blacklisted.
func (l *List) Approved(hash bittorrent.InfoHash) (contains bool) {
func (l *List) Approved(ctx context.Context, hash bittorrent.InfoHash) (contains bool) {
var err error
if contains, err = l.Storage.Contains(l.StorageCtx, hash.RawString()); err == nil {
if contains, err = l.Storage.Contains(ctx, l.StorageCtx, hash.RawString()); err == nil {
if len(hash) == bittorrent.InfoHashV2Len {
if containsV2, errV2 := l.Storage.Contains(l.StorageCtx, hash.TruncateV1().RawString()); err == nil {
if containsV2, errV2 := l.Storage.Contains(ctx, l.StorageCtx, hash.TruncateV1().RawString()); err == nil {
contains = contains || containsV2
} else {
err = errV2

View File

@@ -74,7 +74,7 @@ type hook struct {
func (h *hook) HandleAnnounce(ctx context.Context, req *bittorrent.AnnounceRequest, _ *bittorrent.AnnounceResponse) (context.Context, error) {
var err error
if !h.hashContainer.Approved(req.InfoHash) {
if !h.hashContainer.Approved(ctx, req.InfoHash) {
err = ErrTorrentUnapproved
}

View File

@@ -93,27 +93,23 @@ type store struct {
peerTTL uint
}
func (s *store) setPeerTTL(infoHashKey, peerID string) error {
return s.Process(context.TODO(), redis.NewCmd(context.TODO(), expireMemberCmd, infoHashKey, peerID, s.peerTTL))
}
func (s *store) addPeer(infoHashKey, peerID string) (err error) {
func (s *store) addPeer(ctx context.Context, infoHashKey, peerID string) (err error) {
logger.Trace().
Str("infoHashKey", infoHashKey).
Str("peerID", peerID).
Msg("add peer")
if err = s.SAdd(context.TODO(), infoHashKey, peerID).Err(); err == nil {
err = s.setPeerTTL(infoHashKey, peerID)
if err = s.SAdd(ctx, infoHashKey, peerID).Err(); err == nil {
err = s.Process(ctx, redis.NewCmd(ctx, expireMemberCmd, infoHashKey, peerID, s.peerTTL))
}
return
}
func (s *store) delPeer(infoHashKey, peerID string) error {
func (s *store) delPeer(ctx context.Context, infoHashKey, peerID string) error {
logger.Trace().
Str("infoHashKey", infoHashKey).
Str("peerID", peerID).
Msg("del peer")
deleted, err := s.SRem(context.TODO(), infoHashKey, peerID).Uint64()
deleted, err := s.SRem(ctx, infoHashKey, peerID).Uint64()
err = r.AsNil(err)
if err == nil && deleted == 0 {
err = storage.ErrResourceDoesNotExist
@@ -122,23 +118,23 @@ func (s *store) delPeer(infoHashKey, peerID string) error {
return err
}
func (s *store) PutSeeder(ih bittorrent.InfoHash, peer bittorrent.Peer) error {
return s.addPeer(r.InfoHashKey(ih.RawString(), true, peer.Addr().Is6()), peer.RawString())
func (s *store) PutSeeder(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error {
return s.addPeer(ctx, r.InfoHashKey(ih.RawString(), true, peer.Addr().Is6()), peer.RawString())
}
func (s *store) DeleteSeeder(ih bittorrent.InfoHash, peer bittorrent.Peer) error {
return s.delPeer(r.InfoHashKey(ih.RawString(), true, peer.Addr().Is6()), peer.RawString())
func (s *store) DeleteSeeder(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error {
return s.delPeer(ctx, r.InfoHashKey(ih.RawString(), true, peer.Addr().Is6()), peer.RawString())
}
func (s *store) PutLeecher(ih bittorrent.InfoHash, peer bittorrent.Peer) error {
return s.addPeer(r.InfoHashKey(ih.RawString(), false, peer.Addr().Is6()), peer.RawString())
func (s *store) PutLeecher(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error {
return s.addPeer(ctx, r.InfoHashKey(ih.RawString(), false, peer.Addr().Is6()), peer.RawString())
}
func (s *store) DeleteLeecher(ih bittorrent.InfoHash, peer bittorrent.Peer) error {
return s.delPeer(r.InfoHashKey(ih.RawString(), false, peer.Addr().Is6()), peer.RawString())
func (s *store) DeleteLeecher(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error {
return s.delPeer(ctx, r.InfoHashKey(ih.RawString(), false, peer.Addr().Is6()), peer.RawString())
}
func (s *store) GraduateLeecher(ih bittorrent.InfoHash, peer bittorrent.Peer) (err error) {
func (s *store) GraduateLeecher(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) (err error) {
logger.Trace().
Stringer("infoHash", ih).
Object("peer", peer).
@@ -147,21 +143,21 @@ func (s *store) GraduateLeecher(ih bittorrent.InfoHash, peer bittorrent.Peer) (e
ihSeederKey := r.InfoHashKey(infoHash, true, peer.Addr().Is6())
ihLeecherKey := r.InfoHashKey(infoHash, false, peer.Addr().Is6())
var moved bool
if moved, err = s.SMove(context.TODO(), ihLeecherKey, ihSeederKey, peerID).Result(); err == nil {
if moved, err = s.SMove(ctx, ihLeecherKey, ihSeederKey, peerID).Result(); err == nil {
if moved {
err = s.setPeerTTL(ihSeederKey, peerID)
err = s.Process(ctx, redis.NewCmd(ctx, expireMemberCmd, ihSeederKey, peerID, s.peerTTL))
} else {
err = s.addPeer(ihSeederKey, peerID)
err = s.addPeer(ctx, ihSeederKey, peerID)
}
if err == nil {
err = s.HIncrBy(context.TODO(), r.CountDownloadsKey, infoHash, 1).Err()
err = s.HIncrBy(ctx, r.CountDownloadsKey, infoHash, 1).Err()
}
}
return err
}
// AnnouncePeers is the same function as redis.AnnouncePeers
func (s *store) AnnouncePeers(ih bittorrent.InfoHash, forSeeder bool, numWant int, v6 bool) ([]bittorrent.Peer, error) {
func (s *store) AnnouncePeers(ctx context.Context, ih bittorrent.InfoHash, forSeeder bool, numWant int, v6 bool) ([]bittorrent.Peer, error) {
logger.Trace().
Stringer("infoHash", ih).
Bool("forSeeder", forSeeder).
@@ -169,17 +165,17 @@ func (s *store) AnnouncePeers(ih bittorrent.InfoHash, forSeeder bool, numWant in
Bool("v6", v6).
Msg("announce peers")
return s.GetPeers(ih, forSeeder, numWant, v6, func(ctx context.Context, infoHashKey string, maxCount int) *redis.StringSliceCmd {
return s.SRandMemberN(context.TODO(), infoHashKey, int64(maxCount))
return s.GetPeers(ih, forSeeder, numWant, v6, func(infoHashKey string, maxCount int) *redis.StringSliceCmd {
return s.SRandMemberN(ctx, infoHashKey, int64(maxCount))
})
}
// ScrapeSwarm is the same function as redis.ScrapeSwarm except `SCard` call instead of `HLen`
func (s *store) ScrapeSwarm(ih bittorrent.InfoHash) (uint32, uint32, uint32) {
func (s *store) ScrapeSwarm(ctx context.Context, ih bittorrent.InfoHash) (uint32, uint32, uint32) {
logger.Trace().
Stringer("infoHash", ih).
Msg("scrape swarm")
return s.ScrapeIH(ih, s.SCard)
return s.ScrapeIH(ctx, ih, s.SCard)
}
func (*store) GCAware() bool {

View File

@@ -3,6 +3,7 @@
package memory
import (
"context"
"encoding/binary"
"math"
"runtime"
@@ -174,7 +175,7 @@ func (ps *peerStore) shardIndex(infoHash bittorrent.InfoHash, v6 bool) uint32 {
return idx
}
func (ps *peerStore) PutSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) error {
func (ps *peerStore) PutSeeder(_ context.Context, ih bittorrent.InfoHash, p bittorrent.Peer) error {
select {
case <-ps.closed:
panic("attempted to interact with stopped memory store")
@@ -207,7 +208,7 @@ func (ps *peerStore) PutSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) error
return nil
}
func (ps *peerStore) DeleteSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) error {
func (ps *peerStore) DeleteSeeder(_ context.Context, ih bittorrent.InfoHash, p bittorrent.Peer) error {
select {
case <-ps.closed:
panic("attempted to interact with stopped memory store")
@@ -240,7 +241,7 @@ func (ps *peerStore) DeleteSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) err
return nil
}
func (ps *peerStore) PutLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error {
func (ps *peerStore) PutLeecher(_ context.Context, ih bittorrent.InfoHash, p bittorrent.Peer) error {
select {
case <-ps.closed:
panic("attempted to interact with stopped memory store")
@@ -273,7 +274,7 @@ func (ps *peerStore) PutLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error
return nil
}
func (ps *peerStore) DeleteLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error {
func (ps *peerStore) DeleteLeecher(_ context.Context, ih bittorrent.InfoHash, p bittorrent.Peer) error {
select {
case <-ps.closed:
panic("attempted to interact with stopped memory store")
@@ -306,7 +307,7 @@ func (ps *peerStore) DeleteLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) er
return nil
}
func (ps *peerStore) GraduateLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error {
func (ps *peerStore) GraduateLeecher(_ context.Context, ih bittorrent.InfoHash, p bittorrent.Peer) error {
select {
case <-ps.closed:
panic("attempted to interact with stopped memory store")
@@ -372,7 +373,7 @@ func (ps *peerStore) getPeers(shard *peerShard, ih bittorrent.InfoHash, maxCount
return
}
func (ps *peerStore) AnnouncePeers(ih bittorrent.InfoHash, forSeeder bool, numWant int, v6 bool) (peers []bittorrent.Peer, err error) {
func (ps *peerStore) AnnouncePeers(_ context.Context, ih bittorrent.InfoHash, forSeeder bool, numWant int, v6 bool) (peers []bittorrent.Peer, err error) {
select {
case <-ps.closed:
panic("attempted to interact with stopped memory store")
@@ -401,7 +402,7 @@ func (ps *peerStore) countPeers(ih bittorrent.InfoHash, v6 bool) (leechers, seed
return
}
func (ps *peerStore) ScrapeSwarm(ih bittorrent.InfoHash) (leechers uint32, seeders uint32, snatched uint32) {
func (ps *peerStore) ScrapeSwarm(_ context.Context, ih bittorrent.InfoHash) (leechers uint32, seeders uint32, snatched uint32) {
select {
case <-ps.closed:
panic("attempted to interact with stopped memory store")
@@ -427,7 +428,7 @@ type dataStore struct {
sync.Map
}
func (ds *dataStore) Put(ctx string, values ...storage.Entry) error {
func (ds *dataStore) Put(_ context.Context, ctx string, values ...storage.Entry) error {
if len(values) > 0 {
c, _ := ds.LoadOrStore(ctx, new(sync.Map))
m := c.(*sync.Map)
@@ -438,7 +439,7 @@ func (ds *dataStore) Put(ctx string, values ...storage.Entry) error {
return nil
}
func (ds *dataStore) Contains(ctx string, key string) (bool, error) {
func (ds *dataStore) Contains(_ context.Context, ctx string, key string) (bool, error) {
var exist bool
if m, found := ds.Map.Load(ctx); found {
_, exist = m.(*sync.Map).Load(key)
@@ -446,7 +447,7 @@ func (ds *dataStore) Contains(ctx string, key string) (bool, error) {
return exist, nil
}
func (ds *dataStore) Load(ctx string, key string) (out []byte, _ error) {
func (ds *dataStore) Load(_ context.Context, ctx string, key string) (out []byte, _ error) {
if m, found := ds.Map.Load(ctx); found {
if v, _ := m.(*sync.Map).Load(key); v != nil {
out = v.([]byte)
@@ -455,7 +456,7 @@ func (ds *dataStore) Load(ctx string, key string) (out []byte, _ error) {
return
}
func (ds *dataStore) Delete(ctx string, keys ...string) error {
func (ds *dataStore) Delete(_ context.Context, ctx string, keys ...string) error {
if len(keys) > 0 {
if m, found := ds.Map.Load(ctx); found {
m := m.(*sync.Map)
@@ -537,7 +538,7 @@ func (ps *peerStore) gc(cutoff time.Time) {
}
}
func (*peerStore) Ping() error {
func (*peerStore) Ping(context.Context) error {
return nil
}

View File

@@ -222,7 +222,7 @@ type store struct {
func (s *store) txBatch(ctx context.Context, batch *pgx.Batch) (err error) {
var tx pgx.Tx
if tx, err = s.Begin(ctx); err == nil {
if err = tx.SendBatch(context.TODO(), batch).Close(); err == nil {
if err = tx.SendBatch(ctx, batch).Close(); err == nil {
err = tx.Commit(ctx)
} else {
if txErr := tx.Rollback(ctx); txErr != nil {
@@ -233,25 +233,25 @@ func (s *store) txBatch(ctx context.Context, batch *pgx.Batch) (err error) {
return
}
func (s *store) Put(ctx string, values ...storage.Entry) (err error) {
func (s *store) Put(ctx context.Context, storeCtx string, values ...storage.Entry) (err error) {
switch len(values) {
case 0:
// ignore
case 1:
_, err = s.Exec(context.TODO(), s.Data.AddQuery, pgx.NamedArgs{pCtx: ctx, pKey: []byte(values[0].Key), pValue: values[0].Value})
_, err = s.Exec(ctx, s.Data.AddQuery, pgx.NamedArgs{pCtx: storeCtx, pKey: []byte(values[0].Key), pValue: values[0].Value})
default:
var batch pgx.Batch
for _, v := range values {
batch.Queue(s.Data.AddQuery, pgx.NamedArgs{pCtx: ctx, pKey: []byte(v.Key), pValue: v.Value})
}
err = s.txBatch(context.TODO(), &batch)
err = s.txBatch(ctx, &batch)
}
return
}
func (s *store) Contains(ctx string, key string) (contains bool, err error) {
func (s *store) Contains(ctx context.Context, storeCtx string, key string) (contains bool, err error) {
var rows pgx.Rows
if rows, err = s.Query(context.TODO(), s.Data.GetQuery, pgx.NamedArgs{pCtx: ctx, pKey: []byte(key)}); err == nil {
if rows, err = s.Query(ctx, s.Data.GetQuery, pgx.NamedArgs{pCtx: storeCtx, pKey: []byte(key)}); err == nil {
defer rows.Close()
contains = rows.Next()
err = rows.Err()
@@ -259,20 +259,20 @@ func (s *store) Contains(ctx string, key string) (contains bool, err error) {
return
}
func (s *store) Load(ctx string, key string) (out []byte, err error) {
if err = s.QueryRow(context.TODO(), s.Data.GetQuery, pgx.NamedArgs{pCtx: ctx, pKey: []byte(key)}).Scan(&out); errors.Is(err, pgx.ErrNoRows) {
func (s *store) Load(ctx context.Context, storeCtx string, key string) (out []byte, err error) {
if err = s.QueryRow(ctx, s.Data.GetQuery, pgx.NamedArgs{pCtx: storeCtx, pKey: []byte(key)}).Scan(&out); errors.Is(err, pgx.ErrNoRows) {
err = nil
}
return
}
func (s *store) Delete(ctx string, keys ...string) (err error) {
func (s *store) Delete(ctx context.Context, storeCtx string, keys ...string) (err error) {
if len(keys) > 0 {
baKeys := make([][]byte, len(keys))
for i, k := range keys {
baKeys[i] = []byte(k)
}
_, err = s.Exec(context.TODO(), s.Data.DelQuery, pgx.NamedArgs{pCtx: ctx, pKey: baKeys})
_, err = s.Exec(ctx, s.Data.DelQuery, pgx.NamedArgs{pCtx: storeCtx, pKey: baKeys})
}
return
}
@@ -328,7 +328,7 @@ func (s *store) ScheduleStatisticsCollection(reportInterval time.Duration) {
case <-t.C:
if metrics.Enabled() {
before := time.Now()
sc, lc := s.countPeers(nil)
sc, lc := s.countPeers(context.Background(), nil)
var hc int
if err := s.QueryRow(context.Background(), s.InfoHashCountQuery).Scan(&hc); err != nil && !errors.Is(err, pgx.ErrNoRows) {
logger.Error().Err(err).Msg("error occurred while get info hash count")
@@ -344,7 +344,7 @@ func (s *store) ScheduleStatisticsCollection(reportInterval time.Duration) {
}()
}
func (s *store) putPeer(ih bittorrent.InfoHash, peer bittorrent.Peer, seeder bool) (err error) {
func (s *store) putPeer(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer, seeder bool) (err error) {
logger.Trace().
Stringer("infoHash", ih).
Object("peer", peer).
@@ -361,16 +361,16 @@ func (s *store) putPeer(ih bittorrent.InfoHash, peer bittorrent.Peer, seeder boo
if s.GCAware() {
args[pCreated] = timecache.Now()
}
_, err = s.Exec(context.TODO(), s.Peer.AddQuery, args)
_, err = s.Exec(ctx, s.Peer.AddQuery, args)
return
}
func (s *store) delPeer(ih bittorrent.InfoHash, peer bittorrent.Peer, seeder bool) (err error) {
func (s *store) delPeer(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer, seeder bool) (err error) {
logger.Trace().
Stringer("infoHash", ih).
Object("peer", peer).
Msg("del peer")
_, err = s.Exec(context.TODO(), s.Peer.DelQuery, pgx.NamedArgs{
_, err = s.Exec(ctx, s.Peer.DelQuery, pgx.NamedArgs{
pInfoHash: []byte(ih),
pPeerID: peer.ID[:],
pAddress: net.IP(peer.Addr().AsSlice()),
@@ -380,23 +380,23 @@ func (s *store) delPeer(ih bittorrent.InfoHash, peer bittorrent.Peer, seeder boo
return
}
func (s *store) PutSeeder(ih bittorrent.InfoHash, peer bittorrent.Peer) error {
return s.putPeer(ih, peer, true)
func (s *store) PutSeeder(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error {
return s.putPeer(ctx, ih, peer, true)
}
func (s *store) DeleteSeeder(ih bittorrent.InfoHash, peer bittorrent.Peer) error {
return s.delPeer(ih, peer, true)
func (s *store) DeleteSeeder(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error {
return s.delPeer(ctx, ih, peer, true)
}
func (s *store) PutLeecher(ih bittorrent.InfoHash, peer bittorrent.Peer) error {
return s.putPeer(ih, peer, false)
func (s *store) PutLeecher(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error {
return s.putPeer(ctx, ih, peer, false)
}
func (s *store) DeleteLeecher(ih bittorrent.InfoHash, peer bittorrent.Peer) error {
return s.delPeer(ih, peer, false)
func (s *store) DeleteLeecher(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error {
return s.delPeer(ctx, ih, peer, false)
}
func (s *store) GraduateLeecher(ih bittorrent.InfoHash, peer bittorrent.Peer) error {
func (s *store) GraduateLeecher(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error {
logger.Trace().
Stringer("infoHash", ih).
Object("peer", peer).
@@ -410,12 +410,12 @@ func (s *store) GraduateLeecher(ih bittorrent.InfoHash, peer bittorrent.Peer) er
pPort: peer.Port(),
})
batch.Queue(s.Downloads.IncrementQuery, pgx.NamedArgs{pInfoHash: ihb})
return s.txBatch(context.TODO(), &batch)
return s.txBatch(ctx, &batch)
}
func (s *store) getPeers(ih bittorrent.InfoHash, seeders bool, maxCount int, isV6 bool) (peers []bittorrent.Peer, err error) {
func (s *store) getPeers(ctx context.Context, ih bittorrent.InfoHash, seeders bool, maxCount int, isV6 bool) (peers []bittorrent.Peer, err error) {
var rows pgx.Rows
if rows, err = s.Query(context.TODO(), s.Announce.Query, pgx.NamedArgs{
if rows, err = s.Query(ctx, s.Announce.Query, pgx.NamedArgs{
pInfoHash: []byte(ih),
pSeeder: seeders,
pV6: isV6,
@@ -484,7 +484,7 @@ func (s *store) getPeers(ih bittorrent.InfoHash, seeders bool, maxCount int, isV
return
}
func (s *store) AnnouncePeers(ih bittorrent.InfoHash, forSeeder bool, numWant int, v6 bool) (peers []bittorrent.Peer, err error) {
func (s *store) AnnouncePeers(ctx context.Context, ih bittorrent.InfoHash, forSeeder bool, numWant int, v6 bool) (peers []bittorrent.Peer, err error) {
logger.Trace().
Stringer("infoHash", ih).
Bool("forSeeder", forSeeder).
@@ -492,11 +492,11 @@ func (s *store) AnnouncePeers(ih bittorrent.InfoHash, forSeeder bool, numWant in
Bool("v6", v6).
Msg("announce peers")
if forSeeder {
peers, err = s.getPeers(ih, false, numWant, v6)
peers, err = s.getPeers(ctx, ih, false, numWant, v6)
} else {
if peers, err = s.getPeers(ih, true, numWant, v6); err == nil {
if peers, err = s.getPeers(ctx, ih, true, numWant, v6); err == nil {
var addPeers []bittorrent.Peer
addPeers, err = s.getPeers(ih, false, numWant-len(peers), v6)
addPeers, err = s.getPeers(ctx, ih, false, numWant-len(peers), v6)
peers = append(peers, addPeers...)
}
}
@@ -513,13 +513,13 @@ func (s *store) AnnouncePeers(ih bittorrent.InfoHash, forSeeder bool, numWant in
return
}
func (s *store) countPeers(ih []byte) (seeders uint32, leechers uint32) {
func (s *store) countPeers(ctx context.Context, ih []byte) (seeders uint32, leechers uint32) {
var rows pgx.Rows
var err error
if len(ih) == 0 {
rows, err = s.Query(context.TODO(), s.Peer.CountQuery)
rows, err = s.Query(ctx, s.Peer.CountQuery)
} else {
rows, err = s.Query(context.TODO(), s.Peer.CountQuery+" "+s.Peer.ByInfoHashClause, pgx.NamedArgs{pInfoHash: ih})
rows, err = s.Query(ctx, s.Peer.CountQuery+" "+s.Peer.ByInfoHashClause, pgx.NamedArgs{pInfoHash: ih})
}
if err == nil {
defer rows.Close()
@@ -556,14 +556,14 @@ func (s *store) countPeers(ih []byte) (seeders uint32, leechers uint32) {
return
}
func (s *store) ScrapeSwarm(ih bittorrent.InfoHash) (leechers uint32, seeders uint32, snatched uint32) {
func (s *store) ScrapeSwarm(ctx context.Context, ih bittorrent.InfoHash) (leechers uint32, seeders uint32, snatched uint32) {
logger.Trace().
Stringer("infoHash", ih).
Msg("scrape swarm")
ihb := []byte(ih)
seeders, leechers = s.countPeers(ihb)
seeders, leechers = s.countPeers(ctx, ihb)
if len(s.Downloads.GetQuery) > 0 {
if err := s.QueryRow(context.TODO(), s.Downloads.GetQuery, pgx.NamedArgs{pInfoHash: ihb}).Scan(&snatched); err != nil && !errors.Is(err, pgx.ErrNoRows) {
if err := s.QueryRow(ctx, s.Downloads.GetQuery, pgx.NamedArgs{pInfoHash: ihb}).Scan(&snatched); err != nil && !errors.Is(err, pgx.ErrNoRows) {
logger.Error().Stringer("infoHash", ih).Err(err).Msg("error occurred while get info downloads count")
}
}
@@ -571,8 +571,8 @@ func (s *store) ScrapeSwarm(ih bittorrent.InfoHash) (leechers uint32, seeders ui
return
}
func (s *store) Ping() error {
_, err := s.Exec(context.TODO(), s.PingQuery)
func (s *store) Ping(ctx context.Context) error {
_, err := s.Exec(ctx, s.PingQuery)
return err
}

View File

@@ -309,8 +309,8 @@ func (ps *store) getClock() int64 {
return timecache.NowUnixNano()
}
func (ps *store) tx(txf func(tx redis.Pipeliner) error) (err error) {
if pipe, txErr := ps.TxPipelined(context.TODO(), txf); txErr == nil {
func (ps *store) tx(ctx context.Context, txf func(tx redis.Pipeliner) error) (err error) {
if pipe, txErr := ps.TxPipelined(ctx, txf); txErr == nil {
errs := make([]string, 0)
for _, c := range pipe {
if err := c.Err(); err != nil {
@@ -358,58 +358,58 @@ func InfoHashKey(infoHash string, seeder, v6 bool) (infoHashKey string) {
return
}
func (ps *store) putPeer(infoHashKey, peerCountKey, peerID string) error {
func (ps *store) putPeer(ctx context.Context, infoHashKey, peerCountKey, peerID string) error {
logger.Trace().
Str("infoHashKey", infoHashKey).
Str("peerID", peerID).
Msg("put peer")
return ps.tx(func(tx redis.Pipeliner) (err error) {
if err = tx.HSet(context.TODO(), infoHashKey, peerID, ps.getClock()).Err(); err != nil {
return ps.tx(ctx, func(tx redis.Pipeliner) (err error) {
if err = tx.HSet(ctx, infoHashKey, peerID, ps.getClock()).Err(); err != nil {
return
}
if err = tx.Incr(context.TODO(), peerCountKey).Err(); err != nil {
if err = tx.Incr(ctx, peerCountKey).Err(); err != nil {
return
}
err = tx.SAdd(context.TODO(), IHKey, infoHashKey).Err()
err = tx.SAdd(ctx, IHKey, infoHashKey).Err()
return
})
}
func (ps *store) delPeer(infoHashKey, peerCountKey, peerID string) error {
func (ps *store) delPeer(ctx context.Context, infoHashKey, peerCountKey, peerID string) error {
logger.Trace().
Str("infoHashKey", infoHashKey).
Str("peerID", peerID).
Msg("del peer")
deleted, err := ps.HDel(context.TODO(), infoHashKey, peerID).Uint64()
deleted, err := ps.HDel(ctx, infoHashKey, peerID).Uint64()
err = AsNil(err)
if err == nil {
if deleted == 0 {
err = storage.ErrResourceDoesNotExist
} else {
err = ps.Decr(context.TODO(), peerCountKey).Err()
err = ps.Decr(ctx, peerCountKey).Err()
}
}
return err
}
func (ps *store) PutSeeder(ih bittorrent.InfoHash, peer bittorrent.Peer) error {
return ps.putPeer(InfoHashKey(ih.RawString(), true, peer.Addr().Is6()), CountSeederKey, peer.RawString())
func (ps *store) PutSeeder(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error {
return ps.putPeer(ctx, InfoHashKey(ih.RawString(), true, peer.Addr().Is6()), CountSeederKey, peer.RawString())
}
func (ps *store) DeleteSeeder(ih bittorrent.InfoHash, peer bittorrent.Peer) error {
return ps.delPeer(InfoHashKey(ih.RawString(), true, peer.Addr().Is6()), CountSeederKey, peer.RawString())
func (ps *store) DeleteSeeder(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error {
return ps.delPeer(ctx, InfoHashKey(ih.RawString(), true, peer.Addr().Is6()), CountSeederKey, peer.RawString())
}
func (ps *store) PutLeecher(ih bittorrent.InfoHash, peer bittorrent.Peer) error {
return ps.putPeer(InfoHashKey(ih.RawString(), false, peer.Addr().Is6()), CountLeecherKey, peer.RawString())
func (ps *store) PutLeecher(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error {
return ps.putPeer(ctx, InfoHashKey(ih.RawString(), false, peer.Addr().Is6()), CountLeecherKey, peer.RawString())
}
func (ps *store) DeleteLeecher(ih bittorrent.InfoHash, peer bittorrent.Peer) error {
return ps.delPeer(InfoHashKey(ih.RawString(), false, peer.Addr().Is6()), CountLeecherKey, peer.RawString())
func (ps *store) DeleteLeecher(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error {
return ps.delPeer(ctx, InfoHashKey(ih.RawString(), false, peer.Addr().Is6()), CountLeecherKey, peer.RawString())
}
func (ps *store) GraduateLeecher(ih bittorrent.InfoHash, peer bittorrent.Peer) error {
func (ps *store) GraduateLeecher(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error {
logger.Trace().
Stringer("infoHash", ih).
Object("peer", peer).
@@ -418,25 +418,25 @@ func (ps *store) GraduateLeecher(ih bittorrent.InfoHash, peer bittorrent.Peer) e
infoHash, peerID, isV6 := ih.RawString(), peer.RawString(), peer.Addr().Is6()
ihSeederKey, ihLeecherKey := InfoHashKey(infoHash, true, isV6), InfoHashKey(infoHash, false, isV6)
return ps.tx(func(tx redis.Pipeliner) error {
deleted, err := tx.HDel(context.TODO(), ihLeecherKey, peerID).Uint64()
return ps.tx(ctx, func(tx redis.Pipeliner) error {
deleted, err := tx.HDel(ctx, ihLeecherKey, peerID).Uint64()
err = AsNil(err)
if err == nil {
if deleted > 0 {
err = tx.Decr(context.TODO(), CountLeecherKey).Err()
err = tx.Decr(ctx, CountLeecherKey).Err()
}
}
if err == nil {
err = tx.HSet(context.TODO(), ihSeederKey, peerID, ps.getClock()).Err()
err = tx.HSet(ctx, ihSeederKey, peerID, ps.getClock()).Err()
}
if err == nil {
err = tx.Incr(context.TODO(), CountSeederKey).Err()
err = tx.Incr(ctx, CountSeederKey).Err()
}
if err == nil {
err = tx.SAdd(context.TODO(), IHKey, ihSeederKey).Err()
err = tx.SAdd(ctx, IHKey, ihSeederKey).Err()
}
if err == nil {
err = tx.HIncrBy(context.TODO(), CountDownloadsKey, infoHash, 1).Err()
err = tx.HIncrBy(ctx, CountDownloadsKey, infoHash, 1).Err()
}
return err
})
@@ -457,7 +457,7 @@ func (ps *Connection) parsePeersList(peersResult *redis.StringSliceCmd) (peers [
return
}
type getPeersFn func(context.Context, string, int) *redis.StringSliceCmd
type getPeersFn func(string, int) *redis.StringSliceCmd
// GetPeers retrieves peers for provided info hash by calling membersFn and
// converts result to bittorrent.Peer array.
@@ -477,7 +477,7 @@ func (ps *Connection) GetPeers(ih bittorrent.InfoHash, forSeeder bool, maxCount
for _, infoHashKey := range infoHashKeys {
var peers []bittorrent.Peer
peers, err = ps.parsePeersList(membersFn(context.TODO(), infoHashKey, maxCount))
peers, err = ps.parsePeersList(membersFn(infoHashKey, maxCount))
maxCount -= len(peers)
out = append(out, peers...)
if err != nil || maxCount <= 0 {
@@ -497,7 +497,7 @@ func (ps *Connection) GetPeers(ih bittorrent.InfoHash, forSeeder bool, maxCount
return
}
func (ps *store) AnnouncePeers(ih bittorrent.InfoHash, forSeeder bool, numWant int, v6 bool) ([]bittorrent.Peer, error) {
func (ps *store) AnnouncePeers(ctx context.Context, ih bittorrent.InfoHash, forSeeder bool, numWant int, v6 bool) ([]bittorrent.Peer, error) {
logger.Trace().
Stringer("infoHash", ih).
Bool("forSeeder", forSeeder).
@@ -505,15 +505,15 @@ func (ps *store) AnnouncePeers(ih bittorrent.InfoHash, forSeeder bool, numWant i
Bool("v6", v6).
Msg("announce peers")
return ps.GetPeers(ih, forSeeder, numWant, v6, func(ctx context.Context, infoHashKey string, maxCount int) *redis.StringSliceCmd {
return ps.GetPeers(ih, forSeeder, numWant, v6, func(infoHashKey string, maxCount int) *redis.StringSliceCmd {
return ps.HRandField(ctx, infoHashKey, maxCount, false)
})
}
type getPeerCountFn func(context.Context, string) *redis.IntCmd
func (ps *Connection) countPeers(infoHashKey string, countFn getPeerCountFn) uint32 {
count, err := countFn(context.TODO(), infoHashKey).Result()
func (ps *Connection) countPeers(ctx context.Context, infoHashKey string, countFn getPeerCountFn) uint32 {
count, err := countFn(ctx, infoHashKey).Result()
err = AsNil(err)
if err != nil {
logger.Error().Err(err).Str("infoHashKey", infoHashKey).Msg("key size calculation failure")
@@ -522,14 +522,14 @@ func (ps *Connection) countPeers(infoHashKey string, countFn getPeerCountFn) uin
}
// ScrapeIH calls provided countFn and returns seeders, leechers and downloads count for specified info hash
func (ps *Connection) ScrapeIH(ih bittorrent.InfoHash, countFn getPeerCountFn) (leechersCount, seedersCount, downloadsCount uint32) {
func (ps *Connection) ScrapeIH(ctx context.Context, ih bittorrent.InfoHash, countFn getPeerCountFn) (leechersCount, seedersCount, downloadsCount uint32) {
infoHash := ih.RawString()
leechersCount = ps.countPeers(InfoHashKey(infoHash, false, false), countFn) +
ps.countPeers(InfoHashKey(infoHash, false, true), countFn)
seedersCount = ps.countPeers(InfoHashKey(infoHash, true, false), countFn) +
ps.countPeers(InfoHashKey(infoHash, true, true), countFn)
d, err := ps.HGet(context.TODO(), CountDownloadsKey, infoHash).Uint64()
leechersCount = ps.countPeers(ctx, InfoHashKey(infoHash, false, false), countFn) +
ps.countPeers(ctx, InfoHashKey(infoHash, false, true), countFn)
seedersCount = ps.countPeers(ctx, InfoHashKey(infoHash, true, false), countFn) +
ps.countPeers(ctx, InfoHashKey(infoHash, true, true), countFn)
d, err := ps.HGet(ctx, CountDownloadsKey, infoHash).Uint64()
if err = AsNil(err); err != nil {
logger.Error().Err(err).Str("infoHash", infoHash).Msg("downloads count calculation failure")
}
@@ -538,31 +538,31 @@ func (ps *Connection) ScrapeIH(ih bittorrent.InfoHash, countFn getPeerCountFn) (
return
}
func (ps *store) ScrapeSwarm(ih bittorrent.InfoHash) (uint32, uint32, uint32) {
func (ps *store) ScrapeSwarm(ctx context.Context, ih bittorrent.InfoHash) (uint32, uint32, uint32) {
logger.Trace().
Stringer("infoHash", ih).
Msg("scrape swarm")
return ps.ScrapeIH(ih, ps.HLen)
return ps.ScrapeIH(ctx, ih, ps.HLen)
}
const argNumErrorMsg = "ERR wrong number of arguments"
// Put - storage.DataStorage implementation
func (ps *Connection) Put(ctx string, values ...storage.Entry) (err error) {
func (ps *Connection) Put(ctx context.Context, storeCtx string, values ...storage.Entry) (err error) {
if l := len(values); l > 0 {
if l == 1 {
err = ps.HSet(context.TODO(), PrefixKey+ctx, values[0].Key, values[0].Value).Err()
err = ps.HSet(ctx, PrefixKey+storeCtx, values[0].Key, values[0].Value).Err()
} else {
args := make([]any, 0, l*2)
for _, p := range values {
args = append(args, p.Key, p.Value)
}
err = ps.HSet(context.TODO(), PrefixKey+ctx, args...).Err()
err = ps.HSet(ctx, PrefixKey+storeCtx, args...).Err()
if err != nil {
if strings.Contains(err.Error(), argNumErrorMsg) {
logger.Warn().Msg("This Redis version/implementation does not support variadic arguments for HSET")
for _, p := range values {
if err = ps.HSet(context.TODO(), PrefixKey+ctx, p.Key, p.Value).Err(); err != nil {
if err = ps.HSet(ctx, PrefixKey+storeCtx, p.Key, p.Value).Err(); err != nil {
break
}
}
@@ -574,14 +574,14 @@ func (ps *Connection) Put(ctx string, values ...storage.Entry) (err error) {
}
// Contains - storage.DataStorage implementation
func (ps *Connection) Contains(ctx string, key string) (bool, error) {
exist, err := ps.HExists(context.TODO(), PrefixKey+ctx, key).Result()
func (ps *Connection) Contains(ctx context.Context, storeCtx string, key string) (bool, error) {
exist, err := ps.HExists(ctx, PrefixKey+storeCtx, key).Result()
return exist, AsNil(err)
}
// Load - storage.DataStorage implementation
func (ps *Connection) Load(ctx string, key string) (v []byte, err error) {
v, err = ps.HGet(context.TODO(), PrefixKey+ctx, key).Bytes()
func (ps *Connection) Load(ctx context.Context, storeCtx string, key string) (v []byte, err error) {
v, err = ps.HGet(ctx, PrefixKey+storeCtx, key).Bytes()
if err != nil && errors.Is(err, redis.Nil) {
v, err = nil, nil
}
@@ -589,14 +589,14 @@ func (ps *Connection) Load(ctx string, key string) (v []byte, err error) {
}
// Delete - storage.DataStorage implementation
func (ps *Connection) Delete(ctx string, keys ...string) (err error) {
func (ps *Connection) Delete(ctx context.Context, storeCtx string, keys ...string) (err error) {
if len(keys) > 0 {
err = AsNil(ps.HDel(context.TODO(), PrefixKey+ctx, keys...).Err())
err = AsNil(ps.HDel(ctx, PrefixKey+storeCtx, keys...).Err())
if err != nil {
if strings.Contains(err.Error(), argNumErrorMsg) {
logger.Warn().Msg("This Redis version/implementation does not support variadic arguments for HDEL")
for _, k := range keys {
if err = AsNil(ps.HDel(context.TODO(), PrefixKey+ctx, k).Err()); err != nil {
if err = AsNil(ps.HDel(ctx, PrefixKey+storeCtx, k).Err()); err != nil {
break
}
}
@@ -620,8 +620,8 @@ func (*store) StatisticsAware() bool {
}
// Ping sends `PING` request to Redis server
func (ps *Connection) Ping() error {
return ps.UniversalClient.Ping(context.TODO()).Err()
func (ps *Connection) Ping(ctx context.Context) error {
return ps.UniversalClient.Ping(ctx).Err()
}
// GC deletes all Peers from the PeerStorage which are older than the
@@ -747,7 +747,6 @@ func (ps *store) gc(cutoff time.Time) {
if err == nil && infoHashCount == 0 {
// Empty hashes are not shown among existing keys,
// in other words, it's removed automatically after `HDEL` the last field.
// _, err := ps.Del(context.TODO(), infoHashKey)
err = AsNil(ps.SRem(context.Background(), IHKey, infoHashKey).Err())
}
return err

View File

@@ -3,6 +3,7 @@
package storage
import (
"context"
"fmt"
"sync"
"time"
@@ -93,18 +94,18 @@ var ErrResourceDoesNotExist = bittorrent.ClientError("resource does not exist")
// DataStorage is the interface, used for implementing store for arbitrary data
type DataStorage interface {
// Put used to place arbitrary k-v data with specified context
// into storage. ctx parameter used to group data
// into storage. storeCtx parameter used to group data
// (i.e. data only for specific middleware module: hash key, table name etc...)
Put(ctx string, values ...Entry) error
Put(ctx context.Context, storeCtx string, values ...Entry) error
// Contains checks if any data in specified context exist
Contains(ctx string, key string) (bool, error)
Contains(ctx context.Context, storeCtx string, key string) (bool, error)
// Load used to get arbitrary data in specified context by its key
Load(ctx string, key string) ([]byte, error)
Load(ctx context.Context, storeCtx string, key string) ([]byte, error)
// Delete used to delete arbitrary data in specified context by its keys
Delete(ctx string, keys ...string) error
Delete(ctx context.Context, storeCtx string, keys ...string) error
// Preservable indicates, that this storage can store data permanently,
// in other words, is NOT in-memory storage, which data will be lost after restart
@@ -130,33 +131,33 @@ type PeerStorage interface {
DataStorage
// PutSeeder adds a Seeder to the Swarm identified by the provided
// InfoHash.
PutSeeder(ih bittorrent.InfoHash, peer bittorrent.Peer) error
PutSeeder(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error
// DeleteSeeder removes a Seeder from the Swarm identified by the
// provided InfoHash.
//
// If the Swarm or Peer does not exist, this function returns
// ErrResourceDoesNotExist.
DeleteSeeder(ih bittorrent.InfoHash, peer bittorrent.Peer) error
DeleteSeeder(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error
// PutLeecher adds a Leecher to the Swarm identified by the provided
// InfoHash.
// If the Swarm does not exist already, it is created.
PutLeecher(ih bittorrent.InfoHash, peer bittorrent.Peer) error
PutLeecher(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error
// DeleteLeecher removes a Leecher from the Swarm identified by the
// provided InfoHash.
//
// If the Swarm or Peer does not exist, this function returns
// ErrResourceDoesNotExist.
DeleteLeecher(ih bittorrent.InfoHash, peer bittorrent.Peer) error
DeleteLeecher(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error
// GraduateLeecher promotes a Leecher to a Seeder in the Swarm
// identified by the provided InfoHash.
//
// If the given Peer is not present as a Leecher or the swarm does not exist
// already, the Peer is added as a Seeder and no error is returned.
GraduateLeecher(ih bittorrent.InfoHash, peer bittorrent.Peer) error
GraduateLeecher(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error
// AnnouncePeers is a best effort attempt to return Peers from the Swarm
// identified by the provided InfoHash.
@@ -173,7 +174,7 @@ type PeerStorage interface {
// leechers
//
// Returns ErrResourceDoesNotExist if the provided InfoHash is not tracked.
AnnouncePeers(ih bittorrent.InfoHash, forSeeder bool, numWant int, v6 bool) (peers []bittorrent.Peer, err error)
AnnouncePeers(ctx context.Context, ih bittorrent.InfoHash, forSeeder bool, numWant int, v6 bool) (peers []bittorrent.Peer, err error)
// ScrapeSwarm returns information required to answer a Scrape request
// about a Swarm identified by the given InfoHash.
@@ -183,11 +184,11 @@ type PeerStorage interface {
// filling the Snatches field is optional.
//
// If the Swarm does not exist, an empty Scrape and no error is returned.
ScrapeSwarm(ih bittorrent.InfoHash) (leechers uint32, seeders uint32, snatched uint32)
ScrapeSwarm(ctx context.Context, ih bittorrent.InfoHash) (leechers uint32, seeders uint32, snatched uint32)
// Ping used for checks if storage is alive
// (connection could be established, enough space etc.)
Ping() error
Ping(ctx context.Context) error
// GCAware marks that this storage supports periodic
// peers collection

View File

@@ -125,7 +125,7 @@ func (bh *benchHolder) Nop(b *testing.B) {
// Put benchmarks a single PutSeeder call on one info hash / one peer.
// Put can run in parallel.
func (bh *benchHolder) Put(b *testing.B) {
	bh.runBenchmark(b, true, nil, func(i int, ps storage.PeerStorage, bd *benchData) error {
		// context.TODO() rather than nil: a nil Context violates the context
		// contract (staticcheck SA1012) and panics in implementations that use it.
		return ps.PutSeeder(context.TODO(), bd.infoHashes[0], bd.peers[0])
	})
}
@@ -135,7 +135,7 @@ func (bh *benchHolder) Put(b *testing.B) {
// Put1k benchmarks PutSeeder over one info hash cycling through 1k peers.
// Put1k can run in parallel.
func (bh *benchHolder) Put1k(b *testing.B) {
	bh.runBenchmark(b, true, nil, func(i int, ps storage.PeerStorage, bd *benchData) error {
		// context.TODO() rather than nil: nil Contexts are forbidden (staticcheck SA1012).
		return ps.PutSeeder(context.TODO(), bd.infoHashes[0], bd.peers[i%peersCount])
	})
}
@@ -145,7 +145,7 @@ func (bh *benchHolder) Put1k(b *testing.B) {
// Put1kInfoHash benchmarks PutSeeder cycling through 1k info hashes, one peer.
// Put1kInfoHash can run in parallel.
func (bh *benchHolder) Put1kInfoHash(b *testing.B) {
	bh.runBenchmark(b, true, nil, func(i int, ps storage.PeerStorage, bd *benchData) error {
		// context.TODO() rather than nil: nil Contexts are forbidden (staticcheck SA1012).
		return ps.PutSeeder(context.TODO(), bd.infoHashes[i%ihCount], bd.peers[0])
	})
}
@@ -155,7 +155,7 @@ func (bh *benchHolder) Put1kInfoHash(b *testing.B) {
// Put1kInfoHash1k benchmarks PutSeeder cycling through 1k info hashes and 1k peers.
// Put1kInfoHash1k can run in parallel.
func (bh *benchHolder) Put1kInfoHash1k(b *testing.B) {
	bh.runBenchmark(b, true, nil, func(i int, ps storage.PeerStorage, bd *benchData) error {
		// context.TODO() rather than nil: nil Contexts are forbidden (staticcheck SA1012).
		err := ps.PutSeeder(context.TODO(), bd.infoHashes[i%ihCount], bd.peers[(i*3)%peersCount])
		return err
	})
}
@@ -166,11 +166,11 @@ func (bh *benchHolder) Put1kInfoHash1k(b *testing.B) {
// PutDelete benchmarks a PutSeeder immediately followed by DeleteSeeder.
// PutDelete can not run in parallel.
func (bh *benchHolder) PutDelete(b *testing.B) {
	bh.runBenchmark(b, false, nil, func(i int, ps storage.PeerStorage, bd *benchData) error {
		// context.TODO() rather than nil: nil Contexts are forbidden (staticcheck SA1012).
		err := ps.PutSeeder(context.TODO(), bd.infoHashes[0], bd.peers[0])
		if err != nil {
			return err
		}
		return ps.DeleteSeeder(context.TODO(), bd.infoHashes[0], bd.peers[0])
	})
}
@@ -180,11 +180,11 @@ func (bh *benchHolder) PutDelete(b *testing.B) {
// PutDelete1k benchmarks Put+Delete on one info hash cycling through 1k peers.
// PutDelete1k can not run in parallel.
func (bh *benchHolder) PutDelete1k(b *testing.B) {
	bh.runBenchmark(b, false, nil, func(i int, ps storage.PeerStorage, bd *benchData) error {
		// context.TODO() rather than nil: nil Contexts are forbidden (staticcheck SA1012).
		err := ps.PutSeeder(context.TODO(), bd.infoHashes[0], bd.peers[i%peersCount])
		if err != nil {
			return err
		}
		return ps.DeleteSeeder(context.TODO(), bd.infoHashes[0], bd.peers[i%peersCount])
	})
}
@@ -194,11 +194,11 @@ func (bh *benchHolder) PutDelete1k(b *testing.B) {
// PutDelete1kInfoHash benchmarks Put+Delete cycling through 1k info hashes.
// PutDelete1kInfoHash can not run in parallel.
func (bh *benchHolder) PutDelete1kInfoHash(b *testing.B) {
	bh.runBenchmark(b, false, nil, func(i int, ps storage.PeerStorage, bd *benchData) error {
		// context.TODO() rather than nil: nil Contexts are forbidden (staticcheck SA1012).
		err := ps.PutSeeder(context.TODO(), bd.infoHashes[i%ihCount], bd.peers[0])
		if err != nil {
			return err
		}
		return ps.DeleteSeeder(context.TODO(), bd.infoHashes[i%ihCount], bd.peers[0])
	})
}
@@ -208,11 +208,11 @@ func (bh *benchHolder) PutDelete1kInfoHash(b *testing.B) {
// PutDelete1kInfoHash1k benchmarks Put+Delete over 1k info hashes and 1k peers.
// PutDelete1kInfoHash1k can not run in parallel.
func (bh *benchHolder) PutDelete1kInfoHash1k(b *testing.B) {
	bh.runBenchmark(b, false, nil, func(i int, ps storage.PeerStorage, bd *benchData) error {
		// context.TODO() rather than nil: nil Contexts are forbidden (staticcheck SA1012).
		err := ps.PutSeeder(context.TODO(), bd.infoHashes[i%ihCount], bd.peers[(i*3)%peersCount])
		if err != nil {
			return err
		}
		err = ps.DeleteSeeder(context.TODO(), bd.infoHashes[i%ihCount], bd.peers[(i*3)%peersCount])
		return err
	})
}
@@ -223,7 +223,7 @@ func (bh *benchHolder) PutDelete1kInfoHash1k(b *testing.B) {
// DeleteNonexist benchmarks deleting a seeder that was never stored; the
// expected ErrResourceDoesNotExist is deliberately ignored.
// DeleteNonexist can run in parallel.
func (bh *benchHolder) DeleteNonexist(b *testing.B) {
	bh.runBenchmark(b, true, nil, func(i int, ps storage.PeerStorage, bd *benchData) error {
		// context.TODO() rather than nil: nil Contexts are forbidden (staticcheck SA1012).
		_ = ps.DeleteSeeder(context.TODO(), bd.infoHashes[0], bd.peers[0])
		return nil
	})
}
@@ -234,7 +234,7 @@ func (bh *benchHolder) DeleteNonexist(b *testing.B) {
// DeleteNonexist1k benchmarks deleting 1k never-stored seeders on one info hash.
// DeleteNonexist1k can run in parallel.
func (bh *benchHolder) DeleteNonexist1k(b *testing.B) {
	bh.runBenchmark(b, true, nil, func(i int, ps storage.PeerStorage, bd *benchData) error {
		// context.TODO() rather than nil: nil Contexts are forbidden (staticcheck SA1012).
		_ = ps.DeleteSeeder(context.TODO(), bd.infoHashes[0], bd.peers[i%peersCount])
		return nil
	})
}
@@ -245,7 +245,7 @@ func (bh *benchHolder) DeleteNonexist1k(b *testing.B) {
// DeleteNonexist1kInfoHash benchmarks deleting a never-stored seeder across 1k info hashes.
// DeleteNonexist1kInfoHash can run in parallel.
func (bh *benchHolder) DeleteNonexist1kInfoHash(b *testing.B) {
	bh.runBenchmark(b, true, nil, func(i int, ps storage.PeerStorage, bd *benchData) error {
		// context.TODO() rather than nil: nil Contexts are forbidden (staticcheck SA1012).
		_ = ps.DeleteSeeder(context.TODO(), bd.infoHashes[i%ihCount], bd.peers[0])
		return nil
	})
}
@@ -256,7 +256,7 @@ func (bh *benchHolder) DeleteNonexist1kInfoHash(b *testing.B) {
// DeleteNonexist1kInfoHash1k benchmarks deleting never-stored seeders over 1k info hashes and 1k peers.
// DeleteNonexist1kInfoHash1k can run in parallel.
func (bh *benchHolder) DeleteNonexist1kInfoHash1k(b *testing.B) {
	bh.runBenchmark(b, true, nil, func(i int, ps storage.PeerStorage, bd *benchData) error {
		// context.TODO() rather than nil: nil Contexts are forbidden (staticcheck SA1012).
		_ = ps.DeleteSeeder(context.TODO(), bd.infoHashes[i%ihCount], bd.peers[(i*3)%peersCount])
		return nil
	})
}
@@ -267,7 +267,7 @@ func (bh *benchHolder) DeleteNonexist1kInfoHash1k(b *testing.B) {
// GradNonexist benchmarks graduating a leecher that was never stored; the
// resulting error is deliberately ignored.
// GradNonexist can run in parallel.
func (bh *benchHolder) GradNonexist(b *testing.B) {
	bh.runBenchmark(b, true, nil, func(i int, ps storage.PeerStorage, bd *benchData) error {
		// context.TODO() rather than nil: nil Contexts are forbidden (staticcheck SA1012).
		_ = ps.GraduateLeecher(context.TODO(), bd.infoHashes[0], bd.peers[0])
		return nil
	})
}
@@ -278,7 +278,7 @@ func (bh *benchHolder) GradNonexist(b *testing.B) {
// GradNonexist1k benchmarks graduating 1k never-stored leechers on one info hash.
// GradNonexist1k can run in parallel.
func (bh *benchHolder) GradNonexist1k(b *testing.B) {
	bh.runBenchmark(b, true, nil, func(i int, ps storage.PeerStorage, bd *benchData) error {
		// context.TODO() rather than nil: nil Contexts are forbidden (staticcheck SA1012).
		_ = ps.GraduateLeecher(context.TODO(), bd.infoHashes[0], bd.peers[i%peersCount])
		return nil
	})
}
@@ -289,7 +289,7 @@ func (bh *benchHolder) GradNonexist1k(b *testing.B) {
// GradNonexist1kInfoHash benchmarks graduating a never-stored leecher across 1k info hashes.
// GradNonexist1kInfoHash can run in parallel.
func (bh *benchHolder) GradNonexist1kInfoHash(b *testing.B) {
	bh.runBenchmark(b, true, nil, func(i int, ps storage.PeerStorage, bd *benchData) error {
		// context.TODO() rather than nil: nil Contexts are forbidden (staticcheck SA1012).
		_ = ps.GraduateLeecher(context.TODO(), bd.infoHashes[i%ihCount], bd.peers[0])
		return nil
	})
}
@@ -301,7 +301,7 @@ func (bh *benchHolder) GradNonexist1kInfoHash(b *testing.B) {
// GradNonexist1kInfoHash1k benchmarks graduating never-stored leechers over 1k info hashes and 1k peers.
// GradNonexist1kInfoHash1k can run in parallel.
func (bh *benchHolder) GradNonexist1kInfoHash1k(b *testing.B) {
	bh.runBenchmark(b, true, nil, func(i int, ps storage.PeerStorage, bd *benchData) error {
		// context.TODO() rather than nil: nil Contexts are forbidden (staticcheck SA1012).
		_ = ps.GraduateLeecher(context.TODO(), bd.infoHashes[i%ihCount], bd.peers[(i*3)%peersCount])
		return nil
	})
}
@@ -313,15 +313,15 @@ func (bh *benchHolder) GradNonexist1kInfoHash1k(b *testing.B) {
// PutGradDelete benchmarks the full leecher lifecycle: put, graduate, delete.
// PutGradDelete can not run in parallel.
func (bh *benchHolder) PutGradDelete(b *testing.B) {
	bh.runBenchmark(b, false, nil, func(i int, ps storage.PeerStorage, bd *benchData) error {
		// context.TODO() rather than nil: nil Contexts are forbidden (staticcheck SA1012).
		err := ps.PutLeecher(context.TODO(), bd.infoHashes[0], bd.peers[0])
		if err != nil {
			return err
		}
		err = ps.GraduateLeecher(context.TODO(), bd.infoHashes[0], bd.peers[0])
		if err != nil {
			return err
		}
		return ps.DeleteSeeder(context.TODO(), bd.infoHashes[0], bd.peers[0])
	})
}
@@ -330,15 +330,15 @@ func (bh *benchHolder) PutGradDelete(b *testing.B) {
// PutGradDelete1k benchmarks put/graduate/delete on one info hash cycling through 1k peers.
// PutGradDelete1k can not run in parallel.
func (bh *benchHolder) PutGradDelete1k(b *testing.B) {
	bh.runBenchmark(b, false, nil, func(i int, ps storage.PeerStorage, bd *benchData) error {
		// context.TODO() rather than nil: nil Contexts are forbidden (staticcheck SA1012).
		err := ps.PutLeecher(context.TODO(), bd.infoHashes[0], bd.peers[i%peersCount])
		if err != nil {
			return err
		}
		err = ps.GraduateLeecher(context.TODO(), bd.infoHashes[0], bd.peers[i%peersCount])
		if err != nil {
			return err
		}
		return ps.DeleteSeeder(context.TODO(), bd.infoHashes[0], bd.peers[i%peersCount])
	})
}
@@ -348,15 +348,15 @@ func (bh *benchHolder) PutGradDelete1k(b *testing.B) {
// PutGradDelete1kInfoHash benchmarks put/graduate/delete cycling through 1k info hashes.
// PutGradDelete1kInfoHash can not run in parallel.
func (bh *benchHolder) PutGradDelete1kInfoHash(b *testing.B) {
	bh.runBenchmark(b, false, nil, func(i int, ps storage.PeerStorage, bd *benchData) error {
		// context.TODO() rather than nil: nil Contexts are forbidden (staticcheck SA1012).
		err := ps.PutLeecher(context.TODO(), bd.infoHashes[i%ihCount], bd.peers[0])
		if err != nil {
			return err
		}
		err = ps.GraduateLeecher(context.TODO(), bd.infoHashes[i%ihCount], bd.peers[0])
		if err != nil {
			return err
		}
		return ps.DeleteSeeder(context.TODO(), bd.infoHashes[i%ihCount], bd.peers[0])
	})
}
@@ -366,15 +366,15 @@ func (bh *benchHolder) PutGradDelete1kInfoHash(b *testing.B) {
// PutGradDelete1kInfoHash1k benchmarks put/graduate/delete over 1k info hashes and 1k peers.
// PutGradDelete1kInfoHash1k can not run in parallel.
func (bh *benchHolder) PutGradDelete1kInfoHash1k(b *testing.B) {
	bh.runBenchmark(b, false, nil, func(i int, ps storage.PeerStorage, bd *benchData) error {
		// context.TODO() rather than nil: nil Contexts are forbidden (staticcheck SA1012).
		err := ps.PutLeecher(context.TODO(), bd.infoHashes[i%ihCount], bd.peers[(i*3)%peersCount])
		if err != nil {
			return err
		}
		err = ps.GraduateLeecher(context.TODO(), bd.infoHashes[i%ihCount], bd.peers[(i*3)%peersCount])
		if err != nil {
			return err
		}
		err = ps.DeleteSeeder(context.TODO(), bd.infoHashes[i%ihCount], bd.peers[(i*3)%peersCount])
		return err
	})
}
@@ -385,9 +385,9 @@ func putPeers(ps storage.PeerStorage, bd *benchData) error {
for i, peer := range bd.peers {
var err error
if i < l/2 {
err = ps.PutLeecher(ih, peer)
err = ps.PutLeecher(nil, ih, peer)
} else {
err = ps.PutSeeder(ih, peer)
err = ps.PutSeeder(nil, ih, peer)
}
if err != nil {
return err
@@ -404,7 +404,7 @@ func putPeers(ps storage.PeerStorage, bd *benchData) error {
// AnnounceLeecher benchmarks announcing as a leecher against a pre-populated swarm.
// AnnounceLeecher can run in parallel.
func (bh *benchHolder) AnnounceLeecher(b *testing.B) {
	bh.runBenchmark(b, true, putPeers, func(i int, ps storage.PeerStorage, bd *benchData) error {
		// context.TODO() rather than nil: nil Contexts are forbidden (staticcheck SA1012).
		_, err := ps.AnnouncePeers(context.TODO(), bd.infoHashes[0], false, 50, bd.peers[0].Addr().Is6())
		return err
	})
}
@@ -415,7 +415,7 @@ func (bh *benchHolder) AnnounceLeecher(b *testing.B) {
// AnnounceLeecher1kInfoHash benchmarks leecher announces cycling through 1k info hashes.
// AnnounceLeecher1kInfoHash can run in parallel.
func (bh *benchHolder) AnnounceLeecher1kInfoHash(b *testing.B) {
	bh.runBenchmark(b, true, putPeers, func(i int, ps storage.PeerStorage, bd *benchData) error {
		// context.TODO() rather than nil: nil Contexts are forbidden (staticcheck SA1012).
		_, err := ps.AnnouncePeers(context.TODO(), bd.infoHashes[i%ihCount], false, 50, bd.peers[0].Addr().Is6())
		return err
	})
}
@@ -426,7 +426,7 @@ func (bh *benchHolder) AnnounceLeecher1kInfoHash(b *testing.B) {
// AnnounceSeeder benchmarks announcing as a seeder against a pre-populated swarm.
// AnnounceSeeder can run in parallel.
func (bh *benchHolder) AnnounceSeeder(b *testing.B) {
	bh.runBenchmark(b, true, putPeers, func(i int, ps storage.PeerStorage, bd *benchData) error {
		// context.TODO() rather than nil: nil Contexts are forbidden (staticcheck SA1012).
		_, err := ps.AnnouncePeers(context.TODO(), bd.infoHashes[0], true, 50, bd.peers[0].Addr().Is6())
		return err
	})
}
@@ -437,7 +437,7 @@ func (bh *benchHolder) AnnounceSeeder(b *testing.B) {
// AnnounceSeeder1kInfoHash benchmarks seeder announces cycling through 1k info hashes.
// AnnounceSeeder1kInfoHash can run in parallel.
func (bh *benchHolder) AnnounceSeeder1kInfoHash(b *testing.B) {
	bh.runBenchmark(b, true, putPeers, func(i int, ps storage.PeerStorage, bd *benchData) error {
		// context.TODO() rather than nil: nil Contexts are forbidden (staticcheck SA1012).
		_, err := ps.AnnouncePeers(context.TODO(), bd.infoHashes[i%ihCount], true, 50, bd.peers[0].Addr().Is6())
		return err
	})
}
@@ -448,7 +448,7 @@ func (bh *benchHolder) AnnounceSeeder1kInfoHash(b *testing.B) {
// ScrapeSwarm benchmarks scraping one pre-populated swarm.
// ScrapeSwarm can run in parallel.
func (bh *benchHolder) ScrapeSwarm(b *testing.B) {
	bh.runBenchmark(b, true, putPeers, func(i int, ps storage.PeerStorage, bd *benchData) error {
		// context.TODO() rather than nil: nil Contexts are forbidden (staticcheck SA1012).
		ps.ScrapeSwarm(context.TODO(), bd.infoHashes[0])
		return nil
	})
}
@@ -458,7 +458,7 @@ func (bh *benchHolder) ScrapeSwarm(b *testing.B) {
// ScrapeSwarm1kInfoHash benchmarks scraping while cycling through 1k info hashes.
// ScrapeSwarm1kInfoHash can run in parallel.
func (bh *benchHolder) ScrapeSwarm1kInfoHash(b *testing.B) {
	bh.runBenchmark(b, true, putPeers, func(i int, ps storage.PeerStorage, bd *benchData) error {
		// context.TODO() rather than nil: nil Contexts are forbidden (staticcheck SA1012).
		ps.ScrapeSwarm(context.TODO(), bd.infoHashes[i%ihCount])
		return nil
	})
}

View File

@@ -40,7 +40,7 @@ type hashPeer struct {
func (th *testHolder) DeleteSeeder(t *testing.T) {
for _, c := range testData {
err := th.st.DeleteSeeder(c.ih, c.peer)
err := th.st.DeleteSeeder(nil, c.ih, c.peer)
if errors.Is(err, storage.ErrResourceDoesNotExist) {
err = nil
}
@@ -54,14 +54,14 @@ func (th *testHolder) PutLeecher(t *testing.T) {
if c.peer.Addr().Is6() {
peer = v6Peer
}
err := th.st.PutLeecher(c.ih, peer)
err := th.st.PutLeecher(nil, c.ih, peer)
require.Nil(t, err)
}
}
func (th *testHolder) DeleteLeecher(t *testing.T) {
for _, c := range testData {
err := th.st.DeleteLeecher(c.ih, c.peer)
err := th.st.DeleteLeecher(nil, c.ih, c.peer)
if errors.Is(err, storage.ErrResourceDoesNotExist) {
err = nil
}
@@ -71,7 +71,7 @@ func (th *testHolder) DeleteLeecher(t *testing.T) {
func (th *testHolder) AnnouncePeers(t *testing.T) {
for _, c := range testData {
_, err := th.st.AnnouncePeers(c.ih, false, 50, c.peer.Addr().Is6())
_, err := th.st.AnnouncePeers(nil, c.ih, false, 50, c.peer.Addr().Is6())
if errors.Is(err, storage.ErrResourceDoesNotExist) {
err = nil
}
@@ -81,7 +81,7 @@ func (th *testHolder) AnnouncePeers(t *testing.T) {
func (th *testHolder) ScrapeSwarm(t *testing.T) {
for _, c := range testData {
l, s, n := th.st.ScrapeSwarm(c.ih)
l, s, n := th.st.ScrapeSwarm(nil, c.ih)
require.Equal(t, uint32(0), s)
require.Equal(t, uint32(0), l)
require.Equal(t, uint32(0), n)
@@ -91,26 +91,26 @@ func (th *testHolder) ScrapeSwarm(t *testing.T) {
func (th *testHolder) LeecherPutAnnounceDeleteAnnounce(t *testing.T) {
for _, c := range testData {
isV6 := c.peer.Addr().Is6()
err := th.st.PutLeecher(c.ih, c.peer)
err := th.st.PutLeecher(nil, c.ih, c.peer)
require.Nil(t, err)
peers, err := th.st.AnnouncePeers(c.ih, true, 50, isV6)
peers, err := th.st.AnnouncePeers(nil, c.ih, true, 50, isV6)
require.Nil(t, err)
require.True(t, containsPeer(peers, c.peer))
// non-seeder announce should still return the leecher
peers, err = th.st.AnnouncePeers(c.ih, false, 50, isV6)
peers, err = th.st.AnnouncePeers(nil, c.ih, false, 50, isV6)
require.Nil(t, err)
require.True(t, containsPeer(peers, c.peer))
l, s, _ := th.st.ScrapeSwarm(c.ih)
l, s, _ := th.st.ScrapeSwarm(nil, c.ih)
require.Equal(t, uint32(2), l)
require.Equal(t, uint32(0), s)
err = th.st.DeleteLeecher(c.ih, c.peer)
err = th.st.DeleteLeecher(nil, c.ih, c.peer)
require.Nil(t, err)
peers, err = th.st.AnnouncePeers(c.ih, true, 50, isV6)
peers, err = th.st.AnnouncePeers(nil, c.ih, true, 50, isV6)
if errors.Is(err, storage.ErrResourceDoesNotExist) {
err = nil
}
@@ -122,22 +122,22 @@ func (th *testHolder) LeecherPutAnnounceDeleteAnnounce(t *testing.T) {
func (th *testHolder) SeederPutAnnounceDeleteAnnounce(t *testing.T) {
for _, c := range testData {
isV6 := c.peer.Addr().Is6()
err := th.st.PutSeeder(c.ih, c.peer)
err := th.st.PutSeeder(nil, c.ih, c.peer)
require.Nil(t, err)
// Should be leecher to see the seeder
peers, err := th.st.AnnouncePeers(c.ih, false, 50, isV6)
peers, err := th.st.AnnouncePeers(nil, c.ih, false, 50, isV6)
require.Nil(t, err)
require.True(t, containsPeer(peers, c.peer))
l, s, _ := th.st.ScrapeSwarm(c.ih)
l, s, _ := th.st.ScrapeSwarm(nil, c.ih)
require.Equal(t, uint32(1), l)
require.Equal(t, uint32(1), s)
err = th.st.DeleteSeeder(c.ih, c.peer)
err = th.st.DeleteSeeder(nil, c.ih, c.peer)
require.Nil(t, err)
peers, err = th.st.AnnouncePeers(c.ih, false, 50, isV6)
peers, err = th.st.AnnouncePeers(nil, c.ih, false, 50, isV6)
if errors.Is(err, storage.ErrResourceDoesNotExist) {
err = nil
}
@@ -153,44 +153,44 @@ func (th *testHolder) LeecherPutGraduateAnnounceDeleteAnnounce(t *testing.T) {
if isV6 {
peer = v6Peer
}
err := th.st.PutLeecher(c.ih, c.peer)
err := th.st.PutLeecher(nil, c.ih, c.peer)
require.Nil(t, err)
err = th.st.GraduateLeecher(c.ih, c.peer)
err = th.st.GraduateLeecher(nil, c.ih, c.peer)
require.Nil(t, err)
// Has to be leecher to see the graduated seeder
peers, err := th.st.AnnouncePeers(c.ih, false, 50, isV6)
peers, err := th.st.AnnouncePeers(nil, c.ih, false, 50, isV6)
require.Nil(t, err)
require.True(t, containsPeer(peers, c.peer))
// Deleting the Peer as a Leecher should have no effect
err = th.st.DeleteLeecher(c.ih, c.peer)
err = th.st.DeleteLeecher(nil, c.ih, c.peer)
if errors.Is(err, storage.ErrResourceDoesNotExist) {
err = nil
}
require.Nil(t, err)
// Verify it's still there
peers, err = th.st.AnnouncePeers(c.ih, false, 50, isV6)
peers, err = th.st.AnnouncePeers(nil, c.ih, false, 50, isV6)
require.Nil(t, err)
require.True(t, containsPeer(peers, c.peer))
// Clean up
err = th.st.DeleteLeecher(c.ih, peer)
err = th.st.DeleteLeecher(nil, c.ih, peer)
require.Nil(t, err)
// Test ErrDNE for missing leecher
err = th.st.DeleteLeecher(c.ih, peer)
err = th.st.DeleteLeecher(nil, c.ih, peer)
if errors.Is(err, storage.ErrResourceDoesNotExist) {
err = nil
}
require.Nil(t, err)
err = th.st.DeleteSeeder(c.ih, c.peer)
err = th.st.DeleteSeeder(nil, c.ih, c.peer)
require.Nil(t, err)
err = th.st.DeleteSeeder(c.ih, c.peer)
err = th.st.DeleteSeeder(nil, c.ih, c.peer)
if errors.Is(err, storage.ErrResourceDoesNotExist) {
err = nil
}
@@ -200,35 +200,35 @@ func (th *testHolder) LeecherPutGraduateAnnounceDeleteAnnounce(t *testing.T) {
func (th *testHolder) CustomPutContainsLoadDelete(t *testing.T) {
for _, c := range testData {
err := th.st.Put(kvStoreCtx, storage.Entry{Key: c.peer.String(), Value: []byte(c.ih.RawString())})
err := th.st.Put(nil, kvStoreCtx, storage.Entry{Key: c.peer.String(), Value: []byte(c.ih.RawString())})
require.Nil(t, err)
// check if exist in ctx we put
contains, err := th.st.Contains(kvStoreCtx, c.peer.String())
contains, err := th.st.Contains(nil, kvStoreCtx, c.peer.String())
require.Nil(t, err)
require.True(t, contains)
// check if not exist in another ctx
contains, err = th.st.Contains("", c.peer.String())
contains, err = th.st.Contains(nil, "", c.peer.String())
require.Nil(t, err)
require.False(t, contains)
// check value and type in ctx we put
out, err := th.st.Load(kvStoreCtx, c.peer.String())
out, err := th.st.Load(nil, kvStoreCtx, c.peer.String())
require.Nil(t, err)
ih, err := bittorrent.NewInfoHash(out)
require.Nil(t, err)
require.Equal(t, c.ih, ih)
// check value is nil in another ctx
dummy, err := th.st.Load("", c.peer.String())
dummy, err := th.st.Load(nil, "", c.peer.String())
require.Nil(t, err)
require.Nil(t, dummy)
err = th.st.Delete(kvStoreCtx, c.peer.String())
err = th.st.Delete(nil, kvStoreCtx, c.peer.String())
require.Nil(t, err)
contains, err = th.st.Contains("", c.peer.String())
contains, err = th.st.Contains(nil, "", c.peer.String())
require.Nil(t, err)
require.False(t, contains)
}
@@ -245,29 +245,29 @@ func (th *testHolder) CustomBulkPutContainsLoadDelete(t *testing.T) {
Value: []byte(c.ih.RawString()),
})
}
err := th.st.Put(kvStoreCtx, pairs...)
err := th.st.Put(nil, kvStoreCtx, pairs...)
require.Nil(t, err)
// check if exist in ctx we put
for _, k := range keys {
contains, err := th.st.Contains(kvStoreCtx, k)
contains, err := th.st.Contains(nil, kvStoreCtx, k)
require.Nil(t, err)
require.True(t, contains)
}
// check value and type in ctx we put
for _, p := range pairs {
out, _ := th.st.Load(kvStoreCtx, p.Key)
out, _ := th.st.Load(nil, kvStoreCtx, p.Key)
ih, err := bittorrent.NewInfoHash(out)
require.Nil(t, err)
require.Equal(t, p.Value, []byte(ih.RawString()))
}
err = th.st.Delete(kvStoreCtx, keys...)
err = th.st.Delete(nil, kvStoreCtx, keys...)
require.Nil(t, err)
for _, k := range keys {
contains, err := th.st.Contains(kvStoreCtx, k)
contains, err := th.st.Contains(nil, kvStoreCtx, k)
require.Nil(t, err)
require.False(t, contains)
}