Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 82 additions & 11 deletions desktop/src/features/channels/readState/readStateManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ export class ReadStateManager {
private maxFetchedCreatedAt = 0;
private forcedContexts = new Set<string>();
private contextSourceCreatedAt = new Map<string, number>();
private pendingSyncedRollbacks = new Set<string>();
private pendingSyncedAdvances = new Set<string>();
private destroyed = false;

constructor(pubkey: string, relayClient: RelayClient) {
this.pubkey = pubkey;
Expand All @@ -75,17 +78,25 @@ export class ReadStateManager {
}

async initialize(): Promise<void> {
if (this.initialized) return;
if (this.initialized || this.destroyed) return;
console.debug(
`[ReadStateManager] initialize pubkey=${this.pubkey.substring(0, 8)}… clientId=${this.clientId.substring(0, 8)}… slotId=${this.slotId}`,
);

this.hydrateFromLocalStorage();

await this.fetchAndMerge();
if (this.destroyed) return;
await this.startLiveSubscription();
if (this.destroyed) return;
if (!this.isIdenticalToLastPublished(this.currentContexts())) {
this.schedulePublish();
}

this.initialized = true;
console.debug(
`[ReadStateManager] initialize complete maxFetchedCreatedAt=${this.maxFetchedCreatedAt} contexts=${this.effectiveState.size}`,
);
this.notifyListeners();
}

Expand Down Expand Up @@ -152,6 +163,7 @@ export class ReadStateManager {
}

destroy(): void {
this.destroyed = true;
// Flush any pending writes immediately
if (this.debounceTimer !== null) {
window.clearTimeout(this.debounceTimer);
Expand All @@ -177,7 +189,8 @@ export class ReadStateManager {
since: Math.floor(Date.now() / 1_000) - READ_STATE_HORIZON_SECONDS,
limit: READ_STATE_FETCH_LIMIT,
});
} catch {
} catch (error) {
console.debug("[ReadStateManager] fetchAndMerge failed:", error);
// If fetch fails, proceed with local state only
return;
}
Expand Down Expand Up @@ -219,7 +232,11 @@ export class ReadStateManager {
client_id: parsed.client_id,
contexts: sanitizeContexts(parsed.contexts),
};
} catch {
} catch (error) {
console.debug(
`[ReadStateManager] mergeEvents decrypt failed event=${event.id.substring(0, 8)}…:`,
error,
);
continue;
}

Expand Down Expand Up @@ -260,7 +277,11 @@ export class ReadStateManager {
localStorage.setItem(slotIdKey(this.pubkey), this.slotId);
break;
}
} catch {
} catch (error) {
console.debug(
`[ReadStateManager] conflict check decrypt failed event=${event.id.substring(0, 8)}…:`,
error,
);
// Decrypt failure — skip this event
}
}
Expand All @@ -286,14 +307,24 @@ export class ReadStateManager {
void this.handleIncomingEvent(event);
},
);
if (this.destroyed) {
unsub();
return;
}
this.unsubscribeLive = unsub;
} catch {
console.debug("[ReadStateManager] live subscription established");
} catch (error) {
console.debug("[ReadStateManager] live subscription FAILED:", error);
// Non-fatal: we can still work with local state
}
}

private async handleIncomingEvent(event: RelayEvent): Promise<void> {
if (event.pubkey !== this.pubkey) return;
if (this.destroyed) return;
console.debug(
`[ReadStateManager] incoming event=${event.id.substring(0, 8)}… created_at=${event.created_at}`,
);

const dTags = event.tags.filter((t) => t[0] === "d");
if (dTags.length !== 1) return;
Expand All @@ -320,7 +351,11 @@ export class ReadStateManager {
client_id: parsed.client_id,
contexts: sanitizeContexts(parsed.contexts),
};
} catch {
} catch (error) {
console.debug(
`[ReadStateManager] incoming event decrypt/parse failed event=${event.id.substring(0, 8)}…:`,
error,
);
return;
}

