From 78a787165bfb3284b1dbaae62ded9375dd5644b2 Mon Sep 17 00:00:00 2001 From: Fabien Date: Thu, 26 Feb 2026 15:33:45 +0100 Subject: [PATCH 1/8] Store transaction inputs/outputs address as string This release some constraints on the database and improve the performances. --- .../migration.sql | 44 +++++++++++++++++++ prisma-local/schema.prisma | 10 +---- services/chronikService.ts | 31 +++---------- services/transactionService.ts | 16 +++---- tests/unittests/transactionService.test.ts | 12 ++--- 5 files changed, 67 insertions(+), 46 deletions(-) create mode 100644 prisma-local/migrations/20260223150000_change_transaction_input_output_address_to_string/migration.sql diff --git a/prisma-local/migrations/20260223150000_change_transaction_input_output_address_to_string/migration.sql b/prisma-local/migrations/20260223150000_change_transaction_input_output_address_to_string/migration.sql new file mode 100644 index 00000000..65f604fb --- /dev/null +++ b/prisma-local/migrations/20260223150000_change_transaction_input_output_address_to_string/migration.sql @@ -0,0 +1,44 @@ +-- Drop foreign key constraints +ALTER TABLE `TransactionInput` DROP FOREIGN KEY `TransactionInput_addressId_fkey`; +ALTER TABLE `TransactionOutput` DROP FOREIGN KEY `TransactionOutput_addressId_fkey`; + +-- Drop old indexes +ALTER TABLE `TransactionInput` DROP INDEX `TransactionInput_addressId_idx`; +ALTER TABLE `TransactionOutput` DROP INDEX `TransactionOutput_addressId_idx`; + +-- Add new address column (nullable temporarily) +ALTER TABLE `TransactionInput` ADD COLUMN `address` VARCHAR(255) NULL; +ALTER TABLE `TransactionOutput` ADD COLUMN `address` VARCHAR(255) NULL; + +-- Populate address column from Address table +UPDATE `TransactionInput` ti +INNER JOIN `Address` a ON ti.`addressId` = a.`id` +SET ti.`address` = a.`address`; + +UPDATE `TransactionOutput` tout +INNER JOIN `Address` a ON tout.`addressId` = a.`id` +SET tout.`address` = a.`address`; + +-- Make address column NOT NULL +ALTER TABLE `TransactionInput` MODIFY COLUMN `address` VARCHAR(255) NOT NULL; +ALTER TABLE `TransactionOutput` MODIFY COLUMN `address` VARCHAR(255) NOT NULL; + +-- Delete Address entries that were only used by TransactionInput/TransactionOutput +-- These addresses are no longer needed since we store addresses as strings +DELETE a FROM `Address` a +WHERE ( + -- Address is referenced in TransactionInput or TransactionOutput + EXISTS (SELECT 1 FROM `TransactionInput` ti WHERE ti.`addressId` = a.`id`) + OR EXISTS (SELECT 1 FROM `TransactionOutput` tout WHERE tout.`addressId` = a.`id`) +) +AND NOT ( + -- But exclude addresses that are still used elsewhere + EXISTS (SELECT 1 FROM `Transaction` t WHERE t.`addressId` = a.`id`) + OR EXISTS (SELECT 1 FROM `AddressesOnUserProfiles` aup WHERE aup.`addressId` = a.`id`) + OR EXISTS (SELECT 1 FROM `AddressesOnButtons` ab WHERE ab.`addressId` = a.`id`) + OR EXISTS (SELECT 1 FROM `ClientPayment` cp WHERE cp.`addressString` = a.`address`) +); + +-- Drop old addressId column +ALTER TABLE `TransactionInput` DROP COLUMN `addressId`; +ALTER TABLE `TransactionOutput` DROP COLUMN `addressId`; diff --git a/prisma-local/schema.prisma b/prisma-local/schema.prisma index 97c95e04..2abad1bc 100644 --- a/prisma-local/schema.prisma +++ b/prisma-local/schema.prisma @@ -23,8 +23,6 @@ model Address { paybuttons AddressesOnButtons[] transactions Transaction[] clientPayments ClientPayment[] - transactionInputs TransactionInput[] - transactionOutputs TransactionOutput[] @@index([networkId], map: "Address_networkId_fkey") } @@ -93,27 +91,23 @@ model Transaction { model TransactionInput { id String @id @default(dbgenerated("(uuid())")) transactionId String - addressId String + address String @db.VarChar(255) index Int transaction Transaction @relation(fields: [transactionId], references: [id], onDelete: Cascade) - address Address @relation(fields: [addressId], references: [id], onDelete: Cascade) amount Decimal @db.Decimal(24, 8) @@index([transactionId]) - @@index([addressId]) } model TransactionOutput { id String @id @default(dbgenerated("(uuid())")) transactionId String - addressId String + address String @db.VarChar(255) index Int transaction Transaction @relation(fields: [transactionId], references: [id], onDelete: Cascade) - address Address @relation(fields: [addressId], references: [id], onDelete: Cascade) amount Decimal @db.Decimal(24, 8) @@index([transactionId]) - @@index([addressId]) } model Wallet { diff --git a/services/chronikService.ts b/services/chronikService.ts index 63b708e0..8b08934e 100644 --- a/services/chronikService.ts +++ b/services/chronikService.ts @@ -20,7 +20,7 @@ import { import { Address, Prisma, ClientPaymentStatus } from '@prisma/client' import xecaddr from 'xecaddrjs' import { getAddressPrefix, satoshisToUnit } from 'utils/index' -import { fetchAddressesArray, fetchAllAddressesForNetworkId, getEarliestUnconfirmedTxTimestampForAddress, getLatestConfirmedTxTimestampForAddress, setSyncing, setSyncingBatch, updateLastSynced, updateManyLastSynced, upsertAddress } from './addressService' +import { fetchAddressesArray, fetchAllAddressesForNetworkId, getEarliestUnconfirmedTxTimestampForAddress, getLatestConfirmedTxTimestampForAddress, setSyncing, setSyncingBatch, updateLastSynced, updateManyLastSynced } from './addressService' import * as ws from 'ws' import { BroadcastTxData } from 'ws-service/types' import config from 'config' @@ -288,26 +288,9 @@ export class ChronikBlockchainClient { const inputAddresses = this.getSortedInputAddresses(transaction) const outputAddresses = this.getSortedOutputAddresses(transaction) - const uniqueAddressStrings = [...new Set([ - ...inputAddresses.map(({ address: addr }) => addr), - ...outputAddresses.map(({ address: addr }) => addr) - ])] - const addressIdMap = new Map() - await Promise.all( - uniqueAddressStrings.map(async (addrStr) => { - try { - const parsed = parseAddress(addrStr) - const addr = await upsertAddress(parsed) - addressIdMap.set(parsed, addr.id) - } catch { - // Skip invalid addresses: don't upsert, don't add to map - } - }) - ) - - const getAddressId = (addr: string): string | undefined => { + const parseAddressString = (addr: string): string | undefined => { try { - return addressIdMap.get(parseAddress(addr)) + return parseAddress(addr) } catch { return undefined } @@ -322,13 +305,13 @@ export class ChronikBlockchainClient { opReturn, inputs: { create: inputAddresses - .map(({ address: addr, amount: amt }, i) => ({ addressId: getAddressId(addr), index: i, amount: amt })) - .filter((item): item is { addressId: string, index: number, amount: Prisma.Decimal } => item.addressId !== undefined) + .map(({ address: addr, amount: amt }, i) => ({ address: parseAddressString(addr), index: i, amount: amt })) + .filter((item): item is { address: string, index: number, amount: Prisma.Decimal } => item.address !== undefined) }, outputs: { create: outputAddresses - .map(({ address: addr, amount: amt }, i) => ({ addressId: getAddressId(addr), index: i, amount: amt })) - .filter((item): item is { addressId: string, index: number, amount: Prisma.Decimal } => item.addressId !== undefined) + .map(({ address: addr, amount: amt }, i) => ({ address: parseAddressString(addr), index: i, amount: amt })) + .filter((item): item is { address: string, index: number, amount: Prisma.Decimal } => item.address !== undefined) } } } diff --git a/services/transactionService.ts b/services/transactionService.ts index 825772b9..e3d0dd6a 100644 --- a/services/transactionService.ts +++ b/services/transactionService.ts @@ -54,10 +54,10 @@ export function getSimplifiedTrasaction (tx: TransactionWithAddressAndPrices, in const parsedOpReturn = resolveOpReturn(opReturn) - const dbInputsArr = (tx as { inputs?: Array<{ address: { address: string }, amount: Prisma.Decimal }> }).inputs - const dbOutputsArr = (tx as { outputs?: Array<{ address: { address: string }, amount: Prisma.Decimal }> }).outputs - const resolvedInputAddresses = inputAddresses ?? (Array.isArray(dbInputsArr) ? dbInputsArr.map(i => ({ address: i.address.address, amount: i.amount })) : []) - const resolvedOutputAddresses = outputAddresses ?? (Array.isArray(dbOutputsArr) ? dbOutputsArr.map(o => ({ address: o.address.address, amount: o.amount })) : []) + const dbInputsArr = (tx as { inputs?: Array<{ address: string, amount: Prisma.Decimal }> }).inputs + const dbOutputsArr = (tx as { outputs?: Array<{ address: string, amount: Prisma.Decimal }> }).outputs + const resolvedInputAddresses = inputAddresses ?? (Array.isArray(dbInputsArr) ? dbInputsArr.map(i => ({ address: i.address, amount: i.amount })) : []) + const resolvedOutputAddresses = outputAddresses ?? (Array.isArray(dbOutputsArr) ? dbOutputsArr.map(o => ({ address: o.address, amount: o.amount })) : []) const simplifiedTransaction: SimplifiedTransaction = { hash, @@ -96,8 +96,8 @@ const includePrices = { const includeAddressAndPrices = { address: true, ...includePrices, - inputs: { include: { address: true }, orderBy: { index: 'asc' as const } }, - outputs: { include: { address: true }, orderBy: { index: 'asc' as const } } + inputs: { orderBy: { index: 'asc' as const } }, + outputs: { orderBy: { index: 'asc' as const } } } const transactionWithPrices = Prisma.validator()( @@ -137,8 +137,8 @@ const includePaybuttonsAndPrices = { } }, ...includePrices, - inputs: { include: { address: true }, orderBy: { index: 'asc' as const } }, - outputs: { include: { address: true }, orderBy: { index: 'asc' as const } } + inputs: { orderBy: { index: 'asc' as const } }, + outputs: { orderBy: { index: 'asc' as const } } } export const includePaybuttonsAndPricesAndInvoices = { ...includePaybuttonsAndPrices, diff --git a/tests/unittests/transactionService.test.ts b/tests/unittests/transactionService.test.ts index 0b88f919..4293dbc1 100644 --- a/tests/unittests/transactionService.test.ts +++ b/tests/unittests/transactionService.test.ts @@ -27,8 +27,8 @@ const includePaybuttonsAndPrices = { } }, ...includePrices, - inputs: { include: { address: true }, orderBy: { index: 'asc' as const } }, - outputs: { include: { address: true }, orderBy: { index: 'asc' as const } } + inputs: { orderBy: { index: 'asc' as const } }, + outputs: { orderBy: { index: 'asc' as const } } } describe('Create services', () => { @@ -199,12 +199,12 @@ describe('Address object arrays (input/output) integration', () => { it('getSimplifiedTrasaction uses inputs/outputs from tx when not provided explicitly', () => { const inputsFromDb = [ - { address: { address: 'ecash:qqinput1' }, amount: new Prisma.Decimal(1.23) }, - { address: { address: 'ecash:qqinput2' }, amount: new Prisma.Decimal(4.56) } + { address: 'ecash:qqinput1', amount: new Prisma.Decimal(1.23) }, + { address: 'ecash:qqinput2', amount: new Prisma.Decimal(4.56) } ] const outputsFromDb = [ - { address: { address: 'ecash:qqout1' }, amount: new Prisma.Decimal(7.89) }, - { address: { address: 'ecash:qqout2' }, amount: new Prisma.Decimal(0.12) } + { address: 'ecash:qqout1', amount: new Prisma.Decimal(7.89) }, + { address: 'ecash:qqout2', amount: new Prisma.Decimal(0.12) } ] const tx: any = { hash: 'hash1', From 90df86f63ec42cd169100731c91a9fb0f43ec8ac Mon Sep 17 00:00:00 2001 From: Fabien Date: Fri, 27 Feb 2026 15:38:01 +0100 Subject: [PATCH 2/8] Reduce concurrency For the chronik calls, and the db batch sizes. This avoids too long db transactions from impacting the performances and allow to start working faster on the chronik transactions. --- constants/index.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/constants/index.ts b/constants/index.ts index de1daea1..f4b630c0 100644 --- a/constants/index.ts +++ b/constants/index.ts @@ -277,9 +277,9 @@ export const MEMPOOL_PROCESS_DELAY = 100 // On chronik, the max allowed is 200 export const CHRONIK_FETCH_N_TXS_PER_PAGE = 200 -export const INITIAL_ADDRESS_SYNC_FETCH_CONCURRENTLY = 128 -export const TX_EMIT_BATCH_SIZE = 2_000 // for our generator, not chronik -export const DB_COMMIT_BATCH_SIZE = 2_000 // tamanho dos lotes para commit no DB +export const INITIAL_ADDRESS_SYNC_FETCH_CONCURRENTLY = 16 +export const TX_EMIT_BATCH_SIZE = 200 // for our generator, not chronik +export const DB_COMMIT_BATCH_SIZE = 200 // tamanho dos lotes para commit no DB export const TRIGGER_POST_CONCURRENCY = 100 export const TRIGGER_EMAIL_CONCURRENCY = 100 From 707f22126c9b0139b555429f0d0901381a20e254 Mon Sep 17 00:00:00 2001 From: Fabien Date: Fri, 27 Feb 2026 16:07:36 +0100 Subject: [PATCH 3/8] Update the logic in createManyTransactions This avoid initiating a DB transaction for each tx, and favor bulk operations instead. Since it also removes the batch processing this removes the connection limits to the db issue entirely when an address has too many txs. --- services/transactionService.ts | 229 ++++++++++++++++++++++++++++----- 1 file changed, 194 insertions(+), 35 deletions(-) diff --git a/services/transactionService.ts b/services/transactionService.ts index e3d0dd6a..8cc44206 100644 --- a/services/transactionService.ts +++ b/services/transactionService.ts @@ -558,47 +558,209 @@ export async function connectAllTransactionsToPrices (): Promise { console.log('[PRICES] Finished connecting txs to prices.') } -interface TxDistinguished { - tx: TransactionWithNetwork - isCreated: boolean -} - export async function createManyTransactions ( transactionsData: Prisma.TransactionUncheckedCreateInput[] ): Promise { - const insertedTransactionsDistinguished: TxDistinguished[] = [] + if (transactionsData.length === 0) { + return [] + } + + // Extract flat transaction data and separate inputs/outputs + const flatTxData = transactionsData.map(tx => ({ + hash: tx.hash, + amount: tx.amount, + timestamp: tx.timestamp, + addressId: tx.addressId, + confirmed: tx.confirmed ?? false, + opReturn: tx.opReturn ?? '', + orphaned: false + })) + + const txInputsOutputs = transactionsData.map((tx) => { + const inputs = (tx.inputs != null) && 'create' in tx.inputs ? tx.inputs.create : [] + const outputs = (tx.outputs != null) && 'create' in tx.outputs ? tx.outputs.create : [] + return { + hash: tx.hash, + addressId: tx.addressId, + inputs: Array.isArray(inputs) ? inputs : [], + outputs: Array.isArray(outputs) ? outputs : [] + } + }) + + const insertedTransactions: TransactionWithNetwork[] = [] + const updatedTransactions: TransactionWithNetwork[] = [] await prisma.$transaction( async (prisma) => { - const BATCH_SIZE = 50 - for (let i = 0; i < transactionsData.length; i += BATCH_SIZE) { - const batch = transactionsData.slice(i, i + BATCH_SIZE) - - const results = await Promise.all( - batch.map(async (tx) => - await prisma.transaction.upsert({ - create: tx, - where: { - Transaction_hash_addressId_unique_constraint: { - hash: tx.hash, - addressId: tx.addressId - } - }, - update: { - confirmed: tx.confirmed, - timestamp: tx.timestamp - }, - include: includeNetwork + // 1. Query existing transactions in one go + const existingTxs = await prisma.transaction.findMany({ + where: { + OR: flatTxData.map(tx => ({ + hash: tx.hash, + addressId: tx.addressId + })) + }, + select: { + id: true, + hash: true, + addressId: true, + confirmed: true, + timestamp: true, + orphaned: true + } + }) + + // Create a map for quick lookup + const existingMap = new Map() + for (const tx of existingTxs) { + existingMap.set(`${tx.hash}:${tx.addressId}`, tx) + } + + // 2. Split into new and existing transactions + const newTxs: typeof flatTxData = [] + const newTxsInputsOutputs: typeof txInputsOutputs = [] + const toUpdate: Array<{ + id: string + confirmed: boolean + timestamp: number + orphaned: boolean + }> = [] + + for (let i = 0; i < flatTxData.length; i++) { + const tx = flatTxData[i] + const key = `${tx.hash}:${tx.addressId}` + const existing = existingMap.get(key) + + if (existing != null) { + // Check if confirmed, timestamp, or orphaned changed. These are the + // only fields that can be changed after the transaction is created. + // This is to avoid updating the transaction with the same data. + const confirmedChanged = existing.confirmed !== tx.confirmed + const timestampChanged = existing.timestamp !== tx.timestamp + const orphanedChanged = existing.orphaned !== tx.orphaned + + if (confirmedChanged || timestampChanged || orphanedChanged) { + toUpdate.push({ + id: existing.id, + confirmed: tx.confirmed, + timestamp: tx.timestamp, + orphaned: tx.orphaned }) - ) - ) + } + } else { + newTxs.push(tx) + newTxsInputsOutputs.push(txInputsOutputs[i]) + } + } + + // 3. Create new transactions using createMany (bulk operation) + if (newTxs.length > 0) { + // Create all transactions at once (without inputs/outputs) + await prisma.transaction.createMany({ + data: newTxs, + skipDuplicates: true + }) + + // Query back the created transactions to get their IDs + const createdTxs = await prisma.transaction.findMany({ + where: { + OR: newTxs.map(tx => ({ + hash: tx.hash, + addressId: tx.addressId + })) + }, + select: { + id: true, + hash: true, + addressId: true, + address: { + select: { + networkId: true + } + } + } + }) + + // Create a map to match transactions with their inputs/outputs + const txMap = new Map() + for (let i = 0; i < newTxs.length; i++) { + const tx = newTxs[i] + const created = createdTxs.find(ct => ct.hash === tx.hash && ct.addressId === tx.addressId) + if (created != null) { + txMap.set(`${tx.hash}:${tx.addressId}`, { + tx: created as any, + inputs: newTxsInputsOutputs[i].inputs, + outputs: newTxsInputsOutputs[i].outputs + }) + } + } + + // Create all inputs at once + const allInputs: Array<{ transactionId: string, address: string, index: number, amount: Prisma.Decimal }> = [] + for (const [, { tx, inputs }] of txMap) { + for (const input of inputs) { + allInputs.push({ + transactionId: tx.id, + address: input.address, + index: input.index, + amount: input.amount instanceof Prisma.Decimal ? input.amount : new Prisma.Decimal(input.amount as string | number) + }) + } + } + + if (allInputs.length > 0) { + await prisma.transactionInput.createMany({ + data: allInputs, + skipDuplicates: true + }) + } + + // Create all outputs at once + const allOutputs: Array<{ transactionId: string, address: string, index: number, amount: Prisma.Decimal }> = [] + for (const [, { tx, outputs }] of txMap) { + for (const output of outputs) { + allOutputs.push({ + transactionId: tx.id, + address: output.address, + index: output.index, + amount: output.amount instanceof Prisma.Decimal ? output.amount : new Prisma.Decimal(output.amount as string | number) + }) + } + } - for (const upsertedTx of results) { - insertedTransactionsDistinguished.push({ - tx: upsertedTx, - isCreated: upsertedTx.createdAt.getTime() === upsertedTx.updatedAt.getTime() + if (allOutputs.length > 0) { + await prisma.transactionOutput.createMany({ + data: allOutputs, + skipDuplicates: true }) } + + // Fetch the full transactions with includes for return value + const fullTxs = await prisma.transaction.findMany({ + where: { + id: { in: createdTxs.map(t => t.id) } + }, + include: includeNetwork + }) + + insertedTransactions.push(...fullTxs) + } + + // 4. Update existing transactions that changed + if (toUpdate.length > 0) { + const updatePromises = toUpdate.map(async update => + await prisma.transaction.update({ + where: { id: update.id }, + data: { + confirmed: update.confirmed, + timestamp: update.timestamp, + orphaned: update.orphaned + }, + include: includeNetwork + }) + ) + const updated = await Promise.all(updatePromises) + updatedTransactions.push(...updated) } }, { @@ -606,10 +768,7 @@ export async function createManyTransactions ( } ) - const insertedTransactions = insertedTransactionsDistinguished - .filter((txD) => txD.isCreated) - .map((txD) => txD.tx) - + // 5. Connect prices only for newly created transactions await connectTransactionsListToPrices(insertedTransactions) const txsWithPaybuttonsAndPrices = await fetchTransactionsWithPaybuttonsAndPricesForIdList(insertedTransactions.map((tx) => tx.id)) From 5fec2a5b421335f78dca9b9a880d9185cc9655c7 Mon Sep 17 00:00:00 2001 From: Fabien Date: Fri, 27 Feb 2026 16:18:18 +0100 Subject: [PATCH 4/8] Optimize getTransactionFromChronikTransaction This is a 2-step optimization: - remove unneeded async in satoshisToUnit() callsites, this reduce locks - optimize the inputs and outputs processing by avoiding looping several times, avoid copying arrays and doing redundant formatting/checks. This also fixes a logging error and a loop exit condition that could potentially cause an infinite loop. Inspired by https://github.com/PayButton/paybutton-server/commit/6298e8451afd74358fd4481ef216c4faad802419 and https://github.com/PayButton/paybutton-server/commit/c4cb339af784cf608b2750d62a95fb6886dd3a08. This is a ~500x improvement on my machine. --- pages/api/address/balance/[address].ts | 2 +- services/chronikService.ts | 87 ++++++++++++-------------- utils/index.ts | 2 +- 3 files changed, 43 insertions(+), 48 deletions(-) diff --git a/pages/api/address/balance/[address].ts b/pages/api/address/balance/[address].ts index 6bc0b3ca..93801e95 100644 --- a/pages/api/address/balance/[address].ts +++ b/pages/api/address/balance/[address].ts @@ -17,7 +17,7 @@ export default async (req: NextApiRequest, res: NextApiResponse): Promise try { const address = parseAddress(req.query.address as string) const response = await multiBlockchainClient.getBalance(address) - const balance = await satoshisToUnit(response, xecaddr.detectAddressFormat(address)) + const balance = satoshisToUnit(response, xecaddr.detectAddressFormat(address)) res.status(200).send(balance) } catch (err: any) { switch (err.message) { diff --git a/services/chronikService.ts b/services/chronikService.ts index 8b08934e..2a48a463 100644 --- a/services/chronikService.ts +++ b/services/chronikService.ts @@ -26,7 +26,7 @@ import { BroadcastTxData } from 'ws-service/types' import config from 'config' import io, { Socket } from 'socket.io-client' import moment from 'moment' -import { OpReturnData, parseAddress, parseError, parseOpReturnData } from 'utils/validators' +import { OpReturnData, parseError, parseOpReturnData } from 'utils/validators' import { executeAddressTriggers, executeTriggersBatch } from './triggerService' import { appendTxsToFile } from 'prisma-local/seeds/transactions' import { PHASE_PRODUCTION_BUILD } from 'next/dist/shared/lib/constants' @@ -251,7 +251,7 @@ export class ChronikBlockchainClient { } } - private async getTransactionAmountAndData (transaction: Tx, addressString: string): Promise<{amount: Prisma.Decimal, opReturn: string}> { + private getTransactionAmountAndData (transaction: Tx, addressString: string): {amount: Prisma.Decimal, opReturn: string} { let totalOutput = 0n let totalInput = 0n const addressFormat = xecaddr.detectAddressFormat(addressString) @@ -271,31 +271,27 @@ export class ChronikBlockchainClient { } } } + for (const input of transaction.inputs) { if (input?.outputScript?.includes(script) === true) { totalInput += input.sats } } + const satoshis = totalOutput - totalInput + const amount = satoshisToUnit(satoshis, addressFormat) + return { - amount: await satoshisToUnit(satoshis, addressFormat), + amount, opReturn } } - private async getTransactionFromChronikTransaction (transaction: Tx, address: Address): Promise { - const { amount, opReturn } = await this.getTransactionAmountAndData(transaction, address.address) + private getTransactionFromChronikTransaction (transaction: Tx, address: Address): Prisma.TransactionUncheckedCreateInput { + const { amount, opReturn } = this.getTransactionAmountAndData(transaction, address.address) const inputAddresses = this.getSortedInputAddresses(transaction) const outputAddresses = this.getSortedOutputAddresses(transaction) - const parseAddressString = (addr: string): string | undefined => { - try { - return parseAddress(addr) - } catch { - return undefined - } - } - return { hash: transaction.txid, amount, @@ -305,13 +301,9 @@ export class ChronikBlockchainClient { opReturn, inputs: { create: inputAddresses - .map(({ address: addr, amount: amt }, i) => ({ address: parseAddressString(addr), index: i, amount: amt })) - .filter((item): item is { address: string, index: number, amount: Prisma.Decimal } => item.address !== undefined) }, outputs: { create: outputAddresses - .map(({ address: addr, amount: amt }, i) => ({ address: parseAddressString(addr), index: i, amount: amt })) - .filter((item): item is { address: string, index: number, amount: Prisma.Decimal } => item.address !== undefined) } } } @@ -367,11 +359,15 @@ export class ChronikBlockchainClient { pageTxs = [] } - if (pageTxs.length === 0) { + if (pageIndex === 0 && pageTxs.length === 0) { console.log(`${addrLogPrefix} EMPTY ADDRESS`) break } + if (pageTxs.length < CHRONIK_FETCH_N_TXS_PER_PAGE) { + hasReachedStoppingCondition = true + } + const newestTs = Number(pageTxs[0].block?.timestamp ?? pageTxs[0].timeFirstSeen) if (newestTs < lastSyncedTimestampSeconds) { @@ -455,9 +451,7 @@ export class ChronikBlockchainClient { page += 1 - const transactionsToPersist = await Promise.all( - [...confirmedTransactions, ...unconfirmedTransactions].map(async tx => await this.getTransactionFromChronikTransaction(tx, address)) - ) + const transactionsToPersist = [...confirmedTransactions, ...unconfirmedTransactions].map(tx => this.getTransactionFromChronikTransaction(tx, address)) const persistedTransactions = await createManyTransactions(transactionsToPersist) if (persistedTransactions.length > 0) { const simplifiedTransactions = getSimplifiedTransactions(persistedTransactions) @@ -532,7 +526,7 @@ export class ChronikBlockchainClient { } } - private getSortedInputAddresses (transaction: Tx): Array<{address: string, amount: Prisma.Decimal}> { + private getSortedInputAddresses (transaction: Tx): Array<{address: string, index: number, amount: Prisma.Decimal}> { const addressSatsMap = new Map() transaction.inputs.forEach((inp) => { const address = outputScriptToAddress(this.networkSlug, inp.outputScript) @@ -544,16 +538,18 @@ export class ChronikBlockchainClient { const unitDivisor = this.networkId === XEC_NETWORK_ID ? 1e2 : (this.networkId === BCH_NETWORK_ID ? 1e8 : 1) - const sortedInputAddresses = Array.from(addressSatsMap.entries()) - .sort(([, valueA], [, valueB]) => Number(valueB - valueA)) - return sortedInputAddresses.map(([address, sats]) => { + const result: Array<{address: string, index: number, amount: Prisma.Decimal}> = [] + let index = 0 + for (const [address, sats] of addressSatsMap.entries()) { const decimal = new Prisma.Decimal(sats.toString()) const amount = decimal.dividedBy(unitDivisor) - return { address, amount } - }) + result.push({ address, index, amount }) + index++ + } + return result } - private getSortedOutputAddresses (transaction: Tx): Array<{address: string, amount: Prisma.Decimal}> { + private getSortedOutputAddresses (transaction: Tx): Array<{address: string, index: number, amount: Prisma.Decimal}> { const addressSatsMap = new Map() transaction.outputs.forEach((out) => { const address = outputScriptToAddress(this.networkSlug, out.outputScript) @@ -565,14 +561,15 @@ export class ChronikBlockchainClient { const unitDivisor = this.networkId === XEC_NETWORK_ID ? 1e2 : (this.networkId === BCH_NETWORK_ID ? 1e8 : 1) - const sortedOutputAddresses = Array.from(addressSatsMap.entries()) - .sort(([, valueA], [, valueB]) => Number(valueB - valueA)) - .map(([address, sats]) => { - const decimal = new Prisma.Decimal(sats.toString()) - const amount = decimal.dividedBy(unitDivisor) - return { address, amount } - }) - return sortedOutputAddresses + const result: Array<{address: string, index: number, amount: Prisma.Decimal}> = [] + let index = 0 + for (const [address, sats] of addressSatsMap.entries()) { + const decimal = new Prisma.Decimal(sats.toString()) + const amount = decimal.dividedBy(unitDivisor) + result.push({ address, index, amount }) + index++ + } + return result } public async waitForSyncing (txId: string, addressStringArray: string[]): Promise { @@ -764,14 +761,14 @@ export class ChronikBlockchainClient { private async getAddressesForTransaction (transaction: Tx): Promise { const relatedAddresses = this.getRelatedAddressesForTransaction(transaction) const addressesFromStringArray = await fetchAddressesArray(relatedAddresses) - const addressesWithTransactions: AddressWithTransaction[] = await Promise.all(addressesFromStringArray.map( - async address => { + const addressesWithTransactions: AddressWithTransaction[] = addressesFromStringArray.map( + address => { return { address, - transaction: await this.getTransactionFromChronikTransaction(transaction, address) + transaction: this.getTransactionFromChronikTransaction(transaction, address) } } - )) + ) const zero = new Prisma.Decimal(0) return addressesWithTransactions.filter( addressWithTransaction => !(zero.equals(addressWithTransaction.transaction.amount as Prisma.Decimal)) @@ -835,12 +832,10 @@ export class ChronikBlockchainClient { const involvedAddrIds = new Set(batch.chronikTxs.map(({ address }) => address.id)) try { - const pairsFromBatch: RowWithRaw[] = await Promise.all( - batch.chronikTxs.map(async ({ tx, address }) => { - const row = await this.getTransactionFromChronikTransaction(tx, address) - return { row, raw: tx } - }) - ) + const pairsFromBatch: RowWithRaw[] = batch.chronikTxs.map(({ tx, address }) => { + const row = this.getTransactionFromChronikTransaction(tx, address) + return { row, raw: tx } + }) for (const { row } of pairsFromBatch) { perAddrCount.set(row.addressId, (perAddrCount.get(row.addressId) ?? 0) + 1) diff --git a/utils/index.ts b/utils/index.ts index 140a2729..21fcfff3 100644 --- a/utils/index.ts +++ b/utils/index.ts @@ -49,7 +49,7 @@ export const getAddressPrefixed = function (addressString: string): string { return `${getAddressPrefix(addressString)}:${removeAddressPrefix(addressString)}` } -export async function satoshisToUnit (satoshis: bigint, networkFormat: string): Promise { +export function satoshisToUnit (satoshis: bigint, networkFormat: string): Prisma.Decimal { const decimal = new Prisma.Decimal(satoshis.toString()) if (networkFormat === xecaddr.Format.Xecaddr) { return decimal.dividedBy(1e2) From 1a18e66a807e4f4fe1cbda759c0ecb29265fd661 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Estev=C3=A3o?= Date: Thu, 26 Feb 2026 10:56:09 -0300 Subject: [PATCH 5/8] fix: one network at a time --- services/chronikService.ts | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/services/chronikService.ts b/services/chronikService.ts index 2a48a463..665e2677 100644 --- a/services/chronikService.ts +++ b/services/chronikService.ts @@ -1156,10 +1156,8 @@ class MultiBlockchainClient { public async syncMissedTransactions (): Promise { await this.waitForStart() - await Promise.all([ - this.clients.ecash.syncMissedTransactions(), - this.clients.bitcoincash.syncMissedTransactions() - ]) + await this.clients.ecash.syncMissedTransactions() + await this.clients.bitcoincash.syncMissedTransactions() } public async syncAndSubscribeAddresses (addresses: Address[]): Promise { From 0de5060524dc259aa5ed53dae4c46fe5219ba3fb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Estev=C3=A3o?= Date: Thu, 26 Feb 2026 10:58:51 -0300 Subject: [PATCH 6/8] fix: current prices after initial sync --- jobs/initJobs.ts | 29 +++++++++++++++-------------- jobs/workers.ts | 3 ++- 2 files changed, 17 insertions(+), 15 deletions(-) diff --git a/jobs/initJobs.ts b/jobs/initJobs.ts index 0a25b755..3d7a0c1d 100644 --- a/jobs/initJobs.ts +++ b/jobs/initJobs.ts @@ -16,19 +16,7 @@ const main = async (): Promise => { await blockchainQueue.obliterate({ force: true }) await cleanupQueue.obliterate({ force: true }) - await pricesQueue.add('syncCurrentPrices', - {}, - { - jobId: 'syncCurrentPrices', - removeOnFail: false, - repeat: { - every: CURRENT_PRICE_REPEAT_DELAY - } - } - ) - - await syncCurrentPricesWorker(pricesQueue.name) - + // Start blockchain sync first; current prices job starts only after it completes await blockchainQueue.add('syncBlockchainAndPrices', {}, { @@ -37,7 +25,20 @@ const main = async (): Promise => { removeOnFail: true } ) - await syncBlockchainAndPricesWorker(blockchainQueue.name) + void await syncBlockchainAndPricesWorker(blockchainQueue.name, async () => { + await pricesQueue.add('syncCurrentPrices', + {}, + { + jobId: 'syncCurrentPrices', + removeOnFail: false, + repeat: { + every: CURRENT_PRICE_REPEAT_DELAY + } + } + ) + await syncCurrentPricesWorker(pricesQueue.name) + console.log('Current prices sync job started after blockchain sync completion.') + }) await cleanupQueue.add( 'cleanupClientPayments', diff --git a/jobs/workers.ts b/jobs/workers.ts index d913176f..99899820 100644 --- a/jobs/workers.ts +++ b/jobs/workers.ts @@ -31,7 +31,7 @@ export const syncCurrentPricesWorker = async (queueName: string): Promise }) } -export const syncBlockchainAndPricesWorker = async (queueName: string): Promise => { +export const syncBlockchainAndPricesWorker = async (queueName: string, onComplete?: () => Promise | void): Promise => { const worker = new Worker( queueName, async (job) => { @@ -53,6 +53,7 @@ export const syncBlockchainAndPricesWorker = async (queueName: string): Promise< await multiBlockchainClient.destroy() console.log('Done.') console.log(`job ${job.id as string}: blockchain + prices sync finished`) + await onComplete?.() })() }) From 578591bdaaadc18cfb43fe3638f9a861fbb0ec2b Mon Sep 17 00:00:00 2001 From: Fabien Date: Fri, 27 Feb 2026 16:33:34 +0100 Subject: [PATCH 7/8] Fix the chronik service tests --- tests/unittests/chronikService.test.ts | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/tests/unittests/chronikService.test.ts b/tests/unittests/chronikService.test.ts index 18de3f67..a8d9440f 100644 --- a/tests/unittests/chronikService.test.ts +++ b/tests/unittests/chronikService.test.ts @@ -8,7 +8,7 @@ import { ChronikBlockchainClient, multiBlockchainClient } from '../../services/chronikService' -import { Address } from '@prisma/client' +import { Address, Prisma } from '@prisma/client' import { fetchAddressesArray } from '../../services/addressService' import { fetchUnconfirmedTransactions, deleteTransactions, upsertTransaction } from '../../services/transactionService' import { executeAddressTriggers } from '../../services/triggerService' @@ -1403,14 +1403,22 @@ describe('WS onMessage matrix (no re-mocks)', () => { .mockReturnValue(['ecash:qqkv9wr69ry2p9l53lxp635va4h86wv435995w8p2h']) // minimal transaction shape for downstream + // Note: getTransactionFromChronikTransaction is now synchronous, so use mockReturnValue jest.spyOn(client, 'getTransactionFromChronikTransaction') - .mockResolvedValue({ + .mockReturnValue({ hash: 'txCONF', - amount: '0.01', + amount: new Prisma.Decimal('0.01'), timestamp: Math.floor(Date.now() / 1000), addressId: 'addr-1', - confirmed: false, - opReturn: JSON.stringify({ message: { type: 'PAY', paymentId: 'pid-1' } }) + confirmed: true, + orphaned: false, + opReturn: JSON.stringify({ message: { type: 'PAY', paymentId: 'pid-1' } }), + inputs: { + create: [] + }, + outputs: { + create: [] + } }) const paySpy = jest.spyOn(client, 'handleUpdateClientPaymentStatus') From 111e2cc9e4d766dcef2769825f6330a1295f03d6 Mon Sep 17 00:00:00 2001 From: Fabien Date: Fri, 27 Feb 2026 16:47:11 +0100 Subject: [PATCH 8/8] Remove TransactionOutput from the database It's unclear what the use case is and it was added in #1110 to avoid having to do it later. It appears that this table is costly as it contains a lot of outputs. Let's remove it for now to favor disk size and performance. We can add it back later if we need it. --- .../migration.sql | 4 ++ prisma-local/schema.prisma | 11 ----- services/chronikService.ts | 4 -- services/transactionService.ts | 47 +++++-------------- tests/unittests/transactionService.test.ts | 13 +++-- 5 files changed, 21 insertions(+), 58 deletions(-) create mode 100644 prisma-local/migrations/20260227163713_remove_transaction_output/migration.sql diff --git a/prisma-local/migrations/20260227163713_remove_transaction_output/migration.sql b/prisma-local/migrations/20260227163713_remove_transaction_output/migration.sql new file mode 100644 index 00000000..6cf78ca1 --- /dev/null +++ b/prisma-local/migrations/20260227163713_remove_transaction_output/migration.sql @@ -0,0 +1,4 @@ +-- Remove TransactionOutput table +-- This table is being removed as its use case is unclear and it impacts performance + +DROP TABLE IF EXISTS `TransactionOutput`; diff --git a/prisma-local/schema.prisma b/prisma-local/schema.prisma index 2abad1bc..9b6fd490 100644 --- a/prisma-local/schema.prisma +++ b/prisma-local/schema.prisma @@ -79,7 +79,6 @@ model Transaction { prices PricesOnTransactions[] invoices Invoice[] inputs TransactionInput[] - outputs TransactionOutput[] createdAt DateTime @default(now()) updatedAt DateTime @updatedAt @@ -99,16 +98,6 @@ model TransactionInput { @@index([transactionId]) } -model TransactionOutput { - id String @id @default(dbgenerated("(uuid())")) - transactionId String - address String @db.VarChar(255) - index Int - transaction Transaction @relation(fields: [transactionId], references: [id], onDelete: Cascade) - amount Decimal @db.Decimal(24, 8) - - @@index([transactionId]) -} model Wallet { id String @id @default(dbgenerated("(uuid())")) diff --git a/services/chronikService.ts b/services/chronikService.ts index 665e2677..96354607 100644 --- a/services/chronikService.ts +++ b/services/chronikService.ts @@ -290,7 +290,6 @@ export class ChronikBlockchainClient { private getTransactionFromChronikTransaction (transaction: Tx, address: Address): Prisma.TransactionUncheckedCreateInput { const { amount, opReturn } = this.getTransactionAmountAndData(transaction, address.address) const inputAddresses = this.getSortedInputAddresses(transaction) - const outputAddresses = this.getSortedOutputAddresses(transaction) return { hash: transaction.txid, @@ -301,9 +300,6 @@ export class ChronikBlockchainClient { opReturn, inputs: { create: inputAddresses - }, - outputs: { - create: outputAddresses } } } diff --git a/services/transactionService.ts b/services/transactionService.ts index 8cc44206..68f2226e 100644 --- a/services/transactionService.ts +++ b/services/transactionService.ts @@ -55,9 +55,9 @@ export function getSimplifiedTrasaction (tx: TransactionWithAddressAndPrices, in const parsedOpReturn = resolveOpReturn(opReturn) const dbInputsArr = (tx as { inputs?: Array<{ address: string, amount: Prisma.Decimal }> }).inputs - const dbOutputsArr = (tx as { outputs?: Array<{ address: string, amount: Prisma.Decimal }> }).outputs const resolvedInputAddresses = inputAddresses ?? (Array.isArray(dbInputsArr) ? dbInputsArr.map(i => ({ address: i.address, amount: i.amount })) : []) - const resolvedOutputAddresses = outputAddresses ?? (Array.isArray(dbOutputsArr) ? dbOutputsArr.map(o => ({ address: o.address, amount: o.amount })) : []) + // outputAddresses must be provided as parameter since TransactionOutput is no longer stored in DB + const resolvedOutputAddresses = outputAddresses ?? [] const simplifiedTransaction: SimplifiedTransaction = { hash, @@ -96,8 +96,7 @@ const includePrices = { const includeAddressAndPrices = { address: true, ...includePrices, - inputs: { orderBy: { index: 'asc' as const } }, - outputs: { orderBy: { index: 'asc' as const } } + inputs: { orderBy: { index: 'asc' as const } } } const transactionWithPrices = Prisma.validator()( @@ -137,8 +136,7 @@ const includePaybuttonsAndPrices = { } }, ...includePrices, - inputs: { orderBy: { index: 'asc' as const } }, - outputs: { orderBy: { index: 'asc' as const } } + inputs: { orderBy: { index: 'asc' as const } } } export const includePaybuttonsAndPricesAndInvoices = { ...includePaybuttonsAndPrices, @@ -576,14 +574,12 @@ export async function createManyTransactions ( orphaned: false })) - const txInputsOutputs = transactionsData.map((tx) => { + const txInputs = transactionsData.map((tx) => { const inputs = (tx.inputs != null) && 'create' in tx.inputs ? tx.inputs.create : [] - const outputs = (tx.outputs != null) && 'create' in tx.outputs ? tx.outputs.create : [] return { hash: tx.hash, addressId: tx.addressId, - inputs: Array.isArray(inputs) ? inputs : [], - outputs: Array.isArray(outputs) ? outputs : [] + inputs: Array.isArray(inputs) ? inputs : [] } }) @@ -618,7 +614,7 @@ export async function createManyTransactions ( // 2. Split into new and existing transactions const newTxs: typeof flatTxData = [] - const newTxsInputsOutputs: typeof txInputsOutputs = [] + const newTxsInputs: typeof txInputs = [] const toUpdate: Array<{ id: string confirmed: boolean @@ -649,7 +645,7 @@ export async function createManyTransactions ( } } else { newTxs.push(tx) - newTxsInputsOutputs.push(txInputsOutputs[i]) + newTxsInputs.push(txInputs[i]) } } @@ -681,16 +677,15 @@ export async function createManyTransactions ( } }) - // Create a map to match transactions with their inputs/outputs - const txMap = new Map() + // Create a map to match transactions with their inputs + const txMap = new Map() for (let i = 0; i < newTxs.length; i++) { const tx = newTxs[i] const created = createdTxs.find(ct => ct.hash === tx.hash && ct.addressId === tx.addressId) if (created != null) { txMap.set(`${tx.hash}:${tx.addressId}`, { tx: created as any, - inputs: newTxsInputsOutputs[i].inputs, - outputs: newTxsInputsOutputs[i].outputs + inputs: newTxsInputs[i].inputs }) } } @@ -715,26 +710,6 @@ export async function createManyTransactions ( }) } - // Create all outputs at once - const allOutputs: Array<{ transactionId: string, address: string, index: number, amount: Prisma.Decimal }> = [] - for (const [, { tx, outputs }] of txMap) { - for (const output of outputs) { - allOutputs.push({ - transactionId: tx.id, - address: output.address, - index: output.index, - amount: output.amount instanceof Prisma.Decimal ? output.amount : new Prisma.Decimal(output.amount as string | number) - }) - } - } - - if (allOutputs.length > 0) { - await prisma.transactionOutput.createMany({ - data: allOutputs, - skipDuplicates: true - }) - } - // Fetch the full transactions with includes for return value const fullTxs = await prisma.transaction.findMany({ where: { diff --git a/tests/unittests/transactionService.test.ts b/tests/unittests/transactionService.test.ts index 4293dbc1..e2e3f47a 100644 --- a/tests/unittests/transactionService.test.ts +++ b/tests/unittests/transactionService.test.ts @@ -27,8 +27,7 @@ const includePaybuttonsAndPrices = { } }, ...includePrices, - inputs: { orderBy: { index: 'asc' as const } }, - outputs: { orderBy: { index: 'asc' as const } } + inputs: { orderBy: { index: 'asc' as const } } } describe('Create services', () => { @@ -197,12 +196,12 @@ describe('Address object arrays (input/output) integration', () => { expect(simplified.outputAddresses).toEqual(outputs) }) - it('getSimplifiedTrasaction uses inputs/outputs from tx when not provided explicitly', () => { + it('getSimplifiedTrasaction uses inputs from tx when not provided explicitly, but outputs must be provided as parameter', () => { const inputsFromDb = [ { address: 'ecash:qqinput1', amount: new Prisma.Decimal(1.23) }, { address: 'ecash:qqinput2', amount: new Prisma.Decimal(4.56) } ] - const outputsFromDb = [ + const outputsProvided = [ { address: 'ecash:qqout1', amount: new Prisma.Decimal(7.89) }, { address: 'ecash:qqout2', amount: new Prisma.Decimal(0.12) } ] @@ -214,10 +213,10 @@ describe('Address object arrays (input/output) integration', () => { address: { address: 'ecash:qqprimaryaddressxxxxxxxxxxxxxxxxxxxxx' }, timestamp: 1700000000, prices: mockedTransaction.prices, - inputs: inputsFromDb, - outputs: outputsFromDb + inputs: inputsFromDb + // Note: outputs are no longer stored in DB, so they won't be read from tx.outputs } - const simplified = transactionService.getSimplifiedTrasaction(tx) + const simplified = transactionService.getSimplifiedTrasaction(tx, undefined, outputsProvided) expect(simplified.inputAddresses).toEqual([ { address: 'ecash:qqinput1', amount: new Prisma.Decimal(1.23) }, { address: 'ecash:qqinput2', amount: new Prisma.Decimal(4.56) }