mirror of
https://github.com/sot-tech/mochi.git
synced 2026-04-26 23:50:00 -07:00
* remove peer argument from scrape swarm storage call * replace Peer field with netip.Addr in ScrapeRequest * add man for keydb storage * update readme
189 lines
5.2 KiB
Go
189 lines
5.2 KiB
Go
// Package keydb implements the storage interface.
|
|
// This storage mostly is the same as redis, but
|
|
// uses KeyDB-specific command `EXPIREMEMBER`, so it
|
|
// does not need garbage collection.
|
|
//
|
|
// Storage uses redis.IHSeederKey and redis.IHLeecherKey,
|
|
// BUT they are NOT compatible with each other because of
|
|
// another structure (hash in redis and set in keydb).
|
|
// Note: this storage also does not support statistics collection.
|
|
package keydb
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
|
|
"github.com/go-redis/redis/v8"
|
|
|
|
"github.com/sot-tech/mochi/bittorrent"
|
|
"github.com/sot-tech/mochi/pkg/conf"
|
|
"github.com/sot-tech/mochi/pkg/log"
|
|
"github.com/sot-tech/mochi/pkg/stop"
|
|
"github.com/sot-tech/mochi/storage"
|
|
r "github.com/sot-tech/mochi/storage/redis"
|
|
)
|
|
|
|
// Name is name of this storage
|
|
const (
|
|
Name = "keydb"
|
|
expireMemberCmd = "EXPIREMEMBER"
|
|
)
|
|
|
|
// ErrNotKeyDB returned from initializer if connected does not support KeyDB
|
|
// specific command (EXPIREMEMBER)
|
|
var ErrNotKeyDB = errors.New("provided instance seems not KeyDB")
|
|
|
|
func init() {
|
|
// Register the storage driver.
|
|
storage.RegisterBuilder(Name, builder)
|
|
}
|
|
|
|
func builder(icfg conf.MapConfig) (storage.PeerStorage, error) {
|
|
var cfg r.Config
|
|
var err error
|
|
|
|
if err = icfg.Unmarshal(&cfg); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return newStore(cfg)
|
|
}
|
|
|
|
func newStore(cfg r.Config) (*store, error) {
|
|
var err error
|
|
if cfg, err = cfg.Validate(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var rs r.Connection
|
|
|
|
if rs, err = cfg.Connect(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
cmd := redis.NewCommandsInfoCmd(context.Background(), "COMMAND", "INFO", expireMemberCmd)
|
|
_ = rs.Process(context.Background(), cmd)
|
|
err = r.AsNil(cmd.Err())
|
|
if err == nil && len(cmd.Val()) == 0 {
|
|
err = ErrNotKeyDB
|
|
}
|
|
|
|
var st *store
|
|
if err == nil {
|
|
st = &store{
|
|
Connection: rs,
|
|
logFields: cfg.LogFields(),
|
|
peerTTL: uint(cfg.PeerLifetime.Seconds()),
|
|
}
|
|
st.logFields["name"] = Name
|
|
}
|
|
|
|
return st, err
|
|
}
|
|
|
|
type store struct {
|
|
r.Connection
|
|
logFields log.Fields
|
|
peerTTL uint
|
|
}
|
|
|
|
func (s store) setPeerTTL(infoHashKey, peerID string) error {
|
|
return s.Process(context.TODO(), redis.NewCmd(context.TODO(), expireMemberCmd, infoHashKey, peerID, s.peerTTL))
|
|
}
|
|
|
|
func (s store) addPeer(infoHashKey, peerID string) (err error) {
|
|
log.Debug("storage: KeyDB: PutPeer", log.Fields{
|
|
"infoHashKey": infoHashKey,
|
|
"peerID": peerID,
|
|
})
|
|
if err = s.SAdd(context.TODO(), infoHashKey, peerID).Err(); err == nil {
|
|
err = s.setPeerTTL(infoHashKey, peerID)
|
|
}
|
|
return
|
|
}
|
|
|
|
func (s store) delPeer(infoHashKey, peerID string) error {
|
|
log.Debug("storage: KeyDB: DeletePeer", log.Fields{
|
|
"infoHashKey": infoHashKey,
|
|
"peerID": peerID,
|
|
})
|
|
deleted, err := s.SRem(context.TODO(), infoHashKey, peerID).Uint64()
|
|
err = r.AsNil(err)
|
|
if err == nil && deleted == 0 {
|
|
err = storage.ErrResourceDoesNotExist
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
func (s store) PutSeeder(ih bittorrent.InfoHash, peer bittorrent.Peer) error {
|
|
return s.addPeer(r.InfoHashKey(ih.RawString(), true, peer.Addr().Is6()), peer.RawString())
|
|
}
|
|
|
|
func (s store) DeleteSeeder(ih bittorrent.InfoHash, peer bittorrent.Peer) error {
|
|
return s.delPeer(r.InfoHashKey(ih.RawString(), true, peer.Addr().Is6()), peer.RawString())
|
|
}
|
|
|
|
func (s store) PutLeecher(ih bittorrent.InfoHash, peer bittorrent.Peer) error {
|
|
return s.addPeer(r.InfoHashKey(ih.RawString(), false, peer.Addr().Is6()), peer.RawString())
|
|
}
|
|
|
|
func (s store) DeleteLeecher(ih bittorrent.InfoHash, peer bittorrent.Peer) error {
|
|
return s.delPeer(r.InfoHashKey(ih.RawString(), false, peer.Addr().Is6()), peer.RawString())
|
|
}
|
|
|
|
func (s store) GraduateLeecher(ih bittorrent.InfoHash, peer bittorrent.Peer) (err error) {
|
|
log.Debug("storage: KeyDB: GraduateLeecher", log.Fields{
|
|
"infoHash": ih,
|
|
"peer": peer,
|
|
})
|
|
infoHash, peerID := ih.RawString(), peer.RawString()
|
|
ihSeederKey := r.InfoHashKey(infoHash, true, peer.Addr().Is6())
|
|
ihLeecherKey := r.InfoHashKey(infoHash, false, peer.Addr().Is6())
|
|
var moved bool
|
|
if moved, err = s.SMove(context.TODO(), ihLeecherKey, ihSeederKey, peerID).Result(); err == nil {
|
|
if moved {
|
|
err = s.setPeerTTL(ihSeederKey, peerID)
|
|
} else {
|
|
err = s.addPeer(ihSeederKey, peerID)
|
|
}
|
|
}
|
|
return err
|
|
}
|
|
|
|
// AnnouncePeers is the same function as redis.AnnouncePeers
|
|
func (s store) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant int, peer bittorrent.Peer) ([]bittorrent.Peer, error) {
|
|
log.Debug("storage: KeyDB: AnnouncePeers", log.Fields{
|
|
"infoHash": ih,
|
|
"seeder": seeder,
|
|
"numWant": numWant,
|
|
"peer": peer,
|
|
})
|
|
|
|
return s.GetPeers(ih, seeder, numWant, peer, func(ctx context.Context, infoHashKey string, maxCount int) *redis.StringSliceCmd {
|
|
return s.SRandMemberN(context.TODO(), infoHashKey, int64(maxCount))
|
|
})
|
|
}
|
|
|
|
// ScrapeSwarm is the same function as redis.ScrapeSwarm except `SCard` call instead of `HLen`
|
|
func (s store) ScrapeSwarm(ih bittorrent.InfoHash) (leechers uint32, seeders uint32, snatched uint32) {
|
|
log.Debug("storage: KeyDB ScrapeSwarm", log.Fields{
|
|
"infoHash": ih,
|
|
})
|
|
leechers, seeders = s.CountPeers(ih, s.SCard)
|
|
return
|
|
}
|
|
|
|
func (s *store) Stop() stop.Result {
|
|
c := make(stop.Channel)
|
|
if s.UniversalClient != nil {
|
|
c.Done(s.UniversalClient.Close())
|
|
s.UniversalClient = nil
|
|
}
|
|
return c.Result()
|
|
}
|
|
|
|
func (s store) LogFields() log.Fields {
|
|
return s.logFields
|
|
}
|