cleanup: pre-phase-2 fixes
- fix two stale nzbstr comment refs - migration 002: drop api_keys.visibility and curation_set - remove Visibility from APIKey struct, GetAPIKey, CreateAPIKey, CLI - omit torznab size/seeders/peers attrs when data is absent - reset relay backoff on successful connection (>= 30s uptime) - use last_event from relays table as since on reconnect - fix TestSearchWithResults to actually query the test server DB
This commit is contained in:
@@ -26,7 +26,7 @@ import (
|
||||
func main() {
|
||||
if len(os.Args) < 3 {
|
||||
fmt.Fprintln(os.Stderr, "usage: kindexr-cli <command> <subcommand> [flags]")
|
||||
fmt.Fprintln(os.Stderr, " kindexr-cli apikey create --label <name> [--visibility all|wot|curated]")
|
||||
fmt.Fprintln(os.Stderr, " kindexr-cli apikey create --label <name>")
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
@@ -57,7 +57,6 @@ func runAPIKeyCreate(args []string) {
|
||||
fs := flag.NewFlagSet("apikey create", flag.ExitOnError)
|
||||
configPath := fs.String("config", "/etc/kindexr/config.yaml", "path to config file")
|
||||
label := fs.String("label", "", "key label (e.g. sonarr, radarr)")
|
||||
visibility := fs.String("visibility", "all", "visibility: all|wot|curated")
|
||||
if err := fs.Parse(args); err != nil {
|
||||
os.Exit(1)
|
||||
}
|
||||
@@ -87,7 +86,7 @@ func runAPIKeyCreate(args []string) {
|
||||
}
|
||||
key := hex.EncodeToString(keyBytes)
|
||||
|
||||
if err := database.CreateAPIKey(context.Background(), key, *label, *visibility); err != nil {
|
||||
if err := database.CreateAPIKey(context.Background(), key, *label); err != nil {
|
||||
fmt.Fprintf(os.Stderr, "error creating api key: %v\n", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
@@ -171,7 +171,7 @@ func Defaults() *Config {
|
||||
}
|
||||
|
||||
// Load reads config from defaults, then optionally from a YAML file, then from
|
||||
// environment variables with the NZBSTR_ prefix. If path is empty or the file
|
||||
// environment variables with the KINDEXR_ prefix. If path is empty or the file
|
||||
// does not exist, the file step is silently skipped.
|
||||
func Load(path string) (*Config, error) {
|
||||
k := koanf.New(".")
|
||||
|
||||
+1
-1
@@ -18,7 +18,7 @@ import (
|
||||
//go:embed migrations/*.sql
|
||||
var migrationsFS embed.FS
|
||||
|
||||
// DB wraps *sql.DB with nzbstr-specific helpers.
|
||||
// DB wraps *sql.DB with kindexr-specific helpers.
|
||||
type DB struct {
|
||||
*sql.DB
|
||||
}
|
||||
|
||||
@@ -0,0 +1,14 @@
|
||||
-- SQLite does not support DROP COLUMN before 3.35.0; use table rebuild.
|
||||
CREATE TABLE api_keys_new (
|
||||
key TEXT PRIMARY KEY,
|
||||
label TEXT NOT NULL,
|
||||
created_at INTEGER NOT NULL,
|
||||
last_used INTEGER
|
||||
);
|
||||
|
||||
INSERT INTO api_keys_new (key, label, created_at, last_used)
|
||||
SELECT key, label, created_at, last_used FROM api_keys;
|
||||
|
||||
DROP TABLE api_keys;
|
||||
|
||||
ALTER TABLE api_keys_new RENAME TO api_keys;
|
||||
+24
-9
@@ -230,18 +230,17 @@ func (d *DB) GetTrackers(ctx context.Context, eventID string) ([]string, error)
|
||||
|
||||
// APIKey holds a validated API key record.
|
||||
type APIKey struct {
|
||||
Key string
|
||||
Label string
|
||||
CreatedAt int64
|
||||
Visibility string
|
||||
Key string
|
||||
Label string
|
||||
CreatedAt int64
|
||||
}
|
||||
|
||||
// GetAPIKey returns the key record for the given raw key string, or nil if not found.
|
||||
func (d *DB) GetAPIKey(ctx context.Context, key string) (*APIKey, error) {
|
||||
var ak APIKey
|
||||
err := d.DB.QueryRowContext(ctx,
|
||||
`SELECT key, label, created_at, visibility FROM api_keys WHERE key = ?`, key,
|
||||
).Scan(&ak.Key, &ak.Label, &ak.CreatedAt, &ak.Visibility)
|
||||
`SELECT key, label, created_at FROM api_keys WHERE key = ?`, key,
|
||||
).Scan(&ak.Key, &ak.Label, &ak.CreatedAt)
|
||||
if err == sql.ErrNoRows {
|
||||
return nil, nil
|
||||
}
|
||||
@@ -254,10 +253,10 @@ func (d *DB) GetAPIKey(ctx context.Context, key string) (*APIKey, error) {
|
||||
}
|
||||
|
||||
// CreateAPIKey inserts a new API key record.
|
||||
func (d *DB) CreateAPIKey(ctx context.Context, key, label, visibility string) error {
|
||||
func (d *DB) CreateAPIKey(ctx context.Context, key, label string) error {
|
||||
_, err := d.DB.ExecContext(ctx,
|
||||
`INSERT INTO api_keys (key, label, created_at, visibility) VALUES (?, ?, ?, ?)`,
|
||||
key, label, time.Now().Unix(), visibility,
|
||||
`INSERT INTO api_keys (key, label, created_at) VALUES (?, ?, ?)`,
|
||||
key, label, time.Now().Unix(),
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("create api key: %w", err)
|
||||
@@ -265,6 +264,22 @@ func (d *DB) CreateAPIKey(ctx context.Context, key, label, visibility string) er
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetRelayLastEvent returns the created_at of the most recent event seen from
|
||||
// the given relay, or 0 if none has been recorded.
|
||||
func (d *DB) GetRelayLastEvent(ctx context.Context, url string) (int64, error) {
|
||||
var ts sql.NullInt64
|
||||
err := d.DB.QueryRowContext(ctx,
|
||||
`SELECT last_event FROM relays WHERE url = ?`, url,
|
||||
).Scan(&ts)
|
||||
if err == sql.ErrNoRows {
|
||||
return 0, nil
|
||||
}
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("get relay last event: %w", err)
|
||||
}
|
||||
return ts.Int64, nil
|
||||
}
|
||||
|
||||
// UpsertRelay ensures a relay URL exists in the relays table.
|
||||
func (d *DB) UpsertRelay(ctx context.Context, url string) error {
|
||||
_, err := d.DB.ExecContext(ctx,
|
||||
|
||||
@@ -40,19 +40,30 @@ func (rd *Reader) Start(ctx context.Context) {
|
||||
}
|
||||
|
||||
// connectLoop keeps re-connecting to a single relay with exponential backoff.
|
||||
// Backoff resets to the base delay if the connection stayed up long enough to
|
||||
// be considered successful (30 seconds), so a briefly-dropped live relay
|
||||
// reconnects quickly rather than waiting minutes.
|
||||
func (rd *Reader) connectLoop(ctx context.Context, relayURL string) {
|
||||
backoff := 5 * time.Second
|
||||
const baseBackoff = 5 * time.Second
|
||||
const maxBackoff = 5 * time.Minute
|
||||
const successThreshold = 30 * time.Second
|
||||
|
||||
backoff := baseBackoff
|
||||
|
||||
for {
|
||||
if ctx.Err() != nil {
|
||||
return
|
||||
}
|
||||
|
||||
start := time.Now()
|
||||
if err := rd.runRelay(ctx, relayURL); err != nil {
|
||||
slog.Warn("relay disconnected", "url", relayURL, "err", err, "retry_in", backoff)
|
||||
}
|
||||
|
||||
if time.Since(start) >= successThreshold {
|
||||
backoff = baseBackoff
|
||||
}
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
@@ -79,9 +90,12 @@ func (rd *Reader) runRelay(ctx context.Context, relayURL string) error {
|
||||
_ = rd.db.UpdateRelaySync(ctx, relayURL, time.Now().Unix())
|
||||
slog.Info("relay connected", "url", relayURL)
|
||||
|
||||
since := nostrlib.Timestamp(time.Now().Add(
|
||||
-time.Duration(rd.cfg.BackfillDays) * 24 * time.Hour,
|
||||
).Unix())
|
||||
backfillSince := time.Now().Add(-time.Duration(rd.cfg.BackfillDays) * 24 * time.Hour).Unix()
|
||||
lastEvent, _ := rd.db.GetRelayLastEvent(ctx, relayURL)
|
||||
if lastEvent > backfillSince {
|
||||
backfillSince = lastEvent
|
||||
}
|
||||
since := nostrlib.Timestamp(backfillSince)
|
||||
|
||||
sub, err := relay.Subscribe(ctx, nostrlib.Filters{{
|
||||
Kinds: []int{nostrlib.KindTorrent},
|
||||
|
||||
@@ -67,19 +67,16 @@ func buildItem(baseURL string, row db.TorrentRow, trackers []string) RSSItem {
|
||||
|
||||
pubDate := time.Unix(row.CreatedAt, 0).UTC().Format(time.RFC1123Z)
|
||||
|
||||
var sizeVal int64
|
||||
if row.SizeBytes != nil {
|
||||
sizeVal = *row.SizeBytes
|
||||
}
|
||||
|
||||
attrs := []TorznabAttr{
|
||||
{Name: "infohash", Value: row.InfoHash},
|
||||
{Name: "magneturl", Value: magnet},
|
||||
{Name: "downloadvolumefactor", Value: "0"},
|
||||
{Name: "uploadvolumefactor", Value: "1"},
|
||||
{Name: "size", Value: fmt.Sprintf("%d", sizeVal)},
|
||||
{Name: "seeders", Value: "0"},
|
||||
{Name: "peers", Value: "0"},
|
||||
}
|
||||
var sizeBytes int64
|
||||
if row.SizeBytes != nil {
|
||||
sizeBytes = *row.SizeBytes
|
||||
attrs = append(attrs, TorznabAttr{Name: "size", Value: fmt.Sprintf("%d", sizeBytes)})
|
||||
}
|
||||
if row.NewznabCat != nil {
|
||||
parent := (*row.NewznabCat / 1000) * 1000
|
||||
@@ -109,10 +106,10 @@ func buildItem(baseURL string, row db.TorrentRow, trackers []string) RSSItem {
|
||||
GUID: RSSGUID{IsPermaLink: "false", Value: guid},
|
||||
Link: magnet,
|
||||
PubDate: pubDate,
|
||||
Size: sizeVal,
|
||||
Size: sizeBytes,
|
||||
Enclosure: RSSEnclosure{
|
||||
URL: magnet,
|
||||
Length: sizeVal,
|
||||
Length: sizeBytes,
|
||||
Type: "application/x-bittorrent",
|
||||
},
|
||||
Attrs: attrs,
|
||||
|
||||
@@ -17,7 +17,7 @@ import (
|
||||
"github.com/go-chi/chi/v5"
|
||||
)
|
||||
|
||||
func newTestServer(t *testing.T) (*httptest.Server, string) {
|
||||
func newTestServerWithDB(t *testing.T) (*httptest.Server, string, *db.DB) {
|
||||
t.Helper()
|
||||
|
||||
dir := t.TempDir()
|
||||
@@ -32,11 +32,10 @@ func newTestServer(t *testing.T) (*httptest.Server, string) {
|
||||
t.Fatalf("load config: %v", err)
|
||||
}
|
||||
|
||||
// Create an API key for testing.
|
||||
keyBytes := make([]byte, 32)
|
||||
rand.Read(keyBytes)
|
||||
apiKey := hex.EncodeToString(keyBytes)
|
||||
if err := database.CreateAPIKey(context.Background(), apiKey, "test", "all"); err != nil {
|
||||
if err := database.CreateAPIKey(context.Background(), apiKey, "test"); err != nil {
|
||||
t.Fatalf("create api key: %v", err)
|
||||
}
|
||||
|
||||
@@ -44,7 +43,12 @@ func newTestServer(t *testing.T) (*httptest.Server, string) {
|
||||
srv := torznab.New(cfg, database, "0.1.0-test")
|
||||
srv.Mount(r)
|
||||
|
||||
return httptest.NewServer(r), apiKey
|
||||
return httptest.NewServer(r), apiKey, database
|
||||
}
|
||||
|
||||
func newTestServer(t *testing.T) (*httptest.Server, string) {
|
||||
ts, apiKey, _ := newTestServerWithDB(t)
|
||||
return ts, apiKey
|
||||
}
|
||||
|
||||
func TestCapsEndpoint(t *testing.T) {
|
||||
@@ -115,17 +119,9 @@ func TestSearchEmpty(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestSearchWithResults(t *testing.T) {
|
||||
ts, apiKey := newTestServer(t)
|
||||
ts, apiKey, database := newTestServerWithDB(t)
|
||||
defer ts.Close()
|
||||
|
||||
// Insert a torrent directly via the DB.
|
||||
dir := t.TempDir()
|
||||
database, err := db.Open(filepath.Join(dir, "test2.db"))
|
||||
if err != nil {
|
||||
t.Fatalf("open db: %v", err)
|
||||
}
|
||||
defer database.Close()
|
||||
|
||||
cat := 5040
|
||||
rec := db.TorrentRecord{
|
||||
EventID: "cccc333300000000000000000000000000000000000000000000000000000000",
|
||||
@@ -142,8 +138,6 @@ func TestSearchWithResults(t *testing.T) {
|
||||
t.Fatalf("insert torrent: %v", err)
|
||||
}
|
||||
|
||||
// Search against the test server's DB (not the one we just inserted into).
|
||||
// Just verify the empty search returns valid RSS.
|
||||
resp, err := http.Get(ts.URL + "/api?t=search&q=breaking&apikey=" + apiKey)
|
||||
if err != nil {
|
||||
t.Fatalf("GET search: %v", err)
|
||||
@@ -153,6 +147,17 @@ func TestSearchWithResults(t *testing.T) {
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
t.Errorf("expected 200, got %d", resp.StatusCode)
|
||||
}
|
||||
|
||||
var rss torznab.RSS
|
||||
if err := xml.NewDecoder(resp.Body).Decode(&rss); err != nil {
|
||||
t.Fatalf("decode RSS: %v", err)
|
||||
}
|
||||
if len(rss.Channel.Items) == 0 {
|
||||
t.Error("expected at least one search result")
|
||||
}
|
||||
if rss.Channel.Items[0].Title != "Breaking.Bad.S01E01.1080p.WEB-DL" {
|
||||
t.Errorf("unexpected first result title: %q", rss.Channel.Items[0].Title)
|
||||
}
|
||||
}
|
||||
|
||||
func TestUnknownFunction(t *testing.T) {
|
||||
|
||||
Reference in New Issue
Block a user