diff --git a/docs/storage/postgres.md b/docs/storage/postgres.md index a5a8216..b3ea57a 100644 --- a/docs/storage/postgres.md +++ b/docs/storage/postgres.md @@ -43,9 +43,12 @@ Implementation expects next data types: * peer creation date and time - `timestamp` * Table for arbitrary data (KV store): * context - string (`varchar`, `character varying`) - * name - string (`varchar`, `character varying`) + * name - byte array (`bytea`)* * value - byte array (`bytea`) +(*) in KV table `name` present as byte array because of possibility +to place hash as _raw_ string, which is not supported by PostgreSQL. + Sample script to create tables: ```sql diff --git a/go.mod b/go.mod index 346106d..08f026f 100644 --- a/go.mod +++ b/go.mod @@ -48,8 +48,8 @@ require ( github.com/prometheus/client_model v0.2.0 // indirect github.com/prometheus/common v0.35.0 // indirect github.com/prometheus/procfs v0.7.3 // indirect - golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e // indirect - golang.org/x/sys v0.0.0-20220615213510-4f61da869c0c // indirect + golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d // indirect + golang.org/x/sys v0.0.0-20220622161953-175b2fd9d664 // indirect golang.org/x/text v0.3.7 // indirect google.golang.org/protobuf v1.28.0 // indirect ) diff --git a/go.sum b/go.sum index 9ad2d3c..269b93a 100644 --- a/go.sum +++ b/go.sum @@ -448,8 +448,8 @@ golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPh golang.org/x/crypto v0.0.0-20201203163018-be400aefbc4c/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I= golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= -golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e h1:T8NU3HyQ8ClP4SEE+KbFlg6n0NhuTsN4MyznaarGsZM= -golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= +golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d h1:sK3txAijHtOK88l68nt020reeT1ZdKLIYetKl95FzVY= +golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8= @@ -581,8 +581,8 @@ golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220412211240-33da011f77ad/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220615213510-4f61da869c0c h1:aFV+BgZ4svzjfabn8ERpuB4JI4N6/rdy1iusx77G3oU= -golang.org/x/sys v0.0.0-20220615213510-4f61da869c0c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220622161953-175b2fd9d664 h1:wEZYwx+kK+KlZ0hpvP2Ls1Xr4+RWnlzGFwPP0aiDjIU= +golang.org/x/sys v0.0.0-20220622161953-175b2fd9d664/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= diff --git a/middleware/torrentapproval/container/directory/directory.go b/middleware/torrentapproval/container/directory/directory.go index 7b2e13c..a4867ad 100644 --- a/middleware/torrentapproval/container/directory/directory.go +++ b/middleware/torrentapproval/container/directory/directory.go @@ -87,16 +87,17 @@ func build(conf conf.MapConfig, st storage.DataStorage) (container.Container, er if len(name) == 0 { name = list.DUMMY } + bName := []byte(name) logger.Err(d.Storage.Put(d.StorageCtx, storage.Entry{ Key: event.InfoHash.AsString(), - Value: name, + Value: bName, }, storage.Entry{ Key: v2hash.RawString(), - Value: name, + Value: bName, }, storage.Entry{ Key: v2hash.TruncateV1().RawString(), - Value: name, + Value: bName, })). Str("action", "add"). Str("file", event.TorrentFilePath). diff --git a/middleware/torrentapproval/container/list/list.go b/middleware/torrentapproval/container/list/list.go index eff0413..1ffb4ea 100644 --- a/middleware/torrentapproval/container/list/list.go +++ b/middleware/torrentapproval/container/list/list.go @@ -62,9 +62,9 @@ func build(conf conf.MapConfig, st storage.DataStorage) (container.Container, er if err != nil { return nil, fmt.Errorf("whitelist : %s : %w", hashString, err) } - init = append(init, storage.Entry{Key: ih.RawString(), Value: DUMMY}) + init = append(init, storage.Entry{Key: ih.RawString(), Value: []byte(DUMMY)}) if len(ih) == bittorrent.InfoHashV2Len { - init = append(init, storage.Entry{Key: ih.TruncateV1().RawString(), Value: DUMMY}) + init = append(init, storage.Entry{Key: ih.TruncateV1().RawString(), Value: []byte(DUMMY)}) } } if err := l.Storage.Put(l.StorageCtx, init...); err != nil { diff --git a/storage/keydb/storage.go b/storage/keydb/storage.go index 7487850..3d241d3 100644 --- a/storage/keydb/storage.go +++ b/storage/keydb/storage.go @@ -60,6 +60,15 @@ func newStore(cfg r.Config) (*store, error) { return nil, err } + if cfg.PeerLifetime <= 0 { + logger.Warn(). + Str("name", "peerLifetime"). + Dur("provided", cfg.PeerLifetime). + Dur("default", storage.DefaultPeerLifetime). + Msg("falling back to default configuration") + cfg.PeerLifetime = storage.DefaultPeerLifetime + } + var rs r.Connection if rs, err = cfg.Connect(); err != nil { diff --git a/storage/memory/storage.go b/storage/memory/storage.go index 381e186..16f5fad 100644 --- a/storage/memory/storage.go +++ b/storage/memory/storage.go @@ -4,9 +4,7 @@ package memory import ( "encoding/binary" - "fmt" "math" - "reflect" "runtime" "sync" "time" @@ -445,23 +443,12 @@ type dataStore struct { sync.Map } -func asKey(in any) any { - if in == nil { - panic("unable to use nil map key") - } - if reflect.TypeOf(in).Comparable() { - return in - } - // FIXME: dirty hack - return fmt.Sprint(in) -} - func (ds *dataStore) Put(ctx string, values ...storage.Entry) error { if len(values) > 0 { c, _ := ds.LoadOrStore(ctx, new(sync.Map)) m := c.(*sync.Map) for _, p := range values { - m.Store(asKey(p.Key), p.Value) + m.Store(p.Key, p.Value) } } return nil @@ -470,17 +457,18 @@ func (ds *dataStore) Put(ctx string, values ...storage.Entry) error { func (ds *dataStore) Contains(ctx string, key string) (bool, error) { var exist bool if m, found := ds.Map.Load(ctx); found { - _, exist = m.(*sync.Map).Load(asKey(key)) + _, exist = m.(*sync.Map).Load(key) } return exist, nil } -func (ds *dataStore) Load(ctx string, key string) (any, error) { - var v any +func (ds *dataStore) Load(ctx string, key string) (out []byte, _ error) { if m, found := ds.Map.Load(ctx); found { - v, _ = m.(*sync.Map).Load(asKey(key)) + if v, _ := m.(*sync.Map).Load(key); v != nil { + out = v.([]byte) + } } - return v, nil + return } func (ds *dataStore) Delete(ctx string, keys ...string) error { @@ -488,7 +476,7 @@ func (ds *dataStore) Delete(ctx string, keys ...string) error { if m, found := ds.Map.Load(ctx); found { m := m.(*sync.Map) for _, k := range keys { - m.Delete(asKey(k)) + m.Delete(k) } } } diff --git a/storage/pg/storage.go b/storage/pg/storage.go index 717fa2c..e1a889a 100644 --- a/storage/pg/storage.go +++ b/storage/pg/storage.go @@ -236,12 +236,7 @@ func (s *store) Put(ctx string, values ...storage.Entry) (err error) { var tx pgx.Tx if tx, err = s.Begin(context.TODO()); err == nil { for _, v := range values { - val := v.Value - switch tOut := val.(type) { - case string: - val = []byte(tOut) - } - if _, err = tx.Exec(context.TODO(), s.Data.AddQuery, ctx, []byte(v.Key), val); err != nil { + if _, err = tx.Exec(context.TODO(), s.Data.AddQuery, ctx, []byte(v.Key), v.Value); err != nil { break } } @@ -265,20 +260,10 @@ func (s *store) Contains(ctx string, key string) (contains bool, err error) { return } -func (s *store) Load(ctx string, key string) (out any, err error) { - var rows pgx.Rows - if rows, err = s.Query(context.TODO(), s.Data.GetQuery, ctx, []byte(key)); err == nil { - defer rows.Close() - if rows.Next() { - var values []any - if values, err = rows.Values(); err == nil && len(values) > 0 { - out = values[0] - switch tOut := out.(type) { - case []byte: - out = string(tOut) - } - } - } +func (s *store) Load(ctx string, key string) (out []byte, err error) { + row := s.QueryRow(context.TODO(), s.Data.GetQuery, ctx, []byte(key)) + if err = row.Scan(&out); errors.Is(err, pgx.ErrNoRows) { + err = nil } return } diff --git a/storage/redis/storage.go b/storage/redis/storage.go index a77d9d4..1cd231a 100644 --- a/storage/redis/storage.go +++ b/storage/redis/storage.go @@ -158,7 +158,7 @@ func (cfg Config) Validate() (Config, error) { if len(cfg.Addresses) == 0 { validCfg.Addresses = []string{defaultRedisAddress} logger.Warn(). - Str("name", "Addresses"). + Str("name", "addresses"). Strs("provided", cfg.Addresses). Strs("default", validCfg.Addresses). Msg("falling back to default configuration") @@ -167,7 +167,7 @@ func (cfg Config) Validate() (Config, error) { if cfg.ReadTimeout <= 0 { validCfg.ReadTimeout = defaultReadTimeout logger.Warn(). - Str("name", "ReadTimeout"). + Str("name", "readTimeout"). Dur("provided", cfg.ReadTimeout). Dur("default", validCfg.ReadTimeout). Msg("falling back to default configuration") @@ -176,7 +176,7 @@ func (cfg Config) Validate() (Config, error) { if cfg.WriteTimeout <= 0 { validCfg.WriteTimeout = defaultWriteTimeout logger.Warn(). - Str("name", "WriteTimeout"). + Str("name", "writeTimeout"). Dur("provided", cfg.WriteTimeout). Dur("default", validCfg.WriteTimeout). Msg("falling back to default configuration") @@ -185,7 +185,7 @@ func (cfg Config) Validate() (Config, error) { if cfg.ConnectTimeout <= 0 { validCfg.ConnectTimeout = defaultConnectTimeout logger.Warn(). - Str("name", "ConnectTimeout"). + Str("name", "connectTimeout"). Dur("provided", cfg.ConnectTimeout). Dur("default", validCfg.ConnectTimeout). Msg("falling back to default configuration") @@ -594,8 +594,8 @@ func (ps *Connection) Contains(ctx string, key string) (bool, error) { } // Load - storage.DataStorage implementation -func (ps *Connection) Load(ctx string, key string) (v any, err error) { - v, err = ps.HGet(context.TODO(), PrefixKey+ctx, key).Result() +func (ps *Connection) Load(ctx string, key string) (v []byte, err error) { + v, err = ps.HGet(context.TODO(), PrefixKey+ctx, key).Bytes() if err != nil && errors.Is(err, redis.Nil) { v, err = nil, nil } diff --git a/storage/storage.go b/storage/storage.go index 65eba21..b8f46df 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -16,9 +16,12 @@ import ( ) const ( - defaultPrometheusReportingInterval = time.Second * 1 - defaultGarbageCollectionInterval = time.Minute * 3 - defaultPeerLifetime = time.Minute * 30 + // DefaultPrometheusReportingInterval default interval of statistics collection + DefaultPrometheusReportingInterval = time.Second * 1 + // DefaultGarbageCollectionInterval default interval of stale peers deletions + DefaultGarbageCollectionInterval = time.Minute * 3 + // DefaultPeerLifetime default peer lifetime + DefaultPeerLifetime = time.Minute * 30 ) var ( @@ -40,21 +43,21 @@ type Config struct { func (c Config) sanitizeGCConfig() (gcInterval, peerTTL time.Duration) { if c.GarbageCollectionInterval <= 0 { - gcInterval = defaultGarbageCollectionInterval + gcInterval = DefaultGarbageCollectionInterval logger.Warn(). - Str("name", "GarbageCollectionInterval"). + Str("name", "garbageCollectionInterval"). Dur("provided", c.GarbageCollectionInterval). - Dur("default", defaultGarbageCollectionInterval). + Dur("default", DefaultGarbageCollectionInterval). Msg("falling back to default configuration") } else { gcInterval = c.GarbageCollectionInterval } if c.PeerLifetime <= 0 { - peerTTL = defaultPeerLifetime + peerTTL = DefaultPeerLifetime logger.Warn(). - Str("name", "PeerLifetime"). + Str("name", "peerLifetime"). Dur("provided", c.PeerLifetime). - Dur("default", defaultPeerLifetime). + Dur("default", DefaultPeerLifetime). Msg("falling back to default configuration") } else { peerTTL = c.PeerLifetime @@ -64,11 +67,11 @@ func (c Config) sanitizeGCConfig() (gcInterval, peerTTL time.Duration) { func (c Config) sanitizeStatisticsConfig() (statInterval time.Duration) { if c.PrometheusReportingInterval < 0 { - statInterval = defaultPrometheusReportingInterval + statInterval = DefaultPrometheusReportingInterval logger.Warn(). - Str("name", "PrometheusReportingInterval"). + Str("name", "prometheusReportingInterval"). Dur("provided", c.PrometheusReportingInterval). - Dur("default", defaultPrometheusReportingInterval). + Dur("default", DefaultPrometheusReportingInterval). Msg("falling back to default configuration") } return @@ -77,7 +80,7 @@ func (c Config) sanitizeStatisticsConfig() (statInterval time.Duration) { // Entry - some key-value pair, used for BulkPut type Entry struct { Key string - Value any + Value []byte } // Builder is the function used to initialize a new type of PeerStorage. @@ -105,7 +108,7 @@ type DataStorage interface { Contains(ctx string, key string) (bool, error) // Load used to get arbitrary data in specified context by its key - Load(ctx string, key string) (any, error) + Load(ctx string, key string) ([]byte, error) // Delete used to delete arbitrary data in specified context by its keys Delete(ctx string, keys ...string) error diff --git a/storage/test/storage_test_base.go b/storage/test/storage_test_base.go index 8c82890..3e746c0 100644 --- a/storage/test/storage_test_base.go +++ b/storage/test/storage_test_base.go @@ -200,7 +200,7 @@ func (th *testHolder) LeecherPutGraduateAnnounceDeleteAnnounce(t *testing.T) { func (th *testHolder) CustomPutContainsLoadDelete(t *testing.T) { for _, c := range testData { - err := th.st.Put(kvStoreCtx, storage.Entry{Key: c.peer.String(), Value: c.ih.RawString()}) + err := th.st.Put(kvStoreCtx, storage.Entry{Key: c.peer.String(), Value: []byte(c.ih.RawString())}) require.Nil(t, err) // check if exist in ctx we put @@ -242,7 +242,7 @@ func (th *testHolder) CustomBulkPutContainsLoadDelete(t *testing.T) { keys = append(keys, key) pairs = append(pairs, storage.Entry{ Key: key, - Value: c.ih.RawString(), + Value: []byte(c.ih.RawString()), }) } err := th.st.Put(kvStoreCtx, pairs...) @@ -260,7 +260,7 @@ func (th *testHolder) CustomBulkPutContainsLoadDelete(t *testing.T) { out, _ := th.st.Load(kvStoreCtx, p.Key) ih, err := bittorrent.NewInfoHash(out) require.Nil(t, err) - require.Equal(t, p.Value, ih.RawString()) + require.Equal(t, p.Value, []byte(ih.RawString())) } err = th.st.Delete(kvStoreCtx, keys...)