(minor) refactor redis GC and fix TX calls,

add indicator that metrics are running
This commit is contained in:
Lawrence, Rendall
2022-04-17 00:57:30 +03:00
parent 7716aa828a
commit f9c72341c0
12 changed files with 169 additions and 137 deletions
+4 -4
View File
@@ -16,10 +16,10 @@ jobs:
- uses: "actions/setup-go@v3"
with:
go-version: "^1.18"
- uses: "authzed/actions/gofumpt@main"
- uses: "authzed/actions/go-mod-tidy@main"
- uses: "authzed/actions/go-generate@main"
- uses: "authzed/actions/golangci-lint@main"
- uses: "authzed/actions/gofumpt@main"
- uses: "authzed/actions/go-mod-tidy@main"
- uses: "authzed/actions/go-generate@main"
- uses: "authzed/actions/golangci-lint@main"
codeql:
name: "Analyze with CodeQL"
+8 -11
View File
@@ -33,17 +33,14 @@ type Params interface {
RawQuery() string
}
// ErrKeyNotFound is returned when a provided key has no value associated with
// it.
var ErrKeyNotFound = errors.New("query: value for the provided key does not exist")
// ErrInvalidInfohash is returned when parsing a query encounters an infohash
// with invalid length.
// var ErrInvalidInfohash = ClientError("provided invalid infohash")
// ErrInvalidQueryEscape is returned when a query string contains invalid
// escapes.
var ErrInvalidQueryEscape = ClientError("invalid query escape")
var (
// ErrKeyNotFound is returned when a provided key has no value associated with
// it.
ErrKeyNotFound = errors.New("query: value for the provided key does not exist")
// ErrInvalidQueryEscape is returned when a query string contains invalid
// escapes.
ErrInvalidQueryEscape = ClientError("invalid query escape")
)
// QueryParams parses a URL Query and implements the Params interface with some
// additional helpers.
+38 -38
View File
@@ -139,10 +139,10 @@ mochi:
# are collected and posted to Prometheus.
prometheus_reporting_interval: 1s
# This block defines configuration used for redis storage.
#storage:
#name: redis
#config:
# This block defines configuration used for redis storage.
#storage:
#name: redis
#config:
# The frequency which stale peers are removed.
# This balances between
# - collecting garbage more often, potentially using more CPU time, but potentially using less memory (lower value)
@@ -195,38 +195,38 @@ mochi:
# This block defines configuration used for middleware executed before a
# response has been returned to a BitTorrent client.
prehooks:
# - name: jwt
# options:
# issuer: "https://issuer.com"
# audience: "https://some.issuer.com"
# jwk_set_url: "https://issuer.com/keys"
# jwk_set_update_interval: 5m
#
# - name: client approval
# options:
# whitelist:
# - "OP1011"
# blacklist:
# - "OP1012"
#
# - name: interval variation
# options:
# modify_response_probability: 0.2
# max_increase_delta: 60
# modify_min_interval: true
#
# This block defines configuration used for torrent approval, it requires to be given
# hashes for whitelist or for blacklist. Hashes are hexadecimal-encoaded.
# - name: torrent approval
# options:
# initial_source: list
# Save data provided by source in storage above
# preserve: false
# configuration:
# hash_list:
# - "a1b2c3d4e5a1b2c3d4e5a1b2c3d4e5a1b2c3d4e5"
# true - whitelist mode, false - blacklist
# invert: false
# Name of storage context where store hash list
# storage_ctx: APPROVED_HASH
# - name: jwt
# options:
# issuer: "https://issuer.com"
# audience: "https://some.issuer.com"
# jwk_set_url: "https://issuer.com/keys"
# jwk_set_update_interval: 5m
#
# - name: client approval
# options:
# whitelist:
# - "OP1011"
# blacklist:
# - "OP1012"
#
# - name: interval variation
# options:
# modify_response_probability: 0.2
# max_increase_delta: 60
# modify_min_interval: true
#
# This block defines configuration used for torrent approval, it requires to be given
# hashes for whitelist or for blacklist. Hashes are hexadecimal-encoaded.
# - name: torrent approval
# options:
# initial_source: list
# Save data provided by source in storage above
# preserve: false
# configuration:
# hash_list:
# - "a1b2c3d4e5a1b2c3d4e5a1b2c3d4e5a1b2c3d4e5"
# true - whitelist mode, false - blacklist
# invert: false
# Name of storage context where store hash list
# storage_ctx: APPROVED_HASH
posthooks:
+2 -1
View File
@@ -5,7 +5,8 @@ Redis. Multiple instances of MoChi can use the same redis instance concurrently.
clustering. If one instance of MoChi goes down, peer data will still be available in Redis.
The HA of storage service is not considered here. In case Redis runs as a single node, peer data will be unavailable if
the node is down. You should consider setting up a Redis sentinel (or KeyDB active-active replication) for MoChi in production.
the node is down. You should consider setting up a Redis sentinel (or KeyDB active-active replication) for MoChi in
production.
This storage implementation is currently orders of magnitude slower than the in-memory implementation.
+17 -20
View File
@@ -17,6 +17,7 @@ import (
"github.com/sot-tech/mochi/frontend"
"github.com/sot-tech/mochi/pkg/conf"
"github.com/sot-tech/mochi/pkg/log"
"github.com/sot-tech/mochi/pkg/metrics"
"github.com/sot-tech/mochi/pkg/stop"
)
@@ -285,17 +286,15 @@ func injectRouteParamsToContext(ctx context.Context, ps httprouter.Params) conte
func (f *Frontend) announceRoute(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
var err error
var start time.Time
if f.EnableRequestTiming {
start = time.Now()
}
var addr netip.Addr
defer func() {
if f.EnableRequestTiming {
recordResponseDuration("announce", addr, err, time.Since(start))
} else {
recordResponseDuration("announce", addr, err, time.Duration(0))
}
}()
if f.EnableRequestTiming && metrics.Enabled() {
start = time.Now()
defer func() {
if f.EnableRequestTiming && metrics.Enabled() {
recordResponseDuration("announce", addr, err, time.Since(start))
}
}()
}
req, err := ParseAnnounce(r, f.ParseOptions)
if err != nil {
@@ -325,17 +324,15 @@ func (f *Frontend) announceRoute(w http.ResponseWriter, r *http.Request, ps http
func (f *Frontend) scrapeRoute(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
var err error
var start time.Time
if f.EnableRequestTiming {
start = time.Now()
}
var addr netip.Addr
defer func() {
if f.EnableRequestTiming {
recordResponseDuration("scrape", addr, err, time.Since(start))
} else {
recordResponseDuration("scrape", addr, err, time.Duration(0))
}
}()
if f.EnableRequestTiming && metrics.Enabled() {
start = time.Now()
defer func() {
if f.EnableRequestTiming && metrics.Enabled() {
recordResponseDuration("scrape", addr, err, time.Since(start))
}
}()
}
req, err := ParseScrape(r, f.ParseOptions)
if err != nil {
+1 -1
View File
@@ -79,7 +79,7 @@ func ParseAnnounce(r *http.Request, opts ParseOptions) (*bittorrent.AnnounceRequ
}
request.Peer.ID, err = bittorrent.NewPeerID([]byte(peerID))
if err != nil {
return nil, err
return nil, errInvalidPeerID
}
// Determine the number of remaining bytes for the client.
request.Left, err = qp.Uint("left", 64)
+3 -4
View File
@@ -18,6 +18,7 @@ import (
"github.com/sot-tech/mochi/frontend/udp/bytepool"
"github.com/sot-tech/mochi/pkg/conf"
"github.com/sot-tech/mochi/pkg/log"
"github.com/sot-tech/mochi/pkg/metrics"
"github.com/sot-tech/mochi/pkg/stop"
"github.com/sot-tech/mochi/pkg/timecache"
)
@@ -214,17 +215,15 @@ func (t *Frontend) serve() error {
// Handle the request.
addr := addrPort.Addr().Unmap()
var start time.Time
if t.EnableRequestTiming {
if t.EnableRequestTiming && metrics.Enabled() {
start = time.Now()
}
action, err := t.handleRequest(
Request{(*buffer)[:n], addr},
ResponseWriter{t.socket, addrPort},
)
if t.EnableRequestTiming {
if t.EnableRequestTiming && metrics.Enabled() {
recordResponseDuration(action, addr, err, time.Since(start))
} else {
recordResponseDuration(action, addr, err, time.Duration(0))
}
}()
}
+4 -2
View File
@@ -45,6 +45,8 @@ var (
errUnknownAction = bittorrent.ClientError("unknown action ID")
errBadConnectionID = bittorrent.ClientError("bad connection ID")
errUnknownOptionType = bittorrent.ClientError("unknown option type")
errInvalidInfoHash = bittorrent.ClientError("invalid info hash")
errInvalidPeerID = bittorrent.ClientError("invalid info hash")
)
// ParseOptions is the configuration used to parse an Announce Request.
@@ -117,12 +119,12 @@ func ParseAnnounce(r Request, v6Action bool, opts ParseOptions) (*bittorrent.Ann
ih, err := bittorrent.NewInfoHash(infohash)
if err != nil {
return nil, err
return nil, errInvalidInfoHash
}
peerID, err := bittorrent.NewPeerID(peerIDBytes)
if err != nil {
return nil, err
return nil, errInvalidPeerID
}
request := &bittorrent.AnnounceRequest{
+10
View File
@@ -8,6 +8,7 @@ import (
"net/http"
"net/http/pprof"
"net/netip"
"sync/atomic"
"github.com/prometheus/client_golang/prometheus/promhttp"
@@ -15,6 +16,13 @@ import (
"github.com/sot-tech/mochi/pkg/stop"
)
var serverCounter = new(int32)
// Enabled indicates that configured at least one metrics server
func Enabled() bool {
return atomic.LoadInt32(serverCounter) > 0
}
// Server represents a standalone HTTP server for serving a Prometheus metrics
// endpoint.
type Server struct {
@@ -63,6 +71,8 @@ func NewServer(addr string) *Server {
}
go func() {
atomic.AddInt32(serverCounter, 1)
defer atomic.AddInt32(serverCounter, -1)
if err := s.srv.ListenAndServe(); !errors.Is(err, http.ErrServerClosed) {
log.Fatal("failed while serving prometheus", log.Err(err))
}
+6 -3
View File
@@ -15,6 +15,7 @@ import (
"github.com/sot-tech/mochi/bittorrent"
"github.com/sot-tech/mochi/pkg/conf"
"github.com/sot-tech/mochi/pkg/log"
"github.com/sot-tech/mochi/pkg/metrics"
"github.com/sot-tech/mochi/pkg/stop"
"github.com/sot-tech/mochi/pkg/timecache"
"github.com/sot-tech/mochi/storage"
@@ -157,9 +158,11 @@ func NewPeerStorage(provided Config) (storage.PeerStorage, error) {
t.Stop()
return
case <-t.C:
before := time.Now()
ps.populateProm()
log.Debug("storage: populateProm() finished", log.Fields{"timeTaken": time.Since(before)})
if metrics.Enabled() {
before := time.Now()
ps.populateProm()
log.Debug("storage: populateProm() finished", log.Fields{"timeTaken": time.Since(before)})
}
}
}
}()
+74 -51
View File
@@ -33,6 +33,7 @@ import (
"github.com/sot-tech/mochi/bittorrent"
"github.com/sot-tech/mochi/pkg/conf"
"github.com/sot-tech/mochi/pkg/log"
"github.com/sot-tech/mochi/pkg/metrics"
"github.com/sot-tech/mochi/pkg/stop"
"github.com/sot-tech/mochi/pkg/timecache"
"github.com/sot-tech/mochi/storage"
@@ -293,7 +294,7 @@ func (ps *store) scheduleGC(gcInterval, peerLifeTime time.Duration) {
start := time.Now()
ps.GC(time.Now().Add(-peerLifeTime))
duration := time.Since(start).Milliseconds()
log.Debug("storage: recordGCDuration", log.Fields{"timeTaken(ms)": duration})
log.Debug("storage: Redis: recordGCDuration", log.Fields{"timeTaken(ms)": duration})
storage.PromGCDurationMilliseconds.Observe(float64(duration))
t.Reset(gcInterval)
}
@@ -309,9 +310,11 @@ func (ps *store) schedulerProm(reportInterval time.Duration) {
t.Stop()
return
case <-t.C:
before := time.Now()
ps.populateProm()
log.Debug("storage: populateProm() finished", log.Fields{"timeTaken": time.Since(before)})
if metrics.Enabled() {
before := time.Now()
ps.populateProm()
log.Debug("storage: Redis: populateProm() finished", log.Fields{"timeTaken": time.Since(before)})
}
}
}
}
@@ -334,7 +337,7 @@ func (ps *store) count(key string, getLength bool) (n uint64) {
}
err = asNil(err)
if err != nil {
log.Error("storage: len/counter failure", log.Fields{
log.Error("storage: Redis: GET/SCARD failure", log.Fields{
"key": key,
"error": err,
})
@@ -396,7 +399,7 @@ func asNil(err error) error {
func (ps *store) PutSeeder(ih bittorrent.InfoHash, peer bittorrent.Peer) error {
var ihSummaryKey, ihPeerKey, cntPeerKey string
log.Debug("storage: PutSeeder", log.Fields{
log.Debug("storage: Redis: PutSeeder", log.Fields{
"InfoHash": ih,
"Peer": peer,
})
@@ -411,17 +414,17 @@ func (ps *store) PutSeeder(ih bittorrent.InfoHash, peer bittorrent.Peer) error {
if err = tx.HSet(ps.ctx, ihPeerKey, peer.RawString(), ps.getClock()).Err(); err != nil {
return
}
if err = ps.con.Incr(ps.ctx, cntPeerKey).Err(); err != nil {
if err = tx.Incr(ps.ctx, cntPeerKey).Err(); err != nil {
return
}
err = ps.con.SAdd(ps.ctx, ihSummaryKey, ihPeerKey).Err()
err = tx.SAdd(ps.ctx, ihSummaryKey, ihPeerKey).Err()
return
})
}
func (ps *store) DeleteSeeder(ih bittorrent.InfoHash, peer bittorrent.Peer) error {
var ihPeerKey, cntPeerKey string
log.Debug("storage: DeleteSeeder", log.Fields{
log.Debug("storage: Redis: DeleteSeeder", log.Fields{
"InfoHash": ih,
"Peer": peer,
})
@@ -447,7 +450,7 @@ func (ps *store) DeleteSeeder(ih bittorrent.InfoHash, peer bittorrent.Peer) erro
func (ps *store) PutLeecher(ih bittorrent.InfoHash, peer bittorrent.Peer) error {
var ihSummaryKey, ihPeerKey, cntPeerKey string
log.Debug("storage: PutLeecher", log.Fields{
log.Debug("storage: Redis: PutLeecher", log.Fields{
"InfoHash": ih,
"Peer": peer,
})
@@ -472,7 +475,7 @@ func (ps *store) PutLeecher(ih bittorrent.InfoHash, peer bittorrent.Peer) error
func (ps *store) DeleteLeecher(ih bittorrent.InfoHash, peer bittorrent.Peer) error {
var ihPeerKey, cntPeerKey string
log.Debug("storage: DeleteLeecher", log.Fields{
log.Debug("storage: Redis: DeleteLeecher", log.Fields{
"InfoHash": ih,
"Peer": peer,
})
@@ -499,7 +502,7 @@ func (ps *store) DeleteLeecher(ih bittorrent.InfoHash, peer bittorrent.Peer) err
func (ps *store) GraduateLeecher(ih bittorrent.InfoHash, peer bittorrent.Peer) error {
var ihSummaryKey, ihSeederKey, ihLeecherKey, cntSeederKey, cntLeecherKey string
log.Debug("storage: GraduateLeecher", log.Fields{
log.Debug("storage: Redis: GraduateLeecher", log.Fields{
"InfoHash": ih,
"Peer": peer,
})
@@ -537,7 +540,7 @@ func (ps *store) GraduateLeecher(ih bittorrent.InfoHash, peer bittorrent.Peer) e
func (ps *store) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant int, peer bittorrent.Peer) (peers []bittorrent.Peer, err error) {
var ihSeederKey, ihLeecherKey string
log.Debug("storage: AnnouncePeers", log.Fields{
log.Debug("storage: Redis: AnnouncePeers", log.Fields{
"InfoHash": ih,
"seeder": seeder,
"numWant": numWant,
@@ -578,7 +581,7 @@ func (ps *store) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant int,
peers = append(peers, p)
numWant--
} else {
log.Error("storage: unable to decode leecher", log.Fields{"peer": peerKey})
log.Error("storage: Redis: unable to decode leecher", log.Fields{"peer": peerKey})
}
}
} else {
@@ -591,7 +594,7 @@ func (ps *store) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant int,
peers = append(peers, p)
numWant--
} else {
log.Error("storage: unable to decode seeder", log.Fields{"peer": peerKey})
log.Error("storage: Redis: to decode seeder", log.Fields{"peer": peerKey})
}
}
@@ -607,7 +610,7 @@ func (ps *store) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant int,
peers = append(peers, p)
numWant--
} else {
log.Error("storage: unable to decode leecher", log.Fields{"peer": peerKey})
log.Error("storage: Redis: unable to decode leecher", log.Fields{"peer": peerKey})
}
}
}
@@ -619,7 +622,7 @@ func (ps *store) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant int,
func (ps *store) ScrapeSwarm(ih bittorrent.InfoHash, peer bittorrent.Peer) (resp bittorrent.Scrape) {
var ihSeederKey, ihLeecherKey string
log.Debug("storage: ScrapeSwarm", log.Fields{
log.Debug("storage: Redis ScrapeSwarm", log.Fields{
"InfoHash": ih,
"Peer": peer,
})
@@ -635,7 +638,7 @@ func (ps *store) ScrapeSwarm(ih bittorrent.InfoHash, peer bittorrent.Peer) (resp
leechersLen, err := ps.con.HLen(ps.ctx, ihLeecherKey).Result()
err = asNil(err)
if err != nil {
log.Error("storage: Redis HLEN failure", log.Fields{
log.Error("storage: Redis: HLEN failure", log.Fields{
"Hkey": ihLeecherKey,
"error": err,
})
@@ -645,7 +648,7 @@ func (ps *store) ScrapeSwarm(ih bittorrent.InfoHash, peer bittorrent.Peer) (resp
seedersLen, err := ps.con.HLen(ps.ctx, ihSeederKey).Result()
err = asNil(err)
if err != nil {
log.Error("storage: Redis HLEN failure", log.Fields{
log.Error("storage: Redis: HLEN failure", log.Fields{
"Hkey": ihSeederKey,
"error": err,
})
@@ -672,7 +675,7 @@ func (ps *store) Put(ctx string, values ...storage.Entry) (err error) {
err = ps.con.HSet(ps.ctx, prefixKey+ctx, args...).Err()
if err != nil {
if strings.Contains(err.Error(), argNumErrorMsg) {
log.Warn("This REDIS version/implementation does not support variadic arguments for HSET")
log.Warn("This Redis version/implementation does not support variadic arguments for HSET")
for _, p := range values {
if err = ps.con.HSet(ps.ctx, prefixKey+ctx, p.Key, p.Value).Err(); err != nil {
break
@@ -703,7 +706,7 @@ func (ps *store) Delete(ctx string, keys ...string) (err error) {
err = asNil(ps.con.HDel(ps.ctx, prefixKey+ctx, keys...).Err())
if err != nil {
if strings.Contains(err.Error(), argNumErrorMsg) {
log.Warn("This REDIS version/implementation does not support variadic arguments for HDEL")
log.Warn("This Redis version/implementation does not support variadic arguments for HDEL")
for _, k := range keys {
if err = asNil(ps.con.HDel(ps.ctx, prefixKey+ctx, k).Err()); err != nil {
break
@@ -720,7 +723,7 @@ func (*store) Preservable() bool {
}
func (ps *store) GC(cutoff time.Time) {
log.Debug("storage: purging peers with no announces since", log.Fields{"before": cutoff})
log.Debug("storage: Redis: purging peers with no announces since", log.Fields{"before": cutoff})
cutoffUnix := cutoff.UnixNano()
ps.gc(cutoffUnix, false)
ps.gc(cutoffUnix, true)
@@ -802,43 +805,63 @@ func (ps *store) gc(cutoffNanos int64, v6 bool) {
peerList, err := ps.con.HGetAll(ps.ctx, infoHashKey).Result()
err = asNil(err)
if err == nil {
var removedPeerCount int64
peersToRemove := make([]string, 0)
for peerKey, timeStamp := range peerList {
var peer bittorrent.Peer
if peer, err = bittorrent.NewPeer(peerKey); err == nil {
if mtime, err := strconv.ParseInt(timeStamp, 10, 64); err == nil {
if mtime <= cutoffNanos {
log.Debug("storage: Redis: deleting peer", log.Fields{
"Peer": peer,
})
var count int64
count, err = ps.con.HDel(ps.ctx, infoHashKey, peerKey).Result()
err = asNil(err)
if err == nil {
removedPeerCount += count
}
}
if mtime, err := strconv.ParseInt(timeStamp, 10, 64); err == nil {
if mtime <= cutoffNanos {
log.Debug("storage: Redis: adding peer to remove list", log.Fields{
"key": peerKey,
})
peersToRemove = append(peersToRemove, peerKey)
}
}
if err != nil {
log.Error("storage: Redis: unable to delete info hash peer", log.Fields{
} else {
log.Error("storage: Redis: unable to decode peer timestamp", log.Fields{
"hashSet": ihSummaryKey,
"infoHashKey": infoHashKey,
"peer": peer,
"key": peerKey,
"timestamp": timeStamp,
"error": err,
})
}
}
// DECR seeder/leecher counter
if removedPeerCount > 0 {
if err := ps.con.DecrBy(ps.ctx, cntKey, removedPeerCount).Err(); err != nil {
log.Error("storage: Redis: unable to decrement seeder/leecher peer count", log.Fields{
"hashSet": ihSummaryKey,
"infoHashKey": infoHashKey,
"key": cntKey,
"error": err,
})
if len(peersToRemove) > 0 {
removedPeerCount, err := ps.con.HDel(ps.ctx, infoHashKey, peersToRemove...).Result()
err = asNil(err)
if err != nil {
if strings.Contains(err.Error(), argNumErrorMsg) {
log.Warn("This Redis version/implementation does not support variadic arguments for HDEL")
for _, k := range peersToRemove {
count, err := ps.con.HDel(ps.ctx, infoHashKey, k).Result()
err = asNil(err)
if err != nil {
log.Error("storage: Redis: unable to delete peer", log.Fields{
"hashSet": ihSummaryKey,
"infoHashKey": infoHashKey,
"key": k,
"error": err,
})
} else {
removedPeerCount += count
}
}
} else {
log.Error("storage: Redis: unable to delete peers", log.Fields{
"hashSet": ihSummaryKey,
"infoHashKey": infoHashKey,
"keys": peersToRemove,
"error": err,
})
}
}
if removedPeerCount > 0 { // DECR seeder/leecher counter
if err = ps.con.DecrBy(ps.ctx, cntKey, removedPeerCount).Err(); err != nil {
log.Error("storage: Redis: unable to decrement seeder/leecher peer count", log.Fields{
"hashSet": ihSummaryKey,
"infoHashKey": infoHashKey,
"key": cntKey,
"error": err,
})
}
}
}
@@ -883,7 +906,7 @@ func (ps *store) Stop() stop.Result {
ps.wg.Wait()
var err error
if ps.con != nil {
log.Info("storage: exiting. mochi does not clear data in redis when exiting. mochi keys have prefix 'IPv{4,6}_'.")
log.Info("storage: Redis: exiting. mochi does not clear data in redis when exiting. mochi keys have prefix " + prefixKey)
err = ps.con.Close()
}
c.Done(err)
+2 -2
View File
@@ -18,7 +18,7 @@ import (
type benchData struct {
infoHashes [1000]bittorrent.InfoHash
peers [10000]bittorrent.Peer
peers [1000]bittorrent.Peer
}
func generateInfoHashes() (a [1000]bittorrent.InfoHash) {
@@ -28,7 +28,7 @@ func generateInfoHashes() (a [1000]bittorrent.InfoHash) {
return
}
func generatePeers() (a [10000]bittorrent.Peer) {
func generatePeers() (a [1000]bittorrent.Peer) {
for i := range a {
var ip []byte
if rand.Int63()%2 == 0 {