diff --git a/cmd/mochi/config.go b/cmd/mochi/config.go index f665f5c..b32385e 100644 --- a/cmd/mochi/config.go +++ b/cmd/mochi/config.go @@ -9,7 +9,11 @@ import ( "github.com/sot-tech/mochi/pkg/conf" - // Imports to register middleware drivers. + // Imports to register frontends + _ "github.com/sot-tech/mochi/frontend/http" + _ "github.com/sot-tech/mochi/frontend/udp" + + // Imports to register middleware hooks. _ "github.com/sot-tech/mochi/middleware/clientapproval" _ "github.com/sot-tech/mochi/middleware/jwt" _ "github.com/sot-tech/mochi/middleware/torrentapproval" @@ -29,29 +33,20 @@ type Config struct { // We can make Conf extensible enough that you can program a new response // generator at the cost of making it possible for users to create config that // won't compose a functional tracker. - AnnounceInterval time.Duration `yaml:"announce_interval"` - MinAnnounceInterval time.Duration `yaml:"min_announce_interval"` - MetricsAddr string `yaml:"metrics_addr"` - HTTPConfig conf.MapConfig `yaml:"http"` - UDPConfig conf.MapConfig `yaml:"udp"` - Storage struct { - Name string `yaml:"name"` - Config conf.MapConfig `yaml:"config"` - } `yaml:"storage"` - PreHooks []conf.MapConfig `yaml:"prehooks"` - PostHooks []conf.MapConfig `yaml:"posthooks"` + AnnounceInterval time.Duration `yaml:"announce_interval"` + MinAnnounceInterval time.Duration `yaml:"min_announce_interval"` + MetricsAddr string `yaml:"metrics_addr"` + Frontends []conf.NamedMapConfig `yaml:"frontends"` + Storage conf.NamedMapConfig `yaml:"storage"` + PreHooks []conf.NamedMapConfig `yaml:"prehooks"` + PostHooks []conf.NamedMapConfig `yaml:"posthooks"` } -// ConfigFile represents a namespaced YAML configation file. -type ConfigFile struct { - Conf Config `yaml:"mochi"` -} - -// ParseConfigFile returns a new ConfigFile given the path to a YAML +// ParseConfigFile returns a new Config given the path to a YAML // configuration file. // // It supports relative and absolute paths and environment variables. -func ParseConfigFile(path string) (*ConfigFile, error) { +func ParseConfigFile(path string) (*Config, error) { if path == "" { return nil, errors.New("no config path specified") } @@ -59,7 +54,7 @@ func ParseConfigFile(path string) (*ConfigFile, error) { f, err := os.Open(os.ExpandEnv(path)) if err == nil { defer f.Close() - cfgFile := new(ConfigFile) + cfgFile := new(Config) err = yaml.NewDecoder(f).Decode(cfgFile) return cfgFile, err } diff --git a/cmd/mochi/server.go b/cmd/mochi/server.go index b2cc83b..d3dc243 100644 --- a/cmd/mochi/server.go +++ b/cmd/mochi/server.go @@ -4,8 +4,7 @@ import ( "errors" "fmt" - "github.com/sot-tech/mochi/frontend/http" - "github.com/sot-tech/mochi/frontend/udp" + "github.com/sot-tech/mochi/frontend" "github.com/sot-tech/mochi/middleware" "github.com/sot-tech/mochi/pkg/log" "github.com/sot-tech/mochi/pkg/metrics" @@ -24,11 +23,10 @@ type Server struct { // It is optional to provide an instance of the peer store to avoid the // creation of a new one. func (r *Server) Run(configFilePath string) error { - configFile, err := ParseConfigFile(configFilePath) + cfg, err := ParseConfigFile(configFilePath) if err != nil { return fmt.Errorf("failed to read config: %w", err) } - cfg := configFile.Conf r.sg = stop.NewGroup() @@ -39,51 +37,35 @@ func (r *Server) Run(configFilePath string) error { log.Info().Msg("metrics disabled because of empty address") } - log.Debug().Str("name", cfg.Storage.Name).Object("config", cfg.Storage.Config).Msg("starting storage") - r.storage, err = storage.NewStorage(cfg.Storage.Name, cfg.Storage.Config) + r.storage, err = storage.NewStorage(cfg.Storage) if err != nil { return fmt.Errorf("failed to create storage: %w", err) } - log.Info().Str("name", cfg.Storage.Name).Msg("started storage") preHooks, err := middleware.NewHooks(cfg.PreHooks, r.storage) if err != nil { - return fmt.Errorf("failed to validate hook config: %w", err) + return fmt.Errorf("failed to configure pre-hooks: %w", err) } postHooks, err := middleware.NewHooks(cfg.PostHooks, r.storage) if err != nil { - return fmt.Errorf("failed to validate hook config: %w", err) + return fmt.Errorf("failed to configure post-hooks: %w", err) } - r.logic = middleware.NewLogic(cfg.AnnounceInterval, cfg.MinAnnounceInterval, r.storage, preHooks, postHooks) - - var started bool - if len(cfg.HTTPConfig) > 0 { - log.Info().Object("config", cfg.HTTPConfig).Msg("starting HTTP frontend") - httpFE, err := http.NewFrontend(r.logic, cfg.HTTPConfig) - if err == nil { - r.sg.Add(httpFE) - started = true + if len(cfg.Frontends) > 0 { + var fs []frontend.Frontend + r.logic = middleware.NewLogic(cfg.AnnounceInterval, cfg.MinAnnounceInterval, r.storage, preHooks, postHooks) + if fs, err = frontend.NewFrontends(cfg.Frontends, r.logic); err == nil { + for _, f := range fs { + r.sg.Add(f) + } } else { - return err + err = fmt.Errorf("failed to configure frontends: %w", err) } + } else { + err = errors.New("no frontends configured") } - if len(cfg.UDPConfig) > 0 { - log.Info().Object("config", cfg.UDPConfig).Msg("starting UDP frontend") - udpFE, err := udp.NewFrontend(r.logic, cfg.UDPConfig) - if err == nil { - r.sg.Add(udpFE) - started = true - } else { - return err - } - } - if !started { - return errors.New("no frontends configured") - } - - return nil + return err } // Dispose shuts down an instance of Server. diff --git a/dist/example_config.yaml b/dist/example_config.yaml index ed059b6..f494e5b 100644 --- a/dist/example_config.yaml +++ b/dist/example_config.yaml @@ -1,303 +1,311 @@ # @formatter:off -mochi: - # The interval communicated with BitTorrent clients informing them how - # frequently they should announce in between client events. - announce_interval: 30m +# The interval communicated with BitTorrent clients informing them how +# frequently they should announce in between client events. +announce_interval: 30m - # The interval communicated with BitTorrent clients informing them of the - # minimal duration between announces. - min_announce_interval: 15m +# The interval communicated with BitTorrent clients informing them of the +# minimal duration between announces. +min_announce_interval: 15m - # The network interface that will bind to an HTTP endpoint that can be - # scraped by programs collecting metrics. - # - # /metrics serves metrics in the Prometheus format - # /debug/pprof/{cmdline,profile,symbol,trace} serves profiles in the pprof format - metrics_addr: "0.0.0.0:6880" +# The network interface that will bind to an HTTP endpoint that can be +# scraped by programs collecting metrics. +# +# /metrics serves metrics in the Prometheus format +# /debug/pprof/{cmdline,profile,symbol,trace} serves profiles in the pprof format +metrics_addr: "0.0.0.0:6880" +frontends: # This block defines configuration for the tracker's HTTP interface. # If you do not wish to run this, delete this section. - http: - # The network interface that will bind to an HTTP server for serving - # BitTorrent traffic. Remove this to disable the non-TLS listener. - addr: "0.0.0.0:6969" - - # The network interface that will bind to an HTTPS server for serving - # BitTorrent traffic. If set, tls_cert_path and tls_key_path are required. - https_addr: "" - - # The path to the required files to listen via HTTPS. - tls_cert_path: "" - tls_key_path: "" - # Enable SO_REUSEPORT to allow starting multiple mochi instances with the same HTTP(S) port. - reuse_port: true - - # The timeout durations for HTTP requests. - read_timeout: 5s - write_timeout: 5s - - # When true, persistent connections will be allowed. Generally this is not - # useful for a public tracker, but helps performance in some cases (use of - # a reverse proxy, or when there are few clients issuing many requests). - enable_keepalive: false - idle_timeout: 30s - - # Whether to time requests. - # Disabling this should increase performance/decrease load. - enable_request_timing: false - - # An array of routes to listen on for announce requests. This is an option - # to support trackers that do not listen for /announce or need to listen - # on multiple routes. - # - # This supports named parameters and catch-all parameters as described at - # https://github.com/julienschmidt/httprouter#named-parameters - announce_routes: - - "/announce" - # - "/announce.php" - - # An array of routes to listen on for scrape requests. This is an option - # to support trackers that do not listen for /scrape or need to listen - # on multiple routes. - # - # This supports named parameters and catch-all parameters as described at - # https://github.com/julienschmidt/httprouter#named-parameters - scrape_routes: - - "/scrape" - # - "/scrape.php" - - # An array of routes to listen ping requests. - # Used just to ensure if server is operational. Returns nothing, - # just HTTP 200 without body. Listens both GET and HEAD HTTP methods. - # HEAD method just checks http server, GET checks all hooks, - # which support ping - ping_routes: - - "/ping" - - # When not enabled, tracker will use only address from which client connected to tracker. - # When enabled, the IP address that clients advertise as their IP address will - # be appended as announce candidate. - allow_ip_spoofing: false - - # When enabled, IPs from private, local and loopback subnets will be ignored - filter_private_ips: false - - # The HTTP Header containing the IP address of the client. - # This is only necessary if using a reverse proxy. - real_ip_header: "x-real-ip" - - # The maximum number of peers returned for an individual request. - max_numwant: 100 - - # The default number of peers returned for an individual request. - default_numwant: 50 - - # The maximum number of infohashes that can be scraped in one request. - max_scrape_infohashes: 50 - - # This block defines configuration for the tracker's UDP interface. - # If you do not wish to run this, delete this section. - udp: - # The network interface that will bind to a UDP server for serving - # BitTorrent traffic. - addr: "0.0.0.0:6969" - - # Enable SO_REUSEPORT to allow starting multiple mochi instances with the same UDP port. - reuse_port: true - - # The leeway for a timestamp on a connection ID. - max_clock_skew: 10s - - # The key used to encrypt connection IDs. - private_key: "paste a random string here that will be used to hmac connection IDs" - - # Whether to time requests. - # Disabling this should increase performance/decrease load. - enable_request_timing: false - - # When not enabled, tracker will use only address from which client connected to tracker. - # When enabled, the IP address that clients advertise as their IP address will - # be appended as announce candidate. - allow_ip_spoofing: false - - # When enabled, IPs from private, local and loopback subnets will be ignored - filter_private_ips: false - - # The maximum number of peers returned for an individual request. - max_numwant: 100 - - # The default number of peers returned for an individual request. - default_numwant: 50 - - # The maximum number of infohashes that can be scraped in one request. - max_scrape_infohashes: 50 - - - # This block defines configuration used for the storage of peer data. - storage: - name: memory + - name: http config: - # The frequency which stale peers are removed. - # This balances between - # - collecting garbage more often, potentially using more CPU time, but potentially using less memory (lower value) - # - collecting garbage less frequently, saving CPU time, but keeping old peers long, thus using more memory (higher value). - gc_interval: 3m + # The network interface that will bind to an HTTP server for serving + # BitTorrent traffic. Remove this to disable the non-TLS listener. + addr: "0.0.0.0:6969" - # The amount of time until a peer is considered stale. - # To avoid churn, keep this slightly larger than `announce_interval` - peer_lifetime: 31m + # The network interface that will bind to an HTTPS server for serving + # BitTorrent traffic. If set, tls_cert_path and tls_key_path are required. + https_addr: "" - # The number of partitions data will be divided into in order to provide a - # higher degree of parallelism. - shard_count: 1024 + # The path to the required files to listen via HTTPS. + tls_cert_path: "" + tls_key_path: "" + # Enable SO_REUSEPORT to allow starting multiple mochi instances with the same HTTP(S) port. + reuse_port: true - # The interval at which metrics about the number of infohashes and peers - # are collected and posted to Prometheus. - prometheus_reporting_interval: 1s + # The timeout durations for HTTP requests. + read_timeout: 5s + write_timeout: 5s - # This block defines configuration used for redis storage. - #storage: - #name: redis - #config: - # The frequency which stale peers are removed. - # This balances between - # - collecting garbage more often, potentially using more CPU time, but potentially using less memory (lower value) - # - collecting garbage less frequently, saving CPU time, but keeping old peers long, thus using more memory (higher value). - #gc_interval: 3m + # When true, persistent connections will be allowed. Generally this is not + # useful for a public tracker, but helps performance in some cases (use of + # a reverse proxy, or when there are few clients issuing many requests). + enable_keepalive: false + idle_timeout: 30s - # The interval at which metrics about the number of infohashes and peers - # are collected and posted to Prometheus. - #prometheus_reporting_interval: 1s + # Whether to time requests. + # Disabling this should increase performance/decrease load. + enable_request_timing: false - # The amount of time until a peer is considered stale. - # To avoid churn, keep this slightly larger than `announce_interval` - #peer_lifetime: 31m + # An array of routes to listen on for announce requests. This is an option + # to support trackers that do not listen for /announce or need to listen + # on multiple routes. + # + # This supports named parameters and catch-all parameters as described at + # https://github.com/julienschmidt/httprouter#named-parameters + announce_routes: + - "/announce" + # - "/announce.php" - # The addresses of redis storage. - # If neither sentinel not cluster switched, - # only first address used - #addresses: ["127.0.0.1:6379"] + # An array of routes to listen on for scrape requests. This is an option + # to support trackers that do not listen for /scrape or need to listen + # on multiple routes. + # + # This supports named parameters and catch-all parameters as described at + # https://github.com/julienschmidt/httprouter#named-parameters + scrape_routes: + - "/scrape" + # - "/scrape.php" - # Database to be selected after connecting to the server. - #db: 0 + # An array of routes to listen ping requests. + # Used just to ensure if server is operational. Returns nothing, + # just HTTP 200 without body. Listens both GET and HEAD HTTP methods. + # HEAD method just checks http server, GET checks all hooks, + # which support ping + ping_routes: + - "/ping" - # Maximum number of socket connections, default is 10 per CPU - #pool_size: 10 + # When not enabled, tracker will use only address from which client connected to tracker. + # When enabled, the IP address that clients advertise as their IP address will + # be appended as announce candidate. + allow_ip_spoofing: false - # Use the specified login/username to authenticate the current connection - #login: "" + # When enabled, IPs from private, local and loopback subnets will be ignored + filter_private_ips: false - # Optional password - #password: "" + # The HTTP Header containing the IP address of the client. + # This is only necessary if using a reverse proxy. + real_ip_header: "x-real-ip" - # Connect to sentinel nodes - #sentinel: false + # The maximum number of peers returned for an individual request. + max_numwant: 100 - # The master name - #sentinel_master: "" + # The default number of peers returned for an individual request. + default_numwant: 50 - # Connect to the redis cluster - #cluster: false + # The maximum number of infohashes that can be scraped in one request. + max_scrape_infohashes: 50 - # The timeout for reading a command reply from redis. - #read_timeout: 15s + # This block defines configuration for the tracker's UDP interface. + # If you do not wish to run this, delete this section. + - name: udp + config: + # The network interface that will bind to a UDP server for serving + # BitTorrent traffic. + addr: "0.0.0.0:6969" - # The timeout for writing a command to redis. - #write_timeout: 15s + # Enable SO_REUSEPORT to allow starting multiple mochi instances with the same UDP port. + reuse_port: true - # Dial timeout for establishing new connections. - #connect_timeout: 15s + # The leeway for a timestamp on a connection ID. + max_clock_skew: 10s - # This block defines configuration used for PostgreSQL storage. - # example `mo_peers` table structure: - # - info_hash bytea - # - peer_id bytea - # - address inet or bytea - # - port int4 - # - is_seeder bool - # - is_v6 bool - # - created timestamp - #storage: - #name: pg - #config: - # connection string to pg storage. may be URL (postgres://...) or DSN (host=... port=...) - #connection_string: host=127.0.0.1 database=test user=postgres pool_max_conns=50 - # query and parameters for announce operation - #announce: - #query: SELECT peer_id, address, port FROM mo_peers WHERE info_hash=$1 AND is_seeder=$2 AND is_v6=$3 LIMIT $4 - #peer_id_column: peer_id - #address_column: address - #port_column: port - #peer: - # expected parameters: 1 - info hash (bytea), 2 - peer id (bytea), 3 - ip address (bytea/inet) - # 4 - port (int), 5 - is seeder (bool), 6 - is IPv6 (bool), 7 - create date and time (timestamp) - #add_query: INSERT INTO mo_peers VALUES($1, $2, $3, $4, $5, $6, $7) ON CONFLICT (info_hash, peer_id, address, port) DO UPDATE SET created = EXCLUDED.created, is_seeder = EXCLUDED.is_seeder - #del_query: DELETE FROM mo_peers WHERE info_hash=$1 AND peer_id=$2 AND address=$3 AND port=$4 AND is_seeder=$5 - #graduate_query: UPDATE mo_peers SET is_seeder=TRUE WHERE info_hash=$1 AND peer_id=$2 AND address=$3 AND port=$4 AND NOT is_seeder - #count_query: SELECT COUNT(1) FILTER (WHERE is_seeder) AS seeders, COUNT(1) FILTER (WHERE NOT is_seeder) AS leechers FROM mo_peers - # predicate part of `count_query` for get count of peers by info hash - #by_info_hash_clause: WHERE info_hash = $1 - #count_seeders_column: seeders - #count_leechers_column: leechers - # queries for KV-store - #data: - # expected parameters: 1 - context (varchar), 2 - name (bytea), 3 - value (bytea) - #add_query: INSERT INTO mo_kv VALUES($1, $2, $3) ON CONFLICT (context, name) DO NOTHING - #del_query: DELETE FROM mo_kv WHERE context=$1 AND name=$2 - #get_query: SELECT value FROM mo_kv WHERE context=$1 AND name=$2 - # query for check if database is alive - #ping_query: SELECT 1 - # query for garbage collection, expected parameter is timestamp - #gc_query: DELETE FROM mo_peers WHERE created <= $1 - # The amount of time until a peer is considered stale. - # To avoid churn, keep this slightly larger than `announce_interval` - #peer_lifetime: 31m - # The frequency which stale peers are removed. - #gc_interval: 3m - # query for info hash statistics - #info_hash_count_query: SELECT COUNT(DISTINCT info_hash) as info_hashes FROM mo_peers - # The interval at which metrics about the number of info hashes and peers - # are collected and posted to Prometheus. - #prometheus_reporting_interval: 1s + # The key used to encrypt connection IDs. + private_key: "paste a random string here that will be used to hmac connection IDs" - # This block defines configuration used for middleware executed before a - # response has been returned to a BitTorrent client. - prehooks: - # - name: jwt - # options: - # header: "authorization" - # issuer: "https://issuer.com" - # audience: "https://some.issuer.com" - # jwk_set_url: "https://issuer.com/keys" - # jwk_set_update_interval: 5m - # handle_announce: true - # handle_scrape: false - # - # - name: client approval - # options: - # whitelist: - # - "OP1011" - # blacklist: - # - "OP1012" - # - # - name: interval variation - # options: - # modify_response_probability: 0.2 - # max_increase_delta: 60 - # modify_min_interval: true - # - # This block defines configuration used for torrent approval, it requires to be given - # hashes for whitelist or for blacklist. Hashes are hexadecimal-encoaded. - # - name: torrent approval - # options: - # initial_source: list - # Save data provided by source in storage above - # preserve: false - # configuration: - # hash_list: - # - "a1b2c3d4e5a1b2c3d4e5a1b2c3d4e5a1b2c3d4e5" - # true - whitelist mode, false - blacklist - # invert: false - # Name of storage context where store hash list - # storage_ctx: APPROVED_HASH - posthooks: + # Whether to time requests. + # Disabling this should increase performance/decrease load. + enable_request_timing: false + + # When not enabled, tracker will use only address from which client connected to tracker. + # When enabled, the IP address that clients advertise as their IP address will + # be appended as announce candidate. + allow_ip_spoofing: false + + # When enabled, IPs from private, local and loopback subnets will be ignored + filter_private_ips: false + + # The maximum number of peers returned for an individual request. + max_numwant: 100 + + # The default number of peers returned for an individual request. + default_numwant: 50 + + # The maximum number of infohashes that can be scraped in one request. + max_scrape_infohashes: 50 + + +# This block defines configuration used for the storage of peer data. +storage: + name: memory + config: + # The frequency which stale peers are removed. + # This balances between + # - collecting garbage more often, potentially using more CPU time, but potentially using less memory (lower value) + # - collecting garbage less frequently, saving CPU time, but keeping old peers long, thus using more memory (higher value). + gc_interval: 3m + + # The amount of time until a peer is considered stale. + # To avoid churn, keep this slightly larger than `announce_interval` + peer_lifetime: 31m + + # The number of partitions data will be divided into in order to provide a + # higher degree of parallelism. + shard_count: 1024 + + # The interval at which metrics about the number of infohashes and peers + # are collected and posted to Prometheus. + prometheus_reporting_interval: 1s + +# This block defines configuration used for redis storage. +#storage: + #name: redis + #config: + # The frequency which stale peers are removed. + # This balances between + # - collecting garbage more often, potentially using more CPU time, but potentially using less memory (lower value) + # - collecting garbage less frequently, saving CPU time, but keeping old peers long, thus using more memory (higher value). + #gc_interval: 3m + + # The interval at which metrics about the number of infohashes and peers + # are collected and posted to Prometheus. + #prometheus_reporting_interval: 1s + + # The amount of time until a peer is considered stale. + # To avoid churn, keep this slightly larger than `announce_interval` + #peer_lifetime: 31m + + # The addresses of redis storage. + # If neither sentinel not cluster switched, + # only first address used + #addresses: ["127.0.0.1:6379"] + + # Database to be selected after connecting to the server. + #db: 0 + + # Maximum number of socket connections, default is 10 per CPU + #pool_size: 10 + + # Use the specified login/username to authenticate the current connection + #login: "" + + # Optional password + #password: "" + + # Connect to sentinel nodes + #sentinel: false + + # The master name + #sentinel_master: "" + + # Connect to the redis cluster + #cluster: false + + # The timeout for reading a command reply from redis. + #read_timeout: 15s + + # The timeout for writing a command to redis. + #write_timeout: 15s + + # Dial timeout for establishing new connections. + #connect_timeout: 15s + +# This block defines configuration used for PostgreSQL storage. +# example peers table structure: +# - info_hash bytea +# - peer_id bytea +# - address inet or bytea +# - port int4 +# - is_seeder bool +# - is_v6 bool +# - created timestamp +# example downloads table structure: +# - info_hash bytea +# - downloads int +#storage: + #name: pg + #config: + # connection string to pg storage. may be URL (postgres://...) or DSN (host=... port=...) + #connection_string: host=127.0.0.1 database=test user=postgres pool_max_conns=50 + # query and parameters for announce operation + #announce: + #query: SELECT peer_id, address, port FROM mo_peers WHERE info_hash=@info_hash AND is_seeder=@is_seeder AND is_v6=@is_v6 LIMIT @count + #peer_id_column: peer_id + #address_column: address + #port_column: port + # queries to get/increment 'snached' (downloaded) count + #downloads: + #get_query: SELECT downloads FROM mo_downloads where info_hash=@info_hash + #inc_query: INSERT INTO mo_downloads VALUES(@info_hash) ON CONFLICT(info_hash) DO UPDATE SET downloads = mo_downloads.downloads + 1 + # queries and parameters for add/delete/count peers operations + #peer: + #add_query: INSERT INTO mo_peers VALUES(@info_hash, @peer_id, @address, @port, @is_seeder, @is_v6, @created) ON CONFLICT (info_hash, peer_id, address, port) DO UPDATE SET created = EXCLUDED.created, is_seeder = EXCLUDED.is_seeder + #del_query: DELETE FROM mo_peers WHERE info_hash=@info_hash AND peer_id=@peer_id AND address=@address AND port=@port AND is_seeder=@is_seeder + #graduate_query: UPDATE mo_peers SET is_seeder=TRUE WHERE info_hash=@info_hash AND peer_id=peer_id AND address=@address AND port=@port AND NOT is_seeder + #count_query: SELECT COUNT(1) FILTER (WHERE is_seeder) AS seeders, COUNT(1) FILTER (WHERE NOT is_seeder) AS leechers FROM mo_peers + # predicate part of `count_query` to get count of peers by info hash + #by_info_hash_clause: WHERE info_hash = @info_hash + #count_seeders_column: seeders + #count_leechers_column: leechers + # queries for KV-store + #data: + #add_query: INSERT INTO mo_kv VALUES(@context, @key, @value) ON CONFLICT (context, name) DO NOTHING + # NOTE: in del_query @key parameter is array, NOT single value + #del_query: DELETE FROM mo_kv WHERE context=@context AND name = ANY(@key) + #get_query: SELECT value FROM mo_kv WHERE context=@context AND name=@key + # query for check if database is alive + #ping_query: SELECT 1 + # query for garbage collection, expected parameter is timestamp + #gc_query: DELETE FROM mo_peers WHERE created <= @created + # The amount of time until a peer is considered stale. + # To avoid churn, keep this slightly larger than `announce_interval` + #peer_lifetime: 31m + # The frequency which stale peers are removed. + #gc_interval: 3m + # query for info hash statistics + #info_hash_count_query: SELECT COUNT(DISTINCT info_hash) as info_hashes FROM mo_peers + # The interval at which metrics about the number of info hashes and peers + # are collected and posted to Prometheus. + #prometheus_reporting_interval: 1s + +# This block defines configuration used for middleware executed before a +# response has been returned to a BitTorrent client. +prehooks: +# - name: jwt +# config: +# header: "authorization" +# issuer: "https://issuer.com" +# audience: "https://some.issuer.com" +# jwk_set_url: "https://issuer.com/keys" +# jwk_set_update_interval: 5m +# handle_announce: true +# handle_scrape: false +# +# - name: client approval +# config: +# whitelist: +# - "OP1011" +# blacklist: +# - "OP1012" +# +# - name: interval variation +# config: +# modify_response_probability: 0.2 +# max_increase_delta: 60 +# modify_min_interval: true +# +# This block defines configuration used for torrent approval, it requires to be given +# hashes for whitelist or for blacklist. Hashes are hexadecimal-encoaded. +# - name: torrent approval +# config: +# initial_source: list +# Save data provided by source in storage above +# preserve: false +# configuration: +# hash_list: +# - "a1b2c3d4e5a1b2c3d4e5a1b2c3d4e5a1b2c3d4e5" +# true - whitelist mode, false - blacklist +# invert: false +# Name of storage context where store hash list +# storage_ctx: APPROVED_HASH +posthooks: diff --git a/frontend/frontend.go b/frontend/frontend.go index e1d5b82..c92dfde 100644 --- a/frontend/frontend.go +++ b/frontend/frontend.go @@ -3,34 +3,70 @@ package frontend import ( - "context" + "fmt" + "sync" - "github.com/sot-tech/mochi/bittorrent" + "github.com/sot-tech/mochi/middleware" + "github.com/sot-tech/mochi/pkg/conf" + "github.com/sot-tech/mochi/pkg/log" + "github.com/sot-tech/mochi/pkg/stop" ) -// TrackerLogic is the interface used by a frontend in order to: (1) generate a -// response from a parsed request, and (2) asynchronously observe anything -// after the response has been delivered to the client. -type TrackerLogic interface { - // HandleAnnounce generates a response for an Announce. - // - // Returns the updated context, the generated AnnounceResponse and no error - // on success; nil and error on failure. - HandleAnnounce(context.Context, *bittorrent.AnnounceRequest) (context.Context, *bittorrent.AnnounceResponse, error) +var ( + logger = log.NewLogger("frontend") + buildersMU sync.RWMutex + builders = make(map[string]Builder) +) - // AfterAnnounce does something with the results of an Announce after it - // has been completed. - AfterAnnounce(context.Context, *bittorrent.AnnounceRequest, *bittorrent.AnnounceResponse) +// Builder is the function used to initialize a new Frontend +// with provided configuration. +type Builder func(conf.MapConfig, *middleware.Logic) (Frontend, error) - // HandleScrape generates a response for a Scrape. - // - // Returns the updated context, the generated AnnounceResponse and no error - // on success; nil and error on failure. - HandleScrape(context.Context, *bittorrent.ScrapeRequest) (context.Context, *bittorrent.ScrapeResponse, error) +// RegisterBuilder makes a Builder available by the provided name. +// +// If called twice with the same name, the name is blank, or if the provided +// Builder is nil, this function panics. +func RegisterBuilder(name string, b Builder) { + if name == "" { + panic("frontend: could not register Builder with an empty name") + } + if b == nil { + panic("frontend: could not register a nil Builder") + } - // AfterScrape does something with the results of a Scrape after it has been completed. - AfterScrape(context.Context, *bittorrent.ScrapeRequest, *bittorrent.ScrapeResponse) + buildersMU.Lock() + defer buildersMU.Unlock() - // Ping executes checks if all hooks are operational - Ping(context.Context) error + if _, dup := builders[name]; dup { + panic("frontend: RegisterBuilder called twice for " + name) + } + + builders[name] = b +} + +type Frontend interface { + stop.Stopper +} + +// NewFrontends is a utility function for initializing Frontend-s in bulk. +// Returns nil hook and error if frontend with name provided in config +// does not exists. +func NewFrontends(configs []conf.NamedMapConfig, logic *middleware.Logic) (fs []Frontend, err error) { + buildersMU.RLock() + defer buildersMU.RUnlock() + for _, c := range configs { + logger.Debug().Object("frontend", c).Msg("starting frontend") + newFrontend, ok := builders[c.Name] + if !ok { + err = fmt.Errorf("hook with name '%s' does not exists", c.Name) + break + } + var f Frontend + if f, err = newFrontend(c.Config, logic); err != nil { + break + } + fs = append(fs, f) + logger.Info().Str("name", c.Name).Msg("frontend started") + } + return } diff --git a/frontend/http/frontend.go b/frontend/http/frontend.go index 3519ad6..e5a51b6 100644 --- a/frontend/http/frontend.go +++ b/frontend/http/frontend.go @@ -17,16 +17,22 @@ import ( "github.com/sot-tech/mochi/bittorrent" "github.com/sot-tech/mochi/frontend" + "github.com/sot-tech/mochi/middleware" "github.com/sot-tech/mochi/pkg/conf" "github.com/sot-tech/mochi/pkg/log" "github.com/sot-tech/mochi/pkg/metrics" "github.com/sot-tech/mochi/pkg/stop" ) -var logger = log.NewLogger("http frontend") +const Name = "http" -// Config represents all of the configurable options for an HTTP BitTorrent -// Frontend. +var logger = log.NewLogger(Name) + +func init() { + frontend.RegisterBuilder(Name, newFrontend) +} + +// Config represents all configurable options for an HTTP BitTorrent Frontend type Config struct { Addr string HTTPSAddr string `cfg:"https_addr"` @@ -94,28 +100,25 @@ func (cfg Config) Validate() Config { return validcfg } -// Frontend represents the state of an HTTP BitTorrent Frontend. -type Frontend struct { +type httpFE struct { srv *http.Server srvMu sync.Mutex tlsSrv *http.Server tlsSrvMu sync.Mutex tlsCfg *tls.Config - logic frontend.TrackerLogic + logic *middleware.Logic Config } -// NewFrontend creates a new instance of an HTTP Frontend that asynchronously -// serves requests. -func NewFrontend(logic frontend.TrackerLogic, c conf.MapConfig) (*Frontend, error) { +func newFrontend(c conf.MapConfig, logic *middleware.Logic) (frontend.Frontend, error) { var provided Config if err := c.Unmarshal(&provided); err != nil { return nil, err } cfg := provided.Validate() - f := &Frontend{ + f := &httpFE{ logic: logic, Config: cfg, srvMu: sync.Mutex{}, @@ -182,7 +185,7 @@ func NewFrontend(logic frontend.TrackerLogic, c conf.MapConfig) (*Frontend, erro } // Stop provides a thread-safe way to shut down a currently running Frontend. -func (f *Frontend) Stop() stop.Result { +func (f *httpFE) Stop() stop.Result { stopGroup := stop.NewGroup() f.srvMu.Lock() @@ -200,7 +203,7 @@ func (f *Frontend) Stop() stop.Result { return stopGroup.Stop() } -func (f *Frontend) makeStopFunc(stopSrv *http.Server) stop.Func { +func (f *httpFE) makeStopFunc(stopSrv *http.Server) stop.Func { return func() stop.Result { c := make(stop.Channel) go func() { @@ -212,7 +215,7 @@ func (f *Frontend) makeStopFunc(stopSrv *http.Server) stop.Func { // serveHTTP blocks while listening and serving non-TLS HTTP BitTorrent // requests until Stop() is called or an error is returned. -func (f *Frontend) serveHTTP(handler http.Handler, tls bool) error { +func (f *httpFE) serveHTTP(handler http.Handler, tls bool) error { srv := &http.Server{ Handler: handler, ReadTimeout: f.ReadTimeout, @@ -277,7 +280,7 @@ func injectRouteParamsToContext(ctx context.Context, ps httprouter.Params) conte } // announceRoute parses and responds to an Announce. -func (f *Frontend) announceRoute(w http.ResponseWriter, r *http.Request, ps httprouter.Params) { +func (f *httpFE) announceRoute(w http.ResponseWriter, r *http.Request, ps httprouter.Params) { var err error var start time.Time var addr netip.Addr @@ -314,7 +317,7 @@ func (f *Frontend) announceRoute(w http.ResponseWriter, r *http.Request, ps http } // scrapeRoute parses and responds to a Scrape. -func (f *Frontend) scrapeRoute(w http.ResponseWriter, r *http.Request, ps httprouter.Params) { +func (f *httpFE) scrapeRoute(w http.ResponseWriter, r *http.Request, ps httprouter.Params) { var err error var start time.Time var addr netip.Addr @@ -349,7 +352,7 @@ func (f *Frontend) scrapeRoute(w http.ResponseWriter, r *http.Request, ps httpro go f.logic.AfterScrape(ctx, req, resp) } -func (f *Frontend) ping(w http.ResponseWriter, r *http.Request, _ httprouter.Params) { +func (f *httpFE) ping(w http.ResponseWriter, r *http.Request, _ httprouter.Params) { var err error if r.Method == http.MethodGet { err = f.logic.Ping(context.TODO()) diff --git a/frontend/options.go b/frontend/options.go index d8941ca..c77f84c 100644 --- a/frontend/options.go +++ b/frontend/options.go @@ -1,9 +1,5 @@ package frontend -import "github.com/sot-tech/mochi/pkg/log" - -var logger = log.NewLogger("frontend configurator") - // ParseOptions is the configuration used to parse an Announce Request. // // If AllowIPSpoofing is true, IPs provided via params will be used. diff --git a/frontend/udp/frontend.go b/frontend/udp/frontend.go index 7b21839..c7b1ec8 100644 --- a/frontend/udp/frontend.go +++ b/frontend/udp/frontend.go @@ -18,6 +18,7 @@ import ( "github.com/sot-tech/mochi/bittorrent" "github.com/sot-tech/mochi/frontend" "github.com/sot-tech/mochi/frontend/udp/bytepool" + "github.com/sot-tech/mochi/middleware" "github.com/sot-tech/mochi/pkg/conf" "github.com/sot-tech/mochi/pkg/log" "github.com/sot-tech/mochi/pkg/metrics" @@ -25,12 +26,18 @@ import ( "github.com/sot-tech/mochi/pkg/timecache" ) +const Name = "http" + var ( - logger = log.NewLogger("udp frontend") + logger = log.NewLogger(Name) allowedGeneratedPrivateKeyRunes = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890") errUnexpectedConnType = errors.New("unexpected connection type (not UDPConn)") ) +func init() { + frontend.RegisterBuilder(Name, newFrontend) +} + // Config represents all of the configurable options for a UDP BitTorrent // Tracker. type Config struct { @@ -69,28 +76,26 @@ func (cfg Config) Validate() Config { return validcfg } -// Frontend holds the state of a UDP BitTorrent Frontend. -type Frontend struct { +// udpFE holds the state of a UDP BitTorrent Frontend. +type udpFE struct { socket *net.UDPConn closing chan struct{} wg sync.WaitGroup genPool *sync.Pool - logic frontend.TrackerLogic + logic *middleware.Logic Config } -// NewFrontend creates a new instance of an UDP Frontend that asynchronously -// serves requests. -func NewFrontend(logic frontend.TrackerLogic, c conf.MapConfig) (*Frontend, error) { +func newFrontend(c conf.MapConfig, logic *middleware.Logic) (frontend.Frontend, error) { var provided Config if err := c.Unmarshal(&provided); err != nil { return nil, err } cfg := provided.Validate() - f := &Frontend{ + f := &udpFE{ closing: make(chan struct{}), logic: logic, Config: cfg, @@ -116,7 +121,7 @@ func NewFrontend(logic frontend.TrackerLogic, c conf.MapConfig) (*Frontend, erro } // Stop provides a thread-safe way to shut down a currently running Frontend. -func (t *Frontend) Stop() stop.Result { +func (t *udpFE) Stop() stop.Result { select { case <-t.closing: return stop.AlreadyStopped @@ -135,7 +140,7 @@ func (t *Frontend) Stop() stop.Result { } // listen resolves the address and binds the server socket. -func (t *Frontend) listen() (err error) { +func (t *udpFE) listen() (err error) { if t.ReusePort { var ln net.PacketConn if ln, err = reuseport.ListenPacket("udp", t.Addr); err == nil { @@ -156,12 +161,12 @@ func (t *Frontend) listen() (err error) { // serve blocks while listening and serving UDP BitTorrent requests // until Stop() is called or an error is returned. -func (t *Frontend) serve() error { +func (t *udpFE) serve() error { pool := bytepool.New(2048) defer t.wg.Done() for { - // Check to see if we need to shutdown. + // Check to see if we need shutdown. select { case <-t.closing: log.Debug().Msg("serve received shutdown signal") @@ -229,7 +234,7 @@ func (w ResponseWriter) Write(b []byte) (int, error) { } // handleRequest parses and responds to a UDP Request. -func (t *Frontend) handleRequest(r Request, w ResponseWriter) (actionName string, err error) { +func (t *udpFE) handleRequest(r Request, w ResponseWriter) (actionName string, err error) { if len(r.Packet) < 16 { // Malformed, no client packets are less than 16 bytes. // We explicitly return nothing in case this is a DoS attempt. diff --git a/frontend/udp/frontend_test.go b/frontend/udp/frontend_test.go index 86b8c6f..c87cee8 100644 --- a/frontend/udp/frontend_test.go +++ b/frontend/udp/frontend_test.go @@ -22,7 +22,7 @@ func TestStartStopRaceIssue437(t *testing.T) { t.Fatal(err) } lgc := middleware.NewLogic(0, 0, ps, nil, nil) - fe, err := udp.NewFrontend(lgc, conf.MapConfig{"addr": "127.0.0.1:0"}) + fe, err := udp.newFrontend(conf.MapConfig{"addr": "127.0.0.1:0"}, lgc) if err != nil { t.Fatal(err) } diff --git a/go.mod b/go.mod index 437ea2a..60f7565 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/sot-tech/mochi go 1.18 require ( - code.cloudfoundry.org/go-diodes v0.0.0-20220927211948-2985a72b297c + code.cloudfoundry.org/go-diodes v0.0.0-20221017175818-728392b37655 github.com/MicahParks/keyfunc v1.5.1 github.com/anacrolix/torrent v1.47.0 github.com/go-redis/redis/v8 v8.11.5 @@ -20,7 +20,7 @@ require ( ) require ( - github.com/anacrolix/dht/v2 v2.19.0 // indirect + github.com/anacrolix/dht/v2 v2.19.1 // indirect github.com/anacrolix/log v0.13.2-0.20220711050817-613cb738ef30 // indirect github.com/anacrolix/missinggo v1.3.0 // indirect github.com/anacrolix/missinggo/v2 v2.7.1 // indirect @@ -40,11 +40,11 @@ require ( github.com/mattn/go-isatty v0.0.16 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.2 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect - github.com/prometheus/client_model v0.2.0 // indirect + github.com/prometheus/client_model v0.3.0 // indirect github.com/prometheus/common v0.37.0 // indirect github.com/prometheus/procfs v0.8.0 // indirect - golang.org/x/crypto v0.0.0-20221012134737-56aed061732a // indirect - golang.org/x/sys v0.0.0-20221013171732-95e765b1cc43 // indirect - golang.org/x/text v0.3.8 // indirect + golang.org/x/crypto v0.1.0 // indirect + golang.org/x/sys v0.1.0 // indirect + golang.org/x/text v0.4.0 // indirect google.golang.org/protobuf v1.28.1 // indirect ) diff --git a/go.sum b/go.sum index 089ed53..1b09602 100644 --- a/go.sum +++ b/go.sum @@ -30,8 +30,8 @@ cloud.google.com/go/storage v1.5.0/go.mod h1:tpKbwo567HUNpVclU5sGELwQWBDZ8gh0Zeo cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohlUTyfDhBk= cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs= cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0= -code.cloudfoundry.org/go-diodes v0.0.0-20220927211948-2985a72b297c h1:6Gb9bHc7g8thQ4AWxwdA6qxhtDNcqNw2T1+Z+w37kOw= -code.cloudfoundry.org/go-diodes v0.0.0-20220927211948-2985a72b297c/go.mod h1:v30sK4Ipg4Ng0pRG22iLfeMRfYUHQNee9f4NkE5t1yc= +code.cloudfoundry.org/go-diodes v0.0.0-20221017175818-728392b37655 h1:3IlYr3dIyXkemR1GoOITnBWnWBHG27wnC6mpV9OLD7c= +code.cloudfoundry.org/go-diodes v0.0.0-20221017175818-728392b37655/go.mod h1:T+qZrTfw8xwdJgVRD5pSvVyx4CeqsP30YQooBYFADRc= crawshaw.io/iox v0.0.0-20181124134642-c51c3df30797/go.mod h1:sXBiorCo8c46JlQV3oXPKINnZ8mcqnye1EkVkqsectk= crawshaw.io/sqlite v0.3.2/go.mod h1:igAO5JulrQ1DbdZdtVq48mnZUBAPOeFzer7VhDWNtW4= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= @@ -49,8 +49,8 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuy github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= -github.com/anacrolix/dht/v2 v2.19.0 h1:A9oMHWRGbLmCyx1JlYzg79bDrur8V60+0ts8ZwEVYt4= -github.com/anacrolix/dht/v2 v2.19.0/go.mod h1:0h83KnnAQ2AUYhpQ/CkoZP45K41pjDAlPR9zGHgFjQE= +github.com/anacrolix/dht/v2 v2.19.1 h1:V/UUGBASGYqYkSnmHJwX8uQmzkyhbgwE6jqcHKnNTD8= +github.com/anacrolix/dht/v2 v2.19.1/go.mod h1:3TU93c1s/oA8I/VH4m3CNP/BeKsiOGmo6HwfZBMTKUs= github.com/anacrolix/envpprof v0.0.0-20180404065416-323002cec2fa/go.mod h1:KgHhUaQMc8cC0+cEflSgCFNFbKwi5h54gqtVn8yhP7c= github.com/anacrolix/envpprof v1.0.0/go.mod h1:KgHhUaQMc8cC0+cEflSgCFNFbKwi5h54gqtVn8yhP7c= github.com/anacrolix/envpprof v1.1.0/go.mod h1:My7T5oSqVfEn4MD4Meczkw/f5lSIndGAKu/0SM/rkf4= @@ -275,7 +275,7 @@ github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+W github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= -github.com/onsi/gomega v1.20.2 h1:8uQq0zMgLEfa0vRrrBgaJF2gyW9Da9BmfGV+OyUzfkY= +github.com/onsi/gomega v1.22.1 h1:pY8O4lBfsHKZHM/6nrxkhVPUznOlIu3quZcKP/M20KI= github.com/openzipkin/zipkin-go v0.1.6/go.mod h1:QgAqvLzwWbR/WpD4A3cGpPtJrZXNIiJc5AZX7/PBEpw= github.com/philhofer/fwd v1.0.0/go.mod h1:gk3iGcWd9+svBvR0sR+KPcfE+RNWozjowpeBVG3ZVNU= github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= @@ -298,8 +298,9 @@ github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1: github.com/prometheus/client_model v0.0.0-20190115171406-56726106282f/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= -github.com/prometheus/client_model v0.2.0 h1:uq5h0d+GuxiXLJLNABMgp2qUWDPiLvgCzz2dUR+/W/M= github.com/prometheus/client_model v0.2.0/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= +github.com/prometheus/client_model v0.3.0 h1:UBgGFHqYdG/TPFD1B1ogZywDqEkwp3fBMvqdiQ7Xew4= +github.com/prometheus/client_model v0.3.0/go.mod h1:LDGWKZIo7rky3hgvBe+caln+Dr3dPggB5dvjtD7w9+w= github.com/prometheus/common v0.2.0/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4= github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4= github.com/prometheus/common v0.9.1/go.mod h1:yhUN8i9wzaXS3w1O07YhxHEBxD+W35wd8bs7vj7HSQ4= @@ -367,8 +368,8 @@ golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/crypto v0.0.0-20221012134737-56aed061732a h1:NmSIgad6KjE6VvHciPZuNRTKxGhlPfD6OA87W/PLkqg= -golang.org/x/crypto v0.0.0-20221012134737-56aed061732a/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= +golang.org/x/crypto v0.1.0 h1:MDRAIl0xIo9Io2xV565hzXHw3zVseKrJKodhohM5CjU= +golang.org/x/crypto v0.1.0/go.mod h1:RecgLatLF4+eUMCP1PoPZQb+cVrJcOPbHkTkbkB9sbw= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8= @@ -432,7 +433,7 @@ golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81R golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= -golang.org/x/net v0.0.0-20220722155237-a158d28d115b h1:PxfKdU9lEEDYjdIzOtC4qFWgkU2rGHdKlKowJSMN9h0= +golang.org/x/net v0.1.0 h1:hZ/3BUoy5aId7sCpA/Tc5lt8DkFgdVS2onTpJsZ/fl0= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -495,8 +496,8 @@ golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220704084225-05e143d24a9e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20221013171732-95e765b1cc43 h1:OK7RB6t2WQX54srQQYSXMW8dF5C6/8+oA/s5QBmmto4= -golang.org/x/sys v0.0.0-20221013171732-95e765b1cc43/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.1.0 h1:kunALQeHf1/185U1i0GOB/fy1IPRDDpuoOOqRReG57U= +golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -506,8 +507,8 @@ golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= -golang.org/x/text v0.3.8 h1:nAL+RVCQ9uMn3vJZbV+MRnydTJFPf8qqY42YiA6MrqY= -golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= +golang.org/x/text v0.4.0 h1:BrVqGRd7+k1DiOgtnFvAkoQEWQvBc25ouMJM6429SFg= +golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= diff --git a/middleware/clientapproval/clientapproval.go b/middleware/clientapproval/clientapproval.go index 644e8ed..8c272dd 100644 --- a/middleware/clientapproval/clientapproval.go +++ b/middleware/clientapproval/clientapproval.go @@ -39,10 +39,10 @@ type hook struct { unapproved map[ClientID]struct{} } -func build(options conf.MapConfig, _ storage.PeerStorage) (middleware.Hook, error) { +func build(config conf.MapConfig, _ storage.PeerStorage) (middleware.Hook, error) { var cfg Config - if err := options.Unmarshal(&cfg); err != nil { + if err := config.Unmarshal(&cfg); err != nil { return nil, fmt.Errorf("middleware %s: %w", Name, err) } diff --git a/middleware/hooks.go b/middleware/hooks.go index 2b1d77c..444969b 100644 --- a/middleware/hooks.go +++ b/middleware/hooks.go @@ -19,7 +19,7 @@ type Hook interface { } // Pinger is an optional interface that may be implemented by a pre Hook -// to check if it is operational. Used in frontend.TrackerLogic. +// to check if it is operational. Used in frontend.Logic. // // It may be useful in cases when Hook performs foreign requests to // some external resources (i.e. storage) and `ping` request should diff --git a/middleware/jwt/jwt.go b/middleware/jwt/jwt.go index fe57633..32397b5 100644 --- a/middleware/jwt/jwt.go +++ b/middleware/jwt/jwt.go @@ -70,12 +70,10 @@ type hook struct { jwks *keyfunc.JWKS } -func build(options conf.MapConfig, _ storage.PeerStorage) (h middleware.Hook, err error) { +func build(config conf.MapConfig, _ storage.PeerStorage) (h middleware.Hook, err error) { var cfg Config - logger.Debug().Object("options", options).Msg("creating new JWT middleware") - - if err = options.Unmarshal(&cfg); err != nil { + if err = config.Unmarshal(&cfg); err != nil { return nil, fmt.Errorf("middleware %s: %w", Name, err) } diff --git a/middleware/logic.go b/middleware/logic.go index 93320de..b876250 100644 --- a/middleware/logic.go +++ b/middleware/logic.go @@ -5,18 +5,22 @@ import ( "time" "github.com/sot-tech/mochi/bittorrent" - "github.com/sot-tech/mochi/frontend" - "github.com/sot-tech/mochi/pkg/log" "github.com/sot-tech/mochi/pkg/stop" "github.com/sot-tech/mochi/storage" ) -var ( - logger = log.NewLogger("middleware") - _ frontend.TrackerLogic = &Logic{} -) +// Logic used by a frontend in order to: (1) generate a +// response from a parsed request, and (2) asynchronously observe anything +// after the response has been delivered to the client. +type Logic struct { + announceInterval time.Duration + minAnnounceInterval time.Duration + preHooks []Hook + postHooks []Hook + pingers []Pinger +} -// NewLogic creates a new instance of a TrackerLogic that executes the provided +// NewLogic creates a new instance of a Logic that executes the provided // middleware hooks. func NewLogic(annInterval, minAnnInterval time.Duration, peerStore storage.PeerStorage, preHooks, postHooks []Hook) *Logic { l := &Logic{ @@ -34,17 +38,10 @@ func NewLogic(annInterval, minAnnInterval time.Duration, peerStore storage.PeerS return l } -// Logic is an implementation of the TrackerLogic that functions by -// executing a series of middleware hooks. -type Logic struct { - announceInterval time.Duration - minAnnounceInterval time.Duration - preHooks []Hook - postHooks []Hook - pingers []Pinger -} - // HandleAnnounce generates a response for an Announce. +// +// Returns the updated context, the generated AnnounceResponse and no error +// on success; nil and error on failure. func (l *Logic) HandleAnnounce(ctx context.Context, req *bittorrent.AnnounceRequest) (_ context.Context, resp *bittorrent.AnnounceResponse, err error) { logger.Debug().Object("request", req).Msg("new announce request") resp = &bittorrent.AnnounceResponse{ @@ -62,8 +59,8 @@ func (l *Logic) HandleAnnounce(ctx context.Context, req *bittorrent.AnnounceRequ return ctx, resp, nil } -// AfterAnnounce does something with the results of an Announce after it has -// been completed. +// AfterAnnounce does something with the results of an Announce after it +// has been completed. func (l *Logic) AfterAnnounce(ctx context.Context, req *bittorrent.AnnounceRequest, resp *bittorrent.AnnounceResponse) { var err error for _, h := range l.postHooks { @@ -78,6 +75,9 @@ func (l *Logic) AfterAnnounce(ctx context.Context, req *bittorrent.AnnounceReque } // HandleScrape generates a response for a Scrape. +// +// Returns the updated context, the generated AnnounceResponse and no error +// on success; nil and error on failure. func (l *Logic) HandleScrape(ctx context.Context, req *bittorrent.ScrapeRequest) (_ context.Context, resp *bittorrent.ScrapeResponse, err error) { logger.Debug().Object("request", req).Msg("new scrape request") resp = &bittorrent.ScrapeResponse{ @@ -93,8 +93,7 @@ func (l *Logic) HandleScrape(ctx context.Context, req *bittorrent.ScrapeRequest) return ctx, resp, nil } -// AfterScrape does something with the results of a Scrape after it has been -// completed. +// AfterScrape does something with the results of a Scrape after it has been completed. func (l *Logic) AfterScrape(ctx context.Context, req *bittorrent.ScrapeRequest, resp *bittorrent.ScrapeResponse) { var err error for _, h := range l.postHooks { @@ -109,7 +108,7 @@ func (l *Logic) AfterScrape(ctx context.Context, req *bittorrent.ScrapeRequest, } } -// Ping performs check if all Hook-s are operational +// Ping executes checks if all Hook-s are operational func (l *Logic) Ping(ctx context.Context) (err error) { for _, p := range l.pingers { if err = p.Ping(ctx); err != nil { diff --git a/middleware/middleware.go b/middleware/middleware.go index bb8bf05..d25fb30 100644 --- a/middleware/middleware.go +++ b/middleware/middleware.go @@ -1,92 +1,65 @@ -// Package middleware implements the TrackerLogic interface by executing +// Package middleware implements the Logic interface by executing // a series of middleware hooks. package middleware import ( - "errors" + "fmt" "sync" "github.com/sot-tech/mochi/pkg/conf" + "github.com/sot-tech/mochi/pkg/log" "github.com/sot-tech/mochi/storage" ) var ( - driversM sync.RWMutex - drivers = make(map[string]Builder) - - // ErrBuilderDoesNotExist is the error returned by NewMiddleware when a - // middleware driver with that name does not exist. - ErrBuilderDoesNotExist = errors.New("middleware builder with that name does not exist") + logger = log.NewLogger("middleware") + buildersMU sync.RWMutex + builders = make(map[string]Builder) ) -// Builder is the interface used to initialize a new type of middleware. -// -// The `options` parameter is map of parameters that should be unmarshalled into -// the hook's custom configuration. -type Builder func(options conf.MapConfig, storage storage.PeerStorage) (Hook, error) +// Builder is the function used to initialize a new Hook +// with provided configuration. +type Builder func(conf.MapConfig, storage.PeerStorage) (Hook, error) // RegisterBuilder makes a Builder available by the provided name. // // If called twice with the same name, the name is blank, or if the provided // Builder is nil, this function panics. -func RegisterBuilder(name string, d Builder) { +func RegisterBuilder(name string, b Builder) { if name == "" { - panic("middleware: could not register a Builder with an empty name") + panic("middleware: could not register Builder with an empty name") } - if d == nil { + if b == nil { panic("middleware: could not register a nil Builder") } - driversM.Lock() - defer driversM.Unlock() + buildersMU.Lock() + defer buildersMU.Unlock() - if _, dup := drivers[name]; dup { + if _, dup := builders[name]; dup { panic("middleware: RegisterBuilder called twice for " + name) } - drivers[name] = d -} - -// NewHook attempts to initialize a new middleware instance from the -// list of registered Builders. -// -// If a driver does not exist, returns ErrBuilderDoesNotExist. -func NewHook(name string, options conf.MapConfig, storage storage.PeerStorage) (Hook, error) { - driversM.RLock() - defer driversM.RUnlock() - - var newHook Builder - newHook, ok := drivers[name] - if !ok { - return nil, ErrBuilderDoesNotExist - } - - return newHook(options, storage) -} - -// Config is the generic configuration format used for all registered Hooks. -type Config struct { - Name string - Options conf.MapConfig + builders[name] = b } // NewHooks is a utility function for initializing Hooks in bulk. -// each element of configs must contain pairs `name` - string and `options` - map[string]any -func NewHooks(configs []conf.MapConfig, storage storage.PeerStorage) (hooks []Hook, err error) { - for _, cfg := range configs { - var c Config - - if err = cfg.Unmarshal(&c); err != nil { +func NewHooks(configs []conf.NamedMapConfig, storage storage.PeerStorage) (hooks []Hook, err error) { + buildersMU.RLock() + defer buildersMU.RUnlock() + for _, c := range configs { + logger.Debug().Object("hook", c).Msg("starting hook") + newHook, ok := builders[c.Name] + if !ok { + err = fmt.Errorf("hook with name '%s' does not exists", c.Name) break } - var h Hook - h, err = NewHook(c.Name, c.Options, storage) - if err != nil { + if h, err = newHook(c.Config, storage); err != nil { break } - hooks = append(hooks, h) + logger.Info().Str("name", c.Name).Msg("hook started") } return diff --git a/middleware/torrentapproval/torrentapproval.go b/middleware/torrentapproval/torrentapproval.go index 2bed5f5..fab9f8a 100644 --- a/middleware/torrentapproval/torrentapproval.go +++ b/middleware/torrentapproval/torrentapproval.go @@ -38,18 +38,18 @@ type baseConfig struct { Configuration conf.MapConfig } -func build(options conf.MapConfig, st storage.PeerStorage) (h middleware.Hook, err error) { +func build(config conf.MapConfig, st storage.PeerStorage) (h middleware.Hook, err error) { var cfg baseConfig - if err = options.Unmarshal(&cfg); err != nil { + if err = config.Unmarshal(&cfg); err != nil { return nil, fmt.Errorf("middleware %s: %w", Name, err) } if len(cfg.Source) == 0 { - return nil, fmt.Errorf("invalid options for middleware %s: source not provided", Name) + return nil, fmt.Errorf("invalid config for middleware %s: source not provided", Name) } if cfg.Configuration == nil { - return nil, fmt.Errorf("invalid options for middleware %s: options not provided", Name) + return nil, fmt.Errorf("invalid config for middleware %s: config not provided", Name) } var ds storage.DataStorage = st diff --git a/middleware/varinterval/varinterval.go b/middleware/varinterval/varinterval.go index c677c76..0587d3e 100644 --- a/middleware/varinterval/varinterval.go +++ b/middleware/varinterval/varinterval.go @@ -23,10 +23,10 @@ func init() { middleware.RegisterBuilder(Name, build) } -func build(options conf.MapConfig, _ storage.PeerStorage) (h middleware.Hook, err error) { +func build(config conf.MapConfig, _ storage.PeerStorage) (h middleware.Hook, err error) { var cfg Config - if err = options.Unmarshal(&cfg); err != nil { + if err = config.Unmarshal(&cfg); err != nil { err = fmt.Errorf("middleware %s: %w", Name, err) } else { if err := checkConfig(cfg); err == nil { diff --git a/pkg/conf/decoder.go b/pkg/conf/config.go similarity index 87% rename from pkg/conf/decoder.go rename to pkg/conf/config.go index 2f28be1..37f4f55 100644 --- a/pkg/conf/decoder.go +++ b/pkg/conf/config.go @@ -51,3 +51,12 @@ func (m MapConfig) Unmarshal(into any) (err error) { } return } + +type NamedMapConfig struct { + Name string + Config MapConfig +} + +func (nm NamedMapConfig) MarshalZerologObject(e *zerolog.Event) { + e.Str("name", nm.Name).Dict("config", zerolog.Dict().EmbedObject(nm.Config)) +} diff --git a/storage/keydb/storage.go b/storage/keydb/storage.go index 4d8b1b1..0c9860f 100644 --- a/storage/keydb/storage.go +++ b/storage/keydb/storage.go @@ -39,7 +39,7 @@ var ( func init() { // Register the storage driver. - storage.RegisterBuilder(Name, builder) + storage.RegisterDriver(Name, builder) } func builder(icfg conf.MapConfig) (storage.PeerStorage, error) { diff --git a/storage/memory/storage.go b/storage/memory/storage.go index 07ec97d..cc6fe16 100644 --- a/storage/memory/storage.go +++ b/storage/memory/storage.go @@ -29,7 +29,7 @@ var logger = log.NewLogger(Name) func init() { // Register the storage driver. - storage.RegisterBuilder(Name, builder) + storage.RegisterDriver(Name, builder) } func builder(icfg conf.MapConfig) (storage.PeerStorage, error) { diff --git a/storage/pg/storage.go b/storage/pg/storage.go index c9c618b..2d2d01a 100644 --- a/storage/pg/storage.go +++ b/storage/pg/storage.go @@ -56,7 +56,7 @@ var ( func init() { // Register the storage builder. - storage.RegisterBuilder(Name, builder) + storage.RegisterDriver(Name, builder) } func builder(icfg conf.MapConfig) (storage.PeerStorage, error) { @@ -107,8 +107,8 @@ type dataQueryConf struct { } type downloadQueryConf struct { - GetQuery string - IncrementQuery string + GetQuery string `cfg:"get_query"` + IncrementQuery string `cfg:"inc_query"` } // Config holds the configuration of a redis PeerStorage. @@ -223,20 +223,6 @@ type store struct { closed chan any } -func (s *store) tx(ctx context.Context, query string, args pgx.NamedArgs) (err error) { - var tx pgx.Tx - if tx, err = s.Begin(ctx); err == nil { - if _, err = tx.Exec(ctx, query, args); err == nil { - err = tx.Commit(ctx) - } else { - if txErr := tx.Rollback(ctx); txErr != nil { - err = fmt.Errorf(errRollBackMsg, txErr, err) - } - } - } - return -} - func (s *store) txBatch(ctx context.Context, batch *pgx.Batch) (err error) { var tx pgx.Tx if tx, err = s.Begin(ctx); err == nil { @@ -256,7 +242,7 @@ func (s *store) Put(ctx string, values ...storage.Entry) (err error) { case 0: // ignore case 1: - err = s.tx(context.TODO(), s.Data.AddQuery, pgx.NamedArgs{pCtx: ctx, pKey: []byte(values[0].Key), pValue: values[0].Value}) + _, err = s.Exec(context.TODO(), s.Data.AddQuery, pgx.NamedArgs{pCtx: ctx, pKey: []byte(values[0].Key), pValue: values[0].Value}) default: var batch pgx.Batch for _, v := range values { @@ -569,7 +555,7 @@ func (s *store) countPeers(ih []byte) (seeders uint32, leechers uint32) { } } if err != nil { - logger.Error().Err(err).Hex("infoHash", ih).Msg("unable to get peers count") + logger.Error().Err(err).Bytes("infoHash", ih).Msg("unable to get peers count") } return } diff --git a/storage/redis/storage.go b/storage/redis/storage.go index a5e3ff1..45a5dc9 100644 --- a/storage/redis/storage.go +++ b/storage/redis/storage.go @@ -77,7 +77,7 @@ var ( func init() { // Register the storage builder. - storage.RegisterBuilder(Name, builder) + storage.RegisterDriver(Name, builder) } func builder(icfg conf.MapConfig) (storage.PeerStorage, error) { diff --git a/storage/storage.go b/storage/storage.go index 0336745..01628cd 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -3,7 +3,7 @@ package storage import ( - "errors" + "fmt" "sync" "time" @@ -23,9 +23,9 @@ const ( ) var ( - logger = log.NewLogger("storage configurator") - driversM sync.RWMutex - drivers = make(map[string]Builder) + logger = log.NewLogger("storage") + driversMU sync.RWMutex + drivers = make(map[string]Driver) ) // Config holds configuration for periodic execution tasks, which may or may not implement @@ -81,19 +81,14 @@ type Entry struct { Value []byte } -// Builder is the function used to initialize a new type of PeerStorage. -type Builder func(cfg conf.MapConfig) (PeerStorage, error) +// Driver is the function used to initialize a new PeerStorage +// with provided configuration. +type Driver func(conf.MapConfig) (PeerStorage, error) -var ( - // ErrResourceDoesNotExist is the error returned by all delete methods and the - // AnnouncePeers method of the PeerStorage interface if the requested resource - // does not exist. - ErrResourceDoesNotExist = bittorrent.ClientError("resource does not exist") - - // ErrDriverDoesNotExist is the error returned by NewStorage when a peer - // store driver with that name does not exist. - ErrDriverDoesNotExist = errors.New("peer store driver with that name does not exist") -) +// ErrResourceDoesNotExist is the error returned by all delete methods and the +// AnnouncePeers method of the PeerStorage interface if the requested resource +// does not exist. +var ErrResourceDoesNotExist = bittorrent.ClientError("resource does not exist") // DataStorage is the interface, used for implementing store for arbitrary data type DataStorage interface { @@ -216,80 +211,81 @@ type PeerStorage interface { stop.Stopper } -// RegisterBuilder makes a Builder available by the provided name. +// RegisterDriver makes a Driver available by the provided name. // // If called twice with the same name, the name is blank, or if the provided // Driver is nil, this function panics. -func RegisterBuilder(name string, b Builder) { +func RegisterDriver(name string, d Driver) { if name == "" { - panic("storage: could not register a Builder with an empty name") + panic("storage: could not register a Driver with an empty name") } - if b == nil { - panic("storage: could not register a nil Builder") + if d == nil { + panic("storage: could not register a nil Driver") } - driversM.Lock() - defer driversM.Unlock() + driversMU.Lock() + defer driversMU.Unlock() if _, dup := drivers[name]; dup { - panic("storage: RegisterBuilder called twice for " + name) + panic("storage: RegisterDriver called twice for " + name) } - drivers[name] = b + drivers[name] = d } // NewStorage attempts to initialize a new PeerStorage instance from -// the list of registered Drivers. -// -// If a builder does not exist, returns ErrDriverDoesNotExist. -func NewStorage(name string, cfg conf.MapConfig) (ps PeerStorage, err error) { - driversM.RLock() - defer driversM.RUnlock() +// the list of registered drivers. +func NewStorage(cfg conf.NamedMapConfig) (ps PeerStorage, err error) { + driversMU.RLock() + defer driversMU.RUnlock() + logger.Debug().Object("cfg", cfg).Msg("staring storage") - var b Builder - b, ok := drivers[name] + var b Driver + b, ok := drivers[cfg.Name] if !ok { - return nil, ErrDriverDoesNotExist + return nil, fmt.Errorf("storage with name '%s' does not exists", cfg.Name) } c := new(Config) - if err = cfg.Unmarshal(c); err != nil { + if err = cfg.Config.Unmarshal(c); err != nil { return } - if ps, err = b(cfg); err != nil { + if ps, err = b(cfg.Config); err != nil { return } if gc := ps.GCAware(); gc { gcInterval, peerTTL := c.sanitizeGCConfig() logger.Info(). - Str("type", name). + Str("name", cfg.Name). Dur("gcInterval", gcInterval). Dur("peerTTL", peerTTL). Msg("scheduling GC") ps.ScheduleGC(gcInterval, peerTTL) } else { logger.Debug(). - Str("type", name). + Str("name", cfg.Name). Msg("storage does not support GC") } if st := ps.StatisticsAware(); st { if statInterval := c.sanitizeStatisticsConfig(); statInterval > 0 { logger.Info(). - Str("type", name). + Str("name", cfg.Name). Dur("statInterval", statInterval). Msg("scheduling statistics collection") ps.ScheduleStatisticsCollection(statInterval) } else { - logger.Info().Str("type", name).Msg("statistics collection disabled because of zero reporting interval") + logger.Info().Str("name", cfg.Name).Msg("statistics collection disabled because of zero reporting interval") } } else { logger.Debug(). - Str("type", name). + Str("name", cfg.Name). Msg("storage does not support statistics collection") } + logger.Info().Str("name", cfg.Name).Msg("storage started") + return }