From fa7e6052fae8cd50c9e02510f689763266f968c8 Mon Sep 17 00:00:00 2001 From: enki Date: Sun, 17 May 2026 18:18:08 -0700 Subject: [PATCH] feat: publisher profile enrichment Fetch kind 0 metadata events for known publishers in background batches (150 at a time, hourly). Store name, nip05, picture, about in new columns (migration 005). UI now shows avatar + display name instead of raw pubkeys across indexed, dashboard, and publishers views. --- src/db/migrations/005_profiles.sql | 5 + src/db/queries.rs | 105 +++++++++++++++++--- src/main.rs | 1 + src/nostr/mod.rs | 1 + src/nostr/profiles.rs | 152 +++++++++++++++++++++++++++++ src/ui/dashboard.rs | 8 +- src/ui/indexed.rs | 7 +- src/ui/mod.rs | 8 ++ src/ui/publishers.rs | 70 ++++++++----- 9 files changed, 316 insertions(+), 41 deletions(-) create mode 100644 src/db/migrations/005_profiles.sql create mode 100644 src/nostr/profiles.rs diff --git a/src/db/migrations/005_profiles.sql b/src/db/migrations/005_profiles.sql new file mode 100644 index 0000000..6494ada --- /dev/null +++ b/src/db/migrations/005_profiles.sql @@ -0,0 +1,5 @@ +-- Publisher profile data fetched from kind 0 (NIP-01 metadata) events +ALTER TABLE publishers ADD COLUMN nip05 TEXT; +ALTER TABLE publishers ADD COLUMN picture TEXT; +ALTER TABLE publishers ADD COLUMN about TEXT; +ALTER TABLE publishers ADD COLUMN profile_fetched_at INTEGER; diff --git a/src/db/queries.rs b/src/db/queries.rs index 010608a..668eff9 100644 --- a/src/db/queries.rs +++ b/src/db/queries.rs @@ -398,6 +398,8 @@ pub async fn update_relay_sync(pool: &SqlitePool, url: &str, ts: i64) -> anyhow: pub struct PublisherRow { pub pubkey: String, pub name: Option, + pub nip05: Option, + pub picture: Option, pub trust: f64, pub blocked: bool, pub muted: bool, @@ -406,6 +408,7 @@ pub struct PublisherRow { pub torrents_n: i32, pub first_seen: Option, pub last_seen: Option, + pub profile_fetched_at: Option, } impl sqlx::FromRow<'_, sqlx::sqlite::SqliteRow> for PublisherRow { @@ -414,6 +417,8 @@ impl sqlx::FromRow<'_, sqlx::sqlite::SqliteRow> for PublisherRow { Ok(PublisherRow { pubkey: row.try_get("pubkey")?, name: row.try_get("name")?, + nip05: row.try_get("nip05").unwrap_or(None), + picture: row.try_get("picture").unwrap_or(None), trust: row.try_get("trust")?, blocked: row.try_get::("blocked")? != 0, muted: row.try_get::("muted")? != 0, @@ -422,6 +427,7 @@ impl sqlx::FromRow<'_, sqlx::sqlite::SqliteRow> for PublisherRow { torrents_n: row.try_get("torrents_n")?, first_seen: row.try_get("first_seen")?, last_seen: row.try_get("last_seen")?, + profile_fetched_at: row.try_get("profile_fetched_at").unwrap_or(None), }) } } @@ -513,7 +519,8 @@ pub async fn is_publisher_allowed(pool: &SqlitePool, pubkey: &str, wot_only: boo pub async fn list_publishers(pool: &SqlitePool, limit: i64, offset: i64) -> anyhow::Result> { sqlx::query_as( - "SELECT pubkey, name, trust, blocked, muted, wot_level, report_count, torrents_n, first_seen, last_seen + "SELECT pubkey, name, nip05, picture, trust, blocked, muted, wot_level, + report_count, torrents_n, first_seen, last_seen, profile_fetched_at FROM publishers ORDER BY last_seen DESC LIMIT ? OFFSET ?", ) .bind(limit) @@ -525,7 +532,8 @@ pub async fn list_publishers(pool: &SqlitePool, limit: i64, offset: i64) -> anyh pub async fn get_publisher(pool: &SqlitePool, pubkey: &str) -> anyhow::Result> { sqlx::query_as( - "SELECT pubkey, name, trust, blocked, muted, wot_level, report_count, torrents_n, first_seen, last_seen + "SELECT pubkey, name, nip05, picture, trust, blocked, muted, wot_level, + report_count, torrents_n, first_seen, last_seen, profile_fetched_at FROM publishers WHERE pubkey = ?", ) .bind(pubkey) @@ -534,6 +542,67 @@ pub async fn get_publisher(pool: &SqlitePool, pubkey: &str) -> anyhow::Result anyhow::Result> { + let rows: Vec<(String,)> = sqlx::query_as( + "SELECT pubkey FROM publishers + WHERE profile_fetched_at IS NULL + ORDER BY last_seen DESC LIMIT ?", + ) + .bind(limit) + .fetch_all(pool) + .await?; + Ok(rows.into_iter().map(|r| r.0).collect()) +} + +pub async fn update_publisher_profile( + pool: &SqlitePool, + pubkey: &str, + name: Option<&str>, + nip05: Option<&str>, + picture: Option<&str>, + about: Option<&str>, + fetched_at: i64, +) -> anyhow::Result<()> { + sqlx::query( + "UPDATE publishers + SET name = COALESCE(?, name), + nip05 = ?, + picture = ?, + about = ?, + profile_fetched_at = ? + WHERE pubkey = ?", + ) + .bind(name) + .bind(nip05) + .bind(picture) + .bind(about) + .bind(fetched_at) + .bind(pubkey) + .execute(pool) + .await?; + Ok(()) +} + +/// Mark a batch of pubkeys as having been attempted (even if no profile was found), +/// so they aren't re-queued every cycle. +pub async fn mark_profiles_fetched(pool: &SqlitePool, pubkeys: &[String], fetched_at: i64) -> anyhow::Result<()> { + for pk in pubkeys { + sqlx::query( + "UPDATE publishers SET profile_fetched_at = ? + WHERE pubkey = ? AND profile_fetched_at IS NULL", + ) + .bind(fetched_at) + .bind(pk) + .execute(pool) + .await?; + } + Ok(()) +} + // ─── Curation ───────────────────────────────────────────────────────────────── pub async fn insert_curation_item( @@ -766,19 +835,24 @@ pub struct RecentTorrentRow { pub title: String, pub category: String, pub pubkey: String, + pub publisher_name: Option, + pub publisher_nip05: Option, pub ingested_at: i64, } pub async fn recent_torrents(pool: &SqlitePool, limit: i64) -> anyhow::Result> { - let rows: Vec<(String, String, String, i64)> = sqlx::query_as( - "SELECT title, COALESCE(category,''), pubkey, ingested_at - FROM torrents ORDER BY ingested_at DESC LIMIT ?", + let rows: Vec<(String, String, String, Option, Option, i64)> = sqlx::query_as( + "SELECT t.title, COALESCE(t.category,''), t.pubkey, + p.name, p.nip05, t.ingested_at + FROM torrents t + LEFT JOIN publishers p ON p.pubkey = t.pubkey + ORDER BY t.ingested_at DESC LIMIT ?", ) .bind(limit) .fetch_all(pool) .await?; - Ok(rows.into_iter().map(|(title, category, pubkey, ingested_at)| RecentTorrentRow { - title, category, pubkey, ingested_at, + Ok(rows.into_iter().map(|(title, category, pubkey, publisher_name, publisher_nip05, ingested_at)| RecentTorrentRow { + title, category, pubkey, publisher_name, publisher_nip05, ingested_at, }).collect()) } @@ -789,6 +863,8 @@ pub struct BrowseTorrentRow { pub category: String, pub size_bytes: Option, pub pubkey: String, + pub publisher_name: Option, + pub publisher_nip05: Option, pub ingested_at: i64, } @@ -802,6 +878,8 @@ impl sqlx::FromRow<'_, sqlx::sqlite::SqliteRow> for BrowseTorrentRow { category: row.try_get("category").unwrap_or_default(), size_bytes: row.try_get("size_bytes")?, pubkey: row.try_get("pubkey")?, + publisher_name: row.try_get("publisher_name").unwrap_or(None), + publisher_nip05: row.try_get("publisher_nip05").unwrap_or(None), ingested_at: row.try_get("ingested_at")?, }) } @@ -815,9 +893,12 @@ pub async fn browse_torrents( ) -> anyhow::Result> { if q.is_empty() { sqlx::query_as( - "SELECT event_id, info_hash, title, COALESCE(category,'') AS category, - size_bytes, pubkey, ingested_at - FROM torrents ORDER BY ingested_at DESC LIMIT ? OFFSET ?", + "SELECT t.event_id, t.info_hash, t.title, COALESCE(t.category,'') AS category, + t.size_bytes, t.pubkey, t.ingested_at, + p.name AS publisher_name, p.nip05 AS publisher_nip05 + FROM torrents t + LEFT JOIN publishers p ON p.pubkey = t.pubkey + ORDER BY t.ingested_at DESC LIMIT ? OFFSET ?", ) .bind(limit) .bind(offset) @@ -827,8 +908,10 @@ pub async fn browse_torrents( } else { sqlx::query_as( "SELECT t.event_id, t.info_hash, t.title, COALESCE(t.category,'') AS category, - t.size_bytes, t.pubkey, t.ingested_at + t.size_bytes, t.pubkey, t.ingested_at, + p.name AS publisher_name, p.nip05 AS publisher_nip05 FROM torrents t + LEFT JOIN publishers p ON p.pubkey = t.pubkey WHERE t.rowid IN (SELECT rowid FROM torrents_fts WHERE torrents_fts MATCH ?) ORDER BY t.ingested_at DESC LIMIT ? OFFSET ?", ) diff --git a/src/main.rs b/src/main.rs index d0fb96b..38a9cc3 100644 --- a/src/main.rs +++ b/src/main.rs @@ -63,6 +63,7 @@ async fn main() -> anyhow::Result<()> { reader.start(); wot::follows::WotBuilder::new(cfg.clone(), pool.clone()).start(); + nostr::profiles::ProfileFetcher::new(cfg.clone(), pool.clone()).start(); if cfg.publisher.enabled { match nostr::signer::Signer::from_nsec(&cfg.publisher.identity.nsec) { diff --git a/src/nostr/mod.rs b/src/nostr/mod.rs index ab9bc34..67ea9e0 100644 --- a/src/nostr/mod.rs +++ b/src/nostr/mod.rs @@ -1,4 +1,5 @@ pub mod parser; +pub mod profiles; pub mod reader; pub mod signer; pub mod writer; diff --git a/src/nostr/profiles.rs b/src/nostr/profiles.rs new file mode 100644 index 0000000..9aa4c76 --- /dev/null +++ b/src/nostr/profiles.rs @@ -0,0 +1,152 @@ +use std::{sync::Arc, time::Duration}; + +use nostr_sdk::{prelude::*, RelayPoolNotification}; +use serde::Deserialize; +use sqlx::SqlitePool; +use tracing::{debug, info, warn}; + +use crate::{config::Config, db}; + +pub struct ProfileFetcher { + cfg: Arc, + pool: SqlitePool, +} + +#[derive(Deserialize, Default)] +struct Nip0Content { + name: Option, + display_name: Option, + about: Option, + picture: Option, + nip05: Option, +} + +impl ProfileFetcher { + pub fn new(cfg: Arc, pool: SqlitePool) -> Self { + ProfileFetcher { cfg, pool } + } + + pub fn start(self) { + tokio::spawn(async move { + // Let the reader settle before the first fetch. + tokio::time::sleep(Duration::from_secs(45)).await; + loop { + if let Err(e) = self.fetch_batch().await { + warn!("profile fetcher error: {e}"); + } + tokio::time::sleep(Duration::from_secs(3600)).await; + } + }); + } + + async fn fetch_batch(&self) -> anyhow::Result<()> { + let pubkeys = db::list_publishers_needing_profile(&self.pool, 150).await?; + if pubkeys.is_empty() { + return Ok(()); + } + info!("profiles: fetching kind 0 for {} publishers", pubkeys.len()); + + let authors: Vec = pubkeys.iter() + .filter_map(|pk| PublicKey::from_hex(pk).ok()) + .collect(); + + if authors.is_empty() { + return Ok(()); + } + + let filter = Filter::new() + .kind(Kind::Metadata) + .authors(authors); + + let events = self.fetch(filter, 20).await; + + let now = now_secs(); + // Mark the whole batch as attempted so we don't retry endlessly. + db::mark_profiles_fetched(&self.pool, &pubkeys, now).await?; + + let mut updated = 0usize; + for event in &events { + let pubkey_hex = event.pubkey.to_hex(); + let content: Nip0Content = match serde_json::from_str(&event.content) { + Ok(c) => c, + Err(_) => continue, + }; + + // Prefer display_name over name; skip empty strings. + let name = content.display_name.as_deref() + .filter(|s| !s.trim().is_empty()) + .or_else(|| content.name.as_deref().filter(|s| !s.trim().is_empty())); + + let nip05 = content.nip05.as_deref().filter(|s| !s.is_empty()); + let picture = content.picture.as_deref().filter(|s| !s.is_empty()); + let about = content.about.as_deref().filter(|s| !s.is_empty()); + + db::update_publisher_profile( + &self.pool, + &pubkey_hex, + name, + nip05, + picture, + about, + now, + ).await?; + updated += 1; + } + + info!("profiles: updated {updated}/{} publishers", pubkeys.len()); + Ok(()) + } + + async fn fetch(&self, filter: Filter, timeout_secs: u64) -> Vec { + let client = Client::default(); + for relay in &self.cfg.relays { + if let Err(e) = client.add_relay(relay.as_str()).await { + warn!(url = relay, "profiles: relay add failed: {e}"); + } + } + client.connect().await; + + if let Err(e) = client.subscribe(filter, None).await { + warn!("profiles: subscribe failed: {e}"); + return vec![]; + } + + let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::(); + let tx2 = tx.clone(); + let client_h = client.clone(); + + let handler = tokio::spawn(async move { + let _ = client_h.handle_notifications(|n| { + let tx = tx2.clone(); + async move { + if let RelayPoolNotification::Event { event, .. } = n { + let _ = tx.send(*event); + } + Ok(false) + } + }).await; + }); + + let mut events = Vec::new(); + let sleep = tokio::time::sleep(Duration::from_secs(timeout_secs)); + tokio::pin!(sleep); + loop { + tokio::select! { + Some(e) = rx.recv() => events.push(e), + _ = &mut sleep => break, + } + } + + handler.abort(); + let _ = client.disconnect().await; + debug!("profiles: received {} kind-0 events", events.len()); + events + } +} + +fn now_secs() -> i64 { + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs() as i64 +} diff --git a/src/ui/dashboard.rs b/src/ui/dashboard.rs index def8fd7..af5d8ec 100644 --- a/src/ui/dashboard.rs +++ b/src/ui/dashboard.rs @@ -72,16 +72,20 @@ async fn render(state: &AppState) -> anyhow::Result> { recent .iter() .map(|r| { + let pub_display = r.publisher_name.as_deref() + .or(r.publisher_nip05.as_deref()) + .map(esc) + .unwrap_or_else(|| short_pubkey(&r.pubkey)); format!( r#" {title} {cat} - {pk} + {pub_display} {ts} "#, title = esc(&r.title), cat = esc(&r.category), - pk = short_pubkey(&r.pubkey), + pk_full = esc(&r.pubkey), ts = fmt_ts_ago(r.ingested_at), ) }) diff --git a/src/ui/indexed.rs b/src/ui/indexed.rs index 84261e8..877f621 100644 --- a/src/ui/indexed.rs +++ b/src/ui/indexed.rs @@ -55,19 +55,22 @@ async fn render(state: &AppState, params: &Params) -> anyhow::Result {title} {cat} {size} - {pk} + {pub_display} {hash_short} {ts} "#, title = esc(&r.title), cat = esc(&r.category), pk_full = esc(&r.pubkey), - pk = short_pubkey(&r.pubkey), hash = esc(&r.info_hash), hash_short = &r.info_hash[..12], ts = fmt_ts(r.ingested_at), diff --git a/src/ui/mod.rs b/src/ui/mod.rs index 0186e2a..fa3e541 100644 --- a/src/ui/mod.rs +++ b/src/ui/mod.rs @@ -245,6 +245,14 @@ input.wide { flex: 1; min-width: 260px; } .msg-err { background: #2d0c0c; border: 1px solid #5a1a1a; color: var(--danger); border-radius: 8px; padding: .6rem 1rem; margin-bottom: 1rem; font-size: 13px; } form.ifrm { display: inline; } +.pub-identity { display: flex; align-items: center; gap: .5rem; } +.avatar { width: 28px; height: 28px; border-radius: 50%; object-fit: cover; flex-shrink: 0; } +.avatar-placeholder { width: 28px; height: 28px; border-radius: 50%; background: var(--bg3); + border: 1px solid var(--border); display: inline-flex; align-items: center; + justify-content: center; font-size: 11px; font-weight: 700; + color: var(--muted); flex-shrink: 0; } +.pub-name { font-size: 13px; font-weight: 500; } +.pub-nip05 { font-size: 11px; color: var(--muted); } .tabs { display: flex; gap: 0; margin-bottom: 1rem; border: 1px solid var(--border); border-radius: 6px; overflow: hidden; width: fit-content; } .tabs a { padding: .35rem .9rem; font-size: 12px; font-weight: 600; color: var(--muted); background: var(--bg2); } .tabs a:hover { color: var(--text); text-decoration: none; } diff --git a/src/ui/publishers.rs b/src/ui/publishers.rs index d665d0f..fa5124f 100644 --- a/src/ui/publishers.rs +++ b/src/ui/publishers.rs @@ -85,41 +85,61 @@ async fn render(state: &AppState, params: &ListParams) -> anyhow::Resultneutral"# }; - let wot = r.wot_level - .map(|l| format!("{l}")) - .unwrap_or_else(|| "—".into()); - - let last = r.last_seen - .map(fmt_ts_ago) - .unwrap_or_else(|| "—".into()); - + let wot = r.wot_level.map(|l| l.to_string()).unwrap_or_else(|| "—".into()); + let last = r.last_seen.map(fmt_ts_ago).unwrap_or_else(|| "—".into()); let pk = &r.pubkey; - // Action buttons — wrapped in inline forms - let block_btn = if r.blocked { - format!( - r#"
-
"# - ) - } else { - format!( - r#"
-
"# - ) + // Avatar: photo or initial placeholder + let avatar = match &r.picture { + Some(url) => format!( + r#""#, + esc(url) + ), + None => { + let initial = r.name.as_deref() + .or(r.nip05.as_deref()) + .and_then(|s| s.chars().next()) + .map(|c| c.to_uppercase().to_string()) + .unwrap_or_else(|| "?".into()); + format!(r#"{initial}"#) + } }; + // Display name line + nip05 sub-line + let display = match (&r.name, &r.nip05) { + (Some(n), Some(n05)) => format!( + r#"{}
{}"#, + esc(n), esc(n05) + ), + (Some(n), None) => format!(r#"{}"#, esc(n)), + (None, Some(n05)) => format!(r#"{}"#, esc(n05)), + (None, None) => { + let pending = if r.profile_fetched_at.is_none() { + r#""# + } else { "" }; + format!(r#"{} {pending}"#, short_pubkey(pk)) + } + }; + + let identity_cell = format!( + r#"
{avatar} {display}
"#, + pk_esc = esc(pk), + ); + + let block_btn = if r.blocked { + format!(r#"
"#) + } else { + format!(r#"
"#) + }; let mute_btn = if !r.muted { - format!( - r#"
-
"# - ) + format!(r#"
"#) } else { String::new() }; format!( r#" - {short} + {identity_cell} {trust_badge} {score:.2} {wot} {events} @@ -127,8 +147,6 @@ async fn render(state: &AppState, params: &ListParams) -> anyhow::Result{last} {block_btn} {mute_btn} "#, - pk = esc(pk), - short = short_pubkey(pk), score = r.trust, events = r.torrents_n, reports = r.report_count,