Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions l1infotreesync/l1infotreesync.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,3 +487,27 @@ func (s *L1InfoTreeSync) IsUpToDate(ctx context.Context, l1Client aggkittypes.Ba

return lastProcessedBlock >= finalizedBlock.Number.Uint64(), nil
}

// SubscribeToGERReorg allows subscribers to receive notifications when GERs are removed due to reorgs.
// The returned channel will receive GERReorgEvent instances containing information about the reorged block
// and all affected L1InfoTreeLeaf entries.
//
// Parameters:
// - subscriberName: A unique identifier for the subscriber (used for logging and debugging)
//
// Returns:
// - A receive-only channel that will receive GERReorgEvent notifications
//
// Example usage:
//
// reorgCh := l1InfoTreeSync.SubscribeToGERReorg("aggsender")
// go func() {
// for event := range reorgCh {
// log.Infof("Received reorg affecting %d GERs from block %d",
// len(event.ReorgedLeaves), event.FirstReorgedBlock)
// // Handle the reorg event
// }
// }()
func (s *L1InfoTreeSync) SubscribeToGERReorg(subscriberName string) <-chan GERReorgEvent {
return s.processor.gerReorgNotifier.Subscribe(subscriberName)
}
41 changes: 41 additions & 0 deletions l1infotreesync/l1infotreesync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,3 +332,44 @@ func TestL1InfoTreeSync_GetCompletionPercentage(t *testing.T) {
mockEVMDriver.EXPECT().GetCompletionPercentage().Return(&percent).Once()
require.Equal(t, &percent, s.GetCompletionPercentage())
}

func TestL1InfoTreeSync_SubscribeToGERReorg(t *testing.T) {
t.Parallel()

ctx := context.Background()
dbPath := path.Join(t.TempDir(), "l1infotreesync_subscribe_reorg.sqlite")

l1InfoTreeSync, err := NewReadOnly(ctx, dbPath)
require.NoError(t, err)

// Subscribe via public interface
reorgCh := l1InfoTreeSync.SubscribeToGERReorg("test-consumer")

// Create state
info := &UpdateL1InfoTree{
MainnetExitRoot: common.HexToHash("beef"),
RollupExitRoot: common.HexToHash("5ca1e"),
ParentHash: common.HexToHash("1010101"),
Timestamp: 420,
BlockPosition: 0,
}
err = l1InfoTreeSync.processor.ProcessBlock(ctx, sync.Block{
Num: 1,
Hash: common.HexToHash("block1"),
Events: []interface{}{Event{UpdateL1InfoTree: info}},
})
require.NoError(t, err)

// Trigger reorg
err = l1InfoTreeSync.processor.Reorg(ctx, 1)
require.NoError(t, err)

// Verify event received
select {
case event := <-reorgCh:
require.Equal(t, uint64(1), event.FirstReorgedBlock)
require.Len(t, event.ReorgedLeaves, 1)
case <-time.After(1 * time.Second):
t.Fatal("timeout waiting for reorg event")
}
}
63 changes: 51 additions & 12 deletions l1infotreesync/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"errors"
"fmt"
mutex "sync"
"time"

aggkitcommon "github.com/agglayer/aggkit/common"
"github.com/agglayer/aggkit/db"
Expand All @@ -28,13 +29,14 @@ var (
)

type processor struct {
db *sql.DB
l1InfoTree treetypes.FullTreer
rollupExitTree treetypes.FullTreer
mu mutex.RWMutex
halted bool
haltedReason string
log *log.Logger
db *sql.DB
l1InfoTree treetypes.FullTreer
rollupExitTree treetypes.FullTreer
mu mutex.RWMutex
halted bool
haltedReason string
log *log.Logger
gerReorgNotifier aggkitcommon.PubSub[GERReorgEvent]
}

// UpdateL1InfoTree representation of the UpdateL1InfoTree event
Expand Down Expand Up @@ -121,6 +123,18 @@ func (l *L1InfoTreeInitial) String() string {
return fmt.Sprintf("BlockNumber: %d, LeafCount: %d, L1InfoRoot: %s", l.BlockNumber, l.LeafCount, l.L1InfoRoot.String())
}

// GERReorgEvent represents information about GERs removed during a reorg
type GERReorgEvent struct {
FirstReorgedBlock uint64 // Block number where reorg started
ReorgedLeaves []*L1InfoTreeLeaf // All affected L1InfoTree leaves
Timestamp uint64 // Unix timestamp when reorg occurred
}

func (g *GERReorgEvent) String() string {
return fmt.Sprintf("GERReorgEvent{FirstReorgedBlock: %d, ReorgedLeaves: %d, Timestamp: %d}",
g.FirstReorgedBlock, len(g.ReorgedLeaves), g.Timestamp)
}

// Hash as expected by the tree
func (l *L1InfoTreeLeaf) GetHash() common.Hash {
rawTimestamp := aggkitcommon.Uint64ToBigEndianBytes(l.Timestamp)
Expand Down Expand Up @@ -148,10 +162,11 @@ func newProcessor(dbPath string) (*processor, error) {
return nil, err
}
return &processor{
db: database,
l1InfoTree: tree.NewAppendOnlyTree(database, migrations.L1InfoTreePrefix),
rollupExitTree: tree.NewUpdatableTree(database, migrations.RollupExitTreePrefix),
log: log.WithFields("processor", "l1infotreesync"),
db: database,
l1InfoTree: tree.NewAppendOnlyTree(database, migrations.L1InfoTreePrefix),
rollupExitTree: tree.NewUpdatableTree(database, migrations.RollupExitTreePrefix),
log: log.WithFields("processor", "l1infotreesync"),
gerReorgNotifier: aggkitcommon.NewGenericSubscriber[GERReorgEvent](),
}, nil
}

Expand Down Expand Up @@ -327,6 +342,18 @@ func (p *processor) Reorg(ctx context.Context, firstReorgedBlock uint64) error {
}
}()

// Query affected L1InfoTreeLeaves BEFORE cascade delete
var reorgedLeaves []*L1InfoTreeLeaf
err = meddler.QueryAll(tx, &reorgedLeaves,
`SELECT * FROM l1info_leaf WHERE block_num >= $1 ORDER BY block_num ASC, block_pos ASC;`,
firstReorgedBlock)
if err != nil && !errors.Is(err, sql.ErrNoRows) {
return fmt.Errorf("failed to query affected l1info_leaf entries: %w", err)
}

p.log.Debugf("found %d l1info_leaf entries to be reorged from block %d",
len(reorgedLeaves), firstReorgedBlock)

res, err := tx.Exec(`DELETE FROM block WHERE num >= $1;`, firstReorgedBlock)
if err != nil {
return err
Expand All @@ -349,12 +376,24 @@ func (p *processor) Reorg(ctx context.Context, firstReorgedBlock uint64) error {
return err
}

p.log.Infof("reorged to block %d, %d rows affected", firstReorgedBlock, rowsAffected)
p.log.Infof("reorged to block %d, %d block rows affected, %d l1info_leaf entries removed",
firstReorgedBlock, rowsAffected, len(reorgedLeaves))

shouldRollback = false

if rowsAffected > 0 {
p.unhalt()

// Publish notification ONLY if there were affected leaves
if len(reorgedLeaves) > 0 {
event := GERReorgEvent{
FirstReorgedBlock: firstReorgedBlock,
ReorgedLeaves: reorgedLeaves,
Timestamp: uint64(time.Now().Unix()),
}
p.log.Infof("publishing GER reorg event: %s", event.String())
p.gerReorgNotifier.Publish(event)
}
}
return nil
}
Expand Down
Loading
Loading