(tested) complete replace logrus with zerolog

* remove cobra dependency and split execs to mochi and e2e

* add log init synchronization
This commit is contained in:
Lawrence, Rendall
2022-05-02 03:13:58 +03:00
parent 4d646f7c09
commit c50a532181
36 changed files with 753 additions and 707 deletions
+8 -6
View File
@@ -44,14 +44,15 @@ jobs:
go-version: "^1.18"
- name: "Install and configure mochi"
run: |
go install --tags e2e ./cmd/mochi
go install ./cmd/mochi
go install ./cmd/mochi-e2e
cat ./dist/example_config.yaml
- name: "Run end-to-end tests"
run: |
mochi --config=./dist/example_config.yaml --debug &
mochi --config=./dist/example_config.yaml --logLevel debug --logPretty &
pid=$!
sleep 2
mochi e2e --debug
mochi-e2e
kill $pid
e2e-redis:
name: "E2E Redis Tests"
@@ -67,15 +68,16 @@ jobs:
go-version: "^1.18"
- name: "Install and configure mochi"
run: |
go install --tags e2e ./cmd/mochi
go install ./cmd/mochi
go install ./cmd/mochi-e2e
curl -LO https://github.com/jzelinskie/faq/releases/download/0.0.6/faq-linux-amd64
chmod +x faq-linux-amd64
./faq-linux-amd64 '.mochi.storage = {"config":{"gc_interval":"3m","peer_lifetime":"31m","prometheus_reporting_interval":"1s","connect_timeout":"15s","read_timeout":"15s","write_timeout":"15s"},"name":"redis"}' ./dist/example_config.yaml > ./dist/example_redis_config.yaml
cat ./dist/example_redis_config.yaml
- name: "Run end-to-end tests"
run: |
mochi --config=./dist/example_redis_config.yaml --debug &
mochi --config=./dist/example_redis_config.yaml --logLevel debug --logPretty &
pid=$!
sleep 2
mochi e2e --debug
mochi-e2e
kill $pid
+3 -2
View File
@@ -18,7 +18,7 @@ Modified version of [Chihaya](https://github.com/chihaya/chihaya), an open sourc
* Allows mixed peers: IPv4 requesters can fetch IPv6 peers or vice versa;
* Contains some internal improvements.
_Note: From time to time MoChi fetch modifications from Chihaya but is not
_Note: From time to time MoChi fetch modifications from Chihaya but is not
fully compatible with original project (mainly in Redis storage structure),
so it cannot be mixed with Chihaya (i.e. it is impossible create MoChi-Chihaya cluster)._
@@ -26,7 +26,8 @@ so it cannot be mixed with Chihaya (i.e. it is impossible create MoChi-Chihaya c
The main goal of made modifications is to create semi-private tracker like [Hefur](https://github.com/sot-tech/hefur)
but with cluster support (allowed torrents limited by pre-existent `list` middleware and another `directory` middleware
to [limit registered torrents](docs/middleware/torrent_approval.md)) and to maximize torrent swarm by providing maximum peers as possible (IPv4+IPv6).
to [limit registered torrents](docs/middleware/torrent_approval.md)) and to maximize torrent swarm by providing maximum
peers as possible (IPv4+IPv6).
## Notice
+7 -22
View File
@@ -125,6 +125,7 @@ type Scrape struct {
Incomplete uint32
}
// MarshalZerologObject writes fields into zerolog event
func (s Scrape) MarshalZerologObject(e *zerolog.Event) {
e.Stringer("infoHash", s.InfoHash).
Uint32("snatches", s.Snatches).
@@ -170,13 +171,6 @@ func NewPeer(data string) (Peer, error) {
return peer, err
}
// String implements fmt.Stringer to return a human-readable representation.
// The string will have the format <PeerID>@[<IP>]:<port>, for example
// "0102030405060708090a0b0c0d0e0f1011121314@[10.11.12.13]:1234"
func (p Peer) String() string {
return fmt.Sprintf("%s@[%s]:%d", p.ID, p.Addr(), p.Port())
}
// RawString generates concatenation of PeerID, net port and IP-address
func (p Peer) RawString() string {
ip := p.Addr()
@@ -187,25 +181,16 @@ func (p Peer) RawString() string {
return string(b)
}
// Addr returns unmapped peer's IP address
func (p Peer) Addr() netip.Addr {
return p.AddrPort.Addr().Unmap()
}
// MarshalZerologObject writes fields into zerolog event
func (p Peer) MarshalZerologObject(e *zerolog.Event) {
e.Stringer("id", p.ID).
Stringer("address", p.Addr()).
Uint16("port", p.Port())
}
// Equal reports whether p and x are the same.
func (p Peer) Equal(x Peer) bool { return p.EqualEndpoint(x) && p.ID == x.ID }
// EqualEndpoint reports whether p and x have the same endpoint.
func (p Peer) EqualEndpoint(x Peer) bool {
return p.Port() == x.Port() &&
p.Addr().Compare(x.Addr()) == 0
}
// Addr returns unmapped peer's IP address
func (p Peer) Addr() netip.Addr {
return p.AddrPort.Addr().Unmap()
}
// ClientError represents an error that should be exposed to the client over
-57
View File
@@ -1,57 +0,0 @@
package bittorrent
import (
"fmt"
"net/netip"
"testing"
"github.com/stretchr/testify/require"
)
var (
b = []byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20}
expected = "0102030405060708090a0b0c0d0e0f1011121314"
)
func TestPeerID_String(t *testing.T) {
pid, err := NewPeerID(b)
require.Nil(t, err)
s := pid.String()
require.Equal(t, expected, s)
}
func TestInfoHash_String(t *testing.T) {
ih, err := NewInfoHash(b)
require.Nil(t, err)
require.Equal(t, expected, ih.String())
}
func TestPeer_String(t *testing.T) {
pid, err := NewPeerID(b)
require.Nil(t, err)
id, _ := NewPeerID(b)
peerStringTestCases := []struct {
input Peer
expected string
}{
{
input: Peer{
ID: id,
AddrPort: netip.MustParseAddrPort("10.11.12.1:1234"),
},
expected: fmt.Sprintf("%s@[10.11.12.1]:1234", expected),
},
{
input: Peer{
ID: id,
AddrPort: netip.MustParseAddrPort("[2001:db8::ff00:42:8329]:1234"),
},
expected: fmt.Sprintf("%s@[2001:db8::ff00:42:8329]:1234", expected),
},
}
for _, c := range peerStringTestCases {
c.input.ID = pid
got := c.input.String()
require.Equal(t, c.expected, got)
}
}
+32 -25
View File
@@ -24,42 +24,49 @@ const (
// Completed is the event sent by a BitTorrent client when it finishes
// downloading all of the required chunks.
Completed
// NoneStr string representation of None event
NoneStr = "none"
// StartedStr string representation of Started event
StartedStr = "started"
// StoppedStr string representation of Stopped event
StoppedStr = "stopped"
// CompletedStr string representation of Completed event
CompletedStr = "completed"
)
var (
eventToString = make(map[Event]string)
stringToEvent = make(map[string]Event)
)
func init() {
eventToString[None] = "none"
eventToString[Started] = "started"
eventToString[Stopped] = "stopped"
eventToString[Completed] = "completed"
stringToEvent[""] = None
for k, v := range eventToString {
stringToEvent[v] = k
}
}
// NewEvent returns the proper Event given a string.
func NewEvent(eventStr string) (evt Event, err error) {
if e, ok := stringToEvent[strings.ToLower(eventStr)]; ok {
evt = e
} else {
switch strings.ToLower(eventStr) {
case NoneStr, "":
evt = None
case StartedStr:
evt = Started
case StoppedStr:
evt = Stopped
case CompletedStr:
evt = Completed
default:
evt, err = None, ErrUnknownEvent
}
return
}
// String implements Stringer for an event.
func (e Event) String() (s string) {
if name, ok := eventToString[e]; ok {
s = name
} else {
switch e {
case None:
s = NoneStr
case Started:
s = StartedStr
case Stopped:
s = StoppedStr
case Completed:
s = CompletedStr
default:
s = "<unknown>"
}
return
+1
View File
@@ -211,6 +211,7 @@ func (qp *QueryParams) RawQuery() string {
return qp.query
}
// MarshalZerologObject writes fields into zerolog event
func (qp QueryParams) MarshalZerologObject(e *zerolog.Event) {
e.Str("path", qp.path).Str("query", qp.query)
}
+29 -16
View File
@@ -16,10 +16,6 @@ type RequestAddress struct {
Provided bool
}
func (a RequestAddress) MarshalZerologObject(e *zerolog.Event) {
e.Stringer("address", a.Addr).Bool("provided", a.Provided)
}
// Validate checks if netip.Addr is valid and not unspecified (0.0.0.0)
func (a RequestAddress) Validate() bool {
return a.IsValid() && !a.IsUnspecified()
@@ -35,6 +31,11 @@ func (a RequestAddress) String() string {
return fmt.Sprint(a.Addr.String(), p)
}
// MarshalZerologObject writes fields into zerolog event
func (a RequestAddress) MarshalZerologObject(e *zerolog.Event) {
e.Stringer("address", a.Addr).Bool("provided", a.Provided)
}
// RequestAddresses is an array of RequestAddress used mainly for
// sort.Interface implementation.
// Frontends may determine peer's address from connections info
@@ -42,12 +43,6 @@ func (a RequestAddress) String() string {
// connection information about peer
type RequestAddresses []RequestAddress
func (aa RequestAddresses) MarshalZerologArray(a *zerolog.Array) {
for _, addr := range aa {
a.Object(addr)
}
}
func (aa RequestAddresses) Len() int {
return len(aa)
}
@@ -99,8 +94,17 @@ func (aa RequestAddresses) GetFirst() netip.Addr {
return a
}
// MarshalZerologArray writes array elements to zerolog event
func (aa RequestAddresses) MarshalZerologArray(a *zerolog.Array) {
for _, addr := range aa {
a.Object(addr)
}
}
// Peers wrapper of array of Peer-s
type Peers []Peer
// MarshalZerologArray writes array elements to zerolog event
func (p Peers) MarshalZerologArray(a *zerolog.Array) {
for _, peer := range p {
a.Object(peer)
@@ -115,12 +119,6 @@ type RequestPeer struct {
RequestAddresses
}
func (rp RequestPeer) MarshalZerologObject(e *zerolog.Event) {
e.Stringer("id", rp.ID).
Array("addresses", rp.RequestAddresses).
Uint16("port", rp.Port)
}
// Peers constructs array of Peer-s with the same ID and Port
// for every RequestAddress array.
func (rp RequestPeer) Peers() (peers Peers) {
@@ -133,6 +131,13 @@ func (rp RequestPeer) Peers() (peers Peers) {
return
}
// MarshalZerologObject writes fields into zerolog event
func (rp RequestPeer) MarshalZerologObject(e *zerolog.Event) {
e.Stringer("id", rp.ID).
Array("addresses", rp.RequestAddresses).
Uint16("port", rp.Port)
}
// AnnounceRequest represents the parsed parameters from an announce request.
type AnnounceRequest struct {
Event Event
@@ -149,6 +154,7 @@ type AnnounceRequest struct {
Params
}
// MarshalZerologObject writes fields into zerolog event
func (r AnnounceRequest) MarshalZerologObject(e *zerolog.Event) {
e.Stringer("event", r.Event).
Stringer("infoHash", r.InfoHash).
@@ -175,6 +181,7 @@ type AnnounceResponse struct {
IPv6Peers Peers
}
// MarshalZerologObject writes fields into zerolog event
func (r AnnounceResponse) MarshalZerologObject(e *zerolog.Event) {
e.Bool("compact", r.Compact).
Uint32("complete", r.Complete).
@@ -185,8 +192,10 @@ func (r AnnounceResponse) MarshalZerologObject(e *zerolog.Event) {
Array("ipv6Peers", r.IPv6Peers)
}
// InfoHashes wrapper of array of InfoHash-es
type InfoHashes []InfoHash
// MarshalZerologArray writes array elements to zerolog event
func (i InfoHashes) MarshalZerologArray(a *zerolog.Array) {
for _, ih := range i {
a.Str(ih.String())
@@ -202,14 +211,17 @@ type ScrapeRequest struct {
Params Params
}
// MarshalZerologObject writes fields into zerolog event
func (r ScrapeRequest) MarshalZerologObject(e *zerolog.Event) {
e.Array("addresses", r.RequestAddresses).
Array("infoHashes", r.InfoHashes).
Object("params", r.Params)
}
// Scrapes wrapper of array of Scrape-s
type Scrapes []Scrape
// MarshalZerologArray writes array elements to zerolog event
func (s Scrapes) MarshalZerologArray(a *zerolog.Array) {
for _, scrape := range s {
a.Object(scrape)
@@ -224,6 +236,7 @@ type ScrapeResponse struct {
Files Scrapes
}
// MarshalZerologObject writes fields into zerolog event
func (sr ScrapeResponse) MarshalZerologObject(e *zerolog.Event) {
e.Array("scrapes", sr.Files)
}
+3 -10
View File
@@ -5,6 +5,7 @@ import (
)
var (
logger = log.NewLogger("bittorrent")
// ErrInvalidIP indicates an invalid IP for an Announce.
ErrInvalidIP = ClientError("invalid IP")
@@ -29,12 +30,7 @@ func SanitizeAnnounce(r *AnnounceRequest, maxNumWant, defaultNumWant uint32) err
r.NumWant = maxNumWant
}
log.Debug("sanitized announce", r, log.Fields{
"port": r.Port,
"addresses": r.RequestAddresses,
"maxNumWant": maxNumWant,
"defaultNumWant": defaultNumWant,
})
logger.Debug().Object("request", r).Msg("sanitized announce")
return nil
}
@@ -49,9 +45,6 @@ func SanitizeScrape(r *ScrapeRequest, maxScrapeInfoHashes uint32) error {
return ErrInvalidIP
}
log.Debug("sanitized scrape", r, log.Fields{
"addresses": r.RequestAddresses,
"maxScrapeInfoHashes": maxScrapeInfoHashes,
})
logger.Debug().Object("request", r).Msg("sanitized scrape")
return nil
}
+19 -51
View File
@@ -1,82 +1,50 @@
//go:build e2e
// +build e2e
package main
import (
"flag"
"fmt"
"log"
"math/rand"
"time"
"github.com/anacrolix/torrent/tracker"
"github.com/spf13/cobra"
"github.com/sot-tech/mochi/bittorrent"
"github.com/sot-tech/mochi/pkg/log"
)
func init() {
e2eCmd = &cobra.Command{
Use: "e2e",
Short: "exec e2e tests",
Long: "Execute the Conf end-to-end test suite",
RunE: EndToEndRunCmdFunc,
}
func main() {
httpAddress := flag.String("httpaddr", "http://127.0.0.1:6969/announce", "address of the HTTP tracker")
udpAddress := flag.String("udpaddr", "udp://127.0.0.1:6969", "address of the UDP tracker")
delay := flag.Duration("delay", time.Second, "delay between announces")
flag.Parse()
e2eCmd.Flags().String("httpaddr", "http://127.0.0.1:6969/announce", "address of the HTTP tracker")
e2eCmd.Flags().String("udpaddr", "udp://127.0.0.1:6969", "address of the UDP tracker")
e2eCmd.Flags().Duration("delay", time.Second, "delay between announces")
}
// EndToEndRunCmdFunc implements a Cobra command that runs the end-to-end test
// suite for a Conf build.
func EndToEndRunCmdFunc(cmd *cobra.Command, args []string) error {
delay, err := cmd.Flags().GetDuration("delay")
if err != nil {
return err
}
// Test the HTTP tracker
httpAddr, err := cmd.Flags().GetString("httpaddr")
if err != nil {
return err
}
if len(httpAddr) != 0 {
log.Info("testing HTTP...")
err := test(httpAddr, delay)
if len(*httpAddress) != 0 {
log.Println("testing HTTP...")
err := test(*httpAddress, *delay)
if err != nil {
return err
log.Fatal(err)
}
log.Info("success")
log.Println("success")
}
// Test the UDP tracker.
udpAddr, err := cmd.Flags().GetString("udpaddr")
if err != nil {
return err
}
if len(udpAddr) != 0 {
log.Info("testing UDP...")
err := test(udpAddr, delay)
if len(*udpAddress) != 0 {
log.Println("testing UDP...")
err := test(*udpAddress, *delay)
if err != nil {
return err
log.Fatal(err)
}
log.Info("success")
log.Println("success")
}
return nil
}
func test(addr string, delay time.Duration) error {
b := make([]byte, bittorrent.InfoHashV1Len)
rand.Read(b)
ih, _ := bittorrent.NewInfoHash(b)
return testWithInfohash(ih, addr, delay)
return testWithInfoHash(ih, addr, delay)
}
func testWithInfohash(infoHash bittorrent.InfoHash, url string, delay time.Duration) error {
func testWithInfoHash(infoHash bittorrent.InfoHash, url string, delay time.Duration) error {
var ih [bittorrent.InfoHashV1Len]byte
req := tracker.AnnounceRequest{
InfoHash: ih,
+21 -220
View File
@@ -1,182 +1,17 @@
package main
import (
"context"
"errors"
"fmt"
"flag"
"log"
"os"
"os/signal"
"runtime"
"strings"
"syscall"
"github.com/spf13/cobra"
"github.com/sot-tech/mochi/frontend/http"
"github.com/sot-tech/mochi/frontend/udp"
"github.com/sot-tech/mochi/middleware"
"github.com/sot-tech/mochi/pkg/conf"
"github.com/sot-tech/mochi/pkg/log"
"github.com/sot-tech/mochi/pkg/metrics"
_ "github.com/sot-tech/mochi/pkg/randseed"
"github.com/sot-tech/mochi/pkg/stop"
"github.com/sot-tech/mochi/storage"
l "github.com/sot-tech/mochi/pkg/log"
)
var e2eCmd *cobra.Command
// Run represents the state of a running instance of Conf.
type Run struct {
configFilePath string
storage storage.PeerStorage
logic *middleware.Logic
sg *stop.Group
}
// NewRun runs an instance of Conf.
func NewRun(configFilePath string) (*Run, error) {
r := &Run{
configFilePath: configFilePath,
}
return r, r.Start(nil)
}
// Start begins an instance of Conf.
// It is optional to provide an instance of the peer store to avoid the
// creation of a new one.
func (r *Run) Start(ps storage.PeerStorage) error {
configFile, err := ParseConfigFile(r.configFilePath)
if err != nil {
return fmt.Errorf("failed to read config: %w", err)
}
cfg := configFile.Conf
r.sg = stop.NewGroup()
if len(cfg.MetricsAddr) > 0 {
log.Info().Str("addr", cfg.MetricsAddr).Msg("starting metrics server")
r.sg.Add(metrics.NewServer(cfg.MetricsAddr))
} else {
log.Info().Msg("metrics disabled because of empty address")
}
if ps == nil {
log.Info().Str("name", cfg.Storage.Name).Msg("starting storage")
ps, err = storage.NewStorage(cfg.Storage.Name, cfg.Storage.Config)
if err != nil {
return fmt.Errorf("failed to create storage: %w", err)
}
log.Info().Object("config", ps).Msg("started storage")
}
r.storage = ps
preHooks, err := middleware.HooksFromHookConfigs(cfg.PreHooks, r.storage)
if err != nil {
return fmt.Errorf("failed to validate hook config: %w", err)
}
postHooks, err := middleware.HooksFromHookConfigs(cfg.PostHooks, r.storage)
if err != nil {
return fmt.Errorf("failed to validate hook config: %w", err)
}
r.logic = middleware.NewLogic(cfg.AnnounceInterval, cfg.MinAnnounceInterval, r.storage, preHooks, postHooks)
if len(cfg.HTTPConfig) > 0 {
log.Info().Object("config", cfg.HTTPConfig).Msg("starting HTTP frontend")
httpFE, err := http.NewFrontend(r.logic, cfg.HTTPConfig)
if err == nil {
r.sg.Add(httpFE)
} else if !errors.Is(err, conf.ErrNilConfigMap) {
return err
}
}
if len(cfg.UDPConfig) > 0 {
log.Info().Object("config", cfg.HTTPConfig).Msg("starting UDP frontend")
udpFE, err := udp.NewFrontend(r.logic, cfg.UDPConfig)
if err == nil {
r.sg.Add(udpFE)
} else if !errors.Is(err, conf.ErrNilConfigMap) {
return err
}
}
return nil
}
func combineErrors(prefix string, errs []error) error {
errStrs := make([]string, 0, len(errs))
for _, err := range errs {
errStrs = append(errStrs, err.Error())
}
return errors.New(prefix + ": " + strings.Join(errStrs, "; "))
}
// Stop shuts down an instance of Conf.
func (r *Run) Stop(keepPeerStore bool) (storage.PeerStorage, error) {
log.Debug().Msg("stopping frontends and metrics server")
if errs := r.sg.Stop().Wait(); len(errs) != 0 {
return nil, combineErrors("failed while shutting down frontends", errs)
}
log.Debug().Msg("stopping logic")
if errs := r.logic.Stop().Wait(); len(errs) != 0 {
return nil, combineErrors("failed while shutting down middleware", errs)
}
if !keepPeerStore {
log.Debug().Msg("stopping peer store")
if errs := r.storage.Stop().Wait(); len(errs) != 0 {
return nil, combineErrors("failed while shutting down peer store", errs)
}
r.storage = nil
}
return r.storage, nil
}
// RootRunCmdFunc implements a Cobra command that runs an instance of Conf
// and handles reloading and shutdown via process signals.
func RootRunCmdFunc(cmd *cobra.Command, _ []string) error {
configFilePath, err := cmd.Flags().GetString(configArg)
if err != nil {
return err
}
r, err := NewRun(configFilePath)
if err != nil {
return err
}
shutdown, _ := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
reload, _ := signal.NotifyContext(context.Background(), ReloadSignals...)
for {
select {
case <-reload.Done():
log.Info().Msg("reloading; received reload signal")
peerStore, err := r.Stop(true)
if err != nil {
return err
}
if err := r.Start(peerStore); err != nil {
return err
}
case <-shutdown.Done():
log.Info().Msg("shutting down; received shutdown signal")
if _, err := r.Stop(false); err != nil {
return err
}
return nil
}
}
}
const (
appName = "mochi"
logOutArg = "logOut"
logLevelArg = "logLevel"
logPrettyArg = "logPretty"
@@ -184,59 +19,25 @@ const (
configArg = "config"
)
// configureLogger handles command line flags for the logger.
func configureLogger(cmd *cobra.Command, _ []string) (err error) {
var out, lvl string
var pretty, colored bool
flags := cmd.Flags()
out, err = flags.GetString(logOutArg)
if err != nil {
return err
}
lvl, err = flags.GetString(logLevelArg)
if err != nil {
return err
}
pretty, err = flags.GetBool(logPrettyArg)
if err != nil {
return err
}
colored, err = cmd.Flags().GetBool(logColorsArg)
if err != nil {
return err
}
return log.ConfigureLogger(out, lvl, pretty, colored)
}
func main() {
rootCmd := &cobra.Command{
Use: appName,
Short: "BitTorrent Tracker",
Long: "A customizable, multi-protocol BitTorrent Tracker",
PersistentPreRunE: configureLogger,
RunE: RootRunCmdFunc,
var s Server
logOut := flag.String(logOutArg, "stderr", "output for logging, might be 'stderr', 'stdout' or file path")
logLevel := flag.String(logLevelArg, "warn", "logging level: trace, debug, info, warn, error, fatal, panic")
logPretty := flag.Bool(logPrettyArg, false, "enable log pretty print. used only if 'logOut' set to 'stdout' or 'stderr'. if not set, log outputs json")
logColored := flag.Bool(logColorsArg, runtime.GOOS == "windows", "enable log coloring. used only if set 'logPretty'")
configPath := flag.String(configArg, "/etc/mochi.yaml", "location of configuration file")
flag.Parse()
if err := l.ConfigureLogger(*logOut, *logLevel, *logPretty, *logColored); err != nil {
log.Fatal("unable to configure logger ", err)
}
flags := rootCmd.PersistentFlags()
flags.String(logOutArg, "", "output for logging, might be 'stderr', 'stdout' of file path. 'stderr' if not set")
flags.String(logLevelArg, "info", "logging level (trace, debug, info, warn, error, fatal, panic). 'warn' if not set")
flags.Bool(logPrettyArg, false, "enable log pretty print. used only if 'logOut' set to 'stdout' or 'stderr'. if not set, log outputs json)")
flags.Bool(logColorsArg, runtime.GOOS == "windows", "enable log coloring. used only if set 'logPretty'")
rootCmd.Flags().String("config", "/etc/mochi.yaml", "location of configuration file")
if e2eCmd != nil {
rootCmd.AddCommand(e2eCmd)
}
if err := rootCmd.Execute(); err != nil {
log.Fatal().Err(err).Msg("failed while executing root command")
if err := s.Run(*configPath); err != nil {
log.Fatal("unable to start server ", err)
}
defer s.Dispose()
ch := make(chan os.Signal, 2)
signal.Notify(ch, os.Interrupt, syscall.SIGTERM)
<-ch
}
+106
View File
@@ -0,0 +1,106 @@
package main
import (
"errors"
"fmt"
"github.com/sot-tech/mochi/frontend/http"
"github.com/sot-tech/mochi/frontend/udp"
"github.com/sot-tech/mochi/middleware"
"github.com/sot-tech/mochi/pkg/log"
"github.com/sot-tech/mochi/pkg/metrics"
"github.com/sot-tech/mochi/pkg/stop"
"github.com/sot-tech/mochi/storage"
)
// Server represents the state of a running instance.
type Server struct {
storage storage.PeerStorage
logic *middleware.Logic
sg *stop.Group
}
// Run begins an instance of Conf.
// It is optional to provide an instance of the peer store to avoid the
// creation of a new one.
func (r *Server) Run(configFilePath string) error {
configFile, err := ParseConfigFile(configFilePath)
if err != nil {
return fmt.Errorf("failed to read config: %w", err)
}
cfg := configFile.Conf
r.sg = stop.NewGroup()
if len(cfg.MetricsAddr) > 0 {
log.Info().Str("address", cfg.MetricsAddr).Msg("starting metrics server")
r.sg.Add(metrics.NewServer(cfg.MetricsAddr))
} else {
log.Info().Msg("metrics disabled because of empty address")
}
log.Info().Str("name", cfg.Storage.Name).Msg("starting storage")
r.storage, err = storage.NewStorage(cfg.Storage.Name, cfg.Storage.Config)
if err != nil {
return fmt.Errorf("failed to create storage: %w", err)
}
log.Info().Object("config", r.storage).Msg("started storage")
preHooks, err := middleware.NewHooks(cfg.PreHooks, r.storage)
if err != nil {
return fmt.Errorf("failed to validate hook config: %w", err)
}
postHooks, err := middleware.NewHooks(cfg.PostHooks, r.storage)
if err != nil {
return fmt.Errorf("failed to validate hook config: %w", err)
}
r.logic = middleware.NewLogic(cfg.AnnounceInterval, cfg.MinAnnounceInterval, r.storage, preHooks, postHooks)
var started bool
if len(cfg.HTTPConfig) > 0 {
log.Info().Object("config", cfg.HTTPConfig).Msg("starting HTTP frontend")
httpFE, err := http.NewFrontend(r.logic, cfg.HTTPConfig)
if err == nil {
r.sg.Add(httpFE)
started = true
} else {
return err
}
}
if len(cfg.UDPConfig) > 0 {
log.Info().Object("config", cfg.UDPConfig).Msg("starting UDP frontend")
udpFE, err := udp.NewFrontend(r.logic, cfg.UDPConfig)
if err == nil {
r.sg.Add(udpFE)
started = true
} else {
return err
}
}
if !started {
return errors.New("no frontends configured")
}
return nil
}
// Dispose shuts down an instance of Server.
func (r *Server) Dispose() {
log.Debug().Msg("stopping frontends and metrics server")
if errs := r.sg.Stop().Wait(); len(errs) > 0 {
log.Error().Errs("errors", errs).Msg("error occurred while shutting down frontends")
}
log.Debug().Msg("stopping logic")
if errs := r.logic.Stop().Wait(); len(errs) > 0 {
log.Error().Errs("errors", errs).Msg("error occurred while shutting down middlewares")
}
log.Debug().Msg("stopping peer store")
if errs := r.storage.Stop().Wait(); len(errs) != 0 {
log.Error().Errs("errors", errs).Msg("error occurred while shutting down peer store")
}
log.Close()
}
-14
View File
@@ -1,14 +0,0 @@
//go:build darwin || freebsd || linux || netbsd || openbsd || dragonfly || solaris
package main
import (
"os"
"syscall"
)
// ReloadSignals are the signals that the current OS will send to the process
// when a configuration reload is requested.
var ReloadSignals = []os.Signal{
syscall.SIGUSR1,
}
-14
View File
@@ -1,14 +0,0 @@
//go:build windows
package main
import (
"os"
"syscall"
)
// ReloadSignals are the signals that the current OS will send to the process
// when a configuration reload is requested.
var ReloadSignals = []os.Signal{
syscall.SIGHUP,
}
+3 -3
View File
@@ -24,9 +24,9 @@ There are two sources of hashes: `list` and `directory`.
* `list` is the static set of hashes, specified in configuration file.
* `directory` will watch for `*.torrent` files in specified path and
append/delete records from storage. This source will parse all existing
files at start and then watch for new files to add, or for delete events
to remove hash from storage.
append/delete records from storage. This source will parse all existing
files at start and then watch for new files to add, or for delete events
to remove hash from storage.
Note: if storage is not `memory`, and `preserve` option set to `true`, records
will be persisted in storage until _somebody_ or _something_ (different tool with access
+7 -7
View File
@@ -3,18 +3,18 @@
This storage mainly the same as Redis and uses some of [redis](redis.md) store logic
with next exceptions:
* peers stored in [sets](https://redis.io/docs/manual/data-types/#sets)
instead of [hashes](https://redis.io/docs/manual/data-types/#hashes);
* peers stored in [sets](https://redis.io/docs/manual/data-types/#sets)
instead of [hashes](https://redis.io/docs/manual/data-types/#hashes);
* keys such as `CHI_I`, `CHI_S_C` and `CHI_L_C` not used (at all);
* peer TTL relies on KeyDB's [EXPIREMEMBER](https://docs.keydb.dev/docs/commands/#expiremember)
command, so MoChi does not need to periodically check peer expiration;
* peer TTL relies on KeyDB's [EXPIREMEMBER](https://docs.keydb.dev/docs/commands/#expiremember)
command, so MoChi does not need to periodically check peer expiration;
* storage does not execute periodical statistics collection (peer/lecher/info hash count)
because:
* manual calculation (INC/DEC peers count) is not usable
* manual scan of all keys is quite expensive operation.
because:
* manual calculation (INC/DEC peers count) is not usable
* manual scan of all keys is quite expensive operation.
## Use Case
-1
View File
@@ -61,7 +61,6 @@ func (cfg Config) Validate() Config {
Dur("provided", cfg.ReadTimeout).
Dur("default", validcfg.ReadTimeout).
Msg("falling back to default configuration")
}
if cfg.WriteTimeout <= 0 {
+5
View File
@@ -8,8 +8,13 @@ import (
"github.com/stretchr/testify/require"
"github.com/sot-tech/mochi/bittorrent"
"github.com/sot-tech/mochi/pkg/log"
)
func init() {
_ = log.ConfigureLogger("", "warn", false, false)
}
func TestWriteError(t *testing.T) {
table := []struct {
reason, expected string
+1 -1
View File
@@ -52,7 +52,7 @@ func (cfg Config) Validate() Config {
}
validcfg.PrivateKey = string(pkeyRunes)
log.Warn().
logger.Warn().
Str("name", "UDP.PrivateKey").
Str("provided", "").
Str("key", validcfg.PrivateKey).
+5
View File
@@ -6,11 +6,16 @@ import (
"github.com/sot-tech/mochi/frontend/udp"
"github.com/sot-tech/mochi/middleware"
"github.com/sot-tech/mochi/pkg/conf"
"github.com/sot-tech/mochi/pkg/log"
_ "github.com/sot-tech/mochi/pkg/randseed"
"github.com/sot-tech/mochi/storage"
_ "github.com/sot-tech/mochi/storage/memory"
)
func init() {
_ = log.ConfigureLogger("", "warn", false, false)
}
func TestStartStopRaceIssue437(t *testing.T) {
ps, err := storage.NewStorage("memory", conf.MapConfig{})
if err != nil {
+3 -8
View File
@@ -14,8 +14,6 @@ require (
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.12.1
github.com/rs/zerolog v1.26.1
github.com/sirupsen/logrus v1.6.0
github.com/spf13/cobra v1.4.0
github.com/stretchr/testify v1.7.1
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b
)
@@ -24,24 +22,21 @@ require (
github.com/anacrolix/dht/v2 v2.17.0 // indirect
github.com/anacrolix/log v0.13.1 // indirect
github.com/anacrolix/missinggo v1.3.0 // indirect
github.com/anacrolix/missinggo/v2 v2.6.0 // indirect
github.com/anacrolix/missinggo/v2 v2.7.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bradfitz/iter v0.0.0-20191230175014-e8f45d346db8 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/fsnotify/fsnotify v1.5.3 // indirect
github.com/fsnotify/fsnotify v1.5.4 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/huandu/xstrings v1.3.2 // indirect
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/klauspost/cpuid/v2 v2.0.12 // indirect
github.com/konsorten/go-windows-terminal-sequences v1.0.3 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.34.0 // indirect
github.com/prometheus/procfs v0.7.3 // indirect
github.com/spf13/pflag v1.0.5 // indirect
golang.org/x/sys v0.0.0-20220422013727-9388b58f7150 // indirect
golang.org/x/sys v0.0.0-20220429233432-b5fbb4746d32 // indirect
google.golang.org/protobuf v1.28.0 // indirect
)
+7 -17
View File
@@ -67,8 +67,8 @@ github.com/anacrolix/missinggo v1.3.0/go.mod h1:bqHm8cE8xr+15uVfMG3BFui/TxyB6//H
github.com/anacrolix/missinggo/perf v1.0.0/go.mod h1:ljAFWkBuzkO12MQclXzZrosP5urunoLS0Cbvb4V0uMQ=
github.com/anacrolix/missinggo/v2 v2.2.0/go.mod h1:o0jgJoYOyaoYQ4E2ZMISVa9c88BbUBVQQW4QeRkNCGY=
github.com/anacrolix/missinggo/v2 v2.5.1/go.mod h1:WEjqh2rmKECd0t1VhQkLGTdIWXO6f6NLjp5GlMZ+6FA=
github.com/anacrolix/missinggo/v2 v2.6.0 h1:kHkn6nLy1isWYV4mthZX8itV1bRd2mwFVuXrxzJ4VX0=
github.com/anacrolix/missinggo/v2 v2.6.0/go.mod h1:2IZIvmRTizALNYFYXsPR7ofXPzJgyBpKZ4kMqMEICkI=
github.com/anacrolix/missinggo/v2 v2.7.0 h1:4fzOAAn/VCvfWGviLmh64MPMttrlYew81JdPO7nSHvI=
github.com/anacrolix/missinggo/v2 v2.7.0/go.mod h1:2IZIvmRTizALNYFYXsPR7ofXPzJgyBpKZ4kMqMEICkI=
github.com/anacrolix/stm v0.2.0/go.mod h1:zoVQRvSiGjGoTmbM0vSLIiaKjWtNPeTvXUSdJQA4hsg=
github.com/anacrolix/tagflag v0.0.0-20180109131632-2146c8d41bf0/go.mod h1:1m2U/K6ZT+JZG0+bdMK6qauP49QT4wE5pmhJXOKKCHw=
github.com/anacrolix/tagflag v1.0.0/go.mod h1:1m2U/K6ZT+JZG0+bdMK6qauP49QT4wE5pmhJXOKKCHw=
@@ -95,7 +95,6 @@ github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMn
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
github.com/coreos/go-systemd/v22 v22.3.2/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
github.com/cpuguy83/go-md2man/v2 v2.0.1/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
@@ -114,8 +113,8 @@ github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1m
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/frankban/quicktest v1.14.0 h1:+cqqvzZV87b4adx/5ayVOaYZ2CrvM4ejQvUdBzPPUss=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/fsnotify/fsnotify v1.5.3 h1:vNFpj2z7YIbwh2bw7x35sqYpp2wfuq+pivKbWG09B8c=
github.com/fsnotify/fsnotify v1.5.3/go.mod h1:T3375wBYaZdLLcVNkcVbzGHY7f1l/uK5T5Ai1i3InKU=
github.com/fsnotify/fsnotify v1.5.4 h1:jRbGcIw6P2Meqdwuo0H1p6JVLbL5DHKAKlYndzMwVZI=
github.com/fsnotify/fsnotify v1.5.4/go.mod h1:OVB6XrOHzAwXMpEM7uPOzcehqUV2UqJxmVXmkdnm1bU=
github.com/glycerine/go-unsnap-stream v0.0.0-20180323001048-9f0cb55181dd/go.mod h1:/20jfyN9Y5QPEAprSgKAUr+glWDY39ZiUEAYOEv5dsE=
github.com/glycerine/go-unsnap-stream v0.0.0-20181221182339-f9677308dec2/go.mod h1:/20jfyN9Y5QPEAprSgKAUr+glWDY39ZiUEAYOEv5dsE=
github.com/glycerine/go-unsnap-stream v0.0.0-20190901134440-81cf024a9e0a/go.mod h1:/20jfyN9Y5QPEAprSgKAUr+glWDY39ZiUEAYOEv5dsE=
@@ -210,8 +209,6 @@ github.com/huandu/xstrings v1.3.1/go.mod h1:y5/lhBue+AyNmUVz9RLU9xbLR0o4KIIExikq
github.com/huandu/xstrings v1.3.2 h1:L18LIDzqlW6xN2rEkpdV8+oL/IXWJ1APd+vsdYy4Wdw=
github.com/huandu/xstrings v1.3.2/go.mod h1:y5/lhBue+AyNmUVz9RLU9xbLR0o4KIIExikq4ovT0aE=
github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
@@ -230,7 +227,6 @@ github.com/klauspost/cpuid/v2 v2.0.4/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa02
github.com/klauspost/cpuid/v2 v2.0.12 h1:p9dKCg8i4gmOxtv35DvrYoWqYzQrvEVdjQ762Y0OqZE=
github.com/klauspost/cpuid/v2 v2.0.12/go.mod h1:g2LTdtYhdyuGPqyWyv7qRAmj1WBqxuObKfj5c0PQa7c=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.3 h1:CE8S1cTafDpPvMhIxNJKvHsGVBgn1xWYf1NbHQhywc8=
github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
@@ -312,20 +308,14 @@ github.com/rogpeppe/go-internal v1.8.0/go.mod h1:WmiCO8CzOY8rg0OYDC4/i/2WRWAB6po
github.com/rs/xid v1.3.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=
github.com/rs/zerolog v1.26.1 h1:/ihwxqH+4z8UxyI70wM1z9yCvkWcfz/a3mj48k/Zngc=
github.com/rs/zerolog v1.26.1/go.mod h1:/wSSJWX7lVrsOwlbyTRSOJvqRlc+WjWlfes+CiJ+tmc=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/ryszard/goskiplist v0.0.0-20150312221310-2dfbae5fcf46/go.mod h1:uAQ5PCi+MFsC7HjREoAz1BU+Mq60+05gifQSsHSDG/8=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
github.com/sirupsen/logrus v1.6.0 h1:UBcNElsrwanuuMsnGSlYmtmgbb23qDR5dG+6X6Oo89I=
github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88=
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
github.com/smartystreets/assertions v0.0.0-20190215210624-980c5ac6f3ac/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
github.com/smartystreets/goconvey v0.0.0-20181108003508-044398e4856c/go.mod h1:XDJAKZRPZ1CvBcN2aX5YOUTYGHki24fSF0Iv48Ibg0s=
github.com/smartystreets/goconvey v0.0.0-20190306220146-200a235640ff/go.mod h1:KSQcGKpxUMHk3nbYzs/tIBAM2iDooCn0BmttHOJEbLs=
github.com/spf13/cobra v1.4.0 h1:y+wJpx64xcgO1V+RcnwW0LEHxTKRi2ZDPSBjWnrg88Q=
github.com/spf13/cobra v1.4.0/go.mod h1:Wo4iy3BUC+X2Fybo0PDqwJIv3dNRiZLHQymsfxlB84g=
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.1/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
@@ -481,12 +471,12 @@ golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210809222454-d867a43fc93e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220422013727-9388b58f7150 h1:xHms4gcpe1YE7A3yIllJXP16CMAGuqwO2lX1mTyyRRc=
golang.org/x/sys v0.0.0-20220422013727-9388b58f7150/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220412211240-33da011f77ad/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220429233432-b5fbb4746d32 h1:Js08h5hqB5xyWR789+QqueR6sDE8mk+YvpETZ+F6X9Y=
golang.org/x/sys v0.0.0-20220429233432-b5fbb4746d32/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
+1 -8
View File
@@ -5,7 +5,6 @@ import (
"errors"
"github.com/sot-tech/mochi/bittorrent"
"github.com/sot-tech/mochi/pkg/log"
"github.com/sot-tech/mochi/storage"
)
@@ -185,16 +184,10 @@ func (h *responseHook) appendPeers(req *bittorrent.AnnounceRequest, resp *bittor
resp.IPv4Peers = append(resp.IPv4Peers, p)
uniquePeers[p] = nil
} else {
log.Warn("received invalid peer from storage", log.Fields{"peer": p})
logger.Warn().Object("peer", p).Msg("received invalid peer from storage")
}
}
}
log.Debug("responseHook announce peers", log.Fields{
"infoHash": req.InfoHash,
"requestPeer": req.RequestPeer,
"ipv4Peers": resp.IPv4Peers,
"ipv6Peers": resp.IPv6Peers,
})
return
}
+2
View File
@@ -38,6 +38,7 @@ type Logic struct {
// HandleAnnounce generates a response for an Announce.
func (l *Logic) HandleAnnounce(ctx context.Context, req *bittorrent.AnnounceRequest) (_ context.Context, resp *bittorrent.AnnounceResponse, err error) {
logger.Debug().Object("request", req).Msg("new announce request")
resp = &bittorrent.AnnounceResponse{
Interval: l.announceInterval,
MinInterval: l.minAnnounceInterval,
@@ -70,6 +71,7 @@ func (l *Logic) AfterAnnounce(ctx context.Context, req *bittorrent.AnnounceReque
// HandleScrape generates a response for a Scrape.
func (l *Logic) HandleScrape(ctx context.Context, req *bittorrent.ScrapeRequest) (_ context.Context, resp *bittorrent.ScrapeResponse, err error) {
logger.Debug().Object("request", req).Msg("new scrape request")
resp = &bittorrent.ScrapeResponse{
Files: make([]bittorrent.Scrape, 0, len(req.InfoHashes)),
}
+5
View File
@@ -9,8 +9,13 @@ import (
"github.com/stretchr/testify/require"
"github.com/sot-tech/mochi/bittorrent"
"github.com/sot-tech/mochi/pkg/log"
)
func init() {
_ = log.ConfigureLogger("", "warn", false, false)
}
// nopHook is a Hook to measure the overhead of a no-operation Hook through
// benchmarks.
type nopHook struct{}
+5 -5
View File
@@ -47,11 +47,11 @@ func RegisterBuilder(name string, d Builder) {
drivers[name] = d
}
// New attempts to initialize a new middleware instance from the
// NewHook attempts to initialize a new middleware instance from the
// list of registered Builders.
//
// If a driver does not exist, returns ErrBuilderDoesNotExist.
func New(name string, options conf.MapConfig, storage storage.PeerStorage) (Hook, error) {
func NewHook(name string, options conf.MapConfig, storage storage.PeerStorage) (Hook, error) {
driversM.RLock()
defer driversM.RUnlock()
@@ -70,9 +70,9 @@ type Config struct {
Options conf.MapConfig
}
// HooksFromHookConfigs is a utility function for initializing Hooks in bulk.
// NewHooks is a utility function for initializing Hooks in bulk.
// each element of configs must contain pairs `name` - string and `options` - map[string]any
func HooksFromHookConfigs(configs []conf.MapConfig, storage storage.PeerStorage) (hooks []Hook, err error) {
func NewHooks(configs []conf.MapConfig, storage storage.PeerStorage) (hooks []Hook, err error) {
for _, cfg := range configs {
var c Config
@@ -81,7 +81,7 @@ func HooksFromHookConfigs(configs []conf.MapConfig, storage storage.PeerStorage)
}
var h Hook
h, err = New(c.Name, c.Options, storage)
h, err = NewHook(c.Name, c.Options, storage)
if err != nil {
break
}
@@ -57,36 +57,37 @@ func build(conf conf.MapConfig, st storage.DataStorage) (container.Container, er
}
d.watcher = w
if len(d.StorageCtx) == 0 {
log.Info("storage context not set, using default value: " + container.DefaultStorageCtxName)
logger.Warn().
Str("name", "StorageCtx").
Str("provided", d.StorageCtx).
Str("default", container.DefaultStorageCtxName).
Msg("falling back to default configuration")
d.StorageCtx = container.DefaultStorageCtxName
}
go func() {
for event := range d.watcher.Events {
var mi *metainfo.MetaInfo
lf := log.Fields{
"file": event.TorrentFilePath,
"v1hash": event.InfoHash,
}
if mi, err = metainfo.LoadFromFile(event.TorrentFilePath); err == nil {
s256 := sha256.New()
s256.Write(mi.InfoBytes)
v2hash, _ := bittorrent.NewInfoHash(s256.Sum(nil))
lf["v2hash"] = v2hash
lf["v2to1hash"] = v2hash.TruncateV1()
switch event.Change {
case dirwatch.Added:
var name string
if info, err := mi.UnmarshalInfo(); err == nil {
name = info.Name
} else {
lf["error"] = err
log.Warn("unable to unmarshal torrent info", lf)
delete(lf, "error")
logger.Error().
Err(err).
Str("file", event.TorrentFilePath).
Stringer("infoHash", event.InfoHash).
Stringer("infoHashV2", v2hash).
Msg("unable to unmarshal torrent info")
}
if len(name) == 0 {
name = list.DUMMY
}
if err := d.Storage.Put(d.StorageCtx,
logger.Err(d.Storage.Put(d.StorageCtx,
storage.Entry{
Key: event.InfoHash.AsString(),
Value: name,
@@ -96,23 +97,28 @@ func build(conf conf.MapConfig, st storage.DataStorage) (container.Container, er
}, storage.Entry{
Key: v2hash.TruncateV1().RawString(),
Value: name,
}); err != nil {
lf["error"] = err
}
log.Debug("approval torrent added", lf)
})).
Str("action", "add").
Str("file", event.TorrentFilePath).
Stringer("infoHash", event.InfoHash).
Stringer("infoHashV2", v2hash).
Msg("approval torrent watcher event")
case dirwatch.Removed:
if err := d.Storage.Delete(c.StorageCtx,
logger.Err(d.Storage.Delete(c.StorageCtx,
event.InfoHash.AsString(),
v2hash.RawString(),
v2hash.TruncateV1().RawString(),
); err != nil {
lf["error"] = err
}
log.Debug("approval torrent deleted", lf)
)).
Str("action", "delete").
Str("file", event.TorrentFilePath).
Stringer("infoHash", event.InfoHash).
Stringer("infoHashV2", v2hash).
Msg("approval torrent watcher event")
}
} else {
lf["error"] = err
log.Error("unable to load torrent file", lf)
logger.Error().Err(err).
Str("file", event.TorrentFilePath).
Msg("unable to load torrent file")
}
}
}()
@@ -126,6 +132,8 @@ type directory struct {
// Stop closes watching of torrent directory
func (d *directory) Stop() stop.Result {
st := make(stop.Channel)
d.watcher.Close()
return stop.AlreadyStopped
st.Done()
return st.Result()
}
@@ -15,6 +15,8 @@ import (
// Name of this container for registry.
const Name = "list"
var logger = log.NewLogger("torrent approval list")
func init() {
container.Register(Name, build)
}
@@ -45,7 +47,11 @@ func build(conf conf.MapConfig, st storage.DataStorage) (container.Container, er
}
if len(l.StorageCtx) == 0 {
log.Info("Storage context not set, using default value: " + container.DefaultStorageCtxName)
logger.Warn().
Str("name", "StorageCtx").
Str("provided", l.StorageCtx).
Str("default", container.DefaultStorageCtxName).
Msg("falling back to default configuration")
l.StorageCtx = container.DefaultStorageCtxName
}
@@ -93,7 +99,7 @@ func (l *List) Approved(hash bittorrent.InfoHash) (contains bool) {
}
}
if err != nil {
log.Err(err)
logger.Error().Err(err).Stringer("infoHash", hash).Msg("unable load hash information from storage")
}
return contains != l.Invert
}
@@ -9,9 +9,14 @@ import (
"github.com/sot-tech/mochi/bittorrent"
"github.com/sot-tech/mochi/pkg/conf"
"github.com/sot-tech/mochi/pkg/log"
"github.com/sot-tech/mochi/storage/memory"
)
func init() {
_ = log.ConfigureLogger("", "warn", false, false)
}
var cases = []struct {
cfg baseConfig
ih string
+1
View File
@@ -19,6 +19,7 @@ var ErrNilConfigMap = errors.New("unable to process nil map")
// MapConfig is just alias for map[string]any
type MapConfig map[string]any
// MarshalZerologObject writes map into zerolog event
func (m MapConfig) MarshalZerologObject(e *zerolog.Event) {
e.Fields(map[string]any(m))
}
+233 -10
View File
@@ -1,21 +1,37 @@
// Package log adds a thin wrapper around logrus to improve non-debug logging
// performance.
// Package log adds a thin wrapper around zerolog to improve logging performance.
//
// Root logger (called by log.Info, log.Warn etc.) uses global zerolog.Logger instance
// until ConfigureLogger called. Any child logger created with NewLogger will not
// produce any events until logger configured, so any function which uses child
// logger will come stuck because of root initialization synchronization.
package log
import (
"io"
"os"
"strings"
"time"
"sync"
// needs for async file logging
_ "code.cloudfoundry.org/go-diodes"
"github.com/rs/zerolog"
"github.com/rs/zerolog/diode"
zl "github.com/rs/zerolog/log"
)
var root = zl.Logger
var (
root = zl.Logger
rootWg = sync.WaitGroup{}
customOut io.WriteCloser
customOutMu = sync.Mutex{}
)
func init() {
rootWg.Add(1)
}
// ConfigureLogger initializes root and all child loggers.
// NOTE: this function MUST be called before any child log call
// otherwise any goroutine, which uses logger will wait logger initialization
func ConfigureLogger(output, level string, formatted, colored bool) (err error) {
lvl := zerolog.WarnLevel
output = strings.ToLower(output)
@@ -27,11 +43,13 @@ func ConfigureLogger(output, level string, formatted, colored bool) (err error)
case "stdout":
w, stdAny = os.Stdout, true
default:
if w, err = os.OpenFile(output, os.O_APPEND|os.O_CREATE, 0600); err == nil {
w = diode.NewWriter(w, 1000, 10*time.Millisecond, func(missed int) {
if w, err = os.OpenFile(output, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0o600); err == nil {
customOutMu.Lock()
defer customOutMu.Unlock()
customOut = diode.NewWriter(w, 1000, 0, func(missed int) {
zl.Warn().Int("count", missed).Msg("Logger dropped messages")
})
w = customOut
} else {
return err
}
@@ -52,29 +70,234 @@ func ConfigureLogger(output, level string, formatted, colored bool) (err error)
}
root = zerolog.New(w).With().Timestamp().Logger()
zerolog.SetGlobalLevel(lvl)
rootWg.Done()
return nil
}
// Logger is the holder for zerolog.Logger which
// waits until root logger initialized to prevent
// mixed logging format and output
type Logger struct {
comp string
zlOnce sync.Once
zerolog.Logger
}
func (l *Logger) init() {
l.zlOnce.Do(func() {
rootWg.Wait()
l.Logger = root.With().Str("component", l.comp).Logger()
})
}
// ==== copied from zerolog ====
// Trace starts a new message with trace level.
//
// You must call Msg on the returned event in order to send the event.
func (l *Logger) Trace() *zerolog.Event {
l.init()
return l.Logger.Trace()
}
// Debug starts a new message with debug level.
//
// You must call Msg on the returned event in order to send the event.
func (l *Logger) Debug() *zerolog.Event {
l.init()
return l.Logger.Debug()
}
// Info starts a new message with info level.
//
// You must call Msg on the returned event in order to send the event.
func (l *Logger) Info() *zerolog.Event {
l.init()
return l.Logger.Info()
}
// Warn starts a new message with warn level.
//
// You must call Msg on the returned event in order to send the event.
func (l *Logger) Warn() *zerolog.Event {
l.init()
return l.Logger.Warn()
}
// Error starts a new message with error level.
//
// You must call Msg on the returned event in order to send the event.
func (l *Logger) Error() *zerolog.Event {
l.init()
return l.Logger.Error()
}
// Err starts a new message with error level with err as a field if not nil or
// with info level if err is nil.
//
// You must call Msg on the returned event in order to send the event.
func (l *Logger) Err(err error) *zerolog.Event {
l.init()
return l.Logger.Err(err)
}
// Fatal starts a new message with fatal level. The os.Exit(1) function
// is called by the Msg method, which terminates the program immediately.
//
// You must call Msg on the returned event in order to send the event.
func (l *Logger) Fatal() *zerolog.Event {
l.init()
return l.Logger.Fatal()
}
// Panic starts a new message with panic level. The panic() function
// is called by the Msg method, which stops the ordinary flow of a goroutine.
//
// You must call Msg on the returned event in order to send the event.
func (l *Logger) Panic() *zerolog.Event {
l.init()
return l.Logger.Panic()
}
// WithLevel starts a new message with level. Unlike Fatal and Panic
// methods, WithLevel does not terminate the program or stop the ordinary
// flow of a gourotine when used with their respective levels.
//
// You must call Msg on the returned event in order to send the event.
func (l *Logger) WithLevel(level zerolog.Level) *zerolog.Event {
l.init()
return l.Logger.WithLevel(level)
}
// Log starts a new message with no level. Setting GlobalLevel to Disabled
// will still disable events produced by this method.
//
// You must call Msg on the returned event in order to send the event.
func (l *Logger) Log() *zerolog.Event {
l.init()
return l.Logger.Log()
}
// Print sends a log event using debug level and no extra field.
// Arguments are handled in the manner of fmt.Print.
func (l *Logger) Print(v ...interface{}) {
l.init()
l.Logger.Print(v...)
}
// Printf sends a log event using debug level and no extra field.
// Arguments are handled in the manner of fmt.Printf.
func (l *Logger) Printf(format string, v ...interface{}) {
l.init()
l.Logger.Printf(format, v...)
}
// Write implements the io.Writer interface. This is useful to set as a writer
// for the standard library log.
func (l *Logger) Write(p []byte) (n int, err error) {
l.init()
return l.Logger.Write(p)
}
// Err starts a new message with error level with err as a field if not nil or
// with info level if err is nil.
//
// You must call Msg on the returned event in order to send the event.
func Err(err error) *zerolog.Event {
return root.Err(err)
}
// Trace starts a new message with trace level.
//
// You must call Msg on the returned event in order to send the event.
func Trace() *zerolog.Event {
return root.Trace()
}
// Debug starts a new message with debug level.
//
// You must call Msg on the returned event in order to send the event.
func Debug() *zerolog.Event {
return root.Debug()
}
// Info starts a new message with info level.
//
// You must call Msg on the returned event in order to send the event.
func Info() *zerolog.Event {
return root.Info()
}
// Warn starts a new message with warn level.
//
// You must call Msg on the returned event in order to send the event.
func Warn() *zerolog.Event {
return root.Warn()
}
// Error starts a new message with error level.
//
// You must call Msg on the returned event in order to send the event.
func Error() *zerolog.Event {
return root.Error()
}
// Fatal starts a new message with fatal level. The os.Exit(1) function
// is called by the Msg method.
//
// You must call Msg on the returned event in order to send the event.
func Fatal() *zerolog.Event {
return root.Fatal()
}
func NewLogger(component string) zerolog.Logger {
return root.With().Str("component", component).Logger()
// Panic starts a new message with panic level. The message is also sent
// to the panic function.
//
// You must call Msg on the returned event in order to send the event.
func Panic() *zerolog.Event {
return root.Panic()
}
// WithLevel starts a new message with level.
//
// You must call Msg on the returned event in order to send the event.
func WithLevel(level zerolog.Level) *zerolog.Event {
return root.WithLevel(level)
}
// Log starts a new message with no level. Setting zerolog.GlobalLevel to
// zerolog.Disabled will still disable events produced by this method.
//
// You must call Msg on the returned event in order to send the event.
func Log() *zerolog.Event {
return root.Log()
}
// Print sends a log event using debug level and no extra field.
// Arguments are handled in the manner of fmt.Print.
func Print(v ...interface{}) {
root.Print(v...)
}
// Printf sends a log event using debug level and no extra field.
// Arguments are handled in the manner of fmt.Printf.
func Printf(format string, v ...interface{}) {
root.Printf(format, v...)
}
// Close closes custom output writer if it configured
func Close() {
customOutMu.Lock()
defer customOutMu.Unlock()
if customOut != nil {
_ = customOut.Close()
customOut = nil
}
}
// NewLogger creates child logger with specified component name
// NOTE: root logger MUST be initialized with ConfigureLogger
// before any logger call
func NewLogger(component string) *Logger {
return &Logger{comp: component}
}
+5 -2
View File
@@ -16,7 +16,10 @@ import (
"github.com/sot-tech/mochi/pkg/stop"
)
var serverCounter = new(int32)
var (
logger = log.NewLogger("metrics")
serverCounter = new(int32)
)
// Enabled indicates that configured at least one metrics server
func Enabled() bool {
@@ -74,7 +77,7 @@ func NewServer(addr string) *Server {
atomic.AddInt32(serverCounter, 1)
defer atomic.AddInt32(serverCounter, -1)
if err := s.srv.ListenAndServe(); !errors.Is(err, http.ErrServerClosed) {
log.Fatal("failed while serving prometheus", log.Err(err))
logger.Error().Err(err).Msg("failed while serving prometheus")
}
}()
+34 -32
View File
@@ -14,6 +14,7 @@ import (
"errors"
"github.com/go-redis/redis/v8"
"github.com/rs/zerolog"
"github.com/sot-tech/mochi/bittorrent"
"github.com/sot-tech/mochi/pkg/conf"
@@ -29,9 +30,12 @@ const (
expireMemberCmd = "EXPIREMEMBER"
)
// ErrNotKeyDB returned from initializer if connected does not support KeyDB
// specific command (EXPIREMEMBER)
var ErrNotKeyDB = errors.New("provided instance seems not KeyDB")
var (
logger = log.NewLogger(Name)
// ErrNotKeyDB returned from initializer if connected does not support KeyDB
// specific command (EXPIREMEMBER)
ErrNotKeyDB = errors.New("provided instance seems not KeyDB")
)
func init() {
// Register the storage driver.
@@ -72,10 +76,8 @@ func newStore(cfg r.Config) (*store, error) {
if err == nil {
st = &store{
Connection: rs,
logFields: cfg.LogFields(),
peerTTL: uint(cfg.PeerLifetime.Seconds()),
}
st.logFields["name"] = Name
}
return st, err
@@ -83,8 +85,12 @@ func newStore(cfg r.Config) (*store, error) {
type store struct {
r.Connection
logFields log.Fields
peerTTL uint
peerTTL uint
}
// MarshalZerologObject writes configuration into zerolog event
func (s store) MarshalZerologObject(e *zerolog.Event) {
e.Str("type", Name).Object("config", s.Config)
}
func (s store) setPeerTTL(infoHashKey, peerID string) error {
@@ -92,10 +98,10 @@ func (s store) setPeerTTL(infoHashKey, peerID string) error {
}
func (s store) addPeer(infoHashKey, peerID string) (err error) {
log.Debug("storage: KeyDB: PutPeer", log.Fields{
"infoHashKey": infoHashKey,
"peerID": peerID,
})
logger.Trace().
Str("infoHashKey", infoHashKey).
Str("peerID", peerID).
Msg("add peer")
if err = s.SAdd(context.TODO(), infoHashKey, peerID).Err(); err == nil {
err = s.setPeerTTL(infoHashKey, peerID)
}
@@ -103,10 +109,10 @@ func (s store) addPeer(infoHashKey, peerID string) (err error) {
}
func (s store) delPeer(infoHashKey, peerID string) error {
log.Debug("storage: KeyDB: DeletePeer", log.Fields{
"infoHashKey": infoHashKey,
"peerID": peerID,
})
logger.Trace().
Str("infoHashKey", infoHashKey).
Str("peerID", peerID).
Msg("del peer")
deleted, err := s.SRem(context.TODO(), infoHashKey, peerID).Uint64()
err = r.AsNil(err)
if err == nil && deleted == 0 {
@@ -133,10 +139,10 @@ func (s store) DeleteLeecher(ih bittorrent.InfoHash, peer bittorrent.Peer) error
}
func (s store) GraduateLeecher(ih bittorrent.InfoHash, peer bittorrent.Peer) (err error) {
log.Debug("storage: KeyDB: GraduateLeecher", log.Fields{
"infoHash": ih,
"peer": peer,
})
logger.Trace().
Stringer("infoHash", ih).
Object("peer", peer).
Msg("graduate leecher")
infoHash, peerID := ih.RawString(), peer.RawString()
ihSeederKey := r.InfoHashKey(infoHash, true, peer.Addr().Is6())
ihLeecherKey := r.InfoHashKey(infoHash, false, peer.Addr().Is6())
@@ -153,12 +159,12 @@ func (s store) GraduateLeecher(ih bittorrent.InfoHash, peer bittorrent.Peer) (er
// AnnouncePeers is the same function as redis.AnnouncePeers
func (s store) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant int, v6 bool) ([]bittorrent.Peer, error) {
log.Debug("storage: KeyDB: AnnouncePeers", log.Fields{
"infoHash": ih,
"seeder": seeder,
"numWant": numWant,
"v6": v6,
})
logger.Trace().
Stringer("infoHash", ih).
Bool("seeder", seeder).
Int("numWant", numWant).
Bool("v6", v6).
Msg("announce peers")
return s.GetPeers(ih, seeder, numWant, v6, func(ctx context.Context, infoHashKey string, maxCount int) *redis.StringSliceCmd {
return s.SRandMemberN(context.TODO(), infoHashKey, int64(maxCount))
@@ -167,9 +173,9 @@ func (s store) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant int, v
// ScrapeSwarm is the same function as redis.ScrapeSwarm except `SCard` call instead of `HLen`
func (s store) ScrapeSwarm(ih bittorrent.InfoHash) (leechers uint32, seeders uint32, snatched uint32) {
log.Debug("storage: KeyDB: ScrapeSwarm", log.Fields{
"infoHash": ih,
})
logger.Trace().
Stringer("infoHash", ih).
Msg("scrape swarm")
leechers, seeders = s.CountPeers(ih, s.SCard)
return
}
@@ -182,7 +188,3 @@ func (s *store) Stop() stop.Result {
}
return c.Result()
}
func (s store) LogFields() log.Fields {
return s.logFields
}
+22 -18
View File
@@ -11,6 +11,8 @@ import (
"sync"
"time"
"github.com/rs/zerolog"
"github.com/sot-tech/mochi/bittorrent"
"github.com/sot-tech/mochi/pkg/conf"
"github.com/sot-tech/mochi/pkg/log"
@@ -27,6 +29,8 @@ const (
defaultShardCount = 1024
)
var logger = log.NewLogger(Name)
func init() {
// Register the storage driver.
storage.RegisterBuilder(Name, builder)
@@ -45,12 +49,9 @@ type Config struct {
ShardCount int `cfg:"shard_count"`
}
// LogFields renders the current config as a set of Logrus fields.
func (cfg Config) LogFields() log.Fields {
return log.Fields{
"name": Name,
"shardCount": cfg.ShardCount,
}
// MarshalZerologObject writes configuration into zerolog event
func (cfg Config) MarshalZerologObject(e *zerolog.Event) {
e.Int("shardCount", cfg.ShardCount)
}
// Validate sanity checks values set in a config and returns a new config with
@@ -62,11 +63,11 @@ func (cfg Config) Validate() Config {
if cfg.ShardCount <= 0 || cfg.ShardCount > (math.MaxInt/2) {
validcfg.ShardCount = defaultShardCount
log.Warn("falling back to default configuration", log.Fields{
"name": Name + ".ShardCount",
"provided": cfg.ShardCount,
"default": validcfg.ShardCount,
})
log.Warn().
Str("name", "ShardCount").
Int("provided", cfg.ShardCount).
Int("default", validcfg.ShardCount).
Msg("falling back to default configuration")
}
return validcfg
@@ -111,6 +112,11 @@ type peerStore struct {
wg sync.WaitGroup
}
// MarshalZerologObject writes configuration into zerolog event
func (ps *peerStore) MarshalZerologObject(e *zerolog.Event) {
e.Str("type", Name).Object("config", ps.cfg)
}
var _ storage.PeerStorage = &peerStore{}
func (ps *peerStore) ScheduleGC(gcInterval, peerLifeTime time.Duration) {
@@ -125,10 +131,12 @@ func (ps *peerStore) ScheduleGC(gcInterval, peerLifeTime time.Duration) {
return
case <-t.C:
before := time.Now().Add(-peerLifeTime)
log.Debug("storage: Memory purging peers with no announces since", log.Fields{"before": before})
logger.Trace().Time("before", before).Msg("purging peers with no announces")
start := time.Now()
ps.gc(before)
storage.PromGCDurationMilliseconds.Observe(float64(time.Since(start).Nanoseconds()) / float64(time.Millisecond))
duration := time.Since(start)
logger.Debug().Dur("timeTaken", duration).Msg("gc complete")
storage.PromGCDurationMilliseconds.Observe(float64(duration.Milliseconds()))
}
}
}()
@@ -162,7 +170,7 @@ func (ps *peerStore) ScheduleStatisticsCollection(reportInterval time.Duration)
storage.PromInfoHashesCount.Set(float64(numInfohashes))
storage.PromSeedersCount.Set(float64(numSeeders))
storage.PromLeechersCount.Set(float64(numLeechers))
log.Debug("storage: Memory: populateProm() finished", log.Fields{"timeTaken": time.Since(before)})
logger.Debug().TimeDiff("timeTaken", time.Now(), before).Msg("populate prom complete")
}
}
}
@@ -540,7 +548,3 @@ func (ps *peerStore) Stop() stop.Result {
return c.Result()
}
func (ps *peerStore) LogFields() log.Fields {
return ps.cfg.LogFields()
}
+109 -127
View File
@@ -28,6 +28,7 @@ import (
"time"
"github.com/go-redis/redis/v8"
"github.com/rs/zerolog"
"github.com/sot-tech/mochi/bittorrent"
"github.com/sot-tech/mochi/pkg/conf"
@@ -64,8 +65,11 @@ const (
CountLeecherKey = "CHI_C_L"
)
// ErrSentinelAndClusterChecked returned from initializer if both Config.Sentinel and Config.Cluster provided
var ErrSentinelAndClusterChecked = errors.New("unable to use both cluster and sentinel mode")
var (
logger = log.NewLogger(Name)
// ErrSentinelAndClusterChecked returned from initializer if both Config.Sentinel and Config.Cluster provided
ErrSentinelAndClusterChecked = errors.New("unable to use both cluster and sentinel mode")
)
func init() {
// Register the storage builder.
@@ -98,7 +102,6 @@ func newStore(cfg Config) (*store, error) {
Connection: rs,
closed: make(chan any),
wg: sync.WaitGroup{},
logFields: cfg.LogFields(),
}, nil
}
@@ -118,21 +121,18 @@ type Config struct {
ConnectTimeout time.Duration `cfg:"connect_timeout"`
}
// LogFields renders the current config as a set of Logrus fields.
func (cfg Config) LogFields() log.Fields {
return log.Fields{
"name": Name,
"peerLifetime": cfg.PeerLifetime,
"addresses": cfg.Addresses,
"db": cfg.DB,
"poolSize": cfg.PoolSize,
"sentinel": cfg.Sentinel,
"sentinelMaster": cfg.SentinelMaster,
"cluster": cfg.Cluster,
"readTimeout": cfg.ReadTimeout,
"writeTimeout": cfg.WriteTimeout,
"connectTimeout": cfg.ConnectTimeout,
}
// MarshalZerologObject writes configuration fields into zerolog event
func (cfg Config) MarshalZerologObject(e *zerolog.Event) {
e.Strs("addresses", cfg.Addresses).
Int("db", cfg.DB).
Int("poolSize", cfg.PoolSize).
Bool("sentinel", cfg.Sentinel).
Str("sentinelMaster", cfg.SentinelMaster).
Bool("cluster", cfg.Cluster).
Dur("readTimeout", cfg.ReadTimeout).
Dur("writeTimeout", cfg.WriteTimeout).
Dur("connectTimeout", cfg.ConnectTimeout).
Dur("peerLifetime", cfg.PeerLifetime)
}
// Validate sanity checks values set in a config and returns a new config with
@@ -157,38 +157,38 @@ func (cfg Config) Validate() (Config, error) {
validCfg.Addresses = addresses
if len(cfg.Addresses) == 0 {
validCfg.Addresses = []string{defaultRedisAddress}
log.Warn("falling back to default configuration", log.Fields{
"name": Name + ".Addresses",
"provided": cfg.Addresses,
"default": validCfg.Addresses,
})
logger.Warn().
Str("name", "Addresses").
Strs("provided", cfg.Addresses).
Strs("default", validCfg.Addresses).
Msg("falling back to default configuration")
}
if cfg.ReadTimeout <= 0 {
validCfg.ReadTimeout = defaultReadTimeout
log.Warn("falling back to default configuration", log.Fields{
"name": Name + ".ReadTimeout",
"provided": cfg.ReadTimeout,
"default": validCfg.ReadTimeout,
})
logger.Warn().
Str("name", "ReadTimeout").
Dur("provided", cfg.ReadTimeout).
Dur("default", validCfg.ReadTimeout).
Msg("falling back to default configuration")
}
if cfg.WriteTimeout <= 0 {
validCfg.WriteTimeout = defaultWriteTimeout
log.Warn("falling back to default configuration", log.Fields{
"name": Name + ".WriteTimeout",
"provided": cfg.WriteTimeout,
"default": validCfg.WriteTimeout,
})
logger.Warn().
Str("name", "WriteTimeout").
Dur("provided", cfg.WriteTimeout).
Dur("default", validCfg.WriteTimeout).
Msg("falling back to default configuration")
}
if cfg.ConnectTimeout <= 0 {
validCfg.ConnectTimeout = defaultConnectTimeout
log.Warn("falling back to default configuration", log.Fields{
"name": Name + ".ConnectTimeout",
"provided": cfg.ConnectTimeout,
"default": validCfg.ConnectTimeout,
})
logger.Warn().
Str("name", "ConnectTimeout").
Dur("provided", cfg.ConnectTimeout).
Dur("default", validCfg.ConnectTimeout).
Msg("falling back to default configuration")
}
return validCfg, nil
@@ -238,7 +238,13 @@ func (cfg Config) Connect() (con Connection, err error) {
_ = rs.Close()
rs = nil
}
return Connection{rs}, err
cfg.Login, cfg.Password = "", ""
return Connection{rs, cfg}, err
}
// MarshalZerologObject writes configuration into zerolog event
func (ps *store) MarshalZerologObject(e *zerolog.Event) {
e.Str("type", Name).Object("config", ps.Config)
}
func (ps *store) ScheduleGC(gcInterval, peerLifeTime time.Duration) {
@@ -254,9 +260,9 @@ func (ps *store) ScheduleGC(gcInterval, peerLifeTime time.Duration) {
case <-t.C:
start := time.Now()
ps.gc(time.Now().Add(-peerLifeTime))
duration := time.Since(start).Milliseconds()
log.Debug("storage: Redis: recordGCDuration", log.Fields{"timeTaken(ms)": duration})
storage.PromGCDurationMilliseconds.Observe(float64(duration))
duration := time.Since(start)
logger.Debug().Dur("timeTaken", duration).Msg("gc complete")
storage.PromGCDurationMilliseconds.Observe(float64(duration.Milliseconds()))
t.Reset(gcInterval)
}
}
@@ -285,7 +291,7 @@ func (ps *store) ScheduleStatisticsCollection(reportInterval time.Duration) {
storage.PromInfoHashesCount.Set(float64(numInfoHashes))
storage.PromSeedersCount.Set(float64(numSeeders))
storage.PromLeechersCount.Set(float64(numLeechers))
log.Debug("storage: Redis: populateProm() finished", log.Fields{"timeTaken": time.Since(before)})
logger.Debug().TimeDiff("timeTaken", time.Now(), before).Msg("populate prom complete")
}
}
}
@@ -295,13 +301,13 @@ func (ps *store) ScheduleStatisticsCollection(reportInterval time.Duration) {
// Connection is wrapper for redis.UniversalClient
type Connection struct {
redis.UniversalClient
Config
}
type store struct {
Connection
closed chan any
wg sync.WaitGroup
logFields log.Fields
closed chan any
wg sync.WaitGroup
}
func (ps *store) count(key string, getLength bool) (n uint64) {
@@ -313,10 +319,7 @@ func (ps *store) count(key string, getLength bool) (n uint64) {
}
err = AsNil(err)
if err != nil {
log.Error("storage: Redis: GET/SCARD failure", log.Fields{
"key": key,
"error": err,
})
logger.Error().Err(err).Str("key", key).Msg("storage: Redis: GET/SCARD failure")
}
return
}
@@ -375,11 +378,10 @@ func InfoHashKey(infoHash string, seeder, v6 bool) (infoHashKey string) {
}
func (ps *store) putPeer(infoHashKey, peerCountKey, peerID string) error {
log.Debug("storage: Redis: PutPeer", log.Fields{
"infoHashKey": infoHashKey,
"peerCountKey": peerCountKey,
"peerID": peerID,
})
logger.Trace().
Str("infoHashKey", infoHashKey).
Str("peerID", peerID).
Msg("put peer")
return ps.tx(func(tx redis.Pipeliner) (err error) {
if err = tx.HSet(context.TODO(), infoHashKey, peerID, ps.getClock()).Err(); err != nil {
return
@@ -393,11 +395,10 @@ func (ps *store) putPeer(infoHashKey, peerCountKey, peerID string) error {
}
func (ps *store) delPeer(infoHashKey, peerCountKey, peerID string) error {
log.Debug("storage: Redis: DeletePeer", log.Fields{
"infoHashKey": infoHashKey,
"peerCountKey": peerCountKey,
"peerID": peerID,
})
logger.Trace().
Str("infoHashKey", infoHashKey).
Str("peerID", peerID).
Msg("del peer")
deleted, err := ps.HDel(context.TODO(), infoHashKey, peerID).Uint64()
err = AsNil(err)
if err == nil {
@@ -428,10 +429,10 @@ func (ps *store) DeleteLeecher(ih bittorrent.InfoHash, peer bittorrent.Peer) err
}
func (ps *store) GraduateLeecher(ih bittorrent.InfoHash, peer bittorrent.Peer) error {
log.Debug("storage: Redis: GraduateLeecher", log.Fields{
"infoHash": ih,
"peer": peer,
})
logger.Trace().
Stringer("infoHash", ih).
Object("peer", peer).
Msg("graduate leecher")
infoHash, peerID, isV6 := ih.RawString(), peer.RawString(), peer.Addr().Is6()
ihSeederKey, ihLeecherKey := InfoHashKey(infoHash, true, isV6), InfoHashKey(infoHash, false, isV6)
@@ -465,7 +466,7 @@ func (ps Connection) parsePeersList(peersResult *redis.StringSliceCmd) (peers []
if p, err := bittorrent.NewPeer(peerID); err == nil {
peers = append(peers, p)
} else {
log.Error("storage: Redis: unable to decode leecher", log.Fields{"peerID": peerID})
logger.Error().Err(err).Str("peerID", peerID).Msg("unable to decode peer")
}
}
}
@@ -506,22 +507,19 @@ func (ps Connection) GetPeers(ih bittorrent.InfoHash, forSeeder bool, maxCount i
}
} else if l > 0 {
err = nil
log.Warn("storage: Redis: error occurred while retrieving peers", log.Fields{
"infoHash": infoHash,
"error": err,
})
logger.Warn().Err(err).Stringer("infoHash", ih).Msg("error occurred while retrieving peers")
}
return
}
func (ps *store) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant int, v6 bool) ([]bittorrent.Peer, error) {
log.Debug("storage: Redis: AnnouncePeers", log.Fields{
"infoHash": ih,
"seeder": seeder,
"numWant": numWant,
"peer": v6,
})
logger.Trace().
Stringer("infoHash", ih).
Bool("seeder", seeder).
Int("numWant", numWant).
Bool("v6", v6).
Msg("announce peers")
return ps.GetPeers(ih, seeder, numWant, v6, func(ctx context.Context, infoHashKey string, maxCount int) *redis.StringSliceCmd {
return ps.HRandField(ctx, infoHashKey, maxCount, false)
@@ -534,10 +532,7 @@ func (ps Connection) countPeers(infoHashKey string, countFn getPeerCountFn) uint
count, err := countFn(context.TODO(), infoHashKey).Result()
err = AsNil(err)
if err != nil {
log.Error("storage: Redis: key size calculation failure", log.Fields{
"infoHashKey": infoHashKey,
"error": err,
})
logger.Error().Err(err).Str("infoHashKey", infoHashKey).Msg("key size calculation failure")
}
return uint32(count)
}
@@ -555,9 +550,9 @@ func (ps Connection) CountPeers(ih bittorrent.InfoHash, countFn getPeerCountFn)
}
func (ps *store) ScrapeSwarm(ih bittorrent.InfoHash) (leechers uint32, seeders uint32, snatched uint32) {
log.Debug("storage: Redis ScrapeSwarm", log.Fields{
"infoHash": ih,
})
logger.Trace().
Stringer("infoHash", ih).
Msg("scrape swarm")
leechers, seeders = ps.CountPeers(ih, ps.HLen)
@@ -579,7 +574,7 @@ func (ps Connection) Put(ctx string, values ...storage.Entry) (err error) {
err = ps.HSet(context.TODO(), PrefixKey+ctx, args...).Err()
if err != nil {
if strings.Contains(err.Error(), argNumErrorMsg) {
log.Warn("This Redis version/implementation does not support variadic arguments for HSET")
logger.Warn().Msg("This Redis version/implementation does not support variadic arguments for HSET")
for _, p := range values {
if err = ps.HSet(context.TODO(), PrefixKey+ctx, p.Key, p.Value).Err(); err != nil {
break
@@ -613,7 +608,7 @@ func (ps Connection) Delete(ctx string, keys ...string) (err error) {
err = AsNil(ps.HDel(context.TODO(), PrefixKey+ctx, keys...).Err())
if err != nil {
if strings.Contains(err.Error(), argNumErrorMsg) {
log.Warn("This Redis version/implementation does not support variadic arguments for HDEL")
logger.Warn().Msg("This Redis version/implementation does not support variadic arguments for HDEL")
for _, k := range keys {
if err = AsNil(ps.HDel(context.TODO(), PrefixKey+ctx, k).Err()); err != nil {
break
@@ -676,7 +671,6 @@ func (Connection) Preservable() bool {
// transaction. The infohash key will remain in the addressFamil hash and
// we'll attempt to clean it up the next time gc runs.
func (ps *store) gc(cutoff time.Time) {
log.Debug("storage: Redis: purging peers with no announces since", log.Fields{"before": cutoff})
cutoffNanos := cutoff.UnixNano()
// list all infoHashKeys in the group
infoHashKeys, err := ps.SMembers(context.Background(), IHKey).Result()
@@ -690,7 +684,7 @@ func (ps *store) gc(cutoff time.Time) {
} else if strings.HasPrefix(infoHashKey, IH4LeecherKey) || strings.HasPrefix(infoHashKey, IH6LeecherKey) {
cntKey = CountLeecherKey
} else {
log.Warn("storage: Redis: unexpected record found in info hash set", log.Fields{"infoHashKey": infoHashKey})
logger.Warn().Str("infoHashKey", infoHashKey).Msg("unexpected record found in info hash set")
continue
}
// list all (peer, timeout) pairs for the ih
@@ -701,16 +695,15 @@ func (ps *store) gc(cutoff time.Time) {
for peerID, timeStamp := range peerList {
if mtime, err := strconv.ParseInt(timeStamp, 10, 64); err == nil {
if mtime <= cutoffNanos {
log.Debug("storage: Redis: adding peer to remove list", log.Fields{"peerID": peerID})
logger.Trace().Str("peerID", peerID).Msg("adding peer to remove list")
peersToRemove = append(peersToRemove, peerID)
}
} else {
log.Error("storage: Redis: unable to decode peer timestamp", log.Fields{
"infoHashKey": infoHashKey,
"peerID": peerID,
"timestamp": timeStamp,
"error": err,
})
logger.Error().Err(err).
Str("infoHashKey", infoHashKey).
Str("peerID", peerID).
Str("timestamp", timeStamp).
Msg("unable to decode peer timestamp")
}
}
if len(peersToRemove) > 0 {
@@ -718,35 +711,32 @@ func (ps *store) gc(cutoff time.Time) {
err = AsNil(err)
if err != nil {
if strings.Contains(err.Error(), argNumErrorMsg) {
log.Warn("This Redis version/implementation does not support variadic arguments for HDEL")
logger.Warn().Msg("This Redis version/implementation does not support variadic arguments for HDEL")
for _, k := range peersToRemove {
count, err := ps.HDel(context.Background(), infoHashKey, k).Result()
err = AsNil(err)
if err != nil {
log.Error("storage: Redis: unable to delete peer", log.Fields{
"infoHashKey": infoHashKey,
"peerID": k,
"error": err,
})
logger.Error().Err(err).
Str("infoHashKey", infoHashKey).
Str("peerID", k).
Msg("unable to delete peer")
} else {
removedPeerCount += count
}
}
} else {
log.Error("storage: Redis: unable to delete peers", log.Fields{
"infoHashKey": infoHashKey,
"peerIds": peersToRemove,
"error": err,
})
logger.Error().Err(err).
Str("infoHashKey", infoHashKey).
Strs("peerIDs", peersToRemove).
Msg("unable to delete peers")
}
}
if removedPeerCount > 0 { // DECR seeder/leecher counter
if err = ps.DecrBy(context.Background(), cntKey, removedPeerCount).Err(); err != nil {
log.Error("storage: Redis: unable to decrement seeder/leecher peer count", log.Fields{
"infoHashKey": infoHashKey,
"countKey": cntKey,
"error": err,
})
logger.Error().Err(err).
Str("infoHashKey", infoHashKey).
Str("countKey", cntKey).
Msg("unable to decrement seeder/leecher peer count")
}
}
}
@@ -764,20 +754,20 @@ func (ps *store) gc(cutoff time.Time) {
return err
}, infoHashKey))
if err != nil {
log.Error("storage: Redis: unable to clean info hash records", log.Fields{
"infoHashKey": infoHashKey,
"error": err,
})
logger.Error().Err(err).
Str("infoHashKey", infoHashKey).
Msg("unable to clean info hash records")
}
} else {
log.Error("storage: Redis: unable to fetch info hash peers", log.Fields{
"infoHashKey": infoHashKey,
"error": err,
})
logger.Error().Err(err).
Str("infoHashKey", infoHashKey).
Msg("unable to fetch info hash peers")
}
}
} else {
log.Error("storage: Redis: unable to fetch info hash set", log.Fields{"hashSet": IHKey, "error": err})
logger.Error().Err(err).
Str("hashSet", IHKey).
Msg("unable to fetch info hash peers")
}
}
@@ -790,7 +780,7 @@ func (ps *store) Stop() stop.Result {
ps.wg.Wait()
var err error
if ps.UniversalClient != nil {
log.Info("storage: Redis: exiting. mochi does not clear data in redis when exiting. mochi keys have prefix " + PrefixKey)
logger.Info().Msg("storage: Redis: exiting. mochi does not clear data in redis when exiting. mochi keys have prefix " + PrefixKey)
err = ps.UniversalClient.Close()
ps.UniversalClient = nil
}
@@ -799,11 +789,3 @@ func (ps *store) Stop() stop.Result {
return c.Result()
}
func (ps *store) LogFields() log.Fields {
fields := make(log.Fields, len(ps.logFields))
for k, v := range ps.logFields {
fields[k] = v
}
return fields
}
+22 -5
View File
@@ -70,7 +70,6 @@ func (c Config) sanitizeStatisticsConfig() (statInterval time.Duration) {
Dur("provided", c.PrometheusReportingInterval).
Dur("default", defaultPrometheusReportingInterval).
Msg("falling back to default configuration")
}
return
}
@@ -262,15 +261,33 @@ func NewStorage(name string, cfg conf.MapConfig) (ps PeerStorage, err error) {
}
if gc, isOk := ps.(GCAware); isOk {
gc.ScheduleGC(c.sanitizeGCConfig())
gcInterval, peerTTL := c.sanitizeGCConfig()
logger.Info().
Str("type", name).
Dur("gcInterval", gcInterval).
Dur("peerTTL", peerTTL).
Msg("scheduling GC")
gc.ScheduleGC(gcInterval, peerTTL)
} else {
logger.Debug().
Str("type", name).
Msg("storage does not support GC")
}
if statInterval := c.sanitizeStatisticsConfig(); statInterval > 0 {
if st, isOk := ps.(StatisticsAware); isOk {
if st, isOk := ps.(StatisticsAware); isOk {
if statInterval := c.sanitizeStatisticsConfig(); statInterval > 0 {
logger.Info().
Str("type", name).
Dur("statInterval", statInterval).
Msg("scheduling statistics collection")
st.ScheduleStatisticsCollection(statInterval)
} else {
logger.Info().Str("type", name).Msg("statistics collection disabled because of zero reporting interval")
}
} else {
logger.Info().Msg("prometheus disabled because of zero reporting interval")
logger.Debug().
Str("type", name).
Msg("storage does not support statistics collection")
}
return
+10 -1
View File
@@ -9,14 +9,23 @@ import (
"github.com/stretchr/testify/require"
"github.com/sot-tech/mochi/bittorrent"
"github.com/sot-tech/mochi/pkg/log"
"github.com/sot-tech/mochi/storage"
)
func init() {
_ = log.ConfigureLogger("", "warn", false, false)
}
// PeerEqualityFunc is the boolean function to use to check two Peers for
// equality.
// Depending on the implementation of the PeerStorage, this can be changed to
// use (Peer).EqualEndpoint instead.
var PeerEqualityFunc = func(p1, p2 bittorrent.Peer) bool { return p1.Equal(p2) }
var PeerEqualityFunc = func(p1, p2 bittorrent.Peer) bool {
return p1.Port() == p2.Port() &&
p1.Addr().Compare(p1.Addr()) == 0 &&
p1.ID == p2.ID
}
type testHolder struct {
st storage.PeerStorage