diff --git a/.github/workflows/CI.yaml b/.github/workflows/CI.yaml index d3c05be..62beb62 100644 --- a/.github/workflows/CI.yaml +++ b/.github/workflows/CI.yaml @@ -56,7 +56,7 @@ jobs: go-version: ^1.15 - name: End-to-End Test run: | - go install ./cmd/chihaya + go install --tags e2e ./cmd/chihaya cat ./dist/example_config.yaml chihaya --config=./dist/example_config.yaml --debug & pid=$! @@ -87,7 +87,7 @@ jobs: cat ./dist/example_redis_config.yaml - name: End-to-End Test run: | - go install ./cmd/chihaya + go install --tags e2e ./cmd/chihaya chihaya --config=./dist/example_redis_config.yaml --debug & pid=$! sleep 2 diff --git a/bittorrent/bittorrent.go b/bittorrent/bittorrent.go index 07ce499..7551880 100644 --- a/bittorrent/bittorrent.go +++ b/bittorrent/bittorrent.go @@ -4,6 +4,8 @@ package bittorrent import ( + "crypto/sha1" + "crypto/sha256" "fmt" "github.com/pkg/errors" "net" @@ -45,8 +47,8 @@ func (p PeerID) RawString() string { type InfoHash string const ( - InfoHashV1Len = 20 - InfoHashV2Len = 32 + InfoHashV1Len = sha1.Size + InfoHashV2Len = sha256.Size NoneInfoHash InfoHash = "" ) diff --git a/bittorrent/bittorrent_test.go b/bittorrent/bittorrent_test.go index 7b39a45..1394d3e 100644 --- a/bittorrent/bittorrent_test.go +++ b/bittorrent/bittorrent_test.go @@ -19,7 +19,6 @@ var peerStringTestCases = []struct { }{ { input: Peer{ - ID: NewPeerID(b), IP: IP{net.IPv4(10, 11, 12, 1), IPv4}, Port: 1234, }, @@ -27,7 +26,6 @@ var peerStringTestCases = []struct { }, { input: Peer{ - ID: NewPeerID(b), IP: IP{net.ParseIP("2001:db8::ff00:42:8329"), IPv6}, Port: 1234, }, @@ -36,7 +34,9 @@ var peerStringTestCases = []struct { } func TestPeerID_String(t *testing.T) { - s := NewPeerID(b).String() + pid, err := NewPeerID(b) + require.Nil(t, err) + s := pid.String() require.Equal(t, expected, s) } @@ -47,7 +47,10 @@ func TestInfoHash_String(t *testing.T) { } func TestPeer_String(t *testing.T) { + pid, err := NewPeerID(b) + require.Nil(t, err) for _, c := range peerStringTestCases { + c.input.ID = pid got := c.input.String() require.Equal(t, c.expected, got) } diff --git a/bittorrent/params.go b/bittorrent/params.go index 453fe00..0cc7496 100644 --- a/bittorrent/params.go +++ b/bittorrent/params.go @@ -168,7 +168,7 @@ func parseQuery(query string) (q *QueryParams, err error) { } if key == "info_hash" { - if ih, err := InfoHashFromString(value); err == nil{ + if ih, err := NewInfoHash([]byte(value)); err == nil { q.infoHashes = append(q.infoHashes, ih) } else { return nil, err diff --git a/cmd/chihaya/config.go b/cmd/chihaya/config.go index ea9b65f..983ecba 100644 --- a/cmd/chihaya/config.go +++ b/cmd/chihaya/config.go @@ -29,9 +29,9 @@ type storageConfig struct { // Config represents the configuration used for executing Chihaya. type Config struct { middleware.ResponseConfig `yaml:",inline"` - MetricsAddr string `yaml:"metrics_addr"` - HTTPConfig http.Config `yaml:"http"` - UDPConfig udp.Config `yaml:"udp"` + MetricsAddr string `yaml:"metrics_addr"` + HTTPConfig http.Config `yaml:"http"` + UDPConfig udp.Config `yaml:"udp"` Storage storageConfig `yaml:"storage"` PreHooks []middleware.Config `yaml:"prehooks"` PostHooks []middleware.Config `yaml:"posthooks"` diff --git a/cmd/chihaya/e2e.go b/cmd/chihaya/e2e.go index 17ae662..29bcedd 100644 --- a/cmd/chihaya/e2e.go +++ b/cmd/chihaya/e2e.go @@ -1,3 +1,6 @@ +//go:build e2e +// +build e2e + package main import ( @@ -13,6 +16,19 @@ import ( "github.com/chihaya/chihaya/pkg/log" ) +func init() { + e2eCmd = &cobra.Command{ + Use: "e2e", + Short: "exec e2e tests", + Long: "Execute the Chihaya end-to-end test suite", + RunE: EndToEndRunCmdFunc, + } + + e2eCmd.Flags().String("httpaddr", "http://127.0.0.1:6969/announce", "address of the HTTP tracker") + e2eCmd.Flags().String("udpaddr", "udp://127.0.0.1:6969", "address of the UDP tracker") + e2eCmd.Flags().Duration("delay", time.Second, "delay between announces") +} + // EndToEndRunCmdFunc implements a Cobra command that runs the end-to-end test // suite for a Chihaya build. func EndToEndRunCmdFunc(cmd *cobra.Command, args []string) error { @@ -55,7 +71,7 @@ func EndToEndRunCmdFunc(cmd *cobra.Command, args []string) error { } func generateInfohash() bittorrent.InfoHash { - b := make([]byte, 20) + b := make([]byte, bittorrent.InfoHashV1Len) rand.Read(b) ih, _ := bittorrent.NewInfoHash(b) return ih @@ -70,7 +86,7 @@ func testWithInfohash(infoHash bittorrent.InfoHash, url string, delay time.Durat var ih [bittorrent.InfoHashV1Len]byte req := tracker.AnnounceRequest{ InfoHash: ih, - PeerId: [20]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20}, + PeerId: [bittorrent.PeerIDLen]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20}, Downloaded: 50, Left: 100, Uploaded: 50, @@ -98,7 +114,7 @@ func testWithInfohash(infoHash bittorrent.InfoHash, url string, delay time.Durat copy(ih[:], infoHash) req = tracker.AnnounceRequest{ InfoHash: ih, - PeerId: [20]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 21}, + PeerId: [bittorrent.PeerIDLen]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 21}, Downloaded: 50, Left: 100, Uploaded: 50, diff --git a/cmd/chihaya/main.go b/cmd/chihaya/main.go index 1d8c30d..3c546af 100644 --- a/cmd/chihaya/main.go +++ b/cmd/chihaya/main.go @@ -2,15 +2,13 @@ package main import ( "errors" + "github.com/sirupsen/logrus" + "github.com/spf13/cobra" "os" "os/signal" "runtime" "strings" "syscall" - "time" - - "github.com/sirupsen/logrus" - "github.com/spf13/cobra" "github.com/chihaya/chihaya/frontend/http" "github.com/chihaya/chihaya/frontend/udp" @@ -21,6 +19,8 @@ import ( "github.com/chihaya/chihaya/storage" ) +var e2eCmd *cobra.Command + // Run represents the state of a running instance of Chihaya. type Run struct { configFilePath string @@ -137,7 +137,7 @@ func (r *Run) Stop(keepPeerStore bool) (storage.Storage, error) { // RootRunCmdFunc implements a Cobra command that runs an instance of Chihaya // and handles reloading and shutdown via process signals. -func RootRunCmdFunc(cmd *cobra.Command, args []string) error { +func RootRunCmdFunc(cmd *cobra.Command, _ []string) error { configFilePath, err := cmd.Flags().GetString("config") if err != nil { return err @@ -177,7 +177,7 @@ func RootRunCmdFunc(cmd *cobra.Command, args []string) error { } // RootPreRunCmdFunc handles command line flags for the Run command. -func RootPreRunCmdFunc(cmd *cobra.Command, args []string) error { +func RootPreRunCmdFunc(cmd *cobra.Command, _ []string) error { noColors, err := cmd.Flags().GetBool("nocolors") if err != nil { return err @@ -233,19 +233,10 @@ func main() { rootCmd.Flags().String("config", "/etc/chihaya.yaml", "location of configuration file") - var e2eCmd = &cobra.Command{ - Use: "e2e", - Short: "exec e2e tests", - Long: "Execute the Chihaya end-to-end test suite", - RunE: EndToEndRunCmdFunc, + if e2eCmd != nil { + rootCmd.AddCommand(e2eCmd) } - e2eCmd.Flags().String("httpaddr", "http://127.0.0.1:6969/announce", "address of the HTTP tracker") - e2eCmd.Flags().String("udpaddr", "udp://127.0.0.1:6969", "address of the UDP tracker") - e2eCmd.Flags().Duration("delay", time.Second, "delay between announces") - - rootCmd.AddCommand(e2eCmd) - if err := rootCmd.Execute(); err != nil { log.Fatal("failed when executing root cobra command: " + err.Error()) } diff --git a/cmd/chihaya/signal_unix.go b/cmd/chihaya/signal_unix.go index e2c6987..5b61d80 100644 --- a/cmd/chihaya/signal_unix.go +++ b/cmd/chihaya/signal_unix.go @@ -1,3 +1,4 @@ +//go:build darwin || freebsd || linux || netbsd || openbsd || dragonfly || solaris // +build darwin freebsd linux netbsd openbsd dragonfly solaris package main diff --git a/cmd/chihaya/signal_windows.go b/cmd/chihaya/signal_windows.go index ebd0d43..8110409 100644 --- a/cmd/chihaya/signal_windows.go +++ b/cmd/chihaya/signal_windows.go @@ -1,3 +1,4 @@ +//go:build windows // +build windows package main diff --git a/frontend/http/frontend.go b/frontend/http/frontend.go index 8cc45d7..e9537f5 100644 --- a/frontend/http/frontend.go +++ b/frontend/http/frontend.go @@ -186,7 +186,7 @@ func NewFrontend(logic frontend.TrackerLogic, provided Config) (*Frontend, error if cfg.Addr != "" { go func() { - if err := f.serveHTTP(router,false); err != nil { + if err := f.serveHTTP(router, false); err != nil { log.Fatal("failed while serving http", log.Err(err)) } }() @@ -194,7 +194,7 @@ func NewFrontend(logic frontend.TrackerLogic, provided Config) (*Frontend, error if cfg.HTTPSAddr != "" { go func() { - if err := f.serveHTTP(router,true); err != nil { + if err := f.serveHTTP(router, true); err != nil { log.Fatal("failed while serving https", log.Err(err)) } }() @@ -245,7 +245,7 @@ func (f *Frontend) serveHTTP(handler http.Handler, tls bool) error { srv.TLSConfig = f.tlsCfg f.tlsSrv = srv err = srv.ListenAndServe() - } else{ + } else { srv.Addr = f.Addr f.srv = srv err = f.tlsSrv.ListenAndServeTLS("", "") diff --git a/frontend/http/parser.go b/frontend/http/parser.go index f795114..33226b4 100644 --- a/frontend/http/parser.go +++ b/frontend/http/parser.go @@ -67,11 +67,10 @@ func ParseAnnounce(r *http.Request, opts ParseOptions) (*bittorrent.AnnounceRequ if !ok { return nil, bittorrent.ClientError("failed to parse parameter: peer_id") } - if len(peerID) != bittorrent.PeerIDLen { - return nil, bittorrent.ClientError("failed to provide valid peer_id") + request.Peer.ID, err = bittorrent.NewPeerID([]byte(peerID)) + if err != nil { + return nil, err } - request.Peer.ID = bittorrent.NewPeerID([]byte(peerID)) - // Determine the number of remaining bytes for the client. request.Left, err = qp.Uint64("left") if err != nil { diff --git a/frontend/http/writer.go b/frontend/http/writer.go index deb1042..bb22683 100644 --- a/frontend/http/writer.go +++ b/frontend/http/writer.go @@ -23,7 +23,7 @@ func WriteError(w http.ResponseWriter, err error) { w.WriteHeader(http.StatusOK) if err = bencode.NewEncoder(w).Encode(map[string]interface{}{ "failure reason": message, - }); err != nil{ + }); err != nil { log.Error("unable to encode string", log.Err(err)) } } diff --git a/frontend/udp/connection_id.go b/frontend/udp/connection_id.go index 3dd6978..991fea1 100644 --- a/frontend/udp/connection_id.go +++ b/frontend/udp/connection_id.go @@ -7,7 +7,7 @@ import ( "net" "time" - sha256 "github.com/minio/sha256-simd" + "github.com/minio/sha256-simd" "github.com/chihaya/chihaya/pkg/log" ) diff --git a/frontend/udp/connection_id_test.go b/frontend/udp/connection_id_test.go index 36cbe9c..15f68eb 100644 --- a/frontend/udp/connection_id_test.go +++ b/frontend/udp/connection_id_test.go @@ -9,7 +9,7 @@ import ( "testing" "time" - sha256 "github.com/minio/sha256-simd" + "github.com/minio/sha256-simd" "github.com/stretchr/testify/require" "github.com/chihaya/chihaya/pkg/log" diff --git a/frontend/udp/frontend.go b/frontend/udp/frontend.go index 98f8b75..fe0f857 100644 --- a/frontend/udp/frontend.go +++ b/frontend/udp/frontend.go @@ -148,7 +148,7 @@ func (t *Frontend) Stop() stop.Result { c := make(stop.Channel) go func() { close(t.closing) - t.socket.SetReadDeadline(time.Now()) + _ = t.socket.SetReadDeadline(time.Now()) t.wg.Wait() c.Done(t.socket.Close()) }() diff --git a/frontend/udp/parser.go b/frontend/udp/parser.go index 1fdbbdd..59151fc 100644 --- a/frontend/udp/parser.go +++ b/frontend/udp/parser.go @@ -7,7 +7,7 @@ import ( "net" "sync" - "github.com/chihaya/chihaya/bittorrent" + bittorrent "github.com/chihaya/chihaya/bittorrent" ) const ( @@ -113,7 +113,12 @@ func ParseAnnounce(r Request, v6Action bool, opts ParseOptions) (*bittorrent.Ann } ih, err := bittorrent.NewInfoHash(infohash) - if err != nil{ + if err != nil { + return nil, err + } + + peerId, err := bittorrent.NewPeerID(peerID) + if err != nil { return nil, err } @@ -128,7 +133,7 @@ func ParseAnnounce(r Request, v6Action bool, opts ParseOptions) (*bittorrent.Ann NumWantProvided: true, EventProvided: true, Peer: bittorrent.Peer{ - ID: bittorrent.NewPeerID(peerID), + ID: peerId, IP: bittorrent.IP{IP: ip}, Port: port, }, @@ -227,7 +232,7 @@ func ParseScrape(r Request, opts ParseOptions) (*bittorrent.ScrapeRequest, error pageSize = bittorrent.InfoHashV2Len } for len(r.Packet) >= pageSize { - if ih, err := bittorrent.NewInfoHash(r.Packet[:pageSize]); err != nil{ + if ih, err := bittorrent.NewInfoHash(r.Packet[:pageSize]); err != nil { return nil, err } else { infohashes = append(infohashes, ih) diff --git a/frontend/udp/writer.go b/frontend/udp/writer.go index c23619f..d86bd2d 100644 --- a/frontend/udp/writer.go +++ b/frontend/udp/writer.go @@ -18,9 +18,9 @@ func WriteError(w io.Writer, txID []byte, err error) { buf := newBuffer() writeHeader(buf, txID, errorActionID) - buf.WriteString(err.Error()) - buf.WriteRune('\000') - w.Write(buf.Bytes()) + _, _ = buf.WriteString(err.Error()) + _, _ = buf.WriteRune('\000') + _, _ = w.Write(buf.Bytes()) buf.free() } @@ -37,9 +37,9 @@ func WriteAnnounce(w io.Writer, txID []byte, resp *bittorrent.AnnounceResponse, } else { writeHeader(buf, txID, announceActionID) } - binary.Write(buf, binary.BigEndian, uint32(resp.Interval/time.Second)) - binary.Write(buf, binary.BigEndian, resp.Incomplete) - binary.Write(buf, binary.BigEndian, resp.Complete) + _ = binary.Write(buf, binary.BigEndian, uint32(resp.Interval/time.Second)) + _ = binary.Write(buf, binary.BigEndian, resp.Incomplete) + _ = binary.Write(buf, binary.BigEndian, resp.Complete) peers := resp.IPv4Peers if v6Peers { @@ -47,11 +47,11 @@ func WriteAnnounce(w io.Writer, txID []byte, resp *bittorrent.AnnounceResponse, } for _, peer := range peers { - buf.Write(peer.IP.IP) - binary.Write(buf, binary.BigEndian, peer.Port) + _, _ = buf.Write(peer.IP.IP) + _ = binary.Write(buf, binary.BigEndian, peer.Port) } - w.Write(buf.Bytes()) + _, _ = w.Write(buf.Bytes()) buf.free() } @@ -62,12 +62,12 @@ func WriteScrape(w io.Writer, txID []byte, resp *bittorrent.ScrapeResponse) { writeHeader(buf, txID, scrapeActionID) for _, scrape := range resp.Files { - binary.Write(buf, binary.BigEndian, scrape.Complete) - binary.Write(buf, binary.BigEndian, scrape.Snatches) - binary.Write(buf, binary.BigEndian, scrape.Incomplete) + _ = binary.Write(buf, binary.BigEndian, scrape.Complete) + _ = binary.Write(buf, binary.BigEndian, scrape.Snatches) + _ = binary.Write(buf, binary.BigEndian, scrape.Incomplete) } - w.Write(buf.Bytes()) + _, _ = w.Write(buf.Bytes()) buf.free() } @@ -76,15 +76,15 @@ func WriteConnectionID(w io.Writer, txID, connID []byte) { buf := newBuffer() writeHeader(buf, txID, connectActionID) - buf.Write(connID) + _, _ = buf.Write(connID) - w.Write(buf.Bytes()) + _, _ = w.Write(buf.Bytes()) buf.free() } // writeHeader writes the action and transaction ID to the provided response // buffer. func writeHeader(w io.Writer, txID []byte, action uint32) { - binary.Write(w, binary.BigEndian, action) - w.Write(txID) + _ = binary.Write(w, binary.BigEndian, action) + _, _ = w.Write(txID) } diff --git a/middleware/clientapproval/client_id_test.go b/middleware/clientapproval/client_id_test.go index 486c351..c82c710 100644 --- a/middleware/clientapproval/client_id_test.go +++ b/middleware/clientapproval/client_id_test.go @@ -47,7 +47,11 @@ func TestClientID(t *testing.T) { t.Run(tt.peerID, func(t *testing.T) { var clientID ClientID copy(clientID[:], tt.clientID) - parsedID := NewClientID(bittorrent.NewPeerID([]byte(tt.peerID))) + peerId, err := bittorrent.NewPeerID([]byte(tt.peerID)) + if err != nil { + t.Error(err) + } + parsedID := NewClientID(peerId) if parsedID != clientID { t.Error("Incorrectly parsed peer ID", tt.peerID, "as", parsedID) } diff --git a/middleware/clientapproval/clientapproval_test.go b/middleware/clientapproval/clientapproval_test.go index f46ae77..100dbff 100644 --- a/middleware/clientapproval/clientapproval_test.go +++ b/middleware/clientapproval/clientapproval_test.go @@ -59,8 +59,8 @@ func TestHandleAnnounce(t *testing.T) { req := &bittorrent.AnnounceRequest{} resp := &bittorrent.AnnounceResponse{} - peerid := bittorrent.NewPeerID([]byte(tt.peerID)) - + peerid, err := bittorrent.NewPeerID([]byte(tt.peerID)) + require.Nil(t, err) req.Peer.ID = peerid nctx, err := h.HandleAnnounce(ctx, req, resp) diff --git a/middleware/hooks.go b/middleware/hooks.go index b508f16..7c56193 100644 --- a/middleware/hooks.go +++ b/middleware/hooks.go @@ -28,7 +28,7 @@ type swarmInteractionHook struct { store storage.Storage } -func (h *swarmInteractionHook) HandleAnnounce(ctx context.Context, req *bittorrent.AnnounceRequest, resp *bittorrent.AnnounceResponse) (_ context.Context, err error) { +func (h *swarmInteractionHook) HandleAnnounce(ctx context.Context, req *bittorrent.AnnounceRequest, _ *bittorrent.AnnounceResponse) (_ context.Context, err error) { if ctx.Value(SkipSwarmInteractionKey) != nil { return ctx, nil } @@ -75,14 +75,14 @@ type skipResponseHook struct{} // skip. var SkipResponseHookKey = skipResponseHook{} -type scrapeAddressType struct{} +//type scrapeAddressType struct{} // ScrapeIsIPv6Key is the key under which to store whether or not the // address used to request a scrape was an IPv6 address. // The value is expected to be of type bool. // A missing value or a value that is not a bool for this key is equivalent to // it being set to false. -var ScrapeIsIPv6Key = scrapeAddressType{} +//var ScrapeIsIPv6Key = scrapeAddressType{} type responseHook struct { store storage.Storage diff --git a/middleware/jwt/jwt.go b/middleware/jwt/jwt.go index 2cbccd3..db81119 100644 --- a/middleware/jwt/jwt.go +++ b/middleware/jwt/jwt.go @@ -22,7 +22,7 @@ import ( "github.com/SermoDigital/jose/jws" "github.com/SermoDigital/jose/jwt" "github.com/mendsley/gojwk" - yaml "gopkg.in/yaml.v2" + "gopkg.in/yaml.v2" "github.com/chihaya/chihaya/bittorrent" "github.com/chihaya/chihaya/middleware" diff --git a/middleware/logic_test.go b/middleware/logic_test.go index 9976815..caf777c 100644 --- a/middleware/logic_test.go +++ b/middleware/logic_test.go @@ -15,11 +15,11 @@ import ( // benchmarks. type nopHook struct{} -func (h *nopHook) HandleAnnounce(ctx context.Context, req *bittorrent.AnnounceRequest, resp *bittorrent.AnnounceResponse) (context.Context, error) { +func (h *nopHook) HandleAnnounce(ctx context.Context, _ *bittorrent.AnnounceRequest, _ *bittorrent.AnnounceResponse) (context.Context, error) { return ctx, nil } -func (h *nopHook) HandleScrape(ctx context.Context, req *bittorrent.ScrapeRequest, resp *bittorrent.ScrapeResponse) (context.Context, error) { +func (h *nopHook) HandleScrape(ctx context.Context, _ *bittorrent.ScrapeRequest, _ *bittorrent.ScrapeResponse) (context.Context, error) { return ctx, nil } diff --git a/middleware/middleware.go b/middleware/middleware.go index ff918ee..57d123e 100644 --- a/middleware/middleware.go +++ b/middleware/middleware.go @@ -7,7 +7,7 @@ import ( "github.com/chihaya/chihaya/storage" "sync" - yaml "gopkg.in/yaml.v2" + "gopkg.in/yaml.v2" ) var ( diff --git a/middleware/torrentapproval/container/container.go b/middleware/torrentapproval/container/container.go index cb515fb..5e10045 100644 --- a/middleware/torrentapproval/container/container.go +++ b/middleware/torrentapproval/container/container.go @@ -7,7 +7,9 @@ import ( "sync" ) -type Builder func ([]byte, storage.Storage) (Container, error) +const DefaultStorageCtxName = "MW_APPROVAL" + +type Builder func([]byte, storage.Storage) (Container, error) var ( buildersMU sync.Mutex @@ -30,7 +32,7 @@ func Register(n string, c Builder) { } type Container interface { - Contains(bittorrent.InfoHash) bool + Approved(bittorrent.InfoHash) bool } func GetContainer(name string, confBytes []byte, storage storage.Storage) (Container, error) { diff --git a/middleware/torrentapproval/container/directory/directory.go b/middleware/torrentapproval/container/directory/directory.go index 6856a74..4eb8dd0 100644 --- a/middleware/torrentapproval/container/directory/directory.go +++ b/middleware/torrentapproval/container/directory/directory.go @@ -4,63 +4,88 @@ package directory import ( + "crypto/sha256" "fmt" + "github.com/anacrolix/torrent/metainfo" "github.com/anacrolix/torrent/util/dirwatch" + "github.com/chihaya/chihaya/bittorrent" "github.com/chihaya/chihaya/middleware/torrentapproval/container" "github.com/chihaya/chihaya/middleware/torrentapproval/container/list" + "github.com/chihaya/chihaya/pkg/log" "github.com/chihaya/chihaya/pkg/stop" "github.com/chihaya/chihaya/storage" "gopkg.in/yaml.v2" - "sync" ) +const Name = "directory" + func init() { - container.Register("directory", build) + container.Register(Name, build) } type Config struct { - WhitelistPath string `yaml:"whitelist_path"` - BlacklistPath string `yaml:"blacklist_path"` + list.Config + Path string `yaml:"path"` } -// TODO: change sync map to provided storage -func build(confBytes []byte, storage storage.Storage) (container.Container, error) { +func build(confBytes []byte, st storage.Storage) (container.Container, error) { c := new(Config) if err := yaml.Unmarshal(confBytes, c); err != nil { return nil, fmt.Errorf("unable to deserialise configuration: %v", err) } - if len(c.WhitelistPath) > 0 && len(c.BlacklistPath) > 0 { - return nil, fmt.Errorf("using both whitelist and blacklist is invalid") - } var err error - lst := &directory{ + d := &directory{ List: list.List{ - Hashes: sync.Map{}, - Invert: len(c.WhitelistPath) == 0, + Invert: c.Invert, + Storage: st, + StorageCtx: c.StorageCtx, }, watcher: nil, } - dir := c.WhitelistPath - if lst.Invert { - dir = c.BlacklistPath - } - //FIXME: implement V2 torrent add/delete var w *dirwatch.Instance - if w, err = dirwatch.New(dir); err != nil { + if w, err = dirwatch.New(c.Path); err != nil { return nil, fmt.Errorf("unable to initialize directory watch: %v", err) } - lst.watcher = w + d.watcher = w + if len(d.StorageCtx) == 0 { + log.Info("Storage context not set, using default value: " + container.DefaultStorageCtxName) + d.StorageCtx = container.DefaultStorageCtxName + } go func() { - for event := range lst.watcher.Events { + for event := range d.watcher.Events { switch event.Change { case dirwatch.Added: - lst.Hashes.Store(event.InfoHash, list.DUMMY) + data := make([]storage.Pair, 1, 2) + data[0] = storage.Pair{Left: event.InfoHash[:], Right: list.DUMMY} + if v2ih, err := v2InfoHash(event.TorrentFilePath); err == nil { + data = append(data, storage.Pair{Left: v2ih, Right: list.DUMMY}) + } else { + log.Err(err) + } + d.Storage.BulkPut(c.StorageCtx, data...) case dirwatch.Removed: - lst.Hashes.Delete(event.InfoHash) + data := make([]interface{}, 1, 2) + data[0] = event.InfoHash[:] + if v2ih, err := v2InfoHash(event.TorrentFilePath); err == nil { + data = append(data, v2ih) + } else { + log.Err(err) + } + d.Storage.Delete(c.StorageCtx, data...) } } }() - return lst, err + return d, err +} + +func v2InfoHash(path string) (ih bittorrent.InfoHash, err error) { + var mi *metainfo.MetaInfo + if mi, err = metainfo.LoadFromFile(path); err == nil { + hash := sha256.New() + hash.Write(mi.InfoBytes) + ih, err = bittorrent.NewInfoHash(hash.Sum(nil)) + } + return } type directory struct { diff --git a/middleware/torrentapproval/container/list/list.go b/middleware/torrentapproval/container/list/list.go index e6704c8..3adb372 100644 --- a/middleware/torrentapproval/container/list/list.go +++ b/middleware/torrentapproval/container/list/list.go @@ -5,63 +5,68 @@ package list import ( "encoding/hex" "fmt" - bittorrent "github.com/chihaya/chihaya/bittorrent" + "github.com/chihaya/chihaya/bittorrent" "github.com/chihaya/chihaya/middleware/torrentapproval/container" + "github.com/chihaya/chihaya/pkg/log" "github.com/chihaya/chihaya/storage" "gopkg.in/yaml.v2" - "sync" ) +const Name = "list" + func init() { - container.Register("list", build) + container.Register(Name, build) } type Config struct { - Whitelist []string `yaml:"whitelist"` - Blacklist []string `yaml:"blacklist"` + HashList []string `yaml:"hash_list"` + Invert bool `yaml:"invert"` + StorageCtx string `yaml:"storage_ctx"` } -var DUMMY struct{} +const DUMMY = true -// TODO: change sync map to provided storage -func build(confBytes []byte, storage storage.Storage) (container.Container, error) { +func build(confBytes []byte, st storage.Storage) (container.Container, error) { c := new(Config) if err := yaml.Unmarshal(confBytes, c); err != nil { return nil, fmt.Errorf("unable to deserialise configuration: %v", err) } - if len(c.Whitelist) > 0 && len(c.Blacklist) > 0 { - return nil, fmt.Errorf("using both whitelist and blacklist is invalid") - } l := &List{ - Hashes: sync.Map{}, - Invert: len(c.Whitelist) == 0, + Invert: c.Invert, + Storage: st, + StorageCtx: c.StorageCtx, } - hashList := c.Whitelist - if l.Invert { - hashList = c.Blacklist + if len(l.StorageCtx) == 0 { + log.Info("Storage context not set, using default value: " + container.DefaultStorageCtxName) + l.StorageCtx = container.DefaultStorageCtxName } - for _, hashString := range hashList { - hashBytes, err := hex.DecodeString(hashString) - if err != nil { - return nil, fmt.Errorf("whitelist : invalid hash %s, %v", hashString, err) + if len(c.HashList) > 0 { + init := make([]storage.Pair, 0, len(c.HashList)) + for _, hashString := range c.HashList { + hashBytes, err := hex.DecodeString(hashString) + if err != nil { + return nil, fmt.Errorf("whitelist : invalid hash %s, %v", hashString, err) + } + ih, err := bittorrent.NewInfoHash(hashBytes) + if err != nil { + return nil, fmt.Errorf("whitelist : %s : %v", hashString, err) + } + init = append(init, storage.Pair{Left: ih, Right: DUMMY}) } - ih, err := bittorrent.NewInfoHash(hashBytes) - if err != nil { - return nil, fmt.Errorf("whitelist : %s : %v", hashString, err) - } - l.Hashes.Store(ih, DUMMY) + l.Storage.BulkPut(l.StorageCtx, init...) } return l, nil } type List struct { - Invert bool - Hashes sync.Map + Invert bool + Storage storage.Storage + StorageCtx string } -func (l *List) Contains(hash bittorrent.InfoHash) bool { - _, result := l.Hashes.Load(hash) - return result != l.Invert +func (l *List) Approved(hash bittorrent.InfoHash) bool { + b := l.Storage.Contains(l.StorageCtx, hash) + return b != l.Invert } diff --git a/middleware/torrentapproval/torrentapproval.go b/middleware/torrentapproval/torrentapproval.go index f903f64..37ce981 100644 --- a/middleware/torrentapproval/torrentapproval.go +++ b/middleware/torrentapproval/torrentapproval.go @@ -62,7 +62,7 @@ type hook struct { func (h *hook) HandleAnnounce(ctx context.Context, req *bittorrent.AnnounceRequest, _ *bittorrent.AnnounceResponse) (context.Context, error) { var err error - if !h.hashContainer.Contains(req.InfoHash) { + if !h.hashContainer.Approved(req.InfoHash) { err = ErrTorrentUnapproved } diff --git a/middleware/torrentapproval/torrentapproval_test.go b/middleware/torrentapproval/torrentapproval_test.go index bdf0ea0..4395e21 100644 --- a/middleware/torrentapproval/torrentapproval_test.go +++ b/middleware/torrentapproval/torrentapproval_test.go @@ -5,6 +5,7 @@ import ( "encoding/hex" "fmt" "github.com/chihaya/chihaya/middleware" + "github.com/chihaya/chihaya/storage/memory" "gopkg.in/yaml.v2" "testing" @@ -23,7 +24,7 @@ var cases = []struct { middleware.Config{ Name: "list", Options: map[string]interface{}{ - "whitelist": []string{"3532cf2d327fad8448c075b4cb42c8136964a435"}, + "hash_list": []string{"3532cf2d327fad8448c075b4cb42c8136964a435"}, }, }, "3532cf2d327fad8448c075b4cb42c8136964a435", @@ -34,7 +35,7 @@ var cases = []struct { middleware.Config{ Name: "list", Options: map[string]interface{}{ - "whitelist": []string{"3532cf2d327fad8448c075b4cb42c8136964a435"}, + "hash_list": []string{"3532cf2d327fad8448c075b4cb42c8136964a435"}, }, }, "4532cf2d327fad8448c075b4cb42c8136964a435", @@ -45,7 +46,8 @@ var cases = []struct { middleware.Config{ Name: "list", Options: map[string]interface{}{ - "blacklist": []string{"3532cf2d327fad8448c075b4cb42c8136964a435"}, + "hash_list": []string{"3532cf2d327fad8448c075b4cb42c8136964a435"}, + "invert": true, }, }, "4532cf2d327fad8448c075b4cb42c8136964a435", @@ -56,7 +58,8 @@ var cases = []struct { middleware.Config{ Name: "list", Options: map[string]interface{}{ - "blacklist": []string{"3532cf2d327fad8448c075b4cb42c8136964a435"}, + "hash_list": []string{"3532cf2d327fad8448c075b4cb42c8136964a435"}, + "invert": true, }, }, "3532cf2d327fad8448c075b4cb42c8136964a435", @@ -65,12 +68,15 @@ var cases = []struct { } func TestHandleAnnounce(t *testing.T) { + config := memory.Config{}.Validate() + storage, err := memory.New(config) + require.Nil(t, err) for _, tt := range cases { t.Run(fmt.Sprintf("testing hash %s", tt.ih), func(t *testing.T) { d := driver{} cfg, err := yaml.Marshal(tt.cfg) require.Nil(t, err) - h, err := d.NewHook(cfg) + h, err := d.NewHook(cfg, storage) require.Nil(t, err) ctx := context.Background() @@ -80,7 +86,8 @@ func TestHandleAnnounce(t *testing.T) { hashbytes, err := hex.DecodeString(tt.ih) require.Nil(t, err) - hashinfo := bittorrent.NewInfoHash(hashbytes) + hashinfo, err := bittorrent.NewInfoHash(hashbytes) + require.Nil(t, err) req.InfoHash = hashinfo diff --git a/middleware/varinterval/varinterval.go b/middleware/varinterval/varinterval.go index 0e7af22..5dee361 100644 --- a/middleware/varinterval/varinterval.go +++ b/middleware/varinterval/varinterval.go @@ -8,7 +8,7 @@ import ( "sync" "time" - yaml "gopkg.in/yaml.v2" + "gopkg.in/yaml.v2" "github.com/chihaya/chihaya/bittorrent" "github.com/chihaya/chihaya/middleware" diff --git a/storage/memory/peer_store.go b/storage/memory/storage.go similarity index 78% rename from storage/memory/peer_store.go rename to storage/memory/storage.go index e82f126..7cd8e35 100644 --- a/storage/memory/peer_store.go +++ b/storage/memory/storage.go @@ -4,14 +4,15 @@ package memory import ( "encoding/binary" - "net" + "fmt" + "reflect" "runtime" "sync" "time" - yaml "gopkg.in/yaml.v2" + "gopkg.in/yaml.v2" - bittorrent "github.com/chihaya/chihaya/bittorrent" + "github.com/chihaya/chihaya/bittorrent" "github.com/chihaya/chihaya/pkg/log" "github.com/chihaya/chihaya/pkg/stop" "github.com/chihaya/chihaya/pkg/timecache" @@ -36,7 +37,7 @@ func init() { type driver struct{} -func (d driver) NewPeerStore(icfg interface{}) (storage.Storage, error) { +func (d driver) NewStorage(icfg interface{}) (storage.Storage, error) { // Marshal the config back into bytes. bytes, err := yaml.Marshal(icfg) if err != nil { @@ -121,10 +122,11 @@ func (cfg Config) Validate() Config { // New creates a new Storage backed by memory. func New(provided Config) (storage.Storage, error) { cfg := provided.Validate() - ps := &peerStore{ - cfg: cfg, - shards: make([]*peerShard, cfg.ShardCount*2), - closed: make(chan struct{}), + ps := &store{ + cfg: cfg, + shards: make([]*peerShard, cfg.ShardCount*2), + contexts: sync.Map{}, + closed: make(chan struct{}), } for i := 0; i < cfg.ShardCount*2; i++ { @@ -174,40 +176,6 @@ func New(provided Config) (storage.Storage, error) { return ps, nil } -type serializedPeer string - -func newPeerKey(p bittorrent.Peer) serializedPeer { - b := make([]byte, 20+2+len(p.IP.IP)) - copy(b[:20], p.ID[:]) - binary.BigEndian.PutUint16(b[bittorrent.PeerIDLen:bittorrent.PeerIDLen+2], p.Port) - copy(b[bittorrent.PeerIDLen+2:], p.IP.IP) - - return serializedPeer(b) -} - -// TODO: move duplicated code into one place -func decodePeerKey(pk serializedPeer) bittorrent.Peer { - peerId, err := bittorrent.NewPeerID([]byte(pk[:bittorrent.PeerIDLen])) - if err != nil { - panic(err) - } - peer := bittorrent.Peer{ - ID: peerId, - Port: binary.BigEndian.Uint16([]byte(pk[bittorrent.PeerIDLen:bittorrent.PeerIDLen+2])), - IP: bittorrent.IP{IP: net.IP(pk[bittorrent.PeerIDLen+2:])}} - - if ip := peer.IP.To4(); ip != nil { - peer.IP.IP = ip - peer.IP.AddressFamily = bittorrent.IPv4 - } else if len(peer.IP.IP) == net.IPv6len { // implies toReturn.IP.To4() == nil - peer.IP.AddressFamily = bittorrent.IPv6 - } else { - panic("IP is neither v4 nor v6") - } - - return peer -} - type peerShard struct { swarms map[bittorrent.InfoHash]swarm numSeeders uint64 @@ -217,23 +185,24 @@ type peerShard struct { type swarm struct { // map serialized peer to mtime - seeders map[serializedPeer]int64 - leechers map[serializedPeer]int64 + seeders map[storage.SerializedPeer]int64 + leechers map[storage.SerializedPeer]int64 } -type peerStore struct { - cfg Config - shards []*peerShard +type store struct { + cfg Config + shards []*peerShard + contexts sync.Map closed chan struct{} wg sync.WaitGroup } -var _ storage.Storage = &peerStore{} +var _ storage.Storage = &store{} // populateProm aggregates metrics over all shards and then posts them to // prometheus. -func (ps *peerStore) populateProm() { +func (ps *store) populateProm() { var numInfohashes, numSeeders, numLeechers uint64 for _, s := range ps.shards { @@ -254,11 +223,11 @@ func recordGCDuration(duration time.Duration) { storage.PromGCDurationMilliseconds.Observe(float64(duration.Nanoseconds()) / float64(time.Millisecond)) } -func (ps *peerStore) getClock() int64 { +func (ps *store) getClock() int64 { return timecache.NowUnixNano() } -func (ps *peerStore) shardIndex(infoHash bittorrent.InfoHash, af bittorrent.AddressFamily) uint32 { +func (ps *store) shardIndex(infoHash bittorrent.InfoHash, af bittorrent.AddressFamily) uint32 { // There are twice the amount of shards specified by the user, the first // half is dedicated to IPv4 swarms and the second half is dedicated to // IPv6 swarms. @@ -269,22 +238,22 @@ func (ps *peerStore) shardIndex(infoHash bittorrent.InfoHash, af bittorrent.Addr return idx } -func (ps *peerStore) PutSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) error { +func (ps *store) PutSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) error { select { case <-ps.closed: panic("attempted to interact with stopped memory store") default: } - pk := newPeerKey(p) + pk := storage.NewSerializedPeer(p) shard := ps.shards[ps.shardIndex(ih, p.IP.AddressFamily)] shard.Lock() if _, ok := shard.swarms[ih]; !ok { shard.swarms[ih] = swarm{ - seeders: make(map[serializedPeer]int64), - leechers: make(map[serializedPeer]int64), + seeders: make(map[storage.SerializedPeer]int64), + leechers: make(map[storage.SerializedPeer]int64), } } @@ -300,14 +269,14 @@ func (ps *peerStore) PutSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) error return nil } -func (ps *peerStore) DeleteSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) error { +func (ps *store) DeleteSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) error { select { case <-ps.closed: panic("attempted to interact with stopped memory store") default: } - pk := newPeerKey(p) + pk := storage.NewSerializedPeer(p) shard := ps.shards[ps.shardIndex(ih, p.IP.AddressFamily)] shard.Lock() @@ -333,22 +302,22 @@ func (ps *peerStore) DeleteSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) err return nil } -func (ps *peerStore) PutLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error { +func (ps *store) PutLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error { select { case <-ps.closed: panic("attempted to interact with stopped memory store") default: } - pk := newPeerKey(p) + pk := storage.NewSerializedPeer(p) shard := ps.shards[ps.shardIndex(ih, p.IP.AddressFamily)] shard.Lock() if _, ok := shard.swarms[ih]; !ok { shard.swarms[ih] = swarm{ - seeders: make(map[serializedPeer]int64), - leechers: make(map[serializedPeer]int64), + seeders: make(map[storage.SerializedPeer]int64), + leechers: make(map[storage.SerializedPeer]int64), } } @@ -364,14 +333,14 @@ func (ps *peerStore) PutLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error return nil } -func (ps *peerStore) DeleteLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error { +func (ps *store) DeleteLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error { select { case <-ps.closed: panic("attempted to interact with stopped memory store") default: } - pk := newPeerKey(p) + pk := storage.NewSerializedPeer(p) shard := ps.shards[ps.shardIndex(ih, p.IP.AddressFamily)] shard.Lock() @@ -397,22 +366,22 @@ func (ps *peerStore) DeleteLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) er return nil } -func (ps *peerStore) GraduateLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error { +func (ps *store) GraduateLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error { select { case <-ps.closed: panic("attempted to interact with stopped memory store") default: } - pk := newPeerKey(p) + pk := storage.NewSerializedPeer(p) shard := ps.shards[ps.shardIndex(ih, p.IP.AddressFamily)] shard.Lock() if _, ok := shard.swarms[ih]; !ok { shard.swarms[ih] = swarm{ - seeders: make(map[serializedPeer]int64), - leechers: make(map[serializedPeer]int64), + seeders: make(map[storage.SerializedPeer]int64), + leechers: make(map[storage.SerializedPeer]int64), } } @@ -434,7 +403,7 @@ func (ps *peerStore) GraduateLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) return nil } -func (ps *peerStore) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant int, announcer bittorrent.Peer) (peers []bittorrent.Peer, err error) { +func (ps *store) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant int, announcer bittorrent.Peer) (peers []bittorrent.Peer, err error) { select { case <-ps.closed: panic("attempted to interact with stopped memory store") @@ -457,7 +426,7 @@ func (ps *peerStore) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant break } - peers = append(peers, decodePeerKey(pk)) + peers = append(peers, pk.ToPeer()) numWant-- } } else { @@ -468,14 +437,14 @@ func (ps *peerStore) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant break } - peers = append(peers, decodePeerKey(pk)) + peers = append(peers, pk.ToPeer()) numWant-- } // Append leechers until we reach numWant. if numWant > 0 { leechers := shard.swarms[ih].leechers - announcerPK := newPeerKey(announcer) + announcerPK := storage.NewSerializedPeer(announcer) for pk := range leechers { if pk == announcerPK { continue @@ -485,7 +454,7 @@ func (ps *peerStore) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant break } - peers = append(peers, decodePeerKey(pk)) + peers = append(peers, pk.ToPeer()) numWant-- } } @@ -495,7 +464,7 @@ func (ps *peerStore) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant return } -func (ps *peerStore) ScrapeSwarm(ih bittorrent.InfoHash, addressFamily bittorrent.AddressFamily) (resp bittorrent.Scrape) { +func (ps *store) ScrapeSwarm(ih bittorrent.InfoHash, addressFamily bittorrent.AddressFamily) (resp bittorrent.Scrape) { select { case <-ps.closed: panic("attempted to interact with stopped memory store") @@ -519,12 +488,65 @@ func (ps *peerStore) ScrapeSwarm(ih bittorrent.InfoHash, addressFamily bittorren return } +func asKey(in interface{}) interface{} { + if in == nil { + panic("unable to use nil map key") + } + if reflect.TypeOf(in).Comparable() { + return in + } + //FIXME: dirty hack + return fmt.Sprint(in) +} + +func (ps *store) Put(ctx string, key, value interface{}) { + m, _ := ps.contexts.LoadOrStore(ctx, new(sync.Map)) + m.(*sync.Map).Store(asKey(key), value) +} + +func (ps *store) Contains(ctx string, key interface{}) bool { + var exist bool + if m, found := ps.contexts.Load(ctx); found { + _, exist = m.(*sync.Map).Load(asKey(key)) + } + return exist +} + +func (ps *store) BulkPut(ctx string, pairs ...storage.Pair) { + if len(pairs) > 0 { + c, _ := ps.contexts.LoadOrStore(ctx, new(sync.Map)) + m := c.(*sync.Map) + for _, p := range pairs { + m.Store(asKey(p.Left), p.Right) + } + } +} + +func (ps *store) Load(ctx string, key interface{}) interface{} { + var v interface{} + if m, found := ps.contexts.Load(ctx); found { + v, _ = m.(*sync.Map).Load(asKey(key)) + } + return v +} + +func (ps *store) Delete(ctx string, keys ...interface{}) { + if len(keys) > 0 { + if m, found := ps.contexts.Load(ctx); found { + m := m.(*sync.Map) + for k := range keys { + m.Delete(asKey(k)) + } + } + } +} + // collectGarbage deletes all Peers from the Storage which are older than the // cutoff time. // // This function must be able to execute while other methods on this interface // are being executed in parallel. -func (ps *peerStore) collectGarbage(cutoff time.Time) error { +func (ps *store) collectGarbage(cutoff time.Time) error { select { case <-ps.closed: return nil @@ -582,7 +604,7 @@ func (ps *peerStore) collectGarbage(cutoff time.Time) error { return nil } -func (ps *peerStore) Stop() stop.Result { +func (ps *store) Stop() stop.Result { c := make(stop.Channel) go func() { close(ps.closed) @@ -601,6 +623,6 @@ func (ps *peerStore) Stop() stop.Result { return c.Result() } -func (ps *peerStore) LogFields() log.Fields { +func (ps *store) LogFields() log.Fields { return ps.cfg.LogFields() } diff --git a/storage/memory/peer_store_test.go b/storage/memory/storage_test.go similarity index 100% rename from storage/memory/peer_store_test.go rename to storage/memory/storage_test.go diff --git a/storage/misc.go b/storage/misc.go new file mode 100644 index 0000000..ca2ba5a --- /dev/null +++ b/storage/misc.go @@ -0,0 +1,44 @@ +package storage + +import ( + "encoding/binary" + "github.com/chihaya/chihaya/bittorrent" + "net" +) + +type Pair struct { + Left, Right interface{} +} + +type SerializedPeer string + +func NewSerializedPeer(p bittorrent.Peer) SerializedPeer { + b := make([]byte, bittorrent.PeerIDLen+2+len(p.IP.IP)) + copy(b[:bittorrent.PeerIDLen], p.ID[:]) + binary.BigEndian.PutUint16(b[bittorrent.PeerIDLen:bittorrent.PeerIDLen+2], p.Port) + copy(b[bittorrent.PeerIDLen+2:], p.IP.IP) + + return SerializedPeer(b) +} + +func (pk SerializedPeer) ToPeer() bittorrent.Peer { + peerId, err := bittorrent.NewPeerID([]byte(pk[:bittorrent.PeerIDLen])) + if err != nil { + panic(err) + } + peer := bittorrent.Peer{ + ID: peerId, + Port: binary.BigEndian.Uint16([]byte(pk[bittorrent.PeerIDLen : bittorrent.PeerIDLen+2])), + IP: bittorrent.IP{IP: net.IP(pk[bittorrent.PeerIDLen+2:])}} + + if ip := peer.IP.To4(); ip != nil { + peer.IP.IP = ip + peer.IP.AddressFamily = bittorrent.IPv4 + } else if len(peer.IP.IP) == net.IPv6len { // implies toReturn.IP.To4() == nil + peer.IP.AddressFamily = bittorrent.IPv6 + } else { + panic("IP is neither v4 nor v6") + } + + return peer +} diff --git a/storage/redis/redis.go b/storage/redis/redis.go index e1d7e4b..47975b6 100644 --- a/storage/redis/redis.go +++ b/storage/redis/redis.go @@ -27,10 +27,10 @@ func newRedisBackend(cfg *Config, u *redisURL, socketPath string) *redisBackend ConnectTimeout: cfg.RedisConnectTimeout, } pool := rc.NewPool() - redsync := redsync.New([]redsync.Pool{pool}) + rs := redsync.New([]redsync.Pool{pool}) return &redisBackend{ pool: pool, - redsync: redsync, + redsync: rs, } } diff --git a/storage/redis/peer_store.go b/storage/redis/storage.go similarity index 81% rename from storage/redis/peer_store.go rename to storage/redis/storage.go index 7d406d6..e8330b5 100644 --- a/storage/redis/peer_store.go +++ b/storage/redis/storage.go @@ -24,14 +24,13 @@ package redis import ( - "encoding/binary" - "net" + "fmt" "strconv" "sync" "time" "github.com/gomodule/redigo/redis" - yaml "gopkg.in/yaml.v2" + "gopkg.in/yaml.v2" "github.com/chihaya/chihaya/bittorrent" "github.com/chihaya/chihaya/pkg/log" @@ -61,7 +60,7 @@ func init() { type driver struct{} -func (d driver) NewPeerStore(icfg interface{}) (storage.Storage, error) { +func (d driver) NewStorage(icfg interface{}) (storage.Storage, error) { // Marshal the config back into bytes. bytes, err := yaml.Marshal(icfg) if err != nil { @@ -185,7 +184,7 @@ func New(provided Config) (storage.Storage, error) { return nil, err } - ps := &peerStore{ + ps := &store{ cfg: cfg, rb: newRedisBackend(&provided, u, ""), closed: make(chan struct{}), @@ -234,41 +233,7 @@ func New(provided Config) (storage.Storage, error) { return ps, nil } -type serializedPeer string - -func newPeerKey(p bittorrent.Peer) serializedPeer { - b := make([]byte, bittorrent.PeerIDLen+2+len(p.IP.IP)) - copy(b[:bittorrent.PeerIDLen], p.ID[:]) - binary.BigEndian.PutUint16(b[bittorrent.PeerIDLen:bittorrent.PeerIDLen+2], p.Port) - copy(b[bittorrent.PeerIDLen+2:], p.IP.IP) - - return serializedPeer(b) -} - -// TODO: move duplicated code into one place -func decodePeerKey(pk serializedPeer) bittorrent.Peer { - peerId, err := bittorrent.NewPeerID([]byte(pk[:bittorrent.PeerIDLen])) - if err != nil { - panic(err) - } - peer := bittorrent.Peer{ - ID: peerId, - Port: binary.BigEndian.Uint16([]byte(pk[bittorrent.PeerIDLen : bittorrent.PeerIDLen+2])), - IP: bittorrent.IP{IP: net.IP(pk[bittorrent.PeerIDLen+2:])}} - - if ip := peer.IP.To4(); ip != nil { - peer.IP.IP = ip - peer.IP.AddressFamily = bittorrent.IPv4 - } else if len(peer.IP.IP) == net.IPv6len { // implies toReturn.IP.To4() == nil - peer.IP.AddressFamily = bittorrent.IPv6 - } else { - panic("IP is neither v4 nor v6") - } - - return peer -} - -type peerStore struct { +type store struct { cfg Config rb *redisBackend @@ -276,31 +241,31 @@ type peerStore struct { wg sync.WaitGroup } -func (ps *peerStore) groups() []string { +func (ps *store) groups() []string { return []string{bittorrent.IPv4.String(), bittorrent.IPv6.String()} } -func (ps *peerStore) leecherInfohashKey(af, ih string) string { +func (ps *store) leecherInfohashKey(af, ih string) string { return af + "_L_" + ih } -func (ps *peerStore) seederInfohashKey(af, ih string) string { +func (ps *store) seederInfohashKey(af, ih string) string { return af + "_S_" + ih } -func (ps *peerStore) infohashCountKey(af string) string { +func (ps *store) infohashCountKey(af string) string { return af + "_infohash_count" } -func (ps *peerStore) seederCountKey(af string) string { +func (ps *store) seederCountKey(af string) string { return af + "_S_count" } -func (ps *peerStore) leecherCountKey(af string) string { +func (ps *store) leecherCountKey(af string) string { return af + "_L_count" } -func (ps *peerStore) getConnection() redis.Conn { +func (ps *store) getConnection() redis.Conn { select { case <-ps.closed: panic("attempted to interact with stopped redis store") @@ -309,13 +274,21 @@ func (ps *peerStore) getConnection() redis.Conn { return ps.rb.open() } +func closeConnection(con redis.Conn) { + if con != nil { + if err := con.Close(); err != nil { + log.Err(err) + } + } +} + // populateProm aggregates metrics over all groups and then posts them to // prometheus. -func (ps *peerStore) populateProm() { +func (ps *store) populateProm() { var numInfohashes, numSeeders, numLeechers int64 conn := ps.getConnection() - defer conn.Close() + defer closeConnection(conn) for _, group := range ps.groups() { if n, err := redis.Int64(conn.Do("GET", ps.infohashCountKey(group))); err != nil && err != redis.ErrNil { @@ -349,27 +322,27 @@ func (ps *peerStore) populateProm() { storage.PromLeechersCount.Set(float64(numLeechers)) } -func (ps *peerStore) getClock() int64 { +func (ps *store) getClock() int64 { return timecache.NowUnixNano() } -func (ps *peerStore) PutSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) error { +func (ps *store) PutSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) error { addressFamily := p.IP.AddressFamily.String() log.Debug("storage: PutSeeder", log.Fields{ "InfoHash": ih.String(), "Peer": p, }) - pk := newPeerKey(p) + pk := storage.NewSerializedPeer(p) encodedSeederInfoHash := ps.seederInfohashKey(addressFamily, ih.String()) ct := ps.getClock() conn := ps.getConnection() - defer conn.Close() + defer closeConnection(conn) - conn.Send("MULTI") - conn.Send("HSET", encodedSeederInfoHash, pk, ct) - conn.Send("HSET", addressFamily, encodedSeederInfoHash, ct) + _ = conn.Send("MULTI") + _ = conn.Send("HSET", encodedSeederInfoHash, pk, ct) + _ = conn.Send("HSET", addressFamily, encodedSeederInfoHash, ct) reply, err := redis.Int64s(conn.Do("EXEC")) if err != nil { return err @@ -393,17 +366,17 @@ func (ps *peerStore) PutSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) error return nil } -func (ps *peerStore) DeleteSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) error { +func (ps *store) DeleteSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) error { addressFamily := p.IP.AddressFamily.String() log.Debug("storage: DeleteSeeder", log.Fields{ "InfoHash": ih.String(), "Peer": p, }) - pk := newPeerKey(p) + pk := storage.NewSerializedPeer(p) conn := ps.getConnection() - defer conn.Close() + defer closeConnection(conn) encodedSeederInfoHash := ps.seederInfohashKey(addressFamily, ih.String()) @@ -421,7 +394,7 @@ func (ps *peerStore) DeleteSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) err return nil } -func (ps *peerStore) PutLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error { +func (ps *store) PutLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error { addressFamily := p.IP.AddressFamily.String() log.Debug("storage: PutLeecher", log.Fields{ "InfoHash": ih.String(), @@ -430,15 +403,15 @@ func (ps *peerStore) PutLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error // Update the peer in the swarm. encodedLeecherInfoHash := ps.leecherInfohashKey(addressFamily, ih.String()) - pk := newPeerKey(p) + pk := storage.NewSerializedPeer(p) ct := ps.getClock() conn := ps.getConnection() - defer conn.Close() + defer closeConnection(conn) - conn.Send("MULTI") - conn.Send("HSET", encodedLeecherInfoHash, pk, ct) - conn.Send("HSET", addressFamily, encodedLeecherInfoHash, ct) + _ = conn.Send("MULTI") + _ = conn.Send("HSET", encodedLeecherInfoHash, pk, ct) + _ = conn.Send("HSET", addressFamily, encodedLeecherInfoHash, ct) reply, err := redis.Int64s(conn.Do("EXEC")) if err != nil { return err @@ -453,7 +426,7 @@ func (ps *peerStore) PutLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error return nil } -func (ps *peerStore) DeleteLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error { +func (ps *store) DeleteLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error { addressFamily := p.IP.AddressFamily.String() log.Debug("storage: DeleteLeecher", log.Fields{ "InfoHash": ih.String(), @@ -461,9 +434,9 @@ func (ps *peerStore) DeleteLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) er }) conn := ps.getConnection() - defer conn.Close() + defer closeConnection(conn) - pk := newPeerKey(p) + pk := storage.NewSerializedPeer(p) encodedLeecherInfoHash := ps.leecherInfohashKey(addressFamily, ih.String()) delNum, err := redis.Int64(conn.Do("HDEL", encodedLeecherInfoHash, pk)) @@ -478,7 +451,7 @@ func (ps *peerStore) DeleteLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) er return nil } -func (ps *peerStore) GraduateLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error { +func (ps *store) GraduateLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error { addressFamily := p.IP.AddressFamily.String() log.Debug("storage: GraduateLeecher", log.Fields{ "InfoHash": ih.String(), @@ -488,16 +461,16 @@ func (ps *peerStore) GraduateLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) encodedInfoHash := ih.String() encodedLeecherInfoHash := ps.leecherInfohashKey(addressFamily, encodedInfoHash) encodedSeederInfoHash := ps.seederInfohashKey(addressFamily, encodedInfoHash) - pk := newPeerKey(p) + pk := storage.NewSerializedPeer(p) ct := ps.getClock() conn := ps.getConnection() - defer conn.Close() + defer closeConnection(conn) - conn.Send("MULTI") - conn.Send("HDEL", encodedLeecherInfoHash, pk) - conn.Send("HSET", encodedSeederInfoHash, pk, ct) - conn.Send("HSET", addressFamily, encodedSeederInfoHash, ct) + _ = conn.Send("MULTI") + _ = conn.Send("HDEL", encodedLeecherInfoHash, pk) + _ = conn.Send("HSET", encodedSeederInfoHash, pk, ct) + _ = conn.Send("HSET", addressFamily, encodedSeederInfoHash, ct) reply, err := redis.Int64s(conn.Do("EXEC")) if err != nil { return err @@ -524,7 +497,7 @@ func (ps *peerStore) GraduateLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) return nil } -func (ps *peerStore) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant int, announcer bittorrent.Peer) (peers []bittorrent.Peer, err error) { +func (ps *store) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant int, announcer bittorrent.Peer) (peers []bittorrent.Peer, err error) { addressFamily := announcer.IP.AddressFamily.String() log.Debug("storage: AnnouncePeers", log.Fields{ "InfoHash": ih.String(), @@ -538,7 +511,7 @@ func (ps *peerStore) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant encodedSeederInfoHash := ps.seederInfohashKey(addressFamily, encodedInfoHash) conn := ps.getConnection() - defer conn.Close() + defer closeConnection(conn) leechers, err := conn.Do("HKEYS", encodedLeecherInfoHash) if err != nil { @@ -563,7 +536,7 @@ func (ps *peerStore) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant break } - peers = append(peers, decodePeerKey(serializedPeer(pk.([]byte)))) + peers = append(peers, storage.SerializedPeer(pk.([]byte)).ToPeer()) numWant-- } } else { @@ -573,13 +546,13 @@ func (ps *peerStore) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant break } - peers = append(peers, decodePeerKey(serializedPeer(pk.([]byte)))) + peers = append(peers, storage.SerializedPeer(pk.([]byte)).ToPeer()) numWant-- } // Append leechers until we reach numWant. if numWant > 0 { - announcerPK := newPeerKey(announcer) + announcerPK := storage.NewSerializedPeer(announcer) for _, pk := range conLeechers { if pk == announcerPK { continue @@ -589,7 +562,7 @@ func (ps *peerStore) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant break } - peers = append(peers, decodePeerKey(serializedPeer(pk.([]byte)))) + peers = append(peers, storage.SerializedPeer(pk.([]byte)).ToPeer()) numWant-- } } @@ -598,7 +571,7 @@ func (ps *peerStore) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant return } -func (ps *peerStore) ScrapeSwarm(ih bittorrent.InfoHash, af bittorrent.AddressFamily) (resp bittorrent.Scrape) { +func (ps *store) ScrapeSwarm(ih bittorrent.InfoHash, af bittorrent.AddressFamily) (resp bittorrent.Scrape) { resp.InfoHash = ih addressFamily := af.String() @@ -607,7 +580,7 @@ func (ps *peerStore) ScrapeSwarm(ih bittorrent.InfoHash, af bittorrent.AddressFa encodedSeederInfoHash := ps.seederInfohashKey(addressFamily, encodedInfoHash) conn := ps.getConnection() - defer conn.Close() + defer closeConnection(conn) leechersLen, err := redis.Int64(conn.Do("HLEN", encodedLeecherInfoHash)) if err != nil { @@ -633,6 +606,62 @@ func (ps *peerStore) ScrapeSwarm(ih bittorrent.InfoHash, af bittorrent.AddressFa return } +func (ps *store) Put(ctx string, key, value interface{}) { + conn := ps.getConnection() + defer closeConnection(conn) + _ = conn.Send("HSET", ctx, key, value) +} + +func (ps *store) Contains(ctx string, key interface{}) bool { + conn := ps.getConnection() + defer closeConnection(conn) + exist, _ := redis.Bool(conn.Do("HEXISTS", ctx, key)) + return exist +} + +func (ps *store) BulkPut(ctx string, pairs ...storage.Pair) { + switch l := len(pairs); l { + case 0: + break + case 1: + ps.Put(ctx, fmt.Sprint(pairs[0].Left), pairs[0].Right) + default: + conn := ps.getConnection() + defer closeConnection(conn) + args := make([]interface{}, 1, l*2+1) + args[0] = ctx + for _, p := range pairs { + args = append(args, p.Left, p.Right) + } + _ = conn.Send("HSET", args...) + } +} + +func (ps *store) Load(ctx string, key interface{}) interface{} { + conn := ps.getConnection() + defer closeConnection(conn) + v, _ := conn.Do("HGET", ctx, key) + return v +} + +func (ps *store) Delete(ctx string, keys ...interface{}) { + switch l := len(keys); l { + case 0: + break + case 1: + conn := ps.getConnection() + defer closeConnection(conn) + _ = conn.Send("HDEL", ctx, keys[0]) + default: + conn := ps.getConnection() + defer closeConnection(conn) + args := make([]interface{}, 1, l+1) + args[0] = ctx + args = append(args, keys...) + _ = conn.Send("HDEL", args...) + } +} + // collectGarbage deletes all Peers from the Storage which are older than the // cutoff time. // @@ -678,12 +707,12 @@ func (ps *peerStore) ScrapeSwarm(ih bittorrent.InfoHash, af bittorrent.AddressFa // - If the change happens after the HLEN, we will not even attempt to make the // transaction. The infohash key will remain in the addressFamil hash and // we'll attempt to clean it up the next time collectGarbage runs. -func (ps *peerStore) collectGarbage(cutoff time.Time) error { +func (ps *store) collectGarbage(cutoff time.Time) error { cutoffUnix := cutoff.UnixNano() start := time.Now() conn := ps.getConnection() - defer conn.Close() + defer closeConnection(conn) for _, group := range ps.groups() { // list all infohashes in the group @@ -701,7 +730,7 @@ func (ps *peerStore) collectGarbage(cutoff time.Time) error { return err } - var pk serializedPeer + var pk storage.SerializedPeer var removedPeerCount int64 for index, ihField := range ihList { if index%2 == 1 { // value @@ -711,7 +740,7 @@ func (ps *peerStore) collectGarbage(cutoff time.Time) error { } if mtime <= cutoffUnix { log.Debug("storage: deleting peer", log.Fields{ - "Peer": decodePeerKey(pk).String(), + "Peer": pk.ToPeer(), }) ret, err := redis.Int64(conn.Do("HDEL", ihStr, pk)) if err != nil { @@ -721,7 +750,7 @@ func (ps *peerStore) collectGarbage(cutoff time.Time) error { removedPeerCount += ret } } else { // key - pk = serializedPeer([]byte(ihField)) + pk = storage.SerializedPeer(ihField) } } // DECR seeder/leecher counter @@ -750,10 +779,10 @@ func (ps *peerStore) collectGarbage(cutoff time.Time) error { // in other words, it's removed automatically after `HDEL` the last field. //_, err := conn.Do("DEL", ihStr) - conn.Send("MULTI") - conn.Send("HDEL", group, ihStr) + _ = conn.Send("MULTI") + _ = conn.Send("HDEL", group, ihStr) if isSeeder { - conn.Send("DECR", ps.infohashCountKey(group)) + _ = conn.Send("DECR", ps.infohashCountKey(group)) } _, err = redis.Values(conn.Do("EXEC")) if err != nil && err != redis.ErrNil { @@ -778,7 +807,7 @@ func (ps *peerStore) collectGarbage(cutoff time.Time) error { return nil } -func (ps *peerStore) Stop() stop.Result { +func (ps *store) Stop() stop.Result { c := make(stop.Channel) go func() { close(ps.closed) @@ -790,6 +819,6 @@ func (ps *peerStore) Stop() stop.Result { return c.Result() } -func (ps *peerStore) LogFields() log.Fields { +func (ps *store) LogFields() log.Fields { return ps.cfg.LogFields() } diff --git a/storage/redis/peer_store_test.go b/storage/redis/storage_test.go similarity index 100% rename from storage/redis/peer_store_test.go rename to storage/redis/storage_test.go diff --git a/storage/storage.go b/storage/storage.go index b4c46b5..02f3c56 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -16,7 +16,7 @@ var ( // Driver is the interface used to initialize a new type of Storage. type Driver interface { - NewPeerStore(cfg interface{}) (Storage, error) + NewStorage(cfg interface{}) (Storage, error) } // ErrResourceDoesNotExist is the error returned by all delete methods and the @@ -104,13 +104,23 @@ type Storage interface { // If the Swarm does not exist, an empty Scrape and no error is returned. ScrapeSwarm(infoHash bittorrent.InfoHash, addressFamily bittorrent.AddressFamily) bittorrent.Scrape - /*TODO: implement this*/ - - Put(key interface{}, value interface{}) + // Put used to place arbitrary k-v data with specified context + // into storage. ctx parameter used to group data + // (i.e. data only for specific middleware module) + Put(ctx string, key, value interface{}) - Load(key interface{}) interface{} + // BulkPut used to place array of k-v data in specified context. + // Useful when several data entries should be added in single transaction/connection + BulkPut(ctx string, pairs ...Pair) - Delete(key interface{}) + // Contains checks if any data in specified context exist + Contains(ctx string, key interface{}) bool + + // Load used to get arbitrary data in specified context by its key + Load(ctx string, key interface{}) interface{} + + // Delete used to delete arbitrary data in specified context by its keys + Delete(ctx string, keys ...interface{}) // stop.Stopper is an interface that expects a Stop method to stop the // Storage. @@ -158,5 +168,5 @@ func NewStorage(name string, cfg interface{}) (ps Storage, err error) { return nil, ErrDriverDoesNotExist } - return d.NewPeerStore(cfg) + return d.NewStorage(cfg) } diff --git a/storage/storage_bench.go b/storage/storage_bench.go index d1eb0b6..c5296ae 100644 --- a/storage/storage_bench.go +++ b/storage/storage_bench.go @@ -17,7 +17,7 @@ type benchData struct { func generateInfohashes() (a [1000]bittorrent.InfoHash) { for i := range a { - b := make([]byte, 20) + b := make([]byte, bittorrent.InfoHashV1Len) rand.Read(b) a[i], _ = bittorrent.NewInfoHash(b) } @@ -33,9 +33,9 @@ func generatePeers() (a [1000]bittorrent.Peer) { if err != nil || n != 4 { panic("unable to create random bytes") } - id := [20]byte{} + id := [bittorrent.PeerIDLen]byte{} n, err = r.Read(id[:]) - if err != nil || n != 20 { + if err != nil || n != bittorrent.InfoHashV1Len { panic("unable to create random bytes") } port := uint16(r.Uint32())