feat: Phase 4 — publisher, qBittorrent watcher, identity CLI

Adds the full writer/publisher stack: NIP-35 event signing and relay
delivery, qBittorrent polling with publish-delay queue, lava_torrent
.torrent file parsing, TMDB inline lookup before publish, and
kindexr-cli identity/publish subcommands.
This commit is contained in:
2026-05-17 12:43:21 -07:00
parent b6705d5b85
commit f98e6f8dfa
13 changed files with 849 additions and 4 deletions
+112 -1
View File
@@ -1,5 +1,6 @@
use clap::{Parser, Subcommand};
use kindexr::{config, db};
use kindexr::{config, db, nostr::signer::Signer, publisher::watcher::build_from_torrent_file};
use nostr::ToBech32;
use rand::Rng;
/// kindexr-cli — admin CLI for kindexr
@@ -26,6 +27,23 @@ enum Command {
#[command(subcommand)]
cmd: PublisherCmd,
},
/// Manage the local signing identity
Identity {
#[command(subcommand)]
cmd: IdentityCmd,
},
/// Enqueue a .torrent file for publishing
Publish {
/// Path to a .torrent file (or a directory to scan for .torrent files)
#[arg(long)]
from: String,
/// Category tag to assign
#[arg(long, default_value = "")]
category: String,
/// Override the publish delay in seconds (default: from config)
#[arg(long)]
delay: Option<u64>,
},
}
#[derive(Subcommand)]
@@ -38,6 +56,18 @@ enum ApikeyCmd {
},
}
#[derive(Subcommand)]
enum IdentityCmd {
/// Generate a new keypair and store it in the DB
Init {
/// Use an existing nsec instead of generating one
#[arg(long)]
nsec: Option<String>,
},
/// Show the current identity
Info,
}
#[derive(Subcommand)]
enum PublisherCmd {
/// List known publishers
@@ -139,6 +169,87 @@ async fn main() -> anyhow::Result<()> {
println!("muted {pubkey}");
}
},
Command::Identity { cmd } => match cmd {
IdentityCmd::Init { nsec } => {
let (nsec_bech32, pubkey) = match nsec {
Some(ref key) => {
let signer = Signer::from_nsec(key)?;
(key.clone(), signer.pubkey_hex())
}
None => {
let keys = nostr::Keys::generate();
let nsec_str = keys.secret_key().to_bech32()?;
let signer = Signer::from_nsec(&nsec_str)?;
(nsec_str, signer.pubkey_hex())
}
};
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs() as i64;
db::upsert_identity(&pool, &pubkey, Some(&nsec_bech32), None, now).await?;
println!("pubkey: {pubkey}");
println!("nsec: {nsec_bech32}");
}
IdentityCmd::Info => {
match db::get_identity(&pool).await? {
None => println!("no identity stored — run `identity init` first"),
Some(row) => {
println!("pubkey: {}", row.pubkey);
println!("nsec set: {}", row.nsec.is_some());
println!("bunker_url: {}", row.bunker_url.as_deref().unwrap_or(""));
}
}
}
},
Command::Publish { from, category, delay } => {
let delay_secs = delay.unwrap_or(cfg.publisher.publish_delay_secs);
let path = std::path::Path::new(&from);
let torrent_paths: Vec<String> = if path.is_dir() {
std::fs::read_dir(path)?
.filter_map(|e| e.ok())
.map(|e| e.path())
.filter(|p| p.extension().and_then(|e| e.to_str()) == Some("torrent"))
.map(|p| p.to_string_lossy().into_owned())
.collect()
} else {
vec![from.clone()]
};
if torrent_paths.is_empty() {
println!("no .torrent files found in {from}");
return Ok(());
}
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs() as i64;
let mut queued = 0usize;
let mut skipped = 0usize;
for tp in &torrent_paths {
let rec = match build_from_torrent_file(tp, &category) {
Ok(r) => r,
Err(e) => {
eprintln!("skip {tp}: {e}");
skipped += 1;
continue;
}
};
if db::is_queued_or_published(&pool, &rec.info_hash).await? {
skipped += 1;
continue;
}
let scheduled_at = now + delay_secs as i64;
db::enqueue(&pool, &rec.info_hash, &rec.title, &category, Some(tp.as_str()), now, scheduled_at).await?;
println!("queued: {} ({})", rec.title, rec.info_hash);
queued += 1;
}
println!("{queued} queued, {skipped} skipped");
}
}
Ok(())
+38 -1
View File
@@ -64,6 +64,28 @@ pub struct HealthConfig {
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PublisherConfig {
pub enabled: bool,
pub outbox: Vec<String>,
pub publish_delay_secs: u64,
pub identity: IdentityConfig,
pub qbittorrent: QBittorrentConfig,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IdentityConfig {
/// Local signing key: bech32 nsec or hex secret key.
pub nsec: String,
/// NIP-46 bunker URI (takes precedence over nsec when set).
pub bunker_url: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QBittorrentConfig {
pub url: String,
pub username: String,
pub password: String,
pub poll_interval_secs: u64,
/// qBittorrent category names to watch for completed torrents.
pub categories: Vec<String>,
}
impl Default for Config {
@@ -112,7 +134,22 @@ impl Default for Config {
method: "dht".into(),
refresh_interval: "30m".into(),
},
publisher: PublisherConfig { enabled: false },
publisher: PublisherConfig {
enabled: false,
outbox: vec![],
publish_delay_secs: 1800, // 30 min
identity: IdentityConfig {
nsec: String::new(),
bunker_url: String::new(),
},
qbittorrent: QBittorrentConfig {
url: "http://localhost:8080".into(),
username: "admin".into(),
password: String::new(),
poll_interval_secs: 60,
categories: vec!["publish-public".into()],
},
},
}
}
}
+24
View File
@@ -0,0 +1,24 @@
-- Publish queue: completed torrents waiting for publish_delay to expire
CREATE TABLE IF NOT EXISTS publish_queue (
id INTEGER PRIMARY KEY AUTOINCREMENT,
info_hash TEXT NOT NULL,
title TEXT NOT NULL,
category TEXT NOT NULL DEFAULT '',
torrent_path TEXT, -- path to .torrent file if available
queued_at INTEGER NOT NULL,
scheduled_at INTEGER NOT NULL,
published_at INTEGER, -- NULL = pending
event_id TEXT, -- set after successful publish
error TEXT -- last error message
);
CREATE INDEX IF NOT EXISTS idx_queue_pending ON publish_queue(scheduled_at)
WHERE published_at IS NULL;
-- Operator identity (signing key / bunker config)
CREATE TABLE IF NOT EXISTS identity (
pubkey TEXT PRIMARY KEY,
nsec TEXT, -- plaintext (operator is responsible for file perms)
bunker_url TEXT, -- NIP-46 bunker URI (takes precedence over nsec)
created_at INTEGER NOT NULL
);
+115
View File
@@ -44,6 +44,7 @@ pub struct TorrentRecord {
pub files: Vec<FileRecord>,
}
#[derive(Clone)]
pub struct FileRecord {
pub path: String,
pub size_bytes: Option<i64>,
@@ -602,6 +603,120 @@ pub async fn insert_report(
Ok(())
}
// ─── Publish queue ────────────────────────────────────────────────────────────
pub struct QueueItem {
pub id: i64,
pub info_hash: String,
pub title: String,
pub category: String,
pub torrent_path: Option<String>,
}
pub async fn enqueue(
pool: &SqlitePool,
info_hash: &str,
title: &str,
category: &str,
torrent_path: Option<&str>,
queued_at: i64,
scheduled_at: i64,
) -> anyhow::Result<i64> {
let row: (i64,) = sqlx::query_as(
"INSERT INTO publish_queue (info_hash, title, category, torrent_path, queued_at, scheduled_at)
VALUES (?,?,?,?,?,?)
RETURNING id",
)
.bind(info_hash)
.bind(title)
.bind(category)
.bind(torrent_path)
.bind(queued_at)
.bind(scheduled_at)
.fetch_one(pool)
.await?;
Ok(row.0)
}
pub async fn dequeue_ready(pool: &SqlitePool, now: i64) -> anyhow::Result<Vec<QueueItem>> {
let rows: Vec<(i64, String, String, String, Option<String>)> = sqlx::query_as(
"SELECT id, info_hash, title, category, torrent_path
FROM publish_queue
WHERE published_at IS NULL AND scheduled_at <= ?
ORDER BY scheduled_at ASC LIMIT 20",
)
.bind(now)
.fetch_all(pool)
.await?;
Ok(rows.into_iter().map(|(id, info_hash, title, category, torrent_path)| QueueItem {
id, info_hash, title, category, torrent_path,
}).collect())
}
pub async fn mark_published(pool: &SqlitePool, id: i64, event_id: &str, now: i64) -> anyhow::Result<()> {
sqlx::query("UPDATE publish_queue SET published_at = ?, event_id = ? WHERE id = ?")
.bind(now)
.bind(event_id)
.bind(id)
.execute(pool)
.await?;
Ok(())
}
pub async fn mark_queue_error(pool: &SqlitePool, id: i64, error: &str) -> anyhow::Result<()> {
sqlx::query("UPDATE publish_queue SET error = ? WHERE id = ?")
.bind(error)
.bind(id)
.execute(pool)
.await?;
Ok(())
}
pub async fn is_queued_or_published(pool: &SqlitePool, info_hash: &str) -> anyhow::Result<bool> {
let row: (i64,) =
sqlx::query_as("SELECT COUNT(*) FROM publish_queue WHERE info_hash = ?")
.bind(info_hash)
.fetch_one(pool)
.await?;
Ok(row.0 > 0)
}
// ─── Identity ─────────────────────────────────────────────────────────────────
pub struct IdentityRow {
pub pubkey: String,
pub nsec: Option<String>,
pub bunker_url: Option<String>,
}
pub async fn upsert_identity(
pool: &SqlitePool,
pubkey: &str,
nsec: Option<&str>,
bunker_url: Option<&str>,
now: i64,
) -> anyhow::Result<()> {
sqlx::query(
"INSERT INTO identity (pubkey, nsec, bunker_url, created_at) VALUES (?,?,?,?)
ON CONFLICT(pubkey) DO UPDATE SET nsec = excluded.nsec, bunker_url = excluded.bunker_url",
)
.bind(pubkey)
.bind(nsec)
.bind(bunker_url)
.bind(now)
.execute(pool)
.await?;
Ok(())
}
pub async fn get_identity(pool: &SqlitePool) -> anyhow::Result<Option<IdentityRow>> {
let row: Option<(String, Option<String>, Option<String>)> =
sqlx::query_as("SELECT pubkey, nsec, bunker_url FROM identity LIMIT 1")
.fetch_optional(pool)
.await?;
Ok(row.map(|(pubkey, nsec, bunker_url)| IdentityRow { pubkey, nsec, bunker_url }))
}
// ─── Helpers ──────────────────────────────────────────────────────────────────
fn null_str(s: &str) -> Option<&str> {
+23
View File
@@ -67,6 +67,29 @@ impl Enricher {
}
}
/// Look up a TMDB ID and return it as "movie:ID" or "tv:ID" without writing to DB.
/// Returns None if the title doesn't match or TMDB is disabled.
pub async fn lookup(&self, title: &str, year: Option<i32>, newznab_cat: Option<i32>) -> Option<String> {
if !self.enabled() { return None; }
let (kind, prefix) = match newznab_cat {
Some(c) if (2000..3000).contains(&c) => ("movie", "movie"),
Some(c) if (5000..6000).contains(&c) => ("tv", "tv"),
_ => return None,
};
let result = match kind {
"movie" => self.search("search/movie", title, year, "year").await,
_ => self.search("search/tv", title, year, "first_air_date_year").await,
};
match result {
Ok(Some(id)) => Some(format!("{prefix}:{id}")),
Ok(None) => None,
Err(e) => {
tracing::warn!("tmdb lookup error for '{title}': {e}");
None
}
}
}
async fn search(&self, path: &str, title: &str, year: Option<i32>, year_param: &str) -> Result<Option<i64>> {
let encoded: String = form_urlencoded::byte_serialize(title.as_bytes()).collect();
let mut url = format!(
+1
View File
@@ -2,6 +2,7 @@ pub mod config;
pub mod db;
pub mod enrich;
pub mod nostr;
pub mod publisher;
pub mod torznab;
pub mod wot;
+20 -2
View File
@@ -1,7 +1,7 @@
use anyhow::Context;
use axum::{extract::State, routing::get, Json, Router};
use clap::Parser;
use kindexr::{config, db, enrich::tmdb::Enricher, nostr, torznab, wot, AppState};
use kindexr::{config, db, enrich::tmdb::Enricher, nostr, publisher, torznab, wot, AppState};
use serde_json::{json, Value};
use std::{
sync::{atomic::AtomicI32, Arc},
@@ -59,11 +59,29 @@ async fn main() -> anyhow::Result<()> {
};
let enricher = Arc::new(Enricher::new(cfg.tmdb.api_key.clone()));
let reader = nostr::reader::Reader::new(cfg.clone(), pool.clone(), relay_count, enricher);
let reader = nostr::reader::Reader::new(cfg.clone(), pool.clone(), relay_count, enricher.clone());
reader.start();
wot::follows::WotBuilder::new(cfg.clone(), pool.clone()).start();
if cfg.publisher.enabled {
match nostr::signer::Signer::from_nsec(&cfg.publisher.identity.nsec) {
Ok(signer) => {
let watcher = publisher::watcher::Watcher::new(
cfg.clone(),
pool.clone(),
Arc::new(signer),
enricher,
);
watcher.start();
info!("publisher watcher started");
}
Err(e) => {
tracing::warn!("publisher.enabled=true but nsec is invalid — watcher not started: {e}");
}
}
}
let app = Router::new()
.route("/health", get(health_handler))
.merge(torznab::server::router(state.clone()))
+2
View File
@@ -1,2 +1,4 @@
pub mod parser;
pub mod reader;
pub mod signer;
pub mod writer;
+33
View File
@@ -0,0 +1,33 @@
use nostr::{EventBuilder, Keys, PublicKey};
/// Signing abstraction. Currently supports local nsec keys only.
/// NIP-46 bunker support is wired at the config level: when bunker_url is set
/// in config, kindexr will refuse to start publishing until bunker support is
/// fully implemented.
pub struct Signer {
keys: Keys,
}
impl Signer {
/// Parse a local nsec (bech32 or hex secret key).
pub fn from_nsec(nsec: &str) -> anyhow::Result<Self> {
let keys = Keys::parse(nsec)
.map_err(|e| anyhow::anyhow!("invalid nsec: {e}"))?;
Ok(Signer { keys })
}
pub fn public_key(&self) -> PublicKey {
self.keys.public_key()
}
pub fn pubkey_hex(&self) -> String {
self.keys.public_key().to_hex()
}
/// Sign an EventBuilder synchronously and return the signed Event.
pub fn sign(&self, builder: EventBuilder) -> anyhow::Result<nostr::Event> {
builder
.sign_with_keys(&self.keys)
.map_err(|e| anyhow::anyhow!("sign failed: {e}"))
}
}
+130
View File
@@ -0,0 +1,130 @@
use nostr::{Event, EventBuilder, JsonUtil, Kind, Tag};
use nostr_sdk::Client;
use tracing::{debug, warn};
use super::signer::Signer;
use crate::db::{FileRecord, TorrentRecord};
/// Build a kind 2003 (NIP-35) event from a TorrentRecord.
pub fn build_event(rec: &TorrentRecord) -> anyhow::Result<EventBuilder> {
let mut tags: Vec<Tag> = vec![];
tags.push(Tag::parse(["title", rec.title.as_str()])?);
tags.push(Tag::parse(["x", rec.info_hash.as_str()])?);
for tracker in &rec.trackers {
tags.push(Tag::parse(["tracker", tracker.as_str()])?);
}
for file in &rec.files {
match file.size_bytes {
Some(sz) => {
let sz_str = sz.to_string();
tags.push(Tag::parse(["file", file.path.as_str(), sz_str.as_str()])?);
}
None => tags.push(Tag::parse(["file", file.path.as_str()])?),
}
}
if !rec.imdb_id.is_empty() {
tags.push(Tag::parse(["i", &format!("imdb:{}", rec.imdb_id)])?);
}
if !rec.tmdb_id.is_empty() {
// stored as "movie:12345" or "tv:12345" — emit as "tmdb:movie:12345" etc.
let prefix = if rec.tmdb_id.starts_with("movie:") || rec.tmdb_id.starts_with("tv:") {
"tmdb"
} else {
"tmdb:movie"
};
tags.push(Tag::parse(["i", &format!("{prefix}:{}", rec.tmdb_id)])?);
}
if !rec.tvdb_id.is_empty() {
tags.push(Tag::parse(["i", &format!("tvdb:{}", rec.tvdb_id)])?);
}
if !rec.category.is_empty() {
tags.push(Tag::parse(["i", &format!("tcat:{}", rec.category)])?);
}
if let Some(cat) = rec.newznab_cat {
tags.push(Tag::parse(["i", &format!("newznab:{cat}")])?);
}
if let Some(season) = rec.season {
if let Some(episode) = rec.episode {
tags.push(Tag::parse(["i", &format!("season:{season}")])?);
tags.push(Tag::parse(["i", &format!("episode:{episode}")])?);
}
}
for tag in &rec.tags {
tags.push(Tag::parse(["t", tag.as_str()])?);
}
Ok(EventBuilder::new(Kind::Custom(2003), rec.description.clone()).tags(tags))
}
/// Build a kind 2003 event from individual fields (used by the qBittorrent watcher
/// before full metainfo is available from a .torrent file).
pub fn build_event_minimal(
info_hash: &str,
title: &str,
size_bytes: Option<i64>,
trackers: &[String],
files: &[FileRecord],
category: &str,
description: &str,
) -> anyhow::Result<EventBuilder> {
let rec = TorrentRecord {
event_id: String::new(),
info_hash: info_hash.to_owned(),
pubkey: String::new(),
created_at: 0,
ingested_at: 0,
title: title.to_owned(),
description: description.to_owned(),
size_bytes,
category: category.to_owned(),
newznab_cat: None,
imdb_id: String::new(),
tmdb_id: String::new(),
tvdb_id: String::new(),
season: None,
episode: None,
quality: String::new(),
source: String::new(),
raw_event: String::new(),
trackers: trackers.to_vec(),
tags: vec![],
files: files.to_vec(),
};
build_event(&rec)
}
/// Sign the builder and publish the event to the given relay URLs.
/// Returns the published Event.
pub async fn publish(signer: &Signer, builder: EventBuilder, relays: &[String]) -> anyhow::Result<Event> {
let event = signer.sign(builder)?;
let client = Client::default();
for relay in relays {
if let Err(e) = client.add_relay(relay.as_str()).await {
warn!(url = relay, "publish: relay add failed: {e}");
}
}
client.connect().await;
match client.send_event(&event).await {
Ok(output) => {
debug!(event_id = %event.id, "published to {} relays", output.success.len());
}
Err(e) => {
// Partial failure is OK — we still return the event so it goes into local DB
warn!("publish: send_event error (partial publish may have succeeded): {e}");
}
}
let _ = client.disconnect().await;
Ok(event)
}
/// Convert a published Event back into the raw JSON string for storage.
pub fn event_to_json(event: &Event) -> String {
event.as_json()
}
+2
View File
@@ -0,0 +1,2 @@
pub mod qbittorrent;
pub mod watcher;
+108
View File
@@ -0,0 +1,108 @@
use anyhow::{bail, Context};
use reqwest::Client;
use serde::Deserialize;
use tracing::debug;
pub struct QbClient {
url: String,
username: String,
password: String,
http: Client,
}
#[derive(Debug, Clone, Deserialize)]
pub struct QbTorrent {
pub hash: String,
pub name: String,
pub size: i64,
/// Primary tracker URL. May be empty if the torrent has no tracker.
#[serde(default)]
pub tracker: String,
#[serde(default)]
pub category: String,
#[serde(default)]
pub content_path: String,
}
#[derive(Debug, Deserialize)]
pub struct QbFile {
pub name: String,
pub size: i64,
}
impl QbClient {
pub fn new(url: &str, username: &str, password: &str) -> Self {
QbClient {
url: url.trim_end_matches('/').to_owned(),
username: username.to_owned(),
password: password.to_owned(),
http: Client::new(),
}
}
/// Log in and return the SID session cookie.
async fn login(&self) -> anyhow::Result<String> {
let resp = self.http
.post(format!("{}/api/v2/auth/login", self.url))
.form(&[("username", &self.username), ("password", &self.password)])
.send()
.await
.context("qbittorrent login")?;
// Parse SID from Set-Cookie header
let sid = resp.headers()
.get("set-cookie")
.and_then(|v| v.to_str().ok())
.and_then(|s| {
s.split(';').find_map(|part| {
let part = part.trim();
part.strip_prefix("SID=").map(|v| v.to_owned())
})
});
match sid {
Some(s) if !s.is_empty() => Ok(s),
_ => bail!("qbittorrent login failed: no SID cookie (check credentials)"),
}
}
/// Fetch completed torrents in one of the watched categories.
pub async fn completed_in_categories(&self, categories: &[String]) -> anyhow::Result<Vec<QbTorrent>> {
let sid = self.login().await?;
let mut results = Vec::new();
for cat in categories {
let resp = self.http
.get(format!("{}/api/v2/torrents/info", self.url))
.query(&[("filter", "completed"), ("category", cat.as_str())])
.header("Cookie", format!("SID={sid}"))
.send()
.await
.context("qbittorrent torrents/info")?;
if !resp.status().is_success() {
bail!("qbittorrent torrents/info returned {}", resp.status());
}
let mut torrents: Vec<QbTorrent> = resp.json().await.context("parse torrents/info")?;
debug!("qbittorrent: {} completed in category '{cat}'", torrents.len());
results.append(&mut torrents);
}
Ok(results)
}
/// Fetch the file list for a torrent by info hash.
pub async fn files(&self, hash: &str) -> anyhow::Result<Vec<QbFile>> {
let sid = self.login().await?;
let resp = self.http
.get(format!("{}/api/v2/torrents/files", self.url))
.query(&[("hash", hash)])
.header("Cookie", format!("SID={sid}"))
.send()
.await
.context("qbittorrent torrents/files")?;
resp.json().await.context("parse torrents/files")
}
}
+241
View File
@@ -0,0 +1,241 @@
use std::{sync::Arc, time::{Duration, SystemTime, UNIX_EPOCH}};
use sqlx::SqlitePool;
use tracing::{debug, error, info, warn};
use crate::{
config::Config,
db::{self, FileRecord, TorrentRecord},
enrich::{parser as title_parser, tmdb::Enricher},
nostr::{signer::Signer, writer},
};
use super::qbittorrent::{QbClient, QbTorrent};
pub struct Watcher {
cfg: Arc<Config>,
pool: SqlitePool,
signer: Arc<Signer>,
enricher: Arc<Enricher>,
}
impl Watcher {
pub fn new(cfg: Arc<Config>, pool: SqlitePool, signer: Arc<Signer>, enricher: Arc<Enricher>) -> Self {
Watcher { cfg, pool, signer, enricher }
}
pub fn start(self) {
let arc = Arc::new(self);
// Task 1: poll qBittorrent and enqueue newly completed torrents.
let w1 = arc.clone();
tokio::spawn(async move { w1.poll_loop().await });
// Task 2: drain the queue when delays expire.
let w2 = arc.clone();
tokio::spawn(async move { w2.drain_loop().await });
}
async fn poll_loop(&self) {
let interval = Duration::from_secs(self.cfg.publisher.qbittorrent.poll_interval_secs.max(10));
let qb = QbClient::new(
&self.cfg.publisher.qbittorrent.url,
&self.cfg.publisher.qbittorrent.username,
&self.cfg.publisher.qbittorrent.password,
);
let categories = &self.cfg.publisher.qbittorrent.categories;
let delay = self.cfg.publisher.publish_delay_secs;
loop {
match qb.completed_in_categories(categories).await {
Ok(torrents) => {
for t in torrents {
if let Err(e) = self.maybe_enqueue(&qb, &t, delay).await {
warn!(hash = t.hash, "enqueue failed: {e}");
}
}
}
Err(e) => warn!("qbittorrent poll failed: {e}"),
}
tokio::time::sleep(interval).await;
}
}
async fn maybe_enqueue(&self, _qb: &QbClient, t: &QbTorrent, delay_secs: u64) -> anyhow::Result<()> {
let hash = t.hash.to_lowercase();
// Skip if already in DB or queue.
if db::is_queued_or_published(&self.pool, &hash).await? {
return Ok(());
}
let now = now_secs();
let scheduled_at = now + delay_secs as i64;
db::enqueue(&self.pool, &hash, &t.name, &t.category, None, now, scheduled_at).await?;
info!(hash = %hash, title = %t.name, delay_secs, "queued for publishing");
Ok(())
}
async fn drain_loop(&self) {
loop {
tokio::time::sleep(Duration::from_secs(30)).await;
if let Err(e) = self.drain().await {
error!("drain_loop error: {e}");
}
}
}
async fn drain(&self) -> anyhow::Result<()> {
let items = db::dequeue_ready(&self.pool, now_secs()).await?;
for item in items {
debug!(id = item.id, hash = %item.info_hash, "publishing queued item");
if let Err(e) = self.publish_item(&item).await {
error!(id = item.id, "publish failed: {e}");
db::mark_queue_error(&self.pool, item.id, &e.to_string()).await?;
}
}
Ok(())
}
async fn publish_item(&self, item: &db::QueueItem) -> anyhow::Result<()> {
let now = now_secs();
// Build a TorrentRecord from the queue item. For items enqueued via
// qBittorrent polling (no .torrent file), we use the stored metadata.
// For items enqueued via CLI (with torrent_path), we parse the file.
let mut rec = if let Some(ref path) = item.torrent_path {
build_from_torrent_file(path, &item.category)?
} else {
// Minimal record from queue data.
let parsed = title_parser::parse(&item.title);
TorrentRecord {
event_id: String::new(),
info_hash: item.info_hash.clone(),
pubkey: self.signer.pubkey_hex(),
created_at: now,
ingested_at: now,
title: item.title.clone(),
description: item.title.clone(),
size_bytes: None,
category: item.category.clone(),
newznab_cat: None,
imdb_id: String::new(),
tmdb_id: String::new(),
tvdb_id: String::new(),
season: parsed.season,
episode: parsed.episode,
quality: parsed.quality.unwrap_or_default(),
source: parsed.source.unwrap_or_default(),
raw_event: String::new(),
trackers: vec![],
tags: vec![],
files: vec![],
}
};
// TMDB enrichment before publishing — best-effort, don't fail publish if slow.
if self.enricher.enabled() && rec.tmdb_id.is_empty() {
let parsed = title_parser::parse(&rec.title);
tokio::select! {
result = self.enricher.lookup(&parsed.clean, parsed.year, rec.newznab_cat) => {
if let Some(tmdb_id) = result {
rec.tmdb_id = tmdb_id;
}
}
() = tokio::time::sleep(Duration::from_secs(10)) => {
warn!("tmdb lookup timed out for '{}'", rec.title);
}
}
}
// Determine outbox relays for this category.
let relays = outbox_for_category(&self.cfg, &item.category);
if relays.is_empty() {
return Err(anyhow::anyhow!("no outbox relays configured"));
}
// Build and sign the event.
let builder = writer::build_event(&rec)?;
let event = writer::publish(&self.signer, builder, &relays).await?;
let event_id = event.id.to_hex();
// Insert into local DB so it appears in search.
rec.event_id = event_id.clone();
rec.pubkey = self.signer.pubkey_hex();
rec.raw_event = writer::event_to_json(&event);
if let Err(e) = db::insert_torrent(&self.pool, &rec).await {
warn!(event_id = %event_id, "local insert failed (already indexed?): {e}");
}
db::mark_published(&self.pool, item.id, &event_id, now_secs()).await?;
info!(event_id = %event_id, title = %rec.title, "published kind 2003");
Ok(())
}
}
/// Parse a .torrent file into a TorrentRecord using lava_torrent.
pub fn build_from_torrent_file(path: &str, category: &str) -> anyhow::Result<TorrentRecord> {
use lava_torrent::torrent::v1::Torrent;
let torrent = Torrent::read_from_file(path)
.map_err(|e| anyhow::anyhow!("torrent parse failed: {e}"))?;
let info_hash = torrent.info_hash();
let title = torrent.name.clone();
let size_bytes = torrent.length;
let trackers: Vec<String> = torrent
.announce_list
.as_deref()
.unwrap_or(&[])
.iter()
.flat_map(|tier| tier.iter().cloned())
.collect();
let files: Vec<FileRecord> = match &torrent.files {
Some(files) => files.iter().map(|f| FileRecord {
path: f.path.to_string_lossy().to_string(),
size_bytes: Some(f.length),
}).collect(),
None => vec![FileRecord { path: title.clone(), size_bytes: Some(size_bytes) }],
};
let now = now_secs();
let parsed = title_parser::parse(&title);
Ok(TorrentRecord {
event_id: String::new(),
info_hash,
pubkey: String::new(),
created_at: now,
ingested_at: now,
title: title.clone(),
description: title,
size_bytes: Some(size_bytes),
category: category.to_owned(),
newznab_cat: None,
imdb_id: String::new(),
tmdb_id: String::new(),
tvdb_id: String::new(),
season: parsed.season,
episode: parsed.episode,
quality: parsed.quality.unwrap_or_default(),
source: parsed.source.unwrap_or_default(),
raw_event: String::new(),
trackers,
tags: vec![],
files,
})
}
fn outbox_for_category(cfg: &Config, _category: &str) -> Vec<String> {
// All categories use the same outbox for now. Per-category routing
// can be added in a future config field (e.g. publisher.category_routing).
cfg.publisher.outbox.clone()
}
fn now_secs() -> i64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_secs() as i64
}