From 48235642e207d7c176ad3105644c4c293c48fa9c Mon Sep 17 00:00:00 2001 From: joanestebanr <129153821+joanestebanr@users.noreply.github.com> Date: Mon, 16 Feb 2026 10:47:28 +0100 Subject: [PATCH] feat: l1infotree publish GER reorged --- l1infotreesync/l1infotreesync.go | 24 +++ l1infotreesync/l1infotreesync_test.go | 41 +++++ l1infotreesync/processor.go | 63 ++++++-- l1infotreesync/processor_test.go | 219 ++++++++++++++++++++++++++ 4 files changed, 335 insertions(+), 12 deletions(-) diff --git a/l1infotreesync/l1infotreesync.go b/l1infotreesync/l1infotreesync.go index 97b87438a..1d1d35993 100644 --- a/l1infotreesync/l1infotreesync.go +++ b/l1infotreesync/l1infotreesync.go @@ -487,3 +487,27 @@ func (s *L1InfoTreeSync) IsUpToDate(ctx context.Context, l1Client aggkittypes.Ba return lastProcessedBlock >= finalizedBlock.Number.Uint64(), nil } + +// SubscribeToGERReorg allows subscribers to receive notifications when GERs are removed due to reorgs. +// The returned channel will receive GERReorgEvent instances containing information about the reorged block +// and all affected L1InfoTreeLeaf entries. +// +// Parameters: +// - subscriberName: A unique identifier for the subscriber (used for logging and debugging) +// +// Returns: +// - A receive-only channel that will receive GERReorgEvent notifications +// +// Example usage: +// +// reorgCh := l1InfoTreeSync.SubscribeToGERReorg("aggsender") +// go func() { +// for event := range reorgCh { +// log.Infof("Received reorg affecting %d GERs from block %d", +// len(event.ReorgedLeaves), event.FirstReorgedBlock) +// // Handle the reorg event +// } +// }() +func (s *L1InfoTreeSync) SubscribeToGERReorg(subscriberName string) <-chan GERReorgEvent { + return s.processor.gerReorgNotifier.Subscribe(subscriberName) +} diff --git a/l1infotreesync/l1infotreesync_test.go b/l1infotreesync/l1infotreesync_test.go index 96ab5a0c4..a3cc58abc 100644 --- a/l1infotreesync/l1infotreesync_test.go +++ b/l1infotreesync/l1infotreesync_test.go @@ -332,3 +332,44 @@ func TestL1InfoTreeSync_GetCompletionPercentage(t *testing.T) { mockEVMDriver.EXPECT().GetCompletionPercentage().Return(&percent).Once() require.Equal(t, &percent, s.GetCompletionPercentage()) } + +func TestL1InfoTreeSync_SubscribeToGERReorg(t *testing.T) { + t.Parallel() + + ctx := context.Background() + dbPath := path.Join(t.TempDir(), "l1infotreesync_subscribe_reorg.sqlite") + + l1InfoTreeSync, err := NewReadOnly(ctx, dbPath) + require.NoError(t, err) + + // Subscribe via public interface + reorgCh := l1InfoTreeSync.SubscribeToGERReorg("test-consumer") + + // Create state + info := &UpdateL1InfoTree{ + MainnetExitRoot: common.HexToHash("beef"), + RollupExitRoot: common.HexToHash("5ca1e"), + ParentHash: common.HexToHash("1010101"), + Timestamp: 420, + BlockPosition: 0, + } + err = l1InfoTreeSync.processor.ProcessBlock(ctx, sync.Block{ + Num: 1, + Hash: common.HexToHash("block1"), + Events: []interface{}{Event{UpdateL1InfoTree: info}}, + }) + require.NoError(t, err) + + // Trigger reorg + err = l1InfoTreeSync.processor.Reorg(ctx, 1) + require.NoError(t, err) + + // Verify event received + select { + case event := <-reorgCh: + require.Equal(t, uint64(1), event.FirstReorgedBlock) + require.Len(t, event.ReorgedLeaves, 1) + case <-time.After(1 * time.Second): + t.Fatal("timeout waiting for reorg event") + } +} diff --git a/l1infotreesync/processor.go b/l1infotreesync/processor.go index 269557c37..c39aaa341 100644 --- a/l1infotreesync/processor.go +++ b/l1infotreesync/processor.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" mutex "sync" + "time" aggkitcommon "github.com/agglayer/aggkit/common" "github.com/agglayer/aggkit/db" @@ -28,13 +29,14 @@ var ( ) type processor struct { - db *sql.DB - l1InfoTree treetypes.FullTreer - rollupExitTree treetypes.FullTreer - mu mutex.RWMutex - halted bool - haltedReason string - log *log.Logger + db *sql.DB + l1InfoTree treetypes.FullTreer + rollupExitTree treetypes.FullTreer + mu mutex.RWMutex + halted bool + haltedReason string + log *log.Logger + gerReorgNotifier aggkitcommon.PubSub[GERReorgEvent] } // UpdateL1InfoTree representation of the UpdateL1InfoTree event @@ -121,6 +123,18 @@ func (l *L1InfoTreeInitial) String() string { return fmt.Sprintf("BlockNumber: %d, LeafCount: %d, L1InfoRoot: %s", l.BlockNumber, l.LeafCount, l.L1InfoRoot.String()) } +// GERReorgEvent represents information about GERs removed during a reorg +type GERReorgEvent struct { + FirstReorgedBlock uint64 // Block number where reorg started + ReorgedLeaves []*L1InfoTreeLeaf // All affected L1InfoTree leaves + Timestamp uint64 // Unix timestamp when reorg occurred +} + +func (g *GERReorgEvent) String() string { + return fmt.Sprintf("GERReorgEvent{FirstReorgedBlock: %d, ReorgedLeaves: %d, Timestamp: %d}", + g.FirstReorgedBlock, len(g.ReorgedLeaves), g.Timestamp) +} + // Hash as expected by the tree func (l *L1InfoTreeLeaf) GetHash() common.Hash { rawTimestamp := aggkitcommon.Uint64ToBigEndianBytes(l.Timestamp) @@ -148,10 +162,11 @@ func newProcessor(dbPath string) (*processor, error) { return nil, err } return &processor{ - db: database, - l1InfoTree: tree.NewAppendOnlyTree(database, migrations.L1InfoTreePrefix), - rollupExitTree: tree.NewUpdatableTree(database, migrations.RollupExitTreePrefix), - log: log.WithFields("processor", "l1infotreesync"), + db: database, + l1InfoTree: tree.NewAppendOnlyTree(database, migrations.L1InfoTreePrefix), + rollupExitTree: tree.NewUpdatableTree(database, migrations.RollupExitTreePrefix), + log: log.WithFields("processor", "l1infotreesync"), + gerReorgNotifier: aggkitcommon.NewGenericSubscriber[GERReorgEvent](), }, nil } @@ -327,6 +342,18 @@ func (p *processor) Reorg(ctx context.Context, firstReorgedBlock uint64) error { } }() + // Query affected L1InfoTreeLeaves BEFORE cascade delete + var reorgedLeaves []*L1InfoTreeLeaf + err = meddler.QueryAll(tx, &reorgedLeaves, + `SELECT * FROM l1info_leaf WHERE block_num >= $1 ORDER BY block_num ASC, block_pos ASC;`, + firstReorgedBlock) + if err != nil && !errors.Is(err, sql.ErrNoRows) { + return fmt.Errorf("failed to query affected l1info_leaf entries: %w", err) + } + + p.log.Debugf("found %d l1info_leaf entries to be reorged from block %d", + len(reorgedLeaves), firstReorgedBlock) + res, err := tx.Exec(`DELETE FROM block WHERE num >= $1;`, firstReorgedBlock) if err != nil { return err @@ -349,12 +376,24 @@ func (p *processor) Reorg(ctx context.Context, firstReorgedBlock uint64) error { return err } - p.log.Infof("reorged to block %d, %d rows affected", firstReorgedBlock, rowsAffected) + p.log.Infof("reorged to block %d, %d block rows affected, %d l1info_leaf entries removed", + firstReorgedBlock, rowsAffected, len(reorgedLeaves)) shouldRollback = false if rowsAffected > 0 { p.unhalt() + + // Publish notification ONLY if there were affected leaves + if len(reorgedLeaves) > 0 { + event := GERReorgEvent{ + FirstReorgedBlock: firstReorgedBlock, + ReorgedLeaves: reorgedLeaves, + Timestamp: uint64(time.Now().Unix()), + } + p.log.Infof("publishing GER reorg event: %s", event.String()) + p.gerReorgNotifier.Publish(event) + } } return nil } diff --git a/l1infotreesync/processor_test.go b/l1infotreesync/processor_test.go index e676090a1..e9644558a 100644 --- a/l1infotreesync/processor_test.go +++ b/l1infotreesync/processor_test.go @@ -389,6 +389,225 @@ func TestProcessor_ConcurrentProcessBlockAndReorg(t *testing.T) { } } +func TestProcessor_Reorg_PublishesGERReorgEvent(t *testing.T) { + t.Parallel() + + ctx := context.Background() + dbPath := path.Join(t.TempDir(), "processor_reorg_publishes_event.sqlite") + p, err := newProcessor(dbPath) + require.NoError(t, err) + + // Subscribe to reorg events + reorgCh := p.gerReorgNotifier.Subscribe("test-subscriber") + + // Create initial state with multiple L1InfoTree leaves + info1 := &UpdateL1InfoTree{ + MainnetExitRoot: common.HexToHash("beef"), + RollupExitRoot: common.HexToHash("5ca1e"), + ParentHash: common.HexToHash("1010101"), + Timestamp: 420, + BlockPosition: 0, + } + err = p.ProcessBlock(ctx, aggkitsync.Block{ + Num: 1, + Hash: common.HexToHash("block1"), + Events: []interface{}{Event{UpdateL1InfoTree: info1}}, + }) + require.NoError(t, err) + + info2 := &UpdateL1InfoTree{ + MainnetExitRoot: common.HexToHash("dead"), + RollupExitRoot: common.HexToHash("c0de"), + ParentHash: common.HexToHash("2020202"), + Timestamp: 421, + BlockPosition: 0, + } + err = p.ProcessBlock(ctx, aggkitsync.Block{ + Num: 2, + Hash: common.HexToHash("block2"), + Events: []interface{}{Event{UpdateL1InfoTree: info2}}, + }) + require.NoError(t, err) + + info3 := &UpdateL1InfoTree{ + MainnetExitRoot: common.HexToHash("fade"), + RollupExitRoot: common.HexToHash("babe"), + ParentHash: common.HexToHash("3030303"), + Timestamp: 422, + BlockPosition: 0, + } + err = p.ProcessBlock(ctx, aggkitsync.Block{ + Num: 3, + Hash: common.HexToHash("block3"), + Events: []interface{}{Event{UpdateL1InfoTree: info3}}, + }) + require.NoError(t, err) + + // Verify initial state + lastInfo, err := p.GetLastInfo() + require.NoError(t, err) + require.Equal(t, uint64(3), lastInfo.BlockNumber) + require.Equal(t, uint32(2), lastInfo.L1InfoTreeIndex) + + // Perform reorg from block 2 + err = p.Reorg(ctx, 2) + require.NoError(t, err) + + // Verify reorg event was published + select { + case event := <-reorgCh: + require.Equal(t, uint64(2), event.FirstReorgedBlock) + require.Len(t, event.ReorgedLeaves, 2, "should have 2 reorged leaves (blocks 2 and 3)") + + // Verify the reorged leaves contain correct data + require.Equal(t, uint64(2), event.ReorgedLeaves[0].BlockNumber) + require.Equal(t, uint32(1), event.ReorgedLeaves[0].L1InfoTreeIndex) + require.Equal(t, CalculateGER(info2.MainnetExitRoot, info2.RollupExitRoot), + event.ReorgedLeaves[0].GlobalExitRoot) + + require.Equal(t, uint64(3), event.ReorgedLeaves[1].BlockNumber) + require.Equal(t, uint32(2), event.ReorgedLeaves[1].L1InfoTreeIndex) + require.Equal(t, CalculateGER(info3.MainnetExitRoot, info3.RollupExitRoot), + event.ReorgedLeaves[1].GlobalExitRoot) + + require.Greater(t, event.Timestamp, uint64(0), "timestamp should be set") + case <-time.After(1 * time.Second): + t.Fatal("timeout waiting for reorg event") + } + + // Verify database state after reorg + lastInfo, err = p.GetLastInfo() + require.NoError(t, err) + require.Equal(t, uint64(1), lastInfo.BlockNumber, "only block 1 should remain") + require.Equal(t, uint32(0), lastInfo.L1InfoTreeIndex) +} + +func TestProcessor_Reorg_NoEventWhenNoLeaves(t *testing.T) { + t.Parallel() + + ctx := context.Background() + dbPath := path.Join(t.TempDir(), "processor_reorg_no_event.sqlite") + p, err := newProcessor(dbPath) + require.NoError(t, err) + + // Subscribe to reorg events + reorgCh := p.gerReorgNotifier.Subscribe("test-subscriber") + + // Create a block without any L1InfoTree events + err = p.ProcessBlock(ctx, aggkitsync.Block{ + Num: 1, + Hash: common.HexToHash("block1"), + Events: []interface{}{}, + }) + require.NoError(t, err) + + // Perform reorg + err = p.Reorg(ctx, 1) + require.NoError(t, err) + + // Verify NO event was published (since no l1info_leaf entries were affected) + select { + case event := <-reorgCh: + t.Fatalf("unexpected reorg event received: %+v", event) + case <-time.After(100 * time.Millisecond): + // Expected: no event + } +} + +func TestProcessor_Reorg_NoEventWhenRowsNotAffected(t *testing.T) { + t.Parallel() + + ctx := context.Background() + dbPath := path.Join(t.TempDir(), "processor_reorg_no_rows.sqlite") + p, err := newProcessor(dbPath) + require.NoError(t, err) + + // Subscribe to reorg events + reorgCh := p.gerReorgNotifier.Subscribe("test-subscriber") + + // Create initial state + info1 := &UpdateL1InfoTree{ + MainnetExitRoot: common.HexToHash("beef"), + RollupExitRoot: common.HexToHash("5ca1e"), + ParentHash: common.HexToHash("1010101"), + Timestamp: 420, + BlockPosition: 0, + } + err = p.ProcessBlock(ctx, aggkitsync.Block{ + Num: 1, + Hash: common.HexToHash("block1"), + Events: []interface{}{Event{UpdateL1InfoTree: info1}}, + }) + require.NoError(t, err) + + // Reorg from block 10 (which doesn't exist) + err = p.Reorg(ctx, 10) + require.NoError(t, err) + + // Verify NO event was published (since rowsAffected == 0) + select { + case event := <-reorgCh: + t.Fatalf("unexpected reorg event received: %+v", event) + case <-time.After(100 * time.Millisecond): + // Expected: no event + } +} + +func TestProcessor_Reorg_MultipleSubscribers(t *testing.T) { + t.Parallel() + + ctx := context.Background() + dbPath := path.Join(t.TempDir(), "processor_reorg_multiple_subs.sqlite") + p, err := newProcessor(dbPath) + require.NoError(t, err) + + // Create multiple subscribers + sub1Ch := p.gerReorgNotifier.Subscribe("subscriber-1") + sub2Ch := p.gerReorgNotifier.Subscribe("subscriber-2") + sub3Ch := p.gerReorgNotifier.Subscribe("subscriber-3") + + // Create initial state + info1 := &UpdateL1InfoTree{ + MainnetExitRoot: common.HexToHash("beef"), + RollupExitRoot: common.HexToHash("5ca1e"), + ParentHash: common.HexToHash("1010101"), + Timestamp: 420, + BlockPosition: 0, + } + err = p.ProcessBlock(ctx, aggkitsync.Block{ + Num: 1, + Hash: common.HexToHash("block1"), + Events: []interface{}{Event{UpdateL1InfoTree: info1}}, + }) + require.NoError(t, err) + + // Perform reorg + err = p.Reorg(ctx, 1) + require.NoError(t, err) + + // Verify all subscribers receive the event + var wg sync.WaitGroup + wg.Add(3) + + checkSubscriber := func(ch <-chan GERReorgEvent, name string) { + defer wg.Done() + select { + case event := <-ch: + require.Equal(t, uint64(1), event.FirstReorgedBlock) + require.Len(t, event.ReorgedLeaves, 1) + t.Logf("%s received event successfully", name) + case <-time.After(1 * time.Second): + t.Errorf("%s: timeout waiting for reorg event", name) + } + } + + go checkSubscriber(sub1Ch, "subscriber-1") + go checkSubscriber(sub2Ch, "subscriber-2") + go checkSubscriber(sub3Ch, "subscriber-3") + + wg.Wait() +} + func TestProcessBlockUpdateL1InfoTreeV2DontMatchTree(t *testing.T) { sut, err := newProcessor(path.Join(t.TempDir(), "l1infotreesyncTestProcessBlockUpdateL1InfoTreeV2DontMatchTree.sqlite")) require.NoError(t, err)