diff --git a/cmd/mochi/main.go b/cmd/mochi/main.go index 4904caa..0a9e05d 100644 --- a/cmd/mochi/main.go +++ b/cmd/mochi/main.go @@ -28,7 +28,7 @@ var e2eCmd *cobra.Command // Run represents the state of a running instance of Conf. type Run struct { configFilePath string - storage storage.Storage + storage storage.PeerStorage logic *middleware.Logic sg *stop.Group } @@ -45,7 +45,7 @@ func NewRun(configFilePath string) (*Run, error) { // Start begins an instance of Conf. // It is optional to provide an instance of the peer store to avoid the // creation of a new one. -func (r *Run) Start(ps storage.Storage) error { +func (r *Run) Start(ps storage.PeerStorage) error { configFile, err := ParseConfigFile(r.configFilePath) if err != nil { return fmt.Errorf("failed to read config: %w", err) @@ -115,7 +115,7 @@ func combineErrors(prefix string, errs []error) error { } // Stop shuts down an instance of Conf. -func (r *Run) Stop(keepPeerStore bool) (storage.Storage, error) { +func (r *Run) Stop(keepPeerStore bool) (storage.PeerStorage, error) { log.Debug("stopping frontends and metrics server") if errs := r.sg.Stop().Wait(); len(errs) != 0 { return nil, combineErrors("failed while shutting down frontends", errs) diff --git a/dist/example_config.yaml b/dist/example_config.yaml index dd1b763..d89730f 100644 --- a/dist/example_config.yaml +++ b/dist/example_config.yaml @@ -220,6 +220,8 @@ mochi: # - name: torrent approval # options: # initial_source: list +# Save data provided by the source into the storage configured above +# preserve: false # configuration: # hash_list: # - "a1b2c3d4e5a1b2c3d4e5a1b2c3d4e5a1b2c3d4e5" diff --git a/docs/middleware/torrent_approval.md b/docs/middleware/torrent_approval.md index 8d3a2d1..e3db3a1 100644 --- a/docs/middleware/torrent_approval.md +++ b/docs/middleware/torrent_approval.md @@ -21,10 +21,6 @@ If mode is **black list** (`invert` set to `true`), tracker will allow all hashe There are two sources of hashes: `list` and `directory`. -Both of them used as **INITIAL** source for storing in storage. -If storage is not `memory`, records are persisted until _somebody_ -or _something_ (different tool with access to storage) won't delete it. - `list` is the static set of hashes, specified in configuration file. `directory` will watch for `*.torrent` files in specified path and @@ -32,11 +28,16 @@ append/delete records from storage. This source will parse all existing files at start and then watch for new files to add, or for delete events to remove hash from storage. +Note: if the storage is not `memory` and the `preserve` option is set to `true`, records +are persisted in storage until _somebody_ or _something_ (a different tool with access +to the storage) deletes them.
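For illustration, the check this middleware performs can be sketched in a few lines of Go. This is a hypothetical helper, not the actual hook implementation; only `DataStorage.Contains` and the `invert`/`storage_ctx` options come from this changeset:

```go
package approval

import "github.com/sot-tech/mochi/storage"

// approved reports whether a hash passes the approval check.
// ds is the DataStorage backing the middleware, storageCtx
// corresponds to the `storage_ctx` option, and invert selects
// black-list mode. Hypothetical helper for illustration only.
func approved(ds storage.DataStorage, storageCtx, hash string, invert bool) (bool, error) {
	found, err := ds.Contains(storageCtx, hash)
	if err != nil {
		return false, err
	}
	// White list: the hash must be present; black list (invert):
	// a present hash is rejected.
	return found != invert, nil
}
```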
+ ## Configuration This middleware provides the following parameters for configuration: - `initial_source` - source type: `list` or `directory` +- `preserve` - save data provided by the source into storage - `configuration` - options for specified source - `list`: - `hash_list` - list of HEX encoded hashes @@ -57,8 +58,9 @@ mochi: - name: torrent approval options: initial_source: list + preserve: true configuration: - hash_list: ["AAA", "BBB"] + hash_list: [ "AAA", "BBB" ] invert: false storage_ctx: APPROVED_HASH ``` diff --git a/middleware/clientapproval/clientapproval.go b/middleware/clientapproval/clientapproval.go index 2a20fc3..607aa7c 100644 --- a/middleware/clientapproval/clientapproval.go +++ b/middleware/clientapproval/clientapproval.go @@ -39,7 +39,7 @@ type hook struct { unapproved map[ClientID]struct{} } -func build(options conf.MapConfig, _ storage.Storage) (middleware.Hook, error) { +func build(options conf.MapConfig, _ storage.PeerStorage) (middleware.Hook, error) { var cfg Config if err := options.Unmarshal(&cfg); err != nil { diff --git a/middleware/hooks.go b/middleware/hooks.go index 415943c..3120245 100644 --- a/middleware/hooks.go +++ b/middleware/hooks.go @@ -27,7 +27,7 @@ type skipSwarmInteraction struct{} var SkipSwarmInteractionKey = skipSwarmInteraction{} type swarmInteractionHook struct { - store storage.Storage + store storage.PeerStorage } func (h *swarmInteractionHook) HandleAnnounce(ctx context.Context, req *bittorrent.AnnounceRequest, _ *bittorrent.AnnounceResponse) (_ context.Context, err error) { @@ -87,7 +87,7 @@ var SkipResponseHookKey = skipResponseHook{} // var ScrapeIsIPv6Key = scrapeAddressType{} type responseHook struct { - store storage.Storage + store storage.PeerStorage } func (h *responseHook) HandleAnnounce(ctx context.Context, req *bittorrent.AnnounceRequest, resp *bittorrent.AnnounceResponse) (_ context.Context, err error) { diff --git a/middleware/jwt/jwt.go b/middleware/jwt/jwt.go index f5375db..9ad1683 100644 --- a/middleware/jwt/jwt.go +++ b/middleware/jwt/jwt.go @@ -70,7 +70,7 @@ type hook struct { closing chan struct{} } -func build(options conf.MapConfig, _ storage.Storage) (middleware.Hook, error) { +func build(options conf.MapConfig, _ storage.PeerStorage) (middleware.Hook, error) { var cfg Config if err := options.Unmarshal(&cfg); err != nil { diff --git a/middleware/logic.go b/middleware/logic.go index f2b224e..0219012 100644 --- a/middleware/logic.go +++ b/middleware/logic.go @@ -15,7 +15,7 @@ var _ frontend.TrackerLogic = &Logic{} // NewLogic creates a new instance of a TrackerLogic that executes the provided // middleware hooks. -func NewLogic(annInterval, minAnnInterval time.Duration, peerStore storage.Storage, preHooks, postHooks []Hook) *Logic { +func NewLogic(annInterval, minAnnInterval time.Duration, peerStore storage.PeerStorage, preHooks, postHooks []Hook) *Logic { return &Logic{ announceInterval: annInterval, minAnnounceInterval: minAnnInterval, diff --git a/middleware/middleware.go b/middleware/middleware.go index 238d8d7..140a97b 100644 --- a/middleware/middleware.go +++ b/middleware/middleware.go @@ -23,7 +23,7 @@ var ( // // The `options` parameter is map of parameters that should be unmarshalled into // the hook's custom configuration. -type Builder func(options conf.MapConfig, storage storage.Storage) (Hook, error) +type Builder func(options conf.MapConfig, storage storage.PeerStorage) (Hook, error) // RegisterBuilder makes a Builder available by the provided name. 
// @@ -51,7 +51,7 @@ func RegisterBuilder(name string, d Builder) { // list of registered Builders. // // If a driver does not exist, returns ErrBuilderDoesNotExist. -func New(name string, options conf.MapConfig, storage storage.Storage) (Hook, error) { +func New(name string, options conf.MapConfig, storage storage.PeerStorage) (Hook, error) { driversM.RLock() defer driversM.RUnlock() @@ -72,7 +72,7 @@ type Config struct { // HooksFromHookConfigs is a utility function for initializing Hooks in bulk. // each element of configs must contain pairs `name` - string and `options` - map[string]any -func HooksFromHookConfigs(configs []conf.MapConfig, storage storage.Storage) (hooks []Hook, err error) { +func HooksFromHookConfigs(configs []conf.MapConfig, storage storage.PeerStorage) (hooks []Hook, err error) { for _, cfg := range configs { var c Config diff --git a/middleware/torrentapproval/container/container.go b/middleware/torrentapproval/container/container.go index af648ba..155d71c 100644 --- a/middleware/torrentapproval/container/container.go +++ b/middleware/torrentapproval/container/container.go @@ -15,7 +15,7 @@ import ( const DefaultStorageCtxName = "MW_APPROVAL" // Builder function that creates and configures specific container -type Builder func(conf.MapConfig, storage.Storage) (Container, error) +type Builder func(conf.MapConfig, storage.DataStorage) (Container, error) var ( buildersMU sync.Mutex @@ -45,7 +45,7 @@ type Container interface { } // GetContainer creates Container by its name and provided confBytes -func GetContainer(name string, config conf.MapConfig, storage storage.Storage) (Container, error) { +func GetContainer(name string, config conf.MapConfig, storage storage.DataStorage) (Container, error) { buildersMU.Lock() defer buildersMU.Unlock() var err error diff --git a/middleware/torrentapproval/container/directory/directory.go b/middleware/torrentapproval/container/directory/directory.go index c97d92c..82c88f8 100644 --- a/middleware/torrentapproval/container/directory/directory.go +++ b/middleware/torrentapproval/container/directory/directory.go @@ -35,7 +35,7 @@ type Config struct { Path string } -func build(conf conf.MapConfig, st storage.Storage) (container.Container, error) { +func build(conf conf.MapConfig, st storage.DataStorage) (container.Container, error) { c := new(Config) if err := conf.Unmarshal(c); err != nil { return nil, fmt.Errorf("unable to deserialise configuration: %w", err) @@ -84,7 +84,7 @@ func build(conf conf.MapConfig, st storage.Storage) (container.Container, error) if len(name) == 0 { name = list.DUMMY } - if err := d.Storage.BulkPut(d.StorageCtx, + if err := d.Storage.Put(d.StorageCtx, storage.Entry{ Key: event.InfoHash.AsString(), Value: name, diff --git a/middleware/torrentapproval/container/list/list.go b/middleware/torrentapproval/container/list/list.go index 011ee98..ae51b43 100644 --- a/middleware/torrentapproval/container/list/list.go +++ b/middleware/torrentapproval/container/list/list.go @@ -33,7 +33,7 @@ type Config struct { // DUMMY used as value placeholder if storage needs some value with const DUMMY = "_" -func build(conf conf.MapConfig, st storage.Storage) (container.Container, error) { +func build(conf conf.MapConfig, st storage.DataStorage) (container.Container, error) { c := new(Config) if err := conf.Unmarshal(c); err != nil { return nil, fmt.Errorf("unable to deserialise configuration: %w", err) @@ -61,7 +61,7 @@ func build(conf conf.MapConfig, st storage.Storage) (container.Container, error) init = append(init, 
storage.Entry{Key: ih.TruncateV1().RawString(), Value: DUMMY}) } } - if err := l.Storage.BulkPut(l.StorageCtx, init...); err != nil { + if err := l.Storage.Put(l.StorageCtx, init...); err != nil { return nil, fmt.Errorf("unable to put initial data: %w", err) } } @@ -73,7 +73,7 @@ type List struct { // Invert see Config.Invert description. Invert bool // Storage implementation where hashes are stored for approval checks. - Storage storage.Storage + Storage storage.DataStorage // StorageCtx see Config.StorageCtx description. StorageCtx string } diff --git a/middleware/torrentapproval/torrentapproval.go b/middleware/torrentapproval/torrentapproval.go index 5d59aba..1d43e68 100644 --- a/middleware/torrentapproval/torrentapproval.go +++ b/middleware/torrentapproval/torrentapproval.go @@ -10,6 +10,7 @@ import ( "github.com/sot-tech/mochi/middleware" "github.com/sot-tech/mochi/middleware/torrentapproval/container" "github.com/sot-tech/mochi/pkg/conf" + "github.com/sot-tech/mochi/storage/memory" // import directory watcher to enable appropriate support _ "github.com/sot-tech/mochi/middleware/torrentapproval/container/directory" @@ -30,11 +31,14 @@ func init() { type baseConfig struct { // Source - name of container for initial values Source string `cfg:"initial_source"` + // Preserve - if true, the container receives the real registered storage; + // if false, a temporary in-memory storage is used or created instead + Preserve bool // Configuration depends on used container Configuration conf.MapConfig } -func build(options conf.MapConfig, storage storage.Storage) (h middleware.Hook, err error) { +func build(options conf.MapConfig, st storage.PeerStorage) (h middleware.Hook, err error) { var cfg baseConfig if err = options.Unmarshal(&cfg); err != nil { return nil, fmt.Errorf("middleware %s: %w", Name, err) @@ -48,8 +52,13 @@ func build(options conf.MapConfig, storage storage.Storage) (h middleware.Hook, return nil, fmt.Errorf("invalid options for middleware %s: options not provided", Name) } + var ds storage.DataStorage = st + if !cfg.Preserve && ds.Preservable() { + ds = memory.NewDataStore() + } + var c container.Container - if c, err = container.GetContainer(cfg.Source, cfg.Configuration, storage); err == nil { + if c, err = container.GetContainer(cfg.Source, cfg.Configuration, ds); err == nil { h = &hook{c} } return h, err diff --git a/middleware/torrentapproval/torrentapproval_test.go b/middleware/torrentapproval/torrentapproval_test.go index 48c40fc..32c11d5 100644 --- a/middleware/torrentapproval/torrentapproval_test.go +++ b/middleware/torrentapproval/torrentapproval_test.go @@ -67,7 +67,7 @@ var cases = []struct { func TestHandleAnnounce(t *testing.T) { config := memory.Config{}.Validate() - storage, err := memory.New(config) + storage, err := memory.NewPeerStorage(config) require.Nil(t, err) for _, tt := range cases { t.Run(fmt.Sprintf("testing hash %s", tt.ih), func(t *testing.T) { diff --git a/middleware/varinterval/varinterval.go b/middleware/varinterval/varinterval.go index 9dd530f..f9540ac 100644 --- a/middleware/varinterval/varinterval.go +++ b/middleware/varinterval/varinterval.go @@ -22,7 +22,7 @@ func init() { middleware.RegisterBuilder(Name, build) } -func build(options conf.MapConfig, _ storage.Storage) (h middleware.Hook, err error) { +func build(options conf.MapConfig, _ storage.PeerStorage) (h middleware.Hook, err error) { var cfg Config if err = options.Unmarshal(&cfg); err != nil { diff --git a/storage/memory/storage.go b/storage/memory/storage.go index 
af97fe3..9a7c1b5 100644 --- a/storage/memory/storage.go +++ b/storage/memory/storage.go @@ -38,15 +38,15 @@ func init() { type driver struct{} -func (d driver) NewStorage(icfg conf.MapConfig) (storage.Storage, error) { +func (d driver) NewStorage(icfg conf.MapConfig) (storage.PeerStorage, error) { var cfg Config if err := icfg.Unmarshal(&cfg); err != nil { return nil, err } - return New(cfg) + return NewPeerStorage(cfg) } -// Config holds the configuration of a memory Storage. +// Config holds the configuration of a memory PeerStorage. type Config struct { GarbageCollectionInterval time.Duration `cfg:"gc_interval"` PrometheusReportingInterval time.Duration `cfg:"prometheus_reporting_interval"` @@ -111,14 +111,14 @@ func (cfg Config) Validate() Config { return validcfg } -// New creates a new Storage backed by memory. -func New(provided Config) (storage.Storage, error) { +// NewPeerStorage creates a new PeerStorage backed by memory. +func NewPeerStorage(provided Config) (storage.PeerStorage, error) { cfg := provided.Validate() - ps := &store{ - cfg: cfg, - shards: make([]*peerShard, cfg.ShardCount*2), - contexts: sync.Map{}, - closed: make(chan struct{}), + ps := &peerStore{ + cfg: cfg, + shards: make([]*peerShard, cfg.ShardCount*2), + DataStorage: NewDataStore(), + closed: make(chan struct{}), } for i := 0; i < cfg.ShardCount*2; i++ { @@ -183,20 +183,20 @@ type swarm struct { leechers map[string]int64 } -type store struct { - cfg Config - shards []*peerShard - contexts sync.Map +type peerStore struct { + storage.DataStorage + cfg Config + shards []*peerShard closed chan struct{} wg sync.WaitGroup } -var _ storage.Storage = &store{} +var _ storage.PeerStorage = &peerStore{} // populateProm aggregates metrics over all shards and then posts them to // prometheus. -func (ps *store) populateProm() { +func (ps *peerStore) populateProm() { var numInfohashes, numSeeders, numLeechers uint64 for _, s := range ps.shards { @@ -217,11 +217,11 @@ func recordGCDuration(duration time.Duration) { storage.PromGCDurationMilliseconds.Observe(float64(duration.Nanoseconds()) / float64(time.Millisecond)) } -func (ps *store) getClock() int64 { +func (ps *peerStore) getClock() int64 { return timecache.NowUnixNano() } -func (ps *store) shardIndex(infoHash bittorrent.InfoHash, addr netip.Addr) uint32 { +func (ps *peerStore) shardIndex(infoHash bittorrent.InfoHash, addr netip.Addr) uint32 { // There are twice the amount of shards specified by the user, the first // half is dedicated to IPv4 swarms and the second half is dedicated to // IPv6 swarms. 
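Since the memory `peerStore` now embeds `storage.DataStorage`, a single value serves both the peer-tracking API and the arbitrary key-value API. A minimal usage sketch, assuming only identifiers visible in this diff (`memory.NewPeerStorage`, `memory.Config.Validate`, `storage.Entry`):

```go
package main

import (
	"fmt"

	"github.com/sot-tech/mochi/storage"
	"github.com/sot-tech/mochi/storage/memory"
)

func main() {
	// Validate fills unset fields with defaults, as the tests below do.
	ps, err := memory.NewPeerStorage(memory.Config{}.Validate())
	if err != nil {
		panic(err)
	}
	defer ps.Stop()

	// Key-value calls are served by the embedded in-memory DataStorage.
	_ = ps.Put("MW_APPROVAL", storage.Entry{Key: "somehash", Value: "_"})
	found, _ := ps.Contains("MW_APPROVAL", "somehash")
	fmt.Println(found) // true
}
```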
@@ -232,7 +232,7 @@ func (ps *store) shardIndex(infoHash bittorrent.InfoHash, addr netip.Addr) uint3 return idx } -func (ps *store) PutSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) error { +func (ps *peerStore) PutSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) error { select { case <-ps.closed: panic("attempted to interact with stopped memory store") @@ -263,7 +263,7 @@ func (ps *store) PutSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) error { return nil } -func (ps *store) DeleteSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) error { +func (ps *peerStore) DeleteSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) error { select { case <-ps.closed: panic("attempted to interact with stopped memory store") @@ -296,7 +296,7 @@ func (ps *store) DeleteSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) error { return nil } -func (ps *store) PutLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error { +func (ps *peerStore) PutLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error { select { case <-ps.closed: panic("attempted to interact with stopped memory store") @@ -327,7 +327,7 @@ func (ps *store) PutLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error { return nil } -func (ps *store) DeleteLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error { +func (ps *peerStore) DeleteLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error { select { case <-ps.closed: panic("attempted to interact with stopped memory store") @@ -360,7 +360,7 @@ func (ps *store) DeleteLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error return nil } -func (ps *store) GraduateLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error { +func (ps *peerStore) GraduateLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error { select { case <-ps.closed: panic("attempted to interact with stopped memory store") @@ -397,7 +397,7 @@ func (ps *store) GraduateLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) erro return nil } -func (ps *store) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant int, peer bittorrent.Peer) (peers []bittorrent.Peer, err error) { +func (ps *peerStore) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant int, peer bittorrent.Peer) (peers []bittorrent.Peer, err error) { select { case <-ps.closed: panic("attempted to interact with stopped memory store") @@ -458,7 +458,7 @@ func (ps *store) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant int, return } -func (ps *store) ScrapeSwarm(ih bittorrent.InfoHash, peer bittorrent.Peer) (resp bittorrent.Scrape) { +func (ps *peerStore) ScrapeSwarm(ih bittorrent.InfoHash, peer bittorrent.Peer) (resp bittorrent.Scrape) { select { case <-ps.closed: panic("attempted to interact with stopped memory store") @@ -482,6 +482,15 @@ func (ps *store) ScrapeSwarm(ih bittorrent.InfoHash, peer bittorrent.Peer) (resp return } +// NewDataStore creates new in-memory data store +func NewDataStore() storage.DataStorage { + return new(dataStore) +} + +type dataStore struct { + sync.Map +} + func asKey(in any) any { if in == nil { panic("unable to use nil map key") @@ -493,42 +502,36 @@ func asKey(in any) any { return fmt.Sprint(in) } -func (ps *store) Put(ctx string, value storage.Entry) error { - m, _ := ps.contexts.LoadOrStore(ctx, new(sync.Map)) - m.(*sync.Map).Store(value.Key, value.Value) - return nil -} - -func (ps *store) Contains(ctx string, key string) (bool, error) { - var exist bool - if m, found := ps.contexts.Load(ctx); found { - _, exist = m.(*sync.Map).Load(asKey(key)) - } - return exist, nil -} - -func (ps *store) BulkPut(ctx 
string, pairs ...storage.Entry) error { - if len(pairs) > 0 { - c, _ := ps.contexts.LoadOrStore(ctx, new(sync.Map)) +func (ds *dataStore) Put(ctx string, values ...storage.Entry) error { + if len(values) > 0 { + c, _ := ds.LoadOrStore(ctx, new(sync.Map)) m := c.(*sync.Map) - for _, p := range pairs { + for _, p := range values { m.Store(asKey(p.Key), p.Value) } } return nil } -func (ps *store) Load(ctx string, key string) (any, error) { +func (ds *dataStore) Contains(ctx string, key string) (bool, error) { + var exist bool + if m, found := ds.Map.Load(ctx); found { + _, exist = m.(*sync.Map).Load(asKey(key)) + } + return exist, nil +} + +func (ds *dataStore) Load(ctx string, key string) (any, error) { var v any - if m, found := ps.contexts.Load(ctx); found { + if m, found := ds.Map.Load(ctx); found { v, _ = m.(*sync.Map).Load(asKey(key)) } return v, nil } -func (ps *store) Delete(ctx string, keys ...string) error { +func (ds *dataStore) Delete(ctx string, keys ...string) error { if len(keys) > 0 { - if m, found := ps.contexts.Load(ctx); found { + if m, found := ds.Map.Load(ctx); found { m := m.(*sync.Map) for _, k := range keys { m.Delete(asKey(k)) @@ -538,12 +541,16 @@ func (ps *store) Delete(ctx string, keys ...string) error { return nil } -// GC deletes all Peers from the Storage which are older than the +func (*dataStore) Preservable() bool { + return false +} + +// GC deletes all Peers from the PeerStorage which are older than the // cutoff time. // // This function must be able to execute while other methods on this interface // are being executed in parallel. -func (ps *store) GC(cutoff time.Time) { +func (ps *peerStore) GC(cutoff time.Time) { select { case <-ps.closed: return @@ -596,7 +603,7 @@ func (ps *store) GC(cutoff time.Time) { } } -func (ps *store) Stop() stop.Result { +func (ps *peerStore) Stop() stop.Result { c := make(stop.Channel) go func() { if ps.closed != nil { @@ -617,6 +624,6 @@ func (ps *store) Stop() stop.Result { return c.Result() } -func (ps *store) LogFields() log.Fields { +func (ps *peerStore) LogFields() log.Fields { return ps.cfg.LogFields() } diff --git a/storage/memory/storage_test.go b/storage/memory/storage_test.go index f37861c..2f86f4c 100644 --- a/storage/memory/storage_test.go +++ b/storage/memory/storage_test.go @@ -8,8 +8,8 @@ import ( "github.com/sot-tech/mochi/storage/test" ) -func createNew() storage.Storage { - ps, err := New(Config{ +func createNew() storage.PeerStorage { + ps, err := NewPeerStorage(Config{ ShardCount: 1024, GarbageCollectionInterval: 10 * time.Minute, PrometheusReportingInterval: 10 * time.Minute, diff --git a/storage/redis/storage.go b/storage/redis/storage.go index ef84f1c..c7b1c12 100644 --- a/storage/redis/storage.go +++ b/storage/redis/storage.go @@ -73,7 +73,7 @@ func init() { type driver struct{} -func (d driver) NewStorage(icfg conf.MapConfig) (storage.Storage, error) { +func (d driver) NewStorage(icfg conf.MapConfig) (storage.PeerStorage, error) { // Unmarshal the bytes into the proper config type. var cfg Config @@ -84,7 +84,7 @@ func (d driver) NewStorage(icfg conf.MapConfig) (storage.Storage, error) { return New(cfg) } -// Config holds the configuration of a redis Storage. +// Config holds the configuration of a redis PeerStorage. type Config struct { GarbageCollectionInterval time.Duration `cfg:"gc_interval"` PrometheusReportingInterval time.Duration `cfg:"prometheus_reporting_interval"` @@ -252,8 +252,8 @@ func connect(cfg Config) (*store, error) { return db, err } -// New creates a new Storage backed by redis. 
-func New(conf Config) (storage.Storage, error) { +// New creates a new PeerStorage backed by redis. +func New(conf Config) (storage.PeerStorage, error) { cfg, err := conf.Validate() if err != nil { return nil, err @@ -658,30 +658,25 @@ func (ps *store) ScrapeSwarm(ih bittorrent.InfoHash, peer bittorrent.Peer) (resp return } -func (ps *store) Put(ctx string, value storage.Entry) error { - return ps.con.HSet(ps.ctx, prefixKey+ctx, value.Key, value.Value).Err() -} - -func (ps *store) Contains(ctx string, key string) (bool, error) { - exist, err := ps.con.HExists(ps.ctx, prefixKey+ctx, key).Result() - return exist, asNil(err) -} - const argNumErrorMsg = "ERR wrong number of arguments" -func (ps *store) BulkPut(ctx string, pairs ...storage.Entry) (err error) { - if l := len(pairs); l > 0 { - args := make([]any, 0, l*2) - for _, p := range pairs { - args = append(args, p.Key, p.Value) - } - err = ps.con.HSet(ps.ctx, prefixKey+ctx, args...).Err() - if err != nil { - if strings.Contains(err.Error(), argNumErrorMsg) { - log.Warn("This REDIS version/implementation does not support variadic arguments for HSET") - for _, p := range pairs { - if err = ps.con.HSet(ps.ctx, prefixKey+ctx, p.Key, p.Value).Err(); err != nil { - break +func (ps *store) Put(ctx string, values ...storage.Entry) (err error) { + if l := len(values); l > 0 { + if l == 1 { + err = ps.con.HSet(ps.ctx, prefixKey+ctx, values[0].Key, values[0].Value).Err() + } else { + args := make([]any, 0, l*2) + for _, p := range values { + args = append(args, p.Key, p.Value) + } + err = ps.con.HSet(ps.ctx, prefixKey+ctx, args...).Err() + if err != nil { + if strings.Contains(err.Error(), argNumErrorMsg) { + log.Warn("This REDIS version/implementation does not support variadic arguments for HSET") + for _, p := range values { + if err = ps.con.HSet(ps.ctx, prefixKey+ctx, p.Key, p.Value).Err(); err != nil { + break + } } } } @@ -690,6 +685,11 @@ func (ps *store) BulkPut(ctx string, pairs ...storage.Entry) (err error) { return } +func (ps *store) Contains(ctx string, key string) (bool, error) { + exist, err := ps.con.HExists(ps.ctx, prefixKey+ctx, key).Result() + return exist, asNil(err) +} + func (ps *store) Load(ctx string, key string) (v any, err error) { v, err = ps.con.HGet(ps.ctx, prefixKey+ctx, key).Result() if err != nil && errors.Is(err, redis.Nil) { @@ -715,6 +715,10 @@ func (ps *store) Delete(ctx string, keys ...string) (err error) { return } +func (*store) Preservable() bool { + return true +} + func (ps *store) GC(cutoff time.Time) { log.Debug("storage: purging peers with no announces since", log.Fields{"before": cutoff}) cutoffUnix := cutoff.UnixNano() @@ -722,7 +726,7 @@ func (ps *store) GC(cutoff time.Time) { ps.gc(cutoffUnix, true) } -// gc deletes all Peers from the Storage which are older than the +// gc deletes all Peers from the PeerStorage which are older than the // cutoff time. 
// // This function must be able to execute while other methods on this interface are being executed in parallel. diff --git a/storage/redis/storage_test.go b/storage/redis/storage_test.go index f8955b9..2d90eef 100644 --- a/storage/redis/storage_test.go +++ b/storage/redis/storage_test.go @@ -20,8 +20,8 @@ var cfg = Config{ ConnectTimeout: 10 * time.Second, } -func createNew() s.Storage { - var ps s.Storage +func createNew() s.PeerStorage { + var ps s.PeerStorage var err error ps, err = New(cfg) if err != nil { diff --git a/storage/storage.go b/storage/storage.go index 98c82f6..6dbef8e 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -24,13 +24,13 @@ type Entry struct { Value any } -// Driver is the interface used to initialize a new type of Storage. +// Driver is the interface used to initialize a new type of PeerStorage. type Driver interface { - NewStorage(cfg conf.MapConfig) (Storage, error) + NewStorage(cfg conf.MapConfig) (PeerStorage, error) } // ErrResourceDoesNotExist is the error returned by all delete methods and the -// AnnouncePeers method of the Storage interface if the requested resource +// AnnouncePeers method of the PeerStorage interface if the requested resource // does not exist. var ErrResourceDoesNotExist = bittorrent.ClientError("resource does not exist") @@ -38,10 +38,31 @@ var ErrResourceDoesNotExist = bittorrent.ClientError("resource does not exist") // store driver with that name does not exist. var ErrDriverDoesNotExist = errors.New("peer store driver with that name does not exist") -// Storage is an interface that abstracts the interactions of storing and +// DataStorage is the interface used for implementing a store for arbitrary data +type DataStorage interface { + // Put places arbitrary k-v data with the specified context + // into storage. The ctx parameter is used to group data + // (i.e. data for a specific middleware module: hash key, table name etc.) + Put(ctx string, values ...Entry) error + + // Contains checks if data with the specified key exists in the specified context + Contains(ctx string, key string) (bool, error) + + // Load gets arbitrary data in the specified context by its key + Load(ctx string, key string) (any, error) + + // Delete removes arbitrary data in the specified context by its keys + Delete(ctx string, keys ...string) error + + // Preservable indicates that this storage can store data permanently; + // in other words, it is NOT an in-memory storage whose data is lost after restart + Preservable() bool +} + +// PeerStorage is an interface that abstracts the interactions of storing and // manipulating Peers such that it can be implemented for various data stores. // -// Implementations of the Storage interface must do the following in addition +// Implementations of the PeerStorage interface must do the following in addition // to implementing the methods of the interface in the way documented: // // - Implement a garbage-collection strategy that ensures stale data is removed. @@ -50,13 +71,14 @@ var ErrDriverDoesNotExist = errors.New("peer store driver with that name does no // be scanned periodically and too old Peers removed. The intervals and // durations involved should be configurable. // - IPv4 and IPv6 swarms must be isolated from each other. -// A Storage must be able to transparently handle IPv4 and IPv6 Peers, but +// A PeerStorage must be able to transparently handle IPv4 and IPv6 Peers, but // must separate them. AnnouncePeers and ScrapeSwarm must return information // about the Swarm matching the given AddressFamily only. 
// // Implementations can be tested against this interface using the tests in // storage_test.go and the benchmarks in storage_bench.go. -type Storage interface { +type PeerStorage interface { + DataStorage // PutSeeder adds a Seeder to the Swarm identified by the provided // InfoHash. PutSeeder(infoHash bittorrent.InfoHash, peer bittorrent.Peer) error @@ -114,33 +136,15 @@ type Storage interface { // If the Swarm does not exist, an empty Scrape and no error is returned. ScrapeSwarm(infoHash bittorrent.InfoHash, peer bittorrent.Peer) bittorrent.Scrape - // Put used to place arbitrary k-v data with specified context - // into storage. ctx parameter used to group data - // (i.e. data only for specific middleware module) - Put(ctx string, value Entry) error - - // BulkPut used to place array of k-v data in specified context. - // Useful when several data entries should be added in single transaction/connection - BulkPut(ctx string, values ...Entry) error - - // Contains checks if any data in specified context exist - Contains(ctx string, key string) (bool, error) - - // Load used to get arbitrary data in specified context by its key - Load(ctx string, key string) (any, error) - - // Delete used to delete arbitrary data in specified context by its keys - Delete(ctx string, keys ...string) error - // GC used to delete stale data, such as timed out seeders/leechers GC(cutoff time.Time) - // Stopper is an interface that expects a Stop method to stop the Storage. + // Stopper is an interface that expects a Stop method to stop the PeerStorage. // For more details see the documentation in the stop package. stop.Stopper // Fielder returns a loggable version of the data used to configure and - // operate a particular Storage. + // operate a particular PeerStorage. log.Fielder } @@ -166,11 +170,11 @@ func RegisterDriver(name string, d Driver) { drivers[name] = d } -// NewStorage attempts to initialize a new Storage instance from +// NewStorage attempts to initialize a new PeerStorage instance from // the list of registered Drivers. // // If a driver does not exist, returns ErrDriverDoesNotExist. -func NewStorage(name string, cfg conf.MapConfig) (ps Storage, err error) { +func NewStorage(name string, cfg conf.MapConfig) (ps PeerStorage, err error) { driversM.RLock() defer driversM.RUnlock() diff --git a/storage/test/storage_bench.go b/storage/test/storage_bench.go index 2c1a93d..b79e601 100644 --- a/storage/test/storage_bench.go +++ b/storage/test/storage_bench.go @@ -52,9 +52,9 @@ func generatePeers() (a [10000]bittorrent.Peer) { } type ( - benchExecFunc func(int, storage.Storage, *benchData) error - benchSetupFunc func(storage.Storage, *benchData) error - benchStorageConstructor func() storage.Storage + benchExecFunc func(int, storage.PeerStorage, *benchData) error + benchSetupFunc func(storage.PeerStorage, *benchData) error + benchStorageConstructor func() storage.PeerStorage ) type benchHolder struct { @@ -102,65 +102,65 @@ func (bh *benchHolder) runBenchmark(b *testing.B, parallel bool, sf benchSetupFu } // Nop executes a no-op for each iteration. -// It should produce the same results for each storage.Storage. +// It should produce the same results for each storage.PeerStorage. // This can be used to get an estimate of the impact of the benchmark harness // on benchmark results and an estimate of the general performance of the system // benchmarked on. // // Nop can run in parallel. 
func (bh *benchHolder) Nop(b *testing.B) { - bh.runBenchmark(b, true, putPeers, func(i int, ps storage.Storage, bd *benchData) error { + bh.runBenchmark(b, true, putPeers, func(i int, ps storage.PeerStorage, bd *benchData) error { return nil }) } -// Put benchmarks the PutSeeder method of a storage.Storage by repeatedly Putting the +// Put benchmarks the PutSeeder method of a storage.PeerStorage by repeatedly Putting the // same Peer for the same InfoHash. // // Put can run in parallel. func (bh *benchHolder) Put(b *testing.B) { - bh.runBenchmark(b, true, nil, func(i int, ps storage.Storage, bd *benchData) error { + bh.runBenchmark(b, true, nil, func(i int, ps storage.PeerStorage, bd *benchData) error { return ps.PutSeeder(bd.infoHashes[0], bd.peers[0]) }) } -// Put1k benchmarks the PutSeeder method of a storage.Storage by cycling through 1000 +// Put1k benchmarks the PutSeeder method of a storage.PeerStorage by cycling through 1000 // Peers and Putting them into the swarm of one infohash. // // Put1k can run in parallel. func (bh *benchHolder) Put1k(b *testing.B) { - bh.runBenchmark(b, true, nil, func(i int, ps storage.Storage, bd *benchData) error { + bh.runBenchmark(b, true, nil, func(i int, ps storage.PeerStorage, bd *benchData) error { return ps.PutSeeder(bd.infoHashes[0], bd.peers[i%1000]) }) } -// Put1kInfoHash benchmarks the PutSeeder method of a storage.Storage by cycling +// Put1kInfoHash benchmarks the PutSeeder method of a storage.PeerStorage by cycling // through 1000 infoHashes and putting the same peer into their swarms. // // Put1kInfoHash can run in parallel. func (bh *benchHolder) Put1kInfoHash(b *testing.B) { - bh.runBenchmark(b, true, nil, func(i int, ps storage.Storage, bd *benchData) error { + bh.runBenchmark(b, true, nil, func(i int, ps storage.PeerStorage, bd *benchData) error { return ps.PutSeeder(bd.infoHashes[i%1000], bd.peers[0]) }) } -// Put1kInfoHash1k benchmarks the PutSeeder method of a storage.Storage by cycling +// Put1kInfoHash1k benchmarks the PutSeeder method of a storage.PeerStorage by cycling // through 1000 infoHashes and 1000 Peers and calling Put with them. // // Put1kInfoHash1k can run in parallel. func (bh *benchHolder) Put1kInfoHash1k(b *testing.B) { - bh.runBenchmark(b, true, nil, func(i int, ps storage.Storage, bd *benchData) error { + bh.runBenchmark(b, true, nil, func(i int, ps storage.PeerStorage, bd *benchData) error { err := ps.PutSeeder(bd.infoHashes[i%1000], bd.peers[(i*3)%1000]) return err }) } -// PutDelete benchmarks the PutSeeder and DeleteSeeder methods of a storage.Storage by +// PutDelete benchmarks the PutSeeder and DeleteSeeder methods of a storage.PeerStorage by // calling PutSeeder followed by DeleteSeeder for one Peer and one infohash. // // PutDelete can not run in parallel. func (bh *benchHolder) PutDelete(b *testing.B) { - bh.runBenchmark(b, false, nil, func(i int, ps storage.Storage, bd *benchData) error { + bh.runBenchmark(b, false, nil, func(i int, ps storage.PeerStorage, bd *benchData) error { err := ps.PutSeeder(bd.infoHashes[0], bd.peers[0]) if err != nil { return err @@ -174,7 +174,7 @@ func (bh *benchHolder) PutDelete(b *testing.B) { // // PutDelete1k can not run in parallel. 
func (bh *benchHolder) PutDelete1k(b *testing.B) { - bh.runBenchmark(b, false, nil, func(i int, ps storage.Storage, bd *benchData) error { + bh.runBenchmark(b, false, nil, func(i int, ps storage.PeerStorage, bd *benchData) error { err := ps.PutSeeder(bd.infoHashes[0], bd.peers[i%1000]) if err != nil { return err @@ -188,7 +188,7 @@ func (bh *benchHolder) PutDelete1k(b *testing.B) { // // PutDelete1kInfoHash can not run in parallel. func (bh *benchHolder) PutDelete1kInfoHash(b *testing.B) { - bh.runBenchmark(b, false, nil, func(i int, ps storage.Storage, bd *benchData) error { + bh.runBenchmark(b, false, nil, func(i int, ps storage.PeerStorage, bd *benchData) error { err := ps.PutSeeder(bd.infoHashes[i%1000], bd.peers[0]) if err != nil { return err @@ -202,7 +202,7 @@ func (bh *benchHolder) PutDelete1kInfoHash(b *testing.B) { // // PutDelete1kInfoHash1k can not run in parallel. func (bh *benchHolder) PutDelete1kInfoHash1k(b *testing.B) { - bh.runBenchmark(b, false, nil, func(i int, ps storage.Storage, bd *benchData) error { + bh.runBenchmark(b, false, nil, func(i int, ps storage.PeerStorage, bd *benchData) error { err := ps.PutSeeder(bd.infoHashes[i%1000], bd.peers[(i*3)%1000]) if err != nil { return err @@ -212,102 +212,102 @@ func (bh *benchHolder) PutDelete1kInfoHash1k(b *testing.B) { }) } -// DeleteNonexist benchmarks the DeleteSeeder method of a storage.Storage by +// DeleteNonexist benchmarks the DeleteSeeder method of a storage.PeerStorage by // attempting to delete a Peer that is nonexistent. // // DeleteNonexist can run in parallel. func (bh *benchHolder) DeleteNonexist(b *testing.B) { - bh.runBenchmark(b, true, nil, func(i int, ps storage.Storage, bd *benchData) error { + bh.runBenchmark(b, true, nil, func(i int, ps storage.PeerStorage, bd *benchData) error { _ = ps.DeleteSeeder(bd.infoHashes[0], bd.peers[0]) return nil }) } -// DeleteNonexist1k benchmarks the DeleteSeeder method of a storage.Storage by +// DeleteNonexist1k benchmarks the DeleteSeeder method of a storage.PeerStorage by // attempting to delete one of 1000 nonexistent Peers. // // DeleteNonexist can run in parallel. func (bh *benchHolder) DeleteNonexist1k(b *testing.B) { - bh.runBenchmark(b, true, nil, func(i int, ps storage.Storage, bd *benchData) error { + bh.runBenchmark(b, true, nil, func(i int, ps storage.PeerStorage, bd *benchData) error { _ = ps.DeleteSeeder(bd.infoHashes[0], bd.peers[i%1000]) return nil }) } -// DeleteNonexist1kInfoHash benchmarks the DeleteSeeder method of a storage.Storage by +// DeleteNonexist1kInfoHash benchmarks the DeleteSeeder method of a storage.PeerStorage by // attempting to delete one Peer from one of 1000 infoHashes. // // DeleteNonexist1kInfoHash can run in parallel. func (bh *benchHolder) DeleteNonexist1kInfoHash(b *testing.B) { - bh.runBenchmark(b, true, nil, func(i int, ps storage.Storage, bd *benchData) error { + bh.runBenchmark(b, true, nil, func(i int, ps storage.PeerStorage, bd *benchData) error { _ = ps.DeleteSeeder(bd.infoHashes[i%1000], bd.peers[0]) return nil }) } -// DeleteNonexist1kInfoHash1k benchmarks the Delete method of a storage.Storage by +// DeleteNonexist1kInfoHash1k benchmarks the Delete method of a storage.PeerStorage by // attempting to delete one of 1000 Peers from one of 1000 InfoHashes. // // DeleteNonexist1kInfoHash1k can run in parallel. 
func (bh *benchHolder) DeleteNonexist1kInfoHash1k(b *testing.B) { - bh.runBenchmark(b, true, nil, func(i int, ps storage.Storage, bd *benchData) error { + bh.runBenchmark(b, true, nil, func(i int, ps storage.PeerStorage, bd *benchData) error { _ = ps.DeleteSeeder(bd.infoHashes[i%1000], bd.peers[(i*3)%1000]) return nil }) } -// GradNonexist benchmarks the GraduateLeecher method of a storage.Storage by +// GradNonexist benchmarks the GraduateLeecher method of a storage.PeerStorage by // attempting to graduate a nonexistent Peer. // // GradNonexist can run in parallel. func (bh *benchHolder) GradNonexist(b *testing.B) { - bh.runBenchmark(b, true, nil, func(i int, ps storage.Storage, bd *benchData) error { + bh.runBenchmark(b, true, nil, func(i int, ps storage.PeerStorage, bd *benchData) error { _ = ps.GraduateLeecher(bd.infoHashes[0], bd.peers[0]) return nil }) } -// GradNonexist1k benchmarks the GraduateLeecher method of a storage.Storage by +// GradNonexist1k benchmarks the GraduateLeecher method of a storage.PeerStorage by // attempting to graduate one of 1000 nonexistent Peers. // // GradNonexist1k can run in parallel. func (bh *benchHolder) GradNonexist1k(b *testing.B) { - bh.runBenchmark(b, true, nil, func(i int, ps storage.Storage, bd *benchData) error { + bh.runBenchmark(b, true, nil, func(i int, ps storage.PeerStorage, bd *benchData) error { _ = ps.GraduateLeecher(bd.infoHashes[0], bd.peers[i%1000]) return nil }) } -// GradNonexist1kInfoHash benchmarks the GraduateLeecher method of a storage.Storage +// GradNonexist1kInfoHash benchmarks the GraduateLeecher method of a storage.PeerStorage // by attempting to graduate a nonexistent Peer for one of 100 InfoHashes. // // GradNonexist1kInfoHash can run in parallel. func (bh *benchHolder) GradNonexist1kInfoHash(b *testing.B) { - bh.runBenchmark(b, true, nil, func(i int, ps storage.Storage, bd *benchData) error { + bh.runBenchmark(b, true, nil, func(i int, ps storage.PeerStorage, bd *benchData) error { _ = ps.GraduateLeecher(bd.infoHashes[i%1000], bd.peers[0]) return nil }) } -// GradNonexist1kInfoHash1k benchmarks the GraduateLeecher method of a storage.Storage +// GradNonexist1kInfoHash1k benchmarks the GraduateLeecher method of a storage.PeerStorage // by attempting to graduate one of 1000 nonexistent Peers for one of 1000 // infoHashes. // // GradNonexist1kInfoHash1k can run in parallel. func (bh *benchHolder) GradNonexist1kInfoHash1k(b *testing.B) { - bh.runBenchmark(b, true, nil, func(i int, ps storage.Storage, bd *benchData) error { + bh.runBenchmark(b, true, nil, func(i int, ps storage.PeerStorage, bd *benchData) error { _ = ps.GraduateLeecher(bd.infoHashes[i%1000], bd.peers[(i*3)%1000]) return nil }) } // PutGradDelete benchmarks the PutLeecher, GraduateLeecher and DeleteSeeder -// methods of a storage.Storage by adding one leecher to a swarm, promoting it to a +// methods of a storage.PeerStorage by adding one leecher to a swarm, promoting it to a // seeder and deleting the seeder. // // PutGradDelete can not run in parallel. func (bh *benchHolder) PutGradDelete(b *testing.B) { - bh.runBenchmark(b, false, nil, func(i int, ps storage.Storage, bd *benchData) error { + bh.runBenchmark(b, false, nil, func(i int, ps storage.PeerStorage, bd *benchData) error { err := ps.PutLeecher(bd.infoHashes[0], bd.peers[0]) if err != nil { return err @@ -324,7 +324,7 @@ func (bh *benchHolder) PutGradDelete(b *testing.B) { // // PutGradDelete1k can not run in parallel. 
func (bh *benchHolder) PutGradDelete1k(b *testing.B) { - bh.runBenchmark(b, false, nil, func(i int, ps storage.Storage, bd *benchData) error { + bh.runBenchmark(b, false, nil, func(i int, ps storage.PeerStorage, bd *benchData) error { err := ps.PutLeecher(bd.infoHashes[0], bd.peers[i%1000]) if err != nil { return err @@ -342,7 +342,7 @@ func (bh *benchHolder) PutGradDelete1k(b *testing.B) { // // PutGradDelete1kInfoHash can not run in parallel. func (bh *benchHolder) PutGradDelete1kInfoHash(b *testing.B) { - bh.runBenchmark(b, false, nil, func(i int, ps storage.Storage, bd *benchData) error { + bh.runBenchmark(b, false, nil, func(i int, ps storage.PeerStorage, bd *benchData) error { err := ps.PutLeecher(bd.infoHashes[i%1000], bd.peers[0]) if err != nil { return err @@ -360,7 +360,7 @@ func (bh *benchHolder) PutGradDelete1kInfoHash(b *testing.B) { // // PutGradDelete1kInfoHash can not run in parallel. func (bh *benchHolder) PutGradDelete1kInfoHash1k(b *testing.B) { - bh.runBenchmark(b, false, nil, func(i int, ps storage.Storage, bd *benchData) error { + bh.runBenchmark(b, false, nil, func(i int, ps storage.PeerStorage, bd *benchData) error { err := ps.PutLeecher(bd.infoHashes[i%1000], bd.peers[(i*3)%1000]) if err != nil { return err @@ -374,7 +374,7 @@ func (bh *benchHolder) PutGradDelete1kInfoHash1k(b *testing.B) { }) } -func putPeers(ps storage.Storage, bd *benchData) error { +func putPeers(ps storage.PeerStorage, bd *benchData) error { for i := 0; i < 1000; i++ { for j := 0; j < 1000; j++ { var err error @@ -391,13 +391,13 @@ func putPeers(ps storage.Storage, bd *benchData) error { return nil } -// AnnounceLeecher benchmarks the AnnouncePeers method of a storage.Storage for +// AnnounceLeecher benchmarks the AnnouncePeers method of a storage.PeerStorage for // announcing a leecher. // The swarm announced to has 500 seeders and 500 leechers. // // AnnounceLeecher can run in parallel. func (bh *benchHolder) AnnounceLeecher(b *testing.B) { - bh.runBenchmark(b, true, putPeers, func(i int, ps storage.Storage, bd *benchData) error { + bh.runBenchmark(b, true, putPeers, func(i int, ps storage.PeerStorage, bd *benchData) error { _, err := ps.AnnouncePeers(bd.infoHashes[0], false, 50, bd.peers[0]) return err }) @@ -408,7 +408,7 @@ func (bh *benchHolder) AnnounceLeecher(b *testing.B) { // // AnnounceLeecher1kInfoHash can run in parallel. func (bh *benchHolder) AnnounceLeecher1kInfoHash(b *testing.B) { - bh.runBenchmark(b, true, putPeers, func(i int, ps storage.Storage, bd *benchData) error { + bh.runBenchmark(b, true, putPeers, func(i int, ps storage.PeerStorage, bd *benchData) error { _, err := ps.AnnouncePeers(bd.infoHashes[i%1000], false, 50, bd.peers[0]) return err }) @@ -419,7 +419,7 @@ func (bh *benchHolder) AnnounceLeecher1kInfoHash(b *testing.B) { // // AnnounceSeeder can run in parallel. func (bh *benchHolder) AnnounceSeeder(b *testing.B) { - bh.runBenchmark(b, true, putPeers, func(i int, ps storage.Storage, bd *benchData) error { + bh.runBenchmark(b, true, putPeers, func(i int, ps storage.PeerStorage, bd *benchData) error { _, err := ps.AnnouncePeers(bd.infoHashes[0], true, 50, bd.peers[0]) return err }) @@ -430,18 +430,18 @@ func (bh *benchHolder) AnnounceSeeder(b *testing.B) { // // AnnounceSeeder1kInfoHash can run in parallel. 
func (bh *benchHolder) AnnounceSeeder1kInfoHash(b *testing.B) { - bh.runBenchmark(b, true, putPeers, func(i int, ps storage.Storage, bd *benchData) error { + bh.runBenchmark(b, true, putPeers, func(i int, ps storage.PeerStorage, bd *benchData) error { _, err := ps.AnnouncePeers(bd.infoHashes[i%1000], true, 50, bd.peers[0]) return err }) } -// ScrapeSwarm benchmarks the ScrapeSwarm method of a storage.Storage. +// ScrapeSwarm benchmarks the ScrapeSwarm method of a storage.PeerStorage. // The swarm scraped has 500 seeders and 500 leechers. // // ScrapeSwarm can run in parallel. func (bh *benchHolder) ScrapeSwarm(b *testing.B) { - bh.runBenchmark(b, true, putPeers, func(i int, ps storage.Storage, bd *benchData) error { + bh.runBenchmark(b, true, putPeers, func(i int, ps storage.PeerStorage, bd *benchData) error { ps.ScrapeSwarm(bd.infoHashes[0], bd.peers[0]) return nil }) @@ -451,7 +451,7 @@ func (bh *benchHolder) ScrapeSwarm(b *testing.B) { // // ScrapeSwarm1kInfoHash can run in parallel. func (bh *benchHolder) ScrapeSwarm1kInfoHash(b *testing.B) { - bh.runBenchmark(b, true, putPeers, func(i int, ps storage.Storage, bd *benchData) error { + bh.runBenchmark(b, true, putPeers, func(i int, ps storage.PeerStorage, bd *benchData) error { ps.ScrapeSwarm(bd.infoHashes[i%1000], bd.peers[0]) return nil }) diff --git a/storage/test/storage_test_base.go b/storage/test/storage_test_base.go index bb6b936..e53f15b 100644 --- a/storage/test/storage_test_base.go +++ b/storage/test/storage_test_base.go @@ -14,12 +14,12 @@ import ( // PeerEqualityFunc is the boolean function to use to check two Peers for // equality. -// Depending on the implementation of the Storage, this can be changed to +// Depending on the implementation of the PeerStorage, this can be changed to // use (Peer).EqualEndpoint instead. var PeerEqualityFunc = func(p1, p2 bittorrent.Peer) bool { return p1.Equal(p2) } type testHolder struct { - st storage.Storage + st storage.PeerStorage } type hashPeer struct { @@ -219,7 +219,7 @@ func (th *testHolder) CustomBulkPutContainsLoadDelete(t *testing.T) { Value: c.ih.RawString(), }) } - err := th.st.BulkPut("test", pairs...) + err := th.st.Put("test", pairs...) require.Nil(t, err) // check if exist in ctx we put @@ -260,8 +260,8 @@ func (th *testHolder) GC(t *testing.T) { } } -// RunTests tests a Storage implementation against the interface. -func RunTests(t *testing.T, p storage.Storage) { +// RunTests tests a PeerStorage implementation against the interface. +func RunTests(t *testing.T, p storage.PeerStorage) { th := testHolder{st: p} // Test ErrDNE for non-existent swarms.
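Taken together, this diff splits the old `Storage` interface into `DataStorage` (arbitrary key-value data plus `Preservable`) and `PeerStorage` (swarm operations, embedding `DataStorage`). A consumer that only needs key-value access can therefore hold the narrower type; a sketch under these assumptions (`kvConsumer` and `newKVConsumer` are hypothetical names):

```go
package example

import "github.com/sot-tech/mochi/storage"

// kvConsumer is a hypothetical component that only needs the
// key-value subset of the storage.
type kvConsumer struct {
	ds storage.DataStorage
}

// newKVConsumer accepts the wide PeerStorage but keeps only the
// embedded DataStorage; the assignment compiles because PeerStorage
// embeds DataStorage after this change.
func newKVConsumer(st storage.PeerStorage) *kvConsumer {
	return &kvConsumer{ds: st}
}
```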