Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 34 additions & 24 deletions .flue/agents/pr-scan.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,29 +63,34 @@ type PrScanPayload = {
};

export default async function ({ init, payload, env }: FlueContext) {
const envSource = toEnvSource(env);
const sandbox = await createDaytonaSandbox(env);
const workspacePath = await getWorkspacePath(sandbox);
await seedCiCdSkill(sandbox, workspacePath);

const harness = await init({
sandbox: daytona(sandbox),
cwd: workspacePath,
model: getAzureKimiModel(envSource),
});
const session = await harness.session();
const pr = normalizePayload(payload);
const workflowFiles = pr.files.filter((file) => file.path.startsWith(".github/workflows/"));

const ciCdFindings = workflowFiles.length
? await scanCiCdWorkflows(session, workflowFiles)
: [];

const { data } = await session.prompt(buildPrScanPrompt(pr, ciCdFindings), {
schema: prScanResultSchema,
});

return data;
try {
const envSource = toEnvSource(env);
const sandbox = await createDaytonaSandbox(env);
const workspacePath = await getWorkspacePath(sandbox);
await seedCiCdSkill(sandbox, workspacePath);

const harness = await init({
sandbox: daytona(sandbox),
cwd: workspacePath,
model: getAzureKimiModel(envSource),
});
const session = await harness.session();
const pr = normalizePayload(payload);
const workflowFiles = pr.files.filter((file) => file.path.startsWith(".github/workflows/"));

const ciCdFindings = workflowFiles.length
? await scanCiCdWorkflows(session, workflowFiles)
: [];

const { data } = await session.prompt(buildPrScanPrompt(pr, ciCdFindings), {
schema: prScanResultSchema,
});

return data;
} catch (err) {
const message = err instanceof Error ? err.message : "PR scan agent failed";
throw new Error(`PR scan agent failed: ${message}`, { cause: err });
}
}

async function createDaytonaSandbox(env: unknown) {
Expand Down Expand Up @@ -119,8 +124,13 @@ async function seedCiCdSkill(
`mkdir -p ${shellQuote(`${workspacePath}/.agents/skills/ci-cd-security/references`)}`,
);

const skillRoot = path.resolve(
process.cwd(),
".agents/skills/ci-cd-security",
);

for (const relativePath of SKILL_FILES) {
const sourcePath = path.resolve(".agents/skills/ci-cd-security", relativePath);
const sourcePath = path.join(skillRoot, relativePath);
const targetPath = `${workspacePath}/.agents/skills/ci-cd-security/${relativePath}`;
const content = await readFile(sourcePath);
await sandbox.fs.uploadFile(Buffer.from(content), targetPath);
Expand Down
55 changes: 53 additions & 2 deletions src/services/__tests__/prScanner.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ vi.mock("../../lib/logger.js", () => ({
childLogger: () => ({
error: vi.fn(),
info: vi.fn(),
warn: vi.fn(),
}),
}));

Expand Down Expand Up @@ -233,18 +234,68 @@ describe("scanPrLocally", () => {
});

it("returns an inconclusive error result when Flue fails", async () => {
vi.useFakeTimers();
vi.stubGlobal(
"fetch",
vi
.fn()
.mockResolvedValueOnce(jsonResponse({}))
.mockResolvedValueOnce(jsonResponse([]))
.mockResolvedValueOnce(new Response("Flue agent failed", { status: 500 })),
.mockImplementation(() => new Response("Flue agent failed", { status: 500 })),
);

const result = await scanPrLocally("acme", "repo", 12);
const scanPromise = scanPrLocally("acme", "repo", 12);
await vi.runAllTimersAsync();
const result = await scanPromise;

expect(result).toEqual({ error: "Flue PR scan returned 500: Flue agent failed" });
expect(vi.mocked(fetch)).toHaveBeenCalledTimes(5);
expect(vi.mocked(fetch).mock.calls[2][0]).toBe(
"http://127.0.0.1:3583/agents/pr-scan/acme-repo-12",
);
expect(vi.mocked(fetch).mock.calls[3][0]).toBe(
"http://127.0.0.1:3583/agents/pr-scan/acme-repo-12-retry-2",
);
expect(vi.mocked(fetch).mock.calls[4][0]).toBe(
"http://127.0.0.1:3583/agents/pr-scan/acme-repo-12-retry-3",
);
vi.useRealTimers();
});

it("retries transient Flue 500 errors before succeeding", async () => {
  // Fake timers let the retry backoff elapse instantly instead of really waiting.
  vi.useFakeTimers();

  try {
    // Responses are consumed in call order. The first two feed the lookups that
    // happen before the Flue request (presumably PR metadata and the PR file
    // list — confirm against scanPrLocally); the third is the transient Flue
    // failure and the fourth is the successful Flue retry.
    const fetchMock = vi
      .fn()
      .mockResolvedValueOnce(jsonResponse({ title: "Update build", body: "", user: { login: "octocat" } }))
      .mockResolvedValueOnce(jsonResponse([]))
      .mockResolvedValueOnce(new Response("temporary outage", { status: 500 }))
      .mockResolvedValueOnce(jsonResponse({
        findings: [
          {
            category: "malicious_intent",
            severity: "medium",
            title: "Suspicious change",
            file: "package.json",
            evidence: "Unexpected network call.",
            recommendation: "Review the change.",
          },
        ],
      }));
    vi.stubGlobal("fetch", fetchMock);

    // Start the scan first, then flush the pending backoff timer so the retry runs.
    const scanPromise = scanPrLocally("acme", "repo", 12);
    await vi.runAllTimersAsync();
    const result = await scanPromise;

    expect(result.findings?.[0]?.title).toBe("Suspicious change");
    expect(fetchMock).toHaveBeenCalledTimes(4);
    // The first Flue attempt uses the plain agent id ...
    expect(fetchMock.mock.calls[2][0]).toBe(
      "http://127.0.0.1:3583/agents/pr-scan/acme-repo-12",
    );
    // ... and the retry uses an id suffixed with the attempt number.
    expect(fetchMock.mock.calls[3][0]).toBe(
      "http://127.0.0.1:3583/agents/pr-scan/acme-repo-12-retry-2",
    );
  } finally {
    // Restore real timers even when an assertion above throws, so a failure
    // here cannot leave fake timers installed for the tests that follow.
    vi.useRealTimers();
  }
});
});

