(wip) fix invalid http compact address encode

* add packages to loggers
* split config examples with different storages
This commit is contained in:
Lawrence, Rendall
2022-10-25 18:38:55 +03:00
parent a9d1642615
commit c1e041b4f8
19 changed files with 368 additions and 281 deletions
+2 -5
View File
@@ -82,13 +82,10 @@ jobs:
run: |
go install ./cmd/mochi
go install ./cmd/mochi-e2e
curl -LO https://github.com/jzelinskie/faq/releases/download/0.0.6/faq-linux-amd64
chmod +x faq-linux-amd64
./faq-linux-amd64 '.mochi.storage = {"config":{"gc_interval":"3m","peer_lifetime":"31m","prometheus_reporting_interval":"1s","connect_timeout":"15s","read_timeout":"15s","write_timeout":"15s"},"name":"redis"}' ./dist/example_config.yaml > ./dist/example_redis_config.yaml
cat ./dist/example_redis_config.yaml
cat ./dist/example_config_redis.yaml
- name: "Run end-to-end tests"
run: |
mochi --config=./dist/example_redis_config.yaml --logLevel debug --logPretty &
mochi --config=./dist/example_config_redis.yaml --logLevel debug --logPretty &
pid=$!
sleep 2
mochi-e2e
+1 -1
View File
@@ -5,7 +5,7 @@ import (
)
var (
logger = log.NewLogger("request sanitizer")
logger = log.NewLogger("bittorrent/sanitize")
// ErrInvalidIP indicates an invalid IP for an Announce.
ErrInvalidIP = ClientError("invalid IP")
+4 -114
View File
@@ -23,13 +23,14 @@ frontends:
# BitTorrent traffic. Remove this to disable the non-TLS listener.
addr: "0.0.0.0:6969"
# The network interface that will bind to an HTTPS server for serving
# Mark this frontend as HTTPS server for serving
# BitTorrent traffic. If set, tls_cert_path and tls_key_path are required.
https_addr: ""
tls: false
# The path to the required files to listen via HTTPS.
tls_cert_path: ""
tls_key_path: ""
# Enable SO_REUSEPORT to allow starting multiple mochi instances with the same HTTP(S) port.
reuse_port: true
@@ -157,119 +158,9 @@ storage:
# are collected and posted to Prometheus.
prometheus_reporting_interval: 1s
# This block defines configuration used for redis storage.
#storage:
#name: redis
#config:
# The frequency which stale peers are removed.
# This balances between
# - collecting garbage more often, potentially using more CPU time, but potentially using less memory (lower value)
# - collecting garbage less frequently, saving CPU time, but keeping old peers long, thus using more memory (higher value).
#gc_interval: 3m
# The interval at which metrics about the number of infohashes and peers
# are collected and posted to Prometheus.
#prometheus_reporting_interval: 1s
# The amount of time until a peer is considered stale.
# To avoid churn, keep this slightly larger than `announce_interval`
#peer_lifetime: 31m
# The addresses of redis storage.
# If neither sentinel not cluster switched,
# only first address used
#addresses: ["127.0.0.1:6379"]
# Database to be selected after connecting to the server.
#db: 0
# Maximum number of socket connections, default is 10 per CPU
#pool_size: 10
# Use the specified login/username to authenticate the current connection
#login: ""
# Optional password
#password: ""
# Connect to sentinel nodes
#sentinel: false
# The master name
#sentinel_master: ""
# Connect to the redis cluster
#cluster: false
# The timeout for reading a command reply from redis.
#read_timeout: 15s
# The timeout for writing a command to redis.
#write_timeout: 15s
# Dial timeout for establishing new connections.
#connect_timeout: 15s
# This block defines configuration used for PostgreSQL storage.
# example peers table structure:
# - info_hash bytea
# - peer_id bytea
# - address inet or bytea
# - port int4
# - is_seeder bool
# - is_v6 bool
# - created timestamp
# example downloads table structure:
# - info_hash bytea
# - downloads int
#storage:
#name: pg
#config:
# connection string to pg storage. may be URL (postgres://...) or DSN (host=... port=...)
#connection_string: host=127.0.0.1 database=test user=postgres pool_max_conns=50
# query and parameters for announce operation
#announce:
#query: SELECT peer_id, address, port FROM mo_peers WHERE info_hash=@info_hash AND is_seeder=@is_seeder AND is_v6=@is_v6 LIMIT @count
#peer_id_column: peer_id
#address_column: address
#port_column: port
# queries to get/increment 'snached' (downloaded) count
#downloads:
#get_query: SELECT downloads FROM mo_downloads where info_hash=@info_hash
#inc_query: INSERT INTO mo_downloads VALUES(@info_hash) ON CONFLICT(info_hash) DO UPDATE SET downloads = mo_downloads.downloads + 1
# queries and parameters for add/delete/count peers operations
#peer:
#add_query: INSERT INTO mo_peers VALUES(@info_hash, @peer_id, @address, @port, @is_seeder, @is_v6, @created) ON CONFLICT (info_hash, peer_id, address, port) DO UPDATE SET created = EXCLUDED.created, is_seeder = EXCLUDED.is_seeder
#del_query: DELETE FROM mo_peers WHERE info_hash=@info_hash AND peer_id=@peer_id AND address=@address AND port=@port AND is_seeder=@is_seeder
#graduate_query: UPDATE mo_peers SET is_seeder=TRUE WHERE info_hash=@info_hash AND peer_id=peer_id AND address=@address AND port=@port AND NOT is_seeder
#count_query: SELECT COUNT(1) FILTER (WHERE is_seeder) AS seeders, COUNT(1) FILTER (WHERE NOT is_seeder) AS leechers FROM mo_peers
# predicate part of `count_query` to get count of peers by info hash
#by_info_hash_clause: WHERE info_hash = @info_hash
#count_seeders_column: seeders
#count_leechers_column: leechers
# queries for KV-store
#data:
#add_query: INSERT INTO mo_kv VALUES(@context, @key, @value) ON CONFLICT (context, name) DO NOTHING
# NOTE: in del_query @key parameter is array, NOT single value
#del_query: DELETE FROM mo_kv WHERE context=@context AND name = ANY(@key)
#get_query: SELECT value FROM mo_kv WHERE context=@context AND name=@key
# query for check if database is alive
#ping_query: SELECT 1
# query for garbage collection, expected parameter is timestamp
#gc_query: DELETE FROM mo_peers WHERE created <= @created
# The amount of time until a peer is considered stale.
# To avoid churn, keep this slightly larger than `announce_interval`
#peer_lifetime: 31m
# The frequency which stale peers are removed.
#gc_interval: 3m
# query for info hash statistics
#info_hash_count_query: SELECT COUNT(DISTINCT info_hash) as info_hashes FROM mo_peers
# The interval at which metrics about the number of info hashes and peers
# are collected and posted to Prometheus.
#prometheus_reporting_interval: 1s
# This block defines configuration used for middleware executed before a
# response has been returned to a BitTorrent client.
posthooks: []
prehooks:
# - name: jwt
# config:
@@ -308,4 +199,3 @@ prehooks:
# invert: false
# Name of storage context where store hash list
# storage_ctx: APPROVED_HASH
posthooks:
+116
View File
@@ -0,0 +1,116 @@
# @formatter:off
# Note: see `example_config.yaml` for `frontends` and `*hooks` config description
announce_interval: 30m
min_announce_interval: 15m
metrics_addr: ""
frontends:
- name: http
config:
addr: "0.0.0.0:6969"
tls: false
tls_cert_path: ""
tls_key_path: ""
reuse_port: true
read_timeout: 5s
write_timeout: 5s
enable_keepalive: false
idle_timeout: 30s
enable_request_timing: false
announce_routes:
- "/announce"
scrape_routes:
- "/scrape"
ping_routes:
- "/ping"
allow_ip_spoofing: false
filter_private_ips: false
real_ip_header: "x-real-ip"
max_numwant: 100
default_numwant: 50
max_scrape_infohashes: 50
- name: udp
config:
addr: "0.0.0.0:6969"
reuse_port: true
max_clock_skew: 10s
private_key: "paste a random string here that will be used to hmac connection IDs"
enable_request_timing: false
allow_ip_spoofing: false
filter_private_ips: false
max_numwant: 100
default_numwant: 50
max_scrape_infohashes: 50
# This block defines configuration used for PostgreSQL storage.
# example peers table structure:
# - info_hash bytea
# - peer_id bytea
# - address inet or bytea
# - port int4
# - is_seeder bool
# - is_v6 bool
# - created timestamp
# example downloads table structure:
# - info_hash bytea
# - downloads int
storage:
name: pg
config:
# connection string to pg storage. may be URL (postgres://...) or DSN (host=... port=...)
#connection_string: host=127.0.0.1 database=test user=postgres pool_max_conns=50
# query and parameters for announce operation
announce:
query: SELECT peer_id, address, port FROM mo_peers WHERE info_hash=@info_hash AND is_seeder=@is_seeder AND is_v6=@is_v6 LIMIT @count
peer_id_column: peer_id
address_column: address
port_column: port
# queries to get/increment 'snached' (downloaded) count
downloads:
get_query: SELECT downloads FROM mo_downloads where info_hash=@info_hash
inc_query: INSERT INTO mo_downloads VALUES(@info_hash) ON CONFLICT(info_hash) DO UPDATE SET downloads = mo_downloads.downloads + 1
# queries and parameters for add/delete/count peers operations
peer:
add_query: INSERT INTO mo_peers VALUES(@info_hash, @peer_id, @address, @port, @is_seeder, @is_v6, @created) ON CONFLICT (info_hash, peer_id, address, port) DO UPDATE SET created = EXCLUDED.created, is_seeder = EXCLUDED.is_seeder
del_query: DELETE FROM mo_peers WHERE info_hash=@info_hash AND peer_id=@peer_id AND address=@address AND port=@port AND is_seeder=@is_seeder
graduate_query: UPDATE mo_peers SET is_seeder=TRUE WHERE info_hash=@info_hash AND peer_id=peer_id AND address=@address AND port=@port AND NOT is_seeder
count_query: SELECT COUNT(1) FILTER (WHERE is_seeder) AS seeders, COUNT(1) FILTER (WHERE NOT is_seeder) AS leechers FROM mo_peers
# predicate part of `count_query` to get count of peers by info hash
by_info_hash_clause: WHERE info_hash = @info_hash
count_seeders_column: seeders
count_leechers_column: leechers
# queries for KV-store
data:
add_query: INSERT INTO mo_kv VALUES(@context, @key, @value) ON CONFLICT (context, name) DO NOTHING
# Note: in del_query @key parameter is array, NOT single value
del_query: DELETE FROM mo_kv WHERE context=@context AND name = ANY(@key)
get_query: SELECT value FROM mo_kv WHERE context=@context AND name=@key
# query for check if database is alive
ping_query: SELECT 1
# query for garbage collection, expected parameter is timestamp
gc_query: DELETE FROM mo_peers WHERE created <= @created
# The amount of time until a peer is considered stale.
# To avoid churn, keep this slightly larger than `announce_interval`
peer_lifetime: 31m
# The frequency which stale peers are removed.
gc_interval: 3m
# query for info hash statistics
info_hash_count_query: SELECT COUNT(DISTINCT info_hash) as info_hashes FROM mo_peers
# The interval at which metrics about the number of info hashes and peers
# are collected and posted to Prometheus.
prometheus_reporting_interval: 1s
posthooks: []
prehooks: []
+103
View File
@@ -0,0 +1,103 @@
# @formatter:off
# Note: see `example_config.yaml` for `frontends` and `*hooks` config description
announce_interval: 30m
min_announce_interval: 15m
metrics_addr: ""
frontends:
- name: http
config:
addr: "0.0.0.0:6969"
tls: false
tls_cert_path: ""
tls_key_path: ""
reuse_port: true
read_timeout: 5s
write_timeout: 5s
enable_keepalive: false
idle_timeout: 30s
enable_request_timing: false
announce_routes:
- "/announce"
scrape_routes:
- "/scrape"
ping_routes:
- "/ping"
allow_ip_spoofing: false
filter_private_ips: false
real_ip_header: "x-real-ip"
max_numwant: 100
default_numwant: 50
max_scrape_infohashes: 50
- name: udp
config:
addr: "0.0.0.0:6969"
reuse_port: true
max_clock_skew: 10s
private_key: "paste a random string here that will be used to hmac connection IDs"
enable_request_timing: false
allow_ip_spoofing: false
filter_private_ips: false
max_numwant: 100
default_numwant: 50
max_scrape_infohashes: 50
# This block defines configuration used for redis storage.
storage:
# If used keydb fork, set `keydb` name
name: redis
config:
# The frequency which stale peers are removed.
# This balances between
# - collecting garbage more often, potentially using more CPU time, but potentially using less memory (lower value)
# - collecting garbage less frequently, saving CPU time, but keeping old peers long, thus using more memory (higher value).
gc_interval: 3m
# The interval at which metrics about the number of infohashes and peers
# are collected and posted to Prometheus.
prometheus_reporting_interval: 1s
# The amount of time until a peer is considered stale.
# To avoid churn, keep this slightly larger than `announce_interval`
peer_lifetime: 31m
# The addresses of redis storage.
# If neither sentinel not cluster switched,
# only first address used
addresses: ["127.0.0.1:6379"]
# Database to be selected after connecting to the server.
db: 0
# Maximum number of socket connections, default is 10 per CPU
pool_size: 10
# Use the specified login/username to authenticate the current connection
login: ""
# Optional password
password: ""
# Connect to sentinel nodes
sentinel: false
# The master name
sentinel_master: ""
# Connect to the redis cluster
cluster: false
# The timeout for reading a command reply from redis.
read_timeout: 15s
# The timeout for writing a command to redis.
write_timeout: 15s
# Dial timeout for establishing new connections.
connect_timeout: 15s
posthooks: []
prehooks: []
+1
View File
@@ -44,6 +44,7 @@ func RegisterBuilder(name string, b Builder) {
builders[name] = b
}
// Frontend dummy interface for bittorrent frontends
type Frontend interface {
stop.Stopper
}
+10 -12
View File
@@ -21,16 +21,14 @@ import (
"github.com/sot-tech/mochi/pkg/stop"
)
const Name = "http"
var (
logger = log.NewLogger(Name)
logger = log.NewLogger("frontend/http")
errTLSNotProvided = errors.New("tls certificate/key not provided")
errRoutesNotProvided = errors.New("routes not provided")
)
func init() {
frontend.RegisterBuilder(Name, newFrontend)
frontend.RegisterBuilder("http", NewFrontend)
}
// Config represents all configurable options for an HTTP BitTorrent Frontend
@@ -51,9 +49,8 @@ const defaultIdleTimeout = 30 * time.Second
// Validate sanity checks values set in a config and returns a new config with
// default values replacing anything that is invalid.
//
// This function warns to the logger when a value is changed.
func (cfg Config) Validate() (validCfg Config, err error) {
validCfg = cfg
if validCfg.ListenOptions, err = cfg.ListenOptions.Validate(); err != nil {
return
}
@@ -66,7 +63,7 @@ func (cfg Config) Validate() (validCfg Config, err error) {
if cfg.EnableKeepAlive {
// If keepalive is disabled, this configuration isn't used anyway.
logger.Warn().
Str("name", "http.IdleTimeout").
Str("name", "IdleTimeout").
Dur("provided", cfg.IdleTimeout).
Dur("default", validCfg.IdleTimeout).
Msg("falling back to default configuration")
@@ -83,7 +80,8 @@ type httpFE struct {
ParseOptions
}
func newFrontend(c conf.MapConfig, logic *middleware.Logic) (_ frontend.Frontend, err error) {
// NewFrontend builds and starts http bittorrent frontend from provided configuration
func NewFrontend(c conf.MapConfig, logic *middleware.Logic) (_ frontend.Frontend, err error) {
var cfg Config
if err = c.Unmarshal(&cfg); err != nil {
return
@@ -137,7 +135,9 @@ func newFrontend(c conf.MapConfig, logic *middleware.Logic) (_ frontend.Frontend
go func() {
ln, err := cfg.ListenTCP()
defer logger.Err(ln.Close()).Msg("closing listener")
defer func() {
logger.Err(ln.Close()).Msg("closing listener")
}()
if err == nil {
if f.srv.TLSConfig == nil {
err = f.srv.Serve(ln)
@@ -145,9 +145,7 @@ func newFrontend(c conf.MapConfig, logic *middleware.Logic) (_ frontend.Frontend
err = f.srv.ServeTLS(ln, "", "")
}
}
if errors.Is(err, http.ErrServerClosed) {
err = nil
} else {
if !errors.Is(err, http.ErrServerClosed) {
logger.Fatal().Err(err).Msg("server failed")
}
}()
+37 -33
View File
@@ -3,12 +3,11 @@ package http
import (
"bytes"
"errors"
"net"
"net/http"
"strconv"
"time"
"github.com/anacrolix/torrent/bencode"
"github.com/sot-tech/mochi/bittorrent"
"github.com/sot-tech/mochi/pkg/bytepool"
)
@@ -36,7 +35,6 @@ func WriteAnnounceResponse(w http.ResponseWriter, resp *bittorrent.AnnounceRespo
if resp.Interval > 0 {
resp.Interval /= time.Second
}
if resp.Interval > 0 {
resp.MinInterval /= time.Second
}
@@ -54,18 +52,9 @@ func WriteAnnounceResponse(w http.ResponseWriter, resp *bittorrent.AnnounceRespo
// Add the peers to the dictionary in the compact format.
if resp.Compact {
// Add the IPv4 peers to the dictionary.
bb.WriteString("5:peersl")
for _, peer := range resp.IPv4Peers {
compactAddress(bb, peer)
}
bb.WriteByte('e')
compactAddresses(bb, resp.IPv4Peers, false)
// Add the IPv6 peers to the dictionary.
bb.WriteString("6:peers6l")
for _, peer := range resp.IPv6Peers {
compactAddress(bb, peer)
}
bb.WriteByte('e')
compactAddresses(bb, resp.IPv6Peers, true)
} else {
// Add the peers to the dictionary.
bb.WriteString("5:peersl")
@@ -83,14 +72,23 @@ func WriteAnnounceResponse(w http.ResponseWriter, resp *bittorrent.AnnounceRespo
return err
}
func compactAddress(bb *bytes.Buffer, peer bittorrent.Peer) {
addr, port := peer.Addr().AsSlice(), peer.Port()
bb.WriteString(strconv.Itoa(len(addr) + 2))
bb.WriteByte(':')
bb.Write(addr)
bb.WriteByte(byte(port >> 8))
bb.WriteByte(byte(port))
return
func compactAddresses(bb *bytes.Buffer, peers bittorrent.Peers, v6 bool) {
l := len(peers)
if l > 0 {
key, al := "5:peers", net.IPv4len
if v6 {
key, al = "6:peers6", net.IPv6len
}
bb.WriteString(key)
bb.WriteString(strconv.Itoa((al + 2) * l))
bb.WriteByte(':')
for _, peer := range peers {
bb.Write(peer.Addr().AsSlice())
port := peer.Port()
bb.WriteByte(byte(port >> 8))
bb.WriteByte(byte(port))
}
}
}
func dictAddress(bb *bytes.Buffer, peer bittorrent.Peer) {
@@ -100,7 +98,7 @@ func dictAddress(bb *bytes.Buffer, peer bittorrent.Peer) {
bb.WriteByte(':')
bb.WriteString(addr)
bb.WriteString("7:peer id20:")
bb.WriteString(peer.ID.RawString())
bb.Write(peer.ID[:])
bb.WriteString("4:porti")
bb.WriteString(strconv.FormatUint(uint64(peer.Port()), 10))
bb.WriteString("ee")
@@ -109,16 +107,22 @@ func dictAddress(bb *bytes.Buffer, peer bittorrent.Peer) {
// WriteScrapeResponse communicates the results of a Scrape to a BitTorrent
// client over HTTP.
func WriteScrapeResponse(w http.ResponseWriter, resp *bittorrent.ScrapeResponse) error {
filesDict := make(map[bittorrent.InfoHash]any, len(resp.Files))
bb := respBufferPool.Get()
defer respBufferPool.Put(bb)
bb.WriteString("d5:filesd")
for _, scrape := range resp.Files {
filesDict[scrape.InfoHash] = map[string]any{
"complete": scrape.Complete,
"downloaded": scrape.Snatches,
"incomplete": scrape.Incomplete,
}
bb.WriteString(strconv.Itoa(len(scrape.InfoHash)))
bb.WriteByte(':')
bb.Write([]byte(scrape.InfoHash))
bb.WriteString("d8:completei")
bb.WriteString(strconv.FormatUint(uint64(scrape.Complete), 10))
bb.WriteString("e10:downloadedi")
bb.WriteString(strconv.FormatUint(uint64(scrape.Snatches), 10))
bb.WriteString("e10:incompletei")
bb.WriteString(strconv.FormatUint(uint64(scrape.Incomplete), 10))
bb.WriteString("ee")
}
return bencode.NewEncoder(w).Encode(map[string]any{
"files": filesDict,
})
bb.WriteString("ee")
_, err := bb.WriteTo(w)
return err
}
+13 -4
View File
@@ -14,7 +14,8 @@ const (
)
var (
errAddressNotProvided = errors.New("address not provided")
// ErrAddressNotProvided returned if listen address not provided in configuration
ErrAddressNotProvided = errors.New("address not provided")
errUnexpectedListenerType = errors.New("unexpected listener type")
)
@@ -27,15 +28,17 @@ type ListenOptions struct {
EnableRequestTiming bool `cfg:"enable_request_timing"`
}
// Validate checks if listen address provided and sets default
// timeout options if needed
func (lo ListenOptions) Validate() (validOptions ListenOptions, err error) {
validOptions = lo
if len(lo.Addr) == 0 {
err = errAddressNotProvided
err = ErrAddressNotProvided
} else {
if lo.ReadTimeout <= 0 {
validOptions.ReadTimeout = defaultReadTimeout
logger.Warn().
Str("name", "http.ReadTimeout").
Str("name", "ReadTimeout").
Dur("provided", lo.ReadTimeout).
Dur("default", validOptions.ReadTimeout).
Msg("falling back to default configuration")
@@ -44,7 +47,7 @@ func (lo ListenOptions) Validate() (validOptions ListenOptions, err error) {
if lo.WriteTimeout <= 0 {
validOptions.WriteTimeout = defaultWriteTimeout
logger.Warn().
Str("name", "http.WriteTimeout").
Str("name", "WriteTimeout").
Dur("provided", lo.WriteTimeout).
Dur("default", validOptions.WriteTimeout).
Msg("falling back to default configuration")
@@ -53,6 +56,9 @@ func (lo ListenOptions) Validate() (validOptions ListenOptions, err error) {
return
}
// ListenTCP listens at the given TCP Addr
// with SO_REUSEPORT and SO_REUSEADDR options enabled if
// ReusePort set to true
func (lo ListenOptions) ListenTCP() (conn *net.TCPListener, err error) {
if lo.ReusePort {
var ln net.Listener
@@ -71,6 +77,9 @@ func (lo ListenOptions) ListenTCP() (conn *net.TCPListener, err error) {
return
}
// ListenUDP listens at the given UDP Addr
// with SO_REUSEPORT and SO_REUSEADDR options enabled if
// ReusePort set to true
func (lo ListenOptions) ListenUDP() (conn *net.UDPConn, err error) {
if lo.ReusePort {
var ln net.PacketConn
+54 -68
View File
@@ -13,8 +13,6 @@ import (
"sync"
"time"
"github.com/libp2p/go-reuseport"
"github.com/sot-tech/mochi/bittorrent"
"github.com/sot-tech/mochi/frontend"
"github.com/sot-tech/mochi/middleware"
@@ -26,19 +24,16 @@ import (
"github.com/sot-tech/mochi/pkg/timecache"
)
const Name = "http"
var (
logger = log.NewLogger(Name)
logger = log.NewLogger("frontend/udp")
allowedGeneratedPrivateKeyRunes = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890")
errUnexpectedConnType = errors.New("unexpected connection type (not UDPConn)")
)
func init() {
frontend.RegisterBuilder(Name, newFrontend)
frontend.RegisterBuilder("udp", NewFrontend)
}
// Config represents all of the configurable options for a UDP BitTorrent
// Config represents all the configurable options for a UDP BitTorrent
// Tracker.
type Config struct {
frontend.ListenOptions
@@ -49,10 +44,13 @@ type Config struct {
// Validate sanity checks values set in a config and returns a new config with
// default values replacing anything that is invalid.
//
// This function warns to the logger when a value is changed.
func (cfg Config) Validate() Config {
validcfg := cfg
func (cfg Config) Validate() (validCfg Config, err error) {
if len(cfg.Addr) == 0 {
err = frontend.ErrAddressNotProvided
return
}
validCfg = cfg
// Generate a private key if one isn't provided by the user.
if cfg.PrivateKey == "" {
@@ -60,43 +58,49 @@ func (cfg Config) Validate() Config {
for i := range pkeyRunes {
pkeyRunes[i] = allowedGeneratedPrivateKeyRunes[rand.Intn(len(allowedGeneratedPrivateKeyRunes))]
}
validcfg.PrivateKey = string(pkeyRunes)
validCfg.PrivateKey = string(pkeyRunes)
logger.Warn().
Str("name", "UDP.PrivateKey").
Str("name", "PrivateKey").
Str("provided", "").
Str("key", validcfg.PrivateKey).
Str("key", validCfg.PrivateKey).
Msg("falling back to default configuration")
}
validcfg.ParseOptions = cfg.ParseOptions.Validate()
validCfg.ParseOptions = cfg.ParseOptions.Validate()
return validcfg
return
}
// udpFE holds the state of a UDP BitTorrent Frontend.
type udpFE struct {
socket *net.UDPConn
closing chan struct{}
wg sync.WaitGroup
genPool *sync.Pool
logic *middleware.Logic
Config
socket *net.UDPConn
closing chan any
wg sync.WaitGroup
genPool *sync.Pool
logic *middleware.Logic
maxClockSkew time.Duration
collectTimings bool
frontend.ParseOptions
}
func newFrontend(c conf.MapConfig, logic *middleware.Logic) (frontend.Frontend, error) {
var provided Config
if err := c.Unmarshal(&provided); err != nil {
// NewFrontend builds and starts udp bittorrent frontend from provided configuration
func NewFrontend(c conf.MapConfig, logic *middleware.Logic) (frontend.Frontend, error) {
var err error
var cfg Config
if err = c.Unmarshal(&cfg); err != nil {
return nil, err
}
if cfg, err = cfg.Validate(); err != nil {
return nil, err
}
cfg := provided.Validate()
f := &udpFE{
closing: make(chan struct{}),
logic: logic,
Config: cfg,
closing: make(chan any),
logic: logic,
maxClockSkew: cfg.MaxClockSkew,
collectTimings: cfg.EnableRequestTiming,
ParseOptions: cfg.ParseOptions,
genPool: &sync.Pool{
New: func() any {
return NewConnectionIDGenerator(cfg.PrivateKey)
@@ -104,18 +108,16 @@ func newFrontend(c conf.MapConfig, logic *middleware.Logic) (frontend.Frontend,
},
}
if err := f.listen(); err != nil {
return nil, err
if f.socket, err = cfg.ListenUDP(); err == nil {
f.wg.Add(1)
go func() {
if err := f.serve(); err != nil {
logger.Fatal().Err(err).Msg("server failed")
}
}()
}
f.wg.Add(1)
go func() {
if err := f.serve(); err != nil {
logger.Fatal().Err(err).Msg("failed while serving")
}
}()
return f, nil
return f, err
}
// Stop provides a thread-safe way to shut down a currently running Frontend.
@@ -129,34 +131,18 @@ func (t *udpFE) Stop() stop.Result {
c := make(stop.Channel)
go func() {
close(t.closing)
_ = t.socket.SetReadDeadline(time.Now())
t.wg.Wait()
c.Done(t.socket.Close())
var err error
if t.socket != nil {
_ = t.socket.SetReadDeadline(time.Now())
t.wg.Wait()
err = t.socket.Close()
}
c.Done(err)
}()
return c.Result()
}
// listen resolves the address and binds the server socket.
func (t *udpFE) listen() (err error) {
if t.ReusePort {
var ln net.PacketConn
if ln, err = reuseport.ListenPacket("udp", t.Addr); err == nil {
var isOk bool
if t.socket, isOk = ln.(*net.UDPConn); !isOk {
err = errUnexpectedConnType
}
}
} else {
var udpAddr *net.UDPAddr
udpAddr, err = net.ResolveUDPAddr("udp", t.Addr)
if err == nil {
t.socket, err = net.ListenUDP("udp", udpAddr)
}
}
return err
}
// serve blocks while listening and serving UDP BitTorrent requests
// until Stop() is called or an error is returned.
func (t *udpFE) serve() error {
@@ -199,14 +185,14 @@ func (t *udpFE) serve() error {
// Handle the request.
addr := addrPort.Addr().Unmap()
var start time.Time
if t.EnableRequestTiming && metrics.Enabled() {
if t.collectTimings && metrics.Enabled() {
start = time.Now()
}
action, err := t.handleRequest(
Request{(*buffer)[:n], addr},
ResponseWriter{t.socket, addrPort},
)
if t.EnableRequestTiming && metrics.Enabled() {
if t.collectTimings && metrics.Enabled() {
recordResponseDuration(action, addr, err, time.Since(start))
}
}()
@@ -251,7 +237,7 @@ func (t *udpFE) handleRequest(r Request, w ResponseWriter) (actionName string, e
// If this isn't requesting a new connection ID and the connection ID is
// invalid, then fail.
if actionID != connectActionID && !gen.Validate(connID, r.IP, timecache.Now(), t.MaxClockSkew) {
if actionID != connectActionID && !gen.Validate(connID, r.IP, timecache.Now(), t.maxClockSkew) {
err = errBadConnectionID
WriteError(w, txID, err)
return
+5 -2
View File
@@ -17,12 +17,15 @@ func init() {
}
func TestStartStopRaceIssue437(t *testing.T) {
ps, err := storage.NewStorage("memory", conf.MapConfig{})
ps, err := storage.NewStorage(conf.NamedMapConfig{
Name: "memory",
Config: conf.MapConfig{},
})
if err != nil {
t.Fatal(err)
}
lgc := middleware.NewLogic(0, 0, ps, nil, nil)
fe, err := udp.newFrontend(conf.MapConfig{"addr": "127.0.0.1:0"}, lgc)
fe, err := udp.NewFrontend(conf.MapConfig{"addr": "127.0.0.1:0"}, lgc)
if err != nil {
t.Fatal(err)
}
+6 -8
View File
@@ -23,19 +23,17 @@ import (
"github.com/sot-tech/mochi/storage"
)
// Name is the name by which this middleware is registered with Conf.
const (
Name = "jwt"
authorizationHeader = "authorization"
bearerAuthPrefix = "bearer "
)
func init() {
middleware.RegisterBuilder(Name, build)
middleware.RegisterBuilder("jwt", build)
}
var (
logger = log.NewLogger(Name)
logger = log.NewLogger("middleware/jwt")
// ErrMissingJWT is returned when a JWT is missing from a request.
ErrMissingJWT = bittorrent.ClientError("unapproved request: missing jwt")
@@ -74,7 +72,7 @@ func build(config conf.MapConfig, _ storage.PeerStorage) (h middleware.Hook, err
var cfg Config
if err = config.Unmarshal(&cfg); err != nil {
return nil, fmt.Errorf("middleware %s: %w", Name, err)
return nil, fmt.Errorf("unable to deserialise configuration: %w", err)
}
if len(cfg.JWKSetURL) > 0 && len(cfg.Issuer) > 0 && len(cfg.Audience) > 0 {
@@ -204,7 +202,7 @@ func (h *hook) HandleScrape(ctx context.Context, req *bittorrent.ScrapeRequest,
if errs := h.validateBaseJWT(jwtParam, claims); len(errs) > 0 {
logger.Info().
Errs("errors", errs).
Array("source", req.RequestAddresses).
Array("source", &req.RequestAddresses).
Msg("JWT validation failed")
err = ErrInvalidJWT
} else {
@@ -215,7 +213,7 @@ func (h *hook) HandleScrape(ctx context.Context, req *bittorrent.ScrapeRequest,
} else {
logger.Info().
Err(err).
Array("addresses", req.RequestAddresses).
Array("addresses", &req.RequestAddresses).
Msg("'infohashes' claim parse failed")
}
}
@@ -239,7 +237,7 @@ func (h *hook) HandleScrape(ctx context.Context, req *bittorrent.ScrapeRequest,
logger.Info().
Array("claimInfoHashes", claimIHs).
Array("requestInfoHashes", req.InfoHashes).
Array("addresses", req.RequestAddresses).
Array("addresses", &req.RequestAddresses).
Msg("unequal 'infohashes' claim when validating JWT")
err = ErrInvalidJWT
}
@@ -20,13 +20,10 @@ import (
"github.com/sot-tech/mochi/storage"
)
// Name of this container for registry
const Name = "directory"
var logger = log.NewLogger("torrent approval directory")
var logger = log.NewLogger("middleware/torrent approval/directory")
func init() {
container.Register(Name, build)
container.Register("directory", build)
}
// Config - implementation of directory container configuration.
@@ -12,13 +12,10 @@ import (
"github.com/sot-tech/mochi/storage"
)
// Name of this container for registry.
const Name = "list"
var logger = log.NewLogger("torrent approval list")
var logger = log.NewLogger("middleware/torrent approval/list")
func init() {
container.Register(Name, build)
container.Register("list", build)
}
// Config - implementation of list container configuration.
+2
View File
@@ -52,11 +52,13 @@ func (m MapConfig) Unmarshal(into any) (err error) {
return
}
// NamedMapConfig encapsulates MapConfig with string Name
type NamedMapConfig struct {
Name string
Config MapConfig
}
// MarshalZerologObject writes Name and Config into zerolog event
func (nm NamedMapConfig) MarshalZerologObject(e *zerolog.Event) {
e.Str("name", nm.Name).Dict("config", zerolog.Dict().EmbedObject(nm.Config))
}
+3 -7
View File
@@ -24,14 +24,10 @@ import (
r "github.com/sot-tech/mochi/storage/redis"
)
// Name is name of this storage
const (
Name = "keydb"
expireMemberCmd = "EXPIREMEMBER"
)
const expireMemberCmd = "EXPIREMEMBER"
var (
logger = log.NewLogger(Name)
logger = log.NewLogger("storage/keydb")
// errNotKeyDB returned from initializer if connected does not support KeyDB
// specific command (EXPIREMEMBER)
errNotKeyDB = errors.New("provided instance seems not KeyDB")
@@ -39,7 +35,7 @@ var (
func init() {
// Register the storage driver.
storage.RegisterDriver(Name, builder)
storage.RegisterDriver("keydb", builder)
}
func builder(icfg conf.MapConfig) (storage.PeerStorage, error) {
+3 -7
View File
@@ -19,17 +19,13 @@ import (
)
// Default config constants.
const (
// Name is the name by which this peer store is registered with Conf.
Name = "memory"
defaultShardCount = 1024
)
const defaultShardCount = 1024
var logger = log.NewLogger(Name)
var logger = log.NewLogger("storage/memory")
func init() {
// Register the storage driver.
storage.RegisterDriver(Name, builder)
storage.RegisterDriver("memory", builder)
}
func builder(icfg conf.MapConfig) (storage.PeerStorage, error) {
+2 -6
View File
@@ -26,9 +26,6 @@ import (
)
const (
// Name is the name by which this peer store is registered with Conf.
Name = "pg"
defaultPingQuery = "SELECT 0"
errRequiredParameterNotSetMsg = "required parameter not provided: %s"
@@ -49,14 +46,13 @@ const (
)
var (
logger = log.NewLogger(Name)
logger = log.NewLogger("storage/pg")
errConnectionStringNotProvided = errors.New("database connection string not provided")
)
func init() {
// Register the storage builder.
storage.RegisterDriver(Name, builder)
storage.RegisterDriver("pg", builder)
}
func builder(icfg conf.MapConfig) (storage.PeerStorage, error) {
+2 -4
View File
@@ -42,8 +42,6 @@ import (
)
const (
// Name is the name by which this peer store is registered with Conf.
Name = "redis"
// Default config constants.
defaultRedisAddress = "127.0.0.1:6379"
defaultReadTimeout = time.Second * 15
@@ -70,14 +68,14 @@ const (
)
var (
logger = log.NewLogger(Name)
logger = log.NewLogger("storage/redis")
// errSentinelAndClusterChecked returned from initializer if both Config.Sentinel and Config.Cluster provided
errSentinelAndClusterChecked = errors.New("unable to use both cluster and sentinel mode")
)
func init() {
// Register the storage builder.
storage.RegisterDriver(Name, builder)
storage.RegisterDriver("redis", builder)
}
func builder(icfg conf.MapConfig) (storage.PeerStorage, error) {