From 21eaea2b8af0a0a22a961da9d380971d2ec1a473 Mon Sep 17 00:00:00 2001 From: "Lawrence, Rendall" Date: Thu, 16 Jun 2022 17:47:54 +0300 Subject: [PATCH] (untested) implement gc, stats and scrape in pg storage fix query call to use column names while row scan --- storage/pg/storage.go | 248 ++++++++++++++++++++++++++++++++------- storage/redis/storage.go | 4 +- 2 files changed, 209 insertions(+), 43 deletions(-) diff --git a/storage/pg/storage.go b/storage/pg/storage.go index f4635e0..8208dd5 100644 --- a/storage/pg/storage.go +++ b/storage/pg/storage.go @@ -7,6 +7,7 @@ import ( "net" "net/netip" "strings" + "sync" "time" "github.com/jackc/pgx/v4" @@ -17,6 +18,7 @@ import ( "github.com/sot-tech/mochi/bittorrent" "github.com/sot-tech/mochi/pkg/conf" "github.com/sot-tech/mochi/pkg/log" + "github.com/sot-tech/mochi/pkg/metrics" "github.com/sot-tech/mochi/pkg/stop" "github.com/sot-tech/mochi/pkg/timecache" "github.com/sot-tech/mochi/storage" @@ -34,6 +36,7 @@ var ( errConnectionStringNotProvided = errors.New("database address not provided") errRequiredParameterNotSetMsg = "required parameter not provided: %s" + errRequiredColumnsNotFoundMsg = "one or more required columns not found in result set: %v" tc = timecache.New() ) @@ -55,14 +58,19 @@ func builder(icfg conf.MapConfig) (storage.PeerStorage, error) { return nil, err } + pgConf, err := pgxpool.ParseConfig(cfg.ConnectionString) + if err != nil { + return nil, err + } + + pgConf.ConnConfig.Logger = zerologadapter.NewLogger(logger.Logger) + pgConf.ConnConfig.LogLevel = pgx.LogLevel(logger.GetLevel()) - con, err := pgxpool.Connect(context.Background(), cfg.ConnectionString) + con, err := pgxpool.ConnectConfig(context.Background(), pgConf) if err != nil { return nil, err } - con.Config().ConnConfig.Logger = zerologadapter.NewLogger(logger.Logger) - con.Config().ConnConfig.LogLevel = pgx.LogLevel(logger.GetLevel()) - return &store{Config: &cfg, Pool: con}, nil + return &store{Config: cfg, Pool: con, wg: sync.WaitGroup{}, closed: make(chan any)}, nil } // Config holds the configuration of a redis PeerStorage. 
@@ -74,7 +81,9 @@ type Config struct { DelQuery string `cfg:"del_query"` GraduateQuery string `cfg:"graduate_query"` // SELECT COUNT(1) FILTER (WHERE seeder) AS seeders, COUNT(1) FILTER (WHERE NOT seeder) AS leechers FROM peers - CountQuery string `cfg:"count_query"` + CountQuery string `cfg:"count_query"` + CountSeedersColumn string `cfg:"count_seeders_column"` + CountLeechersColumn string `cfg:"count_leechers_column"` // WHERE ih = ? ByInfoHashClause string `cfg:"by_info_hash_clause"` } @@ -95,8 +104,24 @@ type Config struct { // MarshalZerologObject writes configuration fields into zerolog event func (cfg Config) MarshalZerologObject(e *zerolog.Event) { - e.Str("connectionString", cfg.ConnectionString). - Str("pingQuery", cfg.PingQuery) + e.Str("connectionString", ""). + Str("pingQuery", cfg.PingQuery). + Str("peer.addQuery", cfg.Peer.AddQuery). + Str("peer.delQuery", cfg.Peer.DelQuery). + Str("peer.graduateQuery", cfg.Peer.GraduateQuery). + Str("peer.countQuery", cfg.Peer.CountQuery). + Str("peer.countSeedersColumn", cfg.Peer.CountSeedersColumn). + Str("peer.countLeechersColumn", cfg.Peer.CountLeechersColumn). + Str("peer.byInfoHashClause", cfg.Peer.ByInfoHashClause). + Str("announce.query", cfg.Announce.Query). + Str("announce.peerIDColumn", cfg.Announce.PeerIDColumn). + Str("announce.addressColumn", cfg.Announce.AddressColumn). + Str("announce.portColumn", cfg.Announce.PortColumn). + Str("data.addQuery", cfg.Data.AddQuery). + Str("data.getQuery", cfg.Data.GetQuery). + Str("data.delQuery", cfg.Data.DelQuery). + Str("gcQuery", cfg.GCQuery). 
+ Str("infoHashCountQuery", cfg.InfoHashCountQuery) } // Validate sanity checks values set in a config and returns a new config with @@ -142,6 +167,14 @@ func (cfg Config) Validate() (Config, error) { return cfg, err } + if err := fn(&validCfg.Peer.CountSeedersColumn, "Peer.CountSeedersColumn"); err != nil { + return cfg, err + } + + if err := fn(&validCfg.Peer.CountLeechersColumn, "Peer.CountLeechersColumn"); err != nil { + return cfg, err + } + if err := fn(&validCfg.Peer.ByInfoHashClause, "Peer.ByInfoHashClause"); err != nil { return cfg, err } @@ -174,63 +207,118 @@ func (cfg Config) Validate() (Config, error) { return cfg, err } + validCfg.Announce.PeerIDColumn = strings.ToUpper(validCfg.Announce.PeerIDColumn) + validCfg.Announce.AddressColumn = strings.ToUpper(validCfg.Announce.AddressColumn) + validCfg.Announce.PortColumn = strings.ToUpper(validCfg.Announce.PortColumn) + + validCfg.Peer.CountSeedersColumn = strings.ToUpper(validCfg.Peer.CountSeedersColumn) + validCfg.Peer.CountLeechersColumn = strings.ToUpper(validCfg.Peer.CountLeechersColumn) + return validCfg, nil } type store struct { - *Config + Config *pgxpool.Pool + wg sync.WaitGroup + closed chan any } -func (s store) Put(ctx string, values ...storage.Entry) error { +func (s *store) Put(ctx string, values ...storage.Entry) error { // TODO implement me panic("implement me") } -func (s store) Contains(ctx string, key string) (bool, error) { +func (s *store) Contains(ctx string, key string) (bool, error) { // TODO implement me panic("implement me") } -func (s store) Load(ctx string, key string) (any, error) { +func (s *store) Load(ctx string, key string) (any, error) { // TODO implement me panic("implement me") } -func (s store) Delete(ctx string, keys ...string) error { +func (s *store) Delete(ctx string, keys ...string) error { // TODO implement me panic("implement me") } -func (s store) Preservable() bool { +func (s *store) Preservable() bool { return true } -func (s store) GCAware() bool { +func (s 
*store) GCAware() bool { return len(s.GCQuery) > 0 } -func (s store) ScheduleGC(gcInterval, peerLifeTime time.Duration) { - // TODO implement me - panic("implement me") +func (s *store) ScheduleGC(gcInterval, peerLifeTime time.Duration) { + s.wg.Add(1) + go func() { + defer s.wg.Done() + t := time.NewTimer(gcInterval) + defer t.Stop() + for { + select { + case <-s.closed: + return + case <-t.C: + start := time.Now() + _, err := s.Exec(context.Background(), s.GCQuery, time.Now().Add(-peerLifeTime)) + duration := time.Since(start) + if err != nil { + logger.Error().Err(err).Msg("error occurred while GC") + } else { + logger.Debug().Dur("timeTaken", duration).Msg("GC complete") + } + storage.PromGCDurationMilliseconds.Observe(float64(duration.Milliseconds())) + t.Reset(gcInterval) + } + } + }() } -func (s store) StatisticsAware() bool { - return len(s.InfoHashCountQuery) > 0 && len(s.Peer.CountQuery) > 0 +func (s *store) StatisticsAware() bool { + return len(s.InfoHashCountQuery) > 0 } -func (s store) ScheduleStatisticsCollection(reportInterval time.Duration) { - // TODO implement me - panic("implement me") +func (s *store) ScheduleStatisticsCollection(reportInterval time.Duration) { + s.wg.Add(1) + go func() { + defer s.wg.Done() + t := time.NewTicker(reportInterval) + for { + select { + case <-s.closed: + t.Stop() + return + case <-t.C: + if metrics.Enabled() { + before := time.Now() + sc, lc := s.countPeers(bittorrent.NoneInfoHash) + var hc int + row := s.QueryRow(context.Background(), s.InfoHashCountQuery) + if err := row.Scan(&hc); err != nil && !errors.Is(err, pgx.ErrNoRows) { + logger.Error().Err(err).Msg("error occurred while get info hash count") + } + + storage.PromInfoHashesCount.Set(float64(hc)) + storage.PromSeedersCount.Set(float64(sc)) + storage.PromLeechersCount.Set(float64(lc)) + logger.Debug().TimeDiff("timeTaken", time.Now(), before).Msg("populate prom complete") + } + } + } + }() } -func (s store) putPeer(ih bittorrent.InfoHash, peer 
bittorrent.Peer, seeder bool) error { +func (s *store) putPeer(ih bittorrent.InfoHash, peer bittorrent.Peer, seeder bool) error { logger.Trace(). Stringer("infoHash", ih). Object("peer", peer). Bool("seeder", seeder). Msg("put peer") - args := []interface{}{ih, peer.ID, net.IP(peer.Addr().AsSlice()), peer.Port(), seeder, peer.Addr().Is6()} + args := []interface{}{[]byte(ih), peer.ID[:], net.IP(peer.Addr().AsSlice()), peer.Port(), seeder, peer.Addr().Is6()} if s.GCAware() { args = append(args, tc.Now()) } @@ -238,52 +326,83 @@ func (s store) putPeer(ih bittorrent.InfoHash, peer bittorrent.Peer, seeder bool return err } -func (s store) delPeer(ih bittorrent.InfoHash, peer bittorrent.Peer, seeder bool) error { +func (s *store) delPeer(ih bittorrent.InfoHash, peer bittorrent.Peer, seeder bool) error { logger.Trace(). Stringer("infoHash", ih). Object("peer", peer). Bool("seeder", seeder). Msg("del peer") - _, err := s.Exec(context.TODO(), s.Peer.DelQuery, ih, peer.ID, net.IP(peer.Addr().AsSlice()), peer.Port(), seeder) + _, err := s.Exec(context.TODO(), s.Peer.DelQuery, []byte(ih), peer.ID[:], net.IP(peer.Addr().AsSlice()), peer.Port(), seeder) return err } -func (s store) PutSeeder(ih bittorrent.InfoHash, peer bittorrent.Peer) error { +func (s *store) PutSeeder(ih bittorrent.InfoHash, peer bittorrent.Peer) error { return s.putPeer(ih, peer, true) } -func (s store) DeleteSeeder(ih bittorrent.InfoHash, peer bittorrent.Peer) error { +func (s *store) DeleteSeeder(ih bittorrent.InfoHash, peer bittorrent.Peer) error { return s.delPeer(ih, peer, true) } -func (s store) PutLeecher(ih bittorrent.InfoHash, peer bittorrent.Peer) error { +func (s *store) PutLeecher(ih bittorrent.InfoHash, peer bittorrent.Peer) error { return s.putPeer(ih, peer, false) } -func (s store) DeleteLeecher(ih bittorrent.InfoHash, peer bittorrent.Peer) error { +func (s *store) DeleteLeecher(ih bittorrent.InfoHash, peer bittorrent.Peer) error { return s.delPeer(ih, peer, false) } -func (s store) 
GraduateLeecher(ih bittorrent.InfoHash, peer bittorrent.Peer) error { +func (s *store) GraduateLeecher(ih bittorrent.InfoHash, peer bittorrent.Peer) error { logger.Trace(). Stringer("infoHash", ih). Object("peer", peer). Msg("graduate leecher") - _, err := s.Exec(context.TODO(), s.Peer.GraduateQuery, ih, peer.ID, peer.Addr(), peer.Port()) + _, err := s.Exec(context.TODO(), s.Peer.GraduateQuery, []byte(ih), peer.ID[:], net.IP(peer.Addr().AsSlice()), peer.Port()) return err } -func (s store) getPeers(ih bittorrent.InfoHash, seeders bool, maxCount int, isV6 bool) (peers []bittorrent.Peer, err error) { +func (s *store) getPeers(ih bittorrent.InfoHash, seeders bool, maxCount int, isV6 bool) (peers []bittorrent.Peer, err error) { var rows pgx.Rows - if rows, err = s.Query(context.TODO(), s.Announce.Query, ih, isV6, seeders, maxCount); err == nil { + if rows, err = s.Query(context.TODO(), s.Announce.Query, []byte(ih), isV6, seeders, maxCount); err == nil { defer rows.Close() + idIndex, ipIndex, portIndex := -1, -1, -1 + for i, field := range rows.FieldDescriptions() { + name := strings.ToUpper(string(field.Name)) + switch name { + case s.Announce.PeerIDColumn: + idIndex = i + case s.Announce.AddressColumn: + ipIndex = i + case s.Announce.PortColumn: + portIndex = i + } + } + if idIndex < 0 || ipIndex < 0 || portIndex < 0 { + err = fmt.Errorf(errRequiredColumnsNotFoundMsg, []string{s.Announce.PeerIDColumn, s.Announce.AddressColumn, s.Announce.PortColumn}) + return + } + var maxIndex int + switch { + case idIndex >= ipIndex && idIndex >= portIndex: + maxIndex = idIndex + break + case ipIndex >= idIndex && ipIndex >= portIndex: + maxIndex = ipIndex + break + case portIndex >= idIndex && portIndex >= ipIndex: + maxIndex = portIndex + break + } + for rows.Next() && len(peers) < maxCount { var peer bittorrent.Peer var id []byte var ip net.IP var port int + into := make([]interface{}, maxIndex+1) + into[idIndex], into[ipIndex], into[portIndex] = &id, &ip, &port - if err = rows.Scan(&id, &ip, 
&port); err == nil { + if err = rows.Scan(into...); err == nil { if peer.ID, err = bittorrent.NewPeerID(id); err == nil { if netAddr, isOk := netip.AddrFromSlice(ip); isOk { peer.AddrPort = netip.AddrPortFrom(netAddr, uint16(port)) @@ -307,7 +426,7 @@ func (s store) getPeers(ih bittorrent.InfoHash, seeders bool, maxCount int, isV6 return } -func (s store) AnnouncePeers(ih bittorrent.InfoHash, forSeeder bool, numWant int, v6 bool) (peers []bittorrent.Peer, err error) { +func (s *store) AnnouncePeers(ih bittorrent.InfoHash, forSeeder bool, numWant int, v6 bool) (peers []bittorrent.Peer, err error) { logger.Trace(). Stringer("infoHash", ih). Bool("forSeeder", forSeeder). @@ -336,12 +455,59 @@ func (s store) AnnouncePeers(ih bittorrent.InfoHash, forSeeder bool, numWant int return } -func (s store) ScrapeSwarm(ih bittorrent.InfoHash) (leechers uint32, seeders uint32, snatched uint32) { - // TODO implement me - panic("implement me") +func (s *store) countPeers(ih bittorrent.InfoHash) (leechers int, seeders int) { + var rows pgx.Rows + var err error + if ih == bittorrent.NoneInfoHash { + rows, err = s.Query(context.TODO(), s.Peer.CountQuery) + } else { + rows, err = s.Query(context.TODO(), s.Peer.CountQuery+" "+s.Peer.ByInfoHashClause, []byte(ih)) + } + if err == nil { + defer rows.Close() + if rows.Next() { + si, li := -1, -1 + for i, field := range rows.FieldDescriptions() { + name := strings.ToUpper(string(field.Name)) + switch name { + case s.Peer.CountSeedersColumn: + si = i + case s.Peer.CountLeechersColumn: + li = i + } + } + if si < 0 || li < 0 { + err = fmt.Errorf(errRequiredColumnsNotFoundMsg, []string{s.Peer.CountSeedersColumn, s.Peer.CountLeechersColumn}) + } else { + var mi int + if si > li { + mi = si + } else { + mi = li + } + into := make([]interface{}, mi+1) + into[si], into[li] = &seeders, &leechers + + err = rows.Scan(into...) 
+ } + } + } + if err != nil { + logger.Error().Err(err).Stringer("infoHash", ih).Msg("unable to get peers count") + } + return } -func (s store) Ping() error { +func (s *store) ScrapeSwarm(ih bittorrent.InfoHash) (leechers uint32, seeders uint32, snatched uint32) { + logger.Trace(). + Stringer("infoHash", ih). + Msg("scrape swarm") + sc, lc := s.countPeers(ih) + seeders, leechers = uint32(sc), uint32(lc) + return +} + +func (s *store) Ping() error { _, err := s.Exec(context.TODO(), s.PingQuery) return err } @@ -352,6 +518,6 @@ func (s *store) Stop() stop.Result { return c.Result() } -func (s store) MarshalZerologObject(e *zerolog.Event) { - e.Str("type", Name).Object("config", *s.Config) +func (s *store) MarshalZerologObject(e *zerolog.Event) { + e.Str("type", Name).Object("config", s.Config) } diff --git a/storage/redis/storage.go b/storage/redis/storage.go index 9274899..a77d9d4 100644 --- a/storage/redis/storage.go +++ b/storage/redis/storage.go @@ -319,7 +319,7 @@ func (ps *store) count(key string, getLength bool) (n uint64) { } err = AsNil(err) if err != nil { - logger.Error().Err(err).Str("key", key).Msg("storage: Redis: GET/SCARD failure") + logger.Error().Err(err).Str("key", key).Msg("GET/SCARD failure") } return } @@ -793,7 +793,7 @@ func (ps *store) Stop() stop.Result { ps.wg.Wait() var err error if ps.UniversalClient != nil { - logger.Info().Msg("storage: Redis: exiting. mochi does not clear data in redis when exiting. mochi keys have prefix " + PrefixKey) + logger.Info().Msg("redis exiting. mochi does not clear data in redis when exiting. mochi keys have prefix " + PrefixKey) err = ps.UniversalClient.Close() ps.UniversalClient = nil }