diff --git a/CHANGELOG.md b/CHANGELOG.md index c0a42c2..b91b45a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,7 @@ the first consumer-visible behaviour change and will drive the next SDK version - **Automation-detector wording**: the anti-tamper flag now reads "Anti-tamper signals" (not "Automation detected") when the combined confidence is weak — e.g. devtools open in a dev environment — so a human isn't labelled a bot. The machine-readable `code` (`automation_suspected`) is unchanged; the reason text also drops the `tamper.` prefix for readability. ### Internal +- **Retention sweeper** ([ADR-0004](docs/adr/0004-consent-and-data-lifecycle.md)): a daily BullMQ repeatable job in the worker deletes identities (and, by cascade, their snapshots/drifts/risk/links) whose `last_seen` is older than their project's `retention_days`; projects with a null `retention_days` keep data indefinitely. `sweepRetention()` is idempotent and unit-tested against a real DB. - **Data-subject endpoints** ([ADR-0004](docs/adr/0004-consent-and-data-lifecycle.md)): `DELETE /v1/identity/:id` (GDPR Art. 17 — erases the identity; snapshots/drifts/risk assessments/account links cascade) and `GET /v1/identity/:id/export` (Art. 20 — the full bundle, including each snapshot's consent provenance). Erasure is strictly key-gated (a non-GET never reaches the route via an admin session); export is readable by key or admin session like the other reads. - **Consent provenance + IP minimization on the server** ([ADR-0004](docs/adr/0004-consent-and-data-lifecycle.md), migration 012): snapshots now record the lawful basis, consent version, and grant time the SDK forwards (falling back to a per-project `lawful_basis_default`). The stored `client_ip` is **network-truncated by default** (`/24` IPv4, `/48` IPv6 via `minimizeIp`) — still city-accurate for impossible-travel, while the full IP is used only transiently in the request path (so the anonymizer detector is unaffected). A per-project `store_full_ip` flag keeps the full address for operators with a documented basis. New `projects.retention_days` column lands here for the upcoming retention sweeper. - **Demo recognises a new device on re-observe**: the demo showed the synchronous `POST /v1/resolve` result, then committed via `sdk.flush()` (async ingest). `flush()` returns on enqueue, not on commit, so a first-sight device read as "new" on every click. The demo now polls resolve (bounded) until the commit lands before re-enabling Observe, and the status spells out the eventual-consistency step — so the next click resolves against committed history and recognises the device. diff --git a/packages/server/src/pipeline/retention.integration.test.ts b/packages/server/src/pipeline/retention.integration.test.ts new file mode 100644 index 0000000..6402952 --- /dev/null +++ b/packages/server/src/pipeline/retention.integration.test.ts @@ -0,0 +1,68 @@ +import { describe, it, expect, beforeAll, afterAll } from 'vitest'; +import { migrate } from '../db/migrate.js'; +import { db } from '../db/client.js'; +import { sweepRetention } from './retention.js'; +import { hashApiKey } from '../middleware/api-key.js'; + +// Gated on DATABASE_URL like the other integration suites (runs in CI, skips locally +// without a DB). See events.integration.test.ts for the rationale. +const hasDb = Boolean(process.env['DATABASE_URL']); +const API_KEY = 'retention-test-key'; + +let projectId: string; // retention_days = 30 + +async function seedIdentity(project: string, ageDays: number): Promise { + const id = `ret-${ageDays}d-${crypto.randomUUID()}`; + await db` + INSERT INTO identities (id, project_id, last_seen) + VALUES (${id}, ${project}, now() - make_interval(days => ${ageDays})) + `; + return id; +} + +beforeAll(async () => { + if (!hasDb) return; + await migrate(); + await db`DELETE FROM projects WHERE api_key_hash IN (${hashApiKey(API_KEY)}, ${hashApiKey(`${API_KEY}-keep`)})`; + const [proj] = await db<{ id: string }[]>` + INSERT INTO projects (api_key_hash, name, retention_days) + VALUES (${hashApiKey(API_KEY)}, 'Retention Test', 30) RETURNING id + `; + projectId = proj!.id; +}); + +afterAll(async () => { + if (!hasDb) return; + await db`DELETE FROM projects WHERE api_key_hash IN (${hashApiKey(API_KEY)}, ${hashApiKey(`${API_KEY}-keep`)})`; + await db.end(); +}); + +describe.skipIf(!hasDb)('sweepRetention', () => { + it('deletes identities older than retention_days and keeps recent ones', async () => { + const stale = await seedIdentity(projectId, 60); // older than 30d → swept + const fresh = await seedIdentity(projectId, 1); // within 30d → kept + + const result = await sweepRetention(db); + expect(result.identitiesDeleted).toBeGreaterThanOrEqual(1); + + const rows = await db<{ id: string }[]>` + SELECT id FROM identities WHERE project_id = ${projectId} + `; + const ids = rows.map((r) => r.id); + expect(ids).toContain(fresh); + expect(ids).not.toContain(stale); + }); + + it('skips projects with null retention_days (keep forever)', async () => { + const [keepProj] = await db<{ id: string }[]>` + INSERT INTO projects (api_key_hash, name) + VALUES (${hashApiKey(`${API_KEY}-keep`)}, 'Keep Forever') RETURNING id + `; + const ancient = await seedIdentity(keepProj!.id, 999); + + await sweepRetention(db); + + const rows = await db<{ id: string }[]>`SELECT id FROM identities WHERE id = ${ancient}`; + expect(rows.length).toBe(1); // untouched — retention_days is null + }); +}); diff --git a/packages/server/src/pipeline/retention.ts b/packages/server/src/pipeline/retention.ts new file mode 100644 index 0000000..dee19e2 --- /dev/null +++ b/packages/server/src/pipeline/retention.ts @@ -0,0 +1,35 @@ +import type { Sql } from 'postgres'; +import { logger } from '../logger.js'; + +// The repeatable retention sweep runs on its own BullMQ queue (separate from ingest). +export const RETENTION_QUEUE_NAME = 'retention'; + +// Deletes identities whose most recent activity (last_seen) is older than their +// project's retention_days; snapshots, drifts, risk assessments, cluster merges, and +// account links cascade (ON DELETE CASCADE on identity_id). Projects with a null or +// non-positive retention_days keep data indefinitely and are skipped. Idempotent — +// safe to run repeatedly. Returns per-run counts for logging. (GDPR data-lifecycle, +// ADR-0004.) +export async function sweepRetention( + sql: Sql, +): Promise<{ projects: number; identitiesDeleted: number }> { + const projects = await sql<{ id: string; retention_days: number }[]>` + SELECT id, retention_days + FROM projects + WHERE retention_days IS NOT NULL AND retention_days > 0 + `; + + let identitiesDeleted = 0; + for (const p of projects) { + const deleted = await sql<{ id: string }[]>` + DELETE FROM identities + WHERE project_id = ${p.id} + AND last_seen < now() - make_interval(days => ${p.retention_days}) + RETURNING id + `; + identitiesDeleted += deleted.length; + } + + logger.info({ projects: projects.length, identitiesDeleted }, 'retention sweep complete'); + return { projects: projects.length, identitiesDeleted }; +} diff --git a/packages/server/src/worker.ts b/packages/server/src/worker.ts index 180896b..0319282 100644 --- a/packages/server/src/worker.ts +++ b/packages/server/src/worker.ts @@ -1,10 +1,11 @@ import { startTracing } from './tracing.js'; startTracing(); -import { Worker } from 'bullmq'; +import { Queue, Worker } from 'bullmq'; import { createQueueConnection, INGEST_QUEUE_NAME } from './queue/ingest.js'; import type { IngestJobData } from './queue/ingest.js'; import { resolveSnapshot } from './pipeline/resolve.js'; +import { sweepRetention, RETENTION_QUEUE_NAME } from './pipeline/retention.js'; import { db } from './db/client.js'; // Imported after ./tracing.js so the OTel pino instrumentation patches pino and // injects trace_id/span_id into log lines — same ordering rationale as index.ts. @@ -36,11 +37,30 @@ worker.on('failed', (job, err) => { ); }); +// Daily retention sweep (GDPR data-lifecycle, ADR-0004). A repeatable job is enqueued +// idempotently (BullMQ dedupes by repeat key, so re-adding on every boot is safe) and +// processed by a dedicated worker that deletes data past each project's retention_days. +const retentionQueue = new Queue(RETENTION_QUEUE_NAME, { connection: createQueueConnection() }); +void retentionQueue + .add('sweep', {}, { repeat: { pattern: '30 3 * * *' }, removeOnComplete: true, removeOnFail: 50 }) + .catch((err) => logger.error({ err }, 'failed to schedule retention sweep')); + +const retentionWorker = new Worker( + RETENTION_QUEUE_NAME, + async () => sweepRetention(db), + { connection: createQueueConnection() }, +); +retentionWorker.on('failed', (job, err) => { + logger.error({ jobId: job?.id, err }, 'retention sweep failed'); +}); + // Drain in-flight jobs and close the DB pool on shutdown so a redeploy doesn't drop // work mid-resolution. async function shutdown(): Promise { logger.info('scent-worker shutting down'); await worker.close(); + await retentionWorker.close(); + await retentionQueue.close(); await db.end(); process.exit(0); }