mirror of
https://github.com/sot-tech/mochi.git
synced 2026-07-02 23:08:58 -07:00
(untested) merge ipv4 and ipv6 peers and pass v4 peers from db to v6 requester
This commit is contained in:
+136
-257
@@ -2,21 +2,20 @@
|
||||
// BitTorrent tracker keeping peer data in redis with hash.
|
||||
// There two categories of hash:
|
||||
//
|
||||
// - CHI_{4,6}_{L,S}_<HASH> (hash type)
|
||||
// - CHI_{L,S}_<HASH> (hash type)
|
||||
// To save peers that hold the infohash, used for fast searching,
|
||||
// deleting, and timeout handling
|
||||
//
|
||||
// - CHI_{4,6}_I (set type)
|
||||
// - CHI_I (set type)
|
||||
// To save all the infohashes, used for garbage collection,
|
||||
// metrics aggregation and leecher graduation
|
||||
//
|
||||
// Tree keys are used to record the count of swarms, seeders
|
||||
// and leechers for each group (IPv4, IPv6).
|
||||
// Tree keys are used to record the count of seeders and leechers.
|
||||
//
|
||||
// - CHI_{4,6}_S_C (key type)
|
||||
// - CHI_C_S (key type)
|
||||
// To record the number of seeders.
|
||||
//
|
||||
// - CHI_{4,6}_L_C (key type)
|
||||
// - CHI_C_L (key type)
|
||||
// To record the number of leechers.
|
||||
package redis
|
||||
|
||||
@@ -52,16 +51,11 @@ const (
|
||||
defaultWriteTimeout = time.Second * 15
|
||||
defaultConnectTimeout = time.Second * 15
|
||||
prefixKey = "CHI_"
|
||||
ih4Key = "CHI_4_I"
|
||||
ih6Key = "CHI_6_I"
|
||||
ih4SeederKey = "CHI_4_S_"
|
||||
ih6SeederKey = "CHI_6_S_"
|
||||
ih4LeecherKey = "CHI_4_L_"
|
||||
ih6LeecherKey = "CHI_6_L_"
|
||||
cnt4SeederKey = "CHI_4_C_S"
|
||||
cnt6SeederKey = "CHI_6_C_S"
|
||||
cnt4LeecherKey = "CHI_4_C_L"
|
||||
cnt6LeecherKey = "CHI_6_C_L"
|
||||
ihKey = "CHI_I"
|
||||
ihSeederKey = "CHI_S_"
|
||||
ihLeecherKey = "CHI_L_"
|
||||
cntSeederKey = "CHI_C_S"
|
||||
cntLeecherKey = "CHI_C_L"
|
||||
)
|
||||
|
||||
// ErrSentinelAndClusterChecked returned from initializer if both Config.Sentinel and Config.Cluster provided
|
||||
@@ -246,6 +240,13 @@ func connect(cfg Config) (*store, error) {
|
||||
}
|
||||
if err = db.con.Ping(db.ctx).Err(); err == nil && !errors.Is(err, redis.Nil) {
|
||||
err = nil
|
||||
res, err := db.con.Do(db.ctx, "command", "info", "keydb.cron").Result()
|
||||
err = asNil(err)
|
||||
if err != nil {
|
||||
log.Warn("storage: Redis: unable to determine if current instance is KeyDB", log.Fields{"Error": err})
|
||||
} else {
|
||||
db.isKeyDB = res != nil
|
||||
}
|
||||
} else {
|
||||
_ = db.con.Close()
|
||||
db = nil
|
||||
@@ -326,6 +327,7 @@ type store struct {
|
||||
closed chan any
|
||||
wg sync.WaitGroup
|
||||
logFields log.Fields
|
||||
isKeyDB bool
|
||||
}
|
||||
|
||||
func (ps *store) count(key string, getLength bool) (n uint64) {
|
||||
@@ -339,7 +341,7 @@ func (ps *store) count(key string, getLength bool) (n uint64) {
|
||||
if err != nil {
|
||||
log.Error("storage: Redis: GET/SCARD failure", log.Fields{
|
||||
"key": key,
|
||||
"error": err,
|
||||
"Error": err,
|
||||
})
|
||||
}
|
||||
return
|
||||
@@ -348,25 +350,13 @@ func (ps *store) count(key string, getLength bool) (n uint64) {
|
||||
// populateProm aggregates metrics over all groups and then posts them to
|
||||
// prometheus.
|
||||
func (ps *store) populateProm() {
|
||||
numInfoHashes, numSeeders, numLeechers := new(uint64), new(uint64), new(uint64)
|
||||
fetchFn := func(v6 bool) {
|
||||
var cntSeederKey, cntLeecherKey, ihSummaryKey string
|
||||
if v6 {
|
||||
cntSeederKey, cntLeecherKey, ihSummaryKey = cnt6SeederKey, cnt6LeecherKey, ih6Key
|
||||
} else {
|
||||
cntSeederKey, cntLeecherKey, ihSummaryKey = cnt4SeederKey, cnt4LeecherKey, ih4Key
|
||||
}
|
||||
*numInfoHashes += ps.count(ihSummaryKey, true)
|
||||
*numSeeders += ps.count(cntSeederKey, false)
|
||||
*numLeechers += ps.count(cntLeecherKey, false)
|
||||
}
|
||||
numInfoHashes := ps.count(ihKey, true)
|
||||
numSeeders := ps.count(cntSeederKey, false)
|
||||
numLeechers := ps.count(cntLeecherKey, false)
|
||||
|
||||
fetchFn(false)
|
||||
fetchFn(true)
|
||||
|
||||
storage.PromInfoHashesCount.Set(float64(*numInfoHashes))
|
||||
storage.PromSeedersCount.Set(float64(*numSeeders))
|
||||
storage.PromLeechersCount.Set(float64(*numLeechers))
|
||||
storage.PromInfoHashesCount.Set(float64(numInfoHashes))
|
||||
storage.PromSeedersCount.Set(float64(numSeeders))
|
||||
storage.PromLeechersCount.Set(float64(numLeechers))
|
||||
}
|
||||
|
||||
func (ps *store) getClock() int64 {
|
||||
@@ -397,128 +387,70 @@ func asNil(err error) error {
|
||||
return err
|
||||
}
|
||||
|
||||
func (ps *store) PutSeeder(ih bittorrent.InfoHash, peer bittorrent.Peer) error {
|
||||
var ihSummaryKey, ihPeerKey, cntPeerKey string
|
||||
log.Debug("storage: Redis: PutSeeder", log.Fields{
|
||||
"InfoHash": ih,
|
||||
"Peer": peer,
|
||||
func (ps *store) putPeer(infoHashKey, peerCountKey, peerId string) error {
|
||||
log.Debug("storage: Redis: PutPeer", log.Fields{
|
||||
"InfoHashKey": infoHashKey,
|
||||
"PeerCountKey": peerCountKey,
|
||||
"PeerId": peerId,
|
||||
})
|
||||
if peer.Addr().Is6() {
|
||||
ihSummaryKey, ihPeerKey, cntPeerKey = ih6Key, ih6SeederKey, cnt6SeederKey
|
||||
} else {
|
||||
ihSummaryKey, ihPeerKey, cntPeerKey = ih4Key, ih4SeederKey, cnt4SeederKey
|
||||
}
|
||||
ihPeerKey += ih.RawString()
|
||||
|
||||
return ps.tx(func(tx redis.Pipeliner) (err error) {
|
||||
if err = tx.HSet(ps.ctx, ihPeerKey, peer.RawString(), ps.getClock()).Err(); err != nil {
|
||||
if err = tx.HSet(ps.ctx, infoHashKey, peerId, ps.getClock()).Err(); err != nil {
|
||||
return
|
||||
}
|
||||
if err = tx.Incr(ps.ctx, cntPeerKey).Err(); err != nil {
|
||||
if err = tx.Incr(ps.ctx, peerCountKey).Err(); err != nil {
|
||||
return
|
||||
}
|
||||
err = tx.SAdd(ps.ctx, ihSummaryKey, ihPeerKey).Err()
|
||||
err = tx.SAdd(ps.ctx, ihKey, infoHashKey).Err()
|
||||
return
|
||||
})
|
||||
}
|
||||
|
||||
func (ps *store) delPeer(infoHashKey, peerCountKey, peerId string) error {
|
||||
log.Debug("storage: Redis: DeletePeer", log.Fields{
|
||||
"InfoHashKey": infoHashKey,
|
||||
"PeerCountKey": peerCountKey,
|
||||
"PeerId": peerId,
|
||||
})
|
||||
deleted, err := ps.con.HDel(ps.ctx, infoHashKey, peerId).Uint64()
|
||||
err = asNil(err)
|
||||
if err == nil {
|
||||
if deleted == 0 {
|
||||
err = storage.ErrResourceDoesNotExist
|
||||
} else {
|
||||
err = ps.con.Decr(ps.ctx, peerCountKey).Err()
|
||||
}
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (ps *store) PutSeeder(ih bittorrent.InfoHash, peer bittorrent.Peer) error {
|
||||
return ps.putPeer(ihSeederKey+ih.RawString(), cntSeederKey, peer.RawString())
|
||||
}
|
||||
|
||||
func (ps *store) DeleteSeeder(ih bittorrent.InfoHash, peer bittorrent.Peer) error {
|
||||
var ihPeerKey, cntPeerKey string
|
||||
log.Debug("storage: Redis: DeleteSeeder", log.Fields{
|
||||
"InfoHash": ih,
|
||||
"Peer": peer,
|
||||
})
|
||||
if peer.Addr().Is6() {
|
||||
ihPeerKey, cntPeerKey = ih6SeederKey, cnt6SeederKey
|
||||
} else {
|
||||
ihPeerKey, cntPeerKey = ih4SeederKey, cnt4SeederKey
|
||||
}
|
||||
ihPeerKey += ih.RawString()
|
||||
|
||||
deleted, err := ps.con.HDel(ps.ctx, ihPeerKey, peer.RawString()).Uint64()
|
||||
err = asNil(err)
|
||||
if err == nil {
|
||||
if deleted == 0 {
|
||||
err = storage.ErrResourceDoesNotExist
|
||||
} else {
|
||||
err = ps.con.Decr(ps.ctx, cntPeerKey).Err()
|
||||
}
|
||||
}
|
||||
|
||||
return err
|
||||
return ps.delPeer(ihSeederKey+ih.RawString(), cntSeederKey, peer.RawString())
|
||||
}
|
||||
|
||||
func (ps *store) PutLeecher(ih bittorrent.InfoHash, peer bittorrent.Peer) error {
|
||||
var ihSummaryKey, ihPeerKey, cntPeerKey string
|
||||
log.Debug("storage: Redis: PutLeecher", log.Fields{
|
||||
"InfoHash": ih,
|
||||
"Peer": peer,
|
||||
})
|
||||
if peer.Addr().Is6() {
|
||||
ihSummaryKey, ihPeerKey, cntPeerKey = ih6Key, ih6LeecherKey, cnt6LeecherKey
|
||||
} else {
|
||||
ihSummaryKey, ihPeerKey, cntPeerKey = ih4Key, ih4LeecherKey, cnt4LeecherKey
|
||||
}
|
||||
ihPeerKey += ih.RawString()
|
||||
|
||||
return ps.tx(func(tx redis.Pipeliner) (err error) {
|
||||
if err = tx.HSet(ps.ctx, ihPeerKey, peer.RawString(), ps.getClock()).Err(); err != nil {
|
||||
return
|
||||
}
|
||||
if err = tx.Incr(ps.ctx, cntPeerKey).Err(); err != nil {
|
||||
return err
|
||||
}
|
||||
err = tx.SAdd(ps.ctx, ihSummaryKey, ihPeerKey).Err()
|
||||
return
|
||||
})
|
||||
return ps.putPeer(ihLeecherKey+ih.RawString(), cntLeecherKey, peer.RawString())
|
||||
}
|
||||
|
||||
func (ps *store) DeleteLeecher(ih bittorrent.InfoHash, peer bittorrent.Peer) error {
|
||||
var ihPeerKey, cntPeerKey string
|
||||
log.Debug("storage: Redis: DeleteLeecher", log.Fields{
|
||||
"InfoHash": ih,
|
||||
"Peer": peer,
|
||||
})
|
||||
|
||||
if peer.Addr().Is6() {
|
||||
ihPeerKey, cntPeerKey = ih6LeecherKey, cnt6LeecherKey
|
||||
} else {
|
||||
ihPeerKey, cntPeerKey = ih4LeecherKey, cnt4LeecherKey
|
||||
}
|
||||
ihPeerKey += ih.RawString()
|
||||
|
||||
deleted, err := ps.con.HDel(ps.ctx, ihPeerKey, peer.RawString()).Uint64()
|
||||
err = asNil(err)
|
||||
if err == nil {
|
||||
if deleted == 0 {
|
||||
err = storage.ErrResourceDoesNotExist
|
||||
} else {
|
||||
err = ps.con.Decr(ps.ctx, cntPeerKey).Err()
|
||||
}
|
||||
}
|
||||
|
||||
return err
|
||||
return ps.delPeer(ihLeecherKey+ih.RawString(), cntLeecherKey, peer.RawString())
|
||||
}
|
||||
|
||||
func (ps *store) GraduateLeecher(ih bittorrent.InfoHash, peer bittorrent.Peer) error {
|
||||
var ihSummaryKey, ihSeederKey, ihLeecherKey, cntSeederKey, cntLeecherKey string
|
||||
log.Debug("storage: Redis: GraduateLeecher", log.Fields{
|
||||
"InfoHash": ih,
|
||||
"Peer": peer,
|
||||
})
|
||||
|
||||
if peer.Addr().Is6() {
|
||||
ihSummaryKey, ihSeederKey, cntSeederKey = ih6Key, ih6SeederKey, cnt6SeederKey
|
||||
ihLeecherKey, cntLeecherKey = ih6LeecherKey, cnt6LeecherKey
|
||||
} else {
|
||||
ihSummaryKey, ihSeederKey, cntSeederKey = ih4Key, ih4SeederKey, cnt4SeederKey
|
||||
ihLeecherKey, cntLeecherKey = ih4LeecherKey, cnt4LeecherKey
|
||||
}
|
||||
infoHash, peerKey := ih.RawString(), peer.RawString()
|
||||
ihSeederKey, ihLeecherKey = ihSeederKey+infoHash, ihLeecherKey+infoHash
|
||||
infoHash, peerId := ih.RawString(), peer.RawString()
|
||||
ihSeederKey, ihLeecherKey := ihSeederKey+infoHash, ihLeecherKey+infoHash
|
||||
|
||||
return ps.tx(func(tx redis.Pipeliner) error {
|
||||
deleted, err := tx.HDel(ps.ctx, ihLeecherKey, peerKey).Uint64()
|
||||
deleted, err := tx.HDel(ps.ctx, ihLeecherKey, peerId).Uint64()
|
||||
err = asNil(err)
|
||||
if err == nil {
|
||||
if deleted > 0 {
|
||||
@@ -526,94 +458,71 @@ func (ps *store) GraduateLeecher(ih bittorrent.InfoHash, peer bittorrent.Peer) e
|
||||
}
|
||||
}
|
||||
if err == nil {
|
||||
err = tx.HSet(ps.ctx, ihSeederKey, peerKey, ps.getClock()).Err()
|
||||
err = tx.HSet(ps.ctx, ihSeederKey, peerId, ps.getClock()).Err()
|
||||
}
|
||||
if err == nil {
|
||||
err = tx.Incr(ps.ctx, cntSeederKey).Err()
|
||||
}
|
||||
if err == nil {
|
||||
err = tx.SAdd(ps.ctx, ihSummaryKey, ihSeederKey).Err()
|
||||
err = tx.SAdd(ps.ctx, ihKey, ihSeederKey).Err()
|
||||
}
|
||||
return err
|
||||
})
|
||||
}
|
||||
|
||||
func (ps *store) getPeers(infoHashKey, except string, max int, v4Only bool) (peers []bittorrent.Peer, err error) {
|
||||
var peerIds []string
|
||||
peerIds, err = ps.con.HKeys(ps.ctx, infoHashKey).Result()
|
||||
if err = asNil(err); err == nil {
|
||||
for _, peerId := range peerIds {
|
||||
if peerId != except {
|
||||
if p, err := bittorrent.NewPeer(peerId); err == nil {
|
||||
// If peer from request is V4 only, it won't receive V6 peers from DB
|
||||
if !(v4Only && p.Addr().Is6()) {
|
||||
peers = append(peers, p)
|
||||
max--
|
||||
}
|
||||
} else {
|
||||
log.Error("storage: Redis: unable to decode leecher", log.Fields{"PeerId": peerId})
|
||||
}
|
||||
}
|
||||
if max == 0 {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (ps *store) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant int, peer bittorrent.Peer) (peers []bittorrent.Peer, err error) {
|
||||
var ihSeederKey, ihLeecherKey string
|
||||
log.Debug("storage: Redis: AnnouncePeers", log.Fields{
|
||||
"InfoHash": ih,
|
||||
"seeder": seeder,
|
||||
"numWant": numWant,
|
||||
"Seeder": seeder,
|
||||
"NumWant": numWant,
|
||||
"Peer": peer,
|
||||
})
|
||||
|
||||
if peer.Addr().Is6() {
|
||||
ihSeederKey, ihLeecherKey = ih6SeederKey, ih6LeecherKey
|
||||
} else {
|
||||
ihSeederKey, ihLeecherKey = ih4SeederKey, ih4LeecherKey
|
||||
}
|
||||
infoHash := ih.RawString()
|
||||
ihSeederKey, ihLeecherKey = ihSeederKey+infoHash, ihLeecherKey+infoHash
|
||||
|
||||
leechers, err := ps.con.HKeys(ps.ctx, ihLeecherKey).Result()
|
||||
err = asNil(err)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
seeders, err := ps.con.HKeys(ps.ctx, ihSeederKey).Result()
|
||||
err = asNil(err)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if len(leechers) == 0 && len(seeders) == 0 {
|
||||
return nil, storage.ErrResourceDoesNotExist
|
||||
}
|
||||
infoHash, peerId, isV4 := ih.RawString(), peer.RawString(), peer.Addr().Is4()
|
||||
|
||||
if seeder {
|
||||
// Append leechers as possible.
|
||||
for _, peerKey := range leechers {
|
||||
if numWant == 0 {
|
||||
break
|
||||
}
|
||||
if p, err := bittorrent.NewPeer(peerKey); err == nil {
|
||||
peers = append(peers, p)
|
||||
numWant--
|
||||
} else {
|
||||
log.Error("storage: Redis: unable to decode leecher", log.Fields{"peer": peerKey})
|
||||
}
|
||||
}
|
||||
peers, err = ps.getPeers(ihLeecherKey+infoHash, peerId, numWant, isV4)
|
||||
} else {
|
||||
// Append as many seeders as possible.
|
||||
for _, peerKey := range seeders {
|
||||
if numWant == 0 {
|
||||
break
|
||||
}
|
||||
if p, err := bittorrent.NewPeer(peerKey); err == nil {
|
||||
peers = append(peers, p)
|
||||
numWant--
|
||||
peers, err = ps.getPeers(ihSeederKey+infoHash, peerId, numWant, isV4)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
if numWant -= len(peers); numWant > 0 {
|
||||
if leechers, err := ps.getPeers(ihLeecherKey+infoHash, peerId, numWant, isV4); err == nil {
|
||||
peers = append(peers, leechers...)
|
||||
} else {
|
||||
log.Error("storage: Redis: to decode seeder", log.Fields{"peer": peerKey})
|
||||
log.Warn("storage: Redis: error occurred while receiving leechers", log.Fields{"InfoHash": ih, "Error": err})
|
||||
}
|
||||
}
|
||||
|
||||
// Append leechers until we reach numWant.
|
||||
if numWant > 0 {
|
||||
announcerPK := peer.RawString()
|
||||
for _, peerKey := range leechers {
|
||||
if peerKey != announcerPK {
|
||||
if numWant == 0 {
|
||||
break
|
||||
}
|
||||
if p, err := bittorrent.NewPeer(peerKey); err == nil {
|
||||
peers = append(peers, p)
|
||||
numWant--
|
||||
} else {
|
||||
log.Error("storage: Redis: unable to decode leecher", log.Fields{"peer": peerKey})
|
||||
}
|
||||
}
|
||||
}
|
||||
if len(peers) == 0 && !isV4 {
|
||||
err = storage.ErrResourceDoesNotExist
|
||||
}
|
||||
}
|
||||
|
||||
@@ -621,26 +530,20 @@ func (ps *store) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant int,
|
||||
}
|
||||
|
||||
func (ps *store) ScrapeSwarm(ih bittorrent.InfoHash, peer bittorrent.Peer) (resp bittorrent.Scrape) {
|
||||
var ihSeederKey, ihLeecherKey string
|
||||
log.Debug("storage: Redis ScrapeSwarm", log.Fields{
|
||||
"InfoHash": ih,
|
||||
"Peer": peer,
|
||||
})
|
||||
resp.InfoHash = ih
|
||||
if peer.Addr().Is6() {
|
||||
ihSeederKey, ihLeecherKey = ih6SeederKey, ih6LeecherKey
|
||||
} else {
|
||||
ihSeederKey, ihLeecherKey = ih4SeederKey, ih4LeecherKey
|
||||
}
|
||||
infoHash := ih.RawString()
|
||||
ihSeederKey, ihLeecherKey = ihSeederKey+infoHash, ihLeecherKey+infoHash
|
||||
ihSeederKey, ihLeecherKey := ihSeederKey+infoHash, ihLeecherKey+infoHash
|
||||
|
||||
leechersLen, err := ps.con.HLen(ps.ctx, ihLeecherKey).Result()
|
||||
err = asNil(err)
|
||||
if err != nil {
|
||||
log.Error("storage: Redis: HLEN failure", log.Fields{
|
||||
"Hkey": ihLeecherKey,
|
||||
"error": err,
|
||||
"InfoHashKey": ihLeecherKey,
|
||||
"Error": err,
|
||||
})
|
||||
return
|
||||
}
|
||||
@@ -649,8 +552,8 @@ func (ps *store) ScrapeSwarm(ih bittorrent.InfoHash, peer bittorrent.Peer) (resp
|
||||
err = asNil(err)
|
||||
if err != nil {
|
||||
log.Error("storage: Redis: HLEN failure", log.Fields{
|
||||
"Hkey": ihSeederKey,
|
||||
"error": err,
|
||||
"InfoHashKey": ihSeederKey,
|
||||
"Error": err,
|
||||
})
|
||||
return
|
||||
}
|
||||
@@ -722,14 +625,7 @@ func (*store) Preservable() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
func (ps *store) GC(cutoff time.Time) {
|
||||
log.Debug("storage: Redis: purging peers with no announces since", log.Fields{"before": cutoff})
|
||||
cutoffUnix := cutoff.UnixNano()
|
||||
ps.gc(cutoffUnix, false)
|
||||
ps.gc(cutoffUnix, true)
|
||||
}
|
||||
|
||||
// gc deletes all Peers from the PeerStorage which are older than the
|
||||
// GC deletes all Peers from the PeerStorage which are older than the
|
||||
// cutoff time.
|
||||
//
|
||||
// This function must be able to execute while other methods on this interface
|
||||
@@ -774,17 +670,11 @@ func (ps *store) GC(cutoff time.Time) {
|
||||
// - If the change happens after the HLEN, we will not even attempt to make the
|
||||
// transaction. The infohash key will remain in the addressFamil hash and
|
||||
// we'll attempt to clean it up the next time gc runs.
|
||||
func (ps *store) gc(cutoffNanos int64, v6 bool) {
|
||||
func (ps *store) GC(cutoff time.Time) {
|
||||
log.Debug("storage: Redis: purging peers with no announces since", log.Fields{"before": cutoff})
|
||||
cutoffNanos := cutoff.UnixNano()
|
||||
// list all infoHashKeys in the group
|
||||
var ihSummaryKey, ihSeederKey, ihLeecherKey, cntSeederKey, cntLeecherKey string
|
||||
if v6 {
|
||||
cntSeederKey, cntLeecherKey = cnt6SeederKey, cnt6LeecherKey
|
||||
ihSummaryKey, ihSeederKey, ihLeecherKey = ih6Key, ih6SeederKey, ih6LeecherKey
|
||||
} else {
|
||||
cntSeederKey, cntLeecherKey = cnt4SeederKey, cnt4LeecherKey
|
||||
ihSummaryKey, ihSeederKey, ihLeecherKey = ih4Key, ih4SeederKey, ih4LeecherKey
|
||||
}
|
||||
infoHashKeys, err := ps.con.SMembers(ps.ctx, ihSummaryKey).Result()
|
||||
infoHashKeys, err := ps.con.SMembers(ps.ctx, ihKey).Result()
|
||||
err = asNil(err)
|
||||
if err == nil {
|
||||
for _, infoHashKey := range infoHashKeys {
|
||||
@@ -795,10 +685,7 @@ func (ps *store) gc(cutoffNanos int64, v6 bool) {
|
||||
} else if strings.HasPrefix(infoHashKey, ihLeecherKey) {
|
||||
cntKey = cntLeecherKey
|
||||
} else {
|
||||
log.Warn("storage: Redis: unexpected record found in info hash set", log.Fields{
|
||||
"hashSet": ihSummaryKey,
|
||||
"infoHashKey": infoHashKey,
|
||||
})
|
||||
log.Warn("storage: Redis: unexpected record found in info hash set", log.Fields{"InfoHashKey": infoHashKey})
|
||||
continue
|
||||
}
|
||||
// list all (peer, timeout) pairs for the ih
|
||||
@@ -806,21 +693,18 @@ func (ps *store) gc(cutoffNanos int64, v6 bool) {
|
||||
err = asNil(err)
|
||||
if err == nil {
|
||||
peersToRemove := make([]string, 0)
|
||||
for peerKey, timeStamp := range peerList {
|
||||
for peerId, timeStamp := range peerList {
|
||||
if mtime, err := strconv.ParseInt(timeStamp, 10, 64); err == nil {
|
||||
if mtime <= cutoffNanos {
|
||||
log.Debug("storage: Redis: adding peer to remove list", log.Fields{
|
||||
"key": peerKey,
|
||||
})
|
||||
peersToRemove = append(peersToRemove, peerKey)
|
||||
log.Debug("storage: Redis: adding peer to remove list", log.Fields{"PeerId": peerId})
|
||||
peersToRemove = append(peersToRemove, peerId)
|
||||
}
|
||||
} else {
|
||||
log.Error("storage: Redis: unable to decode peer timestamp", log.Fields{
|
||||
"hashSet": ihSummaryKey,
|
||||
"infoHashKey": infoHashKey,
|
||||
"key": peerKey,
|
||||
"timestamp": timeStamp,
|
||||
"error": err,
|
||||
"InfoHashKey": infoHashKey,
|
||||
"PeerId": peerId,
|
||||
"Timestamp": timeStamp,
|
||||
"Error": err,
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -835,10 +719,9 @@ func (ps *store) gc(cutoffNanos int64, v6 bool) {
|
||||
err = asNil(err)
|
||||
if err != nil {
|
||||
log.Error("storage: Redis: unable to delete peer", log.Fields{
|
||||
"hashSet": ihSummaryKey,
|
||||
"infoHashKey": infoHashKey,
|
||||
"key": k,
|
||||
"error": err,
|
||||
"InfoHashKey": infoHashKey,
|
||||
"PeerId": k,
|
||||
"Error": err,
|
||||
})
|
||||
} else {
|
||||
removedPeerCount += count
|
||||
@@ -846,20 +729,18 @@ func (ps *store) gc(cutoffNanos int64, v6 bool) {
|
||||
}
|
||||
} else {
|
||||
log.Error("storage: Redis: unable to delete peers", log.Fields{
|
||||
"hashSet": ihSummaryKey,
|
||||
"infoHashKey": infoHashKey,
|
||||
"keys": peersToRemove,
|
||||
"error": err,
|
||||
"InfoHashKey": infoHashKey,
|
||||
"PeerIds": peersToRemove,
|
||||
"Error": err,
|
||||
})
|
||||
}
|
||||
}
|
||||
if removedPeerCount > 0 { // DECR seeder/leecher counter
|
||||
if err = ps.con.DecrBy(ps.ctx, cntKey, removedPeerCount).Err(); err != nil {
|
||||
log.Error("storage: Redis: unable to decrement seeder/leecher peer count", log.Fields{
|
||||
"hashSet": ihSummaryKey,
|
||||
"infoHashKey": infoHashKey,
|
||||
"key": cntKey,
|
||||
"error": err,
|
||||
"InfoHashKey": infoHashKey,
|
||||
"CountKey": cntKey,
|
||||
"Error": err,
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -873,27 +754,25 @@ func (ps *store) gc(cutoffNanos int64, v6 bool) {
|
||||
// Empty hashes are not shown among existing keys,
|
||||
// in other words, it's removed automatically after `HDEL` the last field.
|
||||
// _, err := ps.con.Del(ps.ctx, infoHashKey)
|
||||
err = asNil(ps.con.SRem(ps.ctx, ihSummaryKey, infoHashKey).Err())
|
||||
err = asNil(ps.con.SRem(ps.ctx, ihKey, infoHashKey).Err())
|
||||
}
|
||||
return err
|
||||
}, infoHashKey))
|
||||
if err != nil {
|
||||
log.Error("storage: Redis: unable to clean info hash records", log.Fields{
|
||||
"hashSet": ihSummaryKey,
|
||||
"infoHashKey": infoHashKey,
|
||||
"error": err,
|
||||
"InfoHashKey": infoHashKey,
|
||||
"Error": err,
|
||||
})
|
||||
}
|
||||
} else {
|
||||
log.Error("storage: Redis: unable to fetch info hash peers", log.Fields{
|
||||
"hashSet": ihSummaryKey,
|
||||
"infoHashKey": infoHashKey,
|
||||
"error": err,
|
||||
"InfoHashKey": infoHashKey,
|
||||
"Error": err,
|
||||
})
|
||||
}
|
||||
}
|
||||
} else {
|
||||
log.Error("storage: Redis: unable to fetch info hash set", log.Fields{"hashSet": ihSummaryKey, "error": err})
|
||||
log.Error("storage: Redis: unable to fetch info hash set", log.Fields{"HashSet": ihKey, "Error": err})
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -12,6 +12,7 @@ import (
|
||||
)
|
||||
|
||||
var cfg = Config{
|
||||
Addresses: []string{"localhost:6379"},
|
||||
GarbageCollectionInterval: 10 * time.Minute,
|
||||
PrometheusReportingInterval: 10 * time.Minute,
|
||||
PeerLifetime: 30 * time.Minute,
|
||||
|
||||
@@ -3,6 +3,7 @@
|
||||
package test
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@@ -59,7 +60,10 @@ func (th *testHolder) AnnouncePeers(t *testing.T) {
|
||||
peer = v6Peer
|
||||
}
|
||||
_, err := th.st.AnnouncePeers(c.ih, false, 50, peer)
|
||||
require.Equal(t, storage.ErrResourceDoesNotExist, err)
|
||||
if errors.Is(err, storage.ErrResourceDoesNotExist) {
|
||||
err = nil
|
||||
}
|
||||
require.Nil(t, err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -98,6 +102,9 @@ func (th *testHolder) LeecherPutAnnounceDeleteAnnounce(t *testing.T) {
|
||||
require.Nil(t, err)
|
||||
|
||||
peers, err = th.st.AnnouncePeers(c.ih, true, 50, peer)
|
||||
if errors.Is(err, storage.ErrResourceDoesNotExist) {
|
||||
err = nil
|
||||
}
|
||||
require.Nil(t, err)
|
||||
require.False(t, containsPeer(peers, c.peer))
|
||||
}
|
||||
@@ -125,6 +132,9 @@ func (th *testHolder) SeederPutAnnounceDeleteAnnounce(t *testing.T) {
|
||||
require.Nil(t, err)
|
||||
|
||||
peers, err = th.st.AnnouncePeers(c.ih, false, 50, peer)
|
||||
if errors.Is(err, storage.ErrResourceDoesNotExist) {
|
||||
err = nil
|
||||
}
|
||||
require.Nil(t, err)
|
||||
require.False(t, containsPeer(peers, c.peer))
|
||||
}
|
||||
@@ -255,8 +265,12 @@ func (th *testHolder) GC(t *testing.T) {
|
||||
}
|
||||
th.st.GC(time.Now().Add(time.Hour))
|
||||
for _, c := range testData {
|
||||
_, err := th.st.AnnouncePeers(c.ih, false, 100, v4Peer)
|
||||
require.Equal(t, storage.ErrResourceDoesNotExist, err)
|
||||
peers, err := th.st.AnnouncePeers(c.ih, false, 100, v4Peer)
|
||||
if errors.Is(err, storage.ErrResourceDoesNotExist) {
|
||||
err = nil
|
||||
}
|
||||
require.Nil(t, err)
|
||||
require.Empty(t, peers)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user