Expand Down
82 changes: 78 additions & 4 deletions src/services/prScanner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ const GITHUB_PR_FILES_PER_PAGE = 100;
const GITHUB_PR_FILES_PAGE_LIMIT = 30;
const MAX_PATCH_CHARS_PER_FILE = 8_000;
const MAX_PAYLOAD_CHARS = 100_000;
// Upper bound on Flue scan requests per batch, counting the initial attempt.
const FLUE_PR_SCAN_MAX_ATTEMPTS = 3;
// Backoff before the first retry, in milliseconds; the delay doubles after
// each further failed attempt (2s, then 4s with the current attempt limit).
const FLUE_PR_SCAN_RETRY_BASE_MS = 2_000;
// HTTP statuses returned by Flue that are treated as transient and retried.
const RETRYABLE_FLUE_STATUSES = new Set([429, 500, 502, 503, 504]);

interface GitHubPrFile {
filename: string;
Expand Down Expand Up @@ -185,9 +188,15 @@ function splitPatch(patch: string, maxChars: number): string[] {

async function runFluePrScans(payload: PrScanPayload): Promise<PrScanResult> {
const findings: PrFinding[] = [];
const log = childLogger({
service: "pr-scanner",
owner: payload.owner,
repo: payload.repo,
pr: payload.prNumber,
});

for (const batch of buildPayloadBatches(payload)) {
const result = await runFluePrScan(batch);
const result = await runFluePrScanWithRetry(batch, log);
if (result.error) return { error: result.error };
findings.push(...(result.findings ?? []));
}
Expand Down Expand Up @@ -228,12 +237,45 @@ function buildPayloadBatches(payload: PrScanPayload): PrScanPayload[] {
}));
}

