Add live feed subscription, timeouts on all relay fetches

Global feed now uses a persistent live subscription (closeOnEose: false)
so new notes stream in real-time instead of requiring manual refresh.
Inspired by Wisp's streaming architecture.

Every fetchEvents call across the entire codebase now uses
fetchWithTimeout with groupable: false — prevents NDK from
batching/reusing stale subscriptions. This fixes Articles, DMs,
Notifications, Zaps, and Trending hanging indefinitely.

Also adds since filters on global (2h) and follow (24h) feeds
to ensure relay freshness, and fixes ArticleFeed re-fetch bug
where follows changes wiped latest tab results.
This commit is contained in:
Jure
2026-03-22 11:35:28 +01:00
parent 2e03c6ce11
commit b3e7ff7029
15 changed files with 176 additions and 162 deletions

View File

@@ -1,12 +1,16 @@
import { create } from "zustand";
import { NDKEvent } from "@nostr-dev-kit/ndk";
import { NDKEvent, NDKFilter, NDKKind, NDKSubscription, NDKSubscriptionCacheUsage } from "@nostr-dev-kit/ndk";
import { connectToRelays, ensureConnected, resetNDK, fetchGlobalFeed, fetchBatchEngagement, fetchTrendingCandidates, getNDK } from "../lib/nostr";
import { dbLoadFeed, dbSaveNotes } from "../lib/db";
import { diagWrapFetch, logDiag, startRelaySnapshots, getRelayStates } from "../lib/feedDiagnostics";
const TRENDING_CACHE_KEY = "wrystr_trending_cache";
const TRENDING_TTL = 10 * 60 * 1000; // 10 minutes
const MAX_FEED_SIZE = 200;
// Live subscription handle — persists across store calls
let liveSub: NDKSubscription | null = null;
let saveTimer: ReturnType<typeof setTimeout> | null = null;
interface FeedState {
notes: NDKEvent[];
@@ -19,6 +23,7 @@ interface FeedState {
connect: () => Promise<void>;
loadCachedFeed: () => Promise<void>;
loadFeed: () => Promise<void>;
startLiveFeed: () => void;
loadTrendingFeed: (force?: boolean) => Promise<void>;
setFocusedNoteIndex: (n: number) => void;
}
@@ -73,7 +78,11 @@ export const useFeedStore = create<FeedState>((set, get) => ({
resetNDK().then(() => {
if (getNDK().pool?.relays) {
const after = Array.from(getNDK().pool.relays.values());
if (after.some((r) => r.connected)) set({ connected: true });
if (after.some((r) => r.connected)) {
set({ connected: true });
// Restart live sub after NDK reset
get().startLiveFeed();
}
}
}).catch(() => {});
} else {
@@ -90,7 +99,7 @@ export const useFeedStore = create<FeedState>((set, get) => ({
loadCachedFeed: async () => {
try {
const rawNotes = await dbLoadFeed(200);
const rawNotes = await dbLoadFeed(MAX_FEED_SIZE);
if (rawNotes.length === 0) return;
const ndk = getNDK();
const events = rawNotes.map((raw) => new NDKEvent(ndk, JSON.parse(raw)));
@@ -100,6 +109,9 @@ export const useFeedStore = create<FeedState>((set, get) => ({
}
},
/**
* One-shot feed fetch — loads initial batch, then starts live subscription.
*/
loadFeed: async () => {
if (get().loading) return;
set({ loading: true, error: null });
@@ -107,23 +119,78 @@ export const useFeedStore = create<FeedState>((set, get) => ({
await ensureConnected();
const fresh = await diagWrapFetch("global_fetch", () => fetchGlobalFeed(80));
// Merge with currently displayed notes so cached notes aren't lost
// if the relay returns fewer results than the cache had.
// Merge with currently displayed notes
const freshIds = new Set(fresh.map((n) => n.id));
const kept = get().notes.filter((n) => !freshIds.has(n.id));
const merged = [...fresh, ...kept]
.sort((a, b) => (b.created_at ?? 0) - (a.created_at ?? 0))
.slice(0, 200);
.slice(0, MAX_FEED_SIZE);
set({ notes: merged, loading: false, focusedNoteIndex: -1 });
// Persist fresh notes to SQLite (fire-and-forget)
dbSaveNotes(fresh.map((e) => JSON.stringify(e.rawEvent())));
// Start live subscription after initial load
get().startLiveFeed();
} catch (err) {
set({ error: `Feed failed: ${err}`, loading: false });
}
},
/**
* Start a persistent live subscription for new notes.
* New events stream in and are prepended to the feed in real time.
*/
startLiveFeed: () => {
// Close existing subscription if any
if (liveSub) {
try { liveSub.stop(); } catch { /* ignore */ }
liveSub = null;
}
const ndk = getNDK();
const since = Math.floor(Date.now() / 1000);
const filter: NDKFilter = { kinds: [NDKKind.Text], since, limit: 20 };
const sub = ndk.subscribe(filter, {
closeOnEose: false, // Keep subscription open — this is the key difference
groupable: false,
cacheUsage: NDKSubscriptionCacheUsage.ONLY_RELAY,
});
sub.on("event", (event: NDKEvent) => {
const current = get().notes;
// Deduplicate
if (current.some((n) => n.id === event.id)) return;
const updated = [event, ...current]
.sort((a, b) => (b.created_at ?? 0) - (a.created_at ?? 0))
.slice(0, MAX_FEED_SIZE);
set({ notes: updated });
// Debounced save to SQLite — batch saves every 5s
if (!saveTimer) {
saveTimer = setTimeout(() => {
saveTimer = null;
const toSave = get().notes.slice(0, 20);
dbSaveNotes(toSave.map((e) => JSON.stringify(e.rawEvent())));
}, 5000);
}
});
sub.on("eose", () => {
logDiag({
ts: new Date().toISOString(),
action: "live_feed_eose",
details: "Live subscription received EOSE — now streaming new events",
});
});
liveSub = sub;
console.log("[Wrystr] Live feed subscription started");
},
loadTrendingFeed: async (force?: boolean) => {
if (get().trendingLoading) return;