Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 47 additions & 21 deletions packages/ai-bot/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,10 @@ import {
import type { MatrixEvent as DiscreteMatrixEvent } from 'https://cardstack.com/base/matrix-event';
import * as Sentry from '@sentry/node';

import { saveUsageCost } from '@cardstack/billing/ai-billing';
import {
spendUsageCost,
fetchGenerationCostWithBackoff,
} from '@cardstack/billing/ai-billing';
import { PgAdapter } from '@cardstack/postgres';
import type { ChatCompletionMessageParam } from 'openai/resources';
import type { OpenAIError } from 'openai/error';
Expand Down Expand Up @@ -86,22 +89,41 @@ class Assistant {
this.aiBotInstanceId = aiBotInstanceId;
}

async trackAiUsageCost(matrixUserId: string, generationId: string) {
async trackAiUsageCost(
matrixUserId: string,
opts: { costInUsd?: number; generationId?: string },
) {
if (trackAiUsageCostPromises.has(matrixUserId)) {
return;
}
// intentionally do not await saveUsageCost promise - it has a backoff mechanism to retry if the cost is not immediately available so we don't want to block the main thread
trackAiUsageCostPromises.set(
matrixUserId,
saveUsageCost(
this.pgAdapter,
matrixUserId,
generationId,
process.env.OPENROUTER_API_KEY!,
).finally(() => {
trackAiUsageCostPromises.delete(matrixUserId);
}),
);
const promise = (async () => {
let { costInUsd, generationId } = opts;
if (
typeof costInUsd === 'number' &&
Number.isFinite(costInUsd) &&
costInUsd > 0
) {
await spendUsageCost(this.pgAdapter, matrixUserId, costInUsd);
} else if (generationId) {
log.info(
`No inline cost for user ${matrixUserId}, falling back to generation cost API (generationId: ${generationId})`,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In which cases is there no inline cost?

);
const fetchedCost = await fetchGenerationCostWithBackoff(
generationId,
process.env.OPENROUTER_API_KEY!,
);
if (fetchedCost !== null) {
await spendUsageCost(this.pgAdapter, matrixUserId, fetchedCost);
Copy link

Copilot AI Apr 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the ai-bot fallback path, if fetchGenerationCostWithBackoff returns null the code currently does nothing (beyond whatever logging happens inside ai-billing) and proceeds without any bot-level warning that credits were not deducted for this user. Please add explicit logging (and/or Sentry capture) here on fetchedCost === null so failed deductions for interrupted streams are visible and actionable.

Suggested change
await spendUsageCost(this.pgAdapter, matrixUserId, fetchedCost);
await spendUsageCost(this.pgAdapter, matrixUserId, fetchedCost);
} else {
let message = `Failed to fetch generation cost for user ${matrixUserId} (generationId: ${generationId}); credits were not deducted`;
log.warn(message);
Sentry.captureMessage(message, {
level: 'warning',
extra: {
matrixUserId,
generationId,
},
});

Copilot uses AI. Check for mistakes.
}
} else {
log.warn(
`No usage cost and no generation ID for user ${matrixUserId}, skipping credit deduction`,
);
}
})().finally(() => {
trackAiUsageCostPromises.delete(matrixUserId);
});
trackAiUsageCostPromises.set(matrixUserId, promise);
}

getResponse(prompt: PromptParts, senderMatrixUserId?: string) {
Expand Down Expand Up @@ -288,10 +310,9 @@ Common issues are:
isCanceled: true,
});
if (activeGeneration.lastGeneratedChunkId) {
await assistant.trackAiUsageCost(
senderMatrixUserId,
activeGeneration.lastGeneratedChunkId,
);
await assistant.trackAiUsageCost(senderMatrixUserId, {
generationId: activeGeneration.lastGeneratedChunkId,
});
}
activeGenerations.delete(room.roomId);
}
Expand Down Expand Up @@ -448,6 +469,7 @@ Common issues are:

let chunkHandlingError: string | undefined;
let generationId: string | undefined;
let costInUsd: number | undefined;
log.info(
`[${eventId}] Starting generation with model %s`,
promptParts.model,
Expand All @@ -471,6 +493,9 @@ Common issues are:
});
}
generationId = chunk.id;
if (chunk.usage && (chunk.usage as any).cost != null) {
costInUsd = (chunk.usage as any).cost;
}
let activeGeneration = activeGenerations.get(room.roomId);
if (activeGeneration) {
activeGeneration.lastGeneratedChunkId = generationId;
Expand Down Expand Up @@ -525,9 +550,10 @@ Common issues are:
await responder.onError(error as OpenAIError);
}
} finally {
if (generationId) {
assistant.trackAiUsageCost(senderMatrixUserId, generationId);
}
assistant.trackAiUsageCost(senderMatrixUserId, {
costInUsd,
generationId,
});
activeGenerations.delete(room.roomId);
}

