Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 68 additions & 0 deletions .changeset/sse-solid2-async-reactivity.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
---
"@solid-primitives/sse": minor
---

Align `createSSE` with Solid 2.0 async reactivity patterns

### Breaking changes

**`pending` removed from `SSEReturn`**

Use `<Loading>` for initial load UI and `isPending(() => data())` for stale-while-revalidating. Both are re-exported from this package.

```tsx
// Before
const { data, pending } = createSSE(url);
<Show when={!pending()}><p>{data()}</p></Show>

// After — declarative initial load
<Loading fallback={<p>Connecting…</p>}>
<p>{data()}</p>
</Loading>

// After — stale-while-revalidating (only true once a value exists and new data is pending)
<Show when={isPending(() => data())}><p>Refreshing…</p></Show>
```

**`error` removed from `SSEReturn`**

Terminal errors (connection CLOSED with no retries left) now propagate through `data()` to `<Errored>`. Non-terminal errors (browser reconnecting) are still surfaced via `onError` callback.

```tsx
// Before
const { data, error } = createSSE(url);
<Show when={error()}><p>Error: {error()?.type}</p></Show>

// After — single error path via Errored boundary
<Errored fallback={err => <p>Connection failed</p>}>
<Loading fallback={<p>Connecting…</p>}>
<p>{data()}</p>
</Loading>
</Errored>
```

**`data` type narrowed from `Accessor<T | undefined>` to `Accessor<T>`**

The `| undefined` loading hole is removed. When `data()` is not ready it throws `NotReadyError` (caught by `<Loading>`) or the terminal error (caught by `<Errored>`); it never returns `undefined` due to pending state.

**SSR stub**: `data()` now throws `NotReadyError` on the server when no `initialValue` is provided (consistent with browser behaviour). Provide `initialValue` for a non-throwing SSR default.

### New primitives

**`makeSSEAsyncIterable<T>(url, options?)`**

Wraps an SSE endpoint as a standard `AsyncIterable<T>`. Each message is one yielded value; terminal errors are thrown. Cleanup runs automatically when the iterator is abandoned.

```ts
for await (const msg of makeSSEAsyncIterable<string>(url)) {
console.log(msg);
}
```

**`createSSEStream<T>(url, options?)`**

Minimal reactive alternative to `createSSE` — returns only a `data: Accessor<T>` backed by an async iterable. Same `<Loading>` / `<Errored>` integration, no `source` / `readyState` / `close` / `reconnect`.

