diff --git a/apps/api/package.json b/apps/api/package.json index 3c54f470..c24a7e1f 100644 --- a/apps/api/package.json +++ b/apps/api/package.json @@ -39,6 +39,7 @@ "seed:demo": "ts-node -r tsconfig-paths/register scripts/seed-demo-data.ts" }, "dependencies": { + "@betterdb/iovalkey-capture": "workspace:*", "@betterdb/shared": "workspace:*", "@fastify/static": "^9.1.1", "@fastify/swagger": "^9.7.0", diff --git a/apps/api/src/app.module.ts b/apps/api/src/app.module.ts index 3f5f02dd..8862d9f8 100644 --- a/apps/api/src/app.module.ts +++ b/apps/api/src/app.module.ts @@ -25,6 +25,7 @@ import { CliModule } from './cli/cli.module'; import { PosthogProxyModule } from './posthog-proxy/posthog-proxy.module'; import { SystemModule } from './system/system.module'; import { MonitorModule } from './monitor/monitor.module'; +import { CommandCaptureModule } from './command-capture/command-capture.module'; let AiModule: any = null; let LicenseModule: any = null; @@ -151,6 +152,7 @@ const baseImports = [ PosthogProxyModule, SystemModule, MonitorModule, + CommandCaptureModule, ]; const proprietaryImports = [ diff --git a/apps/api/src/command-capture/__tests__/command-capture.service.spec.ts b/apps/api/src/command-capture/__tests__/command-capture.service.spec.ts new file mode 100644 index 00000000..5b7de198 --- /dev/null +++ b/apps/api/src/command-capture/__tests__/command-capture.service.spec.ts @@ -0,0 +1,197 @@ +import { CommandCaptureService } from '../command-capture.service'; +import { MemoryAdapter } from '../../storage/adapters/memory.adapter'; + +describe('CommandCaptureService', () => { + let service: CommandCaptureService; + let storage: MemoryAdapter; + + beforeEach(async () => { + storage = new MemoryAdapter(); + await storage.initialize(); + service = new CommandCaptureService(storage as any); + }); + + describe('startSession', () => { + it('creates an active session', async () => { + const session = await service.startSession({ + connectionId: 'conn-1', + durationMs: 60_000, + createdBy: 'test-user', + }); + + expect(session.id).toBeDefined(); + expect(session.connectionId).toBe('conn-1'); + expect(session.status).toBe('active'); + expect(session.durationMs).toBe(60_000); + expect(session.expiresAt).toBeGreaterThan(Date.now() - 1000); + expect(session.commandCount).toBe(0); + expect(session.createdBy).toBe('test-user'); + }); + + it('rejects a second active session for the same instance', async () => { + await service.startSession({ connectionId: 'conn-1', durationMs: 60_000 }); + await expect( + service.startSession({ connectionId: 'conn-1', durationMs: 60_000 }), + ).rejects.toThrow(/already exists/); + }); + + it('allows a new session after the first expires', async () => { + const first = await service.startSession({ connectionId: 'conn-1', durationMs: 1 }); + // Manually expire it + await storage.updateCommandCaptureSession(first.id, { + status: 'expired', + }); + const second = await service.startSession({ connectionId: 'conn-1', durationMs: 60_000 }); + expect(second.id).not.toBe(first.id); + expect(second.status).toBe('active'); + }); + }); + + describe('stopSession', () => { + it('marks the active session as stopped', async () => { + await service.startSession({ connectionId: 'conn-1', durationMs: 60_000 }); + const stopped = await service.stopSession('conn-1'); + expect(stopped).not.toBeNull(); + expect(stopped!.status).toBe('stopped'); + expect(stopped!.stoppedAt).toBeDefined(); + }); + + it('returns null when no active session exists', async () => { + const result = await service.stopSession('nonexistent'); + expect(result).toBeNull(); + }); + }); + + describe('getActiveWindow (poll)', () => { + it('returns active with caps for an active session', async () => { + await service.startSession({ + connectionId: 'conn-1', + durationMs: 60_000, + commandCap: 5000, + }); + + const window = await service.getActiveWindow('conn-1'); + expect(window.active).toBe(true); + expect(window.maxCommands).toBe(5000); + expect(window.maxDurationMs).toBeGreaterThan(0); + expect(window.maxDurationMs).toBeLessThanOrEqual(60_000); + }); + + it('returns inactive when no session exists', async () => { + const window = await service.getActiveWindow('conn-1'); + expect(window.active).toBe(false); + }); + + it('returns inactive for expired session', async () => { + const session = await service.startSession({ + connectionId: 'conn-1', + durationMs: 1, + }); + // Force expiry by patching expiresAt + await storage.updateCommandCaptureSession(session.id, { + status: 'active', // still "active" in DB but past expiresAt + }); + // Override expiresAt in storage + const stored = await storage.getCommandCaptureSession(session.id); + if (stored) { + (stored as any).expiresAt = Date.now() - 1000; + } + + const window = await service.getActiveWindow('conn-1'); + expect(window.active).toBe(false); + }); + }); + + describe('getActiveSessions (user-facing status read)', () => { + it('returns the active session', async () => { + await service.startSession({ connectionId: 'conn-1', durationMs: 60_000 }); + const sessions = await service.getActiveSessions('conn-1'); + expect(sessions).toHaveLength(1); + expect(sessions[0].status).toBe('active'); + expect(sessions[0].connectionId).toBe('conn-1'); + }); + + it('returns empty when no active session', async () => { + const sessions = await service.getActiveSessions('conn-1'); + expect(sessions).toEqual([]); + }); + }); + + describe('ingestBatch', () => { + it('persists commands for an active session', async () => { + await service.startSession({ connectionId: 'conn-1', durationMs: 60_000 }); + + const result = await service.ingestBatch('conn-1', { + connectionId: 'wrapper-uuid', + commands: [ + { connectionId: 'wrapper-uuid', name: 'SET', args: ['key1', 'val1'], ts: Date.now() }, + { connectionId: 'wrapper-uuid', name: 'GET', args: ['key1'], ts: Date.now() }, + ], + }); + + expect(result.accepted).toBe(2); + expect(result.dropped).toBe(false); + }); + + it('discards commands for an expired session (with grace)', async () => { + const session = await service.startSession({ connectionId: 'conn-1', durationMs: 1 }); + // Force expiry well past the grace window + const stored = await storage.getCommandCaptureSession(session.id); + if (stored) (stored as any).expiresAt = Date.now() - 10_000; + + const result = await service.ingestBatch('conn-1', { + connectionId: 'wrapper-uuid', + commands: [ + { connectionId: 'wrapper-uuid', name: 'SET', args: ['x', 'y'], ts: Date.now() }, + ], + }); + + expect(result.accepted).toBe(0); + expect(result.dropped).toBe(true); + }); + + it('discards commands when command cap is reached', async () => { + const session = await service.startSession({ connectionId: 'conn-1', durationMs: 60_000, commandCap: 5 }); + // Manually set commandCount to cap + await storage.updateCommandCaptureSession(session.id, { commandCount: 5 }); + + const result = await service.ingestBatch('conn-1', { + connectionId: 'wrapper-uuid', + commands: [ + { connectionId: 'wrapper-uuid', name: 'SET', args: ['a', 'b'], ts: Date.now() }, + ], + }); + + expect(result.accepted).toBe(0); + expect(result.dropped).toBe(true); + }); + + it('discards commands when no active session exists', async () => { + const result = await service.ingestBatch('conn-1', { + connectionId: 'wrapper-uuid', + commands: [ + { connectionId: 'wrapper-uuid', name: 'SET', args: ['x', 'y'], ts: Date.now() }, + ], + }); + + expect(result.accepted).toBe(0); + expect(result.dropped).toBe(true); + }); + }); + + describe('pruneOldRecords', () => { + it('removes records older than retention', async () => { + await service.startSession({ connectionId: 'conn-1', durationMs: 60_000 }); + + // Insert old records directly into storage + const oldTs = Date.now() - 4 * 24 * 60 * 60 * 1000; // 4 days ago + await storage.saveCommandCaptureRecords([ + { sessionId: 'old-session', connectionId: 'conn-1', wrapperConnectionId: 'w1', name: 'SET', args: ['a', 'b'], ts: oldTs }, + { sessionId: 'old-session', connectionId: 'conn-1', wrapperConnectionId: 'w1', name: 'GET', args: ['a'], ts: Date.now() }, + ]); + + const result = await service.pruneOldRecords(3); + expect(result.records).toBe(1); // only the old one + }); + }); +}); diff --git a/apps/api/src/command-capture/command-capture.controller.ts b/apps/api/src/command-capture/command-capture.controller.ts new file mode 100644 index 00000000..0077c844 --- /dev/null +++ b/apps/api/src/command-capture/command-capture.controller.ts @@ -0,0 +1,129 @@ +import { + BadRequestException, + Body, + Controller, + Get, + HttpCode, + Logger, + NotFoundException, + Param, + Post, + Query, + UseGuards, +} from '@nestjs/common'; +import { AgentTokenGuard } from '../common/guards/agent-token.guard'; +import { ConnectionRegistry } from '../connections/connection-registry.service'; +import { CommandCaptureService } from './command-capture.service'; +import type { CaptureBatchRequest, CaptureWindowResponse } from '@betterdb/iovalkey-capture'; + +/** + * Wrapper-facing endpoints for iovalkey-capture poll and ingest. + * Authenticated by the MCP/agent token guard (same as MCP controller). + * Instance authorization: validates the instanceId exists in the connection + * registry (matches existing MCP endpoint pattern — tokens are not scoped + * to specific instances today). + */ +@Controller('api/capture/instance/:instanceId') +@UseGuards(AgentTokenGuard) +export class CommandCaptureController { + private readonly logger = new Logger(CommandCaptureController.name); + + constructor( + private readonly captureService: CommandCaptureService, + private readonly registry: ConnectionRegistry, + ) {} + + /** Validate instanceId is a known connection. Throws NotFoundException if not. */ + private assertInstanceExists(instanceId: string): void { + // registry.get() throws NotFoundException if not found + this.registry.get(instanceId); + } + + /** Poll: wrapper asks if a capture window is active for this instance. */ + @Get('window') + async getWindow( + @Param('instanceId') instanceId: string, + ): Promise { + this.assertInstanceExists(instanceId); + return this.captureService.getActiveWindow(instanceId); + } + + /** Ingest: wrapper posts a batch of captured commands. */ + @Post('batch') + @HttpCode(200) + async ingestBatch( + @Param('instanceId') instanceId: string, + @Body() body: CaptureBatchRequest, + ): Promise<{ accepted: number; dropped: boolean }> { + this.assertInstanceExists(instanceId); + if (!body || !Array.isArray(body.commands)) { + throw new BadRequestException('Invalid batch: commands array required'); + } + return this.captureService.ingestBatch(instanceId, body); + } +} + +/** + * User-facing endpoints to start/stop command capture sessions. + * Uses the same auth pattern as the monitor controller. + */ +@Controller('api/command-capture') +export class CommandCaptureAdminController { + private readonly logger = new Logger(CommandCaptureAdminController.name); + + constructor(private readonly captureService: CommandCaptureService) {} + + /** Get the active command capture session for a connection, or null. */ + @Get('status') + async status( + @Query('connectionId') connectionId: string, + ) { + if (!connectionId) { + throw new BadRequestException('connectionId query param is required'); + } + return this.captureService.getActiveWindow(connectionId); + } + + /** Get the active session entity with full details (commandCount, etc). */ + @Get('session') + async session( + @Query('connectionId') connectionId: string, + ) { + if (!connectionId) { + throw new BadRequestException('connectionId query param is required'); + } + const sessions = await this.captureService.getActiveSessions(connectionId); + return sessions[0] ?? null; + } + + @Post('start') + async start( + @Body() body: { connectionId: string; durationMs: number; commandCap?: number; createdBy?: string }, + ) { + if (!body.connectionId || !body.durationMs) { + throw new BadRequestException('connectionId and durationMs are required'); + } + if (body.durationMs <= 0 || body.durationMs > 24 * 60 * 60 * 1000) { + throw new BadRequestException('durationMs must be between 1 and 86400000 (24h)'); + } + try { + return await this.captureService.startSession(body); + } catch (e) { + throw new BadRequestException((e as Error).message); + } + } + + @Post('stop') + async stop( + @Body() body: { connectionId: string }, + ) { + if (!body.connectionId) { + throw new BadRequestException('connectionId is required'); + } + const session = await this.captureService.stopSession(body.connectionId); + if (!session) { + return { stopped: false, message: 'No active session found' }; + } + return { stopped: true, session }; + } +} diff --git a/apps/api/src/command-capture/command-capture.module.ts b/apps/api/src/command-capture/command-capture.module.ts new file mode 100644 index 00000000..56b9b4c8 --- /dev/null +++ b/apps/api/src/command-capture/command-capture.module.ts @@ -0,0 +1,13 @@ +import { Module } from '@nestjs/common'; +import { StorageModule } from '../storage/storage.module'; +import { ConnectionsModule } from '../connections/connections.module'; +import { CommandCaptureService } from './command-capture.service'; +import { CommandCaptureController, CommandCaptureAdminController } from './command-capture.controller'; + +@Module({ + imports: [StorageModule, ConnectionsModule], + controllers: [CommandCaptureController, CommandCaptureAdminController], + providers: [CommandCaptureService], + exports: [CommandCaptureService], +}) +export class CommandCaptureModule {} diff --git a/apps/api/src/command-capture/command-capture.service.ts b/apps/api/src/command-capture/command-capture.service.ts new file mode 100644 index 00000000..928f8658 --- /dev/null +++ b/apps/api/src/command-capture/command-capture.service.ts @@ -0,0 +1,205 @@ +import { Inject, Injectable, Logger } from '@nestjs/common'; +import { randomUUID } from 'crypto'; +import type { StoragePort } from '../common/interfaces/storage-port.interface'; +import type { + StoredCommandCaptureSession, + StoredCommandCaptureRecord, + CommandCaptureSessionStatus, +} from '@betterdb/shared'; +import type { + CaptureWindowResponse, + CaptureBatchRequest, +} from '@betterdb/iovalkey-capture'; + +const DEFAULT_RETENTION_DAYS = 3; +const PRUNE_INTERVAL_MS = 60 * 60 * 1000; // prune at most once per hour +/** Grace window for in-flight batches sent just before expiry (covers wrapper→Monitor network latency). */ +const EXPIRY_GRACE_MS = 5_000; + +@Injectable() +export class CommandCaptureService { + private readonly logger = new Logger(CommandCaptureService.name); + private lastPruneAt = 0; + + constructor( + @Inject('STORAGE_CLIENT') private readonly storage: StoragePort, + ) {} + + // -- Session lifecycle (user-facing) -- + + async startSession(input: { + connectionId: string; + durationMs: number; + commandCap?: number; + createdBy?: string; + }): Promise { + const existing = await this.storage.getCommandCaptureSessions({ + connectionId: input.connectionId, + status: 'active', + limit: 1, + }); + + // Lazily expire any session past its expiresAt + for (const session of existing) { + if (session.expiresAt <= Date.now()) { + await this.storage.updateCommandCaptureSession(session.id, { + status: 'expired', + }); + } + } + + // Re-check for truly active sessions + const stillActive = await this.storage.getCommandCaptureSessions({ + connectionId: input.connectionId, + status: 'active', + limit: 1, + }); + const live = stillActive.filter((s) => s.expiresAt > Date.now()); + if (live.length > 0) { + throw new Error(`An active command capture session already exists for connection ${input.connectionId}`); + } + + const now = Date.now(); + const session: StoredCommandCaptureSession = { + id: randomUUID(), + connectionId: input.connectionId, + status: 'active', + startedAt: now, + durationMs: input.durationMs, + expiresAt: now + input.durationMs, + commandCap: input.commandCap, + commandCount: 0, + createdBy: input.createdBy, + }; + + await this.storage.saveCommandCaptureSession(session); + this.logger.log( + `Started command capture session ${session.id} for connection ${input.connectionId} ` + + `(duration=${input.durationMs}ms, cap=${input.commandCap ?? 'none'})`, + ); + return session; + } + + async stopSession(connectionId: string): Promise { + const sessions = await this.storage.getCommandCaptureSessions({ + connectionId, + status: 'active', + limit: 1, + }); + const active = sessions.find((s) => s.expiresAt > Date.now()); + if (!active) return null; + + await this.storage.updateCommandCaptureSession(active.id, { + status: 'stopped', + stoppedAt: Date.now(), + }); + this.logger.log(`Stopped command capture session ${active.id}`); + return { ...active, status: 'stopped', stoppedAt: Date.now() }; + } + + // -- Status read (user-facing) -- + + async getActiveSessions(connectionId: string): Promise { + const sessions = await this.storage.getCommandCaptureSessions({ + connectionId, + status: 'active', + limit: 1, + }); + // Lazily expire + const live: StoredCommandCaptureSession[] = []; + for (const session of sessions) { + if (session.expiresAt <= Date.now()) { + await this.storage.updateCommandCaptureSession(session.id, { status: 'expired' }); + } else { + live.push(session); + } + } + return live; + } + + // -- Poll (wrapper-facing) -- + + async getActiveWindow(connectionId: string): Promise { + const sessions = await this.storage.getCommandCaptureSessions({ + connectionId, + status: 'active', + limit: 1, + }); + + for (const session of sessions) { + if (session.expiresAt <= Date.now()) { + await this.storage.updateCommandCaptureSession(session.id, { status: 'expired' }); + continue; + } + if (session.commandCap && session.commandCount >= session.commandCap) { + await this.storage.updateCommandCaptureSession(session.id, { status: 'stopped', stoppedAt: Date.now() }); + continue; + } + return { + active: true, + maxCommands: session.commandCap, + maxDurationMs: session.expiresAt - Date.now(), + expiresAt: session.expiresAt, + }; + } + + return { active: false }; + } + + // -- Ingest (wrapper-facing) -- + + async ingestBatch(connectionId: string, batch: CaptureBatchRequest): Promise<{ accepted: number; dropped: boolean }> { + const sessions = await this.storage.getCommandCaptureSessions({ + connectionId, + status: 'active', + limit: 1, + }); + const now = Date.now(); + const active = sessions.find((s) => s.expiresAt + EXPIRY_GRACE_MS > now); + + if (!active) { + this.logger.debug(`Ingest for connection ${connectionId}: no active/unexpired session, discarding ${batch.commands.length} commands`); + return { accepted: 0, dropped: true }; + } + + // Command cap enforcement — reject if already at cap + if (active.commandCap && active.commandCount >= active.commandCap) { + this.logger.debug(`Ingest for connection ${connectionId}: command cap reached (${active.commandCount}/${active.commandCap}), discarding`); + return { accepted: 0, dropped: true }; + } + + const records: StoredCommandCaptureRecord[] = batch.commands.map((cmd) => ({ + sessionId: active.id, + connectionId, + wrapperConnectionId: batch.connectionId, + name: cmd.name, + args: cmd.args, + ts: cmd.ts, + })); + + const saved = await this.storage.saveCommandCaptureRecords(records); + await this.storage.updateCommandCaptureSession(active.id, { + commandCount: active.commandCount + saved, + }); + + // Inline prune, throttled to at most once per PRUNE_INTERVAL_MS + const pruneNow = Date.now(); + if (pruneNow - this.lastPruneAt > PRUNE_INTERVAL_MS) { + this.lastPruneAt = pruneNow; + this.pruneOldRecords().catch((err) => + this.logger.warn(`Prune failed: ${(err as Error).message}`), + ); + } + + return { accepted: saved, dropped: false }; + } + + // -- Retention -- + + async pruneOldRecords(retentionDays: number = DEFAULT_RETENTION_DAYS): Promise<{ records: number; sessions: number }> { + const cutoff = Date.now() - retentionDays * 24 * 60 * 60 * 1000; + const records = await this.storage.pruneOldCommandCaptureRecords(cutoff); + const sessions = await this.storage.pruneOldCommandCaptureSessions(cutoff); + return { records, sessions }; + } +} diff --git a/apps/api/src/common/interfaces/storage-port.interface.ts b/apps/api/src/common/interfaces/storage-port.interface.ts index be865464..6a1e0d74 100644 --- a/apps/api/src/common/interfaces/storage-port.interface.ts +++ b/apps/api/src/common/interfaces/storage-port.interface.ts @@ -61,6 +61,17 @@ export type { ScheduledCaptureQueryOptions, ScheduledCapturePatch, } from '@betterdb/shared'; +export type { + CommandCaptureSessionStatus, + StoredCommandCaptureSession, + CommandCaptureSessionQueryOptions, + StoredCommandCaptureRecord, +} from '@betterdb/shared'; +import type { + StoredCommandCaptureSession, + CommandCaptureSessionQueryOptions, + StoredCommandCaptureRecord, +} from '@betterdb/shared'; import type { AppSettings, AuditQueryOptions, @@ -503,6 +514,15 @@ export interface StoragePort { pruneOldCaptureTriggers(cutoffTimestamp: number): Promise; pruneOldScheduledCaptures(cutoffTimestamp: number): Promise; + // Command Capture (iovalkey-capture wrapper) — distinct from MONITOR stream capture above + saveCommandCaptureSession(session: StoredCommandCaptureSession): Promise; + getCommandCaptureSession(id: string): Promise; + getCommandCaptureSessions(options?: CommandCaptureSessionQueryOptions): Promise; + updateCommandCaptureSession(id: string, patch: Partial>): Promise; + saveCommandCaptureRecords(records: StoredCommandCaptureRecord[]): Promise; + pruneOldCommandCaptureRecords(cutoffTimestamp: number): Promise; + pruneOldCommandCaptureSessions(cutoffTimestamp: number): Promise; + // Connection Management Methods (not connection-scoped, they manage connections themselves) saveConnection(config: DatabaseConnectionConfig): Promise; getConnections(): Promise; diff --git a/apps/api/src/storage/adapters/__tests__/command-capture-storage.spec.ts b/apps/api/src/storage/adapters/__tests__/command-capture-storage.spec.ts new file mode 100644 index 00000000..f9d20d19 --- /dev/null +++ b/apps/api/src/storage/adapters/__tests__/command-capture-storage.spec.ts @@ -0,0 +1,136 @@ +import * as fs from 'fs'; +import * as os from 'os'; +import * as path from 'path'; +import { randomUUID } from 'crypto'; +import { MemoryAdapter } from '../memory.adapter'; +import { SqliteAdapter } from '../sqlite.adapter'; +import type { + StoredCommandCaptureSession, + StoredCommandCaptureRecord, +} from '../../../common/interfaces/storage-port.interface'; + +const CONNECTION_ID = 'conn-test'; + +function makeSession(overrides: Partial = {}): StoredCommandCaptureSession { + const now = Date.now(); + return { + id: randomUUID(), + connectionId: CONNECTION_ID, + status: 'active', + startedAt: now, + durationMs: 60_000, + expiresAt: now + 60_000, + commandCount: 0, + ...overrides, + }; +} + +function makeRecords(sessionId: string, count: number, tsBase = Date.now()): StoredCommandCaptureRecord[] { + return Array.from({ length: count }, (_, i) => ({ + sessionId, + connectionId: CONNECTION_ID, + wrapperConnectionId: 'wrapper-1', + name: 'SET', + args: [`key${i}`, `val${i}`], + ts: tsBase + i, + })); +} + +describe.each([ + ['MemoryAdapter', () => { + const adapter = new MemoryAdapter(); + return { adapter, cleanup: async () => {} }; + }], + ['SqliteAdapter', () => { + const tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), 'cmd-capture-test-')); + const dbPath = path.join(tmpDir, 'test.db'); + const adapter = new SqliteAdapter({ filepath: dbPath }); + return { + adapter, + cleanup: async () => { + try { fs.rmSync(tmpDir, { recursive: true }); } catch { /* ignore */ } + }, + }; + }], +])('%s: command capture storage', (_name, factory) => { + let adapter: MemoryAdapter | SqliteAdapter; + let cleanup: () => Promise; + + beforeAll(async () => { + const ctx = factory(); + adapter = ctx.adapter; + cleanup = ctx.cleanup; + await adapter.initialize(); + }); + + afterAll(async () => { + await cleanup(); + }); + + it('saves and retrieves a session', async () => { + const session = makeSession(); + await adapter.saveCommandCaptureSession(session); + const got = await adapter.getCommandCaptureSession(session.id); + expect(got).not.toBeNull(); + expect(got!.id).toBe(session.id); + expect(got!.connectionId).toBe(CONNECTION_ID); + expect(got!.status).toBe('active'); + }); + + it('queries sessions by connectionId and status', async () => { + const s1 = makeSession({ connectionId: 'q-conn', status: 'active' }); + const s2 = makeSession({ connectionId: 'q-conn', status: 'stopped' }); + await adapter.saveCommandCaptureSession(s1); + await adapter.saveCommandCaptureSession(s2); + + const active = await adapter.getCommandCaptureSessions({ connectionId: 'q-conn', status: 'active' }); + expect(active.length).toBe(1); + expect(active[0].id).toBe(s1.id); + }); + + it('updates session fields', async () => { + const session = makeSession(); + await adapter.saveCommandCaptureSession(session); + await adapter.updateCommandCaptureSession(session.id, { status: 'stopped', stoppedAt: Date.now(), commandCount: 42 }); + const got = await adapter.getCommandCaptureSession(session.id); + expect(got!.status).toBe('stopped'); + expect(got!.commandCount).toBe(42); + expect(got!.stoppedAt).toBeDefined(); + }); + + it('bulk writes and reads back records', async () => { + const session = makeSession(); + await adapter.saveCommandCaptureSession(session); + const records = makeRecords(session.id, 100); + const saved = await adapter.saveCommandCaptureRecords(records); + expect(saved).toBe(100); + }); + + it('prunes records by timestamp', async () => { + const oldTs = Date.now() - 10 * 24 * 60 * 60 * 1000; // 10 days ago + const session = makeSession({ startedAt: oldTs, expiresAt: oldTs + 60_000 }); + await adapter.saveCommandCaptureSession(session); + + const oldRecords = makeRecords(session.id, 5, oldTs); + const newRecords = makeRecords(session.id, 3, Date.now()); + await adapter.saveCommandCaptureRecords([...oldRecords, ...newRecords]); + + const cutoff = Date.now() - 5 * 24 * 60 * 60 * 1000; // 5 days ago + const pruned = await adapter.pruneOldCommandCaptureRecords(cutoff); + expect(pruned).toBe(5); + }); + + it('prunes sessions by timestamp (non-active only)', async () => { + const oldTs = Date.now() - 10 * 24 * 60 * 60 * 1000; + const old1 = makeSession({ startedAt: oldTs, status: 'stopped', connectionId: 'prune-conn' }); + const old2 = makeSession({ startedAt: oldTs, status: 'active', connectionId: 'prune-conn' }); // active = kept + const recent = makeSession({ status: 'stopped', connectionId: 'prune-conn' }); // recent = kept + await adapter.saveCommandCaptureSession(old1); + await adapter.saveCommandCaptureSession(old2); + await adapter.saveCommandCaptureSession(recent); + + const cutoff = Date.now() - 5 * 24 * 60 * 60 * 1000; + const pruned = await adapter.pruneOldCommandCaptureSessions(cutoff); + expect(pruned).toBe(1); // only old1 + }); +}); diff --git a/apps/api/src/storage/adapters/memory.adapter.ts b/apps/api/src/storage/adapters/memory.adapter.ts index d9392735..dea40c8e 100644 --- a/apps/api/src/storage/adapters/memory.adapter.ts +++ b/apps/api/src/storage/adapters/memory.adapter.ts @@ -39,6 +39,9 @@ import { CaptureSessionQueryOptions, StoredCaptureChunk, CaptureSessionPatch, + StoredCommandCaptureSession, + CommandCaptureSessionQueryOptions, + StoredCommandCaptureRecord, StoredCaptureTrigger, CaptureTriggerQueryOptions, CaptureTriggerPatch, @@ -1357,6 +1360,8 @@ export class MemoryAdapter implements StoragePort { private captureChunks: StoredCaptureChunk[] = []; private captureTriggers: Map = new Map(); private scheduledCaptures: Map = new Map(); + private commandCaptureSessions: Map = new Map(); + private commandCaptureRecords: StoredCommandCaptureRecord[] = []; private cloneProposal(p: StoredCacheProposal): StoredCacheProposal { @@ -1696,4 +1701,62 @@ export class MemoryAdapter implements StoragePort { } return pruned; } + + // -- Command Capture (iovalkey-capture wrapper) -- + + async saveCommandCaptureSession(session: StoredCommandCaptureSession): Promise { + this.commandCaptureSessions.set(session.id, { ...session }); + return session.id; + } + + async getCommandCaptureSession(id: string): Promise { + return this.commandCaptureSessions.get(id) ?? null; + } + + async getCommandCaptureSessions(options?: CommandCaptureSessionQueryOptions): Promise { + let sessions = Array.from(this.commandCaptureSessions.values()); + if (options?.connectionId) { + sessions = sessions.filter((s) => s.connectionId === options.connectionId); + } + if (options?.status) { + sessions = sessions.filter((s) => s.status === options.status); + } + sessions.sort((a, b) => b.startedAt - a.startedAt); + if (options?.limit) { + sessions = sessions.slice(0, options.limit); + } + return sessions; + } + + async updateCommandCaptureSession( + id: string, + patch: Partial>, + ): Promise { + const session = this.commandCaptureSessions.get(id); + if (!session) return false; + Object.assign(session, patch); + return true; + } + + async saveCommandCaptureRecords(records: StoredCommandCaptureRecord[]): Promise { + this.commandCaptureRecords.push(...records); + return records.length; + } + + async pruneOldCommandCaptureRecords(cutoffTimestamp: number): Promise { + const before = this.commandCaptureRecords.length; + this.commandCaptureRecords = this.commandCaptureRecords.filter((r) => r.ts >= cutoffTimestamp); + return before - this.commandCaptureRecords.length; + } + + async pruneOldCommandCaptureSessions(cutoffTimestamp: number): Promise { + let pruned = 0; + for (const [id, session] of this.commandCaptureSessions) { + if (session.startedAt < cutoffTimestamp && session.status !== 'active') { + this.commandCaptureSessions.delete(id); + pruned++; + } + } + return pruned; + } } diff --git a/apps/api/src/storage/adapters/postgres.adapter.ts b/apps/api/src/storage/adapters/postgres.adapter.ts index ec7c90da..f0fbbf71 100644 --- a/apps/api/src/storage/adapters/postgres.adapter.ts +++ b/apps/api/src/storage/adapters/postgres.adapter.ts @@ -46,6 +46,9 @@ import { StoredScheduledCapture, ScheduledCaptureQueryOptions, ScheduledCapturePatch, + StoredCommandCaptureSession, + CommandCaptureSessionQueryOptions, + StoredCommandCaptureRecord, } from '../../common/interfaces/storage-port.interface'; import type { ActorSource, @@ -1720,6 +1723,36 @@ export class PostgresAdapter implements StoragePort { CREATE INDEX IF NOT EXISTS idx_scheduled_captures_conn_status ON scheduled_captures(connection_id, status); + -- Command Capture (iovalkey-capture wrapper) + CREATE TABLE IF NOT EXISTS command_capture_sessions ( + id TEXT PRIMARY KEY, + connection_id TEXT NOT NULL, + status TEXT NOT NULL DEFAULT 'active', + started_at BIGINT NOT NULL, + duration_ms BIGINT NOT NULL, + expires_at BIGINT NOT NULL, + stopped_at BIGINT, + command_cap INTEGER, + command_count INTEGER NOT NULL DEFAULT 0, + created_by TEXT + ); + CREATE INDEX IF NOT EXISTS idx_cmd_capture_sessions_conn_status + ON command_capture_sessions(connection_id, status); + + CREATE TABLE IF NOT EXISTS command_capture_records ( + id BIGSERIAL PRIMARY KEY, + session_id TEXT NOT NULL, + connection_id TEXT NOT NULL, + wrapper_connection_id TEXT NOT NULL, + name TEXT NOT NULL, + args JSONB NOT NULL DEFAULT '[]', + ts BIGINT NOT NULL + ); + CREATE INDEX IF NOT EXISTS idx_cmd_capture_records_session + ON command_capture_records(session_id); + CREATE INDEX IF NOT EXISTS idx_cmd_capture_records_ts + ON command_capture_records(ts); + -- Idempotent migration for deployments that ran the PR 19 schema ALTER TABLE scheduled_captures ADD COLUMN IF NOT EXISTS cron_expression TEXT; @@ -4430,6 +4463,100 @@ export class PostgresAdapter implements StoragePort { nodeId: (row.node_id as string | null) ?? undefined, })); } + + // -- Command Capture (iovalkey-capture wrapper) -- + + async saveCommandCaptureSession(session: StoredCommandCaptureSession): Promise { + if (!this.pool) throw new Error('Database not initialized'); + await this.pool.query(` + INSERT INTO command_capture_sessions (id, connection_id, status, started_at, duration_ms, expires_at, stopped_at, command_cap, command_count, created_by) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) + `, [ + session.id, session.connectionId, session.status, session.startedAt, + session.durationMs, session.expiresAt, session.stoppedAt ?? null, + session.commandCap ?? null, session.commandCount, session.createdBy ?? null, + ]); + return session.id; + } + + async getCommandCaptureSession(id: string): Promise { + if (!this.pool) throw new Error('Database not initialized'); + const { rows } = await this.pool.query('SELECT * FROM command_capture_sessions WHERE id = $1', [id]); + return rows.length > 0 ? this.mapCommandCaptureSessionRow(rows[0]) : null; + } + + async getCommandCaptureSessions(options?: CommandCaptureSessionQueryOptions): Promise { + if (!this.pool) throw new Error('Database not initialized'); + const conditions: string[] = []; + const params: unknown[] = []; + let paramIdx = 1; + if (options?.connectionId) { conditions.push(`connection_id = $${paramIdx++}`); params.push(options.connectionId); } + if (options?.status) { conditions.push(`status = $${paramIdx++}`); params.push(options.status); } + const where = conditions.length > 0 ? `WHERE ${conditions.join(' AND ')}` : ''; + const limit = options?.limit ? `LIMIT ${options.limit}` : ''; + const { rows } = await this.pool.query(`SELECT * FROM command_capture_sessions ${where} ORDER BY started_at DESC ${limit}`, params); + return rows.map((r: Record) => this.mapCommandCaptureSessionRow(r)); + } + + async updateCommandCaptureSession( + id: string, + patch: Partial>, + ): Promise { + if (!this.pool) throw new Error('Database not initialized'); + const sets: string[] = []; + const params: unknown[] = []; + let paramIdx = 1; + if (patch.status !== undefined) { sets.push(`status = $${paramIdx++}`); params.push(patch.status); } + if (patch.stoppedAt !== undefined) { sets.push(`stopped_at = $${paramIdx++}`); params.push(patch.stoppedAt); } + if (patch.commandCount !== undefined) { sets.push(`command_count = $${paramIdx++}`); params.push(patch.commandCount); } + if (sets.length === 0) return false; + params.push(id); + const result = await this.pool.query(`UPDATE command_capture_sessions SET ${sets.join(', ')} WHERE id = $${paramIdx}`, params); + return (result.rowCount ?? 0) > 0; + } + + async saveCommandCaptureRecords(records: StoredCommandCaptureRecord[]): Promise { + if (!this.pool || records.length === 0) return 0; + const values: unknown[] = []; + const placeholders: string[] = []; + let paramIdx = 1; + for (const r of records) { + placeholders.push(`($${paramIdx++}, $${paramIdx++}, $${paramIdx++}, $${paramIdx++}, $${paramIdx++}, $${paramIdx++})`); + values.push(r.sessionId, r.connectionId, r.wrapperConnectionId, r.name, JSON.stringify(r.args), r.ts); + } + const result = await this.pool.query(` + INSERT INTO command_capture_records (session_id, connection_id, wrapper_connection_id, name, args, ts) + VALUES ${placeholders.join(', ')} + `, values); + return result.rowCount ?? 0; + } + + async pruneOldCommandCaptureRecords(cutoffTimestamp: number): Promise { + if (!this.pool) throw new Error('Database not initialized'); + const result = await this.pool.query('DELETE FROM command_capture_records WHERE ts < $1', [cutoffTimestamp]); + return result.rowCount ?? 0; + } + + async pruneOldCommandCaptureSessions(cutoffTimestamp: number): Promise { + if (!this.pool) throw new Error('Database not initialized'); + const result = await this.pool.query("DELETE FROM command_capture_sessions WHERE started_at < $1 AND status != 'active'", [cutoffTimestamp]); + return result.rowCount ?? 0; + } + + private mapCommandCaptureSessionRow(row: Record): StoredCommandCaptureSession { + return { + id: row.id as string, + connectionId: row.connection_id as string, + status: row.status as StoredCommandCaptureSession['status'], + startedAt: Number(row.started_at), + durationMs: Number(row.duration_ms), + expiresAt: Number(row.expires_at), + stoppedAt: row.stopped_at != null ? Number(row.stopped_at) : undefined, + commandCap: row.command_cap != null ? Number(row.command_cap) : undefined, + commandCount: Number(row.command_count) ?? 0, + createdBy: (row.created_by as string | null) ?? undefined, + }; + } } function normaliseNodeSegments(raw: unknown): StoredCaptureSession['nodeSegments'] | undefined { diff --git a/apps/api/src/storage/adapters/sqlite.adapter.ts b/apps/api/src/storage/adapters/sqlite.adapter.ts index 301bc370..edd5ef74 100644 --- a/apps/api/src/storage/adapters/sqlite.adapter.ts +++ b/apps/api/src/storage/adapters/sqlite.adapter.ts @@ -50,6 +50,9 @@ import { ScheduledCapturePatch, CaptureTriggerQueryOptions, CaptureTriggerPatch, + StoredCommandCaptureSession, + CommandCaptureSessionQueryOptions, + StoredCommandCaptureRecord, } from '../../common/interfaces/storage-port.interface'; import type { VectorIndexSnapshot, @@ -1472,6 +1475,36 @@ export class SqliteAdapter implements StoragePort { CREATE INDEX IF NOT EXISTS idx_scheduled_captures_conn_status ON scheduled_captures(connection_id, status); + + -- Command Capture (iovalkey-capture wrapper) + CREATE TABLE IF NOT EXISTS command_capture_sessions ( + id TEXT PRIMARY KEY, + connection_id TEXT NOT NULL, + status TEXT NOT NULL DEFAULT 'active', + started_at INTEGER NOT NULL, + duration_ms INTEGER NOT NULL, + expires_at INTEGER NOT NULL, + stopped_at INTEGER, + command_cap INTEGER, + command_count INTEGER NOT NULL DEFAULT 0, + created_by TEXT + ); + CREATE INDEX IF NOT EXISTS idx_cmd_capture_sessions_conn_status + ON command_capture_sessions(connection_id, status); + + CREATE TABLE IF NOT EXISTS command_capture_records ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + session_id TEXT NOT NULL, + connection_id TEXT NOT NULL, + wrapper_connection_id TEXT NOT NULL, + name TEXT NOT NULL, + args TEXT NOT NULL DEFAULT '[]', + ts INTEGER NOT NULL + ); + CREATE INDEX IF NOT EXISTS idx_cmd_capture_records_session + ON command_capture_records(session_id); + CREATE INDEX IF NOT EXISTS idx_cmd_capture_records_ts + ON command_capture_records(ts); `); // Idempotent migration for deployments that ran the PR 19 schema before @@ -4155,6 +4188,99 @@ export class SqliteAdapter implements StoragePort { lastSkipReason: (row.last_skip_reason as string | null) ?? undefined, }; } + + // -- Command Capture (iovalkey-capture wrapper) -- + + async saveCommandCaptureSession(session: StoredCommandCaptureSession): Promise { + if (!this.db) throw new Error('Database not initialized'); + this.db.prepare(` + INSERT INTO command_capture_sessions (id, connection_id, status, started_at, duration_ms, expires_at, stopped_at, command_cap, command_count, created_by) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + `).run( + session.id, session.connectionId, session.status, session.startedAt, + session.durationMs, session.expiresAt, session.stoppedAt ?? null, + session.commandCap ?? null, session.commandCount, session.createdBy ?? null, + ); + return session.id; + } + + async getCommandCaptureSession(id: string): Promise { + if (!this.db) throw new Error('Database not initialized'); + const row = this.db.prepare('SELECT * FROM command_capture_sessions WHERE id = ?').get(id) as Record | undefined; + return row ? this.mapCommandCaptureSessionRow(row) : null; + } + + async getCommandCaptureSessions(options?: CommandCaptureSessionQueryOptions): Promise { + if (!this.db) throw new Error('Database not initialized'); + const conditions: string[] = []; + const params: unknown[] = []; + if (options?.connectionId) { conditions.push('connection_id = ?'); params.push(options.connectionId); } + if (options?.status) { conditions.push('status = ?'); params.push(options.status); } + const where = conditions.length > 0 ? `WHERE ${conditions.join(' AND ')}` : ''; + const limit = options?.limit ? `LIMIT ${options.limit}` : ''; + const rows = this.db.prepare(`SELECT * FROM command_capture_sessions ${where} ORDER BY started_at DESC ${limit}`).all(...params) as Record[]; + return rows.map((r) => this.mapCommandCaptureSessionRow(r)); + } + + async updateCommandCaptureSession( + id: string, + patch: Partial>, + ): Promise { + if (!this.db) throw new Error('Database not initialized'); + const sets: string[] = []; + const params: unknown[] = []; + if (patch.status !== undefined) { sets.push('status = ?'); params.push(patch.status); } + if (patch.stoppedAt !== undefined) { sets.push('stopped_at = ?'); params.push(patch.stoppedAt); } + if (patch.commandCount !== undefined) { sets.push('command_count = ?'); params.push(patch.commandCount); } + if (sets.length === 0) return false; + params.push(id); + const result = this.db.prepare(`UPDATE command_capture_sessions SET ${sets.join(', ')} WHERE id = ?`).run(...params); + return (result as { changes: number }).changes > 0; + } + + async saveCommandCaptureRecords(records: StoredCommandCaptureRecord[]): Promise { + if (!this.db || records.length === 0) return 0; + const stmt = this.db.prepare(` + INSERT INTO command_capture_records (session_id, connection_id, wrapper_connection_id, name, args, ts) + VALUES (?, ?, ?, ?, ?, ?) + `); + let count = 0; + const transaction = this.db.transaction(() => { + for (const r of records) { + const result = stmt.run(r.sessionId, r.connectionId, r.wrapperConnectionId, r.name, JSON.stringify(r.args), r.ts); + if ((result as { changes: number }).changes > 0) count++; + } + }); + transaction(); + return count; + } + + async pruneOldCommandCaptureRecords(cutoffTimestamp: number): Promise { + if (!this.db) throw new Error('Database not initialized'); + const result = this.db.prepare('DELETE FROM command_capture_records WHERE ts < ?').run(cutoffTimestamp); + return (result as { changes: number }).changes; + } + + async pruneOldCommandCaptureSessions(cutoffTimestamp: number): Promise { + if (!this.db) throw new Error('Database not initialized'); + const result = this.db.prepare("DELETE FROM command_capture_sessions WHERE started_at < ? AND status != 'active'").run(cutoffTimestamp); + return (result as { changes: number }).changes; + } + + private mapCommandCaptureSessionRow(row: Record): StoredCommandCaptureSession { + return { + id: row.id as string, + connectionId: row.connection_id as string, + status: row.status as StoredCommandCaptureSession['status'], + startedAt: row.started_at as number, + durationMs: row.duration_ms as number, + expiresAt: row.expires_at as number, + stoppedAt: (row.stopped_at as number | null) ?? undefined, + commandCap: (row.command_cap as number | null) ?? undefined, + commandCount: (row.command_count as number) ?? 0, + createdBy: (row.created_by as string | null) ?? undefined, + }; + } } function parseNodeSegmentsJson(raw: unknown): StoredCaptureSession['nodeSegments'] | undefined { diff --git a/apps/web/src/api/command-capture.ts b/apps/web/src/api/command-capture.ts new file mode 100644 index 00000000..64d09f4c --- /dev/null +++ b/apps/web/src/api/command-capture.ts @@ -0,0 +1,36 @@ +import type { StoredCommandCaptureSession } from '@betterdb/shared'; +import { fetchApi } from './client'; + +export type { StoredCommandCaptureSession }; + +export interface StartCommandCaptureParams { + connectionId: string; + durationMs: number; + commandCap?: number; + createdBy?: string; +} + +export const commandCaptureApi = { + /** Get the active session for a connection (user-authed). */ + getSession: (connectionId: string): Promise => { + return fetchApi( + `/command-capture/session?connectionId=${encodeURIComponent(connectionId)}`, + ); + }, + + /** Start a command capture session. */ + start: (params: StartCommandCaptureParams): Promise => { + return fetchApi('/command-capture/start', { + method: 'POST', + body: JSON.stringify(params), + }); + }, + + /** Stop the active command capture session for a connection. */ + stop: (connectionId: string): Promise<{ stopped: boolean; session?: StoredCommandCaptureSession }> => { + return fetchApi<{ stopped: boolean; session?: StoredCommandCaptureSession }>('/command-capture/stop', { + method: 'POST', + body: JSON.stringify({ connectionId }), + }); + }, +}; diff --git a/apps/web/src/components/layout/AppSidebar.tsx b/apps/web/src/components/layout/AppSidebar.tsx index b485bbff..e78e1ec9 100644 --- a/apps/web/src/components/layout/AppSidebar.tsx +++ b/apps/web/src/components/layout/AppSidebar.tsx @@ -103,7 +103,7 @@ export function AppSidebar({ cloudUser, onFeedbackClick }: SidebarProps) { Audit Trail - MONITOR + Monitor / Command Capture Webhooks diff --git a/apps/web/src/pages/Monitor.tsx b/apps/web/src/pages/Monitor.tsx index 12cb888d..117df602 100644 --- a/apps/web/src/pages/Monitor.tsx +++ b/apps/web/src/pages/Monitor.tsx @@ -15,6 +15,7 @@ import { SchedulesTable } from './monitor/schedules-table'; import { SessionsTable } from './monitor/sessions-table'; import { StartSessionModal } from './monitor/start-session-modal'; import { TriggersTable } from './monitor/triggers-table'; +import { CommandCaptureControl } from './command-capture/CommandCaptureControl'; export function Monitor() { const { currentConnection } = useConnection(); @@ -25,6 +26,7 @@ export function Monitor() { const triggersEnabled = hasFeature(Feature.MONITOR_ANOMALY_TRIGGER); const schedulesEnabled = hasFeature(Feature.MONITOR_SCHEDULED_CAPTURES); + const [captureType, setCaptureType] = useState<'monitor' | 'client'>('monitor'); const [startOpen, setStartOpen] = useState(false); const [scheduleOpen, setScheduleOpen] = useState(false); const [cancellingId, setCancellingId] = useState(); @@ -86,20 +88,39 @@ export function Monitor() {
-

MONITOR

+

Monitor / Command Capture

- On-demand command capture sessions for Valkey/Redis instances. Start, stop, and - review past sessions for the currently selected connection. + {captureType === 'monitor' + ? 'On-demand command capture sessions for Valkey/Redis instances via MONITOR. ' + + 'Start, stop, and review past sessions for the currently selected connection.' + : 'Capture commands from your application via the @betterdb/iovalkey-capture ' + + 'client wrapper. Start a capture window and instrumented clients will ship ' + + 'their commands here.'}

-
- - -
+ {captureType === 'monitor' && ( +
+ + +
+ )}
+ setCaptureType(v as 'monitor' | 'client')} + > + + Server (MONITOR) + Client Library + + + + {captureType === 'client' && } + + {captureType === 'monitor' && ( Sessions @@ -154,6 +175,7 @@ export function Monitor() { )} + )} {connectionId && ( ({ + useConnection: () => ({ + currentConnection: { id: mockConnectionId, name: 'test' }, + }), +})); + +vi.mock('../../hooks/useUpgradePrompt', () => ({ + useUpgradePrompt: () => ({ showUpgradePrompt: vi.fn() }), +})); + +const mockGetSession = vi.fn(); +const mockStart = vi.fn(); +const mockStop = vi.fn(); + +vi.mock('../../api/command-capture', () => ({ + commandCaptureApi: { + getSession: (...args: unknown[]) => mockGetSession(...args), + start: (...args: unknown[]) => mockStart(...args), + stop: (...args: unknown[]) => mockStop(...args), + }, +})); + +import { CommandCaptureControl } from './CommandCaptureControl'; + +function renderWithClient(ui: React.ReactElement) { + const queryClient = new QueryClient({ + defaultOptions: { queries: { retry: false, gcTime: 0 } }, + }); + return render( + {ui}, + ); +} + +describe('CommandCaptureControl', () => { + beforeEach(() => { + vi.clearAllMocks(); + mockGetSession.mockResolvedValue(null); + }); + + it('renders idle state with start affordance', async () => { + renderWithClient(); + await waitFor(() => { + expect(screen.getByText('Start Capture')).toBeInTheDocument(); + }); + expect(screen.getByText('Command Capture')).toBeInTheDocument(); + // Duration presets visible + expect(screen.getByText('30s')).toBeInTheDocument(); + expect(screen.getByText('5m')).toBeInTheDocument(); + }); + + it('calls start endpoint with chosen duration', async () => { + mockStart.mockResolvedValue({ + id: 'sess-1', + connectionId: mockConnectionId, + status: 'active', + startedAt: Date.now(), + durationMs: 30000, + expiresAt: Date.now() + 30000, + commandCount: 0, + }); + + renderWithClient(); + await waitFor(() => { + expect(screen.getByText('Start Capture')).toBeInTheDocument(); + }); + + fireEvent.click(screen.getByText('Start Capture')); + + await waitFor(() => { + expect(mockStart).toHaveBeenCalledWith({ + connectionId: mockConnectionId, + durationMs: 30000, + commandCap: undefined, + }); + }); + }); + + it('renders active state with countdown and stop control', async () => { + const now = Date.now(); + mockGetSession.mockResolvedValue({ + id: 'sess-1', + connectionId: mockConnectionId, + status: 'active', + startedAt: now - 5000, + durationMs: 60000, + expiresAt: now + 55000, + commandCount: 42, + }); + + renderWithClient(); + await waitFor(() => { + expect(screen.getByText('Capturing')).toBeInTheDocument(); + }); + expect(screen.getByText('Stop Capture')).toBeInTheDocument(); + expect(screen.getByText(/42/)).toBeInTheDocument(); + }); + + it('calls stop endpoint when stop is clicked', async () => { + const now = Date.now(); + mockGetSession.mockResolvedValue({ + id: 'sess-1', + connectionId: mockConnectionId, + status: 'active', + startedAt: now, + durationMs: 60000, + expiresAt: now + 60000, + commandCount: 0, + }); + mockStop.mockResolvedValue({ stopped: true }); + + renderWithClient(); + await waitFor(() => { + expect(screen.getByText('Stop Capture')).toBeInTheDocument(); + }); + + fireEvent.click(screen.getByText('Stop Capture')); + await waitFor(() => { + expect(mockStop).toHaveBeenCalledWith(mockConnectionId); + }); + }); + + it('renders idle for expired/inactive session', async () => { + mockGetSession.mockResolvedValue(null); + + renderWithClient(); + await waitFor(() => { + expect(screen.getByText('Start Capture')).toBeInTheDocument(); + }); + expect(screen.queryByText('Capturing')).not.toBeInTheDocument(); + }); +}); diff --git a/apps/web/src/pages/command-capture/CommandCaptureControl.tsx b/apps/web/src/pages/command-capture/CommandCaptureControl.tsx new file mode 100644 index 00000000..1f5aa695 --- /dev/null +++ b/apps/web/src/pages/command-capture/CommandCaptureControl.tsx @@ -0,0 +1,225 @@ +import { useCallback, useEffect, useMemo, useState } from 'react'; +import { useQueryClient } from '@tanstack/react-query'; +import { useConnection } from '../../hooks/useConnection'; +import { usePolling } from '../../hooks/usePolling'; +import { commandCaptureApi, type StoredCommandCaptureSession } from '../../api/command-capture'; +import { Button } from '../../components/ui/button'; +import { Input } from '../../components/ui/input'; +import { Card, CardContent } from '../../components/ui/card'; +import { + Select, + SelectContent, + SelectItem, + SelectTrigger, + SelectValue, +} from '../../components/ui/select'; + +type Unit = 's' | 'm'; + +const DURATION_PRESETS: Array<{ label: string; seconds: number }> = [ + { label: '30s', seconds: 30 }, + { label: '1m', seconds: 60 }, + { label: '5m', seconds: 300 }, + { label: '15m', seconds: 900 }, + { label: '1h', seconds: 3600 }, +]; + +function formatCountdown(expiresAt: number): string { + const remaining = Math.max(0, expiresAt - Date.now()); + const secs = Math.floor(remaining / 1000); + if (secs >= 60) { + const m = Math.floor(secs / 60); + const s = secs % 60; + return `${m}m ${s}s`; + } + return `${secs}s`; +} + +export function CommandCaptureControl() { + const { currentConnection } = useConnection(); + const connectionId = currentConnection?.id; + const queryClient = useQueryClient(); + + const queryKey = useMemo( + () => ['command-capture', 'session', connectionId ?? 'none'], + [connectionId], + ); + + const sessionQuery = usePolling({ + fetcher: () => + connectionId + ? commandCaptureApi.getSession(connectionId) + : Promise.resolve(null), + interval: 5000, + enabled: !!connectionId, + queryKey, + refetchKey: connectionId, + }); + + const session = sessionQuery.data; + const isActive = session?.status === 'active' && session.expiresAt > Date.now(); + + // Countdown timer + const [, setTick] = useState(0); + useEffect(() => { + if (!isActive) return; + const timer = setInterval(() => setTick((t) => t + 1), 1000); + return () => clearInterval(timer); + }, [isActive]); + + // Start form state + const [duration, setDuration] = useState(30); + const [unit, setUnit] = useState('s'); + const [commandCap, setCommandCap] = useState(''); + const [submitting, setSubmitting] = useState(false); + const [error, setError] = useState(null); + + const durationMs = unit === 'm' ? duration * 60 * 1000 : duration * 1000; + + const handleStart = useCallback(async () => { + if (!connectionId) return; + setSubmitting(true); + setError(null); + try { + await commandCaptureApi.start({ + connectionId, + durationMs, + commandCap: commandCap ? parseInt(commandCap, 10) : undefined, + }); + await queryClient.invalidateQueries({ queryKey }); + } catch (e) { + setError((e as Error).message); + } finally { + setSubmitting(false); + } + }, [connectionId, durationMs, commandCap, queryClient, queryKey]); + + const handleStop = useCallback(async () => { + if (!connectionId) return; + setSubmitting(true); + setError(null); + try { + await commandCaptureApi.stop(connectionId); + await queryClient.invalidateQueries({ queryKey }); + } catch (e) { + setError((e as Error).message); + } finally { + setSubmitting(false); + } + }, [connectionId, queryClient, queryKey]); + + const handlePreset = useCallback((seconds: number) => { + if (seconds >= 60) { + setDuration(seconds / 60); + setUnit('m'); + } else { + setDuration(seconds); + setUnit('s'); + } + }, []); + + if (!connectionId) { + return null; + } + + return ( + + +

Command Capture

+ + {isActive && session ? ( +
+
+ + Capturing + + {formatCountdown(session.expiresAt)} remaining + +
+ +
+
Commands captured: {session.commandCount.toLocaleString()}
+ {session.commandCap && ( +
+ Command cap: {session.commandCap.toLocaleString()} +
+ )} +
+ + +
+ ) : ( +
+
+ {DURATION_PRESETS.map((p) => ( + + ))} +
+ +
+ setDuration(parseInt(e.target.value, 10) || 1)} + className="w-20" + /> + +
+ +
+ + setCommandCap(e.target.value)} + className="mt-1" + /> +
+ + + + {error && ( +

{error}

+ )} +
+ )} +
+
+ ); +} diff --git a/packages/iovalkey-capture/package.json b/packages/iovalkey-capture/package.json new file mode 100644 index 00000000..cb1c18d1 --- /dev/null +++ b/packages/iovalkey-capture/package.json @@ -0,0 +1,38 @@ +{ + "name": "@betterdb/iovalkey-capture", + "version": "0.1.0", + "description": "Capture-only wrapper around iovalkey that tees outbound commands to BetterDB Monitor", + "keywords": [ + "valkey", + "redis", + "iovalkey", + "capture", + "monitoring", + "betterdb" + ], + "license": "MIT", + "repository": { + "type": "git", + "url": "https://github.com/BetterDB-inc/monitor", + "directory": "packages/iovalkey-capture" + }, + "main": "./dist/index.js", + "types": "./dist/index.d.ts", + "files": [ + "dist" + ], + "scripts": { + "build": "tsc", + "typecheck": "tsc --noEmit", + "test": "vitest run" + }, + "peerDependencies": { + "iovalkey": ">=0.3.0" + }, + "devDependencies": { + "@types/node": "^22.19.15", + "iovalkey": "^0.3.3", + "typescript": "^5.7.0", + "vitest": "^4.1.1" + } +} diff --git a/packages/iovalkey-capture/src/CaptureValkey.ts b/packages/iovalkey-capture/src/CaptureValkey.ts new file mode 100644 index 00000000..4011406a --- /dev/null +++ b/packages/iovalkey-capture/src/CaptureValkey.ts @@ -0,0 +1,211 @@ +import { Redis } from 'iovalkey'; +import type { RedisOptions } from 'iovalkey'; +import { randomUUID } from 'node:crypto'; +import type { + CaptureConfig, + CapturedCommand, + CaptureStats, + CaptureBatchRequest, + CaptureWindowResponse, +} from './types'; + +const DEFAULT_BATCH_SIZE = 1000; +const DEFAULT_FLUSH_INTERVAL_MS = 5000; +const DEFAULT_MAX_BUFFERED = 100_000; +const DEFAULT_POLL_INTERVAL_MS = 15_000; + +export interface CaptureValkeyOptions extends RedisOptions { + capture: CaptureConfig; +} + +export class CaptureValkey extends Redis { + readonly connectionId: string; + + private readonly captureConfig: CaptureConfig; + private readonly batchSize: number; + private readonly maxBuffered: number; + + private buffer: CapturedCommand[] = []; + private capturing = false; + private windowExpiresAt: number | undefined; + private windowCommandCap: number | undefined; + private windowCapturedCount = 0; + + private capturedCount = 0; + private droppedCount = 0; + private failedFlushCount = 0; + private errorCount = 0; + + private flushTimer: ReturnType | undefined; + private pollTimer: ReturnType | undefined; + + constructor(port: number, host: string, options: CaptureValkeyOptions); + constructor(path: string, options: CaptureValkeyOptions); + constructor(port: number, options: CaptureValkeyOptions); + constructor(options: CaptureValkeyOptions); + constructor( + arg1: number | string | CaptureValkeyOptions, + arg2?: string | CaptureValkeyOptions, + arg3?: CaptureValkeyOptions, + ) { + // Extract capture config from whichever arg is the options object. + let captureConfig: CaptureConfig | undefined; + for (const arg of [arg3, arg2, arg1]) { + if (arg && typeof arg === 'object' && 'capture' in arg) { + captureConfig = (arg as CaptureValkeyOptions).capture; + break; + } + } + if (!captureConfig) { + throw new Error('CaptureValkey requires a `capture` config in options'); + } + + // Pass all args through to iovalkey. Extra `capture` key is ignored by parseOptions. + super(arg1 as number, arg2 as string, arg3 as RedisOptions); + + this.connectionId = randomUUID(); + this.captureConfig = captureConfig; + this.batchSize = captureConfig.batchSize ?? DEFAULT_BATCH_SIZE; + this.maxBuffered = captureConfig.maxBufferedCommands ?? DEFAULT_MAX_BUFFERED; + + this.startFlushTimer(captureConfig.flushIntervalMs ?? DEFAULT_FLUSH_INTERVAL_MS); + this.startPollTimer(captureConfig.pollIntervalMs ?? DEFAULT_POLL_INTERVAL_MS); + } + + // -- sendCommand override -- + + sendCommand(command: { name: string; args: unknown[] }, ...rest: unknown[]): unknown { + if (this.capturing) { + // Per-command wall-clock expiry check — O(1), no async + if (this.windowExpiresAt !== undefined && Date.now() >= this.windowExpiresAt) { + void this.endCaptureWindow(); + } else try { + if (this.buffer.length >= this.maxBuffered) { + this.droppedCount++; + } else { + const record: CapturedCommand = { + connectionId: this.connectionId, + name: command.name, + args: command.args.map((a) => (a === null || a === undefined ? '' : String(a))), + ts: Date.now(), + }; + this.buffer.push(record); + this.capturedCount++; + this.windowCapturedCount++; + + if (this.buffer.length >= this.batchSize) { + void this.flush(); + } + + if (this.windowCommandCap !== undefined && this.windowCapturedCount >= this.windowCommandCap) { + void this.endCaptureWindow(); + } + } + } catch { + this.errorCount++; + } + } + + return super.sendCommand(command as never, ...(rest as [])); + } + + // -- Stats -- + + stats(): CaptureStats { + return { + capturedCount: this.capturedCount, + droppedCount: this.droppedCount, + failedFlushCount: this.failedFlushCount, + errorCount: this.errorCount, + buffered: this.buffer.length, + }; + } + + // -- Flush -- + + private async flush(): Promise { + if (this.buffer.length === 0) return; + + const batch = this.buffer; + this.buffer = []; + + const body: CaptureBatchRequest = { + connectionId: this.connectionId, + commands: batch, + }; + + try { + const instanceId = encodeURIComponent(this.captureConfig.instanceId); + const resp = await fetch(`${this.captureConfig.monitorUrl}/api/capture/instance/${instanceId}/batch`, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + 'Authorization': `Bearer ${this.captureConfig.token}`, + }, + body: JSON.stringify(body), + }); + if (!resp.ok) { + this.failedFlushCount++; + } + } catch { + this.failedFlushCount++; + } + } + + // -- Capture window polling -- + + private async poll(): Promise { + try { + const instanceId = encodeURIComponent(this.captureConfig.instanceId); + const resp = await fetch(`${this.captureConfig.monitorUrl}/api/capture/instance/${instanceId}/window`, { + headers: { + 'Authorization': `Bearer ${this.captureConfig.token}`, + }, + }); + if (!resp.ok) return; + + const data = (await resp.json()) as CaptureWindowResponse; + + if (data.active && !this.capturing) { + this.capturing = true; + this.windowCapturedCount = 0; + this.windowCommandCap = data.maxCommands; + this.windowExpiresAt = data.expiresAt; + } else if (!data.active && this.capturing) { + void this.endCaptureWindow(); + } + } catch { + this.errorCount++; + } + } + + private async endCaptureWindow(): Promise { + this.capturing = false; + this.windowExpiresAt = undefined; + this.windowCommandCap = undefined; + await this.flush(); + } + + // -- Timers -- + + private startFlushTimer(intervalMs: number): void { + this.flushTimer = setInterval(() => void this.flush(), intervalMs); + this.flushTimer.unref(); + } + + private startPollTimer(intervalMs: number): void { + this.pollTimer = setInterval(() => void this.poll(), intervalMs); + this.pollTimer.unref(); + // Fire immediately so the first poll doesn't wait a full interval + void this.poll(); + } + + /** + * Clean up timers. Call before disconnecting if you want deterministic shutdown. + */ + async destroyCapture(): Promise { + if (this.flushTimer) clearInterval(this.flushTimer); + if (this.pollTimer) clearInterval(this.pollTimer); + await this.flush(); + } +} diff --git a/packages/iovalkey-capture/src/__tests__/CaptureValkey.test.ts b/packages/iovalkey-capture/src/__tests__/CaptureValkey.test.ts new file mode 100644 index 00000000..a03a174b --- /dev/null +++ b/packages/iovalkey-capture/src/__tests__/CaptureValkey.test.ts @@ -0,0 +1,245 @@ +import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest'; +import { CaptureValkey } from '../CaptureValkey'; +import type { CaptureValkeyOptions } from '../CaptureValkey'; +import type { CapturedCommand } from '../types'; +import { Redis } from 'iovalkey'; +import Command from 'iovalkey/built/Command'; + +// Capture what fetch receives +let fetchCalls: Array<{ url: string; body: string; headers: Record }> = []; +let fetchShouldFail = false; + +vi.stubGlobal('fetch', vi.fn(async (url: string, init?: RequestInit) => { + if (fetchShouldFail) throw new Error('network error'); + fetchCalls.push({ + url, + body: init?.body as string ?? '', + headers: Object.fromEntries(Object.entries(init?.headers ?? {})), + }); + if (url.includes('/window')) { + return { ok: true, json: async () => ({ active: false }) } as Response; + } + return { ok: true } as Response; +})); + +function makeClient(overrides?: Partial): CaptureValkey { + const client = new CaptureValkey({ + lazyConnect: true, + capture: { + token: 'test-token', + monitorUrl: 'http://localhost:3000', + instanceId: 'test-instance-1', + batchSize: 5, + flushIntervalMs: 60_000, + maxBufferedCommands: 10, + pollIntervalMs: 60_000, + ...overrides, + }, + } as CaptureValkeyOptions); + + // Stub super.sendCommand so it doesn't try to write to the wire. + // We spy on the Redis prototype method and make it a no-op returning undefined. + vi.spyOn(Redis.prototype, 'sendCommand').mockReturnValue(undefined); + + return client; +} + +function setCapturing(client: CaptureValkey, value: boolean): void { + (client as unknown as { capturing: boolean }).capturing = value; +} + +function getBuffer(client: CaptureValkey): CapturedCommand[] { + return (client as unknown as { buffer: CapturedCommand[] }).buffer; +} + +function cmd(name: string, args: string[]): Command { + return new Command(name, args); +} + +describe('CaptureValkey', () => { + beforeEach(() => { + fetchCalls = []; + fetchShouldFail = false; + vi.useFakeTimers(); + }); + + afterEach(() => { + vi.restoreAllMocks(); + vi.useRealTimers(); + }); + + it('creates with a connectionId', () => { + const client = makeClient(); + expect(client.connectionId).toBeTruthy(); + expect(typeof client.connectionId).toBe('string'); + void client.destroyCapture(); + }); + + it('records a single command when capturing is on', () => { + const client = makeClient(); + setCapturing(client, true); + + client.sendCommand(cmd('SET', ['key1', 'value1'])); + + const s = client.stats(); + expect(s.capturedCount).toBe(1); + expect(s.buffered).toBe(1); + + const buf = getBuffer(client); + expect(buf[0].name).toBe('SET'); + expect(buf[0].args).toEqual(['key1', 'value1']); + expect(buf[0].connectionId).toBe(client.connectionId); + expect(buf[0].ts).toBeGreaterThan(0); + void client.destroyCapture(); + }); + + it('does not record when capturing is off', () => { + const client = makeClient(); + + client.sendCommand(cmd('GET', ['key1'])); + + expect(client.stats().capturedCount).toBe(0); + expect(client.stats().buffered).toBe(0); + void client.destroyCapture(); + }); + + it('drops with droppedCount when buffer is full', () => { + const client = makeClient({ maxBufferedCommands: 3 }); + setCapturing(client, true); + + for (let i = 0; i < 5; i++) { + client.sendCommand(cmd('PING', [])); + } + + const s = client.stats(); + expect(s.capturedCount).toBe(3); + expect(s.droppedCount).toBe(2); + expect(s.buffered).toBe(3); + void client.destroyCapture(); + }); + + it('flushes when buffer reaches batchSize', async () => { + const client = makeClient({ batchSize: 3 }); + setCapturing(client, true); + + for (let i = 0; i < 3; i++) { + client.sendCommand(cmd('INCR', [`counter${i}`])); + } + + await vi.advanceTimersByTimeAsync(0); + + const batchPost = fetchCalls.find((c) => c.url.includes('/api/capture/instance/test-instance-1/batch')); + expect(batchPost).toBeTruthy(); + const body = JSON.parse(batchPost!.body); + expect(body.commands).toHaveLength(3); + expect(body.connectionId).toBe(client.connectionId); + expect(batchPost!.headers['Authorization']).toBe('Bearer test-token'); + void client.destroyCapture(); + }); + + it('does not throw into caller when capture logic errors', () => { + const client = makeClient(); + setCapturing(client, true); + + // Sabotage the buffer to cause an internal error + (client as unknown as { buffer: unknown }).buffer = null; + + // sendCommand should NOT throw from capture — iovalkey errors are separate + expect(() => { + client.sendCommand(cmd('SET', ['a', 'b'])); + }).not.toThrow(); + + // Restore buffer so stats() works + (client as unknown as { buffer: CapturedCommand[] }).buffer = []; + expect(client.stats().errorCount).toBe(1); + void client.destroyCapture(); + }); + + it('increments failedFlushCount on POST failure', async () => { + fetchShouldFail = true; + const client = makeClient({ batchSize: 1 }); + setCapturing(client, true); + + client.sendCommand(cmd('SET', ['x', 'y'])); + + await vi.advanceTimersByTimeAsync(0); + expect(client.stats().failedFlushCount).toBe(1); + void client.destroyCapture(); + }); + + it('pipeline commands are captured via sendCommand override', () => { + const client = makeClient(); + setCapturing(client, true); + + client.sendCommand(cmd('SET', ['a', '1'])); + client.sendCommand(cmd('SET', ['b', '2'])); + client.sendCommand(cmd('GET', ['a'])); + + expect(client.stats().capturedCount).toBe(3); + const buf = getBuffer(client); + expect(buf.map((r) => r.name)).toEqual(['SET', 'SET', 'GET']); + void client.destroyCapture(); + }); + + it('stats() returns correct shape', () => { + const client = makeClient(); + const s = client.stats(); + expect(s).toEqual({ + capturedCount: 0, + droppedCount: 0, + failedFlushCount: 0, + errorCount: 0, + buffered: 0, + }); + void client.destroyCapture(); + }); + + // -- Expiry enforcement -- + + it('does not buffer a command after windowExpiresAt', () => { + const client = makeClient(); + setCapturing(client, true); + // Set expiresAt in the past + (client as unknown as { windowExpiresAt: number }).windowExpiresAt = Date.now() - 1000; + + client.sendCommand(cmd('SET', ['a', 'b'])); + + // Command should NOT be buffered — expiry triggers endCaptureWindow + expect(client.stats().capturedCount).toBe(0); + expect(client.stats().buffered).toBe(0); + // capturing should now be false + expect((client as unknown as { capturing: boolean }).capturing).toBe(false); + void client.destroyCapture(); + }); + + it('flushes remaining buffer once on expiry and stops shipping', async () => { + const client = makeClient({ batchSize: 100 }); // high batch size so auto-flush doesn't fire + setCapturing(client, true); + (client as unknown as { windowExpiresAt: number }).windowExpiresAt = Date.now() + 60_000; + + // Buffer some commands + client.sendCommand(cmd('SET', ['x', '1'])); + client.sendCommand(cmd('SET', ['y', '2'])); + expect(client.stats().buffered).toBe(2); + + // Now expire + (client as unknown as { windowExpiresAt: number }).windowExpiresAt = Date.now() - 1; + client.sendCommand(cmd('GET', ['z'])); // triggers expiry check + + // Let the final flush fire + await vi.advanceTimersByTimeAsync(0); + + // The 2 buffered commands should have been flushed, GET should not be buffered + expect(client.stats().capturedCount).toBe(2); + const batchPost = fetchCalls.find((c) => c.url.includes('/batch')); + expect(batchPost).toBeTruthy(); + const body = JSON.parse(batchPost!.body); + expect(body.commands).toHaveLength(2); + + // No further commands are buffered + fetchCalls = []; + client.sendCommand(cmd('SET', ['after', 'expiry'])); + expect(client.stats().capturedCount).toBe(2); // unchanged + void client.destroyCapture(); + }); +}); diff --git a/packages/iovalkey-capture/src/index.ts b/packages/iovalkey-capture/src/index.ts new file mode 100644 index 00000000..dda54180 --- /dev/null +++ b/packages/iovalkey-capture/src/index.ts @@ -0,0 +1,9 @@ +export { CaptureValkey } from './CaptureValkey'; +export type { CaptureValkeyOptions } from './CaptureValkey'; +export type { + CaptureConfig, + CapturedCommand, + CaptureStats, + CaptureBatchRequest, + CaptureWindowResponse, +} from './types'; diff --git a/packages/iovalkey-capture/src/types.ts b/packages/iovalkey-capture/src/types.ts new file mode 100644 index 00000000..23f19377 --- /dev/null +++ b/packages/iovalkey-capture/src/types.ts @@ -0,0 +1,73 @@ +/** + * Capture configuration, namespaced under `capture` on the Redis options object. + */ +export interface CaptureConfig { + /** Auth token carrying workspace/instance binding. Required. */ + token: string; + /** Monitor HTTP endpoint base URL. Required. */ + monitorUrl: string; + /** Monitor connection/instance ID for this Valkey instance. Required. */ + instanceId: string; + /** Commands per batch before POST. Default: 1000. */ + batchSize?: number; + /** Flush partial batch on a timer (ms). Default: 5000. */ + flushIntervalMs?: number; + /** Bounded buffer cap. Default: 100000. */ + maxBufferedCommands?: number; + /** Capture-window poll interval (ms). Default: 15000. */ + pollIntervalMs?: number; +} + +/** + * A single captured command record. + */ +export interface CapturedCommand { + /** Stable per-instance connection identifier. */ + connectionId: string; + /** Command name (e.g. "SET", "GET", "HSET"). */ + name: string; + /** Command arguments, serialized as strings. */ + args: string[]; + /** Epoch ms when the command was intercepted. */ + ts: number; +} + +/** + * Debugging/observability stats. + */ +export interface CaptureStats { + /** Total commands captured into the buffer. */ + capturedCount: number; + /** Commands dropped because the buffer was full. */ + droppedCount: number; + /** Flush POSTs that failed (batch dropped, no retry). */ + failedFlushCount: number; + /** Internal errors swallowed by the capture path. */ + errorCount: number; + /** Commands currently in the buffer awaiting flush. */ + buffered: number; +} + +// -- Monitor API shapes (consumed by Monitor in a follow-up) -- + +/** + * Request body for the batch POST endpoint. + */ +export interface CaptureBatchRequest { + connectionId: string; + commands: CapturedCommand[]; +} + +/** + * Response from the capture-window poll endpoint. + */ +export interface CaptureWindowResponse { + /** Whether a capture window is currently active. */ + active: boolean; + /** Optional max commands to capture in this window. */ + maxCommands?: number; + /** Optional max duration (ms) for this window. */ + maxDurationMs?: number; + /** Absolute epoch-ms expiry. Wrapper uses this for per-command gating. */ + expiresAt?: number; +} diff --git a/packages/iovalkey-capture/tsconfig.json b/packages/iovalkey-capture/tsconfig.json new file mode 100644 index 00000000..78f3e3fd --- /dev/null +++ b/packages/iovalkey-capture/tsconfig.json @@ -0,0 +1,19 @@ +{ + "compilerOptions": { + "target": "ES2022", + "module": "CommonJS", + "lib": ["ES2022"], + "declaration": true, + "outDir": "./dist", + "rootDir": "./src", + "strict": true, + "esModuleInterop": true, + "skipLibCheck": true, + "forceConsistentCasingInFileNames": true, + "resolveJsonModule": true, + "moduleResolution": "node", + "types": ["node"] + }, + "include": ["src/**/*"], + "exclude": ["node_modules", "dist", "src/__tests__"] +} diff --git a/packages/shared/src/types/monitor.ts b/packages/shared/src/types/monitor.ts index 8ba72921..d0601e75 100644 --- a/packages/shared/src/types/monitor.ts +++ b/packages/shared/src/types/monitor.ts @@ -143,3 +143,36 @@ export interface CaptureSessionPatch { terminationReason?: string; nodeSegments?: CaptureNodeSegment[]; } + +// -- Command capture (iovalkey-capture wrapper) -- + +export type CommandCaptureSessionStatus = 'active' | 'stopped' | 'expired'; + +export interface StoredCommandCaptureSession { + id: string; + connectionId: string; + status: CommandCaptureSessionStatus; + startedAt: number; + durationMs: number; + expiresAt: number; + stoppedAt?: number; + commandCap?: number; + commandCount: number; + createdBy?: string; +} + +export interface CommandCaptureSessionQueryOptions { + connectionId?: string; + status?: CommandCaptureSessionStatus; + limit?: number; +} + +export interface StoredCommandCaptureRecord { + sessionId: string; + connectionId: string; + /** Wrapper-side connection identifier (per CaptureValkey instance). */ + wrapperConnectionId: string; + name: string; + args: string[]; + ts: number; +} diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 81252123..68ce70ba 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -36,6 +36,9 @@ importers: apps/api: dependencies: + '@betterdb/iovalkey-capture': + specifier: workspace:* + version: link:../../packages/iovalkey-capture '@betterdb/shared': specifier: workspace:* version: link:../../packages/shared @@ -502,6 +505,21 @@ importers: specifier: ^5.9.3 version: 5.9.3 + packages/iovalkey-capture: + devDependencies: + '@types/node': + specifier: ^22.19.15 + version: 22.19.15 + iovalkey: + specifier: ^0.3.3 + version: 0.3.3 + typescript: + specifier: ^5.7.0 + version: 5.9.3 + vitest: + specifier: ^4.1.1 + version: 4.1.1(@opentelemetry/api@1.9.0)(@types/node@22.19.15)(happy-dom@20.8.9)(msw@2.12.14(@types/node@22.19.15)(typescript@5.9.3))(vite@8.0.5(@emnapi/core@1.8.1)(@emnapi/runtime@1.8.1)(@types/node@22.19.15)(esbuild@0.27.4)(jiti@2.6.1)(terser@5.44.1)(tsx@4.21.0)(yaml@2.8.2)) + packages/mcp: dependencies: '@modelcontextprotocol/sdk': @@ -8675,10 +8693,6 @@ packages: resolution: {integrity: sha512-u9r3uZC0bdpGOXtlxUIdwf9pkmvhqJdrVCH9fapQtgy/OeTTMZ1nqH7agtvEfmGui6e1XxjcdrlxvxJvc3sMqw==} engines: {node: '>=18'} - tinyglobby@0.2.15: - resolution: {integrity: sha512-j2Zq4NyQYG5XMST4cbs02Ak8iJUdxRM0XI5QyxXuZOzKOINmWurp3smXu3y5wDcJrptwpSjgXHzIQxR0omXljQ==} - engines: {node: '>=12.0.0'} - tinyglobby@0.2.16: resolution: {integrity: sha512-pn99VhoACYR8nFHhxqix+uvsbXineAasWm5ojXoN8xEwK5Kd3/TrhNn1wByuD52UxWRLy8pu+kRMniEi6Eq9Zg==} engines: {node: '>=12.0.0'} @@ -12592,7 +12606,7 @@ snapshots: dependencies: '@types/estree': 1.0.8 estree-walker: 2.0.2 - picomatch: 4.0.3 + picomatch: 4.0.4 '@scarf/scarf@1.4.0': {} @@ -13477,7 +13491,7 @@ snapshots: debug: 4.4.3(supports-color@5.5.0) minimatch: 10.2.5 semver: 7.7.4 - tinyglobby: 0.2.15 + tinyglobby: 0.2.16 ts-api-utils: 2.5.0(typescript@5.9.3) typescript: 5.9.3 transitivePeerDependencies: @@ -15525,7 +15539,7 @@ snapshots: isstream: 0.1.2 jsonwebtoken: 9.0.3 mime-types: 2.1.35 - retry-axios: 2.6.0(axios@1.14.0(debug@4.4.3)) + retry-axios: 2.6.0(axios@1.14.0) tough-cookie: 4.1.4 transitivePeerDependencies: - supports-color @@ -17726,7 +17740,7 @@ snapshots: ret@0.5.0: {} - retry-axios@2.6.0(axios@1.14.0(debug@4.4.3)): + retry-axios@2.6.0(axios@1.14.0): dependencies: axios: 1.14.0(debug@4.4.3) @@ -18323,11 +18337,6 @@ snapshots: tinyexec@1.0.4: {} - tinyglobby@0.2.15: - dependencies: - fdir: 6.5.0(picomatch@4.0.4) - picomatch: 4.0.4 - tinyglobby@0.2.16: dependencies: fdir: 6.5.0(picomatch@4.0.4) @@ -18683,11 +18692,11 @@ snapshots: magic-string: 0.30.21 obug: 2.1.1 pathe: 2.0.3 - picomatch: 4.0.3 + picomatch: 4.0.4 std-env: 4.0.0 tinybench: 2.9.0 tinyexec: 1.0.4 - tinyglobby: 0.2.15 + tinyglobby: 0.2.16 tinyrainbow: 3.1.0 vite: 8.0.5(@emnapi/core@1.8.1)(@emnapi/runtime@1.8.1)(@types/node@22.19.15)(esbuild@0.27.4)(jiti@2.6.1)(terser@5.44.1)(tsx@4.21.0)(yaml@2.8.2) why-is-node-running: 2.3.0