diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml index f03d8fb..f218839 100644 --- a/.github/workflows/build.yaml +++ b/.github/workflows/build.yaml @@ -82,13 +82,10 @@ jobs: run: | go install ./cmd/mochi go install ./cmd/mochi-e2e - curl -LO https://github.com/jzelinskie/faq/releases/download/0.0.6/faq-linux-amd64 - chmod +x faq-linux-amd64 - ./faq-linux-amd64 '.mochi.storage = {"config":{"gc_interval":"3m","peer_lifetime":"31m","prometheus_reporting_interval":"1s","connect_timeout":"15s","read_timeout":"15s","write_timeout":"15s"},"name":"redis"}' ./dist/example_config.yaml > ./dist/example_redis_config.yaml - cat ./dist/example_redis_config.yaml + cat ./dist/example_config_redis.yaml - name: "Run end-to-end tests" run: | - mochi --config=./dist/example_redis_config.yaml --logLevel debug --logPretty & + mochi --config=./dist/example_config_redis.yaml --logLevel debug --logPretty & pid=$! sleep 2 mochi-e2e diff --git a/bittorrent/sanitize.go b/bittorrent/sanitize.go index fa1a482..c14a4a5 100644 --- a/bittorrent/sanitize.go +++ b/bittorrent/sanitize.go @@ -5,7 +5,7 @@ import ( ) var ( - logger = log.NewLogger("request sanitizer") + logger = log.NewLogger("bittorrent/sanitize") // ErrInvalidIP indicates an invalid IP for an Announce. ErrInvalidIP = ClientError("invalid IP") diff --git a/dist/example_config.yaml b/dist/example_config.yaml index f494e5b..3713af9 100644 --- a/dist/example_config.yaml +++ b/dist/example_config.yaml @@ -23,13 +23,14 @@ frontends: # BitTorrent traffic. Remove this to disable the non-TLS listener. addr: "0.0.0.0:6969" - # The network interface that will bind to an HTTPS server for serving + # Mark this frontend as HTTPS server for serving # BitTorrent traffic. If set, tls_cert_path and tls_key_path are required. - https_addr: "" + tls: false # The path to the required files to listen via HTTPS. tls_cert_path: "" tls_key_path: "" + # Enable SO_REUSEPORT to allow starting multiple mochi instances with the same HTTP(S) port. reuse_port: true @@ -157,119 +158,9 @@ storage: # are collected and posted to Prometheus. prometheus_reporting_interval: 1s -# This block defines configuration used for redis storage. -#storage: - #name: redis - #config: - # The frequency which stale peers are removed. - # This balances between - # - collecting garbage more often, potentially using more CPU time, but potentially using less memory (lower value) - # - collecting garbage less frequently, saving CPU time, but keeping old peers long, thus using more memory (higher value). - #gc_interval: 3m - - # The interval at which metrics about the number of infohashes and peers - # are collected and posted to Prometheus. - #prometheus_reporting_interval: 1s - - # The amount of time until a peer is considered stale. - # To avoid churn, keep this slightly larger than `announce_interval` - #peer_lifetime: 31m - - # The addresses of redis storage. - # If neither sentinel not cluster switched, - # only first address used - #addresses: ["127.0.0.1:6379"] - - # Database to be selected after connecting to the server. - #db: 0 - - # Maximum number of socket connections, default is 10 per CPU - #pool_size: 10 - - # Use the specified login/username to authenticate the current connection - #login: "" - - # Optional password - #password: "" - - # Connect to sentinel nodes - #sentinel: false - - # The master name - #sentinel_master: "" - - # Connect to the redis cluster - #cluster: false - - # The timeout for reading a command reply from redis. - #read_timeout: 15s - - # The timeout for writing a command to redis. - #write_timeout: 15s - - # Dial timeout for establishing new connections. - #connect_timeout: 15s - -# This block defines configuration used for PostgreSQL storage. -# example peers table structure: -# - info_hash bytea -# - peer_id bytea -# - address inet or bytea -# - port int4 -# - is_seeder bool -# - is_v6 bool -# - created timestamp -# example downloads table structure: -# - info_hash bytea -# - downloads int -#storage: - #name: pg - #config: - # connection string to pg storage. may be URL (postgres://...) or DSN (host=... port=...) - #connection_string: host=127.0.0.1 database=test user=postgres pool_max_conns=50 - # query and parameters for announce operation - #announce: - #query: SELECT peer_id, address, port FROM mo_peers WHERE info_hash=@info_hash AND is_seeder=@is_seeder AND is_v6=@is_v6 LIMIT @count - #peer_id_column: peer_id - #address_column: address - #port_column: port - # queries to get/increment 'snached' (downloaded) count - #downloads: - #get_query: SELECT downloads FROM mo_downloads where info_hash=@info_hash - #inc_query: INSERT INTO mo_downloads VALUES(@info_hash) ON CONFLICT(info_hash) DO UPDATE SET downloads = mo_downloads.downloads + 1 - # queries and parameters for add/delete/count peers operations - #peer: - #add_query: INSERT INTO mo_peers VALUES(@info_hash, @peer_id, @address, @port, @is_seeder, @is_v6, @created) ON CONFLICT (info_hash, peer_id, address, port) DO UPDATE SET created = EXCLUDED.created, is_seeder = EXCLUDED.is_seeder - #del_query: DELETE FROM mo_peers WHERE info_hash=@info_hash AND peer_id=@peer_id AND address=@address AND port=@port AND is_seeder=@is_seeder - #graduate_query: UPDATE mo_peers SET is_seeder=TRUE WHERE info_hash=@info_hash AND peer_id=peer_id AND address=@address AND port=@port AND NOT is_seeder - #count_query: SELECT COUNT(1) FILTER (WHERE is_seeder) AS seeders, COUNT(1) FILTER (WHERE NOT is_seeder) AS leechers FROM mo_peers - # predicate part of `count_query` to get count of peers by info hash - #by_info_hash_clause: WHERE info_hash = @info_hash - #count_seeders_column: seeders - #count_leechers_column: leechers - # queries for KV-store - #data: - #add_query: INSERT INTO mo_kv VALUES(@context, @key, @value) ON CONFLICT (context, name) DO NOTHING - # NOTE: in del_query @key parameter is array, NOT single value - #del_query: DELETE FROM mo_kv WHERE context=@context AND name = ANY(@key) - #get_query: SELECT value FROM mo_kv WHERE context=@context AND name=@key - # query for check if database is alive - #ping_query: SELECT 1 - # query for garbage collection, expected parameter is timestamp - #gc_query: DELETE FROM mo_peers WHERE created <= @created - # The amount of time until a peer is considered stale. - # To avoid churn, keep this slightly larger than `announce_interval` - #peer_lifetime: 31m - # The frequency which stale peers are removed. - #gc_interval: 3m - # query for info hash statistics - #info_hash_count_query: SELECT COUNT(DISTINCT info_hash) as info_hashes FROM mo_peers - # The interval at which metrics about the number of info hashes and peers - # are collected and posted to Prometheus. - #prometheus_reporting_interval: 1s - # This block defines configuration used for middleware executed before a # response has been returned to a BitTorrent client. +posthooks: [] prehooks: # - name: jwt # config: @@ -308,4 +199,3 @@ prehooks: # invert: false # Name of storage context where store hash list # storage_ctx: APPROVED_HASH -posthooks: diff --git a/dist/example_config_pg.yaml b/dist/example_config_pg.yaml new file mode 100644 index 0000000..bb61ba3 --- /dev/null +++ b/dist/example_config_pg.yaml @@ -0,0 +1,116 @@ +# @formatter:off +# Note: see `example_config.yaml` for `frontends` and `*hooks` config description + + +announce_interval: 30m +min_announce_interval: 15m +metrics_addr: "" + +frontends: + - name: http + config: + addr: "0.0.0.0:6969" + tls: false + tls_cert_path: "" + tls_key_path: "" + reuse_port: true + read_timeout: 5s + write_timeout: 5s + enable_keepalive: false + idle_timeout: 30s + enable_request_timing: false + announce_routes: + - "/announce" + scrape_routes: + - "/scrape" + ping_routes: + - "/ping" + allow_ip_spoofing: false + filter_private_ips: false + real_ip_header: "x-real-ip" + max_numwant: 100 + default_numwant: 50 + max_scrape_infohashes: 50 + + - name: udp + config: + addr: "0.0.0.0:6969" + reuse_port: true + max_clock_skew: 10s + private_key: "paste a random string here that will be used to hmac connection IDs" + enable_request_timing: false + allow_ip_spoofing: false + filter_private_ips: false + max_numwant: 100 + default_numwant: 50 + max_scrape_infohashes: 50 + +# This block defines configuration used for PostgreSQL storage. +# example peers table structure: +# - info_hash bytea +# - peer_id bytea +# - address inet or bytea +# - port int4 +# - is_seeder bool +# - is_v6 bool +# - created timestamp +# example downloads table structure: +# - info_hash bytea +# - downloads int +storage: + name: pg + config: + # connection string to pg storage. may be URL (postgres://...) or DSN (host=... port=...) + #connection_string: host=127.0.0.1 database=test user=postgres pool_max_conns=50 + # query and parameters for announce operation + announce: + query: SELECT peer_id, address, port FROM mo_peers WHERE info_hash=@info_hash AND is_seeder=@is_seeder AND is_v6=@is_v6 LIMIT @count + peer_id_column: peer_id + address_column: address + port_column: port + + # queries to get/increment 'snached' (downloaded) count + downloads: + get_query: SELECT downloads FROM mo_downloads where info_hash=@info_hash + inc_query: INSERT INTO mo_downloads VALUES(@info_hash) ON CONFLICT(info_hash) DO UPDATE SET downloads = mo_downloads.downloads + 1 + + # queries and parameters for add/delete/count peers operations + peer: + add_query: INSERT INTO mo_peers VALUES(@info_hash, @peer_id, @address, @port, @is_seeder, @is_v6, @created) ON CONFLICT (info_hash, peer_id, address, port) DO UPDATE SET created = EXCLUDED.created, is_seeder = EXCLUDED.is_seeder + del_query: DELETE FROM mo_peers WHERE info_hash=@info_hash AND peer_id=@peer_id AND address=@address AND port=@port AND is_seeder=@is_seeder + graduate_query: UPDATE mo_peers SET is_seeder=TRUE WHERE info_hash=@info_hash AND peer_id=peer_id AND address=@address AND port=@port AND NOT is_seeder + count_query: SELECT COUNT(1) FILTER (WHERE is_seeder) AS seeders, COUNT(1) FILTER (WHERE NOT is_seeder) AS leechers FROM mo_peers + # predicate part of `count_query` to get count of peers by info hash + by_info_hash_clause: WHERE info_hash = @info_hash + count_seeders_column: seeders + count_leechers_column: leechers + + # queries for KV-store + data: + add_query: INSERT INTO mo_kv VALUES(@context, @key, @value) ON CONFLICT (context, name) DO NOTHING + # Note: in del_query @key parameter is array, NOT single value + del_query: DELETE FROM mo_kv WHERE context=@context AND name = ANY(@key) + get_query: SELECT value FROM mo_kv WHERE context=@context AND name=@key + + # query for check if database is alive + ping_query: SELECT 1 + + # query for garbage collection, expected parameter is timestamp + gc_query: DELETE FROM mo_peers WHERE created <= @created + + # The amount of time until a peer is considered stale. + # To avoid churn, keep this slightly larger than `announce_interval` + peer_lifetime: 31m + + # The frequency which stale peers are removed. + gc_interval: 3m + + # query for info hash statistics + info_hash_count_query: SELECT COUNT(DISTINCT info_hash) as info_hashes FROM mo_peers + + # The interval at which metrics about the number of info hashes and peers + # are collected and posted to Prometheus. + prometheus_reporting_interval: 1s + +posthooks: [] +prehooks: [] \ No newline at end of file diff --git a/dist/example_config_redis.yaml b/dist/example_config_redis.yaml new file mode 100644 index 0000000..792e047 --- /dev/null +++ b/dist/example_config_redis.yaml @@ -0,0 +1,103 @@ +# @formatter:off +# Note: see `example_config.yaml` for `frontends` and `*hooks` config description + + +announce_interval: 30m +min_announce_interval: 15m +metrics_addr: "" + +frontends: + - name: http + config: + addr: "0.0.0.0:6969" + tls: false + tls_cert_path: "" + tls_key_path: "" + reuse_port: true + read_timeout: 5s + write_timeout: 5s + enable_keepalive: false + idle_timeout: 30s + enable_request_timing: false + announce_routes: + - "/announce" + scrape_routes: + - "/scrape" + ping_routes: + - "/ping" + allow_ip_spoofing: false + filter_private_ips: false + real_ip_header: "x-real-ip" + max_numwant: 100 + default_numwant: 50 + max_scrape_infohashes: 50 + + - name: udp + config: + addr: "0.0.0.0:6969" + reuse_port: true + max_clock_skew: 10s + private_key: "paste a random string here that will be used to hmac connection IDs" + enable_request_timing: false + allow_ip_spoofing: false + filter_private_ips: false + max_numwant: 100 + default_numwant: 50 + max_scrape_infohashes: 50 + +# This block defines configuration used for redis storage. +storage: + # If used keydb fork, set `keydb` name + name: redis + config: + # The frequency which stale peers are removed. + # This balances between + # - collecting garbage more often, potentially using more CPU time, but potentially using less memory (lower value) + # - collecting garbage less frequently, saving CPU time, but keeping old peers long, thus using more memory (higher value). + gc_interval: 3m + + # The interval at which metrics about the number of infohashes and peers + # are collected and posted to Prometheus. + prometheus_reporting_interval: 1s + + # The amount of time until a peer is considered stale. + # To avoid churn, keep this slightly larger than `announce_interval` + peer_lifetime: 31m + + # The addresses of redis storage. + # If neither sentinel not cluster switched, + # only first address used + addresses: ["127.0.0.1:6379"] + + # Database to be selected after connecting to the server. + db: 0 + + # Maximum number of socket connections, default is 10 per CPU + pool_size: 10 + + # Use the specified login/username to authenticate the current connection + login: "" + + # Optional password + password: "" + + # Connect to sentinel nodes + sentinel: false + + # The master name + sentinel_master: "" + + # Connect to the redis cluster + cluster: false + + # The timeout for reading a command reply from redis. + read_timeout: 15s + + # The timeout for writing a command to redis. + write_timeout: 15s + + # Dial timeout for establishing new connections. + connect_timeout: 15s + +posthooks: [] +prehooks: [] \ No newline at end of file diff --git a/frontend/frontend.go b/frontend/frontend.go index c92dfde..e871d96 100644 --- a/frontend/frontend.go +++ b/frontend/frontend.go @@ -44,6 +44,7 @@ func RegisterBuilder(name string, b Builder) { builders[name] = b } +// Frontend dummy interface for bittorrent frontends type Frontend interface { stop.Stopper } diff --git a/frontend/http/frontend.go b/frontend/http/frontend.go index 8fee959..87d8bd0 100644 --- a/frontend/http/frontend.go +++ b/frontend/http/frontend.go @@ -21,16 +21,14 @@ import ( "github.com/sot-tech/mochi/pkg/stop" ) -const Name = "http" - var ( - logger = log.NewLogger(Name) + logger = log.NewLogger("frontend/http") errTLSNotProvided = errors.New("tls certificate/key not provided") errRoutesNotProvided = errors.New("routes not provided") ) func init() { - frontend.RegisterBuilder(Name, newFrontend) + frontend.RegisterBuilder("http", NewFrontend) } // Config represents all configurable options for an HTTP BitTorrent Frontend @@ -51,9 +49,8 @@ const defaultIdleTimeout = 30 * time.Second // Validate sanity checks values set in a config and returns a new config with // default values replacing anything that is invalid. -// -// This function warns to the logger when a value is changed. func (cfg Config) Validate() (validCfg Config, err error) { + validCfg = cfg if validCfg.ListenOptions, err = cfg.ListenOptions.Validate(); err != nil { return } @@ -66,7 +63,7 @@ func (cfg Config) Validate() (validCfg Config, err error) { if cfg.EnableKeepAlive { // If keepalive is disabled, this configuration isn't used anyway. logger.Warn(). - Str("name", "http.IdleTimeout"). + Str("name", "IdleTimeout"). Dur("provided", cfg.IdleTimeout). Dur("default", validCfg.IdleTimeout). Msg("falling back to default configuration") @@ -83,7 +80,8 @@ type httpFE struct { ParseOptions } -func newFrontend(c conf.MapConfig, logic *middleware.Logic) (_ frontend.Frontend, err error) { +// NewFrontend builds and starts http bittorrent frontend from provided configuration +func NewFrontend(c conf.MapConfig, logic *middleware.Logic) (_ frontend.Frontend, err error) { var cfg Config if err = c.Unmarshal(&cfg); err != nil { return @@ -137,7 +135,9 @@ func newFrontend(c conf.MapConfig, logic *middleware.Logic) (_ frontend.Frontend go func() { ln, err := cfg.ListenTCP() - defer logger.Err(ln.Close()).Msg("closing listener") + defer func() { + logger.Err(ln.Close()).Msg("closing listener") + }() if err == nil { if f.srv.TLSConfig == nil { err = f.srv.Serve(ln) @@ -145,9 +145,7 @@ func newFrontend(c conf.MapConfig, logic *middleware.Logic) (_ frontend.Frontend err = f.srv.ServeTLS(ln, "", "") } } - if errors.Is(err, http.ErrServerClosed) { - err = nil - } else { + if !errors.Is(err, http.ErrServerClosed) { logger.Fatal().Err(err).Msg("server failed") } }() diff --git a/frontend/http/writer.go b/frontend/http/writer.go index c78ab76..3b6641c 100644 --- a/frontend/http/writer.go +++ b/frontend/http/writer.go @@ -3,12 +3,11 @@ package http import ( "bytes" "errors" + "net" "net/http" "strconv" "time" - "github.com/anacrolix/torrent/bencode" - "github.com/sot-tech/mochi/bittorrent" "github.com/sot-tech/mochi/pkg/bytepool" ) @@ -36,7 +35,6 @@ func WriteAnnounceResponse(w http.ResponseWriter, resp *bittorrent.AnnounceRespo if resp.Interval > 0 { resp.Interval /= time.Second } - if resp.Interval > 0 { resp.MinInterval /= time.Second } @@ -54,18 +52,9 @@ func WriteAnnounceResponse(w http.ResponseWriter, resp *bittorrent.AnnounceRespo // Add the peers to the dictionary in the compact format. if resp.Compact { // Add the IPv4 peers to the dictionary. - bb.WriteString("5:peersl") - for _, peer := range resp.IPv4Peers { - compactAddress(bb, peer) - } - bb.WriteByte('e') - + compactAddresses(bb, resp.IPv4Peers, false) // Add the IPv6 peers to the dictionary. - bb.WriteString("6:peers6l") - for _, peer := range resp.IPv6Peers { - compactAddress(bb, peer) - } - bb.WriteByte('e') + compactAddresses(bb, resp.IPv6Peers, true) } else { // Add the peers to the dictionary. bb.WriteString("5:peersl") @@ -83,14 +72,23 @@ func WriteAnnounceResponse(w http.ResponseWriter, resp *bittorrent.AnnounceRespo return err } -func compactAddress(bb *bytes.Buffer, peer bittorrent.Peer) { - addr, port := peer.Addr().AsSlice(), peer.Port() - bb.WriteString(strconv.Itoa(len(addr) + 2)) - bb.WriteByte(':') - bb.Write(addr) - bb.WriteByte(byte(port >> 8)) - bb.WriteByte(byte(port)) - return +func compactAddresses(bb *bytes.Buffer, peers bittorrent.Peers, v6 bool) { + l := len(peers) + if l > 0 { + key, al := "5:peers", net.IPv4len + if v6 { + key, al = "6:peers6", net.IPv6len + } + bb.WriteString(key) + bb.WriteString(strconv.Itoa((al + 2) * l)) + bb.WriteByte(':') + for _, peer := range peers { + bb.Write(peer.Addr().AsSlice()) + port := peer.Port() + bb.WriteByte(byte(port >> 8)) + bb.WriteByte(byte(port)) + } + } } func dictAddress(bb *bytes.Buffer, peer bittorrent.Peer) { @@ -100,7 +98,7 @@ func dictAddress(bb *bytes.Buffer, peer bittorrent.Peer) { bb.WriteByte(':') bb.WriteString(addr) bb.WriteString("7:peer id20:") - bb.WriteString(peer.ID.RawString()) + bb.Write(peer.ID[:]) bb.WriteString("4:porti") bb.WriteString(strconv.FormatUint(uint64(peer.Port()), 10)) bb.WriteString("ee") @@ -109,16 +107,22 @@ func dictAddress(bb *bytes.Buffer, peer bittorrent.Peer) { // WriteScrapeResponse communicates the results of a Scrape to a BitTorrent // client over HTTP. func WriteScrapeResponse(w http.ResponseWriter, resp *bittorrent.ScrapeResponse) error { - filesDict := make(map[bittorrent.InfoHash]any, len(resp.Files)) + bb := respBufferPool.Get() + defer respBufferPool.Put(bb) + bb.WriteString("d5:filesd") for _, scrape := range resp.Files { - filesDict[scrape.InfoHash] = map[string]any{ - "complete": scrape.Complete, - "downloaded": scrape.Snatches, - "incomplete": scrape.Incomplete, - } + bb.WriteString(strconv.Itoa(len(scrape.InfoHash))) + bb.WriteByte(':') + bb.Write([]byte(scrape.InfoHash)) + bb.WriteString("d8:completei") + bb.WriteString(strconv.FormatUint(uint64(scrape.Complete), 10)) + bb.WriteString("e10:downloadedi") + bb.WriteString(strconv.FormatUint(uint64(scrape.Snatches), 10)) + bb.WriteString("e10:incompletei") + bb.WriteString(strconv.FormatUint(uint64(scrape.Incomplete), 10)) + bb.WriteString("ee") } - - return bencode.NewEncoder(w).Encode(map[string]any{ - "files": filesDict, - }) + bb.WriteString("ee") + _, err := bb.WriteTo(w) + return err } diff --git a/frontend/options.go b/frontend/options.go index 3b83a5d..c104853 100644 --- a/frontend/options.go +++ b/frontend/options.go @@ -14,7 +14,8 @@ const ( ) var ( - errAddressNotProvided = errors.New("address not provided") + // ErrAddressNotProvided returned if listen address not provided in configuration + ErrAddressNotProvided = errors.New("address not provided") errUnexpectedListenerType = errors.New("unexpected listener type") ) @@ -27,15 +28,17 @@ type ListenOptions struct { EnableRequestTiming bool `cfg:"enable_request_timing"` } +// Validate checks if listen address provided and sets default +// timeout options if needed func (lo ListenOptions) Validate() (validOptions ListenOptions, err error) { validOptions = lo if len(lo.Addr) == 0 { - err = errAddressNotProvided + err = ErrAddressNotProvided } else { if lo.ReadTimeout <= 0 { validOptions.ReadTimeout = defaultReadTimeout logger.Warn(). - Str("name", "http.ReadTimeout"). + Str("name", "ReadTimeout"). Dur("provided", lo.ReadTimeout). Dur("default", validOptions.ReadTimeout). Msg("falling back to default configuration") @@ -44,7 +47,7 @@ func (lo ListenOptions) Validate() (validOptions ListenOptions, err error) { if lo.WriteTimeout <= 0 { validOptions.WriteTimeout = defaultWriteTimeout logger.Warn(). - Str("name", "http.WriteTimeout"). + Str("name", "WriteTimeout"). Dur("provided", lo.WriteTimeout). Dur("default", validOptions.WriteTimeout). Msg("falling back to default configuration") @@ -53,6 +56,9 @@ func (lo ListenOptions) Validate() (validOptions ListenOptions, err error) { return } +// ListenTCP listens at the given TCP Addr +// with SO_REUSEPORT and SO_REUSEADDR options enabled if +// ReusePort set to true func (lo ListenOptions) ListenTCP() (conn *net.TCPListener, err error) { if lo.ReusePort { var ln net.Listener @@ -71,6 +77,9 @@ func (lo ListenOptions) ListenTCP() (conn *net.TCPListener, err error) { return } +// ListenUDP listens at the given UDP Addr +// with SO_REUSEPORT and SO_REUSEADDR options enabled if +// ReusePort set to true func (lo ListenOptions) ListenUDP() (conn *net.UDPConn, err error) { if lo.ReusePort { var ln net.PacketConn diff --git a/frontend/udp/frontend.go b/frontend/udp/frontend.go index c0f5a40..6dadd61 100644 --- a/frontend/udp/frontend.go +++ b/frontend/udp/frontend.go @@ -13,8 +13,6 @@ import ( "sync" "time" - "github.com/libp2p/go-reuseport" - "github.com/sot-tech/mochi/bittorrent" "github.com/sot-tech/mochi/frontend" "github.com/sot-tech/mochi/middleware" @@ -26,19 +24,16 @@ import ( "github.com/sot-tech/mochi/pkg/timecache" ) -const Name = "http" - var ( - logger = log.NewLogger(Name) + logger = log.NewLogger("frontend/udp") allowedGeneratedPrivateKeyRunes = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890") - errUnexpectedConnType = errors.New("unexpected connection type (not UDPConn)") ) func init() { - frontend.RegisterBuilder(Name, newFrontend) + frontend.RegisterBuilder("udp", NewFrontend) } -// Config represents all of the configurable options for a UDP BitTorrent +// Config represents all the configurable options for a UDP BitTorrent // Tracker. type Config struct { frontend.ListenOptions @@ -49,10 +44,13 @@ type Config struct { // Validate sanity checks values set in a config and returns a new config with // default values replacing anything that is invalid. -// -// This function warns to the logger when a value is changed. -func (cfg Config) Validate() Config { - validcfg := cfg +func (cfg Config) Validate() (validCfg Config, err error) { + if len(cfg.Addr) == 0 { + err = frontend.ErrAddressNotProvided + return + } + + validCfg = cfg // Generate a private key if one isn't provided by the user. if cfg.PrivateKey == "" { @@ -60,43 +58,49 @@ func (cfg Config) Validate() Config { for i := range pkeyRunes { pkeyRunes[i] = allowedGeneratedPrivateKeyRunes[rand.Intn(len(allowedGeneratedPrivateKeyRunes))] } - validcfg.PrivateKey = string(pkeyRunes) + validCfg.PrivateKey = string(pkeyRunes) logger.Warn(). - Str("name", "UDP.PrivateKey"). + Str("name", "PrivateKey"). Str("provided", ""). - Str("key", validcfg.PrivateKey). + Str("key", validCfg.PrivateKey). Msg("falling back to default configuration") } - validcfg.ParseOptions = cfg.ParseOptions.Validate() + validCfg.ParseOptions = cfg.ParseOptions.Validate() - return validcfg + return } // udpFE holds the state of a UDP BitTorrent Frontend. type udpFE struct { - socket *net.UDPConn - closing chan struct{} - wg sync.WaitGroup - - genPool *sync.Pool - - logic *middleware.Logic - Config + socket *net.UDPConn + closing chan any + wg sync.WaitGroup + genPool *sync.Pool + logic *middleware.Logic + maxClockSkew time.Duration + collectTimings bool + frontend.ParseOptions } -func newFrontend(c conf.MapConfig, logic *middleware.Logic) (frontend.Frontend, error) { - var provided Config - if err := c.Unmarshal(&provided); err != nil { +// NewFrontend builds and starts udp bittorrent frontend from provided configuration +func NewFrontend(c conf.MapConfig, logic *middleware.Logic) (frontend.Frontend, error) { + var err error + var cfg Config + if err = c.Unmarshal(&cfg); err != nil { + return nil, err + } + if cfg, err = cfg.Validate(); err != nil { return nil, err } - cfg := provided.Validate() f := &udpFE{ - closing: make(chan struct{}), - logic: logic, - Config: cfg, + closing: make(chan any), + logic: logic, + maxClockSkew: cfg.MaxClockSkew, + collectTimings: cfg.EnableRequestTiming, + ParseOptions: cfg.ParseOptions, genPool: &sync.Pool{ New: func() any { return NewConnectionIDGenerator(cfg.PrivateKey) @@ -104,18 +108,16 @@ func newFrontend(c conf.MapConfig, logic *middleware.Logic) (frontend.Frontend, }, } - if err := f.listen(); err != nil { - return nil, err + if f.socket, err = cfg.ListenUDP(); err == nil { + f.wg.Add(1) + go func() { + if err := f.serve(); err != nil { + logger.Fatal().Err(err).Msg("server failed") + } + }() } - f.wg.Add(1) - go func() { - if err := f.serve(); err != nil { - logger.Fatal().Err(err).Msg("failed while serving") - } - }() - - return f, nil + return f, err } // Stop provides a thread-safe way to shut down a currently running Frontend. @@ -129,34 +131,18 @@ func (t *udpFE) Stop() stop.Result { c := make(stop.Channel) go func() { close(t.closing) - _ = t.socket.SetReadDeadline(time.Now()) - t.wg.Wait() - c.Done(t.socket.Close()) + var err error + if t.socket != nil { + _ = t.socket.SetReadDeadline(time.Now()) + t.wg.Wait() + err = t.socket.Close() + } + c.Done(err) }() return c.Result() } -// listen resolves the address and binds the server socket. -func (t *udpFE) listen() (err error) { - if t.ReusePort { - var ln net.PacketConn - if ln, err = reuseport.ListenPacket("udp", t.Addr); err == nil { - var isOk bool - if t.socket, isOk = ln.(*net.UDPConn); !isOk { - err = errUnexpectedConnType - } - } - } else { - var udpAddr *net.UDPAddr - udpAddr, err = net.ResolveUDPAddr("udp", t.Addr) - if err == nil { - t.socket, err = net.ListenUDP("udp", udpAddr) - } - } - return err -} - // serve blocks while listening and serving UDP BitTorrent requests // until Stop() is called or an error is returned. func (t *udpFE) serve() error { @@ -199,14 +185,14 @@ func (t *udpFE) serve() error { // Handle the request. addr := addrPort.Addr().Unmap() var start time.Time - if t.EnableRequestTiming && metrics.Enabled() { + if t.collectTimings && metrics.Enabled() { start = time.Now() } action, err := t.handleRequest( Request{(*buffer)[:n], addr}, ResponseWriter{t.socket, addrPort}, ) - if t.EnableRequestTiming && metrics.Enabled() { + if t.collectTimings && metrics.Enabled() { recordResponseDuration(action, addr, err, time.Since(start)) } }() @@ -251,7 +237,7 @@ func (t *udpFE) handleRequest(r Request, w ResponseWriter) (actionName string, e // If this isn't requesting a new connection ID and the connection ID is // invalid, then fail. - if actionID != connectActionID && !gen.Validate(connID, r.IP, timecache.Now(), t.MaxClockSkew) { + if actionID != connectActionID && !gen.Validate(connID, r.IP, timecache.Now(), t.maxClockSkew) { err = errBadConnectionID WriteError(w, txID, err) return diff --git a/frontend/udp/frontend_test.go b/frontend/udp/frontend_test.go index c87cee8..8ae8b8c 100644 --- a/frontend/udp/frontend_test.go +++ b/frontend/udp/frontend_test.go @@ -17,12 +17,15 @@ func init() { } func TestStartStopRaceIssue437(t *testing.T) { - ps, err := storage.NewStorage("memory", conf.MapConfig{}) + ps, err := storage.NewStorage(conf.NamedMapConfig{ + Name: "memory", + Config: conf.MapConfig{}, + }) if err != nil { t.Fatal(err) } lgc := middleware.NewLogic(0, 0, ps, nil, nil) - fe, err := udp.newFrontend(conf.MapConfig{"addr": "127.0.0.1:0"}, lgc) + fe, err := udp.NewFrontend(conf.MapConfig{"addr": "127.0.0.1:0"}, lgc) if err != nil { t.Fatal(err) } diff --git a/middleware/jwt/jwt.go b/middleware/jwt/jwt.go index 32397b5..34556ae 100644 --- a/middleware/jwt/jwt.go +++ b/middleware/jwt/jwt.go @@ -23,19 +23,17 @@ import ( "github.com/sot-tech/mochi/storage" ) -// Name is the name by which this middleware is registered with Conf. const ( - Name = "jwt" authorizationHeader = "authorization" bearerAuthPrefix = "bearer " ) func init() { - middleware.RegisterBuilder(Name, build) + middleware.RegisterBuilder("jwt", build) } var ( - logger = log.NewLogger(Name) + logger = log.NewLogger("middleware/jwt") // ErrMissingJWT is returned when a JWT is missing from a request. ErrMissingJWT = bittorrent.ClientError("unapproved request: missing jwt") @@ -74,7 +72,7 @@ func build(config conf.MapConfig, _ storage.PeerStorage) (h middleware.Hook, err var cfg Config if err = config.Unmarshal(&cfg); err != nil { - return nil, fmt.Errorf("middleware %s: %w", Name, err) + return nil, fmt.Errorf("unable to deserialise configuration: %w", err) } if len(cfg.JWKSetURL) > 0 && len(cfg.Issuer) > 0 && len(cfg.Audience) > 0 { @@ -204,7 +202,7 @@ func (h *hook) HandleScrape(ctx context.Context, req *bittorrent.ScrapeRequest, if errs := h.validateBaseJWT(jwtParam, claims); len(errs) > 0 { logger.Info(). Errs("errors", errs). - Array("source", req.RequestAddresses). + Array("source", &req.RequestAddresses). Msg("JWT validation failed") err = ErrInvalidJWT } else { @@ -215,7 +213,7 @@ func (h *hook) HandleScrape(ctx context.Context, req *bittorrent.ScrapeRequest, } else { logger.Info(). Err(err). - Array("addresses", req.RequestAddresses). + Array("addresses", &req.RequestAddresses). Msg("'infohashes' claim parse failed") } } @@ -239,7 +237,7 @@ func (h *hook) HandleScrape(ctx context.Context, req *bittorrent.ScrapeRequest, logger.Info(). Array("claimInfoHashes", claimIHs). Array("requestInfoHashes", req.InfoHashes). - Array("addresses", req.RequestAddresses). + Array("addresses", &req.RequestAddresses). Msg("unequal 'infohashes' claim when validating JWT") err = ErrInvalidJWT } diff --git a/middleware/torrentapproval/container/directory/directory.go b/middleware/torrentapproval/container/directory/directory.go index a4867ad..d002fa4 100644 --- a/middleware/torrentapproval/container/directory/directory.go +++ b/middleware/torrentapproval/container/directory/directory.go @@ -20,13 +20,10 @@ import ( "github.com/sot-tech/mochi/storage" ) -// Name of this container for registry -const Name = "directory" - -var logger = log.NewLogger("torrent approval directory") +var logger = log.NewLogger("middleware/torrent approval/directory") func init() { - container.Register(Name, build) + container.Register("directory", build) } // Config - implementation of directory container configuration. diff --git a/middleware/torrentapproval/container/list/list.go b/middleware/torrentapproval/container/list/list.go index 1ffb4ea..0440411 100644 --- a/middleware/torrentapproval/container/list/list.go +++ b/middleware/torrentapproval/container/list/list.go @@ -12,13 +12,10 @@ import ( "github.com/sot-tech/mochi/storage" ) -// Name of this container for registry. -const Name = "list" - -var logger = log.NewLogger("torrent approval list") +var logger = log.NewLogger("middleware/torrent approval/list") func init() { - container.Register(Name, build) + container.Register("list", build) } // Config - implementation of list container configuration. diff --git a/pkg/conf/config.go b/pkg/conf/config.go index 37f4f55..2b84325 100644 --- a/pkg/conf/config.go +++ b/pkg/conf/config.go @@ -52,11 +52,13 @@ func (m MapConfig) Unmarshal(into any) (err error) { return } +// NamedMapConfig encapsulates MapConfig with string Name type NamedMapConfig struct { Name string Config MapConfig } +// MarshalZerologObject writes Name and Config into zerolog event func (nm NamedMapConfig) MarshalZerologObject(e *zerolog.Event) { e.Str("name", nm.Name).Dict("config", zerolog.Dict().EmbedObject(nm.Config)) } diff --git a/storage/keydb/storage.go b/storage/keydb/storage.go index 0c9860f..dbb9acb 100644 --- a/storage/keydb/storage.go +++ b/storage/keydb/storage.go @@ -24,14 +24,10 @@ import ( r "github.com/sot-tech/mochi/storage/redis" ) -// Name is name of this storage -const ( - Name = "keydb" - expireMemberCmd = "EXPIREMEMBER" -) +const expireMemberCmd = "EXPIREMEMBER" var ( - logger = log.NewLogger(Name) + logger = log.NewLogger("storage/keydb") // errNotKeyDB returned from initializer if connected does not support KeyDB // specific command (EXPIREMEMBER) errNotKeyDB = errors.New("provided instance seems not KeyDB") @@ -39,7 +35,7 @@ var ( func init() { // Register the storage driver. - storage.RegisterDriver(Name, builder) + storage.RegisterDriver("keydb", builder) } func builder(icfg conf.MapConfig) (storage.PeerStorage, error) { diff --git a/storage/memory/storage.go b/storage/memory/storage.go index cc6fe16..b586e5f 100644 --- a/storage/memory/storage.go +++ b/storage/memory/storage.go @@ -19,17 +19,13 @@ import ( ) // Default config constants. -const ( - // Name is the name by which this peer store is registered with Conf. - Name = "memory" - defaultShardCount = 1024 -) +const defaultShardCount = 1024 -var logger = log.NewLogger(Name) +var logger = log.NewLogger("storage/memory") func init() { // Register the storage driver. - storage.RegisterDriver(Name, builder) + storage.RegisterDriver("memory", builder) } func builder(icfg conf.MapConfig) (storage.PeerStorage, error) { diff --git a/storage/pg/storage.go b/storage/pg/storage.go index 2d2d01a..7e8dd1b 100644 --- a/storage/pg/storage.go +++ b/storage/pg/storage.go @@ -26,9 +26,6 @@ import ( ) const ( - // Name is the name by which this peer store is registered with Conf. - Name = "pg" - defaultPingQuery = "SELECT 0" errRequiredParameterNotSetMsg = "required parameter not provided: %s" @@ -49,14 +46,13 @@ const ( ) var ( - logger = log.NewLogger(Name) - + logger = log.NewLogger("storage/pg") errConnectionStringNotProvided = errors.New("database connection string not provided") ) func init() { // Register the storage builder. - storage.RegisterDriver(Name, builder) + storage.RegisterDriver("pg", builder) } func builder(icfg conf.MapConfig) (storage.PeerStorage, error) { diff --git a/storage/redis/storage.go b/storage/redis/storage.go index 45a5dc9..f7c26bf 100644 --- a/storage/redis/storage.go +++ b/storage/redis/storage.go @@ -42,8 +42,6 @@ import ( ) const ( - // Name is the name by which this peer store is registered with Conf. - Name = "redis" // Default config constants. defaultRedisAddress = "127.0.0.1:6379" defaultReadTimeout = time.Second * 15 @@ -70,14 +68,14 @@ const ( ) var ( - logger = log.NewLogger(Name) + logger = log.NewLogger("storage/redis") // errSentinelAndClusterChecked returned from initializer if both Config.Sentinel and Config.Cluster provided errSentinelAndClusterChecked = errors.New("unable to use both cluster and sentinel mode") ) func init() { // Register the storage builder. - storage.RegisterDriver(Name, builder) + storage.RegisterDriver("redis", builder) } func builder(icfg conf.MapConfig) (storage.PeerStorage, error) {