diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml index 5fea484..5681a5f 100644 --- a/.github/workflows/build.yaml +++ b/.github/workflows/build.yaml @@ -44,14 +44,15 @@ jobs: go-version: "^1.18" - name: "Install and configure mochi" run: | - go install --tags e2e ./cmd/mochi + go install ./cmd/mochi + go install ./cmd/mochi-e2e cat ./dist/example_config.yaml - name: "Run end-to-end tests" run: | - mochi --config=./dist/example_config.yaml --debug & + mochi --config=./dist/example_config.yaml --logLevel debug --logPretty & pid=$! sleep 2 - mochi e2e --debug + mochi-e2e kill $pid e2e-redis: name: "E2E Redis Tests" @@ -67,15 +68,16 @@ jobs: go-version: "^1.18" - name: "Install and configure mochi" run: | - go install --tags e2e ./cmd/mochi + go install ./cmd/mochi + go install ./cmd/mochi-e2e curl -LO https://github.com/jzelinskie/faq/releases/download/0.0.6/faq-linux-amd64 chmod +x faq-linux-amd64 ./faq-linux-amd64 '.mochi.storage = {"config":{"gc_interval":"3m","peer_lifetime":"31m","prometheus_reporting_interval":"1s","connect_timeout":"15s","read_timeout":"15s","write_timeout":"15s"},"name":"redis"}' ./dist/example_config.yaml > ./dist/example_redis_config.yaml cat ./dist/example_redis_config.yaml - name: "Run end-to-end tests" run: | - mochi --config=./dist/example_redis_config.yaml --debug & + mochi --config=./dist/example_redis_config.yaml --logLevel debug --logPretty & pid=$! sleep 2 - mochi e2e --debug + mochi-e2e kill $pid diff --git a/README.md b/README.md index e21e4d8..d8722ac 100644 --- a/README.md +++ b/README.md @@ -18,7 +18,7 @@ Modified version of [Chihaya](https://github.com/chihaya/chihaya), an open sourc * Allows mixed peers: IPv4 requesters can fetch IPv6 peers or vice versa; * Contains some internal improvements. -_Note: From time to time MoChi fetch modifications from Chihaya but is not +_Note: From time to time MoChi fetch modifications from Chihaya but is not fully compatible with original project (mainly in Redis storage structure), so it cannot be mixed with Chihaya (i.e. it is impossible create MoChi-Chihaya cluster)._ @@ -26,7 +26,8 @@ so it cannot be mixed with Chihaya (i.e. it is impossible create MoChi-Chihaya c The main goal of made modifications is to create semi-private tracker like [Hefur](https://github.com/sot-tech/hefur) but with cluster support (allowed torrents limited by pre-existent `list` middleware and another `directory` middleware -to [limit registered torrents](docs/middleware/torrent_approval.md)) and to maximize torrent swarm by providing maximum peers as possible (IPv4+IPv6). +to [limit registered torrents](docs/middleware/torrent_approval.md)) and to maximize torrent swarm by providing maximum +peers as possible (IPv4+IPv6). ## Notice diff --git a/bittorrent/bittorrent.go b/bittorrent/bittorrent.go index ad66eb7..8fc840e 100644 --- a/bittorrent/bittorrent.go +++ b/bittorrent/bittorrent.go @@ -125,6 +125,7 @@ type Scrape struct { Incomplete uint32 } +// MarshalZerologObject writes fields into zerolog event func (s Scrape) MarshalZerologObject(e *zerolog.Event) { e.Stringer("infoHash", s.InfoHash). Uint32("snatches", s.Snatches). @@ -170,13 +171,6 @@ func NewPeer(data string) (Peer, error) { return peer, err } -// String implements fmt.Stringer to return a human-readable representation. -// The string will have the format @[]:, for example -// "0102030405060708090a0b0c0d0e0f1011121314@[10.11.12.13]:1234" -func (p Peer) String() string { - return fmt.Sprintf("%s@[%s]:%d", p.ID, p.Addr(), p.Port()) -} - // RawString generates concatenation of PeerID, net port and IP-address func (p Peer) RawString() string { ip := p.Addr() @@ -187,25 +181,16 @@ func (p Peer) RawString() string { return string(b) } +// Addr returns unmapped peer's IP address +func (p Peer) Addr() netip.Addr { + return p.AddrPort.Addr().Unmap() +} + +// MarshalZerologObject writes fields into zerolog event func (p Peer) MarshalZerologObject(e *zerolog.Event) { e.Stringer("id", p.ID). Stringer("address", p.Addr()). Uint16("port", p.Port()) - -} - -// Equal reports whether p and x are the same. -func (p Peer) Equal(x Peer) bool { return p.EqualEndpoint(x) && p.ID == x.ID } - -// EqualEndpoint reports whether p and x have the same endpoint. -func (p Peer) EqualEndpoint(x Peer) bool { - return p.Port() == x.Port() && - p.Addr().Compare(x.Addr()) == 0 -} - -// Addr returns unmapped peer's IP address -func (p Peer) Addr() netip.Addr { - return p.AddrPort.Addr().Unmap() } // ClientError represents an error that should be exposed to the client over diff --git a/bittorrent/bittorrent_test.go b/bittorrent/bittorrent_test.go deleted file mode 100644 index 4dac3e3..0000000 --- a/bittorrent/bittorrent_test.go +++ /dev/null @@ -1,57 +0,0 @@ -package bittorrent - -import ( - "fmt" - "net/netip" - "testing" - - "github.com/stretchr/testify/require" -) - -var ( - b = []byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20} - expected = "0102030405060708090a0b0c0d0e0f1011121314" -) - -func TestPeerID_String(t *testing.T) { - pid, err := NewPeerID(b) - require.Nil(t, err) - s := pid.String() - require.Equal(t, expected, s) -} - -func TestInfoHash_String(t *testing.T) { - ih, err := NewInfoHash(b) - require.Nil(t, err) - require.Equal(t, expected, ih.String()) -} - -func TestPeer_String(t *testing.T) { - pid, err := NewPeerID(b) - require.Nil(t, err) - id, _ := NewPeerID(b) - peerStringTestCases := []struct { - input Peer - expected string - }{ - { - input: Peer{ - ID: id, - AddrPort: netip.MustParseAddrPort("10.11.12.1:1234"), - }, - expected: fmt.Sprintf("%s@[10.11.12.1]:1234", expected), - }, - { - input: Peer{ - ID: id, - AddrPort: netip.MustParseAddrPort("[2001:db8::ff00:42:8329]:1234"), - }, - expected: fmt.Sprintf("%s@[2001:db8::ff00:42:8329]:1234", expected), - }, - } - for _, c := range peerStringTestCases { - c.input.ID = pid - got := c.input.String() - require.Equal(t, c.expected, got) - } -} diff --git a/bittorrent/event.go b/bittorrent/event.go index e47950e..bb54ed1 100644 --- a/bittorrent/event.go +++ b/bittorrent/event.go @@ -24,42 +24,49 @@ const ( // Completed is the event sent by a BitTorrent client when it finishes // downloading all of the required chunks. Completed + + // NoneStr string representation of None event + NoneStr = "none" + + // StartedStr string representation of Started event + StartedStr = "started" + + // StoppedStr string representation of Stopped event + StoppedStr = "stopped" + + // CompletedStr string representation of Completed event + CompletedStr = "completed" ) -var ( - eventToString = make(map[Event]string) - stringToEvent = make(map[string]Event) -) - -func init() { - eventToString[None] = "none" - eventToString[Started] = "started" - eventToString[Stopped] = "stopped" - eventToString[Completed] = "completed" - - stringToEvent[""] = None - - for k, v := range eventToString { - stringToEvent[v] = k - } -} - // NewEvent returns the proper Event given a string. func NewEvent(eventStr string) (evt Event, err error) { - if e, ok := stringToEvent[strings.ToLower(eventStr)]; ok { - evt = e - } else { + switch strings.ToLower(eventStr) { + case NoneStr, "": + evt = None + case StartedStr: + evt = Started + case StoppedStr: + evt = Stopped + case CompletedStr: + evt = Completed + default: evt, err = None, ErrUnknownEvent } - return } // String implements Stringer for an event. func (e Event) String() (s string) { - if name, ok := eventToString[e]; ok { - s = name - } else { + switch e { + case None: + s = NoneStr + case Started: + s = StartedStr + case Stopped: + s = StoppedStr + case Completed: + s = CompletedStr + default: s = "" } return diff --git a/bittorrent/params.go b/bittorrent/params.go index 964c884..f7aaf02 100644 --- a/bittorrent/params.go +++ b/bittorrent/params.go @@ -211,6 +211,7 @@ func (qp *QueryParams) RawQuery() string { return qp.query } +// MarshalZerologObject writes fields into zerolog event func (qp QueryParams) MarshalZerologObject(e *zerolog.Event) { e.Str("path", qp.path).Str("query", qp.query) } diff --git a/bittorrent/request.go b/bittorrent/request.go index d76294d..7ae5326 100644 --- a/bittorrent/request.go +++ b/bittorrent/request.go @@ -16,10 +16,6 @@ type RequestAddress struct { Provided bool } -func (a RequestAddress) MarshalZerologObject(e *zerolog.Event) { - e.Stringer("address", a.Addr).Bool("provided", a.Provided) -} - // Validate checks if netip.Addr is valid and not unspecified (0.0.0.0) func (a RequestAddress) Validate() bool { return a.IsValid() && !a.IsUnspecified() @@ -35,6 +31,11 @@ func (a RequestAddress) String() string { return fmt.Sprint(a.Addr.String(), p) } +// MarshalZerologObject writes fields into zerolog event +func (a RequestAddress) MarshalZerologObject(e *zerolog.Event) { + e.Stringer("address", a.Addr).Bool("provided", a.Provided) +} + // RequestAddresses is an array of RequestAddress used mainly for // sort.Interface implementation. // Frontends may determine peer's address from connections info @@ -42,12 +43,6 @@ func (a RequestAddress) String() string { // connection information about peer type RequestAddresses []RequestAddress -func (aa RequestAddresses) MarshalZerologArray(a *zerolog.Array) { - for _, addr := range aa { - a.Object(addr) - } -} - func (aa RequestAddresses) Len() int { return len(aa) } @@ -99,8 +94,17 @@ func (aa RequestAddresses) GetFirst() netip.Addr { return a } +// MarshalZerologArray writes array elements to zerolog event +func (aa RequestAddresses) MarshalZerologArray(a *zerolog.Array) { + for _, addr := range aa { + a.Object(addr) + } +} + +// Peers wrapper of array of Peer-s type Peers []Peer +// MarshalZerologArray writes array elements to zerolog event func (p Peers) MarshalZerologArray(a *zerolog.Array) { for _, peer := range p { a.Object(peer) @@ -115,12 +119,6 @@ type RequestPeer struct { RequestAddresses } -func (rp RequestPeer) MarshalZerologObject(e *zerolog.Event) { - e.Stringer("id", rp.ID). - Array("addresses", rp.RequestAddresses). - Uint16("port", rp.Port) -} - // Peers constructs array of Peer-s with the same ID and Port // for every RequestAddress array. func (rp RequestPeer) Peers() (peers Peers) { @@ -133,6 +131,13 @@ func (rp RequestPeer) Peers() (peers Peers) { return } +// MarshalZerologObject writes fields into zerolog event +func (rp RequestPeer) MarshalZerologObject(e *zerolog.Event) { + e.Stringer("id", rp.ID). + Array("addresses", rp.RequestAddresses). + Uint16("port", rp.Port) +} + // AnnounceRequest represents the parsed parameters from an announce request. type AnnounceRequest struct { Event Event @@ -149,6 +154,7 @@ type AnnounceRequest struct { Params } +// MarshalZerologObject writes fields into zerolog event func (r AnnounceRequest) MarshalZerologObject(e *zerolog.Event) { e.Stringer("event", r.Event). Stringer("infoHash", r.InfoHash). @@ -175,6 +181,7 @@ type AnnounceResponse struct { IPv6Peers Peers } +// MarshalZerologObject writes fields into zerolog event func (r AnnounceResponse) MarshalZerologObject(e *zerolog.Event) { e.Bool("compact", r.Compact). Uint32("complete", r.Complete). @@ -185,8 +192,10 @@ func (r AnnounceResponse) MarshalZerologObject(e *zerolog.Event) { Array("ipv6Peers", r.IPv6Peers) } +// InfoHashes wrapper of array of InfoHash-es type InfoHashes []InfoHash +// MarshalZerologArray writes array elements to zerolog event func (i InfoHashes) MarshalZerologArray(a *zerolog.Array) { for _, ih := range i { a.Str(ih.String()) @@ -202,14 +211,17 @@ type ScrapeRequest struct { Params Params } +// MarshalZerologObject writes fields into zerolog event func (r ScrapeRequest) MarshalZerologObject(e *zerolog.Event) { e.Array("addresses", r.RequestAddresses). Array("infoHashes", r.InfoHashes). Object("params", r.Params) } +// Scrapes wrapper of array of Scrape-s type Scrapes []Scrape +// MarshalZerologArray writes array elements to zerolog event func (s Scrapes) MarshalZerologArray(a *zerolog.Array) { for _, scrape := range s { a.Object(scrape) @@ -224,6 +236,7 @@ type ScrapeResponse struct { Files Scrapes } +// MarshalZerologObject writes fields into zerolog event func (sr ScrapeResponse) MarshalZerologObject(e *zerolog.Event) { e.Array("scrapes", sr.Files) } diff --git a/bittorrent/sanitize.go b/bittorrent/sanitize.go index 2d8167e..e828523 100644 --- a/bittorrent/sanitize.go +++ b/bittorrent/sanitize.go @@ -5,6 +5,7 @@ import ( ) var ( + logger = log.NewLogger("bittorrent") // ErrInvalidIP indicates an invalid IP for an Announce. ErrInvalidIP = ClientError("invalid IP") @@ -29,12 +30,7 @@ func SanitizeAnnounce(r *AnnounceRequest, maxNumWant, defaultNumWant uint32) err r.NumWant = maxNumWant } - log.Debug("sanitized announce", r, log.Fields{ - "port": r.Port, - "addresses": r.RequestAddresses, - "maxNumWant": maxNumWant, - "defaultNumWant": defaultNumWant, - }) + logger.Debug().Object("request", r).Msg("sanitized announce") return nil } @@ -49,9 +45,6 @@ func SanitizeScrape(r *ScrapeRequest, maxScrapeInfoHashes uint32) error { return ErrInvalidIP } - log.Debug("sanitized scrape", r, log.Fields{ - "addresses": r.RequestAddresses, - "maxScrapeInfoHashes": maxScrapeInfoHashes, - }) + logger.Debug().Object("request", r).Msg("sanitized scrape") return nil } diff --git a/cmd/mochi/e2e.go b/cmd/mochi-e2e/e2e.go similarity index 57% rename from cmd/mochi/e2e.go rename to cmd/mochi-e2e/e2e.go index 19c5c91..7abc45a 100644 --- a/cmd/mochi/e2e.go +++ b/cmd/mochi-e2e/e2e.go @@ -1,82 +1,50 @@ -//go:build e2e -// +build e2e - package main import ( + "flag" "fmt" + "log" "math/rand" "time" "github.com/anacrolix/torrent/tracker" - "github.com/spf13/cobra" "github.com/sot-tech/mochi/bittorrent" - "github.com/sot-tech/mochi/pkg/log" ) -func init() { - e2eCmd = &cobra.Command{ - Use: "e2e", - Short: "exec e2e tests", - Long: "Execute the Conf end-to-end test suite", - RunE: EndToEndRunCmdFunc, - } +func main() { + httpAddress := flag.String("httpaddr", "http://127.0.0.1:6969/announce", "address of the HTTP tracker") + udpAddress := flag.String("udpaddr", "udp://127.0.0.1:6969", "address of the UDP tracker") + delay := flag.Duration("delay", time.Second, "delay between announces") + flag.Parse() - e2eCmd.Flags().String("httpaddr", "http://127.0.0.1:6969/announce", "address of the HTTP tracker") - e2eCmd.Flags().String("udpaddr", "udp://127.0.0.1:6969", "address of the UDP tracker") - e2eCmd.Flags().Duration("delay", time.Second, "delay between announces") -} - -// EndToEndRunCmdFunc implements a Cobra command that runs the end-to-end test -// suite for a Conf build. -func EndToEndRunCmdFunc(cmd *cobra.Command, args []string) error { - delay, err := cmd.Flags().GetDuration("delay") - if err != nil { - return err - } - - // Test the HTTP tracker - httpAddr, err := cmd.Flags().GetString("httpaddr") - if err != nil { - return err - } - - if len(httpAddr) != 0 { - log.Info("testing HTTP...") - err := test(httpAddr, delay) + if len(*httpAddress) != 0 { + log.Println("testing HTTP...") + err := test(*httpAddress, *delay) if err != nil { - return err + log.Fatal(err) } - log.Info("success") + log.Println("success") } - // Test the UDP tracker. - udpAddr, err := cmd.Flags().GetString("udpaddr") - if err != nil { - return err - } - - if len(udpAddr) != 0 { - log.Info("testing UDP...") - err := test(udpAddr, delay) + if len(*udpAddress) != 0 { + log.Println("testing UDP...") + err := test(*udpAddress, *delay) if err != nil { - return err + log.Fatal(err) } - log.Info("success") + log.Println("success") } - - return nil } func test(addr string, delay time.Duration) error { b := make([]byte, bittorrent.InfoHashV1Len) rand.Read(b) ih, _ := bittorrent.NewInfoHash(b) - return testWithInfohash(ih, addr, delay) + return testWithInfoHash(ih, addr, delay) } -func testWithInfohash(infoHash bittorrent.InfoHash, url string, delay time.Duration) error { +func testWithInfoHash(infoHash bittorrent.InfoHash, url string, delay time.Duration) error { var ih [bittorrent.InfoHashV1Len]byte req := tracker.AnnounceRequest{ InfoHash: ih, diff --git a/cmd/mochi/main.go b/cmd/mochi/main.go index ea027b5..788a1d5 100644 --- a/cmd/mochi/main.go +++ b/cmd/mochi/main.go @@ -1,182 +1,17 @@ package main import ( - "context" - "errors" - "fmt" + "flag" + "log" + "os" "os/signal" "runtime" - "strings" "syscall" - "github.com/spf13/cobra" - - "github.com/sot-tech/mochi/frontend/http" - "github.com/sot-tech/mochi/frontend/udp" - "github.com/sot-tech/mochi/middleware" - "github.com/sot-tech/mochi/pkg/conf" - "github.com/sot-tech/mochi/pkg/log" - "github.com/sot-tech/mochi/pkg/metrics" - _ "github.com/sot-tech/mochi/pkg/randseed" - "github.com/sot-tech/mochi/pkg/stop" - "github.com/sot-tech/mochi/storage" + l "github.com/sot-tech/mochi/pkg/log" ) -var e2eCmd *cobra.Command - -// Run represents the state of a running instance of Conf. -type Run struct { - configFilePath string - storage storage.PeerStorage - logic *middleware.Logic - sg *stop.Group -} - -// NewRun runs an instance of Conf. -func NewRun(configFilePath string) (*Run, error) { - r := &Run{ - configFilePath: configFilePath, - } - - return r, r.Start(nil) -} - -// Start begins an instance of Conf. -// It is optional to provide an instance of the peer store to avoid the -// creation of a new one. -func (r *Run) Start(ps storage.PeerStorage) error { - configFile, err := ParseConfigFile(r.configFilePath) - if err != nil { - return fmt.Errorf("failed to read config: %w", err) - } - cfg := configFile.Conf - - r.sg = stop.NewGroup() - - if len(cfg.MetricsAddr) > 0 { - log.Info().Str("addr", cfg.MetricsAddr).Msg("starting metrics server") - r.sg.Add(metrics.NewServer(cfg.MetricsAddr)) - } else { - log.Info().Msg("metrics disabled because of empty address") - } - - if ps == nil { - log.Info().Str("name", cfg.Storage.Name).Msg("starting storage") - ps, err = storage.NewStorage(cfg.Storage.Name, cfg.Storage.Config) - if err != nil { - return fmt.Errorf("failed to create storage: %w", err) - } - log.Info().Object("config", ps).Msg("started storage") - } - r.storage = ps - - preHooks, err := middleware.HooksFromHookConfigs(cfg.PreHooks, r.storage) - if err != nil { - return fmt.Errorf("failed to validate hook config: %w", err) - } - postHooks, err := middleware.HooksFromHookConfigs(cfg.PostHooks, r.storage) - if err != nil { - return fmt.Errorf("failed to validate hook config: %w", err) - } - - r.logic = middleware.NewLogic(cfg.AnnounceInterval, cfg.MinAnnounceInterval, r.storage, preHooks, postHooks) - - if len(cfg.HTTPConfig) > 0 { - log.Info().Object("config", cfg.HTTPConfig).Msg("starting HTTP frontend") - httpFE, err := http.NewFrontend(r.logic, cfg.HTTPConfig) - if err == nil { - r.sg.Add(httpFE) - } else if !errors.Is(err, conf.ErrNilConfigMap) { - return err - } - } - - if len(cfg.UDPConfig) > 0 { - log.Info().Object("config", cfg.HTTPConfig).Msg("starting UDP frontend") - udpFE, err := udp.NewFrontend(r.logic, cfg.UDPConfig) - if err == nil { - r.sg.Add(udpFE) - } else if !errors.Is(err, conf.ErrNilConfigMap) { - return err - } - } - - return nil -} - -func combineErrors(prefix string, errs []error) error { - errStrs := make([]string, 0, len(errs)) - for _, err := range errs { - errStrs = append(errStrs, err.Error()) - } - - return errors.New(prefix + ": " + strings.Join(errStrs, "; ")) -} - -// Stop shuts down an instance of Conf. -func (r *Run) Stop(keepPeerStore bool) (storage.PeerStorage, error) { - log.Debug().Msg("stopping frontends and metrics server") - if errs := r.sg.Stop().Wait(); len(errs) != 0 { - return nil, combineErrors("failed while shutting down frontends", errs) - } - - log.Debug().Msg("stopping logic") - if errs := r.logic.Stop().Wait(); len(errs) != 0 { - return nil, combineErrors("failed while shutting down middleware", errs) - } - - if !keepPeerStore { - log.Debug().Msg("stopping peer store") - if errs := r.storage.Stop().Wait(); len(errs) != 0 { - return nil, combineErrors("failed while shutting down peer store", errs) - } - r.storage = nil - } - - return r.storage, nil -} - -// RootRunCmdFunc implements a Cobra command that runs an instance of Conf -// and handles reloading and shutdown via process signals. -func RootRunCmdFunc(cmd *cobra.Command, _ []string) error { - configFilePath, err := cmd.Flags().GetString(configArg) - if err != nil { - return err - } - - r, err := NewRun(configFilePath) - if err != nil { - return err - } - - shutdown, _ := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) - reload, _ := signal.NotifyContext(context.Background(), ReloadSignals...) - - for { - select { - case <-reload.Done(): - log.Info().Msg("reloading; received reload signal") - peerStore, err := r.Stop(true) - if err != nil { - return err - } - - if err := r.Start(peerStore); err != nil { - return err - } - case <-shutdown.Done(): - log.Info().Msg("shutting down; received shutdown signal") - if _, err := r.Stop(false); err != nil { - return err - } - - return nil - } - } -} - const ( - appName = "mochi" logOutArg = "logOut" logLevelArg = "logLevel" logPrettyArg = "logPretty" @@ -184,59 +19,25 @@ const ( configArg = "config" ) -// configureLogger handles command line flags for the logger. -func configureLogger(cmd *cobra.Command, _ []string) (err error) { - var out, lvl string - var pretty, colored bool - - flags := cmd.Flags() - - out, err = flags.GetString(logOutArg) - if err != nil { - return err - } - - lvl, err = flags.GetString(logLevelArg) - if err != nil { - return err - } - - pretty, err = flags.GetBool(logPrettyArg) - if err != nil { - return err - } - - colored, err = cmd.Flags().GetBool(logColorsArg) - if err != nil { - return err - } - - return log.ConfigureLogger(out, lvl, pretty, colored) -} - func main() { - rootCmd := &cobra.Command{ - Use: appName, - Short: "BitTorrent Tracker", - Long: "A customizable, multi-protocol BitTorrent Tracker", - PersistentPreRunE: configureLogger, - RunE: RootRunCmdFunc, + var s Server + + logOut := flag.String(logOutArg, "stderr", "output for logging, might be 'stderr', 'stdout' or file path") + logLevel := flag.String(logLevelArg, "warn", "logging level: trace, debug, info, warn, error, fatal, panic") + logPretty := flag.Bool(logPrettyArg, false, "enable log pretty print. used only if 'logOut' set to 'stdout' or 'stderr'. if not set, log outputs json") + logColored := flag.Bool(logColorsArg, runtime.GOOS == "windows", "enable log coloring. used only if set 'logPretty'") + configPath := flag.String(configArg, "/etc/mochi.yaml", "location of configuration file") + flag.Parse() + + if err := l.ConfigureLogger(*logOut, *logLevel, *logPretty, *logColored); err != nil { + log.Fatal("unable to configure logger ", err) } - flags := rootCmd.PersistentFlags() - - flags.String(logOutArg, "", "output for logging, might be 'stderr', 'stdout' of file path. 'stderr' if not set") - flags.String(logLevelArg, "info", "logging level (trace, debug, info, warn, error, fatal, panic). 'warn' if not set") - flags.Bool(logPrettyArg, false, "enable log pretty print. used only if 'logOut' set to 'stdout' or 'stderr'. if not set, log outputs json)") - flags.Bool(logColorsArg, runtime.GOOS == "windows", "enable log coloring. used only if set 'logPretty'") - - rootCmd.Flags().String("config", "/etc/mochi.yaml", "location of configuration file") - - if e2eCmd != nil { - rootCmd.AddCommand(e2eCmd) - } - - if err := rootCmd.Execute(); err != nil { - log.Fatal().Err(err).Msg("failed while executing root command") + if err := s.Run(*configPath); err != nil { + log.Fatal("unable to start server ", err) } + defer s.Dispose() + ch := make(chan os.Signal, 2) + signal.Notify(ch, os.Interrupt, syscall.SIGTERM) + <-ch } diff --git a/cmd/mochi/server.go b/cmd/mochi/server.go new file mode 100644 index 0000000..7f42248 --- /dev/null +++ b/cmd/mochi/server.go @@ -0,0 +1,106 @@ +package main + +import ( + "errors" + "fmt" + + "github.com/sot-tech/mochi/frontend/http" + "github.com/sot-tech/mochi/frontend/udp" + "github.com/sot-tech/mochi/middleware" + "github.com/sot-tech/mochi/pkg/log" + "github.com/sot-tech/mochi/pkg/metrics" + "github.com/sot-tech/mochi/pkg/stop" + "github.com/sot-tech/mochi/storage" +) + +// Server represents the state of a running instance. +type Server struct { + storage storage.PeerStorage + logic *middleware.Logic + sg *stop.Group +} + +// Run begins an instance of Conf. +// It is optional to provide an instance of the peer store to avoid the +// creation of a new one. +func (r *Server) Run(configFilePath string) error { + configFile, err := ParseConfigFile(configFilePath) + if err != nil { + return fmt.Errorf("failed to read config: %w", err) + } + cfg := configFile.Conf + + r.sg = stop.NewGroup() + + if len(cfg.MetricsAddr) > 0 { + log.Info().Str("address", cfg.MetricsAddr).Msg("starting metrics server") + r.sg.Add(metrics.NewServer(cfg.MetricsAddr)) + } else { + log.Info().Msg("metrics disabled because of empty address") + } + + log.Info().Str("name", cfg.Storage.Name).Msg("starting storage") + r.storage, err = storage.NewStorage(cfg.Storage.Name, cfg.Storage.Config) + if err != nil { + return fmt.Errorf("failed to create storage: %w", err) + } + log.Info().Object("config", r.storage).Msg("started storage") + + preHooks, err := middleware.NewHooks(cfg.PreHooks, r.storage) + if err != nil { + return fmt.Errorf("failed to validate hook config: %w", err) + } + postHooks, err := middleware.NewHooks(cfg.PostHooks, r.storage) + if err != nil { + return fmt.Errorf("failed to validate hook config: %w", err) + } + + r.logic = middleware.NewLogic(cfg.AnnounceInterval, cfg.MinAnnounceInterval, r.storage, preHooks, postHooks) + + var started bool + if len(cfg.HTTPConfig) > 0 { + log.Info().Object("config", cfg.HTTPConfig).Msg("starting HTTP frontend") + httpFE, err := http.NewFrontend(r.logic, cfg.HTTPConfig) + if err == nil { + r.sg.Add(httpFE) + started = true + } else { + return err + } + } + + if len(cfg.UDPConfig) > 0 { + log.Info().Object("config", cfg.UDPConfig).Msg("starting UDP frontend") + udpFE, err := udp.NewFrontend(r.logic, cfg.UDPConfig) + if err == nil { + r.sg.Add(udpFE) + started = true + } else { + return err + } + } + if !started { + return errors.New("no frontends configured") + } + + return nil +} + +// Dispose shuts down an instance of Server. +func (r *Server) Dispose() { + log.Debug().Msg("stopping frontends and metrics server") + if errs := r.sg.Stop().Wait(); len(errs) > 0 { + log.Error().Errs("errors", errs).Msg("error occurred while shutting down frontends") + } + + log.Debug().Msg("stopping logic") + if errs := r.logic.Stop().Wait(); len(errs) > 0 { + log.Error().Errs("errors", errs).Msg("error occurred while shutting down middlewares") + } + + log.Debug().Msg("stopping peer store") + if errs := r.storage.Stop().Wait(); len(errs) != 0 { + log.Error().Errs("errors", errs).Msg("error occurred while shutting down peer store") + } + log.Close() +} diff --git a/cmd/mochi/signal_unix.go b/cmd/mochi/signal_unix.go deleted file mode 100644 index 9dc3fd4..0000000 --- a/cmd/mochi/signal_unix.go +++ /dev/null @@ -1,14 +0,0 @@ -//go:build darwin || freebsd || linux || netbsd || openbsd || dragonfly || solaris - -package main - -import ( - "os" - "syscall" -) - -// ReloadSignals are the signals that the current OS will send to the process -// when a configuration reload is requested. -var ReloadSignals = []os.Signal{ - syscall.SIGUSR1, -} diff --git a/cmd/mochi/signal_windows.go b/cmd/mochi/signal_windows.go deleted file mode 100644 index 684169f..0000000 --- a/cmd/mochi/signal_windows.go +++ /dev/null @@ -1,14 +0,0 @@ -//go:build windows - -package main - -import ( - "os" - "syscall" -) - -// ReloadSignals are the signals that the current OS will send to the process -// when a configuration reload is requested. -var ReloadSignals = []os.Signal{ - syscall.SIGHUP, -} diff --git a/docs/middleware/torrent_approval.md b/docs/middleware/torrent_approval.md index 488b172..4828ead 100644 --- a/docs/middleware/torrent_approval.md +++ b/docs/middleware/torrent_approval.md @@ -24,9 +24,9 @@ There are two sources of hashes: `list` and `directory`. * `list` is the static set of hashes, specified in configuration file. * `directory` will watch for `*.torrent` files in specified path and -append/delete records from storage. This source will parse all existing -files at start and then watch for new files to add, or for delete events -to remove hash from storage. + append/delete records from storage. This source will parse all existing + files at start and then watch for new files to add, or for delete events + to remove hash from storage. Note: if storage is not `memory`, and `preserve` option set to `true`, records will be persisted in storage until _somebody_ or _something_ (different tool with access diff --git a/docs/storage/keydb.md b/docs/storage/keydb.md index 4383f22..aa1168f 100644 --- a/docs/storage/keydb.md +++ b/docs/storage/keydb.md @@ -3,18 +3,18 @@ This storage mainly the same as Redis and uses some of [redis](redis.md) store logic with next exceptions: -* peers stored in [sets](https://redis.io/docs/manual/data-types/#sets) -instead of [hashes](https://redis.io/docs/manual/data-types/#hashes); +* peers stored in [sets](https://redis.io/docs/manual/data-types/#sets) + instead of [hashes](https://redis.io/docs/manual/data-types/#hashes); * keys such as `CHI_I`, `CHI_S_C` and `CHI_L_C` not used (at all); -* peer TTL relies on KeyDB's [EXPIREMEMBER](https://docs.keydb.dev/docs/commands/#expiremember) -command, so MoChi does not need to periodically check peer expiration; +* peer TTL relies on KeyDB's [EXPIREMEMBER](https://docs.keydb.dev/docs/commands/#expiremember) + command, so MoChi does not need to periodically check peer expiration; * storage does not execute periodical statistics collection (peer/lecher/info hash count) -because: - * manual calculation (INC/DEC peers count) is not usable - * manual scan of all keys is quite expensive operation. + because: + * manual calculation (INC/DEC peers count) is not usable + * manual scan of all keys is quite expensive operation. ## Use Case diff --git a/frontend/http/frontend.go b/frontend/http/frontend.go index dcb8da9..1a6189e 100644 --- a/frontend/http/frontend.go +++ b/frontend/http/frontend.go @@ -61,7 +61,6 @@ func (cfg Config) Validate() Config { Dur("provided", cfg.ReadTimeout). Dur("default", validcfg.ReadTimeout). Msg("falling back to default configuration") - } if cfg.WriteTimeout <= 0 { diff --git a/frontend/http/writer_test.go b/frontend/http/writer_test.go index 572e093..bb5cb6a 100644 --- a/frontend/http/writer_test.go +++ b/frontend/http/writer_test.go @@ -8,8 +8,13 @@ import ( "github.com/stretchr/testify/require" "github.com/sot-tech/mochi/bittorrent" + "github.com/sot-tech/mochi/pkg/log" ) +func init() { + _ = log.ConfigureLogger("", "warn", false, false) +} + func TestWriteError(t *testing.T) { table := []struct { reason, expected string diff --git a/frontend/udp/frontend.go b/frontend/udp/frontend.go index 724719e..1ae757f 100644 --- a/frontend/udp/frontend.go +++ b/frontend/udp/frontend.go @@ -52,7 +52,7 @@ func (cfg Config) Validate() Config { } validcfg.PrivateKey = string(pkeyRunes) - log.Warn(). + logger.Warn(). Str("name", "UDP.PrivateKey"). Str("provided", ""). Str("key", validcfg.PrivateKey). diff --git a/frontend/udp/frontend_test.go b/frontend/udp/frontend_test.go index b1b48ac..86b8c6f 100644 --- a/frontend/udp/frontend_test.go +++ b/frontend/udp/frontend_test.go @@ -6,11 +6,16 @@ import ( "github.com/sot-tech/mochi/frontend/udp" "github.com/sot-tech/mochi/middleware" "github.com/sot-tech/mochi/pkg/conf" + "github.com/sot-tech/mochi/pkg/log" _ "github.com/sot-tech/mochi/pkg/randseed" "github.com/sot-tech/mochi/storage" _ "github.com/sot-tech/mochi/storage/memory" ) +func init() { + _ = log.ConfigureLogger("", "warn", false, false) +} + func TestStartStopRaceIssue437(t *testing.T) { ps, err := storage.NewStorage("memory", conf.MapConfig{}) if err != nil { diff --git a/go.mod b/go.mod index e36c2fd..099e5c3 100644 --- a/go.mod +++ b/go.mod @@ -14,8 +14,6 @@ require ( github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.12.1 github.com/rs/zerolog v1.26.1 - github.com/sirupsen/logrus v1.6.0 - github.com/spf13/cobra v1.4.0 github.com/stretchr/testify v1.7.1 gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b ) @@ -24,24 +22,21 @@ require ( github.com/anacrolix/dht/v2 v2.17.0 // indirect github.com/anacrolix/log v0.13.1 // indirect github.com/anacrolix/missinggo v1.3.0 // indirect - github.com/anacrolix/missinggo/v2 v2.6.0 // indirect + github.com/anacrolix/missinggo/v2 v2.7.0 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/bradfitz/iter v0.0.0-20191230175014-e8f45d346db8 // indirect github.com/cespare/xxhash/v2 v2.1.2 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect - github.com/fsnotify/fsnotify v1.5.3 // indirect + github.com/fsnotify/fsnotify v1.5.4 // indirect github.com/golang/protobuf v1.5.2 // indirect github.com/huandu/xstrings v1.3.2 // indirect - github.com/inconshreveable/mousetrap v1.0.0 // indirect github.com/klauspost/cpuid/v2 v2.0.12 // indirect - github.com/konsorten/go-windows-terminal-sequences v1.0.3 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/client_model v0.2.0 // indirect github.com/prometheus/common v0.34.0 // indirect github.com/prometheus/procfs v0.7.3 // indirect - github.com/spf13/pflag v1.0.5 // indirect - golang.org/x/sys v0.0.0-20220422013727-9388b58f7150 // indirect + golang.org/x/sys v0.0.0-20220429233432-b5fbb4746d32 // indirect google.golang.org/protobuf v1.28.0 // indirect ) diff --git a/go.sum b/go.sum index 7c765ff..7133a44 100644 --- a/go.sum +++ b/go.sum @@ -67,8 +67,8 @@ github.com/anacrolix/missinggo v1.3.0/go.mod h1:bqHm8cE8xr+15uVfMG3BFui/TxyB6//H github.com/anacrolix/missinggo/perf v1.0.0/go.mod h1:ljAFWkBuzkO12MQclXzZrosP5urunoLS0Cbvb4V0uMQ= github.com/anacrolix/missinggo/v2 v2.2.0/go.mod h1:o0jgJoYOyaoYQ4E2ZMISVa9c88BbUBVQQW4QeRkNCGY= github.com/anacrolix/missinggo/v2 v2.5.1/go.mod h1:WEjqh2rmKECd0t1VhQkLGTdIWXO6f6NLjp5GlMZ+6FA= -github.com/anacrolix/missinggo/v2 v2.6.0 h1:kHkn6nLy1isWYV4mthZX8itV1bRd2mwFVuXrxzJ4VX0= -github.com/anacrolix/missinggo/v2 v2.6.0/go.mod h1:2IZIvmRTizALNYFYXsPR7ofXPzJgyBpKZ4kMqMEICkI= +github.com/anacrolix/missinggo/v2 v2.7.0 h1:4fzOAAn/VCvfWGviLmh64MPMttrlYew81JdPO7nSHvI= +github.com/anacrolix/missinggo/v2 v2.7.0/go.mod h1:2IZIvmRTizALNYFYXsPR7ofXPzJgyBpKZ4kMqMEICkI= github.com/anacrolix/stm v0.2.0/go.mod h1:zoVQRvSiGjGoTmbM0vSLIiaKjWtNPeTvXUSdJQA4hsg= github.com/anacrolix/tagflag v0.0.0-20180109131632-2146c8d41bf0/go.mod h1:1m2U/K6ZT+JZG0+bdMK6qauP49QT4wE5pmhJXOKKCHw= github.com/anacrolix/tagflag v1.0.0/go.mod h1:1m2U/K6ZT+JZG0+bdMK6qauP49QT4wE5pmhJXOKKCHw= @@ -95,7 +95,6 @@ github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMn github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/coreos/go-systemd/v22 v22.3.2/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= -github.com/cpuguy83/go-md2man/v2 v2.0.1/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= @@ -114,8 +113,8 @@ github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1m github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/frankban/quicktest v1.14.0 h1:+cqqvzZV87b4adx/5ayVOaYZ2CrvM4ejQvUdBzPPUss= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= -github.com/fsnotify/fsnotify v1.5.3 h1:vNFpj2z7YIbwh2bw7x35sqYpp2wfuq+pivKbWG09B8c= -github.com/fsnotify/fsnotify v1.5.3/go.mod h1:T3375wBYaZdLLcVNkcVbzGHY7f1l/uK5T5Ai1i3InKU= +github.com/fsnotify/fsnotify v1.5.4 h1:jRbGcIw6P2Meqdwuo0H1p6JVLbL5DHKAKlYndzMwVZI= +github.com/fsnotify/fsnotify v1.5.4/go.mod h1:OVB6XrOHzAwXMpEM7uPOzcehqUV2UqJxmVXmkdnm1bU= github.com/glycerine/go-unsnap-stream v0.0.0-20180323001048-9f0cb55181dd/go.mod h1:/20jfyN9Y5QPEAprSgKAUr+glWDY39ZiUEAYOEv5dsE= github.com/glycerine/go-unsnap-stream v0.0.0-20181221182339-f9677308dec2/go.mod h1:/20jfyN9Y5QPEAprSgKAUr+glWDY39ZiUEAYOEv5dsE= github.com/glycerine/go-unsnap-stream v0.0.0-20190901134440-81cf024a9e0a/go.mod h1:/20jfyN9Y5QPEAprSgKAUr+glWDY39ZiUEAYOEv5dsE= @@ -210,8 +209,6 @@ github.com/huandu/xstrings v1.3.1/go.mod h1:y5/lhBue+AyNmUVz9RLU9xbLR0o4KIIExikq github.com/huandu/xstrings v1.3.2 h1:L18LIDzqlW6xN2rEkpdV8+oL/IXWJ1APd+vsdYy4Wdw= github.com/huandu/xstrings v1.3.2/go.mod h1:y5/lhBue+AyNmUVz9RLU9xbLR0o4KIIExikq4ovT0aE= github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= -github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM= -github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4= github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= @@ -230,7 +227,6 @@ github.com/klauspost/cpuid/v2 v2.0.4/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa02 github.com/klauspost/cpuid/v2 v2.0.12 h1:p9dKCg8i4gmOxtv35DvrYoWqYzQrvEVdjQ762Y0OqZE= github.com/klauspost/cpuid/v2 v2.0.12/go.mod h1:g2LTdtYhdyuGPqyWyv7qRAmj1WBqxuObKfj5c0PQa7c= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= -github.com/konsorten/go-windows-terminal-sequences v1.0.3 h1:CE8S1cTafDpPvMhIxNJKvHsGVBgn1xWYf1NbHQhywc8= github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= @@ -312,20 +308,14 @@ github.com/rogpeppe/go-internal v1.8.0/go.mod h1:WmiCO8CzOY8rg0OYDC4/i/2WRWAB6po github.com/rs/xid v1.3.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= github.com/rs/zerolog v1.26.1 h1:/ihwxqH+4z8UxyI70wM1z9yCvkWcfz/a3mj48k/Zngc= github.com/rs/zerolog v1.26.1/go.mod h1:/wSSJWX7lVrsOwlbyTRSOJvqRlc+WjWlfes+CiJ+tmc= -github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/ryszard/goskiplist v0.0.0-20150312221310-2dfbae5fcf46/go.mod h1:uAQ5PCi+MFsC7HjREoAz1BU+Mq60+05gifQSsHSDG/8= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= -github.com/sirupsen/logrus v1.6.0 h1:UBcNElsrwanuuMsnGSlYmtmgbb23qDR5dG+6X6Oo89I= github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88= github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc= github.com/smartystreets/assertions v0.0.0-20190215210624-980c5ac6f3ac/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc= github.com/smartystreets/goconvey v0.0.0-20181108003508-044398e4856c/go.mod h1:XDJAKZRPZ1CvBcN2aX5YOUTYGHki24fSF0Iv48Ibg0s= github.com/smartystreets/goconvey v0.0.0-20190306220146-200a235640ff/go.mod h1:KSQcGKpxUMHk3nbYzs/tIBAM2iDooCn0BmttHOJEbLs= -github.com/spf13/cobra v1.4.0 h1:y+wJpx64xcgO1V+RcnwW0LEHxTKRi2ZDPSBjWnrg88Q= -github.com/spf13/cobra v1.4.0/go.mod h1:Wo4iy3BUC+X2Fybo0PDqwJIv3dNRiZLHQymsfxlB84g= -github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= -github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.2.1/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= @@ -481,12 +471,12 @@ golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210809222454-d867a43fc93e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220422013727-9388b58f7150 h1:xHms4gcpe1YE7A3yIllJXP16CMAGuqwO2lX1mTyyRRc= -golang.org/x/sys v0.0.0-20220422013727-9388b58f7150/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220412211240-33da011f77ad/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220429233432-b5fbb4746d32 h1:Js08h5hqB5xyWR789+QqueR6sDE8mk+YvpETZ+F6X9Y= +golang.org/x/sys v0.0.0-20220429233432-b5fbb4746d32/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= diff --git a/middleware/hooks.go b/middleware/hooks.go index 4419a8a..88f1639 100644 --- a/middleware/hooks.go +++ b/middleware/hooks.go @@ -5,7 +5,6 @@ import ( "errors" "github.com/sot-tech/mochi/bittorrent" - "github.com/sot-tech/mochi/pkg/log" "github.com/sot-tech/mochi/storage" ) @@ -185,16 +184,10 @@ func (h *responseHook) appendPeers(req *bittorrent.AnnounceRequest, resp *bittor resp.IPv4Peers = append(resp.IPv4Peers, p) uniquePeers[p] = nil } else { - log.Warn("received invalid peer from storage", log.Fields{"peer": p}) + logger.Warn().Object("peer", p).Msg("received invalid peer from storage") } } } - log.Debug("responseHook announce peers", log.Fields{ - "infoHash": req.InfoHash, - "requestPeer": req.RequestPeer, - "ipv4Peers": resp.IPv4Peers, - "ipv6Peers": resp.IPv6Peers, - }) return } diff --git a/middleware/logic.go b/middleware/logic.go index 8319f52..ad5cef3 100644 --- a/middleware/logic.go +++ b/middleware/logic.go @@ -38,6 +38,7 @@ type Logic struct { // HandleAnnounce generates a response for an Announce. func (l *Logic) HandleAnnounce(ctx context.Context, req *bittorrent.AnnounceRequest) (_ context.Context, resp *bittorrent.AnnounceResponse, err error) { + logger.Debug().Object("request", req).Msg("new announce request") resp = &bittorrent.AnnounceResponse{ Interval: l.announceInterval, MinInterval: l.minAnnounceInterval, @@ -70,6 +71,7 @@ func (l *Logic) AfterAnnounce(ctx context.Context, req *bittorrent.AnnounceReque // HandleScrape generates a response for a Scrape. func (l *Logic) HandleScrape(ctx context.Context, req *bittorrent.ScrapeRequest) (_ context.Context, resp *bittorrent.ScrapeResponse, err error) { + logger.Debug().Object("request", req).Msg("new scrape request") resp = &bittorrent.ScrapeResponse{ Files: make([]bittorrent.Scrape, 0, len(req.InfoHashes)), } diff --git a/middleware/logic_test.go b/middleware/logic_test.go index 5929354..7b0592e 100644 --- a/middleware/logic_test.go +++ b/middleware/logic_test.go @@ -9,8 +9,13 @@ import ( "github.com/stretchr/testify/require" "github.com/sot-tech/mochi/bittorrent" + "github.com/sot-tech/mochi/pkg/log" ) +func init() { + _ = log.ConfigureLogger("", "warn", false, false) +} + // nopHook is a Hook to measure the overhead of a no-operation Hook through // benchmarks. type nopHook struct{} diff --git a/middleware/middleware.go b/middleware/middleware.go index 140a97b..bb8bf05 100644 --- a/middleware/middleware.go +++ b/middleware/middleware.go @@ -47,11 +47,11 @@ func RegisterBuilder(name string, d Builder) { drivers[name] = d } -// New attempts to initialize a new middleware instance from the +// NewHook attempts to initialize a new middleware instance from the // list of registered Builders. // // If a driver does not exist, returns ErrBuilderDoesNotExist. -func New(name string, options conf.MapConfig, storage storage.PeerStorage) (Hook, error) { +func NewHook(name string, options conf.MapConfig, storage storage.PeerStorage) (Hook, error) { driversM.RLock() defer driversM.RUnlock() @@ -70,9 +70,9 @@ type Config struct { Options conf.MapConfig } -// HooksFromHookConfigs is a utility function for initializing Hooks in bulk. +// NewHooks is a utility function for initializing Hooks in bulk. // each element of configs must contain pairs `name` - string and `options` - map[string]any -func HooksFromHookConfigs(configs []conf.MapConfig, storage storage.PeerStorage) (hooks []Hook, err error) { +func NewHooks(configs []conf.MapConfig, storage storage.PeerStorage) (hooks []Hook, err error) { for _, cfg := range configs { var c Config @@ -81,7 +81,7 @@ func HooksFromHookConfigs(configs []conf.MapConfig, storage storage.PeerStorage) } var h Hook - h, err = New(c.Name, c.Options, storage) + h, err = NewHook(c.Name, c.Options, storage) if err != nil { break } diff --git a/middleware/torrentapproval/container/directory/directory.go b/middleware/torrentapproval/container/directory/directory.go index 365bc67..7b2e13c 100644 --- a/middleware/torrentapproval/container/directory/directory.go +++ b/middleware/torrentapproval/container/directory/directory.go @@ -57,36 +57,37 @@ func build(conf conf.MapConfig, st storage.DataStorage) (container.Container, er } d.watcher = w if len(d.StorageCtx) == 0 { - log.Info("storage context not set, using default value: " + container.DefaultStorageCtxName) + logger.Warn(). + Str("name", "StorageCtx"). + Str("provided", d.StorageCtx). + Str("default", container.DefaultStorageCtxName). + Msg("falling back to default configuration") d.StorageCtx = container.DefaultStorageCtxName } go func() { for event := range d.watcher.Events { var mi *metainfo.MetaInfo - lf := log.Fields{ - "file": event.TorrentFilePath, - "v1hash": event.InfoHash, - } if mi, err = metainfo.LoadFromFile(event.TorrentFilePath); err == nil { s256 := sha256.New() s256.Write(mi.InfoBytes) v2hash, _ := bittorrent.NewInfoHash(s256.Sum(nil)) - lf["v2hash"] = v2hash - lf["v2to1hash"] = v2hash.TruncateV1() switch event.Change { case dirwatch.Added: var name string if info, err := mi.UnmarshalInfo(); err == nil { name = info.Name } else { - lf["error"] = err - log.Warn("unable to unmarshal torrent info", lf) - delete(lf, "error") + logger.Error(). + Err(err). + Str("file", event.TorrentFilePath). + Stringer("infoHash", event.InfoHash). + Stringer("infoHashV2", v2hash). + Msg("unable to unmarshal torrent info") } if len(name) == 0 { name = list.DUMMY } - if err := d.Storage.Put(d.StorageCtx, + logger.Err(d.Storage.Put(d.StorageCtx, storage.Entry{ Key: event.InfoHash.AsString(), Value: name, @@ -96,23 +97,28 @@ func build(conf conf.MapConfig, st storage.DataStorage) (container.Container, er }, storage.Entry{ Key: v2hash.TruncateV1().RawString(), Value: name, - }); err != nil { - lf["error"] = err - } - log.Debug("approval torrent added", lf) + })). + Str("action", "add"). + Str("file", event.TorrentFilePath). + Stringer("infoHash", event.InfoHash). + Stringer("infoHashV2", v2hash). + Msg("approval torrent watcher event") case dirwatch.Removed: - if err := d.Storage.Delete(c.StorageCtx, + logger.Err(d.Storage.Delete(c.StorageCtx, event.InfoHash.AsString(), v2hash.RawString(), v2hash.TruncateV1().RawString(), - ); err != nil { - lf["error"] = err - } - log.Debug("approval torrent deleted", lf) + )). + Str("action", "delete"). + Str("file", event.TorrentFilePath). + Stringer("infoHash", event.InfoHash). + Stringer("infoHashV2", v2hash). + Msg("approval torrent watcher event") } } else { - lf["error"] = err - log.Error("unable to load torrent file", lf) + logger.Error().Err(err). + Str("file", event.TorrentFilePath). + Msg("unable to load torrent file") } } }() @@ -126,6 +132,8 @@ type directory struct { // Stop closes watching of torrent directory func (d *directory) Stop() stop.Result { + st := make(stop.Channel) d.watcher.Close() - return stop.AlreadyStopped + st.Done() + return st.Result() } diff --git a/middleware/torrentapproval/container/list/list.go b/middleware/torrentapproval/container/list/list.go index ae51b43..eff0413 100644 --- a/middleware/torrentapproval/container/list/list.go +++ b/middleware/torrentapproval/container/list/list.go @@ -15,6 +15,8 @@ import ( // Name of this container for registry. const Name = "list" +var logger = log.NewLogger("torrent approval list") + func init() { container.Register(Name, build) } @@ -45,7 +47,11 @@ func build(conf conf.MapConfig, st storage.DataStorage) (container.Container, er } if len(l.StorageCtx) == 0 { - log.Info("Storage context not set, using default value: " + container.DefaultStorageCtxName) + logger.Warn(). + Str("name", "StorageCtx"). + Str("provided", l.StorageCtx). + Str("default", container.DefaultStorageCtxName). + Msg("falling back to default configuration") l.StorageCtx = container.DefaultStorageCtxName } @@ -93,7 +99,7 @@ func (l *List) Approved(hash bittorrent.InfoHash) (contains bool) { } } if err != nil { - log.Err(err) + logger.Error().Err(err).Stringer("infoHash", hash).Msg("unable load hash information from storage") } return contains != l.Invert } diff --git a/middleware/torrentapproval/torrentapproval_test.go b/middleware/torrentapproval/torrentapproval_test.go index 32c11d5..8bf69ba 100644 --- a/middleware/torrentapproval/torrentapproval_test.go +++ b/middleware/torrentapproval/torrentapproval_test.go @@ -9,9 +9,14 @@ import ( "github.com/sot-tech/mochi/bittorrent" "github.com/sot-tech/mochi/pkg/conf" + "github.com/sot-tech/mochi/pkg/log" "github.com/sot-tech/mochi/storage/memory" ) +func init() { + _ = log.ConfigureLogger("", "warn", false, false) +} + var cases = []struct { cfg baseConfig ih string diff --git a/pkg/conf/decoder.go b/pkg/conf/decoder.go index 0702b03..2f28be1 100644 --- a/pkg/conf/decoder.go +++ b/pkg/conf/decoder.go @@ -19,6 +19,7 @@ var ErrNilConfigMap = errors.New("unable to process nil map") // MapConfig is just alias for map[string]any type MapConfig map[string]any +// MarshalZerologObject writes map into zerolog event func (m MapConfig) MarshalZerologObject(e *zerolog.Event) { e.Fields(map[string]any(m)) } diff --git a/pkg/log/log.go b/pkg/log/log.go index 1ccd2dd..b172a94 100644 --- a/pkg/log/log.go +++ b/pkg/log/log.go @@ -1,21 +1,37 @@ -// Package log adds a thin wrapper around logrus to improve non-debug logging -// performance. +// Package log adds a thin wrapper around zerolog to improve logging performance. +// +// Root logger (called by log.Info, log.Warn etc.) uses global zerolog.Logger instance +// until ConfigureLogger called. Any child logger created with NewLogger will not +// produce any events until logger configured, so any function which uses child +// logger will come stuck because of root initialization synchronization. package log import ( "io" "os" "strings" - "time" - + "sync" + // needs for async file logging _ "code.cloudfoundry.org/go-diodes" "github.com/rs/zerolog" "github.com/rs/zerolog/diode" zl "github.com/rs/zerolog/log" ) -var root = zl.Logger +var ( + root = zl.Logger + rootWg = sync.WaitGroup{} + customOut io.WriteCloser + customOutMu = sync.Mutex{} +) +func init() { + rootWg.Add(1) +} + +// ConfigureLogger initializes root and all child loggers. +// NOTE: this function MUST be called before any child log call +// otherwise any goroutine, which uses logger will wait logger initialization func ConfigureLogger(output, level string, formatted, colored bool) (err error) { lvl := zerolog.WarnLevel output = strings.ToLower(output) @@ -27,11 +43,13 @@ func ConfigureLogger(output, level string, formatted, colored bool) (err error) case "stdout": w, stdAny = os.Stdout, true default: - if w, err = os.OpenFile(output, os.O_APPEND|os.O_CREATE, 0600); err == nil { - w = diode.NewWriter(w, 1000, 10*time.Millisecond, func(missed int) { + if w, err = os.OpenFile(output, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0o600); err == nil { + customOutMu.Lock() + defer customOutMu.Unlock() + customOut = diode.NewWriter(w, 1000, 0, func(missed int) { zl.Warn().Int("count", missed).Msg("Logger dropped messages") }) - + w = customOut } else { return err } @@ -52,29 +70,234 @@ func ConfigureLogger(output, level string, formatted, colored bool) (err error) } root = zerolog.New(w).With().Timestamp().Logger() zerolog.SetGlobalLevel(lvl) + rootWg.Done() return nil } +// Logger is the holder for zerolog.Logger which +// waits until root logger initialized to prevent +// mixed logging format and output +type Logger struct { + comp string + zlOnce sync.Once + zerolog.Logger +} + +func (l *Logger) init() { + l.zlOnce.Do(func() { + rootWg.Wait() + l.Logger = root.With().Str("component", l.comp).Logger() + }) +} + +// ==== copied from zerolog ==== + +// Trace starts a new message with trace level. +// +// You must call Msg on the returned event in order to send the event. +func (l *Logger) Trace() *zerolog.Event { + l.init() + return l.Logger.Trace() +} + +// Debug starts a new message with debug level. +// +// You must call Msg on the returned event in order to send the event. +func (l *Logger) Debug() *zerolog.Event { + l.init() + return l.Logger.Debug() +} + +// Info starts a new message with info level. +// +// You must call Msg on the returned event in order to send the event. +func (l *Logger) Info() *zerolog.Event { + l.init() + return l.Logger.Info() +} + +// Warn starts a new message with warn level. +// +// You must call Msg on the returned event in order to send the event. +func (l *Logger) Warn() *zerolog.Event { + l.init() + return l.Logger.Warn() +} + +// Error starts a new message with error level. +// +// You must call Msg on the returned event in order to send the event. +func (l *Logger) Error() *zerolog.Event { + l.init() + return l.Logger.Error() +} + +// Err starts a new message with error level with err as a field if not nil or +// with info level if err is nil. +// +// You must call Msg on the returned event in order to send the event. +func (l *Logger) Err(err error) *zerolog.Event { + l.init() + return l.Logger.Err(err) +} + +// Fatal starts a new message with fatal level. The os.Exit(1) function +// is called by the Msg method, which terminates the program immediately. +// +// You must call Msg on the returned event in order to send the event. +func (l *Logger) Fatal() *zerolog.Event { + l.init() + return l.Logger.Fatal() +} + +// Panic starts a new message with panic level. The panic() function +// is called by the Msg method, which stops the ordinary flow of a goroutine. +// +// You must call Msg on the returned event in order to send the event. +func (l *Logger) Panic() *zerolog.Event { + l.init() + return l.Logger.Panic() +} + +// WithLevel starts a new message with level. Unlike Fatal and Panic +// methods, WithLevel does not terminate the program or stop the ordinary +// flow of a gourotine when used with their respective levels. +// +// You must call Msg on the returned event in order to send the event. +func (l *Logger) WithLevel(level zerolog.Level) *zerolog.Event { + l.init() + return l.Logger.WithLevel(level) +} + +// Log starts a new message with no level. Setting GlobalLevel to Disabled +// will still disable events produced by this method. +// +// You must call Msg on the returned event in order to send the event. +func (l *Logger) Log() *zerolog.Event { + l.init() + return l.Logger.Log() +} + +// Print sends a log event using debug level and no extra field. +// Arguments are handled in the manner of fmt.Print. +func (l *Logger) Print(v ...interface{}) { + l.init() + l.Logger.Print(v...) +} + +// Printf sends a log event using debug level and no extra field. +// Arguments are handled in the manner of fmt.Printf. +func (l *Logger) Printf(format string, v ...interface{}) { + l.init() + l.Logger.Printf(format, v...) +} + +// Write implements the io.Writer interface. This is useful to set as a writer +// for the standard library log. +func (l *Logger) Write(p []byte) (n int, err error) { + l.init() + return l.Logger.Write(p) +} + +// Err starts a new message with error level with err as a field if not nil or +// with info level if err is nil. +// +// You must call Msg on the returned event in order to send the event. +func Err(err error) *zerolog.Event { + return root.Err(err) +} + +// Trace starts a new message with trace level. +// +// You must call Msg on the returned event in order to send the event. +func Trace() *zerolog.Event { + return root.Trace() +} + +// Debug starts a new message with debug level. +// +// You must call Msg on the returned event in order to send the event. func Debug() *zerolog.Event { return root.Debug() } +// Info starts a new message with info level. +// +// You must call Msg on the returned event in order to send the event. func Info() *zerolog.Event { return root.Info() } +// Warn starts a new message with warn level. +// +// You must call Msg on the returned event in order to send the event. func Warn() *zerolog.Event { return root.Warn() } +// Error starts a new message with error level. +// +// You must call Msg on the returned event in order to send the event. func Error() *zerolog.Event { return root.Error() } +// Fatal starts a new message with fatal level. The os.Exit(1) function +// is called by the Msg method. +// +// You must call Msg on the returned event in order to send the event. func Fatal() *zerolog.Event { return root.Fatal() } -func NewLogger(component string) zerolog.Logger { - return root.With().Str("component", component).Logger() +// Panic starts a new message with panic level. The message is also sent +// to the panic function. +// +// You must call Msg on the returned event in order to send the event. +func Panic() *zerolog.Event { + return root.Panic() +} + +// WithLevel starts a new message with level. +// +// You must call Msg on the returned event in order to send the event. +func WithLevel(level zerolog.Level) *zerolog.Event { + return root.WithLevel(level) +} + +// Log starts a new message with no level. Setting zerolog.GlobalLevel to +// zerolog.Disabled will still disable events produced by this method. +// +// You must call Msg on the returned event in order to send the event. +func Log() *zerolog.Event { + return root.Log() +} + +// Print sends a log event using debug level and no extra field. +// Arguments are handled in the manner of fmt.Print. +func Print(v ...interface{}) { + root.Print(v...) +} + +// Printf sends a log event using debug level and no extra field. +// Arguments are handled in the manner of fmt.Printf. +func Printf(format string, v ...interface{}) { + root.Printf(format, v...) +} + +// Close closes custom output writer if it configured +func Close() { + customOutMu.Lock() + defer customOutMu.Unlock() + if customOut != nil { + _ = customOut.Close() + customOut = nil + } +} + +// NewLogger creates child logger with specified component name +// NOTE: root logger MUST be initialized with ConfigureLogger +// before any logger call +func NewLogger(component string) *Logger { + return &Logger{comp: component} } diff --git a/pkg/metrics/server.go b/pkg/metrics/server.go index 53a9282..30b1f4e 100644 --- a/pkg/metrics/server.go +++ b/pkg/metrics/server.go @@ -16,7 +16,10 @@ import ( "github.com/sot-tech/mochi/pkg/stop" ) -var serverCounter = new(int32) +var ( + logger = log.NewLogger("metrics") + serverCounter = new(int32) +) // Enabled indicates that configured at least one metrics server func Enabled() bool { @@ -74,7 +77,7 @@ func NewServer(addr string) *Server { atomic.AddInt32(serverCounter, 1) defer atomic.AddInt32(serverCounter, -1) if err := s.srv.ListenAndServe(); !errors.Is(err, http.ErrServerClosed) { - log.Fatal("failed while serving prometheus", log.Err(err)) + logger.Error().Err(err).Msg("failed while serving prometheus") } }() diff --git a/storage/keydb/storage.go b/storage/keydb/storage.go index 0d05199..b972d8a 100644 --- a/storage/keydb/storage.go +++ b/storage/keydb/storage.go @@ -14,6 +14,7 @@ import ( "errors" "github.com/go-redis/redis/v8" + "github.com/rs/zerolog" "github.com/sot-tech/mochi/bittorrent" "github.com/sot-tech/mochi/pkg/conf" @@ -29,9 +30,12 @@ const ( expireMemberCmd = "EXPIREMEMBER" ) -// ErrNotKeyDB returned from initializer if connected does not support KeyDB -// specific command (EXPIREMEMBER) -var ErrNotKeyDB = errors.New("provided instance seems not KeyDB") +var ( + logger = log.NewLogger(Name) + // ErrNotKeyDB returned from initializer if connected does not support KeyDB + // specific command (EXPIREMEMBER) + ErrNotKeyDB = errors.New("provided instance seems not KeyDB") +) func init() { // Register the storage driver. @@ -72,10 +76,8 @@ func newStore(cfg r.Config) (*store, error) { if err == nil { st = &store{ Connection: rs, - logFields: cfg.LogFields(), peerTTL: uint(cfg.PeerLifetime.Seconds()), } - st.logFields["name"] = Name } return st, err @@ -83,8 +85,12 @@ func newStore(cfg r.Config) (*store, error) { type store struct { r.Connection - logFields log.Fields - peerTTL uint + peerTTL uint +} + +// MarshalZerologObject writes configuration into zerolog event +func (s store) MarshalZerologObject(e *zerolog.Event) { + e.Str("type", Name).Object("config", s.Config) } func (s store) setPeerTTL(infoHashKey, peerID string) error { @@ -92,10 +98,10 @@ func (s store) setPeerTTL(infoHashKey, peerID string) error { } func (s store) addPeer(infoHashKey, peerID string) (err error) { - log.Debug("storage: KeyDB: PutPeer", log.Fields{ - "infoHashKey": infoHashKey, - "peerID": peerID, - }) + logger.Trace(). + Str("infoHashKey", infoHashKey). + Str("peerID", peerID). + Msg("add peer") if err = s.SAdd(context.TODO(), infoHashKey, peerID).Err(); err == nil { err = s.setPeerTTL(infoHashKey, peerID) } @@ -103,10 +109,10 @@ func (s store) addPeer(infoHashKey, peerID string) (err error) { } func (s store) delPeer(infoHashKey, peerID string) error { - log.Debug("storage: KeyDB: DeletePeer", log.Fields{ - "infoHashKey": infoHashKey, - "peerID": peerID, - }) + logger.Trace(). + Str("infoHashKey", infoHashKey). + Str("peerID", peerID). + Msg("del peer") deleted, err := s.SRem(context.TODO(), infoHashKey, peerID).Uint64() err = r.AsNil(err) if err == nil && deleted == 0 { @@ -133,10 +139,10 @@ func (s store) DeleteLeecher(ih bittorrent.InfoHash, peer bittorrent.Peer) error } func (s store) GraduateLeecher(ih bittorrent.InfoHash, peer bittorrent.Peer) (err error) { - log.Debug("storage: KeyDB: GraduateLeecher", log.Fields{ - "infoHash": ih, - "peer": peer, - }) + logger.Trace(). + Stringer("infoHash", ih). + Object("peer", peer). + Msg("graduate leecher") infoHash, peerID := ih.RawString(), peer.RawString() ihSeederKey := r.InfoHashKey(infoHash, true, peer.Addr().Is6()) ihLeecherKey := r.InfoHashKey(infoHash, false, peer.Addr().Is6()) @@ -153,12 +159,12 @@ func (s store) GraduateLeecher(ih bittorrent.InfoHash, peer bittorrent.Peer) (er // AnnouncePeers is the same function as redis.AnnouncePeers func (s store) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant int, v6 bool) ([]bittorrent.Peer, error) { - log.Debug("storage: KeyDB: AnnouncePeers", log.Fields{ - "infoHash": ih, - "seeder": seeder, - "numWant": numWant, - "v6": v6, - }) + logger.Trace(). + Stringer("infoHash", ih). + Bool("seeder", seeder). + Int("numWant", numWant). + Bool("v6", v6). + Msg("announce peers") return s.GetPeers(ih, seeder, numWant, v6, func(ctx context.Context, infoHashKey string, maxCount int) *redis.StringSliceCmd { return s.SRandMemberN(context.TODO(), infoHashKey, int64(maxCount)) @@ -167,9 +173,9 @@ func (s store) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant int, v // ScrapeSwarm is the same function as redis.ScrapeSwarm except `SCard` call instead of `HLen` func (s store) ScrapeSwarm(ih bittorrent.InfoHash) (leechers uint32, seeders uint32, snatched uint32) { - log.Debug("storage: KeyDB: ScrapeSwarm", log.Fields{ - "infoHash": ih, - }) + logger.Trace(). + Stringer("infoHash", ih). + Msg("scrape swarm") leechers, seeders = s.CountPeers(ih, s.SCard) return } @@ -182,7 +188,3 @@ func (s *store) Stop() stop.Result { } return c.Result() } - -func (s store) LogFields() log.Fields { - return s.logFields -} diff --git a/storage/memory/storage.go b/storage/memory/storage.go index 29366b8..8225e50 100644 --- a/storage/memory/storage.go +++ b/storage/memory/storage.go @@ -11,6 +11,8 @@ import ( "sync" "time" + "github.com/rs/zerolog" + "github.com/sot-tech/mochi/bittorrent" "github.com/sot-tech/mochi/pkg/conf" "github.com/sot-tech/mochi/pkg/log" @@ -27,6 +29,8 @@ const ( defaultShardCount = 1024 ) +var logger = log.NewLogger(Name) + func init() { // Register the storage driver. storage.RegisterBuilder(Name, builder) @@ -45,12 +49,9 @@ type Config struct { ShardCount int `cfg:"shard_count"` } -// LogFields renders the current config as a set of Logrus fields. -func (cfg Config) LogFields() log.Fields { - return log.Fields{ - "name": Name, - "shardCount": cfg.ShardCount, - } +// MarshalZerologObject writes configuration into zerolog event +func (cfg Config) MarshalZerologObject(e *zerolog.Event) { + e.Int("shardCount", cfg.ShardCount) } // Validate sanity checks values set in a config and returns a new config with @@ -62,11 +63,11 @@ func (cfg Config) Validate() Config { if cfg.ShardCount <= 0 || cfg.ShardCount > (math.MaxInt/2) { validcfg.ShardCount = defaultShardCount - log.Warn("falling back to default configuration", log.Fields{ - "name": Name + ".ShardCount", - "provided": cfg.ShardCount, - "default": validcfg.ShardCount, - }) + log.Warn(). + Str("name", "ShardCount"). + Int("provided", cfg.ShardCount). + Int("default", validcfg.ShardCount). + Msg("falling back to default configuration") } return validcfg @@ -111,6 +112,11 @@ type peerStore struct { wg sync.WaitGroup } +// MarshalZerologObject writes configuration into zerolog event +func (ps *peerStore) MarshalZerologObject(e *zerolog.Event) { + e.Str("type", Name).Object("config", ps.cfg) +} + var _ storage.PeerStorage = &peerStore{} func (ps *peerStore) ScheduleGC(gcInterval, peerLifeTime time.Duration) { @@ -125,10 +131,12 @@ func (ps *peerStore) ScheduleGC(gcInterval, peerLifeTime time.Duration) { return case <-t.C: before := time.Now().Add(-peerLifeTime) - log.Debug("storage: Memory purging peers with no announces since", log.Fields{"before": before}) + logger.Trace().Time("before", before).Msg("purging peers with no announces") start := time.Now() ps.gc(before) - storage.PromGCDurationMilliseconds.Observe(float64(time.Since(start).Nanoseconds()) / float64(time.Millisecond)) + duration := time.Since(start) + logger.Debug().Dur("timeTaken", duration).Msg("gc complete") + storage.PromGCDurationMilliseconds.Observe(float64(duration.Milliseconds())) } } }() @@ -162,7 +170,7 @@ func (ps *peerStore) ScheduleStatisticsCollection(reportInterval time.Duration) storage.PromInfoHashesCount.Set(float64(numInfohashes)) storage.PromSeedersCount.Set(float64(numSeeders)) storage.PromLeechersCount.Set(float64(numLeechers)) - log.Debug("storage: Memory: populateProm() finished", log.Fields{"timeTaken": time.Since(before)}) + logger.Debug().TimeDiff("timeTaken", time.Now(), before).Msg("populate prom complete") } } } @@ -540,7 +548,3 @@ func (ps *peerStore) Stop() stop.Result { return c.Result() } - -func (ps *peerStore) LogFields() log.Fields { - return ps.cfg.LogFields() -} diff --git a/storage/redis/storage.go b/storage/redis/storage.go index 1e3a9c3..903db27 100644 --- a/storage/redis/storage.go +++ b/storage/redis/storage.go @@ -28,6 +28,7 @@ import ( "time" "github.com/go-redis/redis/v8" + "github.com/rs/zerolog" "github.com/sot-tech/mochi/bittorrent" "github.com/sot-tech/mochi/pkg/conf" @@ -64,8 +65,11 @@ const ( CountLeecherKey = "CHI_C_L" ) -// ErrSentinelAndClusterChecked returned from initializer if both Config.Sentinel and Config.Cluster provided -var ErrSentinelAndClusterChecked = errors.New("unable to use both cluster and sentinel mode") +var ( + logger = log.NewLogger(Name) + // ErrSentinelAndClusterChecked returned from initializer if both Config.Sentinel and Config.Cluster provided + ErrSentinelAndClusterChecked = errors.New("unable to use both cluster and sentinel mode") +) func init() { // Register the storage builder. @@ -98,7 +102,6 @@ func newStore(cfg Config) (*store, error) { Connection: rs, closed: make(chan any), wg: sync.WaitGroup{}, - logFields: cfg.LogFields(), }, nil } @@ -118,21 +121,18 @@ type Config struct { ConnectTimeout time.Duration `cfg:"connect_timeout"` } -// LogFields renders the current config as a set of Logrus fields. -func (cfg Config) LogFields() log.Fields { - return log.Fields{ - "name": Name, - "peerLifetime": cfg.PeerLifetime, - "addresses": cfg.Addresses, - "db": cfg.DB, - "poolSize": cfg.PoolSize, - "sentinel": cfg.Sentinel, - "sentinelMaster": cfg.SentinelMaster, - "cluster": cfg.Cluster, - "readTimeout": cfg.ReadTimeout, - "writeTimeout": cfg.WriteTimeout, - "connectTimeout": cfg.ConnectTimeout, - } +// MarshalZerologObject writes configuration fields into zerolog event +func (cfg Config) MarshalZerologObject(e *zerolog.Event) { + e.Strs("addresses", cfg.Addresses). + Int("db", cfg.DB). + Int("poolSize", cfg.PoolSize). + Bool("sentinel", cfg.Sentinel). + Str("sentinelMaster", cfg.SentinelMaster). + Bool("cluster", cfg.Cluster). + Dur("readTimeout", cfg.ReadTimeout). + Dur("writeTimeout", cfg.WriteTimeout). + Dur("connectTimeout", cfg.ConnectTimeout). + Dur("peerLifetime", cfg.PeerLifetime) } // Validate sanity checks values set in a config and returns a new config with @@ -157,38 +157,38 @@ func (cfg Config) Validate() (Config, error) { validCfg.Addresses = addresses if len(cfg.Addresses) == 0 { validCfg.Addresses = []string{defaultRedisAddress} - log.Warn("falling back to default configuration", log.Fields{ - "name": Name + ".Addresses", - "provided": cfg.Addresses, - "default": validCfg.Addresses, - }) + logger.Warn(). + Str("name", "Addresses"). + Strs("provided", cfg.Addresses). + Strs("default", validCfg.Addresses). + Msg("falling back to default configuration") } if cfg.ReadTimeout <= 0 { validCfg.ReadTimeout = defaultReadTimeout - log.Warn("falling back to default configuration", log.Fields{ - "name": Name + ".ReadTimeout", - "provided": cfg.ReadTimeout, - "default": validCfg.ReadTimeout, - }) + logger.Warn(). + Str("name", "ReadTimeout"). + Dur("provided", cfg.ReadTimeout). + Dur("default", validCfg.ReadTimeout). + Msg("falling back to default configuration") } if cfg.WriteTimeout <= 0 { validCfg.WriteTimeout = defaultWriteTimeout - log.Warn("falling back to default configuration", log.Fields{ - "name": Name + ".WriteTimeout", - "provided": cfg.WriteTimeout, - "default": validCfg.WriteTimeout, - }) + logger.Warn(). + Str("name", "WriteTimeout"). + Dur("provided", cfg.WriteTimeout). + Dur("default", validCfg.WriteTimeout). + Msg("falling back to default configuration") } if cfg.ConnectTimeout <= 0 { validCfg.ConnectTimeout = defaultConnectTimeout - log.Warn("falling back to default configuration", log.Fields{ - "name": Name + ".ConnectTimeout", - "provided": cfg.ConnectTimeout, - "default": validCfg.ConnectTimeout, - }) + logger.Warn(). + Str("name", "ConnectTimeout"). + Dur("provided", cfg.ConnectTimeout). + Dur("default", validCfg.ConnectTimeout). + Msg("falling back to default configuration") } return validCfg, nil @@ -238,7 +238,13 @@ func (cfg Config) Connect() (con Connection, err error) { _ = rs.Close() rs = nil } - return Connection{rs}, err + cfg.Login, cfg.Password = "", "" + return Connection{rs, cfg}, err +} + +// MarshalZerologObject writes configuration into zerolog event +func (ps *store) MarshalZerologObject(e *zerolog.Event) { + e.Str("type", Name).Object("config", ps.Config) } func (ps *store) ScheduleGC(gcInterval, peerLifeTime time.Duration) { @@ -254,9 +260,9 @@ func (ps *store) ScheduleGC(gcInterval, peerLifeTime time.Duration) { case <-t.C: start := time.Now() ps.gc(time.Now().Add(-peerLifeTime)) - duration := time.Since(start).Milliseconds() - log.Debug("storage: Redis: recordGCDuration", log.Fields{"timeTaken(ms)": duration}) - storage.PromGCDurationMilliseconds.Observe(float64(duration)) + duration := time.Since(start) + logger.Debug().Dur("timeTaken", duration).Msg("gc complete") + storage.PromGCDurationMilliseconds.Observe(float64(duration.Milliseconds())) t.Reset(gcInterval) } } @@ -285,7 +291,7 @@ func (ps *store) ScheduleStatisticsCollection(reportInterval time.Duration) { storage.PromInfoHashesCount.Set(float64(numInfoHashes)) storage.PromSeedersCount.Set(float64(numSeeders)) storage.PromLeechersCount.Set(float64(numLeechers)) - log.Debug("storage: Redis: populateProm() finished", log.Fields{"timeTaken": time.Since(before)}) + logger.Debug().TimeDiff("timeTaken", time.Now(), before).Msg("populate prom complete") } } } @@ -295,13 +301,13 @@ func (ps *store) ScheduleStatisticsCollection(reportInterval time.Duration) { // Connection is wrapper for redis.UniversalClient type Connection struct { redis.UniversalClient + Config } type store struct { Connection - closed chan any - wg sync.WaitGroup - logFields log.Fields + closed chan any + wg sync.WaitGroup } func (ps *store) count(key string, getLength bool) (n uint64) { @@ -313,10 +319,7 @@ func (ps *store) count(key string, getLength bool) (n uint64) { } err = AsNil(err) if err != nil { - log.Error("storage: Redis: GET/SCARD failure", log.Fields{ - "key": key, - "error": err, - }) + logger.Error().Err(err).Str("key", key).Msg("storage: Redis: GET/SCARD failure") } return } @@ -375,11 +378,10 @@ func InfoHashKey(infoHash string, seeder, v6 bool) (infoHashKey string) { } func (ps *store) putPeer(infoHashKey, peerCountKey, peerID string) error { - log.Debug("storage: Redis: PutPeer", log.Fields{ - "infoHashKey": infoHashKey, - "peerCountKey": peerCountKey, - "peerID": peerID, - }) + logger.Trace(). + Str("infoHashKey", infoHashKey). + Str("peerID", peerID). + Msg("put peer") return ps.tx(func(tx redis.Pipeliner) (err error) { if err = tx.HSet(context.TODO(), infoHashKey, peerID, ps.getClock()).Err(); err != nil { return @@ -393,11 +395,10 @@ func (ps *store) putPeer(infoHashKey, peerCountKey, peerID string) error { } func (ps *store) delPeer(infoHashKey, peerCountKey, peerID string) error { - log.Debug("storage: Redis: DeletePeer", log.Fields{ - "infoHashKey": infoHashKey, - "peerCountKey": peerCountKey, - "peerID": peerID, - }) + logger.Trace(). + Str("infoHashKey", infoHashKey). + Str("peerID", peerID). + Msg("del peer") deleted, err := ps.HDel(context.TODO(), infoHashKey, peerID).Uint64() err = AsNil(err) if err == nil { @@ -428,10 +429,10 @@ func (ps *store) DeleteLeecher(ih bittorrent.InfoHash, peer bittorrent.Peer) err } func (ps *store) GraduateLeecher(ih bittorrent.InfoHash, peer bittorrent.Peer) error { - log.Debug("storage: Redis: GraduateLeecher", log.Fields{ - "infoHash": ih, - "peer": peer, - }) + logger.Trace(). + Stringer("infoHash", ih). + Object("peer", peer). + Msg("graduate leecher") infoHash, peerID, isV6 := ih.RawString(), peer.RawString(), peer.Addr().Is6() ihSeederKey, ihLeecherKey := InfoHashKey(infoHash, true, isV6), InfoHashKey(infoHash, false, isV6) @@ -465,7 +466,7 @@ func (ps Connection) parsePeersList(peersResult *redis.StringSliceCmd) (peers [] if p, err := bittorrent.NewPeer(peerID); err == nil { peers = append(peers, p) } else { - log.Error("storage: Redis: unable to decode leecher", log.Fields{"peerID": peerID}) + logger.Error().Err(err).Str("peerID", peerID).Msg("unable to decode peer") } } } @@ -506,22 +507,19 @@ func (ps Connection) GetPeers(ih bittorrent.InfoHash, forSeeder bool, maxCount i } } else if l > 0 { err = nil - log.Warn("storage: Redis: error occurred while retrieving peers", log.Fields{ - "infoHash": infoHash, - "error": err, - }) + logger.Warn().Err(err).Stringer("infoHash", ih).Msg("error occurred while retrieving peers") } return } func (ps *store) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant int, v6 bool) ([]bittorrent.Peer, error) { - log.Debug("storage: Redis: AnnouncePeers", log.Fields{ - "infoHash": ih, - "seeder": seeder, - "numWant": numWant, - "peer": v6, - }) + logger.Trace(). + Stringer("infoHash", ih). + Bool("seeder", seeder). + Int("numWant", numWant). + Bool("v6", v6). + Msg("announce peers") return ps.GetPeers(ih, seeder, numWant, v6, func(ctx context.Context, infoHashKey string, maxCount int) *redis.StringSliceCmd { return ps.HRandField(ctx, infoHashKey, maxCount, false) @@ -534,10 +532,7 @@ func (ps Connection) countPeers(infoHashKey string, countFn getPeerCountFn) uint count, err := countFn(context.TODO(), infoHashKey).Result() err = AsNil(err) if err != nil { - log.Error("storage: Redis: key size calculation failure", log.Fields{ - "infoHashKey": infoHashKey, - "error": err, - }) + logger.Error().Err(err).Str("infoHashKey", infoHashKey).Msg("key size calculation failure") } return uint32(count) } @@ -555,9 +550,9 @@ func (ps Connection) CountPeers(ih bittorrent.InfoHash, countFn getPeerCountFn) } func (ps *store) ScrapeSwarm(ih bittorrent.InfoHash) (leechers uint32, seeders uint32, snatched uint32) { - log.Debug("storage: Redis ScrapeSwarm", log.Fields{ - "infoHash": ih, - }) + logger.Trace(). + Stringer("infoHash", ih). + Msg("scrape swarm") leechers, seeders = ps.CountPeers(ih, ps.HLen) @@ -579,7 +574,7 @@ func (ps Connection) Put(ctx string, values ...storage.Entry) (err error) { err = ps.HSet(context.TODO(), PrefixKey+ctx, args...).Err() if err != nil { if strings.Contains(err.Error(), argNumErrorMsg) { - log.Warn("This Redis version/implementation does not support variadic arguments for HSET") + logger.Warn().Msg("This Redis version/implementation does not support variadic arguments for HSET") for _, p := range values { if err = ps.HSet(context.TODO(), PrefixKey+ctx, p.Key, p.Value).Err(); err != nil { break @@ -613,7 +608,7 @@ func (ps Connection) Delete(ctx string, keys ...string) (err error) { err = AsNil(ps.HDel(context.TODO(), PrefixKey+ctx, keys...).Err()) if err != nil { if strings.Contains(err.Error(), argNumErrorMsg) { - log.Warn("This Redis version/implementation does not support variadic arguments for HDEL") + logger.Warn().Msg("This Redis version/implementation does not support variadic arguments for HDEL") for _, k := range keys { if err = AsNil(ps.HDel(context.TODO(), PrefixKey+ctx, k).Err()); err != nil { break @@ -676,7 +671,6 @@ func (Connection) Preservable() bool { // transaction. The infohash key will remain in the addressFamil hash and // we'll attempt to clean it up the next time gc runs. func (ps *store) gc(cutoff time.Time) { - log.Debug("storage: Redis: purging peers with no announces since", log.Fields{"before": cutoff}) cutoffNanos := cutoff.UnixNano() // list all infoHashKeys in the group infoHashKeys, err := ps.SMembers(context.Background(), IHKey).Result() @@ -690,7 +684,7 @@ func (ps *store) gc(cutoff time.Time) { } else if strings.HasPrefix(infoHashKey, IH4LeecherKey) || strings.HasPrefix(infoHashKey, IH6LeecherKey) { cntKey = CountLeecherKey } else { - log.Warn("storage: Redis: unexpected record found in info hash set", log.Fields{"infoHashKey": infoHashKey}) + logger.Warn().Str("infoHashKey", infoHashKey).Msg("unexpected record found in info hash set") continue } // list all (peer, timeout) pairs for the ih @@ -701,16 +695,15 @@ func (ps *store) gc(cutoff time.Time) { for peerID, timeStamp := range peerList { if mtime, err := strconv.ParseInt(timeStamp, 10, 64); err == nil { if mtime <= cutoffNanos { - log.Debug("storage: Redis: adding peer to remove list", log.Fields{"peerID": peerID}) + logger.Trace().Str("peerID", peerID).Msg("adding peer to remove list") peersToRemove = append(peersToRemove, peerID) } } else { - log.Error("storage: Redis: unable to decode peer timestamp", log.Fields{ - "infoHashKey": infoHashKey, - "peerID": peerID, - "timestamp": timeStamp, - "error": err, - }) + logger.Error().Err(err). + Str("infoHashKey", infoHashKey). + Str("peerID", peerID). + Str("timestamp", timeStamp). + Msg("unable to decode peer timestamp") } } if len(peersToRemove) > 0 { @@ -718,35 +711,32 @@ func (ps *store) gc(cutoff time.Time) { err = AsNil(err) if err != nil { if strings.Contains(err.Error(), argNumErrorMsg) { - log.Warn("This Redis version/implementation does not support variadic arguments for HDEL") + logger.Warn().Msg("This Redis version/implementation does not support variadic arguments for HDEL") for _, k := range peersToRemove { count, err := ps.HDel(context.Background(), infoHashKey, k).Result() err = AsNil(err) if err != nil { - log.Error("storage: Redis: unable to delete peer", log.Fields{ - "infoHashKey": infoHashKey, - "peerID": k, - "error": err, - }) + logger.Error().Err(err). + Str("infoHashKey", infoHashKey). + Str("peerID", k). + Msg("unable to delete peer") } else { removedPeerCount += count } } } else { - log.Error("storage: Redis: unable to delete peers", log.Fields{ - "infoHashKey": infoHashKey, - "peerIds": peersToRemove, - "error": err, - }) + logger.Error().Err(err). + Str("infoHashKey", infoHashKey). + Strs("peerIDs", peersToRemove). + Msg("unable to delete peers") } } if removedPeerCount > 0 { // DECR seeder/leecher counter if err = ps.DecrBy(context.Background(), cntKey, removedPeerCount).Err(); err != nil { - log.Error("storage: Redis: unable to decrement seeder/leecher peer count", log.Fields{ - "infoHashKey": infoHashKey, - "countKey": cntKey, - "error": err, - }) + logger.Error().Err(err). + Str("infoHashKey", infoHashKey). + Str("countKey", cntKey). + Msg("unable to decrement seeder/leecher peer count") } } } @@ -764,20 +754,20 @@ func (ps *store) gc(cutoff time.Time) { return err }, infoHashKey)) if err != nil { - log.Error("storage: Redis: unable to clean info hash records", log.Fields{ - "infoHashKey": infoHashKey, - "error": err, - }) + logger.Error().Err(err). + Str("infoHashKey", infoHashKey). + Msg("unable to clean info hash records") } } else { - log.Error("storage: Redis: unable to fetch info hash peers", log.Fields{ - "infoHashKey": infoHashKey, - "error": err, - }) + logger.Error().Err(err). + Str("infoHashKey", infoHashKey). + Msg("unable to fetch info hash peers") } } } else { - log.Error("storage: Redis: unable to fetch info hash set", log.Fields{"hashSet": IHKey, "error": err}) + logger.Error().Err(err). + Str("hashSet", IHKey). + Msg("unable to fetch info hash peers") } } @@ -790,7 +780,7 @@ func (ps *store) Stop() stop.Result { ps.wg.Wait() var err error if ps.UniversalClient != nil { - log.Info("storage: Redis: exiting. mochi does not clear data in redis when exiting. mochi keys have prefix " + PrefixKey) + logger.Info().Msg("storage: Redis: exiting. mochi does not clear data in redis when exiting. mochi keys have prefix " + PrefixKey) err = ps.UniversalClient.Close() ps.UniversalClient = nil } @@ -799,11 +789,3 @@ func (ps *store) Stop() stop.Result { return c.Result() } - -func (ps *store) LogFields() log.Fields { - fields := make(log.Fields, len(ps.logFields)) - for k, v := range ps.logFields { - fields[k] = v - } - return fields -} diff --git a/storage/storage.go b/storage/storage.go index ef403ca..70f2c6c 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -70,7 +70,6 @@ func (c Config) sanitizeStatisticsConfig() (statInterval time.Duration) { Dur("provided", c.PrometheusReportingInterval). Dur("default", defaultPrometheusReportingInterval). Msg("falling back to default configuration") - } return } @@ -262,15 +261,33 @@ func NewStorage(name string, cfg conf.MapConfig) (ps PeerStorage, err error) { } if gc, isOk := ps.(GCAware); isOk { - gc.ScheduleGC(c.sanitizeGCConfig()) + gcInterval, peerTTL := c.sanitizeGCConfig() + logger.Info(). + Str("type", name). + Dur("gcInterval", gcInterval). + Dur("peerTTL", peerTTL). + Msg("scheduling GC") + gc.ScheduleGC(gcInterval, peerTTL) + } else { + logger.Debug(). + Str("type", name). + Msg("storage does not support GC") } - if statInterval := c.sanitizeStatisticsConfig(); statInterval > 0 { - if st, isOk := ps.(StatisticsAware); isOk { + if st, isOk := ps.(StatisticsAware); isOk { + if statInterval := c.sanitizeStatisticsConfig(); statInterval > 0 { + logger.Info(). + Str("type", name). + Dur("statInterval", statInterval). + Msg("scheduling statistics collection") st.ScheduleStatisticsCollection(statInterval) + } else { + logger.Info().Str("type", name).Msg("statistics collection disabled because of zero reporting interval") } } else { - logger.Info().Msg("prometheus disabled because of zero reporting interval") + logger.Debug(). + Str("type", name). + Msg("storage does not support statistics collection") } return diff --git a/storage/test/storage_test_base.go b/storage/test/storage_test_base.go index 5d78410..ae1495e 100644 --- a/storage/test/storage_test_base.go +++ b/storage/test/storage_test_base.go @@ -9,14 +9,23 @@ import ( "github.com/stretchr/testify/require" "github.com/sot-tech/mochi/bittorrent" + "github.com/sot-tech/mochi/pkg/log" "github.com/sot-tech/mochi/storage" ) +func init() { + _ = log.ConfigureLogger("", "warn", false, false) +} + // PeerEqualityFunc is the boolean function to use to check two Peers for // equality. // Depending on the implementation of the PeerStorage, this can be changed to // use (Peer).EqualEndpoint instead. -var PeerEqualityFunc = func(p1, p2 bittorrent.Peer) bool { return p1.Equal(p2) } +var PeerEqualityFunc = func(p1, p2 bittorrent.Peer) bool { + return p1.Port() == p2.Port() && + p1.Addr().Compare(p1.Addr()) == 0 && + p1.ID == p2.ID +} type testHolder struct { st storage.PeerStorage