diff --git a/.github/workflows/lint.yaml b/.github/workflows/lint.yaml index a357920..e2fd4d4 100644 --- a/.github/workflows/lint.yaml +++ b/.github/workflows/lint.yaml @@ -15,7 +15,7 @@ jobs: - uses: "actions/checkout@v3" - uses: "actions/setup-go@v3" with: - go-version: "^1.18" + go-version: "^1.19" - uses: "authzed/actions/gofumpt@main" - uses: "authzed/actions/go-mod-tidy@main" - uses: "authzed/actions/go-generate@main" diff --git a/cmd/mochi-e2e/e2e.go b/cmd/mochi-e2e/e2e.go index 0d68274..9841c72 100644 --- a/cmd/mochi-e2e/e2e.go +++ b/cmd/mochi-e2e/e2e.go @@ -12,6 +12,8 @@ import ( "github.com/anacrolix/torrent/tracker" "github.com/sot-tech/mochi/bittorrent" + + _ "github.com/sot-tech/mochi/pkg/randseed" ) func main() { diff --git a/cmd/mochi/config.go b/cmd/mochi/config.go index b32385e..d51ebaf 100644 --- a/cmd/mochi/config.go +++ b/cmd/mochi/config.go @@ -7,11 +7,12 @@ import ( "gopkg.in/yaml.v3" + fh "github.com/sot-tech/mochi/frontend/http" + fu "github.com/sot-tech/mochi/frontend/udp" "github.com/sot-tech/mochi/pkg/conf" - // Imports to register frontends - _ "github.com/sot-tech/mochi/frontend/http" - _ "github.com/sot-tech/mochi/frontend/udp" + // Seed math random + _ "github.com/sot-tech/mochi/pkg/randseed" // Imports to register middleware hooks. _ "github.com/sot-tech/mochi/middleware/clientapproval" @@ -21,12 +22,12 @@ import ( // Imports to register storage drivers. _ "github.com/sot-tech/mochi/storage/keydb" - _ "github.com/sot-tech/mochi/storage/memory" + sm "github.com/sot-tech/mochi/storage/memory" _ "github.com/sot-tech/mochi/storage/pg" _ "github.com/sot-tech/mochi/storage/redis" ) -// Config represents the configuration used for executing Conf. +// Config represents the configuration used for Server start. type Config struct { // TODO(jzelinskie): Evaluate whether we would like to make // AnnounceInterval and MinAnnounceInterval optional. @@ -42,6 +43,27 @@ type Config struct { PostHooks []conf.NamedMapConfig `yaml:"posthooks"` } +// QuickConfig is the simple configuration for quick start without config file. +// Includes in-memory store, http and udp frontends without any middleware. +var QuickConfig = &Config{ + Frontends: []conf.NamedMapConfig{ + { + Name: fh.Name, + Config: conf.MapConfig{}, + }, + { + Name: fu.Name, + Config: conf.MapConfig{}, + }, + }, + Storage: conf.NamedMapConfig{ + Name: sm.Name, + Config: conf.MapConfig{}, + }, + PreHooks: []conf.NamedMapConfig{}, + PostHooks: []conf.NamedMapConfig{}, +} + // ParseConfigFile returns a new Config given the path to a YAML // configuration file. // diff --git a/cmd/mochi/main.go b/cmd/mochi/main.go index f8564e6..e5bea9f 100644 --- a/cmd/mochi/main.go +++ b/cmd/mochi/main.go @@ -18,23 +18,37 @@ const ( logPrettyArg = "logPretty" logColorsArg = "logColored" configArg = "config" + quickArg = "quick" ) func main() { - var s Server + var err error logOut := flag.String(logOutArg, "stderr", "output for logging, might be 'stderr', 'stdout' or file path") logLevel := flag.String(logLevelArg, "warn", "logging level: trace, debug, info, warn, error, fatal, panic") logPretty := flag.Bool(logPrettyArg, false, "enable log pretty print. used only if 'logOut' set to 'stdout' or 'stderr'. if not set, log outputs json") + //goland:noinspection GoBoolExpressions logColored := flag.Bool(logColorsArg, runtime.GOOS == "windows", "enable log coloring. used only if set 'logPretty'") configPath := flag.String(configArg, "/etc/mochi.yaml", "location of configuration file") + quickStart := flag.Bool(quickArg, false, "start tracker with default configuration (all frontends, in-memory store, no hooks)") flag.Parse() - if err := l.ConfigureLogger(*logOut, *logLevel, *logPretty, *logColored); err != nil { + if err = l.ConfigureLogger(*logOut, *logLevel, *logPretty, *logColored); err != nil { log.Fatal("unable to configure logger: ", err) } - if err := s.Run(*configPath); err != nil { + var cfg *Config + if *quickStart { + cfg = QuickConfig + } else { + cfg, err = ParseConfigFile(*configPath) + if err != nil { + log.Fatal("unable to read config file: ", err) + } + } + var s Server + + if err = s.Run(cfg); err != nil { log.Fatal("unable to start server: ", err) } defer s.Shutdown() diff --git a/cmd/mochi/server.go b/cmd/mochi/server.go index 94a618c..6f16750 100644 --- a/cmd/mochi/server.go +++ b/cmd/mochi/server.go @@ -25,12 +25,7 @@ type Server struct { // Run begins an instance of Conf. // It is optional to provide an instance of the peer store to avoid the // creation of a new one. -func (r *Server) Run(configFilePath string) error { - cfg, err := ParseConfigFile(configFilePath) - if err != nil { - return fmt.Errorf("failed to read config: %w", err) - } - +func (r *Server) Run(cfg *Config) (err error) { if len(cfg.MetricsAddr) > 0 { log.Info().Str("address", cfg.MetricsAddr).Msg("starting metrics server") r.frontends = append(r.frontends, metrics.NewServer(cfg.MetricsAddr)) diff --git a/dist/example_config.yaml b/dist/example_config.yaml index 3e92b1c..558373c 100644 --- a/dist/example_config.yaml +++ b/dist/example_config.yaml @@ -34,10 +34,14 @@ frontends: tls_key_path: "" # Enable SO_REUSEPORT to allow starting multiple mochi instances with the same HTTP(S) port. - # You can also use this parameter to define two or mote listeners for the same address and port, - # and (possibly) increase throughput. + # You can also use this parameter to define two or more listeners or separate processes + # for the same address and port, and (possibly) increase throughput (faster queue processing + # because of multiple 'workers'). reuse_port: true + # Number of connection listeners to start. See reuse_port, default is 1. + workers: 1 + # The timeout durations for HTTP requests. read_timeout: 5s write_timeout: 5s @@ -110,10 +114,14 @@ frontends: addr: "0.0.0.0:6969" # Enable SO_REUSEPORT to allow starting multiple mochi instances with the same UDP port. - # You can also use this parameter to define two or mote listeners for the same address and port, - # and (a little) increase throughput (faster queue processing because of multiple 'workers'). + # You can also use this parameter to define two or more listeners or separate processes + # for the same address and port, and (a little) increase throughput (faster queue processing + # because of multiple 'workers'). reuse_port: true + # Number of connection listeners to start. See reuse_port, default is 1. + workers: 1 + # The leeway for a timestamp on a connection ID. max_clock_skew: 10s diff --git a/dist/example_config_pg.yaml b/dist/example_config_pg.yaml index 26647f7..e830291 100644 --- a/dist/example_config_pg.yaml +++ b/dist/example_config_pg.yaml @@ -14,6 +14,7 @@ frontends: tls_cert_path: "" tls_key_path: "" reuse_port: true + workers: 1 read_timeout: 5s write_timeout: 5s enable_keepalive: false @@ -36,6 +37,7 @@ frontends: config: addr: "0.0.0.0:6969" reuse_port: true + workers: 1 max_clock_skew: 10s private_key: "paste a random string here that will be used to hmac connection IDs" enable_request_timing: false diff --git a/dist/example_config_redis.yaml b/dist/example_config_redis.yaml index 792e047..adbbde9 100644 --- a/dist/example_config_redis.yaml +++ b/dist/example_config_redis.yaml @@ -14,6 +14,7 @@ frontends: tls_cert_path: "" tls_key_path: "" reuse_port: true + workers: 1 read_timeout: 5s write_timeout: 5s enable_keepalive: false @@ -36,6 +37,7 @@ frontends: config: addr: "0.0.0.0:6969" reuse_port: true + workers: 1 max_clock_skew: 10s private_key: "paste a random string here that will be used to hmac connection IDs" enable_request_timing: false diff --git a/frontend/frontend.go b/frontend/frontend.go index b00c1e1..9276ed3 100644 --- a/frontend/frontend.go +++ b/frontend/frontend.go @@ -3,8 +3,10 @@ package frontend import ( + "errors" "fmt" "io" + "strings" "sync" "github.com/sot-tech/mochi/middleware" @@ -71,3 +73,34 @@ func NewFrontends(configs []conf.NamedMapConfig, logic *middleware.Logic) (fs [] } return } + +// CloseGroup simultaneously calls Close for each non-nil +// array element and combines non-nil errors into one +func CloseGroup(cls []io.Closer) (err error) { + l := len(cls) + errs := make([]error, l) + wg := sync.WaitGroup{} + wg.Add(l) + for i, c := range cls { + if c != nil { + go func(i int, c io.Closer) { + defer wg.Done() + if e := c.Close(); e != nil { + errs[i] = e + } + }(i, c) + } + } + wg.Wait() + sb := strings.Builder{} + for _, e := range errs { + if e != nil { + sb.WriteString(e.Error()) + sb.WriteString("; ") + } + } + if sb.Len() > 0 { + err = errors.New(sb.String()) + } + return +} diff --git a/frontend/http/frontend.go b/frontend/http/frontend.go index 4e39661..3eea926 100644 --- a/frontend/http/frontend.go +++ b/frontend/http/frontend.go @@ -6,8 +6,10 @@ import ( "context" "crypto/tls" "errors" + "io" "net/http" "net/netip" + "sync" "time" "github.com/julienschmidt/httprouter" @@ -20,14 +22,16 @@ import ( "github.com/sot-tech/mochi/pkg/metrics" ) +// Name - registered name of the frontend +const Name = "http" + var ( - logger = log.NewLogger("frontend/http") - errTLSNotProvided = errors.New("tls certificate/key not provided") - errRoutesNotProvided = errors.New("routes not provided") + logger = log.NewLogger("frontend/http") + errTLSNotProvided = errors.New("tls certificate/key not provided") ) func init() { - frontend.RegisterBuilder("http", NewFrontend) + frontend.RegisterBuilder(Name, NewFrontend) } // Config represents all configurable options for an HTTP BitTorrent Frontend @@ -44,15 +48,17 @@ type Config struct { ParseOptions } -const defaultIdleTimeout = 30 * time.Second +const ( + defaultIdleTimeout = 30 * time.Second + defaultAnnounceRoute = "/announce" + defaultScrapeRoute = "/scrape" +) // Validate sanity checks values set in a config and returns a new config with // default values replacing anything that is invalid. func (cfg Config) Validate() (validCfg Config, err error) { validCfg = cfg - if validCfg.ListenOptions, err = cfg.ListenOptions.Validate(); err != nil { - return - } + validCfg.ListenOptions = cfg.ListenOptions.Validate(false, logger) if cfg.UseTLS && (len(cfg.TLSCertPath) == 0 || len(cfg.TLSKeyPath) == 0) { err = errTLSNotProvided return @@ -68,14 +74,44 @@ func (cfg Config) Validate() (validCfg Config, err error) { Msg("falling back to default configuration") } } - validCfg.ParseOptions.ParseOptions = cfg.ParseOptions.ParseOptions.Validate() + if len(cfg.AnnounceRoutes) == 0 { + validCfg.AnnounceRoutes = []string{defaultAnnounceRoute} + logger.Warn(). + Str("name", "AnnounceRoutes"). + Strs("provided", cfg.AnnounceRoutes). + Strs("default", validCfg.AnnounceRoutes). + Msg("falling back to default configuration") + } + if len(cfg.ScrapeRoutes) == 0 { + validCfg.ScrapeRoutes = []string{defaultScrapeRoute} + logger.Warn(). + Str("name", "ScrapeRoutes"). + Strs("provided", cfg.ScrapeRoutes). + Strs("default", validCfg.ScrapeRoutes). + Msg("falling back to default configuration") + } + validCfg.ParseOptions.ParseOptions = cfg.ParseOptions.ParseOptions.Validate(logger) + return +} + +// httpServer replaces http.Close method with http.Shutdown +type httpServer struct { + *http.Server +} + +func (c httpServer) Close() (err error) { + if c.Server != nil { + err = c.Shutdown(context.Background()) + } return } type httpFE struct { - srv *http.Server + servers []httpServer logic *middleware.Logic collectTimings bool + onceCloser sync.Once + ParseOptions } @@ -89,31 +125,37 @@ func NewFrontend(c conf.MapConfig, logic *middleware.Logic) (frontend.Frontend, if cfg, err = cfg.Validate(); err != nil { return nil, err } - if len(cfg.AnnounceRoutes) < 1 || len(cfg.ScrapeRoutes) < 1 { - return nil, errRoutesNotProvided - } f := &httpFE{ - logic: logic, - srv: &http.Server{ - ReadTimeout: cfg.ReadTimeout, - ReadHeaderTimeout: cfg.ReadTimeout, - WriteTimeout: cfg.WriteTimeout, - IdleTimeout: cfg.IdleTimeout, - }, + logic: logic, + servers: make([]httpServer, cfg.Workers), collectTimings: cfg.EnableRequestTiming, ParseOptions: cfg.ParseOptions, } + for i := range f.servers { + f.servers[i] = httpServer{ + &http.Server{ + ReadTimeout: cfg.ReadTimeout, + ReadHeaderTimeout: cfg.ReadTimeout, + WriteTimeout: cfg.WriteTimeout, + IdleTimeout: cfg.IdleTimeout, + }, + } + } + // If TLS is enabled, create a key pair. if cfg.UseTLS { var cert tls.Certificate if cert, err = tls.LoadX509KeyPair(cfg.TLSCertPath, cfg.TLSKeyPath); err != nil { return nil, err } - f.srv.TLSConfig = &tls.Config{ - Certificates: []tls.Certificate{cert}, - MinVersion: tls.VersionTLS12, + certs := []tls.Certificate{cert} + for i := range f.servers { + f.servers[i].TLSConfig = &tls.Config{ + Certificates: certs, + MinVersion: tls.VersionTLS12, + } } } @@ -128,30 +170,41 @@ func NewFrontend(c conf.MapConfig, logic *middleware.Logic) (frontend.Frontend, router.GET(route, f.ping) router.HEAD(route, f.ping) } - f.srv.Handler = router - f.srv.SetKeepAlivesEnabled(cfg.EnableKeepAlive) - - go func() { - ln, err := cfg.ListenTCP() - if err == nil { - if f.srv.TLSConfig == nil { - err = f.srv.Serve(ln) - } else { - err = f.srv.ServeTLS(ln, "", "") - } - } - if !errors.Is(err, http.ErrServerClosed) { - logger.Fatal().Err(err).Msg("server failed") - } - }() + for _, srv := range f.servers { + srv.Handler = router + srv.SetKeepAlivesEnabled(cfg.EnableKeepAlive) + go runServer(srv, &cfg) + } return f, nil } -// Close provides a thread-safe way to shut down a currently running Frontend. -func (f *httpFE) Close() error { - return f.srv.Shutdown(context.Background()) +func runServer(s httpServer, cfg *Config) { + ln, err := cfg.ListenTCP() + if err == nil { + if s.TLSConfig == nil { + err = s.Serve(ln) + } else { + err = s.ServeTLS(ln, "", "") + } + } + if !errors.Is(err, http.ErrServerClosed) { + logger.Fatal().Err(err).Msg("server failed") + } +} + +// Close provides a thread-safe way to gracefully shut down a currently running Frontend. +func (f *httpFE) Close() (err error) { + f.onceCloser.Do(func() { + cls := make([]io.Closer, len(f.servers)) + for i, s := range f.servers { + cls[i] = s + } + err = frontend.CloseGroup(cls) + }) + + return } func httpParamsToRouteParams(in httprouter.Params) (out bittorrent.RouteParams) { diff --git a/frontend/options.go b/frontend/options.go index c104853..c6ca37a 100644 --- a/frontend/options.go +++ b/frontend/options.go @@ -6,23 +6,22 @@ import ( "time" "github.com/libp2p/go-reuseport" + "github.com/sot-tech/mochi/pkg/log" ) const ( - defaultReadTimeout = 2 * time.Second - defaultWriteTimeout = 2 * time.Second + defaultReadTimeout = 2 * time.Second + defaultWriteTimeout = 2 * time.Second + defaultListenAddress = ":6969" ) -var ( - // ErrAddressNotProvided returned if listen address not provided in configuration - ErrAddressNotProvided = errors.New("address not provided") - errUnexpectedListenerType = errors.New("unexpected listener type") -) +var errUnexpectedListenerType = errors.New("unexpected listener type") // ListenOptions is the base configuration which may be used in net listeners type ListenOptions struct { Addr string - ReusePort bool `cfg:"reuse_port"` + ReusePort bool `cfg:"reuse_port"` + Workers uint ReadTimeout time.Duration `cfg:"read_timeout"` WriteTimeout time.Duration `cfg:"write_timeout"` EnableRequestTiming bool `cfg:"enable_request_timing"` @@ -30,11 +29,24 @@ type ListenOptions struct { // Validate checks if listen address provided and sets default // timeout options if needed -func (lo ListenOptions) Validate() (validOptions ListenOptions, err error) { +func (lo ListenOptions) Validate(ignoreTimeouts bool, logger *log.Logger) (validOptions ListenOptions) { validOptions = lo if len(lo.Addr) == 0 { - err = ErrAddressNotProvided - } else { + validOptions.Addr = defaultListenAddress + logger.Warn(). + Str("name", "Addr"). + Str("provided", lo.Addr). + Str("default", validOptions.Addr). + Msg("falling back to default configuration") + } + if lo.Workers == 0 { + validOptions.Workers = 1 + } + if lo.Workers > 1 && !lo.ReusePort { + validOptions.ReusePort = true + logger.Warn().Msg("forcibly enabling ReusePort because Workers > 1") + } + if !ignoreTimeouts { if lo.ReadTimeout <= 0 { validOptions.ReadTimeout = defaultReadTimeout logger.Warn(). @@ -111,7 +123,7 @@ type ParseOptions struct { // Validate sanity checks values set in a config and returns a new config with // default values replacing anything that is invalid. -func (op ParseOptions) Validate() ParseOptions { +func (op ParseOptions) Validate(logger *log.Logger) ParseOptions { valid := op if op.MaxNumWant <= 0 { valid.MaxNumWant = defaultMaxNumWant diff --git a/frontend/udp/connection_id.go b/frontend/udp/connection_id.go index de5df66..6892477 100644 --- a/frontend/udp/connection_id.go +++ b/frontend/udp/connection_id.go @@ -4,16 +4,29 @@ import ( "crypto/hmac" "encoding/binary" "hash" + "math/rand" "net/netip" "time" - "github.com/minio/sha256-simd" + "github.com/cespare/xxhash/v2" "github.com/sot-tech/mochi/pkg/log" + "github.com/sot-tech/mochi/pkg/xorshift" ) // ttl is the duration a connection ID should be valid according to BEP 15. -const ttl = 2 * time.Minute +var ttl = int64(2 * time.Minute) + +const ( + // length of connection ID + connIDLen = 8 + // uint64 length + 1 byte salt + buffLen = 9 + // 16 bytes enough for hashes with output length up to 128bit + scratchLen = 16 + // length of HMAC in bytes to place it in connection ID + hmacLen = 5 +) // A ConnectionIDGenerator is a reusable generator and validator for connection // IDs as described in BEP 15. @@ -33,17 +46,31 @@ type ConnectionIDGenerator struct { // It will be overwritten by subsequent calls to Generate. connID []byte + // buffer for HMAC input + buff []byte + // scratch is a 32-byte slice that is used as a scratchpad for the generated - // HMACs. + // HMACs to increase hash performance. scratch []byte + + // the leeway for a timestamp on a connection ID. + maxClockSkew int64 + + // PRNG footprint holder + s uint64 } // NewConnectionIDGenerator creates a new connection ID generator. -func NewConnectionIDGenerator(key string) *ConnectionIDGenerator { +func NewConnectionIDGenerator(key []byte, maxClockSkew time.Duration) *ConnectionIDGenerator { return &ConnectionIDGenerator{ - mac: hmac.New(sha256.New, []byte(key)), - connID: make([]byte, 8), - scratch: make([]byte, 32), + mac: hmac.New(func() hash.Hash { + return xxhash.New() + }, key), + connID: make([]byte, connIDLen), + buff: make([]byte, buffLen), + scratch: make([]byte, scratchLen), + maxClockSkew: int64(maxClockSkew), + s: rand.Uint64(), } } @@ -52,60 +79,69 @@ func NewConnectionIDGenerator(key string) *ConnectionIDGenerator { // it after getting a generator from a pool. func (g *ConnectionIDGenerator) reset() { g.mac.Reset() - g.connID = g.connID[:8] + g.connID = g.connID[:connIDLen] + g.buff = g.buff[:buffLen] g.scratch = g.scratch[:0] } // Generate generates an 8-byte connection ID as described in BEP 15 for the // given IP and the current time. // -// The first 4 bytes of the connection identifier is a unix timestamp and the -// last 4 bytes are a truncated HMAC token created from the aforementioned -// unix timestamp and the source IP address of the UDP packet. +// The first byte is random salt, next 2 bytes - truncated unix timestamp +// when ID was generated, last 5 bytes are a truncated HMAC token created +// from salt (1 byte), full unix timestamp (8 bytes) and source IP (4/16 bytes). +// +// Salt used to mitigate generation same MAC if there are several clients +// from same IP sent requests within one second. // // Truncated HMAC is known to be safe for 2^(-n) where n is the size in bits -// of the truncated HMAC token. In this use case we have 32 bits, thus a +// of the truncated HMAC token. In this use case we have 40 bits, thus a // forgery probability of approximately 1 in 4 billion. // -// The generated ID is written to g.connID, which is also returned. g.connID +// The generated ID is written to g.buffer, which is also returned. g.buffer // will be reused, so it must not be referenced after returning the generator // to a pool and will be overwritten be subsequent calls to Generate! -func (g *ConnectionIDGenerator) Generate(ip netip.Addr, now time.Time) []byte { +func (g *ConnectionIDGenerator) Generate(ip netip.Addr, now time.Time) (out []byte) { g.reset() + var r uint64 + r, g.s = xorshift.XorShift64S(g.s) + g.buff[0] = byte(r) + binary.BigEndian.PutUint64(g.buff[1:], uint64(now.Unix())) + g.mac.Write(g.buff) + g.mac.Write(ip.AsSlice()) - binary.BigEndian.PutUint32(g.connID, uint32(now.Unix())) - - g.mac.Write(g.connID[:4]) - ipBytes, _ := ip.MarshalBinary() - g.mac.Write(ipBytes) g.scratch = g.mac.Sum(g.scratch) - copy(g.connID[4:8], g.scratch[:4]) + g.connID[0], g.connID[1], g.connID[2] = g.buff[0], g.buff[7], g.buff[8] + copy(g.connID[connIDLen-hmacLen:], g.scratch[:hmacLen]) log.Debug(). Stringer("ip", ip). - Time("now", now). - Bytes("connID", g.connID). + Hex("connID", g.connID). Msg("generated connection ID") - return g.connID + return g.connID[:connIDLen] } // Validate validates the given connection ID for an IP and the current time. -func (g *ConnectionIDGenerator) Validate(connectionID []byte, ip netip.Addr, now time.Time, maxClockSkew time.Duration) bool { - ts := time.Unix(int64(binary.BigEndian.Uint32(connectionID[:4])), 0) +func (g *ConnectionIDGenerator) Validate(connectionID []byte, ip netip.Addr, now time.Time) bool { + g.reset() + nowTS := now.Unix() + g.buff[0] = connectionID[0] + // connectionID contains only 2 bytes of timestamp, so we clean little 16 bits to place it and rehash. + // We will provide restored full timestamp respectively to current timestamp, + // 2 bytes should be enough to avoid collisions within ~18 hours from same IP. + ts := nowTS&((^int64(0)>>16)<<16) | int64(connectionID[1])<<8 | int64(connectionID[2]) + binary.BigEndian.PutUint64(g.buff[1:], uint64(ts)) + g.mac.Write(g.buff) + g.mac.Write(ip.AsSlice()) + g.scratch = g.mac.Sum(g.scratch) + res := hmac.Equal(g.scratch[:hmacLen], connectionID[connIDLen-hmacLen:connIDLen]) + // ts-skew < now < ts+ttl+skew + res = ts-g.maxClockSkew < nowTS && res + res = nowTS < ts+ttl+g.maxClockSkew && res log.Debug(). Stringer("ip", ip). - Time("ts", ts).Time("now", now). - Bytes("connID", g.connID). + Hex("connID", connectionID). + Bool("result", res). Msg("validating connection ID") - if now.After(ts.Add(ttl)) || ts.After(now.Add(maxClockSkew)) { - return false - } - - g.reset() - - g.mac.Write(connectionID[:4]) - ipBytes, _ := ip.MarshalBinary() - g.mac.Write(ipBytes) - g.scratch = g.mac.Sum(g.scratch) - return hmac.Equal(g.scratch[:4], connectionID[4:]) + return res } diff --git a/frontend/udp/connection_id_test.go b/frontend/udp/connection_id_test.go index 90f1022..8b38b05 100644 --- a/frontend/udp/connection_id_test.go +++ b/frontend/udp/connection_id_test.go @@ -4,65 +4,69 @@ import ( "crypto/hmac" "encoding/binary" "fmt" + "hash" + "math/rand" "net/netip" "sync" "testing" "time" - "github.com/minio/sha256-simd" - "github.com/stretchr/testify/require" - + "github.com/cespare/xxhash/v2" "github.com/sot-tech/mochi/pkg/log" + _ "github.com/sot-tech/mochi/pkg/randseed" + "github.com/stretchr/testify/require" ) var golden = []struct { createdAt int64 now int64 ip string - key string + key []byte valid bool }{ - {0, 1, "127.0.0.1", "", true}, - {0, 420420, "127.0.0.1", "", false}, - {0, 0, "::1", "", true}, + {0, 1, "127.0.0.1", []byte(""), true}, + {0, 420420, "127.0.0.1", []byte(""), false}, + {0, 0, "::1", []byte(""), true}, } // NewConnectionID creates an 8-byte connection identifier for UDP packets as // described by BEP 15. // This is a wrapper around creating a new ConnectionIDGenerator and generating // an ID. It is recommended to use the generator for performance. -func NewConnectionID(ip netip.Addr, now time.Time, key string) []byte { - return NewConnectionIDGenerator(key).Generate(ip, now) +func NewConnectionID(ip netip.Addr, now time.Time, key []byte) []byte { + return NewConnectionIDGenerator(key, 0).Generate(ip, now) } // ValidConnectionID determines whether a connection identifier is legitimate. // This is a wrapper around creating a new ConnectionIDGenerator and validating // the ID. It is recommended to use the generator for performance. -func ValidConnectionID(connectionID []byte, ip netip.Addr, now time.Time, maxClockSkew time.Duration, key string) bool { - return NewConnectionIDGenerator(key).Validate(connectionID, ip, now, maxClockSkew) +func ValidConnectionID(connectionID []byte, ip netip.Addr, now time.Time, maxClockSkew time.Duration, key []byte) bool { + return NewConnectionIDGenerator(key, maxClockSkew).Validate(connectionID, ip, now) } // simpleNewConnectionID generates a new connection ID the explicit way. // This is used to verify correct behaviour of the generator. -func simpleNewConnectionID(ip netip.Addr, now time.Time, key string) []byte { - buf := make([]byte, 8) - binary.BigEndian.PutUint32(buf, uint32(now.Unix())) - - mac := hmac.New(sha256.New, []byte(key)) - mac.Write(buf[:4]) - ipBytes, _ := ip.MarshalBinary() - mac.Write(ipBytes) - macBytes := mac.Sum(nil)[:4] - copy(buf[4:], macBytes) +func simpleNewConnectionID(ip netip.Addr, now time.Time, key []byte) []byte { + buffer := make([]byte, 9) + mac := hmac.New(func() hash.Hash { + return xxhash.New() + }, key) + buffer[0] = byte(rand.Int()) + binary.BigEndian.PutUint64(buffer[1:], uint64(now.Unix())) + mac.Write(buffer) + mac.Write(ip.AsSlice()) + buffer[1], buffer[2] = buffer[7], buffer[8] + copy(buffer[3:8], mac.Sum(nil)) + buffer = buffer[:8] // this is just in here because logging impacts performance and we benchmark // this version too. log.Debug(). Stringer("ip", ip). - Time("now", now). - Bytes("connID", buf). - Msg("manually generated connection ID") - return buf + Time("ts", now). + Hex("connID", buffer). + Msg("generated connection ID") + return buffer } func TestVerification(t *testing.T) { @@ -82,7 +86,7 @@ func TestGeneration(t *testing.T) { t.Run(fmt.Sprintf("%s created at %d", tt.ip, tt.createdAt), func(t *testing.T) { want := simpleNewConnectionID(netip.MustParseAddr(tt.ip), time.Unix(tt.createdAt, 0), tt.key) got := NewConnectionID(netip.MustParseAddr(tt.ip), time.Unix(tt.createdAt, 0), tt.key) - require.Equal(t, want, got) + require.NotEqual(t, want, got) // IDs should NOT be equal because of salt }) } } @@ -93,11 +97,11 @@ func TestReuseGeneratorGenerate(t *testing.T) { cid := NewConnectionID(netip.MustParseAddr(tt.ip), time.Unix(tt.createdAt, 0), tt.key) require.Len(t, cid, 8) - gen := NewConnectionIDGenerator(tt.key) + gen := NewConnectionIDGenerator(tt.key, 0) for i := 0; i < 3; i++ { connID := gen.Generate(netip.MustParseAddr(tt.ip), time.Unix(tt.createdAt, 0)) - require.Equal(t, cid, connID) + require.NotEqual(t, cid, connID) // IDs should NOT be equal because of salt } }) } @@ -106,10 +110,10 @@ func TestReuseGeneratorGenerate(t *testing.T) { func TestReuseGeneratorValidate(t *testing.T) { for _, tt := range golden { t.Run(fmt.Sprintf("%s created at %d verified at %d", tt.ip, tt.createdAt, tt.now), func(t *testing.T) { - gen := NewConnectionIDGenerator(tt.key) + gen := NewConnectionIDGenerator(tt.key, time.Minute) cid := gen.Generate(netip.MustParseAddr(tt.ip), time.Unix(tt.createdAt, 0)) for i := 0; i < 3; i++ { - got := gen.Validate(cid, netip.MustParseAddr(tt.ip), time.Unix(tt.now, 0), time.Minute) + got := gen.Validate(cid, netip.MustParseAddr(tt.ip), time.Unix(tt.now, 0)) if got != tt.valid { t.Errorf("expected validity: %t got validity: %t", tt.valid, got) } @@ -120,7 +124,7 @@ func TestReuseGeneratorValidate(t *testing.T) { func BenchmarkSimpleNewConnectionID(b *testing.B) { ip := netip.MustParseAddr("127.0.0.1") - key := "some random string that is hopefully at least this long" + key := []byte("some random string that is hopefully at least this long") createdAt := time.Now() b.RunParallel(func(pb *testing.PB) { @@ -137,7 +141,7 @@ func BenchmarkSimpleNewConnectionID(b *testing.B) { func BenchmarkNewConnectionID(b *testing.B) { ip := netip.MustParseAddr("127.0.0.1") - key := "some random string that is hopefully at least this long" + key := []byte("some random string that is hopefully at least this long") createdAt := time.Now() b.RunParallel(func(pb *testing.PB) { @@ -154,12 +158,12 @@ func BenchmarkNewConnectionID(b *testing.B) { func BenchmarkConnectionIDGenerator_Generate(b *testing.B) { ip := netip.MustParseAddr("127.0.0.1") - key := "some random string that is hopefully at least this long" + key := []byte("some random string that is hopefully at least this long") createdAt := time.Now() pool := &sync.Pool{ New: func() any { - return NewConnectionIDGenerator(key) + return NewConnectionIDGenerator(key, 0) }, } @@ -176,7 +180,7 @@ func BenchmarkConnectionIDGenerator_Generate(b *testing.B) { func BenchmarkValidConnectionID(b *testing.B) { ip := netip.MustParseAddr("127.0.0.1") - key := "some random string that is hopefully at least this long" + key := []byte("some random string that is hopefully at least this long") createdAt := time.Now() cid := NewConnectionID(ip, createdAt, key) @@ -191,20 +195,20 @@ func BenchmarkValidConnectionID(b *testing.B) { func BenchmarkConnectionIDGenerator_Validate(b *testing.B) { ip := netip.MustParseAddr("127.0.0.1") - key := "some random string that is hopefully at least this long" + key := []byte("some random string that is hopefully at least this long") createdAt := time.Now() cid := NewConnectionID(ip, createdAt, key) pool := &sync.Pool{ New: func() any { - return NewConnectionIDGenerator(key) + return NewConnectionIDGenerator(key, 10*time.Second) }, } b.RunParallel(func(pb *testing.PB) { for pb.Next() { gen := pool.Get().(*ConnectionIDGenerator) - if !gen.Validate(cid, ip, createdAt, 10*time.Second) { + if !gen.Validate(cid, ip, createdAt) { b.FailNow() } pool.Put(gen) diff --git a/frontend/udp/frontend.go b/frontend/udp/frontend.go index 97c960e..6ee2360 100644 --- a/frontend/udp/frontend.go +++ b/frontend/udp/frontend.go @@ -7,6 +7,7 @@ import ( "context" "encoding/binary" "errors" + "io" "math/rand" "net" "net/netip" @@ -23,13 +24,21 @@ import ( "github.com/sot-tech/mochi/pkg/timecache" ) +const ( + // Name - registered name of the frontend + Name = "udp" + defaultKeyLen = 32 + maxAllowedClockSkew = 30 * time.Second + defaultMaxClockSkew = 10 * time.Second +) + var ( logger = log.NewLogger("frontend/udp") allowedGeneratedPrivateKeyRunes = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890") ) func init() { - frontend.RegisterBuilder("udp", NewFrontend) + frontend.RegisterBuilder(Name, NewFrontend) } // Config represents all the configurable options for a UDP BitTorrent @@ -43,17 +52,13 @@ type Config struct { // Validate sanity checks values set in a config and returns a new config with // default values replacing anything that is invalid. -func (cfg Config) Validate() (validCfg Config, err error) { - if len(cfg.Addr) == 0 { - err = frontend.ErrAddressNotProvided - return - } - +func (cfg Config) Validate() (validCfg Config) { validCfg = cfg + validCfg.ListenOptions = cfg.ListenOptions.Validate(true, logger) // Generate a private key if one isn't provided by the user. if cfg.PrivateKey == "" { - pkeyRunes := make([]rune, 64) + pkeyRunes := make([]rune, defaultKeyLen) for i := range pkeyRunes { pkeyRunes[i] = allowedGeneratedPrivateKeyRunes[rand.Intn(len(allowedGeneratedPrivateKeyRunes))] } @@ -62,23 +67,35 @@ func (cfg Config) Validate() (validCfg Config, err error) { logger.Warn(). Str("name", "PrivateKey"). Str("provided", ""). - Str("key", validCfg.PrivateKey). + Str("default", validCfg.PrivateKey). Msg("falling back to default configuration") } - validCfg.ParseOptions = cfg.ParseOptions.Validate() + // ABS + sb := cfg.MaxClockSkew >> 63 + validCfg.MaxClockSkew = (cfg.MaxClockSkew ^ sb) + (sb & 1) + + if validCfg.MaxClockSkew == 0 || validCfg.MaxClockSkew > maxAllowedClockSkew { + validCfg.MaxClockSkew = defaultMaxClockSkew + logger.Warn(). + Str("name", "MaxClockSkew"). + Dur("provided", cfg.MaxClockSkew). + Dur("default", validCfg.MaxClockSkew). + Msg("falling back to default configuration") + } + + validCfg.ParseOptions = cfg.ParseOptions.Validate(logger) return } // udpFE holds the state of a UDP BitTorrent Frontend. type udpFE struct { - socket *net.UDPConn + sockets []*net.UDPConn closing chan any wg sync.WaitGroup genPool *sync.Pool logic *middleware.Logic - maxClockSkew time.Duration collectTimings bool ctxCancel context.CancelFunc onceCloser sync.Once @@ -92,47 +109,56 @@ func NewFrontend(c conf.MapConfig, logic *middleware.Logic) (frontend.Frontend, if err = c.Unmarshal(&cfg); err != nil { return nil, err } - if cfg, err = cfg.Validate(); err != nil { - return nil, err - } + cfg = cfg.Validate() + pKey := []byte(cfg.PrivateKey) f := &udpFE{ + sockets: make([]*net.UDPConn, cfg.Workers), closing: make(chan any), logic: logic, - maxClockSkew: cfg.MaxClockSkew, collectTimings: cfg.EnableRequestTiming, ParseOptions: cfg.ParseOptions, genPool: &sync.Pool{ New: func() any { - return NewConnectionIDGenerator(cfg.PrivateKey) + return NewConnectionIDGenerator(pKey, cfg.MaxClockSkew) }, }, } - if f.socket, err = cfg.ListenUDP(); err == nil { - var ctx context.Context - ctx, f.ctxCancel = context.WithCancel(context.Background()) - f.wg.Add(1) - go func(ctx context.Context) { - if err := f.serve(ctx); err != nil { - logger.Fatal().Err(err).Msg("server failed") - } - }(ctx) + var ctx context.Context + ctx, f.ctxCancel = context.WithCancel(context.Background()) + for i := range f.sockets { + if f.sockets[i], err = cfg.ListenUDP(); err == nil { + f.wg.Add(1) + go func(socket *net.UDPConn, ctx context.Context) { + if err := f.serve(ctx, socket); err != nil { + logger.Fatal().Err(err).Msg("server failed") + } + }(f.sockets[i], ctx) + } + } + if err != nil { + _ = f.Close() } return f, err } // Close provides a thread-safe way to shut down a currently running Frontend. -func (t *udpFE) Close() (err error) { - t.onceCloser.Do(func() { - close(t.closing) - if t.socket != nil { - t.ctxCancel() - _ = t.socket.SetReadDeadline(time.Now()) - t.wg.Wait() - err = t.socket.Close() +func (f *udpFE) Close() (err error) { + f.onceCloser.Do(func() { + close(f.closing) + f.ctxCancel() + cls := make([]io.Closer, 0, len(f.sockets)) + now := time.Now() + for _, s := range f.sockets { + if s != nil { + _ = s.SetDeadline(now) + cls = append(cls, s) + } } + f.wg.Wait() + err = frontend.CloseGroup(cls) }) return @@ -140,14 +166,14 @@ func (t *udpFE) Close() (err error) { // serve blocks while listening and serving UDP BitTorrent requests // until Stop() is called or an error is returned. -func (t *udpFE) serve(ctx context.Context) error { +func (f *udpFE) serve(ctx context.Context, socket *net.UDPConn) error { pool := bytepool.NewBytePool(2048) - defer t.wg.Done() + defer f.wg.Done() for { // Check to see if we need shutdown. select { - case <-t.closing: + case <-f.closing: log.Debug().Msg("serve received shutdown signal") return nil default: @@ -155,7 +181,7 @@ func (t *udpFE) serve(ctx context.Context) error { // Read a UDP packet into a reusable buffer. buffer := pool.Get() - n, addrPort, err := t.socket.ReadFromUDPAddrPort(*buffer) + n, addrPort, err := socket.ReadFromUDPAddrPort(*buffer) if err != nil { pool.Put(buffer) var netErr net.Error @@ -172,22 +198,22 @@ func (t *udpFE) serve(ctx context.Context) error { continue } - t.wg.Add(1) + f.wg.Add(1) go func() { - defer t.wg.Done() + defer f.wg.Done() defer pool.Put(buffer) // Handle the request. addr := addrPort.Addr().Unmap() var start time.Time - if t.collectTimings && metrics.Enabled() { + if f.collectTimings && metrics.Enabled() { start = time.Now() } - action, err := t.handleRequest(ctx, + action, err := f.handleRequest(ctx, Request{(*buffer)[:n], addr}, - ResponseWriter{t.socket, addrPort}, + ResponseWriter{socket, addrPort}, ) - if t.collectTimings && metrics.Enabled() { + if f.collectTimings && metrics.Enabled() { recordResponseDuration(action, addr, err, time.Since(start)) } }() @@ -213,7 +239,7 @@ func (w ResponseWriter) Write(b []byte) (int, error) { } // handleRequest parses and responds to a UDP Request. -func (t *udpFE) handleRequest(ctx context.Context, r Request, w ResponseWriter) (actionName string, err error) { +func (f *udpFE) handleRequest(ctx context.Context, r Request, w ResponseWriter) (actionName string, err error) { if len(r.Packet) < 16 { // Malformed, no client packets are less than 16 bytes. // We explicitly return nothing in case this is a DoS attempt. @@ -227,12 +253,12 @@ func (t *udpFE) handleRequest(ctx context.Context, r Request, w ResponseWriter) txID := r.Packet[12:16] // get a connection ID generator/validator from the pool. - gen := t.genPool.Get().(*ConnectionIDGenerator) - defer t.genPool.Put(gen) + gen := f.genPool.Get().(*ConnectionIDGenerator) + defer f.genPool.Put(gen) // If this isn't requesting a new connection ID and the connection ID is // invalid, then fail. - if actionID != connectActionID && !gen.Validate(connID, r.IP, timecache.Now(), t.maxClockSkew) { + if actionID != connectActionID && !gen.Validate(connID, r.IP, timecache.Now()) { err = errBadConnectionID WriteError(w, txID, err) return @@ -254,7 +280,7 @@ func (t *udpFE) handleRequest(ctx context.Context, r Request, w ResponseWriter) actionName = "announce" var req *bittorrent.AnnounceRequest - req, err = ParseAnnounce(r, actionID == announceV6ActionID, t.ParseOptions) + req, err = ParseAnnounce(r, actionID == announceV6ActionID, f.ParseOptions) if err != nil { WriteError(w, txID, err) return @@ -262,7 +288,7 @@ func (t *udpFE) handleRequest(ctx context.Context, r Request, w ResponseWriter) var resp *bittorrent.AnnounceResponse ctx := bittorrent.InjectRouteParamsToContext(ctx, bittorrent.RouteParams{}) - ctx, resp, err = t.logic.HandleAnnounce(ctx, req) + ctx, resp, err = f.logic.HandleAnnounce(ctx, req) if err != nil { WriteError(w, txID, err) return @@ -271,13 +297,13 @@ func (t *udpFE) handleRequest(ctx context.Context, r Request, w ResponseWriter) WriteAnnounce(w, txID, resp, actionID == announceV6ActionID, r.IP.Is6()) ctx = bittorrent.RemapRouteParamsToBgContext(ctx) - go t.logic.AfterAnnounce(ctx, req, resp) + go f.logic.AfterAnnounce(ctx, req, resp) case scrapeActionID: actionName = "scrape" var req *bittorrent.ScrapeRequest - req, err = ParseScrape(r, t.ParseOptions) + req, err = ParseScrape(r, f.ParseOptions) if err != nil { WriteError(w, txID, err) return @@ -285,7 +311,7 @@ func (t *udpFE) handleRequest(ctx context.Context, r Request, w ResponseWriter) var resp *bittorrent.ScrapeResponse ctx := bittorrent.InjectRouteParamsToContext(ctx, bittorrent.RouteParams{}) - ctx, resp, err = t.logic.HandleScrape(ctx, req) + ctx, resp, err = f.logic.HandleScrape(ctx, req) if err != nil { WriteError(w, txID, err) return @@ -294,7 +320,7 @@ func (t *udpFE) handleRequest(ctx context.Context, r Request, w ResponseWriter) WriteScrape(w, txID, resp) ctx = bittorrent.RemapRouteParamsToBgContext(ctx) - go t.logic.AfterScrape(ctx, req, resp) + go f.logic.AfterScrape(ctx, req, resp) default: err = errUnknownAction diff --git a/go.mod b/go.mod index ea3ef88..027ab97 100644 --- a/go.mod +++ b/go.mod @@ -3,12 +3,13 @@ module github.com/sot-tech/mochi go 1.19 require ( - code.cloudfoundry.org/go-diodes v0.0.0-20221102172008-608069b4b4a6 - github.com/MicahParks/keyfunc v1.5.3 - github.com/anacrolix/torrent v1.47.0 + code.cloudfoundry.org/go-diodes v0.0.0-20221205211830-fe848ebc2e22 + github.com/MicahParks/keyfunc v1.7.0 + github.com/anacrolix/torrent v1.47.1-0.20221102120345-c63f7e1bd720 + github.com/cespare/xxhash/v2 v2.2.0 github.com/go-redis/redis/v8 v8.11.5 - github.com/golang-jwt/jwt/v4 v4.4.2 - github.com/jackc/pgx/v5 v5.1.0 + github.com/golang-jwt/jwt/v4 v4.4.3 + github.com/jackc/pgx/v5 v5.2.0 github.com/julienschmidt/httprouter v1.3.0 github.com/libp2p/go-reuseport v0.2.0 github.com/minio/sha256-simd v1.0.0 @@ -20,18 +21,19 @@ require ( ) require ( - github.com/anacrolix/dht/v2 v2.19.1 // indirect + github.com/anacrolix/dht/v2 v2.19.2 // indirect github.com/anacrolix/log v0.13.2-0.20220711050817-613cb738ef30 // indirect github.com/anacrolix/missinggo v1.3.0 // indirect github.com/anacrolix/missinggo/v2 v2.7.1 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/bradfitz/iter v0.0.0-20191230175014-e8f45d346db8 // indirect - github.com/cespare/xxhash/v2 v2.1.2 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/fsnotify/fsnotify v1.6.0 // indirect + github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0 // indirect github.com/golang/protobuf v1.5.2 // indirect - github.com/huandu/xstrings v1.3.3 // indirect + github.com/google/pprof v0.0.0-20221203041831-ce31453925ec // indirect + github.com/huandu/xstrings v1.4.0 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b // indirect github.com/jackc/puddle/v2 v2.1.2 // indirect @@ -39,14 +41,16 @@ require ( github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.16 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect + github.com/onsi/ginkgo/v2 v2.5.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/client_model v0.3.0 // indirect github.com/prometheus/common v0.37.0 // indirect github.com/prometheus/procfs v0.8.0 // indirect go.uber.org/atomic v1.10.0 // indirect - golang.org/x/crypto v0.2.0 // indirect + golang.org/x/crypto v0.3.0 // indirect golang.org/x/sync v0.1.0 // indirect - golang.org/x/sys v0.2.0 // indirect - golang.org/x/text v0.4.0 // indirect + golang.org/x/sys v0.3.0 // indirect + golang.org/x/text v0.5.0 // indirect + golang.org/x/tools v0.3.0 // indirect google.golang.org/protobuf v1.28.1 // indirect ) diff --git a/go.sum b/go.sum index 967678a..bcd27ec 100644 --- a/go.sum +++ b/go.sum @@ -30,15 +30,15 @@ cloud.google.com/go/storage v1.5.0/go.mod h1:tpKbwo567HUNpVclU5sGELwQWBDZ8gh0Zeo cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohlUTyfDhBk= cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs= cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0= -code.cloudfoundry.org/go-diodes v0.0.0-20221102172008-608069b4b4a6 h1:Hg6Rci5c2GQcEkSaY8kks8AGhVnAFIMdEmoQn8/Pju8= -code.cloudfoundry.org/go-diodes v0.0.0-20221102172008-608069b4b4a6/go.mod h1:NAaPdNjrSKVX+nRPy67obdqcyn9HGFh4UP/cwD23kkc= +code.cloudfoundry.org/go-diodes v0.0.0-20221205211830-fe848ebc2e22 h1:9YvghSI3zAz5rn9uEHhSV3gH1bQ2mo56lh/B5m1XRCo= +code.cloudfoundry.org/go-diodes v0.0.0-20221205211830-fe848ebc2e22/go.mod h1:5+eDDXV/io7PFGz0Hr2DsT7JNXtGTKInNUsvRfN1bU0= crawshaw.io/iox v0.0.0-20181124134642-c51c3df30797/go.mod h1:sXBiorCo8c46JlQV3oXPKINnZ8mcqnye1EkVkqsectk= crawshaw.io/sqlite v0.3.2/go.mod h1:igAO5JulrQ1DbdZdtVq48mnZUBAPOeFzer7VhDWNtW4= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= -github.com/MicahParks/keyfunc v1.5.3 h1:Y+mv+kX3HtL7/dCXXzK4bIDBHg91eunnGGkdndO0RWk= -github.com/MicahParks/keyfunc v1.5.3/go.mod h1:IdnCilugA0O/99dW+/MkvlyrsX8+L8+x95xuVNtM5jw= +github.com/MicahParks/keyfunc v1.7.0 h1:LBd4tBj6FwGs2S4GXniQbgrG0PXzIldyGDKWch8slhg= +github.com/MicahParks/keyfunc v1.7.0/go.mod h1:IdnCilugA0O/99dW+/MkvlyrsX8+L8+x95xuVNtM5jw= github.com/RoaringBitmap/roaring v0.4.7/go.mod h1:8khRDP4HmeXns4xIj9oGrKSz7XTQiJx2zgh7AcNke4w= github.com/RoaringBitmap/roaring v0.4.17/go.mod h1:D3qVegWTmfCaX4Bl5CrBE9hfrSrrXIr8KVNvRsDi1NI= github.com/RoaringBitmap/roaring v0.4.23/go.mod h1:D0gp8kJQgE1A4LQ5wFLggQEyvDi06Mq5mKs52e1TwOo= @@ -49,8 +49,8 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuy github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= -github.com/anacrolix/dht/v2 v2.19.1 h1:V/UUGBASGYqYkSnmHJwX8uQmzkyhbgwE6jqcHKnNTD8= -github.com/anacrolix/dht/v2 v2.19.1/go.mod h1:3TU93c1s/oA8I/VH4m3CNP/BeKsiOGmo6HwfZBMTKUs= +github.com/anacrolix/dht/v2 v2.19.2 h1:U+lbEFYwa9yajIW2oE9jBxt0K8DlT10pSTYbXyfCxC8= +github.com/anacrolix/dht/v2 v2.19.2/go.mod h1:MctKM1HS5YYDb3F30NGJxLE+QPuqWoT5ReW/4jt8xew= github.com/anacrolix/envpprof v0.0.0-20180404065416-323002cec2fa/go.mod h1:KgHhUaQMc8cC0+cEflSgCFNFbKwi5h54gqtVn8yhP7c= github.com/anacrolix/envpprof v1.0.0/go.mod h1:KgHhUaQMc8cC0+cEflSgCFNFbKwi5h54gqtVn8yhP7c= github.com/anacrolix/envpprof v1.1.0/go.mod h1:My7T5oSqVfEn4MD4Meczkw/f5lSIndGAKu/0SM/rkf4= @@ -73,8 +73,8 @@ github.com/anacrolix/stm v0.2.0/go.mod h1:zoVQRvSiGjGoTmbM0vSLIiaKjWtNPeTvXUSdJQ github.com/anacrolix/tagflag v0.0.0-20180109131632-2146c8d41bf0/go.mod h1:1m2U/K6ZT+JZG0+bdMK6qauP49QT4wE5pmhJXOKKCHw= github.com/anacrolix/tagflag v1.0.0/go.mod h1:1m2U/K6ZT+JZG0+bdMK6qauP49QT4wE5pmhJXOKKCHw= github.com/anacrolix/tagflag v1.1.0/go.mod h1:Scxs9CV10NQatSmbyjqmqmeQNwGzlNe0CMUMIxqHIG8= -github.com/anacrolix/torrent v1.47.0 h1:aDUnhQZ8+kfStLICHiXOGGYVFgDENK+kz4q96linyRg= -github.com/anacrolix/torrent v1.47.0/go.mod h1:SYPxEUjMwqhDr3kWGzyQLkFMuAb1bgJ57JRMpuD3ZzE= +github.com/anacrolix/torrent v1.47.1-0.20221102120345-c63f7e1bd720 h1:3cvS7QtC2hcrIOfT44fKDfPZUcXvkDf9SkI1mN6TB+s= +github.com/anacrolix/torrent v1.47.1-0.20221102120345-c63f7e1bd720/go.mod h1:SYPxEUjMwqhDr3kWGzyQLkFMuAb1bgJ57JRMpuD3ZzE= github.com/apache/thrift v0.12.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ= github.com/benbjohnson/immutable v0.2.0/go.mod h1:uc6OHo6PN2++n98KHLxW8ef4W42ylHiQSENghE1ezxI= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= @@ -87,8 +87,9 @@ github.com/bradfitz/iter v0.0.0-20191230175014-e8f45d346db8 h1:GKTyiRCL6zVf5wWaq github.com/bradfitz/iter v0.0.0-20191230175014-e8f45d346db8/go.mod h1:spo1JLcs67NmW1aVLEgtA8Yy1elc+X8y5SRW1sFW4Og= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= -github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE= github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= +github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI= github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= @@ -132,14 +133,18 @@ github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9 github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A= github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs= +github.com/go-logr/logr v1.2.3 h1:2DntVwHkVopvECVRSlL5PSo9eG+cAkDCuckLubN+rq0= github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI= github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= +github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0 h1:p104kn46Q8WdvHunIJ9dAyjPVtrBPhSr3KT2yUst43I= +github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.2.0/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= -github.com/golang-jwt/jwt/v4 v4.4.2 h1:rcc4lwaZgFMCZ5jxF9ABolDcIHdBytAFgqFPbSJQAYs= github.com/golang-jwt/jwt/v4 v4.4.2/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0= +github.com/golang-jwt/jwt/v4 v4.4.3 h1:Hxl6lhQFj4AnOX6MLrsCb/+7tCj7DxP7VA+2rDIq5AU= +github.com/golang-jwt/jwt/v4 v4.4.3/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= @@ -193,6 +198,8 @@ github.com/google/pprof v0.0.0-20200212024743-f11f1df84d12/go.mod h1:ZgVRPoUq/hf github.com/google/pprof v0.0.0-20200229191704-1ebb73c60ed3/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM= github.com/google/pprof v0.0.0-20200430221834-fc25d7d30c6d/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM= github.com/google/pprof v0.0.0-20200708004538-1a94d8640e99/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM= +github.com/google/pprof v0.0.0-20221203041831-ce31453925ec h1:fR20TYVVwhK4O7r7y+McjRYyaTH6/vjwJOajE+XhlzM= +github.com/google/pprof v0.0.0-20221203041831-ce31453925ec/go.mod h1:dDKJzRmX4S37WGHujM7tX//fmj1uioxKzKxz3lo4HJo= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk= @@ -208,15 +215,15 @@ github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpO github.com/huandu/xstrings v1.0.0/go.mod h1:4qWG/gcEcfX4z/mBDHJ++3ReCw9ibxbsNJbcucJdbSo= github.com/huandu/xstrings v1.2.0/go.mod h1:DvyZB1rfVYsBIigL8HwpZgxHwXozlTgGqn63UyNX5k4= github.com/huandu/xstrings v1.3.1/go.mod h1:y5/lhBue+AyNmUVz9RLU9xbLR0o4KIIExikq4ovT0aE= -github.com/huandu/xstrings v1.3.3 h1:/Gcsuc1x8JVbJ9/rlye4xZnVAbEkGauT8lbebqcQws4= -github.com/huandu/xstrings v1.3.3/go.mod h1:y5/lhBue+AyNmUVz9RLU9xbLR0o4KIIExikq4ovT0aE= +github.com/huandu/xstrings v1.4.0 h1:D17IlohoQq4UcpqD7fDk80P7l+lwAmlFaBHgOipl2FU= +github.com/huandu/xstrings v1.4.0/go.mod h1:y5/lhBue+AyNmUVz9RLU9xbLR0o4KIIExikq4ovT0aE= github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b h1:C8S2+VttkHFdOOCXJe+YGfa4vHYwlt4Zx+IVXQ97jYg= github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b/go.mod h1:vsD4gTJCa9TptPL8sPkXrLZ+hDuNrZCnj29CQpr4X1E= -github.com/jackc/pgx/v5 v5.1.0 h1:Z7pLKUb65HK6m18No8GGKT87K34NhIIEHa86rRdjxbU= -github.com/jackc/pgx/v5 v5.1.0/go.mod h1:Ptn7zmohNsWEsdxRawMzk3gaKma2obW+NWTnKa0S4nk= +github.com/jackc/pgx/v5 v5.2.0 h1:NdPpngX0Y6z6XDFKqmFQaE+bCtkqzvQIOt1wvBlAqs8= +github.com/jackc/pgx/v5 v5.2.0/go.mod h1:Ptn7zmohNsWEsdxRawMzk3gaKma2obW+NWTnKa0S4nk= github.com/jackc/puddle/v2 v2.1.2 h1:0f7vaaXINONKTsxYDn4otOAiJanX/BMeAtY//BXqzlg= github.com/jackc/puddle/v2 v2.1.2/go.mod h1:2lpufsF5mRHO6SuZkm0fNYxM6SWHfvyFj62KwNzgels= github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4= @@ -274,8 +281,10 @@ github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= +github.com/onsi/ginkgo/v2 v2.5.1 h1:auzK7OI497k6x4OvWq+TKAcpcSAlod0doAH72oIN0Jw= +github.com/onsi/ginkgo/v2 v2.5.1/go.mod h1:63DOGlLAH8+REH8jUGdL3YpCpu7JODesutUjdENfUAc= github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= -github.com/onsi/gomega v1.23.0 h1:/oxKu9c2HVap+F3PfKort2Hw5DEU+HGlW8n+tguWsys= +github.com/onsi/gomega v1.24.1 h1:KORJXNNTzJXzu4ScJWssJfJMnJ+2QJqhoQSRwNlze9E= github.com/openzipkin/zipkin-go v0.1.6/go.mod h1:QgAqvLzwWbR/WpD4A3cGpPtJrZXNIiJc5AZX7/PBEpw= github.com/philhofer/fwd v1.0.0/go.mod h1:gk3iGcWd9+svBvR0sR+KPcfE+RNWozjowpeBVG3ZVNU= github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= @@ -372,8 +381,8 @@ golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/crypto v0.2.0 h1:BRXPfhNivWL5Yq0BGQ39a2sW6t44aODpfxkWjYdzewE= -golang.org/x/crypto v0.2.0/go.mod h1:hebNnKkNXi2UzZN1eVRvBB7co0a+JxK6XbPiWVs/3J4= +golang.org/x/crypto v0.3.0 h1:a06MkbcxBrEFc0w0QIZWXrH/9cCX6KJyWbBOIwAn+7A= +golang.org/x/crypto v0.3.0/go.mod h1:hebNnKkNXi2UzZN1eVRvBB7co0a+JxK6XbPiWVs/3J4= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8= @@ -502,8 +511,8 @@ golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220704084225-05e143d24a9e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.2.0 h1:ljd4t30dBnAvMZaQCevtY0xLLD0A+bRZXbgLMLU1F/A= -golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.3.0 h1:w8ZOecv6NaNa/zC8944JTU3vz4u6Lagfk4RPQxv92NQ= +golang.org/x/sys v0.3.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -513,8 +522,8 @@ golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= -golang.org/x/text v0.4.0 h1:BrVqGRd7+k1DiOgtnFvAkoQEWQvBc25ouMJM6429SFg= -golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/text v0.5.0 h1:OLmvp0KP+FVG99Ct/qFiL/Fhk4zp4QQnZ7b2U+5piUM= +golang.org/x/text v0.5.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= @@ -559,6 +568,8 @@ golang.org/x/tools v0.0.0-20200618134242-20370b0cb4b2/go.mod h1:EkVYQZoAsY45+roY golang.org/x/tools v0.0.0-20200729194436-6467de6f59a7/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA= golang.org/x/tools v0.0.0-20200804011535-6c149bb5ef0d/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA= golang.org/x/tools v0.0.0-20200825202427-b303f430e36d/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA= +golang.org/x/tools v0.3.0 h1:SrNbZl6ECOS1qFzgTdQfWXZM9XBkiA6tkFrH9YSTPHM= +golang.org/x/tools v0.3.0/go.mod h1:/rWhSS2+zyEVwoJf8YAX6L2f0ntZ7Kn/mGgAWcipA5k= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/middleware/varinterval/varinterval.go b/middleware/varinterval/varinterval.go index 0587d3e..02b4f7e 100644 --- a/middleware/varinterval/varinterval.go +++ b/middleware/varinterval/varinterval.go @@ -13,6 +13,7 @@ import ( "github.com/sot-tech/mochi/bittorrent" "github.com/sot-tech/mochi/middleware" "github.com/sot-tech/mochi/pkg/conf" + "github.com/sot-tech/mochi/pkg/xorshift" "github.com/sot-tech/mochi/storage" ) @@ -81,10 +82,10 @@ type hook struct { func (h *hook) HandleAnnounce(ctx context.Context, req *bittorrent.AnnounceRequest, resp *bittorrent.AnnounceResponse) (context.Context, error) { // Generate a probability p < 1.0. - p, s0, s1 := xoroshiro128p(deriveEntropyFromRequest(req)) + p, s0, s1 := xorshift.XoRoShiRo128SS(deriveEntropyFromRequest(req)) if float32(float64(p)/math.MaxUint64) < h.cfg.ModifyResponseProbability { // Generate the increase delta. - v, _, _ := xoroshiro128p(s0, s1) + v, _, _ := xorshift.XoRoShiRo128SS(s0, s1) add := time.Duration(v%uint64(h.cfg.MaxIncreaseDelta)+1) * time.Second resp.Interval += add @@ -113,14 +114,3 @@ func deriveEntropyFromRequest(req *bittorrent.AnnounceRequest) (v0 uint64, v1 ui v1 = binary.BigEndian.Uint64(req.ID[:8]) + binary.BigEndian.Uint64(req.ID[8:16]) return } - -// xoroshiro128p calculates predictable pseudorandom number -// with XOR/rotate/shift/rotate 128+ algorithm. -// see https://prng.di.unimi.it/xoroshiro128plus.c -func xoroshiro128p(s0, s1 uint64) (result, ns0, ns1 uint64) { - result = s0 + s1 - s1 ^= s0 - ns0 = ((s0 << 24) | (s0 >> 40)) ^ s1 ^ (s1 << 16) // rotl(s0, 24) ^ s1 ^ (s1 << 16) - ns1 = (s1 << 37) | (s1 >> 27) // rotl(s1, 37) - return -} diff --git a/middleware/varinterval/varinterval_test.go b/middleware/varinterval/varinterval_test.go index 8ff8ee6..c792485 100644 --- a/middleware/varinterval/varinterval_test.go +++ b/middleware/varinterval/varinterval_test.go @@ -3,7 +3,6 @@ package varinterval import ( "context" "fmt" - "math/rand" "testing" "github.com/stretchr/testify/require" @@ -64,13 +63,3 @@ func TestHandleAnnounce(t *testing.T) { require.True(t, resp.Interval > 0, "interval should have been increased") require.True(t, resp.MinInterval > 0, "min_interval should have been increased") } - -func BenchmarkXORoShiRo128Plus(b *testing.B) { - s0, s1 := rand.Uint64(), rand.Uint64() - var v uint64 - b.ResetTimer() - for i := 0; i < b.N; i++ { - v, s0, s1 = xoroshiro128p(s0, s1) - } - _, _, _ = v, s0, s1 -} diff --git a/pkg/randseed/rand_seed.go b/pkg/randseed/rand_seed.go index f752756..6f46c67 100644 --- a/pkg/randseed/rand_seed.go +++ b/pkg/randseed/rand_seed.go @@ -13,7 +13,7 @@ func init() { } // GenSeed returns 64bit seed from crypto/rand source or -// from current time, if crypto random error occurred +// from current time, if crypto random read error occurred func GenSeed() (seed int64) { r := make([]byte, 8) if _, err := cr.Read(r); err == nil { diff --git a/pkg/xorshift/prng.go b/pkg/xorshift/prng.go new file mode 100644 index 0000000..e153517 --- /dev/null +++ b/pkg/xorshift/prng.go @@ -0,0 +1,27 @@ +// Package xorshift contains functions for fast generating +// predictable pseudorandom numbers +// See https://prng.di.unimi.it . +package xorshift + +// XoRoShiRo128SS calculates predictable pseudorandom number +// with XOR/rotate/shift/rotate 128** (xoroshiro128starstar) algorithm. +// In some cases a little faster than XorShift64S, but uses 128 bits footprint. +// see https://prng.di.unimi.it/xoroshiro128starstar.c +func XoRoShiRo128SS(s0, s1 uint64) (uint64, uint64, uint64) { + r := s0 * 5 + r = ((r << 7) | (r >> 57)) * 9 // rotl(s0*5, 7) * 9 + s1 ^= s0 + s0 = ((s0 << 24) | (s0 >> 40)) ^ s1 ^ (s1 << 16) // rotl(s0, 24) ^ s1 ^ (s1 << 16) + s1 = (s1 << 37) | (s1 >> 27) // rotl(s1, 37) + return r, s0, s1 +} + +// XorShift64S calculates predictable pseudorandom number +// with XOR/Shift 64* (shorshift64*) algorithm. +// see https://vigna.di.unimi.it/ftp/papers/xorshift.pdf +func XorShift64S(s uint64) (uint64, uint64) { + s ^= s >> 12 + s ^= s << 25 + s ^= s >> 27 + return s * uint64(0x2545F4914F6CDD1D), s +} diff --git a/pkg/xorshift/prng_test.go b/pkg/xorshift/prng_test.go new file mode 100644 index 0000000..ce93c0b --- /dev/null +++ b/pkg/xorshift/prng_test.go @@ -0,0 +1,30 @@ +package xorshift + +import ( + "math/rand" + "testing" +) + +func BenchmarkRand(b *testing.B) { + var cnt uint64 + for i := 0; i < b.N; i++ { + cnt = rand.Uint64() + } + _ = cnt +} + +func BenchmarkXoRoShiRo128SS(b *testing.B) { + v, s0, s1 := uint64(0), rand.Uint64(), rand.Uint64() + for i := 0; i < b.N; i++ { + v, s0, s1 = XoRoShiRo128SS(s0, s1) + } + _, _, _ = v, s0, s1 +} + +func BenchmarkXorShift64Star(b *testing.B) { + v, s := uint64(0), rand.Uint64() + for i := 0; i < b.N; i++ { + v, s = XorShift64S(s) + } + _, _ = v, s +} diff --git a/storage/memory/storage.go b/storage/memory/storage.go index 8d9740b..be27b4c 100644 --- a/storage/memory/storage.go +++ b/storage/memory/storage.go @@ -8,6 +8,7 @@ import ( "math" "runtime" "sync" + "sync/atomic" "time" "github.com/sot-tech/mochi/bittorrent" @@ -18,14 +19,20 @@ import ( "github.com/sot-tech/mochi/storage" ) -// Default config constants. -const defaultShardCount = 1024 +const ( + // Name - registered name of the storage + Name = "memory" + // Default config constants. + defaultShardCount = 1024 + // -1 + decrUint64 = ^uint64(0) +) var logger = log.NewLogger("storage/memory") func init() { // Register the storage driver. - storage.RegisterDriver("memory", builder) + storage.RegisterDriver(Name, builder) } func builder(icfg conf.MapConfig) (storage.PeerStorage, error) { @@ -50,7 +57,7 @@ func (cfg Config) Validate() Config { if cfg.ShardCount <= 0 || cfg.ShardCount > (math.MaxInt/2) { validcfg.ShardCount = defaultShardCount - log.Warn(). + logger.Warn(). Str("name", "ShardCount"). Int("provided", cfg.ShardCount). Int("default", validcfg.ShardCount). @@ -64,38 +71,137 @@ func (cfg Config) Validate() Config { func NewPeerStorage(provided Config) (storage.PeerStorage, error) { cfg := provided.Validate() ps := &peerStore{ - cfg: cfg, shards: make([]*peerShard, cfg.ShardCount*2), DataStorage: NewDataStorage(), - closed: make(chan struct{}), + closed: make(chan any), } for i := 0; i < cfg.ShardCount*2; i++ { - ps.shards[i] = &peerShard{swarms: make(map[bittorrent.InfoHash]swarm)} + ps.shards[i] = &peerShard{swarms: &ihSwarm{m: make(map[bittorrent.InfoHash]swarm)}} } return ps, nil } type peerShard struct { - swarms map[bittorrent.InfoHash]swarm - numSeeders uint64 - numLeechers uint64 + swarms *ihSwarm + numSeeders atomic.Uint64 + numLeechers atomic.Uint64 +} + +type ihSwarm struct { + m map[bittorrent.InfoHash]swarm sync.RWMutex } +func (p *ihSwarm) get(k bittorrent.InfoHash) (v swarm, ok bool) { + p.RLock() + v, ok = p.m[k] + p.RUnlock() + return +} + +func (p *ihSwarm) getOrCreate(k bittorrent.InfoHash) (v swarm) { + var ok bool + if v, ok = p.get(k); !ok { + p.Lock() + if v, ok = p.m[k]; !ok { + v = swarm{ + seeders: &peers{m: make(map[bittorrent.Peer]int64)}, + leechers: &peers{m: make(map[bittorrent.Peer]int64)}, + } + p.m[k] = v + } + p.Unlock() + } + return +} + +func (p *ihSwarm) del(k bittorrent.InfoHash) (ok bool) { + p.Lock() + if _, ok = p.m[k]; ok { + delete(p.m, k) + } + p.Unlock() + return +} + +func (p *ihSwarm) len() int { + return len(p.m) +} + +func (p *ihSwarm) keys(fn func(k bittorrent.InfoHash) bool) { + p.RLock() + for k := range p.m { + if !fn(k) { + break + } + } + p.RUnlock() +} + type swarm struct { // map serialized peer to mtime - seeders map[bittorrent.Peer]int64 - leechers map[bittorrent.Peer]int64 + seeders *peers + leechers *peers +} + +type peers struct { + m map[bittorrent.Peer]int64 + sync.RWMutex +} + +func (p *peers) get(k bittorrent.Peer) (v int64, ok bool) { + p.RLock() + v, ok = p.m[k] + p.RUnlock() + return +} + +func (p *peers) set(k bittorrent.Peer, v int64) { + p.Lock() + p.m[k] = v + p.Unlock() +} + +func (p *peers) del(k bittorrent.Peer) (ok bool) { + p.Lock() + if _, ok = p.m[k]; ok { + delete(p.m, k) + } + p.Unlock() + return +} + +func (p *peers) len() int { + return len(p.m) +} + +func (p *peers) keys(fn func(k bittorrent.Peer) bool) { + p.RLock() + for k := range p.m { + if !fn(k) { + break + } + } + p.RUnlock() +} + +func (p *peers) forEach(fn func(k bittorrent.Peer, v int64) bool) { + p.RLock() + for k, v := range p.m { + if !fn(k, v) { + break + } + } + p.RUnlock() } type peerStore struct { storage.DataStorage - cfg Config shards []*peerShard - closed chan struct{} + closed chan any wg sync.WaitGroup onceCloser sync.Once } @@ -140,17 +246,15 @@ func (ps *peerStore) ScheduleStatisticsCollection(reportInterval time.Duration) before := time.Now() // aggregates metrics over all shards and then posts them to // prometheus. - var numInfohashes, numSeeders, numLeechers uint64 + var numInfoHashes, numSeeders, numLeechers uint64 for _, s := range ps.shards { - s.RLock() - numInfohashes += uint64(len(s.swarms)) - numSeeders += s.numSeeders - numLeechers += s.numLeechers - s.RUnlock() + numInfoHashes += uint64(s.swarms.len()) + numSeeders += s.numSeeders.Load() + numLeechers += s.numLeechers.Load() } - storage.PromInfoHashesCount.Set(float64(numInfohashes)) + storage.PromInfoHashesCount.Set(float64(numInfoHashes)) storage.PromSeedersCount.Set(float64(numSeeders)) storage.PromLeechersCount.Set(float64(numLeechers)) logger.Debug().TimeDiff("timeTaken", time.Now(), before).Msg("populate prom complete") @@ -182,29 +286,19 @@ func (ps *peerStore) PutSeeder(_ context.Context, ih bittorrent.InfoHash, p bitt Object("peer", p). Msg("put seeder") - shard := ps.shards[ps.shardIndex(ih, p.Addr().Is6())] - shard.Lock() - defer shard.Unlock() + sh := ps.shards[ps.shardIndex(ih, p.Addr().Is6())] + sw := sh.swarms.getOrCreate(ih) - if _, ok := shard.swarms[ih]; !ok { - shard.swarms[ih] = swarm{ - seeders: make(map[bittorrent.Peer]int64), - leechers: make(map[bittorrent.Peer]int64), - } + if _, exists := sw.seeders.get(p); !exists { + sh.numSeeders.Add(1) } - // If this peer isn't already a seeder, update the stats for the swarm. - if _, ok := shard.swarms[ih].seeders[p]; !ok { - shard.numSeeders++ - } - - // Update the peer in the swarm. - shard.swarms[ih].seeders[p] = timecache.NowUnixNano() + sw.seeders.set(p, timecache.NowUnixNano()) return nil } -func (ps *peerStore) DeleteSeeder(_ context.Context, ih bittorrent.InfoHash, p bittorrent.Peer) error { +func (ps *peerStore) DeleteSeeder(_ context.Context, ih bittorrent.InfoHash, p bittorrent.Peer) (err error) { select { case <-ps.closed: panic("attempted to interact with stopped memory store") @@ -215,26 +309,16 @@ func (ps *peerStore) DeleteSeeder(_ context.Context, ih bittorrent.InfoHash, p b Object("peer", p). Msg("delete seeder") - shard := ps.shards[ps.shardIndex(ih, p.Addr().Is6())] - shard.Lock() - defer shard.Unlock() - - if _, ok := shard.swarms[ih]; !ok { - return storage.ErrResourceDoesNotExist + sh := ps.shards[ps.shardIndex(ih, p.Addr().Is6())] + if sw, ok := sh.swarms.get(ih); ok { + if sw.seeders.del(p) { + sh.numSeeders.Add(decrUint64) + } + } else { + err = storage.ErrResourceDoesNotExist } - if _, ok := shard.swarms[ih].seeders[p]; !ok { - return storage.ErrResourceDoesNotExist - } - - shard.numSeeders-- - delete(shard.swarms[ih].seeders, p) - - if len(shard.swarms[ih].seeders)|len(shard.swarms[ih].leechers) == 0 { - delete(shard.swarms, ih) - } - - return nil + return } func (ps *peerStore) PutLeecher(_ context.Context, ih bittorrent.InfoHash, p bittorrent.Peer) error { @@ -248,29 +332,19 @@ func (ps *peerStore) PutLeecher(_ context.Context, ih bittorrent.InfoHash, p bit Object("peer", p). Msg("put leecher") - shard := ps.shards[ps.shardIndex(ih, p.Addr().Is6())] - shard.Lock() - defer shard.Unlock() + sh := ps.shards[ps.shardIndex(ih, p.Addr().Is6())] + sw := sh.swarms.getOrCreate(ih) - if _, ok := shard.swarms[ih]; !ok { - shard.swarms[ih] = swarm{ - seeders: make(map[bittorrent.Peer]int64), - leechers: make(map[bittorrent.Peer]int64), - } + if _, exists := sw.leechers.get(p); !exists { + sh.numLeechers.Add(1) } - // If this peer isn't already a leecher, update the stats for the swarm. - if _, ok := shard.swarms[ih].leechers[p]; !ok { - shard.numLeechers++ - } - - // Update the peer in the swarm. - shard.swarms[ih].leechers[p] = timecache.NowUnixNano() + sw.leechers.set(p, timecache.NowUnixNano()) return nil } -func (ps *peerStore) DeleteLeecher(_ context.Context, ih bittorrent.InfoHash, p bittorrent.Peer) error { +func (ps *peerStore) DeleteLeecher(_ context.Context, ih bittorrent.InfoHash, p bittorrent.Peer) (err error) { select { case <-ps.closed: panic("attempted to interact with stopped memory store") @@ -281,26 +355,16 @@ func (ps *peerStore) DeleteLeecher(_ context.Context, ih bittorrent.InfoHash, p Object("peer", p). Msg("delete leecher") - shard := ps.shards[ps.shardIndex(ih, p.Addr().Is6())] - shard.Lock() - defer shard.Unlock() - - if _, ok := shard.swarms[ih]; !ok { - return storage.ErrResourceDoesNotExist + sh := ps.shards[ps.shardIndex(ih, p.Addr().Is6())] + if sw, ok := sh.swarms.get(ih); ok { + if sw.leechers.del(p) { + sh.numLeechers.Add(decrUint64) + } + } else { + err = storage.ErrResourceDoesNotExist } - if _, ok := shard.swarms[ih].leechers[p]; !ok { - return storage.ErrResourceDoesNotExist - } - - shard.numLeechers-- - delete(shard.swarms[ih].leechers, p) - - if len(shard.swarms[ih].seeders)|len(shard.swarms[ih].leechers) == 0 { - delete(shard.swarms, ih) - } - - return nil + return } func (ps *peerStore) GraduateLeecher(_ context.Context, ih bittorrent.InfoHash, p bittorrent.Peer) error { @@ -314,61 +378,22 @@ func (ps *peerStore) GraduateLeecher(_ context.Context, ih bittorrent.InfoHash, Object("peer", p). Msg("graduate leecher") - shard := ps.shards[ps.shardIndex(ih, p.Addr().Is6())] - shard.Lock() - defer shard.Unlock() + sh := ps.shards[ps.shardIndex(ih, p.Addr().Is6())] + sw := sh.swarms.getOrCreate(ih) - if _, ok := shard.swarms[ih]; !ok { - shard.swarms[ih] = swarm{ - seeders: make(map[bittorrent.Peer]int64), - leechers: make(map[bittorrent.Peer]int64), - } + if sw.leechers.del(p) { + sh.numLeechers.Add(decrUint64) } - // If this peer is a leecher, update the stats for the swarm and remove them. - if _, ok := shard.swarms[ih].leechers[p]; ok { - shard.numLeechers-- - delete(shard.swarms[ih].leechers, p) + if _, exists := sw.seeders.get(p); !exists { + sh.numSeeders.Add(1) } - // If this peer isn't already a seeder, update the stats for the swarm. - if _, ok := shard.swarms[ih].seeders[p]; !ok { - shard.numSeeders++ - } - - // Update the peer in the swarm. - shard.swarms[ih].seeders[p] = timecache.NowUnixNano() + sw.seeders.set(p, timecache.NowUnixNano()) return nil } -func parsePeers(peersMap map[bittorrent.Peer]int64, maxCount int) (peers []bittorrent.Peer) { - for p := range peersMap { - if maxCount == 0 { - break - } - peers = append(peers, p) - maxCount-- - } - return -} - -func (ps *peerStore) getPeers(shard *peerShard, ih bittorrent.InfoHash, maxCount int, forSeeder bool) (peers []bittorrent.Peer) { - shard.RLock() - defer shard.RUnlock() - if swarm, ok := shard.swarms[ih]; ok { - if forSeeder { - peers = parsePeers(swarm.leechers, maxCount) - } else { - peers = append(peers, parsePeers(swarm.seeders, maxCount)...) - if maxCount -= len(peers); maxCount > 0 { - peers = append(peers, parsePeers(swarm.leechers, maxCount)...) - } - } - } - return -} - func (ps *peerStore) AnnouncePeers(_ context.Context, ih bittorrent.InfoHash, forSeeder bool, numWant int, v6 bool) (peers []bittorrent.Peer, err error) { select { case <-ps.closed: @@ -382,18 +407,31 @@ func (ps *peerStore) AnnouncePeers(_ context.Context, ih bittorrent.InfoHash, fo Bool("v6", v6). Msg("announce peers") - peers = ps.getPeers(ps.shards[ps.shardIndex(ih, v6)], ih, numWant, forSeeder) + if sw, ok := ps.shards[ps.shardIndex(ih, v6)].swarms.get(ih); ok { + peers = make([]bittorrent.Peer, 0, numWant/2) + rangeFn := func(p bittorrent.Peer) bool { + peers = append(peers, p) + numWant-- + return numWant > 0 + } + if forSeeder { + sw.leechers.keys(rangeFn) + } else { + sw.seeders.keys(rangeFn) + if numWant > 0 { + sw.leechers.keys(rangeFn) + } + } + } return } func (ps *peerStore) countPeers(ih bittorrent.InfoHash, v6 bool) (leechers, seeders uint32) { shard := ps.shards[ps.shardIndex(ih, v6)] - shard.RLock() - defer shard.RUnlock() - if swarm, ok := shard.swarms[ih]; ok { - leechers, seeders = uint32(len(swarm.leechers)), uint32(len(swarm.seeders)) + if sw, ok := shard.swarms.get(ih); ok { + leechers, seeders = uint32(sw.leechers.len()), uint32(sw.seeders.len()) } return } @@ -483,42 +521,40 @@ func (ps *peerStore) gc(cutoff time.Time) { cutoffUnix := cutoff.UnixNano() for _, shard := range ps.shards { - shard.RLock() - var infohashes []bittorrent.InfoHash - for ih := range shard.swarms { - infohashes = append(infohashes, ih) - } - shard.RUnlock() + infoHashes := make([]bittorrent.InfoHash, 0, shard.swarms.len()) + shard.swarms.keys(func(ih bittorrent.InfoHash) bool { + infoHashes = append(infoHashes, ih) + return true + }) runtime.Gosched() - for _, ih := range infohashes { - shard.Lock() - - if _, stillExists := shard.swarms[ih]; !stillExists { - shard.Unlock() + for _, ih := range infoHashes { + sw, stillExists := shard.swarms.get(ih) + if !stillExists { runtime.Gosched() continue } - for pk, mtime := range shard.swarms[ih].leechers { + sw.leechers.forEach(func(p bittorrent.Peer, mtime int64) bool { if mtime <= cutoffUnix { - shard.numLeechers-- - delete(shard.swarms[ih].leechers, pk) + sw.leechers.del(p) + shard.numLeechers.Add(decrUint64) } - } + return true + }) - for pk, mtime := range shard.swarms[ih].seeders { + sw.seeders.forEach(func(p bittorrent.Peer, mtime int64) bool { if mtime <= cutoffUnix { - shard.numSeeders-- - delete(shard.swarms[ih].seeders, pk) + sw.seeders.del(p) + shard.numSeeders.Add(decrUint64) } + return true + }) + + if sw.leechers.len()|sw.seeders.len() == 0 { + shard.swarms.del(ih) } - if len(shard.swarms[ih].seeders)|len(shard.swarms[ih].leechers) == 0 { - delete(shard.swarms, ih) - } - - shard.Unlock() runtime.Gosched() } @@ -534,13 +570,6 @@ func (ps *peerStore) Close() error { ps.onceCloser.Do(func() { close(ps.closed) ps.wg.Wait() - - // Explicitly deallocate our storage. - shards := make([]*peerShard, len(ps.shards)) - for i := 0; i < len(ps.shards); i++ { - shards[i] = &peerShard{swarms: make(map[bittorrent.InfoHash]swarm)} - } - ps.shards = shards }) return nil diff --git a/storage/pg/storage.go b/storage/pg/storage.go index 124e2bf..af0210b 100644 --- a/storage/pg/storage.go +++ b/storage/pg/storage.go @@ -475,7 +475,7 @@ func (s *store) getPeers(ctx context.Context, ih bittorrent.InfoHash, seeders bo } else { logger.Warn(). Err(err). - Bytes("peerID", id). + Hex("peerID", id). IPAddr("ip", ip). Int("port", port). Msg("unable to scan/construct peer") @@ -552,7 +552,7 @@ func (s *store) countPeers(ctx context.Context, ih []byte) (seeders uint32, leec } } if err != nil { - logger.Error().Err(err).Bytes("infoHash", ih).Msg("unable to get peers count") + logger.Error().Err(err).Hex("infoHash", ih).Msg("unable to get peers count") } return } diff --git a/storage/storage.go b/storage/storage.go index 5b3aa73..7a4d0ae 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -44,7 +44,7 @@ func (c Config) sanitizeGCConfig() (gcInterval, peerTTL time.Duration) { if c.GarbageCollectionInterval <= 0 { gcInterval = DefaultGarbageCollectionInterval logger.Warn(). - Str("name", "garbageCollectionInterval"). + Str("name", "GarbageCollectionInterval"). Dur("provided", c.GarbageCollectionInterval). Dur("default", DefaultGarbageCollectionInterval). Msg("falling back to default configuration") @@ -54,7 +54,7 @@ func (c Config) sanitizeGCConfig() (gcInterval, peerTTL time.Duration) { if c.PeerLifetime <= 0 { peerTTL = DefaultPeerLifetime logger.Warn(). - Str("name", "peerLifetime"). + Str("name", "PeerLifetime"). Dur("provided", c.PeerLifetime). Dur("default", DefaultPeerLifetime). Msg("falling back to default configuration") @@ -68,7 +68,7 @@ func (c Config) sanitizeStatisticsConfig() (statInterval time.Duration) { if c.PrometheusReportingInterval < 0 { statInterval = DefaultPrometheusReportingInterval logger.Warn(). - Str("name", "prometheusReportingInterval"). + Str("name", "PrometheusReportingInterval"). Dur("provided", c.PrometheusReportingInterval). Dur("default", DefaultPrometheusReportingInterval). Msg("falling back to default configuration")