```ts
const data = createSSEStream<{ msg: string }>(url, { transform: JSON.parse });
```
213 changes: 145 additions & 68 deletions packages/sse/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@
[![version](https://img.shields.io/npm/v/@solid-primitives/sse?style=for-the-badge)](https://www.npmjs.com/package/@solid-primitives/sse)
[![stage](https://img.shields.io/endpoint?style=for-the-badge&url=https%3A%2F%2Fraw.githubusercontent.com%2Fsolidjs-community%2Fsolid-primitives%2Fmain%2Fassets%2Fbadges%2Fstage-2.json)](https://github.com/solidjs-community/solid-primitives#contribution-process)

Primitives for [Server-Sent Events](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events) using the browser's built-in `EventSource` API.
Primitives for [Server-Sent Events](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events) using the browser's built-in `EventSource` API. Designed for Solid 2.0's async reactivity model.

- [`makeSSE`](#makesse) — Base non-reactive primitive. Creates an `EventSource` and returns a cleanup function. No Solid lifecycle.
- [`createSSE`](#createsse) — Reactive primitive. Accepts a reactive URL, integrates with Solid's owner lifecycle, and returns signals for `data`, `error`, and `readyState`.
- [`createSSE`](#createsse) — Reactive primitive. Accepts a reactive URL, integrates with Solid's owner lifecycle, and returns signals for `data` and `readyState`.
- [`makeSSEAsyncIterable`](#makesseasynciterable) — Wraps an SSE endpoint as an `AsyncIterable<T>`. Non-reactive foundation.
- [`createSSEStream`](#createssesstream) — Minimal reactive stream: just a `data` accessor backed by an async iterable.
- [`makeSSEWorker`](#running-sse-in-a-worker) — Runs the SSE connection inside a Web Worker or SharedWorker.
- [Built-in transformers](#built-in-transformers) — `json`, `ndjson`, `lines`, `number`, `safe`, `pipe`.

Expand Down Expand Up @@ -70,42 +72,87 @@ Reactive SSE primitive. Connects on creation, closes when the owner is disposed,
```ts
import { createSSE, SSEReadyState } from "@solid-primitives/sse";

const { data, readyState, error, close, reconnect } = createSSE<{ message: string }>(
const { data, readyState, close, reconnect } = createSSE<{ message: string }>(
"https://api.example.com/events",
{
transform: JSON.parse,
reconnect: { retries: 3, delay: 2000 },
},
);
```

### Loading and error boundaries

`data()` integrates with Solid 2.0's async reactivity:

- **`<Loading>`** — shows fallback while `data()` is pending (before the first message arrives).
- **`<Errored>`** — catches terminal errors (connection CLOSED with no retries left) thrown through `data()`.

```tsx
import { Loading, Errored } from "solid-js";
import { createSSE } from "@solid-primitives/sse";

const { data, close, reconnect } = createSSE<{ message: string }>(
"https://api.example.com/events",
{ transform: JSON.parse },
);

return (
<div>
<Show when={readyState() === SSEReadyState.OPEN} fallback={<p>Connecting…</p>}>
<p>Latest: {data()?.message ?? "—"}</p>
</Show>
<Show when={error()}>
<p style="color:red">Connection error</p>
<Errored fallback={err => <p style="color:red">Connection failed</p>}>
<Loading fallback={<p>Connecting…</p>}>
<p>Latest: {data().message}</p>
</Loading>
</Errored>
);
```

Non-terminal errors (while the browser is reconnecting automatically) are surfaced via the `onError` callback only — they don't interrupt the reactive graph.

### Stale-while-revalidating with `isPending`

After the first message has arrived, subsequent reconnects (URL change, `reconnect()` call) put the connection back into a pending state. Use `isPending` from Solid to show a subtle "refreshing" indicator without replacing the whole subtree:

```tsx
import { isPending } from "solid-js";
import { createSSE } from "@solid-primitives/sse";

const { data } = createSSE<{ msg: string }>(url, { transform: JSON.parse });

return (
<>
<Show when={isPending(() => data())}>
<p>Refreshing…</p>
</Show>
<button onClick={close}>Disconnect</button>
<button onClick={reconnect}>Reconnect</button>
</div>
<Loading fallback={<p>Connecting…</p>}>
<p>{data().msg}</p>
</Loading>
</>
);
```

### Reactive URL
> **Note:** `isPending` is `false` during the initial `<Loading>` fallback (no stale value yet). It becomes `true` only when a stale value exists and new data is pending — i.e., after a URL change or reconnect.

When the URL is a signal accessor, the connection is replaced whenever the URL changes:
### Reactive URL with `<Loading on=…>`

```ts
When the URL is a signal accessor, the connection is replaced whenever the URL changes. Use `<Loading>`'s `on` prop to re-show the fallback on each URL change:

```tsx
const [userId, setUserId] = createSignal("user-1");

const { data } = createSSE<Notification>(
() => `https://api.example.com/notifications/${userId()}`,
{ transform: JSON.parse },
);

return (
// on={userId()} re-shows the fallback each time userId changes while pending
<Loading on={userId()} fallback={<p>Connecting…</p>}>
<p>{data().message}</p>
</Loading>
);
```

Changing `userId()` will close the existing connection and open a new one to the updated URL.
Without `on`, `<Loading>` keeps showing stale content during revalidation. With `on`, it re-shows the fallback whenever the key changes and a new connection is establishing.

### Options

Expand All @@ -114,7 +161,7 @@ Changing `userId()` will close the existing connection and open a new one to the
| `withCredentials` | `boolean` | `false` | Send credentials with the request |
| `onOpen` | `(e: Event) => void` | — | Called when the connection opens |
| `onMessage` | `(e: MessageEvent) => void` | — | Called on each unnamed `message` event |
| `onError` | `(e: Event) => void` | — | Called on error |
| `onError` | `(e: Event) => void` | — | Called on error (terminal and transient) |
| `events` | `Record<string, (e: MessageEvent) => void>` | — | Handlers for named SSE event types |
| `initialValue` | `T` | `undefined` | Initial value of the `data` signal |
| `transform` | `(raw: string) => T` | identity | Parse raw string data, e.g. `JSON.parse` |
Expand All @@ -129,14 +176,22 @@ Changing `userId()` will close the existing connection and open a new one to the

### Return value

| Property | Type | Description |
| ------------ | ---------------------------------------- | ------------------------------------------------ |
| `source` | `Accessor<SSESourceHandle \| undefined>` | Underlying source instance; `undefined` on SSR |
| `data` | `Accessor<T \| undefined>` | Latest message data |
| `error` | `Accessor<Event \| undefined>` | Latest error event |
| `readyState` | `Accessor<SSEReadyState>` | `SSEReadyState.CONNECTING` / `.OPEN` / `.CLOSED` |
| `close` | `VoidFunction` | Close the connection |
| `reconnect` | `VoidFunction` | Force-close and reopen |
| Property | Type | Description |
| ------------ | ---------------------------------------- | ---------------------------------------------------------------------------------------- |
| `source` | `Accessor<SSESourceHandle \| undefined>` | Underlying source instance; `undefined` on SSR |
| `data` | `Accessor<T>` | Latest message data; throws `NotReadyError` until first message, terminal errors thereafter |
| `readyState` | `Accessor<SSEReadyState>` | `SSEReadyState.CONNECTING` / `.OPEN` / `.CLOSED` |
| `close` | `VoidFunction` | Close the connection |
| `reconnect` | `VoidFunction` | Force-close and reopen; resets `data` to pending |

### Initial value

Provide `initialValue` to skip the pending state entirely — `data()` returns it immediately with no `<Loading>` fallback needed:

```ts
const { data } = createSSE(url, { initialValue: [] as string[] });
// data() === [] immediately, no Loading needed
```

### `SSEReadyState`

Expand All @@ -154,57 +209,79 @@ SSEReadyState.CLOSED; // 2

`EventSource` has native browser-level reconnection built in. For transient network drops the browser automatically retries. The `reconnect` option in `createSSE` is for _application-level_ reconnection — it fires only when `readyState` becomes `SSEReadyState.CLOSED`, meaning the browser has given up entirely. You generally do not need `reconnect: true` for normal usage.

### A note on server disconnection detection
## `makeSSEAsyncIterable`

`EventSource` **does not reliably detect when a server silently stops responding**. If the server process crashes or the network path is severed without a proper TCP close handshake, the browser never fires an `error` event and `readyState` stays `OPEN` indefinitely — the connection looks healthy even though no messages will ever arrive.
Wraps an SSE endpoint as a standard `AsyncIterable<T>`. Each SSE message becomes one yielded value; terminal errors (connection CLOSED) are thrown by the iterator. Cleanup runs automatically when the iterator is abandoned via `return()`.

The only robust workaround is **application-level heartbeats**: the server sends a lightweight event at a fixed interval, and the client starts a timer that triggers a reconnect if no heartbeat is received within the expected window.
Use this as a non-reactive building block: integrate it with a `for await…of` loop, pass it to your own `createMemo`, or compose it with other async utilities.

```ts
import { createSSE } from "@solid-primitives/sse";
import { onCleanup } from "solid-js";

const HEARTBEAT_TIMEOUT_MS = 15_000; // reconnect if silent for 15 s

function createSSEWithHeartbeat(url: string) {
let timer: ReturnType<typeof setTimeout> | undefined;

const { reconnect, ...rest } = createSSE(url, {
// The server emits `event: heartbeat\ndata: \n\n` every ~10 s.
// Any regular message also resets the timer.
events: { heartbeat: resetTimer },
onMessage: resetTimer,
reconnect: true,
});

function resetTimer() {
clearTimeout(timer);
timer = setTimeout(() => {
// No heartbeat received — assume the server is gone.
reconnect();
}, HEARTBEAT_TIMEOUT_MS);
}

onCleanup(() => {
clearTimeout(timer);
timer = undefined;
});
resetTimer(); // arm the first timeout immediately

return { reconnect, ...rest };
import { makeSSEAsyncIterable } from "@solid-primitives/sse";

const iterable = makeSSEAsyncIterable<string>("https://api.example.com/events");

for await (const msg of iterable) {
console.log(msg);
}
```

On the server, emit a periodic heartbeat event well within the client timeout:
### Definition

```ts
function makeSSEAsyncIterable<T = string>(
url: string | URL,
options?: CreateSSEStreamOptions<T>,
): AsyncIterable<T>;

```js
// Express / Node.js example
setInterval(() => {
res.write("event: heartbeat\ndata: \n\n");
}, 10_000); // every 10 s, safely below the 15 s client timeout
type CreateSSEStreamOptions<T> = {
withCredentials?: boolean;
onOpen?: (event: Event) => void;
onError?: (event: Event) => void;
transform?: (raw: string) => T;
events?: Record<string, (event: MessageEvent) => void>;
source?: SSESourceFn;
};
```

> **Why SSE comment lines are not enough** — SSE comment lines (e.g. `: keep-alive`) reset the browser's internal TCP idle timer but are _not_ exposed to JavaScript listeners. Use a named `event: heartbeat` or a plain `data:` event if you need the client to observe the heartbeat.
## `createSSEStream`

A minimal reactive alternative to `createSSE` that returns only a `data` accessor. Internally it drives an `AsyncIterable` produced by `makeSSEAsyncIterable`, giving the same `<Loading>` / `<Errored>` integration with less API surface.

Use this when you only need the stream values and don't need access to `source`, `readyState`, `close`, or `reconnect`.

```ts
import { createSSEStream } from "@solid-primitives/sse";

const data = createSSEStream<{ msg: string }>(url, { transform: JSON.parse });

return (
<Errored fallback={err => <p>Connection failed</p>}>
<Loading fallback={<p>Connecting…</p>}>
<p>{data().msg}</p>
</Loading>
</Errored>
);
```

Reactive URL is supported — the stream reconnects automatically when the URL signal changes:

```ts
const [userId, setUserId] = createSignal("user-1");

const data = createSSEStream<Notification>(
() => `https://api.example.com/notifications/${userId()}`,
{ transform: JSON.parse },
);
```

### Definition

```ts
function createSSEStream<T = string>(
url: MaybeAccessor<string>,
options?: CreateSSEStreamOptions<T>,
): Accessor<T>;
```

## Integration with `@solid-primitives/event-bus`

Expand Down Expand Up @@ -266,7 +343,7 @@ return <For each={messages}>{msg => <p>{msg}</p>}</For>;

## Built-in transformers

Ready-made `transform` functions for the most common SSE data formats. Pass one as the `transform` option to `createSSE`:
Ready-made `transform` functions for the most common SSE data formats. Pass one as the `transform` option to `createSSE` or `createSSEStream`:

```ts
import { createSSE, json } from "@solid-primitives/sse";
Expand Down Expand Up @@ -409,7 +486,7 @@ const worker = new Worker(new URL("@solid-primitives/sse/worker-handler", import
type: "module",
});

const { data, readyState, error, close, reconnect } = createSSE<{ msg: string }>(
const { data, readyState, close, reconnect } = createSSE<{ msg: string }>(
"https://api.example.com/events",
{
source: makeSSEWorker(worker),
Expand Down
6 changes: 3 additions & 3 deletions packages/sse/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -67,17 +67,17 @@
"scripts": {
"dev": "node --import=@nothing-but/node-resolve-ts --experimental-transform-types ../../scripts/dev.ts",
"build": "node --import=@nothing-but/node-resolve-ts --experimental-transform-types ../../scripts/build.ts",
"vitest": "vitest -c ../../configs/vitest.config.ts",
"vitest": "vitest -c vitest.config.ts",
"test": "pnpm run vitest",
"test:ssr": "pnpm run vitest --mode ssr"
},
"dependencies": {
"@solid-primitives/utils": "workspace:^"
},
"peerDependencies": {
"solid-js": "^1.6.12"
"solid-js": "2.0.0-beta.10"
},
"devDependencies": {
"solid-js": "^1.9.7"
"solid-js": "2.0.0-beta.10"
}
}
Loading