(tested) split storage to data and peer interfaces,

add option for persisting (or not) torrent approval data
This commit is contained in:
Lawrence, Rendall
2022-04-16 18:50:19 +03:00
parent 965df2a9c3
commit 01064fd21a
21 changed files with 219 additions and 191 deletions
+3 -3
View File
@@ -28,7 +28,7 @@ var e2eCmd *cobra.Command
// Run represents the state of a running instance of Conf.
type Run struct {
configFilePath string
storage storage.Storage
storage storage.PeerStorage
logic *middleware.Logic
sg *stop.Group
}
@@ -45,7 +45,7 @@ func NewRun(configFilePath string) (*Run, error) {
// Start begins an instance of Conf.
// It is optional to provide an instance of the peer store to avoid the
// creation of a new one.
func (r *Run) Start(ps storage.Storage) error {
func (r *Run) Start(ps storage.PeerStorage) error {
configFile, err := ParseConfigFile(r.configFilePath)
if err != nil {
return fmt.Errorf("failed to read config: %w", err)
@@ -115,7 +115,7 @@ func combineErrors(prefix string, errs []error) error {
}
// Stop shuts down an instance of Conf.
func (r *Run) Stop(keepPeerStore bool) (storage.Storage, error) {
func (r *Run) Stop(keepPeerStore bool) (storage.PeerStorage, error) {
log.Debug("stopping frontends and metrics server")
if errs := r.sg.Stop().Wait(); len(errs) != 0 {
return nil, combineErrors("failed while shutting down frontends", errs)
+2
View File
@@ -220,6 +220,8 @@ mochi:
# - name: torrent approval
# options:
# initial_source: list
# Save data provided by source in storage above
# preserve: false
# configuration:
# hash_list:
# - "a1b2c3d4e5a1b2c3d4e5a1b2c3d4e5a1b2c3d4e5"
+7 -5
View File
@@ -21,10 +21,6 @@ If mode is **black list** (`invert` set to `true`), tracker will allow all hashe
There are two sources of hashes: `list` and `directory`.
Both of them used as **INITIAL** source for storing in storage.
If storage is not `memory`, records are persisted until _somebody_
or _something_ (different tool with access to storage) won't delete it.
`list` is the static set of hashes, specified in configuration file.
`directory` will watch for `*.torrent` files in specified path and
@@ -32,11 +28,16 @@ append/delete records from storage. This source will parse all existing
files at start and then watch for new files to add, or for delete events
to remove hash from storage.
Note: if storage is not `memory`, and `preserve` option set to `true`, records
will be persisted in storage until _somebody_ or _something_ (different tool with access
to storage) won't delete it.
## Configuration
This middleware provides the following parameters for configuration:
- `initial_source` - source type: `list` or `directory`
- `preserve`: - save source provided data into storage
- `configuration` - options for specified source
- `list`:
- `hash_list` - list of HEX encoded hashes
@@ -57,8 +58,9 @@ mochi:
- name: torrent approval
options:
initial_source: list
preserve: true
configuration:
hash_list: ["AAA", "BBB"]
hash_list: [ "AAA", "BBB" ]
invert: false
storage_ctx: APPROVED_HASH
```
+1 -1
View File
@@ -39,7 +39,7 @@ type hook struct {
unapproved map[ClientID]struct{}
}
func build(options conf.MapConfig, _ storage.Storage) (middleware.Hook, error) {
func build(options conf.MapConfig, _ storage.PeerStorage) (middleware.Hook, error) {
var cfg Config
if err := options.Unmarshal(&cfg); err != nil {
+2 -2
View File
@@ -27,7 +27,7 @@ type skipSwarmInteraction struct{}
var SkipSwarmInteractionKey = skipSwarmInteraction{}
type swarmInteractionHook struct {
store storage.Storage
store storage.PeerStorage
}
func (h *swarmInteractionHook) HandleAnnounce(ctx context.Context, req *bittorrent.AnnounceRequest, _ *bittorrent.AnnounceResponse) (_ context.Context, err error) {
@@ -87,7 +87,7 @@ var SkipResponseHookKey = skipResponseHook{}
// var ScrapeIsIPv6Key = scrapeAddressType{}
type responseHook struct {
store storage.Storage
store storage.PeerStorage
}
func (h *responseHook) HandleAnnounce(ctx context.Context, req *bittorrent.AnnounceRequest, resp *bittorrent.AnnounceResponse) (_ context.Context, err error) {
+1 -1
View File
@@ -70,7 +70,7 @@ type hook struct {
closing chan struct{}
}
func build(options conf.MapConfig, _ storage.Storage) (middleware.Hook, error) {
func build(options conf.MapConfig, _ storage.PeerStorage) (middleware.Hook, error) {
var cfg Config
if err := options.Unmarshal(&cfg); err != nil {
+1 -1
View File
@@ -15,7 +15,7 @@ var _ frontend.TrackerLogic = &Logic{}
// NewLogic creates a new instance of a TrackerLogic that executes the provided
// middleware hooks.
func NewLogic(annInterval, minAnnInterval time.Duration, peerStore storage.Storage, preHooks, postHooks []Hook) *Logic {
func NewLogic(annInterval, minAnnInterval time.Duration, peerStore storage.PeerStorage, preHooks, postHooks []Hook) *Logic {
return &Logic{
announceInterval: annInterval,
minAnnounceInterval: minAnnInterval,
+3 -3
View File
@@ -23,7 +23,7 @@ var (
//
// The `options` parameter is map of parameters that should be unmarshalled into
// the hook's custom configuration.
type Builder func(options conf.MapConfig, storage storage.Storage) (Hook, error)
type Builder func(options conf.MapConfig, storage storage.PeerStorage) (Hook, error)
// RegisterBuilder makes a Builder available by the provided name.
//
@@ -51,7 +51,7 @@ func RegisterBuilder(name string, d Builder) {
// list of registered Builders.
//
// If a driver does not exist, returns ErrBuilderDoesNotExist.
func New(name string, options conf.MapConfig, storage storage.Storage) (Hook, error) {
func New(name string, options conf.MapConfig, storage storage.PeerStorage) (Hook, error) {
driversM.RLock()
defer driversM.RUnlock()
@@ -72,7 +72,7 @@ type Config struct {
// HooksFromHookConfigs is a utility function for initializing Hooks in bulk.
// each element of configs must contain pairs `name` - string and `options` - map[string]any
func HooksFromHookConfigs(configs []conf.MapConfig, storage storage.Storage) (hooks []Hook, err error) {
func HooksFromHookConfigs(configs []conf.MapConfig, storage storage.PeerStorage) (hooks []Hook, err error) {
for _, cfg := range configs {
var c Config
@@ -15,7 +15,7 @@ import (
const DefaultStorageCtxName = "MW_APPROVAL"
// Builder function that creates and configures specific container
type Builder func(conf.MapConfig, storage.Storage) (Container, error)
type Builder func(conf.MapConfig, storage.DataStorage) (Container, error)
var (
buildersMU sync.Mutex
@@ -45,7 +45,7 @@ type Container interface {
}
// GetContainer creates Container by its name and provided confBytes
func GetContainer(name string, config conf.MapConfig, storage storage.Storage) (Container, error) {
func GetContainer(name string, config conf.MapConfig, storage storage.DataStorage) (Container, error) {
buildersMU.Lock()
defer buildersMU.Unlock()
var err error
@@ -35,7 +35,7 @@ type Config struct {
Path string
}
func build(conf conf.MapConfig, st storage.Storage) (container.Container, error) {
func build(conf conf.MapConfig, st storage.DataStorage) (container.Container, error) {
c := new(Config)
if err := conf.Unmarshal(c); err != nil {
return nil, fmt.Errorf("unable to deserialise configuration: %w", err)
@@ -84,7 +84,7 @@ func build(conf conf.MapConfig, st storage.Storage) (container.Container, error)
if len(name) == 0 {
name = list.DUMMY
}
if err := d.Storage.BulkPut(d.StorageCtx,
if err := d.Storage.Put(d.StorageCtx,
storage.Entry{
Key: event.InfoHash.AsString(),
Value: name,
@@ -33,7 +33,7 @@ type Config struct {
// DUMMY used as value placeholder if storage needs some value with
const DUMMY = "_"
func build(conf conf.MapConfig, st storage.Storage) (container.Container, error) {
func build(conf conf.MapConfig, st storage.DataStorage) (container.Container, error) {
c := new(Config)
if err := conf.Unmarshal(c); err != nil {
return nil, fmt.Errorf("unable to deserialise configuration: %w", err)
@@ -61,7 +61,7 @@ func build(conf conf.MapConfig, st storage.Storage) (container.Container, error)
init = append(init, storage.Entry{Key: ih.TruncateV1().RawString(), Value: DUMMY})
}
}
if err := l.Storage.BulkPut(l.StorageCtx, init...); err != nil {
if err := l.Storage.Put(l.StorageCtx, init...); err != nil {
return nil, fmt.Errorf("unable to put initial data: %w", err)
}
}
@@ -73,7 +73,7 @@ type List struct {
// Invert see Config.Invert description.
Invert bool
// Storage implementation where hashes are stored for approval checks.
Storage storage.Storage
Storage storage.DataStorage
// StorageCtx see Config.StorageCtx description.
StorageCtx string
}
+11 -2
View File
@@ -10,6 +10,7 @@ import (
"github.com/sot-tech/mochi/middleware"
"github.com/sot-tech/mochi/middleware/torrentapproval/container"
"github.com/sot-tech/mochi/pkg/conf"
"github.com/sot-tech/mochi/storage/memory"
// import directory watcher to enable appropriate support
_ "github.com/sot-tech/mochi/middleware/torrentapproval/container/directory"
@@ -30,11 +31,14 @@ func init() {
type baseConfig struct {
// Source - name of container for initial values
Source string `cfg:"initial_source"`
// Preserve - if true, container will receive real registered storage if it is NOT `memory`
// if false - temporary in-memory storage will be used or created
Preserve bool
// Configuration depends on used container
Configuration conf.MapConfig
}
func build(options conf.MapConfig, storage storage.Storage) (h middleware.Hook, err error) {
func build(options conf.MapConfig, st storage.PeerStorage) (h middleware.Hook, err error) {
var cfg baseConfig
if err = options.Unmarshal(&cfg); err != nil {
return nil, fmt.Errorf("middleware %s: %w", Name, err)
@@ -48,8 +52,13 @@ func build(options conf.MapConfig, storage storage.Storage) (h middleware.Hook,
return nil, fmt.Errorf("invalid options for middleware %s: options not provided", Name)
}
var ds storage.DataStorage = st
if !cfg.Preserve && ds.Preservable() {
ds = memory.NewDataStore()
}
var c container.Container
if c, err = container.GetContainer(cfg.Source, cfg.Configuration, storage); err == nil {
if c, err = container.GetContainer(cfg.Source, cfg.Configuration, ds); err == nil {
h = &hook{c}
}
return h, err
@@ -67,7 +67,7 @@ var cases = []struct {
func TestHandleAnnounce(t *testing.T) {
config := memory.Config{}.Validate()
storage, err := memory.New(config)
storage, err := memory.NewPeerStorage(config)
require.Nil(t, err)
for _, tt := range cases {
t.Run(fmt.Sprintf("testing hash %s", tt.ih), func(t *testing.T) {
+1 -1
View File
@@ -22,7 +22,7 @@ func init() {
middleware.RegisterBuilder(Name, build)
}
func build(options conf.MapConfig, _ storage.Storage) (h middleware.Hook, err error) {
func build(options conf.MapConfig, _ storage.PeerStorage) (h middleware.Hook, err error) {
var cfg Config
if err = options.Unmarshal(&cfg); err != nil {
+58 -51
View File
@@ -38,15 +38,15 @@ func init() {
type driver struct{}
func (d driver) NewStorage(icfg conf.MapConfig) (storage.Storage, error) {
func (d driver) NewStorage(icfg conf.MapConfig) (storage.PeerStorage, error) {
var cfg Config
if err := icfg.Unmarshal(&cfg); err != nil {
return nil, err
}
return New(cfg)
return NewPeerStorage(cfg)
}
// Config holds the configuration of a memory Storage.
// Config holds the configuration of a memory PeerStorage.
type Config struct {
GarbageCollectionInterval time.Duration `cfg:"gc_interval"`
PrometheusReportingInterval time.Duration `cfg:"prometheus_reporting_interval"`
@@ -111,14 +111,14 @@ func (cfg Config) Validate() Config {
return validcfg
}
// New creates a new Storage backed by memory.
func New(provided Config) (storage.Storage, error) {
// NewPeerStorage creates a new PeerStorage backed by memory.
func NewPeerStorage(provided Config) (storage.PeerStorage, error) {
cfg := provided.Validate()
ps := &store{
cfg: cfg,
shards: make([]*peerShard, cfg.ShardCount*2),
contexts: sync.Map{},
closed: make(chan struct{}),
ps := &peerStore{
cfg: cfg,
shards: make([]*peerShard, cfg.ShardCount*2),
DataStorage: NewDataStore(),
closed: make(chan struct{}),
}
for i := 0; i < cfg.ShardCount*2; i++ {
@@ -183,20 +183,20 @@ type swarm struct {
leechers map[string]int64
}
type store struct {
cfg Config
shards []*peerShard
contexts sync.Map
type peerStore struct {
storage.DataStorage
cfg Config
shards []*peerShard
closed chan struct{}
wg sync.WaitGroup
}
var _ storage.Storage = &store{}
var _ storage.PeerStorage = &peerStore{}
// populateProm aggregates metrics over all shards and then posts them to
// prometheus.
func (ps *store) populateProm() {
func (ps *peerStore) populateProm() {
var numInfohashes, numSeeders, numLeechers uint64
for _, s := range ps.shards {
@@ -217,11 +217,11 @@ func recordGCDuration(duration time.Duration) {
storage.PromGCDurationMilliseconds.Observe(float64(duration.Nanoseconds()) / float64(time.Millisecond))
}
func (ps *store) getClock() int64 {
func (ps *peerStore) getClock() int64 {
return timecache.NowUnixNano()
}
func (ps *store) shardIndex(infoHash bittorrent.InfoHash, addr netip.Addr) uint32 {
func (ps *peerStore) shardIndex(infoHash bittorrent.InfoHash, addr netip.Addr) uint32 {
// There are twice the amount of shards specified by the user, the first
// half is dedicated to IPv4 swarms and the second half is dedicated to
// IPv6 swarms.
@@ -232,7 +232,7 @@ func (ps *store) shardIndex(infoHash bittorrent.InfoHash, addr netip.Addr) uint3
return idx
}
func (ps *store) PutSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) error {
func (ps *peerStore) PutSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) error {
select {
case <-ps.closed:
panic("attempted to interact with stopped memory store")
@@ -263,7 +263,7 @@ func (ps *store) PutSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) error {
return nil
}
func (ps *store) DeleteSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) error {
func (ps *peerStore) DeleteSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) error {
select {
case <-ps.closed:
panic("attempted to interact with stopped memory store")
@@ -296,7 +296,7 @@ func (ps *store) DeleteSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) error {
return nil
}
func (ps *store) PutLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error {
func (ps *peerStore) PutLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error {
select {
case <-ps.closed:
panic("attempted to interact with stopped memory store")
@@ -327,7 +327,7 @@ func (ps *store) PutLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error {
return nil
}
func (ps *store) DeleteLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error {
func (ps *peerStore) DeleteLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error {
select {
case <-ps.closed:
panic("attempted to interact with stopped memory store")
@@ -360,7 +360,7 @@ func (ps *store) DeleteLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error
return nil
}
func (ps *store) GraduateLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error {
func (ps *peerStore) GraduateLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error {
select {
case <-ps.closed:
panic("attempted to interact with stopped memory store")
@@ -397,7 +397,7 @@ func (ps *store) GraduateLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) erro
return nil
}
func (ps *store) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant int, peer bittorrent.Peer) (peers []bittorrent.Peer, err error) {
func (ps *peerStore) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant int, peer bittorrent.Peer) (peers []bittorrent.Peer, err error) {
select {
case <-ps.closed:
panic("attempted to interact with stopped memory store")
@@ -458,7 +458,7 @@ func (ps *store) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant int,
return
}
func (ps *store) ScrapeSwarm(ih bittorrent.InfoHash, peer bittorrent.Peer) (resp bittorrent.Scrape) {
func (ps *peerStore) ScrapeSwarm(ih bittorrent.InfoHash, peer bittorrent.Peer) (resp bittorrent.Scrape) {
select {
case <-ps.closed:
panic("attempted to interact with stopped memory store")
@@ -482,6 +482,15 @@ func (ps *store) ScrapeSwarm(ih bittorrent.InfoHash, peer bittorrent.Peer) (resp
return
}
// NewDataStore creates new in-memory data store
func NewDataStore() storage.DataStorage {
return new(dataStore)
}
type dataStore struct {
sync.Map
}
func asKey(in any) any {
if in == nil {
panic("unable to use nil map key")
@@ -493,42 +502,36 @@ func asKey(in any) any {
return fmt.Sprint(in)
}
func (ps *store) Put(ctx string, value storage.Entry) error {
m, _ := ps.contexts.LoadOrStore(ctx, new(sync.Map))
m.(*sync.Map).Store(value.Key, value.Value)
return nil
}
func (ps *store) Contains(ctx string, key string) (bool, error) {
var exist bool
if m, found := ps.contexts.Load(ctx); found {
_, exist = m.(*sync.Map).Load(asKey(key))
}
return exist, nil
}
func (ps *store) BulkPut(ctx string, pairs ...storage.Entry) error {
if len(pairs) > 0 {
c, _ := ps.contexts.LoadOrStore(ctx, new(sync.Map))
func (ds *dataStore) Put(ctx string, values ...storage.Entry) error {
if len(values) > 0 {
c, _ := ds.LoadOrStore(ctx, new(sync.Map))
m := c.(*sync.Map)
for _, p := range pairs {
for _, p := range values {
m.Store(asKey(p.Key), p.Value)
}
}
return nil
}
func (ps *store) Load(ctx string, key string) (any, error) {
func (ds *dataStore) Contains(ctx string, key string) (bool, error) {
var exist bool
if m, found := ds.Map.Load(ctx); found {
_, exist = m.(*sync.Map).Load(asKey(key))
}
return exist, nil
}
func (ds *dataStore) Load(ctx string, key string) (any, error) {
var v any
if m, found := ps.contexts.Load(ctx); found {
if m, found := ds.Map.Load(ctx); found {
v, _ = m.(*sync.Map).Load(asKey(key))
}
return v, nil
}
func (ps *store) Delete(ctx string, keys ...string) error {
func (ds *dataStore) Delete(ctx string, keys ...string) error {
if len(keys) > 0 {
if m, found := ps.contexts.Load(ctx); found {
if m, found := ds.Map.Load(ctx); found {
m := m.(*sync.Map)
for _, k := range keys {
m.Delete(asKey(k))
@@ -538,12 +541,16 @@ func (ps *store) Delete(ctx string, keys ...string) error {
return nil
}
// GC deletes all Peers from the Storage which are older than the
func (*dataStore) Preservable() bool {
return false
}
// GC deletes all Peers from the PeerStorage which are older than the
// cutoff time.
//
// This function must be able to execute while other methods on this interface
// are being executed in parallel.
func (ps *store) GC(cutoff time.Time) {
func (ps *peerStore) GC(cutoff time.Time) {
select {
case <-ps.closed:
return
@@ -596,7 +603,7 @@ func (ps *store) GC(cutoff time.Time) {
}
}
func (ps *store) Stop() stop.Result {
func (ps *peerStore) Stop() stop.Result {
c := make(stop.Channel)
go func() {
if ps.closed != nil {
@@ -617,6 +624,6 @@ func (ps *store) Stop() stop.Result {
return c.Result()
}
func (ps *store) LogFields() log.Fields {
func (ps *peerStore) LogFields() log.Fields {
return ps.cfg.LogFields()
}
+2 -2
View File
@@ -8,8 +8,8 @@ import (
"github.com/sot-tech/mochi/storage/test"
)
func createNew() storage.Storage {
ps, err := New(Config{
func createNew() storage.PeerStorage {
ps, err := NewPeerStorage(Config{
ShardCount: 1024,
GarbageCollectionInterval: 10 * time.Minute,
PrometheusReportingInterval: 10 * time.Minute,
+31 -27
View File
@@ -73,7 +73,7 @@ func init() {
type driver struct{}
func (d driver) NewStorage(icfg conf.MapConfig) (storage.Storage, error) {
func (d driver) NewStorage(icfg conf.MapConfig) (storage.PeerStorage, error) {
// Unmarshal the bytes into the proper config type.
var cfg Config
@@ -84,7 +84,7 @@ func (d driver) NewStorage(icfg conf.MapConfig) (storage.Storage, error) {
return New(cfg)
}
// Config holds the configuration of a redis Storage.
// Config holds the configuration of a redis PeerStorage.
type Config struct {
GarbageCollectionInterval time.Duration `cfg:"gc_interval"`
PrometheusReportingInterval time.Duration `cfg:"prometheus_reporting_interval"`
@@ -252,8 +252,8 @@ func connect(cfg Config) (*store, error) {
return db, err
}
// New creates a new Storage backed by redis.
func New(conf Config) (storage.Storage, error) {
// New creates a new PeerStorage backed by redis.
func New(conf Config) (storage.PeerStorage, error) {
cfg, err := conf.Validate()
if err != nil {
return nil, err
@@ -658,30 +658,25 @@ func (ps *store) ScrapeSwarm(ih bittorrent.InfoHash, peer bittorrent.Peer) (resp
return
}
func (ps *store) Put(ctx string, value storage.Entry) error {
return ps.con.HSet(ps.ctx, prefixKey+ctx, value.Key, value.Value).Err()
}
func (ps *store) Contains(ctx string, key string) (bool, error) {
exist, err := ps.con.HExists(ps.ctx, prefixKey+ctx, key).Result()
return exist, asNil(err)
}
const argNumErrorMsg = "ERR wrong number of arguments"
func (ps *store) BulkPut(ctx string, pairs ...storage.Entry) (err error) {
if l := len(pairs); l > 0 {
args := make([]any, 0, l*2)
for _, p := range pairs {
args = append(args, p.Key, p.Value)
}
err = ps.con.HSet(ps.ctx, prefixKey+ctx, args...).Err()
if err != nil {
if strings.Contains(err.Error(), argNumErrorMsg) {
log.Warn("This REDIS version/implementation does not support variadic arguments for HSET")
for _, p := range pairs {
if err = ps.con.HSet(ps.ctx, prefixKey+ctx, p.Key, p.Value).Err(); err != nil {
break
func (ps *store) Put(ctx string, values ...storage.Entry) (err error) {
if l := len(values); l > 0 {
if l == 1 {
err = ps.con.HSet(ps.ctx, prefixKey+ctx, values[0].Key, values[0].Value).Err()
} else {
args := make([]any, 0, l*2)
for _, p := range values {
args = append(args, p.Key, p.Value)
}
err = ps.con.HSet(ps.ctx, prefixKey+ctx, args...).Err()
if err != nil {
if strings.Contains(err.Error(), argNumErrorMsg) {
log.Warn("This REDIS version/implementation does not support variadic arguments for HSET")
for _, p := range values {
if err = ps.con.HSet(ps.ctx, prefixKey+ctx, p.Key, p.Value).Err(); err != nil {
break
}
}
}
}
@@ -690,6 +685,11 @@ func (ps *store) BulkPut(ctx string, pairs ...storage.Entry) (err error) {
return
}
func (ps *store) Contains(ctx string, key string) (bool, error) {
exist, err := ps.con.HExists(ps.ctx, prefixKey+ctx, key).Result()
return exist, asNil(err)
}
func (ps *store) Load(ctx string, key string) (v any, err error) {
v, err = ps.con.HGet(ps.ctx, prefixKey+ctx, key).Result()
if err != nil && errors.Is(err, redis.Nil) {
@@ -715,6 +715,10 @@ func (ps *store) Delete(ctx string, keys ...string) (err error) {
return
}
func (*store) Preservable() bool {
return true
}
func (ps *store) GC(cutoff time.Time) {
log.Debug("storage: purging peers with no announces since", log.Fields{"before": cutoff})
cutoffUnix := cutoff.UnixNano()
@@ -722,7 +726,7 @@ func (ps *store) GC(cutoff time.Time) {
ps.gc(cutoffUnix, true)
}
// gc deletes all Peers from the Storage which are older than the
// gc deletes all Peers from the PeerStorage which are older than the
// cutoff time.
//
// This function must be able to execute while other methods on this interface
+2 -2
View File
@@ -20,8 +20,8 @@ var cfg = Config{
ConnectTimeout: 10 * time.Second,
}
func createNew() s.Storage {
var ps s.Storage
func createNew() s.PeerStorage {
var ps s.PeerStorage
var err error
ps, err = New(cfg)
if err != nil {
+33 -29
View File
@@ -24,13 +24,13 @@ type Entry struct {
Value any
}
// Driver is the interface used to initialize a new type of Storage.
// Driver is the interface used to initialize a new type of PeerStorage.
type Driver interface {
NewStorage(cfg conf.MapConfig) (Storage, error)
NewStorage(cfg conf.MapConfig) (PeerStorage, error)
}
// ErrResourceDoesNotExist is the error returned by all delete methods and the
// AnnouncePeers method of the Storage interface if the requested resource
// AnnouncePeers method of the PeerStorage interface if the requested resource
// does not exist.
var ErrResourceDoesNotExist = bittorrent.ClientError("resource does not exist")
@@ -38,10 +38,31 @@ var ErrResourceDoesNotExist = bittorrent.ClientError("resource does not exist")
// store driver with that name does not exist.
var ErrDriverDoesNotExist = errors.New("peer store driver with that name does not exist")
// Storage is an interface that abstracts the interactions of storing and
// DataStorage is the interface, used for implementing store for arbitrary data
type DataStorage interface {
// Put used to place arbitrary k-v data with specified context
// into storage. ctx parameter used to group data
// (i.e. data only for specific middleware module: hash key, table name etc...)
Put(ctx string, values ...Entry) error
// Contains checks if any data in specified context exist
Contains(ctx string, key string) (bool, error)
// Load used to get arbitrary data in specified context by its key
Load(ctx string, key string) (any, error)
// Delete used to delete arbitrary data in specified context by its keys
Delete(ctx string, keys ...string) error
// Preservable indicates, that this storage can store data permanently,
// in other words, is NOT in-memory storage, which data will be lost after restart
Preservable() bool
}
// PeerStorage is an interface that abstracts the interactions of storing and
// manipulating Peers such that it can be implemented for various data stores.
//
// Implementations of the Storage interface must do the following in addition
// Implementations of the PeerStorage interface must do the following in addition
// to implementing the methods of the interface in the way documented:
//
// - Implement a garbage-collection strategy that ensures stale data is removed.
@@ -50,13 +71,14 @@ var ErrDriverDoesNotExist = errors.New("peer store driver with that name does no
// be scanned periodically and too old Peers removed. The intervals and
// durations involved should be configurable.
// - IPv4 and IPv6 swarms must be isolated from each other.
// A Storage must be able to transparently handle IPv4 and IPv6 Peers, but
// A PeerStorage must be able to transparently handle IPv4 and IPv6 Peers, but
// must separate them. AnnouncePeers and ScrapeSwarm must return information
// about the Swarm matching the given AddressFamily only.
//
// Implementations can be tested against this interface using the tests in
// storage_test.go and the benchmarks in storage_bench.go.
type Storage interface {
type PeerStorage interface {
DataStorage
// PutSeeder adds a Seeder to the Swarm identified by the provided
// InfoHash.
PutSeeder(infoHash bittorrent.InfoHash, peer bittorrent.Peer) error
@@ -114,33 +136,15 @@ type Storage interface {
// If the Swarm does not exist, an empty Scrape and no error is returned.
ScrapeSwarm(infoHash bittorrent.InfoHash, peer bittorrent.Peer) bittorrent.Scrape
// Put used to place arbitrary k-v data with specified context
// into storage. ctx parameter used to group data
// (i.e. data only for specific middleware module)
Put(ctx string, value Entry) error
// BulkPut used to place array of k-v data in specified context.
// Useful when several data entries should be added in single transaction/connection
BulkPut(ctx string, values ...Entry) error
// Contains checks if any data in specified context exist
Contains(ctx string, key string) (bool, error)
// Load used to get arbitrary data in specified context by its key
Load(ctx string, key string) (any, error)
// Delete used to delete arbitrary data in specified context by its keys
Delete(ctx string, keys ...string) error
// GC used to delete stale data, such as timed out seeders/leechers
GC(cutoff time.Time)
// Stopper is an interface that expects a Stop method to stop the Storage.
// Stopper is an interface that expects a Stop method to stop the PeerStorage.
// For more details see the documentation in the stop package.
stop.Stopper
// Fielder returns a loggable version of the data used to configure and
// operate a particular Storage.
// operate a particular PeerStorage.
log.Fielder
}
@@ -166,11 +170,11 @@ func RegisterDriver(name string, d Driver) {
drivers[name] = d
}
// NewStorage attempts to initialize a new Storage instance from
// NewStorage attempts to initialize a new PeerStorage instance from
// the list of registered Drivers.
//
// If a driver does not exist, returns ErrDriverDoesNotExist.
func NewStorage(name string, cfg conf.MapConfig) (ps Storage, err error) {
func NewStorage(name string, cfg conf.MapConfig) (ps PeerStorage, err error) {
driversM.RLock()
defer driversM.RUnlock()
+48 -48
View File
@@ -52,9 +52,9 @@ func generatePeers() (a [10000]bittorrent.Peer) {
}
type (
benchExecFunc func(int, storage.Storage, *benchData) error
benchSetupFunc func(storage.Storage, *benchData) error
benchStorageConstructor func() storage.Storage
benchExecFunc func(int, storage.PeerStorage, *benchData) error
benchSetupFunc func(storage.PeerStorage, *benchData) error
benchStorageConstructor func() storage.PeerStorage
)
type benchHolder struct {
@@ -102,65 +102,65 @@ func (bh *benchHolder) runBenchmark(b *testing.B, parallel bool, sf benchSetupFu
}
// Nop executes a no-op for each iteration.
// It should produce the same results for each storage.Storage.
// It should produce the same results for each storage.PeerStorage.
// This can be used to get an estimate of the impact of the benchmark harness
// on benchmark results and an estimate of the general performance of the system
// benchmarked on.
//
// Nop can run in parallel.
func (bh *benchHolder) Nop(b *testing.B) {
bh.runBenchmark(b, true, putPeers, func(i int, ps storage.Storage, bd *benchData) error {
bh.runBenchmark(b, true, putPeers, func(i int, ps storage.PeerStorage, bd *benchData) error {
return nil
})
}
// Put benchmarks the PutSeeder method of a storage.Storage by repeatedly Putting the
// Put benchmarks the PutSeeder method of a storage.PeerStorage by repeatedly Putting the
// same Peer for the same InfoHash.
//
// Put can run in parallel.
func (bh *benchHolder) Put(b *testing.B) {
bh.runBenchmark(b, true, nil, func(i int, ps storage.Storage, bd *benchData) error {
bh.runBenchmark(b, true, nil, func(i int, ps storage.PeerStorage, bd *benchData) error {
return ps.PutSeeder(bd.infoHashes[0], bd.peers[0])
})
}
// Put1k benchmarks the PutSeeder method of a storage.Storage by cycling through 1000
// Put1k benchmarks the PutSeeder method of a storage.PeerStorage by cycling through 1000
// Peers and Putting them into the swarm of one infohash.
//
// Put1k can run in parallel.
func (bh *benchHolder) Put1k(b *testing.B) {
bh.runBenchmark(b, true, nil, func(i int, ps storage.Storage, bd *benchData) error {
bh.runBenchmark(b, true, nil, func(i int, ps storage.PeerStorage, bd *benchData) error {
return ps.PutSeeder(bd.infoHashes[0], bd.peers[i%1000])
})
}
// Put1kInfoHash benchmarks the PutSeeder method of a storage.Storage by cycling
// Put1kInfoHash benchmarks the PutSeeder method of a storage.PeerStorage by cycling
// through 1000 infoHashes and putting the same peer into their swarms.
//
// Put1kInfoHash can run in parallel.
func (bh *benchHolder) Put1kInfoHash(b *testing.B) {
bh.runBenchmark(b, true, nil, func(i int, ps storage.Storage, bd *benchData) error {
bh.runBenchmark(b, true, nil, func(i int, ps storage.PeerStorage, bd *benchData) error {
return ps.PutSeeder(bd.infoHashes[i%1000], bd.peers[0])
})
}
// Put1kInfoHash1k benchmarks the PutSeeder method of a storage.Storage by cycling
// Put1kInfoHash1k benchmarks the PutSeeder method of a storage.PeerStorage by cycling
// through 1000 infoHashes and 1000 Peers and calling Put with them.
//
// Put1kInfoHash1k can run in parallel.
func (bh *benchHolder) Put1kInfoHash1k(b *testing.B) {
bh.runBenchmark(b, true, nil, func(i int, ps storage.Storage, bd *benchData) error {
bh.runBenchmark(b, true, nil, func(i int, ps storage.PeerStorage, bd *benchData) error {
err := ps.PutSeeder(bd.infoHashes[i%1000], bd.peers[(i*3)%1000])
return err
})
}
// PutDelete benchmarks the PutSeeder and DeleteSeeder methods of a storage.Storage by
// PutDelete benchmarks the PutSeeder and DeleteSeeder methods of a storage.PeerStorage by
// calling PutSeeder followed by DeleteSeeder for one Peer and one infohash.
//
// PutDelete can not run in parallel.
func (bh *benchHolder) PutDelete(b *testing.B) {
bh.runBenchmark(b, false, nil, func(i int, ps storage.Storage, bd *benchData) error {
bh.runBenchmark(b, false, nil, func(i int, ps storage.PeerStorage, bd *benchData) error {
err := ps.PutSeeder(bd.infoHashes[0], bd.peers[0])
if err != nil {
return err
@@ -174,7 +174,7 @@ func (bh *benchHolder) PutDelete(b *testing.B) {
//
// PutDelete1k can not run in parallel.
func (bh *benchHolder) PutDelete1k(b *testing.B) {
bh.runBenchmark(b, false, nil, func(i int, ps storage.Storage, bd *benchData) error {
bh.runBenchmark(b, false, nil, func(i int, ps storage.PeerStorage, bd *benchData) error {
err := ps.PutSeeder(bd.infoHashes[0], bd.peers[i%1000])
if err != nil {
return err
@@ -188,7 +188,7 @@ func (bh *benchHolder) PutDelete1k(b *testing.B) {
//
// PutDelete1kInfoHash can not run in parallel.
func (bh *benchHolder) PutDelete1kInfoHash(b *testing.B) {
bh.runBenchmark(b, false, nil, func(i int, ps storage.Storage, bd *benchData) error {
bh.runBenchmark(b, false, nil, func(i int, ps storage.PeerStorage, bd *benchData) error {
err := ps.PutSeeder(bd.infoHashes[i%1000], bd.peers[0])
if err != nil {
return err
@@ -202,7 +202,7 @@ func (bh *benchHolder) PutDelete1kInfoHash(b *testing.B) {
//
// PutDelete1kInfoHash1k can not run in parallel.
func (bh *benchHolder) PutDelete1kInfoHash1k(b *testing.B) {
bh.runBenchmark(b, false, nil, func(i int, ps storage.Storage, bd *benchData) error {
bh.runBenchmark(b, false, nil, func(i int, ps storage.PeerStorage, bd *benchData) error {
err := ps.PutSeeder(bd.infoHashes[i%1000], bd.peers[(i*3)%1000])
if err != nil {
return err
@@ -212,102 +212,102 @@ func (bh *benchHolder) PutDelete1kInfoHash1k(b *testing.B) {
})
}
// DeleteNonexist benchmarks the DeleteSeeder method of a storage.Storage by
// DeleteNonexist benchmarks the DeleteSeeder method of a storage.PeerStorage by
// attempting to delete a Peer that is nonexistent.
//
// DeleteNonexist can run in parallel.
func (bh *benchHolder) DeleteNonexist(b *testing.B) {
bh.runBenchmark(b, true, nil, func(i int, ps storage.Storage, bd *benchData) error {
bh.runBenchmark(b, true, nil, func(i int, ps storage.PeerStorage, bd *benchData) error {
_ = ps.DeleteSeeder(bd.infoHashes[0], bd.peers[0])
return nil
})
}
// DeleteNonexist1k benchmarks the DeleteSeeder method of a storage.Storage by
// DeleteNonexist1k benchmarks the DeleteSeeder method of a storage.PeerStorage by
// attempting to delete one of 1000 nonexistent Peers.
//
// DeleteNonexist can run in parallel.
func (bh *benchHolder) DeleteNonexist1k(b *testing.B) {
bh.runBenchmark(b, true, nil, func(i int, ps storage.Storage, bd *benchData) error {
bh.runBenchmark(b, true, nil, func(i int, ps storage.PeerStorage, bd *benchData) error {
_ = ps.DeleteSeeder(bd.infoHashes[0], bd.peers[i%1000])
return nil
})
}
// DeleteNonexist1kInfoHash benchmarks the DeleteSeeder method of a storage.Storage by
// DeleteNonexist1kInfoHash benchmarks the DeleteSeeder method of a storage.PeerStorage by
// attempting to delete one Peer from one of 1000 infoHashes.
//
// DeleteNonexist1kInfoHash can run in parallel.
func (bh *benchHolder) DeleteNonexist1kInfoHash(b *testing.B) {
bh.runBenchmark(b, true, nil, func(i int, ps storage.Storage, bd *benchData) error {
bh.runBenchmark(b, true, nil, func(i int, ps storage.PeerStorage, bd *benchData) error {
_ = ps.DeleteSeeder(bd.infoHashes[i%1000], bd.peers[0])
return nil
})
}
// DeleteNonexist1kInfoHash1k benchmarks the Delete method of a storage.Storage by
// DeleteNonexist1kInfoHash1k benchmarks the Delete method of a storage.PeerStorage by
// attempting to delete one of 1000 Peers from one of 1000 InfoHashes.
//
// DeleteNonexist1kInfoHash1k can run in parallel.
func (bh *benchHolder) DeleteNonexist1kInfoHash1k(b *testing.B) {
bh.runBenchmark(b, true, nil, func(i int, ps storage.Storage, bd *benchData) error {
bh.runBenchmark(b, true, nil, func(i int, ps storage.PeerStorage, bd *benchData) error {
_ = ps.DeleteSeeder(bd.infoHashes[i%1000], bd.peers[(i*3)%1000])
return nil
})
}
// GradNonexist benchmarks the GraduateLeecher method of a storage.Storage by
// GradNonexist benchmarks the GraduateLeecher method of a storage.PeerStorage by
// attempting to graduate a nonexistent Peer.
//
// GradNonexist can run in parallel.
func (bh *benchHolder) GradNonexist(b *testing.B) {
bh.runBenchmark(b, true, nil, func(i int, ps storage.Storage, bd *benchData) error {
bh.runBenchmark(b, true, nil, func(i int, ps storage.PeerStorage, bd *benchData) error {
_ = ps.GraduateLeecher(bd.infoHashes[0], bd.peers[0])
return nil
})
}
// GradNonexist1k benchmarks the GraduateLeecher method of a storage.Storage by
// GradNonexist1k benchmarks the GraduateLeecher method of a storage.PeerStorage by
// attempting to graduate one of 1000 nonexistent Peers.
//
// GradNonexist1k can run in parallel.
func (bh *benchHolder) GradNonexist1k(b *testing.B) {
bh.runBenchmark(b, true, nil, func(i int, ps storage.Storage, bd *benchData) error {
bh.runBenchmark(b, true, nil, func(i int, ps storage.PeerStorage, bd *benchData) error {
_ = ps.GraduateLeecher(bd.infoHashes[0], bd.peers[i%1000])
return nil
})
}
// GradNonexist1kInfoHash benchmarks the GraduateLeecher method of a storage.Storage
// GradNonexist1kInfoHash benchmarks the GraduateLeecher method of a storage.PeerStorage
// by attempting to graduate a nonexistent Peer for one of 100 InfoHashes.
//
// GradNonexist1kInfoHash can run in parallel.
func (bh *benchHolder) GradNonexist1kInfoHash(b *testing.B) {
bh.runBenchmark(b, true, nil, func(i int, ps storage.Storage, bd *benchData) error {
bh.runBenchmark(b, true, nil, func(i int, ps storage.PeerStorage, bd *benchData) error {
_ = ps.GraduateLeecher(bd.infoHashes[i%1000], bd.peers[0])
return nil
})
}
// GradNonexist1kInfoHash1k benchmarks the GraduateLeecher method of a storage.Storage
// GradNonexist1kInfoHash1k benchmarks the GraduateLeecher method of a storage.PeerStorage
// by attempting to graduate one of 1000 nonexistent Peers for one of 1000
// infoHashes.
//
// GradNonexist1kInfoHash1k can run in parallel.
func (bh *benchHolder) GradNonexist1kInfoHash1k(b *testing.B) {
bh.runBenchmark(b, true, nil, func(i int, ps storage.Storage, bd *benchData) error {
bh.runBenchmark(b, true, nil, func(i int, ps storage.PeerStorage, bd *benchData) error {
_ = ps.GraduateLeecher(bd.infoHashes[i%1000], bd.peers[(i*3)%1000])
return nil
})
}
// PutGradDelete benchmarks the PutLeecher, GraduateLeecher and DeleteSeeder
// methods of a storage.Storage by adding one leecher to a swarm, promoting it to a
// methods of a storage.PeerStorage by adding one leecher to a swarm, promoting it to a
// seeder and deleting the seeder.
//
// PutGradDelete can not run in parallel.
func (bh *benchHolder) PutGradDelete(b *testing.B) {
bh.runBenchmark(b, false, nil, func(i int, ps storage.Storage, bd *benchData) error {
bh.runBenchmark(b, false, nil, func(i int, ps storage.PeerStorage, bd *benchData) error {
err := ps.PutLeecher(bd.infoHashes[0], bd.peers[0])
if err != nil {
return err
@@ -324,7 +324,7 @@ func (bh *benchHolder) PutGradDelete(b *testing.B) {
//
// PutGradDelete1k can not run in parallel.
func (bh *benchHolder) PutGradDelete1k(b *testing.B) {
bh.runBenchmark(b, false, nil, func(i int, ps storage.Storage, bd *benchData) error {
bh.runBenchmark(b, false, nil, func(i int, ps storage.PeerStorage, bd *benchData) error {
err := ps.PutLeecher(bd.infoHashes[0], bd.peers[i%1000])
if err != nil {
return err
@@ -342,7 +342,7 @@ func (bh *benchHolder) PutGradDelete1k(b *testing.B) {
//
// PutGradDelete1kInfoHash can not run in parallel.
func (bh *benchHolder) PutGradDelete1kInfoHash(b *testing.B) {
bh.runBenchmark(b, false, nil, func(i int, ps storage.Storage, bd *benchData) error {
bh.runBenchmark(b, false, nil, func(i int, ps storage.PeerStorage, bd *benchData) error {
err := ps.PutLeecher(bd.infoHashes[i%1000], bd.peers[0])
if err != nil {
return err
@@ -360,7 +360,7 @@ func (bh *benchHolder) PutGradDelete1kInfoHash(b *testing.B) {
//
// PutGradDelete1kInfoHash can not run in parallel.
func (bh *benchHolder) PutGradDelete1kInfoHash1k(b *testing.B) {
bh.runBenchmark(b, false, nil, func(i int, ps storage.Storage, bd *benchData) error {
bh.runBenchmark(b, false, nil, func(i int, ps storage.PeerStorage, bd *benchData) error {
err := ps.PutLeecher(bd.infoHashes[i%1000], bd.peers[(i*3)%1000])
if err != nil {
return err
@@ -374,7 +374,7 @@ func (bh *benchHolder) PutGradDelete1kInfoHash1k(b *testing.B) {
})
}
func putPeers(ps storage.Storage, bd *benchData) error {
func putPeers(ps storage.PeerStorage, bd *benchData) error {
for i := 0; i < 1000; i++ {
for j := 0; j < 1000; j++ {
var err error
@@ -391,13 +391,13 @@ func putPeers(ps storage.Storage, bd *benchData) error {
return nil
}
// AnnounceLeecher benchmarks the AnnouncePeers method of a storage.Storage for
// AnnounceLeecher benchmarks the AnnouncePeers method of a storage.PeerStorage for
// announcing a leecher.
// The swarm announced to has 500 seeders and 500 leechers.
//
// AnnounceLeecher can run in parallel.
func (bh *benchHolder) AnnounceLeecher(b *testing.B) {
bh.runBenchmark(b, true, putPeers, func(i int, ps storage.Storage, bd *benchData) error {
bh.runBenchmark(b, true, putPeers, func(i int, ps storage.PeerStorage, bd *benchData) error {
_, err := ps.AnnouncePeers(bd.infoHashes[0], false, 50, bd.peers[0])
return err
})
@@ -408,7 +408,7 @@ func (bh *benchHolder) AnnounceLeecher(b *testing.B) {
//
// AnnounceLeecher1kInfoHash can run in parallel.
func (bh *benchHolder) AnnounceLeecher1kInfoHash(b *testing.B) {
bh.runBenchmark(b, true, putPeers, func(i int, ps storage.Storage, bd *benchData) error {
bh.runBenchmark(b, true, putPeers, func(i int, ps storage.PeerStorage, bd *benchData) error {
_, err := ps.AnnouncePeers(bd.infoHashes[i%1000], false, 50, bd.peers[0])
return err
})
@@ -419,7 +419,7 @@ func (bh *benchHolder) AnnounceLeecher1kInfoHash(b *testing.B) {
//
// AnnounceSeeder can run in parallel.
func (bh *benchHolder) AnnounceSeeder(b *testing.B) {
bh.runBenchmark(b, true, putPeers, func(i int, ps storage.Storage, bd *benchData) error {
bh.runBenchmark(b, true, putPeers, func(i int, ps storage.PeerStorage, bd *benchData) error {
_, err := ps.AnnouncePeers(bd.infoHashes[0], true, 50, bd.peers[0])
return err
})
@@ -430,18 +430,18 @@ func (bh *benchHolder) AnnounceSeeder(b *testing.B) {
//
// AnnounceSeeder1kInfoHash can run in parallel.
func (bh *benchHolder) AnnounceSeeder1kInfoHash(b *testing.B) {
bh.runBenchmark(b, true, putPeers, func(i int, ps storage.Storage, bd *benchData) error {
bh.runBenchmark(b, true, putPeers, func(i int, ps storage.PeerStorage, bd *benchData) error {
_, err := ps.AnnouncePeers(bd.infoHashes[i%1000], true, 50, bd.peers[0])
return err
})
}
// ScrapeSwarm benchmarks the ScrapeSwarm method of a storage.Storage.
// ScrapeSwarm benchmarks the ScrapeSwarm method of a storage.PeerStorage.
// The swarm scraped has 500 seeders and 500 leechers.
//
// ScrapeSwarm can run in parallel.
func (bh *benchHolder) ScrapeSwarm(b *testing.B) {
bh.runBenchmark(b, true, putPeers, func(i int, ps storage.Storage, bd *benchData) error {
bh.runBenchmark(b, true, putPeers, func(i int, ps storage.PeerStorage, bd *benchData) error {
ps.ScrapeSwarm(bd.infoHashes[0], bd.peers[0])
return nil
})
@@ -451,7 +451,7 @@ func (bh *benchHolder) ScrapeSwarm(b *testing.B) {
//
// ScrapeSwarm1kInfoHash can run in parallel.
func (bh *benchHolder) ScrapeSwarm1kInfoHash(b *testing.B) {
bh.runBenchmark(b, true, putPeers, func(i int, ps storage.Storage, bd *benchData) error {
bh.runBenchmark(b, true, putPeers, func(i int, ps storage.PeerStorage, bd *benchData) error {
ps.ScrapeSwarm(bd.infoHashes[i%1000], bd.peers[0])
return nil
})
+5 -5
View File
@@ -14,12 +14,12 @@ import (
// PeerEqualityFunc is the boolean function to use to check two Peers for
// equality.
// Depending on the implementation of the Storage, this can be changed to
// Depending on the implementation of the PeerStorage, this can be changed to
// use (Peer).EqualEndpoint instead.
var PeerEqualityFunc = func(p1, p2 bittorrent.Peer) bool { return p1.Equal(p2) }
type testHolder struct {
st storage.Storage
st storage.PeerStorage
}
type hashPeer struct {
@@ -219,7 +219,7 @@ func (th *testHolder) CustomBulkPutContainsLoadDelete(t *testing.T) {
Value: c.ih.RawString(),
})
}
err := th.st.BulkPut("test", pairs...)
err := th.st.Put("test", pairs...)
require.Nil(t, err)
// check if exist in ctx we put
@@ -260,8 +260,8 @@ func (th *testHolder) GC(t *testing.T) {
}
}
// RunTests tests a Storage implementation against the interface.
func RunTests(t *testing.T, p storage.Storage) {
// RunTests tests a PeerStorage implementation against the interface.
func RunTests(t *testing.T, p storage.PeerStorage) {
th := testHolder{st: p}
// Test ErrDNE for non-existent swarms.