Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 125 additions & 0 deletions web/src/routes/observability.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,16 @@
import { Hono } from 'hono';
import type { Env } from '../types.js';
import { getAllProceduresWithDerivedStats, getActiveAgendaItems } from '../kernel/memory/index.js';
import { detectEntropy } from '../kernel/scheduled/entropy.js';
import { buildEdgeEnv } from '../edge-env.js';

const observability = new Hono<{ Bindings: Env }>();

function boundedDays(value: string | undefined, fallback: number, max: number): number {
const days = parseInt(value ?? String(fallback), 10);
return Number.isNaN(days) || days < 1 || days > max ? fallback : days;
}

// ─── Shadow Write Stats ─────────────────────────────────────

observability.get('/api/shadow-stats', async (c) => {
Expand Down Expand Up @@ -46,6 +53,124 @@ observability.get('/api/shadow-read-stats', async (c) => {
return c.json({ days, summary, by_site: bySite.results, recent: recent.results });
});

// ─── Entropy ────────────────────────────────────────────────

observability.get('/api/entropy', async (c) => {
// detectEntropy needs the full EdgeEnv (API keys, model config) — not just c.env.DB
const env = buildEdgeEnv(c.env);
const report = await detectEntropy(env);
return c.json(report);
});

// ─── Shadow Read Drift ──────────────────────────────────────

observability.get('/api/shadow-read-drift', async (c) => {
const days = boundedDays(c.req.query('days'), 7, 30);
const reader = c.req.query('reader');

const latestWhere = reader
? "WHERE reader = ? AND sampled_at > datetime('now', '-' || ? || ' days')"
: "WHERE sampled_at > datetime('now', '-' || ? || ' days')";
// latestBindings: reader-first to match latestWhere (WHERE reader = ? AND sampled_at...)
// windowBindings: days-first to match the WHERE sampled_at... AND reader = ? pattern used in distribution/topDrifters
const latestBindings = reader ? [reader, days] : [days];
const windowBindings = reader ? [days, reader] : [days];

const [distribution, readiness, topDrifters] = await Promise.all([
c.env.DB.prepare(`
WITH ranked AS (
SELECT reader,
ABS((cached_count - pre_tier_count) - derived_count) AS count_abs_drift,
ABS(cached_avg_latency_ms - derived_avg_latency_ms) AS latency_abs_drift,
ABS(cached_avg_cost - derived_avg_cost) AS cost_abs_drift,
ROW_NUMBER() OVER (PARTITION BY reader ORDER BY ABS((cached_count - pre_tier_count) - derived_count)) AS count_rank,
ROW_NUMBER() OVER (PARTITION BY reader ORDER BY ABS(cached_avg_latency_ms - derived_avg_latency_ms)) AS latency_rank,
ROW_NUMBER() OVER (PARTITION BY reader ORDER BY ABS(cached_avg_cost - derived_avg_cost)) AS cost_rank,
COUNT(*) OVER (PARTITION BY reader) AS n
FROM shadow_read_drift
WHERE sampled_at > datetime('now', '-' || ? || ' days')
${reader ? 'AND reader = ?' : ''}
)
SELECT reader,
MAX(n) AS samples,
ROUND(AVG(count_abs_drift), 2) AS avg_abs_count_drift,
ROUND(MAX(count_abs_drift), 2) AS max_abs_count_drift,
ROUND(MAX(CASE WHEN count_rank = MAX(1, (n + 1) / 2) THEN count_abs_drift END), 2) AS p50_count_drift,
ROUND(MAX(CASE WHEN count_rank = MAX(1, (n * 19 + 19) / 20) THEN count_abs_drift END), 2) AS p95_count_drift,
ROUND(MAX(CASE WHEN count_rank = MAX(1, (n * 99 + 99) / 100) THEN count_abs_drift END), 2) AS p99_count_drift,
ROUND(AVG(latency_abs_drift), 2) AS avg_latency_drift_ms,
ROUND(MAX(latency_abs_drift), 2) AS max_latency_drift_ms,
ROUND(MAX(CASE WHEN latency_rank = MAX(1, (n + 1) / 2) THEN latency_abs_drift END), 2) AS p50_latency_drift_ms,
ROUND(MAX(CASE WHEN latency_rank = MAX(1, (n * 19 + 19) / 20) THEN latency_abs_drift END), 2) AS p95_latency_drift_ms,
ROUND(MAX(CASE WHEN latency_rank = MAX(1, (n * 99 + 99) / 100) THEN latency_abs_drift END), 2) AS p99_latency_drift_ms,
ROUND(AVG(cost_abs_drift), 6) AS avg_cost_drift,
ROUND(MAX(cost_abs_drift), 6) AS max_cost_drift,
ROUND(MAX(CASE WHEN cost_rank = MAX(1, (n + 1) / 2) THEN cost_abs_drift END), 6) AS p50_cost_drift,
ROUND(MAX(CASE WHEN cost_rank = MAX(1, (n * 19 + 19) / 20) THEN cost_abs_drift END), 6) AS p95_cost_drift,
ROUND(MAX(CASE WHEN cost_rank = MAX(1, (n * 99 + 99) / 100) THEN cost_abs_drift END), 6) AS p99_cost_drift
FROM ranked
GROUP BY reader
`).bind(...windowBindings).all(),

c.env.DB.prepare(`
WITH latest AS (
SELECT reader, task_pattern, cached_count, cached_success_count,
cached_avg_latency_ms, cached_avg_cost,
derived_count, derived_success_count,
derived_avg_latency_ms, derived_avg_cost,
pre_tier_count,
ROW_NUMBER() OVER (PARTITION BY reader, task_pattern ORDER BY sampled_at DESC) as rn
FROM shadow_read_drift
${latestWhere}
)
SELECT
COUNT(*) as total_pairs,
COUNT(DISTINCT task_pattern) as distinct_procedures,
SUM(CASE WHEN pre_tier_count = 0 THEN 1 ELSE 0 END) as clean_pairs,
SUM(CASE
WHEN pre_tier_count = 0
AND cached_count = derived_count
AND cached_success_count = derived_success_count
AND ABS(cached_avg_latency_ms - derived_avg_latency_ms) < 10
AND ABS(cached_avg_cost - derived_avg_cost) < 0.0001
THEN 1 ELSE 0 END) as ready_pairs
FROM latest WHERE rn = 1
`).bind(...latestBindings).first(),

c.env.DB.prepare(`
WITH latest_per_pattern AS (
SELECT task_pattern, reader,
cached_count, derived_count, pre_tier_count,
cached_avg_latency_ms, derived_avg_latency_ms,
cached_avg_cost, derived_avg_cost,
sampled_at,
ROW_NUMBER() OVER (PARTITION BY task_pattern, reader ORDER BY sampled_at DESC) as rn
FROM shadow_read_drift
WHERE sampled_at > datetime('now', '-' || ? || ' days')
${reader ? 'AND reader = ?' : ''}
)
SELECT task_pattern, reader,
cached_count, derived_count, pre_tier_count,
ABS((cached_count - pre_tier_count) - derived_count) as count_drift,
ROUND(ABS(cached_avg_latency_ms - derived_avg_latency_ms), 1) as latency_drift,
ROUND(ABS(cached_avg_cost - derived_avg_cost), 6) as cost_drift,
sampled_at
FROM latest_per_pattern
WHERE rn = 1
ORDER BY ABS((cached_count - pre_tier_count) - derived_count) DESC
LIMIT 15
`).bind(...windowBindings).all(),
]);

return c.json({
days,
reader_filter: reader ?? null,
distribution: distribution.results,
readiness,
top_drifters: topDrifters.results,
});
});

// ─── Agenda ─────────────────────────────────────────────────

observability.get('/agenda', async (c) => {
Expand Down
107 changes: 107 additions & 0 deletions web/tests/observability.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,19 @@ vi.mock('../src/kernel/memory/index.js', () => ({
getActiveAgendaItems: vi.fn().mockResolvedValue([]),
}));

vi.mock('../src/kernel/scheduled/entropy.js', () => ({
detectEntropy: vi.fn().mockResolvedValue({
score: 0,
ghostAgendaItems: [],
ghostTasks: [],
dormantGoals: [],
summary: 'Entropy 0% - all clear.',
}),
}));

import { observability } from '../src/routes/observability.js';
import { getAllProceduresWithDerivedStats, getActiveAgendaItems } from '../src/kernel/memory/index.js';
import { detectEntropy } from '../src/kernel/scheduled/entropy.js';
import type { Env } from '../src/types.js';

// ─── D1 Mock ──────────────────────────────────────────────────
Expand Down Expand Up @@ -134,6 +145,102 @@ describe('observability routes', () => {
});
});

describe('GET /api/entropy', () => {
it('returns entropy report from detector', async () => {
const report = {
score: 25,
ghostAgendaItems: [{ id: 1, item: 'Stale item', daysSinceCreated: 9 }],
ghostTasks: [],
dormantGoals: [],
summary: 'Entropy 25% - 1 agenda item stale >7d.',
};
vi.mocked(detectEntropy).mockResolvedValueOnce(report as any);

const db = createMockDb();
const app = createApp(db);

const res = await app.request('/api/entropy');
expect(res.status).toBe(200);
expect(await res.json()).toEqual(report);
expect(detectEntropy).toHaveBeenCalledWith(expect.objectContaining({ db }));
});
});

describe('GET /api/shadow-read-drift', () => {
it('returns drift distribution, readiness, and top drifters', async () => {
const distribution = [{
reader: 'observability',
samples: 4,
p95_count_drift: 1,
p95_latency_drift_ms: 5,
p95_cost_drift: 0.00001,
}];
const readiness = {
total_pairs: 2,
distinct_procedures: 2,
clean_pairs: 2,
ready_pairs: 1,
};
const topDrifters = [{
task_pattern: 'dispatch:mid',
reader: 'observability',
count_drift: 1,
latency_drift: 5,
cost_drift: 0.00001,
sampled_at: '2026-04-24 00:00:00',
}];
const db = createMockDb({
firstResults: [readiness],
allResults: [distribution, topDrifters],
});
const app = createApp(db);

const res = await app.request('/api/shadow-read-drift?days=14&reader=observability');
expect(res.status).toBe(200);
const json = await res.json() as any;
expect(json).toEqual({
days: 14,
reader_filter: 'observability',
distribution,
readiness,
top_drifters: topDrifters,
});
expect(db._queries[0].bindings).toEqual([14, 'observability']);
expect(db._queries[1].bindings).toEqual(['observability', 14]);
expect(db._queries[2].bindings).toEqual([14, 'observability']);
});

it('defaults invalid days and omits reader filter', async () => {
const db = createMockDb({
firstResults: [{ total_pairs: 0, distinct_procedures: 0, clean_pairs: 0, ready_pairs: 0 }],
allResults: [[], []],
});
const app = createApp(db);

const res = await app.request('/api/shadow-read-drift?days=999');
expect(res.status).toBe(200);
const json = await res.json() as any;
expect(json.days).toBe(7);
expect(json.reader_filter).toBeNull();
expect(db._queries[0].bindings).toEqual([7]);
expect(db._queries[1].bindings).toEqual([7]);
expect(db._queries[2].bindings).toEqual([7]);
});

it('defaults days=0 to fallback', async () => {
const db = createMockDb({
firstResults: [{ total_pairs: 0, distinct_procedures: 0, clean_pairs: 0, ready_pairs: 0 }],
allResults: [[], []],
});
const app = createApp(db);

const res = await app.request('/api/shadow-read-drift?days=0');
expect(res.status).toBe(200);
const json = await res.json() as any;
expect(json.days).toBe(7);
});
});

describe('GET /agenda', () => {
it('returns active agenda items', async () => {
const items = [
Expand Down
Loading