feat(chat-sessions): persistent chat sessions, client-driven#960
feat(chat-sessions): persistent chat sessions, client-driven#960dylan-savage wants to merge 10 commits into
Conversation
Implements Feature 1 of the multimodal-chat-sessions branch
(claude/research/multimodal-chat-sessions/TDD - chat sessions.md).
Persistence is fully owned by the TS/Python client via existing fs*
primitives; the engine/chat node is unchanged except for a one-field
schema addition and a built-in agent tool. Layout per TDD §5:
.chats/
├── catalog.json
└── <chat-guid>/
└── chat.jsonl
- SDK: new Question.chat_id (TS + Python) with UUID validation.
- Engine: AgentContext.chat_id threaded from the inbound Question;
AgentBase._builtin_tools / _recall_history give agents a built-in,
path-less recall_history(before_seq?, limit?) that reads
.chats/<chat_id>/chat.jsonl via the per-account FileStore.
- TS Chat class: create/open/send/rename/delete with optimistic-version
retry on catalog.json; client.chats.list namespace.
- Python Chat mirror with identical on-disk shape.
- Chat node: persistSessions toggle; IEndpoint appends
&pipelineId=&persist=1 to the embed URL when enabled.
- chat-ui: ChatList sidebar, auto-resume the most recent chat per
pipelineId, rename/delete affordances, Chat-aware sendMessage.
- Tests: 18 TS unit tests, 14 Python unit tests, 7 engine contract
tests covering atomic-per-turn writes, optimistic-lock retry,
chat-id routing, schema-version forward-compat, partial-write
tolerance.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…cription jsoncpp in the engine binary rejects the U+2014 em-dash that was in the persistSessions field description, causing 'Throwing JSON exception assert json failed' at engine startup and breaking ./builder build at server:setup-pip. Plain ASCII colon preserves the meaning and parses cleanly through both jsoncpp and the existing services.json test contract logic. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The previous shape (Pipe.source.parameters.properties as an array of
inline field objects) was wrong — that schema crashed the engine's
embedded jsoncpp at startup (Throwing JSON exception ... SIGABRT),
breaking ./builder server:run and any eaas.py launch. Empty {} in the
properties array reproduced the crash, confirming the issue is the
shape itself rather than any single key or non-ASCII char.
Correct shape (mirrors llm_openai/services.json and others):
- Field definitions live as dict entries at the top level of "fields".
- Section containers (Pipe.source.parameters) carry a "properties"
array of field-name strings referencing those top-level entries.
persistSessions is now exposed correctly and renders in dropper-ui's
NodeConfigPanel. IEndpoint.py's parameters.get('persistSessions', False)
read works unchanged.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Batch-commit of fixes caught during end-to-end manual testing and the TypeScript/Python SDK smoke harness. Feature 1 (persistent chat sessions) is now fully validated against a live server. - services.chat.json: drop `hideForm` from the Chat shape's properties so NodeConfigPanel actually renders the new `persistSessions` toggle. The toggle existed in `fields` already but was being suppressed by the hideForm gate in NodeConfigPanel. - IEndpoint.py: resolve `pipeline_id` from `jobConfig.config.pipeline.project_id` (stable across runs) rather than `serviceConfig.name` / `jobConfig.taskId` (taskId rolls every run and cannot anchor a resume). When persist is on, embed the private `tk_*` task token in the chat URL instead of the public `pk_*` auth key — only `tk_*` carries the `task.store` permission needed for fs_mkdir/fs_write on the user's filestore. Documented as a v1 trade-off; a narrower `task.store.chat` scope is the long-term fix. - ChatContainer.tsx: ref-gate the auto-resume effect so "New chat" doesn't immediately re-open the most recent chat. The effect previously had `currentChat` in its deps and would re-fire when set to null. - ChatList.tsx: replace `window.confirm()` (silently dropped inside the VS Code webview sandbox — iframe omits `allow-modals`) with a two-click inline arm pattern; trash icon background turns red when armed and auto-disarms after 3s. Also set explicit `color` on rename/delete buttons so they're visible in dark themes. - Chat.ts + chat.py (mutateCatalog / _mutate_catalog): the FileStore enforces a per-path write-lock that throws "File already open for writing" before our optimistic-version check ever sees the conflict. Wrap the catalog write in the existing retry loop so backend-level lock contention triggers the same backoff as version drift. Caught by the Python SDK smoke test's concurrent-rename section; the TS unit suite (in-memory mock) and two-tab manual test happened to miss it. - Chat.ts (delete): surface the fsRmdir failure via console.warn so the empty `<guid>/` directory cruft is visible during dev. Cosmetic. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The recall_history tool surface and its supporting plumbing are pulled before the chat-sessions PR merges. Model-facing history mechanisms move to the standalone context-engineering workstream. - agent.py: drop _RECALL_HISTORY_* constants, _builtin_tools(), call_tool dispatch branch, _recall_history_metric(), _recall_history(), _slim_turn(), _read_chat_jsonl_bytes(), and the chat_id wiring through run_agent. - _internal/host.py: drop builtin_tools param from Tools.__init__ and AgentHostServices.__init__, drop AgentContext.chat_id field. - tests: delete test_chat_id_routing.py (the entire file covered recall_history; nothing else lived there). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…s.history The clients no longer derive Question.history from chat.jsonl. The EAGER_HISTORY_TURNS sliding window and the persist-strip dance both go away. Chat.send() instead accepts a caller-supplied history list so the UI (or any other caller) decides what to prime the model with. chat-ui now computes the last-6 non-system/status UI messages once and uses it identically on both the persist-on and persist-off paths, giving parity between the two. Previously, persist-on sent no history at all (after the eager-history removal), while persist-off has always flattened the in-memory UI messages — the asymmetry was confusing. - packages/client-typescript/src/client/Chat.ts: drop EAGER_HISTORY_TURNS, slice-and-push loop, persist-strip block. Add opts.history: QuestionHistory[] to send(). - schema/index.ts: drop EAGER_HISTORY_TURNS export. - Chat.test.ts: assert empty history by default, add pass-through test. - packages/client-python/src/rocketride/chat.py: mirror of TS. - __init__.py: drop EAGER_HISTORY_TURNS export. - test_chat.py: mirror of TS tests. - apps/chat-ui/src/hooks/useChatMessages.ts: lift eager-history computation out of the if/else; pass via chat.send opts on persist-on, addHistory locally on persist-off. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The ChatList sidebar referenced --rr-* CSS variables that don't exist in the project's theme system, so it always fell back to the hardcoded light hex values regardless of which theme (Ocean, Forest, Sunset, Midnight, Corporate, VSCode) the rest of the chat UI was on. Replace the references with the actual project theme variables (--bg-secondary, --border-color, --accent-primary, --text-secondary, --bg-tertiary, --error-color, --text-primary, --input-bg, --input-border) so the sidebar repaints when the user switches themes. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Strips inline pointers to the internal design doc from code/test comments, module docstrings, and a couple of jest describe/test labels. Pure documentation cleanup before opening the PR — no runtime behavior changes. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
|
No description provided. |
|
No actionable comments were generated in the recent review. 🎉 ℹ️ Recent review info⚙️ Run configurationConfiguration used: Path: .coderabbit.yaml Review profile: ASSERTIVE Plan: Pro Run ID: 📒 Files selected for processing (8)
💤 Files with no reviewable changes (4)
📝 WalkthroughWalkthroughAdds persistent, pipeline-scoped chat sessions: on-disk JSONL transcripts + catalog, SDK bindings (TS + Python), Question.chat_id support, webhook persist parameter, boot-time UI wiring, UI sidebar and lifecycle, and tests. ChangesPersistent Chat Session Implementation
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Suggested reviewers
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 10
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
packages/client-typescript/src/client/schema/Question.ts (1)
195-218:⚠️ Potential issue | 🟡 Minor | ⚡ Quick winValidate
chat_idin the constructor too.
fromDict()enforces UUID format, but direct construction (new Question({ chat_id: ... })) does not. That allows invalid in-memory state and inconsistent behavior across creation paths.🔧 Suggested patch
constructor( options: { @@ ) { @@ this.expectJson = options.expectJson || false; this.role = options.role || ''; + if (options.chat_id !== undefined && options.chat_id !== null) { + if (typeof options.chat_id !== 'string' || !UUID_RE.test(options.chat_id)) { + throw new Error(`Question.chat_id must be null or a UUID string; got ${String(options.chat_id)}`); + } + } this.chat_id = options.chat_id ?? null; }🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@packages/client-typescript/src/client/schema/Question.ts` around lines 195 - 218, The Question constructor currently assigns chat_id directly without format checks, causing divergence from fromDict(); update the constructor in class Question to validate options.chat_id (when not null/undefined) using the same UUID validation used in fromDict() (reuse the same helper or regex) and only set this.chat_id if it passes validation; if validation fails, throw a clear Error (or TypeError) to match fromDict() behavior so in-memory construction and deserialization enforce the same UUID constraint.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@apps/chat-ui/src/App.tsx`:
- Around line 117-120: When no URL params are provided we currently
unconditionally fall back to sessionStorage causing stale
chatPipelineId/chatPersist to leak; update the logic around isVSCode,
urlPipelineId and urlPersist so that if the URL param is absent you explicitly
clear sessionStorage (removeItem 'chatPipelineId' and 'chatPersist') or set
urlPipelineId = '' and urlPersist = false instead of blindly reusing old values;
apply the same change to the other similar blocks where urlPipelineId/urlPersist
are populated so fresh auth sessions never inherit stale values.
In `@apps/chat-ui/src/components/ChatContainer.tsx`:
- Around line 171-191: handleSendMessage can race when persistEnabled is true
and currentChat is null, causing multiple Chat.create calls; fix by serializing
creation: add a ref (e.g., creatingChatPromiseRef) that stores the in-flight
Promise returned by Chat.create, have handleSendMessage check currentChat and if
null then check creatingChatPromiseRef.current — if present await it, otherwise
set creatingChatPromiseRef.current = Chat.create({...}) and await it, then
setCurrentChat(if successful) and clear creatingChatPromiseRef.current; update
handleSendMessage to always use the resolved chat instance (or null on failure)
before calling sendMessage and bumping setListRefreshKey.
In `@apps/chat-ui/src/components/ChatList.tsx`:
- Around line 154-164: The chat row is a non-focusable div (key={entry.guid})
with mouse-only onClick which prevents keyboard users from focusing/activating
it; change the element to a semantic, focusable control (e.g., a <button> or an
element with tabIndex={0} plus keyboard handlers) so keyboard users can select
chats: replace the clickable div with a button or add role="button"
tabIndex={isEditing ? -1 : 0} and implement onKeyDown that triggers
onSelectChat(entry.guid) for Enter/Space, preserve the existing props (cursor,
background when isActive, disabled behavior when isEditing) and add an
accessible state attribute such as aria-pressed or aria-selected to reflect
isActive.
In `@apps/chat-ui/src/hooks/useChatMessages.ts`:
- Around line 217-231: The hydrated message id calculation (in
useChatMessages.ts where flattened.push is used) currently uses id: turn.seq * 2
+ 1 + i which can collide with the next turn’s user id when a turn emits
multiple responses; change the id scheme to guarantee uniqueness across turns by
either using a global incrementing counter (e.g., messageIdCounter that you
increment for each flattened.push) or by using a per-turn multiplier large
enough (e.g., id: turn.seq * MAX_RESPONSES + offset + i) so each response and
user message gets a distinct id; update all flattened.push calls to use the new
id source (or counter) instead of the current turn.seq * 2 formula.
In `@packages/client-python/src/rocketride/chat.py`:
- Around line 284-312: Chat.send is racy: computing seq from self.history and
then writing chat.jsonl via _append_turn_line/_update_catalog_for_turn can
interleave across concurrent sends, causing duplicate seq and lost turns. Fix by
adding an instance-level asyncio.Lock (e.g. self._send_lock) initialized in
Chat.__init__ (or lazily if needed) and wrap the critical section that
calculates seq, creates ChatTurn, calls _append_turn_line, mutates self.history,
and _update_catalog_for_turn inside "async with self._send_lock:" so only one
send executes those steps at a time; keep the network call to
self._client.chat() outside the lock if possible to avoid blocking other tasks.
Ensure the lock is present on all Chat instances so concurrent calls serialize
correctly.
- Around line 448-495: The optimistic-locking race occurs because the earlier
version check in _read_or_init_catalog and the later fs_write_json are not
atomic; to fix it, re-validate the catalog version immediately before attempting
the write inside the same retry loop: right before calling
client.fs_write_json(CATALOG_PATH, payload) read
client.fs_read_json(CATALOG_PATH) (or reuse a fresh read) and ensure its version
still equals the observed value (or raise CatalogContentionError to trigger a
retry), and only then perform fs_write_json; reference symbols:
_read_or_init_catalog, client.fs_read_json, client.fs_write_json, CATALOG_PATH,
CatalogContentionError, CATALOG_MAX_RETRY.
- Around line 167-172: In parse_chat_file(), after parsing a JSON line into rec,
guard against non-object JSON values by checking that rec is a dict before
accessing rec.get(...); if not isinstance(rec, dict): continue (or otherwise
skip) to avoid AttributeError when encountering JSON scalars; apply this check
before assigning kind = rec.get('type') and before handling kinds like 'header'.
In `@packages/client-typescript/src/client/Chat.ts`:
- Around line 233-267: The send() method currently computes seq from the mutable
history and performs read-modify-write appends (_appendTurnLine, history.push,
_updateCatalogEntryForTurn) without synchronization, which causes lost
turns/duplicate seq when send() is called concurrently; fix this by serializing
send() per Chat instance (e.g., add a per-instance mutex/queue) and wrap the seq
calculation and the critical section that calls _appendTurnLine, history.push,
and _updateCatalogEntryForTurn inside the lock so only one send() mutates
history and writes turn lines at a time; ensure the lock is acquired before
computing seq and released after updating history and catalog.
- Around line 461-495: mutateCatalog has a TOCTOU window: two writers can both
pass the pre-write version check and then overwrite each other; fix by
validating the write completed as intended — after await
client.fsWriteJson(CATALOG_PATH, catalog) immediately re-read the file
(client.fsReadJson<ChatCatalog>(CATALOG_PATH)) and verify the persisted version
equals catalog.version (observedVersion + 1); if it does not, throw
CatalogContentionError(attempt) (or treat as a retryable conflict and continue
the loop) so the function retries the read-modify-write cycle; reference
mutateCatalog, readOrInitCatalog, client.fsReadJson, client.fsWriteJson,
CatalogContentionError, and CATALOG_MAX_RETRY.
In `@packages/client-typescript/tests/Chat.test.ts`:
- Around line 292-299: Update the two tests to assert the specific
UUID-validation error instead of any thrown error: for the async test calling
Chat.open({ client: fs, token: TOKEN, chatId: 'not-a-uuid' }) use await
expect(...).rejects.toThrow(/uuid/i) or rejects.toThrowError(/invalid.*uuid/i)
to match the UUID validation message, and for the synchronous Question.fromDict
call use expect(() => Question.fromDict({ chat_id: 'nope' } as
any)).toThrow(/uuid/i) (leave the valid UUID case unchanged); reference the
Chat.open and Question.fromDict symbols when making these changes.
---
Outside diff comments:
In `@packages/client-typescript/src/client/schema/Question.ts`:
- Around line 195-218: The Question constructor currently assigns chat_id
directly without format checks, causing divergence from fromDict(); update the
constructor in class Question to validate options.chat_id (when not
null/undefined) using the same UUID validation used in fromDict() (reuse the
same helper or regex) and only set this.chat_id if it passes validation; if
validation fails, throw a clear Error (or TypeError) to match fromDict()
behavior so in-memory construction and deserialization enforce the same UUID
constraint.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: ASSERTIVE
Plan: Pro
Run ID: 2d3e38ec-ea4e-4e5a-9683-3d75c4a52761
📒 Files selected for processing (16)
apps/chat-ui/src/App.tsxapps/chat-ui/src/components/ChatContainer.tsxapps/chat-ui/src/components/ChatList.tsxapps/chat-ui/src/hooks/useChatMessages.tsnodes/src/nodes/webhook/IEndpoint.pynodes/src/nodes/webhook/services.chat.jsonpackages/ai/tests/ai/common/agent/__init__.pypackages/client-python/src/rocketride/__init__.pypackages/client-python/src/rocketride/chat.pypackages/client-python/src/rocketride/schema/question.pypackages/client-python/tests/test_chat.pypackages/client-typescript/src/client/Chat.tspackages/client-typescript/src/client/client.tspackages/client-typescript/src/client/schema/Question.tspackages/client-typescript/src/client/schema/index.tspackages/client-typescript/tests/Chat.test.ts
| rec = json.loads(line) | ||
| except json.JSONDecodeError: | ||
| # Partial write on the trailing line — tolerate. | ||
| continue | ||
| kind = rec.get('type') | ||
| if kind == 'header': |
There was a problem hiding this comment.
Guard parse_chat_file() against non-object JSON records.
A valid JSON scalar line (e.g., 123 or "x") makes rec.get(...) crash with AttributeError. Skip non-dict records before key access.
✅ Minimal robustness patch
try:
rec = json.loads(line)
except json.JSONDecodeError:
# Partial write on the trailing line — tolerate.
continue
+ if not isinstance(rec, dict):
+ continue
kind = rec.get('type')📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| rec = json.loads(line) | |
| except json.JSONDecodeError: | |
| # Partial write on the trailing line — tolerate. | |
| continue | |
| kind = rec.get('type') | |
| if kind == 'header': | |
| rec = json.loads(line) | |
| except json.JSONDecodeError: | |
| # Partial write on the trailing line — tolerate. | |
| continue | |
| if not isinstance(rec, dict): | |
| continue | |
| kind = rec.get('type') | |
| if kind == 'header': |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@packages/client-python/src/rocketride/chat.py` around lines 167 - 172, In
parse_chat_file(), after parsing a JSON line into rec, guard against non-object
JSON values by checking that rec is a dict before accessing rec.get(...); if not
isinstance(rec, dict): continue (or otherwise skip) to avoid AttributeError when
encountering JSON scalars; apply this check before assigning kind =
rec.get('type') and before handling kinds like 'header'.
| async def send( | ||
| self, | ||
| text: str, | ||
| *, | ||
| attachments: Optional[Sequence[Any]] = None, # parked for feature 2 | ||
| on_sse: Optional[Callable[[str, dict], Awaitable[None]]] = None, | ||
| history: Optional[Sequence[QuestionHistory]] = None, | ||
| ) -> Any: | ||
| question = Question(type=QuestionType.PROMPT, chat_id=self.id) | ||
| question.addQuestion(text) | ||
| if history: | ||
| for item in history: | ||
| question.addHistory(item) | ||
|
|
||
| result = await self._client.chat(token=self._token, question=question, on_sse=on_sse) | ||
|
|
||
| seq = (self.history[-1].seq if self.history else 0) + 1 | ||
| turn = ChatTurn( | ||
| type='turn', | ||
| schema_version=CHAT_SCHEMA_VERSION, | ||
| seq=seq, | ||
| created=_now(), | ||
| question=question.model_dump(), | ||
| answer=result, | ||
| ) | ||
| await self._append_turn_line(turn) | ||
| self.history.append(turn) | ||
| await self._update_catalog_for_turn(turn) | ||
| return result |
There was a problem hiding this comment.
Serialize Chat.send() to avoid turn loss under concurrent calls.
Line 300 computes seq from in-memory state, and Line 350 rewrites the full file. Concurrent sends can produce duplicate seq values and last-writer-wins loss in chat.jsonl.
🔧 Suggested fix (instance-level async lock)
class Chat:
@@
self.created = created
self.history: List[ChatTurn] = list(history)
+ self._send_lock = asyncio.Lock()
@@
async def send(
@@
) -> Any:
- question = Question(type=QuestionType.PROMPT, chat_id=self.id)
- question.addQuestion(text)
- if history:
- for item in history:
- question.addHistory(item)
-
- result = await self._client.chat(token=self._token, question=question, on_sse=on_sse)
-
- seq = (self.history[-1].seq if self.history else 0) + 1
- turn = ChatTurn(
- type='turn',
- schema_version=CHAT_SCHEMA_VERSION,
- seq=seq,
- created=_now(),
- question=question.model_dump(),
- answer=result,
- )
- await self._append_turn_line(turn)
- self.history.append(turn)
- await self._update_catalog_for_turn(turn)
- return result
+ async with self._send_lock:
+ question = Question(type=QuestionType.PROMPT, chat_id=self.id)
+ question.addQuestion(text)
+ if history:
+ for item in history:
+ question.addHistory(item)
+
+ result = await self._client.chat(token=self._token, question=question, on_sse=on_sse)
+ seq = (self.history[-1].seq if self.history else 0) + 1
+ turn = ChatTurn(
+ type='turn',
+ schema_version=CHAT_SCHEMA_VERSION,
+ seq=seq,
+ created=_now(),
+ question=question.model_dump(),
+ answer=result,
+ )
+ await self._append_turn_line(turn)
+ self.history.append(turn)
+ await self._update_catalog_for_turn(turn)
+ return resultAlso applies to: 338-351
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@packages/client-python/src/rocketride/chat.py` around lines 284 - 312,
Chat.send is racy: computing seq from self.history and then writing chat.jsonl
via _append_turn_line/_update_catalog_for_turn can interleave across concurrent
sends, causing duplicate seq and lost turns. Fix by adding an instance-level
asyncio.Lock (e.g. self._send_lock) initialized in Chat.__init__ (or lazily if
needed) and wrap the critical section that calculates seq, creates ChatTurn,
calls _append_turn_line, mutates self.history, and _update_catalog_for_turn
inside "async with self._send_lock:" so only one send executes those steps at a
time; keep the network call to self._client.chat() outside the lock if possible
to avoid blocking other tasks. Ensure the lock is present on all Chat instances
so concurrent calls serialize correctly.
| for attempt in range(1, CATALOG_MAX_RETRY + 1): | ||
| catalog, observed = await _read_or_init_catalog(client) | ||
| mutator(catalog) | ||
| catalog.version = observed + 1 | ||
| try: | ||
| fresh = await client.fs_read_json(CATALOG_PATH) | ||
| fresh_version = int((fresh or {}).get('version', 0) or 0) if isinstance(fresh, dict) else 0 | ||
| if fresh_version != observed: | ||
| if attempt == CATALOG_MAX_RETRY: | ||
| raise CatalogContentionError( | ||
| f'catalog.json optimistic-lock retry exhausted after {attempt} attempts' | ||
| ) | ||
| await asyncio.sleep(0.02 * attempt) | ||
| continue | ||
| except CatalogContentionError: | ||
| raise | ||
| except Exception: | ||
| # Catalog absent on second read — fine; the write will create it. | ||
| pass | ||
| payload = { | ||
| 'schema_version': catalog.schema_version, | ||
| 'version': catalog.version, | ||
| 'chats': [ | ||
| { | ||
| 'guid': c.guid, | ||
| 'title': c.title, | ||
| 'created': c.created, | ||
| 'updated': c.updated, | ||
| 'message_count': c.message_count, | ||
| 'preview': c.preview, | ||
| 'pipeline_id': c.pipeline_id, | ||
| } | ||
| for c in catalog.chats | ||
| ], | ||
| } | ||
| try: | ||
| await client.fs_write_json(CATALOG_PATH, payload) | ||
| return | ||
| except Exception as e: | ||
| # The filestore enforces a per-path write-lock — concurrent writers | ||
| # hit "File already open for writing" before our version check sees | ||
| # the conflict. Retry the read-modify-write cycle like a normal | ||
| # version-contention case. | ||
| if 'already open for writing' in str(e) and attempt < CATALOG_MAX_RETRY: | ||
| await asyncio.sleep(0.02 * attempt) | ||
| continue | ||
| raise | ||
| raise CatalogContentionError(f'catalog.json optimistic-lock retry exhausted after {CATALOG_MAX_RETRY} attempts') |
There was a problem hiding this comment.
Catalog optimistic locking can still lose updates.
The version check (Lines 453-456) and write (Line 484) are separate operations. Competing writers can both pass validation on the same version and overwrite each other with no detected conflict.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@packages/client-python/src/rocketride/chat.py` around lines 448 - 495, The
optimistic-locking race occurs because the earlier version check in
_read_or_init_catalog and the later fs_write_json are not atomic; to fix it,
re-validate the catalog version immediately before attempting the write inside
the same retry loop: right before calling client.fs_write_json(CATALOG_PATH,
payload) read client.fs_read_json(CATALOG_PATH) (or reuse a fresh read) and
ensure its version still equals the observed value (or raise
CatalogContentionError to trigger a retry), and only then perform fs_write_json;
reference symbols: _read_or_init_catalog, client.fs_read_json,
client.fs_write_json, CATALOG_PATH, CatalogContentionError, CATALOG_MAX_RETRY.
| async send( | ||
| text: string, | ||
| opts?: { | ||
| attachments?: unknown[]; | ||
| onSSE?: (type: string, data: Record<string, unknown>) => Promise<void>; | ||
| history?: QuestionHistory[]; | ||
| } | ||
| ): Promise<PIPELINE_RESULT> { | ||
| const question = new Question({ type: QuestionType.PROMPT, chat_id: this.id }); | ||
| question.addQuestion(text); | ||
| if (opts?.history) { | ||
| for (const item of opts.history) { | ||
| question.addHistory(item); | ||
| } | ||
| } | ||
|
|
||
| const result = await this._client.chat({ | ||
| token: this._token, | ||
| question, | ||
| onSSE: opts?.onSSE, | ||
| }); | ||
|
|
||
| const seq = (this.history[this.history.length - 1]?.seq ?? 0) + 1; | ||
| const turn: ChatTurn = { | ||
| type: 'turn', | ||
| schema_version: CHAT_SCHEMA_VERSION, | ||
| seq, | ||
| created: new Date().toISOString(), | ||
| question: question.toDict(), | ||
| answer: result as unknown, | ||
| }; | ||
| await this._appendTurnLine(turn); | ||
| this.history.push(turn); | ||
| await this._updateCatalogEntryForTurn(turn); | ||
|
|
There was a problem hiding this comment.
Serialize send() to prevent lost turns and duplicate sequence numbers.
Line 255 derives seq from mutable in-memory history, and Lines 304-309 do read-then-rewrite appends. Parallel send() calls can compute the same seq and overwrite each other’s turn lines.
🔧 Suggested fix (serialize sends per Chat instance)
export class Chat {
+ private _sendQueue: Promise<unknown> = Promise.resolve();
+
async send(
text: string,
opts?: {
@@
- ): Promise<PIPELINE_RESULT> {
- const question = new Question({ type: QuestionType.PROMPT, chat_id: this.id });
- question.addQuestion(text);
- if (opts?.history) {
- for (const item of opts.history) {
- question.addHistory(item);
- }
- }
-
- const result = await this._client.chat({
- token: this._token,
- question,
- onSSE: opts?.onSSE,
- });
-
- const seq = (this.history[this.history.length - 1]?.seq ?? 0) + 1;
- const turn: ChatTurn = {
- type: 'turn',
- schema_version: CHAT_SCHEMA_VERSION,
- seq,
- created: new Date().toISOString(),
- question: question.toDict(),
- answer: result as unknown,
- };
- await this._appendTurnLine(turn);
- this.history.push(turn);
- await this._updateCatalogEntryForTurn(turn);
-
- return result;
- }
+ ): Promise<PIPELINE_RESULT> {
+ const run = async (): Promise<PIPELINE_RESULT> => {
+ const question = new Question({ type: QuestionType.PROMPT, chat_id: this.id });
+ question.addQuestion(text);
+ if (opts?.history) {
+ for (const item of opts.history) question.addHistory(item);
+ }
+
+ const result = await this._client.chat({ token: this._token, question, onSSE: opts?.onSSE });
+ const seq = (this.history[this.history.length - 1]?.seq ?? 0) + 1;
+ const turn: ChatTurn = {
+ type: 'turn',
+ schema_version: CHAT_SCHEMA_VERSION,
+ seq,
+ created: new Date().toISOString(),
+ question: question.toDict(),
+ answer: result as unknown,
+ };
+ await this._appendTurnLine(turn);
+ this.history.push(turn);
+ await this._updateCatalogEntryForTurn(turn);
+ return result;
+ };
+
+ const next = this._sendQueue.then(run, run);
+ this._sendQueue = next.then(() => undefined, () => undefined);
+ return next;
+ }
}Also applies to: 304-309
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@packages/client-typescript/src/client/Chat.ts` around lines 233 - 267, The
send() method currently computes seq from the mutable history and performs
read-modify-write appends (_appendTurnLine, history.push,
_updateCatalogEntryForTurn) without synchronization, which causes lost
turns/duplicate seq when send() is called concurrently; fix this by serializing
send() per Chat instance (e.g., add a per-instance mutex/queue) and wrap the seq
calculation and the critical section that calls _appendTurnLine, history.push,
and _updateCatalogEntryForTurn inside the lock so only one send() mutates
history and writes turn lines at a time; ensure the lock is acquired before
computing seq and released after updating history and catalog.
| export async function mutateCatalog(client: RocketRideChatClient, mutator: (cat: ChatCatalog) => void): Promise<void> { | ||
| for (let attempt = 1; attempt <= CATALOG_MAX_RETRY; attempt++) { | ||
| const { catalog, observedVersion } = await readOrInitCatalog(client); | ||
| mutator(catalog); | ||
| catalog.version = observedVersion + 1; | ||
| // Re-read just before writing to detect concurrent mutations between | ||
| // our read and write. Not true CAS — the filestore has per-path write | ||
| // locks but no compare-and-swap primitive — so collisions are still | ||
| // possible. Bounded retry covers the realistic same-user-two-tab case. | ||
| try { | ||
| const fresh = await client.fsReadJson<ChatCatalog>(CATALOG_PATH); | ||
| if ((fresh?.version ?? 0) !== observedVersion) { | ||
| if (attempt === CATALOG_MAX_RETRY) throw new CatalogContentionError(attempt); | ||
| await sleep(20 * attempt); | ||
| continue; | ||
| } | ||
| } catch (err) { | ||
| if (err instanceof CatalogContentionError) throw err; | ||
| // catalog absent on second read — proceed; the write will create it. | ||
| } | ||
| try { | ||
| await client.fsWriteJson(CATALOG_PATH, catalog); | ||
| return; | ||
| } catch (err) { | ||
| // The filestore enforces a per-path write-lock — concurrent writers | ||
| // hit "File already open for writing" before our version check sees | ||
| // the conflict. Retry the read-modify-write cycle like a normal | ||
| // version-contention case. | ||
| const msg = err instanceof Error ? err.message : String(err); | ||
| if (msg.includes('already open for writing') && attempt < CATALOG_MAX_RETRY) { | ||
| await sleep(20 * attempt); | ||
| continue; | ||
| } | ||
| throw err; | ||
| } |
There was a problem hiding this comment.
mutateCatalog still has a TOCTOU overwrite window.
Lines 471-476 (version check) and Line 482 (write) are non-atomic. Two writers can both validate against the same version and then sequentially write divergent payloads, dropping one mutation.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@packages/client-typescript/src/client/Chat.ts` around lines 461 - 495,
mutateCatalog has a TOCTOU window: two writers can both pass the pre-write
version check and then overwrite each other; fix by validating the write
completed as intended — after await client.fsWriteJson(CATALOG_PATH, catalog)
immediately re-read the file (client.fsReadJson<ChatCatalog>(CATALOG_PATH)) and
verify the persisted version equals catalog.version (observedVersion + 1); if it
does not, throw CatalogContentionError(attempt) (or treat as a retryable
conflict and continue the loop) so the function retries the read-modify-write
cycle; reference mutateCatalog, readOrInitCatalog, client.fsReadJson,
client.fsWriteJson, CatalogContentionError, and CATALOG_MAX_RETRY.
| test('open rejects a non-UUID chatId', async () => { | ||
| const fs = new MockFsClient(); | ||
| await expect(Chat.open({ client: fs, token: TOKEN, chatId: 'not-a-uuid' })).rejects.toThrow(); | ||
| }); | ||
|
|
||
| test('Question.fromDict rejects a chat_id that is not a valid UUID', () => { | ||
| expect(() => Question.fromDict({ chat_id: 'nope' } as any)).toThrow(); | ||
| const ok = Question.fromDict({ chat_id: 'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa' } as any); |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Locate the file and print the relevant region with line numbers
FILE="packages/client-typescript/tests/Chat.test.ts"
echo "== File check =="
ls -l "$FILE"
echo "== Lines 270-330 =="
nl -ba "$FILE" | sed -n '270,330p'
# Also print the rest of the UUID-related tests if any
echo "== UUID-related occurrences in this file =="
rg -n "UUID|uuid|chatId|chat_id" "$FILE" || true
# Find implementations referenced: Chat.open and Question.fromDict
echo "== Find Chat.open implementation =="
rg -n "static open\\(|open\\(" packages/client-typescript -S || true
echo "== Find Question.fromDict implementation =="
rg -n "class Question|fromDict\\(" packages/client-typescript -S || trueRepository: rocketride-org/rocketride-server
Length of output: 271
🏁 Script executed:
#!/bin/bash
set -euo pipefail
FILE="packages/client-typescript/tests/Chat.test.ts"
echo "== File check =="
ls -l "$FILE"
echo "== Lines 270-330 =="
nl -ba "$FILE" | sed -n '270,330p'
echo "== UUID-related occurrences in this file =="
rg -n "UUID|uuid|chatId|chat_id" "$FILE" || true
echo "== Find Chat.open implementation =="
rg -n "Chat\\.open|static open\\(|function open\\b|open\\s*\\(" packages/client-typescript -S || true
echo "== Find Question.fromDict implementation =="
rg -n "class Question|Question\\.fromDict|fromDict\\s*\\(" packages/client-typescript -S || trueRepository: rocketride-org/rocketride-server
Length of output: 270
🏁 Script executed:
#!/bin/bash
set -euo pipefail
FILE="packages/client-typescript/tests/Chat.test.ts"
echo "== File check =="
ls -l "$FILE"
echo "== Lines 270-330 (with line numbers) =="
cat -n "$FILE" | sed -n '270,330p'
echo "== UUID-related occurrences in this file =="
rg -n "UUID|uuid|chatId|chat_id" "$FILE" || true
echo "== Print full file for UUID-related test area (short range around first match) =="
# Find first line containing chatId/UUID
LINE=$(rg -n "not-a-uuid|fromDict\\(|chat_id|UUID|uuid|chatId" "$FILE" | head -n 1 | cut -d: -f1 || true)
if [ -n "${LINE:-}" ]; then
START=$((LINE-30))
END=$((LINE+60))
if [ "$START" -lt 1 ]; then START=1; fi
echo "== Context around line $LINE ($START-$END) =="
cat -n "$FILE" | sed -n "${START},${END}p"
fi
echo "== Find Chat.open implementation =="
rg -n "Chat\\.open|static open\\(|open\\s*\\(" packages/client-typescript -S || true
echo "== Find Question.fromDict implementation =="
rg -n "Question\\.fromDict|fromDict\\s*\\(" packages/client-typescript -S || trueRepository: rocketride-org/rocketride-server
Length of output: 10973
🏁 Script executed:
#!/bin/bash
set -euo pipefail
CHAT_TS="packages/client-typescript/src/client/Chat.ts"
QUESTION_TS="packages/client-typescript/src/client/schema/Question.ts"
echo "== Chat.ts: Chat.open region =="
echo "-- lines around static async open("
# Print a bounded region around the open() implementation line
LINE_OPEN=$(rg -n "static async open\\(" "$CHAT_TS" | head -n1 | cut -d: -f1)
START=$((LINE_OPEN-60)); END=$((LINE_OPEN+120))
if [ "$START" -lt 1 ]; then START=1; fi
sed -n "${START},${END}p" "$CHAT_TS" | cat -n
echo "== Question.ts: Question.fromDict region =="
LINE_FROMDICT=$(rg -n "static fromDict\\(" "$QUESTION_TS" | head -n1 | cut -d: -f1)
START=$((LINE_FROMDICT-60)); END=$((LINE_FROMDICT+140))
if [ "$START" -lt 1 ]; then START=1; fi
sed -n "${START},${END}p" "$QUESTION_TS" | cat -n
echo "== UUID guards in Chat.ts =="
rg -n "uuid|UUID|chatId|chat_id|format guard|not-a-uuid" "$CHAT_TS" || true
echo "== UUID guards in Question.ts =="
rg -n "uuid|UUID|chat_id|chatId|not a valid UUID|invalid" "$QUESTION_TS" || trueRepository: rocketride-org/rocketride-server
Length of output: 12575
Tighten UUID guard assertions to avoid false-positive test passes.
Chat.openinvalidchatIdshould assert the UUID-validation error (currently accepts any thrown error).Question.fromDictinvalidchat_idshould assert the UUID-validation error (currently accepts any thrown error).
Suggested test tightening
- await expect(Chat.open({ client: fs, token: TOKEN, chatId: 'not-a-uuid' })).rejects.toThrow();
+ await expect(Chat.open({ client: fs, token: TOKEN, chatId: 'not-a-uuid' })).rejects.toThrow(/uuid/i);
- expect(() => Question.fromDict({ chat_id: 'nope' } as any)).toThrow();
+ expect(() => Question.fromDict({ chat_id: 'nope' } as any)).toThrow(/uuid/i);🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@packages/client-typescript/tests/Chat.test.ts` around lines 292 - 299, Update
the two tests to assert the specific UUID-validation error instead of any thrown
error: for the async test calling Chat.open({ client: fs, token: TOKEN, chatId:
'not-a-uuid' }) use await expect(...).rejects.toThrow(/uuid/i) or
rejects.toThrowError(/invalid.*uuid/i) to match the UUID validation message, and
for the synchronous Question.fromDict call use expect(() => Question.fromDict({
chat_id: 'nope' } as any)).toThrow(/uuid/i) (leave the valid UUID case
unchanged); reference the Chat.open and Question.fromDict symbols when making
these changes.
asclearuc
left a comment
There was a problem hiding this comment.
Thanks @dylan-savage — the client-driven approach on top of the existing fs_* primitives is the right call, the immutable-header + append-per-turn JSONL layout reads well, and the symmetry between the TS and Python SDKs (with parity tests on both sides) made this much easier to review.
Approving v1 and waiting for v2 with the tk_*-on-share-URL issue resolved — it's a real security concern (private operator token with full filestore CRUD landing on a shareable link), so the task.store.chat scope work needs to land before persistence is enabled on the public surface.
# Conflicts: # packages/client-typescript/src/client/client.ts
- App.tsx: scrub stale chatPipelineId/chatPersist when ?auth= refreshes the session so a new task can't inherit the previous task's persistence flags - ChatContainer: dedupe lazy Chat.create via a promise ref so rapid concurrent sends share one in-flight chat creation - ChatList: chat rows now expose role=button, tabIndex, Enter/Space activation, and aria-current for keyboard users and screen readers - useChatMessages: hydrate with a monotonic per-message id so a turn emitting multiple bot responses can't collide with the next turn's user id - chat.py parse_chat_file: skip non-dict JSON records before key access - Chat.ts + chat.py: serialize Chat.send per instance to prevent duplicate seq numbers and last-writer-wins loss on the chat.jsonl read-then-rewrite append - Chat.ts + chat.py: per-client catalog mutex closes the intra-client TOCTOU window on catalog.json mutations; cross-client contention still falls through to the bounded OCC retry (server CAS deferred to v2) - Chat.test.ts: tighten the UUID guard with empty, extra-segment, and non-hex negative cases Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Summary
fs_*primitives — no engine or chat-node changes for chat-aware writes.persistSessionstoggle on the chat node (default OFF — behavior is unchanged for anyone who doesn't enable it).What's in the diff
Question.chat_id: string | nullfield on the TS and Python schemas, with UUID validation.Chatclass in the TS and Python SDKs:create/open/send/rename/delete. Atomic-per-turn JSONL writes; optimistic-version retry on the catalog.client.chats.list({ pipelineId })namespace for enumerating the user's saved chats.ChatListsidebar in chat-ui;ChatContainerlifecycle (auto-open most recent on boot, create on first send, rename/delete handlers);useChatMessagesroutes through aChatwhen one is supplied.persistSessions: booleanparameter on the chat node'sservices.chat.json, defaultfalse. When enabled,IEndpoint.pyappends&pipelineId=<id>&persist=1to the embed URL.chat.send(text, { history })opt-in for callers that want to prime model context explicitly.On-disk layout:
Pre-merge scope cut
Earlier on this branch, the feature also shipped a
recall_historyengine built-in tool and an auto eager-history embedding (last 3 turns inlined into every outboundQuestion.history). Both were pulled before opening the PR — they're entangled with broader context-engineering questions (selector, dedup, system-prompt vs. tool injection, per-session vs. cross-session) that deserve their own design pass. The branch now ships the durable transcript + UI rehydration layer only; the model itself is stateless within a chat. Pipelines that want history can pass it explicitly viachat.send(text, { history }). The model-facing layer lands in a follow-up.What this unlocks
This PR is the substrate for two follow-ups that build directly on it:
.chats/<chat-guid>/. The on-disk schema already accommodates them via the verbatim-Questionstorage and the per-lineschema_versionenvelope, no migration required.Known v1 limitation: persistence requires the operator token
The chat node mints two credentials per run:
pk_*task.control,task.data,task.monitortk_*task.storePersistence calls (
fs_mkdir/fs_write_*) requiretask.store. Onlytk_*carries it.tk_*in a shareable URL — it would grant every visitor full filestore CRUD on the operator's account scope, not just under.chats/.tk_*) persistence works fine; toggling it ON for apk_*consumer is currently a misconfiguration.task.store.chatscope (writes allowed under.chats/<user>/only), grant it topk_*.task_server.py+ FileStore scoping change.Test plan
persistSessionsOFF — send messages,.chats/is not created, refresh loses history.persistSessionsON — send a message,.chats/<guid>/chat.jsonlexists with header + 1 turn;catalog.jsonlists it..chats/<guid>/appears; sidebar shows two entries.catalog.jsontitle updates; the chat'schat.jsonlis byte-identical.versionincrements cleanly.npx tsc --noEmitclean inpackages/client-typescript+apps/chat-ui;ruff checkclean.🤖 Generated with Claude Code
Summary by CodeRabbit
New Features
Tests