Implement simple PGDC methods for storage

* sanitize code a little
* move e2e build to 'e2e' tag
This commit is contained in:
Širhoe Biazhkovič
2021-11-22 19:33:52 +03:00
committed by Lawrence, Rendall
parent 566d99fcd7
commit beb4736b86
37 changed files with 483 additions and 317 deletions
+2 -2
View File
@@ -56,7 +56,7 @@ jobs:
go-version: ^1.15
- name: End-to-End Test
run: |
go install ./cmd/chihaya
go install --tags e2e ./cmd/chihaya
cat ./dist/example_config.yaml
chihaya --config=./dist/example_config.yaml --debug &
pid=$!
@@ -87,7 +87,7 @@ jobs:
cat ./dist/example_redis_config.yaml
- name: End-to-End Test
run: |
go install ./cmd/chihaya
go install --tags e2e ./cmd/chihaya
chihaya --config=./dist/example_redis_config.yaml --debug &
pid=$!
sleep 2
+4 -2
View File
@@ -4,6 +4,8 @@
package bittorrent
import (
"crypto/sha1"
"crypto/sha256"
"fmt"
"github.com/pkg/errors"
"net"
@@ -45,8 +47,8 @@ func (p PeerID) RawString() string {
type InfoHash string
const (
InfoHashV1Len = 20
InfoHashV2Len = 32
InfoHashV1Len = sha1.Size
InfoHashV2Len = sha256.Size
NoneInfoHash InfoHash = ""
)
+6 -3
View File
@@ -19,7 +19,6 @@ var peerStringTestCases = []struct {
}{
{
input: Peer{
ID: NewPeerID(b),
IP: IP{net.IPv4(10, 11, 12, 1), IPv4},
Port: 1234,
},
@@ -27,7 +26,6 @@ var peerStringTestCases = []struct {
},
{
input: Peer{
ID: NewPeerID(b),
IP: IP{net.ParseIP("2001:db8::ff00:42:8329"), IPv6},
Port: 1234,
},
@@ -36,7 +34,9 @@ var peerStringTestCases = []struct {
}
func TestPeerID_String(t *testing.T) {
s := NewPeerID(b).String()
pid, err := NewPeerID(b)
require.Nil(t, err)
s := pid.String()
require.Equal(t, expected, s)
}
@@ -47,7 +47,10 @@ func TestInfoHash_String(t *testing.T) {
}
func TestPeer_String(t *testing.T) {
pid, err := NewPeerID(b)
require.Nil(t, err)
for _, c := range peerStringTestCases {
c.input.ID = pid
got := c.input.String()
require.Equal(t, c.expected, got)
}
+1 -1
View File
@@ -168,7 +168,7 @@ func parseQuery(query string) (q *QueryParams, err error) {
}
if key == "info_hash" {
if ih, err := InfoHashFromString(value); err == nil{
if ih, err := NewInfoHash([]byte(value)); err == nil {
q.infoHashes = append(q.infoHashes, ih)
} else {
return nil, err
+3 -3
View File
@@ -29,9 +29,9 @@ type storageConfig struct {
// Config represents the configuration used for executing Chihaya.
type Config struct {
middleware.ResponseConfig `yaml:",inline"`
MetricsAddr string `yaml:"metrics_addr"`
HTTPConfig http.Config `yaml:"http"`
UDPConfig udp.Config `yaml:"udp"`
MetricsAddr string `yaml:"metrics_addr"`
HTTPConfig http.Config `yaml:"http"`
UDPConfig udp.Config `yaml:"udp"`
Storage storageConfig `yaml:"storage"`
PreHooks []middleware.Config `yaml:"prehooks"`
PostHooks []middleware.Config `yaml:"posthooks"`
+19 -3
View File
@@ -1,3 +1,6 @@
//go:build e2e
// +build e2e
package main
import (
@@ -13,6 +16,19 @@ import (
"github.com/chihaya/chihaya/pkg/log"
)
func init() {
e2eCmd = &cobra.Command{
Use: "e2e",
Short: "exec e2e tests",
Long: "Execute the Chihaya end-to-end test suite",
RunE: EndToEndRunCmdFunc,
}
e2eCmd.Flags().String("httpaddr", "http://127.0.0.1:6969/announce", "address of the HTTP tracker")
e2eCmd.Flags().String("udpaddr", "udp://127.0.0.1:6969", "address of the UDP tracker")
e2eCmd.Flags().Duration("delay", time.Second, "delay between announces")
}
// EndToEndRunCmdFunc implements a Cobra command that runs the end-to-end test
// suite for a Chihaya build.
func EndToEndRunCmdFunc(cmd *cobra.Command, args []string) error {
@@ -55,7 +71,7 @@ func EndToEndRunCmdFunc(cmd *cobra.Command, args []string) error {
}
func generateInfohash() bittorrent.InfoHash {
b := make([]byte, 20)
b := make([]byte, bittorrent.InfoHashV1Len)
rand.Read(b)
ih, _ := bittorrent.NewInfoHash(b)
return ih
@@ -70,7 +86,7 @@ func testWithInfohash(infoHash bittorrent.InfoHash, url string, delay time.Durat
var ih [bittorrent.InfoHashV1Len]byte
req := tracker.AnnounceRequest{
InfoHash: ih,
PeerId: [20]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20},
PeerId: [bittorrent.PeerIDLen]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20},
Downloaded: 50,
Left: 100,
Uploaded: 50,
@@ -98,7 +114,7 @@ func testWithInfohash(infoHash bittorrent.InfoHash, url string, delay time.Durat
copy(ih[:], infoHash)
req = tracker.AnnounceRequest{
InfoHash: ih,
PeerId: [20]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 21},
PeerId: [bittorrent.PeerIDLen]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 21},
Downloaded: 50,
Left: 100,
Uploaded: 50,
+8 -17
View File
@@ -2,15 +2,13 @@ package main
import (
"errors"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"os"
"os/signal"
"runtime"
"strings"
"syscall"
"time"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/chihaya/chihaya/frontend/http"
"github.com/chihaya/chihaya/frontend/udp"
@@ -21,6 +19,8 @@ import (
"github.com/chihaya/chihaya/storage"
)
var e2eCmd *cobra.Command
// Run represents the state of a running instance of Chihaya.
type Run struct {
configFilePath string
@@ -137,7 +137,7 @@ func (r *Run) Stop(keepPeerStore bool) (storage.Storage, error) {
// RootRunCmdFunc implements a Cobra command that runs an instance of Chihaya
// and handles reloading and shutdown via process signals.
func RootRunCmdFunc(cmd *cobra.Command, args []string) error {
func RootRunCmdFunc(cmd *cobra.Command, _ []string) error {
configFilePath, err := cmd.Flags().GetString("config")
if err != nil {
return err
@@ -177,7 +177,7 @@ func RootRunCmdFunc(cmd *cobra.Command, args []string) error {
}
// RootPreRunCmdFunc handles command line flags for the Run command.
func RootPreRunCmdFunc(cmd *cobra.Command, args []string) error {
func RootPreRunCmdFunc(cmd *cobra.Command, _ []string) error {
noColors, err := cmd.Flags().GetBool("nocolors")
if err != nil {
return err
@@ -233,19 +233,10 @@ func main() {
rootCmd.Flags().String("config", "/etc/chihaya.yaml", "location of configuration file")
var e2eCmd = &cobra.Command{
Use: "e2e",
Short: "exec e2e tests",
Long: "Execute the Chihaya end-to-end test suite",
RunE: EndToEndRunCmdFunc,
if e2eCmd != nil {
rootCmd.AddCommand(e2eCmd)
}
e2eCmd.Flags().String("httpaddr", "http://127.0.0.1:6969/announce", "address of the HTTP tracker")
e2eCmd.Flags().String("udpaddr", "udp://127.0.0.1:6969", "address of the UDP tracker")
e2eCmd.Flags().Duration("delay", time.Second, "delay between announces")
rootCmd.AddCommand(e2eCmd)
if err := rootCmd.Execute(); err != nil {
log.Fatal("failed when executing root cobra command: " + err.Error())
}
+1
View File
@@ -1,3 +1,4 @@
//go:build darwin || freebsd || linux || netbsd || openbsd || dragonfly || solaris
// +build darwin freebsd linux netbsd openbsd dragonfly solaris
package main
+1
View File
@@ -1,3 +1,4 @@
//go:build windows
// +build windows
package main
+3 -3
View File
@@ -186,7 +186,7 @@ func NewFrontend(logic frontend.TrackerLogic, provided Config) (*Frontend, error
if cfg.Addr != "" {
go func() {
if err := f.serveHTTP(router,false); err != nil {
if err := f.serveHTTP(router, false); err != nil {
log.Fatal("failed while serving http", log.Err(err))
}
}()
@@ -194,7 +194,7 @@ func NewFrontend(logic frontend.TrackerLogic, provided Config) (*Frontend, error
if cfg.HTTPSAddr != "" {
go func() {
if err := f.serveHTTP(router,true); err != nil {
if err := f.serveHTTP(router, true); err != nil {
log.Fatal("failed while serving https", log.Err(err))
}
}()
@@ -245,7 +245,7 @@ func (f *Frontend) serveHTTP(handler http.Handler, tls bool) error {
srv.TLSConfig = f.tlsCfg
f.tlsSrv = srv
err = srv.ListenAndServe()
} else{
} else {
srv.Addr = f.Addr
f.srv = srv
err = f.tlsSrv.ListenAndServeTLS("", "")
+3 -4
View File
@@ -67,11 +67,10 @@ func ParseAnnounce(r *http.Request, opts ParseOptions) (*bittorrent.AnnounceRequ
if !ok {
return nil, bittorrent.ClientError("failed to parse parameter: peer_id")
}
if len(peerID) != bittorrent.PeerIDLen {
return nil, bittorrent.ClientError("failed to provide valid peer_id")
request.Peer.ID, err = bittorrent.NewPeerID([]byte(peerID))
if err != nil {
return nil, err
}
request.Peer.ID = bittorrent.NewPeerID([]byte(peerID))
// Determine the number of remaining bytes for the client.
request.Left, err = qp.Uint64("left")
if err != nil {
+1 -1
View File
@@ -23,7 +23,7 @@ func WriteError(w http.ResponseWriter, err error) {
w.WriteHeader(http.StatusOK)
if err = bencode.NewEncoder(w).Encode(map[string]interface{}{
"failure reason": message,
}); err != nil{
}); err != nil {
log.Error("unable to encode string", log.Err(err))
}
}
+1 -1
View File
@@ -7,7 +7,7 @@ import (
"net"
"time"
sha256 "github.com/minio/sha256-simd"
"github.com/minio/sha256-simd"
"github.com/chihaya/chihaya/pkg/log"
)
+1 -1
View File
@@ -9,7 +9,7 @@ import (
"testing"
"time"
sha256 "github.com/minio/sha256-simd"
"github.com/minio/sha256-simd"
"github.com/stretchr/testify/require"
"github.com/chihaya/chihaya/pkg/log"
+1 -1
View File
@@ -148,7 +148,7 @@ func (t *Frontend) Stop() stop.Result {
c := make(stop.Channel)
go func() {
close(t.closing)
t.socket.SetReadDeadline(time.Now())
_ = t.socket.SetReadDeadline(time.Now())
t.wg.Wait()
c.Done(t.socket.Close())
}()
+9 -4
View File
@@ -7,7 +7,7 @@ import (
"net"
"sync"
"github.com/chihaya/chihaya/bittorrent"
bittorrent "github.com/chihaya/chihaya/bittorrent"
)
const (
@@ -113,7 +113,12 @@ func ParseAnnounce(r Request, v6Action bool, opts ParseOptions) (*bittorrent.Ann
}
ih, err := bittorrent.NewInfoHash(infohash)
if err != nil{
if err != nil {
return nil, err
}
peerId, err := bittorrent.NewPeerID(peerID)
if err != nil {
return nil, err
}
@@ -128,7 +133,7 @@ func ParseAnnounce(r Request, v6Action bool, opts ParseOptions) (*bittorrent.Ann
NumWantProvided: true,
EventProvided: true,
Peer: bittorrent.Peer{
ID: bittorrent.NewPeerID(peerID),
ID: peerId,
IP: bittorrent.IP{IP: ip},
Port: port,
},
@@ -227,7 +232,7 @@ func ParseScrape(r Request, opts ParseOptions) (*bittorrent.ScrapeRequest, error
pageSize = bittorrent.InfoHashV2Len
}
for len(r.Packet) >= pageSize {
if ih, err := bittorrent.NewInfoHash(r.Packet[:pageSize]); err != nil{
if ih, err := bittorrent.NewInfoHash(r.Packet[:pageSize]); err != nil {
return nil, err
} else {
infohashes = append(infohashes, ih)
+17 -17
View File
@@ -18,9 +18,9 @@ func WriteError(w io.Writer, txID []byte, err error) {
buf := newBuffer()
writeHeader(buf, txID, errorActionID)
buf.WriteString(err.Error())
buf.WriteRune('\000')
w.Write(buf.Bytes())
_, _ = buf.WriteString(err.Error())
_, _ = buf.WriteRune('\000')
_, _ = w.Write(buf.Bytes())
buf.free()
}
@@ -37,9 +37,9 @@ func WriteAnnounce(w io.Writer, txID []byte, resp *bittorrent.AnnounceResponse,
} else {
writeHeader(buf, txID, announceActionID)
}
binary.Write(buf, binary.BigEndian, uint32(resp.Interval/time.Second))
binary.Write(buf, binary.BigEndian, resp.Incomplete)
binary.Write(buf, binary.BigEndian, resp.Complete)
_ = binary.Write(buf, binary.BigEndian, uint32(resp.Interval/time.Second))
_ = binary.Write(buf, binary.BigEndian, resp.Incomplete)
_ = binary.Write(buf, binary.BigEndian, resp.Complete)
peers := resp.IPv4Peers
if v6Peers {
@@ -47,11 +47,11 @@ func WriteAnnounce(w io.Writer, txID []byte, resp *bittorrent.AnnounceResponse,
}
for _, peer := range peers {
buf.Write(peer.IP.IP)
binary.Write(buf, binary.BigEndian, peer.Port)
_, _ = buf.Write(peer.IP.IP)
_ = binary.Write(buf, binary.BigEndian, peer.Port)
}
w.Write(buf.Bytes())
_, _ = w.Write(buf.Bytes())
buf.free()
}
@@ -62,12 +62,12 @@ func WriteScrape(w io.Writer, txID []byte, resp *bittorrent.ScrapeResponse) {
writeHeader(buf, txID, scrapeActionID)
for _, scrape := range resp.Files {
binary.Write(buf, binary.BigEndian, scrape.Complete)
binary.Write(buf, binary.BigEndian, scrape.Snatches)
binary.Write(buf, binary.BigEndian, scrape.Incomplete)
_ = binary.Write(buf, binary.BigEndian, scrape.Complete)
_ = binary.Write(buf, binary.BigEndian, scrape.Snatches)
_ = binary.Write(buf, binary.BigEndian, scrape.Incomplete)
}
w.Write(buf.Bytes())
_, _ = w.Write(buf.Bytes())
buf.free()
}
@@ -76,15 +76,15 @@ func WriteConnectionID(w io.Writer, txID, connID []byte) {
buf := newBuffer()
writeHeader(buf, txID, connectActionID)
buf.Write(connID)
_, _ = buf.Write(connID)
w.Write(buf.Bytes())
_, _ = w.Write(buf.Bytes())
buf.free()
}
// writeHeader writes the action and transaction ID to the provided response
// buffer.
func writeHeader(w io.Writer, txID []byte, action uint32) {
binary.Write(w, binary.BigEndian, action)
w.Write(txID)
_ = binary.Write(w, binary.BigEndian, action)
_, _ = w.Write(txID)
}
+5 -1
View File
@@ -47,7 +47,11 @@ func TestClientID(t *testing.T) {
t.Run(tt.peerID, func(t *testing.T) {
var clientID ClientID
copy(clientID[:], tt.clientID)
parsedID := NewClientID(bittorrent.NewPeerID([]byte(tt.peerID)))
peerId, err := bittorrent.NewPeerID([]byte(tt.peerID))
if err != nil {
t.Error(err)
}
parsedID := NewClientID(peerId)
if parsedID != clientID {
t.Error("Incorrectly parsed peer ID", tt.peerID, "as", parsedID)
}
@@ -59,8 +59,8 @@ func TestHandleAnnounce(t *testing.T) {
req := &bittorrent.AnnounceRequest{}
resp := &bittorrent.AnnounceResponse{}
peerid := bittorrent.NewPeerID([]byte(tt.peerID))
peerid, err := bittorrent.NewPeerID([]byte(tt.peerID))
require.Nil(t, err)
req.Peer.ID = peerid
nctx, err := h.HandleAnnounce(ctx, req, resp)
+3 -3
View File
@@ -28,7 +28,7 @@ type swarmInteractionHook struct {
store storage.Storage
}
func (h *swarmInteractionHook) HandleAnnounce(ctx context.Context, req *bittorrent.AnnounceRequest, resp *bittorrent.AnnounceResponse) (_ context.Context, err error) {
func (h *swarmInteractionHook) HandleAnnounce(ctx context.Context, req *bittorrent.AnnounceRequest, _ *bittorrent.AnnounceResponse) (_ context.Context, err error) {
if ctx.Value(SkipSwarmInteractionKey) != nil {
return ctx, nil
}
@@ -75,14 +75,14 @@ type skipResponseHook struct{}
// skip.
var SkipResponseHookKey = skipResponseHook{}
type scrapeAddressType struct{}
//type scrapeAddressType struct{}
// ScrapeIsIPv6Key is the key under which to store whether or not the
// address used to request a scrape was an IPv6 address.
// The value is expected to be of type bool.
// A missing value or a value that is not a bool for this key is equivalent to
// it being set to false.
var ScrapeIsIPv6Key = scrapeAddressType{}
//var ScrapeIsIPv6Key = scrapeAddressType{}
type responseHook struct {
store storage.Storage
+1 -1
View File
@@ -22,7 +22,7 @@ import (
"github.com/SermoDigital/jose/jws"
"github.com/SermoDigital/jose/jwt"
"github.com/mendsley/gojwk"
yaml "gopkg.in/yaml.v2"
"gopkg.in/yaml.v2"
"github.com/chihaya/chihaya/bittorrent"
"github.com/chihaya/chihaya/middleware"
+2 -2
View File
@@ -15,11 +15,11 @@ import (
// benchmarks.
type nopHook struct{}
func (h *nopHook) HandleAnnounce(ctx context.Context, req *bittorrent.AnnounceRequest, resp *bittorrent.AnnounceResponse) (context.Context, error) {
func (h *nopHook) HandleAnnounce(ctx context.Context, _ *bittorrent.AnnounceRequest, _ *bittorrent.AnnounceResponse) (context.Context, error) {
return ctx, nil
}
func (h *nopHook) HandleScrape(ctx context.Context, req *bittorrent.ScrapeRequest, resp *bittorrent.ScrapeResponse) (context.Context, error) {
func (h *nopHook) HandleScrape(ctx context.Context, _ *bittorrent.ScrapeRequest, _ *bittorrent.ScrapeResponse) (context.Context, error) {
return ctx, nil
}
+1 -1
View File
@@ -7,7 +7,7 @@ import (
"github.com/chihaya/chihaya/storage"
"sync"
yaml "gopkg.in/yaml.v2"
"gopkg.in/yaml.v2"
)
var (
@@ -7,7 +7,9 @@ import (
"sync"
)
type Builder func ([]byte, storage.Storage) (Container, error)
const DefaultStorageCtxName = "MW_APPROVAL"
type Builder func([]byte, storage.Storage) (Container, error)
var (
buildersMU sync.Mutex
@@ -30,7 +32,7 @@ func Register(n string, c Builder) {
}
type Container interface {
Contains(bittorrent.InfoHash) bool
Approved(bittorrent.InfoHash) bool
}
func GetContainer(name string, confBytes []byte, storage storage.Storage) (Container, error) {
@@ -4,63 +4,88 @@
package directory
import (
"crypto/sha256"
"fmt"
"github.com/anacrolix/torrent/metainfo"
"github.com/anacrolix/torrent/util/dirwatch"
"github.com/chihaya/chihaya/bittorrent"
"github.com/chihaya/chihaya/middleware/torrentapproval/container"
"github.com/chihaya/chihaya/middleware/torrentapproval/container/list"
"github.com/chihaya/chihaya/pkg/log"
"github.com/chihaya/chihaya/pkg/stop"
"github.com/chihaya/chihaya/storage"
"gopkg.in/yaml.v2"
"sync"
)
const Name = "directory"
func init() {
container.Register("directory", build)
container.Register(Name, build)
}
type Config struct {
WhitelistPath string `yaml:"whitelist_path"`
BlacklistPath string `yaml:"blacklist_path"`
list.Config
Path string `yaml:"path"`
}
// TODO: change sync map to provided storage
func build(confBytes []byte, storage storage.Storage) (container.Container, error) {
func build(confBytes []byte, st storage.Storage) (container.Container, error) {
c := new(Config)
if err := yaml.Unmarshal(confBytes, c); err != nil {
return nil, fmt.Errorf("unable to deserialise configuration: %v", err)
}
if len(c.WhitelistPath) > 0 && len(c.BlacklistPath) > 0 {
return nil, fmt.Errorf("using both whitelist and blacklist is invalid")
}
var err error
lst := &directory{
d := &directory{
List: list.List{
Hashes: sync.Map{},
Invert: len(c.WhitelistPath) == 0,
Invert: c.Invert,
Storage: st,
StorageCtx: c.StorageCtx,
},
watcher: nil,
}
dir := c.WhitelistPath
if lst.Invert {
dir = c.BlacklistPath
}
//FIXME: implement V2 torrent add/delete
var w *dirwatch.Instance
if w, err = dirwatch.New(dir); err != nil {
if w, err = dirwatch.New(c.Path); err != nil {
return nil, fmt.Errorf("unable to initialize directory watch: %v", err)
}
lst.watcher = w
d.watcher = w
if len(d.StorageCtx) == 0 {
log.Info("Storage context not set, using default value: " + container.DefaultStorageCtxName)
d.StorageCtx = container.DefaultStorageCtxName
}
go func() {
for event := range lst.watcher.Events {
for event := range d.watcher.Events {
switch event.Change {
case dirwatch.Added:
lst.Hashes.Store(event.InfoHash, list.DUMMY)
data := make([]storage.Pair, 1, 2)
data[0] = storage.Pair{Left: event.InfoHash[:], Right: list.DUMMY}
if v2ih, err := v2InfoHash(event.TorrentFilePath); err == nil {
data = append(data, storage.Pair{Left: v2ih, Right: list.DUMMY})
} else {
log.Err(err)
}
d.Storage.BulkPut(c.StorageCtx, data...)
case dirwatch.Removed:
lst.Hashes.Delete(event.InfoHash)
data := make([]interface{}, 1, 2)
data[0] = event.InfoHash[:]
if v2ih, err := v2InfoHash(event.TorrentFilePath); err == nil {
data = append(data, v2ih)
} else {
log.Err(err)
}
d.Storage.Delete(c.StorageCtx, data...)
}
}
}()
return lst, err
return d, err
}
func v2InfoHash(path string) (ih bittorrent.InfoHash, err error) {
var mi *metainfo.MetaInfo
if mi, err = metainfo.LoadFromFile(path); err == nil {
hash := sha256.New()
hash.Write(mi.InfoBytes)
ih, err = bittorrent.NewInfoHash(hash.Sum(nil))
}
return
}
type directory struct {
@@ -5,63 +5,68 @@ package list
import (
"encoding/hex"
"fmt"
bittorrent "github.com/chihaya/chihaya/bittorrent"
"github.com/chihaya/chihaya/bittorrent"
"github.com/chihaya/chihaya/middleware/torrentapproval/container"
"github.com/chihaya/chihaya/pkg/log"
"github.com/chihaya/chihaya/storage"
"gopkg.in/yaml.v2"
"sync"
)
const Name = "list"
func init() {
container.Register("list", build)
container.Register(Name, build)
}
type Config struct {
Whitelist []string `yaml:"whitelist"`
Blacklist []string `yaml:"blacklist"`
HashList []string `yaml:"hash_list"`
Invert bool `yaml:"invert"`
StorageCtx string `yaml:"storage_ctx"`
}
var DUMMY struct{}
const DUMMY = true
// TODO: change sync map to provided storage
func build(confBytes []byte, storage storage.Storage) (container.Container, error) {
func build(confBytes []byte, st storage.Storage) (container.Container, error) {
c := new(Config)
if err := yaml.Unmarshal(confBytes, c); err != nil {
return nil, fmt.Errorf("unable to deserialise configuration: %v", err)
}
if len(c.Whitelist) > 0 && len(c.Blacklist) > 0 {
return nil, fmt.Errorf("using both whitelist and blacklist is invalid")
}
l := &List{
Hashes: sync.Map{},
Invert: len(c.Whitelist) == 0,
Invert: c.Invert,
Storage: st,
StorageCtx: c.StorageCtx,
}
hashList := c.Whitelist
if l.Invert {
hashList = c.Blacklist
if len(l.StorageCtx) == 0 {
log.Info("Storage context not set, using default value: " + container.DefaultStorageCtxName)
l.StorageCtx = container.DefaultStorageCtxName
}
for _, hashString := range hashList {
hashBytes, err := hex.DecodeString(hashString)
if err != nil {
return nil, fmt.Errorf("whitelist : invalid hash %s, %v", hashString, err)
if len(c.HashList) > 0 {
init := make([]storage.Pair, 0, len(c.HashList))
for _, hashString := range c.HashList {
hashBytes, err := hex.DecodeString(hashString)
if err != nil {
return nil, fmt.Errorf("whitelist : invalid hash %s, %v", hashString, err)
}
ih, err := bittorrent.NewInfoHash(hashBytes)
if err != nil {
return nil, fmt.Errorf("whitelist : %s : %v", hashString, err)
}
init = append(init, storage.Pair{Left: ih, Right: DUMMY})
}
ih, err := bittorrent.NewInfoHash(hashBytes)
if err != nil {
return nil, fmt.Errorf("whitelist : %s : %v", hashString, err)
}
l.Hashes.Store(ih, DUMMY)
l.Storage.BulkPut(l.StorageCtx, init...)
}
return l, nil
}
type List struct {
Invert bool
Hashes sync.Map
Invert bool
Storage storage.Storage
StorageCtx string
}
func (l *List) Contains(hash bittorrent.InfoHash) bool {
_, result := l.Hashes.Load(hash)
return result != l.Invert
func (l *List) Approved(hash bittorrent.InfoHash) bool {
b := l.Storage.Contains(l.StorageCtx, hash)
return b != l.Invert
}
@@ -62,7 +62,7 @@ type hook struct {
func (h *hook) HandleAnnounce(ctx context.Context, req *bittorrent.AnnounceRequest, _ *bittorrent.AnnounceResponse) (context.Context, error) {
var err error
if !h.hashContainer.Contains(req.InfoHash) {
if !h.hashContainer.Approved(req.InfoHash) {
err = ErrTorrentUnapproved
}
@@ -5,6 +5,7 @@ import (
"encoding/hex"
"fmt"
"github.com/chihaya/chihaya/middleware"
"github.com/chihaya/chihaya/storage/memory"
"gopkg.in/yaml.v2"
"testing"
@@ -23,7 +24,7 @@ var cases = []struct {
middleware.Config{
Name: "list",
Options: map[string]interface{}{
"whitelist": []string{"3532cf2d327fad8448c075b4cb42c8136964a435"},
"hash_list": []string{"3532cf2d327fad8448c075b4cb42c8136964a435"},
},
},
"3532cf2d327fad8448c075b4cb42c8136964a435",
@@ -34,7 +35,7 @@ var cases = []struct {
middleware.Config{
Name: "list",
Options: map[string]interface{}{
"whitelist": []string{"3532cf2d327fad8448c075b4cb42c8136964a435"},
"hash_list": []string{"3532cf2d327fad8448c075b4cb42c8136964a435"},
},
},
"4532cf2d327fad8448c075b4cb42c8136964a435",
@@ -45,7 +46,8 @@ var cases = []struct {
middleware.Config{
Name: "list",
Options: map[string]interface{}{
"blacklist": []string{"3532cf2d327fad8448c075b4cb42c8136964a435"},
"hash_list": []string{"3532cf2d327fad8448c075b4cb42c8136964a435"},
"invert": true,
},
},
"4532cf2d327fad8448c075b4cb42c8136964a435",
@@ -56,7 +58,8 @@ var cases = []struct {
middleware.Config{
Name: "list",
Options: map[string]interface{}{
"blacklist": []string{"3532cf2d327fad8448c075b4cb42c8136964a435"},
"hash_list": []string{"3532cf2d327fad8448c075b4cb42c8136964a435"},
"invert": true,
},
},
"3532cf2d327fad8448c075b4cb42c8136964a435",
@@ -65,12 +68,15 @@ var cases = []struct {
}
func TestHandleAnnounce(t *testing.T) {
config := memory.Config{}.Validate()
storage, err := memory.New(config)
require.Nil(t, err)
for _, tt := range cases {
t.Run(fmt.Sprintf("testing hash %s", tt.ih), func(t *testing.T) {
d := driver{}
cfg, err := yaml.Marshal(tt.cfg)
require.Nil(t, err)
h, err := d.NewHook(cfg)
h, err := d.NewHook(cfg, storage)
require.Nil(t, err)
ctx := context.Background()
@@ -80,7 +86,8 @@ func TestHandleAnnounce(t *testing.T) {
hashbytes, err := hex.DecodeString(tt.ih)
require.Nil(t, err)
hashinfo := bittorrent.NewInfoHash(hashbytes)
hashinfo, err := bittorrent.NewInfoHash(hashbytes)
require.Nil(t, err)
req.InfoHash = hashinfo
+1 -1
View File
@@ -8,7 +8,7 @@ import (
"sync"
"time"
yaml "gopkg.in/yaml.v2"
"gopkg.in/yaml.v2"
"github.com/chihaya/chihaya/bittorrent"
"github.com/chihaya/chihaya/middleware"
@@ -4,14 +4,15 @@ package memory
import (
"encoding/binary"
"net"
"fmt"
"reflect"
"runtime"
"sync"
"time"
yaml "gopkg.in/yaml.v2"
"gopkg.in/yaml.v2"
bittorrent "github.com/chihaya/chihaya/bittorrent"
"github.com/chihaya/chihaya/bittorrent"
"github.com/chihaya/chihaya/pkg/log"
"github.com/chihaya/chihaya/pkg/stop"
"github.com/chihaya/chihaya/pkg/timecache"
@@ -36,7 +37,7 @@ func init() {
type driver struct{}
func (d driver) NewPeerStore(icfg interface{}) (storage.Storage, error) {
func (d driver) NewStorage(icfg interface{}) (storage.Storage, error) {
// Marshal the config back into bytes.
bytes, err := yaml.Marshal(icfg)
if err != nil {
@@ -121,10 +122,11 @@ func (cfg Config) Validate() Config {
// New creates a new Storage backed by memory.
func New(provided Config) (storage.Storage, error) {
cfg := provided.Validate()
ps := &peerStore{
cfg: cfg,
shards: make([]*peerShard, cfg.ShardCount*2),
closed: make(chan struct{}),
ps := &store{
cfg: cfg,
shards: make([]*peerShard, cfg.ShardCount*2),
contexts: sync.Map{},
closed: make(chan struct{}),
}
for i := 0; i < cfg.ShardCount*2; i++ {
@@ -174,40 +176,6 @@ func New(provided Config) (storage.Storage, error) {
return ps, nil
}
type serializedPeer string
func newPeerKey(p bittorrent.Peer) serializedPeer {
b := make([]byte, 20+2+len(p.IP.IP))
copy(b[:20], p.ID[:])
binary.BigEndian.PutUint16(b[bittorrent.PeerIDLen:bittorrent.PeerIDLen+2], p.Port)
copy(b[bittorrent.PeerIDLen+2:], p.IP.IP)
return serializedPeer(b)
}
// TODO: move duplicated code into one place
func decodePeerKey(pk serializedPeer) bittorrent.Peer {
peerId, err := bittorrent.NewPeerID([]byte(pk[:bittorrent.PeerIDLen]))
if err != nil {
panic(err)
}
peer := bittorrent.Peer{
ID: peerId,
Port: binary.BigEndian.Uint16([]byte(pk[bittorrent.PeerIDLen:bittorrent.PeerIDLen+2])),
IP: bittorrent.IP{IP: net.IP(pk[bittorrent.PeerIDLen+2:])}}
if ip := peer.IP.To4(); ip != nil {
peer.IP.IP = ip
peer.IP.AddressFamily = bittorrent.IPv4
} else if len(peer.IP.IP) == net.IPv6len { // implies toReturn.IP.To4() == nil
peer.IP.AddressFamily = bittorrent.IPv6
} else {
panic("IP is neither v4 nor v6")
}
return peer
}
type peerShard struct {
swarms map[bittorrent.InfoHash]swarm
numSeeders uint64
@@ -217,23 +185,24 @@ type peerShard struct {
type swarm struct {
// map serialized peer to mtime
seeders map[serializedPeer]int64
leechers map[serializedPeer]int64
seeders map[storage.SerializedPeer]int64
leechers map[storage.SerializedPeer]int64
}
type peerStore struct {
cfg Config
shards []*peerShard
type store struct {
cfg Config
shards []*peerShard
contexts sync.Map
closed chan struct{}
wg sync.WaitGroup
}
var _ storage.Storage = &peerStore{}
var _ storage.Storage = &store{}
// populateProm aggregates metrics over all shards and then posts them to
// prometheus.
func (ps *peerStore) populateProm() {
func (ps *store) populateProm() {
var numInfohashes, numSeeders, numLeechers uint64
for _, s := range ps.shards {
@@ -254,11 +223,11 @@ func recordGCDuration(duration time.Duration) {
storage.PromGCDurationMilliseconds.Observe(float64(duration.Nanoseconds()) / float64(time.Millisecond))
}
func (ps *peerStore) getClock() int64 {
func (ps *store) getClock() int64 {
return timecache.NowUnixNano()
}
func (ps *peerStore) shardIndex(infoHash bittorrent.InfoHash, af bittorrent.AddressFamily) uint32 {
func (ps *store) shardIndex(infoHash bittorrent.InfoHash, af bittorrent.AddressFamily) uint32 {
// There are twice the amount of shards specified by the user, the first
// half is dedicated to IPv4 swarms and the second half is dedicated to
// IPv6 swarms.
@@ -269,22 +238,22 @@ func (ps *peerStore) shardIndex(infoHash bittorrent.InfoHash, af bittorrent.Addr
return idx
}
func (ps *peerStore) PutSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) error {
func (ps *store) PutSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) error {
select {
case <-ps.closed:
panic("attempted to interact with stopped memory store")
default:
}
pk := newPeerKey(p)
pk := storage.NewSerializedPeer(p)
shard := ps.shards[ps.shardIndex(ih, p.IP.AddressFamily)]
shard.Lock()
if _, ok := shard.swarms[ih]; !ok {
shard.swarms[ih] = swarm{
seeders: make(map[serializedPeer]int64),
leechers: make(map[serializedPeer]int64),
seeders: make(map[storage.SerializedPeer]int64),
leechers: make(map[storage.SerializedPeer]int64),
}
}
@@ -300,14 +269,14 @@ func (ps *peerStore) PutSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) error
return nil
}
func (ps *peerStore) DeleteSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) error {
func (ps *store) DeleteSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) error {
select {
case <-ps.closed:
panic("attempted to interact with stopped memory store")
default:
}
pk := newPeerKey(p)
pk := storage.NewSerializedPeer(p)
shard := ps.shards[ps.shardIndex(ih, p.IP.AddressFamily)]
shard.Lock()
@@ -333,22 +302,22 @@ func (ps *peerStore) DeleteSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) err
return nil
}
func (ps *peerStore) PutLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error {
func (ps *store) PutLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error {
select {
case <-ps.closed:
panic("attempted to interact with stopped memory store")
default:
}
pk := newPeerKey(p)
pk := storage.NewSerializedPeer(p)
shard := ps.shards[ps.shardIndex(ih, p.IP.AddressFamily)]
shard.Lock()
if _, ok := shard.swarms[ih]; !ok {
shard.swarms[ih] = swarm{
seeders: make(map[serializedPeer]int64),
leechers: make(map[serializedPeer]int64),
seeders: make(map[storage.SerializedPeer]int64),
leechers: make(map[storage.SerializedPeer]int64),
}
}
@@ -364,14 +333,14 @@ func (ps *peerStore) PutLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error
return nil
}
func (ps *peerStore) DeleteLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error {
func (ps *store) DeleteLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error {
select {
case <-ps.closed:
panic("attempted to interact with stopped memory store")
default:
}
pk := newPeerKey(p)
pk := storage.NewSerializedPeer(p)
shard := ps.shards[ps.shardIndex(ih, p.IP.AddressFamily)]
shard.Lock()
@@ -397,22 +366,22 @@ func (ps *peerStore) DeleteLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) er
return nil
}
func (ps *peerStore) GraduateLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error {
func (ps *store) GraduateLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error {
select {
case <-ps.closed:
panic("attempted to interact with stopped memory store")
default:
}
pk := newPeerKey(p)
pk := storage.NewSerializedPeer(p)
shard := ps.shards[ps.shardIndex(ih, p.IP.AddressFamily)]
shard.Lock()
if _, ok := shard.swarms[ih]; !ok {
shard.swarms[ih] = swarm{
seeders: make(map[serializedPeer]int64),
leechers: make(map[serializedPeer]int64),
seeders: make(map[storage.SerializedPeer]int64),
leechers: make(map[storage.SerializedPeer]int64),
}
}
@@ -434,7 +403,7 @@ func (ps *peerStore) GraduateLeecher(ih bittorrent.InfoHash, p bittorrent.Peer)
return nil
}
func (ps *peerStore) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant int, announcer bittorrent.Peer) (peers []bittorrent.Peer, err error) {
func (ps *store) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant int, announcer bittorrent.Peer) (peers []bittorrent.Peer, err error) {
select {
case <-ps.closed:
panic("attempted to interact with stopped memory store")
@@ -457,7 +426,7 @@ func (ps *peerStore) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant
break
}
peers = append(peers, decodePeerKey(pk))
peers = append(peers, pk.ToPeer())
numWant--
}
} else {
@@ -468,14 +437,14 @@ func (ps *peerStore) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant
break
}
peers = append(peers, decodePeerKey(pk))
peers = append(peers, pk.ToPeer())
numWant--
}
// Append leechers until we reach numWant.
if numWant > 0 {
leechers := shard.swarms[ih].leechers
announcerPK := newPeerKey(announcer)
announcerPK := storage.NewSerializedPeer(announcer)
for pk := range leechers {
if pk == announcerPK {
continue
@@ -485,7 +454,7 @@ func (ps *peerStore) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant
break
}
peers = append(peers, decodePeerKey(pk))
peers = append(peers, pk.ToPeer())
numWant--
}
}
@@ -495,7 +464,7 @@ func (ps *peerStore) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant
return
}
func (ps *peerStore) ScrapeSwarm(ih bittorrent.InfoHash, addressFamily bittorrent.AddressFamily) (resp bittorrent.Scrape) {
func (ps *store) ScrapeSwarm(ih bittorrent.InfoHash, addressFamily bittorrent.AddressFamily) (resp bittorrent.Scrape) {
select {
case <-ps.closed:
panic("attempted to interact with stopped memory store")
@@ -519,12 +488,65 @@ func (ps *peerStore) ScrapeSwarm(ih bittorrent.InfoHash, addressFamily bittorren
return
}
func asKey(in interface{}) interface{} {
if in == nil {
panic("unable to use nil map key")
}
if reflect.TypeOf(in).Comparable() {
return in
}
//FIXME: dirty hack
return fmt.Sprint(in)
}
func (ps *store) Put(ctx string, key, value interface{}) {
m, _ := ps.contexts.LoadOrStore(ctx, new(sync.Map))
m.(*sync.Map).Store(asKey(key), value)
}
func (ps *store) Contains(ctx string, key interface{}) bool {
var exist bool
if m, found := ps.contexts.Load(ctx); found {
_, exist = m.(*sync.Map).Load(asKey(key))
}
return exist
}
func (ps *store) BulkPut(ctx string, pairs ...storage.Pair) {
if len(pairs) > 0 {
c, _ := ps.contexts.LoadOrStore(ctx, new(sync.Map))
m := c.(*sync.Map)
for _, p := range pairs {
m.Store(asKey(p.Left), p.Right)
}
}
}
func (ps *store) Load(ctx string, key interface{}) interface{} {
var v interface{}
if m, found := ps.contexts.Load(ctx); found {
v, _ = m.(*sync.Map).Load(asKey(key))
}
return v
}
func (ps *store) Delete(ctx string, keys ...interface{}) {
if len(keys) > 0 {
if m, found := ps.contexts.Load(ctx); found {
m := m.(*sync.Map)
for k := range keys {
m.Delete(asKey(k))
}
}
}
}
// collectGarbage deletes all Peers from the Storage which are older than the
// cutoff time.
//
// This function must be able to execute while other methods on this interface
// are being executed in parallel.
func (ps *peerStore) collectGarbage(cutoff time.Time) error {
func (ps *store) collectGarbage(cutoff time.Time) error {
select {
case <-ps.closed:
return nil
@@ -582,7 +604,7 @@ func (ps *peerStore) collectGarbage(cutoff time.Time) error {
return nil
}
func (ps *peerStore) Stop() stop.Result {
func (ps *store) Stop() stop.Result {
c := make(stop.Channel)
go func() {
close(ps.closed)
@@ -601,6 +623,6 @@ func (ps *peerStore) Stop() stop.Result {
return c.Result()
}
func (ps *peerStore) LogFields() log.Fields {
func (ps *store) LogFields() log.Fields {
return ps.cfg.LogFields()
}
+44
View File
@@ -0,0 +1,44 @@
package storage
import (
"encoding/binary"
"github.com/chihaya/chihaya/bittorrent"
"net"
)
type Pair struct {
Left, Right interface{}
}
type SerializedPeer string
func NewSerializedPeer(p bittorrent.Peer) SerializedPeer {
b := make([]byte, bittorrent.PeerIDLen+2+len(p.IP.IP))
copy(b[:bittorrent.PeerIDLen], p.ID[:])
binary.BigEndian.PutUint16(b[bittorrent.PeerIDLen:bittorrent.PeerIDLen+2], p.Port)
copy(b[bittorrent.PeerIDLen+2:], p.IP.IP)
return SerializedPeer(b)
}
func (pk SerializedPeer) ToPeer() bittorrent.Peer {
peerId, err := bittorrent.NewPeerID([]byte(pk[:bittorrent.PeerIDLen]))
if err != nil {
panic(err)
}
peer := bittorrent.Peer{
ID: peerId,
Port: binary.BigEndian.Uint16([]byte(pk[bittorrent.PeerIDLen : bittorrent.PeerIDLen+2])),
IP: bittorrent.IP{IP: net.IP(pk[bittorrent.PeerIDLen+2:])}}
if ip := peer.IP.To4(); ip != nil {
peer.IP.IP = ip
peer.IP.AddressFamily = bittorrent.IPv4
} else if len(peer.IP.IP) == net.IPv6len { // implies toReturn.IP.To4() == nil
peer.IP.AddressFamily = bittorrent.IPv6
} else {
panic("IP is neither v4 nor v6")
}
return peer
}
+2 -2
View File
@@ -27,10 +27,10 @@ func newRedisBackend(cfg *Config, u *redisURL, socketPath string) *redisBackend
ConnectTimeout: cfg.RedisConnectTimeout,
}
pool := rc.NewPool()
redsync := redsync.New([]redsync.Pool{pool})
rs := redsync.New([]redsync.Pool{pool})
return &redisBackend{
pool: pool,
redsync: redsync,
redsync: rs,
}
}
@@ -24,14 +24,13 @@
package redis
import (
"encoding/binary"
"net"
"fmt"
"strconv"
"sync"
"time"
"github.com/gomodule/redigo/redis"
yaml "gopkg.in/yaml.v2"
"gopkg.in/yaml.v2"
"github.com/chihaya/chihaya/bittorrent"
"github.com/chihaya/chihaya/pkg/log"
@@ -61,7 +60,7 @@ func init() {
type driver struct{}
func (d driver) NewPeerStore(icfg interface{}) (storage.Storage, error) {
func (d driver) NewStorage(icfg interface{}) (storage.Storage, error) {
// Marshal the config back into bytes.
bytes, err := yaml.Marshal(icfg)
if err != nil {
@@ -185,7 +184,7 @@ func New(provided Config) (storage.Storage, error) {
return nil, err
}
ps := &peerStore{
ps := &store{
cfg: cfg,
rb: newRedisBackend(&provided, u, ""),
closed: make(chan struct{}),
@@ -234,41 +233,7 @@ func New(provided Config) (storage.Storage, error) {
return ps, nil
}
type serializedPeer string
func newPeerKey(p bittorrent.Peer) serializedPeer {
b := make([]byte, bittorrent.PeerIDLen+2+len(p.IP.IP))
copy(b[:bittorrent.PeerIDLen], p.ID[:])
binary.BigEndian.PutUint16(b[bittorrent.PeerIDLen:bittorrent.PeerIDLen+2], p.Port)
copy(b[bittorrent.PeerIDLen+2:], p.IP.IP)
return serializedPeer(b)
}
// TODO: move duplicated code into one place
func decodePeerKey(pk serializedPeer) bittorrent.Peer {
peerId, err := bittorrent.NewPeerID([]byte(pk[:bittorrent.PeerIDLen]))
if err != nil {
panic(err)
}
peer := bittorrent.Peer{
ID: peerId,
Port: binary.BigEndian.Uint16([]byte(pk[bittorrent.PeerIDLen : bittorrent.PeerIDLen+2])),
IP: bittorrent.IP{IP: net.IP(pk[bittorrent.PeerIDLen+2:])}}
if ip := peer.IP.To4(); ip != nil {
peer.IP.IP = ip
peer.IP.AddressFamily = bittorrent.IPv4
} else if len(peer.IP.IP) == net.IPv6len { // implies toReturn.IP.To4() == nil
peer.IP.AddressFamily = bittorrent.IPv6
} else {
panic("IP is neither v4 nor v6")
}
return peer
}
type peerStore struct {
type store struct {
cfg Config
rb *redisBackend
@@ -276,31 +241,31 @@ type peerStore struct {
wg sync.WaitGroup
}
func (ps *peerStore) groups() []string {
func (ps *store) groups() []string {
return []string{bittorrent.IPv4.String(), bittorrent.IPv6.String()}
}
func (ps *peerStore) leecherInfohashKey(af, ih string) string {
func (ps *store) leecherInfohashKey(af, ih string) string {
return af + "_L_" + ih
}
func (ps *peerStore) seederInfohashKey(af, ih string) string {
func (ps *store) seederInfohashKey(af, ih string) string {
return af + "_S_" + ih
}
func (ps *peerStore) infohashCountKey(af string) string {
func (ps *store) infohashCountKey(af string) string {
return af + "_infohash_count"
}
func (ps *peerStore) seederCountKey(af string) string {
func (ps *store) seederCountKey(af string) string {
return af + "_S_count"
}
func (ps *peerStore) leecherCountKey(af string) string {
func (ps *store) leecherCountKey(af string) string {
return af + "_L_count"
}
func (ps *peerStore) getConnection() redis.Conn {
func (ps *store) getConnection() redis.Conn {
select {
case <-ps.closed:
panic("attempted to interact with stopped redis store")
@@ -309,13 +274,21 @@ func (ps *peerStore) getConnection() redis.Conn {
return ps.rb.open()
}
func closeConnection(con redis.Conn) {
if con != nil {
if err := con.Close(); err != nil {
log.Err(err)
}
}
}
// populateProm aggregates metrics over all groups and then posts them to
// prometheus.
func (ps *peerStore) populateProm() {
func (ps *store) populateProm() {
var numInfohashes, numSeeders, numLeechers int64
conn := ps.getConnection()
defer conn.Close()
defer closeConnection(conn)
for _, group := range ps.groups() {
if n, err := redis.Int64(conn.Do("GET", ps.infohashCountKey(group))); err != nil && err != redis.ErrNil {
@@ -349,27 +322,27 @@ func (ps *peerStore) populateProm() {
storage.PromLeechersCount.Set(float64(numLeechers))
}
func (ps *peerStore) getClock() int64 {
func (ps *store) getClock() int64 {
return timecache.NowUnixNano()
}
func (ps *peerStore) PutSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) error {
func (ps *store) PutSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) error {
addressFamily := p.IP.AddressFamily.String()
log.Debug("storage: PutSeeder", log.Fields{
"InfoHash": ih.String(),
"Peer": p,
})
pk := newPeerKey(p)
pk := storage.NewSerializedPeer(p)
encodedSeederInfoHash := ps.seederInfohashKey(addressFamily, ih.String())
ct := ps.getClock()
conn := ps.getConnection()
defer conn.Close()
defer closeConnection(conn)
conn.Send("MULTI")
conn.Send("HSET", encodedSeederInfoHash, pk, ct)
conn.Send("HSET", addressFamily, encodedSeederInfoHash, ct)
_ = conn.Send("MULTI")
_ = conn.Send("HSET", encodedSeederInfoHash, pk, ct)
_ = conn.Send("HSET", addressFamily, encodedSeederInfoHash, ct)
reply, err := redis.Int64s(conn.Do("EXEC"))
if err != nil {
return err
@@ -393,17 +366,17 @@ func (ps *peerStore) PutSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) error
return nil
}
func (ps *peerStore) DeleteSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) error {
func (ps *store) DeleteSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) error {
addressFamily := p.IP.AddressFamily.String()
log.Debug("storage: DeleteSeeder", log.Fields{
"InfoHash": ih.String(),
"Peer": p,
})
pk := newPeerKey(p)
pk := storage.NewSerializedPeer(p)
conn := ps.getConnection()
defer conn.Close()
defer closeConnection(conn)
encodedSeederInfoHash := ps.seederInfohashKey(addressFamily, ih.String())
@@ -421,7 +394,7 @@ func (ps *peerStore) DeleteSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) err
return nil
}
func (ps *peerStore) PutLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error {
func (ps *store) PutLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error {
addressFamily := p.IP.AddressFamily.String()
log.Debug("storage: PutLeecher", log.Fields{
"InfoHash": ih.String(),
@@ -430,15 +403,15 @@ func (ps *peerStore) PutLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error
// Update the peer in the swarm.
encodedLeecherInfoHash := ps.leecherInfohashKey(addressFamily, ih.String())
pk := newPeerKey(p)
pk := storage.NewSerializedPeer(p)
ct := ps.getClock()
conn := ps.getConnection()
defer conn.Close()
defer closeConnection(conn)
conn.Send("MULTI")
conn.Send("HSET", encodedLeecherInfoHash, pk, ct)
conn.Send("HSET", addressFamily, encodedLeecherInfoHash, ct)
_ = conn.Send("MULTI")
_ = conn.Send("HSET", encodedLeecherInfoHash, pk, ct)
_ = conn.Send("HSET", addressFamily, encodedLeecherInfoHash, ct)
reply, err := redis.Int64s(conn.Do("EXEC"))
if err != nil {
return err
@@ -453,7 +426,7 @@ func (ps *peerStore) PutLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error
return nil
}
func (ps *peerStore) DeleteLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error {
func (ps *store) DeleteLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error {
addressFamily := p.IP.AddressFamily.String()
log.Debug("storage: DeleteLeecher", log.Fields{
"InfoHash": ih.String(),
@@ -461,9 +434,9 @@ func (ps *peerStore) DeleteLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) er
})
conn := ps.getConnection()
defer conn.Close()
defer closeConnection(conn)
pk := newPeerKey(p)
pk := storage.NewSerializedPeer(p)
encodedLeecherInfoHash := ps.leecherInfohashKey(addressFamily, ih.String())
delNum, err := redis.Int64(conn.Do("HDEL", encodedLeecherInfoHash, pk))
@@ -478,7 +451,7 @@ func (ps *peerStore) DeleteLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) er
return nil
}
func (ps *peerStore) GraduateLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error {
func (ps *store) GraduateLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error {
addressFamily := p.IP.AddressFamily.String()
log.Debug("storage: GraduateLeecher", log.Fields{
"InfoHash": ih.String(),
@@ -488,16 +461,16 @@ func (ps *peerStore) GraduateLeecher(ih bittorrent.InfoHash, p bittorrent.Peer)
encodedInfoHash := ih.String()
encodedLeecherInfoHash := ps.leecherInfohashKey(addressFamily, encodedInfoHash)
encodedSeederInfoHash := ps.seederInfohashKey(addressFamily, encodedInfoHash)
pk := newPeerKey(p)
pk := storage.NewSerializedPeer(p)
ct := ps.getClock()
conn := ps.getConnection()
defer conn.Close()
defer closeConnection(conn)
conn.Send("MULTI")
conn.Send("HDEL", encodedLeecherInfoHash, pk)
conn.Send("HSET", encodedSeederInfoHash, pk, ct)
conn.Send("HSET", addressFamily, encodedSeederInfoHash, ct)
_ = conn.Send("MULTI")
_ = conn.Send("HDEL", encodedLeecherInfoHash, pk)
_ = conn.Send("HSET", encodedSeederInfoHash, pk, ct)
_ = conn.Send("HSET", addressFamily, encodedSeederInfoHash, ct)
reply, err := redis.Int64s(conn.Do("EXEC"))
if err != nil {
return err
@@ -524,7 +497,7 @@ func (ps *peerStore) GraduateLeecher(ih bittorrent.InfoHash, p bittorrent.Peer)
return nil
}
func (ps *peerStore) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant int, announcer bittorrent.Peer) (peers []bittorrent.Peer, err error) {
func (ps *store) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant int, announcer bittorrent.Peer) (peers []bittorrent.Peer, err error) {
addressFamily := announcer.IP.AddressFamily.String()
log.Debug("storage: AnnouncePeers", log.Fields{
"InfoHash": ih.String(),
@@ -538,7 +511,7 @@ func (ps *peerStore) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant
encodedSeederInfoHash := ps.seederInfohashKey(addressFamily, encodedInfoHash)
conn := ps.getConnection()
defer conn.Close()
defer closeConnection(conn)
leechers, err := conn.Do("HKEYS", encodedLeecherInfoHash)
if err != nil {
@@ -563,7 +536,7 @@ func (ps *peerStore) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant
break
}
peers = append(peers, decodePeerKey(serializedPeer(pk.([]byte))))
peers = append(peers, storage.SerializedPeer(pk.([]byte)).ToPeer())
numWant--
}
} else {
@@ -573,13 +546,13 @@ func (ps *peerStore) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant
break
}
peers = append(peers, decodePeerKey(serializedPeer(pk.([]byte))))
peers = append(peers, storage.SerializedPeer(pk.([]byte)).ToPeer())
numWant--
}
// Append leechers until we reach numWant.
if numWant > 0 {
announcerPK := newPeerKey(announcer)
announcerPK := storage.NewSerializedPeer(announcer)
for _, pk := range conLeechers {
if pk == announcerPK {
continue
@@ -589,7 +562,7 @@ func (ps *peerStore) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant
break
}
peers = append(peers, decodePeerKey(serializedPeer(pk.([]byte))))
peers = append(peers, storage.SerializedPeer(pk.([]byte)).ToPeer())
numWant--
}
}
@@ -598,7 +571,7 @@ func (ps *peerStore) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant
return
}
func (ps *peerStore) ScrapeSwarm(ih bittorrent.InfoHash, af bittorrent.AddressFamily) (resp bittorrent.Scrape) {
func (ps *store) ScrapeSwarm(ih bittorrent.InfoHash, af bittorrent.AddressFamily) (resp bittorrent.Scrape) {
resp.InfoHash = ih
addressFamily := af.String()
@@ -607,7 +580,7 @@ func (ps *peerStore) ScrapeSwarm(ih bittorrent.InfoHash, af bittorrent.AddressFa
encodedSeederInfoHash := ps.seederInfohashKey(addressFamily, encodedInfoHash)
conn := ps.getConnection()
defer conn.Close()
defer closeConnection(conn)
leechersLen, err := redis.Int64(conn.Do("HLEN", encodedLeecherInfoHash))
if err != nil {
@@ -633,6 +606,62 @@ func (ps *peerStore) ScrapeSwarm(ih bittorrent.InfoHash, af bittorrent.AddressFa
return
}
func (ps *store) Put(ctx string, key, value interface{}) {
conn := ps.getConnection()
defer closeConnection(conn)
_ = conn.Send("HSET", ctx, key, value)
}
func (ps *store) Contains(ctx string, key interface{}) bool {
conn := ps.getConnection()
defer closeConnection(conn)
exist, _ := redis.Bool(conn.Do("HEXISTS", ctx, key))
return exist
}
func (ps *store) BulkPut(ctx string, pairs ...storage.Pair) {
switch l := len(pairs); l {
case 0:
break
case 1:
ps.Put(ctx, fmt.Sprint(pairs[0].Left), pairs[0].Right)
default:
conn := ps.getConnection()
defer closeConnection(conn)
args := make([]interface{}, 1, l*2+1)
args[0] = ctx
for _, p := range pairs {
args = append(args, p.Left, p.Right)
}
_ = conn.Send("HSET", args...)
}
}
func (ps *store) Load(ctx string, key interface{}) interface{} {
conn := ps.getConnection()
defer closeConnection(conn)
v, _ := conn.Do("HGET", ctx, key)
return v
}
func (ps *store) Delete(ctx string, keys ...interface{}) {
switch l := len(keys); l {
case 0:
break
case 1:
conn := ps.getConnection()
defer closeConnection(conn)
_ = conn.Send("HDEL", ctx, keys[0])
default:
conn := ps.getConnection()
defer closeConnection(conn)
args := make([]interface{}, 1, l+1)
args[0] = ctx
args = append(args, keys...)
_ = conn.Send("HDEL", args...)
}
}
// collectGarbage deletes all Peers from the Storage which are older than the
// cutoff time.
//
@@ -678,12 +707,12 @@ func (ps *peerStore) ScrapeSwarm(ih bittorrent.InfoHash, af bittorrent.AddressFa
// - If the change happens after the HLEN, we will not even attempt to make the
// transaction. The infohash key will remain in the addressFamil hash and
// we'll attempt to clean it up the next time collectGarbage runs.
func (ps *peerStore) collectGarbage(cutoff time.Time) error {
func (ps *store) collectGarbage(cutoff time.Time) error {
cutoffUnix := cutoff.UnixNano()
start := time.Now()
conn := ps.getConnection()
defer conn.Close()
defer closeConnection(conn)
for _, group := range ps.groups() {
// list all infohashes in the group
@@ -701,7 +730,7 @@ func (ps *peerStore) collectGarbage(cutoff time.Time) error {
return err
}
var pk serializedPeer
var pk storage.SerializedPeer
var removedPeerCount int64
for index, ihField := range ihList {
if index%2 == 1 { // value
@@ -711,7 +740,7 @@ func (ps *peerStore) collectGarbage(cutoff time.Time) error {
}
if mtime <= cutoffUnix {
log.Debug("storage: deleting peer", log.Fields{
"Peer": decodePeerKey(pk).String(),
"Peer": pk.ToPeer(),
})
ret, err := redis.Int64(conn.Do("HDEL", ihStr, pk))
if err != nil {
@@ -721,7 +750,7 @@ func (ps *peerStore) collectGarbage(cutoff time.Time) error {
removedPeerCount += ret
}
} else { // key
pk = serializedPeer([]byte(ihField))
pk = storage.SerializedPeer(ihField)
}
}
// DECR seeder/leecher counter
@@ -750,10 +779,10 @@ func (ps *peerStore) collectGarbage(cutoff time.Time) error {
// in other words, it's removed automatically after `HDEL` the last field.
//_, err := conn.Do("DEL", ihStr)
conn.Send("MULTI")
conn.Send("HDEL", group, ihStr)
_ = conn.Send("MULTI")
_ = conn.Send("HDEL", group, ihStr)
if isSeeder {
conn.Send("DECR", ps.infohashCountKey(group))
_ = conn.Send("DECR", ps.infohashCountKey(group))
}
_, err = redis.Values(conn.Do("EXEC"))
if err != nil && err != redis.ErrNil {
@@ -778,7 +807,7 @@ func (ps *peerStore) collectGarbage(cutoff time.Time) error {
return nil
}
func (ps *peerStore) Stop() stop.Result {
func (ps *store) Stop() stop.Result {
c := make(stop.Channel)
go func() {
close(ps.closed)
@@ -790,6 +819,6 @@ func (ps *peerStore) Stop() stop.Result {
return c.Result()
}
func (ps *peerStore) LogFields() log.Fields {
func (ps *store) LogFields() log.Fields {
return ps.cfg.LogFields()
}
+17 -7
View File
@@ -16,7 +16,7 @@ var (
// Driver is the interface used to initialize a new type of Storage.
type Driver interface {
NewPeerStore(cfg interface{}) (Storage, error)
NewStorage(cfg interface{}) (Storage, error)
}
// ErrResourceDoesNotExist is the error returned by all delete methods and the
@@ -104,13 +104,23 @@ type Storage interface {
// If the Swarm does not exist, an empty Scrape and no error is returned.
ScrapeSwarm(infoHash bittorrent.InfoHash, addressFamily bittorrent.AddressFamily) bittorrent.Scrape
/*TODO: implement this*/
Put(key interface{}, value interface{})
// Put used to place arbitrary k-v data with specified context
// into storage. ctx parameter used to group data
// (i.e. data only for specific middleware module)
Put(ctx string, key, value interface{})
Load(key interface{}) interface{}
// BulkPut used to place array of k-v data in specified context.
// Useful when several data entries should be added in single transaction/connection
BulkPut(ctx string, pairs ...Pair)
Delete(key interface{})
// Contains checks if any data in specified context exist
Contains(ctx string, key interface{}) bool
// Load used to get arbitrary data in specified context by its key
Load(ctx string, key interface{}) interface{}
// Delete used to delete arbitrary data in specified context by its keys
Delete(ctx string, keys ...interface{})
// stop.Stopper is an interface that expects a Stop method to stop the
// Storage.
@@ -158,5 +168,5 @@ func NewStorage(name string, cfg interface{}) (ps Storage, err error) {
return nil, ErrDriverDoesNotExist
}
return d.NewPeerStore(cfg)
return d.NewStorage(cfg)
}
+3 -3
View File
@@ -17,7 +17,7 @@ type benchData struct {
func generateInfohashes() (a [1000]bittorrent.InfoHash) {
for i := range a {
b := make([]byte, 20)
b := make([]byte, bittorrent.InfoHashV1Len)
rand.Read(b)
a[i], _ = bittorrent.NewInfoHash(b)
}
@@ -33,9 +33,9 @@ func generatePeers() (a [1000]bittorrent.Peer) {
if err != nil || n != 4 {
panic("unable to create random bytes")
}
id := [20]byte{}
id := [bittorrent.PeerIDLen]byte{}
n, err = r.Read(id[:])
if err != nil || n != 20 {
if err != nil || n != bittorrent.InfoHashV1Len {
panic("unable to create random bytes")
}
port := uint16(r.Uint32())