Expand Down
67 changes: 1 addition & 66 deletions packages/billing/ai-billing.ts
Original file line number Diff line number Diff line change
Expand Up @@ -109,50 +109,7 @@ export async function spendUsageCost(
}
}

/**
 * Looks up the cost of a generation and charges the matching number of
 * credits to the user identified by `matrixUserId`.
 *
 * Every failure is logged and reported to Sentry rather than rethrown, so
 * this function never rejects.
 *
 * @param dbAdapter - database adapter used for the user lookup and the credit spend
 * @param matrixUserId - Matrix ID of the user to charge
 * @param generationId - ID of the generation whose cost is fetched
 * @param openRouterApiKey - API key passed through to the cost lookup
 */
export async function saveUsageCost(
  dbAdapter: DBAdapter,
  matrixUserId: string,
  generationId: string,
  openRouterApiKey: string,
) {
  try {
    // The cost may not be available right away; the backoff helper retries
    // and resolves to null when it could not obtain a cost.
    const fetchedCost = await fetchGenerationCostWithBackoff(
      generationId,
      openRouterApiKey,
    );

    if (fetchedCost !== null) {
      const credits = Math.round(fetchedCost * CREDITS_PER_USD);
      const account = await getUserByMatrixUserId(dbAdapter, matrixUserId);

      if (!account) {
        throw new Error(
          `should not happen: user with matrix id ${matrixUserId} not found in the users table`,
        );
      }

      await spendCredits(dbAdapter, account.id, credits);
    } else {
      // No cost could be obtained: report it and leave the credits untouched.
      Sentry.captureException(
        new Error(
          `Failed to fetch generation cost after retries (generationId: ${generationId})`,
        ),
      );
    }
  } catch (failure) {
    // Deliberately swallowed: a billing failure must not crash the application.
    log.error(
      `Failed to track AI usage (matrixUserId: ${matrixUserId}, generationId: ${generationId}):`,
      failure,
    );
    Sentry.captureException(failure);
  }
}

