(untested) refactor code, add separate call to create KV-store

This commit is contained in:
Lawrence, Rendall
2024-05-20 18:28:40 +03:00
parent 837b388b2f
commit 2f01a7cfc8
13 changed files with 164 additions and 87 deletions
+1 -1
View File
@@ -33,7 +33,7 @@ func (r *Server) Run(cfg *Config) (err error) {
log.Info().Msg("metrics disabled because of empty address")
}
r.storage, err = storage.NewStorage(cfg.Storage)
r.storage, err = storage.NewPeerStorage(cfg.Storage)
if err != nil {
return fmt.Errorf("failed to create storage: %w", err)
}
+5 -2
View File
@@ -206,8 +206,11 @@ prehooks:
# - name: torrent approval
# config:
# initial_source: list
# Save data provided by source in storage above
# preserve: false
# Save data provided by source in specific storage. If name is empty or 'internal', provided above 'storage'
# is used, but another storage may be provided (configuration is the same as for 'storage' above)
# store:
# name: internal
# config:
# configuration:
# hash_list:
# - "a1b2c3d4e5a1b2c3d4e5a1b2c3d4e5a1b2c3d4e5"
+1 -1
View File
@@ -16,7 +16,7 @@ func init() {
}
func TestStartStopRaceIssue437(t *testing.T) {
ps, err := storage.NewStorage(conf.NamedMapConfig{
ps, err := storage.NewPeerStorage(conf.NamedMapConfig{
Name: "memory",
Config: conf.MapConfig{},
})
+15 -7
View File
@@ -4,6 +4,7 @@ package torrentapproval
import (
"context"
"errors"
"fmt"
"io"
@@ -11,8 +12,6 @@ import (
"github.com/sot-tech/mochi/middleware"
"github.com/sot-tech/mochi/middleware/torrentapproval/container"
"github.com/sot-tech/mochi/pkg/conf"
"github.com/sot-tech/mochi/storage/memory"
// import directory watcher to enable appropriate support
_ "github.com/sot-tech/mochi/middleware/torrentapproval/container/directory"
@@ -24,6 +23,8 @@ import (
// Name is the name by which this middleware is registered with Conf.
const Name = "torrent approval"
const internalStore = "internal"
func init() {
middleware.RegisterBuilder(Name, build)
}
@@ -31,9 +32,10 @@ func init() {
type baseConfig struct {
// Source - name of container for initial values
Source string `cfg:"initial_source"`
// Preserve - if true, container will receive real registered storage if it is NOT `memory`
// if false - temporary in-memory storage will be used or created
// Deprecated: use Store parameter
Preserve bool
// Store where to hold provided data by Source
Store conf.NamedMapConfig
// Configuration depends on used container
Configuration conf.MapConfig
}
@@ -52,9 +54,15 @@ func build(config conf.MapConfig, st storage.PeerStorage) (h middleware.Hook, er
return nil, fmt.Errorf("invalid config for middleware %s: config not provided", Name)
}
var ds storage.DataStorage = st
if !cfg.Preserve && ds.Preservable() {
ds = memory.NewDataStorage()
if cfg.Preserve {
return nil, errors.New("preserve option is deprecated, use store parameter")
}
var ds storage.DataStorage
if len(cfg.Store.Name) == 0 || cfg.Store.Name == internalStore {
ds = st
} else if ds, err = storage.NewDataStorage(cfg.Store); err != nil {
return
}
var c container.Container
@@ -71,8 +71,7 @@ var cases = []struct {
}
func TestHandleAnnounce(t *testing.T) {
config := memory.Config{}.Validate()
storage, err := memory.NewPeerStorage(config)
storage, err := memory.Builder{}.NewPeerStorage(make(conf.MapConfig))
require.Nil(t, err)
for _, tt := range cases {
t.Run(fmt.Sprintf("testing hash %s", tt.ih), func(t *testing.T) {
@@ -84,10 +83,10 @@ func TestHandleAnnounce(t *testing.T) {
req := &bittorrent.AnnounceRequest{}
resp := &bittorrent.AnnounceResponse{}
hashinfo, err := bittorrent.NewInfoHashString(tt.ih)
ih, err := bittorrent.NewInfoHashString(tt.ih)
require.Nil(t, err)
req.InfoHash = hashinfo
req.InfoHash = ih
nctx, err := h.HandleAnnounce(ctx, req, resp)
require.Equal(t, ctx, nctx)
+14 -2
View File
@@ -33,10 +33,12 @@ var (
func init() {
// Register the storage driver.
storage.RegisterDriver("keydb", builder)
storage.RegisterDriver("keydb", builder{})
}
func builder(icfg conf.MapConfig) (storage.PeerStorage, error) {
type builder struct{}
func (builder) NewPeerStorage(icfg conf.MapConfig) (storage.PeerStorage, error) {
var cfg r.Config
var err error
@@ -47,6 +49,16 @@ func builder(icfg conf.MapConfig) (storage.PeerStorage, error) {
return newStore(cfg)
}
func (b builder) NewDataStorage(icfg conf.MapConfig) (storage.DataStorage, error) {
var cfg r.Config
if err := icfg.Unmarshal(&cfg); err != nil {
return nil, err
}
return r.NewStore(cfg)
}
func newStore(cfg r.Config) (*store, error) {
var err error
if cfg, err = cfg.Validate(); err != nil {
+15 -12
View File
@@ -32,19 +32,24 @@ var logger = log.NewLogger("storage/memory")
func init() {
// Register the storage driver.
storage.RegisterDriver(Name, builder)
storage.RegisterDriver(Name, Builder{})
}
func builder(icfg conf.MapConfig) (storage.PeerStorage, error) {
var cfg Config
type Builder struct{}
func (Builder) NewDataStorage(conf.MapConfig) (storage.DataStorage, error) {
return dataStorage(), nil
}
func (Builder) NewPeerStorage(icfg conf.MapConfig) (storage.PeerStorage, error) {
var cfg config
if err := icfg.Unmarshal(&cfg); err != nil {
return nil, err
}
return NewPeerStorage(cfg)
return peerStorage(cfg)
}
// Config holds the configuration of a memory PeerStorage.
type Config struct {
type config struct {
ShardCount int `cfg:"shard_count"`
}
@@ -52,7 +57,7 @@ type Config struct {
// default values replacing anything that is invalid.
//
// This function warns to the logger when a value is changed.
func (cfg Config) Validate() Config {
func (cfg config) Validate() config {
validcfg := cfg
if cfg.ShardCount <= 0 || cfg.ShardCount > (math.MaxInt/2) {
@@ -67,12 +72,11 @@ func (cfg Config) Validate() Config {
return validcfg
}
// NewPeerStorage creates a new PeerStorage backed by memory.
func NewPeerStorage(provided Config) (storage.PeerStorage, error) {
func peerStorage(provided config) (storage.PeerStorage, error) {
cfg := provided.Validate()
ps := &peerStore{
shards: make([]*peerShard, cfg.ShardCount*2),
DataStorage: NewDataStorage(),
DataStorage: dataStorage(),
closed: make(chan any),
}
@@ -453,8 +457,7 @@ func (ps *peerStore) ScrapeSwarm(_ context.Context, ih bittorrent.InfoHash) (lee
return
}
// NewDataStorage creates new in-memory data store
func NewDataStorage() storage.DataStorage {
func dataStorage() storage.DataStorage {
return new(dataStore)
}
+1 -1
View File
@@ -8,7 +8,7 @@ import (
)
func createNew() storage.PeerStorage {
ps, err := NewPeerStorage(Config{ShardCount: 1024})
ps, err := peerStorage(config{ShardCount: 1024})
if err != nil {
panic(err)
}
+63 -40
View File
@@ -51,13 +51,37 @@ var (
func init() {
// Register the storage builder.
storage.RegisterDriver("pg", builder)
storage.RegisterDriver("pg", builder{})
}
func builder(icfg conf.MapConfig) (storage.PeerStorage, error) {
var cfg Config
type builder struct{}
if err := icfg.Unmarshal(&cfg); err != nil {
func (builder) NewDataStorage(icfg conf.MapConfig) (storage.DataStorage, error) {
var cfg config
err := icfg.Unmarshal(&cfg)
if err != nil {
return nil, err
}
cfg, err = cfg.validateDataStore()
if err != nil {
return nil, err
}
return newStore(cfg)
}
func (builder) NewPeerStorage(icfg conf.MapConfig) (storage.PeerStorage, error) {
var cfg config
err := icfg.Unmarshal(&cfg)
if err != nil {
return nil, err
}
cfg, err = cfg.validateFull()
if err != nil {
return nil, err
}
@@ -73,19 +97,14 @@ func noResultErr(err error) error {
return err
}
func newStore(cfg Config) (storage.PeerStorage, error) {
cfg, err := cfg.Validate()
if err != nil {
return nil, err
}
func newStore(cfg config) (storage.PeerStorage, error) {
con, err := pgxpool.New(context.Background(), cfg.ConnectionString)
if err != nil {
return nil, err
}
return &store{
Config: cfg,
config: cfg,
Pool: con,
wg: sync.WaitGroup{},
closed: make(chan any),
@@ -121,8 +140,14 @@ type downloadQueryConf struct {
IncrementQuery string `cfg:"inc_query"`
}
// Config holds the configuration of a redis PeerStorage.
type Config struct {
func checkParameter(p *string, name string) (err error) {
if *p = strings.TrimSpace(*p); len(*p) == 0 {
err = fmt.Errorf(errRequiredParameterNotSetMsg, name)
}
return
}
type config struct {
ConnectionString string `cfg:"connection_string"`
PingQuery string `cfg:"ping_query"`
Peer peerQueryConf
@@ -133,11 +158,7 @@ type Config struct {
InfoHashCountQuery string `cfg:"info_hash_count_query"`
}
// Validate sanity checks values set in a config and returns a new config with
// default values replacing anything that is invalid.
//
// This function warns to the logger when a value is changed.
func (cfg Config) Validate() (Config, error) {
func (cfg config) validateDataStore() (config, error) {
validCfg := cfg
validCfg.ConnectionString = strings.TrimSpace(validCfg.ConnectionString)
if len(validCfg.ConnectionString) == 0 {
@@ -153,66 +174,68 @@ func (cfg Config) Validate() (Config, error) {
Msg("falling back to default configuration")
}
fn := func(p *string, name string) (err error) {
if *p = strings.TrimSpace(*p); len(*p) == 0 {
err = fmt.Errorf(errRequiredParameterNotSetMsg, name)
}
return
}
if err := fn(&validCfg.Peer.AddQuery, "peer.addQuery"); err != nil {
if err := checkParameter(&validCfg.Data.AddQuery, "data.addQuery"); err != nil {
return cfg, err
}
if err := fn(&validCfg.Peer.DelQuery, "peer.delQuery"); err != nil {
if err := checkParameter(&validCfg.Data.GetQuery, "data.getQuery"); err != nil {
return cfg, err
}
if err := fn(&validCfg.Peer.GraduateQuery, "peer.graduateQuery"); err != nil {
if err := checkParameter(&validCfg.Data.DelQuery, "data.delQuery"); err != nil {
return cfg, err
}
if err := fn(&validCfg.Peer.CountQuery, "peer.countQuery"); err != nil {
return validCfg, nil
}
func (cfg config) validateFull() (config, error) {
validCfg, err := cfg.validateDataStore()
if err != nil {
return cfg, err
}
if err := fn(&validCfg.Peer.CountSeedersColumn, "peer.countSeedersColumn"); err != nil {
if err = checkParameter(&validCfg.Peer.AddQuery, "peer.addQuery"); err != nil {
return cfg, err
}
if err := fn(&validCfg.Peer.CountLeechersColumn, "peer.countLeechersColumn"); err != nil {
if err = checkParameter(&validCfg.Peer.DelQuery, "peer.delQuery"); err != nil {
return cfg, err
}
if err := fn(&validCfg.Peer.ByInfoHashClause, "peer.byInfoHashClause"); err != nil {
if err = checkParameter(&validCfg.Peer.GraduateQuery, "peer.graduateQuery"); err != nil {
return cfg, err
}
if err := fn(&validCfg.Announce.Query, "announce.query"); err != nil {
if err = checkParameter(&validCfg.Peer.CountQuery, "peer.countQuery"); err != nil {
return cfg, err
}
if err := fn(&validCfg.Announce.PeerIDColumn, "announce.peerIDColumn"); err != nil {
if err = checkParameter(&validCfg.Peer.CountSeedersColumn, "peer.countSeedersColumn"); err != nil {
return cfg, err
}
if err := fn(&validCfg.Announce.AddressColumn, "announce.addressColumn"); err != nil {
if err = checkParameter(&validCfg.Peer.CountLeechersColumn, "peer.countLeechersColumn"); err != nil {
return cfg, err
}
if err := fn(&validCfg.Announce.PortColumn, "announce.portColumn"); err != nil {
if err = checkParameter(&validCfg.Peer.ByInfoHashClause, "peer.byInfoHashClause"); err != nil {
return cfg, err
}
if err := fn(&validCfg.Data.AddQuery, "data.addQuery"); err != nil {
if err = checkParameter(&validCfg.Announce.Query, "announce.query"); err != nil {
return cfg, err
}
if err := fn(&validCfg.Data.GetQuery, "data.getQuery"); err != nil {
if err = checkParameter(&validCfg.Announce.PeerIDColumn, "announce.peerIDColumn"); err != nil {
return cfg, err
}
if err := fn(&validCfg.Data.DelQuery, "data.delQuery"); err != nil {
if err = checkParameter(&validCfg.Announce.AddressColumn, "announce.addressColumn"); err != nil {
return cfg, err
}
if err = checkParameter(&validCfg.Announce.PortColumn, "announce.portColumn"); err != nil {
return cfg, err
}
@@ -227,7 +250,7 @@ func (cfg Config) Validate() (Config, error) {
}
type store struct {
Config
config
*pgxpool.Pool
wg sync.WaitGroup
closed chan any
+5 -1
View File
@@ -42,7 +42,7 @@ CREATE TABLE mo_kv (
`
)
var cfg = Config{
var cfg = config{
ConnectionString: "host=127.0.0.1 database=test user=postgres pool_max_conns=50",
PingQuery: "SELECT 1",
Peer: peerQueryConf{
@@ -76,6 +76,10 @@ var cfg = Config{
func createNew() s.PeerStorage {
var ps s.PeerStorage
var err error
cfg, err = cfg.validateFull()
if err != nil {
panic(fmt.Sprint("invalid configuration: ", err))
}
ps, err = newStore(cfg)
if err != nil {
panic(fmt.Sprint("Unable to create PostgreSQL connection: ", err, "\nThis driver needs real PostgreSQL instance"))
+13 -7
View File
@@ -79,21 +79,27 @@ var (
func init() {
// Register the storage builder.
storage.RegisterDriver("redis", builder)
storage.RegisterDriver("redis", builder{})
}
func builder(icfg conf.MapConfig) (storage.PeerStorage, error) {
// Unmarshal the bytes into the proper config type.
var cfg Config
type builder struct{}
if err := icfg.Unmarshal(&cfg); err != nil {
func (builder) NewPeerStorage(icfg conf.MapConfig) (storage.PeerStorage, error) {
var cfg Config
var err error
if err = icfg.Unmarshal(&cfg); err != nil {
return nil, err
}
return newStore(cfg)
return NewStore(cfg)
}
func newStore(cfg Config) (*store, error) {
func (b builder) NewDataStorage(icfg conf.MapConfig) (storage.DataStorage, error) {
return b.NewPeerStorage(icfg)
}
func NewStore(cfg Config) (storage.PeerStorage, error) {
cfg, err := cfg.Validate()
if err != nil {
return nil, err
+1 -1
View File
@@ -19,7 +19,7 @@ var cfg = Config{
func createNew() s.PeerStorage {
var ps s.PeerStorage
var err error
ps, err = newStore(cfg)
ps, err = NewStore(cfg)
if err != nil {
panic(fmt.Sprint("Unable to create Redis connection: ", err, "\nThis driver needs real Redis instance"))
}
+27 -8
View File
@@ -82,9 +82,12 @@ type Entry struct {
Value []byte
}
// Driver is the function used to initialize a new PeerStorage
// Driver is the interface used to initialize a new DataStorage or PeerStorage
// with provided configuration.
type Driver func(conf.MapConfig) (PeerStorage, error)
type Driver interface {
NewDataStorage(cfg conf.MapConfig) (DataStorage, error)
NewPeerStorage(cfg conf.MapConfig) (PeerStorage, error)
}
// ErrResourceDoesNotExist is the error returned by all delete methods and the
// AnnouncePeers method of the PeerStorage interface if the requested resource
@@ -231,15 +234,31 @@ func RegisterDriver(name string, d Driver) {
drivers[name] = d
}
// NewStorage attempts to initialize a new PeerStorage instance from
// NewDataStorage attempts to initialize a new DataStorage instance from
// the list of registered drivers.
func NewStorage(cfg conf.NamedMapConfig) (ps PeerStorage, err error) {
func NewDataStorage(cfg conf.NamedMapConfig) (DataStorage, error) {
driversMU.RLock()
defer driversMU.RUnlock()
logger.Debug().Object("config", cfg).Msg("staring storage")
logger.Debug().Object("config", cfg).Msg("starting peer storage")
var b Driver
b, ok := drivers[cfg.Name]
var d Driver
d, ok := drivers[cfg.Name]
if !ok {
return nil, fmt.Errorf("storage with name '%s' does not exists", cfg.Name)
}
return d.NewPeerStorage(cfg.Config)
}
// NewPeerStorage attempts to initialize a new PeerStorage instance from
// the list of registered drivers.
func NewPeerStorage(cfg conf.NamedMapConfig) (ps PeerStorage, err error) {
driversMU.RLock()
defer driversMU.RUnlock()
logger.Debug().Object("config", cfg).Msg("starting peer storage")
var d Driver
d, ok := drivers[cfg.Name]
if !ok {
return nil, fmt.Errorf("storage with name '%s' does not exists", cfg.Name)
}
@@ -249,7 +268,7 @@ func NewStorage(cfg conf.NamedMapConfig) (ps PeerStorage, err error) {
return
}
if ps, err = b(cfg.Config); err != nil {
if ps, err = d.NewPeerStorage(cfg.Config); err != nil {
return
}