mirror of
https://github.com/sot-tech/mochi.git
synced 2026-04-23 22:29:58 -07:00
fix lint warnings
replace naked returns with arguments (gofumpt@v0.9.1)
This commit is contained in:
2
.github/workflows/build.yaml
vendored
2
.github/workflows/build.yaml
vendored
@@ -39,7 +39,7 @@ jobs:
|
||||
--health-timeout 5s
|
||||
--health-retries 5
|
||||
minio:
|
||||
image: "lazybit/minio"
|
||||
image: "minio/minio"
|
||||
ports: [ "9000:9000" ]
|
||||
env:
|
||||
MINIO_ACCESS_KEY: "minioadmin"
|
||||
|
||||
@@ -52,7 +52,7 @@ func NewEvent(eventStr string) (evt Event, err error) {
|
||||
default:
|
||||
evt, err = None, ErrUnknownEvent
|
||||
}
|
||||
return
|
||||
return evt, err
|
||||
}
|
||||
|
||||
// String implements Stringer for an event.
|
||||
@@ -69,5 +69,5 @@ func (e Event) String() (s string) {
|
||||
default:
|
||||
s = "<unknown>"
|
||||
}
|
||||
return
|
||||
return s
|
||||
}
|
||||
|
||||
@@ -133,7 +133,7 @@ func (rp RequestPeer) Peers() (peers Peers) {
|
||||
AddrPort: netip.AddrPortFrom(a.Addr, rp.Port),
|
||||
})
|
||||
}
|
||||
return
|
||||
return peers
|
||||
}
|
||||
|
||||
// MarshalZerologObject writes fields into zerolog event
|
||||
|
||||
@@ -242,7 +242,7 @@ func sendHTTPReq(u string) (err error) {
|
||||
return errors.New(r.Status)
|
||||
}
|
||||
}
|
||||
return
|
||||
return err
|
||||
}
|
||||
|
||||
func BenchmarkServerHTTPAnnounce(b *testing.B) {
|
||||
|
||||
@@ -71,7 +71,7 @@ func NewFrontends(configs []conf.NamedMapConfig, logic *middleware.Logic) (fs []
|
||||
fs = append(fs, f)
|
||||
logger.Info().Str("name", c.Name).Msg("frontend started")
|
||||
}
|
||||
return
|
||||
return fs, err
|
||||
}
|
||||
|
||||
// CloseGroup simultaneously calls Close for each non-nil
|
||||
@@ -102,5 +102,5 @@ func CloseGroup(cls []io.Closer) (err error) {
|
||||
if sb.Len() > 0 {
|
||||
err = errors.New(sb.String())
|
||||
}
|
||||
return
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -69,7 +69,7 @@ func (cfg Config) Validate() (validCfg Config, err error) {
|
||||
validCfg.ListenOptions = cfg.ListenOptions.Validate(logger)
|
||||
if cfg.UseTLS && (len(cfg.TLSCertPath) == 0 || len(cfg.TLSKeyPath) == 0) {
|
||||
err = errTLSNotProvided
|
||||
return
|
||||
return validCfg, err
|
||||
}
|
||||
|
||||
if cfg.ReadTimeout <= 0 {
|
||||
@@ -118,7 +118,7 @@ func (cfg Config) Validate() (validCfg Config, err error) {
|
||||
Msg("falling back to default configuration")
|
||||
}
|
||||
validCfg.ParseOptions.ParseOptions = cfg.ParseOptions.Validate(logger)
|
||||
return
|
||||
return validCfg, err
|
||||
}
|
||||
|
||||
type httpFE struct {
|
||||
@@ -232,7 +232,7 @@ func (f *httpFE) Close() (err error) {
|
||||
}
|
||||
})
|
||||
|
||||
return
|
||||
return err
|
||||
}
|
||||
|
||||
// announceRoute parses and responds to an Announce.
|
||||
|
||||
@@ -77,7 +77,7 @@ func runGet(u string, checkResponse bool) (err error) {
|
||||
return errors.New(r.Status)
|
||||
}
|
||||
}
|
||||
return
|
||||
return err
|
||||
}
|
||||
|
||||
func BenchmarkPing(b *testing.B) {
|
||||
|
||||
@@ -158,12 +158,12 @@ func requestedIPs(r *fasthttp.RequestCtx, p *queryParams, opts ParseOptions) (ad
|
||||
Provided: false,
|
||||
})
|
||||
}
|
||||
return
|
||||
return addresses
|
||||
}
|
||||
|
||||
func parseRequestAddress(s string, provided bool) (ra bittorrent.RequestAddress) {
|
||||
if addr, err := netip.ParseAddr(s); err == nil {
|
||||
ra.Addr, ra.Provided = addr, provided
|
||||
}
|
||||
return
|
||||
return ra
|
||||
}
|
||||
|
||||
@@ -35,7 +35,7 @@ func (lo ListenOptions) Validate(logger *log.Logger) (validOptions ListenOptions
|
||||
Str("default", validOptions.Addr).
|
||||
Msg("falling back to default configuration")
|
||||
}
|
||||
return
|
||||
return validOptions
|
||||
}
|
||||
|
||||
// ListenTCP listens at the given TCP Addr
|
||||
@@ -56,7 +56,7 @@ func (lo ListenOptions) ListenTCP() (conn *net.TCPListener, err error) {
|
||||
conn, err = net.ListenTCP("tcp", addr)
|
||||
}
|
||||
}
|
||||
return
|
||||
return conn, err
|
||||
}
|
||||
|
||||
// ListenUDP listens at the given UDP Addr
|
||||
@@ -77,7 +77,7 @@ func (lo ListenOptions) ListenUDP() (conn *net.UDPConn, err error) {
|
||||
conn, err = net.ListenUDP("udp", addr)
|
||||
}
|
||||
}
|
||||
return
|
||||
return conn, err
|
||||
}
|
||||
|
||||
// ParseOptions is the configuration used to parse an Announce Request.
|
||||
|
||||
@@ -96,7 +96,7 @@ func (cfg Config) Validate() (validCfg Config) {
|
||||
|
||||
validCfg.ParseOptions = cfg.ParseOptions.Validate(logger)
|
||||
|
||||
return
|
||||
return validCfg
|
||||
}
|
||||
|
||||
// udpFE holds the state of a UDP BitTorrent Frontend.
|
||||
@@ -174,7 +174,7 @@ func (f *udpFE) Close() (err error) {
|
||||
err = frontend.CloseGroup(cls)
|
||||
})
|
||||
|
||||
return
|
||||
return err
|
||||
}
|
||||
|
||||
// serve blocks while listening and serving UDP BitTorrent requests
|
||||
@@ -257,7 +257,7 @@ func (f *udpFE) handleRequest(ctx context.Context, r Request, w ResponseWriter)
|
||||
// Malformed, no client packets are less than 16 bytes.
|
||||
// We explicitly return nothing in case this is a DoS attempt.
|
||||
err = errMalformedPacket
|
||||
return
|
||||
return actionName, err
|
||||
}
|
||||
|
||||
// Parse the headers of the UDP packet.
|
||||
@@ -274,7 +274,7 @@ func (f *udpFE) handleRequest(ctx context.Context, r Request, w ResponseWriter)
|
||||
if actionID != connectActionID && !gen.Validate(connID, r.IP, timecache.Now()) {
|
||||
err = errBadConnectionID
|
||||
writeErrorResponse(w, txID, err)
|
||||
return
|
||||
return actionName, err
|
||||
}
|
||||
|
||||
// Handle the requested action.
|
||||
@@ -284,7 +284,7 @@ func (f *udpFE) handleRequest(ctx context.Context, r Request, w ResponseWriter)
|
||||
|
||||
if !bytes.Equal(connID, initialConnectionID) {
|
||||
err = errMalformedPacket
|
||||
return
|
||||
return actionName, err
|
||||
}
|
||||
|
||||
writeConnectionID(w, txID, gen.Generate(r.IP, timecache.Now()))
|
||||
@@ -296,7 +296,7 @@ func (f *udpFE) handleRequest(ctx context.Context, r Request, w ResponseWriter)
|
||||
req, err = parseAnnounce(r, actionID == announceV6ActionID, f.ParseOptions)
|
||||
if err != nil {
|
||||
writeErrorResponse(w, txID, err)
|
||||
return
|
||||
return actionName, err
|
||||
}
|
||||
|
||||
var resp *bittorrent.AnnounceResponse
|
||||
@@ -306,7 +306,7 @@ func (f *udpFE) handleRequest(ctx context.Context, r Request, w ResponseWriter)
|
||||
if !errors.Is(err, context.Canceled) {
|
||||
writeErrorResponse(w, txID, err)
|
||||
}
|
||||
return
|
||||
return actionName, err
|
||||
}
|
||||
|
||||
if err = ctx.Err(); err == nil {
|
||||
@@ -323,7 +323,7 @@ func (f *udpFE) handleRequest(ctx context.Context, r Request, w ResponseWriter)
|
||||
req, err = parseScrape(r, f.ParseOptions)
|
||||
if err != nil {
|
||||
writeErrorResponse(w, txID, err)
|
||||
return
|
||||
return actionName, err
|
||||
}
|
||||
|
||||
var resp *bittorrent.ScrapeResponse
|
||||
@@ -333,7 +333,7 @@ func (f *udpFE) handleRequest(ctx context.Context, r Request, w ResponseWriter)
|
||||
if !errors.Is(err, context.Canceled) {
|
||||
writeErrorResponse(w, txID, err)
|
||||
}
|
||||
return
|
||||
return actionName, err
|
||||
}
|
||||
|
||||
if err = ctx.Err(); err == nil {
|
||||
@@ -348,5 +348,5 @@ func (f *udpFE) handleRequest(ctx context.Context, r Request, w ResponseWriter)
|
||||
writeErrorResponse(w, txID, err)
|
||||
}
|
||||
|
||||
return
|
||||
return actionName, err
|
||||
}
|
||||
|
||||
@@ -43,7 +43,7 @@ type swarmInteractionHook struct {
|
||||
func (h *swarmInteractionHook) HandleAnnounce(ctx context.Context, req *bittorrent.AnnounceRequest, _ *bittorrent.AnnounceResponse) (outCtx context.Context, err error) {
|
||||
outCtx = ctx
|
||||
if ctx.Value(SkipSwarmInteractionKey) != nil {
|
||||
return
|
||||
return outCtx, err
|
||||
}
|
||||
|
||||
var storeFn func(context.Context, bittorrent.InfoHash, bittorrent.Peer) error
|
||||
@@ -82,7 +82,7 @@ func (h *swarmInteractionHook) HandleAnnounce(ctx context.Context, req *bittorre
|
||||
}
|
||||
}
|
||||
|
||||
return
|
||||
return outCtx, err
|
||||
}
|
||||
|
||||
func (h *swarmInteractionHook) HandleScrape(ctx context.Context, _ *bittorrent.ScrapeRequest, _ *bittorrent.ScrapeResponse) (context.Context, error) {
|
||||
@@ -105,17 +105,17 @@ type responseHook struct {
|
||||
func (h *responseHook) scrape(ctx context.Context, ih bittorrent.InfoHash) (leechers uint32, seeders uint32, snatched uint32, err error) {
|
||||
leechers, seeders, snatched, err = h.store.ScrapeSwarm(ctx, ih)
|
||||
if err != nil {
|
||||
return
|
||||
return leechers, seeders, snatched, err
|
||||
}
|
||||
if len(ih) == bittorrent.InfoHashV2Len {
|
||||
var l, s, n uint32
|
||||
l, s, n, err = h.store.ScrapeSwarm(ctx, ih.TruncateV1())
|
||||
if err != nil {
|
||||
return
|
||||
return leechers, seeders, snatched, err
|
||||
}
|
||||
leechers, seeders, snatched = leechers+l, seeders+s, snatched+n
|
||||
}
|
||||
return
|
||||
return leechers, seeders, snatched, err
|
||||
}
|
||||
|
||||
func (h *responseHook) HandleAnnounce(ctx context.Context, req *bittorrent.AnnounceRequest, resp *bittorrent.AnnounceResponse) (_ context.Context, err error) {
|
||||
@@ -209,7 +209,7 @@ func (h *responseHook) appendPeers(ctx context.Context, req *bittorrent.Announce
|
||||
}
|
||||
}
|
||||
|
||||
return
|
||||
return err
|
||||
}
|
||||
|
||||
func (h *responseHook) HandleScrape(ctx context.Context, req *bittorrent.ScrapeRequest, resp *bittorrent.ScrapeResponse) (_ context.Context, err error) {
|
||||
|
||||
@@ -113,7 +113,7 @@ func build(config conf.MapConfig, _ storage.PeerStorage) (h middleware.Hook, err
|
||||
err = errJWKsNotSet
|
||||
}
|
||||
|
||||
return
|
||||
return h, err
|
||||
}
|
||||
|
||||
type announceClaims struct {
|
||||
@@ -237,5 +237,5 @@ func (h *hook) getJWTString(params bittorrent.Params) (jwt string) {
|
||||
}
|
||||
}
|
||||
}
|
||||
return
|
||||
return jwt
|
||||
}
|
||||
|
||||
@@ -56,7 +56,7 @@ type params map[string]string
|
||||
|
||||
func (p params) GetString(key string) (out string, found bool) {
|
||||
out, found = p[key]
|
||||
return
|
||||
return out, found
|
||||
}
|
||||
|
||||
func (params) MarshalZerologObject(*zerolog.Event) {}
|
||||
|
||||
@@ -113,5 +113,5 @@ func (l *Logic) Ping(ctx context.Context) (err error) {
|
||||
break
|
||||
}
|
||||
}
|
||||
return
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -62,5 +62,5 @@ func NewHooks(configs []conf.NamedMapConfig, storage storage.PeerStorage) (hooks
|
||||
logger.Info().Str("name", c.Name).Msg("hook started")
|
||||
}
|
||||
|
||||
return
|
||||
return hooks, err
|
||||
}
|
||||
|
||||
@@ -164,5 +164,5 @@ func (s s3) ReadData(entry string) (data io.ReadCloser, err error) {
|
||||
if err == nil {
|
||||
data = result.Body
|
||||
}
|
||||
return
|
||||
return data, err
|
||||
}
|
||||
|
||||
@@ -66,7 +66,7 @@ func build(config conf.MapConfig, st storage.PeerStorage) (h middleware.Hook, er
|
||||
} else if ds, err = storage.NewDataStorage(cfg.Storage); err == nil {
|
||||
dsc = ds
|
||||
} else {
|
||||
return
|
||||
return h, err
|
||||
}
|
||||
|
||||
var c container.Container
|
||||
|
||||
@@ -36,7 +36,7 @@ func build(config conf.MapConfig, _ storage.PeerStorage) (h middleware.Hook, err
|
||||
}
|
||||
}
|
||||
}
|
||||
return
|
||||
return h, err
|
||||
}
|
||||
|
||||
var (
|
||||
@@ -112,5 +112,5 @@ func deriveEntropyFromRequest(req *bittorrent.AnnounceRequest) (v0 uint64, v1 ui
|
||||
v0 = binary.BigEndian.Uint64([]byte(req.InfoHash[:8])) + binary.BigEndian.Uint64([]byte(req.InfoHash[8:16]))
|
||||
}
|
||||
v1 = binary.BigEndian.Uint64(req.ID[:8]) + binary.BigEndian.Uint64(req.ID[8:16])
|
||||
return
|
||||
return v0, v1
|
||||
}
|
||||
|
||||
@@ -49,7 +49,7 @@ func (m MapConfig) Unmarshal(into any) (err error) {
|
||||
} else {
|
||||
err = ErrNilConfigMap
|
||||
}
|
||||
return
|
||||
return err
|
||||
}
|
||||
|
||||
// NamedMapConfig encapsulates MapConfig with string Name
|
||||
|
||||
@@ -38,7 +38,7 @@ type TimeCache struct {
|
||||
func New() (tc *TimeCache) {
|
||||
tc = &TimeCache{closed: make(chan struct{})}
|
||||
tc.clock.Store(time.Now().UnixNano())
|
||||
return
|
||||
return tc
|
||||
}
|
||||
|
||||
// Run runs the TimeCache, updating the cached clock value once every interval
|
||||
|
||||
@@ -108,7 +108,7 @@ func (s *store) addPeer(ctx context.Context, infoHashKey, peerID string) (err er
|
||||
if err = s.SAdd(ctx, infoHashKey, peerID).Err(); err == nil {
|
||||
err = s.Process(ctx, redis.NewCmd(ctx, expireMemberCmd, infoHashKey, peerID, s.peerTTL))
|
||||
}
|
||||
return
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *store) delPeer(ctx context.Context, infoHashKey, peerID string) error {
|
||||
|
||||
@@ -181,14 +181,14 @@ func newStorage(cfg config) (*mdb, error) {
|
||||
dataDB, err = txn.OpenRoot(0)
|
||||
}
|
||||
if err != nil {
|
||||
return
|
||||
return err
|
||||
}
|
||||
if len(cfg.PeersDBName) > 0 {
|
||||
peersDB, err = txn.CreateDBI(cfg.PeersDBName)
|
||||
} else {
|
||||
peersDB, err = txn.OpenRoot(0)
|
||||
}
|
||||
return
|
||||
return err
|
||||
}); err != nil {
|
||||
_ = env.Close()
|
||||
return nil, err
|
||||
@@ -216,7 +216,7 @@ func (m *mdb) Close() (err error) {
|
||||
err = m.lmdbEnv.Close()
|
||||
}
|
||||
})
|
||||
return
|
||||
return err
|
||||
}
|
||||
|
||||
const keySeparator = '_'
|
||||
@@ -256,31 +256,31 @@ func (m *mdb) Put(_ context.Context, storeCtx string, values ...storage.Entry) (
|
||||
break
|
||||
}
|
||||
}
|
||||
return
|
||||
return err
|
||||
})
|
||||
}
|
||||
return
|
||||
return err
|
||||
}
|
||||
|
||||
func (m *mdb) Contains(_ context.Context, storeCtx string, key string) (contains bool, err error) {
|
||||
err = m.View(func(txn *lmdb.Txn) (err error) {
|
||||
_, err = txn.Get(m.dataDB, composeKey(storeCtx, key))
|
||||
return
|
||||
return err
|
||||
})
|
||||
if err == nil {
|
||||
contains = true
|
||||
} else if lmdb.IsNotFound(err) {
|
||||
err = nil
|
||||
}
|
||||
return
|
||||
return contains, err
|
||||
}
|
||||
|
||||
func (m *mdb) Load(_ context.Context, storeCtx string, key string) (v []byte, err error) {
|
||||
err = m.View(func(txn *lmdb.Txn) (err error) {
|
||||
v, err = ignoreNotFoundData(txn.Get(m.dataDB, composeKey(storeCtx, key)))
|
||||
return
|
||||
return err
|
||||
})
|
||||
return
|
||||
return v, err
|
||||
}
|
||||
|
||||
func (m *mdb) Delete(_ context.Context, storeCtx string, keys ...string) (err error) {
|
||||
@@ -291,10 +291,10 @@ func (m *mdb) Delete(_ context.Context, storeCtx string, keys ...string) (err er
|
||||
break
|
||||
}
|
||||
}
|
||||
return
|
||||
return err
|
||||
})
|
||||
}
|
||||
return
|
||||
return err
|
||||
}
|
||||
|
||||
const (
|
||||
@@ -324,7 +324,7 @@ func unpackPeer(arr []byte) (peer bittorrent.Peer) {
|
||||
AddrPort: netip.AddrPortFrom(netip.AddrFrom16([ipLen]byte(arr[bittorrent.PeerIDLen:])).Unmap(),
|
||||
binary.BigEndian.Uint16(arr[bittorrent.PeerIDLen+ipLen:])),
|
||||
}
|
||||
return
|
||||
return peer
|
||||
}
|
||||
|
||||
func composeIHKeyPrefix(ih []byte, seeder bool, v6 bool, suffixLen int) (ihKey []byte, suffixStart int) {
|
||||
@@ -343,13 +343,13 @@ func composeIHKeyPrefix(ih []byte, seeder bool, v6 bool, suffixLen int) (ihKey [
|
||||
ihKey[2], ihKey[ihLen+3] = keySeparator, keySeparator
|
||||
copy(ihKey[3:], ih)
|
||||
suffixStart = len(ihKey) - suffixLen
|
||||
return
|
||||
return ihKey, suffixStart
|
||||
}
|
||||
|
||||
func composeIHKey(ih bittorrent.InfoHash, peer bittorrent.Peer, seeder bool) (ihKey []byte) {
|
||||
ihKey, start := composeIHKeyPrefix(ih.Bytes(), seeder, peer.Addr().Is6(), packedPeerLen)
|
||||
packPeer(peer, ihKey[start:])
|
||||
return
|
||||
return ihKey
|
||||
}
|
||||
|
||||
func (m *mdb) putPeer(ih bittorrent.InfoHash, peer bittorrent.Peer, seeder bool) error {
|
||||
@@ -359,7 +359,7 @@ func (m *mdb) putPeer(ih bittorrent.InfoHash, peer bittorrent.Peer, seeder bool)
|
||||
if b, err = txn.PutReserve(m.peersDB, ihKey, 8, 0); err == nil {
|
||||
binary.BigEndian.PutUint64(b, uint64(timecache.NowUnix()))
|
||||
}
|
||||
return
|
||||
return err
|
||||
})
|
||||
}
|
||||
|
||||
@@ -390,12 +390,12 @@ func (m *mdb) GraduateLeecher(_ context.Context, ih bittorrent.InfoHash, peer bi
|
||||
ihKey := composeIHKey(ih, peer, false)
|
||||
return m.Update(func(txn *lmdb.Txn) (err error) {
|
||||
if err = ignoreNotFound(txn.Del(m.peersDB, ihKey, nil)); err != nil {
|
||||
return
|
||||
return err
|
||||
}
|
||||
ihKey[0] = seederPrefix
|
||||
var b []byte
|
||||
if b, err = txn.PutReserve(m.peersDB, ihKey, 8, 0); err != nil {
|
||||
return
|
||||
return err
|
||||
}
|
||||
binary.BigEndian.PutUint64(b, uint64(timecache.NowUnixNano()))
|
||||
|
||||
@@ -403,7 +403,7 @@ func (m *mdb) GraduateLeecher(_ context.Context, ih bittorrent.InfoHash, peer bi
|
||||
ihPrefix[0], ihPrefix[1] = downloadedPrefix, countPrefix
|
||||
var v int
|
||||
if b, err = ignoreNotFoundData(txn.Get(m.peersDB, ihPrefix)); err != nil {
|
||||
return
|
||||
return err
|
||||
}
|
||||
if len(b) >= 4 {
|
||||
v = int(binary.BigEndian.Uint32(b))
|
||||
@@ -412,7 +412,7 @@ func (m *mdb) GraduateLeecher(_ context.Context, ih bittorrent.InfoHash, peer bi
|
||||
if b, err = txn.PutReserve(m.peersDB, ihPrefix, 4, 0); err == nil {
|
||||
binary.BigEndian.PutUint32(b, uint32(v))
|
||||
}
|
||||
return
|
||||
return err
|
||||
})
|
||||
}
|
||||
|
||||
@@ -450,11 +450,11 @@ func (m *mdb) scanPeers(ctx context.Context, prefix []byte, readRaw bool, fn fun
|
||||
err = scanner.Err()
|
||||
}
|
||||
scanner.Close()
|
||||
return
|
||||
return err
|
||||
})
|
||||
m.wg.Done()
|
||||
|
||||
return
|
||||
return err
|
||||
}
|
||||
|
||||
func (m *mdb) AnnouncePeers(ctx context.Context, ih bittorrent.InfoHash, forSeeder bool, numWant int, v6 bool) (peers []bittorrent.Peer, err error) {
|
||||
@@ -474,7 +474,7 @@ func (m *mdb) AnnouncePeers(ctx context.Context, ih bittorrent.InfoHash, forSeed
|
||||
err = m.scanPeers(ctx, prefix, true, appendFn)
|
||||
}
|
||||
}
|
||||
return
|
||||
return peers, err
|
||||
}
|
||||
|
||||
func (m *mdb) countPeers(ctx context.Context, scanPrefix []byte) (cnt uint32, err error) {
|
||||
@@ -509,35 +509,35 @@ func (m *mdb) countPeers(ctx context.Context, scanPrefix []byte) (cnt uint32, er
|
||||
}
|
||||
err = scanner.Err()
|
||||
scanner.Close()
|
||||
return
|
||||
return err
|
||||
})
|
||||
m.wg.Done()
|
||||
|
||||
return
|
||||
return cnt, err
|
||||
}
|
||||
|
||||
func (m *mdb) ScrapeSwarm(ctx context.Context, ih bittorrent.InfoHash) (leechers uint32, seeders uint32, snatched uint32, err error) {
|
||||
scanPrefix, _ := composeIHKeyPrefix(ih.Bytes(), false, false, 0)
|
||||
if leechers, err = m.countPeers(ctx, scanPrefix); err != nil {
|
||||
return
|
||||
return leechers, seeders, snatched, err
|
||||
}
|
||||
scanPrefix[0], scanPrefix[1] = seederPrefix, ipv4Prefix
|
||||
if seeders, err = m.countPeers(ctx, scanPrefix); err != nil {
|
||||
return
|
||||
return leechers, seeders, snatched, err
|
||||
}
|
||||
|
||||
scanPrefix[0], scanPrefix[1] = downloadedPrefix, countPrefix
|
||||
err = m.View(func(txn *lmdb.Txn) (err error) {
|
||||
var b []byte
|
||||
if b, err = ignoreNotFoundData(txn.Get(m.peersDB, scanPrefix)); err != nil {
|
||||
return
|
||||
return err
|
||||
}
|
||||
if len(b) >= 4 {
|
||||
snatched = binary.BigEndian.Uint32(b)
|
||||
}
|
||||
return
|
||||
return err
|
||||
})
|
||||
return
|
||||
return leechers, seeders, snatched, err
|
||||
}
|
||||
|
||||
const (
|
||||
@@ -564,7 +564,7 @@ func (m *mdb) gc(cutoff time.Time) {
|
||||
break
|
||||
}
|
||||
}
|
||||
return
|
||||
return err
|
||||
})
|
||||
}
|
||||
if err == nil {
|
||||
|
||||
@@ -101,7 +101,7 @@ func (p *ihSwarm) get(k bittorrent.InfoHash) (v swarm, ok bool) {
|
||||
p.RLock()
|
||||
v, ok = p.m[k]
|
||||
p.RUnlock()
|
||||
return
|
||||
return v, ok
|
||||
}
|
||||
|
||||
func (p *ihSwarm) getOrCreate(k bittorrent.InfoHash) (v swarm) {
|
||||
@@ -117,7 +117,7 @@ func (p *ihSwarm) getOrCreate(k bittorrent.InfoHash) (v swarm) {
|
||||
}
|
||||
p.Unlock()
|
||||
}
|
||||
return
|
||||
return v
|
||||
}
|
||||
|
||||
func (p *ihSwarm) del(k bittorrent.InfoHash) (ok bool) {
|
||||
@@ -126,7 +126,7 @@ func (p *ihSwarm) del(k bittorrent.InfoHash) (ok bool) {
|
||||
delete(p.m, k)
|
||||
}
|
||||
p.Unlock()
|
||||
return
|
||||
return ok
|
||||
}
|
||||
|
||||
func (p *ihSwarm) len() int {
|
||||
@@ -158,7 +158,7 @@ func (p *peers) get(k bittorrent.Peer) (v int64, ok bool) {
|
||||
p.RLock()
|
||||
v, ok = p.m[k]
|
||||
p.RUnlock()
|
||||
return
|
||||
return v, ok
|
||||
}
|
||||
|
||||
func (p *peers) set(k bittorrent.Peer, v int64) {
|
||||
@@ -173,7 +173,7 @@ func (p *peers) del(k bittorrent.Peer) (ok bool) {
|
||||
delete(p.m, k)
|
||||
}
|
||||
p.Unlock()
|
||||
return
|
||||
return ok
|
||||
}
|
||||
|
||||
func (p *peers) len() int {
|
||||
@@ -322,7 +322,7 @@ func (ps *peerStore) DeleteSeeder(_ context.Context, ih bittorrent.InfoHash, p b
|
||||
err = storage.ErrResourceDoesNotExist
|
||||
}
|
||||
|
||||
return
|
||||
return err
|
||||
}
|
||||
|
||||
func (ps *peerStore) PutLeecher(_ context.Context, ih bittorrent.InfoHash, p bittorrent.Peer) error {
|
||||
@@ -368,7 +368,7 @@ func (ps *peerStore) DeleteLeecher(_ context.Context, ih bittorrent.InfoHash, p
|
||||
err = storage.ErrResourceDoesNotExist
|
||||
}
|
||||
|
||||
return
|
||||
return err
|
||||
}
|
||||
|
||||
func (ps *peerStore) GraduateLeecher(_ context.Context, ih bittorrent.InfoHash, p bittorrent.Peer) error {
|
||||
@@ -427,7 +427,7 @@ func (ps *peerStore) AnnouncePeers(_ context.Context, ih bittorrent.InfoHash, fo
|
||||
}
|
||||
}
|
||||
|
||||
return
|
||||
return peers, err
|
||||
}
|
||||
|
||||
func (ps *peerStore) countPeers(ih bittorrent.InfoHash, v6 bool) (leechers, seeders uint32) {
|
||||
@@ -436,7 +436,7 @@ func (ps *peerStore) countPeers(ih bittorrent.InfoHash, v6 bool) (leechers, seed
|
||||
if sw, ok := shard.swarms.get(ih); ok {
|
||||
leechers, seeders = uint32(sw.leechers.len()), uint32(sw.seeders.len())
|
||||
}
|
||||
return
|
||||
return leechers, seeders
|
||||
}
|
||||
|
||||
func (ps *peerStore) ScrapeSwarm(_ context.Context, ih bittorrent.InfoHash) (leechers uint32, seeders uint32, snatched uint32, _ error) {
|
||||
|
||||
@@ -144,7 +144,7 @@ func checkParameter(p *string, name string) (err error) {
|
||||
if *p = strings.TrimSpace(*p); len(*p) == 0 {
|
||||
err = fmt.Errorf(errRequiredParameterNotSetMsg, name)
|
||||
}
|
||||
return
|
||||
return err
|
||||
}
|
||||
|
||||
type config struct {
|
||||
@@ -268,7 +268,7 @@ func (s *store) txBatch(ctx context.Context, batch *pgx.Batch) (err error) {
|
||||
}
|
||||
}
|
||||
}
|
||||
return
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *store) Put(ctx context.Context, storeCtx string, values ...storage.Entry) (err error) {
|
||||
@@ -284,7 +284,7 @@ func (s *store) Put(ctx context.Context, storeCtx string, values ...storage.Entr
|
||||
}
|
||||
err = s.txBatch(ctx, &batch)
|
||||
}
|
||||
return
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *store) Contains(ctx context.Context, storeCtx string, key string) (contains bool, err error) {
|
||||
@@ -294,12 +294,12 @@ func (s *store) Contains(ctx context.Context, storeCtx string, key string) (cont
|
||||
contains = rows.Next()
|
||||
err = rows.Err()
|
||||
}
|
||||
return
|
||||
return contains, err
|
||||
}
|
||||
|
||||
func (s *store) Load(ctx context.Context, storeCtx string, key string) (out []byte, err error) {
|
||||
err = noResultErr(s.QueryRow(ctx, s.Data.GetQuery, pgx.NamedArgs{pCtx: storeCtx, pKey: []byte(key)}).Scan(&out))
|
||||
return
|
||||
return out, err
|
||||
}
|
||||
|
||||
func (s *store) Delete(ctx context.Context, storeCtx string, keys ...string) (err error) {
|
||||
@@ -310,7 +310,7 @@ func (s *store) Delete(ctx context.Context, storeCtx string, keys ...string) (er
|
||||
}
|
||||
_, err = s.Exec(ctx, s.Data.DelQuery, pgx.NamedArgs{pCtx: storeCtx, pKey: baKeys})
|
||||
}
|
||||
return
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *store) Preservable() bool {
|
||||
@@ -397,7 +397,7 @@ func (s *store) putPeer(ctx context.Context, ih []byte, peer bittorrent.Peer, se
|
||||
pV6: peer.Addr().Is6(),
|
||||
pCreated: timecache.Now(),
|
||||
})
|
||||
return
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *store) delPeer(ctx context.Context, ih []byte, peer bittorrent.Peer, seeder bool) (err error) {
|
||||
@@ -412,7 +412,7 @@ func (s *store) delPeer(ctx context.Context, ih []byte, peer bittorrent.Peer, se
|
||||
pPort: peer.Port(),
|
||||
pSeeder: seeder,
|
||||
})
|
||||
return
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *store) PutSeeder(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error {
|
||||
@@ -475,7 +475,7 @@ func (s *store) getPeers(ctx context.Context, ih []byte, seeders bool, maxCount
|
||||
s.Announce.AddressColumn,
|
||||
s.Announce.PortColumn,
|
||||
})
|
||||
return
|
||||
return peers, err
|
||||
}
|
||||
var maxIndex int
|
||||
switch {
|
||||
@@ -516,7 +516,7 @@ func (s *store) getPeers(ctx context.Context, ih []byte, seeders bool, maxCount
|
||||
}
|
||||
}
|
||||
}
|
||||
return
|
||||
return peers, err
|
||||
}
|
||||
|
||||
func (s *store) AnnouncePeers(ctx context.Context, ih bittorrent.InfoHash, forSeeder bool, numWant int, v6 bool) (peers []bittorrent.Peer, err error) {
|
||||
@@ -546,7 +546,7 @@ func (s *store) AnnouncePeers(ctx context.Context, ih bittorrent.InfoHash, forSe
|
||||
logger.Warn().Err(err).Stringer("infoHash", ih).Msg("error occurred while retrieving peers")
|
||||
}
|
||||
|
||||
return
|
||||
return peers, err
|
||||
}
|
||||
|
||||
func (s *store) countPeers(ctx context.Context, ih []byte) (seeders uint32, leechers uint32, err error) {
|
||||
@@ -585,7 +585,7 @@ func (s *store) countPeers(ctx context.Context, ih []byte) (seeders uint32, leec
|
||||
}
|
||||
}
|
||||
}
|
||||
return
|
||||
return seeders, leechers, err
|
||||
}
|
||||
|
||||
func (s *store) ScrapeSwarm(ctx context.Context, ih bittorrent.InfoHash) (leechers uint32, seeders uint32, snatched uint32, err error) {
|
||||
@@ -594,13 +594,13 @@ func (s *store) ScrapeSwarm(ctx context.Context, ih bittorrent.InfoHash) (leeche
|
||||
Msg("scrape swarm")
|
||||
ihb := ih.Bytes()
|
||||
if seeders, leechers, err = s.countPeers(ctx, ihb); err != nil {
|
||||
return
|
||||
return leechers, seeders, snatched, err
|
||||
}
|
||||
if len(s.Downloads.GetQuery) > 0 {
|
||||
err = noResultErr(s.QueryRow(ctx, s.Downloads.GetQuery, pgx.NamedArgs{pInfoHash: ihb}).Scan(&snatched))
|
||||
}
|
||||
|
||||
return
|
||||
return leechers, seeders, snatched, err
|
||||
}
|
||||
|
||||
func (s *store) Ping(ctx context.Context) error {
|
||||
|
||||
@@ -353,7 +353,7 @@ func (ps *store) count(key string, getLength bool) (n uint64) {
|
||||
if err != nil {
|
||||
logger.Error().Err(err).Str("key", key).Msg("GET/SCARD failure")
|
||||
}
|
||||
return
|
||||
return n
|
||||
}
|
||||
|
||||
func (ps *store) getClock() int64 {
|
||||
@@ -374,7 +374,7 @@ func (ps *store) tx(ctx context.Context, txf func(tx redis.Pipeliner) error) (er
|
||||
} else {
|
||||
err = txErr
|
||||
}
|
||||
return
|
||||
return err
|
||||
}
|
||||
|
||||
// NoResultErr returns nil if provided err is redis.Nil
|
||||
@@ -406,7 +406,7 @@ func InfoHashKey(infoHash string, seeder, v6 bool) (infoHashKey string) {
|
||||
infoHashKey = IH4LeecherKey
|
||||
}
|
||||
infoHashKey += infoHash
|
||||
return
|
||||
return infoHashKey
|
||||
}
|
||||
|
||||
func (ps *store) putPeer(ctx context.Context, infoHashKey, peerCountKey, peerID string) error {
|
||||
@@ -416,13 +416,13 @@ func (ps *store) putPeer(ctx context.Context, infoHashKey, peerCountKey, peerID
|
||||
Msg("put peer")
|
||||
return ps.tx(ctx, func(tx redis.Pipeliner) (err error) {
|
||||
if err = tx.HSet(ctx, infoHashKey, peerID, ps.getClock()).Err(); err != nil {
|
||||
return
|
||||
return err
|
||||
}
|
||||
if err = tx.Incr(ctx, peerCountKey).Err(); err != nil {
|
||||
return
|
||||
return err
|
||||
}
|
||||
err = tx.SAdd(ctx, IHKey, infoHashKey).Err()
|
||||
return
|
||||
return err
|
||||
})
|
||||
}
|
||||
|
||||
@@ -513,7 +513,7 @@ var errInvalidPeerDataSize = fmt.Errorf("invalid peer data (must be at least %d
|
||||
func UnpackPeer(data string) (peer bittorrent.Peer, err error) {
|
||||
if len(data) < peerMinimumLen {
|
||||
err = errInvalidPeerDataSize
|
||||
return
|
||||
return peer, err
|
||||
}
|
||||
b := str2bytes.StringToBytes(data)
|
||||
peerID, _ := bittorrent.NewPeerID(b[:bittorrent.PeerIDLen])
|
||||
@@ -529,7 +529,7 @@ func UnpackPeer(data string) (peer bittorrent.Peer, err error) {
|
||||
err = bittorrent.ErrInvalidIP
|
||||
}
|
||||
|
||||
return
|
||||
return peer, err
|
||||
}
|
||||
|
||||
func (ps *Connection) parsePeersList(peersResult *redis.StringSliceCmd) (peers []bittorrent.Peer, err error) {
|
||||
@@ -544,7 +544,7 @@ func (ps *Connection) parsePeersList(peersResult *redis.StringSliceCmd) (peers [
|
||||
}
|
||||
}
|
||||
}
|
||||
return
|
||||
return peers, err
|
||||
}
|
||||
|
||||
type getPeersFn func(context.Context, string, int) *redis.StringSliceCmd
|
||||
@@ -586,7 +586,7 @@ func (ps *Connection) GetPeers(
|
||||
logger.Warn().Err(err).Stringer("infoHash", ih).Msg("error occurred while retrieving peers")
|
||||
}
|
||||
|
||||
return
|
||||
return out, err
|
||||
}
|
||||
|
||||
func (ps *store) AnnouncePeers(
|
||||
@@ -613,26 +613,26 @@ func (ps *Connection) ScrapeIH(ctx context.Context, ih bittorrent.InfoHash, coun
|
||||
|
||||
lc4, err = countFn(ctx, InfoHashKey(infoHash, false, false)).Result()
|
||||
if err = NoResultErr(err); err != nil {
|
||||
return
|
||||
return leechersCount, seedersCount, downloadsCount, err
|
||||
}
|
||||
lc6, err = countFn(ctx, InfoHashKey(infoHash, false, true)).Result()
|
||||
if err = NoResultErr(err); err != nil {
|
||||
return
|
||||
return leechersCount, seedersCount, downloadsCount, err
|
||||
}
|
||||
sc4, err = countFn(ctx, InfoHashKey(infoHash, true, false)).Result()
|
||||
if err = NoResultErr(err); err != nil {
|
||||
return
|
||||
return leechersCount, seedersCount, downloadsCount, err
|
||||
}
|
||||
sc6, err = countFn(ctx, InfoHashKey(infoHash, true, true)).Result()
|
||||
if err = NoResultErr(err); err != nil {
|
||||
return
|
||||
return leechersCount, seedersCount, downloadsCount, err
|
||||
}
|
||||
dc, err = ps.HGet(ctx, CountDownloadsKey, infoHash).Int64()
|
||||
if err = NoResultErr(err); err != nil {
|
||||
return
|
||||
return leechersCount, seedersCount, downloadsCount, err
|
||||
}
|
||||
leechersCount, seedersCount, downloadsCount = uint32(lc4+lc6), uint32(sc4+sc6), uint32(dc)
|
||||
return
|
||||
return leechersCount, seedersCount, downloadsCount, err
|
||||
}
|
||||
|
||||
func (ps *store) ScrapeSwarm(ctx context.Context, ih bittorrent.InfoHash) (uint32, uint32, uint32, error) {
|
||||
@@ -667,7 +667,7 @@ func (ps *Connection) Put(ctx context.Context, storeCtx string, values ...storag
|
||||
}
|
||||
}
|
||||
}
|
||||
return
|
||||
return err
|
||||
}
|
||||
|
||||
// Contains - storage.DataStorage implementation
|
||||
@@ -682,7 +682,7 @@ func (ps *Connection) Load(ctx context.Context, storeCtx string, key string) (v
|
||||
if err != nil && errors.Is(err, redis.Nil) {
|
||||
v, err = nil, nil
|
||||
}
|
||||
return
|
||||
return v, err
|
||||
}
|
||||
|
||||
// Delete - storage.DataStorage implementation
|
||||
@@ -700,7 +700,7 @@ func (ps *Connection) Delete(ctx context.Context, storeCtx string, keys ...strin
|
||||
}
|
||||
}
|
||||
}
|
||||
return
|
||||
return err
|
||||
}
|
||||
|
||||
// Preservable - storage.DataStorage implementation
|
||||
@@ -866,5 +866,5 @@ func (ps *store) Close() (err error) {
|
||||
logger.Info().Msg("redis exiting. mochi does not clear data in redis when exiting. mochi keys have prefix " + PrefixKey)
|
||||
err = ps.UniversalClient.Close()
|
||||
})
|
||||
return
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -61,7 +61,7 @@ func (c Config) sanitizeGCConfig() (gcInterval, peerTTL time.Duration) {
|
||||
} else {
|
||||
peerTTL = c.PeerLifetime
|
||||
}
|
||||
return
|
||||
return gcInterval, peerTTL
|
||||
}
|
||||
|
||||
func (c Config) sanitizeStatisticsConfig() (statInterval time.Duration) {
|
||||
@@ -73,7 +73,7 @@ func (c Config) sanitizeStatisticsConfig() (statInterval time.Duration) {
|
||||
Dur("default", DefaultPrometheusReportingInterval).
|
||||
Msg("falling back to default configuration")
|
||||
}
|
||||
return
|
||||
return statInterval
|
||||
}
|
||||
|
||||
// Entry - some key-value pair, used for BulkPut
|
||||
@@ -269,11 +269,11 @@ func NewPeerStorage(cfg conf.NamedMapConfig) (ps PeerStorage, err error) {
|
||||
|
||||
c := new(Config)
|
||||
if err = cfg.Config.Unmarshal(c); err != nil {
|
||||
return
|
||||
return ps, err
|
||||
}
|
||||
|
||||
if ps, err = d.NewPeerStorage(cfg.Config); err != nil {
|
||||
return
|
||||
return ps, err
|
||||
}
|
||||
|
||||
if gc, isOk := ps.(GarbageCollector); isOk {
|
||||
@@ -308,5 +308,5 @@ func NewPeerStorage(cfg conf.NamedMapConfig) (ps PeerStorage, err error) {
|
||||
|
||||
logger.Info().Str("name", cfg.Name).Msg("storage started")
|
||||
|
||||
return
|
||||
return ps, err
|
||||
}
|
||||
|
||||
@@ -30,7 +30,7 @@ func generateInfoHashes() (a [ihCount]bittorrent.InfoHash) {
|
||||
for i := range a {
|
||||
a[i] = randIH(i < ihCount/2)
|
||||
}
|
||||
return
|
||||
return a
|
||||
}
|
||||
|
||||
func generatePeers() (a [peersCount]bittorrent.Peer) {
|
||||
@@ -56,7 +56,7 @@ func generatePeers() (a [peersCount]bittorrent.Peer) {
|
||||
}
|
||||
}
|
||||
|
||||
return
|
||||
return a
|
||||
}
|
||||
|
||||
type (
|
||||
|
||||
@@ -27,7 +27,7 @@ func randIH(v2 bool) (ih bittorrent.InfoHash) {
|
||||
panic(err)
|
||||
}
|
||||
ih, _ = bittorrent.NewInfoHash(b)
|
||||
return
|
||||
return ih
|
||||
}
|
||||
|
||||
func randPeerID() (ih bittorrent.PeerID) {
|
||||
@@ -36,7 +36,7 @@ func randPeerID() (ih bittorrent.PeerID) {
|
||||
panic(err)
|
||||
}
|
||||
ih, _ = bittorrent.NewPeerID(b)
|
||||
return
|
||||
return ih
|
||||
}
|
||||
|
||||
func init() {
|
||||
|
||||
Reference in New Issue
Block a user