diff --git a/.github/workflows/lint.yaml b/.github/workflows/lint.yaml index a357920..e2fd4d4 100644 --- a/.github/workflows/lint.yaml +++ b/.github/workflows/lint.yaml @@ -15,7 +15,7 @@ jobs: - uses: "actions/checkout@v3" - uses: "actions/setup-go@v3" with: - go-version: "^1.18" + go-version: "^1.19" - uses: "authzed/actions/gofumpt@main" - uses: "authzed/actions/go-mod-tidy@main" - uses: "authzed/actions/go-generate@main" diff --git a/.golangci.yaml b/.golangci.yaml index 7eb8775..94a7e35 100644 --- a/.golangci.yaml +++ b/.golangci.yaml @@ -1,8 +1,6 @@ --- run: - # mochi in not written with generics (a.t.m), - # so we can check with 1.17 - go: "1.17" + go: "1.19" timeout: "5m" output: sort-results: true diff --git a/storage/memory/storage.go b/storage/memory/storage.go index ca41b26..3a32e33 100644 --- a/storage/memory/storage.go +++ b/storage/memory/storage.go @@ -5,13 +5,13 @@ package memory import ( "context" "encoding/binary" - "github.com/cornelk/hashmap" "math" "runtime" "sync" "sync/atomic" "time" + "github.com/cornelk/hashmap" "github.com/sot-tech/mochi/bittorrent" "github.com/sot-tech/mochi/pkg/conf" "github.com/sot-tech/mochi/pkg/log" @@ -160,15 +160,15 @@ func (ps *peerStore) ScheduleStatisticsCollection(reportInterval time.Duration) before := time.Now() // aggregates metrics over all shards and then posts them to // prometheus. - var numInfohashes, numSeeders, numLeechers uint64 + var numInfoHashes, numSeeders, numLeechers uint64 for _, s := range ps.shards { - numInfohashes += uint64(s.swarms.Len()) - numSeeders += uint64(s.numSeeders.Load()) - numLeechers += uint64(s.numLeechers.Load()) + numInfoHashes += uint64(s.swarms.Len()) + numSeeders += s.numSeeders.Load() + numLeechers += s.numLeechers.Load() } - storage.PromInfoHashesCount.Set(float64(numInfohashes)) + storage.PromInfoHashesCount.Set(float64(numInfoHashes)) storage.PromSeedersCount.Set(float64(numSeeders)) storage.PromLeechersCount.Set(float64(numLeechers)) logger.Debug().TimeDiff("timeTaken", time.Now(), before).Msg("populate prom complete") @@ -201,13 +201,13 @@ func (ps *peerStore) PutSeeder(_ context.Context, ih bittorrent.InfoHash, p bitt Msg("put seeder") sh := ps.shards[ps.shardIndex(ih, p.Addr().Is6())] - sw, pId := sh.ginSwarm(ih), p.RawString() + sw, pID := sh.ginSwarm(ih), p.RawString() - if _, exists := sw.seeders.Get(pId); !exists { + if _, exists := sw.seeders.Get(pID); !exists { sh.numSeeders.Add(1) } - sw.seeders.Set(pId, timecache.NowUnixNano()) + sw.seeders.Set(pID, timecache.NowUnixNano()) return nil } @@ -247,13 +247,13 @@ func (ps *peerStore) PutLeecher(_ context.Context, ih bittorrent.InfoHash, p bit Msg("put leecher") sh := ps.shards[ps.shardIndex(ih, p.Addr().Is6())] - sw, pId := sh.ginSwarm(ih), p.RawString() + sw, pID := sh.ginSwarm(ih), p.RawString() - if _, exists := sw.leechers.Get(pId); !exists { + if _, exists := sw.leechers.Get(pID); !exists { sh.numLeechers.Add(1) } - sw.leechers.Set(pId, timecache.NowUnixNano()) + sw.leechers.Set(pID, timecache.NowUnixNano()) return nil } @@ -293,45 +293,21 @@ func (ps *peerStore) GraduateLeecher(_ context.Context, ih bittorrent.InfoHash, Msg("graduate leecher") sh := ps.shards[ps.shardIndex(ih, p.Addr().Is6())] - sw, pId := sh.ginSwarm(ih), p.RawString() + sw, pID := sh.ginSwarm(ih), p.RawString() - if sw.leechers.Del(pId) { + if sw.leechers.Del(pID) { sh.numLeechers.Add(decrUint64) } - if _, exists := sw.seeders.Get(pId); !exists { + if _, exists := sw.seeders.Get(pID); !exists { sh.numSeeders.Add(1) } - sw.seeders.Set(pId, timecache.NowUnixNano()) + sw.seeders.Set(pID, timecache.NowUnixNano()) return nil } -func parsePeers(peersMap *hashmap.Map[string, int64], maxCount int) (peers []bittorrent.Peer) { - peersMap.Range(func(pId string, _ int64) bool { - p, _ := bittorrent.NewPeer(pId) - peers = append(peers, p) - maxCount-- - return maxCount > 0 - }) - return -} - -func (ps *peerStore) getPeers(shard *peerShard, ih bittorrent.InfoHash, numWant int, forSeeder bool) (peers []bittorrent.Peer) { - if sw, ok := shard.swarms.Get(ih); ok { - if forSeeder { - peers = parsePeers(sw.leechers, numWant) - } else { - peers = append(peers, parsePeers(sw.seeders, numWant)...) - if numWant -= len(peers); numWant > 0 { - peers = append(peers, parsePeers(sw.leechers, numWant)...) - } - } - } - return -} - func (ps *peerStore) AnnouncePeers(_ context.Context, ih bittorrent.InfoHash, forSeeder bool, numWant int, v6 bool) (peers []bittorrent.Peer, err error) { select { case <-ps.closed: @@ -345,7 +321,23 @@ func (ps *peerStore) AnnouncePeers(_ context.Context, ih bittorrent.InfoHash, fo Bool("v6", v6). Msg("announce peers") - peers = ps.getPeers(ps.shards[ps.shardIndex(ih, v6)], ih, numWant, forSeeder) + if sw, ok := ps.shards[ps.shardIndex(ih, v6)].swarms.Get(ih); ok { + peers = make([]bittorrent.Peer, 0, numWant/2) + rangeFn := func(pID string, _ int64) bool { + p, _ := bittorrent.NewPeer(pID) + peers = append(peers, p) + numWant-- + return numWant > 0 + } + if forSeeder { + sw.leechers.Range(rangeFn) + } else { + sw.seeders.Range(rangeFn) + if numWant > 0 { + sw.leechers.Range(rangeFn) + } + } + } return } @@ -453,40 +445,37 @@ func (ps *peerStore) gc(cutoff time.Time) { for _, shard := range ps.shards { infoHashes := make([]bittorrent.InfoHash, 0, shard.swarms.Len()) - shard.swarms.Range(func(ih bittorrent.InfoHash, sw swarm) bool { + shard.swarms.Range(func(ih bittorrent.InfoHash, _ swarm) bool { infoHashes = append(infoHashes, ih) return true }) runtime.Gosched() for _, ih := range infoHashes { - sw, stillExists := shard.swarms.Get(ih) if !stillExists { runtime.Gosched() continue } - sw.leechers.Range(func(pId string, mtime int64) bool { + sw.leechers.Range(func(pID string, mtime int64) bool { if mtime <= cutoffUnix { - sw.leechers.Del(pId) + sw.leechers.Del(pID) shard.numLeechers.Add(decrUint64) } return true }) - sw.seeders.Range(func(pId string, mtime int64) bool { + sw.seeders.Range(func(pID string, mtime int64) bool { if mtime <= cutoffUnix { - sw.seeders.Del(pId) + sw.seeders.Del(pID) shard.numSeeders.Add(decrUint64) } return true }) if sw.leechers.Len()|sw.seeders.Len() == 0 { - shard.Lock() shard.swarms.Del(ih) - shard.Unlock() } runtime.Gosched()