WIP Add support for custom torrents' approval storages

* migrate torrentapproval to list storage
* add initial support for torrent file storage (watch directory with fsnotify)
* replace frontend/http/bencode package with github.com/zeebo/bencode module
* sanitize code (fix warnings)

TODO:
* parse torrent files to get hashes,
* watch directory event types

DON'T use for now
This commit is contained in:
Širhoe Biazhkovič
2021-09-04 01:45:34 +03:00
parent d57c348b6c
commit 8580bb37e0
22 changed files with 309 additions and 644 deletions

View File

@@ -24,7 +24,7 @@ func PeerIDFromBytes(b []byte) PeerID {
var buf [20]byte
copy(buf[:], b)
return PeerID(buf)
return buf
}
// String implements fmt.Stringer, returning the base16 encoded PeerID.
@@ -47,7 +47,7 @@ func PeerIDFromString(s string) PeerID {
var buf [20]byte
copy(buf[:], s)
return PeerID(buf)
return buf
}
// InfoHash represents an infohash.
@@ -63,7 +63,7 @@ func InfoHashFromBytes(b []byte) InfoHash {
var buf [20]byte
copy(buf[:], b)
return InfoHash(buf)
return buf
}
// InfoHashFromString creates an InfoHash from a string.
@@ -76,7 +76,7 @@ func InfoHashFromString(s string) InfoHash {
var buf [20]byte
copy(buf[:], s)
return InfoHash(buf)
return buf
}
// String implements fmt.Stringer, returning the base16 encoded InfoHash.

View File

@@ -33,9 +33,9 @@ type Config struct {
MetricsAddr string `yaml:"metrics_addr"`
HTTPConfig http.Config `yaml:"http"`
UDPConfig udp.Config `yaml:"udp"`
Storage storageConfig `yaml:"storage"`
PreHooks []middleware.HookConfig `yaml:"prehooks"`
PostHooks []middleware.HookConfig `yaml:"posthooks"`
Storage storageConfig `yaml:"storage"`
PreHooks []middleware.Config `yaml:"prehooks"`
PostHooks []middleware.Config `yaml:"posthooks"`
}
// PreHookNames returns only the names of the configured middleware.

View File

