Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 85 additions & 0 deletions src/functions/compress.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import { validateOutput } from "../eval/validator.js";
import { scoreCompression } from "../eval/quality.js";
import { compressWithRetry } from "../eval/self-correct.js";
import type { MetricsStore } from "../eval/metrics-store.js";
import { buildSyntheticCompression } from "./compress-synthetic.js";
import { logger } from "../logger.js";

const VALID_TYPES = new Set<string>([
Expand Down Expand Up @@ -78,6 +79,90 @@ export function registerCompressFunction(
}) => {
const startMs = Date.now();

if (provider.name === "noop") {
logger.info("Compression skipped (noop provider) — generating synthetic compression", {
obsId: data.observationId,
});
const synthetic = buildSyntheticCompression(data.raw);
synthetic.id = data.observationId;
synthetic.sessionId = data.sessionId;
if (data.raw.timestamp) {
synthetic.timestamp = data.raw.timestamp;
}

await kv.set(
KV.observations(data.sessionId),
data.observationId,
synthetic,
);

try {
getSearchIndex().add(synthetic);
} catch (err) {
logger.warn("Failed to index compressed observation into BM25", {
obsId: synthetic.id,
sessionId: synthetic.sessionId,
title: synthetic.title,
error: err instanceof Error ? err.message : String(err),
});
}

await vectorIndexAddGuarded(
synthetic.id,
synthetic.sessionId,
synthetic.title + " " + (synthetic.narrative || ""),
{ kind: "observation", logId: synthetic.id },
);

const streamResults = await Promise.allSettled([
sdk.trigger({
function_id: "stream::set",
payload: {
stream_name: STREAM.name,
group_id: STREAM.group(data.sessionId),
item_id: data.observationId,
data: { type: "compressed", observation: synthetic },
},
}),
sdk.trigger({
function_id: "stream::send",
payload: {
stream_name: STREAM.name,
group_id: STREAM.viewerGroup,
id: `compressed-${data.observationId}`,
type: "compressed_observation",
data: {
type: "compressed",
observation: synthetic,
sessionId: data.sessionId,
},
},
action: TriggerAction.Void(),
}),
]);

for (const res of streamResults) {
if (res.status === "rejected") {
logger.warn("Non-fatal stream publish failure after compress", {
sessionId: data.sessionId,
observationId: data.observationId,
error:
res.reason instanceof Error
? res.reason.message
: String(res.reason),
});
}
}

logger.info("Observation compressed (synthetic noop fallback)", {
obsId: data.observationId,
type: synthetic.type,
importance: synthetic.importance,
});

return { success: true, compressed: synthetic, qualityScore: synthetic.confidence * 100 };
Comment on lines +93 to +163

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Add audit logging for noop fallback mutations.

Line 93 persists data and Lines 117-142 publish stream events, but this branch never calls recordAudit(). Please record an audit entry for this state-changing path before returning success.

As per coding guidelines: Use recordAudit() for state-changing operations.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/functions/compress.ts` around lines 93 - 163, The noop-fallback branch
persists data and publishes streams but never records an audit entry; add a call
to recordAudit(...) before returning success to satisfy the "state-changing
operations" guideline. Specifically, after the streamResults loop (and before
logger.info and the return) invoke recordAudit with the session id
(data.sessionId), the observation id (data.observationId or synthetic.id), an
action string like "compress:no-op-fallback" and relevant metadata (e.g., type:
synthetic.type, importance: synthetic.importance, compressed: true, confidence:
synthetic.confidence); await the call and handle/log any non-fatal errors
similarly to other async ops so failures don’t block the main flow.

}

let imageDescription: string | undefined;
const hasImage = data.raw.modality === "image" || data.raw.modality === "mixed";

Expand Down
61 changes: 59 additions & 2 deletions test/auto-compress.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,10 @@ describe("mem::observe auto-compress gate (#138)", () => {
// test that sets the env var can be undermined by cached module
// state from an earlier test (and vice versa).
vi.resetModules();
delete process.env["AGENTMEMORY_AUTO_COMPRESS"];
process.env["AGENTMEMORY_AUTO_COMPRESS"] = "false";
});
afterEach(() => {
delete process.env["AGENTMEMORY_AUTO_COMPRESS"];
process.env["AGENTMEMORY_AUTO_COMPRESS"] = "false";
});

it("default (AGENTMEMORY_AUTO_COMPRESS unset): does NOT fire mem::compress", async () => {
Expand Down Expand Up @@ -244,3 +244,60 @@ describe("buildSyntheticCompression", () => {
expect(synth.type).toBe("error");
});
});

describe("mem::compress noop provider graceful fallback", () => {
it("gracefully falls back to synthetic compression when provider is noop", async () => {
const { registerCompressFunction } = await import(
"../src/functions/compress.js"
);
const sdk = mockSdk();
const kv = mockKV();
const mockProvider = {
name: "noop",
compress: vi.fn().mockResolvedValue(""),
summarize: vi.fn().mockResolvedValue(""),
};
const mockMetricsStore = {
record: vi.fn(),
};

registerCompressFunction(
sdk as any,
kv as any,
mockProvider as any,
mockMetricsStore as any
);

const rawObs: RawObservation = {
id: "obs_test_noop",
sessionId: "ses_test_noop",
timestamp: new Date().toISOString(),
hookType: "post_tool_use",
toolName: "Read",
toolInput: { file_path: "src/noop-test.ts" },
toolOutput: "test contents",
raw: {},
};

const compressFn = sdk.fns.get("mem::compress");
expect(compressFn).toBeDefined();

const result = await compressFn({
observationId: "obs_test_noop",
sessionId: "ses_test_noop",
raw: rawObs,
});

expect(result.success).toBe(true);
expect(result.compressed).toBeDefined();
expect(result.compressed.type).toBe("file_read");
expect(result.compressed.title).toBe("Read");
expect(result.compressed.files).toContain("src/noop-test.ts");

expect(mockMetricsStore.record).not.toHaveBeenCalled();

const stored = await kv.get("mem:obs:ses_test_noop", "obs_test_noop");
expect(stored).toBeDefined();
expect((stored as any).type).toBe("file_read");
});
});