Merge pull request #23 from sot-tech/test

Minor improvements
This commit is contained in:
SOT-TECH
2022-12-06 13:56:39 +03:00
committed by GitHub
24 changed files with 724 additions and 435 deletions
+1 -1
View File
@@ -15,7 +15,7 @@ jobs:
- uses: "actions/checkout@v3"
- uses: "actions/setup-go@v3"
with:
go-version: "^1.18"
go-version: "^1.19"
- uses: "authzed/actions/gofumpt@main"
- uses: "authzed/actions/go-mod-tidy@main"
- uses: "authzed/actions/go-generate@main"
+2
View File
@@ -12,6 +12,8 @@ import (
"github.com/anacrolix/torrent/tracker"
"github.com/sot-tech/mochi/bittorrent"
_ "github.com/sot-tech/mochi/pkg/randseed"
)
func main() {
+27 -5
View File
@@ -7,11 +7,12 @@ import (
"gopkg.in/yaml.v3"
fh "github.com/sot-tech/mochi/frontend/http"
fu "github.com/sot-tech/mochi/frontend/udp"
"github.com/sot-tech/mochi/pkg/conf"
// Imports to register frontends
_ "github.com/sot-tech/mochi/frontend/http"
_ "github.com/sot-tech/mochi/frontend/udp"
// Seed math random
_ "github.com/sot-tech/mochi/pkg/randseed"
// Imports to register middleware hooks.
_ "github.com/sot-tech/mochi/middleware/clientapproval"
@@ -21,12 +22,12 @@ import (
// Imports to register storage drivers.
_ "github.com/sot-tech/mochi/storage/keydb"
_ "github.com/sot-tech/mochi/storage/memory"
sm "github.com/sot-tech/mochi/storage/memory"
_ "github.com/sot-tech/mochi/storage/pg"
_ "github.com/sot-tech/mochi/storage/redis"
)
// Config represents the configuration used for executing Conf.
// Config represents the configuration used for Server start.
type Config struct {
// TODO(jzelinskie): Evaluate whether we would like to make
// AnnounceInterval and MinAnnounceInterval optional.
@@ -42,6 +43,27 @@ type Config struct {
PostHooks []conf.NamedMapConfig `yaml:"posthooks"`
}
// QuickConfig is the simple configuration for quick start without config file.
// Includes in-memory store, http and udp frontends without any middleware.
var QuickConfig = &Config{
Frontends: []conf.NamedMapConfig{
{
Name: fh.Name,
Config: conf.MapConfig{},
},
{
Name: fu.Name,
Config: conf.MapConfig{},
},
},
Storage: conf.NamedMapConfig{
Name: sm.Name,
Config: conf.MapConfig{},
},
PreHooks: []conf.NamedMapConfig{},
PostHooks: []conf.NamedMapConfig{},
}
// ParseConfigFile returns a new Config given the path to a YAML
// configuration file.
//
+17 -3
View File
@@ -18,23 +18,37 @@ const (
logPrettyArg = "logPretty"
logColorsArg = "logColored"
configArg = "config"
quickArg = "quick"
)
func main() {
var s Server
var err error
logOut := flag.String(logOutArg, "stderr", "output for logging, might be 'stderr', 'stdout' or file path")
logLevel := flag.String(logLevelArg, "warn", "logging level: trace, debug, info, warn, error, fatal, panic")
logPretty := flag.Bool(logPrettyArg, false, "enable log pretty print. used only if 'logOut' set to 'stdout' or 'stderr'. if not set, log outputs json")
//goland:noinspection GoBoolExpressions
logColored := flag.Bool(logColorsArg, runtime.GOOS == "windows", "enable log coloring. used only if set 'logPretty'")
configPath := flag.String(configArg, "/etc/mochi.yaml", "location of configuration file")
quickStart := flag.Bool(quickArg, false, "start tracker with default configuration (all frontends, in-memory store, no hooks)")
flag.Parse()
if err := l.ConfigureLogger(*logOut, *logLevel, *logPretty, *logColored); err != nil {
if err = l.ConfigureLogger(*logOut, *logLevel, *logPretty, *logColored); err != nil {
log.Fatal("unable to configure logger: ", err)
}
if err := s.Run(*configPath); err != nil {
var cfg *Config
if *quickStart {
cfg = QuickConfig
} else {
cfg, err = ParseConfigFile(*configPath)
if err != nil {
log.Fatal("unable to read config file: ", err)
}
}
var s Server
if err = s.Run(cfg); err != nil {
log.Fatal("unable to start server: ", err)
}
defer s.Shutdown()
+1 -6
View File
@@ -25,12 +25,7 @@ type Server struct {
// Run begins an instance of Conf.
// It is optional to provide an instance of the peer store to avoid the
// creation of a new one.
func (r *Server) Run(configFilePath string) error {
cfg, err := ParseConfigFile(configFilePath)
if err != nil {
return fmt.Errorf("failed to read config: %w", err)
}
func (r *Server) Run(cfg *Config) (err error) {
if len(cfg.MetricsAddr) > 0 {
log.Info().Str("address", cfg.MetricsAddr).Msg("starting metrics server")
r.frontends = append(r.frontends, metrics.NewServer(cfg.MetricsAddr))
+12 -4
View File
@@ -34,10 +34,14 @@ frontends:
tls_key_path: ""
# Enable SO_REUSEPORT to allow starting multiple mochi instances with the same HTTP(S) port.
# You can also use this parameter to define two or mote listeners for the same address and port,
# and (possibly) increase throughput.
# You can also use this parameter to define two or more listeners or separate processes
# for the same address and port, and (possibly) increase throughput (faster queue processing
# because of multiple 'workers').
reuse_port: true
# Number of connection listeners to start. See reuse_port, default is 1.
workers: 1
# The timeout durations for HTTP requests.
read_timeout: 5s
write_timeout: 5s
@@ -110,10 +114,14 @@ frontends:
addr: "0.0.0.0:6969"
# Enable SO_REUSEPORT to allow starting multiple mochi instances with the same UDP port.
# You can also use this parameter to define two or mote listeners for the same address and port,
# and (a little) increase throughput (faster queue processing because of multiple 'workers').
# You can also use this parameter to define two or more listeners or separate processes
# for the same address and port, and (a little) increase throughput (faster queue processing
# because of multiple 'workers').
reuse_port: true
# Number of connection listeners to start. See reuse_port, default is 1.
workers: 1
# The leeway for a timestamp on a connection ID.
max_clock_skew: 10s
+2
View File
@@ -14,6 +14,7 @@ frontends:
tls_cert_path: ""
tls_key_path: ""
reuse_port: true
workers: 1
read_timeout: 5s
write_timeout: 5s
enable_keepalive: false
@@ -36,6 +37,7 @@ frontends:
config:
addr: "0.0.0.0:6969"
reuse_port: true
workers: 1
max_clock_skew: 10s
private_key: "paste a random string here that will be used to hmac connection IDs"
enable_request_timing: false
+2
View File
@@ -14,6 +14,7 @@ frontends:
tls_cert_path: ""
tls_key_path: ""
reuse_port: true
workers: 1
read_timeout: 5s
write_timeout: 5s
enable_keepalive: false
@@ -36,6 +37,7 @@ frontends:
config:
addr: "0.0.0.0:6969"
reuse_port: true
workers: 1
max_clock_skew: 10s
private_key: "paste a random string here that will be used to hmac connection IDs"
enable_request_timing: false
+33
View File
@@ -3,8 +3,10 @@
package frontend
import (
"errors"
"fmt"
"io"
"strings"
"sync"
"github.com/sot-tech/mochi/middleware"
@@ -71,3 +73,34 @@ func NewFrontends(configs []conf.NamedMapConfig, logic *middleware.Logic) (fs []
}
return
}
// CloseGroup simultaneously calls Close for each non-nil
// array element and combines non-nil errors into one
func CloseGroup(cls []io.Closer) (err error) {
l := len(cls)
errs := make([]error, l)
wg := sync.WaitGroup{}
wg.Add(l)
for i, c := range cls {
if c != nil {
go func(i int, c io.Closer) {
defer wg.Done()
if e := c.Close(); e != nil {
errs[i] = e
}
}(i, c)
}
}
wg.Wait()
sb := strings.Builder{}
for _, e := range errs {
if e != nil {
sb.WriteString(e.Error())
sb.WriteString("; ")
}
}
if sb.Len() > 0 {
err = errors.New(sb.String())
}
return
}
+95 -42
View File
@@ -6,8 +6,10 @@ import (
"context"
"crypto/tls"
"errors"
"io"
"net/http"
"net/netip"
"sync"
"time"
"github.com/julienschmidt/httprouter"
@@ -20,14 +22,16 @@ import (
"github.com/sot-tech/mochi/pkg/metrics"
)
// Name - registered name of the frontend
const Name = "http"
var (
logger = log.NewLogger("frontend/http")
errTLSNotProvided = errors.New("tls certificate/key not provided")
errRoutesNotProvided = errors.New("routes not provided")
logger = log.NewLogger("frontend/http")
errTLSNotProvided = errors.New("tls certificate/key not provided")
)
func init() {
frontend.RegisterBuilder("http", NewFrontend)
frontend.RegisterBuilder(Name, NewFrontend)
}
// Config represents all configurable options for an HTTP BitTorrent Frontend
@@ -44,15 +48,17 @@ type Config struct {
ParseOptions
}
const defaultIdleTimeout = 30 * time.Second
const (
defaultIdleTimeout = 30 * time.Second
defaultAnnounceRoute = "/announce"
defaultScrapeRoute = "/scrape"
)
// Validate sanity checks values set in a config and returns a new config with
// default values replacing anything that is invalid.
func (cfg Config) Validate() (validCfg Config, err error) {
validCfg = cfg
if validCfg.ListenOptions, err = cfg.ListenOptions.Validate(); err != nil {
return
}
validCfg.ListenOptions = cfg.ListenOptions.Validate(false, logger)
if cfg.UseTLS && (len(cfg.TLSCertPath) == 0 || len(cfg.TLSKeyPath) == 0) {
err = errTLSNotProvided
return
@@ -68,14 +74,44 @@ func (cfg Config) Validate() (validCfg Config, err error) {
Msg("falling back to default configuration")
}
}
validCfg.ParseOptions.ParseOptions = cfg.ParseOptions.ParseOptions.Validate()
if len(cfg.AnnounceRoutes) == 0 {
validCfg.AnnounceRoutes = []string{defaultAnnounceRoute}
logger.Warn().
Str("name", "AnnounceRoutes").
Strs("provided", cfg.AnnounceRoutes).
Strs("default", validCfg.AnnounceRoutes).
Msg("falling back to default configuration")
}
if len(cfg.ScrapeRoutes) == 0 {
validCfg.ScrapeRoutes = []string{defaultScrapeRoute}
logger.Warn().
Str("name", "ScrapeRoutes").
Strs("provided", cfg.ScrapeRoutes).
Strs("default", validCfg.ScrapeRoutes).
Msg("falling back to default configuration")
}
validCfg.ParseOptions.ParseOptions = cfg.ParseOptions.ParseOptions.Validate(logger)
return
}
// httpServer replaces http.Close method with http.Shutdown
type httpServer struct {
*http.Server
}
func (c httpServer) Close() (err error) {
if c.Server != nil {
err = c.Shutdown(context.Background())
}
return
}
type httpFE struct {
srv *http.Server
servers []httpServer
logic *middleware.Logic
collectTimings bool
onceCloser sync.Once
ParseOptions
}
@@ -89,31 +125,37 @@ func NewFrontend(c conf.MapConfig, logic *middleware.Logic) (frontend.Frontend,
if cfg, err = cfg.Validate(); err != nil {
return nil, err
}
if len(cfg.AnnounceRoutes) < 1 || len(cfg.ScrapeRoutes) < 1 {
return nil, errRoutesNotProvided
}
f := &httpFE{
logic: logic,
srv: &http.Server{
ReadTimeout: cfg.ReadTimeout,
ReadHeaderTimeout: cfg.ReadTimeout,
WriteTimeout: cfg.WriteTimeout,
IdleTimeout: cfg.IdleTimeout,
},
logic: logic,
servers: make([]httpServer, cfg.Workers),
collectTimings: cfg.EnableRequestTiming,
ParseOptions: cfg.ParseOptions,
}
for i := range f.servers {
f.servers[i] = httpServer{
&http.Server{
ReadTimeout: cfg.ReadTimeout,
ReadHeaderTimeout: cfg.ReadTimeout,
WriteTimeout: cfg.WriteTimeout,
IdleTimeout: cfg.IdleTimeout,
},
}
}
// If TLS is enabled, create a key pair.
if cfg.UseTLS {
var cert tls.Certificate
if cert, err = tls.LoadX509KeyPair(cfg.TLSCertPath, cfg.TLSKeyPath); err != nil {
return nil, err
}
f.srv.TLSConfig = &tls.Config{
Certificates: []tls.Certificate{cert},
MinVersion: tls.VersionTLS12,
certs := []tls.Certificate{cert}
for i := range f.servers {
f.servers[i].TLSConfig = &tls.Config{
Certificates: certs,
MinVersion: tls.VersionTLS12,
}
}
}
@@ -128,30 +170,41 @@ func NewFrontend(c conf.MapConfig, logic *middleware.Logic) (frontend.Frontend,
router.GET(route, f.ping)
router.HEAD(route, f.ping)
}
f.srv.Handler = router
f.srv.SetKeepAlivesEnabled(cfg.EnableKeepAlive)
go func() {
ln, err := cfg.ListenTCP()
if err == nil {
if f.srv.TLSConfig == nil {
err = f.srv.Serve(ln)
} else {
err = f.srv.ServeTLS(ln, "", "")
}
}
if !errors.Is(err, http.ErrServerClosed) {
logger.Fatal().Err(err).Msg("server failed")
}
}()
for _, srv := range f.servers {
srv.Handler = router
srv.SetKeepAlivesEnabled(cfg.EnableKeepAlive)
go runServer(srv, &cfg)
}
return f, nil
}
// Close provides a thread-safe way to shut down a currently running Frontend.
func (f *httpFE) Close() error {
return f.srv.Shutdown(context.Background())
func runServer(s httpServer, cfg *Config) {
ln, err := cfg.ListenTCP()
if err == nil {
if s.TLSConfig == nil {
err = s.Serve(ln)
} else {
err = s.ServeTLS(ln, "", "")
}
}
if !errors.Is(err, http.ErrServerClosed) {
logger.Fatal().Err(err).Msg("server failed")
}
}
// Close provides a thread-safe way to gracefully shut down a currently running Frontend.
func (f *httpFE) Close() (err error) {
f.onceCloser.Do(func() {
cls := make([]io.Closer, len(f.servers))
for i, s := range f.servers {
cls[i] = s
}
err = frontend.CloseGroup(cls)
})
return
}
func httpParamsToRouteParams(in httprouter.Params) (out bittorrent.RouteParams) {
+24 -12
View File
@@ -6,23 +6,22 @@ import (
"time"
"github.com/libp2p/go-reuseport"
"github.com/sot-tech/mochi/pkg/log"
)
const (
defaultReadTimeout = 2 * time.Second
defaultWriteTimeout = 2 * time.Second
defaultReadTimeout = 2 * time.Second
defaultWriteTimeout = 2 * time.Second
defaultListenAddress = ":6969"
)
var (
// ErrAddressNotProvided returned if listen address not provided in configuration
ErrAddressNotProvided = errors.New("address not provided")
errUnexpectedListenerType = errors.New("unexpected listener type")
)
var errUnexpectedListenerType = errors.New("unexpected listener type")
// ListenOptions is the base configuration which may be used in net listeners
type ListenOptions struct {
Addr string
ReusePort bool `cfg:"reuse_port"`
ReusePort bool `cfg:"reuse_port"`
Workers uint
ReadTimeout time.Duration `cfg:"read_timeout"`
WriteTimeout time.Duration `cfg:"write_timeout"`
EnableRequestTiming bool `cfg:"enable_request_timing"`
@@ -30,11 +29,24 @@ type ListenOptions struct {
// Validate checks if listen address provided and sets default
// timeout options if needed
func (lo ListenOptions) Validate() (validOptions ListenOptions, err error) {
func (lo ListenOptions) Validate(ignoreTimeouts bool, logger *log.Logger) (validOptions ListenOptions) {
validOptions = lo
if len(lo.Addr) == 0 {
err = ErrAddressNotProvided
} else {
validOptions.Addr = defaultListenAddress
logger.Warn().
Str("name", "Addr").
Str("provided", lo.Addr).
Str("default", validOptions.Addr).
Msg("falling back to default configuration")
}
if lo.Workers == 0 {
validOptions.Workers = 1
}
if lo.Workers > 1 && !lo.ReusePort {
validOptions.ReusePort = true
logger.Warn().Msg("forcibly enabling ReusePort because Workers > 1")
}
if !ignoreTimeouts {
if lo.ReadTimeout <= 0 {
validOptions.ReadTimeout = defaultReadTimeout
logger.Warn().
@@ -111,7 +123,7 @@ type ParseOptions struct {
// Validate sanity checks values set in a config and returns a new config with
// default values replacing anything that is invalid.
func (op ParseOptions) Validate() ParseOptions {
func (op ParseOptions) Validate(logger *log.Logger) ParseOptions {
valid := op
if op.MaxNumWant <= 0 {
valid.MaxNumWant = defaultMaxNumWant
+74 -38
View File
@@ -4,16 +4,29 @@ import (
"crypto/hmac"
"encoding/binary"
"hash"
"math/rand"
"net/netip"
"time"
"github.com/minio/sha256-simd"
"github.com/cespare/xxhash/v2"
"github.com/sot-tech/mochi/pkg/log"
"github.com/sot-tech/mochi/pkg/xorshift"
)
// ttl is the duration a connection ID should be valid according to BEP 15.
const ttl = 2 * time.Minute
var ttl = int64(2 * time.Minute)
const (
// length of connection ID
connIDLen = 8
// uint64 length + 1 byte salt
buffLen = 9
// 16 bytes enough for hashes with output length up to 128bit
scratchLen = 16
// length of HMAC in bytes to place it in connection ID
hmacLen = 5
)
// A ConnectionIDGenerator is a reusable generator and validator for connection
// IDs as described in BEP 15.
@@ -33,17 +46,31 @@ type ConnectionIDGenerator struct {
// It will be overwritten by subsequent calls to Generate.
connID []byte
// buffer for HMAC input
buff []byte
// scratch is a 32-byte slice that is used as a scratchpad for the generated
// HMACs.
// HMACs to increase hash performance.
scratch []byte
// the leeway for a timestamp on a connection ID.
maxClockSkew int64
// PRNG footprint holder
s uint64
}
// NewConnectionIDGenerator creates a new connection ID generator.
func NewConnectionIDGenerator(key string) *ConnectionIDGenerator {
func NewConnectionIDGenerator(key []byte, maxClockSkew time.Duration) *ConnectionIDGenerator {
return &ConnectionIDGenerator{
mac: hmac.New(sha256.New, []byte(key)),
connID: make([]byte, 8),
scratch: make([]byte, 32),
mac: hmac.New(func() hash.Hash {
return xxhash.New()
}, key),
connID: make([]byte, connIDLen),
buff: make([]byte, buffLen),
scratch: make([]byte, scratchLen),
maxClockSkew: int64(maxClockSkew),
s: rand.Uint64(),
}
}
@@ -52,60 +79,69 @@ func NewConnectionIDGenerator(key string) *ConnectionIDGenerator {
// it after getting a generator from a pool.
func (g *ConnectionIDGenerator) reset() {
g.mac.Reset()
g.connID = g.connID[:8]
g.connID = g.connID[:connIDLen]
g.buff = g.buff[:buffLen]
g.scratch = g.scratch[:0]
}
// Generate generates an 8-byte connection ID as described in BEP 15 for the
// given IP and the current time.
//
// The first 4 bytes of the connection identifier is a unix timestamp and the
// last 4 bytes are a truncated HMAC token created from the aforementioned
// unix timestamp and the source IP address of the UDP packet.
// The first byte is random salt, next 2 bytes - truncated unix timestamp
// when ID was generated, last 5 bytes are a truncated HMAC token created
// from salt (1 byte), full unix timestamp (8 bytes) and source IP (4/16 bytes).
//
// Salt used to mitigate generation same MAC if there are several clients
// from same IP sent requests within one second.
//
// Truncated HMAC is known to be safe for 2^(-n) where n is the size in bits
// of the truncated HMAC token. In this use case we have 32 bits, thus a
// of the truncated HMAC token. In this use case we have 40 bits, thus a
// forgery probability of approximately 1 in 4 billion.
//
// The generated ID is written to g.connID, which is also returned. g.connID
// The generated ID is written to g.buffer, which is also returned. g.buffer
// will be reused, so it must not be referenced after returning the generator
// to a pool and will be overwritten be subsequent calls to Generate!
func (g *ConnectionIDGenerator) Generate(ip netip.Addr, now time.Time) []byte {
func (g *ConnectionIDGenerator) Generate(ip netip.Addr, now time.Time) (out []byte) {
g.reset()
var r uint64
r, g.s = xorshift.XorShift64S(g.s)
g.buff[0] = byte(r)
binary.BigEndian.PutUint64(g.buff[1:], uint64(now.Unix()))
g.mac.Write(g.buff)
g.mac.Write(ip.AsSlice())
binary.BigEndian.PutUint32(g.connID, uint32(now.Unix()))
g.mac.Write(g.connID[:4])
ipBytes, _ := ip.MarshalBinary()
g.mac.Write(ipBytes)
g.scratch = g.mac.Sum(g.scratch)
copy(g.connID[4:8], g.scratch[:4])
g.connID[0], g.connID[1], g.connID[2] = g.buff[0], g.buff[7], g.buff[8]
copy(g.connID[connIDLen-hmacLen:], g.scratch[:hmacLen])
log.Debug().
Stringer("ip", ip).
Time("now", now).
Bytes("connID", g.connID).
Hex("connID", g.connID).
Msg("generated connection ID")
return g.connID
return g.connID[:connIDLen]
}
// Validate validates the given connection ID for an IP and the current time.
func (g *ConnectionIDGenerator) Validate(connectionID []byte, ip netip.Addr, now time.Time, maxClockSkew time.Duration) bool {
ts := time.Unix(int64(binary.BigEndian.Uint32(connectionID[:4])), 0)
func (g *ConnectionIDGenerator) Validate(connectionID []byte, ip netip.Addr, now time.Time) bool {
g.reset()
nowTS := now.Unix()
g.buff[0] = connectionID[0]
// connectionID contains only 2 bytes of timestamp, so we clean little 16 bits to place it and rehash.
// We will provide restored full timestamp respectively to current timestamp,
// 2 bytes should be enough to avoid collisions within ~18 hours from same IP.
ts := nowTS&((^int64(0)>>16)<<16) | int64(connectionID[1])<<8 | int64(connectionID[2])
binary.BigEndian.PutUint64(g.buff[1:], uint64(ts))
g.mac.Write(g.buff)
g.mac.Write(ip.AsSlice())
g.scratch = g.mac.Sum(g.scratch)
res := hmac.Equal(g.scratch[:hmacLen], connectionID[connIDLen-hmacLen:connIDLen])
// ts-skew < now < ts+ttl+skew
res = ts-g.maxClockSkew < nowTS && res
res = nowTS < ts+ttl+g.maxClockSkew && res
log.Debug().
Stringer("ip", ip).
Time("ts", ts).Time("now", now).
Bytes("connID", g.connID).
Hex("connID", connectionID).
Bool("result", res).
Msg("validating connection ID")
if now.After(ts.Add(ttl)) || ts.After(now.Add(maxClockSkew)) {
return false
}
g.reset()
g.mac.Write(connectionID[:4])
ipBytes, _ := ip.MarshalBinary()
g.mac.Write(ipBytes)
g.scratch = g.mac.Sum(g.scratch)
return hmac.Equal(g.scratch[:4], connectionID[4:])
return res
}
+42 -38
View File
@@ -4,65 +4,69 @@ import (
"crypto/hmac"
"encoding/binary"
"fmt"
"hash"
"math/rand"
"net/netip"
"sync"
"testing"
"time"
"github.com/minio/sha256-simd"
"github.com/stretchr/testify/require"
"github.com/cespare/xxhash/v2"
"github.com/sot-tech/mochi/pkg/log"
_ "github.com/sot-tech/mochi/pkg/randseed"
"github.com/stretchr/testify/require"
)
var golden = []struct {
createdAt int64
now int64
ip string
key string
key []byte
valid bool
}{
{0, 1, "127.0.0.1", "", true},
{0, 420420, "127.0.0.1", "", false},
{0, 0, "::1", "", true},
{0, 1, "127.0.0.1", []byte(""), true},
{0, 420420, "127.0.0.1", []byte(""), false},
{0, 0, "::1", []byte(""), true},
}
// NewConnectionID creates an 8-byte connection identifier for UDP packets as
// described by BEP 15.
// This is a wrapper around creating a new ConnectionIDGenerator and generating
// an ID. It is recommended to use the generator for performance.
func NewConnectionID(ip netip.Addr, now time.Time, key string) []byte {
return NewConnectionIDGenerator(key).Generate(ip, now)
func NewConnectionID(ip netip.Addr, now time.Time, key []byte) []byte {
return NewConnectionIDGenerator(key, 0).Generate(ip, now)
}
// ValidConnectionID determines whether a connection identifier is legitimate.
// This is a wrapper around creating a new ConnectionIDGenerator and validating
// the ID. It is recommended to use the generator for performance.
func ValidConnectionID(connectionID []byte, ip netip.Addr, now time.Time, maxClockSkew time.Duration, key string) bool {
return NewConnectionIDGenerator(key).Validate(connectionID, ip, now, maxClockSkew)
func ValidConnectionID(connectionID []byte, ip netip.Addr, now time.Time, maxClockSkew time.Duration, key []byte) bool {
return NewConnectionIDGenerator(key, maxClockSkew).Validate(connectionID, ip, now)
}
// simpleNewConnectionID generates a new connection ID the explicit way.
// This is used to verify correct behaviour of the generator.
func simpleNewConnectionID(ip netip.Addr, now time.Time, key string) []byte {
buf := make([]byte, 8)
binary.BigEndian.PutUint32(buf, uint32(now.Unix()))
mac := hmac.New(sha256.New, []byte(key))
mac.Write(buf[:4])
ipBytes, _ := ip.MarshalBinary()
mac.Write(ipBytes)
macBytes := mac.Sum(nil)[:4]
copy(buf[4:], macBytes)
func simpleNewConnectionID(ip netip.Addr, now time.Time, key []byte) []byte {
buffer := make([]byte, 9)
mac := hmac.New(func() hash.Hash {
return xxhash.New()
}, key)
buffer[0] = byte(rand.Int())
binary.BigEndian.PutUint64(buffer[1:], uint64(now.Unix()))
mac.Write(buffer)
mac.Write(ip.AsSlice())
buffer[1], buffer[2] = buffer[7], buffer[8]
copy(buffer[3:8], mac.Sum(nil))
buffer = buffer[:8]
// this is just in here because logging impacts performance and we benchmark
// this version too.
log.Debug().
Stringer("ip", ip).
Time("now", now).
Bytes("connID", buf).
Msg("manually generated connection ID")
return buf
Time("ts", now).
Hex("connID", buffer).
Msg("generated connection ID")
return buffer
}
func TestVerification(t *testing.T) {
@@ -82,7 +86,7 @@ func TestGeneration(t *testing.T) {
t.Run(fmt.Sprintf("%s created at %d", tt.ip, tt.createdAt), func(t *testing.T) {
want := simpleNewConnectionID(netip.MustParseAddr(tt.ip), time.Unix(tt.createdAt, 0), tt.key)
got := NewConnectionID(netip.MustParseAddr(tt.ip), time.Unix(tt.createdAt, 0), tt.key)
require.Equal(t, want, got)
require.NotEqual(t, want, got) // IDs should NOT be equal because of salt
})
}
}
@@ -93,11 +97,11 @@ func TestReuseGeneratorGenerate(t *testing.T) {
cid := NewConnectionID(netip.MustParseAddr(tt.ip), time.Unix(tt.createdAt, 0), tt.key)
require.Len(t, cid, 8)
gen := NewConnectionIDGenerator(tt.key)
gen := NewConnectionIDGenerator(tt.key, 0)
for i := 0; i < 3; i++ {
connID := gen.Generate(netip.MustParseAddr(tt.ip), time.Unix(tt.createdAt, 0))
require.Equal(t, cid, connID)
require.NotEqual(t, cid, connID) // IDs should NOT be equal because of salt
}
})
}
@@ -106,10 +110,10 @@ func TestReuseGeneratorGenerate(t *testing.T) {
func TestReuseGeneratorValidate(t *testing.T) {
for _, tt := range golden {
t.Run(fmt.Sprintf("%s created at %d verified at %d", tt.ip, tt.createdAt, tt.now), func(t *testing.T) {
gen := NewConnectionIDGenerator(tt.key)
gen := NewConnectionIDGenerator(tt.key, time.Minute)
cid := gen.Generate(netip.MustParseAddr(tt.ip), time.Unix(tt.createdAt, 0))
for i := 0; i < 3; i++ {
got := gen.Validate(cid, netip.MustParseAddr(tt.ip), time.Unix(tt.now, 0), time.Minute)
got := gen.Validate(cid, netip.MustParseAddr(tt.ip), time.Unix(tt.now, 0))
if got != tt.valid {
t.Errorf("expected validity: %t got validity: %t", tt.valid, got)
}
@@ -120,7 +124,7 @@ func TestReuseGeneratorValidate(t *testing.T) {
func BenchmarkSimpleNewConnectionID(b *testing.B) {
ip := netip.MustParseAddr("127.0.0.1")
key := "some random string that is hopefully at least this long"
key := []byte("some random string that is hopefully at least this long")
createdAt := time.Now()
b.RunParallel(func(pb *testing.PB) {
@@ -137,7 +141,7 @@ func BenchmarkSimpleNewConnectionID(b *testing.B) {
func BenchmarkNewConnectionID(b *testing.B) {
ip := netip.MustParseAddr("127.0.0.1")
key := "some random string that is hopefully at least this long"
key := []byte("some random string that is hopefully at least this long")
createdAt := time.Now()
b.RunParallel(func(pb *testing.PB) {
@@ -154,12 +158,12 @@ func BenchmarkNewConnectionID(b *testing.B) {
func BenchmarkConnectionIDGenerator_Generate(b *testing.B) {
ip := netip.MustParseAddr("127.0.0.1")
key := "some random string that is hopefully at least this long"
key := []byte("some random string that is hopefully at least this long")
createdAt := time.Now()
pool := &sync.Pool{
New: func() any {
return NewConnectionIDGenerator(key)
return NewConnectionIDGenerator(key, 0)
},
}
@@ -176,7 +180,7 @@ func BenchmarkConnectionIDGenerator_Generate(b *testing.B) {
func BenchmarkValidConnectionID(b *testing.B) {
ip := netip.MustParseAddr("127.0.0.1")
key := "some random string that is hopefully at least this long"
key := []byte("some random string that is hopefully at least this long")
createdAt := time.Now()
cid := NewConnectionID(ip, createdAt, key)
@@ -191,20 +195,20 @@ func BenchmarkValidConnectionID(b *testing.B) {
func BenchmarkConnectionIDGenerator_Validate(b *testing.B) {
ip := netip.MustParseAddr("127.0.0.1")
key := "some random string that is hopefully at least this long"
key := []byte("some random string that is hopefully at least this long")
createdAt := time.Now()
cid := NewConnectionID(ip, createdAt, key)
pool := &sync.Pool{
New: func() any {
return NewConnectionIDGenerator(key)
return NewConnectionIDGenerator(key, 10*time.Second)
},
}
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
gen := pool.Get().(*ConnectionIDGenerator)
if !gen.Validate(cid, ip, createdAt, 10*time.Second) {
if !gen.Validate(cid, ip, createdAt) {
b.FailNow()
}
pool.Put(gen)
+80 -54
View File
@@ -7,6 +7,7 @@ import (
"context"
"encoding/binary"
"errors"
"io"
"math/rand"
"net"
"net/netip"
@@ -23,13 +24,21 @@ import (
"github.com/sot-tech/mochi/pkg/timecache"
)
const (
// Name - registered name of the frontend
Name = "udp"
defaultKeyLen = 32
maxAllowedClockSkew = 30 * time.Second
defaultMaxClockSkew = 10 * time.Second
)
var (
logger = log.NewLogger("frontend/udp")
allowedGeneratedPrivateKeyRunes = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890")
)
func init() {
frontend.RegisterBuilder("udp", NewFrontend)
frontend.RegisterBuilder(Name, NewFrontend)
}
// Config represents all the configurable options for a UDP BitTorrent
@@ -43,17 +52,13 @@ type Config struct {
// Validate sanity checks values set in a config and returns a new config with
// default values replacing anything that is invalid.
func (cfg Config) Validate() (validCfg Config, err error) {
if len(cfg.Addr) == 0 {
err = frontend.ErrAddressNotProvided
return
}
func (cfg Config) Validate() (validCfg Config) {
validCfg = cfg
validCfg.ListenOptions = cfg.ListenOptions.Validate(true, logger)
// Generate a private key if one isn't provided by the user.
if cfg.PrivateKey == "" {
pkeyRunes := make([]rune, 64)
pkeyRunes := make([]rune, defaultKeyLen)
for i := range pkeyRunes {
pkeyRunes[i] = allowedGeneratedPrivateKeyRunes[rand.Intn(len(allowedGeneratedPrivateKeyRunes))]
}
@@ -62,23 +67,35 @@ func (cfg Config) Validate() (validCfg Config, err error) {
logger.Warn().
Str("name", "PrivateKey").
Str("provided", "").
Str("key", validCfg.PrivateKey).
Str("default", validCfg.PrivateKey).
Msg("falling back to default configuration")
}
validCfg.ParseOptions = cfg.ParseOptions.Validate()
// ABS
sb := cfg.MaxClockSkew >> 63
validCfg.MaxClockSkew = (cfg.MaxClockSkew ^ sb) + (sb & 1)
if validCfg.MaxClockSkew == 0 || validCfg.MaxClockSkew > maxAllowedClockSkew {
validCfg.MaxClockSkew = defaultMaxClockSkew
logger.Warn().
Str("name", "MaxClockSkew").
Dur("provided", cfg.MaxClockSkew).
Dur("default", validCfg.MaxClockSkew).
Msg("falling back to default configuration")
}
validCfg.ParseOptions = cfg.ParseOptions.Validate(logger)
return
}
// udpFE holds the state of a UDP BitTorrent Frontend.
type udpFE struct {
socket *net.UDPConn
sockets []*net.UDPConn
closing chan any
wg sync.WaitGroup
genPool *sync.Pool
logic *middleware.Logic
maxClockSkew time.Duration
collectTimings bool
ctxCancel context.CancelFunc
onceCloser sync.Once
@@ -92,47 +109,56 @@ func NewFrontend(c conf.MapConfig, logic *middleware.Logic) (frontend.Frontend,
if err = c.Unmarshal(&cfg); err != nil {
return nil, err
}
if cfg, err = cfg.Validate(); err != nil {
return nil, err
}
cfg = cfg.Validate()
pKey := []byte(cfg.PrivateKey)
f := &udpFE{
sockets: make([]*net.UDPConn, cfg.Workers),
closing: make(chan any),
logic: logic,
maxClockSkew: cfg.MaxClockSkew,
collectTimings: cfg.EnableRequestTiming,
ParseOptions: cfg.ParseOptions,
genPool: &sync.Pool{
New: func() any {
return NewConnectionIDGenerator(cfg.PrivateKey)
return NewConnectionIDGenerator(pKey, cfg.MaxClockSkew)
},
},
}
if f.socket, err = cfg.ListenUDP(); err == nil {
var ctx context.Context
ctx, f.ctxCancel = context.WithCancel(context.Background())
f.wg.Add(1)
go func(ctx context.Context) {
if err := f.serve(ctx); err != nil {
logger.Fatal().Err(err).Msg("server failed")
}
}(ctx)
var ctx context.Context
ctx, f.ctxCancel = context.WithCancel(context.Background())
for i := range f.sockets {
if f.sockets[i], err = cfg.ListenUDP(); err == nil {
f.wg.Add(1)
go func(socket *net.UDPConn, ctx context.Context) {
if err := f.serve(ctx, socket); err != nil {
logger.Fatal().Err(err).Msg("server failed")
}
}(f.sockets[i], ctx)
}
}
if err != nil {
_ = f.Close()
}
return f, err
}
// Close provides a thread-safe way to shut down a currently running Frontend.
func (t *udpFE) Close() (err error) {
t.onceCloser.Do(func() {
close(t.closing)
if t.socket != nil {
t.ctxCancel()
_ = t.socket.SetReadDeadline(time.Now())
t.wg.Wait()
err = t.socket.Close()
func (f *udpFE) Close() (err error) {
f.onceCloser.Do(func() {
close(f.closing)
f.ctxCancel()
cls := make([]io.Closer, 0, len(f.sockets))
now := time.Now()
for _, s := range f.sockets {
if s != nil {
_ = s.SetDeadline(now)
cls = append(cls, s)
}
}
f.wg.Wait()
err = frontend.CloseGroup(cls)
})
return
@@ -140,14 +166,14 @@ func (t *udpFE) Close() (err error) {
// serve blocks while listening and serving UDP BitTorrent requests
// until Stop() is called or an error is returned.
func (t *udpFE) serve(ctx context.Context) error {
func (f *udpFE) serve(ctx context.Context, socket *net.UDPConn) error {
pool := bytepool.NewBytePool(2048)
defer t.wg.Done()
defer f.wg.Done()
for {
// Check to see if we need shutdown.
select {
case <-t.closing:
case <-f.closing:
log.Debug().Msg("serve received shutdown signal")
return nil
default:
@@ -155,7 +181,7 @@ func (t *udpFE) serve(ctx context.Context) error {
// Read a UDP packet into a reusable buffer.
buffer := pool.Get()
n, addrPort, err := t.socket.ReadFromUDPAddrPort(*buffer)
n, addrPort, err := socket.ReadFromUDPAddrPort(*buffer)
if err != nil {
pool.Put(buffer)
var netErr net.Error
@@ -172,22 +198,22 @@ func (t *udpFE) serve(ctx context.Context) error {
continue
}
t.wg.Add(1)
f.wg.Add(1)
go func() {
defer t.wg.Done()
defer f.wg.Done()
defer pool.Put(buffer)
// Handle the request.
addr := addrPort.Addr().Unmap()
var start time.Time
if t.collectTimings && metrics.Enabled() {
if f.collectTimings && metrics.Enabled() {
start = time.Now()
}
action, err := t.handleRequest(ctx,
action, err := f.handleRequest(ctx,
Request{(*buffer)[:n], addr},
ResponseWriter{t.socket, addrPort},
ResponseWriter{socket, addrPort},
)
if t.collectTimings && metrics.Enabled() {
if f.collectTimings && metrics.Enabled() {
recordResponseDuration(action, addr, err, time.Since(start))
}
}()
@@ -213,7 +239,7 @@ func (w ResponseWriter) Write(b []byte) (int, error) {
}
// handleRequest parses and responds to a UDP Request.
func (t *udpFE) handleRequest(ctx context.Context, r Request, w ResponseWriter) (actionName string, err error) {
func (f *udpFE) handleRequest(ctx context.Context, r Request, w ResponseWriter) (actionName string, err error) {
if len(r.Packet) < 16 {
// Malformed, no client packets are less than 16 bytes.
// We explicitly return nothing in case this is a DoS attempt.
@@ -227,12 +253,12 @@ func (t *udpFE) handleRequest(ctx context.Context, r Request, w ResponseWriter)
txID := r.Packet[12:16]
// get a connection ID generator/validator from the pool.
gen := t.genPool.Get().(*ConnectionIDGenerator)
defer t.genPool.Put(gen)
gen := f.genPool.Get().(*ConnectionIDGenerator)
defer f.genPool.Put(gen)
// If this isn't requesting a new connection ID and the connection ID is
// invalid, then fail.
if actionID != connectActionID && !gen.Validate(connID, r.IP, timecache.Now(), t.maxClockSkew) {
if actionID != connectActionID && !gen.Validate(connID, r.IP, timecache.Now()) {
err = errBadConnectionID
WriteError(w, txID, err)
return
@@ -254,7 +280,7 @@ func (t *udpFE) handleRequest(ctx context.Context, r Request, w ResponseWriter)
actionName = "announce"
var req *bittorrent.AnnounceRequest
req, err = ParseAnnounce(r, actionID == announceV6ActionID, t.ParseOptions)
req, err = ParseAnnounce(r, actionID == announceV6ActionID, f.ParseOptions)
if err != nil {
WriteError(w, txID, err)
return
@@ -262,7 +288,7 @@ func (t *udpFE) handleRequest(ctx context.Context, r Request, w ResponseWriter)
var resp *bittorrent.AnnounceResponse
ctx := bittorrent.InjectRouteParamsToContext(ctx, bittorrent.RouteParams{})
ctx, resp, err = t.logic.HandleAnnounce(ctx, req)
ctx, resp, err = f.logic.HandleAnnounce(ctx, req)
if err != nil {
WriteError(w, txID, err)
return
@@ -271,13 +297,13 @@ func (t *udpFE) handleRequest(ctx context.Context, r Request, w ResponseWriter)
WriteAnnounce(w, txID, resp, actionID == announceV6ActionID, r.IP.Is6())
ctx = bittorrent.RemapRouteParamsToBgContext(ctx)
go t.logic.AfterAnnounce(ctx, req, resp)
go f.logic.AfterAnnounce(ctx, req, resp)
case scrapeActionID:
actionName = "scrape"
var req *bittorrent.ScrapeRequest
req, err = ParseScrape(r, t.ParseOptions)
req, err = ParseScrape(r, f.ParseOptions)
if err != nil {
WriteError(w, txID, err)
return
@@ -285,7 +311,7 @@ func (t *udpFE) handleRequest(ctx context.Context, r Request, w ResponseWriter)
var resp *bittorrent.ScrapeResponse
ctx := bittorrent.InjectRouteParamsToContext(ctx, bittorrent.RouteParams{})
ctx, resp, err = t.logic.HandleScrape(ctx, req)
ctx, resp, err = f.logic.HandleScrape(ctx, req)
if err != nil {
WriteError(w, txID, err)
return
@@ -294,7 +320,7 @@ func (t *udpFE) handleRequest(ctx context.Context, r Request, w ResponseWriter)
WriteScrape(w, txID, resp)
ctx = bittorrent.RemapRouteParamsToBgContext(ctx)
go t.logic.AfterScrape(ctx, req, resp)
go f.logic.AfterScrape(ctx, req, resp)
default:
err = errUnknownAction
+15 -11
View File
@@ -3,12 +3,13 @@ module github.com/sot-tech/mochi
go 1.19
require (
code.cloudfoundry.org/go-diodes v0.0.0-20221102172008-608069b4b4a6
github.com/MicahParks/keyfunc v1.5.3
github.com/anacrolix/torrent v1.47.0
code.cloudfoundry.org/go-diodes v0.0.0-20221205211830-fe848ebc2e22
github.com/MicahParks/keyfunc v1.7.0
github.com/anacrolix/torrent v1.47.1-0.20221102120345-c63f7e1bd720
github.com/cespare/xxhash/v2 v2.2.0
github.com/go-redis/redis/v8 v8.11.5
github.com/golang-jwt/jwt/v4 v4.4.2
github.com/jackc/pgx/v5 v5.1.0
github.com/golang-jwt/jwt/v4 v4.4.3
github.com/jackc/pgx/v5 v5.2.0
github.com/julienschmidt/httprouter v1.3.0
github.com/libp2p/go-reuseport v0.2.0
github.com/minio/sha256-simd v1.0.0
@@ -20,18 +21,19 @@ require (
)
require (
github.com/anacrolix/dht/v2 v2.19.1 // indirect
github.com/anacrolix/dht/v2 v2.19.2 // indirect
github.com/anacrolix/log v0.13.2-0.20220711050817-613cb738ef30 // indirect
github.com/anacrolix/missinggo v1.3.0 // indirect
github.com/anacrolix/missinggo/v2 v2.7.1 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bradfitz/iter v0.0.0-20191230175014-e8f45d346db8 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/fsnotify/fsnotify v1.6.0 // indirect
github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/huandu/xstrings v1.3.3 // indirect
github.com/google/pprof v0.0.0-20221203041831-ce31453925ec // indirect
github.com/huandu/xstrings v1.4.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b // indirect
github.com/jackc/puddle/v2 v2.1.2 // indirect
@@ -39,14 +41,16 @@ require (
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.16 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/onsi/ginkgo/v2 v2.5.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.3.0 // indirect
github.com/prometheus/common v0.37.0 // indirect
github.com/prometheus/procfs v0.8.0 // indirect
go.uber.org/atomic v1.10.0 // indirect
golang.org/x/crypto v0.2.0 // indirect
golang.org/x/crypto v0.3.0 // indirect
golang.org/x/sync v0.1.0 // indirect
golang.org/x/sys v0.2.0 // indirect
golang.org/x/text v0.4.0 // indirect
golang.org/x/sys v0.3.0 // indirect
golang.org/x/text v0.5.0 // indirect
golang.org/x/tools v0.3.0 // indirect
google.golang.org/protobuf v1.28.1 // indirect
)
+32 -21
View File
@@ -30,15 +30,15 @@ cloud.google.com/go/storage v1.5.0/go.mod h1:tpKbwo567HUNpVclU5sGELwQWBDZ8gh0Zeo
cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohlUTyfDhBk=
cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs=
cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0=
code.cloudfoundry.org/go-diodes v0.0.0-20221102172008-608069b4b4a6 h1:Hg6Rci5c2GQcEkSaY8kks8AGhVnAFIMdEmoQn8/Pju8=
code.cloudfoundry.org/go-diodes v0.0.0-20221102172008-608069b4b4a6/go.mod h1:NAaPdNjrSKVX+nRPy67obdqcyn9HGFh4UP/cwD23kkc=
code.cloudfoundry.org/go-diodes v0.0.0-20221205211830-fe848ebc2e22 h1:9YvghSI3zAz5rn9uEHhSV3gH1bQ2mo56lh/B5m1XRCo=
code.cloudfoundry.org/go-diodes v0.0.0-20221205211830-fe848ebc2e22/go.mod h1:5+eDDXV/io7PFGz0Hr2DsT7JNXtGTKInNUsvRfN1bU0=
crawshaw.io/iox v0.0.0-20181124134642-c51c3df30797/go.mod h1:sXBiorCo8c46JlQV3oXPKINnZ8mcqnye1EkVkqsectk=
crawshaw.io/sqlite v0.3.2/go.mod h1:igAO5JulrQ1DbdZdtVq48mnZUBAPOeFzer7VhDWNtW4=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/MicahParks/keyfunc v1.5.3 h1:Y+mv+kX3HtL7/dCXXzK4bIDBHg91eunnGGkdndO0RWk=
github.com/MicahParks/keyfunc v1.5.3/go.mod h1:IdnCilugA0O/99dW+/MkvlyrsX8+L8+x95xuVNtM5jw=
github.com/MicahParks/keyfunc v1.7.0 h1:LBd4tBj6FwGs2S4GXniQbgrG0PXzIldyGDKWch8slhg=
github.com/MicahParks/keyfunc v1.7.0/go.mod h1:IdnCilugA0O/99dW+/MkvlyrsX8+L8+x95xuVNtM5jw=
github.com/RoaringBitmap/roaring v0.4.7/go.mod h1:8khRDP4HmeXns4xIj9oGrKSz7XTQiJx2zgh7AcNke4w=
github.com/RoaringBitmap/roaring v0.4.17/go.mod h1:D3qVegWTmfCaX4Bl5CrBE9hfrSrrXIr8KVNvRsDi1NI=
github.com/RoaringBitmap/roaring v0.4.23/go.mod h1:D0gp8kJQgE1A4LQ5wFLggQEyvDi06Mq5mKs52e1TwOo=
@@ -49,8 +49,8 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuy
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
github.com/anacrolix/dht/v2 v2.19.1 h1:V/UUGBASGYqYkSnmHJwX8uQmzkyhbgwE6jqcHKnNTD8=
github.com/anacrolix/dht/v2 v2.19.1/go.mod h1:3TU93c1s/oA8I/VH4m3CNP/BeKsiOGmo6HwfZBMTKUs=
github.com/anacrolix/dht/v2 v2.19.2 h1:U+lbEFYwa9yajIW2oE9jBxt0K8DlT10pSTYbXyfCxC8=
github.com/anacrolix/dht/v2 v2.19.2/go.mod h1:MctKM1HS5YYDb3F30NGJxLE+QPuqWoT5ReW/4jt8xew=
github.com/anacrolix/envpprof v0.0.0-20180404065416-323002cec2fa/go.mod h1:KgHhUaQMc8cC0+cEflSgCFNFbKwi5h54gqtVn8yhP7c=
github.com/anacrolix/envpprof v1.0.0/go.mod h1:KgHhUaQMc8cC0+cEflSgCFNFbKwi5h54gqtVn8yhP7c=
github.com/anacrolix/envpprof v1.1.0/go.mod h1:My7T5oSqVfEn4MD4Meczkw/f5lSIndGAKu/0SM/rkf4=
@@ -73,8 +73,8 @@ github.com/anacrolix/stm v0.2.0/go.mod h1:zoVQRvSiGjGoTmbM0vSLIiaKjWtNPeTvXUSdJQ
github.com/anacrolix/tagflag v0.0.0-20180109131632-2146c8d41bf0/go.mod h1:1m2U/K6ZT+JZG0+bdMK6qauP49QT4wE5pmhJXOKKCHw=
github.com/anacrolix/tagflag v1.0.0/go.mod h1:1m2U/K6ZT+JZG0+bdMK6qauP49QT4wE5pmhJXOKKCHw=
github.com/anacrolix/tagflag v1.1.0/go.mod h1:Scxs9CV10NQatSmbyjqmqmeQNwGzlNe0CMUMIxqHIG8=
github.com/anacrolix/torrent v1.47.0 h1:aDUnhQZ8+kfStLICHiXOGGYVFgDENK+kz4q96linyRg=
github.com/anacrolix/torrent v1.47.0/go.mod h1:SYPxEUjMwqhDr3kWGzyQLkFMuAb1bgJ57JRMpuD3ZzE=
github.com/anacrolix/torrent v1.47.1-0.20221102120345-c63f7e1bd720 h1:3cvS7QtC2hcrIOfT44fKDfPZUcXvkDf9SkI1mN6TB+s=
github.com/anacrolix/torrent v1.47.1-0.20221102120345-c63f7e1bd720/go.mod h1:SYPxEUjMwqhDr3kWGzyQLkFMuAb1bgJ57JRMpuD3ZzE=
github.com/apache/thrift v0.12.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
github.com/benbjohnson/immutable v0.2.0/go.mod h1:uc6OHo6PN2++n98KHLxW8ef4W42ylHiQSENghE1ezxI=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
@@ -87,8 +87,9 @@ github.com/bradfitz/iter v0.0.0-20191230175014-e8f45d346db8 h1:GKTyiRCL6zVf5wWaq
github.com/bradfitz/iter v0.0.0-20191230175014-e8f45d346db8/go.mod h1:spo1JLcs67NmW1aVLEgtA8Yy1elc+X8y5SRW1sFW4Og=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE=
github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
@@ -132,14 +133,18 @@ github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9
github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A=
github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs=
github.com/go-logr/logr v1.2.3 h1:2DntVwHkVopvECVRSlL5PSo9eG+cAkDCuckLubN+rq0=
github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI=
github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0 h1:p104kn46Q8WdvHunIJ9dAyjPVtrBPhSr3KT2yUst43I=
github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/gogo/protobuf v1.2.0/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/golang-jwt/jwt/v4 v4.4.2 h1:rcc4lwaZgFMCZ5jxF9ABolDcIHdBytAFgqFPbSJQAYs=
github.com/golang-jwt/jwt/v4 v4.4.2/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0=
github.com/golang-jwt/jwt/v4 v4.4.3 h1:Hxl6lhQFj4AnOX6MLrsCb/+7tCj7DxP7VA+2rDIq5AU=
github.com/golang-jwt/jwt/v4 v4.4.3/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
@@ -193,6 +198,8 @@ github.com/google/pprof v0.0.0-20200212024743-f11f1df84d12/go.mod h1:ZgVRPoUq/hf
github.com/google/pprof v0.0.0-20200229191704-1ebb73c60ed3/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM=
github.com/google/pprof v0.0.0-20200430221834-fc25d7d30c6d/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM=
github.com/google/pprof v0.0.0-20200708004538-1a94d8640e99/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM=
github.com/google/pprof v0.0.0-20221203041831-ce31453925ec h1:fR20TYVVwhK4O7r7y+McjRYyaTH6/vjwJOajE+XhlzM=
github.com/google/pprof v0.0.0-20221203041831-ce31453925ec/go.mod h1:dDKJzRmX4S37WGHujM7tX//fmj1uioxKzKxz3lo4HJo=
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
@@ -208,15 +215,15 @@ github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpO
github.com/huandu/xstrings v1.0.0/go.mod h1:4qWG/gcEcfX4z/mBDHJ++3ReCw9ibxbsNJbcucJdbSo=
github.com/huandu/xstrings v1.2.0/go.mod h1:DvyZB1rfVYsBIigL8HwpZgxHwXozlTgGqn63UyNX5k4=
github.com/huandu/xstrings v1.3.1/go.mod h1:y5/lhBue+AyNmUVz9RLU9xbLR0o4KIIExikq4ovT0aE=
github.com/huandu/xstrings v1.3.3 h1:/Gcsuc1x8JVbJ9/rlye4xZnVAbEkGauT8lbebqcQws4=
github.com/huandu/xstrings v1.3.3/go.mod h1:y5/lhBue+AyNmUVz9RLU9xbLR0o4KIIExikq4ovT0aE=
github.com/huandu/xstrings v1.4.0 h1:D17IlohoQq4UcpqD7fDk80P7l+lwAmlFaBHgOipl2FU=
github.com/huandu/xstrings v1.4.0/go.mod h1:y5/lhBue+AyNmUVz9RLU9xbLR0o4KIIExikq4ovT0aE=
github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b h1:C8S2+VttkHFdOOCXJe+YGfa4vHYwlt4Zx+IVXQ97jYg=
github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b/go.mod h1:vsD4gTJCa9TptPL8sPkXrLZ+hDuNrZCnj29CQpr4X1E=
github.com/jackc/pgx/v5 v5.1.0 h1:Z7pLKUb65HK6m18No8GGKT87K34NhIIEHa86rRdjxbU=
github.com/jackc/pgx/v5 v5.1.0/go.mod h1:Ptn7zmohNsWEsdxRawMzk3gaKma2obW+NWTnKa0S4nk=
github.com/jackc/pgx/v5 v5.2.0 h1:NdPpngX0Y6z6XDFKqmFQaE+bCtkqzvQIOt1wvBlAqs8=
github.com/jackc/pgx/v5 v5.2.0/go.mod h1:Ptn7zmohNsWEsdxRawMzk3gaKma2obW+NWTnKa0S4nk=
github.com/jackc/puddle/v2 v2.1.2 h1:0f7vaaXINONKTsxYDn4otOAiJanX/BMeAtY//BXqzlg=
github.com/jackc/puddle/v2 v2.1.2/go.mod h1:2lpufsF5mRHO6SuZkm0fNYxM6SWHfvyFj62KwNzgels=
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
@@ -274,8 +281,10 @@ github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE=
github.com/onsi/ginkgo/v2 v2.5.1 h1:auzK7OI497k6x4OvWq+TKAcpcSAlod0doAH72oIN0Jw=
github.com/onsi/ginkgo/v2 v2.5.1/go.mod h1:63DOGlLAH8+REH8jUGdL3YpCpu7JODesutUjdENfUAc=
github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/onsi/gomega v1.23.0 h1:/oxKu9c2HVap+F3PfKort2Hw5DEU+HGlW8n+tguWsys=
github.com/onsi/gomega v1.24.1 h1:KORJXNNTzJXzu4ScJWssJfJMnJ+2QJqhoQSRwNlze9E=
github.com/openzipkin/zipkin-go v0.1.6/go.mod h1:QgAqvLzwWbR/WpD4A3cGpPtJrZXNIiJc5AZX7/PBEpw=
github.com/philhofer/fwd v1.0.0/go.mod h1:gk3iGcWd9+svBvR0sR+KPcfE+RNWozjowpeBVG3ZVNU=
github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
@@ -372,8 +381,8 @@ golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8U
golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.2.0 h1:BRXPfhNivWL5Yq0BGQ39a2sW6t44aODpfxkWjYdzewE=
golang.org/x/crypto v0.2.0/go.mod h1:hebNnKkNXi2UzZN1eVRvBB7co0a+JxK6XbPiWVs/3J4=
golang.org/x/crypto v0.3.0 h1:a06MkbcxBrEFc0w0QIZWXrH/9cCX6KJyWbBOIwAn+7A=
golang.org/x/crypto v0.3.0/go.mod h1:hebNnKkNXi2UzZN1eVRvBB7co0a+JxK6XbPiWVs/3J4=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8=
@@ -502,8 +511,8 @@ golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20220704084225-05e143d24a9e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.2.0 h1:ljd4t30dBnAvMZaQCevtY0xLLD0A+bRZXbgLMLU1F/A=
golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.3.0 h1:w8ZOecv6NaNa/zC8944JTU3vz4u6Lagfk4RPQxv92NQ=
golang.org/x/sys v0.3.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
@@ -513,8 +522,8 @@ golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.4.0 h1:BrVqGRd7+k1DiOgtnFvAkoQEWQvBc25ouMJM6429SFg=
golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.5.0 h1:OLmvp0KP+FVG99Ct/qFiL/Fhk4zp4QQnZ7b2U+5piUM=
golang.org/x/text v0.5.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
@@ -559,6 +568,8 @@ golang.org/x/tools v0.0.0-20200618134242-20370b0cb4b2/go.mod h1:EkVYQZoAsY45+roY
golang.org/x/tools v0.0.0-20200729194436-6467de6f59a7/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA=
golang.org/x/tools v0.0.0-20200804011535-6c149bb5ef0d/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA=
golang.org/x/tools v0.0.0-20200825202427-b303f430e36d/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA=
golang.org/x/tools v0.3.0 h1:SrNbZl6ECOS1qFzgTdQfWXZM9XBkiA6tkFrH9YSTPHM=
golang.org/x/tools v0.3.0/go.mod h1:/rWhSS2+zyEVwoJf8YAX6L2f0ntZ7Kn/mGgAWcipA5k=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+3 -13
View File
@@ -13,6 +13,7 @@ import (
"github.com/sot-tech/mochi/bittorrent"
"github.com/sot-tech/mochi/middleware"
"github.com/sot-tech/mochi/pkg/conf"
"github.com/sot-tech/mochi/pkg/xorshift"
"github.com/sot-tech/mochi/storage"
)
@@ -81,10 +82,10 @@ type hook struct {
func (h *hook) HandleAnnounce(ctx context.Context, req *bittorrent.AnnounceRequest, resp *bittorrent.AnnounceResponse) (context.Context, error) {
// Generate a probability p < 1.0.
p, s0, s1 := xoroshiro128p(deriveEntropyFromRequest(req))
p, s0, s1 := xorshift.XoRoShiRo128SS(deriveEntropyFromRequest(req))
if float32(float64(p)/math.MaxUint64) < h.cfg.ModifyResponseProbability {
// Generate the increase delta.
v, _, _ := xoroshiro128p(s0, s1)
v, _, _ := xorshift.XoRoShiRo128SS(s0, s1)
add := time.Duration(v%uint64(h.cfg.MaxIncreaseDelta)+1) * time.Second
resp.Interval += add
@@ -113,14 +114,3 @@ func deriveEntropyFromRequest(req *bittorrent.AnnounceRequest) (v0 uint64, v1 ui
v1 = binary.BigEndian.Uint64(req.ID[:8]) + binary.BigEndian.Uint64(req.ID[8:16])
return
}
// xoroshiro128p calculates predictable pseudorandom number
// with XOR/rotate/shift/rotate 128+ algorithm.
// see https://prng.di.unimi.it/xoroshiro128plus.c
func xoroshiro128p(s0, s1 uint64) (result, ns0, ns1 uint64) {
result = s0 + s1
s1 ^= s0
ns0 = ((s0 << 24) | (s0 >> 40)) ^ s1 ^ (s1 << 16) // rotl(s0, 24) ^ s1 ^ (s1 << 16)
ns1 = (s1 << 37) | (s1 >> 27) // rotl(s1, 37)
return
}
@@ -3,7 +3,6 @@ package varinterval
import (
"context"
"fmt"
"math/rand"
"testing"
"github.com/stretchr/testify/require"
@@ -64,13 +63,3 @@ func TestHandleAnnounce(t *testing.T) {
require.True(t, resp.Interval > 0, "interval should have been increased")
require.True(t, resp.MinInterval > 0, "min_interval should have been increased")
}
func BenchmarkXORoShiRo128Plus(b *testing.B) {
s0, s1 := rand.Uint64(), rand.Uint64()
var v uint64
b.ResetTimer()
for i := 0; i < b.N; i++ {
v, s0, s1 = xoroshiro128p(s0, s1)
}
_, _, _ = v, s0, s1
}
+1 -1
View File
@@ -13,7 +13,7 @@ func init() {
}
// GenSeed returns 64bit seed from crypto/rand source or
// from current time, if crypto random error occurred
// from current time, if crypto random read error occurred
func GenSeed() (seed int64) {
r := make([]byte, 8)
if _, err := cr.Read(r); err == nil {
+27
View File
@@ -0,0 +1,27 @@
// Package xorshift contains functions for fast generating
// predictable pseudorandom numbers
// See https://prng.di.unimi.it .
package xorshift
// XoRoShiRo128SS calculates predictable pseudorandom number
// with XOR/rotate/shift/rotate 128** (xoroshiro128starstar) algorithm.
// In some cases a little faster than XorShift64S, but uses 128 bits footprint.
// see https://prng.di.unimi.it/xoroshiro128starstar.c
func XoRoShiRo128SS(s0, s1 uint64) (uint64, uint64, uint64) {
r := s0 * 5
r = ((r << 7) | (r >> 57)) * 9 // rotl(s0*5, 7) * 9
s1 ^= s0
s0 = ((s0 << 24) | (s0 >> 40)) ^ s1 ^ (s1 << 16) // rotl(s0, 24) ^ s1 ^ (s1 << 16)
s1 = (s1 << 37) | (s1 >> 27) // rotl(s1, 37)
return r, s0, s1
}
// XorShift64S calculates predictable pseudorandom number
// with XOR/Shift 64* (shorshift64*) algorithm.
// see https://vigna.di.unimi.it/ftp/papers/xorshift.pdf
func XorShift64S(s uint64) (uint64, uint64) {
s ^= s >> 12
s ^= s << 25
s ^= s >> 27
return s * uint64(0x2545F4914F6CDD1D), s
}
+30
View File
@@ -0,0 +1,30 @@
package xorshift
import (
"math/rand"
"testing"
)
func BenchmarkRand(b *testing.B) {
var cnt uint64
for i := 0; i < b.N; i++ {
cnt = rand.Uint64()
}
_ = cnt
}
func BenchmarkXoRoShiRo128SS(b *testing.B) {
v, s0, s1 := uint64(0), rand.Uint64(), rand.Uint64()
for i := 0; i < b.N; i++ {
v, s0, s1 = XoRoShiRo128SS(s0, s1)
}
_, _, _ = v, s0, s1
}
func BenchmarkXorShift64Star(b *testing.B) {
v, s := uint64(0), rand.Uint64()
for i := 0; i < b.N; i++ {
v, s = XorShift64S(s)
}
_, _ = v, s
}
+199 -170
View File
@@ -8,6 +8,7 @@ import (
"math"
"runtime"
"sync"
"sync/atomic"
"time"
"github.com/sot-tech/mochi/bittorrent"
@@ -18,14 +19,20 @@ import (
"github.com/sot-tech/mochi/storage"
)
// Default config constants.
const defaultShardCount = 1024
const (
// Name - registered name of the storage
Name = "memory"
// Default config constants.
defaultShardCount = 1024
// -1
decrUint64 = ^uint64(0)
)
var logger = log.NewLogger("storage/memory")
func init() {
// Register the storage driver.
storage.RegisterDriver("memory", builder)
storage.RegisterDriver(Name, builder)
}
func builder(icfg conf.MapConfig) (storage.PeerStorage, error) {
@@ -50,7 +57,7 @@ func (cfg Config) Validate() Config {
if cfg.ShardCount <= 0 || cfg.ShardCount > (math.MaxInt/2) {
validcfg.ShardCount = defaultShardCount
log.Warn().
logger.Warn().
Str("name", "ShardCount").
Int("provided", cfg.ShardCount).
Int("default", validcfg.ShardCount).
@@ -64,38 +71,137 @@ func (cfg Config) Validate() Config {
func NewPeerStorage(provided Config) (storage.PeerStorage, error) {
cfg := provided.Validate()
ps := &peerStore{
cfg: cfg,
shards: make([]*peerShard, cfg.ShardCount*2),
DataStorage: NewDataStorage(),
closed: make(chan struct{}),
closed: make(chan any),
}
for i := 0; i < cfg.ShardCount*2; i++ {
ps.shards[i] = &peerShard{swarms: make(map[bittorrent.InfoHash]swarm)}
ps.shards[i] = &peerShard{swarms: &ihSwarm{m: make(map[bittorrent.InfoHash]swarm)}}
}
return ps, nil
}
type peerShard struct {
swarms map[bittorrent.InfoHash]swarm
numSeeders uint64
numLeechers uint64
swarms *ihSwarm
numSeeders atomic.Uint64
numLeechers atomic.Uint64
}
type ihSwarm struct {
m map[bittorrent.InfoHash]swarm
sync.RWMutex
}
func (p *ihSwarm) get(k bittorrent.InfoHash) (v swarm, ok bool) {
p.RLock()
v, ok = p.m[k]
p.RUnlock()
return
}
func (p *ihSwarm) getOrCreate(k bittorrent.InfoHash) (v swarm) {
var ok bool
if v, ok = p.get(k); !ok {
p.Lock()
if v, ok = p.m[k]; !ok {
v = swarm{
seeders: &peers{m: make(map[bittorrent.Peer]int64)},
leechers: &peers{m: make(map[bittorrent.Peer]int64)},
}
p.m[k] = v
}
p.Unlock()
}
return
}
func (p *ihSwarm) del(k bittorrent.InfoHash) (ok bool) {
p.Lock()
if _, ok = p.m[k]; ok {
delete(p.m, k)
}
p.Unlock()
return
}
func (p *ihSwarm) len() int {
return len(p.m)
}
func (p *ihSwarm) keys(fn func(k bittorrent.InfoHash) bool) {
p.RLock()
for k := range p.m {
if !fn(k) {
break
}
}
p.RUnlock()
}
type swarm struct {
// map serialized peer to mtime
seeders map[bittorrent.Peer]int64
leechers map[bittorrent.Peer]int64
seeders *peers
leechers *peers
}
type peers struct {
m map[bittorrent.Peer]int64
sync.RWMutex
}
func (p *peers) get(k bittorrent.Peer) (v int64, ok bool) {
p.RLock()
v, ok = p.m[k]
p.RUnlock()
return
}
func (p *peers) set(k bittorrent.Peer, v int64) {
p.Lock()
p.m[k] = v
p.Unlock()
}
func (p *peers) del(k bittorrent.Peer) (ok bool) {
p.Lock()
if _, ok = p.m[k]; ok {
delete(p.m, k)
}
p.Unlock()
return
}
func (p *peers) len() int {
return len(p.m)
}
func (p *peers) keys(fn func(k bittorrent.Peer) bool) {
p.RLock()
for k := range p.m {
if !fn(k) {
break
}
}
p.RUnlock()
}
func (p *peers) forEach(fn func(k bittorrent.Peer, v int64) bool) {
p.RLock()
for k, v := range p.m {
if !fn(k, v) {
break
}
}
p.RUnlock()
}
type peerStore struct {
storage.DataStorage
cfg Config
shards []*peerShard
closed chan struct{}
closed chan any
wg sync.WaitGroup
onceCloser sync.Once
}
@@ -140,17 +246,15 @@ func (ps *peerStore) ScheduleStatisticsCollection(reportInterval time.Duration)
before := time.Now()
// aggregates metrics over all shards and then posts them to
// prometheus.
var numInfohashes, numSeeders, numLeechers uint64
var numInfoHashes, numSeeders, numLeechers uint64
for _, s := range ps.shards {
s.RLock()
numInfohashes += uint64(len(s.swarms))
numSeeders += s.numSeeders
numLeechers += s.numLeechers
s.RUnlock()
numInfoHashes += uint64(s.swarms.len())
numSeeders += s.numSeeders.Load()
numLeechers += s.numLeechers.Load()
}
storage.PromInfoHashesCount.Set(float64(numInfohashes))
storage.PromInfoHashesCount.Set(float64(numInfoHashes))
storage.PromSeedersCount.Set(float64(numSeeders))
storage.PromLeechersCount.Set(float64(numLeechers))
logger.Debug().TimeDiff("timeTaken", time.Now(), before).Msg("populate prom complete")
@@ -182,29 +286,19 @@ func (ps *peerStore) PutSeeder(_ context.Context, ih bittorrent.InfoHash, p bitt
Object("peer", p).
Msg("put seeder")
shard := ps.shards[ps.shardIndex(ih, p.Addr().Is6())]
shard.Lock()
defer shard.Unlock()
sh := ps.shards[ps.shardIndex(ih, p.Addr().Is6())]
sw := sh.swarms.getOrCreate(ih)
if _, ok := shard.swarms[ih]; !ok {
shard.swarms[ih] = swarm{
seeders: make(map[bittorrent.Peer]int64),
leechers: make(map[bittorrent.Peer]int64),
}
if _, exists := sw.seeders.get(p); !exists {
sh.numSeeders.Add(1)
}
// If this peer isn't already a seeder, update the stats for the swarm.
if _, ok := shard.swarms[ih].seeders[p]; !ok {
shard.numSeeders++
}
// Update the peer in the swarm.
shard.swarms[ih].seeders[p] = timecache.NowUnixNano()
sw.seeders.set(p, timecache.NowUnixNano())
return nil
}
func (ps *peerStore) DeleteSeeder(_ context.Context, ih bittorrent.InfoHash, p bittorrent.Peer) error {
func (ps *peerStore) DeleteSeeder(_ context.Context, ih bittorrent.InfoHash, p bittorrent.Peer) (err error) {
select {
case <-ps.closed:
panic("attempted to interact with stopped memory store")
@@ -215,26 +309,16 @@ func (ps *peerStore) DeleteSeeder(_ context.Context, ih bittorrent.InfoHash, p b
Object("peer", p).
Msg("delete seeder")
shard := ps.shards[ps.shardIndex(ih, p.Addr().Is6())]
shard.Lock()
defer shard.Unlock()
if _, ok := shard.swarms[ih]; !ok {
return storage.ErrResourceDoesNotExist
sh := ps.shards[ps.shardIndex(ih, p.Addr().Is6())]
if sw, ok := sh.swarms.get(ih); ok {
if sw.seeders.del(p) {
sh.numSeeders.Add(decrUint64)
}
} else {
err = storage.ErrResourceDoesNotExist
}
if _, ok := shard.swarms[ih].seeders[p]; !ok {
return storage.ErrResourceDoesNotExist
}
shard.numSeeders--
delete(shard.swarms[ih].seeders, p)
if len(shard.swarms[ih].seeders)|len(shard.swarms[ih].leechers) == 0 {
delete(shard.swarms, ih)
}
return nil
return
}
func (ps *peerStore) PutLeecher(_ context.Context, ih bittorrent.InfoHash, p bittorrent.Peer) error {
@@ -248,29 +332,19 @@ func (ps *peerStore) PutLeecher(_ context.Context, ih bittorrent.InfoHash, p bit
Object("peer", p).
Msg("put leecher")
shard := ps.shards[ps.shardIndex(ih, p.Addr().Is6())]
shard.Lock()
defer shard.Unlock()
sh := ps.shards[ps.shardIndex(ih, p.Addr().Is6())]
sw := sh.swarms.getOrCreate(ih)
if _, ok := shard.swarms[ih]; !ok {
shard.swarms[ih] = swarm{
seeders: make(map[bittorrent.Peer]int64),
leechers: make(map[bittorrent.Peer]int64),
}
if _, exists := sw.leechers.get(p); !exists {
sh.numLeechers.Add(1)
}
// If this peer isn't already a leecher, update the stats for the swarm.
if _, ok := shard.swarms[ih].leechers[p]; !ok {
shard.numLeechers++
}
// Update the peer in the swarm.
shard.swarms[ih].leechers[p] = timecache.NowUnixNano()
sw.leechers.set(p, timecache.NowUnixNano())
return nil
}
func (ps *peerStore) DeleteLeecher(_ context.Context, ih bittorrent.InfoHash, p bittorrent.Peer) error {
func (ps *peerStore) DeleteLeecher(_ context.Context, ih bittorrent.InfoHash, p bittorrent.Peer) (err error) {
select {
case <-ps.closed:
panic("attempted to interact with stopped memory store")
@@ -281,26 +355,16 @@ func (ps *peerStore) DeleteLeecher(_ context.Context, ih bittorrent.InfoHash, p
Object("peer", p).
Msg("delete leecher")
shard := ps.shards[ps.shardIndex(ih, p.Addr().Is6())]
shard.Lock()
defer shard.Unlock()
if _, ok := shard.swarms[ih]; !ok {
return storage.ErrResourceDoesNotExist
sh := ps.shards[ps.shardIndex(ih, p.Addr().Is6())]
if sw, ok := sh.swarms.get(ih); ok {
if sw.leechers.del(p) {
sh.numLeechers.Add(decrUint64)
}
} else {
err = storage.ErrResourceDoesNotExist
}
if _, ok := shard.swarms[ih].leechers[p]; !ok {
return storage.ErrResourceDoesNotExist
}
shard.numLeechers--
delete(shard.swarms[ih].leechers, p)
if len(shard.swarms[ih].seeders)|len(shard.swarms[ih].leechers) == 0 {
delete(shard.swarms, ih)
}
return nil
return
}
func (ps *peerStore) GraduateLeecher(_ context.Context, ih bittorrent.InfoHash, p bittorrent.Peer) error {
@@ -314,61 +378,22 @@ func (ps *peerStore) GraduateLeecher(_ context.Context, ih bittorrent.InfoHash,
Object("peer", p).
Msg("graduate leecher")
shard := ps.shards[ps.shardIndex(ih, p.Addr().Is6())]
shard.Lock()
defer shard.Unlock()
sh := ps.shards[ps.shardIndex(ih, p.Addr().Is6())]
sw := sh.swarms.getOrCreate(ih)
if _, ok := shard.swarms[ih]; !ok {
shard.swarms[ih] = swarm{
seeders: make(map[bittorrent.Peer]int64),
leechers: make(map[bittorrent.Peer]int64),
}
if sw.leechers.del(p) {
sh.numLeechers.Add(decrUint64)
}
// If this peer is a leecher, update the stats for the swarm and remove them.
if _, ok := shard.swarms[ih].leechers[p]; ok {
shard.numLeechers--
delete(shard.swarms[ih].leechers, p)
if _, exists := sw.seeders.get(p); !exists {
sh.numSeeders.Add(1)
}
// If this peer isn't already a seeder, update the stats for the swarm.
if _, ok := shard.swarms[ih].seeders[p]; !ok {
shard.numSeeders++
}
// Update the peer in the swarm.
shard.swarms[ih].seeders[p] = timecache.NowUnixNano()
sw.seeders.set(p, timecache.NowUnixNano())
return nil
}
func parsePeers(peersMap map[bittorrent.Peer]int64, maxCount int) (peers []bittorrent.Peer) {
for p := range peersMap {
if maxCount == 0 {
break
}
peers = append(peers, p)
maxCount--
}
return
}
func (ps *peerStore) getPeers(shard *peerShard, ih bittorrent.InfoHash, maxCount int, forSeeder bool) (peers []bittorrent.Peer) {
shard.RLock()
defer shard.RUnlock()
if swarm, ok := shard.swarms[ih]; ok {
if forSeeder {
peers = parsePeers(swarm.leechers, maxCount)
} else {
peers = append(peers, parsePeers(swarm.seeders, maxCount)...)
if maxCount -= len(peers); maxCount > 0 {
peers = append(peers, parsePeers(swarm.leechers, maxCount)...)
}
}
}
return
}
func (ps *peerStore) AnnouncePeers(_ context.Context, ih bittorrent.InfoHash, forSeeder bool, numWant int, v6 bool) (peers []bittorrent.Peer, err error) {
select {
case <-ps.closed:
@@ -382,18 +407,31 @@ func (ps *peerStore) AnnouncePeers(_ context.Context, ih bittorrent.InfoHash, fo
Bool("v6", v6).
Msg("announce peers")
peers = ps.getPeers(ps.shards[ps.shardIndex(ih, v6)], ih, numWant, forSeeder)
if sw, ok := ps.shards[ps.shardIndex(ih, v6)].swarms.get(ih); ok {
peers = make([]bittorrent.Peer, 0, numWant/2)
rangeFn := func(p bittorrent.Peer) bool {
peers = append(peers, p)
numWant--
return numWant > 0
}
if forSeeder {
sw.leechers.keys(rangeFn)
} else {
sw.seeders.keys(rangeFn)
if numWant > 0 {
sw.leechers.keys(rangeFn)
}
}
}
return
}
func (ps *peerStore) countPeers(ih bittorrent.InfoHash, v6 bool) (leechers, seeders uint32) {
shard := ps.shards[ps.shardIndex(ih, v6)]
shard.RLock()
defer shard.RUnlock()
if swarm, ok := shard.swarms[ih]; ok {
leechers, seeders = uint32(len(swarm.leechers)), uint32(len(swarm.seeders))
if sw, ok := shard.swarms.get(ih); ok {
leechers, seeders = uint32(sw.leechers.len()), uint32(sw.seeders.len())
}
return
}
@@ -483,42 +521,40 @@ func (ps *peerStore) gc(cutoff time.Time) {
cutoffUnix := cutoff.UnixNano()
for _, shard := range ps.shards {
shard.RLock()
var infohashes []bittorrent.InfoHash
for ih := range shard.swarms {
infohashes = append(infohashes, ih)
}
shard.RUnlock()
infoHashes := make([]bittorrent.InfoHash, 0, shard.swarms.len())
shard.swarms.keys(func(ih bittorrent.InfoHash) bool {
infoHashes = append(infoHashes, ih)
return true
})
runtime.Gosched()
for _, ih := range infohashes {
shard.Lock()
if _, stillExists := shard.swarms[ih]; !stillExists {
shard.Unlock()
for _, ih := range infoHashes {
sw, stillExists := shard.swarms.get(ih)
if !stillExists {
runtime.Gosched()
continue
}
for pk, mtime := range shard.swarms[ih].leechers {
sw.leechers.forEach(func(p bittorrent.Peer, mtime int64) bool {
if mtime <= cutoffUnix {
shard.numLeechers--
delete(shard.swarms[ih].leechers, pk)
sw.leechers.del(p)
shard.numLeechers.Add(decrUint64)
}
}
return true
})
for pk, mtime := range shard.swarms[ih].seeders {
sw.seeders.forEach(func(p bittorrent.Peer, mtime int64) bool {
if mtime <= cutoffUnix {
shard.numSeeders--
delete(shard.swarms[ih].seeders, pk)
sw.seeders.del(p)
shard.numSeeders.Add(decrUint64)
}
return true
})
if sw.leechers.len()|sw.seeders.len() == 0 {
shard.swarms.del(ih)
}
if len(shard.swarms[ih].seeders)|len(shard.swarms[ih].leechers) == 0 {
delete(shard.swarms, ih)
}
shard.Unlock()
runtime.Gosched()
}
@@ -534,13 +570,6 @@ func (ps *peerStore) Close() error {
ps.onceCloser.Do(func() {
close(ps.closed)
ps.wg.Wait()
// Explicitly deallocate our storage.
shards := make([]*peerShard, len(ps.shards))
for i := 0; i < len(ps.shards); i++ {
shards[i] = &peerShard{swarms: make(map[bittorrent.InfoHash]swarm)}
}
ps.shards = shards
})
return nil
+2 -2
View File
@@ -475,7 +475,7 @@ func (s *store) getPeers(ctx context.Context, ih bittorrent.InfoHash, seeders bo
} else {
logger.Warn().
Err(err).
Bytes("peerID", id).
Hex("peerID", id).
IPAddr("ip", ip).
Int("port", port).
Msg("unable to scan/construct peer")
@@ -552,7 +552,7 @@ func (s *store) countPeers(ctx context.Context, ih []byte) (seeders uint32, leec
}
}
if err != nil {
logger.Error().Err(err).Bytes("infoHash", ih).Msg("unable to get peers count")
logger.Error().Err(err).Hex("infoHash", ih).Msg("unable to get peers count")
}
return
}
+3 -3
View File
@@ -44,7 +44,7 @@ func (c Config) sanitizeGCConfig() (gcInterval, peerTTL time.Duration) {
if c.GarbageCollectionInterval <= 0 {
gcInterval = DefaultGarbageCollectionInterval
logger.Warn().
Str("name", "garbageCollectionInterval").
Str("name", "GarbageCollectionInterval").
Dur("provided", c.GarbageCollectionInterval).
Dur("default", DefaultGarbageCollectionInterval).
Msg("falling back to default configuration")
@@ -54,7 +54,7 @@ func (c Config) sanitizeGCConfig() (gcInterval, peerTTL time.Duration) {
if c.PeerLifetime <= 0 {
peerTTL = DefaultPeerLifetime
logger.Warn().
Str("name", "peerLifetime").
Str("name", "PeerLifetime").
Dur("provided", c.PeerLifetime).
Dur("default", DefaultPeerLifetime).
Msg("falling back to default configuration")
@@ -68,7 +68,7 @@ func (c Config) sanitizeStatisticsConfig() (statInterval time.Duration) {
if c.PrometheusReportingInterval < 0 {
statInterval = DefaultPrometheusReportingInterval
logger.Warn().
Str("name", "prometheusReportingInterval").
Str("name", "PrometheusReportingInterval").
Dur("provided", c.PrometheusReportingInterval).
Dur("default", DefaultPrometheusReportingInterval).
Msg("falling back to default configuration")