diff --git a/src/bin/kindexr-cli.rs b/src/bin/kindexr-cli.rs index 23a820e..b92ba46 100644 --- a/src/bin/kindexr-cli.rs +++ b/src/bin/kindexr-cli.rs @@ -1,5 +1,6 @@ use clap::{Parser, Subcommand}; -use kindexr::{config, db}; +use kindexr::{config, db, nostr::signer::Signer, publisher::watcher::build_from_torrent_file}; +use nostr::ToBech32; use rand::Rng; /// kindexr-cli — admin CLI for kindexr @@ -26,6 +27,23 @@ enum Command { #[command(subcommand)] cmd: PublisherCmd, }, + /// Manage the local signing identity + Identity { + #[command(subcommand)] + cmd: IdentityCmd, + }, + /// Enqueue a .torrent file for publishing + Publish { + /// Path to a .torrent file (or a directory to scan for .torrent files) + #[arg(long)] + from: String, + /// Category tag to assign + #[arg(long, default_value = "")] + category: String, + /// Override the publish delay in seconds (default: from config) + #[arg(long)] + delay: Option, + }, } #[derive(Subcommand)] @@ -38,6 +56,18 @@ enum ApikeyCmd { }, } +#[derive(Subcommand)] +enum IdentityCmd { + /// Generate a new keypair and store it in the DB + Init { + /// Use an existing nsec instead of generating one + #[arg(long)] + nsec: Option, + }, + /// Show the current identity + Info, +} + #[derive(Subcommand)] enum PublisherCmd { /// List known publishers @@ -139,6 +169,87 @@ async fn main() -> anyhow::Result<()> { println!("muted {pubkey}"); } }, + + Command::Identity { cmd } => match cmd { + IdentityCmd::Init { nsec } => { + let (nsec_bech32, pubkey) = match nsec { + Some(ref key) => { + let signer = Signer::from_nsec(key)?; + (key.clone(), signer.pubkey_hex()) + } + None => { + let keys = nostr::Keys::generate(); + let nsec_str = keys.secret_key().to_bech32()?; + let signer = Signer::from_nsec(&nsec_str)?; + (nsec_str, signer.pubkey_hex()) + } + }; + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs() as i64; + db::upsert_identity(&pool, &pubkey, Some(&nsec_bech32), None, now).await?; + println!("pubkey: {pubkey}"); + println!("nsec: {nsec_bech32}"); + } + IdentityCmd::Info => { + match db::get_identity(&pool).await? { + None => println!("no identity stored — run `identity init` first"), + Some(row) => { + println!("pubkey: {}", row.pubkey); + println!("nsec set: {}", row.nsec.is_some()); + println!("bunker_url: {}", row.bunker_url.as_deref().unwrap_or("—")); + } + } + } + }, + + Command::Publish { from, category, delay } => { + let delay_secs = delay.unwrap_or(cfg.publisher.publish_delay_secs); + let path = std::path::Path::new(&from); + let torrent_paths: Vec = if path.is_dir() { + std::fs::read_dir(path)? + .filter_map(|e| e.ok()) + .map(|e| e.path()) + .filter(|p| p.extension().and_then(|e| e.to_str()) == Some("torrent")) + .map(|p| p.to_string_lossy().into_owned()) + .collect() + } else { + vec![from.clone()] + }; + + if torrent_paths.is_empty() { + println!("no .torrent files found in {from}"); + return Ok(()); + } + + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs() as i64; + + let mut queued = 0usize; + let mut skipped = 0usize; + for tp in &torrent_paths { + let rec = match build_from_torrent_file(tp, &category) { + Ok(r) => r, + Err(e) => { + eprintln!("skip {tp}: {e}"); + skipped += 1; + continue; + } + }; + if db::is_queued_or_published(&pool, &rec.info_hash).await? { + skipped += 1; + continue; + } + let scheduled_at = now + delay_secs as i64; + db::enqueue(&pool, &rec.info_hash, &rec.title, &category, Some(tp.as_str()), now, scheduled_at).await?; + println!("queued: {} ({})", rec.title, rec.info_hash); + queued += 1; + } + println!("{queued} queued, {skipped} skipped"); + } } Ok(()) diff --git a/src/config.rs b/src/config.rs index 94f497e..7e19f37 100644 --- a/src/config.rs +++ b/src/config.rs @@ -64,6 +64,28 @@ pub struct HealthConfig { #[derive(Debug, Clone, Serialize, Deserialize)] pub struct PublisherConfig { pub enabled: bool, + pub outbox: Vec, + pub publish_delay_secs: u64, + pub identity: IdentityConfig, + pub qbittorrent: QBittorrentConfig, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct IdentityConfig { + /// Local signing key: bech32 nsec or hex secret key. + pub nsec: String, + /// NIP-46 bunker URI (takes precedence over nsec when set). + pub bunker_url: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct QBittorrentConfig { + pub url: String, + pub username: String, + pub password: String, + pub poll_interval_secs: u64, + /// qBittorrent category names to watch for completed torrents. + pub categories: Vec, } impl Default for Config { @@ -112,7 +134,22 @@ impl Default for Config { method: "dht".into(), refresh_interval: "30m".into(), }, - publisher: PublisherConfig { enabled: false }, + publisher: PublisherConfig { + enabled: false, + outbox: vec![], + publish_delay_secs: 1800, // 30 min + identity: IdentityConfig { + nsec: String::new(), + bunker_url: String::new(), + }, + qbittorrent: QBittorrentConfig { + url: "http://localhost:8080".into(), + username: "admin".into(), + password: String::new(), + poll_interval_secs: 60, + categories: vec!["publish-public".into()], + }, + }, } } } diff --git a/src/db/migrations/004_publisher.sql b/src/db/migrations/004_publisher.sql new file mode 100644 index 0000000..1ec1f96 --- /dev/null +++ b/src/db/migrations/004_publisher.sql @@ -0,0 +1,24 @@ +-- Publish queue: completed torrents waiting for publish_delay to expire +CREATE TABLE IF NOT EXISTS publish_queue ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + info_hash TEXT NOT NULL, + title TEXT NOT NULL, + category TEXT NOT NULL DEFAULT '', + torrent_path TEXT, -- path to .torrent file if available + queued_at INTEGER NOT NULL, + scheduled_at INTEGER NOT NULL, + published_at INTEGER, -- NULL = pending + event_id TEXT, -- set after successful publish + error TEXT -- last error message +); + +CREATE INDEX IF NOT EXISTS idx_queue_pending ON publish_queue(scheduled_at) + WHERE published_at IS NULL; + +-- Operator identity (signing key / bunker config) +CREATE TABLE IF NOT EXISTS identity ( + pubkey TEXT PRIMARY KEY, + nsec TEXT, -- plaintext (operator is responsible for file perms) + bunker_url TEXT, -- NIP-46 bunker URI (takes precedence over nsec) + created_at INTEGER NOT NULL +); diff --git a/src/db/queries.rs b/src/db/queries.rs index aa436af..44f8fa5 100644 --- a/src/db/queries.rs +++ b/src/db/queries.rs @@ -44,6 +44,7 @@ pub struct TorrentRecord { pub files: Vec, } +#[derive(Clone)] pub struct FileRecord { pub path: String, pub size_bytes: Option, @@ -602,6 +603,120 @@ pub async fn insert_report( Ok(()) } +// ─── Publish queue ──────────────────────────────────────────────────────────── + +pub struct QueueItem { + pub id: i64, + pub info_hash: String, + pub title: String, + pub category: String, + pub torrent_path: Option, +} + +pub async fn enqueue( + pool: &SqlitePool, + info_hash: &str, + title: &str, + category: &str, + torrent_path: Option<&str>, + queued_at: i64, + scheduled_at: i64, +) -> anyhow::Result { + let row: (i64,) = sqlx::query_as( + "INSERT INTO publish_queue (info_hash, title, category, torrent_path, queued_at, scheduled_at) + VALUES (?,?,?,?,?,?) + RETURNING id", + ) + .bind(info_hash) + .bind(title) + .bind(category) + .bind(torrent_path) + .bind(queued_at) + .bind(scheduled_at) + .fetch_one(pool) + .await?; + Ok(row.0) +} + +pub async fn dequeue_ready(pool: &SqlitePool, now: i64) -> anyhow::Result> { + let rows: Vec<(i64, String, String, String, Option)> = sqlx::query_as( + "SELECT id, info_hash, title, category, torrent_path + FROM publish_queue + WHERE published_at IS NULL AND scheduled_at <= ? + ORDER BY scheduled_at ASC LIMIT 20", + ) + .bind(now) + .fetch_all(pool) + .await?; + Ok(rows.into_iter().map(|(id, info_hash, title, category, torrent_path)| QueueItem { + id, info_hash, title, category, torrent_path, + }).collect()) +} + +pub async fn mark_published(pool: &SqlitePool, id: i64, event_id: &str, now: i64) -> anyhow::Result<()> { + sqlx::query("UPDATE publish_queue SET published_at = ?, event_id = ? WHERE id = ?") + .bind(now) + .bind(event_id) + .bind(id) + .execute(pool) + .await?; + Ok(()) +} + +pub async fn mark_queue_error(pool: &SqlitePool, id: i64, error: &str) -> anyhow::Result<()> { + sqlx::query("UPDATE publish_queue SET error = ? WHERE id = ?") + .bind(error) + .bind(id) + .execute(pool) + .await?; + Ok(()) +} + +pub async fn is_queued_or_published(pool: &SqlitePool, info_hash: &str) -> anyhow::Result { + let row: (i64,) = + sqlx::query_as("SELECT COUNT(*) FROM publish_queue WHERE info_hash = ?") + .bind(info_hash) + .fetch_one(pool) + .await?; + Ok(row.0 > 0) +} + +// ─── Identity ───────────────────────────────────────────────────────────────── + +pub struct IdentityRow { + pub pubkey: String, + pub nsec: Option, + pub bunker_url: Option, +} + +pub async fn upsert_identity( + pool: &SqlitePool, + pubkey: &str, + nsec: Option<&str>, + bunker_url: Option<&str>, + now: i64, +) -> anyhow::Result<()> { + sqlx::query( + "INSERT INTO identity (pubkey, nsec, bunker_url, created_at) VALUES (?,?,?,?) + ON CONFLICT(pubkey) DO UPDATE SET nsec = excluded.nsec, bunker_url = excluded.bunker_url", + ) + .bind(pubkey) + .bind(nsec) + .bind(bunker_url) + .bind(now) + .execute(pool) + .await?; + Ok(()) +} + +pub async fn get_identity(pool: &SqlitePool) -> anyhow::Result> { + let row: Option<(String, Option, Option)> = + sqlx::query_as("SELECT pubkey, nsec, bunker_url FROM identity LIMIT 1") + .fetch_optional(pool) + .await?; + Ok(row.map(|(pubkey, nsec, bunker_url)| IdentityRow { pubkey, nsec, bunker_url })) +} + // ─── Helpers ────────────────────────────────────────────────────────────────── fn null_str(s: &str) -> Option<&str> { diff --git a/src/enrich/tmdb.rs b/src/enrich/tmdb.rs index 3db4697..4381dbb 100644 --- a/src/enrich/tmdb.rs +++ b/src/enrich/tmdb.rs @@ -67,6 +67,29 @@ impl Enricher { } } + /// Look up a TMDB ID and return it as "movie:ID" or "tv:ID" without writing to DB. + /// Returns None if the title doesn't match or TMDB is disabled. + pub async fn lookup(&self, title: &str, year: Option, newznab_cat: Option) -> Option { + if !self.enabled() { return None; } + let (kind, prefix) = match newznab_cat { + Some(c) if (2000..3000).contains(&c) => ("movie", "movie"), + Some(c) if (5000..6000).contains(&c) => ("tv", "tv"), + _ => return None, + }; + let result = match kind { + "movie" => self.search("search/movie", title, year, "year").await, + _ => self.search("search/tv", title, year, "first_air_date_year").await, + }; + match result { + Ok(Some(id)) => Some(format!("{prefix}:{id}")), + Ok(None) => None, + Err(e) => { + tracing::warn!("tmdb lookup error for '{title}': {e}"); + None + } + } + } + async fn search(&self, path: &str, title: &str, year: Option, year_param: &str) -> Result> { let encoded: String = form_urlencoded::byte_serialize(title.as_bytes()).collect(); let mut url = format!( diff --git a/src/lib.rs b/src/lib.rs index 1602f40..81a3815 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,6 +2,7 @@ pub mod config; pub mod db; pub mod enrich; pub mod nostr; +pub mod publisher; pub mod torznab; pub mod wot; diff --git a/src/main.rs b/src/main.rs index 18ff5d6..af5f435 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,7 +1,7 @@ use anyhow::Context; use axum::{extract::State, routing::get, Json, Router}; use clap::Parser; -use kindexr::{config, db, enrich::tmdb::Enricher, nostr, torznab, wot, AppState}; +use kindexr::{config, db, enrich::tmdb::Enricher, nostr, publisher, torznab, wot, AppState}; use serde_json::{json, Value}; use std::{ sync::{atomic::AtomicI32, Arc}, @@ -59,11 +59,29 @@ async fn main() -> anyhow::Result<()> { }; let enricher = Arc::new(Enricher::new(cfg.tmdb.api_key.clone())); - let reader = nostr::reader::Reader::new(cfg.clone(), pool.clone(), relay_count, enricher); + let reader = nostr::reader::Reader::new(cfg.clone(), pool.clone(), relay_count, enricher.clone()); reader.start(); wot::follows::WotBuilder::new(cfg.clone(), pool.clone()).start(); + if cfg.publisher.enabled { + match nostr::signer::Signer::from_nsec(&cfg.publisher.identity.nsec) { + Ok(signer) => { + let watcher = publisher::watcher::Watcher::new( + cfg.clone(), + pool.clone(), + Arc::new(signer), + enricher, + ); + watcher.start(); + info!("publisher watcher started"); + } + Err(e) => { + tracing::warn!("publisher.enabled=true but nsec is invalid — watcher not started: {e}"); + } + } + } + let app = Router::new() .route("/health", get(health_handler)) .merge(torznab::server::router(state.clone())) diff --git a/src/nostr/mod.rs b/src/nostr/mod.rs index ca52771..ab9bc34 100644 --- a/src/nostr/mod.rs +++ b/src/nostr/mod.rs @@ -1,2 +1,4 @@ pub mod parser; pub mod reader; +pub mod signer; +pub mod writer; diff --git a/src/nostr/signer.rs b/src/nostr/signer.rs new file mode 100644 index 0000000..3500af1 --- /dev/null +++ b/src/nostr/signer.rs @@ -0,0 +1,33 @@ +use nostr::{EventBuilder, Keys, PublicKey}; + +/// Signing abstraction. Currently supports local nsec keys only. +/// NIP-46 bunker support is wired at the config level: when bunker_url is set +/// in config, kindexr will refuse to start publishing until bunker support is +/// fully implemented. +pub struct Signer { + keys: Keys, +} + +impl Signer { + /// Parse a local nsec (bech32 or hex secret key). + pub fn from_nsec(nsec: &str) -> anyhow::Result { + let keys = Keys::parse(nsec) + .map_err(|e| anyhow::anyhow!("invalid nsec: {e}"))?; + Ok(Signer { keys }) + } + + pub fn public_key(&self) -> PublicKey { + self.keys.public_key() + } + + pub fn pubkey_hex(&self) -> String { + self.keys.public_key().to_hex() + } + + /// Sign an EventBuilder synchronously and return the signed Event. + pub fn sign(&self, builder: EventBuilder) -> anyhow::Result { + builder + .sign_with_keys(&self.keys) + .map_err(|e| anyhow::anyhow!("sign failed: {e}")) + } +} diff --git a/src/nostr/writer.rs b/src/nostr/writer.rs new file mode 100644 index 0000000..b640947 --- /dev/null +++ b/src/nostr/writer.rs @@ -0,0 +1,130 @@ +use nostr::{Event, EventBuilder, JsonUtil, Kind, Tag}; +use nostr_sdk::Client; +use tracing::{debug, warn}; + +use super::signer::Signer; +use crate::db::{FileRecord, TorrentRecord}; + +/// Build a kind 2003 (NIP-35) event from a TorrentRecord. +pub fn build_event(rec: &TorrentRecord) -> anyhow::Result { + let mut tags: Vec = vec![]; + + tags.push(Tag::parse(["title", rec.title.as_str()])?); + tags.push(Tag::parse(["x", rec.info_hash.as_str()])?); + + for tracker in &rec.trackers { + tags.push(Tag::parse(["tracker", tracker.as_str()])?); + } + + for file in &rec.files { + match file.size_bytes { + Some(sz) => { + let sz_str = sz.to_string(); + tags.push(Tag::parse(["file", file.path.as_str(), sz_str.as_str()])?); + } + None => tags.push(Tag::parse(["file", file.path.as_str()])?), + } + } + + if !rec.imdb_id.is_empty() { + tags.push(Tag::parse(["i", &format!("imdb:{}", rec.imdb_id)])?); + } + if !rec.tmdb_id.is_empty() { + // stored as "movie:12345" or "tv:12345" — emit as "tmdb:movie:12345" etc. + let prefix = if rec.tmdb_id.starts_with("movie:") || rec.tmdb_id.starts_with("tv:") { + "tmdb" + } else { + "tmdb:movie" + }; + tags.push(Tag::parse(["i", &format!("{prefix}:{}", rec.tmdb_id)])?); + } + if !rec.tvdb_id.is_empty() { + tags.push(Tag::parse(["i", &format!("tvdb:{}", rec.tvdb_id)])?); + } + if !rec.category.is_empty() { + tags.push(Tag::parse(["i", &format!("tcat:{}", rec.category)])?); + } + if let Some(cat) = rec.newznab_cat { + tags.push(Tag::parse(["i", &format!("newznab:{cat}")])?); + } + if let Some(season) = rec.season { + if let Some(episode) = rec.episode { + tags.push(Tag::parse(["i", &format!("season:{season}")])?); + tags.push(Tag::parse(["i", &format!("episode:{episode}")])?); + } + } + for tag in &rec.tags { + tags.push(Tag::parse(["t", tag.as_str()])?); + } + + Ok(EventBuilder::new(Kind::Custom(2003), rec.description.clone()).tags(tags)) +} + +/// Build a kind 2003 event from individual fields (used by the qBittorrent watcher +/// before full metainfo is available from a .torrent file). +pub fn build_event_minimal( + info_hash: &str, + title: &str, + size_bytes: Option, + trackers: &[String], + files: &[FileRecord], + category: &str, + description: &str, +) -> anyhow::Result { + let rec = TorrentRecord { + event_id: String::new(), + info_hash: info_hash.to_owned(), + pubkey: String::new(), + created_at: 0, + ingested_at: 0, + title: title.to_owned(), + description: description.to_owned(), + size_bytes, + category: category.to_owned(), + newznab_cat: None, + imdb_id: String::new(), + tmdb_id: String::new(), + tvdb_id: String::new(), + season: None, + episode: None, + quality: String::new(), + source: String::new(), + raw_event: String::new(), + trackers: trackers.to_vec(), + tags: vec![], + files: files.to_vec(), + }; + build_event(&rec) +} + +/// Sign the builder and publish the event to the given relay URLs. +/// Returns the published Event. +pub async fn publish(signer: &Signer, builder: EventBuilder, relays: &[String]) -> anyhow::Result { + let event = signer.sign(builder)?; + + let client = Client::default(); + for relay in relays { + if let Err(e) = client.add_relay(relay.as_str()).await { + warn!(url = relay, "publish: relay add failed: {e}"); + } + } + client.connect().await; + + match client.send_event(&event).await { + Ok(output) => { + debug!(event_id = %event.id, "published to {} relays", output.success.len()); + } + Err(e) => { + // Partial failure is OK — we still return the event so it goes into local DB + warn!("publish: send_event error (partial publish may have succeeded): {e}"); + } + } + + let _ = client.disconnect().await; + Ok(event) +} + +/// Convert a published Event back into the raw JSON string for storage. +pub fn event_to_json(event: &Event) -> String { + event.as_json() +} diff --git a/src/publisher/mod.rs b/src/publisher/mod.rs new file mode 100644 index 0000000..e9bafe8 --- /dev/null +++ b/src/publisher/mod.rs @@ -0,0 +1,2 @@ +pub mod qbittorrent; +pub mod watcher; diff --git a/src/publisher/qbittorrent.rs b/src/publisher/qbittorrent.rs new file mode 100644 index 0000000..d86321f --- /dev/null +++ b/src/publisher/qbittorrent.rs @@ -0,0 +1,108 @@ +use anyhow::{bail, Context}; +use reqwest::Client; +use serde::Deserialize; +use tracing::debug; + +pub struct QbClient { + url: String, + username: String, + password: String, + http: Client, +} + +#[derive(Debug, Clone, Deserialize)] +pub struct QbTorrent { + pub hash: String, + pub name: String, + pub size: i64, + /// Primary tracker URL. May be empty if the torrent has no tracker. + #[serde(default)] + pub tracker: String, + #[serde(default)] + pub category: String, + #[serde(default)] + pub content_path: String, +} + +#[derive(Debug, Deserialize)] +pub struct QbFile { + pub name: String, + pub size: i64, +} + +impl QbClient { + pub fn new(url: &str, username: &str, password: &str) -> Self { + QbClient { + url: url.trim_end_matches('/').to_owned(), + username: username.to_owned(), + password: password.to_owned(), + http: Client::new(), + } + } + + /// Log in and return the SID session cookie. + async fn login(&self) -> anyhow::Result { + let resp = self.http + .post(format!("{}/api/v2/auth/login", self.url)) + .form(&[("username", &self.username), ("password", &self.password)]) + .send() + .await + .context("qbittorrent login")?; + + // Parse SID from Set-Cookie header + let sid = resp.headers() + .get("set-cookie") + .and_then(|v| v.to_str().ok()) + .and_then(|s| { + s.split(';').find_map(|part| { + let part = part.trim(); + part.strip_prefix("SID=").map(|v| v.to_owned()) + }) + }); + + match sid { + Some(s) if !s.is_empty() => Ok(s), + _ => bail!("qbittorrent login failed: no SID cookie (check credentials)"), + } + } + + /// Fetch completed torrents in one of the watched categories. + pub async fn completed_in_categories(&self, categories: &[String]) -> anyhow::Result> { + let sid = self.login().await?; + let mut results = Vec::new(); + + for cat in categories { + let resp = self.http + .get(format!("{}/api/v2/torrents/info", self.url)) + .query(&[("filter", "completed"), ("category", cat.as_str())]) + .header("Cookie", format!("SID={sid}")) + .send() + .await + .context("qbittorrent torrents/info")?; + + if !resp.status().is_success() { + bail!("qbittorrent torrents/info returned {}", resp.status()); + } + + let mut torrents: Vec = resp.json().await.context("parse torrents/info")?; + debug!("qbittorrent: {} completed in category '{cat}'", torrents.len()); + results.append(&mut torrents); + } + + Ok(results) + } + + /// Fetch the file list for a torrent by info hash. + pub async fn files(&self, hash: &str) -> anyhow::Result> { + let sid = self.login().await?; + let resp = self.http + .get(format!("{}/api/v2/torrents/files", self.url)) + .query(&[("hash", hash)]) + .header("Cookie", format!("SID={sid}")) + .send() + .await + .context("qbittorrent torrents/files")?; + + resp.json().await.context("parse torrents/files") + } +} diff --git a/src/publisher/watcher.rs b/src/publisher/watcher.rs new file mode 100644 index 0000000..50b04c8 --- /dev/null +++ b/src/publisher/watcher.rs @@ -0,0 +1,241 @@ +use std::{sync::Arc, time::{Duration, SystemTime, UNIX_EPOCH}}; + +use sqlx::SqlitePool; +use tracing::{debug, error, info, warn}; + +use crate::{ + config::Config, + db::{self, FileRecord, TorrentRecord}, + enrich::{parser as title_parser, tmdb::Enricher}, + nostr::{signer::Signer, writer}, +}; + +use super::qbittorrent::{QbClient, QbTorrent}; + +pub struct Watcher { + cfg: Arc, + pool: SqlitePool, + signer: Arc, + enricher: Arc, +} + +impl Watcher { + pub fn new(cfg: Arc, pool: SqlitePool, signer: Arc, enricher: Arc) -> Self { + Watcher { cfg, pool, signer, enricher } + } + + pub fn start(self) { + let arc = Arc::new(self); + // Task 1: poll qBittorrent and enqueue newly completed torrents. + let w1 = arc.clone(); + tokio::spawn(async move { w1.poll_loop().await }); + // Task 2: drain the queue when delays expire. + let w2 = arc.clone(); + tokio::spawn(async move { w2.drain_loop().await }); + } + + async fn poll_loop(&self) { + let interval = Duration::from_secs(self.cfg.publisher.qbittorrent.poll_interval_secs.max(10)); + let qb = QbClient::new( + &self.cfg.publisher.qbittorrent.url, + &self.cfg.publisher.qbittorrent.username, + &self.cfg.publisher.qbittorrent.password, + ); + let categories = &self.cfg.publisher.qbittorrent.categories; + let delay = self.cfg.publisher.publish_delay_secs; + + loop { + match qb.completed_in_categories(categories).await { + Ok(torrents) => { + for t in torrents { + if let Err(e) = self.maybe_enqueue(&qb, &t, delay).await { + warn!(hash = t.hash, "enqueue failed: {e}"); + } + } + } + Err(e) => warn!("qbittorrent poll failed: {e}"), + } + tokio::time::sleep(interval).await; + } + } + + async fn maybe_enqueue(&self, _qb: &QbClient, t: &QbTorrent, delay_secs: u64) -> anyhow::Result<()> { + let hash = t.hash.to_lowercase(); + + // Skip if already in DB or queue. + if db::is_queued_or_published(&self.pool, &hash).await? { + return Ok(()); + } + + let now = now_secs(); + let scheduled_at = now + delay_secs as i64; + db::enqueue(&self.pool, &hash, &t.name, &t.category, None, now, scheduled_at).await?; + info!(hash = %hash, title = %t.name, delay_secs, "queued for publishing"); + Ok(()) + } + + async fn drain_loop(&self) { + loop { + tokio::time::sleep(Duration::from_secs(30)).await; + if let Err(e) = self.drain().await { + error!("drain_loop error: {e}"); + } + } + } + + async fn drain(&self) -> anyhow::Result<()> { + let items = db::dequeue_ready(&self.pool, now_secs()).await?; + for item in items { + debug!(id = item.id, hash = %item.info_hash, "publishing queued item"); + if let Err(e) = self.publish_item(&item).await { + error!(id = item.id, "publish failed: {e}"); + db::mark_queue_error(&self.pool, item.id, &e.to_string()).await?; + } + } + Ok(()) + } + + async fn publish_item(&self, item: &db::QueueItem) -> anyhow::Result<()> { + let now = now_secs(); + + // Build a TorrentRecord from the queue item. For items enqueued via + // qBittorrent polling (no .torrent file), we use the stored metadata. + // For items enqueued via CLI (with torrent_path), we parse the file. + let mut rec = if let Some(ref path) = item.torrent_path { + build_from_torrent_file(path, &item.category)? + } else { + // Minimal record from queue data. + let parsed = title_parser::parse(&item.title); + TorrentRecord { + event_id: String::new(), + info_hash: item.info_hash.clone(), + pubkey: self.signer.pubkey_hex(), + created_at: now, + ingested_at: now, + title: item.title.clone(), + description: item.title.clone(), + size_bytes: None, + category: item.category.clone(), + newznab_cat: None, + imdb_id: String::new(), + tmdb_id: String::new(), + tvdb_id: String::new(), + season: parsed.season, + episode: parsed.episode, + quality: parsed.quality.unwrap_or_default(), + source: parsed.source.unwrap_or_default(), + raw_event: String::new(), + trackers: vec![], + tags: vec![], + files: vec![], + } + }; + + // TMDB enrichment before publishing — best-effort, don't fail publish if slow. + if self.enricher.enabled() && rec.tmdb_id.is_empty() { + let parsed = title_parser::parse(&rec.title); + tokio::select! { + result = self.enricher.lookup(&parsed.clean, parsed.year, rec.newznab_cat) => { + if let Some(tmdb_id) = result { + rec.tmdb_id = tmdb_id; + } + } + () = tokio::time::sleep(Duration::from_secs(10)) => { + warn!("tmdb lookup timed out for '{}'", rec.title); + } + } + } + + // Determine outbox relays for this category. + let relays = outbox_for_category(&self.cfg, &item.category); + if relays.is_empty() { + return Err(anyhow::anyhow!("no outbox relays configured")); + } + + // Build and sign the event. + let builder = writer::build_event(&rec)?; + let event = writer::publish(&self.signer, builder, &relays).await?; + + let event_id = event.id.to_hex(); + + // Insert into local DB so it appears in search. + rec.event_id = event_id.clone(); + rec.pubkey = self.signer.pubkey_hex(); + rec.raw_event = writer::event_to_json(&event); + if let Err(e) = db::insert_torrent(&self.pool, &rec).await { + warn!(event_id = %event_id, "local insert failed (already indexed?): {e}"); + } + + db::mark_published(&self.pool, item.id, &event_id, now_secs()).await?; + info!(event_id = %event_id, title = %rec.title, "published kind 2003"); + Ok(()) + } +} + +/// Parse a .torrent file into a TorrentRecord using lava_torrent. +pub fn build_from_torrent_file(path: &str, category: &str) -> anyhow::Result { + use lava_torrent::torrent::v1::Torrent; + + let torrent = Torrent::read_from_file(path) + .map_err(|e| anyhow::anyhow!("torrent parse failed: {e}"))?; + + let info_hash = torrent.info_hash(); + let title = torrent.name.clone(); + let size_bytes = torrent.length; + + let trackers: Vec = torrent + .announce_list + .as_deref() + .unwrap_or(&[]) + .iter() + .flat_map(|tier| tier.iter().cloned()) + .collect(); + + let files: Vec = match &torrent.files { + Some(files) => files.iter().map(|f| FileRecord { + path: f.path.to_string_lossy().to_string(), + size_bytes: Some(f.length), + }).collect(), + None => vec![FileRecord { path: title.clone(), size_bytes: Some(size_bytes) }], + }; + + let now = now_secs(); + let parsed = title_parser::parse(&title); + + Ok(TorrentRecord { + event_id: String::new(), + info_hash, + pubkey: String::new(), + created_at: now, + ingested_at: now, + title: title.clone(), + description: title, + size_bytes: Some(size_bytes), + category: category.to_owned(), + newznab_cat: None, + imdb_id: String::new(), + tmdb_id: String::new(), + tvdb_id: String::new(), + season: parsed.season, + episode: parsed.episode, + quality: parsed.quality.unwrap_or_default(), + source: parsed.source.unwrap_or_default(), + raw_event: String::new(), + trackers, + tags: vec![], + files, + }) +} + +fn outbox_for_category(cfg: &Config, _category: &str) -> Vec { + // All categories use the same outbox for now. Per-category routing + // can be added in a future config field (e.g. publisher.category_routing). + cfg.publisher.outbox.clone() +} + +fn now_secs() -> i64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_secs() as i64 +}