diff --git a/src-tauri/Cargo.lock b/src-tauri/Cargo.lock index 6b40756..5a3e552 100644 --- a/src-tauri/Cargo.lock +++ b/src-tauri/Cargo.lock @@ -68,6 +68,12 @@ dependencies = [ "derive_arbitrary", ] +[[package]] +name = "arrayvec" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50" + [[package]] name = "async-broadcast" version = "0.7.2" @@ -261,6 +267,22 @@ version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5e764a1d40d510daf35e07be9eb06e75770908c27d411ee6c92109c9840eaaf7" +[[package]] +name = "bitcoin-io" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2dee39a0ee5b4095224a0cfc6bf4cc1baf0f9624b96b367e53b66d974e51d953" + +[[package]] +name = "bitcoin_hashes" +version = "0.14.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26ec84b80c482df901772e931a9a681e26a1b9ee2302edeff23cb30328745c8b" +dependencies = [ + "bitcoin-io", + "hex-conservative", +] + [[package]] name = "bitflags" version = "1.3.2" @@ -732,6 +754,12 @@ dependencies = [ "syn 2.0.117", ] +[[package]] +name = "data-encoding" +version = "2.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7a1e2f27636f116493b8b860f5546edb47c8d8f8ea73e1d2a20be88e28d1fea" + [[package]] name = "data-url" version = "0.3.2" @@ -1645,6 +1673,15 @@ version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" +[[package]] +name = "hex-conservative" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fda06d18ac606267c40c04e41b9947729bf8b9efe74bd4e82b61a5f26a510b9f" +dependencies = [ + "arrayvec", +] + [[package]] name = "html5ever" version = "0.29.1" @@ -3699,6 +3736,26 @@ version = "1.2.0" source = 
"registry+https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" +[[package]] +name = "secp256k1" +version = "0.30.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b50c5943d326858130af85e049f2661ba3c78b26589b8ab98e65e80ae44a1252" +dependencies = [ + "bitcoin_hashes", + "rand 0.8.5", + "secp256k1-sys", +] + +[[package]] +name = "secp256k1-sys" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d4387882333d3aa8cb20530a17c69a3752e97837832f34f6dccc760e715001d9" +dependencies = [ + "cc", +] + [[package]] name = "security-framework" version = "2.11.1" @@ -3961,6 +4018,17 @@ dependencies = [ "stable_deref_trait", ] +[[package]] +name = "sha1" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba" +dependencies = [ + "cfg-if", + "cpufeatures", + "digest", +] + [[package]] name = "sha2" version = "0.10.9" @@ -4853,6 +4921,18 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-tungstenite" +version = "0.24.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "edc5f74e248dc973e0dbb7b74c7e0d6fcc301c694ff50049504004ef4d0cdcd9" +dependencies = [ + "futures-util", + "log", + "tokio", + "tungstenite", +] + [[package]] name = "tokio-util" version = "0.7.18" @@ -5075,6 +5155,24 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" +[[package]] +name = "tungstenite" +version = "0.24.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "18e5b8366ee7a95b16d32197d0b2604b43a0be89dc5fac9f8e96ccafbaedda8a" +dependencies = [ + "byteorder", + "bytes", + "data-encoding", + "http", + "httparse", + "log", + "rand 0.8.5", + "sha1", + "thiserror 1.0.69", + "utf-8", +] + 
[[package]] name = "typeid" version = "1.0.3" @@ -5222,10 +5320,14 @@ checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" name = "vega" version = "0.10.1" dependencies = [ + "futures-util", + "hex", "keyring", "rusqlite", + "secp256k1", "serde", "serde_json", + "sha2", "tauri", "tauri-build", "tauri-plugin-dialog", @@ -5235,6 +5337,9 @@ dependencies = [ "tauri-plugin-opener", "tauri-plugin-process", "tauri-plugin-updater", + "tokio", + "tokio-tungstenite", + "webkit2gtk", ] [[package]] diff --git a/src-tauri/Cargo.toml b/src-tauri/Cargo.toml index b52c201..db66172 100644 --- a/src-tauri/Cargo.toml +++ b/src-tauri/Cargo.toml @@ -30,4 +30,13 @@ tauri-plugin-http = "2.5.7" tauri-plugin-dialog = "2.6.0" tauri-plugin-fs = "2.4.5" tauri-plugin-notification = "2.3.3" +tokio = { version = "1", features = ["rt-multi-thread", "net", "sync", "macros", "time"] } +tokio-tungstenite = "0.24" +secp256k1 = { version = "0.30", features = ["global-context"] } +sha2 = "0.10" +hex = "0.4" +futures-util = "0.3" + +[target.'cfg(target_os = "linux")'.dependencies] +webkit2gtk = "2.0" diff --git a/src-tauri/src/lib.rs b/src-tauri/src/lib.rs index c4bc7e2..7248c54 100644 --- a/src-tauri/src/lib.rs +++ b/src-tauri/src/lib.rs @@ -7,6 +7,8 @@ use tauri::{ Manager, WindowEvent, }; +mod relay; + // ── OS keychain ───────────────────────────────────────────────────────────── // Keep legacy keyring service name so existing users don't lose their keys @@ -372,6 +374,39 @@ fn db_load_articles(state: tauri::State, limit: u32) -> Result) -> Option { + state.port() +} + +#[tauri::command] +fn relay_get_stats(state: tauri::State) -> Result { + let db_path = state.data_dir().join("relay.db"); + + // Get file size + let db_size_bytes = std::fs::metadata(&db_path) + .map(|m| m.len()) + .unwrap_or(0); + + // Open read-only connection for count query + let conn = rusqlite::Connection::open_with_flags( + &db_path, + rusqlite::OpenFlags::SQLITE_OPEN_READ_ONLY | 
rusqlite::OpenFlags::SQLITE_OPEN_NO_MUTEX, + ) + .map_err(|e| e.to_string())?; + + let event_count: i64 = conn + .query_row("SELECT COUNT(*) FROM events", [], |row| row.get(0)) + .map_err(|e| e.to_string())?; + + Ok(serde_json::json!({ + "event_count": event_count, + "db_size_bytes": db_size_bytes + })) +} + // ── App entry ──────────────────────────────────────────────────────────────── #[cfg_attr(mobile, tauri::mobile_entry_point)] @@ -387,10 +422,35 @@ pub fn run() { .setup(|app| { // ── SQLite ─────────────────────────────────────────────────────── let data_dir = app.path().app_data_dir()?; - let conn = open_db(data_dir) + let conn = open_db(data_dir.clone()) .unwrap_or_else(|_| Connection::open_in_memory().expect("in-memory SQLite")); app.manage(DbState(Mutex::new(conn))); + // ── Embedded relay ────────────────────────────────────────────── + match relay::start_relay(data_dir, 4869) { + Ok(handle) => { + app.manage(handle); + } + Err(e) => { + eprintln!("[relay] Failed to start embedded relay: {}", e); + } + } + + // ── WebKit GPU workaround for Linux (webkit2gtk 2.50+ black screen) ── + #[cfg(target_os = "linux")] + { + let main_window = app.get_webview_window("main").unwrap(); + main_window.with_webview(|webview| { + use webkit2gtk::{SettingsExt, WebViewExt}; + let wv = webview.inner(); + if let Some(settings) = wv.settings() { + settings.set_hardware_acceleration_policy( + webkit2gtk::HardwareAccelerationPolicy::Never, + ); + } + }).ok(); + } + // ── System tray ────────────────────────────────────────────────── let show_item = MenuItem::with_id(app, "show", "Open Vega", true, None::<&str>)?; let quit_item = MenuItem::with_id(app, "quit", "Quit", true, None::<&str>)?; @@ -458,6 +518,8 @@ pub fn run() { db_save_bookmarked_notes, db_load_bookmarked_notes, db_load_articles, + relay_get_port, + relay_get_stats, ]) .run(tauri::generate_context!()) .expect("error while running tauri application"); diff --git a/src-tauri/src/main.rs b/src-tauri/src/main.rs index 
7985eb2..21e829c 100644 --- a/src-tauri/src/main.rs +++ b/src-tauri/src/main.rs @@ -2,10 +2,12 @@ #![cfg_attr(not(debug_assertions), windows_subsystem = "windows")] fn main() { - // Disable WebKit DMA-BUF renderer which causes blank windows on some - // Wayland compositors (Hyprland, etc.) due to EGL_BAD_PARAMETER errors. + // WebKitGTK rendering workaround for Wayland/GPU issues. + // DMABUF: blank windows on some compositors (Hyprland) due to EGL errors. #[cfg(target_os = "linux")] - std::env::set_var("WEBKIT_DISABLE_DMABUF_RENDERER", "1"); + { + std::env::set_var("WEBKIT_DISABLE_DMABUF_RENDERER", "1"); + } vega_lib::run() } diff --git a/src-tauri/src/relay/db.rs b/src-tauri/src/relay/db.rs new file mode 100644 index 0000000..e35469c --- /dev/null +++ b/src-tauri/src/relay/db.rs @@ -0,0 +1,154 @@ +use crate::relay::event::Event; +use crate::relay::filter::Filter; +use rusqlite::{params, Connection}; +use std::path::Path; + +pub fn open_relay_db(data_dir: &Path) -> rusqlite::Result { + std::fs::create_dir_all(data_dir).ok(); + let path = data_dir.join("relay.db"); + let conn = Connection::open(path)?; + conn.execute_batch( + "PRAGMA journal_mode=WAL; + PRAGMA foreign_keys=ON; + + CREATE TABLE IF NOT EXISTS events ( + id TEXT PRIMARY KEY, + pubkey TEXT NOT NULL, + created_at INTEGER NOT NULL, + kind INTEGER NOT NULL, + content TEXT NOT NULL, + sig TEXT NOT NULL, + raw TEXT NOT NULL + ); + + CREATE INDEX IF NOT EXISTS idx_events_pubkey ON events(pubkey); + CREATE INDEX IF NOT EXISTS idx_events_kind ON events(kind); + CREATE INDEX IF NOT EXISTS idx_events_created ON events(created_at); + + CREATE TABLE IF NOT EXISTS event_tags ( + event_id TEXT NOT NULL REFERENCES events(id) ON DELETE CASCADE, + tag_name TEXT NOT NULL, + tag_value TEXT NOT NULL + ); + + CREATE INDEX IF NOT EXISTS idx_tags_name_value ON event_tags(tag_name, tag_value); + CREATE INDEX IF NOT EXISTS idx_tags_event ON event_tags(event_id);", + )?; + Ok(conn) +} + +/// Store an event. 
Returns true if the event was newly inserted, false if it already existed. +/// Handles replaceable (kind 0/3/10000-19999) and parameterized replaceable (30000-39999) events. +pub fn store_event(conn: &Connection, event: &Event, raw: &str) -> rusqlite::Result { + // Check if already exists (idempotent) + let exists: bool = conn.query_row( + "SELECT EXISTS(SELECT 1 FROM events WHERE id = ?1)", + [&event.id], + |row| row.get(0), + )?; + if exists { + return Ok(false); + } + + // Handle replaceable events: delete older event with same pubkey+kind + if event.is_replaceable() { + conn.execute( + "DELETE FROM events WHERE pubkey = ?1 AND kind = ?2 AND created_at < ?3", + params![event.pubkey, event.kind as i64, event.created_at as i64], + )?; + // If a newer one already exists, reject this one + let newer_exists: bool = conn.query_row( + "SELECT EXISTS(SELECT 1 FROM events WHERE pubkey = ?1 AND kind = ?2)", + params![event.pubkey, event.kind as i64], + |row| row.get(0), + )?; + if newer_exists { + return Ok(false); + } + } + + // Handle parameterized replaceable events: same pubkey+kind+d-tag + if event.is_parameterized_replaceable() { + let d_tag = event.d_tag().unwrap_or(""); + conn.execute( + "DELETE FROM events WHERE pubkey = ?1 AND kind = ?2 AND id IN \ + (SELECT e.id FROM events e \ + JOIN event_tags t ON t.event_id = e.id \ + WHERE e.pubkey = ?1 AND e.kind = ?2 AND t.tag_name = 'd' AND t.tag_value = ?3 \ + AND e.created_at < ?4)", + params![event.pubkey, event.kind as i64, d_tag, event.created_at as i64], + )?; + // Check if newer already exists + let newer_exists: bool = conn.query_row( + "SELECT EXISTS(SELECT 1 FROM events e \ + JOIN event_tags t ON t.event_id = e.id \ + WHERE e.pubkey = ?1 AND e.kind = ?2 AND t.tag_name = 'd' AND t.tag_value = ?3)", + params![event.pubkey, event.kind as i64, d_tag], + |row| row.get(0), + )?; + if newer_exists { + return Ok(false); + } + } + + // Insert the event + conn.execute( + "INSERT INTO events (id, pubkey, created_at, 
kind, content, sig, raw) \ + VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)", + params![ + event.id, + event.pubkey, + event.created_at as i64, + event.kind as i64, + event.content, + event.sig, + raw, + ], + )?; + + // Index single-letter tags + for tag in &event.tags { + if let (Some(name), Some(value)) = (tag.first(), tag.get(1)) { + if name.len() == 1 && name.chars().next().map_or(false, |c| c.is_ascii_alphabetic()) { + conn.execute( + "INSERT INTO event_tags (event_id, tag_name, tag_value) VALUES (?1, ?2, ?3)", + params![event.id, name, value], + )?; + } + } + } + + Ok(true) +} + +/// Query events matching any of the given filters. Returns raw JSON strings. +pub fn query_events(conn: &Connection, filters: &[Filter]) -> rusqlite::Result> { + let mut all_results: Vec = Vec::new(); + + for filter in filters { + let (where_clause, params, limit) = filter.to_sql(); + let limit_val = limit.unwrap_or(500).min(5000); + let sql = format!( + "SELECT e.raw FROM events e WHERE {} ORDER BY e.created_at DESC LIMIT {}", + where_clause, limit_val + ); + + let mut stmt = conn.prepare(&sql)?; + let param_refs: Vec<&dyn rusqlite::types::ToSql> = + params.iter().map(|v| v as &dyn rusqlite::types::ToSql).collect(); + let rows = stmt.query_map(param_refs.as_slice(), |row| row.get::<_, String>(0))?; + + for row in rows { + all_results.push(row?); + } + } + + all_results.dedup(); + Ok(all_results) +} + +/// Delete an event by ID. +pub fn delete_event(conn: &Connection, id: &str) -> rusqlite::Result<()> { + conn.execute("DELETE FROM events WHERE id = ?1", [id])?; + Ok(()) +} diff --git a/src-tauri/src/relay/event.rs b/src-tauri/src/relay/event.rs new file mode 100644 index 0000000..9e7801f --- /dev/null +++ b/src-tauri/src/relay/event.rs @@ -0,0 +1,76 @@ +use serde::{Deserialize, Serialize}; +use sha2::{Digest, Sha256}; + +/// Maximum raw event size (128 KB). 
+pub const MAX_EVENT_SIZE: usize = 128 * 1024; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Event { + pub id: String, + pub pubkey: String, + pub created_at: u64, + pub kind: u64, + pub tags: Vec>, + pub content: String, + pub sig: String, +} + +impl Event { + /// Verify that the `id` field matches SHA-256 of the NIP-01 canonical serialization. + pub fn verify_id(&self) -> bool { + let canonical = serde_json::json!([ + 0, + &self.pubkey, + self.created_at, + self.kind, + &self.tags, + &self.content + ]); + let hash = Sha256::digest(canonical.to_string().as_bytes()); + hex::encode(hash) == self.id + } + + /// Verify the BIP-340 Schnorr signature over the event id. + pub fn verify_sig(&self) -> bool { + let Ok(pubkey_bytes) = hex::decode(&self.pubkey) else { + return false; + }; + let Ok(sig_bytes) = hex::decode(&self.sig) else { + return false; + }; + let Ok(msg_bytes) = hex::decode(&self.id) else { + return false; + }; + + let Ok(xonly) = secp256k1::XOnlyPublicKey::from_slice(&pubkey_bytes) else { + return false; + }; + let Ok(sig) = secp256k1::schnorr::Signature::from_slice(&sig_bytes) else { + return false; + }; + + secp256k1::SECP256K1 + .verify_schnorr(&sig, &msg_bytes, &xonly) + .is_ok() + } + + /// Returns true if this event kind is replaceable (NIP-01). + /// Kind 0, 3, and 10000-19999 are replaceable (same pubkey+kind = replace). + pub fn is_replaceable(&self) -> bool { + self.kind == 0 || self.kind == 3 || (10_000..20_000).contains(&self.kind) + } + + /// Returns true if this event kind is parameterized-replaceable (NIP-01). + /// Kind 30000-39999: same pubkey+kind+d-tag = replace. + pub fn is_parameterized_replaceable(&self) -> bool { + (30_000..40_000).contains(&self.kind) + } + + /// Get the `d` tag value (for parameterized replaceable events). 
+ pub fn d_tag(&self) -> Option<&str> { + self.tags + .iter() + .find(|t| t.first().map(|s| s.as_str()) == Some("d")) + .and_then(|t| t.get(1).map(|s| s.as_str())) + } +} diff --git a/src-tauri/src/relay/filter.rs b/src-tauri/src/relay/filter.rs new file mode 100644 index 0000000..a09682b --- /dev/null +++ b/src-tauri/src/relay/filter.rs @@ -0,0 +1,182 @@ +use crate::relay::event::Event; +use rusqlite::types::Value; +use serde::Deserialize; +use std::collections::HashMap; + +#[derive(Debug, Clone, Deserialize)] +pub struct Filter { + pub ids: Option>, + pub authors: Option>, + pub kinds: Option>, + pub since: Option, + pub until: Option, + pub limit: Option, + /// Generic tag filters: #e, #p, #d, etc. + /// During deserialization we capture any field starting with '#'. + #[serde(flatten)] + pub generic_tags: HashMap>, +} + +impl Filter { + /// Check if an event matches this filter in memory (for live fan-out). + /// AND across fields, OR within each field. + pub fn matches(&self, event: &Event) -> bool { + // ids — prefix match + if let Some(ref ids) = self.ids { + if !ids.iter().any(|prefix| event.id.starts_with(prefix)) { + return false; + } + } + + // authors — prefix match + if let Some(ref authors) = self.authors { + if !authors.iter().any(|prefix| event.pubkey.starts_with(prefix)) { + return false; + } + } + + // kinds + if let Some(ref kinds) = self.kinds { + if !kinds.contains(&event.kind) { + return false; + } + } + + // since + if let Some(since) = self.since { + if event.created_at < since { + return false; + } + } + + // until + if let Some(until) = self.until { + if event.created_at > until { + return false; + } + } + + // Generic tag filters (#e, #p, #d, etc.) + for (key, values) in &self.generic_tags { + // key is like "#e", "#p", etc. 
+ if !key.starts_with('#') || key.len() < 2 { + continue; + } + let tag_name = &key[1..]; + let event_tag_values: Vec<&str> = event + .tags + .iter() + .filter(|t| t.first().map(|s| s.as_str()) == Some(tag_name)) + .filter_map(|t| t.get(1).map(|s| s.as_str())) + .collect(); + + if !values.iter().any(|v| event_tag_values.contains(&v.as_str())) { + return false; + } + } + + true + } + + /// Build a SQL WHERE clause + params for querying the events table. + /// Returns (where_clause, params, limit). + pub fn to_sql(&self) -> (String, Vec, Option) { + let mut conditions: Vec = Vec::new(); + let mut params: Vec = Vec::new(); + let mut param_idx = 1usize; + + // ids — prefix match + if let Some(ref ids) = self.ids { + let clauses: Vec = ids + .iter() + .map(|prefix| { + let p = format!("?{}", param_idx); + param_idx += 1; + params.push(Value::Text(format!("{}%", prefix))); + format!("e.id LIKE {}", p) + }) + .collect(); + conditions.push(format!("({})", clauses.join(" OR "))); + } + + // authors — prefix match + if let Some(ref authors) = self.authors { + let clauses: Vec = authors + .iter() + .map(|prefix| { + let p = format!("?{}", param_idx); + param_idx += 1; + params.push(Value::Text(format!("{}%", prefix))); + format!("e.pubkey LIKE {}", p) + }) + .collect(); + conditions.push(format!("({})", clauses.join(" OR "))); + } + + // kinds + if let Some(ref kinds) = self.kinds { + let clauses: Vec = kinds + .iter() + .map(|k| { + let p = format!("?{}", param_idx); + param_idx += 1; + params.push(Value::Integer(*k as i64)); + format!("e.kind = {}", p) + }) + .collect(); + conditions.push(format!("({})", clauses.join(" OR "))); + } + + // since + if let Some(since) = self.since { + let p = format!("?{}", param_idx); + param_idx += 1; + params.push(Value::Integer(since as i64)); + conditions.push(format!("e.created_at >= {}", p)); + } + + // until + if let Some(until) = self.until { + let p = format!("?{}", param_idx); + param_idx += 1; + params.push(Value::Integer(until as 
i64)); + conditions.push(format!("e.created_at <= {}", p)); + } + + // Generic tag filters — JOIN on event_tags + for (key, values) in &self.generic_tags { + if !key.starts_with('#') || key.len() < 2 { + continue; + } + let tag_name = &key[1..]; + + let value_clauses: Vec = values + .iter() + .map(|v| { + let p = format!("?{}", param_idx); + param_idx += 1; + params.push(Value::Text(v.clone())); + format!("t.tag_value = {}", p) + }) + .collect(); + + let tag_name_p = format!("?{}", param_idx); + param_idx += 1; + params.push(Value::Text(tag_name.to_string())); + + conditions.push(format!( + "EXISTS (SELECT 1 FROM event_tags t WHERE t.event_id = e.id AND t.tag_name = {} AND ({}))", + tag_name_p, + value_clauses.join(" OR ") + )); + } + + let where_clause = if conditions.is_empty() { + "1=1".to_string() + } else { + conditions.join(" AND ") + }; + + (where_clause, params, self.limit) + } +} diff --git a/src-tauri/src/relay/mod.rs b/src-tauri/src/relay/mod.rs new file mode 100644 index 0000000..0a2b044 --- /dev/null +++ b/src-tauri/src/relay/mod.rs @@ -0,0 +1,61 @@ +pub mod db; +pub mod event; +pub mod filter; +pub mod server; +pub mod sub; + +use std::path::PathBuf; +use std::sync::{Arc, Mutex}; + +pub struct RelayHandle { + shutdown_tx: tokio::sync::watch::Sender, + thread: Option>, + port: Arc>>, + data_dir: PathBuf, +} + +impl RelayHandle { + pub fn port(&self) -> Option { + *self.port.lock().unwrap_or_else(|e| e.into_inner()) + } + + pub fn data_dir(&self) -> &PathBuf { + &self.data_dir + } +} + +impl Drop for RelayHandle { + fn drop(&mut self) { + println!("[relay] Sending shutdown signal"); + let _ = self.shutdown_tx.send(true); + if let Some(handle) = self.thread.take() { + let _ = handle.join(); + } + println!("[relay] Shutdown complete"); + } +} + +pub fn start_relay(data_dir: PathBuf, port: u16) -> Result> { + let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false); + let bound_port: Arc>> = Arc::new(Mutex::new(None)); + let bound_port_clone = 
bound_port.clone(); + let data_dir_clone = data_dir.clone(); + + let thread = std::thread::Builder::new() + .name("vega-relay".into()) + .spawn(move || { + let rt = tokio::runtime::Builder::new_multi_thread() + .worker_threads(2) + .enable_all() + .build() + .expect("Failed to create relay tokio runtime"); + rt.block_on(server::run(data_dir_clone, port, shutdown_rx, bound_port_clone)); + })?; + + Ok(RelayHandle { + shutdown_tx, + thread: Some(thread), + port: bound_port, + data_dir, + }) +} diff --git a/src-tauri/src/relay/server.rs b/src-tauri/src/relay/server.rs new file mode 100644 index 0000000..cec3b08 --- /dev/null +++ b/src-tauri/src/relay/server.rs @@ -0,0 +1,298 @@ +use crate::relay::db; +use crate::relay::event::{Event, MAX_EVENT_SIZE}; +use crate::relay::filter::Filter; +use crate::relay::sub::SubscriptionMap; +use futures_util::{SinkExt, StreamExt}; +use rusqlite::Connection; +use std::net::SocketAddr; +use std::path::PathBuf; +use std::sync::{Arc, Mutex}; +use tokio::net::{TcpListener, TcpStream}; +use tokio::sync::{broadcast, watch}; +use tokio_tungstenite::tungstenite::Message; + +/// Shared relay state across all connections. +struct RelayState { + db: Mutex, + broadcast_tx: broadcast::Sender, +} + +/// Run the relay server. Blocks until shutdown signal. 
+pub async fn run( + data_dir: PathBuf, + port: u16, + mut shutdown_rx: watch::Receiver, + bound_port: Arc>>, +) { + let conn = match db::open_relay_db(&data_dir) { + Ok(c) => c, + Err(e) => { + eprintln!("[relay] Failed to open relay DB: {}", e); + return; + } + }; + + let (broadcast_tx, _) = broadcast::channel::(1024); + let state = Arc::new(RelayState { + db: Mutex::new(conn), + broadcast_tx, + }); + + // Try the requested port, then fallback ports + let listener = match bind_with_fallback(port).await { + Some(l) => l, + None => { + eprintln!("[relay] Could not bind to any port in range {}-{}", port, port + 10); + return; + } + }; + + let local_addr = listener.local_addr().unwrap(); + if let Ok(mut p) = bound_port.lock() { + *p = Some(local_addr.port()); + } + println!("[relay] Listening on ws://{}", local_addr); + + loop { + tokio::select! { + result = listener.accept() => { + match result { + Ok((stream, addr)) => { + let state = state.clone(); + let shutdown_rx = shutdown_rx.clone(); + tokio::spawn(handle_connection(stream, addr, state, shutdown_rx)); + } + Err(e) => { + eprintln!("[relay] Accept error: {}", e); + } + } + } + _ = shutdown_rx.changed() => { + println!("[relay] Shutting down"); + break; + } + } + } +} + +async fn bind_with_fallback(port: u16) -> Option { + for p in port..=port.saturating_add(10) { + match TcpListener::bind(("127.0.0.1", p)).await { + Ok(l) => return Some(l), + Err(e) => { + if p == port { + eprintln!("[relay] Port {} in use ({}), trying fallbacks...", p, e); + } + } + } + } + None +} + +async fn handle_connection( + stream: TcpStream, + addr: SocketAddr, + state: Arc, + mut shutdown_rx: watch::Receiver, +) { + let ws = match tokio_tungstenite::accept_async(stream).await { + Ok(ws) => ws, + Err(e) => { + eprintln!("[relay] WebSocket handshake failed from {}: {}", addr, e); + return; + } + }; + + let (mut ws_tx, mut ws_rx) = ws.split(); + let mut subs = SubscriptionMap::new(); + let mut broadcast_rx = 
state.broadcast_tx.subscribe(); + + loop { + tokio::select! { + // Incoming WebSocket messages from this client + msg = ws_rx.next() => { + match msg { + Some(Ok(Message::Text(text))) => { + let responses = handle_message(&text, &state, &mut subs); + for resp in responses { + if ws_tx.send(Message::Text(resp.into())).await.is_err() { + return; + } + } + } + Some(Ok(Message::Close(_))) | None => return, + Some(Err(_)) => return, + _ => {} // ping/pong/binary — ignore + } + } + // Broadcast events from other connections + result = broadcast_rx.recv() => { + match result { + Ok(raw) => { + // Parse the event to check against subscriptions + if let Ok(event) = serde_json::from_str::(&raw) { + let matching = subs.matching_sub_ids(&event); + for sub_id in matching { + let msg = serde_json::json!(["EVENT", sub_id, serde_json::from_str::(&raw).unwrap_or_default()]); + if ws_tx.send(Message::Text(msg.to_string().into())).await.is_err() { + return; + } + } + } + } + Err(broadcast::error::RecvError::Lagged(n)) => { + eprintln!("[relay] Client {} lagged, skipped {} events", addr, n); + } + Err(broadcast::error::RecvError::Closed) => return, + } + } + // Shutdown signal + _ = shutdown_rx.changed() => return, + } + } +} + +/// Handle a single NIP-01 message. Returns response messages to send back. 
+fn handle_message(text: &str, state: &RelayState, subs: &mut SubscriptionMap) -> Vec { + let parsed: serde_json::Value = match serde_json::from_str(text) { + Ok(v) => v, + Err(_) => return vec![notice("error: invalid JSON")], + }; + + let arr = match parsed.as_array() { + Some(a) if !a.is_empty() => a, + _ => return vec![notice("error: expected JSON array")], + }; + + let msg_type = match arr[0].as_str() { + Some(s) => s, + None => return vec![notice("error: first element must be a string")], + }; + + match msg_type { + "EVENT" => handle_event(arr, state), + "REQ" => handle_req(arr, state, subs), + "CLOSE" => handle_close(arr, subs), + _ => vec![notice(&format!("error: unknown message type: {}", msg_type))], + } +} + +fn handle_event(arr: &[serde_json::Value], state: &RelayState) -> Vec { + if arr.len() < 2 { + return vec![notice("error: EVENT requires an event object")]; + } + + let raw = arr[1].to_string(); + + // Size check + if raw.len() > MAX_EVENT_SIZE { + let id = arr[1]["id"].as_str().unwrap_or(""); + return vec![ok_msg(id, false, "error: event too large")]; + } + + let event: Event = match serde_json::from_value(arr[1].clone()) { + Ok(e) => e, + Err(e) => return vec![ok_msg("", false, &format!("error: invalid event: {}", e))], + }; + + // Verify ID + if !event.verify_id() { + return vec![ok_msg(&event.id, false, "error: invalid event id")]; + } + + // Verify signature + if !event.verify_sig() { + return vec![ok_msg(&event.id, false, "error: invalid signature")]; + } + + // Store + let conn = match state.db.lock() { + Ok(c) => c, + Err(_) => return vec![ok_msg(&event.id, false, "error: internal database error")], + }; + + match db::store_event(&conn, &event, &raw) { + Ok(true) => { + // Broadcast to other connections + let _ = state.broadcast_tx.send(raw); + vec![ok_msg(&event.id, true, "")] + } + Ok(false) => { + // Already exists or rejected (older replaceable) + vec![ok_msg(&event.id, true, "duplicate:")] + } + Err(e) => vec![ok_msg(&event.id, false, 
&format!("error: {}", e))], + } +} + +fn handle_req(arr: &[serde_json::Value], state: &RelayState, subs: &mut SubscriptionMap) -> Vec { + if arr.len() < 3 { + return vec![notice("error: REQ requires subscription id and at least one filter")]; + } + + let sub_id = match arr[1].as_str() { + Some(s) => s.to_string(), + None => return vec![notice("error: subscription id must be a string")], + }; + + // Parse filters (elements 2..n) + let mut filters: Vec = Vec::new(); + for val in &arr[2..] { + match serde_json::from_value(val.clone()) { + Ok(f) => filters.push(f), + Err(e) => return vec![notice(&format!("error: invalid filter: {}", e))], + } + } + + // Register/replace subscription (NIP-01: same sub_id replaces) + subs.upsert(sub_id.clone(), filters.clone()); + + // Query DB for stored events matching filters + let conn = match state.db.lock() { + Ok(c) => c, + Err(_) => return vec![notice("error: internal database error")], + }; + + let mut responses: Vec = Vec::new(); + + match db::query_events(&conn, &filters) { + Ok(raws) => { + for raw in raws { + let event_val: serde_json::Value = + serde_json::from_str(&raw).unwrap_or_default(); + let msg = serde_json::json!(["EVENT", &sub_id, event_val]); + responses.push(msg.to_string()); + } + } + Err(e) => { + responses.push(notice(&format!("error: query failed: {}", e))); + } + } + + // EOSE + responses.push(serde_json::json!(["EOSE", &sub_id]).to_string()); + + responses +} + +fn handle_close(arr: &[serde_json::Value], subs: &mut SubscriptionMap) -> Vec { + if arr.len() < 2 { + return vec![notice("error: CLOSE requires a subscription id")]; + } + + let sub_id = match arr[1].as_str() { + Some(s) => s, + None => return vec![notice("error: subscription id must be a string")], + }; + + subs.remove(sub_id); + vec![serde_json::json!(["CLOSED", sub_id, ""]).to_string()] +} + +fn notice(msg: &str) -> String { + serde_json::json!(["NOTICE", msg]).to_string() +} + +fn ok_msg(id: &str, success: bool, message: &str) -> String { + 
serde_json::json!(["OK", id, success, message]).to_string() +} diff --git a/src-tauri/src/relay/sub.rs b/src-tauri/src/relay/sub.rs new file mode 100644 index 0000000..4aec56c --- /dev/null +++ b/src-tauri/src/relay/sub.rs @@ -0,0 +1,51 @@ +use crate::relay::event::Event; +use crate::relay::filter::Filter; +use std::collections::HashMap; + +pub struct Subscription { + pub id: String, + pub filters: Vec, +} + +impl Subscription { + /// Returns true if the event matches any filter in this subscription. + pub fn matches(&self, event: &Event) -> bool { + self.filters.iter().any(|f| f.matches(event)) + } +} + +/// Per-connection subscription state. +pub struct SubscriptionMap { + subs: HashMap, +} + +impl SubscriptionMap { + pub fn new() -> Self { + Self { + subs: HashMap::new(), + } + } + + /// Add or replace a subscription. Returns the old one if replaced. + pub fn upsert(&mut self, id: String, filters: Vec) -> Option { + let sub = Subscription { + id: id.clone(), + filters, + }; + self.subs.insert(id, sub) + } + + /// Remove a subscription by ID. + pub fn remove(&mut self, id: &str) -> Option { + self.subs.remove(id) + } + + /// Get all subscription IDs that match the given event. 
+ pub fn matching_sub_ids(&self, event: &Event) -> Vec { + self.subs + .values() + .filter(|sub| sub.matches(event)) + .map(|sub| sub.id.clone()) + .collect() + } +} diff --git a/src/components/shared/SettingsView.tsx b/src/components/shared/SettingsView.tsx index 94d8ab0..f7a7fba 100644 --- a/src/components/shared/SettingsView.tsx +++ b/src/components/shared/SettingsView.tsx @@ -1,4 +1,4 @@ -import { useState } from "react"; +import { useState, useEffect } from "react"; import { save } from "@tauri-apps/plugin-dialog"; import { writeTextFile } from "@tauri-apps/plugin-fs"; import { useUserStore } from "../../stores/user"; @@ -8,12 +8,22 @@ import { useMuteStore } from "../../stores/mute"; import { useBookmarkStore } from "../../stores/bookmark"; import { getStoredRelayUrls } from "../../lib/nostr"; import { useProfile } from "../../hooks/useProfile"; +import { profileName } from "../../lib/utils"; import { NWCWizard } from "./NWCWizard"; import { getNotificationSettings, saveNotificationSettings, ensurePermission } from "../../lib/notifications"; +import { + isLocalRelayEnabled, + setLocalRelayEnabled, + connectLocalRelay, + disconnectLocalRelay, + getRelayPort, + getRelayStats, + type RelayStats, +} from "../../lib/localRelay"; function MutedRow({ pubkey, onUnmute }: { pubkey: string; onUnmute: () => void }) { const profile = useProfile(pubkey); - const name = profile?.displayName || profile?.name || pubkey.slice(0, 12) + "…"; + const name = profileName(profile, pubkey.slice(0, 12) + "…"); return (
        {profile?.picture && (
@@ -355,6 +365,77 @@ function FontSizeSection() {
   );
 }
 
+function formatBytes(bytes: number): string {
+  if (bytes < 1024) return `${bytes} B`;
+  if (bytes < 1024 * 1024) return `${(bytes / 1024).toFixed(1)} KB`;
+  return `${(bytes / (1024 * 1024)).toFixed(1)} MB`;
+}
+
+function ExperimentalSection() {
+  const [enabled, setEnabled] = useState(isLocalRelayEnabled);
+  const [port, setPort] = useState<number | null>(null);
+  const [stats, setStats] = useState<RelayStats | null>(null);
+
+  useEffect(() => {
+    if (enabled) {
+      getRelayPort().then(setPort);
+      getRelayStats().then(setStats);
+    }
+  }, [enabled]);
+
+  const toggle = () => {
+    const next = !enabled;
+    setEnabled(next);
+    setLocalRelayEnabled(next);
+    if (next) {
+      connectLocalRelay().catch(() => {});
+    } else {
+      disconnectLocalRelay();
+      setPort(null);
+      setStats(null);
+    }
+  };
+
+  return (
+

+ Experimental +

+

+ Features under development. May change or be removed. +

+ +

+ Run a local Nostr relay for offline access and faster reads. +

+ {enabled && (port || stats) && ( +
+ {port &&

Running on port {port}

} + {stats && ( +

+ {stats.event_count} events stored · {formatBytes(stats.db_size_bytes)} +

+ )} +
+ )} +
+ ); +} + export function SettingsView() { return (
@@ -367,6 +448,7 @@ export function SettingsView() {
+      <ExperimentalSection />
diff --git a/src/lib/localRelay.ts b/src/lib/localRelay.ts
new file mode 100644
index 0000000..6e81e03
--- /dev/null
+++ b/src/lib/localRelay.ts
@@ -0,0 +1,221 @@
+import { invoke } from "@tauri-apps/api/core";
+import { NDKEvent, NDKFilter, NDKKind, NDKRelay } from "@nostr-dev-kit/ndk";
+import { getNDK, fetchWithTimeout, FEED_TIMEOUT } from "./nostr";
+
+const STORAGE_KEY = "vega_local_relay_enabled";
+const LAST_SYNC_KEY = "vega_local_relay_last_sync";
+const LOCAL_RELAY_PREFIX = "ws://127.0.0.1:48";
+
+export function isLocalRelayEnabled(): boolean {
+  return localStorage.getItem(STORAGE_KEY) === "true";
+}
+
+export function setLocalRelayEnabled(enabled: boolean): void {
+  localStorage.setItem(STORAGE_KEY, enabled ? "true" : "false");
+}
+
+export async function getRelayPort(): Promise<number | null> {
+  try {
+    return await invoke<number | null>("relay_get_port");
+  } catch {
+    return null;
+  }
+}
+
+export interface RelayStats {
+  event_count: number;
+  db_size_bytes: number;
+}
+
+export async function getRelayStats(): Promise<RelayStats | null> {
+  try {
+    return await invoke<RelayStats>("relay_get_stats");
+  } catch {
+    return null;
+  }
+}
+
+/**
+ * Add the local relay to NDK's pool without persisting to the relay list.
+ * Retries once after 500ms if port isn't available yet (race with server startup).
+ */
+export async function connectLocalRelay(): Promise<void> {
+  let port = await getRelayPort();
+  if (port === null) {
+    await new Promise((r) => setTimeout(r, 500));
+    port = await getRelayPort();
+  }
+  if (port === null) return;
+
+  const url = `ws://127.0.0.1:${port}`;
+  const instance = getNDK();
+
+  if (instance.pool?.relays.has(url)) return;
+
+  const relay = new NDKRelay(url, undefined, instance);
+  instance.pool?.addRelay(relay, true);
+  console.log(`[Vega] Local relay connected: ${url}`);
+}
+
+/**
+ * Remove any local relay (ws://127.0.0.1:48XX) from NDK's pool.
+ * Does NOT touch the stored relay list.
+ */
+export function disconnectLocalRelay(): void {
+  const instance = getNDK();
+  if (!instance.pool?.relays) return;
+
+  for (const [url, relay] of instance.pool.relays.entries()) {
+    if (url.startsWith(LOCAL_RELAY_PREFIX)) {
+      relay.disconnect();
+      instance.pool.relays.delete(url);
+      console.log(`[Vega] Local relay disconnected: ${url}`);
+    }
+  }
+}
+
+// ── Catch-up sync ──────────────────────────────────────────────────────────
+
+function getLastSyncTimestamp(): number | null {
+  const stored = localStorage.getItem(LAST_SYNC_KEY);
+  return stored ? parseInt(stored, 10) : null;
+}
+
+function setLastSyncTimestamp(ts: number): void {
+  localStorage.setItem(LAST_SYNC_KEY, String(ts));
+}
+
+/**
+ * Write events to the local relay via a direct WebSocket connection.
+ * Bypasses NDK's publish to avoid overwhelming WebKit.
+ * Sends NIP-01 EVENT messages one at a time sequentially.
+ */
+async function writeEventsToLocalRelay(events: NDKEvent[]): Promise<number> {
+  if (events.length === 0) return 0;
+
+  const port = await getRelayPort();
+  if (!port) return 0;
+
+  const url = `ws://127.0.0.1:${port}`;
+
+  return new Promise<number>((resolve) => {
+    const ws = new WebSocket(url);
+    let written = 0;
+    let idx = 0;
+
+    const sendNext = () => {
+      if (idx >= events.length) {
+        ws.close();
+        resolve(written);
+        return;
+      }
+      const event = events[idx++];
+      const raw = event.rawEvent();
+      ws.send(JSON.stringify(["EVENT", raw]));
+    };
+
+    ws.onopen = () => sendNext();
+
+    ws.onmessage = (msg) => {
+      try {
+        const parsed = JSON.parse(msg.data);
+        // ["OK", id, success, message]
+        if (parsed[0] === "OK" && parsed[2]) {
+          written++;
+        }
+      } catch { /* ignore */ }
+      sendNext();
+    };
+
+    ws.onerror = () => resolve(written);
+    ws.onclose = () => resolve(written);
+
+    // Safety timeout — don't hang forever
+    setTimeout(() => {
+      ws.close();
+      resolve(written);
+    }, 30000);
+  });
+}
+
+/**
+ * Sync recent events from remote relays into the local relay.
+ * Non-blocking — intended to run in background after connect.
+ */
+export async function syncToLocalRelay(
+  userPubkey: string,
+  followPubkeys: string[],
+): Promise<void> {
+  console.log("[Vega] Starting local relay catch-up...");
+  const syncStart = performance.now();
+  const instance = getNDK();
+  const now = Math.floor(Date.now() / 1000);
+  const lastSync = getLastSyncTimestamp();
+
+  // Determine time windows
+  const feedSince = lastSync ?? (now - 24 * 3600); // 24h or since last sync
+  const mentionsSince = lastSync ?? (now - 7 * 24 * 3600); // 7 days or since last sync
+
+  const allEvents: NDKEvent[] = [];
+
+  // 1. User's own notes (last 50, all-time for first sync)
+  try {
+    const filter: NDKFilter = { kinds: [NDKKind.Text], authors: [userPubkey], limit: 50 };
+    const events = await fetchWithTimeout(instance, filter, FEED_TIMEOUT);
+    allEvents.push(...Array.from(events));
+  } catch { /* continue */ }
+
+  // 2. User's profile (kind 0) and contact list (kind 3)
+  try {
+    const filter: NDKFilter = { kinds: [0 as NDKKind, 3 as NDKKind], authors: [userPubkey], limit: 2 };
+    const events = await fetchWithTimeout(instance, filter, FEED_TIMEOUT);
+    allEvents.push(...Array.from(events));
+  } catch { /* continue */ }
+
+  // 3. Follow feed (since last sync or 24h)
+  if (followPubkeys.length > 0) {
+    try {
+      // Batch follows to avoid oversized filters
+      const batchSize = 50;
+      for (let i = 0; i < followPubkeys.length; i += batchSize) {
+        const batch = followPubkeys.slice(i, i + batchSize);
+        const filter: NDKFilter = {
+          kinds: [NDKKind.Text],
+          authors: batch,
+          since: feedSince,
+          limit: 100,
+        };
+        const events = await fetchWithTimeout(instance, filter, FEED_TIMEOUT);
+        allEvents.push(...Array.from(events));
+      }
+    } catch { /* continue */ }
+  }
+
+  // 4. Mentions (since last sync or 7 days)
+  try {
+    const filter: NDKFilter = {
+      kinds: [NDKKind.Text],
+      "#p": [userPubkey],
+      since: mentionsSince,
+      limit: 100,
+    };
+    const events = await fetchWithTimeout(instance, filter, 12000);
+    allEvents.push(...Array.from(events));
+  } catch { /* continue */ }
+
+  // Deduplicate by event ID
+  const seen = new Set<string>();
+  const unique = allEvents.filter((e) => {
+    const id = e.id;
+    if (seen.has(id)) return false;
+    seen.add(id);
+    return true;
+  });
+
+  // Write to local relay
+  const written = await writeEventsToLocalRelay(unique);
+  setLastSyncTimestamp(now);
+
+  const elapsed = Math.round(performance.now() - syncStart);
+  console.log(`[Vega] Synced ${written}/${unique.length} events to local relay (${elapsed}ms)`);
+}
diff --git a/src/lib/nostr/core.ts b/src/lib/nostr/core.ts
index 60a3667..d5fac53 100644
--- a/src/lib/nostr/core.ts
+++ b/src/lib/nostr/core.ts
@@ -127,6 +127,14 @@ export async function resetNDK(): Promise<void> {
   console.log("[Vega] NDK instance reset — connecting fresh relays");
   await ndk.connect();
   await waitForConnectedRelay(ndk, 5000);
+
+  // Re-add local relay if enabled (dynamic import to avoid circular dependency)
+  import("../localRelay").then(({ isLocalRelayEnabled, connectLocalRelay }) => {
+    if (isLocalRelayEnabled()) {
+      connectLocalRelay().catch(() => {});
+    }
+  }).catch(() => {});
+
   const relays = Array.from(ndk.pool?.relays?.values() ??
[]); const connected = relays.filter((r) => r.connected).length; console.log(`[Vega] Fresh connection: ${connected}/${relays.length} relays connected`); diff --git a/src/stores/feed.ts b/src/stores/feed.ts index e31496c..43c2327 100644 --- a/src/stores/feed.ts +++ b/src/stores/feed.ts @@ -5,6 +5,8 @@ import { seedReactionsCache } from "../hooks/useReactions"; import { useToastStore } from "./toast"; import { dbLoadFeed, dbSaveNotes } from "../lib/db"; import { diagWrapFetch, logDiag, startRelaySnapshots, getRelayStates } from "../lib/feedDiagnostics"; +// Local relay imports deferred to avoid circular dependency +// import { isLocalRelayEnabled, connectLocalRelay } from "../lib/localRelay"; const TRENDING_CACHE_KEY = "wrystr_trending_cache"; const TRENDING_TTL = 10 * 60 * 1000; // 10 minutes @@ -50,8 +52,30 @@ export const useFeedStore = create((set, get) => ({ try { set({ error: null }); const connectStart = performance.now(); - await connectToRelays(); + // connectToRelays() can hang if NDK's instance.connect() never resolves — safety timeout + await Promise.race([ + connectToRelays(), + new Promise((resolve) => setTimeout(resolve, 15000)), + ]); set({ connected: true }); + + // Connect local embedded relay if enabled, then sync recent events + try { + const { isLocalRelayEnabled, connectLocalRelay, syncToLocalRelay } = await import("../lib/localRelay"); + if (isLocalRelayEnabled()) { + await connectLocalRelay(); + const { useUserStore } = await import("./user"); + const { pubkey, follows } = useUserStore.getState(); + if (pubkey) { + syncToLocalRelay(pubkey, follows).catch((err) => + console.warn("[Vega] Local relay sync failed:", err), + ); + } + } + } catch (err) { + console.warn("[Vega] Local relay setup failed:", err); + } + const connectMs = Math.round(performance.now() - connectStart); logDiag({ ts: new Date().toISOString(),