mirror of
https://github.com/sot-tech/mochi.git
synced 2026-06-19 10:59:46 -07:00
(WIP) add suffix for redis infohash key creation (for future use)
This commit is contained in:
@@ -126,19 +126,19 @@ func (s *store) delPeer(ctx context.Context, infoHashKey, peerID string) error {
|
||||
}
|
||||
|
||||
func (s *store) PutSeeder(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error {
|
||||
return s.addPeer(ctx, r.InfoHashKey(ih.RawString(), true, peer.Addr().Is6()), r.PackPeer(peer))
|
||||
return s.addPeer(ctx, r.InfoHashKey(ih.RawString(), true, peer.Addr().Is6(), ""), r.PackPeer(peer))
|
||||
}
|
||||
|
||||
func (s *store) DeleteSeeder(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error {
|
||||
return s.delPeer(ctx, r.InfoHashKey(ih.RawString(), true, peer.Addr().Is6()), r.PackPeer(peer))
|
||||
return s.delPeer(ctx, r.InfoHashKey(ih.RawString(), true, peer.Addr().Is6(), ""), r.PackPeer(peer))
|
||||
}
|
||||
|
||||
func (s *store) PutLeecher(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error {
|
||||
return s.addPeer(ctx, r.InfoHashKey(ih.RawString(), false, peer.Addr().Is6()), r.PackPeer(peer))
|
||||
return s.addPeer(ctx, r.InfoHashKey(ih.RawString(), false, peer.Addr().Is6(), ""), r.PackPeer(peer))
|
||||
}
|
||||
|
||||
func (s *store) DeleteLeecher(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error {
|
||||
return s.delPeer(ctx, r.InfoHashKey(ih.RawString(), false, peer.Addr().Is6()), r.PackPeer(peer))
|
||||
return s.delPeer(ctx, r.InfoHashKey(ih.RawString(), false, peer.Addr().Is6(), ""), r.PackPeer(peer))
|
||||
}
|
||||
|
||||
func (s *store) GraduateLeecher(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) (err error) {
|
||||
@@ -147,8 +147,8 @@ func (s *store) GraduateLeecher(ctx context.Context, ih bittorrent.InfoHash, pee
|
||||
Object("peer", peer).
|
||||
Msg("graduate leecher")
|
||||
infoHash, peerID := ih.RawString(), r.PackPeer(peer)
|
||||
ihSeederKey := r.InfoHashKey(infoHash, true, peer.Addr().Is6())
|
||||
ihLeecherKey := r.InfoHashKey(infoHash, false, peer.Addr().Is6())
|
||||
ihSeederKey := r.InfoHashKey(infoHash, true, peer.Addr().Is6(), "")
|
||||
ihLeecherKey := r.InfoHashKey(infoHash, false, peer.Addr().Is6(), "")
|
||||
var moved bool
|
||||
if moved, err = s.SMove(ctx, ihLeecherKey, ihSeederKey, peerID).Result(); err == nil {
|
||||
if !moved {
|
||||
|
||||
+53
-26
@@ -30,10 +30,6 @@ func init() {
|
||||
type builder struct{}
|
||||
|
||||
func (b builder) NewDataStorage(icfg conf.MapConfig) (storage.DataStorage, error) {
|
||||
return b.NewPeerStorage(icfg)
|
||||
}
|
||||
|
||||
func (builder) NewPeerStorage(icfg conf.MapConfig) (storage.PeerStorage, error) {
|
||||
var cfg config
|
||||
if err := icfg.Unmarshal(&cfg); err != nil {
|
||||
return nil, err
|
||||
@@ -41,6 +37,10 @@ func (builder) NewPeerStorage(icfg conf.MapConfig) (storage.PeerStorage, error)
|
||||
return newStorage(cfg)
|
||||
}
|
||||
|
||||
func (builder) NewPeerStorage(_ conf.MapConfig) (storage.PeerStorage, error) {
|
||||
panic("lmdb peer storage not implemented")
|
||||
}
|
||||
|
||||
type config struct {
|
||||
Path string
|
||||
Mode uint32
|
||||
@@ -120,9 +120,10 @@ func newStorage(cfg config) (*mdb, error) {
|
||||
return
|
||||
}
|
||||
if len(cfg.PeersDBName) > 0 {
|
||||
peersDB, err = txn.CreateDBI(cfg.PeersDBName)
|
||||
peersDB, err = txn.OpenDBI(cfg.PeersDBName, lmdb.Create|lmdb.DupSort|lmdb.DupFixed)
|
||||
|
||||
} else {
|
||||
peersDB, err = txn.OpenRoot(0)
|
||||
peersDB, err = txn.OpenRoot(lmdb.DupSort | lmdb.DupFixed)
|
||||
}
|
||||
return
|
||||
}); err != nil {
|
||||
@@ -155,14 +156,31 @@ func composeKey(ctx, key string) []byte {
|
||||
return res
|
||||
}
|
||||
|
||||
func (m *mdb) Put(ctx context.Context, storeCtx string, values ...storage.Entry) error {
|
||||
//TODO implement me
|
||||
panic("implement me")
|
||||
func (m *mdb) Put(_ context.Context, storeCtx string, values ...storage.Entry) (err error) {
|
||||
if len(values) > 0 {
|
||||
err = m.Update(func(txn *lmdb.Txn) (err error) {
|
||||
for _, kv := range values {
|
||||
if err = txn.Put(m.dataDB, composeKey(storeCtx, kv.Key), kv.Value, 0); err != nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
return
|
||||
})
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (m *mdb) Contains(ctx context.Context, storeCtx string, key string) (bool, error) {
|
||||
//TODO implement me
|
||||
panic("implement me")
|
||||
func (m *mdb) Contains(_ context.Context, storeCtx string, key string) (contains bool, err error) {
|
||||
err = m.View(func(txn *lmdb.Txn) (err error) {
|
||||
_, err = txn.Get(m.dataDB, composeKey(storeCtx, key))
|
||||
return
|
||||
})
|
||||
if err == nil {
|
||||
contains = true
|
||||
} else if lmdb.IsNotFound(err) {
|
||||
err = nil
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func ignoreNotFound(data []byte, err error) ([]byte, error) {
|
||||
@@ -173,54 +191,63 @@ func ignoreNotFound(data []byte, err error) ([]byte, error) {
|
||||
}
|
||||
|
||||
func (m *mdb) Load(_ context.Context, storeCtx string, key string) (v []byte, err error) {
|
||||
err = m.Env.View(func(txn *lmdb.Txn) (err error) {
|
||||
err = m.View(func(txn *lmdb.Txn) (err error) {
|
||||
v, err = ignoreNotFound(txn.Get(m.dataDB, composeKey(storeCtx, key)))
|
||||
return
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
func (m *mdb) Delete(ctx context.Context, storeCtx string, keys ...string) error {
|
||||
func (m *mdb) Delete(_ context.Context, storeCtx string, keys ...string) (err error) {
|
||||
if len(keys) > 0 {
|
||||
err = m.Update(func(txn *lmdb.Txn) (err error) {
|
||||
for _, k := range keys {
|
||||
if err = txn.Del(m.dataDB, composeKey(storeCtx, k), nil); err != nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
return
|
||||
})
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (m *mdb) PutSeeder(_ context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error {
|
||||
//TODO implement me
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (m *mdb) PutSeeder(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error {
|
||||
func (m *mdb) DeleteSeeder(_ context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error {
|
||||
//TODO implement me
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (m *mdb) DeleteSeeder(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error {
|
||||
func (m *mdb) PutLeecher(_ context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error {
|
||||
//TODO implement me
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (m *mdb) PutLeecher(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error {
|
||||
func (m *mdb) DeleteLeecher(_ context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error {
|
||||
//TODO implement me
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (m *mdb) DeleteLeecher(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error {
|
||||
func (m *mdb) GraduateLeecher(_ context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error {
|
||||
//TODO implement me
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (m *mdb) GraduateLeecher(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error {
|
||||
func (m *mdb) AnnouncePeers(_ context.Context, ih bittorrent.InfoHash, forSeeder bool, numWant int, v6 bool) (peers []bittorrent.Peer, err error) {
|
||||
//TODO implement me
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (m *mdb) AnnouncePeers(ctx context.Context, ih bittorrent.InfoHash, forSeeder bool, numWant int, v6 bool) (peers []bittorrent.Peer, err error) {
|
||||
func (m *mdb) ScrapeSwarm(_ context.Context, ih bittorrent.InfoHash) (leechers uint32, seeders uint32, snatched uint32, err error) {
|
||||
//TODO implement me
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (m *mdb) ScrapeSwarm(ctx context.Context, ih bittorrent.InfoHash) (leechers uint32, seeders uint32, snatched uint32, err error) {
|
||||
//TODO implement me
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (m *mdb) Ping(ctx context.Context) error {
|
||||
func (m *mdb) Ping(_ context.Context) error {
|
||||
//TODO implement me
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
+14
-14
@@ -343,7 +343,7 @@ func NoResultErr(err error) error {
|
||||
}
|
||||
|
||||
// InfoHashKey generates redis key for provided hash and flags
|
||||
func InfoHashKey(infoHash string, seeder, v6 bool) (infoHashKey string) {
|
||||
func InfoHashKey(infoHash string, seeder, v6 bool, suffix string) (infoHashKey string) {
|
||||
var bm int
|
||||
if seeder {
|
||||
bm = 0b01
|
||||
@@ -361,7 +361,7 @@ func InfoHashKey(infoHash string, seeder, v6 bool) (infoHashKey string) {
|
||||
case 0b00:
|
||||
infoHashKey = IH4LeecherKey
|
||||
}
|
||||
infoHashKey += infoHash
|
||||
infoHashKey += infoHash + suffix
|
||||
return
|
||||
}
|
||||
|
||||
@@ -411,19 +411,19 @@ func PackPeer(p bittorrent.Peer) string {
|
||||
}
|
||||
|
||||
func (ps *store) PutSeeder(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error {
|
||||
return ps.putPeer(ctx, InfoHashKey(ih.RawString(), true, peer.Addr().Is6()), CountSeederKey, PackPeer(peer))
|
||||
return ps.putPeer(ctx, InfoHashKey(ih.RawString(), true, peer.Addr().Is6(), ""), CountSeederKey, PackPeer(peer))
|
||||
}
|
||||
|
||||
func (ps *store) DeleteSeeder(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error {
|
||||
return ps.delPeer(ctx, InfoHashKey(ih.RawString(), true, peer.Addr().Is6()), CountSeederKey, PackPeer(peer))
|
||||
return ps.delPeer(ctx, InfoHashKey(ih.RawString(), true, peer.Addr().Is6(), ""), CountSeederKey, PackPeer(peer))
|
||||
}
|
||||
|
||||
func (ps *store) PutLeecher(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error {
|
||||
return ps.putPeer(ctx, InfoHashKey(ih.RawString(), false, peer.Addr().Is6()), CountLeecherKey, PackPeer(peer))
|
||||
return ps.putPeer(ctx, InfoHashKey(ih.RawString(), false, peer.Addr().Is6(), ""), CountLeecherKey, PackPeer(peer))
|
||||
}
|
||||
|
||||
func (ps *store) DeleteLeecher(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error {
|
||||
return ps.delPeer(ctx, InfoHashKey(ih.RawString(), false, peer.Addr().Is6()), CountLeecherKey, PackPeer(peer))
|
||||
return ps.delPeer(ctx, InfoHashKey(ih.RawString(), false, peer.Addr().Is6(), ""), CountLeecherKey, PackPeer(peer))
|
||||
}
|
||||
|
||||
func (ps *store) GraduateLeecher(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error {
|
||||
@@ -433,7 +433,7 @@ func (ps *store) GraduateLeecher(ctx context.Context, ih bittorrent.InfoHash, pe
|
||||
Msg("graduate leecher")
|
||||
|
||||
infoHash, peerID, isV6 := ih.RawString(), PackPeer(peer), peer.Addr().Is6()
|
||||
ihSeederKey, ihLeecherKey := InfoHashKey(infoHash, true, isV6), InfoHashKey(infoHash, false, isV6)
|
||||
ihSeederKey, ihLeecherKey := InfoHashKey(infoHash, true, isV6, ""), InfoHashKey(infoHash, false, isV6, "")
|
||||
|
||||
return ps.tx(ctx, func(tx redis.Pipeliner) error {
|
||||
deleted, err := tx.HDel(ctx, ihLeecherKey, peerID).Uint64()
|
||||
@@ -516,10 +516,10 @@ func (ps *Connection) GetPeers(ctx context.Context, ih bittorrent.InfoHash, forS
|
||||
infoHashKeys := make([]string, 1, 2)
|
||||
|
||||
if forSeeder {
|
||||
infoHashKeys[0] = InfoHashKey(infoHash, false, isV6)
|
||||
infoHashKeys[0] = InfoHashKey(infoHash, false, isV6, "")
|
||||
} else {
|
||||
infoHashKeys[0] = InfoHashKey(infoHash, true, isV6)
|
||||
infoHashKeys = append(infoHashKeys, InfoHashKey(infoHash, false, isV6))
|
||||
infoHashKeys[0] = InfoHashKey(infoHash, true, isV6, "")
|
||||
infoHashKeys = append(infoHashKeys, InfoHashKey(infoHash, false, isV6, ""))
|
||||
}
|
||||
|
||||
for _, infoHashKey := range infoHashKeys {
|
||||
@@ -564,19 +564,19 @@ func (ps *Connection) ScrapeIH(ctx context.Context, ih bittorrent.InfoHash, coun
|
||||
infoHash := ih.RawString()
|
||||
var lc4, lc6, sc4, sc6, dc int64
|
||||
|
||||
lc4, err = countFn(ctx, InfoHashKey(infoHash, false, false)).Result()
|
||||
lc4, err = countFn(ctx, InfoHashKey(infoHash, false, false, "")).Result()
|
||||
if err = NoResultErr(err); err != nil {
|
||||
return
|
||||
}
|
||||
lc6, err = countFn(ctx, InfoHashKey(infoHash, false, true)).Result()
|
||||
lc6, err = countFn(ctx, InfoHashKey(infoHash, false, true, "")).Result()
|
||||
if err = NoResultErr(err); err != nil {
|
||||
return
|
||||
}
|
||||
sc4, err = countFn(ctx, InfoHashKey(infoHash, true, false)).Result()
|
||||
sc4, err = countFn(ctx, InfoHashKey(infoHash, true, false, "")).Result()
|
||||
if err = NoResultErr(err); err != nil {
|
||||
return
|
||||
}
|
||||
sc6, err = countFn(ctx, InfoHashKey(infoHash, true, true)).Result()
|
||||
sc6, err = countFn(ctx, InfoHashKey(infoHash, true, true, "")).Result()
|
||||
if err = NoResultErr(err); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user