(WIP) s3 torrent approval test suite

This commit is contained in:
Lawrence, Rendall
2025-09-19 19:11:15 +03:00
parent 4e8f0e29b6
commit 278e2e8b4d
4 changed files with 391 additions and 37 deletions
@@ -53,46 +53,35 @@ func build(conf conf.MapConfig, st storage.DataStorage) (container.Container, er
if err := conf.Unmarshal(c); err != nil {
return nil, fmt.Errorf("unable to deserialise configuration: %w", err)
}
var err error
if c.Period == 0 {
logger.Warn().
Str("name", "Period").
Dur("provided", 0).
Dur("default", defaultPeriod).
Msg("falling back to default configuration")
c.Period = defaultPeriod
}
d := NewScanner(list.List{
Invert: c.Invert,
Storage: st,
StorageCtx: c.StorageCtx,
}, path(c.Path))
go d.Run(c.Period)
return d, err
}, path(c.Path), c.Period)
go d.Run()
return d, nil
}
// BencodeRawBytes wrapper for byte slice to get raw 'info' section from
// torrent file
type BencodeRawBytes []byte
type bencodeRawBytes []byte
// UnmarshalBencode just appends raw byte slice to result
func (ba *BencodeRawBytes) UnmarshalBencode(in []byte) error {
func (ba *bencodeRawBytes) UnmarshalBencode(in []byte) error {
*ba = append([]byte(nil), in...)
return nil
}
type torrentRawInfoStruct struct {
Info BencodeRawBytes `bencode:"info"`
Info bencodeRawBytes `bencode:"info"`
}
type torrentNameInfoStruct struct {
Name string `bencode:"name"`
}
// PathReader - interface for abstract directory reader
// PathReader - interface for abstract directory-like reader
type PathReader interface {
// ReadDir returns names of torrent entries.
// Implementation must return absolute names of entries
// Implementation must return absolute paths of entries
// to fetch torrent file-like data.
ReadDir() (it iter.Seq[string], err error)
// ReadData returns reader for entry data
@@ -125,7 +114,7 @@ func (p path) ReadData(entry string) (io.ReadCloser, error) {
}
// NewScanner creates Scanner instance.
func NewScanner(list list.List, reader PathReader) *Scanner {
func NewScanner(list list.List, reader PathReader, period time.Duration) *Scanner {
if len(list.StorageCtx) == 0 {
logger.Warn().
Str("name", "StorageCtx").
@@ -134,9 +123,18 @@ func NewScanner(list list.List, reader PathReader) *Scanner {
Msg("falling back to default configuration")
list.StorageCtx = container.DefaultStorageCtxName
}
if period == 0 {
logger.Warn().
Str("name", "Period").
Dur("provided", 0).
Dur("default", defaultPeriod).
Msg("falling back to default configuration")
period = defaultPeriod
}
return &Scanner{
List: list,
reader: reader,
period: period,
closed: make(chan bool),
}
}
@@ -145,16 +143,21 @@ func NewScanner(list list.List, reader PathReader) *Scanner {
type Scanner struct {
list.List
reader PathReader
period time.Duration
closed chan bool
}
// Run starts periodic directory scanning and blocks until Stop called
func (d *Scanner) Run(period time.Duration) {
func (d *Scanner) Run() {
if d.reader == nil {
log.Warn().Msg("reader not provided")
return
}
t := time.NewTicker(period)
if d.period == 0 {
log.Warn().Msg("period not provided")
return
}
t := time.NewTicker(d.period)
defer t.Stop()
files := make(map[string][2]bittorrent.InfoHash)
tmpFiles := make(map[string]bool)
@@ -0,0 +1,109 @@
package directory
import (
"context"
"encoding/base64"
"encoding/hex"
"os"
"path/filepath"
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/sot-tech/mochi/middleware/torrentapproval/container/list"
"github.com/sot-tech/mochi/pkg/conf"
"github.com/sot-tech/mochi/pkg/log"
"github.com/sot-tech/mochi/storage/memory"
)
type testData struct {
name string
data []byte
hash string
}
func unHEX(s string) string {
b, _ := hex.DecodeString(s)
return string(b)
}
func unBase64(s string) []byte {
b, _ := base64.StdEncoding.DecodeString(s)
return b
}
// contains created torrent file data from simple txt documents.
// data base64 encoded because `pieces` section in torrent file is raw bytes
var files = [2]testData{
{
name: "test0.torrent",
data: unBase64(`ZDEwOmNyZWF0ZWQgYnkzMTpUcmFuc21pc3Npb24vNC4wLjYgKDM4YzE2NDkzM2UpMTM6Y3JlYXRp
b24gZGF0ZWkxNzU1ODYxOTI3ZTg6ZW5jb2Rpbmc1OlVURi04NDppbmZvZDY6bGVuZ3RoaTVlNDpu
YW1lODp0ZXN0LnR4dDEyOnBpZWNlIGxlbmd0aGkzMjc2OGU2OnBpZWNlczIwOk4SQ70ixm52wrqe
3cH5E5Tlf5+DZWU=`),
hash: unHEX("a10e8e9e81702bf8482f251551ff4fe011cba6a7"),
},
{
name: "test1.torrent",
data: unBase64(`ZDEwOmNyZWF0ZWQgYnkzMTpUcmFuc21pc3Npb24vNC4wLjYgKDM4YzE2NDkzM2UpMTM6Y3JlYXRp
b24gZGF0ZWkxNzU2MTIzNzEwZTg6ZW5jb2Rpbmc1OlVURi04NDppbmZvZDY6bGVuZ3RoaTRlNDpu
YW1lOTp0ZXN0MC50eHQxMjpwaWVjZSBsZW5ndGhpMzI3NjhlNjpwaWVjZXMyMDqo/cIFqfGcwcdQ
emDE8BsT0R1/0GVl`),
hash: unHEX("e86d393bd458d2acc46d5467bc8cb8b30b1bfa77"),
},
}
func init() {
_ = log.ConfigureLogger("", "warn", false, false)
}
func writeTmp() (string, error) {
tmpDir, err := os.MkdirTemp("", "")
if err != nil {
return "", err
}
for _, f := range files {
err = os.WriteFile(filepath.Join(tmpDir, f.name), f.data, 0644)
if err != nil {
return "", err
}
}
return tmpDir, err
}
func TestScan(t *testing.T) {
tmpDir, err := writeTmp()
t.Cleanup(func() {
err := os.RemoveAll(tmpDir)
if err != nil {
t.Log(err)
}
})
if err != nil {
t.Error(err)
return
}
st, _ := memory.Builder{}.NewDataStorage(make(conf.MapConfig))
d := NewScanner(list.List{
Invert: false,
Storage: st,
StorageCtx: "TEST",
}, path(tmpDir), time.Millisecond*10)
go d.Run()
t.Cleanup(func() {
_ = d.Close()
})
time.Sleep(time.Millisecond * 100)
for _, f := range files {
contains, _ := d.List.Storage.Contains(context.Background(), "TEST", f.hash)
require.True(t, contains, "%s must present", f.name)
_ = os.Remove(filepath.Join(tmpDir, f.name))
}
time.Sleep(time.Millisecond * 100)
for _, f := range files {
contains, _ := d.List.Storage.Contains(context.Background(), "TEST", f.hash)
require.False(t, contains, "%s must absent", f.name)
}
}
+62 -15
View File
@@ -12,9 +12,10 @@ import (
"strings"
"time"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/credentials"
awss3 "github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/smithy-go/logging"
"github.com/sot-tech/mochi/middleware/torrentapproval/container"
"github.com/sot-tech/mochi/middleware/torrentapproval/container/directory"
@@ -32,9 +33,14 @@ const defaultPeriod = time.Minute
// Extends list.Config because uses the same storage and Approved function.
type Config struct {
list.Config
Bucket string
Prefix string
Period time.Duration
Endpoint string
Region string
KeyID string
KeySecret string
SessionToken string
Bucket string
Prefix string
Period time.Duration
}
func init() {
@@ -55,38 +61,79 @@ func build(conf conf.MapConfig, st storage.DataStorage) (container.Container, er
c.Period = defaultPeriod
}
sdkConfig, err := config.LoadDefaultConfig(context.Background())
awsCfg, err := config.LoadDefaultConfig(context.Background())
if err != nil {
return nil, fmt.Errorf("unable load AWS S3 SDK configuration: %w", err)
}
awsCfg.Logger = logging.LoggerFunc(func(classification logging.Classification, format string, v ...interface{}) {
if classification == logging.Debug {
logger.Debug().CallerSkipFrame(1).Msg(fmt.Sprintf(format, v...))
} else if classification == logging.Warn {
logger.Warn().CallerSkipFrame(1).Msg(fmt.Sprintf(format, v...))
}
})
if len(c.Endpoint) > 0 {
awsCfg.BaseEndpoint = &c.Endpoint
}
if len(c.Region) > 0 {
awsCfg.Region = c.Region
}
if len(c.KeyID) > 0 || len(c.KeySecret) > 0 || len(c.SessionToken) > 0 {
awsCfg.Credentials = credentials.NewStaticCredentialsProvider(c.KeyID, c.KeySecret, c.SessionToken)
}
s := directory.NewScanner(list.List{
Invert: c.Invert,
Storage: st,
StorageCtx: c.StorageCtx,
}, s3{client: awss3.NewFromConfig(sdkConfig), bucket: c.Bucket, prefix: c.Prefix})
go s.Run(c.Period)
}, s3{
client: awss3.NewFromConfig(awsCfg),
bucket: c.Bucket,
prefix: c.Prefix,
}, c.Period)
go s.Run()
return s, err
}
type s3Client interface {
ListObjectsV2(
ctx context.Context, input *awss3.ListObjectsV2Input, f ...func(*awss3.Options),
) (*awss3.ListObjectsV2Output, error)
GetObject(
ctx context.Context, params *awss3.GetObjectInput, optFns ...func(*awss3.Options),
) (*awss3.GetObjectOutput, error)
}
type s3 struct {
client *awss3.Client
client s3Client
bucket, prefix string
}
var _ directory.PathReader = s3{}
func (s s3) ReadDir() (it iter.Seq[string], err error) {
entries, err := s.client.ListObjectsV2(context.Background(), &awss3.ListObjectsV2Input{
Bucket: &s.bucket,
Prefix: &s.prefix,
})
search := &awss3.ListObjectsV2Input{Bucket: &s.bucket}
if len(s.prefix) > 0 {
search.Prefix = &s.prefix
}
entries, err := s.client.ListObjectsV2(context.Background(), search)
if err == nil {
it = func(yield func(string) bool) {
for _, e := range entries.Contents {
logger.Trace().Any("content", e).Msg("read dir")
if e.Key != nil && strings.ToLower(filepath.Ext(*e.Key)) == ".torrent" {
if !yield(filepath.Join(s.prefix, *e.Key)) {
var name string
if len(s.prefix) == 0 {
name = *e.Key
} else {
name = filepath.Join(s.prefix, *e.Key)
}
if !yield(name) {
return
}
}
@@ -99,8 +146,8 @@ func (s s3) ReadDir() (it iter.Seq[string], err error) {
func (s s3) ReadData(entry string) (data io.ReadCloser, err error) {
var result *awss3.GetObjectOutput
result, err = s.client.GetObject(context.Background(), &awss3.GetObjectInput{
Bucket: aws.String(s.bucket),
Key: aws.String(entry),
Bucket: &s.bucket,
Key: &entry,
})
if err == nil {
data = result.Body
@@ -0,0 +1,195 @@
package s3
import (
"bytes"
"context"
"encoding/base64"
"encoding/hex"
"io"
"testing"
"time"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/credentials"
awss3 "github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/aws-sdk-go-v2/service/s3/types"
"github.com/stretchr/testify/require"
"github.com/sot-tech/mochi/middleware/torrentapproval/container/directory"
"github.com/sot-tech/mochi/middleware/torrentapproval/container/list"
"github.com/sot-tech/mochi/pkg/conf"
"github.com/sot-tech/mochi/pkg/log"
"github.com/sot-tech/mochi/storage/memory"
)
type testData struct {
data []byte
hash string
}
func unHEX(s string) string {
b, _ := hex.DecodeString(s)
return string(b)
}
func unBase64(s string) []byte {
b, _ := base64.StdEncoding.DecodeString(s)
return b
}
// contains created torrent file data from simple txt documents.
// data base64 encoded because `pieces` section in torrent file is raw bytes
var files = map[string]testData{
"test0.torrent": {
data: unBase64(`ZDEwOmNyZWF0ZWQgYnkzMTpUcmFuc21pc3Npb24vNC4wLjYgKDM4YzE2NDkzM2UpMTM6Y3JlYXRp
b24gZGF0ZWkxNzU1ODYxOTI3ZTg6ZW5jb2Rpbmc1OlVURi04NDppbmZvZDY6bGVuZ3RoaTVlNDpu
YW1lODp0ZXN0LnR4dDEyOnBpZWNlIGxlbmd0aGkzMjc2OGU2OnBpZWNlczIwOk4SQ70ixm52wrqe
3cH5E5Tlf5+DZWU=`),
hash: unHEX("a10e8e9e81702bf8482f251551ff4fe011cba6a7"),
},
"test1.torrent": {
data: unBase64(`ZDEwOmNyZWF0ZWQgYnkzMTpUcmFuc21pc3Npb24vNC4wLjYgKDM4YzE2NDkzM2UpMTM6Y3JlYXRp
b24gZGF0ZWkxNzU2MTIzNzEwZTg6ZW5jb2Rpbmc1OlVURi04NDppbmZvZDY6bGVuZ3RoaTRlNDpu
YW1lOTp0ZXN0MC50eHQxMjpwaWVjZSBsZW5ndGhpMzI3NjhlNjpwaWVjZXMyMDqo/cIFqfGcwcdQ
emDE8BsT0R1/0GVl`),
hash: unHEX("e86d393bd458d2acc46d5467bc8cb8b30b1bfa77"),
},
}
func init() {
_ = log.ConfigureLogger("", "debug", false, false)
}
type mockS3 struct {
objs []types.Object
}
func (m *mockS3) ListObjectsV2(
context.Context, *awss3.ListObjectsV2Input, ...func(*awss3.Options),
) (*awss3.ListObjectsV2Output, error) {
return &awss3.ListObjectsV2Output{
Contents: m.objs,
}, nil
}
var _ s3Client = &mockS3{}
func (m *mockS3) GetObject(
_ context.Context, params *awss3.GetObjectInput, _ ...func(*awss3.Options),
) (*awss3.GetObjectOutput, error) {
if params == nil || params.Key == nil {
return nil, nil
}
v := files[*params.Key]
return &awss3.GetObjectOutput{
Body: io.NopCloser(bytes.NewReader(v.data)),
}, nil
}
func TestScanMock(t *testing.T) {
cl := &mockS3{make([]types.Object, 0, len(files))}
for k := range files {
cl.objs = append(cl.objs, types.Object{Key: &k})
}
st, _ := memory.Builder{}.NewDataStorage(make(conf.MapConfig))
d := directory.NewScanner(list.List{
Invert: false,
Storage: st,
StorageCtx: "TEST",
}, s3{
client: cl,
}, time.Millisecond*10)
go d.Run()
t.Cleanup(func() {
_ = d.Close()
})
time.Sleep(time.Millisecond * 100)
for name, f := range files {
contains, _ := d.List.Storage.Contains(context.Background(), "TEST", f.hash)
require.True(t, contains, "%s must present", name)
for i := 0; i < len(cl.objs); i++ {
if *cl.objs[i].Key == name {
cl.objs = append(cl.objs[:i], cl.objs[i+1:]...)
}
}
}
time.Sleep(time.Millisecond * 100)
for name, f := range files {
contains, _ := d.List.Storage.Contains(context.Background(), "TEST", f.hash)
require.False(t, contains, "%s must absent", name)
}
}
var (
minioEndpoint = "http://127.0.0.1:9000"
minioBucket = "test"
)
const (
minioKeyID = "minioadmin"
minioSecret = "minioadmin"
minioRegion = "us-east-1"
)
// TestScanMinio requires real minio instance listening 127.0.0.1:9000
// with default login/password (minioadmin/minioadmin)
func TestScanMinio(t *testing.T) {
st, _ := memory.Builder{}.NewDataStorage(make(conf.MapConfig))
awsCfg, err := config.LoadDefaultConfig(context.Background())
if err != nil {
t.Fatal(err)
}
awsCfg.BaseEndpoint = &minioEndpoint
awsCfg.Region = minioRegion
awsCfg.Credentials = credentials.NewStaticCredentialsProvider(minioKeyID, minioSecret, "")
cl := awss3.NewFromConfig(awsCfg)
_, _ = cl.CreateBucket(context.Background(), &awss3.CreateBucketInput{Bucket: &minioBucket})
for name, data := range files {
_, err = cl.PutObject(context.Background(), &awss3.PutObjectInput{
Bucket: &minioBucket,
Key: &name,
Body: bytes.NewReader(data.data),
})
if err != nil {
t.Fatal(err)
}
}
d := directory.NewScanner(list.List{
Invert: false,
Storage: st,
StorageCtx: "TEST",
}, s3{
client: cl,
bucket: minioBucket,
}, time.Millisecond*100)
go d.Run()
t.Cleanup(func() {
_ = d.Close()
})
time.Sleep(time.Millisecond * 200)
for name, f := range files {
contains, _ := d.List.Storage.Contains(context.Background(), "TEST", f.hash)
require.True(t, contains, "%s must present", name)
_, err = cl.DeleteObject(context.Background(), &awss3.DeleteObjectInput{
Bucket: &minioBucket,
Key: &name,
})
if err != nil {
t.Fatal(err)
}
}
time.Sleep(time.Millisecond * 200)
for name, f := range files {
contains, _ := d.List.Storage.Contains(context.Background(), "TEST", f.hash)
require.False(t, contains, "%s must absent", name)
}
}