Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions src/lean_spec/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
python -m lean_spec --genesis config.yaml --bootnode enr:-IS4QHCYrYZbAKW...
python -m lean_spec --genesis config.yaml --checkpoint-sync-url http://localhost:5052
python -m lean_spec --genesis config.yaml --validator-keys ./keys --node-id lean_spec_0
python -m lean_spec --genesis config.yaml --validator-keys ./keys --is-aggregator

Options:
--genesis Path to genesis YAML file (required)
Expand All @@ -17,6 +18,7 @@
--checkpoint-sync-url URL to fetch finalized checkpoint state for fast sync
--validator-keys Path to validator keys directory
--node-id Node identifier for validator assignment (default: lean_spec_0)
--is-aggregator Enable aggregator mode for attestation aggregation (default: false)
"""

from __future__ import annotations
Expand Down Expand Up @@ -163,6 +165,7 @@ def _init_from_genesis(
genesis: GenesisConfig,
event_source: LiveNetworkEventSource,
validator_registry: ValidatorRegistry | None = None,
is_aggregator: bool = False,
) -> Node:
"""
Initialize a node from genesis configuration.
Expand All @@ -171,6 +174,7 @@ def _init_from_genesis(
genesis: Genesis configuration with time and validators.
event_source: Network transport for the node.
validator_registry: Optional registry with validator secret keys.
is_aggregator: Enable aggregator mode for attestation aggregation.

Returns:
A fully initialized Node starting from genesis.
Expand All @@ -192,6 +196,7 @@ def _init_from_genesis(
network=event_source.reqresp_client,
validator_registry=validator_registry,
fork_digest=GOSSIP_FORK_DIGEST,
is_aggregator=is_aggregator,
)

# Create and return the node.
Expand All @@ -203,6 +208,7 @@ async def _init_from_checkpoint(
genesis: GenesisConfig,
event_source: LiveNetworkEventSource,
validator_registry: ValidatorRegistry | None = None,
is_aggregator: bool = False,
) -> Node | None:
"""
Initialize a node from a checkpoint state fetched from a remote node.
Expand All @@ -229,6 +235,7 @@ async def _init_from_checkpoint(
genesis: Local genesis configuration for validation.
event_source: Network transport for the node.
validator_registry: Optional registry with validator secret keys.
is_aggregator: Enable aggregator mode for attestation aggregation.

Returns:
A fully initialized Node if successful, None if checkpoint sync failed.
Expand Down Expand Up @@ -301,6 +308,7 @@ async def _init_from_checkpoint(
network=event_source.reqresp_client,
validator_registry=validator_registry,
fork_digest=GOSSIP_FORK_DIGEST,
is_aggregator=is_aggregator,
)

# Create node and inject checkpoint store.
Expand Down Expand Up @@ -392,6 +400,7 @@ async def run_node(
validator_keys_path: Path | None = None,
node_id: str = "lean_spec_0",
genesis_time_now: bool = False,
is_aggregator: bool = False,
) -> None:
"""
Run the lean consensus node.
Expand All @@ -404,6 +413,7 @@ async def run_node(
validator_keys_path: Optional path to validator keys directory.
node_id: Node identifier for validator assignment.
genesis_time_now: Override genesis time to current time for testing.
is_aggregator: Enable aggregator mode for attestation aggregation.
"""
import time

Expand Down Expand Up @@ -431,6 +441,10 @@ async def run_node(
len(genesis.genesis_validators),
)

# Log aggregator mode if enabled
if is_aggregator:
logger.info("Aggregator mode enabled - node will perform attestation aggregation")

# Load validator keys if path provided.
#
# The registry holds secret keys for validators assigned to this node.
Expand Down Expand Up @@ -504,6 +518,7 @@ async def run_node(
genesis=genesis,
event_source=event_source,
validator_registry=validator_registry,
is_aggregator=is_aggregator,
)
if node is None:
# Checkpoint sync failed. Exit rather than falling back.
Expand All @@ -516,6 +531,7 @@ async def run_node(
genesis=genesis,
event_source=event_source,
validator_registry=validator_registry,
is_aggregator=is_aggregator,
)

logger.info("Node initialized, peer_id=%s", event_source.connection_manager.peer_id)
Expand Down Expand Up @@ -646,6 +662,11 @@ def main() -> None:
action="store_true",
help="Override genesis time to current time (for testing)",
)
parser.add_argument(
"--is-aggregator",
action="store_true",
help="Enable aggregator mode (node performs attestation aggregation)",
)

args = parser.parse_args()

Expand All @@ -663,6 +684,7 @@ def main() -> None:
args.validator_keys,
args.node_id,
args.genesis_time_now,
args.is_aggregator,
)
)
except KeyboardInterrupt:
Expand Down
8 changes: 8 additions & 0 deletions src/lean_spec/subspecs/networking/enr/enr.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,14 @@ def sync_committee_subnets(self) -> SyncCommitteeSubnets | None:
return SyncCommitteeSubnets.decode_bytes(syncnets)
return None

@property
def is_aggregator(self) -> bool:
"""Check if node advertises aggregator capability."""
aggregator_bytes = self.get(keys.IS_AGGREGATOR)
if aggregator_bytes and len(aggregator_bytes) == 1:
return aggregator_bytes[0] == 0x01
return False

def is_valid(self) -> bool:
"""
Check structural validity (does NOT verify cryptographic signature).
Expand Down
3 changes: 3 additions & 0 deletions src/lean_spec/subspecs/networking/enr/keys.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,6 @@

SYNCNETS: Final[EnrKey] = "syncnets"
"""Sync committee subnet subscriptions (1 byte bitvector)."""

IS_AGGREGATOR: Final[EnrKey] = "is_aggregator"
"""Aggregator capability flag (1 byte: 0x00 = false, 0x01 = true)."""
3 changes: 3 additions & 0 deletions src/lean_spec/subspecs/networking/service/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ class NetworkService:
fork_digest: str = field(default="0x00000000")
"""Fork digest for gossip topics (4-byte hex string)."""

is_aggregator: bool = field(default=False)
"""Whether this node functions as an aggregator."""

_running: bool = field(default=False, repr=False)
"""Whether the event loop is running."""

Expand Down
16 changes: 11 additions & 5 deletions src/lean_spec/subspecs/node/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,24 @@
from lean_spec.subspecs.containers.validator import ValidatorIndex


def is_aggregator(validator_id: ValidatorIndex | None) -> bool:
def is_aggregator(
validator_id: ValidatorIndex | None,
node_is_aggregator: bool = False,
) -> bool:
"""
Determine if a validator is an aggregator.

A validator acts as an aggregator when:
1. The validator is active (validator_id is not None)
2. The node operator has enabled aggregator mode

Args:
validator_id: The index of the validator.
node_is_aggregator: Whether the node is configured as an aggregator.

Returns:
True if the validator is an aggregator, False otherwise.
True if the validator should perform aggregation, False otherwise.
"""
if validator_id is None:
return False
return (
False # Placeholder implementation, in future should be defined by node operator settings
)
return node_is_aggregator
14 changes: 14 additions & 0 deletions src/lean_spec/subspecs/node/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,18 @@ class NodeConfig:
For devnet testing with ream, use "devnet0".
"""

is_aggregator: bool = field(default=False)
"""
Whether this node functions as an aggregator.

When True:
- The node performs attestation aggregation operations
- The ENR advertises aggregator capability to peers

When False (default):
- The node runs in standard validator or passive mode
"""


def get_local_validator_id(registry: ValidatorRegistry | None) -> ValidatorIndex | None:
"""
Expand Down Expand Up @@ -218,13 +230,15 @@ def from_genesis(cls, config: NodeConfig) -> Node:
clock=clock,
network=config.network,
database=database,
is_aggregator=config.is_aggregator,
)

chain_service = ChainService(sync_service=sync_service, clock=clock)
network_service = NetworkService(
sync_service=sync_service,
event_source=config.event_source,
fork_digest=config.fork_digest,
is_aggregator=config.is_aggregator,
)

# Create API server if configured
Expand Down
8 changes: 7 additions & 1 deletion src/lean_spec/subspecs/sync/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,9 @@ class SyncService:
database: Database | None = field(default=None)
"""Optional database for persisting blocks and states."""

is_aggregator: bool = field(default=False)
"""Whether this node functions as an aggregator."""

process_block: BlockProcessor = field(default=default_block_processor)
"""Block processor function. Defaults to Store.on_block()."""

Expand Down Expand Up @@ -425,7 +428,10 @@ async def on_gossip_attestation(
from lean_spec.subspecs.node.helpers import is_aggregator

# Check if we are an aggregator
is_aggregator_role = is_aggregator(self.store.validator_id)
is_aggregator_role = is_aggregator(
self.store.validator_id,
node_is_aggregator=self.is_aggregator,
)

# Integrate the attestation into forkchoice state.
#
Expand Down
Loading