replace go map with cornelk/hashmap in memory store

This commit is contained in:
Lawrence, Rendall
2022-11-27 16:22:04 +03:00
parent 234c65333e
commit 7e3204b9dd
3 changed files with 127 additions and 158 deletions

1
go.mod
View File

@@ -18,6 +18,7 @@ require (
github.com/stretchr/testify v1.8.1
github.com/zeebo/xxh3 v1.0.2
gopkg.in/yaml.v3 v3.0.1
github.com/cornelk/hashmap v1.0.8
)
require (

2
go.sum
View File

@@ -95,6 +95,8 @@ github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMn
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
github.com/coreos/go-systemd/v22 v22.3.3-0.20220203105225-a9a7ef127534/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
github.com/cornelk/hashmap v1.0.8 h1:nv0AWgw02n+iDcawr5It4CjQIAcdMMKRrs10HOJYlrc=
github.com/cornelk/hashmap v1.0.8/go.mod h1:RfZb7JO3RviW/rT6emczVuC/oxpdz4UsSB2LJSclR1k=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=

View File

@@ -5,9 +5,11 @@ package memory
import (
"context"
"encoding/binary"
"github.com/cornelk/hashmap"
"math"
"runtime"
"sync"
"sync/atomic"
"time"
"github.com/sot-tech/mochi/bittorrent"
@@ -23,6 +25,8 @@ const (
Name = "memory"
// Default config constants.
defaultShardCount = 1024
// -1
decrUint64 = ^uint64(0)
)
var logger = log.NewLogger("storage/memory")
@@ -68,38 +72,50 @@ func (cfg Config) Validate() Config {
func NewPeerStorage(provided Config) (storage.PeerStorage, error) {
cfg := provided.Validate()
ps := &peerStore{
cfg: cfg,
shards: make([]*peerShard, cfg.ShardCount*2),
DataStorage: NewDataStorage(),
closed: make(chan struct{}),
closed: make(chan any),
}
for i := 0; i < cfg.ShardCount*2; i++ {
ps.shards[i] = &peerShard{swarms: make(map[bittorrent.InfoHash]swarm)}
ps.shards[i] = &peerShard{swarms: hashmap.New[bittorrent.InfoHash, swarm]()}
}
return ps, nil
}
type peerShard struct {
swarms map[bittorrent.InfoHash]swarm
numSeeders uint64
numLeechers uint64
sync.RWMutex
swarms *hashmap.Map[bittorrent.InfoHash, swarm]
numSeeders atomic.Uint64
numLeechers atomic.Uint64
sync.Mutex
}
// ginSwarm returns existing swarm or inserts new empty if it does not exist
func (sh *peerShard) ginSwarm(ih bittorrent.InfoHash) swarm {
sw, ok := sh.swarms.Get(ih)
if !ok {
sh.Lock()
defer sh.Unlock()
sw, _ = sh.swarms.GetOrInsert(ih, swarm{
seeders: hashmap.New[string, int64](),
leechers: hashmap.New[string, int64](),
})
}
return sw
}
type swarm struct {
// map serialized peer to mtime
seeders map[bittorrent.Peer]int64
leechers map[bittorrent.Peer]int64
seeders *hashmap.Map[string, int64]
leechers *hashmap.Map[string, int64]
}
type peerStore struct {
storage.DataStorage
cfg Config
shards []*peerShard
closed chan struct{}
closed chan any
wg sync.WaitGroup
onceCloser sync.Once
}
@@ -147,11 +163,9 @@ func (ps *peerStore) ScheduleStatisticsCollection(reportInterval time.Duration)
var numInfohashes, numSeeders, numLeechers uint64
for _, s := range ps.shards {
s.RLock()
numInfohashes += uint64(len(s.swarms))
numSeeders += s.numSeeders
numLeechers += s.numLeechers
s.RUnlock()
numInfohashes += uint64(s.swarms.Len())
numSeeders += uint64(s.numSeeders.Load())
numLeechers += uint64(s.numLeechers.Load())
}
storage.PromInfoHashesCount.Set(float64(numInfohashes))
@@ -186,29 +200,19 @@ func (ps *peerStore) PutSeeder(_ context.Context, ih bittorrent.InfoHash, p bitt
Object("peer", p).
Msg("put seeder")
shard := ps.shards[ps.shardIndex(ih, p.Addr().Is6())]
shard.Lock()
defer shard.Unlock()
sh := ps.shards[ps.shardIndex(ih, p.Addr().Is6())]
sw, pId := sh.ginSwarm(ih), p.RawString()
if _, ok := shard.swarms[ih]; !ok {
shard.swarms[ih] = swarm{
seeders: make(map[bittorrent.Peer]int64),
leechers: make(map[bittorrent.Peer]int64),
}
if _, exists := sw.seeders.Get(pId); !exists {
sh.numSeeders.Add(1)
}
// If this peer isn't already a seeder, update the stats for the swarm.
if _, ok := shard.swarms[ih].seeders[p]; !ok {
shard.numSeeders++
}
// Update the peer in the swarm.
shard.swarms[ih].seeders[p] = timecache.NowUnixNano()
sw.seeders.Set(pId, timecache.NowUnixNano())
return nil
}
func (ps *peerStore) DeleteSeeder(_ context.Context, ih bittorrent.InfoHash, p bittorrent.Peer) error {
func (ps *peerStore) DeleteSeeder(_ context.Context, ih bittorrent.InfoHash, p bittorrent.Peer) (err error) {
select {
case <-ps.closed:
panic("attempted to interact with stopped memory store")
@@ -219,26 +223,16 @@ func (ps *peerStore) DeleteSeeder(_ context.Context, ih bittorrent.InfoHash, p b
Object("peer", p).
Msg("delete seeder")
shard := ps.shards[ps.shardIndex(ih, p.Addr().Is6())]
shard.Lock()
defer shard.Unlock()
if _, ok := shard.swarms[ih]; !ok {
return storage.ErrResourceDoesNotExist
sh := ps.shards[ps.shardIndex(ih, p.Addr().Is6())]
if sw, ok := sh.swarms.Get(ih); ok {
if sw.seeders.Del(p.RawString()) {
sh.numSeeders.Add(decrUint64)
}
} else {
err = storage.ErrResourceDoesNotExist
}
if _, ok := shard.swarms[ih].seeders[p]; !ok {
return storage.ErrResourceDoesNotExist
}
shard.numSeeders--
delete(shard.swarms[ih].seeders, p)
if len(shard.swarms[ih].seeders)|len(shard.swarms[ih].leechers) == 0 {
delete(shard.swarms, ih)
}
return nil
return
}
func (ps *peerStore) PutLeecher(_ context.Context, ih bittorrent.InfoHash, p bittorrent.Peer) error {
@@ -252,29 +246,19 @@ func (ps *peerStore) PutLeecher(_ context.Context, ih bittorrent.InfoHash, p bit
Object("peer", p).
Msg("put leecher")
shard := ps.shards[ps.shardIndex(ih, p.Addr().Is6())]
shard.Lock()
defer shard.Unlock()
sh := ps.shards[ps.shardIndex(ih, p.Addr().Is6())]
sw, pId := sh.ginSwarm(ih), p.RawString()
if _, ok := shard.swarms[ih]; !ok {
shard.swarms[ih] = swarm{
seeders: make(map[bittorrent.Peer]int64),
leechers: make(map[bittorrent.Peer]int64),
}
if _, exists := sw.leechers.Get(pId); !exists {
sh.numLeechers.Add(1)
}
// If this peer isn't already a leecher, update the stats for the swarm.
if _, ok := shard.swarms[ih].leechers[p]; !ok {
shard.numLeechers++
}
// Update the peer in the swarm.
shard.swarms[ih].leechers[p] = timecache.NowUnixNano()
sw.leechers.Set(pId, timecache.NowUnixNano())
return nil
}
func (ps *peerStore) DeleteLeecher(_ context.Context, ih bittorrent.InfoHash, p bittorrent.Peer) error {
func (ps *peerStore) DeleteLeecher(_ context.Context, ih bittorrent.InfoHash, p bittorrent.Peer) (err error) {
select {
case <-ps.closed:
panic("attempted to interact with stopped memory store")
@@ -285,26 +269,16 @@ func (ps *peerStore) DeleteLeecher(_ context.Context, ih bittorrent.InfoHash, p
Object("peer", p).
Msg("delete leecher")
shard := ps.shards[ps.shardIndex(ih, p.Addr().Is6())]
shard.Lock()
defer shard.Unlock()
if _, ok := shard.swarms[ih]; !ok {
return storage.ErrResourceDoesNotExist
sh := ps.shards[ps.shardIndex(ih, p.Addr().Is6())]
if sw, ok := sh.swarms.Get(ih); ok {
if sw.leechers.Del(p.RawString()) {
sh.numLeechers.Add(decrUint64)
}
} else {
err = storage.ErrResourceDoesNotExist
}
if _, ok := shard.swarms[ih].leechers[p]; !ok {
return storage.ErrResourceDoesNotExist
}
shard.numLeechers--
delete(shard.swarms[ih].leechers, p)
if len(shard.swarms[ih].seeders)|len(shard.swarms[ih].leechers) == 0 {
delete(shard.swarms, ih)
}
return nil
return
}
func (ps *peerStore) GraduateLeecher(_ context.Context, ih bittorrent.InfoHash, p bittorrent.Peer) error {
@@ -318,55 +292,40 @@ func (ps *peerStore) GraduateLeecher(_ context.Context, ih bittorrent.InfoHash,
Object("peer", p).
Msg("graduate leecher")
shard := ps.shards[ps.shardIndex(ih, p.Addr().Is6())]
shard.Lock()
defer shard.Unlock()
sh := ps.shards[ps.shardIndex(ih, p.Addr().Is6())]
sw, pId := sh.ginSwarm(ih), p.RawString()
if _, ok := shard.swarms[ih]; !ok {
shard.swarms[ih] = swarm{
seeders: make(map[bittorrent.Peer]int64),
leechers: make(map[bittorrent.Peer]int64),
}
if sw.leechers.Del(pId) {
sh.numLeechers.Add(decrUint64)
}
// If this peer is a leecher, update the stats for the swarm and remove them.
if _, ok := shard.swarms[ih].leechers[p]; ok {
shard.numLeechers--
delete(shard.swarms[ih].leechers, p)
if _, exists := sw.seeders.Get(pId); !exists {
sh.numSeeders.Add(1)
}
// If this peer isn't already a seeder, update the stats for the swarm.
if _, ok := shard.swarms[ih].seeders[p]; !ok {
shard.numSeeders++
}
// Update the peer in the swarm.
shard.swarms[ih].seeders[p] = timecache.NowUnixNano()
sw.seeders.Set(pId, timecache.NowUnixNano())
return nil
}
func parsePeers(peersMap map[bittorrent.Peer]int64, maxCount int) (peers []bittorrent.Peer) {
for p := range peersMap {
if maxCount == 0 {
break
}
func parsePeers(peersMap *hashmap.Map[string, int64], maxCount int) (peers []bittorrent.Peer) {
peersMap.Range(func(pId string, _ int64) bool {
p, _ := bittorrent.NewPeer(pId)
peers = append(peers, p)
maxCount--
}
return maxCount > 0
})
return
}
func (ps *peerStore) getPeers(shard *peerShard, ih bittorrent.InfoHash, maxCount int, forSeeder bool) (peers []bittorrent.Peer) {
shard.RLock()
defer shard.RUnlock()
if swarm, ok := shard.swarms[ih]; ok {
func (ps *peerStore) getPeers(shard *peerShard, ih bittorrent.InfoHash, numWant int, forSeeder bool) (peers []bittorrent.Peer) {
if sw, ok := shard.swarms.Get(ih); ok {
if forSeeder {
peers = parsePeers(swarm.leechers, maxCount)
peers = parsePeers(sw.leechers, numWant)
} else {
peers = append(peers, parsePeers(swarm.seeders, maxCount)...)
if maxCount -= len(peers); maxCount > 0 {
peers = append(peers, parsePeers(swarm.leechers, maxCount)...)
peers = append(peers, parsePeers(sw.seeders, numWant)...)
if numWant -= len(peers); numWant > 0 {
peers = append(peers, parsePeers(sw.leechers, numWant)...)
}
}
}
@@ -393,11 +352,9 @@ func (ps *peerStore) AnnouncePeers(_ context.Context, ih bittorrent.InfoHash, fo
func (ps *peerStore) countPeers(ih bittorrent.InfoHash, v6 bool) (leechers, seeders uint32) {
shard := ps.shards[ps.shardIndex(ih, v6)]
shard.RLock()
defer shard.RUnlock()
if swarm, ok := shard.swarms[ih]; ok {
leechers, seeders = uint32(len(swarm.leechers)), uint32(len(swarm.seeders))
if sw, ok := shard.swarms.Get(ih); ok {
leechers, seeders = uint32(sw.leechers.Len()), uint32(sw.seeders.Len())
}
return
}
@@ -421,19 +378,30 @@ func (ps *peerStore) ScrapeSwarm(_ context.Context, ih bittorrent.InfoHash) (lee
// NewDataStorage creates new in-memory data store
func NewDataStorage() storage.DataStorage {
return new(dataStore)
return &dataStore{
hashmap.New[string, *hashmap.Map[string, []byte]](),
sync.Mutex{},
}
}
type dataStore struct {
sync.Map
*hashmap.Map[string, *hashmap.Map[string, []byte]]
sync.Mutex
}
func (ds *dataStore) Put(_ context.Context, ctx string, values ...storage.Entry) error {
if len(values) > 0 {
c, _ := ds.LoadOrStore(ctx, new(sync.Map))
m := c.(*sync.Map)
m, ok := ds.Get(ctx)
if !ok {
ds.Lock()
if m, ok = ds.Get(ctx); !ok {
m = hashmap.New[string, []byte]()
ds.Insert(ctx, m)
}
ds.Unlock()
}
for _, p := range values {
m.Store(p.Key, p.Value)
m.Set(p.Key, p.Value)
}
}
return nil
@@ -441,27 +409,24 @@ func (ds *dataStore) Put(_ context.Context, ctx string, values ...storage.Entry)
func (ds *dataStore) Contains(_ context.Context, ctx string, key string) (bool, error) {
var exist bool
if m, found := ds.Map.Load(ctx); found {
_, exist = m.(*sync.Map).Load(key)
if m, found := ds.Get(ctx); found {
_, exist = m.Get(key)
}
return exist, nil
}
func (ds *dataStore) Load(_ context.Context, ctx string, key string) (out []byte, _ error) {
if m, found := ds.Map.Load(ctx); found {
if v, _ := m.(*sync.Map).Load(key); v != nil {
out = v.([]byte)
}
if m, found := ds.Map.Get(ctx); found {
out, _ = m.Get(key)
}
return
}
func (ds *dataStore) Delete(_ context.Context, ctx string, keys ...string) error {
if len(keys) > 0 {
if m, found := ds.Map.Load(ctx); found {
m := m.(*sync.Map)
if m, found := ds.Get(ctx); found {
for _, k := range keys {
m.Delete(k)
m.Del(k)
}
}
}
@@ -487,42 +452,43 @@ func (ps *peerStore) gc(cutoff time.Time) {
cutoffUnix := cutoff.UnixNano()
for _, shard := range ps.shards {
shard.RLock()
var infohashes []bittorrent.InfoHash
for ih := range shard.swarms {
infohashes = append(infohashes, ih)
}
shard.RUnlock()
infoHashes := make([]bittorrent.InfoHash, 0, shard.swarms.Len())
shard.swarms.Range(func(ih bittorrent.InfoHash, sw swarm) bool {
infoHashes = append(infoHashes, ih)
return true
})
runtime.Gosched()
for _, ih := range infohashes {
shard.Lock()
for _, ih := range infoHashes {
if _, stillExists := shard.swarms[ih]; !stillExists {
shard.Unlock()
sw, stillExists := shard.swarms.Get(ih)
if !stillExists {
runtime.Gosched()
continue
}
for pk, mtime := range shard.swarms[ih].leechers {
sw.leechers.Range(func(pId string, mtime int64) bool {
if mtime <= cutoffUnix {
shard.numLeechers--
delete(shard.swarms[ih].leechers, pk)
sw.leechers.Del(pId)
shard.numLeechers.Add(decrUint64)
}
}
return true
})
for pk, mtime := range shard.swarms[ih].seeders {
sw.seeders.Range(func(pId string, mtime int64) bool {
if mtime <= cutoffUnix {
shard.numSeeders--
delete(shard.swarms[ih].seeders, pk)
sw.seeders.Del(pId)
shard.numSeeders.Add(decrUint64)
}
return true
})
if sw.leechers.Len()|sw.seeders.Len() == 0 {
shard.Lock()
shard.swarms.Del(ih)
shard.Unlock()
}
if len(shard.swarms[ih].seeders)|len(shard.swarms[ih].leechers) == 0 {
delete(shard.swarms, ih)
}
shard.Unlock()
runtime.Gosched()
}
@@ -542,7 +508,7 @@ func (ps *peerStore) Close() error {
// Explicitly deallocate our storage.
shards := make([]*peerShard, len(ps.shards))
for i := 0; i < len(ps.shards); i++ {
shards[i] = &peerShard{swarms: make(map[bittorrent.InfoHash]swarm)}
shards[i] = &peerShard{swarms: hashmap.New[bittorrent.InfoHash, swarm]()}
}
ps.shards = shards
})