Merged
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -23,6 +23,7 @@ the first consumer-visible behaviour change and will drive the next SDK version
- **Automation-detector wording**: the anti-tamper flag now reads "Anti-tamper signals" (not "Automation detected") when the combined confidence is weak — e.g. devtools open in a dev environment — so a human isn't labelled a bot. The machine-readable `code` (`automation_suspected`) is unchanged; the reason text also drops the `tamper.` prefix for readability.

### Internal
- **Retention sweeper** ([ADR-0004](docs/adr/0004-consent-and-data-lifecycle.md)): a daily BullMQ repeatable job in the worker deletes identities (and, by cascade, their snapshots/drifts/risk assessments/cluster merges/account links) whose `last_seen` is older than their project's `retention_days`; projects with a null or non-positive `retention_days` keep data indefinitely. `sweepRetention()` is idempotent and integration-tested against a real DB.
- **Data-subject endpoints** ([ADR-0004](docs/adr/0004-consent-and-data-lifecycle.md)): `DELETE /v1/identity/:id` (GDPR Art. 17 — erases the identity; snapshots/drifts/risk assessments/account links cascade) and `GET /v1/identity/:id/export` (Art. 20 — the full bundle, including each snapshot's consent provenance). Erasure is strictly key-gated (a non-GET never reaches the route via an admin session); export is readable by key or admin session like the other reads.
- **Consent provenance + IP minimization on the server** ([ADR-0004](docs/adr/0004-consent-and-data-lifecycle.md), migration 012): snapshots now record the lawful basis, consent version, and grant time the SDK forwards (falling back to a per-project `lawful_basis_default`). The stored `client_ip` is **network-truncated by default** (`/24` IPv4, `/48` IPv6 via `minimizeIp`) — still city-accurate for impossible-travel, while the full IP is used only transiently in the request path (so the anonymizer detector is unaffected). A per-project `store_full_ip` flag keeps the full address for operators with a documented basis. New `projects.retention_days` column lands here for the upcoming retention sweeper.
- **Demo recognises a new device on re-observe**: the demo showed the synchronous `POST /v1/resolve` result, then committed via `sdk.flush()` (async ingest). `flush()` returns on enqueue, not on commit, so a first-sight device read as "new" on every click. The demo now polls resolve (bounded) until the commit lands before re-enabling Observe, and the status spells out the eventual-consistency step — so the next click resolves against committed history and recognises the device.
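The network truncation described in the IP minimization entry above (`/24` for IPv4, `/48` for IPv6) can be illustrated with a small, self-contained sketch. This is not the project's `minimizeIp`; the name `minimizeIpSketch`, the `null` return for unparseable input, and the lack of support for IPv4-mapped IPv6 addresses are all assumptions made for illustration.

```typescript
// Hypothetical sketch of network truncation before storage.
// NOT the project's minimizeIp: names and edge-case behaviour are assumptions.

// Keep the first three octets (/24) and zero the host octet.
function truncateIpv4(ip: string): string | null {
  const parts = ip.split('.');
  if (parts.length !== 4) return null;
  const octets = parts.map((p) => (/^\d{1,3}$/.test(p) ? Number(p) : NaN));
  if (octets.some((o) => Number.isNaN(o) || o > 255)) return null;
  return `${octets[0]}.${octets[1]}.${octets[2]}.0`;
}

// Expand an IPv6 address (with at most one "::") into its 8 hex groups.
// IPv4-mapped forms such as ::ffff:1.2.3.4 are not handled in this sketch.
function expandIpv6(ip: string): string[] | null {
  const halves = ip.split('::');
  if (halves.length > 2) return null;
  const head = halves[0] ? halves[0].split(':') : [];
  const tail = halves.length === 2 && halves[1] ? halves[1].split(':') : [];
  const missing = 8 - head.length - tail.length;
  if (halves.length === 1 ? missing !== 0 : missing < 1) return null;
  const groups = [...head, ...new Array<string>(missing).fill('0'), ...tail];
  return groups.every((g) => /^[0-9a-fA-F]{1,4}$/.test(g)) ? groups : null;
}

// Keep the first three 16-bit groups (/48) and drop the rest.
function truncateIpv6(ip: string): string | null {
  const groups = expandIpv6(ip);
  if (groups === null) return null;
  const prefix = groups.slice(0, 3).map((g) => parseInt(g, 16).toString(16));
  return `${prefix.join(':')}::`;
}

// Dispatch on address family; returns null when the input does not parse.
function minimizeIpSketch(ip: string): string | null {
  return ip.includes(':') ? truncateIpv6(ip) : truncateIpv4(ip);
}
```

For example, `minimizeIpSketch('203.0.113.77')` yields `203.0.113.0`, so every host in the same `/24` is stored as the same value.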
68 changes: 68 additions & 0 deletions packages/server/src/pipeline/retention.integration.test.ts
@@ -0,0 +1,68 @@
import { describe, it, expect, beforeAll, afterAll } from 'vitest';
import { migrate } from '../db/migrate.js';
import { db } from '../db/client.js';
import { sweepRetention } from './retention.js';
import { hashApiKey } from '../middleware/api-key.js';

// Gated on DATABASE_URL like the other integration suites (runs in CI, skips locally
// without a DB). See events.integration.test.ts for the rationale.
const hasDb = Boolean(process.env['DATABASE_URL']);
const API_KEY = 'retention-test-key';

let projectId: string; // retention_days = 30

async function seedIdentity(project: string, ageDays: number): Promise<string> {
  const id = `ret-${ageDays}d-${crypto.randomUUID()}`;
  await db`
    INSERT INTO identities (id, project_id, last_seen)
    VALUES (${id}, ${project}, now() - make_interval(days => ${ageDays}))
  `;
  return id;
}

beforeAll(async () => {
  if (!hasDb) return;
  await migrate();
  await db`DELETE FROM projects WHERE api_key_hash IN (${hashApiKey(API_KEY)}, ${hashApiKey(`${API_KEY}-keep`)})`;
  const [proj] = await db<{ id: string }[]>`
    INSERT INTO projects (api_key_hash, name, retention_days)
    VALUES (${hashApiKey(API_KEY)}, 'Retention Test', 30) RETURNING id
  `;
  projectId = proj!.id;
});

afterAll(async () => {
  if (!hasDb) return;
  await db`DELETE FROM projects WHERE api_key_hash IN (${hashApiKey(API_KEY)}, ${hashApiKey(`${API_KEY}-keep`)})`;
  await db.end();
});

describe.skipIf(!hasDb)('sweepRetention', () => {
  it('deletes identities older than retention_days and keeps recent ones', async () => {
    const stale = await seedIdentity(projectId, 60); // older than 30d → swept
    const fresh = await seedIdentity(projectId, 1); // within 30d → kept

    const result = await sweepRetention(db);
    expect(result.identitiesDeleted).toBeGreaterThanOrEqual(1);

    const rows = await db<{ id: string }[]>`
      SELECT id FROM identities WHERE project_id = ${projectId}
    `;
    const ids = rows.map((r) => r.id);
    expect(ids).toContain(fresh);
    expect(ids).not.toContain(stale);
  });

  it('skips projects with null retention_days (keep forever)', async () => {
    const [keepProj] = await db<{ id: string }[]>`
      INSERT INTO projects (api_key_hash, name)
      VALUES (${hashApiKey(`${API_KEY}-keep`)}, 'Keep Forever') RETURNING id
    `;
    const ancient = await seedIdentity(keepProj!.id, 999);

    await sweepRetention(db);

    const rows = await db<{ id: string }[]>`SELECT id FROM identities WHERE id = ${ancient}`;
    expect(rows.length).toBe(1); // untouched: retention_days is null
  });
});
35 changes: 35 additions & 0 deletions packages/server/src/pipeline/retention.ts
@@ -0,0 +1,35 @@
import type { Sql } from 'postgres';
import { logger } from '../logger.js';

// The repeatable retention sweep runs on its own BullMQ queue (separate from ingest).
export const RETENTION_QUEUE_NAME = 'retention';

// Deletes identities whose most recent activity (last_seen) is older than their
// project's retention_days; snapshots, drifts, risk assessments, cluster merges, and
// account links cascade (ON DELETE CASCADE on identity_id). Projects with a null or
// non-positive retention_days keep data indefinitely and are skipped. Idempotent —
// safe to run repeatedly. Returns per-run counts for logging. (GDPR data-lifecycle,
// ADR-0004.)
export async function sweepRetention(
  sql: Sql,
): Promise<{ projects: number; identitiesDeleted: number }> {
  const projects = await sql<{ id: string; retention_days: number }[]>`
    SELECT id, retention_days
    FROM projects
    WHERE retention_days IS NOT NULL AND retention_days > 0
  `;

  let identitiesDeleted = 0;
  for (const p of projects) {
    const deleted = await sql<{ id: string }[]>`
      DELETE FROM identities
      WHERE project_id = ${p.id}
        AND last_seen < now() - make_interval(days => ${p.retention_days})
      RETURNING id
    `;
    identitiesDeleted += deleted.length;
  }

  logger.info({ projects: projects.length, identitiesDeleted }, 'retention sweep complete');
  return { projects: projects.length, identitiesDeleted };
}
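The selection rule that `sweepRetention` expresses in SQL can be restated as a pure predicate, which may help when reasoning about boundary cases. This is an illustrative sketch only: `isPastRetention` and `MS_PER_DAY` are hypothetical names, and the fixed 24-hour day is an approximation of what `make_interval(days => n)` does in Postgres (which can differ around DST changes, depending on the session time zone).

```typescript
// Hypothetical restatement of the sweep's WHERE clause; not part of the PR.
const MS_PER_DAY = 24 * 60 * 60 * 1000;

function isPastRetention(
  lastSeen: Date,
  retentionDays: number | null,
  now: Date,
): boolean {
  // Null or non-positive retention means "keep indefinitely".
  if (retentionDays === null || retentionDays <= 0) return false;
  // Assumption: a day is exactly 24h, which only approximates Postgres intervals.
  const cutoff = now.getTime() - retentionDays * MS_PER_DAY;
  // Strictly older than the cutoff, mirroring `last_seen < now() - interval`.
  return lastSeen.getTime() < cutoff;
}
```

Because the comparison is strict, an identity last seen exactly `retention_days` ago is kept, and running the check twice against the same data gives the same answer, which is consistent with the idempotency noted in the comment above.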
22 changes: 21 additions & 1 deletion packages/server/src/worker.ts
@@ -1,10 +1,11 @@
import { startTracing } from './tracing.js';
startTracing();

-import { Worker } from 'bullmq';
+import { Queue, Worker } from 'bullmq';
import { createQueueConnection, INGEST_QUEUE_NAME } from './queue/ingest.js';
import type { IngestJobData } from './queue/ingest.js';
import { resolveSnapshot } from './pipeline/resolve.js';
import { sweepRetention, RETENTION_QUEUE_NAME } from './pipeline/retention.js';
import { db } from './db/client.js';
// Imported after ./tracing.js so the OTel pino instrumentation patches pino and
// injects trace_id/span_id into log lines — same ordering rationale as index.ts.
@@ -36,11 +37,30 @@ worker.on('failed', (job, err) => {
  );
});

// Daily retention sweep (GDPR data-lifecycle, ADR-0004). A repeatable job is enqueued
// idempotently (BullMQ dedupes by repeat key, so re-adding on every boot is safe) and
// processed by a dedicated worker that deletes data past each project's retention_days.
const retentionQueue = new Queue(RETENTION_QUEUE_NAME, { connection: createQueueConnection() });
void retentionQueue
  .add('sweep', {}, { repeat: { pattern: '30 3 * * *' }, removeOnComplete: true, removeOnFail: 50 })
  .catch((err) => logger.error({ err }, 'failed to schedule retention sweep'));

const retentionWorker = new Worker(
  RETENTION_QUEUE_NAME,
  async () => sweepRetention(db),
  { connection: createQueueConnection() },
);
retentionWorker.on('failed', (job, err) => {
  logger.error({ jobId: job?.id, err }, 'retention sweep failed');
});

// Drain in-flight jobs and close the DB pool on shutdown so a redeploy doesn't drop
// work mid-resolution.
async function shutdown(): Promise<void> {
  logger.info('scent-worker shutting down');
  await worker.close();
  await retentionWorker.close();
  await retentionQueue.close();
  await db.end();
  process.exit(0);
}
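The repeat pattern `'30 3 * * *'` used above is a cron expression for "minute 30 of hour 3, every day". As a rough illustration of when the next sweep would fire, the sketch below computes the next 03:30 occurrence. `nextDailyRunUtc` is a hypothetical helper, not a BullMQ API, and it assumes the schedule is evaluated in UTC; the time zone BullMQ actually uses depends on the repeat options and the process environment.

```typescript
// Hypothetical helper illustrating a daily cron pattern such as '30 3 * * *'.
// Assumption: the schedule is evaluated in UTC.
function nextDailyRunUtc(now: Date, hour: number, minute: number): Date {
  const next = new Date(
    Date.UTC(now.getUTCFullYear(), now.getUTCMonth(), now.getUTCDate(), hour, minute, 0, 0),
  );
  // If today's slot has already passed (or is exactly now), roll to tomorrow.
  if (next.getTime() <= now.getTime()) {
    next.setUTCDate(next.getUTCDate() + 1);
  }
  return next;
}
```

A worker that boots at 02:00 UTC would therefore see its first sweep 90 minutes later, while one that boots at 04:00 UTC waits until the following day.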