diff --git a/docs/client/networking.md b/docs/client/networking.md
index 8160d6eb..39989a5e 100644
--- a/docs/client/networking.md
+++ b/docs/client/networking.md
@@ -33,6 +33,7 @@ Each node entry contains an ENR. This is an Ethereum Node Record. It includes:
 - The node's public key
 - Network address
 - Port numbers
+- Committee assignments (for aggregators)
 - Other metadata
 
 In production, dynamic discovery would replace static configuration.
@@ -62,15 +63,35 @@ Messages are organized by topic. Topic names follow a pattern that includes:
 
 This structure lets clients subscribe to relevant messages and ignore others.
 
+The payload carried in the gossipsub message is the SSZ-encoded,
+Snappy-compressed message, whose type is identified by the topic:
+
+| Topic Name                                                  | Message Type                | Encoding     |
+|-------------------------------------------------------------|-----------------------------|--------------|
+| /lean/consensus/devnet3/blocks/ssz_snappy                   | SignedBlockWithAttestation  | SSZ + Snappy |
+| /lean/consensus/devnet3/attestations/ssz_snappy             | SignedAttestation           | SSZ + Snappy |
+| /lean/consensus/devnet3/attestation_{subnet_id}/ssz_snappy  | SignedAttestation           | SSZ + Snappy |
+| /lean/consensus/devnet3/aggregation/ssz_snappy              | SignedAggregatedAttestation | SSZ + Snappy |
+
 ### Message Types
 
-Two main message types exist:
+Three main message types exist:
 
-Blocks are proposed by validators. They propagate on the block topic. Every
-node needs to see blocks quickly.
+* _Blocks_, defined by the `SignedBlockWithAttestation` type, are proposed by
+validators and propagated on the block topic. Every node needs to see blocks
+quickly.
 
-Attestations come from all validators. They propagate on the attestation topic. High volume
-but small messages.
+* _Attestations_, defined by the `SignedAttestation` type, come from all
+validators. They propagate on the global attestation topic. Additionally,
+each committee has its own attestation topic. Validators publish to both
+their committee's attestation topic and the global attestation topic.
+Non-aggregating validators subscribe only to the global attestation topic,
+while aggregators subscribe to both the global topic and their committee's
+attestation topic.
+
+* _Committee aggregations_, defined by the `SignedAggregatedAttestation` type,
+are created by committee aggregators. These combine attestations from
+committee members. Aggregations propagate on the aggregation topic, to which
+every validator subscribes.
 
 ### Encoding
 
diff --git a/docs/client/validator.md b/docs/client/validator.md
index 3284c4f2..40b62f18 100644
--- a/docs/client/validator.md
+++ b/docs/client/validator.md
@@ -2,8 +2,9 @@
 
 ## Overview
 
-Validators participate in consensus by proposing blocks and producing attestations. This
-document describes what honest validators do.
+Validators participate in consensus by proposing blocks and producing attestations.
+Validators can optionally opt in to act as aggregators for one or more
+committees. This document describes what honest validators do.
 
 ## Validator Assignment
 
@@ -16,6 +17,32 @@ diversity helps test interoperability.
 In production, validator assignment will work differently. The current
 approach is temporary for devnet testing.
 
+## Attestation Committees and Subnets
+
+An attestation committee is a group of validators contributing to a common
+aggregated attestation. Subnets are network channels dedicated to specific
+committees.
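+
+For example (an illustrative sketch, not normative), the committee-to-subnet
+mapping described below follows directly from the validator index:
+
+```python
+ATTESTATION_COMMITTEE_COUNT = 4  # assumed value, for illustration only
+
+def subnet_for(validator_index: int) -> int:
+    """Devnet mapping: validator index modulo the number of committees."""
+    return validator_index % ATTESTATION_COMMITTEE_COUNT
+
+assert subnet_for(0) == 0
+assert subnet_for(7) == 3  # validators 3, 7, 11, ... share subnet 3
+```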
+
+In the devnet-3 design, however, there is one global subnet for signed
+attestation propagation, in addition to the per-committee subnets. This is
+due to the 3SF-mini consensus design, which requires 2/3+ of all attestations
+to be observed by every validator to compute the safe target correctly.
+
+Note that non-aggregating validators do not need to subscribe to committee
+attestation subnets. They only need to subscribe to the global attestation
+subnet.
+
+Every validator is assigned to a single committee. The number of committees
+is defined in `config.yaml`. Each committee maps to a subnet ID. A
+validator's subnet ID is derived as its validator index modulo the number of
+committees. This is to simplify debugging and testing. In the future, a
+validator's subnet ID will be assigned randomly per epoch.
+
+## Aggregator Assignment
+
+Some validators are self-assigned as aggregators. Aggregators collect and
+combine attestations from other validators in their committee. To become an
+aggregator, a validator sets the `is_aggregator` flag to true as an ENR
+record field.
+
 ## Proposing Blocks
 
 Each slot has exactly one designated proposer. The proposer is determined by
@@ -52,7 +79,7 @@ receive and validate it.
 
 ## Attesting
 
-Every validator attestations in every slot. Attesting happens in the second interval,
+Every validator attests in every slot. Attesting happens in the second interval,
 after proposals are made.
 
 ### What to Attest For
@@ -78,8 +105,8 @@ compute the head.
 
 ### Broadcasting Attestations
 
-Validators sign their attestations and broadcast them. The network uses a single topic
-for all attestations. No subnets or committees in the current design.
+Validators sign their attestations and broadcast them on the global
+attestation topic and their committee's subnet topic.
 
 ## Timing
 
@@ -98,11 +125,7 @@ blocks and attestations.
 Attestation aggregation combines multiple attestations into one. This saves
 bandwidth and block space.
 
-Devnet 0 has no aggregation. Each attestation is separate. Future devnets will add
-aggregation.
-
-When aggregation is added, aggregators will collect attestations and combine them.
-Aggregated attestations will be broadcast separately.
+Devnet-3 introduces signature aggregation. Aggregators collect attestations,
+combine them, and broadcast the aggregated attestations separately.
 
 ## Signature Handling
 
diff --git a/src/lean_spec/subspecs/chain/config.py b/src/lean_spec/subspecs/chain/config.py
index aa00fee7..22adf314 100644
--- a/src/lean_spec/subspecs/chain/config.py
+++ b/src/lean_spec/subspecs/chain/config.py
@@ -37,6 +37,12 @@
 VALIDATOR_REGISTRY_LIMIT: Final = Uint64(2**12)
 """The maximum number of validators that can be in the registry."""
 
+ATTESTATION_COMMITTEE_COUNT: Final = Uint64(1)
+"""The number of attestation committees per slot."""
+
+COMMITTEE_SIGNATURE_THRESHOLD_RATIO: Final = 0.9
+"""Default ratio of committee signature participation required to trigger aggregation."""
+
 
 class _ChainConfig(StrictBaseModel):
     """
@@ -52,6 +58,12 @@ class _ChainConfig(StrictBaseModel):
     historical_roots_limit: Uint64
     validator_registry_limit: Uint64
 
+    # Attestation / Networking
+    attestation_committee_count: Uint64
+
+    # Aggregation behavior
+    committee_signature_threshold_ratio: float
+
 
 # The Devnet Chain Configuration.
 DEVNET_CONFIG: Final = _ChainConfig(
@@ -59,4 +71,6 @@ class _ChainConfig(StrictBaseModel):
     justification_lookback_slots=JUSTIFICATION_LOOKBACK_SLOTS,
     historical_roots_limit=HISTORICAL_ROOTS_LIMIT,
     validator_registry_limit=VALIDATOR_REGISTRY_LIMIT,
+    attestation_committee_count=ATTESTATION_COMMITTEE_COUNT,
+    committee_signature_threshold_ratio=COMMITTEE_SIGNATURE_THRESHOLD_RATIO,
 )
diff --git a/src/lean_spec/subspecs/containers/attestation/attestation.py b/src/lean_spec/subspecs/containers/attestation/attestation.py
index 1a0e7fb6..26e6e79f 100644
--- a/src/lean_spec/subspecs/containers/attestation/attestation.py
+++ b/src/lean_spec/subspecs/containers/attestation/attestation.py
@@ -20,6 +20,7 @@
 from lean_spec.subspecs.ssz import hash_tree_root
 from lean_spec.types import Bytes32, Container, Uint64
 
+from ...xmss.aggregation import AggregatedSignatureProof
 from ...xmss.containers import Signature
 from ..checkpoint import Checkpoint
 from .aggregation_bits import AggregationBits
@@ -107,3 +108,10 @@ def aggregate_by_data(
             )
             for data, validator_ids in data_to_validator_ids.items()
         ]
+
+
+class SignedAggregatedAttestation(Container):
+    data: AttestationData
+    """Combined attestation data similar to the beacon chain format."""
+
+    proof: AggregatedSignatureProof
+    """Aggregated signature proof covering all participating validators."""
diff --git a/src/lean_spec/subspecs/containers/state/state.py b/src/lean_spec/subspecs/containers/state/state.py
index 3326c2dc..d5b5266e 100644
--- a/src/lean_spec/subspecs/containers/state/state.py
+++ b/src/lean_spec/subspecs/containers/state/state.py
@@ -2,6 +2,7 @@
 
 from typing import AbstractSet, Iterable
 
+from lean_spec.subspecs.networking.subnet import compute_subnet_id, compute_subnet_size
 from lean_spec.subspecs.ssz.hash import hash_tree_root
 from lean_spec.subspecs.xmss.aggregation import (
     AggregatedSignatureProof,
@@ -30,6 +31,7 @@
     JustifiedSlots,
     Validators,
 )
+from ...chain.config import ATTESTATION_COMMITTEE_COUNT
 
 
 class State(Container):
@@ -715,15 +717,13 @@ def build_block(
         # Add new attestations and continue iteration
         attestations.extend(new_attestations)
 
-        # Compute the aggregated signatures for the attestations.
-        # If the attestations cannot be aggregated, split it in a greedy way.
-        aggregated_attestations, aggregated_signatures = self.compute_aggregated_signatures(
+        # Select aggregated attestations and proofs for the final block.
+        aggregated_attestations, aggregated_signatures = self.select_aggregated_proofs(
             attestations,
-            gossip_signatures,
             aggregated_payloads,
         )
 
-        # Update the block with the aggregated attestations
+        # Update the block with the aggregated attestations and proofs.
         final_block = candidate_block.model_copy(
             update={
                 "body": BlockBody(
@@ -738,26 +738,18 @@ def build_block(
 
         return final_block, post_state, aggregated_attestations, aggregated_signatures
 
-    def compute_aggregated_signatures(
+    def aggregate_gossip_signatures(
         self,
         attestations: list[Attestation],
         gossip_signatures: dict[SignatureKey, "Signature"] | None = None,
-        aggregated_payloads: dict[SignatureKey, list[AggregatedSignatureProof]] | None = None,
-    ) -> tuple[list[AggregatedAttestation], list[AggregatedSignatureProof]]:
+        threshold_ratio: float = 0.0,
+    ) -> list[tuple[AggregatedAttestation, AggregatedSignatureProof]]:
         """
-        Compute aggregated signatures for a set of attestations.
-
-        This method implements a two-phase signature collection strategy:
-
-        1. **Gossip Phase**: For each attestation group, first attempt to collect
-           individual XMSS signatures from the gossip network. These are fresh
-           signatures that validators broadcast when they attest.
+        Collect individual signatures from the gossip network and aggregate them.
 
-        2. **Fallback Phase**: For any validators not covered by gossip, fall back
-           to previously-seen aggregated proofs from blocks. This uses a greedy
-           set-cover approach to minimize the number of proofs needed.
-
-        The result is a list of (attestation, proof) pairs ready for block inclusion.
+        For each attestation group, attempt to collect individual XMSS signatures
+        from the gossip network. These are fresh signatures that validators
+        broadcast when they attest.
 
         Parameters
         ----------
@@ -765,15 +757,15 @@ def compute_aggregated_signatures(
         attestations : list[Attestation]
             Individual attestations to aggregate and sign.
         gossip_signatures : dict[SignatureKey, Signature] | None
            Per-validator XMSS signatures learned from the gossip network.
-        aggregated_payloads : dict[SignatureKey, list[AggregatedSignatureProof]] | None
-            Aggregated proofs learned from previously-seen blocks.
+        threshold_ratio : float
+            Minimum ratio of committee signature participation required to
+            produce an aggregation. Defaults to 0.0 (aggregate even if only
+            one signature is available).
 
         Returns:
         -------
-        tuple[list[AggregatedAttestation], list[AggregatedSignatureProof]]
-            Paired attestations and their corresponding proofs.
+        list[tuple[AggregatedAttestation, AggregatedSignatureProof]]
+            List of (attestation, proof) pairs from gossip collection.
         """
-        # Accumulator for (attestation, proof) pairs.
         results: list[tuple[AggregatedAttestation, AggregatedSignatureProof]] = []
 
         # Group individual attestations by data
@@ -790,8 +782,6 @@ def compute_aggregated_signatures(
             # Get the list of validators who attested to this data.
             validator_ids = aggregated.aggregation_bits.to_validator_indices()
 
-            # Phase 1: Gossip Collection
-            #
             # When a validator creates an attestation, it broadcasts the
             # individual XMSS signature over the gossip network. If we have
             # received these signatures, we can aggregate them ourselves.
@@ -803,16 +793,10 @@ def compute_aggregated_signatures(
             gossip_keys: list[PublicKey] = []
             gossip_ids: list[Uint64] = []
 
-            # Track validators we couldn't find signatures for.
-            #
-            # These will need to be covered by Phase 2 (existing proofs).
-            remaining: set[Uint64] = set()
-
             # Attempt to collect each validator's signature from gossip.
             #
             # Signatures are keyed by (validator ID, data root).
             # - If a signature exists, we add it to our collection.
-            # - Otherwise, we mark that validator as "remaining" for the fallback phase.
             if gossip_signatures:
                 for vid in validator_ids:
                     key = SignatureKey(vid, data_root)
                     if (sig := gossip_signatures.get(key)) is not None:
                         gossip_sigs.append(sig)
                         gossip_keys.append(self.validators[vid].get_pubkey())
                         gossip_ids.append(vid)
-                    else:
-                        # No signature available: mark for fallback coverage.
-                        remaining.add(vid)
-            else:
-                # No gossip data at all: all validators need fallback coverage.
-                remaining = set(validator_ids)
 
             # If we collected any gossip signatures, aggregate them into a proof.
             #
             # The aggregation combines multiple XMSS signatures into a single
             # compact proof that can verify all participants signed the message.
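+            # Illustrative arithmetic (values assumed for the example): with a
+            # committee of 100 validators and threshold_ratio = 0.9, the check
+            # below requires at least 90 collected gossip signatures before an
+            # aggregated proof is produced.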
             if gossip_ids:
+                # Check the participation threshold if required.
+                if threshold_ratio > 0.0:
+                    # Calculate the committee size for these validators' subnet.
+                    # We assume all validators in an aggregation group belong
+                    # to the same subnet.
+                    first_validator_id = gossip_ids[0]
+                    subnet_id = compute_subnet_id(
+                        first_validator_id, ATTESTATION_COMMITTEE_COUNT
+                    )
+
+                    # Count the total number of validators in this subnet.
+                    committee_size = compute_subnet_size(
+                        subnet_id,
+                        ATTESTATION_COMMITTEE_COUNT,
+                        len(self.validators),
+                    )
+
+                    if len(gossip_ids) < committee_size * threshold_ratio:
+                        continue
+
                 participants = AggregationBits.from_validator_indices(gossip_ids)
                 proof = AggregatedSignatureProof.aggregate(
                     participants=participants,
@@ -841,14 +836,53 @@ def compute_aggregated_signatures(
                     message=data_root,
                     epoch=data.slot,
                 )
-                results.append(
-                    (
-                        AggregatedAttestation(aggregation_bits=participants, data=data),
-                        proof,
-                    )
-                )
+                attestation = AggregatedAttestation(aggregation_bits=participants, data=data)
+                results.append((attestation, proof))
+
+        return results
+
+    def select_aggregated_proofs(
+        self,
+        attestations: list[Attestation],
+        aggregated_payloads: dict[SignatureKey, list[AggregatedSignatureProof]] | None = None,
+    ) -> tuple[list[AggregatedAttestation], list[AggregatedSignatureProof]]:
+        """
+        Select aggregated proofs for a set of attestations.
+
+        This method selects aggregated proofs from aggregated_payloads,
+        prioritizing proofs from the most recent blocks.
-            # Phase 2: Fallback to existing proofs
+
+        Strategy:
+        1. For each attestation group, aggregate as many signatures as possible
+           from the most recent block's proofs.
+        2. If remaining validators exist after step 1, include proofs from
+           previous blocks that cover them.
+
+        Parameters
+        ----------
+        attestations : list[Attestation]
+            Individual attestations to aggregate and sign.
+        aggregated_payloads : dict[SignatureKey, list[AggregatedSignatureProof]] | None
+            Aggregated proofs learned from previously-seen blocks.
+            The list for each key should be ordered with the most recent proofs first.
+
+        Returns:
+        -------
+        tuple[list[AggregatedAttestation], list[AggregatedSignatureProof]]
+            Paired attestations and their corresponding proofs.
+        """
+        results: list[tuple[AggregatedAttestation, AggregatedSignatureProof]] = []
+
+        # Group individual attestations by data.
+        for aggregated in AggregatedAttestation.aggregate_by_data(attestations):
+            data = aggregated.data
+            data_root = data.data_root_bytes()
+            # Validators that contributed to this attestation.
+            validator_ids = aggregated.aggregation_bits.to_validator_indices()
+
+            # Validators not yet covered by any selected proof; initially all of them.
+            remaining: set[Uint64] = set(validator_ids)
+
+            # Fall back to existing proofs.
             #
             # Some validators may not have broadcast their signatures over gossip,
             # but we might have seen proofs for them in previously-received blocks.
@@ -924,14 +958,10 @@ def compute_aggregated_signatures(
                 remaining -= covered
 
         # Final Assembly
-        #
-        # - We built a list of (attestation, proof) tuples.
-        # - Now we unzip them into two parallel lists for the return value.
-
-        # Handle the empty case explicitly.
         if not results:
             return [], []
 
         # Unzip the results into parallel lists.
         aggregated_attestations, aggregated_proofs = zip(*results, strict=True)
         return list(aggregated_attestations), list(aggregated_proofs)
diff --git a/src/lean_spec/subspecs/forkchoice/store.py b/src/lean_spec/subspecs/forkchoice/store.py
index 5625cf46..7adc1188 100644
--- a/src/lean_spec/subspecs/forkchoice/store.py
+++ b/src/lean_spec/subspecs/forkchoice/store.py
@@ -19,6 +19,8 @@
+    ATTESTATION_COMMITTEE_COUNT,
+    COMMITTEE_SIGNATURE_THRESHOLD_RATIO,
     JUSTIFICATION_LOOKBACK_SLOTS,
     SECONDS_PER_INTERVAL,
     SECONDS_PER_SLOT,
 )
 from lean_spec.subspecs.containers import (
     Attestation,
@@ -46,6 +48,10 @@
     is_proposer,
 )
 from lean_spec.types.container import Container
+from lean_spec.subspecs.networking import compute_subnet_id
+from lean_spec.subspecs.containers.attestation.attestation import SignedAggregatedAttestation
+from lean_spec.subspecs.xmss.aggregation import AggregationError
 
 
 class Store(Container):
@@ -143,9 +149,15 @@ class Store(Container):
     - Only stores the attestation data, not signatures.
     """
 
-    gossip_signatures: dict[SignatureKey, Signature] = {}
+    aggregated_in_current_slot: Boolean = Boolean(False)
     """
-    Per-validator XMSS signatures learned from gossip.
+    Tracks whether committee signatures have been successfully aggregated
+    in the current slot. Reset at the start of each slot (interval 0).
+    """
+
+    gossip_committee_signatures: dict[SignatureKey, Signature] = {}
+    """
+    Per-validator XMSS signatures learned from committee attesters.
 
     Keyed by SignatureKey(validator_id, attestation_data_root).
     """
@@ -269,6 +281,8 @@ def validate_attestation(self, attestation: Attestation) -> None:
     def on_gossip_attestation(
         self,
         signed_attestation: SignedAttestation,
+        is_aggregator: bool,
+        current_validator_id: Uint64,
         scheme: GeneralizedXmssScheme = TARGET_SIGNATURE_SCHEME,
     ) -> "Store":
         """
@@ -276,12 +290,15 @@ def on_gossip_attestation(
 
         This method:
         1. Verifies the XMSS signature
-        2. Stores the signature in the gossip signature map
+        2. If the current node is an aggregator and the attester belongs to
+           the current validator's subnet, stores the signature in the
+           committee signature map
         3. Processes the attestation data via on_attestation
 
         Args:
             signed_attestation: The signed attestation from gossip.
+            is_aggregator: True if the current validator holds the aggregator role.
+            current_validator_id: Index of the current validator processing this attestation.
             scheme: XMSS signature scheme for verification.
 
         Returns:
             New Store with attestation processed and signature stored.
@@ -313,16 +330,29 @@ def on_gossip_attestation(
             public_key, attestation_data.slot, attestation_data.data_root_bytes(), scheme
         ), "Signature verification failed"
 
-        # Store signature for later lookup during block building
-        new_gossip_sigs = dict(self.gossip_signatures)
-        sig_key = SignatureKey(validator_id, attestation_data.data_root_bytes())
-        new_gossip_sigs[sig_key] = signature
-
         # Process the attestation data
         store = self.on_attestation(attestation=attestation, is_from_block=False)
 
-        # Return store with updated signature map
-        return store.model_copy(update={"gossip_signatures": new_gossip_sigs})
+        current_validator_subnet = compute_subnet_id(
+            current_validator_id, ATTESTATION_COMMITTEE_COUNT
+        )
+        attester_subnet = compute_subnet_id(validator_id, ATTESTATION_COMMITTEE_COUNT)
+
+        # Store the signature for later aggregation if applicable.
+        new_committee_sigs = dict(self.gossip_committee_signatures)
+        if is_aggregator and current_validator_subnet == attester_subnet:
+            # This validator is an aggregator for the attester's subnet, so
+            # also store the signature in the committee signatures map.
+            sig_key = SignatureKey(validator_id, attestation_data.data_root_bytes())
+            new_committee_sigs[sig_key] = signature
+
+        # Apply the updated signature map before aggregating, so that the
+        # just-received signature is visible to the aggregation below.
+        store = store.model_copy(update={"gossip_committee_signatures": new_committee_sigs})
+
+        # If in interval 2 of the slot and not yet aggregated, try to aggregate.
+        current_interval = (self.time // SECONDS_PER_INTERVAL) % INTERVALS_PER_SLOT
+        if current_interval == 2 and not store.aggregated_in_current_slot:
+            store = store.aggregate_committee_signatures(
+                threshold_ratio=COMMITTEE_SIGNATURE_THRESHOLD_RATIO
+            )
+
+        return store
 
     def on_attestation(
         self,
@@ -454,9 +484,92 @@ def on_attestation(
         }
     )
 
+    def on_gossip_aggregated_attestation(
+        self, signed_attestation: SignedAggregatedAttestation
+    ) -> "Store":
+        """
+        Process a signed aggregated attestation received via the aggregation topic.
+
+        This method:
+        1. Verifies the aggregated attestation
+        2. Stores the aggregation in the aggregated_payloads map
+
+        Args:
+            signed_attestation: The signed aggregated attestation from a committee aggregator.
+
+        Returns:
+            New Store with the aggregation processed and stored.
+
+        Raises:
+            AssertionError: If the verifying state is unavailable, a participant
+                is missing from the state, or signature verification fails.
+        """
+        data = signed_attestation.data
+        proof = signed_attestation.proof
+
+        # Get the validator IDs that participated in this aggregation.
+        validator_ids = proof.participants.to_validator_indices()
+
+        # Retrieve the relevant state to look up public keys for verification.
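+        # Note: this lookup assumes the aggregation's target block has already
+        # been processed, so its state (and validator registry) is available
+        # in self.states for resolving participant public keys.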
+        key_state = self.states.get(data.target.root)
+        assert key_state is not None, (
+            f"No state available to verify committee aggregation for target "
+            f"{data.target.root.hex()}"
+        )
+
+        # Ensure all participants exist in the active set.
+        validators = key_state.validators
+        for validator_id in validator_ids:
+            assert validator_id < Uint64(len(validators)), (
+                f"Validator {validator_id} not found in state {data.target.root.hex()}"
+            )
+
+        # Prepare public keys for verification.
+        public_keys = [validators[vid].get_pubkey() for vid in validator_ids]
+
+        # Verify the leanVM aggregated proof.
+        try:
+            proof.verify(
+                public_keys=public_keys,
+                message=data.data_root_bytes(),
+                epoch=data.slot,
+            )
+        except AggregationError as exc:
+            raise AssertionError(
+                f"Committee aggregation signature verification failed: {exc}"
+            ) from exc
+
+        # Copy the aggregated proof map for updates.
+        # The lists must be deep-copied to keep previous store snapshots immutable.
+        new_aggregated_payloads = copy.deepcopy(self.aggregated_payloads)
+        data_root = data.data_root_bytes()
+
+        store = self
+        for vid in validator_ids:
+            # Store the proof so future block builders can reuse this aggregation.
+            key = SignatureKey(vid, data_root)
+            new_aggregated_payloads.setdefault(key, []).append(proof)
+
+            # Process the attestation data. Since it is from gossip, is_from_block=False.
+            # Note: we may have already processed individual attestations from this
+            # aggregation when they propagated on the attestation topic, but it is
+            # safe to re-process them here because on_attestation is idempotent.
+            store = store.on_attestation(
+                attestation=Attestation(validator_id=vid, data=data),
+                is_from_block=False,
+            )
+
+        # Return store with the updated aggregated payloads.
+        return store.model_copy(update={"aggregated_payloads": new_aggregated_payloads})
+
     def on_block(
         self,
         signed_block_with_attestation: SignedBlockWithAttestation,
+        current_validator: Uint64,
         scheme: GeneralizedXmssScheme = TARGET_SIGNATURE_SCHEME,
     ) -> "Store":
         """
@@ -492,6 +605,7 @@ def on_block(
 
         Args:
             signed_block_with_attestation: Complete signed block with proposer attestation.
+            current_validator: Index of the current validator processing this block.
             scheme: XMSS signature scheme to use for signature verification.
 
         Returns:
@@ -598,16 +712,21 @@ def on_block(
         # 1. NOT affect this block's fork choice position (processed as "new")
         # 2. Be available for inclusion in future blocks
         # 3. Influence fork choice only after interval 3 (end of slot)
-        #
-        # We also store the proposer's signature for potential future block building.
-        proposer_sig_key = SignatureKey(
-            proposer_attestation.validator_id,
-            proposer_attestation.data.data_root_bytes(),
-        )
-        new_gossip_sigs = dict(store.gossip_signatures)
-        new_gossip_sigs[proposer_sig_key] = (
-            signed_block_with_attestation.signature.proposer_signature
-        )
+        new_gossip_sigs = dict(store.gossip_committee_signatures)
+
+        # Store the proposer's signature for future lookup if the proposer
+        # belongs to the same committee as the current validator.
+        proposer_validator_id = proposer_attestation.validator_id
+        proposer_subnet_id = compute_subnet_id(
+            proposer_validator_id, ATTESTATION_COMMITTEE_COUNT
+        )
+        current_validator_subnet_id = compute_subnet_id(
+            current_validator, ATTESTATION_COMMITTEE_COUNT
+        )
+        if proposer_subnet_id == current_validator_subnet_id:
+            proposer_sig_key = SignatureKey(
+                proposer_attestation.validator_id,
+                proposer_attestation.data.data_root_bytes(),
+            )
+            new_gossip_sigs[proposer_sig_key] = (
+                signed_block_with_attestation.signature.proposer_signature
+            )
 
         store = store.on_attestation(
             attestation=proposer_attestation,
@@ -615,7 +734,7 @@ def on_block(
         )
 
         # Update store with proposer signature
-        store = store.model_copy(update={"gossip_signatures": new_gossip_sigs})
+        store = store.model_copy(update={"gossip_committee_signatures": new_gossip_sigs})
 
         return store
 
@@ -760,7 +879,7 @@ def accept_new_attestations(self) -> "Store":
         - Interval 0: Block proposal
         - Interval 1: Validators cast attestations (enter "new")
         - Interval 2: Safe target update
-        - Interval 3: Attestations accepted (move to "known")
+        - Interval 3: Process accumulated attestations
 
         This staged progression ensures proper timing and prevents premature
         influence on fork choice decisions.
@@ -814,7 +933,63 @@ def update_safe_target(self) -> "Store":
 
         return self.model_copy(update={"safe_target": safe_target})
 
-    def tick_interval(self, has_proposal: bool) -> "Store":
+    def aggregate_committee_signatures(self, threshold_ratio: float = 0.0) -> "Store":
+        """
+        Aggregate committee signatures collected from gossip.
+
+        This method aggregates signatures from the gossip_committee_signatures map.
+
+        Args:
+            threshold_ratio: Minimum participation ratio (0.0 to 1.0).
+                Aggregates only if signature count / committee size >= ratio.
+
+        Returns:
+            New Store with updated aggregated_payloads and aggregated_in_current_slot flag.
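+
+        Note:
+            Illustrative usage, mirroring tick_interval below: an aggregator
+            first calls this with threshold_ratio=0.9 at interval 2, then, if
+            nothing was aggregated, retries with threshold_ratio=0.0 at
+            interval 3.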
+        """
+        new_aggregated_payloads = dict(self.aggregated_payloads)
+
+        attestations = self.latest_new_attestations
+        committee_signatures = self.gossip_committee_signatures
+
+        # Perform the aggregation against the current head state.
+        head_state = self.states[self.head]
+        aggregated_results = head_state.aggregate_gossip_signatures(
+            attestations,
+            committee_signatures,
+            threshold_ratio=threshold_ratio,
+        )
+
+        for aggregated_attestation, aggregated_signature in aggregated_results:
+            # Note: implementations should broadcast this signed aggregation on
+            # the aggregation topic; the spec does not model the network here.
+            signed_aggregated_attestation = SignedAggregatedAttestation(
+                data=aggregated_attestation.data,
+                proof=aggregated_signature,
+            )
+
+            # Record the new aggregated payloads for future block building.
+            data_root = aggregated_attestation.data.data_root_bytes()
+            validator_ids = aggregated_signature.participants.to_validator_indices()
+            for vid in validator_ids:
+                sig_key = SignatureKey(vid, data_root)
+                new_aggregated_payloads.setdefault(sig_key, []).append(aggregated_signature)
+
+        # If we produced any aggregations, mark aggregation as done for this slot.
+        aggregated_flag = self.aggregated_in_current_slot
+        if aggregated_results:
+            aggregated_flag = Boolean(True)
+
+        return self.model_copy(
+            update={
+                "aggregated_payloads": new_aggregated_payloads,
+                "aggregated_in_current_slot": aggregated_flag,
+            }
+        )
+
+    def tick_interval(self, has_proposal: bool, is_aggregator: bool) -> "Store":
         """
         Advance store time by one interval and perform interval-specific actions.
 
@@ -833,21 +1008,20 @@ def tick_interval(self, has_proposal: bool) -> "Store":
         - If proposal exists, immediately accept new attestations
         - This ensures validators see the block before attesting
 
-        **Interval 1 (Validator Attesting)**:
-        - Validators create and gossip attestations
-        - No store action (waiting for attestations to arrive)
-
         **Interval 2 (Safe Target Update)**:
         - Compute safe target with 2/3+ majority
         - Provides validators with a stable attestation target
+        - Aggregators check for the configured participation threshold
+          (90% by default) before aggregating
 
         **Interval 3 (Attestation Acceptance)**:
         - Accept accumulated attestations (new → known)
         - Update head based on new attestation weights
         - Prepare for next slot
+        - Aggregators force aggregation if it has not happened yet
 
         Args:
             has_proposal: Whether a proposal exists for this interval.
+            is_aggregator: Whether the node is an aggregator.
 
         Returns:
             New Store with advanced time and interval-specific updates applied.
@@ -857,19 +1031,26 @@ def tick_interval(
         current_interval = store.time % SECONDS_PER_SLOT % INTERVALS_PER_SLOT
 
         if current_interval == Uint64(0):
-            # Start of slot - process attestations if proposal exists
+            # Start of slot - reset flags and process attestations if proposal exists
+            store = store.model_copy(update={"aggregated_in_current_slot": Boolean(False)})
             if has_proposal:
                 store = store.accept_new_attestations()
         elif current_interval == Uint64(2):
             # Mid-slot - update safe target for validators
             store = store.update_safe_target()
+            if is_aggregator:
+                # Wait for the configured ratio of signatures from subnet validators.
+                store = store.aggregate_committee_signatures(
+                    threshold_ratio=COMMITTEE_SIGNATURE_THRESHOLD_RATIO
+                )
         elif current_interval == Uint64(3):
-            # End of slot - accept accumulated attestations
+            # End of slot - finalize aggregation and accept attestations
+            if is_aggregator and not store.aggregated_in_current_slot:
+                # Aggregate regardless of signature count if not done before.
+                store = store.aggregate_committee_signatures(threshold_ratio=0.0)
             store = store.accept_new_attestations()
 
         return store
 
-    def on_tick(self, time: Uint64, has_proposal: bool) -> "Store":
+    def on_tick(self, time: Uint64, has_proposal: bool, is_aggregator: bool) -> "Store":
         """
         Advance forkchoice store time to given timestamp.
 
@@ -880,6 +1061,7 @@ def on_tick(
         Args:
             time: Target time in seconds since genesis.
             has_proposal: Whether node has proposal for current slot.
+            is_aggregator: Whether the node is an aggregator.
 
         Returns:
             New Store with time advanced and all interval actions performed.
@@ -899,7 +1081,7 @@ def on_tick(
         should_signal_proposal = has_proposal and (store.time + Uint64(1)) == tick_interval_time
 
         # Advance by one interval with appropriate signaling
-        store = store.tick_interval(should_signal_proposal)
+        store = store.tick_interval(should_signal_proposal, is_aggregator)
 
         return store
 
@@ -928,7 +1110,9 @@ def get_proposal_head(self, slot: Slot) -> tuple["Store", Bytes32]:
         slot_time = self.config.genesis_time + slot * SECONDS_PER_SLOT
 
         # Advance time to current slot (ticking intervals)
-        store = self.on_tick(slot_time, True)
+        # It is safe not to aggregate during this advancement: when proposing,
+        # it is already too late to aggregate committee signatures anyway.
+        store = self.on_tick(slot_time, True, is_aggregator=False)
 
         # Process any pending attestations before proposal
         store = store.accept_new_attestations()
@@ -1095,7 +1279,7 @@ def produce_block_with_signatures(
             parent_root=head_root,
             available_attestations=available_attestations,
             known_block_roots=set(store.blocks.keys()),
-            gossip_signatures=store.gossip_signatures,
+            gossip_signatures=store.gossip_committee_signatures,
             aggregated_payloads=store.aggregated_payloads,
         )
 
diff --git a/src/lean_spec/subspecs/networking/__init__.py b/src/lean_spec/subspecs/networking/__init__.py
index 086ad046..70c00424 100644
--- a/src/lean_spec/subspecs/networking/__init__.py
+++ b/src/lean_spec/subspecs/networking/__init__.py
@@ -34,6 +34,7 @@
 )
+from .subnet import compute_subnet_id
 from .transport import PeerId
 from .types import DomainType, ForkDigest, ProtocolId
 
 __all__ = [
     # Config
@@ -73,4 +74,5 @@
     "ForkDigest",
     "PeerId",
     "ProtocolId",
+    "compute_subnet_id",
 ]
diff --git a/src/lean_spec/subspecs/networking/gossipsub/topic.py b/src/lean_spec/subspecs/networking/gossipsub/topic.py
index 0bb2040b..b9faa8ef 100644
--- a/src/lean_spec/subspecs/networking/gossipsub/topic.py
+++ b/src/lean_spec/subspecs/networking/gossipsub/topic.py
@@ -87,6 +87,19 @@
 Used in the topic string to identify signed attestation messages.
 """
 
+ATTESTATION_SUBNET_TOPIC_NAME: str = "attestation_{subnet_id}"
+"""Template topic name for attestation subnet messages.
+
+Used in the topic string to identify attestation messages for a specific subnet.
+`{subnet_id}` should be replaced with the subnet identifier, in the range
+0 to ATTESTATION_COMMITTEE_COUNT - 1.
+"""
+
+AGGREGATED_ATTESTATION_TOPIC_NAME: str = "aggregation"
+"""Topic name for committee aggregation messages.
+
+Used in the topic string to identify a committee's aggregation messages.
+"""
+
 
 class TopicKind(Enum):
     """Gossip topic types.
@@ -103,6 +116,12 @@ class TopicKind(Enum):
     ATTESTATION = ATTESTATION_TOPIC_NAME
     """Signed attestation messages."""
 
+    ATTESTATION_SUBNET = ATTESTATION_SUBNET_TOPIC_NAME
+    """Attestation subnet messages."""
+
+    AGGREGATED_ATTESTATION = AGGREGATED_ATTESTATION_TOPIC_NAME
+    """Committee aggregated signature messages."""
+
     def __str__(self) -> str:
         """Return the topic name string."""
         return self.value
@@ -207,6 +226,18 @@ def attestation(cls, fork_digest: str) -> GossipTopic:
         """
         return cls(kind=TopicKind.ATTESTATION, fork_digest=fork_digest)
 
+    @classmethod
+    def committee_aggregation(cls, fork_digest: str) -> GossipTopic:
+        """Create a committee aggregation topic for the given fork.
+
+        Args:
+            fork_digest: Fork digest as 0x-prefixed hex string.
+
+        Returns:
+            GossipTopic for committee aggregation messages.
+        """
+        return cls(kind=TopicKind.AGGREGATED_ATTESTATION, fork_digest=fork_digest)
+
 
 def format_topic_string(
     topic_name: str,
diff --git a/src/lean_spec/subspecs/networking/subnet.py b/src/lean_spec/subspecs/networking/subnet.py
new file mode 100644
index 00000000..75b0f268
--- /dev/null
+++ b/src/lean_spec/subspecs/networking/subnet.py
@@ -0,0 +1,40 @@
+"""Subnet helpers for networking.
+
+Provides small utilities to compute a validator's attestation subnet id from
+its validator index and the number of committees, and the size of each subnet.
+"""
+from __future__ import annotations
+
+from lean_spec.types import Uint64
+
+
+def compute_subnet_id(validator_index: Uint64, num_committees: Uint64) -> Uint64:
+    """Compute the attestation subnet id for a validator.
+
+    Args:
+        validator_index: Non-negative validator index.
+        num_committees: Positive number of committees.
+
+    Returns:
+        An integer subnet id in 0..(num_committees - 1).
+    """
+    return validator_index % num_committees
+
+
+def compute_subnet_size(
+    subnet_id: Uint64, num_committees: Uint64, total_validators: Uint64
+) -> Uint64:
+    """Compute the size of a given subnet.
+
+    Args:
+        subnet_id: The subnet id to compute the size for.
+        num_committees: Positive number of committees.
+        total_validators: Total number of validators.
+
+    Returns:
+        The number of validators in the specified subnet.
+    """
+    base_size = total_validators // num_committees
+    remainder = total_validators % num_committees
+    if subnet_id < remainder:
+        return base_size + 1
+    return base_size
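+
+
+# Illustrative example (not normative): with 10 validators and 4 committees,
+# compute_subnet_id maps indices 0..9 to subnets 0, 1, 2, 3, 0, 1, 2, 3, 0, 1,
+# and compute_subnet_size yields sizes 3, 3, 2, 2 for subnets 0..3.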