From 1141fa1a6e29c9489bd00ed6e55e0c7f72a0b0af Mon Sep 17 00:00:00 2001
From: k-taro56 <121674121+k-taro56@users.noreply.github.com>
Date: Sat, 2 May 2026 21:27:10 +0900
Subject: [PATCH 01/55] Enhance Studio development experience with HMR and
early-stop features
- Integrated Rolldown for hot module replacement (HMR) in `arkor dev`, allowing real-time updates to the training interface without page refresh.
- Implemented `requestEarlyStop` and `replaceCallbacks` methods in the Trainer API to facilitate graceful stopping of training jobs and dynamic callback updates during execution.
- Updated documentation to reflect new features and usage patterns for improved developer guidance.
- Adjusted cleanup logic for better resource management during development sessions.
---
AGENTS.md | 14 +-
docs/concepts/studio.mdx | 2 +-
docs/ja/concepts/studio.mdx | 2 +-
docs/ja/sdk/trainer-control.mdx | 22 +-
docs/sdk/trainer-control.mdx | 22 +-
packages/arkor/package.json | 2 +-
packages/arkor/src/cli/commands/build.ts | 40 ++-
packages/arkor/src/cli/commands/dev.ts | 32 +-
packages/arkor/src/cli/commands/start.test.ts | 2 +-
packages/arkor/src/core/arkor.test.ts | 1 +
packages/arkor/src/core/runner.test.ts | 87 +++++-
packages/arkor/src/core/runner.ts | 60 +++-
packages/arkor/src/core/trainer.test.ts | 249 ++++++++++++++++
packages/arkor/src/core/trainer.ts | 62 ++++
packages/arkor/src/core/types.ts | 11 +
packages/arkor/src/studio/hmr.test.ts | 141 +++++++++
packages/arkor/src/studio/hmr.ts | 148 ++++++++++
packages/arkor/src/studio/server.test.ts | 126 ++++++++
packages/arkor/src/studio/server.ts | 111 ++++++-
.../studio-app/src/components/RunTraining.tsx | 79 ++++-
packages/studio-app/src/lib/api.ts | 20 ++
pnpm-lock.yaml | 274 +-----------------
22 files changed, 1206 insertions(+), 301 deletions(-)
create mode 100644 packages/arkor/src/studio/hmr.test.ts
create mode 100644 packages/arkor/src/studio/hmr.ts
diff --git a/AGENTS.md b/AGENTS.md
index 5d4d8455..60960f68 100644
--- a/AGENTS.md
+++ b/AGENTS.md
@@ -63,7 +63,7 @@ cd my-arkor-app && pnpm dev # Studio at http://127.0.
`arkor dev` generates a 32-byte base64url token per launch ([packages/arkor/src/cli/commands/dev.ts](packages/arkor/src/cli/commands/dev.ts)) and:
-1. Passes it to `buildStudioApp({ studioToken })`. The Hono server validates every `/api/*` request via `X-Arkor-Studio-Token` header (or `?studioToken=` query for `EventSource`, which can't set headers). Comparison uses `timingSafeEqual`.
+1. Passes it to `buildStudioApp({ studioToken })`. The Hono server validates every `/api/*` request via `X-Arkor-Studio-Token` header (or `?studioToken=` query for `EventSource`, which can't set headers). Comparison uses `timingSafeEqual`. The query-token allow-list lives in `eventStreamPathPattern` in [packages/arkor/src/studio/server.ts](packages/arkor/src/studio/server.ts) — currently `/api/jobs/:id/events` and `/api/dev/events`. **Adding to that regex is CSRF-sensitive: each entry must be a GET stream-only route, never a mutation endpoint.**
2. Persists it to `~/.arkor/studio-token` (mode 0600) so the SPA dev workflow (`pnpm --filter @arkor/studio-app dev`) can read it via the `arkor-studio-token` Vite plugin in [packages/studio-app/vite.config.ts](packages/studio-app/vite.config.ts), which injects `` into `index.html` on each request. Persistence failure must NOT block server start (read-only `$HOME` on Docker, etc.) — just warn.
3. Cleans up on `exit`/SIGINT/SIGTERM/SIGHUP via `unlinkSync`.
@@ -73,11 +73,19 @@ The whole point: prevents another browser tab on the same machine from POSTing `
When touching the Studio server or SPA fetch layer, preserve: token via header for `fetch`, query param for `EventSource`, host-header guard, no CORS, timing-safe compare. The Vite plugin is dev-only (`apply: "serve"`) — running it during `vite build` would bake a stale per-launch token into the production `index.html` and shadow the runtime tag, causing every `/api/*` call to 403.
+### HMR + graceful early-stop
+
+`arkor dev` keeps a [Rolldown](https://rolldown.rs) watcher over `src/arkor/` ([packages/arkor/src/studio/hmr.ts](packages/arkor/src/studio/hmr.ts)) and pushes rebuild events over `/api/dev/events` (SSE). The SPA re-fetches `/api/manifest` on each event so the Run Training button stays in sync without a browser refresh.
+
+When a rebuild lands while a `/api/train`-spawned subprocess is in flight, the server sends SIGTERM to the child. The child's signal handler in `runTrainer` calls `Trainer.requestEarlyStop()`, which lets the next `checkpoint.saved` event finish (work preserved) before issuing `cancel()` and exiting cleanly. The SPA then auto-restarts the run with the rebuilt artifact via the `restart: true` flag on the SSE event. A second SIGTERM bypasses the early-stop and exits 143 immediately — an emergency escape hatch for a hung cancel.
+
+Don't replace the SIGTERM-and-let-the-child-handle-it pattern with a SIGKILL escalation in the server: that would orphan Cloud-side jobs (no `cancel()` POST goes out) and waste GPU budget. The hard kill timer in `requestEarlyStopOnActive` exists only as a stuck-process fallback.
+
### Project entry-point discovery
The CLI/Studio look at `src/arkor/index.ts` in user projects. Discovery in [packages/arkor/src/core/runner.ts](packages/arkor/src/core/runner.ts) accepts (in order): a named `arkor` export from `createArkor({...})`, a bare `trainer` export, a default export holding either an Arkor manifest or a Trainer, or a `default.trainer` nested shape. `createArkor` returns a frozen, opaque manifest tagged with `_kind: "arkor"`; treat it as a value to hand to tooling, not a programmable client.
-`arkor build` ([packages/arkor/src/cli/commands/build.ts](packages/arkor/src/cli/commands/build.ts)) bundles to `.arkor/build/index.mjs` with esbuild; bare specifiers (e.g. `arkor`, anything in `node_modules`) stay external so the artifact resolves the runtime SDK from the project's installed copy.
+`arkor build` ([packages/arkor/src/cli/commands/build.ts](packages/arkor/src/cli/commands/build.ts)) bundles to `.arkor/build/index.mjs` with [Rolldown](https://rolldown.rs); bare specifiers (e.g. `arkor`, anything in `node_modules`) stay external so the artifact resolves the runtime SDK from the project's installed copy. The `transform.target` is derived from `process.versions.node` at build time so the bundle targets the same Node binary that will execute it.
### E2E suite specifics
@@ -101,4 +109,4 @@ Don't split these into "docs in a follow-up PR" or "tests later" — land them i
- **Don't call a HuggingFace model name "non-existent"** based on training-data alone. Templates reference real models (e.g. `unsloth/gemma-4-E4B-it`) that may post-date Claude's knowledge cutoff. Verify (e.g. `WebFetch`) before flagging in issues or PR comments. If unverifiable, hedge ("could not confirm") rather than asserting absence.
- **Generated files** copied into package dirs are gitignored: `packages/*/CONTRIBUTING.md` (from root), `packages/arkor/docs/` (from root `docs/`). Edit the source under repo root, not the copies.
- **Node version**: published packages declare `engines.node >=22.6`. Use Node 24 (latest preferred) for development per [CONTRIBUTING.md](CONTRIBUTING.md).
-- **pnpm policy** ([pnpm-workspace.yaml](pnpm-workspace.yaml)): `minimumReleaseAge: 1440` (24 h) and `trustPolicy: no-downgrade` are intentional supply-chain guards. `allowBuilds` is the explicit allow-list for postinstall scripts (rolldown, unrs-resolver, esbuild).
+- **pnpm policy** ([pnpm-workspace.yaml](pnpm-workspace.yaml)): `minimumReleaseAge: 1440` (24 h) and `trustPolicy: no-downgrade` are intentional supply-chain guards. `allowBuilds` is the explicit allow-list for postinstall scripts (rolldown for `arkor build`, esbuild for Vite's dependency optimization, unrs-resolver).
diff --git a/docs/concepts/studio.mdx b/docs/concepts/studio.mdx
index 5724079e..3ef52cf8 100644
--- a/docs/concepts/studio.mdx
+++ b/docs/concepts/studio.mdx
@@ -13,7 +13,7 @@ Three jobs:
2. **See training happen.** A jobs list with live status, a loss chart that updates as the run streams in, and a tail of training events. You can leave it open in a tab while you work on other things.
3. **Try a finished model.** A Playground page lets you pick the base model or the final adapter from any completed job and chat with it. The Playground does not load intermediate checkpoints; for mid-run inference, use [`onCheckpoint`](/concepts/lifecycle) callbacks in your trainer.
-A note on the dev loop: Studio's `/api/manifest` endpoint rebuilds and re-imports your trainer on every request (with a cache-bust query, see `packages/arkor/src/studio/manifest.ts`), but the UI only fetches it when the Run training page mounts. So if you edit `src/arkor/` and stay on the same Run training page, the next click reuses the existing `.arkor/build/index.mjs` and runs your old code. Refresh the page (or run `pnpm exec arkor build` from the terminal) between edits and clicks to pick up the new code reliably.
+A note on the dev loop: Studio runs a [Rolldown](https://rolldown.rs) watcher over `src/arkor/` and pushes rebuild notifications to the SPA over a Server-Sent Events stream (`/api/dev/events`). Edit a file, save, and the Run training button updates with the new trainer name without a refresh. If a training run is in flight, the Studio asks it to early-stop at the next checkpoint (`Trainer.requestEarlyStop()`) so the work isn't wasted, then re-spawns the run with the rebuilt artifact. The Cloud-side job for the previous run reaches `cancelled` after the checkpoint is uploaded.
## Where Studio runs
diff --git a/docs/ja/concepts/studio.mdx b/docs/ja/concepts/studio.mdx
index 1e4adedb..b8ae40ff 100644
--- a/docs/ja/concepts/studio.mdx
+++ b/docs/ja/concepts/studio.mdx
@@ -13,7 +13,7 @@ Studio は `arkor dev` 実行時に得られるローカル Web UI です。サ
2. **学習を眺める。** ライブステータス付きのジョブ一覧、ストリーム到着とともに更新される Loss チャート、学習イベントのテール。タブで開きっぱなしにして他の作業ができます。
3. **完成モデルを試す。** Playground ページでベースモデルや任意の完了ジョブの最終アダプタを選んでチャットできます。中間チェックポイントは Playground からはロードしません。学習中の推論には [`onCheckpoint`](/ja/concepts/lifecycle) コールバックをトレーナーで使ってください。
-dev ループのメモ: Studio の `/api/manifest` エンドポイントはリクエストごとにトレーナーをリビルド・再 import しますが(キャッシュバストクエリ付き、`packages/arkor/src/studio/manifest.ts` を参照)、UI が fetch するのは Run training ページがマウントされたときだけです。`src/arkor/` を編集して同じ Run training ページに留まり続けると、次のクリックは既存の `.arkor/build/index.mjs` を再利用して古いコードで走ります。確実に新しいコードを取り込むには、編集とクリックの間にページをリロード(あるいはターミナルから `pnpm exec arkor build`)してください。
+dev ループのメモ: Studio は [Rolldown](https://rolldown.rs) のウォッチャを `src/arkor/` 上で常駐させ、再ビルド通知を Server-Sent Events ストリーム (`/api/dev/events`) で SPA に push します。ファイルを編集して保存すれば、Run training ボタンのトレーナー名表示はリロード無しで更新されます。学習が走っている最中であれば、Studio はそのジョブに次のチェックポイントで Early Stopping を要求し(`Trainer.requestEarlyStop()`、ここまでの学習成果は保全)、再ビルドした成果物で自動的に再投入します。Cloud 側の以前のジョブはチェックポイントのアップロード完了後に `cancelled` 状態に遷移します。
## Studio が動く場所
diff --git a/docs/ja/sdk/trainer-control.mdx b/docs/ja/sdk/trainer-control.mdx
index 6a6bd74f..8683d3d5 100644
--- a/docs/ja/sdk/trainer-control.mdx
+++ b/docs/ja/sdk/trainer-control.mdx
@@ -5,7 +5,7 @@ description: "start、wait、cancel、abortSignal、再接続の仕組み。"
# トレーナー制御
-`createTrainer` は次の 3 メソッドを持つ `Trainer` オブジェクトを返します:
+`createTrainer` は次の 4 メソッドを持つ `Trainer` オブジェクトを返します:
```ts
interface Trainer {
@@ -13,6 +13,7 @@ interface Trainer {
start(): Promise<{ jobId: string }>;
wait(): Promise<TrainingResult>;
cancel(): Promise<void>;
+ requestEarlyStop(opts?: { timeoutMs?: number }): Promise<void>;
}
interface TrainingResult {
@@ -54,6 +55,25 @@ await trainer.cancel();
- そうでなければバックエンドにキャンセルリクエストを送ります。
- **ベストエフォート。** SDK は終端ステータスでショートサーキットしません。学習が既に completed / failed / cancelled なら、バックエンドは non-2xx を返すことがあり `cancel()` は reject します。投機的に呼ぶなら `try / catch` で囲んでください。
+## `requestEarlyStop()`
+
+```ts
+await trainer.requestEarlyStop();
+// もしくはチェックポイント待ちのデッドラインを指定:
+await trainer.requestEarlyStop({ timeoutMs: 60_000 });
+```
+
+「直近のチェックポイントを保全する」 `cancel()` の兄弟版です。
+
+- ラッチを armed にします。トレーナーは **次の** `checkpoint.saved` イベントが来るまで実行を続けます。チェックポイントが永続化された時点で SDK が代わりに `cancel()` を呼び、戻り値の Promise を resolve します。
+- `timeoutMs`(デフォルト: 5 分)以内にチェックポイントが来なかった場合は即時 `cancel()` にフォールバックします。`saveSteps` の間隔がそれより長い場合はこの値を調整してください。
+- 冪等: 連続して呼んでも同じ in-flight Promise を共有し、`cancel()` は 1 度しか発火しません。
+- `start()` 前、もしくはジョブが既に終端ステータスに達している場合は何もしません。
+
+これは `arkor dev` の HMR パイプラインが内部で使っている API です。実行中にソースファイルを保存すると Studio が spawn 済みの `arkor start` プロセスに `SIGTERM` を送り、シグナルハンドラが `requestEarlyStop()` を呼んでチェックポイントのアップロード完了後にクリーンに終了します。Cloud 側のジョブは `cancelled` ステータスで完了します。
+
+自前のコード(プログラム的な two-process パターンなど)から `requestEarlyStop()` を直接呼ぶこともできます。Cookbook の [Early Stopping](/ja/cookbook/early-stopping) レシピが `onCheckpoint` + `abortSignal` + `cancel()` で組み立てているのと同じ「実行中ステップを捨てずに止める」セマンティクスを、ワンショットで提供します。レシピ版の方が柔軟(メトリクス次第で abort のタイミングを決めるなど)ですが、こちらは「次のチェックポイントで止める」という典型ケースの便利フックです。
+
## `abortSignal`
```ts
diff --git a/docs/sdk/trainer-control.mdx b/docs/sdk/trainer-control.mdx
index ef40cf46..c43a404b 100644
--- a/docs/sdk/trainer-control.mdx
+++ b/docs/sdk/trainer-control.mdx
@@ -5,7 +5,7 @@ description: "start, wait, cancel, abortSignal, and how reconnects work."
# Trainer control
-`createTrainer` returns a `Trainer` object with three methods:
+`createTrainer` returns a `Trainer` object with four methods:
```ts
interface Trainer {
@@ -13,6 +13,7 @@ interface Trainer {
start(): Promise<{ jobId: string }>;
wait(): Promise<TrainingResult>;
cancel(): Promise<void>;
+ requestEarlyStop(opts?: { timeoutMs?: number }): Promise<void>;
}
interface TrainingResult {
@@ -54,6 +55,25 @@ await trainer.cancel();
- Otherwise it sends a cancel request to the backend.
- **Best-effort.** The SDK does not short-circuit on terminal status; if the run already completed, failed, or was cancelled, the backend may return a non-2xx and `cancel()` rejects. Wrap in `try / catch` if you call it speculatively.
+## `requestEarlyStop()`
+
+```ts
+await trainer.requestEarlyStop();
+// or with a custom checkpoint deadline:
+await trainer.requestEarlyStop({ timeoutMs: 60_000 });
+```
+
+The "preserve the latest checkpoint" sibling of `cancel()`:
+
+- Arms a latch. The trainer keeps running until the **next** `checkpoint.saved` event lands. Once the checkpoint is durable, the SDK calls `cancel()` for you and resolves the returned promise.
+- If no checkpoint arrives within `timeoutMs` (default: 5 minutes), falls back to `cancel()` immediately. Tune this if your `saveSteps` cadence is longer than 5 min.
+- Idempotent — repeat calls share the same in-flight promise and only fire `cancel()` once.
+- A no-op when called before `start()` or after the job has already reached a terminal status.
+
+This is what `arkor dev`'s HMR pipeline uses internally: when you save a source file mid-run, Studio sends `SIGTERM` to the spawned `arkor start` process; that process catches the signal, calls `requestEarlyStop()`, and exits cleanly once the checkpoint is uploaded. The Cloud-side job ends in the `cancelled` status.
+
+You can use `requestEarlyStop()` directly from your own code (e.g. in a programmatic two-process pattern) if you want the same "stop, but don't throw away the in-flight step" semantics that the cookbook's [Early stopping](/cookbook/early-stopping) recipe builds out of `onCheckpoint` + `abortSignal` + `cancel()`. The recipe is more flexible (you decide when to abort based on metrics); this method is the convenience hook for the common "stop after the next checkpoint" case.
+
## `abortSignal`
```ts
diff --git a/packages/arkor/package.json b/packages/arkor/package.json
index d74cda08..58112ef4 100644
--- a/packages/arkor/package.json
+++ b/packages/arkor/package.json
@@ -55,10 +55,10 @@
"@clack/prompts": "^0.8.0",
"@hono/node-server": "^1.14.0",
"commander": "^13.0.0",
- "esbuild": "^0.28.0",
"hono": "^4.7.0",
"open": "^10.0.0",
"posthog-node": "^5.30.6",
+ "rolldown": "^1.0.0-rc.17",
"zod": "^4.3.6"
},
"devDependencies": {
diff --git a/packages/arkor/src/cli/commands/build.ts b/packages/arkor/src/cli/commands/build.ts
index af761428..3d555c59 100644
--- a/packages/arkor/src/cli/commands/build.ts
+++ b/packages/arkor/src/cli/commands/build.ts
@@ -1,7 +1,7 @@
import { existsSync } from "node:fs";
import { mkdir } from "node:fs/promises";
import { isAbsolute, relative, resolve } from "node:path";
-import { build as esbuild } from "esbuild";
+import { rolldown } from "rolldown";
import { ui } from "../prompts";
export interface BuildOptions {
@@ -25,6 +25,16 @@ export interface BuildResult {
const DEFAULT_ENTRY = "src/arkor/index.ts";
const DEFAULT_OUT_DIR = ".arkor/build";
+/**
+ * Returns the `node<major>.<minor>` target derived from the running Node binary (`process.versions.node`), falling back to `22` / `6` for a missing component. Build host and run
+ * host are effectively the same process: Studio spawns `arkor start` with
+ * `process.execPath`, so the bundle can target precisely what will execute it.
+ */
+function resolveNodeTarget(): string {
+ const [major = "22", minor = "6"] = process.versions.node.split(".");
+ return `node${major}.${minor}`;
+}
+
/**
* Bundle the user's `src/arkor/index.ts` into a single ESM artifact at
* `.arkor/build/index.mjs`.
@@ -48,16 +58,28 @@ export async function runBuild(opts: BuildOptions = {}): Promise {
await mkdir(outDir, { recursive: true });
const outFile = resolve(outDir, "index.mjs");
- await esbuild({
- entryPoints: [entry],
- bundle: true,
+ const bundle = await rolldown({
+ input: entry,
+ cwd,
platform: "node",
- format: "esm",
- target: "node22.6",
- outfile: outFile,
- packages: "external",
- logLevel: "error",
+ logLevel: "warn",
+ transform: { target: resolveNodeTarget() },
+ // Mirror esbuild's `packages: "external"`: any specifier that isn't a
+ // relative or absolute path stays external. `node:`-prefixed builtins are
+ // already handled by `platform: "node"`; the catch-all `return true` below
+ // also covers them, as a safety net in case the builtin set drifts.
+ external: (id, _importer, isResolved) => {
+ if (isResolved) return false;
+ if (id.startsWith(".")) return false;
+ if (isAbsolute(id)) return false;
+ return true;
+ },
});
+ try {
+ await bundle.write({ file: outFile, format: "esm" });
+ } finally {
+ await bundle.close();
+ }
if (!opts.quiet) {
ui.log.success(
diff --git a/packages/arkor/src/cli/commands/dev.ts b/packages/arkor/src/cli/commands/dev.ts
index e2bf01cf..2ba1c9b3 100644
--- a/packages/arkor/src/cli/commands/dev.ts
+++ b/packages/arkor/src/cli/commands/dev.ts
@@ -16,6 +16,7 @@ import {
type AnonymousCredentials,
} from "../../core/credentials";
import { buildStudioApp } from "../../studio/server";
+import { createHmrCoordinator } from "../../studio/hmr";
import { ANON_PERSISTENCE_NUDGE } from "../anonymous";
import { ui } from "../prompts";
@@ -190,6 +191,25 @@ function scheduleStudioTokenCleanup(path: string): void {
}
}
+function scheduleHmrCleanup(hmr: { dispose: () => Promise<unknown> }): void {
+  let disposed = false; // guards against double-dispose across `exit` + signals
+  const dispose = () => {
+    if (disposed) return;
+    disposed = true;
+    hmr.dispose().catch(() => {
+      // best-effort: shutdown is racing other cleanup paths
+    });
+  };
+  // Mirror `scheduleStudioTokenCleanup` exit hooks. Note that those handlers
+  // already call `process.exit(0)` for the same signals; this listener fires
+  // first because Node invokes signal handlers in registration order, so the
+  // dispose call is started (its promise is not awaited) before exit.
+  process.on("exit", dispose);
+  for (const sig of ["SIGINT", "SIGTERM", "SIGHUP"] as const) {
+    process.on(sig, dispose);
+  }
+}
+
export async function runDev(options: DevOptions = {}): Promise {
await ensureCredentialsForStudio();
@@ -199,6 +219,15 @@ export async function runDev(options: DevOptions = {}): Promise {
// hitting `arkor start` (and therefore RCE via dynamic import).
const studioToken = randomBytes(32).toString("base64url");
+ // HMR coordinator: a long-lived rolldown watcher over the user's
+ // `src/arkor` graph. Lazy-started on first `/api/dev/events` connection so
+ // an `arkor dev` launched in an unbuilt project doesn't immediately fail.
+ // Registered before the studio-token cleanup so the latter remains the
+ // most-recently-attached signal listener (existing tests rely on this
+ // ordering to find the token-removal handler).
+ const hmr = createHmrCoordinator({ cwd: process.cwd() });
+ scheduleHmrCleanup(hmr);
+
// Persisting the token to disk is *only* needed for the Vite SPA dev
// workflow. The bundled `:port` flow injects the meta tag at request time
// via `buildStudioApp`, so a failure here (read-only $HOME on Docker /
@@ -217,7 +246,7 @@ export async function runDev(options: DevOptions = {}): Promise {
// `autoAnonymous: true` (the default) lets the Hono server retry the
// anonymous bootstrap on first `/api/credentials` hit if the up-front
// attempt above failed (e.g. cloud-api was unreachable at launch).
- const app = buildStudioApp({ studioToken });
+ const app = buildStudioApp({ studioToken, hmr });
// Bind to 127.0.0.1 (not "localhost") so the listener can't end up on `::1`
// only — `@hono/node-server` passes hostname to `net.Server.listen`, which
// calls `dns.lookup`. On hosts where `/etc/hosts` orders `::1 localhost`
@@ -229,6 +258,7 @@ export async function runDev(options: DevOptions = {}): Promise {
const url = `http://localhost:${port}`;
serve({ fetch: app.fetch, port, hostname: "127.0.0.1" });
process.stdout.write(`Arkor Studio running on ${url}\n`);
+ process.stdout.write(`HMR enabled (watching src/arkor)\n`);
if (options.open) {
try {
await open(url);
diff --git a/packages/arkor/src/cli/commands/start.test.ts b/packages/arkor/src/cli/commands/start.test.ts
index 8209818b..a08d70f4 100644
--- a/packages/arkor/src/cli/commands/start.test.ts
+++ b/packages/arkor/src/cli/commands/start.test.ts
@@ -78,7 +78,7 @@ describe("runStart", () => {
it("skips the build step when the artifact already exists and no entry override is given", async () => {
// Branch coverage for `Boolean(opts.entry) || !existsSync(outFile)` —
// the path where both halves are false. Pre-build the artifact, then
- // confirm runStart imports it without triggering esbuild again.
+ // confirm runStart imports it without triggering rolldown again.
mkdirSync(join(cwd, "src/arkor"), { recursive: true });
writeFileSync(join(cwd, "src/arkor/index.ts"), FAKE_MANIFEST);
// First call builds normally.
diff --git a/packages/arkor/src/core/arkor.test.ts b/packages/arkor/src/core/arkor.test.ts
index 64e5e82e..d3dc41a7 100644
--- a/packages/arkor/src/core/arkor.test.ts
+++ b/packages/arkor/src/core/arkor.test.ts
@@ -23,6 +23,7 @@ function fakeTrainer(name = "run"): Trainer {
};
},
async cancel() {},
+ async requestEarlyStop() {},
};
}
diff --git a/packages/arkor/src/core/runner.test.ts b/packages/arkor/src/core/runner.test.ts
index b2560c8d..ee1667be 100644
--- a/packages/arkor/src/core/runner.test.ts
+++ b/packages/arkor/src/core/runner.test.ts
@@ -1,4 +1,4 @@
-import { describe, it, expect, afterEach, beforeEach } from "vitest";
+import { describe, it, expect, afterEach, beforeEach, vi } from "vitest";
import { mkdtempSync, rmSync, writeFileSync, mkdirSync } from "node:fs";
import { tmpdir } from "node:os";
import { join } from "node:path";
@@ -33,6 +33,7 @@ function fakeTrainer(onStart?: () => void, onWait?: () => void): Trainer {
};
},
async cancel() {},
+ async requestEarlyStop() {},
};
}
@@ -207,3 +208,87 @@ describe("runTrainer — entry extraction", () => {
expect(typeof t.wait).toBe("function");
});
});
+
+describe("runTrainer — shutdown signal handling", () => {
+ it("first SIGTERM calls trainer.requestEarlyStop and exits 0; second SIGTERM exits 143", async () => {
+ // Fake trainer whose `wait()` hangs until the test manually resolves it
+ // (via a global helper). This lets us hold the run in flight long
+ // enough to assert both signal-handling branches without racing the
+ // `finally` block that removes the listeners.
+ const trainerSrc = `
+ let earlyStopCalls = 0;
+ let resolveWait;
+ const waitPromise = new Promise((r) => { resolveWait = r; });
+ globalThis.__test_signalProbe = {
+ get earlyStopCalls() { return earlyStopCalls; },
+ finishWait: () => resolveWait({
+ job: {
+ id: "j1", orgId: "o", projectId: "p", name: "n",
+ status: "completed",
+ config: { model: "m", datasetSource: { type: "huggingface", name: "x" } },
+ createdAt: "2026",
+ },
+ artifacts: [],
+ }),
+ };
+ export const trainer = {
+ name: "n",
+ start: async () => ({ jobId: "j1" }),
+ wait: () => waitPromise,
+ cancel: async () => {},
+ requestEarlyStop: async () => { earlyStopCalls++; },
+ };
+ `;
+ const entry = join(cwd, "src/arkor/index.mjs");
+ mkdirSync(join(cwd, "src/arkor"), { recursive: true });
+ writeFileSync(entry, trainerSrc);
+
+ const exitCalls: number[] = [];
+ const exitSpy = vi
+ .spyOn(process, "exit")
+ .mockImplementation(((code?: number) => {
+ exitCalls.push(code ?? 0);
+ return undefined as never;
+ }) as typeof process.exit);
+ const stdoutSpy = vi
+ .spyOn(process.stdout, "write")
+ .mockImplementation((() => true) as typeof process.stdout.write);
+ try {
+ const runPromise = runTrainer("src/arkor/index.mjs");
+ // Wait for import + start() to settle so the handler is registered
+ // before we synthesise SIGTERM. The fake's `wait()` stays pending until
+ // `finishWait()` is called, so the run remains in flight for the assertions.
+ await new Promise((r) => setTimeout(r, 25));
+
+ const probe = (globalThis as unknown as {
+ __test_signalProbe: {
+ earlyStopCalls: number;
+ finishWait: () => void;
+ };
+ }).__test_signalProbe;
+
+ // 1st SIGTERM → requestEarlyStop is called, exit(0) scheduled in the
+ // promise's `.finally`.
+ process.emit("SIGTERM", "SIGTERM");
+ await new Promise((r) => setTimeout(r, 25));
+ expect(probe.earlyStopCalls).toBe(1);
+ expect(exitCalls).toContain(0);
+
+ // 2nd SIGTERM (still in-flight, listeners not yet removed) →
+ // exit(143) immediately, no second requestEarlyStop call.
+ process.emit("SIGTERM", "SIGTERM");
+ await new Promise((r) => setTimeout(r, 25));
+ expect(probe.earlyStopCalls).toBe(1);
+ expect(exitCalls).toContain(143);
+
+ // Release the hung wait() so runPromise can complete and the
+ // shutdown handlers detach via the finally block.
+ probe.finishWait();
+ await runPromise;
+ } finally {
+ exitSpy.mockRestore();
+ stdoutSpy.mockRestore();
+ delete (globalThis as Record<string, unknown>).__test_signalProbe;
+ }
+ });
+});
diff --git a/packages/arkor/src/core/runner.ts b/packages/arkor/src/core/runner.ts
index e674b70e..1fc5b0b2 100644
--- a/packages/arkor/src/core/runner.ts
+++ b/packages/arkor/src/core/runner.ts
@@ -42,6 +42,51 @@ function extractTrainer(mod: Record): Trainer {
);
}
+const SHUTDOWN_SIGNALS = ["SIGTERM", "SIGINT", "SIGHUP"] as const;
+
+/**
+ * Two-stage signal handling so HMR rebuilds (Studio sends SIGTERM) preserve
+ * the in-flight checkpoint work:
+ *
+ * - 1st signal → `trainer.requestEarlyStop()`. The trainer keeps running,
+ * lets the next `checkpoint.saved` event land, then issues `cancel()`; once that promise settles the process exits 0 (a rejection is only logged to stderr).
+ * - 2nd signal → immediate `process.exit(143)`. Escape hatch for an
+ * impatient operator or a hung early-stop.
+ *
+ * Returns a function that detaches the handler from every signal in `SHUTDOWN_SIGNALS`. `runTrainer` calls it in `finally` so a normal `wait()` completion
+ * doesn't leave stale listeners behind — important because `runTrainer` can
+ * be called multiple times in tests within a single Node process.
+ */
+function installShutdownHandlers(trainer: Trainer): () => void {
+ let signalCount = 0; // shared by all SHUTDOWN_SIGNALS: any second signal escalates
+ const handler = (signal: NodeJS.Signals): void => {
+ signalCount += 1;
+ if (signalCount > 1) {
+ process.stdout.write(
+ `Received second ${signal}; exiting without waiting for checkpoint.\n`,
+ );
+ process.exit(143);
+ // Explicit return so test mocks of process.exit (which don't actually
+ // terminate the worker) don't fall through into the early-stop path.
+ return;
+ }
+ process.stdout.write(
+ `Received ${signal}; early-stopping at next checkpoint…\n`,
+ );
+ trainer
+ .requestEarlyStop()
+ .catch((err: unknown) => {
+ const msg = err instanceof Error ? err.message : String(err);
+ process.stderr.write(`requestEarlyStop failed: ${msg}\n`);
+ })
+ .finally(() => process.exit(0));
+ };
+ for (const sig of SHUTDOWN_SIGNALS) process.on(sig, handler);
+ return () => {
+ for (const sig of SHUTDOWN_SIGNALS) process.off(sig, handler);
+ };
+}
+
export async function runTrainer(file?: string): Promise {
const relative = file ?? DEFAULT_ENTRY;
const abs = isAbsolute(relative) ? relative : resolve(process.cwd(), relative);
@@ -53,8 +98,15 @@ export async function runTrainer(file?: string): Promise {
const mod = (await import(pathToFileURL(abs).href)) as Record;
const trainer = extractTrainer(mod);
- const { jobId } = await trainer.start();
- process.stdout.write(`Started job ${jobId}\n`);
- const result = await trainer.wait();
- process.stdout.write(`Job ${result.job.id} finished with status=${result.job.status}\n`);
+ const removeShutdownHandlers = installShutdownHandlers(trainer);
+ try {
+ const { jobId } = await trainer.start();
+ process.stdout.write(`Started job ${jobId}\n`);
+ const result = await trainer.wait();
+ process.stdout.write(
+ `Job ${result.job.id} finished with status=${result.job.status}\n`,
+ );
+ } finally {
+ removeShutdownHandlers();
+ }
}
diff --git a/packages/arkor/src/core/trainer.test.ts b/packages/arkor/src/core/trainer.test.ts
index 3e5633d2..13e8145c 100644
--- a/packages/arkor/src/core/trainer.test.ts
+++ b/packages/arkor/src/core/trainer.test.ts
@@ -1304,3 +1304,252 @@ describe("createTrainer (reconnect backoff + max attempts)", () => {
}
});
});
+
+describe("createTrainer (early stop)", () => {
+ const minimalJobRow = {
+ id: "j-stop",
+ orgId: "o1",
+ projectId: "p1",
+ name: "run",
+ status: "queued",
+ config: {
+ model: "m",
+ datasetSource: { type: "huggingface", name: "x" },
+ },
+ createdAt: "2026-01-01T00:00:00Z",
+ startedAt: null,
+ completedAt: null,
+ };
+
+ it("calls cancel after the next checkpoint when early-stop is requested mid-run", async () => {
+ await writeState(
+ { orgSlug: "anon-org", projectSlug: "proj", projectId: "p1" },
+ cwd,
+ );
+ // SSE stream: training.started → training.log → checkpoint.saved.
+ // The checkpoint event is the trigger for the early-stop branch in
+ // dispatch(); after that, the loop should treat the run as terminal
+ // (we assert this by awaiting wait() without ever sending
+ // training.completed).
+ const sse = [
+ `id: 1\nevent: training.started\ndata: ${JSON.stringify({
+ type: "training.started",
+ jobId: "j-stop",
+ timestamp: "2026-01-01T00:00:01Z",
+ })}\n\n`,
+ `id: 2\nevent: training.log\ndata: ${JSON.stringify({
+ type: "training.log",
+ jobId: "j-stop",
+ timestamp: "2026-01-01T00:00:02Z",
+ step: 1,
+ loss: 0.5,
+ })}\n\n`,
+ `id: 3\nevent: checkpoint.saved\ndata: ${JSON.stringify({
+ type: "checkpoint.saved",
+ jobId: "j-stop",
+ timestamp: "2026-01-01T00:00:03Z",
+ step: 10,
+ })}\n\n`,
+ ];
+
+ let cancelCalls = 0;
+ const fetcher: typeof fetch = (async (
+ input: RequestInfo | URL,
+ init?: RequestInit,
+ ) => {
+ const url = typeof input === "string" ? input : input.toString();
+ const method = init?.method ?? "GET";
+ if (method === "POST" && url.includes("/v1/jobs?")) {
+ return new Response(JSON.stringify({ job: minimalJobRow }), {
+ status: 201,
+ headers: { "content-type": "application/json" },
+ });
+ }
+ if (method === "GET" && url.includes("/v1/jobs/j-stop/events/stream")) {
+ return new Response(sseStream(sse), {
+ status: 200,
+ headers: { "content-type": "text/event-stream" },
+ });
+ }
+ if (method === "POST" && url.includes("/v1/jobs/j-stop/cancel")) {
+ cancelCalls += 1;
+ return new Response(JSON.stringify({ ok: true }), {
+ status: 200,
+ headers: { "content-type": "application/json" },
+ });
+ }
+ throw new Error(`unexpected fetch: ${method} ${url}`);
+ }) as typeof fetch;
+
+ const trainer = createTrainer(
+ {
+ name: "run",
+ model: "m",
+ dataset: { type: "huggingface", name: "x" },
+ callbacks: {
+ // Arm the early-stop latch from inside the on-log callback so it
+ // fires before the checkpoint dispatch — mirrors the real CLI
+ // path where SIGTERM arrives mid-run. Fire-and-forget so the
+ // dispatch loop isn't blocked waiting for the latch's own
+ // checkpoint trigger to arrive.
+ onLog: () => {
+ void trainer.requestEarlyStop({ timeoutMs: 60_000 });
+ },
+ },
+ },
+ { baseUrl: "http://mock", credentials: creds, cwd, reconnectDelayMs: 1 },
+ );
+ const original = globalThis.fetch;
+ globalThis.fetch = fetcher;
+ try {
+ await trainer.wait();
+ } finally {
+ globalThis.fetch = original;
+ }
+ expect(cancelCalls).toBe(1);
+ });
+
+ it("falls back to immediate cancel when no checkpoint arrives within timeoutMs", async () => {
+ await writeState(
+ { orgSlug: "anon-org", projectSlug: "proj", projectId: "p1" },
+ cwd,
+ );
+ // No checkpoint in the stream — only training.started is ever sent, and
+ // the hand-rolled stream is left open (it never ends on its own), so
+ // the timeout fallback is what actually triggers cancel.
+ let streamController: ReadableStreamDefaultController | null =
+ null;
+ const stallingStream = new ReadableStream({
+ start(controller) {
+ streamController = controller;
+ const enc = new TextEncoder();
+ controller.enqueue(
+ enc.encode(
+ `id: 1\nevent: training.started\ndata: ${JSON.stringify({
+ type: "training.started",
+ jobId: "j-stop",
+ timestamp: "2026-01-01T00:00:01Z",
+ })}\n\n`,
+ ),
+ );
+ },
+ });
+
+ let cancelCalls = 0;
+ const fetcher: typeof fetch = (async (
+ input: RequestInfo | URL,
+ init?: RequestInit,
+ ) => {
+ const url = typeof input === "string" ? input : input.toString();
+ const method = init?.method ?? "GET";
+ if (method === "POST" && url.includes("/v1/jobs?")) {
+ return new Response(JSON.stringify({ job: minimalJobRow }), {
+ status: 201,
+ headers: { "content-type": "application/json" },
+ });
+ }
+ if (method === "GET" && url.includes("/v1/jobs/j-stop/events/stream")) {
+ return new Response(stallingStream, {
+ status: 200,
+ headers: { "content-type": "text/event-stream" },
+ });
+ }
+ if (method === "POST" && url.includes("/v1/jobs/j-stop/cancel")) {
+ cancelCalls += 1;
+ // Closing the stream now mimics cloud-api's response to a cancel:
+ // the SSE channel ends and wait() exits its loop.
+ streamController?.close();
+ return new Response(JSON.stringify({ ok: true }), {
+ status: 200,
+ headers: { "content-type": "application/json" },
+ });
+ }
+ throw new Error(`unexpected fetch: ${method} ${url}`);
+ }) as typeof fetch;
+
+ const trainer = createTrainer(
+ {
+ name: "run",
+ model: "m",
+ dataset: { type: "huggingface", name: "x" },
+ },
+ { baseUrl: "http://mock", credentials: creds, cwd, reconnectDelayMs: 1 },
+ );
+
+ const original = globalThis.fetch;
+ globalThis.fetch = fetcher;
+ try {
+ await trainer.start();
+ // Tiny timeout so the test doesn't actually wait 5 minutes.
+ const stopPromise = trainer.requestEarlyStop({ timeoutMs: 5 });
+ await stopPromise;
+ expect(cancelCalls).toBe(1);
+ } finally {
+ globalThis.fetch = original;
+ }
+ });
+
+ it("is a no-op before start() and resolves immediately", async () => {
+ const trainer = createTrainer(
+ {
+ name: "run",
+ model: "m",
+ dataset: { type: "huggingface", name: "x" },
+ },
+ { baseUrl: "http://mock", credentials: creds, cwd, reconnectDelayMs: 1 },
+ );
+ // Should resolve without contacting cloud-api at all.
+ await trainer.requestEarlyStop({ timeoutMs: 1 });
+ });
+
+ it("is idempotent — repeated calls share the same in-flight promise", async () => {
+ await writeState(
+ { orgSlug: "anon-org", projectSlug: "proj", projectId: "p1" },
+ cwd,
+ );
+ let cancelCalls = 0;
+ const fetcher: typeof fetch = (async (
+ input: RequestInfo | URL,
+ init?: RequestInit,
+ ) => {
+ const url = typeof input === "string" ? input : input.toString();
+ const method = init?.method ?? "GET";
+ if (method === "POST" && url.includes("/v1/jobs?")) {
+ return new Response(JSON.stringify({ job: minimalJobRow }), {
+ status: 201,
+ headers: { "content-type": "application/json" },
+ });
+ }
+ if (method === "POST" && url.includes("/v1/jobs/j-stop/cancel")) {
+ cancelCalls += 1;
+ return new Response(JSON.stringify({ ok: true }), {
+ status: 200,
+ headers: { "content-type": "application/json" },
+ });
+ }
+ throw new Error(`unexpected fetch: ${method} ${url}`);
+ }) as typeof fetch;
+
+ const trainer = createTrainer(
+ {
+ name: "run",
+ model: "m",
+ dataset: { type: "huggingface", name: "x" },
+ },
+ { baseUrl: "http://mock", credentials: creds, cwd, reconnectDelayMs: 1 },
+ );
+ const original = globalThis.fetch;
+ globalThis.fetch = fetcher;
+ try {
+ await trainer.start();
+ const a = trainer.requestEarlyStop({ timeoutMs: 5 });
+ const b = trainer.requestEarlyStop({ timeoutMs: 5 });
+ await Promise.all([a, b]);
+ // The fallback timer fires once, so cancel is called once even though
+ // requestEarlyStop was called twice.
+ expect(cancelCalls).toBe(1);
+ } finally {
+ globalThis.fetch = original;
+ }
+ });
+});
diff --git a/packages/arkor/src/core/trainer.ts b/packages/arkor/src/core/trainer.ts
index 874382f0..f5e88ca5 100644
--- a/packages/arkor/src/core/trainer.ts
+++ b/packages/arkor/src/core/trainer.ts
@@ -139,6 +139,18 @@ export function createTrainer(
let scope: { orgSlug: string; projectSlug: string } | null = null;
let clientPromise: Promise | null = null;
+ // Early-stop state. `requestEarlyStop()` arms the latch; the next
+ // `checkpoint.saved` dispatch (or the timeout, whichever fires first)
+ // calls cancel() and resolves the deferred. Idempotent across repeat
+ // calls — they share the same deferred.
+ const DEFAULT_EARLY_STOP_TIMEOUT_MS = 5 * 60 * 1000;
+ let earlyStopDeferred: {
+ promise: Promise;
+ resolve: () => void;
+ timer: NodeJS.Timeout | null;
+ } | null = null;
+ let earlyStopRequested = false;
+
async function getClient(): Promise {
if (!clientPromise) {
clientPromise = (async () => {
@@ -244,6 +256,15 @@ export function createTrainer(
artifacts: event.artifacts,
};
await callbacks.onCheckpoint?.(ctx);
+ // Early-stop latch: a checkpoint just landed, so the in-flight work
+ // is durable. Cancel the cloud job and end `wait()` cleanly.
+ if (earlyStopRequested && earlyStopDeferred) {
+ await trainer.cancel();
+ if (earlyStopDeferred.timer) clearTimeout(earlyStopDeferred.timer);
+ earlyStopDeferred.resolve();
+ earlyStopDeferred = null;
+ return { terminal: true, artifacts: terminalResult?.artifacts ?? [] };
+ }
return { terminal: false, artifacts: terminalResult?.artifacts ?? [] };
}
case "training.completed": {
@@ -390,6 +411,47 @@ export function createTrainer(
const client = await getClient();
await client.cancelJob(startedJob.id, scope);
},
+
+ /**
+  * Arm the early-stop latch. The next `checkpoint.saved` dispatch cancels
+  * the job and resolves the latch; if none arrives within `timeoutMs`
+  * (default `DEFAULT_EARLY_STOP_TIMEOUT_MS`) a timer falls back to an
+  * immediate best-effort cancel.
+  *
+  * Resolves immediately, without arming anything, when no job has been
+  * started or the job is already in a terminal status. Calls made while a
+  * latch is pending piggyback on it and settle together with it.
+  *
+  * @param opts.timeoutMs How long to wait for a checkpoint before falling
+  *   back to an immediate cancel.
+  */
+ async requestEarlyStop(opts: { timeoutMs?: number } = {}): Promise<void> {
+   // Nothing in flight: clean up any prior latch and resolve.
+   if (!startedJob || !scope || TERMINAL_STATUSES.has(startedJob.status)) {
+     if (earlyStopDeferred) {
+       if (earlyStopDeferred.timer) clearTimeout(earlyStopDeferred.timer);
+       earlyStopDeferred.resolve();
+       earlyStopDeferred = null;
+     }
+     earlyStopRequested = false;
+     return;
+   }
+   // Idempotent: a second call piggybacks on the first.
+   if (earlyStopDeferred) return earlyStopDeferred.promise;
+
+   earlyStopRequested = true;
+   let resolveFn!: () => void;
+   const promise = new Promise<void>((resolve) => {
+     resolveFn = resolve;
+   });
+   // Node clamps any delay above 2^31-1 ms (including Infinity) down to
+   // 1 ms, which would turn "wait a very long time" into "cancel right
+   // now". Clamp to the largest delay the timer can actually honour.
+   const MAX_TIMER_DELAY_MS = 2 ** 31 - 1;
+   const timeoutMs = Math.min(
+     opts.timeoutMs ?? DEFAULT_EARLY_STOP_TIMEOUT_MS,
+     MAX_TIMER_DELAY_MS,
+   );
+   // This call owns `deferred`; the timer closes over it directly instead
+   // of re-reading the shared slot, which the checkpoint branch may have
+   // cleared by the time the cancel request settles.
+   const deferred: {
+     promise: Promise<void>;
+     resolve: () => void;
+     timer: NodeJS.Timeout | null;
+   } = { promise, resolve: resolveFn, timer: null };
+   const timer = setTimeout(() => {
+     // Timed out waiting for a checkpoint — fall back to immediate cancel.
+     deferred.timer = null;
+     trainer
+       .cancel()
+       // Deliberately best-effort: a failed cancel must not leave callers
+       // of requestEarlyStop() hanging on a promise that never settles.
+       .catch(() => {})
+       .finally(() => {
+         deferred.resolve();
+         if (earlyStopDeferred === deferred) {
+           earlyStopDeferred = null;
+           earlyStopRequested = false;
+         }
+       });
+   }, timeoutMs);
+   // `Timer.unref` keeps the early-stop timer from blocking process exit
+   // when the host runtime finishes for unrelated reasons.
+   timer.unref?.();
+   deferred.timer = timer;
+   earlyStopDeferred = deferred;
+   return promise;
+ },
};
return trainer;
diff --git a/packages/arkor/src/core/types.ts b/packages/arkor/src/core/types.ts
index e5fe1f26..c0ec4d31 100644
--- a/packages/arkor/src/core/types.ts
+++ b/packages/arkor/src/core/types.ts
@@ -200,6 +200,17 @@ export interface Trainer {
wait(): Promise;
/** Best-effort cancel; resolves once the cloud API accepts the request. */
cancel(): Promise;
+ /**
+ * Stop after the next saved checkpoint. The trainer keeps running, lets the
+ * in-flight step finish + checkpoint upload complete, then issues `cancel()`.
+ * Resolves once that cancel request has completed. Falls back to an immediate,
+ * best-effort cancel if no checkpoint arrives within `timeoutMs` (default: 5 min).
+ *
+ * Idempotent: repeat calls made while a stop is pending share the same
+ * in-flight request and settle together. If the job has not been `start()`ed
+ * or has already reached a terminal status, this resolves immediately
+ * without contacting the cloud API.
+ */
+ requestEarlyStop(opts?: { timeoutMs?: number }): Promise;
}
/**
diff --git a/packages/arkor/src/studio/hmr.test.ts b/packages/arkor/src/studio/hmr.test.ts
new file mode 100644
index 00000000..2ea6392c
--- /dev/null
+++ b/packages/arkor/src/studio/hmr.test.ts
@@ -0,0 +1,141 @@
+import { describe, it, expect, beforeEach, afterEach } from "vitest";
+import {
+ mkdirSync,
+ mkdtempSync,
+ rmSync,
+ writeFileSync,
+} from "node:fs";
+import { tmpdir } from "node:os";
+import { join } from "node:path";
+import { createHmrCoordinator, type HmrEvent } from "./hmr";
+
+const FAKE_MANIFEST = `export const arkor = Object.freeze({
+ _kind: "arkor",
+ trainer: { name: "alpha" },
+});
+`;
+
+let cwd: string;
+
+beforeEach(() => {
+ cwd = mkdtempSync(join(tmpdir(), "arkor-hmr-test-"));
+});
+
+afterEach(() => {
+ rmSync(cwd, { recursive: true, force: true });
+});
+
+// Test helper: poll `events` until an entry satisfies `predicate`.
+// Resolves with the first matching event, or rejects with a timeout error
+// once more than `timeoutMs` milliseconds have elapsed without a match.
+function nextEvent(
+ events: HmrEvent[],
+ predicate: (e: HmrEvent) => boolean,
+ timeoutMs = 10_000,
+): Promise {
+ return new Promise((resolve, reject) => {
+ const start = Date.now();
+ // Re-check the (externally mutated) array every 25ms; the first check
+ // runs synchronously so an already-recorded event resolves right away.
+ const tick = () => {
+ const found = events.find(predicate);
+ if (found) return resolve(found);
+ if (Date.now() - start > timeoutMs) {
+ return reject(
+ new Error(
+ `Timed out waiting for matching HMR event after ${timeoutMs}ms`,
+ ),
+ );
+ }
+ setTimeout(tick, 25);
+ };
+ tick();
+ });
+}
+
+describe("createHmrCoordinator", () => {
+ it("emits a `ready` event after the first successful build", async () => {
+ mkdirSync(join(cwd, "src/arkor"), { recursive: true });
+ writeFileSync(join(cwd, "src/arkor/index.ts"), FAKE_MANIFEST);
+
+ const events: HmrEvent[] = [];
+ const hmr = createHmrCoordinator({ cwd });
+ hmr.subscribe((e) => events.push(e));
+ try {
+ const ready = await nextEvent(events, (e) => e.type === "ready");
+ expect(ready.outFile).toMatch(/\.arkor[\\/]+build[\\/]+index\.mjs$/);
+ expect(typeof ready.hash).toBe("string");
+ } finally {
+ await hmr.dispose();
+ }
+ });
+
+ it("emits a `rebuild` event after a source edit", async () => {
+ mkdirSync(join(cwd, "src/arkor"), { recursive: true });
+ writeFileSync(join(cwd, "src/arkor/index.ts"), FAKE_MANIFEST);
+
+ const events: HmrEvent[] = [];
+ const hmr = createHmrCoordinator({ cwd });
+ hmr.subscribe((e) => events.push(e));
+ try {
+ const ready = await nextEvent(events, (e) => e.type === "ready");
+ // Touch the entry with new content so the watcher detects a change.
+ writeFileSync(
+ join(cwd, "src/arkor/index.ts"),
+ FAKE_MANIFEST.replace(`"alpha"`, `"beta"`),
+ );
+ const rebuild = await nextEvent(events, (e) => e.type === "rebuild");
+ expect(rebuild.outFile).toBe(ready.outFile);
+ expect(rebuild.hash).not.toBe(ready.hash);
+ } finally {
+ await hmr.dispose();
+ }
+ });
+
+ it("emits an `error` event when the entry is missing on subscribe", async () => {
+ const events: HmrEvent[] = [];
+ const hmr = createHmrCoordinator({ cwd });
+ hmr.subscribe((e) => events.push(e));
+ try {
+ const err = await nextEvent(events, (e) => e.type === "error", 1000);
+ expect(err.message).toMatch(/Build entry not found/);
+ } finally {
+ await hmr.dispose();
+ }
+ });
+
+ it("replays the latest event to late subscribers", async () => {
+ mkdirSync(join(cwd, "src/arkor"), { recursive: true });
+ writeFileSync(join(cwd, "src/arkor/index.ts"), FAKE_MANIFEST);
+
+ const firstEvents: HmrEvent[] = [];
+ const hmr = createHmrCoordinator({ cwd });
+ hmr.subscribe((e) => firstEvents.push(e));
+ try {
+ await nextEvent(firstEvents, (e) => e.type === "ready");
+ // A new subscriber should receive the cached state synchronously
+ // before any new build is triggered.
+ const lateEvents: HmrEvent[] = [];
+ hmr.subscribe((e) => lateEvents.push(e));
+ expect(lateEvents.length).toBeGreaterThanOrEqual(1);
+ expect(lateEvents[0]?.type).toBe("ready");
+ } finally {
+ await hmr.dispose();
+ }
+ });
+
+ it("stops broadcasting after dispose()", async () => {
+ mkdirSync(join(cwd, "src/arkor"), { recursive: true });
+ writeFileSync(join(cwd, "src/arkor/index.ts"), FAKE_MANIFEST);
+
+ const events: HmrEvent[] = [];
+ const hmr = createHmrCoordinator({ cwd });
+ hmr.subscribe((e) => events.push(e));
+ await nextEvent(events, (e) => e.type === "ready");
+ await hmr.dispose();
+ const countAfterDispose = events.length;
+
+ // Edit after dispose must not produce any further events.
+ writeFileSync(
+ join(cwd, "src/arkor/index.ts"),
+ FAKE_MANIFEST.replace(`"alpha"`, `"gamma"`),
+ );
+ await new Promise((r) => setTimeout(r, 250));
+ expect(events.length).toBe(countAfterDispose);
+ });
+});
diff --git a/packages/arkor/src/studio/hmr.ts b/packages/arkor/src/studio/hmr.ts
new file mode 100644
index 00000000..42a03078
--- /dev/null
+++ b/packages/arkor/src/studio/hmr.ts
@@ -0,0 +1,148 @@
+import { existsSync, statSync } from "node:fs";
+import { isAbsolute, resolve } from "node:path";
+import { watch, type RolldownWatcher } from "rolldown";
+
+export type HmrEventType = "ready" | "rebuild" | "error";
+
+export interface HmrEvent {
+ type: HmrEventType;
+ outFile?: string;
+ /** Short content fingerprint (mtime + size) so subscribers can dedupe. */
+ hash?: string;
+ /** Human-readable error message; only present on `type === "error"`. */
+ message?: string;
+}
+
+export interface HmrCoordinator {
+ /**
+ * Receive the current cached state immediately, then every subsequent event.
+ * Returns an unsubscribe function.
+ */
+ subscribe(fn: (event: HmrEvent) => void): () => void;
+ dispose(): Promise;
+}
+
+export interface HmrOptions {
+ cwd: string;
+ /** Defaults to `src/arkor/index.ts`. */
+ entry?: string;
+ /** Defaults to `.arkor/build`. */
+ outDir?: string;
+}
+
+const DEFAULT_ENTRY = "src/arkor/index.ts";
+const DEFAULT_OUT_DIR = ".arkor/build";
+
+/**
+ * Build the `node<major>.<minor>` transform target from the running Node
+ * version, substituting "22" / "6" for a missing major / minor component.
+ */
+function resolveNodeTarget(): string {
+  const parts = process.versions.node.split(".");
+  const major = parts[0] ?? "22";
+  const minor = parts[1] ?? "6";
+  return "node" + major + "." + minor;
+}
+
+/**
+ * Cheap change marker for the built bundle: `<mtime in ms>-<size in bytes>`.
+ * When the file cannot be stat'ed, returns the current time in base 36.
+ */
+function fingerprint(outFile: string): string {
+  try {
+    const { mtimeMs, size } = statSync(outFile);
+    return [mtimeMs.toFixed(0), size].join("-");
+  } catch {
+    // No readable output file — fall back to a time-based token.
+    return Date.now().toString(36);
+  }
+}
+
+/**
+ * Spin up a rolldown watcher over the user's `src/arkor` entry, broadcasting
+ * `ready` / `rebuild` / `error` to subscribers. Used by `arkor dev` to push
+ * `/api/dev/events` SSE notifications to the SPA.
+ *
+ * Lazy: the watcher only starts on a `subscribe` call, and only once the
+ * entry file exists. While the entry is missing, a single `error` event is
+ * cached and replayed; each later `subscribe` retries the start, so the
+ * watcher kicks in once the user creates the file and the SPA opens an
+ * EventSource. The most recent event is cached and replayed synchronously
+ * to every new subscriber so a late-mounting component still sees the
+ * current state.
+ *
+ * @param opts.cwd    Base directory for relative `entry` / `outDir` paths.
+ * @param opts.entry  Build entry; defaults to `src/arkor/index.ts`.
+ * @param opts.outDir Output directory; defaults to `.arkor/build`.
+ * @returns A coordinator exposing `subscribe` and `dispose`.
+ */
+export function createHmrCoordinator(opts: HmrOptions): HmrCoordinator {
+  const cwd = opts.cwd;
+  const entryRel = opts.entry ?? DEFAULT_ENTRY;
+  const entry = isAbsolute(entryRel) ? entryRel : resolve(cwd, entryRel);
+  const outDirRel = opts.outDir ?? DEFAULT_OUT_DIR;
+  const outDir = isAbsolute(outDirRel) ? outDirRel : resolve(cwd, outDirRel);
+  const outFile = resolve(outDir, "index.mjs");
+
+  const subscribers = new Set<(event: HmrEvent) => void>();
+  let lastEvent: HmrEvent | null = null;
+  let watcher: RolldownWatcher | null = null;
+  let disposed = false;
+
+  /** Invoke one subscriber, isolating the caller from anything it throws. */
+  function deliver(fn: (event: HmrEvent) => void, event: HmrEvent): void {
+    try {
+      fn(event);
+    } catch {
+      // Subscribers are SSE controllers — a thrown error usually means the
+      // connection closed mid-flight. Drop it so one bad subscriber can't
+      // poison the broadcast for the rest.
+    }
+  }
+
+  /** Cache `event` as the latest state and fan it out to all subscribers. */
+  function broadcast(event: HmrEvent): void {
+    // A build that finishes while the watcher is closing must not
+    // resurrect cached state after dispose().
+    if (disposed) return;
+    lastEvent = event;
+    for (const fn of subscribers) deliver(fn, event);
+  }
+
+  /** Start the rolldown watcher if it isn't running and the entry exists. */
+  function startWatcher(): void {
+    if (watcher || disposed) return;
+    if (!existsSync(entry)) {
+      const message = `Build entry not found: ${entry}. Create ${DEFAULT_ENTRY} or pass an explicit entry argument.`;
+      // Every subscribe() retries the start. Only announce the missing
+      // entry when it is news; otherwise each new subscriber would receive
+      // the error twice (replay + broadcast) and existing subscribers would
+      // be re-notified on every new connection.
+      const alreadyReported =
+        lastEvent !== null &&
+        lastEvent.type === "error" &&
+        lastEvent.message === message;
+      if (!alreadyReported) broadcast({ type: "error", message });
+      return;
+    }
+    watcher = watch({
+      input: entry,
+      cwd,
+      platform: "node",
+      logLevel: "warn",
+      transform: { target: resolveNodeTarget() },
+      // Bundle only relative / absolute / already-resolved imports; leave
+      // every other (bare) specifier external.
+      external: (id, _importer, isResolved) => {
+        if (isResolved) return false;
+        if (id.startsWith(".")) return false;
+        if (isAbsolute(id)) return false;
+        return true;
+      },
+      output: { file: outFile, format: "esm" },
+    });
+    let firstBuild = true;
+    watcher.on("event", (event) => {
+      if (event.code === "BUNDLE_END") {
+        // rolldown requires the per-build result to be closed to avoid leaks.
+        event.result.close().catch(() => {});
+        const type: HmrEventType = firstBuild ? "ready" : "rebuild";
+        firstBuild = false;
+        broadcast({ type, outFile, hash: fingerprint(outFile) });
+      } else if (event.code === "ERROR") {
+        event.result.close().catch(() => {});
+        broadcast({
+          type: "error",
+          message: event.error instanceof Error ? event.error.message : String(event.error),
+        });
+      }
+    });
+  }
+
+  return {
+    subscribe(fn) {
+      // After dispose() nothing will ever be broadcast again; registering
+      // the callback would only leak it.
+      if (disposed) return () => {};
+      subscribers.add(fn);
+      // Replay under the same isolation as broadcast(): a throwing
+      // subscriber must not prevent the watcher from starting or the
+      // caller from receiving its unsubscribe handle.
+      if (lastEvent) deliver(fn, lastEvent);
+      startWatcher();
+      return () => {
+        subscribers.delete(fn);
+      };
+    },
+    async dispose() {
+      disposed = true;
+      subscribers.clear();
+      if (watcher) {
+        const w = watcher;
+        watcher = null;
+        await w.close().catch(() => {});
+      }
+    },
+  };
+}
diff --git a/packages/arkor/src/studio/server.test.ts b/packages/arkor/src/studio/server.test.ts
index 48c1651e..21cbf94d 100644
--- a/packages/arkor/src/studio/server.test.ts
+++ b/packages/arkor/src/studio/server.test.ts
@@ -11,6 +11,7 @@ import {
import { tmpdir } from "node:os";
import { join, resolve } from "node:path";
import { buildStudioApp } from "./server";
+import type { HmrCoordinator, HmrEvent } from "./hmr";
import { writeCredentials } from "../core/credentials";
import { writeState } from "../core/state";
import {
@@ -1337,4 +1338,129 @@ process.exit(0);
expect(body.error).toMatch(/credentials|login/i);
});
});
+
+ describe("/api/dev/events (HMR)", () => {
+ // Builds an in-memory stand-in for the HMR coordinator plus two test
+ // hooks: `emit` to push an event to every current subscriber and
+ // `subscriberCount` to observe how many subscribers are registered.
+ function fakeHmr() {
+ // Mirror the real HmrCoordinator surface but stay synchronous so the
+ // test doesn't depend on rolldown.watch starting up. `emit` is a test
+ // hook for pushing events into the SSE stream from the test body.
+ const subs = new Set<(e: HmrEvent) => void>();
+ const coordinator: HmrCoordinator = {
+ // Registers `fn` and returns its unsubscribe handle. Unlike the real
+ // coordinator, nothing is replayed on subscribe.
+ subscribe(fn) {
+ subs.add(fn);
+ return () => {
+ subs.delete(fn);
+ };
+ },
+ async dispose() {
+ subs.clear();
+ },
+ };
+ return {
+ coordinator,
+ emit(event: HmrEvent) {
+ for (const fn of subs) fn(event);
+ },
+ get subscriberCount() {
+ return subs.size;
+ },
+ };
+ }
+
+ it("is unregistered when no hmr coordinator is supplied", async () => {
+ const app = build();
+ const res = await app.request("/api/dev/events", {
+ headers: {
+ host: "127.0.0.1:4000",
+ "x-arkor-studio-token": STUDIO_TOKEN,
+ },
+ });
+ expect(res.status).toBe(404);
+ });
+
+ it("rejects /api/dev/events without a token", async () => {
+ const fake = fakeHmr();
+ const app = buildStudioApp({
+ baseUrl: "http://mock",
+ assetsDir,
+ autoAnonymous: false,
+ studioToken: STUDIO_TOKEN,
+ cwd: trainCwd,
+ hmr: fake.coordinator,
+ });
+ const res = await app.request("/api/dev/events", {
+ headers: { host: "127.0.0.1:4000" },
+ });
+ expect(res.status).toBe(403);
+ });
+
+ it("accepts the studio token via ?studioToken= for the dev event stream", async () => {
+ const fake = fakeHmr();
+ const app = buildStudioApp({
+ baseUrl: "http://mock",
+ assetsDir,
+ autoAnonymous: false,
+ studioToken: STUDIO_TOKEN,
+ cwd: trainCwd,
+ hmr: fake.coordinator,
+ });
+ const res = await app.request(
+ `/api/dev/events?studioToken=${encodeURIComponent(STUDIO_TOKEN)}`,
+ { headers: { host: "127.0.0.1:4000" } },
+ );
+ expect(res.status).toBe(200);
+ expect(res.headers.get("content-type")).toBe("text/event-stream");
+ // Cancelling the body's reader should release the subscriber.
+ const reader = res.body!.getReader();
+ await reader.cancel();
+ expect(fake.subscriberCount).toBe(0);
+ });
+
+ it("rejects /api/dev/events when host header is non-loopback", async () => {
+ const fake = fakeHmr();
+ const app = buildStudioApp({
+ baseUrl: "http://mock",
+ assetsDir,
+ autoAnonymous: false,
+ studioToken: STUDIO_TOKEN,
+ cwd: trainCwd,
+ hmr: fake.coordinator,
+ });
+ const res = await app.request(
+ `/api/dev/events?studioToken=${encodeURIComponent(STUDIO_TOKEN)}`,
+ { headers: { host: "evil.example.com" } },
+ );
+ expect(res.status).toBe(403);
+ });
+
+ it("forwards rebuild events as SSE frames", async () => {
+ const fake = fakeHmr();
+ const app = buildStudioApp({
+ baseUrl: "http://mock",
+ assetsDir,
+ autoAnonymous: false,
+ studioToken: STUDIO_TOKEN,
+ cwd: trainCwd,
+ hmr: fake.coordinator,
+ });
+ const res = await app.request(
+ `/api/dev/events?studioToken=${encodeURIComponent(STUDIO_TOKEN)}`,
+ { headers: { host: "127.0.0.1:4000" } },
+ );
+ const reader = res.body!.getReader();
+ const decoder = new TextDecoder();
+
+ fake.emit({ type: "ready", outFile: "/tmp/x", hash: "abc" });
+ // Read chunks until we have at least one full SSE frame.
+ let received = "";
+ while (!received.includes("\n\n")) {
+ const { value, done } = await reader.read();
+ if (done) break;
+ received += decoder.decode(value, { stream: true });
+ }
+ expect(received).toContain("event: ready");
+ expect(received).toContain('"outFile":"/tmp/x"');
+ await reader.cancel();
+ });
+ });
});
diff --git a/packages/arkor/src/studio/server.ts b/packages/arkor/src/studio/server.ts
index cd8d3209..d7f744ee 100644
--- a/packages/arkor/src/studio/server.ts
+++ b/packages/arkor/src/studio/server.ts
@@ -1,4 +1,4 @@
-import { spawn } from "node:child_process";
+import { spawn, type ChildProcess } from "node:child_process";
import { readFile, realpath } from "node:fs/promises";
import { timingSafeEqual } from "node:crypto";
import { Hono } from "hono";
@@ -16,6 +16,7 @@ import { SDK_VERSION } from "../core/version";
import { ensureProjectState } from "../core/projectState";
import { readState } from "../core/state";
import { readManifestSummary } from "./manifest";
+import type { HmrCoordinator, HmrEvent } from "./hmr";
const DEPRECATION_HEADERS = ["Deprecation", "Sunset", "Warning"] as const;
function copyDeprecationHeaders(from: Headers, to: Headers): void {
@@ -59,6 +60,15 @@ export interface StudioServerOptions {
* here points at the bin itself). Override in tests.
*/
binPath?: string;
+ /**
+ * Optional HMR coordinator. When provided, the server registers
+ * `/api/dev/events` as an SSE stream that pushes rebuild / error events to
+ * the SPA, and rebuilds also signal SIGTERM to active `/api/train`
+ * subprocesses so they early-stop at the next checkpoint and the SPA can
+ * restart them with the new bundle. Wired in by `arkor dev`; left
+ * undefined for any non-dev consumer of `buildStudioApp`.
+ */
+ hmr?: HmrCoordinator;
}
function tokensMatch(provided: string, expected: string): boolean {
@@ -111,7 +121,12 @@ export function buildStudioApp(options: StudioServerOptions) {
const app = new Hono();
const loopbackHostPattern = /^(127\.0\.0\.1|localhost)(:\d+)?$/;
- const jobEventsPathPattern = /^\/api\/jobs\/[^/]+\/events$/;
+ // Routes where `?studioToken=` is accepted instead of the
+ // `X-Arkor-Studio-Token` header. Used only for `EventSource` streams,
+ // which cannot send custom headers. Adding to this list is CSRF-sensitive:
+ // it must always be a GET stream-only route, never a mutation endpoint.
+ const eventStreamPathPattern =
+ /^\/api\/jobs\/[^/]+\/events$|^\/api\/dev\/events$/;
// Host-header guard for every route, including static HTML that carries the
// per-launch Studio token. This is the DNS-rebinding boundary: a victim
@@ -138,7 +153,7 @@ export function buildStudioApp(options: StudioServerOptions) {
// require the header so a leaked token in a URL is not enough to POST.
app.use("/api/*", async (c, next) => {
const queryTokenAllowed =
- c.req.method === "GET" && jobEventsPathPattern.test(c.req.path);
+ c.req.method === "GET" && eventStreamPathPattern.test(c.req.path);
const provided =
c.req.header("x-arkor-studio-token") ??
(queryTokenAllowed ? c.req.query("studioToken") : undefined) ??
@@ -279,6 +294,32 @@ export function buildStudioApp(options: StudioServerOptions) {
return new Response(upstream.body, { status: upstream.status, headers });
});
+ // Active `/api/train` subprocesses. HMR rebuilds iterate this map and
+ // SIGTERM each entry so its in-process signal handler (see
+ // `runTrainer`) can call `trainer.requestEarlyStop()`. Keyed by pid so
+ // tests can introspect.
+ interface ActiveTrain {
+   child: ChildProcess;
+   trainFile?: string;
+   /**
+    * True once SIGTERM has been sent for an early stop. The runner treats
+    * a second shutdown signal as "exit now, don't wait for the
+    * checkpoint", so each child must be signalled at most once.
+    */
+   earlyStopSignaled?: boolean;
+ }
+ const activeTrains = new Map<number, ActiveTrain>();
+
+ /**
+  * Ask every active training subprocess to early-stop, signalling each
+  * child at most once over its lifetime.
+  *
+  * Safe to call repeatedly — once per SSE subscriber, and again on later
+  * rebuilds — without escalating a graceful stop into a hard exit.
+  *
+  * @returns Every currently tracked run (including ones signalled by an
+  *   earlier call) so each caller can report the full restart set.
+  */
+ function requestEarlyStopOnActive(): Array<{
+   pid: number;
+   trainFile?: string;
+ }> {
+   const targets: Array<{ pid: number; trainFile?: string }> = [];
+   for (const [pid, entry] of activeTrains) {
+     if (!entry.earlyStopSignaled) {
+       entry.earlyStopSignaled = true;
+       try {
+         entry.child.kill("SIGTERM");
+       } catch {
+         // child may have already exited between the iterator and the kill
+       }
+     }
+     targets.push({ pid, trainFile: entry.trainFile });
+   }
+   return targets;
+ }
+
app.post("/api/train", async (c) => {
const body = (await c.req.json().catch(() => ({}))) as { file?: string };
let trainFile: string | undefined;
@@ -312,17 +353,22 @@ export function buildStudioApp(options: StudioServerOptions) {
stdio: "pipe",
cwd: trainCwd,
});
+ if (typeof child.pid === "number") {
+ activeTrains.set(child.pid, { child, trainFile });
+ }
const stream = new ReadableStream({
start(controller) {
const enc = new TextEncoder();
child.stdout.on("data", (d) => controller.enqueue(enc.encode(d)));
child.stderr.on("data", (d) => controller.enqueue(enc.encode(d)));
child.on("close", (code) => {
+ if (typeof child.pid === "number") activeTrains.delete(child.pid);
controller.enqueue(enc.encode(`\n---\nexit=${code}\n`));
controller.close();
});
},
cancel() {
+ if (typeof child.pid === "number") activeTrains.delete(child.pid);
child.kill();
},
});
@@ -332,6 +378,65 @@ export function buildStudioApp(options: StudioServerOptions) {
});
});
+ // `/api/dev/events` — SSE stream of HMR rebuild / error notifications.
+ // Only active when `arkor dev` passed an HMR coordinator. The CSRF model
+ // accepts `?studioToken=` here (whitelisted in `eventStreamPathPattern`)
+ // because `EventSource` cannot send headers. When HMR is not configured
+ // the route still has an explicit 404 so the request doesn't fall through
+ // to the SPA index.html (which would mislead the SPA into thinking the
+ // EventSource connected successfully).
+ if (!options.hmr) {
+   app.get("/api/dev/events", (c) =>
+     c.json({ error: "HMR not enabled" }, 404),
+   );
+ } else {
+   const hmr = options.hmr;
+   app.get("/api/dev/events", (c) => {
+     let unsubscribe: (() => void) | null = null;
+     const stream = new ReadableStream<Uint8Array>({
+       start(controller) {
+         const enc = new TextEncoder();
+         // Serialise one event as an SSE frame named after `event.type`.
+         const send = (
+           event: HmrEvent & {
+             restart?: boolean;
+             restartTargets?: Array<{ pid: number; trainFile?: string }>;
+           },
+         ) => {
+           const payload = JSON.stringify(event);
+           try {
+             controller.enqueue(
+               enc.encode(`event: ${event.type}\ndata: ${payload}\n\n`),
+             );
+           } catch {
+             // controller closed mid-write; the unsubscribe path below
+             // takes care of the rest.
+           }
+         };
+         // The coordinator replays its cached event synchronously inside
+         // subscribe(). A replayed `rebuild` describes a build that already
+         // happened — possibly before the currently running trainings were
+         // spawned — so it is forwarded as-is and must not signal them.
+         // Only events that arrive after subscribe() returns are live.
+         let live = false;
+         unsubscribe = hmr.subscribe((event) => {
+           if (live && event.type === "rebuild" && activeTrains.size > 0) {
+             const restartTargets = requestEarlyStopOnActive();
+             send({ ...event, restart: true, restartTargets });
+           } else {
+             send(event);
+           }
+         });
+         live = true;
+       },
+       cancel() {
+         // Client went away: release the coordinator subscription.
+         unsubscribe?.();
+         unsubscribe = null;
+       },
+     });
+     return new Response(stream, {
+       status: 200,
+       headers: {
+         "content-type": "text/event-stream",
+         "cache-control": "no-cache, no-transform",
+       },
+     });
+   });
+ }
+
// Playground hits this so mid-training inference from Studio has the same
// auth path as the rest of /api/*. State is auto-bootstrapped (anon only)
// so the Playground's base-model mode works on a fresh launch with no
diff --git a/packages/studio-app/src/components/RunTraining.tsx b/packages/studio-app/src/components/RunTraining.tsx
index efa8b6bd..b5f46ab5 100644
--- a/packages/studio-app/src/components/RunTraining.tsx
+++ b/packages/studio-app/src/components/RunTraining.tsx
@@ -1,7 +1,9 @@
import { useEffect, useRef, useState } from "react";
import {
fetchManifest,
+ openDevEvents,
streamTraining,
+ type DevEvent,
type ManifestResult,
} from "../lib/api";
@@ -9,7 +11,13 @@ export function RunTraining() {
const [running, setRunning] = useState(false);
const [log, setLog] = useState("");
const [manifest, setManifest] = useState(null);
+  const [hmrStatus, setHmrStatus] = useState<
+    "idle" | "rebuilding" | "early-stopping" | "restarting"
+  >("idle");
   const boxRef = useRef(null);
+  const lastTrainFileRef = useRef<string | undefined>(undefined);
+  const restartPendingRef = useRef(false); // armed by HMR, consumed by run()
+  const runningRef = useRef(false); // mirrors `running` for event handlers
useEffect(() => {
let cancelled = false;
@@ -29,7 +37,54 @@ export function RunTraining() {
};
}, []);
- async function run() {
+  // HMR: listen for rebuild notifications from `arkor dev` and refresh the
+  // manifest. When a rebuild also early-stopped a training run that this
+  // component is streaming, arm a restart; `run()` performs it once the
+  // current `streamTraining` resolves so two cloud jobs never overlap.
+  useEffect(() => {
+    const es = openDevEvents();
+    const onMessage = (raw: MessageEvent) => {
+      let payload: DevEvent;
+      try {
+        payload = JSON.parse(raw.data) as DevEvent;
+      } catch {
+        return;
+      }
+      if (payload.type === "error") {
+        setManifest({ error: payload.message ?? "Build failed" });
+        setHmrStatus("idle");
+        return;
+      }
+      // Always refresh the manifest on ready/rebuild.
+      void fetchManifest()
+        .then(setManifest)
+        .catch((err: unknown) => {
+          setManifest({
+            error: err instanceof Error ? err.message : String(err),
+          });
+        });
+      if (payload.restart && runningRef.current) {
+        // Our own run is early-stopping; once its stream resolves, the
+        // `finally` block of `run()` consumes the flag and re-spawns. A
+        // `restart` that arrives while nothing runs here is ignored so the
+        // flag cannot leak into a later, unrelated run.
+        restartPendingRef.current = true;
+        setHmrStatus("early-stopping");
+      } else {
+        setHmrStatus("idle");
+      }
+    };
+    es.addEventListener("ready", onMessage);
+    es.addEventListener("rebuild", onMessage);
+    es.addEventListener("error", onMessage);
+    return () => {
+      es.close();
+    };
+  }, []);
+
+  async function run(file?: string): Promise<void> {
+    runningRef.current = true; // read synchronously by the HMR listener
+    lastTrainFileRef.current = file; // remembered for an HMR-triggered restart
     setRunning(true);
     setLog("");
     try {
@@ -41,11 +96,23 @@ export function RunTraining() {
           });
           return next;
         });
-      });
+      }, file);
     } catch (err) {
       setLog((prev) => prev + `\n[error] ${err instanceof Error ? err.message : String(err)}\n`);
     } finally {
+      runningRef.current = false;
       setRunning(false);
+      if (restartPendingRef.current) {
+        restartPendingRef.current = false;
+        setHmrStatus("restarting");
+        // Re-spawn with the same args in a microtask, after `finally` unwinds.
+        // NOTE(review): confirm React has committed `running=false` by then.
+        queueMicrotask(() => {
+          void run(lastTrainFileRef.current);
+        });
+      } else {
+        setHmrStatus("idle");
+      }
     }
@@ -75,9 +142,15 @@ export function RunTraining() {
createArkor.
)}
-