diff --git a/bittorrent/bittorrent.go b/bittorrent/bittorrent.go index dff7d1c..07ce499 100644 --- a/bittorrent/bittorrent.go +++ b/bittorrent/bittorrent.go @@ -12,20 +12,23 @@ import ( "github.com/chihaya/chihaya/pkg/log" ) -// PeerID represents a peer ID. -type PeerID [20]byte +const PeerIDLen = 20 -// PeerIDFromBytes creates a PeerID from a byte slice. +// PeerID represents a peer ID. +type PeerID [PeerIDLen]byte + +var InvalidPeerIDSizeError = errors.New("peer ID must be 20 bytes") + +// NewPeerID creates a PeerID from a byte slice. // // It panics if b is not 20 bytes long. -func PeerIDFromBytes(b []byte) PeerID { - if len(b) != 20 { - panic("peer ID must be 20 bytes") +func NewPeerID(b []byte) (PeerID, error) { + var p PeerID + if len(b) != PeerIDLen { + return p, InvalidPeerIDSizeError } - - var buf [20]byte - copy(buf[:], b) - return buf + copy(p[:], b) + return p, nil } // String implements fmt.Stringer, returning the base16 encoded PeerID. @@ -38,36 +41,22 @@ func (p PeerID) RawString() string { return string(p[:]) } -// PeerIDFromString creates a PeerID from a string. -// -// It panics if s is not 20 bytes long. -func PeerIDFromString(s string) PeerID { - if len(s) != 20 { - panic("peer ID must be 20 bytes") - } - - var buf [20]byte - copy(buf[:], s) - return buf -} - // InfoHash represents an infohash. -type InfoHash []byte +type InfoHash string -const( - InfoHashV1Len = 20 - InfoHashV2Len = 32 +const ( + InfoHashV1Len = 20 + InfoHashV2Len = 32 + NoneInfoHash InfoHash = "" ) -var invalidHashSize = errors.New("InfoHash must be either 20 (for torrent V1) or 32 (V2) bytes") +var InvalidHashSizeError = errors.New("info hash must be either 20 (for torrent V1) or 32 (V2) bytes") -// BytesV1 returns 20-bytes length array of the corresponding InfoHash. +// TruncateV1 returns truncated to 20-bytes length array of the corresponding InfoHash. // If InfoHash is V2 (32 bytes), it will be truncated to 20 bytes // according to BEP52. -func (i InfoHash) BytesV1() [InfoHashV1Len]byte{ - var bb [InfoHashV1Len]byte - copy(bb[:], i) - return bb +func (i InfoHash) TruncateV1() InfoHash { + return i[:InfoHashV1Len] } // ValidateInfoHash validates input bytes size and returns it @@ -76,27 +65,21 @@ func (i InfoHash) BytesV1() [InfoHashV1Len]byte{ func ValidateInfoHash(b []byte) (int, error) { l := len(b) if l != InfoHashV1Len && l != InfoHashV2Len { - return 0, invalidHashSize + return 0, InvalidHashSizeError } return l, nil } -// InfoHashFromBytes creates an InfoHash from a byte slice. -func InfoHashFromBytes(b []byte) (InfoHash, error) { - if l, err := ValidateInfoHash(b); err != nil{ - return nil, err +// NewInfoHash creates an InfoHash from a byte slice. +func NewInfoHash(b []byte) (InfoHash, error) { + if _, err := ValidateInfoHash(b); err != nil { + return NoneInfoHash, err } else { - buf := make([]byte, l) - copy(buf[:], b) + buf := InfoHash(b) return buf, nil } } -// InfoHashFromString creates an InfoHash from a string. -func InfoHashFromString(s string) (InfoHash, error) { - return InfoHashFromBytes([]byte(s)) -} - // String implements fmt.Stringer, returning the base16 encoded InfoHash. func (i InfoHash) String() string { return fmt.Sprintf("%x", i[:]) diff --git a/bittorrent/bittorrent_test.go b/bittorrent/bittorrent_test.go index 7ef007b..7b39a45 100644 --- a/bittorrent/bittorrent_test.go +++ b/bittorrent/bittorrent_test.go @@ -19,7 +19,7 @@ var peerStringTestCases = []struct { }{ { input: Peer{ - ID: PeerIDFromBytes(b), + ID: NewPeerID(b), IP: IP{net.IPv4(10, 11, 12, 1), IPv4}, Port: 1234, }, @@ -27,7 +27,7 @@ var peerStringTestCases = []struct { }, { input: Peer{ - ID: PeerIDFromBytes(b), + ID: NewPeerID(b), IP: IP{net.ParseIP("2001:db8::ff00:42:8329"), IPv6}, Port: 1234, }, @@ -36,12 +36,12 @@ var peerStringTestCases = []struct { } func TestPeerID_String(t *testing.T) { - s := PeerIDFromBytes(b).String() + s := NewPeerID(b).String() require.Equal(t, expected, s) } func TestInfoHash_String(t *testing.T) { - ih, err := InfoHashFromBytes(b) + ih, err := NewInfoHash(b) require.Nil(t, err) require.Equal(t, expected, ih.String()) } diff --git a/cmd/chihaya/e2e.go b/cmd/chihaya/e2e.go index a550f6c..17ae662 100644 --- a/cmd/chihaya/e2e.go +++ b/cmd/chihaya/e2e.go @@ -57,17 +57,17 @@ func EndToEndRunCmdFunc(cmd *cobra.Command, args []string) error { func generateInfohash() bittorrent.InfoHash { b := make([]byte, 20) rand.Read(b) - ih, _ := bittorrent.InfoHashFromBytes(b) + ih, _ := bittorrent.NewInfoHash(b) return ih } func test(addr string, delay time.Duration) error { - ih, _ := generateInfohash().BytesV1() + ih := generateInfohash().TruncateV1() return testWithInfohash(ih, addr, delay) } -func testWithInfohash(infoHash [20]byte, url string, delay time.Duration) error { - var ih [20]byte +func testWithInfohash(infoHash bittorrent.InfoHash, url string, delay time.Duration) error { + var ih [bittorrent.InfoHashV1Len]byte req := tracker.AnnounceRequest{ InfoHash: ih, PeerId: [20]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20}, @@ -95,8 +95,9 @@ func testWithInfohash(infoHash [20]byte, url string, delay time.Duration) error time.Sleep(delay) + copy(ih[:], infoHash) req = tracker.AnnounceRequest{ - InfoHash: infoHash, + InfoHash: ih, PeerId: [20]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 21}, Downloaded: 50, Left: 100, diff --git a/cmd/chihaya/main.go b/cmd/chihaya/main.go index bb60c19..1d8c30d 100644 --- a/cmd/chihaya/main.go +++ b/cmd/chihaya/main.go @@ -24,7 +24,7 @@ import ( // Run represents the state of a running instance of Chihaya. type Run struct { configFilePath string - peerStore storage.PeerStore + storage storage.Storage logic *middleware.Logic sg *stop.Group } @@ -41,7 +41,7 @@ func NewRun(configFilePath string) (*Run, error) { // Start begins an instance of Chihaya. // It is optional to provide an instance of the peer store to avoid the // creation of a new one. -func (r *Run) Start(ps storage.PeerStore) error { +func (r *Run) Start(ps storage.Storage) error { configFile, err := ParseConfigFile(r.configFilePath) if err != nil { return errors.New("failed to read config: " + err.Error()) @@ -59,19 +59,19 @@ func (r *Run) Start(ps storage.PeerStore) error { if ps == nil { log.Info("starting storage", log.Fields{"name": cfg.Storage.Name}) - ps, err = storage.NewPeerStore(cfg.Storage.Name, cfg.Storage.Config) + ps, err = storage.NewStorage(cfg.Storage.Name, cfg.Storage.Config) if err != nil { return errors.New("failed to create storage: " + err.Error()) } log.Info("started storage", ps) } - r.peerStore = ps + r.storage = ps - preHooks, err := middleware.HooksFromHookConfigs(cfg.PreHooks) + preHooks, err := middleware.HooksFromHookConfigs(cfg.PreHooks, r.storage) if err != nil { return errors.New("failed to validate hook config: " + err.Error()) } - postHooks, err := middleware.HooksFromHookConfigs(cfg.PostHooks) + postHooks, err := middleware.HooksFromHookConfigs(cfg.PostHooks, r.storage) if err != nil { return errors.New("failed to validate hook config: " + err.Error()) } @@ -80,7 +80,7 @@ func (r *Run) Start(ps storage.PeerStore) error { "prehooks": cfg.PreHookNames(), "posthooks": cfg.PostHookNames(), }) - r.logic = middleware.NewLogic(cfg.ResponseConfig, r.peerStore, preHooks, postHooks) + r.logic = middleware.NewLogic(cfg.ResponseConfig, r.storage, preHooks, postHooks) if cfg.HTTPConfig.Addr != "" { log.Info("starting HTTP frontend", cfg.HTTPConfig) @@ -113,7 +113,7 @@ func combineErrors(prefix string, errs []error) error { } // Stop shuts down an instance of Chihaya. -func (r *Run) Stop(keepPeerStore bool) (storage.PeerStore, error) { +func (r *Run) Stop(keepPeerStore bool) (storage.Storage, error) { log.Debug("stopping frontends and metrics server") if errs := r.sg.Stop().Wait(); len(errs) != 0 { return nil, combineErrors("failed while shutting down frontends", errs) @@ -126,13 +126,13 @@ func (r *Run) Stop(keepPeerStore bool) (storage.PeerStore, error) { if !keepPeerStore { log.Debug("stopping peer store") - if errs := r.peerStore.Stop().Wait(); len(errs) != 0 { + if errs := r.storage.Stop().Wait(); len(errs) != 0 { return nil, combineErrors("failed while shutting down peer store", errs) } - r.peerStore = nil + r.storage = nil } - return r.peerStore, nil + return r.storage, nil } // RootRunCmdFunc implements a Cobra command that runs an instance of Chihaya diff --git a/frontend/http/parser.go b/frontend/http/parser.go index ca53e87..f795114 100644 --- a/frontend/http/parser.go +++ b/frontend/http/parser.go @@ -67,10 +67,10 @@ func ParseAnnounce(r *http.Request, opts ParseOptions) (*bittorrent.AnnounceRequ if !ok { return nil, bittorrent.ClientError("failed to parse parameter: peer_id") } - if len(peerID) != 20 { + if len(peerID) != bittorrent.PeerIDLen { return nil, bittorrent.ClientError("failed to provide valid peer_id") } - request.Peer.ID = bittorrent.PeerIDFromString(peerID) + request.Peer.ID = bittorrent.NewPeerID([]byte(peerID)) // Determine the number of remaining bytes for the client. request.Left, err = qp.Uint64("left") diff --git a/frontend/udp/frontend_test.go b/frontend/udp/frontend_test.go index b4cff14..d4a234b 100644 --- a/frontend/udp/frontend_test.go +++ b/frontend/udp/frontend_test.go @@ -10,7 +10,7 @@ import ( ) func TestStartStopRaceIssue437(t *testing.T) { - ps, err := storage.NewPeerStore("memory", nil) + ps, err := storage.NewStorage("memory", nil) if err != nil { t.Fatal(err) } diff --git a/frontend/udp/parser.go b/frontend/udp/parser.go index 9df6b3c..1fdbbdd 100644 --- a/frontend/udp/parser.go +++ b/frontend/udp/parser.go @@ -112,7 +112,7 @@ func ParseAnnounce(r Request, v6Action bool, opts ParseOptions) (*bittorrent.Ann return nil, err } - ih, err := bittorrent.InfoHashFromBytes(infohash) + ih, err := bittorrent.NewInfoHash(infohash) if err != nil{ return nil, err } @@ -128,7 +128,7 @@ func ParseAnnounce(r Request, v6Action bool, opts ParseOptions) (*bittorrent.Ann NumWantProvided: true, EventProvided: true, Peer: bittorrent.Peer{ - ID: bittorrent.PeerIDFromBytes(peerID), + ID: bittorrent.NewPeerID(peerID), IP: bittorrent.IP{IP: ip}, Port: port, }, @@ -227,7 +227,7 @@ func ParseScrape(r Request, opts ParseOptions) (*bittorrent.ScrapeRequest, error pageSize = bittorrent.InfoHashV2Len } for len(r.Packet) >= pageSize { - if ih, err := bittorrent.InfoHashFromBytes(r.Packet[:pageSize]); err != nil{ + if ih, err := bittorrent.NewInfoHash(r.Packet[:pageSize]); err != nil{ return nil, err } else { infohashes = append(infohashes, ih) diff --git a/middleware/clientapproval/client_id_test.go b/middleware/clientapproval/client_id_test.go index 3f9f408..486c351 100644 --- a/middleware/clientapproval/client_id_test.go +++ b/middleware/clientapproval/client_id_test.go @@ -47,7 +47,7 @@ func TestClientID(t *testing.T) { t.Run(tt.peerID, func(t *testing.T) { var clientID ClientID copy(clientID[:], tt.clientID) - parsedID := NewClientID(bittorrent.PeerIDFromString(tt.peerID)) + parsedID := NewClientID(bittorrent.NewPeerID([]byte(tt.peerID))) if parsedID != clientID { t.Error("Incorrectly parsed peer ID", tt.peerID, "as", parsedID) } diff --git a/middleware/clientapproval/clientapproval.go b/middleware/clientapproval/clientapproval.go index a1b4fb5..5c857ed 100644 --- a/middleware/clientapproval/clientapproval.go +++ b/middleware/clientapproval/clientapproval.go @@ -6,6 +6,7 @@ import ( "context" "errors" "fmt" + "github.com/chihaya/chihaya/storage" "gopkg.in/yaml.v2" "github.com/chihaya/chihaya/bittorrent" @@ -23,7 +24,7 @@ var _ middleware.Driver = driver{} type driver struct{} -func (d driver) NewHook(optionBytes []byte) (middleware.Hook, error) { +func (d driver) NewHook(optionBytes []byte, _ storage.Storage) (middleware.Hook, error) { var cfg Config err := yaml.Unmarshal(optionBytes, &cfg) if err != nil { @@ -100,7 +101,7 @@ func (h *hook) HandleAnnounce(ctx context.Context, req *bittorrent.AnnounceReque return ctx, nil } -func (h *hook) HandleScrape(ctx context.Context, req *bittorrent.ScrapeRequest, resp *bittorrent.ScrapeResponse) (context.Context, error) { +func (h *hook) HandleScrape(ctx context.Context, _ *bittorrent.ScrapeRequest, _ *bittorrent.ScrapeResponse) (context.Context, error) { // Scrapes don't require any protection. return ctx, nil } diff --git a/middleware/clientapproval/clientapproval_test.go b/middleware/clientapproval/clientapproval_test.go index 95632d8..f46ae77 100644 --- a/middleware/clientapproval/clientapproval_test.go +++ b/middleware/clientapproval/clientapproval_test.go @@ -59,7 +59,7 @@ func TestHandleAnnounce(t *testing.T) { req := &bittorrent.AnnounceRequest{} resp := &bittorrent.AnnounceResponse{} - peerid := bittorrent.PeerIDFromString(tt.peerID) + peerid := bittorrent.NewPeerID([]byte(tt.peerID)) req.Peer.ID = peerid diff --git a/middleware/hooks.go b/middleware/hooks.go index 8a8e9cf..b508f16 100644 --- a/middleware/hooks.go +++ b/middleware/hooks.go @@ -25,7 +25,7 @@ type skipSwarmInteraction struct{} var SkipSwarmInteractionKey = skipSwarmInteraction{} type swarmInteractionHook struct { - store storage.PeerStore + store storage.Storage } func (h *swarmInteractionHook) HandleAnnounce(ctx context.Context, req *bittorrent.AnnounceRequest, resp *bittorrent.AnnounceResponse) (_ context.Context, err error) { @@ -85,7 +85,7 @@ type scrapeAddressType struct{} var ScrapeIsIPv6Key = scrapeAddressType{} type responseHook struct { - store storage.PeerStore + store storage.Storage } func (h *responseHook) HandleAnnounce(ctx context.Context, req *bittorrent.AnnounceRequest, resp *bittorrent.AnnounceResponse) (_ context.Context, err error) { diff --git a/middleware/jwt/jwt.go b/middleware/jwt/jwt.go index f9f35f9..2cbccd3 100644 --- a/middleware/jwt/jwt.go +++ b/middleware/jwt/jwt.go @@ -13,6 +13,7 @@ import ( "encoding/json" "errors" "fmt" + "github.com/chihaya/chihaya/storage" "net/http" "strings" "time" @@ -40,7 +41,7 @@ var _ middleware.Driver = driver{} type driver struct{} -func (d driver) NewHook(optionBytes []byte) (middleware.Hook, error) { +func (d driver) NewHook(optionBytes []byte, _ storage.Storage) (middleware.Hook, error) { var cfg Config err := yaml.Unmarshal(optionBytes, &cfg) if err != nil { @@ -174,7 +175,7 @@ func (h *hook) HandleAnnounce(ctx context.Context, req *bittorrent.AnnounceReque return ctx, nil } -func (h *hook) HandleScrape(ctx context.Context, req *bittorrent.ScrapeRequest, resp *bittorrent.ScrapeResponse) (context.Context, error) { +func (h *hook) HandleScrape(ctx context.Context, _ *bittorrent.ScrapeRequest, _ *bittorrent.ScrapeResponse) (context.Context, error) { // Scrapes don't require any protection. return ctx, nil } @@ -204,7 +205,7 @@ func validateJWT(ih bittorrent.InfoHash, jwtBytes []byte, cfgIss, cfgAud string, return jwt.ErrInvalidAUDClaim } - ihHex := hex.EncodeToString(ih[:]) + ihHex := hex.EncodeToString([]byte(ih)) if ihClaim, ok := claims.Get("infohash").(string); !ok || ihClaim != ihHex { log.Debug("unequal or missing infohash when validating JWT", log.Fields{ "exists": ok, diff --git a/middleware/logic.go b/middleware/logic.go index 3c06d25..8722bbf 100644 --- a/middleware/logic.go +++ b/middleware/logic.go @@ -26,11 +26,10 @@ var _ frontend.TrackerLogic = &Logic{} // NewLogic creates a new instance of a TrackerLogic that executes the provided // middleware hooks. -func NewLogic(cfg ResponseConfig, peerStore storage.PeerStore, preHooks, postHooks []Hook) *Logic { +func NewLogic(cfg ResponseConfig, peerStore storage.Storage, preHooks, postHooks []Hook) *Logic { return &Logic{ announceInterval: cfg.AnnounceInterval, minAnnounceInterval: cfg.MinAnnounceInterval, - peerStore: peerStore, preHooks: append(preHooks, &responseHook{store: peerStore}), postHooks: append(postHooks, &swarmInteractionHook{store: peerStore}), } @@ -41,7 +40,6 @@ func NewLogic(cfg ResponseConfig, peerStore storage.PeerStore, preHooks, postHoo type Logic struct { announceInterval time.Duration minAnnounceInterval time.Duration - peerStore storage.PeerStore preHooks []Hook postHooks []Hook } diff --git a/middleware/middleware.go b/middleware/middleware.go index 90644bf..ff918ee 100644 --- a/middleware/middleware.go +++ b/middleware/middleware.go @@ -4,6 +4,7 @@ package middleware import ( "errors" + "github.com/chihaya/chihaya/storage" "sync" yaml "gopkg.in/yaml.v2" @@ -23,7 +24,7 @@ var ( // The options parameter is YAML encoded bytes that should be unmarshalled into // the hook's custom configuration. type Driver interface { - NewHook(options []byte) (Hook, error) + NewHook(options []byte, storage storage.Storage) (Hook, error) } // RegisterDriver makes a Driver available by the provided name. @@ -52,7 +53,7 @@ func RegisterDriver(name string, d Driver) { // list of registered Drivers. // // If a driver does not exist, returns ErrDriverDoesNotExist. -func New(name string, optionBytes []byte) (Hook, error) { +func New(name string, optionBytes []byte, storage storage.Storage) (Hook, error) { driversM.RLock() defer driversM.RUnlock() @@ -62,7 +63,7 @@ func New(name string, optionBytes []byte) (Hook, error) { return nil, ErrDriverDoesNotExist } - return d.NewHook(optionBytes) + return d.NewHook(optionBytes, storage) } // Config is the generic configuration format used for all registered Hooks. @@ -72,7 +73,7 @@ type Config struct { } // HooksFromHookConfigs is a utility function for initializing Hooks in bulk. -func HooksFromHookConfigs(cfgs []Config) (hooks []Hook, err error) { +func HooksFromHookConfigs(cfgs []Config, storage storage.Storage) (hooks []Hook, err error) { for _, cfg := range cfgs { // Marshal the options back into bytes. var optionBytes []byte @@ -82,7 +83,7 @@ func HooksFromHookConfigs(cfgs []Config) (hooks []Hook, err error) { } var h Hook - h, err = New(cfg.Name, optionBytes) + h, err = New(cfg.Name, optionBytes, storage) if err != nil { return } diff --git a/middleware/pkg/random/entropy.go b/middleware/pkg/random/entropy.go index 8ee44a3..37574f5 100644 --- a/middleware/pkg/random/entropy.go +++ b/middleware/pkg/random/entropy.go @@ -11,7 +11,7 @@ import ( // // Calling DeriveEntropyFromRequest multiple times yields the same values. func DeriveEntropyFromRequest(req *bittorrent.AnnounceRequest) (uint64, uint64) { - v0 := binary.BigEndian.Uint64(req.InfoHash[:8]) + binary.BigEndian.Uint64(req.InfoHash[8:16]) + v0 := binary.BigEndian.Uint64([]byte(req.InfoHash[:8])) + binary.BigEndian.Uint64([]byte(req.InfoHash[8:16])) v1 := binary.BigEndian.Uint64(req.Peer.ID[:8]) + binary.BigEndian.Uint64(req.Peer.ID[8:16]) return v0, v1 } diff --git a/middleware/torrentapproval/container/container.go b/middleware/torrentapproval/container/container.go index 5a00cee..cb515fb 100644 --- a/middleware/torrentapproval/container/container.go +++ b/middleware/torrentapproval/container/container.go @@ -3,10 +3,11 @@ package container import ( "errors" "github.com/chihaya/chihaya/bittorrent" + "github.com/chihaya/chihaya/storage" "sync" ) -type Builder func ([]byte) (Container, error) +type Builder func ([]byte, storage.Storage) (Container, error) var ( buildersMU sync.Mutex @@ -32,7 +33,7 @@ type Container interface { Contains(bittorrent.InfoHash) bool } -func GetContainer(name string, confBytes []byte) (Container, error) { +func GetContainer(name string, confBytes []byte, storage storage.Storage) (Container, error) { buildersMU.Lock() defer buildersMU.Unlock() @@ -41,7 +42,7 @@ func GetContainer(name string, confBytes []byte) (Container, error) { if builder, exist := builders[name]; !exist { err = ErrContainerDoesNotExist } else { - cn, err = builder(confBytes) + cn, err = builder(confBytes, storage) } return cn, err } diff --git a/middleware/torrentapproval/container/directory/directory.go b/middleware/torrentapproval/container/directory/directory.go index 41a30a3..6856a74 100644 --- a/middleware/torrentapproval/container/directory/directory.go +++ b/middleware/torrentapproval/container/directory/directory.go @@ -9,6 +9,7 @@ import ( "github.com/chihaya/chihaya/middleware/torrentapproval/container" "github.com/chihaya/chihaya/middleware/torrentapproval/container/list" "github.com/chihaya/chihaya/pkg/stop" + "github.com/chihaya/chihaya/storage" "gopkg.in/yaml.v2" "sync" ) @@ -22,7 +23,8 @@ type Config struct { BlacklistPath string `yaml:"blacklist_path"` } -func build(confBytes []byte) (container.Container, error) { +// TODO: change sync map to provided storage +func build(confBytes []byte, storage storage.Storage) (container.Container, error) { c := new(Config) if err := yaml.Unmarshal(confBytes, c); err != nil { return nil, fmt.Errorf("unable to deserialise configuration: %v", err) @@ -42,6 +44,7 @@ func build(confBytes []byte) (container.Container, error) { if lst.Invert { dir = c.BlacklistPath } + //FIXME: implement V2 torrent add/delete var w *dirwatch.Instance if w, err = dirwatch.New(dir); err != nil { return nil, fmt.Errorf("unable to initialize directory watch: %v", err) diff --git a/middleware/torrentapproval/container/list/list.go b/middleware/torrentapproval/container/list/list.go index 13f4bb9..e6704c8 100644 --- a/middleware/torrentapproval/container/list/list.go +++ b/middleware/torrentapproval/container/list/list.go @@ -5,8 +5,9 @@ package list import ( "encoding/hex" "fmt" - "github.com/chihaya/chihaya/bittorrent" + bittorrent "github.com/chihaya/chihaya/bittorrent" "github.com/chihaya/chihaya/middleware/torrentapproval/container" + "github.com/chihaya/chihaya/storage" "gopkg.in/yaml.v2" "sync" ) @@ -22,7 +23,8 @@ type Config struct { var DUMMY struct{} -func build(confBytes []byte) (container.Container, error) { +// TODO: change sync map to provided storage +func build(confBytes []byte, storage storage.Storage) (container.Container, error) { c := new(Config) if err := yaml.Unmarshal(confBytes, c); err != nil { return nil, fmt.Errorf("unable to deserialise configuration: %v", err) @@ -41,14 +43,15 @@ func build(confBytes []byte) (container.Container, error) { } for _, hashString := range hashList { - hashinfo, err := hex.DecodeString(hashString) + hashBytes, err := hex.DecodeString(hashString) if err != nil { return nil, fmt.Errorf("whitelist : invalid hash %s, %v", hashString, err) } - if len(hashinfo) != 20 { - return nil, fmt.Errorf("whitelist : hash %s is not 20 byes", hashString) + ih, err := bittorrent.NewInfoHash(hashBytes) + if err != nil { + return nil, fmt.Errorf("whitelist : %s : %v", hashString, err) } - l.Hashes.Store(bittorrent.InfoHashFromBytes(hashinfo), DUMMY) + l.Hashes.Store(ih, DUMMY) } return l, nil } diff --git a/middleware/torrentapproval/torrentapproval.go b/middleware/torrentapproval/torrentapproval.go index 69dba1f..f903f64 100644 --- a/middleware/torrentapproval/torrentapproval.go +++ b/middleware/torrentapproval/torrentapproval.go @@ -7,6 +7,7 @@ import ( "fmt" "github.com/chihaya/chihaya/middleware/torrentapproval/container" "github.com/chihaya/chihaya/pkg/stop" + "github.com/chihaya/chihaya/storage" "gopkg.in/yaml.v2" "github.com/chihaya/chihaya/bittorrent" @@ -24,7 +25,7 @@ func init() { type driver struct{} -func (d driver) NewHook(optionBytes []byte) (middleware.Hook, error) { +func (d driver) NewHook(optionBytes []byte, storage storage.Storage) (middleware.Hook, error) { var cfg middleware.Config err := yaml.Unmarshal(optionBytes, &cfg) if err != nil { @@ -44,7 +45,7 @@ func (d driver) NewHook(optionBytes []byte) (middleware.Hook, error) { return nil, err } - if c, err := container.GetContainer(cfg.Name, confBytes); err == nil { + if c, err := container.GetContainer(cfg.Name, confBytes, storage); err == nil { return &hook{c}, nil } else { return nil, err diff --git a/middleware/torrentapproval/torrentapproval_test.go b/middleware/torrentapproval/torrentapproval_test.go index 1a22f35..bdf0ea0 100644 --- a/middleware/torrentapproval/torrentapproval_test.go +++ b/middleware/torrentapproval/torrentapproval_test.go @@ -80,7 +80,7 @@ func TestHandleAnnounce(t *testing.T) { hashbytes, err := hex.DecodeString(tt.ih) require.Nil(t, err) - hashinfo := bittorrent.InfoHashFromBytes(hashbytes) + hashinfo := bittorrent.NewInfoHash(hashbytes) req.InfoHash = hashinfo diff --git a/middleware/varinterval/varinterval.go b/middleware/varinterval/varinterval.go index 42b1adc..0e7af22 100644 --- a/middleware/varinterval/varinterval.go +++ b/middleware/varinterval/varinterval.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "github.com/chihaya/chihaya/storage" "sync" "time" @@ -25,7 +26,7 @@ var _ middleware.Driver = driver{} type driver struct{} -func (d driver) NewHook(optionBytes []byte) (middleware.Hook, error) { +func (d driver) NewHook(optionBytes []byte, _ storage.Storage) (middleware.Hook, error) { var cfg Config err := yaml.Unmarshal(optionBytes, &cfg) if err != nil { @@ -96,12 +97,12 @@ func (h *hook) HandleAnnounce(ctx context.Context, req *bittorrent.AnnounceReque if h.cfg.ModifyResponseProbability == 1 || p < h.cfg.ModifyResponseProbability { // Generate the increase delta. v, _, _ = random.Intn(s0, s1, h.cfg.MaxIncreaseDelta) - addSeconds := time.Duration(v+1) * time.Second + add := time.Duration(v+1) * time.Second - resp.Interval += addSeconds + resp.Interval += add if h.cfg.ModifyMinInterval { - resp.MinInterval += addSeconds + resp.MinInterval += add } return ctx, nil diff --git a/storage/memory/peer_store.go b/storage/memory/peer_store.go index af66af9..e82f126 100644 --- a/storage/memory/peer_store.go +++ b/storage/memory/peer_store.go @@ -11,7 +11,7 @@ import ( yaml "gopkg.in/yaml.v2" - "github.com/chihaya/chihaya/bittorrent" + bittorrent "github.com/chihaya/chihaya/bittorrent" "github.com/chihaya/chihaya/pkg/log" "github.com/chihaya/chihaya/pkg/stop" "github.com/chihaya/chihaya/pkg/timecache" @@ -36,7 +36,7 @@ func init() { type driver struct{} -func (d driver) NewPeerStore(icfg interface{}) (storage.PeerStore, error) { +func (d driver) NewPeerStore(icfg interface{}) (storage.Storage, error) { // Marshal the config back into bytes. bytes, err := yaml.Marshal(icfg) if err != nil { @@ -53,7 +53,7 @@ func (d driver) NewPeerStore(icfg interface{}) (storage.PeerStore, error) { return New(cfg) } -// Config holds the configuration of a memory PeerStore. +// Config holds the configuration of a memory Storage. type Config struct { GarbageCollectionInterval time.Duration `yaml:"gc_interval"` PrometheusReportingInterval time.Duration `yaml:"prometheus_reporting_interval"` @@ -118,8 +118,8 @@ func (cfg Config) Validate() Config { return validcfg } -// New creates a new PeerStore backed by memory. -func New(provided Config) (storage.PeerStore, error) { +// New creates a new Storage backed by memory. +func New(provided Config) (storage.Storage, error) { cfg := provided.Validate() ps := &peerStore{ cfg: cfg, @@ -142,7 +142,9 @@ func New(provided Config) (storage.PeerStore, error) { case <-time.After(cfg.GarbageCollectionInterval): before := time.Now().Add(-cfg.PeerLifetime) log.Debug("storage: purging peers with no announces since", log.Fields{"before": before}) - ps.collectGarbage(before) + if err := ps.collectGarbage(before); err != nil { + log.Error(err) + } } } }() @@ -177,17 +179,22 @@ type serializedPeer string func newPeerKey(p bittorrent.Peer) serializedPeer { b := make([]byte, 20+2+len(p.IP.IP)) copy(b[:20], p.ID[:]) - binary.BigEndian.PutUint16(b[20:22], p.Port) - copy(b[22:], p.IP.IP) + binary.BigEndian.PutUint16(b[bittorrent.PeerIDLen:bittorrent.PeerIDLen+2], p.Port) + copy(b[bittorrent.PeerIDLen+2:], p.IP.IP) return serializedPeer(b) } +// TODO: move duplicated code into one place func decodePeerKey(pk serializedPeer) bittorrent.Peer { + peerId, err := bittorrent.NewPeerID([]byte(pk[:bittorrent.PeerIDLen])) + if err != nil { + panic(err) + } peer := bittorrent.Peer{ - ID: bittorrent.PeerIDFromString(string(pk[:20])), - Port: binary.BigEndian.Uint16([]byte(pk[20:22])), - IP: bittorrent.IP{IP: net.IP(pk[22:])}} + ID: peerId, + Port: binary.BigEndian.Uint16([]byte(pk[bittorrent.PeerIDLen:bittorrent.PeerIDLen+2])), + IP: bittorrent.IP{IP: net.IP(pk[bittorrent.PeerIDLen+2:])}} if ip := peer.IP.To4(); ip != nil { peer.IP.IP = ip @@ -222,7 +229,7 @@ type peerStore struct { wg sync.WaitGroup } -var _ storage.PeerStore = &peerStore{} +var _ storage.Storage = &peerStore{} // populateProm aggregates metrics over all shards and then posts them to // prometheus. @@ -255,7 +262,7 @@ func (ps *peerStore) shardIndex(infoHash bittorrent.InfoHash, af bittorrent.Addr // There are twice the amount of shards specified by the user, the first // half is dedicated to IPv4 swarms and the second half is dedicated to // IPv6 swarms. - idx := binary.BigEndian.Uint32(infoHash[:4]) % (uint32(len(ps.shards)) / 2) + idx := binary.BigEndian.Uint32([]byte(infoHash[:4])) % (uint32(len(ps.shards)) / 2) if af == bittorrent.IPv6 { idx += uint32(len(ps.shards) / 2) } @@ -512,7 +519,7 @@ func (ps *peerStore) ScrapeSwarm(ih bittorrent.InfoHash, addressFamily bittorren return } -// collectGarbage deletes all Peers from the PeerStore which are older than the +// collectGarbage deletes all Peers from the Storage which are older than the // cutoff time. // // This function must be able to execute while other methods on this interface diff --git a/storage/memory/peer_store_test.go b/storage/memory/peer_store_test.go index 25b2ec4..4e35131 100644 --- a/storage/memory/peer_store_test.go +++ b/storage/memory/peer_store_test.go @@ -7,7 +7,7 @@ import ( s "github.com/chihaya/chihaya/storage" ) -func createNew() s.PeerStore { +func createNew() s.Storage { ps, err := New(Config{ ShardCount: 1024, GarbageCollectionInterval: 10 * time.Minute, diff --git a/storage/redis/peer_store.go b/storage/redis/peer_store.go index 5f45458..7d406d6 100644 --- a/storage/redis/peer_store.go +++ b/storage/redis/peer_store.go @@ -61,7 +61,7 @@ func init() { type driver struct{} -func (d driver) NewPeerStore(icfg interface{}) (storage.PeerStore, error) { +func (d driver) NewPeerStore(icfg interface{}) (storage.Storage, error) { // Marshal the config back into bytes. bytes, err := yaml.Marshal(icfg) if err != nil { @@ -78,7 +78,7 @@ func (d driver) NewPeerStore(icfg interface{}) (storage.PeerStore, error) { return New(cfg) } -// Config holds the configuration of a redis PeerStore. +// Config holds the configuration of a redis Storage. type Config struct { GarbageCollectionInterval time.Duration `yaml:"gc_interval"` PrometheusReportingInterval time.Duration `yaml:"prometheus_reporting_interval"` @@ -176,8 +176,8 @@ func (cfg Config) Validate() Config { return validcfg } -// New creates a new PeerStore backed by redis. -func New(provided Config) (storage.PeerStore, error) { +// New creates a new Storage backed by redis. +func New(provided Config) (storage.Storage, error) { cfg := provided.Validate() u, err := parseRedisURL(cfg.RedisBroker) @@ -237,19 +237,24 @@ func New(provided Config) (storage.PeerStore, error) { type serializedPeer string func newPeerKey(p bittorrent.Peer) serializedPeer { - b := make([]byte, 20+2+len(p.IP.IP)) - copy(b[:20], p.ID[:]) - binary.BigEndian.PutUint16(b[20:22], p.Port) - copy(b[22:], p.IP.IP) + b := make([]byte, bittorrent.PeerIDLen+2+len(p.IP.IP)) + copy(b[:bittorrent.PeerIDLen], p.ID[:]) + binary.BigEndian.PutUint16(b[bittorrent.PeerIDLen:bittorrent.PeerIDLen+2], p.Port) + copy(b[bittorrent.PeerIDLen+2:], p.IP.IP) return serializedPeer(b) } +// TODO: move duplicated code into one place func decodePeerKey(pk serializedPeer) bittorrent.Peer { + peerId, err := bittorrent.NewPeerID([]byte(pk[:bittorrent.PeerIDLen])) + if err != nil { + panic(err) + } peer := bittorrent.Peer{ - ID: bittorrent.PeerIDFromString(string(pk[:20])), - Port: binary.BigEndian.Uint16([]byte(pk[20:22])), - IP: bittorrent.IP{IP: net.IP(pk[22:])}} + ID: peerId, + Port: binary.BigEndian.Uint16([]byte(pk[bittorrent.PeerIDLen : bittorrent.PeerIDLen+2])), + IP: bittorrent.IP{IP: net.IP(pk[bittorrent.PeerIDLen+2:])}} if ip := peer.IP.To4(); ip != nil { peer.IP.IP = ip @@ -295,12 +300,21 @@ func (ps *peerStore) leecherCountKey(af string) string { return af + "_L_count" } +func (ps *peerStore) getConnection() redis.Conn { + select { + case <-ps.closed: + panic("attempted to interact with stopped redis store") + default: + } + return ps.rb.open() +} + // populateProm aggregates metrics over all groups and then posts them to // prometheus. func (ps *peerStore) populateProm() { var numInfohashes, numSeeders, numLeechers int64 - conn := ps.rb.open() + conn := ps.getConnection() defer conn.Close() for _, group := range ps.groups() { @@ -346,18 +360,11 @@ func (ps *peerStore) PutSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) error "Peer": p, }) - select { - case <-ps.closed: - panic("attempted to interact with stopped redis store") - default: - } - pk := newPeerKey(p) - encodedSeederInfoHash := ps.seederInfohashKey(addressFamily, ih.String()) ct := ps.getClock() - conn := ps.rb.open() + conn := ps.getConnection() defer conn.Close() conn.Send("MULTI") @@ -393,15 +400,9 @@ func (ps *peerStore) DeleteSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) err "Peer": p, }) - select { - case <-ps.closed: - panic("attempted to interact with stopped redis store") - default: - } - pk := newPeerKey(p) - conn := ps.rb.open() + conn := ps.getConnection() defer conn.Close() encodedSeederInfoHash := ps.seederInfohashKey(addressFamily, ih.String()) @@ -427,18 +428,12 @@ func (ps *peerStore) PutLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error "Peer": p, }) - select { - case <-ps.closed: - panic("attempted to interact with stopped redis store") - default: - } - // Update the peer in the swarm. encodedLeecherInfoHash := ps.leecherInfohashKey(addressFamily, ih.String()) pk := newPeerKey(p) ct := ps.getClock() - conn := ps.rb.open() + conn := ps.getConnection() defer conn.Close() conn.Send("MULTI") @@ -465,13 +460,7 @@ func (ps *peerStore) DeleteLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) er "Peer": p, }) - select { - case <-ps.closed: - panic("attempted to interact with stopped redis store") - default: - } - - conn := ps.rb.open() + conn := ps.getConnection() defer conn.Close() pk := newPeerKey(p) @@ -484,9 +473,7 @@ func (ps *peerStore) DeleteLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) er if delNum == 0 { return storage.ErrResourceDoesNotExist } - if _, err := conn.Do("DECR", ps.leecherCountKey(addressFamily)); err != nil { - return err - } + _, err = conn.Do("DECR", ps.leecherCountKey(addressFamily)) return nil } @@ -498,19 +485,13 @@ func (ps *peerStore) GraduateLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) "Peer": p, }) - select { - case <-ps.closed: - panic("attempted to interact with stopped redis store") - default: - } - encodedInfoHash := ih.String() encodedLeecherInfoHash := ps.leecherInfohashKey(addressFamily, encodedInfoHash) encodedSeederInfoHash := ps.seederInfohashKey(addressFamily, encodedInfoHash) pk := newPeerKey(p) ct := ps.getClock() - conn := ps.rb.open() + conn := ps.getConnection() defer conn.Close() conn.Send("MULTI") @@ -552,17 +533,11 @@ func (ps *peerStore) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant "Peer": announcer, }) - select { - case <-ps.closed: - panic("attempted to interact with stopped redis store") - default: - } - encodedInfoHash := ih.String() encodedLeecherInfoHash := ps.leecherInfohashKey(addressFamily, encodedInfoHash) encodedSeederInfoHash := ps.seederInfohashKey(addressFamily, encodedInfoHash) - conn := ps.rb.open() + conn := ps.getConnection() defer conn.Close() leechers, err := conn.Do("HKEYS", encodedLeecherInfoHash) @@ -624,11 +599,6 @@ func (ps *peerStore) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant } func (ps *peerStore) ScrapeSwarm(ih bittorrent.InfoHash, af bittorrent.AddressFamily) (resp bittorrent.Scrape) { - select { - case <-ps.closed: - panic("attempted to interact with stopped redis store") - default: - } resp.InfoHash = ih addressFamily := af.String() @@ -636,7 +606,7 @@ func (ps *peerStore) ScrapeSwarm(ih bittorrent.InfoHash, af bittorrent.AddressFa encodedLeecherInfoHash := ps.leecherInfohashKey(addressFamily, encodedInfoHash) encodedSeederInfoHash := ps.seederInfohashKey(addressFamily, encodedInfoHash) - conn := ps.rb.open() + conn := ps.getConnection() defer conn.Close() leechersLen, err := redis.Int64(conn.Do("HLEN", encodedLeecherInfoHash)) @@ -663,7 +633,7 @@ func (ps *peerStore) ScrapeSwarm(ih bittorrent.InfoHash, af bittorrent.AddressFa return } -// collectGarbage deletes all Peers from the PeerStore which are older than the +// collectGarbage deletes all Peers from the Storage which are older than the // cutoff time. // // This function must be able to execute while other methods on this interface @@ -709,18 +679,12 @@ func (ps *peerStore) ScrapeSwarm(ih bittorrent.InfoHash, af bittorrent.AddressFa // transaction. The infohash key will remain in the addressFamil hash and // we'll attempt to clean it up the next time collectGarbage runs. func (ps *peerStore) collectGarbage(cutoff time.Time) error { - select { - case <-ps.closed: - return nil - default: - } - - conn := ps.rb.open() - defer conn.Close() - cutoffUnix := cutoff.UnixNano() start := time.Now() + conn := ps.getConnection() + defer conn.Close() + for _, group := range ps.groups() { // list all infohashes in the group infohashesList, err := redis.Strings(conn.Do("HKEYS", group)) diff --git a/storage/redis/peer_store_test.go b/storage/redis/peer_store_test.go index 305927a..1edf32a 100644 --- a/storage/redis/peer_store_test.go +++ b/storage/redis/peer_store_test.go @@ -10,7 +10,7 @@ import ( s "github.com/chihaya/chihaya/storage" ) -func createNew() s.PeerStore { +func createNew() s.Storage { rs, err := miniredis.Run() if err != nil { panic(err) diff --git a/storage/storage.go b/storage/storage.go index e64c5f8..b4c46b5 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -14,24 +14,24 @@ var ( drivers = make(map[string]Driver) ) -// Driver is the interface used to initialize a new type of PeerStore. +// Driver is the interface used to initialize a new type of Storage. type Driver interface { - NewPeerStore(cfg interface{}) (PeerStore, error) + NewPeerStore(cfg interface{}) (Storage, error) } // ErrResourceDoesNotExist is the error returned by all delete methods and the -// AnnouncePeers method of the PeerStore interface if the requested resource +// AnnouncePeers method of the Storage interface if the requested resource // does not exist. var ErrResourceDoesNotExist = bittorrent.ClientError("resource does not exist") -// ErrDriverDoesNotExist is the error returned by NewPeerStore when a peer +// ErrDriverDoesNotExist is the error returned by NewStorage when a peer // store driver with that name does not exist. var ErrDriverDoesNotExist = errors.New("peer store driver with that name does not exist") -// PeerStore is an interface that abstracts the interactions of storing and +// Storage is an interface that abstracts the interactions of storing and // manipulating Peers such that it can be implemented for various data stores. // -// Implementations of the PeerStore interface must do the following in addition +// Implementations of the Storage interface must do the following in addition // to implementing the methods of the interface in the way documented: // // - Implement a garbage-collection strategy that ensures stale data is removed. @@ -40,13 +40,13 @@ var ErrDriverDoesNotExist = errors.New("peer store driver with that name does no // be scanned periodically and too old Peers removed. The intervals and // durations involved should be configurable. // - IPv4 and IPv6 swarms must be isolated from each other. -// A PeerStore must be able to transparently handle IPv4 and IPv6 Peers, but +// A Storage must be able to transparently handle IPv4 and IPv6 Peers, but // must separate them. AnnouncePeers and ScrapeSwarm must return information // about the Swarm matching the given AddressFamily only. // // Implementations can be tested against this interface using the tests in -// storage_tests.go and the benchmarks in storage_bench.go. -type PeerStore interface { +// storage_test.go and the benchmarks in storage_bench.go. +type Storage interface { // PutSeeder adds a Seeder to the Swarm identified by the provided // InfoHash. PutSeeder(infoHash bittorrent.InfoHash, p bittorrent.Peer) error @@ -104,13 +104,21 @@ type PeerStore interface { // If the Swarm does not exist, an empty Scrape and no error is returned. ScrapeSwarm(infoHash bittorrent.InfoHash, addressFamily bittorrent.AddressFamily) bittorrent.Scrape + /*TODO: implement this*/ + + Put(key interface{}, value interface{}) + + Load(key interface{}) interface{} + + Delete(key interface{}) + // stop.Stopper is an interface that expects a Stop method to stop the - // PeerStore. + // Storage. // For more details see the documentation in the stop package. stop.Stopper // log.Fielder returns a loggable version of the data used to configure and - // operate a particular PeerStore. + // operate a particular Storage. log.Fielder } @@ -136,11 +144,11 @@ func RegisterDriver(name string, d Driver) { drivers[name] = d } -// NewPeerStore attempts to initialize a new PeerStore instance from +// NewStorage attempts to initialize a new Storage instance from // the list of registered Drivers. // // If a driver does not exist, returns ErrDriverDoesNotExist. -func NewPeerStore(name string, cfg interface{}) (ps PeerStore, err error) { +func NewStorage(name string, cfg interface{}) (ps Storage, err error) { driversM.RLock() defer driversM.RUnlock() diff --git a/storage/storage_bench.go b/storage/storage_bench.go index 10d56ff..d1eb0b6 100644 --- a/storage/storage_bench.go +++ b/storage/storage_bench.go @@ -19,7 +19,7 @@ func generateInfohashes() (a [1000]bittorrent.InfoHash) { for i := range a { b := make([]byte, 20) rand.Read(b) - a[i], _ = bittorrent.InfoHashFromBytes(b) + a[i], _ = bittorrent.NewInfoHash(b) } return @@ -49,10 +49,10 @@ func generatePeers() (a [1000]bittorrent.Peer) { return } -type executionFunc func(int, PeerStore, *benchData) error -type setupFunc func(PeerStore, *benchData) error +type executionFunc func(int, Storage, *benchData) error +type setupFunc func(Storage, *benchData) error -func runBenchmark(b *testing.B, ps PeerStore, parallel bool, sf setupFunc, ef executionFunc) { +func runBenchmark(b *testing.B, ps Storage, parallel bool, sf setupFunc, ef executionFunc) { bd := &benchData{generateInfohashes(), generatePeers()} spacing := int32(1000 / runtime.NumCPU()) if sf != nil { @@ -92,65 +92,65 @@ func runBenchmark(b *testing.B, ps PeerStore, parallel bool, sf setupFunc, ef ex } // Nop executes a no-op for each iteration. -// It should produce the same results for each PeerStore. +// It should produce the same results for each Storage. // This can be used to get an estimate of the impact of the benchmark harness // on benchmark results and an estimate of the general performance of the system // benchmarked on. // // Nop can run in parallel. -func Nop(b *testing.B, ps PeerStore) { - runBenchmark(b, ps, true, putPeers, func(i int, ps PeerStore, bd *benchData) error { +func Nop(b *testing.B, ps Storage) { + runBenchmark(b, ps, true, putPeers, func(i int, ps Storage, bd *benchData) error { return nil }) } -// Put benchmarks the PutSeeder method of a PeerStore by repeatedly Putting the +// Put benchmarks the PutSeeder method of a Storage by repeatedly Putting the // same Peer for the same InfoHash. // // Put can run in parallel. -func Put(b *testing.B, ps PeerStore) { - runBenchmark(b, ps, true, nil, func(i int, ps PeerStore, bd *benchData) error { +func Put(b *testing.B, ps Storage) { + runBenchmark(b, ps, true, nil, func(i int, ps Storage, bd *benchData) error { return ps.PutSeeder(bd.infohashes[0], bd.peers[0]) }) } -// Put1k benchmarks the PutSeeder method of a PeerStore by cycling through 1000 +// Put1k benchmarks the PutSeeder method of a Storage by cycling through 1000 // Peers and Putting them into the swarm of one infohash. // // Put1k can run in parallel. -func Put1k(b *testing.B, ps PeerStore) { - runBenchmark(b, ps, true, nil, func(i int, ps PeerStore, bd *benchData) error { +func Put1k(b *testing.B, ps Storage) { + runBenchmark(b, ps, true, nil, func(i int, ps Storage, bd *benchData) error { return ps.PutSeeder(bd.infohashes[0], bd.peers[i%1000]) }) } -// Put1kInfohash benchmarks the PutSeeder method of a PeerStore by cycling +// Put1kInfohash benchmarks the PutSeeder method of a Storage by cycling // through 1000 infohashes and putting the same peer into their swarms. // // Put1kInfohash can run in parallel. -func Put1kInfohash(b *testing.B, ps PeerStore) { - runBenchmark(b, ps, true, nil, func(i int, ps PeerStore, bd *benchData) error { +func Put1kInfohash(b *testing.B, ps Storage) { + runBenchmark(b, ps, true, nil, func(i int, ps Storage, bd *benchData) error { return ps.PutSeeder(bd.infohashes[i%1000], bd.peers[0]) }) } -// Put1kInfohash1k benchmarks the PutSeeder method of a PeerStore by cycling +// Put1kInfohash1k benchmarks the PutSeeder method of a Storage by cycling // through 1000 infohashes and 1000 Peers and calling Put with them. // // Put1kInfohash1k can run in parallel. -func Put1kInfohash1k(b *testing.B, ps PeerStore) { - runBenchmark(b, ps, true, nil, func(i int, ps PeerStore, bd *benchData) error { +func Put1kInfohash1k(b *testing.B, ps Storage) { + runBenchmark(b, ps, true, nil, func(i int, ps Storage, bd *benchData) error { err := ps.PutSeeder(bd.infohashes[i%1000], bd.peers[(i*3)%1000]) return err }) } -// PutDelete benchmarks the PutSeeder and DeleteSeeder methods of a PeerStore by +// PutDelete benchmarks the PutSeeder and DeleteSeeder methods of a Storage by // calling PutSeeder followed by DeleteSeeder for one Peer and one infohash. // // PutDelete can not run in parallel. -func PutDelete(b *testing.B, ps PeerStore) { - runBenchmark(b, ps, false, nil, func(i int, ps PeerStore, bd *benchData) error { +func PutDelete(b *testing.B, ps Storage) { + runBenchmark(b, ps, false, nil, func(i int, ps Storage, bd *benchData) error { err := ps.PutSeeder(bd.infohashes[0], bd.peers[0]) if err != nil { return err @@ -163,8 +163,8 @@ func PutDelete(b *testing.B, ps PeerStore) { // PutDelete does, but with one from 1000 Peers per iteration. // // PutDelete1k can not run in parallel. -func PutDelete1k(b *testing.B, ps PeerStore) { - runBenchmark(b, ps, false, nil, func(i int, ps PeerStore, bd *benchData) error { +func PutDelete1k(b *testing.B, ps Storage) { + runBenchmark(b, ps, false, nil, func(i int, ps Storage, bd *benchData) error { err := ps.PutSeeder(bd.infohashes[0], bd.peers[i%1000]) if err != nil { return err @@ -177,8 +177,8 @@ func PutDelete1k(b *testing.B, ps PeerStore) { // 1000 Peers. // // PutDelete1kInfohash can not run in parallel. -func PutDelete1kInfohash(b *testing.B, ps PeerStore) { - runBenchmark(b, ps, false, nil, func(i int, ps PeerStore, bd *benchData) error { +func PutDelete1kInfohash(b *testing.B, ps Storage) { + runBenchmark(b, ps, false, nil, func(i int, ps Storage, bd *benchData) error { err := ps.PutSeeder(bd.infohashes[i%1000], bd.peers[0]) if err != nil { } @@ -190,8 +190,8 @@ func PutDelete1kInfohash(b *testing.B, ps PeerStore) { // addition to 1000 Peers. // // PutDelete1kInfohash1k can not run in parallel. -func PutDelete1kInfohash1k(b *testing.B, ps PeerStore) { - runBenchmark(b, ps, false, nil, func(i int, ps PeerStore, bd *benchData) error { +func PutDelete1kInfohash1k(b *testing.B, ps Storage) { + runBenchmark(b, ps, false, nil, func(i int, ps Storage, bd *benchData) error { err := ps.PutSeeder(bd.infohashes[i%1000], bd.peers[(i*3)%1000]) if err != nil { return err @@ -201,102 +201,102 @@ func PutDelete1kInfohash1k(b *testing.B, ps PeerStore) { }) } -// DeleteNonexist benchmarks the DeleteSeeder method of a PeerStore by +// DeleteNonexist benchmarks the DeleteSeeder method of a Storage by // attempting to delete a Peer that is nonexistent. // // DeleteNonexist can run in parallel. -func DeleteNonexist(b *testing.B, ps PeerStore) { - runBenchmark(b, ps, true, nil, func(i int, ps PeerStore, bd *benchData) error { +func DeleteNonexist(b *testing.B, ps Storage) { + runBenchmark(b, ps, true, nil, func(i int, ps Storage, bd *benchData) error { ps.DeleteSeeder(bd.infohashes[0], bd.peers[0]) return nil }) } -// DeleteNonexist1k benchmarks the DeleteSeeder method of a PeerStore by +// DeleteNonexist1k benchmarks the DeleteSeeder method of a Storage by // attempting to delete one of 1000 nonexistent Peers. // // DeleteNonexist can run in parallel. -func DeleteNonexist1k(b *testing.B, ps PeerStore) { - runBenchmark(b, ps, true, nil, func(i int, ps PeerStore, bd *benchData) error { +func DeleteNonexist1k(b *testing.B, ps Storage) { + runBenchmark(b, ps, true, nil, func(i int, ps Storage, bd *benchData) error { ps.DeleteSeeder(bd.infohashes[0], bd.peers[i%1000]) return nil }) } -// DeleteNonexist1kInfohash benchmarks the DeleteSeeder method of a PeerStore by +// DeleteNonexist1kInfohash benchmarks the DeleteSeeder method of a Storage by // attempting to delete one Peer from one of 1000 infohashes. // // DeleteNonexist1kInfohash can run in parallel. -func DeleteNonexist1kInfohash(b *testing.B, ps PeerStore) { - runBenchmark(b, ps, true, nil, func(i int, ps PeerStore, bd *benchData) error { +func DeleteNonexist1kInfohash(b *testing.B, ps Storage) { + runBenchmark(b, ps, true, nil, func(i int, ps Storage, bd *benchData) error { ps.DeleteSeeder(bd.infohashes[i%1000], bd.peers[0]) return nil }) } -// DeleteNonexist1kInfohash1k benchmarks the Delete method of a PeerStore by +// DeleteNonexist1kInfohash1k benchmarks the Delete method of a Storage by // attempting to delete one of 1000 Peers from one of 1000 Infohashes. // // DeleteNonexist1kInfohash1k can run in parallel. -func DeleteNonexist1kInfohash1k(b *testing.B, ps PeerStore) { - runBenchmark(b, ps, true, nil, func(i int, ps PeerStore, bd *benchData) error { +func DeleteNonexist1kInfohash1k(b *testing.B, ps Storage) { + runBenchmark(b, ps, true, nil, func(i int, ps Storage, bd *benchData) error { ps.DeleteSeeder(bd.infohashes[i%1000], bd.peers[(i*3)%1000]) return nil }) } -// GradNonexist benchmarks the GraduateLeecher method of a PeerStore by +// GradNonexist benchmarks the GraduateLeecher method of a Storage by // attempting to graduate a nonexistent Peer. // // GradNonexist can run in parallel. -func GradNonexist(b *testing.B, ps PeerStore) { - runBenchmark(b, ps, true, nil, func(i int, ps PeerStore, bd *benchData) error { +func GradNonexist(b *testing.B, ps Storage) { + runBenchmark(b, ps, true, nil, func(i int, ps Storage, bd *benchData) error { ps.GraduateLeecher(bd.infohashes[0], bd.peers[0]) return nil }) } -// GradNonexist1k benchmarks the GraduateLeecher method of a PeerStore by +// GradNonexist1k benchmarks the GraduateLeecher method of a Storage by // attempting to graduate one of 1000 nonexistent Peers. // // GradNonexist1k can run in parallel. -func GradNonexist1k(b *testing.B, ps PeerStore) { - runBenchmark(b, ps, true, nil, func(i int, ps PeerStore, bd *benchData) error { +func GradNonexist1k(b *testing.B, ps Storage) { + runBenchmark(b, ps, true, nil, func(i int, ps Storage, bd *benchData) error { ps.GraduateLeecher(bd.infohashes[0], bd.peers[i%1000]) return nil }) } -// GradNonexist1kInfohash benchmarks the GraduateLeecher method of a PeerStore +// GradNonexist1kInfohash benchmarks the GraduateLeecher method of a Storage // by attempting to graduate a nonexistent Peer for one of 100 Infohashes. // // GradNonexist1kInfohash can run in parallel. -func GradNonexist1kInfohash(b *testing.B, ps PeerStore) { - runBenchmark(b, ps, true, nil, func(i int, ps PeerStore, bd *benchData) error { +func GradNonexist1kInfohash(b *testing.B, ps Storage) { + runBenchmark(b, ps, true, nil, func(i int, ps Storage, bd *benchData) error { ps.GraduateLeecher(bd.infohashes[i%1000], bd.peers[0]) return nil }) } -// GradNonexist1kInfohash1k benchmarks the GraduateLeecher method of a PeerStore +// GradNonexist1kInfohash1k benchmarks the GraduateLeecher method of a Storage // by attempting to graduate one of 1000 nonexistent Peers for one of 1000 // infohashes. // // GradNonexist1kInfohash1k can run in parallel. -func GradNonexist1kInfohash1k(b *testing.B, ps PeerStore) { - runBenchmark(b, ps, true, nil, func(i int, ps PeerStore, bd *benchData) error { +func GradNonexist1kInfohash1k(b *testing.B, ps Storage) { + runBenchmark(b, ps, true, nil, func(i int, ps Storage, bd *benchData) error { ps.GraduateLeecher(bd.infohashes[i%1000], bd.peers[(i*3)%1000]) return nil }) } // PutGradDelete benchmarks the PutLeecher, GraduateLeecher and DeleteSeeder -// methods of a PeerStore by adding one leecher to a swarm, promoting it to a +// methods of a Storage by adding one leecher to a swarm, promoting it to a // seeder and deleting the seeder. // // PutGradDelete can not run in parallel. -func PutGradDelete(b *testing.B, ps PeerStore) { - runBenchmark(b, ps, false, nil, func(i int, ps PeerStore, bd *benchData) error { +func PutGradDelete(b *testing.B, ps Storage) { + runBenchmark(b, ps, false, nil, func(i int, ps Storage, bd *benchData) error { err := ps.PutLeecher(bd.infohashes[0], bd.peers[0]) if err != nil { return err @@ -312,8 +312,8 @@ func PutGradDelete(b *testing.B, ps PeerStore) { // PutGradDelete1k behaves like PutGradDelete with one of 1000 Peers. // // PutGradDelete1k can not run in parallel. -func PutGradDelete1k(b *testing.B, ps PeerStore) { - runBenchmark(b, ps, false, nil, func(i int, ps PeerStore, bd *benchData) error { +func PutGradDelete1k(b *testing.B, ps Storage) { + runBenchmark(b, ps, false, nil, func(i int, ps Storage, bd *benchData) error { err := ps.PutLeecher(bd.infohashes[0], bd.peers[i%1000]) if err != nil { return err @@ -330,8 +330,8 @@ func PutGradDelete1k(b *testing.B, ps PeerStore) { // infohashes. // // PutGradDelete1kInfohash can not run in parallel. -func PutGradDelete1kInfohash(b *testing.B, ps PeerStore) { - runBenchmark(b, ps, false, nil, func(i int, ps PeerStore, bd *benchData) error { +func PutGradDelete1kInfohash(b *testing.B, ps Storage) { + runBenchmark(b, ps, false, nil, func(i int, ps Storage, bd *benchData) error { err := ps.PutLeecher(bd.infohashes[i%1000], bd.peers[0]) if err != nil { return err @@ -348,8 +348,8 @@ func PutGradDelete1kInfohash(b *testing.B, ps PeerStore) { // and one of 1000 infohashes. // // PutGradDelete1kInfohash can not run in parallel. -func PutGradDelete1kInfohash1k(b *testing.B, ps PeerStore) { - runBenchmark(b, ps, false, nil, func(i int, ps PeerStore, bd *benchData) error { +func PutGradDelete1kInfohash1k(b *testing.B, ps Storage) { + runBenchmark(b, ps, false, nil, func(i int, ps Storage, bd *benchData) error { err := ps.PutLeecher(bd.infohashes[i%1000], bd.peers[(i*3)%1000]) if err != nil { return err @@ -363,7 +363,7 @@ func PutGradDelete1kInfohash1k(b *testing.B, ps PeerStore) { }) } -func putPeers(ps PeerStore, bd *benchData) error { +func putPeers(ps Storage, bd *benchData) error { for i := 0; i < 1000; i++ { for j := 0; j < 1000; j++ { var err error @@ -380,13 +380,13 @@ func putPeers(ps PeerStore, bd *benchData) error { return nil } -// AnnounceLeecher benchmarks the AnnouncePeers method of a PeerStore for +// AnnounceLeecher benchmarks the AnnouncePeers method of a Storage for // announcing a leecher. // The swarm announced to has 500 seeders and 500 leechers. // // AnnounceLeecher can run in parallel. -func AnnounceLeecher(b *testing.B, ps PeerStore) { - runBenchmark(b, ps, true, putPeers, func(i int, ps PeerStore, bd *benchData) error { +func AnnounceLeecher(b *testing.B, ps Storage) { + runBenchmark(b, ps, true, putPeers, func(i int, ps Storage, bd *benchData) error { _, err := ps.AnnouncePeers(bd.infohashes[0], false, 50, bd.peers[0]) return err }) @@ -396,8 +396,8 @@ func AnnounceLeecher(b *testing.B, ps PeerStore) { // infohashes. // // AnnounceLeecher1kInfohash can run in parallel. -func AnnounceLeecher1kInfohash(b *testing.B, ps PeerStore) { - runBenchmark(b, ps, true, putPeers, func(i int, ps PeerStore, bd *benchData) error { +func AnnounceLeecher1kInfohash(b *testing.B, ps Storage) { + runBenchmark(b, ps, true, putPeers, func(i int, ps Storage, bd *benchData) error { _, err := ps.AnnouncePeers(bd.infohashes[i%1000], false, 50, bd.peers[0]) return err }) @@ -407,8 +407,8 @@ func AnnounceLeecher1kInfohash(b *testing.B, ps PeerStore) { // leecher. // // AnnounceSeeder can run in parallel. -func AnnounceSeeder(b *testing.B, ps PeerStore) { - runBenchmark(b, ps, true, putPeers, func(i int, ps PeerStore, bd *benchData) error { +func AnnounceSeeder(b *testing.B, ps Storage) { + runBenchmark(b, ps, true, putPeers, func(i int, ps Storage, bd *benchData) error { _, err := ps.AnnouncePeers(bd.infohashes[0], true, 50, bd.peers[0]) return err }) @@ -418,19 +418,19 @@ func AnnounceSeeder(b *testing.B, ps PeerStore) { // infohashes. // // AnnounceSeeder1kInfohash can run in parallel. -func AnnounceSeeder1kInfohash(b *testing.B, ps PeerStore) { - runBenchmark(b, ps, true, putPeers, func(i int, ps PeerStore, bd *benchData) error { +func AnnounceSeeder1kInfohash(b *testing.B, ps Storage) { + runBenchmark(b, ps, true, putPeers, func(i int, ps Storage, bd *benchData) error { _, err := ps.AnnouncePeers(bd.infohashes[i%1000], true, 50, bd.peers[0]) return err }) } -// ScrapeSwarm benchmarks the ScrapeSwarm method of a PeerStore. +// ScrapeSwarm benchmarks the ScrapeSwarm method of a Storage. // The swarm scraped has 500 seeders and 500 leechers. // // ScrapeSwarm can run in parallel. -func ScrapeSwarm(b *testing.B, ps PeerStore) { - runBenchmark(b, ps, true, putPeers, func(i int, ps PeerStore, bd *benchData) error { +func ScrapeSwarm(b *testing.B, ps Storage) { + runBenchmark(b, ps, true, putPeers, func(i int, ps Storage, bd *benchData) error { ps.ScrapeSwarm(bd.infohashes[0], bittorrent.IPv4) return nil }) @@ -439,8 +439,8 @@ func ScrapeSwarm(b *testing.B, ps PeerStore) { // ScrapeSwarm1kInfohash behaves like ScrapeSwarm with one of 1000 infohashes. // // ScrapeSwarm1kInfohash can run in parallel. -func ScrapeSwarm1kInfohash(b *testing.B, ps PeerStore) { - runBenchmark(b, ps, true, putPeers, func(i int, ps PeerStore, bd *benchData) error { +func ScrapeSwarm1kInfohash(b *testing.B, ps Storage) { + runBenchmark(b, ps, true, putPeers, func(i int, ps Storage, bd *benchData) error { ps.ScrapeSwarm(bd.infohashes[i%1000], bittorrent.IPv4) return nil }) diff --git a/storage/storage_tests.go b/storage/storage_test.go similarity index 78% rename from storage/storage_tests.go rename to storage/storage_test.go index 2419d55..acd6f8f 100644 --- a/storage/storage_tests.go +++ b/storage/storage_test.go @@ -11,30 +11,34 @@ import ( // PeerEqualityFunc is the boolean function to use to check two Peers for // equality. -// Depending on the implementation of the PeerStore, this can be changed to +// Depending on the implementation of the Storage, this can be changed to // use (Peer).EqualEndpoint instead. var PeerEqualityFunc = func(p1, p2 bittorrent.Peer) bool { return p1.Equal(p2) } -// TestPeerStore tests a PeerStore implementation against the interface. -func TestPeerStore(t *testing.T, p PeerStore) { - ih0, _ := bittorrent.InfoHashFromString("00000000000000000001") - ih1, _ := bittorrent.InfoHashFromString("00000000000000000002") +// TestPeerStore tests a Storage implementation against the interface. +func TestPeerStore(t *testing.T, p Storage) { + ih0, _ := bittorrent.NewInfoHash([]byte("00000000000000000001")) + ih1, _ := bittorrent.NewInfoHash([]byte("00000000000000000002")) + id0, _ := bittorrent.NewPeerID([]byte("00000000000000000001")) + id1, _ := bittorrent.NewPeerID([]byte("00000000000000000001")) + id2, _ := bittorrent.NewPeerID([]byte("00000000000000000001")) + id3, _ := bittorrent.NewPeerID([]byte("00000000000000000001")) testData := []struct { ih bittorrent.InfoHash peer bittorrent.Peer }{ { ih0, - bittorrent.Peer{ID: bittorrent.PeerIDFromString("00000000000000000001"), Port: 1, IP: bittorrent.IP{IP: net.ParseIP("1.1.1.1").To4(), AddressFamily: bittorrent.IPv4}}, + bittorrent.Peer{ID: id0, Port: 1, IP: bittorrent.IP{IP: net.ParseIP("1.1.1.1").To4(), AddressFamily: bittorrent.IPv4}}, }, { ih1, - bittorrent.Peer{ID: bittorrent.PeerIDFromString("00000000000000000002"), Port: 2, IP: bittorrent.IP{IP: net.ParseIP("abab::0001"), AddressFamily: bittorrent.IPv6}}, + bittorrent.Peer{ID: id1, Port: 2, IP: bittorrent.IP{IP: net.ParseIP("abab::0001"), AddressFamily: bittorrent.IPv6}}, }, } - v4Peer := bittorrent.Peer{ID: bittorrent.PeerIDFromString("99999999999999999994"), IP: bittorrent.IP{IP: net.ParseIP("99.99.99.99").To4(), AddressFamily: bittorrent.IPv4}, Port: 9994} - v6Peer := bittorrent.Peer{ID: bittorrent.PeerIDFromString("99999999999999999996"), IP: bittorrent.IP{IP: net.ParseIP("fc00::0001"), AddressFamily: bittorrent.IPv6}, Port: 9996} + v4Peer := bittorrent.Peer{ID: id2, IP: bittorrent.IP{IP: net.ParseIP("99.99.99.99").To4(), AddressFamily: bittorrent.IPv4}, Port: 9994} + v6Peer := bittorrent.Peer{ID: id3, IP: bittorrent.IP{IP: net.ParseIP("fc00::0001"), AddressFamily: bittorrent.IPv6}, Port: 9996} for _, c := range testData { peer := v4Peer