(done) replace redigo with go-redis

* replace redis keys with RawString encoded values (delete SerializedPeer)
* merge peers received from pre-hooks with store data
This commit is contained in:
Lawrence, Rendall
2022-04-14 00:55:58 +03:00
parent 1fcddf5102
commit 781fa9440f
20 changed files with 490 additions and 455 deletions

View File

@@ -55,6 +55,7 @@ const (
defaultConnectTimeout = time.Second * 15
)
// ErrSentinelAndClusterChecked is returned from the initializer if both Config.Sentinel and Config.Cluster are provided
var ErrSentinelAndClusterChecked = errors.New("unable to use both cluster and sentinel mode")
func init() {
@@ -118,7 +119,6 @@ func (cfg Config) LogFields() log.Fields {
//
// This function warns to the logger when a value is changed.
func (cfg Config) Validate() (Config, error) {
if cfg.Sentinel && cfg.Cluster {
return cfg, ErrSentinelAndClusterChecked
}
@@ -203,7 +203,7 @@ func (cfg Config) Validate() (Config, error) {
func connect(cfg Config) (*store, error) {
var err error
db := &store{
// FIXME: get context from parent
// FIXME: get context from parent and put into GC, middleware functions should use own ctx
ctx: context.TODO(),
}
switch {
@@ -243,8 +243,8 @@ func connect(cfg Config) (*store, error) {
if err = db.con.Ping(db.ctx).Err(); err == nil && !errors.Is(err, redis.Nil) {
err = nil
} else {
db = nil
_ = db.con.Close()
db = nil
}
return db, err
}
@@ -287,9 +287,7 @@ func (ps *store) runGC(gcInterval, peerLifeTime time.Duration) {
case <-time.After(gcInterval):
before := time.Now().Add(-peerLifeTime)
log.Debug("storage: purging peers with no announces since", log.Fields{"before": before})
if err := ps.collectGarbage(before); err != nil {
log.Error("storage: collectGarbage error", log.Fields{"before": before, "error": err})
}
ps.collectGarbage(before)
}
}
}
@@ -321,39 +319,44 @@ type store struct {
var groups = []string{bittorrent.IPv4.String(), bittorrent.IPv6.String()}
func leecherInfohashKey(af, ih string) string {
return af + "_L_" + ih
// leecherInfoHashKey generates string IPvN_L_hash
func leecherInfoHashKey(addressFamily, infoHash string) string {
return addressFamily + "_L_" + infoHash
}
func seederInfohashKey(af, ih string) string {
return af + "_S_" + ih
// seederInfoHashKey generates string IPvN_S_hash
func seederInfoHashKey(addressFamily, infoHash string) string {
return addressFamily + "_S_" + infoHash
}
func infohashCountKey(af string) string {
return af + "_infohash_count"
// infoHashCountKey generates string IPvN_infohash_count
func infoHashCountKey(addressFamily string) string {
return addressFamily + "_infohash_count"
}
func seederCountKey(af string) string {
return af + "_S_count"
// leecherCountKey generates string IPvN_L_count
func leecherCountKey(addressFamily string) string {
return addressFamily + "_L_count"
}
func leecherCountKey(af string) string {
return af + "_L_count"
// seederCountKey generates string IPvN_S_count
func seederCountKey(addressFamily string) string {
return addressFamily + "_S_count"
}
// populateProm aggregates metrics over all groups and then posts them to
// prometheus.
func (ps *store) populateProm() {
var numInfohashes, numSeeders, numLeechers int64
var numInfoHashes, numSeeders, numLeechers int64
for _, group := range groups {
if n, err := ps.con.Get(ps.ctx, infohashCountKey(group)).Int64(); err != nil && !errors.Is(err, redis.Nil) {
if n, err := ps.con.Get(ps.ctx, infoHashCountKey(group)).Int64(); err != nil && !errors.Is(err, redis.Nil) {
log.Error("storage: GET counter failure", log.Fields{
"key": infohashCountKey(group),
"key": infoHashCountKey(group),
"error": err,
})
} else {
numInfohashes += n
numInfoHashes += n
}
if n, err := ps.con.Get(ps.ctx, seederCountKey(group)).Int64(); err != nil && !errors.Is(err, redis.Nil) {
log.Error("storage: GET counter failure", log.Fields{
@@ -373,7 +376,7 @@ func (ps *store) populateProm() {
}
}
storage.PromInfohashesCount.Set(float64(numInfohashes))
storage.PromInfoHashesCount.Set(float64(numInfoHashes))
storage.PromSeedersCount.Set(float64(numSeeders))
storage.PromLeechersCount.Set(float64(numLeechers))
}
@@ -386,7 +389,7 @@ func (ps *store) tx(txf func(tx redis.Pipeliner) error) (err error) {
if pipe, txErr := ps.con.TxPipelined(ps.ctx, txf); txErr == nil {
errs := make([]string, 0)
for _, c := range pipe {
if err := c.Err(); err == nil {
if err := c.Err(); err != nil {
errs = append(errs, err.Error())
}
}
@@ -409,16 +412,15 @@ func asNil(err error) error {
func (ps *store) PutSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) error {
addressFamily := p.IP.AddressFamily.String()
log.Debug("storage: PutSeeder", log.Fields{
"InfoHash": ih.String(),
"InfoHash": ih,
"Peer": p,
})
peerKey := storage.NewSerializedPeer(p).String()
encodedSeederInfoHash := seederInfohashKey(addressFamily, ih.String())
encodedSeederInfoHash := seederInfoHashKey(addressFamily, ih.RawString())
now := ps.getClock()
return ps.tx(func(tx redis.Pipeliner) (err error) {
if err = tx.HSet(ps.ctx, encodedSeederInfoHash, peerKey, now).Err(); err != nil {
if err = tx.HSet(ps.ctx, encodedSeederInfoHash, p.RawString(), now).Err(); err != nil {
return
}
if err = ps.con.Incr(ps.ctx, seederCountKey(addressFamily)).Err(); err != nil {
@@ -427,7 +429,7 @@ func (ps *store) PutSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) error {
if err = ps.con.HSet(ps.ctx, addressFamily, encodedSeederInfoHash, now).Err(); err != nil {
return
}
err = ps.con.Incr(ps.ctx, infohashCountKey(addressFamily)).Err()
err = ps.con.Incr(ps.ctx, infoHashCountKey(addressFamily)).Err()
return
})
}
@@ -435,13 +437,12 @@ func (ps *store) PutSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) error {
func (ps *store) DeleteSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) error {
addressFamily := p.IP.AddressFamily.String()
log.Debug("storage: DeleteSeeder", log.Fields{
"InfoHash": ih.String(),
"InfoHash": ih,
"Peer": p,
})
peerKey := storage.NewSerializedPeer(p).String()
encodedSeederInfoHash := seederInfohashKey(addressFamily, ih.String())
deleted, err := ps.con.HDel(ps.ctx, encodedSeederInfoHash, peerKey).Uint64()
encodedSeederInfoHash := seederInfoHashKey(addressFamily, ih.RawString())
deleted, err := ps.con.HDel(ps.ctx, encodedSeederInfoHash, p.RawString()).Uint64()
err = asNil(err)
if err == nil {
if deleted == 0 {
@@ -457,17 +458,16 @@ func (ps *store) DeleteSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) error {
func (ps *store) PutLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error {
addressFamily := p.IP.AddressFamily.String()
log.Debug("storage: PutLeecher", log.Fields{
"InfoHash": ih.String(),
"InfoHash": ih,
"Peer": p,
})
// Update the peer in the swarm.
encodedLeecherInfoHash := leecherInfohashKey(addressFamily, ih.String())
peerKey := storage.NewSerializedPeer(p).String()
encodedLeecherInfoHash := leecherInfoHashKey(addressFamily, ih.RawString())
now := ps.getClock()
return ps.tx(func(tx redis.Pipeliner) (err error) {
if err = tx.HSet(ps.ctx, encodedLeecherInfoHash, peerKey, now).Err(); err != nil {
if err = tx.HSet(ps.ctx, encodedLeecherInfoHash, p.RawString(), now).Err(); err != nil {
return
}
if err = tx.HSet(ps.ctx, addressFamily, encodedLeecherInfoHash, now).Err(); err != nil {
@@ -481,14 +481,13 @@ func (ps *store) PutLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error {
func (ps *store) DeleteLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error {
addressFamily := p.IP.AddressFamily.String()
log.Debug("storage: DeleteLeecher", log.Fields{
"InfoHash": ih.String(),
"InfoHash": ih,
"Peer": p,
})
peerKey := storage.NewSerializedPeer(p).String()
encodedLeecherInfoHash := leecherInfohashKey(addressFamily, ih.String())
encodedLeecherInfoHash := leecherInfoHashKey(addressFamily, ih.RawString())
deleted, err := ps.con.HDel(ps.ctx, encodedLeecherInfoHash, peerKey).Uint64()
deleted, err := ps.con.HDel(ps.ctx, encodedLeecherInfoHash, p.RawString()).Uint64()
err = asNil(err)
if err == nil {
if deleted == 0 {
@@ -504,14 +503,14 @@ func (ps *store) DeleteLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error
func (ps *store) GraduateLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error {
addressFamily := p.IP.AddressFamily.String()
log.Debug("storage: GraduateLeecher", log.Fields{
"InfoHash": ih.String(),
"InfoHash": ih,
"Peer": p,
})
encodedInfoHash := ih.String()
encodedLeecherInfoHash := leecherInfohashKey(addressFamily, encodedInfoHash)
encodedSeederInfoHash := seederInfohashKey(addressFamily, encodedInfoHash)
peerKey := storage.NewSerializedPeer(p).String()
encodedInfoHash := ih.RawString()
encodedLeecherInfoHash := leecherInfoHashKey(addressFamily, encodedInfoHash)
encodedSeederInfoHash := seederInfoHashKey(addressFamily, encodedInfoHash)
peerKey := p.RawString()
now := ps.getClock()
return ps.tx(func(tx redis.Pipeliner) error {
@@ -532,7 +531,7 @@ func (ps *store) GraduateLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) erro
err = tx.HSet(ps.ctx, addressFamily, encodedSeederInfoHash, now).Err()
}
if err == nil {
err = tx.Incr(ps.ctx, infohashCountKey(addressFamily)).Err()
err = tx.Incr(ps.ctx, infoHashCountKey(addressFamily)).Err()
}
return err
})
@@ -541,15 +540,15 @@ func (ps *store) GraduateLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) erro
func (ps *store) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant int, announcer bittorrent.Peer) (peers []bittorrent.Peer, err error) {
addressFamily := announcer.IP.AddressFamily.String()
log.Debug("storage: AnnouncePeers", log.Fields{
"InfoHash": ih.String(),
"InfoHash": ih,
"seeder": seeder,
"numWant": numWant,
"Peer": announcer,
})
encodedInfoHash := ih.String()
encodedLeecherInfoHash := leecherInfohashKey(addressFamily, encodedInfoHash)
encodedSeederInfoHash := seederInfohashKey(addressFamily, encodedInfoHash)
encodedInfoHash := ih.RawString()
encodedLeecherInfoHash := leecherInfoHashKey(addressFamily, encodedInfoHash)
encodedSeederInfoHash := seederInfoHashKey(addressFamily, encodedInfoHash)
leechers, err := ps.con.HKeys(ps.ctx, encodedLeecherInfoHash).Result()
err = asNil(err)
@@ -573,9 +572,12 @@ func (ps *store) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant int,
if numWant == 0 {
break
}
peers = append(peers, storage.SerializedPeer(peerKey).ToPeer())
numWant--
if p, err := bittorrent.NewPeer(peerKey); err == nil {
peers = append(peers, p)
numWant--
} else {
log.Error("storage: unable to decode leecher", log.Fields{"peer": peerKey})
}
}
} else {
// Append as many seeders as possible.
@@ -583,22 +585,28 @@ func (ps *store) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant int,
if numWant == 0 {
break
}
peers = append(peers, storage.SerializedPeer(peerKey).ToPeer())
numWant--
if p, err := bittorrent.NewPeer(peerKey); err == nil {
peers = append(peers, p)
numWant--
} else {
log.Error("storage: unable to decode seeder", log.Fields{"peer": peerKey})
}
}
// Append leechers until we reach numWant.
if numWant > 0 {
announcerPK := storage.NewSerializedPeer(announcer).String()
announcerPK := announcer.RawString()
for _, peerKey := range leechers {
if peerKey != announcerPK {
if numWant == 0 {
break
}
peers = append(peers, storage.SerializedPeer(peerKey).ToPeer())
numWant--
if p, err := bittorrent.NewPeer(peerKey); err == nil {
peers = append(peers, p)
numWant--
} else {
log.Error("storage: unable to decode leecher", log.Fields{"peer": peerKey})
}
}
}
}
@@ -610,9 +618,9 @@ func (ps *store) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant int,
func (ps *store) ScrapeSwarm(ih bittorrent.InfoHash, af bittorrent.AddressFamily) (resp bittorrent.Scrape) {
resp.InfoHash = ih
addressFamily := af.String()
encodedInfoHash := ih.String()
encodedLeecherInfoHash := leecherInfohashKey(addressFamily, encodedInfoHash)
encodedSeederInfoHash := seederInfohashKey(addressFamily, encodedInfoHash)
encodedInfoHash := ih.RawString()
encodedLeecherInfoHash := leecherInfoHashKey(addressFamily, encodedInfoHash)
encodedSeederInfoHash := seederInfoHashKey(addressFamily, encodedInfoHash)
leechersLen, err := ps.con.HLen(ps.ctx, encodedLeecherInfoHash).Result()
err = asNil(err)
@@ -640,70 +648,61 @@ func (ps *store) ScrapeSwarm(ih bittorrent.InfoHash, af bittorrent.AddressFamily
return
}
func (ps *store) Put(ctx string, key, value any) {
err := ps.con.HSet(ps.ctx, ctx, key, value).Err()
if err != nil {
panic(err)
}
func (ps *store) Put(ctx string, value storage.Entry) error {
return ps.con.HSet(ps.ctx, ctx, value.Key, value.Value).Err()
}
func (ps *store) Contains(ctx string, key any) bool {
func (ps *store) Contains(ctx string, key string) (bool, error) {
exist, err := ps.con.HExists(ps.ctx, ctx, key).Result()
if err != nil {
panic(err)
}
return exist
return exist, asNil(err)
}
const argNumErrorMsg = "ERR wrong number of arguments"
func (ps *store) BulkPut(ctx string, pairs ...storage.Pair) {
func (ps *store) BulkPut(ctx string, pairs ...storage.Entry) (err error) {
if l := len(pairs); l > 0 {
args := make([]any, 0, l*2)
for _, p := range pairs {
args = append(args, p.Left, p.Right)
args = append(args, p.Key, p.Value)
}
err := ps.con.HSet(ps.ctx, ctx, args...).Err()
err = ps.con.HSet(ps.ctx, ctx, args...).Err()
if err != nil {
if strings.Contains(err.Error(), argNumErrorMsg) {
log.Warn("This REDIS version/implementation does not support variadic arguments for HSET")
for _, p := range pairs {
if err := ps.con.HSet(ps.ctx, ctx, p.Left, p.Right).Err(); err != nil {
panic(err)
if err = ps.con.HSet(ps.ctx, ctx, p.Key, p.Value).Err(); err != nil {
break
}
}
} else {
panic(err)
}
}
}
return
}
func (ps *store) Load(ctx string, key any) any {
v, err := ps.con.HGet(ps.ctx, ctx, key).Result()
err = asNil(err)
if err != nil {
panic(err)
func (ps *store) Load(ctx string, key string) (v any, err error) {
v, err = ps.con.HGet(ps.ctx, ctx, key).Result()
if err != nil && errors.Is(err, redis.Nil) {
v, err = nil, nil
}
return v
return
}
func (ps *store) Delete(ctx string, keys ...any) {
func (ps *store) Delete(ctx string, keys ...string) (err error) {
if len(keys) > 0 {
err := asNil(ps.con.HDel(ps.ctx, keys...).Err())
err = asNil(ps.con.HDel(ps.ctx, ctx, keys...).Err())
if err != nil {
if strings.Contains(err.Error(), argNumErrorMsg) {
log.Warn("This REDIS version/implementation does not support variadic arguments for HDEL")
for _, k := range keys {
if err := asNil(ps.con.HDel(ps.ctx, ctx, k).Err()); err != nil {
panic(err)
if err = asNil(ps.con.HDel(ps.ctx, ctx, k).Err()); err != nil {
break
}
}
} else {
panic(err)
}
}
}
return
}
// collectGarbage deletes all Peers from the Storage which are older than the
@@ -751,108 +750,125 @@ func (ps *store) Delete(ctx string, keys ...any) {
// - If the change happens after the HLEN, we will not even attempt to make the
// transaction. The infohash key will remain in the addressFamily hash and
// we'll attempt to clean it up the next time collectGarbage runs.
func (ps *store) collectGarbage(cutoff time.Time) error {
func (ps *store) collectGarbage(cutoff time.Time) {
cutoffUnix := cutoff.UnixNano()
start := time.Now()
var err error
for _, group := range groups {
// list all infohashes in the group
infohashesList, err := ps.con.HKeys(ps.ctx, group).Result()
// list all infoHashes in the group
var infoHashes []string
infoHashes, err = ps.con.HKeys(ps.ctx, group).Result()
err = asNil(err)
if err != nil {
return err
}
for _, ihStr := range infohashesList {
isSeeder := len(ihStr) > 5 && ihStr[5:6] == "S"
// list all (peer, timeout) pairs for the ih
ihList, err := ps.con.HGetAll(ps.ctx, ihStr).Result()
err = asNil(err)
if err != nil {
return err
}
var removedPeerCount int64
for k, v := range ihList {
peerKey := storage.SerializedPeer(k)
mtime, err := strconv.ParseInt(v, 10, 64)
if err != nil {
return err
}
if mtime <= cutoffUnix {
log.Debug("storage: deleting peer", log.Fields{
"Peer": peerKey.ToPeer(),
})
ret, err := ps.con.HDel(ps.ctx, ihStr, peerKey.String()).Result()
if err != nil {
return err
if err == nil {
for _, infoHash := range infoHashes {
isSeeder := len(infoHash) > 5 && infoHash[5:6] == "S"
// list all (peer, timeout) pairs for the ih
peerList, err := ps.con.HGetAll(ps.ctx, infoHash).Result()
err = asNil(err)
if err == nil {
var removedPeerCount int64
for peerKey, timeStamp := range peerList {
var peer bittorrent.Peer
if peer, err = bittorrent.NewPeer(peerKey); err == nil {
if mtime, err := strconv.ParseInt(timeStamp, 10, 64); err == nil {
if mtime <= cutoffUnix {
log.Debug("storage: deleting peer", log.Fields{
"Peer": peer,
})
var count int64
count, err = ps.con.HDel(ps.ctx, infoHash, peerKey).Result()
err = asNil(err)
if err == nil {
removedPeerCount += count
}
}
}
}
if err != nil {
log.Error("storage: Redis: unable to delete info hash peer", log.Fields{
"group": group,
"infoHash": infoHash,
"peer": peer,
"key": peerKey,
"error": err,
})
}
}
// DECR seeder/leecher counter
if removedPeerCount > 0 {
var decrCounter string
if isSeeder {
decrCounter = seederCountKey(group)
} else {
decrCounter = leecherCountKey(group)
}
if err := ps.con.DecrBy(ps.ctx, decrCounter, removedPeerCount).Err(); err != nil {
log.Error("storage: Redis: unable to decrement seeder/leecher peer count", log.Fields{
"group": group,
"infoHash": infoHash,
"key": decrCounter,
"error": err,
})
}
}
removedPeerCount += ret
}
}
// DECR seeder/leecher counter
decrCounter := leecherCountKey(group)
if isSeeder {
decrCounter = seederCountKey(group)
}
if removedPeerCount > 0 {
if err := ps.con.DecrBy(ps.ctx, decrCounter, removedPeerCount).Err(); err != nil {
return err
}
}
// use WATCH to avoid race condition
// https://redis.io/topics/transactions
_, err = ps.con.Watch(ps.ctx, ihStr)
if err != nil {
return err
}
ihLen, err := ps.con.HLen(ps.ctx, ihStr).Result()
if err != nil {
return err
}
if ihLen == 0 {
// Empty hashes are not shown among existing keys,
// in other words, it's removed automatically after `HDEL` the last field.
// _, err := ps.con.Del(ps.ctx, ihStr)
_ = ps.con.Multi(ps.ctx)
_ = ps.con.HDel(ps.ctx, group, ihStr)
if isSeeder {
_ = ps.con.Decr(ps.ctx, infohashCountKey(group))
}
_, err = redis.Values(ps.con.Exec(ps.ctx))
if err != nil && !errors.Is(err, redis.Nil) {
log.Error("storage: Redis EXEC failure", log.Fields{
// use WATCH to avoid race condition
// https://redis.io/topics/transactions
err = asNil(ps.con.Watch(ps.ctx, func(tx *redis.Tx) (err error) {
var infoHashCount int64
infoHashCount, err = ps.con.HLen(ps.ctx, infoHash).Result()
err = asNil(err)
if err == nil && infoHashCount == 0 {
// Empty hashes are not shown among existing keys,
// in other words, it's removed automatically after `HDEL` the last field.
// _, err := ps.con.Del(ps.ctx, infoHash)
var deletedCount int64
deletedCount, err = ps.con.HDel(ps.ctx, group, infoHash).Result()
err = asNil(err)
if err == nil && isSeeder && deletedCount > 0 {
err = ps.con.Decr(ps.ctx, infoHashCountKey(group)).Err()
}
}
return err
}, infoHash))
if err != nil {
log.Error("storage: Redis: unable to clean info hash records", log.Fields{
"group": group,
"infoHash": infoHash,
"error": err,
})
}
} else {
log.Error("storage: Redis: unable to fetch info hash peers", log.Fields{
"group": group,
"infohash": ihStr,
"infoHash": infoHash,
"error": err,
})
}
} else {
if _, err = ps.con.Unwatch(ps.ctx); err != nil && !errors.Is(err, redis.Nil) {
log.Error("storage: Redis UNWATCH failure", log.Fields{"error": err})
}
}
} else {
log.Error("storage: Redis: unable to fetch info hashes", log.Fields{"group": group, "error": err})
}
}
duration := float64(time.Since(start).Nanoseconds()) / float64(time.Millisecond)
duration := time.Since(start).Milliseconds()
log.Debug("storage: recordGCDuration", log.Fields{"timeTaken(ms)": duration})
storage.PromGCDurationMilliseconds.Observe(duration)
return nil
storage.PromGCDurationMilliseconds.Observe(float64(duration))
}
func (ps *store) Stop() stop.Result {
c := make(stop.Channel)
go func() {
close(ps.closed)
if ps.closed != nil {
close(ps.closed)
}
ps.wg.Wait()
log.Info("storage: exiting. mochi does not clear data in redis when exiting. mochi keys have prefix 'IPv{4,6}_'.")
c.Done()
var err error
if ps.con != nil {
log.Info("storage: exiting. mochi does not clear data in redis when exiting. mochi keys have prefix 'IPv{4,6}_'.")
err = ps.con.Close()
}
c.Done(err)
}()
return c.Result()