Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions src/agent/llm/pi-ai-openai-responses-shared.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ import type {
} from "openai/resources/responses/responses.js";
import { appendCappedTextTail, capTextTail } from "@/src/shared/capped-text.js";

/** The `input` field type of a streaming Responses create request, with `undefined` excluded. */
export type ResponsesInput = Exclude<ResponseCreateParamsStreaming["input"], undefined>;

export interface OpenAIResponsesStreamOptions {
serviceTier?: ResponseCreateParamsStreaming["service_tier"];
applyServiceTierPricing?: (
Expand Down Expand Up @@ -74,6 +76,39 @@ type StreamBlock = StreamThinkingBlock | StreamTextBlock | StreamToolCallBlock;
// Character limit (per the name) — presumably the cap on retained streamed
// reasoning text; TODO confirm at the use site.
const STREAM_REASONING_CONTENT_MAX_CHARS = 100_000;
// Marker text — presumably prepended once earlier reasoning has been dropped
// to stay under the cap; TODO confirm at the use site.
const STREAM_REASONING_TRUNCATION_PREFIX = "...[earlier reasoning truncated]\n";

/**
 * Rewrites every `developer`-role input item to the `system` role.
 *
 * Hard invariant: Responses API requests must never carry developer-role
 * input items, even if an upstream compat layer later starts allowing
 * `supportsDeveloperRole`. The responses format treats developer and system
 * as equivalent, and pokoclaw normalizes to system everywhere.
 *
 * @param input - Responses input; non-array input is passed through untouched.
 * @returns The same `input` reference when nothing needed rewriting, otherwise
 *   a new array in which each developer item is replaced by a shallow copy
 *   whose `role` is `"system"`.
 */
export function normalizeResponsesInputRoles(input: ResponsesInput): ResponsesInput {
  if (!Array.isArray(input)) {
    return input;
  }

  // Tracks whether any item was rewritten so callers get the original
  // reference back when the input was already normalized.
  let sawDeveloperRole = false;
  const remapped = input.map((entry) => {
    if (isRecord(entry) && entry.role === "developer") {
      sawDeveloperRole = true;
      return {
        ...entry,
        role: "system" as const,
      };
    }

    return entry;
  });

  return sawDeveloperRole ? remapped : input;
}

/**
 * Type guard for non-null, non-array objects.
 *
 * @param value - Any value.
 * @returns `true` when `value` has `typeof "object"`, is not `null`, and is
 *   not an array; the narrowed type exposes an optional `role` of unknown type.
 */
export function isRecord(value: unknown): value is Record<string, unknown> & { role?: unknown } {
  if (value === null || typeof value !== "object") {
    return false;
  }

  return !Array.isArray(value);
}

interface ResponsesReasoningSummaryPart {
text: string;
}
Expand Down
106 changes: 75 additions & 31 deletions src/agent/llm/pi-bridge.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ import {
type Api,
type AssistantMessage,
type AssistantMessageEvent,
type Context,
completeSimple,
type Model,
type SimpleStreamOptions,
type Tool,
Type,
} from "@mariozechner/pi-ai";
Expand Down Expand Up @@ -90,8 +92,15 @@ export interface PiBridgeRunTurnResult {
errorMessage?: string;
}

/**
 * Structured credential a resolver may return instead of a bare API key string.
 */
export interface ResolvedProviderApiCredential {
// Secret used as the resolved API key by `buildPiStreamOptions`.
apiKey: string;
// Optional account identifier; when present and non-empty, `buildPiStreamOptions`
// forwards it as `codexAccountId` on the stream options.
accountId?: string;
}

/** Result of resolving a provider credential: a bare API key string or a structured credential. */
export type ResolvedProviderApiKey = string | ResolvedProviderApiCredential;

/**
 * Asynchronously resolves the credential to use for a given provider.
 * A resolved value of `undefined` means the resolver has no credential to offer.
 *
 * NOTE(review): two `resolveApiKey` signatures are listed; the second one's
 * return type is a superset of the first's — confirm whether both are intended.
 */
export interface ProviderApiKeyResolver {
resolveApiKey(provider: ResolvedProvider): Promise<string | undefined>;
resolveApiKey(provider: ResolvedProvider): Promise<ResolvedProviderApiKey | undefined>;
}

export interface PiBridgeOptions {
Expand Down Expand Up @@ -251,13 +260,18 @@ export class PiBridge {
});

try {
const finalMessage = await completeSimple(
model,
context,
await buildPiStreamOptions(this.providerApiKeyResolver, input.model, input.signal, {
const streamOptions = await buildPiStreamOptions(
this.providerApiKeyResolver,
input.model,
input.signal,
{
enableReasoning: true,
}),
},
);
const finalMessage =
model.api === "openai-codex-responses"
? await consumeStreamToCompletion(model, context, streamOptions)
: await completeSimple(model, context, streamOptions);
const contentSummary = summarizeAssistantContent(finalMessage.content);
logger.debug("non-stream llm turn finished", {
modelId: input.model.id,
Expand Down Expand Up @@ -286,27 +300,28 @@ export class PiBridge {
});

try {
const finalMessage = await completeSimple(
model,
{
systemPrompt: input.systemPrompt,
messages: [
{
role: "user" as const,
content: [{ type: "text" as const, text: input.prompt }],
timestamp: Date.now(),
},
],
},
await buildPiStreamOptions(
this.providerApiKeyResolver,
input.model,
input.signal ?? new AbortController().signal,
const context = {
systemPrompt: input.systemPrompt,
messages: [
{
enableReasoning: false,
role: "user" as const,
content: [{ type: "text" as const, text: input.prompt }],
timestamp: Date.now(),
},
),
],
};
const streamOptions = await buildPiStreamOptions(
this.providerApiKeyResolver,
input.model,
input.signal ?? new AbortController().signal,
{
enableReasoning: false,
},
);
const finalMessage =
model.api === "openai-codex-responses"
? await consumeStreamToCompletion(model, context, streamOptions)
: await completeSimple(model, context, streamOptions);

const normalized = normalizeAssistantResult(finalMessage, "complete");
logger.debug("compaction llm call finished", {
Expand Down Expand Up @@ -365,6 +380,20 @@ export class PiAgentModelRunner implements AgentModelRunner, CompactionModelRunn
}
}

/**
 * Runs a request through the streaming adapter and returns only the final
 * assistant message.
 *
 * The Codex streaming adapter is reused purely for converter parity.
 * Non-streaming callers still depend on the AbortSignal they supply, not on
 * LlmStreamWatchdog.
 *
 * @param model - Model to call.
 * @param context - Conversation context for the call.
 * @param options - Stream options passed straight through to the adapter.
 * @returns The message reported by the stream's `result()` once it is drained.
 */
async function consumeStreamToCompletion(
  model: Model<Api>,
  context: Context,
  options: SimpleStreamOptions,
): Promise<AssistantMessage> {
  const adapterStream = streamWithNormalizedUpstreamUsage(model, context, options);

  // Individual events are discarded; the stream is iterated to the end so the
  // adapter can assemble the final AssistantMessage.
  for await (const _ignoredEvent of adapterStream) {
    // Intentionally empty.
  }

  const finalMessage = await adapterStream.result();
  return finalMessage;
}

type LlmStreamWatchdogTimeoutKind = "first_response" | "stream_idle";

class LlmStreamWatchdog {
Expand Down Expand Up @@ -621,6 +650,7 @@ async function buildPiStreamOptions(
sessionId: string;
maxTokens: number;
apiKey?: string;
codexAccountId?: string;
reasoning?: "minimal" | "low" | "medium" | "high" | "xhigh";
serviceTier?: RequestServiceTier;
onPayload?: (
Expand All @@ -633,11 +663,16 @@ async function buildPiStreamOptions(
maxTokens: model.maxOutputTokens,
};
let resolvedApiKey: string | undefined;
let resolvedCodexAccountId: string | undefined;
try {
resolvedApiKey =
(await providerApiKeyResolver?.resolveApiKey(model.provider)) ??
model.provider.apiKey ??
undefined;
const resolvedCredential = await providerApiKeyResolver?.resolveApiKey(model.provider);
if (typeof resolvedCredential === "string") {
resolvedApiKey = resolvedCredential;
} else if (resolvedCredential != null) {
resolvedApiKey = resolvedCredential.apiKey;
resolvedCodexAccountId = resolvedCredential.accountId;
}
resolvedApiKey ??= model.provider.apiKey ?? undefined;
} catch (error) {
throw enrichCodexFetchFailure(error, model, "auth");
}
Expand All @@ -647,6 +682,9 @@ async function buildPiStreamOptions(
apiKey,
});
}
if (resolvedCodexAccountId != null && resolvedCodexAccountId.length > 0) {
options.codexAccountId = resolvedCodexAccountId;
}

if (input.enableReasoning && model.reasoning?.enabled) {
options.reasoning = model.reasoning.effort ?? DEFAULT_REASONING_LEVEL;
Expand All @@ -655,7 +693,9 @@ async function buildPiStreamOptions(
const serviceTier = resolveRequestServiceTier(model.serviceTier);
if (serviceTier != null && shouldApplyOpenAIServiceTier(model)) {
options.serviceTier = serviceTier;
options.onPayload = createServiceTierPayloadPatch(serviceTier);
if (shouldInjectServiceTierViaPayloadHook(model)) {
options.onPayload = createServiceTierPayloadPatch(serviceTier);
}
}

return options;
Expand All @@ -671,8 +711,8 @@ function resolveRequestServiceTier(
}

function createServiceTierPayloadPatch(serviceTier: RequestServiceTier) {
// pi-ai calls `onPayload` with the live request payload before JSON serialization.
// Mutating in place is intentional here and covered by the final-fetch bridge tests.
// Used for pi-ai streamSimple adapters that do not read Pokoclaw's
// `options.serviceTier` extension. Custom adapters set service_tier directly.
return (payload: unknown): unknown | undefined => {
if (!isPlainObjectRecord(payload) || payload.service_tier !== undefined) {
return undefined;
Expand All @@ -682,6 +722,10 @@ function createServiceTierPayloadPatch(serviceTier: RequestServiceTier) {
};
}

/**
 * Decides whether the service tier should be injected through the `onPayload`
 * hook for this model.
 *
 * @param model - Resolved model whose pi-ai API kind is inspected.
 * @returns `true` only when `resolvePiApi(model)` is `"openai-responses"`.
 */
function shouldInjectServiceTierViaPayloadHook(model: ResolvedModel): boolean {
  const piApi = resolvePiApi(model);
  return piApi === "openai-responses";
}

/**
 * Type guard for non-null, non-array objects.
 *
 * @param value - Any value.
 * @returns `true` when `value` has `typeof "object"`, is not `null`, and is
 *   not an array; `false` for primitives, functions, `null`, and arrays.
 */
function isPlainObjectRecord(value: unknown): value is Record<string, unknown> {
  if (typeof value !== "object" || value === null) {
    return false;
  }

  return !Array.isArray(value);
}
Expand Down
12 changes: 9 additions & 3 deletions src/agent/llm/providers/codex/resolver.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { refreshOpenAICodexToken } from "@mariozechner/pi-ai/oauth";
import type { ResolvedProvider } from "@/src/agent/llm/models.js";
import type { ProviderApiKeyResolver } from "@/src/agent/llm/pi-bridge.js";
import type { ProviderApiKeyResolver, ResolvedProviderApiKey } from "@/src/agent/llm/pi-bridge.js";
import { withFileLock } from "@/src/shared/file-lock.js";
import { CODEX_CREDENTIALS_PATH } from "@/src/shared/paths.js";
import {
Expand All @@ -14,7 +14,7 @@ import { readStoredCodexCredential, writeStoredCodexCredential } from "./store.j
// This prevents accidentally forwarding ChatGPT/Codex bearer tokens to arbitrary
// third-party endpoints via a misconfigured provider.
export class CodexProviderApiKeyResolver implements ProviderApiKeyResolver {
async resolveApiKey(provider: ResolvedProvider): Promise<string | undefined> {
async resolveApiKey(provider: ResolvedProvider): Promise<ResolvedProviderApiKey | undefined> {
if (provider.authSource !== "codex-local") {
return provider.apiKey;
}
Expand All @@ -25,7 +25,13 @@ export class CodexProviderApiKeyResolver implements ProviderApiKeyResolver {
}

const credential = await resolveCodexCredential();
return credential?.accessToken;
if (credential == null) {
return undefined;
}
return {
apiKey: credential.accessToken,
...(credential.accountId == null ? {} : { accountId: credential.accountId }),
};
}
}

Expand Down
Loading
Loading