From bc42eaf783c0431a76840ffd8276cc555abcc5eb Mon Sep 17 00:00:00 2001 From: Fabien Date: Mon, 2 Mar 2026 15:28:45 +0100 Subject: [PATCH] Continuous transaction fetching during syncing This commit changes the logic of the transactions fetching and processing during the initial sync. Previously the process was fetching the data from (up to) 16 addresses, then if there were enough txs to fill a batch it would drain the txs, then process the next 16 addresses. This means that it's fetching the addresses by batches of 16 which is suboptimal as most of the time is spent waiting for the last ones to complete. After this patch, the process attempts to always keep 16 addresses being fetched in parallel, while still processing the transactions the same way. Example: A batch of 16 addresses contains 15 with no transactions and one address with 10k transactions. Before the patch, the process will patiently wait for the last address to complete before fetching new addresses, so it's a single chronik stream. After this patch, the first 15 addresses will be replaced by new ones and so on, so we maintain 16 chronik streams all the time. This speeds up syncing significantly, especially on eCash. There is one minor downside: the logs are not showing the progress accurately anymore. It's printing the current address index over the total addresses, but since they don't start/finish in order the number can jump back and forth. It still gives a good enough estimate of the progress. 
--- services/chronikService.ts | 133 +++++++++++++++++++------------------ 1 file changed, 70 insertions(+), 63 deletions(-) diff --git a/services/chronikService.ts b/services/chronikService.ts index 2e009325..b3e1a5a7 100644 --- a/services/chronikService.ts +++ b/services/chronikService.ts @@ -328,34 +328,30 @@ export class ChronikBlockchainClient { ): AsyncGenerator { const logPrefix = `${this.CHRONIK_MSG_PREFIX}[PARALLEL FETCHING]` + const totalCount = addresses.length console.log( - `${logPrefix} >>> Will fetch latest txs for ${addresses.length} addresses ` + + `${logPrefix} >>> Will fetch latest txs for ${totalCount} addresses ` + `(addressConcurrency=${INITIAL_ADDRESS_SYNC_FETCH_CONCURRENTLY}, pageConcurrency=1).` ) let chronikTxs: ChronikTxWithAddress[] = [] - let lastBatchAddresses: string[] = [] - - const totalCount = addresses.length - let syncedAlready = 0 - for (let i = 0; i < addresses.length; i += INITIAL_ADDRESS_SYNC_FETCH_CONCURRENTLY) { - const addressBatchSlice = addresses.slice(i, i + INITIAL_ADDRESS_SYNC_FETCH_CONCURRENTLY) - lastBatchAddresses = addressBatchSlice.map(a => a.address) - - console.log(`${logPrefix} >>> starting chronik fetching for ${addressBatchSlice.length} addresses... 
(${syncedAlready}/${totalCount} synced)`) + const completedAddresses: string[] = [] - // Track completed addresses - const completedAddresses: string[] = [] + // Worker pool: maintain exactly INITIAL_ADDRESS_SYNC_FETCH_CONCURRENTLY active workers + const activeWorkers = new Set>() + let nextAddressIndex = 0 - const perAddressWorkers = addressBatchSlice.map(async (address) => { - const addrLogPrefix = `${logPrefix} > ${address.address}:` - const lastSyncedTimestampSeconds = this.getLastSyncTs(address) - const txThresholdFilter = this.txThesholdFilter(address) + // Function to process a single address + const processAddress = async (address: Address, addressIndex: number): Promise => { + const addrLogPrefix = `${logPrefix} > ${address.address}: (${addressIndex + 1}/${totalCount})` + const lastSyncedTimestampSeconds = this.getLastSyncTs(address) + const txThresholdFilter = this.txThesholdFilter(address) - let nextBurstBasePageIndex = 0 - let hasReachedStoppingCondition = false + let nextBurstBasePageIndex = 0 + let hasReachedStoppingCondition = false + let newTxs = 0 - let newTxs = 0 + try { while (!hasReachedStoppingCondition) { const pageIndex = nextBurstBasePageIndex let pageTxs: Tx[] = [] @@ -408,62 +404,73 @@ export class ChronikBlockchainClient { if (newTxs > 0) { console.log(`${addrLogPrefix} ${newTxs} new txs.`) } + } catch (err: any) { + console.error(`${logPrefix}: address job failed: ${err.message as string}`) + } finally { completedAddresses.push(address.address) - }) - syncedAlready += addressBatchSlice.length + } + } - // Start workers but don't wait - yield batches while they're running - const workersPromise = Promise.all( - perAddressWorkers.map(async worker => - await worker.catch(err => console.error(`${logPrefix}: address job failed: ${err.message as string}`)) - ) - ) + // Start next worker from the queue + const startNextWorker = (): void => { + if (nextAddressIndex >= totalCount) { + // No more addresses + return + } - // Race between worker 
completion and periodic checks to yield batches incrementally - let allWorkersDone = false + const currentIndex = nextAddressIndex + nextAddressIndex++ - while (!allWorkersDone || chronikTxs.length > 0) { - // Yield batches if buffer is large enough - while (chronikTxs.length >= TX_EMIT_BATCH_SIZE) { - const chronikTxsSlice = chronikTxs.splice(0, TX_EMIT_BATCH_SIZE) - yield { chronikTxs: chronikTxsSlice, addressesSynced: [] } - } + const workerPromise = processAddress(addresses[currentIndex], currentIndex).finally(() => { + activeWorkers.delete(workerPromise) + // Immediately start next worker if queue has more + startNextWorker() + }) + activeWorkers.add(workerPromise) + } - // If workers are done, yield any remaining transactions (even if < batch size) - if (allWorkersDone && chronikTxs.length > 0) { - // This clears chronikTxs so the below check for length === 0 is true - const remaining = chronikTxs.splice(0) - yield { chronikTxs: remaining, addressesSynced: [] } - } + // Start initial batch of workers + const initialBatchSize = Math.min(INITIAL_ADDRESS_SYNC_FETCH_CONCURRENTLY, totalCount) + for (let i = 0; i < initialBatchSize; i++) { + startNextWorker() + } - // Yield completed addresses if any - if (completedAddresses.length > 0) { - const completed = completedAddresses.splice(0) - yield { chronikTxs: [], addressesSynced: completed } - } + // Poll and yield batches while workers are active + while (activeWorkers.size > 0 || chronikTxs.length > 0) { + // Yield batches if buffer is large enough. Make sure to drain until there + // are not enough transactions to fill the batch. 
+ while (chronikTxs.length >= TX_EMIT_BATCH_SIZE) { + const chronikTxsSlice = chronikTxs.splice(0, TX_EMIT_BATCH_SIZE) + yield { chronikTxs: chronikTxsSlice, addressesSynced: [] } + } - // If workers are done and no more transactions, break - if (allWorkersDone && chronikTxs.length === 0) { - break - } + // If no active workers, yield any remaining transactions (even if < batch size) + if (activeWorkers.size === 0 && chronikTxs.length > 0) { + const remaining = chronikTxs.splice(0) + yield { chronikTxs: remaining, addressesSynced: [] } + } - // Wait a bit or until workers complete - const raceResult = await Promise.race([ - workersPromise.then(() => true), - new Promise(resolve => setTimeout(() => resolve(false), TX_BATCH_POLLING_DELAY)) - ]) + // Yield completed addresses if any + if (completedAddresses.length > 0) { + const completed = completedAddresses.splice(0) + yield { chronikTxs: [], addressesSynced: completed } + } - // Update flag if workers completed - if (raceResult) { - allWorkersDone = true - } + // If no active workers and no more transactions, break + if (activeWorkers.size === 0 && chronikTxs.length === 0) { + break } - // Ensure all workers are finished - await workersPromise + // Wait a bit for more transactions or worker completion + await Promise.race([ + Promise.all(Array.from(activeWorkers)).then(() => true), + new Promise(resolve => setTimeout(() => resolve(false), TX_BATCH_POLLING_DELAY)) + ]) + } - // Yield batch marker for completed address group - yield { chronikTxs: [], addressesSynced: lastBatchAddresses } + // Wait for all workers to finish (should already be done) + if (activeWorkers.size > 0) { + await Promise.all(Array.from(activeWorkers)) } // Final TX flush after all addresses processed