From 7e3204b9dd3591b00780314924857a51f1f85f33 Mon Sep 17 00:00:00 2001 From: "Lawrence, Rendall" Date: Sun, 27 Nov 2022 16:22:04 +0300 Subject: [PATCH] replace go map with cornelk/hashmap in memory store --- go.mod | 1 + go.sum | 2 + storage/memory/storage.go | 282 +++++++++++++++++--------------------- 3 files changed, 127 insertions(+), 158 deletions(-) diff --git a/go.mod b/go.mod index a4e66c4..433855a 100644 --- a/go.mod +++ b/go.mod @@ -18,6 +18,7 @@ require ( github.com/stretchr/testify v1.8.1 github.com/zeebo/xxh3 v1.0.2 gopkg.in/yaml.v3 v3.0.1 + github.com/cornelk/hashmap v1.0.8 ) require ( diff --git a/go.sum b/go.sum index 8dd0a16..cc136f3 100644 --- a/go.sum +++ b/go.sum @@ -95,6 +95,8 @@ github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMn github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/coreos/go-systemd/v22 v22.3.3-0.20220203105225-a9a7ef127534/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= +github.com/cornelk/hashmap v1.0.8 h1:nv0AWgw02n+iDcawr5It4CjQIAcdMMKRrs10HOJYlrc= +github.com/cornelk/hashmap v1.0.8/go.mod h1:RfZb7JO3RviW/rT6emczVuC/oxpdz4UsSB2LJSclR1k= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= diff --git a/storage/memory/storage.go b/storage/memory/storage.go index 88c7c37..ca41b26 100644 --- a/storage/memory/storage.go +++ b/storage/memory/storage.go @@ -5,9 +5,11 @@ package memory import ( "context" "encoding/binary" + "github.com/cornelk/hashmap" "math" "runtime" "sync" + "sync/atomic" "time" "github.com/sot-tech/mochi/bittorrent" @@ -23,6 +25,8 @@ const ( Name = "memory" // Default config constants. defaultShardCount = 1024 + // -1 + decrUint64 = ^uint64(0) ) var logger = log.NewLogger("storage/memory") @@ -68,38 +72,50 @@ func (cfg Config) Validate() Config { func NewPeerStorage(provided Config) (storage.PeerStorage, error) { cfg := provided.Validate() ps := &peerStore{ - cfg: cfg, shards: make([]*peerShard, cfg.ShardCount*2), DataStorage: NewDataStorage(), - closed: make(chan struct{}), + closed: make(chan any), } for i := 0; i < cfg.ShardCount*2; i++ { - ps.shards[i] = &peerShard{swarms: make(map[bittorrent.InfoHash]swarm)} + ps.shards[i] = &peerShard{swarms: hashmap.New[bittorrent.InfoHash, swarm]()} } return ps, nil } type peerShard struct { - swarms map[bittorrent.InfoHash]swarm - numSeeders uint64 - numLeechers uint64 - sync.RWMutex + swarms *hashmap.Map[bittorrent.InfoHash, swarm] + numSeeders atomic.Uint64 + numLeechers atomic.Uint64 + sync.Mutex +} + +// ginSwarm returns existing swarm or inserts new empty if it does not exist +func (sh *peerShard) ginSwarm(ih bittorrent.InfoHash) swarm { + sw, ok := sh.swarms.Get(ih) + if !ok { + sh.Lock() + defer sh.Unlock() + sw, _ = sh.swarms.GetOrInsert(ih, swarm{ + seeders: hashmap.New[string, int64](), + leechers: hashmap.New[string, int64](), + }) + } + return sw } type swarm struct { // map serialized peer to mtime - seeders map[bittorrent.Peer]int64 - leechers map[bittorrent.Peer]int64 + seeders *hashmap.Map[string, int64] + leechers *hashmap.Map[string, int64] } type peerStore struct { storage.DataStorage - cfg Config shards []*peerShard - closed chan struct{} + closed chan any wg sync.WaitGroup onceCloser sync.Once } @@ -147,11 +163,9 @@ func (ps *peerStore) ScheduleStatisticsCollection(reportInterval time.Duration) var numInfohashes, numSeeders, numLeechers uint64 for _, s := range ps.shards { - s.RLock() - numInfohashes += uint64(len(s.swarms)) - numSeeders += s.numSeeders - numLeechers += s.numLeechers - s.RUnlock() + numInfohashes += uint64(s.swarms.Len()) + numSeeders += uint64(s.numSeeders.Load()) + numLeechers += uint64(s.numLeechers.Load()) } storage.PromInfoHashesCount.Set(float64(numInfohashes)) @@ -186,29 +200,19 @@ func (ps *peerStore) PutSeeder(_ context.Context, ih bittorrent.InfoHash, p bitt Object("peer", p). Msg("put seeder") - shard := ps.shards[ps.shardIndex(ih, p.Addr().Is6())] - shard.Lock() - defer shard.Unlock() + sh := ps.shards[ps.shardIndex(ih, p.Addr().Is6())] + sw, pId := sh.ginSwarm(ih), p.RawString() - if _, ok := shard.swarms[ih]; !ok { - shard.swarms[ih] = swarm{ - seeders: make(map[bittorrent.Peer]int64), - leechers: make(map[bittorrent.Peer]int64), - } + if _, exists := sw.seeders.Get(pId); !exists { + sh.numSeeders.Add(1) } - // If this peer isn't already a seeder, update the stats for the swarm. - if _, ok := shard.swarms[ih].seeders[p]; !ok { - shard.numSeeders++ - } - - // Update the peer in the swarm. - shard.swarms[ih].seeders[p] = timecache.NowUnixNano() + sw.seeders.Set(pId, timecache.NowUnixNano()) return nil } -func (ps *peerStore) DeleteSeeder(_ context.Context, ih bittorrent.InfoHash, p bittorrent.Peer) error { +func (ps *peerStore) DeleteSeeder(_ context.Context, ih bittorrent.InfoHash, p bittorrent.Peer) (err error) { select { case <-ps.closed: panic("attempted to interact with stopped memory store") @@ -219,26 +223,16 @@ func (ps *peerStore) DeleteSeeder(_ context.Context, ih bittorrent.InfoHash, p b Object("peer", p). Msg("delete seeder") - shard := ps.shards[ps.shardIndex(ih, p.Addr().Is6())] - shard.Lock() - defer shard.Unlock() - - if _, ok := shard.swarms[ih]; !ok { - return storage.ErrResourceDoesNotExist + sh := ps.shards[ps.shardIndex(ih, p.Addr().Is6())] + if sw, ok := sh.swarms.Get(ih); ok { + if sw.seeders.Del(p.RawString()) { + sh.numSeeders.Add(decrUint64) + } + } else { + err = storage.ErrResourceDoesNotExist } - if _, ok := shard.swarms[ih].seeders[p]; !ok { - return storage.ErrResourceDoesNotExist - } - - shard.numSeeders-- - delete(shard.swarms[ih].seeders, p) - - if len(shard.swarms[ih].seeders)|len(shard.swarms[ih].leechers) == 0 { - delete(shard.swarms, ih) - } - - return nil + return } func (ps *peerStore) PutLeecher(_ context.Context, ih bittorrent.InfoHash, p bittorrent.Peer) error { @@ -252,29 +246,19 @@ func (ps *peerStore) PutLeecher(_ context.Context, ih bittorrent.InfoHash, p bit Object("peer", p). Msg("put leecher") - shard := ps.shards[ps.shardIndex(ih, p.Addr().Is6())] - shard.Lock() - defer shard.Unlock() + sh := ps.shards[ps.shardIndex(ih, p.Addr().Is6())] + sw, pId := sh.ginSwarm(ih), p.RawString() - if _, ok := shard.swarms[ih]; !ok { - shard.swarms[ih] = swarm{ - seeders: make(map[bittorrent.Peer]int64), - leechers: make(map[bittorrent.Peer]int64), - } + if _, exists := sw.leechers.Get(pId); !exists { + sh.numLeechers.Add(1) } - // If this peer isn't already a leecher, update the stats for the swarm. - if _, ok := shard.swarms[ih].leechers[p]; !ok { - shard.numLeechers++ - } - - // Update the peer in the swarm. - shard.swarms[ih].leechers[p] = timecache.NowUnixNano() + sw.leechers.Set(pId, timecache.NowUnixNano()) return nil } -func (ps *peerStore) DeleteLeecher(_ context.Context, ih bittorrent.InfoHash, p bittorrent.Peer) error { +func (ps *peerStore) DeleteLeecher(_ context.Context, ih bittorrent.InfoHash, p bittorrent.Peer) (err error) { select { case <-ps.closed: panic("attempted to interact with stopped memory store") @@ -285,26 +269,16 @@ func (ps *peerStore) DeleteLeecher(_ context.Context, ih bittorrent.InfoHash, p Object("peer", p). Msg("delete leecher") - shard := ps.shards[ps.shardIndex(ih, p.Addr().Is6())] - shard.Lock() - defer shard.Unlock() - - if _, ok := shard.swarms[ih]; !ok { - return storage.ErrResourceDoesNotExist + sh := ps.shards[ps.shardIndex(ih, p.Addr().Is6())] + if sw, ok := sh.swarms.Get(ih); ok { + if sw.leechers.Del(p.RawString()) { + sh.numLeechers.Add(decrUint64) + } + } else { + err = storage.ErrResourceDoesNotExist } - if _, ok := shard.swarms[ih].leechers[p]; !ok { - return storage.ErrResourceDoesNotExist - } - - shard.numLeechers-- - delete(shard.swarms[ih].leechers, p) - - if len(shard.swarms[ih].seeders)|len(shard.swarms[ih].leechers) == 0 { - delete(shard.swarms, ih) - } - - return nil + return } func (ps *peerStore) GraduateLeecher(_ context.Context, ih bittorrent.InfoHash, p bittorrent.Peer) error { @@ -318,55 +292,40 @@ func (ps *peerStore) GraduateLeecher(_ context.Context, ih bittorrent.InfoHash, Object("peer", p). Msg("graduate leecher") - shard := ps.shards[ps.shardIndex(ih, p.Addr().Is6())] - shard.Lock() - defer shard.Unlock() + sh := ps.shards[ps.shardIndex(ih, p.Addr().Is6())] + sw, pId := sh.ginSwarm(ih), p.RawString() - if _, ok := shard.swarms[ih]; !ok { - shard.swarms[ih] = swarm{ - seeders: make(map[bittorrent.Peer]int64), - leechers: make(map[bittorrent.Peer]int64), - } + if sw.leechers.Del(pId) { + sh.numLeechers.Add(decrUint64) } - // If this peer is a leecher, update the stats for the swarm and remove them. - if _, ok := shard.swarms[ih].leechers[p]; ok { - shard.numLeechers-- - delete(shard.swarms[ih].leechers, p) + if _, exists := sw.seeders.Get(pId); !exists { + sh.numSeeders.Add(1) } - // If this peer isn't already a seeder, update the stats for the swarm. - if _, ok := shard.swarms[ih].seeders[p]; !ok { - shard.numSeeders++ - } - - // Update the peer in the swarm. - shard.swarms[ih].seeders[p] = timecache.NowUnixNano() + sw.seeders.Set(pId, timecache.NowUnixNano()) return nil } -func parsePeers(peersMap map[bittorrent.Peer]int64, maxCount int) (peers []bittorrent.Peer) { - for p := range peersMap { - if maxCount == 0 { - break - } +func parsePeers(peersMap *hashmap.Map[string, int64], maxCount int) (peers []bittorrent.Peer) { + peersMap.Range(func(pId string, _ int64) bool { + p, _ := bittorrent.NewPeer(pId) peers = append(peers, p) maxCount-- - } + return maxCount > 0 + }) return } -func (ps *peerStore) getPeers(shard *peerShard, ih bittorrent.InfoHash, maxCount int, forSeeder bool) (peers []bittorrent.Peer) { - shard.RLock() - defer shard.RUnlock() - if swarm, ok := shard.swarms[ih]; ok { +func (ps *peerStore) getPeers(shard *peerShard, ih bittorrent.InfoHash, numWant int, forSeeder bool) (peers []bittorrent.Peer) { + if sw, ok := shard.swarms.Get(ih); ok { if forSeeder { - peers = parsePeers(swarm.leechers, maxCount) + peers = parsePeers(sw.leechers, numWant) } else { - peers = append(peers, parsePeers(swarm.seeders, maxCount)...) - if maxCount -= len(peers); maxCount > 0 { - peers = append(peers, parsePeers(swarm.leechers, maxCount)...) + peers = append(peers, parsePeers(sw.seeders, numWant)...) + if numWant -= len(peers); numWant > 0 { + peers = append(peers, parsePeers(sw.leechers, numWant)...) } } } @@ -393,11 +352,9 @@ func (ps *peerStore) AnnouncePeers(_ context.Context, ih bittorrent.InfoHash, fo func (ps *peerStore) countPeers(ih bittorrent.InfoHash, v6 bool) (leechers, seeders uint32) { shard := ps.shards[ps.shardIndex(ih, v6)] - shard.RLock() - defer shard.RUnlock() - if swarm, ok := shard.swarms[ih]; ok { - leechers, seeders = uint32(len(swarm.leechers)), uint32(len(swarm.seeders)) + if sw, ok := shard.swarms.Get(ih); ok { + leechers, seeders = uint32(sw.leechers.Len()), uint32(sw.seeders.Len()) } return } @@ -421,19 +378,30 @@ func (ps *peerStore) ScrapeSwarm(_ context.Context, ih bittorrent.InfoHash) (lee // NewDataStorage creates new in-memory data store func NewDataStorage() storage.DataStorage { - return new(dataStore) + return &dataStore{ + hashmap.New[string, *hashmap.Map[string, []byte]](), + sync.Mutex{}, + } } type dataStore struct { - sync.Map + *hashmap.Map[string, *hashmap.Map[string, []byte]] + sync.Mutex } func (ds *dataStore) Put(_ context.Context, ctx string, values ...storage.Entry) error { if len(values) > 0 { - c, _ := ds.LoadOrStore(ctx, new(sync.Map)) - m := c.(*sync.Map) + m, ok := ds.Get(ctx) + if !ok { + ds.Lock() + if m, ok = ds.Get(ctx); !ok { + m = hashmap.New[string, []byte]() + ds.Insert(ctx, m) + } + ds.Unlock() + } for _, p := range values { - m.Store(p.Key, p.Value) + m.Set(p.Key, p.Value) } } return nil @@ -441,27 +409,24 @@ func (ds *dataStore) Put(_ context.Context, ctx string, values ...storage.Entry) func (ds *dataStore) Contains(_ context.Context, ctx string, key string) (bool, error) { var exist bool - if m, found := ds.Map.Load(ctx); found { - _, exist = m.(*sync.Map).Load(key) + if m, found := ds.Get(ctx); found { + _, exist = m.Get(key) } return exist, nil } func (ds *dataStore) Load(_ context.Context, ctx string, key string) (out []byte, _ error) { - if m, found := ds.Map.Load(ctx); found { - if v, _ := m.(*sync.Map).Load(key); v != nil { - out = v.([]byte) - } + if m, found := ds.Map.Get(ctx); found { + out, _ = m.Get(key) } return } func (ds *dataStore) Delete(_ context.Context, ctx string, keys ...string) error { if len(keys) > 0 { - if m, found := ds.Map.Load(ctx); found { - m := m.(*sync.Map) + if m, found := ds.Get(ctx); found { for _, k := range keys { - m.Delete(k) + m.Del(k) } } } @@ -487,42 +452,43 @@ func (ps *peerStore) gc(cutoff time.Time) { cutoffUnix := cutoff.UnixNano() for _, shard := range ps.shards { - shard.RLock() - var infohashes []bittorrent.InfoHash - for ih := range shard.swarms { - infohashes = append(infohashes, ih) - } - shard.RUnlock() + infoHashes := make([]bittorrent.InfoHash, 0, shard.swarms.Len()) + shard.swarms.Range(func(ih bittorrent.InfoHash, sw swarm) bool { + infoHashes = append(infoHashes, ih) + return true + }) runtime.Gosched() - for _, ih := range infohashes { - shard.Lock() + for _, ih := range infoHashes { - if _, stillExists := shard.swarms[ih]; !stillExists { - shard.Unlock() + sw, stillExists := shard.swarms.Get(ih) + if !stillExists { runtime.Gosched() continue } - for pk, mtime := range shard.swarms[ih].leechers { + sw.leechers.Range(func(pId string, mtime int64) bool { if mtime <= cutoffUnix { - shard.numLeechers-- - delete(shard.swarms[ih].leechers, pk) + sw.leechers.Del(pId) + shard.numLeechers.Add(decrUint64) } - } + return true + }) - for pk, mtime := range shard.swarms[ih].seeders { + sw.seeders.Range(func(pId string, mtime int64) bool { if mtime <= cutoffUnix { - shard.numSeeders-- - delete(shard.swarms[ih].seeders, pk) + sw.seeders.Del(pId) + shard.numSeeders.Add(decrUint64) } + return true + }) + + if sw.leechers.Len()|sw.seeders.Len() == 0 { + shard.Lock() + shard.swarms.Del(ih) + shard.Unlock() } - if len(shard.swarms[ih].seeders)|len(shard.swarms[ih].leechers) == 0 { - delete(shard.swarms, ih) - } - - shard.Unlock() runtime.Gosched() } @@ -542,7 +508,7 @@ func (ps *peerStore) Close() error { // Explicitly deallocate our storage. shards := make([]*peerShard, len(ps.shards)) for i := 0; i < len(ps.shards); i++ { - shards[i] = &peerShard{swarms: make(map[bittorrent.InfoHash]swarm)} + shards[i] = &peerShard{swarms: hashmap.New[bittorrent.InfoHash, swarm]()} } ps.shards = shards })