mirror of
https://github.com/sot-tech/mochi.git
synced 2026-06-16 09:29:45 -07:00
add download count number while scrape
* remove storage configuration print * change test for pg storage
This commit is contained in:
+2
-2
@@ -39,12 +39,12 @@ func (r *Server) Run(configFilePath string) error {
|
||||
log.Info().Msg("metrics disabled because of empty address")
|
||||
}
|
||||
|
||||
log.Info().Str("name", cfg.Storage.Name).Msg("starting storage")
|
||||
log.Debug().Str("name", cfg.Storage.Name).Object("config", cfg.Storage.Config).Msg("starting storage")
|
||||
r.storage, err = storage.NewStorage(cfg.Storage.Name, cfg.Storage.Config)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create storage: %w", err)
|
||||
}
|
||||
log.Info().Object("config", r.storage).Msg("started storage")
|
||||
log.Info().Str("name", cfg.Storage.Name).Msg("started storage")
|
||||
|
||||
preHooks, err := middleware.NewHooks(cfg.PreHooks, r.storage)
|
||||
if err != nil {
|
||||
|
||||
+18
-22
@@ -15,7 +15,6 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/go-redis/redis/v8"
|
||||
"github.com/rs/zerolog"
|
||||
|
||||
"github.com/sot-tech/mochi/bittorrent"
|
||||
"github.com/sot-tech/mochi/pkg/conf"
|
||||
@@ -98,16 +97,11 @@ type store struct {
|
||||
peerTTL uint
|
||||
}
|
||||
|
||||
// MarshalZerologObject writes configuration into zerolog event
|
||||
func (s store) MarshalZerologObject(e *zerolog.Event) {
|
||||
e.Str("type", Name).Object("config", s.Config)
|
||||
}
|
||||
|
||||
func (s store) setPeerTTL(infoHashKey, peerID string) error {
|
||||
func (s *store) setPeerTTL(infoHashKey, peerID string) error {
|
||||
return s.Process(context.TODO(), redis.NewCmd(context.TODO(), expireMemberCmd, infoHashKey, peerID, s.peerTTL))
|
||||
}
|
||||
|
||||
func (s store) addPeer(infoHashKey, peerID string) (err error) {
|
||||
func (s *store) addPeer(infoHashKey, peerID string) (err error) {
|
||||
logger.Trace().
|
||||
Str("infoHashKey", infoHashKey).
|
||||
Str("peerID", peerID).
|
||||
@@ -118,7 +112,7 @@ func (s store) addPeer(infoHashKey, peerID string) (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
func (s store) delPeer(infoHashKey, peerID string) error {
|
||||
func (s *store) delPeer(infoHashKey, peerID string) error {
|
||||
logger.Trace().
|
||||
Str("infoHashKey", infoHashKey).
|
||||
Str("peerID", peerID).
|
||||
@@ -132,23 +126,23 @@ func (s store) delPeer(infoHashKey, peerID string) error {
|
||||
return err
|
||||
}
|
||||
|
||||
func (s store) PutSeeder(ih bittorrent.InfoHash, peer bittorrent.Peer) error {
|
||||
func (s *store) PutSeeder(ih bittorrent.InfoHash, peer bittorrent.Peer) error {
|
||||
return s.addPeer(r.InfoHashKey(ih.RawString(), true, peer.Addr().Is6()), peer.RawString())
|
||||
}
|
||||
|
||||
func (s store) DeleteSeeder(ih bittorrent.InfoHash, peer bittorrent.Peer) error {
|
||||
func (s *store) DeleteSeeder(ih bittorrent.InfoHash, peer bittorrent.Peer) error {
|
||||
return s.delPeer(r.InfoHashKey(ih.RawString(), true, peer.Addr().Is6()), peer.RawString())
|
||||
}
|
||||
|
||||
func (s store) PutLeecher(ih bittorrent.InfoHash, peer bittorrent.Peer) error {
|
||||
func (s *store) PutLeecher(ih bittorrent.InfoHash, peer bittorrent.Peer) error {
|
||||
return s.addPeer(r.InfoHashKey(ih.RawString(), false, peer.Addr().Is6()), peer.RawString())
|
||||
}
|
||||
|
||||
func (s store) DeleteLeecher(ih bittorrent.InfoHash, peer bittorrent.Peer) error {
|
||||
func (s *store) DeleteLeecher(ih bittorrent.InfoHash, peer bittorrent.Peer) error {
|
||||
return s.delPeer(r.InfoHashKey(ih.RawString(), false, peer.Addr().Is6()), peer.RawString())
|
||||
}
|
||||
|
||||
func (s store) GraduateLeecher(ih bittorrent.InfoHash, peer bittorrent.Peer) (err error) {
|
||||
func (s *store) GraduateLeecher(ih bittorrent.InfoHash, peer bittorrent.Peer) (err error) {
|
||||
logger.Trace().
|
||||
Stringer("infoHash", ih).
|
||||
Object("peer", peer).
|
||||
@@ -163,12 +157,15 @@ func (s store) GraduateLeecher(ih bittorrent.InfoHash, peer bittorrent.Peer) (er
|
||||
} else {
|
||||
err = s.addPeer(ihSeederKey, peerID)
|
||||
}
|
||||
if err == nil {
|
||||
err = s.HIncrBy(context.TODO(), r.CountDownloadsKey, infoHash, 1).Err()
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// AnnouncePeers is the same function as redis.AnnouncePeers
|
||||
func (s store) AnnouncePeers(ih bittorrent.InfoHash, forSeeder bool, numWant int, v6 bool) ([]bittorrent.Peer, error) {
|
||||
func (s *store) AnnouncePeers(ih bittorrent.InfoHash, forSeeder bool, numWant int, v6 bool) ([]bittorrent.Peer, error) {
|
||||
logger.Trace().
|
||||
Stringer("infoHash", ih).
|
||||
Bool("forSeeder", forSeeder).
|
||||
@@ -182,25 +179,24 @@ func (s store) AnnouncePeers(ih bittorrent.InfoHash, forSeeder bool, numWant int
|
||||
}
|
||||
|
||||
// ScrapeSwarm is the same function as redis.ScrapeSwarm except `SCard` call instead of `HLen`
|
||||
func (s store) ScrapeSwarm(ih bittorrent.InfoHash) (leechers uint32, seeders uint32, snatched uint32) {
|
||||
func (s *store) ScrapeSwarm(ih bittorrent.InfoHash) (uint32, uint32, uint32) {
|
||||
logger.Trace().
|
||||
Stringer("infoHash", ih).
|
||||
Msg("scrape swarm")
|
||||
leechers, seeders = s.CountPeers(ih, s.SCard)
|
||||
return
|
||||
return s.ScrapeIH(ih, s.SCard)
|
||||
}
|
||||
|
||||
func (store) GCAware() bool {
|
||||
func (*store) GCAware() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (store) ScheduleGC(_, _ time.Duration) {}
|
||||
func (*store) ScheduleGC(_, _ time.Duration) {}
|
||||
|
||||
func (store) StatisticsAware() bool {
|
||||
func (*store) StatisticsAware() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (store) ScheduleStatisticsCollection(_ time.Duration) {}
|
||||
func (*store) ScheduleStatisticsCollection(_ time.Duration) {}
|
||||
|
||||
func (s *store) Stop() stop.Result {
|
||||
c := make(stop.Channel)
|
||||
|
||||
@@ -9,8 +9,6 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/rs/zerolog"
|
||||
|
||||
"github.com/sot-tech/mochi/bittorrent"
|
||||
"github.com/sot-tech/mochi/pkg/conf"
|
||||
"github.com/sot-tech/mochi/pkg/log"
|
||||
@@ -47,11 +45,6 @@ type Config struct {
|
||||
ShardCount int `cfg:"shard_count"`
|
||||
}
|
||||
|
||||
// MarshalZerologObject writes configuration into zerolog event
|
||||
func (cfg Config) MarshalZerologObject(e *zerolog.Event) {
|
||||
e.Int("shardCount", cfg.ShardCount)
|
||||
}
|
||||
|
||||
// Validate sanity checks values set in a config and returns a new config with
|
||||
// default values replacing anything that is invalid.
|
||||
//
|
||||
@@ -110,11 +103,6 @@ type peerStore struct {
|
||||
wg sync.WaitGroup
|
||||
}
|
||||
|
||||
// MarshalZerologObject writes configuration into zerolog event
|
||||
func (ps *peerStore) MarshalZerologObject(e *zerolog.Event) {
|
||||
e.Str("type", Name).Object("config", ps.cfg)
|
||||
}
|
||||
|
||||
var _ storage.PeerStorage = &peerStore{}
|
||||
|
||||
func (ps *peerStore) ScheduleGC(gcInterval, peerLifeTime time.Duration) {
|
||||
@@ -406,7 +394,7 @@ func (ps *peerStore) AnnouncePeers(ih bittorrent.InfoHash, forSeeder bool, numWa
|
||||
return
|
||||
}
|
||||
|
||||
func (ps *peerStore) countPeers(ih bittorrent.InfoHash, v6 bool) (leechers uint32, seeders uint32) {
|
||||
func (ps *peerStore) countPeers(ih bittorrent.InfoHash, v6 bool) (leechers, seeders uint32) {
|
||||
shard := ps.shards[ps.shardIndex(ih, v6)]
|
||||
shard.RLock()
|
||||
defer shard.RUnlock()
|
||||
|
||||
+33
-43
@@ -15,7 +15,6 @@ import (
|
||||
|
||||
"github.com/jackc/pgx/v5"
|
||||
"github.com/jackc/pgx/v5/pgxpool"
|
||||
"github.com/rs/zerolog"
|
||||
|
||||
"github.com/sot-tech/mochi/bittorrent"
|
||||
"github.com/sot-tech/mochi/pkg/conf"
|
||||
@@ -107,39 +106,23 @@ type dataQueryConf struct {
|
||||
DelQuery string `cfg:"del_query"`
|
||||
}
|
||||
|
||||
type downloadQueryConf struct {
|
||||
GetQuery string
|
||||
IncrementQuery string
|
||||
}
|
||||
|
||||
// Config holds the configuration of a redis PeerStorage.
|
||||
type Config struct {
|
||||
ConnectionString string `cfg:"connection_string"`
|
||||
PingQuery string `cfg:"ping_query"`
|
||||
Peer peerQueryConf
|
||||
Announce announceQueryConf
|
||||
Downloads downloadQueryConf
|
||||
Data dataQueryConf
|
||||
GCQuery string `cfg:"gc_query"`
|
||||
InfoHashCountQuery string `cfg:"info_hash_count_query"`
|
||||
}
|
||||
|
||||
// MarshalZerologObject writes configuration fields into zerolog event
|
||||
func (cfg Config) MarshalZerologObject(e *zerolog.Event) {
|
||||
e.Str("connectionString", "<hidden>").
|
||||
Str("pingQuery", cfg.PingQuery).
|
||||
Str("peer.addQuery", cfg.Peer.AddQuery).
|
||||
Str("peer.delQuery", cfg.Peer.DelQuery).
|
||||
Str("peer.graduateQuery", cfg.Peer.GraduateQuery).
|
||||
Str("peer.countQuery", cfg.Peer.CountQuery).
|
||||
Str("peer.countSeedersColumn", cfg.Peer.CountSeedersColumn).
|
||||
Str("peer.countLeechersColumn", cfg.Peer.CountLeechersColumn).
|
||||
Str("peer.byInfoHashClause", cfg.Peer.ByInfoHashClause).
|
||||
Str("announce.query", cfg.Announce.Query).
|
||||
Str("announce.peerIDColumn", cfg.Announce.PeerIDColumn).
|
||||
Str("announce.addressColumn", cfg.Announce.AddressColumn).
|
||||
Str("announce.portColumn", cfg.Announce.PortColumn).
|
||||
Str("data.addQuery", cfg.Data.AddQuery).
|
||||
Str("data.getQuery", cfg.Data.GetQuery).
|
||||
Str("data.delQuery", cfg.Data.DelQuery).
|
||||
Str("gcQuery", cfg.GCQuery).
|
||||
Str("infoHashCountQuery", cfg.InfoHashCountQuery)
|
||||
}
|
||||
|
||||
// Validate sanity checks values set in a config and returns a new config with
|
||||
// default values replacing anything that is invalid.
|
||||
//
|
||||
@@ -295,8 +278,7 @@ func (s *store) Contains(ctx string, key string) (contains bool, err error) {
|
||||
}
|
||||
|
||||
func (s *store) Load(ctx string, key string) (out []byte, err error) {
|
||||
row := s.QueryRow(context.TODO(), s.Data.GetQuery, pgx.NamedArgs{pCtx: ctx, pKey: []byte(key)})
|
||||
if err = row.Scan(&out); errors.Is(err, pgx.ErrNoRows) {
|
||||
if err = s.QueryRow(context.TODO(), s.Data.GetQuery, pgx.NamedArgs{pCtx: ctx, pKey: []byte(key)}).Scan(&out); errors.Is(err, pgx.ErrNoRows) {
|
||||
err = nil
|
||||
}
|
||||
return
|
||||
@@ -364,10 +346,9 @@ func (s *store) ScheduleStatisticsCollection(reportInterval time.Duration) {
|
||||
case <-t.C:
|
||||
if metrics.Enabled() {
|
||||
before := time.Now()
|
||||
sc, lc := s.countPeers(bittorrent.NoneInfoHash)
|
||||
sc, lc := s.countPeers(nil)
|
||||
var hc int
|
||||
row := s.QueryRow(context.Background(), s.InfoHashCountQuery)
|
||||
if err := row.Scan(&hc); err != nil && !errors.Is(err, pgx.ErrNoRows) {
|
||||
if err := s.QueryRow(context.Background(), s.InfoHashCountQuery).Scan(&hc); err != nil && !errors.Is(err, pgx.ErrNoRows) {
|
||||
logger.Error().Err(err).Msg("error occurred while get info hash count")
|
||||
}
|
||||
|
||||
@@ -433,18 +414,21 @@ func (s *store) DeleteLeecher(ih bittorrent.InfoHash, peer bittorrent.Peer) erro
|
||||
return s.delPeer(ih, peer, false)
|
||||
}
|
||||
|
||||
func (s *store) GraduateLeecher(ih bittorrent.InfoHash, peer bittorrent.Peer) (err error) {
|
||||
func (s *store) GraduateLeecher(ih bittorrent.InfoHash, peer bittorrent.Peer) error {
|
||||
logger.Trace().
|
||||
Stringer("infoHash", ih).
|
||||
Object("peer", peer).
|
||||
Msg("graduate leecher")
|
||||
_, err = s.Exec(context.TODO(), s.Peer.GraduateQuery, pgx.NamedArgs{
|
||||
pInfoHash: []byte(ih),
|
||||
var batch pgx.Batch
|
||||
ihb := []byte(ih)
|
||||
batch.Queue(s.Peer.GraduateQuery, pgx.NamedArgs{
|
||||
pInfoHash: ihb,
|
||||
pPeerID: peer.ID[:],
|
||||
pAddress: net.IP(peer.Addr().AsSlice()),
|
||||
pPort: peer.Port(),
|
||||
})
|
||||
return
|
||||
batch.Queue(s.Downloads.IncrementQuery, pgx.NamedArgs{pInfoHash: ihb})
|
||||
return s.txBatch(context.TODO(), &batch)
|
||||
}
|
||||
|
||||
func (s *store) getPeers(ih bittorrent.InfoHash, seeders bool, maxCount int, isV6 bool) (peers []bittorrent.Peer, err error) {
|
||||
@@ -469,7 +453,11 @@ func (s *store) getPeers(ih bittorrent.InfoHash, seeders bool, maxCount int, isV
|
||||
}
|
||||
}
|
||||
if idIndex < 0 || ipIndex < 0 || portIndex < 0 {
|
||||
err = fmt.Errorf(errRequiredColumnsNotFoundMsg, []string{s.Announce.PeerIDColumn, s.Announce.AddressColumn, s.Announce.PortColumn})
|
||||
err = fmt.Errorf(errRequiredColumnsNotFoundMsg, []string{
|
||||
s.Announce.PeerIDColumn,
|
||||
s.Announce.AddressColumn,
|
||||
s.Announce.PortColumn,
|
||||
})
|
||||
return
|
||||
}
|
||||
var maxIndex int
|
||||
@@ -543,13 +531,13 @@ func (s *store) AnnouncePeers(ih bittorrent.InfoHash, forSeeder bool, numWant in
|
||||
return
|
||||
}
|
||||
|
||||
func (s *store) countPeers(ih bittorrent.InfoHash) (seeders int, leechers int) {
|
||||
func (s *store) countPeers(ih []byte) (seeders uint32, leechers uint32) {
|
||||
var rows pgx.Rows
|
||||
var err error
|
||||
if ih == bittorrent.NoneInfoHash {
|
||||
if len(ih) == 0 {
|
||||
rows, err = s.Query(context.TODO(), s.Peer.CountQuery)
|
||||
} else {
|
||||
rows, err = s.Query(context.TODO(), s.Peer.CountQuery+" "+s.Peer.ByInfoHashClause, pgx.NamedArgs{pInfoHash: []byte(ih)})
|
||||
rows, err = s.Query(context.TODO(), s.Peer.CountQuery+" "+s.Peer.ByInfoHashClause, pgx.NamedArgs{pInfoHash: ih})
|
||||
}
|
||||
if err == nil {
|
||||
defer rows.Close()
|
||||
@@ -581,7 +569,7 @@ func (s *store) countPeers(ih bittorrent.InfoHash) (seeders int, leechers int) {
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
logger.Error().Err(err).Stringer("infoHash", ih).Msg("unable to get peers count")
|
||||
logger.Error().Err(err).Hex("infoHash", ih).Msg("unable to get peers count")
|
||||
}
|
||||
return
|
||||
}
|
||||
@@ -590,8 +578,14 @@ func (s *store) ScrapeSwarm(ih bittorrent.InfoHash) (leechers uint32, seeders ui
|
||||
logger.Trace().
|
||||
Stringer("infoHash", ih).
|
||||
Msg("scrape swarm")
|
||||
sc, lc := s.countPeers(ih)
|
||||
seeders, leechers = uint32(sc), uint32(lc)
|
||||
ihb := []byte(ih)
|
||||
seeders, leechers = s.countPeers(ihb)
|
||||
if len(s.Downloads.GetQuery) > 0 {
|
||||
if err := s.QueryRow(context.TODO(), s.Downloads.GetQuery, pgx.NamedArgs{pInfoHash: ihb}).Scan(&snatched); err != nil && !errors.Is(err, pgx.ErrNoRows) {
|
||||
logger.Error().Stringer("infoHash", ih).Err(err).Msg("error occurred while get info downloads count")
|
||||
}
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
@@ -616,7 +610,3 @@ func (s *store) Stop() stop.Result {
|
||||
}()
|
||||
return c.Result()
|
||||
}
|
||||
|
||||
func (s *store) MarshalZerologObject(e *zerolog.Event) {
|
||||
e.Str("type", Name).Object("config", s.Config)
|
||||
}
|
||||
|
||||
@@ -26,6 +26,12 @@ CREATE TABLE mo_peers (
|
||||
CREATE INDEX mo_peers_created_idx ON mo_peers(created);
|
||||
CREATE INDEX mo_peers_announce_idx ON mo_peers(info_hash, is_seeder, is_v6);
|
||||
|
||||
DROP TABLE IF EXISTS mo_downloads;
|
||||
CREATE TABLE mo_downloads (
|
||||
info_hash bytea PRIMARY KEY NOT NULL,
|
||||
downloads int NOT NULL DEFAULT 0
|
||||
);
|
||||
|
||||
DROP TABLE IF EXISTS mo_kv;
|
||||
CREATE TABLE mo_kv (
|
||||
context varchar NOT NULL,
|
||||
@@ -41,12 +47,12 @@ var cfg = Config{
|
||||
PingQuery: "SELECT 1",
|
||||
Peer: peerQueryConf{
|
||||
AddQuery: "INSERT INTO mo_peers VALUES(@info_hash, @peer_id, @address, @port, @is_seeder, @is_v6, @created) ON CONFLICT (info_hash, peer_id, address, port) DO UPDATE SET created = EXCLUDED.created, is_seeder = EXCLUDED.is_seeder",
|
||||
DelQuery: "DELETE FROM mo_peers WHERE info_hash=@info_hash AND peer_id=peer_id AND address=@address AND port=@port AND is_seeder=$5",
|
||||
DelQuery: "DELETE FROM mo_peers WHERE info_hash=@info_hash AND peer_id=@peer_id AND address=@address AND port=@port AND is_seeder=@is_seeder",
|
||||
GraduateQuery: "UPDATE mo_peers SET is_seeder=TRUE WHERE info_hash=@info_hash AND peer_id=peer_id AND address=@address AND port=@port AND NOT is_seeder",
|
||||
CountQuery: "SELECT COUNT(1) FILTER (WHERE is_seeder) AS seeders, COUNT(1) FILTER (WHERE NOT is_seeder) AS leechers FROM mo_peers",
|
||||
CountSeedersColumn: "seeders",
|
||||
CountLeechersColumn: "leechers",
|
||||
ByInfoHashClause: "WHERE info_hash = $1",
|
||||
ByInfoHashClause: "WHERE info_hash = @info_hash",
|
||||
},
|
||||
Announce: announceQueryConf{
|
||||
Query: "SELECT peer_id, address, port FROM mo_peers WHERE info_hash=@info_hash AND is_seeder=@is_seeder AND is_v6=@is_v6 LIMIT @count",
|
||||
@@ -54,10 +60,14 @@ var cfg = Config{
|
||||
AddressColumn: "address",
|
||||
PortColumn: "port",
|
||||
},
|
||||
Downloads: downloadQueryConf{
|
||||
GetQuery: "SELECT downloads FROM mo_downloads where info_hash=@info_hash",
|
||||
IncrementQuery: "UPDATE mo_downloads SET downloads = downloads + 1 WHERE info_hash=@info_hash",
|
||||
},
|
||||
Data: dataQueryConf{
|
||||
AddQuery: "INSERT INTO mo_kv VALUES(@context, @key, @value) ON CONFLICT (context, name) DO NOTHING",
|
||||
GetQuery: "SELECT value FROM mo_kv WHERE context=@context AND name=@key",
|
||||
DelQuery: "DELETE FROM mo_kv WHERE context=@context AND name IN @key",
|
||||
DelQuery: "DELETE FROM mo_kv WHERE context=@context AND name = ANY(@key)",
|
||||
},
|
||||
GCQuery: "DELETE FROM mo_peers WHERE created <= @created",
|
||||
InfoHashCountQuery: "SELECT COUNT(DISTINCT info_hash) as info_hashes FROM mo_peers",
|
||||
|
||||
+20
-31
@@ -1,6 +1,6 @@
|
||||
// Package redis implements the storage interface.
|
||||
// BitTorrent tracker keeping peer data in redis with hash.
|
||||
// There two categories of hash:
|
||||
// There three categories of hash:
|
||||
//
|
||||
// - CHI_{L,S}{4,6}_<HASH> (hash type)
|
||||
// To save peers that hold the infohash, used for fast searching,
|
||||
@@ -10,6 +10,9 @@
|
||||
// To save all the infohashes, used for garbage collection,
|
||||
// metrics aggregation and leecher graduation
|
||||
//
|
||||
// - CHI_D (hash type)
|
||||
// To record the number of torrent downloads.
|
||||
//
|
||||
// Two keys are used to record the count of seeders and leechers.
|
||||
//
|
||||
// - CHI_C_S (key type)
|
||||
@@ -28,7 +31,6 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/go-redis/redis/v8"
|
||||
"github.com/rs/zerolog"
|
||||
|
||||
"github.com/sot-tech/mochi/bittorrent"
|
||||
"github.com/sot-tech/mochi/pkg/conf"
|
||||
@@ -63,6 +65,8 @@ const (
|
||||
CountSeederKey = "CHI_C_S"
|
||||
// CountLeecherKey redis key for leecher count
|
||||
CountLeecherKey = "CHI_C_L"
|
||||
// CountDownloadsKey redis key for snatches (downloads) count
|
||||
CountDownloadsKey = "CHI_D"
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -121,20 +125,6 @@ type Config struct {
|
||||
ConnectTimeout time.Duration `cfg:"connect_timeout"`
|
||||
}
|
||||
|
||||
// MarshalZerologObject writes configuration fields into zerolog event
|
||||
func (cfg Config) MarshalZerologObject(e *zerolog.Event) {
|
||||
e.Strs("addresses", cfg.Addresses).
|
||||
Int("db", cfg.DB).
|
||||
Int("poolSize", cfg.PoolSize).
|
||||
Bool("sentinel", cfg.Sentinel).
|
||||
Str("sentinelMaster", cfg.SentinelMaster).
|
||||
Bool("cluster", cfg.Cluster).
|
||||
Dur("readTimeout", cfg.ReadTimeout).
|
||||
Dur("writeTimeout", cfg.WriteTimeout).
|
||||
Dur("connectTimeout", cfg.ConnectTimeout).
|
||||
Dur("peerLifetime", cfg.PeerLifetime)
|
||||
}
|
||||
|
||||
// Validate sanity checks values set in a config and returns a new config with
|
||||
// default values replacing anything that is invalid.
|
||||
//
|
||||
@@ -239,12 +229,7 @@ func (cfg Config) Connect() (con Connection, err error) {
|
||||
rs = nil
|
||||
}
|
||||
cfg.Login, cfg.Password = "", ""
|
||||
return Connection{rs, cfg}, err
|
||||
}
|
||||
|
||||
// MarshalZerologObject writes configuration into zerolog event
|
||||
func (ps *store) MarshalZerologObject(e *zerolog.Event) {
|
||||
e.Str("type", Name).Object("config", ps.Config)
|
||||
return Connection{rs}, err
|
||||
}
|
||||
|
||||
func (ps *store) ScheduleGC(gcInterval, peerLifeTime time.Duration) {
|
||||
@@ -301,7 +286,6 @@ func (ps *store) ScheduleStatisticsCollection(reportInterval time.Duration) {
|
||||
// Connection is wrapper for redis.UniversalClient
|
||||
type Connection struct {
|
||||
redis.UniversalClient
|
||||
Config
|
||||
}
|
||||
|
||||
type store struct {
|
||||
@@ -454,6 +438,9 @@ func (ps *store) GraduateLeecher(ih bittorrent.InfoHash, peer bittorrent.Peer) e
|
||||
if err == nil {
|
||||
err = tx.SAdd(context.TODO(), IHKey, ihSeederKey).Err()
|
||||
}
|
||||
if err == nil {
|
||||
err = tx.HIncrBy(context.TODO(), CountDownloadsKey, infoHash, 1).Err()
|
||||
}
|
||||
return err
|
||||
})
|
||||
}
|
||||
@@ -537,26 +524,28 @@ func (ps *Connection) countPeers(infoHashKey string, countFn getPeerCountFn) uin
|
||||
return uint32(count)
|
||||
}
|
||||
|
||||
// CountPeers calls provided countFn and returns seeders and leechers count for specified info hash
|
||||
func (ps *Connection) CountPeers(ih bittorrent.InfoHash, countFn getPeerCountFn) (leechersCount, seedersCount uint32) {
|
||||
// ScrapeIH calls provided countFn and returns seeders, leechers and downloads count for specified info hash
|
||||
func (ps *Connection) ScrapeIH(ih bittorrent.InfoHash, countFn getPeerCountFn) (leechersCount, seedersCount, downloadsCount uint32) {
|
||||
infoHash := ih.RawString()
|
||||
|
||||
leechersCount = ps.countPeers(InfoHashKey(infoHash, false, false), countFn) +
|
||||
ps.countPeers(InfoHashKey(infoHash, false, true), countFn)
|
||||
seedersCount = ps.countPeers(InfoHashKey(infoHash, true, false), countFn) +
|
||||
ps.countPeers(InfoHashKey(infoHash, true, true), countFn)
|
||||
d, err := ps.HGet(context.TODO(), CountDownloadsKey, infoHash).Uint64()
|
||||
if err = AsNil(err); err != nil {
|
||||
logger.Error().Err(err).Str("infoHash", infoHash).Msg("downloads count calculation failure")
|
||||
}
|
||||
downloadsCount = uint32(d)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (ps *store) ScrapeSwarm(ih bittorrent.InfoHash) (leechers uint32, seeders uint32, snatched uint32) {
|
||||
func (ps *store) ScrapeSwarm(ih bittorrent.InfoHash) (uint32, uint32, uint32) {
|
||||
logger.Trace().
|
||||
Stringer("infoHash", ih).
|
||||
Msg("scrape swarm")
|
||||
|
||||
leechers, seeders = ps.CountPeers(ih, ps.HLen)
|
||||
|
||||
return
|
||||
return ps.ScrapeIH(ih, ps.HLen)
|
||||
}
|
||||
|
||||
const argNumErrorMsg = "ERR wrong number of arguments"
|
||||
@@ -621,7 +610,7 @@ func (ps *Connection) Delete(ctx string, keys ...string) (err error) {
|
||||
}
|
||||
|
||||
// Preservable - storage.DataStorage implementation
|
||||
func (Connection) Preservable() bool {
|
||||
func (*Connection) Preservable() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
|
||||
@@ -7,8 +7,6 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/rs/zerolog"
|
||||
|
||||
"github.com/sot-tech/mochi/bittorrent"
|
||||
"github.com/sot-tech/mochi/pkg/conf"
|
||||
"github.com/sot-tech/mochi/pkg/log"
|
||||
@@ -216,10 +214,6 @@ type PeerStorage interface {
|
||||
// Stopper is an interface that expects a Stop method to stop the PeerStorage.
|
||||
// For more details see the documentation in the stop package.
|
||||
stop.Stopper
|
||||
|
||||
// LogObjectMarshaler returns a loggable version of the data used to configure and
|
||||
// operate a particular PeerStorage.
|
||||
zerolog.LogObjectMarshaler
|
||||
}
|
||||
|
||||
// RegisterBuilder makes a Builder available by the provided name.
|
||||
|
||||
Reference in New Issue
Block a user