Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,24 @@ await migrations.runOne(ctx, internal.migrations.clearField, {
});
```

### Automatic bandwidth management

Migrations monitor transaction bandwidth usage during batch processing. If a
batch approaches
[transaction limits](https://docs.convex.dev/production/state/limits#transactions),
it may stop early and let the remaining documents be picked up in the next
batch.

This helps avoid hitting limits when your `migrateOne` function performs
expensive operations (reading related documents, writing to multiple tables,
etc.).

This works automatically with no configuration needed. If you frequently see
batches ending early, consider reducing the batch size for that migration.

**Note**: bandwidth monitoring is only available for sequential (non-parallel)
batch processing.

### Parallelizing batches

Each batch is processed serially, but within a batch you can have each
Expand Down
2 changes: 2 additions & 0 deletions example/convex/_generated/api.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
*/

import type * as example from "../example.js";
import type * as tests_shortCircuit from "../tests/shortCircuit.js";

import type {
ApiFromModules,
Expand All @@ -18,6 +19,7 @@ import type {

declare const fullApi: ApiFromModules<{
example: typeof example;
"tests/shortCircuit": typeof tests_shortCircuit;
}>;

/**
Expand Down
4 changes: 2 additions & 2 deletions example/convex/setup.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ import component from "@convex-dev/migrations/test";
const modules = import.meta.glob("./**/*.*s");
// When users want to write tests that use your component, they need to
// explicitly register it with its schema and modules.
export function initConvexTest() {
const t = convexTest(schema, modules);
export function initConvexTest(opts?: Parameters<typeof convexTest>[0]) {
const t = convexTest({ ...opts, schema, modules });
component.register(t);
return t;
}
Expand Down
109 changes: 109 additions & 0 deletions example/convex/tests/shortCircuit.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
import { afterEach, beforeEach, describe, expect, test, vi } from "vitest";
import { initConvexTest } from "../setup.test";
import { internal } from "../_generated/api";

/**
* Seed documents that all need migration (optionalField undefined).
*/
async function seedDocs(t: ReturnType<typeof initConvexTest>, count: number) {
await t.run(async (ctx) => {
for (let i = 0; i < count; i++) {
await ctx.db.insert("myTable", {
requiredField: "seed " + i,
optionalField: undefined,
unionField: "1",
});
}
});
}

describe("short-circuit with transaction limits", () => {
beforeEach(() => {
vi.useFakeTimers();
});
afterEach(() => {
vi.useRealTimers();
});

test("migration short-circuits and completes with tight write limits", async () => {
// documentsWritten limit of 10:
// Each batch: component state insert(1) + migration patches + state patch(1)
// + schedule. With batchSize 5, after ~4 patches: used > remaining,
// triggering short-circuit. Each scheduled batch gets a fresh budget.
const t = initConvexTest({ transactionLimits: { documentsWritten: 10 } });
await seedDocs(t, 10);

// Start migration via the runner (goes through component for scheduling).
await t.mutation(internal.tests.shortCircuit.setDefaultValueWithSchema, {
fn: "example:setDefaultValueWithSchema",
});

// Let all scheduled batches run to completion.
await t.finishAllScheduledFunctions(vi.runAllTimers);

// Verify all documents were migrated despite short-circuiting.
await t.run(async (ctx) => {
const docs = await ctx.db.query("myTable").collect();
expect(docs).toHaveLength(10);
expect(docs.every((d) => d.optionalField !== undefined)).toBe(true);
});
});

test("migration processes full batches when limits are generous", async () => {
const t = initConvexTest({ transactionLimits: true });
await seedDocs(t, 10);

await t.mutation(internal.tests.shortCircuit.setDefaultValueWithSchema, {
fn: "example:setDefaultValueWithSchema",
});
await t.finishAllScheduledFunctions(vi.runAllTimers);

await t.run(async (ctx) => {
const docs = await ctx.db.query("myTable").collect();
expect(docs).toHaveLength(10);
expect(docs.every((d) => d.optionalField !== undefined)).toBe(true);
});
});

test("short-circuit produces partial batch result", async () => {
// With documentsWritten: 10 and batchSize 10 on 10 docs:
// After ~5 patches, used(5) > remaining(5) is false, but after
// patch 6: used(6) > remaining(4) → short-circuit before doc 6.
const t = initConvexTest({ transactionLimits: { documentsWritten: 10 } });
await seedDocs(t, 10);

// Run a single batch directly (not through the component).
const result = await t.mutation(
internal.tests.shortCircuit.setDefaultValueWithSchema,
{
cursor: null,
batchSize: 10,
dryRun: false,
},
);

// Should have short-circuited: processed fewer than 10 docs.
expect(result.processed).toBeLessThan(10);
expect(result.processed).toBeGreaterThan(0);
expect(result.isDone).toBe(false);

Check failure on line 88 in example/convex/tests/shortCircuit.test.ts

View workflow job for this annotation

GitHub Actions / Test and lint

example/convex/tests/shortCircuit.test.ts > short-circuit with transaction limits > short-circuit produces partial batch result

AssertionError: expected undefined to be false // Object.is equality - Expected: false + Received: undefined ❯ example/convex/tests/shortCircuit.test.ts:88:27
// Cursor should have advanced (paginator gives exact cursor).
expect(result.continueCursor).not.toBeNull();
});

test("short-circuited migration eventually completes all docs", async () => {
const t = initConvexTest({ transactionLimits: { documentsWritten: 10 } });
await seedDocs(t, 6);

// Start and let scheduler finish all batches.
await t.mutation(internal.tests.shortCircuit.setDefaultValueWithSchema, {
fn: "example:setDefaultValueWithSchema",
});
await t.finishAllScheduledFunctions(vi.runAllTimers);

await t.run(async (ctx) => {
const docs = await ctx.db.query("myTable").collect();
expect(docs).toHaveLength(6);
expect(docs.every((d) => d.optionalField !== undefined)).toBe(true);
});
});
});
20 changes: 20 additions & 0 deletions example/convex/tests/shortCircuit.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import { Migrations } from "@convex-dev/migrations";
import { components } from "../_generated/api";
import { internalMutation } from "../_generated/server";
import schema from "../schema";

export const migrationsWithSchema = new Migrations(components.migrations, {
internalMutation,
schema,
});

// Migration with schema (uses paginator for exact short-circuit cursors).
export const setDefaultValueWithSchema = migrationsWithSchema.define({
table: "myTable",
batchSize: 10,
migrateOne: async (_ctx, doc) => {
if (doc.optionalField === undefined) {
return { optionalField: "default" };
}
},
});
57 changes: 53 additions & 4 deletions src/client/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@
const componentFlag = metadata.componentPath
? ` --component ${metadata.componentPath}`
: "";
throw new Error(

Check failure on line 340 in src/client/index.ts

View workflow job for this annotation

GitHub Actions / Test and lint

example/convex/tests/shortCircuit.test.ts > short-circuit with transaction limits > short-circuited migration eventually completes all docs

Error: Function name mismatch: expected tests/shortCircuit:setDefaultValueWithSchema, got example:setDefaultValueWithSchema. Hint: you can call it directly with `npx convex run tests/shortCircuit:setDefaultValueWithSchema`. ❯ handler src/client/index.ts:340:19 ❯ invokeFunction node_modules/convex/src/server/impl/registration_impl.ts:90:13 ❯ invokeMutation node_modules/convex/src/server/impl/registration_impl.ts:70:17 ❯ runMutationWithHandler node_modules/convex-test/dist/index.js:1539:31 ❯ Object.mutationFromPath node_modules/convex-test/dist/index.js:1668:32 ❯ mutation node_modules/convex-test/dist/index.js:1714:20 ❯ example/convex/tests/shortCircuit.test.ts:98:5

Check failure on line 340 in src/client/index.ts

View workflow job for this annotation

GitHub Actions / Test and lint

example/convex/tests/shortCircuit.test.ts > short-circuit with transaction limits > migration processes full batches when limits are generous

Error: Function name mismatch: expected tests/shortCircuit:setDefaultValueWithSchema, got example:setDefaultValueWithSchema. Hint: you can call it directly with `npx convex run tests/shortCircuit:setDefaultValueWithSchema`. ❯ handler src/client/index.ts:340:19 ❯ invokeFunction node_modules/convex/src/server/impl/registration_impl.ts:90:13 ❯ invokeMutation node_modules/convex/src/server/impl/registration_impl.ts:70:17 ❯ runMutationWithHandler node_modules/convex-test/dist/index.js:1539:31 ❯ Object.mutationFromPath node_modules/convex-test/dist/index.js:1668:32 ❯ mutation node_modules/convex-test/dist/index.js:1714:20 ❯ example/convex/tests/shortCircuit.test.ts:56:5

Check failure on line 340 in src/client/index.ts

View workflow job for this annotation

GitHub Actions / Test and lint

example/convex/tests/shortCircuit.test.ts > short-circuit with transaction limits > migration short-circuits and completes with tight write limits

Error: Function name mismatch: expected tests/shortCircuit:setDefaultValueWithSchema, got example:setDefaultValueWithSchema. Hint: you can call it directly with `npx convex run tests/shortCircuit:setDefaultValueWithSchema`. ❯ handler src/client/index.ts:340:19 ❯ invokeFunction node_modules/convex/src/server/impl/registration_impl.ts:90:13 ❯ invokeMutation node_modules/convex/src/server/impl/registration_impl.ts:70:17 ❯ runMutationWithHandler node_modules/convex-test/dist/index.js:1539:31 ❯ Object.mutationFromPath node_modules/convex-test/dist/index.js:1668:32 ❯ mutation node_modules/convex-test/dist/index.js:1714:20 ❯ example/convex/tests/shortCircuit.test.ts:37:5
`Function name mismatch: expected ${metadata.name}, got ${args.fn}. Hint: you can call it directly with \`npx convex run ${metadata.name}${componentFlag}\`.`,
);
}
Expand Down Expand Up @@ -406,6 +406,7 @@
({ continueCursor, page, isDone } = await range.paginate({
cursor: args.cursor,
numItems,
maximumBytesRead: 2 << 20, // 2 MiB
}));
} catch (e) {
console.error(
Expand Down Expand Up @@ -433,17 +434,55 @@
throw error;
}
}
let processedCount = 0;
let shortCircuited = false;
if (parallelize) {
await Promise.all(page.map(doOne));
processedCount = page.length;
} else {
for (const doc of page) {
// Only short-circuit when using the new paginator, since we
// need to re-paginate for an exact cursor. Without it we'd
// return the same cursor and re-process documents.
if (useNewPaginator && processedCount > 0) {
const metrics = await getTransactionMetrics();
if (isNearTransactionLimits(metrics)) {
const remaining = page.length - processedCount;
console.log(
`Short-circuiting batch after processing ` +
`${processedCount}/${page.length} documents due to ` +
`transaction limits. ${remaining} documents will be ` +
`retried in the next batch.`,
);
shortCircuited = true;
break;
}
}
await doOne(doc);
processedCount++;
}
}
const result = {
continueCursor,
isDone,
processed: page.length,
let shortCircuitCursor: string = continueCursor;
if (shortCircuited && processedCount > 0) {
// Re-paginate with the exact count to get a precise cursor.
const q2 = paginator(ctx.db as any, paginatorSchema as any).query(
table,
) as unknown as QueryInitializer<
NamedTableInfo<DataModel, TableName>
>;
const range2 = customRange ? customRange(q2) : q2;
const { continueCursor: exactCursor } = await (
range2 as any
).paginate({
cursor: args.cursor,
numItems: processedCount,
});
shortCircuitCursor = exactCursor;
}
const result: MigrationResult = {
continueCursor: shortCircuited ? shortCircuitCursor : continueCursor,
isDone: shortCircuited ? false : isDone,
processed: processedCount,
};
if (args.dryRun) {
// Throwing an error rolls back the transaction
Expand Down Expand Up @@ -739,6 +778,7 @@
const address = getFunctionAddress(fnRef);
const fnHandle =
address.functionHandle ?? (await createFunctionHandle(fnRef));
let previousProcessed = 0;
while (true) {
const status = await ctx.runMutation(component.lib.migrate, {
name,
Expand Down Expand Up @@ -780,6 +820,15 @@
return typeof cursor === "string" && cursor.startsWith("[");
}

function isNearTransactionLimits(metrics: TransactionMetrics): boolean {
for (const metric of Object.values(metrics) as TransactionMetric[]) {
if (metric.remaining < metric.used) {
return true;
}
}
return false;
}

/* Type utils follow */

type QueryCtx = Pick<GenericQueryCtx<GenericDataModel>, "runQuery">;
Expand Down
Loading