(tested) change DataStorage interface to accept byte array as value

This commit is contained in:
Lawrence, Rendall
2022-06-23 21:01:05 +03:00
parent 4468794de1
commit 0fd0e06360
11 changed files with 64 additions and 75 deletions
+4 -1
View File
@@ -43,9 +43,12 @@ Implementation expects next data types:
* peer creation date and time - `timestamp`
* Table for arbitrary data (KV store):
* context - string (`varchar`, `character varying`)
* name - string (`varchar`, `character varying`)
* name - byte array (`bytea`)*
* value - byte array (`bytea`)
(*) in KV table `name` present as byte array because of possibility
to place hash as _raw_ string, which is not supported by PostgreSQL.
Sample script to create tables:
```sql
+2 -2
View File
@@ -48,8 +48,8 @@ require (
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.35.0 // indirect
github.com/prometheus/procfs v0.7.3 // indirect
golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e // indirect
golang.org/x/sys v0.0.0-20220615213510-4f61da869c0c // indirect
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d // indirect
golang.org/x/sys v0.0.0-20220622161953-175b2fd9d664 // indirect
golang.org/x/text v0.3.7 // indirect
google.golang.org/protobuf v1.28.0 // indirect
)
+4 -4
View File
@@ -448,8 +448,8 @@ golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPh
golang.org/x/crypto v0.0.0-20201203163018-be400aefbc4c/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I=
golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e h1:T8NU3HyQ8ClP4SEE+KbFlg6n0NhuTsN4MyznaarGsZM=
golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d h1:sK3txAijHtOK88l68nt020reeT1ZdKLIYetKl95FzVY=
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8=
@@ -581,8 +581,8 @@ golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220412211240-33da011f77ad/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220615213510-4f61da869c0c h1:aFV+BgZ4svzjfabn8ERpuB4JI4N6/rdy1iusx77G3oU=
golang.org/x/sys v0.0.0-20220615213510-4f61da869c0c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220622161953-175b2fd9d664 h1:wEZYwx+kK+KlZ0hpvP2Ls1Xr4+RWnlzGFwPP0aiDjIU=
golang.org/x/sys v0.0.0-20220622161953-175b2fd9d664/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
@@ -87,16 +87,17 @@ func build(conf conf.MapConfig, st storage.DataStorage) (container.Container, er
if len(name) == 0 {
name = list.DUMMY
}
bName := []byte(name)
logger.Err(d.Storage.Put(d.StorageCtx,
storage.Entry{
Key: event.InfoHash.AsString(),
Value: name,
Value: bName,
}, storage.Entry{
Key: v2hash.RawString(),
Value: name,
Value: bName,
}, storage.Entry{
Key: v2hash.TruncateV1().RawString(),
Value: name,
Value: bName,
})).
Str("action", "add").
Str("file", event.TorrentFilePath).
@@ -62,9 +62,9 @@ func build(conf conf.MapConfig, st storage.DataStorage) (container.Container, er
if err != nil {
return nil, fmt.Errorf("whitelist : %s : %w", hashString, err)
}
init = append(init, storage.Entry{Key: ih.RawString(), Value: DUMMY})
init = append(init, storage.Entry{Key: ih.RawString(), Value: []byte(DUMMY)})
if len(ih) == bittorrent.InfoHashV2Len {
init = append(init, storage.Entry{Key: ih.TruncateV1().RawString(), Value: DUMMY})
init = append(init, storage.Entry{Key: ih.TruncateV1().RawString(), Value: []byte(DUMMY)})
}
}
if err := l.Storage.Put(l.StorageCtx, init...); err != nil {
+9
View File
@@ -60,6 +60,15 @@ func newStore(cfg r.Config) (*store, error) {
return nil, err
}
if cfg.PeerLifetime <= 0 {
logger.Warn().
Str("name", "peerLifetime").
Dur("provided", cfg.PeerLifetime).
Dur("default", storage.DefaultPeerLifetime).
Msg("falling back to default configuration")
cfg.PeerLifetime = storage.DefaultPeerLifetime
}
var rs r.Connection
if rs, err = cfg.Connect(); err != nil {
+8 -20
View File
@@ -4,9 +4,7 @@ package memory
import (
"encoding/binary"
"fmt"
"math"
"reflect"
"runtime"
"sync"
"time"
@@ -445,23 +443,12 @@ type dataStore struct {
sync.Map
}
func asKey(in any) any {
if in == nil {
panic("unable to use nil map key")
}
if reflect.TypeOf(in).Comparable() {
return in
}
// FIXME: dirty hack
return fmt.Sprint(in)
}
func (ds *dataStore) Put(ctx string, values ...storage.Entry) error {
if len(values) > 0 {
c, _ := ds.LoadOrStore(ctx, new(sync.Map))
m := c.(*sync.Map)
for _, p := range values {
m.Store(asKey(p.Key), p.Value)
m.Store(p.Key, p.Value)
}
}
return nil
@@ -470,17 +457,18 @@ func (ds *dataStore) Put(ctx string, values ...storage.Entry) error {
func (ds *dataStore) Contains(ctx string, key string) (bool, error) {
var exist bool
if m, found := ds.Map.Load(ctx); found {
_, exist = m.(*sync.Map).Load(asKey(key))
_, exist = m.(*sync.Map).Load(key)
}
return exist, nil
}
func (ds *dataStore) Load(ctx string, key string) (any, error) {
var v any
func (ds *dataStore) Load(ctx string, key string) (out []byte, _ error) {
if m, found := ds.Map.Load(ctx); found {
v, _ = m.(*sync.Map).Load(asKey(key))
if v, _ := m.(*sync.Map).Load(key); v != nil {
out = v.([]byte)
}
}
return v, nil
return
}
func (ds *dataStore) Delete(ctx string, keys ...string) error {
@@ -488,7 +476,7 @@ func (ds *dataStore) Delete(ctx string, keys ...string) error {
if m, found := ds.Map.Load(ctx); found {
m := m.(*sync.Map)
for _, k := range keys {
m.Delete(asKey(k))
m.Delete(k)
}
}
}
+5 -20
View File
@@ -236,12 +236,7 @@ func (s *store) Put(ctx string, values ...storage.Entry) (err error) {
var tx pgx.Tx
if tx, err = s.Begin(context.TODO()); err == nil {
for _, v := range values {
val := v.Value
switch tOut := val.(type) {
case string:
val = []byte(tOut)
}
if _, err = tx.Exec(context.TODO(), s.Data.AddQuery, ctx, []byte(v.Key), val); err != nil {
if _, err = tx.Exec(context.TODO(), s.Data.AddQuery, ctx, []byte(v.Key), v.Value); err != nil {
break
}
}
@@ -265,20 +260,10 @@ func (s *store) Contains(ctx string, key string) (contains bool, err error) {
return
}
func (s *store) Load(ctx string, key string) (out any, err error) {
var rows pgx.Rows
if rows, err = s.Query(context.TODO(), s.Data.GetQuery, ctx, []byte(key)); err == nil {
defer rows.Close()
if rows.Next() {
var values []any
if values, err = rows.Values(); err == nil && len(values) > 0 {
out = values[0]
switch tOut := out.(type) {
case []byte:
out = string(tOut)
}
}
}
func (s *store) Load(ctx string, key string) (out []byte, err error) {
row := s.QueryRow(context.TODO(), s.Data.GetQuery, ctx, []byte(key))
if err = row.Scan(&out); errors.Is(err, pgx.ErrNoRows) {
err = nil
}
return
}
+6 -6
View File
@@ -158,7 +158,7 @@ func (cfg Config) Validate() (Config, error) {
if len(cfg.Addresses) == 0 {
validCfg.Addresses = []string{defaultRedisAddress}
logger.Warn().
Str("name", "Addresses").
Str("name", "addresses").
Strs("provided", cfg.Addresses).
Strs("default", validCfg.Addresses).
Msg("falling back to default configuration")
@@ -167,7 +167,7 @@ func (cfg Config) Validate() (Config, error) {
if cfg.ReadTimeout <= 0 {
validCfg.ReadTimeout = defaultReadTimeout
logger.Warn().
Str("name", "ReadTimeout").
Str("name", "readTimeout").
Dur("provided", cfg.ReadTimeout).
Dur("default", validCfg.ReadTimeout).
Msg("falling back to default configuration")
@@ -176,7 +176,7 @@ func (cfg Config) Validate() (Config, error) {
if cfg.WriteTimeout <= 0 {
validCfg.WriteTimeout = defaultWriteTimeout
logger.Warn().
Str("name", "WriteTimeout").
Str("name", "writeTimeout").
Dur("provided", cfg.WriteTimeout).
Dur("default", validCfg.WriteTimeout).
Msg("falling back to default configuration")
@@ -185,7 +185,7 @@ func (cfg Config) Validate() (Config, error) {
if cfg.ConnectTimeout <= 0 {
validCfg.ConnectTimeout = defaultConnectTimeout
logger.Warn().
Str("name", "ConnectTimeout").
Str("name", "connectTimeout").
Dur("provided", cfg.ConnectTimeout).
Dur("default", validCfg.ConnectTimeout).
Msg("falling back to default configuration")
@@ -594,8 +594,8 @@ func (ps *Connection) Contains(ctx string, key string) (bool, error) {
}
// Load - storage.DataStorage implementation
func (ps *Connection) Load(ctx string, key string) (v any, err error) {
v, err = ps.HGet(context.TODO(), PrefixKey+ctx, key).Result()
func (ps *Connection) Load(ctx string, key string) (v []byte, err error) {
v, err = ps.HGet(context.TODO(), PrefixKey+ctx, key).Bytes()
if err != nil && errors.Is(err, redis.Nil) {
v, err = nil, nil
}
+17 -14
View File
@@ -16,9 +16,12 @@ import (
)
const (
defaultPrometheusReportingInterval = time.Second * 1
defaultGarbageCollectionInterval = time.Minute * 3
defaultPeerLifetime = time.Minute * 30
// DefaultPrometheusReportingInterval default interval of statistics collection
DefaultPrometheusReportingInterval = time.Second * 1
// DefaultGarbageCollectionInterval default interval of stale peers deletions
DefaultGarbageCollectionInterval = time.Minute * 3
// DefaultPeerLifetime default peer lifetime
DefaultPeerLifetime = time.Minute * 30
)
var (
@@ -40,21 +43,21 @@ type Config struct {
func (c Config) sanitizeGCConfig() (gcInterval, peerTTL time.Duration) {
if c.GarbageCollectionInterval <= 0 {
gcInterval = defaultGarbageCollectionInterval
gcInterval = DefaultGarbageCollectionInterval
logger.Warn().
Str("name", "GarbageCollectionInterval").
Str("name", "garbageCollectionInterval").
Dur("provided", c.GarbageCollectionInterval).
Dur("default", defaultGarbageCollectionInterval).
Dur("default", DefaultGarbageCollectionInterval).
Msg("falling back to default configuration")
} else {
gcInterval = c.GarbageCollectionInterval
}
if c.PeerLifetime <= 0 {
peerTTL = defaultPeerLifetime
peerTTL = DefaultPeerLifetime
logger.Warn().
Str("name", "PeerLifetime").
Str("name", "peerLifetime").
Dur("provided", c.PeerLifetime).
Dur("default", defaultPeerLifetime).
Dur("default", DefaultPeerLifetime).
Msg("falling back to default configuration")
} else {
peerTTL = c.PeerLifetime
@@ -64,11 +67,11 @@ func (c Config) sanitizeGCConfig() (gcInterval, peerTTL time.Duration) {
func (c Config) sanitizeStatisticsConfig() (statInterval time.Duration) {
if c.PrometheusReportingInterval < 0 {
statInterval = defaultPrometheusReportingInterval
statInterval = DefaultPrometheusReportingInterval
logger.Warn().
Str("name", "PrometheusReportingInterval").
Str("name", "prometheusReportingInterval").
Dur("provided", c.PrometheusReportingInterval).
Dur("default", defaultPrometheusReportingInterval).
Dur("default", DefaultPrometheusReportingInterval).
Msg("falling back to default configuration")
}
return
@@ -77,7 +80,7 @@ func (c Config) sanitizeStatisticsConfig() (statInterval time.Duration) {
// Entry - some key-value pair, used for BulkPut
type Entry struct {
Key string
Value any
Value []byte
}
// Builder is the function used to initialize a new type of PeerStorage.
@@ -105,7 +108,7 @@ type DataStorage interface {
Contains(ctx string, key string) (bool, error)
// Load used to get arbitrary data in specified context by its key
Load(ctx string, key string) (any, error)
Load(ctx string, key string) ([]byte, error)
// Delete used to delete arbitrary data in specified context by its keys
Delete(ctx string, keys ...string) error
+3 -3
View File
@@ -200,7 +200,7 @@ func (th *testHolder) LeecherPutGraduateAnnounceDeleteAnnounce(t *testing.T) {
func (th *testHolder) CustomPutContainsLoadDelete(t *testing.T) {
for _, c := range testData {
err := th.st.Put(kvStoreCtx, storage.Entry{Key: c.peer.String(), Value: c.ih.RawString()})
err := th.st.Put(kvStoreCtx, storage.Entry{Key: c.peer.String(), Value: []byte(c.ih.RawString())})
require.Nil(t, err)
// check if exist in ctx we put
@@ -242,7 +242,7 @@ func (th *testHolder) CustomBulkPutContainsLoadDelete(t *testing.T) {
keys = append(keys, key)
pairs = append(pairs, storage.Entry{
Key: key,
Value: c.ih.RawString(),
Value: []byte(c.ih.RawString()),
})
}
err := th.st.Put(kvStoreCtx, pairs...)
@@ -260,7 +260,7 @@ func (th *testHolder) CustomBulkPutContainsLoadDelete(t *testing.T) {
out, _ := th.st.Load(kvStoreCtx, p.Key)
ih, err := bittorrent.NewInfoHash(out)
require.Nil(t, err)
require.Equal(t, p.Value, ih.RawString())
require.Equal(t, p.Value, []byte(ih.RawString()))
}
err = th.st.Delete(kvStoreCtx, keys...)