Expand All @@ -331,6 +366,16 @@ export class ReadStateManager {
const current = this.effectiveState.get(ctx) ?? 0;
if (event.created_at > sourceCreatedAt) {
if (this.effectiveState.get(ctx) !== ts) {
if (ts < current && current > 0) {
this.pendingSyncedRollbacks.add(ctx);
this.pendingSyncedAdvances.delete(ctx);
console.debug(
`[ReadStateManager] synced rollback ctx=${ctx.substring(0, 12)}… from=${current} to=${ts}`,
);
} else if (ts > current) {
this.pendingSyncedAdvances.add(ctx);
this.pendingSyncedRollbacks.delete(ctx);
}
this.effectiveState.set(ctx, ts);
anyAdvanced = true;
}
Expand All @@ -344,6 +389,9 @@ export class ReadStateManager {
anyAdvanced = true;
}
}
console.debug(
`[ReadStateManager] incoming result anyAdvanced=${anyAdvanced} clientId=${blob.client_id.substring(0, 8)}…`,
);

if (anyAdvanced) {
this.persistLocalState();
Expand All @@ -368,6 +416,7 @@ export class ReadStateManager {
}

private async publish(): Promise<void> {
console.debug(`[ReadStateManager] publish starting slotId=${this.slotId}`);
await this.fetchOwnBlobBeforePublish();

// Build blob from contexts this client is allowed to publish.
Expand Down Expand Up @@ -408,12 +457,17 @@ export class ReadStateManager {
"Timed out publishing read state.",
"Failed to publish read state.",
);
console.debug(
`[ReadStateManager] publish accepted createdAt=${createdAt}`,
);

this.lastPublishedContexts = contexts;
this.forcedContexts.clear();
for (const key of Object.keys(contexts)) {
this.contextSourceCreatedAt.set(key, createdAt);
if (this.lastPublishedContexts[key] !== contexts[key]) {
this.contextSourceCreatedAt.set(key, createdAt);
}
}
this.lastPublishedContexts = contexts;
this.forcedContexts.clear();
this.maxFetchedCreatedAt = Math.max(
this.maxFetchedCreatedAt,
event.created_at,
Expand All @@ -435,7 +489,11 @@ export class ReadStateManager {

await this.mergeEvents(events);
this.persistLocalState();
} catch {
} catch (error) {
console.debug(
"[ReadStateManager] fetchOwnBlobBeforePublish failed:",
error,
);
// Per NIP-RS, proceed with reachable data and merge on a later fetch.
}
}
Expand Down Expand Up @@ -486,11 +544,24 @@ export class ReadStateManager {
);
}

drainSyncedRollbacks(): ReadonlySet<string> {
const drained = this.pendingSyncedRollbacks;
this.pendingSyncedRollbacks = new Set<string>();
return drained;
}

drainSyncedAdvances(): ReadonlySet<string> {
const drained = this.pendingSyncedAdvances;
this.pendingSyncedAdvances = new Set<string>();
return drained;
}

private notifyListeners(): void {
for (const listener of this.listeners) {
try {
listener();
} catch {
} catch (error) {
console.debug("[ReadStateManager] listener threw:", error);
// Don't let a broken listener break the manager
}
}
Expand Down
15 changes: 12 additions & 3 deletions desktop/src/features/channels/readState/readStateStorage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ function mergeLocalStorageKey(
contexts.set(channelId, unixSeconds);
}
}
} catch {
} catch (error) {
console.debug("[ReadStateManager] storage: contexts JSON corrupt:", error);
// Corrupt localStorage, ignore.
}
}
Expand All @@ -50,7 +51,11 @@ function readPublishableContextIds(pubkey: string): Set<string> {
result.add(value);
}
}
} catch {
} catch (error) {
console.debug(
"[ReadStateManager] storage: publishableContextIds JSON corrupt:",
error,
);
// Corrupt localStorage, ignore.
}

Expand All @@ -71,7 +76,11 @@ function readContextSourceCreatedAt(pubkey: string): Map<string, number> {
result.set(key, value);
}
}
} catch {
} catch (error) {
console.debug(
"[ReadStateManager] storage: sourceCreatedAt JSON corrupt:",
error,
);
// Corrupt localStorage, ignore.
}

Expand Down
14 changes: 14 additions & 0 deletions desktop/src/features/channels/readState/useReadState.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import type { RelayClient } from "@/shared/api/relayClientSession";
const noopGetTimestamp = () => null;
const noopMarkRead = () => {};
const noopMarkUnread = () => {};
const noopDrainRollbacks = (): ReadonlySet<string> => new Set<string>();
const noopDrainAdvances = (): ReadonlySet<string> => new Set<string>();

