Merge commits 129aac230aa..828edb8fd8b from https://github.com/chihaya/chihaya

This commit is contained in:
Širhoe Biazhkovič
2022-04-12 15:58:14 +03:00
parent c858576f76
commit c7edbb52f2
51 changed files with 562 additions and 1327 deletions
+23
View File
@@ -0,0 +1,23 @@
---
version: 2
updates:
- package-ecosystem: "github-actions"
directory: "/"
schedule:
interval: "monthly"
labels:
- "component/dependencies"
- package-ecosystem: "gomod"
directory: "/"
schedule:
interval: "monthly"
labels:
- "component/dependencies"
- package-ecosystem: "docker"
directory: "/"
schedule:
interval: "monthly"
labels:
- "component/dependencies"
-95
View File
@@ -1,95 +0,0 @@
name: CI
on:
# See the documentation for more intricate event dispatch here:
# https://help.github.com/en/actions/reference/workflow-syntax-for-github-actions#on
push:
branches:
- "!dependabot/*"
- "*"
pull_request:
branches:
- "*"
jobs:
build:
name: Build & Lint
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v2
- name: Setup
uses: actions/setup-go@v2
with:
go-version: ^1.15
- name: Build
run: go build -v ./cmd/...
- name: Vet
run: go vet ./...
# - name: Imports
# uses: Jerome1337/goimports-action@v1.0.3
- name: Format
uses: Jerome1337/gofmt-action@v1.0.4
- name: Lint
uses: Jerome1337/golint-action@v1.0.2
unit:
name: Unit Tests
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v2
- name: Setup
uses: actions/setup-go@v2
with:
go-version: ^1.15
- name: Unit Tests
run: go test -v -race $(go list ./...)
e2e-mem:
name: E2E Tests (Memory Storage)
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v2
- name: Setup
uses: actions/setup-go@v2
with:
go-version: ^1.15
- name: End-to-End Test
run: |
go install --tags e2e ./cmd/mochi
cat ./dist/example_config.yaml
mochi --config=./dist/example_config.yaml --debug &
pid=$!
sleep 2
mochi e2e --debug
kill $pid
e2e-redis:
name: E2E Tests (Redis Storage)
runs-on: ubuntu-latest
services:
redis:
image: redis
ports: [ "6379:6379" ]
options: --entrypoint redis-server
steps:
- name: Checkout
uses: actions/checkout@v2
- name: Setup
uses: actions/setup-go@v2
with:
go-version: ^1.15
- name: Configure redis storage
run: |
curl -LO https://github.com/jzelinskie/faq/releases/download/0.0.6/faq-linux-amd64
chmod +x faq-linux-amd64
./faq-linux-amd64 '.mochi.storage = {"config":{"gc_interval":"3m","peer_lifetime":"31m","prometheus_reporting_interval":"1s","redis_broker":"redis://127.0.0.1:6379/0","redis_connect_timeout":"15s","redis_read_timeout":"15s","redis_write_timeout":"15s"},"name":"redis"}' ./dist/example_config.yaml > ./dist/example_redis_config.yaml
cat ./dist/example_redis_config.yaml
- name: End-to-End Test
run: |
go install --tags e2e ./cmd/mochi
mochi --config=./dist/example_redis_config.yaml --debug &
pid=$!
sleep 2
mochi e2e --debug
kill $pid
+78
View File
@@ -0,0 +1,78 @@
---
name: "Build & Test"
on:
push:
branches:
- "!dependabot/*"
- "main"
pull_request:
branches: [ "*" ]
jobs:
build:
name: "Go Build"
runs-on: "ubuntu-latest"
steps:
- uses: "actions/checkout@v2"
- uses: "actions/setup-go@v2"
with:
go-version: "^1.18"
- name: "Build"
run: "go build ./cmd/..."
unit:
name: "Run Unit Tests"
runs-on: "ubuntu-latest"
steps:
- uses: "actions/checkout@v2"
- uses: "actions/setup-go@v2"
with:
go-version: "^1.18"
- name: "Run `go test`"
run: "go test -race ./..."
e2e-mem:
name: "E2E Memory Tests"
runs-on: "ubuntu-latest"
steps:
- uses: "actions/checkout@v2"
- uses: "actions/setup-go@v2"
with:
go-version: "^1.18"
- name: "Install and configure mochi"
run: |
go install --tags e2e ./cmd/mochi
cat ./dist/example_config.yaml
- name: "Run end-to-end tests"
run: |
mochi --config=./dist/example_config.yaml --debug &
pid=$!
sleep 2
mochi e2e --debug
kill $pid
e2e-redis:
name: "E2E Redis Tests"
runs-on: "ubuntu-latest"
services:
redis:
image: "redis"
ports: [ "6379:6379" ]
options: "--entrypoint redis-server"
steps:
- uses: "actions/checkout@v2"
- uses: "actions/setup-go@v2"
with:
go-version: "^1.18"
- name: "Install and configure mochi"
run: |
go install --tags e2e ./cmd/mochi
curl -LO https://github.com/jzelinskie/faq/releases/download/0.0.6/faq-linux-amd64
chmod +x faq-linux-amd64
./faq-linux-amd64 '.mochi.storage = {"config":{"gc_interval":"3m","peer_lifetime":"31m","prometheus_reporting_interval":"1s","redis_broker":"redis://127.0.0.1:6379/0","redis_connect_timeout":"15s","redis_read_timeout":"15s","redis_write_timeout":"15s"},"name":"redis"}' ./dist/example_config.yaml > ./dist/example_redis_config.yaml
cat ./dist/example_redis_config.yaml
- name: "Run end-to-end tests"
run: |
mochi --config=./dist/example_redis_config.yaml --debug &
pid=$!
sleep 2
mochi e2e --debug
kill $pid
+85
View File
@@ -0,0 +1,85 @@
---
name: "Lint"
on:
push:
branches:
- "!dependabot/*"
- "main"
pull_request:
branches: [ "*" ]
jobs:
go-mod-tidy:
name: "Lint Go Modules"
runs-on: "ubuntu-latest"
steps:
- uses: "actions/checkout@v2"
- uses: "actions/setup-go@v2"
with:
go-version: "^1.18"
- name: "Run `go mod tidy`"
run: "go mod tidy && bash -c '[ $(git status --porcelain | tee /dev/fd/2 | wc -c) -eq 0 ]'"
go-fmt:
name: "Format Go"
runs-on: "ubuntu-latest"
steps:
- uses: "actions/checkout@v2"
- uses: "actions/setup-go@v2"
with:
go-version: "^1.18"
- name: "Install gofumpt"
run: "go install mvdan.cc/gofumpt@latest"
- name: "Run `gofumpt`"
run: |
GOFUMPT_OUTPUT="$(find . -iname '*.go' -type f | xargs gofumpt -d)"
if [ -n "$GOFUMPT_OUTPUT" ]; then
echo "The following files are not correctly formatted:"
echo "${GOFUMPT_OUTPUT}"
exit 1
fi
go-lint:
name: "Lint Go"
runs-on: "ubuntu-latest"
steps:
- uses: "actions/checkout@v2"
- uses: "actions/setup-go@v2"
with:
go-version: "^1.18"
- uses: "golangci/golangci-lint-action@v2"
with:
version: "v1.43"
skip-go-installation: true
skip-pkg-cache: true
skip-build-cache: false
extra-lint:
name: "Lint YAML & Markdown"
runs-on: "ubuntu-latest"
steps:
- uses: "actions/checkout@v2"
- uses: "bewuethr/yamllint-action@v1.1.1"
with:
config-file: ".yamllint"
- uses: "nosborn/github-action-markdown-cli@v2.0.0"
with:
files: "."
config_file: ".markdownlint.yaml"
codeql:
name: "Analyze with CodeQL"
runs-on: "ubuntu-latest"
permissions:
actions: "read"
contents: "read"
security-events: "write"
strategy:
fail-fast: false
matrix:
language: [ "go" ]
steps:
- uses: "actions/checkout@v2"
- uses: "github/codeql-action/init@v1"
with:
languages: "${{ matrix.language }}"
- uses: "github/codeql-action/autobuild@v1"
- uses: "github/codeql-action/analyze@v1"
+50
View File
@@ -0,0 +1,50 @@
---
run:
timeout: "5m"
output:
sort-results: true
linters-settings:
goimports:
local-prefixes: "github.com/sot-te.ch/mochi"
gosec:
excludes:
- "G404" # Allow the usage of math/rand
linters:
enable:
- "bidichk"
- "bodyclose"
- "deadcode"
- "errcheck"
- "errname"
- "errorlint"
- "gofumpt"
- "goimports"
- "goprintffuncname"
- "gosec"
- "gosimple"
- "govet"
- "ifshort"
- "importas"
- "ineffassign"
- "makezero"
- "prealloc"
- "predeclared"
- "revive"
- "rowserrcheck"
- "staticcheck"
- "structcheck"
- "stylecheck"
- "tenv"
- "typecheck"
- "unconvert"
- "unused"
- "varcheck"
- "wastedassign"
- "whitespace"
issues:
include:
- "EXC0012" # Exported should have comment
- "EXC0012" # Exported should have comment
- "EXC0013" # Package comment should be of form
- "EXC0014" # Comment on exported should be of form
- "EXC0015" # Should have a package comment
+3
View File
@@ -0,0 +1,3 @@
---
line-length: false
no-hard-tabs: false
+11
View File
@@ -0,0 +1,11 @@
# vim: ft=yaml
---
yaml-files:
- "*.yaml"
- "*.yml"
- ".yamllint"
ignore: "dist/helm/"
extends: "default"
rules:
quoted-strings: "enable"
line-length: "disable"
+1 -1
View File
@@ -75,7 +75,7 @@ func (i InfoHash) TruncateV1() InfoHash {
}
// NewInfoHash creates an InfoHash from a byte slice or raw/hex string.
func NewInfoHash(b interface{}) (InfoHash, error) {
func NewInfoHash(b any) (InfoHash, error) {
if b == nil {
return NoneInfoHash, ErrInvalidHashType
}
+1 -1
View File
@@ -8,7 +8,7 @@ import (
)
func TestNew(t *testing.T) {
var table = []struct {
table := []struct {
data string
expected Event
expectedErr error
+4 -9
View File
@@ -39,7 +39,7 @@ var ErrKeyNotFound = errors.New("query: value for the provided key does not exis
// ErrInvalidInfohash is returned when parsing a query encounters an infohash
// with invalid length.
var ErrInvalidInfohash = ClientError("provided invalid infohash")
//var ErrInvalidInfohash = ClientError("provided invalid infohash")
// ErrInvalidQueryEscape is returned when a query string contains invalid
// escapes.
@@ -188,20 +188,15 @@ func (qp *QueryParams) String(key string) (string, bool) {
return value, ok
}
// Uint64 returns a uint parsed from a query. After being called, it is safe to
// Uint returns an uint parsed from a query. After being called, it is safe to
// cast the uint64 to your desired length.
func (qp *QueryParams) Uint64(key string) (uint64, error) {
func (qp *QueryParams) Uint(key string, bitSize int) (uint64, error) {
str, exists := qp.params[key]
if !exists {
return 0, ErrKeyNotFound
}
val, err := strconv.ParseUint(str, 10, 64)
if err != nil {
return 0, err
}
return val, nil
return strconv.ParseUint(str, 10, bitSize)
}
// InfoHashes returns a list of requested infohashes.
+1 -1
View File
@@ -92,7 +92,7 @@ func TestParseInvalidURLData(t *testing.T) {
func TestParseShouldNotPanicURLData(t *testing.T) {
for _, parseStr := range shouldNotPanicQueries {
ParseURLData(parseStr)
_, _ = ParseURLData(parseStr)
}
}
+3 -3
View File
@@ -4,7 +4,7 @@ import (
"errors"
"os"
"gopkg.in/yaml.v2"
"gopkg.in/yaml.v3"
"github.com/sot-tech/mochi/frontend/http"
"github.com/sot-tech/mochi/frontend/udp"
@@ -22,8 +22,8 @@ import (
)
type storageConfig struct {
Name string `yaml:"name"`
Config interface{} `yaml:"config"`
Name string `yaml:"name"`
Config any `yaml:"config"`
}
// Config represents the configuration used for executing Conf.
+10 -15
View File
@@ -1,6 +1,7 @@
package main
import (
"context"
"errors"
"github.com/sirupsen/logrus"
"github.com/sot-tech/mochi/frontend/http"
@@ -8,10 +9,10 @@ import (
"github.com/sot-tech/mochi/middleware"
"github.com/sot-tech/mochi/pkg/log"
"github.com/sot-tech/mochi/pkg/metrics"
_ "github.com/sot-tech/mochi/pkg/rand_seed"
"github.com/sot-tech/mochi/pkg/stop"
"github.com/sot-tech/mochi/storage"
"github.com/spf13/cobra"
"os"
"os/signal"
"runtime"
"strings"
@@ -147,15 +148,13 @@ func RootRunCmdFunc(cmd *cobra.Command, _ []string) error {
return err
}
quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
reload := makeReloadChan()
shutdown, _ := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
reload, _ := signal.NotifyContext(context.Background(), ReloadSignals...)
for {
select {
case <-reload:
log.Info("reloading; received SIGUSR1")
case <-reload.Done():
log.Info("reloading; received reload signal")
peerStore, err := r.Stop(true)
if err != nil {
return err
@@ -164,8 +163,8 @@ func RootRunCmdFunc(cmd *cobra.Command, _ []string) error {
if err := r.Start(peerStore); err != nil {
return err
}
case <-quit:
log.Info("shutting down; received SIGINT/SIGTERM")
case <-shutdown.Done():
log.Info("shutting down; received shutdown signal")
if _, err := r.Stop(false); err != nil {
return err
}
@@ -213,7 +212,7 @@ func RootPostRunCmdFunc(_ *cobra.Command, _ []string) error {
}
func main() {
var rootCmd = &cobra.Command{
rootCmd := &cobra.Command{
Use: "mochi",
Short: "BitTorrent Tracker",
Long: "A customizable, multi-protocol BitTorrent Tracker",
@@ -224,11 +223,7 @@ func main() {
rootCmd.PersistentFlags().Bool("debug", false, "enable debug logging")
rootCmd.PersistentFlags().Bool("json", false, "enable json logging")
if runtime.GOOS == "windows" {
rootCmd.PersistentFlags().Bool("nocolors", true, "disable log coloring")
} else {
rootCmd.PersistentFlags().Bool("nocolors", false, "disable log coloring")
}
rootCmd.PersistentFlags().Bool("nocolors", runtime.GOOS == "windows", "disable log coloring")
rootCmd.Flags().String("config", "/etc/mochi.yaml", "location of configuration file")
+4 -5
View File
@@ -5,12 +5,11 @@ package main
import (
"os"
"os/signal"
"syscall"
)
func makeReloadChan() <-chan os.Signal {
reload := make(chan os.Signal, 1)
signal.Notify(reload, syscall.SIGUSR1)
return reload
// ReloadSignals are the signals that the current OS will send to the process
// when a configuration reload is requested.
var ReloadSignals = []os.Signal{
syscall.SIGUSR1,
}
+4 -5
View File
@@ -5,12 +5,11 @@ package main
import (
"os"
"os/signal"
"syscall"
)
func makeReloadChan() <-chan os.Signal {
reload := make(chan os.Signal, 1)
signal.Notify(reload, syscall.SIGHUP)
return reload
// ReloadSignals are the signals that the current OS will send to the process
// when a configuration reload is requested.
var ReloadSignals = []os.Signal{
syscall.SIGHUP,
}
+1 -1
View File
@@ -260,7 +260,7 @@ func (f *Frontend) serveHTTP(handler http.Handler, tls bool) error {
err = srv.ListenAndServe()
}
// Start the HTTP server.
if err != http.ErrServerClosed {
if !errors.Is(err, http.ErrServerClosed) {
return err
}
return nil
+5 -5
View File
@@ -72,25 +72,25 @@ func ParseAnnounce(r *http.Request, opts ParseOptions) (*bittorrent.AnnounceRequ
return nil, err
}
// Determine the number of remaining bytes for the client.
request.Left, err = qp.Uint64("left")
request.Left, err = qp.Uint("left", 64)
if err != nil {
return nil, bittorrent.ClientError("failed to parse parameter: left")
}
// Determine the number of bytes downloaded by the client.
request.Downloaded, err = qp.Uint64("downloaded")
request.Downloaded, err = qp.Uint("downloaded", 64)
if err != nil {
return nil, bittorrent.ClientError("failed to parse parameter: downloaded")
}
// Determine the number of bytes shared by the client.
request.Uploaded, err = qp.Uint64("uploaded")
request.Uploaded, err = qp.Uint("uploaded", 64)
if err != nil {
return nil, bittorrent.ClientError("failed to parse parameter: uploaded")
}
// Determine the number of peers the client wants in the response.
numwant, err := qp.Uint64("numwant")
numwant, err := qp.Uint("numwant", 32)
if err != nil && err != bittorrent.ErrKeyNotFound {
return nil, bittorrent.ClientError("failed to parse parameter: numwant")
}
@@ -99,7 +99,7 @@ func ParseAnnounce(r *http.Request, opts ParseOptions) (*bittorrent.AnnounceRequ
request.NumWant = uint32(numwant)
// Parse the port where the client is listening.
port, err := qp.Uint64("port")
port, err := qp.Uint("port", 16)
if err != nil {
return nil, bittorrent.ClientError("failed to parse parameter: port")
}
+4 -2
View File
@@ -1,6 +1,7 @@
package http
import (
"errors"
"time"
"github.com/prometheus/client_golang/prometheus"
@@ -26,8 +27,9 @@ var promResponseDurationMilliseconds = prometheus.NewHistogramVec(
func recordResponseDuration(action string, af *bittorrent.AddressFamily, err error, duration time.Duration) {
var errString string
if err != nil {
if _, ok := err.(bittorrent.ClientError); ok {
errString = err.Error()
var clientErr bittorrent.ClientError
if errors.As(err, &clientErr) {
errString = clientErr.Error()
} else {
errString = "internal error"
}
+12 -12
View File
@@ -1,6 +1,7 @@
package http
import (
"errors"
"github.com/anacrolix/torrent/bencode"
"github.com/sot-tech/mochi/bittorrent"
"github.com/sot-tech/mochi/pkg/log"
@@ -8,19 +9,18 @@ import (
"time"
)
type strMap map[string]interface{}
// WriteError communicates an error to a BitTorrent client over HTTP.
func WriteError(w http.ResponseWriter, err error) {
message := "internal server error"
if _, clientErr := err.(bittorrent.ClientError); clientErr {
message = err.Error()
var clientErr bittorrent.ClientError
if errors.As(err, &clientErr) {
message = clientErr.Error()
} else {
log.Error("http: internal error", log.Err(err))
}
w.WriteHeader(http.StatusOK)
if err = bencode.NewEncoder(w).Encode(map[string]interface{}{
if err = bencode.NewEncoder(w).Encode(map[string]any{
"failure reason": message,
}); err != nil {
log.Error("unable to encode string", log.Err(err))
@@ -38,7 +38,7 @@ func WriteAnnounceResponse(w http.ResponseWriter, resp *bittorrent.AnnounceRespo
resp.MinInterval /= time.Second
}
bdict := strMap{
bdict := map[string]any{
"complete": resp.Complete,
"incomplete": resp.Incomplete,
"interval": resp.Interval,
@@ -67,7 +67,7 @@ func WriteAnnounceResponse(w http.ResponseWriter, resp *bittorrent.AnnounceRespo
} else {
// Add the peers to the dictionary.
var peers []strMap
peers := make([]map[string]any, 0, len(resp.IPv4Peers)+len(resp.IPv6Peers))
for _, peer := range resp.IPv4Peers {
peers = append(peers, dict(peer))
}
@@ -83,15 +83,15 @@ func WriteAnnounceResponse(w http.ResponseWriter, resp *bittorrent.AnnounceRespo
// WriteScrapeResponse communicates the results of a Scrape to a BitTorrent
// client over HTTP.
func WriteScrapeResponse(w http.ResponseWriter, resp *bittorrent.ScrapeResponse) error {
filesDict := make(strMap)
filesDict := make(map[string]any, len(resp.Files))
for _, scrape := range resp.Files {
filesDict[string(scrape.InfoHash[:])] = strMap{
filesDict[string(scrape.InfoHash[:])] = map[string]any{
"complete": scrape.Complete,
"incomplete": scrape.Incomplete,
}
}
return bencode.NewEncoder(w).Encode(strMap{
return bencode.NewEncoder(w).Encode(map[string]any{
"files": filesDict,
})
}
@@ -118,8 +118,8 @@ func compact6(peer bittorrent.Peer) (buf []byte) {
return
}
func dict(peer bittorrent.Peer) strMap {
return strMap{
func dict(peer bittorrent.Peer) map[string]any {
return map[string]any{
"peer id": string(peer.ID[:]),
"ip": peer.IP.String(),
"port": peer.Port,
+2 -2
View File
@@ -11,7 +11,7 @@ import (
)
func TestWriteError(t *testing.T) {
var table = []struct {
table := []struct {
reason, expected string
}{
{"hello world", "d14:failure reason11:hello worlde"},
@@ -28,7 +28,7 @@ func TestWriteError(t *testing.T) {
}
func TestWriteStatus(t *testing.T) {
var table = []struct {
table := []struct {
reason, expected string
}{
{"something is missing", "d14:failure reason20:something is missinge"},
+15 -10
View File
@@ -10,25 +10,30 @@ type BytePool struct {
// New allocates a new BytePool with slices of equal length and capacity.
func New(length int) *BytePool {
var bp BytePool
bp.Pool.New = func() interface{} {
return make([]byte, length, length)
bp.Pool.New = func() any {
// This avoids allocations for the slice metadata, see:
// https://staticcheck.io/docs/checks#SA6002
b := make([]byte, length)
return &b
}
return &bp
}
// Get returns a byte slice from the pool.
func (bp *BytePool) Get() []byte {
return bp.Pool.Get().([]byte)
func (bp *BytePool) Get() *[]byte {
return bp.Pool.Get().(*[]byte)
}
// Put returns a byte slice to the pool.
func (bp *BytePool) Put(b []byte) {
b = b[:cap(b)]
func (bp *BytePool) Put(b *[]byte) {
*b = (*b)[:cap(*b)]
// Zero out the bytes.
// Apparently this specific expression is optimized by the compiler, see
// github.com/golang/go/issues/5373.
for i := range b {
b[i] = 0
// This specific expression is optimized by the compiler:
// https://github.com/golang/go/issues/5373.
for i := range *b {
(*b)[i] = 0
}
bp.Pool.Put(b)
}
+2 -2
View File
@@ -138,7 +138,7 @@ func BenchmarkConnectionIDGenerator_Generate(b *testing.B) {
createdAt := time.Now()
pool := &sync.Pool{
New: func() interface{} {
New: func() any {
return NewConnectionIDGenerator(key)
},
}
@@ -176,7 +176,7 @@ func BenchmarkConnectionIDGenerator_Validate(b *testing.B) {
cid := NewConnectionID(ip, createdAt, key)
pool := &sync.Pool{
New: func() interface{} {
New: func() any {
return NewConnectionIDGenerator(key)
},
}
+6 -9
View File
@@ -55,7 +55,6 @@ func (cfg Config) Validate() Config {
// Generate a private key if one isn't provided by the user.
if cfg.PrivateKey == "" {
rand.Seed(time.Now().UnixNano())
pkeyRunes := make([]rune, 64)
for i := range pkeyRunes {
pkeyRunes[i] = allowedGeneratedPrivateKeyRunes[rand.Intn(len(allowedGeneratedPrivateKeyRunes))]
@@ -117,14 +116,13 @@ func NewFrontend(logic frontend.TrackerLogic, provided Config) (*Frontend, error
logic: logic,
Config: cfg,
genPool: &sync.Pool{
New: func() interface{} {
New: func() any {
return NewConnectionIDGenerator(cfg.PrivateKey)
},
},
}
err := f.listen()
if err != nil {
if err := f.listen(); err != nil {
return nil, err
}
@@ -185,10 +183,10 @@ func (t *Frontend) serve() error {
// Read a UDP packet into a reusable buffer.
buffer := pool.Get()
n, addr, err := t.socket.ReadFromUDP(buffer)
n, addr, err := t.socket.ReadFromUDP(*buffer)
if err != nil {
pool.Put(buffer)
if netErr, ok := err.(net.Error); ok && netErr.Temporary() {
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
// A temporary failure is not fatal; just pretend it never happened.
continue
}
@@ -217,7 +215,7 @@ func (t *Frontend) serve() error {
}
action, af, err := t.handleRequest(
// Make sure the IP is copied, not referenced.
Request{buffer[:n], append([]byte{}, addr.IP...)},
Request{(*buffer)[:n], append([]byte{}, addr.IP...)},
ResponseWriter{t.socket, addr},
)
if t.EnableRequestTiming {
@@ -244,8 +242,7 @@ type ResponseWriter struct {
// Write implements the io.Writer interface for a ResponseWriter.
func (w ResponseWriter) Write(b []byte) (int, error) {
w.socket.WriteToUDP(b, w.addr)
return len(b), nil
return w.socket.WriteToUDP(b, w.addr)
}
// handleRequest parses and responds to a UDP Request.
+1
View File
@@ -5,6 +5,7 @@ import (
"github.com/sot-tech/mochi/frontend/udp"
"github.com/sot-tech/mochi/middleware"
_ "github.com/sot-tech/mochi/pkg/rand_seed"
"github.com/sot-tech/mochi/storage"
_ "github.com/sot-tech/mochi/storage/memory"
)
+8 -7
View File
@@ -23,9 +23,9 @@ const (
// Option-Types as described in BEP 41 and BEP 45.
const (
optionEndOfOptions byte = 0x0
optionNOP = 0x1
optionURLData = 0x2
optionEndOfOptions = 0x0
optionNOP = 0x1
optionURLData = 0x2
)
var (
@@ -80,6 +80,7 @@ func ParseAnnounce(r Request, v6Action bool, opts ParseOptions) (*bittorrent.Ann
return nil, errMalformedPacket
}
// XXX: pure V2 hashes will cause invalid parsing
infohash := r.Packet[16:36]
peerIDBytes := r.Packet[36:56]
downloaded := binary.BigEndian.Uint64(r.Packet[56:64])
@@ -93,10 +94,10 @@ func ParseAnnounce(r Request, v6Action bool, opts ParseOptions) (*bittorrent.Ann
ip := r.IP
ipProvided := false
ipbytes := r.Packet[84:ipEnd]
ipBytes := r.Packet[84:ipEnd]
if opts.AllowIPSpoofing {
// Make sure the bytes are copied to a new slice.
copy(ip, net.IP(ipbytes))
copy(ip, ipBytes)
ipProvided = true
}
if !opts.AllowIPSpoofing && r.IP == nil {
@@ -152,7 +153,7 @@ type buffer struct {
}
var bufferFree = sync.Pool{
New: func() interface{} { return new(buffer) },
New: func() any { return new(buffer) },
}
func newBuffer() *buffer {
@@ -171,7 +172,7 @@ func handleOptionalParameters(packet []byte) (bittorrent.Params, error) {
return bittorrent.ParseURLData("")
}
var buf = newBuffer()
buf := newBuffer()
defer buf.free()
for i := 0; i < len(packet); {
+2 -1
View File
@@ -1,6 +1,7 @@
package udp
import (
"errors"
"fmt"
"testing"
)
@@ -51,7 +52,7 @@ func TestHandleOptionalParameters(t *testing.T) {
for _, tt := range table {
t.Run(fmt.Sprintf("%#v as %#v", tt.data, tt.values), func(t *testing.T) {
params, err := handleOptionalParameters(tt.data)
if err != tt.err {
if !errors.Is(err, tt.err) {
if tt.err == nil {
t.Fatalf("expected no parsing error for %x but got %s", tt.data, err)
} else {
+4 -2
View File
@@ -1,6 +1,7 @@
package udp
import (
"errors"
"time"
"github.com/prometheus/client_golang/prometheus"
@@ -26,8 +27,9 @@ var promResponseDurationMilliseconds = prometheus.NewHistogramVec(
func recordResponseDuration(action string, af *bittorrent.AddressFamily, err error, duration time.Duration) {
var errString string
if err != nil {
if _, ok := err.(bittorrent.ClientError); ok {
errString = err.Error()
var clientErr bittorrent.ClientError
if errors.As(err, &clientErr) {
errString = clientErr.Error()
} else {
errString = "internal error"
}
+4 -2
View File
@@ -2,6 +2,7 @@ package udp
import (
"encoding/binary"
"errors"
"fmt"
"io"
"time"
@@ -12,8 +13,9 @@ import (
// WriteError writes the failure reason as a null-terminated string.
func WriteError(w io.Writer, txID []byte, err error) {
// If the client wasn't at fault, acknowledge it.
if _, ok := err.(bittorrent.ClientError); !ok {
err = fmt.Errorf("internal error occurred: %s", err.Error())
var clientErr bittorrent.ClientError
if !errors.As(err, &clientErr) {
err = fmt.Errorf("internal error occurred: %w", err)
}
buf := newBuffer()
+36 -16
View File
@@ -1,29 +1,49 @@
module github.com/sot-tech/mochi
go 1.16
go 1.18
require (
github.com/SermoDigital/jose v0.9.2-0.20180104203859-803625baeddc
github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a // indirect
github.com/alicebob/miniredis v2.5.0+incompatible
github.com/anacrolix/dht/v2 v2.16.1 // indirect
github.com/anacrolix/log v0.13.1 // indirect
github.com/anacrolix/missinggo/v2 v2.5.3 // indirect
github.com/anacrolix/torrent v1.41.0
github.com/go-redsync/redsync v1.4.2
github.com/gomodule/redigo v2.0.0+incompatible
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/anacrolix/torrent v1.42.0
github.com/go-redsync/redsync/v4 v4.5.0
github.com/gomodule/redigo v1.8.8
github.com/julienschmidt/httprouter v1.3.0
github.com/klauspost/cpuid/v2 v2.0.11 // indirect
github.com/mendsley/gojwk v0.0.0-20141217222730-4d5ec6e58103
github.com/minio/sha256-simd v1.0.0
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.12.1
github.com/sirupsen/logrus v1.8.1
github.com/spf13/cobra v1.3.0
github.com/stretchr/testify v1.7.0
github.com/yuin/gopher-lua v0.0.0-20210529063254-f4c35e4016d9 // indirect
golang.org/x/sys v0.0.0-20220307203707-22a9840ba4d7 // indirect
gopkg.in/yaml.v2 v2.4.0
github.com/spf13/cobra v1.4.0
github.com/stretchr/testify v1.7.1
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b
)
require (
github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a // indirect
github.com/anacrolix/dht/v2 v2.17.0 // indirect
github.com/anacrolix/log v0.13.1 // indirect
github.com/anacrolix/missinggo v1.3.0 // indirect
github.com/anacrolix/missinggo/v2 v2.6.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bradfitz/iter v0.0.0-20191230175014-e8f45d346db8 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/fsnotify/fsnotify v1.5.1 // indirect
github.com/go-redis/redis/v8 v8.11.5 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/huandu/xstrings v1.3.2 // indirect
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/klauspost/cpuid/v2 v2.0.12 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.33.0 // indirect
github.com/prometheus/procfs v0.7.3 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/yuin/gopher-lua v0.0.0-20210529063254-f4c35e4016d9 // indirect
golang.org/x/sys v0.0.0-20220412071739-889880a91fd5 // indirect
google.golang.org/protobuf v1.28.0 // indirect
)
+41 -1002
View File
File diff suppressed because it is too large Load Diff
+1 -1
View File
@@ -6,7 +6,7 @@ import (
)
func TestClientID(t *testing.T) {
var clientTable = []struct{ peerID, clientID string }{
clientTable := []struct{ peerID, clientID string }{
{"-AZ3034-6wfG2wk6wWLc", "AZ3034"},
{"-AZ3042-6ozMq5q6Q3NX", "AZ3042"},
{"-BS5820-oy4La2MWGEFj", "BS5820"},
+3 -3
View File
@@ -9,7 +9,7 @@ import (
"github.com/sot-tech/mochi/bittorrent"
"github.com/sot-tech/mochi/middleware"
"github.com/sot-tech/mochi/storage"
"gopkg.in/yaml.v2"
"gopkg.in/yaml.v3"
)
// Name is the name by which this middleware is registered with Conf.
@@ -27,7 +27,7 @@ func (d driver) NewHook(optionBytes []byte, _ storage.Storage) (middleware.Hook,
var cfg Config
err := yaml.Unmarshal(optionBytes, &cfg)
if err != nil {
return nil, fmt.Errorf("invalid options for middleware %s: %s", Name, err)
return nil, fmt.Errorf("invalid options for middleware %s: %w", Name, err)
}
return NewHook(cfg)
@@ -82,7 +82,7 @@ func NewHook(cfg Config) (middleware.Hook, error) {
return h, nil
}
func (h *hook) HandleAnnounce(ctx context.Context, req *bittorrent.AnnounceRequest, resp *bittorrent.AnnounceResponse) (context.Context, error) {
func (h *hook) HandleAnnounce(ctx context.Context, req *bittorrent.AnnounceRequest, _ *bittorrent.AnnounceResponse) (context.Context, error) {
clientID := NewClientID(req.Peer.ID)
if len(h.approved) > 0 {
+4 -3
View File
@@ -2,6 +2,7 @@ package middleware
import (
"context"
"errors"
"github.com/sot-tech/mochi/bittorrent"
"github.com/sot-tech/mochi/storage"
)
@@ -36,12 +37,12 @@ func (h *swarmInteractionHook) HandleAnnounce(ctx context.Context, req *bittorre
switch {
case req.Event == bittorrent.Stopped:
err = h.store.DeleteSeeder(req.InfoHash, req.Peer)
if err != nil && err != storage.ErrResourceDoesNotExist {
if err != nil && !errors.Is(err, storage.ErrResourceDoesNotExist) {
return ctx, err
}
err = h.store.DeleteLeecher(req.InfoHash, req.Peer)
if err != nil && err != storage.ErrResourceDoesNotExist {
if err != nil && !errors.Is(err, storage.ErrResourceDoesNotExist) {
return ctx, err
}
case req.Event == bittorrent.Completed:
@@ -105,7 +106,7 @@ func (h *responseHook) HandleAnnounce(ctx context.Context, req *bittorrent.Annou
func (h *responseHook) appendPeers(req *bittorrent.AnnounceRequest, resp *bittorrent.AnnounceResponse) error {
seeding := req.Left == 0
peers, err := h.store.AnnouncePeers(req.InfoHash, seeding, int(req.NumWant), req.Peer)
if err != nil && err != storage.ErrResourceDoesNotExist {
if err != nil && !errors.Is(err, storage.ErrResourceDoesNotExist) {
return err
}
+4 -5
View File
@@ -22,7 +22,7 @@ import (
"github.com/sot-tech/mochi/pkg/log"
"github.com/sot-tech/mochi/pkg/stop"
"github.com/sot-tech/mochi/storage"
"gopkg.in/yaml.v2"
"gopkg.in/yaml.v3"
"net/http"
"strings"
"time"
@@ -43,7 +43,7 @@ func (d driver) NewHook(optionBytes []byte, _ storage.Storage) (middleware.Hook,
var cfg Config
err := yaml.Unmarshal(optionBytes, &cfg)
if err != nil {
return nil, fmt.Errorf("invalid options for middleware %s: %s", Name, err)
return nil, fmt.Errorf("invalid options for middleware %s: %w", Name, err)
}
return NewHook(cfg)
@@ -92,8 +92,7 @@ func NewHook(cfg Config) (middleware.Hook, error) {
}
log.Debug("performing initial fetch of JWKs")
err := h.updateKeys()
if err != nil {
if err := h.updateKeys(); err != nil {
return nil, errors.New("failed to fetch initial JWK Set: " + err.Error())
}
@@ -156,7 +155,7 @@ func (h *hook) Stop() stop.Result {
return c.Result()
}
func (h *hook) HandleAnnounce(ctx context.Context, req *bittorrent.AnnounceRequest, resp *bittorrent.AnnounceResponse) (context.Context, error) {
func (h *hook) HandleAnnounce(ctx context.Context, req *bittorrent.AnnounceRequest, _ *bittorrent.AnnounceResponse) (context.Context, error) {
if req.Params == nil {
return ctx, ErrMissingJWT
}
+3 -3
View File
@@ -5,7 +5,7 @@ package middleware
import (
"errors"
"github.com/sot-tech/mochi/storage"
"gopkg.in/yaml.v2"
"gopkg.in/yaml.v3"
"sync"
)
@@ -67,8 +67,8 @@ func New(name string, optionBytes []byte, storage storage.Storage) (Hook, error)
// Config is the generic configuration format used for all registered Hooks.
type Config struct {
Name string `yaml:"name"`
Options map[string]interface{} `yaml:"options"`
Name string `yaml:"name"`
Options map[string]any `yaml:"options"`
}
// HooksFromHookConfigs is a utility function for initializing Hooks in bulk.
+2 -4
View File
@@ -1,15 +1,13 @@
package random
import (
_ "github.com/sot-tech/mochi/pkg/rand_seed"
"github.com/stretchr/testify/require"
"math/rand"
"testing"
"time"
"github.com/stretchr/testify/require"
)
func TestIntn(t *testing.T) {
rand.Seed(time.Now().UnixNano())
s0, s1 := rand.Uint64(), rand.Uint64()
var k int
for i := 0; i < 10000; i++ {
@@ -15,7 +15,7 @@ import (
"github.com/sot-tech/mochi/pkg/log"
"github.com/sot-tech/mochi/pkg/stop"
"github.com/sot-tech/mochi/storage"
"gopkg.in/yaml.v2"
"gopkg.in/yaml.v3"
)
// Name of this container for registry
@@ -36,7 +36,7 @@ type Config struct {
func build(confBytes []byte, st storage.Storage) (container.Container, error) {
c := new(Config)
if err := yaml.Unmarshal(confBytes, c); err != nil {
return nil, fmt.Errorf("unable to deserialise configuration: %v", err)
return nil, fmt.Errorf("unable to deserialise configuration: %w", err)
}
var err error
d := &directory{
@@ -49,7 +49,7 @@ func build(confBytes []byte, st storage.Storage) (container.Container, error) {
}
var w *dirwatch.Instance
if w, err = dirwatch.New(c.Path); err != nil {
return nil, fmt.Errorf("unable to initialize directory watch: %v", err)
return nil, fmt.Errorf("unable to initialize directory watch: %w", err)
}
d.watcher = w
if len(d.StorageCtx) == 0 {
@@ -8,7 +8,7 @@ import (
"github.com/sot-tech/mochi/middleware/torrentapproval/container"
"github.com/sot-tech/mochi/pkg/log"
"github.com/sot-tech/mochi/storage"
"gopkg.in/yaml.v2"
"gopkg.in/yaml.v3"
)
// Name of this container for registry.
@@ -35,7 +35,7 @@ const DUMMY = "_"
func build(confBytes []byte, st storage.Storage) (container.Container, error) {
c := new(Config)
if err := yaml.Unmarshal(confBytes, c); err != nil {
return nil, fmt.Errorf("unable to deserialise configuration: %v", err)
return nil, fmt.Errorf("unable to deserialise configuration: %w", err)
}
l := &List{
Invert: c.Invert,
@@ -14,7 +14,7 @@ import (
_ "github.com/sot-tech/mochi/middleware/torrentapproval/container/list"
"github.com/sot-tech/mochi/pkg/stop"
"github.com/sot-tech/mochi/storage"
"gopkg.in/yaml.v2"
"gopkg.in/yaml.v3"
)
// Name is the name by which this middleware is registered with Conf.
@@ -28,7 +28,7 @@ type baseConfig struct {
// Source - name of container for initial values
Source string `yaml:"initial_source"`
// Configuration depends on used container
Configuration map[string]interface{} `yaml:"configuration"`
Configuration map[string]any `yaml:"configuration"`
}
type driver struct{}
@@ -37,7 +37,7 @@ func (d driver) NewHook(optionBytes []byte, storage storage.Storage) (middleware
var cfg baseConfig
err := yaml.Unmarshal(optionBytes, &cfg)
if err != nil {
return nil, fmt.Errorf("invalid options for middleware %s: %s", Name, err)
return nil, fmt.Errorf("invalid options for middleware %s: %w", Name, err)
}
if len(cfg.Source) == 0 {
@@ -6,7 +6,7 @@ import (
"github.com/sot-tech/mochi/bittorrent"
"github.com/sot-tech/mochi/storage/memory"
"github.com/stretchr/testify/require"
"gopkg.in/yaml.v2"
"gopkg.in/yaml.v3"
"testing"
)
@@ -19,7 +19,7 @@ var cases = []struct {
{
baseConfig{
Source: "list",
Configuration: map[string]interface{}{
Configuration: map[string]any{
"hash_list": []string{"3532cf2d327fad8448c075b4cb42c8136964a435"},
},
},
@@ -30,7 +30,7 @@ var cases = []struct {
{
baseConfig{
Source: "list",
Configuration: map[string]interface{}{
Configuration: map[string]any{
"hash_list": []string{"3532cf2d327fad8448c075b4cb42c8136964a435"},
},
},
@@ -41,7 +41,7 @@ var cases = []struct {
{
baseConfig{
Source: "list",
Configuration: map[string]interface{}{
Configuration: map[string]any{
"hash_list": []string{"3532cf2d327fad8448c075b4cb42c8136964a435"},
"invert": true,
},
@@ -53,7 +53,7 @@ var cases = []struct {
{
baseConfig{
Source: "list",
Configuration: map[string]interface{}{
Configuration: map[string]any{
"hash_list": []string{"3532cf2d327fad8448c075b4cb42c8136964a435"},
"invert": true,
},
+9 -14
View File
@@ -8,7 +8,7 @@ import (
"github.com/sot-tech/mochi/middleware"
"github.com/sot-tech/mochi/middleware/pkg/random"
"github.com/sot-tech/mochi/storage"
"gopkg.in/yaml.v2"
"gopkg.in/yaml.v3"
"sync"
"time"
)
@@ -28,7 +28,7 @@ func (d driver) NewHook(optionBytes []byte, _ storage.Storage) (middleware.Hook,
var cfg Config
err := yaml.Unmarshal(optionBytes, &cfg)
if err != nil {
return nil, fmt.Errorf("invalid options for middleware %s: %s", Name, err)
return nil, fmt.Errorf("invalid options for middleware %s: %w", Name, err)
}
return NewHook(cfg)
@@ -75,16 +75,13 @@ type hook struct {
// NewHook creates a middleware to randomly modify the announce interval from
// the given config.
func NewHook(cfg Config) (middleware.Hook, error) {
err := checkConfig(cfg)
if err != nil {
return nil, err
func NewHook(cfg Config) (h middleware.Hook, err error) {
if err = checkConfig(cfg); err == nil {
h = &hook{
cfg: cfg,
}
}
h := &hook{
cfg: cfg,
}
return h, nil
return
}
func (h *hook) HandleAnnounce(ctx context.Context, req *bittorrent.AnnounceRequest, resp *bittorrent.AnnounceResponse) (context.Context, error) {
@@ -102,14 +99,12 @@ func (h *hook) HandleAnnounce(ctx context.Context, req *bittorrent.AnnounceReque
if h.cfg.ModifyMinInterval {
resp.MinInterval += add
}
return ctx, nil
}
return ctx, nil
}
func (h *hook) HandleScrape(ctx context.Context, req *bittorrent.ScrapeRequest, resp *bittorrent.ScrapeResponse) (context.Context, error) {
func (h *hook) HandleScrape(ctx context.Context, _ *bittorrent.ScrapeRequest, _ *bittorrent.ScrapeResponse) (context.Context, error) {
// Scrapes are not altered.
return ctx, nil
}
+6 -6
View File
@@ -31,7 +31,7 @@ func SetOutput(to io.Writer) {
}
// Fields is a map of logging fields.
type Fields map[string]interface{}
type Fields map[string]any
// LogFields implements Fielder for Fields.
func (f Fields) LogFields() Fields {
@@ -87,7 +87,7 @@ func mergeFielders(fielders ...Fielder) logrus.Fields {
}
// Debug logs at the debug level if debug logging is enabled.
func Debug(v interface{}, fielders ...Fielder) {
func Debug(v any, fielders ...Fielder) {
if debug {
if len(fielders) != 0 {
l.WithFields(mergeFielders(fielders...)).Debug(v)
@@ -98,7 +98,7 @@ func Debug(v interface{}, fielders ...Fielder) {
}
// Info logs at the info level.
func Info(v interface{}, fielders ...Fielder) {
func Info(v any, fielders ...Fielder) {
if len(fielders) != 0 {
l.WithFields(mergeFielders(fielders...)).Info(v)
} else {
@@ -107,7 +107,7 @@ func Info(v interface{}, fielders ...Fielder) {
}
// Warn logs at the warning level.
func Warn(v interface{}, fielders ...Fielder) {
func Warn(v any, fielders ...Fielder) {
if len(fielders) != 0 {
l.WithFields(mergeFielders(fielders...)).Warn(v)
} else {
@@ -116,7 +116,7 @@ func Warn(v interface{}, fielders ...Fielder) {
}
// Error logs at the error level.
func Error(v interface{}, fielders ...Fielder) {
func Error(v any, fielders ...Fielder) {
if len(fielders) != 0 {
l.WithFields(mergeFielders(fielders...)).Error(v)
} else {
@@ -125,7 +125,7 @@ func Error(v interface{}, fielders ...Fielder) {
}
// Fatal logs at the fatal level and exits with a status code != 0.
func Fatal(v interface{}, fielders ...Fielder) {
func Fatal(v any, fielders ...Fielder) {
if len(fielders) != 0 {
l.WithFields(mergeFielders(fielders...)).Fatal(v)
} else {
+2 -1
View File
@@ -4,6 +4,7 @@ package metrics
import (
"context"
"errors"
"net/http"
"net/http/pprof"
@@ -49,7 +50,7 @@ func NewServer(addr string) *Server {
}
go func() {
if err := s.srv.ListenAndServe(); err != http.ErrServerClosed {
if err := s.srv.ListenAndServe(); !errors.Is(err, http.ErrServerClosed) {
log.Fatal("failed while serving prometheus", log.Err(err))
}
}()
+26
View File
@@ -0,0 +1,26 @@
// Package rand_seed just seeds (math) rand.Rand
package rand_seed
import (
cr "crypto/rand"
"math/rand"
"time"
)
func init() {
//Seeding global math random
rand.Seed(GenSeed())
}
// GenSeed returns 64bit seed from crypto/rand source or
// from current time, if crypto random error occurred
func GenSeed() (seed int64) {
r := make([]byte, 0, 8)
if _, err := cr.Read(r); err == nil {
seed = time.Now().UnixNano()
} else {
seed = int64(r[0])<<56 | int64(r[1])<<48 | int64(r[2])<<40 | int64(r[3])<<32 |
int64(r[4])<<24 | int64(r[5])<<16 | int64(r[6])<<8 | int64(r[7])
}
return
}
+10 -9
View File
@@ -5,12 +5,13 @@ package memory
import (
"encoding/binary"
"fmt"
"math"
"reflect"
"runtime"
"sync"
"time"
"gopkg.in/yaml.v2"
"gopkg.in/yaml.v3"
"github.com/sot-tech/mochi/bittorrent"
"github.com/sot-tech/mochi/pkg/log"
@@ -37,7 +38,7 @@ func init() {
type driver struct{}
func (d driver) NewStorage(icfg interface{}) (storage.Storage, error) {
func (d driver) NewStorage(icfg any) (storage.Storage, error) {
// Marshal the config back into bytes.
bytes, err := yaml.Marshal(icfg)
if err != nil {
@@ -80,7 +81,7 @@ func (cfg Config) LogFields() log.Fields {
func (cfg Config) Validate() Config {
validcfg := cfg
if cfg.ShardCount <= 0 {
if cfg.ShardCount <= 0 || cfg.ShardCount > (math.MaxInt/2) {
validcfg.ShardCount = defaultShardCount
log.Warn("falling back to default configuration", log.Fields{
"name": Name + ".ShardCount",
@@ -488,7 +489,7 @@ func (ps *store) ScrapeSwarm(ih bittorrent.InfoHash, addressFamily bittorrent.Ad
return
}
func asKey(in interface{}) interface{} {
func asKey(in any) any {
if in == nil {
panic("unable to use nil map key")
}
@@ -499,12 +500,12 @@ func asKey(in interface{}) interface{} {
return fmt.Sprint(in)
}
func (ps *store) Put(ctx string, key, value interface{}) {
func (ps *store) Put(ctx string, key, value any) {
m, _ := ps.contexts.LoadOrStore(ctx, new(sync.Map))
m.(*sync.Map).Store(asKey(key), value)
}
func (ps *store) Contains(ctx string, key interface{}) bool {
func (ps *store) Contains(ctx string, key any) bool {
var exist bool
if m, found := ps.contexts.Load(ctx); found {
_, exist = m.(*sync.Map).Load(asKey(key))
@@ -522,15 +523,15 @@ func (ps *store) BulkPut(ctx string, pairs ...storage.Pair) {
}
}
func (ps *store) Load(ctx string, key interface{}) interface{} {
var v interface{}
func (ps *store) Load(ctx string, key any) any {
var v any
if m, found := ps.contexts.Load(ctx); found {
v, _ = m.(*sync.Map).Load(asKey(key))
}
return v
}
func (ps *store) Delete(ctx string, keys ...interface{}) {
func (ps *store) Delete(ctx string, keys ...any) {
if len(keys) > 0 {
if m, found := ps.contexts.Load(ctx); found {
m := m.(*sync.Map)
+1 -1
View File
@@ -8,7 +8,7 @@ import (
// Pair - some key-value pair, used for BulkPut
type Pair struct {
Left, Right interface{}
Left, Right any
}
// SerializedPeer concatenation of PeerID, net port and IP-address
+22 -25
View File
@@ -1,24 +1,26 @@
package redis
import (
"context"
"errors"
"github.com/go-redsync/redsync/v4"
"github.com/go-redsync/redsync/v4/redis/redigo"
redigolib "github.com/gomodule/redigo/redis"
"net/url"
"strconv"
"strings"
"time"
"github.com/go-redsync/redsync"
"github.com/gomodule/redigo/redis"
)
// redisBackend represents a redis handler.
type redisBackend struct {
pool *redis.Pool
pool *redigolib.Pool
redsync *redsync.Redsync
}
// newRedisBackend creates a redisBackend instance.
func newRedisBackend(cfg *Config, u *redisURL, socketPath string) *redisBackend {
context.Background()
rc := &redisConnector{
URL: u,
SocketPath: socketPath,
@@ -27,18 +29,13 @@ func newRedisBackend(cfg *Config, u *redisURL, socketPath string) *redisBackend
ConnectTimeout: cfg.RedisConnectTimeout,
}
pool := rc.NewPool()
rs := redsync.New([]redsync.Pool{pool})
rs := redsync.New(redigo.NewPool(pool))
return &redisBackend{
pool: pool,
redsync: rs,
}
}
// open returns or creates instance of Redis connection.
func (rb *redisBackend) open() redis.Conn {
return rb.pool.Get()
}
type redisConnector struct {
URL *redisURL
SocketPath string
@@ -48,11 +45,11 @@ type redisConnector struct {
}
// NewPool returns a new pool of Redis connections
func (rc *redisConnector) NewPool() *redis.Pool {
return &redis.Pool{
func (rc *redisConnector) NewPool() *redigolib.Pool {
return &redigolib.Pool{
MaxIdle: 3,
IdleTimeout: 240 * time.Second,
Dial: func() (redis.Conn, error) {
Dial: func() (redigolib.Conn, error) {
c, err := rc.open()
if err != nil {
return nil, err
@@ -68,8 +65,8 @@ func (rc *redisConnector) NewPool() *redis.Pool {
return c, err
},
// PINGs connections that have been idle more than 10 seconds
TestOnBorrow: func(c redis.Conn, t time.Time) error {
if time.Since(t) < time.Duration(10*time.Second) {
TestOnBorrow: func(c redigolib.Conn, t time.Time) error {
if time.Since(t) < 10*time.Second {
return nil
}
_, err := c.Do("PING")
@@ -79,23 +76,23 @@ func (rc *redisConnector) NewPool() *redis.Pool {
}
// Open a new Redis connection
func (rc *redisConnector) open() (redis.Conn, error) {
var opts = []redis.DialOption{
redis.DialDatabase(rc.URL.DB),
redis.DialReadTimeout(rc.ReadTimeout),
redis.DialWriteTimeout(rc.WriteTimeout),
redis.DialConnectTimeout(rc.ConnectTimeout),
func (rc *redisConnector) open() (redigolib.Conn, error) {
opts := []redigolib.DialOption{
redigolib.DialDatabase(rc.URL.DB),
redigolib.DialReadTimeout(rc.ReadTimeout),
redigolib.DialWriteTimeout(rc.WriteTimeout),
redigolib.DialConnectTimeout(rc.ConnectTimeout),
}
if rc.URL.Password != "" {
opts = append(opts, redis.DialPassword(rc.URL.Password))
opts = append(opts, redigolib.DialPassword(rc.URL.Password))
}
if rc.SocketPath != "" {
return redis.Dial("unix", rc.SocketPath, opts...)
return redigolib.Dial("unix", rc.SocketPath, opts...)
}
return redis.Dial("tcp", rc.URL.Host, opts...)
return redigolib.Dial("tcp", rc.URL.Host, opts...)
}
// A redisURL represents a parsed redisURL
@@ -119,7 +116,7 @@ func parseRedisURL(target string) (*redisURL, error) {
return nil, errors.New("no redis scheme found")
}
db := 0 //default redis db
db := 0 // default redis db
parts := strings.Split(u.Path, "/")
if len(parts) != 1 {
db, err = strconv.Atoi(parts[1])
+18 -17
View File
@@ -24,13 +24,14 @@
package redis
import (
"errors"
"strconv"
"strings"
"sync"
"time"
"github.com/gomodule/redigo/redis"
"gopkg.in/yaml.v2"
"gopkg.in/yaml.v3"
"github.com/sot-tech/mochi/bittorrent"
"github.com/sot-tech/mochi/pkg/log"
@@ -60,7 +61,7 @@ func init() {
type driver struct{}
func (d driver) NewStorage(icfg interface{}) (storage.Storage, error) {
func (d driver) NewStorage(icfg any) (storage.Storage, error) {
// Marshal the config back into bytes.
bytes, err := yaml.Marshal(icfg)
if err != nil {
@@ -271,7 +272,7 @@ func (ps *store) getConnection() redis.Conn {
panic("attempted to interact with stopped redis store")
default:
}
return ps.rb.open()
return ps.rb.pool.Get()
}
func closeConnection(con redis.Conn) {
@@ -291,7 +292,7 @@ func (ps *store) populateProm() {
defer closeConnection(conn)
for _, group := range ps.groups() {
if n, err := redis.Int64(conn.Do("GET", ps.infohashCountKey(group))); err != nil && err != redis.ErrNil {
if n, err := redis.Int64(conn.Do("GET", ps.infohashCountKey(group))); err != nil && !errors.Is(err, redis.ErrNil) {
log.Error("storage: GET counter failure", log.Fields{
"key": ps.infohashCountKey(group),
"error": err,
@@ -299,7 +300,7 @@ func (ps *store) populateProm() {
} else {
numInfohashes += n
}
if n, err := redis.Int64(conn.Do("GET", ps.seederCountKey(group))); err != nil && err != redis.ErrNil {
if n, err := redis.Int64(conn.Do("GET", ps.seederCountKey(group))); err != nil && !errors.Is(err, redis.ErrNil) {
log.Error("storage: GET counter failure", log.Fields{
"key": ps.seederCountKey(group),
"error": err,
@@ -307,7 +308,7 @@ func (ps *store) populateProm() {
} else {
numSeeders += n
}
if n, err := redis.Int64(conn.Do("GET", ps.leecherCountKey(group))); err != nil && err != redis.ErrNil {
if n, err := redis.Int64(conn.Do("GET", ps.leecherCountKey(group))); err != nil && !errors.Is(err, redis.ErrNil) {
log.Error("storage: GET counter failure", log.Fields{
"key": ps.leecherCountKey(group),
"error": err,
@@ -448,7 +449,7 @@ func (ps *store) DeleteLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error
}
_, err = conn.Do("DECR", ps.leecherCountKey(addressFamily))
return nil
return err
}
func (ps *store) GraduateLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error {
@@ -517,13 +518,13 @@ func (ps *store) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant int,
if err != nil {
return nil, err
}
conLeechers := leechers.([]interface{})
conLeechers := leechers.([]any)
seeders, err := conn.Do("HKEYS", encodedSeederInfoHash)
if err != nil {
return nil, err
}
conSeeders := seeders.([]interface{})
conSeeders := seeders.([]any)
if len(conLeechers) == 0 && len(conSeeders) == 0 {
return nil, storage.ErrResourceDoesNotExist
@@ -606,7 +607,7 @@ func (ps *store) ScrapeSwarm(ih bittorrent.InfoHash, af bittorrent.AddressFamily
return
}
func (ps *store) Put(ctx string, key, value interface{}) {
func (ps *store) Put(ctx string, key, value any) {
conn := ps.getConnection()
defer closeConnection(conn)
_, err := conn.Do("HSET", ctx, key, value)
@@ -615,7 +616,7 @@ func (ps *store) Put(ctx string, key, value interface{}) {
}
}
func (ps *store) Contains(ctx string, key interface{}) bool {
func (ps *store) Contains(ctx string, key any) bool {
conn := ps.getConnection()
defer closeConnection(conn)
exist, err := redis.Bool(conn.Do("HEXISTS", ctx, key))
@@ -636,7 +637,7 @@ func (ps *store) BulkPut(ctx string, pairs ...storage.Pair) {
default:
conn := ps.getConnection()
defer closeConnection(conn)
args := make([]interface{}, 1, l*2+1)
args := make([]any, 1, l*2+1)
args[0] = ctx
for _, p := range pairs {
args = append(args, p.Left, p.Right)
@@ -657,7 +658,7 @@ func (ps *store) BulkPut(ctx string, pairs ...storage.Pair) {
}
}
func (ps *store) Load(ctx string, key interface{}) interface{} {
func (ps *store) Load(ctx string, key any) any {
conn := ps.getConnection()
defer closeConnection(conn)
v, err := conn.Do("HGET", ctx, key)
@@ -667,7 +668,7 @@ func (ps *store) Load(ctx string, key interface{}) interface{} {
return v
}
func (ps *store) Delete(ctx string, keys ...interface{}) {
func (ps *store) Delete(ctx string, keys ...any) {
switch l := len(keys); l {
case 0:
break
@@ -681,7 +682,7 @@ func (ps *store) Delete(ctx string, keys ...interface{}) {
default:
conn := ps.getConnection()
defer closeConnection(conn)
args := make([]interface{}, 1, l+1)
args := make([]any, 1, l+1)
args[0] = ctx
args = append(args, keys...)
_, err := conn.Do("HDEL", args...)
@@ -823,7 +824,7 @@ func (ps *store) collectGarbage(cutoff time.Time) error {
_ = conn.Send("DECR", ps.infohashCountKey(group))
}
_, err = redis.Values(conn.Do("EXEC"))
if err != nil && err != redis.ErrNil {
if err != nil && !errors.Is(err, redis.ErrNil) {
log.Error("storage: Redis EXEC failure", log.Fields{
"group": group,
"infohash": ihStr,
@@ -831,7 +832,7 @@ func (ps *store) collectGarbage(cutoff time.Time) error {
})
}
} else {
if _, err = conn.Do("UNWATCH"); err != nil && err != redis.ErrNil {
if _, err = conn.Do("UNWATCH"); err != nil && !errors.Is(err, redis.ErrNil) {
log.Error("storage: Redis UNWATCH failure", log.Fields{"error": err})
}
}
+8 -9
View File
@@ -16,7 +16,7 @@ var (
// Driver is the interface used to initialize a new type of Storage.
type Driver interface {
NewStorage(cfg interface{}) (Storage, error)
NewStorage(cfg any) (Storage, error)
}
// ErrResourceDoesNotExist is the error returned by all delete methods and the
@@ -107,27 +107,26 @@ type Storage interface {
// Put used to place arbitrary k-v data with specified context
// into storage. ctx parameter used to group data
// (i.e. data only for specific middleware module)
Put(ctx string, key, value interface{})
Put(ctx string, key, value any)
// BulkPut used to place array of k-v data in specified context.
// Useful when several data entries should be added in single transaction/connection
BulkPut(ctx string, pairs ...Pair)
// Contains checks if any data in specified context exist
Contains(ctx string, key interface{}) bool
Contains(ctx string, key any) bool
// Load used to get arbitrary data in specified context by its key
Load(ctx string, key interface{}) interface{}
Load(ctx string, key any) any
// Delete used to delete arbitrary data in specified context by its keys
Delete(ctx string, keys ...interface{})
Delete(ctx string, keys ...any)
// stop.Stopper is an interface that expects a Stop method to stop the
// Storage.
// Stopper is an interface that expects a Stop method to stop the Storage.
// For more details see the documentation in the stop package.
stop.Stopper
// log.Fielder returns a loggable version of the data used to configure and
// Fielder returns a loggable version of the data used to configure and
// operate a particular Storage.
log.Fielder
}
@@ -158,7 +157,7 @@ func RegisterDriver(name string, d Driver) {
// the list of registered Drivers.
//
// If a driver does not exist, returns ErrDriverDoesNotExist.
func NewStorage(name string, cfg interface{}) (ps Storage, err error) {
func NewStorage(name string, cfg any) (ps Storage, err error) {
driversM.RLock()
defer driversM.RUnlock()
+8 -4
View File
@@ -2,6 +2,7 @@ package test
import (
"github.com/sot-tech/mochi/bittorrent"
"github.com/sot-tech/mochi/pkg/rand_seed"
"github.com/sot-tech/mochi/storage"
"math/rand"
"net"
@@ -26,7 +27,7 @@ func generateInfohashes() (a [1000]bittorrent.InfoHash) {
}
func generatePeers() (a [1000]bittorrent.Peer) {
r := rand.New(rand.NewSource(0))
r := rand.New(rand.NewSource(rand_seed.GenSeed()))
for i := range a {
ip := make([]byte, 4)
n, err := r.Read(ip)
@@ -49,9 +50,11 @@ func generatePeers() (a [1000]bittorrent.Peer) {
return
}
type benchExecFunc func(int, storage.Storage, *benchData) error
type benchSetupFunc func(storage.Storage, *benchData) error
type benchStorageConstructor func() storage.Storage
type (
benchExecFunc func(int, storage.Storage, *benchData) error
benchSetupFunc func(storage.Storage, *benchData) error
benchStorageConstructor func() storage.Storage
)
type benchHolder struct {
st benchStorageConstructor
@@ -187,6 +190,7 @@ func (bh *benchHolder) PutDelete1kInfohash(b *testing.B) {
bh.runBenchmark(b, false, nil, func(i int, ps storage.Storage, bd *benchData) error {
err := ps.PutSeeder(bd.infohashes[i%1000], bd.peers[0])
if err != nil {
return err
}
return ps.DeleteSeeder(bd.infohashes[i%1000], bd.peers[0])
})
+1 -1
View File
@@ -198,7 +198,7 @@ func (th *testHolder) CustomPutContainsLoadDelete(t *testing.T) {
func (th *testHolder) CustomBulkPutContainsLoadDelete(t *testing.T) {
pairs := make([]storage.Pair, 0, len(testData))
keys := make([]interface{}, 0, len(testData))
keys := make([]any, 0, len(testData))
for _, c := range testData {
key := c.peer.String()
keys = append(keys, key)