diff --git a/cmd/mochi/server.go b/cmd/mochi/server.go index ab6c7ed..0a674be 100644 --- a/cmd/mochi/server.go +++ b/cmd/mochi/server.go @@ -33,7 +33,7 @@ func (r *Server) Run(cfg *Config) (err error) { log.Info().Msg("metrics disabled because of empty address") } - r.storage, err = storage.NewStorage(cfg.Storage) + r.storage, err = storage.NewPeerStorage(cfg.Storage) if err != nil { return fmt.Errorf("failed to create storage: %w", err) } diff --git a/dist/example_config.yaml b/dist/example_config.yaml index 34a9093..d733116 100644 --- a/dist/example_config.yaml +++ b/dist/example_config.yaml @@ -206,8 +206,11 @@ prehooks: # - name: torrent approval # config: # initial_source: list -# Save data provided by source in storage above -# preserve: false +# Save data provided by source in specific storage. If name is empty or 'internal', provided above 'storage' +# is used, but another storage may be provided (configuration is the same as for 'storage' above) +# store: +# name: internal +# config: # configuration: # hash_list: # - "a1b2c3d4e5a1b2c3d4e5a1b2c3d4e5a1b2c3d4e5" diff --git a/frontend/udp/frontend_test.go b/frontend/udp/frontend_test.go index dda2955..9a1af53 100644 --- a/frontend/udp/frontend_test.go +++ b/frontend/udp/frontend_test.go @@ -16,7 +16,7 @@ func init() { } func TestStartStopRaceIssue437(t *testing.T) { - ps, err := storage.NewStorage(conf.NamedMapConfig{ + ps, err := storage.NewPeerStorage(conf.NamedMapConfig{ Name: "memory", Config: conf.MapConfig{}, }) diff --git a/middleware/torrentapproval/torrentapproval.go b/middleware/torrentapproval/torrentapproval.go index c873967..29f8c7b 100644 --- a/middleware/torrentapproval/torrentapproval.go +++ b/middleware/torrentapproval/torrentapproval.go @@ -4,6 +4,7 @@ package torrentapproval import ( "context" + "errors" "fmt" "io" @@ -11,8 +12,6 @@ import ( "github.com/sot-tech/mochi/middleware" "github.com/sot-tech/mochi/middleware/torrentapproval/container" "github.com/sot-tech/mochi/pkg/conf" - "github.com/sot-tech/mochi/storage/memory" - // import directory watcher to enable appropriate support _ "github.com/sot-tech/mochi/middleware/torrentapproval/container/directory" @@ -24,6 +23,8 @@ import ( // Name is the name by which this middleware is registered with Conf. const Name = "torrent approval" +const internalStore = "internal" + func init() { middleware.RegisterBuilder(Name, build) } @@ -31,9 +32,10 @@ func init() { type baseConfig struct { // Source - name of container for initial values Source string `cfg:"initial_source"` - // Preserve - if true, container will receive real registered storage if it is NOT `memory` - // if false - temporary in-memory storage will be used or created + // Deprecated: use Store parameter Preserve bool + // Store where to hold provided data by Source + Store conf.NamedMapConfig // Configuration depends on used container Configuration conf.MapConfig } @@ -52,9 +54,15 @@ func build(config conf.MapConfig, st storage.PeerStorage) (h middleware.Hook, er return nil, fmt.Errorf("invalid config for middleware %s: config not provided", Name) } - var ds storage.DataStorage = st - if !cfg.Preserve && ds.Preservable() { - ds = memory.NewDataStorage() + if cfg.Preserve { + return nil, errors.New("preserve option is deprecated, use store parameter") + } + + var ds storage.DataStorage + if len(cfg.Store.Name) == 0 || cfg.Store.Name == internalStore { + ds = st + } else if ds, err = storage.NewDataStorage(cfg.Store); err != nil { + return } var c container.Container diff --git a/middleware/torrentapproval/torrentapproval_test.go b/middleware/torrentapproval/torrentapproval_test.go index a6ca2b9..daaf84f 100644 --- a/middleware/torrentapproval/torrentapproval_test.go +++ b/middleware/torrentapproval/torrentapproval_test.go @@ -71,8 +71,7 @@ var cases = []struct { } func TestHandleAnnounce(t *testing.T) { - config := memory.Config{}.Validate() - storage, err := memory.NewPeerStorage(config) + storage, err := memory.Builder{}.NewPeerStorage(make(conf.MapConfig)) require.Nil(t, err) for _, tt := range cases { t.Run(fmt.Sprintf("testing hash %s", tt.ih), func(t *testing.T) { @@ -84,10 +83,10 @@ func TestHandleAnnounce(t *testing.T) { req := &bittorrent.AnnounceRequest{} resp := &bittorrent.AnnounceResponse{} - hashinfo, err := bittorrent.NewInfoHashString(tt.ih) + ih, err := bittorrent.NewInfoHashString(tt.ih) require.Nil(t, err) - req.InfoHash = hashinfo + req.InfoHash = ih nctx, err := h.HandleAnnounce(ctx, req, resp) require.Equal(t, ctx, nctx) diff --git a/storage/keydb/storage.go b/storage/keydb/storage.go index 7c3facf..d5fddd0 100644 --- a/storage/keydb/storage.go +++ b/storage/keydb/storage.go @@ -33,10 +33,12 @@ var ( func init() { // Register the storage driver. - storage.RegisterDriver("keydb", builder) + storage.RegisterDriver("keydb", builder{}) } -func builder(icfg conf.MapConfig) (storage.PeerStorage, error) { +type builder struct{} + +func (builder) NewPeerStorage(icfg conf.MapConfig) (storage.PeerStorage, error) { var cfg r.Config var err error @@ -47,6 +49,16 @@ func builder(icfg conf.MapConfig) (storage.PeerStorage, error) { return newStore(cfg) } +func (b builder) NewDataStorage(icfg conf.MapConfig) (storage.DataStorage, error) { + var cfg r.Config + + if err := icfg.Unmarshal(&cfg); err != nil { + return nil, err + } + + return r.NewStore(cfg) +} + func newStore(cfg r.Config) (*store, error) { var err error if cfg, err = cfg.Validate(); err != nil { diff --git a/storage/memory/storage.go b/storage/memory/storage.go index 70ebd46..28ec7ce 100644 --- a/storage/memory/storage.go +++ b/storage/memory/storage.go @@ -32,19 +32,24 @@ var logger = log.NewLogger("storage/memory") func init() { // Register the storage driver. - storage.RegisterDriver(Name, builder) + storage.RegisterDriver(Name, Builder{}) } -func builder(icfg conf.MapConfig) (storage.PeerStorage, error) { - var cfg Config +type Builder struct{} + +func (Builder) NewDataStorage(conf.MapConfig) (storage.DataStorage, error) { + return dataStorage(), nil +} + +func (Builder) NewPeerStorage(icfg conf.MapConfig) (storage.PeerStorage, error) { + var cfg config if err := icfg.Unmarshal(&cfg); err != nil { return nil, err } - return NewPeerStorage(cfg) + return peerStorage(cfg) } -// Config holds the configuration of a memory PeerStorage. -type Config struct { +type config struct { ShardCount int `cfg:"shard_count"` } @@ -52,7 +57,7 @@ type Config struct { // default values replacing anything that is invalid. // // This function warns to the logger when a value is changed. -func (cfg Config) Validate() Config { +func (cfg config) Validate() config { validcfg := cfg if cfg.ShardCount <= 0 || cfg.ShardCount > (math.MaxInt/2) { @@ -67,12 +72,11 @@ func (cfg Config) Validate() Config { return validcfg } -// NewPeerStorage creates a new PeerStorage backed by memory. -func NewPeerStorage(provided Config) (storage.PeerStorage, error) { +func peerStorage(provided config) (storage.PeerStorage, error) { cfg := provided.Validate() ps := &peerStore{ shards: make([]*peerShard, cfg.ShardCount*2), - DataStorage: NewDataStorage(), + DataStorage: dataStorage(), closed: make(chan any), } @@ -453,8 +457,7 @@ func (ps *peerStore) ScrapeSwarm(_ context.Context, ih bittorrent.InfoHash) (lee return } -// NewDataStorage creates new in-memory data store -func NewDataStorage() storage.DataStorage { +func dataStorage() storage.DataStorage { return new(dataStore) } diff --git a/storage/memory/storage_test.go b/storage/memory/storage_test.go index c8e3b7f..6c59053 100644 --- a/storage/memory/storage_test.go +++ b/storage/memory/storage_test.go @@ -8,7 +8,7 @@ import ( ) func createNew() storage.PeerStorage { - ps, err := NewPeerStorage(Config{ShardCount: 1024}) + ps, err := peerStorage(config{ShardCount: 1024}) if err != nil { panic(err) } diff --git a/storage/pg/storage.go b/storage/pg/storage.go index b7873b0..3332b62 100644 --- a/storage/pg/storage.go +++ b/storage/pg/storage.go @@ -51,13 +51,37 @@ var ( func init() { // Register the storage builder. - storage.RegisterDriver("pg", builder) + storage.RegisterDriver("pg", builder{}) } -func builder(icfg conf.MapConfig) (storage.PeerStorage, error) { - var cfg Config +type builder struct{} - if err := icfg.Unmarshal(&cfg); err != nil { +func (builder) NewDataStorage(icfg conf.MapConfig) (storage.DataStorage, error) { + var cfg config + + err := icfg.Unmarshal(&cfg) + if err != nil { + return nil, err + } + + cfg, err = cfg.validateDataStore() + if err != nil { + return nil, err + } + + return newStore(cfg) +} + +func (builder) NewPeerStorage(icfg conf.MapConfig) (storage.PeerStorage, error) { + var cfg config + + err := icfg.Unmarshal(&cfg) + if err != nil { + return nil, err + } + + cfg, err = cfg.validateFull() + if err != nil { return nil, err } @@ -73,19 +97,14 @@ func noResultErr(err error) error { return err } -func newStore(cfg Config) (storage.PeerStorage, error) { - cfg, err := cfg.Validate() - if err != nil { - return nil, err - } - +func newStore(cfg config) (storage.PeerStorage, error) { con, err := pgxpool.New(context.Background(), cfg.ConnectionString) if err != nil { return nil, err } return &store{ - Config: cfg, + config: cfg, Pool: con, wg: sync.WaitGroup{}, closed: make(chan any), @@ -121,8 +140,14 @@ type downloadQueryConf struct { IncrementQuery string `cfg:"inc_query"` } -// Config holds the configuration of a redis PeerStorage. -type Config struct { +func checkParameter(p *string, name string) (err error) { + if *p = strings.TrimSpace(*p); len(*p) == 0 { + err = fmt.Errorf(errRequiredParameterNotSetMsg, name) + } + return +} + +type config struct { ConnectionString string `cfg:"connection_string"` PingQuery string `cfg:"ping_query"` Peer peerQueryConf @@ -133,11 +158,7 @@ type Config struct { InfoHashCountQuery string `cfg:"info_hash_count_query"` } -// Validate sanity checks values set in a config and returns a new config with -// default values replacing anything that is invalid. -// -// This function warns to the logger when a value is changed. -func (cfg Config) Validate() (Config, error) { +func (cfg config) validateDataStore() (config, error) { validCfg := cfg validCfg.ConnectionString = strings.TrimSpace(validCfg.ConnectionString) if len(validCfg.ConnectionString) == 0 { @@ -153,66 +174,68 @@ func (cfg Config) Validate() (Config, error) { Msg("falling back to default configuration") } - fn := func(p *string, name string) (err error) { - if *p = strings.TrimSpace(*p); len(*p) == 0 { - err = fmt.Errorf(errRequiredParameterNotSetMsg, name) - } - return - } - - if err := fn(&validCfg.Peer.AddQuery, "peer.addQuery"); err != nil { + if err := checkParameter(&validCfg.Data.AddQuery, "data.addQuery"); err != nil { return cfg, err } - if err := fn(&validCfg.Peer.DelQuery, "peer.delQuery"); err != nil { + if err := checkParameter(&validCfg.Data.GetQuery, "data.getQuery"); err != nil { return cfg, err } - if err := fn(&validCfg.Peer.GraduateQuery, "peer.graduateQuery"); err != nil { + if err := checkParameter(&validCfg.Data.DelQuery, "data.delQuery"); err != nil { return cfg, err } - if err := fn(&validCfg.Peer.CountQuery, "peer.countQuery"); err != nil { + return validCfg, nil +} + +func (cfg config) validateFull() (config, error) { + validCfg, err := cfg.validateDataStore() + if err != nil { return cfg, err } - if err := fn(&validCfg.Peer.CountSeedersColumn, "peer.countSeedersColumn"); err != nil { + if err = checkParameter(&validCfg.Peer.AddQuery, "peer.addQuery"); err != nil { return cfg, err } - if err := fn(&validCfg.Peer.CountLeechersColumn, "peer.countLeechersColumn"); err != nil { + if err = checkParameter(&validCfg.Peer.DelQuery, "peer.delQuery"); err != nil { return cfg, err } - if err := fn(&validCfg.Peer.ByInfoHashClause, "peer.byInfoHashClause"); err != nil { + if err = checkParameter(&validCfg.Peer.GraduateQuery, "peer.graduateQuery"); err != nil { return cfg, err } - if err := fn(&validCfg.Announce.Query, "announce.query"); err != nil { + if err = checkParameter(&validCfg.Peer.CountQuery, "peer.countQuery"); err != nil { return cfg, err } - if err := fn(&validCfg.Announce.PeerIDColumn, "announce.peerIDColumn"); err != nil { + if err = checkParameter(&validCfg.Peer.CountSeedersColumn, "peer.countSeedersColumn"); err != nil { return cfg, err } - if err := fn(&validCfg.Announce.AddressColumn, "announce.addressColumn"); err != nil { + if err = checkParameter(&validCfg.Peer.CountLeechersColumn, "peer.countLeechersColumn"); err != nil { return cfg, err } - if err := fn(&validCfg.Announce.PortColumn, "announce.portColumn"); err != nil { + if err = checkParameter(&validCfg.Peer.ByInfoHashClause, "peer.byInfoHashClause"); err != nil { return cfg, err } - if err := fn(&validCfg.Data.AddQuery, "data.addQuery"); err != nil { + if err = checkParameter(&validCfg.Announce.Query, "announce.query"); err != nil { return cfg, err } - if err := fn(&validCfg.Data.GetQuery, "data.getQuery"); err != nil { + if err = checkParameter(&validCfg.Announce.PeerIDColumn, "announce.peerIDColumn"); err != nil { return cfg, err } - if err := fn(&validCfg.Data.DelQuery, "data.delQuery"); err != nil { + if err = checkParameter(&validCfg.Announce.AddressColumn, "announce.addressColumn"); err != nil { + return cfg, err + } + + if err = checkParameter(&validCfg.Announce.PortColumn, "announce.portColumn"); err != nil { return cfg, err } @@ -227,7 +250,7 @@ func (cfg Config) Validate() (Config, error) { } type store struct { - Config + config *pgxpool.Pool wg sync.WaitGroup closed chan any diff --git a/storage/pg/storage_test.go b/storage/pg/storage_test.go index bab82de..ad305f6 100644 --- a/storage/pg/storage_test.go +++ b/storage/pg/storage_test.go @@ -42,7 +42,7 @@ CREATE TABLE mo_kv ( ` ) -var cfg = Config{ +var cfg = config{ ConnectionString: "host=127.0.0.1 database=test user=postgres pool_max_conns=50", PingQuery: "SELECT 1", Peer: peerQueryConf{ @@ -76,6 +76,10 @@ var cfg = Config{ func createNew() s.PeerStorage { var ps s.PeerStorage var err error + cfg, err = cfg.validateFull() + if err != nil { + panic(fmt.Sprint("invalid configuration: ", err)) + } ps, err = newStore(cfg) if err != nil { panic(fmt.Sprint("Unable to create PostgreSQL connection: ", err, "\nThis driver needs real PostgreSQL instance")) diff --git a/storage/redis/storage.go b/storage/redis/storage.go index 8cf717a..27e7b71 100644 --- a/storage/redis/storage.go +++ b/storage/redis/storage.go @@ -79,21 +79,27 @@ var ( func init() { // Register the storage builder. - storage.RegisterDriver("redis", builder) + storage.RegisterDriver("redis", builder{}) } -func builder(icfg conf.MapConfig) (storage.PeerStorage, error) { - // Unmarshal the bytes into the proper config type. - var cfg Config +type builder struct{} - if err := icfg.Unmarshal(&cfg); err != nil { +func (builder) NewPeerStorage(icfg conf.MapConfig) (storage.PeerStorage, error) { + var cfg Config + var err error + + if err = icfg.Unmarshal(&cfg); err != nil { return nil, err } - return newStore(cfg) + return NewStore(cfg) } -func newStore(cfg Config) (*store, error) { +func (b builder) NewDataStorage(icfg conf.MapConfig) (storage.DataStorage, error) { + return b.NewPeerStorage(icfg) +} + +func NewStore(cfg Config) (storage.PeerStorage, error) { cfg, err := cfg.Validate() if err != nil { return nil, err diff --git a/storage/redis/storage_test.go b/storage/redis/storage_test.go index a8ea916..2be47bb 100644 --- a/storage/redis/storage_test.go +++ b/storage/redis/storage_test.go @@ -19,7 +19,7 @@ var cfg = Config{ func createNew() s.PeerStorage { var ps s.PeerStorage var err error - ps, err = newStore(cfg) + ps, err = NewStore(cfg) if err != nil { panic(fmt.Sprint("Unable to create Redis connection: ", err, "\nThis driver needs real Redis instance")) } diff --git a/storage/storage.go b/storage/storage.go index b38c4d7..d1b12dd 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -82,9 +82,12 @@ type Entry struct { Value []byte } -// Driver is the function used to initialize a new PeerStorage +// Driver is the interface used to initialize a new DataStorage or PeerStorage // with provided configuration. -type Driver func(conf.MapConfig) (PeerStorage, error) +type Driver interface { + NewDataStorage(cfg conf.MapConfig) (DataStorage, error) + NewPeerStorage(cfg conf.MapConfig) (PeerStorage, error) +} // ErrResourceDoesNotExist is the error returned by all delete methods and the // AnnouncePeers method of the PeerStorage interface if the requested resource @@ -231,15 +234,31 @@ func RegisterDriver(name string, d Driver) { drivers[name] = d } -// NewStorage attempts to initialize a new PeerStorage instance from +// NewDataStorage attempts to initialize a new DataStorage instance from // the list of registered drivers. -func NewStorage(cfg conf.NamedMapConfig) (ps PeerStorage, err error) { +func NewDataStorage(cfg conf.NamedMapConfig) (DataStorage, error) { driversMU.RLock() defer driversMU.RUnlock() - logger.Debug().Object("config", cfg).Msg("staring storage") + logger.Debug().Object("config", cfg).Msg("starting peer storage") - var b Driver - b, ok := drivers[cfg.Name] + var d Driver + d, ok := drivers[cfg.Name] + if !ok { + return nil, fmt.Errorf("storage with name '%s' does not exists", cfg.Name) + } + + return d.NewPeerStorage(cfg.Config) +} + +// NewPeerStorage attempts to initialize a new PeerStorage instance from +// the list of registered drivers. +func NewPeerStorage(cfg conf.NamedMapConfig) (ps PeerStorage, err error) { + driversMU.RLock() + defer driversMU.RUnlock() + logger.Debug().Object("config", cfg).Msg("starting peer storage") + + var d Driver + d, ok := drivers[cfg.Name] if !ok { return nil, fmt.Errorf("storage with name '%s' does not exists", cfg.Name) } @@ -249,7 +268,7 @@ func NewStorage(cfg conf.NamedMapConfig) (ps PeerStorage, err error) { return } - if ps, err = b(cfg.Config); err != nil { + if ps, err = d.NewPeerStorage(cfg.Config); err != nil { return }