async function fetchGenerationCostWithBackoff(
export async function fetchGenerationCostWithBackoff(
generationId: string,
openRouterApiKey: string,
): Promise<number | null> {
Expand Down Expand Up @@ -202,7 +159,6 @@ async function fetchGenerationCost(
},
);

// 404 means generation data probably isn't available yet - return null to trigger retry
if (response.status === 404) {
return null;
}
Expand All @@ -224,24 +180,3 @@ async function fetchGenerationCost(

return data.data.total_cost;
}

/**
 * Extracts a generation ID from a response object.
 *
 * The ID is looked up in three places, in order of priority:
 *   1. `response.id`
 *   2. `response.choices[0].id`
 *   3. `response.usage.generation_id`
 *
 * The first truthy value wins; falsy candidates (e.g. an empty string) are
 * skipped and the next location is tried.
 *
 * @param response - a parsed response body; may be `null`/`undefined` or
 *   lack any of the fields above
 * @returns the generation ID, or `undefined` when none of the locations
 *   holds a truthy value
 */
export function extractGenerationIdFromResponse(
  response: any,
): string | undefined {
  // Optional chaining makes a null/undefined response (or missing
  // intermediate objects) yield `undefined` instead of throwing.
  // `||` (not `??`) is intentional: it keeps the truthiness-based
  // fallthrough, so an empty-string id falls through to the next candidate.
  return (
    response?.id ||
    response?.choices?.[0]?.id ||
    response?.usage?.generation_id ||
    undefined
  );
}
96 changes: 38 additions & 58 deletions packages/realm-server/handlers/handle-request-forward.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,33 +57,33 @@ async function handleStreamingRequest(
if (!reader) throw new Error('No readable stream available');

let generationId: string | undefined;
let costInUsd: number | undefined;
let lastPing = Date.now();

await proxySSE(
reader,
async (data) => {
// Handle end of stream
if (data === '[DONE]') {
if (generationId) {
// Save cost in the background so we don't block the stream on OpenRouter's generation cost API.
// Chain per-user promises so costs are recorded sequentially.
const previousPromise =
pendingCostPromises.get(matrixUserId) ?? Promise.resolve();
const costPromise = previousPromise
.then(() =>
endpointConfig.creditStrategy.saveUsageCost(
dbAdapter,
matrixUserId,
{ id: generationId },
),
)
.finally(() => {
if (pendingCostPromises.get(matrixUserId) === costPromise) {
pendingCostPromises.delete(matrixUserId);
}
});
pendingCostPromises.set(matrixUserId, costPromise);
}
// Deduct credits using the cost from the streaming response.
// Chain per-user promises so costs are recorded sequentially.
const previousPromise =
pendingCostPromises.get(matrixUserId) ?? Promise.resolve();
const costPromise = previousPromise
.then(() =>
endpointConfig.creditStrategy.saveUsageCost(
dbAdapter,
matrixUserId,
{ id: generationId, usage: { cost: costInUsd } },
),
)
.finally(() => {
if (pendingCostPromises.get(matrixUserId) === costPromise) {
pendingCostPromises.delete(matrixUserId);
}
});
pendingCostPromises.set(matrixUserId, costPromise);

Comment on lines +68 to +86
Copy link

Copilot AI Apr 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the streaming [DONE] handler you always enqueue a saveUsageCost call, even when neither a generationId nor an inline cost was ever observed. This can generate noisy warnings (and sets a pendingCostPromises entry) for streams where no billable metadata exists. Consider guarding this block so you only schedule cost deduction when you have either a valid numeric costInUsd or a generationId to use for the fallback.

Suggested change
// Deduct credits using the cost from the streaming response.
// Chain per-user promises so costs are recorded sequentially.
const previousPromise =
pendingCostPromises.get(matrixUserId) ?? Promise.resolve();
const costPromise = previousPromise
.then(() =>
endpointConfig.creditStrategy.saveUsageCost(
dbAdapter,
matrixUserId,
{ id: generationId, usage: { cost: costInUsd } },
),
)
.finally(() => {
if (pendingCostPromises.get(matrixUserId) === costPromise) {
pendingCostPromises.delete(matrixUserId);
}
});
pendingCostPromises.set(matrixUserId, costPromise);
// Deduct credits using the cost from the streaming response only
// when we have enough metadata to save or resolve billing details.
// Chain per-user promises so costs are recorded sequentially.
const hasNumericCost =
typeof costInUsd === 'number' && Number.isFinite(costInUsd);
const hasBillingMetadata = hasNumericCost || generationId != null;
if (hasBillingMetadata) {
const previousPromise =
pendingCostPromises.get(matrixUserId) ?? Promise.resolve();
const costPromise = previousPromise
.then(() =>
endpointConfig.creditStrategy.saveUsageCost(
dbAdapter,
matrixUserId,
{ id: generationId, usage: { cost: costInUsd } },
),
)
.finally(() => {
if (pendingCostPromises.get(matrixUserId) === costPromise) {
pendingCostPromises.delete(matrixUserId);
}
});
pendingCostPromises.set(matrixUserId, costPromise);
}

Copilot uses AI. Check for mistakes.
ctxt.res.write(`data: [DONE]\n\n`);
return 'stop';
}
Expand All @@ -95,6 +95,10 @@ async function handleStreamingRequest(
if (!generationId && dataObj.id) {
generationId = dataObj.id;
}

if (dataObj.usage?.cost != null) {
costInUsd = dataObj.usage.cost;
}
} catch {
log.warn('Invalid JSON in streaming response:', data);
}
Expand Down Expand Up @@ -499,46 +503,22 @@ export default function handleRequestForward({

const responseData = await externalResponse.json();

// 6. Deduct credits in the background using the cost from the response,
// or fall back to saveUsageCost when the cost is not provided.
const costInUsd = responseData?.usage?.cost;
// 6. Deduct credits in the background using the cost from the response.
const previousPromise =
pendingCostPromises.get(matrixUserId) ?? Promise.resolve();
let costPromise: Promise<void>;

if (
typeof costInUsd === 'number' &&
Number.isFinite(costInUsd) &&
costInUsd > 0
) {
costPromise = previousPromise
.then(() =>
destinationConfig.creditStrategy.spendUsageCost(
dbAdapter,
matrixUserId,
costInUsd,
),
)
.finally(() => {
if (pendingCostPromises.get(matrixUserId) === costPromise) {
pendingCostPromises.delete(matrixUserId);
}
});
} else {
costPromise = previousPromise
.then(() =>
destinationConfig.creditStrategy.saveUsageCost(
dbAdapter,
matrixUserId,
responseData,
),
)
.finally(() => {
if (pendingCostPromises.get(matrixUserId) === costPromise) {
pendingCostPromises.delete(matrixUserId);
}
});
}
const costPromise = previousPromise
.then(() =>
destinationConfig.creditStrategy.saveUsageCost(
dbAdapter,
matrixUserId,
responseData,
),
)
.finally(() => {
if (pendingCostPromises.get(matrixUserId) === costPromise) {
pendingCostPromises.delete(matrixUserId);
}
});
pendingCostPromises.set(matrixUserId, costPromise);

// 7. Return response
Expand Down
Loading
Loading