mirror of
https://github.com/sot-tech/mochi.git
synced 2026-04-26 23:50:00 -07:00
Merge pull request #34 from sot-tech/ctxCancel
Suppress `context canceled` error
This commit is contained in:
@@ -3,6 +3,7 @@
|
||||
package http
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"errors"
|
||||
"net/http"
|
||||
@@ -249,23 +250,27 @@ func (f *httpFE) announceRoute(reqCtx *fasthttp.RequestCtx) {
|
||||
ctx := bittorrent.InjectRouteParamsToContext(reqCtx, nil)
|
||||
ctx, aResp, err := f.logic.HandleAnnounce(ctx, aReq)
|
||||
if err != nil {
|
||||
writeErrorResponse(reqCtx, err)
|
||||
if !errors.Is(err, context.Canceled) {
|
||||
writeErrorResponse(reqCtx, err)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
reqCtx.Response.Header.Set("Content-Type", "text/plain; charset=utf-8")
|
||||
qArgs := reqCtx.QueryArgs()
|
||||
// `compact` means that tracker should return addresses in
|
||||
// binary (single concatenated string) mode instead of dictionary.
|
||||
// `no_peer_id` means, that tracker may omit PeerID field in response dictionary.
|
||||
// see https://wiki.theory.org/BitTorrentSpecification#Tracker_Request_Parameters
|
||||
writeAnnounceResponse(reqCtx, aResp, qArgs.GetBool("compact"), !qArgs.GetBool("no_peer_id"))
|
||||
if err = reqCtx.Err(); err == nil {
|
||||
reqCtx.Response.Header.Set("Content-Type", "text/plain; charset=utf-8")
|
||||
qArgs := reqCtx.QueryArgs()
|
||||
// `compact` means that tracker should return addresses in
|
||||
// binary (single concatenated string) mode instead of dictionary.
|
||||
// `no_peer_id` means, that tracker may omit PeerID field in response dictionary.
|
||||
// see https://wiki.theory.org/BitTorrentSpecification#Tracker_Request_Parameters
|
||||
writeAnnounceResponse(reqCtx, aResp, qArgs.GetBool("compact"), !qArgs.GetBool("no_peer_id"))
|
||||
|
||||
// next actions are background and should not be canceled after http writer closed
|
||||
ctx = bittorrent.RemapRouteParamsToBgContext(ctx)
|
||||
// params mapped from fasthttp.QueryArgs will be reused in the next request
|
||||
aReq.Params = nil
|
||||
go f.logic.AfterAnnounce(ctx, aReq, aResp)
|
||||
// next actions are background and should not be canceled after http writer closed
|
||||
ctx = bittorrent.RemapRouteParamsToBgContext(ctx)
|
||||
// params mapped from fasthttp.QueryArgs will be reused in the next request
|
||||
aReq.Params = nil
|
||||
go f.logic.AfterAnnounce(ctx, aReq, aResp)
|
||||
}
|
||||
}
|
||||
|
||||
// scrapeRoute parses and responds to a Scrape.
|
||||
@@ -290,32 +295,35 @@ func (f *httpFE) scrapeRoute(reqCtx *fasthttp.RequestCtx) {
|
||||
ctx := bittorrent.InjectRouteParamsToContext(reqCtx, nil)
|
||||
ctx, resp, err := f.logic.HandleScrape(ctx, req)
|
||||
if err != nil {
|
||||
writeErrorResponse(reqCtx, err)
|
||||
if !errors.Is(err, context.Canceled) {
|
||||
writeErrorResponse(reqCtx, err)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
reqCtx.Response.Header.Set("Content-Type", "text/plain; charset=utf-8")
|
||||
writeScrapeResponse(reqCtx, resp)
|
||||
if err = reqCtx.Err(); err == nil {
|
||||
reqCtx.SetContentType("text/plain; charset=utf-8")
|
||||
writeScrapeResponse(reqCtx, resp)
|
||||
|
||||
// next actions are background and should not be canceled after http writer closed
|
||||
ctx = bittorrent.RemapRouteParamsToBgContext(ctx)
|
||||
// params mapped from fasthttp.QueryArgs will in the next request
|
||||
req.Params = nil
|
||||
go f.logic.AfterScrape(ctx, req, resp)
|
||||
// next actions are background and should not be canceled after http writer closed
|
||||
ctx = bittorrent.RemapRouteParamsToBgContext(ctx)
|
||||
// params mapped from fasthttp.QueryArgs will in the next request
|
||||
req.Params = nil
|
||||
go f.logic.AfterScrape(ctx, req, resp)
|
||||
}
|
||||
}
|
||||
|
||||
func (f *httpFE) ping(ctx *fasthttp.RequestCtx) {
|
||||
var err error
|
||||
status := http.StatusOK
|
||||
err = f.logic.Ping(ctx)
|
||||
|
||||
err := f.logic.Ping(ctx)
|
||||
if err != nil {
|
||||
if errors.Is(err, context.Canceled) {
|
||||
return
|
||||
}
|
||||
logger.Error().Err(err).Msg("ping completed with error")
|
||||
status = http.StatusServiceUnavailable
|
||||
}
|
||||
if ctxErr := ctx.Err(); ctxErr == nil {
|
||||
if err = ctx.Err(); err == nil {
|
||||
ctx.SetStatusCode(status)
|
||||
} else {
|
||||
logger.Info().Err(ctxErr).Stringer("addr", ctx.RemoteAddr()).Msg("ping request cancelled")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -298,14 +298,18 @@ func (f *udpFE) handleRequest(ctx context.Context, r Request, w ResponseWriter)
|
||||
ctx := bittorrent.InjectRouteParamsToContext(ctx, bittorrent.RouteParams{})
|
||||
ctx, resp, err = f.logic.HandleAnnounce(ctx, req)
|
||||
if err != nil {
|
||||
writeErrorResponse(w, txID, err)
|
||||
if !errors.Is(err, context.Canceled) {
|
||||
writeErrorResponse(w, txID, err)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
writeAnnounceResponse(w, txID, resp, actionID == announceV6ActionID, r.IP.Is6())
|
||||
if err = ctx.Err(); err == nil {
|
||||
writeAnnounceResponse(w, txID, resp, actionID == announceV6ActionID, r.IP.Is6())
|
||||
|
||||
ctx = bittorrent.RemapRouteParamsToBgContext(ctx)
|
||||
go f.logic.AfterAnnounce(ctx, req, resp)
|
||||
ctx = bittorrent.RemapRouteParamsToBgContext(ctx)
|
||||
go f.logic.AfterAnnounce(ctx, req, resp)
|
||||
}
|
||||
|
||||
case scrapeActionID:
|
||||
actionName = "scrape"
|
||||
@@ -321,14 +325,18 @@ func (f *udpFE) handleRequest(ctx context.Context, r Request, w ResponseWriter)
|
||||
ctx := bittorrent.InjectRouteParamsToContext(ctx, bittorrent.RouteParams{})
|
||||
ctx, resp, err = f.logic.HandleScrape(ctx, req)
|
||||
if err != nil {
|
||||
writeErrorResponse(w, txID, err)
|
||||
if !errors.Is(err, context.Canceled) {
|
||||
writeErrorResponse(w, txID, err)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
writeScrapeResponse(w, txID, resp)
|
||||
if err = ctx.Err(); err == nil {
|
||||
writeScrapeResponse(w, txID, resp)
|
||||
|
||||
ctx = bittorrent.RemapRouteParamsToBgContext(ctx)
|
||||
go f.logic.AfterScrape(ctx, req, resp)
|
||||
ctx = bittorrent.RemapRouteParamsToBgContext(ctx)
|
||||
go f.logic.AfterScrape(ctx, req, resp)
|
||||
}
|
||||
|
||||
default:
|
||||
err = errUnknownAction
|
||||
|
||||
@@ -102,10 +102,17 @@ type responseHook struct {
|
||||
store storage.PeerStorage
|
||||
}
|
||||
|
||||
func (h *responseHook) scrape(ctx context.Context, ih bittorrent.InfoHash) (leechers uint32, seeders uint32, snatched uint32) {
|
||||
leechers, seeders, snatched = h.store.ScrapeSwarm(ctx, ih)
|
||||
func (h *responseHook) scrape(ctx context.Context, ih bittorrent.InfoHash) (leechers uint32, seeders uint32, snatched uint32, err error) {
|
||||
leechers, seeders, snatched, err = h.store.ScrapeSwarm(ctx, ih)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if len(ih) == bittorrent.InfoHashV2Len {
|
||||
l, s, n := h.store.ScrapeSwarm(ctx, ih.TruncateV1())
|
||||
var l, s, n uint32
|
||||
l, s, n, err = h.store.ScrapeSwarm(ctx, ih.TruncateV1())
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
leechers, seeders, snatched = leechers+l, seeders+s, snatched+n
|
||||
}
|
||||
return
|
||||
@@ -117,7 +124,10 @@ func (h *responseHook) HandleAnnounce(ctx context.Context, req *bittorrent.Annou
|
||||
}
|
||||
|
||||
// Add the Scrape data to the response.
|
||||
resp.Incomplete, resp.Complete, _ = h.scrape(ctx, req.InfoHash)
|
||||
resp.Incomplete, resp.Complete, _, err = h.scrape(ctx, req.InfoHash)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
err = h.appendPeers(ctx, req, resp)
|
||||
return ctx, err
|
||||
@@ -202,14 +212,17 @@ func (h *responseHook) appendPeers(ctx context.Context, req *bittorrent.Announce
|
||||
return
|
||||
}
|
||||
|
||||
func (h *responseHook) HandleScrape(ctx context.Context, req *bittorrent.ScrapeRequest, resp *bittorrent.ScrapeResponse) (context.Context, error) {
|
||||
func (h *responseHook) HandleScrape(ctx context.Context, req *bittorrent.ScrapeRequest, resp *bittorrent.ScrapeResponse) (_ context.Context, err error) {
|
||||
if ctx.Value(SkipResponseHookKey) != nil {
|
||||
return ctx, nil
|
||||
}
|
||||
|
||||
for _, infoHash := range req.InfoHashes {
|
||||
scr := bittorrent.Scrape{InfoHash: infoHash}
|
||||
scr.Incomplete, scr.Complete, scr.Snatches = h.scrape(ctx, infoHash)
|
||||
scr.Incomplete, scr.Complete, scr.Snatches, err = h.scrape(ctx, infoHash)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
resp.Files = append(resp.Files, scr)
|
||||
}
|
||||
|
||||
|
||||
@@ -70,7 +70,7 @@ func newStore(cfg r.Config) (*store, error) {
|
||||
|
||||
cmd := redis.NewCommandsInfoCmd(context.Background(), "COMMAND", "INFO", expireMemberCmd)
|
||||
_ = rs.Process(context.Background(), cmd)
|
||||
err = r.AsNil(cmd.Err())
|
||||
err = r.NoResultErr(cmd.Err())
|
||||
if err == nil && len(cmd.Val()) == 0 {
|
||||
err = errNotKeyDB
|
||||
}
|
||||
@@ -105,7 +105,7 @@ func (s *store) delPeer(ctx context.Context, infoHashKey, peerID string) error {
|
||||
Str("peerID", peerID).
|
||||
Msg("del peer")
|
||||
deleted, err := s.SRem(ctx, infoHashKey, peerID).Uint64()
|
||||
err = r.AsNil(err)
|
||||
err = r.NoResultErr(err)
|
||||
if err == nil && deleted == 0 {
|
||||
err = storage.ErrResourceDoesNotExist
|
||||
}
|
||||
@@ -166,7 +166,7 @@ func (s *store) AnnouncePeers(ctx context.Context, ih bittorrent.InfoHash, forSe
|
||||
}
|
||||
|
||||
// ScrapeSwarm is the same function as redis.ScrapeSwarm except `SCard` call instead of `HLen`
|
||||
func (s *store) ScrapeSwarm(ctx context.Context, ih bittorrent.InfoHash) (uint32, uint32, uint32) {
|
||||
func (s *store) ScrapeSwarm(ctx context.Context, ih bittorrent.InfoHash) (uint32, uint32, uint32, error) {
|
||||
logger.Trace().
|
||||
Stringer("infoHash", ih).
|
||||
Msg("scrape swarm")
|
||||
|
||||
@@ -436,7 +436,7 @@ func (ps *peerStore) countPeers(ih bittorrent.InfoHash, v6 bool) (leechers, seed
|
||||
return
|
||||
}
|
||||
|
||||
func (ps *peerStore) ScrapeSwarm(_ context.Context, ih bittorrent.InfoHash) (leechers uint32, seeders uint32, snatched uint32) {
|
||||
func (ps *peerStore) ScrapeSwarm(_ context.Context, ih bittorrent.InfoHash) (leechers uint32, seeders uint32, snatched uint32, _ error) {
|
||||
select {
|
||||
case <-ps.closed:
|
||||
panic("attempted to interact with stopped memory store")
|
||||
|
||||
@@ -64,6 +64,15 @@ func builder(icfg conf.MapConfig) (storage.PeerStorage, error) {
|
||||
return newStore(cfg)
|
||||
}
|
||||
|
||||
// noResultErr returns nil if provided err is pgx.ErrNoRows
|
||||
// otherwise returns err
|
||||
func noResultErr(err error) error {
|
||||
if err == nil || errors.Is(err, pgx.ErrNoRows) {
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func newStore(cfg Config) (storage.PeerStorage, error) {
|
||||
cfg, err := cfg.Validate()
|
||||
if err != nil {
|
||||
@@ -155,7 +164,7 @@ func (cfg Config) Validate() (Config, error) {
|
||||
return cfg, err
|
||||
}
|
||||
|
||||
if err := fn(&validCfg.Peer.DelQuery, "peer.aelQuery"); err != nil {
|
||||
if err := fn(&validCfg.Peer.DelQuery, "peer.delQuery"); err != nil {
|
||||
return cfg, err
|
||||
}
|
||||
|
||||
@@ -266,9 +275,7 @@ func (s *store) Contains(ctx context.Context, storeCtx string, key string) (cont
|
||||
}
|
||||
|
||||
func (s *store) Load(ctx context.Context, storeCtx string, key string) (out []byte, err error) {
|
||||
if err = s.QueryRow(ctx, s.Data.GetQuery, pgx.NamedArgs{pCtx: storeCtx, pKey: []byte(key)}).Scan(&out); errors.Is(err, pgx.ErrNoRows) {
|
||||
err = nil
|
||||
}
|
||||
err = noResultErr(s.QueryRow(ctx, s.Data.GetQuery, pgx.NamedArgs{pCtx: storeCtx, pKey: []byte(key)}).Scan(&out))
|
||||
return
|
||||
}
|
||||
|
||||
@@ -332,9 +339,13 @@ func (s *store) ScheduleStatisticsCollection(reportInterval time.Duration) {
|
||||
case <-t.C:
|
||||
if metrics.Enabled() {
|
||||
before := time.Now()
|
||||
sc, lc := s.countPeers(context.Background(), nil)
|
||||
sc, lc, err := s.countPeers(context.Background(), nil)
|
||||
if err = noResultErr(err); err != nil {
|
||||
logger.Error().Err(err).Msg("error occurred while get peers count count")
|
||||
}
|
||||
var hc int
|
||||
if err := s.QueryRow(context.Background(), s.InfoHashCountQuery).Scan(&hc); err != nil && !errors.Is(err, pgx.ErrNoRows) {
|
||||
err = s.QueryRow(context.Background(), s.InfoHashCountQuery).Scan(&hc)
|
||||
if err = noResultErr(err); err != nil {
|
||||
logger.Error().Err(err).Msg("error occurred while get info hash count")
|
||||
}
|
||||
|
||||
@@ -514,15 +525,14 @@ func (s *store) AnnouncePeers(ctx context.Context, ih bittorrent.InfoHash, forSe
|
||||
return
|
||||
}
|
||||
|
||||
func (s *store) countPeers(ctx context.Context, ih []byte) (seeders uint32, leechers uint32) {
|
||||
func (s *store) countPeers(ctx context.Context, ih []byte) (seeders uint32, leechers uint32, err error) {
|
||||
var rows pgx.Rows
|
||||
var err error
|
||||
if len(ih) == 0 {
|
||||
rows, err = s.Query(ctx, s.Peer.CountQuery)
|
||||
} else {
|
||||
rows, err = s.Query(ctx, s.Peer.CountQuery+" "+s.Peer.ByInfoHashClause, pgx.NamedArgs{pInfoHash: ih})
|
||||
}
|
||||
if err == nil {
|
||||
if err = noResultErr(err); err == nil {
|
||||
defer rows.Close()
|
||||
if rows.Next() {
|
||||
si, li := -1, -1
|
||||
@@ -547,26 +557,23 @@ func (s *store) countPeers(ctx context.Context, ih []byte) (seeders uint32, leec
|
||||
into := make([]any, mi+1)
|
||||
into[si], into[li] = &seeders, &leechers
|
||||
|
||||
err = rows.Scan(into...)
|
||||
err = noResultErr(rows.Scan(into...))
|
||||
}
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
logger.Error().Err(err).Hex("infoHash", ih).Msg("unable to get peers count")
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (s *store) ScrapeSwarm(ctx context.Context, ih bittorrent.InfoHash) (leechers uint32, seeders uint32, snatched uint32) {
|
||||
func (s *store) ScrapeSwarm(ctx context.Context, ih bittorrent.InfoHash) (leechers uint32, seeders uint32, snatched uint32, err error) {
|
||||
logger.Trace().
|
||||
Stringer("infoHash", ih).
|
||||
Msg("scrape swarm")
|
||||
ihb := []byte(ih)
|
||||
seeders, leechers = s.countPeers(ctx, ihb)
|
||||
if seeders, leechers, err = s.countPeers(ctx, ihb); err != nil {
|
||||
return
|
||||
}
|
||||
if len(s.Downloads.GetQuery) > 0 {
|
||||
if err := s.QueryRow(ctx, s.Downloads.GetQuery, pgx.NamedArgs{pInfoHash: ihb}).Scan(&snatched); err != nil && !errors.Is(err, pgx.ErrNoRows) {
|
||||
logger.Error().Stringer("infoHash", ih).Err(err).Msg("error occurred while get info downloads count")
|
||||
}
|
||||
err = noResultErr(s.QueryRow(ctx, s.Downloads.GetQuery, pgx.NamedArgs{pInfoHash: ihb}).Scan(&snatched))
|
||||
}
|
||||
|
||||
return
|
||||
|
||||
@@ -294,7 +294,7 @@ func (ps *store) count(key string, getLength bool) (n uint64) {
|
||||
} else {
|
||||
n, err = ps.Get(context.Background(), key).Uint64()
|
||||
}
|
||||
err = AsNil(err)
|
||||
err = NoResultErr(err)
|
||||
if err != nil {
|
||||
logger.Error().Err(err).Str("key", key).Msg("GET/SCARD failure")
|
||||
}
|
||||
@@ -322,9 +322,9 @@ func (ps *store) tx(ctx context.Context, txf func(tx redis.Pipeliner) error) (er
|
||||
return
|
||||
}
|
||||
|
||||
// AsNil returns nil if provided err is redis.Nil
|
||||
// NoResultErr returns nil if provided err is redis.Nil
|
||||
// otherwise returns err
|
||||
func AsNil(err error) error {
|
||||
func NoResultErr(err error) error {
|
||||
if err == nil || errors.Is(err, redis.Nil) {
|
||||
return nil
|
||||
}
|
||||
@@ -377,7 +377,7 @@ func (ps *store) delPeer(ctx context.Context, infoHashKey, peerCountKey, peerID
|
||||
Str("peerID", peerID).
|
||||
Msg("del peer")
|
||||
deleted, err := ps.HDel(ctx, infoHashKey, peerID).Uint64()
|
||||
err = AsNil(err)
|
||||
err = NoResultErr(err)
|
||||
if err == nil {
|
||||
if deleted == 0 {
|
||||
err = storage.ErrResourceDoesNotExist
|
||||
@@ -416,7 +416,7 @@ func (ps *store) GraduateLeecher(ctx context.Context, ih bittorrent.InfoHash, pe
|
||||
|
||||
return ps.tx(ctx, func(tx redis.Pipeliner) error {
|
||||
deleted, err := tx.HDel(ctx, ihLeecherKey, peerID).Uint64()
|
||||
err = AsNil(err)
|
||||
err = NoResultErr(err)
|
||||
if err == nil {
|
||||
if deleted > 0 {
|
||||
err = tx.Decr(ctx, CountLeecherKey).Err()
|
||||
@@ -441,7 +441,7 @@ func (ps *store) GraduateLeecher(ctx context.Context, ih bittorrent.InfoHash, pe
|
||||
func (ps *Connection) parsePeersList(peersResult *redis.StringSliceCmd) (peers []bittorrent.Peer, err error) {
|
||||
var peerIds []string
|
||||
peerIds, err = peersResult.Result()
|
||||
if err = AsNil(err); err == nil {
|
||||
if err = NoResultErr(err); err == nil {
|
||||
for _, peerID := range peerIds {
|
||||
if p, err := bittorrent.NewPeer(peerID); err == nil {
|
||||
peers = append(peers, p)
|
||||
@@ -508,33 +508,38 @@ func (ps *store) AnnouncePeers(ctx context.Context, ih bittorrent.InfoHash, forS
|
||||
|
||||
type getPeerCountFn func(context.Context, string) *redis.IntCmd
|
||||
|
||||
func (ps *Connection) countPeers(ctx context.Context, infoHashKey string, countFn getPeerCountFn) uint32 {
|
||||
count, err := countFn(ctx, infoHashKey).Result()
|
||||
err = AsNil(err)
|
||||
if err != nil {
|
||||
logger.Error().Err(err).Str("infoHashKey", infoHashKey).Msg("key size calculation failure")
|
||||
}
|
||||
return uint32(count)
|
||||
}
|
||||
|
||||
// ScrapeIH calls provided countFn and returns seeders, leechers and downloads count for specified info hash
|
||||
func (ps *Connection) ScrapeIH(ctx context.Context, ih bittorrent.InfoHash, countFn getPeerCountFn) (leechersCount, seedersCount, downloadsCount uint32) {
|
||||
func (ps *Connection) ScrapeIH(ctx context.Context, ih bittorrent.InfoHash, countFn getPeerCountFn) (
|
||||
leechersCount, seedersCount, downloadsCount uint32, err error,
|
||||
) {
|
||||
infoHash := ih.RawString()
|
||||
var lc4, lc6, sc4, sc6, dc int64
|
||||
|
||||
leechersCount = ps.countPeers(ctx, InfoHashKey(infoHash, false, false), countFn) +
|
||||
ps.countPeers(ctx, InfoHashKey(infoHash, false, true), countFn)
|
||||
seedersCount = ps.countPeers(ctx, InfoHashKey(infoHash, true, false), countFn) +
|
||||
ps.countPeers(ctx, InfoHashKey(infoHash, true, true), countFn)
|
||||
d, err := ps.HGet(ctx, CountDownloadsKey, infoHash).Uint64()
|
||||
if err = AsNil(err); err != nil {
|
||||
logger.Error().Err(err).Str("infoHash", infoHash).Msg("downloads count calculation failure")
|
||||
lc4, err = countFn(ctx, InfoHashKey(infoHash, false, false)).Result()
|
||||
if err = NoResultErr(err); err != nil {
|
||||
return
|
||||
}
|
||||
downloadsCount = uint32(d)
|
||||
|
||||
lc6, err = countFn(ctx, InfoHashKey(infoHash, false, true)).Result()
|
||||
if err = NoResultErr(err); err != nil {
|
||||
return
|
||||
}
|
||||
sc4, err = countFn(ctx, InfoHashKey(infoHash, true, false)).Result()
|
||||
if err = NoResultErr(err); err != nil {
|
||||
return
|
||||
}
|
||||
sc6, err = countFn(ctx, InfoHashKey(infoHash, true, true)).Result()
|
||||
if err = NoResultErr(err); err != nil {
|
||||
return
|
||||
}
|
||||
dc, err = ps.HGet(ctx, CountDownloadsKey, infoHash).Int64()
|
||||
if err = NoResultErr(err); err != nil {
|
||||
return
|
||||
}
|
||||
leechersCount, seedersCount, downloadsCount = uint32(lc4+lc6), uint32(sc4+sc6), uint32(dc)
|
||||
return
|
||||
}
|
||||
|
||||
func (ps *store) ScrapeSwarm(ctx context.Context, ih bittorrent.InfoHash) (uint32, uint32, uint32) {
|
||||
func (ps *store) ScrapeSwarm(ctx context.Context, ih bittorrent.InfoHash) (uint32, uint32, uint32, error) {
|
||||
logger.Trace().
|
||||
Stringer("infoHash", ih).
|
||||
Msg("scrape swarm")
|
||||
@@ -572,7 +577,7 @@ func (ps *Connection) Put(ctx context.Context, storeCtx string, values ...storag
|
||||
// Contains - storage.DataStorage implementation
|
||||
func (ps *Connection) Contains(ctx context.Context, storeCtx string, key string) (bool, error) {
|
||||
exist, err := ps.HExists(ctx, PrefixKey+storeCtx, key).Result()
|
||||
return exist, AsNil(err)
|
||||
return exist, NoResultErr(err)
|
||||
}
|
||||
|
||||
// Load - storage.DataStorage implementation
|
||||
@@ -587,12 +592,12 @@ func (ps *Connection) Load(ctx context.Context, storeCtx string, key string) (v
|
||||
// Delete - storage.DataStorage implementation
|
||||
func (ps *Connection) Delete(ctx context.Context, storeCtx string, keys ...string) (err error) {
|
||||
if len(keys) > 0 {
|
||||
err = AsNil(ps.HDel(ctx, PrefixKey+storeCtx, keys...).Err())
|
||||
err = NoResultErr(ps.HDel(ctx, PrefixKey+storeCtx, keys...).Err())
|
||||
if err != nil {
|
||||
if strings.Contains(err.Error(), argNumErrorMsg) {
|
||||
logger.Warn().Msg("This Redis version/implementation does not support variadic arguments for HDEL")
|
||||
for _, k := range keys {
|
||||
if err = AsNil(ps.HDel(ctx, PrefixKey+storeCtx, k).Err()); err != nil {
|
||||
if err = NoResultErr(ps.HDel(ctx, PrefixKey+storeCtx, k).Err()); err != nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
@@ -661,7 +666,7 @@ func (ps *store) gc(cutoff time.Time) {
|
||||
cutoffNanos := cutoff.UnixNano()
|
||||
// list all infoHashKeys in the group
|
||||
infoHashKeys, err := ps.SMembers(context.Background(), IHKey).Result()
|
||||
err = AsNil(err)
|
||||
err = NoResultErr(err)
|
||||
if err == nil {
|
||||
for _, infoHashKey := range infoHashKeys {
|
||||
var cntKey string
|
||||
@@ -676,7 +681,7 @@ func (ps *store) gc(cutoff time.Time) {
|
||||
}
|
||||
// list all (peer, timeout) pairs for the ih
|
||||
peerList, err := ps.HGetAll(context.Background(), infoHashKey).Result()
|
||||
err = AsNil(err)
|
||||
err = NoResultErr(err)
|
||||
if err == nil {
|
||||
peersToRemove := make([]string, 0)
|
||||
for peerID, timeStamp := range peerList {
|
||||
@@ -695,13 +700,13 @@ func (ps *store) gc(cutoff time.Time) {
|
||||
}
|
||||
if len(peersToRemove) > 0 {
|
||||
removedPeerCount, err := ps.HDel(context.Background(), infoHashKey, peersToRemove...).Result()
|
||||
err = AsNil(err)
|
||||
err = NoResultErr(err)
|
||||
if err != nil {
|
||||
if strings.Contains(err.Error(), argNumErrorMsg) {
|
||||
logger.Warn().Msg("This Redis version/implementation does not support variadic arguments for HDEL")
|
||||
for _, k := range peersToRemove {
|
||||
count, err := ps.HDel(context.Background(), infoHashKey, k).Result()
|
||||
err = AsNil(err)
|
||||
err = NoResultErr(err)
|
||||
if err != nil {
|
||||
logger.Error().Err(err).
|
||||
Str("infoHashKey", infoHashKey).
|
||||
@@ -728,14 +733,14 @@ func (ps *store) gc(cutoff time.Time) {
|
||||
}
|
||||
}
|
||||
|
||||
err = AsNil(ps.Watch(context.Background(), func(tx *redis.Tx) (err error) {
|
||||
err = NoResultErr(ps.Watch(context.Background(), func(tx *redis.Tx) (err error) {
|
||||
var infoHashCount uint64
|
||||
infoHashCount, err = ps.HLen(context.Background(), infoHashKey).Uint64()
|
||||
err = AsNil(err)
|
||||
err = NoResultErr(err)
|
||||
if err == nil && infoHashCount == 0 {
|
||||
// Empty hashes are not shown among existing keys,
|
||||
// in other words, it's removed automatically after `HDEL` the last field.
|
||||
err = AsNil(ps.SRem(context.Background(), IHKey, infoHashKey).Err())
|
||||
err = NoResultErr(ps.SRem(context.Background(), IHKey, infoHashKey).Err())
|
||||
}
|
||||
return err
|
||||
}, infoHashKey))
|
||||
|
||||
@@ -185,7 +185,7 @@ type PeerStorage interface {
|
||||
// filling the Snatches field is optional.
|
||||
//
|
||||
// If the Swarm does not exist, an empty Scrape and no error is returned.
|
||||
ScrapeSwarm(ctx context.Context, ih bittorrent.InfoHash) (leechers uint32, seeders uint32, snatched uint32)
|
||||
ScrapeSwarm(ctx context.Context, ih bittorrent.InfoHash) (leechers uint32, seeders uint32, snatched uint32, err error)
|
||||
|
||||
// Ping used for checks if storage is alive
|
||||
// (connection could be established, enough space etc.)
|
||||
|
||||
@@ -449,7 +449,7 @@ func (bh *benchHolder) AnnounceSeeder1kInfoHash(b *testing.B) {
|
||||
// ScrapeSwarm can run in parallel.
|
||||
func (bh *benchHolder) ScrapeSwarm(b *testing.B) {
|
||||
bh.runBenchmark(b, true, putPeers, func(i int, ps storage.PeerStorage, bd *benchData) error {
|
||||
ps.ScrapeSwarm(context.TODO(), bd.infoHashes[0])
|
||||
_, _, _, _ = ps.ScrapeSwarm(context.TODO(), bd.infoHashes[0])
|
||||
return nil
|
||||
})
|
||||
}
|
||||
@@ -459,7 +459,7 @@ func (bh *benchHolder) ScrapeSwarm(b *testing.B) {
|
||||
// ScrapeSwarm1kInfoHash can run in parallel.
|
||||
func (bh *benchHolder) ScrapeSwarm1kInfoHash(b *testing.B) {
|
||||
bh.runBenchmark(b, true, putPeers, func(i int, ps storage.PeerStorage, bd *benchData) error {
|
||||
ps.ScrapeSwarm(context.TODO(), bd.infoHashes[i%ihCount])
|
||||
_, _, _, _ = ps.ScrapeSwarm(context.TODO(), bd.infoHashes[i%ihCount])
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
@@ -82,7 +82,7 @@ func (th *testHolder) AnnouncePeers(t *testing.T) {
|
||||
|
||||
func (th *testHolder) ScrapeSwarm(t *testing.T) {
|
||||
for _, c := range testData {
|
||||
l, s, n := th.st.ScrapeSwarm(context.TODO(), c.ih)
|
||||
l, s, n, _ := th.st.ScrapeSwarm(context.TODO(), c.ih)
|
||||
require.Equal(t, uint32(0), s)
|
||||
require.Equal(t, uint32(0), l)
|
||||
require.Equal(t, uint32(0), n)
|
||||
@@ -104,7 +104,7 @@ func (th *testHolder) LeecherPutAnnounceDeleteAnnounce(t *testing.T) {
|
||||
require.Nil(t, err)
|
||||
require.True(t, containsPeer(peers, c.peer))
|
||||
|
||||
l, s, _ := th.st.ScrapeSwarm(context.TODO(), c.ih)
|
||||
l, s, _, _ := th.st.ScrapeSwarm(context.TODO(), c.ih)
|
||||
require.Equal(t, uint32(2), l)
|
||||
require.Equal(t, uint32(0), s)
|
||||
|
||||
@@ -131,7 +131,7 @@ func (th *testHolder) SeederPutAnnounceDeleteAnnounce(t *testing.T) {
|
||||
require.Nil(t, err)
|
||||
require.True(t, containsPeer(peers, c.peer))
|
||||
|
||||
l, s, _ := th.st.ScrapeSwarm(context.TODO(), c.ih)
|
||||
l, s, _, _ := th.st.ScrapeSwarm(context.TODO(), c.ih)
|
||||
require.Equal(t, uint32(1), l)
|
||||
require.Equal(t, uint32(1), s)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user