diff --git a/services/chronikService.ts b/services/chronikService.ts index 2e009325..b3e1a5a7 100644 --- a/services/chronikService.ts +++ b/services/chronikService.ts @@ -328,34 +328,30 @@ export class ChronikBlockchainClient { ): AsyncGenerator { const logPrefix = `${this.CHRONIK_MSG_PREFIX}[PARALLEL FETCHING]` + const totalCount = addresses.length console.log( - `${logPrefix} >>> Will fetch latest txs for ${addresses.length} addresses ` + + `${logPrefix} >>> Will fetch latest txs for ${totalCount} addresses ` + `(addressConcurrency=${INITIAL_ADDRESS_SYNC_FETCH_CONCURRENTLY}, pageConcurrency=1).` ) let chronikTxs: ChronikTxWithAddress[] = [] - let lastBatchAddresses: string[] = [] - - const totalCount = addresses.length - let syncedAlready = 0 - for (let i = 0; i < addresses.length; i += INITIAL_ADDRESS_SYNC_FETCH_CONCURRENTLY) { - const addressBatchSlice = addresses.slice(i, i + INITIAL_ADDRESS_SYNC_FETCH_CONCURRENTLY) - lastBatchAddresses = addressBatchSlice.map(a => a.address) - - console.log(`${logPrefix} >>> starting chronik fetching for ${addressBatchSlice.length} addresses... (${syncedAlready}/${totalCount} synced)`) + const completedAddresses: string[] = [] - // Track completed addresses - const completedAddresses: string[] = [] + // Worker pool: maintain exactly INITIAL_ADDRESS_SYNC_FETCH_CONCURRENTLY active workers + const activeWorkers = new Set>() + let nextAddressIndex = 0 - const perAddressWorkers = addressBatchSlice.map(async (address) => { - const addrLogPrefix = `${logPrefix} > ${address.address}:` - const lastSyncedTimestampSeconds = this.getLastSyncTs(address) - const txThresholdFilter = this.txThesholdFilter(address) + // Function to process a single address + const processAddress = async (address: Address, addressIndex: number): Promise => { + const addrLogPrefix = `${logPrefix} > ${address.address}: (${addressIndex + 1}/${totalCount})` + const lastSyncedTimestampSeconds = this.getLastSyncTs(address) + const txThresholdFilter = this.txThesholdFilter(address) - let nextBurstBasePageIndex = 0 - let hasReachedStoppingCondition = false + let nextBurstBasePageIndex = 0 + let hasReachedStoppingCondition = false + let newTxs = 0 - let newTxs = 0 + try { while (!hasReachedStoppingCondition) { const pageIndex = nextBurstBasePageIndex let pageTxs: Tx[] = [] @@ -408,62 +404,73 @@ export class ChronikBlockchainClient { if (newTxs > 0) { console.log(`${addrLogPrefix} ${newTxs} new txs.`) } + } catch (err: any) { + console.error(`${logPrefix}: address job failed: ${err.message as string}`) + } finally { completedAddresses.push(address.address) - }) - syncedAlready += addressBatchSlice.length + } + } - // Start workers but don't wait - yield batches while they're running - const workersPromise = Promise.all( - perAddressWorkers.map(async worker => - await worker.catch(err => console.error(`${logPrefix}: address job failed: ${err.message as string}`)) - ) - ) + // Start next worker from the queue + const startNextWorker = (): void => { + if (nextAddressIndex >= totalCount) { + // No more addresses + return + } - // Race between worker completion and periodic checks to yield batches incrementally - let allWorkersDone = false + const currentIndex = nextAddressIndex + nextAddressIndex++ - while (!allWorkersDone || chronikTxs.length > 0) { - // Yield batches if buffer is large enough - while (chronikTxs.length >= TX_EMIT_BATCH_SIZE) { - const chronikTxsSlice = chronikTxs.splice(0, TX_EMIT_BATCH_SIZE) - yield { chronikTxs: chronikTxsSlice, addressesSynced: [] } - } + const workerPromise = processAddress(addresses[currentIndex], currentIndex).finally(() => { + activeWorkers.delete(workerPromise) + // Immediately start next worker if queue has more + startNextWorker() + }) + activeWorkers.add(workerPromise) + } - // If workers are done, yield any remaining transactions (even if < batch size) - if (allWorkersDone && chronikTxs.length > 0) { - // This clears chronikTxs so the below check for length === 0 is true - const remaining = chronikTxs.splice(0) - yield { chronikTxs: remaining, addressesSynced: [] } - } + // Start initial batch of workers + const initialBatchSize = Math.min(INITIAL_ADDRESS_SYNC_FETCH_CONCURRENTLY, totalCount) + for (let i = 0; i < initialBatchSize; i++) { + startNextWorker() + } - // Yield completed addresses if any - if (completedAddresses.length > 0) { - const completed = completedAddresses.splice(0) - yield { chronikTxs: [], addressesSynced: completed } - } + // Poll and yield batches while workers are active + while (activeWorkers.size > 0 || chronikTxs.length > 0) { + // Yield batches if buffer is large enough. Make sure to drain until there + // are not enough transactions to fill the batch. + while (chronikTxs.length >= TX_EMIT_BATCH_SIZE) { + const chronikTxsSlice = chronikTxs.splice(0, TX_EMIT_BATCH_SIZE) + yield { chronikTxs: chronikTxsSlice, addressesSynced: [] } + } - // If workers are done and no more transactions, break - if (allWorkersDone && chronikTxs.length === 0) { - break - } + // If no active workers, yield any remaining transactions (even if < batch size) + if (activeWorkers.size === 0 && chronikTxs.length > 0) { + const remaining = chronikTxs.splice(0) + yield { chronikTxs: remaining, addressesSynced: [] } + } - // Wait a bit or until workers complete - const raceResult = await Promise.race([ - workersPromise.then(() => true), - new Promise(resolve => setTimeout(() => resolve(false), TX_BATCH_POLLING_DELAY)) - ]) + // Yield completed addresses if any + if (completedAddresses.length > 0) { + const completed = completedAddresses.splice(0) + yield { chronikTxs: [], addressesSynced: completed } + } - // Update flag if workers completed - if (raceResult) { - allWorkersDone = true - } + // If no active workers and no more transactions, break + if (activeWorkers.size === 0 && chronikTxs.length === 0) { + break } - // Ensure all workers are finished - await workersPromise + // Wait a bit for more transactions or worker completion + await Promise.race([ + Promise.all(Array.from(activeWorkers)).then(() => true), + new Promise(resolve => setTimeout(() => resolve(false), TX_BATCH_POLLING_DELAY)) + ]) + } - // Yield batch marker for completed address group - yield { chronikTxs: [], addressesSynced: lastBatchAddresses } + // Wait for all workers to finish (should already be done) + if (activeWorkers.size > 0) { + await Promise.all(Array.from(activeWorkers)) } // Final TX flush after all addresses processed