diff --git a/bittorrent/bittorrent.go b/bittorrent/bittorrent.go index 0b8c744..0be552d 100644 --- a/bittorrent/bittorrent.go +++ b/bittorrent/bittorrent.go @@ -6,6 +6,7 @@ package bittorrent import ( "crypto/sha1" "crypto/sha256" + "encoding/binary" "encoding/hex" "fmt" "net" @@ -23,7 +24,7 @@ const PeerIDLen = 20 type PeerID [PeerIDLen]byte // ErrInvalidPeerIDSize holds error about invalid PeerID size -var ErrInvalidPeerIDSize = errors.New("peer ID must be 20 bytes") +var ErrInvalidPeerIDSize = fmt.Errorf("peer ID must be %d bytes", PeerIDLen) // NewPeerID creates a PeerID from a byte slice. // @@ -63,7 +64,7 @@ var ( // ErrInvalidHashType holds error about invalid InfoHash input type ErrInvalidHashType = errors.New("info hash must be provided as byte slice or raw/hex string") // ErrInvalidHashSize holds error about invalid InfoHash size - ErrInvalidHashSize = errors.New("info hash must be either 20 (for torrent V1) or 32 (V2) bytes") + ErrInvalidHashSize = fmt.Errorf("info hash must be either %d (for torrent V1) or %d (V2) bytes", InfoHashV1Len, InfoHashV2Len) ) // TruncateV1 returns truncated to 20-bytes length array of the corresponding InfoHash. @@ -77,12 +78,12 @@ func (i InfoHash) TruncateV1() InfoHash { } // NewInfoHash creates an InfoHash from a byte slice or raw/hex string. 
-func NewInfoHash(b any) (InfoHash, error) { - if b == nil { +func NewInfoHash(data any) (InfoHash, error) { + if data == nil { return NoneInfoHash, ErrInvalidHashType } var ba []byte - switch t := b.(type) { + switch t := data.(type) { case [InfoHashV1Len]byte: ba = t[:] case [InfoHashV2Len]byte: @@ -225,7 +226,7 @@ func (af AddressFamily) String() string { case IPv6: return "IPv6" default: - panic("tried to print unknown AddressFamily") + return "" } } @@ -253,6 +254,44 @@ type Peer struct { Port uint16 } +// PeerMinimumLen is the least allowed length of string serialized Peer +const PeerMinimumLen = PeerIDLen + 2 + net.IPv4len + +var ( + // ErrInvalidAddressFamily holds error about invalid address family + ErrInvalidAddressFamily = fmt.Errorf("address family must be %d(IPv4) or %d(IPv6)", IPv4, IPv6) + + // ErrInvalidPeerDataSize holds error about invalid Peer data size + ErrInvalidPeerDataSize = fmt.Errorf("invalid peer data it must be at least %d bytes (PeerID + Port + IPv4)", PeerMinimumLen) +) + +// NewPeer constructs Peer from serialized by Peer.RawString data: PeerID[20by]Port[2by]net.IP[4/16by] +func NewPeer(data string) (Peer, error) { + var peer Peer + if len(data) < PeerMinimumLen { + return peer, ErrInvalidPeerDataSize + } + peerID, err := NewPeerID([]byte(data[:PeerIDLen])) + if err == nil { + peer = Peer{ + ID: peerID, + Port: binary.BigEndian.Uint16([]byte(data[PeerIDLen : PeerIDLen+2])), + IP: IP{IP: net.IP(data[PeerIDLen+2:])}, + } + + if ip := peer.IP.To4(); ip != nil { + peer.IP.IP = ip + peer.IP.AddressFamily = IPv4 + } else if len(peer.IP.IP) == net.IPv6len { // implies peer.IP.To4() == nil + peer.IP.AddressFamily = IPv6 + } else { + err = ErrInvalidAddressFamily + } + } + + return peer, err +} + // String implements fmt.Stringer to return a human-readable representation.
// The string will have the format @[]:, for example // "0102030405060708090a0b0c0d0e0f1011121314@[10.11.12.13]:1234" @@ -260,6 +299,15 @@ func (p Peer) String() string { return fmt.Sprintf("%s@[%s]:%d", p.ID.String(), p.IP.String(), p.Port) } +// RawString generates concatenation of PeerID, net port and IP-address +func (p Peer) RawString() string { + b := make([]byte, PeerIDLen+2+len(p.IP.IP)) + copy(b[:PeerIDLen], p.ID[:]) + binary.BigEndian.PutUint16(b[PeerIDLen:PeerIDLen+2], p.Port) + copy(b[PeerIDLen+2:], p.IP.IP) + return string(b) +} + // LogFields renders the current peer as a set of Logrus fields. func (p Peer) LogFields() log.Fields { return log.Fields{ diff --git a/bittorrent/event.go b/bittorrent/event.go index 2c68ce2..87cd772 100644 --- a/bittorrent/event.go +++ b/bittorrent/event.go @@ -55,10 +55,11 @@ func NewEvent(eventStr string) (Event, error) { } // String implements Stringer for an event. -func (e Event) String() string { +func (e Event) String() (s string) { if name, ok := eventToString[e]; ok { - return name + s = name + } else { + s = "" } - - panic("bittorrent: event has no associated name") + return } diff --git a/docs/frontend.md b/docs/frontend.md index 1816ac7..3dd2f31 100644 --- a/docs/frontend.md +++ b/docs/frontend.md @@ -68,7 +68,7 @@ serves multiple transports or networks, metrics for them should be separable. It is recommended to publish one Prometheus `HistogramVec` with: -- A name like `chihaya_PROTOCOL_response_duration_milliseconds` +- A name like `mochi_PROTOCOL_response_duration_milliseconds` - A value holding the duration in milliseconds of the reported request - Labels for: - `action` (= `announce`, `scrape`, ...) 
diff --git a/frontend/http/prometheus.go b/frontend/http/prometheus.go index d51d6bc..63f727b 100644 --- a/frontend/http/prometheus.go +++ b/frontend/http/prometheus.go @@ -15,7 +15,7 @@ func init() { var promResponseDurationMilliseconds = prometheus.NewHistogramVec( prometheus.HistogramOpts{ - Name: "chihaya_http_response_duration_milliseconds", + Name: "mochi_http_response_duration_milliseconds", Help: "The duration of time it takes to receive and write a response to an API request", Buckets: prometheus.ExponentialBuckets(9.375, 2, 10), }, diff --git a/frontend/http/writer.go b/frontend/http/writer.go index d91bfdd..3d44894 100644 --- a/frontend/http/writer.go +++ b/frontend/http/writer.go @@ -103,8 +103,7 @@ func compact4(peer bittorrent.Peer) (buf []byte) { } else { buf = ip } - buf = append(buf, byte(peer.Port>>8)) - buf = append(buf, byte(peer.Port&0xff)) + buf = append(buf, byte(peer.Port>>8), byte(peer.Port)) return } @@ -114,8 +113,7 @@ func compact6(peer bittorrent.Peer) (buf []byte) { } else { buf = ip } - buf = append(buf, byte(peer.Port>>8)) - buf = append(buf, byte(peer.Port&0xff)) + buf = append(buf, byte(peer.Port>>8), byte(peer.Port)) return } diff --git a/frontend/udp/prometheus.go b/frontend/udp/prometheus.go index c97bedf..34e5367 100644 --- a/frontend/udp/prometheus.go +++ b/frontend/udp/prometheus.go @@ -15,7 +15,7 @@ func init() { var promResponseDurationMilliseconds = prometheus.NewHistogramVec( prometheus.HistogramOpts{ - Name: "chihaya_udp_response_duration_milliseconds", + Name: "mochi_udp_response_duration_milliseconds", Help: "The duration of time it takes to receive and write a response to an API request", Buckets: prometheus.ExponentialBuckets(9.375, 2, 10), }, diff --git a/go.mod b/go.mod index 32b850e..c1530b7 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( github.com/SermoDigital/jose v0.9.2-0.20180104203859-803625baeddc github.com/alicebob/miniredis v2.5.0+incompatible github.com/anacrolix/torrent v1.42.0 + 
github.com/go-redis/redis/v8 v8.11.5 github.com/julienschmidt/httprouter v1.3.0 github.com/mendsley/gojwk v0.0.0-20141217222730-4d5ec6e58103 github.com/minio/sha256-simd v1.0.0 @@ -13,7 +14,6 @@ require ( github.com/prometheus/client_golang v1.12.1 github.com/sirupsen/logrus v1.8.1 github.com/spf13/cobra v1.4.0 - github.com/go-redis/redis/v8 v8.11.5 github.com/stretchr/testify v1.7.1 gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b ) @@ -31,8 +31,7 @@ require ( github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/fsnotify/fsnotify v1.5.1 // indirect github.com/golang/protobuf v1.5.2 // indirect - github.com/hashicorp/errwrap v1.1.0 // indirect - github.com/hashicorp/go-multierror v1.1.1 // indirect + github.com/gomodule/redigo v1.8.8 // indirect github.com/huandu/xstrings v1.3.2 // indirect github.com/inconshreveable/mousetrap v1.0.0 // indirect github.com/klauspost/cpuid/v2 v2.0.12 // indirect @@ -43,6 +42,6 @@ require ( github.com/prometheus/procfs v0.7.3 // indirect github.com/spf13/pflag v1.0.5 // indirect github.com/yuin/gopher-lua v0.0.0-20210529063254-f4c35e4016d9 // indirect - golang.org/x/sys v0.0.0-20220412071739-889880a91fd5 // indirect + golang.org/x/sys v0.0.0-20220412211240-33da011f77ad // indirect google.golang.org/protobuf v1.28.0 // indirect ) diff --git a/go.sum b/go.sum index 3acedb2..e7171ce 100644 --- a/go.sum +++ b/go.sum @@ -115,7 +115,6 @@ github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1m github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/frankban/quicktest v1.14.0 h1:+cqqvzZV87b4adx/5ayVOaYZ2CrvM4ejQvUdBzPPUss= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= -github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= github.com/fsnotify/fsnotify v1.5.1 h1:mZcQUHVQUQWoPXXtuf9yuEXKudkV2sx1E06UadKWpgI= github.com/fsnotify/fsnotify 
v1.5.1/go.mod h1:T3375wBYaZdLLcVNkcVbzGHY7f1l/uK5T5Ai1i3InKU= github.com/glycerine/go-unsnap-stream v0.0.0-20180323001048-9f0cb55181dd/go.mod h1:/20jfyN9Y5QPEAprSgKAUr+glWDY39ZiUEAYOEv5dsE= @@ -135,17 +134,9 @@ github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9 github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A= github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs= -github.com/go-redis/redis v6.15.9+incompatible h1:K0pv1D7EQUjfyoMql+r/jZqCLizCGKFlFgcHWWmHQjg= -github.com/go-redis/redis v6.15.9+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA= -github.com/go-redis/redis/v7 v7.4.0 h1:7obg6wUoj05T0EpY0o8B59S9w5yeMWql7sw2kwNW1x4= -github.com/go-redis/redis/v7 v7.4.0/go.mod h1:JDNMw23GTyLNC4GZu9njt15ctBQVn7xjRfnwdHj/Dcg= -github.com/go-redis/redis/v8 v8.11.4/go.mod h1:2Z2wHZXdQpCDXEGzqMockDpNyYvi2l4Pxt6RJr792+w= github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI= github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo= -github.com/go-redsync/redsync/v4 v4.5.0 h1:kJjDzn/iEbU+K/6w+O8b1rzuYIK/nP9EQRc5nXKW9x4= -github.com/go-redsync/redsync/v4 v4.5.0/go.mod h1:AfhgO1E6W3rlUTs6Zmz/B6qBZJFasV30lwo7nlizdDs= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= -github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.2.0/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= @@ -178,7 +169,6 @@ github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw 
github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= -github.com/gomodule/redigo v1.8.2/go.mod h1:P9dn9mFrCBvWhGE1wpxx6fgq7BAeLBk+UUUzlpkBYO0= github.com/gomodule/redigo v1.8.8 h1:f6cXq6RRfiyrOJEV7p3JhLDlmawGBVBBP1MggY8Mo4E= github.com/gomodule/redigo v1.8.8/go.mod h1:7ArFNvsTjH8GMMzB4uy1snslv2BwmginuMs06a1uzZE= github.com/google/btree v0.0.0-20180124185431-e89373fe6b4a/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= @@ -194,7 +184,6 @@ github.com/google/go-cmp v0.5.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.6 h1:BKbKCqvP6I+rmFHt06ZmyQtvB8xAkWdhFyr0ZUNZcxQ= -github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs= github.com/google/martian/v3 v3.0.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0= @@ -214,12 +203,6 @@ github.com/gopherjs/gopherjs v0.0.0-20190309154008-847fc94819f9/go.mod h1:wJfORR github.com/gopherjs/gopherjs v0.0.0-20190910122728-9d188e94fb99/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= github.com/gorilla/context v1.1.1/go.mod h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51q0aT7Yg= github.com/gorilla/mux v1.6.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs= -github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= -github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I= -github.com/hashicorp/errwrap 
v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= -github.com/hashicorp/go-multierror v1.1.0/go.mod h1:spPvp8C1qA32ftKqdAHm4hHTbPw+vmowP0z+KUhOZdA= -github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo= -github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= @@ -273,18 +256,12 @@ github.com/mschoch/smat v0.0.0-20160514031455-90eadee771ae/go.mod h1:qAyveg+e4CE github.com/mschoch/smat v0.2.0/go.mod h1:kc9mz7DoBKqDyiRL7VZN8KvXQMWeTaVnttLRXOlotKw= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= -github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= -github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= +github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= -github.com/onsi/ginkgo v1.10.1/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= -github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk= -github.com/onsi/ginkgo v1.16.4/go.mod h1:dX+/inL/fNMqNlz0e9LfyB9TswhZpCVdJM/Z6Vvnwo0= +github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= -github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= 
-github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY= -github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= -github.com/onsi/gomega v1.16.0/go.mod h1:HnhC7FXeEQY45zxNK3PPoIUhzk/80Xly9PcubAlGdZY= +github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE= github.com/openzipkin/zipkin-go v0.1.6/go.mod h1:QgAqvLzwWbR/WpD4A3cGpPtJrZXNIiJc5AZX7/PBEpw= github.com/philhofer/fwd v1.0.0/go.mod h1:gk3iGcWd9+svBvR0sR+KPcfE+RNWozjowpeBVG3ZVNU= github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= @@ -352,12 +329,9 @@ github.com/stretchr/testify v1.2.1/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXf github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= -github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stvp/tempredis v0.0.0-20181119212430-b82af8480203 h1:QVqDTf3h2WHt08YuiTGPZLls0Wq99X9bWd0Q5ZSBesM= -github.com/stvp/tempredis v0.0.0-20181119212430-b82af8480203/go.mod h1:oqN97ltKNihBbwlX8dLpwxCl3+HnXKV/R0e+sRLd9C8= github.com/tinylib/msgp v1.0.2/go.mod h1:+d+yLhGm8mzTaHzB+wgMYrodPfmZrzkirds8fDWklFE= github.com/tinylib/msgp v1.1.0/go.mod h1:+d+yLhGm8mzTaHzB+wgMYrodPfmZrzkirds8fDWklFE= github.com/tinylib/msgp v1.1.2/go.mod h1:+d+yLhGm8mzTaHzB+wgMYrodPfmZrzkirds8fDWklFE= @@ -366,7 +340,6 @@ github.com/willf/bitset v1.1.10/go.mod h1:RjeCKbqT1RxIR/KWY6phxZiaY1IyutSBfGjNPy github.com/yuin/goldmark v1.1.25/go.mod 
h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= -github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/gopher-lua v0.0.0-20210529063254-f4c35e4016d9 h1:k/gmLsJDWwWqbLCur2yWnJzwQEKRcAHXo6seXGuSwWw= github.com/yuin/gopher-lua v0.0.0-20210529063254-f4c35e4016d9/go.mod h1:E1AXubJBdNmFERAOucpDIxNzeGfLzg0mYh+UfMWdChA= go.opencensus.io v0.20.1/go.mod h1:6WKK9ahsWS3RSO+PY9ZHZUfv2irvY6gN279GOPZjmmk= @@ -428,7 +401,6 @@ golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190628185345-da137c7871d7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190724013045-ca1201d0de80/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= -golang.org/x/net v0.0.0-20190923162816-aa69164e4478/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20191209160850-c0dbc17a3553/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= @@ -439,15 +411,13 @@ golang.org/x/net v0.0.0-20200324143707-d3edc9973b7e/go.mod h1:qpuaurCH72eLCgpAm/ golang.org/x/net v0.0.0-20200501053045-e0ff5e5a1de5/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20200506145744-7e3656a0809f/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20200513185701-a91f0712d120/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= -golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod 
h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20200520182314-0ba52f642ac2/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= -golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= -golang.org/x/net v0.0.0-20210428140749-89ef3d95e781/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk= golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= +golang.org/x/net v0.0.0-20220225172249-27dd8689420f h1:oA4XRj0qtSt8Yo1Zms0CUlsT3KG69V2UGQWPBxujDmc= golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -464,7 +434,6 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys 
v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -481,12 +450,8 @@ golang.org/x/sys v0.0.0-20190507160741-ecd444e8653b/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20190606165138-5da285871e9c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20190904154756-749cb33beabd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191001151750-bb3f8db39f24/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20191010194322-b09406accb47/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191228213918-04cbcbbfeed8/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200106162015-b016eb3dc98e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -506,9 +471,7 @@ golang.org/x/sys v0.0.0-20200523222454-059865788121/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200625212154-ddb9806d33ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200803210538-64077c9b5642/go.mod 
h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210112080510-489259a85091/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= @@ -516,8 +479,8 @@ golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220412071739-889880a91fd5 h1:NubxfvTRuNb4RVzWrIDAUzUvREH1HkCD4JjyQTSG9As= -golang.org/x/sys v0.0.0-20220412071739-889880a91fd5/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220412211240-33da011f77ad h1:ntjMns5wyP/fN65tdBD4g8J5w8n015+iIIs9rtjXkY0= +golang.org/x/sys v0.0.0-20220412211240-33da011f77ad/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -526,6 +489,7 @@ golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3 golang.org/x/text v0.3.2/go.mod 
h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= @@ -571,7 +535,6 @@ golang.org/x/tools v0.0.0-20200618134242-20370b0cb4b2/go.mod h1:EkVYQZoAsY45+roY golang.org/x/tools v0.0.0-20200729194436-6467de6f59a7/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA= golang.org/x/tools v0.0.0-20200804011535-6c149bb5ef0d/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA= golang.org/x/tools v0.0.0-20200825202427-b303f430e36d/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA= -golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -663,12 +626,14 @@ gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogR gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod 
h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo= diff --git a/middleware/hooks.go b/middleware/hooks.go index a535e68..778261c 100644 --- a/middleware/hooks.go +++ b/middleware/hooks.go @@ -50,7 +50,7 @@ func (h *swarmInteractionHook) HandleAnnounce(ctx context.Context, req *bittorre err = h.store.GraduateLeecher(req.InfoHash, req.Peer) return ctx, err case req.Left == 0: - // Completed events will also have Left == 0, but by making this + // Completed events will also have Left == 0, but by making this // an extra case we can treat "old" seeders differently from // graduating leechers. (Calling PutSeeder is probably faster // than calling GraduateLeecher.)
@@ -124,14 +124,29 @@ func (h *responseHook) appendPeers(req *bittorrent.AnnounceRequest, resp *bittor switch req.IP.AddressFamily { case bittorrent.IPv4: - resp.IPv4Peers = peers + resp.IPv4Peers = mergePeers(resp.IPv4Peers, peers) case bittorrent.IPv6: - resp.IPv6Peers = peers + resp.IPv6Peers = mergePeers(resp.IPv6Peers, peers) default: - panic("attempted to append peer that is neither IPv4 nor IPv6") + err = bittorrent.ErrInvalidAddressFamily } - return nil + return err +} + +func mergePeers(p0, p1 []bittorrent.Peer) (result []bittorrent.Peer) { + peers := make(map[string]bittorrent.Peer, len(p0)+len(p1)) + for _, p := range p0 { + peers[p.RawString()] = p + } + for _, p := range p1 { + peers[p.RawString()] = p + } + result = make([]bittorrent.Peer, 0, len(peers)) + for _, v := range peers { + result = append(result, v) + } + return } func (h *responseHook) HandleScrape(ctx context.Context, req *bittorrent.ScrapeRequest, resp *bittorrent.ScrapeResponse) (context.Context, error) { diff --git a/middleware/torrentapproval/container/directory/directory.go b/middleware/torrentapproval/container/directory/directory.go index ef3cc0d..ff902aa 100644 --- a/middleware/torrentapproval/container/directory/directory.go +++ b/middleware/torrentapproval/container/directory/directory.go @@ -61,49 +61,56 @@ func build(confBytes []byte, st storage.Storage) (container.Container, error) { go func() { for event := range d.watcher.Events { var mi *metainfo.MetaInfo + lf := log.Fields{ + "file": event.TorrentFilePath, + "v1hash": event.InfoHash, + } if mi, err = metainfo.LoadFromFile(event.TorrentFilePath); err == nil { s256 := sha256.New() s256.Write(mi.InfoBytes) v2hash, _ := bittorrent.NewInfoHash(s256.Sum(nil)) - lf := log.Fields{ - "file": event.TorrentFilePath, - "v1hash": event.InfoHash.String(), - "v2hash": v2hash.String(), - "v2to1hash": v2hash.TruncateV1().String(), - } + lf["v2hash"] = v2hash + lf["v2to1hash"] = v2hash.TruncateV1() switch event.Change { case 
dirwatch.Added: var name string if info, err := mi.UnmarshalInfo(); err == nil { name = info.Name } else { - log.Warn(err) + lf["error"] = err + log.Warn("unable to unmarshal torrent info", lf) + delete(lf, "error") } if len(name) == 0 { name = list.DUMMY } - d.Storage.BulkPut(d.StorageCtx, - storage.Pair{ - Left: event.InfoHash.AsString(), - Right: name, - }, storage.Pair{ - Left: v2hash.RawString(), - Right: name, - }, storage.Pair{ - Left: v2hash.TruncateV1().RawString(), - Right: name, - }) + if err := d.Storage.BulkPut(d.StorageCtx, + storage.Entry{ + Key: event.InfoHash.AsString(), + Value: name, + }, storage.Entry{ + Key: v2hash.RawString(), + Value: name, + }, storage.Entry{ + Key: v2hash.TruncateV1().RawString(), + Value: name, + }); err != nil { + lf["error"] = err + } log.Debug("approval torrent added", lf) case dirwatch.Removed: - d.Storage.Delete(c.StorageCtx, + if err := d.Storage.Delete(c.StorageCtx, event.InfoHash.AsString(), v2hash.RawString(), v2hash.TruncateV1().RawString(), - ) + ); err != nil { + lf["error"] = err + } log.Debug("approval torrent deleted", lf) } } else { - log.Err(err) + lf["error"] = err + log.Error("unable to load torrent file", lf) } } }() diff --git a/middleware/torrentapproval/container/list/list.go b/middleware/torrentapproval/container/list/list.go index bf31070..8862d30 100644 --- a/middleware/torrentapproval/container/list/list.go +++ b/middleware/torrentapproval/container/list/list.go @@ -51,18 +51,20 @@ func build(confBytes []byte, st storage.Storage) (container.Container, error) { } if len(c.HashList) > 0 { - init := make([]storage.Pair, 0, len(c.HashList)) + init := make([]storage.Entry, 0, len(c.HashList)) for _, hashString := range c.HashList { ih, err := bittorrent.NewInfoHash(hashString) if err != nil { return nil, fmt.Errorf("whitelist : %s : %w", hashString, err) } - init = append(init, storage.Pair{Left: ih.RawString(), Right: DUMMY}) + init = append(init, storage.Entry{Key: ih.RawString(), Value: DUMMY}) if 
len(ih) == bittorrent.InfoHashV2Len { - init = append(init, storage.Pair{Left: ih.TruncateV1().RawString(), Right: DUMMY}) + init = append(init, storage.Entry{Key: ih.TruncateV1().RawString(), Value: DUMMY}) } } - l.Storage.BulkPut(l.StorageCtx, init...) + if err := l.Storage.BulkPut(l.StorageCtx, init...); err != nil { + return nil, fmt.Errorf("unable to put initial data: %w", err) + } } return l, nil } @@ -80,10 +82,19 @@ type List struct { // Approved checks if specified hash is approved or not. // If List.Invert set to true and hash found in storage, function will return false, // that means that hash is blacklisted. -func (l *List) Approved(hash bittorrent.InfoHash) bool { - b := l.Storage.Contains(l.StorageCtx, hash.RawString()) - if len(hash) == bittorrent.InfoHashV2Len { - b = b || l.Storage.Contains(l.StorageCtx, hash.TruncateV1().RawString()) +func (l *List) Approved(hash bittorrent.InfoHash) (contains bool) { + var err error + if contains, err = l.Storage.Contains(l.StorageCtx, hash.RawString()); err == nil { + if len(hash) == bittorrent.InfoHashV2Len { + if containsV2, errV2 := l.Storage.Contains(l.StorageCtx, hash.TruncateV1().RawString()); err == nil { + contains = contains || containsV2 + } else { + err = errV2 + } + } } - return b != l.Invert + if err != nil { + log.Err(err) + } + return contains != l.Invert } diff --git a/middleware/torrentapproval/torrentapproval.go b/middleware/torrentapproval/torrentapproval.go index 1f38dde..ebec9ae 100644 --- a/middleware/torrentapproval/torrentapproval.go +++ b/middleware/torrentapproval/torrentapproval.go @@ -6,10 +6,11 @@ import ( "context" "fmt" + "gopkg.in/yaml.v3" + "github.com/sot-tech/mochi/bittorrent" "github.com/sot-tech/mochi/middleware" "github.com/sot-tech/mochi/middleware/torrentapproval/container" - "gopkg.in/yaml.v3" // import directory watcher to enable appropriate support _ "github.com/sot-tech/mochi/middleware/torrentapproval/container/directory" diff --git a/storage/memory/storage.go 
b/storage/memory/storage.go index 9ab7e95..815ad2d 100644 --- a/storage/memory/storage.go +++ b/storage/memory/storage.go @@ -186,8 +186,8 @@ type peerShard struct { type swarm struct { // map serialized peer to mtime - seeders map[storage.SerializedPeer]int64 - leechers map[storage.SerializedPeer]int64 + seeders map[string]int64 + leechers map[string]int64 } type store struct { @@ -214,7 +214,7 @@ func (ps *store) populateProm() { s.RUnlock() } - storage.PromInfohashesCount.Set(float64(numInfohashes)) + storage.PromInfoHashesCount.Set(float64(numInfohashes)) storage.PromSeedersCount.Set(float64(numSeeders)) storage.PromLeechersCount.Set(float64(numLeechers)) } @@ -246,15 +246,15 @@ func (ps *store) PutSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) error { default: } - pk := storage.NewSerializedPeer(p) + pk := p.RawString() shard := ps.shards[ps.shardIndex(ih, p.IP.AddressFamily)] shard.Lock() if _, ok := shard.swarms[ih]; !ok { shard.swarms[ih] = swarm{ - seeders: make(map[storage.SerializedPeer]int64), - leechers: make(map[storage.SerializedPeer]int64), + seeders: make(map[string]int64), + leechers: make(map[string]int64), } } @@ -277,7 +277,7 @@ func (ps *store) DeleteSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) error { default: } - pk := storage.NewSerializedPeer(p) + pk := p.RawString() shard := ps.shards[ps.shardIndex(ih, p.IP.AddressFamily)] shard.Lock() @@ -310,15 +310,15 @@ func (ps *store) PutLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error { default: } - pk := storage.NewSerializedPeer(p) + pk := p.RawString() shard := ps.shards[ps.shardIndex(ih, p.IP.AddressFamily)] shard.Lock() if _, ok := shard.swarms[ih]; !ok { shard.swarms[ih] = swarm{ - seeders: make(map[storage.SerializedPeer]int64), - leechers: make(map[storage.SerializedPeer]int64), + seeders: make(map[string]int64), + leechers: make(map[string]int64), } } @@ -341,7 +341,7 @@ func (ps *store) DeleteLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error default: } - pk := 
storage.NewSerializedPeer(p) + pk := p.RawString() shard := ps.shards[ps.shardIndex(ih, p.IP.AddressFamily)] shard.Lock() @@ -374,15 +374,15 @@ func (ps *store) GraduateLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) erro default: } - pk := storage.NewSerializedPeer(p) + pk := p.RawString() shard := ps.shards[ps.shardIndex(ih, p.IP.AddressFamily)] shard.Lock() if _, ok := shard.swarms[ih]; !ok { shard.swarms[ih] = swarm{ - seeders: make(map[storage.SerializedPeer]int64), - leechers: make(map[storage.SerializedPeer]int64), + seeders: make(map[string]int64), + leechers: make(map[string]int64), } } @@ -426,8 +426,8 @@ func (ps *store) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant int, if numWant == 0 { break } - - peers = append(peers, pk.ToPeer()) + p, _ := bittorrent.NewPeer(pk) + peers = append(peers, p) numWant-- } } else { @@ -437,15 +437,15 @@ func (ps *store) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant int, if numWant == 0 { break } - - peers = append(peers, pk.ToPeer()) + p, _ := bittorrent.NewPeer(pk) + peers = append(peers, p) numWant-- } // Append leechers until we reach numWant. 
if numWant > 0 { leechers := shard.swarms[ih].leechers - announcerPK := storage.NewSerializedPeer(announcer) + announcerPK := announcer.RawString() for pk := range leechers { if pk == announcerPK { continue @@ -454,8 +454,8 @@ func (ps *store) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant int, if numWant == 0 { break } - - peers = append(peers, pk.ToPeer()) + p, _ := bittorrent.NewPeer(pk) + peers = append(peers, p) numWant-- } } @@ -500,38 +500,40 @@ func asKey(in any) any { return fmt.Sprint(in) } -func (ps *store) Put(ctx string, key, value any) { +func (ps *store) Put(ctx string, value storage.Entry) error { m, _ := ps.contexts.LoadOrStore(ctx, new(sync.Map)) - m.(*sync.Map).Store(asKey(key), value) + m.(*sync.Map).Store(value.Key, value.Value) + return nil } -func (ps *store) Contains(ctx string, key any) bool { +func (ps *store) Contains(ctx string, key string) (bool, error) { var exist bool if m, found := ps.contexts.Load(ctx); found { _, exist = m.(*sync.Map).Load(asKey(key)) } - return exist + return exist, nil } -func (ps *store) BulkPut(ctx string, pairs ...storage.Pair) { +func (ps *store) BulkPut(ctx string, pairs ...storage.Entry) error { if len(pairs) > 0 { c, _ := ps.contexts.LoadOrStore(ctx, new(sync.Map)) m := c.(*sync.Map) for _, p := range pairs { - m.Store(asKey(p.Left), p.Right) + m.Store(asKey(p.Key), p.Value) } } + return nil } -func (ps *store) Load(ctx string, key any) any { +func (ps *store) Load(ctx string, key string) (any, error) { var v any if m, found := ps.contexts.Load(ctx); found { v, _ = m.(*sync.Map).Load(asKey(key)) } - return v + return v, nil } -func (ps *store) Delete(ctx string, keys ...any) { +func (ps *store) Delete(ctx string, keys ...string) error { if len(keys) > 0 { if m, found := ps.contexts.Load(ctx); found { m := m.(*sync.Map) @@ -540,6 +542,7 @@ func (ps *store) Delete(ctx string, keys ...any) { } } } + return nil } // collectGarbage deletes all Peers from the Storage which are older than the diff 
--git a/storage/misc.go b/storage/misc.go deleted file mode 100644 index 3c59a10..0000000 --- a/storage/misc.go +++ /dev/null @@ -1,54 +0,0 @@ -package storage - -import ( - "encoding/binary" - "net" - - "github.com/sot-tech/mochi/bittorrent" -) - -// Pair - some key-value pair, used for BulkPut -type Pair struct { - Left, Right any -} - -// SerializedPeer concatenation of PeerID, net port and IP-address -type SerializedPeer string - -// NewSerializedPeer builds SerializedPeer from bittorrent.Peer -func NewSerializedPeer(p bittorrent.Peer) SerializedPeer { - b := make([]byte, bittorrent.PeerIDLen+2+len(p.IP.IP)) - copy(b[:bittorrent.PeerIDLen], p.ID[:]) - binary.BigEndian.PutUint16(b[bittorrent.PeerIDLen:bittorrent.PeerIDLen+2], p.Port) - copy(b[bittorrent.PeerIDLen+2:], p.IP.IP) - - return SerializedPeer(b) -} - -// ToPeer parses SerializedPeer to bittorrent.Peer -func (pk SerializedPeer) ToPeer() bittorrent.Peer { - peerID, err := bittorrent.NewPeerID([]byte(pk[:bittorrent.PeerIDLen])) - if err != nil { - panic(err) - } - peer := bittorrent.Peer{ - ID: peerID, - Port: binary.BigEndian.Uint16([]byte(pk[bittorrent.PeerIDLen : bittorrent.PeerIDLen+2])), - IP: bittorrent.IP{IP: net.IP(pk[bittorrent.PeerIDLen+2:])}, - } - - if ip := peer.IP.To4(); ip != nil { - peer.IP.IP = ip - peer.IP.AddressFamily = bittorrent.IPv4 - } else if len(peer.IP.IP) == net.IPv6len { // implies toReturn.IP.To4() == nil - peer.IP.AddressFamily = bittorrent.IPv6 - } else { - panic("IP is neither v4 nor v6") - } - - return peer -} - -func (pk SerializedPeer) String() string { - return string(pk) -} diff --git a/storage/prometheus.go b/storage/prometheus.go index 3eb8899..210a8d1 100644 --- a/storage/prometheus.go +++ b/storage/prometheus.go @@ -6,7 +6,7 @@ func init() { // Register the metrics. 
prometheus.MustRegister( PromGCDurationMilliseconds, - PromInfohashesCount, + PromInfoHashesCount, PromSeedersCount, PromLeechersCount, ) @@ -16,29 +16,29 @@ var ( // PromGCDurationMilliseconds is a histogram used by storage to record the // durations of execution time required for removing expired peers. PromGCDurationMilliseconds = prometheus.NewHistogram(prometheus.HistogramOpts{ - Name: "chihaya_storage_gc_duration_milliseconds", + Name: "mochi_storage_gc_duration_milliseconds", Help: "The time it takes to perform storage garbage collection", Buckets: prometheus.ExponentialBuckets(9.375, 2, 10), }) - // PromInfohashesCount is a gauge used to hold the current total amount of + // PromInfoHashesCount is a gauge used to hold the current total amount of // unique swarms being tracked by a storage. - PromInfohashesCount = prometheus.NewGauge(prometheus.GaugeOpts{ - Name: "chihaya_storage_infohashes_count", + PromInfoHashesCount = prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "mochi_storage_infohashes_count", Help: "The number of Infohashes tracked", }) // PromSeedersCount is a gauge used to hold the current total amount of // unique seeders per swarm. PromSeedersCount = prometheus.NewGauge(prometheus.GaugeOpts{ - Name: "chihaya_storage_seeders_count", + Name: "mochi_storage_seeders_count", Help: "The number of seeders tracked", }) // PromLeechersCount is a gauge used to hold the current total amount of // unique leechers per swarm. 
PromLeechersCount = prometheus.NewGauge(prometheus.GaugeOpts{ - Name: "chihaya_storage_leechers_count", + Name: "mochi_storage_leechers_count", Help: "The number of leechers tracked", }) ) diff --git a/storage/redis/storage.go b/storage/redis/storage.go index 2bbca32..f497e8a 100644 --- a/storage/redis/storage.go +++ b/storage/redis/storage.go @@ -55,6 +55,7 @@ const ( defaultConnectTimeout = time.Second * 15 ) +// ErrSentinelAndClusterChecked returned from initializer if both Config.Sentinel and Config.Cluster provided var ErrSentinelAndClusterChecked = errors.New("unable to use both cluster and sentinel mode") func init() { @@ -118,7 +119,6 @@ func (cfg Config) LogFields() log.Fields { // // This function warns to the logger when a value is changed. func (cfg Config) Validate() (Config, error) { - if cfg.Sentinel && cfg.Cluster { return cfg, ErrSentinelAndClusterChecked } @@ -203,7 +203,7 @@ func (cfg Config) Validate() (Config, error) { func connect(cfg Config) (*store, error) { var err error db := &store{ - // FIXME: get context from parent + // FIXME: get context from parent and put into GC, middleware functions should use own ctx ctx: context.TODO(), } switch { @@ -243,8 +243,8 @@ func connect(cfg Config) (*store, error) { if err = db.con.Ping(db.ctx).Err(); err == nil && !errors.Is(err, redis.Nil) { err = nil } else { - db = nil _ = db.con.Close() + db = nil } return db, err } @@ -287,9 +287,7 @@ func (ps *store) runGC(gcInterval, peerLifeTime time.Duration) { case <-time.After(gcInterval): before := time.Now().Add(-peerLifeTime) log.Debug("storage: purging peers with no announces since", log.Fields{"before": before}) - if err := ps.collectGarbage(before); err != nil { - log.Error("storage: collectGarbage error", log.Fields{"before": before, "error": err}) - } + ps.collectGarbage(before) } } } @@ -321,39 +319,44 @@ type store struct { var groups = []string{bittorrent.IPv4.String(), bittorrent.IPv6.String()} -func leecherInfohashKey(af, ih string) string { - 
return af + "_L_" + ih +// leecherInfoHashKey generates string IPvN_L_hash +func leecherInfoHashKey(addressFamily, infoHash string) string { + return addressFamily + "_L_" + infoHash } -func seederInfohashKey(af, ih string) string { - return af + "_S_" + ih +// seederInfoHashKey generates string IPvN_S_hash +func seederInfoHashKey(addressFamily, infoHash string) string { + return addressFamily + "_S_" + infoHash } -func infohashCountKey(af string) string { - return af + "_infohash_count" +// infoHashCountKey generates string IPvN_infohash_count +func infoHashCountKey(addressFamily string) string { + return addressFamily + "_infohash_count" } -func seederCountKey(af string) string { - return af + "_S_count" +// leecherCountKey generates string IPvN_L_count +func leecherCountKey(addressFamily string) string { + return addressFamily + "_L_count" } -func leecherCountKey(af string) string { - return af + "_L_count" +// seederCountKey generates string IPvN_S_count +func seederCountKey(addressFamily string) string { + return addressFamily + "_S_count" } // populateProm aggregates metrics over all groups and then posts them to // prometheus.
func (ps *store) populateProm() { - var numInfohashes, numSeeders, numLeechers int64 + var numInfoHashes, numSeeders, numLeechers int64 for _, group := range groups { - if n, err := ps.con.Get(ps.ctx, infohashCountKey(group)).Int64(); err != nil && !errors.Is(err, redis.Nil) { + if n, err := ps.con.Get(ps.ctx, infoHashCountKey(group)).Int64(); err != nil && !errors.Is(err, redis.Nil) { log.Error("storage: GET counter failure", log.Fields{ - "key": infohashCountKey(group), + "key": infoHashCountKey(group), "error": err, }) } else { - numInfohashes += n + numInfoHashes += n } if n, err := ps.con.Get(ps.ctx, seederCountKey(group)).Int64(); err != nil && !errors.Is(err, redis.Nil) { log.Error("storage: GET counter failure", log.Fields{ @@ -373,7 +376,7 @@ func (ps *store) populateProm() { } } - storage.PromInfohashesCount.Set(float64(numInfohashes)) + storage.PromInfoHashesCount.Set(float64(numInfoHashes)) storage.PromSeedersCount.Set(float64(numSeeders)) storage.PromLeechersCount.Set(float64(numLeechers)) } @@ -386,7 +389,7 @@ func (ps *store) tx(txf func(tx redis.Pipeliner) error) (err error) { if pipe, txErr := ps.con.TxPipelined(ps.ctx, txf); txErr == nil { errs := make([]string, 0) for _, c := range pipe { - if err := c.Err(); err == nil { + if err := c.Err(); err != nil { errs = append(errs, err.Error()) } } @@ -409,16 +412,15 @@ func asNil(err error) error { func (ps *store) PutSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) error { addressFamily := p.IP.AddressFamily.String() log.Debug("storage: PutSeeder", log.Fields{ - "InfoHash": ih.String(), + "InfoHash": ih, "Peer": p, }) - peerKey := storage.NewSerializedPeer(p).String() - encodedSeederInfoHash := seederInfohashKey(addressFamily, ih.String()) + encodedSeederInfoHash := seederInfoHashKey(addressFamily, ih.RawString()) now := ps.getClock() return ps.tx(func(tx redis.Pipeliner) (err error) { - if err = tx.HSet(ps.ctx, encodedSeederInfoHash, peerKey, now).Err(); err != nil { + if err = tx.HSet(ps.ctx, 
encodedSeederInfoHash, p.RawString(), now).Err(); err != nil { return } if err = ps.con.Incr(ps.ctx, seederCountKey(addressFamily)).Err(); err != nil { @@ -427,7 +429,7 @@ func (ps *store) PutSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) error { if err = ps.con.HSet(ps.ctx, addressFamily, encodedSeederInfoHash, now).Err(); err != nil { return } - err = ps.con.Incr(ps.ctx, infohashCountKey(addressFamily)).Err() + err = ps.con.Incr(ps.ctx, infoHashCountKey(addressFamily)).Err() return }) } @@ -435,13 +437,12 @@ func (ps *store) PutSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) error { func (ps *store) DeleteSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) error { addressFamily := p.IP.AddressFamily.String() log.Debug("storage: DeleteSeeder", log.Fields{ - "InfoHash": ih.String(), + "InfoHash": ih, "Peer": p, }) - peerKey := storage.NewSerializedPeer(p).String() - encodedSeederInfoHash := seederInfohashKey(addressFamily, ih.String()) - deleted, err := ps.con.HDel(ps.ctx, encodedSeederInfoHash, peerKey).Uint64() + encodedSeederInfoHash := seederInfoHashKey(addressFamily, ih.RawString()) + deleted, err := ps.con.HDel(ps.ctx, encodedSeederInfoHash, p.RawString()).Uint64() err = asNil(err) if err == nil { if deleted == 0 { @@ -457,17 +458,16 @@ func (ps *store) DeleteSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) error { func (ps *store) PutLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error { addressFamily := p.IP.AddressFamily.String() log.Debug("storage: PutLeecher", log.Fields{ - "InfoHash": ih.String(), + "InfoHash": ih, "Peer": p, }) // Update the peer in the swarm. 
- encodedLeecherInfoHash := leecherInfohashKey(addressFamily, ih.String()) - peerKey := storage.NewSerializedPeer(p).String() + encodedLeecherInfoHash := leecherInfoHashKey(addressFamily, ih.RawString()) now := ps.getClock() return ps.tx(func(tx redis.Pipeliner) (err error) { - if err = tx.HSet(ps.ctx, encodedLeecherInfoHash, peerKey, now).Err(); err != nil { + if err = tx.HSet(ps.ctx, encodedLeecherInfoHash, p.RawString(), now).Err(); err != nil { return } if err = tx.HSet(ps.ctx, addressFamily, encodedLeecherInfoHash, now).Err(); err != nil { @@ -481,14 +481,13 @@ func (ps *store) PutLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error { func (ps *store) DeleteLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error { addressFamily := p.IP.AddressFamily.String() log.Debug("storage: DeleteLeecher", log.Fields{ - "InfoHash": ih.String(), + "InfoHash": ih, "Peer": p, }) - peerKey := storage.NewSerializedPeer(p).String() - encodedLeecherInfoHash := leecherInfohashKey(addressFamily, ih.String()) + encodedLeecherInfoHash := leecherInfoHashKey(addressFamily, ih.RawString()) - deleted, err := ps.con.HDel(ps.ctx, encodedLeecherInfoHash, peerKey).Uint64() + deleted, err := ps.con.HDel(ps.ctx, encodedLeecherInfoHash, p.RawString()).Uint64() err = asNil(err) if err == nil { if deleted == 0 { @@ -504,14 +503,14 @@ func (ps *store) DeleteLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error func (ps *store) GraduateLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error { addressFamily := p.IP.AddressFamily.String() log.Debug("storage: GraduateLeecher", log.Fields{ - "InfoHash": ih.String(), + "InfoHash": ih, "Peer": p, }) - encodedInfoHash := ih.String() - encodedLeecherInfoHash := leecherInfohashKey(addressFamily, encodedInfoHash) - encodedSeederInfoHash := seederInfohashKey(addressFamily, encodedInfoHash) - peerKey := storage.NewSerializedPeer(p).String() + encodedInfoHash := ih.RawString() + encodedLeecherInfoHash := leecherInfoHashKey(addressFamily, 
encodedInfoHash) + encodedSeederInfoHash := seederInfoHashKey(addressFamily, encodedInfoHash) + peerKey := p.RawString() now := ps.getClock() return ps.tx(func(tx redis.Pipeliner) error { @@ -532,7 +531,7 @@ func (ps *store) GraduateLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) erro err = tx.HSet(ps.ctx, addressFamily, encodedSeederInfoHash, now).Err() } if err == nil { - err = tx.Incr(ps.ctx, infohashCountKey(addressFamily)).Err() + err = tx.Incr(ps.ctx, infoHashCountKey(addressFamily)).Err() } return err }) @@ -541,15 +540,15 @@ func (ps *store) GraduateLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) erro func (ps *store) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant int, announcer bittorrent.Peer) (peers []bittorrent.Peer, err error) { addressFamily := announcer.IP.AddressFamily.String() log.Debug("storage: AnnouncePeers", log.Fields{ - "InfoHash": ih.String(), + "InfoHash": ih, "seeder": seeder, "numWant": numWant, "Peer": announcer, }) - encodedInfoHash := ih.String() - encodedLeecherInfoHash := leecherInfohashKey(addressFamily, encodedInfoHash) - encodedSeederInfoHash := seederInfohashKey(addressFamily, encodedInfoHash) + encodedInfoHash := ih.RawString() + encodedLeecherInfoHash := leecherInfoHashKey(addressFamily, encodedInfoHash) + encodedSeederInfoHash := seederInfoHashKey(addressFamily, encodedInfoHash) leechers, err := ps.con.HKeys(ps.ctx, encodedLeecherInfoHash).Result() err = asNil(err) @@ -573,9 +572,12 @@ func (ps *store) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant int, if numWant == 0 { break } - - peers = append(peers, storage.SerializedPeer(peerKey).ToPeer()) - numWant-- + if p, err := bittorrent.NewPeer(peerKey); err == nil { + peers = append(peers, p) + numWant-- + } else { + log.Error("storage: unable to decode leecher", log.Fields{"peer": peerKey}) + } } } else { // Append as many seeders as possible. 
@@ -583,22 +585,28 @@ func (ps *store) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant int, if numWant == 0 { break } - - peers = append(peers, storage.SerializedPeer(peerKey).ToPeer()) - numWant-- + if p, err := bittorrent.NewPeer(peerKey); err == nil { + peers = append(peers, p) + numWant-- + } else { + log.Error("storage: unable to decode seeder", log.Fields{"peer": peerKey}) + } } // Append leechers until we reach numWant. if numWant > 0 { - announcerPK := storage.NewSerializedPeer(announcer).String() + announcerPK := announcer.RawString() for _, peerKey := range leechers { if peerKey != announcerPK { if numWant == 0 { break } - - peers = append(peers, storage.SerializedPeer(peerKey).ToPeer()) - numWant-- + if p, err := bittorrent.NewPeer(peerKey); err == nil { + peers = append(peers, p) + numWant-- + } else { + log.Error("storage: unable to decode leecher", log.Fields{"peer": peerKey}) + } } } } @@ -610,9 +618,9 @@ func (ps *store) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant int, func (ps *store) ScrapeSwarm(ih bittorrent.InfoHash, af bittorrent.AddressFamily) (resp bittorrent.Scrape) { resp.InfoHash = ih addressFamily := af.String() - encodedInfoHash := ih.String() - encodedLeecherInfoHash := leecherInfohashKey(addressFamily, encodedInfoHash) - encodedSeederInfoHash := seederInfohashKey(addressFamily, encodedInfoHash) + encodedInfoHash := ih.RawString() + encodedLeecherInfoHash := leecherInfoHashKey(addressFamily, encodedInfoHash) + encodedSeederInfoHash := seederInfoHashKey(addressFamily, encodedInfoHash) leechersLen, err := ps.con.HLen(ps.ctx, encodedLeecherInfoHash).Result() err = asNil(err) @@ -640,70 +648,61 @@ func (ps *store) ScrapeSwarm(ih bittorrent.InfoHash, af bittorrent.AddressFamily return } -func (ps *store) Put(ctx string, key, value any) { - err := ps.con.HSet(ps.ctx, ctx, key, value).Err() - if err != nil { - panic(err) - } +func (ps *store) Put(ctx string, value storage.Entry) error { + return ps.con.HSet(ps.ctx, 
ctx, value.Key, value.Value).Err() } -func (ps *store) Contains(ctx string, key any) bool { +func (ps *store) Contains(ctx string, key string) (bool, error) { exist, err := ps.con.HExists(ps.ctx, ctx, key).Result() - if err != nil { - panic(err) - } - return exist + return exist, asNil(err) } const argNumErrorMsg = "ERR wrong number of arguments" -func (ps *store) BulkPut(ctx string, pairs ...storage.Pair) { +func (ps *store) BulkPut(ctx string, pairs ...storage.Entry) (err error) { if l := len(pairs); l > 0 { args := make([]any, 0, l*2) for _, p := range pairs { - args = append(args, p.Left, p.Right) + args = append(args, p.Key, p.Value) } - err := ps.con.HSet(ps.ctx, ctx, args...).Err() + err = ps.con.HSet(ps.ctx, ctx, args...).Err() if err != nil { if strings.Contains(err.Error(), argNumErrorMsg) { log.Warn("This REDIS version/implementation does not support variadic arguments for HSET") for _, p := range pairs { - if err := ps.con.HSet(ps.ctx, ctx, p.Left, p.Right).Err(); err != nil { - panic(err) + if err = ps.con.HSet(ps.ctx, ctx, p.Key, p.Value).Err(); err != nil { + break } } - } else { - panic(err) } } } + return } -func (ps *store) Load(ctx string, key any) any { - v, err := ps.con.HGet(ps.ctx, ctx, key).Result() - err = asNil(err) - if err != nil { - panic(err) +func (ps *store) Load(ctx string, key string) (v any, err error) { + v, err = ps.con.HGet(ps.ctx, ctx, key).Result() + if err != nil && errors.Is(err, redis.Nil) { + v, err = nil, nil } - return v + return } -func (ps *store) Delete(ctx string, keys ...any) { +func (ps *store) Delete(ctx string, keys ...string) (err error) { if len(keys) > 0 { - err := asNil(ps.con.HDel(ps.ctx, keys...).Err()) + err = asNil(ps.con.HDel(ps.ctx, ctx, keys...).Err()) if err != nil { if strings.Contains(err.Error(), argNumErrorMsg) { log.Warn("This REDIS version/implementation does not support variadic arguments for HDEL") for _, k := range keys { - if err := asNil(ps.con.HDel(ps.ctx, ctx, k).Err()); err != nil { - 
panic(err) + if err = asNil(ps.con.HDel(ps.ctx, ctx, k).Err()); err != nil { + break } } - } else { - panic(err) } } } + return } // collectGarbage deletes all Peers from the Storage which are older than the @@ -751,108 +750,125 @@ func (ps *store) Delete(ctx string, keys ...any) { // - If the change happens after the HLEN, we will not even attempt to make the // transaction. The infohash key will remain in the addressFamil hash and // we'll attempt to clean it up the next time collectGarbage runs. -func (ps *store) collectGarbage(cutoff time.Time) error { +func (ps *store) collectGarbage(cutoff time.Time) { cutoffUnix := cutoff.UnixNano() start := time.Now() - + var err error for _, group := range groups { - // list all infohashes in the group - infohashesList, err := ps.con.HKeys(ps.ctx, group).Result() + // list all infoHashes in the group + var infoHashes []string + infoHashes, err = ps.con.HKeys(ps.ctx, group).Result() err = asNil(err) - if err != nil { - return err - } - - for _, ihStr := range infohashesList { - isSeeder := len(ihStr) > 5 && ihStr[5:6] == "S" - - // list all (peer, timeout) pairs for the ih - ihList, err := ps.con.HGetAll(ps.ctx, ihStr).Result() - err = asNil(err) - if err != nil { - return err - } - - var removedPeerCount int64 - for k, v := range ihList { - peerKey := storage.SerializedPeer(k) - mtime, err := strconv.ParseInt(v, 10, 64) - if err != nil { - return err - } - if mtime <= cutoffUnix { - log.Debug("storage: deleting peer", log.Fields{ - "Peer": peerKey.ToPeer(), - }) - ret, err := ps.con.HDel(ps.ctx, ihStr, peerKey.String()).Result() - if err != nil { - return err + if err == nil { + for _, infoHash := range infoHashes { + isSeeder := len(infoHash) > 5 && infoHash[5:6] == "S" + // list all (peer, timeout) pairs for the ih + peerList, err := ps.con.HGetAll(ps.ctx, infoHash).Result() + err = asNil(err) + if err == nil { + var removedPeerCount int64 + for peerKey, timeStamp := range peerList { + var peer bittorrent.Peer + if peer, 
err = bittorrent.NewPeer(peerKey); err == nil { + if mtime, err := strconv.ParseInt(timeStamp, 10, 64); err == nil { + if mtime <= cutoffUnix { + log.Debug("storage: deleting peer", log.Fields{ + "Peer": peer, + }) + var count int64 + count, err = ps.con.HDel(ps.ctx, infoHash, peerKey).Result() + err = asNil(err) + if err == nil { + removedPeerCount += count + } + } + } + } + if err != nil { + log.Error("storage: Redis: unable to delete info hash peer", log.Fields{ + "group": group, + "infoHash": infoHash, + "peer": peer, + "key": peerKey, + "error": err, + }) + } + } + // DECR seeder/leecher counter + if removedPeerCount > 0 { + var decrCounter string + if isSeeder { + decrCounter = seederCountKey(group) + } else { + decrCounter = leecherCountKey(group) + } + if err := ps.con.DecrBy(ps.ctx, decrCounter, removedPeerCount).Err(); err != nil { + log.Error("storage: Redis: unable to decrement seeder/leecher peer count", log.Fields{ + "group": group, + "infoHash": infoHash, + "key": decrCounter, + "error": err, + }) + } } - removedPeerCount += ret - } - } - // DECR seeder/leecher counter - decrCounter := leecherCountKey(group) - if isSeeder { - decrCounter = seederCountKey(group) - } - if removedPeerCount > 0 { - if err := ps.con.DecrBy(ps.ctx, decrCounter, removedPeerCount).Err(); err != nil { - return err - } - } - - // use WATCH to avoid race condition - // https://redis.io/topics/transactions - _, err = ps.con.Watch(ps.ctx, ihStr) - if err != nil { - return err - } - ihLen, err := ps.con.HLen(ps.ctx, ihStr).Result() - if err != nil { - return err - } - if ihLen == 0 { - // Empty hashes are not shown among existing keys, - // in other words, it's removed automatically after `HDEL` the last field. 
- // _, err := ps.con.Del(ps.ctx, ihStr) - - _ = ps.con.Multi(ps.ctx) - _ = ps.con.HDel(ps.ctx, group, ihStr) - if isSeeder { - _ = ps.con.Decr(ps.ctx, infohashCountKey(group)) - } - _, err = redis.Values(ps.con.Exec(ps.ctx)) - if err != nil && !errors.Is(err, redis.Nil) { - log.Error("storage: Redis EXEC failure", log.Fields{ + // use WATCH to avoid race condition + // https://redis.io/topics/transactions + err = asNil(ps.con.Watch(ps.ctx, func(tx *redis.Tx) (err error) { + var infoHashCount int64 + infoHashCount, err = ps.con.HLen(ps.ctx, infoHash).Result() + err = asNil(err) + if err == nil && infoHashCount == 0 { + // Empty hashes are not shown among existing keys, + // in other words, it's removed automatically after `HDEL` the last field. + // _, err := ps.con.Del(ps.ctx, infoHash) + var deletedCount int64 + deletedCount, err = ps.con.HDel(ps.ctx, group, infoHash).Result() + err = asNil(err) + if err == nil && isSeeder && deletedCount > 0 { + err = ps.con.Decr(ps.ctx, infoHashCountKey(group)).Err() + } + } + return err + }, infoHash)) + if err != nil { + log.Error("storage: Redis: unable to clean info hash records", log.Fields{ + "group": group, + "infoHash": infoHash, + "error": err, + }) + } + } else { + log.Error("storage: Redis: unable to fetch info hash peers", log.Fields{ "group": group, - "infohash": ihStr, + "infoHash": infoHash, "error": err, }) } - } else { - if _, err = ps.con.Unwatch(ps.ctx); err != nil && !errors.Is(err, redis.Nil) { - log.Error("storage: Redis UNWATCH failure", log.Fields{"error": err}) - } } + } else { + log.Error("storage: Redis: unable to fetch info hashes", log.Fields{"group": group, "error": err}) } } - duration := float64(time.Since(start).Nanoseconds()) / float64(time.Millisecond) + duration := time.Since(start).Milliseconds() log.Debug("storage: recordGCDuration", log.Fields{"timeTaken(ms)": duration}) - storage.PromGCDurationMilliseconds.Observe(duration) - - return nil + 
storage.PromGCDurationMilliseconds.Observe(float64(duration)) } func (ps *store) Stop() stop.Result { c := make(stop.Channel) go func() { - close(ps.closed) + if ps.closed != nil { + close(ps.closed) + } ps.wg.Wait() - log.Info("storage: exiting. mochi does not clear data in redis when exiting. mochi keys have prefix 'IPv{4,6}_'.") - c.Done() + var err error + if ps.con != nil { + log.Info("storage: exiting. mochi does not clear data in redis when exiting. mochi keys have prefix 'IPv{4,6}_'.") + err = ps.con.Close() + } + c.Done(err) }() return c.Result() diff --git a/storage/redis/storage_test.go b/storage/redis/storage_test.go index 5fd97c3..4724214 100644 --- a/storage/redis/storage_test.go +++ b/storage/redis/storage_test.go @@ -11,21 +11,29 @@ import ( "github.com/sot-tech/mochi/storage/test" ) +var conf = Config{ + GarbageCollectionInterval: 10 * time.Minute, + PrometheusReportingInterval: 10 * time.Minute, + PeerLifetime: 30 * time.Minute, + ReadTimeout: 10 * time.Second, + WriteTimeout: 10 * time.Second, + ConnectTimeout: 10 * time.Second, +} + func createNew() s.Storage { - rs, err := miniredis.Run() + var ps s.Storage + var err error + ps, err = New(conf) if err != nil { - panic(err) + fmt.Println("unable to create real Redis connection: ", err, " using simulator") + var rs *miniredis.Miniredis + rs, err = miniredis.Run() + if err != nil { + panic(err) + } + conf.Addresses = []string{rs.Addr()} + ps, err = New(conf) } - redisURL := fmt.Sprintf("redis://@%s/0", rs.Addr()) - ps, err := New(Config{ - GarbageCollectionInterval: 10 * time.Minute, - PrometheusReportingInterval: 10 * time.Minute, - PeerLifetime: 30 * time.Minute, - RedisBroker: redisURL, - ReadTimeout: 10 * time.Second, - WriteTimeout: 10 * time.Second, - ConnectTimeout: 10 * time.Second, - }) if err != nil { panic(err) } diff --git a/storage/storage.go b/storage/storage.go index f51c68a..2e81634 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -14,6 +14,12 @@ var ( drivers = 
make(map[string]Driver) ) +// Entry - some key-value pair, used for BulkPut +type Entry struct { + Key string + Value any +} + // Driver is the interface used to initialize a new type of Storage. type Driver interface { NewStorage(cfg any) (Storage, error) @@ -107,20 +113,20 @@ type Storage interface { // Put used to place arbitrary k-v data with specified context // into storage. ctx parameter used to group data // (i.e. data only for specific middleware module) - Put(ctx string, key, value any) + Put(ctx string, value Entry) error // BulkPut used to place array of k-v data in specified context. // Useful when several data entries should be added in single transaction/connection - BulkPut(ctx string, pairs ...Pair) + BulkPut(ctx string, pairs ...Entry) error // Contains checks if any data in specified context exist - Contains(ctx string, key any) bool + Contains(ctx string, key string) (bool, error) // Load used to get arbitrary data in specified context by its key - Load(ctx string, key any) any + Load(ctx string, key string) (any, error) // Delete used to delete arbitrary data in specified context by its keys - Delete(ctx string, keys ...any) + Delete(ctx string, keys ...string) error // Stopper is an interface that expects a Stop method to stop the Storage. // For more details see the documentation in the stop package. 
diff --git a/storage/test/storage_bench.go b/storage/test/storage_bench.go index 253d92e..aba1476 100644 --- a/storage/test/storage_bench.go +++ b/storage/test/storage_bench.go @@ -17,7 +17,7 @@ type benchData struct { peers [1000]bittorrent.Peer } -func generateInfohashes() (a [1000]bittorrent.InfoHash) { +func generateInfoHashes() (a [1000]bittorrent.InfoHash) { for i := range a { b := make([]byte, bittorrent.InfoHashV1Len) rand.Read(b) @@ -63,7 +63,7 @@ type benchHolder struct { func (bh *benchHolder) runBenchmark(b *testing.B, parallel bool, sf benchSetupFunc, ef benchExecFunc) { ps := bh.st() - bd := &benchData{generateInfohashes(), generatePeers()} + bd := &benchData{generateInfoHashes(), generatePeers()} spacing := int32(1000 / runtime.NumCPU()) if sf != nil { err := sf(ps, bd) @@ -134,21 +134,21 @@ func (bh *benchHolder) Put1k(b *testing.B) { }) } -// Put1kInfohash benchmarks the PutSeeder method of a storage.Storage by cycling +// Put1kInfoHash benchmarks the PutSeeder method of a storage.Storage by cycling // through 1000 infohashes and putting the same peer into their swarms. // -// Put1kInfohash can run in parallel. -func (bh *benchHolder) Put1kInfohash(b *testing.B) { +// Put1kInfoHash can run in parallel. +func (bh *benchHolder) Put1kInfoHash(b *testing.B) { bh.runBenchmark(b, true, nil, func(i int, ps storage.Storage, bd *benchData) error { return ps.PutSeeder(bd.infohashes[i%1000], bd.peers[0]) }) } -// Put1kInfohash1k benchmarks the PutSeeder method of a storage.Storage by cycling +// Put1kInfoHash1k benchmarks the PutSeeder method of a storage.Storage by cycling // through 1000 infohashes and 1000 Peers and calling Put with them. // -// Put1kInfohash1k can run in parallel. -func (bh *benchHolder) Put1kInfohash1k(b *testing.B) { +// Put1kInfoHash1k can run in parallel. 
+func (bh *benchHolder) Put1kInfoHash1k(b *testing.B) { bh.runBenchmark(b, true, nil, func(i int, ps storage.Storage, bd *benchData) error { err := ps.PutSeeder(bd.infohashes[i%1000], bd.peers[(i*3)%1000]) return err @@ -183,11 +183,11 @@ func (bh *benchHolder) PutDelete1k(b *testing.B) { }) } -// PutDelete1kInfohash behaves like PutDelete1k with 1000 infohashes instead of +// PutDelete1kInfoHash behaves like PutDelete1k with 1000 infohashes instead of // 1000 Peers. // -// PutDelete1kInfohash can not run in parallel. -func (bh *benchHolder) PutDelete1kInfohash(b *testing.B) { +// PutDelete1kInfoHash can not run in parallel. +func (bh *benchHolder) PutDelete1kInfoHash(b *testing.B) { bh.runBenchmark(b, false, nil, func(i int, ps storage.Storage, bd *benchData) error { err := ps.PutSeeder(bd.infohashes[i%1000], bd.peers[0]) if err != nil { @@ -197,11 +197,11 @@ func (bh *benchHolder) PutDelete1kInfohash(b *testing.B) { }) } -// PutDelete1kInfohash1k behaves like PutDelete1k with 1000 infohashes in +// PutDelete1kInfoHash1k behaves like PutDelete1k with 1000 infohashes in // addition to 1000 Peers. // -// PutDelete1kInfohash1k can not run in parallel. -func (bh *benchHolder) PutDelete1kInfohash1k(b *testing.B) { +// PutDelete1kInfoHash1k can not run in parallel. +func (bh *benchHolder) PutDelete1kInfoHash1k(b *testing.B) { bh.runBenchmark(b, false, nil, func(i int, ps storage.Storage, bd *benchData) error { err := ps.PutSeeder(bd.infohashes[i%1000], bd.peers[(i*3)%1000]) if err != nil { @@ -234,22 +234,22 @@ func (bh *benchHolder) DeleteNonexist1k(b *testing.B) { }) } -// DeleteNonexist1kInfohash benchmarks the DeleteSeeder method of a storage.Storage by +// DeleteNonexist1kInfoHash benchmarks the DeleteSeeder method of a storage.Storage by // attempting to delete one Peer from one of 1000 infohashes. // -// DeleteNonexist1kInfohash can run in parallel. -func (bh *benchHolder) DeleteNonexist1kInfohash(b *testing.B) { +// DeleteNonexist1kInfoHash can run in parallel. 
+func (bh *benchHolder) DeleteNonexist1kInfoHash(b *testing.B) { bh.runBenchmark(b, true, nil, func(i int, ps storage.Storage, bd *benchData) error { _ = ps.DeleteSeeder(bd.infohashes[i%1000], bd.peers[0]) return nil }) } -// DeleteNonexist1kInfohash1k benchmarks the Delete method of a storage.Storage by -// attempting to delete one of 1000 Peers from one of 1000 Infohashes. +// DeleteNonexist1kInfoHash1k benchmarks the Delete method of a storage.Storage by +// attempting to delete one of 1000 Peers from one of 1000 InfoHashes. // -// DeleteNonexist1kInfohash1k can run in parallel. -func (bh *benchHolder) DeleteNonexist1kInfohash1k(b *testing.B) { +// DeleteNonexist1kInfoHash1k can run in parallel. +func (bh *benchHolder) DeleteNonexist1kInfoHash1k(b *testing.B) { bh.runBenchmark(b, true, nil, func(i int, ps storage.Storage, bd *benchData) error { _ = ps.DeleteSeeder(bd.infohashes[i%1000], bd.peers[(i*3)%1000]) return nil @@ -278,23 +278,23 @@ func (bh *benchHolder) GradNonexist1k(b *testing.B) { }) } -// GradNonexist1kInfohash benchmarks the GraduateLeecher method of a storage.Storage -// by attempting to graduate a nonexistent Peer for one of 100 Infohashes. +// GradNonexist1kInfoHash benchmarks the GraduateLeecher method of a storage.Storage +// by attempting to graduate a nonexistent Peer for one of 1000 InfoHashes. // -// GradNonexist1kInfohash can run in parallel. -func (bh *benchHolder) GradNonexist1kInfohash(b *testing.B) { +// GradNonexist1kInfoHash can run in parallel. +func (bh *benchHolder) GradNonexist1kInfoHash(b *testing.B) { bh.runBenchmark(b, true, nil, func(i int, ps storage.Storage, bd *benchData) error { _ = ps.GraduateLeecher(bd.infohashes[i%1000], bd.peers[0]) return nil }) } -// GradNonexist1kInfohash1k benchmarks the GraduateLeecher method of a storage.Storage +// GradNonexist1kInfoHash1k benchmarks the GraduateLeecher method of a storage.Storage // by attempting to graduate one of 1000 nonexistent Peers for one of 1000 // infohashes. 
// -// GradNonexist1kInfohash1k can run in parallel. -func (bh *benchHolder) GradNonexist1kInfohash1k(b *testing.B) { +// GradNonexist1kInfoHash1k can run in parallel. +func (bh *benchHolder) GradNonexist1kInfoHash1k(b *testing.B) { bh.runBenchmark(b, true, nil, func(i int, ps storage.Storage, bd *benchData) error { _ = ps.GraduateLeecher(bd.infohashes[i%1000], bd.peers[(i*3)%1000]) return nil @@ -337,11 +337,11 @@ func (bh *benchHolder) PutGradDelete1k(b *testing.B) { }) } -// PutGradDelete1kInfohash behaves like PutGradDelete with one of 1000 +// PutGradDelete1kInfoHash behaves like PutGradDelete with one of 1000 // infohashes. // -// PutGradDelete1kInfohash can not run in parallel. -func (bh *benchHolder) PutGradDelete1kInfohash(b *testing.B) { +// PutGradDelete1kInfoHash can not run in parallel. +func (bh *benchHolder) PutGradDelete1kInfoHash(b *testing.B) { bh.runBenchmark(b, false, nil, func(i int, ps storage.Storage, bd *benchData) error { err := ps.PutLeecher(bd.infohashes[i%1000], bd.peers[0]) if err != nil { @@ -355,11 +355,11 @@ func (bh *benchHolder) PutGradDelete1kInfohash(b *testing.B) { }) } -// PutGradDelete1kInfohash1k behaves like PutGradDelete with one of 1000 Peers +// PutGradDelete1kInfoHash1k behaves like PutGradDelete with one of 1000 Peers // and one of 1000 infohashes. // -// PutGradDelete1kInfohash can not run in parallel. -func (bh *benchHolder) PutGradDelete1kInfohash1k(b *testing.B) { +// PutGradDelete1kInfoHash1k can not run in parallel. +func (bh *benchHolder) PutGradDelete1kInfoHash1k(b *testing.B) { bh.runBenchmark(b, false, nil, func(i int, ps storage.Storage, bd *benchData) error { err := ps.PutLeecher(bd.infohashes[i%1000], bd.peers[(i*3)%1000]) if err != nil { @@ -403,11 +403,11 @@ func (bh *benchHolder) AnnounceLeecher(b *testing.B) { }) } -// AnnounceLeecher1kInfohash behaves like AnnounceLeecher with one of 1000 +// AnnounceLeecher1kInfoHash behaves like AnnounceLeecher with one of 1000 // infohashes. 
// -// AnnounceLeecher1kInfohash can run in parallel. -func (bh *benchHolder) AnnounceLeecher1kInfohash(b *testing.B) { +// AnnounceLeecher1kInfoHash can run in parallel. +func (bh *benchHolder) AnnounceLeecher1kInfoHash(b *testing.B) { bh.runBenchmark(b, true, putPeers, func(i int, ps storage.Storage, bd *benchData) error { _, err := ps.AnnouncePeers(bd.infohashes[i%1000], false, 50, bd.peers[0]) return err @@ -425,11 +425,11 @@ func (bh *benchHolder) AnnounceSeeder(b *testing.B) { }) } -// AnnounceSeeder1kInfohash behaves like AnnounceSeeder with one of 1000 +// AnnounceSeeder1kInfoHash behaves like AnnounceSeeder with one of 1000 // infohashes. // -// AnnounceSeeder1kInfohash can run in parallel. -func (bh *benchHolder) AnnounceSeeder1kInfohash(b *testing.B) { +// AnnounceSeeder1kInfoHash can run in parallel. +func (bh *benchHolder) AnnounceSeeder1kInfoHash(b *testing.B) { bh.runBenchmark(b, true, putPeers, func(i int, ps storage.Storage, bd *benchData) error { _, err := ps.AnnouncePeers(bd.infohashes[i%1000], true, 50, bd.peers[0]) return err @@ -447,10 +447,10 @@ func (bh *benchHolder) ScrapeSwarm(b *testing.B) { }) } -// ScrapeSwarm1kInfohash behaves like ScrapeSwarm with one of 1000 infohashes. +// ScrapeSwarm1kInfoHash behaves like ScrapeSwarm with one of 1000 infohashes. // -// ScrapeSwarm1kInfohash can run in parallel. -func (bh *benchHolder) ScrapeSwarm1kInfohash(b *testing.B) { +// ScrapeSwarm1kInfoHash can run in parallel. 
+func (bh *benchHolder) ScrapeSwarm1kInfoHash(b *testing.B) { bh.runBenchmark(b, true, putPeers, func(i int, ps storage.Storage, bd *benchData) error { ps.ScrapeSwarm(bd.infohashes[i%1000], bittorrent.IPv4) return nil @@ -463,28 +463,28 @@ func RunBenchmarks(b *testing.B, newStorage benchStorageConstructor) { b.Run("BenchmarkNop", bh.Nop) b.Run("BenchmarkPut", bh.Put) b.Run("BenchmarkPut1k", bh.Put1k) - b.Run("BenchmarkPut1kInfohash", bh.Put1kInfohash) - b.Run("BenchmarkPut1kInfohash1k", bh.Put1kInfohash1k) + b.Run("BenchmarkPut1kInfoHash", bh.Put1kInfoHash) + b.Run("BenchmarkPut1kInfoHash1k", bh.Put1kInfoHash1k) b.Run("BenchmarkPutDelete", bh.PutDelete) b.Run("BenchmarkPutDelete1k", bh.PutDelete1k) - b.Run("BenchmarkPutDelete1kInfohash", bh.PutDelete1kInfohash) - b.Run("BenchmarkPutDelete1kInfohash1k", bh.PutDelete1kInfohash1k) + b.Run("BenchmarkPutDelete1kInfoHash", bh.PutDelete1kInfoHash) + b.Run("BenchmarkPutDelete1kInfoHash1k", bh.PutDelete1kInfoHash1k) b.Run("BenchmarkDeleteNonexist", bh.DeleteNonexist) b.Run("BenchmarkDeleteNonexist1k", bh.DeleteNonexist1k) - b.Run("BenchmarkDeleteNonexist1kInfohash", bh.DeleteNonexist1kInfohash) - b.Run("BenchmarkDeleteNonexist1kInfohash1k", bh.DeleteNonexist1kInfohash1k) + b.Run("BenchmarkDeleteNonexist1kInfoHash", bh.DeleteNonexist1kInfoHash) + b.Run("BenchmarkDeleteNonexist1kInfoHash1k", bh.DeleteNonexist1kInfoHash1k) b.Run("BenchmarkPutGradDelete", bh.PutGradDelete) b.Run("BenchmarkPutGradDelete1k", bh.PutGradDelete1k) - b.Run("BenchmarkPutGradDelete1kInfohash", bh.PutGradDelete1kInfohash) - b.Run("BenchmarkPutGradDelete1kInfohash1k", bh.PutGradDelete1kInfohash1k) + b.Run("BenchmarkPutGradDelete1kInfoHash", bh.PutGradDelete1kInfoHash) + b.Run("BenchmarkPutGradDelete1kInfoHash1k", bh.PutGradDelete1kInfoHash1k) b.Run("BenchmarkGradNonexist", bh.GradNonexist) b.Run("BenchmarkGradNonexist1k", bh.GradNonexist1k) - b.Run("BenchmarkGradNonexist1kInfohash", bh.GradNonexist1kInfohash) - b.Run("BenchmarkGradNonexist1kInfohash1k", 
bh.GradNonexist1kInfohash1k) + b.Run("BenchmarkGradNonexist1kInfoHash", bh.GradNonexist1kInfoHash) + b.Run("BenchmarkGradNonexist1kInfoHash1k", bh.GradNonexist1kInfoHash1k) b.Run("BenchmarkAnnounceLeecher", bh.AnnounceLeecher) - b.Run("BenchmarkAnnounceLeecher1kInfohash", bh.AnnounceLeecher1kInfohash) + b.Run("BenchmarkAnnounceLeecher1kInfoHash", bh.AnnounceLeecher1kInfoHash) b.Run("BenchmarkAnnounceSeeder", bh.AnnounceSeeder) - b.Run("BenchmarkAnnounceSeeder1kInfohash", bh.AnnounceSeeder1kInfohash) + b.Run("BenchmarkAnnounceSeeder1kInfoHash", bh.AnnounceSeeder1kInfoHash) b.Run("BenchmarkScrapeSwarm", bh.ScrapeSwarm) - b.Run("BenchmarkScrapeSwarm1kInfohash", bh.ScrapeSwarm1kInfohash) + b.Run("BenchmarkScrapeSwarm1kInfoHash", bh.ScrapeSwarm1kInfoHash) } diff --git a/storage/test/storage_test_base.go b/storage/test/storage_test_base.go index f17d697..1db27d9 100644 --- a/storage/test/storage_test_base.go +++ b/storage/test/storage_test_base.go @@ -171,64 +171,75 @@ func (th *testHolder) LeecherPutGraduateAnnounceDeleteAnnounce(t *testing.T) { func (th *testHolder) CustomPutContainsLoadDelete(t *testing.T) { for _, c := range testData { - th.st.Put("test", c.peer.String(), c.ih.RawString()) + err := th.st.Put("test", storage.Entry{Key: c.peer.String(), Value: c.ih.RawString()}) + require.Nil(t, err) // check if exist in ctx we put - contains := th.st.Contains("test", c.peer.String()) + contains, err := th.st.Contains("test", c.peer.String()) + require.Nil(t, err) require.True(t, contains) // check if not exist in another ctx - contains = th.st.Contains("", c.peer.String()) + contains, err = th.st.Contains("", c.peer.String()) + require.Nil(t, err) require.False(t, contains) // check value and type in ctx we put - out := th.st.Load("test", c.peer.String()) + out, err := th.st.Load("test", c.peer.String()) + require.Nil(t, err) ih, err := bittorrent.NewInfoHash(out) require.Nil(t, err) require.Equal(t, c.ih, ih) // check value is nil in another ctx - dummy := 
th.st.Load("", c.peer.String()) + dummy, err := th.st.Load("", c.peer.String()) + require.Nil(t, err) require.Nil(t, dummy) - th.st.Delete("test", c.peer.String()) + err = th.st.Delete("test", c.peer.String()) + require.Nil(t, err) - contains = th.st.Contains("peers", c.peer.String()) + contains, err = th.st.Contains("peers", c.peer.String()) + require.Nil(t, err) require.False(t, contains) } } func (th *testHolder) CustomBulkPutContainsLoadDelete(t *testing.T) { - pairs := make([]storage.Pair, 0, len(testData)) - keys := make([]any, 0, len(testData)) + pairs := make([]storage.Entry, 0, len(testData)) + keys := make([]string, 0, len(testData)) for _, c := range testData { key := c.peer.String() keys = append(keys, key) - pairs = append(pairs, storage.Pair{ - Left: key, - Right: c.ih.RawString(), + pairs = append(pairs, storage.Entry{ + Key: key, + Value: c.ih.RawString(), }) } - th.st.BulkPut("test", pairs...) + err := th.st.BulkPut("test", pairs...) + require.Nil(t, err) // check if exist in ctx we put for _, k := range keys { - contains := th.st.Contains("test", k) + contains, err := th.st.Contains("test", k) + require.Nil(t, err) require.True(t, contains) } // check value and type in ctx we put for _, p := range pairs { - out := th.st.Load("test", p.Left) + out, _ := th.st.Load("test", p.Key) ih, err := bittorrent.NewInfoHash(out) require.Nil(t, err) - require.Equal(t, p.Right, ih.RawString()) + require.Equal(t, p.Value, ih.RawString()) } - th.st.Delete("test", keys...) + err = th.st.Delete("test", keys...) + require.Nil(t, err) for _, k := range keys { - contains := th.st.Contains("test", k) + contains, err := th.st.Contains("test", k) + require.Nil(t, err) require.False(t, contains) } }