diff --git a/middleware/torrentapproval/torrentapproval.go b/middleware/torrentapproval/torrentapproval.go
index 1d43e68..2bed5f5 100644
--- a/middleware/torrentapproval/torrentapproval.go
+++ b/middleware/torrentapproval/torrentapproval.go
@@ -54,7 +54,7 @@ func build(options conf.MapConfig, st storage.PeerStorage) (h middleware.Hook, e
 	var ds storage.DataStorage = st
 	if !cfg.Preserve && ds.Preservable() {
-		ds = memory.NewDataStore()
+		ds = memory.NewDataStorage()
 	}
 
 	var c container.Container
diff --git a/storage/keydb/storage.go b/storage/keydb/storage.go
new file mode 100644
index 0000000..3ea4ae1
--- /dev/null
+++ b/storage/keydb/storage.go
@@ -0,0 +1,126 @@
+// Package keydb implements the storage interface.
+// This storage is mostly the same as redis, but it collects peers
+// in sets rather than hashes and uses the KeyDB-specific command
+// `EXPIREMEMBER`, so it does not need garbage collection.
+// Note: this storage also does not support statistics collection.
+package keydb
+
+import (
+	"context"
+	"errors"
+
+	"github.com/go-redis/redis/v8"
+
+	"github.com/sot-tech/mochi/bittorrent"
+	"github.com/sot-tech/mochi/pkg/conf"
+	"github.com/sot-tech/mochi/pkg/log"
+	"github.com/sot-tech/mochi/pkg/stop"
+	"github.com/sot-tech/mochi/storage"
+	r "github.com/sot-tech/mochi/storage/redis"
+)
+
+// Name is the name of this storage
+const (
+	Name            = "keydb"
+	expireMemberCmd = "EXPIREMEMBER"
+)
+
+// ErrNotKeyDB is returned from the initializer if the connected instance
+// does not support the KeyDB-specific command (EXPIREMEMBER)
+var ErrNotKeyDB = errors.New("provided instance seems not KeyDB")
+
+func init() {
+	// Register the storage driver.
+	storage.RegisterBuilder(Name, Builder)
+}
+
+// Builder creates a keydb PeerStorage from the provided configuration.
+func Builder(icfg conf.MapConfig) (storage.PeerStorage, error) {
+	var cfg r.Config
+	var err error
+
+	if err = icfg.Unmarshal(&cfg); err != nil {
+		return nil, err
+	}
+
+	return New(cfg)
+}
+
+// New creates a new PeerStorage backed by KeyDB.
+func New(cfg r.Config) (*store, error) {
+	var err error
+	if cfg, err = cfg.Validate(); err != nil {
+		return nil, err
+	}
+
+	var rs r.Connection
+
+	if rs, err = cfg.Connect(); err != nil {
+		return nil, err
+	}
+
+	cmd := redis.NewCommandsInfoCmd(context.Background(), "COMMAND", "INFO", expireMemberCmd)
+	_ = rs.Process(context.Background(), cmd)
+	err = r.AsNil(cmd.Err())
+	if err == nil && len(cmd.Val()) == 0 {
+		err = ErrNotKeyDB
+	}
+
+	var st *store
+	if err == nil {
+		st = &store{rs, cfg.LogFields()}
+	}
+
+	return st, err
+}
+
+type store struct {
+	r.Connection
+	logFields log.Fields
+}
+
+func (s store) PutSeeder(infoHash bittorrent.InfoHash, peer bittorrent.Peer) error {
+	// TODO implement me
+	panic("implement me")
+}
+
+func (s store) DeleteSeeder(infoHash bittorrent.InfoHash, peer bittorrent.Peer) error {
+	// TODO implement me
+	panic("implement me")
+}
+
+func (s store) PutLeecher(infoHash bittorrent.InfoHash, peer bittorrent.Peer) error {
+	// TODO implement me
+	panic("implement me")
+}
+
+func (s store) DeleteLeecher(infoHash bittorrent.InfoHash, peer bittorrent.Peer) error {
+	// TODO implement me
+	panic("implement me")
+}
+
+func (s store) GraduateLeecher(infoHash bittorrent.InfoHash, peer bittorrent.Peer) error {
+	// TODO implement me
+	panic("implement me")
+}
+
+func (s store) AnnouncePeers(infoHash bittorrent.InfoHash, seeder bool, numWant int, peer bittorrent.Peer) (peers []bittorrent.Peer, err error) {
+	// TODO implement me
+	panic("implement me")
+}
+
+func (s store) ScrapeSwarm(infoHash bittorrent.InfoHash, peer bittorrent.Peer) bittorrent.Scrape {
+	// TODO implement me
+	panic("implement me")
+}
+
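The peer methods above are still panic("implement me") stubs. Per the package comment, the intent is to keep each swarm in a plain set and let KeyDB expire members individually, so no GC goroutine is needed. A rough sketch of how a put/delete pair could look under that approach; the key layout, the ttlSeconds parameter and the helper names are assumptions for illustration, not code from this change:

// Hypothetical helpers for this package; key layout and TTL handling are assumed.
func (s store) putPeerSketch(infoHashKey, peerID string, ttlSeconds int64) error {
	ctx := context.Background()
	// Record the peer in the per-infohash set.
	if err := s.SAdd(ctx, infoHashKey, peerID).Err(); err != nil {
		return err
	}
	// KeyDB-specific: EXPIREMEMBER key member seconds expires only this member,
	// so stale peers disappear without a separate garbage-collection sweep.
	return s.Do(ctx, expireMemberCmd, infoHashKey, peerID, ttlSeconds).Err()
}

func (s store) delPeerSketch(infoHashKey, peerID string) error {
	// Removing the member is enough; an emptied set needs no further cleanup here.
	return s.SRem(context.Background(), infoHashKey, peerID).Err()
}

PutSeeder/PutLeecher could then delegate to such a helper with a per-swarm key, for example reusing the exported IHSeederKey/IHLeecherKey prefixes from the redis package, mirroring how the redis store splits seeders and leechers.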
+func (s *store) Stop() stop.Result { + c := make(stop.Channel) + if s.UniversalClient != nil { + c.Done(s.UniversalClient.Close()) + s.UniversalClient = nil + } + return c.Result() +} + +func (s store) LogFields() log.Fields { + return s.logFields +} diff --git a/storage/memory/storage.go b/storage/memory/storage.go index dd27c38..f8823b3 100644 --- a/storage/memory/storage.go +++ b/storage/memory/storage.go @@ -21,25 +21,19 @@ import ( "github.com/sot-tech/mochi/storage" ) -// Name is the name by which this peer store is registered with Conf. -const Name = "memory" - // Default config constants. const ( - defaultShardCount = 1024 - defaultPrometheusReportingInterval = time.Second * 1 - defaultGarbageCollectionInterval = time.Minute * 3 - defaultPeerLifetime = time.Minute * 30 + // Name is the name by which this peer store is registered with Conf. + Name = "memory" + defaultShardCount = 1024 ) func init() { // Register the storage driver. - storage.RegisterDriver(Name, driver{}) + storage.RegisterBuilder(Name, Builder) } -type driver struct{} - -func (d driver) NewStorage(icfg conf.MapConfig) (storage.PeerStorage, error) { +func Builder(icfg conf.MapConfig) (storage.PeerStorage, error) { var cfg Config if err := icfg.Unmarshal(&cfg); err != nil { return nil, err @@ -49,20 +43,14 @@ func (d driver) NewStorage(icfg conf.MapConfig) (storage.PeerStorage, error) { // Config holds the configuration of a memory PeerStorage. type Config struct { - GarbageCollectionInterval time.Duration `cfg:"gc_interval"` - PrometheusReportingInterval time.Duration `cfg:"prometheus_reporting_interval"` - PeerLifetime time.Duration `cfg:"peer_lifetime"` - ShardCount int `cfg:"shard_count"` + ShardCount int `cfg:"shard_count"` } // LogFields renders the current config as a set of Logrus fields. 
func (cfg Config) LogFields() log.Fields { return log.Fields{ - "name": Name, - "gcInterval": cfg.GarbageCollectionInterval, - "promReportInterval": cfg.PrometheusReportingInterval, - "peerLifetime": cfg.PeerLifetime, - "shardCount": cfg.ShardCount, + "Name": Name, + "ShardCount": cfg.ShardCount, } } @@ -76,36 +64,9 @@ func (cfg Config) Validate() Config { if cfg.ShardCount <= 0 || cfg.ShardCount > (math.MaxInt/2) { validcfg.ShardCount = defaultShardCount log.Warn("falling back to default configuration", log.Fields{ - "name": Name + ".ShardCount", - "provided": cfg.ShardCount, - "default": validcfg.ShardCount, - }) - } - - if cfg.GarbageCollectionInterval <= 0 { - validcfg.GarbageCollectionInterval = defaultGarbageCollectionInterval - log.Warn("falling back to default configuration", log.Fields{ - "name": Name + ".GarbageCollectionInterval", - "provided": cfg.GarbageCollectionInterval, - "default": validcfg.GarbageCollectionInterval, - }) - } - - if cfg.PrometheusReportingInterval < 0 { - validcfg.PrometheusReportingInterval = defaultPrometheusReportingInterval - log.Warn("falling back to default configuration", log.Fields{ - "name": Name + ".PrometheusReportingInterval", - "provided": cfg.PrometheusReportingInterval, - "default": validcfg.PrometheusReportingInterval, - }) - } - - if cfg.PeerLifetime <= 0 { - validcfg.PeerLifetime = defaultPeerLifetime - log.Warn("falling back to default configuration", log.Fields{ - "name": Name + ".PeerLifetime", - "provided": cfg.PeerLifetime, - "default": validcfg.PeerLifetime, + "Name": Name + ".ShardCount", + "Provided": cfg.ShardCount, + "Default": validcfg.ShardCount, }) } @@ -118,7 +79,7 @@ func NewPeerStorage(provided Config) (storage.PeerStorage, error) { ps := &peerStore{ cfg: cfg, shards: make([]*peerShard, cfg.ShardCount*2), - DataStorage: NewDataStore(), + DataStorage: NewDataStorage(), closed: make(chan struct{}), } @@ -126,50 +87,6 @@ func NewPeerStorage(provided Config) (storage.PeerStorage, error) { ps.shards[i] = &peerShard{swarms: make(map[bittorrent.InfoHash]swarm)} } - // Start a goroutine for garbage collection. - ps.wg.Add(1) - go func() { - defer ps.wg.Done() - t := time.NewTimer(cfg.GarbageCollectionInterval) - defer t.Stop() - for { - select { - case <-ps.closed: - return - case <-t.C: - before := time.Now().Add(-cfg.PeerLifetime) - log.Debug("storage: purging peers with no announces since", log.Fields{"before": before}) - start := time.Now() - ps.GC(before) - recordGCDuration(time.Since(start)) - } - } - }() - - if cfg.PrometheusReportingInterval > 0 { - // Start a goroutine for reporting statistics to Prometheus. - ps.wg.Add(1) - go func() { - defer ps.wg.Done() - t := time.NewTicker(cfg.PrometheusReportingInterval) - for { - select { - case <-ps.closed: - t.Stop() - return - case <-t.C: - if metrics.Enabled() { - before := time.Now() - ps.populateProm() - log.Debug("storage: populateProm() finished", log.Fields{"timeTaken": time.Since(before)}) - } - } - } - }() - } else { - log.Info("prometheus disabled because of zero reporting interval") - } - return ps, nil } @@ -197,22 +114,60 @@ type peerStore struct { var _ storage.PeerStorage = &peerStore{} -// populateProm aggregates metrics over all shards and then posts them to -// prometheus. 
-func (ps *peerStore) populateProm() { - var numInfohashes, numSeeders, numLeechers uint64 +func (ps *peerStore) ScheduleGC(gcInterval, peerLifeTime time.Duration) { + ps.wg.Add(1) + go func() { + defer ps.wg.Done() + t := time.NewTimer(gcInterval) + defer t.Stop() + for { + select { + case <-ps.closed: + return + case <-t.C: + before := time.Now().Add(-peerLifeTime) + log.Debug("storage: Memory purging peers with no announces since", log.Fields{"before": before}) + start := time.Now() + ps.gc(before) + recordGCDuration(time.Since(start)) + } + } + }() +} - for _, s := range ps.shards { - s.RLock() - numInfohashes += uint64(len(s.swarms)) - numSeeders += s.numSeeders - numLeechers += s.numLeechers - s.RUnlock() - } +func (ps *peerStore) ScheduleStatisticsCollection(reportInterval time.Duration) { + ps.wg.Add(1) + go func() { + defer ps.wg.Done() + t := time.NewTicker(reportInterval) + for { + select { + case <-ps.closed: + t.Stop() + return + case <-t.C: + if metrics.Enabled() { + before := time.Now() + // aggregates metrics over all shards and then posts them to + // prometheus. + var numInfohashes, numSeeders, numLeechers uint64 - storage.PromInfoHashesCount.Set(float64(numInfohashes)) - storage.PromSeedersCount.Set(float64(numSeeders)) - storage.PromLeechersCount.Set(float64(numLeechers)) + for _, s := range ps.shards { + s.RLock() + numInfohashes += uint64(len(s.swarms)) + numSeeders += s.numSeeders + numLeechers += s.numLeechers + s.RUnlock() + } + + storage.PromInfoHashesCount.Set(float64(numInfohashes)) + storage.PromSeedersCount.Set(float64(numSeeders)) + storage.PromLeechersCount.Set(float64(numLeechers)) + log.Debug("storage: Memory: populateProm() finished", log.Fields{"timeTaken": time.Since(before)}) + } + } + } + }() } // recordGCDuration records the duration of a GC sweep. @@ -485,8 +440,8 @@ func (ps *peerStore) ScrapeSwarm(ih bittorrent.InfoHash, peer bittorrent.Peer) ( return } -// NewDataStore creates new in-memory data store -func NewDataStore() storage.DataStorage { +// NewDataStorage creates new in-memory data store +func NewDataStorage() storage.DataStorage { return new(dataStore) } @@ -553,7 +508,7 @@ func (*dataStore) Preservable() bool { // // This function must be able to execute while other methods on this interface // are being executed in parallel. -func (ps *peerStore) GC(cutoff time.Time) { +func (ps *peerStore) gc(cutoff time.Time) { select { case <-ps.closed: return diff --git a/storage/memory/storage_test.go b/storage/memory/storage_test.go index 2f86f4c..c8e3b7f 100644 --- a/storage/memory/storage_test.go +++ b/storage/memory/storage_test.go @@ -2,19 +2,13 @@ package memory import ( "testing" - "time" "github.com/sot-tech/mochi/storage" "github.com/sot-tech/mochi/storage/test" ) func createNew() storage.PeerStorage { - ps, err := NewPeerStorage(Config{ - ShardCount: 1024, - GarbageCollectionInterval: 10 * time.Minute, - PrometheusReportingInterval: 10 * time.Minute, - PeerLifetime: 30 * time.Minute, - }) + ps, err := NewPeerStorage(Config{ShardCount: 1024}) if err != nil { panic(err) } diff --git a/storage/redis/storage.go b/storage/redis/storage.go index 3d23631..276f408 100644 --- a/storage/redis/storage.go +++ b/storage/redis/storage.go @@ -1,4 +1,4 @@ -// Package redis implements the storage interface for a Conf +// Package redis implements the storage interface. // BitTorrent tracker keeping peer data in redis with hash. 
// There two categories of hash: // @@ -10,7 +10,7 @@ // To save all the infohashes, used for garbage collection, // metrics aggregation and leecher graduation // -// Tree keys are used to record the count of seeders and leechers. +// Two keys are used to record the count of seeders and leechers. // // - CHI_C_S (key type) // To record the number of seeders. @@ -38,37 +38,37 @@ import ( "github.com/sot-tech/mochi/storage" ) -// Name is the name by which this peer store is registered with Conf. -const Name = "redis" - -// Default config constants. const ( - defaultPrometheusReportingInterval = time.Second * 1 - defaultGarbageCollectionInterval = time.Minute * 3 - defaultPeerLifetime = time.Minute * 30 - defaultRedisAddress = "127.0.0.1:6379" - defaultReadTimeout = time.Second * 15 - defaultWriteTimeout = time.Second * 15 - defaultConnectTimeout = time.Second * 15 - prefixKey = "CHI_" - ihKey = "CHI_I" - ihSeederKey = "CHI_S_" - ihLeecherKey = "CHI_L_" - cntSeederKey = "CHI_C_S" - cntLeecherKey = "CHI_C_L" + // Name is the name by which this peer store is registered with Conf. + Name = "redis" + // Default config constants. + defaultRedisAddress = "127.0.0.1:6379" + defaultReadTimeout = time.Second * 15 + defaultWriteTimeout = time.Second * 15 + defaultConnectTimeout = time.Second * 15 + // PrefixKey prefix which will be prepended to ctx argument in storage.DataStorage calls + PrefixKey = "CHI_" + // IHKey redis hash key for all info hashes + IHKey = "CHI_I" + // IHSeederKey redis hash key prefix for seeders + IHSeederKey = "CHI_S_" + // IHLeecherKey redis hash key prefix for leechers + IHLeecherKey = "CHI_L_" + // CountSeederKey redis key for seeder count + CountSeederKey = "CHI_C_S" + // CountLeecherKey redis key for leecher count + CountLeecherKey = "CHI_C_L" ) // ErrSentinelAndClusterChecked returned from initializer if both Config.Sentinel and Config.Cluster provided var ErrSentinelAndClusterChecked = errors.New("unable to use both cluster and sentinel mode") func init() { - // Register the storage driver. - storage.RegisterDriver(Name, driver{}) + // Register the storage RedisDriver. + storage.RegisterBuilder(Name, Builder) } -type driver struct{} - -func (d driver) NewStorage(icfg conf.MapConfig) (storage.PeerStorage, error) { +func Builder(icfg conf.MapConfig) (storage.PeerStorage, error) { // Unmarshal the bytes into the proper config type. var cfg Config @@ -79,35 +79,55 @@ func (d driver) NewStorage(icfg conf.MapConfig) (storage.PeerStorage, error) { return New(cfg) } +func New(cfg Config) (*store, error) { + cfg, err := cfg.Validate() + if err != nil { + return nil, err + } + + rs, err := cfg.Connect() + if err != nil { + return nil, err + } + + return &store{ + Connection: rs, + closed: make(chan any), + wg: sync.WaitGroup{}, + logFields: cfg.LogFields(), + }, nil +} + // Config holds the configuration of a redis PeerStorage. 
type Config struct { - GarbageCollectionInterval time.Duration `cfg:"gc_interval"` - PrometheusReportingInterval time.Duration `cfg:"prometheus_reporting_interval"` - PeerLifetime time.Duration `cfg:"peer_lifetime"` - Addresses []string - DB int - PoolSize int `cfg:"pool_size"` - Login string - Password string - Sentinel bool - SentinelMaster string `cfg:"sentinel_master"` - Cluster bool - ReadTimeout time.Duration `cfg:"read_timeout"` - WriteTimeout time.Duration `cfg:"write_timeout"` - ConnectTimeout time.Duration `cfg:"connect_timeout"` + PeerLifetime time.Duration `cfg:"peer_lifetime"` + Addresses []string + DB int + PoolSize int `cfg:"pool_size"` + Login string + Password string + Sentinel bool + SentinelMaster string `cfg:"sentinel_master"` + Cluster bool + ReadTimeout time.Duration `cfg:"read_timeout"` + WriteTimeout time.Duration `cfg:"write_timeout"` + ConnectTimeout time.Duration `cfg:"connect_timeout"` } // LogFields renders the current config as a set of Logrus fields. func (cfg Config) LogFields() log.Fields { return log.Fields{ - "name": Name, - "gcInterval": cfg.GarbageCollectionInterval, - "promReportInterval": cfg.PrometheusReportingInterval, - "peerLifetime": cfg.PeerLifetime, - "addresses": cfg.Addresses, - "readTimeout": cfg.ReadTimeout, - "writeTimeout": cfg.WriteTimeout, - "connectTimeout": cfg.ConnectTimeout, + "Name": Name, + "PeerLifetime": cfg.PeerLifetime, + "Addresses": cfg.Addresses, + "DB": cfg.DB, + "PoolSize": cfg.PoolSize, + "Sentinel": cfg.Sentinel, + "SentinelMaster": cfg.SentinelMaster, + "Cluster": cfg.Cluster, + "ReadTimeout": cfg.ReadTimeout, + "WriteTimeout": cfg.WriteTimeout, + "ConnectTimeout": cfg.ConnectTimeout, } } @@ -134,78 +154,47 @@ func (cfg Config) Validate() (Config, error) { if len(cfg.Addresses) == 0 { validCfg.Addresses = []string{defaultRedisAddress} log.Warn("falling back to default configuration", log.Fields{ - "name": Name + ".Addresses", - "provided": cfg.Addresses, - "default": validCfg.Addresses, + "Name": Name + ".Addresses", + "Provided": cfg.Addresses, + "Default": validCfg.Addresses, }) } if cfg.ReadTimeout <= 0 { validCfg.ReadTimeout = defaultReadTimeout log.Warn("falling back to default configuration", log.Fields{ - "name": Name + ".ReadTimeout", - "provided": cfg.ReadTimeout, - "default": validCfg.ReadTimeout, + "Name": Name + ".ReadTimeout", + "Provided": cfg.ReadTimeout, + "Default": validCfg.ReadTimeout, }) } if cfg.WriteTimeout <= 0 { validCfg.WriteTimeout = defaultWriteTimeout log.Warn("falling back to default configuration", log.Fields{ - "name": Name + ".WriteTimeout", - "provided": cfg.WriteTimeout, - "default": validCfg.WriteTimeout, + "Name": Name + ".WriteTimeout", + "Provided": cfg.WriteTimeout, + "Default": validCfg.WriteTimeout, }) } if cfg.ConnectTimeout <= 0 { validCfg.ConnectTimeout = defaultConnectTimeout log.Warn("falling back to default configuration", log.Fields{ - "name": Name + ".ConnectTimeout", - "provided": cfg.ConnectTimeout, - "default": validCfg.ConnectTimeout, - }) - } - - if cfg.GarbageCollectionInterval <= 0 { - validCfg.GarbageCollectionInterval = defaultGarbageCollectionInterval - log.Warn("falling back to default configuration", log.Fields{ - "name": Name + ".GarbageCollectionInterval", - "provided": cfg.GarbageCollectionInterval, - "default": validCfg.GarbageCollectionInterval, - }) - } - - if cfg.PrometheusReportingInterval < 0 { - validCfg.PrometheusReportingInterval = defaultPrometheusReportingInterval - log.Warn("falling back to default configuration", log.Fields{ - "name": Name + 
".PrometheusReportingInterval", - "provided": cfg.PrometheusReportingInterval, - "default": validCfg.PrometheusReportingInterval, - }) - } - - if cfg.PeerLifetime <= 0 { - validCfg.PeerLifetime = defaultPeerLifetime - log.Warn("falling back to default configuration", log.Fields{ - "name": Name + ".PeerLifetime", - "provided": cfg.PeerLifetime, - "default": validCfg.PeerLifetime, + "Name": Name + ".ConnectTimeout", + "Provided": cfg.ConnectTimeout, + "Default": validCfg.ConnectTimeout, }) } return validCfg, nil } -func connect(cfg Config) (*store, error) { - var err error - db := &store{ - // FIXME: get context from parent and put into GC, middleware functions should use own ctx - ctx: context.TODO(), - } +func (cfg Config) Connect() (con Connection, err error) { + var rs redis.UniversalClient switch { case cfg.Cluster: - db.con = redis.NewClusterClient(&redis.ClusterOptions{ + rs = redis.NewClusterClient(&redis.ClusterOptions{ Addrs: cfg.Addresses, Username: cfg.Login, Password: cfg.Password, @@ -215,7 +204,7 @@ func connect(cfg Config) (*store, error) { PoolSize: cfg.PoolSize, }) case cfg.Sentinel: - db.con = redis.NewFailoverClient(&redis.FailoverOptions{ + rs = redis.NewFailoverClient(&redis.FailoverOptions{ SentinelAddrs: cfg.Addresses, SentinelUsername: cfg.Login, SentinelPassword: cfg.Password, @@ -227,7 +216,7 @@ func connect(cfg Config) (*store, error) { DB: cfg.DB, }) default: - db.con = redis.NewClient(&redis.Options{ + rs = redis.NewClient(&redis.Options{ Addr: cfg.Addresses[0], Username: cfg.Login, Password: cfg.Password, @@ -238,106 +227,85 @@ func connect(cfg Config) (*store, error) { DB: cfg.DB, }) } - if err = db.con.Ping(db.ctx).Err(); err == nil && !errors.Is(err, redis.Nil) { + if err = rs.Ping(context.Background()).Err(); err == nil && !errors.Is(err, redis.Nil) { err = nil - res, err := db.con.Do(db.ctx, "command", "info", "keydb.cron").Result() - err = asNil(err) - if err != nil { - log.Warn("storage: Redis: unable to determine if current instance is KeyDB", log.Fields{"Error": err}) - } else { - db.isKeyDB = res != nil - } } else { - _ = db.con.Close() - db = nil + _ = rs.Close() + rs = nil } - return db, err + return Connection{rs}, err } -// New creates a new PeerStorage backed by redis. -func New(conf Config) (storage.PeerStorage, error) { - cfg, err := conf.Validate() - if err != nil { - return nil, err - } - - ps, err := connect(cfg) - if err != nil { - return nil, err - } - ps.closed = make(chan any) - ps.logFields = cfg.LogFields() - - // Start a goroutine for garbage collection. +func (ps *store) ScheduleGC(gcInterval, peerLifeTime time.Duration) { ps.wg.Add(1) - go ps.scheduleGC(cfg.GarbageCollectionInterval, cfg.PeerLifetime) - - if cfg.PrometheusReportingInterval > 0 { - // Start a goroutine for reporting statistics to Prometheus. 
- ps.wg.Add(1) - go ps.schedulerProm(cfg.PrometheusReportingInterval) - } else { - log.Info("prometheus disabled because of zero reporting interval") - } - - return ps, nil -} - -func (ps *store) scheduleGC(gcInterval, peerLifeTime time.Duration) { - defer ps.wg.Done() - t := time.NewTimer(gcInterval) - defer t.Stop() - for { - select { - case <-ps.closed: - return - case <-t.C: - start := time.Now() - ps.GC(time.Now().Add(-peerLifeTime)) - duration := time.Since(start).Milliseconds() - log.Debug("storage: Redis: recordGCDuration", log.Fields{"timeTaken(ms)": duration}) - storage.PromGCDurationMilliseconds.Observe(float64(duration)) - t.Reset(gcInterval) - } - } -} - -func (ps *store) schedulerProm(reportInterval time.Duration) { - defer ps.wg.Done() - t := time.NewTicker(reportInterval) - for { - select { - case <-ps.closed: - t.Stop() - return - case <-t.C: - if metrics.Enabled() { - before := time.Now() - ps.populateProm() - log.Debug("storage: Redis: populateProm() finished", log.Fields{"timeTaken": time.Since(before)}) + go func() { + defer ps.wg.Done() + t := time.NewTimer(gcInterval) + defer t.Stop() + for { + select { + case <-ps.closed: + return + case <-t.C: + start := time.Now() + ps.gc(time.Now().Add(-peerLifeTime)) + duration := time.Since(start).Milliseconds() + log.Debug("storage: Redis: recordGCDuration", log.Fields{"timeTaken(ms)": duration}) + storage.PromGCDurationMilliseconds.Observe(float64(duration)) + t.Reset(gcInterval) } } - } + }() +} + +func (ps *store) ScheduleStatisticsCollection(reportInterval time.Duration) { + ps.wg.Add(1) + go func() { + defer ps.wg.Done() + t := time.NewTicker(reportInterval) + for { + select { + case <-ps.closed: + t.Stop() + return + case <-t.C: + if metrics.Enabled() { + before := time.Now() + // populateProm aggregates metrics over all groups and then posts them to + // prometheus. + numInfoHashes := ps.count(IHKey, true) + numSeeders := ps.count(CountSeederKey, false) + numLeechers := ps.count(CountLeecherKey, false) + + storage.PromInfoHashesCount.Set(float64(numInfoHashes)) + storage.PromSeedersCount.Set(float64(numSeeders)) + storage.PromLeechersCount.Set(float64(numLeechers)) + log.Debug("storage: Redis: populateProm() finished", log.Fields{"timeTaken": time.Since(before)}) + } + } + } + }() +} + +type Connection struct { + redis.UniversalClient } type store struct { - con redis.UniversalClient - ctx context.Context - + Connection closed chan any wg sync.WaitGroup logFields log.Fields - isKeyDB bool } func (ps *store) count(key string, getLength bool) (n uint64) { var err error if getLength { - n, err = ps.con.SCard(ps.ctx, key).Uint64() + n, err = ps.SCard(context.Background(), key).Uint64() } else { - n, err = ps.con.Get(ps.ctx, key).Uint64() + n, err = ps.Get(context.Background(), key).Uint64() } - err = asNil(err) + err = AsNil(err) if err != nil { log.Error("storage: Redis: GET/SCARD failure", log.Fields{ "key": key, @@ -347,24 +315,12 @@ func (ps *store) count(key string, getLength bool) (n uint64) { return } -// populateProm aggregates metrics over all groups and then posts them to -// prometheus. 
-func (ps *store) populateProm() { - numInfoHashes := ps.count(ihKey, true) - numSeeders := ps.count(cntSeederKey, false) - numLeechers := ps.count(cntLeecherKey, false) - - storage.PromInfoHashesCount.Set(float64(numInfoHashes)) - storage.PromSeedersCount.Set(float64(numSeeders)) - storage.PromLeechersCount.Set(float64(numLeechers)) -} - func (ps *store) getClock() int64 { return timecache.NowUnixNano() } func (ps *store) tx(txf func(tx redis.Pipeliner) error) (err error) { - if pipe, txErr := ps.con.TxPipelined(ps.ctx, txf); txErr == nil { + if pipe, txErr := ps.TxPipelined(context.TODO(), txf); txErr == nil { errs := make([]string, 0) for _, c := range pipe { if err := c.Err(); err != nil { @@ -380,7 +336,7 @@ func (ps *store) tx(txf func(tx redis.Pipeliner) error) (err error) { return } -func asNil(err error) error { +func AsNil(err error) error { if err == nil || errors.Is(err, redis.Nil) { return nil } @@ -394,13 +350,13 @@ func (ps *store) putPeer(infoHashKey, peerCountKey, peerId string) error { "PeerId": peerId, }) return ps.tx(func(tx redis.Pipeliner) (err error) { - if err = tx.HSet(ps.ctx, infoHashKey, peerId, ps.getClock()).Err(); err != nil { + if err = tx.HSet(context.TODO(), infoHashKey, peerId, ps.getClock()).Err(); err != nil { return } - if err = tx.Incr(ps.ctx, peerCountKey).Err(); err != nil { + if err = tx.Incr(context.TODO(), peerCountKey).Err(); err != nil { return } - err = tx.SAdd(ps.ctx, ihKey, infoHashKey).Err() + err = tx.SAdd(context.TODO(), IHKey, infoHashKey).Err() return }) } @@ -411,13 +367,13 @@ func (ps *store) delPeer(infoHashKey, peerCountKey, peerId string) error { "PeerCountKey": peerCountKey, "PeerId": peerId, }) - deleted, err := ps.con.HDel(ps.ctx, infoHashKey, peerId).Uint64() - err = asNil(err) + deleted, err := ps.HDel(context.TODO(), infoHashKey, peerId).Uint64() + err = AsNil(err) if err == nil { if deleted == 0 { err = storage.ErrResourceDoesNotExist } else { - err = ps.con.Decr(ps.ctx, peerCountKey).Err() + err = ps.Decr(context.TODO(), peerCountKey).Err() } } @@ -425,19 +381,19 @@ func (ps *store) delPeer(infoHashKey, peerCountKey, peerId string) error { } func (ps *store) PutSeeder(ih bittorrent.InfoHash, peer bittorrent.Peer) error { - return ps.putPeer(ihSeederKey+ih.RawString(), cntSeederKey, peer.RawString()) + return ps.putPeer(IHSeederKey+ih.RawString(), CountSeederKey, peer.RawString()) } func (ps *store) DeleteSeeder(ih bittorrent.InfoHash, peer bittorrent.Peer) error { - return ps.delPeer(ihSeederKey+ih.RawString(), cntSeederKey, peer.RawString()) + return ps.delPeer(IHSeederKey+ih.RawString(), CountSeederKey, peer.RawString()) } func (ps *store) PutLeecher(ih bittorrent.InfoHash, peer bittorrent.Peer) error { - return ps.putPeer(ihLeecherKey+ih.RawString(), cntLeecherKey, peer.RawString()) + return ps.putPeer(IHLeecherKey+ih.RawString(), CountLeecherKey, peer.RawString()) } func (ps *store) DeleteLeecher(ih bittorrent.InfoHash, peer bittorrent.Peer) error { - return ps.delPeer(ihLeecherKey+ih.RawString(), cntLeecherKey, peer.RawString()) + return ps.delPeer(IHLeecherKey+ih.RawString(), CountLeecherKey, peer.RawString()) } func (ps *store) GraduateLeecher(ih bittorrent.InfoHash, peer bittorrent.Peer) error { @@ -447,24 +403,24 @@ func (ps *store) GraduateLeecher(ih bittorrent.InfoHash, peer bittorrent.Peer) e }) infoHash, peerId := ih.RawString(), peer.RawString() - ihSeederKey, ihLeecherKey := ihSeederKey+infoHash, ihLeecherKey+infoHash + ihSeederKey, ihLeecherKey := IHSeederKey+infoHash, IHLeecherKey+infoHash return 
ps.tx(func(tx redis.Pipeliner) error { - deleted, err := tx.HDel(ps.ctx, ihLeecherKey, peerId).Uint64() - err = asNil(err) + deleted, err := tx.HDel(context.TODO(), ihLeecherKey, peerId).Uint64() + err = AsNil(err) if err == nil { if deleted > 0 { - err = tx.Decr(ps.ctx, cntLeecherKey).Err() + err = tx.Decr(context.TODO(), CountLeecherKey).Err() } } if err == nil { - err = tx.HSet(ps.ctx, ihSeederKey, peerId, ps.getClock()).Err() + err = tx.HSet(context.TODO(), ihSeederKey, peerId, ps.getClock()).Err() } if err == nil { - err = tx.Incr(ps.ctx, cntSeederKey).Err() + err = tx.Incr(context.TODO(), CountSeederKey).Err() } if err == nil { - err = tx.SAdd(ps.ctx, ihKey, ihSeederKey).Err() + err = tx.SAdd(context.TODO(), IHKey, ihSeederKey).Err() } return err }) @@ -472,8 +428,8 @@ func (ps *store) GraduateLeecher(ih bittorrent.InfoHash, peer bittorrent.Peer) e func (ps *store) getPeers(infoHashKey, except string, max int, v4Only bool) (peers []bittorrent.Peer, err error) { var peerIds []string - peerIds, err = ps.con.HKeys(ps.ctx, infoHashKey).Result() - if err = asNil(err); err == nil { + peerIds, err = ps.HKeys(context.TODO(), infoHashKey).Result() + if err = AsNil(err); err == nil { for _, peerId := range peerIds { if peerId != except { if p, err := bittorrent.NewPeer(peerId); err == nil { @@ -505,16 +461,16 @@ func (ps *store) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant int, infoHash, peerId, isV4 := ih.RawString(), peer.RawString(), peer.Addr().Is4() if seeder { - peers, err = ps.getPeers(ihLeecherKey+infoHash, peerId, numWant, isV4) + peers, err = ps.getPeers(IHLeecherKey+infoHash, peerId, numWant, isV4) } else { // Append as many seeders as possible. - peers, err = ps.getPeers(ihSeederKey+infoHash, peerId, numWant, isV4) + peers, err = ps.getPeers(IHSeederKey+infoHash, peerId, numWant, isV4) if err != nil { return } if numWant -= len(peers); numWant > 0 { - if leechers, err := ps.getPeers(ihLeecherKey+infoHash, peerId, numWant, isV4); err == nil { + if leechers, err := ps.getPeers(IHLeecherKey+infoHash, peerId, numWant, isV4); err == nil { peers = append(peers, leechers...) 
} else { log.Warn("storage: Redis: error occurred while receiving leechers", log.Fields{"InfoHash": ih, "Error": err}) @@ -530,16 +486,16 @@ func (ps *store) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant int, } func (ps *store) ScrapeSwarm(ih bittorrent.InfoHash, peer bittorrent.Peer) (resp bittorrent.Scrape) { - log.Debug("storage: Redis ScrapeSwarm", log.Fields{ + log.Debug("storage: RedisDriver ScrapeSwarm", log.Fields{ "InfoHash": ih, "Peer": peer, }) resp.InfoHash = ih infoHash := ih.RawString() - ihSeederKey, ihLeecherKey := ihSeederKey+infoHash, ihLeecherKey+infoHash + ihSeederKey, ihLeecherKey := IHSeederKey+infoHash, IHLeecherKey+infoHash - leechersLen, err := ps.con.HLen(ps.ctx, ihLeecherKey).Result() - err = asNil(err) + leechersLen, err := ps.HLen(context.TODO(), ihLeecherKey).Result() + err = AsNil(err) if err != nil { log.Error("storage: Redis: HLEN failure", log.Fields{ "InfoHashKey": ihLeecherKey, @@ -548,8 +504,8 @@ func (ps *store) ScrapeSwarm(ih bittorrent.InfoHash, peer bittorrent.Peer) (resp return } - seedersLen, err := ps.con.HLen(ps.ctx, ihSeederKey).Result() - err = asNil(err) + seedersLen, err := ps.HLen(context.TODO(), ihSeederKey).Result() + err = AsNil(err) if err != nil { log.Error("storage: Redis: HLEN failure", log.Fields{ "InfoHashKey": ihSeederKey, @@ -566,21 +522,21 @@ func (ps *store) ScrapeSwarm(ih bittorrent.InfoHash, peer bittorrent.Peer) (resp const argNumErrorMsg = "ERR wrong number of arguments" -func (ps *store) Put(ctx string, values ...storage.Entry) (err error) { +func (ps Connection) Put(ctx string, values ...storage.Entry) (err error) { if l := len(values); l > 0 { if l == 1 { - err = ps.con.HSet(ps.ctx, prefixKey+ctx, values[0].Key, values[0].Value).Err() + err = ps.HSet(context.TODO(), PrefixKey+ctx, values[0].Key, values[0].Value).Err() } else { args := make([]any, 0, l*2) for _, p := range values { args = append(args, p.Key, p.Value) } - err = ps.con.HSet(ps.ctx, prefixKey+ctx, args...).Err() + err = ps.HSet(context.TODO(), PrefixKey+ctx, args...).Err() if err != nil { if strings.Contains(err.Error(), argNumErrorMsg) { - log.Warn("This Redis version/implementation does not support variadic arguments for HSET") + log.Warn("This RedisDriver version/implementation does not support variadic arguments for HSET") for _, p := range values { - if err = ps.con.HSet(ps.ctx, prefixKey+ctx, p.Key, p.Value).Err(); err != nil { + if err = ps.HSet(context.TODO(), PrefixKey+ctx, p.Key, p.Value).Err(); err != nil { break } } @@ -591,27 +547,27 @@ func (ps *store) Put(ctx string, values ...storage.Entry) (err error) { return } -func (ps *store) Contains(ctx string, key string) (bool, error) { - exist, err := ps.con.HExists(ps.ctx, prefixKey+ctx, key).Result() - return exist, asNil(err) +func (ps Connection) Contains(ctx string, key string) (bool, error) { + exist, err := ps.HExists(context.TODO(), PrefixKey+ctx, key).Result() + return exist, AsNil(err) } -func (ps *store) Load(ctx string, key string) (v any, err error) { - v, err = ps.con.HGet(ps.ctx, prefixKey+ctx, key).Result() +func (ps Connection) Load(ctx string, key string) (v any, err error) { + v, err = ps.HGet(context.TODO(), PrefixKey+ctx, key).Result() if err != nil && errors.Is(err, redis.Nil) { v, err = nil, nil } return } -func (ps *store) Delete(ctx string, keys ...string) (err error) { +func (ps Connection) Delete(ctx string, keys ...string) (err error) { if len(keys) > 0 { - err = asNil(ps.con.HDel(ps.ctx, prefixKey+ctx, keys...).Err()) + err = AsNil(ps.HDel(context.TODO(), 
PrefixKey+ctx, keys...).Err()) if err != nil { if strings.Contains(err.Error(), argNumErrorMsg) { - log.Warn("This Redis version/implementation does not support variadic arguments for HDEL") + log.Warn("This RedisDriver version/implementation does not support variadic arguments for HDEL") for _, k := range keys { - if err = asNil(ps.con.HDel(ps.ctx, prefixKey+ctx, k).Err()); err != nil { + if err = AsNil(ps.HDel(context.TODO(), PrefixKey+ctx, k).Err()); err != nil { break } } @@ -621,7 +577,7 @@ func (ps *store) Delete(ctx string, keys ...string) (err error) { return } -func (*store) Preservable() bool { +func (Connection) Preservable() bool { return true } @@ -670,27 +626,27 @@ func (*store) Preservable() bool { // - If the change happens after the HLEN, we will not even attempt to make the // transaction. The infohash key will remain in the addressFamil hash and // we'll attempt to clean it up the next time gc runs. -func (ps *store) GC(cutoff time.Time) { +func (ps *store) gc(cutoff time.Time) { log.Debug("storage: Redis: purging peers with no announces since", log.Fields{"before": cutoff}) cutoffNanos := cutoff.UnixNano() // list all infoHashKeys in the group - infoHashKeys, err := ps.con.SMembers(ps.ctx, ihKey).Result() - err = asNil(err) + infoHashKeys, err := ps.SMembers(context.Background(), IHKey).Result() + err = AsNil(err) if err == nil { for _, infoHashKey := range infoHashKeys { var cntKey string var seeder bool - if seeder = strings.HasPrefix(infoHashKey, ihSeederKey); seeder { - cntKey = cntSeederKey - } else if strings.HasPrefix(infoHashKey, ihLeecherKey) { - cntKey = cntLeecherKey + if seeder = strings.HasPrefix(infoHashKey, IHSeederKey); seeder { + cntKey = CountSeederKey + } else if strings.HasPrefix(infoHashKey, IHLeecherKey) { + cntKey = CountLeecherKey } else { log.Warn("storage: Redis: unexpected record found in info hash set", log.Fields{"InfoHashKey": infoHashKey}) continue } // list all (peer, timeout) pairs for the ih - peerList, err := ps.con.HGetAll(ps.ctx, infoHashKey).Result() - err = asNil(err) + peerList, err := ps.HGetAll(context.Background(), infoHashKey).Result() + err = AsNil(err) if err == nil { peersToRemove := make([]string, 0) for peerId, timeStamp := range peerList { @@ -709,14 +665,14 @@ func (ps *store) GC(cutoff time.Time) { } } if len(peersToRemove) > 0 { - removedPeerCount, err := ps.con.HDel(ps.ctx, infoHashKey, peersToRemove...).Result() - err = asNil(err) + removedPeerCount, err := ps.HDel(context.Background(), infoHashKey, peersToRemove...).Result() + err = AsNil(err) if err != nil { if strings.Contains(err.Error(), argNumErrorMsg) { - log.Warn("This Redis version/implementation does not support variadic arguments for HDEL") + log.Warn("This RedisDriver version/implementation does not support variadic arguments for HDEL") for _, k := range peersToRemove { - count, err := ps.con.HDel(ps.ctx, infoHashKey, k).Result() - err = asNil(err) + count, err := ps.HDel(context.Background(), infoHashKey, k).Result() + err = AsNil(err) if err != nil { log.Error("storage: Redis: unable to delete peer", log.Fields{ "InfoHashKey": infoHashKey, @@ -736,7 +692,7 @@ func (ps *store) GC(cutoff time.Time) { } } if removedPeerCount > 0 { // DECR seeder/leecher counter - if err = ps.con.DecrBy(ps.ctx, cntKey, removedPeerCount).Err(); err != nil { + if err = ps.DecrBy(context.Background(), cntKey, removedPeerCount).Err(); err != nil { log.Error("storage: Redis: unable to decrement seeder/leecher peer count", log.Fields{ "InfoHashKey": infoHashKey, "CountKey": 
cntKey, @@ -746,15 +702,15 @@ func (ps *store) GC(cutoff time.Time) { } } - err = asNil(ps.con.Watch(ps.ctx, func(tx *redis.Tx) (err error) { + err = AsNil(ps.Watch(context.Background(), func(tx *redis.Tx) (err error) { var infoHashCount uint64 - infoHashCount, err = ps.con.HLen(ps.ctx, infoHashKey).Uint64() - err = asNil(err) + infoHashCount, err = ps.HLen(context.Background(), infoHashKey).Uint64() + err = AsNil(err) if err == nil && infoHashCount == 0 { // Empty hashes are not shown among existing keys, // in other words, it's removed automatically after `HDEL` the last field. - // _, err := ps.con.Del(ps.ctx, infoHashKey) - err = asNil(ps.con.SRem(ps.ctx, ihKey, infoHashKey).Err()) + // _, err := ps.Del(context.TODO(), infoHashKey) + err = AsNil(ps.SRem(context.Background(), IHKey, infoHashKey).Err()) } return err }, infoHashKey)) @@ -772,7 +728,7 @@ func (ps *store) GC(cutoff time.Time) { } } } else { - log.Error("storage: Redis: unable to fetch info hash set", log.Fields{"HashSet": ihKey, "Error": err}) + log.Error("storage: Redis: unable to fetch info hash set", log.Fields{"HashSet": IHKey, "Error": err}) } } @@ -784,9 +740,10 @@ func (ps *store) Stop() stop.Result { } ps.wg.Wait() var err error - if ps.con != nil { - log.Info("storage: Redis: exiting. mochi does not clear data in redis when exiting. mochi keys have prefix " + prefixKey) - err = ps.con.Close() + if ps.UniversalClient != nil { + log.Info("storage: Redis: exiting. mochi does not clear data in redis when exiting. mochi keys have prefix " + PrefixKey) + err = ps.UniversalClient.Close() + ps.UniversalClient = nil } c.Done(err) }() diff --git a/storage/redis/storage_test.go b/storage/redis/storage_test.go index 540248d..bf53779 100644 --- a/storage/redis/storage_test.go +++ b/storage/redis/storage_test.go @@ -12,13 +12,11 @@ import ( ) var cfg = Config{ - Addresses: []string{"localhost:6379"}, - GarbageCollectionInterval: 10 * time.Minute, - PrometheusReportingInterval: 10 * time.Minute, - PeerLifetime: 30 * time.Minute, - ReadTimeout: 10 * time.Second, - WriteTimeout: 10 * time.Second, - ConnectTimeout: 10 * time.Second, + Addresses: []string{"localhost:6379"}, + PeerLifetime: 30 * time.Minute, + ReadTimeout: 10 * time.Second, + WriteTimeout: 10 * time.Second, + ConnectTimeout: 10 * time.Second, } func createNew() s.PeerStorage { @@ -26,7 +24,7 @@ func createNew() s.PeerStorage { var err error ps, err = New(cfg) if err != nil { - fmt.Println("unable to create real Redis connection: ", err, " using simulator") + fmt.Println("unable to create real RedisDriver connection: ", err, " using simulator") var rs *miniredis.Miniredis rs, err = miniredis.Run() if err != nil { diff --git a/storage/storage.go b/storage/storage.go index 6dbef8e..0214197 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -13,30 +13,83 @@ import ( "github.com/sot-tech/mochi/pkg/stop" ) +const ( + defaultPrometheusReportingInterval = time.Second * 1 + defaultGarbageCollectionInterval = time.Minute * 3 + defaultPeerLifetime = time.Minute * 30 +) + var ( driversM sync.RWMutex - drivers = make(map[string]Driver) + drivers = make(map[string]Builder) ) +// Config holds configuration for periodic execution tasks, which may or may not implement +// specific storage (such as GCAware or StatisticsAware) +type Config struct { + // GarbageCollectionInterval period of GC + GarbageCollectionInterval time.Duration `cfg:"gc_interval"` + // PeerLifetime maximum TTL of peer + PeerLifetime time.Duration `cfg:"peer_lifetime"` + // PrometheusReportingInterval 
period of statistics data polling
+	PrometheusReportingInterval time.Duration `cfg:"prometheus_reporting_interval"`
+}
+
+func (c Config) sanitizeGCConfig() (gcInterval, peerTTL time.Duration) {
+	if c.GarbageCollectionInterval <= 0 {
+		gcInterval = defaultGarbageCollectionInterval
+		log.Warn("falling back to default configuration", log.Fields{
+			"Name":     "GarbageCollectionInterval",
+			"Provided": c.GarbageCollectionInterval,
+			"Default":  defaultGarbageCollectionInterval,
+		})
+	} else {
+		gcInterval = c.GarbageCollectionInterval
+	}
+	if c.PeerLifetime <= 0 {
+		peerTTL = defaultPeerLifetime
+		log.Warn("falling back to default configuration", log.Fields{
+			"Name":     "PeerLifetime",
+			"Provided": c.PeerLifetime,
+			"Default":  defaultPeerLifetime,
+		})
+	} else {
+		peerTTL = c.PeerLifetime
+	}
+	return
+}
+
+func (c Config) sanitizeStatisticsConfig() (statInterval time.Duration) {
+	if c.PrometheusReportingInterval < 0 {
+		statInterval = defaultPrometheusReportingInterval
+		log.Warn("falling back to default configuration", log.Fields{
+			"Name":     "PrometheusReportingInterval",
+			"Provided": c.PrometheusReportingInterval,
+			"Default":  defaultPrometheusReportingInterval,
+		})
+	} else {
+		// Keep a valid provided interval; zero still disables statistics collection.
+		statInterval = c.PrometheusReportingInterval
+	}
+	return
+}
+
 // Entry - some key-value pair, used for BulkPut
 type Entry struct {
 	Key   string
 	Value any
 }
 
-// Driver is the interface used to initialize a new type of PeerStorage.
-type Driver interface {
-	NewStorage(cfg conf.MapConfig) (PeerStorage, error)
-}
+// Builder is the function used to initialize a new type of PeerStorage.
+type Builder func(cfg conf.MapConfig) (PeerStorage, error)
 
-// ErrResourceDoesNotExist is the error returned by all delete methods and the
-// AnnouncePeers method of the PeerStorage interface if the requested resource
-// does not exist.
-var ErrResourceDoesNotExist = bittorrent.ClientError("resource does not exist")
+var (
+	// ErrResourceDoesNotExist is the error returned by all delete methods and the
+	// AnnouncePeers method of the PeerStorage interface if the requested resource
+	// does not exist.
+	ErrResourceDoesNotExist = bittorrent.ClientError("resource does not exist")
 
-// ErrDriverDoesNotExist is the error returned by NewStorage when a peer
-// store driver with that name does not exist.
-var ErrDriverDoesNotExist = errors.New("peer store driver with that name does not exist")
+	// ErrDriverDoesNotExist is the error returned by NewStorage when a peer
+	// store driver with that name does not exist.
+	ErrDriverDoesNotExist = errors.New("peer store driver with that name does not exist")
+)
 
 // DataStorage is the interface, used for implementing store for arbitrary data
 type DataStorage interface {
@@ -59,6 +112,23 @@ type DataStorage interface {
 	Preservable() bool
 }
 
+// GCAware is the interface for storage that supports periodic
+// collection of stale peers
+type GCAware interface {
+	// ScheduleGC is used to delete stale data, such as timed-out seeders/leechers.
+	// Note: the implementation must start its own goroutine
+	ScheduleGC(gcInterval, peerLifeTime time.Duration)
+}
+
+// StatisticsAware is the interface for storage that supports periodic
+// statistics collection
+type StatisticsAware interface {
+	// ScheduleStatisticsCollection is used to collect statistics about info hash,
+	// seeder and leecher counts.
+ // Note: implementation must create subroutine by itself + ScheduleStatisticsCollection(reportInterval time.Duration) +} + // PeerStorage is an interface that abstracts the interactions of storing and // manipulating Peers such that it can be implemented for various data stores. // @@ -70,10 +140,7 @@ type DataStorage interface { // to track the last activity for that Peer. The entire database can then // be scanned periodically and too old Peers removed. The intervals and // durations involved should be configurable. -// - IPv4 and IPv6 swarms must be isolated from each other. -// A PeerStorage must be able to transparently handle IPv4 and IPv6 Peers, but -// must separate them. AnnouncePeers and ScrapeSwarm must return information -// about the Swarm matching the given AddressFamily only. +// - IPv4 and IPv6 swarms may be isolated from each other. // // Implementations can be tested against this interface using the tests in // storage_test.go and the benchmarks in storage_bench.go. @@ -136,9 +203,6 @@ type PeerStorage interface { // If the Swarm does not exist, an empty Scrape and no error is returned. ScrapeSwarm(infoHash bittorrent.InfoHash, peer bittorrent.Peer) bittorrent.Scrape - // GC used to delete stale data, such as timed out seeders/leechers - GC(cutoff time.Time) - // Stopper is an interface that expects a Stop method to stop the PeerStorage. // For more details see the documentation in the stop package. stop.Stopper @@ -148,41 +212,62 @@ type PeerStorage interface { log.Fielder } -// RegisterDriver makes a Driver available by the provided name. +// RegisterBuilder makes a Builder available by the provided name. // // If called twice with the same name, the name is blank, or if the provided // Driver is nil, this function panics. -func RegisterDriver(name string, d Driver) { +func RegisterBuilder(name string, b Builder) { if name == "" { - panic("storage: could not register a Driver with an empty name") + panic("storage: could not register a Builder with an empty name") } - if d == nil { - panic("storage: could not register a nil Driver") + if b == nil { + panic("storage: could not register a nil Builder") } driversM.Lock() defer driversM.Unlock() if _, dup := drivers[name]; dup { - panic("storage: RegisterDriver called twice for " + name) + panic("storage: RegisterBuilder called twice for " + name) } - drivers[name] = d + drivers[name] = b } // NewStorage attempts to initialize a new PeerStorage instance from // the list of registered Drivers. // -// If a driver does not exist, returns ErrDriverDoesNotExist. +// If a builder does not exist, returns ErrDriverDoesNotExist. 
func NewStorage(name string, cfg conf.MapConfig) (ps PeerStorage, err error) { driversM.RLock() defer driversM.RUnlock() - var d Driver - d, ok := drivers[name] + var b Builder + b, ok := drivers[name] if !ok { return nil, ErrDriverDoesNotExist } - return d.NewStorage(cfg) + c := new(Config) + if err = cfg.Unmarshal(c); err != nil { + return + } + + if ps, err = b(cfg); err != nil { + return + } + + if gc, isOk := ps.(GCAware); isOk { + gc.ScheduleGC(c.sanitizeGCConfig()) + } + + if statInterval := c.sanitizeStatisticsConfig(); statInterval > 0 { + if st, isOk := ps.(StatisticsAware); isOk { + st.ScheduleStatisticsCollection(statInterval) + } + } else { + log.Info("prometheus disabled because of zero reporting interval") + } + + return } diff --git a/storage/test/storage_test_base.go b/storage/test/storage_test_base.go index 0ec62f0..5b7dc77 100644 --- a/storage/test/storage_test_base.go +++ b/storage/test/storage_test_base.go @@ -5,7 +5,6 @@ package test import ( "errors" "testing" - "time" "github.com/stretchr/testify/require" @@ -257,23 +256,6 @@ func (th *testHolder) CustomBulkPutContainsLoadDelete(t *testing.T) { } } -func (th *testHolder) GC(t *testing.T) { - for _, c := range testData { - require.Nil(t, th.st.PutSeeder(c.ih, c.peer)) - require.Nil(t, th.st.PutSeeder(c.ih, v4Peer)) - require.Nil(t, th.st.PutSeeder(c.ih, v6Peer)) - } - th.st.GC(time.Now().Add(time.Hour)) - for _, c := range testData { - peers, err := th.st.AnnouncePeers(c.ih, false, 100, v4Peer) - if errors.Is(err, storage.ErrResourceDoesNotExist) { - err = nil - } - require.Nil(t, err) - require.Empty(t, peers) - } -} - // RunTests tests a PeerStorage implementation against the interface. func RunTests(t *testing.T, p storage.PeerStorage) { th := testHolder{st: p} @@ -305,8 +287,6 @@ func RunTests(t *testing.T, p storage.PeerStorage) { t.Run("CustomPutContainsLoadDelete", th.CustomPutContainsLoadDelete) t.Run("CustomBulkPutContainsLoadDelete", th.CustomBulkPutContainsLoadDelete) - t.Run("GC", th.GC) - e := th.st.Stop() require.Nil(t, <-e) }
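Taken together, the storage.go changes mean a driver now registers a plain Builder function instead of a Driver type, and the generic gc_interval, peer_lifetime and prometheus_reporting_interval settings are parsed once in NewStorage, which then schedules the periodic work only on stores that opt in via GCAware/StatisticsAware. A rough sketch of what registration looks like under the new API; the package name, the registered name and the reuse of the in-tree memory builder are made up for illustration:

// Illustrative only: registering a driver the new way, reusing the memory backend.
package customstorage

import (
	"github.com/sot-tech/mochi/pkg/conf"
	"github.com/sot-tech/mochi/storage"
	"github.com/sot-tech/mochi/storage/memory"
)

func init() {
	// Old style: storage.RegisterDriver(name, driver{}) with a NewStorage method.
	// New style: a Builder function is registered directly.
	storage.RegisterBuilder("memory-alias", func(icfg conf.MapConfig) (storage.PeerStorage, error) {
		var cfg memory.Config
		if err := icfg.Unmarshal(&cfg); err != nil {
			return nil, err
		}
		return memory.NewPeerStorage(cfg)
	})
}

Because the memory store implements both GCAware and StatisticsAware, storage.NewStorage will call ScheduleGC and ScheduleStatisticsCollection on the built instance; the keydb store implements neither, which matches its package comment: member expiry replaces GC and statistics collection is not supported.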