From 37c40f9231d8730b0fb9a301ee811c28d85c274d Mon Sep 17 00:00:00 2001 From: "Lawrence, Rendall" Date: Tue, 21 May 2024 18:39:55 +0300 Subject: [PATCH] (WIP) add LMDB storage --- go.mod | 1 + storage/mdb/storage.go | 226 ++++++++++++++++++++++++++++++++ storage/mdb/storage_disabled.go | 3 + storage/memory/storage.go | 8 +- 4 files changed, 232 insertions(+), 6 deletions(-) create mode 100644 storage/mdb/storage.go create mode 100644 storage/mdb/storage_disabled.go diff --git a/go.mod b/go.mod index c95dcf8..a3f6332 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,7 @@ require ( github.com/MicahParks/jwkset v0.5.17 github.com/MicahParks/keyfunc/v3 v3.3.2 github.com/anacrolix/torrent v1.55.0 + github.com/bmatsuo/lmdb-go v1.8.0 github.com/cespare/xxhash/v2 v2.3.0 github.com/golang-jwt/jwt/v5 v5.2.1 github.com/jackc/pgx/v5 v5.5.5 diff --git a/storage/mdb/storage.go b/storage/mdb/storage.go new file mode 100644 index 0000000..ee57362 --- /dev/null +++ b/storage/mdb/storage.go @@ -0,0 +1,226 @@ +//go:build lmdb && cgo + +package mdb + +import ( + "context" + "errors" + "github.com/bmatsuo/lmdb-go/lmdb" + "github.com/sot-tech/mochi/bittorrent" + "github.com/sot-tech/mochi/pkg/conf" + "github.com/sot-tech/mochi/pkg/log" + "github.com/sot-tech/mochi/storage" + "os" +) + +const ( + // Name - registered name of the storage + Name = "lmdb" + defaultMode = 0o640 + defaultMapSize = 1 << 28 +) + +var logger = log.NewLogger("storage/memory") + +func init() { + // Register the storage driver. + storage.RegisterDriver(Name, builder{}) +} + +type builder struct{} + +func (b builder) NewDataStorage(icfg conf.MapConfig) (storage.DataStorage, error) { + return b.NewPeerStorage(icfg) +} + +func (builder) NewPeerStorage(icfg conf.MapConfig) (storage.PeerStorage, error) { + var cfg config + if err := icfg.Unmarshal(&cfg); err != nil { + return nil, err + } + return newStorage(cfg) +} + +type config struct { + Path string + Mode uint32 + DataDBName string `cfg:"data_db"` + PeersDBName string `cfg:"peers_db"` + MaxSize int64 `cfg:"max_size"` +} + +var ( + errPathNotProvided = errors.New("lmdb path not provided") + errPathNotDirectory = errors.New("lmdb path is not directory") +) + +func (cfg config) validate() (config, error) { + validCfg := cfg + if len(cfg.Path) == 0 { + return cfg, errPathNotProvided + } else { + if stat, err := os.Stat(cfg.Path); err != nil { + return cfg, err + } else if !stat.IsDir() { + return cfg, errPathNotDirectory + } + } + if cfg.Mode == 0 { + validCfg.Mode = defaultMode + logger.Warn(). + Str("name", "mode"). + Stringer("provided", os.FileMode(cfg.Mode)). + Stringer("default", os.FileMode(validCfg.Mode)). + Msg("falling back to default configuration") + } + if cfg.MaxSize == 0 { + validCfg.MaxSize = defaultMapSize + logger.Warn(). + Str("name", "max_size"). + Int64("provided", cfg.MaxSize). + Int64("default", validCfg.MaxSize). + Msg("falling back to default configuration") + } + return validCfg, nil +} + +type mdb struct { + *lmdb.Env + dataDB, peersDB lmdb.DBI +} + +func newStorage(cfg config) (*mdb, error) { + var err error + if cfg, err = cfg.validate(); err != nil { + return nil, err + } + env, err := lmdb.NewEnv() + if err != nil { + return nil, err + } + if err = env.SetMaxDBs(2); err != nil { + return nil, err + } + if err = env.SetMapSize(cfg.MaxSize); err != nil { + return nil, err + } + + if err = env.Open(cfg.Path, 0, os.FileMode(cfg.Mode)); err != nil { + return nil, err + } + + var dataDB, peersDB lmdb.DBI + if err = env.Update(func(txn *lmdb.Txn) (err error) { + if len(cfg.DataDBName) > 0 { + dataDB, err = txn.CreateDBI(cfg.DataDBName) + } else { + dataDB, err = txn.OpenRoot(0) + } + if err != nil { + return + } + if len(cfg.PeersDBName) > 0 { + peersDB, err = txn.CreateDBI(cfg.PeersDBName) + } else { + peersDB, err = txn.OpenRoot(0) + } + return + }); err != nil { + _ = env.Close() + return nil, err + } + + return &mdb{env, dataDB, peersDB}, nil +} + +func (*mdb) Preservable() bool { + return true +} + +func (m *mdb) Close() (err error) { + if m.Env != nil { + err = m.Env.Close() + } + return +} + +const keySeparator = '_' + +func composeKey(ctx, key string) []byte { + ctxLen := len(ctx) + res := make([]byte, ctxLen+len(key)+1) + copy(res, ctx) + res[ctxLen] = keySeparator + copy(res[ctxLen+1:], key) + return res +} + +func (m *mdb) Put(ctx context.Context, storeCtx string, values ...storage.Entry) error { + //TODO implement me + panic("implement me") +} + +func (m *mdb) Contains(ctx context.Context, storeCtx string, key string) (bool, error) { + //TODO implement me + panic("implement me") +} + +func ignoreNotFound(data []byte, err error) ([]byte, error) { + if err != nil && lmdb.IsNotFound(err) { + err = nil + } + return data, err +} + +func (m *mdb) Load(_ context.Context, storeCtx string, key string) (v []byte, err error) { + err = m.Env.View(func(txn *lmdb.Txn) (err error) { + v, err = ignoreNotFound(txn.Get(m.dataDB, composeKey(storeCtx, key))) + return + }) + return +} + +func (m *mdb) Delete(ctx context.Context, storeCtx string, keys ...string) error { + //TODO implement me + panic("implement me") +} + +func (m *mdb) PutSeeder(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error { + //TODO implement me + panic("implement me") +} + +func (m *mdb) DeleteSeeder(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error { + //TODO implement me + panic("implement me") +} + +func (m *mdb) PutLeecher(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error { + //TODO implement me + panic("implement me") +} + +func (m *mdb) DeleteLeecher(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error { + //TODO implement me + panic("implement me") +} + +func (m *mdb) GraduateLeecher(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error { + //TODO implement me + panic("implement me") +} + +func (m *mdb) AnnouncePeers(ctx context.Context, ih bittorrent.InfoHash, forSeeder bool, numWant int, v6 bool) (peers []bittorrent.Peer, err error) { + //TODO implement me + panic("implement me") +} + +func (m *mdb) ScrapeSwarm(ctx context.Context, ih bittorrent.InfoHash) (leechers uint32, seeders uint32, snatched uint32, err error) { + //TODO implement me + panic("implement me") +} + +func (m *mdb) Ping(ctx context.Context) error { + //TODO implement me + panic("implement me") +} diff --git a/storage/mdb/storage_disabled.go b/storage/mdb/storage_disabled.go new file mode 100644 index 0000000..8e3863a --- /dev/null +++ b/storage/mdb/storage_disabled.go @@ -0,0 +1,3 @@ +//go:build !lmdb || !cgo + +package mdb diff --git a/storage/memory/storage.go b/storage/memory/storage.go index 28ec7ce..61a5d7c 100644 --- a/storage/memory/storage.go +++ b/storage/memory/storage.go @@ -53,11 +53,7 @@ type config struct { ShardCount int `cfg:"shard_count"` } -// Validate sanity checks values set in a config and returns a new config with -// default values replacing anything that is invalid. -// -// This function warns to the logger when a value is changed. -func (cfg config) Validate() config { +func (cfg config) validate() config { validcfg := cfg if cfg.ShardCount <= 0 || cfg.ShardCount > (math.MaxInt/2) { @@ -73,7 +69,7 @@ func (cfg config) Validate() config { } func peerStorage(provided config) (storage.PeerStorage, error) { - cfg := provided.Validate() + cfg := provided.validate() ps := &peerStore{ shards: make([]*peerShard, cfg.ShardCount*2), DataStorage: dataStorage(),