diff --git a/go.mod b/go.mod index 0f1d0cd..ab9a59e 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,6 @@ require ( code.cloudfoundry.org/go-diodes v0.0.0-20221122215814-f99b6a62c703 github.com/MicahParks/keyfunc v1.6.0 github.com/anacrolix/torrent v1.47.0 - github.com/cornelk/hashmap v1.0.8 github.com/go-redis/redis/v8 v8.11.5 github.com/golang-jwt/jwt/v4 v4.4.2 github.com/jackc/pgx/v5 v5.1.1 diff --git a/go.sum b/go.sum index 1bd9d64..97cd23c 100644 --- a/go.sum +++ b/go.sum @@ -95,8 +95,6 @@ github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMn github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/coreos/go-systemd/v22 v22.3.3-0.20220203105225-a9a7ef127534/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= -github.com/cornelk/hashmap v1.0.8 h1:nv0AWgw02n+iDcawr5It4CjQIAcdMMKRrs10HOJYlrc= -github.com/cornelk/hashmap v1.0.8/go.mod h1:RfZb7JO3RviW/rT6emczVuC/oxpdz4UsSB2LJSclR1k= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= diff --git a/storage/memory/storage.go b/storage/memory/storage.go index ad978de..d8d20a3 100644 --- a/storage/memory/storage.go +++ b/storage/memory/storage.go @@ -11,7 +11,6 @@ import ( "sync/atomic" "time" - "github.com/cornelk/hashmap" "github.com/sot-tech/mochi/bittorrent" "github.com/sot-tech/mochi/pkg/conf" "github.com/sot-tech/mochi/pkg/log" @@ -78,37 +77,135 @@ func NewPeerStorage(provided Config) (storage.PeerStorage, error) { } for i := 0; i < cfg.ShardCount*2; i++ { - ps.shards[i] = &peerShard{swarms: hashmap.New[bittorrent.InfoHash, swarm]()} + ps.shards[i] = &peerShard{swarms: &ihSwarm{m: make(map[bittorrent.InfoHash]swarm)}} } return ps, nil } type peerShard struct { - swarms *hashmap.Map[bittorrent.InfoHash, swarm] + swarms *ihSwarm numSeeders atomic.Uint64 numLeechers atomic.Uint64 - sync.Mutex } -// ginSwarm returns existing swarm or inserts new empty if it does not exist -func (sh *peerShard) ginSwarm(ih bittorrent.InfoHash) swarm { - sw, ok := sh.swarms.Get(ih) - if !ok { - sh.Lock() - defer sh.Unlock() - sw, _ = sh.swarms.GetOrInsert(ih, swarm{ - seeders: hashmap.New[string, int64](), - leechers: hashmap.New[string, int64](), - }) +type ihSwarm struct { + m map[bittorrent.InfoHash]swarm + sync.RWMutex +} + +func (p *ihSwarm) get(k bittorrent.InfoHash) (v swarm, ok bool) { + p.RLock() + v, ok = p.m[k] + p.RUnlock() + return +} + +func (p *ihSwarm) getOrCreate(k bittorrent.InfoHash) (v swarm) { + var ok bool + if v, ok = p.get(k); !ok { + p.Lock() + if v, ok = p.m[k]; !ok { + v = swarm{ + seeders: &peers{m: make(map[bittorrent.Peer]int64)}, + leechers: &peers{m: make(map[bittorrent.Peer]int64)}, + } + p.m[k] = v + } + p.Unlock() } - return sw + return +} + +func (p *ihSwarm) del(k bittorrent.InfoHash) (ok bool) { + p.Lock() + if _, ok = p.m[k]; ok { + delete(p.m, k) + } + p.Unlock() + return +} + +func (p *ihSwarm) len() int { + return len(p.m) +} + +func (p *ihSwarm) keys(fn func(k bittorrent.InfoHash) bool) { + p.RLock() + for k := range p.m { + if !fn(k) { + break + } + } + p.RUnlock() +} + +func (p *ihSwarm) forEach(fn func(k bittorrent.InfoHash, v swarm) bool) { + p.RLock() + for k, v := range p.m { + if !fn(k, v) { + break + } + } + p.RUnlock() } type swarm struct { // map serialized peer to mtime - seeders *hashmap.Map[string, int64] - leechers *hashmap.Map[string, int64] + seeders *peers + leechers *peers +} + +type peers struct { + m map[bittorrent.Peer]int64 + sync.RWMutex +} + +func (p *peers) get(k bittorrent.Peer) (v int64, ok bool) { + p.RLock() + v, ok = p.m[k] + p.RUnlock() + return +} + +func (p *peers) set(k bittorrent.Peer, v int64) { + p.Lock() + p.m[k] = v + p.Unlock() + return +} + +func (p *peers) del(k bittorrent.Peer) (ok bool) { + p.Lock() + if _, ok = p.m[k]; ok { + delete(p.m, k) + } + p.Unlock() + return +} + +func (p *peers) len() int { + return len(p.m) +} + +func (p *peers) keys(fn func(k bittorrent.Peer) bool) { + p.RLock() + for k := range p.m { + if !fn(k) { + break + } + } + p.RUnlock() +} + +func (p *peers) forEach(fn func(k bittorrent.Peer, v int64) bool) { + p.RLock() + for k, v := range p.m { + if !fn(k, v) { + break + } + } + p.RUnlock() } type peerStore struct { @@ -163,7 +260,7 @@ func (ps *peerStore) ScheduleStatisticsCollection(reportInterval time.Duration) var numInfoHashes, numSeeders, numLeechers uint64 for _, s := range ps.shards { - numInfoHashes += uint64(s.swarms.Len()) + numInfoHashes += uint64(s.swarms.len()) numSeeders += s.numSeeders.Load() numLeechers += s.numLeechers.Load() } @@ -201,13 +298,13 @@ func (ps *peerStore) PutSeeder(_ context.Context, ih bittorrent.InfoHash, p bitt Msg("put seeder") sh := ps.shards[ps.shardIndex(ih, p.Addr().Is6())] - sw, pID := sh.ginSwarm(ih), p.RawString() + sw := sh.swarms.getOrCreate(ih) - if _, exists := sw.seeders.Get(pID); !exists { + if _, exists := sw.seeders.get(p); !exists { sh.numSeeders.Add(1) } - sw.seeders.Set(pID, timecache.NowUnixNano()) + sw.seeders.set(p, timecache.NowUnixNano()) return nil } @@ -224,8 +321,8 @@ func (ps *peerStore) DeleteSeeder(_ context.Context, ih bittorrent.InfoHash, p b Msg("delete seeder") sh := ps.shards[ps.shardIndex(ih, p.Addr().Is6())] - if sw, ok := sh.swarms.Get(ih); ok { - if sw.seeders.Del(p.RawString()) { + if sw, ok := sh.swarms.get(ih); ok { + if sw.seeders.del(p) { sh.numSeeders.Add(decrUint64) } } else { @@ -247,13 +344,13 @@ func (ps *peerStore) PutLeecher(_ context.Context, ih bittorrent.InfoHash, p bit Msg("put leecher") sh := ps.shards[ps.shardIndex(ih, p.Addr().Is6())] - sw, pID := sh.ginSwarm(ih), p.RawString() + sw := sh.swarms.getOrCreate(ih) - if _, exists := sw.leechers.Get(pID); !exists { + if _, exists := sw.leechers.get(p); !exists { sh.numLeechers.Add(1) } - sw.leechers.Set(pID, timecache.NowUnixNano()) + sw.leechers.set(p, timecache.NowUnixNano()) return nil } @@ -270,8 +367,8 @@ func (ps *peerStore) DeleteLeecher(_ context.Context, ih bittorrent.InfoHash, p Msg("delete leecher") sh := ps.shards[ps.shardIndex(ih, p.Addr().Is6())] - if sw, ok := sh.swarms.Get(ih); ok { - if sw.leechers.Del(p.RawString()) { + if sw, ok := sh.swarms.get(ih); ok { + if sw.leechers.del(p) { sh.numLeechers.Add(decrUint64) } } else { @@ -293,17 +390,17 @@ func (ps *peerStore) GraduateLeecher(_ context.Context, ih bittorrent.InfoHash, Msg("graduate leecher") sh := ps.shards[ps.shardIndex(ih, p.Addr().Is6())] - sw, pID := sh.ginSwarm(ih), p.RawString() + sw := sh.swarms.getOrCreate(ih) - if sw.leechers.Del(pID) { + if sw.leechers.del(p) { sh.numLeechers.Add(decrUint64) } - if _, exists := sw.seeders.Get(pID); !exists { + if _, exists := sw.seeders.get(p); !exists { sh.numSeeders.Add(1) } - sw.seeders.Set(pID, timecache.NowUnixNano()) + sw.seeders.set(p, timecache.NowUnixNano()) return nil } @@ -321,20 +418,19 @@ func (ps *peerStore) AnnouncePeers(_ context.Context, ih bittorrent.InfoHash, fo Bool("v6", v6). Msg("announce peers") - if sw, ok := ps.shards[ps.shardIndex(ih, v6)].swarms.Get(ih); ok { + if sw, ok := ps.shards[ps.shardIndex(ih, v6)].swarms.get(ih); ok { peers = make([]bittorrent.Peer, 0, numWant/2) - rangeFn := func(pID string, _ int64) bool { - p, _ := bittorrent.NewPeer(pID) + rangeFn := func(p bittorrent.Peer) bool { peers = append(peers, p) numWant-- return numWant > 0 } if forSeeder { - sw.leechers.Range(rangeFn) + sw.leechers.keys(rangeFn) } else { - sw.seeders.Range(rangeFn) + sw.seeders.keys(rangeFn) if numWant > 0 { - sw.leechers.Range(rangeFn) + sw.leechers.keys(rangeFn) } } } @@ -345,8 +441,8 @@ func (ps *peerStore) AnnouncePeers(_ context.Context, ih bittorrent.InfoHash, fo func (ps *peerStore) countPeers(ih bittorrent.InfoHash, v6 bool) (leechers, seeders uint32) { shard := ps.shards[ps.shardIndex(ih, v6)] - if sw, ok := shard.swarms.Get(ih); ok { - leechers, seeders = uint32(sw.leechers.Len()), uint32(sw.seeders.Len()) + if sw, ok := shard.swarms.get(ih); ok { + leechers, seeders = uint32(sw.leechers.len()), uint32(sw.seeders.len()) } return } @@ -370,30 +466,19 @@ func (ps *peerStore) ScrapeSwarm(_ context.Context, ih bittorrent.InfoHash) (lee // NewDataStorage creates new in-memory data store func NewDataStorage() storage.DataStorage { - return &dataStore{ - hashmap.New[string, *hashmap.Map[string, []byte]](), - sync.Mutex{}, - } + return new(dataStore) } type dataStore struct { - *hashmap.Map[string, *hashmap.Map[string, []byte]] - sync.Mutex + sync.Map } func (ds *dataStore) Put(_ context.Context, ctx string, values ...storage.Entry) error { if len(values) > 0 { - m, ok := ds.Get(ctx) - if !ok { - ds.Lock() - if m, ok = ds.Get(ctx); !ok { - m = hashmap.New[string, []byte]() - ds.Insert(ctx, m) - } - ds.Unlock() - } + c, _ := ds.LoadOrStore(ctx, new(sync.Map)) + m := c.(*sync.Map) for _, p := range values { - m.Set(p.Key, p.Value) + m.Store(p.Key, p.Value) } } return nil @@ -401,24 +486,27 @@ func (ds *dataStore) Put(_ context.Context, ctx string, values ...storage.Entry) func (ds *dataStore) Contains(_ context.Context, ctx string, key string) (bool, error) { var exist bool - if m, found := ds.Get(ctx); found { - _, exist = m.Get(key) + if m, found := ds.Map.Load(ctx); found { + _, exist = m.(*sync.Map).Load(key) } return exist, nil } func (ds *dataStore) Load(_ context.Context, ctx string, key string) (out []byte, _ error) { - if m, found := ds.Map.Get(ctx); found { - out, _ = m.Get(key) + if m, found := ds.Map.Load(ctx); found { + if v, _ := m.(*sync.Map).Load(key); v != nil { + out = v.([]byte) + } } return } func (ds *dataStore) Delete(_ context.Context, ctx string, keys ...string) error { if len(keys) > 0 { - if m, found := ds.Get(ctx); found { + if m, found := ds.Map.Load(ctx); found { + m := m.(*sync.Map) for _, k := range keys { - m.Del(k) + m.Delete(k) } } } @@ -444,38 +532,38 @@ func (ps *peerStore) gc(cutoff time.Time) { cutoffUnix := cutoff.UnixNano() for _, shard := range ps.shards { - infoHashes := make([]bittorrent.InfoHash, 0, shard.swarms.Len()) - shard.swarms.Range(func(ih bittorrent.InfoHash, _ swarm) bool { + infoHashes := make([]bittorrent.InfoHash, 0, shard.swarms.len()) + shard.swarms.keys(func(ih bittorrent.InfoHash) bool { infoHashes = append(infoHashes, ih) return true }) runtime.Gosched() for _, ih := range infoHashes { - sw, stillExists := shard.swarms.Get(ih) + sw, stillExists := shard.swarms.get(ih) if !stillExists { runtime.Gosched() continue } - sw.leechers.Range(func(pID string, mtime int64) bool { + sw.leechers.forEach(func(p bittorrent.Peer, mtime int64) bool { if mtime <= cutoffUnix { - sw.leechers.Del(pID) + sw.leechers.del(p) shard.numLeechers.Add(decrUint64) } return true }) - sw.seeders.Range(func(pID string, mtime int64) bool { + sw.seeders.forEach(func(p bittorrent.Peer, mtime int64) bool { if mtime <= cutoffUnix { - sw.seeders.Del(pID) + sw.seeders.del(p) shard.numSeeders.Add(decrUint64) } return true }) - if sw.leechers.Len()|sw.seeders.Len() == 0 { - shard.swarms.Del(ih) + if sw.leechers.len()|sw.seeders.len() == 0 { + shard.swarms.del(ih) } runtime.Gosched() @@ -497,7 +585,7 @@ func (ps *peerStore) Close() error { // Explicitly deallocate our storage. shards := make([]*peerShard, len(ps.shards)) for i := 0; i < len(ps.shards); i++ { - shards[i] = &peerShard{swarms: hashmap.New[bittorrent.InfoHash, swarm]()} + shards[i] = &peerShard{swarms: &ihSwarm{m: make(map[bittorrent.InfoHash]swarm)}} } ps.shards = shards })