@@ -50,8 +50,12 @@ func (r *Run) Start(ps storage.PeerStore) error {
r.sg = stop.NewGroup()
log.Info("starting metrics server", log.Fields{"addr": cfg.MetricsAddr})
r.sg.Add(metrics.NewServer(cfg.MetricsAddr))
if len(cfg.MetricsAddr) > 0 {
log.Info("starting metrics server", log.Fields{"addr": cfg.MetricsAddr})
r.sg.Add(metrics.NewServer(cfg.MetricsAddr))
} else {
log.Info("metrics disabled because of empty address")
}
if ps == nil {
log.Info("starting storage", log.Fields{"name": cfg.Storage.Name})

View File

@@ -1,41 +0,0 @@
// Package bencode implements bencoding of data as defined in BEP 3 using
// type assertion over reflection for performance.
package bencode
import "bytes"
// Enforce that Dict implements the Marshaler interface.
var _ Marshaler = Dict{}
// Dict represents a bencode dictionary.
type Dict map[string]interface{}
// NewDict allocates the memory for a Dict.
func NewDict() Dict {
return make(Dict)
}
// MarshalBencode implements the Marshaler interface for Dict.
func (d Dict) MarshalBencode() ([]byte, error) {
var buf bytes.Buffer
err := marshalMap(&buf, map[string]interface{}(d))
return buf.Bytes(), err
}
// Enforce that List implements the Marshaler interface.
var _ Marshaler = List{}
// List represents a bencode list.
type List []interface{}
// MarshalBencode implements the Marshaler interface for List.
func (l List) MarshalBencode() ([]byte, error) {
var buf bytes.Buffer
err := marshalList(&buf, []interface{}(l))
return buf.Bytes(), err
}
// NewList allocates the memory for a List.
func NewList() List {
return make(List, 0)
}

View File

@@ -1,141 +0,0 @@
package bencode
import (
"bufio"
"bytes"
"errors"
"io"
"strconv"
)
// A Decoder reads bencoded objects from an input stream.
type Decoder struct {
r *bufio.Reader
}
// NewDecoder returns a new decoder that reads from r.
func NewDecoder(r io.Reader) *Decoder {
return &Decoder{r: bufio.NewReader(r)}
}
// Decode unmarshals the next bencoded value in the stream.
func (dec *Decoder) Decode() (interface{}, error) {
return unmarshal(dec.r)
}
// Unmarshal deserializes and returns the bencoded value in buf.
func Unmarshal(buf []byte) (interface{}, error) {
r := bufio.NewReader(bytes.NewBuffer(buf))
return unmarshal(r)
}
// unmarshal reads bencoded values from a bufio.Reader
func unmarshal(r *bufio.Reader) (interface{}, error) {
tok, err := r.ReadByte()
if err != nil {
return nil, err
}
switch tok {
case 'i':
return readTerminatedInt(r, 'e')
case 'l':
return readList(r)
case 'd':
return readDict(r)
default:
err = r.UnreadByte()
if err != nil {
return nil, err
}
length, err := readTerminatedInt(r, ':')
if err != nil {
return nil, errors.New("bencode: unknown input sequence")
}
buf := make([]byte, length)
n, err := r.Read(buf)
if err != nil {
return nil, err
} else if int64(n) != length {
return nil, errors.New("bencode: short read")
}
return string(buf), nil
}
}
func readTerminator(r io.ByteScanner, term byte) (bool, error) {
tok, err := r.ReadByte()
if err != nil {
return false, err
} else if tok == term {
return true, nil
}
return false, r.UnreadByte()
}
func readTerminatedInt(r *bufio.Reader, term byte) (int64, error) {
buf, err := r.ReadSlice(term)
if err != nil {
return 0, err
} else if len(buf) <= 1 {
return 0, errors.New("bencode: empty integer field")
}
return strconv.ParseInt(string(buf[:len(buf)-1]), 10, 64)
}
func readList(r *bufio.Reader) (List, error) {
list := NewList()
for {
ok, err := readTerminator(r, 'e')
if err != nil {
return nil, err
} else if ok {
break
}
v, err := unmarshal(r)
if err != nil {
return nil, err
}
list = append(list, v)
}
return list, nil
}
func readDict(r *bufio.Reader) (Dict, error) {
dict := NewDict()
for {
ok, err := readTerminator(r, 'e')
if err != nil {
return nil, err
} else if ok {
break
}
v, err := unmarshal(r)
if err != nil {
return nil, err
}
key, ok := v.(string)
if !ok {
return nil, errors.New("bencode: non-string map key")
}
dict[key], err = unmarshal(r)
if err != nil {
return nil, err
}
}
return dict, nil
}

View File

@@ -1,84 +0,0 @@
package bencode
import (
"testing"
"github.com/stretchr/testify/require"
)
var unmarshalTests = []struct {
input string
expected interface{}
}{
{"i42e", int64(42)},
{"i-42e", int64(-42)},
{"7:example", "example"},
{"l3:one3:twoe", List{"one", "two"}},
{"le", List{}},
{"d3:one2:aa3:two2:bbe", Dict{"one": "aa", "two": "bb"}},
{"de", Dict{}},
}
func TestUnmarshal(t *testing.T) {
for _, tt := range unmarshalTests {
t.Run(tt.input, func(t *testing.T) {
got, err := Unmarshal([]byte(tt.input))
require.Nil(t, err, "unmarshal should not fail")
require.Equal(t, got, tt.expected, "unmarshalled values should match the expected results")
})
}
}
type bufferLoop struct {
val string
}
func (r *bufferLoop) Read(b []byte) (int, error) {
n := copy(b, r.val)
return n, nil
}
func BenchmarkUnmarshalScalar(b *testing.B) {
d1 := NewDecoder(&bufferLoop{"7:example"})
d2 := NewDecoder(&bufferLoop{"i42e"})
for i := 0; i < b.N; i++ {
d1.Decode()
d2.Decode()
}
}
func TestUnmarshalLarge(t *testing.T) {
data := Dict{
"k1": List{"a", "b", "c"},
"k2": int64(42),
"k3": "val",
"k4": int64(-42),
}
buf, _ := Marshal(data)
dec := NewDecoder(&bufferLoop{string(buf)})
got, err := dec.Decode()
require.Nil(t, err, "decode should not fail")
require.Equal(t, got, data, "encoding and decoding should equal the original value")
}
func BenchmarkUnmarshalLarge(b *testing.B) {
data := map[string]interface{}{
"k1": []string{"a", "b", "c"},
"k2": 42,
"k3": "val",
"k4": uint(42),
}
buf, _ := Marshal(data)
dec := NewDecoder(&bufferLoop{string(buf)})
for i := 0; i < b.N; i++ {
dec.Decode()
}
}

View File

@@ -1,196 +0,0 @@
package bencode
import (
"bytes"
"fmt"
"io"
"strconv"
"time"
)
// An Encoder writes bencoded objects to an output stream.
type Encoder struct {
w io.Writer
}
// NewEncoder returns a new encoder that writes to w.
func NewEncoder(w io.Writer) *Encoder {
return &Encoder{w: w}
}
// Encode writes the bencoding of v to the stream.
func (enc *Encoder) Encode(v interface{}) error {
return marshal(enc.w, v)
}
// Marshal returns the bencoding of v.
func Marshal(v interface{}) ([]byte, error) {
var buf bytes.Buffer
err := marshal(&buf, v)
return buf.Bytes(), err
}
// Marshaler is the interface implemented by objects that can marshal
// themselves.
type Marshaler interface {
MarshalBencode() ([]byte, error)
}
// marshal writes types bencoded to an io.Writer.
func marshal(w io.Writer, data interface{}) (err error) {
switch v := data.(type) {
case Marshaler:
var bencoded []byte
bencoded, err = v.MarshalBencode()
if err != nil {
return err
}
_, err = w.Write(bencoded)
case []byte:
err = marshalBytes(w, v)
case string:
err = marshalString(w, v)
case []string:
err = marshalStringSlice(w, v)
case int:
err = marshalInt(w, int64(v))
case int16:
err = marshalInt(w, int64(v))
case int32:
err = marshalInt(w, int64(v))
case int64:
err = marshalInt(w, int64(v))
case uint:
err = marshalUint(w, uint64(v))
case uint16:
err = marshalUint(w, uint64(v))
case uint32:
err = marshalUint(w, uint64(v))
case uint64:
err = marshalUint(w, uint64(v))
case time.Duration: // Assume seconds
err = marshalInt(w, int64(v/time.Second))
case map[string]interface{}:
err = marshalMap(w, v)
case []interface{}:
err = marshalList(w, v)
case []Dict:
var interfaceSlice = make([]interface{}, len(v))
for i, d := range v {
interfaceSlice[i] = d
}
err = marshalList(w, interfaceSlice)
default:
return fmt.Errorf("attempted to marshal unsupported type:\n%T", v)
}
return err
}
func marshalInt(w io.Writer, v int64) error {
if _, err := w.Write([]byte{'i'}); err != nil {
return err
}
if _, err := w.Write([]byte(strconv.FormatInt(v, 10))); err != nil {
return err
}
_, err := w.Write([]byte{'e'})
return err
}
func marshalUint(w io.Writer, v uint64) error {
if _, err := w.Write([]byte{'i'}); err != nil {
return err
}
if _, err := w.Write([]byte(strconv.FormatUint(v, 10))); err != nil {
return err
}
_, err := w.Write([]byte{'e'})
return err
}
func marshalBytes(w io.Writer, v []byte) error {
if _, err := w.Write([]byte(strconv.Itoa(len(v)))); err != nil {
return err
}
if _, err := w.Write([]byte{':'}); err != nil {
return err
}
_, err := w.Write(v)
return err
}
func marshalString(w io.Writer, v string) error {
return marshalBytes(w, []byte(v))
}
func marshalStringSlice(w io.Writer, v []string) error {
if _, err := w.Write([]byte{'l'}); err != nil {
return err
}
for _, val := range v {
if err := marshal(w, val); err != nil {
return err
}
}
_, err := w.Write([]byte{'e'})
return err
}
func marshalList(w io.Writer, v []interface{}) error {
if _, err := w.Write([]byte{'l'}); err != nil {
return err
}
for _, val := range v {
if err := marshal(w, val); err != nil {
return err
}
}
_, err := w.Write([]byte{'e'})
return err
}
func marshalMap(w io.Writer, v map[string]interface{}) error {
if _, err := w.Write([]byte{'d'}); err != nil {
return err
}
for key, val := range v {
if err := marshalString(w, key); err != nil {
return err
}
if err := marshal(w, val); err != nil {
return err
}
}
_, err := w.Write([]byte{'e'})
return err
}

View File

@@ -1,72 +0,0 @@
package bencode
import (
"bytes"
"fmt"
"testing"
"time"
"github.com/stretchr/testify/require"
)
var marshalTests = []struct {
input interface{}
expected []string
}{
{int(42), []string{"i42e"}},
{int(-42), []string{"i-42e"}},
{uint(43), []string{"i43e"}},
{int64(44), []string{"i44e"}},
{uint64(45), []string{"i45e"}},
{int16(44), []string{"i44e"}},
{uint16(45), []string{"i45e"}},
{"example", []string{"7:example"}},
{[]byte("example"), []string{"7:example"}},
{30 * time.Minute, []string{"i1800e"}},
{[]string{"one", "two"}, []string{"l3:one3:twoe", "l3:two3:onee"}},
{[]interface{}{"one", "two"}, []string{"l3:one3:twoe", "l3:two3:onee"}},
{[]string{}, []string{"le"}},
{map[string]interface{}{"one": "aa", "two": "bb"}, []string{"d3:one2:aa3:two2:bbe", "d3:two2:bb3:one2:aae"}},
{map[string]interface{}{}, []string{"de"}},
{[]Dict{{"a": "b"}, {"c": "d"}}, []string{"ld1:a1:bed1:c1:dee", "ld1:c1:ded1:a1:bee"}},
}
func TestMarshal(t *testing.T) {
for _, tt := range marshalTests {
t.Run(fmt.Sprintf("%#v", tt.input), func(t *testing.T) {
got, err := Marshal(tt.input)
require.Nil(t, err, "marshal should not fail")
require.Contains(t, tt.expected, string(got), "the marshaled result should be one of the expected permutations")
})
}
}
func BenchmarkMarshalScalar(b *testing.B) {
buf := &bytes.Buffer{}
encoder := NewEncoder(buf)
for i := 0; i < b.N; i++ {
encoder.Encode("test")
encoder.Encode(123)
}
}
func BenchmarkMarshalLarge(b *testing.B) {
data := map[string]interface{}{
"k1": []string{"a", "b", "c"},
"k2": 42,
"k3": "val",
"k4": uint(42),
}
buf := &bytes.Buffer{}
encoder := NewEncoder(buf)
for i := 0; i < b.N; i++ {
encoder.Encode(data)
}
}

View File

@@ -1,13 +1,15 @@
package http
import (
"github.com/zeebo/bencode"
"net/http"
"github.com/chihaya/chihaya/bittorrent"
"github.com/chihaya/chihaya/frontend/http/bencode"
"github.com/chihaya/chihaya/pkg/log"
)
type strMap map[string]interface{}
// WriteError communicates an error to a BitTorrent client over HTTP.
func WriteError(w http.ResponseWriter, err error) error {
message := "internal server error"
@@ -18,7 +20,7 @@ func WriteError(w http.ResponseWriter, err error) error {
}
w.WriteHeader(http.StatusOK)
return bencode.NewEncoder(w).Encode(bencode.Dict{
return bencode.NewEncoder(w).Encode(map[string]interface{}{
"failure reason": message,
})
}
@@ -26,7 +28,7 @@ func WriteError(w http.ResponseWriter, err error) error {
// WriteAnnounceResponse communicates the results of an Announce to a
// BitTorrent client over HTTP.
func WriteAnnounceResponse(w http.ResponseWriter, resp *bittorrent.AnnounceResponse) error {
bdict := bencode.Dict{
bdict := strMap{
"complete": resp.Complete,
"incomplete": resp.Incomplete,
"interval": resp.Interval,
@@ -57,7 +59,7 @@ func WriteAnnounceResponse(w http.ResponseWriter, resp *bittorrent.AnnounceRespo
}
// Add the peers to the dictionary.
var peers []bencode.Dict
var peers []strMap
for _, peer := range resp.IPv4Peers {
peers = append(peers, dict(peer))
}
@@ -72,15 +74,15 @@ func WriteAnnounceResponse(w http.ResponseWriter, resp *bittorrent.AnnounceRespo
// WriteScrapeResponse communicates the results of a Scrape to a BitTorrent
// client over HTTP.
func WriteScrapeResponse(w http.ResponseWriter, resp *bittorrent.ScrapeResponse) error {
filesDict := bencode.NewDict()
filesDict := make(strMap)
for _, scrape := range resp.Files {
filesDict[string(scrape.InfoHash[:])] = bencode.Dict{
filesDict[string(scrape.InfoHash[:])] = strMap{
"complete": scrape.Complete,
"incomplete": scrape.Incomplete,
}
}
return bencode.NewEncoder(w).Encode(bencode.Dict{
return bencode.NewEncoder(w).Encode(strMap{
"files": filesDict,
})
}
@@ -107,8 +109,8 @@ func compact6(peer bittorrent.Peer) (buf []byte) {
return
}
func dict(peer bittorrent.Peer) bencode.Dict {
return bencode.Dict{
func dict(peer bittorrent.Peer) strMap {
return strMap{
"peer id": string(peer.ID[:]),
"ip": peer.IP.String(),
"port": peer.Port,

2
go.mod
View File

@@ -7,6 +7,7 @@ require (
github.com/alicebob/gopher-json v0.0.0-20180125190556-5a6b3ba71ee6 // indirect
github.com/alicebob/miniredis v2.5.0+incompatible
github.com/anacrolix/torrent v1.28.0
github.com/fsnotify/fsnotify v1.4.9
github.com/go-redsync/redsync v1.4.2
github.com/gomodule/redigo v2.0.0+incompatible
github.com/julienschmidt/httprouter v1.3.0
@@ -18,5 +19,6 @@ require (
github.com/spf13/cobra v1.1.3
github.com/stretchr/testify v1.7.0
github.com/yuin/gopher-lua v0.0.0-20191220021717-ab39c6098bdb // indirect
github.com/zeebo/bencode v1.0.0
gopkg.in/yaml.v2 v2.4.0
)

3
go.sum
View File

@@ -223,6 +223,7 @@ github.com/franela/goreq v0.0.0-20171204163338-bcd34c9993f8/go.mod h1:ZhphrRTfi2
github.com/frankban/quicktest v1.9.0/go.mod h1:ui7WezCLWMWxVWr1GETZY3smRy0G4KWq9vcPtJmFl7Y=
github.com/frankban/quicktest v1.11.3/go.mod h1:wRf/ReqHper53s+kmmSZizM8NamnL3IM0I9ntUbOk+k=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/gliderlabs/ssh v0.1.1/go.mod h1:U7qILu1NlMHj9FlMhZLlkCdDnU1DBEAqr0aevW3Awn0=
@@ -693,6 +694,8 @@ github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/gopher-lua v0.0.0-20191220021717-ab39c6098bdb h1:ZkM6LRnq40pR1Ox0hTHlnpkcOTuFIDQpZ1IN8rKKhX0=
github.com/yuin/gopher-lua v0.0.0-20191220021717-ab39c6098bdb/go.mod h1:gqRgreBUhTSL0GeU64rtZ3Uq3wtjOa/TB2YfrtkCbVQ=
github.com/zeebo/bencode v1.0.0 h1:zgop0Wu1nu4IexAZeCZ5qbsjU4O1vMrfCrVgUjbHVuA=
github.com/zeebo/bencode v1.0.0/go.mod h1:Ct7CkrWIQuLWAy9M3atFHYq4kG9Ao/SsY5cdtCXmp9Y=
go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
go.etcd.io/bbolt v1.3.4/go.mod h1:G5EMThwa9y8QZGBClrRx5EY+Yw9kAhnjy3bSjsnlVTQ=

View File

@@ -6,8 +6,7 @@ import (
"context"
"errors"
"fmt"
yaml "gopkg.in/yaml.v2"
"gopkg.in/yaml.v2"
"github.com/chihaya/chihaya/bittorrent"
"github.com/chihaya/chihaya/middleware"

View File

@@ -2,7 +2,6 @@ package middleware
import (
"context"
"github.com/chihaya/chihaya/bittorrent"
"github.com/chihaya/chihaya/storage"
)

View File

@@ -119,15 +119,13 @@ func (h *hook) updateKeys() error {
log.Error("failed to fetch JWK Set", log.Err(err))
return err
}
defer resp.Body.Close()
var parsedJWKs gojwk.Key
err = json.NewDecoder(resp.Body).Decode(&parsedJWKs)
if err != nil {
resp.Body.Close()
log.Error("failed to decode JWK JSON", log.Err(err))
return err
}
resp.Body.Close()
keys := map[string]crypto.PublicKey{}
for _, parsedJWK := range parsedJWKs.Keys {

View File

@@ -65,14 +65,14 @@ func New(name string, optionBytes []byte) (Hook, error) {
return d.NewHook(optionBytes)
}
// HookConfig is the generic configuration format used for all registered Hooks.
type HookConfig struct {
// Config is the generic configuration format used for all registered Hooks.
type Config struct {
Name string `yaml:"name"`
Options map[string]interface{} `yaml:"options"`
}
// HooksFromHookConfigs is a utility function for initializing Hooks in bulk.
func HooksFromHookConfigs(cfgs []HookConfig) (hooks []Hook, err error) {
func HooksFromHookConfigs(cfgs []Config) (hooks []Hook, err error) {
for _, cfg := range cfgs {
// Marshal the options back into bytes.
var optionBytes []byte

View File

@@ -11,7 +11,7 @@ import (
//
// Calling DeriveEntropyFromRequest multiple times yields the same values.
func DeriveEntropyFromRequest(req *bittorrent.AnnounceRequest) (uint64, uint64) {
v0 := binary.BigEndian.Uint64([]byte(req.InfoHash[:8])) + binary.BigEndian.Uint64([]byte(req.InfoHash[8:16]))
v1 := binary.BigEndian.Uint64([]byte(req.Peer.ID[:8])) + binary.BigEndian.Uint64([]byte(req.Peer.ID[8:16]))
v0 := binary.BigEndian.Uint64(req.InfoHash[:8]) + binary.BigEndian.Uint64(req.InfoHash[8:16])
v1 := binary.BigEndian.Uint64(req.Peer.ID[:8]) + binary.BigEndian.Uint64(req.Peer.ID[8:16])
return v0, v1
}

View File

@@ -7,7 +7,7 @@ package random
func GenerateAndAdvance(s0, s1 uint64) (v, newS0, newS1 uint64) {
v = s0 + s1
newS0 = s1
s0 ^= (s0 << 23)
s0 ^= s0 << 23
newS1 = s0 ^ s1 ^ (s0 >> 18) ^ (s1 >> 5)
return
}

View File

@@ -0,0 +1,53 @@
package container
import (
"errors"
"github.com/chihaya/chihaya/bittorrent"
"github.com/chihaya/chihaya/pkg/stop"
"gopkg.in/yaml.v2"
"sync"
)
type Builder interface {
New() (Container, error)
}
var (
buildersMU sync.Mutex
builders = make(map[string]Builder)
ErrContainerDoesNotExist = errors.New("torrent hash container with that name does not exist")
)
func Register(n string, c Builder) {
if len(n) == 0 {
panic("middleware: could not register a Container with an empty name")
}
if c == nil {
panic("middleware: could not register a Container with nil builder")
}
buildersMU.Lock()
defer buildersMU.Unlock()
builders[n] = c
}
type Container interface {
stop.Stopper
Contains(bittorrent.InfoHash) bool
}
func GetContainer(name string, confBytes []byte) (Container, error) {
buildersMU.Lock()
defer buildersMU.Unlock()
var err error
var cn Container
if builder, exist := builders[name]; !exist {
err = ErrContainerDoesNotExist
} else {
if err = yaml.Unmarshal(confBytes, &cn); err == nil {
cn, err = builder.New()
}
}
return cn, err
}

View File

@@ -0,0 +1,104 @@
package directory
import (
"fmt"
"github.com/chihaya/chihaya/bittorrent"
"github.com/chihaya/chihaya/middleware/torrentapproval/container"
"github.com/chihaya/chihaya/middleware/torrentapproval/container/list"
"github.com/chihaya/chihaya/pkg/log"
"github.com/chihaya/chihaya/pkg/stop"
"github.com/fsnotify/fsnotify"
"os"
"path/filepath"
"sync"
)
func init() {
container.Register("list", builder{})
}
type builder struct {
WhitelistPath string `yaml:"whitelist_path"`
BlacklistPath string `yaml:"blacklist_path"`
}
func (b builder) New() (container.Container, error) {
if len(b.WhitelistPath) > 0 && len(b.BlacklistPath) > 0 {
return nil, fmt.Errorf("using both whitelist and blacklist is invalid")
}
var err error
dirLister := &directory{
List: list.List{
Hashes: sync.Map{},
Invert: len(b.WhitelistPath) == 0,
},
files: sync.Map{},
root: b.WhitelistPath,
watcher: nil,
}
if dirLister.Invert {
dirLister.root = b.BlacklistPath
}
var w *fsnotify.Watcher
if w, err = fsnotify.NewWatcher(); err != nil {
return nil, fmt.Errorf("unable to initialize fsnotify mechanism")
}
if dirContent, err := os.ReadDir(dirLister.root); err != nil {
return nil, err
} else {
for _, f := range dirContent {
if !f.IsDir() {
if err = dirLister.processFile(f.Name(), false); err != nil {
log.Warn(err)
}
}
}
}
if err = w.Add(dirLister.root); err != nil {
_ = w.Close()
dirLister = nil
}
return dirLister, err
}
func (d *directory) watch() {
go func() {
for err := range d.watcher.Errors {
log.Error(err)
}
}()
go func() {
for event := range d.watcher.Events {
log.Debug(event.String())
//todo: implement event type parsing
}
}()
}
func (d *directory) processFile(name string, delete bool) error {
fullName := filepath.Join(d.root, name)
if delete {
if hash, found := d.files.Load(fullName); found{
d.Hashes.Delete(hash)
}
} else {
var hashBytes []byte
info := bittorrent.InfoHashFromBytes(hashBytes)
d.files.Store(fullName, info)
d.Hashes.Store(info, list.DUMMY)
}
return nil
}
type directory struct {
list.List
files sync.Map
root string
watcher *fsnotify.Watcher
}
func (d *directory) Stop() stop.Result {
ch := make(stop.Channel)
go ch.Done(d.watcher.Close())
return ch.Result()
}

View File

@@ -0,0 +1,63 @@
package list
import (
"encoding/hex"
"fmt"
"github.com/chihaya/chihaya/bittorrent"
"github.com/chihaya/chihaya/middleware/torrentapproval/container"
"github.com/chihaya/chihaya/pkg/stop"
"sync"
)
func init() {
container.Register("list", Builder{})
}
type Builder struct {
Whitelist []string `yaml:"whitelist"`
Blacklist []string `yaml:"blacklist"`
}
var DUMMY struct{}
func (b Builder) New() (container.Container, error) {
if len(b.Whitelist) > 0 && len(b.Blacklist) > 0 {
return nil, fmt.Errorf("using both whitelist and blacklist is invalid")
}
l := &List{
Hashes: sync.Map{},
Invert: len(b.Whitelist) == 0,
}
hashList := b.Whitelist
if l.Invert {
l.Invert = true
hashList = b.Blacklist
}
for _, hashString := range hashList {
hashinfo, err := hex.DecodeString(hashString)
if err != nil {
return nil, fmt.Errorf("whitelist : invalid hash %s", hashString)
}
if len(hashinfo) != 20 {
return nil, fmt.Errorf("whitelist : hash %s is not 20 byes", hashString)
}
l.Hashes.Store(bittorrent.InfoHashFromBytes(hashinfo), DUMMY)
}
return l, nil
}
type List struct {
Invert bool
Hashes sync.Map
}
func (l *List) Stop() stop.Result {
return stop.AlreadyStopped
}
func (l *List) Contains(hash bittorrent.InfoHash) bool {
_, result := l.Hashes.Load(hash)
return result != l.Invert
}

View File

@@ -4,10 +4,10 @@ package torrentapproval
import (
"context"
"encoding/hex"
"fmt"
yaml "gopkg.in/yaml.v2"
"github.com/chihaya/chihaya/middleware/torrentapproval/container"
"github.com/chihaya/chihaya/pkg/stop"
"gopkg.in/yaml.v2"
"github.com/chihaya/chihaya/bittorrent"
"github.com/chihaya/chihaya/middleware"
@@ -20,90 +20,58 @@ func init() {
middleware.RegisterDriver(Name, driver{})
}
var _ middleware.Driver = driver{}
type driver struct{}
func (d driver) NewHook(optionBytes []byte) (middleware.Hook, error) {
var cfg Config
var cfg middleware.Config
err := yaml.Unmarshal(optionBytes, &cfg)
if err != nil {
return nil, fmt.Errorf("invalid options for middleware %s: %s", Name, err)
}
return NewHook(cfg)
if len(cfg.Name) == 0 {
return nil, fmt.Errorf("invalid options for middleware %s: name not provided", Name)
}
if cfg.Options == nil {
return nil, fmt.Errorf("invalid options for middleware %s: options not provided", Name)
}
var confBytes []byte
if confBytes, err = yaml.Marshal(cfg.Options); err != nil {
return nil, err
}
if c, err := container.GetContainer(cfg.Name, confBytes); err == nil{
return &hook{c}, nil
} else {
return nil, err
}
}
// ErrTorrentUnapproved is the error returned when a torrent hash is invalid.
var ErrTorrentUnapproved = bittorrent.ClientError("unapproved torrent")
// Config represents all the values required by this middleware to validate
// torrents based on their hash value.
type Config struct {
Whitelist []string `yaml:"whitelist"`
Blacklist []string `yaml:"blacklist"`
}
type hook struct {
approved map[bittorrent.InfoHash]struct{}
unapproved map[bittorrent.InfoHash]struct{}
hashContainer container.Container
}
// NewHook returns an instance of the torrent approval middleware.
func NewHook(cfg Config) (middleware.Hook, error) {
h := &hook{
approved: make(map[bittorrent.InfoHash]struct{}),
unapproved: make(map[bittorrent.InfoHash]struct{}),
}
if len(cfg.Whitelist) > 0 && len(cfg.Blacklist) > 0 {
return nil, fmt.Errorf("using both whitelist and blacklist is invalid")
}
for _, hashString := range cfg.Whitelist {
hashinfo, err := hex.DecodeString(hashString)
if err != nil {
return nil, fmt.Errorf("whitelist : invalid hash %s", hashString)
}
if len(hashinfo) != 20 {
return nil, fmt.Errorf("whitelist : hash %s is not 20 byes", hashString)
}
h.approved[bittorrent.InfoHashFromBytes(hashinfo)] = struct{}{}
}
for _, hashString := range cfg.Blacklist {
hashinfo, err := hex.DecodeString(hashString)
if err != nil {
return nil, fmt.Errorf("blacklist : invalid hash %s", hashString)
}
if len(hashinfo) != 20 {
return nil, fmt.Errorf("blacklist : hash %s is not 20 byes", hashString)
}
h.unapproved[bittorrent.InfoHashFromBytes(hashinfo)] = struct{}{}
}
return h, nil
}
func (h *hook) HandleAnnounce(ctx context.Context, req *bittorrent.AnnounceRequest, resp *bittorrent.AnnounceResponse) (context.Context, error) {
infohash := req.InfoHash
var err error
if len(h.approved) > 0 {
if _, found := h.approved[infohash]; !found {
return ctx, ErrTorrentUnapproved
}
if !h.hashContainer.Contains(req.InfoHash){
err = ErrTorrentUnapproved
}
if len(h.unapproved) > 0 {
if _, found := h.unapproved[infohash]; found {
return ctx, ErrTorrentUnapproved
}
}
return ctx, nil
return ctx, err
}
func (h *hook) HandleScrape(ctx context.Context, req *bittorrent.ScrapeRequest, resp *bittorrent.ScrapeResponse) (context.Context, error) {
// Scrapes don't require any protection.
return ctx, nil
}
func (h *hook) Stop() stop.Result {
return h.hashContainer.Stop()
}

View File

@@ -155,7 +155,7 @@ func (cfg Config) Validate() Config {
})
}
if cfg.PrometheusReportingInterval <= 0 {
if cfg.PrometheusReportingInterval < 0 {
validcfg.PrometheusReportingInterval = defaultPrometheusReportingInterval
log.Warn("falling back to default configuration", log.Fields{
"name": Name + ".PrometheusReportingInterval",
@@ -209,23 +209,27 @@ func New(provided Config) (storage.PeerStore, error) {
}
}()
// Start a goroutine for reporting statistics to Prometheus.
ps.wg.Add(1)
go func() {
defer ps.wg.Done()
t := time.NewTicker(cfg.PrometheusReportingInterval)
for {
select {
case <-ps.closed:
t.Stop()
return
case <-t.C:
before := time.Now()
ps.populateProm()
log.Debug("storage: populateProm() finished", log.Fields{"timeTaken": time.Since(before)})
if cfg.PrometheusReportingInterval > 0 {
// Start a goroutine for reporting statistics to Prometheus.
ps.wg.Add(1)
go func() {
defer ps.wg.Done()
t := time.NewTicker(cfg.PrometheusReportingInterval)
for {
select {
case <-ps.closed:
t.Stop()
return
case <-t.C:
before := time.Now()
ps.populateProm()
log.Debug("storage: populateProm() finished", log.Fields{"timeTaken": time.Since(before)})
}
}
}
}()
}()
} else {
log.Info("prometheus disabled because of zero reporting interval")
}
return ps, nil
}