mirror of
https://github.com/sot-tech/mochi.git
synced 2026-05-21 07:14:48 -07:00
replace go map with cornelk/hashmap in memory store
This commit is contained in:
1
go.mod
1
go.mod
@@ -18,6 +18,7 @@ require (
|
||||
github.com/stretchr/testify v1.8.1
|
||||
github.com/zeebo/xxh3 v1.0.2
|
||||
gopkg.in/yaml.v3 v3.0.1
|
||||
github.com/cornelk/hashmap v1.0.8
|
||||
)
|
||||
|
||||
require (
|
||||
|
||||
2
go.sum
2
go.sum
@@ -95,6 +95,8 @@ github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMn
|
||||
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
|
||||
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
|
||||
github.com/coreos/go-systemd/v22 v22.3.3-0.20220203105225-a9a7ef127534/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
|
||||
github.com/cornelk/hashmap v1.0.8 h1:nv0AWgw02n+iDcawr5It4CjQIAcdMMKRrs10HOJYlrc=
|
||||
github.com/cornelk/hashmap v1.0.8/go.mod h1:RfZb7JO3RviW/rT6emczVuC/oxpdz4UsSB2LJSclR1k=
|
||||
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
|
||||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||
|
||||
@@ -5,9 +5,11 @@ package memory
|
||||
import (
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"github.com/cornelk/hashmap"
|
||||
"math"
|
||||
"runtime"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/sot-tech/mochi/bittorrent"
|
||||
@@ -23,6 +25,8 @@ const (
|
||||
Name = "memory"
|
||||
// Default config constants.
|
||||
defaultShardCount = 1024
|
||||
// -1
|
||||
decrUint64 = ^uint64(0)
|
||||
)
|
||||
|
||||
var logger = log.NewLogger("storage/memory")
|
||||
@@ -68,38 +72,50 @@ func (cfg Config) Validate() Config {
|
||||
func NewPeerStorage(provided Config) (storage.PeerStorage, error) {
|
||||
cfg := provided.Validate()
|
||||
ps := &peerStore{
|
||||
cfg: cfg,
|
||||
shards: make([]*peerShard, cfg.ShardCount*2),
|
||||
DataStorage: NewDataStorage(),
|
||||
closed: make(chan struct{}),
|
||||
closed: make(chan any),
|
||||
}
|
||||
|
||||
for i := 0; i < cfg.ShardCount*2; i++ {
|
||||
ps.shards[i] = &peerShard{swarms: make(map[bittorrent.InfoHash]swarm)}
|
||||
ps.shards[i] = &peerShard{swarms: hashmap.New[bittorrent.InfoHash, swarm]()}
|
||||
}
|
||||
|
||||
return ps, nil
|
||||
}
|
||||
|
||||
type peerShard struct {
|
||||
swarms map[bittorrent.InfoHash]swarm
|
||||
numSeeders uint64
|
||||
numLeechers uint64
|
||||
sync.RWMutex
|
||||
swarms *hashmap.Map[bittorrent.InfoHash, swarm]
|
||||
numSeeders atomic.Uint64
|
||||
numLeechers atomic.Uint64
|
||||
sync.Mutex
|
||||
}
|
||||
|
||||
// ginSwarm returns existing swarm or inserts new empty if it does not exist
|
||||
func (sh *peerShard) ginSwarm(ih bittorrent.InfoHash) swarm {
|
||||
sw, ok := sh.swarms.Get(ih)
|
||||
if !ok {
|
||||
sh.Lock()
|
||||
defer sh.Unlock()
|
||||
sw, _ = sh.swarms.GetOrInsert(ih, swarm{
|
||||
seeders: hashmap.New[string, int64](),
|
||||
leechers: hashmap.New[string, int64](),
|
||||
})
|
||||
}
|
||||
return sw
|
||||
}
|
||||
|
||||
type swarm struct {
|
||||
// map serialized peer to mtime
|
||||
seeders map[bittorrent.Peer]int64
|
||||
leechers map[bittorrent.Peer]int64
|
||||
seeders *hashmap.Map[string, int64]
|
||||
leechers *hashmap.Map[string, int64]
|
||||
}
|
||||
|
||||
type peerStore struct {
|
||||
storage.DataStorage
|
||||
cfg Config
|
||||
shards []*peerShard
|
||||
|
||||
closed chan struct{}
|
||||
closed chan any
|
||||
wg sync.WaitGroup
|
||||
onceCloser sync.Once
|
||||
}
|
||||
@@ -147,11 +163,9 @@ func (ps *peerStore) ScheduleStatisticsCollection(reportInterval time.Duration)
|
||||
var numInfohashes, numSeeders, numLeechers uint64
|
||||
|
||||
for _, s := range ps.shards {
|
||||
s.RLock()
|
||||
numInfohashes += uint64(len(s.swarms))
|
||||
numSeeders += s.numSeeders
|
||||
numLeechers += s.numLeechers
|
||||
s.RUnlock()
|
||||
numInfohashes += uint64(s.swarms.Len())
|
||||
numSeeders += uint64(s.numSeeders.Load())
|
||||
numLeechers += uint64(s.numLeechers.Load())
|
||||
}
|
||||
|
||||
storage.PromInfoHashesCount.Set(float64(numInfohashes))
|
||||
@@ -186,29 +200,19 @@ func (ps *peerStore) PutSeeder(_ context.Context, ih bittorrent.InfoHash, p bitt
|
||||
Object("peer", p).
|
||||
Msg("put seeder")
|
||||
|
||||
shard := ps.shards[ps.shardIndex(ih, p.Addr().Is6())]
|
||||
shard.Lock()
|
||||
defer shard.Unlock()
|
||||
sh := ps.shards[ps.shardIndex(ih, p.Addr().Is6())]
|
||||
sw, pId := sh.ginSwarm(ih), p.RawString()
|
||||
|
||||
if _, ok := shard.swarms[ih]; !ok {
|
||||
shard.swarms[ih] = swarm{
|
||||
seeders: make(map[bittorrent.Peer]int64),
|
||||
leechers: make(map[bittorrent.Peer]int64),
|
||||
}
|
||||
if _, exists := sw.seeders.Get(pId); !exists {
|
||||
sh.numSeeders.Add(1)
|
||||
}
|
||||
|
||||
// If this peer isn't already a seeder, update the stats for the swarm.
|
||||
if _, ok := shard.swarms[ih].seeders[p]; !ok {
|
||||
shard.numSeeders++
|
||||
}
|
||||
|
||||
// Update the peer in the swarm.
|
||||
shard.swarms[ih].seeders[p] = timecache.NowUnixNano()
|
||||
sw.seeders.Set(pId, timecache.NowUnixNano())
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ps *peerStore) DeleteSeeder(_ context.Context, ih bittorrent.InfoHash, p bittorrent.Peer) error {
|
||||
func (ps *peerStore) DeleteSeeder(_ context.Context, ih bittorrent.InfoHash, p bittorrent.Peer) (err error) {
|
||||
select {
|
||||
case <-ps.closed:
|
||||
panic("attempted to interact with stopped memory store")
|
||||
@@ -219,26 +223,16 @@ func (ps *peerStore) DeleteSeeder(_ context.Context, ih bittorrent.InfoHash, p b
|
||||
Object("peer", p).
|
||||
Msg("delete seeder")
|
||||
|
||||
shard := ps.shards[ps.shardIndex(ih, p.Addr().Is6())]
|
||||
shard.Lock()
|
||||
defer shard.Unlock()
|
||||
|
||||
if _, ok := shard.swarms[ih]; !ok {
|
||||
return storage.ErrResourceDoesNotExist
|
||||
sh := ps.shards[ps.shardIndex(ih, p.Addr().Is6())]
|
||||
if sw, ok := sh.swarms.Get(ih); ok {
|
||||
if sw.seeders.Del(p.RawString()) {
|
||||
sh.numSeeders.Add(decrUint64)
|
||||
}
|
||||
} else {
|
||||
err = storage.ErrResourceDoesNotExist
|
||||
}
|
||||
|
||||
if _, ok := shard.swarms[ih].seeders[p]; !ok {
|
||||
return storage.ErrResourceDoesNotExist
|
||||
}
|
||||
|
||||
shard.numSeeders--
|
||||
delete(shard.swarms[ih].seeders, p)
|
||||
|
||||
if len(shard.swarms[ih].seeders)|len(shard.swarms[ih].leechers) == 0 {
|
||||
delete(shard.swarms, ih)
|
||||
}
|
||||
|
||||
return nil
|
||||
return
|
||||
}
|
||||
|
||||
func (ps *peerStore) PutLeecher(_ context.Context, ih bittorrent.InfoHash, p bittorrent.Peer) error {
|
||||
@@ -252,29 +246,19 @@ func (ps *peerStore) PutLeecher(_ context.Context, ih bittorrent.InfoHash, p bit
|
||||
Object("peer", p).
|
||||
Msg("put leecher")
|
||||
|
||||
shard := ps.shards[ps.shardIndex(ih, p.Addr().Is6())]
|
||||
shard.Lock()
|
||||
defer shard.Unlock()
|
||||
sh := ps.shards[ps.shardIndex(ih, p.Addr().Is6())]
|
||||
sw, pId := sh.ginSwarm(ih), p.RawString()
|
||||
|
||||
if _, ok := shard.swarms[ih]; !ok {
|
||||
shard.swarms[ih] = swarm{
|
||||
seeders: make(map[bittorrent.Peer]int64),
|
||||
leechers: make(map[bittorrent.Peer]int64),
|
||||
}
|
||||
if _, exists := sw.leechers.Get(pId); !exists {
|
||||
sh.numLeechers.Add(1)
|
||||
}
|
||||
|
||||
// If this peer isn't already a leecher, update the stats for the swarm.
|
||||
if _, ok := shard.swarms[ih].leechers[p]; !ok {
|
||||
shard.numLeechers++
|
||||
}
|
||||
|
||||
// Update the peer in the swarm.
|
||||
shard.swarms[ih].leechers[p] = timecache.NowUnixNano()
|
||||
sw.leechers.Set(pId, timecache.NowUnixNano())
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ps *peerStore) DeleteLeecher(_ context.Context, ih bittorrent.InfoHash, p bittorrent.Peer) error {
|
||||
func (ps *peerStore) DeleteLeecher(_ context.Context, ih bittorrent.InfoHash, p bittorrent.Peer) (err error) {
|
||||
select {
|
||||
case <-ps.closed:
|
||||
panic("attempted to interact with stopped memory store")
|
||||
@@ -285,26 +269,16 @@ func (ps *peerStore) DeleteLeecher(_ context.Context, ih bittorrent.InfoHash, p
|
||||
Object("peer", p).
|
||||
Msg("delete leecher")
|
||||
|
||||
shard := ps.shards[ps.shardIndex(ih, p.Addr().Is6())]
|
||||
shard.Lock()
|
||||
defer shard.Unlock()
|
||||
|
||||
if _, ok := shard.swarms[ih]; !ok {
|
||||
return storage.ErrResourceDoesNotExist
|
||||
sh := ps.shards[ps.shardIndex(ih, p.Addr().Is6())]
|
||||
if sw, ok := sh.swarms.Get(ih); ok {
|
||||
if sw.leechers.Del(p.RawString()) {
|
||||
sh.numLeechers.Add(decrUint64)
|
||||
}
|
||||
} else {
|
||||
err = storage.ErrResourceDoesNotExist
|
||||
}
|
||||
|
||||
if _, ok := shard.swarms[ih].leechers[p]; !ok {
|
||||
return storage.ErrResourceDoesNotExist
|
||||
}
|
||||
|
||||
shard.numLeechers--
|
||||
delete(shard.swarms[ih].leechers, p)
|
||||
|
||||
if len(shard.swarms[ih].seeders)|len(shard.swarms[ih].leechers) == 0 {
|
||||
delete(shard.swarms, ih)
|
||||
}
|
||||
|
||||
return nil
|
||||
return
|
||||
}
|
||||
|
||||
func (ps *peerStore) GraduateLeecher(_ context.Context, ih bittorrent.InfoHash, p bittorrent.Peer) error {
|
||||
@@ -318,55 +292,40 @@ func (ps *peerStore) GraduateLeecher(_ context.Context, ih bittorrent.InfoHash,
|
||||
Object("peer", p).
|
||||
Msg("graduate leecher")
|
||||
|
||||
shard := ps.shards[ps.shardIndex(ih, p.Addr().Is6())]
|
||||
shard.Lock()
|
||||
defer shard.Unlock()
|
||||
sh := ps.shards[ps.shardIndex(ih, p.Addr().Is6())]
|
||||
sw, pId := sh.ginSwarm(ih), p.RawString()
|
||||
|
||||
if _, ok := shard.swarms[ih]; !ok {
|
||||
shard.swarms[ih] = swarm{
|
||||
seeders: make(map[bittorrent.Peer]int64),
|
||||
leechers: make(map[bittorrent.Peer]int64),
|
||||
}
|
||||
if sw.leechers.Del(pId) {
|
||||
sh.numLeechers.Add(decrUint64)
|
||||
}
|
||||
|
||||
// If this peer is a leecher, update the stats for the swarm and remove them.
|
||||
if _, ok := shard.swarms[ih].leechers[p]; ok {
|
||||
shard.numLeechers--
|
||||
delete(shard.swarms[ih].leechers, p)
|
||||
if _, exists := sw.seeders.Get(pId); !exists {
|
||||
sh.numSeeders.Add(1)
|
||||
}
|
||||
|
||||
// If this peer isn't already a seeder, update the stats for the swarm.
|
||||
if _, ok := shard.swarms[ih].seeders[p]; !ok {
|
||||
shard.numSeeders++
|
||||
}
|
||||
|
||||
// Update the peer in the swarm.
|
||||
shard.swarms[ih].seeders[p] = timecache.NowUnixNano()
|
||||
sw.seeders.Set(pId, timecache.NowUnixNano())
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func parsePeers(peersMap map[bittorrent.Peer]int64, maxCount int) (peers []bittorrent.Peer) {
|
||||
for p := range peersMap {
|
||||
if maxCount == 0 {
|
||||
break
|
||||
}
|
||||
func parsePeers(peersMap *hashmap.Map[string, int64], maxCount int) (peers []bittorrent.Peer) {
|
||||
peersMap.Range(func(pId string, _ int64) bool {
|
||||
p, _ := bittorrent.NewPeer(pId)
|
||||
peers = append(peers, p)
|
||||
maxCount--
|
||||
}
|
||||
return maxCount > 0
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
func (ps *peerStore) getPeers(shard *peerShard, ih bittorrent.InfoHash, maxCount int, forSeeder bool) (peers []bittorrent.Peer) {
|
||||
shard.RLock()
|
||||
defer shard.RUnlock()
|
||||
if swarm, ok := shard.swarms[ih]; ok {
|
||||
func (ps *peerStore) getPeers(shard *peerShard, ih bittorrent.InfoHash, numWant int, forSeeder bool) (peers []bittorrent.Peer) {
|
||||
if sw, ok := shard.swarms.Get(ih); ok {
|
||||
if forSeeder {
|
||||
peers = parsePeers(swarm.leechers, maxCount)
|
||||
peers = parsePeers(sw.leechers, numWant)
|
||||
} else {
|
||||
peers = append(peers, parsePeers(swarm.seeders, maxCount)...)
|
||||
if maxCount -= len(peers); maxCount > 0 {
|
||||
peers = append(peers, parsePeers(swarm.leechers, maxCount)...)
|
||||
peers = append(peers, parsePeers(sw.seeders, numWant)...)
|
||||
if numWant -= len(peers); numWant > 0 {
|
||||
peers = append(peers, parsePeers(sw.leechers, numWant)...)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -393,11 +352,9 @@ func (ps *peerStore) AnnouncePeers(_ context.Context, ih bittorrent.InfoHash, fo
|
||||
|
||||
func (ps *peerStore) countPeers(ih bittorrent.InfoHash, v6 bool) (leechers, seeders uint32) {
|
||||
shard := ps.shards[ps.shardIndex(ih, v6)]
|
||||
shard.RLock()
|
||||
defer shard.RUnlock()
|
||||
|
||||
if swarm, ok := shard.swarms[ih]; ok {
|
||||
leechers, seeders = uint32(len(swarm.leechers)), uint32(len(swarm.seeders))
|
||||
if sw, ok := shard.swarms.Get(ih); ok {
|
||||
leechers, seeders = uint32(sw.leechers.Len()), uint32(sw.seeders.Len())
|
||||
}
|
||||
return
|
||||
}
|
||||
@@ -421,19 +378,30 @@ func (ps *peerStore) ScrapeSwarm(_ context.Context, ih bittorrent.InfoHash) (lee
|
||||
|
||||
// NewDataStorage creates new in-memory data store
|
||||
func NewDataStorage() storage.DataStorage {
|
||||
return new(dataStore)
|
||||
return &dataStore{
|
||||
hashmap.New[string, *hashmap.Map[string, []byte]](),
|
||||
sync.Mutex{},
|
||||
}
|
||||
}
|
||||
|
||||
type dataStore struct {
|
||||
sync.Map
|
||||
*hashmap.Map[string, *hashmap.Map[string, []byte]]
|
||||
sync.Mutex
|
||||
}
|
||||
|
||||
func (ds *dataStore) Put(_ context.Context, ctx string, values ...storage.Entry) error {
|
||||
if len(values) > 0 {
|
||||
c, _ := ds.LoadOrStore(ctx, new(sync.Map))
|
||||
m := c.(*sync.Map)
|
||||
m, ok := ds.Get(ctx)
|
||||
if !ok {
|
||||
ds.Lock()
|
||||
if m, ok = ds.Get(ctx); !ok {
|
||||
m = hashmap.New[string, []byte]()
|
||||
ds.Insert(ctx, m)
|
||||
}
|
||||
ds.Unlock()
|
||||
}
|
||||
for _, p := range values {
|
||||
m.Store(p.Key, p.Value)
|
||||
m.Set(p.Key, p.Value)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
@@ -441,27 +409,24 @@ func (ds *dataStore) Put(_ context.Context, ctx string, values ...storage.Entry)
|
||||
|
||||
func (ds *dataStore) Contains(_ context.Context, ctx string, key string) (bool, error) {
|
||||
var exist bool
|
||||
if m, found := ds.Map.Load(ctx); found {
|
||||
_, exist = m.(*sync.Map).Load(key)
|
||||
if m, found := ds.Get(ctx); found {
|
||||
_, exist = m.Get(key)
|
||||
}
|
||||
return exist, nil
|
||||
}
|
||||
|
||||
func (ds *dataStore) Load(_ context.Context, ctx string, key string) (out []byte, _ error) {
|
||||
if m, found := ds.Map.Load(ctx); found {
|
||||
if v, _ := m.(*sync.Map).Load(key); v != nil {
|
||||
out = v.([]byte)
|
||||
}
|
||||
if m, found := ds.Map.Get(ctx); found {
|
||||
out, _ = m.Get(key)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (ds *dataStore) Delete(_ context.Context, ctx string, keys ...string) error {
|
||||
if len(keys) > 0 {
|
||||
if m, found := ds.Map.Load(ctx); found {
|
||||
m := m.(*sync.Map)
|
||||
if m, found := ds.Get(ctx); found {
|
||||
for _, k := range keys {
|
||||
m.Delete(k)
|
||||
m.Del(k)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -487,42 +452,43 @@ func (ps *peerStore) gc(cutoff time.Time) {
|
||||
cutoffUnix := cutoff.UnixNano()
|
||||
|
||||
for _, shard := range ps.shards {
|
||||
shard.RLock()
|
||||
var infohashes []bittorrent.InfoHash
|
||||
for ih := range shard.swarms {
|
||||
infohashes = append(infohashes, ih)
|
||||
}
|
||||
shard.RUnlock()
|
||||
infoHashes := make([]bittorrent.InfoHash, 0, shard.swarms.Len())
|
||||
shard.swarms.Range(func(ih bittorrent.InfoHash, sw swarm) bool {
|
||||
infoHashes = append(infoHashes, ih)
|
||||
return true
|
||||
})
|
||||
runtime.Gosched()
|
||||
|
||||
for _, ih := range infohashes {
|
||||
shard.Lock()
|
||||
for _, ih := range infoHashes {
|
||||
|
||||
if _, stillExists := shard.swarms[ih]; !stillExists {
|
||||
shard.Unlock()
|
||||
sw, stillExists := shard.swarms.Get(ih)
|
||||
if !stillExists {
|
||||
runtime.Gosched()
|
||||
continue
|
||||
}
|
||||
|
||||
for pk, mtime := range shard.swarms[ih].leechers {
|
||||
sw.leechers.Range(func(pId string, mtime int64) bool {
|
||||
if mtime <= cutoffUnix {
|
||||
shard.numLeechers--
|
||||
delete(shard.swarms[ih].leechers, pk)
|
||||
sw.leechers.Del(pId)
|
||||
shard.numLeechers.Add(decrUint64)
|
||||
}
|
||||
}
|
||||
return true
|
||||
})
|
||||
|
||||
for pk, mtime := range shard.swarms[ih].seeders {
|
||||
sw.seeders.Range(func(pId string, mtime int64) bool {
|
||||
if mtime <= cutoffUnix {
|
||||
shard.numSeeders--
|
||||
delete(shard.swarms[ih].seeders, pk)
|
||||
sw.seeders.Del(pId)
|
||||
shard.numSeeders.Add(decrUint64)
|
||||
}
|
||||
return true
|
||||
})
|
||||
|
||||
if sw.leechers.Len()|sw.seeders.Len() == 0 {
|
||||
shard.Lock()
|
||||
shard.swarms.Del(ih)
|
||||
shard.Unlock()
|
||||
}
|
||||
|
||||
if len(shard.swarms[ih].seeders)|len(shard.swarms[ih].leechers) == 0 {
|
||||
delete(shard.swarms, ih)
|
||||
}
|
||||
|
||||
shard.Unlock()
|
||||
runtime.Gosched()
|
||||
}
|
||||
|
||||
@@ -542,7 +508,7 @@ func (ps *peerStore) Close() error {
|
||||
// Explicitly deallocate our storage.
|
||||
shards := make([]*peerShard, len(ps.shards))
|
||||
for i := 0; i < len(ps.shards); i++ {
|
||||
shards[i] = &peerShard{swarms: make(map[bittorrent.InfoHash]swarm)}
|
||||
shards[i] = &peerShard{swarms: hashmap.New[bittorrent.InfoHash, swarm]()}
|
||||
}
|
||||
ps.shards = shards
|
||||
})
|
||||
|
||||
Reference in New Issue
Block a user