diff --git a/README.md b/README.md index 0bd2495..db7bdff 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,7 @@ ![Mochi (source image: https://www.flaticon.com/free-icon/mochi_5392004)](mochi.svg) # Modified Chihaya (MoChi) + [![Build Status](https://github.com/sot-tech/mochi/workflows/Build%20&%20Test/badge.svg)](https://github.com/sot-tech/mochi/actions) [![Docker Repository on Quay](https://quay.io/repository/eramde/mochi/status "Docker Repository on Quay")](https://quay.io/repository/eramde/mochi) [![License](https://img.shields.io/badge/license-BSD-blue.svg)](https://opensource.org/licenses/BSD-2-Clause) diff --git a/bittorrent/bittorrent.go b/bittorrent/bittorrent.go index e51ec5d..e372888 100644 --- a/bittorrent/bittorrent.go +++ b/bittorrent/bittorrent.go @@ -243,7 +243,7 @@ func NewPeer(data string) (Peer, error) { peer = Peer{ ID: peerID, AddrPort: netip.AddrPortFrom( - addr, + addr.Unmap(), binary.BigEndian.Uint16(b[PeerIDLen:PeerIDLen+2]), ), } @@ -276,7 +276,7 @@ func (p Peer) RawString() string { func (p Peer) LogFields() log.Fields { return log.Fields{ "ID": p.ID, - "IP": p.Addr().String(), + "IP": p.Addr(), "port": p.Port(), } } @@ -290,6 +290,11 @@ func (p Peer) EqualEndpoint(x Peer) bool { p.Addr().Compare(x.Addr()) == 0 } +// Addr returns unmapped peer's IP address +func (p Peer) Addr() netip.Addr { + return p.AddrPort.Addr().Unmap() +} + // ClientError represents an error that should be exposed to the client over // the BitTorrent protocol implementation. type ClientError string diff --git a/bittorrent/sanitize.go b/bittorrent/sanitize.go index 71df8ad..7cffaaf 100644 --- a/bittorrent/sanitize.go +++ b/bittorrent/sanitize.go @@ -27,7 +27,7 @@ func SanitizeAnnounce(r *AnnounceRequest, maxNumWant, defaultNumWant uint32) err r.NumWant = maxNumWant } - r.AddrPort = netip.AddrPortFrom(r.Addr().Unmap(), r.Port()) + r.AddrPort = netip.AddrPortFrom(r.Addr(), r.Port()) if !r.Addr().IsValid() || r.Addr().IsUnspecified() { return ErrInvalidIP } diff --git a/cmd/mochi/main.go b/cmd/mochi/main.go index 8163730..a2ad00c 100644 --- a/cmd/mochi/main.go +++ b/cmd/mochi/main.go @@ -3,6 +3,7 @@ package main import ( "context" "errors" + "fmt" "os/signal" "runtime" "strings" @@ -46,7 +47,7 @@ func NewRun(configFilePath string) (*Run, error) { func (r *Run) Start(ps storage.Storage) error { configFile, err := ParseConfigFile(r.configFilePath) if err != nil { - return errors.New("failed to read config: " + err.Error()) + return fmt.Errorf("failed to read config: %w", err) } cfg := configFile.Conf @@ -63,7 +64,7 @@ func (r *Run) Start(ps storage.Storage) error { log.Info("starting storage", log.Fields{"name": cfg.Storage.Name}) ps, err = storage.NewStorage(cfg.Storage.Name, cfg.Storage.Config) if err != nil { - return errors.New("failed to create storage: " + err.Error()) + return fmt.Errorf("failed to create storage: %w", err) } log.Info("started storage", ps) } @@ -71,11 +72,11 @@ func (r *Run) Start(ps storage.Storage) error { preHooks, err := middleware.HooksFromHookConfigs(cfg.PreHooks, r.storage) if err != nil { - return errors.New("failed to validate hook config: " + err.Error()) + return fmt.Errorf("failed to validate hook config: %w", err) } postHooks, err := middleware.HooksFromHookConfigs(cfg.PostHooks, r.storage) if err != nil { - return errors.New("failed to validate hook config: " + err.Error()) + return fmt.Errorf("failed to validate hook config: %w", err) } log.Info("starting tracker logic", log.Fields{ diff --git a/frontend/http/frontend.go b/frontend/http/frontend.go index c39a88c..ffbf910 100644 --- a/frontend/http/frontend.go +++ b/frontend/http/frontend.go @@ -297,7 +297,7 @@ func (f *Frontend) announceRoute(w http.ResponseWriter, r *http.Request, ps http WriteError(w, err) return } - addr = req.AddrPort.Addr() + addr = req.Addr() ctx := injectRouteParamsToContext(context.Background(), ps) ctx, resp, err := f.logic.HandleAnnounce(ctx, req) diff --git a/frontend/http/parser.go b/frontend/http/parser.go index 2e7bd2f..727a774 100644 --- a/frontend/http/parser.go +++ b/frontend/http/parser.go @@ -178,5 +178,5 @@ func requestedIP(r *http.Request, p bittorrent.Params, opts ParseOptions) (netip } addrPort, err := netip.ParseAddrPort(r.RemoteAddr) - return addrPort.Addr(), false, err + return addrPort.Addr().Unmap(), false, err } diff --git a/frontend/http/writer.go b/frontend/http/writer.go index 035b092..ae1bac2 100644 --- a/frontend/http/writer.go +++ b/frontend/http/writer.go @@ -98,17 +98,17 @@ func WriteScrapeResponse(w http.ResponseWriter, resp *bittorrent.ScrapeResponse) } func compact4(peer bittorrent.Peer) (buf []byte) { - ip := peer.AddrPort.Addr().As4() + ip := peer.Addr().As4() buf = append(buf, ip[:]...) - port := peer.AddrPort.Port() + port := peer.Port() buf = append(buf, byte(port>>8), byte(port&0xff)) return } func compact6(peer bittorrent.Peer) (buf []byte) { - ip := peer.AddrPort.Addr().As16() + ip := peer.Addr().As16() buf = append(buf, ip[:]...) - port := peer.AddrPort.Port() + port := peer.Port() buf = append(buf, byte(port>>8), byte(port&0xff)) return } diff --git a/frontend/udp/frontend.go b/frontend/udp/frontend.go index 72b050b..8e97a51 100644 --- a/frontend/udp/frontend.go +++ b/frontend/udp/frontend.go @@ -207,7 +207,7 @@ func (t *Frontend) serve() error { defer pool.Put(buffer) // Handle the request. - addr := addrPort.Addr() + addr := addrPort.Addr().Unmap() var start time.Time if t.EnableRequestTiming { start = time.Now() diff --git a/middleware/hooks.go b/middleware/hooks.go index 1810008..415943c 100644 --- a/middleware/hooks.go +++ b/middleware/hooks.go @@ -125,7 +125,7 @@ func (h *responseHook) appendPeers(req *bittorrent.AnnounceRequest, resp *bittor } switch addr := req.Peer.Addr(); { - case addr.Is4(), addr.Is4In6(): + case addr.Is4(): resp.IPv4Peers = mergePeers(resp.IPv4Peers, peers, max) case addr.Is6(): resp.IPv6Peers = mergePeers(resp.IPv6Peers, peers, max) diff --git a/middleware/jwt/jwt.go b/middleware/jwt/jwt.go index 21b9b4b..a0e2937 100644 --- a/middleware/jwt/jwt.go +++ b/middleware/jwt/jwt.go @@ -95,7 +95,7 @@ func NewHook(cfg Config) (middleware.Hook, error) { log.Debug("performing initial fetch of JWKs") if err := h.updateKeys(); err != nil { - return nil, errors.New("failed to fetch initial JWK Set: " + err.Error()) + return nil, fmt.Errorf("failed to fetch initial JWK Set: %w", err) } go func() { diff --git a/mochi.svg b/mochi.svg index 6d679e7..ce35d73 100644 --- a/mochi.svg +++ b/mochi.svg @@ -3,97 +3,97 @@ - - - - - - - - - - - - - - - - - - - + xmlns:inkscape="http://www.inkscape.org/namespaces/inkscape" + xmlns:sodipodi="http://sodipodi.sourceforge.net/DTD/sodipodi-0.dtd" + width="100mm" + height="100mm" + viewBox="0 0 100 99.999997" + version="1.1" + id="svg5" + sodipodi:docname="mochi.svg" + inkscape:version="1.1.2 (1:1.1+202202050950+0a00cf5339)" + xmlns="http://www.w3.org/2000/svg" +> + + + + + + + + + + + + + + + + + + + diff --git a/storage/memory/storage.go b/storage/memory/storage.go index abee8b1..42de117 100644 --- a/storage/memory/storage.go +++ b/storage/memory/storage.go @@ -139,16 +139,18 @@ func New(provided Config) (storage.Storage, error) { ps.wg.Add(1) go func() { defer ps.wg.Done() + t := time.NewTimer(cfg.GarbageCollectionInterval) + defer t.Stop() for { select { case <-ps.closed: return - case <-time.After(cfg.GarbageCollectionInterval): + case <-t.C: before := time.Now().Add(-cfg.PeerLifetime) log.Debug("storage: purging peers with no announces since", log.Fields{"before": before}) - if err := ps.collectGarbage(before); err != nil { - log.Error(err) - } + start := time.Now() + ps.GC(before) + recordGCDuration(time.Since(start)) } } }() @@ -546,20 +548,19 @@ func (ps *store) Delete(ctx string, keys ...string) error { return nil } -// collectGarbage deletes all Peers from the Storage which are older than the +// GC deletes all Peers from the Storage which are older than the // cutoff time. // // This function must be able to execute while other methods on this interface // are being executed in parallel. -func (ps *store) collectGarbage(cutoff time.Time) error { +func (ps *store) GC(cutoff time.Time) { select { case <-ps.closed: - return nil + return default: } cutoffUnix := cutoff.UnixNano() - start := time.Now() for _, shard := range ps.shards { shard.RLock() @@ -603,10 +604,6 @@ func (ps *store) collectGarbage(cutoff time.Time) error { runtime.Gosched() } - - recordGCDuration(time.Since(start)) - - return nil } func (ps *store) Stop() stop.Result { diff --git a/storage/redis/storage.go b/storage/redis/storage.go index 646ef97..82aab65 100644 --- a/storage/redis/storage.go +++ b/storage/redis/storage.go @@ -2,24 +2,21 @@ // BitTorrent tracker keeping peer data in redis with hash. // There two categories of hash: // -// - CHI_{4,6}_{L,S}_infohash +// - CHI_{4,6}_{L,S}_ (hash type) // To save peers that hold the infohash, used for fast searching, // deleting, and timeout handling // -// - CHI_{4,6} +// - CHI_{4,6}_I (set type) // To save all the infohashes, used for garbage collection, // metrics aggregation and leecher graduation // // Tree keys are used to record the count of swarms, seeders // and leechers for each group (IPv4, IPv6). // -// - CHI_{4,6}_I_C -// To record the number of infohashes. -// -// - CHI_{4,6}_S_C +// - CHI_{4,6}_S_C (key type) // To record the number of seeders. // -// - CHI_{4,6}_L_C +// - CHI_{4,6}_L_C (key type) // To record the number of leechers. package redis @@ -64,8 +61,6 @@ const ( cnt6SeederKey = "CHI_6_C_S" cnt4LeecherKey = "CHI_4_C_L" cnt6LeecherKey = "CHI_6_C_L" - cnt4InfoHashKey = "CHI_4_C_I" - cnt6InfoHashKey = "CHI_6_C_I" ) // ErrSentinelAndClusterChecked returned from initializer if both Config.Sentinel and Config.Cluster provided @@ -277,10 +272,12 @@ func New(conf Config) (storage.Storage, error) { ps.logFields = cfg.LogFields() // Start a goroutine for garbage collection. + ps.wg.Add(1) go ps.scheduleGC(cfg.GarbageCollectionInterval, cfg.PeerLifetime) if cfg.PrometheusReportingInterval > 0 { // Start a goroutine for reporting statistics to Prometheus. + ps.wg.Add(1) go ps.schedulerProm(cfg.PrometheusReportingInterval) } else { log.Info("prometheus disabled because of zero reporting interval") @@ -290,7 +287,6 @@ func New(conf Config) (storage.Storage, error) { } func (ps *store) scheduleGC(gcInterval, peerLifeTime time.Duration) { - ps.wg.Add(1) defer ps.wg.Done() t := time.NewTimer(gcInterval) defer t.Stop() @@ -299,12 +295,8 @@ func (ps *store) scheduleGC(gcInterval, peerLifeTime time.Duration) { case <-ps.closed: return case <-t.C: - before := time.Now().Add(-peerLifeTime) - log.Debug("storage: purging peers with no announces since", log.Fields{"before": before}) - cutoffUnix := before.UnixNano() start := time.Now() - ps.gc(cutoffUnix, false) - ps.gc(cutoffUnix, true) + ps.GC(time.Now().Add(-peerLifeTime)) duration := time.Since(start).Milliseconds() log.Debug("storage: recordGCDuration", log.Fields{"timeTaken(ms)": duration}) storage.PromGCDurationMilliseconds.Observe(float64(duration)) @@ -314,7 +306,6 @@ func (ps *store) scheduleGC(gcInterval, peerLifeTime time.Duration) { } func (ps *store) schedulerProm(reportInterval time.Duration) { - ps.wg.Add(1) defer ps.wg.Done() t := time.NewTicker(reportInterval) for { @@ -339,10 +330,16 @@ type store struct { logFields log.Fields } -func (ps *store) count(key string) (n uint64) { +func (ps *store) count(key string, getLength bool) (n uint64) { var err error - if n, err = ps.con.Get(ps.ctx, key).Uint64(); err != nil && !errors.Is(err, redis.Nil) { - log.Error("storage: GET counter failure", log.Fields{ + if getLength { + n, err = ps.con.SCard(ps.ctx, key).Uint64() + } else { + n, err = ps.con.Get(ps.ctx, key).Uint64() + } + err = asNil(err) + if err != nil { + log.Error("storage: len/counter failure", log.Fields{ "key": key, "error": err, }) @@ -355,15 +352,15 @@ func (ps *store) count(key string) (n uint64) { func (ps *store) populateProm() { numInfoHashes, numSeeders, numLeechers := new(uint64), new(uint64), new(uint64) fetchFn := func(v6 bool) { - var cntSeederKey, cntLeecherKey, cntInfoHashKey string + var cntSeederKey, cntLeecherKey, ihSummaryKey string if v6 { - cntSeederKey, cntLeecherKey, cntInfoHashKey = cnt6SeederKey, cnt6LeecherKey, cnt6InfoHashKey + cntSeederKey, cntLeecherKey, ihSummaryKey = cnt6SeederKey, cnt6LeecherKey, ih6Key } else { - cntSeederKey, cntLeecherKey, cntInfoHashKey = cnt4SeederKey, cnt4LeecherKey, cnt4InfoHashKey + cntSeederKey, cntLeecherKey, ihSummaryKey = cnt4SeederKey, cnt4LeecherKey, ih4Key } - *numInfoHashes += ps.count(cntInfoHashKey) - *numSeeders += ps.count(cntSeederKey) - *numLeechers += ps.count(cntLeecherKey) + *numInfoHashes += ps.count(ihSummaryKey, true) + *numSeeders += ps.count(cntSeederKey, false) + *numLeechers += ps.count(cntLeecherKey, false) } fetchFn(false) @@ -403,15 +400,15 @@ func asNil(err error) error { } func (ps *store) PutSeeder(ih bittorrent.InfoHash, peer bittorrent.Peer) error { - var ihSummaryKey, ihPeerKey, cntPeerKey, cntInfoHashKey string + var ihSummaryKey, ihPeerKey, cntPeerKey string log.Debug("storage: PutSeeder", log.Fields{ "InfoHash": ih, "Peer": peer, }) if peer.Addr().Is6() { - ihSummaryKey, ihPeerKey, cntPeerKey, cntInfoHashKey = ih6Key, ih6SeederKey, cnt6SeederKey, cnt6InfoHashKey + ihSummaryKey, ihPeerKey, cntPeerKey = ih6Key, ih6SeederKey, cnt6SeederKey } else { - ihSummaryKey, ihPeerKey, cntPeerKey, cntInfoHashKey = ih4Key, ih4SeederKey, cnt4SeederKey, cnt4InfoHashKey + ihSummaryKey, ihPeerKey, cntPeerKey = ih4Key, ih4SeederKey, cnt4SeederKey } ihPeerKey += ih.RawString() now := ps.getClock() @@ -423,13 +420,7 @@ func (ps *store) PutSeeder(ih bittorrent.InfoHash, peer bittorrent.Peer) error { if err = ps.con.Incr(ps.ctx, cntPeerKey).Err(); err != nil { return } - var added int64 - if added, err = ps.con.SAdd(ps.ctx, ihSummaryKey, ihPeerKey).Result(); err != nil { - return - } - if added > 0 { - err = ps.con.Incr(ps.ctx, cntInfoHashKey).Err() - } + err = ps.con.SAdd(ps.ctx, ihSummaryKey, ihPeerKey).Err() return }) } @@ -473,16 +464,14 @@ func (ps *store) PutLeecher(ih bittorrent.InfoHash, peer bittorrent.Peer) error } ihPeerKey += ih.RawString() - now := ps.getClock() - return ps.tx(func(tx redis.Pipeliner) (err error) { - if err = tx.HSet(ps.ctx, ihPeerKey, peer.RawString(), now).Err(); err != nil { + if err = tx.HSet(ps.ctx, ihPeerKey, peer.RawString(), ps.getClock()).Err(); err != nil { return } if err = tx.Incr(ps.ctx, cntPeerKey).Err(); err != nil { return err } - err = tx.HSet(ps.ctx, ihSummaryKey, ihPeerKey, now).Err() + err = tx.SAdd(ps.ctx, ihSummaryKey, ihPeerKey).Err() return }) } @@ -515,7 +504,7 @@ func (ps *store) DeleteLeecher(ih bittorrent.InfoHash, peer bittorrent.Peer) err } func (ps *store) GraduateLeecher(ih bittorrent.InfoHash, peer bittorrent.Peer) error { - var ihSummaryKey, ihSeederKey, ihLeecherKey, cntSeederKey, cntLeecherKey, cntInfoHashKey string + var ihSummaryKey, ihSeederKey, ihLeecherKey, cntSeederKey, cntLeecherKey string log.Debug("storage: GraduateLeecher", log.Fields{ "InfoHash": ih, "Peer": peer, @@ -523,16 +512,14 @@ func (ps *store) GraduateLeecher(ih bittorrent.InfoHash, peer bittorrent.Peer) e if peer.Addr().Is6() { ihSummaryKey, ihSeederKey, cntSeederKey = ih6Key, ih6SeederKey, cnt6SeederKey - ihLeecherKey, cntLeecherKey, cntInfoHashKey = ih6LeecherKey, cnt6LeecherKey, cnt6InfoHashKey + ihLeecherKey, cntLeecherKey = ih6LeecherKey, cnt6LeecherKey } else { ihSummaryKey, ihSeederKey, cntSeederKey = ih4Key, ih4SeederKey, cnt4SeederKey - ihLeecherKey, cntLeecherKey, cntInfoHashKey = ih4LeecherKey, cnt4LeecherKey, cnt4InfoHashKey + ihLeecherKey, cntLeecherKey = ih4LeecherKey, cnt4LeecherKey } infoHash, peerKey := ih.RawString(), peer.RawString() ihSeederKey, ihLeecherKey = ihSeederKey+infoHash, ihLeecherKey+infoHash - now := ps.getClock() - return ps.tx(func(tx redis.Pipeliner) error { deleted, err := tx.HDel(ps.ctx, ihLeecherKey, peerKey).Uint64() err = asNil(err) @@ -542,16 +529,13 @@ func (ps *store) GraduateLeecher(ih bittorrent.InfoHash, peer bittorrent.Peer) e } } if err == nil { - err = tx.HSet(ps.ctx, ihSeederKey, peerKey, now).Err() + err = tx.HSet(ps.ctx, ihSeederKey, peerKey, ps.getClock()).Err() } if err == nil { err = tx.Incr(ps.ctx, cntSeederKey).Err() } if err == nil { - err = tx.HSet(ps.ctx, ihSummaryKey, ihSeederKey, now).Err() - } - if err == nil { - err = tx.Incr(ps.ctx, cntInfoHashKey).Err() + err = tx.SAdd(ps.ctx, ihSummaryKey, ihSeederKey).Err() } return err }) @@ -567,7 +551,7 @@ func (ps *store) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant int, }) if peer.Addr().Is6() { - ihSeederKey, ihLeecherKey = ih6SeederKey, cnt6LeecherKey + ihSeederKey, ihLeecherKey = ih6SeederKey, ih6LeecherKey } else { ihSeederKey, ihLeecherKey = ih4SeederKey, ih4LeecherKey } @@ -647,7 +631,7 @@ func (ps *store) ScrapeSwarm(ih bittorrent.InfoHash, peer bittorrent.Peer) (resp }) resp.InfoHash = ih if peer.Addr().Is6() { - ihSeederKey, ihLeecherKey = ih6SeederKey, cnt6LeecherKey + ihSeederKey, ihLeecherKey = ih6SeederKey, ih6LeecherKey } else { ihSeederKey, ihLeecherKey = ih4SeederKey, ih4LeecherKey } @@ -737,6 +721,13 @@ func (ps *store) Delete(ctx string, keys ...string) (err error) { return } +func (ps *store) GC(cutoff time.Time) { + log.Debug("storage: purging peers with no announces since", log.Fields{"before": cutoff}) + cutoffUnix := cutoff.UnixNano() + ps.gc(cutoffUnix, false) + ps.gc(cutoffUnix, true) +} + // gc deletes all Peers from the Storage which are older than the // cutoff time. // @@ -782,14 +773,14 @@ func (ps *store) Delete(ctx string, keys ...string) (err error) { // - If the change happens after the HLEN, we will not even attempt to make the // transaction. The infohash key will remain in the addressFamil hash and // we'll attempt to clean it up the next time gc runs. -func (ps *store) gc(cutoffUnix int64, v6 bool) { +func (ps *store) gc(cutoffNanos int64, v6 bool) { // list all infoHashKeys in the group - var ihSummaryKey, ihSeederKey, ihLeecherKey, cntSeederKey, cntLeecherKey, cntInfoHashKey string + var ihSummaryKey, ihSeederKey, ihLeecherKey, cntSeederKey, cntLeecherKey string if v6 { - cntSeederKey, cntLeecherKey, cntInfoHashKey = cnt6SeederKey, cnt6LeecherKey, cnt6InfoHashKey + cntSeederKey, cntLeecherKey = cnt6SeederKey, cnt6LeecherKey ihSummaryKey, ihSeederKey, ihLeecherKey = ih6Key, ih6SeederKey, ih6LeecherKey } else { - cntSeederKey, cntLeecherKey, cntInfoHashKey = cnt4SeederKey, cnt4LeecherKey, cnt4InfoHashKey + cntSeederKey, cntLeecherKey = cnt4SeederKey, cnt4LeecherKey ihSummaryKey, ihSeederKey, ihLeecherKey = ih4Key, ih4SeederKey, ih4LeecherKey } infoHashKeys, err := ps.con.SMembers(ps.ctx, ihSummaryKey).Result() @@ -818,7 +809,7 @@ func (ps *store) gc(cutoffUnix int64, v6 bool) { var peer bittorrent.Peer if peer, err = bittorrent.NewPeer(peerKey); err == nil { if mtime, err := strconv.ParseInt(timeStamp, 10, 64); err == nil { - if mtime <= cutoffUnix { + if mtime <= cutoffNanos { log.Debug("storage: Redis: deleting peer", log.Fields{ "Peer": peer, }) @@ -853,22 +844,15 @@ func (ps *store) gc(cutoffUnix int64, v6 bool) { } } - // use WATCH to avoid race condition - // https://redis.io/topics/transactions err = asNil(ps.con.Watch(ps.ctx, func(tx *redis.Tx) (err error) { - var infoHashCount int64 - infoHashCount, err = ps.con.HLen(ps.ctx, infoHashKey).Result() + var infoHashCount uint64 + infoHashCount, err = ps.con.HLen(ps.ctx, infoHashKey).Uint64() err = asNil(err) if err == nil && infoHashCount == 0 { // Empty hashes are not shown among existing keys, // in other words, it's removed automatically after `HDEL` the last field. // _, err := ps.con.Del(ps.ctx, infoHashKey) - var deletedCount int64 - deletedCount, err = ps.con.SRem(ps.ctx, ihSummaryKey, infoHashKey).Result() - err = asNil(err) - if err == nil && seeder && deletedCount > 0 { - err = ps.con.Decr(ps.ctx, cntInfoHashKey).Err() - } + err = asNil(ps.con.SRem(ps.ctx, ihSummaryKey, infoHashKey).Err()) } return err }, infoHashKey)) diff --git a/storage/storage.go b/storage/storage.go index 9cf3dae..a2b43c3 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -3,6 +3,7 @@ package storage import ( "errors" "sync" + "time" "github.com/sot-tech/mochi/bittorrent" "github.com/sot-tech/mochi/pkg/log" @@ -128,6 +129,9 @@ type Storage interface { // Delete used to delete arbitrary data in specified context by its keys Delete(ctx string, keys ...string) error + // GC used to delete stale data, such as timed out seeders/leechers + GC(cutoff time.Time) + // Stopper is an interface that expects a Stop method to stop the Storage. // For more details see the documentation in the stop package. stop.Stopper diff --git a/storage/test/storage_bench.go b/storage/test/storage_bench.go index 74fb3bc..0efce33 100644 --- a/storage/test/storage_bench.go +++ b/storage/test/storage_bench.go @@ -2,50 +2,46 @@ package test import ( "math/rand" + "net" "net/netip" "runtime" "sync/atomic" "testing" "github.com/sot-tech/mochi/bittorrent" + // used for seeding global math.Rand _ "github.com/sot-tech/mochi/pkg/randseed" "github.com/sot-tech/mochi/storage" ) type benchData struct { - infohashes [1000]bittorrent.InfoHash - peers [1000]bittorrent.Peer + infoHashes [1000]bittorrent.InfoHash + peers [10000]bittorrent.Peer } func generateInfoHashes() (a [1000]bittorrent.InfoHash) { for i := range a { - b := make([]byte, bittorrent.InfoHashV1Len) - rand.Read(b) - a[i], _ = bittorrent.NewInfoHash(b) + a[i] = randIH(rand.Int63()%2 == 0) } - return } -func generatePeers() (a [1000]bittorrent.Peer) { +func generatePeers() (a [10000]bittorrent.Peer) { for i := range a { - ip := make([]byte, 4) - n, err := rand.Read(ip) - if err != nil || n != 4 { - panic("unable to create random bytes") - } - id := [bittorrent.PeerIDLen]byte{} - n, err = rand.Read(id[:]) - if err != nil || n != bittorrent.InfoHashV1Len { - panic("unable to create random bytes") + var ip []byte + if rand.Int63()%2 == 0 { + ip = make([]byte, net.IPv4len) + } else { + ip = make([]byte, net.IPv6len) } + rand.Read(ip) addr, ok := netip.AddrFromSlice(ip) if !ok { panic("unable to create ip from random bytes") } port := uint16(rand.Uint32()) a[i] = bittorrent.Peer{ - ID: id, + ID: randPeerID(), AddrPort: netip.AddrPortFrom(addr, port), } } @@ -122,7 +118,7 @@ func (bh *benchHolder) Nop(b *testing.B) { // Put can run in parallel. func (bh *benchHolder) Put(b *testing.B) { bh.runBenchmark(b, true, nil, func(i int, ps storage.Storage, bd *benchData) error { - return ps.PutSeeder(bd.infohashes[0], bd.peers[0]) + return ps.PutSeeder(bd.infoHashes[0], bd.peers[0]) }) } @@ -132,27 +128,27 @@ func (bh *benchHolder) Put(b *testing.B) { // Put1k can run in parallel. func (bh *benchHolder) Put1k(b *testing.B) { bh.runBenchmark(b, true, nil, func(i int, ps storage.Storage, bd *benchData) error { - return ps.PutSeeder(bd.infohashes[0], bd.peers[i%1000]) + return ps.PutSeeder(bd.infoHashes[0], bd.peers[i%1000]) }) } // Put1kInfoHash benchmarks the PutSeeder method of a storage.Storage by cycling -// through 1000 infohashes and putting the same peer into their swarms. +// through 1000 infoHashes and putting the same peer into their swarms. // // Put1kInfoHash can run in parallel. func (bh *benchHolder) Put1kInfoHash(b *testing.B) { bh.runBenchmark(b, true, nil, func(i int, ps storage.Storage, bd *benchData) error { - return ps.PutSeeder(bd.infohashes[i%1000], bd.peers[0]) + return ps.PutSeeder(bd.infoHashes[i%1000], bd.peers[0]) }) } // Put1kInfoHash1k benchmarks the PutSeeder method of a storage.Storage by cycling -// through 1000 infohashes and 1000 Peers and calling Put with them. +// through 1000 infoHashes and 1000 Peers and calling Put with them. // // Put1kInfoHash1k can run in parallel. func (bh *benchHolder) Put1kInfoHash1k(b *testing.B) { bh.runBenchmark(b, true, nil, func(i int, ps storage.Storage, bd *benchData) error { - err := ps.PutSeeder(bd.infohashes[i%1000], bd.peers[(i*3)%1000]) + err := ps.PutSeeder(bd.infoHashes[i%1000], bd.peers[(i*3)%1000]) return err }) } @@ -163,11 +159,11 @@ func (bh *benchHolder) Put1kInfoHash1k(b *testing.B) { // PutDelete can not run in parallel. func (bh *benchHolder) PutDelete(b *testing.B) { bh.runBenchmark(b, false, nil, func(i int, ps storage.Storage, bd *benchData) error { - err := ps.PutSeeder(bd.infohashes[0], bd.peers[0]) + err := ps.PutSeeder(bd.infoHashes[0], bd.peers[0]) if err != nil { return err } - return ps.DeleteSeeder(bd.infohashes[0], bd.peers[0]) + return ps.DeleteSeeder(bd.infoHashes[0], bd.peers[0]) }) } @@ -177,39 +173,39 @@ func (bh *benchHolder) PutDelete(b *testing.B) { // PutDelete1k can not run in parallel. func (bh *benchHolder) PutDelete1k(b *testing.B) { bh.runBenchmark(b, false, nil, func(i int, ps storage.Storage, bd *benchData) error { - err := ps.PutSeeder(bd.infohashes[0], bd.peers[i%1000]) + err := ps.PutSeeder(bd.infoHashes[0], bd.peers[i%1000]) if err != nil { return err } - return ps.DeleteSeeder(bd.infohashes[0], bd.peers[i%1000]) + return ps.DeleteSeeder(bd.infoHashes[0], bd.peers[i%1000]) }) } -// PutDelete1kInfoHash behaves like PutDelete1k with 1000 infohashes instead of +// PutDelete1kInfoHash behaves like PutDelete1k with 1000 infoHashes instead of // 1000 Peers. // // PutDelete1kInfoHash can not run in parallel. func (bh *benchHolder) PutDelete1kInfoHash(b *testing.B) { bh.runBenchmark(b, false, nil, func(i int, ps storage.Storage, bd *benchData) error { - err := ps.PutSeeder(bd.infohashes[i%1000], bd.peers[0]) + err := ps.PutSeeder(bd.infoHashes[i%1000], bd.peers[0]) if err != nil { return err } - return ps.DeleteSeeder(bd.infohashes[i%1000], bd.peers[0]) + return ps.DeleteSeeder(bd.infoHashes[i%1000], bd.peers[0]) }) } -// PutDelete1kInfoHash1k behaves like PutDelete1k with 1000 infohashes in +// PutDelete1kInfoHash1k behaves like PutDelete1k with 1000 infoHashes in // addition to 1000 Peers. // // PutDelete1kInfoHash1k can not run in parallel. func (bh *benchHolder) PutDelete1kInfoHash1k(b *testing.B) { bh.runBenchmark(b, false, nil, func(i int, ps storage.Storage, bd *benchData) error { - err := ps.PutSeeder(bd.infohashes[i%1000], bd.peers[(i*3)%1000]) + err := ps.PutSeeder(bd.infoHashes[i%1000], bd.peers[(i*3)%1000]) if err != nil { return err } - err = ps.DeleteSeeder(bd.infohashes[i%1000], bd.peers[(i*3)%1000]) + err = ps.DeleteSeeder(bd.infoHashes[i%1000], bd.peers[(i*3)%1000]) return err }) } @@ -220,7 +216,7 @@ func (bh *benchHolder) PutDelete1kInfoHash1k(b *testing.B) { // DeleteNonexist can run in parallel. func (bh *benchHolder) DeleteNonexist(b *testing.B) { bh.runBenchmark(b, true, nil, func(i int, ps storage.Storage, bd *benchData) error { - _ = ps.DeleteSeeder(bd.infohashes[0], bd.peers[0]) + _ = ps.DeleteSeeder(bd.infoHashes[0], bd.peers[0]) return nil }) } @@ -231,18 +227,18 @@ func (bh *benchHolder) DeleteNonexist(b *testing.B) { // DeleteNonexist can run in parallel. func (bh *benchHolder) DeleteNonexist1k(b *testing.B) { bh.runBenchmark(b, true, nil, func(i int, ps storage.Storage, bd *benchData) error { - _ = ps.DeleteSeeder(bd.infohashes[0], bd.peers[i%1000]) + _ = ps.DeleteSeeder(bd.infoHashes[0], bd.peers[i%1000]) return nil }) } // DeleteNonexist1kInfoHash benchmarks the DeleteSeeder method of a storage.Storage by -// attempting to delete one Peer from one of 1000 infohashes. +// attempting to delete one Peer from one of 1000 infoHashes. // // DeleteNonexist1kInfoHash can run in parallel. func (bh *benchHolder) DeleteNonexist1kInfoHash(b *testing.B) { bh.runBenchmark(b, true, nil, func(i int, ps storage.Storage, bd *benchData) error { - _ = ps.DeleteSeeder(bd.infohashes[i%1000], bd.peers[0]) + _ = ps.DeleteSeeder(bd.infoHashes[i%1000], bd.peers[0]) return nil }) } @@ -253,7 +249,7 @@ func (bh *benchHolder) DeleteNonexist1kInfoHash(b *testing.B) { // DeleteNonexist1kInfoHash1k can run in parallel. func (bh *benchHolder) DeleteNonexist1kInfoHash1k(b *testing.B) { bh.runBenchmark(b, true, nil, func(i int, ps storage.Storage, bd *benchData) error { - _ = ps.DeleteSeeder(bd.infohashes[i%1000], bd.peers[(i*3)%1000]) + _ = ps.DeleteSeeder(bd.infoHashes[i%1000], bd.peers[(i*3)%1000]) return nil }) } @@ -264,7 +260,7 @@ func (bh *benchHolder) DeleteNonexist1kInfoHash1k(b *testing.B) { // GradNonexist can run in parallel. func (bh *benchHolder) GradNonexist(b *testing.B) { bh.runBenchmark(b, true, nil, func(i int, ps storage.Storage, bd *benchData) error { - _ = ps.GraduateLeecher(bd.infohashes[0], bd.peers[0]) + _ = ps.GraduateLeecher(bd.infoHashes[0], bd.peers[0]) return nil }) } @@ -275,7 +271,7 @@ func (bh *benchHolder) GradNonexist(b *testing.B) { // GradNonexist1k can run in parallel. func (bh *benchHolder) GradNonexist1k(b *testing.B) { bh.runBenchmark(b, true, nil, func(i int, ps storage.Storage, bd *benchData) error { - _ = ps.GraduateLeecher(bd.infohashes[0], bd.peers[i%1000]) + _ = ps.GraduateLeecher(bd.infoHashes[0], bd.peers[i%1000]) return nil }) } @@ -286,19 +282,19 @@ func (bh *benchHolder) GradNonexist1k(b *testing.B) { // GradNonexist1kInfoHash can run in parallel. func (bh *benchHolder) GradNonexist1kInfoHash(b *testing.B) { bh.runBenchmark(b, true, nil, func(i int, ps storage.Storage, bd *benchData) error { - _ = ps.GraduateLeecher(bd.infohashes[i%1000], bd.peers[0]) + _ = ps.GraduateLeecher(bd.infoHashes[i%1000], bd.peers[0]) return nil }) } // GradNonexist1kInfoHash1k benchmarks the GraduateLeecher method of a storage.Storage // by attempting to graduate one of 1000 nonexistent Peers for one of 1000 -// infohashes. +// infoHashes. // // GradNonexist1kInfoHash1k can run in parallel. func (bh *benchHolder) GradNonexist1kInfoHash1k(b *testing.B) { bh.runBenchmark(b, true, nil, func(i int, ps storage.Storage, bd *benchData) error { - _ = ps.GraduateLeecher(bd.infohashes[i%1000], bd.peers[(i*3)%1000]) + _ = ps.GraduateLeecher(bd.infoHashes[i%1000], bd.peers[(i*3)%1000]) return nil }) } @@ -310,15 +306,15 @@ func (bh *benchHolder) GradNonexist1kInfoHash1k(b *testing.B) { // PutGradDelete can not run in parallel. func (bh *benchHolder) PutGradDelete(b *testing.B) { bh.runBenchmark(b, false, nil, func(i int, ps storage.Storage, bd *benchData) error { - err := ps.PutLeecher(bd.infohashes[0], bd.peers[0]) + err := ps.PutLeecher(bd.infoHashes[0], bd.peers[0]) if err != nil { return err } - err = ps.GraduateLeecher(bd.infohashes[0], bd.peers[0]) + err = ps.GraduateLeecher(bd.infoHashes[0], bd.peers[0]) if err != nil { return err } - return ps.DeleteSeeder(bd.infohashes[0], bd.peers[0]) + return ps.DeleteSeeder(bd.infoHashes[0], bd.peers[0]) }) } @@ -327,51 +323,51 @@ func (bh *benchHolder) PutGradDelete(b *testing.B) { // PutGradDelete1k can not run in parallel. func (bh *benchHolder) PutGradDelete1k(b *testing.B) { bh.runBenchmark(b, false, nil, func(i int, ps storage.Storage, bd *benchData) error { - err := ps.PutLeecher(bd.infohashes[0], bd.peers[i%1000]) + err := ps.PutLeecher(bd.infoHashes[0], bd.peers[i%1000]) if err != nil { return err } - err = ps.GraduateLeecher(bd.infohashes[0], bd.peers[i%1000]) + err = ps.GraduateLeecher(bd.infoHashes[0], bd.peers[i%1000]) if err != nil { return err } - return ps.DeleteSeeder(bd.infohashes[0], bd.peers[i%1000]) + return ps.DeleteSeeder(bd.infoHashes[0], bd.peers[i%1000]) }) } // PutGradDelete1kInfoHash behaves like PutGradDelete with one of 1000 -// infohashes. +// infoHashes. // // PutGradDelete1kInfoHash can not run in parallel. func (bh *benchHolder) PutGradDelete1kInfoHash(b *testing.B) { bh.runBenchmark(b, false, nil, func(i int, ps storage.Storage, bd *benchData) error { - err := ps.PutLeecher(bd.infohashes[i%1000], bd.peers[0]) + err := ps.PutLeecher(bd.infoHashes[i%1000], bd.peers[0]) if err != nil { return err } - err = ps.GraduateLeecher(bd.infohashes[i%1000], bd.peers[0]) + err = ps.GraduateLeecher(bd.infoHashes[i%1000], bd.peers[0]) if err != nil { return err } - return ps.DeleteSeeder(bd.infohashes[i%1000], bd.peers[0]) + return ps.DeleteSeeder(bd.infoHashes[i%1000], bd.peers[0]) }) } // PutGradDelete1kInfoHash1k behaves like PutGradDelete with one of 1000 Peers -// and one of 1000 infohashes. +// and one of 1000 infoHashes. // // PutGradDelete1kInfoHash can not run in parallel. func (bh *benchHolder) PutGradDelete1kInfoHash1k(b *testing.B) { bh.runBenchmark(b, false, nil, func(i int, ps storage.Storage, bd *benchData) error { - err := ps.PutLeecher(bd.infohashes[i%1000], bd.peers[(i*3)%1000]) + err := ps.PutLeecher(bd.infoHashes[i%1000], bd.peers[(i*3)%1000]) if err != nil { return err } - err = ps.GraduateLeecher(bd.infohashes[i%1000], bd.peers[(i*3)%1000]) + err = ps.GraduateLeecher(bd.infoHashes[i%1000], bd.peers[(i*3)%1000]) if err != nil { return err } - err = ps.DeleteSeeder(bd.infohashes[i%1000], bd.peers[(i*3)%1000]) + err = ps.DeleteSeeder(bd.infoHashes[i%1000], bd.peers[(i*3)%1000]) return err }) } @@ -381,9 +377,9 @@ func putPeers(ps storage.Storage, bd *benchData) error { for j := 0; j < 1000; j++ { var err error if j < 1000/2 { - err = ps.PutLeecher(bd.infohashes[i], bd.peers[j]) + err = ps.PutLeecher(bd.infoHashes[i], bd.peers[j]) } else { - err = ps.PutSeeder(bd.infohashes[i], bd.peers[j]) + err = ps.PutSeeder(bd.infoHashes[i], bd.peers[j]) } if err != nil { return err @@ -400,18 +396,18 @@ func putPeers(ps storage.Storage, bd *benchData) error { // AnnounceLeecher can run in parallel. func (bh *benchHolder) AnnounceLeecher(b *testing.B) { bh.runBenchmark(b, true, putPeers, func(i int, ps storage.Storage, bd *benchData) error { - _, err := ps.AnnouncePeers(bd.infohashes[0], false, 50, bd.peers[0]) + _, err := ps.AnnouncePeers(bd.infoHashes[0], false, 50, bd.peers[0]) return err }) } // AnnounceLeecher1kInfoHash behaves like AnnounceLeecher with one of 1000 -// infohashes. +// infoHashes. // // AnnounceLeecher1kInfoHash can run in parallel. func (bh *benchHolder) AnnounceLeecher1kInfoHash(b *testing.B) { bh.runBenchmark(b, true, putPeers, func(i int, ps storage.Storage, bd *benchData) error { - _, err := ps.AnnouncePeers(bd.infohashes[i%1000], false, 50, bd.peers[0]) + _, err := ps.AnnouncePeers(bd.infoHashes[i%1000], false, 50, bd.peers[0]) return err }) } @@ -422,18 +418,18 @@ func (bh *benchHolder) AnnounceLeecher1kInfoHash(b *testing.B) { // AnnounceSeeder can run in parallel. func (bh *benchHolder) AnnounceSeeder(b *testing.B) { bh.runBenchmark(b, true, putPeers, func(i int, ps storage.Storage, bd *benchData) error { - _, err := ps.AnnouncePeers(bd.infohashes[0], true, 50, bd.peers[0]) + _, err := ps.AnnouncePeers(bd.infoHashes[0], true, 50, bd.peers[0]) return err }) } // AnnounceSeeder1kInfoHash behaves like AnnounceSeeder with one of 1000 -// infohashes. +// infoHashes. // // AnnounceSeeder1kInfoHash can run in parallel. func (bh *benchHolder) AnnounceSeeder1kInfoHash(b *testing.B) { bh.runBenchmark(b, true, putPeers, func(i int, ps storage.Storage, bd *benchData) error { - _, err := ps.AnnouncePeers(bd.infohashes[i%1000], true, 50, bd.peers[0]) + _, err := ps.AnnouncePeers(bd.infoHashes[i%1000], true, 50, bd.peers[0]) return err }) } @@ -444,17 +440,17 @@ func (bh *benchHolder) AnnounceSeeder1kInfoHash(b *testing.B) { // ScrapeSwarm can run in parallel. func (bh *benchHolder) ScrapeSwarm(b *testing.B) { bh.runBenchmark(b, true, putPeers, func(i int, ps storage.Storage, bd *benchData) error { - ps.ScrapeSwarm(bd.infohashes[0], bd.peers[0]) + ps.ScrapeSwarm(bd.infoHashes[0], bd.peers[0]) return nil }) } -// ScrapeSwarm1kInfoHash behaves like ScrapeSwarm with one of 1000 infohashes. +// ScrapeSwarm1kInfoHash behaves like ScrapeSwarm with one of 1000 infoHashes. // // ScrapeSwarm1kInfoHash can run in parallel. func (bh *benchHolder) ScrapeSwarm1kInfoHash(b *testing.B) { bh.runBenchmark(b, true, putPeers, func(i int, ps storage.Storage, bd *benchData) error { - ps.ScrapeSwarm(bd.infohashes[i%1000], bd.peers[0]) + ps.ScrapeSwarm(bd.infoHashes[i%1000], bd.peers[0]) return nil }) } diff --git a/storage/test/storage_test_base.go b/storage/test/storage_test_base.go index 3e0accd..b533dbd 100644 --- a/storage/test/storage_test_base.go +++ b/storage/test/storage_test_base.go @@ -2,6 +2,7 @@ package test import ( "testing" + "time" "github.com/stretchr/testify/require" @@ -244,6 +245,19 @@ func (th *testHolder) CustomBulkPutContainsLoadDelete(t *testing.T) { } } +func (th *testHolder) GC(t *testing.T) { + for _, c := range testData { + require.Nil(t, th.st.PutSeeder(c.ih, c.peer)) + require.Nil(t, th.st.PutSeeder(c.ih, v4Peer)) + require.Nil(t, th.st.PutSeeder(c.ih, v6Peer)) + } + th.st.GC(time.Now().Add(time.Hour)) + for _, c := range testData { + _, err := th.st.AnnouncePeers(c.ih, false, 100, v4Peer) + require.Equal(t, storage.ErrResourceDoesNotExist, err) + } +} + // RunTests tests a Storage implementation against the interface. func RunTests(t *testing.T, p storage.Storage) { th := testHolder{st: p} @@ -275,6 +289,8 @@ func RunTests(t *testing.T, p storage.Storage) { t.Run("CustomPutContainsLoadDelete", th.CustomPutContainsLoadDelete) t.Run("CustomBulkPutContainsLoadDelete", th.CustomBulkPutContainsLoadDelete) + t.Run("GC", th.GC) + e := th.st.Stop() require.Nil(t, <-e) } diff --git a/storage/test/storage_test_data.go b/storage/test/storage_test_data.go index dbc2fbd..c2a2611 100644 --- a/storage/test/storage_test_data.go +++ b/storage/test/storage_test_data.go @@ -1,9 +1,12 @@ package test import ( + "math/rand" "net/netip" "github.com/sot-tech/mochi/bittorrent" + // used for seeding global math.Rand + _ "github.com/sot-tech/mochi/pkg/randseed" ) var ( @@ -13,13 +16,32 @@ var ( v4Peer, v6Peer bittorrent.Peer ) +func randIH(v2 bool) (ih bittorrent.InfoHash) { + var b []byte + if v2 { + b = make([]byte, bittorrent.InfoHashV2Len) + } else { + b = make([]byte, bittorrent.InfoHashV1Len) + } + rand.Read(b) + ih, _ = bittorrent.NewInfoHash(b) + return +} + +func randPeerID() (ih bittorrent.PeerID) { + b := make([]byte, bittorrent.PeerIDLen) + rand.Read(b) + ih, _ = bittorrent.NewPeerID(b) + return +} + func init() { - testIh1, _ = bittorrent.NewInfoHash("00000000000000000001") - testIh2, _ = bittorrent.NewInfoHash("00000000000000000002") - testPeerID0, _ = bittorrent.NewPeerID([]byte("00000000000000000001")) - testPeerID1, _ = bittorrent.NewPeerID([]byte("00000000000000000002")) - testPeerID2, _ = bittorrent.NewPeerID([]byte("99999999999999999994")) - testPeerID3, _ = bittorrent.NewPeerID([]byte("99999999999999999996")) + testIh1 = randIH(false) + testIh2 = randIH(true) + testPeerID0 = randPeerID() + testPeerID1 = randPeerID() + testPeerID2 = randPeerID() + testPeerID3 = randPeerID() testData = []hashPeer{ { testIh1,