(wip) rework configuration to support multiple frontends

This commit is contained in:
Lawrence, Rendall
2022-10-21 17:53:03 +03:00
parent 3d48b882c5
commit dff0ba6da8
23 changed files with 545 additions and 558 deletions
+15 -20
View File
@@ -9,7 +9,11 @@ import (
"github.com/sot-tech/mochi/pkg/conf"
// Imports to register middleware drivers.
// Imports to register frontends
_ "github.com/sot-tech/mochi/frontend/http"
_ "github.com/sot-tech/mochi/frontend/udp"
// Imports to register middleware hooks.
_ "github.com/sot-tech/mochi/middleware/clientapproval"
_ "github.com/sot-tech/mochi/middleware/jwt"
_ "github.com/sot-tech/mochi/middleware/torrentapproval"
@@ -29,29 +33,20 @@ type Config struct {
// We can make Conf extensible enough that you can program a new response
// generator at the cost of making it possible for users to create config that
// won't compose a functional tracker.
AnnounceInterval time.Duration `yaml:"announce_interval"`
MinAnnounceInterval time.Duration `yaml:"min_announce_interval"`
MetricsAddr string `yaml:"metrics_addr"`
HTTPConfig conf.MapConfig `yaml:"http"`
UDPConfig conf.MapConfig `yaml:"udp"`
Storage struct {
Name string `yaml:"name"`
Config conf.MapConfig `yaml:"config"`
} `yaml:"storage"`
PreHooks []conf.MapConfig `yaml:"prehooks"`
PostHooks []conf.MapConfig `yaml:"posthooks"`
AnnounceInterval time.Duration `yaml:"announce_interval"`
MinAnnounceInterval time.Duration `yaml:"min_announce_interval"`
MetricsAddr string `yaml:"metrics_addr"`
Frontends []conf.NamedMapConfig `yaml:"frontends"`
Storage conf.NamedMapConfig `yaml:"storage"`
PreHooks []conf.NamedMapConfig `yaml:"prehooks"`
PostHooks []conf.NamedMapConfig `yaml:"posthooks"`
}
// ConfigFile represents a namespaced YAML configation file.
type ConfigFile struct {
Conf Config `yaml:"mochi"`
}
// ParseConfigFile returns a new ConfigFile given the path to a YAML
// ParseConfigFile returns a new Config given the path to a YAML
// configuration file.
//
// It supports relative and absolute paths and environment variables.
func ParseConfigFile(path string) (*ConfigFile, error) {
func ParseConfigFile(path string) (*Config, error) {
if path == "" {
return nil, errors.New("no config path specified")
}
@@ -59,7 +54,7 @@ func ParseConfigFile(path string) (*ConfigFile, error) {
f, err := os.Open(os.ExpandEnv(path))
if err == nil {
defer f.Close()
cfgFile := new(ConfigFile)
cfgFile := new(Config)
err = yaml.NewDecoder(f).Decode(cfgFile)
return cfgFile, err
}
+16 -34
View File
@@ -4,8 +4,7 @@ import (
"errors"
"fmt"
"github.com/sot-tech/mochi/frontend/http"
"github.com/sot-tech/mochi/frontend/udp"
"github.com/sot-tech/mochi/frontend"
"github.com/sot-tech/mochi/middleware"
"github.com/sot-tech/mochi/pkg/log"
"github.com/sot-tech/mochi/pkg/metrics"
@@ -24,11 +23,10 @@ type Server struct {
// It is optional to provide an instance of the peer store to avoid the
// creation of a new one.
func (r *Server) Run(configFilePath string) error {
configFile, err := ParseConfigFile(configFilePath)
cfg, err := ParseConfigFile(configFilePath)
if err != nil {
return fmt.Errorf("failed to read config: %w", err)
}
cfg := configFile.Conf
r.sg = stop.NewGroup()
@@ -39,51 +37,35 @@ func (r *Server) Run(configFilePath string) error {
log.Info().Msg("metrics disabled because of empty address")
}
log.Debug().Str("name", cfg.Storage.Name).Object("config", cfg.Storage.Config).Msg("starting storage")
r.storage, err = storage.NewStorage(cfg.Storage.Name, cfg.Storage.Config)
r.storage, err = storage.NewStorage(cfg.Storage)
if err != nil {
return fmt.Errorf("failed to create storage: %w", err)
}
log.Info().Str("name", cfg.Storage.Name).Msg("started storage")
preHooks, err := middleware.NewHooks(cfg.PreHooks, r.storage)
if err != nil {
return fmt.Errorf("failed to validate hook config: %w", err)
return fmt.Errorf("failed to configure pre-hooks: %w", err)
}
postHooks, err := middleware.NewHooks(cfg.PostHooks, r.storage)
if err != nil {
return fmt.Errorf("failed to validate hook config: %w", err)
return fmt.Errorf("failed to configure post-hooks: %w", err)
}
r.logic = middleware.NewLogic(cfg.AnnounceInterval, cfg.MinAnnounceInterval, r.storage, preHooks, postHooks)
var started bool
if len(cfg.HTTPConfig) > 0 {
log.Info().Object("config", cfg.HTTPConfig).Msg("starting HTTP frontend")
httpFE, err := http.NewFrontend(r.logic, cfg.HTTPConfig)
if err == nil {
r.sg.Add(httpFE)
started = true
if len(cfg.Frontends) > 0 {
var fs []frontend.Frontend
r.logic = middleware.NewLogic(cfg.AnnounceInterval, cfg.MinAnnounceInterval, r.storage, preHooks, postHooks)
if fs, err = frontend.NewFrontends(cfg.Frontends, r.logic); err == nil {
for _, f := range fs {
r.sg.Add(f)
}
} else {
return err
err = fmt.Errorf("failed to configure frontends: %w", err)
}
} else {
err = errors.New("no frontends configured")
}
if len(cfg.UDPConfig) > 0 {
log.Info().Object("config", cfg.UDPConfig).Msg("starting UDP frontend")
udpFE, err := udp.NewFrontend(r.logic, cfg.UDPConfig)
if err == nil {
r.sg.Add(udpFE)
started = true
} else {
return err
}
}
if !started {
return errors.New("no frontends configured")
}
return nil
return err
}
// Dispose shuts down an instance of Server.
+285 -277
View File
@@ -1,303 +1,311 @@
# @formatter:off
mochi:
# The interval communicated with BitTorrent clients informing them how
# frequently they should announce in between client events.
announce_interval: 30m
# The interval communicated with BitTorrent clients informing them how
# frequently they should announce in between client events.
announce_interval: 30m
# The interval communicated with BitTorrent clients informing them of the
# minimal duration between announces.
min_announce_interval: 15m
# The interval communicated with BitTorrent clients informing them of the
# minimal duration between announces.
min_announce_interval: 15m
# The network interface that will bind to an HTTP endpoint that can be
# scraped by programs collecting metrics.
#
# /metrics serves metrics in the Prometheus format
# /debug/pprof/{cmdline,profile,symbol,trace} serves profiles in the pprof format
metrics_addr: "0.0.0.0:6880"
# The network interface that will bind to an HTTP endpoint that can be
# scraped by programs collecting metrics.
#
# /metrics serves metrics in the Prometheus format
# /debug/pprof/{cmdline,profile,symbol,trace} serves profiles in the pprof format
metrics_addr: "0.0.0.0:6880"
frontends:
# This block defines configuration for the tracker's HTTP interface.
# If you do not wish to run this, delete this section.
http:
# The network interface that will bind to an HTTP server for serving
# BitTorrent traffic. Remove this to disable the non-TLS listener.
addr: "0.0.0.0:6969"
# The network interface that will bind to an HTTPS server for serving
# BitTorrent traffic. If set, tls_cert_path and tls_key_path are required.
https_addr: ""
# The path to the required files to listen via HTTPS.
tls_cert_path: ""
tls_key_path: ""
# Enable SO_REUSEPORT to allow starting multiple mochi instances with the same HTTP(S) port.
reuse_port: true
# The timeout durations for HTTP requests.
read_timeout: 5s
write_timeout: 5s
# When true, persistent connections will be allowed. Generally this is not
# useful for a public tracker, but helps performance in some cases (use of
# a reverse proxy, or when there are few clients issuing many requests).
enable_keepalive: false
idle_timeout: 30s
# Whether to time requests.
# Disabling this should increase performance/decrease load.
enable_request_timing: false
# An array of routes to listen on for announce requests. This is an option
# to support trackers that do not listen for /announce or need to listen
# on multiple routes.
#
# This supports named parameters and catch-all parameters as described at
# https://github.com/julienschmidt/httprouter#named-parameters
announce_routes:
- "/announce"
# - "/announce.php"
# An array of routes to listen on for scrape requests. This is an option
# to support trackers that do not listen for /scrape or need to listen
# on multiple routes.
#
# This supports named parameters and catch-all parameters as described at
# https://github.com/julienschmidt/httprouter#named-parameters
scrape_routes:
- "/scrape"
# - "/scrape.php"
# An array of routes to listen ping requests.
# Used just to ensure if server is operational. Returns nothing,
# just HTTP 200 without body. Listens both GET and HEAD HTTP methods.
# HEAD method just checks http server, GET checks all hooks,
# which support ping
ping_routes:
- "/ping"
# When not enabled, tracker will use only address from which client connected to tracker.
# When enabled, the IP address that clients advertise as their IP address will
# be appended as announce candidate.
allow_ip_spoofing: false
# When enabled, IPs from private, local and loopback subnets will be ignored
filter_private_ips: false
# The HTTP Header containing the IP address of the client.
# This is only necessary if using a reverse proxy.
real_ip_header: "x-real-ip"
# The maximum number of peers returned for an individual request.
max_numwant: 100
# The default number of peers returned for an individual request.
default_numwant: 50
# The maximum number of infohashes that can be scraped in one request.
max_scrape_infohashes: 50
# This block defines configuration for the tracker's UDP interface.
# If you do not wish to run this, delete this section.
udp:
# The network interface that will bind to a UDP server for serving
# BitTorrent traffic.
addr: "0.0.0.0:6969"
# Enable SO_REUSEPORT to allow starting multiple mochi instances with the same UDP port.
reuse_port: true
# The leeway for a timestamp on a connection ID.
max_clock_skew: 10s
# The key used to encrypt connection IDs.
private_key: "paste a random string here that will be used to hmac connection IDs"
# Whether to time requests.
# Disabling this should increase performance/decrease load.
enable_request_timing: false
# When not enabled, tracker will use only address from which client connected to tracker.
# When enabled, the IP address that clients advertise as their IP address will
# be appended as announce candidate.
allow_ip_spoofing: false
# When enabled, IPs from private, local and loopback subnets will be ignored
filter_private_ips: false
# The maximum number of peers returned for an individual request.
max_numwant: 100
# The default number of peers returned for an individual request.
default_numwant: 50
# The maximum number of infohashes that can be scraped in one request.
max_scrape_infohashes: 50
# This block defines configuration used for the storage of peer data.
storage:
name: memory
- name: http
config:
# The frequency which stale peers are removed.
# This balances between
# - collecting garbage more often, potentially using more CPU time, but potentially using less memory (lower value)
# - collecting garbage less frequently, saving CPU time, but keeping old peers long, thus using more memory (higher value).
gc_interval: 3m
# The network interface that will bind to an HTTP server for serving
# BitTorrent traffic. Remove this to disable the non-TLS listener.
addr: "0.0.0.0:6969"
# The amount of time until a peer is considered stale.
# To avoid churn, keep this slightly larger than `announce_interval`
peer_lifetime: 31m
# The network interface that will bind to an HTTPS server for serving
# BitTorrent traffic. If set, tls_cert_path and tls_key_path are required.
https_addr: ""
# The number of partitions data will be divided into in order to provide a
# higher degree of parallelism.
shard_count: 1024
# The path to the required files to listen via HTTPS.
tls_cert_path: ""
tls_key_path: ""
# Enable SO_REUSEPORT to allow starting multiple mochi instances with the same HTTP(S) port.
reuse_port: true
# The interval at which metrics about the number of infohashes and peers
# are collected and posted to Prometheus.
prometheus_reporting_interval: 1s
# The timeout durations for HTTP requests.
read_timeout: 5s
write_timeout: 5s
# This block defines configuration used for redis storage.
#storage:
#name: redis
#config:
# The frequency which stale peers are removed.
# This balances between
# - collecting garbage more often, potentially using more CPU time, but potentially using less memory (lower value)
# - collecting garbage less frequently, saving CPU time, but keeping old peers long, thus using more memory (higher value).
#gc_interval: 3m
# When true, persistent connections will be allowed. Generally this is not
# useful for a public tracker, but helps performance in some cases (use of
# a reverse proxy, or when there are few clients issuing many requests).
enable_keepalive: false
idle_timeout: 30s
# The interval at which metrics about the number of infohashes and peers
# are collected and posted to Prometheus.
#prometheus_reporting_interval: 1s
# Whether to time requests.
# Disabling this should increase performance/decrease load.
enable_request_timing: false
# The amount of time until a peer is considered stale.
# To avoid churn, keep this slightly larger than `announce_interval`
#peer_lifetime: 31m
# An array of routes to listen on for announce requests. This is an option
# to support trackers that do not listen for /announce or need to listen
# on multiple routes.
#
# This supports named parameters and catch-all parameters as described at
# https://github.com/julienschmidt/httprouter#named-parameters
announce_routes:
- "/announce"
# - "/announce.php"
# The addresses of redis storage.
# If neither sentinel not cluster switched,
# only first address used
#addresses: ["127.0.0.1:6379"]
# An array of routes to listen on for scrape requests. This is an option
# to support trackers that do not listen for /scrape or need to listen
# on multiple routes.
#
# This supports named parameters and catch-all parameters as described at
# https://github.com/julienschmidt/httprouter#named-parameters
scrape_routes:
- "/scrape"
# - "/scrape.php"
# Database to be selected after connecting to the server.
#db: 0
# An array of routes to listen ping requests.
# Used just to ensure if server is operational. Returns nothing,
# just HTTP 200 without body. Listens both GET and HEAD HTTP methods.
# HEAD method just checks http server, GET checks all hooks,
# which support ping
ping_routes:
- "/ping"
# Maximum number of socket connections, default is 10 per CPU
#pool_size: 10
# When not enabled, tracker will use only address from which client connected to tracker.
# When enabled, the IP address that clients advertise as their IP address will
# be appended as announce candidate.
allow_ip_spoofing: false
# Use the specified login/username to authenticate the current connection
#login: ""
# When enabled, IPs from private, local and loopback subnets will be ignored
filter_private_ips: false
# Optional password
#password: ""
# The HTTP Header containing the IP address of the client.
# This is only necessary if using a reverse proxy.
real_ip_header: "x-real-ip"
# Connect to sentinel nodes
#sentinel: false
# The maximum number of peers returned for an individual request.
max_numwant: 100
# The master name
#sentinel_master: ""
# The default number of peers returned for an individual request.
default_numwant: 50
# Connect to the redis cluster
#cluster: false
# The maximum number of infohashes that can be scraped in one request.
max_scrape_infohashes: 50
# The timeout for reading a command reply from redis.
#read_timeout: 15s
# This block defines configuration for the tracker's UDP interface.
# If you do not wish to run this, delete this section.
- name: udp
config:
# The network interface that will bind to a UDP server for serving
# BitTorrent traffic.
addr: "0.0.0.0:6969"
# The timeout for writing a command to redis.
#write_timeout: 15s
# Enable SO_REUSEPORT to allow starting multiple mochi instances with the same UDP port.
reuse_port: true
# Dial timeout for establishing new connections.
#connect_timeout: 15s
# The leeway for a timestamp on a connection ID.
max_clock_skew: 10s
# This block defines configuration used for PostgreSQL storage.
# example `mo_peers` table structure:
# - info_hash bytea
# - peer_id bytea
# - address inet or bytea
# - port int4
# - is_seeder bool
# - is_v6 bool
# - created timestamp
#storage:
#name: pg
#config:
# connection string to pg storage. may be URL (postgres://...) or DSN (host=... port=...)
#connection_string: host=127.0.0.1 database=test user=postgres pool_max_conns=50
# query and parameters for announce operation
#announce:
#query: SELECT peer_id, address, port FROM mo_peers WHERE info_hash=$1 AND is_seeder=$2 AND is_v6=$3 LIMIT $4
#peer_id_column: peer_id
#address_column: address
#port_column: port
#peer:
# expected parameters: 1 - info hash (bytea), 2 - peer id (bytea), 3 - ip address (bytea/inet)
# 4 - port (int), 5 - is seeder (bool), 6 - is IPv6 (bool), 7 - create date and time (timestamp)
#add_query: INSERT INTO mo_peers VALUES($1, $2, $3, $4, $5, $6, $7) ON CONFLICT (info_hash, peer_id, address, port) DO UPDATE SET created = EXCLUDED.created, is_seeder = EXCLUDED.is_seeder
#del_query: DELETE FROM mo_peers WHERE info_hash=$1 AND peer_id=$2 AND address=$3 AND port=$4 AND is_seeder=$5
#graduate_query: UPDATE mo_peers SET is_seeder=TRUE WHERE info_hash=$1 AND peer_id=$2 AND address=$3 AND port=$4 AND NOT is_seeder
#count_query: SELECT COUNT(1) FILTER (WHERE is_seeder) AS seeders, COUNT(1) FILTER (WHERE NOT is_seeder) AS leechers FROM mo_peers
# predicate part of `count_query` for get count of peers by info hash
#by_info_hash_clause: WHERE info_hash = $1
#count_seeders_column: seeders
#count_leechers_column: leechers
# queries for KV-store
#data:
# expected parameters: 1 - context (varchar), 2 - name (bytea), 3 - value (bytea)
#add_query: INSERT INTO mo_kv VALUES($1, $2, $3) ON CONFLICT (context, name) DO NOTHING
#del_query: DELETE FROM mo_kv WHERE context=$1 AND name=$2
#get_query: SELECT value FROM mo_kv WHERE context=$1 AND name=$2
# query for check if database is alive
#ping_query: SELECT 1
# query for garbage collection, expected parameter is timestamp
#gc_query: DELETE FROM mo_peers WHERE created <= $1
# The amount of time until a peer is considered stale.
# To avoid churn, keep this slightly larger than `announce_interval`
#peer_lifetime: 31m
# The frequency which stale peers are removed.
#gc_interval: 3m
# query for info hash statistics
#info_hash_count_query: SELECT COUNT(DISTINCT info_hash) as info_hashes FROM mo_peers
# The interval at which metrics about the number of info hashes and peers
# are collected and posted to Prometheus.
#prometheus_reporting_interval: 1s
# The key used to encrypt connection IDs.
private_key: "paste a random string here that will be used to hmac connection IDs"
# This block defines configuration used for middleware executed before a
# response has been returned to a BitTorrent client.
prehooks:
# - name: jwt
# options:
# header: "authorization"
# issuer: "https://issuer.com"
# audience: "https://some.issuer.com"
# jwk_set_url: "https://issuer.com/keys"
# jwk_set_update_interval: 5m
# handle_announce: true
# handle_scrape: false
#
# - name: client approval
# options:
# whitelist:
# - "OP1011"
# blacklist:
# - "OP1012"
#
# - name: interval variation
# options:
# modify_response_probability: 0.2
# max_increase_delta: 60
# modify_min_interval: true
#
# This block defines configuration used for torrent approval, it requires to be given
# hashes for whitelist or for blacklist. Hashes are hexadecimal-encoaded.
# - name: torrent approval
# options:
# initial_source: list
# Save data provided by source in storage above
# preserve: false
# configuration:
# hash_list:
# - "a1b2c3d4e5a1b2c3d4e5a1b2c3d4e5a1b2c3d4e5"
# true - whitelist mode, false - blacklist
# invert: false
# Name of storage context where store hash list
# storage_ctx: APPROVED_HASH
posthooks:
# Whether to time requests.
# Disabling this should increase performance/decrease load.
enable_request_timing: false
# When not enabled, tracker will use only address from which client connected to tracker.
# When enabled, the IP address that clients advertise as their IP address will
# be appended as announce candidate.
allow_ip_spoofing: false
# When enabled, IPs from private, local and loopback subnets will be ignored
filter_private_ips: false
# The maximum number of peers returned for an individual request.
max_numwant: 100
# The default number of peers returned for an individual request.
default_numwant: 50
# The maximum number of infohashes that can be scraped in one request.
max_scrape_infohashes: 50
# This block defines configuration used for the storage of peer data.
storage:
name: memory
config:
# The frequency which stale peers are removed.
# This balances between
# - collecting garbage more often, potentially using more CPU time, but potentially using less memory (lower value)
# - collecting garbage less frequently, saving CPU time, but keeping old peers long, thus using more memory (higher value).
gc_interval: 3m
# The amount of time until a peer is considered stale.
# To avoid churn, keep this slightly larger than `announce_interval`
peer_lifetime: 31m
# The number of partitions data will be divided into in order to provide a
# higher degree of parallelism.
shard_count: 1024
# The interval at which metrics about the number of infohashes and peers
# are collected and posted to Prometheus.
prometheus_reporting_interval: 1s
# This block defines configuration used for redis storage.
#storage:
#name: redis
#config:
# The frequency which stale peers are removed.
# This balances between
# - collecting garbage more often, potentially using more CPU time, but potentially using less memory (lower value)
# - collecting garbage less frequently, saving CPU time, but keeping old peers long, thus using more memory (higher value).
#gc_interval: 3m
# The interval at which metrics about the number of infohashes and peers
# are collected and posted to Prometheus.
#prometheus_reporting_interval: 1s
# The amount of time until a peer is considered stale.
# To avoid churn, keep this slightly larger than `announce_interval`
#peer_lifetime: 31m
# The addresses of redis storage.
# If neither sentinel not cluster switched,
# only first address used
#addresses: ["127.0.0.1:6379"]
# Database to be selected after connecting to the server.
#db: 0
# Maximum number of socket connections, default is 10 per CPU
#pool_size: 10
# Use the specified login/username to authenticate the current connection
#login: ""
# Optional password
#password: ""
# Connect to sentinel nodes
#sentinel: false
# The master name
#sentinel_master: ""
# Connect to the redis cluster
#cluster: false
# The timeout for reading a command reply from redis.
#read_timeout: 15s
# The timeout for writing a command to redis.
#write_timeout: 15s
# Dial timeout for establishing new connections.
#connect_timeout: 15s
# This block defines configuration used for PostgreSQL storage.
# example peers table structure:
# - info_hash bytea
# - peer_id bytea
# - address inet or bytea
# - port int4
# - is_seeder bool
# - is_v6 bool
# - created timestamp
# example downloads table structure:
# - info_hash bytea
# - downloads int
#storage:
#name: pg
#config:
# connection string to pg storage. may be URL (postgres://...) or DSN (host=... port=...)
#connection_string: host=127.0.0.1 database=test user=postgres pool_max_conns=50
# query and parameters for announce operation
#announce:
#query: SELECT peer_id, address, port FROM mo_peers WHERE info_hash=@info_hash AND is_seeder=@is_seeder AND is_v6=@is_v6 LIMIT @count
#peer_id_column: peer_id
#address_column: address
#port_column: port
# queries to get/increment 'snached' (downloaded) count
#downloads:
#get_query: SELECT downloads FROM mo_downloads where info_hash=@info_hash
#inc_query: INSERT INTO mo_downloads VALUES(@info_hash) ON CONFLICT(info_hash) DO UPDATE SET downloads = mo_downloads.downloads + 1
# queries and parameters for add/delete/count peers operations
#peer:
#add_query: INSERT INTO mo_peers VALUES(@info_hash, @peer_id, @address, @port, @is_seeder, @is_v6, @created) ON CONFLICT (info_hash, peer_id, address, port) DO UPDATE SET created = EXCLUDED.created, is_seeder = EXCLUDED.is_seeder
#del_query: DELETE FROM mo_peers WHERE info_hash=@info_hash AND peer_id=@peer_id AND address=@address AND port=@port AND is_seeder=@is_seeder
#graduate_query: UPDATE mo_peers SET is_seeder=TRUE WHERE info_hash=@info_hash AND peer_id=peer_id AND address=@address AND port=@port AND NOT is_seeder
#count_query: SELECT COUNT(1) FILTER (WHERE is_seeder) AS seeders, COUNT(1) FILTER (WHERE NOT is_seeder) AS leechers FROM mo_peers
# predicate part of `count_query` to get count of peers by info hash
#by_info_hash_clause: WHERE info_hash = @info_hash
#count_seeders_column: seeders
#count_leechers_column: leechers
# queries for KV-store
#data:
#add_query: INSERT INTO mo_kv VALUES(@context, @key, @value) ON CONFLICT (context, name) DO NOTHING
# NOTE: in del_query @key parameter is array, NOT single value
#del_query: DELETE FROM mo_kv WHERE context=@context AND name = ANY(@key)
#get_query: SELECT value FROM mo_kv WHERE context=@context AND name=@key
# query for check if database is alive
#ping_query: SELECT 1
# query for garbage collection, expected parameter is timestamp
#gc_query: DELETE FROM mo_peers WHERE created <= @created
# The amount of time until a peer is considered stale.
# To avoid churn, keep this slightly larger than `announce_interval`
#peer_lifetime: 31m
# The frequency which stale peers are removed.
#gc_interval: 3m
# query for info hash statistics
#info_hash_count_query: SELECT COUNT(DISTINCT info_hash) as info_hashes FROM mo_peers
# The interval at which metrics about the number of info hashes and peers
# are collected and posted to Prometheus.
#prometheus_reporting_interval: 1s
# This block defines configuration used for middleware executed before a
# response has been returned to a BitTorrent client.
prehooks:
# - name: jwt
# config:
# header: "authorization"
# issuer: "https://issuer.com"
# audience: "https://some.issuer.com"
# jwk_set_url: "https://issuer.com/keys"
# jwk_set_update_interval: 5m
# handle_announce: true
# handle_scrape: false
#
# - name: client approval
# config:
# whitelist:
# - "OP1011"
# blacklist:
# - "OP1012"
#
# - name: interval variation
# config:
# modify_response_probability: 0.2
# max_increase_delta: 60
# modify_min_interval: true
#
# This block defines configuration used for torrent approval, it requires to be given
# hashes for whitelist or for blacklist. Hashes are hexadecimal-encoaded.
# - name: torrent approval
# config:
# initial_source: list
# Save data provided by source in storage above
# preserve: false
# configuration:
# hash_list:
# - "a1b2c3d4e5a1b2c3d4e5a1b2c3d4e5a1b2c3d4e5"
# true - whitelist mode, false - blacklist
# invert: false
# Name of storage context where store hash list
# storage_ctx: APPROVED_HASH
posthooks:
+59 -23
View File
@@ -3,34 +3,70 @@
package frontend
import (
"context"
"fmt"
"sync"
"github.com/sot-tech/mochi/bittorrent"
"github.com/sot-tech/mochi/middleware"
"github.com/sot-tech/mochi/pkg/conf"
"github.com/sot-tech/mochi/pkg/log"
"github.com/sot-tech/mochi/pkg/stop"
)
// TrackerLogic is the interface used by a frontend in order to: (1) generate a
// response from a parsed request, and (2) asynchronously observe anything
// after the response has been delivered to the client.
type TrackerLogic interface {
// HandleAnnounce generates a response for an Announce.
//
// Returns the updated context, the generated AnnounceResponse and no error
// on success; nil and error on failure.
HandleAnnounce(context.Context, *bittorrent.AnnounceRequest) (context.Context, *bittorrent.AnnounceResponse, error)
var (
logger = log.NewLogger("frontend")
buildersMU sync.RWMutex
builders = make(map[string]Builder)
)
// AfterAnnounce does something with the results of an Announce after it
// has been completed.
AfterAnnounce(context.Context, *bittorrent.AnnounceRequest, *bittorrent.AnnounceResponse)
// Builder is the function used to initialize a new Frontend
// with provided configuration.
type Builder func(conf.MapConfig, *middleware.Logic) (Frontend, error)
// HandleScrape generates a response for a Scrape.
//
// Returns the updated context, the generated AnnounceResponse and no error
// on success; nil and error on failure.
HandleScrape(context.Context, *bittorrent.ScrapeRequest) (context.Context, *bittorrent.ScrapeResponse, error)
// RegisterBuilder makes a Builder available by the provided name.
//
// If called twice with the same name, the name is blank, or if the provided
// Builder is nil, this function panics.
func RegisterBuilder(name string, b Builder) {
if name == "" {
panic("frontend: could not register Builder with an empty name")
}
if b == nil {
panic("frontend: could not register a nil Builder")
}
// AfterScrape does something with the results of a Scrape after it has been completed.
AfterScrape(context.Context, *bittorrent.ScrapeRequest, *bittorrent.ScrapeResponse)
buildersMU.Lock()
defer buildersMU.Unlock()
// Ping executes checks if all hooks are operational
Ping(context.Context) error
if _, dup := builders[name]; dup {
panic("frontend: RegisterBuilder called twice for " + name)
}
builders[name] = b
}
type Frontend interface {
stop.Stopper
}
// NewFrontends is a utility function for initializing Frontend-s in bulk.
// Returns nil hook and error if frontend with name provided in config
// does not exists.
func NewFrontends(configs []conf.NamedMapConfig, logic *middleware.Logic) (fs []Frontend, err error) {
buildersMU.RLock()
defer buildersMU.RUnlock()
for _, c := range configs {
logger.Debug().Object("frontend", c).Msg("starting frontend")
newFrontend, ok := builders[c.Name]
if !ok {
err = fmt.Errorf("hook with name '%s' does not exists", c.Name)
break
}
var f Frontend
if f, err = newFrontend(c.Config, logic); err != nil {
break
}
fs = append(fs, f)
logger.Info().Str("name", c.Name).Msg("frontend started")
}
return
}
+19 -16
View File
@@ -17,16 +17,22 @@ import (
"github.com/sot-tech/mochi/bittorrent"
"github.com/sot-tech/mochi/frontend"
"github.com/sot-tech/mochi/middleware"
"github.com/sot-tech/mochi/pkg/conf"
"github.com/sot-tech/mochi/pkg/log"
"github.com/sot-tech/mochi/pkg/metrics"
"github.com/sot-tech/mochi/pkg/stop"
)
var logger = log.NewLogger("http frontend")
const Name = "http"
// Config represents all of the configurable options for an HTTP BitTorrent
// Frontend.
var logger = log.NewLogger(Name)
func init() {
frontend.RegisterBuilder(Name, newFrontend)
}
// Config represents all configurable options for an HTTP BitTorrent Frontend
type Config struct {
Addr string
HTTPSAddr string `cfg:"https_addr"`
@@ -94,28 +100,25 @@ func (cfg Config) Validate() Config {
return validcfg
}
// Frontend represents the state of an HTTP BitTorrent Frontend.
type Frontend struct {
type httpFE struct {
srv *http.Server
srvMu sync.Mutex
tlsSrv *http.Server
tlsSrvMu sync.Mutex
tlsCfg *tls.Config
logic frontend.TrackerLogic
logic *middleware.Logic
Config
}
// NewFrontend creates a new instance of an HTTP Frontend that asynchronously
// serves requests.
func NewFrontend(logic frontend.TrackerLogic, c conf.MapConfig) (*Frontend, error) {
func newFrontend(c conf.MapConfig, logic *middleware.Logic) (frontend.Frontend, error) {
var provided Config
if err := c.Unmarshal(&provided); err != nil {
return nil, err
}
cfg := provided.Validate()
f := &Frontend{
f := &httpFE{
logic: logic,
Config: cfg,
srvMu: sync.Mutex{},
@@ -182,7 +185,7 @@ func NewFrontend(logic frontend.TrackerLogic, c conf.MapConfig) (*Frontend, erro
}
// Stop provides a thread-safe way to shut down a currently running Frontend.
func (f *Frontend) Stop() stop.Result {
func (f *httpFE) Stop() stop.Result {
stopGroup := stop.NewGroup()
f.srvMu.Lock()
@@ -200,7 +203,7 @@ func (f *Frontend) Stop() stop.Result {
return stopGroup.Stop()
}
func (f *Frontend) makeStopFunc(stopSrv *http.Server) stop.Func {
func (f *httpFE) makeStopFunc(stopSrv *http.Server) stop.Func {
return func() stop.Result {
c := make(stop.Channel)
go func() {
@@ -212,7 +215,7 @@ func (f *Frontend) makeStopFunc(stopSrv *http.Server) stop.Func {
// serveHTTP blocks while listening and serving non-TLS HTTP BitTorrent
// requests until Stop() is called or an error is returned.
func (f *Frontend) serveHTTP(handler http.Handler, tls bool) error {
func (f *httpFE) serveHTTP(handler http.Handler, tls bool) error {
srv := &http.Server{
Handler: handler,
ReadTimeout: f.ReadTimeout,
@@ -277,7 +280,7 @@ func injectRouteParamsToContext(ctx context.Context, ps httprouter.Params) conte
}
// announceRoute parses and responds to an Announce.
func (f *Frontend) announceRoute(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
func (f *httpFE) announceRoute(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
var err error
var start time.Time
var addr netip.Addr
@@ -314,7 +317,7 @@ func (f *Frontend) announceRoute(w http.ResponseWriter, r *http.Request, ps http
}
// scrapeRoute parses and responds to a Scrape.
func (f *Frontend) scrapeRoute(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
func (f *httpFE) scrapeRoute(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
var err error
var start time.Time
var addr netip.Addr
@@ -349,7 +352,7 @@ func (f *Frontend) scrapeRoute(w http.ResponseWriter, r *http.Request, ps httpro
go f.logic.AfterScrape(ctx, req, resp)
}
func (f *Frontend) ping(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
func (f *httpFE) ping(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
var err error
if r.Method == http.MethodGet {
err = f.logic.Ping(context.TODO())
-4
View File
@@ -1,9 +1,5 @@
package frontend
import "github.com/sot-tech/mochi/pkg/log"
var logger = log.NewLogger("frontend configurator")
// ParseOptions is the configuration used to parse an Announce Request.
//
// If AllowIPSpoofing is true, IPs provided via params will be used.
+18 -13
View File
@@ -18,6 +18,7 @@ import (
"github.com/sot-tech/mochi/bittorrent"
"github.com/sot-tech/mochi/frontend"
"github.com/sot-tech/mochi/frontend/udp/bytepool"
"github.com/sot-tech/mochi/middleware"
"github.com/sot-tech/mochi/pkg/conf"
"github.com/sot-tech/mochi/pkg/log"
"github.com/sot-tech/mochi/pkg/metrics"
@@ -25,12 +26,18 @@ import (
"github.com/sot-tech/mochi/pkg/timecache"
)
const Name = "http"
var (
logger = log.NewLogger("udp frontend")
logger = log.NewLogger(Name)
allowedGeneratedPrivateKeyRunes = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890")
errUnexpectedConnType = errors.New("unexpected connection type (not UDPConn)")
)
func init() {
frontend.RegisterBuilder(Name, newFrontend)
}
// Config represents all of the configurable options for a UDP BitTorrent
// Tracker.
type Config struct {
@@ -69,28 +76,26 @@ func (cfg Config) Validate() Config {
return validcfg
}
// Frontend holds the state of a UDP BitTorrent Frontend.
type Frontend struct {
// udpFE holds the state of a UDP BitTorrent Frontend.
type udpFE struct {
socket *net.UDPConn
closing chan struct{}
wg sync.WaitGroup
genPool *sync.Pool
logic frontend.TrackerLogic
logic *middleware.Logic
Config
}
// NewFrontend creates a new instance of an UDP Frontend that asynchronously
// serves requests.
func NewFrontend(logic frontend.TrackerLogic, c conf.MapConfig) (*Frontend, error) {
func newFrontend(c conf.MapConfig, logic *middleware.Logic) (frontend.Frontend, error) {
var provided Config
if err := c.Unmarshal(&provided); err != nil {
return nil, err
}
cfg := provided.Validate()
f := &Frontend{
f := &udpFE{
closing: make(chan struct{}),
logic: logic,
Config: cfg,
@@ -116,7 +121,7 @@ func NewFrontend(logic frontend.TrackerLogic, c conf.MapConfig) (*Frontend, erro
}
// Stop provides a thread-safe way to shut down a currently running Frontend.
func (t *Frontend) Stop() stop.Result {
func (t *udpFE) Stop() stop.Result {
select {
case <-t.closing:
return stop.AlreadyStopped
@@ -135,7 +140,7 @@ func (t *Frontend) Stop() stop.Result {
}
// listen resolves the address and binds the server socket.
func (t *Frontend) listen() (err error) {
func (t *udpFE) listen() (err error) {
if t.ReusePort {
var ln net.PacketConn
if ln, err = reuseport.ListenPacket("udp", t.Addr); err == nil {
@@ -156,12 +161,12 @@ func (t *Frontend) listen() (err error) {
// serve blocks while listening and serving UDP BitTorrent requests
// until Stop() is called or an error is returned.
func (t *Frontend) serve() error {
func (t *udpFE) serve() error {
pool := bytepool.New(2048)
defer t.wg.Done()
for {
// Check to see if we need to shutdown.
// Check to see if we need shutdown.
select {
case <-t.closing:
log.Debug().Msg("serve received shutdown signal")
@@ -229,7 +234,7 @@ func (w ResponseWriter) Write(b []byte) (int, error) {
}
// handleRequest parses and responds to a UDP Request.
func (t *Frontend) handleRequest(r Request, w ResponseWriter) (actionName string, err error) {
func (t *udpFE) handleRequest(r Request, w ResponseWriter) (actionName string, err error) {
if len(r.Packet) < 16 {
// Malformed, no client packets are less than 16 bytes.
// We explicitly return nothing in case this is a DoS attempt.
+1 -1
View File
@@ -22,7 +22,7 @@ func TestStartStopRaceIssue437(t *testing.T) {
t.Fatal(err)
}
lgc := middleware.NewLogic(0, 0, ps, nil, nil)
fe, err := udp.NewFrontend(lgc, conf.MapConfig{"addr": "127.0.0.1:0"})
fe, err := udp.newFrontend(conf.MapConfig{"addr": "127.0.0.1:0"}, lgc)
if err != nil {
t.Fatal(err)
}
+6 -6
View File
@@ -3,7 +3,7 @@ module github.com/sot-tech/mochi
go 1.18
require (
code.cloudfoundry.org/go-diodes v0.0.0-20220927211948-2985a72b297c
code.cloudfoundry.org/go-diodes v0.0.0-20221017175818-728392b37655
github.com/MicahParks/keyfunc v1.5.1
github.com/anacrolix/torrent v1.47.0
github.com/go-redis/redis/v8 v8.11.5
@@ -20,7 +20,7 @@ require (
)
require (
github.com/anacrolix/dht/v2 v2.19.0 // indirect
github.com/anacrolix/dht/v2 v2.19.1 // indirect
github.com/anacrolix/log v0.13.2-0.20220711050817-613cb738ef30 // indirect
github.com/anacrolix/missinggo v1.3.0 // indirect
github.com/anacrolix/missinggo/v2 v2.7.1 // indirect
@@ -40,11 +40,11 @@ require (
github.com/mattn/go-isatty v0.0.16 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.2 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/client_model v0.3.0 // indirect
github.com/prometheus/common v0.37.0 // indirect
github.com/prometheus/procfs v0.8.0 // indirect
golang.org/x/crypto v0.0.0-20221012134737-56aed061732a // indirect
golang.org/x/sys v0.0.0-20221013171732-95e765b1cc43 // indirect
golang.org/x/text v0.3.8 // indirect
golang.org/x/crypto v0.1.0 // indirect
golang.org/x/sys v0.1.0 // indirect
golang.org/x/text v0.4.0 // indirect
google.golang.org/protobuf v1.28.1 // indirect
)
+14 -13
View File
@@ -30,8 +30,8 @@ cloud.google.com/go/storage v1.5.0/go.mod h1:tpKbwo567HUNpVclU5sGELwQWBDZ8gh0Zeo
cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohlUTyfDhBk=
cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs=
cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0=
code.cloudfoundry.org/go-diodes v0.0.0-20220927211948-2985a72b297c h1:6Gb9bHc7g8thQ4AWxwdA6qxhtDNcqNw2T1+Z+w37kOw=
code.cloudfoundry.org/go-diodes v0.0.0-20220927211948-2985a72b297c/go.mod h1:v30sK4Ipg4Ng0pRG22iLfeMRfYUHQNee9f4NkE5t1yc=
code.cloudfoundry.org/go-diodes v0.0.0-20221017175818-728392b37655 h1:3IlYr3dIyXkemR1GoOITnBWnWBHG27wnC6mpV9OLD7c=
code.cloudfoundry.org/go-diodes v0.0.0-20221017175818-728392b37655/go.mod h1:T+qZrTfw8xwdJgVRD5pSvVyx4CeqsP30YQooBYFADRc=
crawshaw.io/iox v0.0.0-20181124134642-c51c3df30797/go.mod h1:sXBiorCo8c46JlQV3oXPKINnZ8mcqnye1EkVkqsectk=
crawshaw.io/sqlite v0.3.2/go.mod h1:igAO5JulrQ1DbdZdtVq48mnZUBAPOeFzer7VhDWNtW4=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
@@ -49,8 +49,8 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuy
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
github.com/anacrolix/dht/v2 v2.19.0 h1:A9oMHWRGbLmCyx1JlYzg79bDrur8V60+0ts8ZwEVYt4=
github.com/anacrolix/dht/v2 v2.19.0/go.mod h1:0h83KnnAQ2AUYhpQ/CkoZP45K41pjDAlPR9zGHgFjQE=
github.com/anacrolix/dht/v2 v2.19.1 h1:V/UUGBASGYqYkSnmHJwX8uQmzkyhbgwE6jqcHKnNTD8=
github.com/anacrolix/dht/v2 v2.19.1/go.mod h1:3TU93c1s/oA8I/VH4m3CNP/BeKsiOGmo6HwfZBMTKUs=
github.com/anacrolix/envpprof v0.0.0-20180404065416-323002cec2fa/go.mod h1:KgHhUaQMc8cC0+cEflSgCFNFbKwi5h54gqtVn8yhP7c=
github.com/anacrolix/envpprof v1.0.0/go.mod h1:KgHhUaQMc8cC0+cEflSgCFNFbKwi5h54gqtVn8yhP7c=
github.com/anacrolix/envpprof v1.1.0/go.mod h1:My7T5oSqVfEn4MD4Meczkw/f5lSIndGAKu/0SM/rkf4=
@@ -275,7 +275,7 @@ github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+W
github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE=
github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/onsi/gomega v1.20.2 h1:8uQq0zMgLEfa0vRrrBgaJF2gyW9Da9BmfGV+OyUzfkY=
github.com/onsi/gomega v1.22.1 h1:pY8O4lBfsHKZHM/6nrxkhVPUznOlIu3quZcKP/M20KI=
github.com/openzipkin/zipkin-go v0.1.6/go.mod h1:QgAqvLzwWbR/WpD4A3cGpPtJrZXNIiJc5AZX7/PBEpw=
github.com/philhofer/fwd v1.0.0/go.mod h1:gk3iGcWd9+svBvR0sR+KPcfE+RNWozjowpeBVG3ZVNU=
github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
@@ -298,8 +298,9 @@ github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:
github.com/prometheus/client_model v0.0.0-20190115171406-56726106282f/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/client_model v0.2.0 h1:uq5h0d+GuxiXLJLNABMgp2qUWDPiLvgCzz2dUR+/W/M=
github.com/prometheus/client_model v0.2.0/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/client_model v0.3.0 h1:UBgGFHqYdG/TPFD1B1ogZywDqEkwp3fBMvqdiQ7Xew4=
github.com/prometheus/client_model v0.3.0/go.mod h1:LDGWKZIo7rky3hgvBe+caln+Dr3dPggB5dvjtD7w9+w=
github.com/prometheus/common v0.2.0/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4=
github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4=
github.com/prometheus/common v0.9.1/go.mod h1:yhUN8i9wzaXS3w1O07YhxHEBxD+W35wd8bs7vj7HSQ4=
@@ -367,8 +368,8 @@ golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8U
golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20221012134737-56aed061732a h1:NmSIgad6KjE6VvHciPZuNRTKxGhlPfD6OA87W/PLkqg=
golang.org/x/crypto v0.0.0-20221012134737-56aed061732a/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.1.0 h1:MDRAIl0xIo9Io2xV565hzXHw3zVseKrJKodhohM5CjU=
golang.org/x/crypto v0.1.0/go.mod h1:RecgLatLF4+eUMCP1PoPZQb+cVrJcOPbHkTkbkB9sbw=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8=
@@ -432,7 +433,7 @@ golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81R
golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
golang.org/x/net v0.0.0-20220722155237-a158d28d115b h1:PxfKdU9lEEDYjdIzOtC4qFWgkU2rGHdKlKowJSMN9h0=
golang.org/x/net v0.1.0 h1:hZ/3BUoy5aId7sCpA/Tc5lt8DkFgdVS2onTpJsZ/fl0=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
@@ -495,8 +496,8 @@ golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20220704084225-05e143d24a9e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20221013171732-95e765b1cc43 h1:OK7RB6t2WQX54srQQYSXMW8dF5C6/8+oA/s5QBmmto4=
golang.org/x/sys v0.0.0-20221013171732-95e765b1cc43/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.1.0 h1:kunALQeHf1/185U1i0GOB/fy1IPRDDpuoOOqRReG57U=
golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
@@ -506,8 +507,8 @@ golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.3.8 h1:nAL+RVCQ9uMn3vJZbV+MRnydTJFPf8qqY42YiA6MrqY=
golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ=
golang.org/x/text v0.4.0 h1:BrVqGRd7+k1DiOgtnFvAkoQEWQvBc25ouMJM6429SFg=
golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
+2 -2
View File
@@ -39,10 +39,10 @@ type hook struct {
unapproved map[ClientID]struct{}
}
func build(options conf.MapConfig, _ storage.PeerStorage) (middleware.Hook, error) {
func build(config conf.MapConfig, _ storage.PeerStorage) (middleware.Hook, error) {
var cfg Config
if err := options.Unmarshal(&cfg); err != nil {
if err := config.Unmarshal(&cfg); err != nil {
return nil, fmt.Errorf("middleware %s: %w", Name, err)
}
+1 -1
View File
@@ -19,7 +19,7 @@ type Hook interface {
}
// Pinger is an optional interface that may be implemented by a pre Hook
// to check if it is operational. Used in frontend.TrackerLogic.
// to check if it is operational. Used in frontend.Logic.
//
// It may be useful in cases when Hook performs foreign requests to
// some external resources (i.e. storage) and `ping` request should
+2 -4
View File
@@ -70,12 +70,10 @@ type hook struct {
jwks *keyfunc.JWKS
}
func build(options conf.MapConfig, _ storage.PeerStorage) (h middleware.Hook, err error) {
func build(config conf.MapConfig, _ storage.PeerStorage) (h middleware.Hook, err error) {
var cfg Config
logger.Debug().Object("options", options).Msg("creating new JWT middleware")
if err = options.Unmarshal(&cfg); err != nil {
if err = config.Unmarshal(&cfg); err != nil {
return nil, fmt.Errorf("middleware %s: %w", Name, err)
}
+21 -22
View File
@@ -5,18 +5,22 @@ import (
"time"
"github.com/sot-tech/mochi/bittorrent"
"github.com/sot-tech/mochi/frontend"
"github.com/sot-tech/mochi/pkg/log"
"github.com/sot-tech/mochi/pkg/stop"
"github.com/sot-tech/mochi/storage"
)
var (
logger = log.NewLogger("middleware")
_ frontend.TrackerLogic = &Logic{}
)
// Logic used by a frontend in order to: (1) generate a
// response from a parsed request, and (2) asynchronously observe anything
// after the response has been delivered to the client.
type Logic struct {
announceInterval time.Duration
minAnnounceInterval time.Duration
preHooks []Hook
postHooks []Hook
pingers []Pinger
}
// NewLogic creates a new instance of a TrackerLogic that executes the provided
// NewLogic creates a new instance of a Logic that executes the provided
// middleware hooks.
func NewLogic(annInterval, minAnnInterval time.Duration, peerStore storage.PeerStorage, preHooks, postHooks []Hook) *Logic {
l := &Logic{
@@ -34,17 +38,10 @@ func NewLogic(annInterval, minAnnInterval time.Duration, peerStore storage.PeerS
return l
}
// Logic is an implementation of the TrackerLogic that functions by
// executing a series of middleware hooks.
type Logic struct {
announceInterval time.Duration
minAnnounceInterval time.Duration
preHooks []Hook
postHooks []Hook
pingers []Pinger
}
// HandleAnnounce generates a response for an Announce.
//
// Returns the updated context, the generated AnnounceResponse and no error
// on success; nil and error on failure.
func (l *Logic) HandleAnnounce(ctx context.Context, req *bittorrent.AnnounceRequest) (_ context.Context, resp *bittorrent.AnnounceResponse, err error) {
logger.Debug().Object("request", req).Msg("new announce request")
resp = &bittorrent.AnnounceResponse{
@@ -62,8 +59,8 @@ func (l *Logic) HandleAnnounce(ctx context.Context, req *bittorrent.AnnounceRequ
return ctx, resp, nil
}
// AfterAnnounce does something with the results of an Announce after it has
// been completed.
// AfterAnnounce does something with the results of an Announce after it
// has been completed.
func (l *Logic) AfterAnnounce(ctx context.Context, req *bittorrent.AnnounceRequest, resp *bittorrent.AnnounceResponse) {
var err error
for _, h := range l.postHooks {
@@ -78,6 +75,9 @@ func (l *Logic) AfterAnnounce(ctx context.Context, req *bittorrent.AnnounceReque
}
// HandleScrape generates a response for a Scrape.
//
// Returns the updated context, the generated AnnounceResponse and no error
// on success; nil and error on failure.
func (l *Logic) HandleScrape(ctx context.Context, req *bittorrent.ScrapeRequest) (_ context.Context, resp *bittorrent.ScrapeResponse, err error) {
logger.Debug().Object("request", req).Msg("new scrape request")
resp = &bittorrent.ScrapeResponse{
@@ -93,8 +93,7 @@ func (l *Logic) HandleScrape(ctx context.Context, req *bittorrent.ScrapeRequest)
return ctx, resp, nil
}
// AfterScrape does something with the results of a Scrape after it has been
// completed.
// AfterScrape does something with the results of a Scrape after it has been completed.
func (l *Logic) AfterScrape(ctx context.Context, req *bittorrent.ScrapeRequest, resp *bittorrent.ScrapeResponse) {
var err error
for _, h := range l.postHooks {
@@ -109,7 +108,7 @@ func (l *Logic) AfterScrape(ctx context.Context, req *bittorrent.ScrapeRequest,
}
}
// Ping performs check if all Hook-s are operational
// Ping executes checks if all Hook-s are operational
func (l *Logic) Ping(ctx context.Context) (err error) {
for _, p := range l.pingers {
if err = p.Ping(ctx); err != nil {
+26 -53
View File
@@ -1,92 +1,65 @@
// Package middleware implements the TrackerLogic interface by executing
// Package middleware implements the Logic interface by executing
// a series of middleware hooks.
package middleware
import (
"errors"
"fmt"
"sync"
"github.com/sot-tech/mochi/pkg/conf"
"github.com/sot-tech/mochi/pkg/log"
"github.com/sot-tech/mochi/storage"
)
var (
driversM sync.RWMutex
drivers = make(map[string]Builder)
// ErrBuilderDoesNotExist is the error returned by NewMiddleware when a
// middleware driver with that name does not exist.
ErrBuilderDoesNotExist = errors.New("middleware builder with that name does not exist")
logger = log.NewLogger("middleware")
buildersMU sync.RWMutex
builders = make(map[string]Builder)
)
// Builder is the interface used to initialize a new type of middleware.
//
// The `options` parameter is map of parameters that should be unmarshalled into
// the hook's custom configuration.
type Builder func(options conf.MapConfig, storage storage.PeerStorage) (Hook, error)
// Builder is the function used to initialize a new Hook
// with provided configuration.
type Builder func(conf.MapConfig, storage.PeerStorage) (Hook, error)
// RegisterBuilder makes a Builder available by the provided name.
//
// If called twice with the same name, the name is blank, or if the provided
// Builder is nil, this function panics.
func RegisterBuilder(name string, d Builder) {
func RegisterBuilder(name string, b Builder) {
if name == "" {
panic("middleware: could not register a Builder with an empty name")
panic("middleware: could not register Builder with an empty name")
}
if d == nil {
if b == nil {
panic("middleware: could not register a nil Builder")
}
driversM.Lock()
defer driversM.Unlock()
buildersMU.Lock()
defer buildersMU.Unlock()
if _, dup := drivers[name]; dup {
if _, dup := builders[name]; dup {
panic("middleware: RegisterBuilder called twice for " + name)
}
drivers[name] = d
}
// NewHook attempts to initialize a new middleware instance from the
// list of registered Builders.
//
// If a driver does not exist, returns ErrBuilderDoesNotExist.
func NewHook(name string, options conf.MapConfig, storage storage.PeerStorage) (Hook, error) {
driversM.RLock()
defer driversM.RUnlock()
var newHook Builder
newHook, ok := drivers[name]
if !ok {
return nil, ErrBuilderDoesNotExist
}
return newHook(options, storage)
}
// Config is the generic configuration format used for all registered Hooks.
type Config struct {
Name string
Options conf.MapConfig
builders[name] = b
}
// NewHooks is a utility function for initializing Hooks in bulk.
// each element of configs must contain pairs `name` - string and `options` - map[string]any
func NewHooks(configs []conf.MapConfig, storage storage.PeerStorage) (hooks []Hook, err error) {
for _, cfg := range configs {
var c Config
if err = cfg.Unmarshal(&c); err != nil {
func NewHooks(configs []conf.NamedMapConfig, storage storage.PeerStorage) (hooks []Hook, err error) {
buildersMU.RLock()
defer buildersMU.RUnlock()
for _, c := range configs {
logger.Debug().Object("hook", c).Msg("starting hook")
newHook, ok := builders[c.Name]
if !ok {
err = fmt.Errorf("hook with name '%s' does not exists", c.Name)
break
}
var h Hook
h, err = NewHook(c.Name, c.Options, storage)
if err != nil {
if h, err = newHook(c.Config, storage); err != nil {
break
}
hooks = append(hooks, h)
logger.Info().Str("name", c.Name).Msg("hook started")
}
return
@@ -38,18 +38,18 @@ type baseConfig struct {
Configuration conf.MapConfig
}
func build(options conf.MapConfig, st storage.PeerStorage) (h middleware.Hook, err error) {
func build(config conf.MapConfig, st storage.PeerStorage) (h middleware.Hook, err error) {
var cfg baseConfig
if err = options.Unmarshal(&cfg); err != nil {
if err = config.Unmarshal(&cfg); err != nil {
return nil, fmt.Errorf("middleware %s: %w", Name, err)
}
if len(cfg.Source) == 0 {
return nil, fmt.Errorf("invalid options for middleware %s: source not provided", Name)
return nil, fmt.Errorf("invalid config for middleware %s: source not provided", Name)
}
if cfg.Configuration == nil {
return nil, fmt.Errorf("invalid options for middleware %s: options not provided", Name)
return nil, fmt.Errorf("invalid config for middleware %s: config not provided", Name)
}
var ds storage.DataStorage = st
+2 -2
View File
@@ -23,10 +23,10 @@ func init() {
middleware.RegisterBuilder(Name, build)
}
func build(options conf.MapConfig, _ storage.PeerStorage) (h middleware.Hook, err error) {
func build(config conf.MapConfig, _ storage.PeerStorage) (h middleware.Hook, err error) {
var cfg Config
if err = options.Unmarshal(&cfg); err != nil {
if err = config.Unmarshal(&cfg); err != nil {
err = fmt.Errorf("middleware %s: %w", Name, err)
} else {
if err := checkConfig(cfg); err == nil {
@@ -51,3 +51,12 @@ func (m MapConfig) Unmarshal(into any) (err error) {
}
return
}
type NamedMapConfig struct {
Name string
Config MapConfig
}
func (nm NamedMapConfig) MarshalZerologObject(e *zerolog.Event) {
e.Str("name", nm.Name).Dict("config", zerolog.Dict().EmbedObject(nm.Config))
}
+1 -1
View File
@@ -39,7 +39,7 @@ var (
func init() {
// Register the storage driver.
storage.RegisterBuilder(Name, builder)
storage.RegisterDriver(Name, builder)
}
func builder(icfg conf.MapConfig) (storage.PeerStorage, error) {
+1 -1
View File
@@ -29,7 +29,7 @@ var logger = log.NewLogger(Name)
func init() {
// Register the storage driver.
storage.RegisterBuilder(Name, builder)
storage.RegisterDriver(Name, builder)
}
func builder(icfg conf.MapConfig) (storage.PeerStorage, error) {
+5 -19
View File
@@ -56,7 +56,7 @@ var (
func init() {
// Register the storage builder.
storage.RegisterBuilder(Name, builder)
storage.RegisterDriver(Name, builder)
}
func builder(icfg conf.MapConfig) (storage.PeerStorage, error) {
@@ -107,8 +107,8 @@ type dataQueryConf struct {
}
type downloadQueryConf struct {
GetQuery string
IncrementQuery string
GetQuery string `cfg:"get_query"`
IncrementQuery string `cfg:"inc_query"`
}
// Config holds the configuration of a redis PeerStorage.
@@ -223,20 +223,6 @@ type store struct {
closed chan any
}
func (s *store) tx(ctx context.Context, query string, args pgx.NamedArgs) (err error) {
var tx pgx.Tx
if tx, err = s.Begin(ctx); err == nil {
if _, err = tx.Exec(ctx, query, args); err == nil {
err = tx.Commit(ctx)
} else {
if txErr := tx.Rollback(ctx); txErr != nil {
err = fmt.Errorf(errRollBackMsg, txErr, err)
}
}
}
return
}
func (s *store) txBatch(ctx context.Context, batch *pgx.Batch) (err error) {
var tx pgx.Tx
if tx, err = s.Begin(ctx); err == nil {
@@ -256,7 +242,7 @@ func (s *store) Put(ctx string, values ...storage.Entry) (err error) {
case 0:
// ignore
case 1:
err = s.tx(context.TODO(), s.Data.AddQuery, pgx.NamedArgs{pCtx: ctx, pKey: []byte(values[0].Key), pValue: values[0].Value})
_, err = s.Exec(context.TODO(), s.Data.AddQuery, pgx.NamedArgs{pCtx: ctx, pKey: []byte(values[0].Key), pValue: values[0].Value})
default:
var batch pgx.Batch
for _, v := range values {
@@ -569,7 +555,7 @@ func (s *store) countPeers(ih []byte) (seeders uint32, leechers uint32) {
}
}
if err != nil {
logger.Error().Err(err).Hex("infoHash", ih).Msg("unable to get peers count")
logger.Error().Err(err).Bytes("infoHash", ih).Msg("unable to get peers count")
}
return
}
+1 -1
View File
@@ -77,7 +77,7 @@ var (
func init() {
// Register the storage builder.
storage.RegisterBuilder(Name, builder)
storage.RegisterDriver(Name, builder)
}
func builder(icfg conf.MapConfig) (storage.PeerStorage, error) {
+37 -41
View File
@@ -3,7 +3,7 @@
package storage
import (
"errors"
"fmt"
"sync"
"time"
@@ -23,9 +23,9 @@ const (
)
var (
logger = log.NewLogger("storage configurator")
driversM sync.RWMutex
drivers = make(map[string]Builder)
logger = log.NewLogger("storage")
driversMU sync.RWMutex
drivers = make(map[string]Driver)
)
// Config holds configuration for periodic execution tasks, which may or may not implement
@@ -81,19 +81,14 @@ type Entry struct {
Value []byte
}
// Builder is the function used to initialize a new type of PeerStorage.
type Builder func(cfg conf.MapConfig) (PeerStorage, error)
// Driver is the function used to initialize a new PeerStorage
// with provided configuration.
type Driver func(conf.MapConfig) (PeerStorage, error)
var (
// ErrResourceDoesNotExist is the error returned by all delete methods and the
// AnnouncePeers method of the PeerStorage interface if the requested resource
// does not exist.
ErrResourceDoesNotExist = bittorrent.ClientError("resource does not exist")
// ErrDriverDoesNotExist is the error returned by NewStorage when a peer
// store driver with that name does not exist.
ErrDriverDoesNotExist = errors.New("peer store driver with that name does not exist")
)
// ErrResourceDoesNotExist is the error returned by all delete methods and the
// AnnouncePeers method of the PeerStorage interface if the requested resource
// does not exist.
var ErrResourceDoesNotExist = bittorrent.ClientError("resource does not exist")
// DataStorage is the interface, used for implementing store for arbitrary data
type DataStorage interface {
@@ -216,80 +211,81 @@ type PeerStorage interface {
stop.Stopper
}
// RegisterBuilder makes a Builder available by the provided name.
// RegisterDriver makes a Driver available by the provided name.
//
// If called twice with the same name, the name is blank, or if the provided
// Driver is nil, this function panics.
func RegisterBuilder(name string, b Builder) {
func RegisterDriver(name string, d Driver) {
if name == "" {
panic("storage: could not register a Builder with an empty name")
panic("storage: could not register a Driver with an empty name")
}
if b == nil {
panic("storage: could not register a nil Builder")
if d == nil {
panic("storage: could not register a nil Driver")
}
driversM.Lock()
defer driversM.Unlock()
driversMU.Lock()
defer driversMU.Unlock()
if _, dup := drivers[name]; dup {
panic("storage: RegisterBuilder called twice for " + name)
panic("storage: RegisterDriver called twice for " + name)
}
drivers[name] = b
drivers[name] = d
}
// NewStorage attempts to initialize a new PeerStorage instance from
// the list of registered Drivers.
//
// If a builder does not exist, returns ErrDriverDoesNotExist.
func NewStorage(name string, cfg conf.MapConfig) (ps PeerStorage, err error) {
driversM.RLock()
defer driversM.RUnlock()
// the list of registered drivers.
func NewStorage(cfg conf.NamedMapConfig) (ps PeerStorage, err error) {
driversMU.RLock()
defer driversMU.RUnlock()
logger.Debug().Object("cfg", cfg).Msg("staring storage")
var b Builder
b, ok := drivers[name]
var b Driver
b, ok := drivers[cfg.Name]
if !ok {
return nil, ErrDriverDoesNotExist
return nil, fmt.Errorf("storage with name '%s' does not exists", cfg.Name)
}
c := new(Config)
if err = cfg.Unmarshal(c); err != nil {
if err = cfg.Config.Unmarshal(c); err != nil {
return
}
if ps, err = b(cfg); err != nil {
if ps, err = b(cfg.Config); err != nil {
return
}
if gc := ps.GCAware(); gc {
gcInterval, peerTTL := c.sanitizeGCConfig()
logger.Info().
Str("type", name).
Str("name", cfg.Name).
Dur("gcInterval", gcInterval).
Dur("peerTTL", peerTTL).
Msg("scheduling GC")
ps.ScheduleGC(gcInterval, peerTTL)
} else {
logger.Debug().
Str("type", name).
Str("name", cfg.Name).
Msg("storage does not support GC")
}
if st := ps.StatisticsAware(); st {
if statInterval := c.sanitizeStatisticsConfig(); statInterval > 0 {
logger.Info().
Str("type", name).
Str("name", cfg.Name).
Dur("statInterval", statInterval).
Msg("scheduling statistics collection")
ps.ScheduleStatisticsCollection(statInterval)
} else {
logger.Info().Str("type", name).Msg("statistics collection disabled because of zero reporting interval")
logger.Info().Str("name", cfg.Name).Msg("statistics collection disabled because of zero reporting interval")
}
} else {
logger.Debug().
Str("type", name).
Str("name", cfg.Name).
Msg("storage does not support statistics collection")
}
logger.Info().Str("name", cfg.Name).Msg("storage started")
return
}