Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 56 additions & 0 deletions src/app/api/chat/route.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ vi.mock("@/lib/chat-pubsub", () => ({

const mockSelectRecoveredAssistantText = vi.fn<(params: unknown) => string>(() => "");
vi.mock("@/lib/chat-recovery", () => ({
isAssistantDeliveryPlaceholder: (content: string) =>
content.trim().replace(/\s+/g, " ").toLowerCase() === "answered in chat.",
selectRecoveredAssistantText: (params: unknown) => mockSelectRecoveredAssistantText(params),
}));

Expand Down Expand Up @@ -779,6 +781,60 @@ describe("POST /api/chat", () => {
expect(streamed).toContain("data: [DONE]");
});

it("streams message-tool source replies instead of the delivery placeholder", async () => {
const chatHandlers: Array<(payload: unknown) => void> = [];
mockGetGatewayClient.mockResolvedValueOnce({
on: vi.fn((event: string, handler: (payload: unknown) => void) => {
if (event === "*") chatHandlers.push(handler);
}),
off: vi.fn(),
chatSend: vi.fn().mockResolvedValue({ runId: "run-1" }),
chatAbort: vi.fn(() => Promise.resolve()),
chatHistory: vi.fn(),
rpc: vi.fn(),
});

const response = await POST(makeRequest({
messages: [{ role: "user", content: "did it publish?" }],
agent: "main",
}));
const reader = response.body!.getReader();
await readUntilContains(reader, "\"event\":\"gateway_send_started\"");

await vi.waitFor(() => {
expect(chatHandlers).toHaveLength(1);
});

chatHandlers[0]({
event: "agent",
stream: "tool",
sessionKey: "main",
runId: "run-1",
data: {
name: "message",
phase: "completed",
result: {
status: "ok",
sourceReply: {
text: "Detailed answer from the message tool.",
},
},
},
});
chatHandlers[0]({
event: "chat",
state: "final",
sessionKey: "main",
runId: "run-1",
message: { role: "assistant", content: "Answered in chat." },
});

const streamed = await readUntilDone(reader);
expect(streamed).toContain("\"choices\":[{\"delta\":{\"content\":\"Detailed answer from the message tool.\"}}]");
expect(streamed).not.toContain("\"choices\":[{\"delta\":{\"content\":\"Answered in chat.\"}}]");
expect(streamed).toContain("data: [DONE]");
});

it("keeps long-running chat streams alive with heartbeat progress", async () => {
vi.useFakeTimers();
const response = await POST(makeRequest({
Expand Down
60 changes: 58 additions & 2 deletions src/app/api/chat/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { db, withRetry } from "@/db";
import { agents, channelMembers, chatMessages, chatRuns, chatSessions, chatThreads } from "@/db/schema";
import { eq, desc, and, isNull, sql } from "drizzle-orm";
import { publishChatEvent, publishChatProgressEvent } from "@/lib/chat-pubsub";
import { selectRecoveredAssistantText } from "@/lib/chat-recovery";
import { isAssistantDeliveryPlaceholder, selectRecoveredAssistantText } from "@/lib/chat-recovery";
import { resolveCurrentUser } from "@/lib/resolve-user";
import { sendAgentReplyNotification } from "@/lib/mobile-push";
import { registerChatRunAbort } from "@/lib/chat-run-abort-registry";
Expand Down Expand Up @@ -274,6 +274,51 @@ function extractText(value: unknown, seen = new WeakSet<object>()): string {
return "";
}

function parseJsonRecord(value: string) {
const trimmed = value.trim();
if (!trimmed || (!trimmed.startsWith("{") && !trimmed.startsWith("["))) return null;
try {
return JSON.parse(trimmed) as unknown;
} catch {
return null;
}
}

function extractSourceReplyText(value: unknown, seen = new WeakSet<object>()): string {
if (!value) return "";

if (typeof value === "string") {
const parsed = parseJsonRecord(value);
return parsed ? extractSourceReplyText(parsed, seen) : "";
}

if (Array.isArray(value)) {
for (const item of value) {
const text = extractSourceReplyText(item, seen);
if (text) return text;
}
return "";
}

const record = asRecord(value);
if (!record) return "";
if (seen.has(record)) return "";
seen.add(record);

const sourceReply = asRecord(record.sourceReply) ?? asRecord(record.source_reply);
const direct = sourceReply
? firstString(sourceReply.text, sourceReply.message, sourceReply.content)
: firstString(record.sourceReplyText, record.source_reply_text);
if (direct && !isAssistantDeliveryPlaceholder(direct)) return direct;

for (const key of ["content", "text", "output", "result", "data", "payload", "message"]) {
const text = extractSourceReplyText(record[key], seen);
if (text) return text;
}

return "";
}

function isToolOnlyMessage(value: unknown): boolean {
const message = asRecord(value);
if (!message) return false;
Expand Down Expand Up @@ -1012,6 +1057,7 @@ export async function POST(request: NextRequest) {
let historySnapshotStreamed = false;
let hasToolActivity = false;
let deferredToolCompletion = false;
let deliveredSourceReplyText = "";

const enqueueData = (payload: unknown) => {
if (!streamController || done) return;
Expand Down Expand Up @@ -1440,6 +1486,13 @@ export async function POST(request: NextRequest) {
if (!matchesSession) return;

const state = p.state as string;
const sourceReplyText = extractSourceReplyText(p);
if (sourceReplyText) {
deliveredSourceReplyText = sourceReplyText;
if (streamAssistantSnapshot(sourceReplyText)) {
historySnapshotStreamed = true;
}
}
const isChatLifecycle = isChatLifecycleEvent(p);
const compactionProgress = extractCompactionProgress(p);
const toolProgress = compactionProgress ? null : extractToolProgress(p);
Expand Down Expand Up @@ -1500,7 +1553,10 @@ export async function POST(request: NextRequest) {
} else if (state === "final") {
const finalMessage = p.message || p;
const toolOnlyFinal = isToolOnlyMessage(finalMessage);
const finalText = toolOnlyFinal ? "" : extractText(finalMessage);
const extractedFinalText = toolOnlyFinal ? "" : extractText(finalMessage);
const finalText = extractedFinalText && isAssistantDeliveryPlaceholder(extractedFinalText)
? deliveredSourceReplyText
: extractedFinalText;
if (finalText && !streamAssistantSnapshot(finalText)) {
fullAssistantText = finalText;
}
Expand Down
13 changes: 13 additions & 0 deletions src/lib/chat-recovery.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,4 +63,17 @@ describe("selectRecoveredAssistantText", () => {

expect(recovered).toBe("");
});

it("skips webchat delivery placeholders and recovers the mirrored source reply", () => {
const recovered = selectRecoveredAssistantText({
messages: [
{ role: "user", content: "did the cron publish?" },
{ role: "assistant", content: "Answered in chat." },
{ role: "assistant", content: "Yes, it published updates, not five brand-new repos." },
],
currentUserContents: ["did the cron publish?"],
});

expect(recovered).toBe("Yes, it published updates, not five brand-new repos.");
});
});
10 changes: 9 additions & 1 deletion src/lib/chat-recovery.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ function contentSet(contents: string[]) {
);
}

export function isAssistantDeliveryPlaceholder(content: string) {
return normalizeContent(content).toLowerCase() === "answered in chat.";
}

export function selectRecoveredAssistantText(params: {
messages: ChatRecoveryMessage[];
currentUserContents: string[];
Expand All @@ -31,7 +35,11 @@ export function selectRecoveredAssistantText(params: {

const recovered = params.messages
.slice(lastMatchingUserIndex + 1)
.find((message) => message.role === "assistant" && message.content.trim());
.find((message) =>
message.role === "assistant" &&
message.content.trim() &&
!isAssistantDeliveryPlaceholder(message.content)
);

if (!recovered) return "";

Expand Down
Loading