replace go map with cornelk/hashmap in memory store

This commit is contained in:
Lawrence, Rendall
2022-11-27 16:22:04 +03:00
parent 234c65333e
commit 7e3204b9dd
3 changed files with 127 additions and 158 deletions

1
go.mod
View File

@@ -18,6 +18,7 @@ require (
github.com/stretchr/testify v1.8.1 github.com/stretchr/testify v1.8.1
github.com/zeebo/xxh3 v1.0.2 github.com/zeebo/xxh3 v1.0.2
gopkg.in/yaml.v3 v3.0.1 gopkg.in/yaml.v3 v3.0.1
github.com/cornelk/hashmap v1.0.8
) )
require ( require (

2
go.sum
View File

@@ -95,6 +95,8 @@ github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMn
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
github.com/coreos/go-systemd/v22 v22.3.3-0.20220203105225-a9a7ef127534/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= github.com/coreos/go-systemd/v22 v22.3.3-0.20220203105225-a9a7ef127534/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
github.com/cornelk/hashmap v1.0.8 h1:nv0AWgw02n+iDcawr5It4CjQIAcdMMKRrs10HOJYlrc=
github.com/cornelk/hashmap v1.0.8/go.mod h1:RfZb7JO3RviW/rT6emczVuC/oxpdz4UsSB2LJSclR1k=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=

View File

@@ -5,9 +5,11 @@ package memory
import ( import (
"context" "context"
"encoding/binary" "encoding/binary"
"github.com/cornelk/hashmap"
"math" "math"
"runtime" "runtime"
"sync" "sync"
"sync/atomic"
"time" "time"
"github.com/sot-tech/mochi/bittorrent" "github.com/sot-tech/mochi/bittorrent"
@@ -23,6 +25,8 @@ const (
Name = "memory" Name = "memory"
// Default config constants. // Default config constants.
defaultShardCount = 1024 defaultShardCount = 1024
// -1
decrUint64 = ^uint64(0)
) )
var logger = log.NewLogger("storage/memory") var logger = log.NewLogger("storage/memory")
@@ -68,38 +72,50 @@ func (cfg Config) Validate() Config {
func NewPeerStorage(provided Config) (storage.PeerStorage, error) { func NewPeerStorage(provided Config) (storage.PeerStorage, error) {
cfg := provided.Validate() cfg := provided.Validate()
ps := &peerStore{ ps := &peerStore{
cfg: cfg,
shards: make([]*peerShard, cfg.ShardCount*2), shards: make([]*peerShard, cfg.ShardCount*2),
DataStorage: NewDataStorage(), DataStorage: NewDataStorage(),
closed: make(chan struct{}), closed: make(chan any),
} }
for i := 0; i < cfg.ShardCount*2; i++ { for i := 0; i < cfg.ShardCount*2; i++ {
ps.shards[i] = &peerShard{swarms: make(map[bittorrent.InfoHash]swarm)} ps.shards[i] = &peerShard{swarms: hashmap.New[bittorrent.InfoHash, swarm]()}
} }
return ps, nil return ps, nil
} }
type peerShard struct { type peerShard struct {
swarms map[bittorrent.InfoHash]swarm swarms *hashmap.Map[bittorrent.InfoHash, swarm]
numSeeders uint64 numSeeders atomic.Uint64
numLeechers uint64 numLeechers atomic.Uint64
sync.RWMutex sync.Mutex
}
// ginSwarm returns existing swarm or inserts new empty if it does not exist
func (sh *peerShard) ginSwarm(ih bittorrent.InfoHash) swarm {
sw, ok := sh.swarms.Get(ih)
if !ok {
sh.Lock()
defer sh.Unlock()
sw, _ = sh.swarms.GetOrInsert(ih, swarm{
seeders: hashmap.New[string, int64](),
leechers: hashmap.New[string, int64](),
})
}
return sw
} }
type swarm struct { type swarm struct {
// map serialized peer to mtime // map serialized peer to mtime
seeders map[bittorrent.Peer]int64 seeders *hashmap.Map[string, int64]
leechers map[bittorrent.Peer]int64 leechers *hashmap.Map[string, int64]
} }
type peerStore struct { type peerStore struct {
storage.DataStorage storage.DataStorage
cfg Config
shards []*peerShard shards []*peerShard
closed chan struct{} closed chan any
wg sync.WaitGroup wg sync.WaitGroup
onceCloser sync.Once onceCloser sync.Once
} }
@@ -147,11 +163,9 @@ func (ps *peerStore) ScheduleStatisticsCollection(reportInterval time.Duration)
var numInfohashes, numSeeders, numLeechers uint64 var numInfohashes, numSeeders, numLeechers uint64
for _, s := range ps.shards { for _, s := range ps.shards {
s.RLock() numInfohashes += uint64(s.swarms.Len())
numInfohashes += uint64(len(s.swarms)) numSeeders += uint64(s.numSeeders.Load())
numSeeders += s.numSeeders numLeechers += uint64(s.numLeechers.Load())
numLeechers += s.numLeechers
s.RUnlock()
} }
storage.PromInfoHashesCount.Set(float64(numInfohashes)) storage.PromInfoHashesCount.Set(float64(numInfohashes))
@@ -186,29 +200,19 @@ func (ps *peerStore) PutSeeder(_ context.Context, ih bittorrent.InfoHash, p bitt
Object("peer", p). Object("peer", p).
Msg("put seeder") Msg("put seeder")
shard := ps.shards[ps.shardIndex(ih, p.Addr().Is6())] sh := ps.shards[ps.shardIndex(ih, p.Addr().Is6())]
shard.Lock() sw, pId := sh.ginSwarm(ih), p.RawString()
defer shard.Unlock()
if _, ok := shard.swarms[ih]; !ok { if _, exists := sw.seeders.Get(pId); !exists {
shard.swarms[ih] = swarm{ sh.numSeeders.Add(1)
seeders: make(map[bittorrent.Peer]int64),
leechers: make(map[bittorrent.Peer]int64),
}
} }
// If this peer isn't already a seeder, update the stats for the swarm. sw.seeders.Set(pId, timecache.NowUnixNano())
if _, ok := shard.swarms[ih].seeders[p]; !ok {
shard.numSeeders++
}
// Update the peer in the swarm.
shard.swarms[ih].seeders[p] = timecache.NowUnixNano()
return nil return nil
} }
func (ps *peerStore) DeleteSeeder(_ context.Context, ih bittorrent.InfoHash, p bittorrent.Peer) error { func (ps *peerStore) DeleteSeeder(_ context.Context, ih bittorrent.InfoHash, p bittorrent.Peer) (err error) {
select { select {
case <-ps.closed: case <-ps.closed:
panic("attempted to interact with stopped memory store") panic("attempted to interact with stopped memory store")
@@ -219,26 +223,16 @@ func (ps *peerStore) DeleteSeeder(_ context.Context, ih bittorrent.InfoHash, p b
Object("peer", p). Object("peer", p).
Msg("delete seeder") Msg("delete seeder")
shard := ps.shards[ps.shardIndex(ih, p.Addr().Is6())] sh := ps.shards[ps.shardIndex(ih, p.Addr().Is6())]
shard.Lock() if sw, ok := sh.swarms.Get(ih); ok {
defer shard.Unlock() if sw.seeders.Del(p.RawString()) {
sh.numSeeders.Add(decrUint64)
if _, ok := shard.swarms[ih]; !ok { }
return storage.ErrResourceDoesNotExist } else {
err = storage.ErrResourceDoesNotExist
} }
if _, ok := shard.swarms[ih].seeders[p]; !ok { return
return storage.ErrResourceDoesNotExist
}
shard.numSeeders--
delete(shard.swarms[ih].seeders, p)
if len(shard.swarms[ih].seeders)|len(shard.swarms[ih].leechers) == 0 {
delete(shard.swarms, ih)
}
return nil
} }
func (ps *peerStore) PutLeecher(_ context.Context, ih bittorrent.InfoHash, p bittorrent.Peer) error { func (ps *peerStore) PutLeecher(_ context.Context, ih bittorrent.InfoHash, p bittorrent.Peer) error {
@@ -252,29 +246,19 @@ func (ps *peerStore) PutLeecher(_ context.Context, ih bittorrent.InfoHash, p bit
Object("peer", p). Object("peer", p).
Msg("put leecher") Msg("put leecher")
shard := ps.shards[ps.shardIndex(ih, p.Addr().Is6())] sh := ps.shards[ps.shardIndex(ih, p.Addr().Is6())]
shard.Lock() sw, pId := sh.ginSwarm(ih), p.RawString()
defer shard.Unlock()
if _, ok := shard.swarms[ih]; !ok { if _, exists := sw.leechers.Get(pId); !exists {
shard.swarms[ih] = swarm{ sh.numLeechers.Add(1)
seeders: make(map[bittorrent.Peer]int64),
leechers: make(map[bittorrent.Peer]int64),
}
} }
// If this peer isn't already a leecher, update the stats for the swarm. sw.leechers.Set(pId, timecache.NowUnixNano())
if _, ok := shard.swarms[ih].leechers[p]; !ok {
shard.numLeechers++
}
// Update the peer in the swarm.
shard.swarms[ih].leechers[p] = timecache.NowUnixNano()
return nil return nil
} }
func (ps *peerStore) DeleteLeecher(_ context.Context, ih bittorrent.InfoHash, p bittorrent.Peer) error { func (ps *peerStore) DeleteLeecher(_ context.Context, ih bittorrent.InfoHash, p bittorrent.Peer) (err error) {
select { select {
case <-ps.closed: case <-ps.closed:
panic("attempted to interact with stopped memory store") panic("attempted to interact with stopped memory store")
@@ -285,26 +269,16 @@ func (ps *peerStore) DeleteLeecher(_ context.Context, ih bittorrent.InfoHash, p
Object("peer", p). Object("peer", p).
Msg("delete leecher") Msg("delete leecher")
shard := ps.shards[ps.shardIndex(ih, p.Addr().Is6())] sh := ps.shards[ps.shardIndex(ih, p.Addr().Is6())]
shard.Lock() if sw, ok := sh.swarms.Get(ih); ok {
defer shard.Unlock() if sw.leechers.Del(p.RawString()) {
sh.numLeechers.Add(decrUint64)
if _, ok := shard.swarms[ih]; !ok { }
return storage.ErrResourceDoesNotExist } else {
err = storage.ErrResourceDoesNotExist
} }
if _, ok := shard.swarms[ih].leechers[p]; !ok { return
return storage.ErrResourceDoesNotExist
}
shard.numLeechers--
delete(shard.swarms[ih].leechers, p)
if len(shard.swarms[ih].seeders)|len(shard.swarms[ih].leechers) == 0 {
delete(shard.swarms, ih)
}
return nil
} }
func (ps *peerStore) GraduateLeecher(_ context.Context, ih bittorrent.InfoHash, p bittorrent.Peer) error { func (ps *peerStore) GraduateLeecher(_ context.Context, ih bittorrent.InfoHash, p bittorrent.Peer) error {
@@ -318,55 +292,40 @@ func (ps *peerStore) GraduateLeecher(_ context.Context, ih bittorrent.InfoHash,
Object("peer", p). Object("peer", p).
Msg("graduate leecher") Msg("graduate leecher")
shard := ps.shards[ps.shardIndex(ih, p.Addr().Is6())] sh := ps.shards[ps.shardIndex(ih, p.Addr().Is6())]
shard.Lock() sw, pId := sh.ginSwarm(ih), p.RawString()
defer shard.Unlock()
if _, ok := shard.swarms[ih]; !ok { if sw.leechers.Del(pId) {
shard.swarms[ih] = swarm{ sh.numLeechers.Add(decrUint64)
seeders: make(map[bittorrent.Peer]int64),
leechers: make(map[bittorrent.Peer]int64),
}
} }
// If this peer is a leecher, update the stats for the swarm and remove them. if _, exists := sw.seeders.Get(pId); !exists {
if _, ok := shard.swarms[ih].leechers[p]; ok { sh.numSeeders.Add(1)
shard.numLeechers--
delete(shard.swarms[ih].leechers, p)
} }
// If this peer isn't already a seeder, update the stats for the swarm. sw.seeders.Set(pId, timecache.NowUnixNano())
if _, ok := shard.swarms[ih].seeders[p]; !ok {
shard.numSeeders++
}
// Update the peer in the swarm.
shard.swarms[ih].seeders[p] = timecache.NowUnixNano()
return nil return nil
} }
func parsePeers(peersMap map[bittorrent.Peer]int64, maxCount int) (peers []bittorrent.Peer) { func parsePeers(peersMap *hashmap.Map[string, int64], maxCount int) (peers []bittorrent.Peer) {
for p := range peersMap { peersMap.Range(func(pId string, _ int64) bool {
if maxCount == 0 { p, _ := bittorrent.NewPeer(pId)
break
}
peers = append(peers, p) peers = append(peers, p)
maxCount-- maxCount--
} return maxCount > 0
})
return return
} }
func (ps *peerStore) getPeers(shard *peerShard, ih bittorrent.InfoHash, maxCount int, forSeeder bool) (peers []bittorrent.Peer) { func (ps *peerStore) getPeers(shard *peerShard, ih bittorrent.InfoHash, numWant int, forSeeder bool) (peers []bittorrent.Peer) {
shard.RLock() if sw, ok := shard.swarms.Get(ih); ok {
defer shard.RUnlock()
if swarm, ok := shard.swarms[ih]; ok {
if forSeeder { if forSeeder {
peers = parsePeers(swarm.leechers, maxCount) peers = parsePeers(sw.leechers, numWant)
} else { } else {
peers = append(peers, parsePeers(swarm.seeders, maxCount)...) peers = append(peers, parsePeers(sw.seeders, numWant)...)
if maxCount -= len(peers); maxCount > 0 { if numWant -= len(peers); numWant > 0 {
peers = append(peers, parsePeers(swarm.leechers, maxCount)...) peers = append(peers, parsePeers(sw.leechers, numWant)...)
} }
} }
} }
@@ -393,11 +352,9 @@ func (ps *peerStore) AnnouncePeers(_ context.Context, ih bittorrent.InfoHash, fo
func (ps *peerStore) countPeers(ih bittorrent.InfoHash, v6 bool) (leechers, seeders uint32) { func (ps *peerStore) countPeers(ih bittorrent.InfoHash, v6 bool) (leechers, seeders uint32) {
shard := ps.shards[ps.shardIndex(ih, v6)] shard := ps.shards[ps.shardIndex(ih, v6)]
shard.RLock()
defer shard.RUnlock()
if swarm, ok := shard.swarms[ih]; ok { if sw, ok := shard.swarms.Get(ih); ok {
leechers, seeders = uint32(len(swarm.leechers)), uint32(len(swarm.seeders)) leechers, seeders = uint32(sw.leechers.Len()), uint32(sw.seeders.Len())
} }
return return
} }
@@ -421,19 +378,30 @@ func (ps *peerStore) ScrapeSwarm(_ context.Context, ih bittorrent.InfoHash) (lee
// NewDataStorage creates new in-memory data store // NewDataStorage creates new in-memory data store
func NewDataStorage() storage.DataStorage { func NewDataStorage() storage.DataStorage {
return new(dataStore) return &dataStore{
hashmap.New[string, *hashmap.Map[string, []byte]](),
sync.Mutex{},
}
} }
type dataStore struct { type dataStore struct {
sync.Map *hashmap.Map[string, *hashmap.Map[string, []byte]]
sync.Mutex
} }
func (ds *dataStore) Put(_ context.Context, ctx string, values ...storage.Entry) error { func (ds *dataStore) Put(_ context.Context, ctx string, values ...storage.Entry) error {
if len(values) > 0 { if len(values) > 0 {
c, _ := ds.LoadOrStore(ctx, new(sync.Map)) m, ok := ds.Get(ctx)
m := c.(*sync.Map) if !ok {
ds.Lock()
if m, ok = ds.Get(ctx); !ok {
m = hashmap.New[string, []byte]()
ds.Insert(ctx, m)
}
ds.Unlock()
}
for _, p := range values { for _, p := range values {
m.Store(p.Key, p.Value) m.Set(p.Key, p.Value)
} }
} }
return nil return nil
@@ -441,27 +409,24 @@ func (ds *dataStore) Put(_ context.Context, ctx string, values ...storage.Entry)
func (ds *dataStore) Contains(_ context.Context, ctx string, key string) (bool, error) { func (ds *dataStore) Contains(_ context.Context, ctx string, key string) (bool, error) {
var exist bool var exist bool
if m, found := ds.Map.Load(ctx); found { if m, found := ds.Get(ctx); found {
_, exist = m.(*sync.Map).Load(key) _, exist = m.Get(key)
} }
return exist, nil return exist, nil
} }
func (ds *dataStore) Load(_ context.Context, ctx string, key string) (out []byte, _ error) { func (ds *dataStore) Load(_ context.Context, ctx string, key string) (out []byte, _ error) {
if m, found := ds.Map.Load(ctx); found { if m, found := ds.Map.Get(ctx); found {
if v, _ := m.(*sync.Map).Load(key); v != nil { out, _ = m.Get(key)
out = v.([]byte)
}
} }
return return
} }
func (ds *dataStore) Delete(_ context.Context, ctx string, keys ...string) error { func (ds *dataStore) Delete(_ context.Context, ctx string, keys ...string) error {
if len(keys) > 0 { if len(keys) > 0 {
if m, found := ds.Map.Load(ctx); found { if m, found := ds.Get(ctx); found {
m := m.(*sync.Map)
for _, k := range keys { for _, k := range keys {
m.Delete(k) m.Del(k)
} }
} }
} }
@@ -487,42 +452,43 @@ func (ps *peerStore) gc(cutoff time.Time) {
cutoffUnix := cutoff.UnixNano() cutoffUnix := cutoff.UnixNano()
for _, shard := range ps.shards { for _, shard := range ps.shards {
shard.RLock() infoHashes := make([]bittorrent.InfoHash, 0, shard.swarms.Len())
var infohashes []bittorrent.InfoHash shard.swarms.Range(func(ih bittorrent.InfoHash, sw swarm) bool {
for ih := range shard.swarms { infoHashes = append(infoHashes, ih)
infohashes = append(infohashes, ih) return true
} })
shard.RUnlock()
runtime.Gosched() runtime.Gosched()
for _, ih := range infohashes { for _, ih := range infoHashes {
shard.Lock()
if _, stillExists := shard.swarms[ih]; !stillExists { sw, stillExists := shard.swarms.Get(ih)
shard.Unlock() if !stillExists {
runtime.Gosched() runtime.Gosched()
continue continue
} }
for pk, mtime := range shard.swarms[ih].leechers { sw.leechers.Range(func(pId string, mtime int64) bool {
if mtime <= cutoffUnix { if mtime <= cutoffUnix {
shard.numLeechers-- sw.leechers.Del(pId)
delete(shard.swarms[ih].leechers, pk) shard.numLeechers.Add(decrUint64)
}
} }
return true
})
for pk, mtime := range shard.swarms[ih].seeders { sw.seeders.Range(func(pId string, mtime int64) bool {
if mtime <= cutoffUnix { if mtime <= cutoffUnix {
shard.numSeeders-- sw.seeders.Del(pId)
delete(shard.swarms[ih].seeders, pk) shard.numSeeders.Add(decrUint64)
}
}
if len(shard.swarms[ih].seeders)|len(shard.swarms[ih].leechers) == 0 {
delete(shard.swarms, ih)
} }
return true
})
if sw.leechers.Len()|sw.seeders.Len() == 0 {
shard.Lock()
shard.swarms.Del(ih)
shard.Unlock() shard.Unlock()
}
runtime.Gosched() runtime.Gosched()
} }
@@ -542,7 +508,7 @@ func (ps *peerStore) Close() error {
// Explicitly deallocate our storage. // Explicitly deallocate our storage.
shards := make([]*peerShard, len(ps.shards)) shards := make([]*peerShard, len(ps.shards))
for i := 0; i < len(ps.shards); i++ { for i := 0; i < len(ps.shards); i++ {
shards[i] = &peerShard{swarms: make(map[bittorrent.InfoHash]swarm)} shards[i] = &peerShard{swarms: hashmap.New[bittorrent.InfoHash, swarm]()}
} }
ps.shards = shards ps.shards = shards
}) })