Merge pull request #8 from sot-tech/zerolog

Zerolog
This commit is contained in:
SOT-TECH
2022-05-02 18:37:40 +08:00
committed by GitHub
41 changed files with 1068 additions and 1005 deletions
+8 -6
View File
@@ -44,14 +44,15 @@ jobs:
go-version: "^1.18"
- name: "Install and configure mochi"
run: |
go install --tags e2e ./cmd/mochi
go install ./cmd/mochi
go install ./cmd/mochi-e2e
cat ./dist/example_config.yaml
- name: "Run end-to-end tests"
run: |
mochi --config=./dist/example_config.yaml --debug &
mochi --config=./dist/example_config.yaml --logLevel debug --logPretty &
pid=$!
sleep 2
mochi e2e --debug
mochi-e2e
kill $pid
e2e-redis:
name: "E2E Redis Tests"
@@ -67,15 +68,16 @@ jobs:
go-version: "^1.18"
- name: "Install and configure mochi"
run: |
go install --tags e2e ./cmd/mochi
go install ./cmd/mochi
go install ./cmd/mochi-e2e
curl -LO https://github.com/jzelinskie/faq/releases/download/0.0.6/faq-linux-amd64
chmod +x faq-linux-amd64
./faq-linux-amd64 '.mochi.storage = {"config":{"gc_interval":"3m","peer_lifetime":"31m","prometheus_reporting_interval":"1s","connect_timeout":"15s","read_timeout":"15s","write_timeout":"15s"},"name":"redis"}' ./dist/example_config.yaml > ./dist/example_redis_config.yaml
cat ./dist/example_redis_config.yaml
- name: "Run end-to-end tests"
run: |
mochi --config=./dist/example_redis_config.yaml --debug &
mochi --config=./dist/example_redis_config.yaml --logLevel debug --logPretty &
pid=$!
sleep 2
mochi e2e --debug
mochi-e2e
kill $pid
+3 -2
View File
@@ -18,7 +18,7 @@ Modified version of [Chihaya](https://github.com/chihaya/chihaya), an open sourc
* Allows mixed peers: IPv4 requesters can fetch IPv6 peers or vice versa;
* Contains some internal improvements.
_Note: From time to time MoChi fetch modifications from Chihaya but is not
_Note: From time to time MoChi fetch modifications from Chihaya but is not
fully compatible with original project (mainly in Redis storage structure),
so it cannot be mixed with Chihaya (i.e. it is impossible create MoChi-Chihaya cluster)._
@@ -26,7 +26,8 @@ so it cannot be mixed with Chihaya (i.e. it is impossible create MoChi-Chihaya c
The main goal of made modifications is to create semi-private tracker like [Hefur](https://github.com/sot-tech/hefur)
but with cluster support (allowed torrents limited by pre-existent `list` middleware and another `directory` middleware
to [limit registered torrents](docs/middleware/torrent_approval.md)) and to maximize torrent swarm by providing maximum peers as possible (IPv4+IPv6).
to [limit registered torrents](docs/middleware/torrent_approval.md)) and to maximize torrent swarm by providing maximum
peers as possible (IPv4+IPv6).
## Notice
+16 -27
View File
@@ -13,8 +13,7 @@ import (
"net/netip"
"github.com/pkg/errors"
"github.com/sot-tech/mochi/pkg/log"
"github.com/rs/zerolog"
)
// PeerIDLen is length of peer id field in bytes
@@ -126,6 +125,14 @@ type Scrape struct {
Incomplete uint32
}
// MarshalZerologObject writes fields into zerolog event
func (s Scrape) MarshalZerologObject(e *zerolog.Event) {
e.Stringer("infoHash", s.InfoHash).
Uint32("snatches", s.Snatches).
Uint32("complete", s.Complete).
Uint32("incomplete", s.Incomplete)
}
// Peer represents the connection details of a peer that is returned in an
// announce response.
type Peer struct {
@@ -164,13 +171,6 @@ func NewPeer(data string) (Peer, error) {
return peer, err
}
// String implements fmt.Stringer to return a human-readable representation.
// The string will have the format <PeerID>@[<IP>]:<port>, for example
// "0102030405060708090a0b0c0d0e0f1011121314@[10.11.12.13]:1234"
func (p Peer) String() string {
return fmt.Sprintf("%s@[%s]:%d", p.ID, p.Addr(), p.Port())
}
// RawString generates concatenation of PeerID, net port and IP-address
func (p Peer) RawString() string {
ip := p.Addr()
@@ -181,29 +181,18 @@ func (p Peer) RawString() string {
return string(b)
}
// LogFields renders the current peer as a set of Logrus fields.
func (p Peer) LogFields() log.Fields {
return log.Fields{
"id": p.ID,
"ip": p.Addr(),
"port": p.Port(),
}
}
// Equal reports whether p and x are the same.
func (p Peer) Equal(x Peer) bool { return p.EqualEndpoint(x) && p.ID == x.ID }
// EqualEndpoint reports whether p and x have the same endpoint.
func (p Peer) EqualEndpoint(x Peer) bool {
return p.Port() == x.Port() &&
p.Addr().Compare(x.Addr()) == 0
}
// Addr returns unmapped peer's IP address
func (p Peer) Addr() netip.Addr {
return p.AddrPort.Addr().Unmap()
}
// MarshalZerologObject writes fields into zerolog event
func (p Peer) MarshalZerologObject(e *zerolog.Event) {
e.Stringer("id", p.ID).
Stringer("address", p.Addr()).
Uint16("port", p.Port())
}
// ClientError represents an error that should be exposed to the client over
// the BitTorrent protocol implementation.
type ClientError string
-57
View File
@@ -1,57 +0,0 @@
package bittorrent
import (
"fmt"
"net/netip"
"testing"
"github.com/stretchr/testify/require"
)
var (
b = []byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20}
expected = "0102030405060708090a0b0c0d0e0f1011121314"
)
func TestPeerID_String(t *testing.T) {
pid, err := NewPeerID(b)
require.Nil(t, err)
s := pid.String()
require.Equal(t, expected, s)
}
func TestInfoHash_String(t *testing.T) {
ih, err := NewInfoHash(b)
require.Nil(t, err)
require.Equal(t, expected, ih.String())
}
func TestPeer_String(t *testing.T) {
pid, err := NewPeerID(b)
require.Nil(t, err)
id, _ := NewPeerID(b)
peerStringTestCases := []struct {
input Peer
expected string
}{
{
input: Peer{
ID: id,
AddrPort: netip.MustParseAddrPort("10.11.12.1:1234"),
},
expected: fmt.Sprintf("%s@[10.11.12.1]:1234", expected),
},
{
input: Peer{
ID: id,
AddrPort: netip.MustParseAddrPort("[2001:db8::ff00:42:8329]:1234"),
},
expected: fmt.Sprintf("%s@[2001:db8::ff00:42:8329]:1234", expected),
},
}
for _, c := range peerStringTestCases {
c.input.ID = pid
got := c.input.String()
require.Equal(t, c.expected, got)
}
}
+32 -25
View File
@@ -24,42 +24,49 @@ const (
// Completed is the event sent by a BitTorrent client when it finishes
// downloading all of the required chunks.
Completed
// NoneStr string representation of None event
NoneStr = "none"
// StartedStr string representation of Started event
StartedStr = "started"
// StoppedStr string representation of Stopped event
StoppedStr = "stopped"
// CompletedStr string representation of Completed event
CompletedStr = "completed"
)
var (
eventToString = make(map[Event]string)
stringToEvent = make(map[string]Event)
)
func init() {
eventToString[None] = "none"
eventToString[Started] = "started"
eventToString[Stopped] = "stopped"
eventToString[Completed] = "completed"
stringToEvent[""] = None
for k, v := range eventToString {
stringToEvent[v] = k
}
}
// NewEvent returns the proper Event given a string.
func NewEvent(eventStr string) (evt Event, err error) {
if e, ok := stringToEvent[strings.ToLower(eventStr)]; ok {
evt = e
} else {
switch strings.ToLower(eventStr) {
case NoneStr, "":
evt = None
case StartedStr:
evt = Started
case StoppedStr:
evt = Stopped
case CompletedStr:
evt = Completed
default:
evt, err = None, ErrUnknownEvent
}
return
}
// String implements Stringer for an event.
func (e Event) String() (s string) {
if name, ok := eventToString[e]; ok {
s = name
} else {
switch e {
case None:
s = NoneStr
case Started:
s = StartedStr
case Stopped:
s = StoppedStr
case Completed:
s = CompletedStr
default:
s = "<unknown>"
}
return
+8 -3
View File
@@ -6,7 +6,7 @@ import (
"strconv"
"strings"
"github.com/sot-tech/mochi/pkg/log"
"github.com/rs/zerolog"
)
// Params is used to fetch (optional) request parameters from an Announce.
@@ -31,6 +31,8 @@ type Params interface {
// For a request of the form "/announce?port=1234" this would return
// "port=1234"
RawQuery() string
zerolog.LogObjectMarshaler
}
var (
@@ -151,7 +153,6 @@ func parseQuery(query string) (q *QueryParams, err error) {
// But frontends record these errors to prometheus, which generates
// a lot of time series.
// We log it here for debugging instead.
log.Debug("failed to unescape query param key", log.Err(err))
return nil, ErrInvalidQueryEscape
}
value, err = url.QueryUnescape(value)
@@ -160,7 +161,6 @@ func parseQuery(query string) (q *QueryParams, err error) {
// But frontends record these errors to prometheus, which generates
// a lot of time series.
// We log it here for debugging instead.
log.Debug("failed to unescape query param value", log.Err(err))
return nil, ErrInvalidQueryEscape
}
@@ -210,3 +210,8 @@ func (qp *QueryParams) RawPath() string {
func (qp *QueryParams) RawQuery() string {
return qp.query
}
// MarshalZerologObject writes fields into zerolog event
func (qp QueryParams) MarshalZerologObject(e *zerolog.Event) {
e.Str("path", qp.path).Str("query", qp.query)
}
+81 -50
View File
@@ -1,12 +1,11 @@
package bittorrent
import (
"fmt"
"net/netip"
"sort"
"time"
"github.com/sot-tech/mochi/pkg/log"
"github.com/rs/zerolog"
)
// RequestAddress wrapper for netip.Addr with Provided flag.
@@ -21,14 +20,9 @@ func (a RequestAddress) Validate() bool {
return a.IsValid() && !a.IsUnspecified()
}
func (a RequestAddress) String() string {
var p string
if a.Provided {
p = "(provided)"
} else {
p = "(detected)"
}
return fmt.Sprint(a.Addr.String(), p)
// MarshalZerologObject writes fields into zerolog event
func (a RequestAddress) MarshalZerologObject(e *zerolog.Event) {
e.Stringer("address", a.Addr).Bool("provided", a.Provided)
}
// RequestAddresses is an array of RequestAddress used mainly for
@@ -89,6 +83,23 @@ func (aa RequestAddresses) GetFirst() netip.Addr {
return a
}
// MarshalZerologArray writes array elements to zerolog event
func (aa RequestAddresses) MarshalZerologArray(a *zerolog.Array) {
for _, addr := range aa {
a.Object(addr)
}
}
// Peers wrapper of array of Peer-s
type Peers []Peer
// MarshalZerologArray writes array elements to zerolog event
func (p Peers) MarshalZerologArray(a *zerolog.Array) {
for _, peer := range p {
a.Object(peer)
}
}
// RequestPeer is bundle of peer ID, provided or
// determined addresses and net port
type RequestPeer struct {
@@ -99,7 +110,7 @@ type RequestPeer struct {
// Peers constructs array of Peer-s with the same ID and Port
// for every RequestAddress array.
func (rp RequestPeer) Peers() (peers []Peer) {
func (rp RequestPeer) Peers() (peers Peers) {
for _, a := range rp.RequestAddresses {
peers = append(peers, Peer{
ID: rp.ID,
@@ -109,6 +120,13 @@ func (rp RequestPeer) Peers() (peers []Peer) {
return
}
// MarshalZerologObject writes fields into zerolog event
func (rp RequestPeer) MarshalZerologObject(e *zerolog.Event) {
e.Stringer("id", rp.ID).
Array("addresses", rp.RequestAddresses).
Uint16("port", rp.Port)
}
// AnnounceRequest represents the parsed parameters from an announce request.
type AnnounceRequest struct {
Event Event
@@ -125,21 +143,19 @@ type AnnounceRequest struct {
Params
}
// LogFields renders the current response as a set of log fields.
func (r AnnounceRequest) LogFields() log.Fields {
return log.Fields{
"event": r.Event,
"infoHash": r.InfoHash,
"compact": r.Compact,
"eventProvided": r.EventProvided,
"numWantProvided": r.NumWantProvided,
"numWant": r.NumWant,
"left": r.Left,
"downloaded": r.Downloaded,
"uploaded": r.Uploaded,
"peers": r.RequestPeer,
"params": r.Params,
}
// MarshalZerologObject writes fields into zerolog event
func (r AnnounceRequest) MarshalZerologObject(e *zerolog.Event) {
e.Stringer("event", r.Event).
Stringer("infoHash", r.InfoHash).
Bool("compact", r.Compact).
Bool("eventProvided", r.EventProvided).
Bool("numWantProvided", r.NumWantProvided).
Uint32("numWant", r.NumWant).
Uint64("left", r.Left).
Uint64("downloaded", r.Downloaded).
Uint64("uploaded", r.Uploaded).
Object("peers", r.RequestPeer).
Object("params", r.Params)
}
// AnnounceResponse represents the parameters used to create an announce
@@ -150,19 +166,28 @@ type AnnounceResponse struct {
Incomplete uint32
Interval time.Duration
MinInterval time.Duration
IPv4Peers []Peer
IPv6Peers []Peer
IPv4Peers Peers
IPv6Peers Peers
}
// LogFields renders the current response as a set of log fields.
func (r AnnounceResponse) LogFields() log.Fields {
return log.Fields{
"compact": r.Compact,
"complete": r.Complete,
"interval": r.Interval,
"minInterval": r.MinInterval,
"ipv4Peers": r.IPv4Peers,
"ipv6Peers": r.IPv6Peers,
// MarshalZerologObject writes fields into zerolog event
func (r AnnounceResponse) MarshalZerologObject(e *zerolog.Event) {
e.Bool("compact", r.Compact).
Uint32("complete", r.Complete).
Uint32("incomplete", r.Incomplete).
Dur("interval", r.Interval).
Dur("minInterval", r.MinInterval).
Array("ipv4Peers", r.IPv4Peers).
Array("ipv6Peers", r.IPv6Peers)
}
// InfoHashes wrapper of array of InfoHash-es
type InfoHashes []InfoHash
// MarshalZerologArray writes array elements to zerolog event
func (i InfoHashes) MarshalZerologArray(a *zerolog.Array) {
for _, ih := range i {
a.Str(ih.String())
}
}
@@ -171,16 +196,24 @@ type ScrapeRequest struct {
// RequestAddresses not used in internal logic,
// but MAY be used in middleware (per-ip block etc.)
RequestAddresses
InfoHashes []InfoHash
InfoHashes InfoHashes
Params Params
}
// LogFields renders the current response as a set of log fields.
func (r ScrapeRequest) LogFields() log.Fields {
return log.Fields{
"ip": r.RequestAddresses,
"infoHashes": r.InfoHashes,
"params": r.Params,
// MarshalZerologObject writes fields into zerolog event
func (r ScrapeRequest) MarshalZerologObject(e *zerolog.Event) {
e.Array("addresses", r.RequestAddresses).
Array("infoHashes", r.InfoHashes).
Object("params", r.Params)
}
// Scrapes wrapper of array of Scrape-s
type Scrapes []Scrape
// MarshalZerologArray writes array elements to zerolog event
func (s Scrapes) MarshalZerologArray(a *zerolog.Array) {
for _, scrape := range s {
a.Object(scrape)
}
}
@@ -189,12 +222,10 @@ func (r ScrapeRequest) LogFields() log.Fields {
// The Scrapes must be in the same order as the InfoHashes in the corresponding
// ScrapeRequest.
type ScrapeResponse struct {
Files []Scrape
Files Scrapes
}
// LogFields renders the current response as a set of Logrus fields.
func (sr ScrapeResponse) LogFields() log.Fields {
return log.Fields{
"files": sr.Files,
}
// MarshalZerologObject writes fields into zerolog event
func (sr ScrapeResponse) MarshalZerologObject(e *zerolog.Event) {
e.Array("scrapes", sr.Files)
}
+5 -10
View File
@@ -5,6 +5,7 @@ import (
)
var (
logger = log.NewLogger("request sanitizer")
// ErrInvalidIP indicates an invalid IP for an Announce.
ErrInvalidIP = ClientError("invalid IP")
@@ -15,6 +16,7 @@ var (
// SanitizeAnnounce enforces a max and default NumWant and coerces the peer's
// IP address into the proper format.
func SanitizeAnnounce(r *AnnounceRequest, maxNumWant, defaultNumWant uint32) error {
logger.Trace().Object("request", r).Msg("source announce")
if r.Port == 0 {
return ErrInvalidPort
}
@@ -29,18 +31,14 @@ func SanitizeAnnounce(r *AnnounceRequest, maxNumWant, defaultNumWant uint32) err
r.NumWant = maxNumWant
}
log.Debug("sanitized announce", r, log.Fields{
"port": r.Port,
"addresses": r.RequestAddresses,
"maxNumWant": maxNumWant,
"defaultNumWant": defaultNumWant,
})
logger.Trace().Object("request", r).Msg("sanitized announce")
return nil
}
// SanitizeScrape enforces a max number of infohashes for a single scrape
// request and checks if addresses are valid.
func SanitizeScrape(r *ScrapeRequest, maxScrapeInfoHashes uint32) error {
logger.Trace().Object("request", r).Msg("source scrape")
if len(r.InfoHashes) > int(maxScrapeInfoHashes) {
r.InfoHashes = r.InfoHashes[:maxScrapeInfoHashes]
}
@@ -49,9 +47,6 @@ func SanitizeScrape(r *ScrapeRequest, maxScrapeInfoHashes uint32) error {
return ErrInvalidIP
}
log.Debug("sanitized scrape", r, log.Fields{
"addresses": r.RequestAddresses,
"maxScrapeInfoHashes": maxScrapeInfoHashes,
})
logger.Trace().Object("request", r).Msg("sanitized scrape")
return nil
}
+19 -51
View File
@@ -1,82 +1,50 @@
//go:build e2e
// +build e2e
package main
import (
"flag"
"fmt"
"log"
"math/rand"
"time"
"github.com/anacrolix/torrent/tracker"
"github.com/spf13/cobra"
"github.com/sot-tech/mochi/bittorrent"
"github.com/sot-tech/mochi/pkg/log"
)
func init() {
e2eCmd = &cobra.Command{
Use: "e2e",
Short: "exec e2e tests",
Long: "Execute the Conf end-to-end test suite",
RunE: EndToEndRunCmdFunc,
}
func main() {
httpAddress := flag.String("httpaddr", "http://127.0.0.1:6969/announce", "address of the HTTP tracker")
udpAddress := flag.String("udpaddr", "udp://127.0.0.1:6969", "address of the UDP tracker")
delay := flag.Duration("delay", time.Second, "delay between announces")
flag.Parse()
e2eCmd.Flags().String("httpaddr", "http://127.0.0.1:6969/announce", "address of the HTTP tracker")
e2eCmd.Flags().String("udpaddr", "udp://127.0.0.1:6969", "address of the UDP tracker")
e2eCmd.Flags().Duration("delay", time.Second, "delay between announces")
}
// EndToEndRunCmdFunc implements a Cobra command that runs the end-to-end test
// suite for a Conf build.
func EndToEndRunCmdFunc(cmd *cobra.Command, args []string) error {
delay, err := cmd.Flags().GetDuration("delay")
if err != nil {
return err
}
// Test the HTTP tracker
httpAddr, err := cmd.Flags().GetString("httpaddr")
if err != nil {
return err
}
if len(httpAddr) != 0 {
log.Info("testing HTTP...")
err := test(httpAddr, delay)
if len(*httpAddress) != 0 {
log.Println("testing HTTP...")
err := test(*httpAddress, *delay)
if err != nil {
return err
log.Fatal(err)
}
log.Info("success")
log.Println("success")
}
// Test the UDP tracker.
udpAddr, err := cmd.Flags().GetString("udpaddr")
if err != nil {
return err
}
if len(udpAddr) != 0 {
log.Info("testing UDP...")
err := test(udpAddr, delay)
if len(*udpAddress) != 0 {
log.Println("testing UDP...")
err := test(*udpAddress, *delay)
if err != nil {
return err
log.Fatal(err)
}
log.Info("success")
log.Println("success")
}
return nil
}
func test(addr string, delay time.Duration) error {
b := make([]byte, bittorrent.InfoHashV1Len)
rand.Read(b)
ih, _ := bittorrent.NewInfoHash(b)
return testWithInfohash(ih, addr, delay)
return testWithInfoHash(ih, addr, delay)
}
func testWithInfohash(infoHash bittorrent.InfoHash, url string, delay time.Duration) error {
func testWithInfoHash(infoHash bittorrent.InfoHash, url string, delay time.Duration) error {
var ih [bittorrent.InfoHashV1Len]byte
req := tracker.AnnounceRequest{
InfoHash: ih,
+28 -224
View File
@@ -1,239 +1,43 @@
package main
import (
"context"
"errors"
"fmt"
"flag"
"log"
"os"
"os/signal"
"runtime"
"strings"
"syscall"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/sot-tech/mochi/frontend/http"
"github.com/sot-tech/mochi/frontend/udp"
"github.com/sot-tech/mochi/middleware"
"github.com/sot-tech/mochi/pkg/conf"
"github.com/sot-tech/mochi/pkg/log"
"github.com/sot-tech/mochi/pkg/metrics"
_ "github.com/sot-tech/mochi/pkg/randseed"
"github.com/sot-tech/mochi/pkg/stop"
"github.com/sot-tech/mochi/storage"
l "github.com/sot-tech/mochi/pkg/log"
)
var e2eCmd *cobra.Command
// Run represents the state of a running instance of Conf.
type Run struct {
configFilePath string
storage storage.PeerStorage
logic *middleware.Logic
sg *stop.Group
}
// NewRun runs an instance of Conf.
func NewRun(configFilePath string) (*Run, error) {
r := &Run{
configFilePath: configFilePath,
}
return r, r.Start(nil)
}
// Start begins an instance of Conf.
// It is optional to provide an instance of the peer store to avoid the
// creation of a new one.
func (r *Run) Start(ps storage.PeerStorage) error {
configFile, err := ParseConfigFile(r.configFilePath)
if err != nil {
return fmt.Errorf("failed to read config: %w", err)
}
cfg := configFile.Conf
r.sg = stop.NewGroup()
if len(cfg.MetricsAddr) > 0 {
log.Info("starting metrics server", log.Fields{"addr": cfg.MetricsAddr})
r.sg.Add(metrics.NewServer(cfg.MetricsAddr))
} else {
log.Info("metrics disabled because of empty address")
}
if ps == nil {
log.Info("starting storage", log.Fields{"name": cfg.Storage.Name})
ps, err = storage.NewStorage(cfg.Storage.Name, cfg.Storage.Config)
if err != nil {
return fmt.Errorf("failed to create storage: %w", err)
}
log.Info("started storage", ps)
}
r.storage = ps
preHooks, err := middleware.HooksFromHookConfigs(cfg.PreHooks, r.storage)
if err != nil {
return fmt.Errorf("failed to validate hook config: %w", err)
}
postHooks, err := middleware.HooksFromHookConfigs(cfg.PostHooks, r.storage)
if err != nil {
return fmt.Errorf("failed to validate hook config: %w", err)
}
r.logic = middleware.NewLogic(cfg.AnnounceInterval, cfg.MinAnnounceInterval, r.storage, preHooks, postHooks)
if len(cfg.HTTPConfig) > 0 {
log.Info("starting HTTP frontend", cfg.HTTPConfig)
httpFE, err := http.NewFrontend(r.logic, cfg.HTTPConfig)
if err == nil {
r.sg.Add(httpFE)
} else if !errors.Is(err, conf.ErrNilConfigMap) {
return err
}
}
if len(cfg.UDPConfig) > 0 {
log.Info("starting UDP frontend", cfg.UDPConfig)
udpFE, err := udp.NewFrontend(r.logic, cfg.UDPConfig)
if err == nil {
r.sg.Add(udpFE)
} else if !errors.Is(err, conf.ErrNilConfigMap) {
return err
}
}
return nil
}
func combineErrors(prefix string, errs []error) error {
errStrs := make([]string, 0, len(errs))
for _, err := range errs {
errStrs = append(errStrs, err.Error())
}
return errors.New(prefix + ": " + strings.Join(errStrs, "; "))
}
// Stop shuts down an instance of Conf.
func (r *Run) Stop(keepPeerStore bool) (storage.PeerStorage, error) {
log.Debug("stopping frontends and metrics server")
if errs := r.sg.Stop().Wait(); len(errs) != 0 {
return nil, combineErrors("failed while shutting down frontends", errs)
}
log.Debug("stopping logic")
if errs := r.logic.Stop().Wait(); len(errs) != 0 {
return nil, combineErrors("failed while shutting down middleware", errs)
}
if !keepPeerStore {
log.Debug("stopping peer store")
if errs := r.storage.Stop().Wait(); len(errs) != 0 {
return nil, combineErrors("failed while shutting down peer store", errs)
}
r.storage = nil
}
return r.storage, nil
}
// RootRunCmdFunc implements a Cobra command that runs an instance of Conf
// and handles reloading and shutdown via process signals.
func RootRunCmdFunc(cmd *cobra.Command, _ []string) error {
configFilePath, err := cmd.Flags().GetString("config")
if err != nil {
return err
}
r, err := NewRun(configFilePath)
if err != nil {
return err
}
shutdown, _ := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
reload, _ := signal.NotifyContext(context.Background(), ReloadSignals...)
for {
select {
case <-reload.Done():
log.Info("reloading; received reload signal")
peerStore, err := r.Stop(true)
if err != nil {
return err
}
if err := r.Start(peerStore); err != nil {
return err
}
case <-shutdown.Done():
log.Info("shutting down; received shutdown signal")
if _, err := r.Stop(false); err != nil {
return err
}
return nil
}
}
}
// RootPreRunCmdFunc handles command line flags for the Run command.
func RootPreRunCmdFunc(cmd *cobra.Command, _ []string) error {
noColors, err := cmd.Flags().GetBool("nocolors")
if err != nil {
return err
}
if noColors {
log.SetFormatter(&logrus.TextFormatter{DisableColors: true})
}
jsonLog, err := cmd.Flags().GetBool("json")
if err != nil {
return err
}
if jsonLog {
log.SetFormatter(&logrus.JSONFormatter{})
log.Info("enabled JSON logging")
}
debugLog, err := cmd.Flags().GetBool("debug")
if err != nil {
return err
}
if debugLog {
log.SetDebug(true)
log.Info("enabled debug logging")
}
return nil
}
// RootPostRunCmdFunc handles clean up of any state initialized by command line
// flags.
func RootPostRunCmdFunc(_ *cobra.Command, _ []string) error {
return nil
}
const (
logOutArg = "logOut"
logLevelArg = "logLevel"
logPrettyArg = "logPretty"
logColorsArg = "logColored"
configArg = "config"
)
func main() {
rootCmd := &cobra.Command{
Use: "mochi",
Short: "BitTorrent Tracker",
Long: "A customizable, multi-protocol BitTorrent Tracker",
PersistentPreRunE: RootPreRunCmdFunc,
RunE: RootRunCmdFunc,
PersistentPostRunE: RootPostRunCmdFunc,
var s Server
logOut := flag.String(logOutArg, "stderr", "output for logging, might be 'stderr', 'stdout' or file path")
logLevel := flag.String(logLevelArg, "warn", "logging level: trace, debug, info, warn, error, fatal, panic")
logPretty := flag.Bool(logPrettyArg, false, "enable log pretty print. used only if 'logOut' set to 'stdout' or 'stderr'. if not set, log outputs json")
logColored := flag.Bool(logColorsArg, runtime.GOOS == "windows", "enable log coloring. used only if set 'logPretty'")
configPath := flag.String(configArg, "/etc/mochi.yaml", "location of configuration file")
flag.Parse()
if err := l.ConfigureLogger(*logOut, *logLevel, *logPretty, *logColored); err != nil {
log.Fatal("unable to configure logger ", err)
}
rootCmd.PersistentFlags().Bool("debug", false, "enable debug logging")
rootCmd.PersistentFlags().Bool("json", false, "enable json logging")
rootCmd.PersistentFlags().Bool("nocolors", runtime.GOOS == "windows", "disable log coloring")
rootCmd.Flags().String("config", "/etc/mochi.yaml", "location of configuration file")
if e2eCmd != nil {
rootCmd.AddCommand(e2eCmd)
}
if err := rootCmd.Execute(); err != nil {
log.Fatal("failed when executing root cobra command: " + err.Error())
if err := s.Run(*configPath); err != nil {
log.Fatal("unable to start server ", err)
}
defer s.Dispose()
ch := make(chan os.Signal, 2)
signal.Notify(ch, os.Interrupt, syscall.SIGTERM)
<-ch
}
+106
View File
@@ -0,0 +1,106 @@
package main
import (
"errors"
"fmt"
"github.com/sot-tech/mochi/frontend/http"
"github.com/sot-tech/mochi/frontend/udp"
"github.com/sot-tech/mochi/middleware"
"github.com/sot-tech/mochi/pkg/log"
"github.com/sot-tech/mochi/pkg/metrics"
"github.com/sot-tech/mochi/pkg/stop"
"github.com/sot-tech/mochi/storage"
)
// Server represents the state of a running instance.
type Server struct {
storage storage.PeerStorage
logic *middleware.Logic
sg *stop.Group
}
// Run begins an instance of Conf.
// It is optional to provide an instance of the peer store to avoid the
// creation of a new one.
func (r *Server) Run(configFilePath string) error {
configFile, err := ParseConfigFile(configFilePath)
if err != nil {
return fmt.Errorf("failed to read config: %w", err)
}
cfg := configFile.Conf
r.sg = stop.NewGroup()
if len(cfg.MetricsAddr) > 0 {
log.Info().Str("address", cfg.MetricsAddr).Msg("starting metrics server")
r.sg.Add(metrics.NewServer(cfg.MetricsAddr))
} else {
log.Info().Msg("metrics disabled because of empty address")
}
log.Info().Str("name", cfg.Storage.Name).Msg("starting storage")
r.storage, err = storage.NewStorage(cfg.Storage.Name, cfg.Storage.Config)
if err != nil {
return fmt.Errorf("failed to create storage: %w", err)
}
log.Info().Object("config", r.storage).Msg("started storage")
preHooks, err := middleware.NewHooks(cfg.PreHooks, r.storage)
if err != nil {
return fmt.Errorf("failed to validate hook config: %w", err)
}
postHooks, err := middleware.NewHooks(cfg.PostHooks, r.storage)
if err != nil {
return fmt.Errorf("failed to validate hook config: %w", err)
}
r.logic = middleware.NewLogic(cfg.AnnounceInterval, cfg.MinAnnounceInterval, r.storage, preHooks, postHooks)
var started bool
if len(cfg.HTTPConfig) > 0 {
log.Info().Object("config", cfg.HTTPConfig).Msg("starting HTTP frontend")
httpFE, err := http.NewFrontend(r.logic, cfg.HTTPConfig)
if err == nil {
r.sg.Add(httpFE)
started = true
} else {
return err
}
}
if len(cfg.UDPConfig) > 0 {
log.Info().Object("config", cfg.UDPConfig).Msg("starting UDP frontend")
udpFE, err := udp.NewFrontend(r.logic, cfg.UDPConfig)
if err == nil {
r.sg.Add(udpFE)
started = true
} else {
return err
}
}
if !started {
return errors.New("no frontends configured")
}
return nil
}
// Dispose shuts down an instance of Server.
func (r *Server) Dispose() {
log.Debug().Msg("stopping frontends and metrics server")
if errs := r.sg.Stop().Wait(); len(errs) > 0 {
log.Error().Errs("errors", errs).Msg("error occurred while shutting down frontends")
}
log.Debug().Msg("stopping logic")
if errs := r.logic.Stop().Wait(); len(errs) > 0 {
log.Error().Errs("errors", errs).Msg("error occurred while shutting down middlewares")
}
log.Debug().Msg("stopping peer store")
if errs := r.storage.Stop().Wait(); len(errs) != 0 {
log.Error().Errs("errors", errs).Msg("error occurred while shutting down peer store")
}
log.Close()
}
-14
View File
@@ -1,14 +0,0 @@
//go:build darwin || freebsd || linux || netbsd || openbsd || dragonfly || solaris
package main
import (
"os"
"syscall"
)
// ReloadSignals are the signals that the current OS will send to the process
// when a configuration reload is requested.
var ReloadSignals = []os.Signal{
syscall.SIGUSR1,
}
-14
View File
@@ -1,14 +0,0 @@
//go:build windows
package main
import (
"os"
"syscall"
)
// ReloadSignals are the signals that the current OS will send to the process
// when a configuration reload is requested.
var ReloadSignals = []os.Signal{
syscall.SIGHUP,
}
+3 -3
View File
@@ -24,9 +24,9 @@ There are two sources of hashes: `list` and `directory`.
* `list` is the static set of hashes, specified in configuration file.
* `directory` will watch for `*.torrent` files in specified path and
append/delete records from storage. This source will parse all existing
files at start and then watch for new files to add, or for delete events
to remove hash from storage.
append/delete records from storage. This source will parse all existing
files at start and then watch for new files to add, or for delete events
to remove hash from storage.
Note: if storage is not `memory`, and `preserve` option set to `true`, records
will be persisted in storage until _somebody_ or _something_ (different tool with access
+7 -7
View File
@@ -3,18 +3,18 @@
This storage mainly the same as Redis and uses some of [redis](redis.md) store logic
with next exceptions:
* peers stored in [sets](https://redis.io/docs/manual/data-types/#sets)
instead of [hashes](https://redis.io/docs/manual/data-types/#hashes);
* peers stored in [sets](https://redis.io/docs/manual/data-types/#sets)
instead of [hashes](https://redis.io/docs/manual/data-types/#hashes);
* keys such as `CHI_I`, `CHI_S_C` and `CHI_L_C` not used (at all);
* peer TTL relies on KeyDB's [EXPIREMEMBER](https://docs.keydb.dev/docs/commands/#expiremember)
command, so MoChi does not need to periodically check peer expiration;
* peer TTL relies on KeyDB's [EXPIREMEMBER](https://docs.keydb.dev/docs/commands/#expiremember)
command, so MoChi does not need to periodically check peer expiration;
* storage does not execute periodical statistics collection (peer/lecher/info hash count)
because:
* manual calculation (INC/DEC peers count) is not usable
* manual scan of all keys is quite expensive operation.
because:
* manual calculation (INC/DEC peers count) is not usable
* manual scan of all keys is quite expensive operation.
## Use Case
+20 -41
View File
@@ -20,6 +20,8 @@ import (
"github.com/sot-tech/mochi/pkg/stop"
)
var logger = log.NewLogger("http frontend")
// Config represents all of the configurable options for an HTTP BitTorrent
// Frontend.
type Config struct {
@@ -38,29 +40,6 @@ type Config struct {
ParseOptions
}
// LogFields renders the current config as a set of Logrus fields.
func (cfg Config) LogFields() log.Fields {
return log.Fields{
"addr": cfg.Addr,
"httpsAddr": cfg.HTTPSAddr,
"readTimeout": cfg.ReadTimeout,
"writeTimeout": cfg.WriteTimeout,
"idleTimeout": cfg.IdleTimeout,
"enableKeepAlive": cfg.EnableKeepAlive,
"tlsCertPath": cfg.TLSCertPath,
"tlsKeyPath": cfg.TLSKeyPath,
"announceRoutes": cfg.AnnounceRoutes,
"scrapeRoutes": cfg.ScrapeRoutes,
"pingRoutes": cfg.PingRoutes,
"enableRequestTiming": cfg.EnableRequestTiming,
"allowIPSpoofing": cfg.AllowIPSpoofing,
"realIPHeader": cfg.RealIPHeader,
"maxNumWant": cfg.MaxNumWant,
"defaultNumWant": cfg.DefaultNumWant,
"maxScrapeInfoHashes": cfg.MaxScrapeInfoHashes,
}
}
// Default config constants.
const (
defaultReadTimeout = 2 * time.Second
@@ -77,20 +56,20 @@ func (cfg Config) Validate() Config {
if cfg.ReadTimeout <= 0 {
validcfg.ReadTimeout = defaultReadTimeout
log.Warn("falling back to default configuration", log.Fields{
"name": "http.ReadTimeout",
"provided": cfg.ReadTimeout,
"default": validcfg.ReadTimeout,
})
logger.Warn().
Str("name", "http.ReadTimeout").
Dur("provided", cfg.ReadTimeout).
Dur("default", validcfg.ReadTimeout).
Msg("falling back to default configuration")
}
if cfg.WriteTimeout <= 0 {
validcfg.WriteTimeout = defaultWriteTimeout
log.Warn("falling back to default configuration", log.Fields{
"name": "http.WriteTimeout",
"provided": cfg.WriteTimeout,
"default": validcfg.WriteTimeout,
})
logger.Warn().
Str("name", "http.WriteTimeout").
Dur("provided", cfg.WriteTimeout).
Dur("default", validcfg.WriteTimeout).
Msg("falling back to default configuration")
}
if cfg.IdleTimeout <= 0 {
@@ -98,11 +77,11 @@ func (cfg Config) Validate() Config {
if cfg.EnableKeepAlive {
// If keepalive is disabled, this configuration isn't used anyway.
log.Warn("falling back to default configuration", log.Fields{
"name": "http.IdleTimeout",
"provided": cfg.IdleTimeout,
"default": validcfg.IdleTimeout,
})
logger.Warn().
Str("name", "http.IdleTimeout").
Dur("provided", cfg.IdleTimeout).
Dur("default", validcfg.IdleTimeout).
Msg("falling back to default configuration")
}
}
@@ -178,7 +157,7 @@ func NewFrontend(logic frontend.TrackerLogic, c conf.MapConfig) (*Frontend, erro
if cfg.Addr != "" {
go func() {
if err := f.serveHTTP(router, false); err != nil {
log.Fatal("failed while serving http", log.Err(err))
logger.Fatal().Err(err).Str("proto", "http").Msg("failed while serving")
}
}()
}
@@ -186,7 +165,7 @@ func NewFrontend(logic frontend.TrackerLogic, c conf.MapConfig) (*Frontend, erro
if cfg.HTTPSAddr != "" {
go func() {
if err := f.serveHTTP(router, true); err != nil {
log.Fatal("failed while serving https", log.Err(err))
logger.Fatal().Err(err).Str("proto", "https").Msg("failed while serving")
}
}()
}
@@ -194,7 +173,7 @@ func NewFrontend(logic frontend.TrackerLogic, c conf.MapConfig) (*Frontend, erro
return f, nil
}
// Stop provides a thread-safe way to shutdown a currently running Frontend.
// Stop provides a thread-safe way to shut down a currently running Frontend.
func (f *Frontend) Stop() stop.Result {
stopGroup := stop.NewGroup()
+2 -3
View File
@@ -9,7 +9,6 @@ import (
"github.com/anacrolix/torrent/bencode"
"github.com/sot-tech/mochi/bittorrent"
"github.com/sot-tech/mochi/pkg/log"
)
// WriteError communicates an error to a BitTorrent client over HTTP.
@@ -19,13 +18,13 @@ func WriteError(w http.ResponseWriter, err error) {
if errors.As(err, &clientErr) {
message = clientErr.Error()
} else {
log.Error("http: internal error", log.Err(err))
logger.Error().Err(err).Msg("http: internal error")
}
if err = bencode.NewEncoder(w).Encode(map[string]any{
"failure reason": message,
}); err != nil {
log.Error("unable to encode message", log.Err(err))
logger.Error().Err(err).Msg("unable to encode message")
}
}
+5
View File
@@ -8,8 +8,13 @@ import (
"github.com/stretchr/testify/require"
"github.com/sot-tech/mochi/bittorrent"
"github.com/sot-tech/mochi/pkg/log"
)
func init() {
_ = log.ConfigureLogger("", "warn", false, false)
}
func TestWriteError(t *testing.T) {
table := []struct {
reason, expected string
+17 -15
View File
@@ -2,6 +2,8 @@ package frontend
import "github.com/sot-tech/mochi/pkg/log"
var logger = log.NewLogger("frontend configurator")
// ParseOptions is the configuration used to parse an Announce Request.
//
// If AllowIPSpoofing is true, IPs provided via params will be used.
@@ -18,29 +20,29 @@ func (op ParseOptions) Validate() ParseOptions {
valid := op
if op.MaxNumWant <= 0 {
valid.MaxNumWant = defaultMaxNumWant
log.Warn("falling back to default configuration", log.Fields{
"name": "MaxNumWant",
"provided": op.MaxNumWant,
"default": valid.MaxNumWant,
})
logger.Warn().
Str("name", "MaxNumWant").
Uint32("provided", op.MaxNumWant).
Uint32("default", valid.MaxNumWant).
Msg("falling back to default configuration")
}
if op.DefaultNumWant <= 0 {
valid.DefaultNumWant = defaultDefaultNumWant
log.Warn("falling back to default configuration", log.Fields{
"name": "DefaultNumWant",
"provided": op.DefaultNumWant,
"default": valid.DefaultNumWant,
})
logger.Warn().
Str("name", "DefaultNumWant").
Uint32("provided", op.DefaultNumWant).
Uint32("default", valid.DefaultNumWant).
Msg("falling back to default configuration")
}
if op.MaxScrapeInfoHashes <= 0 {
valid.MaxScrapeInfoHashes = defaultMaxScrapeInfoHashes
log.Warn("falling back to default configuration", log.Fields{
"name": "MaxScrapeInfoHashes",
"provided": op.MaxScrapeInfoHashes,
"default": valid.MaxScrapeInfoHashes,
})
logger.Warn().
Str("name", "MaxScrapeInfoHashes").
Uint32("provided", op.MaxScrapeInfoHashes).
Uint32("default", valid.MaxScrapeInfoHashes).
Msg("falling back to default configuration")
}
return valid
}
+10 -2
View File
@@ -81,14 +81,22 @@ func (g *ConnectionIDGenerator) Generate(ip netip.Addr, now time.Time) []byte {
g.scratch = g.mac.Sum(g.scratch)
copy(g.connID[4:8], g.scratch[:4])
log.Debug("generated connection ID", log.Fields{"ip": ip, "now": now, "connID": g.connID})
log.Debug().
Stringer("ip", ip).
Time("now", now).
Bytes("connID", g.connID).
Msg("generated connection ID")
return g.connID
}
// Validate validates the given connection ID for an IP and the current time.
func (g *ConnectionIDGenerator) Validate(connectionID []byte, ip netip.Addr, now time.Time, maxClockSkew time.Duration) bool {
ts := time.Unix(int64(binary.BigEndian.Uint32(connectionID[:4])), 0)
log.Debug("validating connection ID", log.Fields{"connID": connectionID, "ip": ip, "ts": ts, "now": now})
log.Debug().
Stringer("ip", ip).
Time("ts", ts).Time("now", now).
Bytes("connID", g.connID).
Msg("validating connection ID")
if now.After(ts.Add(ttl)) || ts.After(now.Add(maxClockSkew)) {
return false
}
+5 -1
View File
@@ -57,7 +57,11 @@ func simpleNewConnectionID(ip netip.Addr, now time.Time, key string) []byte {
// this is just in here because logging impacts performance and we benchmark
// this version too.
log.Debug("manually generated connection ID", log.Fields{"ip": ip, "now": now, "connID": buf})
log.Debug().
Stringer("ip", ip).
Time("now", now).
Bytes("connID", buf).
Msg("manually generated connection ID")
return buf
}
+9 -17
View File
@@ -23,6 +23,8 @@ import (
"github.com/sot-tech/mochi/pkg/timecache"
)
var logger = log.NewLogger("udp frontend")
var allowedGeneratedPrivateKeyRunes = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890")
// Config represents all of the configurable options for a UDP BitTorrent
@@ -35,20 +37,6 @@ type Config struct {
frontend.ParseOptions
}
// LogFields renders the current config as a set of Logrus fields.
func (cfg Config) LogFields() log.Fields {
return log.Fields{
"addr": cfg.Addr,
"privateKey": cfg.PrivateKey,
"maxClockSkew": cfg.MaxClockSkew,
"enableRequestTiming": cfg.EnableRequestTiming,
"allowIPSpoofing": cfg.AllowIPSpoofing,
"maxNumWant": cfg.MaxNumWant,
"defaultNumWant": cfg.DefaultNumWant,
"maxScrapeInfoHashes": cfg.MaxScrapeInfoHashes,
}
}
// Validate sanity checks values set in a config and returns a new config with
// default values replacing anything that is invalid.
//
@@ -64,7 +52,11 @@ func (cfg Config) Validate() Config {
}
validcfg.PrivateKey = string(pkeyRunes)
log.Warn("UDP private key was not provided, using generated key", log.Fields{"key": validcfg.PrivateKey})
logger.Warn().
Str("name", "UDP.PrivateKey").
Str("provided", "").
Str("key", validcfg.PrivateKey).
Msg("falling back to default configuration")
}
validcfg.ParseOptions = cfg.ParseOptions.Validate()
@@ -111,7 +103,7 @@ func NewFrontend(logic frontend.TrackerLogic, c conf.MapConfig) (*Frontend, erro
f.wg.Add(1)
go func() {
if err := f.serve(); err != nil {
log.Fatal("failed while serving udp", log.Err(err))
logger.Fatal().Err(err).Str("proto", "udp").Msg("failed while serving")
}
}()
@@ -157,7 +149,7 @@ func (t *Frontend) serve() error {
// Check to see if we need to shutdown.
select {
case <-t.closing:
log.Debug("udp serve() received shutdown signal")
log.Debug().Msg("serve received shutdown signal")
return nil
default:
}
+5
View File
@@ -6,11 +6,16 @@ import (
"github.com/sot-tech/mochi/frontend/udp"
"github.com/sot-tech/mochi/middleware"
"github.com/sot-tech/mochi/pkg/conf"
"github.com/sot-tech/mochi/pkg/log"
_ "github.com/sot-tech/mochi/pkg/randseed"
"github.com/sot-tech/mochi/storage"
_ "github.com/sot-tech/mochi/storage/memory"
)
func init() {
_ = log.ConfigureLogger("", "warn", false, false)
}
func TestStartStopRaceIssue437(t *testing.T) {
ps, err := storage.NewStorage("memory", conf.MapConfig{})
if err != nil {
+5 -7
View File
@@ -3,6 +3,7 @@ module github.com/sot-tech/mochi
go 1.18
require (
code.cloudfoundry.org/go-diodes v0.0.0-20220420211542-53509ccdf174
github.com/SermoDigital/jose v0.9.2-0.20180104203859-803625baeddc
github.com/anacrolix/torrent v1.42.0
github.com/go-redis/redis/v8 v8.11.5
@@ -12,8 +13,7 @@ require (
github.com/mitchellh/mapstructure v1.5.0
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.12.1
github.com/sirupsen/logrus v1.8.1
github.com/spf13/cobra v1.4.0
github.com/rs/zerolog v1.26.1
github.com/stretchr/testify v1.7.1
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b
)
@@ -22,23 +22,21 @@ require (
github.com/anacrolix/dht/v2 v2.17.0 // indirect
github.com/anacrolix/log v0.13.1 // indirect
github.com/anacrolix/missinggo v1.3.0 // indirect
github.com/anacrolix/missinggo/v2 v2.6.0 // indirect
github.com/anacrolix/missinggo/v2 v2.7.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bradfitz/iter v0.0.0-20191230175014-e8f45d346db8 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/fsnotify/fsnotify v1.5.3 // indirect
github.com/fsnotify/fsnotify v1.5.4 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/huandu/xstrings v1.3.2 // indirect
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/klauspost/cpuid/v2 v2.0.12 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.34.0 // indirect
github.com/prometheus/procfs v0.7.3 // indirect
github.com/spf13/pflag v1.0.5 // indirect
golang.org/x/sys v0.0.0-20220422013727-9388b58f7150 // indirect
golang.org/x/sys v0.0.0-20220429233432-b5fbb4746d32 // indirect
google.golang.org/protobuf v1.28.0 // indirect
)
+23 -19
View File
@@ -30,6 +30,8 @@ cloud.google.com/go/storage v1.5.0/go.mod h1:tpKbwo567HUNpVclU5sGELwQWBDZ8gh0Zeo
cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohlUTyfDhBk=
cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs=
cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0=
code.cloudfoundry.org/go-diodes v0.0.0-20220420211542-53509ccdf174 h1:Ht2zKWftukU3F3ACIdE8asNhso3DgHPzaCDO2K5SWmA=
code.cloudfoundry.org/go-diodes v0.0.0-20220420211542-53509ccdf174/go.mod h1:HLP7HKUU1eqMAGMk247yT91tDDi4xxnehkyXh6hGcr0=
crawshaw.io/iox v0.0.0-20181124134642-c51c3df30797/go.mod h1:sXBiorCo8c46JlQV3oXPKINnZ8mcqnye1EkVkqsectk=
crawshaw.io/sqlite v0.3.2/go.mod h1:igAO5JulrQ1DbdZdtVq48mnZUBAPOeFzer7VhDWNtW4=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
@@ -65,8 +67,8 @@ github.com/anacrolix/missinggo v1.3.0/go.mod h1:bqHm8cE8xr+15uVfMG3BFui/TxyB6//H
github.com/anacrolix/missinggo/perf v1.0.0/go.mod h1:ljAFWkBuzkO12MQclXzZrosP5urunoLS0Cbvb4V0uMQ=
github.com/anacrolix/missinggo/v2 v2.2.0/go.mod h1:o0jgJoYOyaoYQ4E2ZMISVa9c88BbUBVQQW4QeRkNCGY=
github.com/anacrolix/missinggo/v2 v2.5.1/go.mod h1:WEjqh2rmKECd0t1VhQkLGTdIWXO6f6NLjp5GlMZ+6FA=
github.com/anacrolix/missinggo/v2 v2.6.0 h1:kHkn6nLy1isWYV4mthZX8itV1bRd2mwFVuXrxzJ4VX0=
github.com/anacrolix/missinggo/v2 v2.6.0/go.mod h1:2IZIvmRTizALNYFYXsPR7ofXPzJgyBpKZ4kMqMEICkI=
github.com/anacrolix/missinggo/v2 v2.7.0 h1:4fzOAAn/VCvfWGviLmh64MPMttrlYew81JdPO7nSHvI=
github.com/anacrolix/missinggo/v2 v2.7.0/go.mod h1:2IZIvmRTizALNYFYXsPR7ofXPzJgyBpKZ4kMqMEICkI=
github.com/anacrolix/stm v0.2.0/go.mod h1:zoVQRvSiGjGoTmbM0vSLIiaKjWtNPeTvXUSdJQA4hsg=
github.com/anacrolix/tagflag v0.0.0-20180109131632-2146c8d41bf0/go.mod h1:1m2U/K6ZT+JZG0+bdMK6qauP49QT4wE5pmhJXOKKCHw=
github.com/anacrolix/tagflag v1.0.0/go.mod h1:1m2U/K6ZT+JZG0+bdMK6qauP49QT4wE5pmhJXOKKCHw=
@@ -92,7 +94,7 @@ github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5P
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
github.com/cpuguy83/go-md2man/v2 v2.0.1/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/coreos/go-systemd/v22 v22.3.2/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
@@ -111,8 +113,8 @@ github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1m
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/frankban/quicktest v1.14.0 h1:+cqqvzZV87b4adx/5ayVOaYZ2CrvM4ejQvUdBzPPUss=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/fsnotify/fsnotify v1.5.3 h1:vNFpj2z7YIbwh2bw7x35sqYpp2wfuq+pivKbWG09B8c=
github.com/fsnotify/fsnotify v1.5.3/go.mod h1:T3375wBYaZdLLcVNkcVbzGHY7f1l/uK5T5Ai1i3InKU=
github.com/fsnotify/fsnotify v1.5.4 h1:jRbGcIw6P2Meqdwuo0H1p6JVLbL5DHKAKlYndzMwVZI=
github.com/fsnotify/fsnotify v1.5.4/go.mod h1:OVB6XrOHzAwXMpEM7uPOzcehqUV2UqJxmVXmkdnm1bU=
github.com/glycerine/go-unsnap-stream v0.0.0-20180323001048-9f0cb55181dd/go.mod h1:/20jfyN9Y5QPEAprSgKAUr+glWDY39ZiUEAYOEv5dsE=
github.com/glycerine/go-unsnap-stream v0.0.0-20181221182339-f9677308dec2/go.mod h1:/20jfyN9Y5QPEAprSgKAUr+glWDY39ZiUEAYOEv5dsE=
github.com/glycerine/go-unsnap-stream v0.0.0-20190901134440-81cf024a9e0a/go.mod h1:/20jfyN9Y5QPEAprSgKAUr+glWDY39ZiUEAYOEv5dsE=
@@ -133,6 +135,7 @@ github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KE
github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI=
github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/gogo/protobuf v1.2.0/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
@@ -206,8 +209,6 @@ github.com/huandu/xstrings v1.3.1/go.mod h1:y5/lhBue+AyNmUVz9RLU9xbLR0o4KIIExikq
github.com/huandu/xstrings v1.3.2 h1:L18LIDzqlW6xN2rEkpdV8+oL/IXWJ1APd+vsdYy4Wdw=
github.com/huandu/xstrings v1.3.2/go.mod h1:y5/lhBue+AyNmUVz9RLU9xbLR0o4KIIExikq4ovT0aE=
github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
@@ -257,7 +258,7 @@ github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+W
github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE=
github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE=
github.com/onsi/gomega v1.19.0 h1:4ieX6qQjPP/BfC3mpsAtIGGlxTWPeA3Inl/7DtXw1tw=
github.com/openzipkin/zipkin-go v0.1.6/go.mod h1:QgAqvLzwWbR/WpD4A3cGpPtJrZXNIiJc5AZX7/PBEpw=
github.com/philhofer/fwd v1.0.0/go.mod h1:gk3iGcWd9+svBvR0sR+KPcfE+RNWozjowpeBVG3ZVNU=
github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
@@ -304,21 +305,17 @@ github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFR
github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc=
github.com/rogpeppe/go-internal v1.8.0 h1:FCbCCtXNOY3UtUuHUYaghJg4y7Fd14rXifAYUAtL9R8=
github.com/rogpeppe/go-internal v1.8.0/go.mod h1:WmiCO8CzOY8rg0OYDC4/i/2WRWAB6poM+XZ2dLUbcbE=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/rs/xid v1.3.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=
github.com/rs/zerolog v1.26.1 h1:/ihwxqH+4z8UxyI70wM1z9yCvkWcfz/a3mj48k/Zngc=
github.com/rs/zerolog v1.26.1/go.mod h1:/wSSJWX7lVrsOwlbyTRSOJvqRlc+WjWlfes+CiJ+tmc=
github.com/ryszard/goskiplist v0.0.0-20150312221310-2dfbae5fcf46/go.mod h1:uAQ5PCi+MFsC7HjREoAz1BU+Mq60+05gifQSsHSDG/8=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88=
github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE=
github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
github.com/smartystreets/assertions v0.0.0-20190215210624-980c5ac6f3ac/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
github.com/smartystreets/goconvey v0.0.0-20181108003508-044398e4856c/go.mod h1:XDJAKZRPZ1CvBcN2aX5YOUTYGHki24fSF0Iv48Ibg0s=
github.com/smartystreets/goconvey v0.0.0-20190306220146-200a235640ff/go.mod h1:KSQcGKpxUMHk3nbYzs/tIBAM2iDooCn0BmttHOJEbLs=
github.com/spf13/cobra v1.4.0 h1:y+wJpx64xcgO1V+RcnwW0LEHxTKRi2ZDPSBjWnrg88Q=
github.com/spf13/cobra v1.4.0/go.mod h1:Wo4iy3BUC+X2Fybo0PDqwJIv3dNRiZLHQymsfxlB84g=
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.1/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
@@ -336,6 +333,7 @@ github.com/willf/bitset v1.1.10/go.mod h1:RjeCKbqT1RxIR/KWY6phxZiaY1IyutSBfGjNPy
github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.4.0/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
go.opencensus.io v0.20.1/go.mod h1:6WKK9ahsWS3RSO+PY9ZHZUfv2irvY6gN279GOPZjmmk=
go.opencensus.io v0.20.2/go.mod h1:6WKK9ahsWS3RSO+PY9ZHZUfv2irvY6gN279GOPZjmmk=
go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU=
@@ -349,6 +347,7 @@ golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8U
golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20211215165025-cf75a172585e/go.mod h1:P+XmwS30IXTQdn5tA2iutPOUgjI07+tq3H3K9MVA1s8=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8=
@@ -379,6 +378,7 @@ golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzB
golang.org/x/mod v0.1.1-0.20191107180719-034126e5016b/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg=
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
@@ -409,7 +409,9 @@ golang.org/x/net v0.0.0-20200520182314-0ba52f642ac2/go.mod h1:qpuaurCH72eLCgpAm/
golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20210805182204-aaa1db679c0d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
golang.org/x/net v0.0.0-20220225172249-27dd8689420f h1:oA4XRj0qtSt8Yo1Zms0CUlsT3KG69V2UGQWPBxujDmc=
golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
@@ -429,6 +431,7 @@ golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
@@ -444,7 +447,6 @@ golang.org/x/sys v0.0.0-20190606165138-5da285871e9c/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191001151750-bb3f8db39f24/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191228213918-04cbcbbfeed8/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200106162015-b016eb3dc98e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@@ -469,11 +471,12 @@ golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210809222454-d867a43fc93e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220422013727-9388b58f7150 h1:xHms4gcpe1YE7A3yIllJXP16CMAGuqwO2lX1mTyyRRc=
golang.org/x/sys v0.0.0-20220422013727-9388b58f7150/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220412211240-33da011f77ad/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220429233432-b5fbb4746d32 h1:Js08h5hqB5xyWR789+QqueR6sDE8mk+YvpETZ+F6X9Y=
golang.org/x/sys v0.0.0-20220429233432-b5fbb4746d32/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
@@ -528,6 +531,7 @@ golang.org/x/tools v0.0.0-20200618134242-20370b0cb4b2/go.mod h1:EkVYQZoAsY45+roY
golang.org/x/tools v0.0.0-20200729194436-6467de6f59a7/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA=
golang.org/x/tools v0.0.0-20200804011535-6c149bb5ef0d/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA=
golang.org/x/tools v0.0.0-20200825202427-b303f430e36d/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA=
golang.org/x/tools v0.1.7/go.mod h1:LGqMHiF4EqQNHR1JncWGqT5BVaXmza+X+BDGol+dOxo=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+1 -8
View File
@@ -5,7 +5,6 @@ import (
"errors"
"github.com/sot-tech/mochi/bittorrent"
"github.com/sot-tech/mochi/pkg/log"
"github.com/sot-tech/mochi/storage"
)
@@ -185,16 +184,10 @@ func (h *responseHook) appendPeers(req *bittorrent.AnnounceRequest, resp *bittor
resp.IPv4Peers = append(resp.IPv4Peers, p)
uniquePeers[p] = nil
} else {
log.Warn("received invalid peer from storage", log.Fields{"peer": p})
logger.Warn().Object("peer", p).Msg("received invalid peer from storage")
}
}
}
log.Debug("responseHook announce peers", log.Fields{
"infoHash": req.InfoHash,
"requestPeer": req.RequestPeer,
"ipv4Peers": resp.IPv4Peers,
"ipv6Peers": resp.IPv6Peers,
})
return
}
+39 -45
View File
@@ -14,7 +14,6 @@ import (
"errors"
"fmt"
"net/http"
"strings"
"time"
jc "github.com/SermoDigital/jose/crypto"
@@ -38,11 +37,18 @@ func init() {
}
var (
logger = log.NewLogger(Name)
// ErrMissingJWT is returned when a JWT is missing from a request.
ErrMissingJWT = bittorrent.ClientError("unapproved request: missing jwt")
// ErrInvalidJWT is returned when a JWT fails to verify.
ErrInvalidJWT = bittorrent.ClientError("unapproved request: invalid jwt")
errInvalidInfoHashClaim = errors.New("claim \"infohash\" is invalid")
errInvalidKid = errors.New("invalid kid")
errUnknownKidSigner = errors.New("signed by unknown kid")
)
// Config represents all the values required by this middleware to fetch JWKs
@@ -54,16 +60,6 @@ type Config struct {
JWKUpdateInterval time.Duration `cfg:"jwk_set_update_interval"`
}
// LogFields implements log.Fielder for a Config.
func (cfg Config) LogFields() log.Fields {
return log.Fields{
"issuer": cfg.Issuer,
"audience": cfg.Audience,
"JWKSetURL": cfg.JWKSetURL,
"JWKUpdateInterval": cfg.JWKUpdateInterval,
}
}
type hook struct {
cfg Config
publicKeys map[string]crypto.PublicKey
@@ -77,14 +73,14 @@ func build(options conf.MapConfig, _ storage.PeerStorage) (middleware.Hook, erro
return nil, fmt.Errorf("middleware %s: %w", Name, err)
}
log.Debug("creating new JWT middleware", options)
logger.Debug().Object("options", options).Msg("creating new JWT middleware")
h := &hook{
cfg: cfg,
publicKeys: map[string]crypto.PublicKey{},
closing: make(chan struct{}),
}
log.Debug("performing initial fetch of JWKs")
logger.Debug().Msg("performing initial fetch of JWKs")
if err := h.updateKeys(); err != nil {
return nil, fmt.Errorf("failed to fetch initial JWK Set: %w", err)
}
@@ -95,7 +91,7 @@ func build(options conf.MapConfig, _ storage.PeerStorage) (middleware.Hook, erro
case <-h.closing:
return
case <-time.After(cfg.JWKUpdateInterval):
log.Debug("performing fetch of JWKs")
logger.Debug().Msg("performing fetch of JWKs")
_ = h.updateKeys()
}
}
@@ -107,14 +103,14 @@ func build(options conf.MapConfig, _ storage.PeerStorage) (middleware.Hook, erro
func (h *hook) updateKeys() error {
resp, err := http.Get(h.cfg.JWKSetURL)
if err != nil {
log.Error("failed to fetch JWK Set", log.Err(err))
logger.Error().Err(err).Msg("failed to fetch JWK Set")
return err
}
defer resp.Body.Close()
var parsedJWKs gojwk.Key
err = json.NewDecoder(resp.Body).Decode(&parsedJWKs)
if err != nil {
log.Error("failed to decode JWK JSON", log.Err(err))
logger.Error().Err(err).Msg("failed to decode JWK JSON")
return err
}
@@ -122,19 +118,19 @@ func (h *hook) updateKeys() error {
for _, parsedJWK := range parsedJWKs.Keys {
publicKey, err := parsedJWK.DecodePublicKey()
if err != nil {
log.Error("failed to decode JWK into public key", log.Err(err))
logger.Error().Err(err).Msg("failed to decode JWK into public key")
return err
}
keys[parsedJWK.Kid] = publicKey
}
h.publicKeys = keys
log.Debug("successfully fetched JWK Set")
logger.Debug().Msg("successfully fetched JWK Set")
return nil
}
func (h *hook) Stop() stop.Result {
log.Debug("attempting to shutdown JWT middleware")
logger.Debug().Msg("attempting to shutdown JWT middleware")
select {
case <-h.closing:
return stop.AlreadyStopped
@@ -178,53 +174,51 @@ func validateJWT(ih bittorrent.InfoHash, jwtBytes []byte, cfgIss, cfgAud string,
claims := parsedJWT.Claims()
if iss, ok := claims.Issuer(); !ok || iss != cfgIss {
log.Debug("unequal or missing issuer when validating JWT", log.Fields{
"exists": ok,
"claim": iss,
"config": cfgIss,
})
logger.Debug().
Bool("exists", ok).
Str("claim", iss).
Str("config", cfgIss).
Msg("unequal or missing issuer when validating JWT")
return jwt.ErrInvalidISSClaim
}
if auds, ok := claims.Audience(); !ok || !in(cfgAud, auds) {
log.Debug("unequal or missing audience when validating JWT", log.Fields{
"exists": ok,
"claim": strings.Join(auds, ","),
"config": cfgAud,
})
logger.Debug().
Bool("exists", ok).
Strs("claim", auds).
Str("config", cfgAud).
Msg("unequal or missing audience when validating JWT")
return jwt.ErrInvalidAUDClaim
}
ihHex := hex.EncodeToString([]byte(ih))
if ihClaim, ok := claims.Get("infohash").(string); !ok || ihClaim != ihHex {
log.Debug("unequal or missing infohash when validating JWT", log.Fields{
"exists": ok,
"claim": ihClaim,
"request": ihHex,
})
return errors.New("claim \"infohash\" is invalid")
logger.Debug().
Bool("exists", ok).
Str("claim", ihClaim).
Str("request", ihHex).
Msg("unequal or missing infohash when validating JWT")
return errInvalidInfoHashClaim
}
parsedJWS := parsedJWT.(jws.JWS)
kid, ok := parsedJWS.Protected().Get("kid").(string)
if !ok {
log.Debug("missing kid when validating JWT", log.Fields{
"exists": ok,
"claim": kid,
})
return errors.New("invalid kid")
logger.Debug().
Bool("exists", ok).
Str("claim", kid).
Msg("missing kid when validating JWT")
return errInvalidKid
}
publicKey, ok := publicKeys[kid]
if !ok {
log.Debug("missing public key forkid when validating JWT", log.Fields{
"kid": kid,
})
return errors.New("signed by unknown kid")
logger.Debug().Str("claim", kid).Msg("missing public key forkid when validating JWT")
return errUnknownKidSigner
}
err = parsedJWS.Verify(publicKey, jc.SigningMethodRS256)
if err != nil {
log.Debug("failed to verify signature of JWT", log.Err(err))
logger.Debug().Err(err).Msg("failed to verify signature of JWT")
return err
}
+17 -5
View File
@@ -11,7 +11,10 @@ import (
"github.com/sot-tech/mochi/storage"
)
var _ frontend.TrackerLogic = &Logic{}
var (
logger = log.NewLogger("middleware")
_ frontend.TrackerLogic = &Logic{}
)
// NewLogic creates a new instance of a TrackerLogic that executes the provided
// middleware hooks.
@@ -35,6 +38,7 @@ type Logic struct {
// HandleAnnounce generates a response for an Announce.
func (l *Logic) HandleAnnounce(ctx context.Context, req *bittorrent.AnnounceRequest) (_ context.Context, resp *bittorrent.AnnounceResponse, err error) {
logger.Debug().Object("request", req).Msg("new announce request")
resp = &bittorrent.AnnounceResponse{
Interval: l.announceInterval,
MinInterval: l.minAnnounceInterval,
@@ -46,7 +50,7 @@ func (l *Logic) HandleAnnounce(ctx context.Context, req *bittorrent.AnnounceRequ
}
}
log.Debug("generated announce response", resp)
logger.Debug().Object("response", resp).Msg("generated announce response")
return ctx, resp, nil
}
@@ -56,7 +60,10 @@ func (l *Logic) AfterAnnounce(ctx context.Context, req *bittorrent.AnnounceReque
var err error
for _, h := range l.postHooks {
if ctx, err = h.HandleAnnounce(ctx, req, resp); err != nil {
log.Error("post-announce hooks failed", log.Err(err))
logger.Error().Err(err).
Object("request", req).
Object("response", resp).
Msg("post-announce hooks failed")
return
}
}
@@ -64,6 +71,7 @@ func (l *Logic) AfterAnnounce(ctx context.Context, req *bittorrent.AnnounceReque
// HandleScrape generates a response for a Scrape.
func (l *Logic) HandleScrape(ctx context.Context, req *bittorrent.ScrapeRequest) (_ context.Context, resp *bittorrent.ScrapeResponse, err error) {
logger.Debug().Object("request", req).Msg("new scrape request")
resp = &bittorrent.ScrapeResponse{
Files: make([]bittorrent.Scrape, 0, len(req.InfoHashes)),
}
@@ -73,7 +81,7 @@ func (l *Logic) HandleScrape(ctx context.Context, req *bittorrent.ScrapeRequest)
}
}
log.Debug("generated scrape response", resp)
logger.Debug().Object("response", resp).Msg("generated scrape response")
return ctx, resp, nil
}
@@ -83,7 +91,11 @@ func (l *Logic) AfterScrape(ctx context.Context, req *bittorrent.ScrapeRequest,
var err error
for _, h := range l.postHooks {
if ctx, err = h.HandleScrape(ctx, req, resp); err != nil {
log.Error("post-scrape hooks failed", log.Err(err))
logger.Error().
Err(err).
Object("request", req).
Object("response", resp).
Msg("post-scrape hooks failed")
return
}
}
+5
View File
@@ -9,8 +9,13 @@ import (
"github.com/stretchr/testify/require"
"github.com/sot-tech/mochi/bittorrent"
"github.com/sot-tech/mochi/pkg/log"
)
func init() {
_ = log.ConfigureLogger("", "warn", false, false)
}
// nopHook is a Hook to measure the overhead of a no-operation Hook through
// benchmarks.
type nopHook struct{}
+5 -5
View File
@@ -47,11 +47,11 @@ func RegisterBuilder(name string, d Builder) {
drivers[name] = d
}
// New attempts to initialize a new middleware instance from the
// NewHook attempts to initialize a new middleware instance from the
// list of registered Builders.
//
// If a driver does not exist, returns ErrBuilderDoesNotExist.
func New(name string, options conf.MapConfig, storage storage.PeerStorage) (Hook, error) {
func NewHook(name string, options conf.MapConfig, storage storage.PeerStorage) (Hook, error) {
driversM.RLock()
defer driversM.RUnlock()
@@ -70,9 +70,9 @@ type Config struct {
Options conf.MapConfig
}
// HooksFromHookConfigs is a utility function for initializing Hooks in bulk.
// NewHooks is a utility function for initializing Hooks in bulk.
// each element of configs must contain pairs `name` - string and `options` - map[string]any
func HooksFromHookConfigs(configs []conf.MapConfig, storage storage.PeerStorage) (hooks []Hook, err error) {
func NewHooks(configs []conf.MapConfig, storage storage.PeerStorage) (hooks []Hook, err error) {
for _, cfg := range configs {
var c Config
@@ -81,7 +81,7 @@ func HooksFromHookConfigs(configs []conf.MapConfig, storage storage.PeerStorage)
}
var h Hook
h, err = New(c.Name, c.Options, storage)
h, err = NewHook(c.Name, c.Options, storage)
if err != nil {
break
}
@@ -23,6 +23,8 @@ import (
// Name of this container for registry
const Name = "directory"
var logger = log.NewLogger("torrent approval directory")
func init() {
container.Register(Name, build)
}
@@ -55,36 +57,37 @@ func build(conf conf.MapConfig, st storage.DataStorage) (container.Container, er
}
d.watcher = w
if len(d.StorageCtx) == 0 {
log.Info("storage context not set, using default value: " + container.DefaultStorageCtxName)
logger.Warn().
Str("name", "StorageCtx").
Str("provided", d.StorageCtx).
Str("default", container.DefaultStorageCtxName).
Msg("falling back to default configuration")
d.StorageCtx = container.DefaultStorageCtxName
}
go func() {
for event := range d.watcher.Events {
var mi *metainfo.MetaInfo
lf := log.Fields{
"file": event.TorrentFilePath,
"v1hash": event.InfoHash,
}
if mi, err = metainfo.LoadFromFile(event.TorrentFilePath); err == nil {
s256 := sha256.New()
s256.Write(mi.InfoBytes)
v2hash, _ := bittorrent.NewInfoHash(s256.Sum(nil))
lf["v2hash"] = v2hash
lf["v2to1hash"] = v2hash.TruncateV1()
switch event.Change {
case dirwatch.Added:
var name string
if info, err := mi.UnmarshalInfo(); err == nil {
name = info.Name
} else {
lf["error"] = err
log.Warn("unable to unmarshal torrent info", lf)
delete(lf, "error")
logger.Error().
Err(err).
Str("file", event.TorrentFilePath).
Stringer("infoHash", event.InfoHash).
Stringer("infoHashV2", v2hash).
Msg("unable to unmarshal torrent info")
}
if len(name) == 0 {
name = list.DUMMY
}
if err := d.Storage.Put(d.StorageCtx,
logger.Err(d.Storage.Put(d.StorageCtx,
storage.Entry{
Key: event.InfoHash.AsString(),
Value: name,
@@ -94,23 +97,28 @@ func build(conf conf.MapConfig, st storage.DataStorage) (container.Container, er
}, storage.Entry{
Key: v2hash.TruncateV1().RawString(),
Value: name,
}); err != nil {
lf["error"] = err
}
log.Debug("approval torrent added", lf)
})).
Str("action", "add").
Str("file", event.TorrentFilePath).
Stringer("infoHash", event.InfoHash).
Stringer("infoHashV2", v2hash).
Msg("approval torrent watcher event")
case dirwatch.Removed:
if err := d.Storage.Delete(c.StorageCtx,
logger.Err(d.Storage.Delete(c.StorageCtx,
event.InfoHash.AsString(),
v2hash.RawString(),
v2hash.TruncateV1().RawString(),
); err != nil {
lf["error"] = err
}
log.Debug("approval torrent deleted", lf)
)).
Str("action", "delete").
Str("file", event.TorrentFilePath).
Stringer("infoHash", event.InfoHash).
Stringer("infoHashV2", v2hash).
Msg("approval torrent watcher event")
}
} else {
lf["error"] = err
log.Error("unable to load torrent file", lf)
logger.Error().Err(err).
Str("file", event.TorrentFilePath).
Msg("unable to load torrent file")
}
}
}()
@@ -124,6 +132,8 @@ type directory struct {
// Stop closes watching of torrent directory
func (d *directory) Stop() stop.Result {
st := make(stop.Channel)
d.watcher.Close()
return stop.AlreadyStopped
st.Done()
return st.Result()
}
@@ -15,6 +15,8 @@ import (
// Name of this container for registry.
const Name = "list"
var logger = log.NewLogger("torrent approval list")
func init() {
container.Register(Name, build)
}
@@ -45,7 +47,11 @@ func build(conf conf.MapConfig, st storage.DataStorage) (container.Container, er
}
if len(l.StorageCtx) == 0 {
log.Info("Storage context not set, using default value: " + container.DefaultStorageCtxName)
logger.Warn().
Str("name", "StorageCtx").
Str("provided", l.StorageCtx).
Str("default", container.DefaultStorageCtxName).
Msg("falling back to default configuration")
l.StorageCtx = container.DefaultStorageCtxName
}
@@ -93,7 +99,7 @@ func (l *List) Approved(hash bittorrent.InfoHash) (contains bool) {
}
}
if err != nil {
log.Err(err)
logger.Error().Err(err).Stringer("infoHash", hash).Msg("unable load hash information from storage")
}
return contains != l.Invert
}
@@ -9,9 +9,14 @@ import (
"github.com/sot-tech/mochi/bittorrent"
"github.com/sot-tech/mochi/pkg/conf"
"github.com/sot-tech/mochi/pkg/log"
"github.com/sot-tech/mochi/storage/memory"
)
func init() {
_ = log.ConfigureLogger("", "warn", false, false)
}
var cases = []struct {
cfg baseConfig
ih string
+4 -5
View File
@@ -6,8 +6,7 @@ import (
"errors"
"github.com/mitchellh/mapstructure"
"github.com/sot-tech/mochi/pkg/log"
"github.com/rs/zerolog"
)
// TagName is a tag name, used for decoder customization
@@ -20,9 +19,9 @@ var ErrNilConfigMap = errors.New("unable to process nil map")
// MapConfig is just alias for map[string]any
type MapConfig map[string]any
// LogFields just returns this map as a set of Logrus fields.
func (m MapConfig) LogFields() log.Fields {
return log.Fields(m)
// MarshalZerologObject writes map into zerolog event
func (m MapConfig) MarshalZerologObject(e *zerolog.Event) {
e.Fields(map[string]any(m))
}
// Unmarshal decodes receiver map into provided structure.
+283 -113
View File
@@ -1,134 +1,304 @@
// Package log adds a thin wrapper around logrus to improve non-debug logging
// performance.
// Package log adds a thin wrapper around zerolog to improve logging performance.
//
// Root logger (called by log.Info, log.Warn etc.) uses global zerolog.Logger instance
// until ConfigureLogger called. Any child logger created with NewLogger will not
// produce any events until logger configured, so any function which uses child
// logger will come stuck because of root initialization synchronization.
package log
import (
"fmt"
"io"
"os"
"strings"
"sync"
"github.com/sirupsen/logrus"
// needs for async file logging
_ "code.cloudfoundry.org/go-diodes"
"github.com/rs/zerolog"
"github.com/rs/zerolog/diode"
zl "github.com/rs/zerolog/log"
)
var (
l = logrus.New()
debug = false
root = zl.Logger
rootWg = sync.WaitGroup{}
customOut io.WriteCloser
customOutMu = sync.Mutex{}
)
// SetDebug controls debug logging.
func SetDebug(to bool) {
debug = to
l.Level = logrus.DebugLevel
func init() {
rootWg.Add(1)
}
// SetFormatter sets the formatter.
func SetFormatter(to logrus.Formatter) {
l.Formatter = to
}
// SetOutput sets the output.
func SetOutput(to io.Writer) {
l.Out = to
}
// Fields is a map of logging fields.
type Fields map[string]any
// LogFields implements Fielder for Fields.
func (f Fields) LogFields() Fields {
return f
}
// A Fielder provides Fields via the LogFields method.
type Fielder interface {
LogFields() Fields
}
// err is a wrapper around an error.
type err struct {
e error
}
// LogFields provides Fields for logging.
func (e err) LogFields() Fields {
return Fields{
"error": e.e.Error(),
"type": fmt.Sprintf("%T", e.e),
}
}
// Err is a wrapper around errors that implements Fielder.
func Err(e error) Fielder {
return err{e}
}
// mergeFielders merges the Fields of multiple Fielders.
// Fields from the first Fielder will be used unchanged, Fields from subsequent
// Fielders will be prefixed with "%d.", starting from 1.
//
// must be called with len(fielders) > 0
func mergeFielders(fielders ...Fielder) logrus.Fields {
if fielders[0] == nil {
return nil
}
fields := fielders[0].LogFields()
for i := 1; i < len(fielders); i++ {
if fielders[i] == nil {
continue
}
prefix := fmt.Sprint(i, ".")
ff := fielders[i].LogFields()
for k, v := range ff {
fields[prefix+k] = v
}
}
return logrus.Fields(fields)
}
// Debug logs at the debug level if debug logging is enabled.
func Debug(v any, fielders ...Fielder) {
if debug {
if len(fielders) != 0 {
l.WithFields(mergeFielders(fielders...)).Debug(v)
// ConfigureLogger initializes root and all child loggers.
// NOTE: this function MUST be called before any child log call
// otherwise any goroutine, which uses logger will wait logger initialization
func ConfigureLogger(output, level string, formatted, colored bool) (err error) {
lvl := zerolog.WarnLevel
output = strings.ToLower(output)
var w io.Writer
var stdAny bool
switch output {
case "stderr", "":
w, stdAny = os.Stderr, true
case "stdout":
w, stdAny = os.Stdout, true
default:
if w, err = os.OpenFile(output, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0o600); err == nil {
customOutMu.Lock()
defer customOutMu.Unlock()
customOut = diode.NewWriter(w, 1000, 0, func(missed int) {
zl.Warn().Int("count", missed).Msg("Logger dropped messages")
})
w = customOut
} else {
l.Debug(v)
return err
}
}
if stdAny && formatted {
w = zerolog.ConsoleWriter{
Out: w,
NoColor: !colored,
TimeFormat: "2006-01-02 15:04:05.999",
}
}
if len(level) > 0 {
if logLevel, err := zerolog.ParseLevel(strings.ToLower(level)); err == nil {
lvl = logLevel
} else {
return err
}
}
root = zerolog.New(w).With().Timestamp().Logger()
zerolog.SetGlobalLevel(lvl)
rootWg.Done()
return nil
}
// Info logs at the info level.
func Info(v any, fielders ...Fielder) {
if len(fielders) != 0 {
l.WithFields(mergeFielders(fielders...)).Info(v)
} else {
l.Info(v)
// Logger is the holder for zerolog.Logger which
// waits until root logger initialized to prevent
// mixed logging format and output
type Logger struct {
comp string
zlOnce sync.Once
zerolog.Logger
}
func (l *Logger) init() {
l.zlOnce.Do(func() {
rootWg.Wait()
l.Logger = root.With().Str("component", l.comp).Logger()
})
}
// ==== copied from zerolog ====
// Trace starts a new message with trace level.
//
// You must call Msg on the returned event in order to send the event.
func (l *Logger) Trace() *zerolog.Event {
l.init()
return l.Logger.Trace()
}
// Debug starts a new message with debug level.
//
// You must call Msg on the returned event in order to send the event.
func (l *Logger) Debug() *zerolog.Event {
l.init()
return l.Logger.Debug()
}
// Info starts a new message with info level.
//
// You must call Msg on the returned event in order to send the event.
func (l *Logger) Info() *zerolog.Event {
l.init()
return l.Logger.Info()
}
// Warn starts a new message with warn level.
//
// You must call Msg on the returned event in order to send the event.
func (l *Logger) Warn() *zerolog.Event {
l.init()
return l.Logger.Warn()
}
// Error starts a new message with error level.
//
// You must call Msg on the returned event in order to send the event.
func (l *Logger) Error() *zerolog.Event {
l.init()
return l.Logger.Error()
}
// Err starts a new message with error level with err as a field if not nil or
// with info level if err is nil.
//
// You must call Msg on the returned event in order to send the event.
func (l *Logger) Err(err error) *zerolog.Event {
l.init()
return l.Logger.Err(err)
}
// Fatal starts a new message with fatal level. The os.Exit(1) function
// is called by the Msg method, which terminates the program immediately.
//
// You must call Msg on the returned event in order to send the event.
func (l *Logger) Fatal() *zerolog.Event {
l.init()
return l.Logger.Fatal()
}
// Panic starts a new message with panic level. The panic() function
// is called by the Msg method, which stops the ordinary flow of a goroutine.
//
// You must call Msg on the returned event in order to send the event.
func (l *Logger) Panic() *zerolog.Event {
l.init()
return l.Logger.Panic()
}
// WithLevel starts a new message with level. Unlike Fatal and Panic
// methods, WithLevel does not terminate the program or stop the ordinary
// flow of a gourotine when used with their respective levels.
//
// You must call Msg on the returned event in order to send the event.
func (l *Logger) WithLevel(level zerolog.Level) *zerolog.Event {
l.init()
return l.Logger.WithLevel(level)
}
// Log starts a new message with no level. Setting GlobalLevel to Disabled
// will still disable events produced by this method.
//
// You must call Msg on the returned event in order to send the event.
func (l *Logger) Log() *zerolog.Event {
l.init()
return l.Logger.Log()
}
// Print sends a log event using debug level and no extra field.
// Arguments are handled in the manner of fmt.Print.
func (l *Logger) Print(v ...interface{}) {
l.init()
l.Logger.Print(v...)
}
// Printf sends a log event using debug level and no extra field.
// Arguments are handled in the manner of fmt.Printf.
func (l *Logger) Printf(format string, v ...interface{}) {
l.init()
l.Logger.Printf(format, v...)
}
// Write implements the io.Writer interface. This is useful to set as a writer
// for the standard library log.
func (l *Logger) Write(p []byte) (n int, err error) {
l.init()
return l.Logger.Write(p)
}
// Err starts a new message with error level with err as a field if not nil or
// with info level if err is nil.
//
// You must call Msg on the returned event in order to send the event.
func Err(err error) *zerolog.Event {
return root.Err(err)
}
// Trace starts a new message with trace level.
//
// You must call Msg on the returned event in order to send the event.
func Trace() *zerolog.Event {
return root.Trace()
}
// Debug starts a new message with debug level.
//
// You must call Msg on the returned event in order to send the event.
func Debug() *zerolog.Event {
return root.Debug()
}
// Info starts a new message with info level.
//
// You must call Msg on the returned event in order to send the event.
func Info() *zerolog.Event {
return root.Info()
}
// Warn starts a new message with warn level.
//
// You must call Msg on the returned event in order to send the event.
func Warn() *zerolog.Event {
return root.Warn()
}
// Error starts a new message with error level.
//
// You must call Msg on the returned event in order to send the event.
func Error() *zerolog.Event {
return root.Error()
}
// Fatal starts a new message with fatal level. The os.Exit(1) function
// is called by the Msg method.
//
// You must call Msg on the returned event in order to send the event.
func Fatal() *zerolog.Event {
return root.Fatal()
}
// Panic starts a new message with panic level. The message is also sent
// to the panic function.
//
// You must call Msg on the returned event in order to send the event.
func Panic() *zerolog.Event {
return root.Panic()
}
// WithLevel starts a new message with level.
//
// You must call Msg on the returned event in order to send the event.
func WithLevel(level zerolog.Level) *zerolog.Event {
return root.WithLevel(level)
}
// Log starts a new message with no level. Setting zerolog.GlobalLevel to
// zerolog.Disabled will still disable events produced by this method.
//
// You must call Msg on the returned event in order to send the event.
func Log() *zerolog.Event {
return root.Log()
}
// Print sends a log event using debug level and no extra field.
// Arguments are handled in the manner of fmt.Print.
func Print(v ...interface{}) {
root.Print(v...)
}
// Printf sends a log event using debug level and no extra field.
// Arguments are handled in the manner of fmt.Printf.
func Printf(format string, v ...interface{}) {
root.Printf(format, v...)
}
// Close closes custom output writer if it configured
func Close() {
customOutMu.Lock()
defer customOutMu.Unlock()
if customOut != nil {
_ = customOut.Close()
customOut = nil
}
}
// Warn logs at the warning level.
func Warn(v any, fielders ...Fielder) {
if len(fielders) != 0 {
l.WithFields(mergeFielders(fielders...)).Warn(v)
} else {
l.Warn(v)
}
}
// Error logs at the error level.
func Error(v any, fielders ...Fielder) {
if len(fielders) != 0 {
l.WithFields(mergeFielders(fielders...)).Error(v)
} else {
l.Error(v)
}
}
// Fatal logs at the fatal level and exits with a status code != 0.
func Fatal(v any, fielders ...Fielder) {
if len(fielders) != 0 {
l.WithFields(mergeFielders(fielders...)).Fatal(v)
} else {
l.Fatal(v)
}
// NewLogger creates child logger with specified component name
// NOTE: root logger MUST be initialized with ConfigureLogger
// before any logger call
func NewLogger(component string) *Logger {
return &Logger{comp: component}
}
+5 -2
View File
@@ -16,7 +16,10 @@ import (
"github.com/sot-tech/mochi/pkg/stop"
)
var serverCounter = new(int32)
var (
logger = log.NewLogger("metrics")
serverCounter = new(int32)
)
// Enabled indicates that configured at least one metrics server
func Enabled() bool {
@@ -74,7 +77,7 @@ func NewServer(addr string) *Server {
atomic.AddInt32(serverCounter, 1)
defer atomic.AddInt32(serverCounter, -1)
if err := s.srv.ListenAndServe(); !errors.Is(err, http.ErrServerClosed) {
log.Fatal("failed while serving prometheus", log.Err(err))
logger.Error().Err(err).Msg("failed while serving prometheus")
}
}()
+34 -32
View File
@@ -14,6 +14,7 @@ import (
"errors"
"github.com/go-redis/redis/v8"
"github.com/rs/zerolog"
"github.com/sot-tech/mochi/bittorrent"
"github.com/sot-tech/mochi/pkg/conf"
@@ -29,9 +30,12 @@ const (
expireMemberCmd = "EXPIREMEMBER"
)
// ErrNotKeyDB returned from initializer if connected does not support KeyDB
// specific command (EXPIREMEMBER)
var ErrNotKeyDB = errors.New("provided instance seems not KeyDB")
var (
logger = log.NewLogger(Name)
// ErrNotKeyDB returned from initializer if connected does not support KeyDB
// specific command (EXPIREMEMBER)
ErrNotKeyDB = errors.New("provided instance seems not KeyDB")
)
func init() {
// Register the storage driver.
@@ -72,10 +76,8 @@ func newStore(cfg r.Config) (*store, error) {
if err == nil {
st = &store{
Connection: rs,
logFields: cfg.LogFields(),
peerTTL: uint(cfg.PeerLifetime.Seconds()),
}
st.logFields["name"] = Name
}
return st, err
@@ -83,8 +85,12 @@ func newStore(cfg r.Config) (*store, error) {
type store struct {
r.Connection
logFields log.Fields
peerTTL uint
peerTTL uint
}
// MarshalZerologObject writes configuration into zerolog event
func (s store) MarshalZerologObject(e *zerolog.Event) {
e.Str("type", Name).Object("config", s.Config)
}
func (s store) setPeerTTL(infoHashKey, peerID string) error {
@@ -92,10 +98,10 @@ func (s store) setPeerTTL(infoHashKey, peerID string) error {
}
func (s store) addPeer(infoHashKey, peerID string) (err error) {
log.Debug("storage: KeyDB: PutPeer", log.Fields{
"infoHashKey": infoHashKey,
"peerID": peerID,
})
logger.Trace().
Str("infoHashKey", infoHashKey).
Str("peerID", peerID).
Msg("add peer")
if err = s.SAdd(context.TODO(), infoHashKey, peerID).Err(); err == nil {
err = s.setPeerTTL(infoHashKey, peerID)
}
@@ -103,10 +109,10 @@ func (s store) addPeer(infoHashKey, peerID string) (err error) {
}
func (s store) delPeer(infoHashKey, peerID string) error {
log.Debug("storage: KeyDB: DeletePeer", log.Fields{
"infoHashKey": infoHashKey,
"peerID": peerID,
})
logger.Trace().
Str("infoHashKey", infoHashKey).
Str("peerID", peerID).
Msg("del peer")
deleted, err := s.SRem(context.TODO(), infoHashKey, peerID).Uint64()
err = r.AsNil(err)
if err == nil && deleted == 0 {
@@ -133,10 +139,10 @@ func (s store) DeleteLeecher(ih bittorrent.InfoHash, peer bittorrent.Peer) error
}
func (s store) GraduateLeecher(ih bittorrent.InfoHash, peer bittorrent.Peer) (err error) {
log.Debug("storage: KeyDB: GraduateLeecher", log.Fields{
"infoHash": ih,
"peer": peer,
})
logger.Trace().
Stringer("infoHash", ih).
Object("peer", peer).
Msg("graduate leecher")
infoHash, peerID := ih.RawString(), peer.RawString()
ihSeederKey := r.InfoHashKey(infoHash, true, peer.Addr().Is6())
ihLeecherKey := r.InfoHashKey(infoHash, false, peer.Addr().Is6())
@@ -153,12 +159,12 @@ func (s store) GraduateLeecher(ih bittorrent.InfoHash, peer bittorrent.Peer) (er
// AnnouncePeers is the same function as redis.AnnouncePeers
func (s store) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant int, v6 bool) ([]bittorrent.Peer, error) {
log.Debug("storage: KeyDB: AnnouncePeers", log.Fields{
"infoHash": ih,
"seeder": seeder,
"numWant": numWant,
"v6": v6,
})
logger.Trace().
Stringer("infoHash", ih).
Bool("seeder", seeder).
Int("numWant", numWant).
Bool("v6", v6).
Msg("announce peers")
return s.GetPeers(ih, seeder, numWant, v6, func(ctx context.Context, infoHashKey string, maxCount int) *redis.StringSliceCmd {
return s.SRandMemberN(context.TODO(), infoHashKey, int64(maxCount))
@@ -167,9 +173,9 @@ func (s store) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant int, v
// ScrapeSwarm is the same function as redis.ScrapeSwarm except `SCard` call instead of `HLen`
func (s store) ScrapeSwarm(ih bittorrent.InfoHash) (leechers uint32, seeders uint32, snatched uint32) {
log.Debug("storage: KeyDB ScrapeSwarm", log.Fields{
"infoHash": ih,
})
logger.Trace().
Stringer("infoHash", ih).
Msg("scrape swarm")
leechers, seeders = s.CountPeers(ih, s.SCard)
return
}
@@ -182,7 +188,3 @@ func (s *store) Stop() stop.Result {
}
return c.Result()
}
func (s store) LogFields() log.Fields {
return s.logFields
}
+51 -18
View File
@@ -11,6 +11,8 @@ import (
"sync"
"time"
"github.com/rs/zerolog"
"github.com/sot-tech/mochi/bittorrent"
"github.com/sot-tech/mochi/pkg/conf"
"github.com/sot-tech/mochi/pkg/log"
@@ -27,6 +29,8 @@ const (
defaultShardCount = 1024
)
var logger = log.NewLogger(Name)
func init() {
// Register the storage driver.
storage.RegisterBuilder(Name, builder)
@@ -45,12 +49,9 @@ type Config struct {
ShardCount int `cfg:"shard_count"`
}
// LogFields renders the current config as a set of Logrus fields.
func (cfg Config) LogFields() log.Fields {
return log.Fields{
"name": Name,
"shardCount": cfg.ShardCount,
}
// MarshalZerologObject writes configuration into zerolog event
func (cfg Config) MarshalZerologObject(e *zerolog.Event) {
e.Int("shardCount", cfg.ShardCount)
}
// Validate sanity checks values set in a config and returns a new config with
@@ -62,11 +63,11 @@ func (cfg Config) Validate() Config {
if cfg.ShardCount <= 0 || cfg.ShardCount > (math.MaxInt/2) {
validcfg.ShardCount = defaultShardCount
log.Warn("falling back to default configuration", log.Fields{
"name": Name + ".ShardCount",
"provided": cfg.ShardCount,
"default": validcfg.ShardCount,
})
log.Warn().
Str("name", "ShardCount").
Int("provided", cfg.ShardCount).
Int("default", validcfg.ShardCount).
Msg("falling back to default configuration")
}
return validcfg
@@ -111,6 +112,11 @@ type peerStore struct {
wg sync.WaitGroup
}
// MarshalZerologObject writes configuration into zerolog event
func (ps *peerStore) MarshalZerologObject(e *zerolog.Event) {
e.Str("type", Name).Object("config", ps.cfg)
}
var _ storage.PeerStorage = &peerStore{}
func (ps *peerStore) ScheduleGC(gcInterval, peerLifeTime time.Duration) {
@@ -125,10 +131,12 @@ func (ps *peerStore) ScheduleGC(gcInterval, peerLifeTime time.Duration) {
return
case <-t.C:
before := time.Now().Add(-peerLifeTime)
log.Debug("storage: Memory purging peers with no announces since", log.Fields{"before": before})
logger.Trace().Time("before", before).Msg("purging peers with no announces")
start := time.Now()
ps.gc(before)
storage.PromGCDurationMilliseconds.Observe(float64(time.Since(start).Nanoseconds()) / float64(time.Millisecond))
duration := time.Since(start)
logger.Debug().Dur("timeTaken", duration).Msg("gc complete")
storage.PromGCDurationMilliseconds.Observe(float64(duration.Milliseconds()))
}
}
}()
@@ -162,7 +170,7 @@ func (ps *peerStore) ScheduleStatisticsCollection(reportInterval time.Duration)
storage.PromInfoHashesCount.Set(float64(numInfohashes))
storage.PromSeedersCount.Set(float64(numSeeders))
storage.PromLeechersCount.Set(float64(numLeechers))
log.Debug("storage: Memory: populateProm() finished", log.Fields{"timeTaken": time.Since(before)})
logger.Debug().TimeDiff("timeTaken", time.Now(), before).Msg("populate prom complete")
}
}
}
@@ -190,6 +198,10 @@ func (ps *peerStore) PutSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) error
panic("attempted to interact with stopped memory store")
default:
}
logger.Trace().
Stringer("infoHash", ih).
Object("peer", p).
Msg("put seeder")
shard := ps.shards[ps.shardIndex(ih, p.Addr().Is6())]
shard.Lock()
@@ -219,6 +231,10 @@ func (ps *peerStore) DeleteSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) err
panic("attempted to interact with stopped memory store")
default:
}
logger.Trace().
Stringer("infoHash", ih).
Object("peer", p).
Msg("delete seeder")
shard := ps.shards[ps.shardIndex(ih, p.Addr().Is6())]
shard.Lock()
@@ -248,6 +264,10 @@ func (ps *peerStore) PutLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error
panic("attempted to interact with stopped memory store")
default:
}
logger.Trace().
Stringer("infoHash", ih).
Object("peer", p).
Msg("put leecher")
shard := ps.shards[ps.shardIndex(ih, p.Addr().Is6())]
shard.Lock()
@@ -277,6 +297,10 @@ func (ps *peerStore) DeleteLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) er
panic("attempted to interact with stopped memory store")
default:
}
logger.Trace().
Stringer("infoHash", ih).
Object("peer", p).
Msg("delete leecher")
shard := ps.shards[ps.shardIndex(ih, p.Addr().Is6())]
shard.Lock()
@@ -306,6 +330,10 @@ func (ps *peerStore) GraduateLeecher(ih bittorrent.InfoHash, p bittorrent.Peer)
panic("attempted to interact with stopped memory store")
default:
}
logger.Trace().
Stringer("infoHash", ih).
Object("peer", p).
Msg("graduate leecher")
shard := ps.shards[ps.shardIndex(ih, p.Addr().Is6())]
shard.Lock()
@@ -368,6 +396,12 @@ func (ps *peerStore) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant
panic("attempted to interact with stopped memory store")
default:
}
logger.Trace().
Stringer("infoHash", ih).
Bool("seeder", seeder).
Int("numWant", numWant).
Bool("v6", v6).
Msg("announce peers")
peers = ps.getPeers(ps.shards[ps.shardIndex(ih, v6)], ih, numWant, seeder)
@@ -391,6 +425,9 @@ func (ps *peerStore) ScrapeSwarm(ih bittorrent.InfoHash) (leechers uint32, seede
panic("attempted to interact with stopped memory store")
default:
}
logger.Trace().
Stringer("infoHash", ih).
Msg("scrape swarm")
leechers, seeders = ps.countPeers(ih, true)
l, s := ps.countPeers(ih, false)
@@ -540,7 +577,3 @@ func (ps *peerStore) Stop() stop.Result {
return c.Result()
}
func (ps *peerStore) LogFields() log.Fields {
return ps.cfg.LogFields()
}
+109 -127
View File
@@ -28,6 +28,7 @@ import (
"time"
"github.com/go-redis/redis/v8"
"github.com/rs/zerolog"
"github.com/sot-tech/mochi/bittorrent"
"github.com/sot-tech/mochi/pkg/conf"
@@ -64,8 +65,11 @@ const (
CountLeecherKey = "CHI_C_L"
)
// ErrSentinelAndClusterChecked returned from initializer if both Config.Sentinel and Config.Cluster provided
var ErrSentinelAndClusterChecked = errors.New("unable to use both cluster and sentinel mode")
var (
logger = log.NewLogger(Name)
// ErrSentinelAndClusterChecked returned from initializer if both Config.Sentinel and Config.Cluster provided
ErrSentinelAndClusterChecked = errors.New("unable to use both cluster and sentinel mode")
)
func init() {
// Register the storage builder.
@@ -98,7 +102,6 @@ func newStore(cfg Config) (*store, error) {
Connection: rs,
closed: make(chan any),
wg: sync.WaitGroup{},
logFields: cfg.LogFields(),
}, nil
}
@@ -118,21 +121,18 @@ type Config struct {
ConnectTimeout time.Duration `cfg:"connect_timeout"`
}
// LogFields renders the current config as a set of Logrus fields.
func (cfg Config) LogFields() log.Fields {
return log.Fields{
"name": Name,
"peerLifetime": cfg.PeerLifetime,
"addresses": cfg.Addresses,
"db": cfg.DB,
"poolSize": cfg.PoolSize,
"sentinel": cfg.Sentinel,
"sentinelMaster": cfg.SentinelMaster,
"cluster": cfg.Cluster,
"readTimeout": cfg.ReadTimeout,
"writeTimeout": cfg.WriteTimeout,
"connectTimeout": cfg.ConnectTimeout,
}
// MarshalZerologObject writes configuration fields into zerolog event
func (cfg Config) MarshalZerologObject(e *zerolog.Event) {
e.Strs("addresses", cfg.Addresses).
Int("db", cfg.DB).
Int("poolSize", cfg.PoolSize).
Bool("sentinel", cfg.Sentinel).
Str("sentinelMaster", cfg.SentinelMaster).
Bool("cluster", cfg.Cluster).
Dur("readTimeout", cfg.ReadTimeout).
Dur("writeTimeout", cfg.WriteTimeout).
Dur("connectTimeout", cfg.ConnectTimeout).
Dur("peerLifetime", cfg.PeerLifetime)
}
// Validate sanity checks values set in a config and returns a new config with
@@ -157,38 +157,38 @@ func (cfg Config) Validate() (Config, error) {
validCfg.Addresses = addresses
if len(cfg.Addresses) == 0 {
validCfg.Addresses = []string{defaultRedisAddress}
log.Warn("falling back to default configuration", log.Fields{
"name": Name + ".Addresses",
"provided": cfg.Addresses,
"default": validCfg.Addresses,
})
logger.Warn().
Str("name", "Addresses").
Strs("provided", cfg.Addresses).
Strs("default", validCfg.Addresses).
Msg("falling back to default configuration")
}
if cfg.ReadTimeout <= 0 {
validCfg.ReadTimeout = defaultReadTimeout
log.Warn("falling back to default configuration", log.Fields{
"name": Name + ".ReadTimeout",
"provided": cfg.ReadTimeout,
"default": validCfg.ReadTimeout,
})
logger.Warn().
Str("name", "ReadTimeout").
Dur("provided", cfg.ReadTimeout).
Dur("default", validCfg.ReadTimeout).
Msg("falling back to default configuration")
}
if cfg.WriteTimeout <= 0 {
validCfg.WriteTimeout = defaultWriteTimeout
log.Warn("falling back to default configuration", log.Fields{
"name": Name + ".WriteTimeout",
"provided": cfg.WriteTimeout,
"default": validCfg.WriteTimeout,
})
logger.Warn().
Str("name", "WriteTimeout").
Dur("provided", cfg.WriteTimeout).
Dur("default", validCfg.WriteTimeout).
Msg("falling back to default configuration")
}
if cfg.ConnectTimeout <= 0 {
validCfg.ConnectTimeout = defaultConnectTimeout
log.Warn("falling back to default configuration", log.Fields{
"name": Name + ".ConnectTimeout",
"provided": cfg.ConnectTimeout,
"default": validCfg.ConnectTimeout,
})
logger.Warn().
Str("name", "ConnectTimeout").
Dur("provided", cfg.ConnectTimeout).
Dur("default", validCfg.ConnectTimeout).
Msg("falling back to default configuration")
}
return validCfg, nil
@@ -238,7 +238,13 @@ func (cfg Config) Connect() (con Connection, err error) {
_ = rs.Close()
rs = nil
}
return Connection{rs}, err
cfg.Login, cfg.Password = "", ""
return Connection{rs, cfg}, err
}
// MarshalZerologObject writes configuration into zerolog event
func (ps *store) MarshalZerologObject(e *zerolog.Event) {
e.Str("type", Name).Object("config", ps.Config)
}
func (ps *store) ScheduleGC(gcInterval, peerLifeTime time.Duration) {
@@ -254,9 +260,9 @@ func (ps *store) ScheduleGC(gcInterval, peerLifeTime time.Duration) {
case <-t.C:
start := time.Now()
ps.gc(time.Now().Add(-peerLifeTime))
duration := time.Since(start).Milliseconds()
log.Debug("storage: Redis: recordGCDuration", log.Fields{"timeTaken(ms)": duration})
storage.PromGCDurationMilliseconds.Observe(float64(duration))
duration := time.Since(start)
logger.Debug().Dur("timeTaken", duration).Msg("gc complete")
storage.PromGCDurationMilliseconds.Observe(float64(duration.Milliseconds()))
t.Reset(gcInterval)
}
}
@@ -285,7 +291,7 @@ func (ps *store) ScheduleStatisticsCollection(reportInterval time.Duration) {
storage.PromInfoHashesCount.Set(float64(numInfoHashes))
storage.PromSeedersCount.Set(float64(numSeeders))
storage.PromLeechersCount.Set(float64(numLeechers))
log.Debug("storage: Redis: populateProm() finished", log.Fields{"timeTaken": time.Since(before)})
logger.Debug().TimeDiff("timeTaken", time.Now(), before).Msg("populate prom complete")
}
}
}
@@ -295,13 +301,13 @@ func (ps *store) ScheduleStatisticsCollection(reportInterval time.Duration) {
// Connection is wrapper for redis.UniversalClient
type Connection struct {
redis.UniversalClient
Config
}
type store struct {
Connection
closed chan any
wg sync.WaitGroup
logFields log.Fields
closed chan any
wg sync.WaitGroup
}
func (ps *store) count(key string, getLength bool) (n uint64) {
@@ -313,10 +319,7 @@ func (ps *store) count(key string, getLength bool) (n uint64) {
}
err = AsNil(err)
if err != nil {
log.Error("storage: Redis: GET/SCARD failure", log.Fields{
"key": key,
"error": err,
})
logger.Error().Err(err).Str("key", key).Msg("storage: Redis: GET/SCARD failure")
}
return
}
@@ -375,11 +378,10 @@ func InfoHashKey(infoHash string, seeder, v6 bool) (infoHashKey string) {
}
func (ps *store) putPeer(infoHashKey, peerCountKey, peerID string) error {
log.Debug("storage: Redis: PutPeer", log.Fields{
"infoHashKey": infoHashKey,
"peerCountKey": peerCountKey,
"peerID": peerID,
})
logger.Trace().
Str("infoHashKey", infoHashKey).
Str("peerID", peerID).
Msg("put peer")
return ps.tx(func(tx redis.Pipeliner) (err error) {
if err = tx.HSet(context.TODO(), infoHashKey, peerID, ps.getClock()).Err(); err != nil {
return
@@ -393,11 +395,10 @@ func (ps *store) putPeer(infoHashKey, peerCountKey, peerID string) error {
}
func (ps *store) delPeer(infoHashKey, peerCountKey, peerID string) error {
log.Debug("storage: Redis: DeletePeer", log.Fields{
"infoHashKey": infoHashKey,
"peerCountKey": peerCountKey,
"peerID": peerID,
})
logger.Trace().
Str("infoHashKey", infoHashKey).
Str("peerID", peerID).
Msg("del peer")
deleted, err := ps.HDel(context.TODO(), infoHashKey, peerID).Uint64()
err = AsNil(err)
if err == nil {
@@ -428,10 +429,10 @@ func (ps *store) DeleteLeecher(ih bittorrent.InfoHash, peer bittorrent.Peer) err
}
func (ps *store) GraduateLeecher(ih bittorrent.InfoHash, peer bittorrent.Peer) error {
log.Debug("storage: Redis: GraduateLeecher", log.Fields{
"infoHash": ih,
"peer": peer,
})
logger.Trace().
Stringer("infoHash", ih).
Object("peer", peer).
Msg("graduate leecher")
infoHash, peerID, isV6 := ih.RawString(), peer.RawString(), peer.Addr().Is6()
ihSeederKey, ihLeecherKey := InfoHashKey(infoHash, true, isV6), InfoHashKey(infoHash, false, isV6)
@@ -465,7 +466,7 @@ func (ps Connection) parsePeersList(peersResult *redis.StringSliceCmd) (peers []
if p, err := bittorrent.NewPeer(peerID); err == nil {
peers = append(peers, p)
} else {
log.Error("storage: Redis: unable to decode leecher", log.Fields{"peerID": peerID})
logger.Error().Err(err).Str("peerID", peerID).Msg("unable to decode peer")
}
}
}
@@ -506,22 +507,19 @@ func (ps Connection) GetPeers(ih bittorrent.InfoHash, forSeeder bool, maxCount i
}
} else if l > 0 {
err = nil
log.Warn("storage: Redis: error occurred while retrieving peers", log.Fields{
"infoHash": infoHash,
"error": err,
})
logger.Warn().Err(err).Stringer("infoHash", ih).Msg("error occurred while retrieving peers")
}
return
}
func (ps *store) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant int, v6 bool) ([]bittorrent.Peer, error) {
log.Debug("storage: Redis: AnnouncePeers", log.Fields{
"infoHash": ih,
"seeder": seeder,
"numWant": numWant,
"peer": v6,
})
logger.Trace().
Stringer("infoHash", ih).
Bool("seeder", seeder).
Int("numWant", numWant).
Bool("v6", v6).
Msg("announce peers")
return ps.GetPeers(ih, seeder, numWant, v6, func(ctx context.Context, infoHashKey string, maxCount int) *redis.StringSliceCmd {
return ps.HRandField(ctx, infoHashKey, maxCount, false)
@@ -534,10 +532,7 @@ func (ps Connection) countPeers(infoHashKey string, countFn getPeerCountFn) uint
count, err := countFn(context.TODO(), infoHashKey).Result()
err = AsNil(err)
if err != nil {
log.Error("storage: Redis: key size calculation failure", log.Fields{
"infoHashKey": infoHashKey,
"error": err,
})
logger.Error().Err(err).Str("infoHashKey", infoHashKey).Msg("key size calculation failure")
}
return uint32(count)
}
@@ -555,9 +550,9 @@ func (ps Connection) CountPeers(ih bittorrent.InfoHash, countFn getPeerCountFn)
}
func (ps *store) ScrapeSwarm(ih bittorrent.InfoHash) (leechers uint32, seeders uint32, snatched uint32) {
log.Debug("storage: Redis ScrapeSwarm", log.Fields{
"infoHash": ih,
})
logger.Trace().
Stringer("infoHash", ih).
Msg("scrape swarm")
leechers, seeders = ps.CountPeers(ih, ps.HLen)
@@ -579,7 +574,7 @@ func (ps Connection) Put(ctx string, values ...storage.Entry) (err error) {
err = ps.HSet(context.TODO(), PrefixKey+ctx, args...).Err()
if err != nil {
if strings.Contains(err.Error(), argNumErrorMsg) {
log.Warn("This Redis version/implementation does not support variadic arguments for HSET")
logger.Warn().Msg("This Redis version/implementation does not support variadic arguments for HSET")
for _, p := range values {
if err = ps.HSet(context.TODO(), PrefixKey+ctx, p.Key, p.Value).Err(); err != nil {
break
@@ -613,7 +608,7 @@ func (ps Connection) Delete(ctx string, keys ...string) (err error) {
err = AsNil(ps.HDel(context.TODO(), PrefixKey+ctx, keys...).Err())
if err != nil {
if strings.Contains(err.Error(), argNumErrorMsg) {
log.Warn("This Redis version/implementation does not support variadic arguments for HDEL")
logger.Warn().Msg("This Redis version/implementation does not support variadic arguments for HDEL")
for _, k := range keys {
if err = AsNil(ps.HDel(context.TODO(), PrefixKey+ctx, k).Err()); err != nil {
break
@@ -676,7 +671,6 @@ func (Connection) Preservable() bool {
// transaction. The infohash key will remain in the addressFamil hash and
// we'll attempt to clean it up the next time gc runs.
func (ps *store) gc(cutoff time.Time) {
log.Debug("storage: Redis: purging peers with no announces since", log.Fields{"before": cutoff})
cutoffNanos := cutoff.UnixNano()
// list all infoHashKeys in the group
infoHashKeys, err := ps.SMembers(context.Background(), IHKey).Result()
@@ -690,7 +684,7 @@ func (ps *store) gc(cutoff time.Time) {
} else if strings.HasPrefix(infoHashKey, IH4LeecherKey) || strings.HasPrefix(infoHashKey, IH6LeecherKey) {
cntKey = CountLeecherKey
} else {
log.Warn("storage: Redis: unexpected record found in info hash set", log.Fields{"infoHashKey": infoHashKey})
logger.Warn().Str("infoHashKey", infoHashKey).Msg("unexpected record found in info hash set")
continue
}
// list all (peer, timeout) pairs for the ih
@@ -701,16 +695,15 @@ func (ps *store) gc(cutoff time.Time) {
for peerID, timeStamp := range peerList {
if mtime, err := strconv.ParseInt(timeStamp, 10, 64); err == nil {
if mtime <= cutoffNanos {
log.Debug("storage: Redis: adding peer to remove list", log.Fields{"peerID": peerID})
logger.Trace().Str("peerID", peerID).Msg("adding peer to remove list")
peersToRemove = append(peersToRemove, peerID)
}
} else {
log.Error("storage: Redis: unable to decode peer timestamp", log.Fields{
"infoHashKey": infoHashKey,
"peerID": peerID,
"timestamp": timeStamp,
"error": err,
})
logger.Error().Err(err).
Str("infoHashKey", infoHashKey).
Str("peerID", peerID).
Str("timestamp", timeStamp).
Msg("unable to decode peer timestamp")
}
}
if len(peersToRemove) > 0 {
@@ -718,35 +711,32 @@ func (ps *store) gc(cutoff time.Time) {
err = AsNil(err)
if err != nil {
if strings.Contains(err.Error(), argNumErrorMsg) {
log.Warn("This Redis version/implementation does not support variadic arguments for HDEL")
logger.Warn().Msg("This Redis version/implementation does not support variadic arguments for HDEL")
for _, k := range peersToRemove {
count, err := ps.HDel(context.Background(), infoHashKey, k).Result()
err = AsNil(err)
if err != nil {
log.Error("storage: Redis: unable to delete peer", log.Fields{
"infoHashKey": infoHashKey,
"peerID": k,
"error": err,
})
logger.Error().Err(err).
Str("infoHashKey", infoHashKey).
Str("peerID", k).
Msg("unable to delete peer")
} else {
removedPeerCount += count
}
}
} else {
log.Error("storage: Redis: unable to delete peers", log.Fields{
"infoHashKey": infoHashKey,
"peerIds": peersToRemove,
"error": err,
})
logger.Error().Err(err).
Str("infoHashKey", infoHashKey).
Strs("peerIDs", peersToRemove).
Msg("unable to delete peers")
}
}
if removedPeerCount > 0 { // DECR seeder/leecher counter
if err = ps.DecrBy(context.Background(), cntKey, removedPeerCount).Err(); err != nil {
log.Error("storage: Redis: unable to decrement seeder/leecher peer count", log.Fields{
"infoHashKey": infoHashKey,
"countKey": cntKey,
"error": err,
})
logger.Error().Err(err).
Str("infoHashKey", infoHashKey).
Str("countKey", cntKey).
Msg("unable to decrement seeder/leecher peer count")
}
}
}
@@ -764,20 +754,20 @@ func (ps *store) gc(cutoff time.Time) {
return err
}, infoHashKey))
if err != nil {
log.Error("storage: Redis: unable to clean info hash records", log.Fields{
"infoHashKey": infoHashKey,
"error": err,
})
logger.Error().Err(err).
Str("infoHashKey", infoHashKey).
Msg("unable to clean info hash records")
}
} else {
log.Error("storage: Redis: unable to fetch info hash peers", log.Fields{
"infoHashKey": infoHashKey,
"error": err,
})
logger.Error().Err(err).
Str("infoHashKey", infoHashKey).
Msg("unable to fetch info hash peers")
}
}
} else {
log.Error("storage: Redis: unable to fetch info hash set", log.Fields{"hashSet": IHKey, "error": err})
logger.Error().Err(err).
Str("hashSet", IHKey).
Msg("unable to fetch info hash peers")
}
}
@@ -790,7 +780,7 @@ func (ps *store) Stop() stop.Result {
ps.wg.Wait()
var err error
if ps.UniversalClient != nil {
log.Info("storage: Redis: exiting. mochi does not clear data in redis when exiting. mochi keys have prefix " + PrefixKey)
logger.Info().Msg("storage: Redis: exiting. mochi does not clear data in redis when exiting. mochi keys have prefix " + PrefixKey)
err = ps.UniversalClient.Close()
ps.UniversalClient = nil
}
@@ -799,11 +789,3 @@ func (ps *store) Stop() stop.Result {
return c.Result()
}
func (ps *store) LogFields() log.Fields {
fields := make(log.Fields, len(ps.logFields))
for k, v := range ps.logFields {
fields[k] = v
}
return fields
}
+42 -21
View File
@@ -7,6 +7,8 @@ import (
"sync"
"time"
"github.com/rs/zerolog"
"github.com/sot-tech/mochi/bittorrent"
"github.com/sot-tech/mochi/pkg/conf"
"github.com/sot-tech/mochi/pkg/log"
@@ -20,6 +22,7 @@ const (
)
var (
logger = log.NewLogger("storage configurator")
driversM sync.RWMutex
drivers = make(map[string]Builder)
)
@@ -38,21 +41,21 @@ type Config struct {
func (c Config) sanitizeGCConfig() (gcInterval, peerTTL time.Duration) {
if c.GarbageCollectionInterval <= 0 {
gcInterval = defaultGarbageCollectionInterval
log.Warn("falling back to default configuration", log.Fields{
"name": "GarbageCollectionInterval",
"provided": c.GarbageCollectionInterval,
"default": defaultGarbageCollectionInterval,
})
logger.Warn().
Str("name", "GarbageCollectionInterval").
Dur("provided", c.GarbageCollectionInterval).
Dur("default", defaultGarbageCollectionInterval).
Msg("falling back to default configuration")
} else {
gcInterval = c.GarbageCollectionInterval
}
if c.PeerLifetime <= 0 {
peerTTL = defaultPeerLifetime
log.Warn("falling back to default configuration", log.Fields{
"name": "PeerLifetime",
"provided": c.PeerLifetime,
"default": defaultPeerLifetime,
})
logger.Warn().
Str("name", "PeerLifetime").
Dur("provided", c.PeerLifetime).
Dur("default", defaultPeerLifetime).
Msg("falling back to default configuration")
} else {
peerTTL = c.PeerLifetime
}
@@ -62,11 +65,11 @@ func (c Config) sanitizeGCConfig() (gcInterval, peerTTL time.Duration) {
func (c Config) sanitizeStatisticsConfig() (statInterval time.Duration) {
if c.PrometheusReportingInterval < 0 {
statInterval = defaultPrometheusReportingInterval
log.Warn("falling back to default configuration", log.Fields{
"name": "PrometheusReportingInterval",
"provided": c.PrometheusReportingInterval,
"default": defaultPrometheusReportingInterval,
})
logger.Warn().
Str("name", "PrometheusReportingInterval").
Dur("provided", c.PrometheusReportingInterval).
Dur("default", defaultPrometheusReportingInterval).
Msg("falling back to default configuration")
}
return
}
@@ -207,9 +210,9 @@ type PeerStorage interface {
// For more details see the documentation in the stop package.
stop.Stopper
// Fielder returns a loggable version of the data used to configure and
// LogObjectMarshaler returns a loggable version of the data used to configure and
// operate a particular PeerStorage.
log.Fielder
zerolog.LogObjectMarshaler
}
// RegisterBuilder makes a Builder available by the provided name.
@@ -258,15 +261,33 @@ func NewStorage(name string, cfg conf.MapConfig) (ps PeerStorage, err error) {
}
if gc, isOk := ps.(GCAware); isOk {
gc.ScheduleGC(c.sanitizeGCConfig())
gcInterval, peerTTL := c.sanitizeGCConfig()
logger.Info().
Str("type", name).
Dur("gcInterval", gcInterval).
Dur("peerTTL", peerTTL).
Msg("scheduling GC")
gc.ScheduleGC(gcInterval, peerTTL)
} else {
logger.Debug().
Str("type", name).
Msg("storage does not support GC")
}
if statInterval := c.sanitizeStatisticsConfig(); statInterval > 0 {
if st, isOk := ps.(StatisticsAware); isOk {
if st, isOk := ps.(StatisticsAware); isOk {
if statInterval := c.sanitizeStatisticsConfig(); statInterval > 0 {
logger.Info().
Str("type", name).
Dur("statInterval", statInterval).
Msg("scheduling statistics collection")
st.ScheduleStatisticsCollection(statInterval)
} else {
logger.Info().Str("type", name).Msg("statistics collection disabled because of zero reporting interval")
}
} else {
log.Info("prometheus disabled because of zero reporting interval")
logger.Debug().
Str("type", name).
Msg("storage does not support statistics collection")
}
return
+10 -1
View File
@@ -9,14 +9,23 @@ import (
"github.com/stretchr/testify/require"
"github.com/sot-tech/mochi/bittorrent"
"github.com/sot-tech/mochi/pkg/log"
"github.com/sot-tech/mochi/storage"
)
func init() {
_ = log.ConfigureLogger("", "warn", false, false)
}
// PeerEqualityFunc is the boolean function to use to check two Peers for
// equality.
// Depending on the implementation of the PeerStorage, this can be changed to
// use (Peer).EqualEndpoint instead.
var PeerEqualityFunc = func(p1, p2 bittorrent.Peer) bool { return p1.Equal(p2) }
var PeerEqualityFunc = func(p1, p2 bittorrent.Peer) bool {
return p1.Port() == p2.Port() &&
p1.Addr().Compare(p1.Addr()) == 0 &&
p1.ID == p2.ID
}
type testHolder struct {
st storage.PeerStorage