mirror of
https://github.com/sot-tech/mochi.git
synced 2026-06-08 14:11:54 -07:00
S3 container for torrentapproval
This commit is contained in:
@@ -9,6 +9,8 @@ require (
|
||||
github.com/MicahParks/jwkset v0.8.0
|
||||
github.com/MicahParks/keyfunc/v3 v3.3.10
|
||||
github.com/PowerDNS/lmdb-go v1.9.3
|
||||
github.com/aws/aws-sdk-go-v2/config v1.29.12
|
||||
github.com/aws/aws-sdk-go-v2/service/s3 v1.79.0
|
||||
github.com/cespare/xxhash/v2 v2.3.0
|
||||
github.com/fasthttp/router v1.5.4
|
||||
github.com/golang-jwt/jwt/v5 v5.2.2
|
||||
@@ -26,6 +28,22 @@ require (
|
||||
|
||||
require (
|
||||
github.com/andybalholm/brotli v1.1.1 // indirect
|
||||
github.com/aws/aws-sdk-go-v2 v1.36.3 // indirect
|
||||
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.10 // indirect
|
||||
github.com/aws/aws-sdk-go-v2/credentials v1.17.65 // indirect
|
||||
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.30 // indirect
|
||||
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.34 // indirect
|
||||
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.34 // indirect
|
||||
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.3 // indirect
|
||||
github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.34 // indirect
|
||||
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.3 // indirect
|
||||
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.7.0 // indirect
|
||||
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.15 // indirect
|
||||
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.18.15 // indirect
|
||||
github.com/aws/aws-sdk-go-v2/service/sso v1.25.2 // indirect
|
||||
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.30.0 // indirect
|
||||
github.com/aws/aws-sdk-go-v2/service/sts v1.33.17 // indirect
|
||||
github.com/aws/smithy-go v1.22.2 // indirect
|
||||
github.com/beorn7/perks v1.0.1 // indirect
|
||||
github.com/davecgh/go-spew v1.1.1 // indirect
|
||||
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
|
||||
|
||||
@@ -8,6 +8,42 @@ github.com/PowerDNS/lmdb-go v1.9.3 h1:AUMY2pZT8WRpkEv39I9Id3MuoHd+NZbTVpNhruVkPT
|
||||
github.com/PowerDNS/lmdb-go v1.9.3/go.mod h1:TE0l+EZK8Z1B4dx070ZxkWTlp8RG1mjN0/+FkFRQMtU=
|
||||
github.com/andybalholm/brotli v1.1.1 h1:PR2pgnyFznKEugtsUo0xLdDop5SKXd5Qf5ysW+7XdTA=
|
||||
github.com/andybalholm/brotli v1.1.1/go.mod h1:05ib4cKhjx3OQYUY22hTVd34Bc8upXjOLL2rKwwZBoA=
|
||||
github.com/aws/aws-sdk-go-v2 v1.36.3 h1:mJoei2CxPutQVxaATCzDUjcZEjVRdpsiiXi2o38yqWM=
|
||||
github.com/aws/aws-sdk-go-v2 v1.36.3/go.mod h1:LLXuLpgzEbD766Z5ECcRmi8AzSwfZItDtmABVkRLGzg=
|
||||
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.10 h1:zAybnyUQXIZ5mok5Jqwlf58/TFE7uvd3IAsa1aF9cXs=
|
||||
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.10/go.mod h1:qqvMj6gHLR/EXWZw4ZbqlPbQUyenf4h82UQUlKc+l14=
|
||||
github.com/aws/aws-sdk-go-v2/config v1.29.12 h1:Y/2a+jLPrPbHpFkpAAYkVEtJmxORlXoo5k2g1fa2sUo=
|
||||
github.com/aws/aws-sdk-go-v2/config v1.29.12/go.mod h1:xse1YTjmORlb/6fhkWi8qJh3cvZi4JoVNhc+NbJt4kI=
|
||||
github.com/aws/aws-sdk-go-v2/credentials v1.17.65 h1:q+nV2yYegofO/SUXruT+pn4KxkxmaQ++1B/QedcKBFM=
|
||||
github.com/aws/aws-sdk-go-v2/credentials v1.17.65/go.mod h1:4zyjAuGOdikpNYiSGpsGz8hLGmUzlY8pc8r9QQ/RXYQ=
|
||||
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.30 h1:x793wxmUWVDhshP8WW2mlnXuFrO4cOd3HLBroh1paFw=
|
||||
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.30/go.mod h1:Jpne2tDnYiFascUEs2AWHJL9Yp7A5ZVy3TNyxaAjD6M=
|
||||
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.34 h1:ZK5jHhnrioRkUNOc+hOgQKlUL5JeC3S6JgLxtQ+Rm0Q=
|
||||
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.34/go.mod h1:p4VfIceZokChbA9FzMbRGz5OV+lekcVtHlPKEO0gSZY=
|
||||
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.34 h1:SZwFm17ZUNNg5Np0ioo/gq8Mn6u9w19Mri8DnJ15Jf0=
|
||||
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.34/go.mod h1:dFZsC0BLo346mvKQLWmoJxT+Sjp+qcVR1tRVHQGOH9Q=
|
||||
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.3 h1:bIqFDwgGXXN1Kpp99pDOdKMTTb5d2KyU5X/BZxjOkRo=
|
||||
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.3/go.mod h1:H5O/EsxDWyU+LP/V8i5sm8cxoZgc2fdNR9bxlOFrQTo=
|
||||
github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.34 h1:ZNTqv4nIdE/DiBfUUfXcLZ/Spcuz+RjeziUtNJackkM=
|
||||
github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.34/go.mod h1:zf7Vcd1ViW7cPqYWEHLHJkS50X0JS2IKz9Cgaj6ugrs=
|
||||
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.3 h1:eAh2A4b5IzM/lum78bZ590jy36+d/aFLgKF/4Vd1xPE=
|
||||
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.3/go.mod h1:0yKJC/kb8sAnmlYa6Zs3QVYqaC8ug2AbnNChv5Ox3uA=
|
||||
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.7.0 h1:lguz0bmOoGzozP9XfRJR1QIayEYo+2vP/No3OfLF0pU=
|
||||
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.7.0/go.mod h1:iu6FSzgt+M2/x3Dk8zhycdIcHjEFb36IS8HVUVFoMg0=
|
||||
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.15 h1:dM9/92u2F1JbDaGooxTq18wmmFzbJRfXfVfy96/1CXM=
|
||||
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.15/go.mod h1:SwFBy2vjtA0vZbjjaFtfN045boopadnoVPhu4Fv66vY=
|
||||
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.18.15 h1:moLQUoVq91LiqT1nbvzDukyqAlCv89ZmwaHw/ZFlFZg=
|
||||
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.18.15/go.mod h1:ZH34PJUc8ApjBIfgQCFvkWcUDBtl/WTD+uiYHjd8igA=
|
||||
github.com/aws/aws-sdk-go-v2/service/s3 v1.79.0 h1:OIw2nryEApESTYI5deCZGcq4Gvz8DBAt4tJlNyg3v5o=
|
||||
github.com/aws/aws-sdk-go-v2/service/s3 v1.79.0/go.mod h1:U5SNqwhXB3Xe6F47kXvWihPl/ilGaEDe8HD/50Z9wxc=
|
||||
github.com/aws/aws-sdk-go-v2/service/sso v1.25.2 h1:pdgODsAhGo4dvzC3JAG5Ce0PX8kWXrTZGx+jxADD+5E=
|
||||
github.com/aws/aws-sdk-go-v2/service/sso v1.25.2/go.mod h1:qs4a9T5EMLl/Cajiw2TcbNt2UNo/Hqlyp+GiuG4CFDI=
|
||||
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.30.0 h1:90uX0veLKcdHVfvxhkWUQSCi5VabtwMLFutYiRke4oo=
|
||||
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.30.0/go.mod h1:MlYRNmYu/fGPoxBQVvBYr9nyr948aY/WLUvwBMBJubs=
|
||||
github.com/aws/aws-sdk-go-v2/service/sts v1.33.17 h1:PZV5W8yk4OtH1JAuhV2PXwwO9v5G5Aoj+eMCn4T+1Kc=
|
||||
github.com/aws/aws-sdk-go-v2/service/sts v1.33.17/go.mod h1:cQnB8CUnxbMU82JvlqjKR2HBOm3fe9pWorWBza6MBJ4=
|
||||
github.com/aws/smithy-go v1.22.2 h1:6D9hW43xKFrRx/tXXfAlIZc4JI+yQe6snnWcQyxSyLQ=
|
||||
github.com/aws/smithy-go v1.22.2/go.mod h1:irrKGvNn1InZwb2d7fkIRNucdfwR8R+Ts3wxYa/cJHg=
|
||||
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
|
||||
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
|
||||
github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
|
||||
|
||||
@@ -0,0 +1,202 @@
|
||||
package s3
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/sha1"
|
||||
"crypto/sha256"
|
||||
"fmt"
|
||||
"io"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/aws/aws-sdk-go-v2/aws"
|
||||
"github.com/aws/aws-sdk-go-v2/config"
|
||||
awss3 "github.com/aws/aws-sdk-go-v2/service/s3"
|
||||
"github.com/sot-tech/mochi/bittorrent"
|
||||
"github.com/sot-tech/mochi/middleware/torrentapproval/container"
|
||||
"github.com/sot-tech/mochi/middleware/torrentapproval/container/list"
|
||||
"github.com/sot-tech/mochi/pkg/conf"
|
||||
"github.com/sot-tech/mochi/pkg/log"
|
||||
"github.com/sot-tech/mochi/pkg/str2bytes"
|
||||
"github.com/sot-tech/mochi/storage"
|
||||
"github.com/zeebo/bencode"
|
||||
)
|
||||
|
||||
var logger = log.NewLogger("middleware/torrent approval/s3")
|
||||
|
||||
const (
|
||||
defaultPeriod = time.Minute
|
||||
maxTorrentSize = 10 * 1024 * 1024
|
||||
)
|
||||
|
||||
// Config - implementation of directory container configuration.
|
||||
// Extends list.Config because uses the same storage and Approved function.
|
||||
type Config struct {
|
||||
list.Config
|
||||
Bucket string
|
||||
Path string
|
||||
Period time.Duration
|
||||
}
|
||||
|
||||
type s3 struct {
|
||||
list.List
|
||||
closed chan bool
|
||||
}
|
||||
|
||||
func init() {
|
||||
container.Register("s3", build)
|
||||
}
|
||||
|
||||
func build(conf conf.MapConfig, st storage.DataStorage) (container.Container, error) {
|
||||
c := new(Config)
|
||||
if err := conf.Unmarshal(c); err != nil {
|
||||
return nil, fmt.Errorf("unable to deserialise configuration: %w", err)
|
||||
}
|
||||
var err error
|
||||
s := &s3{
|
||||
List: list.List{
|
||||
Invert: c.Invert,
|
||||
Storage: st,
|
||||
StorageCtx: c.StorageCtx,
|
||||
},
|
||||
closed: make(chan bool),
|
||||
}
|
||||
if len(s.StorageCtx) == 0 {
|
||||
logger.Warn().
|
||||
Str("name", "StorageCtx").
|
||||
Str("provided", s.StorageCtx).
|
||||
Str("default", container.DefaultStorageCtxName).
|
||||
Msg("falling back to default configuration")
|
||||
s.StorageCtx = container.DefaultStorageCtxName
|
||||
}
|
||||
if c.Period == 0 {
|
||||
logger.Warn().
|
||||
Str("name", "Period").
|
||||
Dur("provided", 0).
|
||||
Dur("default", defaultPeriod).
|
||||
Msg("falling back to default configuration")
|
||||
c.Period = defaultPeriod
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
sdkConfig, err := config.LoadDefaultConfig(ctx)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unable load aws sdk configuration: %w", err)
|
||||
}
|
||||
s3Client := awss3.NewFromConfig(sdkConfig)
|
||||
|
||||
go s.runScan(ctx, c.Bucket, c.Path, s3Client, c.Period)
|
||||
return s, err
|
||||
}
|
||||
|
||||
// BencodeRawBytes wrapper for byte slice to get raw 'info' section from
|
||||
// torrent file
|
||||
type BencodeRawBytes []byte
|
||||
|
||||
// UnmarshalBencode just appends raw byte slice to result
|
||||
func (ba *BencodeRawBytes) UnmarshalBencode(in []byte) error {
|
||||
*ba = append([]byte(nil), in...)
|
||||
return nil
|
||||
}
|
||||
|
||||
type torrentRawInfoStruct struct {
|
||||
Info BencodeRawBytes `bencode:"info"`
|
||||
}
|
||||
|
||||
type torrentNameInfoStruct struct {
|
||||
Name string `bencode:"name"`
|
||||
}
|
||||
|
||||
func (s *s3) runScan(ctx context.Context, bucket, prefix string, s3Client *awss3.Client, period time.Duration) {
|
||||
t := time.NewTicker(period)
|
||||
defer t.Stop()
|
||||
files := make(map[string][2]bittorrent.InfoHash)
|
||||
tmpFiles := make(map[string]bool)
|
||||
// nolint:gosec
|
||||
s1, s2 := sha1.New(), sha256.New()
|
||||
for {
|
||||
select {
|
||||
case <-s.closed:
|
||||
return
|
||||
case <-t.C:
|
||||
logger.Debug().Msg("starting directory scan")
|
||||
listObj := &awss3.ListObjectsV2Input{Bucket: &bucket, Prefix: &prefix}
|
||||
if entries, err := s3Client.ListObjectsV2(ctx, listObj); err == nil {
|
||||
for _, e := range entries.Contents {
|
||||
if strings.ToLower(filepath.Ext(*e.Key)) == ".torrent" {
|
||||
tmpFiles[filepath.Join(prefix, *e.Key)] = true
|
||||
}
|
||||
}
|
||||
for p := range tmpFiles {
|
||||
if _, exists := files[p]; !exists {
|
||||
requestInput := &awss3.GetObjectInput{
|
||||
Bucket: aws.String(bucket),
|
||||
Key: aws.String(p),
|
||||
}
|
||||
|
||||
result, err := s3Client.GetObject(ctx, requestInput)
|
||||
if err != nil {
|
||||
log.Print(err)
|
||||
}
|
||||
var info torrentRawInfoStruct
|
||||
err = bencode.NewDecoder(io.LimitReader(result.Body, maxTorrentSize)).Decode(&info)
|
||||
if err == nil {
|
||||
s1.Write(info.Info)
|
||||
h1, _ := bittorrent.NewInfoHash(s1.Sum(nil))
|
||||
s1.Reset()
|
||||
|
||||
s2.Write(info.Info)
|
||||
h2, _ := bittorrent.NewInfoHash(s2.Sum(nil))
|
||||
s2.Reset()
|
||||
|
||||
files[p] = [2]bittorrent.InfoHash{h1, h2}
|
||||
var name torrentNameInfoStruct
|
||||
if err := bencode.DecodeBytes(info.Info, &name); err != nil {
|
||||
logger.Warn().
|
||||
Err(err).
|
||||
Str("file", p).
|
||||
Msg("unable to unmarshal torrent info")
|
||||
}
|
||||
if len(name.Name) == 0 {
|
||||
name.Name = list.DUMMY
|
||||
}
|
||||
bName := str2bytes.StringToBytes(name.Name)
|
||||
logger.Err(s.Storage.Put(ctx, s.StorageCtx, storage.Entry{
|
||||
Key: h1.RawString(),
|
||||
Value: bName,
|
||||
}, storage.Entry{
|
||||
Key: h2.RawString(),
|
||||
Value: bName,
|
||||
}, storage.Entry{
|
||||
Key: h2.TruncateV1().RawString(),
|
||||
Value: bName,
|
||||
})).
|
||||
Str("file", p).
|
||||
Stringer("infoHash", h1).
|
||||
Stringer("infoHashV2", h2).
|
||||
Msg("added torrent to approval list")
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
logger.Warn().Err(err).Str("file", p).Msg("unable to read file")
|
||||
}
|
||||
}
|
||||
for p, ih := range files {
|
||||
if _, isOk := tmpFiles[p]; !isOk {
|
||||
delete(files, p)
|
||||
logger.Err(s.Storage.Delete(ctx, s.StorageCtx, ih[0].RawString(),
|
||||
ih[1].RawString(), ih[1].TruncateV1().RawString())).
|
||||
Str("file", p).
|
||||
Stringer("infoHash", ih[1]).
|
||||
Stringer("infoHashV2", ih[1]).
|
||||
Msg("deleted torrent from approval list")
|
||||
}
|
||||
}
|
||||
clear(tmpFiles)
|
||||
} else {
|
||||
logger.Warn().Err(err).Msg("unable to get directory content")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -16,6 +16,7 @@ import (
|
||||
|
||||
// import directory watcher to enable appropriate support
|
||||
_ "github.com/sot-tech/mochi/middleware/torrentapproval/container/directory"
|
||||
_ "github.com/sot-tech/mochi/middleware/torrentapproval/container/s3"
|
||||
|
||||
// import static list to enable appropriate support
|
||||
_ "github.com/sot-tech/mochi/middleware/torrentapproval/container/list"
|
||||
|
||||
Reference in New Issue
Block a user