mirror of
https://github.com/sot-tech/mochi.git
synced 2026-06-11 07:23:30 -07:00
(tested) add hooks check when ping http route called
This commit is contained in:
Vendored
+2
@@ -66,6 +66,8 @@ mochi:
|
||||
# An array of routes to listen ping requests.
|
||||
# Used just to ensure if server is operational. Returns nothing,
|
||||
# just HTTP 200 without body. Listens both GET and HEAD HTTP methods.
|
||||
# HEAD method just checks http server, GET checks all hooks,
|
||||
# which support ping
|
||||
ping_routes:
|
||||
- "/ping"
|
||||
|
||||
|
||||
@@ -30,4 +30,7 @@ type TrackerLogic interface {
|
||||
|
||||
// AfterScrape does something with the results of a Scrape after it has been completed.
|
||||
AfterScrape(context.Context, *bittorrent.ScrapeRequest, *bittorrent.ScrapeResponse)
|
||||
|
||||
// Ping executes checks if all hooks are operational
|
||||
Ping(context.Context) error
|
||||
}
|
||||
|
||||
@@ -308,6 +308,15 @@ func (f *Frontend) scrapeRoute(w http.ResponseWriter, r *http.Request, ps httpro
|
||||
go f.logic.AfterScrape(ctx, req, resp)
|
||||
}
|
||||
|
||||
func (f Frontend) ping(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
|
||||
w.WriteHeader(http.StatusOK)
|
||||
func (f Frontend) ping(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
|
||||
var err error
|
||||
if r.Method == http.MethodGet {
|
||||
err = f.logic.Ping(context.Background())
|
||||
}
|
||||
if err == nil {
|
||||
w.WriteHeader(http.StatusOK)
|
||||
} else {
|
||||
logger.Error().Err(err).Msg("ping completed with error")
|
||||
w.WriteHeader(http.StatusServiceUnavailable)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -18,6 +18,16 @@ type Hook interface {
|
||||
HandleScrape(context.Context, *bittorrent.ScrapeRequest, *bittorrent.ScrapeResponse) (context.Context, error)
|
||||
}
|
||||
|
||||
// Pinger is an optional interface that may be implemented by a pre Hook
|
||||
// to check if it is operational. Used in frontend.TrackerLogic.
|
||||
//
|
||||
// It may be useful in cases when Hook performs foreign requests to
|
||||
// some external resources (i.e. storage) and `ping` request should
|
||||
// also check resource availability.
|
||||
type Pinger interface {
|
||||
Ping(ctx context.Context) error
|
||||
}
|
||||
|
||||
type skipSwarmInteraction struct{}
|
||||
|
||||
// SkipSwarmInteractionKey is a key for the context of an Announce to control
|
||||
@@ -205,3 +215,7 @@ func (h *responseHook) HandleScrape(ctx context.Context, req *bittorrent.ScrapeR
|
||||
|
||||
return ctx, nil
|
||||
}
|
||||
|
||||
func (h *responseHook) Ping(_ context.Context) error {
|
||||
return h.store.Ping()
|
||||
}
|
||||
|
||||
+19
-1
@@ -19,12 +19,19 @@ var (
|
||||
// NewLogic creates a new instance of a TrackerLogic that executes the provided
|
||||
// middleware hooks.
|
||||
func NewLogic(annInterval, minAnnInterval time.Duration, peerStore storage.PeerStorage, preHooks, postHooks []Hook) *Logic {
|
||||
return &Logic{
|
||||
l := &Logic{
|
||||
announceInterval: annInterval,
|
||||
minAnnounceInterval: minAnnInterval,
|
||||
preHooks: append(preHooks, &responseHook{store: peerStore}),
|
||||
postHooks: append(postHooks, &swarmInteractionHook{store: peerStore}),
|
||||
pingers: make([]Pinger, 0, 1),
|
||||
}
|
||||
for _, h := range l.preHooks {
|
||||
if ph, isOk := h.(Pinger); isOk {
|
||||
l.pingers = append(l.pingers, ph)
|
||||
}
|
||||
}
|
||||
return l
|
||||
}
|
||||
|
||||
// Logic is an implementation of the TrackerLogic that functions by
|
||||
@@ -34,6 +41,7 @@ type Logic struct {
|
||||
minAnnounceInterval time.Duration
|
||||
preHooks []Hook
|
||||
postHooks []Hook
|
||||
pingers []Pinger
|
||||
}
|
||||
|
||||
// HandleAnnounce generates a response for an Announce.
|
||||
@@ -101,6 +109,16 @@ func (l *Logic) AfterScrape(ctx context.Context, req *bittorrent.ScrapeRequest,
|
||||
}
|
||||
}
|
||||
|
||||
// Ping performs check if all Hook-s are operational
|
||||
func (l *Logic) Ping(ctx context.Context) (err error) {
|
||||
for _, p := range l.pingers {
|
||||
if err = p.Ping(ctx); err != nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Stop stops the Logic.
|
||||
//
|
||||
// This stops any hooks that implement stop.Stopper.
|
||||
|
||||
@@ -557,6 +557,10 @@ func (ps *peerStore) gc(cutoff time.Time) {
|
||||
}
|
||||
}
|
||||
|
||||
func (*peerStore) Ping() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ps *peerStore) Stop() stop.Result {
|
||||
c := make(stop.Channel)
|
||||
go func() {
|
||||
|
||||
@@ -458,7 +458,7 @@ func (ps *store) GraduateLeecher(ih bittorrent.InfoHash, peer bittorrent.Peer) e
|
||||
})
|
||||
}
|
||||
|
||||
func (ps Connection) parsePeersList(peersResult *redis.StringSliceCmd) (peers []bittorrent.Peer, err error) {
|
||||
func (ps *Connection) parsePeersList(peersResult *redis.StringSliceCmd) (peers []bittorrent.Peer, err error) {
|
||||
var peerIds []string
|
||||
peerIds, err = peersResult.Result()
|
||||
if err = AsNil(err); err == nil {
|
||||
@@ -479,7 +479,7 @@ type getPeersFn func(context.Context, string, int) *redis.StringSliceCmd
|
||||
// converts result to bittorrent.Peer array.
|
||||
// If forSeeder set to true - returns only leechers, if false -
|
||||
// seeders and if maxCount not reached - leechers.
|
||||
func (ps Connection) GetPeers(ih bittorrent.InfoHash, forSeeder bool, maxCount int, isV6 bool, membersFn getPeersFn) (out []bittorrent.Peer, err error) {
|
||||
func (ps *Connection) GetPeers(ih bittorrent.InfoHash, forSeeder bool, maxCount int, isV6 bool, membersFn getPeersFn) (out []bittorrent.Peer, err error) {
|
||||
infoHash := ih.RawString()
|
||||
|
||||
infoHashKeys := make([]string, 1, 2)
|
||||
@@ -528,7 +528,7 @@ func (ps *store) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant int,
|
||||
|
||||
type getPeerCountFn func(context.Context, string) *redis.IntCmd
|
||||
|
||||
func (ps Connection) countPeers(infoHashKey string, countFn getPeerCountFn) uint32 {
|
||||
func (ps *Connection) countPeers(infoHashKey string, countFn getPeerCountFn) uint32 {
|
||||
count, err := countFn(context.TODO(), infoHashKey).Result()
|
||||
err = AsNil(err)
|
||||
if err != nil {
|
||||
@@ -538,7 +538,7 @@ func (ps Connection) countPeers(infoHashKey string, countFn getPeerCountFn) uint
|
||||
}
|
||||
|
||||
// CountPeers calls provided countFn and returns seeders and leechers count for specified info hash
|
||||
func (ps Connection) CountPeers(ih bittorrent.InfoHash, countFn getPeerCountFn) (leechersCount, seedersCount uint32) {
|
||||
func (ps *Connection) CountPeers(ih bittorrent.InfoHash, countFn getPeerCountFn) (leechersCount, seedersCount uint32) {
|
||||
infoHash := ih.RawString()
|
||||
|
||||
leechersCount = ps.countPeers(InfoHashKey(infoHash, false, false), countFn) +
|
||||
@@ -562,7 +562,7 @@ func (ps *store) ScrapeSwarm(ih bittorrent.InfoHash) (leechers uint32, seeders u
|
||||
const argNumErrorMsg = "ERR wrong number of arguments"
|
||||
|
||||
// Put - storage.DataStorage implementation
|
||||
func (ps Connection) Put(ctx string, values ...storage.Entry) (err error) {
|
||||
func (ps *Connection) Put(ctx string, values ...storage.Entry) (err error) {
|
||||
if l := len(values); l > 0 {
|
||||
if l == 1 {
|
||||
err = ps.HSet(context.TODO(), PrefixKey+ctx, values[0].Key, values[0].Value).Err()
|
||||
@@ -588,13 +588,13 @@ func (ps Connection) Put(ctx string, values ...storage.Entry) (err error) {
|
||||
}
|
||||
|
||||
// Contains - storage.DataStorage implementation
|
||||
func (ps Connection) Contains(ctx string, key string) (bool, error) {
|
||||
func (ps *Connection) Contains(ctx string, key string) (bool, error) {
|
||||
exist, err := ps.HExists(context.TODO(), PrefixKey+ctx, key).Result()
|
||||
return exist, AsNil(err)
|
||||
}
|
||||
|
||||
// Load - storage.DataStorage implementation
|
||||
func (ps Connection) Load(ctx string, key string) (v any, err error) {
|
||||
func (ps *Connection) Load(ctx string, key string) (v any, err error) {
|
||||
v, err = ps.HGet(context.TODO(), PrefixKey+ctx, key).Result()
|
||||
if err != nil && errors.Is(err, redis.Nil) {
|
||||
v, err = nil, nil
|
||||
@@ -603,7 +603,7 @@ func (ps Connection) Load(ctx string, key string) (v any, err error) {
|
||||
}
|
||||
|
||||
// Delete - storage.DataStorage implementation
|
||||
func (ps Connection) Delete(ctx string, keys ...string) (err error) {
|
||||
func (ps *Connection) Delete(ctx string, keys ...string) (err error) {
|
||||
if len(keys) > 0 {
|
||||
err = AsNil(ps.HDel(context.TODO(), PrefixKey+ctx, keys...).Err())
|
||||
if err != nil {
|
||||
@@ -625,6 +625,11 @@ func (Connection) Preservable() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
// Ping sends `PING` request to Redis server
|
||||
func (ps *Connection) Ping() error {
|
||||
return ps.UniversalClient.Ping(context.TODO()).Err()
|
||||
}
|
||||
|
||||
// GC deletes all Peers from the PeerStorage which are older than the
|
||||
// cutoff time.
|
||||
//
|
||||
|
||||
@@ -206,6 +206,10 @@ type PeerStorage interface {
|
||||
// If the Swarm does not exist, an empty Scrape and no error is returned.
|
||||
ScrapeSwarm(ih bittorrent.InfoHash) (leechers uint32, seeders uint32, snatched uint32)
|
||||
|
||||
// Ping used for checks if storage is alive
|
||||
// (connection could be established, enough space etc.)
|
||||
Ping() error
|
||||
|
||||
// Stopper is an interface that expects a Stop method to stop the PeerStorage.
|
||||
// For more details see the documentation in the stop package.
|
||||
stop.Stopper
|
||||
|
||||
Reference in New Issue
Block a user