change async torrent approval directory watch to periodic

This commit is contained in:
Lawrence, Rendall
2024-09-01 15:54:23 +03:00
parent c16f85c79a
commit ee493b845c
7 changed files with 171 additions and 90 deletions

View File

@@ -6,22 +6,34 @@ package directory
import (
"context"
"crypto/sha1"
"fmt"
"io"
"os"
"path/filepath"
"strings"
"time"
"github.com/anacrolix/torrent/metainfo"
"github.com/anacrolix/torrent/util/dirwatch"
"github.com/minio/sha256-simd"
"github.com/zeebo/bencode"
"github.com/sot-tech/mochi/bittorrent"
"github.com/sot-tech/mochi/middleware/torrentapproval/container"
"github.com/sot-tech/mochi/middleware/torrentapproval/container/list"
"github.com/sot-tech/mochi/pkg/conf"
"github.com/sot-tech/mochi/pkg/log"
"github.com/sot-tech/mochi/pkg/str2bytes"
"github.com/sot-tech/mochi/storage"
)
var logger = log.NewLogger("middleware/torrent approval/directory")
const (
defaultPeriod = time.Minute
maxTorrentSize = 10 * 1024 * 1024
)
func init() {
container.Register("directory", build)
}
@@ -32,6 +44,8 @@ type Config struct {
list.Config
// Path in filesystem where torrent files stored and should be watched
Path string
// Period is time between two Path checks
Period time.Duration
}
func build(conf conf.MapConfig, st storage.DataStorage) (container.Container, error) {
@@ -46,7 +60,7 @@ func build(conf conf.MapConfig, st storage.DataStorage) (container.Container, er
Storage: st,
StorageCtx: c.StorageCtx,
},
watcher: nil,
closed: make(chan bool),
}
if len(d.StorageCtx) == 0 {
logger.Warn().
@@ -56,77 +70,132 @@ func build(conf conf.MapConfig, st storage.DataStorage) (container.Container, er
Msg("falling back to default configuration")
d.StorageCtx = container.DefaultStorageCtxName
}
var w *dirwatch.Instance
if w, err = dirwatch.New(c.Path); err != nil {
return nil, fmt.Errorf("unable to initialize directory watch: %w", err)
if c.Period == 0 {
logger.Warn().
Str("name", "Period").
Dur("provided", 0).
Dur("default", defaultPeriod).
Msg("falling back to default configuration")
c.Period = defaultPeriod
}
d.watcher = w
go func() {
for event := range d.watcher.Events {
var mi *metainfo.MetaInfo
if mi, err = metainfo.LoadFromFile(event.TorrentFilePath); err == nil {
s256 := sha256.New()
s256.Write(mi.InfoBytes)
v2hash, _ := bittorrent.NewInfoHash(s256.Sum(nil))
switch event.Change {
case dirwatch.Added:
var name string
if info, err := mi.UnmarshalInfo(); err == nil {
name = info.Name
} else {
logger.Error().
Err(err).
Str("file", event.TorrentFilePath).
Stringer("infoHash", event.InfoHash).
Stringer("infoHashV2", v2hash).
Msg("unable to unmarshal torrent info")
go d.runScan(c.Path, c.Period)
return d, err
}
// BencodeRawBytes wrapper for byte slice to get raw 'info' section from
// torrent file
type BencodeRawBytes []byte
// UnmarshalBencode just appends raw byte slice to result
func (ba *BencodeRawBytes) UnmarshalBencode(in []byte) error {
*ba = append([]byte(nil), in...)
return nil
}
type torrentRawInfoStruct struct {
Info BencodeRawBytes `bencode:"info"`
}
type torrentNameInfoStruct struct {
Name string `bencode:"name"`
}
func (d *directory) runScan(path string, period time.Duration) {
t := time.NewTicker(period)
defer t.Stop()
files := make(map[string][2]bittorrent.InfoHash)
tmpFiles := make(map[string]bool)
// nolint:gosec
s1, s2 := sha1.New(), sha256.New()
for {
select {
case <-d.closed:
return
case <-t.C:
logger.Debug().Msg("starting directory scan")
if entries, err := os.ReadDir(path); err == nil {
for _, e := range entries {
if !e.IsDir() && strings.ToLower(filepath.Ext(e.Name())) == ".torrent" {
tmpFiles[filepath.Join(path, e.Name())] = true
}
if len(name) == 0 {
name = list.DUMMY
}
bName := []byte(name)
logger.Err(d.Storage.Put(context.Background(), d.StorageCtx, storage.Entry{
Key: event.InfoHash.AsString(),
Value: bName,
}, storage.Entry{
Key: v2hash.RawString(),
Value: bName,
}, storage.Entry{
Key: v2hash.TruncateV1().RawString(),
Value: bName,
})).
Str("action", "add").
Str("file", event.TorrentFilePath).
Stringer("infoHash", event.InfoHash).
Stringer("infoHashV2", v2hash).
Msg("approval torrent watcher event")
case dirwatch.Removed:
logger.Err(d.Storage.Delete(context.Background(), c.StorageCtx, event.InfoHash.AsString(), v2hash.RawString(), v2hash.TruncateV1().RawString())).
Str("action", "delete").
Str("file", event.TorrentFilePath).
Stringer("infoHash", event.InfoHash).
Stringer("infoHashV2", v2hash).
Msg("approval torrent watcher event")
}
for p := range tmpFiles {
if _, exists := files[p]; !exists {
var f *os.File
if f, err = os.Open(p); err == nil {
var info torrentRawInfoStruct
err = bencode.NewDecoder(io.LimitReader(f, maxTorrentSize)).Decode(&info)
_ = f.Close()
if err == nil {
s1.Write(info.Info)
h1, _ := bittorrent.NewInfoHash(s1.Sum(nil))
s1.Reset()
s2.Write(info.Info)
h2, _ := bittorrent.NewInfoHash(s2.Sum(nil))
s2.Reset()
files[p] = [2]bittorrent.InfoHash{h1, h2}
var name torrentNameInfoStruct
if err := bencode.DecodeBytes(info.Info, &name); err != nil {
logger.Warn().
Err(err).
Str("file", p).
Msg("unable to unmarshal torrent info")
}
if len(name.Name) == 0 {
name.Name = list.DUMMY
}
bName := str2bytes.StringToBytes(name.Name)
logger.Err(d.Storage.Put(context.Background(), d.StorageCtx, storage.Entry{
Key: h1.RawString(),
Value: bName,
}, storage.Entry{
Key: h2.RawString(),
Value: bName,
}, storage.Entry{
Key: h2.TruncateV1().RawString(),
Value: bName,
})).
Str("file", p).
Stringer("infoHash", h1).
Stringer("infoHashV2", h2).
Msg("added torrent to approval list")
}
}
if err != nil {
logger.Warn().Err(err).Str("file", p).Msg("unable to read file")
}
}
}
for p, ih := range files {
if _, isOk := tmpFiles[p]; !isOk {
delete(files, p)
logger.Err(d.Storage.Delete(context.Background(), d.StorageCtx, ih[0].RawString(),
ih[1].RawString(), ih[1].TruncateV1().RawString())).
Str("file", p).
Stringer("infoHash", ih[1]).
Stringer("infoHashV2", ih[1]).
Msg("deleted torrent from approval list")
}
}
clear(tmpFiles)
} else {
logger.Error().Err(err).
Str("file", event.TorrentFilePath).
Msg("unable to load torrent file")
logger.Warn().Err(err).Msg("unable to get directory content")
}
}
}()
return d, err
}
}
type directory struct {
list.List
watcher *dirwatch.Instance
closed chan bool
}
// Close closes watching of torrent directory
func (d *directory) Close() error {
if d.watcher != nil {
d.watcher.Close()
if d.closed != nil {
close(d.closed)
}
return nil
}

View File

@@ -95,6 +95,7 @@ func TestHandleAnnounce(t *testing.T) {
} else {
require.Equal(t, err, ErrTorrentUnapproved)
}
_ = h.(*hook).Close()
})
}
}