Files
mochi/storage/mdb/storage.go

254 lines
5.7 KiB
Go

//go:build lmdb && cgo
package mdb
import (
"context"
"errors"
"github.com/bmatsuo/lmdb-go/lmdb"
"github.com/sot-tech/mochi/bittorrent"
"github.com/sot-tech/mochi/pkg/conf"
"github.com/sot-tech/mochi/pkg/log"
"github.com/sot-tech/mochi/storage"
"os"
)
const (
// Name is the name under which this storage driver is registered
// (see init).
Name = "lmdb"
// defaultMode is the file mode passed to Env.Open when the
// configuration leaves Mode at zero.
defaultMode = 0o640
// defaultMapSize is the map size passed to Env.SetMapSize when the
// configuration leaves MaxSize at zero (1 << 28; LMDB map sizes are
// expressed in bytes, i.e. 256 MiB).
defaultMapSize = 1 << 28
)
// logger is the package-level logger. It is named after this driver so
// that records emitted here are not attributed to the in-memory storage.
var logger = log.NewLogger("storage/lmdb")
func init() {
// Register the storage driver.
storage.RegisterDriver(Name, builder{})
}
// builder is the stateless value registered with storage.RegisterDriver;
// its methods construct the LMDB-backed storages.
type builder struct{}
// NewDataStorage decodes icfg into the driver configuration and opens an
// LMDB-backed data storage from it.
//
// On failure the returned storage is a true nil interface value. Returning
// the (*mdb, error) pair of newStorage directly would wrap a nil *mdb in a
// non-nil storage.DataStorage, so the result is unpacked explicitly.
func (builder) NewDataStorage(icfg conf.MapConfig) (storage.DataStorage, error) {
	var cfg config
	if err := icfg.Unmarshal(&cfg); err != nil {
		return nil, err
	}
	st, err := newStorage(cfg)
	if err != nil {
		return nil, err
	}
	return st, nil
}
// NewPeerStorage always fails: the LMDB driver does not provide a peer
// storage yet. The failure is reported through the error result rather
// than a panic, so selecting this driver for peers in the configuration
// produces an ordinary start-up error instead of crashing the process.
func (builder) NewPeerStorage(_ conf.MapConfig) (storage.PeerStorage, error) {
	return nil, errors.New("lmdb peer storage not implemented")
}
// config holds the settings of the LMDB storage driver.
type config struct {
// Path is the environment location; validate requires it to be an
// existing directory.
Path string
// Mode is the file mode passed to Env.Open; zero selects defaultMode.
Mode uint32
// DataDBName names the database used for key/value data; when empty
// the root database is opened instead.
DataDBName string `cfg:"data_db"`
// PeersDBName names the database opened with DupSort|DupFixed for
// peers; when empty the root database is opened with those flags.
PeersDBName string `cfg:"peers_db"`
// MaxSize is passed to Env.SetMapSize; zero selects defaultMapSize.
MaxSize int64 `cfg:"max_size"`
}
// Configuration errors returned by config.validate.
var (
// errPathNotProvided is returned when config.Path is empty.
errPathNotProvided = errors.New("lmdb path not provided")
// errPathNotDirectory is returned when config.Path exists but is not
// a directory.
errPathNotDirectory = errors.New("lmdb path is not directory")
)
// validate checks the configuration and returns a copy with defaults
// applied.
//
// Path must be non-empty and refer to an existing directory; otherwise the
// unmodified configuration is returned together with errPathNotProvided,
// the os.Stat error, or errPathNotDirectory. A zero Mode or MaxSize is
// replaced by defaultMode / defaultMapSize and a warning is logged for each
// substitution.
func (cfg config) validate() (config, error) {
	if cfg.Path == "" {
		return cfg, errPathNotProvided
	}
	info, statErr := os.Stat(cfg.Path)
	if statErr != nil {
		return cfg, statErr
	}
	if !info.IsDir() {
		return cfg, errPathNotDirectory
	}

	result := cfg

	if cfg.Mode == 0 {
		result.Mode = defaultMode
		logger.Warn().
			Str("name", "mode").
			Stringer("provided", os.FileMode(cfg.Mode)).
			Stringer("default", os.FileMode(result.Mode)).
			Msg("falling back to default configuration")
	}

	if cfg.MaxSize == 0 {
		result.MaxSize = defaultMapSize
		logger.Warn().
			Str("name", "max_size").
			Int64("provided", cfg.MaxSize).
			Int64("default", result.MaxSize).
			Msg("falling back to default configuration")
	}

	return result, nil
}
// mdb is the LMDB-backed storage. It embeds the environment, so the
// environment's methods (Update, View, ...) are callable on it directly.
type mdb struct {
*lmdb.Env
// dataDB is the handle used by Put, Contains, Load and Delete.
// peersDB is the handle opened with DupSort|DupFixed in newStorage;
// none of the methods implemented so far use it.
dataDB, peersDB lmdb.DBI
}
// newStorage validates cfg, creates and opens the LMDB environment at
// cfg.Path and opens (creating if necessary) the data and peers databases
// in a single write transaction.
//
// The environment handle is closed on every failure path after it has been
// created, not only when the database-opening transaction fails; LMDB
// requires the handle to be released with close even when opening the
// environment did not succeed.
//
// NOTE(review): when both DataDBName and PeersDBName are empty, the root
// database is opened twice with different flags (0 and DupSort|DupFixed);
// confirm that this combination is intended.
func newStorage(cfg config) (*mdb, error) {
	cfg, err := cfg.validate()
	if err != nil {
		return nil, err
	}

	env, err := lmdb.NewEnv()
	if err != nil {
		return nil, err
	}
	// Release the handle unless construction runs to completion.
	opened := false
	defer func() {
		if !opened {
			_ = env.Close() // best effort: the original failure is what gets reported
		}
	}()

	if err = env.SetMaxDBs(2); err != nil {
		return nil, err
	}
	if err = env.SetMapSize(cfg.MaxSize); err != nil {
		return nil, err
	}
	if err = env.Open(cfg.Path, 0, os.FileMode(cfg.Mode)); err != nil {
		return nil, err
	}

	var dataDB, peersDB lmdb.DBI
	err = env.Update(func(txn *lmdb.Txn) error {
		var txErr error
		if len(cfg.DataDBName) > 0 {
			dataDB, txErr = txn.CreateDBI(cfg.DataDBName)
		} else {
			dataDB, txErr = txn.OpenRoot(0)
		}
		if txErr != nil {
			return txErr
		}
		if len(cfg.PeersDBName) > 0 {
			peersDB, txErr = txn.OpenDBI(cfg.PeersDBName, lmdb.Create|lmdb.DupSort|lmdb.DupFixed)
		} else {
			peersDB, txErr = txn.OpenRoot(lmdb.DupSort | lmdb.DupFixed)
		}
		return txErr
	})
	if err != nil {
		return nil, err
	}

	opened = true
	return &mdb{Env: env, dataDB: dataDB, peersDB: peersDB}, nil
}
// Preservable always reports true.
// NOTE(review): presumably this tells the caller that data kept in this
// storage survives a restart — confirm against the storage package contract.
func (*mdb) Preservable() bool {
return true
}
// Close closes the embedded LMDB environment and returns its error.
// A storage without an environment is treated as already closed.
func (m *mdb) Close() error {
	if m.Env == nil {
		return nil
	}
	return m.Env.Close()
}
// keySeparator is placed between the store context and the key when the
// two are joined into a single database key.
const keySeparator = '_'

