Gegin work on shared store for handlers

This commit is contained in:
Širhoe Biazhkovič
2021-11-19 19:25:44 +03:00
committed by Lawrence, Rendall
parent af1cbc543c
commit 566d99fcd7
28 changed files with 271 additions and 294 deletions

View File

@@ -12,20 +12,23 @@ import (
"github.com/chihaya/chihaya/pkg/log" "github.com/chihaya/chihaya/pkg/log"
) )
// PeerID represents a peer ID. const PeerIDLen = 20
type PeerID [20]byte
// PeerIDFromBytes creates a PeerID from a byte slice. // PeerID represents a peer ID.
type PeerID [PeerIDLen]byte
var InvalidPeerIDSizeError = errors.New("peer ID must be 20 bytes")
// NewPeerID creates a PeerID from a byte slice.
// //
// It panics if b is not 20 bytes long. // It panics if b is not 20 bytes long.
func PeerIDFromBytes(b []byte) PeerID { func NewPeerID(b []byte) (PeerID, error) {
if len(b) != 20 { var p PeerID
panic("peer ID must be 20 bytes") if len(b) != PeerIDLen {
return p, InvalidPeerIDSizeError
} }
copy(p[:], b)
var buf [20]byte return p, nil
copy(buf[:], b)
return buf
} }
// String implements fmt.Stringer, returning the base16 encoded PeerID. // String implements fmt.Stringer, returning the base16 encoded PeerID.
@@ -38,36 +41,22 @@ func (p PeerID) RawString() string {
return string(p[:]) return string(p[:])
} }
// PeerIDFromString creates a PeerID from a string.
//
// It panics if s is not 20 bytes long.
func PeerIDFromString(s string) PeerID {
if len(s) != 20 {
panic("peer ID must be 20 bytes")
}
var buf [20]byte
copy(buf[:], s)
return buf
}
// InfoHash represents an infohash. // InfoHash represents an infohash.
type InfoHash []byte type InfoHash string
const ( const (
InfoHashV1Len = 20 InfoHashV1Len = 20
InfoHashV2Len = 32 InfoHashV2Len = 32
NoneInfoHash InfoHash = ""
) )
var invalidHashSize = errors.New("InfoHash must be either 20 (for torrent V1) or 32 (V2) bytes") var InvalidHashSizeError = errors.New("info hash must be either 20 (for torrent V1) or 32 (V2) bytes")
// BytesV1 returns 20-bytes length array of the corresponding InfoHash. // TruncateV1 returns truncated to 20-bytes length array of the corresponding InfoHash.
// If InfoHash is V2 (32 bytes), it will be truncated to 20 bytes // If InfoHash is V2 (32 bytes), it will be truncated to 20 bytes
// according to BEP52. // according to BEP52.
func (i InfoHash) BytesV1() [InfoHashV1Len]byte{ func (i InfoHash) TruncateV1() InfoHash {
var bb [InfoHashV1Len]byte return i[:InfoHashV1Len]
copy(bb[:], i)
return bb
} }
// ValidateInfoHash validates input bytes size and returns it // ValidateInfoHash validates input bytes size and returns it
@@ -76,27 +65,21 @@ func (i InfoHash) BytesV1() [InfoHashV1Len]byte{
func ValidateInfoHash(b []byte) (int, error) { func ValidateInfoHash(b []byte) (int, error) {
l := len(b) l := len(b)
if l != InfoHashV1Len && l != InfoHashV2Len { if l != InfoHashV1Len && l != InfoHashV2Len {
return 0, invalidHashSize return 0, InvalidHashSizeError
} }
return l, nil return l, nil
} }
// InfoHashFromBytes creates an InfoHash from a byte slice. // NewInfoHash creates an InfoHash from a byte slice.
func InfoHashFromBytes(b []byte) (InfoHash, error) { func NewInfoHash(b []byte) (InfoHash, error) {
if l, err := ValidateInfoHash(b); err != nil{ if _, err := ValidateInfoHash(b); err != nil {
return nil, err return NoneInfoHash, err
} else { } else {
buf := make([]byte, l) buf := InfoHash(b)
copy(buf[:], b)
return buf, nil return buf, nil
} }
} }
// InfoHashFromString creates an InfoHash from a string.
func InfoHashFromString(s string) (InfoHash, error) {
return InfoHashFromBytes([]byte(s))
}
// String implements fmt.Stringer, returning the base16 encoded InfoHash. // String implements fmt.Stringer, returning the base16 encoded InfoHash.
func (i InfoHash) String() string { func (i InfoHash) String() string {
return fmt.Sprintf("%x", i[:]) return fmt.Sprintf("%x", i[:])

View File

@@ -19,7 +19,7 @@ var peerStringTestCases = []struct {
}{ }{
{ {
input: Peer{ input: Peer{
ID: PeerIDFromBytes(b), ID: NewPeerID(b),
IP: IP{net.IPv4(10, 11, 12, 1), IPv4}, IP: IP{net.IPv4(10, 11, 12, 1), IPv4},
Port: 1234, Port: 1234,
}, },
@@ -27,7 +27,7 @@ var peerStringTestCases = []struct {
}, },
{ {
input: Peer{ input: Peer{
ID: PeerIDFromBytes(b), ID: NewPeerID(b),
IP: IP{net.ParseIP("2001:db8::ff00:42:8329"), IPv6}, IP: IP{net.ParseIP("2001:db8::ff00:42:8329"), IPv6},
Port: 1234, Port: 1234,
}, },
@@ -36,12 +36,12 @@ var peerStringTestCases = []struct {
} }
func TestPeerID_String(t *testing.T) { func TestPeerID_String(t *testing.T) {
s := PeerIDFromBytes(b).String() s := NewPeerID(b).String()
require.Equal(t, expected, s) require.Equal(t, expected, s)
} }
func TestInfoHash_String(t *testing.T) { func TestInfoHash_String(t *testing.T) {
ih, err := InfoHashFromBytes(b) ih, err := NewInfoHash(b)
require.Nil(t, err) require.Nil(t, err)
require.Equal(t, expected, ih.String()) require.Equal(t, expected, ih.String())
} }

View File

@@ -57,17 +57,17 @@ func EndToEndRunCmdFunc(cmd *cobra.Command, args []string) error {
func generateInfohash() bittorrent.InfoHash { func generateInfohash() bittorrent.InfoHash {
b := make([]byte, 20) b := make([]byte, 20)
rand.Read(b) rand.Read(b)
ih, _ := bittorrent.InfoHashFromBytes(b) ih, _ := bittorrent.NewInfoHash(b)
return ih return ih
} }
func test(addr string, delay time.Duration) error { func test(addr string, delay time.Duration) error {
ih, _ := generateInfohash().BytesV1() ih := generateInfohash().TruncateV1()
return testWithInfohash(ih, addr, delay) return testWithInfohash(ih, addr, delay)
} }
func testWithInfohash(infoHash [20]byte, url string, delay time.Duration) error { func testWithInfohash(infoHash bittorrent.InfoHash, url string, delay time.Duration) error {
var ih [20]byte var ih [bittorrent.InfoHashV1Len]byte
req := tracker.AnnounceRequest{ req := tracker.AnnounceRequest{
InfoHash: ih, InfoHash: ih,
PeerId: [20]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20}, PeerId: [20]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20},
@@ -95,8 +95,9 @@ func testWithInfohash(infoHash [20]byte, url string, delay time.Duration) error
time.Sleep(delay) time.Sleep(delay)
copy(ih[:], infoHash)
req = tracker.AnnounceRequest{ req = tracker.AnnounceRequest{
InfoHash: infoHash, InfoHash: ih,
PeerId: [20]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 21}, PeerId: [20]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 21},
Downloaded: 50, Downloaded: 50,
Left: 100, Left: 100,

View File

@@ -24,7 +24,7 @@ import (
// Run represents the state of a running instance of Chihaya. // Run represents the state of a running instance of Chihaya.
type Run struct { type Run struct {
configFilePath string configFilePath string
peerStore storage.PeerStore storage storage.Storage
logic *middleware.Logic logic *middleware.Logic
sg *stop.Group sg *stop.Group
} }
@@ -41,7 +41,7 @@ func NewRun(configFilePath string) (*Run, error) {
// Start begins an instance of Chihaya. // Start begins an instance of Chihaya.
// It is optional to provide an instance of the peer store to avoid the // It is optional to provide an instance of the peer store to avoid the
// creation of a new one. // creation of a new one.
func (r *Run) Start(ps storage.PeerStore) error { func (r *Run) Start(ps storage.Storage) error {
configFile, err := ParseConfigFile(r.configFilePath) configFile, err := ParseConfigFile(r.configFilePath)
if err != nil { if err != nil {
return errors.New("failed to read config: " + err.Error()) return errors.New("failed to read config: " + err.Error())
@@ -59,19 +59,19 @@ func (r *Run) Start(ps storage.PeerStore) error {
if ps == nil { if ps == nil {
log.Info("starting storage", log.Fields{"name": cfg.Storage.Name}) log.Info("starting storage", log.Fields{"name": cfg.Storage.Name})
ps, err = storage.NewPeerStore(cfg.Storage.Name, cfg.Storage.Config) ps, err = storage.NewStorage(cfg.Storage.Name, cfg.Storage.Config)
if err != nil { if err != nil {
return errors.New("failed to create storage: " + err.Error()) return errors.New("failed to create storage: " + err.Error())
} }
log.Info("started storage", ps) log.Info("started storage", ps)
} }
r.peerStore = ps r.storage = ps
preHooks, err := middleware.HooksFromHookConfigs(cfg.PreHooks) preHooks, err := middleware.HooksFromHookConfigs(cfg.PreHooks, r.storage)
if err != nil { if err != nil {
return errors.New("failed to validate hook config: " + err.Error()) return errors.New("failed to validate hook config: " + err.Error())
} }
postHooks, err := middleware.HooksFromHookConfigs(cfg.PostHooks) postHooks, err := middleware.HooksFromHookConfigs(cfg.PostHooks, r.storage)
if err != nil { if err != nil {
return errors.New("failed to validate hook config: " + err.Error()) return errors.New("failed to validate hook config: " + err.Error())
} }
@@ -80,7 +80,7 @@ func (r *Run) Start(ps storage.PeerStore) error {
"prehooks": cfg.PreHookNames(), "prehooks": cfg.PreHookNames(),
"posthooks": cfg.PostHookNames(), "posthooks": cfg.PostHookNames(),
}) })
r.logic = middleware.NewLogic(cfg.ResponseConfig, r.peerStore, preHooks, postHooks) r.logic = middleware.NewLogic(cfg.ResponseConfig, r.storage, preHooks, postHooks)
if cfg.HTTPConfig.Addr != "" { if cfg.HTTPConfig.Addr != "" {
log.Info("starting HTTP frontend", cfg.HTTPConfig) log.Info("starting HTTP frontend", cfg.HTTPConfig)
@@ -113,7 +113,7 @@ func combineErrors(prefix string, errs []error) error {
} }
// Stop shuts down an instance of Chihaya. // Stop shuts down an instance of Chihaya.
func (r *Run) Stop(keepPeerStore bool) (storage.PeerStore, error) { func (r *Run) Stop(keepPeerStore bool) (storage.Storage, error) {
log.Debug("stopping frontends and metrics server") log.Debug("stopping frontends and metrics server")
if errs := r.sg.Stop().Wait(); len(errs) != 0 { if errs := r.sg.Stop().Wait(); len(errs) != 0 {
return nil, combineErrors("failed while shutting down frontends", errs) return nil, combineErrors("failed while shutting down frontends", errs)
@@ -126,13 +126,13 @@ func (r *Run) Stop(keepPeerStore bool) (storage.PeerStore, error) {
if !keepPeerStore { if !keepPeerStore {
log.Debug("stopping peer store") log.Debug("stopping peer store")
if errs := r.peerStore.Stop().Wait(); len(errs) != 0 { if errs := r.storage.Stop().Wait(); len(errs) != 0 {
return nil, combineErrors("failed while shutting down peer store", errs) return nil, combineErrors("failed while shutting down peer store", errs)
} }
r.peerStore = nil r.storage = nil
} }
return r.peerStore, nil return r.storage, nil
} }
// RootRunCmdFunc implements a Cobra command that runs an instance of Chihaya // RootRunCmdFunc implements a Cobra command that runs an instance of Chihaya

View File

@@ -67,10 +67,10 @@ func ParseAnnounce(r *http.Request, opts ParseOptions) (*bittorrent.AnnounceRequ
if !ok { if !ok {
return nil, bittorrent.ClientError("failed to parse parameter: peer_id") return nil, bittorrent.ClientError("failed to parse parameter: peer_id")
} }
if len(peerID) != 20 { if len(peerID) != bittorrent.PeerIDLen {
return nil, bittorrent.ClientError("failed to provide valid peer_id") return nil, bittorrent.ClientError("failed to provide valid peer_id")
} }
request.Peer.ID = bittorrent.PeerIDFromString(peerID) request.Peer.ID = bittorrent.NewPeerID([]byte(peerID))
// Determine the number of remaining bytes for the client. // Determine the number of remaining bytes for the client.
request.Left, err = qp.Uint64("left") request.Left, err = qp.Uint64("left")

View File

@@ -10,7 +10,7 @@ import (
) )
func TestStartStopRaceIssue437(t *testing.T) { func TestStartStopRaceIssue437(t *testing.T) {
ps, err := storage.NewPeerStore("memory", nil) ps, err := storage.NewStorage("memory", nil)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@@ -112,7 +112,7 @@ func ParseAnnounce(r Request, v6Action bool, opts ParseOptions) (*bittorrent.Ann
return nil, err return nil, err
} }
ih, err := bittorrent.InfoHashFromBytes(infohash) ih, err := bittorrent.NewInfoHash(infohash)
if err != nil{ if err != nil{
return nil, err return nil, err
} }
@@ -128,7 +128,7 @@ func ParseAnnounce(r Request, v6Action bool, opts ParseOptions) (*bittorrent.Ann
NumWantProvided: true, NumWantProvided: true,
EventProvided: true, EventProvided: true,
Peer: bittorrent.Peer{ Peer: bittorrent.Peer{
ID: bittorrent.PeerIDFromBytes(peerID), ID: bittorrent.NewPeerID(peerID),
IP: bittorrent.IP{IP: ip}, IP: bittorrent.IP{IP: ip},
Port: port, Port: port,
}, },
@@ -227,7 +227,7 @@ func ParseScrape(r Request, opts ParseOptions) (*bittorrent.ScrapeRequest, error
pageSize = bittorrent.InfoHashV2Len pageSize = bittorrent.InfoHashV2Len
} }
for len(r.Packet) >= pageSize { for len(r.Packet) >= pageSize {
if ih, err := bittorrent.InfoHashFromBytes(r.Packet[:pageSize]); err != nil{ if ih, err := bittorrent.NewInfoHash(r.Packet[:pageSize]); err != nil{
return nil, err return nil, err
} else { } else {
infohashes = append(infohashes, ih) infohashes = append(infohashes, ih)

View File

@@ -47,7 +47,7 @@ func TestClientID(t *testing.T) {
t.Run(tt.peerID, func(t *testing.T) { t.Run(tt.peerID, func(t *testing.T) {
var clientID ClientID var clientID ClientID
copy(clientID[:], tt.clientID) copy(clientID[:], tt.clientID)
parsedID := NewClientID(bittorrent.PeerIDFromString(tt.peerID)) parsedID := NewClientID(bittorrent.NewPeerID([]byte(tt.peerID)))
if parsedID != clientID { if parsedID != clientID {
t.Error("Incorrectly parsed peer ID", tt.peerID, "as", parsedID) t.Error("Incorrectly parsed peer ID", tt.peerID, "as", parsedID)
} }

View File

@@ -6,6 +6,7 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"github.com/chihaya/chihaya/storage"
"gopkg.in/yaml.v2" "gopkg.in/yaml.v2"
"github.com/chihaya/chihaya/bittorrent" "github.com/chihaya/chihaya/bittorrent"
@@ -23,7 +24,7 @@ var _ middleware.Driver = driver{}
type driver struct{} type driver struct{}
func (d driver) NewHook(optionBytes []byte) (middleware.Hook, error) { func (d driver) NewHook(optionBytes []byte, _ storage.Storage) (middleware.Hook, error) {
var cfg Config var cfg Config
err := yaml.Unmarshal(optionBytes, &cfg) err := yaml.Unmarshal(optionBytes, &cfg)
if err != nil { if err != nil {
@@ -100,7 +101,7 @@ func (h *hook) HandleAnnounce(ctx context.Context, req *bittorrent.AnnounceReque
return ctx, nil return ctx, nil
} }
func (h *hook) HandleScrape(ctx context.Context, req *bittorrent.ScrapeRequest, resp *bittorrent.ScrapeResponse) (context.Context, error) { func (h *hook) HandleScrape(ctx context.Context, _ *bittorrent.ScrapeRequest, _ *bittorrent.ScrapeResponse) (context.Context, error) {
// Scrapes don't require any protection. // Scrapes don't require any protection.
return ctx, nil return ctx, nil
} }

View File

@@ -59,7 +59,7 @@ func TestHandleAnnounce(t *testing.T) {
req := &bittorrent.AnnounceRequest{} req := &bittorrent.AnnounceRequest{}
resp := &bittorrent.AnnounceResponse{} resp := &bittorrent.AnnounceResponse{}
peerid := bittorrent.PeerIDFromString(tt.peerID) peerid := bittorrent.NewPeerID([]byte(tt.peerID))
req.Peer.ID = peerid req.Peer.ID = peerid

View File

@@ -25,7 +25,7 @@ type skipSwarmInteraction struct{}
var SkipSwarmInteractionKey = skipSwarmInteraction{} var SkipSwarmInteractionKey = skipSwarmInteraction{}
type swarmInteractionHook struct { type swarmInteractionHook struct {
store storage.PeerStore store storage.Storage
} }
func (h *swarmInteractionHook) HandleAnnounce(ctx context.Context, req *bittorrent.AnnounceRequest, resp *bittorrent.AnnounceResponse) (_ context.Context, err error) { func (h *swarmInteractionHook) HandleAnnounce(ctx context.Context, req *bittorrent.AnnounceRequest, resp *bittorrent.AnnounceResponse) (_ context.Context, err error) {
@@ -85,7 +85,7 @@ type scrapeAddressType struct{}
var ScrapeIsIPv6Key = scrapeAddressType{} var ScrapeIsIPv6Key = scrapeAddressType{}
type responseHook struct { type responseHook struct {
store storage.PeerStore store storage.Storage
} }
func (h *responseHook) HandleAnnounce(ctx context.Context, req *bittorrent.AnnounceRequest, resp *bittorrent.AnnounceResponse) (_ context.Context, err error) { func (h *responseHook) HandleAnnounce(ctx context.Context, req *bittorrent.AnnounceRequest, resp *bittorrent.AnnounceResponse) (_ context.Context, err error) {

View File

@@ -13,6 +13,7 @@ import (
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
"github.com/chihaya/chihaya/storage"
"net/http" "net/http"
"strings" "strings"
"time" "time"
@@ -40,7 +41,7 @@ var _ middleware.Driver = driver{}
type driver struct{} type driver struct{}
func (d driver) NewHook(optionBytes []byte) (middleware.Hook, error) { func (d driver) NewHook(optionBytes []byte, _ storage.Storage) (middleware.Hook, error) {
var cfg Config var cfg Config
err := yaml.Unmarshal(optionBytes, &cfg) err := yaml.Unmarshal(optionBytes, &cfg)
if err != nil { if err != nil {
@@ -174,7 +175,7 @@ func (h *hook) HandleAnnounce(ctx context.Context, req *bittorrent.AnnounceReque
return ctx, nil return ctx, nil
} }
func (h *hook) HandleScrape(ctx context.Context, req *bittorrent.ScrapeRequest, resp *bittorrent.ScrapeResponse) (context.Context, error) { func (h *hook) HandleScrape(ctx context.Context, _ *bittorrent.ScrapeRequest, _ *bittorrent.ScrapeResponse) (context.Context, error) {
// Scrapes don't require any protection. // Scrapes don't require any protection.
return ctx, nil return ctx, nil
} }
@@ -204,7 +205,7 @@ func validateJWT(ih bittorrent.InfoHash, jwtBytes []byte, cfgIss, cfgAud string,
return jwt.ErrInvalidAUDClaim return jwt.ErrInvalidAUDClaim
} }
ihHex := hex.EncodeToString(ih[:]) ihHex := hex.EncodeToString([]byte(ih))
if ihClaim, ok := claims.Get("infohash").(string); !ok || ihClaim != ihHex { if ihClaim, ok := claims.Get("infohash").(string); !ok || ihClaim != ihHex {
log.Debug("unequal or missing infohash when validating JWT", log.Fields{ log.Debug("unequal or missing infohash when validating JWT", log.Fields{
"exists": ok, "exists": ok,

View File

@@ -26,11 +26,10 @@ var _ frontend.TrackerLogic = &Logic{}
// NewLogic creates a new instance of a TrackerLogic that executes the provided // NewLogic creates a new instance of a TrackerLogic that executes the provided
// middleware hooks. // middleware hooks.
func NewLogic(cfg ResponseConfig, peerStore storage.PeerStore, preHooks, postHooks []Hook) *Logic { func NewLogic(cfg ResponseConfig, peerStore storage.Storage, preHooks, postHooks []Hook) *Logic {
return &Logic{ return &Logic{
announceInterval: cfg.AnnounceInterval, announceInterval: cfg.AnnounceInterval,
minAnnounceInterval: cfg.MinAnnounceInterval, minAnnounceInterval: cfg.MinAnnounceInterval,
peerStore: peerStore,
preHooks: append(preHooks, &responseHook{store: peerStore}), preHooks: append(preHooks, &responseHook{store: peerStore}),
postHooks: append(postHooks, &swarmInteractionHook{store: peerStore}), postHooks: append(postHooks, &swarmInteractionHook{store: peerStore}),
} }
@@ -41,7 +40,6 @@ func NewLogic(cfg ResponseConfig, peerStore storage.PeerStore, preHooks, postHoo
type Logic struct { type Logic struct {
announceInterval time.Duration announceInterval time.Duration
minAnnounceInterval time.Duration minAnnounceInterval time.Duration
peerStore storage.PeerStore
preHooks []Hook preHooks []Hook
postHooks []Hook postHooks []Hook
} }

View File

@@ -4,6 +4,7 @@ package middleware
import ( import (
"errors" "errors"
"github.com/chihaya/chihaya/storage"
"sync" "sync"
yaml "gopkg.in/yaml.v2" yaml "gopkg.in/yaml.v2"
@@ -23,7 +24,7 @@ var (
// The options parameter is YAML encoded bytes that should be unmarshalled into // The options parameter is YAML encoded bytes that should be unmarshalled into
// the hook's custom configuration. // the hook's custom configuration.
type Driver interface { type Driver interface {
NewHook(options []byte) (Hook, error) NewHook(options []byte, storage storage.Storage) (Hook, error)
} }
// RegisterDriver makes a Driver available by the provided name. // RegisterDriver makes a Driver available by the provided name.
@@ -52,7 +53,7 @@ func RegisterDriver(name string, d Driver) {
// list of registered Drivers. // list of registered Drivers.
// //
// If a driver does not exist, returns ErrDriverDoesNotExist. // If a driver does not exist, returns ErrDriverDoesNotExist.
func New(name string, optionBytes []byte) (Hook, error) { func New(name string, optionBytes []byte, storage storage.Storage) (Hook, error) {
driversM.RLock() driversM.RLock()
defer driversM.RUnlock() defer driversM.RUnlock()
@@ -62,7 +63,7 @@ func New(name string, optionBytes []byte) (Hook, error) {
return nil, ErrDriverDoesNotExist return nil, ErrDriverDoesNotExist
} }
return d.NewHook(optionBytes) return d.NewHook(optionBytes, storage)
} }
// Config is the generic configuration format used for all registered Hooks. // Config is the generic configuration format used for all registered Hooks.
@@ -72,7 +73,7 @@ type Config struct {
} }
// HooksFromHookConfigs is a utility function for initializing Hooks in bulk. // HooksFromHookConfigs is a utility function for initializing Hooks in bulk.
func HooksFromHookConfigs(cfgs []Config) (hooks []Hook, err error) { func HooksFromHookConfigs(cfgs []Config, storage storage.Storage) (hooks []Hook, err error) {
for _, cfg := range cfgs { for _, cfg := range cfgs {
// Marshal the options back into bytes. // Marshal the options back into bytes.
var optionBytes []byte var optionBytes []byte
@@ -82,7 +83,7 @@ func HooksFromHookConfigs(cfgs []Config) (hooks []Hook, err error) {
} }
var h Hook var h Hook
h, err = New(cfg.Name, optionBytes) h, err = New(cfg.Name, optionBytes, storage)
if err != nil { if err != nil {
return return
} }

View File

@@ -11,7 +11,7 @@ import (
// //
// Calling DeriveEntropyFromRequest multiple times yields the same values. // Calling DeriveEntropyFromRequest multiple times yields the same values.
func DeriveEntropyFromRequest(req *bittorrent.AnnounceRequest) (uint64, uint64) { func DeriveEntropyFromRequest(req *bittorrent.AnnounceRequest) (uint64, uint64) {
v0 := binary.BigEndian.Uint64(req.InfoHash[:8]) + binary.BigEndian.Uint64(req.InfoHash[8:16]) v0 := binary.BigEndian.Uint64([]byte(req.InfoHash[:8])) + binary.BigEndian.Uint64([]byte(req.InfoHash[8:16]))
v1 := binary.BigEndian.Uint64(req.Peer.ID[:8]) + binary.BigEndian.Uint64(req.Peer.ID[8:16]) v1 := binary.BigEndian.Uint64(req.Peer.ID[:8]) + binary.BigEndian.Uint64(req.Peer.ID[8:16])
return v0, v1 return v0, v1
} }

View File

@@ -3,10 +3,11 @@ package container
import ( import (
"errors" "errors"
"github.com/chihaya/chihaya/bittorrent" "github.com/chihaya/chihaya/bittorrent"
"github.com/chihaya/chihaya/storage"
"sync" "sync"
) )
type Builder func ([]byte) (Container, error) type Builder func ([]byte, storage.Storage) (Container, error)
var ( var (
buildersMU sync.Mutex buildersMU sync.Mutex
@@ -32,7 +33,7 @@ type Container interface {
Contains(bittorrent.InfoHash) bool Contains(bittorrent.InfoHash) bool
} }
func GetContainer(name string, confBytes []byte) (Container, error) { func GetContainer(name string, confBytes []byte, storage storage.Storage) (Container, error) {
buildersMU.Lock() buildersMU.Lock()
defer buildersMU.Unlock() defer buildersMU.Unlock()
@@ -41,7 +42,7 @@ func GetContainer(name string, confBytes []byte) (Container, error) {
if builder, exist := builders[name]; !exist { if builder, exist := builders[name]; !exist {
err = ErrContainerDoesNotExist err = ErrContainerDoesNotExist
} else { } else {
cn, err = builder(confBytes) cn, err = builder(confBytes, storage)
} }
return cn, err return cn, err
} }

View File

@@ -9,6 +9,7 @@ import (
"github.com/chihaya/chihaya/middleware/torrentapproval/container" "github.com/chihaya/chihaya/middleware/torrentapproval/container"
"github.com/chihaya/chihaya/middleware/torrentapproval/container/list" "github.com/chihaya/chihaya/middleware/torrentapproval/container/list"
"github.com/chihaya/chihaya/pkg/stop" "github.com/chihaya/chihaya/pkg/stop"
"github.com/chihaya/chihaya/storage"
"gopkg.in/yaml.v2" "gopkg.in/yaml.v2"
"sync" "sync"
) )
@@ -22,7 +23,8 @@ type Config struct {
BlacklistPath string `yaml:"blacklist_path"` BlacklistPath string `yaml:"blacklist_path"`
} }
func build(confBytes []byte) (container.Container, error) { // TODO: change sync map to provided storage
func build(confBytes []byte, storage storage.Storage) (container.Container, error) {
c := new(Config) c := new(Config)
if err := yaml.Unmarshal(confBytes, c); err != nil { if err := yaml.Unmarshal(confBytes, c); err != nil {
return nil, fmt.Errorf("unable to deserialise configuration: %v", err) return nil, fmt.Errorf("unable to deserialise configuration: %v", err)
@@ -42,6 +44,7 @@ func build(confBytes []byte) (container.Container, error) {
if lst.Invert { if lst.Invert {
dir = c.BlacklistPath dir = c.BlacklistPath
} }
//FIXME: implement V2 torrent add/delete
var w *dirwatch.Instance var w *dirwatch.Instance
if w, err = dirwatch.New(dir); err != nil { if w, err = dirwatch.New(dir); err != nil {
return nil, fmt.Errorf("unable to initialize directory watch: %v", err) return nil, fmt.Errorf("unable to initialize directory watch: %v", err)

View File

@@ -5,8 +5,9 @@ package list
import ( import (
"encoding/hex" "encoding/hex"
"fmt" "fmt"
"github.com/chihaya/chihaya/bittorrent" bittorrent "github.com/chihaya/chihaya/bittorrent"
"github.com/chihaya/chihaya/middleware/torrentapproval/container" "github.com/chihaya/chihaya/middleware/torrentapproval/container"
"github.com/chihaya/chihaya/storage"
"gopkg.in/yaml.v2" "gopkg.in/yaml.v2"
"sync" "sync"
) )
@@ -22,7 +23,8 @@ type Config struct {
var DUMMY struct{} var DUMMY struct{}
func build(confBytes []byte) (container.Container, error) { // TODO: change sync map to provided storage
func build(confBytes []byte, storage storage.Storage) (container.Container, error) {
c := new(Config) c := new(Config)
if err := yaml.Unmarshal(confBytes, c); err != nil { if err := yaml.Unmarshal(confBytes, c); err != nil {
return nil, fmt.Errorf("unable to deserialise configuration: %v", err) return nil, fmt.Errorf("unable to deserialise configuration: %v", err)
@@ -41,14 +43,15 @@ func build(confBytes []byte) (container.Container, error) {
} }
for _, hashString := range hashList { for _, hashString := range hashList {
hashinfo, err := hex.DecodeString(hashString) hashBytes, err := hex.DecodeString(hashString)
if err != nil { if err != nil {
return nil, fmt.Errorf("whitelist : invalid hash %s, %v", hashString, err) return nil, fmt.Errorf("whitelist : invalid hash %s, %v", hashString, err)
} }
if len(hashinfo) != 20 { ih, err := bittorrent.NewInfoHash(hashBytes)
return nil, fmt.Errorf("whitelist : hash %s is not 20 byes", hashString) if err != nil {
return nil, fmt.Errorf("whitelist : %s : %v", hashString, err)
} }
l.Hashes.Store(bittorrent.InfoHashFromBytes(hashinfo), DUMMY) l.Hashes.Store(ih, DUMMY)
} }
return l, nil return l, nil
} }

View File

@@ -7,6 +7,7 @@ import (
"fmt" "fmt"
"github.com/chihaya/chihaya/middleware/torrentapproval/container" "github.com/chihaya/chihaya/middleware/torrentapproval/container"
"github.com/chihaya/chihaya/pkg/stop" "github.com/chihaya/chihaya/pkg/stop"
"github.com/chihaya/chihaya/storage"
"gopkg.in/yaml.v2" "gopkg.in/yaml.v2"
"github.com/chihaya/chihaya/bittorrent" "github.com/chihaya/chihaya/bittorrent"
@@ -24,7 +25,7 @@ func init() {
type driver struct{} type driver struct{}
func (d driver) NewHook(optionBytes []byte) (middleware.Hook, error) { func (d driver) NewHook(optionBytes []byte, storage storage.Storage) (middleware.Hook, error) {
var cfg middleware.Config var cfg middleware.Config
err := yaml.Unmarshal(optionBytes, &cfg) err := yaml.Unmarshal(optionBytes, &cfg)
if err != nil { if err != nil {
@@ -44,7 +45,7 @@ func (d driver) NewHook(optionBytes []byte) (middleware.Hook, error) {
return nil, err return nil, err
} }
if c, err := container.GetContainer(cfg.Name, confBytes); err == nil { if c, err := container.GetContainer(cfg.Name, confBytes, storage); err == nil {
return &hook{c}, nil return &hook{c}, nil
} else { } else {
return nil, err return nil, err

View File

@@ -80,7 +80,7 @@ func TestHandleAnnounce(t *testing.T) {
hashbytes, err := hex.DecodeString(tt.ih) hashbytes, err := hex.DecodeString(tt.ih)
require.Nil(t, err) require.Nil(t, err)
hashinfo := bittorrent.InfoHashFromBytes(hashbytes) hashinfo := bittorrent.NewInfoHash(hashbytes)
req.InfoHash = hashinfo req.InfoHash = hashinfo

View File

@@ -4,6 +4,7 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"github.com/chihaya/chihaya/storage"
"sync" "sync"
"time" "time"
@@ -25,7 +26,7 @@ var _ middleware.Driver = driver{}
type driver struct{} type driver struct{}
func (d driver) NewHook(optionBytes []byte) (middleware.Hook, error) { func (d driver) NewHook(optionBytes []byte, _ storage.Storage) (middleware.Hook, error) {
var cfg Config var cfg Config
err := yaml.Unmarshal(optionBytes, &cfg) err := yaml.Unmarshal(optionBytes, &cfg)
if err != nil { if err != nil {
@@ -96,12 +97,12 @@ func (h *hook) HandleAnnounce(ctx context.Context, req *bittorrent.AnnounceReque
if h.cfg.ModifyResponseProbability == 1 || p < h.cfg.ModifyResponseProbability { if h.cfg.ModifyResponseProbability == 1 || p < h.cfg.ModifyResponseProbability {
// Generate the increase delta. // Generate the increase delta.
v, _, _ = random.Intn(s0, s1, h.cfg.MaxIncreaseDelta) v, _, _ = random.Intn(s0, s1, h.cfg.MaxIncreaseDelta)
addSeconds := time.Duration(v+1) * time.Second add := time.Duration(v+1) * time.Second
resp.Interval += addSeconds resp.Interval += add
if h.cfg.ModifyMinInterval { if h.cfg.ModifyMinInterval {
resp.MinInterval += addSeconds resp.MinInterval += add
} }
return ctx, nil return ctx, nil

View File

@@ -11,7 +11,7 @@ import (
yaml "gopkg.in/yaml.v2" yaml "gopkg.in/yaml.v2"
"github.com/chihaya/chihaya/bittorrent" bittorrent "github.com/chihaya/chihaya/bittorrent"
"github.com/chihaya/chihaya/pkg/log" "github.com/chihaya/chihaya/pkg/log"
"github.com/chihaya/chihaya/pkg/stop" "github.com/chihaya/chihaya/pkg/stop"
"github.com/chihaya/chihaya/pkg/timecache" "github.com/chihaya/chihaya/pkg/timecache"
@@ -36,7 +36,7 @@ func init() {
type driver struct{} type driver struct{}
func (d driver) NewPeerStore(icfg interface{}) (storage.PeerStore, error) { func (d driver) NewPeerStore(icfg interface{}) (storage.Storage, error) {
// Marshal the config back into bytes. // Marshal the config back into bytes.
bytes, err := yaml.Marshal(icfg) bytes, err := yaml.Marshal(icfg)
if err != nil { if err != nil {
@@ -53,7 +53,7 @@ func (d driver) NewPeerStore(icfg interface{}) (storage.PeerStore, error) {
return New(cfg) return New(cfg)
} }
// Config holds the configuration of a memory PeerStore. // Config holds the configuration of a memory Storage.
type Config struct { type Config struct {
GarbageCollectionInterval time.Duration `yaml:"gc_interval"` GarbageCollectionInterval time.Duration `yaml:"gc_interval"`
PrometheusReportingInterval time.Duration `yaml:"prometheus_reporting_interval"` PrometheusReportingInterval time.Duration `yaml:"prometheus_reporting_interval"`
@@ -118,8 +118,8 @@ func (cfg Config) Validate() Config {
return validcfg return validcfg
} }
// New creates a new PeerStore backed by memory. // New creates a new Storage backed by memory.
func New(provided Config) (storage.PeerStore, error) { func New(provided Config) (storage.Storage, error) {
cfg := provided.Validate() cfg := provided.Validate()
ps := &peerStore{ ps := &peerStore{
cfg: cfg, cfg: cfg,
@@ -142,7 +142,9 @@ func New(provided Config) (storage.PeerStore, error) {
case <-time.After(cfg.GarbageCollectionInterval): case <-time.After(cfg.GarbageCollectionInterval):
before := time.Now().Add(-cfg.PeerLifetime) before := time.Now().Add(-cfg.PeerLifetime)
log.Debug("storage: purging peers with no announces since", log.Fields{"before": before}) log.Debug("storage: purging peers with no announces since", log.Fields{"before": before})
ps.collectGarbage(before) if err := ps.collectGarbage(before); err != nil {
log.Error(err)
}
} }
} }
}() }()
@@ -177,17 +179,22 @@ type serializedPeer string
func newPeerKey(p bittorrent.Peer) serializedPeer { func newPeerKey(p bittorrent.Peer) serializedPeer {
b := make([]byte, 20+2+len(p.IP.IP)) b := make([]byte, 20+2+len(p.IP.IP))
copy(b[:20], p.ID[:]) copy(b[:20], p.ID[:])
binary.BigEndian.PutUint16(b[20:22], p.Port) binary.BigEndian.PutUint16(b[bittorrent.PeerIDLen:bittorrent.PeerIDLen+2], p.Port)
copy(b[22:], p.IP.IP) copy(b[bittorrent.PeerIDLen+2:], p.IP.IP)
return serializedPeer(b) return serializedPeer(b)
} }
// TODO: move duplicated code into one place
func decodePeerKey(pk serializedPeer) bittorrent.Peer { func decodePeerKey(pk serializedPeer) bittorrent.Peer {
peerId, err := bittorrent.NewPeerID([]byte(pk[:bittorrent.PeerIDLen]))
if err != nil {
panic(err)
}
peer := bittorrent.Peer{ peer := bittorrent.Peer{
ID: bittorrent.PeerIDFromString(string(pk[:20])), ID: peerId,
Port: binary.BigEndian.Uint16([]byte(pk[20:22])), Port: binary.BigEndian.Uint16([]byte(pk[bittorrent.PeerIDLen:bittorrent.PeerIDLen+2])),
IP: bittorrent.IP{IP: net.IP(pk[22:])}} IP: bittorrent.IP{IP: net.IP(pk[bittorrent.PeerIDLen+2:])}}
if ip := peer.IP.To4(); ip != nil { if ip := peer.IP.To4(); ip != nil {
peer.IP.IP = ip peer.IP.IP = ip
@@ -222,7 +229,7 @@ type peerStore struct {
wg sync.WaitGroup wg sync.WaitGroup
} }
var _ storage.PeerStore = &peerStore{} var _ storage.Storage = &peerStore{}
// populateProm aggregates metrics over all shards and then posts them to // populateProm aggregates metrics over all shards and then posts them to
// prometheus. // prometheus.
@@ -255,7 +262,7 @@ func (ps *peerStore) shardIndex(infoHash bittorrent.InfoHash, af bittorrent.Addr
// There are twice the amount of shards specified by the user, the first // There are twice the amount of shards specified by the user, the first
// half is dedicated to IPv4 swarms and the second half is dedicated to // half is dedicated to IPv4 swarms and the second half is dedicated to
// IPv6 swarms. // IPv6 swarms.
idx := binary.BigEndian.Uint32(infoHash[:4]) % (uint32(len(ps.shards)) / 2) idx := binary.BigEndian.Uint32([]byte(infoHash[:4])) % (uint32(len(ps.shards)) / 2)
if af == bittorrent.IPv6 { if af == bittorrent.IPv6 {
idx += uint32(len(ps.shards) / 2) idx += uint32(len(ps.shards) / 2)
} }
@@ -512,7 +519,7 @@ func (ps *peerStore) ScrapeSwarm(ih bittorrent.InfoHash, addressFamily bittorren
return return
} }
// collectGarbage deletes all Peers from the PeerStore which are older than the // collectGarbage deletes all Peers from the Storage which are older than the
// cutoff time. // cutoff time.
// //
// This function must be able to execute while other methods on this interface // This function must be able to execute while other methods on this interface

View File

@@ -7,7 +7,7 @@ import (
s "github.com/chihaya/chihaya/storage" s "github.com/chihaya/chihaya/storage"
) )
func createNew() s.PeerStore { func createNew() s.Storage {
ps, err := New(Config{ ps, err := New(Config{
ShardCount: 1024, ShardCount: 1024,
GarbageCollectionInterval: 10 * time.Minute, GarbageCollectionInterval: 10 * time.Minute,

View File

@@ -61,7 +61,7 @@ func init() {
type driver struct{} type driver struct{}
func (d driver) NewPeerStore(icfg interface{}) (storage.PeerStore, error) { func (d driver) NewPeerStore(icfg interface{}) (storage.Storage, error) {
// Marshal the config back into bytes. // Marshal the config back into bytes.
bytes, err := yaml.Marshal(icfg) bytes, err := yaml.Marshal(icfg)
if err != nil { if err != nil {
@@ -78,7 +78,7 @@ func (d driver) NewPeerStore(icfg interface{}) (storage.PeerStore, error) {
return New(cfg) return New(cfg)
} }
// Config holds the configuration of a redis PeerStore. // Config holds the configuration of a redis Storage.
type Config struct { type Config struct {
GarbageCollectionInterval time.Duration `yaml:"gc_interval"` GarbageCollectionInterval time.Duration `yaml:"gc_interval"`
PrometheusReportingInterval time.Duration `yaml:"prometheus_reporting_interval"` PrometheusReportingInterval time.Duration `yaml:"prometheus_reporting_interval"`
@@ -176,8 +176,8 @@ func (cfg Config) Validate() Config {
return validcfg return validcfg
} }
// New creates a new PeerStore backed by redis. // New creates a new Storage backed by redis.
func New(provided Config) (storage.PeerStore, error) { func New(provided Config) (storage.Storage, error) {
cfg := provided.Validate() cfg := provided.Validate()
u, err := parseRedisURL(cfg.RedisBroker) u, err := parseRedisURL(cfg.RedisBroker)
@@ -237,19 +237,24 @@ func New(provided Config) (storage.PeerStore, error) {
type serializedPeer string type serializedPeer string
func newPeerKey(p bittorrent.Peer) serializedPeer { func newPeerKey(p bittorrent.Peer) serializedPeer {
b := make([]byte, 20+2+len(p.IP.IP)) b := make([]byte, bittorrent.PeerIDLen+2+len(p.IP.IP))
copy(b[:20], p.ID[:]) copy(b[:bittorrent.PeerIDLen], p.ID[:])
binary.BigEndian.PutUint16(b[20:22], p.Port) binary.BigEndian.PutUint16(b[bittorrent.PeerIDLen:bittorrent.PeerIDLen+2], p.Port)
copy(b[22:], p.IP.IP) copy(b[bittorrent.PeerIDLen+2:], p.IP.IP)
return serializedPeer(b) return serializedPeer(b)
} }
// TODO: move duplicated code into one place
func decodePeerKey(pk serializedPeer) bittorrent.Peer { func decodePeerKey(pk serializedPeer) bittorrent.Peer {
peerId, err := bittorrent.NewPeerID([]byte(pk[:bittorrent.PeerIDLen]))
if err != nil {
panic(err)
}
peer := bittorrent.Peer{ peer := bittorrent.Peer{
ID: bittorrent.PeerIDFromString(string(pk[:20])), ID: peerId,
Port: binary.BigEndian.Uint16([]byte(pk[20:22])), Port: binary.BigEndian.Uint16([]byte(pk[bittorrent.PeerIDLen : bittorrent.PeerIDLen+2])),
IP: bittorrent.IP{IP: net.IP(pk[22:])}} IP: bittorrent.IP{IP: net.IP(pk[bittorrent.PeerIDLen+2:])}}
if ip := peer.IP.To4(); ip != nil { if ip := peer.IP.To4(); ip != nil {
peer.IP.IP = ip peer.IP.IP = ip
@@ -295,12 +300,21 @@ func (ps *peerStore) leecherCountKey(af string) string {
return af + "_L_count" return af + "_L_count"
} }
func (ps *peerStore) getConnection() redis.Conn {
select {
case <-ps.closed:
panic("attempted to interact with stopped redis store")
default:
}
return ps.rb.open()
}
// populateProm aggregates metrics over all groups and then posts them to // populateProm aggregates metrics over all groups and then posts them to
// prometheus. // prometheus.
func (ps *peerStore) populateProm() { func (ps *peerStore) populateProm() {
var numInfohashes, numSeeders, numLeechers int64 var numInfohashes, numSeeders, numLeechers int64
conn := ps.rb.open() conn := ps.getConnection()
defer conn.Close() defer conn.Close()
for _, group := range ps.groups() { for _, group := range ps.groups() {
@@ -346,18 +360,11 @@ func (ps *peerStore) PutSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) error
"Peer": p, "Peer": p,
}) })
select {
case <-ps.closed:
panic("attempted to interact with stopped redis store")
default:
}
pk := newPeerKey(p) pk := newPeerKey(p)
encodedSeederInfoHash := ps.seederInfohashKey(addressFamily, ih.String()) encodedSeederInfoHash := ps.seederInfohashKey(addressFamily, ih.String())
ct := ps.getClock() ct := ps.getClock()
conn := ps.rb.open() conn := ps.getConnection()
defer conn.Close() defer conn.Close()
conn.Send("MULTI") conn.Send("MULTI")
@@ -393,15 +400,9 @@ func (ps *peerStore) DeleteSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) err
"Peer": p, "Peer": p,
}) })
select {
case <-ps.closed:
panic("attempted to interact with stopped redis store")
default:
}
pk := newPeerKey(p) pk := newPeerKey(p)
conn := ps.rb.open() conn := ps.getConnection()
defer conn.Close() defer conn.Close()
encodedSeederInfoHash := ps.seederInfohashKey(addressFamily, ih.String()) encodedSeederInfoHash := ps.seederInfohashKey(addressFamily, ih.String())
@@ -427,18 +428,12 @@ func (ps *peerStore) PutLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error
"Peer": p, "Peer": p,
}) })
select {
case <-ps.closed:
panic("attempted to interact with stopped redis store")
default:
}
// Update the peer in the swarm. // Update the peer in the swarm.
encodedLeecherInfoHash := ps.leecherInfohashKey(addressFamily, ih.String()) encodedLeecherInfoHash := ps.leecherInfohashKey(addressFamily, ih.String())
pk := newPeerKey(p) pk := newPeerKey(p)
ct := ps.getClock() ct := ps.getClock()
conn := ps.rb.open() conn := ps.getConnection()
defer conn.Close() defer conn.Close()
conn.Send("MULTI") conn.Send("MULTI")
@@ -465,13 +460,7 @@ func (ps *peerStore) DeleteLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) er
"Peer": p, "Peer": p,
}) })
select { conn := ps.getConnection()
case <-ps.closed:
panic("attempted to interact with stopped redis store")
default:
}
conn := ps.rb.open()
defer conn.Close() defer conn.Close()
pk := newPeerKey(p) pk := newPeerKey(p)
@@ -484,9 +473,7 @@ func (ps *peerStore) DeleteLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) er
if delNum == 0 { if delNum == 0 {
return storage.ErrResourceDoesNotExist return storage.ErrResourceDoesNotExist
} }
if _, err := conn.Do("DECR", ps.leecherCountKey(addressFamily)); err != nil { _, err = conn.Do("DECR", ps.leecherCountKey(addressFamily))
return err
}
return nil return nil
} }
@@ -498,19 +485,13 @@ func (ps *peerStore) GraduateLeecher(ih bittorrent.InfoHash, p bittorrent.Peer)
"Peer": p, "Peer": p,
}) })
select {
case <-ps.closed:
panic("attempted to interact with stopped redis store")
default:
}
encodedInfoHash := ih.String() encodedInfoHash := ih.String()
encodedLeecherInfoHash := ps.leecherInfohashKey(addressFamily, encodedInfoHash) encodedLeecherInfoHash := ps.leecherInfohashKey(addressFamily, encodedInfoHash)
encodedSeederInfoHash := ps.seederInfohashKey(addressFamily, encodedInfoHash) encodedSeederInfoHash := ps.seederInfohashKey(addressFamily, encodedInfoHash)
pk := newPeerKey(p) pk := newPeerKey(p)
ct := ps.getClock() ct := ps.getClock()
conn := ps.rb.open() conn := ps.getConnection()
defer conn.Close() defer conn.Close()
conn.Send("MULTI") conn.Send("MULTI")
@@ -552,17 +533,11 @@ func (ps *peerStore) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant
"Peer": announcer, "Peer": announcer,
}) })
select {
case <-ps.closed:
panic("attempted to interact with stopped redis store")
default:
}
encodedInfoHash := ih.String() encodedInfoHash := ih.String()
encodedLeecherInfoHash := ps.leecherInfohashKey(addressFamily, encodedInfoHash) encodedLeecherInfoHash := ps.leecherInfohashKey(addressFamily, encodedInfoHash)
encodedSeederInfoHash := ps.seederInfohashKey(addressFamily, encodedInfoHash) encodedSeederInfoHash := ps.seederInfohashKey(addressFamily, encodedInfoHash)
conn := ps.rb.open() conn := ps.getConnection()
defer conn.Close() defer conn.Close()
leechers, err := conn.Do("HKEYS", encodedLeecherInfoHash) leechers, err := conn.Do("HKEYS", encodedLeecherInfoHash)
@@ -624,11 +599,6 @@ func (ps *peerStore) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant
} }
func (ps *peerStore) ScrapeSwarm(ih bittorrent.InfoHash, af bittorrent.AddressFamily) (resp bittorrent.Scrape) { func (ps *peerStore) ScrapeSwarm(ih bittorrent.InfoHash, af bittorrent.AddressFamily) (resp bittorrent.Scrape) {
select {
case <-ps.closed:
panic("attempted to interact with stopped redis store")
default:
}
resp.InfoHash = ih resp.InfoHash = ih
addressFamily := af.String() addressFamily := af.String()
@@ -636,7 +606,7 @@ func (ps *peerStore) ScrapeSwarm(ih bittorrent.InfoHash, af bittorrent.AddressFa
encodedLeecherInfoHash := ps.leecherInfohashKey(addressFamily, encodedInfoHash) encodedLeecherInfoHash := ps.leecherInfohashKey(addressFamily, encodedInfoHash)
encodedSeederInfoHash := ps.seederInfohashKey(addressFamily, encodedInfoHash) encodedSeederInfoHash := ps.seederInfohashKey(addressFamily, encodedInfoHash)
conn := ps.rb.open() conn := ps.getConnection()
defer conn.Close() defer conn.Close()
leechersLen, err := redis.Int64(conn.Do("HLEN", encodedLeecherInfoHash)) leechersLen, err := redis.Int64(conn.Do("HLEN", encodedLeecherInfoHash))
@@ -663,7 +633,7 @@ func (ps *peerStore) ScrapeSwarm(ih bittorrent.InfoHash, af bittorrent.AddressFa
return return
} }
// collectGarbage deletes all Peers from the PeerStore which are older than the // collectGarbage deletes all Peers from the Storage which are older than the
// cutoff time. // cutoff time.
// //
// This function must be able to execute while other methods on this interface // This function must be able to execute while other methods on this interface
@@ -709,18 +679,12 @@ func (ps *peerStore) ScrapeSwarm(ih bittorrent.InfoHash, af bittorrent.AddressFa
// transaction. The infohash key will remain in the addressFamil hash and // transaction. The infohash key will remain in the addressFamil hash and
// we'll attempt to clean it up the next time collectGarbage runs. // we'll attempt to clean it up the next time collectGarbage runs.
func (ps *peerStore) collectGarbage(cutoff time.Time) error { func (ps *peerStore) collectGarbage(cutoff time.Time) error {
select {
case <-ps.closed:
return nil
default:
}
conn := ps.rb.open()
defer conn.Close()
cutoffUnix := cutoff.UnixNano() cutoffUnix := cutoff.UnixNano()
start := time.Now() start := time.Now()
conn := ps.getConnection()
defer conn.Close()
for _, group := range ps.groups() { for _, group := range ps.groups() {
// list all infohashes in the group // list all infohashes in the group
infohashesList, err := redis.Strings(conn.Do("HKEYS", group)) infohashesList, err := redis.Strings(conn.Do("HKEYS", group))

View File

@@ -10,7 +10,7 @@ import (
s "github.com/chihaya/chihaya/storage" s "github.com/chihaya/chihaya/storage"
) )
func createNew() s.PeerStore { func createNew() s.Storage {
rs, err := miniredis.Run() rs, err := miniredis.Run()
if err != nil { if err != nil {
panic(err) panic(err)

View File

@@ -14,24 +14,24 @@ var (
drivers = make(map[string]Driver) drivers = make(map[string]Driver)
) )
// Driver is the interface used to initialize a new type of PeerStore. // Driver is the interface used to initialize a new type of Storage.
type Driver interface { type Driver interface {
NewPeerStore(cfg interface{}) (PeerStore, error) NewPeerStore(cfg interface{}) (Storage, error)
} }
// ErrResourceDoesNotExist is the error returned by all delete methods and the // ErrResourceDoesNotExist is the error returned by all delete methods and the
// AnnouncePeers method of the PeerStore interface if the requested resource // AnnouncePeers method of the Storage interface if the requested resource
// does not exist. // does not exist.
var ErrResourceDoesNotExist = bittorrent.ClientError("resource does not exist") var ErrResourceDoesNotExist = bittorrent.ClientError("resource does not exist")
// ErrDriverDoesNotExist is the error returned by NewPeerStore when a peer // ErrDriverDoesNotExist is the error returned by NewStorage when a peer
// store driver with that name does not exist. // store driver with that name does not exist.
var ErrDriverDoesNotExist = errors.New("peer store driver with that name does not exist") var ErrDriverDoesNotExist = errors.New("peer store driver with that name does not exist")
// PeerStore is an interface that abstracts the interactions of storing and // Storage is an interface that abstracts the interactions of storing and
// manipulating Peers such that it can be implemented for various data stores. // manipulating Peers such that it can be implemented for various data stores.
// //
// Implementations of the PeerStore interface must do the following in addition // Implementations of the Storage interface must do the following in addition
// to implementing the methods of the interface in the way documented: // to implementing the methods of the interface in the way documented:
// //
// - Implement a garbage-collection strategy that ensures stale data is removed. // - Implement a garbage-collection strategy that ensures stale data is removed.
@@ -40,13 +40,13 @@ var ErrDriverDoesNotExist = errors.New("peer store driver with that name does no
// be scanned periodically and too old Peers removed. The intervals and // be scanned periodically and too old Peers removed. The intervals and
// durations involved should be configurable. // durations involved should be configurable.
// - IPv4 and IPv6 swarms must be isolated from each other. // - IPv4 and IPv6 swarms must be isolated from each other.
// A PeerStore must be able to transparently handle IPv4 and IPv6 Peers, but // A Storage must be able to transparently handle IPv4 and IPv6 Peers, but
// must separate them. AnnouncePeers and ScrapeSwarm must return information // must separate them. AnnouncePeers and ScrapeSwarm must return information
// about the Swarm matching the given AddressFamily only. // about the Swarm matching the given AddressFamily only.
// //
// Implementations can be tested against this interface using the tests in // Implementations can be tested against this interface using the tests in
// storage_tests.go and the benchmarks in storage_bench.go. // storage_test.go and the benchmarks in storage_bench.go.
type PeerStore interface { type Storage interface {
// PutSeeder adds a Seeder to the Swarm identified by the provided // PutSeeder adds a Seeder to the Swarm identified by the provided
// InfoHash. // InfoHash.
PutSeeder(infoHash bittorrent.InfoHash, p bittorrent.Peer) error PutSeeder(infoHash bittorrent.InfoHash, p bittorrent.Peer) error
@@ -104,13 +104,21 @@ type PeerStore interface {
// If the Swarm does not exist, an empty Scrape and no error is returned. // If the Swarm does not exist, an empty Scrape and no error is returned.
ScrapeSwarm(infoHash bittorrent.InfoHash, addressFamily bittorrent.AddressFamily) bittorrent.Scrape ScrapeSwarm(infoHash bittorrent.InfoHash, addressFamily bittorrent.AddressFamily) bittorrent.Scrape
/*TODO: implement this*/
Put(key interface{}, value interface{})
Load(key interface{}) interface{}
Delete(key interface{})
// stop.Stopper is an interface that expects a Stop method to stop the // stop.Stopper is an interface that expects a Stop method to stop the
// PeerStore. // Storage.
// For more details see the documentation in the stop package. // For more details see the documentation in the stop package.
stop.Stopper stop.Stopper
// log.Fielder returns a loggable version of the data used to configure and // log.Fielder returns a loggable version of the data used to configure and
// operate a particular PeerStore. // operate a particular Storage.
log.Fielder log.Fielder
} }
@@ -136,11 +144,11 @@ func RegisterDriver(name string, d Driver) {
drivers[name] = d drivers[name] = d
} }
// NewPeerStore attempts to initialize a new PeerStore instance from // NewStorage attempts to initialize a new Storage instance from
// the list of registered Drivers. // the list of registered Drivers.
// //
// If a driver does not exist, returns ErrDriverDoesNotExist. // If a driver does not exist, returns ErrDriverDoesNotExist.
func NewPeerStore(name string, cfg interface{}) (ps PeerStore, err error) { func NewStorage(name string, cfg interface{}) (ps Storage, err error) {
driversM.RLock() driversM.RLock()
defer driversM.RUnlock() defer driversM.RUnlock()

View File

@@ -19,7 +19,7 @@ func generateInfohashes() (a [1000]bittorrent.InfoHash) {
for i := range a { for i := range a {
b := make([]byte, 20) b := make([]byte, 20)
rand.Read(b) rand.Read(b)
a[i], _ = bittorrent.InfoHashFromBytes(b) a[i], _ = bittorrent.NewInfoHash(b)
} }
return return
@@ -49,10 +49,10 @@ func generatePeers() (a [1000]bittorrent.Peer) {
return return
} }
type executionFunc func(int, PeerStore, *benchData) error type executionFunc func(int, Storage, *benchData) error
type setupFunc func(PeerStore, *benchData) error type setupFunc func(Storage, *benchData) error
func runBenchmark(b *testing.B, ps PeerStore, parallel bool, sf setupFunc, ef executionFunc) { func runBenchmark(b *testing.B, ps Storage, parallel bool, sf setupFunc, ef executionFunc) {
bd := &benchData{generateInfohashes(), generatePeers()} bd := &benchData{generateInfohashes(), generatePeers()}
spacing := int32(1000 / runtime.NumCPU()) spacing := int32(1000 / runtime.NumCPU())
if sf != nil { if sf != nil {
@@ -92,65 +92,65 @@ func runBenchmark(b *testing.B, ps PeerStore, parallel bool, sf setupFunc, ef ex
} }
// Nop executes a no-op for each iteration. // Nop executes a no-op for each iteration.
// It should produce the same results for each PeerStore. // It should produce the same results for each Storage.
// This can be used to get an estimate of the impact of the benchmark harness // This can be used to get an estimate of the impact of the benchmark harness
// on benchmark results and an estimate of the general performance of the system // on benchmark results and an estimate of the general performance of the system
// benchmarked on. // benchmarked on.
// //
// Nop can run in parallel. // Nop can run in parallel.
func Nop(b *testing.B, ps PeerStore) { func Nop(b *testing.B, ps Storage) {
runBenchmark(b, ps, true, putPeers, func(i int, ps PeerStore, bd *benchData) error { runBenchmark(b, ps, true, putPeers, func(i int, ps Storage, bd *benchData) error {
return nil return nil
}) })
} }
// Put benchmarks the PutSeeder method of a PeerStore by repeatedly Putting the // Put benchmarks the PutSeeder method of a Storage by repeatedly Putting the
// same Peer for the same InfoHash. // same Peer for the same InfoHash.
// //
// Put can run in parallel. // Put can run in parallel.
func Put(b *testing.B, ps PeerStore) { func Put(b *testing.B, ps Storage) {
runBenchmark(b, ps, true, nil, func(i int, ps PeerStore, bd *benchData) error { runBenchmark(b, ps, true, nil, func(i int, ps Storage, bd *benchData) error {
return ps.PutSeeder(bd.infohashes[0], bd.peers[0]) return ps.PutSeeder(bd.infohashes[0], bd.peers[0])
}) })
} }
// Put1k benchmarks the PutSeeder method of a PeerStore by cycling through 1000 // Put1k benchmarks the PutSeeder method of a Storage by cycling through 1000
// Peers and Putting them into the swarm of one infohash. // Peers and Putting them into the swarm of one infohash.
// //
// Put1k can run in parallel. // Put1k can run in parallel.
func Put1k(b *testing.B, ps PeerStore) { func Put1k(b *testing.B, ps Storage) {
runBenchmark(b, ps, true, nil, func(i int, ps PeerStore, bd *benchData) error { runBenchmark(b, ps, true, nil, func(i int, ps Storage, bd *benchData) error {
return ps.PutSeeder(bd.infohashes[0], bd.peers[i%1000]) return ps.PutSeeder(bd.infohashes[0], bd.peers[i%1000])
}) })
} }
// Put1kInfohash benchmarks the PutSeeder method of a PeerStore by cycling // Put1kInfohash benchmarks the PutSeeder method of a Storage by cycling
// through 1000 infohashes and putting the same peer into their swarms. // through 1000 infohashes and putting the same peer into their swarms.
// //
// Put1kInfohash can run in parallel. // Put1kInfohash can run in parallel.
func Put1kInfohash(b *testing.B, ps PeerStore) { func Put1kInfohash(b *testing.B, ps Storage) {
runBenchmark(b, ps, true, nil, func(i int, ps PeerStore, bd *benchData) error { runBenchmark(b, ps, true, nil, func(i int, ps Storage, bd *benchData) error {
return ps.PutSeeder(bd.infohashes[i%1000], bd.peers[0]) return ps.PutSeeder(bd.infohashes[i%1000], bd.peers[0])
}) })
} }
// Put1kInfohash1k benchmarks the PutSeeder method of a PeerStore by cycling // Put1kInfohash1k benchmarks the PutSeeder method of a Storage by cycling
// through 1000 infohashes and 1000 Peers and calling Put with them. // through 1000 infohashes and 1000 Peers and calling Put with them.
// //
// Put1kInfohash1k can run in parallel. // Put1kInfohash1k can run in parallel.
func Put1kInfohash1k(b *testing.B, ps PeerStore) { func Put1kInfohash1k(b *testing.B, ps Storage) {
runBenchmark(b, ps, true, nil, func(i int, ps PeerStore, bd *benchData) error { runBenchmark(b, ps, true, nil, func(i int, ps Storage, bd *benchData) error {
err := ps.PutSeeder(bd.infohashes[i%1000], bd.peers[(i*3)%1000]) err := ps.PutSeeder(bd.infohashes[i%1000], bd.peers[(i*3)%1000])
return err return err
}) })
} }
// PutDelete benchmarks the PutSeeder and DeleteSeeder methods of a PeerStore by // PutDelete benchmarks the PutSeeder and DeleteSeeder methods of a Storage by
// calling PutSeeder followed by DeleteSeeder for one Peer and one infohash. // calling PutSeeder followed by DeleteSeeder for one Peer and one infohash.
// //
// PutDelete can not run in parallel. // PutDelete can not run in parallel.
func PutDelete(b *testing.B, ps PeerStore) { func PutDelete(b *testing.B, ps Storage) {
runBenchmark(b, ps, false, nil, func(i int, ps PeerStore, bd *benchData) error { runBenchmark(b, ps, false, nil, func(i int, ps Storage, bd *benchData) error {
err := ps.PutSeeder(bd.infohashes[0], bd.peers[0]) err := ps.PutSeeder(bd.infohashes[0], bd.peers[0])
if err != nil { if err != nil {
return err return err
@@ -163,8 +163,8 @@ func PutDelete(b *testing.B, ps PeerStore) {
// PutDelete does, but with one from 1000 Peers per iteration. // PutDelete does, but with one from 1000 Peers per iteration.
// //
// PutDelete1k can not run in parallel. // PutDelete1k can not run in parallel.
func PutDelete1k(b *testing.B, ps PeerStore) { func PutDelete1k(b *testing.B, ps Storage) {
runBenchmark(b, ps, false, nil, func(i int, ps PeerStore, bd *benchData) error { runBenchmark(b, ps, false, nil, func(i int, ps Storage, bd *benchData) error {
err := ps.PutSeeder(bd.infohashes[0], bd.peers[i%1000]) err := ps.PutSeeder(bd.infohashes[0], bd.peers[i%1000])
if err != nil { if err != nil {
return err return err
@@ -177,8 +177,8 @@ func PutDelete1k(b *testing.B, ps PeerStore) {
// 1000 Peers. // 1000 Peers.
// //
// PutDelete1kInfohash can not run in parallel. // PutDelete1kInfohash can not run in parallel.
func PutDelete1kInfohash(b *testing.B, ps PeerStore) { func PutDelete1kInfohash(b *testing.B, ps Storage) {
runBenchmark(b, ps, false, nil, func(i int, ps PeerStore, bd *benchData) error { runBenchmark(b, ps, false, nil, func(i int, ps Storage, bd *benchData) error {
err := ps.PutSeeder(bd.infohashes[i%1000], bd.peers[0]) err := ps.PutSeeder(bd.infohashes[i%1000], bd.peers[0])
if err != nil { if err != nil {
} }
@@ -190,8 +190,8 @@ func PutDelete1kInfohash(b *testing.B, ps PeerStore) {
// addition to 1000 Peers. // addition to 1000 Peers.
// //
// PutDelete1kInfohash1k can not run in parallel. // PutDelete1kInfohash1k can not run in parallel.
func PutDelete1kInfohash1k(b *testing.B, ps PeerStore) { func PutDelete1kInfohash1k(b *testing.B, ps Storage) {
runBenchmark(b, ps, false, nil, func(i int, ps PeerStore, bd *benchData) error { runBenchmark(b, ps, false, nil, func(i int, ps Storage, bd *benchData) error {
err := ps.PutSeeder(bd.infohashes[i%1000], bd.peers[(i*3)%1000]) err := ps.PutSeeder(bd.infohashes[i%1000], bd.peers[(i*3)%1000])
if err != nil { if err != nil {
return err return err
@@ -201,102 +201,102 @@ func PutDelete1kInfohash1k(b *testing.B, ps PeerStore) {
}) })
} }
// DeleteNonexist benchmarks the DeleteSeeder method of a PeerStore by // DeleteNonexist benchmarks the DeleteSeeder method of a Storage by
// attempting to delete a Peer that is nonexistent. // attempting to delete a Peer that is nonexistent.
// //
// DeleteNonexist can run in parallel. // DeleteNonexist can run in parallel.
func DeleteNonexist(b *testing.B, ps PeerStore) { func DeleteNonexist(b *testing.B, ps Storage) {
runBenchmark(b, ps, true, nil, func(i int, ps PeerStore, bd *benchData) error { runBenchmark(b, ps, true, nil, func(i int, ps Storage, bd *benchData) error {
ps.DeleteSeeder(bd.infohashes[0], bd.peers[0]) ps.DeleteSeeder(bd.infohashes[0], bd.peers[0])
return nil return nil
}) })
} }
// DeleteNonexist1k benchmarks the DeleteSeeder method of a PeerStore by // DeleteNonexist1k benchmarks the DeleteSeeder method of a Storage by
// attempting to delete one of 1000 nonexistent Peers. // attempting to delete one of 1000 nonexistent Peers.
// //
// DeleteNonexist can run in parallel. // DeleteNonexist can run in parallel.
func DeleteNonexist1k(b *testing.B, ps PeerStore) { func DeleteNonexist1k(b *testing.B, ps Storage) {
runBenchmark(b, ps, true, nil, func(i int, ps PeerStore, bd *benchData) error { runBenchmark(b, ps, true, nil, func(i int, ps Storage, bd *benchData) error {
ps.DeleteSeeder(bd.infohashes[0], bd.peers[i%1000]) ps.DeleteSeeder(bd.infohashes[0], bd.peers[i%1000])
return nil return nil
}) })
} }
// DeleteNonexist1kInfohash benchmarks the DeleteSeeder method of a PeerStore by // DeleteNonexist1kInfohash benchmarks the DeleteSeeder method of a Storage by
// attempting to delete one Peer from one of 1000 infohashes. // attempting to delete one Peer from one of 1000 infohashes.
// //
// DeleteNonexist1kInfohash can run in parallel. // DeleteNonexist1kInfohash can run in parallel.
func DeleteNonexist1kInfohash(b *testing.B, ps PeerStore) { func DeleteNonexist1kInfohash(b *testing.B, ps Storage) {
runBenchmark(b, ps, true, nil, func(i int, ps PeerStore, bd *benchData) error { runBenchmark(b, ps, true, nil, func(i int, ps Storage, bd *benchData) error {
ps.DeleteSeeder(bd.infohashes[i%1000], bd.peers[0]) ps.DeleteSeeder(bd.infohashes[i%1000], bd.peers[0])
return nil return nil
}) })
} }
// DeleteNonexist1kInfohash1k benchmarks the Delete method of a PeerStore by // DeleteNonexist1kInfohash1k benchmarks the Delete method of a Storage by
// attempting to delete one of 1000 Peers from one of 1000 Infohashes. // attempting to delete one of 1000 Peers from one of 1000 Infohashes.
// //
// DeleteNonexist1kInfohash1k can run in parallel. // DeleteNonexist1kInfohash1k can run in parallel.
func DeleteNonexist1kInfohash1k(b *testing.B, ps PeerStore) { func DeleteNonexist1kInfohash1k(b *testing.B, ps Storage) {
runBenchmark(b, ps, true, nil, func(i int, ps PeerStore, bd *benchData) error { runBenchmark(b, ps, true, nil, func(i int, ps Storage, bd *benchData) error {
ps.DeleteSeeder(bd.infohashes[i%1000], bd.peers[(i*3)%1000]) ps.DeleteSeeder(bd.infohashes[i%1000], bd.peers[(i*3)%1000])
return nil return nil
}) })
} }
// GradNonexist benchmarks the GraduateLeecher method of a PeerStore by // GradNonexist benchmarks the GraduateLeecher method of a Storage by
// attempting to graduate a nonexistent Peer. // attempting to graduate a nonexistent Peer.
// //
// GradNonexist can run in parallel. // GradNonexist can run in parallel.
func GradNonexist(b *testing.B, ps PeerStore) { func GradNonexist(b *testing.B, ps Storage) {
runBenchmark(b, ps, true, nil, func(i int, ps PeerStore, bd *benchData) error { runBenchmark(b, ps, true, nil, func(i int, ps Storage, bd *benchData) error {
ps.GraduateLeecher(bd.infohashes[0], bd.peers[0]) ps.GraduateLeecher(bd.infohashes[0], bd.peers[0])
return nil return nil
}) })
} }
// GradNonexist1k benchmarks the GraduateLeecher method of a PeerStore by // GradNonexist1k benchmarks the GraduateLeecher method of a Storage by
// attempting to graduate one of 1000 nonexistent Peers. // attempting to graduate one of 1000 nonexistent Peers.
// //
// GradNonexist1k can run in parallel. // GradNonexist1k can run in parallel.
func GradNonexist1k(b *testing.B, ps PeerStore) { func GradNonexist1k(b *testing.B, ps Storage) {
runBenchmark(b, ps, true, nil, func(i int, ps PeerStore, bd *benchData) error { runBenchmark(b, ps, true, nil, func(i int, ps Storage, bd *benchData) error {
ps.GraduateLeecher(bd.infohashes[0], bd.peers[i%1000]) ps.GraduateLeecher(bd.infohashes[0], bd.peers[i%1000])
return nil return nil
}) })
} }
// GradNonexist1kInfohash benchmarks the GraduateLeecher method of a PeerStore // GradNonexist1kInfohash benchmarks the GraduateLeecher method of a Storage
// by attempting to graduate a nonexistent Peer for one of 100 Infohashes. // by attempting to graduate a nonexistent Peer for one of 100 Infohashes.
// //
// GradNonexist1kInfohash can run in parallel. // GradNonexist1kInfohash can run in parallel.
func GradNonexist1kInfohash(b *testing.B, ps PeerStore) { func GradNonexist1kInfohash(b *testing.B, ps Storage) {
runBenchmark(b, ps, true, nil, func(i int, ps PeerStore, bd *benchData) error { runBenchmark(b, ps, true, nil, func(i int, ps Storage, bd *benchData) error {
ps.GraduateLeecher(bd.infohashes[i%1000], bd.peers[0]) ps.GraduateLeecher(bd.infohashes[i%1000], bd.peers[0])
return nil return nil
}) })
} }
// GradNonexist1kInfohash1k benchmarks the GraduateLeecher method of a PeerStore // GradNonexist1kInfohash1k benchmarks the GraduateLeecher method of a Storage
// by attempting to graduate one of 1000 nonexistent Peers for one of 1000 // by attempting to graduate one of 1000 nonexistent Peers for one of 1000
// infohashes. // infohashes.
// //
// GradNonexist1kInfohash1k can run in parallel. // GradNonexist1kInfohash1k can run in parallel.
func GradNonexist1kInfohash1k(b *testing.B, ps PeerStore) { func GradNonexist1kInfohash1k(b *testing.B, ps Storage) {
runBenchmark(b, ps, true, nil, func(i int, ps PeerStore, bd *benchData) error { runBenchmark(b, ps, true, nil, func(i int, ps Storage, bd *benchData) error {
ps.GraduateLeecher(bd.infohashes[i%1000], bd.peers[(i*3)%1000]) ps.GraduateLeecher(bd.infohashes[i%1000], bd.peers[(i*3)%1000])
return nil return nil
}) })
} }
// PutGradDelete benchmarks the PutLeecher, GraduateLeecher and DeleteSeeder // PutGradDelete benchmarks the PutLeecher, GraduateLeecher and DeleteSeeder
// methods of a PeerStore by adding one leecher to a swarm, promoting it to a // methods of a Storage by adding one leecher to a swarm, promoting it to a
// seeder and deleting the seeder. // seeder and deleting the seeder.
// //
// PutGradDelete can not run in parallel. // PutGradDelete can not run in parallel.
func PutGradDelete(b *testing.B, ps PeerStore) { func PutGradDelete(b *testing.B, ps Storage) {
runBenchmark(b, ps, false, nil, func(i int, ps PeerStore, bd *benchData) error { runBenchmark(b, ps, false, nil, func(i int, ps Storage, bd *benchData) error {
err := ps.PutLeecher(bd.infohashes[0], bd.peers[0]) err := ps.PutLeecher(bd.infohashes[0], bd.peers[0])
if err != nil { if err != nil {
return err return err
@@ -312,8 +312,8 @@ func PutGradDelete(b *testing.B, ps PeerStore) {
// PutGradDelete1k behaves like PutGradDelete with one of 1000 Peers. // PutGradDelete1k behaves like PutGradDelete with one of 1000 Peers.
// //
// PutGradDelete1k can not run in parallel. // PutGradDelete1k can not run in parallel.
func PutGradDelete1k(b *testing.B, ps PeerStore) { func PutGradDelete1k(b *testing.B, ps Storage) {
runBenchmark(b, ps, false, nil, func(i int, ps PeerStore, bd *benchData) error { runBenchmark(b, ps, false, nil, func(i int, ps Storage, bd *benchData) error {
err := ps.PutLeecher(bd.infohashes[0], bd.peers[i%1000]) err := ps.PutLeecher(bd.infohashes[0], bd.peers[i%1000])
if err != nil { if err != nil {
return err return err
@@ -330,8 +330,8 @@ func PutGradDelete1k(b *testing.B, ps PeerStore) {
// infohashes. // infohashes.
// //
// PutGradDelete1kInfohash can not run in parallel. // PutGradDelete1kInfohash can not run in parallel.
func PutGradDelete1kInfohash(b *testing.B, ps PeerStore) { func PutGradDelete1kInfohash(b *testing.B, ps Storage) {
runBenchmark(b, ps, false, nil, func(i int, ps PeerStore, bd *benchData) error { runBenchmark(b, ps, false, nil, func(i int, ps Storage, bd *benchData) error {
err := ps.PutLeecher(bd.infohashes[i%1000], bd.peers[0]) err := ps.PutLeecher(bd.infohashes[i%1000], bd.peers[0])
if err != nil { if err != nil {
return err return err
@@ -348,8 +348,8 @@ func PutGradDelete1kInfohash(b *testing.B, ps PeerStore) {
// and one of 1000 infohashes. // and one of 1000 infohashes.
// //
// PutGradDelete1kInfohash can not run in parallel. // PutGradDelete1kInfohash can not run in parallel.
func PutGradDelete1kInfohash1k(b *testing.B, ps PeerStore) { func PutGradDelete1kInfohash1k(b *testing.B, ps Storage) {
runBenchmark(b, ps, false, nil, func(i int, ps PeerStore, bd *benchData) error { runBenchmark(b, ps, false, nil, func(i int, ps Storage, bd *benchData) error {
err := ps.PutLeecher(bd.infohashes[i%1000], bd.peers[(i*3)%1000]) err := ps.PutLeecher(bd.infohashes[i%1000], bd.peers[(i*3)%1000])
if err != nil { if err != nil {
return err return err
@@ -363,7 +363,7 @@ func PutGradDelete1kInfohash1k(b *testing.B, ps PeerStore) {
}) })
} }
func putPeers(ps PeerStore, bd *benchData) error { func putPeers(ps Storage, bd *benchData) error {
for i := 0; i < 1000; i++ { for i := 0; i < 1000; i++ {
for j := 0; j < 1000; j++ { for j := 0; j < 1000; j++ {
var err error var err error
@@ -380,13 +380,13 @@ func putPeers(ps PeerStore, bd *benchData) error {
return nil return nil
} }
// AnnounceLeecher benchmarks the AnnouncePeers method of a PeerStore for // AnnounceLeecher benchmarks the AnnouncePeers method of a Storage for
// announcing a leecher. // announcing a leecher.
// The swarm announced to has 500 seeders and 500 leechers. // The swarm announced to has 500 seeders and 500 leechers.
// //
// AnnounceLeecher can run in parallel. // AnnounceLeecher can run in parallel.
func AnnounceLeecher(b *testing.B, ps PeerStore) { func AnnounceLeecher(b *testing.B, ps Storage) {
runBenchmark(b, ps, true, putPeers, func(i int, ps PeerStore, bd *benchData) error { runBenchmark(b, ps, true, putPeers, func(i int, ps Storage, bd *benchData) error {
_, err := ps.AnnouncePeers(bd.infohashes[0], false, 50, bd.peers[0]) _, err := ps.AnnouncePeers(bd.infohashes[0], false, 50, bd.peers[0])
return err return err
}) })
@@ -396,8 +396,8 @@ func AnnounceLeecher(b *testing.B, ps PeerStore) {
// infohashes. // infohashes.
// //
// AnnounceLeecher1kInfohash can run in parallel. // AnnounceLeecher1kInfohash can run in parallel.
func AnnounceLeecher1kInfohash(b *testing.B, ps PeerStore) { func AnnounceLeecher1kInfohash(b *testing.B, ps Storage) {
runBenchmark(b, ps, true, putPeers, func(i int, ps PeerStore, bd *benchData) error { runBenchmark(b, ps, true, putPeers, func(i int, ps Storage, bd *benchData) error {
_, err := ps.AnnouncePeers(bd.infohashes[i%1000], false, 50, bd.peers[0]) _, err := ps.AnnouncePeers(bd.infohashes[i%1000], false, 50, bd.peers[0])
return err return err
}) })
@@ -407,8 +407,8 @@ func AnnounceLeecher1kInfohash(b *testing.B, ps PeerStore) {
// leecher. // leecher.
// //
// AnnounceSeeder can run in parallel. // AnnounceSeeder can run in parallel.
func AnnounceSeeder(b *testing.B, ps PeerStore) { func AnnounceSeeder(b *testing.B, ps Storage) {
runBenchmark(b, ps, true, putPeers, func(i int, ps PeerStore, bd *benchData) error { runBenchmark(b, ps, true, putPeers, func(i int, ps Storage, bd *benchData) error {
_, err := ps.AnnouncePeers(bd.infohashes[0], true, 50, bd.peers[0]) _, err := ps.AnnouncePeers(bd.infohashes[0], true, 50, bd.peers[0])
return err return err
}) })
@@ -418,19 +418,19 @@ func AnnounceSeeder(b *testing.B, ps PeerStore) {
// infohashes. // infohashes.
// //
// AnnounceSeeder1kInfohash can run in parallel. // AnnounceSeeder1kInfohash can run in parallel.
func AnnounceSeeder1kInfohash(b *testing.B, ps PeerStore) { func AnnounceSeeder1kInfohash(b *testing.B, ps Storage) {
runBenchmark(b, ps, true, putPeers, func(i int, ps PeerStore, bd *benchData) error { runBenchmark(b, ps, true, putPeers, func(i int, ps Storage, bd *benchData) error {
_, err := ps.AnnouncePeers(bd.infohashes[i%1000], true, 50, bd.peers[0]) _, err := ps.AnnouncePeers(bd.infohashes[i%1000], true, 50, bd.peers[0])
return err return err
}) })
} }
// ScrapeSwarm benchmarks the ScrapeSwarm method of a PeerStore. // ScrapeSwarm benchmarks the ScrapeSwarm method of a Storage.
// The swarm scraped has 500 seeders and 500 leechers. // The swarm scraped has 500 seeders and 500 leechers.
// //
// ScrapeSwarm can run in parallel. // ScrapeSwarm can run in parallel.
func ScrapeSwarm(b *testing.B, ps PeerStore) { func ScrapeSwarm(b *testing.B, ps Storage) {
runBenchmark(b, ps, true, putPeers, func(i int, ps PeerStore, bd *benchData) error { runBenchmark(b, ps, true, putPeers, func(i int, ps Storage, bd *benchData) error {
ps.ScrapeSwarm(bd.infohashes[0], bittorrent.IPv4) ps.ScrapeSwarm(bd.infohashes[0], bittorrent.IPv4)
return nil return nil
}) })
@@ -439,8 +439,8 @@ func ScrapeSwarm(b *testing.B, ps PeerStore) {
// ScrapeSwarm1kInfohash behaves like ScrapeSwarm with one of 1000 infohashes. // ScrapeSwarm1kInfohash behaves like ScrapeSwarm with one of 1000 infohashes.
// //
// ScrapeSwarm1kInfohash can run in parallel. // ScrapeSwarm1kInfohash can run in parallel.
func ScrapeSwarm1kInfohash(b *testing.B, ps PeerStore) { func ScrapeSwarm1kInfohash(b *testing.B, ps Storage) {
runBenchmark(b, ps, true, putPeers, func(i int, ps PeerStore, bd *benchData) error { runBenchmark(b, ps, true, putPeers, func(i int, ps Storage, bd *benchData) error {
ps.ScrapeSwarm(bd.infohashes[i%1000], bittorrent.IPv4) ps.ScrapeSwarm(bd.infohashes[i%1000], bittorrent.IPv4)
return nil return nil
}) })

View File

@@ -11,30 +11,34 @@ import (
// PeerEqualityFunc is the boolean function to use to check two Peers for // PeerEqualityFunc is the boolean function to use to check two Peers for
// equality. // equality.
// Depending on the implementation of the PeerStore, this can be changed to // Depending on the implementation of the Storage, this can be changed to
// use (Peer).EqualEndpoint instead. // use (Peer).EqualEndpoint instead.
var PeerEqualityFunc = func(p1, p2 bittorrent.Peer) bool { return p1.Equal(p2) } var PeerEqualityFunc = func(p1, p2 bittorrent.Peer) bool { return p1.Equal(p2) }
// TestPeerStore tests a PeerStore implementation against the interface. // TestPeerStore tests a Storage implementation against the interface.
func TestPeerStore(t *testing.T, p PeerStore) { func TestPeerStore(t *testing.T, p Storage) {
ih0, _ := bittorrent.InfoHashFromString("00000000000000000001") ih0, _ := bittorrent.NewInfoHash([]byte("00000000000000000001"))
ih1, _ := bittorrent.InfoHashFromString("00000000000000000002") ih1, _ := bittorrent.NewInfoHash([]byte("00000000000000000002"))
id0, _ := bittorrent.NewPeerID([]byte("00000000000000000001"))
id1, _ := bittorrent.NewPeerID([]byte("00000000000000000001"))
id2, _ := bittorrent.NewPeerID([]byte("00000000000000000001"))
id3, _ := bittorrent.NewPeerID([]byte("00000000000000000001"))
testData := []struct { testData := []struct {
ih bittorrent.InfoHash ih bittorrent.InfoHash
peer bittorrent.Peer peer bittorrent.Peer
}{ }{
{ {
ih0, ih0,
bittorrent.Peer{ID: bittorrent.PeerIDFromString("00000000000000000001"), Port: 1, IP: bittorrent.IP{IP: net.ParseIP("1.1.1.1").To4(), AddressFamily: bittorrent.IPv4}}, bittorrent.Peer{ID: id0, Port: 1, IP: bittorrent.IP{IP: net.ParseIP("1.1.1.1").To4(), AddressFamily: bittorrent.IPv4}},
}, },
{ {
ih1, ih1,
bittorrent.Peer{ID: bittorrent.PeerIDFromString("00000000000000000002"), Port: 2, IP: bittorrent.IP{IP: net.ParseIP("abab::0001"), AddressFamily: bittorrent.IPv6}}, bittorrent.Peer{ID: id1, Port: 2, IP: bittorrent.IP{IP: net.ParseIP("abab::0001"), AddressFamily: bittorrent.IPv6}},
}, },
} }
v4Peer := bittorrent.Peer{ID: bittorrent.PeerIDFromString("99999999999999999994"), IP: bittorrent.IP{IP: net.ParseIP("99.99.99.99").To4(), AddressFamily: bittorrent.IPv4}, Port: 9994} v4Peer := bittorrent.Peer{ID: id2, IP: bittorrent.IP{IP: net.ParseIP("99.99.99.99").To4(), AddressFamily: bittorrent.IPv4}, Port: 9994}
v6Peer := bittorrent.Peer{ID: bittorrent.PeerIDFromString("99999999999999999996"), IP: bittorrent.IP{IP: net.ParseIP("fc00::0001"), AddressFamily: bittorrent.IPv6}, Port: 9996} v6Peer := bittorrent.Peer{ID: id3, IP: bittorrent.IP{IP: net.ParseIP("fc00::0001"), AddressFamily: bittorrent.IPv6}, Port: 9996}
for _, c := range testData { for _, c := range testData {
peer := v4Peer peer := v4Peer