async function runFluePrScan(payload: PrScanPayload): Promise<PrScanResult> {
/**
 * Sends one scan batch to Flue, retrying retryable failures with exponential
 * backoff.
 *
 * @param payload - The batch to scan.
 * @param log - Logger used to record each retry decision.
 * @returns The scan result from the first attempt that succeeds.
 * @throws The last error seen, once it is not retryable or the attempt limit
 *   is reached. Non-`Error` throwables are replaced by a generic `Error`.
 */
async function runFluePrScanWithRetry(
  payload: PrScanPayload,
  log: ReturnType<typeof childLogger>,
): Promise<PrScanResult> {
  let failure: Error | undefined;
  let attempt = 1;

  while (attempt <= FLUE_PR_SCAN_MAX_ATTEMPTS) {
    try {
      return await requestFluePrScan(payload, attempt);
    } catch (caught) {
      if (caught instanceof Error) {
        failure = caught;
      } else {
        failure = new Error("Flue PR scan failed");
      }

      const retryable = isRetryableFlueError(caught);
      const outOfAttempts = attempt === FLUE_PR_SCAN_MAX_ATTEMPTS;
      if (!retryable || outOfAttempts) {
        throw failure;
      }

      // Backoff doubles with every failed attempt: base, 2x base, 4x base, ...
      const backoffMs = FLUE_PR_SCAN_RETRY_BASE_MS * Math.pow(2, attempt - 1);
      log.warn(
        { attempt, delayMs: backoffMs, err: failure.message },
        "Flue PR scan failed with retryable error; retrying",
      );
      await sleep(backoffMs);
    }

    attempt += 1;
  }

  // Only reachable when the attempt limit is configured below 1.
  throw failure ?? new Error("Flue PR scan failed");
}

async function requestFluePrScan(
payload: PrScanPayload,
attempt = 1,
): Promise<PrScanResult> {
const baseUrl = process.env.FLUE_BASE_URL ?? "http://127.0.0.1:3583";
const batchSuffix = payload.scan && payload.scan.batches > 1
? `-batch-${payload.scan.batch}-of-${payload.scan.batches}`
: "";
const agentId = encodeURIComponent(`${payload.owner}-${payload.repo}-${payload.prNumber}${batchSuffix}`);
const retrySuffix = attempt > 1 ? `-retry-${attempt}` : "";
const agentId = encodeURIComponent(
`${payload.owner}-${payload.repo}-${payload.prNumber}${batchSuffix}${retrySuffix}`,
);
const res = await fetch(`${baseUrl}/agents/pr-scan/${agentId}`, {
method: "POST",
headers: {
Expand All @@ -245,13 +287,45 @@ async function runFluePrScan(payload: PrScanPayload): Promise<PrScanResult> {
});

if (!res.ok) {
throw new Error(`Flue PR scan returned ${res.status}: ${await res.text()}`);
throw new FluePrScanHttpError(
`Flue PR scan returned ${res.status}: ${await res.text()}`,
res.status,
);
}

const data = await res.json();
return normalizePrScanResult(unwrapFlueResult(data));
}

/**
 * Error raised when Flue answers a PR scan request with a non-OK HTTP status.
 * The status code is kept on the instance so callers can decide whether the
 * failure is worth retrying.
 */
class FluePrScanHttpError extends Error {
  /** HTTP status code of the failed Flue response. */
  readonly status: number;

  constructor(message: string, status: number) {
    super(message);
    this.status = status;
    this.name = "FluePrScanHttpError";
  }
}

/**
 * Decides whether a failed Flue request is worth retrying.
 *
 * HTTP failures are retryable only when their status is in
 * `RETRYABLE_FLUE_STATUSES`. Any other error is retryable when it, or any
 * error in its `cause` chain, is an abort/timeout error or carries a
 * connection-level system error code.
 *
 * The `cause` chain matters because Node's `fetch` rejects with a generic
 * `TypeError: fetch failed` and attaches the underlying system error (the one
 * holding `code`, e.g. `ECONNREFUSED`) as `cause`; checking only the top-level
 * error would never match those failures.
 *
 * @param err - The value caught from a failed scan request.
 * @returns `true` when the request should be retried, otherwise `false`.
 */
function isRetryableFlueError(err: unknown): boolean {
  if (err instanceof FluePrScanHttpError) {
    return RETRYABLE_FLUE_STATUSES.has(err.status);
  }

  // Guards against a cyclic `cause` chain looping forever.
  const visited = new Set<unknown>();
  let current: unknown = err;

  while (current instanceof Error && !visited.has(current)) {
    visited.add(current);

    if (current.name === "AbortError" || current.name === "TimeoutError") {
      return true;
    }

    const { code, cause } = current as { code?: unknown; cause?: unknown };
    if (
      code === "ECONNRESET"
      || code === "ECONNREFUSED"
      || code === "EPIPE"
      || code === "ETIMEDOUT"
    ) {
      return true;
    }

    current = cause;
  }

  return false;
}

/**
 * Returns a promise that resolves after the given delay.
 *
 * @param ms - Delay in milliseconds, passed straight to `setTimeout`.
 */
function sleep(ms: number): Promise<void> {
  return new Promise<void>((done) => {
    setTimeout(done, ms);
  });
}

function normalizePrScanResult(result: PrScanResult): PrScanResult {
return {
findings: (result.findings ?? []).map(normalizeFinding),
Expand Down
Loading