From ebd240489c2b6acea69c129707d991598d99ac61 Mon Sep 17 00:00:00 2001 From: Ismail Pelaseyed Date: Wed, 20 May 2026 13:13:21 +0200 Subject: [PATCH] Retry transient Flue PR scan failures to reduce inconclusive checks. Fixes #16. --- .flue/agents/pr-scan.ts | 58 ++++++++++------- src/services/__tests__/prScanner.test.ts | 55 +++++++++++++++- src/services/prScanner.ts | 82 ++++++++++++++++++++++-- 3 files changed, 165 insertions(+), 30 deletions(-) diff --git a/.flue/agents/pr-scan.ts b/.flue/agents/pr-scan.ts index 989b645..892569b 100644 --- a/.flue/agents/pr-scan.ts +++ b/.flue/agents/pr-scan.ts @@ -63,29 +63,34 @@ type PrScanPayload = { }; export default async function ({ init, payload, env }: FlueContext) { - const envSource = toEnvSource(env); - const sandbox = await createDaytonaSandbox(env); - const workspacePath = await getWorkspacePath(sandbox); - await seedCiCdSkill(sandbox, workspacePath); - - const harness = await init({ - sandbox: daytona(sandbox), - cwd: workspacePath, - model: getAzureKimiModel(envSource), - }); - const session = await harness.session(); - const pr = normalizePayload(payload); - const workflowFiles = pr.files.filter((file) => file.path.startsWith(".github/workflows/")); - - const ciCdFindings = workflowFiles.length - ? await scanCiCdWorkflows(session, workflowFiles) - : []; - - const { data } = await session.prompt(buildPrScanPrompt(pr, ciCdFindings), { - schema: prScanResultSchema, - }); - - return data; + try { + const envSource = toEnvSource(env); + const sandbox = await createDaytonaSandbox(env); + const workspacePath = await getWorkspacePath(sandbox); + await seedCiCdSkill(sandbox, workspacePath); + + const harness = await init({ + sandbox: daytona(sandbox), + cwd: workspacePath, + model: getAzureKimiModel(envSource), + }); + const session = await harness.session(); + const pr = normalizePayload(payload); + const workflowFiles = pr.files.filter((file) => file.path.startsWith(".github/workflows/")); + + const ciCdFindings = workflowFiles.length + ? await scanCiCdWorkflows(session, workflowFiles) + : []; + + const { data } = await session.prompt(buildPrScanPrompt(pr, ciCdFindings), { + schema: prScanResultSchema, + }); + + return data; + } catch (err) { + const message = err instanceof Error ? err.message : "PR scan agent failed"; + throw new Error(`PR scan agent failed: ${message}`, { cause: err }); + } } async function createDaytonaSandbox(env: unknown) { @@ -119,8 +124,13 @@ async function seedCiCdSkill( `mkdir -p ${shellQuote(`${workspacePath}/.agents/skills/ci-cd-security/references`)}`, ); + const skillRoot = path.resolve( + process.cwd(), + ".agents/skills/ci-cd-security", + ); + for (const relativePath of SKILL_FILES) { - const sourcePath = path.resolve(".agents/skills/ci-cd-security", relativePath); + const sourcePath = path.join(skillRoot, relativePath); const targetPath = `${workspacePath}/.agents/skills/ci-cd-security/${relativePath}`; const content = await readFile(sourcePath); await sandbox.fs.uploadFile(Buffer.from(content), targetPath); diff --git a/src/services/__tests__/prScanner.test.ts b/src/services/__tests__/prScanner.test.ts index 8d8b19f..605c6be 100644 --- a/src/services/__tests__/prScanner.test.ts +++ b/src/services/__tests__/prScanner.test.ts @@ -5,6 +5,7 @@ vi.mock("../../lib/logger.js", () => ({ childLogger: () => ({ error: vi.fn(), info: vi.fn(), + warn: vi.fn(), }), })); @@ -233,18 +234,68 @@ describe("scanPrLocally", () => { }); it("returns an inconclusive error result when Flue fails", async () => { + vi.useFakeTimers(); vi.stubGlobal( "fetch", vi .fn() .mockResolvedValueOnce(jsonResponse({})) .mockResolvedValueOnce(jsonResponse([])) - .mockResolvedValueOnce(new Response("Flue agent failed", { status: 500 })), + .mockImplementation(() => new Response("Flue agent failed", { status: 500 })), ); - const result = await scanPrLocally("acme", "repo", 12); + const scanPromise = scanPrLocally("acme", "repo", 12); + await vi.runAllTimersAsync(); + const result = await scanPromise; expect(result).toEqual({ error: "Flue PR scan returned 500: Flue agent failed" }); + expect(vi.mocked(fetch)).toHaveBeenCalledTimes(5); + expect(vi.mocked(fetch).mock.calls[2][0]).toBe( + "http://127.0.0.1:3583/agents/pr-scan/acme-repo-12", + ); + expect(vi.mocked(fetch).mock.calls[3][0]).toBe( + "http://127.0.0.1:3583/agents/pr-scan/acme-repo-12-retry-2", + ); + expect(vi.mocked(fetch).mock.calls[4][0]).toBe( + "http://127.0.0.1:3583/agents/pr-scan/acme-repo-12-retry-3", + ); + vi.useRealTimers(); + }); + + it("retries transient Flue 500 errors before succeeding", async () => { + vi.useFakeTimers(); + const fetchMock = vi + .fn() + .mockResolvedValueOnce(jsonResponse({ title: "Update build", body: "", user: { login: "octocat" } })) + .mockResolvedValueOnce(jsonResponse([])) + .mockResolvedValueOnce(new Response("temporary outage", { status: 500 })) + .mockResolvedValueOnce(jsonResponse({ + findings: [ + { + category: "malicious_intent", + severity: "medium", + title: "Suspicious change", + file: "package.json", + evidence: "Unexpected network call.", + recommendation: "Review the change.", + }, + ], + })); + vi.stubGlobal("fetch", fetchMock); + + const scanPromise = scanPrLocally("acme", "repo", 12); + await vi.runAllTimersAsync(); + const result = await scanPromise; + + expect(result.findings?.[0]?.title).toBe("Suspicious change"); + expect(fetchMock).toHaveBeenCalledTimes(4); + expect(fetchMock.mock.calls[2][0]).toBe( + "http://127.0.0.1:3583/agents/pr-scan/acme-repo-12", + ); + expect(fetchMock.mock.calls[3][0]).toBe( + "http://127.0.0.1:3583/agents/pr-scan/acme-repo-12-retry-2", + ); + vi.useRealTimers(); }); }); diff --git a/src/services/prScanner.ts b/src/services/prScanner.ts index 7223aa6..6291161 100644 --- a/src/services/prScanner.ts +++ b/src/services/prScanner.ts @@ -5,6 +5,9 @@ const GITHUB_PR_FILES_PER_PAGE = 100; const GITHUB_PR_FILES_PAGE_LIMIT = 30; const MAX_PATCH_CHARS_PER_FILE = 8_000; const MAX_PAYLOAD_CHARS = 100_000; +const FLUE_PR_SCAN_MAX_ATTEMPTS = 3; +const FLUE_PR_SCAN_RETRY_BASE_MS = 2_000; +const RETRYABLE_FLUE_STATUSES = new Set([429, 500, 502, 503, 504]); interface GitHubPrFile { filename: string; @@ -185,9 +188,15 @@ function splitPatch(patch: string, maxChars: number): string[] { async function runFluePrScans(payload: PrScanPayload): Promise { const findings: PrFinding[] = []; + const log = childLogger({ + service: "pr-scanner", + owner: payload.owner, + repo: payload.repo, + pr: payload.prNumber, + }); for (const batch of buildPayloadBatches(payload)) { - const result = await runFluePrScan(batch); + const result = await runFluePrScanWithRetry(batch, log); if (result.error) return { error: result.error }; findings.push(...(result.findings ?? [])); } @@ -228,12 +237,45 @@ function buildPayloadBatches(payload: PrScanPayload): PrScanPayload[] { })); } -async function runFluePrScan(payload: PrScanPayload): Promise { +async function runFluePrScanWithRetry( + payload: PrScanPayload, + log: ReturnType, +): Promise { + let lastError: Error | undefined; + + for (let attempt = 1; attempt <= FLUE_PR_SCAN_MAX_ATTEMPTS; attempt++) { + try { + return await requestFluePrScan(payload, attempt); + } catch (err) { + lastError = err instanceof Error ? err : new Error("Flue PR scan failed"); + if (!isRetryableFlueError(err) || attempt === FLUE_PR_SCAN_MAX_ATTEMPTS) { + throw lastError; + } + + const delayMs = FLUE_PR_SCAN_RETRY_BASE_MS * 2 ** (attempt - 1); + log.warn( + { attempt, delayMs, err: lastError.message }, + "Flue PR scan failed with retryable error; retrying", + ); + await sleep(delayMs); + } + } + + throw lastError ?? new Error("Flue PR scan failed"); +} + +async function requestFluePrScan( + payload: PrScanPayload, + attempt = 1, +): Promise { const baseUrl = process.env.FLUE_BASE_URL ?? "http://127.0.0.1:3583"; const batchSuffix = payload.scan && payload.scan.batches > 1 ? `-batch-${payload.scan.batch}-of-${payload.scan.batches}` : ""; - const agentId = encodeURIComponent(`${payload.owner}-${payload.repo}-${payload.prNumber}${batchSuffix}`); + const retrySuffix = attempt > 1 ? `-retry-${attempt}` : ""; + const agentId = encodeURIComponent( + `${payload.owner}-${payload.repo}-${payload.prNumber}${batchSuffix}${retrySuffix}`, + ); const res = await fetch(`${baseUrl}/agents/pr-scan/${agentId}`, { method: "POST", headers: { @@ -245,13 +287,45 @@ async function runFluePrScan(payload: PrScanPayload): Promise { }); if (!res.ok) { - throw new Error(`Flue PR scan returned ${res.status}: ${await res.text()}`); + throw new FluePrScanHttpError( + `Flue PR scan returned ${res.status}: ${await res.text()}`, + res.status, + ); } const data = await res.json(); return normalizePrScanResult(unwrapFlueResult(data)); } +class FluePrScanHttpError extends Error { + constructor( + message: string, + readonly status: number, + ) { + super(message); + this.name = "FluePrScanHttpError"; + } +} + +function isRetryableFlueError(err: unknown): boolean { + if (err instanceof FluePrScanHttpError) { + return RETRYABLE_FLUE_STATUSES.has(err.status); + } + + if (!(err instanceof Error)) return false; + if (err.name === "AbortError" || err.name === "TimeoutError") return true; + + const code = (err as NodeJS.ErrnoException).code; + return code === "ECONNRESET" + || code === "ECONNREFUSED" + || code === "EPIPE" + || code === "ETIMEDOUT"; +} + +function sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); +} + function normalizePrScanResult(result: PrScanResult): PrScanResult { return { findings: (result.findings ?? []).map(normalizeFinding),