Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 70 additions & 63 deletions services/chronikService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -328,34 +328,30 @@ export class ChronikBlockchainClient {
): AsyncGenerator<FetchedTxsBatch> {
const logPrefix = `${this.CHRONIK_MSG_PREFIX}[PARALLEL FETCHING]`

const totalCount = addresses.length
console.log(
`${logPrefix} >>> Will fetch latest txs for ${addresses.length} addresses ` +
`${logPrefix} >>> Will fetch latest txs for ${totalCount} addresses ` +
`(addressConcurrency=${INITIAL_ADDRESS_SYNC_FETCH_CONCURRENTLY}, pageConcurrency=1).`
)

let chronikTxs: ChronikTxWithAddress[] = []
let lastBatchAddresses: string[] = []

const totalCount = addresses.length
let syncedAlready = 0
for (let i = 0; i < addresses.length; i += INITIAL_ADDRESS_SYNC_FETCH_CONCURRENTLY) {
const addressBatchSlice = addresses.slice(i, i + INITIAL_ADDRESS_SYNC_FETCH_CONCURRENTLY)
lastBatchAddresses = addressBatchSlice.map(a => a.address)

console.log(`${logPrefix} >>> starting chronik fetching for ${addressBatchSlice.length} addresses... (${syncedAlready}/${totalCount} synced)`)
const completedAddresses: string[] = []

// Track completed addresses
const completedAddresses: string[] = []
// Worker pool: maintain exactly INITIAL_ADDRESS_SYNC_FETCH_CONCURRENTLY active workers
const activeWorkers = new Set<Promise<void>>()
let nextAddressIndex = 0

const perAddressWorkers = addressBatchSlice.map(async (address) => {
const addrLogPrefix = `${logPrefix} > ${address.address}:`
const lastSyncedTimestampSeconds = this.getLastSyncTs(address)
const txThresholdFilter = this.txThesholdFilter(address)
// Function to process a single address
const processAddress = async (address: Address, addressIndex: number): Promise<void> => {
const addrLogPrefix = `${logPrefix} > ${address.address}: (${addressIndex + 1}/${totalCount})`
const lastSyncedTimestampSeconds = this.getLastSyncTs(address)
const txThresholdFilter = this.txThesholdFilter(address)

let nextBurstBasePageIndex = 0
let hasReachedStoppingCondition = false
let nextBurstBasePageIndex = 0
let hasReachedStoppingCondition = false
let newTxs = 0

let newTxs = 0
try {
while (!hasReachedStoppingCondition) {
const pageIndex = nextBurstBasePageIndex
let pageTxs: Tx[] = []
Expand Down Expand Up @@ -408,62 +404,73 @@ export class ChronikBlockchainClient {
if (newTxs > 0) {
console.log(`${addrLogPrefix} ${newTxs} new txs.`)
}
} catch (err: any) {
console.error(`${logPrefix}: address job failed: ${err.message as string}`)
} finally {
completedAddresses.push(address.address)
})
syncedAlready += addressBatchSlice.length
}
}

// Start workers but don't wait - yield batches while they're running
const workersPromise = Promise.all(
perAddressWorkers.map(async worker =>
await worker.catch(err => console.error(`${logPrefix}: address job failed: ${err.message as string}`))
)
)
// Start next worker from the queue
const startNextWorker = (): void => {
if (nextAddressIndex >= totalCount) {
// No more addresses
return
}

// Race between worker completion and periodic checks to yield batches incrementally
let allWorkersDone = false
const currentIndex = nextAddressIndex
nextAddressIndex++

while (!allWorkersDone || chronikTxs.length > 0) {
// Yield batches if buffer is large enough
while (chronikTxs.length >= TX_EMIT_BATCH_SIZE) {
const chronikTxsSlice = chronikTxs.splice(0, TX_EMIT_BATCH_SIZE)
yield { chronikTxs: chronikTxsSlice, addressesSynced: [] }
}
const workerPromise = processAddress(addresses[currentIndex], currentIndex).finally(() => {
activeWorkers.delete(workerPromise)
// Immediately start next worker if queue has more
startNextWorker()
})
activeWorkers.add(workerPromise)
}

// If workers are done, yield any remaining transactions (even if < batch size)
if (allWorkersDone && chronikTxs.length > 0) {
// This clears chronikTxs so the below check for length === 0 is true
const remaining = chronikTxs.splice(0)
yield { chronikTxs: remaining, addressesSynced: [] }
}
// Start initial batch of workers
const initialBatchSize = Math.min(INITIAL_ADDRESS_SYNC_FETCH_CONCURRENTLY, totalCount)
for (let i = 0; i < initialBatchSize; i++) {
startNextWorker()
}

// Yield completed addresses if any
if (completedAddresses.length > 0) {
const completed = completedAddresses.splice(0)
yield { chronikTxs: [], addressesSynced: completed }
}
// Poll and yield batches while workers are active
while (activeWorkers.size > 0 || chronikTxs.length > 0) {
// Yield batches if buffer is large enough. Make sure to drain until there
// are not enough transactions to fill the batch.
while (chronikTxs.length >= TX_EMIT_BATCH_SIZE) {
const chronikTxsSlice = chronikTxs.splice(0, TX_EMIT_BATCH_SIZE)
yield { chronikTxs: chronikTxsSlice, addressesSynced: [] }
}

// If workers are done and no more transactions, break
if (allWorkersDone && chronikTxs.length === 0) {
break
}
// If no active workers, yield any remaining transactions (even if < batch size)
if (activeWorkers.size === 0 && chronikTxs.length > 0) {
const remaining = chronikTxs.splice(0)
yield { chronikTxs: remaining, addressesSynced: [] }
}

// Wait a bit or until workers complete
const raceResult = await Promise.race([
workersPromise.then(() => true),
new Promise<boolean>(resolve => setTimeout(() => resolve(false), TX_BATCH_POLLING_DELAY))
])
// Yield completed addresses if any
if (completedAddresses.length > 0) {
const completed = completedAddresses.splice(0)
yield { chronikTxs: [], addressesSynced: completed }
}

// Update flag if workers completed
if (raceResult) {
allWorkersDone = true
}
// If no active workers and no more transactions, break
if (activeWorkers.size === 0 && chronikTxs.length === 0) {
break
}

// Ensure all workers are finished
await workersPromise
// Wait a bit for more transactions or worker completion
await Promise.race([
Promise.all(Array.from(activeWorkers)).then(() => true),
new Promise<boolean>(resolve => setTimeout(() => resolve(false), TX_BATCH_POLLING_DELAY))
])
}

// Yield batch marker for completed address group
yield { chronikTxs: [], addressesSynced: lastBatchAddresses }
// Wait for all workers to finish (should already be done)
if (activeWorkers.size > 0) {
await Promise.all(Array.from(activeWorkers))
}

// Final TX flush after all addresses processed
Expand Down