Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 15 additions & 10 deletions bridgesync/backfill_tx_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,9 +124,9 @@ type RecordToBackfill struct {
BlockTimestamp uint64 `meddler:"block_timestamp"`
LeafType uint8 `meddler:"leaf_type"`
OriginNetwork uint32 `meddler:"origin_network"`
OriginAddress common.Address `meddler:"origin_address,address"`
OriginAddress common.Address `meddler:"origin_address"`
DestinationNetwork uint32 `meddler:"destination_network"`
DestinationAddress common.Address `meddler:"destination_address,address"`
DestinationAddress common.Address `meddler:"destination_address"`
Amount *big.Int `meddler:"amount,bigint"`
Metadata []byte `meddler:"metadata"`
DepositCount uint32 `meddler:"deposit_count"`
Expand All @@ -139,6 +139,7 @@ type RecordUpdate struct {
BlockPos uint64
TxnSender common.Address
FromAddr common.Address
ToAddr common.Address
}

func (r *RecordUpdate) String() string {
Expand Down Expand Up @@ -309,14 +310,15 @@ func (b *BackfillTxnSender) worker(
Amount: job.Record.Amount,
}
// Extract txn_sender from transaction hash
txnSender, fromAddr, err := b.extractData(ctx, job.Record.TxHash, logEvent)
txnSender, fromAddr, toAddr, err := b.extractData(ctx, job.Record.TxHash, logEvent)

result := TxnSenderResult{
Update: RecordUpdate{
BlockNum: job.Record.BlockNum,
BlockPos: job.Record.BlockPos,
TxnSender: txnSender,
FromAddr: fromAddr,
ToAddr: toAddr,
},
Error: err,
}
Expand All @@ -331,18 +333,19 @@ func (b *BackfillTxnSender) worker(
}
}

// extractData extracts the transaction txn_sender and from_address
// extractData extracts the transaction txn_sender, from_address and to_address
func (b *BackfillTxnSender) extractData(ctx context.Context,
txHash common.Hash,
logEvent *agglayerbridge.AgglayerbridgeBridgeEvent) (txnSender common.Address, fromAddr common.Address, err error) {
logEvent *agglayerbridge.AgglayerbridgeBridgeEvent) (txnSender common.Address,
fromAddr common.Address, toAddr common.Address, err error) {
// Check if context is cancelled before making network call
select {
case <-ctx.Done():
return common.Address{}, common.Address{}, ctx.Err()
return common.Address{}, common.Address{}, common.Address{}, ctx.Err()
default:
}
txnSender, fromAddr, _, err = ExtractTxnAddresses(ctx, b.client, b.bridgeAddr, txHash, logEvent, b.log)
return txnSender, fromAddr, err
txnSender, fromAddr, toAddr, err = ExtractTxnAddresses(ctx, b.client, b.bridgeAddr, txHash, logEvent, b.log)
return txnSender, fromAddr, toAddr, err
}

// bulkUpdate performs a bulk update of multiple records
Expand Down Expand Up @@ -377,7 +380,8 @@ func (b *BackfillTxnSender) bulkUpdate(
UPDATE %s
SET
txn_sender = COALESCE(NULLIF(txn_sender, ''), ?),
from_address = COALESCE(NULLIF(from_address, ''), ?)
from_address = COALESCE(NULLIF(from_address, ''), ?),
to_address = COALESCE(NULLIF(to_address, ''), ?)
WHERE block_num = ? AND block_pos = ?;
`, tableName))
if err != nil {
Expand All @@ -386,7 +390,8 @@ func (b *BackfillTxnSender) bulkUpdate(
defer stmt.Close()

for _, update := range updates {
_, err := stmt.ExecContext(dbCtx, update.TxnSender.Hex(), update.FromAddr.Hex(), update.BlockNum, update.BlockPos)
_, err := stmt.ExecContext(dbCtx, update.TxnSender.Hex(),
update.FromAddr.Hex(), update.ToAddr.Hex(), update.BlockNum, update.BlockPos)
if err != nil {
return fmt.Errorf("failed to execute update for block %d pos %d: %w",
update.BlockNum, update.BlockPos, err)
Expand Down
Loading
Loading