diff --git a/storage/keydb/storage.go b/storage/keydb/storage.go index d5fddd0..b7b51fd 100644 --- a/storage/keydb/storage.go +++ b/storage/keydb/storage.go @@ -126,19 +126,19 @@ func (s *store) delPeer(ctx context.Context, infoHashKey, peerID string) error { } func (s *store) PutSeeder(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error { - return s.addPeer(ctx, r.InfoHashKey(ih.RawString(), true, peer.Addr().Is6()), r.PackPeer(peer)) + return s.addPeer(ctx, r.InfoHashKey(ih.RawString(), true, peer.Addr().Is6(), ""), r.PackPeer(peer)) } func (s *store) DeleteSeeder(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error { - return s.delPeer(ctx, r.InfoHashKey(ih.RawString(), true, peer.Addr().Is6()), r.PackPeer(peer)) + return s.delPeer(ctx, r.InfoHashKey(ih.RawString(), true, peer.Addr().Is6(), ""), r.PackPeer(peer)) } func (s *store) PutLeecher(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error { - return s.addPeer(ctx, r.InfoHashKey(ih.RawString(), false, peer.Addr().Is6()), r.PackPeer(peer)) + return s.addPeer(ctx, r.InfoHashKey(ih.RawString(), false, peer.Addr().Is6(), ""), r.PackPeer(peer)) } func (s *store) DeleteLeecher(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error { - return s.delPeer(ctx, r.InfoHashKey(ih.RawString(), false, peer.Addr().Is6()), r.PackPeer(peer)) + return s.delPeer(ctx, r.InfoHashKey(ih.RawString(), false, peer.Addr().Is6(), ""), r.PackPeer(peer)) } func (s *store) GraduateLeecher(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) (err error) { @@ -147,8 +147,8 @@ func (s *store) GraduateLeecher(ctx context.Context, ih bittorrent.InfoHash, pee Object("peer", peer). Msg("graduate leecher") infoHash, peerID := ih.RawString(), r.PackPeer(peer) - ihSeederKey := r.InfoHashKey(infoHash, true, peer.Addr().Is6()) - ihLeecherKey := r.InfoHashKey(infoHash, false, peer.Addr().Is6()) + ihSeederKey := r.InfoHashKey(infoHash, true, peer.Addr().Is6(), "") + ihLeecherKey := r.InfoHashKey(infoHash, false, peer.Addr().Is6(), "") var moved bool if moved, err = s.SMove(ctx, ihLeecherKey, ihSeederKey, peerID).Result(); err == nil { if !moved { diff --git a/storage/mdb/storage.go b/storage/mdb/storage.go index ee57362..2cb3bed 100644 --- a/storage/mdb/storage.go +++ b/storage/mdb/storage.go @@ -30,10 +30,6 @@ func init() { type builder struct{} func (b builder) NewDataStorage(icfg conf.MapConfig) (storage.DataStorage, error) { - return b.NewPeerStorage(icfg) -} - -func (builder) NewPeerStorage(icfg conf.MapConfig) (storage.PeerStorage, error) { var cfg config if err := icfg.Unmarshal(&cfg); err != nil { return nil, err @@ -41,6 +37,10 @@ func (builder) NewPeerStorage(icfg conf.MapConfig) (storage.PeerStorage, error) return newStorage(cfg) } +func (builder) NewPeerStorage(_ conf.MapConfig) (storage.PeerStorage, error) { + panic("lmdb peer storage not implemented") +} + type config struct { Path string Mode uint32 @@ -120,9 +120,10 @@ func newStorage(cfg config) (*mdb, error) { return } if len(cfg.PeersDBName) > 0 { - peersDB, err = txn.CreateDBI(cfg.PeersDBName) + peersDB, err = txn.OpenDBI(cfg.PeersDBName, lmdb.Create|lmdb.DupSort|lmdb.DupFixed) + } else { - peersDB, err = txn.OpenRoot(0) + peersDB, err = txn.OpenRoot(lmdb.DupSort | lmdb.DupFixed) } return }); err != nil { @@ -155,14 +156,31 @@ func composeKey(ctx, key string) []byte { return res } -func (m *mdb) Put(ctx context.Context, storeCtx string, values ...storage.Entry) error { - //TODO implement me - panic("implement me") +func (m *mdb) Put(_ context.Context, storeCtx string, values ...storage.Entry) (err error) { + if len(values) > 0 { + err = m.Update(func(txn *lmdb.Txn) (err error) { + for _, kv := range values { + if err = txn.Put(m.dataDB, composeKey(storeCtx, kv.Key), kv.Value, 0); err != nil { + break + } + } + return + }) + } + return } -func (m *mdb) Contains(ctx context.Context, storeCtx string, key string) (bool, error) { - //TODO implement me - panic("implement me") +func (m *mdb) Contains(_ context.Context, storeCtx string, key string) (contains bool, err error) { + err = m.View(func(txn *lmdb.Txn) (err error) { + _, err = txn.Get(m.dataDB, composeKey(storeCtx, key)) + return + }) + if err == nil { + contains = true + } else if lmdb.IsNotFound(err) { + err = nil + } + return } func ignoreNotFound(data []byte, err error) ([]byte, error) { @@ -173,54 +191,63 @@ func ignoreNotFound(data []byte, err error) ([]byte, error) { } func (m *mdb) Load(_ context.Context, storeCtx string, key string) (v []byte, err error) { - err = m.Env.View(func(txn *lmdb.Txn) (err error) { + err = m.View(func(txn *lmdb.Txn) (err error) { v, err = ignoreNotFound(txn.Get(m.dataDB, composeKey(storeCtx, key))) return }) return } -func (m *mdb) Delete(ctx context.Context, storeCtx string, keys ...string) error { +func (m *mdb) Delete(_ context.Context, storeCtx string, keys ...string) (err error) { + if len(keys) > 0 { + err = m.Update(func(txn *lmdb.Txn) (err error) { + for _, k := range keys { + if err = txn.Del(m.dataDB, composeKey(storeCtx, k), nil); err != nil { + break + } + } + return + }) + } + return +} + +func (m *mdb) PutSeeder(_ context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error { //TODO implement me panic("implement me") } -func (m *mdb) PutSeeder(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error { +func (m *mdb) DeleteSeeder(_ context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error { //TODO implement me panic("implement me") } -func (m *mdb) DeleteSeeder(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error { +func (m *mdb) PutLeecher(_ context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error { //TODO implement me panic("implement me") } -func (m *mdb) PutLeecher(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error { +func (m *mdb) DeleteLeecher(_ context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error { //TODO implement me panic("implement me") } -func (m *mdb) DeleteLeecher(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error { +func (m *mdb) GraduateLeecher(_ context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error { //TODO implement me panic("implement me") } -func (m *mdb) GraduateLeecher(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error { +func (m *mdb) AnnouncePeers(_ context.Context, ih bittorrent.InfoHash, forSeeder bool, numWant int, v6 bool) (peers []bittorrent.Peer, err error) { //TODO implement me panic("implement me") } -func (m *mdb) AnnouncePeers(ctx context.Context, ih bittorrent.InfoHash, forSeeder bool, numWant int, v6 bool) (peers []bittorrent.Peer, err error) { +func (m *mdb) ScrapeSwarm(_ context.Context, ih bittorrent.InfoHash) (leechers uint32, seeders uint32, snatched uint32, err error) { //TODO implement me panic("implement me") } -func (m *mdb) ScrapeSwarm(ctx context.Context, ih bittorrent.InfoHash) (leechers uint32, seeders uint32, snatched uint32, err error) { - //TODO implement me - panic("implement me") -} - -func (m *mdb) Ping(ctx context.Context) error { +func (m *mdb) Ping(_ context.Context) error { //TODO implement me panic("implement me") } diff --git a/storage/redis/storage.go b/storage/redis/storage.go index 27e7b71..c6a6518 100644 --- a/storage/redis/storage.go +++ b/storage/redis/storage.go @@ -343,7 +343,7 @@ func NoResultErr(err error) error { } // InfoHashKey generates redis key for provided hash and flags -func InfoHashKey(infoHash string, seeder, v6 bool) (infoHashKey string) { +func InfoHashKey(infoHash string, seeder, v6 bool, suffix string) (infoHashKey string) { var bm int if seeder { bm = 0b01 @@ -361,7 +361,7 @@ func InfoHashKey(infoHash string, seeder, v6 bool) (infoHashKey string) { case 0b00: infoHashKey = IH4LeecherKey } - infoHashKey += infoHash + infoHashKey += infoHash + suffix return } @@ -411,19 +411,19 @@ func PackPeer(p bittorrent.Peer) string { } func (ps *store) PutSeeder(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error { - return ps.putPeer(ctx, InfoHashKey(ih.RawString(), true, peer.Addr().Is6()), CountSeederKey, PackPeer(peer)) + return ps.putPeer(ctx, InfoHashKey(ih.RawString(), true, peer.Addr().Is6(), ""), CountSeederKey, PackPeer(peer)) } func (ps *store) DeleteSeeder(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error { - return ps.delPeer(ctx, InfoHashKey(ih.RawString(), true, peer.Addr().Is6()), CountSeederKey, PackPeer(peer)) + return ps.delPeer(ctx, InfoHashKey(ih.RawString(), true, peer.Addr().Is6(), ""), CountSeederKey, PackPeer(peer)) } func (ps *store) PutLeecher(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error { - return ps.putPeer(ctx, InfoHashKey(ih.RawString(), false, peer.Addr().Is6()), CountLeecherKey, PackPeer(peer)) + return ps.putPeer(ctx, InfoHashKey(ih.RawString(), false, peer.Addr().Is6(), ""), CountLeecherKey, PackPeer(peer)) } func (ps *store) DeleteLeecher(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error { - return ps.delPeer(ctx, InfoHashKey(ih.RawString(), false, peer.Addr().Is6()), CountLeecherKey, PackPeer(peer)) + return ps.delPeer(ctx, InfoHashKey(ih.RawString(), false, peer.Addr().Is6(), ""), CountLeecherKey, PackPeer(peer)) } func (ps *store) GraduateLeecher(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error { @@ -433,7 +433,7 @@ func (ps *store) GraduateLeecher(ctx context.Context, ih bittorrent.InfoHash, pe Msg("graduate leecher") infoHash, peerID, isV6 := ih.RawString(), PackPeer(peer), peer.Addr().Is6() - ihSeederKey, ihLeecherKey := InfoHashKey(infoHash, true, isV6), InfoHashKey(infoHash, false, isV6) + ihSeederKey, ihLeecherKey := InfoHashKey(infoHash, true, isV6, ""), InfoHashKey(infoHash, false, isV6, "") return ps.tx(ctx, func(tx redis.Pipeliner) error { deleted, err := tx.HDel(ctx, ihLeecherKey, peerID).Uint64() @@ -516,10 +516,10 @@ func (ps *Connection) GetPeers(ctx context.Context, ih bittorrent.InfoHash, forS infoHashKeys := make([]string, 1, 2) if forSeeder { - infoHashKeys[0] = InfoHashKey(infoHash, false, isV6) + infoHashKeys[0] = InfoHashKey(infoHash, false, isV6, "") } else { - infoHashKeys[0] = InfoHashKey(infoHash, true, isV6) - infoHashKeys = append(infoHashKeys, InfoHashKey(infoHash, false, isV6)) + infoHashKeys[0] = InfoHashKey(infoHash, true, isV6, "") + infoHashKeys = append(infoHashKeys, InfoHashKey(infoHash, false, isV6, "")) } for _, infoHashKey := range infoHashKeys { @@ -564,19 +564,19 @@ func (ps *Connection) ScrapeIH(ctx context.Context, ih bittorrent.InfoHash, coun infoHash := ih.RawString() var lc4, lc6, sc4, sc6, dc int64 - lc4, err = countFn(ctx, InfoHashKey(infoHash, false, false)).Result() + lc4, err = countFn(ctx, InfoHashKey(infoHash, false, false, "")).Result() if err = NoResultErr(err); err != nil { return } - lc6, err = countFn(ctx, InfoHashKey(infoHash, false, true)).Result() + lc6, err = countFn(ctx, InfoHashKey(infoHash, false, true, "")).Result() if err = NoResultErr(err); err != nil { return } - sc4, err = countFn(ctx, InfoHashKey(infoHash, true, false)).Result() + sc4, err = countFn(ctx, InfoHashKey(infoHash, true, false, "")).Result() if err = NoResultErr(err); err != nil { return } - sc6, err = countFn(ctx, InfoHashKey(infoHash, true, true)).Result() + sc6, err = countFn(ctx, InfoHashKey(infoHash, true, true, "")).Result() if err = NoResultErr(err); err != nil { return }