From 278e2e8b4d817c281570b291bbc336021a5e7942 Mon Sep 17 00:00:00 2001 From: "Lawrence, Rendall" Date: Fri, 19 Sep 2025 19:11:15 +0300 Subject: [PATCH] (WIP) s3 torrent approval test suite --- .../container/directory/directory.go | 47 +++-- .../container/directory/directory_test.go | 109 ++++++++++ middleware/torrentapproval/container/s3/s3.go | 77 +++++-- .../torrentapproval/container/s3/s3_test.go | 195 ++++++++++++++++++ 4 files changed, 391 insertions(+), 37 deletions(-) create mode 100644 middleware/torrentapproval/container/directory/directory_test.go create mode 100644 middleware/torrentapproval/container/s3/s3_test.go diff --git a/middleware/torrentapproval/container/directory/directory.go b/middleware/torrentapproval/container/directory/directory.go index 979fa43..a6231a0 100644 --- a/middleware/torrentapproval/container/directory/directory.go +++ b/middleware/torrentapproval/container/directory/directory.go @@ -53,46 +53,35 @@ func build(conf conf.MapConfig, st storage.DataStorage) (container.Container, er if err := conf.Unmarshal(c); err != nil { return nil, fmt.Errorf("unable to deserialise configuration: %w", err) } - var err error - if c.Period == 0 { - logger.Warn(). - Str("name", "Period"). - Dur("provided", 0). - Dur("default", defaultPeriod). - Msg("falling back to default configuration") - c.Period = defaultPeriod - } d := NewScanner(list.List{ Invert: c.Invert, Storage: st, StorageCtx: c.StorageCtx, - }, path(c.Path)) - go d.Run(c.Period) - return d, err + }, path(c.Path), c.Period) + go d.Run() + return d, nil } -// BencodeRawBytes wrapper for byte slice to get raw 'info' section from -// torrent file -type BencodeRawBytes []byte +type bencodeRawBytes []byte // UnmarshalBencode just appends raw byte slice to result -func (ba *BencodeRawBytes) UnmarshalBencode(in []byte) error { +func (ba *bencodeRawBytes) UnmarshalBencode(in []byte) error { *ba = append([]byte(nil), in...) return nil } type torrentRawInfoStruct struct { - Info BencodeRawBytes `bencode:"info"` + Info bencodeRawBytes `bencode:"info"` } type torrentNameInfoStruct struct { Name string `bencode:"name"` } -// PathReader - interface for abstract directory reader +// PathReader - interface for abstract directory-like reader type PathReader interface { // ReadDir returns names of torrent entries. - // Implementation must return absolute names of entries + // Implementation must return absolute paths of entries // to fetch torrent file-like data. ReadDir() (it iter.Seq[string], err error) // ReadData returns reader for entry data @@ -125,7 +114,7 @@ func (p path) ReadData(entry string) (io.ReadCloser, error) { } // NewScanner creates Scanner instance. -func NewScanner(list list.List, reader PathReader) *Scanner { +func NewScanner(list list.List, reader PathReader, period time.Duration) *Scanner { if len(list.StorageCtx) == 0 { logger.Warn(). Str("name", "StorageCtx"). @@ -134,9 +123,18 @@ func NewScanner(list list.List, reader PathReader) *Scanner { Msg("falling back to default configuration") list.StorageCtx = container.DefaultStorageCtxName } + if period == 0 { + logger.Warn(). + Str("name", "Period"). + Dur("provided", 0). + Dur("default", defaultPeriod). + Msg("falling back to default configuration") + period = defaultPeriod + } return &Scanner{ List: list, reader: reader, + period: period, closed: make(chan bool), } } @@ -145,16 +143,21 @@ func NewScanner(list list.List, reader PathReader) *Scanner { type Scanner struct { list.List reader PathReader + period time.Duration closed chan bool } // Run starts periodic directory scanning and blocks until Stop called -func (d *Scanner) Run(period time.Duration) { +func (d *Scanner) Run() { if d.reader == nil { log.Warn().Msg("reader not provided") return } - t := time.NewTicker(period) + if d.period == 0 { + log.Warn().Msg("period not provided") + return + } + t := time.NewTicker(d.period) defer t.Stop() files := make(map[string][2]bittorrent.InfoHash) tmpFiles := make(map[string]bool) diff --git a/middleware/torrentapproval/container/directory/directory_test.go b/middleware/torrentapproval/container/directory/directory_test.go new file mode 100644 index 0000000..4292700 --- /dev/null +++ b/middleware/torrentapproval/container/directory/directory_test.go @@ -0,0 +1,109 @@ +package directory + +import ( + "context" + "encoding/base64" + "encoding/hex" + "os" + "path/filepath" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/sot-tech/mochi/middleware/torrentapproval/container/list" + "github.com/sot-tech/mochi/pkg/conf" + "github.com/sot-tech/mochi/pkg/log" + "github.com/sot-tech/mochi/storage/memory" +) + +type testData struct { + name string + data []byte + hash string +} + +func unHEX(s string) string { + b, _ := hex.DecodeString(s) + return string(b) +} + +func unBase64(s string) []byte { + b, _ := base64.StdEncoding.DecodeString(s) + return b +} + +// contains created torrent file data from simple txt documents. +// data base64 encoded because `pieces` section in torrent file is raw bytes +var files = [2]testData{ + { + name: "test0.torrent", + data: unBase64(`ZDEwOmNyZWF0ZWQgYnkzMTpUcmFuc21pc3Npb24vNC4wLjYgKDM4YzE2NDkzM2UpMTM6Y3JlYXRp +b24gZGF0ZWkxNzU1ODYxOTI3ZTg6ZW5jb2Rpbmc1OlVURi04NDppbmZvZDY6bGVuZ3RoaTVlNDpu +YW1lODp0ZXN0LnR4dDEyOnBpZWNlIGxlbmd0aGkzMjc2OGU2OnBpZWNlczIwOk4SQ70ixm52wrqe +3cH5E5Tlf5+DZWU=`), + hash: unHEX("a10e8e9e81702bf8482f251551ff4fe011cba6a7"), + }, + { + name: "test1.torrent", + data: unBase64(`ZDEwOmNyZWF0ZWQgYnkzMTpUcmFuc21pc3Npb24vNC4wLjYgKDM4YzE2NDkzM2UpMTM6Y3JlYXRp +b24gZGF0ZWkxNzU2MTIzNzEwZTg6ZW5jb2Rpbmc1OlVURi04NDppbmZvZDY6bGVuZ3RoaTRlNDpu +YW1lOTp0ZXN0MC50eHQxMjpwaWVjZSBsZW5ndGhpMzI3NjhlNjpwaWVjZXMyMDqo/cIFqfGcwcdQ +emDE8BsT0R1/0GVl`), + hash: unHEX("e86d393bd458d2acc46d5467bc8cb8b30b1bfa77"), + }, +} + +func init() { + _ = log.ConfigureLogger("", "warn", false, false) +} + +func writeTmp() (string, error) { + tmpDir, err := os.MkdirTemp("", "") + if err != nil { + return "", err + } + for _, f := range files { + err = os.WriteFile(filepath.Join(tmpDir, f.name), f.data, 0644) + if err != nil { + return "", err + } + } + return tmpDir, err +} + +func TestScan(t *testing.T) { + tmpDir, err := writeTmp() + t.Cleanup(func() { + err := os.RemoveAll(tmpDir) + if err != nil { + t.Log(err) + } + }) + if err != nil { + t.Error(err) + return + } + st, _ := memory.Builder{}.NewDataStorage(make(conf.MapConfig)) + d := NewScanner(list.List{ + Invert: false, + Storage: st, + StorageCtx: "TEST", + }, path(tmpDir), time.Millisecond*10) + go d.Run() + t.Cleanup(func() { + _ = d.Close() + }) + time.Sleep(time.Millisecond * 100) + for _, f := range files { + contains, _ := d.List.Storage.Contains(context.Background(), "TEST", f.hash) + require.True(t, contains, "%s must present", f.name) + _ = os.Remove(filepath.Join(tmpDir, f.name)) + } + + time.Sleep(time.Millisecond * 100) + for _, f := range files { + contains, _ := d.List.Storage.Contains(context.Background(), "TEST", f.hash) + require.False(t, contains, "%s must absent", f.name) + } +} diff --git a/middleware/torrentapproval/container/s3/s3.go b/middleware/torrentapproval/container/s3/s3.go index c3a127c..586cd3a 100644 --- a/middleware/torrentapproval/container/s3/s3.go +++ b/middleware/torrentapproval/container/s3/s3.go @@ -12,9 +12,10 @@ import ( "strings" "time" - "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/credentials" awss3 "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/aws/smithy-go/logging" "github.com/sot-tech/mochi/middleware/torrentapproval/container" "github.com/sot-tech/mochi/middleware/torrentapproval/container/directory" @@ -32,9 +33,14 @@ const defaultPeriod = time.Minute // Extends list.Config because uses the same storage and Approved function. type Config struct { list.Config - Bucket string - Prefix string - Period time.Duration + Endpoint string + Region string + KeyID string + KeySecret string + SessionToken string + Bucket string + Prefix string + Period time.Duration } func init() { @@ -55,38 +61,79 @@ func build(conf conf.MapConfig, st storage.DataStorage) (container.Container, er c.Period = defaultPeriod } - sdkConfig, err := config.LoadDefaultConfig(context.Background()) + awsCfg, err := config.LoadDefaultConfig(context.Background()) if err != nil { return nil, fmt.Errorf("unable load AWS S3 SDK configuration: %w", err) } + awsCfg.Logger = logging.LoggerFunc(func(classification logging.Classification, format string, v ...interface{}) { + if classification == logging.Debug { + logger.Debug().CallerSkipFrame(1).Msg(fmt.Sprintf(format, v...)) + } else if classification == logging.Warn { + logger.Warn().CallerSkipFrame(1).Msg(fmt.Sprintf(format, v...)) + } + }) + + if len(c.Endpoint) > 0 { + awsCfg.BaseEndpoint = &c.Endpoint + } + + if len(c.Region) > 0 { + awsCfg.Region = c.Region + } + + if len(c.KeyID) > 0 || len(c.KeySecret) > 0 || len(c.SessionToken) > 0 { + awsCfg.Credentials = credentials.NewStaticCredentialsProvider(c.KeyID, c.KeySecret, c.SessionToken) + } + s := directory.NewScanner(list.List{ Invert: c.Invert, Storage: st, StorageCtx: c.StorageCtx, - }, s3{client: awss3.NewFromConfig(sdkConfig), bucket: c.Bucket, prefix: c.Prefix}) - go s.Run(c.Period) + }, s3{ + client: awss3.NewFromConfig(awsCfg), + bucket: c.Bucket, + prefix: c.Prefix, + }, c.Period) + go s.Run() return s, err } +type s3Client interface { + ListObjectsV2( + ctx context.Context, input *awss3.ListObjectsV2Input, f ...func(*awss3.Options), + ) (*awss3.ListObjectsV2Output, error) + GetObject( + ctx context.Context, params *awss3.GetObjectInput, optFns ...func(*awss3.Options), + ) (*awss3.GetObjectOutput, error) +} + type s3 struct { - client *awss3.Client + client s3Client bucket, prefix string } var _ directory.PathReader = s3{} func (s s3) ReadDir() (it iter.Seq[string], err error) { - entries, err := s.client.ListObjectsV2(context.Background(), &awss3.ListObjectsV2Input{ - Bucket: &s.bucket, - Prefix: &s.prefix, - }) + search := &awss3.ListObjectsV2Input{Bucket: &s.bucket} + if len(s.prefix) > 0 { + search.Prefix = &s.prefix + } + entries, err := s.client.ListObjectsV2(context.Background(), search) if err == nil { it = func(yield func(string) bool) { for _, e := range entries.Contents { + logger.Trace().Any("content", e).Msg("read dir") if e.Key != nil && strings.ToLower(filepath.Ext(*e.Key)) == ".torrent" { - if !yield(filepath.Join(s.prefix, *e.Key)) { + var name string + if len(s.prefix) == 0 { + name = *e.Key + } else { + name = filepath.Join(s.prefix, *e.Key) + } + if !yield(name) { return } } @@ -99,8 +146,8 @@ func (s s3) ReadDir() (it iter.Seq[string], err error) { func (s s3) ReadData(entry string) (data io.ReadCloser, err error) { var result *awss3.GetObjectOutput result, err = s.client.GetObject(context.Background(), &awss3.GetObjectInput{ - Bucket: aws.String(s.bucket), - Key: aws.String(entry), + Bucket: &s.bucket, + Key: &entry, }) if err == nil { data = result.Body diff --git a/middleware/torrentapproval/container/s3/s3_test.go b/middleware/torrentapproval/container/s3/s3_test.go new file mode 100644 index 0000000..a5012cb --- /dev/null +++ b/middleware/torrentapproval/container/s3/s3_test.go @@ -0,0 +1,195 @@ +package s3 + +import ( + "bytes" + "context" + "encoding/base64" + "encoding/hex" + "io" + "testing" + "time" + + "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/credentials" + awss3 "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/aws/aws-sdk-go-v2/service/s3/types" + "github.com/stretchr/testify/require" + + "github.com/sot-tech/mochi/middleware/torrentapproval/container/directory" + "github.com/sot-tech/mochi/middleware/torrentapproval/container/list" + "github.com/sot-tech/mochi/pkg/conf" + "github.com/sot-tech/mochi/pkg/log" + "github.com/sot-tech/mochi/storage/memory" +) + +type testData struct { + data []byte + hash string +} + +func unHEX(s string) string { + b, _ := hex.DecodeString(s) + return string(b) +} + +func unBase64(s string) []byte { + b, _ := base64.StdEncoding.DecodeString(s) + return b +} + +// contains created torrent file data from simple txt documents. +// data base64 encoded because `pieces` section in torrent file is raw bytes +var files = map[string]testData{ + "test0.torrent": { + data: unBase64(`ZDEwOmNyZWF0ZWQgYnkzMTpUcmFuc21pc3Npb24vNC4wLjYgKDM4YzE2NDkzM2UpMTM6Y3JlYXRp +b24gZGF0ZWkxNzU1ODYxOTI3ZTg6ZW5jb2Rpbmc1OlVURi04NDppbmZvZDY6bGVuZ3RoaTVlNDpu +YW1lODp0ZXN0LnR4dDEyOnBpZWNlIGxlbmd0aGkzMjc2OGU2OnBpZWNlczIwOk4SQ70ixm52wrqe +3cH5E5Tlf5+DZWU=`), + hash: unHEX("a10e8e9e81702bf8482f251551ff4fe011cba6a7"), + }, + "test1.torrent": { + data: unBase64(`ZDEwOmNyZWF0ZWQgYnkzMTpUcmFuc21pc3Npb24vNC4wLjYgKDM4YzE2NDkzM2UpMTM6Y3JlYXRp +b24gZGF0ZWkxNzU2MTIzNzEwZTg6ZW5jb2Rpbmc1OlVURi04NDppbmZvZDY6bGVuZ3RoaTRlNDpu +YW1lOTp0ZXN0MC50eHQxMjpwaWVjZSBsZW5ndGhpMzI3NjhlNjpwaWVjZXMyMDqo/cIFqfGcwcdQ +emDE8BsT0R1/0GVl`), + hash: unHEX("e86d393bd458d2acc46d5467bc8cb8b30b1bfa77"), + }, +} + +func init() { + _ = log.ConfigureLogger("", "debug", false, false) +} + +type mockS3 struct { + objs []types.Object +} + +func (m *mockS3) ListObjectsV2( + context.Context, *awss3.ListObjectsV2Input, ...func(*awss3.Options), +) (*awss3.ListObjectsV2Output, error) { + return &awss3.ListObjectsV2Output{ + Contents: m.objs, + }, nil +} + +var _ s3Client = &mockS3{} + +func (m *mockS3) GetObject( + _ context.Context, params *awss3.GetObjectInput, _ ...func(*awss3.Options), +) (*awss3.GetObjectOutput, error) { + if params == nil || params.Key == nil { + return nil, nil + } + v := files[*params.Key] + return &awss3.GetObjectOutput{ + Body: io.NopCloser(bytes.NewReader(v.data)), + }, nil +} + +func TestScanMock(t *testing.T) { + cl := &mockS3{make([]types.Object, 0, len(files))} + for k := range files { + cl.objs = append(cl.objs, types.Object{Key: &k}) + } + st, _ := memory.Builder{}.NewDataStorage(make(conf.MapConfig)) + d := directory.NewScanner(list.List{ + Invert: false, + Storage: st, + StorageCtx: "TEST", + }, s3{ + client: cl, + }, time.Millisecond*10) + go d.Run() + t.Cleanup(func() { + _ = d.Close() + }) + + time.Sleep(time.Millisecond * 100) + for name, f := range files { + contains, _ := d.List.Storage.Contains(context.Background(), "TEST", f.hash) + require.True(t, contains, "%s must present", name) + for i := 0; i < len(cl.objs); i++ { + if *cl.objs[i].Key == name { + cl.objs = append(cl.objs[:i], cl.objs[i+1:]...) + } + } + } + + time.Sleep(time.Millisecond * 100) + for name, f := range files { + contains, _ := d.List.Storage.Contains(context.Background(), "TEST", f.hash) + require.False(t, contains, "%s must absent", name) + } +} + +var ( + minioEndpoint = "http://127.0.0.1:9000" + minioBucket = "test" +) + +const ( + minioKeyID = "minioadmin" + minioSecret = "minioadmin" + minioRegion = "us-east-1" +) + +// TestScanMinio requires real minio instance listening 127.0.0.1:9000 +// with default login/password (minioadmin/minioadmin) +func TestScanMinio(t *testing.T) { + st, _ := memory.Builder{}.NewDataStorage(make(conf.MapConfig)) + awsCfg, err := config.LoadDefaultConfig(context.Background()) + if err != nil { + t.Fatal(err) + } + awsCfg.BaseEndpoint = &minioEndpoint + awsCfg.Region = minioRegion + awsCfg.Credentials = credentials.NewStaticCredentialsProvider(minioKeyID, minioSecret, "") + cl := awss3.NewFromConfig(awsCfg) + + _, _ = cl.CreateBucket(context.Background(), &awss3.CreateBucketInput{Bucket: &minioBucket}) + + for name, data := range files { + _, err = cl.PutObject(context.Background(), &awss3.PutObjectInput{ + Bucket: &minioBucket, + Key: &name, + Body: bytes.NewReader(data.data), + }) + if err != nil { + t.Fatal(err) + } + } + + d := directory.NewScanner(list.List{ + Invert: false, + Storage: st, + StorageCtx: "TEST", + }, s3{ + client: cl, + bucket: minioBucket, + }, time.Millisecond*100) + go d.Run() + t.Cleanup(func() { + _ = d.Close() + }) + + time.Sleep(time.Millisecond * 200) + + for name, f := range files { + contains, _ := d.List.Storage.Contains(context.Background(), "TEST", f.hash) + require.True(t, contains, "%s must present", name) + _, err = cl.DeleteObject(context.Background(), &awss3.DeleteObjectInput{ + Bucket: &minioBucket, + Key: &name, + }) + if err != nil { + t.Fatal(err) + } + } + + time.Sleep(time.Millisecond * 200) + + for name, f := range files { + contains, _ := d.List.Storage.Contains(context.Background(), "TEST", f.hash) + require.False(t, contains, "%s must absent", name) + } +}