diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml index 5fea484..5681a5f 100644 --- a/.github/workflows/build.yaml +++ b/.github/workflows/build.yaml @@ -44,14 +44,15 @@ jobs: go-version: "^1.18" - name: "Install and configure mochi" run: | - go install --tags e2e ./cmd/mochi + go install ./cmd/mochi + go install ./cmd/mochi-e2e cat ./dist/example_config.yaml - name: "Run end-to-end tests" run: | - mochi --config=./dist/example_config.yaml --debug & + mochi --config=./dist/example_config.yaml --logLevel debug --logPretty & pid=$! sleep 2 - mochi e2e --debug + mochi-e2e kill $pid e2e-redis: name: "E2E Redis Tests" @@ -67,15 +68,16 @@ jobs: go-version: "^1.18" - name: "Install and configure mochi" run: | - go install --tags e2e ./cmd/mochi + go install ./cmd/mochi + go install ./cmd/mochi-e2e curl -LO https://github.com/jzelinskie/faq/releases/download/0.0.6/faq-linux-amd64 chmod +x faq-linux-amd64 ./faq-linux-amd64 '.mochi.storage = {"config":{"gc_interval":"3m","peer_lifetime":"31m","prometheus_reporting_interval":"1s","connect_timeout":"15s","read_timeout":"15s","write_timeout":"15s"},"name":"redis"}' ./dist/example_config.yaml > ./dist/example_redis_config.yaml cat ./dist/example_redis_config.yaml - name: "Run end-to-end tests" run: | - mochi --config=./dist/example_redis_config.yaml --debug & + mochi --config=./dist/example_redis_config.yaml --logLevel debug --logPretty & pid=$! sleep 2 - mochi e2e --debug + mochi-e2e kill $pid diff --git a/README.md b/README.md index e21e4d8..d8722ac 100644 --- a/README.md +++ b/README.md @@ -18,7 +18,7 @@ Modified version of [Chihaya](https://github.com/chihaya/chihaya), an open sourc * Allows mixed peers: IPv4 requesters can fetch IPv6 peers or vice versa; * Contains some internal improvements. -_Note: From time to time MoChi fetch modifications from Chihaya but is not +_Note: From time to time MoChi fetch modifications from Chihaya but is not fully compatible with original project (mainly in Redis storage structure), so it cannot be mixed with Chihaya (i.e. it is impossible create MoChi-Chihaya cluster)._ @@ -26,7 +26,8 @@ so it cannot be mixed with Chihaya (i.e. it is impossible create MoChi-Chihaya c The main goal of made modifications is to create semi-private tracker like [Hefur](https://github.com/sot-tech/hefur) but with cluster support (allowed torrents limited by pre-existent `list` middleware and another `directory` middleware -to [limit registered torrents](docs/middleware/torrent_approval.md)) and to maximize torrent swarm by providing maximum peers as possible (IPv4+IPv6). +to [limit registered torrents](docs/middleware/torrent_approval.md)) and to maximize torrent swarm by providing maximum +peers as possible (IPv4+IPv6). ## Notice diff --git a/bittorrent/bittorrent.go b/bittorrent/bittorrent.go index 689427e..8fc840e 100644 --- a/bittorrent/bittorrent.go +++ b/bittorrent/bittorrent.go @@ -13,8 +13,7 @@ import ( "net/netip" "github.com/pkg/errors" - - "github.com/sot-tech/mochi/pkg/log" + "github.com/rs/zerolog" ) // PeerIDLen is length of peer id field in bytes @@ -126,6 +125,14 @@ type Scrape struct { Incomplete uint32 } +// MarshalZerologObject writes fields into zerolog event +func (s Scrape) MarshalZerologObject(e *zerolog.Event) { + e.Stringer("infoHash", s.InfoHash). + Uint32("snatches", s.Snatches). + Uint32("complete", s.Complete). + Uint32("incomplete", s.Incomplete) +} + // Peer represents the connection details of a peer that is returned in an // announce response. type Peer struct { @@ -164,13 +171,6 @@ func NewPeer(data string) (Peer, error) { return peer, err } -// String implements fmt.Stringer to return a human-readable representation. -// The string will have the format @[]:, for example -// "0102030405060708090a0b0c0d0e0f1011121314@[10.11.12.13]:1234" -func (p Peer) String() string { - return fmt.Sprintf("%s@[%s]:%d", p.ID, p.Addr(), p.Port()) -} - // RawString generates concatenation of PeerID, net port and IP-address func (p Peer) RawString() string { ip := p.Addr() @@ -181,29 +181,18 @@ func (p Peer) RawString() string { return string(b) } -// LogFields renders the current peer as a set of Logrus fields. -func (p Peer) LogFields() log.Fields { - return log.Fields{ - "id": p.ID, - "ip": p.Addr(), - "port": p.Port(), - } -} - -// Equal reports whether p and x are the same. -func (p Peer) Equal(x Peer) bool { return p.EqualEndpoint(x) && p.ID == x.ID } - -// EqualEndpoint reports whether p and x have the same endpoint. -func (p Peer) EqualEndpoint(x Peer) bool { - return p.Port() == x.Port() && - p.Addr().Compare(x.Addr()) == 0 -} - // Addr returns unmapped peer's IP address func (p Peer) Addr() netip.Addr { return p.AddrPort.Addr().Unmap() } +// MarshalZerologObject writes fields into zerolog event +func (p Peer) MarshalZerologObject(e *zerolog.Event) { + e.Stringer("id", p.ID). + Stringer("address", p.Addr()). + Uint16("port", p.Port()) +} + // ClientError represents an error that should be exposed to the client over // the BitTorrent protocol implementation. type ClientError string diff --git a/bittorrent/bittorrent_test.go b/bittorrent/bittorrent_test.go deleted file mode 100644 index 4dac3e3..0000000 --- a/bittorrent/bittorrent_test.go +++ /dev/null @@ -1,57 +0,0 @@ -package bittorrent - -import ( - "fmt" - "net/netip" - "testing" - - "github.com/stretchr/testify/require" -) - -var ( - b = []byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20} - expected = "0102030405060708090a0b0c0d0e0f1011121314" -) - -func TestPeerID_String(t *testing.T) { - pid, err := NewPeerID(b) - require.Nil(t, err) - s := pid.String() - require.Equal(t, expected, s) -} - -func TestInfoHash_String(t *testing.T) { - ih, err := NewInfoHash(b) - require.Nil(t, err) - require.Equal(t, expected, ih.String()) -} - -func TestPeer_String(t *testing.T) { - pid, err := NewPeerID(b) - require.Nil(t, err) - id, _ := NewPeerID(b) - peerStringTestCases := []struct { - input Peer - expected string - }{ - { - input: Peer{ - ID: id, - AddrPort: netip.MustParseAddrPort("10.11.12.1:1234"), - }, - expected: fmt.Sprintf("%s@[10.11.12.1]:1234", expected), - }, - { - input: Peer{ - ID: id, - AddrPort: netip.MustParseAddrPort("[2001:db8::ff00:42:8329]:1234"), - }, - expected: fmt.Sprintf("%s@[2001:db8::ff00:42:8329]:1234", expected), - }, - } - for _, c := range peerStringTestCases { - c.input.ID = pid - got := c.input.String() - require.Equal(t, c.expected, got) - } -} diff --git a/bittorrent/event.go b/bittorrent/event.go index e47950e..bb54ed1 100644 --- a/bittorrent/event.go +++ b/bittorrent/event.go @@ -24,42 +24,49 @@ const ( // Completed is the event sent by a BitTorrent client when it finishes // downloading all of the required chunks. Completed + + // NoneStr string representation of None event + NoneStr = "none" + + // StartedStr string representation of Started event + StartedStr = "started" + + // StoppedStr string representation of Stopped event + StoppedStr = "stopped" + + // CompletedStr string representation of Completed event + CompletedStr = "completed" ) -var ( - eventToString = make(map[Event]string) - stringToEvent = make(map[string]Event) -) - -func init() { - eventToString[None] = "none" - eventToString[Started] = "started" - eventToString[Stopped] = "stopped" - eventToString[Completed] = "completed" - - stringToEvent[""] = None - - for k, v := range eventToString { - stringToEvent[v] = k - } -} - // NewEvent returns the proper Event given a string. func NewEvent(eventStr string) (evt Event, err error) { - if e, ok := stringToEvent[strings.ToLower(eventStr)]; ok { - evt = e - } else { + switch strings.ToLower(eventStr) { + case NoneStr, "": + evt = None + case StartedStr: + evt = Started + case StoppedStr: + evt = Stopped + case CompletedStr: + evt = Completed + default: evt, err = None, ErrUnknownEvent } - return } // String implements Stringer for an event. func (e Event) String() (s string) { - if name, ok := eventToString[e]; ok { - s = name - } else { + switch e { + case None: + s = NoneStr + case Started: + s = StartedStr + case Stopped: + s = StoppedStr + case Completed: + s = CompletedStr + default: s = "" } return diff --git a/bittorrent/params.go b/bittorrent/params.go index 61dc425..f7aaf02 100644 --- a/bittorrent/params.go +++ b/bittorrent/params.go @@ -6,7 +6,7 @@ import ( "strconv" "strings" - "github.com/sot-tech/mochi/pkg/log" + "github.com/rs/zerolog" ) // Params is used to fetch (optional) request parameters from an Announce. @@ -31,6 +31,8 @@ type Params interface { // For a request of the form "/announce?port=1234" this would return // "port=1234" RawQuery() string + + zerolog.LogObjectMarshaler } var ( @@ -151,7 +153,6 @@ func parseQuery(query string) (q *QueryParams, err error) { // But frontends record these errors to prometheus, which generates // a lot of time series. // We log it here for debugging instead. - log.Debug("failed to unescape query param key", log.Err(err)) return nil, ErrInvalidQueryEscape } value, err = url.QueryUnescape(value) @@ -160,7 +161,6 @@ func parseQuery(query string) (q *QueryParams, err error) { // But frontends record these errors to prometheus, which generates // a lot of time series. // We log it here for debugging instead. - log.Debug("failed to unescape query param value", log.Err(err)) return nil, ErrInvalidQueryEscape } @@ -210,3 +210,8 @@ func (qp *QueryParams) RawPath() string { func (qp *QueryParams) RawQuery() string { return qp.query } + +// MarshalZerologObject writes fields into zerolog event +func (qp QueryParams) MarshalZerologObject(e *zerolog.Event) { + e.Str("path", qp.path).Str("query", qp.query) +} diff --git a/bittorrent/request.go b/bittorrent/request.go index a095109..f84a49e 100644 --- a/bittorrent/request.go +++ b/bittorrent/request.go @@ -1,12 +1,11 @@ package bittorrent import ( - "fmt" "net/netip" "sort" "time" - "github.com/sot-tech/mochi/pkg/log" + "github.com/rs/zerolog" ) // RequestAddress wrapper for netip.Addr with Provided flag. @@ -21,14 +20,9 @@ func (a RequestAddress) Validate() bool { return a.IsValid() && !a.IsUnspecified() } -func (a RequestAddress) String() string { - var p string - if a.Provided { - p = "(provided)" - } else { - p = "(detected)" - } - return fmt.Sprint(a.Addr.String(), p) +// MarshalZerologObject writes fields into zerolog event +func (a RequestAddress) MarshalZerologObject(e *zerolog.Event) { + e.Stringer("address", a.Addr).Bool("provided", a.Provided) } // RequestAddresses is an array of RequestAddress used mainly for @@ -89,6 +83,23 @@ func (aa RequestAddresses) GetFirst() netip.Addr { return a } +// MarshalZerologArray writes array elements to zerolog event +func (aa RequestAddresses) MarshalZerologArray(a *zerolog.Array) { + for _, addr := range aa { + a.Object(addr) + } +} + +// Peers wrapper of array of Peer-s +type Peers []Peer + +// MarshalZerologArray writes array elements to zerolog event +func (p Peers) MarshalZerologArray(a *zerolog.Array) { + for _, peer := range p { + a.Object(peer) + } +} + // RequestPeer is bundle of peer ID, provided or // determined addresses and net port type RequestPeer struct { @@ -99,7 +110,7 @@ type RequestPeer struct { // Peers constructs array of Peer-s with the same ID and Port // for every RequestAddress array. -func (rp RequestPeer) Peers() (peers []Peer) { +func (rp RequestPeer) Peers() (peers Peers) { for _, a := range rp.RequestAddresses { peers = append(peers, Peer{ ID: rp.ID, @@ -109,6 +120,13 @@ func (rp RequestPeer) Peers() (peers []Peer) { return } +// MarshalZerologObject writes fields into zerolog event +func (rp RequestPeer) MarshalZerologObject(e *zerolog.Event) { + e.Stringer("id", rp.ID). + Array("addresses", rp.RequestAddresses). + Uint16("port", rp.Port) +} + // AnnounceRequest represents the parsed parameters from an announce request. type AnnounceRequest struct { Event Event @@ -125,21 +143,19 @@ type AnnounceRequest struct { Params } -// LogFields renders the current response as a set of log fields. -func (r AnnounceRequest) LogFields() log.Fields { - return log.Fields{ - "event": r.Event, - "infoHash": r.InfoHash, - "compact": r.Compact, - "eventProvided": r.EventProvided, - "numWantProvided": r.NumWantProvided, - "numWant": r.NumWant, - "left": r.Left, - "downloaded": r.Downloaded, - "uploaded": r.Uploaded, - "peers": r.RequestPeer, - "params": r.Params, - } +// MarshalZerologObject writes fields into zerolog event +func (r AnnounceRequest) MarshalZerologObject(e *zerolog.Event) { + e.Stringer("event", r.Event). + Stringer("infoHash", r.InfoHash). + Bool("compact", r.Compact). + Bool("eventProvided", r.EventProvided). + Bool("numWantProvided", r.NumWantProvided). + Uint32("numWant", r.NumWant). + Uint64("left", r.Left). + Uint64("downloaded", r.Downloaded). + Uint64("uploaded", r.Uploaded). + Object("peers", r.RequestPeer). + Object("params", r.Params) } // AnnounceResponse represents the parameters used to create an announce @@ -150,19 +166,28 @@ type AnnounceResponse struct { Incomplete uint32 Interval time.Duration MinInterval time.Duration - IPv4Peers []Peer - IPv6Peers []Peer + IPv4Peers Peers + IPv6Peers Peers } -// LogFields renders the current response as a set of log fields. -func (r AnnounceResponse) LogFields() log.Fields { - return log.Fields{ - "compact": r.Compact, - "complete": r.Complete, - "interval": r.Interval, - "minInterval": r.MinInterval, - "ipv4Peers": r.IPv4Peers, - "ipv6Peers": r.IPv6Peers, +// MarshalZerologObject writes fields into zerolog event +func (r AnnounceResponse) MarshalZerologObject(e *zerolog.Event) { + e.Bool("compact", r.Compact). + Uint32("complete", r.Complete). + Uint32("incomplete", r.Incomplete). + Dur("interval", r.Interval). + Dur("minInterval", r.MinInterval). + Array("ipv4Peers", r.IPv4Peers). + Array("ipv6Peers", r.IPv6Peers) +} + +// InfoHashes wrapper of array of InfoHash-es +type InfoHashes []InfoHash + +// MarshalZerologArray writes array elements to zerolog event +func (i InfoHashes) MarshalZerologArray(a *zerolog.Array) { + for _, ih := range i { + a.Str(ih.String()) } } @@ -171,16 +196,24 @@ type ScrapeRequest struct { // RequestAddresses not used in internal logic, // but MAY be used in middleware (per-ip block etc.) RequestAddresses - InfoHashes []InfoHash + InfoHashes InfoHashes Params Params } -// LogFields renders the current response as a set of log fields. -func (r ScrapeRequest) LogFields() log.Fields { - return log.Fields{ - "ip": r.RequestAddresses, - "infoHashes": r.InfoHashes, - "params": r.Params, +// MarshalZerologObject writes fields into zerolog event +func (r ScrapeRequest) MarshalZerologObject(e *zerolog.Event) { + e.Array("addresses", r.RequestAddresses). + Array("infoHashes", r.InfoHashes). + Object("params", r.Params) +} + +// Scrapes wrapper of array of Scrape-s +type Scrapes []Scrape + +// MarshalZerologArray writes array elements to zerolog event +func (s Scrapes) MarshalZerologArray(a *zerolog.Array) { + for _, scrape := range s { + a.Object(scrape) } } @@ -189,12 +222,10 @@ func (r ScrapeRequest) LogFields() log.Fields { // The Scrapes must be in the same order as the InfoHashes in the corresponding // ScrapeRequest. type ScrapeResponse struct { - Files []Scrape + Files Scrapes } -// LogFields renders the current response as a set of Logrus fields. -func (sr ScrapeResponse) LogFields() log.Fields { - return log.Fields{ - "files": sr.Files, - } +// MarshalZerologObject writes fields into zerolog event +func (sr ScrapeResponse) MarshalZerologObject(e *zerolog.Event) { + e.Array("scrapes", sr.Files) } diff --git a/bittorrent/sanitize.go b/bittorrent/sanitize.go index 2d8167e..8fe705c 100644 --- a/bittorrent/sanitize.go +++ b/bittorrent/sanitize.go @@ -5,6 +5,7 @@ import ( ) var ( + logger = log.NewLogger("request sanitizer") // ErrInvalidIP indicates an invalid IP for an Announce. ErrInvalidIP = ClientError("invalid IP") @@ -15,6 +16,7 @@ var ( // SanitizeAnnounce enforces a max and default NumWant and coerces the peer's // IP address into the proper format. func SanitizeAnnounce(r *AnnounceRequest, maxNumWant, defaultNumWant uint32) error { + logger.Trace().Object("request", r).Msg("source announce") if r.Port == 0 { return ErrInvalidPort } @@ -29,18 +31,14 @@ func SanitizeAnnounce(r *AnnounceRequest, maxNumWant, defaultNumWant uint32) err r.NumWant = maxNumWant } - log.Debug("sanitized announce", r, log.Fields{ - "port": r.Port, - "addresses": r.RequestAddresses, - "maxNumWant": maxNumWant, - "defaultNumWant": defaultNumWant, - }) + logger.Trace().Object("request", r).Msg("sanitized announce") return nil } // SanitizeScrape enforces a max number of infohashes for a single scrape // request and checks if addresses are valid. func SanitizeScrape(r *ScrapeRequest, maxScrapeInfoHashes uint32) error { + logger.Trace().Object("request", r).Msg("source scrape") if len(r.InfoHashes) > int(maxScrapeInfoHashes) { r.InfoHashes = r.InfoHashes[:maxScrapeInfoHashes] } @@ -49,9 +47,6 @@ func SanitizeScrape(r *ScrapeRequest, maxScrapeInfoHashes uint32) error { return ErrInvalidIP } - log.Debug("sanitized scrape", r, log.Fields{ - "addresses": r.RequestAddresses, - "maxScrapeInfoHashes": maxScrapeInfoHashes, - }) + logger.Trace().Object("request", r).Msg("sanitized scrape") return nil } diff --git a/cmd/mochi/e2e.go b/cmd/mochi-e2e/e2e.go similarity index 57% rename from cmd/mochi/e2e.go rename to cmd/mochi-e2e/e2e.go index 19c5c91..7abc45a 100644 --- a/cmd/mochi/e2e.go +++ b/cmd/mochi-e2e/e2e.go @@ -1,82 +1,50 @@ -//go:build e2e -// +build e2e - package main import ( + "flag" "fmt" + "log" "math/rand" "time" "github.com/anacrolix/torrent/tracker" - "github.com/spf13/cobra" "github.com/sot-tech/mochi/bittorrent" - "github.com/sot-tech/mochi/pkg/log" ) -func init() { - e2eCmd = &cobra.Command{ - Use: "e2e", - Short: "exec e2e tests", - Long: "Execute the Conf end-to-end test suite", - RunE: EndToEndRunCmdFunc, - } +func main() { + httpAddress := flag.String("httpaddr", "http://127.0.0.1:6969/announce", "address of the HTTP tracker") + udpAddress := flag.String("udpaddr", "udp://127.0.0.1:6969", "address of the UDP tracker") + delay := flag.Duration("delay", time.Second, "delay between announces") + flag.Parse() - e2eCmd.Flags().String("httpaddr", "http://127.0.0.1:6969/announce", "address of the HTTP tracker") - e2eCmd.Flags().String("udpaddr", "udp://127.0.0.1:6969", "address of the UDP tracker") - e2eCmd.Flags().Duration("delay", time.Second, "delay between announces") -} - -// EndToEndRunCmdFunc implements a Cobra command that runs the end-to-end test -// suite for a Conf build. -func EndToEndRunCmdFunc(cmd *cobra.Command, args []string) error { - delay, err := cmd.Flags().GetDuration("delay") - if err != nil { - return err - } - - // Test the HTTP tracker - httpAddr, err := cmd.Flags().GetString("httpaddr") - if err != nil { - return err - } - - if len(httpAddr) != 0 { - log.Info("testing HTTP...") - err := test(httpAddr, delay) + if len(*httpAddress) != 0 { + log.Println("testing HTTP...") + err := test(*httpAddress, *delay) if err != nil { - return err + log.Fatal(err) } - log.Info("success") + log.Println("success") } - // Test the UDP tracker. - udpAddr, err := cmd.Flags().GetString("udpaddr") - if err != nil { - return err - } - - if len(udpAddr) != 0 { - log.Info("testing UDP...") - err := test(udpAddr, delay) + if len(*udpAddress) != 0 { + log.Println("testing UDP...") + err := test(*udpAddress, *delay) if err != nil { - return err + log.Fatal(err) } - log.Info("success") + log.Println("success") } - - return nil } func test(addr string, delay time.Duration) error { b := make([]byte, bittorrent.InfoHashV1Len) rand.Read(b) ih, _ := bittorrent.NewInfoHash(b) - return testWithInfohash(ih, addr, delay) + return testWithInfoHash(ih, addr, delay) } -func testWithInfohash(infoHash bittorrent.InfoHash, url string, delay time.Duration) error { +func testWithInfoHash(infoHash bittorrent.InfoHash, url string, delay time.Duration) error { var ih [bittorrent.InfoHashV1Len]byte req := tracker.AnnounceRequest{ InfoHash: ih, diff --git a/cmd/mochi/main.go b/cmd/mochi/main.go index 0a9e05d..788a1d5 100644 --- a/cmd/mochi/main.go +++ b/cmd/mochi/main.go @@ -1,239 +1,43 @@ package main import ( - "context" - "errors" - "fmt" + "flag" + "log" + "os" "os/signal" "runtime" - "strings" "syscall" - "github.com/sirupsen/logrus" - "github.com/spf13/cobra" - - "github.com/sot-tech/mochi/frontend/http" - "github.com/sot-tech/mochi/frontend/udp" - "github.com/sot-tech/mochi/middleware" - "github.com/sot-tech/mochi/pkg/conf" - "github.com/sot-tech/mochi/pkg/log" - "github.com/sot-tech/mochi/pkg/metrics" - _ "github.com/sot-tech/mochi/pkg/randseed" - "github.com/sot-tech/mochi/pkg/stop" - "github.com/sot-tech/mochi/storage" + l "github.com/sot-tech/mochi/pkg/log" ) -var e2eCmd *cobra.Command - -// Run represents the state of a running instance of Conf. -type Run struct { - configFilePath string - storage storage.PeerStorage - logic *middleware.Logic - sg *stop.Group -} - -// NewRun runs an instance of Conf. -func NewRun(configFilePath string) (*Run, error) { - r := &Run{ - configFilePath: configFilePath, - } - - return r, r.Start(nil) -} - -// Start begins an instance of Conf. -// It is optional to provide an instance of the peer store to avoid the -// creation of a new one. -func (r *Run) Start(ps storage.PeerStorage) error { - configFile, err := ParseConfigFile(r.configFilePath) - if err != nil { - return fmt.Errorf("failed to read config: %w", err) - } - cfg := configFile.Conf - - r.sg = stop.NewGroup() - - if len(cfg.MetricsAddr) > 0 { - log.Info("starting metrics server", log.Fields{"addr": cfg.MetricsAddr}) - r.sg.Add(metrics.NewServer(cfg.MetricsAddr)) - } else { - log.Info("metrics disabled because of empty address") - } - - if ps == nil { - log.Info("starting storage", log.Fields{"name": cfg.Storage.Name}) - ps, err = storage.NewStorage(cfg.Storage.Name, cfg.Storage.Config) - if err != nil { - return fmt.Errorf("failed to create storage: %w", err) - } - log.Info("started storage", ps) - } - r.storage = ps - - preHooks, err := middleware.HooksFromHookConfigs(cfg.PreHooks, r.storage) - if err != nil { - return fmt.Errorf("failed to validate hook config: %w", err) - } - postHooks, err := middleware.HooksFromHookConfigs(cfg.PostHooks, r.storage) - if err != nil { - return fmt.Errorf("failed to validate hook config: %w", err) - } - - r.logic = middleware.NewLogic(cfg.AnnounceInterval, cfg.MinAnnounceInterval, r.storage, preHooks, postHooks) - - if len(cfg.HTTPConfig) > 0 { - log.Info("starting HTTP frontend", cfg.HTTPConfig) - httpFE, err := http.NewFrontend(r.logic, cfg.HTTPConfig) - if err == nil { - r.sg.Add(httpFE) - } else if !errors.Is(err, conf.ErrNilConfigMap) { - return err - } - } - - if len(cfg.UDPConfig) > 0 { - log.Info("starting UDP frontend", cfg.UDPConfig) - udpFE, err := udp.NewFrontend(r.logic, cfg.UDPConfig) - if err == nil { - r.sg.Add(udpFE) - } else if !errors.Is(err, conf.ErrNilConfigMap) { - return err - } - } - - return nil -} - -func combineErrors(prefix string, errs []error) error { - errStrs := make([]string, 0, len(errs)) - for _, err := range errs { - errStrs = append(errStrs, err.Error()) - } - - return errors.New(prefix + ": " + strings.Join(errStrs, "; ")) -} - -// Stop shuts down an instance of Conf. -func (r *Run) Stop(keepPeerStore bool) (storage.PeerStorage, error) { - log.Debug("stopping frontends and metrics server") - if errs := r.sg.Stop().Wait(); len(errs) != 0 { - return nil, combineErrors("failed while shutting down frontends", errs) - } - - log.Debug("stopping logic") - if errs := r.logic.Stop().Wait(); len(errs) != 0 { - return nil, combineErrors("failed while shutting down middleware", errs) - } - - if !keepPeerStore { - log.Debug("stopping peer store") - if errs := r.storage.Stop().Wait(); len(errs) != 0 { - return nil, combineErrors("failed while shutting down peer store", errs) - } - r.storage = nil - } - - return r.storage, nil -} - -// RootRunCmdFunc implements a Cobra command that runs an instance of Conf -// and handles reloading and shutdown via process signals. -func RootRunCmdFunc(cmd *cobra.Command, _ []string) error { - configFilePath, err := cmd.Flags().GetString("config") - if err != nil { - return err - } - - r, err := NewRun(configFilePath) - if err != nil { - return err - } - - shutdown, _ := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) - reload, _ := signal.NotifyContext(context.Background(), ReloadSignals...) - - for { - select { - case <-reload.Done(): - log.Info("reloading; received reload signal") - peerStore, err := r.Stop(true) - if err != nil { - return err - } - - if err := r.Start(peerStore); err != nil { - return err - } - case <-shutdown.Done(): - log.Info("shutting down; received shutdown signal") - if _, err := r.Stop(false); err != nil { - return err - } - - return nil - } - } -} - -// RootPreRunCmdFunc handles command line flags for the Run command. -func RootPreRunCmdFunc(cmd *cobra.Command, _ []string) error { - noColors, err := cmd.Flags().GetBool("nocolors") - if err != nil { - return err - } - if noColors { - log.SetFormatter(&logrus.TextFormatter{DisableColors: true}) - } - - jsonLog, err := cmd.Flags().GetBool("json") - if err != nil { - return err - } - if jsonLog { - log.SetFormatter(&logrus.JSONFormatter{}) - log.Info("enabled JSON logging") - } - - debugLog, err := cmd.Flags().GetBool("debug") - if err != nil { - return err - } - if debugLog { - log.SetDebug(true) - log.Info("enabled debug logging") - } - - return nil -} - -// RootPostRunCmdFunc handles clean up of any state initialized by command line -// flags. -func RootPostRunCmdFunc(_ *cobra.Command, _ []string) error { - return nil -} +const ( + logOutArg = "logOut" + logLevelArg = "logLevel" + logPrettyArg = "logPretty" + logColorsArg = "logColored" + configArg = "config" +) func main() { - rootCmd := &cobra.Command{ - Use: "mochi", - Short: "BitTorrent Tracker", - Long: "A customizable, multi-protocol BitTorrent Tracker", - PersistentPreRunE: RootPreRunCmdFunc, - RunE: RootRunCmdFunc, - PersistentPostRunE: RootPostRunCmdFunc, + var s Server + + logOut := flag.String(logOutArg, "stderr", "output for logging, might be 'stderr', 'stdout' or file path") + logLevel := flag.String(logLevelArg, "warn", "logging level: trace, debug, info, warn, error, fatal, panic") + logPretty := flag.Bool(logPrettyArg, false, "enable log pretty print. used only if 'logOut' set to 'stdout' or 'stderr'. if not set, log outputs json") + logColored := flag.Bool(logColorsArg, runtime.GOOS == "windows", "enable log coloring. used only if set 'logPretty'") + configPath := flag.String(configArg, "/etc/mochi.yaml", "location of configuration file") + flag.Parse() + + if err := l.ConfigureLogger(*logOut, *logLevel, *logPretty, *logColored); err != nil { + log.Fatal("unable to configure logger ", err) } - rootCmd.PersistentFlags().Bool("debug", false, "enable debug logging") - rootCmd.PersistentFlags().Bool("json", false, "enable json logging") - rootCmd.PersistentFlags().Bool("nocolors", runtime.GOOS == "windows", "disable log coloring") - - rootCmd.Flags().String("config", "/etc/mochi.yaml", "location of configuration file") - - if e2eCmd != nil { - rootCmd.AddCommand(e2eCmd) - } - - if err := rootCmd.Execute(); err != nil { - log.Fatal("failed when executing root cobra command: " + err.Error()) + if err := s.Run(*configPath); err != nil { + log.Fatal("unable to start server ", err) } + defer s.Dispose() + ch := make(chan os.Signal, 2) + signal.Notify(ch, os.Interrupt, syscall.SIGTERM) + <-ch } diff --git a/cmd/mochi/server.go b/cmd/mochi/server.go new file mode 100644 index 0000000..7f42248 --- /dev/null +++ b/cmd/mochi/server.go @@ -0,0 +1,106 @@ +package main + +import ( + "errors" + "fmt" + + "github.com/sot-tech/mochi/frontend/http" + "github.com/sot-tech/mochi/frontend/udp" + "github.com/sot-tech/mochi/middleware" + "github.com/sot-tech/mochi/pkg/log" + "github.com/sot-tech/mochi/pkg/metrics" + "github.com/sot-tech/mochi/pkg/stop" + "github.com/sot-tech/mochi/storage" +) + +// Server represents the state of a running instance. +type Server struct { + storage storage.PeerStorage + logic *middleware.Logic + sg *stop.Group +} + +// Run begins an instance of Conf. +// It is optional to provide an instance of the peer store to avoid the +// creation of a new one. +func (r *Server) Run(configFilePath string) error { + configFile, err := ParseConfigFile(configFilePath) + if err != nil { + return fmt.Errorf("failed to read config: %w", err) + } + cfg := configFile.Conf + + r.sg = stop.NewGroup() + + if len(cfg.MetricsAddr) > 0 { + log.Info().Str("address", cfg.MetricsAddr).Msg("starting metrics server") + r.sg.Add(metrics.NewServer(cfg.MetricsAddr)) + } else { + log.Info().Msg("metrics disabled because of empty address") + } + + log.Info().Str("name", cfg.Storage.Name).Msg("starting storage") + r.storage, err = storage.NewStorage(cfg.Storage.Name, cfg.Storage.Config) + if err != nil { + return fmt.Errorf("failed to create storage: %w", err) + } + log.Info().Object("config", r.storage).Msg("started storage") + + preHooks, err := middleware.NewHooks(cfg.PreHooks, r.storage) + if err != nil { + return fmt.Errorf("failed to validate hook config: %w", err) + } + postHooks, err := middleware.NewHooks(cfg.PostHooks, r.storage) + if err != nil { + return fmt.Errorf("failed to validate hook config: %w", err) + } + + r.logic = middleware.NewLogic(cfg.AnnounceInterval, cfg.MinAnnounceInterval, r.storage, preHooks, postHooks) + + var started bool + if len(cfg.HTTPConfig) > 0 { + log.Info().Object("config", cfg.HTTPConfig).Msg("starting HTTP frontend") + httpFE, err := http.NewFrontend(r.logic, cfg.HTTPConfig) + if err == nil { + r.sg.Add(httpFE) + started = true + } else { + return err + } + } + + if len(cfg.UDPConfig) > 0 { + log.Info().Object("config", cfg.UDPConfig).Msg("starting UDP frontend") + udpFE, err := udp.NewFrontend(r.logic, cfg.UDPConfig) + if err == nil { + r.sg.Add(udpFE) + started = true + } else { + return err + } + } + if !started { + return errors.New("no frontends configured") + } + + return nil +} + +// Dispose shuts down an instance of Server. +func (r *Server) Dispose() { + log.Debug().Msg("stopping frontends and metrics server") + if errs := r.sg.Stop().Wait(); len(errs) > 0 { + log.Error().Errs("errors", errs).Msg("error occurred while shutting down frontends") + } + + log.Debug().Msg("stopping logic") + if errs := r.logic.Stop().Wait(); len(errs) > 0 { + log.Error().Errs("errors", errs).Msg("error occurred while shutting down middlewares") + } + + log.Debug().Msg("stopping peer store") + if errs := r.storage.Stop().Wait(); len(errs) != 0 { + log.Error().Errs("errors", errs).Msg("error occurred while shutting down peer store") + } + log.Close() +} diff --git a/cmd/mochi/signal_unix.go b/cmd/mochi/signal_unix.go deleted file mode 100644 index 9dc3fd4..0000000 --- a/cmd/mochi/signal_unix.go +++ /dev/null @@ -1,14 +0,0 @@ -//go:build darwin || freebsd || linux || netbsd || openbsd || dragonfly || solaris - -package main - -import ( - "os" - "syscall" -) - -// ReloadSignals are the signals that the current OS will send to the process -// when a configuration reload is requested. -var ReloadSignals = []os.Signal{ - syscall.SIGUSR1, -} diff --git a/cmd/mochi/signal_windows.go b/cmd/mochi/signal_windows.go deleted file mode 100644 index 684169f..0000000 --- a/cmd/mochi/signal_windows.go +++ /dev/null @@ -1,14 +0,0 @@ -//go:build windows - -package main - -import ( - "os" - "syscall" -) - -// ReloadSignals are the signals that the current OS will send to the process -// when a configuration reload is requested. -var ReloadSignals = []os.Signal{ - syscall.SIGHUP, -} diff --git a/docs/middleware/torrent_approval.md b/docs/middleware/torrent_approval.md index 488b172..4828ead 100644 --- a/docs/middleware/torrent_approval.md +++ b/docs/middleware/torrent_approval.md @@ -24,9 +24,9 @@ There are two sources of hashes: `list` and `directory`. * `list` is the static set of hashes, specified in configuration file. * `directory` will watch for `*.torrent` files in specified path and -append/delete records from storage. This source will parse all existing -files at start and then watch for new files to add, or for delete events -to remove hash from storage. + append/delete records from storage. This source will parse all existing + files at start and then watch for new files to add, or for delete events + to remove hash from storage. Note: if storage is not `memory`, and `preserve` option set to `true`, records will be persisted in storage until _somebody_ or _something_ (different tool with access diff --git a/docs/storage/keydb.md b/docs/storage/keydb.md index 4383f22..aa1168f 100644 --- a/docs/storage/keydb.md +++ b/docs/storage/keydb.md @@ -3,18 +3,18 @@ This storage mainly the same as Redis and uses some of [redis](redis.md) store logic with next exceptions: -* peers stored in [sets](https://redis.io/docs/manual/data-types/#sets) -instead of [hashes](https://redis.io/docs/manual/data-types/#hashes); +* peers stored in [sets](https://redis.io/docs/manual/data-types/#sets) + instead of [hashes](https://redis.io/docs/manual/data-types/#hashes); * keys such as `CHI_I`, `CHI_S_C` and `CHI_L_C` not used (at all); -* peer TTL relies on KeyDB's [EXPIREMEMBER](https://docs.keydb.dev/docs/commands/#expiremember) -command, so MoChi does not need to periodically check peer expiration; +* peer TTL relies on KeyDB's [EXPIREMEMBER](https://docs.keydb.dev/docs/commands/#expiremember) + command, so MoChi does not need to periodically check peer expiration; * storage does not execute periodical statistics collection (peer/lecher/info hash count) -because: - * manual calculation (INC/DEC peers count) is not usable - * manual scan of all keys is quite expensive operation. + because: + * manual calculation (INC/DEC peers count) is not usable + * manual scan of all keys is quite expensive operation. ## Use Case diff --git a/frontend/http/frontend.go b/frontend/http/frontend.go index 656f428..1a6189e 100644 --- a/frontend/http/frontend.go +++ b/frontend/http/frontend.go @@ -20,6 +20,8 @@ import ( "github.com/sot-tech/mochi/pkg/stop" ) +var logger = log.NewLogger("http frontend") + // Config represents all of the configurable options for an HTTP BitTorrent // Frontend. type Config struct { @@ -38,29 +40,6 @@ type Config struct { ParseOptions } -// LogFields renders the current config as a set of Logrus fields. -func (cfg Config) LogFields() log.Fields { - return log.Fields{ - "addr": cfg.Addr, - "httpsAddr": cfg.HTTPSAddr, - "readTimeout": cfg.ReadTimeout, - "writeTimeout": cfg.WriteTimeout, - "idleTimeout": cfg.IdleTimeout, - "enableKeepAlive": cfg.EnableKeepAlive, - "tlsCertPath": cfg.TLSCertPath, - "tlsKeyPath": cfg.TLSKeyPath, - "announceRoutes": cfg.AnnounceRoutes, - "scrapeRoutes": cfg.ScrapeRoutes, - "pingRoutes": cfg.PingRoutes, - "enableRequestTiming": cfg.EnableRequestTiming, - "allowIPSpoofing": cfg.AllowIPSpoofing, - "realIPHeader": cfg.RealIPHeader, - "maxNumWant": cfg.MaxNumWant, - "defaultNumWant": cfg.DefaultNumWant, - "maxScrapeInfoHashes": cfg.MaxScrapeInfoHashes, - } -} - // Default config constants. const ( defaultReadTimeout = 2 * time.Second @@ -77,20 +56,20 @@ func (cfg Config) Validate() Config { if cfg.ReadTimeout <= 0 { validcfg.ReadTimeout = defaultReadTimeout - log.Warn("falling back to default configuration", log.Fields{ - "name": "http.ReadTimeout", - "provided": cfg.ReadTimeout, - "default": validcfg.ReadTimeout, - }) + logger.Warn(). + Str("name", "http.ReadTimeout"). + Dur("provided", cfg.ReadTimeout). + Dur("default", validcfg.ReadTimeout). + Msg("falling back to default configuration") } if cfg.WriteTimeout <= 0 { validcfg.WriteTimeout = defaultWriteTimeout - log.Warn("falling back to default configuration", log.Fields{ - "name": "http.WriteTimeout", - "provided": cfg.WriteTimeout, - "default": validcfg.WriteTimeout, - }) + logger.Warn(). + Str("name", "http.WriteTimeout"). + Dur("provided", cfg.WriteTimeout). + Dur("default", validcfg.WriteTimeout). + Msg("falling back to default configuration") } if cfg.IdleTimeout <= 0 { @@ -98,11 +77,11 @@ func (cfg Config) Validate() Config { if cfg.EnableKeepAlive { // If keepalive is disabled, this configuration isn't used anyway. - log.Warn("falling back to default configuration", log.Fields{ - "name": "http.IdleTimeout", - "provided": cfg.IdleTimeout, - "default": validcfg.IdleTimeout, - }) + logger.Warn(). + Str("name", "http.IdleTimeout"). + Dur("provided", cfg.IdleTimeout). + Dur("default", validcfg.IdleTimeout). + Msg("falling back to default configuration") } } @@ -178,7 +157,7 @@ func NewFrontend(logic frontend.TrackerLogic, c conf.MapConfig) (*Frontend, erro if cfg.Addr != "" { go func() { if err := f.serveHTTP(router, false); err != nil { - log.Fatal("failed while serving http", log.Err(err)) + logger.Fatal().Err(err).Str("proto", "http").Msg("failed while serving") } }() } @@ -186,7 +165,7 @@ func NewFrontend(logic frontend.TrackerLogic, c conf.MapConfig) (*Frontend, erro if cfg.HTTPSAddr != "" { go func() { if err := f.serveHTTP(router, true); err != nil { - log.Fatal("failed while serving https", log.Err(err)) + logger.Fatal().Err(err).Str("proto", "https").Msg("failed while serving") } }() } @@ -194,7 +173,7 @@ func NewFrontend(logic frontend.TrackerLogic, c conf.MapConfig) (*Frontend, erro return f, nil } -// Stop provides a thread-safe way to shutdown a currently running Frontend. +// Stop provides a thread-safe way to shut down a currently running Frontend. func (f *Frontend) Stop() stop.Result { stopGroup := stop.NewGroup() diff --git a/frontend/http/writer.go b/frontend/http/writer.go index 79a785e..8b194be 100644 --- a/frontend/http/writer.go +++ b/frontend/http/writer.go @@ -9,7 +9,6 @@ import ( "github.com/anacrolix/torrent/bencode" "github.com/sot-tech/mochi/bittorrent" - "github.com/sot-tech/mochi/pkg/log" ) // WriteError communicates an error to a BitTorrent client over HTTP. @@ -19,13 +18,13 @@ func WriteError(w http.ResponseWriter, err error) { if errors.As(err, &clientErr) { message = clientErr.Error() } else { - log.Error("http: internal error", log.Err(err)) + logger.Error().Err(err).Msg("http: internal error") } if err = bencode.NewEncoder(w).Encode(map[string]any{ "failure reason": message, }); err != nil { - log.Error("unable to encode message", log.Err(err)) + logger.Error().Err(err).Msg("unable to encode message") } } diff --git a/frontend/http/writer_test.go b/frontend/http/writer_test.go index 572e093..bb5cb6a 100644 --- a/frontend/http/writer_test.go +++ b/frontend/http/writer_test.go @@ -8,8 +8,13 @@ import ( "github.com/stretchr/testify/require" "github.com/sot-tech/mochi/bittorrent" + "github.com/sot-tech/mochi/pkg/log" ) +func init() { + _ = log.ConfigureLogger("", "warn", false, false) +} + func TestWriteError(t *testing.T) { table := []struct { reason, expected string diff --git a/frontend/options.go b/frontend/options.go index 093e6a4..ce5b955 100644 --- a/frontend/options.go +++ b/frontend/options.go @@ -2,6 +2,8 @@ package frontend import "github.com/sot-tech/mochi/pkg/log" +var logger = log.NewLogger("frontend configurator") + // ParseOptions is the configuration used to parse an Announce Request. // // If AllowIPSpoofing is true, IPs provided via params will be used. @@ -18,29 +20,29 @@ func (op ParseOptions) Validate() ParseOptions { valid := op if op.MaxNumWant <= 0 { valid.MaxNumWant = defaultMaxNumWant - log.Warn("falling back to default configuration", log.Fields{ - "name": "MaxNumWant", - "provided": op.MaxNumWant, - "default": valid.MaxNumWant, - }) + logger.Warn(). + Str("name", "MaxNumWant"). + Uint32("provided", op.MaxNumWant). + Uint32("default", valid.MaxNumWant). + Msg("falling back to default configuration") } if op.DefaultNumWant <= 0 { valid.DefaultNumWant = defaultDefaultNumWant - log.Warn("falling back to default configuration", log.Fields{ - "name": "DefaultNumWant", - "provided": op.DefaultNumWant, - "default": valid.DefaultNumWant, - }) + logger.Warn(). + Str("name", "DefaultNumWant"). + Uint32("provided", op.DefaultNumWant). + Uint32("default", valid.DefaultNumWant). + Msg("falling back to default configuration") } if op.MaxScrapeInfoHashes <= 0 { valid.MaxScrapeInfoHashes = defaultMaxScrapeInfoHashes - log.Warn("falling back to default configuration", log.Fields{ - "name": "MaxScrapeInfoHashes", - "provided": op.MaxScrapeInfoHashes, - "default": valid.MaxScrapeInfoHashes, - }) + logger.Warn(). + Str("name", "MaxScrapeInfoHashes"). + Uint32("provided", op.MaxScrapeInfoHashes). + Uint32("default", valid.MaxScrapeInfoHashes). + Msg("falling back to default configuration") } return valid } diff --git a/frontend/udp/connection_id.go b/frontend/udp/connection_id.go index f8bcd5a..de5df66 100644 --- a/frontend/udp/connection_id.go +++ b/frontend/udp/connection_id.go @@ -81,14 +81,22 @@ func (g *ConnectionIDGenerator) Generate(ip netip.Addr, now time.Time) []byte { g.scratch = g.mac.Sum(g.scratch) copy(g.connID[4:8], g.scratch[:4]) - log.Debug("generated connection ID", log.Fields{"ip": ip, "now": now, "connID": g.connID}) + log.Debug(). + Stringer("ip", ip). + Time("now", now). + Bytes("connID", g.connID). + Msg("generated connection ID") return g.connID } // Validate validates the given connection ID for an IP and the current time. func (g *ConnectionIDGenerator) Validate(connectionID []byte, ip netip.Addr, now time.Time, maxClockSkew time.Duration) bool { ts := time.Unix(int64(binary.BigEndian.Uint32(connectionID[:4])), 0) - log.Debug("validating connection ID", log.Fields{"connID": connectionID, "ip": ip, "ts": ts, "now": now}) + log.Debug(). + Stringer("ip", ip). + Time("ts", ts).Time("now", now). + Bytes("connID", g.connID). + Msg("validating connection ID") if now.After(ts.Add(ttl)) || ts.After(now.Add(maxClockSkew)) { return false } diff --git a/frontend/udp/connection_id_test.go b/frontend/udp/connection_id_test.go index edc44b2..90f1022 100644 --- a/frontend/udp/connection_id_test.go +++ b/frontend/udp/connection_id_test.go @@ -57,7 +57,11 @@ func simpleNewConnectionID(ip netip.Addr, now time.Time, key string) []byte { // this is just in here because logging impacts performance and we benchmark // this version too. - log.Debug("manually generated connection ID", log.Fields{"ip": ip, "now": now, "connID": buf}) + log.Debug(). + Stringer("ip", ip). + Time("now", now). + Bytes("connID", buf). + Msg("manually generated connection ID") return buf } diff --git a/frontend/udp/frontend.go b/frontend/udp/frontend.go index 4d4451b..1ae757f 100644 --- a/frontend/udp/frontend.go +++ b/frontend/udp/frontend.go @@ -23,6 +23,8 @@ import ( "github.com/sot-tech/mochi/pkg/timecache" ) +var logger = log.NewLogger("udp frontend") + var allowedGeneratedPrivateKeyRunes = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890") // Config represents all of the configurable options for a UDP BitTorrent @@ -35,20 +37,6 @@ type Config struct { frontend.ParseOptions } -// LogFields renders the current config as a set of Logrus fields. -func (cfg Config) LogFields() log.Fields { - return log.Fields{ - "addr": cfg.Addr, - "privateKey": cfg.PrivateKey, - "maxClockSkew": cfg.MaxClockSkew, - "enableRequestTiming": cfg.EnableRequestTiming, - "allowIPSpoofing": cfg.AllowIPSpoofing, - "maxNumWant": cfg.MaxNumWant, - "defaultNumWant": cfg.DefaultNumWant, - "maxScrapeInfoHashes": cfg.MaxScrapeInfoHashes, - } -} - // Validate sanity checks values set in a config and returns a new config with // default values replacing anything that is invalid. // @@ -64,7 +52,11 @@ func (cfg Config) Validate() Config { } validcfg.PrivateKey = string(pkeyRunes) - log.Warn("UDP private key was not provided, using generated key", log.Fields{"key": validcfg.PrivateKey}) + logger.Warn(). + Str("name", "UDP.PrivateKey"). + Str("provided", ""). + Str("key", validcfg.PrivateKey). + Msg("falling back to default configuration") } validcfg.ParseOptions = cfg.ParseOptions.Validate() @@ -111,7 +103,7 @@ func NewFrontend(logic frontend.TrackerLogic, c conf.MapConfig) (*Frontend, erro f.wg.Add(1) go func() { if err := f.serve(); err != nil { - log.Fatal("failed while serving udp", log.Err(err)) + logger.Fatal().Err(err).Str("proto", "udp").Msg("failed while serving") } }() @@ -157,7 +149,7 @@ func (t *Frontend) serve() error { // Check to see if we need to shutdown. select { case <-t.closing: - log.Debug("udp serve() received shutdown signal") + log.Debug().Msg("serve received shutdown signal") return nil default: } diff --git a/frontend/udp/frontend_test.go b/frontend/udp/frontend_test.go index b1b48ac..86b8c6f 100644 --- a/frontend/udp/frontend_test.go +++ b/frontend/udp/frontend_test.go @@ -6,11 +6,16 @@ import ( "github.com/sot-tech/mochi/frontend/udp" "github.com/sot-tech/mochi/middleware" "github.com/sot-tech/mochi/pkg/conf" + "github.com/sot-tech/mochi/pkg/log" _ "github.com/sot-tech/mochi/pkg/randseed" "github.com/sot-tech/mochi/storage" _ "github.com/sot-tech/mochi/storage/memory" ) +func init() { + _ = log.ConfigureLogger("", "warn", false, false) +} + func TestStartStopRaceIssue437(t *testing.T) { ps, err := storage.NewStorage("memory", conf.MapConfig{}) if err != nil { diff --git a/go.mod b/go.mod index 67b26a7..099e5c3 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/sot-tech/mochi go 1.18 require ( + code.cloudfoundry.org/go-diodes v0.0.0-20220420211542-53509ccdf174 github.com/SermoDigital/jose v0.9.2-0.20180104203859-803625baeddc github.com/anacrolix/torrent v1.42.0 github.com/go-redis/redis/v8 v8.11.5 @@ -12,8 +13,7 @@ require ( github.com/mitchellh/mapstructure v1.5.0 github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.12.1 - github.com/sirupsen/logrus v1.8.1 - github.com/spf13/cobra v1.4.0 + github.com/rs/zerolog v1.26.1 github.com/stretchr/testify v1.7.1 gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b ) @@ -22,23 +22,21 @@ require ( github.com/anacrolix/dht/v2 v2.17.0 // indirect github.com/anacrolix/log v0.13.1 // indirect github.com/anacrolix/missinggo v1.3.0 // indirect - github.com/anacrolix/missinggo/v2 v2.6.0 // indirect + github.com/anacrolix/missinggo/v2 v2.7.0 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/bradfitz/iter v0.0.0-20191230175014-e8f45d346db8 // indirect github.com/cespare/xxhash/v2 v2.1.2 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect - github.com/fsnotify/fsnotify v1.5.3 // indirect + github.com/fsnotify/fsnotify v1.5.4 // indirect github.com/golang/protobuf v1.5.2 // indirect github.com/huandu/xstrings v1.3.2 // indirect - github.com/inconshreveable/mousetrap v1.0.0 // indirect github.com/klauspost/cpuid/v2 v2.0.12 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/client_model v0.2.0 // indirect github.com/prometheus/common v0.34.0 // indirect github.com/prometheus/procfs v0.7.3 // indirect - github.com/spf13/pflag v1.0.5 // indirect - golang.org/x/sys v0.0.0-20220422013727-9388b58f7150 // indirect + golang.org/x/sys v0.0.0-20220429233432-b5fbb4746d32 // indirect google.golang.org/protobuf v1.28.0 // indirect ) diff --git a/go.sum b/go.sum index 55c6f9f..7133a44 100644 --- a/go.sum +++ b/go.sum @@ -30,6 +30,8 @@ cloud.google.com/go/storage v1.5.0/go.mod h1:tpKbwo567HUNpVclU5sGELwQWBDZ8gh0Zeo cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohlUTyfDhBk= cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs= cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0= +code.cloudfoundry.org/go-diodes v0.0.0-20220420211542-53509ccdf174 h1:Ht2zKWftukU3F3ACIdE8asNhso3DgHPzaCDO2K5SWmA= +code.cloudfoundry.org/go-diodes v0.0.0-20220420211542-53509ccdf174/go.mod h1:HLP7HKUU1eqMAGMk247yT91tDDi4xxnehkyXh6hGcr0= crawshaw.io/iox v0.0.0-20181124134642-c51c3df30797/go.mod h1:sXBiorCo8c46JlQV3oXPKINnZ8mcqnye1EkVkqsectk= crawshaw.io/sqlite v0.3.2/go.mod h1:igAO5JulrQ1DbdZdtVq48mnZUBAPOeFzer7VhDWNtW4= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= @@ -65,8 +67,8 @@ github.com/anacrolix/missinggo v1.3.0/go.mod h1:bqHm8cE8xr+15uVfMG3BFui/TxyB6//H github.com/anacrolix/missinggo/perf v1.0.0/go.mod h1:ljAFWkBuzkO12MQclXzZrosP5urunoLS0Cbvb4V0uMQ= github.com/anacrolix/missinggo/v2 v2.2.0/go.mod h1:o0jgJoYOyaoYQ4E2ZMISVa9c88BbUBVQQW4QeRkNCGY= github.com/anacrolix/missinggo/v2 v2.5.1/go.mod h1:WEjqh2rmKECd0t1VhQkLGTdIWXO6f6NLjp5GlMZ+6FA= -github.com/anacrolix/missinggo/v2 v2.6.0 h1:kHkn6nLy1isWYV4mthZX8itV1bRd2mwFVuXrxzJ4VX0= -github.com/anacrolix/missinggo/v2 v2.6.0/go.mod h1:2IZIvmRTizALNYFYXsPR7ofXPzJgyBpKZ4kMqMEICkI= +github.com/anacrolix/missinggo/v2 v2.7.0 h1:4fzOAAn/VCvfWGviLmh64MPMttrlYew81JdPO7nSHvI= +github.com/anacrolix/missinggo/v2 v2.7.0/go.mod h1:2IZIvmRTizALNYFYXsPR7ofXPzJgyBpKZ4kMqMEICkI= github.com/anacrolix/stm v0.2.0/go.mod h1:zoVQRvSiGjGoTmbM0vSLIiaKjWtNPeTvXUSdJQA4hsg= github.com/anacrolix/tagflag v0.0.0-20180109131632-2146c8d41bf0/go.mod h1:1m2U/K6ZT+JZG0+bdMK6qauP49QT4wE5pmhJXOKKCHw= github.com/anacrolix/tagflag v1.0.0/go.mod h1:1m2U/K6ZT+JZG0+bdMK6qauP49QT4wE5pmhJXOKKCHw= @@ -92,7 +94,7 @@ github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5P github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= -github.com/cpuguy83/go-md2man/v2 v2.0.1/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= +github.com/coreos/go-systemd/v22 v22.3.2/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= @@ -111,8 +113,8 @@ github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1m github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/frankban/quicktest v1.14.0 h1:+cqqvzZV87b4adx/5ayVOaYZ2CrvM4ejQvUdBzPPUss= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= -github.com/fsnotify/fsnotify v1.5.3 h1:vNFpj2z7YIbwh2bw7x35sqYpp2wfuq+pivKbWG09B8c= -github.com/fsnotify/fsnotify v1.5.3/go.mod h1:T3375wBYaZdLLcVNkcVbzGHY7f1l/uK5T5Ai1i3InKU= +github.com/fsnotify/fsnotify v1.5.4 h1:jRbGcIw6P2Meqdwuo0H1p6JVLbL5DHKAKlYndzMwVZI= +github.com/fsnotify/fsnotify v1.5.4/go.mod h1:OVB6XrOHzAwXMpEM7uPOzcehqUV2UqJxmVXmkdnm1bU= github.com/glycerine/go-unsnap-stream v0.0.0-20180323001048-9f0cb55181dd/go.mod h1:/20jfyN9Y5QPEAprSgKAUr+glWDY39ZiUEAYOEv5dsE= github.com/glycerine/go-unsnap-stream v0.0.0-20181221182339-f9677308dec2/go.mod h1:/20jfyN9Y5QPEAprSgKAUr+glWDY39ZiUEAYOEv5dsE= github.com/glycerine/go-unsnap-stream v0.0.0-20190901134440-81cf024a9e0a/go.mod h1:/20jfyN9Y5QPEAprSgKAUr+glWDY39ZiUEAYOEv5dsE= @@ -133,6 +135,7 @@ github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KE github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI= github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= +github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.2.0/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= @@ -206,8 +209,6 @@ github.com/huandu/xstrings v1.3.1/go.mod h1:y5/lhBue+AyNmUVz9RLU9xbLR0o4KIIExikq github.com/huandu/xstrings v1.3.2 h1:L18LIDzqlW6xN2rEkpdV8+oL/IXWJ1APd+vsdYy4Wdw= github.com/huandu/xstrings v1.3.2/go.mod h1:y5/lhBue+AyNmUVz9RLU9xbLR0o4KIIExikq4ovT0aE= github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= -github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM= -github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4= github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= @@ -257,7 +258,7 @@ github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+W github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= -github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE= +github.com/onsi/gomega v1.19.0 h1:4ieX6qQjPP/BfC3mpsAtIGGlxTWPeA3Inl/7DtXw1tw= github.com/openzipkin/zipkin-go v0.1.6/go.mod h1:QgAqvLzwWbR/WpD4A3cGpPtJrZXNIiJc5AZX7/PBEpw= github.com/philhofer/fwd v1.0.0/go.mod h1:gk3iGcWd9+svBvR0sR+KPcfE+RNWozjowpeBVG3ZVNU= github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= @@ -304,21 +305,17 @@ github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFR github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= github.com/rogpeppe/go-internal v1.8.0 h1:FCbCCtXNOY3UtUuHUYaghJg4y7Fd14rXifAYUAtL9R8= github.com/rogpeppe/go-internal v1.8.0/go.mod h1:WmiCO8CzOY8rg0OYDC4/i/2WRWAB6poM+XZ2dLUbcbE= -github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= +github.com/rs/xid v1.3.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= +github.com/rs/zerolog v1.26.1 h1:/ihwxqH+4z8UxyI70wM1z9yCvkWcfz/a3mj48k/Zngc= +github.com/rs/zerolog v1.26.1/go.mod h1:/wSSJWX7lVrsOwlbyTRSOJvqRlc+WjWlfes+CiJ+tmc= github.com/ryszard/goskiplist v0.0.0-20150312221310-2dfbae5fcf46/go.mod h1:uAQ5PCi+MFsC7HjREoAz1BU+Mq60+05gifQSsHSDG/8= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88= -github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE= -github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc= github.com/smartystreets/assertions v0.0.0-20190215210624-980c5ac6f3ac/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc= github.com/smartystreets/goconvey v0.0.0-20181108003508-044398e4856c/go.mod h1:XDJAKZRPZ1CvBcN2aX5YOUTYGHki24fSF0Iv48Ibg0s= github.com/smartystreets/goconvey v0.0.0-20190306220146-200a235640ff/go.mod h1:KSQcGKpxUMHk3nbYzs/tIBAM2iDooCn0BmttHOJEbLs= -github.com/spf13/cobra v1.4.0 h1:y+wJpx64xcgO1V+RcnwW0LEHxTKRi2ZDPSBjWnrg88Q= -github.com/spf13/cobra v1.4.0/go.mod h1:Wo4iy3BUC+X2Fybo0PDqwJIv3dNRiZLHQymsfxlB84g= -github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= -github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.2.1/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= @@ -336,6 +333,7 @@ github.com/willf/bitset v1.1.10/go.mod h1:RjeCKbqT1RxIR/KWY6phxZiaY1IyutSBfGjNPy github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yuin/goldmark v1.4.0/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= go.opencensus.io v0.20.1/go.mod h1:6WKK9ahsWS3RSO+PY9ZHZUfv2irvY6gN279GOPZjmmk= go.opencensus.io v0.20.2/go.mod h1:6WKK9ahsWS3RSO+PY9ZHZUfv2irvY6gN279GOPZjmmk= go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= @@ -349,6 +347,7 @@ golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.0.0-20211215165025-cf75a172585e/go.mod h1:P+XmwS30IXTQdn5tA2iutPOUgjI07+tq3H3K9MVA1s8= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8= @@ -379,6 +378,7 @@ golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzB golang.org/x/mod v0.1.1-0.20191107180719-034126e5016b/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -409,7 +409,9 @@ golang.org/x/net v0.0.0-20200520182314-0ba52f642ac2/go.mod h1:qpuaurCH72eLCgpAm/ golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= +golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/net v0.0.0-20210805182204-aaa1db679c0d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/net v0.0.0-20220225172249-27dd8689420f h1:oA4XRj0qtSt8Yo1Zms0CUlsT3KG69V2UGQWPBxujDmc= golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= @@ -429,6 +431,7 @@ golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -444,7 +447,6 @@ golang.org/x/sys v0.0.0-20190606165138-5da285871e9c/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191001151750-bb3f8db39f24/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191228213918-04cbcbbfeed8/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200106162015-b016eb3dc98e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -469,11 +471,12 @@ golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20210809222454-d867a43fc93e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220422013727-9388b58f7150 h1:xHms4gcpe1YE7A3yIllJXP16CMAGuqwO2lX1mTyyRRc= -golang.org/x/sys v0.0.0-20220422013727-9388b58f7150/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220412211240-33da011f77ad/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220429233432-b5fbb4746d32 h1:Js08h5hqB5xyWR789+QqueR6sDE8mk+YvpETZ+F6X9Y= +golang.org/x/sys v0.0.0-20220429233432-b5fbb4746d32/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -528,6 +531,7 @@ golang.org/x/tools v0.0.0-20200618134242-20370b0cb4b2/go.mod h1:EkVYQZoAsY45+roY golang.org/x/tools v0.0.0-20200729194436-6467de6f59a7/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA= golang.org/x/tools v0.0.0-20200804011535-6c149bb5ef0d/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA= golang.org/x/tools v0.0.0-20200825202427-b303f430e36d/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA= +golang.org/x/tools v0.1.7/go.mod h1:LGqMHiF4EqQNHR1JncWGqT5BVaXmza+X+BDGol+dOxo= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/middleware/hooks.go b/middleware/hooks.go index 4419a8a..88f1639 100644 --- a/middleware/hooks.go +++ b/middleware/hooks.go @@ -5,7 +5,6 @@ import ( "errors" "github.com/sot-tech/mochi/bittorrent" - "github.com/sot-tech/mochi/pkg/log" "github.com/sot-tech/mochi/storage" ) @@ -185,16 +184,10 @@ func (h *responseHook) appendPeers(req *bittorrent.AnnounceRequest, resp *bittor resp.IPv4Peers = append(resp.IPv4Peers, p) uniquePeers[p] = nil } else { - log.Warn("received invalid peer from storage", log.Fields{"peer": p}) + logger.Warn().Object("peer", p).Msg("received invalid peer from storage") } } } - log.Debug("responseHook announce peers", log.Fields{ - "infoHash": req.InfoHash, - "requestPeer": req.RequestPeer, - "ipv4Peers": resp.IPv4Peers, - "ipv6Peers": resp.IPv6Peers, - }) return } diff --git a/middleware/jwt/jwt.go b/middleware/jwt/jwt.go index 9ad1683..6875968 100644 --- a/middleware/jwt/jwt.go +++ b/middleware/jwt/jwt.go @@ -14,7 +14,6 @@ import ( "errors" "fmt" "net/http" - "strings" "time" jc "github.com/SermoDigital/jose/crypto" @@ -38,11 +37,18 @@ func init() { } var ( + logger = log.NewLogger(Name) // ErrMissingJWT is returned when a JWT is missing from a request. ErrMissingJWT = bittorrent.ClientError("unapproved request: missing jwt") // ErrInvalidJWT is returned when a JWT fails to verify. ErrInvalidJWT = bittorrent.ClientError("unapproved request: invalid jwt") + + errInvalidInfoHashClaim = errors.New("claim \"infohash\" is invalid") + + errInvalidKid = errors.New("invalid kid") + + errUnknownKidSigner = errors.New("signed by unknown kid") ) // Config represents all the values required by this middleware to fetch JWKs @@ -54,16 +60,6 @@ type Config struct { JWKUpdateInterval time.Duration `cfg:"jwk_set_update_interval"` } -// LogFields implements log.Fielder for a Config. -func (cfg Config) LogFields() log.Fields { - return log.Fields{ - "issuer": cfg.Issuer, - "audience": cfg.Audience, - "JWKSetURL": cfg.JWKSetURL, - "JWKUpdateInterval": cfg.JWKUpdateInterval, - } -} - type hook struct { cfg Config publicKeys map[string]crypto.PublicKey @@ -77,14 +73,14 @@ func build(options conf.MapConfig, _ storage.PeerStorage) (middleware.Hook, erro return nil, fmt.Errorf("middleware %s: %w", Name, err) } - log.Debug("creating new JWT middleware", options) + logger.Debug().Object("options", options).Msg("creating new JWT middleware") h := &hook{ cfg: cfg, publicKeys: map[string]crypto.PublicKey{}, closing: make(chan struct{}), } - log.Debug("performing initial fetch of JWKs") + logger.Debug().Msg("performing initial fetch of JWKs") if err := h.updateKeys(); err != nil { return nil, fmt.Errorf("failed to fetch initial JWK Set: %w", err) } @@ -95,7 +91,7 @@ func build(options conf.MapConfig, _ storage.PeerStorage) (middleware.Hook, erro case <-h.closing: return case <-time.After(cfg.JWKUpdateInterval): - log.Debug("performing fetch of JWKs") + logger.Debug().Msg("performing fetch of JWKs") _ = h.updateKeys() } } @@ -107,14 +103,14 @@ func build(options conf.MapConfig, _ storage.PeerStorage) (middleware.Hook, erro func (h *hook) updateKeys() error { resp, err := http.Get(h.cfg.JWKSetURL) if err != nil { - log.Error("failed to fetch JWK Set", log.Err(err)) + logger.Error().Err(err).Msg("failed to fetch JWK Set") return err } defer resp.Body.Close() var parsedJWKs gojwk.Key err = json.NewDecoder(resp.Body).Decode(&parsedJWKs) if err != nil { - log.Error("failed to decode JWK JSON", log.Err(err)) + logger.Error().Err(err).Msg("failed to decode JWK JSON") return err } @@ -122,19 +118,19 @@ func (h *hook) updateKeys() error { for _, parsedJWK := range parsedJWKs.Keys { publicKey, err := parsedJWK.DecodePublicKey() if err != nil { - log.Error("failed to decode JWK into public key", log.Err(err)) + logger.Error().Err(err).Msg("failed to decode JWK into public key") return err } keys[parsedJWK.Kid] = publicKey } h.publicKeys = keys - log.Debug("successfully fetched JWK Set") + logger.Debug().Msg("successfully fetched JWK Set") return nil } func (h *hook) Stop() stop.Result { - log.Debug("attempting to shutdown JWT middleware") + logger.Debug().Msg("attempting to shutdown JWT middleware") select { case <-h.closing: return stop.AlreadyStopped @@ -178,53 +174,51 @@ func validateJWT(ih bittorrent.InfoHash, jwtBytes []byte, cfgIss, cfgAud string, claims := parsedJWT.Claims() if iss, ok := claims.Issuer(); !ok || iss != cfgIss { - log.Debug("unequal or missing issuer when validating JWT", log.Fields{ - "exists": ok, - "claim": iss, - "config": cfgIss, - }) + logger.Debug(). + Bool("exists", ok). + Str("claim", iss). + Str("config", cfgIss). + Msg("unequal or missing issuer when validating JWT") return jwt.ErrInvalidISSClaim } if auds, ok := claims.Audience(); !ok || !in(cfgAud, auds) { - log.Debug("unequal or missing audience when validating JWT", log.Fields{ - "exists": ok, - "claim": strings.Join(auds, ","), - "config": cfgAud, - }) + logger.Debug(). + Bool("exists", ok). + Strs("claim", auds). + Str("config", cfgAud). + Msg("unequal or missing audience when validating JWT") return jwt.ErrInvalidAUDClaim } ihHex := hex.EncodeToString([]byte(ih)) if ihClaim, ok := claims.Get("infohash").(string); !ok || ihClaim != ihHex { - log.Debug("unequal or missing infohash when validating JWT", log.Fields{ - "exists": ok, - "claim": ihClaim, - "request": ihHex, - }) - return errors.New("claim \"infohash\" is invalid") + logger.Debug(). + Bool("exists", ok). + Str("claim", ihClaim). + Str("request", ihHex). + Msg("unequal or missing infohash when validating JWT") + return errInvalidInfoHashClaim } parsedJWS := parsedJWT.(jws.JWS) kid, ok := parsedJWS.Protected().Get("kid").(string) if !ok { - log.Debug("missing kid when validating JWT", log.Fields{ - "exists": ok, - "claim": kid, - }) - return errors.New("invalid kid") + logger.Debug(). + Bool("exists", ok). + Str("claim", kid). + Msg("missing kid when validating JWT") + return errInvalidKid } publicKey, ok := publicKeys[kid] if !ok { - log.Debug("missing public key forkid when validating JWT", log.Fields{ - "kid": kid, - }) - return errors.New("signed by unknown kid") + logger.Debug().Str("claim", kid).Msg("missing public key forkid when validating JWT") + return errUnknownKidSigner } err = parsedJWS.Verify(publicKey, jc.SigningMethodRS256) if err != nil { - log.Debug("failed to verify signature of JWT", log.Err(err)) + logger.Debug().Err(err).Msg("failed to verify signature of JWT") return err } diff --git a/middleware/logic.go b/middleware/logic.go index 0219012..ad5cef3 100644 --- a/middleware/logic.go +++ b/middleware/logic.go @@ -11,7 +11,10 @@ import ( "github.com/sot-tech/mochi/storage" ) -var _ frontend.TrackerLogic = &Logic{} +var ( + logger = log.NewLogger("middleware") + _ frontend.TrackerLogic = &Logic{} +) // NewLogic creates a new instance of a TrackerLogic that executes the provided // middleware hooks. @@ -35,6 +38,7 @@ type Logic struct { // HandleAnnounce generates a response for an Announce. func (l *Logic) HandleAnnounce(ctx context.Context, req *bittorrent.AnnounceRequest) (_ context.Context, resp *bittorrent.AnnounceResponse, err error) { + logger.Debug().Object("request", req).Msg("new announce request") resp = &bittorrent.AnnounceResponse{ Interval: l.announceInterval, MinInterval: l.minAnnounceInterval, @@ -46,7 +50,7 @@ func (l *Logic) HandleAnnounce(ctx context.Context, req *bittorrent.AnnounceRequ } } - log.Debug("generated announce response", resp) + logger.Debug().Object("response", resp).Msg("generated announce response") return ctx, resp, nil } @@ -56,7 +60,10 @@ func (l *Logic) AfterAnnounce(ctx context.Context, req *bittorrent.AnnounceReque var err error for _, h := range l.postHooks { if ctx, err = h.HandleAnnounce(ctx, req, resp); err != nil { - log.Error("post-announce hooks failed", log.Err(err)) + logger.Error().Err(err). + Object("request", req). + Object("response", resp). + Msg("post-announce hooks failed") return } } @@ -64,6 +71,7 @@ func (l *Logic) AfterAnnounce(ctx context.Context, req *bittorrent.AnnounceReque // HandleScrape generates a response for a Scrape. func (l *Logic) HandleScrape(ctx context.Context, req *bittorrent.ScrapeRequest) (_ context.Context, resp *bittorrent.ScrapeResponse, err error) { + logger.Debug().Object("request", req).Msg("new scrape request") resp = &bittorrent.ScrapeResponse{ Files: make([]bittorrent.Scrape, 0, len(req.InfoHashes)), } @@ -73,7 +81,7 @@ func (l *Logic) HandleScrape(ctx context.Context, req *bittorrent.ScrapeRequest) } } - log.Debug("generated scrape response", resp) + logger.Debug().Object("response", resp).Msg("generated scrape response") return ctx, resp, nil } @@ -83,7 +91,11 @@ func (l *Logic) AfterScrape(ctx context.Context, req *bittorrent.ScrapeRequest, var err error for _, h := range l.postHooks { if ctx, err = h.HandleScrape(ctx, req, resp); err != nil { - log.Error("post-scrape hooks failed", log.Err(err)) + logger.Error(). + Err(err). + Object("request", req). + Object("response", resp). + Msg("post-scrape hooks failed") return } } diff --git a/middleware/logic_test.go b/middleware/logic_test.go index 5929354..7b0592e 100644 --- a/middleware/logic_test.go +++ b/middleware/logic_test.go @@ -9,8 +9,13 @@ import ( "github.com/stretchr/testify/require" "github.com/sot-tech/mochi/bittorrent" + "github.com/sot-tech/mochi/pkg/log" ) +func init() { + _ = log.ConfigureLogger("", "warn", false, false) +} + // nopHook is a Hook to measure the overhead of a no-operation Hook through // benchmarks. type nopHook struct{} diff --git a/middleware/middleware.go b/middleware/middleware.go index 140a97b..bb8bf05 100644 --- a/middleware/middleware.go +++ b/middleware/middleware.go @@ -47,11 +47,11 @@ func RegisterBuilder(name string, d Builder) { drivers[name] = d } -// New attempts to initialize a new middleware instance from the +// NewHook attempts to initialize a new middleware instance from the // list of registered Builders. // // If a driver does not exist, returns ErrBuilderDoesNotExist. -func New(name string, options conf.MapConfig, storage storage.PeerStorage) (Hook, error) { +func NewHook(name string, options conf.MapConfig, storage storage.PeerStorage) (Hook, error) { driversM.RLock() defer driversM.RUnlock() @@ -70,9 +70,9 @@ type Config struct { Options conf.MapConfig } -// HooksFromHookConfigs is a utility function for initializing Hooks in bulk. +// NewHooks is a utility function for initializing Hooks in bulk. // each element of configs must contain pairs `name` - string and `options` - map[string]any -func HooksFromHookConfigs(configs []conf.MapConfig, storage storage.PeerStorage) (hooks []Hook, err error) { +func NewHooks(configs []conf.MapConfig, storage storage.PeerStorage) (hooks []Hook, err error) { for _, cfg := range configs { var c Config @@ -81,7 +81,7 @@ func HooksFromHookConfigs(configs []conf.MapConfig, storage storage.PeerStorage) } var h Hook - h, err = New(c.Name, c.Options, storage) + h, err = NewHook(c.Name, c.Options, storage) if err != nil { break } diff --git a/middleware/torrentapproval/container/directory/directory.go b/middleware/torrentapproval/container/directory/directory.go index 82c88f8..7b2e13c 100644 --- a/middleware/torrentapproval/container/directory/directory.go +++ b/middleware/torrentapproval/container/directory/directory.go @@ -23,6 +23,8 @@ import ( // Name of this container for registry const Name = "directory" +var logger = log.NewLogger("torrent approval directory") + func init() { container.Register(Name, build) } @@ -55,36 +57,37 @@ func build(conf conf.MapConfig, st storage.DataStorage) (container.Container, er } d.watcher = w if len(d.StorageCtx) == 0 { - log.Info("storage context not set, using default value: " + container.DefaultStorageCtxName) + logger.Warn(). + Str("name", "StorageCtx"). + Str("provided", d.StorageCtx). + Str("default", container.DefaultStorageCtxName). + Msg("falling back to default configuration") d.StorageCtx = container.DefaultStorageCtxName } go func() { for event := range d.watcher.Events { var mi *metainfo.MetaInfo - lf := log.Fields{ - "file": event.TorrentFilePath, - "v1hash": event.InfoHash, - } if mi, err = metainfo.LoadFromFile(event.TorrentFilePath); err == nil { s256 := sha256.New() s256.Write(mi.InfoBytes) v2hash, _ := bittorrent.NewInfoHash(s256.Sum(nil)) - lf["v2hash"] = v2hash - lf["v2to1hash"] = v2hash.TruncateV1() switch event.Change { case dirwatch.Added: var name string if info, err := mi.UnmarshalInfo(); err == nil { name = info.Name } else { - lf["error"] = err - log.Warn("unable to unmarshal torrent info", lf) - delete(lf, "error") + logger.Error(). + Err(err). + Str("file", event.TorrentFilePath). + Stringer("infoHash", event.InfoHash). + Stringer("infoHashV2", v2hash). + Msg("unable to unmarshal torrent info") } if len(name) == 0 { name = list.DUMMY } - if err := d.Storage.Put(d.StorageCtx, + logger.Err(d.Storage.Put(d.StorageCtx, storage.Entry{ Key: event.InfoHash.AsString(), Value: name, @@ -94,23 +97,28 @@ func build(conf conf.MapConfig, st storage.DataStorage) (container.Container, er }, storage.Entry{ Key: v2hash.TruncateV1().RawString(), Value: name, - }); err != nil { - lf["error"] = err - } - log.Debug("approval torrent added", lf) + })). + Str("action", "add"). + Str("file", event.TorrentFilePath). + Stringer("infoHash", event.InfoHash). + Stringer("infoHashV2", v2hash). + Msg("approval torrent watcher event") case dirwatch.Removed: - if err := d.Storage.Delete(c.StorageCtx, + logger.Err(d.Storage.Delete(c.StorageCtx, event.InfoHash.AsString(), v2hash.RawString(), v2hash.TruncateV1().RawString(), - ); err != nil { - lf["error"] = err - } - log.Debug("approval torrent deleted", lf) + )). + Str("action", "delete"). + Str("file", event.TorrentFilePath). + Stringer("infoHash", event.InfoHash). + Stringer("infoHashV2", v2hash). + Msg("approval torrent watcher event") } } else { - lf["error"] = err - log.Error("unable to load torrent file", lf) + logger.Error().Err(err). + Str("file", event.TorrentFilePath). + Msg("unable to load torrent file") } } }() @@ -124,6 +132,8 @@ type directory struct { // Stop closes watching of torrent directory func (d *directory) Stop() stop.Result { + st := make(stop.Channel) d.watcher.Close() - return stop.AlreadyStopped + st.Done() + return st.Result() } diff --git a/middleware/torrentapproval/container/list/list.go b/middleware/torrentapproval/container/list/list.go index ae51b43..eff0413 100644 --- a/middleware/torrentapproval/container/list/list.go +++ b/middleware/torrentapproval/container/list/list.go @@ -15,6 +15,8 @@ import ( // Name of this container for registry. const Name = "list" +var logger = log.NewLogger("torrent approval list") + func init() { container.Register(Name, build) } @@ -45,7 +47,11 @@ func build(conf conf.MapConfig, st storage.DataStorage) (container.Container, er } if len(l.StorageCtx) == 0 { - log.Info("Storage context not set, using default value: " + container.DefaultStorageCtxName) + logger.Warn(). + Str("name", "StorageCtx"). + Str("provided", l.StorageCtx). + Str("default", container.DefaultStorageCtxName). + Msg("falling back to default configuration") l.StorageCtx = container.DefaultStorageCtxName } @@ -93,7 +99,7 @@ func (l *List) Approved(hash bittorrent.InfoHash) (contains bool) { } } if err != nil { - log.Err(err) + logger.Error().Err(err).Stringer("infoHash", hash).Msg("unable load hash information from storage") } return contains != l.Invert } diff --git a/middleware/torrentapproval/torrentapproval_test.go b/middleware/torrentapproval/torrentapproval_test.go index 32c11d5..8bf69ba 100644 --- a/middleware/torrentapproval/torrentapproval_test.go +++ b/middleware/torrentapproval/torrentapproval_test.go @@ -9,9 +9,14 @@ import ( "github.com/sot-tech/mochi/bittorrent" "github.com/sot-tech/mochi/pkg/conf" + "github.com/sot-tech/mochi/pkg/log" "github.com/sot-tech/mochi/storage/memory" ) +func init() { + _ = log.ConfigureLogger("", "warn", false, false) +} + var cases = []struct { cfg baseConfig ih string diff --git a/pkg/conf/decoder.go b/pkg/conf/decoder.go index 10321b9..2f28be1 100644 --- a/pkg/conf/decoder.go +++ b/pkg/conf/decoder.go @@ -6,8 +6,7 @@ import ( "errors" "github.com/mitchellh/mapstructure" - - "github.com/sot-tech/mochi/pkg/log" + "github.com/rs/zerolog" ) // TagName is a tag name, used for decoder customization @@ -20,9 +19,9 @@ var ErrNilConfigMap = errors.New("unable to process nil map") // MapConfig is just alias for map[string]any type MapConfig map[string]any -// LogFields just returns this map as a set of Logrus fields. -func (m MapConfig) LogFields() log.Fields { - return log.Fields(m) +// MarshalZerologObject writes map into zerolog event +func (m MapConfig) MarshalZerologObject(e *zerolog.Event) { + e.Fields(map[string]any(m)) } // Unmarshal decodes receiver map into provided structure. diff --git a/pkg/log/log.go b/pkg/log/log.go index a32e69f..83e96dd 100644 --- a/pkg/log/log.go +++ b/pkg/log/log.go @@ -1,134 +1,304 @@ -// Package log adds a thin wrapper around logrus to improve non-debug logging -// performance. +// Package log adds a thin wrapper around zerolog to improve logging performance. +// +// Root logger (called by log.Info, log.Warn etc.) uses global zerolog.Logger instance +// until ConfigureLogger called. Any child logger created with NewLogger will not +// produce any events until logger configured, so any function which uses child +// logger will come stuck because of root initialization synchronization. package log import ( - "fmt" "io" + "os" + "strings" + "sync" - "github.com/sirupsen/logrus" + // needs for async file logging + _ "code.cloudfoundry.org/go-diodes" + "github.com/rs/zerolog" + "github.com/rs/zerolog/diode" + zl "github.com/rs/zerolog/log" ) var ( - l = logrus.New() - debug = false + root = zl.Logger + rootWg = sync.WaitGroup{} + customOut io.WriteCloser + customOutMu = sync.Mutex{} ) -// SetDebug controls debug logging. -func SetDebug(to bool) { - debug = to - l.Level = logrus.DebugLevel +func init() { + rootWg.Add(1) } -// SetFormatter sets the formatter. -func SetFormatter(to logrus.Formatter) { - l.Formatter = to -} - -// SetOutput sets the output. -func SetOutput(to io.Writer) { - l.Out = to -} - -// Fields is a map of logging fields. -type Fields map[string]any - -// LogFields implements Fielder for Fields. -func (f Fields) LogFields() Fields { - return f -} - -// A Fielder provides Fields via the LogFields method. -type Fielder interface { - LogFields() Fields -} - -// err is a wrapper around an error. -type err struct { - e error -} - -// LogFields provides Fields for logging. -func (e err) LogFields() Fields { - return Fields{ - "error": e.e.Error(), - "type": fmt.Sprintf("%T", e.e), - } -} - -// Err is a wrapper around errors that implements Fielder. -func Err(e error) Fielder { - return err{e} -} - -// mergeFielders merges the Fields of multiple Fielders. -// Fields from the first Fielder will be used unchanged, Fields from subsequent -// Fielders will be prefixed with "%d.", starting from 1. -// -// must be called with len(fielders) > 0 -func mergeFielders(fielders ...Fielder) logrus.Fields { - if fielders[0] == nil { - return nil - } - - fields := fielders[0].LogFields() - for i := 1; i < len(fielders); i++ { - if fielders[i] == nil { - continue - } - prefix := fmt.Sprint(i, ".") - ff := fielders[i].LogFields() - for k, v := range ff { - fields[prefix+k] = v - } - } - - return logrus.Fields(fields) -} - -// Debug logs at the debug level if debug logging is enabled. -func Debug(v any, fielders ...Fielder) { - if debug { - if len(fielders) != 0 { - l.WithFields(mergeFielders(fielders...)).Debug(v) +// ConfigureLogger initializes root and all child loggers. +// NOTE: this function MUST be called before any child log call +// otherwise any goroutine, which uses logger will wait logger initialization +func ConfigureLogger(output, level string, formatted, colored bool) (err error) { + lvl := zerolog.WarnLevel + output = strings.ToLower(output) + var w io.Writer + var stdAny bool + switch output { + case "stderr", "": + w, stdAny = os.Stderr, true + case "stdout": + w, stdAny = os.Stdout, true + default: + if w, err = os.OpenFile(output, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0o600); err == nil { + customOutMu.Lock() + defer customOutMu.Unlock() + customOut = diode.NewWriter(w, 1000, 0, func(missed int) { + zl.Warn().Int("count", missed).Msg("Logger dropped messages") + }) + w = customOut } else { - l.Debug(v) + return err } } + if stdAny && formatted { + w = zerolog.ConsoleWriter{ + Out: w, + NoColor: !colored, + TimeFormat: "2006-01-02 15:04:05.999", + } + } + if len(level) > 0 { + if logLevel, err := zerolog.ParseLevel(strings.ToLower(level)); err == nil { + lvl = logLevel + } else { + return err + } + } + root = zerolog.New(w).With().Timestamp().Logger() + zerolog.SetGlobalLevel(lvl) + rootWg.Done() + return nil } -// Info logs at the info level. -func Info(v any, fielders ...Fielder) { - if len(fielders) != 0 { - l.WithFields(mergeFielders(fielders...)).Info(v) - } else { - l.Info(v) +// Logger is the holder for zerolog.Logger which +// waits until root logger initialized to prevent +// mixed logging format and output +type Logger struct { + comp string + zlOnce sync.Once + zerolog.Logger +} + +func (l *Logger) init() { + l.zlOnce.Do(func() { + rootWg.Wait() + l.Logger = root.With().Str("component", l.comp).Logger() + }) +} + +// ==== copied from zerolog ==== + +// Trace starts a new message with trace level. +// +// You must call Msg on the returned event in order to send the event. +func (l *Logger) Trace() *zerolog.Event { + l.init() + return l.Logger.Trace() +} + +// Debug starts a new message with debug level. +// +// You must call Msg on the returned event in order to send the event. +func (l *Logger) Debug() *zerolog.Event { + l.init() + return l.Logger.Debug() +} + +// Info starts a new message with info level. +// +// You must call Msg on the returned event in order to send the event. +func (l *Logger) Info() *zerolog.Event { + l.init() + return l.Logger.Info() +} + +// Warn starts a new message with warn level. +// +// You must call Msg on the returned event in order to send the event. +func (l *Logger) Warn() *zerolog.Event { + l.init() + return l.Logger.Warn() +} + +// Error starts a new message with error level. +// +// You must call Msg on the returned event in order to send the event. +func (l *Logger) Error() *zerolog.Event { + l.init() + return l.Logger.Error() +} + +// Err starts a new message with error level with err as a field if not nil or +// with info level if err is nil. +// +// You must call Msg on the returned event in order to send the event. +func (l *Logger) Err(err error) *zerolog.Event { + l.init() + return l.Logger.Err(err) +} + +// Fatal starts a new message with fatal level. The os.Exit(1) function +// is called by the Msg method, which terminates the program immediately. +// +// You must call Msg on the returned event in order to send the event. +func (l *Logger) Fatal() *zerolog.Event { + l.init() + return l.Logger.Fatal() +} + +// Panic starts a new message with panic level. The panic() function +// is called by the Msg method, which stops the ordinary flow of a goroutine. +// +// You must call Msg on the returned event in order to send the event. +func (l *Logger) Panic() *zerolog.Event { + l.init() + return l.Logger.Panic() +} + +// WithLevel starts a new message with level. Unlike Fatal and Panic +// methods, WithLevel does not terminate the program or stop the ordinary +// flow of a gourotine when used with their respective levels. +// +// You must call Msg on the returned event in order to send the event. +func (l *Logger) WithLevel(level zerolog.Level) *zerolog.Event { + l.init() + return l.Logger.WithLevel(level) +} + +// Log starts a new message with no level. Setting GlobalLevel to Disabled +// will still disable events produced by this method. +// +// You must call Msg on the returned event in order to send the event. +func (l *Logger) Log() *zerolog.Event { + l.init() + return l.Logger.Log() +} + +// Print sends a log event using debug level and no extra field. +// Arguments are handled in the manner of fmt.Print. +func (l *Logger) Print(v ...interface{}) { + l.init() + l.Logger.Print(v...) +} + +// Printf sends a log event using debug level and no extra field. +// Arguments are handled in the manner of fmt.Printf. +func (l *Logger) Printf(format string, v ...interface{}) { + l.init() + l.Logger.Printf(format, v...) +} + +// Write implements the io.Writer interface. This is useful to set as a writer +// for the standard library log. +func (l *Logger) Write(p []byte) (n int, err error) { + l.init() + return l.Logger.Write(p) +} + +// Err starts a new message with error level with err as a field if not nil or +// with info level if err is nil. +// +// You must call Msg on the returned event in order to send the event. +func Err(err error) *zerolog.Event { + return root.Err(err) +} + +// Trace starts a new message with trace level. +// +// You must call Msg on the returned event in order to send the event. +func Trace() *zerolog.Event { + return root.Trace() +} + +// Debug starts a new message with debug level. +// +// You must call Msg on the returned event in order to send the event. +func Debug() *zerolog.Event { + return root.Debug() +} + +// Info starts a new message with info level. +// +// You must call Msg on the returned event in order to send the event. +func Info() *zerolog.Event { + return root.Info() +} + +// Warn starts a new message with warn level. +// +// You must call Msg on the returned event in order to send the event. +func Warn() *zerolog.Event { + return root.Warn() +} + +// Error starts a new message with error level. +// +// You must call Msg on the returned event in order to send the event. +func Error() *zerolog.Event { + return root.Error() +} + +// Fatal starts a new message with fatal level. The os.Exit(1) function +// is called by the Msg method. +// +// You must call Msg on the returned event in order to send the event. +func Fatal() *zerolog.Event { + return root.Fatal() +} + +// Panic starts a new message with panic level. The message is also sent +// to the panic function. +// +// You must call Msg on the returned event in order to send the event. +func Panic() *zerolog.Event { + return root.Panic() +} + +// WithLevel starts a new message with level. +// +// You must call Msg on the returned event in order to send the event. +func WithLevel(level zerolog.Level) *zerolog.Event { + return root.WithLevel(level) +} + +// Log starts a new message with no level. Setting zerolog.GlobalLevel to +// zerolog.Disabled will still disable events produced by this method. +// +// You must call Msg on the returned event in order to send the event. +func Log() *zerolog.Event { + return root.Log() +} + +// Print sends a log event using debug level and no extra field. +// Arguments are handled in the manner of fmt.Print. +func Print(v ...interface{}) { + root.Print(v...) +} + +// Printf sends a log event using debug level and no extra field. +// Arguments are handled in the manner of fmt.Printf. +func Printf(format string, v ...interface{}) { + root.Printf(format, v...) +} + +// Close closes custom output writer if it configured +func Close() { + customOutMu.Lock() + defer customOutMu.Unlock() + if customOut != nil { + _ = customOut.Close() + customOut = nil } } -// Warn logs at the warning level. -func Warn(v any, fielders ...Fielder) { - if len(fielders) != 0 { - l.WithFields(mergeFielders(fielders...)).Warn(v) - } else { - l.Warn(v) - } -} - -// Error logs at the error level. -func Error(v any, fielders ...Fielder) { - if len(fielders) != 0 { - l.WithFields(mergeFielders(fielders...)).Error(v) - } else { - l.Error(v) - } -} - -// Fatal logs at the fatal level and exits with a status code != 0. -func Fatal(v any, fielders ...Fielder) { - if len(fielders) != 0 { - l.WithFields(mergeFielders(fielders...)).Fatal(v) - } else { - l.Fatal(v) - } +// NewLogger creates child logger with specified component name +// NOTE: root logger MUST be initialized with ConfigureLogger +// before any logger call +func NewLogger(component string) *Logger { + return &Logger{comp: component} } diff --git a/pkg/metrics/server.go b/pkg/metrics/server.go index 53a9282..30b1f4e 100644 --- a/pkg/metrics/server.go +++ b/pkg/metrics/server.go @@ -16,7 +16,10 @@ import ( "github.com/sot-tech/mochi/pkg/stop" ) -var serverCounter = new(int32) +var ( + logger = log.NewLogger("metrics") + serverCounter = new(int32) +) // Enabled indicates that configured at least one metrics server func Enabled() bool { @@ -74,7 +77,7 @@ func NewServer(addr string) *Server { atomic.AddInt32(serverCounter, 1) defer atomic.AddInt32(serverCounter, -1) if err := s.srv.ListenAndServe(); !errors.Is(err, http.ErrServerClosed) { - log.Fatal("failed while serving prometheus", log.Err(err)) + logger.Error().Err(err).Msg("failed while serving prometheus") } }() diff --git a/storage/keydb/storage.go b/storage/keydb/storage.go index 07eb4c7..b972d8a 100644 --- a/storage/keydb/storage.go +++ b/storage/keydb/storage.go @@ -14,6 +14,7 @@ import ( "errors" "github.com/go-redis/redis/v8" + "github.com/rs/zerolog" "github.com/sot-tech/mochi/bittorrent" "github.com/sot-tech/mochi/pkg/conf" @@ -29,9 +30,12 @@ const ( expireMemberCmd = "EXPIREMEMBER" ) -// ErrNotKeyDB returned from initializer if connected does not support KeyDB -// specific command (EXPIREMEMBER) -var ErrNotKeyDB = errors.New("provided instance seems not KeyDB") +var ( + logger = log.NewLogger(Name) + // ErrNotKeyDB returned from initializer if connected does not support KeyDB + // specific command (EXPIREMEMBER) + ErrNotKeyDB = errors.New("provided instance seems not KeyDB") +) func init() { // Register the storage driver. @@ -72,10 +76,8 @@ func newStore(cfg r.Config) (*store, error) { if err == nil { st = &store{ Connection: rs, - logFields: cfg.LogFields(), peerTTL: uint(cfg.PeerLifetime.Seconds()), } - st.logFields["name"] = Name } return st, err @@ -83,8 +85,12 @@ func newStore(cfg r.Config) (*store, error) { type store struct { r.Connection - logFields log.Fields - peerTTL uint + peerTTL uint +} + +// MarshalZerologObject writes configuration into zerolog event +func (s store) MarshalZerologObject(e *zerolog.Event) { + e.Str("type", Name).Object("config", s.Config) } func (s store) setPeerTTL(infoHashKey, peerID string) error { @@ -92,10 +98,10 @@ func (s store) setPeerTTL(infoHashKey, peerID string) error { } func (s store) addPeer(infoHashKey, peerID string) (err error) { - log.Debug("storage: KeyDB: PutPeer", log.Fields{ - "infoHashKey": infoHashKey, - "peerID": peerID, - }) + logger.Trace(). + Str("infoHashKey", infoHashKey). + Str("peerID", peerID). + Msg("add peer") if err = s.SAdd(context.TODO(), infoHashKey, peerID).Err(); err == nil { err = s.setPeerTTL(infoHashKey, peerID) } @@ -103,10 +109,10 @@ func (s store) addPeer(infoHashKey, peerID string) (err error) { } func (s store) delPeer(infoHashKey, peerID string) error { - log.Debug("storage: KeyDB: DeletePeer", log.Fields{ - "infoHashKey": infoHashKey, - "peerID": peerID, - }) + logger.Trace(). + Str("infoHashKey", infoHashKey). + Str("peerID", peerID). + Msg("del peer") deleted, err := s.SRem(context.TODO(), infoHashKey, peerID).Uint64() err = r.AsNil(err) if err == nil && deleted == 0 { @@ -133,10 +139,10 @@ func (s store) DeleteLeecher(ih bittorrent.InfoHash, peer bittorrent.Peer) error } func (s store) GraduateLeecher(ih bittorrent.InfoHash, peer bittorrent.Peer) (err error) { - log.Debug("storage: KeyDB: GraduateLeecher", log.Fields{ - "infoHash": ih, - "peer": peer, - }) + logger.Trace(). + Stringer("infoHash", ih). + Object("peer", peer). + Msg("graduate leecher") infoHash, peerID := ih.RawString(), peer.RawString() ihSeederKey := r.InfoHashKey(infoHash, true, peer.Addr().Is6()) ihLeecherKey := r.InfoHashKey(infoHash, false, peer.Addr().Is6()) @@ -153,12 +159,12 @@ func (s store) GraduateLeecher(ih bittorrent.InfoHash, peer bittorrent.Peer) (er // AnnouncePeers is the same function as redis.AnnouncePeers func (s store) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant int, v6 bool) ([]bittorrent.Peer, error) { - log.Debug("storage: KeyDB: AnnouncePeers", log.Fields{ - "infoHash": ih, - "seeder": seeder, - "numWant": numWant, - "v6": v6, - }) + logger.Trace(). + Stringer("infoHash", ih). + Bool("seeder", seeder). + Int("numWant", numWant). + Bool("v6", v6). + Msg("announce peers") return s.GetPeers(ih, seeder, numWant, v6, func(ctx context.Context, infoHashKey string, maxCount int) *redis.StringSliceCmd { return s.SRandMemberN(context.TODO(), infoHashKey, int64(maxCount)) @@ -167,9 +173,9 @@ func (s store) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant int, v // ScrapeSwarm is the same function as redis.ScrapeSwarm except `SCard` call instead of `HLen` func (s store) ScrapeSwarm(ih bittorrent.InfoHash) (leechers uint32, seeders uint32, snatched uint32) { - log.Debug("storage: KeyDB ScrapeSwarm", log.Fields{ - "infoHash": ih, - }) + logger.Trace(). + Stringer("infoHash", ih). + Msg("scrape swarm") leechers, seeders = s.CountPeers(ih, s.SCard) return } @@ -182,7 +188,3 @@ func (s *store) Stop() stop.Result { } return c.Result() } - -func (s store) LogFields() log.Fields { - return s.logFields -} diff --git a/storage/memory/storage.go b/storage/memory/storage.go index 29366b8..fce72f6 100644 --- a/storage/memory/storage.go +++ b/storage/memory/storage.go @@ -11,6 +11,8 @@ import ( "sync" "time" + "github.com/rs/zerolog" + "github.com/sot-tech/mochi/bittorrent" "github.com/sot-tech/mochi/pkg/conf" "github.com/sot-tech/mochi/pkg/log" @@ -27,6 +29,8 @@ const ( defaultShardCount = 1024 ) +var logger = log.NewLogger(Name) + func init() { // Register the storage driver. storage.RegisterBuilder(Name, builder) @@ -45,12 +49,9 @@ type Config struct { ShardCount int `cfg:"shard_count"` } -// LogFields renders the current config as a set of Logrus fields. -func (cfg Config) LogFields() log.Fields { - return log.Fields{ - "name": Name, - "shardCount": cfg.ShardCount, - } +// MarshalZerologObject writes configuration into zerolog event +func (cfg Config) MarshalZerologObject(e *zerolog.Event) { + e.Int("shardCount", cfg.ShardCount) } // Validate sanity checks values set in a config and returns a new config with @@ -62,11 +63,11 @@ func (cfg Config) Validate() Config { if cfg.ShardCount <= 0 || cfg.ShardCount > (math.MaxInt/2) { validcfg.ShardCount = defaultShardCount - log.Warn("falling back to default configuration", log.Fields{ - "name": Name + ".ShardCount", - "provided": cfg.ShardCount, - "default": validcfg.ShardCount, - }) + log.Warn(). + Str("name", "ShardCount"). + Int("provided", cfg.ShardCount). + Int("default", validcfg.ShardCount). + Msg("falling back to default configuration") } return validcfg @@ -111,6 +112,11 @@ type peerStore struct { wg sync.WaitGroup } +// MarshalZerologObject writes configuration into zerolog event +func (ps *peerStore) MarshalZerologObject(e *zerolog.Event) { + e.Str("type", Name).Object("config", ps.cfg) +} + var _ storage.PeerStorage = &peerStore{} func (ps *peerStore) ScheduleGC(gcInterval, peerLifeTime time.Duration) { @@ -125,10 +131,12 @@ func (ps *peerStore) ScheduleGC(gcInterval, peerLifeTime time.Duration) { return case <-t.C: before := time.Now().Add(-peerLifeTime) - log.Debug("storage: Memory purging peers with no announces since", log.Fields{"before": before}) + logger.Trace().Time("before", before).Msg("purging peers with no announces") start := time.Now() ps.gc(before) - storage.PromGCDurationMilliseconds.Observe(float64(time.Since(start).Nanoseconds()) / float64(time.Millisecond)) + duration := time.Since(start) + logger.Debug().Dur("timeTaken", duration).Msg("gc complete") + storage.PromGCDurationMilliseconds.Observe(float64(duration.Milliseconds())) } } }() @@ -162,7 +170,7 @@ func (ps *peerStore) ScheduleStatisticsCollection(reportInterval time.Duration) storage.PromInfoHashesCount.Set(float64(numInfohashes)) storage.PromSeedersCount.Set(float64(numSeeders)) storage.PromLeechersCount.Set(float64(numLeechers)) - log.Debug("storage: Memory: populateProm() finished", log.Fields{"timeTaken": time.Since(before)}) + logger.Debug().TimeDiff("timeTaken", time.Now(), before).Msg("populate prom complete") } } } @@ -190,6 +198,10 @@ func (ps *peerStore) PutSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) error panic("attempted to interact with stopped memory store") default: } + logger.Trace(). + Stringer("infoHash", ih). + Object("peer", p). + Msg("put seeder") shard := ps.shards[ps.shardIndex(ih, p.Addr().Is6())] shard.Lock() @@ -219,6 +231,10 @@ func (ps *peerStore) DeleteSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) err panic("attempted to interact with stopped memory store") default: } + logger.Trace(). + Stringer("infoHash", ih). + Object("peer", p). + Msg("delete seeder") shard := ps.shards[ps.shardIndex(ih, p.Addr().Is6())] shard.Lock() @@ -248,6 +264,10 @@ func (ps *peerStore) PutLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error panic("attempted to interact with stopped memory store") default: } + logger.Trace(). + Stringer("infoHash", ih). + Object("peer", p). + Msg("put leecher") shard := ps.shards[ps.shardIndex(ih, p.Addr().Is6())] shard.Lock() @@ -277,6 +297,10 @@ func (ps *peerStore) DeleteLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) er panic("attempted to interact with stopped memory store") default: } + logger.Trace(). + Stringer("infoHash", ih). + Object("peer", p). + Msg("delete leecher") shard := ps.shards[ps.shardIndex(ih, p.Addr().Is6())] shard.Lock() @@ -306,6 +330,10 @@ func (ps *peerStore) GraduateLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) panic("attempted to interact with stopped memory store") default: } + logger.Trace(). + Stringer("infoHash", ih). + Object("peer", p). + Msg("graduate leecher") shard := ps.shards[ps.shardIndex(ih, p.Addr().Is6())] shard.Lock() @@ -368,6 +396,12 @@ func (ps *peerStore) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant panic("attempted to interact with stopped memory store") default: } + logger.Trace(). + Stringer("infoHash", ih). + Bool("seeder", seeder). + Int("numWant", numWant). + Bool("v6", v6). + Msg("announce peers") peers = ps.getPeers(ps.shards[ps.shardIndex(ih, v6)], ih, numWant, seeder) @@ -391,6 +425,9 @@ func (ps *peerStore) ScrapeSwarm(ih bittorrent.InfoHash) (leechers uint32, seede panic("attempted to interact with stopped memory store") default: } + logger.Trace(). + Stringer("infoHash", ih). + Msg("scrape swarm") leechers, seeders = ps.countPeers(ih, true) l, s := ps.countPeers(ih, false) @@ -540,7 +577,3 @@ func (ps *peerStore) Stop() stop.Result { return c.Result() } - -func (ps *peerStore) LogFields() log.Fields { - return ps.cfg.LogFields() -} diff --git a/storage/redis/storage.go b/storage/redis/storage.go index 1e3a9c3..903db27 100644 --- a/storage/redis/storage.go +++ b/storage/redis/storage.go @@ -28,6 +28,7 @@ import ( "time" "github.com/go-redis/redis/v8" + "github.com/rs/zerolog" "github.com/sot-tech/mochi/bittorrent" "github.com/sot-tech/mochi/pkg/conf" @@ -64,8 +65,11 @@ const ( CountLeecherKey = "CHI_C_L" ) -// ErrSentinelAndClusterChecked returned from initializer if both Config.Sentinel and Config.Cluster provided -var ErrSentinelAndClusterChecked = errors.New("unable to use both cluster and sentinel mode") +var ( + logger = log.NewLogger(Name) + // ErrSentinelAndClusterChecked returned from initializer if both Config.Sentinel and Config.Cluster provided + ErrSentinelAndClusterChecked = errors.New("unable to use both cluster and sentinel mode") +) func init() { // Register the storage builder. @@ -98,7 +102,6 @@ func newStore(cfg Config) (*store, error) { Connection: rs, closed: make(chan any), wg: sync.WaitGroup{}, - logFields: cfg.LogFields(), }, nil } @@ -118,21 +121,18 @@ type Config struct { ConnectTimeout time.Duration `cfg:"connect_timeout"` } -// LogFields renders the current config as a set of Logrus fields. -func (cfg Config) LogFields() log.Fields { - return log.Fields{ - "name": Name, - "peerLifetime": cfg.PeerLifetime, - "addresses": cfg.Addresses, - "db": cfg.DB, - "poolSize": cfg.PoolSize, - "sentinel": cfg.Sentinel, - "sentinelMaster": cfg.SentinelMaster, - "cluster": cfg.Cluster, - "readTimeout": cfg.ReadTimeout, - "writeTimeout": cfg.WriteTimeout, - "connectTimeout": cfg.ConnectTimeout, - } +// MarshalZerologObject writes configuration fields into zerolog event +func (cfg Config) MarshalZerologObject(e *zerolog.Event) { + e.Strs("addresses", cfg.Addresses). + Int("db", cfg.DB). + Int("poolSize", cfg.PoolSize). + Bool("sentinel", cfg.Sentinel). + Str("sentinelMaster", cfg.SentinelMaster). + Bool("cluster", cfg.Cluster). + Dur("readTimeout", cfg.ReadTimeout). + Dur("writeTimeout", cfg.WriteTimeout). + Dur("connectTimeout", cfg.ConnectTimeout). + Dur("peerLifetime", cfg.PeerLifetime) } // Validate sanity checks values set in a config and returns a new config with @@ -157,38 +157,38 @@ func (cfg Config) Validate() (Config, error) { validCfg.Addresses = addresses if len(cfg.Addresses) == 0 { validCfg.Addresses = []string{defaultRedisAddress} - log.Warn("falling back to default configuration", log.Fields{ - "name": Name + ".Addresses", - "provided": cfg.Addresses, - "default": validCfg.Addresses, - }) + logger.Warn(). + Str("name", "Addresses"). + Strs("provided", cfg.Addresses). + Strs("default", validCfg.Addresses). + Msg("falling back to default configuration") } if cfg.ReadTimeout <= 0 { validCfg.ReadTimeout = defaultReadTimeout - log.Warn("falling back to default configuration", log.Fields{ - "name": Name + ".ReadTimeout", - "provided": cfg.ReadTimeout, - "default": validCfg.ReadTimeout, - }) + logger.Warn(). + Str("name", "ReadTimeout"). + Dur("provided", cfg.ReadTimeout). + Dur("default", validCfg.ReadTimeout). + Msg("falling back to default configuration") } if cfg.WriteTimeout <= 0 { validCfg.WriteTimeout = defaultWriteTimeout - log.Warn("falling back to default configuration", log.Fields{ - "name": Name + ".WriteTimeout", - "provided": cfg.WriteTimeout, - "default": validCfg.WriteTimeout, - }) + logger.Warn(). + Str("name", "WriteTimeout"). + Dur("provided", cfg.WriteTimeout). + Dur("default", validCfg.WriteTimeout). + Msg("falling back to default configuration") } if cfg.ConnectTimeout <= 0 { validCfg.ConnectTimeout = defaultConnectTimeout - log.Warn("falling back to default configuration", log.Fields{ - "name": Name + ".ConnectTimeout", - "provided": cfg.ConnectTimeout, - "default": validCfg.ConnectTimeout, - }) + logger.Warn(). + Str("name", "ConnectTimeout"). + Dur("provided", cfg.ConnectTimeout). + Dur("default", validCfg.ConnectTimeout). + Msg("falling back to default configuration") } return validCfg, nil @@ -238,7 +238,13 @@ func (cfg Config) Connect() (con Connection, err error) { _ = rs.Close() rs = nil } - return Connection{rs}, err + cfg.Login, cfg.Password = "", "" + return Connection{rs, cfg}, err +} + +// MarshalZerologObject writes configuration into zerolog event +func (ps *store) MarshalZerologObject(e *zerolog.Event) { + e.Str("type", Name).Object("config", ps.Config) } func (ps *store) ScheduleGC(gcInterval, peerLifeTime time.Duration) { @@ -254,9 +260,9 @@ func (ps *store) ScheduleGC(gcInterval, peerLifeTime time.Duration) { case <-t.C: start := time.Now() ps.gc(time.Now().Add(-peerLifeTime)) - duration := time.Since(start).Milliseconds() - log.Debug("storage: Redis: recordGCDuration", log.Fields{"timeTaken(ms)": duration}) - storage.PromGCDurationMilliseconds.Observe(float64(duration)) + duration := time.Since(start) + logger.Debug().Dur("timeTaken", duration).Msg("gc complete") + storage.PromGCDurationMilliseconds.Observe(float64(duration.Milliseconds())) t.Reset(gcInterval) } } @@ -285,7 +291,7 @@ func (ps *store) ScheduleStatisticsCollection(reportInterval time.Duration) { storage.PromInfoHashesCount.Set(float64(numInfoHashes)) storage.PromSeedersCount.Set(float64(numSeeders)) storage.PromLeechersCount.Set(float64(numLeechers)) - log.Debug("storage: Redis: populateProm() finished", log.Fields{"timeTaken": time.Since(before)}) + logger.Debug().TimeDiff("timeTaken", time.Now(), before).Msg("populate prom complete") } } } @@ -295,13 +301,13 @@ func (ps *store) ScheduleStatisticsCollection(reportInterval time.Duration) { // Connection is wrapper for redis.UniversalClient type Connection struct { redis.UniversalClient + Config } type store struct { Connection - closed chan any - wg sync.WaitGroup - logFields log.Fields + closed chan any + wg sync.WaitGroup } func (ps *store) count(key string, getLength bool) (n uint64) { @@ -313,10 +319,7 @@ func (ps *store) count(key string, getLength bool) (n uint64) { } err = AsNil(err) if err != nil { - log.Error("storage: Redis: GET/SCARD failure", log.Fields{ - "key": key, - "error": err, - }) + logger.Error().Err(err).Str("key", key).Msg("storage: Redis: GET/SCARD failure") } return } @@ -375,11 +378,10 @@ func InfoHashKey(infoHash string, seeder, v6 bool) (infoHashKey string) { } func (ps *store) putPeer(infoHashKey, peerCountKey, peerID string) error { - log.Debug("storage: Redis: PutPeer", log.Fields{ - "infoHashKey": infoHashKey, - "peerCountKey": peerCountKey, - "peerID": peerID, - }) + logger.Trace(). + Str("infoHashKey", infoHashKey). + Str("peerID", peerID). + Msg("put peer") return ps.tx(func(tx redis.Pipeliner) (err error) { if err = tx.HSet(context.TODO(), infoHashKey, peerID, ps.getClock()).Err(); err != nil { return @@ -393,11 +395,10 @@ func (ps *store) putPeer(infoHashKey, peerCountKey, peerID string) error { } func (ps *store) delPeer(infoHashKey, peerCountKey, peerID string) error { - log.Debug("storage: Redis: DeletePeer", log.Fields{ - "infoHashKey": infoHashKey, - "peerCountKey": peerCountKey, - "peerID": peerID, - }) + logger.Trace(). + Str("infoHashKey", infoHashKey). + Str("peerID", peerID). + Msg("del peer") deleted, err := ps.HDel(context.TODO(), infoHashKey, peerID).Uint64() err = AsNil(err) if err == nil { @@ -428,10 +429,10 @@ func (ps *store) DeleteLeecher(ih bittorrent.InfoHash, peer bittorrent.Peer) err } func (ps *store) GraduateLeecher(ih bittorrent.InfoHash, peer bittorrent.Peer) error { - log.Debug("storage: Redis: GraduateLeecher", log.Fields{ - "infoHash": ih, - "peer": peer, - }) + logger.Trace(). + Stringer("infoHash", ih). + Object("peer", peer). + Msg("graduate leecher") infoHash, peerID, isV6 := ih.RawString(), peer.RawString(), peer.Addr().Is6() ihSeederKey, ihLeecherKey := InfoHashKey(infoHash, true, isV6), InfoHashKey(infoHash, false, isV6) @@ -465,7 +466,7 @@ func (ps Connection) parsePeersList(peersResult *redis.StringSliceCmd) (peers [] if p, err := bittorrent.NewPeer(peerID); err == nil { peers = append(peers, p) } else { - log.Error("storage: Redis: unable to decode leecher", log.Fields{"peerID": peerID}) + logger.Error().Err(err).Str("peerID", peerID).Msg("unable to decode peer") } } } @@ -506,22 +507,19 @@ func (ps Connection) GetPeers(ih bittorrent.InfoHash, forSeeder bool, maxCount i } } else if l > 0 { err = nil - log.Warn("storage: Redis: error occurred while retrieving peers", log.Fields{ - "infoHash": infoHash, - "error": err, - }) + logger.Warn().Err(err).Stringer("infoHash", ih).Msg("error occurred while retrieving peers") } return } func (ps *store) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant int, v6 bool) ([]bittorrent.Peer, error) { - log.Debug("storage: Redis: AnnouncePeers", log.Fields{ - "infoHash": ih, - "seeder": seeder, - "numWant": numWant, - "peer": v6, - }) + logger.Trace(). + Stringer("infoHash", ih). + Bool("seeder", seeder). + Int("numWant", numWant). + Bool("v6", v6). + Msg("announce peers") return ps.GetPeers(ih, seeder, numWant, v6, func(ctx context.Context, infoHashKey string, maxCount int) *redis.StringSliceCmd { return ps.HRandField(ctx, infoHashKey, maxCount, false) @@ -534,10 +532,7 @@ func (ps Connection) countPeers(infoHashKey string, countFn getPeerCountFn) uint count, err := countFn(context.TODO(), infoHashKey).Result() err = AsNil(err) if err != nil { - log.Error("storage: Redis: key size calculation failure", log.Fields{ - "infoHashKey": infoHashKey, - "error": err, - }) + logger.Error().Err(err).Str("infoHashKey", infoHashKey).Msg("key size calculation failure") } return uint32(count) } @@ -555,9 +550,9 @@ func (ps Connection) CountPeers(ih bittorrent.InfoHash, countFn getPeerCountFn) } func (ps *store) ScrapeSwarm(ih bittorrent.InfoHash) (leechers uint32, seeders uint32, snatched uint32) { - log.Debug("storage: Redis ScrapeSwarm", log.Fields{ - "infoHash": ih, - }) + logger.Trace(). + Stringer("infoHash", ih). + Msg("scrape swarm") leechers, seeders = ps.CountPeers(ih, ps.HLen) @@ -579,7 +574,7 @@ func (ps Connection) Put(ctx string, values ...storage.Entry) (err error) { err = ps.HSet(context.TODO(), PrefixKey+ctx, args...).Err() if err != nil { if strings.Contains(err.Error(), argNumErrorMsg) { - log.Warn("This Redis version/implementation does not support variadic arguments for HSET") + logger.Warn().Msg("This Redis version/implementation does not support variadic arguments for HSET") for _, p := range values { if err = ps.HSet(context.TODO(), PrefixKey+ctx, p.Key, p.Value).Err(); err != nil { break @@ -613,7 +608,7 @@ func (ps Connection) Delete(ctx string, keys ...string) (err error) { err = AsNil(ps.HDel(context.TODO(), PrefixKey+ctx, keys...).Err()) if err != nil { if strings.Contains(err.Error(), argNumErrorMsg) { - log.Warn("This Redis version/implementation does not support variadic arguments for HDEL") + logger.Warn().Msg("This Redis version/implementation does not support variadic arguments for HDEL") for _, k := range keys { if err = AsNil(ps.HDel(context.TODO(), PrefixKey+ctx, k).Err()); err != nil { break @@ -676,7 +671,6 @@ func (Connection) Preservable() bool { // transaction. The infohash key will remain in the addressFamil hash and // we'll attempt to clean it up the next time gc runs. func (ps *store) gc(cutoff time.Time) { - log.Debug("storage: Redis: purging peers with no announces since", log.Fields{"before": cutoff}) cutoffNanos := cutoff.UnixNano() // list all infoHashKeys in the group infoHashKeys, err := ps.SMembers(context.Background(), IHKey).Result() @@ -690,7 +684,7 @@ func (ps *store) gc(cutoff time.Time) { } else if strings.HasPrefix(infoHashKey, IH4LeecherKey) || strings.HasPrefix(infoHashKey, IH6LeecherKey) { cntKey = CountLeecherKey } else { - log.Warn("storage: Redis: unexpected record found in info hash set", log.Fields{"infoHashKey": infoHashKey}) + logger.Warn().Str("infoHashKey", infoHashKey).Msg("unexpected record found in info hash set") continue } // list all (peer, timeout) pairs for the ih @@ -701,16 +695,15 @@ func (ps *store) gc(cutoff time.Time) { for peerID, timeStamp := range peerList { if mtime, err := strconv.ParseInt(timeStamp, 10, 64); err == nil { if mtime <= cutoffNanos { - log.Debug("storage: Redis: adding peer to remove list", log.Fields{"peerID": peerID}) + logger.Trace().Str("peerID", peerID).Msg("adding peer to remove list") peersToRemove = append(peersToRemove, peerID) } } else { - log.Error("storage: Redis: unable to decode peer timestamp", log.Fields{ - "infoHashKey": infoHashKey, - "peerID": peerID, - "timestamp": timeStamp, - "error": err, - }) + logger.Error().Err(err). + Str("infoHashKey", infoHashKey). + Str("peerID", peerID). + Str("timestamp", timeStamp). + Msg("unable to decode peer timestamp") } } if len(peersToRemove) > 0 { @@ -718,35 +711,32 @@ func (ps *store) gc(cutoff time.Time) { err = AsNil(err) if err != nil { if strings.Contains(err.Error(), argNumErrorMsg) { - log.Warn("This Redis version/implementation does not support variadic arguments for HDEL") + logger.Warn().Msg("This Redis version/implementation does not support variadic arguments for HDEL") for _, k := range peersToRemove { count, err := ps.HDel(context.Background(), infoHashKey, k).Result() err = AsNil(err) if err != nil { - log.Error("storage: Redis: unable to delete peer", log.Fields{ - "infoHashKey": infoHashKey, - "peerID": k, - "error": err, - }) + logger.Error().Err(err). + Str("infoHashKey", infoHashKey). + Str("peerID", k). + Msg("unable to delete peer") } else { removedPeerCount += count } } } else { - log.Error("storage: Redis: unable to delete peers", log.Fields{ - "infoHashKey": infoHashKey, - "peerIds": peersToRemove, - "error": err, - }) + logger.Error().Err(err). + Str("infoHashKey", infoHashKey). + Strs("peerIDs", peersToRemove). + Msg("unable to delete peers") } } if removedPeerCount > 0 { // DECR seeder/leecher counter if err = ps.DecrBy(context.Background(), cntKey, removedPeerCount).Err(); err != nil { - log.Error("storage: Redis: unable to decrement seeder/leecher peer count", log.Fields{ - "infoHashKey": infoHashKey, - "countKey": cntKey, - "error": err, - }) + logger.Error().Err(err). + Str("infoHashKey", infoHashKey). + Str("countKey", cntKey). + Msg("unable to decrement seeder/leecher peer count") } } } @@ -764,20 +754,20 @@ func (ps *store) gc(cutoff time.Time) { return err }, infoHashKey)) if err != nil { - log.Error("storage: Redis: unable to clean info hash records", log.Fields{ - "infoHashKey": infoHashKey, - "error": err, - }) + logger.Error().Err(err). + Str("infoHashKey", infoHashKey). + Msg("unable to clean info hash records") } } else { - log.Error("storage: Redis: unable to fetch info hash peers", log.Fields{ - "infoHashKey": infoHashKey, - "error": err, - }) + logger.Error().Err(err). + Str("infoHashKey", infoHashKey). + Msg("unable to fetch info hash peers") } } } else { - log.Error("storage: Redis: unable to fetch info hash set", log.Fields{"hashSet": IHKey, "error": err}) + logger.Error().Err(err). + Str("hashSet", IHKey). + Msg("unable to fetch info hash peers") } } @@ -790,7 +780,7 @@ func (ps *store) Stop() stop.Result { ps.wg.Wait() var err error if ps.UniversalClient != nil { - log.Info("storage: Redis: exiting. mochi does not clear data in redis when exiting. mochi keys have prefix " + PrefixKey) + logger.Info().Msg("storage: Redis: exiting. mochi does not clear data in redis when exiting. mochi keys have prefix " + PrefixKey) err = ps.UniversalClient.Close() ps.UniversalClient = nil } @@ -799,11 +789,3 @@ func (ps *store) Stop() stop.Result { return c.Result() } - -func (ps *store) LogFields() log.Fields { - fields := make(log.Fields, len(ps.logFields)) - for k, v := range ps.logFields { - fields[k] = v - } - return fields -} diff --git a/storage/storage.go b/storage/storage.go index 37036e9..70f2c6c 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -7,6 +7,8 @@ import ( "sync" "time" + "github.com/rs/zerolog" + "github.com/sot-tech/mochi/bittorrent" "github.com/sot-tech/mochi/pkg/conf" "github.com/sot-tech/mochi/pkg/log" @@ -20,6 +22,7 @@ const ( ) var ( + logger = log.NewLogger("storage configurator") driversM sync.RWMutex drivers = make(map[string]Builder) ) @@ -38,21 +41,21 @@ type Config struct { func (c Config) sanitizeGCConfig() (gcInterval, peerTTL time.Duration) { if c.GarbageCollectionInterval <= 0 { gcInterval = defaultGarbageCollectionInterval - log.Warn("falling back to default configuration", log.Fields{ - "name": "GarbageCollectionInterval", - "provided": c.GarbageCollectionInterval, - "default": defaultGarbageCollectionInterval, - }) + logger.Warn(). + Str("name", "GarbageCollectionInterval"). + Dur("provided", c.GarbageCollectionInterval). + Dur("default", defaultGarbageCollectionInterval). + Msg("falling back to default configuration") } else { gcInterval = c.GarbageCollectionInterval } if c.PeerLifetime <= 0 { peerTTL = defaultPeerLifetime - log.Warn("falling back to default configuration", log.Fields{ - "name": "PeerLifetime", - "provided": c.PeerLifetime, - "default": defaultPeerLifetime, - }) + logger.Warn(). + Str("name", "PeerLifetime"). + Dur("provided", c.PeerLifetime). + Dur("default", defaultPeerLifetime). + Msg("falling back to default configuration") } else { peerTTL = c.PeerLifetime } @@ -62,11 +65,11 @@ func (c Config) sanitizeGCConfig() (gcInterval, peerTTL time.Duration) { func (c Config) sanitizeStatisticsConfig() (statInterval time.Duration) { if c.PrometheusReportingInterval < 0 { statInterval = defaultPrometheusReportingInterval - log.Warn("falling back to default configuration", log.Fields{ - "name": "PrometheusReportingInterval", - "provided": c.PrometheusReportingInterval, - "default": defaultPrometheusReportingInterval, - }) + logger.Warn(). + Str("name", "PrometheusReportingInterval"). + Dur("provided", c.PrometheusReportingInterval). + Dur("default", defaultPrometheusReportingInterval). + Msg("falling back to default configuration") } return } @@ -207,9 +210,9 @@ type PeerStorage interface { // For more details see the documentation in the stop package. stop.Stopper - // Fielder returns a loggable version of the data used to configure and + // LogObjectMarshaler returns a loggable version of the data used to configure and // operate a particular PeerStorage. - log.Fielder + zerolog.LogObjectMarshaler } // RegisterBuilder makes a Builder available by the provided name. @@ -258,15 +261,33 @@ func NewStorage(name string, cfg conf.MapConfig) (ps PeerStorage, err error) { } if gc, isOk := ps.(GCAware); isOk { - gc.ScheduleGC(c.sanitizeGCConfig()) + gcInterval, peerTTL := c.sanitizeGCConfig() + logger.Info(). + Str("type", name). + Dur("gcInterval", gcInterval). + Dur("peerTTL", peerTTL). + Msg("scheduling GC") + gc.ScheduleGC(gcInterval, peerTTL) + } else { + logger.Debug(). + Str("type", name). + Msg("storage does not support GC") } - if statInterval := c.sanitizeStatisticsConfig(); statInterval > 0 { - if st, isOk := ps.(StatisticsAware); isOk { + if st, isOk := ps.(StatisticsAware); isOk { + if statInterval := c.sanitizeStatisticsConfig(); statInterval > 0 { + logger.Info(). + Str("type", name). + Dur("statInterval", statInterval). + Msg("scheduling statistics collection") st.ScheduleStatisticsCollection(statInterval) + } else { + logger.Info().Str("type", name).Msg("statistics collection disabled because of zero reporting interval") } } else { - log.Info("prometheus disabled because of zero reporting interval") + logger.Debug(). + Str("type", name). + Msg("storage does not support statistics collection") } return diff --git a/storage/test/storage_test_base.go b/storage/test/storage_test_base.go index 5d78410..ae1495e 100644 --- a/storage/test/storage_test_base.go +++ b/storage/test/storage_test_base.go @@ -9,14 +9,23 @@ import ( "github.com/stretchr/testify/require" "github.com/sot-tech/mochi/bittorrent" + "github.com/sot-tech/mochi/pkg/log" "github.com/sot-tech/mochi/storage" ) +func init() { + _ = log.ConfigureLogger("", "warn", false, false) +} + // PeerEqualityFunc is the boolean function to use to check two Peers for // equality. // Depending on the implementation of the PeerStorage, this can be changed to // use (Peer).EqualEndpoint instead. -var PeerEqualityFunc = func(p1, p2 bittorrent.Peer) bool { return p1.Equal(p2) } +var PeerEqualityFunc = func(p1, p2 bittorrent.Peer) bool { + return p1.Port() == p2.Port() && + p1.Addr().Compare(p1.Addr()) == 0 && + p1.ID == p2.ID +} type testHolder struct { st storage.PeerStorage