diff --git a/cmd/mochi/config.go b/cmd/mochi/config.go index 5eee094..9245b53 100644 --- a/cmd/mochi/config.go +++ b/cmd/mochi/config.go @@ -3,12 +3,11 @@ package main import ( "errors" "os" + "time" "gopkg.in/yaml.v3" - "github.com/sot-tech/mochi/frontend/http" - "github.com/sot-tech/mochi/frontend/udp" - "github.com/sot-tech/mochi/middleware" + "github.com/sot-tech/mochi/pkg/conf" // Imports to register middleware drivers. _ "github.com/sot-tech/mochi/middleware/clientapproval" @@ -21,38 +20,24 @@ import ( _ "github.com/sot-tech/mochi/storage/redis" ) -type storageConfig struct { - Name string `yaml:"name"` - Config any `yaml:"config"` -} - // Config represents the configuration used for executing Conf. type Config struct { - middleware.ResponseConfig `yaml:",inline"` - MetricsAddr string `yaml:"metrics_addr"` - HTTPConfig http.Config `yaml:"http"` - UDPConfig udp.Config `yaml:"udp"` - Storage storageConfig `yaml:"storage"` - PreHooks []middleware.Config `yaml:"prehooks"` - PostHooks []middleware.Config `yaml:"posthooks"` -} - -// PreHookNames returns only the names of the configured middleware. -func (cfg Config) PreHookNames() (names []string) { - for _, hook := range cfg.PreHooks { - names = append(names, hook.Name) - } - - return -} - -// PostHookNames returns only the names of the configured middleware. -func (cfg Config) PostHookNames() (names []string) { - for _, hook := range cfg.PostHooks { - names = append(names, hook.Name) - } - - return + // TODO(jzelinskie): Evaluate whether we would like to make + // AnnounceInterval and MinAnnounceInterval optional. + // We can make Conf extensible enough that you can program a new response + // generator at the cost of making it possible for users to create config that + // won't compose a functional tracker. + AnnounceInterval time.Duration `yaml:"announce_interval"` + MinAnnounceInterval time.Duration `yaml:"min_announce_interval"` + MetricsAddr string `yaml:"metrics_addr"` + HTTPConfig conf.MapConfig `yaml:"http"` + UDPConfig conf.MapConfig `yaml:"udp"` + Storage struct { + Name string `yaml:"name"` + Config conf.MapConfig `yaml:"config"` + } `yaml:"storage"` + PreHooks []conf.MapConfig `yaml:"prehooks"` + PostHooks []conf.MapConfig `yaml:"posthooks"` } // ConfigFile represents a namespaced YAML configation file. diff --git a/cmd/mochi/main.go b/cmd/mochi/main.go index a2ad00c..4904caa 100644 --- a/cmd/mochi/main.go +++ b/cmd/mochi/main.go @@ -15,6 +15,7 @@ import ( "github.com/sot-tech/mochi/frontend/http" "github.com/sot-tech/mochi/frontend/udp" "github.com/sot-tech/mochi/middleware" + "github.com/sot-tech/mochi/pkg/conf" "github.com/sot-tech/mochi/pkg/log" "github.com/sot-tech/mochi/pkg/metrics" _ "github.com/sot-tech/mochi/pkg/randseed" @@ -79,28 +80,26 @@ func (r *Run) Start(ps storage.Storage) error { return fmt.Errorf("failed to validate hook config: %w", err) } - log.Info("starting tracker logic", log.Fields{ - "prehooks": cfg.PreHookNames(), - "posthooks": cfg.PostHookNames(), - }) - r.logic = middleware.NewLogic(cfg.ResponseConfig, r.storage, preHooks, postHooks) + r.logic = middleware.NewLogic(cfg.AnnounceInterval, cfg.MinAnnounceInterval, r.storage, preHooks, postHooks) - if cfg.HTTPConfig.Addr != "" { + if len(cfg.HTTPConfig) > 0 { log.Info("starting HTTP frontend", cfg.HTTPConfig) - httpfe, err := http.NewFrontend(r.logic, cfg.HTTPConfig) - if err != nil { + httpFE, err := http.NewFrontend(r.logic, cfg.HTTPConfig) + if err == nil { + r.sg.Add(httpFE) + } else if !errors.Is(err, conf.ErrNilConfigMap) { return err } - r.sg.Add(httpfe) } - if cfg.UDPConfig.Addr != "" { + if len(cfg.UDPConfig) > 0 { log.Info("starting UDP frontend", cfg.UDPConfig) - udpfe, err := udp.NewFrontend(r.logic, cfg.UDPConfig) - if err != nil { + udpFE, err := udp.NewFrontend(r.logic, cfg.UDPConfig) + if err == nil { + r.sg.Add(udpFE) + } else if !errors.Is(err, conf.ErrNilConfigMap) { return err } - r.sg.Add(udpfe) } return nil diff --git a/dist/example_config.yaml b/dist/example_config.yaml index 96dd62e..dd1b763 100644 --- a/dist/example_config.yaml +++ b/dist/example_config.yaml @@ -140,66 +140,91 @@ mochi: prometheus_reporting_interval: 1s # This block defines configuration used for redis storage. - # storage: - # name: redis - # config: - # # The frequency which stale peers are removed. - # # This balances between - # # - collecting garbage more often, potentially using more CPU time, but potentially using less memory (lower value) - # # - collecting garbage less frequently, saving CPU time, but keeping old peers long, thus using more memory (higher value). - # gc_interval: 3m + #storage: + #name: redis + #config: + # The frequency which stale peers are removed. + # This balances between + # - collecting garbage more often, potentially using more CPU time, but potentially using less memory (lower value) + # - collecting garbage less frequently, saving CPU time, but keeping old peers long, thus using more memory (higher value). + #gc_interval: 3m - # # The interval at which metrics about the number of infohashes and peers - # # are collected and posted to Prometheus. - # prometheus_reporting_interval: 1s + # The interval at which metrics about the number of infohashes and peers + # are collected and posted to Prometheus. + #prometheus_reporting_interval: 1s - # # The amount of time until a peer is considered stale. - # # To avoid churn, keep this slightly larger than `announce_interval` - # peer_lifetime: 31m + # The amount of time until a peer is considered stale. + # To avoid churn, keep this slightly larger than `announce_interval` + #peer_lifetime: 31m - # # The address of redis storage. - # redis_broker: "redis://pwd@127.0.0.1:6379/0" + # The addresses of redis storage. + # If neither sentinel not cluster switched, + # only first address used + #addresses: ["127.0.0.1:6379"] - # # The timeout for reading a command reply from redis. - # read_timeout: 15s + # Database to be selected after connecting to the server. + #db: 0 - # # The timeout for writing a command to redis. - # write_timeout: 15s + # Maximum number of socket connections, default is 10 per CPU + #pool_size: 10 - # # The timeout for connecting to redis server. - # connect_timeout: 15s + # Use the specified login/username to authenticate the current connection + #login: "" + + # Optional password + #password: "" + + # Connect to sentinel nodes + #sentinel: false + + # The master name + #sentinel_master: "" + + # Connect to the redis cluster + #cluster: false + + # The timeout for reading a command reply from redis. + #read_timeout: 15s + + # The timeout for writing a command to redis. + #write_timeout: 15s + + # Dial timeout for establishing new connections. + #connect_timeout: 15s # This block defines configuration used for middleware executed before a # response has been returned to a BitTorrent client. prehooks: - #- name: jwt - # options: - # issuer: "https://issuer.com" - # audience: "https://some.issuer.com" - # jwk_set_url: "https://issuer.com/keys" - # jwk_set_update_interval: 5m - - #- name: client approval - # options: - # whitelist: - # - "OP1011" - # blacklist: - # - "OP1012" - - #- name: interval variation - # options: - # modify_response_probability: 0.2 - # max_increase_delta: 60 - # modify_min_interval: true - - # This block defines configuration used for torrent approval, it requires to be given - # hashes for whitelist or for blacklist. Hashes are hexadecimal-encoaded. - #- name: torrent approval - # options: - # initial_source: list - # configuration: - # hash_list: - # - "a1b2c3d4e5a1b2c3d4e5a1b2c3d4e5a1b2c3d4e5" - # invert: false - # storage_ctx: APPROVED_HASH - +# - name: jwt +# options: +# issuer: "https://issuer.com" +# audience: "https://some.issuer.com" +# jwk_set_url: "https://issuer.com/keys" +# jwk_set_update_interval: 5m +# +# - name: client approval +# options: +# whitelist: +# - "OP1011" +# blacklist: +# - "OP1012" +# +# - name: interval variation +# options: +# modify_response_probability: 0.2 +# max_increase_delta: 60 +# modify_min_interval: true +# + # This block defines configuration used for torrent approval, it requires to be given + # hashes for whitelist or for blacklist. Hashes are hexadecimal-encoaded. +# - name: torrent approval +# options: +# initial_source: list +# configuration: +# hash_list: +# - "a1b2c3d4e5a1b2c3d4e5a1b2c3d4e5a1b2c3d4e5" + # true - whitelist mode, false - blacklist +# invert: false + # Name of storage context where store hash list +# storage_ctx: APPROVED_HASH + posthooks: diff --git a/docs/architecture.md b/docs/architecture.md index 0864d91..77e67c8 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -2,12 +2,22 @@ ### Overview -BitTorrent clients send Announce and Scrape requests to a _Frontend_. Frontends parse requests and write responses for -the particular protocol they implement. The _TrackerLogic_ interface is used to generate responses for requests and -optionally perform a task after responding to a client. A configurable chain of _PreHook_ and _PostHook_ middleware is -used to construct an instance of TrackerLogic. PreHooks are middleware that are executed before the response has been +BitTorrent clients send Announce and Scrape requests to a _Frontend_. + +Frontends parse requests and write responses for +the particular protocol they implement. + +The _TrackerLogic_ interface is used to generate responses for requests and +optionally perform a task after responding to a client. + +A configurable chain of _PreHook_ and _PostHook_ middleware is +used to construct an instance of TrackerLogic. + +PreHooks are middleware that are executed before the response has been written. After all PreHooks have executed, any missing response fields that are required are filled by reading out of -the configured implementation of the _Storage_ interface. PostHooks are asynchronous tasks that occur after a response +the configured implementation of the _Storage_ interface. + +PostHooks are asynchronous tasks that occur after a response has been delivered to the client. Because they are unnecessary to for generating a response, updates to the Storage for a particular request are done asynchronously in a PostHook. diff --git a/docs/frontend.md b/docs/frontend.md index 3dd2f31..1562b22 100644 --- a/docs/frontend.md +++ b/docs/frontend.md @@ -1,10 +1,10 @@ # Frontends -A _Frontend_ is a component of Chihaya that serves a BitTorrent tracker on one protocol. The frontend accepts, parses +A _Frontend_ is a component of MoChi that serves a BitTorrent tracker on one protocol. The frontend accepts, parses and sanitizes requests, passes them to the _Logic_ and writes responses to _Clients_. This documentation first gives a high-level overview of Frontends and later goes into implementation specifics. Users of -Chihaya are expected to just read the first part - developers should read both. +MoChi are expected to just read the first part - developers should read both. ## Functionality @@ -19,7 +19,7 @@ answers each of them with one response, a basic overview of the control flow is: ## Available Frontends -Chihaya ships with frontends for HTTP(S) and UDP. The HTTP frontend uses Go's `http` package. The UDP frontend +MoChi ships with frontends for HTTP(S) and UDP. The HTTP frontend uses Go's `http` package. The UDP frontend implements both [old-opentracker-style] IPv6 and the IPv6 support specified in [BEP 15]. The advantage of the old opentracker style is that it contains a usable IPv6 `ip` field, to enable IP overrides in announces. diff --git a/docs/middleware/interval_variation.md b/docs/middleware/interval_variation.md index 16021b0..914846e 100644 --- a/docs/middleware/interval_variation.md +++ b/docs/middleware/interval_variation.md @@ -28,10 +28,10 @@ An example config might look like this: ```yaml mochi: - prehooks: - - name: interval variation - config: - modify_response_probability: 0.2 - max_increase_delta: 60 - modify_min_interval: true + prehooks: + - name: interval variation + config: + modify_response_probability: 0.2 + max_increase_delta: 60 + modify_min_interval: true ``` diff --git a/docs/middleware/torrent_approval.md b/docs/middleware/torrent_approval.md new file mode 100644 index 0000000..8d3a2d1 --- /dev/null +++ b/docs/middleware/torrent_approval.md @@ -0,0 +1,64 @@ +# Approved torrents list + +Package `torrentapproval` can be used for only allow or block +specified hashes or block specified hashes. + +## Functionality + +As said above, there are two modes of approval: white list and black list. + +If mode is **white list** (`invert` set to `false`), tracker works in +_semi-private_ mode, which means, than peers can could share and receive info +about only specified list of torrents' hashes. + +I.e.: if configuration contains hash `AAAA`, but peer announces hash `BBBB` +tracker will return `unapproved torrent` message back to peer. + +If mode is **black list** (`invert` set to `true`), tracker will allow all hashes +**except** specified. + +## Hash sources + +There are two sources of hashes: `list` and `directory`. + +Both of them used as **INITIAL** source for storing in storage. +If storage is not `memory`, records are persisted until _somebody_ +or _something_ (different tool with access to storage) won't delete it. + +`list` is the static set of hashes, specified in configuration file. + +`directory` will watch for `*.torrent` files in specified path and +append/delete records from storage. This source will parse all existing +files at start and then watch for new files to add, or for delete events +to remove hash from storage. + +## Configuration + +This middleware provides the following parameters for configuration: + +- `initial_source` - source type: `list` or `directory` +- `configuration` - options for specified source + - `list`: + - `hash_list` - list of HEX encoded hashes + - `invert` - working mode: `true` - black list, `false` - white list + - `storage_ctx` - name of storage _context_ where to store data. + It may be redis hash key, DB table name etc. + - `directory`: + - `path` - directory to watch + - `invert` and `storage_ctx` has the same meanins as `list`'s options + +Configuration example: + +An example config might look like this: + +```yaml +mochi: + prehooks: + - name: torrent approval + options: + initial_source: list + configuration: + hash_list: ["AAA", "BBB"] + invert: false + storage_ctx: APPROVED_HASH +``` diff --git a/docs/storage/redis.md b/docs/storage/redis.md index 588838a..20a4717 100644 --- a/docs/storage/redis.md +++ b/docs/storage/redis.md @@ -1,17 +1,17 @@ # Redis Storage -This storage implementation separates Chihaya from its storage service. Chihaya achieves HA by storing all peer data in -Redis. Multiple instances of Chihaya can use the same redis instance concurrently. The storage service can get HA by -clustering. If one instance of Chihaya goes down, peer data will still be available in Redis. +This storage implementation separates MoChi from its storage service. MoChi achieves HA by storing all peer data in +Redis. Multiple instances of MoChi can use the same redis instance concurrently. The storage service can get HA by +clustering. If one instance of MoChi goes down, peer data will still be available in Redis. The HA of storage service is not considered here. In case Redis runs as a single node, peer data will be unavailable if -the node is down. You should consider setting up a Redis cluster for Chihaya in production. +the node is down. You should consider setting up a Redis sentinel (or KeyDB active-active replication) for MoChi in production. This storage implementation is currently orders of magnitude slower than the in-memory implementation. ## Use Case -When one instance of Chihaya is down, other instances can continue serving peers from Redis. +When one instance of MoChi is down, other instances can continue serving peers from Redis. ## Configuration @@ -57,27 +57,24 @@ time as value. Here is an example: ``` -- IPv4 - - IPv4_S_: - - IPv4_L_: - - IPv4_S_: -- IPv4_S_ - - : - - : -- IPv4_L_ - - : -- IPv4_S_ - - : +- CHI_4_I + - CHI_4_S_ + - CHI_4_L_ +- CHI_4_S_ + - : + - : +- CHI_4_L_ + - : +... ``` In this case, prometheus would record two swarms, three seeders, and one leecher. These three keys per address family are used to record the count of swarms, seeders, and leechers. ``` -- IPv4_infohash_count: 2 -- IPv4_S_count: 3 -- IPv4_L_count: 1 +- CHI_4_S_C: "3" +- CHI_6_L_C: "1" ``` -Note: IPv4_infohash_count has a different meaning compared to the `memory` storage: -It represents the number of infohashes reported by seeder, meaning that infohashes without seeders are not counted. +Note: `CHI_4_I` set has a different meaning compared to the `memory` storage: +It represents info hashes reported by seeder, meaning that info hashes without seeders are not counted. diff --git a/frontend/http/frontend.go b/frontend/http/frontend.go index ffbf910..058776f 100644 --- a/frontend/http/frontend.go +++ b/frontend/http/frontend.go @@ -15,6 +15,7 @@ import ( "github.com/sot-tech/mochi/bittorrent" "github.com/sot-tech/mochi/frontend" + "github.com/sot-tech/mochi/pkg/conf" "github.com/sot-tech/mochi/pkg/log" "github.com/sot-tech/mochi/pkg/stop" ) @@ -22,19 +23,19 @@ import ( // Config represents all of the configurable options for an HTTP BitTorrent // Frontend. type Config struct { - Addr string `yaml:"addr"` - HTTPSAddr string `yaml:"https_addr"` - ReadTimeout time.Duration `yaml:"read_timeout"` - WriteTimeout time.Duration `yaml:"write_timeout"` - IdleTimeout time.Duration `yaml:"idle_timeout"` - EnableKeepAlive bool `yaml:"enable_keepalive"` - TLSCertPath string `yaml:"tls_cert_path"` - TLSKeyPath string `yaml:"tls_key_path"` - AnnounceRoutes []string `yaml:"announce_routes"` - ScrapeRoutes []string `yaml:"scrape_routes"` - PingRoutes []string `yaml:"ping_routes"` - EnableRequestTiming bool `yaml:"enable_request_timing"` - ParseOptions `yaml:",inline"` + Addr string + HTTPSAddr string `cfg:"https_addr"` + ReadTimeout time.Duration `cfg:"read_timeout"` + WriteTimeout time.Duration `cfg:"write_timeout"` + IdleTimeout time.Duration `cfg:"idle_timeout"` + EnableKeepAlive bool `cfg:"enable_keepalive"` + TLSCertPath string `cfg:"tls_cert_path"` + TLSKeyPath string `cfg:"tls_key_path"` + AnnounceRoutes []string `cfg:"announce_routes"` + ScrapeRoutes []string `cfg:"scrape_routes"` + PingRoutes []string `cfg:"ping_routes"` + EnableRequestTiming bool `cfg:"enable_request_timing"` + ParseOptions } // LogFields renders the current config as a set of Logrus fields. @@ -147,7 +148,11 @@ type Frontend struct { // NewFrontend creates a new instance of an HTTP Frontend that asynchronously // serves requests. -func NewFrontend(logic frontend.TrackerLogic, provided Config) (*Frontend, error) { +func NewFrontend(logic frontend.TrackerLogic, c conf.MapConfig) (*Frontend, error) { + var provided Config + if err := c.Unmarshal(&provided); err != nil { + return nil, err + } cfg := provided.Validate() f := &Frontend{ diff --git a/frontend/http/parser.go b/frontend/http/parser.go index 727a774..9056b49 100644 --- a/frontend/http/parser.go +++ b/frontend/http/parser.go @@ -14,11 +14,11 @@ import ( // If RealIPHeader is not empty string, the value of the first HTTP Header with // that name will be used. type ParseOptions struct { - AllowIPSpoofing bool `yaml:"allow_ip_spoofing"` - RealIPHeader string `yaml:"real_ip_header"` - MaxNumWant uint32 `yaml:"max_numwant"` - DefaultNumWant uint32 `yaml:"default_numwant"` - MaxScrapeInfoHashes uint32 `yaml:"max_scrape_infohashes"` + AllowIPSpoofing bool `cfg:"allow_ip_spoofing"` + RealIPHeader string `cfg:"real_ip_header"` + MaxNumWant uint32 `cfg:"max_numwant"` + DefaultNumWant uint32 `cfg:"default_numwant"` + MaxScrapeInfoHashes uint32 `cfg:"max_scrape_infohashes"` } // Default parser config constants. diff --git a/frontend/udp/frontend.go b/frontend/udp/frontend.go index 8e97a51..8882df0 100644 --- a/frontend/udp/frontend.go +++ b/frontend/udp/frontend.go @@ -16,6 +16,7 @@ import ( "github.com/sot-tech/mochi/bittorrent" "github.com/sot-tech/mochi/frontend" "github.com/sot-tech/mochi/frontend/udp/bytepool" + "github.com/sot-tech/mochi/pkg/conf" "github.com/sot-tech/mochi/pkg/log" "github.com/sot-tech/mochi/pkg/stop" "github.com/sot-tech/mochi/pkg/timecache" @@ -26,11 +27,11 @@ var allowedGeneratedPrivateKeyRunes = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGH // Config represents all of the configurable options for a UDP BitTorrent // Tracker. type Config struct { - Addr string `yaml:"addr"` - PrivateKey string `yaml:"private_key"` - MaxClockSkew time.Duration `yaml:"max_clock_skew"` - EnableRequestTiming bool `yaml:"enable_request_timing"` - ParseOptions `yaml:",inline"` + Addr string + PrivateKey string `cfg:"private_key"` + MaxClockSkew time.Duration `cfg:"max_clock_skew"` + EnableRequestTiming bool `cfg:"enable_request_timing"` + ParseOptions } // LogFields renders the current config as a set of Logrus fields. @@ -109,7 +110,11 @@ type Frontend struct { // NewFrontend creates a new instance of an UDP Frontend that asynchronously // serves requests. -func NewFrontend(logic frontend.TrackerLogic, provided Config) (*Frontend, error) { +func NewFrontend(logic frontend.TrackerLogic, c conf.MapConfig) (*Frontend, error) { + var provided Config + if err := c.Unmarshal(&provided); err != nil { + return nil, err + } cfg := provided.Validate() f := &Frontend{ @@ -136,7 +141,7 @@ func NewFrontend(logic frontend.TrackerLogic, provided Config) (*Frontend, error return f, nil } -// Stop provides a thread-safe way to shutdown a currently running Frontend. +// Stop provides a thread-safe way to shut down a currently running Frontend. func (t *Frontend) Stop() stop.Result { select { case <-t.closing: diff --git a/frontend/udp/frontend_test.go b/frontend/udp/frontend_test.go index 32227d0..b1b48ac 100644 --- a/frontend/udp/frontend_test.go +++ b/frontend/udp/frontend_test.go @@ -5,19 +5,19 @@ import ( "github.com/sot-tech/mochi/frontend/udp" "github.com/sot-tech/mochi/middleware" + "github.com/sot-tech/mochi/pkg/conf" _ "github.com/sot-tech/mochi/pkg/randseed" "github.com/sot-tech/mochi/storage" _ "github.com/sot-tech/mochi/storage/memory" ) func TestStartStopRaceIssue437(t *testing.T) { - ps, err := storage.NewStorage("memory", nil) + ps, err := storage.NewStorage("memory", conf.MapConfig{}) if err != nil { t.Fatal(err) } - var responseConfig middleware.ResponseConfig - lgc := middleware.NewLogic(responseConfig, ps, nil, nil) - fe, err := udp.NewFrontend(lgc, udp.Config{Addr: "127.0.0.1:0"}) + lgc := middleware.NewLogic(0, 0, ps, nil, nil) + fe, err := udp.NewFrontend(lgc, conf.MapConfig{"addr": "127.0.0.1:0"}) if err != nil { t.Fatal(err) } diff --git a/frontend/udp/parser.go b/frontend/udp/parser.go index 4aa3539..7ffbdf7 100644 --- a/frontend/udp/parser.go +++ b/frontend/udp/parser.go @@ -51,10 +51,10 @@ var ( // // If AllowIPSpoofing is true, IPs provided via params will be used. type ParseOptions struct { - AllowIPSpoofing bool `yaml:"allow_ip_spoofing"` - MaxNumWant uint32 `yaml:"max_numwant"` - DefaultNumWant uint32 `yaml:"default_numwant"` - MaxScrapeInfoHashes uint32 `yaml:"max_scrape_infohashes"` + AllowIPSpoofing bool `cfg:"allow_ip_spoofing"` + MaxNumWant uint32 `cfg:"max_numwant"` + DefaultNumWant uint32 `cfg:"default_numwant"` + MaxScrapeInfoHashes uint32 `cfg:"max_scrape_infohashes"` } // Default parser config constants. diff --git a/go.mod b/go.mod index c1530b7..7162562 100644 --- a/go.mod +++ b/go.mod @@ -10,6 +10,7 @@ require ( github.com/julienschmidt/httprouter v1.3.0 github.com/mendsley/gojwk v0.0.0-20141217222730-4d5ec6e58103 github.com/minio/sha256-simd v1.0.0 + github.com/mitchellh/mapstructure v1.4.3 github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.12.1 github.com/sirupsen/logrus v1.8.1 diff --git a/go.sum b/go.sum index e7171ce..3c22fda 100644 --- a/go.sum +++ b/go.sum @@ -247,6 +247,8 @@ github.com/mendsley/gojwk v0.0.0-20141217222730-4d5ec6e58103 h1:Z/i1e+gTZrmcGeZy github.com/mendsley/gojwk v0.0.0-20141217222730-4d5ec6e58103/go.mod h1:o9YPB5aGP8ob35Vy6+vyq3P3bWe7NQWzf+JLiXCiMaE= github.com/minio/sha256-simd v1.0.0 h1:v1ta+49hkWZyvaKwrQB8elexRqm6Y0aMLjCNsrYxo6g= github.com/minio/sha256-simd v1.0.0/go.mod h1:OuYzVNI5vcoYIAmbIvHPl3N3jUzVedXbKy5RFepssQM= +github.com/mitchellh/mapstructure v1.4.3 h1:OVowDSCllw/YjdLkam3/sm7wEtOy59d8ndGgCcyj8cs= +github.com/mitchellh/mapstructure v1.4.3/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= diff --git a/middleware/clientapproval/clientapproval.go b/middleware/clientapproval/clientapproval.go index 9c22cfb..2a20fc3 100644 --- a/middleware/clientapproval/clientapproval.go +++ b/middleware/clientapproval/clientapproval.go @@ -7,10 +7,9 @@ import ( "errors" "fmt" - "gopkg.in/yaml.v3" - "github.com/sot-tech/mochi/bittorrent" "github.com/sot-tech/mochi/middleware" + "github.com/sot-tech/mochi/pkg/conf" "github.com/sot-tech/mochi/storage" ) @@ -18,31 +17,21 @@ import ( const Name = "client approval" func init() { - middleware.RegisterDriver(Name, driver{}) + middleware.RegisterBuilder(Name, build) } -var _ middleware.Driver = driver{} +var ( + // ErrClientUnapproved is the error returned when a client's PeerID is invalid. + ErrClientUnapproved = bittorrent.ClientError("unapproved client") -type driver struct{} - -func (d driver) NewHook(optionBytes []byte, _ storage.Storage) (middleware.Hook, error) { - var cfg Config - err := yaml.Unmarshal(optionBytes, &cfg) - if err != nil { - return nil, fmt.Errorf("invalid options for middleware %s: %w", Name, err) - } - - return NewHook(cfg) -} - -// ErrClientUnapproved is the error returned when a client's PeerID is invalid. -var ErrClientUnapproved = bittorrent.ClientError("unapproved client") + errBothListsProvided = errors.New("using both whitelist and blacklist is invalid") +) // Config represents all the values required by this middleware to validate // peers based on their BitTorrent client ID. type Config struct { - Whitelist []string `yaml:"whitelist"` - Blacklist []string `yaml:"blacklist"` + Whitelist []string + Blacklist []string } type hook struct { @@ -50,15 +39,20 @@ type hook struct { unapproved map[ClientID]struct{} } -// NewHook returns an instance of the client approval middleware. -func NewHook(cfg Config) (middleware.Hook, error) { +func build(options conf.MapConfig, _ storage.Storage) (middleware.Hook, error) { + var cfg Config + + if err := options.Unmarshal(&cfg); err != nil { + return nil, fmt.Errorf("middleware %s: %w", Name, err) + } + h := &hook{ approved: make(map[ClientID]struct{}), unapproved: make(map[ClientID]struct{}), } if len(cfg.Whitelist) > 0 && len(cfg.Blacklist) > 0 { - return nil, fmt.Errorf("using both whitelist and blacklist is invalid") + return nil, errBothListsProvided } for _, cidString := range cfg.Whitelist { diff --git a/middleware/clientapproval/clientapproval_test.go b/middleware/clientapproval/clientapproval_test.go index 4b9fb01..af3a462 100644 --- a/middleware/clientapproval/clientapproval_test.go +++ b/middleware/clientapproval/clientapproval_test.go @@ -8,6 +8,7 @@ import ( "github.com/stretchr/testify/require" "github.com/sot-tech/mochi/bittorrent" + "github.com/sot-tech/mochi/pkg/conf" ) var cases = []struct { @@ -52,7 +53,8 @@ var cases = []struct { func TestHandleAnnounce(t *testing.T) { for _, tt := range cases { t.Run(fmt.Sprintf("testing peerid %s", tt.peerID), func(t *testing.T) { - h, err := NewHook(tt.cfg) + c := conf.MapConfig{"whitelist": tt.cfg.Whitelist, "blacklist": tt.cfg.Blacklist} + h, err := build(c, nil) require.Nil(t, err) ctx := context.Background() diff --git a/middleware/jwt/jwt.go b/middleware/jwt/jwt.go index a0e2937..f5375db 100644 --- a/middleware/jwt/jwt.go +++ b/middleware/jwt/jwt.go @@ -21,10 +21,10 @@ import ( "github.com/SermoDigital/jose/jws" "github.com/SermoDigital/jose/jwt" "github.com/mendsley/gojwk" - "gopkg.in/yaml.v3" "github.com/sot-tech/mochi/bittorrent" "github.com/sot-tech/mochi/middleware" + "github.com/sot-tech/mochi/pkg/conf" "github.com/sot-tech/mochi/pkg/log" "github.com/sot-tech/mochi/pkg/stop" "github.com/sot-tech/mochi/storage" @@ -34,21 +34,7 @@ import ( const Name = "jwt" func init() { - middleware.RegisterDriver(Name, driver{}) -} - -var _ middleware.Driver = driver{} - -type driver struct{} - -func (d driver) NewHook(optionBytes []byte, _ storage.Storage) (middleware.Hook, error) { - var cfg Config - err := yaml.Unmarshal(optionBytes, &cfg) - if err != nil { - return nil, fmt.Errorf("invalid options for middleware %s: %w", Name, err) - } - - return NewHook(cfg) + middleware.RegisterBuilder(Name, build) } var ( @@ -62,10 +48,10 @@ var ( // Config represents all the values required by this middleware to fetch JWKs // and verify JWTs. type Config struct { - Issuer string `yaml:"issuer"` - Audience string `yaml:"audience"` - JWKSetURL string `yaml:"jwk_set_url"` - JWKUpdateInterval time.Duration `yaml:"jwk_set_update_interval"` + Issuer string + Audience string + JWKSetURL string `cfg:"jwk_set_url"` + JWKUpdateInterval time.Duration `cfg:"jwk_set_update_interval"` } // LogFields implements log.Fielder for a Config. @@ -84,9 +70,14 @@ type hook struct { closing chan struct{} } -// NewHook returns an instance of the JWT middleware. -func NewHook(cfg Config) (middleware.Hook, error) { - log.Debug("creating new JWT middleware", cfg) +func build(options conf.MapConfig, _ storage.Storage) (middleware.Hook, error) { + var cfg Config + + if err := options.Unmarshal(&cfg); err != nil { + return nil, fmt.Errorf("middleware %s: %w", Name, err) + } + + log.Debug("creating new JWT middleware", options) h := &hook{ cfg: cfg, publicKeys: map[string]crypto.PublicKey{}, diff --git a/middleware/logic.go b/middleware/logic.go index 7a1d7d1..f2b224e 100644 --- a/middleware/logic.go +++ b/middleware/logic.go @@ -11,25 +11,14 @@ import ( "github.com/sot-tech/mochi/storage" ) -// ResponseConfig holds the configuration used for the actual response. -// -// TODO(jzelinskie): Evaluate whether we would like to make this optional. -// We can make Conf extensible enough that you can program a new response -// generator at the cost of making it possible for users to create config that -// won't compose a functional tracker. -type ResponseConfig struct { - AnnounceInterval time.Duration `yaml:"announce_interval"` - MinAnnounceInterval time.Duration `yaml:"min_announce_interval"` -} - var _ frontend.TrackerLogic = &Logic{} // NewLogic creates a new instance of a TrackerLogic that executes the provided // middleware hooks. -func NewLogic(cfg ResponseConfig, peerStore storage.Storage, preHooks, postHooks []Hook) *Logic { +func NewLogic(annInterval, minAnnInterval time.Duration, peerStore storage.Storage, preHooks, postHooks []Hook) *Logic { return &Logic{ - announceInterval: cfg.AnnounceInterval, - minAnnounceInterval: cfg.MinAnnounceInterval, + announceInterval: annInterval, + minAnnounceInterval: minAnnInterval, preHooks: append(preHooks, &responseHook{store: peerStore}), postHooks: append(postHooks, &swarmInteractionHook{store: peerStore}), } diff --git a/middleware/middleware.go b/middleware/middleware.go index a0f6995..238d8d7 100644 --- a/middleware/middleware.go +++ b/middleware/middleware.go @@ -6,87 +6,84 @@ import ( "errors" "sync" - "gopkg.in/yaml.v3" - + "github.com/sot-tech/mochi/pkg/conf" "github.com/sot-tech/mochi/storage" ) var ( driversM sync.RWMutex - drivers = make(map[string]Driver) + drivers = make(map[string]Builder) - // ErrDriverDoesNotExist is the error returned by NewMiddleware when a + // ErrBuilderDoesNotExist is the error returned by NewMiddleware when a // middleware driver with that name does not exist. - ErrDriverDoesNotExist = errors.New("middleware driver with that name does not exist") + ErrBuilderDoesNotExist = errors.New("middleware builder with that name does not exist") ) -// Driver is the interface used to initialize a new type of middleware. +// Builder is the interface used to initialize a new type of middleware. // -// The options parameter is YAML encoded bytes that should be unmarshalled into +// The `options` parameter is map of parameters that should be unmarshalled into // the hook's custom configuration. -type Driver interface { - NewHook(options []byte, storage storage.Storage) (Hook, error) -} +type Builder func(options conf.MapConfig, storage storage.Storage) (Hook, error) -// RegisterDriver makes a Driver available by the provided name. +// RegisterBuilder makes a Builder available by the provided name. // // If called twice with the same name, the name is blank, or if the provided -// Driver is nil, this function panics. -func RegisterDriver(name string, d Driver) { +// Builder is nil, this function panics. +func RegisterBuilder(name string, d Builder) { if name == "" { - panic("middleware: could not register a Driver with an empty name") + panic("middleware: could not register a Builder with an empty name") } if d == nil { - panic("middleware: could not register a nil Driver") + panic("middleware: could not register a nil Builder") } driversM.Lock() defer driversM.Unlock() if _, dup := drivers[name]; dup { - panic("middleware: RegisterDriver called twice for " + name) + panic("middleware: RegisterBuilder called twice for " + name) } drivers[name] = d } // New attempts to initialize a new middleware instance from the -// list of registered Drivers. +// list of registered Builders. // -// If a driver does not exist, returns ErrDriverDoesNotExist. -func New(name string, optionBytes []byte, storage storage.Storage) (Hook, error) { +// If a driver does not exist, returns ErrBuilderDoesNotExist. +func New(name string, options conf.MapConfig, storage storage.Storage) (Hook, error) { driversM.RLock() defer driversM.RUnlock() - var d Driver - d, ok := drivers[name] + var newHook Builder + newHook, ok := drivers[name] if !ok { - return nil, ErrDriverDoesNotExist + return nil, ErrBuilderDoesNotExist } - return d.NewHook(optionBytes, storage) + return newHook(options, storage) } // Config is the generic configuration format used for all registered Hooks. type Config struct { - Name string `yaml:"name"` - Options map[string]any `yaml:"options"` + Name string + Options conf.MapConfig } // HooksFromHookConfigs is a utility function for initializing Hooks in bulk. -func HooksFromHookConfigs(cfgs []Config, storage storage.Storage) (hooks []Hook, err error) { - for _, cfg := range cfgs { - // Marshal the options back into bytes. - var optionBytes []byte - optionBytes, err = yaml.Marshal(cfg.Options) - if err != nil { - return +// each element of configs must contain pairs `name` - string and `options` - map[string]any +func HooksFromHookConfigs(configs []conf.MapConfig, storage storage.Storage) (hooks []Hook, err error) { + for _, cfg := range configs { + var c Config + + if err = cfg.Unmarshal(&c); err != nil { + break } var h Hook - h, err = New(cfg.Name, optionBytes, storage) + h, err = New(c.Name, c.Options, storage) if err != nil { - return + break } hooks = append(hooks, h) diff --git a/middleware/torrentapproval/container/container.go b/middleware/torrentapproval/container/container.go index bd4cfdd..810742f 100644 --- a/middleware/torrentapproval/container/container.go +++ b/middleware/torrentapproval/container/container.go @@ -5,6 +5,7 @@ import ( "sync" "github.com/sot-tech/mochi/bittorrent" + "github.com/sot-tech/mochi/pkg/conf" "github.com/sot-tech/mochi/storage" ) @@ -12,7 +13,7 @@ import ( const DefaultStorageCtxName = "MW_APPROVAL" // Builder function that creates and configures specific container -type Builder func([]byte, storage.Storage) (Container, error) +type Builder func(conf.MapConfig, storage.Storage) (Container, error) var ( buildersMU sync.Mutex @@ -42,7 +43,7 @@ type Container interface { } // GetContainer creates Container by its name and provided confBytes -func GetContainer(name string, confBytes []byte, storage storage.Storage) (Container, error) { +func GetContainer(name string, config conf.MapConfig, storage storage.Storage) (Container, error) { buildersMU.Lock() defer buildersMU.Unlock() var err error @@ -50,7 +51,7 @@ func GetContainer(name string, confBytes []byte, storage storage.Storage) (Conta if builder, exist := builders[name]; !exist { err = ErrContainerDoesNotExist } else { - cn, err = builder(confBytes, storage) + cn, err = builder(config, storage) } return cn, err } diff --git a/middleware/torrentapproval/container/directory/directory.go b/middleware/torrentapproval/container/directory/directory.go index ff902aa..c97d92c 100644 --- a/middleware/torrentapproval/container/directory/directory.go +++ b/middleware/torrentapproval/container/directory/directory.go @@ -10,11 +10,11 @@ import ( "github.com/anacrolix/torrent/metainfo" "github.com/anacrolix/torrent/util/dirwatch" "github.com/minio/sha256-simd" - "gopkg.in/yaml.v3" "github.com/sot-tech/mochi/bittorrent" "github.com/sot-tech/mochi/middleware/torrentapproval/container" "github.com/sot-tech/mochi/middleware/torrentapproval/container/list" + "github.com/sot-tech/mochi/pkg/conf" "github.com/sot-tech/mochi/pkg/log" "github.com/sot-tech/mochi/pkg/stop" "github.com/sot-tech/mochi/storage" @@ -30,14 +30,14 @@ func init() { // Config - implementation of directory container configuration. // Extends list.Config because uses the same storage and Approved function. type Config struct { - list.Config `yaml:",inline"` + list.Config // Path in filesystem where torrent files stored and should be watched - Path string `yaml:"path"` + Path string } -func build(confBytes []byte, st storage.Storage) (container.Container, error) { +func build(conf conf.MapConfig, st storage.Storage) (container.Container, error) { c := new(Config) - if err := yaml.Unmarshal(confBytes, c); err != nil { + if err := conf.Unmarshal(c); err != nil { return nil, fmt.Errorf("unable to deserialise configuration: %w", err) } var err error diff --git a/middleware/torrentapproval/container/list/list.go b/middleware/torrentapproval/container/list/list.go index 8862d30..011ee98 100644 --- a/middleware/torrentapproval/container/list/list.go +++ b/middleware/torrentapproval/container/list/list.go @@ -5,10 +5,9 @@ package list import ( "fmt" - "gopkg.in/yaml.v3" - "github.com/sot-tech/mochi/bittorrent" "github.com/sot-tech/mochi/middleware/torrentapproval/container" + "github.com/sot-tech/mochi/pkg/conf" "github.com/sot-tech/mochi/pkg/log" "github.com/sot-tech/mochi/storage" ) @@ -23,20 +22,20 @@ func init() { // Config - implementation of list container configuration. type Config struct { // HashList static list of HEX-encoded InfoHashes. - HashList []string `yaml:"hash_list"` + HashList []string `cfg:"hash_list"` // If Invert set to true, all InfoHashes stored in HashList should be blacklisted. - Invert bool `yaml:"invert"` + Invert bool // StorageCtx is the name of storage context where to store hash list. // It might be table name, REDIS record key or something else, depending on storage. - StorageCtx string `yaml:"storage_ctx"` + StorageCtx string `cfg:"storage_ctx"` } // DUMMY used as value placeholder if storage needs some value with const DUMMY = "_" -func build(confBytes []byte, st storage.Storage) (container.Container, error) { +func build(conf conf.MapConfig, st storage.Storage) (container.Container, error) { c := new(Config) - if err := yaml.Unmarshal(confBytes, c); err != nil { + if err := conf.Unmarshal(c); err != nil { return nil, fmt.Errorf("unable to deserialise configuration: %w", err) } l := &List{ diff --git a/middleware/torrentapproval/torrentapproval.go b/middleware/torrentapproval/torrentapproval.go index ebec9ae..5d59aba 100644 --- a/middleware/torrentapproval/torrentapproval.go +++ b/middleware/torrentapproval/torrentapproval.go @@ -6,11 +6,10 @@ import ( "context" "fmt" - "gopkg.in/yaml.v3" - "github.com/sot-tech/mochi/bittorrent" "github.com/sot-tech/mochi/middleware" "github.com/sot-tech/mochi/middleware/torrentapproval/container" + "github.com/sot-tech/mochi/pkg/conf" // import directory watcher to enable appropriate support _ "github.com/sot-tech/mochi/middleware/torrentapproval/container/directory" @@ -25,40 +24,33 @@ import ( const Name = "torrent approval" func init() { - middleware.RegisterDriver(Name, driver{}) + middleware.RegisterBuilder(Name, build) } type baseConfig struct { // Source - name of container for initial values - Source string `yaml:"initial_source"` + Source string `cfg:"initial_source"` // Configuration depends on used container - Configuration map[string]any `yaml:"configuration"` + Configuration conf.MapConfig } -type driver struct{} - -func (d driver) NewHook(optionBytes []byte, storage storage.Storage) (middleware.Hook, error) { +func build(options conf.MapConfig, storage storage.Storage) (h middleware.Hook, err error) { var cfg baseConfig - err := yaml.Unmarshal(optionBytes, &cfg) - if err != nil { - return nil, fmt.Errorf("invalid options for middleware %s: %w", Name, err) + if err = options.Unmarshal(&cfg); err != nil { + return nil, fmt.Errorf("middleware %s: %w", Name, err) } if len(cfg.Source) == 0 { - return nil, fmt.Errorf("invalid options for middleware %s: name not provided", Name) + return nil, fmt.Errorf("invalid options for middleware %s: source not provided", Name) } if cfg.Configuration == nil { return nil, fmt.Errorf("invalid options for middleware %s: options not provided", Name) } - var confBytes []byte - var h *hook - if confBytes, err = yaml.Marshal(cfg.Configuration); err == nil { - var c container.Container - if c, err = container.GetContainer(cfg.Source, confBytes, storage); err == nil { - h = &hook{c} - } + var c container.Container + if c, err = container.GetContainer(cfg.Source, cfg.Configuration, storage); err == nil { + h = &hook{c} } return h, err } diff --git a/middleware/torrentapproval/torrentapproval_test.go b/middleware/torrentapproval/torrentapproval_test.go index e37871e..48c40fc 100644 --- a/middleware/torrentapproval/torrentapproval_test.go +++ b/middleware/torrentapproval/torrentapproval_test.go @@ -6,9 +6,9 @@ import ( "testing" "github.com/stretchr/testify/require" - "gopkg.in/yaml.v3" "github.com/sot-tech/mochi/bittorrent" + "github.com/sot-tech/mochi/pkg/conf" "github.com/sot-tech/mochi/storage/memory" ) @@ -71,10 +71,8 @@ func TestHandleAnnounce(t *testing.T) { require.Nil(t, err) for _, tt := range cases { t.Run(fmt.Sprintf("testing hash %s", tt.ih), func(t *testing.T) { - d := driver{} - cfg, err := yaml.Marshal(tt.cfg) - require.Nil(t, err) - h, err := d.NewHook(cfg, storage) + cfg := conf.MapConfig{"initial_source": tt.cfg.Source, "configuration": tt.cfg.Configuration} + h, err := build(cfg, storage) require.Nil(t, err) ctx := context.Background() diff --git a/middleware/varinterval/varinterval.go b/middleware/varinterval/varinterval.go index 3853c70..611c208 100644 --- a/middleware/varinterval/varinterval.go +++ b/middleware/varinterval/varinterval.go @@ -7,11 +7,10 @@ import ( "sync" "time" - "gopkg.in/yaml.v3" - "github.com/sot-tech/mochi/bittorrent" "github.com/sot-tech/mochi/middleware" "github.com/sot-tech/mochi/middleware/pkg/random" + "github.com/sot-tech/mochi/pkg/conf" "github.com/sot-tech/mochi/storage" ) @@ -19,43 +18,46 @@ import ( const Name = "interval variation" func init() { - middleware.RegisterDriver(Name, driver{}) + middleware.RegisterBuilder(Name, build) } -var _ middleware.Driver = driver{} - -type driver struct{} - -func (d driver) NewHook(optionBytes []byte, _ storage.Storage) (middleware.Hook, error) { +func build(options conf.MapConfig, _ storage.Storage) (h middleware.Hook, err error) { var cfg Config - err := yaml.Unmarshal(optionBytes, &cfg) - if err != nil { - return nil, fmt.Errorf("invalid options for middleware %s: %w", Name, err) - } - return NewHook(cfg) + if err = options.Unmarshal(&cfg); err != nil { + err = fmt.Errorf("middleware %s: %w", Name, err) + } else { + if err := checkConfig(cfg); err == nil { + h = &hook{ + cfg: cfg, + } + } + } + return } -// ErrInvalidModifyResponseProbability is returned for a config with an invalid -// ModifyResponseProbability. -var ErrInvalidModifyResponseProbability = errors.New("invalid modify_response_probability") +var ( + // ErrInvalidModifyResponseProbability is returned for a config with an invalid + // ModifyResponseProbability. + ErrInvalidModifyResponseProbability = errors.New("invalid modify_response_probability") -// ErrInvalidMaxIncreaseDelta is returned for a config with an invalid -// MaxIncreaseDelta. -var ErrInvalidMaxIncreaseDelta = errors.New("invalid max_increase_delta") + // ErrInvalidMaxIncreaseDelta is returned for a config with an invalid + // MaxIncreaseDelta. + ErrInvalidMaxIncreaseDelta = errors.New("invalid max_increase_delta") +) // Config represents the configuration for the varinterval middleware. type Config struct { // ModifyResponseProbability is the probability by which a response will // be modified. - ModifyResponseProbability float32 `yaml:"modify_response_probability"` + ModifyResponseProbability float32 `cfg:"modify_response_probability"` // MaxIncreaseDelta is the amount of seconds that will be added at most. - MaxIncreaseDelta int `yaml:"max_increase_delta"` + MaxIncreaseDelta int `cfg:"max_increase_delta"` // ModifyMinInterval specifies whether min_interval should be increased // as well. - ModifyMinInterval bool `yaml:"modify_min_interval"` + ModifyMinInterval bool `cfg:"modify_min_interval"` } func checkConfig(cfg Config) error { @@ -75,17 +77,6 @@ type hook struct { sync.Mutex } -// NewHook creates a middleware to randomly modify the announce interval from -// the given config. -func NewHook(cfg Config) (h middleware.Hook, err error) { - if err = checkConfig(cfg); err == nil { - h = &hook{ - cfg: cfg, - } - } - return -} - func (h *hook) HandleAnnounce(ctx context.Context, req *bittorrent.AnnounceRequest, resp *bittorrent.AnnounceResponse) (context.Context, error) { s0, s1 := random.DeriveEntropyFromRequest(req) // Generate a probability p < 1.0. diff --git a/middleware/varinterval/varinterval_test.go b/middleware/varinterval/varinterval_test.go index 2c27d2d..f8c298e 100644 --- a/middleware/varinterval/varinterval_test.go +++ b/middleware/varinterval/varinterval_test.go @@ -8,6 +8,7 @@ import ( "github.com/stretchr/testify/require" "github.com/sot-tech/mochi/bittorrent" + "github.com/sot-tech/mochi/pkg/conf" ) var configTests = []struct { @@ -45,7 +46,8 @@ func TestCheckConfig(t *testing.T) { } func TestHandleAnnounce(t *testing.T) { - h, err := NewHook(Config{1.0, 10, true}) + c := conf.MapConfig{"modify_response_probability": 1.0, "max_increase_delta": 10, "modify_min_interval": true} + h, err := build(c, nil) require.Nil(t, err) require.NotNil(t, h) diff --git a/pkg/conf/decoder.go b/pkg/conf/decoder.go new file mode 100644 index 0000000..10321b9 --- /dev/null +++ b/pkg/conf/decoder.go @@ -0,0 +1,54 @@ +// Package conf contains alias for map encoded configuration +// and structure unmarshaller +package conf + +import ( + "errors" + + "github.com/mitchellh/mapstructure" + + "github.com/sot-tech/mochi/pkg/log" +) + +// TagName is a tag name, used for decoder customization +const TagName = "cfg" + +// ErrNilConfigMap returned if unmarshalling map is nil and could not be +// decoded into structure +var ErrNilConfigMap = errors.New("unable to process nil map") + +// MapConfig is just alias for map[string]any +type MapConfig map[string]any + +// LogFields just returns this map as a set of Logrus fields. +func (m MapConfig) LogFields() log.Fields { + return log.Fields(m) +} + +// Unmarshal decodes receiver map into provided structure. +// Decoder configured to automatically unmarshal inherited structures, +// convert string-ed duration (1s, 2m, 3h...) into time.Duration and +// string representation IP into net.IP. +// Tag used for decode customization is conf.TagName. +func (m MapConfig) Unmarshal(into any) (err error) { + if m != nil { + if len(m) > 0 { + conf := &mapstructure.DecoderConfig{ + DecodeHook: mapstructure.ComposeDecodeHookFunc( + mapstructure.StringToTimeDurationHookFunc(), + mapstructure.StringToIPHookFunc(), + ), + Squash: true, + Result: into, + TagName: TagName, + } + var decoder *mapstructure.Decoder + if decoder, err = mapstructure.NewDecoder(conf); err == nil { + err = decoder.Decode(m) + } + } + } else { + err = ErrNilConfigMap + } + return +} diff --git a/storage/memory/storage.go b/storage/memory/storage.go index 42de117..af97fe3 100644 --- a/storage/memory/storage.go +++ b/storage/memory/storage.go @@ -12,9 +12,8 @@ import ( "sync" "time" - "gopkg.in/yaml.v3" - "github.com/sot-tech/mochi/bittorrent" + "github.com/sot-tech/mochi/pkg/conf" "github.com/sot-tech/mochi/pkg/log" "github.com/sot-tech/mochi/pkg/stop" "github.com/sot-tech/mochi/pkg/timecache" @@ -39,29 +38,20 @@ func init() { type driver struct{} -func (d driver) NewStorage(icfg any) (storage.Storage, error) { - // Marshal the config back into bytes. - bytes, err := yaml.Marshal(icfg) - if err != nil { - return nil, err - } - - // Unmarshal the bytes into the proper config type. +func (d driver) NewStorage(icfg conf.MapConfig) (storage.Storage, error) { var cfg Config - err = yaml.Unmarshal(bytes, &cfg) - if err != nil { + if err := icfg.Unmarshal(&cfg); err != nil { return nil, err } - return New(cfg) } // Config holds the configuration of a memory Storage. type Config struct { - GarbageCollectionInterval time.Duration `yaml:"gc_interval"` - PrometheusReportingInterval time.Duration `yaml:"prometheus_reporting_interval"` - PeerLifetime time.Duration `yaml:"peer_lifetime"` - ShardCount int `yaml:"shard_count"` + GarbageCollectionInterval time.Duration `cfg:"gc_interval"` + PrometheusReportingInterval time.Duration `cfg:"prometheus_reporting_interval"` + PeerLifetime time.Duration `cfg:"peer_lifetime"` + ShardCount int `cfg:"shard_count"` } // LogFields renders the current config as a set of Logrus fields. diff --git a/storage/redis/storage.go b/storage/redis/storage.go index 82aab65..ef84f1c 100644 --- a/storage/redis/storage.go +++ b/storage/redis/storage.go @@ -29,9 +29,9 @@ import ( "time" "github.com/go-redis/redis/v8" - "gopkg.in/yaml.v3" "github.com/sot-tech/mochi/bittorrent" + "github.com/sot-tech/mochi/pkg/conf" "github.com/sot-tech/mochi/pkg/log" "github.com/sot-tech/mochi/pkg/stop" "github.com/sot-tech/mochi/pkg/timecache" @@ -73,17 +73,11 @@ func init() { type driver struct{} -func (d driver) NewStorage(icfg any) (storage.Storage, error) { - // Marshal the config back into bytes. - bytes, err := yaml.Marshal(icfg) - if err != nil { - return nil, err - } - +func (d driver) NewStorage(icfg conf.MapConfig) (storage.Storage, error) { // Unmarshal the bytes into the proper config type. var cfg Config - err = yaml.Unmarshal(bytes, &cfg) - if err != nil { + + if err := icfg.Unmarshal(&cfg); err != nil { return nil, err } @@ -92,20 +86,20 @@ func (d driver) NewStorage(icfg any) (storage.Storage, error) { // Config holds the configuration of a redis Storage. type Config struct { - GarbageCollectionInterval time.Duration `yaml:"gc_interval"` - PrometheusReportingInterval time.Duration `yaml:"prometheus_reporting_interval"` - PeerLifetime time.Duration `yaml:"peer_lifetime"` - Addresses []string `yaml:"addresses"` - Login string `yaml:"login"` - Password string `yaml:"password"` - Sentinel bool `yaml:"sentinel"` - SentinelMaster string `yaml:"sentinel_master"` - Cluster bool `yaml:"cluster"` - DB int `yaml:"db"` - PoolSize int `yaml:"pool_size"` - ReadTimeout time.Duration `yaml:"read_timeout"` - WriteTimeout time.Duration `yaml:"write_timeout"` - ConnectTimeout time.Duration `yaml:"connect_timeout"` + GarbageCollectionInterval time.Duration `cfg:"gc_interval"` + PrometheusReportingInterval time.Duration `cfg:"prometheus_reporting_interval"` + PeerLifetime time.Duration `cfg:"peer_lifetime"` + Addresses []string + DB int + PoolSize int `cfg:"pool_size"` + Login string + Password string + Sentinel bool + SentinelMaster string `cfg:"sentinel_master"` + Cluster bool + ReadTimeout time.Duration `cfg:"read_timeout"` + WriteTimeout time.Duration `cfg:"write_timeout"` + ConnectTimeout time.Duration `cfg:"connect_timeout"` } // LogFields renders the current config as a set of Logrus fields. @@ -230,6 +224,7 @@ func connect(cfg Config) (*store, error) { SentinelAddrs: cfg.Addresses, SentinelUsername: cfg.Login, SentinelPassword: cfg.Password, + MasterName: cfg.SentinelMaster, DialTimeout: cfg.ConnectTimeout, ReadTimeout: cfg.ReadTimeout, WriteTimeout: cfg.WriteTimeout, @@ -411,10 +406,9 @@ func (ps *store) PutSeeder(ih bittorrent.InfoHash, peer bittorrent.Peer) error { ihSummaryKey, ihPeerKey, cntPeerKey = ih4Key, ih4SeederKey, cnt4SeederKey } ihPeerKey += ih.RawString() - now := ps.getClock() return ps.tx(func(tx redis.Pipeliner) (err error) { - if err = tx.HSet(ps.ctx, ihPeerKey, peer.RawString(), now).Err(); err != nil { + if err = tx.HSet(ps.ctx, ihPeerKey, peer.RawString(), ps.getClock()).Err(); err != nil { return } if err = ps.con.Incr(ps.ctx, cntPeerKey).Err(); err != nil { diff --git a/storage/redis/storage_test.go b/storage/redis/storage_test.go index 4724214..f8955b9 100644 --- a/storage/redis/storage_test.go +++ b/storage/redis/storage_test.go @@ -11,7 +11,7 @@ import ( "github.com/sot-tech/mochi/storage/test" ) -var conf = Config{ +var cfg = Config{ GarbageCollectionInterval: 10 * time.Minute, PrometheusReportingInterval: 10 * time.Minute, PeerLifetime: 30 * time.Minute, @@ -23,7 +23,7 @@ var conf = Config{ func createNew() s.Storage { var ps s.Storage var err error - ps, err = New(conf) + ps, err = New(cfg) if err != nil { fmt.Println("unable to create real Redis connection: ", err, " using simulator") var rs *miniredis.Miniredis @@ -31,8 +31,8 @@ func createNew() s.Storage { if err != nil { panic(err) } - conf.Addresses = []string{rs.Addr()} - ps, err = New(conf) + cfg.Addresses = []string{rs.Addr()} + ps, err = New(cfg) } if err != nil { panic(err) diff --git a/storage/storage.go b/storage/storage.go index a2b43c3..219bbe3 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -6,6 +6,7 @@ import ( "time" "github.com/sot-tech/mochi/bittorrent" + "github.com/sot-tech/mochi/pkg/conf" "github.com/sot-tech/mochi/pkg/log" "github.com/sot-tech/mochi/pkg/stop" ) @@ -23,7 +24,7 @@ type Entry struct { // Driver is the interface used to initialize a new type of Storage. type Driver interface { - NewStorage(cfg any) (Storage, error) + NewStorage(cfg conf.MapConfig) (Storage, error) } // ErrResourceDoesNotExist is the error returned by all delete methods and the @@ -167,7 +168,7 @@ func RegisterDriver(name string, d Driver) { // the list of registered Drivers. // // If a driver does not exist, returns ErrDriverDoesNotExist. -func NewStorage(name string, cfg any) (ps Storage, err error) { +func NewStorage(name string, cfg conf.MapConfig) (ps Storage, err error) { driversM.RLock() defer driversM.RUnlock()