(tested) refactor code

* add sentinel master parameter into driver config
* replace yaml double deserialization with `mapstructure` in initializers
* replace struct initializers with registered functions
* add torrent approval MD and a sanitize rest MDs
This commit is contained in:
Lawrence, Rendall
2022-04-16 00:21:47 +03:00
parent 397e106396
commit 8cd8343757
32 changed files with 473 additions and 385 deletions
+18 -33
View File
@@ -3,12 +3,11 @@ package main
import (
"errors"
"os"
"time"
"gopkg.in/yaml.v3"
"github.com/sot-tech/mochi/frontend/http"
"github.com/sot-tech/mochi/frontend/udp"
"github.com/sot-tech/mochi/middleware"
"github.com/sot-tech/mochi/pkg/conf"
// Imports to register middleware drivers.
_ "github.com/sot-tech/mochi/middleware/clientapproval"
@@ -21,38 +20,24 @@ import (
_ "github.com/sot-tech/mochi/storage/redis"
)
type storageConfig struct {
Name string `yaml:"name"`
Config any `yaml:"config"`
}
// Config represents the configuration used for executing Conf.
type Config struct {
middleware.ResponseConfig `yaml:",inline"`
MetricsAddr string `yaml:"metrics_addr"`
HTTPConfig http.Config `yaml:"http"`
UDPConfig udp.Config `yaml:"udp"`
Storage storageConfig `yaml:"storage"`
PreHooks []middleware.Config `yaml:"prehooks"`
PostHooks []middleware.Config `yaml:"posthooks"`
}
// PreHookNames returns only the names of the configured middleware.
func (cfg Config) PreHookNames() (names []string) {
for _, hook := range cfg.PreHooks {
names = append(names, hook.Name)
}
return
}
// PostHookNames returns only the names of the configured middleware.
func (cfg Config) PostHookNames() (names []string) {
for _, hook := range cfg.PostHooks {
names = append(names, hook.Name)
}
return
// TODO(jzelinskie): Evaluate whether we would like to make
// AnnounceInterval and MinAnnounceInterval optional.
// We can make Conf extensible enough that you can program a new response
// generator at the cost of making it possible for users to create config that
// won't compose a functional tracker.
AnnounceInterval time.Duration `yaml:"announce_interval"`
MinAnnounceInterval time.Duration `yaml:"min_announce_interval"`
MetricsAddr string `yaml:"metrics_addr"`
HTTPConfig conf.MapConfig `yaml:"http"`
UDPConfig conf.MapConfig `yaml:"udp"`
Storage struct {
Name string `yaml:"name"`
Config conf.MapConfig `yaml:"config"`
} `yaml:"storage"`
PreHooks []conf.MapConfig `yaml:"prehooks"`
PostHooks []conf.MapConfig `yaml:"posthooks"`
}
// ConfigFile represents a namespaced YAML configation file.
+12 -13
View File
@@ -15,6 +15,7 @@ import (
"github.com/sot-tech/mochi/frontend/http"
"github.com/sot-tech/mochi/frontend/udp"
"github.com/sot-tech/mochi/middleware"
"github.com/sot-tech/mochi/pkg/conf"
"github.com/sot-tech/mochi/pkg/log"
"github.com/sot-tech/mochi/pkg/metrics"
_ "github.com/sot-tech/mochi/pkg/randseed"
@@ -79,28 +80,26 @@ func (r *Run) Start(ps storage.Storage) error {
return fmt.Errorf("failed to validate hook config: %w", err)
}
log.Info("starting tracker logic", log.Fields{
"prehooks": cfg.PreHookNames(),
"posthooks": cfg.PostHookNames(),
})
r.logic = middleware.NewLogic(cfg.ResponseConfig, r.storage, preHooks, postHooks)
r.logic = middleware.NewLogic(cfg.AnnounceInterval, cfg.MinAnnounceInterval, r.storage, preHooks, postHooks)
if cfg.HTTPConfig.Addr != "" {
if len(cfg.HTTPConfig) > 0 {
log.Info("starting HTTP frontend", cfg.HTTPConfig)
httpfe, err := http.NewFrontend(r.logic, cfg.HTTPConfig)
if err != nil {
httpFE, err := http.NewFrontend(r.logic, cfg.HTTPConfig)
if err == nil {
r.sg.Add(httpFE)
} else if !errors.Is(err, conf.ErrNilConfigMap) {
return err
}
r.sg.Add(httpfe)
}
if cfg.UDPConfig.Addr != "" {
if len(cfg.UDPConfig) > 0 {
log.Info("starting UDP frontend", cfg.UDPConfig)
udpfe, err := udp.NewFrontend(r.logic, cfg.UDPConfig)
if err != nil {
udpFE, err := udp.NewFrontend(r.logic, cfg.UDPConfig)
if err == nil {
r.sg.Add(udpFE)
} else if !errors.Is(err, conf.ErrNilConfigMap) {
return err
}
r.sg.Add(udpfe)
}
return nil
+78 -53
View File
@@ -140,66 +140,91 @@ mochi:
prometheus_reporting_interval: 1s
# This block defines configuration used for redis storage.
# storage:
# name: redis
# config:
# # The frequency which stale peers are removed.
# # This balances between
# # - collecting garbage more often, potentially using more CPU time, but potentially using less memory (lower value)
# # - collecting garbage less frequently, saving CPU time, but keeping old peers long, thus using more memory (higher value).
# gc_interval: 3m
#storage:
#name: redis
#config:
# The frequency which stale peers are removed.
# This balances between
# - collecting garbage more often, potentially using more CPU time, but potentially using less memory (lower value)
# - collecting garbage less frequently, saving CPU time, but keeping old peers long, thus using more memory (higher value).
#gc_interval: 3m
# # The interval at which metrics about the number of infohashes and peers
# # are collected and posted to Prometheus.
# prometheus_reporting_interval: 1s
# The interval at which metrics about the number of infohashes and peers
# are collected and posted to Prometheus.
#prometheus_reporting_interval: 1s
# # The amount of time until a peer is considered stale.
# # To avoid churn, keep this slightly larger than `announce_interval`
# peer_lifetime: 31m
# The amount of time until a peer is considered stale.
# To avoid churn, keep this slightly larger than `announce_interval`
#peer_lifetime: 31m
# # The address of redis storage.
# redis_broker: "redis://pwd@127.0.0.1:6379/0"
# The addresses of redis storage.
# If neither sentinel not cluster switched,
# only first address used
#addresses: ["127.0.0.1:6379"]
# # The timeout for reading a command reply from redis.
# read_timeout: 15s
# Database to be selected after connecting to the server.
#db: 0
# # The timeout for writing a command to redis.
# write_timeout: 15s
# Maximum number of socket connections, default is 10 per CPU
#pool_size: 10
# # The timeout for connecting to redis server.
# connect_timeout: 15s
# Use the specified login/username to authenticate the current connection
#login: ""
# Optional password
#password: ""
# Connect to sentinel nodes
#sentinel: false
# The master name
#sentinel_master: ""
# Connect to the redis cluster
#cluster: false
# The timeout for reading a command reply from redis.
#read_timeout: 15s
# The timeout for writing a command to redis.
#write_timeout: 15s
# Dial timeout for establishing new connections.
#connect_timeout: 15s
# This block defines configuration used for middleware executed before a
# response has been returned to a BitTorrent client.
prehooks:
#- name: jwt
# options:
# issuer: "https://issuer.com"
# audience: "https://some.issuer.com"
# jwk_set_url: "https://issuer.com/keys"
# jwk_set_update_interval: 5m
#- name: client approval
# options:
# whitelist:
# - "OP1011"
# blacklist:
# - "OP1012"
#- name: interval variation
# options:
# modify_response_probability: 0.2
# max_increase_delta: 60
# modify_min_interval: true
# This block defines configuration used for torrent approval, it requires to be given
# hashes for whitelist or for blacklist. Hashes are hexadecimal-encoaded.
#- name: torrent approval
# options:
# initial_source: list
# configuration:
# hash_list:
# - "a1b2c3d4e5a1b2c3d4e5a1b2c3d4e5a1b2c3d4e5"
# invert: false
# storage_ctx: APPROVED_HASH
# - name: jwt
# options:
# issuer: "https://issuer.com"
# audience: "https://some.issuer.com"
# jwk_set_url: "https://issuer.com/keys"
# jwk_set_update_interval: 5m
#
# - name: client approval
# options:
# whitelist:
# - "OP1011"
# blacklist:
# - "OP1012"
#
# - name: interval variation
# options:
# modify_response_probability: 0.2
# max_increase_delta: 60
# modify_min_interval: true
#
# This block defines configuration used for torrent approval, it requires to be given
# hashes for whitelist or for blacklist. Hashes are hexadecimal-encoaded.
# - name: torrent approval
# options:
# initial_source: list
# configuration:
# hash_list:
# - "a1b2c3d4e5a1b2c3d4e5a1b2c3d4e5a1b2c3d4e5"
# true - whitelist mode, false - blacklist
# invert: false
# Name of storage context where store hash list
# storage_ctx: APPROVED_HASH
posthooks:
+15 -5
View File
@@ -2,12 +2,22 @@
### Overview
BitTorrent clients send Announce and Scrape requests to a _Frontend_. Frontends parse requests and write responses for
the particular protocol they implement. The _TrackerLogic_ interface is used to generate responses for requests and
optionally perform a task after responding to a client. A configurable chain of _PreHook_ and _PostHook_ middleware is
used to construct an instance of TrackerLogic. PreHooks are middleware that are executed before the response has been
BitTorrent clients send Announce and Scrape requests to a _Frontend_.
Frontends parse requests and write responses for
the particular protocol they implement.
The _TrackerLogic_ interface is used to generate responses for requests and
optionally perform a task after responding to a client.
A configurable chain of _PreHook_ and _PostHook_ middleware is
used to construct an instance of TrackerLogic.
PreHooks are middleware that are executed before the response has been
written. After all PreHooks have executed, any missing response fields that are required are filled by reading out of
the configured implementation of the _Storage_ interface. PostHooks are asynchronous tasks that occur after a response
the configured implementation of the _Storage_ interface.
PostHooks are asynchronous tasks that occur after a response
has been delivered to the client. Because they are unnecessary to for generating a response, updates to the Storage for
a particular request are done asynchronously in a PostHook.
+3 -3
View File
@@ -1,10 +1,10 @@
# Frontends
A _Frontend_ is a component of Chihaya that serves a BitTorrent tracker on one protocol. The frontend accepts, parses
A _Frontend_ is a component of MoChi that serves a BitTorrent tracker on one protocol. The frontend accepts, parses
and sanitizes requests, passes them to the _Logic_ and writes responses to _Clients_.
This documentation first gives a high-level overview of Frontends and later goes into implementation specifics. Users of
Chihaya are expected to just read the first part - developers should read both.
MoChi are expected to just read the first part - developers should read both.
## Functionality
@@ -19,7 +19,7 @@ answers each of them with one response, a basic overview of the control flow is:
## Available Frontends
Chihaya ships with frontends for HTTP(S) and UDP. The HTTP frontend uses Go's `http` package. The UDP frontend
MoChi ships with frontends for HTTP(S) and UDP. The HTTP frontend uses Go's `http` package. The UDP frontend
implements both [old-opentracker-style] IPv6 and the IPv6 support specified in [BEP 15]. The advantage of the old
opentracker style is that it contains a usable IPv6 `ip` field, to enable IP overrides in announces.
+6 -6
View File
@@ -28,10 +28,10 @@ An example config might look like this:
```yaml
mochi:
prehooks:
- name: interval variation
config:
modify_response_probability: 0.2
max_increase_delta: 60
modify_min_interval: true
prehooks:
- name: interval variation
config:
modify_response_probability: 0.2
max_increase_delta: 60
modify_min_interval: true
```
+64
View File
@@ -0,0 +1,64 @@
# Approved torrents list
Package `torrentapproval` can be used for only allow or block
specified hashes or block specified hashes.
## Functionality
As said above, there are two modes of approval: white list and black list.
If mode is **white list** (`invert` set to `false`), tracker works in
_semi-private_ mode, which means, than peers can could share and receive info
about only specified list of torrents' hashes.
I.e.: if configuration contains hash `AAAA`, but peer announces hash `BBBB`
tracker will return `unapproved torrent` message back to peer.
If mode is **black list** (`invert` set to `true`), tracker will allow all hashes
**except** specified.
## Hash sources
There are two sources of hashes: `list` and `directory`.
Both of them used as **INITIAL** source for storing in storage.
If storage is not `memory`, records are persisted until _somebody_
or _something_ (different tool with access to storage) won't delete it.
`list` is the static set of hashes, specified in configuration file.
`directory` will watch for `*.torrent` files in specified path and
append/delete records from storage. This source will parse all existing
files at start and then watch for new files to add, or for delete events
to remove hash from storage.
## Configuration
This middleware provides the following parameters for configuration:
- `initial_source` - source type: `list` or `directory`
- `configuration` - options for specified source
- `list`:
- `hash_list` - list of HEX encoded hashes
- `invert` - working mode: `true` - black list, `false` - white list
- `storage_ctx` - name of storage _context_ where to store data.
It may be redis hash key, DB table name etc.
- `directory`:
- `path` - directory to watch
- `invert` and `storage_ctx` has the same meanins as `list`'s options
Configuration example:
An example config might look like this:
```yaml
mochi:
prehooks:
- name: torrent approval
options:
initial_source: list
configuration:
hash_list: ["AAA", "BBB"]
invert: false
storage_ctx: APPROVED_HASH
```
+18 -21
View File
@@ -1,17 +1,17 @@
# Redis Storage
This storage implementation separates Chihaya from its storage service. Chihaya achieves HA by storing all peer data in
Redis. Multiple instances of Chihaya can use the same redis instance concurrently. The storage service can get HA by
clustering. If one instance of Chihaya goes down, peer data will still be available in Redis.
This storage implementation separates MoChi from its storage service. MoChi achieves HA by storing all peer data in
Redis. Multiple instances of MoChi can use the same redis instance concurrently. The storage service can get HA by
clustering. If one instance of MoChi goes down, peer data will still be available in Redis.
The HA of storage service is not considered here. In case Redis runs as a single node, peer data will be unavailable if
the node is down. You should consider setting up a Redis cluster for Chihaya in production.
the node is down. You should consider setting up a Redis sentinel (or KeyDB active-active replication) for MoChi in production.
This storage implementation is currently orders of magnitude slower than the in-memory implementation.
## Use Case
When one instance of Chihaya is down, other instances can continue serving peers from Redis.
When one instance of MoChi is down, other instances can continue serving peers from Redis.
## Configuration
@@ -57,27 +57,24 @@ time as value.
Here is an example:
```
- IPv4
- IPv4_S_<infohash 1>: <modification time>
- IPv4_L_<infohash 1>: <modification time>
- IPv4_S_<infohash 2>: <modification time>
- IPv4_S_<infohash 1>
- <peer 1 key>: <modification time>
- <peer 2 key>: <modification time>
- IPv4_L_<infohash 1>
- <peer 3 key>: <modification time>
- IPv4_S_<infohash 2>
- <peer 3 key>: <modification time>
- CHI_4_I
- CHI_4_S_<HASH1>
- CHI_4_L_<HASH1>
- CHI_4_S_<HASH1>
- <peer 1 key>: <modification time in unix nanos>
- <peer 2 key>: <modification time in unix nanos>
- CHI_4_L_<HASH2>
- <peer 3 key>: <modification time in unix nanos>
...
```
In this case, prometheus would record two swarms, three seeders, and one leecher. These three keys per address family
are used to record the count of swarms, seeders, and leechers.
```
- IPv4_infohash_count: 2
- IPv4_S_count: 3
- IPv4_L_count: 1
- CHI_4_S_C: "3"
- CHI_6_L_C: "1"
```
Note: IPv4_infohash_count has a different meaning compared to the `memory` storage:
It represents the number of infohashes reported by seeder, meaning that infohashes without seeders are not counted.
Note: `CHI_4_I` set has a different meaning compared to the `memory` storage:
It represents info hashes reported by seeder, meaning that info hashes without seeders are not counted.
+19 -14
View File
@@ -15,6 +15,7 @@ import (
"github.com/sot-tech/mochi/bittorrent"
"github.com/sot-tech/mochi/frontend"
"github.com/sot-tech/mochi/pkg/conf"
"github.com/sot-tech/mochi/pkg/log"
"github.com/sot-tech/mochi/pkg/stop"
)
@@ -22,19 +23,19 @@ import (
// Config represents all of the configurable options for an HTTP BitTorrent
// Frontend.
type Config struct {
Addr string `yaml:"addr"`
HTTPSAddr string `yaml:"https_addr"`
ReadTimeout time.Duration `yaml:"read_timeout"`
WriteTimeout time.Duration `yaml:"write_timeout"`
IdleTimeout time.Duration `yaml:"idle_timeout"`
EnableKeepAlive bool `yaml:"enable_keepalive"`
TLSCertPath string `yaml:"tls_cert_path"`
TLSKeyPath string `yaml:"tls_key_path"`
AnnounceRoutes []string `yaml:"announce_routes"`
ScrapeRoutes []string `yaml:"scrape_routes"`
PingRoutes []string `yaml:"ping_routes"`
EnableRequestTiming bool `yaml:"enable_request_timing"`
ParseOptions `yaml:",inline"`
Addr string
HTTPSAddr string `cfg:"https_addr"`
ReadTimeout time.Duration `cfg:"read_timeout"`
WriteTimeout time.Duration `cfg:"write_timeout"`
IdleTimeout time.Duration `cfg:"idle_timeout"`
EnableKeepAlive bool `cfg:"enable_keepalive"`
TLSCertPath string `cfg:"tls_cert_path"`
TLSKeyPath string `cfg:"tls_key_path"`
AnnounceRoutes []string `cfg:"announce_routes"`
ScrapeRoutes []string `cfg:"scrape_routes"`
PingRoutes []string `cfg:"ping_routes"`
EnableRequestTiming bool `cfg:"enable_request_timing"`
ParseOptions
}
// LogFields renders the current config as a set of Logrus fields.
@@ -147,7 +148,11 @@ type Frontend struct {
// NewFrontend creates a new instance of an HTTP Frontend that asynchronously
// serves requests.
func NewFrontend(logic frontend.TrackerLogic, provided Config) (*Frontend, error) {
func NewFrontend(logic frontend.TrackerLogic, c conf.MapConfig) (*Frontend, error) {
var provided Config
if err := c.Unmarshal(&provided); err != nil {
return nil, err
}
cfg := provided.Validate()
f := &Frontend{
+5 -5
View File
@@ -14,11 +14,11 @@ import (
// If RealIPHeader is not empty string, the value of the first HTTP Header with
// that name will be used.
type ParseOptions struct {
AllowIPSpoofing bool `yaml:"allow_ip_spoofing"`
RealIPHeader string `yaml:"real_ip_header"`
MaxNumWant uint32 `yaml:"max_numwant"`
DefaultNumWant uint32 `yaml:"default_numwant"`
MaxScrapeInfoHashes uint32 `yaml:"max_scrape_infohashes"`
AllowIPSpoofing bool `cfg:"allow_ip_spoofing"`
RealIPHeader string `cfg:"real_ip_header"`
MaxNumWant uint32 `cfg:"max_numwant"`
DefaultNumWant uint32 `cfg:"default_numwant"`
MaxScrapeInfoHashes uint32 `cfg:"max_scrape_infohashes"`
}
// Default parser config constants.
+12 -7
View File
@@ -16,6 +16,7 @@ import (
"github.com/sot-tech/mochi/bittorrent"
"github.com/sot-tech/mochi/frontend"
"github.com/sot-tech/mochi/frontend/udp/bytepool"
"github.com/sot-tech/mochi/pkg/conf"
"github.com/sot-tech/mochi/pkg/log"
"github.com/sot-tech/mochi/pkg/stop"
"github.com/sot-tech/mochi/pkg/timecache"
@@ -26,11 +27,11 @@ var allowedGeneratedPrivateKeyRunes = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGH
// Config represents all of the configurable options for a UDP BitTorrent
// Tracker.
type Config struct {
Addr string `yaml:"addr"`
PrivateKey string `yaml:"private_key"`
MaxClockSkew time.Duration `yaml:"max_clock_skew"`
EnableRequestTiming bool `yaml:"enable_request_timing"`
ParseOptions `yaml:",inline"`
Addr string
PrivateKey string `cfg:"private_key"`
MaxClockSkew time.Duration `cfg:"max_clock_skew"`
EnableRequestTiming bool `cfg:"enable_request_timing"`
ParseOptions
}
// LogFields renders the current config as a set of Logrus fields.
@@ -109,7 +110,11 @@ type Frontend struct {
// NewFrontend creates a new instance of an UDP Frontend that asynchronously
// serves requests.
func NewFrontend(logic frontend.TrackerLogic, provided Config) (*Frontend, error) {
func NewFrontend(logic frontend.TrackerLogic, c conf.MapConfig) (*Frontend, error) {
var provided Config
if err := c.Unmarshal(&provided); err != nil {
return nil, err
}
cfg := provided.Validate()
f := &Frontend{
@@ -136,7 +141,7 @@ func NewFrontend(logic frontend.TrackerLogic, provided Config) (*Frontend, error
return f, nil
}
// Stop provides a thread-safe way to shutdown a currently running Frontend.
// Stop provides a thread-safe way to shut down a currently running Frontend.
func (t *Frontend) Stop() stop.Result {
select {
case <-t.closing:
+4 -4
View File
@@ -5,19 +5,19 @@ import (
"github.com/sot-tech/mochi/frontend/udp"
"github.com/sot-tech/mochi/middleware"
"github.com/sot-tech/mochi/pkg/conf"
_ "github.com/sot-tech/mochi/pkg/randseed"
"github.com/sot-tech/mochi/storage"
_ "github.com/sot-tech/mochi/storage/memory"
)
func TestStartStopRaceIssue437(t *testing.T) {
ps, err := storage.NewStorage("memory", nil)
ps, err := storage.NewStorage("memory", conf.MapConfig{})
if err != nil {
t.Fatal(err)
}
var responseConfig middleware.ResponseConfig
lgc := middleware.NewLogic(responseConfig, ps, nil, nil)
fe, err := udp.NewFrontend(lgc, udp.Config{Addr: "127.0.0.1:0"})
lgc := middleware.NewLogic(0, 0, ps, nil, nil)
fe, err := udp.NewFrontend(lgc, conf.MapConfig{"addr": "127.0.0.1:0"})
if err != nil {
t.Fatal(err)
}
+4 -4
View File
@@ -51,10 +51,10 @@ var (
//
// If AllowIPSpoofing is true, IPs provided via params will be used.
type ParseOptions struct {
AllowIPSpoofing bool `yaml:"allow_ip_spoofing"`
MaxNumWant uint32 `yaml:"max_numwant"`
DefaultNumWant uint32 `yaml:"default_numwant"`
MaxScrapeInfoHashes uint32 `yaml:"max_scrape_infohashes"`
AllowIPSpoofing bool `cfg:"allow_ip_spoofing"`
MaxNumWant uint32 `cfg:"max_numwant"`
DefaultNumWant uint32 `cfg:"default_numwant"`
MaxScrapeInfoHashes uint32 `cfg:"max_scrape_infohashes"`
}
// Default parser config constants.
+1
View File
@@ -10,6 +10,7 @@ require (
github.com/julienschmidt/httprouter v1.3.0
github.com/mendsley/gojwk v0.0.0-20141217222730-4d5ec6e58103
github.com/minio/sha256-simd v1.0.0
github.com/mitchellh/mapstructure v1.4.3
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.12.1
github.com/sirupsen/logrus v1.8.1
+2
View File
@@ -247,6 +247,8 @@ github.com/mendsley/gojwk v0.0.0-20141217222730-4d5ec6e58103 h1:Z/i1e+gTZrmcGeZy
github.com/mendsley/gojwk v0.0.0-20141217222730-4d5ec6e58103/go.mod h1:o9YPB5aGP8ob35Vy6+vyq3P3bWe7NQWzf+JLiXCiMaE=
github.com/minio/sha256-simd v1.0.0 h1:v1ta+49hkWZyvaKwrQB8elexRqm6Y0aMLjCNsrYxo6g=
github.com/minio/sha256-simd v1.0.0/go.mod h1:OuYzVNI5vcoYIAmbIvHPl3N3jUzVedXbKy5RFepssQM=
github.com/mitchellh/mapstructure v1.4.3 h1:OVowDSCllw/YjdLkam3/sm7wEtOy59d8ndGgCcyj8cs=
github.com/mitchellh/mapstructure v1.4.3/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
+17 -23
View File
@@ -7,10 +7,9 @@ import (
"errors"
"fmt"
"gopkg.in/yaml.v3"
"github.com/sot-tech/mochi/bittorrent"
"github.com/sot-tech/mochi/middleware"
"github.com/sot-tech/mochi/pkg/conf"
"github.com/sot-tech/mochi/storage"
)
@@ -18,31 +17,21 @@ import (
const Name = "client approval"
func init() {
middleware.RegisterDriver(Name, driver{})
middleware.RegisterBuilder(Name, build)
}
var _ middleware.Driver = driver{}
var (
// ErrClientUnapproved is the error returned when a client's PeerID is invalid.
ErrClientUnapproved = bittorrent.ClientError("unapproved client")
type driver struct{}
func (d driver) NewHook(optionBytes []byte, _ storage.Storage) (middleware.Hook, error) {
var cfg Config
err := yaml.Unmarshal(optionBytes, &cfg)
if err != nil {
return nil, fmt.Errorf("invalid options for middleware %s: %w", Name, err)
}
return NewHook(cfg)
}
// ErrClientUnapproved is the error returned when a client's PeerID is invalid.
var ErrClientUnapproved = bittorrent.ClientError("unapproved client")
errBothListsProvided = errors.New("using both whitelist and blacklist is invalid")
)
// Config represents all the values required by this middleware to validate
// peers based on their BitTorrent client ID.
type Config struct {
Whitelist []string `yaml:"whitelist"`
Blacklist []string `yaml:"blacklist"`
Whitelist []string
Blacklist []string
}
type hook struct {
@@ -50,15 +39,20 @@ type hook struct {
unapproved map[ClientID]struct{}
}
// NewHook returns an instance of the client approval middleware.
func NewHook(cfg Config) (middleware.Hook, error) {
func build(options conf.MapConfig, _ storage.Storage) (middleware.Hook, error) {
var cfg Config
if err := options.Unmarshal(&cfg); err != nil {
return nil, fmt.Errorf("middleware %s: %w", Name, err)
}
h := &hook{
approved: make(map[ClientID]struct{}),
unapproved: make(map[ClientID]struct{}),
}
if len(cfg.Whitelist) > 0 && len(cfg.Blacklist) > 0 {
return nil, fmt.Errorf("using both whitelist and blacklist is invalid")
return nil, errBothListsProvided
}
for _, cidString := range cfg.Whitelist {
@@ -8,6 +8,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/sot-tech/mochi/bittorrent"
"github.com/sot-tech/mochi/pkg/conf"
)
var cases = []struct {
@@ -52,7 +53,8 @@ var cases = []struct {
func TestHandleAnnounce(t *testing.T) {
for _, tt := range cases {
t.Run(fmt.Sprintf("testing peerid %s", tt.peerID), func(t *testing.T) {
h, err := NewHook(tt.cfg)
c := conf.MapConfig{"whitelist": tt.cfg.Whitelist, "blacklist": tt.cfg.Blacklist}
h, err := build(c, nil)
require.Nil(t, err)
ctx := context.Background()
+14 -23
View File
@@ -21,10 +21,10 @@ import (
"github.com/SermoDigital/jose/jws"
"github.com/SermoDigital/jose/jwt"
"github.com/mendsley/gojwk"
"gopkg.in/yaml.v3"
"github.com/sot-tech/mochi/bittorrent"
"github.com/sot-tech/mochi/middleware"
"github.com/sot-tech/mochi/pkg/conf"
"github.com/sot-tech/mochi/pkg/log"
"github.com/sot-tech/mochi/pkg/stop"
"github.com/sot-tech/mochi/storage"
@@ -34,21 +34,7 @@ import (
const Name = "jwt"
func init() {
middleware.RegisterDriver(Name, driver{})
}
var _ middleware.Driver = driver{}
type driver struct{}
func (d driver) NewHook(optionBytes []byte, _ storage.Storage) (middleware.Hook, error) {
var cfg Config
err := yaml.Unmarshal(optionBytes, &cfg)
if err != nil {
return nil, fmt.Errorf("invalid options for middleware %s: %w", Name, err)
}
return NewHook(cfg)
middleware.RegisterBuilder(Name, build)
}
var (
@@ -62,10 +48,10 @@ var (
// Config represents all the values required by this middleware to fetch JWKs
// and verify JWTs.
type Config struct {
Issuer string `yaml:"issuer"`
Audience string `yaml:"audience"`
JWKSetURL string `yaml:"jwk_set_url"`
JWKUpdateInterval time.Duration `yaml:"jwk_set_update_interval"`
Issuer string
Audience string
JWKSetURL string `cfg:"jwk_set_url"`
JWKUpdateInterval time.Duration `cfg:"jwk_set_update_interval"`
}
// LogFields implements log.Fielder for a Config.
@@ -84,9 +70,14 @@ type hook struct {
closing chan struct{}
}
// NewHook returns an instance of the JWT middleware.
func NewHook(cfg Config) (middleware.Hook, error) {
log.Debug("creating new JWT middleware", cfg)
func build(options conf.MapConfig, _ storage.Storage) (middleware.Hook, error) {
var cfg Config
if err := options.Unmarshal(&cfg); err != nil {
return nil, fmt.Errorf("middleware %s: %w", Name, err)
}
log.Debug("creating new JWT middleware", options)
h := &hook{
cfg: cfg,
publicKeys: map[string]crypto.PublicKey{},
+3 -14
View File
@@ -11,25 +11,14 @@ import (
"github.com/sot-tech/mochi/storage"
)
// ResponseConfig holds the configuration used for the actual response.
//
// TODO(jzelinskie): Evaluate whether we would like to make this optional.
// We can make Conf extensible enough that you can program a new response
// generator at the cost of making it possible for users to create config that
// won't compose a functional tracker.
type ResponseConfig struct {
AnnounceInterval time.Duration `yaml:"announce_interval"`
MinAnnounceInterval time.Duration `yaml:"min_announce_interval"`
}
var _ frontend.TrackerLogic = &Logic{}
// NewLogic creates a new instance of a TrackerLogic that executes the provided
// middleware hooks.
func NewLogic(cfg ResponseConfig, peerStore storage.Storage, preHooks, postHooks []Hook) *Logic {
func NewLogic(annInterval, minAnnInterval time.Duration, peerStore storage.Storage, preHooks, postHooks []Hook) *Logic {
return &Logic{
announceInterval: cfg.AnnounceInterval,
minAnnounceInterval: cfg.MinAnnounceInterval,
announceInterval: annInterval,
minAnnounceInterval: minAnnInterval,
preHooks: append(preHooks, &responseHook{store: peerStore}),
postHooks: append(postHooks, &swarmInteractionHook{store: peerStore}),
}
+31 -34
View File
@@ -6,87 +6,84 @@ import (
"errors"
"sync"
"gopkg.in/yaml.v3"
"github.com/sot-tech/mochi/pkg/conf"
"github.com/sot-tech/mochi/storage"
)
var (
driversM sync.RWMutex
drivers = make(map[string]Driver)
drivers = make(map[string]Builder)
// ErrDriverDoesNotExist is the error returned by NewMiddleware when a
// ErrBuilderDoesNotExist is the error returned by NewMiddleware when a
// middleware driver with that name does not exist.
ErrDriverDoesNotExist = errors.New("middleware driver with that name does not exist")
ErrBuilderDoesNotExist = errors.New("middleware builder with that name does not exist")
)
// Driver is the interface used to initialize a new type of middleware.
// Builder is the interface used to initialize a new type of middleware.
//
// The options parameter is YAML encoded bytes that should be unmarshalled into
// The `options` parameter is map of parameters that should be unmarshalled into
// the hook's custom configuration.
type Driver interface {
NewHook(options []byte, storage storage.Storage) (Hook, error)
}
type Builder func(options conf.MapConfig, storage storage.Storage) (Hook, error)
// RegisterDriver makes a Driver available by the provided name.
// RegisterBuilder makes a Builder available by the provided name.
//
// If called twice with the same name, the name is blank, or if the provided
// Driver is nil, this function panics.
func RegisterDriver(name string, d Driver) {
// Builder is nil, this function panics.
func RegisterBuilder(name string, d Builder) {
if name == "" {
panic("middleware: could not register a Driver with an empty name")
panic("middleware: could not register a Builder with an empty name")
}
if d == nil {
panic("middleware: could not register a nil Driver")
panic("middleware: could not register a nil Builder")
}
driversM.Lock()
defer driversM.Unlock()
if _, dup := drivers[name]; dup {
panic("middleware: RegisterDriver called twice for " + name)
panic("middleware: RegisterBuilder called twice for " + name)
}
drivers[name] = d
}
// New attempts to initialize a new middleware instance from the
// list of registered Drivers.
// list of registered Builders.
//
// If a driver does not exist, returns ErrDriverDoesNotExist.
func New(name string, optionBytes []byte, storage storage.Storage) (Hook, error) {
// If a driver does not exist, returns ErrBuilderDoesNotExist.
func New(name string, options conf.MapConfig, storage storage.Storage) (Hook, error) {
driversM.RLock()
defer driversM.RUnlock()
var d Driver
d, ok := drivers[name]
var newHook Builder
newHook, ok := drivers[name]
if !ok {
return nil, ErrDriverDoesNotExist
return nil, ErrBuilderDoesNotExist
}
return d.NewHook(optionBytes, storage)
return newHook(options, storage)
}
// Config is the generic configuration format used for all registered Hooks.
type Config struct {
Name string `yaml:"name"`
Options map[string]any `yaml:"options"`
Name string
Options conf.MapConfig
}
// HooksFromHookConfigs is a utility function for initializing Hooks in bulk.
func HooksFromHookConfigs(cfgs []Config, storage storage.Storage) (hooks []Hook, err error) {
for _, cfg := range cfgs {
// Marshal the options back into bytes.
var optionBytes []byte
optionBytes, err = yaml.Marshal(cfg.Options)
if err != nil {
return
// each element of configs must contain pairs `name` - string and `options` - map[string]any
func HooksFromHookConfigs(configs []conf.MapConfig, storage storage.Storage) (hooks []Hook, err error) {
for _, cfg := range configs {
var c Config
if err = cfg.Unmarshal(&c); err != nil {
break
}
var h Hook
h, err = New(cfg.Name, optionBytes, storage)
h, err = New(c.Name, c.Options, storage)
if err != nil {
return
break
}
hooks = append(hooks, h)
@@ -5,6 +5,7 @@ import (
"sync"
"github.com/sot-tech/mochi/bittorrent"
"github.com/sot-tech/mochi/pkg/conf"
"github.com/sot-tech/mochi/storage"
)
@@ -12,7 +13,7 @@ import (
const DefaultStorageCtxName = "MW_APPROVAL"
// Builder function that creates and configures specific container
type Builder func([]byte, storage.Storage) (Container, error)
type Builder func(conf.MapConfig, storage.Storage) (Container, error)
var (
buildersMU sync.Mutex
@@ -42,7 +43,7 @@ type Container interface {
}
// GetContainer creates Container by its name and provided confBytes
func GetContainer(name string, confBytes []byte, storage storage.Storage) (Container, error) {
func GetContainer(name string, config conf.MapConfig, storage storage.Storage) (Container, error) {
buildersMU.Lock()
defer buildersMU.Unlock()
var err error
@@ -50,7 +51,7 @@ func GetContainer(name string, confBytes []byte, storage storage.Storage) (Conta
if builder, exist := builders[name]; !exist {
err = ErrContainerDoesNotExist
} else {
cn, err = builder(confBytes, storage)
cn, err = builder(config, storage)
}
return cn, err
}
@@ -10,11 +10,11 @@ import (
"github.com/anacrolix/torrent/metainfo"
"github.com/anacrolix/torrent/util/dirwatch"
"github.com/minio/sha256-simd"
"gopkg.in/yaml.v3"
"github.com/sot-tech/mochi/bittorrent"
"github.com/sot-tech/mochi/middleware/torrentapproval/container"
"github.com/sot-tech/mochi/middleware/torrentapproval/container/list"
"github.com/sot-tech/mochi/pkg/conf"
"github.com/sot-tech/mochi/pkg/log"
"github.com/sot-tech/mochi/pkg/stop"
"github.com/sot-tech/mochi/storage"
@@ -30,14 +30,14 @@ func init() {
// Config - implementation of directory container configuration.
// Extends list.Config because uses the same storage and Approved function.
type Config struct {
list.Config `yaml:",inline"`
list.Config
// Path in filesystem where torrent files stored and should be watched
Path string `yaml:"path"`
Path string
}
func build(confBytes []byte, st storage.Storage) (container.Container, error) {
func build(conf conf.MapConfig, st storage.Storage) (container.Container, error) {
c := new(Config)
if err := yaml.Unmarshal(confBytes, c); err != nil {
if err := conf.Unmarshal(c); err != nil {
return nil, fmt.Errorf("unable to deserialise configuration: %w", err)
}
var err error
@@ -5,10 +5,9 @@ package list
import (
"fmt"
"gopkg.in/yaml.v3"
"github.com/sot-tech/mochi/bittorrent"
"github.com/sot-tech/mochi/middleware/torrentapproval/container"
"github.com/sot-tech/mochi/pkg/conf"
"github.com/sot-tech/mochi/pkg/log"
"github.com/sot-tech/mochi/storage"
)
@@ -23,20 +22,20 @@ func init() {
// Config - implementation of list container configuration.
type Config struct {
// HashList static list of HEX-encoded InfoHashes.
HashList []string `yaml:"hash_list"`
HashList []string `cfg:"hash_list"`
// If Invert set to true, all InfoHashes stored in HashList should be blacklisted.
Invert bool `yaml:"invert"`
Invert bool
// StorageCtx is the name of storage context where to store hash list.
// It might be table name, REDIS record key or something else, depending on storage.
StorageCtx string `yaml:"storage_ctx"`
StorageCtx string `cfg:"storage_ctx"`
}
// DUMMY used as value placeholder if storage needs some value with
const DUMMY = "_"
func build(confBytes []byte, st storage.Storage) (container.Container, error) {
func build(conf conf.MapConfig, st storage.Storage) (container.Container, error) {
c := new(Config)
if err := yaml.Unmarshal(confBytes, c); err != nil {
if err := conf.Unmarshal(c); err != nil {
return nil, fmt.Errorf("unable to deserialise configuration: %w", err)
}
l := &List{
+11 -19
View File
@@ -6,11 +6,10 @@ import (
"context"
"fmt"
"gopkg.in/yaml.v3"
"github.com/sot-tech/mochi/bittorrent"
"github.com/sot-tech/mochi/middleware"
"github.com/sot-tech/mochi/middleware/torrentapproval/container"
"github.com/sot-tech/mochi/pkg/conf"
// import directory watcher to enable appropriate support
_ "github.com/sot-tech/mochi/middleware/torrentapproval/container/directory"
@@ -25,40 +24,33 @@ import (
const Name = "torrent approval"
func init() {
middleware.RegisterDriver(Name, driver{})
middleware.RegisterBuilder(Name, build)
}
type baseConfig struct {
// Source - name of container for initial values
Source string `yaml:"initial_source"`
Source string `cfg:"initial_source"`
// Configuration depends on used container
Configuration map[string]any `yaml:"configuration"`
Configuration conf.MapConfig
}
type driver struct{}
func (d driver) NewHook(optionBytes []byte, storage storage.Storage) (middleware.Hook, error) {
func build(options conf.MapConfig, storage storage.Storage) (h middleware.Hook, err error) {
var cfg baseConfig
err := yaml.Unmarshal(optionBytes, &cfg)
if err != nil {
return nil, fmt.Errorf("invalid options for middleware %s: %w", Name, err)
if err = options.Unmarshal(&cfg); err != nil {
return nil, fmt.Errorf("middleware %s: %w", Name, err)
}
if len(cfg.Source) == 0 {
return nil, fmt.Errorf("invalid options for middleware %s: name not provided", Name)
return nil, fmt.Errorf("invalid options for middleware %s: source not provided", Name)
}
if cfg.Configuration == nil {
return nil, fmt.Errorf("invalid options for middleware %s: options not provided", Name)
}
var confBytes []byte
var h *hook
if confBytes, err = yaml.Marshal(cfg.Configuration); err == nil {
var c container.Container
if c, err = container.GetContainer(cfg.Source, confBytes, storage); err == nil {
h = &hook{c}
}
var c container.Container
if c, err = container.GetContainer(cfg.Source, cfg.Configuration, storage); err == nil {
h = &hook{c}
}
return h, err
}
@@ -6,9 +6,9 @@ import (
"testing"
"github.com/stretchr/testify/require"
"gopkg.in/yaml.v3"
"github.com/sot-tech/mochi/bittorrent"
"github.com/sot-tech/mochi/pkg/conf"
"github.com/sot-tech/mochi/storage/memory"
)
@@ -71,10 +71,8 @@ func TestHandleAnnounce(t *testing.T) {
require.Nil(t, err)
for _, tt := range cases {
t.Run(fmt.Sprintf("testing hash %s", tt.ih), func(t *testing.T) {
d := driver{}
cfg, err := yaml.Marshal(tt.cfg)
require.Nil(t, err)
h, err := d.NewHook(cfg, storage)
cfg := conf.MapConfig{"initial_source": tt.cfg.Source, "configuration": tt.cfg.Configuration}
h, err := build(cfg, storage)
require.Nil(t, err)
ctx := context.Background()
+24 -33
View File
@@ -7,11 +7,10 @@ import (
"sync"
"time"
"gopkg.in/yaml.v3"
"github.com/sot-tech/mochi/bittorrent"
"github.com/sot-tech/mochi/middleware"
"github.com/sot-tech/mochi/middleware/pkg/random"
"github.com/sot-tech/mochi/pkg/conf"
"github.com/sot-tech/mochi/storage"
)
@@ -19,43 +18,46 @@ import (
const Name = "interval variation"
func init() {
middleware.RegisterDriver(Name, driver{})
middleware.RegisterBuilder(Name, build)
}
var _ middleware.Driver = driver{}
type driver struct{}
func (d driver) NewHook(optionBytes []byte, _ storage.Storage) (middleware.Hook, error) {
func build(options conf.MapConfig, _ storage.Storage) (h middleware.Hook, err error) {
var cfg Config
err := yaml.Unmarshal(optionBytes, &cfg)
if err != nil {
return nil, fmt.Errorf("invalid options for middleware %s: %w", Name, err)
}
return NewHook(cfg)
if err = options.Unmarshal(&cfg); err != nil {
err = fmt.Errorf("middleware %s: %w", Name, err)
} else {
if err := checkConfig(cfg); err == nil {
h = &hook{
cfg: cfg,
}
}
}
return
}
// ErrInvalidModifyResponseProbability is returned for a config with an invalid
// ModifyResponseProbability.
var ErrInvalidModifyResponseProbability = errors.New("invalid modify_response_probability")
var (
// ErrInvalidModifyResponseProbability is returned for a config with an invalid
// ModifyResponseProbability.
ErrInvalidModifyResponseProbability = errors.New("invalid modify_response_probability")
// ErrInvalidMaxIncreaseDelta is returned for a config with an invalid
// MaxIncreaseDelta.
var ErrInvalidMaxIncreaseDelta = errors.New("invalid max_increase_delta")
// ErrInvalidMaxIncreaseDelta is returned for a config with an invalid
// MaxIncreaseDelta.
ErrInvalidMaxIncreaseDelta = errors.New("invalid max_increase_delta")
)
// Config represents the configuration for the varinterval middleware.
type Config struct {
// ModifyResponseProbability is the probability by which a response will
// be modified.
ModifyResponseProbability float32 `yaml:"modify_response_probability"`
ModifyResponseProbability float32 `cfg:"modify_response_probability"`
// MaxIncreaseDelta is the amount of seconds that will be added at most.
MaxIncreaseDelta int `yaml:"max_increase_delta"`
MaxIncreaseDelta int `cfg:"max_increase_delta"`
// ModifyMinInterval specifies whether min_interval should be increased
// as well.
ModifyMinInterval bool `yaml:"modify_min_interval"`
ModifyMinInterval bool `cfg:"modify_min_interval"`
}
func checkConfig(cfg Config) error {
@@ -75,17 +77,6 @@ type hook struct {
sync.Mutex
}
// NewHook creates a middleware to randomly modify the announce interval from
// the given config.
func NewHook(cfg Config) (h middleware.Hook, err error) {
if err = checkConfig(cfg); err == nil {
h = &hook{
cfg: cfg,
}
}
return
}
func (h *hook) HandleAnnounce(ctx context.Context, req *bittorrent.AnnounceRequest, resp *bittorrent.AnnounceResponse) (context.Context, error) {
s0, s1 := random.DeriveEntropyFromRequest(req)
// Generate a probability p < 1.0.
+3 -1
View File
@@ -8,6 +8,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/sot-tech/mochi/bittorrent"
"github.com/sot-tech/mochi/pkg/conf"
)
var configTests = []struct {
@@ -45,7 +46,8 @@ func TestCheckConfig(t *testing.T) {
}
func TestHandleAnnounce(t *testing.T) {
h, err := NewHook(Config{1.0, 10, true})
c := conf.MapConfig{"modify_response_probability": 1.0, "max_increase_delta": 10, "modify_min_interval": true}
h, err := build(c, nil)
require.Nil(t, err)
require.NotNil(t, h)
+54
View File
@@ -0,0 +1,54 @@
// Package conf contains alias for map encoded configuration
// and structure unmarshaller
package conf
import (
"errors"
"github.com/mitchellh/mapstructure"
"github.com/sot-tech/mochi/pkg/log"
)
// TagName is a tag name, used for decoder customization
const TagName = "cfg"
// ErrNilConfigMap returned if unmarshalling map is nil and could not be
// decoded into structure
var ErrNilConfigMap = errors.New("unable to process nil map")
// MapConfig is just alias for map[string]any
type MapConfig map[string]any
// LogFields just returns this map as a set of Logrus fields.
func (m MapConfig) LogFields() log.Fields {
return log.Fields(m)
}
// Unmarshal decodes receiver map into provided structure.
// Decoder configured to automatically unmarshal inherited structures,
// convert string-ed duration (1s, 2m, 3h...) into time.Duration and
// string representation IP into net.IP.
// Tag used for decode customization is conf.TagName.
func (m MapConfig) Unmarshal(into any) (err error) {
if m != nil {
if len(m) > 0 {
conf := &mapstructure.DecoderConfig{
DecodeHook: mapstructure.ComposeDecodeHookFunc(
mapstructure.StringToTimeDurationHookFunc(),
mapstructure.StringToIPHookFunc(),
),
Squash: true,
Result: into,
TagName: TagName,
}
var decoder *mapstructure.Decoder
if decoder, err = mapstructure.NewDecoder(conf); err == nil {
err = decoder.Decode(m)
}
}
} else {
err = ErrNilConfigMap
}
return
}
+7 -17
View File
@@ -12,9 +12,8 @@ import (
"sync"
"time"
"gopkg.in/yaml.v3"
"github.com/sot-tech/mochi/bittorrent"
"github.com/sot-tech/mochi/pkg/conf"
"github.com/sot-tech/mochi/pkg/log"
"github.com/sot-tech/mochi/pkg/stop"
"github.com/sot-tech/mochi/pkg/timecache"
@@ -39,29 +38,20 @@ func init() {
type driver struct{}
func (d driver) NewStorage(icfg any) (storage.Storage, error) {
// Marshal the config back into bytes.
bytes, err := yaml.Marshal(icfg)
if err != nil {
return nil, err
}
// Unmarshal the bytes into the proper config type.
func (d driver) NewStorage(icfg conf.MapConfig) (storage.Storage, error) {
var cfg Config
err = yaml.Unmarshal(bytes, &cfg)
if err != nil {
if err := icfg.Unmarshal(&cfg); err != nil {
return nil, err
}
return New(cfg)
}
// Config holds the configuration of a memory Storage.
type Config struct {
GarbageCollectionInterval time.Duration `yaml:"gc_interval"`
PrometheusReportingInterval time.Duration `yaml:"prometheus_reporting_interval"`
PeerLifetime time.Duration `yaml:"peer_lifetime"`
ShardCount int `yaml:"shard_count"`
GarbageCollectionInterval time.Duration `cfg:"gc_interval"`
PrometheusReportingInterval time.Duration `cfg:"prometheus_reporting_interval"`
PeerLifetime time.Duration `cfg:"peer_lifetime"`
ShardCount int `cfg:"shard_count"`
}
// LogFields renders the current config as a set of Logrus fields.
+20 -26
View File
@@ -29,9 +29,9 @@ import (
"time"
"github.com/go-redis/redis/v8"
"gopkg.in/yaml.v3"
"github.com/sot-tech/mochi/bittorrent"
"github.com/sot-tech/mochi/pkg/conf"
"github.com/sot-tech/mochi/pkg/log"
"github.com/sot-tech/mochi/pkg/stop"
"github.com/sot-tech/mochi/pkg/timecache"
@@ -73,17 +73,11 @@ func init() {
type driver struct{}
func (d driver) NewStorage(icfg any) (storage.Storage, error) {
// Marshal the config back into bytes.
bytes, err := yaml.Marshal(icfg)
if err != nil {
return nil, err
}
func (d driver) NewStorage(icfg conf.MapConfig) (storage.Storage, error) {
// Unmarshal the bytes into the proper config type.
var cfg Config
err = yaml.Unmarshal(bytes, &cfg)
if err != nil {
if err := icfg.Unmarshal(&cfg); err != nil {
return nil, err
}
@@ -92,20 +86,20 @@ func (d driver) NewStorage(icfg any) (storage.Storage, error) {
// Config holds the configuration of a redis Storage.
type Config struct {
GarbageCollectionInterval time.Duration `yaml:"gc_interval"`
PrometheusReportingInterval time.Duration `yaml:"prometheus_reporting_interval"`
PeerLifetime time.Duration `yaml:"peer_lifetime"`
Addresses []string `yaml:"addresses"`
Login string `yaml:"login"`
Password string `yaml:"password"`
Sentinel bool `yaml:"sentinel"`
SentinelMaster string `yaml:"sentinel_master"`
Cluster bool `yaml:"cluster"`
DB int `yaml:"db"`
PoolSize int `yaml:"pool_size"`
ReadTimeout time.Duration `yaml:"read_timeout"`
WriteTimeout time.Duration `yaml:"write_timeout"`
ConnectTimeout time.Duration `yaml:"connect_timeout"`
GarbageCollectionInterval time.Duration `cfg:"gc_interval"`
PrometheusReportingInterval time.Duration `cfg:"prometheus_reporting_interval"`
PeerLifetime time.Duration `cfg:"peer_lifetime"`
Addresses []string
DB int
PoolSize int `cfg:"pool_size"`
Login string
Password string
Sentinel bool
SentinelMaster string `cfg:"sentinel_master"`
Cluster bool
ReadTimeout time.Duration `cfg:"read_timeout"`
WriteTimeout time.Duration `cfg:"write_timeout"`
ConnectTimeout time.Duration `cfg:"connect_timeout"`
}
// LogFields renders the current config as a set of Logrus fields.
@@ -230,6 +224,7 @@ func connect(cfg Config) (*store, error) {
SentinelAddrs: cfg.Addresses,
SentinelUsername: cfg.Login,
SentinelPassword: cfg.Password,
MasterName: cfg.SentinelMaster,
DialTimeout: cfg.ConnectTimeout,
ReadTimeout: cfg.ReadTimeout,
WriteTimeout: cfg.WriteTimeout,
@@ -411,10 +406,9 @@ func (ps *store) PutSeeder(ih bittorrent.InfoHash, peer bittorrent.Peer) error {
ihSummaryKey, ihPeerKey, cntPeerKey = ih4Key, ih4SeederKey, cnt4SeederKey
}
ihPeerKey += ih.RawString()
now := ps.getClock()
return ps.tx(func(tx redis.Pipeliner) (err error) {
if err = tx.HSet(ps.ctx, ihPeerKey, peer.RawString(), now).Err(); err != nil {
if err = tx.HSet(ps.ctx, ihPeerKey, peer.RawString(), ps.getClock()).Err(); err != nil {
return
}
if err = ps.con.Incr(ps.ctx, cntPeerKey).Err(); err != nil {
+4 -4
View File
@@ -11,7 +11,7 @@ import (
"github.com/sot-tech/mochi/storage/test"
)
var conf = Config{
var cfg = Config{
GarbageCollectionInterval: 10 * time.Minute,
PrometheusReportingInterval: 10 * time.Minute,
PeerLifetime: 30 * time.Minute,
@@ -23,7 +23,7 @@ var conf = Config{
func createNew() s.Storage {
var ps s.Storage
var err error
ps, err = New(conf)
ps, err = New(cfg)
if err != nil {
fmt.Println("unable to create real Redis connection: ", err, " using simulator")
var rs *miniredis.Miniredis
@@ -31,8 +31,8 @@ func createNew() s.Storage {
if err != nil {
panic(err)
}
conf.Addresses = []string{rs.Addr()}
ps, err = New(conf)
cfg.Addresses = []string{rs.Addr()}
ps, err = New(cfg)
}
if err != nil {
panic(err)
+3 -2
View File
@@ -6,6 +6,7 @@ import (
"time"
"github.com/sot-tech/mochi/bittorrent"
"github.com/sot-tech/mochi/pkg/conf"
"github.com/sot-tech/mochi/pkg/log"
"github.com/sot-tech/mochi/pkg/stop"
)
@@ -23,7 +24,7 @@ type Entry struct {
// Driver is the interface used to initialize a new type of Storage.
type Driver interface {
NewStorage(cfg any) (Storage, error)
NewStorage(cfg conf.MapConfig) (Storage, error)
}
// ErrResourceDoesNotExist is the error returned by all delete methods and the
@@ -167,7 +168,7 @@ func RegisterDriver(name string, d Driver) {
// the list of registered Drivers.
//
// If a driver does not exist, returns ErrDriverDoesNotExist.
func NewStorage(name string, cfg any) (ps Storage, err error) {
func NewStorage(name string, cfg conf.MapConfig) (ps Storage, err error) {
driversM.RLock()
defer driversM.RUnlock()