diff --git a/src/lean_spec/__main__.py b/src/lean_spec/__main__.py index 378d5663..dcde40ac 100644 --- a/src/lean_spec/__main__.py +++ b/src/lean_spec/__main__.py @@ -9,6 +9,7 @@ python -m lean_spec --genesis config.yaml --bootnode enr:-IS4QHCYrYZbAKW... python -m lean_spec --genesis config.yaml --checkpoint-sync-url http://localhost:5052 python -m lean_spec --genesis config.yaml --validator-keys ./keys --node-id lean_spec_0 + python -m lean_spec --genesis config.yaml --validator-keys ./keys --is-aggregator Options: --genesis Path to genesis YAML file (required) @@ -17,6 +18,7 @@ --checkpoint-sync-url URL to fetch finalized checkpoint state for fast sync --validator-keys Path to validator keys directory --node-id Node identifier for validator assignment (default: lean_spec_0) + --is-aggregator Enable aggregator mode for attestation aggregation (default: false) """ from __future__ import annotations @@ -163,6 +165,7 @@ def _init_from_genesis( genesis: GenesisConfig, event_source: LiveNetworkEventSource, validator_registry: ValidatorRegistry | None = None, + is_aggregator: bool = False, ) -> Node: """ Initialize a node from genesis configuration. @@ -171,6 +174,7 @@ def _init_from_genesis( genesis: Genesis configuration with time and validators. event_source: Network transport for the node. validator_registry: Optional registry with validator secret keys. + is_aggregator: Enable aggregator mode for attestation aggregation. Returns: A fully initialized Node starting from genesis. @@ -192,6 +196,7 @@ def _init_from_genesis( network=event_source.reqresp_client, validator_registry=validator_registry, fork_digest=GOSSIP_FORK_DIGEST, + is_aggregator=is_aggregator, ) # Create and return the node. @@ -203,6 +208,7 @@ async def _init_from_checkpoint( genesis: GenesisConfig, event_source: LiveNetworkEventSource, validator_registry: ValidatorRegistry | None = None, + is_aggregator: bool = False, ) -> Node | None: """ Initialize a node from a checkpoint state fetched from a remote node. @@ -229,6 +235,7 @@ async def _init_from_checkpoint( genesis: Local genesis configuration for validation. event_source: Network transport for the node. validator_registry: Optional registry with validator secret keys. + is_aggregator: Enable aggregator mode for attestation aggregation. Returns: A fully initialized Node if successful, None if checkpoint sync failed. @@ -301,6 +308,7 @@ async def _init_from_checkpoint( network=event_source.reqresp_client, validator_registry=validator_registry, fork_digest=GOSSIP_FORK_DIGEST, + is_aggregator=is_aggregator, ) # Create node and inject checkpoint store. @@ -392,6 +400,7 @@ async def run_node( validator_keys_path: Path | None = None, node_id: str = "lean_spec_0", genesis_time_now: bool = False, + is_aggregator: bool = False, ) -> None: """ Run the lean consensus node. @@ -404,6 +413,7 @@ async def run_node( validator_keys_path: Optional path to validator keys directory. node_id: Node identifier for validator assignment. genesis_time_now: Override genesis time to current time for testing. + is_aggregator: Enable aggregator mode for attestation aggregation. """ import time @@ -431,6 +441,10 @@ async def run_node( len(genesis.genesis_validators), ) + # Log aggregator mode if enabled + if is_aggregator: + logger.info("Aggregator mode enabled - node will perform attestation aggregation") + # Load validator keys if path provided. # # The registry holds secret keys for validators assigned to this node. @@ -504,6 +518,7 @@ async def run_node( genesis=genesis, event_source=event_source, validator_registry=validator_registry, + is_aggregator=is_aggregator, ) if node is None: # Checkpoint sync failed. Exit rather than falling back. @@ -516,6 +531,7 @@ async def run_node( genesis=genesis, event_source=event_source, validator_registry=validator_registry, + is_aggregator=is_aggregator, ) logger.info("Node initialized, peer_id=%s", event_source.connection_manager.peer_id) @@ -646,6 +662,11 @@ def main() -> None: action="store_true", help="Override genesis time to current time (for testing)", ) + parser.add_argument( + "--is-aggregator", + action="store_true", + help="Enable aggregator mode (node performs attestation aggregation)", + ) args = parser.parse_args() @@ -663,6 +684,7 @@ def main() -> None: args.validator_keys, args.node_id, args.genesis_time_now, + args.is_aggregator, ) ) except KeyboardInterrupt: diff --git a/src/lean_spec/subspecs/networking/enr/enr.py b/src/lean_spec/subspecs/networking/enr/enr.py index 1f2f3e24..81d38b26 100644 --- a/src/lean_spec/subspecs/networking/enr/enr.py +++ b/src/lean_spec/subspecs/networking/enr/enr.py @@ -186,6 +186,14 @@ def sync_committee_subnets(self) -> SyncCommitteeSubnets | None: return SyncCommitteeSubnets.decode_bytes(syncnets) return None + @property + def is_aggregator(self) -> bool: + """Check if node advertises aggregator capability.""" + aggregator_bytes = self.get(keys.IS_AGGREGATOR) + if aggregator_bytes and len(aggregator_bytes) == 1: + return aggregator_bytes[0] == 0x01 + return False + def is_valid(self) -> bool: """ Check structural validity (does NOT verify cryptographic signature). diff --git a/src/lean_spec/subspecs/networking/enr/keys.py b/src/lean_spec/subspecs/networking/enr/keys.py index f45bf719..a39f09a9 100644 --- a/src/lean_spec/subspecs/networking/enr/keys.py +++ b/src/lean_spec/subspecs/networking/enr/keys.py @@ -47,3 +47,6 @@ SYNCNETS: Final[EnrKey] = "syncnets" """Sync committee subnet subscriptions (1 byte bitvector).""" + +IS_AGGREGATOR: Final[EnrKey] = "is_aggregator" +"""Aggregator capability flag (1 byte: 0x00 = false, 0x01 = true).""" diff --git a/src/lean_spec/subspecs/networking/service/service.py b/src/lean_spec/subspecs/networking/service/service.py index e949643e..193c64f1 100644 --- a/src/lean_spec/subspecs/networking/service/service.py +++ b/src/lean_spec/subspecs/networking/service/service.py @@ -75,6 +75,9 @@ class NetworkService: fork_digest: str = field(default="0x00000000") """Fork digest for gossip topics (4-byte hex string).""" + is_aggregator: bool = field(default=False) + """Whether this node functions as an aggregator.""" + _running: bool = field(default=False, repr=False) """Whether the event loop is running.""" diff --git a/src/lean_spec/subspecs/node/helpers.py b/src/lean_spec/subspecs/node/helpers.py index f1cdf7f7..8ffe6a83 100644 --- a/src/lean_spec/subspecs/node/helpers.py +++ b/src/lean_spec/subspecs/node/helpers.py @@ -3,18 +3,24 @@ from lean_spec.subspecs.containers.validator import ValidatorIndex -def is_aggregator(validator_id: ValidatorIndex | None) -> bool: +def is_aggregator( + validator_id: ValidatorIndex | None, + node_is_aggregator: bool = False, +) -> bool: """ Determine if a validator is an aggregator. + A validator acts as an aggregator when: + 1. The validator is active (validator_id is not None) + 2. The node operator has enabled aggregator mode + Args: validator_id: The index of the validator. + node_is_aggregator: Whether the node is configured as an aggregator. Returns: - True if the validator is an aggregator, False otherwise. + True if the validator should perform aggregation, False otherwise. """ if validator_id is None: return False - return ( - False # Placeholder implementation, in future should be defined by node operator settings - ) + return node_is_aggregator diff --git a/src/lean_spec/subspecs/node/node.py b/src/lean_spec/subspecs/node/node.py index c4ec515c..1b612d7a 100644 --- a/src/lean_spec/subspecs/node/node.py +++ b/src/lean_spec/subspecs/node/node.py @@ -94,6 +94,18 @@ class NodeConfig: For devnet testing with ream, use "devnet0". """ + is_aggregator: bool = field(default=False) + """ + Whether this node functions as an aggregator. + + When True: + - The node performs attestation aggregation operations + - The ENR advertises aggregator capability to peers + + When False (default): + - The node runs in standard validator or passive mode + """ + def get_local_validator_id(registry: ValidatorRegistry | None) -> ValidatorIndex | None: """ @@ -218,6 +230,7 @@ def from_genesis(cls, config: NodeConfig) -> Node: clock=clock, network=config.network, database=database, + is_aggregator=config.is_aggregator, ) chain_service = ChainService(sync_service=sync_service, clock=clock) @@ -225,6 +238,7 @@ def from_genesis(cls, config: NodeConfig) -> Node: sync_service=sync_service, event_source=config.event_source, fork_digest=config.fork_digest, + is_aggregator=config.is_aggregator, ) # Create API server if configured diff --git a/src/lean_spec/subspecs/sync/service.py b/src/lean_spec/subspecs/sync/service.py index 37a4d5a3..e210d16f 100644 --- a/src/lean_spec/subspecs/sync/service.py +++ b/src/lean_spec/subspecs/sync/service.py @@ -150,6 +150,9 @@ class SyncService: database: Database | None = field(default=None) """Optional database for persisting blocks and states.""" + is_aggregator: bool = field(default=False) + """Whether this node functions as an aggregator.""" + process_block: BlockProcessor = field(default=default_block_processor) """Block processor function. Defaults to Store.on_block().""" @@ -425,7 +428,10 @@ async def on_gossip_attestation( from lean_spec.subspecs.node.helpers import is_aggregator # Check if we are an aggregator - is_aggregator_role = is_aggregator(self.store.validator_id) + is_aggregator_role = is_aggregator( + self.store.validator_id, + node_is_aggregator=self.is_aggregator, + ) # Integrate the attestation into forkchoice state. #