From 88f1ef7ca5e1c00282494023f82f652260475706 Mon Sep 17 00:00:00 2001 From: "Lawrence, Rendall" Date: Sun, 19 Mar 2023 19:52:52 +0300 Subject: [PATCH] suppress context canceled error --- frontend/http/frontend.go | 62 ++++++++++++++----------- frontend/udp/frontend.go | 24 ++++++---- middleware/hooks.go | 25 +++++++--- storage/keydb/storage.go | 6 +-- storage/memory/storage.go | 2 +- storage/pg/storage.go | 43 +++++++++-------- storage/redis/storage.go | 77 ++++++++++++++++--------------- storage/storage.go | 2 +- storage/test/storage_bench.go | 4 +- storage/test/storage_test_base.go | 6 +-- 10 files changed, 146 insertions(+), 105 deletions(-) diff --git a/frontend/http/frontend.go b/frontend/http/frontend.go index 76279e8..13df09d 100644 --- a/frontend/http/frontend.go +++ b/frontend/http/frontend.go @@ -3,6 +3,7 @@ package http import ( + "context" "crypto/tls" "errors" "net/http" @@ -249,23 +250,27 @@ func (f *httpFE) announceRoute(reqCtx *fasthttp.RequestCtx) { ctx := bittorrent.InjectRouteParamsToContext(reqCtx, nil) ctx, aResp, err := f.logic.HandleAnnounce(ctx, aReq) if err != nil { - writeErrorResponse(reqCtx, err) + if !errors.Is(err, context.Canceled) { + writeErrorResponse(reqCtx, err) + } return } - reqCtx.Response.Header.Set("Content-Type", "text/plain; charset=utf-8") - qArgs := reqCtx.QueryArgs() - // `compact` means that tracker should return addresses in - // binary (single concatenated string) mode instead of dictionary. - // `no_peer_id` means, that tracker may omit PeerID field in response dictionary. - // see https://wiki.theory.org/BitTorrentSpecification#Tracker_Request_Parameters - writeAnnounceResponse(reqCtx, aResp, qArgs.GetBool("compact"), !qArgs.GetBool("no_peer_id")) + if err = reqCtx.Err(); err == nil { + reqCtx.Response.Header.Set("Content-Type", "text/plain; charset=utf-8") + qArgs := reqCtx.QueryArgs() + // `compact` means that tracker should return addresses in + // binary (single concatenated string) mode instead of dictionary. + // `no_peer_id` means, that tracker may omit PeerID field in response dictionary. + // see https://wiki.theory.org/BitTorrentSpecification#Tracker_Request_Parameters + writeAnnounceResponse(reqCtx, aResp, qArgs.GetBool("compact"), !qArgs.GetBool("no_peer_id")) - // next actions are background and should not be canceled after http writer closed - ctx = bittorrent.RemapRouteParamsToBgContext(ctx) - // params mapped from fasthttp.QueryArgs will be reused in the next request - aReq.Params = nil - go f.logic.AfterAnnounce(ctx, aReq, aResp) + // next actions are background and should not be canceled after http writer closed + ctx = bittorrent.RemapRouteParamsToBgContext(ctx) + // params mapped from fasthttp.QueryArgs will be reused in the next request + aReq.Params = nil + go f.logic.AfterAnnounce(ctx, aReq, aResp) + } } // scrapeRoute parses and responds to a Scrape. @@ -290,32 +295,35 @@ func (f *httpFE) scrapeRoute(reqCtx *fasthttp.RequestCtx) { ctx := bittorrent.InjectRouteParamsToContext(reqCtx, nil) ctx, resp, err := f.logic.HandleScrape(ctx, req) if err != nil { - writeErrorResponse(reqCtx, err) + if !errors.Is(err, context.Canceled) { + writeErrorResponse(reqCtx, err) + } return } - reqCtx.Response.Header.Set("Content-Type", "text/plain; charset=utf-8") - writeScrapeResponse(reqCtx, resp) + if err = reqCtx.Err(); err == nil { + reqCtx.SetContentType("text/plain; charset=utf-8") + writeScrapeResponse(reqCtx, resp) - // next actions are background and should not be canceled after http writer closed - ctx = bittorrent.RemapRouteParamsToBgContext(ctx) - // params mapped from fasthttp.QueryArgs will in the next request - req.Params = nil - go f.logic.AfterScrape(ctx, req, resp) + // next actions are background and should not be canceled after http writer closed + ctx = bittorrent.RemapRouteParamsToBgContext(ctx) + // params mapped from fasthttp.QueryArgs will in the next request + req.Params = nil + go f.logic.AfterScrape(ctx, req, resp) + } } func (f *httpFE) ping(ctx *fasthttp.RequestCtx) { - var err error status := http.StatusOK - err = f.logic.Ping(ctx) - + err := f.logic.Ping(ctx) if err != nil { + if errors.Is(err, context.Canceled) { + return + } logger.Error().Err(err).Msg("ping completed with error") status = http.StatusServiceUnavailable } - if ctxErr := ctx.Err(); ctxErr == nil { + if err = ctx.Err(); err == nil { ctx.SetStatusCode(status) - } else { - logger.Info().Err(ctxErr).Stringer("addr", ctx.RemoteAddr()).Msg("ping request cancelled") } } diff --git a/frontend/udp/frontend.go b/frontend/udp/frontend.go index 5e61dc1..de6f4fa 100644 --- a/frontend/udp/frontend.go +++ b/frontend/udp/frontend.go @@ -298,14 +298,18 @@ func (f *udpFE) handleRequest(ctx context.Context, r Request, w ResponseWriter) ctx := bittorrent.InjectRouteParamsToContext(ctx, bittorrent.RouteParams{}) ctx, resp, err = f.logic.HandleAnnounce(ctx, req) if err != nil { - writeErrorResponse(w, txID, err) + if !errors.Is(err, context.Canceled) { + writeErrorResponse(w, txID, err) + } return } - writeAnnounceResponse(w, txID, resp, actionID == announceV6ActionID, r.IP.Is6()) + if err = ctx.Err(); err == nil { + writeAnnounceResponse(w, txID, resp, actionID == announceV6ActionID, r.IP.Is6()) - ctx = bittorrent.RemapRouteParamsToBgContext(ctx) - go f.logic.AfterAnnounce(ctx, req, resp) + ctx = bittorrent.RemapRouteParamsToBgContext(ctx) + go f.logic.AfterAnnounce(ctx, req, resp) + } case scrapeActionID: actionName = "scrape" @@ -321,14 +325,18 @@ func (f *udpFE) handleRequest(ctx context.Context, r Request, w ResponseWriter) ctx := bittorrent.InjectRouteParamsToContext(ctx, bittorrent.RouteParams{}) ctx, resp, err = f.logic.HandleScrape(ctx, req) if err != nil { - writeErrorResponse(w, txID, err) + if !errors.Is(err, context.Canceled) { + writeErrorResponse(w, txID, err) + } return } - writeScrapeResponse(w, txID, resp) + if err = ctx.Err(); err == nil { + writeScrapeResponse(w, txID, resp) - ctx = bittorrent.RemapRouteParamsToBgContext(ctx) - go f.logic.AfterScrape(ctx, req, resp) + ctx = bittorrent.RemapRouteParamsToBgContext(ctx) + go f.logic.AfterScrape(ctx, req, resp) + } default: err = errUnknownAction diff --git a/middleware/hooks.go b/middleware/hooks.go index 71dd9b5..11c6861 100644 --- a/middleware/hooks.go +++ b/middleware/hooks.go @@ -102,10 +102,17 @@ type responseHook struct { store storage.PeerStorage } -func (h *responseHook) scrape(ctx context.Context, ih bittorrent.InfoHash) (leechers uint32, seeders uint32, snatched uint32) { - leechers, seeders, snatched = h.store.ScrapeSwarm(ctx, ih) +func (h *responseHook) scrape(ctx context.Context, ih bittorrent.InfoHash) (leechers uint32, seeders uint32, snatched uint32, err error) { + leechers, seeders, snatched, err = h.store.ScrapeSwarm(ctx, ih) + if err != nil { + return + } if len(ih) == bittorrent.InfoHashV2Len { - l, s, n := h.store.ScrapeSwarm(ctx, ih.TruncateV1()) + var l, s, n uint32 + l, s, n, err = h.store.ScrapeSwarm(ctx, ih.TruncateV1()) + if err != nil { + return + } leechers, seeders, snatched = leechers+l, seeders+s, snatched+n } return @@ -117,7 +124,10 @@ func (h *responseHook) HandleAnnounce(ctx context.Context, req *bittorrent.Annou } // Add the Scrape data to the response. - resp.Incomplete, resp.Complete, _ = h.scrape(ctx, req.InfoHash) + resp.Incomplete, resp.Complete, _, err = h.scrape(ctx, req.InfoHash) + if err != nil { + return + } err = h.appendPeers(ctx, req, resp) return ctx, err @@ -202,14 +212,17 @@ func (h *responseHook) appendPeers(ctx context.Context, req *bittorrent.Announce return } -func (h *responseHook) HandleScrape(ctx context.Context, req *bittorrent.ScrapeRequest, resp *bittorrent.ScrapeResponse) (context.Context, error) { +func (h *responseHook) HandleScrape(ctx context.Context, req *bittorrent.ScrapeRequest, resp *bittorrent.ScrapeResponse) (_ context.Context, err error) { if ctx.Value(SkipResponseHookKey) != nil { return ctx, nil } for _, infoHash := range req.InfoHashes { scr := bittorrent.Scrape{InfoHash: infoHash} - scr.Incomplete, scr.Complete, scr.Snatches = h.scrape(ctx, infoHash) + scr.Incomplete, scr.Complete, scr.Snatches, err = h.scrape(ctx, infoHash) + if err != nil { + return + } resp.Files = append(resp.Files, scr) } diff --git a/storage/keydb/storage.go b/storage/keydb/storage.go index 65c1e64..bbc6365 100644 --- a/storage/keydb/storage.go +++ b/storage/keydb/storage.go @@ -70,7 +70,7 @@ func newStore(cfg r.Config) (*store, error) { cmd := redis.NewCommandsInfoCmd(context.Background(), "COMMAND", "INFO", expireMemberCmd) _ = rs.Process(context.Background(), cmd) - err = r.AsNil(cmd.Err()) + err = r.NoResultErr(cmd.Err()) if err == nil && len(cmd.Val()) == 0 { err = errNotKeyDB } @@ -105,7 +105,7 @@ func (s *store) delPeer(ctx context.Context, infoHashKey, peerID string) error { Str("peerID", peerID). Msg("del peer") deleted, err := s.SRem(ctx, infoHashKey, peerID).Uint64() - err = r.AsNil(err) + err = r.NoResultErr(err) if err == nil && deleted == 0 { err = storage.ErrResourceDoesNotExist } @@ -166,7 +166,7 @@ func (s *store) AnnouncePeers(ctx context.Context, ih bittorrent.InfoHash, forSe } // ScrapeSwarm is the same function as redis.ScrapeSwarm except `SCard` call instead of `HLen` -func (s *store) ScrapeSwarm(ctx context.Context, ih bittorrent.InfoHash) (uint32, uint32, uint32) { +func (s *store) ScrapeSwarm(ctx context.Context, ih bittorrent.InfoHash) (uint32, uint32, uint32, error) { logger.Trace(). Stringer("infoHash", ih). Msg("scrape swarm") diff --git a/storage/memory/storage.go b/storage/memory/storage.go index be27b4c..8697b81 100644 --- a/storage/memory/storage.go +++ b/storage/memory/storage.go @@ -436,7 +436,7 @@ func (ps *peerStore) countPeers(ih bittorrent.InfoHash, v6 bool) (leechers, seed return } -func (ps *peerStore) ScrapeSwarm(_ context.Context, ih bittorrent.InfoHash) (leechers uint32, seeders uint32, snatched uint32) { +func (ps *peerStore) ScrapeSwarm(_ context.Context, ih bittorrent.InfoHash) (leechers uint32, seeders uint32, snatched uint32, _ error) { select { case <-ps.closed: panic("attempted to interact with stopped memory store") diff --git a/storage/pg/storage.go b/storage/pg/storage.go index af0210b..631acec 100644 --- a/storage/pg/storage.go +++ b/storage/pg/storage.go @@ -64,6 +64,15 @@ func builder(icfg conf.MapConfig) (storage.PeerStorage, error) { return newStore(cfg) } +// noResultErr returns nil if provided err is pgx.ErrNoRows +// otherwise returns err +func noResultErr(err error) error { + if err == nil || errors.Is(err, pgx.ErrNoRows) { + return nil + } + return err +} + func newStore(cfg Config) (storage.PeerStorage, error) { cfg, err := cfg.Validate() if err != nil { @@ -155,7 +164,7 @@ func (cfg Config) Validate() (Config, error) { return cfg, err } - if err := fn(&validCfg.Peer.DelQuery, "peer.aelQuery"); err != nil { + if err := fn(&validCfg.Peer.DelQuery, "peer.delQuery"); err != nil { return cfg, err } @@ -266,9 +275,7 @@ func (s *store) Contains(ctx context.Context, storeCtx string, key string) (cont } func (s *store) Load(ctx context.Context, storeCtx string, key string) (out []byte, err error) { - if err = s.QueryRow(ctx, s.Data.GetQuery, pgx.NamedArgs{pCtx: storeCtx, pKey: []byte(key)}).Scan(&out); errors.Is(err, pgx.ErrNoRows) { - err = nil - } + err = noResultErr(s.QueryRow(ctx, s.Data.GetQuery, pgx.NamedArgs{pCtx: storeCtx, pKey: []byte(key)}).Scan(&out)) return } @@ -332,9 +339,13 @@ func (s *store) ScheduleStatisticsCollection(reportInterval time.Duration) { case <-t.C: if metrics.Enabled() { before := time.Now() - sc, lc := s.countPeers(context.Background(), nil) + sc, lc, err := s.countPeers(context.Background(), nil) + if err = noResultErr(err); err != nil { + logger.Error().Err(err).Msg("error occurred while get peers count count") + } var hc int - if err := s.QueryRow(context.Background(), s.InfoHashCountQuery).Scan(&hc); err != nil && !errors.Is(err, pgx.ErrNoRows) { + err = s.QueryRow(context.Background(), s.InfoHashCountQuery).Scan(&hc) + if err = noResultErr(err); err != nil { logger.Error().Err(err).Msg("error occurred while get info hash count") } @@ -514,15 +525,14 @@ func (s *store) AnnouncePeers(ctx context.Context, ih bittorrent.InfoHash, forSe return } -func (s *store) countPeers(ctx context.Context, ih []byte) (seeders uint32, leechers uint32) { +func (s *store) countPeers(ctx context.Context, ih []byte) (seeders uint32, leechers uint32, err error) { var rows pgx.Rows - var err error if len(ih) == 0 { rows, err = s.Query(ctx, s.Peer.CountQuery) } else { rows, err = s.Query(ctx, s.Peer.CountQuery+" "+s.Peer.ByInfoHashClause, pgx.NamedArgs{pInfoHash: ih}) } - if err == nil { + if err = noResultErr(err); err == nil { defer rows.Close() if rows.Next() { si, li := -1, -1 @@ -547,26 +557,23 @@ func (s *store) countPeers(ctx context.Context, ih []byte) (seeders uint32, leec into := make([]any, mi+1) into[si], into[li] = &seeders, &leechers - err = rows.Scan(into...) + err = noResultErr(rows.Scan(into...)) } } } - if err != nil { - logger.Error().Err(err).Hex("infoHash", ih).Msg("unable to get peers count") - } return } -func (s *store) ScrapeSwarm(ctx context.Context, ih bittorrent.InfoHash) (leechers uint32, seeders uint32, snatched uint32) { +func (s *store) ScrapeSwarm(ctx context.Context, ih bittorrent.InfoHash) (leechers uint32, seeders uint32, snatched uint32, err error) { logger.Trace(). Stringer("infoHash", ih). Msg("scrape swarm") ihb := []byte(ih) - seeders, leechers = s.countPeers(ctx, ihb) + if seeders, leechers, err = s.countPeers(ctx, ihb); err != nil { + return + } if len(s.Downloads.GetQuery) > 0 { - if err := s.QueryRow(ctx, s.Downloads.GetQuery, pgx.NamedArgs{pInfoHash: ihb}).Scan(&snatched); err != nil && !errors.Is(err, pgx.ErrNoRows) { - logger.Error().Stringer("infoHash", ih).Err(err).Msg("error occurred while get info downloads count") - } + err = noResultErr(s.QueryRow(ctx, s.Downloads.GetQuery, pgx.NamedArgs{pInfoHash: ihb}).Scan(&snatched)) } return diff --git a/storage/redis/storage.go b/storage/redis/storage.go index 7d07cf6..f46ffbf 100644 --- a/storage/redis/storage.go +++ b/storage/redis/storage.go @@ -294,7 +294,7 @@ func (ps *store) count(key string, getLength bool) (n uint64) { } else { n, err = ps.Get(context.Background(), key).Uint64() } - err = AsNil(err) + err = NoResultErr(err) if err != nil { logger.Error().Err(err).Str("key", key).Msg("GET/SCARD failure") } @@ -322,9 +322,9 @@ func (ps *store) tx(ctx context.Context, txf func(tx redis.Pipeliner) error) (er return } -// AsNil returns nil if provided err is redis.Nil +// NoResultErr returns nil if provided err is redis.Nil // otherwise returns err -func AsNil(err error) error { +func NoResultErr(err error) error { if err == nil || errors.Is(err, redis.Nil) { return nil } @@ -377,7 +377,7 @@ func (ps *store) delPeer(ctx context.Context, infoHashKey, peerCountKey, peerID Str("peerID", peerID). Msg("del peer") deleted, err := ps.HDel(ctx, infoHashKey, peerID).Uint64() - err = AsNil(err) + err = NoResultErr(err) if err == nil { if deleted == 0 { err = storage.ErrResourceDoesNotExist @@ -416,7 +416,7 @@ func (ps *store) GraduateLeecher(ctx context.Context, ih bittorrent.InfoHash, pe return ps.tx(ctx, func(tx redis.Pipeliner) error { deleted, err := tx.HDel(ctx, ihLeecherKey, peerID).Uint64() - err = AsNil(err) + err = NoResultErr(err) if err == nil { if deleted > 0 { err = tx.Decr(ctx, CountLeecherKey).Err() @@ -441,7 +441,7 @@ func (ps *store) GraduateLeecher(ctx context.Context, ih bittorrent.InfoHash, pe func (ps *Connection) parsePeersList(peersResult *redis.StringSliceCmd) (peers []bittorrent.Peer, err error) { var peerIds []string peerIds, err = peersResult.Result() - if err = AsNil(err); err == nil { + if err = NoResultErr(err); err == nil { for _, peerID := range peerIds { if p, err := bittorrent.NewPeer(peerID); err == nil { peers = append(peers, p) @@ -508,33 +508,38 @@ func (ps *store) AnnouncePeers(ctx context.Context, ih bittorrent.InfoHash, forS type getPeerCountFn func(context.Context, string) *redis.IntCmd -func (ps *Connection) countPeers(ctx context.Context, infoHashKey string, countFn getPeerCountFn) uint32 { - count, err := countFn(ctx, infoHashKey).Result() - err = AsNil(err) - if err != nil { - logger.Error().Err(err).Str("infoHashKey", infoHashKey).Msg("key size calculation failure") - } - return uint32(count) -} - // ScrapeIH calls provided countFn and returns seeders, leechers and downloads count for specified info hash -func (ps *Connection) ScrapeIH(ctx context.Context, ih bittorrent.InfoHash, countFn getPeerCountFn) (leechersCount, seedersCount, downloadsCount uint32) { +func (ps *Connection) ScrapeIH(ctx context.Context, ih bittorrent.InfoHash, countFn getPeerCountFn) ( + leechersCount, seedersCount, downloadsCount uint32, err error, +) { infoHash := ih.RawString() + var lc4, lc6, sc4, sc6, dc int64 - leechersCount = ps.countPeers(ctx, InfoHashKey(infoHash, false, false), countFn) + - ps.countPeers(ctx, InfoHashKey(infoHash, false, true), countFn) - seedersCount = ps.countPeers(ctx, InfoHashKey(infoHash, true, false), countFn) + - ps.countPeers(ctx, InfoHashKey(infoHash, true, true), countFn) - d, err := ps.HGet(ctx, CountDownloadsKey, infoHash).Uint64() - if err = AsNil(err); err != nil { - logger.Error().Err(err).Str("infoHash", infoHash).Msg("downloads count calculation failure") + lc4, err = countFn(ctx, InfoHashKey(infoHash, false, false)).Result() + if err = NoResultErr(err); err != nil { + return } - downloadsCount = uint32(d) - + lc6, err = countFn(ctx, InfoHashKey(infoHash, false, true)).Result() + if err = NoResultErr(err); err != nil { + return + } + sc4, err = countFn(ctx, InfoHashKey(infoHash, true, false)).Result() + if err = NoResultErr(err); err != nil { + return + } + sc6, err = countFn(ctx, InfoHashKey(infoHash, true, true)).Result() + if err = NoResultErr(err); err != nil { + return + } + dc, err = ps.HGet(ctx, CountDownloadsKey, infoHash).Int64() + if err = NoResultErr(err); err != nil { + return + } + leechersCount, seedersCount, downloadsCount = uint32(lc4+lc6), uint32(sc4+sc6), uint32(dc) return } -func (ps *store) ScrapeSwarm(ctx context.Context, ih bittorrent.InfoHash) (uint32, uint32, uint32) { +func (ps *store) ScrapeSwarm(ctx context.Context, ih bittorrent.InfoHash) (uint32, uint32, uint32, error) { logger.Trace(). Stringer("infoHash", ih). Msg("scrape swarm") @@ -572,7 +577,7 @@ func (ps *Connection) Put(ctx context.Context, storeCtx string, values ...storag // Contains - storage.DataStorage implementation func (ps *Connection) Contains(ctx context.Context, storeCtx string, key string) (bool, error) { exist, err := ps.HExists(ctx, PrefixKey+storeCtx, key).Result() - return exist, AsNil(err) + return exist, NoResultErr(err) } // Load - storage.DataStorage implementation @@ -587,12 +592,12 @@ func (ps *Connection) Load(ctx context.Context, storeCtx string, key string) (v // Delete - storage.DataStorage implementation func (ps *Connection) Delete(ctx context.Context, storeCtx string, keys ...string) (err error) { if len(keys) > 0 { - err = AsNil(ps.HDel(ctx, PrefixKey+storeCtx, keys...).Err()) + err = NoResultErr(ps.HDel(ctx, PrefixKey+storeCtx, keys...).Err()) if err != nil { if strings.Contains(err.Error(), argNumErrorMsg) { logger.Warn().Msg("This Redis version/implementation does not support variadic arguments for HDEL") for _, k := range keys { - if err = AsNil(ps.HDel(ctx, PrefixKey+storeCtx, k).Err()); err != nil { + if err = NoResultErr(ps.HDel(ctx, PrefixKey+storeCtx, k).Err()); err != nil { break } } @@ -661,7 +666,7 @@ func (ps *store) gc(cutoff time.Time) { cutoffNanos := cutoff.UnixNano() // list all infoHashKeys in the group infoHashKeys, err := ps.SMembers(context.Background(), IHKey).Result() - err = AsNil(err) + err = NoResultErr(err) if err == nil { for _, infoHashKey := range infoHashKeys { var cntKey string @@ -676,7 +681,7 @@ func (ps *store) gc(cutoff time.Time) { } // list all (peer, timeout) pairs for the ih peerList, err := ps.HGetAll(context.Background(), infoHashKey).Result() - err = AsNil(err) + err = NoResultErr(err) if err == nil { peersToRemove := make([]string, 0) for peerID, timeStamp := range peerList { @@ -695,13 +700,13 @@ func (ps *store) gc(cutoff time.Time) { } if len(peersToRemove) > 0 { removedPeerCount, err := ps.HDel(context.Background(), infoHashKey, peersToRemove...).Result() - err = AsNil(err) + err = NoResultErr(err) if err != nil { if strings.Contains(err.Error(), argNumErrorMsg) { logger.Warn().Msg("This Redis version/implementation does not support variadic arguments for HDEL") for _, k := range peersToRemove { count, err := ps.HDel(context.Background(), infoHashKey, k).Result() - err = AsNil(err) + err = NoResultErr(err) if err != nil { logger.Error().Err(err). Str("infoHashKey", infoHashKey). @@ -728,14 +733,14 @@ func (ps *store) gc(cutoff time.Time) { } } - err = AsNil(ps.Watch(context.Background(), func(tx *redis.Tx) (err error) { + err = NoResultErr(ps.Watch(context.Background(), func(tx *redis.Tx) (err error) { var infoHashCount uint64 infoHashCount, err = ps.HLen(context.Background(), infoHashKey).Uint64() - err = AsNil(err) + err = NoResultErr(err) if err == nil && infoHashCount == 0 { // Empty hashes are not shown among existing keys, // in other words, it's removed automatically after `HDEL` the last field. - err = AsNil(ps.SRem(context.Background(), IHKey, infoHashKey).Err()) + err = NoResultErr(ps.SRem(context.Background(), IHKey, infoHashKey).Err()) } return err }, infoHashKey)) diff --git a/storage/storage.go b/storage/storage.go index 7a4d0ae..841559c 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -185,7 +185,7 @@ type PeerStorage interface { // filling the Snatches field is optional. // // If the Swarm does not exist, an empty Scrape and no error is returned. - ScrapeSwarm(ctx context.Context, ih bittorrent.InfoHash) (leechers uint32, seeders uint32, snatched uint32) + ScrapeSwarm(ctx context.Context, ih bittorrent.InfoHash) (leechers uint32, seeders uint32, snatched uint32, err error) // Ping used for checks if storage is alive // (connection could be established, enough space etc.) diff --git a/storage/test/storage_bench.go b/storage/test/storage_bench.go index b86c6f7..64274a3 100644 --- a/storage/test/storage_bench.go +++ b/storage/test/storage_bench.go @@ -451,7 +451,7 @@ func (bh *benchHolder) AnnounceSeeder1kInfoHash(b *testing.B) { // ScrapeSwarm can run in parallel. func (bh *benchHolder) ScrapeSwarm(b *testing.B) { bh.runBenchmark(b, true, putPeers, func(i int, ps storage.PeerStorage, bd *benchData) error { - ps.ScrapeSwarm(context.TODO(), bd.infoHashes[0]) + _, _, _, _ = ps.ScrapeSwarm(context.TODO(), bd.infoHashes[0]) return nil }) } @@ -461,7 +461,7 @@ func (bh *benchHolder) ScrapeSwarm(b *testing.B) { // ScrapeSwarm1kInfoHash can run in parallel. func (bh *benchHolder) ScrapeSwarm1kInfoHash(b *testing.B) { bh.runBenchmark(b, true, putPeers, func(i int, ps storage.PeerStorage, bd *benchData) error { - ps.ScrapeSwarm(context.TODO(), bd.infoHashes[i%ihCount]) + _, _, _, _ = ps.ScrapeSwarm(context.TODO(), bd.infoHashes[i%ihCount]) return nil }) } diff --git a/storage/test/storage_test_base.go b/storage/test/storage_test_base.go index a360965..464d513 100644 --- a/storage/test/storage_test_base.go +++ b/storage/test/storage_test_base.go @@ -82,7 +82,7 @@ func (th *testHolder) AnnouncePeers(t *testing.T) { func (th *testHolder) ScrapeSwarm(t *testing.T) { for _, c := range testData { - l, s, n := th.st.ScrapeSwarm(context.TODO(), c.ih) + l, s, n, _ := th.st.ScrapeSwarm(context.TODO(), c.ih) require.Equal(t, uint32(0), s) require.Equal(t, uint32(0), l) require.Equal(t, uint32(0), n) @@ -104,7 +104,7 @@ func (th *testHolder) LeecherPutAnnounceDeleteAnnounce(t *testing.T) { require.Nil(t, err) require.True(t, containsPeer(peers, c.peer)) - l, s, _ := th.st.ScrapeSwarm(context.TODO(), c.ih) + l, s, _, _ := th.st.ScrapeSwarm(context.TODO(), c.ih) require.Equal(t, uint32(2), l) require.Equal(t, uint32(0), s) @@ -131,7 +131,7 @@ func (th *testHolder) SeederPutAnnounceDeleteAnnounce(t *testing.T) { require.Nil(t, err) require.True(t, containsPeer(peers, c.peer)) - l, s, _ := th.st.ScrapeSwarm(context.TODO(), c.ih) + l, s, _, _ := th.st.ScrapeSwarm(context.TODO(), c.ih) require.Equal(t, uint32(1), l) require.Equal(t, uint32(1), s)