/**
* React hook that creates and manages a ReadStateManager instance.
Expand Down Expand Up @@ -78,6 +80,14 @@ export function useReadState(
[],
);

const drainSyncedRollbacks = React.useCallback((): ReadonlySet<string> => {
return managerRef.current?.drainSyncedRollbacks() ?? new Set<string>();
}, []);

const drainSyncedAdvances = React.useCallback((): ReadonlySet<string> => {
return managerRef.current?.drainSyncedAdvances() ?? new Set<string>();
}, []);

const isReady = Boolean(
pubkey && relayClient && initializedPubkey === pubkey,
);
Expand All @@ -89,6 +99,8 @@ export function useReadState(
markContextRead: noopMarkRead,
markContextUnread: noopMarkUnread,
seedContextRead: noopMarkRead,
drainSyncedRollbacks: noopDrainRollbacks,
drainSyncedAdvances: noopDrainAdvances,
readStateVersion: 0,
};
}
Expand All @@ -99,6 +111,8 @@ export function useReadState(
markContextRead,
markContextUnread,
seedContextRead,
drainSyncedRollbacks,
drainSyncedAdvances,
readStateVersion,
};
}
26 changes: 26 additions & 0 deletions desktop/src/features/channels/useUnreadChannels.ts
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,8 @@ export function useUnreadChannels(
isReady: isReadStateReady,
markContextRead,
markContextUnread,
drainSyncedRollbacks,
drainSyncedAdvances,
readStateVersion,
} = useReadState(pubkey, relayClient);

Expand All @@ -272,6 +274,30 @@ export function useUnreadChannels(
// against. Cleared when the user opens the channel.
const forcedUnreadRef = React.useRef(new Set<string>());

// When a synced event rolls back a read marker (cross-device mark-as-unread),
// merge into forcedUnreadRef so the badge appears immediately without waiting
// for a catch-up REQ that already ran with the old (higher) marker.
// When a synced event advances a read marker (cross-device mark-as-read),
// remove from forcedUnreadRef so the dot clears immediately.
// biome-ignore lint/correctness/useExhaustiveDependencies: readStateVersion is the intentional drain trigger
React.useEffect(() => {
const rolled = drainSyncedRollbacks();
const advanced = drainSyncedAdvances();
let anyNew = false;
for (const channelId of rolled) {
if (!forcedUnreadRef.current.has(channelId)) {
forcedUnreadRef.current.add(channelId);
anyNew = true;
}
}
for (const channelId of advanced) {
if (forcedUnreadRef.current.delete(channelId)) {
anyNew = true;
}
}
if (anyNew) bumpLatestVersion();
}, [readStateVersion, drainSyncedRollbacks, drainSyncedAdvances]);

// Root event IDs of threads where the current user has replied at least once.
// Used to determine if thread replies should trigger unread notifications.
const participatedRootIdsRef = React.useRef(new Set<string>());
Expand Down
39 changes: 39 additions & 0 deletions desktop/src/testing/e2eBridge.ts
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,12 @@ declare global {
models?: Array<{ id: string; name: string | null }>;
denyReason?: string;
}) => void;
__SPROUT_E2E_EMIT_MOCK_READ_STATE__?: (input: {
clientId: string;
contexts: Record<string, number>;
createdAt: number;
slotId: string;
}) => unknown;
}
}

Expand Down Expand Up @@ -5098,6 +5104,11 @@ function sendToMockSocket(args: {
return;
}

if (event.kind === 30078) {
sendWsText(socket.handler, ["OK", event.id, true, ""]);
return;
}

const channelId = getChannelIdFromTags(event.tags);
if (!channelId) {
sendWsText(socket.handler, [
Expand Down Expand Up @@ -5201,6 +5212,30 @@ export function maybeInstallE2eTauriMocks() {
window.dispatchEvent(new CustomEvent("sprout:e2e-home-feed-updated"));
return item;
};
window.__SPROUT_E2E_EMIT_MOCK_READ_STATE__ = ({
clientId,
contexts,
createdAt,
slotId,
}) => {
const blob = JSON.stringify({
v: 1,
client_id: clientId,
contexts,
});
const event = createMockEvent(
30078,
blob,
[
["d", `read-state:${slotId}`],
["t", "read-state"],
],
getMockMemberPubkey(config),
createdAt,
);
emitMockLiveEvent(GLOBAL_MOCK_SUBSCRIPTION, event);
return event;
};
window.__SPROUT_E2E_SET_STALL_WEBSOCKET_SENDS__ = (stall) => {
const config = getConfig();
if (!config?.mock) return;
Expand Down Expand Up @@ -5652,6 +5687,10 @@ export function maybeInstallE2eTauriMocks() {
(payload as { createdAt?: number }).createdAt,
),
);
case "nip44_encrypt_to_self":
return (payload as { plaintext: string }).plaintext;
case "nip44_decrypt_from_self":
return (payload as { ciphertext: string }).ciphertext;
case "create_auth_event":
if (identity) {
return JSON.stringify(
Expand Down
Loading