diff --git a/src/lean_spec/subspecs/networking/client/event_source.py b/src/lean_spec/subspecs/networking/client/event_source.py index 563ca269..84d35a23 100644 --- a/src/lean_spec/subspecs/networking/client/event_source.py +++ b/src/lean_spec/subspecs/networking/client/event_source.py @@ -21,6 +21,7 @@ from lean_spec.subspecs.containers import SignedBlockWithAttestation from lean_spec.subspecs.containers.attestation import SignedAttestation +from lean_spec.subspecs.networking.config import GOSSIPSUB_DEFAULT_PROTOCOL_ID from lean_spec.subspecs.networking.gossipsub.topic import GossipTopic, TopicKind from lean_spec.subspecs.networking.reqresp.message import Status from lean_spec.subspecs.networking.service.events import ( @@ -36,6 +37,7 @@ ConnectionManager, YamuxConnection, ) +from lean_spec.subspecs.networking.varint import encode as encode_varint from .reqresp_client import ReqRespClient @@ -56,20 +58,7 @@ class LiveNetworkEventSource: - Accept incoming connections and emit PeerConnectedEvent - Dial outbound connections and emit PeerConnectedEvent - Exchange Status messages and emit PeerStatusEvent - - (Future) Handle gossip messages and emit GossipBlockEvent/GossipAttestationEvent - - Usage - ----- - :: - - event_source = LiveNetworkEventSource.create(connection_manager) - - # Dial bootnodes - await event_source.dial("/ip4/127.0.0.1/tcp/9000") - - # Consume events - async for event in event_source: - await handle_event(event) + - Publish locally-produced blocks and attestations to the gossip network """ connection_manager: ConnectionManager @@ -274,10 +263,6 @@ def stop(self) -> None: """Stop the event source.""" self._running = False - # ========================================================================= - # Gossip Message Handling (placeholder for future implementation) - # ========================================================================= - async def _emit_gossip_block( self, block: SignedBlockWithAttestation, @@ -309,3 +294,58 @@ async def _emit_gossip_attestation( await self._events.put( GossipAttestationEvent(attestation=attestation, peer_id=peer_id, topic=topic) ) + + async def publish(self, topic: str, data: bytes) -> None: + """ + Broadcast a message to all connected peers on a topic. + + Used by NetworkService to publish locally-produced blocks and + attestations to the gossip network. + + Args: + topic: Gossip topic string. + data: Compressed message bytes (SSZ + Snappy). + """ + if not self._connections: + logger.debug("No peers connected, cannot publish to %s", topic) + return + + for peer_id, conn in list(self._connections.items()): + try: + await self._send_gossip_message(conn, topic, data) + except Exception as e: + logger.warning("Failed to publish to peer %s: %s", peer_id, e) + + async def _send_gossip_message( + self, + conn: YamuxConnection, + topic: str, + data: bytes, + ) -> None: + """ + Send a gossip message to a peer. + + Opens a new stream for the gossip message and sends the data. + + Args: + conn: Connection to the peer. + topic: Topic string for the message. + data: Message bytes to send. + """ + # Open a new outbound stream for gossip protocol. + stream = await conn.open_stream(GOSSIPSUB_DEFAULT_PROTOCOL_ID) + + try: + # Format: topic length (varint) + topic + data length (varint) + data + topic_bytes = topic.encode("utf-8") + + # Write topic length and topic. + await stream.write(encode_varint(len(topic_bytes))) + await stream.write(topic_bytes) + + # Write data length and data. 
+ await stream.write(encode_varint(len(data))) + await stream.write(data) + + finally: + await stream.close() diff --git a/src/lean_spec/subspecs/networking/config.py b/src/lean_spec/subspecs/networking/config.py index 85c55129..a5b3fd8a 100644 --- a/src/lean_spec/subspecs/networking/config.py +++ b/src/lean_spec/subspecs/networking/config.py @@ -2,7 +2,7 @@ from typing_extensions import Final -from lean_spec.types.byte_arrays import Bytes4 +from lean_spec.types.byte_arrays import Bytes1 from .types import DomainType @@ -32,8 +32,36 @@ # --- Gossip Message Domains --- -MESSAGE_DOMAIN_INVALID_SNAPPY: Final[DomainType] = Bytes4(b"\x00\x00\x00\x00") -"""4-byte domain for gossip message-id isolation of invalid snappy messages.""" +MESSAGE_DOMAIN_INVALID_SNAPPY: Final[DomainType] = Bytes1(b"\x00") +"""1-byte domain for gossip message-id isolation of invalid snappy messages. -MESSAGE_DOMAIN_VALID_SNAPPY: Final[DomainType] = Bytes4(b"\x01\x00\x00\x00") -"""4-byte domain for gossip message-id isolation of valid snappy messages.""" +Per Ethereum spec, prepended to the message hash when decompression fails. +""" + +MESSAGE_DOMAIN_VALID_SNAPPY: Final[DomainType] = Bytes1(b"\x01") +"""1-byte domain for gossip message-id isolation of valid snappy messages. + +Per Ethereum spec, prepended to the message hash when decompression succeeds. +""" + +# --- Gossipsub Protocol IDs --- + +GOSSIPSUB_PROTOCOL_ID_V10: Final[str] = "/meshsub/1.0.0" +"""Gossipsub v1.0 protocol ID - basic mesh pubsub.""" + +GOSSIPSUB_PROTOCOL_ID_V11: Final[str] = "/meshsub/1.1.0" +"""Gossipsub v1.1 protocol ID - peer scoring, extended validators. + +This is the minimum version required by the Ethereum consensus spec. +""" + +GOSSIPSUB_PROTOCOL_ID_V12: Final[str] = "/meshsub/1.2.0" +"""Gossipsub v1.2 protocol ID - IDONTWANT bandwidth optimization.""" + +GOSSIPSUB_DEFAULT_PROTOCOL_ID: Final[str] = GOSSIPSUB_PROTOCOL_ID_V11 +""" +Default protocol ID per Ethereum consensus spec requirements. + +The Ethereum consensus P2P spec states: +"Clients MUST support the gossipsub v1 libp2p Protocol including the gossipsub v1.1 extension." +""" diff --git a/src/lean_spec/subspecs/networking/enr/enr.py b/src/lean_spec/subspecs/networking/enr/enr.py index a8377ff4..b302fd38 100644 --- a/src/lean_spec/subspecs/networking/enr/enr.py +++ b/src/lean_spec/subspecs/networking/enr/enr.py @@ -49,15 +49,24 @@ - EIP-778: https://eips.ethereum.org/EIPS/eip-778 """ +from __future__ import annotations + +import base64 from typing import ClassVar +from typing_extensions import Self + from lean_spec.subspecs.networking.types import Multiaddr, NodeId, SeqNumber -from lean_spec.types import StrictBaseModel +from lean_spec.types import RLPDecodingError, StrictBaseModel, Uint64 +from lean_spec.types.rlp import decode_list as rlp_decode_list from . import keys from .eth2 import AttestationSubnets, Eth2Data from .keys import EnrKey +ENR_PREFIX = "enr:" +"""Text prefix for ENR strings.""" + class ENR(StrictBaseModel): r""" @@ -218,3 +227,68 @@ def __str__(self) -> str: if eth2 := self.eth2_data: parts.append(f"fork={eth2.fork_digest.hex()}") return ", ".join(parts) + ")" + + @classmethod + def from_string(cls, enr_text: str) -> Self: + """ + Parse an ENR from its text representation. + + Text format is URL-safe base64 with `enr:` prefix. + + Args: + enr_text: ENR string (e.g., "enr:-IS4Q...") + + Returns: + Parsed ENR instance. + + Raises: + ValueError: If the string is malformed or RLP decoding fails. 
+ """ + if not enr_text.startswith(ENR_PREFIX): + raise ValueError(f"ENR must start with '{ENR_PREFIX}'") + + # Extract base64url content after prefix. + b64_content = enr_text[len(ENR_PREFIX) :] + + # Base64url decode (add padding if needed). + # + # Python's base64.urlsafe_b64decode requires proper padding. + padding = 4 - (len(b64_content) % 4) + if padding != 4: + b64_content += "=" * padding + + try: + rlp_data = base64.urlsafe_b64decode(b64_content) + except Exception as e: + raise ValueError(f"Invalid base64 encoding: {e}") from e + + # RLP decode: [signature, seq, k1, v1, k2, v2, ...] + try: + items = rlp_decode_list(rlp_data) + except RLPDecodingError as e: + raise ValueError(f"Invalid RLP encoding: {e}") from e + + if len(items) < 2: + raise ValueError("ENR must have at least signature and seq") + + if len(items) % 2 != 0: + raise ValueError("ENR key/value pairs must be even") + + signature = items[0] + seq_bytes = items[1] + seq = int.from_bytes(seq_bytes, "big") if seq_bytes else 0 + + # Parse key/value pairs. + # + # Keys are strings, values are arbitrary bytes. + pairs: dict[str, bytes] = {} + for i in range(2, len(items), 2): + key = items[i].decode("utf-8") + value = items[i + 1] + pairs[key] = value + + return cls( + signature=signature, + seq=Uint64(seq), + pairs=pairs, + ) diff --git a/src/lean_spec/subspecs/networking/gossipsub/parameters.py b/src/lean_spec/subspecs/networking/gossipsub/parameters.py index 96f49b80..641848c5 100644 --- a/src/lean_spec/subspecs/networking/gossipsub/parameters.py +++ b/src/lean_spec/subspecs/networking/gossipsub/parameters.py @@ -60,6 +60,7 @@ from __future__ import annotations from lean_spec.subspecs.chain.config import DEVNET_CONFIG +from lean_spec.subspecs.networking.config import GOSSIPSUB_DEFAULT_PROTOCOL_ID from lean_spec.types import StrictBaseModel @@ -71,7 +72,7 @@ class GossipsubParameters(StrictBaseModel): Default values follow the Ethereum consensus P2P specification. """ - protocol_id: str = "/meshsub/1.3.0" + protocol_id: str = GOSSIPSUB_DEFAULT_PROTOCOL_ID """The protocol ID for gossip messages.""" # ------------------------------------------------------------------------- diff --git a/src/lean_spec/subspecs/networking/service/events.py b/src/lean_spec/subspecs/networking/service/events.py index 347e3fe9..91b6334b 100644 --- a/src/lean_spec/subspecs/networking/service/events.py +++ b/src/lean_spec/subspecs/networking/service/events.py @@ -128,7 +128,8 @@ class NetworkEventSource(Protocol): Abstract source of network events. This protocol defines the interface that network implementations must - provide. It is an async iterator that yields NetworkEvent objects. + provide. It is an async iterator that yields NetworkEvent objects and + supports publishing outbound messages. Any class that implements async iteration over NetworkEvent can serve as a source. @@ -161,3 +162,15 @@ async def __anext__(self) -> NetworkEvent: StopAsyncIteration: When no more events will arrive. """ ... + + async def publish(self, topic: str, data: bytes) -> None: + """ + Publish a message to all connected peers on a topic. + + Used to broadcast locally-produced blocks and attestations. + + Args: + topic: Gossip topic string. + data: Message bytes to publish. + """ + ... 
diff --git a/src/lean_spec/subspecs/networking/service/service.py b/src/lean_spec/subspecs/networking/service/service.py index af7022d8..26244ea5 100644 --- a/src/lean_spec/subspecs/networking/service/service.py +++ b/src/lean_spec/subspecs/networking/service/service.py @@ -11,7 +11,8 @@ 1. Consumes events from an abstract source (async iterator) 2. Routes each event to the appropriate sync handler -3. Runs until stopped or the source exhausts +3. Publishes locally-produced blocks and attestations to peers +4. Runs until stopped or the source exhausts This means: - The network layer produces events, @@ -21,9 +22,15 @@ from __future__ import annotations +import logging from dataclasses import dataclass, field from typing import TYPE_CHECKING +from lean_spec.snappy import frame_compress +from lean_spec.subspecs.containers import SignedBlockWithAttestation +from lean_spec.subspecs.containers.attestation import SignedAttestation +from lean_spec.subspecs.networking.gossipsub.topic import GossipTopic + from .events import ( GossipAttestationEvent, GossipBlockEvent, @@ -37,20 +44,24 @@ if TYPE_CHECKING: from lean_spec.subspecs.sync import SyncService +logger = logging.getLogger(__name__) + @dataclass(slots=True) class NetworkService: """ - Routes network events to the sync service. + Routes network events to the sync service and publishes outbound messages. + + This service: - This service is intentionally minimal. It does not: + - Routes inbound events to sync handlers + - Publishes locally-produced blocks and attestations to the network + + It does not: - Manage connections (libp2p handles this) - Score peers (libp2p gossipsub handles this) - Buffer events (async iteration provides backpressure) - - Produce outbound messages (validators need this, not sync) - - It only routes inbound events to the appropriate handlers. """ sync_service: SyncService @@ -59,6 +70,9 @@ class NetworkService: event_source: NetworkEventSource """Source of network events (libp2p wrapper or test mock).""" + fork_digest: str = field(default="0x00000000") + """Fork digest for gossip topics (4-byte hex string).""" + _running: bool = field(default=False, repr=False) """Whether the event loop is running.""" @@ -162,3 +176,37 @@ def is_running(self) -> bool: def events_processed(self) -> int: """Total events processed since creation.""" return self._events_processed + + async def publish_block(self, block: SignedBlockWithAttestation) -> None: + """ + Publish a block to the gossip network. + + Encodes the block as SSZ, compresses with Snappy, and broadcasts + to all connected peers on the block topic. + + Args: + block: Signed block to publish. + """ + topic = GossipTopic.block(self.fork_digest) + ssz_bytes = block.encode_bytes() + compressed = frame_compress(ssz_bytes) + + await self.event_source.publish(str(topic), compressed) + logger.debug("Published block at slot %s", block.message.block.slot) + + async def publish_attestation(self, attestation: SignedAttestation) -> None: + """ + Publish an attestation to the gossip network. + + Encodes the attestation as SSZ, compresses with Snappy, and broadcasts + to all connected peers on the attestation topic. + + Args: + attestation: Signed attestation to publish. 
+ """ + topic = GossipTopic.attestation(self.fork_digest) + ssz_bytes = attestation.encode_bytes() + compressed = frame_compress(ssz_bytes) + + await self.event_source.publish(str(topic), compressed) + logger.debug("Published attestation for slot %s", attestation.message.slot) diff --git a/src/lean_spec/subspecs/networking/types.py b/src/lean_spec/subspecs/networking/types.py index 62f29693..98c59334 100644 --- a/src/lean_spec/subspecs/networking/types.py +++ b/src/lean_spec/subspecs/networking/types.py @@ -3,10 +3,16 @@ from enum import IntEnum, auto from lean_spec.types import Uint64 -from lean_spec.types.byte_arrays import Bytes4, Bytes32 +from lean_spec.types.byte_arrays import Bytes1, Bytes4, Bytes32 -DomainType = Bytes4 -"""4-byte domain for message-id isolation in Gossipsub.""" +DomainType = Bytes1 +"""1-byte domain for message-id isolation in Gossipsub. + +The domain is a single byte prepended to the message hash to compute the gossip message ID. + +- Valid messages use 0x01, +- Invalid messages use 0x00. +""" NodeId = Bytes32 """32-byte node identifier for Discovery v5, derived from ``keccak256(pubkey)``.""" diff --git a/src/lean_spec/subspecs/node/node.py b/src/lean_spec/subspecs/node/node.py index 978cc9ef..37adf9fe 100644 --- a/src/lean_spec/subspecs/node/node.py +++ b/src/lean_spec/subspecs/node/node.py @@ -214,12 +214,16 @@ def from_genesis(cls, config: NodeConfig) -> Node: # # Validators need keys to sign blocks and attestations. # Without a registry, the node runs in passive mode. + # + # Wire callbacks to publish produced blocks/attestations to the network. validator_service: ValidatorService | None = None if config.validator_registry is not None: validator_service = ValidatorService( sync_service=sync_service, clock=clock, registry=config.validator_registry, + on_block=network_service.publish_block, + on_attestation=network_service.publish_attestation, ) return cls( diff --git a/src/lean_spec/types/__init__.py b/src/lean_spec/types/__init__.py index c6cebe57..c179e04c 100644 --- a/src/lean_spec/types/__init__.py +++ b/src/lean_spec/types/__init__.py @@ -13,6 +13,9 @@ SSZTypeError, SSZValueError, ) +from .rlp import RLPDecodingError, RLPItem +from .rlp import decode as rlp_decode +from .rlp import encode as rlp_encode from .ssz_base import SSZType from .uint import Uint64 from .validator import is_proposer @@ -37,6 +40,11 @@ "SSZType", "Boolean", "Container", + # RLP encoding/decoding + "rlp_encode", + "rlp_decode", + "RLPItem", + "RLPDecodingError", # Exceptions "SSZError", "SSZTypeError", diff --git a/src/lean_spec/types/rlp.py b/src/lean_spec/types/rlp.py new file mode 100644 index 00000000..b278cd15 --- /dev/null +++ b/src/lean_spec/types/rlp.py @@ -0,0 +1,301 @@ +""" +Recursive Length Prefix (RLP) Encoding +====================================== + +RLP is Ethereum's serialization format for arbitrary nested binary data. +It is used for encoding transactions, blocks, ENR records, and more. + +Encoding Rules +-------------- + +RLP encodes two types of items: + +1. **Byte strings** (including empty string) +2. 
**Lists** of items (including empty list) + +Byte ranges determine the encoding: + ++-------------+-----------------------------------------------------------+ +| Prefix | Meaning | ++=============+===========================================================+ +| [0x00-0x7f] | Single byte, value is the byte itself | ++-------------+-----------------------------------------------------------+ +| [0x80-0xb7] | Short string (0-55 bytes), length = prefix - 0x80 | ++-------------+-----------------------------------------------------------+ +| [0xb8-0xbf] | Long string (>55 bytes), prefix - 0xb7 = length of length | ++-------------+-----------------------------------------------------------+ +| [0xc0-0xf7] | Short list (0-55 bytes payload), length = prefix - 0xc0 | ++-------------+-----------------------------------------------------------+ +| [0xf8-0xff] | Long list (>55 bytes payload), prefix - 0xf7 = len of len | ++-------------+-----------------------------------------------------------+ + +References: +---------- +- Ethereum Yellow Paper, Appendix B +- https://ethereum.org/en/developers/docs/data-structures-and-encoding/rlp/ +- https://github.com/ethereum/pyrlp +""" + +from __future__ import annotations + +from typing import TypeAlias + +RLPItem: TypeAlias = bytes | list["RLPItem"] +""" +RLP-encodable item. + +Either: +- bytes (a byte string) +- list of RLP items (recursive) +""" + + +SINGLE_BYTE_MAX = 0x7F +"""Boundary between single-byte encoding [0x00-0x7f] and string prefix.""" + +SHORT_STRING_PREFIX = 0x80 +"""Prefix for short strings (0-55 bytes). Final prefix = 0x80 + length.""" + +SHORT_STRING_MAX_LEN = 55 +"""Maximum string length for short encoding.""" + +LONG_STRING_BASE = 0xB7 +"""Base for long string prefix. Final prefix = 0xb7 + length_of_length.""" + +SHORT_LIST_PREFIX = 0xC0 +"""Prefix for short lists (0-55 bytes payload). Final prefix = 0xc0 + length.""" + +SHORT_LIST_MAX_LEN = 55 +"""Maximum list payload length for short encoding.""" + +LONG_LIST_BASE = 0xF7 +"""Base for long list prefix. Final prefix = 0xf7 + length_of_length.""" + + +def encode(item: RLPItem) -> bytes: + """ + Encode an item using RLP. + + Args: + item: Bytes or nested list of bytes to encode. + + Returns: + RLP-encoded bytes. + + Raises: + TypeError: If item is not bytes or list. + """ + if isinstance(item, bytes): + return _encode_bytes(item) + if isinstance(item, list): + return _encode_list(item) + raise TypeError(f"Cannot RLP encode type: {type(item).__name__}") + + +def _encode_bytes(data: bytes) -> bytes: + """ + Encode a byte string. + + Single bytes in [0x00, 0x7f] encode as themselves. + Short strings (0-55 bytes) use prefix 0x80 + length. + Long strings (>55 bytes) use prefix 0xb7 + length-of-length, then length. + """ + length = len(data) + + # Single byte encoding: values 0x00-0x7f encode as themselves. + if length == 1 and data[0] <= SINGLE_BYTE_MAX: + return data + + # Short string: 0-55 bytes. + if length <= SHORT_STRING_MAX_LEN: + return bytes([SHORT_STRING_PREFIX + length]) + data + + # Long string: >55 bytes. + length_bytes = _encode_length(length) + return bytes([LONG_STRING_BASE + len(length_bytes)]) + length_bytes + data + + +def _encode_list(items: list[RLPItem]) -> bytes: + """ + Encode a list of items. + + Recursively encodes each item, concatenates, then adds list prefix. + Short lists (0-55 bytes payload) use prefix 0xc0 + length. + Long lists (>55 bytes payload) use prefix 0xf7 + length-of-length, then length. + """ + # Recursively encode all items. 
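+    # For example, [b"dog", b"cat"] encodes each item to 0x83 + 3 bytes,
+    # giving an 8-byte payload that is then prefixed with 0xc8 (0xc0 + 8).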
+    payload = b"".join(encode(item) for item in items)
+    length = len(payload)
+
+    # Short list: 0-55 bytes payload.
+    if length <= SHORT_LIST_MAX_LEN:
+        return bytes([SHORT_LIST_PREFIX + length]) + payload
+
+    # Long list: >55 bytes payload.
+    length_bytes = _encode_length(length)
+    return bytes([LONG_LIST_BASE + len(length_bytes)]) + length_bytes + payload
+
+
+def _encode_length(value: int) -> bytes:
+    """
+    Encode length as minimal big-endian bytes.
+
+    Used for long string/list length encoding where length > 55.
+    Returns minimal representation with no leading zeros.
+    """
+    if value == 0:
+        # Defensive: should never be called with 0 for valid long encodings.
+        return b""
+    return value.to_bytes((value.bit_length() + 7) // 8, "big")
+
+
+class RLPDecodingError(Exception):
+    """Error during RLP decoding."""
+
+
+def decode(data: bytes) -> RLPItem:
+    """
+    Decode RLP-encoded bytes.
+
+    Args:
+        data: RLP-encoded bytes.
+
+    Returns:
+        Decoded item (bytes or nested list).
+
+    Raises:
+        RLPDecodingError: If data is malformed.
+    """
+    if len(data) == 0:
+        raise RLPDecodingError("Empty RLP data")
+
+    item, consumed = _decode_item(data, 0)
+
+    if consumed != len(data):
+        raise RLPDecodingError(f"Trailing data: decoded {consumed} of {len(data)} bytes")
+
+    return item
+
+
+def decode_list(data: bytes) -> list[bytes]:
+    """
+    Decode RLP data as a flat list of byte items.
+
+    This is a convenience function for cases like ENR where
+    we expect a flat list of byte strings (no nested lists).
+
+    Args:
+        data: RLP-encoded bytes.
+
+    Returns:
+        List of decoded byte strings.
+
+    Raises:
+        RLPDecodingError: If data is not a list or contains nested lists.
+    """
+    item = decode(data)
+
+    if not isinstance(item, list):
+        raise RLPDecodingError("Expected RLP list")
+
+    result: list[bytes] = []
+    for i, elem in enumerate(item):
+        if not isinstance(elem, bytes):
+            raise RLPDecodingError(f"Element {i} is not bytes")
+        result.append(elem)
+
+    return result
+
+
+def _decode_item(data: bytes, offset: int) -> tuple[RLPItem, int]:
+    """
+    Decode a single RLP item starting at offset.
+
+    Returns (decoded_item, end_offset), where end_offset is the absolute
+    offset in ``data`` just past the decoded item.
+    """
+    if offset >= len(data):
+        raise RLPDecodingError("Unexpected end of data")
+
+    prefix = data[offset]
+
+    # Single byte: 0x00-0x7f.
+    if prefix <= SINGLE_BYTE_MAX:
+        return data[offset : offset + 1], offset + 1
+
+    # Short string: 0x80-0xb7.
+    if prefix <= LONG_STRING_BASE:
+        length = prefix - SHORT_STRING_PREFIX
+        start = offset + 1
+        end = start + length
+        _check_bounds(data, end)
+        return data[start:end], end
+
+    # Long string: 0xb8-0xbf.
+    if prefix < SHORT_LIST_PREFIX:
+        len_of_len = prefix - LONG_STRING_BASE
+        start = offset + 1
+        _check_bounds(data, start + len_of_len)
+
+        # Validate: no leading zeros in length encoding.
+        if len_of_len > 1 and data[start] == 0:
+            raise RLPDecodingError("Non-canonical: leading zeros in length encoding")
+
+        length = int.from_bytes(data[start : start + len_of_len], "big")
+
+        # Validate: length must require this many bytes.
+        if length <= SHORT_STRING_MAX_LEN:
+            raise RLPDecodingError("Non-canonical: long string encoding for short string")
+
+        payload_start = start + len_of_len
+        payload_end = payload_start + length
+        _check_bounds(data, payload_end)
+        return data[payload_start:payload_end], payload_end
+
+    # Short list: 0xc0-0xf7.
+    if prefix <= LONG_LIST_BASE:
+        length = prefix - SHORT_LIST_PREFIX
+        start = offset + 1
+        end = start + length
+        _check_bounds(data, end)
+        return _decode_list_payload(data, start, end), end
+
+    # Long list: 0xf8-0xff.
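+    # For example, prefix 0xf9 (0xf7 + 2) means the next 2 bytes hold the
+    # big-endian payload length, followed by the list payload itself.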
+ len_of_len = prefix - LONG_LIST_BASE + start = offset + 1 + _check_bounds(data, start + len_of_len) + + # Validate: no leading zeros in length encoding. + if len_of_len > 1 and data[start] == 0: + raise RLPDecodingError("Non-canonical: leading zeros in length encoding") + + length = int.from_bytes(data[start : start + len_of_len], "big") + + # Validate: length must require this many bytes. + if length <= SHORT_LIST_MAX_LEN: + raise RLPDecodingError("Non-canonical: long list encoding for short list") + + payload_start = start + len_of_len + payload_end = payload_start + length + _check_bounds(data, payload_end) + return _decode_list_payload(data, payload_start, payload_end), payload_end + + +def _decode_list_payload(data: bytes, start: int, end: int) -> list[RLPItem]: + """Decode list payload between start and end offsets.""" + items: list[RLPItem] = [] + offset = start + + while offset < end: + item, offset = _decode_item(data, offset) + items.append(item) + + if offset != end: + raise RLPDecodingError("List payload length mismatch") + + return items + + +def _check_bounds(data: bytes, end: int) -> None: + """Verify end offset is within data bounds.""" + if end > len(data): + raise RLPDecodingError(f"Data too short: need {end}, have {len(data)}") diff --git a/tests/lean_spec/subspecs/networking/enr/__init__.py b/tests/lean_spec/subspecs/networking/enr/__init__.py new file mode 100644 index 00000000..5f5bb545 --- /dev/null +++ b/tests/lean_spec/subspecs/networking/enr/__init__.py @@ -0,0 +1 @@ +"""Tests for ENR (Ethereum Node Record) module.""" diff --git a/tests/lean_spec/subspecs/networking/enr/test_enr.py b/tests/lean_spec/subspecs/networking/enr/test_enr.py new file mode 100644 index 00000000..bd3f4ae1 --- /dev/null +++ b/tests/lean_spec/subspecs/networking/enr/test_enr.py @@ -0,0 +1,795 @@ +"""Tests for the Ethereum Node Record (ENR) module (EIP-778). + +This module tests ENR parsing, validation, and property accessors using the +official EIP-778 test vector and additional edge cases. + +References: +- EIP-778: https://eips.ethereum.org/EIPS/eip-778 +""" + +from __future__ import annotations + +import pytest + +from lean_spec.subspecs.networking.enr import ENR, keys +from lean_spec.subspecs.networking.enr.enr import ENR_PREFIX +from lean_spec.types import Uint64 + +# ============================================================================= +# Official EIP-778 Test Vector +# ============================================================================= +# From: https://eips.ethereum.org/EIPS/eip-778 +# +# Node ID: a448f24c6d18e575453db13171562b71999873db5b286df957af199ec94617f7 +# Sequence: 1 +# IPv4: 127.0.0.1 +# UDP port: 30303 +# Identity scheme: "v4" +# Compressed secp256k1 public key: +# 03ca634cae0d49acb401d8a4c6b6fe8c55b70d115bf400769cc1400f3258cd3138 + +OFFICIAL_ENR_STRING = ( + "enr:-IS4QHCYrYZbAKWCBRlAy5zzaDZXJBGkcnh4MHcBFZntXNFrdvJjX04jRzjz" + "CBOO" + "nrkTfj499SZuOh8R33Ls8RRcy5wBgmlkgnY0gmlwhH8AAAGJc2VjcDI1NmsxoQ" + "PKY0" + "yuDUmstAHYpMa2_oxVtw0RW_QAdpzBQA8yWM0xOIN1ZHCCdl8" +) +# Note: The ENR above has the base64 split across lines for readability, +# we need to join it. 
+OFFICIAL_ENR_STRING = ( + "enr:-IS4QHCYrYZbAKWCBRlAy5zzaDZXJBGkcnh4MHcBFZntXNFrdvJjX04jRzjz" + "CBOOnrkTfj499SZuOh8R33Ls8RRcy5wBgmlkgnY0gmlwhH8AAAGJc2VjcDI1NmsxoQ" + "PKY0yuDUmstAHYpMa2_oxVtw0RW_QAdpzBQA8yWM0xOIN1ZHCCdl8" +) + +OFFICIAL_NODE_ID = "a448f24c6d18e575453db13171562b71999873db5b286df957af199ec94617f7" +OFFICIAL_SEQ = 1 +OFFICIAL_IPV4 = "127.0.0.1" +OFFICIAL_UDP_PORT = 30303 +OFFICIAL_IDENTITY_SCHEME = "v4" +OFFICIAL_SECP256K1_PUBKEY = bytes.fromhex( + "03ca634cae0d49acb401d8a4c6b6fe8c55b70d115bf400769cc1400f3258cd3138" +) +OFFICIAL_SIGNATURE = bytes.fromhex( + "7098ad865b00a582051940cb9cf36836572411a47278783077011599ed5cd16b" + "76f2635f4e234738f308138e9eb9137e3e3df5266e3a1f11df72ecf1145ccb9c" +) + + +class TestOfficialEIP778Vector: + """Tests using the official EIP-778 test vector.""" + + def test_parse_official_enr_string(self) -> None: + """Official ENR string parses successfully.""" + enr = ENR.from_string(OFFICIAL_ENR_STRING) + assert enr is not None + + def test_official_enr_sequence_number(self) -> None: + """Official ENR has sequence number 1.""" + enr = ENR.from_string(OFFICIAL_ENR_STRING) + assert enr.seq == Uint64(OFFICIAL_SEQ) + + def test_official_enr_identity_scheme(self) -> None: + """Official ENR uses "v4" identity scheme.""" + enr = ENR.from_string(OFFICIAL_ENR_STRING) + assert enr.identity_scheme == OFFICIAL_IDENTITY_SCHEME + + def test_official_enr_ipv4_address(self) -> None: + """Official ENR has IPv4 address 127.0.0.1.""" + enr = ENR.from_string(OFFICIAL_ENR_STRING) + assert enr.ip4 == OFFICIAL_IPV4 + + def test_official_enr_udp_port(self) -> None: + """Official ENR has UDP port 30303.""" + enr = ENR.from_string(OFFICIAL_ENR_STRING) + assert enr.udp_port == OFFICIAL_UDP_PORT + + def test_official_enr_public_key(self) -> None: + """Official ENR has 33-byte compressed secp256k1 public key.""" + enr = ENR.from_string(OFFICIAL_ENR_STRING) + assert enr.public_key is not None + assert len(enr.public_key) == 33 + assert enr.public_key == OFFICIAL_SECP256K1_PUBKEY + + def test_official_enr_signature_length(self) -> None: + """Official ENR has 64-byte signature.""" + enr = ENR.from_string(OFFICIAL_ENR_STRING) + assert len(enr.signature) == 64 + + def test_official_enr_signature_value(self) -> None: + """Official ENR signature matches expected value.""" + enr = ENR.from_string(OFFICIAL_ENR_STRING) + assert enr.signature == OFFICIAL_SIGNATURE + + def test_official_enr_is_valid(self) -> None: + """Official ENR passes structural validation.""" + enr = ENR.from_string(OFFICIAL_ENR_STRING) + assert enr.is_valid() + + def test_official_enr_no_tcp_port(self) -> None: + """Official ENR does not have TCP port.""" + enr = ENR.from_string(OFFICIAL_ENR_STRING) + assert enr.tcp_port is None + + def test_official_enr_no_ipv6(self) -> None: + """Official ENR does not have IPv6 address.""" + enr = ENR.from_string(OFFICIAL_ENR_STRING) + assert enr.ip6 is None + + def test_official_enr_no_multiaddr(self) -> None: + """Official ENR has no multiaddr (no TCP port).""" + enr = ENR.from_string(OFFICIAL_ENR_STRING) + assert enr.multiaddr() is None + + +class TestTextFormatValidation: + """Tests for ENR text format parsing and validation.""" + + def test_prefix_required(self) -> None: + """ENR must start with 'enr:' prefix.""" + # Remove prefix from valid ENR + invalid = OFFICIAL_ENR_STRING[len(ENR_PREFIX) :] + with pytest.raises(ValueError, match=r"must start with 'enr:'"): + ENR.from_string(invalid) + + def test_wrong_prefix_rejected(self) -> None: + """ENR with wrong prefix is 
rejected.""" + invalid = "eth:" + OFFICIAL_ENR_STRING[len(ENR_PREFIX) :] + with pytest.raises(ValueError, match=r"must start with 'enr:'"): + ENR.from_string(invalid) + + def test_empty_string_rejected(self) -> None: + """Empty string is rejected.""" + with pytest.raises(ValueError, match=r"must start with 'enr:'"): + ENR.from_string("") + + def test_prefix_only_rejected(self) -> None: + """Prefix only without data is rejected.""" + with pytest.raises(ValueError): + ENR.from_string("enr:") + + def test_invalid_base64_rejected(self) -> None: + """Invalid base64 encoding is rejected.""" + invalid = "enr:!!!invalid-base64!!!" + with pytest.raises(ValueError, match=r"Invalid base64"): + ENR.from_string(invalid) + + def test_base64url_without_padding(self) -> None: + """Base64url without padding is handled correctly.""" + # The official ENR string has no padding and should parse fine + enr = ENR.from_string(OFFICIAL_ENR_STRING) + assert enr is not None + + def test_case_sensitive_prefix(self) -> None: + """Prefix is case-sensitive (ENR: is invalid).""" + invalid = "ENR:" + OFFICIAL_ENR_STRING[len(ENR_PREFIX) :] + with pytest.raises(ValueError, match=r"must start with 'enr:'"): + ENR.from_string(invalid) + + +class TestRLPStructureValidation: + """Tests for RLP structure validation during parsing.""" + + def test_minimum_fields_required(self) -> None: + """ENR must have at least signature and seq.""" + # Create RLP for just signature (missing seq) + import base64 + + from lean_spec.types.rlp import encode + + # RLP list with only signature + rlp_data = encode([b"\x00" * 64]) + b64_content = base64.urlsafe_b64encode(rlp_data).decode("utf-8").rstrip("=") + + with pytest.raises(ValueError, match=r"at least signature and seq"): + ENR.from_string(f"enr:{b64_content}") + + def test_odd_number_of_kv_pairs_rejected(self) -> None: + """ENR key/value pairs must be even count.""" + import base64 + + from lean_spec.types.rlp import encode + + # [signature, seq, key1] - odd number after signature/seq + rlp_data = encode([b"\x00" * 64, b"\x01", b"id"]) + b64_content = base64.urlsafe_b64encode(rlp_data).decode("utf-8").rstrip("=") + + with pytest.raises(ValueError, match=r"key/value pairs must be even"): + ENR.from_string(f"enr:{b64_content}") + + def test_empty_rlp_rejected(self) -> None: + """Empty RLP data is rejected.""" + import base64 + + b64_content = base64.urlsafe_b64encode(b"").decode("utf-8").rstrip("=") + + with pytest.raises(ValueError, match=r"Invalid RLP"): + ENR.from_string(f"enr:{b64_content}") + + def test_malformed_rlp_rejected(self) -> None: + """Malformed RLP is rejected.""" + import base64 + + # Invalid RLP: truncated list + malformed = bytes([0xC5, 0x01, 0x02]) # Claims 5 bytes but only has 2 + b64_content = base64.urlsafe_b64encode(malformed).decode("utf-8").rstrip("=") + + with pytest.raises(ValueError, match=r"Invalid RLP"): + ENR.from_string(f"enr:{b64_content}") + + def test_valid_minimal_enr(self) -> None: + """Minimal valid ENR with only required fields parses.""" + import base64 + + from lean_spec.types.rlp import encode + + # [signature(64), seq(1), "id", "v4", "secp256k1", pubkey(33)] + rlp_data = encode( + [ + b"\x00" * 64, # signature + b"\x01", # seq = 1 + b"id", + b"v4", + b"secp256k1", + b"\x02" + b"\x00" * 32, # compressed pubkey + ] + ) + b64_content = base64.urlsafe_b64encode(rlp_data).decode("utf-8").rstrip("=") + + enr = ENR.from_string(f"enr:{b64_content}") + assert enr.seq == Uint64(1) + assert enr.identity_scheme == "v4" + + +class TestPropertyAccessors: + """Tests 
for ENR property accessors.""" + + def test_identity_scheme_returns_v4(self) -> None: + """identity_scheme property returns 'v4' for valid ENR.""" + enr = ENR( + signature=b"\x00" * 64, + seq=Uint64(1), + pairs={keys.ID: b"v4", keys.SECP256K1: b"\x02" + b"\x00" * 32}, + ) + assert enr.identity_scheme == "v4" + + def test_identity_scheme_returns_none_when_missing(self) -> None: + """identity_scheme returns None when 'id' key is absent.""" + enr = ENR( + signature=b"\x00" * 64, + seq=Uint64(1), + pairs={keys.SECP256K1: b"\x02" + b"\x00" * 32}, + ) + assert enr.identity_scheme is None + + def test_public_key_returns_33_bytes(self) -> None: + """public_key returns 33-byte compressed secp256k1 key.""" + expected_key = b"\x03" + b"\xab" * 32 + enr = ENR( + signature=b"\x00" * 64, + seq=Uint64(1), + pairs={keys.ID: b"v4", keys.SECP256K1: expected_key}, + ) + public_key = enr.public_key + assert public_key is not None + assert public_key == expected_key + assert len(public_key) == 33 + + def test_public_key_returns_none_when_missing(self) -> None: + """public_key returns None when secp256k1 key is absent.""" + enr = ENR( + signature=b"\x00" * 64, + seq=Uint64(1), + pairs={keys.ID: b"v4"}, + ) + assert enr.public_key is None + + def test_ip4_formats_address_correctly(self) -> None: + """ip4 property formats IPv4 address as dotted string.""" + enr = ENR( + signature=b"\x00" * 64, + seq=Uint64(1), + pairs={ + keys.ID: b"v4", + keys.SECP256K1: b"\x02" + b"\x00" * 32, + keys.IP: b"\x7f\x00\x00\x01", # 127.0.0.1 + }, + ) + assert enr.ip4 == "127.0.0.1" + + def test_ip4_various_addresses(self) -> None: + """ip4 formats various IPv4 addresses correctly.""" + test_cases = [ + (b"\x00\x00\x00\x00", "0.0.0.0"), + (b"\xff\xff\xff\xff", "255.255.255.255"), + (b"\xc0\xa8\x01\x01", "192.168.1.1"), + (b"\x0a\x00\x00\x01", "10.0.0.1"), + ] + for ip_bytes, expected in test_cases: + enr = ENR( + signature=b"\x00" * 64, + seq=Uint64(1), + pairs={keys.ID: b"v4", keys.IP: ip_bytes}, + ) + assert enr.ip4 == expected + + def test_ip4_returns_none_when_missing(self) -> None: + """ip4 returns None when 'ip' key is absent.""" + enr = ENR( + signature=b"\x00" * 64, + seq=Uint64(1), + pairs={keys.ID: b"v4"}, + ) + assert enr.ip4 is None + + def test_ip4_returns_none_for_wrong_length(self) -> None: + """ip4 returns None when IP bytes are not 4 bytes.""" + enr = ENR( + signature=b"\x00" * 64, + seq=Uint64(1), + pairs={keys.ID: b"v4", keys.IP: b"\x7f\x00\x00"}, # Only 3 bytes + ) + assert enr.ip4 is None + + def test_ip6_formats_address_correctly(self) -> None: + """ip6 property formats IPv6 address as colon-separated hex.""" + # ::1 (loopback) + ipv6_bytes = b"\x00" * 15 + b"\x01" + enr = ENR( + signature=b"\x00" * 64, + seq=Uint64(1), + pairs={keys.ID: b"v4", keys.IP6: ipv6_bytes}, + ) + assert enr.ip6 == "0000:0000:0000:0000:0000:0000:0000:0001" + + def test_ip6_returns_none_when_missing(self) -> None: + """ip6 returns None when 'ip6' key is absent.""" + enr = ENR( + signature=b"\x00" * 64, + seq=Uint64(1), + pairs={keys.ID: b"v4"}, + ) + assert enr.ip6 is None + + def test_ip6_returns_none_for_wrong_length(self) -> None: + """ip6 returns None when IP bytes are not 16 bytes.""" + enr = ENR( + signature=b"\x00" * 64, + seq=Uint64(1), + pairs={keys.ID: b"v4", keys.IP6: b"\x00" * 8}, # Only 8 bytes + ) + assert enr.ip6 is None + + def test_udp_port_extracts_correctly(self) -> None: + """udp_port extracts port number from big-endian bytes.""" + enr = ENR( + signature=b"\x00" * 64, + seq=Uint64(1), + pairs={keys.ID: b"v4", 
keys.UDP: (30303).to_bytes(2, "big")}, + ) + assert enr.udp_port == 30303 + + def test_udp_port_various_values(self) -> None: + """udp_port handles various port values.""" + test_cases = [ + (b"\x00\x01", 1), + (b"\xff\xff", 65535), + (b"\x23\x28", 9000), + (b"\x76\x5f", 30303), + ] + for port_bytes, expected in test_cases: + enr = ENR( + signature=b"\x00" * 64, + seq=Uint64(1), + pairs={keys.ID: b"v4", keys.UDP: port_bytes}, + ) + assert enr.udp_port == expected + + def test_udp_port_returns_none_when_missing(self) -> None: + """udp_port returns None when 'udp' key is absent.""" + enr = ENR( + signature=b"\x00" * 64, + seq=Uint64(1), + pairs={keys.ID: b"v4"}, + ) + assert enr.udp_port is None + + def test_tcp_port_extracts_correctly(self) -> None: + """tcp_port extracts port number from big-endian bytes.""" + enr = ENR( + signature=b"\x00" * 64, + seq=Uint64(1), + pairs={keys.ID: b"v4", keys.TCP: (9000).to_bytes(2, "big")}, + ) + assert enr.tcp_port == 9000 + + def test_tcp_port_returns_none_when_missing(self) -> None: + """tcp_port returns None when 'tcp' key is absent.""" + enr = ENR( + signature=b"\x00" * 64, + seq=Uint64(1), + pairs={keys.ID: b"v4"}, + ) + assert enr.tcp_port is None + + +class TestValidationMethods: + """Tests for ENR validation methods.""" + + def test_is_valid_returns_true_for_complete_v4_enr(self) -> None: + """is_valid() returns True for complete v4 ENR.""" + enr = ENR( + signature=b"\x00" * 64, + seq=Uint64(1), + pairs={keys.ID: b"v4", keys.SECP256K1: b"\x02" + b"\x00" * 32}, + ) + assert enr.is_valid() + + def test_is_valid_returns_false_for_missing_public_key(self) -> None: + """is_valid() returns False when secp256k1 key is missing.""" + enr = ENR( + signature=b"\x00" * 64, + seq=Uint64(1), + pairs={keys.ID: b"v4"}, + ) + assert not enr.is_valid() + + def test_is_valid_returns_false_for_wrong_identity_scheme(self) -> None: + """is_valid() returns False for non-v4 identity scheme.""" + enr = ENR( + signature=b"\x00" * 64, + seq=Uint64(1), + pairs={keys.ID: b"v5", keys.SECP256K1: b"\x02" + b"\x00" * 32}, + ) + assert not enr.is_valid() + + def test_is_valid_returns_false_for_missing_identity_scheme(self) -> None: + """is_valid() returns False when 'id' key is missing.""" + enr = ENR( + signature=b"\x00" * 64, + seq=Uint64(1), + pairs={keys.SECP256K1: b"\x02" + b"\x00" * 32}, + ) + assert not enr.is_valid() + + def test_is_valid_returns_false_for_wrong_pubkey_length(self) -> None: + """is_valid() returns False for public key != 33 bytes.""" + # 32 bytes (uncompressed prefix missing) + enr = ENR( + signature=b"\x00" * 64, + seq=Uint64(1), + pairs={keys.ID: b"v4", keys.SECP256K1: b"\x00" * 32}, + ) + assert not enr.is_valid() + + # 65 bytes (uncompressed format) + enr = ENR( + signature=b"\x00" * 64, + seq=Uint64(1), + pairs={keys.ID: b"v4", keys.SECP256K1: b"\x04" + b"\x00" * 64}, + ) + assert not enr.is_valid() + + def test_is_valid_returns_false_for_wrong_signature_length(self) -> None: + """is_valid() returns False for signature != 64 bytes.""" + enr = ENR( + signature=b"\x00" * 63, # 63 bytes + seq=Uint64(1), + pairs={keys.ID: b"v4", keys.SECP256K1: b"\x02" + b"\x00" * 32}, + ) + assert not enr.is_valid() + + +class TestMultiaddrGeneration: + """Tests for multiaddr() method.""" + + def test_multiaddr_with_ipv4_and_tcp(self) -> None: + """multiaddr() generates correct format with IPv4 and TCP.""" + enr = ENR( + signature=b"\x00" * 64, + seq=Uint64(1), + pairs={ + keys.ID: b"v4", + keys.IP: b"\xc0\xa8\x01\x01", # 192.168.1.1 + keys.TCP: (9000).to_bytes(2, 
"big"), + }, + ) + assert enr.multiaddr() == "/ip4/192.168.1.1/tcp/9000" + + def test_multiaddr_with_ipv6_and_tcp(self) -> None: + """multiaddr() generates correct format with IPv6 and TCP.""" + ipv6_bytes = b"\x00" * 15 + b"\x01" # ::1 + enr = ENR( + signature=b"\x00" * 64, + seq=Uint64(1), + pairs={ + keys.ID: b"v4", + keys.IP6: ipv6_bytes, + keys.TCP: (9000).to_bytes(2, "big"), + }, + ) + assert enr.multiaddr() == "/ip6/0000:0000:0000:0000:0000:0000:0000:0001/tcp/9000" + + def test_multiaddr_returns_none_without_tcp(self) -> None: + """multiaddr() returns None when TCP port is absent.""" + enr = ENR( + signature=b"\x00" * 64, + seq=Uint64(1), + pairs={ + keys.ID: b"v4", + keys.IP: b"\xc0\xa8\x01\x01", + keys.UDP: (30303).to_bytes(2, "big"), # UDP, not TCP + }, + ) + assert enr.multiaddr() is None + + def test_multiaddr_returns_none_without_ip(self) -> None: + """multiaddr() returns None when no IP address is present.""" + enr = ENR( + signature=b"\x00" * 64, + seq=Uint64(1), + pairs={keys.ID: b"v4", keys.TCP: (9000).to_bytes(2, "big")}, + ) + assert enr.multiaddr() is None + + def test_multiaddr_prefers_ipv4_over_ipv6(self) -> None: + """multiaddr() uses IPv4 when both IPv4 and IPv6 are present.""" + enr = ENR( + signature=b"\x00" * 64, + seq=Uint64(1), + pairs={ + keys.ID: b"v4", + keys.IP: b"\xc0\xa8\x01\x01", # 192.168.1.1 + keys.IP6: b"\x00" * 15 + b"\x01", # ::1 + keys.TCP: (9000).to_bytes(2, "big"), + }, + ) + assert enr.multiaddr() == "/ip4/192.168.1.1/tcp/9000" + + +class TestStringRepresentation: + """Tests for ENR string representation.""" + + def test_str_includes_seq(self) -> None: + """__str__() includes sequence number.""" + enr = ENR( + signature=b"\x00" * 64, + seq=Uint64(42), + pairs={keys.ID: b"v4"}, + ) + result = str(enr) + assert "seq=42" in result + + def test_str_includes_ip(self) -> None: + """__str__() includes IP address when present.""" + enr = ENR( + signature=b"\x00" * 64, + seq=Uint64(1), + pairs={keys.ID: b"v4", keys.IP: b"\xc0\xa8\x01\x01"}, + ) + result = str(enr) + assert "192.168.1.1" in result + + def test_str_includes_tcp_port(self) -> None: + """__str__() includes TCP port when present.""" + enr = ENR( + signature=b"\x00" * 64, + seq=Uint64(1), + pairs={keys.ID: b"v4", keys.TCP: (9000).to_bytes(2, "big")}, + ) + result = str(enr) + assert "tcp=9000" in result + + def test_str_includes_udp_port(self) -> None: + """__str__() includes UDP port when present.""" + enr = ENR( + signature=b"\x00" * 64, + seq=Uint64(1), + pairs={keys.ID: b"v4", keys.UDP: (30303).to_bytes(2, "big")}, + ) + result = str(enr) + assert "udp=30303" in result + + def test_str_minimal_enr(self) -> None: + """__str__() works for minimal ENR.""" + enr = ENR( + signature=b"\x00" * 64, + seq=Uint64(1), + pairs={}, + ) + result = str(enr) + assert result.startswith("ENR(") + assert result.endswith(")") + assert "seq=1" in result + + +class TestKeyAccessMethods: + """Tests for get() and has() methods.""" + + def test_get_existing_key(self) -> None: + """get() returns value for existing key.""" + enr = ENR( + signature=b"\x00" * 64, + seq=Uint64(1), + pairs={keys.ID: b"v4"}, + ) + assert enr.get(keys.ID) == b"v4" + + def test_get_missing_key(self) -> None: + """get() returns None for missing key.""" + enr = ENR( + signature=b"\x00" * 64, + seq=Uint64(1), + pairs={keys.ID: b"v4"}, + ) + assert enr.get(keys.IP) is None + + def test_has_existing_key(self) -> None: + """has() returns True for existing key.""" + enr = ENR( + signature=b"\x00" * 64, + seq=Uint64(1), + pairs={keys.ID: b"v4", 
keys.IP: b"\x7f\x00\x00\x01"}, + ) + assert enr.has(keys.ID) + assert enr.has(keys.IP) + + def test_has_missing_key(self) -> None: + """has() returns False for missing key.""" + enr = ENR( + signature=b"\x00" * 64, + seq=Uint64(1), + pairs={keys.ID: b"v4"}, + ) + assert not enr.has(keys.IP) + assert not enr.has(keys.TCP) + assert not enr.has(keys.ETH2) + + +class TestEdgeCases: + """Tests for edge cases and boundary conditions.""" + + def test_enr_with_only_required_fields(self) -> None: + """ENR with minimum required fields is valid.""" + import base64 + + from lean_spec.types.rlp import encode + + rlp_data = encode( + [ + b"\x00" * 64, # signature + b"\x01", # seq + b"id", + b"v4", + b"secp256k1", + b"\x02" + b"\x00" * 32, + ] + ) + b64_content = base64.urlsafe_b64encode(rlp_data).decode("utf-8").rstrip("=") + + enr = ENR.from_string(f"enr:{b64_content}") + assert enr.is_valid() + assert enr.ip4 is None + assert enr.tcp_port is None + assert enr.udp_port is None + + def test_enr_with_ipv6_only(self) -> None: + """ENR with IPv6 but no IPv4 parses correctly.""" + import base64 + + from lean_spec.types.rlp import encode + + ipv6_bytes = bytes.fromhex("20010db8000000000000000000000001") # 2001:db8::1 + rlp_data = encode( + [ + b"\x00" * 64, + b"\x01", + b"id", + b"v4", + b"ip6", + ipv6_bytes, + b"secp256k1", + b"\x02" + b"\x00" * 32, + b"tcp", + (9000).to_bytes(2, "big"), + ] + ) + b64_content = base64.urlsafe_b64encode(rlp_data).decode("utf-8").rstrip("=") + + enr = ENR.from_string(f"enr:{b64_content}") + assert enr.ip4 is None + assert enr.ip6 is not None + assert enr.tcp_port == 9000 + # multiaddr should use IPv6 + multiaddr = enr.multiaddr() + assert multiaddr is not None + assert "/ip6/" in multiaddr + + def test_enr_with_both_tcp_and_udp(self) -> None: + """ENR with both TCP and UDP ports parses correctly.""" + import base64 + + from lean_spec.types.rlp import encode + + rlp_data = encode( + [ + b"\x00" * 64, + b"\x01", + b"id", + b"v4", + b"ip", + b"\xc0\xa8\x01\x01", + b"secp256k1", + b"\x02" + b"\x00" * 32, + b"tcp", + (9000).to_bytes(2, "big"), + b"udp", + (30303).to_bytes(2, "big"), + ] + ) + b64_content = base64.urlsafe_b64encode(rlp_data).decode("utf-8").rstrip("=") + + enr = ENR.from_string(f"enr:{b64_content}") + assert enr.tcp_port == 9000 + assert enr.udp_port == 30303 + assert enr.multiaddr() == "/ip4/192.168.1.1/tcp/9000" + + def test_sequence_number_zero(self) -> None: + """ENR with sequence number 0 is valid.""" + import base64 + + from lean_spec.types.rlp import encode + + rlp_data = encode( + [ + b"\x00" * 64, + b"", # Empty bytes = 0 + b"id", + b"v4", + b"secp256k1", + b"\x02" + b"\x00" * 32, + ] + ) + b64_content = base64.urlsafe_b64encode(rlp_data).decode("utf-8").rstrip("=") + + enr = ENR.from_string(f"enr:{b64_content}") + assert enr.seq == Uint64(0) + + def test_large_sequence_number(self) -> None: + """ENR with large sequence number parses correctly.""" + import base64 + + from lean_spec.types.rlp import encode + + large_seq = (2**32).to_bytes(5, "big") + rlp_data = encode( + [ + b"\x00" * 64, + large_seq, + b"id", + b"v4", + b"secp256k1", + b"\x02" + b"\x00" * 32, + ] + ) + b64_content = base64.urlsafe_b64encode(rlp_data).decode("utf-8").rstrip("=") + + enr = ENR.from_string(f"enr:{b64_content}") + assert enr.seq == Uint64(2**32) + + +class TestENRConstants: + """Tests for ENR constants.""" + + def test_max_size_constant(self) -> None: + """MAX_SIZE is 300 bytes per EIP-778.""" + assert ENR.MAX_SIZE == 300 + + def test_scheme_constant(self) -> None: + """SCHEME 
is 'v4' for current identity scheme.""" + assert ENR.SCHEME == "v4" + + def test_prefix_constant(self) -> None: + """ENR_PREFIX is 'enr:' for text encoding.""" + assert ENR_PREFIX == "enr:" diff --git a/tests/lean_spec/subspecs/networking/test_network_service.py b/tests/lean_spec/subspecs/networking/test_network_service.py index f31070a7..59119bec 100644 --- a/tests/lean_spec/subspecs/networking/test_network_service.py +++ b/tests/lean_spec/subspecs/networking/test_network_service.py @@ -44,6 +44,7 @@ class MockEventSource: events: list[NetworkEvent] = field(default_factory=list) _index: int = field(default=0, init=False) + _published: list[tuple[str, bytes]] = field(default_factory=list, init=False) def __aiter__(self) -> "MockEventSource": return self @@ -56,6 +57,10 @@ async def __anext__(self) -> NetworkEvent: await asyncio.sleep(0) return event + async def publish(self, topic: str, data: bytes) -> None: + """Mock publish - records published messages for testing.""" + self._published.append((topic, data)) + @dataclass class MockNetworkRequester: diff --git a/tests/lean_spec/subspecs/node/test_node.py b/tests/lean_spec/subspecs/node/test_node.py index 73ec8ab0..6d8511b3 100644 --- a/tests/lean_spec/subspecs/node/test_node.py +++ b/tests/lean_spec/subspecs/node/test_node.py @@ -21,6 +21,7 @@ def __init__(self, events: list[NetworkEvent] | None = None) -> None: """Initialize with optional list of events.""" self._events = events or [] self._index = 0 + self._published: list[tuple[str, bytes]] = [] def __aiter__(self) -> MockEventSource: """Return self as async iterator.""" @@ -34,6 +35,10 @@ async def __anext__(self) -> NetworkEvent: self._index += 1 return event + async def publish(self, topic: str, data: bytes) -> None: + """Mock publish - records published messages for testing.""" + self._published.append((topic, data)) + class MockNetworkRequester: """Mock network requester for testing.""" diff --git a/tests/lean_spec/types/test_rlp.py b/tests/lean_spec/types/test_rlp.py new file mode 100644 index 00000000..8a8d8f24 --- /dev/null +++ b/tests/lean_spec/types/test_rlp.py @@ -0,0 +1,684 @@ +"""Tests for the RLP (Recursive Length Prefix) encoding module.""" + +from __future__ import annotations + +import pytest + +from lean_spec.types.rlp import ( + LONG_LIST_BASE, + LONG_STRING_BASE, + SHORT_LIST_MAX_LEN, + SHORT_LIST_PREFIX, + SHORT_STRING_MAX_LEN, + SHORT_STRING_PREFIX, + SINGLE_BYTE_MAX, + RLPDecodingError, + RLPItem, + decode, + decode_list, + encode, +) + +# Derived constants for test assertions. +# Long encoding prefixes are BASE + 1 (for 1-byte length). 
+LONG_STRING_PREFIX = LONG_STRING_BASE + 1 # 0xB8 +LONG_LIST_PREFIX = LONG_LIST_BASE + 1 # 0xF8 + + +class TestEncodeEmptyString: + """Tests for encoding empty byte strings.""" + + def test_encode_empty_string(self) -> None: + """Empty string encodes to 0x80.""" + result = encode(b"") + assert result == bytes.fromhex("80") + + +class TestEncodeSingleByte: + """Tests for single byte encoding (0x00-0x7f).""" + + def test_encode_byte_0x00(self) -> None: + """Byte 0x00 encodes as itself.""" + result = encode(b"\x00") + assert result == bytes.fromhex("00") + + def test_encode_byte_0x01(self) -> None: + """Byte 0x01 encodes as itself.""" + result = encode(b"\x01") + assert result == bytes.fromhex("01") + + def test_encode_byte_0x7f(self) -> None: + """Maximum single-byte value (0x7f) encodes as itself.""" + result = encode(b"\x7f") + assert result == bytes.fromhex("7f") + + @pytest.mark.parametrize("byte_val", range(0x00, SINGLE_BYTE_MAX + 1)) + def test_encode_all_single_byte_values(self, byte_val: int) -> None: + """All single-byte values 0x00-0x7f encode as themselves.""" + data = bytes([byte_val]) + result = encode(data) + assert result == data + + +class TestEncodeShortString: + """Tests for short string encoding (0-55 bytes).""" + + def test_encode_short_string_dog(self) -> None: + """'dog' encodes with prefix 0x83 (0x80 + 3) followed by ASCII bytes.""" + result = encode(b"dog") + assert result == bytes.fromhex("83646f67") + + def test_encode_short_string_55_bytes(self) -> None: + """55-byte string uses short string encoding (max for this category).""" + data = b"Lorem ipsum dolor sit amet, consectetur adipisicing eli" + assert len(data) == SHORT_STRING_MAX_LEN + result = encode(data) + expected = bytes.fromhex( + "b74c6f72656d20697073756d20646f6c6f722073697420616d65742c20" + "636f6e7365637465747572206164697069736963696e6720656c69" + ) + assert result == expected + + def test_encode_single_byte_above_0x7f(self) -> None: + """Single byte 0x80 uses short string encoding, not single-byte encoding.""" + result = encode(b"\x80") + assert result == bytes([SHORT_STRING_PREFIX + 1, 0x80]) + + @pytest.mark.parametrize("length", [1, 10, 20, 30, 40, 50, SHORT_STRING_MAX_LEN]) + def test_encode_short_string_various_lengths(self, length: int) -> None: + """Short strings of various lengths are prefixed with 0x80 + length.""" + # Use bytes above 0x7f to ensure short string encoding is used + data = bytes([0x80 + (i % 0x7F) for i in range(length)]) + result = encode(data) + assert result[0] == SHORT_STRING_PREFIX + length + assert result[1:] == data + + +class TestEncodeLongString: + """Tests for long string encoding (>55 bytes).""" + + def test_encode_long_string_56_bytes(self) -> None: + """56-byte string uses long string encoding.""" + data = b"Lorem ipsum dolor sit amet, consectetur adipisicing elit" + assert len(data) == SHORT_STRING_MAX_LEN + 1 + result = encode(data) + expected = bytes.fromhex( + "b8384c6f72656d20697073756d20646f6c6f722073697420616d65742c20" + "636f6e7365637465747572206164697069736963696e6720656c6974" + ) + assert result == expected + + def test_encode_long_string_1024_bytes(self) -> None: + """1024-byte string encodes with 2-byte length prefix.""" + # Use simple repeated bytes to avoid codespell false positives. 
+ data = b"x" * 1024 + assert len(data) == 1024 + result = encode(data) + # Prefix 0xb9 = 0xb7 + 2 (2 bytes for length) + # Length 0x0400 = 1024 in big-endian + assert result[0] == LONG_STRING_PREFIX + 1 # 0xb9 + assert result[1:3] == b"\x04\x00" + assert result[3:] == data + + def test_encode_long_string_boundary(self) -> None: + """String at exact boundary (56 bytes) uses long encoding.""" + data = b"a" * (SHORT_STRING_MAX_LEN + 1) + result = encode(data) + # Prefix 0xb8 = 0xb7 + 1 (1 byte for length) + assert result[0] == LONG_STRING_PREFIX + assert result[1] == len(data) + assert result[2:] == data + + +class TestEncodeEmptyList: + """Tests for encoding empty lists.""" + + def test_encode_empty_list(self) -> None: + """Empty list encodes to 0xc0.""" + result = encode([]) + assert result == bytes.fromhex("c0") + + +class TestEncodeShortList: + """Tests for short list encoding (0-55 bytes payload).""" + + def test_encode_string_list(self) -> None: + """List of strings ['dog', 'god', 'cat'] encodes correctly.""" + result = encode([b"dog", b"god", b"cat"]) + assert result == bytes.fromhex("cc83646f6783676f6483636174") + + def test_encode_multilist(self) -> None: + """Mixed list ['zw', [4], 1] encodes correctly.""" + # 4 encodes as 0x04 (single byte) + # 1 encodes as 0x01 (single byte) + result = encode([b"zw", [b"\x04"], b"\x01"]) + assert result == bytes.fromhex("c6827a77c10401") + + def test_encode_short_list_max_payload(self) -> None: + """Short list with 55 bytes of payload uses short list encoding.""" + # Create a list that has exactly 55 bytes of payload + # Each "a" encodes as 0x61 (single byte), so 55 "a"s = 55 bytes payload + items: list[RLPItem] = [b"a" for _ in range(SHORT_LIST_MAX_LEN)] + result = encode(items) + assert result[0] == SHORT_LIST_PREFIX + SHORT_LIST_MAX_LEN # 0xf7 + + +class TestEncodeLongList: + """Tests for long list encoding (>55 bytes payload).""" + + def test_encode_long_list_four_nested(self) -> None: + """Long list with 4 nested lists encodes correctly.""" + inner = [b"asdf", b"qwer", b"zxcv"] + result = encode([inner, inner, inner, inner]) + expected = bytes.fromhex( + "f840cf84617364668471776572847a786376cf84617364668471776572847a786376" + "cf84617364668471776572847a786376cf84617364668471776572847a786376" + ) + assert result == expected + + def test_encode_long_list_32_nested(self) -> None: + """Long list with 32 nested lists uses 2-byte length prefix.""" + inner = [b"asdf", b"qwer", b"zxcv"] + result = encode([inner] * 32) + expected_start = bytes.fromhex("f90200") # 0xf9 = 0xf7 + 2, length = 0x0200 = 512 + assert result[:3] == expected_start + + def test_encode_short_list_11_elements(self) -> None: + """List with 11 4-byte strings has >55 byte payload, uses long encoding.""" + items: list[RLPItem] = [ + b"asdf", + b"qwer", + b"zxcv", + b"asdf", + b"qwer", + b"zxcv", + b"asdf", + b"qwer", + b"zxcv", + b"asdf", + b"qwer", + ] + result = encode(items) + expected = bytes.fromhex( + "f784617364668471776572847a78637684617364668471776572847a78637684617364" + "668471776572847a78637684617364668471776572" + ) + assert result == expected + + +class TestEncodeNestedLists: + """Tests for encoding nested list structures.""" + + def test_encode_lists_of_lists(self) -> None: + """Nested empty lists [[[], []], []] encode correctly.""" + result = encode([[[], []], []]) + assert result == bytes.fromhex("c4c2c0c0c0") + + def test_encode_lists_of_lists_complex(self) -> None: + """Complex nested structure [[], [[]], [[], [[]]]] encodes correctly.""" + result = encode([[], 
+        assert result == bytes.fromhex("c7c0c1c0c3c0c1c0")
+
+
+class TestEncodeIntegers:
+    """Tests for encoding integers (as byte strings)."""
+
+    def test_encode_zero(self) -> None:
+        """Integer 0 encodes as empty string (0x80)."""
+        # In RLP, 0 is represented as empty byte string
+        result = encode(b"")
+        assert result == bytes.fromhex("80")
+
+    def test_encode_small_integers(self) -> None:
+        """Small integers 1-127 encode as single bytes."""
+        assert encode(b"\x01") == bytes.fromhex("01")
+        assert encode(b"\x10") == bytes.fromhex("10")  # 16
+        assert encode(b"\x4f") == bytes.fromhex("4f")  # 79
+        assert encode(b"\x7f") == bytes.fromhex("7f")  # 127
+
+    def test_encode_medium_integers(self) -> None:
+        """Integers >= 128 encode as short strings."""
+        # 128 = 0x80 (1 byte, but > 0x7f so needs prefix)
+        assert encode(b"\x80") == bytes.fromhex("8180")
+
+        # 1000 = 0x03e8 (2 bytes)
+        assert encode((1000).to_bytes(2, "big")) == bytes.fromhex("8203e8")
+
+        # 100000 = 0x0186a0 (3 bytes)
+        assert encode((100000).to_bytes(3, "big")) == bytes.fromhex("830186a0")
+
+    def test_encode_big_integer_2_pow_256(self) -> None:
+        """2^256 encodes as 33-byte string."""
+        big_int = 2**256
+        big_bytes = big_int.to_bytes(33, "big")
+        result = encode(big_bytes)
+        expected = bytes.fromhex(
+            "a1010000000000000000000000000000000000000000000000000000000000000000"
+        )
+        assert result == expected
+
+
+class TestEncodeTypeErrors:
+    """Tests for type validation during encoding."""
+
+    def test_encode_invalid_type_int(self) -> None:
+        """Encoding an integer directly raises TypeError."""
+        with pytest.raises(TypeError, match=r"Cannot RLP encode type: int"):
+            encode(42)  # type: ignore[arg-type]
+
+    def test_encode_invalid_type_str(self) -> None:
+        """Encoding a string directly raises TypeError."""
+        with pytest.raises(TypeError, match=r"Cannot RLP encode type: str"):
+            encode("hello")  # type: ignore[arg-type]
+
+    def test_encode_invalid_type_none(self) -> None:
+        """Encoding None raises TypeError."""
+        with pytest.raises(TypeError, match=r"Cannot RLP encode type: NoneType"):
+            encode(None)  # type: ignore[arg-type]
+
+    def test_encode_invalid_nested_type(self) -> None:
+        """Encoding a list with invalid nested type raises TypeError."""
+        with pytest.raises(TypeError, match=r"Cannot RLP encode type: int"):
+            encode([b"valid", 123])  # type: ignore[list-item]
+
+
+class TestDecodeEmptyString:
+    """Tests for decoding empty byte strings."""
+
+    def test_decode_empty_string(self) -> None:
+        """0x80 decodes to empty string."""
+        result = decode(bytes.fromhex("80"))
+        assert result == b""
+
+
+class TestDecodeSingleByte:
+    """Tests for decoding single bytes (0x00-0x7f)."""
+
+    def test_decode_byte_0x00(self) -> None:
+        """0x00 decodes to single byte 0x00."""
+        result = decode(bytes.fromhex("00"))
+        assert result == b"\x00"
+
+    def test_decode_byte_0x01(self) -> None:
+        """0x01 decodes to single byte 0x01."""
+        result = decode(bytes.fromhex("01"))
+        assert result == b"\x01"
+
+    def test_decode_byte_0x7f(self) -> None:
+        """0x7f decodes to single byte 0x7f."""
+        result = decode(bytes.fromhex("7f"))
+        assert result == b"\x7f"
+
+    @pytest.mark.parametrize("byte_val", range(0x00, SINGLE_BYTE_MAX + 1))
+    def test_decode_all_single_byte_values(self, byte_val: int) -> None:
+        """All single-byte values 0x00-0x7f decode correctly."""
+        data = bytes([byte_val])
+        result = decode(data)
+        assert result == data
+
+
+class TestDecodeShortString:
+    """Tests for decoding short strings."""
+
+    def test_decode_short_string_dog(self) -> None:
+        """0x83646f67 decodes to 'dog'."""
+        result = decode(bytes.fromhex("83646f67"))
+        assert result == b"dog"
+
+    def test_decode_short_string_55_bytes(self) -> None:
+        """55-byte short string decodes correctly."""
+        encoded = bytes.fromhex(
+            "b74c6f72656d20697073756d20646f6c6f722073697420616d65742c20"
+            "636f6e7365637465747572206164697069736963696e6720656c69"
+        )
+        result = decode(encoded)
+        assert result == b"Lorem ipsum dolor sit amet, consectetur adipisicing eli"
+
+
+class TestDecodeLongString:
+    """Tests for decoding long strings."""
+
+    def test_decode_long_string_56_bytes(self) -> None:
+        """56-byte long string decodes correctly."""
+        encoded = bytes.fromhex(
+            "b8384c6f72656d20697073756d20646f6c6f722073697420616d65742c20"
+            "636f6e7365637465747572206164697069736963696e6720656c6974"
+        )
+        result = decode(encoded)
+        assert result == b"Lorem ipsum dolor sit amet, consectetur adipisicing elit"
+
+    def test_decode_long_string_1024_bytes(self) -> None:
+        """1024-byte string with 2-byte length prefix decodes correctly."""
+        # Use simple repeated bytes to avoid codespell false positives.
+        expected_data = b"y" * 1024
+        encoded = encode(expected_data)
+        result = decode(encoded)
+        assert result == expected_data
+
+
+class TestDecodeEmptyList:
+    """Tests for decoding empty lists."""
+
+    def test_decode_empty_list(self) -> None:
+        """0xc0 decodes to empty list."""
+        result = decode(bytes.fromhex("c0"))
+        assert result == []
+
+
+class TestDecodeShortList:
+    """Tests for decoding short lists."""
+
+    def test_decode_string_list(self) -> None:
+        """Encoded string list decodes correctly."""
+        result = decode(bytes.fromhex("cc83646f6783676f6483636174"))
+        assert result == [b"dog", b"god", b"cat"]
+
+    def test_decode_multilist(self) -> None:
+        """Mixed list decodes correctly."""
+        result = decode(bytes.fromhex("c6827a77c10401"))
+        assert result == [b"zw", [b"\x04"], b"\x01"]
+
+
+class TestDecodeLongList:
+    """Tests for decoding long lists."""
+
+    def test_decode_long_list_four_nested(self) -> None:
+        """Long list with 4 nested lists decodes correctly."""
+        encoded = bytes.fromhex(
+            "f840cf84617364668471776572847a786376cf84617364668471776572847a786376"
+            "cf84617364668471776572847a786376cf84617364668471776572847a786376"
+        )
+        result = decode(encoded)
+        inner = [b"asdf", b"qwer", b"zxcv"]
+        assert result == [inner, inner, inner, inner]
+
+
+class TestDecodeNestedLists:
+    """Tests for decoding nested list structures."""
+
+    def test_decode_lists_of_lists(self) -> None:
+        """Nested empty lists decode correctly."""
+        result = decode(bytes.fromhex("c4c2c0c0c0"))
+        assert result == [[[], []], []]
+
+    def test_decode_lists_of_lists_complex(self) -> None:
+        """Complex nested structure decodes correctly."""
+        result = decode(bytes.fromhex("c7c0c1c0c3c0c1c0"))
+        assert result == [[], [[]], [[], [[]]]]
+
+
+class TestDecodeErrors:
+    """Tests for decoding error conditions."""
+
+    def test_decode_empty_data(self) -> None:
+        """Decoding empty data raises RLPDecodingError."""
+        with pytest.raises(RLPDecodingError, match=r"Empty RLP data"):
+            decode(b"")
+
+    def test_decode_trailing_data(self) -> None:
+        """Extra bytes after valid RLP raise RLPDecodingError."""
+        # Valid empty string (0x80) followed by extra byte
+        with pytest.raises(RLPDecodingError, match=r"Trailing data"):
+            decode(bytes.fromhex("8000"))
+
+    def test_decode_short_string_truncated(self) -> None:
+        """Truncated short string raises RLPDecodingError."""
+        # 0x83 indicates 3-byte string, but only 2 bytes provided
+        with pytest.raises(RLPDecodingError, match=r"Data too short"):
+            decode(bytes.fromhex("836465"))  # "de" instead of "dog"
+
+    def test_decode_long_string_truncated_length(self) -> None:
+        """Truncated length field in long string raises RLPDecodingError."""
+        # 0xb9 indicates 2-byte length, but only 1 byte provided
+        with pytest.raises(RLPDecodingError, match=r"Data too short"):
+            decode(bytes.fromhex("b904"))
+
+    def test_decode_long_string_truncated_payload(self) -> None:
+        """Truncated payload in long string raises RLPDecodingError."""
+        # 0xb838 indicates 56 bytes, but insufficient data provided
+        with pytest.raises(RLPDecodingError, match=r"Data too short"):
+            decode(bytes.fromhex("b8380000"))  # Only 2 bytes of payload
+
+    def test_decode_short_list_truncated(self) -> None:
+        """Truncated short list raises RLPDecodingError."""
+        # 0xc3 indicates 3-byte payload, but only 2 bytes provided
+        with pytest.raises(RLPDecodingError, match=r"Data too short"):
+            decode(bytes.fromhex("c38080"))
+
+    def test_decode_long_list_truncated_length(self) -> None:
+        """Truncated length field in long list raises RLPDecodingError."""
+        # 0xf9 indicates 2-byte length, but only 1 byte provided
+        with pytest.raises(RLPDecodingError, match=r"Data too short"):
+            decode(bytes.fromhex("f904"))
+
+    def test_decode_non_canonical_long_string_for_short(self) -> None:
+        """Using long string encoding for a short string is non-canonical."""
+        # 0xb8 indicates a long string with a 1-byte length field. The length
+        # here is 0x37 (55), but 55 fits short string encoding, so a canonical
+        # encoder would have used prefix 0x80 + 55 = 0xb7 instead.
+        with pytest.raises(RLPDecodingError, match=r"Non-canonical.*long string"):
+            decode(bytes.fromhex("b837") + b"a" * 55)
+
+    def test_decode_non_canonical_long_list_for_short(self) -> None:
+        """Using long list encoding for a short list is non-canonical."""
+        # 0xf8 followed by length 0x37 (55) - should have used short encoding
+        with pytest.raises(RLPDecodingError, match=r"Non-canonical.*long list"):
+            decode(bytes.fromhex("f837") + bytes.fromhex("80") * 55)
+
+
+class TestDecodeListFunction:
+    """Tests for the decode_list convenience function."""
+
+    def test_decode_list_success(self) -> None:
+        """decode_list returns list of bytes for flat list."""
+        result = decode_list(bytes.fromhex("cc83646f6783676f6483636174"))
+        assert result == [b"dog", b"god", b"cat"]
+
+    def test_decode_list_not_a_list(self) -> None:
+        """decode_list raises error when data is not a list."""
+        with pytest.raises(RLPDecodingError, match=r"Expected RLP list"):
+            decode_list(bytes.fromhex("83646f67"))  # Encodes "dog", not a list
+
+    def test_decode_list_nested_list_rejected(self) -> None:
+        """decode_list raises error when list contains nested lists."""
+        with pytest.raises(RLPDecodingError, match=r"Element .* is not bytes"):
+            decode_list(bytes.fromhex("c4c2c0c0c0"))  # [[[], []], []]
+
+
+class TestEncodeDecodeRoundtrip:
+    """Tests for encode/decode roundtrip invariants."""
+
+    @pytest.mark.parametrize(
+        "item",
+        [
+            b"",
+            b"\x00",
+            b"\x7f",
+            b"\x80",
+            b"dog",
+            b"a" * SHORT_STRING_MAX_LEN,
+            b"a" * (SHORT_STRING_MAX_LEN + 1),
+            b"a" * 256,
+            [],
+            [b""],
+            [b"a", b"b", b"c"],
+            [[b"nested"]],
+            [[], [[]], [[], [[]]]],
+            [b"mixed", [b"nested", b"list"], b"end"],
+        ],
+    )
+    def test_roundtrip(self, item: RLPItem) -> None:
+        """Encoding then decoding returns the original item."""
+        encoded = encode(item)
+        decoded = decode(encoded)
+        assert decoded == item
+
+    def test_roundtrip_large_nested_structure(self) -> None:
+        """Complex nested structure survives roundtrip."""
+        inner = [b"asdf", b"qwer", b"zxcv"]
+        structure: RLPItem = [
+            inner,
+            [inner, inner],
+            [[inner], [inner, inner]],
+        ]
+        encoded = encode(structure)
+        decoded = decode(encoded)
+        assert decoded == structure
+
+
+class TestOfficialEthereumVectors:
+    """Tests using official Ethereum RLP test vectors."""
+
+    def test_emptystring(self) -> None:
+        """Official test vector: emptystring."""
+        assert encode(b"") == bytes.fromhex("80")
+        assert decode(bytes.fromhex("80")) == b""
+
+    def test_bytestring00(self) -> None:
+        """Official test vector: bytestring00."""
+        assert encode(b"\x00") == bytes.fromhex("00")
+        assert decode(bytes.fromhex("00")) == b"\x00"
+
+    def test_bytestring01(self) -> None:
+        """Official test vector: bytestring01."""
+        assert encode(b"\x01") == bytes.fromhex("01")
+        assert decode(bytes.fromhex("01")) == b"\x01"
+
+    def test_bytestring7f(self) -> None:
+        """Official test vector: bytestring7F."""
+        assert encode(b"\x7f") == bytes.fromhex("7f")
+        assert decode(bytes.fromhex("7f")) == b"\x7f"
+
+    def test_shortstring(self) -> None:
+        """Official test vector: shortstring."""
+        assert encode(b"dog") == bytes.fromhex("83646f67")
+        assert decode(bytes.fromhex("83646f67")) == b"dog"
+
+    def test_shortstring2(self) -> None:
+        """Official test vector: shortstring2 (55 bytes - max short string)."""
+        data = b"Lorem ipsum dolor sit amet, consectetur adipisicing eli"
+        expected = bytes.fromhex(
+            "b74c6f72656d20697073756d20646f6c6f722073697420616d65742c20"
+            "636f6e7365637465747572206164697069736963696e6720656c69"
+        )
+        assert encode(data) == expected
+        assert decode(expected) == data
+
+    def test_longstring(self) -> None:
+        """Official test vector: longstring (56 bytes - min long string)."""
+        data = b"Lorem ipsum dolor sit amet, consectetur adipisicing elit"
+        expected = bytes.fromhex(
+            "b8384c6f72656d20697073756d20646f6c6f722073697420616d65742c20"
+            "636f6e7365637465747572206164697069736963696e6720656c6974"
+        )
+        assert encode(data) == expected
+        assert decode(expected) == data
+
+    def test_emptylist(self) -> None:
+        """Official test vector: emptylist."""
+        assert encode([]) == bytes.fromhex("c0")
+        assert decode(bytes.fromhex("c0")) == []
+
+    def test_stringlist(self) -> None:
+        """Official test vector: stringlist."""
+        data: RLPItem = [b"dog", b"god", b"cat"]
+        expected = bytes.fromhex("cc83646f6783676f6483636174")
+        assert encode(data) == expected
+        assert decode(expected) == data
+
+    def test_multilist(self) -> None:
+        """Official test vector: multilist."""
+        # "zw" = 0x7a77, [4] = 0x04, 1 = 0x01
+        data: RLPItem = [b"zw", [b"\x04"], b"\x01"]
+        expected = bytes.fromhex("c6827a77c10401")
+        assert encode(data) == expected
+        assert decode(expected) == data
+
+    def test_listsoflists(self) -> None:
+        """Official test vector: listsoflists."""
+        data: RLPItem = [[[], []], []]
+        expected = bytes.fromhex("c4c2c0c0c0")
+        assert encode(data) == expected
+        assert decode(expected) == data
+
+    def test_listsoflists2(self) -> None:
+        """Official test vector: listsoflists2."""
+        data: RLPItem = [[], [[]], [[], [[]]]]
+        expected = bytes.fromhex("c7c0c1c0c3c0c1c0")
+        assert encode(data) == expected
+        assert decode(expected) == data
+
+    def test_dicttest1(self) -> None:
+        """Official test vector: dictTest1 (list of key-value pairs)."""
+        data: RLPItem = [
+            [b"key1", b"val1"],
+            [b"key2", b"val2"],
+            [b"key3", b"val3"],
+            [b"key4", b"val4"],
+        ]
+        expected = bytes.fromhex(
+            "ecca846b6579318476616c31ca846b6579328476616c32"
"ca846b6579338476616c33ca846b6579348476616c34" + ) + assert encode(data) == expected + assert decode(expected) == data + + def test_longlist1(self) -> None: + """Official test vector: longList1.""" + inner: RLPItem = [b"asdf", b"qwer", b"zxcv"] + data: RLPItem = [inner, inner, inner, inner] + expected = bytes.fromhex( + "f840cf84617364668471776572847a786376cf84617364668471776572847a786376" + "cf84617364668471776572847a786376cf84617364668471776572847a786376" + ) + assert encode(data) == expected + assert decode(expected) == data + + +class TestBoundaryConditions: + """Tests for boundary conditions based on module constants.""" + + def test_single_byte_max_boundary(self) -> None: + """Verify SINGLE_BYTE_MAX boundary (0x7f vs 0x80).""" + # 0x7f = single byte encoding + assert encode(bytes([SINGLE_BYTE_MAX])) == bytes([SINGLE_BYTE_MAX]) + # 0x80 = short string encoding + assert encode(bytes([SINGLE_BYTE_MAX + 1])) == bytes([0x81, 0x80]) + + def test_short_string_max_boundary(self) -> None: + """Verify SHORT_STRING_MAX_LEN boundary (55 vs 56 bytes).""" + # 55 bytes = short string encoding (prefix 0xb7) + data_55 = b"a" * SHORT_STRING_MAX_LEN + encoded_55 = encode(data_55) + assert encoded_55[0] == SHORT_STRING_PREFIX + SHORT_STRING_MAX_LEN # 0xb7 + + # 56 bytes = long string encoding (prefix 0xb8) + data_56 = b"a" * (SHORT_STRING_MAX_LEN + 1) + encoded_56 = encode(data_56) + assert encoded_56[0] == LONG_STRING_PREFIX # 0xb8 + + def test_short_list_max_boundary(self) -> None: + """Verify SHORT_LIST_MAX_LEN boundary (55 vs 56 bytes payload).""" + # 55 bytes payload = short list encoding (prefix 0xf7) + items_55: list[RLPItem] = [b"a" for _ in range(SHORT_LIST_MAX_LEN)] + encoded_55 = encode(items_55) + assert encoded_55[0] == SHORT_LIST_PREFIX + SHORT_LIST_MAX_LEN # 0xf7 + + # 56 bytes payload = long list encoding (prefix 0xf8) + items_56: list[RLPItem] = [b"a" for _ in range(SHORT_LIST_MAX_LEN + 1)] + encoded_56 = encode(items_56) + assert encoded_56[0] == LONG_LIST_PREFIX # 0xf8 + + def test_prefix_boundaries(self) -> None: + """Verify prefix range boundaries from RLP spec.""" + # Verify constants match RLP specification + assert SHORT_STRING_PREFIX == 0x80 + assert LONG_STRING_PREFIX == 0xB8 + assert SHORT_LIST_PREFIX == 0xC0 + assert LONG_LIST_PREFIX == 0xF8 + + # Short string prefix range: 0x80-0xb7 (length 0-55) + assert SHORT_STRING_PREFIX + SHORT_STRING_MAX_LEN == 0xB7 + + # Short list prefix range: 0xc0-0xf7 (length 0-55) + assert SHORT_LIST_PREFIX + SHORT_LIST_MAX_LEN == 0xF7