(tested) rollback to separate v4 and v6 keys in redis/keydb

* sanitize log fields
* remove miniredis dependency
* store/collect information about hybrid (v2to1) hashes
This commit is contained in:
Lawrence, Rendall
2022-04-24 20:28:41 +03:00
parent 22f459315b
commit ef03291efe
15 changed files with 324 additions and 281 deletions
+40 -35
View File
@@ -35,10 +35,10 @@ var ErrNotKeyDB = errors.New("provided instance seems not KeyDB")
func init() {
// Register the storage driver.
storage.RegisterBuilder(Name, Builder)
storage.RegisterBuilder(Name, builder)
}
func Builder(icfg conf.MapConfig) (storage.PeerStorage, error) {
func builder(icfg conf.MapConfig) (storage.PeerStorage, error) {
var cfg r.Config
var err error
@@ -46,10 +46,10 @@ func Builder(icfg conf.MapConfig) (storage.PeerStorage, error) {
return nil, err
}
return New(cfg)
return newStore(cfg)
}
func New(cfg r.Config) (*store, error) {
func newStore(cfg r.Config) (*store, error) {
var err error
if cfg, err = cfg.Validate(); err != nil {
return nil, err
@@ -70,7 +70,11 @@ func New(cfg r.Config) (*store, error) {
var st *store
if err == nil {
st = &store{rs, cfg.LogFields(), uint(cfg.PeerLifetime.Seconds())}
st = &store{
Connection: rs,
logFields: cfg.LogFields(),
peerTTL: uint(cfg.PeerLifetime.Seconds()),
}
}
return st, err
@@ -82,27 +86,27 @@ type store struct {
peerTTL uint
}
func (s store) setPeerTTL(infoHashKey, peerId string) error {
return s.Process(context.TODO(), redis.NewCmd(context.TODO(), expireMemberCmd, infoHashKey, peerId, s.peerTTL))
func (s store) setPeerTTL(infoHashKey, peerID string) error {
return s.Process(context.TODO(), redis.NewCmd(context.TODO(), expireMemberCmd, infoHashKey, peerID, s.peerTTL))
}
func (s store) addPeer(infoHashKey, peerId string) (err error) {
func (s store) addPeer(infoHashKey, peerID string) (err error) {
log.Debug("storage: KeyDB: PutPeer", log.Fields{
"InfoHashKey": infoHashKey,
"PeerId": peerId,
"infoHashKey": infoHashKey,
"peerID": peerID,
})
if err = s.SAdd(context.TODO(), infoHashKey, peerId).Err(); err == nil {
err = s.setPeerTTL(infoHashKey, peerId)
if err = s.SAdd(context.TODO(), infoHashKey, peerID).Err(); err == nil {
err = s.setPeerTTL(infoHashKey, peerID)
}
return
}
func (s store) delPeer(infoHashKey, peerId string) error {
func (s store) delPeer(infoHashKey, peerID string) error {
log.Debug("storage: KeyDB: DeletePeer", log.Fields{
"InfoHashKey": infoHashKey,
"PeerId": peerId,
"infoHashKey": infoHashKey,
"peerID": peerID,
})
deleted, err := s.SRem(context.TODO(), infoHashKey, peerId).Uint64()
deleted, err := s.SRem(context.TODO(), infoHashKey, peerID).Uint64()
err = r.AsNil(err)
if err == nil && deleted == 0 {
err = storage.ErrResourceDoesNotExist
@@ -112,34 +116,35 @@ func (s store) delPeer(infoHashKey, peerId string) error {
}
func (s store) PutSeeder(ih bittorrent.InfoHash, peer bittorrent.Peer) error {
return s.addPeer(r.IHSeederKey+ih.RawString(), peer.RawString())
return s.addPeer(r.InfoHashKey(ih.RawString(), true, peer.Addr().Is6()), peer.RawString())
}
func (s store) DeleteSeeder(ih bittorrent.InfoHash, peer bittorrent.Peer) error {
return s.delPeer(r.IHSeederKey+ih.RawString(), peer.RawString())
return s.delPeer(r.InfoHashKey(ih.RawString(), true, peer.Addr().Is6()), peer.RawString())
}
func (s store) PutLeecher(ih bittorrent.InfoHash, peer bittorrent.Peer) error {
return s.addPeer(r.IHLeecherKey+ih.RawString(), peer.RawString())
return s.addPeer(r.InfoHashKey(ih.RawString(), false, peer.Addr().Is6()), peer.RawString())
}
func (s store) DeleteLeecher(ih bittorrent.InfoHash, peer bittorrent.Peer) error {
return s.delPeer(r.IHLeecherKey+ih.RawString(), peer.RawString())
return s.delPeer(r.InfoHashKey(ih.RawString(), false, peer.Addr().Is6()), peer.RawString())
}
func (s store) GraduateLeecher(ih bittorrent.InfoHash, peer bittorrent.Peer) (err error) {
log.Debug("storage: KeyDB: GraduateLeecher", log.Fields{
"InfoHash": ih,
"Peer": peer,
"infoHash": ih,
"peer": peer,
})
infoHash, peerId := ih.RawString(), peer.RawString()
ihSeederKey, ihLeecherKey := r.IHSeederKey+infoHash, r.IHLeecherKey+infoHash
infoHash, peerID := ih.RawString(), peer.RawString()
ihSeederKey := r.InfoHashKey(infoHash, true, peer.Addr().Is6())
ihLeecherKey := r.InfoHashKey(infoHash, false, peer.Addr().Is6())
var moved bool
if moved, err = s.SMove(context.TODO(), ihLeecherKey, ihSeederKey, peerId).Result(); err == nil {
if moved, err = s.SMove(context.TODO(), ihLeecherKey, ihSeederKey, peerID).Result(); err == nil {
if moved {
err = s.setPeerTTL(ihSeederKey, peerId)
err = s.setPeerTTL(ihSeederKey, peerID)
} else {
err = s.addPeer(ihSeederKey, peerId)
err = s.addPeer(ihSeederKey, peerID)
}
}
return err
@@ -148,22 +153,22 @@ func (s store) GraduateLeecher(ih bittorrent.InfoHash, peer bittorrent.Peer) (er
// AnnouncePeers is the same function as redis.AnnouncePeers
func (s store) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant int, peer bittorrent.Peer) ([]bittorrent.Peer, error) {
log.Debug("storage: KeyDB: AnnouncePeers", log.Fields{
"InfoHash": ih,
"Seeder": seeder,
"NumWant": numWant,
"Peer": peer,
"infoHash": ih,
"seeder": seeder,
"numWant": numWant,
"peer": peer,
})
return s.GetPeers(ih, seeder, numWant, peer, func(ctx context.Context, infoHashKey string) *redis.StringSliceCmd {
return s.SRandMemberN(context.TODO(), infoHashKey, int64(numWant))
return s.GetPeers(ih, seeder, numWant, peer, func(ctx context.Context, infoHashKey string, maxCount int) *redis.StringSliceCmd {
return s.SRandMemberN(context.TODO(), infoHashKey, int64(maxCount))
})
}
// ScrapeSwarm is the same function as redis.ScrapeSwarm except `SCard` call instead of `HLen`
func (s store) ScrapeSwarm(ih bittorrent.InfoHash, peer bittorrent.Peer) (leechers uint32, seeders uint32, snatched uint32) {
log.Debug("storage: KeyDB ScrapeSwarm", log.Fields{
"InfoHash": ih,
"Peer": peer,
"infoHash": ih,
"peer": peer,
})
leechers, seeders = s.CountPeers(ih, s.SCard)
return
+1 -1
View File
@@ -21,7 +21,7 @@ var cfg = r.Config{
func createNew() s.PeerStorage {
var ps s.PeerStorage
var err error
ps, err = New(cfg)
ps, err = newStore(cfg)
if err != nil {
panic(fmt.Sprint("Unable to create KeyDB connection: ", err, "\nThis driver needs real KeyDB instance"))
}
+7 -7
View File
@@ -30,10 +30,10 @@ const (
func init() {
// Register the storage driver.
storage.RegisterBuilder(Name, Builder)
storage.RegisterBuilder(Name, builder)
}
func Builder(icfg conf.MapConfig) (storage.PeerStorage, error) {
func builder(icfg conf.MapConfig) (storage.PeerStorage, error) {
var cfg Config
if err := icfg.Unmarshal(&cfg); err != nil {
return nil, err
@@ -49,8 +49,8 @@ type Config struct {
// LogFields renders the current config as a set of Logrus fields.
func (cfg Config) LogFields() log.Fields {
return log.Fields{
"Name": Name,
"ShardCount": cfg.ShardCount,
"name": Name,
"shardCount": cfg.ShardCount,
}
}
@@ -64,9 +64,9 @@ func (cfg Config) Validate() Config {
if cfg.ShardCount <= 0 || cfg.ShardCount > (math.MaxInt/2) {
validcfg.ShardCount = defaultShardCount
log.Warn("falling back to default configuration", log.Fields{
"Name": Name + ".ShardCount",
"Provided": cfg.ShardCount,
"Default": validcfg.ShardCount,
"name": Name + ".ShardCount",
"provided": cfg.ShardCount,
"default": validcfg.ShardCount,
})
}
+185 -134
View File
@@ -2,7 +2,7 @@
// BitTorrent tracker keeping peer data in redis with hash.
// There two categories of hash:
//
// - CHI_{L,S}_<HASH> (hash type)
// - CHI_{L,S}{4,6}_<HASH> (hash type)
// To save peers that hold the infohash, used for fast searching,
// deleting, and timeout handling
//
@@ -50,10 +50,14 @@ const (
PrefixKey = "CHI_"
// IHKey redis hash key for all info hashes
IHKey = "CHI_I"
// IHSeederKey redis hash key prefix for seeders
IHSeederKey = "CHI_S_"
// IHLeecherKey redis hash key prefix for leechers
IHLeecherKey = "CHI_L_"
// IH4SeederKey redis hash key prefix for IPv4 seeders
IH4SeederKey = "CHI_S4_"
// IH6SeederKey redis hash key prefix for IPv6 seeders
IH6SeederKey = "CHI_S6_"
// IH4LeecherKey redis hash key prefix for IPv4 leechers
IH4LeecherKey = "CHI_L4_"
// IH6LeecherKey redis hash key prefix for IPv6 leechers
IH6LeecherKey = "CHI_L6_"
// CountSeederKey redis key for seeder count
CountSeederKey = "CHI_C_S"
// CountLeecherKey redis key for leecher count
@@ -65,10 +69,10 @@ var ErrSentinelAndClusterChecked = errors.New("unable to use both cluster and se
func init() {
// Register the storage builder.
storage.RegisterBuilder(Name, Builder)
storage.RegisterBuilder(Name, builder)
}
func Builder(icfg conf.MapConfig) (storage.PeerStorage, error) {
func builder(icfg conf.MapConfig) (storage.PeerStorage, error) {
// Unmarshal the bytes into the proper config type.
var cfg Config
@@ -76,10 +80,10 @@ func Builder(icfg conf.MapConfig) (storage.PeerStorage, error) {
return nil, err
}
return New(cfg)
return newStore(cfg)
}
func New(cfg Config) (*store, error) {
func newStore(cfg Config) (*store, error) {
cfg, err := cfg.Validate()
if err != nil {
return nil, err
@@ -117,17 +121,17 @@ type Config struct {
// LogFields renders the current config as a set of Logrus fields.
func (cfg Config) LogFields() log.Fields {
return log.Fields{
"Name": Name,
"PeerLifetime": cfg.PeerLifetime,
"Addresses": cfg.Addresses,
"DB": cfg.DB,
"PoolSize": cfg.PoolSize,
"Sentinel": cfg.Sentinel,
"SentinelMaster": cfg.SentinelMaster,
"Cluster": cfg.Cluster,
"ReadTimeout": cfg.ReadTimeout,
"WriteTimeout": cfg.WriteTimeout,
"ConnectTimeout": cfg.ConnectTimeout,
"name": Name,
"peerLifetime": cfg.PeerLifetime,
"addresses": cfg.Addresses,
"db": cfg.DB,
"poolSize": cfg.PoolSize,
"sentinel": cfg.Sentinel,
"sentinelMaster": cfg.SentinelMaster,
"cluster": cfg.Cluster,
"readTimeout": cfg.ReadTimeout,
"writeTimeout": cfg.WriteTimeout,
"connectTimeout": cfg.ConnectTimeout,
}
}
@@ -154,42 +158,43 @@ func (cfg Config) Validate() (Config, error) {
if len(cfg.Addresses) == 0 {
validCfg.Addresses = []string{defaultRedisAddress}
log.Warn("falling back to default configuration", log.Fields{
"Name": Name + ".Addresses",
"Provided": cfg.Addresses,
"Default": validCfg.Addresses,
"name": Name + ".Addresses",
"provided": cfg.Addresses,
"default": validCfg.Addresses,
})
}
if cfg.ReadTimeout <= 0 {
validCfg.ReadTimeout = defaultReadTimeout
log.Warn("falling back to default configuration", log.Fields{
"Name": Name + ".ReadTimeout",
"Provided": cfg.ReadTimeout,
"Default": validCfg.ReadTimeout,
"name": Name + ".ReadTimeout",
"provided": cfg.ReadTimeout,
"default": validCfg.ReadTimeout,
})
}
if cfg.WriteTimeout <= 0 {
validCfg.WriteTimeout = defaultWriteTimeout
log.Warn("falling back to default configuration", log.Fields{
"Name": Name + ".WriteTimeout",
"Provided": cfg.WriteTimeout,
"Default": validCfg.WriteTimeout,
"name": Name + ".WriteTimeout",
"provided": cfg.WriteTimeout,
"default": validCfg.WriteTimeout,
})
}
if cfg.ConnectTimeout <= 0 {
validCfg.ConnectTimeout = defaultConnectTimeout
log.Warn("falling back to default configuration", log.Fields{
"Name": Name + ".ConnectTimeout",
"Provided": cfg.ConnectTimeout,
"Default": validCfg.ConnectTimeout,
"name": Name + ".ConnectTimeout",
"provided": cfg.ConnectTimeout,
"default": validCfg.ConnectTimeout,
})
}
return validCfg, nil
}
// Connect creates redis client from configuration
func (cfg Config) Connect() (con Connection, err error) {
var rs redis.UniversalClient
switch {
@@ -287,6 +292,7 @@ func (ps *store) ScheduleStatisticsCollection(reportInterval time.Duration) {
}()
}
// Connection is wrapper for redis.UniversalClient
type Connection struct {
redis.UniversalClient
}
@@ -309,7 +315,7 @@ func (ps *store) count(key string, getLength bool) (n uint64) {
if err != nil {
log.Error("storage: Redis: GET/SCARD failure", log.Fields{
"key": key,
"Error": err,
"error": err,
})
}
return
@@ -336,6 +342,8 @@ func (ps *store) tx(txf func(tx redis.Pipeliner) error) (err error) {
return
}
// AsNil returns nil if provided err is redis.Nil
// otherwise returns err
func AsNil(err error) error {
if err == nil || errors.Is(err, redis.Nil) {
return nil
@@ -343,14 +351,37 @@ func AsNil(err error) error {
return err
}
func (ps *store) putPeer(infoHashKey, peerCountKey, peerId string) error {
// InfoHashKey generates redis key for provided hash and flags
func InfoHashKey(infoHash string, seeder, v6 bool) (infoHashKey string) {
var bm int
if seeder {
bm = 0b01
}
if v6 {
bm |= 0b10
}
switch bm {
case 0b11:
infoHashKey = IH6SeederKey
case 0b10:
infoHashKey = IH6LeecherKey
case 0b01:
infoHashKey = IH4SeederKey
case 0b00:
infoHashKey = IH4LeecherKey
}
infoHashKey += infoHash
return
}
func (ps *store) putPeer(infoHashKey, peerCountKey, peerID string) error {
log.Debug("storage: Redis: PutPeer", log.Fields{
"InfoHashKey": infoHashKey,
"PeerCountKey": peerCountKey,
"PeerId": peerId,
"infoHashKey": infoHashKey,
"peerCountKey": peerCountKey,
"peerID": peerID,
})
return ps.tx(func(tx redis.Pipeliner) (err error) {
if err = tx.HSet(context.TODO(), infoHashKey, peerId, ps.getClock()).Err(); err != nil {
if err = tx.HSet(context.TODO(), infoHashKey, peerID, ps.getClock()).Err(); err != nil {
return
}
if err = tx.Incr(context.TODO(), peerCountKey).Err(); err != nil {
@@ -361,13 +392,13 @@ func (ps *store) putPeer(infoHashKey, peerCountKey, peerId string) error {
})
}
func (ps *store) delPeer(infoHashKey, peerCountKey, peerId string) error {
func (ps *store) delPeer(infoHashKey, peerCountKey, peerID string) error {
log.Debug("storage: Redis: DeletePeer", log.Fields{
"InfoHashKey": infoHashKey,
"PeerCountKey": peerCountKey,
"PeerId": peerId,
"infoHashKey": infoHashKey,
"peerCountKey": peerCountKey,
"peerID": peerID,
})
deleted, err := ps.HDel(context.TODO(), infoHashKey, peerId).Uint64()
deleted, err := ps.HDel(context.TODO(), infoHashKey, peerID).Uint64()
err = AsNil(err)
if err == nil {
if deleted == 0 {
@@ -381,32 +412,32 @@ func (ps *store) delPeer(infoHashKey, peerCountKey, peerId string) error {
}
func (ps *store) PutSeeder(ih bittorrent.InfoHash, peer bittorrent.Peer) error {
return ps.putPeer(IHSeederKey+ih.RawString(), CountSeederKey, peer.RawString())
return ps.putPeer(InfoHashKey(ih.RawString(), true, peer.Addr().Is6()), CountSeederKey, peer.RawString())
}
func (ps *store) DeleteSeeder(ih bittorrent.InfoHash, peer bittorrent.Peer) error {
return ps.delPeer(IHSeederKey+ih.RawString(), CountSeederKey, peer.RawString())
return ps.delPeer(InfoHashKey(ih.RawString(), true, peer.Addr().Is6()), CountSeederKey, peer.RawString())
}
func (ps *store) PutLeecher(ih bittorrent.InfoHash, peer bittorrent.Peer) error {
return ps.putPeer(IHLeecherKey+ih.RawString(), CountLeecherKey, peer.RawString())
return ps.putPeer(InfoHashKey(ih.RawString(), false, peer.Addr().Is6()), CountLeecherKey, peer.RawString())
}
func (ps *store) DeleteLeecher(ih bittorrent.InfoHash, peer bittorrent.Peer) error {
return ps.delPeer(IHLeecherKey+ih.RawString(), CountLeecherKey, peer.RawString())
return ps.delPeer(InfoHashKey(ih.RawString(), false, peer.Addr().Is6()), CountLeecherKey, peer.RawString())
}
func (ps *store) GraduateLeecher(ih bittorrent.InfoHash, peer bittorrent.Peer) error {
log.Debug("storage: Redis: GraduateLeecher", log.Fields{
"InfoHash": ih,
"Peer": peer,
"infoHash": ih,
"peer": peer,
})
infoHash, peerId := ih.RawString(), peer.RawString()
ihSeederKey, ihLeecherKey := IHSeederKey+infoHash, IHLeecherKey+infoHash
infoHash, peerID, isV6 := ih.RawString(), peer.RawString(), peer.Addr().Is6()
ihSeederKey, ihLeecherKey := InfoHashKey(infoHash, true, isV6), InfoHashKey(infoHash, false, isV6)
return ps.tx(func(tx redis.Pipeliner) error {
deleted, err := tx.HDel(context.TODO(), ihLeecherKey, peerId).Uint64()
deleted, err := tx.HDel(context.TODO(), ihLeecherKey, peerID).Uint64()
err = AsNil(err)
if err == nil {
if deleted > 0 {
@@ -414,7 +445,7 @@ func (ps *store) GraduateLeecher(ih bittorrent.InfoHash, peer bittorrent.Peer) e
}
}
if err == nil {
err = tx.HSet(context.TODO(), ihSeederKey, peerId, ps.getClock()).Err()
err = tx.HSet(context.TODO(), ihSeederKey, peerID, ps.getClock()).Err()
}
if err == nil {
err = tx.Incr(context.TODO(), CountSeederKey).Err()
@@ -426,19 +457,16 @@ func (ps *store) GraduateLeecher(ih bittorrent.InfoHash, peer bittorrent.Peer) e
})
}
func (ps Connection) parsePeersList(peersResult *redis.StringSliceCmd, skipPeerId string, v4Only bool) (peers []bittorrent.Peer, err error) {
func (ps Connection) parsePeersList(peersResult *redis.StringSliceCmd, skipPeerID string) (peers []bittorrent.Peer, err error) {
var peerIds []string
peerIds, err = peersResult.Result()
if err = AsNil(err); err == nil {
for _, peerId := range peerIds {
if peerId != skipPeerId {
if p, err := bittorrent.NewPeer(peerId); err == nil {
// If peer from request is V4 only, it won't receive V6 peers from DB
if !(v4Only && p.Addr().Is6()) {
peers = append(peers, p)
}
for _, peerID := range peerIds {
if peerID != skipPeerID {
if p, err := bittorrent.NewPeer(peerID); err == nil {
peers = append(peers, p)
} else {
log.Error("storage: Redis: unable to decode leecher", log.Fields{"PeerId": peerId})
log.Error("storage: Redis: unable to decode leecher", log.Fields{"peerID": peerID})
}
}
}
@@ -446,31 +474,52 @@ func (ps Connection) parsePeersList(peersResult *redis.StringSliceCmd, skipPeerI
return
}
func (ps Connection) GetPeers(ih bittorrent.InfoHash, forSeeder bool, maxCount int, peer bittorrent.Peer,
membersFn func(context.Context, string) *redis.StringSliceCmd) (peers []bittorrent.Peer, err error) {
type getPeersFn func(context.Context, string, int) *redis.StringSliceCmd
infoHash, peerId, isV4 := ih.RawString(), peer.RawString(), peer.Addr().Is4()
// GetPeers retrieves peers for provided info hash by calling membersFn and
// converts result to bittorrent.Peer array.
// If forSeeder set to true - returns only leechers, if false -
// seeders and if maxCount not reached - leechers.
func (ps Connection) GetPeers(ih bittorrent.InfoHash, forSeeder bool, maxCount int, peer bittorrent.Peer, membersFn getPeersFn) (out []bittorrent.Peer, err error) {
infoHash, peerID, isV6 := ih.RawString(), peer.RawString(), peer.Addr().Is6()
var infoHashKeys []string
if forSeeder {
peers, err = ps.parsePeersList(membersFn(context.TODO(), IHLeecherKey+infoHash), peerId, isV4)
infoHashKeys = append(infoHashKeys, InfoHashKey(infoHash, false, isV6),
InfoHashKey(infoHash, false, !isV6))
} else {
// Append as many seeders as possible.
peers, err = ps.parsePeersList(membersFn(context.TODO(), IHSeederKey+infoHash), peerId, isV4)
if err != nil {
return
}
// Append as many peers as possible.
// Priority:
// same ip family seeders > same ip family leechers >
// foreign ip family seeders > foreign ip family leechers
infoHashKeys = append(infoHashKeys,
InfoHashKey(infoHash, true, isV6),
InfoHashKey(infoHash, false, isV6),
InfoHashKey(infoHash, true, !isV6),
InfoHashKey(infoHash, false, !isV6))
}
if maxCount -= len(peers); maxCount > 0 {
if leechers, err := ps.parsePeersList(membersFn(context.TODO(), IHLeecherKey+infoHash), peerId, isV4); err == nil {
peers = append(peers, leechers...)
} else {
log.Warn("storage: Redis: error occurred while receiving leechers", log.Fields{"InfoHash": ih, "Error": err})
}
for _, infoHashKey := range infoHashKeys {
var peers []bittorrent.Peer
peers, err = ps.parsePeersList(membersFn(context.TODO(), infoHashKey, maxCount), peerID)
maxCount -= len(peers)
out = append(out, peers...)
if err != nil || maxCount <= 0 {
break
}
}
if len(peers) == 0 && !isV4 {
if l := len(out); err == nil {
if l == 0 {
err = storage.ErrResourceDoesNotExist
}
} else if l > 0 {
err = nil
log.Warn("storage: Redis: error occurred while retrieving peers", log.Fields{
"infoHash": infoHash,
"error": err,
})
}
return
@@ -478,50 +527,47 @@ func (ps Connection) GetPeers(ih bittorrent.InfoHash, forSeeder bool, maxCount i
func (ps *store) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant int, peer bittorrent.Peer) ([]bittorrent.Peer, error) {
log.Debug("storage: Redis: AnnouncePeers", log.Fields{
"InfoHash": ih,
"Seeder": seeder,
"NumWant": numWant,
"Peer": peer,
"infoHash": ih,
"seeder": seeder,
"numWant": numWant,
"peer": peer,
})
return ps.GetPeers(ih, seeder, numWant, peer, func(ctx context.Context, infoHashKey string) *redis.StringSliceCmd {
return ps.HRandField(ctx, infoHashKey, numWant, false)
return ps.GetPeers(ih, seeder, numWant, peer, func(ctx context.Context, infoHashKey string, maxCount int) *redis.StringSliceCmd {
return ps.HRandField(ctx, infoHashKey, maxCount, false)
})
}
func (ps Connection) CountPeers(ih bittorrent.InfoHash, countFn func(context.Context, string) *redis.IntCmd) (leechersCount, seedersCount uint32) {
type getPeerCountFn func(context.Context, string) *redis.IntCmd
func (ps Connection) countPeers(infoHashKey string, countFn getPeerCountFn) uint32 {
count, err := countFn(context.TODO(), infoHashKey).Result()
err = AsNil(err)
if err != nil {
log.Error("storage: Redis: key size calculation failure", log.Fields{
"infoHashKey": infoHashKey,
"error": err,
})
}
return uint32(count)
}
// CountPeers calls provided countFn and returns seeders and leechers count for specified info hash
func (ps Connection) CountPeers(ih bittorrent.InfoHash, countFn getPeerCountFn) (leechersCount, seedersCount uint32) {
infoHash := ih.RawString()
ihSeederKey, ihLeecherKey := IHSeederKey+infoHash, IHLeecherKey+infoHash
count, err := countFn(context.TODO(), ihLeecherKey).Result()
err = AsNil(err)
if err != nil {
log.Error("storage: Redis: key size calculation failure", log.Fields{
"InfoHashKey": ihLeecherKey,
"Error": err,
})
return
}
leechersCount = uint32(count)
count, err = countFn(context.TODO(), ihSeederKey).Result()
err = AsNil(err)
if err != nil {
log.Error("storage: Redis: key size calculation failure", log.Fields{
"InfoHashKey": ihSeederKey,
"Error": err,
})
return
}
seedersCount = uint32(count)
leechersCount = ps.countPeers(InfoHashKey(infoHash, false, false), countFn) +
ps.countPeers(InfoHashKey(infoHash, false, true), countFn)
seedersCount = ps.countPeers(InfoHashKey(infoHash, true, false), countFn) +
ps.countPeers(InfoHashKey(infoHash, true, true), countFn)
return
}
func (ps *store) ScrapeSwarm(ih bittorrent.InfoHash, peer bittorrent.Peer) (leechers uint32, seeders uint32, snatched uint32) {
func (ps *store) ScrapeSwarm(ih bittorrent.InfoHash, peer bittorrent.Peer) (leechers uint32, seeders uint32, _ uint32) {
log.Debug("storage: Redis ScrapeSwarm", log.Fields{
"InfoHash": ih,
"Peer": peer,
"infoHash": ih,
"peer": peer,
})
leechers, seeders = ps.CountPeers(ih, ps.HLen)
@@ -531,6 +577,7 @@ func (ps *store) ScrapeSwarm(ih bittorrent.InfoHash, peer bittorrent.Peer) (leec
const argNumErrorMsg = "ERR wrong number of arguments"
// Put - storage.DataStorage implementation
func (ps Connection) Put(ctx string, values ...storage.Entry) (err error) {
if l := len(values); l > 0 {
if l == 1 {
@@ -556,11 +603,13 @@ func (ps Connection) Put(ctx string, values ...storage.Entry) (err error) {
return
}
// Contains - storage.DataStorage implementation
func (ps Connection) Contains(ctx string, key string) (bool, error) {
exist, err := ps.HExists(context.TODO(), PrefixKey+ctx, key).Result()
return exist, AsNil(err)
}
// Load - storage.DataStorage implementation
func (ps Connection) Load(ctx string, key string) (v any, err error) {
v, err = ps.HGet(context.TODO(), PrefixKey+ctx, key).Result()
if err != nil && errors.Is(err, redis.Nil) {
@@ -569,6 +618,7 @@ func (ps Connection) Load(ctx string, key string) (v any, err error) {
return
}
// Delete - storage.DataStorage implementation
func (ps Connection) Delete(ctx string, keys ...string) (err error) {
if len(keys) > 0 {
err = AsNil(ps.HDel(context.TODO(), PrefixKey+ctx, keys...).Err())
@@ -586,6 +636,7 @@ func (ps Connection) Delete(ctx string, keys ...string) (err error) {
return
}
// Preservable - storage.DataStorage implementation
func (Connection) Preservable() bool {
return true
}
@@ -645,12 +696,12 @@ func (ps *store) gc(cutoff time.Time) {
for _, infoHashKey := range infoHashKeys {
var cntKey string
var seeder bool
if seeder = strings.HasPrefix(infoHashKey, IHSeederKey); seeder {
if seeder = strings.HasPrefix(infoHashKey, IH4SeederKey) || strings.HasPrefix(infoHashKey, IH6SeederKey); seeder {
cntKey = CountSeederKey
} else if strings.HasPrefix(infoHashKey, IHLeecherKey) {
} else if strings.HasPrefix(infoHashKey, IH4LeecherKey) || strings.HasPrefix(infoHashKey, IH6LeecherKey) {
cntKey = CountLeecherKey
} else {
log.Warn("storage: Redis: unexpected record found in info hash set", log.Fields{"InfoHashKey": infoHashKey})
log.Warn("storage: Redis: unexpected record found in info hash set", log.Fields{"infoHashKey": infoHashKey})
continue
}
// list all (peer, timeout) pairs for the ih
@@ -658,18 +709,18 @@ func (ps *store) gc(cutoff time.Time) {
err = AsNil(err)
if err == nil {
peersToRemove := make([]string, 0)
for peerId, timeStamp := range peerList {
for peerID, timeStamp := range peerList {
if mtime, err := strconv.ParseInt(timeStamp, 10, 64); err == nil {
if mtime <= cutoffNanos {
log.Debug("storage: Redis: adding peer to remove list", log.Fields{"PeerId": peerId})
peersToRemove = append(peersToRemove, peerId)
log.Debug("storage: Redis: adding peer to remove list", log.Fields{"peerID": peerID})
peersToRemove = append(peersToRemove, peerID)
}
} else {
log.Error("storage: Redis: unable to decode peer timestamp", log.Fields{
"InfoHashKey": infoHashKey,
"PeerId": peerId,
"Timestamp": timeStamp,
"Error": err,
"infoHashKey": infoHashKey,
"peerID": peerID,
"timestamp": timeStamp,
"error": err,
})
}
}
@@ -684,9 +735,9 @@ func (ps *store) gc(cutoff time.Time) {
err = AsNil(err)
if err != nil {
log.Error("storage: Redis: unable to delete peer", log.Fields{
"InfoHashKey": infoHashKey,
"PeerId": k,
"Error": err,
"infoHashKey": infoHashKey,
"peerID": k,
"error": err,
})
} else {
removedPeerCount += count
@@ -694,18 +745,18 @@ func (ps *store) gc(cutoff time.Time) {
}
} else {
log.Error("storage: Redis: unable to delete peers", log.Fields{
"InfoHashKey": infoHashKey,
"PeerIds": peersToRemove,
"Error": err,
"infoHashKey": infoHashKey,
"peerIds": peersToRemove,
"error": err,
})
}
}
if removedPeerCount > 0 { // DECR seeder/leecher counter
if err = ps.DecrBy(context.Background(), cntKey, removedPeerCount).Err(); err != nil {
log.Error("storage: Redis: unable to decrement seeder/leecher peer count", log.Fields{
"InfoHashKey": infoHashKey,
"CountKey": cntKey,
"Error": err,
"infoHashKey": infoHashKey,
"countKey": cntKey,
"error": err,
})
}
}
@@ -725,19 +776,19 @@ func (ps *store) gc(cutoff time.Time) {
}, infoHashKey))
if err != nil {
log.Error("storage: Redis: unable to clean info hash records", log.Fields{
"InfoHashKey": infoHashKey,
"Error": err,
"infoHashKey": infoHashKey,
"error": err,
})
}
} else {
log.Error("storage: Redis: unable to fetch info hash peers", log.Fields{
"InfoHashKey": infoHashKey,
"Error": err,
"infoHashKey": infoHashKey,
"error": err,
})
}
}
} else {
log.Error("storage: Redis: unable to fetch info hash set", log.Fields{"HashSet": IHKey, "Error": err})
log.Error("storage: Redis: unable to fetch info hash set", log.Fields{"hashSet": IHKey, "error": err})
}
}
+2 -14
View File
@@ -5,8 +5,6 @@ import (
"testing"
"time"
"github.com/alicebob/miniredis"
s "github.com/sot-tech/mochi/storage"
"github.com/sot-tech/mochi/storage/test"
)
@@ -22,19 +20,9 @@ var cfg = Config{
func createNew() s.PeerStorage {
var ps s.PeerStorage
var err error
ps, err = New(cfg)
ps, err = newStore(cfg)
if err != nil {
fmt.Println("unable to create real redis connection: ", err, " using simulator")
var rs *miniredis.Miniredis
rs, err = miniredis.Run()
if err != nil {
panic(err)
}
cfg.Addresses = []string{rs.Addr()}
ps, err = New(cfg)
}
if err != nil {
panic(err)
panic(fmt.Sprint("Unable to create KeyDB connection: ", err, "\nThis driver needs real Redis instance"))
}
return ps
}
+9 -9
View File
@@ -39,9 +39,9 @@ func (c Config) sanitizeGCConfig() (gcInterval, peerTTL time.Duration) {
if c.GarbageCollectionInterval <= 0 {
gcInterval = defaultGarbageCollectionInterval
log.Warn("falling back to default configuration", log.Fields{
"Name": "GarbageCollectionInterval",
"Provided": c.GarbageCollectionInterval,
"Default": defaultGarbageCollectionInterval,
"name": "GarbageCollectionInterval",
"provided": c.GarbageCollectionInterval,
"default": defaultGarbageCollectionInterval,
})
} else {
gcInterval = c.GarbageCollectionInterval
@@ -49,9 +49,9 @@ func (c Config) sanitizeGCConfig() (gcInterval, peerTTL time.Duration) {
if c.PeerLifetime <= 0 {
peerTTL = defaultPeerLifetime
log.Warn("falling back to default configuration", log.Fields{
"Name": "PeerLifetime",
"Provided": c.PeerLifetime,
"Default": defaultPeerLifetime,
"name": "PeerLifetime",
"provided": c.PeerLifetime,
"default": defaultPeerLifetime,
})
} else {
peerTTL = c.PeerLifetime
@@ -63,9 +63,9 @@ func (c Config) sanitizeStatisticsConfig() (statInterval time.Duration) {
if c.PrometheusReportingInterval < 0 {
statInterval = defaultPrometheusReportingInterval
log.Warn("falling back to default configuration", log.Fields{
"Name": "PrometheusReportingInterval",
"Provided": c.PrometheusReportingInterval,
"Default": defaultPrometheusReportingInterval,
"name": "PrometheusReportingInterval",
"provided": c.PrometheusReportingInterval,
"default": defaultPrometheusReportingInterval,
})
}
return