From 26eca8f826e9c6e5e4e6e2c928a99ee2ab53c1e0 Mon Sep 17 00:00:00 2001 From: Ian Macartney <366683+ianmacartney@users.noreply.github.com> Date: Thu, 16 Apr 2026 00:05:47 -0700 Subject: [PATCH 1/3] restore --- README.md | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/README.md b/README.md index 410acf3..0d561a9 100644 --- a/README.md +++ b/README.md @@ -415,6 +415,24 @@ await migrations.runOne(ctx, internal.migrations.clearField, { }); ``` +### Automatic bandwidth management + +Migrations monitor transaction bandwidth usage during batch processing. If a +batch approaches +[transaction limits](https://docs.convex.dev/production/state/limits#transactions), +it may stop early and let the remaining documents be picked up in the next +batch. + +This helps avoid hitting limits when your `migrateOne` function performs +expensive operations (reading related documents, writing to multiple tables, +etc.). + +This works automatically with no configuration needed. If you frequently see +batches ending early, consider reducing the batch size for that migration. + +**Note**: bandwidth monitoring is only available for sequential (non-parallel) +batch processing. + ### Parallelizing batches Each batch is processed serially, but within a batch you can have each From 2dc6acced8d08c81ab680b5f0514926825401d83 Mon Sep 17 00:00:00 2001 From: Ian Macartney <366683+ianmacartney@users.noreply.github.com> Date: Wed, 15 Apr 2026 21:23:09 -0700 Subject: [PATCH 2/3] test short circuiting --- example/convex/_generated/api.d.ts | 2 + example/convex/setup.test.ts | 4 +- example/convex/tests/shortCircuit.test.ts | 109 ++++++++++++++++++++++ example/convex/tests/shortCircuit.ts | 20 ++++ 4 files changed, 133 insertions(+), 2 deletions(-) create mode 100644 example/convex/tests/shortCircuit.test.ts create mode 100644 example/convex/tests/shortCircuit.ts diff --git a/example/convex/_generated/api.d.ts b/example/convex/_generated/api.d.ts index efd266b..2386890 100644 --- a/example/convex/_generated/api.d.ts +++ b/example/convex/_generated/api.d.ts @@ -9,6 +9,7 @@ */ import type * as example from "../example.js"; +import type * as tests_shortCircuit from "../tests/shortCircuit.js"; import type { ApiFromModules, @@ -18,6 +19,7 @@ import type { declare const fullApi: ApiFromModules<{ example: typeof example; + "tests/shortCircuit": typeof tests_shortCircuit; }>; /** diff --git a/example/convex/setup.test.ts b/example/convex/setup.test.ts index 9a982e7..7d445a3 100644 --- a/example/convex/setup.test.ts +++ b/example/convex/setup.test.ts @@ -7,8 +7,8 @@ import component from "@convex-dev/migrations/test"; const modules = import.meta.glob("./**/*.*s"); // When users want to write tests that use your component, they need to // explicitly register it with its schema and modules. -export function initConvexTest() { - const t = convexTest(schema, modules); +export function initConvexTest(opts?: Parameters[0]) { + const t = convexTest({ ...opts, schema, modules }); component.register(t); return t; } diff --git a/example/convex/tests/shortCircuit.test.ts b/example/convex/tests/shortCircuit.test.ts new file mode 100644 index 0000000..6706e2f --- /dev/null +++ b/example/convex/tests/shortCircuit.test.ts @@ -0,0 +1,109 @@ +import { afterEach, beforeEach, describe, expect, test, vi } from "vitest"; +import { initConvexTest } from "../setup.test"; +import { internal } from "../_generated/api"; + +/** + * Seed documents that all need migration (optionalField undefined). + */ +async function seedDocs(t: ReturnType, count: number) { + await t.run(async (ctx) => { + for (let i = 0; i < count; i++) { + await ctx.db.insert("myTable", { + requiredField: "seed " + i, + optionalField: undefined, + unionField: "1", + }); + } + }); +} + +describe("short-circuit with transaction limits", () => { + beforeEach(() => { + vi.useFakeTimers(); + }); + afterEach(() => { + vi.useRealTimers(); + }); + + test("migration short-circuits and completes with tight write limits", async () => { + // documentsWritten limit of 10: + // Each batch: component state insert(1) + migration patches + state patch(1) + // + schedule. With batchSize 5, after ~4 patches: used > remaining, + // triggering short-circuit. Each scheduled batch gets a fresh budget. + const t = initConvexTest({ transactionLimits: { documentsWritten: 10 } }); + await seedDocs(t, 10); + + // Start migration via the runner (goes through component for scheduling). + await t.mutation(internal.tests.shortCircuit.setDefaultValueWithSchema, { + fn: "example:setDefaultValueWithSchema", + }); + + // Let all scheduled batches run to completion. + await t.finishAllScheduledFunctions(vi.runAllTimers); + + // Verify all documents were migrated despite short-circuiting. + await t.run(async (ctx) => { + const docs = await ctx.db.query("myTable").collect(); + expect(docs).toHaveLength(10); + expect(docs.every((d) => d.optionalField !== undefined)).toBe(true); + }); + }); + + test("migration processes full batches when limits are generous", async () => { + const t = initConvexTest({ transactionLimits: true }); + await seedDocs(t, 10); + + await t.mutation(internal.tests.shortCircuit.setDefaultValueWithSchema, { + fn: "example:setDefaultValueWithSchema", + }); + await t.finishAllScheduledFunctions(vi.runAllTimers); + + await t.run(async (ctx) => { + const docs = await ctx.db.query("myTable").collect(); + expect(docs).toHaveLength(10); + expect(docs.every((d) => d.optionalField !== undefined)).toBe(true); + }); + }); + + test("short-circuit produces partial batch result", async () => { + // With documentsWritten: 10 and batchSize 10 on 10 docs: + // After ~5 patches, used(5) > remaining(5) is false, but after + // patch 6: used(6) > remaining(4) → short-circuit before doc 6. + const t = initConvexTest({ transactionLimits: { documentsWritten: 10 } }); + await seedDocs(t, 10); + + // Run a single batch directly (not through the component). + const result = await t.mutation( + internal.tests.shortCircuit.setDefaultValueWithSchema, + { + cursor: null, + batchSize: 10, + dryRun: false, + }, + ); + + // Should have short-circuited: processed fewer than 10 docs. + expect(result.processed).toBeLessThan(10); + expect(result.processed).toBeGreaterThan(0); + expect(result.isDone).toBe(false); + // Cursor should have advanced (paginator gives exact cursor). + expect(result.continueCursor).not.toBeNull(); + }); + + test("short-circuited migration eventually completes all docs", async () => { + const t = initConvexTest({ transactionLimits: { documentsWritten: 10 } }); + await seedDocs(t, 6); + + // Start and let scheduler finish all batches. + await t.mutation(internal.tests.shortCircuit.setDefaultValueWithSchema, { + fn: "example:setDefaultValueWithSchema", + }); + await t.finishAllScheduledFunctions(vi.runAllTimers); + + await t.run(async (ctx) => { + const docs = await ctx.db.query("myTable").collect(); + expect(docs).toHaveLength(6); + expect(docs.every((d) => d.optionalField !== undefined)).toBe(true); + }); + }); +}); diff --git a/example/convex/tests/shortCircuit.ts b/example/convex/tests/shortCircuit.ts new file mode 100644 index 0000000..873c895 --- /dev/null +++ b/example/convex/tests/shortCircuit.ts @@ -0,0 +1,20 @@ +import { Migrations } from "@convex-dev/migrations"; +import { components } from "../_generated/api"; +import { internalMutation } from "../_generated/server"; +import schema from "../schema"; + +export const migrationsWithSchema = new Migrations(components.migrations, { + internalMutation, + schema, +}); + +// Migration with schema (uses paginator for exact short-circuit cursors). +export const setDefaultValueWithSchema = migrationsWithSchema.define({ + table: "myTable", + batchSize: 10, + migrateOne: async (_ctx, doc) => { + if (doc.optionalField === undefined) { + return { optionalField: "default" }; + } + }, +}); From d4c0381a096626cb37ed2849b8af7295ec5400b1 Mon Sep 17 00:00:00 2001 From: Ian Macartney <366683+ianmacartney@users.noreply.github.com> Date: Wed, 15 Apr 2026 22:59:51 -0700 Subject: [PATCH 3/3] wip short circuiting --- src/client/index.ts | 57 +++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 53 insertions(+), 4 deletions(-) diff --git a/src/client/index.ts b/src/client/index.ts index befb6c5..5db467d 100644 --- a/src/client/index.ts +++ b/src/client/index.ts @@ -406,6 +406,7 @@ export class Migrations< ({ continueCursor, page, isDone } = await range.paginate({ cursor: args.cursor, numItems, + maximumBytesRead: 2 << 20, // 2 MiB })); } catch (e) { console.error( @@ -433,17 +434,55 @@ export class Migrations< throw error; } } + let processedCount = 0; + let shortCircuited = false; if (parallelize) { await Promise.all(page.map(doOne)); + processedCount = page.length; } else { for (const doc of page) { + // Only short-circuit when using the new paginator, since we + // need to re-paginate for an exact cursor. Without it we'd + // return the same cursor and re-process documents. + if (useNewPaginator && processedCount > 0) { + const metrics = await getTransactionMetrics(); + if (isNearTransactionLimits(metrics)) { + const remaining = page.length - processedCount; + console.log( + `Short-circuiting batch after processing ` + + `${processedCount}/${page.length} documents due to ` + + `transaction limits. ${remaining} documents will be ` + + `retried in the next batch.`, + ); + shortCircuited = true; + break; + } + } await doOne(doc); + processedCount++; } } - const result = { - continueCursor, - isDone, - processed: page.length, + let shortCircuitCursor: string = continueCursor; + if (shortCircuited && processedCount > 0) { + // Re-paginate with the exact count to get a precise cursor. + const q2 = paginator(ctx.db as any, paginatorSchema as any).query( + table, + ) as unknown as QueryInitializer< + NamedTableInfo + >; + const range2 = customRange ? customRange(q2) : q2; + const { continueCursor: exactCursor } = await ( + range2 as any + ).paginate({ + cursor: args.cursor, + numItems: processedCount, + }); + shortCircuitCursor = exactCursor; + } + const result: MigrationResult = { + continueCursor: shortCircuited ? shortCircuitCursor : continueCursor, + isDone: shortCircuited ? false : isDone, + processed: processedCount, }; if (args.dryRun) { // Throwing an error rolls back the transaction @@ -739,6 +778,7 @@ export async function runToCompletion( const address = getFunctionAddress(fnRef); const fnHandle = address.functionHandle ?? (await createFunctionHandle(fnRef)); + let previousProcessed = 0; while (true) { const status = await ctx.runMutation(component.lib.migrate, { name, @@ -780,6 +820,15 @@ export function isNewFormatCursor(cursor: string | null): boolean { return typeof cursor === "string" && cursor.startsWith("["); } +function isNearTransactionLimits(metrics: TransactionMetrics): boolean { + for (const metric of Object.values(metrics) as TransactionMetric[]) { + if (metric.remaining < metric.used) { + return true; + } + } + return false; +} + /* Type utils follow */ type QueryCtx = Pick, "runQuery">;