diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml index a4161f0..412d94b 100644 --- a/.github/workflows/build.yaml +++ b/.github/workflows/build.yaml @@ -22,6 +22,11 @@ jobs: unit: name: "Run Unit Tests" runs-on: "ubuntu-latest" + services: + redis: + image: "eqalpha/keydb" + ports: [ "6379:6379" ] + options: "--entrypoint keydb-server" steps: - uses: "actions/checkout@v3" - uses: "actions/setup-go@v3" @@ -54,9 +59,9 @@ jobs: runs-on: "ubuntu-latest" services: redis: - image: "redis" + image: "eqalpha/keydb" ports: [ "6379:6379" ] - options: "--entrypoint redis-server" + options: "--entrypoint keydb-server" steps: - uses: "actions/checkout@v3" - uses: "actions/setup-go@v3" @@ -67,7 +72,7 @@ jobs: go install --tags e2e ./cmd/mochi curl -LO https://github.com/jzelinskie/faq/releases/download/0.0.6/faq-linux-amd64 chmod +x faq-linux-amd64 - ./faq-linux-amd64 '.mochi.storage = {"config":{"gc_interval":"3m","peer_lifetime":"31m","prometheus_reporting_interval":"1s","redis_broker":"redis://127.0.0.1:6379/0","connect_timeout":"15s","read_timeout":"15s","write_timeout":"15s"},"name":"redis"}' ./dist/example_config.yaml > ./dist/example_redis_config.yaml + ./faq-linux-amd64 '.mochi.storage = {"config":{"gc_interval":"3m","peer_lifetime":"31m","prometheus_reporting_interval":"1s","connect_timeout":"15s","read_timeout":"15s","write_timeout":"15s"},"name":"redis"}' ./dist/example_config.yaml > ./dist/example_redis_config.yaml cat ./dist/example_redis_config.yaml - name: "Run end-to-end tests" run: | diff --git a/bittorrent/bittorrent.go b/bittorrent/bittorrent.go index e372888..4da9a25 100644 --- a/bittorrent/bittorrent.go +++ b/bittorrent/bittorrent.go @@ -275,8 +275,8 @@ func (p Peer) RawString() string { // LogFields renders the current peer as a set of Logrus fields. func (p Peer) LogFields() log.Fields { return log.Fields{ - "ID": p.ID, - "IP": p.Addr(), + "id": p.ID, + "ip": p.Addr(), "port": p.Port(), } } diff --git a/cmd/mochi/config.go b/cmd/mochi/config.go index 9245b53..fc2ecba 100644 --- a/cmd/mochi/config.go +++ b/cmd/mochi/config.go @@ -16,6 +16,7 @@ import ( _ "github.com/sot-tech/mochi/middleware/varinterval" // Imports to register storage drivers. + _ "github.com/sot-tech/mochi/storage/keydb" _ "github.com/sot-tech/mochi/storage/memory" _ "github.com/sot-tech/mochi/storage/redis" ) diff --git a/docs/storage/redis.md b/docs/storage/redis.md index 4a51842..88a452a 100644 --- a/docs/storage/redis.md +++ b/docs/storage/redis.md @@ -58,24 +58,24 @@ time as value. Here is an example: ``` -- CHI_4_I - - CHI_4_S_ - - CHI_4_L_ -- CHI_4_S_ +- CHI_I + - CHI_S4_ + - CHI_L4_ +- CHI_S4_ - : - : -- CHI_4_L_ +- CHI_L4_ - : ... ``` -In this case, prometheus would record two swarms, three seeders, and one leecher. These three keys per address family -are used to record the count of swarms, seeders, and leechers. +In this case, prometheus would record two swarms, three seeders, and one leecher. These two keys +are used to record the count of seeders and leechers. ``` -- CHI_4_S_C: "3" -- CHI_6_L_C: "1" +- CHI_S_C: "3" +- CHI_L_C: "1" ``` -Note: `CHI_4_I` set has a different meaning compared to the `memory` storage: +Note: `CHI_I` set has a different meaning compared to the `memory` storage: It represents info hashes reported by seeder, meaning that info hashes without seeders are not counted. diff --git a/frontend/http/frontend.go b/frontend/http/frontend.go index c5d845b..1161de5 100644 --- a/frontend/http/frontend.go +++ b/frontend/http/frontend.go @@ -349,7 +349,7 @@ func (f *Frontend) scrapeRoute(w http.ResponseWriter, r *http.Request, ps httpro addr, err = netip.ParseAddr(host) if err != nil || addr.IsUnspecified() { - log.Error("http: invalid IP: neither v4 nor v6", log.Fields{"RemoteAddr": r.RemoteAddr}) + log.Error("http: invalid IP: neither v4 nor v6", log.Fields{"remoteAddr": r.RemoteAddr}) WriteError(w, bittorrent.ErrInvalidIP) return } diff --git a/frontend/udp/parser.go b/frontend/udp/parser.go index 11f956b..7e50582 100644 --- a/frontend/udp/parser.go +++ b/frontend/udp/parser.go @@ -81,8 +81,9 @@ func ParseAnnounce(r Request, v6Action bool, opts ParseOptions) (*bittorrent.Ann return nil, errMalformedPacket } - // XXX: pure V2 hashes will cause invalid parsing - infohash := r.Packet[16:36] + // XXX: pure V2 hashes will cause invalid parsing, + // but BEP-52 says, that V2 hashes SHOULD be truncated + infoHash := r.Packet[16:36] peerIDBytes := r.Packet[36:56] downloaded := binary.BigEndian.Uint64(r.Packet[56:64]) left := binary.BigEndian.Uint64(r.Packet[64:72]) @@ -117,7 +118,7 @@ func ParseAnnounce(r Request, v6Action bool, opts ParseOptions) (*bittorrent.Ann return nil, err } - ih, err := bittorrent.NewInfoHash(infohash) + ih, err := bittorrent.NewInfoHash(infoHash) if err != nil { return nil, errInvalidInfoHash } @@ -222,33 +223,27 @@ func ParseScrape(r Request, opts ParseOptions) (*bittorrent.ScrapeRequest, error // Skip past the initial headers and check that the bytes left equal the // length of a valid list of infohashes. r.Packet = r.Packet[16:] - l := len(r.Packet) - isV1, isV2 := l%bittorrent.InfoHashV1Len == 0, l%bittorrent.InfoHashV2Len == 0 - - if !(isV1 || isV2) { + // Only V1 and V2to1 (truncated) allowed + if len(r.Packet)%bittorrent.InfoHashV1Len != 0 { return nil, errMalformedPacket } // Allocate a list of infohashes and append it to the list until we're out. - var infohashes []bittorrent.InfoHash + var infoHashes []bittorrent.InfoHash var err error var request *bittorrent.ScrapeRequest - pageSize := bittorrent.InfoHashV1Len - if isV2 { - pageSize = bittorrent.InfoHashV2Len - } - for len(r.Packet) >= pageSize { + for len(r.Packet) >= bittorrent.InfoHashV1Len { var ih bittorrent.InfoHash - if ih, err = bittorrent.NewInfoHash(r.Packet[:pageSize]); err == nil { - infohashes = append(infohashes, ih) - r.Packet = r.Packet[pageSize:] + if ih, err = bittorrent.NewInfoHash(r.Packet[:bittorrent.InfoHashV1Len]); err == nil { + infoHashes = append(infoHashes, ih) + r.Packet = r.Packet[bittorrent.InfoHashV1Len:] } else { break } } if err == nil { // Sanitize the request. - request = &bittorrent.ScrapeRequest{InfoHashes: infohashes} + request = &bittorrent.ScrapeRequest{InfoHashes: infoHashes} err = bittorrent.SanitizeScrape(request, opts.MaxScrapeInfoHashes) } diff --git a/go.mod b/go.mod index 7162562..67b26a7 100644 --- a/go.mod +++ b/go.mod @@ -4,13 +4,12 @@ go 1.18 require ( github.com/SermoDigital/jose v0.9.2-0.20180104203859-803625baeddc - github.com/alicebob/miniredis v2.5.0+incompatible github.com/anacrolix/torrent v1.42.0 github.com/go-redis/redis/v8 v8.11.5 github.com/julienschmidt/httprouter v1.3.0 github.com/mendsley/gojwk v0.0.0-20141217222730-4d5ec6e58103 github.com/minio/sha256-simd v1.0.0 - github.com/mitchellh/mapstructure v1.4.3 + github.com/mitchellh/mapstructure v1.5.0 github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.12.1 github.com/sirupsen/logrus v1.8.1 @@ -20,7 +19,6 @@ require ( ) require ( - github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a // indirect github.com/anacrolix/dht/v2 v2.17.0 // indirect github.com/anacrolix/log v0.13.1 // indirect github.com/anacrolix/missinggo v1.3.0 // indirect @@ -30,19 +28,17 @@ require ( github.com/cespare/xxhash/v2 v2.1.2 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect - github.com/fsnotify/fsnotify v1.5.1 // indirect + github.com/fsnotify/fsnotify v1.5.3 // indirect github.com/golang/protobuf v1.5.2 // indirect - github.com/gomodule/redigo v1.8.8 // indirect github.com/huandu/xstrings v1.3.2 // indirect github.com/inconshreveable/mousetrap v1.0.0 // indirect github.com/klauspost/cpuid/v2 v2.0.12 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/client_model v0.2.0 // indirect - github.com/prometheus/common v0.33.0 // indirect + github.com/prometheus/common v0.34.0 // indirect github.com/prometheus/procfs v0.7.3 // indirect github.com/spf13/pflag v1.0.5 // indirect - github.com/yuin/gopher-lua v0.0.0-20210529063254-f4c35e4016d9 // indirect - golang.org/x/sys v0.0.0-20220412211240-33da011f77ad // indirect + golang.org/x/sys v0.0.0-20220422013727-9388b58f7150 // indirect google.golang.org/protobuf v1.28.0 // indirect ) diff --git a/go.sum b/go.sum index 3c22fda..55c6f9f 100644 --- a/go.sum +++ b/go.sum @@ -47,10 +47,6 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuy github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= -github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a h1:HbKu58rmZpUGpz5+4FfNmIU+FmZg2P3Xaj2v2bfNWmk= -github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a/go.mod h1:SGnFV6hVsYE877CKEZ6tDNTjaSXYUk6QqoIK6PrAtcc= -github.com/alicebob/miniredis v2.5.0+incompatible h1:yBHoLpsyjupjz3NL3MhKMVkR41j82Yjf3KFv7ApYzUI= -github.com/alicebob/miniredis v2.5.0+incompatible/go.mod h1:8HZjEj4yU0dwhYHky+DxYx+6BMjkBbe5ONFIF1MXffk= github.com/anacrolix/dht/v2 v2.17.0 h1:MrAS6XqVCqNyyskTwxZB2sqhU/GGUdecb2TNe2b2QjE= github.com/anacrolix/dht/v2 v2.17.0/go.mod h1:osiyaNrMLG9dw7wUtVMaII/NdCjlXeHjUcYzXnmop68= github.com/anacrolix/envpprof v0.0.0-20180404065416-323002cec2fa/go.mod h1:KgHhUaQMc8cC0+cEflSgCFNFbKwi5h54gqtVn8yhP7c= @@ -115,8 +111,8 @@ github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1m github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/frankban/quicktest v1.14.0 h1:+cqqvzZV87b4adx/5ayVOaYZ2CrvM4ejQvUdBzPPUss= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= -github.com/fsnotify/fsnotify v1.5.1 h1:mZcQUHVQUQWoPXXtuf9yuEXKudkV2sx1E06UadKWpgI= -github.com/fsnotify/fsnotify v1.5.1/go.mod h1:T3375wBYaZdLLcVNkcVbzGHY7f1l/uK5T5Ai1i3InKU= +github.com/fsnotify/fsnotify v1.5.3 h1:vNFpj2z7YIbwh2bw7x35sqYpp2wfuq+pivKbWG09B8c= +github.com/fsnotify/fsnotify v1.5.3/go.mod h1:T3375wBYaZdLLcVNkcVbzGHY7f1l/uK5T5Ai1i3InKU= github.com/glycerine/go-unsnap-stream v0.0.0-20180323001048-9f0cb55181dd/go.mod h1:/20jfyN9Y5QPEAprSgKAUr+glWDY39ZiUEAYOEv5dsE= github.com/glycerine/go-unsnap-stream v0.0.0-20181221182339-f9677308dec2/go.mod h1:/20jfyN9Y5QPEAprSgKAUr+glWDY39ZiUEAYOEv5dsE= github.com/glycerine/go-unsnap-stream v0.0.0-20190901134440-81cf024a9e0a/go.mod h1:/20jfyN9Y5QPEAprSgKAUr+glWDY39ZiUEAYOEv5dsE= @@ -169,8 +165,6 @@ github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= -github.com/gomodule/redigo v1.8.8 h1:f6cXq6RRfiyrOJEV7p3JhLDlmawGBVBBP1MggY8Mo4E= -github.com/gomodule/redigo v1.8.8/go.mod h1:7ArFNvsTjH8GMMzB4uy1snslv2BwmginuMs06a1uzZE= github.com/google/btree v0.0.0-20180124185431-e89373fe6b4a/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= @@ -247,8 +241,8 @@ github.com/mendsley/gojwk v0.0.0-20141217222730-4d5ec6e58103 h1:Z/i1e+gTZrmcGeZy github.com/mendsley/gojwk v0.0.0-20141217222730-4d5ec6e58103/go.mod h1:o9YPB5aGP8ob35Vy6+vyq3P3bWe7NQWzf+JLiXCiMaE= github.com/minio/sha256-simd v1.0.0 h1:v1ta+49hkWZyvaKwrQB8elexRqm6Y0aMLjCNsrYxo6g= github.com/minio/sha256-simd v1.0.0/go.mod h1:OuYzVNI5vcoYIAmbIvHPl3N3jUzVedXbKy5RFepssQM= -github.com/mitchellh/mapstructure v1.4.3 h1:OVowDSCllw/YjdLkam3/sm7wEtOy59d8ndGgCcyj8cs= -github.com/mitchellh/mapstructure v1.4.3/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= +github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY= +github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= @@ -294,8 +288,8 @@ github.com/prometheus/common v0.9.1/go.mod h1:yhUN8i9wzaXS3w1O07YhxHEBxD+W35wd8b github.com/prometheus/common v0.10.0/go.mod h1:Tlit/dnDKsSWFlCLTWaA1cyBgKHSMdTB80sz/V91rCo= github.com/prometheus/common v0.26.0/go.mod h1:M7rCNAaPfAosfx8veZJCuw84e35h3Cfd9VFqTh1DIvc= github.com/prometheus/common v0.32.1/go.mod h1:vu+V0TpY+O6vW9J44gczi3Ap/oXXR10b+M/gUGO4Hls= -github.com/prometheus/common v0.33.0 h1:rHgav/0a6+uYgGdNt3jwz8FNSesO/Hsang3O0T9A5SE= -github.com/prometheus/common v0.33.0/go.mod h1:gB3sOl7P0TvJabZpLY5uQMpUqRCPPCyRLCZYc7JZTNE= +github.com/prometheus/common v0.34.0 h1:RBmGO9d/FVjqHT0yUGQwBJhkwKV+wPCn7KGpvfab0uE= +github.com/prometheus/common v0.34.0/go.mod h1:gB3sOl7P0TvJabZpLY5uQMpUqRCPPCyRLCZYc7JZTNE= github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/prometheus/procfs v0.0.0-20190117184657-bf6a532e95b1/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= @@ -342,8 +336,6 @@ github.com/willf/bitset v1.1.10/go.mod h1:RjeCKbqT1RxIR/KWY6phxZiaY1IyutSBfGjNPy github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= -github.com/yuin/gopher-lua v0.0.0-20210529063254-f4c35e4016d9 h1:k/gmLsJDWwWqbLCur2yWnJzwQEKRcAHXo6seXGuSwWw= -github.com/yuin/gopher-lua v0.0.0-20210529063254-f4c35e4016d9/go.mod h1:E1AXubJBdNmFERAOucpDIxNzeGfLzg0mYh+UfMWdChA= go.opencensus.io v0.20.1/go.mod h1:6WKK9ahsWS3RSO+PY9ZHZUfv2irvY6gN279GOPZjmmk= go.opencensus.io v0.20.2/go.mod h1:6WKK9ahsWS3RSO+PY9ZHZUfv2irvY6gN279GOPZjmmk= go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= @@ -442,7 +434,6 @@ golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181122145206-62eef0e2fa9b/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20190204203706-41f3e6584952/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -481,8 +472,8 @@ golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220412211240-33da011f77ad h1:ntjMns5wyP/fN65tdBD4g8J5w8n015+iIIs9rtjXkY0= -golang.org/x/sys v0.0.0-20220412211240-33da011f77ad/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220422013727-9388b58f7150 h1:xHms4gcpe1YE7A3yIllJXP16CMAGuqwO2lX1mTyyRRc= +golang.org/x/sys v0.0.0-20220422013727-9388b58f7150/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= diff --git a/middleware/hooks.go b/middleware/hooks.go index 5550fa4..2f0574b 100644 --- a/middleware/hooks.go +++ b/middleware/hooks.go @@ -30,38 +30,45 @@ type swarmInteractionHook struct { store storage.PeerStorage } -func (h *swarmInteractionHook) HandleAnnounce(ctx context.Context, req *bittorrent.AnnounceRequest, _ *bittorrent.AnnounceResponse) (_ context.Context, err error) { +func (h *swarmInteractionHook) HandleAnnounce(ctx context.Context, req *bittorrent.AnnounceRequest, _ *bittorrent.AnnounceResponse) (outCtx context.Context, err error) { + outCtx = ctx if ctx.Value(SkipSwarmInteractionKey) != nil { - return ctx, nil + return } + var storeFn func(bittorrent.InfoHash, bittorrent.Peer) error + switch { case req.Event == bittorrent.Stopped: - err = h.store.DeleteSeeder(req.InfoHash, req.Peer) - if err != nil && !errors.Is(err, storage.ErrResourceDoesNotExist) { - return ctx, err - } + storeFn = func(hash bittorrent.InfoHash, peer bittorrent.Peer) error { + err = h.store.DeleteSeeder(hash, peer) + if err != nil && !errors.Is(err, storage.ErrResourceDoesNotExist) { + return err + } - err = h.store.DeleteLeecher(req.InfoHash, req.Peer) - if err != nil && !errors.Is(err, storage.ErrResourceDoesNotExist) { - return ctx, err + err = h.store.DeleteLeecher(hash, peer) + if err != nil && !errors.Is(err, storage.ErrResourceDoesNotExist) { + return err + } + return nil } case req.Event == bittorrent.Completed: - err = h.store.GraduateLeecher(req.InfoHash, req.Peer) - return ctx, err + storeFn = h.store.GraduateLeecher case req.Left == 0: // Completed events will also have Left == 0, but by making this // an extra case we can treat "old" seeders differently from // graduating leechers. (Calling PutSeeder is probably faster // than calling GraduateLeecher.) - err = h.store.PutSeeder(req.InfoHash, req.Peer) - return ctx, err + storeFn = h.store.PutSeeder default: - err = h.store.PutLeecher(req.InfoHash, req.Peer) - return ctx, err + storeFn = h.store.PutLeecher } - return ctx, nil + if err = storeFn(req.InfoHash, req.Peer); err == nil && len(req.InfoHash) == bittorrent.InfoHashV2Len { + err = storeFn(req.InfoHash.TruncateV1(), req.Peer) + } + + return } func (h *swarmInteractionHook) HandleScrape(ctx context.Context, _ *bittorrent.ScrapeRequest, _ *bittorrent.ScrapeResponse) (context.Context, error) { @@ -97,6 +104,10 @@ func (h *responseHook) HandleAnnounce(ctx context.Context, req *bittorrent.Annou // Add the Scrape data to the response. resp.Incomplete, resp.Complete, _ = h.store.ScrapeSwarm(req.InfoHash, req.Peer) + if len(req.InfoHash) == bittorrent.InfoHashV2Len { + incomplete, complete, _ := h.store.ScrapeSwarm(req.InfoHash.TruncateV1(), req.Peer) + resp.Incomplete, resp.Complete = resp.Incomplete+incomplete, resp.Complete+complete + } err = h.appendPeers(req, resp) return ctx, err @@ -159,14 +170,14 @@ func (h *responseHook) HandleScrape(ctx context.Context, req *bittorrent.ScrapeR } for _, infoHash := range req.InfoHashes { - leechers, seeders, snatched := h.store.ScrapeSwarm(infoHash, req.Peer) + scr := bittorrent.Scrape{InfoHash: infoHash} + scr.Incomplete, scr.Complete, scr.Snatches = h.store.ScrapeSwarm(infoHash, req.Peer) + if len(infoHash) == bittorrent.InfoHashV2Len { + leechers, seeders, snatched := h.store.ScrapeSwarm(infoHash.TruncateV1(), req.Peer) + scr.Incomplete, scr.Complete, scr.Snatches = scr.Incomplete+leechers, scr.Complete+seeders, scr.Snatches+snatched + } - resp.Files = append(resp.Files, bittorrent.Scrape{ - InfoHash: infoHash, - Snatches: snatched, - Complete: seeders, - Incomplete: leechers, - }) + resp.Files = append(resp.Files, scr) } return ctx, nil diff --git a/storage/keydb/storage.go b/storage/keydb/storage.go index a1ad1c3..3df6ce3 100644 --- a/storage/keydb/storage.go +++ b/storage/keydb/storage.go @@ -35,10 +35,10 @@ var ErrNotKeyDB = errors.New("provided instance seems not KeyDB") func init() { // Register the storage driver. - storage.RegisterBuilder(Name, Builder) + storage.RegisterBuilder(Name, builder) } -func Builder(icfg conf.MapConfig) (storage.PeerStorage, error) { +func builder(icfg conf.MapConfig) (storage.PeerStorage, error) { var cfg r.Config var err error @@ -46,10 +46,10 @@ func Builder(icfg conf.MapConfig) (storage.PeerStorage, error) { return nil, err } - return New(cfg) + return newStore(cfg) } -func New(cfg r.Config) (*store, error) { +func newStore(cfg r.Config) (*store, error) { var err error if cfg, err = cfg.Validate(); err != nil { return nil, err @@ -70,7 +70,11 @@ func New(cfg r.Config) (*store, error) { var st *store if err == nil { - st = &store{rs, cfg.LogFields(), uint(cfg.PeerLifetime.Seconds())} + st = &store{ + Connection: rs, + logFields: cfg.LogFields(), + peerTTL: uint(cfg.PeerLifetime.Seconds()), + } } return st, err @@ -82,27 +86,27 @@ type store struct { peerTTL uint } -func (s store) setPeerTTL(infoHashKey, peerId string) error { - return s.Process(context.TODO(), redis.NewCmd(context.TODO(), expireMemberCmd, infoHashKey, peerId, s.peerTTL)) +func (s store) setPeerTTL(infoHashKey, peerID string) error { + return s.Process(context.TODO(), redis.NewCmd(context.TODO(), expireMemberCmd, infoHashKey, peerID, s.peerTTL)) } -func (s store) addPeer(infoHashKey, peerId string) (err error) { +func (s store) addPeer(infoHashKey, peerID string) (err error) { log.Debug("storage: KeyDB: PutPeer", log.Fields{ - "InfoHashKey": infoHashKey, - "PeerId": peerId, + "infoHashKey": infoHashKey, + "peerID": peerID, }) - if err = s.SAdd(context.TODO(), infoHashKey, peerId).Err(); err == nil { - err = s.setPeerTTL(infoHashKey, peerId) + if err = s.SAdd(context.TODO(), infoHashKey, peerID).Err(); err == nil { + err = s.setPeerTTL(infoHashKey, peerID) } return } -func (s store) delPeer(infoHashKey, peerId string) error { +func (s store) delPeer(infoHashKey, peerID string) error { log.Debug("storage: KeyDB: DeletePeer", log.Fields{ - "InfoHashKey": infoHashKey, - "PeerId": peerId, + "infoHashKey": infoHashKey, + "peerID": peerID, }) - deleted, err := s.SRem(context.TODO(), infoHashKey, peerId).Uint64() + deleted, err := s.SRem(context.TODO(), infoHashKey, peerID).Uint64() err = r.AsNil(err) if err == nil && deleted == 0 { err = storage.ErrResourceDoesNotExist @@ -112,34 +116,35 @@ func (s store) delPeer(infoHashKey, peerId string) error { } func (s store) PutSeeder(ih bittorrent.InfoHash, peer bittorrent.Peer) error { - return s.addPeer(r.IHSeederKey+ih.RawString(), peer.RawString()) + return s.addPeer(r.InfoHashKey(ih.RawString(), true, peer.Addr().Is6()), peer.RawString()) } func (s store) DeleteSeeder(ih bittorrent.InfoHash, peer bittorrent.Peer) error { - return s.delPeer(r.IHSeederKey+ih.RawString(), peer.RawString()) + return s.delPeer(r.InfoHashKey(ih.RawString(), true, peer.Addr().Is6()), peer.RawString()) } func (s store) PutLeecher(ih bittorrent.InfoHash, peer bittorrent.Peer) error { - return s.addPeer(r.IHLeecherKey+ih.RawString(), peer.RawString()) + return s.addPeer(r.InfoHashKey(ih.RawString(), false, peer.Addr().Is6()), peer.RawString()) } func (s store) DeleteLeecher(ih bittorrent.InfoHash, peer bittorrent.Peer) error { - return s.delPeer(r.IHLeecherKey+ih.RawString(), peer.RawString()) + return s.delPeer(r.InfoHashKey(ih.RawString(), false, peer.Addr().Is6()), peer.RawString()) } func (s store) GraduateLeecher(ih bittorrent.InfoHash, peer bittorrent.Peer) (err error) { log.Debug("storage: KeyDB: GraduateLeecher", log.Fields{ - "InfoHash": ih, - "Peer": peer, + "infoHash": ih, + "peer": peer, }) - infoHash, peerId := ih.RawString(), peer.RawString() - ihSeederKey, ihLeecherKey := r.IHSeederKey+infoHash, r.IHLeecherKey+infoHash + infoHash, peerID := ih.RawString(), peer.RawString() + ihSeederKey := r.InfoHashKey(infoHash, true, peer.Addr().Is6()) + ihLeecherKey := r.InfoHashKey(infoHash, false, peer.Addr().Is6()) var moved bool - if moved, err = s.SMove(context.TODO(), ihLeecherKey, ihSeederKey, peerId).Result(); err == nil { + if moved, err = s.SMove(context.TODO(), ihLeecherKey, ihSeederKey, peerID).Result(); err == nil { if moved { - err = s.setPeerTTL(ihSeederKey, peerId) + err = s.setPeerTTL(ihSeederKey, peerID) } else { - err = s.addPeer(ihSeederKey, peerId) + err = s.addPeer(ihSeederKey, peerID) } } return err @@ -148,22 +153,22 @@ func (s store) GraduateLeecher(ih bittorrent.InfoHash, peer bittorrent.Peer) (er // AnnouncePeers is the same function as redis.AnnouncePeers func (s store) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant int, peer bittorrent.Peer) ([]bittorrent.Peer, error) { log.Debug("storage: KeyDB: AnnouncePeers", log.Fields{ - "InfoHash": ih, - "Seeder": seeder, - "NumWant": numWant, - "Peer": peer, + "infoHash": ih, + "seeder": seeder, + "numWant": numWant, + "peer": peer, }) - return s.GetPeers(ih, seeder, numWant, peer, func(ctx context.Context, infoHashKey string) *redis.StringSliceCmd { - return s.SRandMemberN(context.TODO(), infoHashKey, int64(numWant)) + return s.GetPeers(ih, seeder, numWant, peer, func(ctx context.Context, infoHashKey string, maxCount int) *redis.StringSliceCmd { + return s.SRandMemberN(context.TODO(), infoHashKey, int64(maxCount)) }) } // ScrapeSwarm is the same function as redis.ScrapeSwarm except `SCard` call instead of `HLen` func (s store) ScrapeSwarm(ih bittorrent.InfoHash, peer bittorrent.Peer) (leechers uint32, seeders uint32, snatched uint32) { log.Debug("storage: KeyDB ScrapeSwarm", log.Fields{ - "InfoHash": ih, - "Peer": peer, + "infoHash": ih, + "peer": peer, }) leechers, seeders = s.CountPeers(ih, s.SCard) return diff --git a/storage/keydb/storage_test.go b/storage/keydb/storage_test.go index 023545e..d363333 100644 --- a/storage/keydb/storage_test.go +++ b/storage/keydb/storage_test.go @@ -21,7 +21,7 @@ var cfg = r.Config{ func createNew() s.PeerStorage { var ps s.PeerStorage var err error - ps, err = New(cfg) + ps, err = newStore(cfg) if err != nil { panic(fmt.Sprint("Unable to create KeyDB connection: ", err, "\nThis driver needs real KeyDB instance")) } diff --git a/storage/memory/storage.go b/storage/memory/storage.go index 0c23054..0e305ca 100644 --- a/storage/memory/storage.go +++ b/storage/memory/storage.go @@ -30,10 +30,10 @@ const ( func init() { // Register the storage driver. - storage.RegisterBuilder(Name, Builder) + storage.RegisterBuilder(Name, builder) } -func Builder(icfg conf.MapConfig) (storage.PeerStorage, error) { +func builder(icfg conf.MapConfig) (storage.PeerStorage, error) { var cfg Config if err := icfg.Unmarshal(&cfg); err != nil { return nil, err @@ -49,8 +49,8 @@ type Config struct { // LogFields renders the current config as a set of Logrus fields. func (cfg Config) LogFields() log.Fields { return log.Fields{ - "Name": Name, - "ShardCount": cfg.ShardCount, + "name": Name, + "shardCount": cfg.ShardCount, } } @@ -64,9 +64,9 @@ func (cfg Config) Validate() Config { if cfg.ShardCount <= 0 || cfg.ShardCount > (math.MaxInt/2) { validcfg.ShardCount = defaultShardCount log.Warn("falling back to default configuration", log.Fields{ - "Name": Name + ".ShardCount", - "Provided": cfg.ShardCount, - "Default": validcfg.ShardCount, + "name": Name + ".ShardCount", + "provided": cfg.ShardCount, + "default": validcfg.ShardCount, }) } diff --git a/storage/redis/storage.go b/storage/redis/storage.go index c14d6d2..a011084 100644 --- a/storage/redis/storage.go +++ b/storage/redis/storage.go @@ -2,7 +2,7 @@ // BitTorrent tracker keeping peer data in redis with hash. // There two categories of hash: // -// - CHI_{L,S}_ (hash type) +// - CHI_{L,S}{4,6}_ (hash type) // To save peers that hold the infohash, used for fast searching, // deleting, and timeout handling // @@ -50,10 +50,14 @@ const ( PrefixKey = "CHI_" // IHKey redis hash key for all info hashes IHKey = "CHI_I" - // IHSeederKey redis hash key prefix for seeders - IHSeederKey = "CHI_S_" - // IHLeecherKey redis hash key prefix for leechers - IHLeecherKey = "CHI_L_" + // IH4SeederKey redis hash key prefix for IPv4 seeders + IH4SeederKey = "CHI_S4_" + // IH6SeederKey redis hash key prefix for IPv6 seeders + IH6SeederKey = "CHI_S6_" + // IH4LeecherKey redis hash key prefix for IPv4 leechers + IH4LeecherKey = "CHI_L4_" + // IH6LeecherKey redis hash key prefix for IPv6 leechers + IH6LeecherKey = "CHI_L6_" // CountSeederKey redis key for seeder count CountSeederKey = "CHI_C_S" // CountLeecherKey redis key for leecher count @@ -65,10 +69,10 @@ var ErrSentinelAndClusterChecked = errors.New("unable to use both cluster and se func init() { // Register the storage builder. - storage.RegisterBuilder(Name, Builder) + storage.RegisterBuilder(Name, builder) } -func Builder(icfg conf.MapConfig) (storage.PeerStorage, error) { +func builder(icfg conf.MapConfig) (storage.PeerStorage, error) { // Unmarshal the bytes into the proper config type. var cfg Config @@ -76,10 +80,10 @@ func Builder(icfg conf.MapConfig) (storage.PeerStorage, error) { return nil, err } - return New(cfg) + return newStore(cfg) } -func New(cfg Config) (*store, error) { +func newStore(cfg Config) (*store, error) { cfg, err := cfg.Validate() if err != nil { return nil, err @@ -117,17 +121,17 @@ type Config struct { // LogFields renders the current config as a set of Logrus fields. func (cfg Config) LogFields() log.Fields { return log.Fields{ - "Name": Name, - "PeerLifetime": cfg.PeerLifetime, - "Addresses": cfg.Addresses, - "DB": cfg.DB, - "PoolSize": cfg.PoolSize, - "Sentinel": cfg.Sentinel, - "SentinelMaster": cfg.SentinelMaster, - "Cluster": cfg.Cluster, - "ReadTimeout": cfg.ReadTimeout, - "WriteTimeout": cfg.WriteTimeout, - "ConnectTimeout": cfg.ConnectTimeout, + "name": Name, + "peerLifetime": cfg.PeerLifetime, + "addresses": cfg.Addresses, + "db": cfg.DB, + "poolSize": cfg.PoolSize, + "sentinel": cfg.Sentinel, + "sentinelMaster": cfg.SentinelMaster, + "cluster": cfg.Cluster, + "readTimeout": cfg.ReadTimeout, + "writeTimeout": cfg.WriteTimeout, + "connectTimeout": cfg.ConnectTimeout, } } @@ -154,42 +158,43 @@ func (cfg Config) Validate() (Config, error) { if len(cfg.Addresses) == 0 { validCfg.Addresses = []string{defaultRedisAddress} log.Warn("falling back to default configuration", log.Fields{ - "Name": Name + ".Addresses", - "Provided": cfg.Addresses, - "Default": validCfg.Addresses, + "name": Name + ".Addresses", + "provided": cfg.Addresses, + "default": validCfg.Addresses, }) } if cfg.ReadTimeout <= 0 { validCfg.ReadTimeout = defaultReadTimeout log.Warn("falling back to default configuration", log.Fields{ - "Name": Name + ".ReadTimeout", - "Provided": cfg.ReadTimeout, - "Default": validCfg.ReadTimeout, + "name": Name + ".ReadTimeout", + "provided": cfg.ReadTimeout, + "default": validCfg.ReadTimeout, }) } if cfg.WriteTimeout <= 0 { validCfg.WriteTimeout = defaultWriteTimeout log.Warn("falling back to default configuration", log.Fields{ - "Name": Name + ".WriteTimeout", - "Provided": cfg.WriteTimeout, - "Default": validCfg.WriteTimeout, + "name": Name + ".WriteTimeout", + "provided": cfg.WriteTimeout, + "default": validCfg.WriteTimeout, }) } if cfg.ConnectTimeout <= 0 { validCfg.ConnectTimeout = defaultConnectTimeout log.Warn("falling back to default configuration", log.Fields{ - "Name": Name + ".ConnectTimeout", - "Provided": cfg.ConnectTimeout, - "Default": validCfg.ConnectTimeout, + "name": Name + ".ConnectTimeout", + "provided": cfg.ConnectTimeout, + "default": validCfg.ConnectTimeout, }) } return validCfg, nil } +// Connect creates redis client from configuration func (cfg Config) Connect() (con Connection, err error) { var rs redis.UniversalClient switch { @@ -287,6 +292,7 @@ func (ps *store) ScheduleStatisticsCollection(reportInterval time.Duration) { }() } +// Connection is wrapper for redis.UniversalClient type Connection struct { redis.UniversalClient } @@ -309,7 +315,7 @@ func (ps *store) count(key string, getLength bool) (n uint64) { if err != nil { log.Error("storage: Redis: GET/SCARD failure", log.Fields{ "key": key, - "Error": err, + "error": err, }) } return @@ -336,6 +342,8 @@ func (ps *store) tx(txf func(tx redis.Pipeliner) error) (err error) { return } +// AsNil returns nil if provided err is redis.Nil +// otherwise returns err func AsNil(err error) error { if err == nil || errors.Is(err, redis.Nil) { return nil @@ -343,14 +351,37 @@ func AsNil(err error) error { return err } -func (ps *store) putPeer(infoHashKey, peerCountKey, peerId string) error { +// InfoHashKey generates redis key for provided hash and flags +func InfoHashKey(infoHash string, seeder, v6 bool) (infoHashKey string) { + var bm int + if seeder { + bm = 0b01 + } + if v6 { + bm |= 0b10 + } + switch bm { + case 0b11: + infoHashKey = IH6SeederKey + case 0b10: + infoHashKey = IH6LeecherKey + case 0b01: + infoHashKey = IH4SeederKey + case 0b00: + infoHashKey = IH4LeecherKey + } + infoHashKey += infoHash + return +} + +func (ps *store) putPeer(infoHashKey, peerCountKey, peerID string) error { log.Debug("storage: Redis: PutPeer", log.Fields{ - "InfoHashKey": infoHashKey, - "PeerCountKey": peerCountKey, - "PeerId": peerId, + "infoHashKey": infoHashKey, + "peerCountKey": peerCountKey, + "peerID": peerID, }) return ps.tx(func(tx redis.Pipeliner) (err error) { - if err = tx.HSet(context.TODO(), infoHashKey, peerId, ps.getClock()).Err(); err != nil { + if err = tx.HSet(context.TODO(), infoHashKey, peerID, ps.getClock()).Err(); err != nil { return } if err = tx.Incr(context.TODO(), peerCountKey).Err(); err != nil { @@ -361,13 +392,13 @@ func (ps *store) putPeer(infoHashKey, peerCountKey, peerId string) error { }) } -func (ps *store) delPeer(infoHashKey, peerCountKey, peerId string) error { +func (ps *store) delPeer(infoHashKey, peerCountKey, peerID string) error { log.Debug("storage: Redis: DeletePeer", log.Fields{ - "InfoHashKey": infoHashKey, - "PeerCountKey": peerCountKey, - "PeerId": peerId, + "infoHashKey": infoHashKey, + "peerCountKey": peerCountKey, + "peerID": peerID, }) - deleted, err := ps.HDel(context.TODO(), infoHashKey, peerId).Uint64() + deleted, err := ps.HDel(context.TODO(), infoHashKey, peerID).Uint64() err = AsNil(err) if err == nil { if deleted == 0 { @@ -381,32 +412,32 @@ func (ps *store) delPeer(infoHashKey, peerCountKey, peerId string) error { } func (ps *store) PutSeeder(ih bittorrent.InfoHash, peer bittorrent.Peer) error { - return ps.putPeer(IHSeederKey+ih.RawString(), CountSeederKey, peer.RawString()) + return ps.putPeer(InfoHashKey(ih.RawString(), true, peer.Addr().Is6()), CountSeederKey, peer.RawString()) } func (ps *store) DeleteSeeder(ih bittorrent.InfoHash, peer bittorrent.Peer) error { - return ps.delPeer(IHSeederKey+ih.RawString(), CountSeederKey, peer.RawString()) + return ps.delPeer(InfoHashKey(ih.RawString(), true, peer.Addr().Is6()), CountSeederKey, peer.RawString()) } func (ps *store) PutLeecher(ih bittorrent.InfoHash, peer bittorrent.Peer) error { - return ps.putPeer(IHLeecherKey+ih.RawString(), CountLeecherKey, peer.RawString()) + return ps.putPeer(InfoHashKey(ih.RawString(), false, peer.Addr().Is6()), CountLeecherKey, peer.RawString()) } func (ps *store) DeleteLeecher(ih bittorrent.InfoHash, peer bittorrent.Peer) error { - return ps.delPeer(IHLeecherKey+ih.RawString(), CountLeecherKey, peer.RawString()) + return ps.delPeer(InfoHashKey(ih.RawString(), false, peer.Addr().Is6()), CountLeecherKey, peer.RawString()) } func (ps *store) GraduateLeecher(ih bittorrent.InfoHash, peer bittorrent.Peer) error { log.Debug("storage: Redis: GraduateLeecher", log.Fields{ - "InfoHash": ih, - "Peer": peer, + "infoHash": ih, + "peer": peer, }) - infoHash, peerId := ih.RawString(), peer.RawString() - ihSeederKey, ihLeecherKey := IHSeederKey+infoHash, IHLeecherKey+infoHash + infoHash, peerID, isV6 := ih.RawString(), peer.RawString(), peer.Addr().Is6() + ihSeederKey, ihLeecherKey := InfoHashKey(infoHash, true, isV6), InfoHashKey(infoHash, false, isV6) return ps.tx(func(tx redis.Pipeliner) error { - deleted, err := tx.HDel(context.TODO(), ihLeecherKey, peerId).Uint64() + deleted, err := tx.HDel(context.TODO(), ihLeecherKey, peerID).Uint64() err = AsNil(err) if err == nil { if deleted > 0 { @@ -414,7 +445,7 @@ func (ps *store) GraduateLeecher(ih bittorrent.InfoHash, peer bittorrent.Peer) e } } if err == nil { - err = tx.HSet(context.TODO(), ihSeederKey, peerId, ps.getClock()).Err() + err = tx.HSet(context.TODO(), ihSeederKey, peerID, ps.getClock()).Err() } if err == nil { err = tx.Incr(context.TODO(), CountSeederKey).Err() @@ -426,19 +457,16 @@ func (ps *store) GraduateLeecher(ih bittorrent.InfoHash, peer bittorrent.Peer) e }) } -func (ps Connection) parsePeersList(peersResult *redis.StringSliceCmd, skipPeerId string, v4Only bool) (peers []bittorrent.Peer, err error) { +func (ps Connection) parsePeersList(peersResult *redis.StringSliceCmd, skipPeerID string) (peers []bittorrent.Peer, err error) { var peerIds []string peerIds, err = peersResult.Result() if err = AsNil(err); err == nil { - for _, peerId := range peerIds { - if peerId != skipPeerId { - if p, err := bittorrent.NewPeer(peerId); err == nil { - // If peer from request is V4 only, it won't receive V6 peers from DB - if !(v4Only && p.Addr().Is6()) { - peers = append(peers, p) - } + for _, peerID := range peerIds { + if peerID != skipPeerID { + if p, err := bittorrent.NewPeer(peerID); err == nil { + peers = append(peers, p) } else { - log.Error("storage: Redis: unable to decode leecher", log.Fields{"PeerId": peerId}) + log.Error("storage: Redis: unable to decode leecher", log.Fields{"peerID": peerID}) } } } @@ -446,31 +474,52 @@ func (ps Connection) parsePeersList(peersResult *redis.StringSliceCmd, skipPeerI return } -func (ps Connection) GetPeers(ih bittorrent.InfoHash, forSeeder bool, maxCount int, peer bittorrent.Peer, - membersFn func(context.Context, string) *redis.StringSliceCmd) (peers []bittorrent.Peer, err error) { +type getPeersFn func(context.Context, string, int) *redis.StringSliceCmd - infoHash, peerId, isV4 := ih.RawString(), peer.RawString(), peer.Addr().Is4() +// GetPeers retrieves peers for provided info hash by calling membersFn and +// converts result to bittorrent.Peer array. +// If forSeeder set to true - returns only leechers, if false - +// seeders and if maxCount not reached - leechers. +func (ps Connection) GetPeers(ih bittorrent.InfoHash, forSeeder bool, maxCount int, peer bittorrent.Peer, membersFn getPeersFn) (out []bittorrent.Peer, err error) { + infoHash, peerID, isV6 := ih.RawString(), peer.RawString(), peer.Addr().Is6() + + var infoHashKeys []string if forSeeder { - peers, err = ps.parsePeersList(membersFn(context.TODO(), IHLeecherKey+infoHash), peerId, isV4) + infoHashKeys = append(infoHashKeys, InfoHashKey(infoHash, false, isV6), + InfoHashKey(infoHash, false, !isV6)) } else { - // Append as many seeders as possible. - peers, err = ps.parsePeersList(membersFn(context.TODO(), IHSeederKey+infoHash), peerId, isV4) - if err != nil { - return - } + // Append as many peers as possible. + // Priority: + // same ip family seeders > same ip family leechers > + // foreign ip family seeders > foreign ip family leechers + infoHashKeys = append(infoHashKeys, + InfoHashKey(infoHash, true, isV6), + InfoHashKey(infoHash, false, isV6), + InfoHashKey(infoHash, true, !isV6), + InfoHashKey(infoHash, false, !isV6)) + } - if maxCount -= len(peers); maxCount > 0 { - if leechers, err := ps.parsePeersList(membersFn(context.TODO(), IHLeecherKey+infoHash), peerId, isV4); err == nil { - peers = append(peers, leechers...) - } else { - log.Warn("storage: Redis: error occurred while receiving leechers", log.Fields{"InfoHash": ih, "Error": err}) - } + for _, infoHashKey := range infoHashKeys { + var peers []bittorrent.Peer + peers, err = ps.parsePeersList(membersFn(context.TODO(), infoHashKey, maxCount), peerID) + maxCount -= len(peers) + out = append(out, peers...) + if err != nil || maxCount <= 0 { + break } + } - if len(peers) == 0 && !isV4 { + if l := len(out); err == nil { + if l == 0 { err = storage.ErrResourceDoesNotExist } + } else if l > 0 { + err = nil + log.Warn("storage: Redis: error occurred while retrieving peers", log.Fields{ + "infoHash": infoHash, + "error": err, + }) } return @@ -478,50 +527,47 @@ func (ps Connection) GetPeers(ih bittorrent.InfoHash, forSeeder bool, maxCount i func (ps *store) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant int, peer bittorrent.Peer) ([]bittorrent.Peer, error) { log.Debug("storage: Redis: AnnouncePeers", log.Fields{ - "InfoHash": ih, - "Seeder": seeder, - "NumWant": numWant, - "Peer": peer, + "infoHash": ih, + "seeder": seeder, + "numWant": numWant, + "peer": peer, }) - return ps.GetPeers(ih, seeder, numWant, peer, func(ctx context.Context, infoHashKey string) *redis.StringSliceCmd { - return ps.HRandField(ctx, infoHashKey, numWant, false) + return ps.GetPeers(ih, seeder, numWant, peer, func(ctx context.Context, infoHashKey string, maxCount int) *redis.StringSliceCmd { + return ps.HRandField(ctx, infoHashKey, maxCount, false) }) } -func (ps Connection) CountPeers(ih bittorrent.InfoHash, countFn func(context.Context, string) *redis.IntCmd) (leechersCount, seedersCount uint32) { +type getPeerCountFn func(context.Context, string) *redis.IntCmd + +func (ps Connection) countPeers(infoHashKey string, countFn getPeerCountFn) uint32 { + count, err := countFn(context.TODO(), infoHashKey).Result() + err = AsNil(err) + if err != nil { + log.Error("storage: Redis: key size calculation failure", log.Fields{ + "infoHashKey": infoHashKey, + "error": err, + }) + } + return uint32(count) +} + +// CountPeers calls provided countFn and returns seeders and leechers count for specified info hash +func (ps Connection) CountPeers(ih bittorrent.InfoHash, countFn getPeerCountFn) (leechersCount, seedersCount uint32) { infoHash := ih.RawString() - ihSeederKey, ihLeecherKey := IHSeederKey+infoHash, IHLeecherKey+infoHash - count, err := countFn(context.TODO(), ihLeecherKey).Result() - err = AsNil(err) - if err != nil { - log.Error("storage: Redis: key size calculation failure", log.Fields{ - "InfoHashKey": ihLeecherKey, - "Error": err, - }) - return - } - leechersCount = uint32(count) - - count, err = countFn(context.TODO(), ihSeederKey).Result() - err = AsNil(err) - if err != nil { - log.Error("storage: Redis: key size calculation failure", log.Fields{ - "InfoHashKey": ihSeederKey, - "Error": err, - }) - return - } - seedersCount = uint32(count) + leechersCount = ps.countPeers(InfoHashKey(infoHash, false, false), countFn) + + ps.countPeers(InfoHashKey(infoHash, false, true), countFn) + seedersCount = ps.countPeers(InfoHashKey(infoHash, true, false), countFn) + + ps.countPeers(InfoHashKey(infoHash, true, true), countFn) return } -func (ps *store) ScrapeSwarm(ih bittorrent.InfoHash, peer bittorrent.Peer) (leechers uint32, seeders uint32, snatched uint32) { +func (ps *store) ScrapeSwarm(ih bittorrent.InfoHash, peer bittorrent.Peer) (leechers uint32, seeders uint32, _ uint32) { log.Debug("storage: Redis ScrapeSwarm", log.Fields{ - "InfoHash": ih, - "Peer": peer, + "infoHash": ih, + "peer": peer, }) leechers, seeders = ps.CountPeers(ih, ps.HLen) @@ -531,6 +577,7 @@ func (ps *store) ScrapeSwarm(ih bittorrent.InfoHash, peer bittorrent.Peer) (leec const argNumErrorMsg = "ERR wrong number of arguments" +// Put - storage.DataStorage implementation func (ps Connection) Put(ctx string, values ...storage.Entry) (err error) { if l := len(values); l > 0 { if l == 1 { @@ -556,11 +603,13 @@ func (ps Connection) Put(ctx string, values ...storage.Entry) (err error) { return } +// Contains - storage.DataStorage implementation func (ps Connection) Contains(ctx string, key string) (bool, error) { exist, err := ps.HExists(context.TODO(), PrefixKey+ctx, key).Result() return exist, AsNil(err) } +// Load - storage.DataStorage implementation func (ps Connection) Load(ctx string, key string) (v any, err error) { v, err = ps.HGet(context.TODO(), PrefixKey+ctx, key).Result() if err != nil && errors.Is(err, redis.Nil) { @@ -569,6 +618,7 @@ func (ps Connection) Load(ctx string, key string) (v any, err error) { return } +// Delete - storage.DataStorage implementation func (ps Connection) Delete(ctx string, keys ...string) (err error) { if len(keys) > 0 { err = AsNil(ps.HDel(context.TODO(), PrefixKey+ctx, keys...).Err()) @@ -586,6 +636,7 @@ func (ps Connection) Delete(ctx string, keys ...string) (err error) { return } +// Preservable - storage.DataStorage implementation func (Connection) Preservable() bool { return true } @@ -645,12 +696,12 @@ func (ps *store) gc(cutoff time.Time) { for _, infoHashKey := range infoHashKeys { var cntKey string var seeder bool - if seeder = strings.HasPrefix(infoHashKey, IHSeederKey); seeder { + if seeder = strings.HasPrefix(infoHashKey, IH4SeederKey) || strings.HasPrefix(infoHashKey, IH6SeederKey); seeder { cntKey = CountSeederKey - } else if strings.HasPrefix(infoHashKey, IHLeecherKey) { + } else if strings.HasPrefix(infoHashKey, IH4LeecherKey) || strings.HasPrefix(infoHashKey, IH6LeecherKey) { cntKey = CountLeecherKey } else { - log.Warn("storage: Redis: unexpected record found in info hash set", log.Fields{"InfoHashKey": infoHashKey}) + log.Warn("storage: Redis: unexpected record found in info hash set", log.Fields{"infoHashKey": infoHashKey}) continue } // list all (peer, timeout) pairs for the ih @@ -658,18 +709,18 @@ func (ps *store) gc(cutoff time.Time) { err = AsNil(err) if err == nil { peersToRemove := make([]string, 0) - for peerId, timeStamp := range peerList { + for peerID, timeStamp := range peerList { if mtime, err := strconv.ParseInt(timeStamp, 10, 64); err == nil { if mtime <= cutoffNanos { - log.Debug("storage: Redis: adding peer to remove list", log.Fields{"PeerId": peerId}) - peersToRemove = append(peersToRemove, peerId) + log.Debug("storage: Redis: adding peer to remove list", log.Fields{"peerID": peerID}) + peersToRemove = append(peersToRemove, peerID) } } else { log.Error("storage: Redis: unable to decode peer timestamp", log.Fields{ - "InfoHashKey": infoHashKey, - "PeerId": peerId, - "Timestamp": timeStamp, - "Error": err, + "infoHashKey": infoHashKey, + "peerID": peerID, + "timestamp": timeStamp, + "error": err, }) } } @@ -684,9 +735,9 @@ func (ps *store) gc(cutoff time.Time) { err = AsNil(err) if err != nil { log.Error("storage: Redis: unable to delete peer", log.Fields{ - "InfoHashKey": infoHashKey, - "PeerId": k, - "Error": err, + "infoHashKey": infoHashKey, + "peerID": k, + "error": err, }) } else { removedPeerCount += count @@ -694,18 +745,18 @@ func (ps *store) gc(cutoff time.Time) { } } else { log.Error("storage: Redis: unable to delete peers", log.Fields{ - "InfoHashKey": infoHashKey, - "PeerIds": peersToRemove, - "Error": err, + "infoHashKey": infoHashKey, + "peerIds": peersToRemove, + "error": err, }) } } if removedPeerCount > 0 { // DECR seeder/leecher counter if err = ps.DecrBy(context.Background(), cntKey, removedPeerCount).Err(); err != nil { log.Error("storage: Redis: unable to decrement seeder/leecher peer count", log.Fields{ - "InfoHashKey": infoHashKey, - "CountKey": cntKey, - "Error": err, + "infoHashKey": infoHashKey, + "countKey": cntKey, + "error": err, }) } } @@ -725,19 +776,19 @@ func (ps *store) gc(cutoff time.Time) { }, infoHashKey)) if err != nil { log.Error("storage: Redis: unable to clean info hash records", log.Fields{ - "InfoHashKey": infoHashKey, - "Error": err, + "infoHashKey": infoHashKey, + "error": err, }) } } else { log.Error("storage: Redis: unable to fetch info hash peers", log.Fields{ - "InfoHashKey": infoHashKey, - "Error": err, + "infoHashKey": infoHashKey, + "error": err, }) } } } else { - log.Error("storage: Redis: unable to fetch info hash set", log.Fields{"HashSet": IHKey, "Error": err}) + log.Error("storage: Redis: unable to fetch info hash set", log.Fields{"hashSet": IHKey, "error": err}) } } diff --git a/storage/redis/storage_test.go b/storage/redis/storage_test.go index 1561a78..3e305d4 100644 --- a/storage/redis/storage_test.go +++ b/storage/redis/storage_test.go @@ -5,8 +5,6 @@ import ( "testing" "time" - "github.com/alicebob/miniredis" - s "github.com/sot-tech/mochi/storage" "github.com/sot-tech/mochi/storage/test" ) @@ -22,19 +20,9 @@ var cfg = Config{ func createNew() s.PeerStorage { var ps s.PeerStorage var err error - ps, err = New(cfg) + ps, err = newStore(cfg) if err != nil { - fmt.Println("unable to create real redis connection: ", err, " using simulator") - var rs *miniredis.Miniredis - rs, err = miniredis.Run() - if err != nil { - panic(err) - } - cfg.Addresses = []string{rs.Addr()} - ps, err = New(cfg) - } - if err != nil { - panic(err) + panic(fmt.Sprint("Unable to create KeyDB connection: ", err, "\nThis driver needs real Redis instance")) } return ps } diff --git a/storage/storage.go b/storage/storage.go index 6c1801d..711af6c 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -39,9 +39,9 @@ func (c Config) sanitizeGCConfig() (gcInterval, peerTTL time.Duration) { if c.GarbageCollectionInterval <= 0 { gcInterval = defaultGarbageCollectionInterval log.Warn("falling back to default configuration", log.Fields{ - "Name": "GarbageCollectionInterval", - "Provided": c.GarbageCollectionInterval, - "Default": defaultGarbageCollectionInterval, + "name": "GarbageCollectionInterval", + "provided": c.GarbageCollectionInterval, + "default": defaultGarbageCollectionInterval, }) } else { gcInterval = c.GarbageCollectionInterval @@ -49,9 +49,9 @@ func (c Config) sanitizeGCConfig() (gcInterval, peerTTL time.Duration) { if c.PeerLifetime <= 0 { peerTTL = defaultPeerLifetime log.Warn("falling back to default configuration", log.Fields{ - "Name": "PeerLifetime", - "Provided": c.PeerLifetime, - "Default": defaultPeerLifetime, + "name": "PeerLifetime", + "provided": c.PeerLifetime, + "default": defaultPeerLifetime, }) } else { peerTTL = c.PeerLifetime @@ -63,9 +63,9 @@ func (c Config) sanitizeStatisticsConfig() (statInterval time.Duration) { if c.PrometheusReportingInterval < 0 { statInterval = defaultPrometheusReportingInterval log.Warn("falling back to default configuration", log.Fields{ - "Name": "PrometheusReportingInterval", - "Provided": c.PrometheusReportingInterval, - "Default": defaultPrometheusReportingInterval, + "name": "PrometheusReportingInterval", + "provided": c.PrometheusReportingInterval, + "default": defaultPrometheusReportingInterval, }) } return