// composeKey builds the database key for key within the store context ctx:
// the bytes of ctx, then keySeparator, then the bytes of key.
//
// NOTE(review): neither part is escaped, so ("a_b", "c") and ("a", "b_c")
// map to the same key; verify that contexts never contain the separator.
func composeKey(ctx, key string) []byte {
	out := make([]byte, 0, len(ctx)+1+len(key))
	out = append(out, ctx...)
	out = append(out, keySeparator)
	out = append(out, key...)
	return out
}
// Put stores every entry of values under storeCtx in the data database.
// All writes happen inside one Update call; the first failing write stops
// the loop and its error is returned. Calling Put without values is a no-op.
func (m *mdb) Put(_ context.Context, storeCtx string, values ...storage.Entry) error {
	if len(values) == 0 {
		return nil
	}
	return m.Update(func(txn *lmdb.Txn) error {
		for _, entry := range values {
			dbKey := composeKey(storeCtx, entry.Key)
			if putErr := txn.Put(m.dataDB, dbKey, entry.Value, 0); putErr != nil {
				return putErr
			}
		}
		return nil
	})
}
// Contains reports whether key exists under storeCtx in the data database.
// A not-found result from LMDB yields (false, nil); any other lookup
// failure yields (false, err).
func (m *mdb) Contains(_ context.Context, storeCtx string, key string) (bool, error) {
	lookupErr := m.View(func(txn *lmdb.Txn) error {
		_, getErr := txn.Get(m.dataDB, composeKey(storeCtx, key))
		return getErr
	})
	switch {
	case lookupErr == nil:
		return true, nil
	case lmdb.IsNotFound(lookupErr):
		return false, nil
	default:
		return false, lookupErr
	}
}
// ignoreNotFound passes data through unchanged and clears err when it is
// LMDB's not-found error; every other error (or nil) is returned as is.
// Its shape lets it wrap a (value, error) call such as txn.Get directly.
func ignoreNotFound(data []byte, err error) ([]byte, error) {
	if err == nil || !lmdb.IsNotFound(err) {
		return data, err
	}
	return data, nil
}
// Load returns the value stored for key under storeCtx. A missing key is
// not an error: the lookup's not-found result is suppressed and whatever
// value the lookup produced is returned with a nil error.
func (m *mdb) Load(_ context.Context, storeCtx string, key string) ([]byte, error) {
	var value []byte
	viewErr := m.View(func(txn *lmdb.Txn) error {
		var getErr error
		value, getErr = ignoreNotFound(txn.Get(m.dataDB, composeKey(storeCtx, key)))
		return getErr
	})
	return value, viewErr
}
// Delete removes the given keys under storeCtx from the data database
// inside one Update call. Calling Delete without keys is a no-op.
//
// Keys that do not exist are skipped: LMDB reports a not-found error when
// asked to delete a missing key, and propagating it would fail the whole
// batch even though the requested end state (key absent) already holds.
// Contains and Load treat not-found as a non-error in the same way. Any
// other error stops the loop and is returned.
func (m *mdb) Delete(_ context.Context, storeCtx string, keys ...string) error {
	if len(keys) == 0 {
		return nil
	}
	return m.Update(func(txn *lmdb.Txn) error {
		for _, k := range keys {
			delErr := txn.Del(m.dataDB, composeKey(storeCtx, k), nil)
			if delErr != nil && !lmdb.IsNotFound(delErr) {
				return delErr
			}
		}
		return nil
	})
}
// PutSeeder is a placeholder: it is not implemented and panics when called.
func (m *mdb) PutSeeder(_ context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error {
// TODO: implement me
panic("implement me")
}
// DeleteSeeder is a placeholder: it is not implemented and panics when
// called.
func (m *mdb) DeleteSeeder(_ context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error {
// TODO: implement me
panic("implement me")
}
// PutLeecher is a placeholder: it is not implemented and panics when called.
func (m *mdb) PutLeecher(_ context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error {
// TODO: implement me
panic("implement me")
}
// DeleteLeecher is a placeholder: it is not implemented and panics when
// called.
func (m *mdb) DeleteLeecher(_ context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error {
// TODO: implement me
panic("implement me")
}
// GraduateLeecher is a placeholder: it is not implemented and panics when
// called.
func (m *mdb) GraduateLeecher(_ context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error {
// TODO: implement me
panic("implement me")
}
// AnnouncePeers is a placeholder: it is not implemented and panics when
// called, so none of its parameters or named results are used yet.
func (m *mdb) AnnouncePeers(_ context.Context, ih bittorrent.InfoHash, forSeeder bool, numWant int, v6 bool) (peers []bittorrent.Peer, err error) {
// TODO: implement me
panic("implement me")
}
// ScrapeSwarm is a placeholder: it is not implemented and panics when
// called, so none of its named results are ever set.
func (m *mdb) ScrapeSwarm(_ context.Context, ih bittorrent.InfoHash) (leechers uint32, seeders uint32, snatched uint32, err error) {
// TODO: implement me
panic("implement me")
}
// Ping is a placeholder: it is not implemented and panics when called.
func (m *mdb) Ping(_ context.Context) error {
// TODO: implement me
panic("implement me")
}