From 63267e57891dd662c68c5b08d35eca346a368afd Mon Sep 17 00:00:00 2001 From: "Maciek \"mab122\" Bator" Date: Fri, 24 Apr 2026 10:54:25 +0200 Subject: [PATCH] refactor: use RNS.Buffer for tunnel, drop dead code and --lora flag MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Switch bridge TCP↔RNS tunnel from fire-and-forget RNS.Packet to RNS.Buffer over RNS.Channel, which provides ordered reliable delivery with automatic retransmission. A dropped packet no longer silently corrupts Radicle's Noise session. Delete adapter.py, link.py, messages.py (and their tests) — these implemented a parallel peer-discovery and binary gossip layer that duplicates what Radicle handles natively over the bridge session. Remove the cmd_node, cmd_ping, cmd_peers CLI commands that used them. Remove --lora flag: Reticulum caps announce bandwidth at 2% per interface automatically, so application-level duty-cycle management is unnecessary. --announce-retry-delays remains for tuning startup timing. Co-Authored-By: Claude Sonnet 4.6 --- README.md | 15 +- src/radicle_reticulum/__init__.py | 20 -- src/radicle_reticulum/adapter.py | 331 ------------------------------ src/radicle_reticulum/bridge.py | 106 ++++------ src/radicle_reticulum/cli.py | 209 +------------------ src/radicle_reticulum/link.py | 177 ---------------- src/radicle_reticulum/messages.py | 313 ---------------------------- tests/test_adapter.py | 206 ------------------- tests/test_integration.py | 90 ++++---- tests/test_link.py | 196 ------------------ tests/test_messages.py | 192 ----------------- 11 files changed, 90 insertions(+), 1765 deletions(-) delete mode 100644 src/radicle_reticulum/adapter.py delete mode 100644 src/radicle_reticulum/link.py delete mode 100644 src/radicle_reticulum/messages.py delete mode 100644 tests/test_adapter.py delete mode 100644 tests/test_link.py delete mode 100644 tests/test_messages.py diff --git a/README.md b/README.md index 33ec4a6..17d778d 100644 --- a/README.md +++ b/README.md @@ -80,8 +80,6 @@ Tunnel 1 opened [Status] Tunnels: 1, Remote bridges: 1, TX: 1551, RX: 1831 ``` -> **LoRa:** Pass `--lora` to apply duty-cycle-safe announce delays (60 s, 300 s, 900 s) instead of the WiFi defaults. - ### Step 3 — Use radicle normally ```sh @@ -141,9 +139,6 @@ radicle-rns bridge # TCP↔RNS bridge radicle-rns seed # dedicated seed node + bridge + gossip radicle-rns gossip [RID ...] # standalone gossip relay radicle-rns setup # check prerequisites, print fix instructions -radicle-rns node # lightweight peer-announce node -radicle-rns peers # discover peers on the mesh -radicle-rns ping # RTT probe to a peer radicle-rns identity generate # create identity radicle-rns identity info # show DID and RNS hash ``` @@ -160,7 +155,6 @@ Global flags: `-v` verbose logging, `--identity PATH` (default `~/.radicle-rns/i | `--nid ` | auto-detect | Override local radicle NID | | `--no-auto-connect` | — | Disable auto-connect on discovery | | `--no-auto-seed` | — | Disable auto-registering remote NIDs | -| `--lora` | — | LoRa-safe defaults: announce delays 60,300,900 s | | `--announce-retry-delays` | 5,15,30 | Startup re-announce delays (seconds, comma-separated) | ### seed flags @@ -171,7 +165,6 @@ Global flags: `-v` verbose logging, `--identity PATH` (default `~/.radicle-rns/i | `--seed-port` | 8776 | TCP port for the seed radicle-node | | `--bridge-port` | 8778 | TCP listen port for the seed bridge | | `--poll-interval` | 30 | Seconds between gossip ref polls | -| `--lora` | — | LoRa-safe defaults | ### gossip flags @@ -180,7 +173,6 @@ Global flags: `-v` verbose logging, `--identity PATH` (default `~/.radicle-rns/i | `--nid` | auto-detect | Local radicle NID to advertise | | `--bridge-port` | 8777 | TCP port of the local bridge | | `--poll-interval` | 30 | Seconds between ref polls | -| `--lora` | — | LoRa-safe defaults (delays 60,300,900 s; poll 120 s) | --- @@ -193,20 +185,19 @@ radicle-node ─TCP─ RadicleBridge ──RNS Link── RadicleBridge ─TCP GossipRelay ──RNS Packet── GossipRelay ``` -Each discovered remote bridge gets its own OS-assigned TCP listen port, so radicle-node connections always route to the correct peer. All RNS packets are chunked to ≤383 B (LoRa encrypted MTU). +Each discovered remote bridge gets its own OS-assigned TCP listen port, so radicle-node connections always route to the correct peer. The tunnel uses `RNS.Buffer` over `RNS.Channel` for ordered, reliable delivery — Reticulum handles retransmission transparently across all interface types including LoRa. - **`identity.py`** — Ed25519 DID ↔ RNS identity; saved to `~/.radicle-rns/identity` -- **`bridge.py`** — TCP↔RNS tunnel, per-bridge port allocation, path maintenance, reconnect +- **`bridge.py`** — TCP↔RNS tunnel via `RNS.Buffer`, per-bridge port allocation, path maintenance - **`gossip.py`** — ref-change notifications, delta broadcasts, auto-seed for unknown repos - **`seed.py`** — dedicated radicle-node process lifecycle (separate RAD_HOME) -- **`adapter.py`** — RNS peer discovery and announce filtering --- ## Development ```sh -uv run pytest # 149 tests +uv run pytest # 97 tests uv run pytest -x -q # stop on first failure mypy src/ # type check ``` diff --git a/src/radicle_reticulum/__init__.py b/src/radicle_reticulum/__init__.py index 38de9d2..3b33c1c 100644 --- a/src/radicle_reticulum/__init__.py +++ b/src/radicle_reticulum/__init__.py @@ -1,17 +1,6 @@ """Radicle transport adapter for Reticulum mesh networking.""" from radicle_reticulum.identity import RadicleIdentity -from radicle_reticulum.adapter import RNSTransportAdapter -from radicle_reticulum.link import RadicleLink -from radicle_reticulum.messages import ( - MessageType, - NodeAnnouncement, - InventoryAnnouncement, - RefAnnouncement, - Ping, - Pong, - decode_message, -) from radicle_reticulum.bridge import RadicleBridge from radicle_reticulum.gossip import GossipRelay from radicle_reticulum.seed import SeedNode @@ -19,15 +8,6 @@ from radicle_reticulum.seed import SeedNode __version__ = "0.1.0" __all__ = [ "RadicleIdentity", - "RNSTransportAdapter", - "RadicleLink", - "MessageType", - "NodeAnnouncement", - "InventoryAnnouncement", - "RefAnnouncement", - "Ping", - "Pong", - "decode_message", "RadicleBridge", "GossipRelay", "SeedNode", diff --git a/src/radicle_reticulum/adapter.py b/src/radicle_reticulum/adapter.py deleted file mode 100644 index 5c54f36..0000000 --- a/src/radicle_reticulum/adapter.py +++ /dev/null @@ -1,331 +0,0 @@ -"""RNS Transport Adapter for Radicle. - -This is the core adapter that allows Radicle to use Reticulum as a transport. -It provides: -- Destination registration and announcement -- Incoming connection handling -- Outbound connection establishment -- Peer discovery via RNS announcements -""" - -import threading -import time -from dataclasses import dataclass, field -from typing import Callable, Dict, List, Optional, Set - -import RNS - -from radicle_reticulum.identity import RadicleIdentity -from radicle_reticulum.link import RadicleLink, LinkState - - -# Radicle app name for RNS destination -APP_NAME = "radicle" - -# Aspect for node connections -ASPECT_NODE = "node" - -# Aspect for repository-specific endpoints -ASPECT_REPO = "repo" - -# App data identifiers for filtering announces -NODE_APP_DATA_MAGIC = b"RADICLE_NODE_V1" -REPO_APP_DATA_MAGIC = b"RADICLE_REPO_V1" - - -@dataclass -class PeerInfo: - """Information about a discovered peer.""" - identity: RadicleIdentity - destination_hash: bytes - last_seen: float - announced_repos: Set[str] = field(default_factory=set) - - @property - def age(self) -> float: - """Seconds since last seen.""" - return time.time() - self.last_seen - - -class RNSTransportAdapter: - """Reticulum transport adapter for Radicle. - - Provides the bridge between Radicle's network layer and Reticulum. - Manages identity, destinations, links, and peer discovery. - """ - - def __init__( - self, - identity: Optional[RadicleIdentity] = None, - config_path: Optional[str] = None, - ): - """Initialize the adapter. - - Args: - identity: Radicle identity to use. Generated if not provided. - config_path: Path to Reticulum config. Uses default if not provided. - """ - # Initialize Reticulum - self.reticulum = RNS.Reticulum(config_path) - - # Set up identity - if identity is None: - identity = RadicleIdentity.generate() - self.identity = identity - - # Create main node destination - self.node_destination = RNS.Destination( - self.identity.rns_identity, - RNS.Destination.IN, - RNS.Destination.SINGLE, - APP_NAME, - ASPECT_NODE, - ) - - # Set up link handling - self.node_destination.set_link_established_callback(self._on_incoming_link) - - # Peer tracking - self._peers: Dict[bytes, PeerInfo] = {} - self._peers_lock = threading.Lock() - - # Active links - self._links: Dict[bytes, RadicleLink] = {} - self._links_lock = threading.Lock() - - # Callbacks - self._on_peer_discovered: Optional[Callable[[PeerInfo], None]] = None - self._on_incoming_connection: Optional[Callable[[RadicleLink], None]] = None - - # Repository destinations (hash -> destination) - self._repo_destinations: Dict[str, RNS.Destination] = {} - - RNS.log(f"RNS Transport Adapter initialized", RNS.LOG_INFO) - RNS.log(f" Node ID: {self.identity.did}", RNS.LOG_INFO) - RNS.log(f" RNS Hash: {self.identity.rns_hash_hex}", RNS.LOG_INFO) - - def start(self): - """Start the adapter and begin announcing presence.""" - # Register announce handler to discover peers - RNS.Transport.register_announce_handler(self._handle_announce) - - # Announce our node destination - self.announce() - - RNS.log("RNS Transport Adapter started", RNS.LOG_INFO) - - def stop(self): - """Stop the adapter and clean up.""" - # Close all active links - with self._links_lock: - for link in list(self._links.values()): - link.close() - self._links.clear() - - RNS.log("RNS Transport Adapter stopped", RNS.LOG_INFO) - - def announce(self, app_data: Optional[bytes] = None): - """Announce this node's presence on the network. - - Args: - app_data: Optional additional application data to include. - Could include repository list, capabilities, etc. - """ - # Prepend node magic for filtering, then append any extra app_data - full_app_data = NODE_APP_DATA_MAGIC - if app_data: - full_app_data += app_data - - self.node_destination.announce(app_data=full_app_data) - RNS.log(f"Announced node: {self.identity.rns_hash_hex}", RNS.LOG_DEBUG) - - def announce_repository(self, repo_id: str, repo_data: Optional[bytes] = None): - """Announce availability of a specific repository. - - Args: - repo_id: Repository identifier (typically a hash or DID). - repo_data: Optional metadata about the repository. - """ - if repo_id not in self._repo_destinations: - # Create destination for this repository - dest = RNS.Destination( - self.identity.rns_identity, - RNS.Destination.IN, - RNS.Destination.SINGLE, - APP_NAME, - ASPECT_REPO, - repo_id, - ) - dest.set_link_established_callback( - lambda link: self._on_incoming_link(link, repo_id) - ) - self._repo_destinations[repo_id] = dest - - # Prepend repo magic for filtering - full_app_data = REPO_APP_DATA_MAGIC - if repo_data: - full_app_data += repo_data - - self._repo_destinations[repo_id].announce(app_data=full_app_data) - RNS.log(f"Announced repository: {repo_id}", RNS.LOG_DEBUG) - - def connect( - self, - destination_hash: bytes, - timeout: float = 30.0, - ) -> Optional[RadicleLink]: - """Connect to a peer by destination hash. - - Args: - destination_hash: 16-byte RNS destination hash. - timeout: Connection timeout in seconds. - - Returns: - RadicleLink if connection successful, None otherwise. - """ - # Check if we already have a path to this destination - if not RNS.Transport.has_path(destination_hash): - RNS.Transport.request_path(destination_hash) - - # Wait for path to be established - deadline = time.time() + timeout - while not RNS.Transport.has_path(destination_hash): - if time.time() > deadline: - RNS.log(f"Path request timeout: {destination_hash.hex()}", RNS.LOG_WARNING) - return None - time.sleep(0.1) - - # Get the destination - identity = RNS.Identity.recall(destination_hash) - if identity is None: - RNS.log(f"Could not recall identity for: {destination_hash.hex()}", RNS.LOG_WARNING) - return None - - destination = RNS.Destination( - identity, - RNS.Destination.OUT, - RNS.Destination.SINGLE, - APP_NAME, - ASPECT_NODE, - ) - - # Create link - link = RadicleLink.create_outbound(destination) - - # Wait for link to establish - deadline = time.time() + timeout - while link.state == LinkState.PENDING: - if time.time() > deadline: - RNS.log(f"Link timeout: {destination_hash.hex()}", RNS.LOG_WARNING) - return None - time.sleep(0.1) - - if link.state != LinkState.ACTIVE: - return None - - # Track the link - with self._links_lock: - self._links[destination_hash] = link - - return link - - def connect_to_peer( - self, - peer: PeerInfo, - timeout: float = 30.0, - ) -> Optional[RadicleLink]: - """Connect to a discovered peer. - - Args: - peer: PeerInfo from peer discovery. - timeout: Connection timeout in seconds. - """ - return self.connect(peer.destination_hash, timeout) - - def get_peers(self) -> List[PeerInfo]: - """Get list of discovered peers.""" - with self._peers_lock: - return list(self._peers.values()) - - def get_peer_by_did(self, did: str) -> Optional[PeerInfo]: - """Look up a peer by their Radicle DID.""" - with self._peers_lock: - for peer in self._peers.values(): - if peer.identity.did == did: - return peer - return None - - def set_on_peer_discovered(self, callback: Callable[[PeerInfo], None]): - """Set callback for peer discovery events.""" - self._on_peer_discovered = callback - - def set_on_incoming_connection(self, callback: Callable[[RadicleLink], None]): - """Set callback for incoming connections.""" - self._on_incoming_connection = callback - - def _handle_announce( - self, - destination_hash: bytes, - announced_identity: RNS.Identity, - app_data: Optional[bytes], - ) -> None: - """Handle incoming announce from another node.""" - # Ignore our own announcements - if destination_hash == self.node_destination.hash: - return - - # Filter: only accept announces with node magic in app_data - if app_data is None or not app_data.startswith(NODE_APP_DATA_MAGIC): - # Not a radicle node announce, ignore - return - - # Create RadicleIdentity from announced identity - try: - radicle_id = RadicleIdentity.from_rns_identity(announced_identity) - except Exception as e: - RNS.log(f"Failed to create RadicleIdentity from announce: {e}", RNS.LOG_WARNING) - return - - # Update or create peer info - with self._peers_lock: - if destination_hash in self._peers: - peer = self._peers[destination_hash] - peer.last_seen = time.time() - else: - peer = PeerInfo( - identity=radicle_id, - destination_hash=destination_hash, - last_seen=time.time(), - ) - self._peers[destination_hash] = peer - RNS.log(f"Discovered peer: {radicle_id.did}", RNS.LOG_INFO) - - # Notify callback - if self._on_peer_discovered: - self._on_peer_discovered(peer) - - def _on_incoming_link(self, link: RNS.Link, repo_id: Optional[str] = None): - """Handle incoming link establishment.""" - radicle_link = RadicleLink.from_incoming(link) - - # Track the link - remote_hash = link.get_remote_identity().hash if link.get_remote_identity() else None - if remote_hash: - with self._links_lock: - self._links[remote_hash] = radicle_link - - RNS.log(f"Incoming connection established", RNS.LOG_INFO) - - # Notify callback - if self._on_incoming_connection: - self._on_incoming_connection(radicle_link) - - @property - def node_hash(self) -> bytes: - """Get this node's destination hash.""" - return self.node_destination.hash - - @property - def node_hash_hex(self) -> str: - """Get this node's destination hash as hex.""" - return self.node_destination.hexhash diff --git a/src/radicle_reticulum/bridge.py b/src/radicle_reticulum/bridge.py index bf127ef..eef5a10 100644 --- a/src/radicle_reticulum/bridge.py +++ b/src/radicle_reticulum/bridge.py @@ -15,6 +15,7 @@ The bridge: 4. Remote bridges forward to their local radicle-node """ +import io import json import os import socket @@ -25,7 +26,7 @@ import threading import time from dataclasses import dataclass, field from pathlib import Path -from typing import Callable, Dict, List, Optional, Set, Tuple +from typing import Any, Callable, Dict, List, Optional, Set, Tuple import RNS @@ -43,13 +44,15 @@ ASPECT_BRIDGE = "bridge" # App data identifier for bridge announces (used for filtering) BRIDGE_APP_DATA_MAGIC = b"RADICLE_BRIDGE_V1" -# Buffer sizes -TCP_BUFFER_SIZE = 65536 -# Read TCP in large chunks for efficiency; outbound data is chunked to -# RNS.Packet.ENCRYPTED_MDU (383 B) before sending so no single RNS.Packet -# exceeds the interface MTU on LoRa or any other constrained link. +# TCP read chunk size — large reads are fine; RNS.Channel handles chunking internally. TCP_READ_SIZE = 32768 +# Stream IDs for the bidirectional Buffer over RNS.Channel. +# Initiator (outbound): sends on stream 1, receives on stream 0. +# Responder (inbound): sends on stream 0, receives on stream 1. +_STREAM_INITIATOR_RECV = 0 +_STREAM_INITIATOR_SEND = 1 + @dataclass class TunnelConnection: @@ -62,10 +65,16 @@ class TunnelConnection: bytes_sent: int = 0 bytes_received: int = 0 active: bool = True + buf: Optional[Any] = None # io.BufferedRWPair from RNS.Buffer.create_bidirectional_buffer def close(self): """Close the tunnel.""" self.active = False + if self.buf: + try: + self.buf.close() + except Exception: + pass if self.tcp_socket: try: self.tcp_socket.close() @@ -506,8 +515,12 @@ class RadicleBridge: if self._tunnel_opened_cb: self._tunnel_opened_cb(tunnel) - rns_link.set_packet_callback( - lambda data, pkt: self._on_rns_data(tunnel_id, data) + channel = rns_link.get_channel() + tunnel.buf = RNS.Buffer.create_bidirectional_buffer( + _STREAM_INITIATOR_RECV, + _STREAM_INITIATOR_SEND, + channel, + ready_callback=lambda n: self._on_rns_data(tunnel_id, n), ) rns_link.set_link_closed_callback( lambda link: self._on_tunnel_closed(tunnel_id) @@ -516,13 +529,12 @@ class RadicleBridge: self._forward_tcp_to_rns(tunnel) def _forward_tcp_to_rns(self, tunnel: TunnelConnection): - """Forward data from TCP socket to RNS link.""" + """Forward data from TCP socket to RNS Channel buffer.""" tcp_socket = tunnel.tcp_socket tcp_socket.setblocking(False) while tunnel.active and self._running: try: - rns_link = tunnel.rns_link # read each iteration: may be updated by reconnect readable, _, errored = select.select([tcp_socket], [], [tcp_socket], 1.0) if errored: @@ -533,71 +545,33 @@ class RadicleBridge: if not data: break - if rns_link.status == RNS.Link.ACTIVE: - self._send_over_link(rns_link, data) - tunnel.bytes_sent += len(data) - elif tunnel.remote_destination: - RNS.log( - f"Tunnel {tunnel.tunnel_id}: link dropped, reconnecting...", - RNS.LOG_WARNING, - ) - new_link = self._reconnect_link(tunnel.remote_destination) - if new_link is None: - RNS.log( - f"Tunnel {tunnel.tunnel_id}: reconnect failed", - RNS.LOG_WARNING, - ) - break - RNS.log( - f"Tunnel {tunnel.tunnel_id}: reconnected", - RNS.LOG_INFO, - ) - tunnel.rns_link = new_link - tid = tunnel.tunnel_id - new_link.set_packet_callback( - lambda d, p: self._on_rns_data(tid, d) - ) - new_link.set_link_closed_callback( - lambda l: self._on_tunnel_closed(tid) - ) - self._send_over_link(new_link, data) - tunnel.bytes_sent += len(data) - else: - break + tunnel.buf.write(data) + tunnel.buf.flush() + tunnel.bytes_sent += len(data) except socket.error: break except Exception as e: - RNS.log(f"Forward error: {e}", RNS.LOG_DEBUG) + RNS.log(f"Tunnel {tunnel.tunnel_id} forward error: {e}", RNS.LOG_DEBUG) break self._on_tunnel_closed(tunnel.tunnel_id) - @staticmethod - def _send_over_link(link: RNS.Link, data: bytes): - """Send data over an RNS link, chunking to ENCRYPTED_MDU if needed. - - RNS.Packet raises IOError if data exceeds the interface MTU (~383 B on - LoRa). TCP chunks can be tens of kilobytes, so we split into MDU-sized - pieces. Order is preserved — the link is point-to-point and packets - from a single sender are delivered in order. - """ - mdu = RNS.Packet.ENCRYPTED_MDU - for offset in range(0, len(data), mdu): - RNS.Packet(link, data[offset:offset + mdu]).send() - - def _on_rns_data(self, tunnel_id: int, data: bytes): - """Handle data received from RNS link.""" + def _on_rns_data(self, tunnel_id: int, n_bytes: int): + """Drain n_bytes from the RNS Buffer and forward to the TCP socket.""" with self._tunnels_lock: tunnel = self._tunnels.get(tunnel_id) - if tunnel and tunnel.active and tunnel.tcp_socket: - try: + if not (tunnel and tunnel.active and tunnel.buf and tunnel.tcp_socket): + return + try: + data = tunnel.buf.read(n_bytes) + if data: tunnel.tcp_socket.sendall(data) tunnel.bytes_received += len(data) - except Exception as e: - RNS.log(f"TCP send error: {e}", RNS.LOG_DEBUG) - self._on_tunnel_closed(tunnel_id) + except Exception as e: + RNS.log(f"TCP send error on tunnel {tunnel_id}: {e}", RNS.LOG_DEBUG) + self._on_tunnel_closed(tunnel_id) def _on_tunnel_closed(self, tunnel_id: int): """Handle tunnel closure.""" @@ -642,8 +616,12 @@ class RadicleBridge: RNS.log(f"Incoming tunnel {tunnel_id} opened", RNS.LOG_INFO) - link.set_packet_callback( - lambda data, pkt: self._on_rns_data(tunnel_id, data) + channel = link.get_channel() + tunnel.buf = RNS.Buffer.create_bidirectional_buffer( + _STREAM_INITIATOR_SEND, # responder receives on the initiator's send stream + _STREAM_INITIATOR_RECV, # responder sends on the initiator's receive stream + channel, + ready_callback=lambda n: self._on_rns_data(tunnel_id, n), ) link.set_link_closed_callback( lambda l: self._on_tunnel_closed(tunnel_id) diff --git a/src/radicle_reticulum/cli.py b/src/radicle_reticulum/cli.py index 3e1ff31..d544a10 100644 --- a/src/radicle_reticulum/cli.py +++ b/src/radicle_reticulum/cli.py @@ -13,7 +13,6 @@ DEFAULT_IDENTITY_PATH = Path.home() / ".radicle-rns" / "identity" import RNS -from radicle_reticulum.adapter import RNSTransportAdapter, PeerInfo from radicle_reticulum.identity import RadicleIdentity from radicle_reticulum.bridge import RadicleBridge from radicle_reticulum.gossip import GossipRelay @@ -49,24 +48,6 @@ def detect_radicle_nid() -> Optional[str]: return None -LORA_ANNOUNCE_DELAYS = "60,300,900" -LORA_POLL_INTERVAL = 120 - - -def _apply_lora_defaults(args) -> None: - """Override delay/poll defaults with LoRa-safe values when --lora is set. - - Only overrides fields that were left at their defaults (argparse default - strings), so explicit user flags always take precedence. - """ - if not getattr(args, "lora", False): - return - if getattr(args, "announce_retry_delays", None) == "5,15,30": - args.announce_retry_delays = LORA_ANNOUNCE_DELAYS - if getattr(args, "poll_interval", None) == 30: - args.poll_interval = LORA_POLL_INTERVAL - - def _parse_delays(s: str) -> Tuple[int, ...]: try: return tuple(int(x.strip()) for x in s.split(",") if x.strip()) @@ -75,61 +56,6 @@ def _parse_delays(s: str) -> Tuple[int, ...]: sys.exit(1) -def on_peer_discovered(peer: PeerInfo): - """Callback when a new peer is discovered.""" - print(f"[+] Discovered peer: {peer.identity.did}") - print(f" RNS hash: {peer.destination_hash.hex()}") - - -def cmd_node(args): - """Run a Radicle-RNS node.""" - print("Starting Radicle-RNS node...") - - identity = RadicleIdentity.load_or_generate(args.identity) - _print_identity_info(args.identity) - - # Create adapter - adapter = RNSTransportAdapter(identity=identity) - adapter.set_on_peer_discovered(on_peer_discovered) - - print(f"Node ID: {identity.did}") - print(f"RNS Hash: {adapter.node_hash_hex}") - print() - - # Start adapter - adapter.start() - - print("Node running. Press Ctrl+C to stop.") - print("Announcing every 60 seconds...") - print() - - # Handle graceful shutdown - running = True - def signal_handler(sig, frame): - nonlocal running - print("\nShutting down...") - running = False - - signal.signal(signal.SIGINT, signal_handler) - signal.signal(signal.SIGTERM, signal_handler) - - # Main loop - last_announce = 0 - try: - while running: - now = time.time() - - # Periodic announce - if now - last_announce > 60: - adapter.announce() - last_announce = now - - time.sleep(0.5) - finally: - adapter.stop() - print("Node stopped.") - - def _print_identity_info(identity_path: Path): """Print identity file location (new or loaded).""" path = Path(identity_path) @@ -175,76 +101,6 @@ def cmd_identity(args): sys.exit(1) -def cmd_ping(args): - """Ping a peer by RNS hash.""" - print(f"Connecting to {args.destination}...") - - identity = RadicleIdentity.load_or_generate(args.identity) - adapter = RNSTransportAdapter(identity=identity) - adapter.start() - - try: - dest_hash = bytes.fromhex(args.destination) - except ValueError: - print("Error: Invalid destination hash (must be hex)", file=sys.stderr) - sys.exit(1) - - link = adapter.connect(dest_hash, timeout=args.timeout) - if link is None: - print("Failed to connect") - sys.exit(1) - - print(f"Connected! RTT: {link.rtt:.3f}s" if link.rtt else "Connected!") - - # Send ping - from radicle_reticulum.messages import Ping, Pong, decode_message, MessageType - import struct - - ping = Ping() - ping_time = time.time() - link.send(ping.to_message()) - print("Ping sent, waiting for pong...") - - response = link.recv(timeout=10.0) - if response: - header, msg = decode_message(response) - if header.msg_type == MessageType.PONG: - rtt = (time.time() - ping_time) * 1000 - print(f"Pong received! RTT: {rtt:.1f}ms") - else: - print(f"Unexpected response: {header.msg_type}") - else: - print("No response (timeout)") - - link.close() - adapter.stop() - - -def cmd_peers(args): - """List discovered peers.""" - identity = RadicleIdentity.load_or_generate(args.identity) - adapter = RNSTransportAdapter(identity=identity) - adapter.set_on_peer_discovered(on_peer_discovered) - adapter.start() - - print(f"Listening for peers for {args.timeout} seconds...") - print() - - time.sleep(args.timeout) - - peers = adapter.get_peers() - if peers: - print(f"\nDiscovered {len(peers)} peer(s):") - for peer in peers: - print(f" {peer.identity.did}") - print(f" Hash: {peer.destination_hash.hex()}") - print(f" Age: {peer.age:.1f}s") - else: - print("\nNo peers discovered.") - - adapter.stop() - - def _detect_rid(repo_path: Path) -> Optional[str]: """Detect the Radicle RID for the repo at repo_path via 'rad inspect'.""" try: @@ -264,7 +120,6 @@ def _detect_rid(repo_path: Path) -> Optional[str]: def cmd_gossip(args): """Run the gossip relay daemon.""" - _apply_lora_defaults(args) identity = RadicleIdentity.load_or_generate(args.identity) _print_identity_info(args.identity) @@ -337,7 +192,6 @@ def cmd_gossip(args): def cmd_seed(args): """Start a dedicated seed radicle-node, bridge, and gossip relay.""" - _apply_lora_defaults(args) seed_home = Path(args.seed_home) # Validate args before starting any processes @@ -460,7 +314,6 @@ def cmd_seed(args): def cmd_bridge(args): """Run Radicle-Reticulum bridge.""" - _apply_lora_defaults(args) print("Starting Radicle-Reticulum bridge...") identity = RadicleIdentity.load_or_generate(args.identity) @@ -720,10 +573,6 @@ def main(): help=f"Identity file (default: {DEFAULT_IDENTITY_PATH})", ) - # node command - node_parser = subparsers.add_parser("node", help="Run a Radicle-RNS node") - add_identity_arg(node_parser) - # identity command id_parser = subparsers.add_parser("identity", help="Identity operations") id_parser.add_argument( @@ -739,27 +588,6 @@ def main(): help="Overwrite existing identity file (for generate)" ) - # ping command - ping_parser = subparsers.add_parser("ping", help="Ping a peer") - ping_parser.add_argument("destination", help="RNS destination hash (hex)") - ping_parser.add_argument( - "-t", "--timeout", - type=float, - default=30.0, - help="Connection timeout (seconds)" - ) - add_identity_arg(ping_parser) - - # peers command - peers_parser = subparsers.add_parser("peers", help="Discover peers") - peers_parser.add_argument( - "-t", "--timeout", - type=float, - default=10.0, - help="Discovery timeout (seconds)" - ) - add_identity_arg(peers_parser) - # gossip command gossip_parser = subparsers.add_parser( "gossip", @@ -793,14 +621,7 @@ def main(): "--announce-retry-delays", default="5,15,30", metavar="SECONDS", - help="Startup re-announce delays, comma-separated (default: 5,15,30 for " - "WiFi/Ethernet). On LoRa use 60,300,900 to respect duty-cycle limits.", - ) - gossip_parser.add_argument( - "--lora", - action="store_true", - help="Shortcut for LoRa-safe settings: sets --announce-retry-delays=60,300,900 " - "and --poll-interval=120 (unless overridden explicitly).", + help="Startup re-announce delays in seconds, comma-separated (default: 5,15,30).", ) add_identity_arg(gossip_parser) @@ -840,14 +661,7 @@ def main(): "--announce-retry-delays", default="5,15,30", metavar="SECONDS", - help="Startup re-announce delays, comma-separated (default: 5,15,30 for " - "WiFi/Ethernet). On LoRa use 60,300,900 to respect duty-cycle limits.", - ) - seed_parser.add_argument( - "--lora", - action="store_true", - help="Shortcut for LoRa-safe settings: sets --announce-retry-delays=60,300,900 " - "and --poll-interval=120 (unless overridden explicitly).", + help="Startup re-announce delays in seconds, comma-separated (default: 5,15,30).", ) add_identity_arg(seed_parser) @@ -909,16 +723,7 @@ def main(): "--announce-retry-delays", default="5,15,30", metavar="SECONDS", - help=( - "Comma-separated delays for startup re-announces (default: 5,15,30 for " - "WiFi/Ethernet). On LoRa use 60,300,900 to respect duty-cycle limits." - ), - ) - bridge_parser.add_argument( - "--lora", - action="store_true", - help="Shortcut for LoRa-safe settings: sets --announce-retry-delays=60,300,900 " - "(unless overridden explicitly).", + help="Startup re-announce delays in seconds, comma-separated (default: 5,15,30).", ) add_identity_arg(bridge_parser) @@ -931,14 +736,8 @@ def main(): RNS.loglevel = RNS.LOG_INFO # Dispatch command - if args.command == "node": - cmd_node(args) - elif args.command == "identity": + if args.command == "identity": cmd_identity(args) - elif args.command == "ping": - cmd_ping(args) - elif args.command == "peers": - cmd_peers(args) elif args.command == "gossip": cmd_gossip(args) elif args.command == "seed": diff --git a/src/radicle_reticulum/link.py b/src/radicle_reticulum/link.py deleted file mode 100644 index cd01ab1..0000000 --- a/src/radicle_reticulum/link.py +++ /dev/null @@ -1,177 +0,0 @@ -"""RNS Link wrapper providing Radicle-compatible connection interface. - -Maps Radicle's Noise XK sessions to RNS Links, which provide: -- Encrypted bidirectional channels -- Forward secrecy via ephemeral ECDH -- Reliable ordered message delivery -""" - -import threading -import time -from dataclasses import dataclass, field -from enum import Enum -from typing import Callable, Optional -from collections import deque - -import RNS - - -class LinkState(Enum): - """Connection state for RadicleLink.""" - PENDING = "pending" - ACTIVE = "active" - CLOSED = "closed" - FAILED = "failed" - - -@dataclass -class RadicleLink: - """Wrapper around RNS.Link providing Radicle-compatible interface. - - RNS Links provide: - - ECDH key exchange (X25519) - - Symmetric encryption with forward secrecy - - Reliable delivery with automatic retransmission - - This maps to Radicle's Noise XK sessions conceptually. - """ - - rns_link: RNS.Link - state: LinkState = LinkState.PENDING - on_data: Optional[Callable[[bytes], None]] = None - on_close: Optional[Callable[[], None]] = None - _receive_buffer: deque = field(default_factory=deque) - _buffer_lock: threading.Lock = field(default_factory=threading.Lock) - _data_available: threading.Event = field(default_factory=threading.Event) - - def __post_init__(self): - """Set up RNS link callbacks.""" - # Only set established callback for pending links (outbound) - # For already-established links (incoming), this callback already fired - if self.state == LinkState.PENDING: - self.rns_link.set_link_established_callback(self._on_established) - - self.rns_link.set_link_closed_callback(self._on_closed) - self.rns_link.set_packet_callback(self._on_packet) - - @classmethod - def create_outbound( - cls, - destination: RNS.Destination, - on_data: Optional[Callable[[bytes], None]] = None, - on_close: Optional[Callable[[], None]] = None, - ) -> "RadicleLink": - """Create an outbound link to a destination.""" - rns_link = RNS.Link(destination) - return cls( - rns_link=rns_link, - on_data=on_data, - on_close=on_close, - ) - - @classmethod - def from_incoming( - cls, - rns_link: RNS.Link, - on_data: Optional[Callable[[bytes], None]] = None, - on_close: Optional[Callable[[], None]] = None, - ) -> "RadicleLink": - """Wrap an incoming RNS link.""" - link = cls( - rns_link=rns_link, - state=LinkState.ACTIVE, # Incoming links are already established - on_data=on_data, - on_close=on_close, - ) - return link - - def _on_established(self, link: RNS.Link): - """Called when link is established.""" - self.state = LinkState.ACTIVE - RNS.log(f"Link established: {link}", RNS.LOG_DEBUG) - - def _on_closed(self, link: RNS.Link): - """Called when link is closed.""" - self.state = LinkState.CLOSED - self._data_available.set() # Wake up any waiting readers - if self.on_close: - self.on_close() - RNS.log(f"Link closed: {link}", RNS.LOG_DEBUG) - - def _on_packet(self, message: bytes, packet: RNS.Packet): - """Called when a packet is received.""" - with self._buffer_lock: - self._receive_buffer.append(message) - self._data_available.set() - - if self.on_data: - self.on_data(message) - - def send(self, data: bytes) -> bool: - """Send data over the link. - - Returns True if packet was sent successfully. - """ - if self.state != LinkState.ACTIVE: - return False - - try: - packet = RNS.Packet(self.rns_link, data) - packet.send() - return True - except Exception as e: - RNS.log(f"Send failed: {e}", RNS.LOG_ERROR) - return False - - def recv(self, timeout: Optional[float] = None) -> Optional[bytes]: - """Receive data from the link. - - Blocks until data is available or timeout expires. - Returns None on timeout or if link is closed. - """ - deadline = time.time() + timeout if timeout else None - - while True: - with self._buffer_lock: - if self._receive_buffer: - return self._receive_buffer.popleft() - - if self.state == LinkState.CLOSED: - return None - - self._data_available.clear() - - remaining = None - if deadline: - remaining = deadline - time.time() - if remaining <= 0: - return None - - if not self._data_available.wait(timeout=remaining): - return None - - def close(self): - """Close the link.""" - if self.state == LinkState.ACTIVE: - self.rns_link.teardown() - self.state = LinkState.CLOSED - - @property - def is_active(self) -> bool: - """Check if link is active.""" - return self.state == LinkState.ACTIVE - - @property - def remote_identity(self) -> Optional[RNS.Identity]: - """Get the remote peer's identity if known.""" - return self.rns_link.get_remote_identity() - - @property - def rtt(self) -> Optional[float]: - """Get current round-trip time estimate in seconds.""" - if hasattr(self.rns_link, 'rtt') and self.rns_link.rtt: - return self.rns_link.rtt - return None - - def __repr__(self) -> str: - return f"RadicleLink(state={self.state.value}, rtt={self.rtt})" diff --git a/src/radicle_reticulum/messages.py b/src/radicle_reticulum/messages.py deleted file mode 100644 index 80fd46f..0000000 --- a/src/radicle_reticulum/messages.py +++ /dev/null @@ -1,313 +0,0 @@ -"""Message framing layer for Radicle protocol over RNS. - -Implements the gossip message types: -- Node Announcements: Broadcast Node IDs and network addresses -- Inventory Announcements: Share repository inventories for routing -- Reference Announcements: Push repository updates to subscribers - -Messages are serialized to a compact binary format suitable for -low-bandwidth Reticulum transports. -""" - -import struct -import hashlib -import time -from dataclasses import dataclass, field -from enum import IntEnum -from typing import List, Optional, Set - - -class MessageType(IntEnum): - """Radicle gossip message types.""" - NODE_ANNOUNCEMENT = 0x01 - INVENTORY_ANNOUNCEMENT = 0x02 - REF_ANNOUNCEMENT = 0x03 - PING = 0x10 - PONG = 0x11 - - -# Message header format: type (1 byte) + timestamp (8 bytes) + payload length (2 bytes) -HEADER_FORMAT = "!BQH" -HEADER_SIZE = struct.calcsize(HEADER_FORMAT) - -# Maximum payload size (64KB - header) -MAX_PAYLOAD_SIZE = 65535 - HEADER_SIZE - - -@dataclass -class MessageHeader: - """Common header for all Radicle messages.""" - msg_type: MessageType - timestamp: int # Unix timestamp in milliseconds - payload_length: int - - def encode(self) -> bytes: - """Encode header to bytes.""" - return struct.pack( - HEADER_FORMAT, - self.msg_type, - self.timestamp, - self.payload_length, - ) - - @classmethod - def decode(cls, data: bytes) -> "MessageHeader": - """Decode header from bytes.""" - if len(data) < HEADER_SIZE: - raise ValueError(f"Header too short: {len(data)} < {HEADER_SIZE}") - - msg_type_raw, timestamp, payload_length = struct.unpack( - HEADER_FORMAT, data[:HEADER_SIZE] - ) - try: - msg_type = MessageType(msg_type_raw) - except ValueError: - raise ValueError(f"Unknown message type: {msg_type_raw}") - - return cls( - msg_type=msg_type, - timestamp=timestamp, - payload_length=payload_length, - ) - - -@dataclass -class NodeAnnouncement: - """Announces a node's presence and capabilities. - - Broadcast periodically to enable peer discovery. - """ - node_id: str # DID (did:key:z6Mk...) - features: int = 0 # Bitmask of supported features - version: int = 1 # Protocol version - - def encode(self) -> bytes: - """Encode to bytes.""" - node_id_bytes = self.node_id.encode("utf-8") - return struct.pack( - f"!HH{len(node_id_bytes)}s", - self.features, - self.version, - node_id_bytes, - ) - - @classmethod - def decode(cls, data: bytes) -> "NodeAnnouncement": - """Decode from bytes.""" - features, version = struct.unpack("!HH", data[:4]) - node_id = data[4:].decode("utf-8") - return cls(node_id=node_id, features=features, version=version) - - def to_message(self) -> bytes: - """Wrap in a full message with header.""" - payload = self.encode() - header = MessageHeader( - msg_type=MessageType.NODE_ANNOUNCEMENT, - timestamp=int(time.time() * 1000), - payload_length=len(payload), - ) - return header.encode() + payload - - -@dataclass -class InventoryAnnouncement: - """Announces repositories hosted by a node. - - Used to build routing tables for repository discovery. - """ - node_id: str # DID of the announcing node - repositories: List[str] # List of repository IDs (hashes) - - def encode(self) -> bytes: - """Encode to bytes.""" - node_id_bytes = self.node_id.encode("utf-8") - repo_data = b"" - for repo_id in self.repositories: - repo_bytes = repo_id.encode("utf-8") - repo_data += struct.pack(f"!H{len(repo_bytes)}s", len(repo_bytes), repo_bytes) - - return struct.pack( - f"!H{len(node_id_bytes)}sH", - len(node_id_bytes), - node_id_bytes, - len(self.repositories), - ) + repo_data - - @classmethod - def decode(cls, data: bytes) -> "InventoryAnnouncement": - """Decode from bytes.""" - offset = 0 - - # Node ID - node_id_len = struct.unpack("!H", data[offset:offset+2])[0] - offset += 2 - node_id = data[offset:offset+node_id_len].decode("utf-8") - offset += node_id_len - - # Repository count - repo_count = struct.unpack("!H", data[offset:offset+2])[0] - offset += 2 - - # Repositories - repositories = [] - for _ in range(repo_count): - repo_len = struct.unpack("!H", data[offset:offset+2])[0] - offset += 2 - repo_id = data[offset:offset+repo_len].decode("utf-8") - offset += repo_len - repositories.append(repo_id) - - return cls(node_id=node_id, repositories=repositories) - - def to_message(self) -> bytes: - """Wrap in a full message with header.""" - payload = self.encode() - header = MessageHeader( - msg_type=MessageType.INVENTORY_ANNOUNCEMENT, - timestamp=int(time.time() * 1000), - payload_length=len(payload), - ) - return header.encode() + payload - - -@dataclass -class RefAnnouncement: - """Announces a reference update in a repository. - - Pushed to subscribers when refs change (commits, branches, etc). - """ - repository_id: str # Repository ID - ref_name: str # Reference name (e.g., "refs/heads/main") - old_oid: bytes # Previous object ID (20 bytes, all zeros if new) - new_oid: bytes # New object ID (20 bytes) - signature: bytes = b"" # Ed25519 signature of the update - - def encode(self) -> bytes: - """Encode to bytes.""" - repo_bytes = self.repository_id.encode("utf-8") - ref_bytes = self.ref_name.encode("utf-8") - - return struct.pack( - f"!H{len(repo_bytes)}sH{len(ref_bytes)}s20s20sH{len(self.signature)}s", - len(repo_bytes), - repo_bytes, - len(ref_bytes), - ref_bytes, - self.old_oid, - self.new_oid, - len(self.signature), - self.signature, - ) - - @classmethod - def decode(cls, data: bytes) -> "RefAnnouncement": - """Decode from bytes.""" - offset = 0 - - # Repository ID - repo_len = struct.unpack("!H", data[offset:offset+2])[0] - offset += 2 - repository_id = data[offset:offset+repo_len].decode("utf-8") - offset += repo_len - - # Ref name - ref_len = struct.unpack("!H", data[offset:offset+2])[0] - offset += 2 - ref_name = data[offset:offset+ref_len].decode("utf-8") - offset += ref_len - - # OIDs - old_oid = data[offset:offset+20] - offset += 20 - new_oid = data[offset:offset+20] - offset += 20 - - # Signature - sig_len = struct.unpack("!H", data[offset:offset+2])[0] - offset += 2 - signature = data[offset:offset+sig_len] - - return cls( - repository_id=repository_id, - ref_name=ref_name, - old_oid=old_oid, - new_oid=new_oid, - signature=signature, - ) - - def to_message(self) -> bytes: - """Wrap in a full message with header.""" - payload = self.encode() - header = MessageHeader( - msg_type=MessageType.REF_ANNOUNCEMENT, - timestamp=int(time.time() * 1000), - payload_length=len(payload), - ) - return header.encode() + payload - - -@dataclass -class Ping: - """Simple ping message for keepalive/latency measurement.""" - nonce: bytes = field(default_factory=lambda: struct.pack("!Q", int(time.time() * 1000))) - - def encode(self) -> bytes: - return self.nonce - - @classmethod - def decode(cls, data: bytes) -> "Ping": - return cls(nonce=data[:8]) - - def to_message(self) -> bytes: - payload = self.encode() - header = MessageHeader( - msg_type=MessageType.PING, - timestamp=int(time.time() * 1000), - payload_length=len(payload), - ) - return header.encode() + payload - - -@dataclass -class Pong: - """Response to ping.""" - nonce: bytes # Echo back the ping nonce - - def encode(self) -> bytes: - return self.nonce - - @classmethod - def decode(cls, data: bytes) -> "Pong": - return cls(nonce=data[:8]) - - def to_message(self) -> bytes: - payload = self.encode() - header = MessageHeader( - msg_type=MessageType.PONG, - timestamp=int(time.time() * 1000), - payload_length=len(payload), - ) - return header.encode() + payload - - -def decode_message(data: bytes): - """Decode a message from bytes. - - Returns tuple of (header, message_object). - """ - header = MessageHeader.decode(data) - payload = data[HEADER_SIZE:HEADER_SIZE + header.payload_length] - - decoders = { - MessageType.NODE_ANNOUNCEMENT: NodeAnnouncement.decode, - MessageType.INVENTORY_ANNOUNCEMENT: InventoryAnnouncement.decode, - MessageType.REF_ANNOUNCEMENT: RefAnnouncement.decode, - MessageType.PING: Ping.decode, - MessageType.PONG: Pong.decode, - } - - decoder = decoders.get(header.msg_type) - if decoder is None: - raise ValueError(f"Unknown message type: {header.msg_type}") - - return header, decoder(payload) diff --git a/tests/test_adapter.py b/tests/test_adapter.py deleted file mode 100644 index 6a5d603..0000000 --- a/tests/test_adapter.py +++ /dev/null @@ -1,206 +0,0 @@ -"""Tests for RNSTransportAdapter peer discovery logic (RNS networking mocked).""" - -import time -from unittest.mock import MagicMock, patch, call - -import pytest -import RNS - -from radicle_reticulum.adapter import ( - PeerInfo, - RNSTransportAdapter, - NODE_APP_DATA_MAGIC, - REPO_APP_DATA_MAGIC, -) -from radicle_reticulum.identity import RadicleIdentity - - -# --------------------------------------------------------------------------- -# Helpers -# --------------------------------------------------------------------------- - -def _make_dest_mock(hash_bytes: bytes = b"\xaa" * 16) -> MagicMock: - dest = MagicMock() - dest.hash = hash_bytes - dest.hexhash = hash_bytes.hex() - return dest - - -def _make_adapter() -> RNSTransportAdapter: - """Instantiate adapter with all RNS I/O patched out.""" - identity = RadicleIdentity.generate() - dest_mock = _make_dest_mock() - - with patch("radicle_reticulum.adapter.RNS.Reticulum"), \ - patch("radicle_reticulum.adapter.RNS.Destination", return_value=dest_mock), \ - patch("radicle_reticulum.adapter.RNS.log"): - adapter = RNSTransportAdapter(identity=identity) - - return adapter - - -# --------------------------------------------------------------------------- -# PeerInfo -# --------------------------------------------------------------------------- - -class TestPeerInfo: - def test_age_increases_over_time(self): - identity = RadicleIdentity.generate() - peer = PeerInfo( - identity=identity, - destination_hash=b"\x01" * 16, - last_seen=time.time() - 5.0, - ) - assert peer.age >= 5.0 - - def test_age_near_zero_for_fresh_peer(self): - identity = RadicleIdentity.generate() - peer = PeerInfo( - identity=identity, - destination_hash=b"\x02" * 16, - last_seen=time.time(), - ) - assert peer.age < 1.0 - - def test_announced_repos_defaults_empty(self): - identity = RadicleIdentity.generate() - peer = PeerInfo(identity=identity, destination_hash=b"\x03" * 16, last_seen=0) - assert peer.announced_repos == set() - - -# --------------------------------------------------------------------------- -# Adapter construction & peer list -# --------------------------------------------------------------------------- - -class TestAdapterConstruction: - def test_identity_is_stored(self): - adapter = _make_adapter() - assert adapter.identity is not None - assert adapter.identity.did.startswith("did:key:") - - def test_initial_peer_list_is_empty(self): - adapter = _make_adapter() - assert adapter.get_peers() == [] - - def test_get_peer_by_did_returns_none_when_absent(self): - adapter = _make_adapter() - assert adapter.get_peer_by_did("did:key:z6Mkunknown") is None - - -# --------------------------------------------------------------------------- -# _handle_announce -# --------------------------------------------------------------------------- - -class TestHandleAnnounce: - def _make_rns_identity(self) -> RNS.Identity: - """Create a real RNS.Identity (keypair only, no networking).""" - return RNS.Identity() - - def test_ignores_own_announce(self): - adapter = _make_adapter() - own_hash = adapter.node_destination.hash # the mock's hash - - discovered = [] - adapter.set_on_peer_discovered(discovered.append) - - rns_id = self._make_rns_identity() - with patch("radicle_reticulum.adapter.RNS.log"): - adapter._handle_announce(own_hash, rns_id, NODE_APP_DATA_MAGIC) - - assert discovered == [] - assert adapter.get_peers() == [] - - def test_ignores_non_radicle_announce(self): - adapter = _make_adapter() - rns_id = self._make_rns_identity() - foreign_hash = b"\xbb" * 16 - - discovered = [] - adapter.set_on_peer_discovered(discovered.append) - - with patch("radicle_reticulum.adapter.RNS.log"): - adapter._handle_announce(foreign_hash, rns_id, b"SOME_OTHER_APP") - - assert discovered == [] - - def test_ignores_announce_with_no_app_data(self): - adapter = _make_adapter() - rns_id = self._make_rns_identity() - - with patch("radicle_reticulum.adapter.RNS.log"): - adapter._handle_announce(b"\xcc" * 16, rns_id, None) - - assert adapter.get_peers() == [] - - def test_discovers_valid_peer(self): - adapter = _make_adapter() - rns_id = self._make_rns_identity() - peer_hash = b"\xdd" * 16 - - discovered = [] - adapter.set_on_peer_discovered(discovered.append) - - with patch("radicle_reticulum.adapter.RNS.log"): - adapter._handle_announce(peer_hash, rns_id, NODE_APP_DATA_MAGIC) - - assert len(discovered) == 1 - peers = adapter.get_peers() - assert len(peers) == 1 - assert peers[0].destination_hash == peer_hash - - def test_second_announce_updates_last_seen_not_duplicates(self): - adapter = _make_adapter() - rns_id = self._make_rns_identity() - peer_hash = b"\xee" * 16 - - discovered = [] - adapter.set_on_peer_discovered(discovered.append) - - with patch("radicle_reticulum.adapter.RNS.log"): - adapter._handle_announce(peer_hash, rns_id, NODE_APP_DATA_MAGIC) - old_seen = adapter.get_peers()[0].last_seen - time.sleep(0.01) - adapter._handle_announce(peer_hash, rns_id, NODE_APP_DATA_MAGIC) - - # Callback only fires once (first discovery) - assert len(discovered) == 1 - # But last_seen was refreshed - assert adapter.get_peers()[0].last_seen >= old_seen - - def test_multiple_distinct_peers_are_all_tracked(self): - adapter = _make_adapter() - - with patch("radicle_reticulum.adapter.RNS.log"): - for i in range(3): - rns_id = self._make_rns_identity() - peer_hash = bytes([i]) * 16 - adapter._handle_announce(peer_hash, rns_id, NODE_APP_DATA_MAGIC) - - assert len(adapter.get_peers()) == 3 - - def test_get_peer_by_did_returns_correct_peer(self): - adapter = _make_adapter() - rns_id = self._make_rns_identity() - peer_hash = b"\xff" * 16 - - with patch("radicle_reticulum.adapter.RNS.log"): - adapter._handle_announce(peer_hash, rns_id, NODE_APP_DATA_MAGIC) - - peers = adapter.get_peers() - did = peers[0].identity.did - found = adapter.get_peer_by_did(did) - assert found is not None - assert found.destination_hash == peer_hash - - def test_extra_app_data_after_magic_still_accepted(self): - """Announces with extra bytes after the magic prefix are valid.""" - adapter = _make_adapter() - rns_id = self._make_rns_identity() - peer_hash = b"\x11" * 16 - - with patch("radicle_reticulum.adapter.RNS.log"): - adapter._handle_announce( - peer_hash, rns_id, NODE_APP_DATA_MAGIC + b"\x00\x01extra" - ) - - assert len(adapter.get_peers()) == 1 diff --git a/tests/test_integration.py b/tests/test_integration.py index 6e59142..69b3a9c 100644 --- a/tests/test_integration.py +++ b/tests/test_integration.py @@ -296,72 +296,64 @@ class TestBridgeDiscoveryIntegration: class TestTCPTunnelIntegration: """Test bidirectional data forwarding through the bridge's tunnel layer.""" - def test_data_forwarded_tcp_to_rns_link(self): - """Data written to the TCP socket should be sent as an RNS.Packet.""" + def test_data_forwarded_tcp_to_rns_buffer(self): + """Data written to the TCP socket should be written to the RNS Buffer.""" bridge = _make_bridge() bridge._running = True - # Create a real socket pair to simulate radicle-node ↔ bridge TCP local, remote = socket.socketpair() - sent_data = [] - mock_link = MagicMock() - mock_link.status = MagicMock() - mock_link.status.__eq__ = lambda s, other: other == "ACTIVE" + written_data = [] + mock_buf = MagicMock() + mock_buf.write.side_effect = lambda d: written_data.append(bytes(d)) - mock_packet = MagicMock() + from radicle_reticulum.bridge import TunnelConnection + tunnel = TunnelConnection( + tunnel_id=1, + tcp_socket=local, + rns_link=MagicMock(), + remote_destination=b"\x00" * 16, + buf=mock_buf, + ) - with patch("radicle_reticulum.bridge.RNS.Link.ACTIVE", "ACTIVE"), \ - patch("radicle_reticulum.bridge.RNS.Packet") as mock_pkt_cls, \ - patch("radicle_reticulum.bridge.RNS.log"): - mock_pkt_cls.side_effect = lambda lnk, data: sent_data.append(data) or MagicMock() - mock_pkt_cls.ENCRYPTED_MDU = 383 # must be set so chunking uses the real value - - from radicle_reticulum.bridge import TunnelConnection - tunnel = TunnelConnection( - tunnel_id=1, - tcp_socket=local, - rns_link=mock_link, - remote_destination=b"\x00" * 16, - ) - - # Write data from the "radicle-node" side - payload = b"hello from radicle-node" - remote.sendall(payload) - remote.close() # triggers EOF so forward loop exits + payload = b"hello from radicle-node" + remote.sendall(payload) + remote.close() + with patch("radicle_reticulum.bridge.RNS.log"): bridge._forward_tcp_to_rns(tunnel) local.close() - # Payload fits in one MDU chunk, so it should arrive as a single packet - assert sent_data == [payload], \ - f"Expected [{payload!r}], got {sent_data}" + assert b"".join(written_data) == payload def test_rns_data_forwarded_to_tcp_socket(self): - """Data received from RNS should be written to the TCP socket.""" + """Data received from RNS Buffer should be written to the TCP socket.""" bridge = _make_bridge() bridge._running = True + local, remote = socket.socketpair() + + payload = b"hello from remote radicle-node" + mock_buf = MagicMock() + mock_buf.read.return_value = payload + + from radicle_reticulum.bridge import TunnelConnection + tunnel = TunnelConnection( + tunnel_id=2, + tcp_socket=local, + rns_link=MagicMock(), + remote_destination=b"\x00" * 16, + buf=mock_buf, + ) + + with bridge._tunnels_lock: + bridge._tunnels[2] = tunnel + with patch("radicle_reticulum.bridge.RNS.log"): - # Create socket pair: bridge writes to `local`, test reads from `remote` - local, remote = socket.socketpair() + bridge._on_rns_data(2, len(payload)) - from radicle_reticulum.bridge import TunnelConnection - tunnel = TunnelConnection( - tunnel_id=2, - tcp_socket=local, - rns_link=MagicMock(), - remote_destination=b"\x00" * 16, - ) - - with bridge._tunnels_lock: - bridge._tunnels[2] = tunnel - - payload = b"hello from remote radicle-node" - bridge._on_rns_data(2, payload) - - received = remote.recv(1024) - local.close() - remote.close() + received = remote.recv(1024) + local.close() + remote.close() assert received == payload diff --git a/tests/test_link.py b/tests/test_link.py deleted file mode 100644 index d355a89..0000000 --- a/tests/test_link.py +++ /dev/null @@ -1,196 +0,0 @@ -"""Tests for RadicleLink (pure logic — no RNS networking required).""" - -import threading -import time -from unittest.mock import MagicMock, patch - -import pytest - -from radicle_reticulum.link import RadicleLink, LinkState - - -def make_link(state: LinkState = LinkState.ACTIVE) -> tuple[RadicleLink, MagicMock]: - """Create a RadicleLink with a mock RNS.Link.""" - mock_rns_link = MagicMock() - mock_rns_link.rtt = None - link = RadicleLink(rns_link=mock_rns_link, state=state) - return link, mock_rns_link - - -class TestRadicleLinkState: - def test_active_link_is_active(self): - link, _ = make_link(LinkState.ACTIVE) - assert link.is_active - assert link.state == LinkState.ACTIVE - - def test_pending_link_is_not_active(self): - link, _ = make_link(LinkState.PENDING) - assert not link.is_active - - def test_on_established_sets_active(self): - link, _ = make_link(LinkState.PENDING) - link._on_established(MagicMock()) - assert link.state == LinkState.ACTIVE - assert link.is_active - - def test_on_closed_sets_closed(self): - link, _ = make_link(LinkState.ACTIVE) - link._on_closed(MagicMock()) - assert link.state == LinkState.CLOSED - assert not link.is_active - - def test_close_calls_teardown(self): - link, mock_rns = make_link(LinkState.ACTIVE) - link.close() - mock_rns.teardown.assert_called_once() - assert link.state == LinkState.CLOSED - - def test_close_on_inactive_link_is_noop(self): - link, mock_rns = make_link(LinkState.CLOSED) - link.close() - mock_rns.teardown.assert_not_called() - - def test_repr(self): - link, _ = make_link(LinkState.ACTIVE) - r = repr(link) - assert "active" in r - assert "RadicleLink" in r - - -class TestRadicleLinkSend: - def test_send_on_active_link_succeeds(self): - link, _ = make_link(LinkState.ACTIVE) - with patch("radicle_reticulum.link.RNS.Packet") as mock_packet_cls: - mock_packet = MagicMock() - mock_packet_cls.return_value = mock_packet - result = link.send(b"hello world") - assert result is True - mock_packet.send.assert_called_once() - - def test_send_on_inactive_link_returns_false(self): - link, _ = make_link(LinkState.CLOSED) - result = link.send(b"data") - assert result is False - - def test_send_on_pending_link_returns_false(self): - link, _ = make_link(LinkState.PENDING) - result = link.send(b"data") - assert result is False - - def test_send_exception_returns_false(self): - link, _ = make_link(LinkState.ACTIVE) - with patch("radicle_reticulum.link.RNS.Packet") as mock_packet_cls: - mock_packet_cls.side_effect = RuntimeError("send failed") - result = link.send(b"data") - assert result is False - - -class TestRadicleLinkRecv: - def test_recv_returns_buffered_data(self): - link, _ = make_link() - link._on_packet(b"buffered", MagicMock()) - result = link.recv(timeout=0.1) - assert result == b"buffered" - - def test_recv_returns_in_order(self): - link, _ = make_link() - for i in range(3): - link._on_packet(f"msg{i}".encode(), MagicMock()) - assert link.recv(timeout=0.1) == b"msg0" - assert link.recv(timeout=0.1) == b"msg1" - assert link.recv(timeout=0.1) == b"msg2" - - def test_recv_timeout_returns_none(self): - link, _ = make_link() - start = time.time() - result = link.recv(timeout=0.05) - elapsed = time.time() - start - assert result is None - assert elapsed >= 0.04 - - def test_recv_woken_by_data(self): - link, _ = make_link() - results = [] - - def delayed_send(): - time.sleep(0.05) - link._on_packet(b"delayed", MagicMock()) - - t = threading.Thread(target=delayed_send, daemon=True) - t.start() - result = link.recv(timeout=1.0) - t.join() - assert result == b"delayed" - - def test_recv_returns_none_when_closed(self): - link, _ = make_link() - link._on_closed(MagicMock()) - result = link.recv(timeout=0.1) - assert result is None - - def test_recv_woken_by_close(self): - """recv() unblocks when link closes while waiting.""" - link, _ = make_link() - result_holder = [] - - def close_after_delay(): - time.sleep(0.05) - link._on_closed(MagicMock()) - - t = threading.Thread(target=close_after_delay, daemon=True) - t.start() - result = link.recv(timeout=2.0) - t.join() - assert result is None - - def test_on_packet_calls_on_data_callback(self): - link, _ = make_link() - received = [] - link.on_data = received.append - link._on_packet(b"callback data", MagicMock()) - assert received == [b"callback data"] - - def test_on_closed_calls_on_close_callback(self): - link, _ = make_link() - called = [] - link.on_close = lambda: called.append(True) - link._on_closed(MagicMock()) - assert called == [True] - - -class TestRadicleLinkProperties: - def test_rtt_returns_none_when_unavailable(self): - link, mock_rns = make_link() - mock_rns.rtt = None - assert link.rtt is None - - def test_rtt_returns_value_when_available(self): - link, mock_rns = make_link() - mock_rns.rtt = 0.15 - assert link.rtt == pytest.approx(0.15) - - def test_remote_identity_delegates_to_rns(self): - link, mock_rns = make_link() - fake_id = MagicMock() - mock_rns.get_remote_identity.return_value = fake_id - assert link.remote_identity is fake_id - - -class TestRadicleLinkFactories: - def test_from_incoming_is_active(self): - mock_rns = MagicMock() - link = RadicleLink.from_incoming(mock_rns) - assert link.state == LinkState.ACTIVE - - def test_from_incoming_does_not_set_established_callback(self): - mock_rns = MagicMock() - RadicleLink.from_incoming(mock_rns) - mock_rns.set_link_established_callback.assert_not_called() - - def test_create_outbound_is_pending(self): - mock_dest = MagicMock() - with patch("radicle_reticulum.link.RNS.Link") as mock_link_cls: - mock_rns = MagicMock() - mock_link_cls.return_value = mock_rns - link = RadicleLink.create_outbound(mock_dest) - assert link.state == LinkState.PENDING diff --git a/tests/test_messages.py b/tests/test_messages.py deleted file mode 100644 index a353a05..0000000 --- a/tests/test_messages.py +++ /dev/null @@ -1,192 +0,0 @@ -"""Tests for message framing.""" - -import pytest -import time -from radicle_reticulum.messages import ( - MessageType, - MessageHeader, - NodeAnnouncement, - InventoryAnnouncement, - RefAnnouncement, - Ping, - Pong, - decode_message, - HEADER_SIZE, -) - - -class TestMessageHeader: - """Test message header encoding/decoding.""" - - def test_encode_decode_roundtrip(self): - """Test header encode/decode roundtrip.""" - header = MessageHeader( - msg_type=MessageType.NODE_ANNOUNCEMENT, - timestamp=1234567890123, - payload_length=42, - ) - - encoded = header.encode() - assert len(encoded) == HEADER_SIZE - - decoded = MessageHeader.decode(encoded) - assert decoded.msg_type == header.msg_type - assert decoded.timestamp == header.timestamp - assert decoded.payload_length == header.payload_length - - def test_header_too_short(self): - """Test that short data raises error.""" - with pytest.raises(ValueError, match="Header too short"): - MessageHeader.decode(b"\x00\x01") - - -class TestNodeAnnouncement: - """Test NodeAnnouncement message.""" - - def test_encode_decode_roundtrip(self): - """Test encode/decode roundtrip.""" - msg = NodeAnnouncement( - node_id="did:key:z6MkhaXgBZDvotDkL5257faiztiGiC2QtKLGpbnnEGta2doK", - features=0x0003, - version=1, - ) - - encoded = msg.encode() - decoded = NodeAnnouncement.decode(encoded) - - assert decoded.node_id == msg.node_id - assert decoded.features == msg.features - assert decoded.version == msg.version - - def test_to_message_includes_header(self): - """Test that to_message includes proper header.""" - msg = NodeAnnouncement(node_id="did:key:z6Mk...") - - full_message = msg.to_message() - header = MessageHeader.decode(full_message) - - assert header.msg_type == MessageType.NODE_ANNOUNCEMENT - assert header.timestamp > 0 - assert header.payload_length == len(msg.encode()) - - -class TestInventoryAnnouncement: - """Test InventoryAnnouncement message.""" - - def test_encode_decode_roundtrip(self): - """Test encode/decode roundtrip.""" - msg = InventoryAnnouncement( - node_id="did:key:z6MkhaXgBZDvotDkL5257faiztiGiC2QtKLGpbnnEGta2doK", - repositories=[ - "rad:z3gqcJUoA1n9HaHKufZs5FCSGazv5", - "rad:z4gqcJUoA1n9HaHKufZs5FCSGazv6", - ], - ) - - encoded = msg.encode() - decoded = InventoryAnnouncement.decode(encoded) - - assert decoded.node_id == msg.node_id - assert decoded.repositories == msg.repositories - - def test_empty_repositories(self): - """Test with empty repository list.""" - msg = InventoryAnnouncement( - node_id="did:key:z6Mk...", - repositories=[], - ) - - encoded = msg.encode() - decoded = InventoryAnnouncement.decode(encoded) - - assert decoded.repositories == [] - - -class TestRefAnnouncement: - """Test RefAnnouncement message.""" - - def test_encode_decode_roundtrip(self): - """Test encode/decode roundtrip.""" - msg = RefAnnouncement( - repository_id="rad:z3gqcJUoA1n9HaHKufZs5FCSGazv5", - ref_name="refs/heads/main", - old_oid=b"\x00" * 20, - new_oid=bytes.fromhex("abc123def456789012345678901234567890abcd"), - signature=b"fake_signature_bytes", - ) - - encoded = msg.encode() - decoded = RefAnnouncement.decode(encoded) - - assert decoded.repository_id == msg.repository_id - assert decoded.ref_name == msg.ref_name - assert decoded.old_oid == msg.old_oid - assert decoded.new_oid == msg.new_oid - assert decoded.signature == msg.signature - - -class TestPingPong: - """Test Ping/Pong messages.""" - - def test_ping_encode_decode(self): - """Test Ping encode/decode.""" - ping = Ping() - encoded = ping.encode() - decoded = Ping.decode(encoded) - - assert decoded.nonce == ping.nonce - - def test_pong_echoes_nonce(self): - """Test Pong echoes ping nonce.""" - ping = Ping() - pong = Pong(nonce=ping.nonce) - - assert pong.nonce == ping.nonce - - -class TestDecodeMessage: - """Test the decode_message function.""" - - def test_decode_node_announcement(self): - """Test decoding a NodeAnnouncement message.""" - msg = NodeAnnouncement(node_id="did:key:z6Mk...") - full_message = msg.to_message() - - header, decoded = decode_message(full_message) - - assert header.msg_type == MessageType.NODE_ANNOUNCEMENT - assert isinstance(decoded, NodeAnnouncement) - assert decoded.node_id == msg.node_id - - def test_decode_inventory_announcement(self): - """Test decoding an InventoryAnnouncement message.""" - msg = InventoryAnnouncement( - node_id="did:key:z6Mk...", - repositories=["repo1", "repo2"], - ) - full_message = msg.to_message() - - header, decoded = decode_message(full_message) - - assert header.msg_type == MessageType.INVENTORY_ANNOUNCEMENT - assert isinstance(decoded, InventoryAnnouncement) - assert decoded.repositories == msg.repositories - - def test_decode_ping(self): - """Test decoding a Ping message.""" - ping = Ping() - full_message = ping.to_message() - - header, decoded = decode_message(full_message) - - assert header.msg_type == MessageType.PING - assert isinstance(decoded, Ping) - - def test_unknown_message_type_raises(self): - """Test that unknown message types raise error.""" - # Create a message with invalid type - import struct - bad_message = struct.pack("!BQH", 0xFF, 0, 0) - - with pytest.raises(ValueError, match="Unknown message type"): - decode_message(bad_message)