(wip) create new driver for KeyDB

* move GC and prometheus aware storage functions to separate interfaces
This commit is contained in:
Lawrence, Rendall
2022-04-23 01:28:06 +03:00
parent a1ce79b003
commit 4131c64e89
8 changed files with 527 additions and 432 deletions

View File

@@ -54,7 +54,7 @@ func build(options conf.MapConfig, st storage.PeerStorage) (h middleware.Hook, e
var ds storage.DataStorage = st
if !cfg.Preserve && ds.Preservable() {
ds = memory.NewDataStore()
ds = memory.NewDataStorage()
}
var c container.Container

126
storage/keydb/storage.go Normal file
View File

@@ -0,0 +1,126 @@
// Package keydb implements the storage interface.
// This storage mostly is the same as redis, but it collects peers
// not in hashes, but in sets and uses KeyDB-specific command
// `EXPIREMEMBER` and, so it does not need garbage collection.
// Note: this storage also does not support statistics collection
package keydb
import (
"context"
"errors"
"github.com/go-redis/redis/v8"
"github.com/sot-tech/mochi/bittorrent"
"github.com/sot-tech/mochi/pkg/conf"
"github.com/sot-tech/mochi/pkg/log"
"github.com/sot-tech/mochi/pkg/stop"
"github.com/sot-tech/mochi/storage"
r "github.com/sot-tech/mochi/storage/redis"
)
// Name is name of this storage
const (
Name = "keydb"
expireMemberCmd = "EXPIREMEMBER"
)
// ErrNotKeyDB returned from initializer if connected does not support KeyDB
// specific command (EXPIREMEMBER)
var ErrNotKeyDB = errors.New("provided instance seems not KeyDB")
func init() {
// Register the storage driver.
storage.RegisterBuilder(Name, Builder)
}
func Builder(icfg conf.MapConfig) (storage.PeerStorage, error) {
var cfg r.Config
var err error
if err = icfg.Unmarshal(&cfg); err != nil {
return nil, err
}
return New(cfg)
}
func New(cfg r.Config) (*store, error) {
var err error
if cfg, err = cfg.Validate(); err != nil {
return nil, err
}
var rs r.Connection
if rs, err = cfg.Connect(); err != nil {
return nil, err
}
cmd := redis.NewCommandsInfoCmd(context.Background(), "COMMAND", "INFO", expireMemberCmd)
_ = rs.Process(context.Background(), cmd)
err = r.AsNil(cmd.Err())
if err == nil && len(cmd.Val()) == 0 {
err = ErrNotKeyDB
}
var st *store
if err == nil {
st = &store{rs, cfg.LogFields()}
}
return st, err
}
type store struct {
r.Connection
logFields log.Fields
}
func (s store) PutSeeder(infoHash bittorrent.InfoHash, peer bittorrent.Peer) error {
// TODO implement me
panic("implement me")
}
func (s store) DeleteSeeder(infoHash bittorrent.InfoHash, peer bittorrent.Peer) error {
// TODO implement me
panic("implement me")
}
func (s store) PutLeecher(infoHash bittorrent.InfoHash, peer bittorrent.Peer) error {
// TODO implement me
panic("implement me")
}
func (s store) DeleteLeecher(infoHash bittorrent.InfoHash, peer bittorrent.Peer) error {
// TODO implement me
panic("implement me")
}
func (s store) GraduateLeecher(infoHash bittorrent.InfoHash, peer bittorrent.Peer) error {
// TODO implement me
panic("implement me")
}
func (s store) AnnouncePeers(infoHash bittorrent.InfoHash, seeder bool, numWant int, peer bittorrent.Peer) (peers []bittorrent.Peer, err error) {
// TODO implement me
panic("implement me")
}
func (s store) ScrapeSwarm(infoHash bittorrent.InfoHash, peer bittorrent.Peer) bittorrent.Scrape {
// TODO implement me
panic("implement me")
}
func (s *store) Stop() stop.Result {
c := make(stop.Channel)
if s.UniversalClient != nil {
c.Done(s.UniversalClient.Close())
s.UniversalClient = nil
}
return c.Result()
}
func (s store) LogFields() log.Fields {
return s.logFields
}

View File

@@ -21,25 +21,19 @@ import (
"github.com/sot-tech/mochi/storage"
)
// Name is the name by which this peer store is registered with Conf.
const Name = "memory"
// Default config constants.
const (
defaultShardCount = 1024
defaultPrometheusReportingInterval = time.Second * 1
defaultGarbageCollectionInterval = time.Minute * 3
defaultPeerLifetime = time.Minute * 30
// Name is the name by which this peer store is registered with Conf.
Name = "memory"
defaultShardCount = 1024
)
func init() {
// Register the storage driver.
storage.RegisterDriver(Name, driver{})
storage.RegisterBuilder(Name, Builder)
}
type driver struct{}
func (d driver) NewStorage(icfg conf.MapConfig) (storage.PeerStorage, error) {
func Builder(icfg conf.MapConfig) (storage.PeerStorage, error) {
var cfg Config
if err := icfg.Unmarshal(&cfg); err != nil {
return nil, err
@@ -49,20 +43,14 @@ func (d driver) NewStorage(icfg conf.MapConfig) (storage.PeerStorage, error) {
// Config holds the configuration of a memory PeerStorage.
type Config struct {
GarbageCollectionInterval time.Duration `cfg:"gc_interval"`
PrometheusReportingInterval time.Duration `cfg:"prometheus_reporting_interval"`
PeerLifetime time.Duration `cfg:"peer_lifetime"`
ShardCount int `cfg:"shard_count"`
ShardCount int `cfg:"shard_count"`
}
// LogFields renders the current config as a set of Logrus fields.
func (cfg Config) LogFields() log.Fields {
return log.Fields{
"name": Name,
"gcInterval": cfg.GarbageCollectionInterval,
"promReportInterval": cfg.PrometheusReportingInterval,
"peerLifetime": cfg.PeerLifetime,
"shardCount": cfg.ShardCount,
"Name": Name,
"ShardCount": cfg.ShardCount,
}
}
@@ -76,36 +64,9 @@ func (cfg Config) Validate() Config {
if cfg.ShardCount <= 0 || cfg.ShardCount > (math.MaxInt/2) {
validcfg.ShardCount = defaultShardCount
log.Warn("falling back to default configuration", log.Fields{
"name": Name + ".ShardCount",
"provided": cfg.ShardCount,
"default": validcfg.ShardCount,
})
}
if cfg.GarbageCollectionInterval <= 0 {
validcfg.GarbageCollectionInterval = defaultGarbageCollectionInterval
log.Warn("falling back to default configuration", log.Fields{
"name": Name + ".GarbageCollectionInterval",
"provided": cfg.GarbageCollectionInterval,
"default": validcfg.GarbageCollectionInterval,
})
}
if cfg.PrometheusReportingInterval < 0 {
validcfg.PrometheusReportingInterval = defaultPrometheusReportingInterval
log.Warn("falling back to default configuration", log.Fields{
"name": Name + ".PrometheusReportingInterval",
"provided": cfg.PrometheusReportingInterval,
"default": validcfg.PrometheusReportingInterval,
})
}
if cfg.PeerLifetime <= 0 {
validcfg.PeerLifetime = defaultPeerLifetime
log.Warn("falling back to default configuration", log.Fields{
"name": Name + ".PeerLifetime",
"provided": cfg.PeerLifetime,
"default": validcfg.PeerLifetime,
"Name": Name + ".ShardCount",
"Provided": cfg.ShardCount,
"Default": validcfg.ShardCount,
})
}
@@ -118,7 +79,7 @@ func NewPeerStorage(provided Config) (storage.PeerStorage, error) {
ps := &peerStore{
cfg: cfg,
shards: make([]*peerShard, cfg.ShardCount*2),
DataStorage: NewDataStore(),
DataStorage: NewDataStorage(),
closed: make(chan struct{}),
}
@@ -126,50 +87,6 @@ func NewPeerStorage(provided Config) (storage.PeerStorage, error) {
ps.shards[i] = &peerShard{swarms: make(map[bittorrent.InfoHash]swarm)}
}
// Start a goroutine for garbage collection.
ps.wg.Add(1)
go func() {
defer ps.wg.Done()
t := time.NewTimer(cfg.GarbageCollectionInterval)
defer t.Stop()
for {
select {
case <-ps.closed:
return
case <-t.C:
before := time.Now().Add(-cfg.PeerLifetime)
log.Debug("storage: purging peers with no announces since", log.Fields{"before": before})
start := time.Now()
ps.GC(before)
recordGCDuration(time.Since(start))
}
}
}()
if cfg.PrometheusReportingInterval > 0 {
// Start a goroutine for reporting statistics to Prometheus.
ps.wg.Add(1)
go func() {
defer ps.wg.Done()
t := time.NewTicker(cfg.PrometheusReportingInterval)
for {
select {
case <-ps.closed:
t.Stop()
return
case <-t.C:
if metrics.Enabled() {
before := time.Now()
ps.populateProm()
log.Debug("storage: populateProm() finished", log.Fields{"timeTaken": time.Since(before)})
}
}
}
}()
} else {
log.Info("prometheus disabled because of zero reporting interval")
}
return ps, nil
}
@@ -197,22 +114,60 @@ type peerStore struct {
var _ storage.PeerStorage = &peerStore{}
// populateProm aggregates metrics over all shards and then posts them to
// prometheus.
func (ps *peerStore) populateProm() {
var numInfohashes, numSeeders, numLeechers uint64
func (ps *peerStore) ScheduleGC(gcInterval, peerLifeTime time.Duration) {
ps.wg.Add(1)
go func() {
defer ps.wg.Done()
t := time.NewTimer(gcInterval)
defer t.Stop()
for {
select {
case <-ps.closed:
return
case <-t.C:
before := time.Now().Add(-peerLifeTime)
log.Debug("storage: Memory purging peers with no announces since", log.Fields{"before": before})
start := time.Now()
ps.gc(before)
recordGCDuration(time.Since(start))
}
}
}()
}
for _, s := range ps.shards {
s.RLock()
numInfohashes += uint64(len(s.swarms))
numSeeders += s.numSeeders
numLeechers += s.numLeechers
s.RUnlock()
}
func (ps *peerStore) ScheduleStatisticsCollection(reportInterval time.Duration) {
ps.wg.Add(1)
go func() {
defer ps.wg.Done()
t := time.NewTicker(reportInterval)
for {
select {
case <-ps.closed:
t.Stop()
return
case <-t.C:
if metrics.Enabled() {
before := time.Now()
// aggregates metrics over all shards and then posts them to
// prometheus.
var numInfohashes, numSeeders, numLeechers uint64
storage.PromInfoHashesCount.Set(float64(numInfohashes))
storage.PromSeedersCount.Set(float64(numSeeders))
storage.PromLeechersCount.Set(float64(numLeechers))
for _, s := range ps.shards {
s.RLock()
numInfohashes += uint64(len(s.swarms))
numSeeders += s.numSeeders
numLeechers += s.numLeechers
s.RUnlock()
}
storage.PromInfoHashesCount.Set(float64(numInfohashes))
storage.PromSeedersCount.Set(float64(numSeeders))
storage.PromLeechersCount.Set(float64(numLeechers))
log.Debug("storage: Memory: populateProm() finished", log.Fields{"timeTaken": time.Since(before)})
}
}
}
}()
}
// recordGCDuration records the duration of a GC sweep.
@@ -485,8 +440,8 @@ func (ps *peerStore) ScrapeSwarm(ih bittorrent.InfoHash, peer bittorrent.Peer) (
return
}
// NewDataStore creates new in-memory data store
func NewDataStore() storage.DataStorage {
// NewDataStorage creates new in-memory data store
func NewDataStorage() storage.DataStorage {
return new(dataStore)
}
@@ -553,7 +508,7 @@ func (*dataStore) Preservable() bool {
//
// This function must be able to execute while other methods on this interface
// are being executed in parallel.
func (ps *peerStore) GC(cutoff time.Time) {
func (ps *peerStore) gc(cutoff time.Time) {
select {
case <-ps.closed:
return

View File

@@ -2,19 +2,13 @@ package memory
import (
"testing"
"time"
"github.com/sot-tech/mochi/storage"
"github.com/sot-tech/mochi/storage/test"
)
func createNew() storage.PeerStorage {
ps, err := NewPeerStorage(Config{
ShardCount: 1024,
GarbageCollectionInterval: 10 * time.Minute,
PrometheusReportingInterval: 10 * time.Minute,
PeerLifetime: 30 * time.Minute,
})
ps, err := NewPeerStorage(Config{ShardCount: 1024})
if err != nil {
panic(err)
}

View File

@@ -1,4 +1,4 @@
// Package redis implements the storage interface for a Conf
// Package redis implements the storage interface.
// BitTorrent tracker keeping peer data in redis with hash.
// There two categories of hash:
//
@@ -10,7 +10,7 @@
// To save all the infohashes, used for garbage collection,
// metrics aggregation and leecher graduation
//
// Tree keys are used to record the count of seeders and leechers.
// Two keys are used to record the count of seeders and leechers.
//
// - CHI_C_S (key type)
// To record the number of seeders.
@@ -38,37 +38,37 @@ import (
"github.com/sot-tech/mochi/storage"
)
// Name is the name by which this peer store is registered with Conf.
const Name = "redis"
// Default config constants.
const (
defaultPrometheusReportingInterval = time.Second * 1
defaultGarbageCollectionInterval = time.Minute * 3
defaultPeerLifetime = time.Minute * 30
defaultRedisAddress = "127.0.0.1:6379"
defaultReadTimeout = time.Second * 15
defaultWriteTimeout = time.Second * 15
defaultConnectTimeout = time.Second * 15
prefixKey = "CHI_"
ihKey = "CHI_I"
ihSeederKey = "CHI_S_"
ihLeecherKey = "CHI_L_"
cntSeederKey = "CHI_C_S"
cntLeecherKey = "CHI_C_L"
// Name is the name by which this peer store is registered with Conf.
Name = "redis"
// Default config constants.
defaultRedisAddress = "127.0.0.1:6379"
defaultReadTimeout = time.Second * 15
defaultWriteTimeout = time.Second * 15
defaultConnectTimeout = time.Second * 15
// PrefixKey prefix which will be prepended to ctx argument in storage.DataStorage calls
PrefixKey = "CHI_"
// IHKey redis hash key for all info hashes
IHKey = "CHI_I"
// IHSeederKey redis hash key prefix for seeders
IHSeederKey = "CHI_S_"
// IHLeecherKey redis hash key prefix for leechers
IHLeecherKey = "CHI_L_"
// CountSeederKey redis key for seeder count
CountSeederKey = "CHI_C_S"
// CountLeecherKey redis key for leecher count
CountLeecherKey = "CHI_C_L"
)
// ErrSentinelAndClusterChecked returned from initializer if both Config.Sentinel and Config.Cluster provided
var ErrSentinelAndClusterChecked = errors.New("unable to use both cluster and sentinel mode")
func init() {
// Register the storage driver.
storage.RegisterDriver(Name, driver{})
// Register the storage RedisDriver.
storage.RegisterBuilder(Name, Builder)
}
type driver struct{}
func (d driver) NewStorage(icfg conf.MapConfig) (storage.PeerStorage, error) {
func Builder(icfg conf.MapConfig) (storage.PeerStorage, error) {
// Unmarshal the bytes into the proper config type.
var cfg Config
@@ -79,35 +79,55 @@ func (d driver) NewStorage(icfg conf.MapConfig) (storage.PeerStorage, error) {
return New(cfg)
}
func New(cfg Config) (*store, error) {
cfg, err := cfg.Validate()
if err != nil {
return nil, err
}
rs, err := cfg.Connect()
if err != nil {
return nil, err
}
return &store{
Connection: rs,
closed: make(chan any),
wg: sync.WaitGroup{},
logFields: cfg.LogFields(),
}, nil
}
// Config holds the configuration of a redis PeerStorage.
type Config struct {
GarbageCollectionInterval time.Duration `cfg:"gc_interval"`
PrometheusReportingInterval time.Duration `cfg:"prometheus_reporting_interval"`
PeerLifetime time.Duration `cfg:"peer_lifetime"`
Addresses []string
DB int
PoolSize int `cfg:"pool_size"`
Login string
Password string
Sentinel bool
SentinelMaster string `cfg:"sentinel_master"`
Cluster bool
ReadTimeout time.Duration `cfg:"read_timeout"`
WriteTimeout time.Duration `cfg:"write_timeout"`
ConnectTimeout time.Duration `cfg:"connect_timeout"`
PeerLifetime time.Duration `cfg:"peer_lifetime"`
Addresses []string
DB int
PoolSize int `cfg:"pool_size"`
Login string
Password string
Sentinel bool
SentinelMaster string `cfg:"sentinel_master"`
Cluster bool
ReadTimeout time.Duration `cfg:"read_timeout"`
WriteTimeout time.Duration `cfg:"write_timeout"`
ConnectTimeout time.Duration `cfg:"connect_timeout"`
}
// LogFields renders the current config as a set of Logrus fields.
func (cfg Config) LogFields() log.Fields {
return log.Fields{
"name": Name,
"gcInterval": cfg.GarbageCollectionInterval,
"promReportInterval": cfg.PrometheusReportingInterval,
"peerLifetime": cfg.PeerLifetime,
"addresses": cfg.Addresses,
"readTimeout": cfg.ReadTimeout,
"writeTimeout": cfg.WriteTimeout,
"connectTimeout": cfg.ConnectTimeout,
"Name": Name,
"PeerLifetime": cfg.PeerLifetime,
"Addresses": cfg.Addresses,
"DB": cfg.DB,
"PoolSize": cfg.PoolSize,
"Sentinel": cfg.Sentinel,
"SentinelMaster": cfg.SentinelMaster,
"Cluster": cfg.Cluster,
"ReadTimeout": cfg.ReadTimeout,
"WriteTimeout": cfg.WriteTimeout,
"ConnectTimeout": cfg.ConnectTimeout,
}
}
@@ -134,78 +154,47 @@ func (cfg Config) Validate() (Config, error) {
if len(cfg.Addresses) == 0 {
validCfg.Addresses = []string{defaultRedisAddress}
log.Warn("falling back to default configuration", log.Fields{
"name": Name + ".Addresses",
"provided": cfg.Addresses,
"default": validCfg.Addresses,
"Name": Name + ".Addresses",
"Provided": cfg.Addresses,
"Default": validCfg.Addresses,
})
}
if cfg.ReadTimeout <= 0 {
validCfg.ReadTimeout = defaultReadTimeout
log.Warn("falling back to default configuration", log.Fields{
"name": Name + ".ReadTimeout",
"provided": cfg.ReadTimeout,
"default": validCfg.ReadTimeout,
"Name": Name + ".ReadTimeout",
"Provided": cfg.ReadTimeout,
"Default": validCfg.ReadTimeout,
})
}
if cfg.WriteTimeout <= 0 {
validCfg.WriteTimeout = defaultWriteTimeout
log.Warn("falling back to default configuration", log.Fields{
"name": Name + ".WriteTimeout",
"provided": cfg.WriteTimeout,
"default": validCfg.WriteTimeout,
"Name": Name + ".WriteTimeout",
"Provided": cfg.WriteTimeout,
"Default": validCfg.WriteTimeout,
})
}
if cfg.ConnectTimeout <= 0 {
validCfg.ConnectTimeout = defaultConnectTimeout
log.Warn("falling back to default configuration", log.Fields{
"name": Name + ".ConnectTimeout",
"provided": cfg.ConnectTimeout,
"default": validCfg.ConnectTimeout,
})
}
if cfg.GarbageCollectionInterval <= 0 {
validCfg.GarbageCollectionInterval = defaultGarbageCollectionInterval
log.Warn("falling back to default configuration", log.Fields{
"name": Name + ".GarbageCollectionInterval",
"provided": cfg.GarbageCollectionInterval,
"default": validCfg.GarbageCollectionInterval,
})
}
if cfg.PrometheusReportingInterval < 0 {
validCfg.PrometheusReportingInterval = defaultPrometheusReportingInterval
log.Warn("falling back to default configuration", log.Fields{
"name": Name + ".PrometheusReportingInterval",
"provided": cfg.PrometheusReportingInterval,
"default": validCfg.PrometheusReportingInterval,
})
}
if cfg.PeerLifetime <= 0 {
validCfg.PeerLifetime = defaultPeerLifetime
log.Warn("falling back to default configuration", log.Fields{
"name": Name + ".PeerLifetime",
"provided": cfg.PeerLifetime,
"default": validCfg.PeerLifetime,
"Name": Name + ".ConnectTimeout",
"Provided": cfg.ConnectTimeout,
"Default": validCfg.ConnectTimeout,
})
}
return validCfg, nil
}
func connect(cfg Config) (*store, error) {
var err error
db := &store{
// FIXME: get context from parent and put into GC, middleware functions should use own ctx
ctx: context.TODO(),
}
func (cfg Config) Connect() (con Connection, err error) {
var rs redis.UniversalClient
switch {
case cfg.Cluster:
db.con = redis.NewClusterClient(&redis.ClusterOptions{
rs = redis.NewClusterClient(&redis.ClusterOptions{
Addrs: cfg.Addresses,
Username: cfg.Login,
Password: cfg.Password,
@@ -215,7 +204,7 @@ func connect(cfg Config) (*store, error) {
PoolSize: cfg.PoolSize,
})
case cfg.Sentinel:
db.con = redis.NewFailoverClient(&redis.FailoverOptions{
rs = redis.NewFailoverClient(&redis.FailoverOptions{
SentinelAddrs: cfg.Addresses,
SentinelUsername: cfg.Login,
SentinelPassword: cfg.Password,
@@ -227,7 +216,7 @@ func connect(cfg Config) (*store, error) {
DB: cfg.DB,
})
default:
db.con = redis.NewClient(&redis.Options{
rs = redis.NewClient(&redis.Options{
Addr: cfg.Addresses[0],
Username: cfg.Login,
Password: cfg.Password,
@@ -238,106 +227,85 @@ func connect(cfg Config) (*store, error) {
DB: cfg.DB,
})
}
if err = db.con.Ping(db.ctx).Err(); err == nil && !errors.Is(err, redis.Nil) {
if err = rs.Ping(context.Background()).Err(); err == nil && !errors.Is(err, redis.Nil) {
err = nil
res, err := db.con.Do(db.ctx, "command", "info", "keydb.cron").Result()
err = asNil(err)
if err != nil {
log.Warn("storage: Redis: unable to determine if current instance is KeyDB", log.Fields{"Error": err})
} else {
db.isKeyDB = res != nil
}
} else {
_ = db.con.Close()
db = nil
_ = rs.Close()
rs = nil
}
return db, err
return Connection{rs}, err
}
// New creates a new PeerStorage backed by redis.
func New(conf Config) (storage.PeerStorage, error) {
cfg, err := conf.Validate()
if err != nil {
return nil, err
}
ps, err := connect(cfg)
if err != nil {
return nil, err
}
ps.closed = make(chan any)
ps.logFields = cfg.LogFields()
// Start a goroutine for garbage collection.
func (ps *store) ScheduleGC(gcInterval, peerLifeTime time.Duration) {
ps.wg.Add(1)
go ps.scheduleGC(cfg.GarbageCollectionInterval, cfg.PeerLifetime)
if cfg.PrometheusReportingInterval > 0 {
// Start a goroutine for reporting statistics to Prometheus.
ps.wg.Add(1)
go ps.schedulerProm(cfg.PrometheusReportingInterval)
} else {
log.Info("prometheus disabled because of zero reporting interval")
}
return ps, nil
}
func (ps *store) scheduleGC(gcInterval, peerLifeTime time.Duration) {
defer ps.wg.Done()
t := time.NewTimer(gcInterval)
defer t.Stop()
for {
select {
case <-ps.closed:
return
case <-t.C:
start := time.Now()
ps.GC(time.Now().Add(-peerLifeTime))
duration := time.Since(start).Milliseconds()
log.Debug("storage: Redis: recordGCDuration", log.Fields{"timeTaken(ms)": duration})
storage.PromGCDurationMilliseconds.Observe(float64(duration))
t.Reset(gcInterval)
}
}
}
func (ps *store) schedulerProm(reportInterval time.Duration) {
defer ps.wg.Done()
t := time.NewTicker(reportInterval)
for {
select {
case <-ps.closed:
t.Stop()
return
case <-t.C:
if metrics.Enabled() {
before := time.Now()
ps.populateProm()
log.Debug("storage: Redis: populateProm() finished", log.Fields{"timeTaken": time.Since(before)})
go func() {
defer ps.wg.Done()
t := time.NewTimer(gcInterval)
defer t.Stop()
for {
select {
case <-ps.closed:
return
case <-t.C:
start := time.Now()
ps.gc(time.Now().Add(-peerLifeTime))
duration := time.Since(start).Milliseconds()
log.Debug("storage: Redis: recordGCDuration", log.Fields{"timeTaken(ms)": duration})
storage.PromGCDurationMilliseconds.Observe(float64(duration))
t.Reset(gcInterval)
}
}
}
}()
}
func (ps *store) ScheduleStatisticsCollection(reportInterval time.Duration) {
ps.wg.Add(1)
go func() {
defer ps.wg.Done()
t := time.NewTicker(reportInterval)
for {
select {
case <-ps.closed:
t.Stop()
return
case <-t.C:
if metrics.Enabled() {
before := time.Now()
// populateProm aggregates metrics over all groups and then posts them to
// prometheus.
numInfoHashes := ps.count(IHKey, true)
numSeeders := ps.count(CountSeederKey, false)
numLeechers := ps.count(CountLeecherKey, false)
storage.PromInfoHashesCount.Set(float64(numInfoHashes))
storage.PromSeedersCount.Set(float64(numSeeders))
storage.PromLeechersCount.Set(float64(numLeechers))
log.Debug("storage: Redis: populateProm() finished", log.Fields{"timeTaken": time.Since(before)})
}
}
}
}()
}
type Connection struct {
redis.UniversalClient
}
type store struct {
con redis.UniversalClient
ctx context.Context
Connection
closed chan any
wg sync.WaitGroup
logFields log.Fields
isKeyDB bool
}
func (ps *store) count(key string, getLength bool) (n uint64) {
var err error
if getLength {
n, err = ps.con.SCard(ps.ctx, key).Uint64()
n, err = ps.SCard(context.Background(), key).Uint64()
} else {
n, err = ps.con.Get(ps.ctx, key).Uint64()
n, err = ps.Get(context.Background(), key).Uint64()
}
err = asNil(err)
err = AsNil(err)
if err != nil {
log.Error("storage: Redis: GET/SCARD failure", log.Fields{
"key": key,
@@ -347,24 +315,12 @@ func (ps *store) count(key string, getLength bool) (n uint64) {
return
}
// populateProm aggregates metrics over all groups and then posts them to
// prometheus.
func (ps *store) populateProm() {
numInfoHashes := ps.count(ihKey, true)
numSeeders := ps.count(cntSeederKey, false)
numLeechers := ps.count(cntLeecherKey, false)
storage.PromInfoHashesCount.Set(float64(numInfoHashes))
storage.PromSeedersCount.Set(float64(numSeeders))
storage.PromLeechersCount.Set(float64(numLeechers))
}
func (ps *store) getClock() int64 {
return timecache.NowUnixNano()
}
func (ps *store) tx(txf func(tx redis.Pipeliner) error) (err error) {
if pipe, txErr := ps.con.TxPipelined(ps.ctx, txf); txErr == nil {
if pipe, txErr := ps.TxPipelined(context.TODO(), txf); txErr == nil {
errs := make([]string, 0)
for _, c := range pipe {
if err := c.Err(); err != nil {
@@ -380,7 +336,7 @@ func (ps *store) tx(txf func(tx redis.Pipeliner) error) (err error) {
return
}
func asNil(err error) error {
func AsNil(err error) error {
if err == nil || errors.Is(err, redis.Nil) {
return nil
}
@@ -394,13 +350,13 @@ func (ps *store) putPeer(infoHashKey, peerCountKey, peerId string) error {
"PeerId": peerId,
})
return ps.tx(func(tx redis.Pipeliner) (err error) {
if err = tx.HSet(ps.ctx, infoHashKey, peerId, ps.getClock()).Err(); err != nil {
if err = tx.HSet(context.TODO(), infoHashKey, peerId, ps.getClock()).Err(); err != nil {
return
}
if err = tx.Incr(ps.ctx, peerCountKey).Err(); err != nil {
if err = tx.Incr(context.TODO(), peerCountKey).Err(); err != nil {
return
}
err = tx.SAdd(ps.ctx, ihKey, infoHashKey).Err()
err = tx.SAdd(context.TODO(), IHKey, infoHashKey).Err()
return
})
}
@@ -411,13 +367,13 @@ func (ps *store) delPeer(infoHashKey, peerCountKey, peerId string) error {
"PeerCountKey": peerCountKey,
"PeerId": peerId,
})
deleted, err := ps.con.HDel(ps.ctx, infoHashKey, peerId).Uint64()
err = asNil(err)
deleted, err := ps.HDel(context.TODO(), infoHashKey, peerId).Uint64()
err = AsNil(err)
if err == nil {
if deleted == 0 {
err = storage.ErrResourceDoesNotExist
} else {
err = ps.con.Decr(ps.ctx, peerCountKey).Err()
err = ps.Decr(context.TODO(), peerCountKey).Err()
}
}
@@ -425,19 +381,19 @@ func (ps *store) delPeer(infoHashKey, peerCountKey, peerId string) error {
}
func (ps *store) PutSeeder(ih bittorrent.InfoHash, peer bittorrent.Peer) error {
return ps.putPeer(ihSeederKey+ih.RawString(), cntSeederKey, peer.RawString())
return ps.putPeer(IHSeederKey+ih.RawString(), CountSeederKey, peer.RawString())
}
func (ps *store) DeleteSeeder(ih bittorrent.InfoHash, peer bittorrent.Peer) error {
return ps.delPeer(ihSeederKey+ih.RawString(), cntSeederKey, peer.RawString())
return ps.delPeer(IHSeederKey+ih.RawString(), CountSeederKey, peer.RawString())
}
func (ps *store) PutLeecher(ih bittorrent.InfoHash, peer bittorrent.Peer) error {
return ps.putPeer(ihLeecherKey+ih.RawString(), cntLeecherKey, peer.RawString())
return ps.putPeer(IHLeecherKey+ih.RawString(), CountLeecherKey, peer.RawString())
}
func (ps *store) DeleteLeecher(ih bittorrent.InfoHash, peer bittorrent.Peer) error {
return ps.delPeer(ihLeecherKey+ih.RawString(), cntLeecherKey, peer.RawString())
return ps.delPeer(IHLeecherKey+ih.RawString(), CountLeecherKey, peer.RawString())
}
func (ps *store) GraduateLeecher(ih bittorrent.InfoHash, peer bittorrent.Peer) error {
@@ -447,24 +403,24 @@ func (ps *store) GraduateLeecher(ih bittorrent.InfoHash, peer bittorrent.Peer) e
})
infoHash, peerId := ih.RawString(), peer.RawString()
ihSeederKey, ihLeecherKey := ihSeederKey+infoHash, ihLeecherKey+infoHash
ihSeederKey, ihLeecherKey := IHSeederKey+infoHash, IHLeecherKey+infoHash
return ps.tx(func(tx redis.Pipeliner) error {
deleted, err := tx.HDel(ps.ctx, ihLeecherKey, peerId).Uint64()
err = asNil(err)
deleted, err := tx.HDel(context.TODO(), ihLeecherKey, peerId).Uint64()
err = AsNil(err)
if err == nil {
if deleted > 0 {
err = tx.Decr(ps.ctx, cntLeecherKey).Err()
err = tx.Decr(context.TODO(), CountLeecherKey).Err()
}
}
if err == nil {
err = tx.HSet(ps.ctx, ihSeederKey, peerId, ps.getClock()).Err()
err = tx.HSet(context.TODO(), ihSeederKey, peerId, ps.getClock()).Err()
}
if err == nil {
err = tx.Incr(ps.ctx, cntSeederKey).Err()
err = tx.Incr(context.TODO(), CountSeederKey).Err()
}
if err == nil {
err = tx.SAdd(ps.ctx, ihKey, ihSeederKey).Err()
err = tx.SAdd(context.TODO(), IHKey, ihSeederKey).Err()
}
return err
})
@@ -472,8 +428,8 @@ func (ps *store) GraduateLeecher(ih bittorrent.InfoHash, peer bittorrent.Peer) e
func (ps *store) getPeers(infoHashKey, except string, max int, v4Only bool) (peers []bittorrent.Peer, err error) {
var peerIds []string
peerIds, err = ps.con.HKeys(ps.ctx, infoHashKey).Result()
if err = asNil(err); err == nil {
peerIds, err = ps.HKeys(context.TODO(), infoHashKey).Result()
if err = AsNil(err); err == nil {
for _, peerId := range peerIds {
if peerId != except {
if p, err := bittorrent.NewPeer(peerId); err == nil {
@@ -505,16 +461,16 @@ func (ps *store) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant int,
infoHash, peerId, isV4 := ih.RawString(), peer.RawString(), peer.Addr().Is4()
if seeder {
peers, err = ps.getPeers(ihLeecherKey+infoHash, peerId, numWant, isV4)
peers, err = ps.getPeers(IHLeecherKey+infoHash, peerId, numWant, isV4)
} else {
// Append as many seeders as possible.
peers, err = ps.getPeers(ihSeederKey+infoHash, peerId, numWant, isV4)
peers, err = ps.getPeers(IHSeederKey+infoHash, peerId, numWant, isV4)
if err != nil {
return
}
if numWant -= len(peers); numWant > 0 {
if leechers, err := ps.getPeers(ihLeecherKey+infoHash, peerId, numWant, isV4); err == nil {
if leechers, err := ps.getPeers(IHLeecherKey+infoHash, peerId, numWant, isV4); err == nil {
peers = append(peers, leechers...)
} else {
log.Warn("storage: Redis: error occurred while receiving leechers", log.Fields{"InfoHash": ih, "Error": err})
@@ -530,16 +486,16 @@ func (ps *store) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant int,
}
func (ps *store) ScrapeSwarm(ih bittorrent.InfoHash, peer bittorrent.Peer) (resp bittorrent.Scrape) {
log.Debug("storage: Redis ScrapeSwarm", log.Fields{
log.Debug("storage: RedisDriver ScrapeSwarm", log.Fields{
"InfoHash": ih,
"Peer": peer,
})
resp.InfoHash = ih
infoHash := ih.RawString()
ihSeederKey, ihLeecherKey := ihSeederKey+infoHash, ihLeecherKey+infoHash
ihSeederKey, ihLeecherKey := IHSeederKey+infoHash, IHLeecherKey+infoHash
leechersLen, err := ps.con.HLen(ps.ctx, ihLeecherKey).Result()
err = asNil(err)
leechersLen, err := ps.HLen(context.TODO(), ihLeecherKey).Result()
err = AsNil(err)
if err != nil {
log.Error("storage: Redis: HLEN failure", log.Fields{
"InfoHashKey": ihLeecherKey,
@@ -548,8 +504,8 @@ func (ps *store) ScrapeSwarm(ih bittorrent.InfoHash, peer bittorrent.Peer) (resp
return
}
seedersLen, err := ps.con.HLen(ps.ctx, ihSeederKey).Result()
err = asNil(err)
seedersLen, err := ps.HLen(context.TODO(), ihSeederKey).Result()
err = AsNil(err)
if err != nil {
log.Error("storage: Redis: HLEN failure", log.Fields{
"InfoHashKey": ihSeederKey,
@@ -566,21 +522,21 @@ func (ps *store) ScrapeSwarm(ih bittorrent.InfoHash, peer bittorrent.Peer) (resp
const argNumErrorMsg = "ERR wrong number of arguments"
func (ps *store) Put(ctx string, values ...storage.Entry) (err error) {
func (ps Connection) Put(ctx string, values ...storage.Entry) (err error) {
if l := len(values); l > 0 {
if l == 1 {
err = ps.con.HSet(ps.ctx, prefixKey+ctx, values[0].Key, values[0].Value).Err()
err = ps.HSet(context.TODO(), PrefixKey+ctx, values[0].Key, values[0].Value).Err()
} else {
args := make([]any, 0, l*2)
for _, p := range values {
args = append(args, p.Key, p.Value)
}
err = ps.con.HSet(ps.ctx, prefixKey+ctx, args...).Err()
err = ps.HSet(context.TODO(), PrefixKey+ctx, args...).Err()
if err != nil {
if strings.Contains(err.Error(), argNumErrorMsg) {
log.Warn("This Redis version/implementation does not support variadic arguments for HSET")
log.Warn("This RedisDriver version/implementation does not support variadic arguments for HSET")
for _, p := range values {
if err = ps.con.HSet(ps.ctx, prefixKey+ctx, p.Key, p.Value).Err(); err != nil {
if err = ps.HSet(context.TODO(), PrefixKey+ctx, p.Key, p.Value).Err(); err != nil {
break
}
}
@@ -591,27 +547,27 @@ func (ps *store) Put(ctx string, values ...storage.Entry) (err error) {
return
}
func (ps *store) Contains(ctx string, key string) (bool, error) {
exist, err := ps.con.HExists(ps.ctx, prefixKey+ctx, key).Result()
return exist, asNil(err)
func (ps Connection) Contains(ctx string, key string) (bool, error) {
exist, err := ps.HExists(context.TODO(), PrefixKey+ctx, key).Result()
return exist, AsNil(err)
}
func (ps *store) Load(ctx string, key string) (v any, err error) {
v, err = ps.con.HGet(ps.ctx, prefixKey+ctx, key).Result()
func (ps Connection) Load(ctx string, key string) (v any, err error) {
v, err = ps.HGet(context.TODO(), PrefixKey+ctx, key).Result()
if err != nil && errors.Is(err, redis.Nil) {
v, err = nil, nil
}
return
}
func (ps *store) Delete(ctx string, keys ...string) (err error) {
func (ps Connection) Delete(ctx string, keys ...string) (err error) {
if len(keys) > 0 {
err = asNil(ps.con.HDel(ps.ctx, prefixKey+ctx, keys...).Err())
err = AsNil(ps.HDel(context.TODO(), PrefixKey+ctx, keys...).Err())
if err != nil {
if strings.Contains(err.Error(), argNumErrorMsg) {
log.Warn("This Redis version/implementation does not support variadic arguments for HDEL")
log.Warn("This RedisDriver version/implementation does not support variadic arguments for HDEL")
for _, k := range keys {
if err = asNil(ps.con.HDel(ps.ctx, prefixKey+ctx, k).Err()); err != nil {
if err = AsNil(ps.HDel(context.TODO(), PrefixKey+ctx, k).Err()); err != nil {
break
}
}
@@ -621,7 +577,7 @@ func (ps *store) Delete(ctx string, keys ...string) (err error) {
return
}
func (*store) Preservable() bool {
func (Connection) Preservable() bool {
return true
}
@@ -670,27 +626,27 @@ func (*store) Preservable() bool {
// - If the change happens after the HLEN, we will not even attempt to make the
// transaction. The infohash key will remain in the addressFamil hash and
// we'll attempt to clean it up the next time gc runs.
func (ps *store) GC(cutoff time.Time) {
func (ps *store) gc(cutoff time.Time) {
log.Debug("storage: Redis: purging peers with no announces since", log.Fields{"before": cutoff})
cutoffNanos := cutoff.UnixNano()
// list all infoHashKeys in the group
infoHashKeys, err := ps.con.SMembers(ps.ctx, ihKey).Result()
err = asNil(err)
infoHashKeys, err := ps.SMembers(context.Background(), IHKey).Result()
err = AsNil(err)
if err == nil {
for _, infoHashKey := range infoHashKeys {
var cntKey string
var seeder bool
if seeder = strings.HasPrefix(infoHashKey, ihSeederKey); seeder {
cntKey = cntSeederKey
} else if strings.HasPrefix(infoHashKey, ihLeecherKey) {
cntKey = cntLeecherKey
if seeder = strings.HasPrefix(infoHashKey, IHSeederKey); seeder {
cntKey = CountSeederKey
} else if strings.HasPrefix(infoHashKey, IHLeecherKey) {
cntKey = CountLeecherKey
} else {
log.Warn("storage: Redis: unexpected record found in info hash set", log.Fields{"InfoHashKey": infoHashKey})
continue
}
// list all (peer, timeout) pairs for the ih
peerList, err := ps.con.HGetAll(ps.ctx, infoHashKey).Result()
err = asNil(err)
peerList, err := ps.HGetAll(context.Background(), infoHashKey).Result()
err = AsNil(err)
if err == nil {
peersToRemove := make([]string, 0)
for peerId, timeStamp := range peerList {
@@ -709,14 +665,14 @@ func (ps *store) GC(cutoff time.Time) {
}
}
if len(peersToRemove) > 0 {
removedPeerCount, err := ps.con.HDel(ps.ctx, infoHashKey, peersToRemove...).Result()
err = asNil(err)
removedPeerCount, err := ps.HDel(context.Background(), infoHashKey, peersToRemove...).Result()
err = AsNil(err)
if err != nil {
if strings.Contains(err.Error(), argNumErrorMsg) {
log.Warn("This Redis version/implementation does not support variadic arguments for HDEL")
log.Warn("This RedisDriver version/implementation does not support variadic arguments for HDEL")
for _, k := range peersToRemove {
count, err := ps.con.HDel(ps.ctx, infoHashKey, k).Result()
err = asNil(err)
count, err := ps.HDel(context.Background(), infoHashKey, k).Result()
err = AsNil(err)
if err != nil {
log.Error("storage: Redis: unable to delete peer", log.Fields{
"InfoHashKey": infoHashKey,
@@ -736,7 +692,7 @@ func (ps *store) GC(cutoff time.Time) {
}
}
if removedPeerCount > 0 { // DECR seeder/leecher counter
if err = ps.con.DecrBy(ps.ctx, cntKey, removedPeerCount).Err(); err != nil {
if err = ps.DecrBy(context.Background(), cntKey, removedPeerCount).Err(); err != nil {
log.Error("storage: Redis: unable to decrement seeder/leecher peer count", log.Fields{
"InfoHashKey": infoHashKey,
"CountKey": cntKey,
@@ -746,15 +702,15 @@ func (ps *store) GC(cutoff time.Time) {
}
}
err = asNil(ps.con.Watch(ps.ctx, func(tx *redis.Tx) (err error) {
err = AsNil(ps.Watch(context.Background(), func(tx *redis.Tx) (err error) {
var infoHashCount uint64
infoHashCount, err = ps.con.HLen(ps.ctx, infoHashKey).Uint64()
err = asNil(err)
infoHashCount, err = ps.HLen(context.Background(), infoHashKey).Uint64()
err = AsNil(err)
if err == nil && infoHashCount == 0 {
// Empty hashes are not shown among existing keys,
// in other words, it's removed automatically after `HDEL` the last field.
// _, err := ps.con.Del(ps.ctx, infoHashKey)
err = asNil(ps.con.SRem(ps.ctx, ihKey, infoHashKey).Err())
// _, err := ps.Del(context.TODO(), infoHashKey)
err = AsNil(ps.SRem(context.Background(), IHKey, infoHashKey).Err())
}
return err
}, infoHashKey))
@@ -772,7 +728,7 @@ func (ps *store) GC(cutoff time.Time) {
}
}
} else {
log.Error("storage: Redis: unable to fetch info hash set", log.Fields{"HashSet": ihKey, "Error": err})
log.Error("storage: Redis: unable to fetch info hash set", log.Fields{"HashSet": IHKey, "Error": err})
}
}
@@ -784,9 +740,10 @@ func (ps *store) Stop() stop.Result {
}
ps.wg.Wait()
var err error
if ps.con != nil {
log.Info("storage: Redis: exiting. mochi does not clear data in redis when exiting. mochi keys have prefix " + prefixKey)
err = ps.con.Close()
if ps.UniversalClient != nil {
log.Info("storage: Redis: exiting. mochi does not clear data in redis when exiting. mochi keys have prefix " + PrefixKey)
err = ps.UniversalClient.Close()
ps.UniversalClient = nil
}
c.Done(err)
}()

View File

@@ -12,13 +12,11 @@ import (
)
var cfg = Config{
Addresses: []string{"localhost:6379"},
GarbageCollectionInterval: 10 * time.Minute,
PrometheusReportingInterval: 10 * time.Minute,
PeerLifetime: 30 * time.Minute,
ReadTimeout: 10 * time.Second,
WriteTimeout: 10 * time.Second,
ConnectTimeout: 10 * time.Second,
Addresses: []string{"localhost:6379"},
PeerLifetime: 30 * time.Minute,
ReadTimeout: 10 * time.Second,
WriteTimeout: 10 * time.Second,
ConnectTimeout: 10 * time.Second,
}
func createNew() s.PeerStorage {
@@ -26,7 +24,7 @@ func createNew() s.PeerStorage {
var err error
ps, err = New(cfg)
if err != nil {
fmt.Println("unable to create real Redis connection: ", err, " using simulator")
fmt.Println("unable to create real RedisDriver connection: ", err, " using simulator")
var rs *miniredis.Miniredis
rs, err = miniredis.Run()
if err != nil {

View File

@@ -13,30 +13,83 @@ import (
"github.com/sot-tech/mochi/pkg/stop"
)
const (
defaultPrometheusReportingInterval = time.Second * 1
defaultGarbageCollectionInterval = time.Minute * 3
defaultPeerLifetime = time.Minute * 30
)
var (
driversM sync.RWMutex
drivers = make(map[string]Driver)
drivers = make(map[string]Builder)
)
// Config holds configuration for periodic execution tasks, which may or may not implement
// specific storage (such as GCAware or StatisticsAware)
type Config struct {
// GarbageCollectionInterval period of GC
GarbageCollectionInterval time.Duration `cfg:"gc_interval"`
// PeerLifetime maximum TTL of peer
PeerLifetime time.Duration `cfg:"peer_lifetime"`
// PrometheusReportingInterval period of statistics data polling
PrometheusReportingInterval time.Duration `cfg:"prometheus_reporting_interval"`
}
func (c Config) sanitizeGCConfig() (gcInterval, peerTTL time.Duration) {
if c.GarbageCollectionInterval <= 0 {
gcInterval = defaultGarbageCollectionInterval
log.Warn("falling back to default configuration", log.Fields{
"Name": "GarbageCollectionInterval",
"Provided": c.GarbageCollectionInterval,
"Default": defaultGarbageCollectionInterval,
})
} else {
gcInterval = c.GarbageCollectionInterval
}
if c.PeerLifetime <= 0 {
peerTTL = defaultPeerLifetime
log.Warn("falling back to default configuration", log.Fields{
"Name": "PeerLifetime",
"Provided": c.PeerLifetime,
"Default": defaultPeerLifetime,
})
} else {
peerTTL = c.PeerLifetime
}
return
}
func (c Config) sanitizeStatisticsConfig() (statInterval time.Duration) {
if c.PrometheusReportingInterval < 0 {
statInterval = defaultPrometheusReportingInterval
log.Warn("falling back to default configuration", log.Fields{
"Name": "PrometheusReportingInterval",
"Provided": c.PrometheusReportingInterval,
"Default": defaultPrometheusReportingInterval,
})
}
return
}
// Entry - some key-value pair, used for BulkPut
type Entry struct {
Key string
Value any
}
// Driver is the interface used to initialize a new type of PeerStorage.
type Driver interface {
NewStorage(cfg conf.MapConfig) (PeerStorage, error)
}
// Builder is the function used to initialize a new type of PeerStorage.
type Builder func(cfg conf.MapConfig) (PeerStorage, error)
// ErrResourceDoesNotExist is the error returned by all delete methods and the
// AnnouncePeers method of the PeerStorage interface if the requested resource
// does not exist.
var ErrResourceDoesNotExist = bittorrent.ClientError("resource does not exist")
var (
// ErrResourceDoesNotExist is the error returned by all delete methods and the
// AnnouncePeers method of the PeerStorage interface if the requested resource
// does not exist.
ErrResourceDoesNotExist = bittorrent.ClientError("resource does not exist")
// ErrDriverDoesNotExist is the error returned by NewStorage when a peer
// store driver with that name does not exist.
var ErrDriverDoesNotExist = errors.New("peer store driver with that name does not exist")
// ErrDriverDoesNotExist is the error returned by NewStorage when a peer
// store driver with that name does not exist.
ErrDriverDoesNotExist = errors.New("peer store driver with that name does not exist")
)
// DataStorage is the interface, used for implementing store for arbitrary data
type DataStorage interface {
@@ -59,6 +112,23 @@ type DataStorage interface {
Preservable() bool
}
// GCAware is the interface for storage that supports periodic
// stale peers collection
type GCAware interface {
// ScheduleGC used to delete stale data, such as timed out seeders/leechers.
// Note: implementation must create subroutine by itself
ScheduleGC(gcInterval, peerLifeTime time.Duration)
}
// StatisticsAware is the interface for storage that supports periodic
// statistics collection
type StatisticsAware interface {
// ScheduleStatisticsCollection used to receive statistics information about hashes,
// seeders and leechers count.
// Note: implementation must create subroutine by itself
ScheduleStatisticsCollection(reportInterval time.Duration)
}
// PeerStorage is an interface that abstracts the interactions of storing and
// manipulating Peers such that it can be implemented for various data stores.
//
@@ -70,10 +140,7 @@ type DataStorage interface {
// to track the last activity for that Peer. The entire database can then
// be scanned periodically and too old Peers removed. The intervals and
// durations involved should be configurable.
// - IPv4 and IPv6 swarms must be isolated from each other.
// A PeerStorage must be able to transparently handle IPv4 and IPv6 Peers, but
// must separate them. AnnouncePeers and ScrapeSwarm must return information
// about the Swarm matching the given AddressFamily only.
// - IPv4 and IPv6 swarms may be isolated from each other.
//
// Implementations can be tested against this interface using the tests in
// storage_test.go and the benchmarks in storage_bench.go.
@@ -136,9 +203,6 @@ type PeerStorage interface {
// If the Swarm does not exist, an empty Scrape and no error is returned.
ScrapeSwarm(infoHash bittorrent.InfoHash, peer bittorrent.Peer) bittorrent.Scrape
// GC used to delete stale data, such as timed out seeders/leechers
GC(cutoff time.Time)
// Stopper is an interface that expects a Stop method to stop the PeerStorage.
// For more details see the documentation in the stop package.
stop.Stopper
@@ -148,41 +212,62 @@ type PeerStorage interface {
log.Fielder
}
// RegisterDriver makes a Driver available by the provided name.
// RegisterBuilder makes a Builder available by the provided name.
//
// If called twice with the same name, the name is blank, or if the provided
// Driver is nil, this function panics.
func RegisterDriver(name string, d Driver) {
func RegisterBuilder(name string, b Builder) {
if name == "" {
panic("storage: could not register a Driver with an empty name")
panic("storage: could not register a Builder with an empty name")
}
if d == nil {
panic("storage: could not register a nil Driver")
if b == nil {
panic("storage: could not register a nil Builder")
}
driversM.Lock()
defer driversM.Unlock()
if _, dup := drivers[name]; dup {
panic("storage: RegisterDriver called twice for " + name)
panic("storage: RegisterBuilder called twice for " + name)
}
drivers[name] = d
drivers[name] = b
}
// NewStorage attempts to initialize a new PeerStorage instance from
// the list of registered Drivers.
//
// If a driver does not exist, returns ErrDriverDoesNotExist.
// If a builder does not exist, returns ErrDriverDoesNotExist.
func NewStorage(name string, cfg conf.MapConfig) (ps PeerStorage, err error) {
driversM.RLock()
defer driversM.RUnlock()
var d Driver
d, ok := drivers[name]
var b Builder
b, ok := drivers[name]
if !ok {
return nil, ErrDriverDoesNotExist
}
return d.NewStorage(cfg)
c := new(Config)
if err = cfg.Unmarshal(c); err != nil {
return
}
if ps, err = b(cfg); err != nil {
return
}
if gc, isOk := ps.(GCAware); isOk {
gc.ScheduleGC(c.sanitizeGCConfig())
}
if statInterval := c.sanitizeStatisticsConfig(); statInterval > 0 {
if st, isOk := ps.(StatisticsAware); isOk {
st.ScheduleStatisticsCollection(statInterval)
}
} else {
log.Info("prometheus disabled because of zero reporting interval")
}
return
}

View File

@@ -5,7 +5,6 @@ package test
import (
"errors"
"testing"
"time"
"github.com/stretchr/testify/require"
@@ -257,23 +256,6 @@ func (th *testHolder) CustomBulkPutContainsLoadDelete(t *testing.T) {
}
}
func (th *testHolder) GC(t *testing.T) {
for _, c := range testData {
require.Nil(t, th.st.PutSeeder(c.ih, c.peer))
require.Nil(t, th.st.PutSeeder(c.ih, v4Peer))
require.Nil(t, th.st.PutSeeder(c.ih, v6Peer))
}
th.st.GC(time.Now().Add(time.Hour))
for _, c := range testData {
peers, err := th.st.AnnouncePeers(c.ih, false, 100, v4Peer)
if errors.Is(err, storage.ErrResourceDoesNotExist) {
err = nil
}
require.Nil(t, err)
require.Empty(t, peers)
}
}
// RunTests tests a PeerStorage implementation against the interface.
func RunTests(t *testing.T, p storage.PeerStorage) {
th := testHolder{st: p}
@@ -305,8 +287,6 @@ func RunTests(t *testing.T, p storage.PeerStorage) {
t.Run("CustomPutContainsLoadDelete", th.CustomPutContainsLoadDelete)
t.Run("CustomBulkPutContainsLoadDelete", th.CustomBulkPutContainsLoadDelete)
t.Run("GC", th.GC)
e := th.st.Stop()
require.Nil(t, <-e)
}