Merge pull request #157 from sot-tech/s3-approval-middleware

S3 approval middleware (#133 rework)

NB: Automerged due inactivity
This commit is contained in:
SOT-TECH
2025-12-01 16:25:58 +03:00
committed by GitHub
35 changed files with 931 additions and 289 deletions
+9 -4
View File
@@ -15,7 +15,7 @@ jobs:
- uses: "actions/checkout@v4"
- uses: "actions/setup-go@v5"
with:
go-version: ">=1.23"
go-version: ">=1.24"
- name: "Build"
run: "go build ./cmd/..."
@@ -42,7 +42,12 @@ jobs:
- uses: "actions/checkout@v4"
- uses: "actions/setup-go@v5"
with:
go-version: "^1.23"
go-version: "^1.24"
- name: "Download & start MinIO"
run: |
wget --quiet -O /tmp/minio https://dl.min.io/server/minio/release/linux-amd64/minio
chmod +x /tmp/minio
/tmp/minio server /tmp/minio_data &
- name: "Run `go test`"
run: "go test -race ./..."
@@ -53,7 +58,7 @@ jobs:
- uses: "actions/checkout@v4"
- uses: "actions/setup-go@v5"
with:
go-version: "^1.23"
go-version: "^1.24"
- name: "Install and configure mochi"
run: |
go install ./cmd/mochi
@@ -79,7 +84,7 @@ jobs:
- uses: "actions/checkout@v4"
- uses: "actions/setup-go@v5"
with:
go-version: "^1.23"
go-version: "^1.24"
- name: "Install and configure mochi"
run: |
go install ./cmd/mochi
+49 -49
View File
@@ -1,53 +1,53 @@
version: "2"
run:
go: "1.23"
go: "1.24"
linters:
enable:
- bidichk
- bodyclose
- errname
- errorlint
- goprintffuncname
- gosec
- importas
- makezero
- prealloc
- predeclared
- revive
- rowserrcheck
- staticcheck
- unconvert
- wastedassign
- whitespace
settings:
gosec:
excludes:
- G505
- G115
staticcheck:
checks:
- all
exclusions:
generated: lax
presets:
- common-false-positives
- legacy
- std-error-handling
paths:
- third_party$
- builtin$
- examples$
enable:
- bidichk
- bodyclose
- errname
- errorlint
- goprintffuncname
- gosec
- importas
- makezero
- prealloc
- predeclared
- revive
- rowserrcheck
- staticcheck
- unconvert
- wastedassign
- whitespace
settings:
gosec:
excludes:
- G505
- G115
staticcheck:
checks:
- all
exclusions:
generated: lax
presets:
- common-false-positives
- legacy
- std-error-handling
paths:
- third_party$
- builtin$
- examples$
formatters:
enable:
- gofumpt
- goimports
settings:
goimports:
local-prefixes:
- sot-te.ch/mochi
exclusions:
generated: lax
paths:
- third_party$
- builtin$
- examples$
enable:
- gofumpt
- goimports
settings:
goimports:
local-prefixes:
- sot-te.ch/mochi
exclusions:
generated: lax
paths:
- third_party$
- builtin$
- examples$
+2 -2
View File
@@ -52,7 +52,7 @@ func NewEvent(eventStr string) (evt Event, err error) {
default:
evt, err = None, ErrUnknownEvent
}
return
return evt, err
}
// String implements Stringer for an event.
@@ -69,5 +69,5 @@ func (e Event) String() (s string) {
default:
s = "<unknown>"
}
return
return s
}
+1 -1
View File
@@ -133,7 +133,7 @@ func (rp RequestPeer) Peers() (peers Peers) {
AddrPort: netip.AddrPortFrom(a.Addr, rp.Port),
})
}
return
return peers
}
// MarshalZerologObject writes fields into zerolog event
+1 -1
View File
@@ -242,7 +242,7 @@ func sendHTTPReq(u string) (err error) {
return errors.New(r.Status)
}
}
return
return err
}
func BenchmarkServerHTTPAnnounce(b *testing.B) {
+20 -1
View File
@@ -28,6 +28,11 @@ There are two sources of hashes: `list` and `directory`.
files at start and then periodically watch for new files to add, or for delete events
to remove hash from storage.
* `s3` will search for torrent files in specified S3-compatible storage (AWS S3, MinIO, etc.) and
append/delete records from storage. This source will parse all existing
files at start and then periodically watch for new files to add, or for delete events
to remove hash from storage.
Note: if storage is not `memory`, and `preserve` option set to `true`, records
will be persisted in storage until _somebody_ or _something_ (different tool with access
to storage) won't delete it.
@@ -48,7 +53,21 @@ If `name` is empty or `internal` global storage will be used
- `directory`:
- `path` - directory to watch
- `period` - time between two directory checks
- `invert` and `storage_ctx` has the same meanins as `list`'s options
- `invert` and `storage_ctx` has the same meaning as `list`'s options
- `s3`:
- `endpoint`* - base URL of S3 provider
- `region`* - S3 region to connect to
- `key_id`* - S3 access key ID
- `key_secret`* - S3 secret access key
- `session_token`* - S3 temporary security credential
- `bucket` - S3 bucket
- `prefix` - prefix path to search entries
- `suffix` - suffix to filter returned entries, such as extension (e.g. `.torrent`)
- `period` - time between two S3 checks
- `invert` and `storage_ctx` has the same meaning as `list`'s options
Note: `s3` options marked with `*` and any other specific options can be omitted in MoChi and can be provided
with environment variables or in `$HOME/.aws/*` files (see [AWS SDK documentation](https://docs.aws.amazon.com/sdk-for-go/v2/developer-guide/configure-gosdk.html)).
Configuration example:
+2 -2
View File
@@ -71,7 +71,7 @@ func NewFrontends(configs []conf.NamedMapConfig, logic *middleware.Logic) (fs []
fs = append(fs, f)
logger.Info().Str("name", c.Name).Msg("frontend started")
}
return
return fs, err
}
// CloseGroup simultaneously calls Close for each non-nil
@@ -102,5 +102,5 @@ func CloseGroup(cls []io.Closer) (err error) {
if sb.Len() > 0 {
err = errors.New(sb.String())
}
return
return err
}
+3 -3
View File
@@ -69,7 +69,7 @@ func (cfg Config) Validate() (validCfg Config, err error) {
validCfg.ListenOptions = cfg.ListenOptions.Validate(logger)
if cfg.UseTLS && (len(cfg.TLSCertPath) == 0 || len(cfg.TLSKeyPath) == 0) {
err = errTLSNotProvided
return
return validCfg, err
}
if cfg.ReadTimeout <= 0 {
@@ -118,7 +118,7 @@ func (cfg Config) Validate() (validCfg Config, err error) {
Msg("falling back to default configuration")
}
validCfg.ParseOptions.ParseOptions = cfg.ParseOptions.Validate(logger)
return
return validCfg, err
}
type httpFE struct {
@@ -232,7 +232,7 @@ func (f *httpFE) Close() (err error) {
}
})
return
return err
}
// announceRoute parses and responds to an Announce.
+1 -1
View File
@@ -77,7 +77,7 @@ func runGet(u string, checkResponse bool) (err error) {
return errors.New(r.Status)
}
}
return
return err
}
func BenchmarkPing(b *testing.B) {
+2 -2
View File
@@ -158,12 +158,12 @@ func requestedIPs(r *fasthttp.RequestCtx, p *queryParams, opts ParseOptions) (ad
Provided: false,
})
}
return
return addresses
}
func parseRequestAddress(s string, provided bool) (ra bittorrent.RequestAddress) {
if addr, err := netip.ParseAddr(s); err == nil {
ra.Addr, ra.Provided = addr, provided
}
return
return ra
}
+3 -3
View File
@@ -35,7 +35,7 @@ func (lo ListenOptions) Validate(logger *log.Logger) (validOptions ListenOptions
Str("default", validOptions.Addr).
Msg("falling back to default configuration")
}
return
return validOptions
}
// ListenTCP listens at the given TCP Addr
@@ -56,7 +56,7 @@ func (lo ListenOptions) ListenTCP() (conn *net.TCPListener, err error) {
conn, err = net.ListenTCP("tcp", addr)
}
}
return
return conn, err
}
// ListenUDP listens at the given UDP Addr
@@ -77,7 +77,7 @@ func (lo ListenOptions) ListenUDP() (conn *net.UDPConn, err error) {
conn, err = net.ListenUDP("udp", addr)
}
}
return
return conn, err
}
// ParseOptions is the configuration used to parse an Announce Request.
+10 -10
View File
@@ -96,7 +96,7 @@ func (cfg Config) Validate() (validCfg Config) {
validCfg.ParseOptions = cfg.ParseOptions.Validate(logger)
return
return validCfg
}
// udpFE holds the state of a UDP BitTorrent Frontend.
@@ -174,7 +174,7 @@ func (f *udpFE) Close() (err error) {
err = frontend.CloseGroup(cls)
})
return
return err
}
// serve blocks while listening and serving UDP BitTorrent requests
@@ -257,7 +257,7 @@ func (f *udpFE) handleRequest(ctx context.Context, r Request, w ResponseWriter)
// Malformed, no client packets are less than 16 bytes.
// We explicitly return nothing in case this is a DoS attempt.
err = errMalformedPacket
return
return actionName, err
}
// Parse the headers of the UDP packet.
@@ -274,7 +274,7 @@ func (f *udpFE) handleRequest(ctx context.Context, r Request, w ResponseWriter)
if actionID != connectActionID && !gen.Validate(connID, r.IP, timecache.Now()) {
err = errBadConnectionID
writeErrorResponse(w, txID, err)
return
return actionName, err
}
// Handle the requested action.
@@ -284,7 +284,7 @@ func (f *udpFE) handleRequest(ctx context.Context, r Request, w ResponseWriter)
if !bytes.Equal(connID, initialConnectionID) {
err = errMalformedPacket
return
return actionName, err
}
writeConnectionID(w, txID, gen.Generate(r.IP, timecache.Now()))
@@ -296,7 +296,7 @@ func (f *udpFE) handleRequest(ctx context.Context, r Request, w ResponseWriter)
req, err = parseAnnounce(r, actionID == announceV6ActionID, f.ParseOptions)
if err != nil {
writeErrorResponse(w, txID, err)
return
return actionName, err
}
var resp *bittorrent.AnnounceResponse
@@ -306,7 +306,7 @@ func (f *udpFE) handleRequest(ctx context.Context, r Request, w ResponseWriter)
if !errors.Is(err, context.Canceled) {
writeErrorResponse(w, txID, err)
}
return
return actionName, err
}
if err = ctx.Err(); err == nil {
@@ -323,7 +323,7 @@ func (f *udpFE) handleRequest(ctx context.Context, r Request, w ResponseWriter)
req, err = parseScrape(r, f.ParseOptions)
if err != nil {
writeErrorResponse(w, txID, err)
return
return actionName, err
}
var resp *bittorrent.ScrapeResponse
@@ -333,7 +333,7 @@ func (f *udpFE) handleRequest(ctx context.Context, r Request, w ResponseWriter)
if !errors.Is(err, context.Canceled) {
writeErrorResponse(w, txID, err)
}
return
return actionName, err
}
if err = ctx.Err(); err == nil {
@@ -348,5 +348,5 @@ func (f *udpFE) handleRequest(ctx context.Context, r Request, w ResponseWriter)
writeErrorResponse(w, txID, err)
}
return
return actionName, err
}
+37 -20
View File
@@ -1,31 +1,47 @@
module github.com/sot-tech/mochi
go 1.23.0
toolchain go1.23.6
go 1.24.0
require (
code.cloudfoundry.org/go-diodes v0.0.0-20250505082646-e4c2d772c2ec
github.com/MicahParks/jwkset v0.9.6
github.com/MicahParks/keyfunc/v3 v3.4.0
code.cloudfoundry.org/go-diodes v0.0.0-20250909124000-1dfc755f0d96
github.com/MicahParks/jwkset v0.11.0
github.com/MicahParks/keyfunc/v3 v3.6.2
github.com/PowerDNS/lmdb-go v1.9.3
github.com/aws/aws-sdk-go-v2/config v1.31.8
github.com/aws/aws-sdk-go-v2/credentials v1.18.12
github.com/aws/aws-sdk-go-v2/service/s3 v1.88.1
github.com/aws/smithy-go v1.23.0
github.com/cespare/xxhash/v2 v2.3.0
github.com/fasthttp/router v1.5.4
github.com/golang-jwt/jwt/v5 v5.3.0
github.com/jackc/pgx/v5 v5.7.5
github.com/jackc/pgx/v5 v5.7.6
github.com/libp2p/go-reuseport v0.4.0
github.com/mitchellh/mapstructure v1.5.0
github.com/prometheus/client_golang v1.23.0
github.com/redis/go-redis/v9 v9.11.0
github.com/prometheus/client_golang v1.23.2
github.com/redis/go-redis/v9 v9.14.0
github.com/rs/zerolog v1.34.0
github.com/stretchr/testify v1.10.0
github.com/valyala/fasthttp v1.64.0
github.com/stretchr/testify v1.11.1
github.com/valyala/fasthttp v1.66.0
github.com/zeebo/bencode v1.0.0
gopkg.in/yaml.v3 v3.0.1
)
require (
github.com/andybalholm/brotli v1.2.0 // indirect
github.com/aws/aws-sdk-go-v2 v1.39.0 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.1 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.7 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.7 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.7 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.3 // indirect
github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.7 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.1 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.8.7 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.7 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.7 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.29.3 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.34.4 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.38.4 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
@@ -39,15 +55,16 @@ require (
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.6.2 // indirect
github.com/prometheus/common v0.65.0 // indirect
github.com/prometheus/procfs v0.16.1 // indirect
github.com/prometheus/common v0.66.1 // indirect
github.com/prometheus/procfs v0.17.0 // indirect
github.com/savsgio/gotils v0.0.0-20250408102913-196191ec6287 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
golang.org/x/crypto v0.40.0 // indirect
golang.org/x/net v0.42.0 // indirect
golang.org/x/sync v0.16.0 // indirect
golang.org/x/sys v0.34.0 // indirect
golang.org/x/text v0.27.0 // indirect
golang.org/x/time v0.12.0 // indirect
google.golang.org/protobuf v1.36.6 // indirect
go.yaml.in/yaml/v2 v2.4.3 // indirect
golang.org/x/crypto v0.42.0 // indirect
golang.org/x/net v0.44.0 // indirect
golang.org/x/sync v0.17.0 // indirect
golang.org/x/sys v0.36.0 // indirect
golang.org/x/text v0.29.0 // indirect
golang.org/x/time v0.13.0 // indirect
google.golang.org/protobuf v1.36.9 // indirect
)
+86 -44
View File
@@ -1,13 +1,51 @@
code.cloudfoundry.org/go-diodes v0.0.0-20250505082646-e4c2d772c2ec h1:gPChm9h2S3zu/6gc4YdCE2DrOdfTULh/NoM9plF9Qk8=
code.cloudfoundry.org/go-diodes v0.0.0-20250505082646-e4c2d772c2ec/go.mod h1:YJ8cm88vWdmLKzmKO39JZPCSud8pHMHODECSEdSd+9k=
github.com/MicahParks/jwkset v0.9.6 h1:Tf8l2/MOby5Kh3IkrqzThPQKfLytMERoAsGZKlyYZxg=
github.com/MicahParks/jwkset v0.9.6/go.mod h1:U2oRhRaLgDCLjtpGL2GseNKGmZtLs/3O7p+OZaL5vo0=
github.com/MicahParks/keyfunc/v3 v3.4.0 h1:g03TXq6NjhZyO/UkODl//abm4KiLLNRi0VhW7vGOHyg=
github.com/MicahParks/keyfunc/v3 v3.4.0/go.mod h1:y6Ed3dMgNKTcpxbaQHD8mmrYDUZWJAxteddA6OQj+ag=
code.cloudfoundry.org/go-diodes v0.0.0-20250909124000-1dfc755f0d96 h1:RhpatInI+K7hN/IxhT5z5Jy8obH5NVDmlpWAI4LxNoo=
code.cloudfoundry.org/go-diodes v0.0.0-20250909124000-1dfc755f0d96/go.mod h1:UenGpPnsmAK4LqD5O9Z9FYFv52ebeyWuc39hnVRGCtM=
github.com/Masterminds/semver/v3 v3.4.0 h1:Zog+i5UMtVoCU8oKka5P7i9q9HgrJeGzI9SA1Xbatp0=
github.com/Masterminds/semver/v3 v3.4.0/go.mod h1:4V+yj/TJE1HU9XfppCwVMZq3I84lprf4nC11bSS5beM=
github.com/MicahParks/jwkset v0.11.0 h1:yc0zG+jCvZpWgFDFmvs8/8jqqVBG9oyIbmBtmjOhoyQ=
github.com/MicahParks/jwkset v0.11.0/go.mod h1:U2oRhRaLgDCLjtpGL2GseNKGmZtLs/3O7p+OZaL5vo0=
github.com/MicahParks/keyfunc/v3 v3.6.2 h1:82rre60MKw4r117ew5/T4m1AphgkpCOYry0RPbFUY3w=
github.com/MicahParks/keyfunc/v3 v3.6.2/go.mod h1:z66bkCviwqfg2YUp+Jcc/xRE9IXLcMq6DrgV/+Htru0=
github.com/PowerDNS/lmdb-go v1.9.3 h1:AUMY2pZT8WRpkEv39I9Id3MuoHd+NZbTVpNhruVkPTg=
github.com/PowerDNS/lmdb-go v1.9.3/go.mod h1:TE0l+EZK8Z1B4dx070ZxkWTlp8RG1mjN0/+FkFRQMtU=
github.com/andybalholm/brotli v1.2.0 h1:ukwgCxwYrmACq68yiUqwIWnGY0cTPox/M94sVwToPjQ=
github.com/andybalholm/brotli v1.2.0/go.mod h1:rzTDkvFWvIrjDXZHkuS16NPggd91W3kUSvPlQ1pLaKY=
github.com/aws/aws-sdk-go-v2 v1.39.0 h1:xm5WV/2L4emMRmMjHFykqiA4M/ra0DJVSWUkDyBjbg4=
github.com/aws/aws-sdk-go-v2 v1.39.0/go.mod h1:sDioUELIUO9Znk23YVmIk86/9DOpkbyyVb1i/gUNFXY=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.1 h1:i8p8P4diljCr60PpJp6qZXNlgX4m2yQFpYk+9ZT+J4E=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.1/go.mod h1:ddqbooRZYNoJ2dsTwOty16rM+/Aqmk/GOXrK8cg7V00=
github.com/aws/aws-sdk-go-v2/config v1.31.8 h1:kQjtOLlTU4m4A64TsRcqwNChhGCwaPBt+zCQt/oWsHU=
github.com/aws/aws-sdk-go-v2/config v1.31.8/go.mod h1:QPpc7IgljrKwH0+E6/KolCgr4WPLerURiU592AYzfSY=
github.com/aws/aws-sdk-go-v2/credentials v1.18.12 h1:zmc9e1q90wMn8wQbjryy8IwA6Q4XlaL9Bx2zIqdNNbk=
github.com/aws/aws-sdk-go-v2/credentials v1.18.12/go.mod h1:3VzdRDR5u3sSJRI4kYcOSIBbeYsgtVk7dG5R/U6qLWY=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.7 h1:Is2tPmieqGS2edBnmOJIbdvOA6Op+rRpaYR60iBAwXM=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.7/go.mod h1:F1i5V5421EGci570yABvpIXgRIBPb5JM+lSkHF6Dq5w=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.7 h1:UCxq0X9O3xrlENdKf1r9eRJoKz/b0AfGkpp3a7FPlhg=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.7/go.mod h1:rHRoJUNUASj5Z/0eqI4w32vKvC7atoWR0jC+IkmVH8k=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.7 h1:Y6DTZUn7ZUC4th9FMBbo8LVE+1fyq3ofw+tRwkUd3PY=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.7/go.mod h1:x3XE6vMnU9QvHN/Wrx2s44kwzV2o2g5x/siw4ZUJ9g8=
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.3 h1:bIqFDwgGXXN1Kpp99pDOdKMTTb5d2KyU5X/BZxjOkRo=
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.3/go.mod h1:H5O/EsxDWyU+LP/V8i5sm8cxoZgc2fdNR9bxlOFrQTo=
github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.7 h1:BszAktdUo2xlzmYHjWMq70DqJ7cROM8iBd3f6hrpuMQ=
github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.7/go.mod h1:XJ1yHki/P7ZPuG4fd3f0Pg/dSGA2cTQBCLw82MH2H48=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.1 h1:oegbebPEMA/1Jny7kvwejowCaHz1FWZAQ94WXFNCyTM=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.1/go.mod h1:kemo5Myr9ac0U9JfSjMo9yHLtw+pECEHsFtJ9tqCEI8=
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.8.7 h1:zmZ8qvtE9chfhBPuKB2aQFxW5F/rpwXUgmcVCgQzqRw=
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.8.7/go.mod h1:vVYfbpd2l+pKqlSIDIOgouxNsGu5il9uDp0ooWb0jys=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.7 h1:mLgc5QIgOy26qyh5bvW+nDoAppxgn3J2WV3m9ewq7+8=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.7/go.mod h1:wXb/eQnqt8mDQIQTTmcw58B5mYGxzLGZGK8PWNFZ0BA=
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.7 h1:u3VbDKUCWarWiU+aIUK4gjTr/wQFXV17y3hgNno9fcA=
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.7/go.mod h1:/OuMQwhSyRapYxq6ZNpPer8juGNrB4P5Oz8bZ2cgjQE=
github.com/aws/aws-sdk-go-v2/service/s3 v1.88.1 h1:+RpGuaQ72qnU83qBKVwxkznewEdAGhIWo/PQCmkhhog=
github.com/aws/aws-sdk-go-v2/service/s3 v1.88.1/go.mod h1:xajPTguLoeQMAOE44AAP2RQoUhF8ey1g5IFHARv71po=
github.com/aws/aws-sdk-go-v2/service/sso v1.29.3 h1:7PKX3VYsZ8LUWceVRuv0+PU+E7OtQb1lgmi5vmUE9CM=
github.com/aws/aws-sdk-go-v2/service/sso v1.29.3/go.mod h1:Ql6jE9kyyWI5JHn+61UT/Y5Z0oyVJGmgmJbZD5g4unY=
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.34.4 h1:e0XBRn3AptQotkyBFrHAxFB8mDhAIOfsG+7KyJ0dg98=
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.34.4/go.mod h1:XclEty74bsGBCr1s0VSaA11hQ4ZidK4viWK7rRfO88I=
github.com/aws/aws-sdk-go-v2/service/sts v1.38.4 h1:PR00NXRYgY4FWHqOGx3fC3lhVKjsp1GdloDv2ynMSd8=
github.com/aws/aws-sdk-go-v2/service/sts v1.38.4/go.mod h1:Z+Gd23v97pX9zK97+tX4ppAgqCt3Z2dIXB02CtBncK8=
github.com/aws/smithy-go v1.23.0 h1:8n6I3gXzWJB2DxBDnfxgBaSX6oe0d/t10qGz7OKqMCE=
github.com/aws/smithy-go v1.23.0/go.mod h1:t1ufH5HMublsJYulve2RKmHDC15xu1f26kHCp/HgceI=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
@@ -25,8 +63,8 @@ github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/r
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/fasthttp/router v1.5.4 h1:oxdThbBwQgsDIYZ3wR1IavsNl6ZS9WdjKukeMikOnC8=
github.com/fasthttp/router v1.5.4/go.mod h1:3/hysWq6cky7dTfzaaEPZGdptwjwx0qzTgFCKEWRjgc=
github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI=
github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI=
github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
@@ -34,14 +72,14 @@ github.com/golang-jwt/jwt/v5 v5.3.0 h1:pv4AsKCKKZuqlgs5sUmn4x8UlGa0kEVt/puTpKx9v
github.com/golang-jwt/jwt/v5 v5.3.0/go.mod h1:fxCRLWMO43lRc8nhHWY6LGqRcf+1gQWArsqaEUEa5bE=
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
github.com/google/pprof v0.0.0-20250403155104-27863c87afa6 h1:BHT72Gu3keYf3ZEu2J0b1vyeLSOYI8bm5wbJM/8yDe8=
github.com/google/pprof v0.0.0-20250403155104-27863c87afa6/go.mod h1:boTsfXsheKC2y+lKOCMpSfarhxDeIzfZG1jqGcPl3cA=
github.com/google/pprof v0.0.0-20250820193118-f64d9cf942d6 h1:EEHtgt9IwisQ2AZ4pIsMjahcegHh6rmhqxzIRQIyepY=
github.com/google/pprof v0.0.0-20250820193118-f64d9cf942d6/go.mod h1:I6V7YzU0XDpsHqbsyrghnFZLO1gwK6NPTNvmetQIk9U=
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo=
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM=
github.com/jackc/pgx/v5 v5.7.5 h1:JHGfMnQY+IEtGM63d+NGMjoRpysB2JBwDr5fsngwmJs=
github.com/jackc/pgx/v5 v5.7.5/go.mod h1:aruU7o91Tc2q2cFp5h4uP3f6ztExVpyVv88Xl/8Vl8M=
github.com/jackc/pgx/v5 v5.7.6 h1:rWQc5FwZSPX58r1OQmkuaNicxdmExaEz5A2DO2hUuTk=
github.com/jackc/pgx/v5 v5.7.6/go.mod h1:aruU7o91Tc2q2cFp5h4uP3f6ztExVpyVv88Xl/8Vl8M=
github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo=
github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo=
@@ -65,23 +103,23 @@ github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyua
github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
github.com/onsi/ginkgo/v2 v2.23.4 h1:ktYTpKJAVZnDT4VjxSbiBenUjmlL/5QkBEocaWXiQus=
github.com/onsi/ginkgo/v2 v2.23.4/go.mod h1:Bt66ApGPBFzHyR+JO10Zbt0Gsp4uWxu5mIOTusL46e8=
github.com/onsi/gomega v1.37.0 h1:CdEG8g0S133B4OswTDC/5XPSzE1OeP29QOioj2PID2Y=
github.com/onsi/gomega v1.37.0/go.mod h1:8D9+Txp43QWKhM24yyOBEdpkzN8FvJyAwecBgsU4KU0=
github.com/onsi/ginkgo/v2 v2.25.3 h1:Ty8+Yi/ayDAGtk4XxmmfUy4GabvM+MegeB4cDLRi6nw=
github.com/onsi/ginkgo/v2 v2.25.3/go.mod h1:43uiyQC4Ed2tkOzLsEYm7hnrb7UJTWHYNsuy3bG/snE=
github.com/onsi/gomega v1.38.2 h1:eZCjf2xjZAqe+LeWvKb5weQ+NcPwX84kqJ0cZNxok2A=
github.com/onsi/gomega v1.38.2/go.mod h1:W2MJcYxRGV63b418Ai34Ud0hEdTVXq9NW9+Sx6uXf3k=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_golang v1.23.0 h1:ust4zpdl9r4trLY/gSjlm07PuiBq2ynaXXlptpfy8Uc=
github.com/prometheus/client_golang v1.23.0/go.mod h1:i/o0R9ByOnHX0McrTMTyhYvKE4haaf2mW08I+jGAjEE=
github.com/prometheus/client_golang v1.23.2 h1:Je96obch5RDVy3FDMndoUsjAhG5Edi49h0RJWRi/o0o=
github.com/prometheus/client_golang v1.23.2/go.mod h1:Tb1a6LWHB3/SPIzCoaDXI4I8UHKeFTEQ1YCr+0Gyqmg=
github.com/prometheus/client_model v0.6.2 h1:oBsgwpGs7iVziMvrGhE53c/GrLUsZdHnqNwqPLxwZyk=
github.com/prometheus/client_model v0.6.2/go.mod h1:y3m2F6Gdpfy6Ut/GBsUqTWZqCUvMVzSfMLjcu6wAwpE=
github.com/prometheus/common v0.65.0 h1:QDwzd+G1twt//Kwj/Ww6E9FQq1iVMmODnILtW1t2VzE=
github.com/prometheus/common v0.65.0/go.mod h1:0gZns+BLRQ3V6NdaerOhMbwwRbNh9hkGINtQAsP5GS8=
github.com/prometheus/procfs v0.16.1 h1:hZ15bTNuirocR6u0JZ6BAHHmwS1p8B4P6MRqxtzMyRg=
github.com/prometheus/procfs v0.16.1/go.mod h1:teAbpZRB1iIAJYREa1LsoWUXykVXA1KlTmWl8x/U+Is=
github.com/redis/go-redis/v9 v9.11.0 h1:E3S08Gl/nJNn5vkxd2i78wZxWAPNZgUNTp8WIJUAiIs=
github.com/redis/go-redis/v9 v9.11.0/go.mod h1:huWgSWd8mW6+m0VPhJjSSQ+d6Nh1VICQ6Q5lHuCH/Iw=
github.com/prometheus/common v0.66.1 h1:h5E0h5/Y8niHc5DlaLlWLArTQI7tMrsfQjHV+d9ZoGs=
github.com/prometheus/common v0.66.1/go.mod h1:gcaUsgf3KfRSwHY4dIMXLPV0K/Wg1oZ8+SbZk/HH/dA=
github.com/prometheus/procfs v0.17.0 h1:FuLQ+05u4ZI+SS/w9+BWEM2TXiHKsUQ9TADiRH7DuK0=
github.com/prometheus/procfs v0.17.0/go.mod h1:oPQLaDAMRbA+u8H5Pbfq+dl3VDAvHxMUOVhe0wYB2zw=
github.com/redis/go-redis/v9 v9.14.0 h1:u4tNCjXOyzfgeLN+vAZaW1xUooqWDqVEsZN0U01jfAE=
github.com/redis/go-redis/v9 v9.14.0/go.mod h1:huWgSWd8mW6+m0VPhJjSSQ+d6Nh1VICQ6Q5lHuCH/Iw=
github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog=
github.com/rs/xid v1.6.0/go.mod h1:7XoLgs4eV+QndskICGsho+ADou8ySMSjJKDIan90Nz0=
@@ -92,12 +130,12 @@ github.com/savsgio/gotils v0.0.0-20250408102913-196191ec6287/go.mod h1:sM7Mt7uEo
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
github.com/valyala/fasthttp v1.64.0 h1:QBygLLQmiAyiXuRhthf0tuRkqAFcrC42dckN2S+N3og=
github.com/valyala/fasthttp v1.64.0/go.mod h1:dGmFxwkWXSK0NbOSJuF7AMVzU+lkHz0wQVvVITv2UQA=
github.com/valyala/fasthttp v1.66.0 h1:M87A0Z7EayeyNaV6pfO3tUTUiYO0dZfEJnRGXTVNuyU=
github.com/valyala/fasthttp v1.66.0/go.mod h1:Y4eC+zwoocmXSVCB1JmhNbYtS7tZPRI2ztPB72EVObs=
github.com/xyproto/randomstring v1.0.5 h1:YtlWPoRdgMu3NZtP45drfy1GKoojuR7hmRcnhZqKjWU=
github.com/xyproto/randomstring v1.0.5/go.mod h1:rgmS5DeNXLivK7YprL0pY+lTuhNQW3iGxZ18UQApw/E=
github.com/zeebo/bencode v1.0.0 h1:zgop0Wu1nu4IexAZeCZ5qbsjU4O1vMrfCrVgUjbHVuA=
@@ -106,25 +144,29 @@ go.uber.org/automaxprocs v1.6.0 h1:O3y2/QNTOdbF+e/dpXNNW7Rx2hZ4sTIPyybbxyNqTUs=
go.uber.org/automaxprocs v1.6.0/go.mod h1:ifeIMSnPZuznNm6jmdzmU3/bfk01Fe2fotchwEFJ8r8=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
golang.org/x/crypto v0.40.0 h1:r4x+VvoG5Fm+eJcxMaY8CQM7Lb0l1lsmjGBQ6s8BfKM=
golang.org/x/crypto v0.40.0/go.mod h1:Qr1vMER5WyS2dfPHAlsOj01wgLbsyWtFn/aY+5+ZdxY=
golang.org/x/net v0.42.0 h1:jzkYrhi3YQWD6MLBJcsklgQsoAcw89EcZbJw8Z614hs=
golang.org/x/net v0.42.0/go.mod h1:FF1RA5d3u7nAYA4z2TkclSCKh68eSXtiFwcWQpPXdt8=
golang.org/x/sync v0.16.0 h1:ycBJEhp9p4vXvUZNszeOq0kGTPghopOL8q0fq3vstxw=
golang.org/x/sync v0.16.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
go.yaml.in/yaml/v2 v2.4.3 h1:6gvOSjQoTB3vt1l+CU+tSyi/HOjfOjRLJ4YwYZGwRO0=
go.yaml.in/yaml/v2 v2.4.3/go.mod h1:zSxWcmIDjOzPXpjlTTbAsKokqkDNAVtZO0WOMiT90s8=
go.yaml.in/yaml/v3 v3.0.4 h1:tfq32ie2Jv2UxXFdLJdh3jXuOzWiL1fo0bu/FbuKpbc=
go.yaml.in/yaml/v3 v3.0.4/go.mod h1:DhzuOOF2ATzADvBadXxruRBLzYTpT36CKvDb3+aBEFg=
golang.org/x/crypto v0.42.0 h1:chiH31gIWm57EkTXpwnqf8qeuMUi0yekh6mT2AvFlqI=
golang.org/x/crypto v0.42.0/go.mod h1:4+rDnOTJhQCx2q7/j6rAN5XDw8kPjeaXEUR2eL94ix8=
golang.org/x/net v0.44.0 h1:evd8IRDyfNBMBTTY5XRF1vaZlD+EmWx6x8PkhR04H/I=
golang.org/x/net v0.44.0/go.mod h1:ECOoLqd5U3Lhyeyo/QDCEVQ4sNgYsqvCZ722XogGieY=
golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug=
golang.org/x/sync v0.17.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.34.0 h1:H5Y5sJ2L2JRdyv7ROF1he/lPdvFsd0mJHFw2ThKHxLA=
golang.org/x/sys v0.34.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
golang.org/x/text v0.27.0 h1:4fGWRpyh641NLlecmyl4LOe6yDdfaYNrGb2zdfo4JV4=
golang.org/x/text v0.27.0/go.mod h1:1D28KMCvyooCX9hBiosv5Tz/+YLxj0j7XhWjpSUF7CU=
golang.org/x/time v0.12.0 h1:ScB/8o8olJvc+CQPWrK3fPZNfh7qgwCrY0zJmoEQLSE=
golang.org/x/time v0.12.0/go.mod h1:CDIdPxbZBQxdj6cxyCIdrNogrJKMJ7pr37NYpMcMDSg=
golang.org/x/tools v0.34.0 h1:qIpSLOxeCYGg9TrcJokLBG4KFA6d795g0xkBkiESGlo=
golang.org/x/tools v0.34.0/go.mod h1:pAP9OwEaY1CAW3HOmg3hLZC5Z0CCmzjAF2UQMSqNARg=
google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY=
google.golang.org/protobuf v1.36.6/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY=
golang.org/x/sys v0.36.0 h1:KVRy2GtZBrk1cBYA7MKu5bEZFxQk4NIDV6RLVcC8o0k=
golang.org/x/sys v0.36.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
golang.org/x/text v0.29.0 h1:1neNs90w9YzJ9BocxfsQNHKuAT4pkghyXc4nhZ6sJvk=
golang.org/x/text v0.29.0/go.mod h1:7MhJOA9CD2qZyOKYazxdYMF85OwPdEr9jTtBpO7ydH4=
golang.org/x/time v0.13.0 h1:eUlYslOIt32DgYD6utsuUeHs4d7AsEYLuIAdg7FlYgI=
golang.org/x/time v0.13.0/go.mod h1:eL/Oa2bBBK0TkX57Fyni+NgnyQQN4LitPmob2Hjnqw4=
golang.org/x/tools v0.36.0 h1:kWS0uv/zsvHEle1LbV5LE8QujrxB3wfQyxHfhOk0Qkg=
golang.org/x/tools v0.36.0/go.mod h1:WBDiHKJK8YgLHlcQPYQzNCkUxUypCaa5ZegCVutKm+s=
google.golang.org/protobuf v1.36.9 h1:w2gp2mA27hUeUzj9Ex9FBjsBm40zfaDtEWow293U7Iw=
google.golang.org/protobuf v1.36.9/go.mod h1:fuxRtAxBytpl4zzqUh6/eyUujkJdNiuEkXntxiD/uRU=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
+6 -6
View File
@@ -43,7 +43,7 @@ type swarmInteractionHook struct {
func (h *swarmInteractionHook) HandleAnnounce(ctx context.Context, req *bittorrent.AnnounceRequest, _ *bittorrent.AnnounceResponse) (outCtx context.Context, err error) {
outCtx = ctx
if ctx.Value(SkipSwarmInteractionKey) != nil {
return
return outCtx, err
}
var storeFn func(context.Context, bittorrent.InfoHash, bittorrent.Peer) error
@@ -82,7 +82,7 @@ func (h *swarmInteractionHook) HandleAnnounce(ctx context.Context, req *bittorre
}
}
return
return outCtx, err
}
func (h *swarmInteractionHook) HandleScrape(ctx context.Context, _ *bittorrent.ScrapeRequest, _ *bittorrent.ScrapeResponse) (context.Context, error) {
@@ -105,17 +105,17 @@ type responseHook struct {
func (h *responseHook) scrape(ctx context.Context, ih bittorrent.InfoHash) (leechers uint32, seeders uint32, snatched uint32, err error) {
leechers, seeders, snatched, err = h.store.ScrapeSwarm(ctx, ih)
if err != nil {
return
return leechers, seeders, snatched, err
}
if len(ih) == bittorrent.InfoHashV2Len {
var l, s, n uint32
l, s, n, err = h.store.ScrapeSwarm(ctx, ih.TruncateV1())
if err != nil {
return
return leechers, seeders, snatched, err
}
leechers, seeders, snatched = leechers+l, seeders+s, snatched+n
}
return
return leechers, seeders, snatched, err
}
func (h *responseHook) HandleAnnounce(ctx context.Context, req *bittorrent.AnnounceRequest, resp *bittorrent.AnnounceResponse) (_ context.Context, err error) {
@@ -209,7 +209,7 @@ func (h *responseHook) appendPeers(ctx context.Context, req *bittorrent.Announce
}
}
return
return err
}
func (h *responseHook) HandleScrape(ctx context.Context, req *bittorrent.ScrapeRequest, resp *bittorrent.ScrapeResponse) (_ context.Context, err error) {
+2 -2
View File
@@ -113,7 +113,7 @@ func build(config conf.MapConfig, _ storage.PeerStorage) (h middleware.Hook, err
err = errJWKsNotSet
}
return
return h, err
}
type announceClaims struct {
@@ -237,5 +237,5 @@ func (h *hook) getJWTString(params bittorrent.Params) (jwt string) {
}
}
}
return
return jwt
}
+1 -1
View File
@@ -56,7 +56,7 @@ type params map[string]string
func (p params) GetString(key string) (out string, found bool) {
out, found = p[key]
return
return out, found
}
func (params) MarshalZerologObject(*zerolog.Event) {}
+1 -1
View File
@@ -113,5 +113,5 @@ func (l *Logic) Ping(ctx context.Context) (err error) {
break
}
}
return
return err
}
+1 -1
View File
@@ -62,5 +62,5 @@ func NewHooks(configs []conf.NamedMapConfig, storage storage.PeerStorage) (hooks
logger.Info().Str("name", c.Name).Msg("hook started")
}
return
return hooks, err
}
@@ -10,6 +10,7 @@ import (
"crypto/sha256"
"fmt"
"io"
"iter"
"os"
"path/filepath"
"strings"
@@ -52,55 +53,111 @@ func build(conf conf.MapConfig, st storage.DataStorage) (container.Container, er
if err := conf.Unmarshal(c); err != nil {
return nil, fmt.Errorf("unable to deserialise configuration: %w", err)
}
var err error
d := &directory{
List: list.List{
Invert: c.Invert,
Storage: st,
StorageCtx: c.StorageCtx,
},
closed: make(chan bool),
}
if len(d.StorageCtx) == 0 {
logger.Warn().
Str("name", "StorageCtx").
Str("provided", d.StorageCtx).
Str("default", container.DefaultStorageCtxName).
Msg("falling back to default configuration")
d.StorageCtx = container.DefaultStorageCtxName
}
if c.Period == 0 {
logger.Warn().
Str("name", "Period").
Dur("provided", 0).
Dur("default", defaultPeriod).
Msg("falling back to default configuration")
c.Period = defaultPeriod
}
go d.runScan(c.Path, c.Period)
return d, err
d := NewScanner(list.List{
Invert: c.Invert,
Storage: st,
StorageCtx: c.StorageCtx,
}, path(c.Path), c.Period)
go d.Run()
return d, nil
}
// BencodeRawBytes wrapper for byte slice to get raw 'info' section from
// torrent file
type BencodeRawBytes []byte
type bencodeRawBytes []byte
// UnmarshalBencode just appends raw byte slice to result
func (ba *BencodeRawBytes) UnmarshalBencode(in []byte) error {
func (ba *bencodeRawBytes) UnmarshalBencode(in []byte) error {
*ba = append([]byte(nil), in...)
return nil
}
type torrentRawInfoStruct struct {
Info BencodeRawBytes `bencode:"info"`
Info bencodeRawBytes `bencode:"info"`
}
type torrentNameInfoStruct struct {
Name string `bencode:"name"`
}
func (d *directory) runScan(path string, period time.Duration) {
t := time.NewTicker(period)
// PathReader - interface for abstract directory-like reader
type PathReader interface {
// ReadDir returns names of torrent entries.
// Implementation must return absolute paths of entries
// to fetch torrent file-like data.
ReadDir() (it iter.Seq[string], err error)
// ReadData returns reader for entry data
ReadData(entry string) (io.ReadCloser, error)
}
type path string
var _ PathReader = path("")
func (p path) ReadDir() (it iter.Seq[string], err error) {
var entries []os.DirEntry
dir := string(p)
if entries, err = os.ReadDir(dir); err == nil {
it = func(yield func(string) bool) {
for _, e := range entries {
if !e.IsDir() && strings.ToLower(filepath.Ext(e.Name())) == ".torrent" {
if !yield(filepath.Join(dir, e.Name())) {
return
}
}
}
}
}
return it, err
}
func (p path) ReadData(entry string) (io.ReadCloser, error) {
return os.Open(entry)
}
// NewScanner creates Scanner instance.
func NewScanner(list list.List, reader PathReader, period time.Duration) *Scanner {
if len(list.StorageCtx) == 0 {
logger.Warn().
Str("name", "StorageCtx").
Str("provided", list.StorageCtx).
Str("default", container.DefaultStorageCtxName).
Msg("falling back to default configuration")
list.StorageCtx = container.DefaultStorageCtxName
}
if period == 0 {
logger.Warn().
Str("name", "Period").
Dur("provided", 0).
Dur("default", defaultPeriod).
Msg("falling back to default configuration")
period = defaultPeriod
}
return &Scanner{
List: list,
reader: reader,
period: period,
closed: make(chan bool),
}
}
// Scanner holds list of approved/rejected torrents
type Scanner struct {
list.List
reader PathReader
period time.Duration
closed chan bool
}
// Run starts periodic directory scanning and blocks until Stop called
func (d *Scanner) Run() {
if d.reader == nil {
log.Warn().Msg("reader not provided")
return
}
if d.period == 0 {
log.Warn().Msg("period not provided")
return
}
t := time.NewTicker(d.period)
defer t.Stop()
files := make(map[string][2]bittorrent.InfoHash)
tmpFiles := make(map[string]bool)
@@ -112,16 +169,14 @@ func (d *directory) runScan(path string, period time.Duration) {
return
case <-t.C:
logger.Debug().Msg("starting directory scan")
if entries, err := os.ReadDir(path); err == nil {
for _, e := range entries {
if !e.IsDir() && strings.ToLower(filepath.Ext(e.Name())) == ".torrent" {
tmpFiles[filepath.Join(path, e.Name())] = true
}
if entries, err := d.reader.ReadDir(); err == nil {
for e := range entries {
tmpFiles[e] = true
}
for p := range tmpFiles {
if _, exists := files[p]; !exists {
var f *os.File
if f, err = os.Open(p); err == nil {
var f io.ReadCloser
if f, err = d.reader.ReadData(p); err == nil {
var info torrentRawInfoStruct
err = bencode.NewDecoder(io.LimitReader(f, maxTorrentSize)).Decode(&info)
_ = f.Close()
@@ -186,13 +241,8 @@ func (d *directory) runScan(path string, period time.Duration) {
}
}
type directory struct {
list.List
closed chan bool
}
// Close closes watching of torrent directory
func (d *directory) Close() error {
func (d *Scanner) Close() error {
if d.closed != nil {
close(d.closed)
}
@@ -0,0 +1,108 @@
package directory
import (
"context"
"encoding/base64"
"encoding/hex"
"os"
"path/filepath"
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/sot-tech/mochi/middleware/torrentapproval/container/list"
"github.com/sot-tech/mochi/pkg/conf"
"github.com/sot-tech/mochi/pkg/log"
"github.com/sot-tech/mochi/storage/memory"
)
type testData struct {
name string
data []byte
hash string
}
func unHEX(s string) string {
b, _ := hex.DecodeString(s)
return string(b)
}
func unBase64(s string) []byte {
b, _ := base64.StdEncoding.DecodeString(s)
return b
}
// contains created torrent file data from simple txt documents.
// data base64 encoded because `pieces` section in torrent file is raw bytes
var files = [2]testData{
{
name: "test0.torrent",
data: unBase64(`ZDEwOmNyZWF0ZWQgYnkzMTpUcmFuc21pc3Npb24vNC4wLjYgKDM4YzE2NDkzM2UpMTM6Y3JlYXRp
b24gZGF0ZWkxNzU1ODYxOTI3ZTg6ZW5jb2Rpbmc1OlVURi04NDppbmZvZDY6bGVuZ3RoaTVlNDpu
YW1lODp0ZXN0LnR4dDEyOnBpZWNlIGxlbmd0aGkzMjc2OGU2OnBpZWNlczIwOk4SQ70ixm52wrqe
3cH5E5Tlf5+DZWU=`),
hash: unHEX("a10e8e9e81702bf8482f251551ff4fe011cba6a7"),
},
{
name: "test1.torrent",
data: unBase64(`ZDEwOmNyZWF0ZWQgYnkzMTpUcmFuc21pc3Npb24vNC4wLjYgKDM4YzE2NDkzM2UpMTM6Y3JlYXRp
b24gZGF0ZWkxNzU2MTIzNzEwZTg6ZW5jb2Rpbmc1OlVURi04NDppbmZvZDY6bGVuZ3RoaTRlNDpu
YW1lOTp0ZXN0MC50eHQxMjpwaWVjZSBsZW5ndGhpMzI3NjhlNjpwaWVjZXMyMDqo/cIFqfGcwcdQ
emDE8BsT0R1/0GVl`),
hash: unHEX("e86d393bd458d2acc46d5467bc8cb8b30b1bfa77"),
},
}
func init() {
_ = log.ConfigureLogger("", "warn", false, false)
}
func writeTmp() (string, error) {
tmpDir, err := os.MkdirTemp("", "")
if err != nil {
return "", err
}
for _, f := range files {
if err = os.WriteFile(filepath.Join(tmpDir, f.name), f.data, 0o600); err != nil {
break
}
}
return tmpDir, err
}
func TestScan(t *testing.T) {
tmpDir, err := writeTmp()
t.Cleanup(func() {
err := os.RemoveAll(tmpDir)
if err != nil {
t.Log(err)
}
})
if err != nil {
t.Error(err)
return
}
st, _ := memory.Builder{}.NewDataStorage(make(conf.MapConfig))
d := NewScanner(list.List{
Invert: false,
Storage: st,
StorageCtx: "TEST",
}, path(tmpDir), time.Millisecond*10)
go d.Run()
t.Cleanup(func() {
_ = d.Close()
})
time.Sleep(time.Millisecond * 100)
for _, f := range files {
contains, _ := d.Storage.Contains(context.Background(), "TEST", f.hash)
require.True(t, contains, "%s must present", f.name)
_ = os.Remove(filepath.Join(tmpDir, f.name))
}
time.Sleep(time.Millisecond * 100)
for _, f := range files {
contains, _ := d.Storage.Contains(context.Background(), "TEST", f.hash)
require.False(t, contains, "%s must absent", f.name)
}
}
@@ -0,0 +1,168 @@
// Package s3 implements container which
// checks if hash present in any of torrent file
// placed in S3-like storage.
package s3
import (
"context"
"errors"
"fmt"
"io"
"iter"
"strings"
"time"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/credentials"
awss3 "github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/smithy-go/logging"
"github.com/sot-tech/mochi/middleware/torrentapproval/container"
"github.com/sot-tech/mochi/middleware/torrentapproval/container/directory"
"github.com/sot-tech/mochi/middleware/torrentapproval/container/list"
"github.com/sot-tech/mochi/pkg/conf"
"github.com/sot-tech/mochi/pkg/log"
"github.com/sot-tech/mochi/storage"
)
var logger = log.NewLogger("middleware/torrent approval/s3")
const defaultPeriod = time.Minute
// Config - implementation of S3 container configuration.
// Extends list.Config because uses the same storage and Approved function.
type Config struct {
list.Config
Endpoint string
Region string
KeyID string `cfg:"key_id"`
KeySecret string `cfg:"key_secret"`
SessionToken string `cfg:"session_token"`
Bucket string
Prefix string
Suffix string
Period time.Duration
}
func init() {
container.Register("s3", build)
}
func build(conf conf.MapConfig, st storage.DataStorage) (container.Container, error) {
c := new(Config)
if err := conf.Unmarshal(c); err != nil {
return nil, fmt.Errorf("unable to deserialise configuration: %w", err)
}
if len(c.Bucket) == 0 {
return nil, errors.New("no bucket provided")
}
if c.Period == 0 {
logger.Warn().
Str("name", "Period").
Dur("provided", 0).
Dur("default", defaultPeriod).
Msg("falling back to default configuration")
c.Period = defaultPeriod
}
modifiers := make([]func(*config.LoadOptions) error, 1, 4)
modifiers[0] = config.WithLogger(logging.LoggerFunc(func(
classification logging.Classification, format string, v ...interface{},
) {
switch classification {
case logging.Debug:
logger.Debug().CallerSkipFrame(1).Msg(fmt.Sprintf(format, v...))
case logging.Warn:
logger.Warn().CallerSkipFrame(1).Msg(fmt.Sprintf(format, v...))
}
}))
if len(c.Endpoint) > 0 {
modifiers = append(modifiers, config.WithBaseEndpoint(c.Endpoint))
}
if len(c.Region) > 0 {
modifiers = append(modifiers, config.WithRegion(c.Endpoint))
}
if len(c.KeyID) > 0 || len(c.KeySecret) > 0 || len(c.SessionToken) > 0 {
modifiers = append(modifiers, config.WithCredentialsProvider(
credentials.NewStaticCredentialsProvider(c.KeyID, c.KeySecret, c.SessionToken)),
)
}
awsCfg, err := config.LoadDefaultConfig(context.Background(), modifiers...)
if err != nil {
return nil, fmt.Errorf("unable load AWS S3 SDK configuration: %w", err)
}
if len(c.KeyID) > 0 || len(c.KeySecret) > 0 || len(c.SessionToken) > 0 {
awsCfg.Credentials = credentials.NewStaticCredentialsProvider(c.KeyID, c.KeySecret, c.SessionToken)
}
s := directory.NewScanner(list.List{
Invert: c.Invert,
Storage: st,
StorageCtx: c.StorageCtx,
}, s3{
client: awss3.NewFromConfig(awsCfg),
bucket: c.Bucket,
prefix: c.Prefix,
suffix: c.Suffix,
}, c.Period)
go s.Run()
return s, err
}
type s3Client interface {
ListObjectsV2(
ctx context.Context, input *awss3.ListObjectsV2Input, f ...func(*awss3.Options),
) (*awss3.ListObjectsV2Output, error)
GetObject(
ctx context.Context, params *awss3.GetObjectInput, optFns ...func(*awss3.Options),
) (*awss3.GetObjectOutput, error)
}
type s3 struct {
client s3Client
bucket, prefix, suffix string
}
var _ directory.PathReader = s3{}
func (s s3) ReadDir() (it iter.Seq[string], err error) {
search := &awss3.ListObjectsV2Input{Bucket: &s.bucket}
if len(s.prefix) > 0 {
search.Prefix = &s.prefix
}
entries, err := s.client.ListObjectsV2(context.Background(), search)
if err == nil {
it = func(yield func(string) bool) {
for _, e := range entries.Contents {
logger.Trace().Any("content", e).Msg("s3 read")
if e.Key != nil && strings.HasSuffix(*e.Key, s.suffix) {
if !yield(*e.Key) {
return
}
}
}
}
}
return it, err
}
func (s s3) ReadData(entry string) (data io.ReadCloser, err error) {
var result *awss3.GetObjectOutput
result, err = s.client.GetObject(context.Background(), &awss3.GetObjectInput{
Bucket: &s.bucket,
Key: &entry,
})
if err == nil {
data = result.Body
}
return data, err
}
@@ -0,0 +1,232 @@
package s3
import (
"bytes"
"context"
"encoding/base64"
"encoding/hex"
"io"
"sync"
"testing"
"time"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/credentials"
awss3 "github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/aws-sdk-go-v2/service/s3/types"
"github.com/stretchr/testify/require"
"github.com/sot-tech/mochi/middleware/torrentapproval/container/directory"
"github.com/sot-tech/mochi/middleware/torrentapproval/container/list"
"github.com/sot-tech/mochi/pkg/conf"
"github.com/sot-tech/mochi/pkg/log"
"github.com/sot-tech/mochi/storage/memory"
)
type testData struct {
data []byte
hash string
}
func unHEX(s string) string {
b, _ := hex.DecodeString(s)
return string(b)
}
func unBase64(s string) []byte {
b, _ := base64.StdEncoding.DecodeString(s)
return b
}
// contains created torrent file data from simple txt documents.
// data base64 encoded because `pieces` section in torrent file is raw bytes
var files = map[string]testData{
"test0.torrent": {
data: unBase64(`ZDEwOmNyZWF0ZWQgYnkzMTpUcmFuc21pc3Npb24vNC4wLjYgKDM4YzE2NDkzM2UpMTM6Y3JlYXRp
b24gZGF0ZWkxNzU1ODYxOTI3ZTg6ZW5jb2Rpbmc1OlVURi04NDppbmZvZDY6bGVuZ3RoaTVlNDpu
YW1lODp0ZXN0LnR4dDEyOnBpZWNlIGxlbmd0aGkzMjc2OGU2OnBpZWNlczIwOk4SQ70ixm52wrqe
3cH5E5Tlf5+DZWU=`),
hash: unHEX("a10e8e9e81702bf8482f251551ff4fe011cba6a7"),
},
"test1.torrent": {
data: unBase64(`ZDEwOmNyZWF0ZWQgYnkzMTpUcmFuc21pc3Npb24vNC4wLjYgKDM4YzE2NDkzM2UpMTM6Y3JlYXRp
b24gZGF0ZWkxNzU2MTIzNzEwZTg6ZW5jb2Rpbmc1OlVURi04NDppbmZvZDY6bGVuZ3RoaTRlNDpu
YW1lOTp0ZXN0MC50eHQxMjpwaWVjZSBsZW5ndGhpMzI3NjhlNjpwaWVjZXMyMDqo/cIFqfGcwcdQ
emDE8BsT0R1/0GVl`),
hash: unHEX("e86d393bd458d2acc46d5467bc8cb8b30b1bfa77"),
},
}
func init() {
_ = log.ConfigureLogger("", "warn", false, false)
}
type mockS3 struct {
objs []types.Object
objsMu *sync.Mutex
}
func (m *mockS3) ListObjectsV2(
context.Context, *awss3.ListObjectsV2Input, ...func(*awss3.Options),
) (*awss3.ListObjectsV2Output, error) {
m.objsMu.Lock()
defer m.objsMu.Unlock()
return &awss3.ListObjectsV2Output{
Contents: m.objs,
}, nil
}
var _ s3Client = &mockS3{}
func (m *mockS3) GetObject(
_ context.Context, params *awss3.GetObjectInput, _ ...func(*awss3.Options),
) (*awss3.GetObjectOutput, error) {
if params == nil || params.Key == nil {
return nil, nil
}
v := files[*params.Key]
return &awss3.GetObjectOutput{
Body: io.NopCloser(bytes.NewReader(v.data)),
}, nil
}
func TestScanMock(t *testing.T) {
cl := &mockS3{objs: make([]types.Object, 0, len(files)), objsMu: &sync.Mutex{}}
for k := range files {
cl.objs = append(cl.objs, types.Object{Key: &k})
}
st, _ := memory.Builder{}.NewDataStorage(make(conf.MapConfig))
d := directory.NewScanner(list.List{
Invert: false,
Storage: st,
StorageCtx: "TEST",
}, s3{
client: cl,
}, time.Millisecond*10)
go d.Run()
t.Cleanup(func() {
_ = d.Close()
})
time.Sleep(time.Millisecond * 100)
for name, f := range files {
contains, _ := d.Storage.Contains(context.Background(), "TEST", f.hash)
require.True(t, contains, "%s must present", name)
}
cl.objsMu.Lock()
cl.objs = []types.Object{}
cl.objsMu.Unlock()
time.Sleep(time.Millisecond * 100)
for name, f := range files {
contains, _ := d.Storage.Contains(context.Background(), "TEST", f.hash)
require.False(t, contains, "%s must absent", name)
}
}
var (
minioEndpoint = "http://127.0.0.1:9000"
minioBucket = "test"
minioPrefix = "test/"
)
const (
minioKeyID = "minioadmin"
minioSecret = "minioadmin"
minioRegion = "us-east-1"
)
// TestScanMinio requires real minio instance listening 127.0.0.1:9000
// with default login, password and region (minioadmin/minioadmin, us-east-1)
func TestScanMinio(t *testing.T) {
st, _ := memory.Builder{}.NewDataStorage(make(conf.MapConfig))
awsCfg, err := config.LoadDefaultConfig(context.Background())
if err != nil {
t.Fatal(err)
}
awsCfg.BaseEndpoint = &minioEndpoint
awsCfg.Region = minioRegion
awsCfg.Credentials = credentials.NewStaticCredentialsProvider(minioKeyID, minioSecret, "")
cl := awss3.NewFromConfig(awsCfg)
_, _ = cl.CreateBucket(context.Background(), &awss3.CreateBucketInput{Bucket: &minioBucket})
for name, data := range files {
_, err = cl.PutObject(context.Background(), &awss3.PutObjectInput{
Bucket: &minioBucket,
Key: &name,
Body: bytes.NewReader(data.data),
})
if err != nil {
t.Fatal(err)
}
name = minioPrefix + name
_, err = cl.PutObject(context.Background(), &awss3.PutObjectInput{
Bucket: &minioBucket,
Key: &name,
Body: bytes.NewReader(data.data),
})
if err != nil {
t.Fatal(err)
}
name += "1"
_, err = cl.PutObject(context.Background(), &awss3.PutObjectInput{
Bucket: &minioBucket,
Key: &name,
Body: bytes.NewReader(data.data),
})
if err != nil {
t.Fatal(err)
}
}
s3Dir := s3{
client: cl,
bucket: minioBucket,
prefix: minioPrefix,
suffix: ".torrent",
}
if content, err := s3Dir.ReadDir(); err == nil {
var i int
for range content {
i++
}
require.Equal(t, len(files), i, "S3 content data not the same as test data")
} else {
t.Fatal(err)
}
d := directory.NewScanner(list.List{
Invert: false,
Storage: st,
StorageCtx: "TEST",
}, s3Dir, time.Millisecond*100)
go d.Run()
t.Cleanup(func() {
_ = d.Close()
})
time.Sleep(time.Millisecond * 200)
for name, f := range files {
contains, _ := d.Storage.Contains(context.Background(), "TEST", f.hash)
require.True(t, contains, "%s must present", name)
name = minioPrefix + name
_, err = cl.DeleteObject(context.Background(), &awss3.DeleteObjectInput{
Bucket: &minioBucket,
Key: &name,
})
if err != nil {
t.Fatal(err)
}
}
time.Sleep(time.Millisecond * 200)
for name, f := range files {
contains, _ := d.Storage.Contains(context.Background(), "TEST", f.hash)
require.False(t, contains, "%s must absent", name)
}
}
@@ -16,6 +16,7 @@ import (
// import directory watcher to enable appropriate support
_ "github.com/sot-tech/mochi/middleware/torrentapproval/container/directory"
_ "github.com/sot-tech/mochi/middleware/torrentapproval/container/s3"
// import static list to enable appropriate support
_ "github.com/sot-tech/mochi/middleware/torrentapproval/container/list"
@@ -65,7 +66,7 @@ func build(config conf.MapConfig, st storage.PeerStorage) (h middleware.Hook, er
} else if ds, err = storage.NewDataStorage(cfg.Storage); err == nil {
dsc = ds
} else {
return
return h, err
}
var c container.Container
+2 -2
View File
@@ -36,7 +36,7 @@ func build(config conf.MapConfig, _ storage.PeerStorage) (h middleware.Hook, err
}
}
}
return
return h, err
}
var (
@@ -112,5 +112,5 @@ func deriveEntropyFromRequest(req *bittorrent.AnnounceRequest) (v0 uint64, v1 ui
v0 = binary.BigEndian.Uint64([]byte(req.InfoHash[:8])) + binary.BigEndian.Uint64([]byte(req.InfoHash[8:16]))
}
v1 = binary.BigEndian.Uint64(req.ID[:8]) + binary.BigEndian.Uint64(req.ID[8:16])
return
return v0, v1
}
+1 -1
View File
@@ -49,7 +49,7 @@ func (m MapConfig) Unmarshal(into any) (err error) {
} else {
err = ErrNilConfigMap
}
return
return err
}
// NamedMapConfig encapsulates MapConfig with string Name
+1 -1
View File
@@ -38,7 +38,7 @@ type TimeCache struct {
func New() (tc *TimeCache) {
tc = &TimeCache{closed: make(chan struct{})}
tc.clock.Store(time.Now().UnixNano())
return
return tc
}
// Run runs the TimeCache, updating the cached clock value once every interval
+1 -1
View File
@@ -108,7 +108,7 @@ func (s *store) addPeer(ctx context.Context, infoHashKey, peerID string) (err er
if err = s.SAdd(ctx, infoHashKey, peerID).Err(); err == nil {
err = s.Process(ctx, redis.NewCmd(ctx, expireMemberCmd, infoHashKey, peerID, s.peerTTL))
}
return
return err
}
func (s *store) delPeer(ctx context.Context, infoHashKey, peerID string) error {
+30 -30
View File
@@ -181,14 +181,14 @@ func newStorage(cfg config) (*mdb, error) {
dataDB, err = txn.OpenRoot(0)
}
if err != nil {
return
return err
}
if len(cfg.PeersDBName) > 0 {
peersDB, err = txn.CreateDBI(cfg.PeersDBName)
} else {
peersDB, err = txn.OpenRoot(0)
}
return
return err
}); err != nil {
_ = env.Close()
return nil, err
@@ -216,7 +216,7 @@ func (m *mdb) Close() (err error) {
err = m.lmdbEnv.Close()
}
})
return
return err
}
const keySeparator = '_'
@@ -256,31 +256,31 @@ func (m *mdb) Put(_ context.Context, storeCtx string, values ...storage.Entry) (
break
}
}
return
return err
})
}
return
return err
}
func (m *mdb) Contains(_ context.Context, storeCtx string, key string) (contains bool, err error) {
err = m.View(func(txn *lmdb.Txn) (err error) {
_, err = txn.Get(m.dataDB, composeKey(storeCtx, key))
return
return err
})
if err == nil {
contains = true
} else if lmdb.IsNotFound(err) {
err = nil
}
return
return contains, err
}
func (m *mdb) Load(_ context.Context, storeCtx string, key string) (v []byte, err error) {
err = m.View(func(txn *lmdb.Txn) (err error) {
v, err = ignoreNotFoundData(txn.Get(m.dataDB, composeKey(storeCtx, key)))
return
return err
})
return
return v, err
}
func (m *mdb) Delete(_ context.Context, storeCtx string, keys ...string) (err error) {
@@ -291,10 +291,10 @@ func (m *mdb) Delete(_ context.Context, storeCtx string, keys ...string) (err er
break
}
}
return
return err
})
}
return
return err
}
const (
@@ -324,7 +324,7 @@ func unpackPeer(arr []byte) (peer bittorrent.Peer) {
AddrPort: netip.AddrPortFrom(netip.AddrFrom16([ipLen]byte(arr[bittorrent.PeerIDLen:])).Unmap(),
binary.BigEndian.Uint16(arr[bittorrent.PeerIDLen+ipLen:])),
}
return
return peer
}
func composeIHKeyPrefix(ih []byte, seeder bool, v6 bool, suffixLen int) (ihKey []byte, suffixStart int) {
@@ -343,13 +343,13 @@ func composeIHKeyPrefix(ih []byte, seeder bool, v6 bool, suffixLen int) (ihKey [
ihKey[2], ihKey[ihLen+3] = keySeparator, keySeparator
copy(ihKey[3:], ih)
suffixStart = len(ihKey) - suffixLen
return
return ihKey, suffixStart
}
func composeIHKey(ih bittorrent.InfoHash, peer bittorrent.Peer, seeder bool) (ihKey []byte) {
ihKey, start := composeIHKeyPrefix(ih.Bytes(), seeder, peer.Addr().Is6(), packedPeerLen)
packPeer(peer, ihKey[start:])
return
return ihKey
}
func (m *mdb) putPeer(ih bittorrent.InfoHash, peer bittorrent.Peer, seeder bool) error {
@@ -359,7 +359,7 @@ func (m *mdb) putPeer(ih bittorrent.InfoHash, peer bittorrent.Peer, seeder bool)
if b, err = txn.PutReserve(m.peersDB, ihKey, 8, 0); err == nil {
binary.BigEndian.PutUint64(b, uint64(timecache.NowUnix()))
}
return
return err
})
}
@@ -390,12 +390,12 @@ func (m *mdb) GraduateLeecher(_ context.Context, ih bittorrent.InfoHash, peer bi
ihKey := composeIHKey(ih, peer, false)
return m.Update(func(txn *lmdb.Txn) (err error) {
if err = ignoreNotFound(txn.Del(m.peersDB, ihKey, nil)); err != nil {
return
return err
}
ihKey[0] = seederPrefix
var b []byte
if b, err = txn.PutReserve(m.peersDB, ihKey, 8, 0); err != nil {
return
return err
}
binary.BigEndian.PutUint64(b, uint64(timecache.NowUnixNano()))
@@ -403,7 +403,7 @@ func (m *mdb) GraduateLeecher(_ context.Context, ih bittorrent.InfoHash, peer bi
ihPrefix[0], ihPrefix[1] = downloadedPrefix, countPrefix
var v int
if b, err = ignoreNotFoundData(txn.Get(m.peersDB, ihPrefix)); err != nil {
return
return err
}
if len(b) >= 4 {
v = int(binary.BigEndian.Uint32(b))
@@ -412,7 +412,7 @@ func (m *mdb) GraduateLeecher(_ context.Context, ih bittorrent.InfoHash, peer bi
if b, err = txn.PutReserve(m.peersDB, ihPrefix, 4, 0); err == nil {
binary.BigEndian.PutUint32(b, uint32(v))
}
return
return err
})
}
@@ -450,11 +450,11 @@ func (m *mdb) scanPeers(ctx context.Context, prefix []byte, readRaw bool, fn fun
err = scanner.Err()
}
scanner.Close()
return
return err
})
m.wg.Done()
return
return err
}
func (m *mdb) AnnouncePeers(ctx context.Context, ih bittorrent.InfoHash, forSeeder bool, numWant int, v6 bool) (peers []bittorrent.Peer, err error) {
@@ -474,7 +474,7 @@ func (m *mdb) AnnouncePeers(ctx context.Context, ih bittorrent.InfoHash, forSeed
err = m.scanPeers(ctx, prefix, true, appendFn)
}
}
return
return peers, err
}
func (m *mdb) countPeers(ctx context.Context, scanPrefix []byte) (cnt uint32, err error) {
@@ -509,35 +509,35 @@ func (m *mdb) countPeers(ctx context.Context, scanPrefix []byte) (cnt uint32, er
}
err = scanner.Err()
scanner.Close()
return
return err
})
m.wg.Done()
return
return cnt, err
}
func (m *mdb) ScrapeSwarm(ctx context.Context, ih bittorrent.InfoHash) (leechers uint32, seeders uint32, snatched uint32, err error) {
scanPrefix, _ := composeIHKeyPrefix(ih.Bytes(), false, false, 0)
if leechers, err = m.countPeers(ctx, scanPrefix); err != nil {
return
return leechers, seeders, snatched, err
}
scanPrefix[0], scanPrefix[1] = seederPrefix, ipv4Prefix
if seeders, err = m.countPeers(ctx, scanPrefix); err != nil {
return
return leechers, seeders, snatched, err
}
scanPrefix[0], scanPrefix[1] = downloadedPrefix, countPrefix
err = m.View(func(txn *lmdb.Txn) (err error) {
var b []byte
if b, err = ignoreNotFoundData(txn.Get(m.peersDB, scanPrefix)); err != nil {
return
return err
}
if len(b) >= 4 {
snatched = binary.BigEndian.Uint32(b)
}
return
return err
})
return
return leechers, seeders, snatched, err
}
const (
@@ -564,7 +564,7 @@ func (m *mdb) gc(cutoff time.Time) {
break
}
}
return
return err
})
}
if err == nil {
+9 -9
View File
@@ -101,7 +101,7 @@ func (p *ihSwarm) get(k bittorrent.InfoHash) (v swarm, ok bool) {
p.RLock()
v, ok = p.m[k]
p.RUnlock()
return
return v, ok
}
func (p *ihSwarm) getOrCreate(k bittorrent.InfoHash) (v swarm) {
@@ -117,7 +117,7 @@ func (p *ihSwarm) getOrCreate(k bittorrent.InfoHash) (v swarm) {
}
p.Unlock()
}
return
return v
}
func (p *ihSwarm) del(k bittorrent.InfoHash) (ok bool) {
@@ -126,7 +126,7 @@ func (p *ihSwarm) del(k bittorrent.InfoHash) (ok bool) {
delete(p.m, k)
}
p.Unlock()
return
return ok
}
func (p *ihSwarm) len() int {
@@ -158,7 +158,7 @@ func (p *peers) get(k bittorrent.Peer) (v int64, ok bool) {
p.RLock()
v, ok = p.m[k]
p.RUnlock()
return
return v, ok
}
func (p *peers) set(k bittorrent.Peer, v int64) {
@@ -173,7 +173,7 @@ func (p *peers) del(k bittorrent.Peer) (ok bool) {
delete(p.m, k)
}
p.Unlock()
return
return ok
}
func (p *peers) len() int {
@@ -322,7 +322,7 @@ func (ps *peerStore) DeleteSeeder(_ context.Context, ih bittorrent.InfoHash, p b
err = storage.ErrResourceDoesNotExist
}
return
return err
}
func (ps *peerStore) PutLeecher(_ context.Context, ih bittorrent.InfoHash, p bittorrent.Peer) error {
@@ -368,7 +368,7 @@ func (ps *peerStore) DeleteLeecher(_ context.Context, ih bittorrent.InfoHash, p
err = storage.ErrResourceDoesNotExist
}
return
return err
}
func (ps *peerStore) GraduateLeecher(_ context.Context, ih bittorrent.InfoHash, p bittorrent.Peer) error {
@@ -427,7 +427,7 @@ func (ps *peerStore) AnnouncePeers(_ context.Context, ih bittorrent.InfoHash, fo
}
}
return
return peers, err
}
func (ps *peerStore) countPeers(ih bittorrent.InfoHash, v6 bool) (leechers, seeders uint32) {
@@ -436,7 +436,7 @@ func (ps *peerStore) countPeers(ih bittorrent.InfoHash, v6 bool) (leechers, seed
if sw, ok := shard.swarms.get(ih); ok {
leechers, seeders = uint32(sw.leechers.len()), uint32(sw.seeders.len())
}
return
return leechers, seeders
}
func (ps *peerStore) ScrapeSwarm(_ context.Context, ih bittorrent.InfoHash) (leechers uint32, seeders uint32, snatched uint32, _ error) {
+14 -14
View File
@@ -144,7 +144,7 @@ func checkParameter(p *string, name string) (err error) {
if *p = strings.TrimSpace(*p); len(*p) == 0 {
err = fmt.Errorf(errRequiredParameterNotSetMsg, name)
}
return
return err
}
type config struct {
@@ -268,7 +268,7 @@ func (s *store) txBatch(ctx context.Context, batch *pgx.Batch) (err error) {
}
}
}
return
return err
}
func (s *store) Put(ctx context.Context, storeCtx string, values ...storage.Entry) (err error) {
@@ -284,7 +284,7 @@ func (s *store) Put(ctx context.Context, storeCtx string, values ...storage.Entr
}
err = s.txBatch(ctx, &batch)
}
return
return err
}
func (s *store) Contains(ctx context.Context, storeCtx string, key string) (contains bool, err error) {
@@ -294,12 +294,12 @@ func (s *store) Contains(ctx context.Context, storeCtx string, key string) (cont
contains = rows.Next()
err = rows.Err()
}
return
return contains, err
}
func (s *store) Load(ctx context.Context, storeCtx string, key string) (out []byte, err error) {
err = noResultErr(s.QueryRow(ctx, s.Data.GetQuery, pgx.NamedArgs{pCtx: storeCtx, pKey: []byte(key)}).Scan(&out))
return
return out, err
}
func (s *store) Delete(ctx context.Context, storeCtx string, keys ...string) (err error) {
@@ -310,7 +310,7 @@ func (s *store) Delete(ctx context.Context, storeCtx string, keys ...string) (er
}
_, err = s.Exec(ctx, s.Data.DelQuery, pgx.NamedArgs{pCtx: storeCtx, pKey: baKeys})
}
return
return err
}
func (s *store) Preservable() bool {
@@ -397,7 +397,7 @@ func (s *store) putPeer(ctx context.Context, ih []byte, peer bittorrent.Peer, se
pV6: peer.Addr().Is6(),
pCreated: timecache.Now(),
})
return
return err
}
func (s *store) delPeer(ctx context.Context, ih []byte, peer bittorrent.Peer, seeder bool) (err error) {
@@ -412,7 +412,7 @@ func (s *store) delPeer(ctx context.Context, ih []byte, peer bittorrent.Peer, se
pPort: peer.Port(),
pSeeder: seeder,
})
return
return err
}
func (s *store) PutSeeder(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error {
@@ -475,7 +475,7 @@ func (s *store) getPeers(ctx context.Context, ih []byte, seeders bool, maxCount
s.Announce.AddressColumn,
s.Announce.PortColumn,
})
return
return peers, err
}
var maxIndex int
switch {
@@ -516,7 +516,7 @@ func (s *store) getPeers(ctx context.Context, ih []byte, seeders bool, maxCount
}
}
}
return
return peers, err
}
func (s *store) AnnouncePeers(ctx context.Context, ih bittorrent.InfoHash, forSeeder bool, numWant int, v6 bool) (peers []bittorrent.Peer, err error) {
@@ -546,7 +546,7 @@ func (s *store) AnnouncePeers(ctx context.Context, ih bittorrent.InfoHash, forSe
logger.Warn().Err(err).Stringer("infoHash", ih).Msg("error occurred while retrieving peers")
}
return
return peers, err
}
func (s *store) countPeers(ctx context.Context, ih []byte) (seeders uint32, leechers uint32, err error) {
@@ -585,7 +585,7 @@ func (s *store) countPeers(ctx context.Context, ih []byte) (seeders uint32, leec
}
}
}
return
return seeders, leechers, err
}
func (s *store) ScrapeSwarm(ctx context.Context, ih bittorrent.InfoHash) (leechers uint32, seeders uint32, snatched uint32, err error) {
@@ -594,13 +594,13 @@ func (s *store) ScrapeSwarm(ctx context.Context, ih bittorrent.InfoHash) (leeche
Msg("scrape swarm")
ihb := ih.Bytes()
if seeders, leechers, err = s.countPeers(ctx, ihb); err != nil {
return
return leechers, seeders, snatched, err
}
if len(s.Downloads.GetQuery) > 0 {
err = noResultErr(s.QueryRow(ctx, s.Downloads.GetQuery, pgx.NamedArgs{pInfoHash: ihb}).Scan(&snatched))
}
return
return leechers, seeders, snatched, err
}
func (s *store) Ping(ctx context.Context) error {
+20 -20
View File
@@ -353,7 +353,7 @@ func (ps *store) count(key string, getLength bool) (n uint64) {
if err != nil {
logger.Error().Err(err).Str("key", key).Msg("GET/SCARD failure")
}
return
return n
}
func (ps *store) getClock() int64 {
@@ -374,7 +374,7 @@ func (ps *store) tx(ctx context.Context, txf func(tx redis.Pipeliner) error) (er
} else {
err = txErr
}
return
return err
}
// NoResultErr returns nil if provided err is redis.Nil
@@ -406,7 +406,7 @@ func InfoHashKey(infoHash string, seeder, v6 bool) (infoHashKey string) {
infoHashKey = IH4LeecherKey
}
infoHashKey += infoHash
return
return infoHashKey
}
func (ps *store) putPeer(ctx context.Context, infoHashKey, peerCountKey, peerID string) error {
@@ -416,13 +416,13 @@ func (ps *store) putPeer(ctx context.Context, infoHashKey, peerCountKey, peerID
Msg("put peer")
return ps.tx(ctx, func(tx redis.Pipeliner) (err error) {
if err = tx.HSet(ctx, infoHashKey, peerID, ps.getClock()).Err(); err != nil {
return
return err
}
if err = tx.Incr(ctx, peerCountKey).Err(); err != nil {
return
return err
}
err = tx.SAdd(ctx, IHKey, infoHashKey).Err()
return
return err
})
}
@@ -513,7 +513,7 @@ var errInvalidPeerDataSize = fmt.Errorf("invalid peer data (must be at least %d
func UnpackPeer(data string) (peer bittorrent.Peer, err error) {
if len(data) < peerMinimumLen {
err = errInvalidPeerDataSize
return
return peer, err
}
b := str2bytes.StringToBytes(data)
peerID, _ := bittorrent.NewPeerID(b[:bittorrent.PeerIDLen])
@@ -529,7 +529,7 @@ func UnpackPeer(data string) (peer bittorrent.Peer, err error) {
err = bittorrent.ErrInvalidIP
}
return
return peer, err
}
func (ps *Connection) parsePeersList(peersResult *redis.StringSliceCmd) (peers []bittorrent.Peer, err error) {
@@ -544,7 +544,7 @@ func (ps *Connection) parsePeersList(peersResult *redis.StringSliceCmd) (peers [
}
}
}
return
return peers, err
}
type getPeersFn func(context.Context, string, int) *redis.StringSliceCmd
@@ -586,7 +586,7 @@ func (ps *Connection) GetPeers(
logger.Warn().Err(err).Stringer("infoHash", ih).Msg("error occurred while retrieving peers")
}
return
return out, err
}
func (ps *store) AnnouncePeers(
@@ -613,26 +613,26 @@ func (ps *Connection) ScrapeIH(ctx context.Context, ih bittorrent.InfoHash, coun
lc4, err = countFn(ctx, InfoHashKey(infoHash, false, false)).Result()
if err = NoResultErr(err); err != nil {
return
return leechersCount, seedersCount, downloadsCount, err
}
lc6, err = countFn(ctx, InfoHashKey(infoHash, false, true)).Result()
if err = NoResultErr(err); err != nil {
return
return leechersCount, seedersCount, downloadsCount, err
}
sc4, err = countFn(ctx, InfoHashKey(infoHash, true, false)).Result()
if err = NoResultErr(err); err != nil {
return
return leechersCount, seedersCount, downloadsCount, err
}
sc6, err = countFn(ctx, InfoHashKey(infoHash, true, true)).Result()
if err = NoResultErr(err); err != nil {
return
return leechersCount, seedersCount, downloadsCount, err
}
dc, err = ps.HGet(ctx, CountDownloadsKey, infoHash).Int64()
if err = NoResultErr(err); err != nil {
return
return leechersCount, seedersCount, downloadsCount, err
}
leechersCount, seedersCount, downloadsCount = uint32(lc4+lc6), uint32(sc4+sc6), uint32(dc)
return
return leechersCount, seedersCount, downloadsCount, err
}
func (ps *store) ScrapeSwarm(ctx context.Context, ih bittorrent.InfoHash) (uint32, uint32, uint32, error) {
@@ -667,7 +667,7 @@ func (ps *Connection) Put(ctx context.Context, storeCtx string, values ...storag
}
}
}
return
return err
}
// Contains - storage.DataStorage implementation
@@ -682,7 +682,7 @@ func (ps *Connection) Load(ctx context.Context, storeCtx string, key string) (v
if err != nil && errors.Is(err, redis.Nil) {
v, err = nil, nil
}
return
return v, err
}
// Delete - storage.DataStorage implementation
@@ -700,7 +700,7 @@ func (ps *Connection) Delete(ctx context.Context, storeCtx string, keys ...strin
}
}
}
return
return err
}
// Preservable - storage.DataStorage implementation
@@ -866,5 +866,5 @@ func (ps *store) Close() (err error) {
logger.Info().Msg("redis exiting. mochi does not clear data in redis when exiting. mochi keys have prefix " + PrefixKey)
err = ps.UniversalClient.Close()
})
return
return err
}
+5 -5
View File
@@ -61,7 +61,7 @@ func (c Config) sanitizeGCConfig() (gcInterval, peerTTL time.Duration) {
} else {
peerTTL = c.PeerLifetime
}
return
return gcInterval, peerTTL
}
func (c Config) sanitizeStatisticsConfig() (statInterval time.Duration) {
@@ -73,7 +73,7 @@ func (c Config) sanitizeStatisticsConfig() (statInterval time.Duration) {
Dur("default", DefaultPrometheusReportingInterval).
Msg("falling back to default configuration")
}
return
return statInterval
}
// Entry - some key-value pair, used for BulkPut
@@ -269,11 +269,11 @@ func NewPeerStorage(cfg conf.NamedMapConfig) (ps PeerStorage, err error) {
c := new(Config)
if err = cfg.Config.Unmarshal(c); err != nil {
return
return ps, err
}
if ps, err = d.NewPeerStorage(cfg.Config); err != nil {
return
return ps, err
}
if gc, isOk := ps.(GarbageCollector); isOk {
@@ -308,5 +308,5 @@ func NewPeerStorage(cfg conf.NamedMapConfig) (ps PeerStorage, err error) {
logger.Info().Str("name", cfg.Name).Msg("storage started")
return
return ps, err
}
+2 -2
View File
@@ -30,7 +30,7 @@ func generateInfoHashes() (a [ihCount]bittorrent.InfoHash) {
for i := range a {
a[i] = randIH(i < ihCount/2)
}
return
return a
}
func generatePeers() (a [peersCount]bittorrent.Peer) {
@@ -56,7 +56,7 @@ func generatePeers() (a [peersCount]bittorrent.Peer) {
}
}
return
return a
}
type (
+2 -2
View File
@@ -27,7 +27,7 @@ func randIH(v2 bool) (ih bittorrent.InfoHash) {
panic(err)
}
ih, _ = bittorrent.NewInfoHash(b)
return
return ih
}
func randPeerID() (ih bittorrent.PeerID) {
@@ -36,7 +36,7 @@ func randPeerID() (ih bittorrent.PeerID) {
panic(err)
}
ih, _ = bittorrent.NewPeerID(b)
return
return ih
}
func init() {