Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions apps/api/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
"seed:demo": "ts-node -r tsconfig-paths/register scripts/seed-demo-data.ts"
},
"dependencies": {
"@betterdb/iovalkey-capture": "workspace:*",
"@betterdb/shared": "workspace:*",
"@fastify/static": "^9.1.1",
"@fastify/swagger": "^9.7.0",
Expand Down
2 changes: 2 additions & 0 deletions apps/api/src/app.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import { CliModule } from './cli/cli.module';
import { PosthogProxyModule } from './posthog-proxy/posthog-proxy.module';
import { SystemModule } from './system/system.module';
import { MonitorModule } from './monitor/monitor.module';
import { CommandCaptureModule } from './command-capture/command-capture.module';

let AiModule: any = null;
let LicenseModule: any = null;
Expand Down Expand Up @@ -151,6 +152,7 @@ const baseImports = [
PosthogProxyModule,
SystemModule,
MonitorModule,
CommandCaptureModule,
];

const proprietaryImports = [
Expand Down
197 changes: 197 additions & 0 deletions apps/api/src/command-capture/__tests__/command-capture.service.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
import { CommandCaptureService } from '../command-capture.service';
import { MemoryAdapter } from '../../storage/adapters/memory.adapter';

describe('CommandCaptureService', () => {
let service: CommandCaptureService;
let storage: MemoryAdapter;

beforeEach(async () => {
storage = new MemoryAdapter();
await storage.initialize();
service = new CommandCaptureService(storage as any);
});

describe('startSession', () => {
it('creates an active session', async () => {
const session = await service.startSession({
connectionId: 'conn-1',
durationMs: 60_000,
createdBy: 'test-user',
});

expect(session.id).toBeDefined();
expect(session.connectionId).toBe('conn-1');
expect(session.status).toBe('active');
expect(session.durationMs).toBe(60_000);
expect(session.expiresAt).toBeGreaterThan(Date.now() - 1000);
expect(session.commandCount).toBe(0);
expect(session.createdBy).toBe('test-user');
});

it('rejects a second active session for the same instance', async () => {
await service.startSession({ connectionId: 'conn-1', durationMs: 60_000 });
await expect(
service.startSession({ connectionId: 'conn-1', durationMs: 60_000 }),
).rejects.toThrow(/already exists/);
});

it('allows a new session after the first expires', async () => {
const first = await service.startSession({ connectionId: 'conn-1', durationMs: 1 });
// Manually expire it
await storage.updateCommandCaptureSession(first.id, {
status: 'expired',
});
const second = await service.startSession({ connectionId: 'conn-1', durationMs: 60_000 });
expect(second.id).not.toBe(first.id);
expect(second.status).toBe('active');
});
});

describe('stopSession', () => {
it('marks the active session as stopped', async () => {
await service.startSession({ connectionId: 'conn-1', durationMs: 60_000 });
const stopped = await service.stopSession('conn-1');
expect(stopped).not.toBeNull();
expect(stopped!.status).toBe('stopped');
expect(stopped!.stoppedAt).toBeDefined();
});

it('returns null when no active session exists', async () => {
const result = await service.stopSession('nonexistent');
expect(result).toBeNull();
});
});

describe('getActiveWindow (poll)', () => {
it('returns active with caps for an active session', async () => {
await service.startSession({
connectionId: 'conn-1',
durationMs: 60_000,
commandCap: 5000,
});

const window = await service.getActiveWindow('conn-1');
expect(window.active).toBe(true);
expect(window.maxCommands).toBe(5000);
expect(window.maxDurationMs).toBeGreaterThan(0);
expect(window.maxDurationMs).toBeLessThanOrEqual(60_000);
});

it('returns inactive when no session exists', async () => {
const window = await service.getActiveWindow('conn-1');
expect(window.active).toBe(false);
});

it('returns inactive for expired session', async () => {
const session = await service.startSession({
connectionId: 'conn-1',
durationMs: 1,
});
// Force expiry by patching expiresAt
await storage.updateCommandCaptureSession(session.id, {
status: 'active', // still "active" in DB but past expiresAt
});
// Override expiresAt in storage
const stored = await storage.getCommandCaptureSession(session.id);
if (stored) {
(stored as any).expiresAt = Date.now() - 1000;
}

const window = await service.getActiveWindow('conn-1');
expect(window.active).toBe(false);
});
});

describe('getActiveSessions (user-facing status read)', () => {
it('returns the active session', async () => {
await service.startSession({ connectionId: 'conn-1', durationMs: 60_000 });
const sessions = await service.getActiveSessions('conn-1');
expect(sessions).toHaveLength(1);
expect(sessions[0].status).toBe('active');
expect(sessions[0].connectionId).toBe('conn-1');
});

it('returns empty when no active session', async () => {
const sessions = await service.getActiveSessions('conn-1');
expect(sessions).toEqual([]);
});
});

describe('ingestBatch', () => {
it('persists commands for an active session', async () => {
await service.startSession({ connectionId: 'conn-1', durationMs: 60_000 });

const result = await service.ingestBatch('conn-1', {
connectionId: 'wrapper-uuid',
commands: [
{ connectionId: 'wrapper-uuid', name: 'SET', args: ['key1', 'val1'], ts: Date.now() },
{ connectionId: 'wrapper-uuid', name: 'GET', args: ['key1'], ts: Date.now() },
],
});

expect(result.accepted).toBe(2);
expect(result.dropped).toBe(false);
});

it('discards commands for an expired session (with grace)', async () => {
const session = await service.startSession({ connectionId: 'conn-1', durationMs: 1 });
// Force expiry well past the grace window
const stored = await storage.getCommandCaptureSession(session.id);
if (stored) (stored as any).expiresAt = Date.now() - 10_000;

const result = await service.ingestBatch('conn-1', {
connectionId: 'wrapper-uuid',
commands: [
{ connectionId: 'wrapper-uuid', name: 'SET', args: ['x', 'y'], ts: Date.now() },
],
});

expect(result.accepted).toBe(0);
expect(result.dropped).toBe(true);
});

it('discards commands when command cap is reached', async () => {
const session = await service.startSession({ connectionId: 'conn-1', durationMs: 60_000, commandCap: 5 });
// Manually set commandCount to cap
await storage.updateCommandCaptureSession(session.id, { commandCount: 5 });

const result = await service.ingestBatch('conn-1', {
connectionId: 'wrapper-uuid',
commands: [
{ connectionId: 'wrapper-uuid', name: 'SET', args: ['a', 'b'], ts: Date.now() },
],
});

expect(result.accepted).toBe(0);
expect(result.dropped).toBe(true);
});

it('discards commands when no active session exists', async () => {
const result = await service.ingestBatch('conn-1', {
connectionId: 'wrapper-uuid',
commands: [
{ connectionId: 'wrapper-uuid', name: 'SET', args: ['x', 'y'], ts: Date.now() },
],
});

expect(result.accepted).toBe(0);
expect(result.dropped).toBe(true);
});
});

describe('pruneOldRecords', () => {
it('removes records older than retention', async () => {
await service.startSession({ connectionId: 'conn-1', durationMs: 60_000 });

// Insert old records directly into storage
const oldTs = Date.now() - 4 * 24 * 60 * 60 * 1000; // 4 days ago
await storage.saveCommandCaptureRecords([
{ sessionId: 'old-session', connectionId: 'conn-1', wrapperConnectionId: 'w1', name: 'SET', args: ['a', 'b'], ts: oldTs },
{ sessionId: 'old-session', connectionId: 'conn-1', wrapperConnectionId: 'w1', name: 'GET', args: ['a'], ts: Date.now() },
]);

const result = await service.pruneOldRecords(3);
expect(result.records).toBe(1); // only the old one
});
});
});
129 changes: 129 additions & 0 deletions apps/api/src/command-capture/command-capture.controller.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
import {
BadRequestException,
Body,
Controller,
Get,
HttpCode,
Logger,
NotFoundException,
Param,
Post,
Query,
UseGuards,
} from '@nestjs/common';
import { AgentTokenGuard } from '../common/guards/agent-token.guard';
import { ConnectionRegistry } from '../connections/connection-registry.service';
import { CommandCaptureService } from './command-capture.service';
import type { CaptureBatchRequest, CaptureWindowResponse } from '@betterdb/iovalkey-capture';

/**
* Wrapper-facing endpoints for iovalkey-capture poll and ingest.
* Authenticated by the MCP/agent token guard (same as MCP controller).
* Instance authorization: validates the instanceId exists in the connection
* registry (matches existing MCP endpoint pattern — tokens are not scoped
* to specific instances today).
*/
@Controller('api/capture/instance/:instanceId')
@UseGuards(AgentTokenGuard)
export class CommandCaptureController {
private readonly logger = new Logger(CommandCaptureController.name);

constructor(
private readonly captureService: CommandCaptureService,
private readonly registry: ConnectionRegistry,
) {}

/** Validate instanceId is a known connection. Throws NotFoundException if not. */
private assertInstanceExists(instanceId: string): void {
// registry.get() throws NotFoundException if not found
this.registry.get(instanceId);
}

/** Poll: wrapper asks if a capture window is active for this instance. */
@Get('window')
async getWindow(
@Param('instanceId') instanceId: string,
): Promise<CaptureWindowResponse> {
this.assertInstanceExists(instanceId);
return this.captureService.getActiveWindow(instanceId);
}

/** Ingest: wrapper posts a batch of captured commands. */
@Post('batch')
@HttpCode(200)
async ingestBatch(
@Param('instanceId') instanceId: string,
@Body() body: CaptureBatchRequest,
): Promise<{ accepted: number; dropped: boolean }> {
this.assertInstanceExists(instanceId);
if (!body || !Array.isArray(body.commands)) {
throw new BadRequestException('Invalid batch: commands array required');
}
return this.captureService.ingestBatch(instanceId, body);
}
}

/**
* User-facing endpoints to start/stop command capture sessions.
* Uses the same auth pattern as the monitor controller.
*/
@Controller('api/command-capture')

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Duplicate api route prefix

High Severity

The capture controllers register under api/... while other Nest routes use paths like monitor and rely on setGlobalPrefix('api') in production. That yields /api/api/... in prod (breaking CaptureValkey calls to /api/capture/...) and /api/command-capture/... in dev while the web client calls /command-capture/... without the extra segment.

Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit 990ba0f. Configure here.

export class CommandCaptureAdminController {
private readonly logger = new Logger(CommandCaptureAdminController.name);

constructor(private readonly captureService: CommandCaptureService) {}

/** Get the active command capture session for a connection, or null. */
@Get('status')
async status(
@Query('connectionId') connectionId: string,
) {
if (!connectionId) {
throw new BadRequestException('connectionId query param is required');
}
return this.captureService.getActiveWindow(connectionId);
}

/** Get the active session entity with full details (commandCount, etc). */
@Get('session')
async session(
@Query('connectionId') connectionId: string,
) {
if (!connectionId) {
throw new BadRequestException('connectionId query param is required');
}
const sessions = await this.captureService.getActiveSessions(connectionId);
return sessions[0] ?? null;
}

@Post('start')
async start(
@Body() body: { connectionId: string; durationMs: number; commandCap?: number; createdBy?: string },
) {
if (!body.connectionId || !body.durationMs) {
throw new BadRequestException('connectionId and durationMs are required');
}
if (body.durationMs <= 0 || body.durationMs > 24 * 60 * 60 * 1000) {
throw new BadRequestException('durationMs must be between 1 and 86400000 (24h)');
}
try {
return await this.captureService.startSession(body);
} catch (e) {
throw new BadRequestException((e as Error).message);
}
}

@Post('stop')
async stop(
@Body() body: { connectionId: string },
) {
if (!body.connectionId) {
throw new BadRequestException('connectionId is required');
}
const session = await this.captureService.stopSession(body.connectionId);
if (!session) {
return { stopped: false, message: 'No active session found' };
}
return { stopped: true, session };
}
}
13 changes: 13 additions & 0 deletions apps/api/src/command-capture/command-capture.module.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import { Module } from '@nestjs/common';
import { StorageModule } from '../storage/storage.module';
import { ConnectionsModule } from '../connections/connections.module';
import { CommandCaptureService } from './command-capture.service';
import { CommandCaptureController, CommandCaptureAdminController } from './command-capture.controller';

@Module({
imports: [StorageModule, ConnectionsModule],
controllers: [CommandCaptureController, CommandCaptureAdminController],
providers: [CommandCaptureService],
exports: [CommandCaptureService],
})
export class CommandCaptureModule {}
Loading
Loading