refactor: use RNS.Buffer for tunnel, drop dead code and --lora flag

Switch bridge TCP↔RNS tunnel from fire-and-forget RNS.Packet to
RNS.Buffer over RNS.Channel, which provides ordered reliable delivery
with automatic retransmission. A dropped packet no longer silently
corrupts Radicle's Noise session.

Delete adapter.py, link.py, messages.py (and their tests) — these
implemented a parallel peer-discovery and binary gossip layer that
duplicates what Radicle handles natively over the bridge session.
Remove the cmd_node, cmd_ping, cmd_peers CLI commands that used them.

Remove --lora flag: Reticulum caps announce bandwidth at 2% per
interface automatically, so application-level duty-cycle management
is unnecessary. --announce-retry-delays remains for tuning startup
timing.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Maciek "mab122" Bator 2026-04-24 10:54:25 +02:00
parent aff4719910
commit 63267e5789
11 changed files with 90 additions and 1765 deletions

View File

@ -80,8 +80,6 @@ Tunnel 1 opened
[Status] Tunnels: 1, Remote bridges: 1, TX: 1551, RX: 1831
```
> **LoRa:** Pass `--lora` to apply duty-cycle-safe announce delays (60 s, 300 s, 900 s) instead of the WiFi defaults.
### Step 3 — Use radicle normally
```sh
@ -141,9 +139,6 @@ radicle-rns bridge # TCP↔RNS bridge
radicle-rns seed # dedicated seed node + bridge + gossip
radicle-rns gossip [RID ...] # standalone gossip relay
radicle-rns setup # check prerequisites, print fix instructions
radicle-rns node # lightweight peer-announce node
radicle-rns peers # discover peers on the mesh
radicle-rns ping <hash> # RTT probe to a peer
radicle-rns identity generate # create identity
radicle-rns identity info # show DID and RNS hash
```
@ -160,7 +155,6 @@ Global flags: `-v` verbose logging, `--identity PATH` (default `~/.radicle-rns/i
| `--nid <NID>` | auto-detect | Override local radicle NID |
| `--no-auto-connect` | — | Disable auto-connect on discovery |
| `--no-auto-seed` | — | Disable auto-registering remote NIDs |
| `--lora` | — | LoRa-safe defaults: announce delays 60,300,900 s |
| `--announce-retry-delays` | 5,15,30 | Startup re-announce delays (seconds, comma-separated) |
### seed flags
@ -171,7 +165,6 @@ Global flags: `-v` verbose logging, `--identity PATH` (default `~/.radicle-rns/i
| `--seed-port` | 8776 | TCP port for the seed radicle-node |
| `--bridge-port` | 8778 | TCP listen port for the seed bridge |
| `--poll-interval` | 30 | Seconds between gossip ref polls |
| `--lora` | — | LoRa-safe defaults |
### gossip flags
@ -180,7 +173,6 @@ Global flags: `-v` verbose logging, `--identity PATH` (default `~/.radicle-rns/i
| `--nid` | auto-detect | Local radicle NID to advertise |
| `--bridge-port` | 8777 | TCP port of the local bridge |
| `--poll-interval` | 30 | Seconds between ref polls |
| `--lora` | — | LoRa-safe defaults (delays 60,300,900 s; poll 120 s) |
---
@ -193,20 +185,19 @@ radicle-node ─TCP─ RadicleBridge ──RNS Link── RadicleBridge ─TCP
GossipRelay ──RNS Packet── GossipRelay
```
Each discovered remote bridge gets its own OS-assigned TCP listen port, so radicle-node connections always route to the correct peer. All RNS packets are chunked to ≤383 B (LoRa encrypted MTU).
Each discovered remote bridge gets its own OS-assigned TCP listen port, so radicle-node connections always route to the correct peer. The tunnel uses `RNS.Buffer` over `RNS.Channel` for ordered, reliable delivery — Reticulum handles retransmission transparently across all interface types including LoRa.
- **`identity.py`** — Ed25519 DID ↔ RNS identity; saved to `~/.radicle-rns/identity`
- **`bridge.py`** — TCP↔RNS tunnel, per-bridge port allocation, path maintenance, reconnect
- **`bridge.py`** — TCP↔RNS tunnel via `RNS.Buffer`, per-bridge port allocation, path maintenance
- **`gossip.py`** — ref-change notifications, delta broadcasts, auto-seed for unknown repos
- **`seed.py`** — dedicated radicle-node process lifecycle (separate RAD_HOME)
- **`adapter.py`** — RNS peer discovery and announce filtering
---
## Development
```sh
uv run pytest # 149 tests
uv run pytest # 97 tests
uv run pytest -x -q # stop on first failure
mypy src/ # type check
```

View File

@ -1,17 +1,6 @@
"""Radicle transport adapter for Reticulum mesh networking."""
from radicle_reticulum.identity import RadicleIdentity
from radicle_reticulum.adapter import RNSTransportAdapter
from radicle_reticulum.link import RadicleLink
from radicle_reticulum.messages import (
MessageType,
NodeAnnouncement,
InventoryAnnouncement,
RefAnnouncement,
Ping,
Pong,
decode_message,
)
from radicle_reticulum.bridge import RadicleBridge
from radicle_reticulum.gossip import GossipRelay
from radicle_reticulum.seed import SeedNode
@ -19,15 +8,6 @@ from radicle_reticulum.seed import SeedNode
__version__ = "0.1.0"
__all__ = [
"RadicleIdentity",
"RNSTransportAdapter",
"RadicleLink",
"MessageType",
"NodeAnnouncement",
"InventoryAnnouncement",
"RefAnnouncement",
"Ping",
"Pong",
"decode_message",
"RadicleBridge",
"GossipRelay",
"SeedNode",

View File

@ -1,331 +0,0 @@
"""RNS Transport Adapter for Radicle.
This is the core adapter that allows Radicle to use Reticulum as a transport.
It provides:
- Destination registration and announcement
- Incoming connection handling
- Outbound connection establishment
- Peer discovery via RNS announcements
"""
import threading
import time
from dataclasses import dataclass, field
from typing import Callable, Dict, List, Optional, Set
import RNS
from radicle_reticulum.identity import RadicleIdentity
from radicle_reticulum.link import RadicleLink, LinkState
# Radicle app name for RNS destination
APP_NAME = "radicle"
# Aspect for node connections
ASPECT_NODE = "node"
# Aspect for repository-specific endpoints
ASPECT_REPO = "repo"
# App data identifiers for filtering announces
NODE_APP_DATA_MAGIC = b"RADICLE_NODE_V1"
REPO_APP_DATA_MAGIC = b"RADICLE_REPO_V1"
@dataclass
class PeerInfo:
"""Information about a discovered peer."""
identity: RadicleIdentity
destination_hash: bytes
last_seen: float
announced_repos: Set[str] = field(default_factory=set)
@property
def age(self) -> float:
"""Seconds since last seen."""
return time.time() - self.last_seen
class RNSTransportAdapter:
"""Reticulum transport adapter for Radicle.
Provides the bridge between Radicle's network layer and Reticulum.
Manages identity, destinations, links, and peer discovery.
"""
def __init__(
self,
identity: Optional[RadicleIdentity] = None,
config_path: Optional[str] = None,
):
"""Initialize the adapter.
Args:
identity: Radicle identity to use. Generated if not provided.
config_path: Path to Reticulum config. Uses default if not provided.
"""
# Initialize Reticulum
self.reticulum = RNS.Reticulum(config_path)
# Set up identity
if identity is None:
identity = RadicleIdentity.generate()
self.identity = identity
# Create main node destination
self.node_destination = RNS.Destination(
self.identity.rns_identity,
RNS.Destination.IN,
RNS.Destination.SINGLE,
APP_NAME,
ASPECT_NODE,
)
# Set up link handling
self.node_destination.set_link_established_callback(self._on_incoming_link)
# Peer tracking
self._peers: Dict[bytes, PeerInfo] = {}
self._peers_lock = threading.Lock()
# Active links
self._links: Dict[bytes, RadicleLink] = {}
self._links_lock = threading.Lock()
# Callbacks
self._on_peer_discovered: Optional[Callable[[PeerInfo], None]] = None
self._on_incoming_connection: Optional[Callable[[RadicleLink], None]] = None
# Repository destinations (hash -> destination)
self._repo_destinations: Dict[str, RNS.Destination] = {}
RNS.log(f"RNS Transport Adapter initialized", RNS.LOG_INFO)
RNS.log(f" Node ID: {self.identity.did}", RNS.LOG_INFO)
RNS.log(f" RNS Hash: {self.identity.rns_hash_hex}", RNS.LOG_INFO)
def start(self):
"""Start the adapter and begin announcing presence."""
# Register announce handler to discover peers
RNS.Transport.register_announce_handler(self._handle_announce)
# Announce our node destination
self.announce()
RNS.log("RNS Transport Adapter started", RNS.LOG_INFO)
def stop(self):
"""Stop the adapter and clean up."""
# Close all active links
with self._links_lock:
for link in list(self._links.values()):
link.close()
self._links.clear()
RNS.log("RNS Transport Adapter stopped", RNS.LOG_INFO)
def announce(self, app_data: Optional[bytes] = None):
"""Announce this node's presence on the network.
Args:
app_data: Optional additional application data to include.
Could include repository list, capabilities, etc.
"""
# Prepend node magic for filtering, then append any extra app_data
full_app_data = NODE_APP_DATA_MAGIC
if app_data:
full_app_data += app_data
self.node_destination.announce(app_data=full_app_data)
RNS.log(f"Announced node: {self.identity.rns_hash_hex}", RNS.LOG_DEBUG)
def announce_repository(self, repo_id: str, repo_data: Optional[bytes] = None):
"""Announce availability of a specific repository.
Args:
repo_id: Repository identifier (typically a hash or DID).
repo_data: Optional metadata about the repository.
"""
if repo_id not in self._repo_destinations:
# Create destination for this repository
dest = RNS.Destination(
self.identity.rns_identity,
RNS.Destination.IN,
RNS.Destination.SINGLE,
APP_NAME,
ASPECT_REPO,
repo_id,
)
dest.set_link_established_callback(
lambda link: self._on_incoming_link(link, repo_id)
)
self._repo_destinations[repo_id] = dest
# Prepend repo magic for filtering
full_app_data = REPO_APP_DATA_MAGIC
if repo_data:
full_app_data += repo_data
self._repo_destinations[repo_id].announce(app_data=full_app_data)
RNS.log(f"Announced repository: {repo_id}", RNS.LOG_DEBUG)
def connect(
self,
destination_hash: bytes,
timeout: float = 30.0,
) -> Optional[RadicleLink]:
"""Connect to a peer by destination hash.
Args:
destination_hash: 16-byte RNS destination hash.
timeout: Connection timeout in seconds.
Returns:
RadicleLink if connection successful, None otherwise.
"""
# Check if we already have a path to this destination
if not RNS.Transport.has_path(destination_hash):
RNS.Transport.request_path(destination_hash)
# Wait for path to be established
deadline = time.time() + timeout
while not RNS.Transport.has_path(destination_hash):
if time.time() > deadline:
RNS.log(f"Path request timeout: {destination_hash.hex()}", RNS.LOG_WARNING)
return None
time.sleep(0.1)
# Get the destination
identity = RNS.Identity.recall(destination_hash)
if identity is None:
RNS.log(f"Could not recall identity for: {destination_hash.hex()}", RNS.LOG_WARNING)
return None
destination = RNS.Destination(
identity,
RNS.Destination.OUT,
RNS.Destination.SINGLE,
APP_NAME,
ASPECT_NODE,
)
# Create link
link = RadicleLink.create_outbound(destination)
# Wait for link to establish
deadline = time.time() + timeout
while link.state == LinkState.PENDING:
if time.time() > deadline:
RNS.log(f"Link timeout: {destination_hash.hex()}", RNS.LOG_WARNING)
return None
time.sleep(0.1)
if link.state != LinkState.ACTIVE:
return None
# Track the link
with self._links_lock:
self._links[destination_hash] = link
return link
def connect_to_peer(
self,
peer: PeerInfo,
timeout: float = 30.0,
) -> Optional[RadicleLink]:
"""Connect to a discovered peer.
Args:
peer: PeerInfo from peer discovery.
timeout: Connection timeout in seconds.
"""
return self.connect(peer.destination_hash, timeout)
def get_peers(self) -> List[PeerInfo]:
"""Get list of discovered peers."""
with self._peers_lock:
return list(self._peers.values())
def get_peer_by_did(self, did: str) -> Optional[PeerInfo]:
"""Look up a peer by their Radicle DID."""
with self._peers_lock:
for peer in self._peers.values():
if peer.identity.did == did:
return peer
return None
def set_on_peer_discovered(self, callback: Callable[[PeerInfo], None]):
"""Set callback for peer discovery events."""
self._on_peer_discovered = callback
def set_on_incoming_connection(self, callback: Callable[[RadicleLink], None]):
"""Set callback for incoming connections."""
self._on_incoming_connection = callback
def _handle_announce(
self,
destination_hash: bytes,
announced_identity: RNS.Identity,
app_data: Optional[bytes],
) -> None:
"""Handle incoming announce from another node."""
# Ignore our own announcements
if destination_hash == self.node_destination.hash:
return
# Filter: only accept announces with node magic in app_data
if app_data is None or not app_data.startswith(NODE_APP_DATA_MAGIC):
# Not a radicle node announce, ignore
return
# Create RadicleIdentity from announced identity
try:
radicle_id = RadicleIdentity.from_rns_identity(announced_identity)
except Exception as e:
RNS.log(f"Failed to create RadicleIdentity from announce: {e}", RNS.LOG_WARNING)
return
# Update or create peer info
with self._peers_lock:
if destination_hash in self._peers:
peer = self._peers[destination_hash]
peer.last_seen = time.time()
else:
peer = PeerInfo(
identity=radicle_id,
destination_hash=destination_hash,
last_seen=time.time(),
)
self._peers[destination_hash] = peer
RNS.log(f"Discovered peer: {radicle_id.did}", RNS.LOG_INFO)
# Notify callback
if self._on_peer_discovered:
self._on_peer_discovered(peer)
def _on_incoming_link(self, link: RNS.Link, repo_id: Optional[str] = None):
"""Handle incoming link establishment."""
radicle_link = RadicleLink.from_incoming(link)
# Track the link
remote_hash = link.get_remote_identity().hash if link.get_remote_identity() else None
if remote_hash:
with self._links_lock:
self._links[remote_hash] = radicle_link
RNS.log(f"Incoming connection established", RNS.LOG_INFO)
# Notify callback
if self._on_incoming_connection:
self._on_incoming_connection(radicle_link)
@property
def node_hash(self) -> bytes:
"""Get this node's destination hash."""
return self.node_destination.hash
@property
def node_hash_hex(self) -> str:
"""Get this node's destination hash as hex."""
return self.node_destination.hexhash

View File

@ -15,6 +15,7 @@ The bridge:
4. Remote bridges forward to their local radicle-node
"""
import io
import json
import os
import socket
@ -25,7 +26,7 @@ import threading
import time
from dataclasses import dataclass, field
from pathlib import Path
from typing import Callable, Dict, List, Optional, Set, Tuple
from typing import Any, Callable, Dict, List, Optional, Set, Tuple
import RNS
@ -43,13 +44,15 @@ ASPECT_BRIDGE = "bridge"
# App data identifier for bridge announces (used for filtering)
BRIDGE_APP_DATA_MAGIC = b"RADICLE_BRIDGE_V1"
# Buffer sizes
TCP_BUFFER_SIZE = 65536
# Read TCP in large chunks for efficiency; outbound data is chunked to
# RNS.Packet.ENCRYPTED_MDU (383 B) before sending so no single RNS.Packet
# exceeds the interface MTU on LoRa or any other constrained link.
# TCP read chunk size — large reads are fine; RNS.Channel handles chunking internally.
TCP_READ_SIZE = 32768
# Stream IDs for the bidirectional Buffer over RNS.Channel.
# Initiator (outbound): sends on stream 1, receives on stream 0.
# Responder (inbound): sends on stream 0, receives on stream 1.
_STREAM_INITIATOR_RECV = 0
_STREAM_INITIATOR_SEND = 1
@dataclass
class TunnelConnection:
@ -62,10 +65,16 @@ class TunnelConnection:
bytes_sent: int = 0
bytes_received: int = 0
active: bool = True
buf: Optional[Any] = None # io.BufferedRWPair from RNS.Buffer.create_bidirectional_buffer
def close(self):
"""Close the tunnel."""
self.active = False
if self.buf:
try:
self.buf.close()
except Exception:
pass
if self.tcp_socket:
try:
self.tcp_socket.close()
@ -506,8 +515,12 @@ class RadicleBridge:
if self._tunnel_opened_cb:
self._tunnel_opened_cb(tunnel)
rns_link.set_packet_callback(
lambda data, pkt: self._on_rns_data(tunnel_id, data)
channel = rns_link.get_channel()
tunnel.buf = RNS.Buffer.create_bidirectional_buffer(
_STREAM_INITIATOR_RECV,
_STREAM_INITIATOR_SEND,
channel,
ready_callback=lambda n: self._on_rns_data(tunnel_id, n),
)
rns_link.set_link_closed_callback(
lambda link: self._on_tunnel_closed(tunnel_id)
@ -516,13 +529,12 @@ class RadicleBridge:
self._forward_tcp_to_rns(tunnel)
def _forward_tcp_to_rns(self, tunnel: TunnelConnection):
"""Forward data from TCP socket to RNS link."""
"""Forward data from TCP socket to RNS Channel buffer."""
tcp_socket = tunnel.tcp_socket
tcp_socket.setblocking(False)
while tunnel.active and self._running:
try:
rns_link = tunnel.rns_link # read each iteration: may be updated by reconnect
readable, _, errored = select.select([tcp_socket], [], [tcp_socket], 1.0)
if errored:
@ -533,71 +545,33 @@ class RadicleBridge:
if not data:
break
if rns_link.status == RNS.Link.ACTIVE:
self._send_over_link(rns_link, data)
tunnel.bytes_sent += len(data)
elif tunnel.remote_destination:
RNS.log(
f"Tunnel {tunnel.tunnel_id}: link dropped, reconnecting...",
RNS.LOG_WARNING,
)
new_link = self._reconnect_link(tunnel.remote_destination)
if new_link is None:
RNS.log(
f"Tunnel {tunnel.tunnel_id}: reconnect failed",
RNS.LOG_WARNING,
)
break
RNS.log(
f"Tunnel {tunnel.tunnel_id}: reconnected",
RNS.LOG_INFO,
)
tunnel.rns_link = new_link
tid = tunnel.tunnel_id
new_link.set_packet_callback(
lambda d, p: self._on_rns_data(tid, d)
)
new_link.set_link_closed_callback(
lambda l: self._on_tunnel_closed(tid)
)
self._send_over_link(new_link, data)
tunnel.bytes_sent += len(data)
else:
break
tunnel.buf.write(data)
tunnel.buf.flush()
tunnel.bytes_sent += len(data)
except socket.error:
break
except Exception as e:
RNS.log(f"Forward error: {e}", RNS.LOG_DEBUG)
RNS.log(f"Tunnel {tunnel.tunnel_id} forward error: {e}", RNS.LOG_DEBUG)
break
self._on_tunnel_closed(tunnel.tunnel_id)
@staticmethod
def _send_over_link(link: RNS.Link, data: bytes):
"""Send data over an RNS link, chunking to ENCRYPTED_MDU if needed.
RNS.Packet raises IOError if data exceeds the interface MTU (~383 B on
LoRa). TCP chunks can be tens of kilobytes, so we split into MDU-sized
pieces. Order is preserved the link is point-to-point and packets
from a single sender are delivered in order.
"""
mdu = RNS.Packet.ENCRYPTED_MDU
for offset in range(0, len(data), mdu):
RNS.Packet(link, data[offset:offset + mdu]).send()
def _on_rns_data(self, tunnel_id: int, data: bytes):
"""Handle data received from RNS link."""
def _on_rns_data(self, tunnel_id: int, n_bytes: int):
"""Drain n_bytes from the RNS Buffer and forward to the TCP socket."""
with self._tunnels_lock:
tunnel = self._tunnels.get(tunnel_id)
if tunnel and tunnel.active and tunnel.tcp_socket:
try:
if not (tunnel and tunnel.active and tunnel.buf and tunnel.tcp_socket):
return
try:
data = tunnel.buf.read(n_bytes)
if data:
tunnel.tcp_socket.sendall(data)
tunnel.bytes_received += len(data)
except Exception as e:
RNS.log(f"TCP send error: {e}", RNS.LOG_DEBUG)
self._on_tunnel_closed(tunnel_id)
except Exception as e:
RNS.log(f"TCP send error on tunnel {tunnel_id}: {e}", RNS.LOG_DEBUG)
self._on_tunnel_closed(tunnel_id)
def _on_tunnel_closed(self, tunnel_id: int):
"""Handle tunnel closure."""
@ -642,8 +616,12 @@ class RadicleBridge:
RNS.log(f"Incoming tunnel {tunnel_id} opened", RNS.LOG_INFO)
link.set_packet_callback(
lambda data, pkt: self._on_rns_data(tunnel_id, data)
channel = link.get_channel()
tunnel.buf = RNS.Buffer.create_bidirectional_buffer(
_STREAM_INITIATOR_SEND, # responder receives on the initiator's send stream
_STREAM_INITIATOR_RECV, # responder sends on the initiator's receive stream
channel,
ready_callback=lambda n: self._on_rns_data(tunnel_id, n),
)
link.set_link_closed_callback(
lambda l: self._on_tunnel_closed(tunnel_id)

View File

@ -13,7 +13,6 @@ DEFAULT_IDENTITY_PATH = Path.home() / ".radicle-rns" / "identity"
import RNS
from radicle_reticulum.adapter import RNSTransportAdapter, PeerInfo
from radicle_reticulum.identity import RadicleIdentity
from radicle_reticulum.bridge import RadicleBridge
from radicle_reticulum.gossip import GossipRelay
@ -49,24 +48,6 @@ def detect_radicle_nid() -> Optional[str]:
return None
LORA_ANNOUNCE_DELAYS = "60,300,900"
LORA_POLL_INTERVAL = 120
def _apply_lora_defaults(args) -> None:
"""Override delay/poll defaults with LoRa-safe values when --lora is set.
Only overrides fields that were left at their defaults (argparse default
strings), so explicit user flags always take precedence.
"""
if not getattr(args, "lora", False):
return
if getattr(args, "announce_retry_delays", None) == "5,15,30":
args.announce_retry_delays = LORA_ANNOUNCE_DELAYS
if getattr(args, "poll_interval", None) == 30:
args.poll_interval = LORA_POLL_INTERVAL
def _parse_delays(s: str) -> Tuple[int, ...]:
try:
return tuple(int(x.strip()) for x in s.split(",") if x.strip())
@ -75,61 +56,6 @@ def _parse_delays(s: str) -> Tuple[int, ...]:
sys.exit(1)
def on_peer_discovered(peer: PeerInfo):
"""Callback when a new peer is discovered."""
print(f"[+] Discovered peer: {peer.identity.did}")
print(f" RNS hash: {peer.destination_hash.hex()}")
def cmd_node(args):
"""Run a Radicle-RNS node."""
print("Starting Radicle-RNS node...")
identity = RadicleIdentity.load_or_generate(args.identity)
_print_identity_info(args.identity)
# Create adapter
adapter = RNSTransportAdapter(identity=identity)
adapter.set_on_peer_discovered(on_peer_discovered)
print(f"Node ID: {identity.did}")
print(f"RNS Hash: {adapter.node_hash_hex}")
print()
# Start adapter
adapter.start()
print("Node running. Press Ctrl+C to stop.")
print("Announcing every 60 seconds...")
print()
# Handle graceful shutdown
running = True
def signal_handler(sig, frame):
nonlocal running
print("\nShutting down...")
running = False
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
# Main loop
last_announce = 0
try:
while running:
now = time.time()
# Periodic announce
if now - last_announce > 60:
adapter.announce()
last_announce = now
time.sleep(0.5)
finally:
adapter.stop()
print("Node stopped.")
def _print_identity_info(identity_path: Path):
"""Print identity file location (new or loaded)."""
path = Path(identity_path)
@ -175,76 +101,6 @@ def cmd_identity(args):
sys.exit(1)
def cmd_ping(args):
"""Ping a peer by RNS hash."""
print(f"Connecting to {args.destination}...")
identity = RadicleIdentity.load_or_generate(args.identity)
adapter = RNSTransportAdapter(identity=identity)
adapter.start()
try:
dest_hash = bytes.fromhex(args.destination)
except ValueError:
print("Error: Invalid destination hash (must be hex)", file=sys.stderr)
sys.exit(1)
link = adapter.connect(dest_hash, timeout=args.timeout)
if link is None:
print("Failed to connect")
sys.exit(1)
print(f"Connected! RTT: {link.rtt:.3f}s" if link.rtt else "Connected!")
# Send ping
from radicle_reticulum.messages import Ping, Pong, decode_message, MessageType
import struct
ping = Ping()
ping_time = time.time()
link.send(ping.to_message())
print("Ping sent, waiting for pong...")
response = link.recv(timeout=10.0)
if response:
header, msg = decode_message(response)
if header.msg_type == MessageType.PONG:
rtt = (time.time() - ping_time) * 1000
print(f"Pong received! RTT: {rtt:.1f}ms")
else:
print(f"Unexpected response: {header.msg_type}")
else:
print("No response (timeout)")
link.close()
adapter.stop()
def cmd_peers(args):
"""List discovered peers."""
identity = RadicleIdentity.load_or_generate(args.identity)
adapter = RNSTransportAdapter(identity=identity)
adapter.set_on_peer_discovered(on_peer_discovered)
adapter.start()
print(f"Listening for peers for {args.timeout} seconds...")
print()
time.sleep(args.timeout)
peers = adapter.get_peers()
if peers:
print(f"\nDiscovered {len(peers)} peer(s):")
for peer in peers:
print(f" {peer.identity.did}")
print(f" Hash: {peer.destination_hash.hex()}")
print(f" Age: {peer.age:.1f}s")
else:
print("\nNo peers discovered.")
adapter.stop()
def _detect_rid(repo_path: Path) -> Optional[str]:
"""Detect the Radicle RID for the repo at repo_path via 'rad inspect'."""
try:
@ -264,7 +120,6 @@ def _detect_rid(repo_path: Path) -> Optional[str]:
def cmd_gossip(args):
"""Run the gossip relay daemon."""
_apply_lora_defaults(args)
identity = RadicleIdentity.load_or_generate(args.identity)
_print_identity_info(args.identity)
@ -337,7 +192,6 @@ def cmd_gossip(args):
def cmd_seed(args):
"""Start a dedicated seed radicle-node, bridge, and gossip relay."""
_apply_lora_defaults(args)
seed_home = Path(args.seed_home)
# Validate args before starting any processes
@ -460,7 +314,6 @@ def cmd_seed(args):
def cmd_bridge(args):
"""Run Radicle-Reticulum bridge."""
_apply_lora_defaults(args)
print("Starting Radicle-Reticulum bridge...")
identity = RadicleIdentity.load_or_generate(args.identity)
@ -720,10 +573,6 @@ def main():
help=f"Identity file (default: {DEFAULT_IDENTITY_PATH})",
)
# node command
node_parser = subparsers.add_parser("node", help="Run a Radicle-RNS node")
add_identity_arg(node_parser)
# identity command
id_parser = subparsers.add_parser("identity", help="Identity operations")
id_parser.add_argument(
@ -739,27 +588,6 @@ def main():
help="Overwrite existing identity file (for generate)"
)
# ping command
ping_parser = subparsers.add_parser("ping", help="Ping a peer")
ping_parser.add_argument("destination", help="RNS destination hash (hex)")
ping_parser.add_argument(
"-t", "--timeout",
type=float,
default=30.0,
help="Connection timeout (seconds)"
)
add_identity_arg(ping_parser)
# peers command
peers_parser = subparsers.add_parser("peers", help="Discover peers")
peers_parser.add_argument(
"-t", "--timeout",
type=float,
default=10.0,
help="Discovery timeout (seconds)"
)
add_identity_arg(peers_parser)
# gossip command
gossip_parser = subparsers.add_parser(
"gossip",
@ -793,14 +621,7 @@ def main():
"--announce-retry-delays",
default="5,15,30",
metavar="SECONDS",
help="Startup re-announce delays, comma-separated (default: 5,15,30 for "
"WiFi/Ethernet). On LoRa use 60,300,900 to respect duty-cycle limits.",
)
gossip_parser.add_argument(
"--lora",
action="store_true",
help="Shortcut for LoRa-safe settings: sets --announce-retry-delays=60,300,900 "
"and --poll-interval=120 (unless overridden explicitly).",
help="Startup re-announce delays in seconds, comma-separated (default: 5,15,30).",
)
add_identity_arg(gossip_parser)
@ -840,14 +661,7 @@ def main():
"--announce-retry-delays",
default="5,15,30",
metavar="SECONDS",
help="Startup re-announce delays, comma-separated (default: 5,15,30 for "
"WiFi/Ethernet). On LoRa use 60,300,900 to respect duty-cycle limits.",
)
seed_parser.add_argument(
"--lora",
action="store_true",
help="Shortcut for LoRa-safe settings: sets --announce-retry-delays=60,300,900 "
"and --poll-interval=120 (unless overridden explicitly).",
help="Startup re-announce delays in seconds, comma-separated (default: 5,15,30).",
)
add_identity_arg(seed_parser)
@ -909,16 +723,7 @@ def main():
"--announce-retry-delays",
default="5,15,30",
metavar="SECONDS",
help=(
"Comma-separated delays for startup re-announces (default: 5,15,30 for "
"WiFi/Ethernet). On LoRa use 60,300,900 to respect duty-cycle limits."
),
)
bridge_parser.add_argument(
"--lora",
action="store_true",
help="Shortcut for LoRa-safe settings: sets --announce-retry-delays=60,300,900 "
"(unless overridden explicitly).",
help="Startup re-announce delays in seconds, comma-separated (default: 5,15,30).",
)
add_identity_arg(bridge_parser)
@ -931,14 +736,8 @@ def main():
RNS.loglevel = RNS.LOG_INFO
# Dispatch command
if args.command == "node":
cmd_node(args)
elif args.command == "identity":
if args.command == "identity":
cmd_identity(args)
elif args.command == "ping":
cmd_ping(args)
elif args.command == "peers":
cmd_peers(args)
elif args.command == "gossip":
cmd_gossip(args)
elif args.command == "seed":

View File

@ -1,177 +0,0 @@
"""RNS Link wrapper providing Radicle-compatible connection interface.
Maps Radicle's Noise XK sessions to RNS Links, which provide:
- Encrypted bidirectional channels
- Forward secrecy via ephemeral ECDH
- Reliable ordered message delivery
"""
import threading
import time
from dataclasses import dataclass, field
from enum import Enum
from typing import Callable, Optional
from collections import deque
import RNS
class LinkState(Enum):
"""Connection state for RadicleLink."""
PENDING = "pending"
ACTIVE = "active"
CLOSED = "closed"
FAILED = "failed"
@dataclass
class RadicleLink:
"""Wrapper around RNS.Link providing Radicle-compatible interface.
RNS Links provide:
- ECDH key exchange (X25519)
- Symmetric encryption with forward secrecy
- Reliable delivery with automatic retransmission
This maps to Radicle's Noise XK sessions conceptually.
"""
rns_link: RNS.Link
state: LinkState = LinkState.PENDING
on_data: Optional[Callable[[bytes], None]] = None
on_close: Optional[Callable[[], None]] = None
_receive_buffer: deque = field(default_factory=deque)
_buffer_lock: threading.Lock = field(default_factory=threading.Lock)
_data_available: threading.Event = field(default_factory=threading.Event)
def __post_init__(self):
"""Set up RNS link callbacks."""
# Only set established callback for pending links (outbound)
# For already-established links (incoming), this callback already fired
if self.state == LinkState.PENDING:
self.rns_link.set_link_established_callback(self._on_established)
self.rns_link.set_link_closed_callback(self._on_closed)
self.rns_link.set_packet_callback(self._on_packet)
@classmethod
def create_outbound(
cls,
destination: RNS.Destination,
on_data: Optional[Callable[[bytes], None]] = None,
on_close: Optional[Callable[[], None]] = None,
) -> "RadicleLink":
"""Create an outbound link to a destination."""
rns_link = RNS.Link(destination)
return cls(
rns_link=rns_link,
on_data=on_data,
on_close=on_close,
)
@classmethod
def from_incoming(
cls,
rns_link: RNS.Link,
on_data: Optional[Callable[[bytes], None]] = None,
on_close: Optional[Callable[[], None]] = None,
) -> "RadicleLink":
"""Wrap an incoming RNS link."""
link = cls(
rns_link=rns_link,
state=LinkState.ACTIVE, # Incoming links are already established
on_data=on_data,
on_close=on_close,
)
return link
def _on_established(self, link: RNS.Link):
"""Called when link is established."""
self.state = LinkState.ACTIVE
RNS.log(f"Link established: {link}", RNS.LOG_DEBUG)
def _on_closed(self, link: RNS.Link):
"""Called when link is closed."""
self.state = LinkState.CLOSED
self._data_available.set() # Wake up any waiting readers
if self.on_close:
self.on_close()
RNS.log(f"Link closed: {link}", RNS.LOG_DEBUG)
def _on_packet(self, message: bytes, packet: RNS.Packet):
"""Called when a packet is received."""
with self._buffer_lock:
self._receive_buffer.append(message)
self._data_available.set()
if self.on_data:
self.on_data(message)
def send(self, data: bytes) -> bool:
"""Send data over the link.
Returns True if packet was sent successfully.
"""
if self.state != LinkState.ACTIVE:
return False
try:
packet = RNS.Packet(self.rns_link, data)
packet.send()
return True
except Exception as e:
RNS.log(f"Send failed: {e}", RNS.LOG_ERROR)
return False
def recv(self, timeout: Optional[float] = None) -> Optional[bytes]:
"""Receive data from the link.
Blocks until data is available or timeout expires.
Returns None on timeout or if link is closed.
"""
deadline = time.time() + timeout if timeout else None
while True:
with self._buffer_lock:
if self._receive_buffer:
return self._receive_buffer.popleft()
if self.state == LinkState.CLOSED:
return None
self._data_available.clear()
remaining = None
if deadline:
remaining = deadline - time.time()
if remaining <= 0:
return None
if not self._data_available.wait(timeout=remaining):
return None
def close(self):
"""Close the link."""
if self.state == LinkState.ACTIVE:
self.rns_link.teardown()
self.state = LinkState.CLOSED
@property
def is_active(self) -> bool:
"""Check if link is active."""
return self.state == LinkState.ACTIVE
@property
def remote_identity(self) -> Optional[RNS.Identity]:
"""Get the remote peer's identity if known."""
return self.rns_link.get_remote_identity()
@property
def rtt(self) -> Optional[float]:
"""Get current round-trip time estimate in seconds."""
if hasattr(self.rns_link, 'rtt') and self.rns_link.rtt:
return self.rns_link.rtt
return None
def __repr__(self) -> str:
return f"RadicleLink(state={self.state.value}, rtt={self.rtt})"

View File

@ -1,313 +0,0 @@
"""Message framing layer for Radicle protocol over RNS.
Implements the gossip message types:
- Node Announcements: Broadcast Node IDs and network addresses
- Inventory Announcements: Share repository inventories for routing
- Reference Announcements: Push repository updates to subscribers
Messages are serialized to a compact binary format suitable for
low-bandwidth Reticulum transports.
"""
import struct
import hashlib
import time
from dataclasses import dataclass, field
from enum import IntEnum
from typing import List, Optional, Set
class MessageType(IntEnum):
"""Radicle gossip message types."""
NODE_ANNOUNCEMENT = 0x01
INVENTORY_ANNOUNCEMENT = 0x02
REF_ANNOUNCEMENT = 0x03
PING = 0x10
PONG = 0x11
# Message header format: type (1 byte) + timestamp (8 bytes) + payload length (2 bytes)
HEADER_FORMAT = "!BQH"
HEADER_SIZE = struct.calcsize(HEADER_FORMAT)
# Maximum payload size (64KB - header)
MAX_PAYLOAD_SIZE = 65535 - HEADER_SIZE
@dataclass
class MessageHeader:
"""Common header for all Radicle messages."""
msg_type: MessageType
timestamp: int # Unix timestamp in milliseconds
payload_length: int
def encode(self) -> bytes:
"""Encode header to bytes."""
return struct.pack(
HEADER_FORMAT,
self.msg_type,
self.timestamp,
self.payload_length,
)
@classmethod
def decode(cls, data: bytes) -> "MessageHeader":
"""Decode header from bytes."""
if len(data) < HEADER_SIZE:
raise ValueError(f"Header too short: {len(data)} < {HEADER_SIZE}")
msg_type_raw, timestamp, payload_length = struct.unpack(
HEADER_FORMAT, data[:HEADER_SIZE]
)
try:
msg_type = MessageType(msg_type_raw)
except ValueError:
raise ValueError(f"Unknown message type: {msg_type_raw}")
return cls(
msg_type=msg_type,
timestamp=timestamp,
payload_length=payload_length,
)
@dataclass
class NodeAnnouncement:
"""Announces a node's presence and capabilities.
Broadcast periodically to enable peer discovery.
"""
node_id: str # DID (did:key:z6Mk...)
features: int = 0 # Bitmask of supported features
version: int = 1 # Protocol version
def encode(self) -> bytes:
"""Encode to bytes."""
node_id_bytes = self.node_id.encode("utf-8")
return struct.pack(
f"!HH{len(node_id_bytes)}s",
self.features,
self.version,
node_id_bytes,
)
@classmethod
def decode(cls, data: bytes) -> "NodeAnnouncement":
"""Decode from bytes."""
features, version = struct.unpack("!HH", data[:4])
node_id = data[4:].decode("utf-8")
return cls(node_id=node_id, features=features, version=version)
def to_message(self) -> bytes:
"""Wrap in a full message with header."""
payload = self.encode()
header = MessageHeader(
msg_type=MessageType.NODE_ANNOUNCEMENT,
timestamp=int(time.time() * 1000),
payload_length=len(payload),
)
return header.encode() + payload
@dataclass
class InventoryAnnouncement:
"""Announces repositories hosted by a node.
Used to build routing tables for repository discovery.
"""
node_id: str # DID of the announcing node
repositories: List[str] # List of repository IDs (hashes)
def encode(self) -> bytes:
"""Encode to bytes."""
node_id_bytes = self.node_id.encode("utf-8")
repo_data = b""
for repo_id in self.repositories:
repo_bytes = repo_id.encode("utf-8")
repo_data += struct.pack(f"!H{len(repo_bytes)}s", len(repo_bytes), repo_bytes)
return struct.pack(
f"!H{len(node_id_bytes)}sH",
len(node_id_bytes),
node_id_bytes,
len(self.repositories),
) + repo_data
@classmethod
def decode(cls, data: bytes) -> "InventoryAnnouncement":
"""Decode from bytes."""
offset = 0
# Node ID
node_id_len = struct.unpack("!H", data[offset:offset+2])[0]
offset += 2
node_id = data[offset:offset+node_id_len].decode("utf-8")
offset += node_id_len
# Repository count
repo_count = struct.unpack("!H", data[offset:offset+2])[0]
offset += 2
# Repositories
repositories = []
for _ in range(repo_count):
repo_len = struct.unpack("!H", data[offset:offset+2])[0]
offset += 2
repo_id = data[offset:offset+repo_len].decode("utf-8")
offset += repo_len
repositories.append(repo_id)
return cls(node_id=node_id, repositories=repositories)
def to_message(self) -> bytes:
"""Wrap in a full message with header."""
payload = self.encode()
header = MessageHeader(
msg_type=MessageType.INVENTORY_ANNOUNCEMENT,
timestamp=int(time.time() * 1000),
payload_length=len(payload),
)
return header.encode() + payload
@dataclass
class RefAnnouncement:
"""Announces a reference update in a repository.
Pushed to subscribers when refs change (commits, branches, etc).
"""
repository_id: str # Repository ID
ref_name: str # Reference name (e.g., "refs/heads/main")
old_oid: bytes # Previous object ID (20 bytes, all zeros if new)
new_oid: bytes # New object ID (20 bytes)
signature: bytes = b"" # Ed25519 signature of the update
def encode(self) -> bytes:
"""Encode to bytes."""
repo_bytes = self.repository_id.encode("utf-8")
ref_bytes = self.ref_name.encode("utf-8")
return struct.pack(
f"!H{len(repo_bytes)}sH{len(ref_bytes)}s20s20sH{len(self.signature)}s",
len(repo_bytes),
repo_bytes,
len(ref_bytes),
ref_bytes,
self.old_oid,
self.new_oid,
len(self.signature),
self.signature,
)
@classmethod
def decode(cls, data: bytes) -> "RefAnnouncement":
"""Decode from bytes."""
offset = 0
# Repository ID
repo_len = struct.unpack("!H", data[offset:offset+2])[0]
offset += 2
repository_id = data[offset:offset+repo_len].decode("utf-8")
offset += repo_len
# Ref name
ref_len = struct.unpack("!H", data[offset:offset+2])[0]
offset += 2
ref_name = data[offset:offset+ref_len].decode("utf-8")
offset += ref_len
# OIDs
old_oid = data[offset:offset+20]
offset += 20
new_oid = data[offset:offset+20]
offset += 20
# Signature
sig_len = struct.unpack("!H", data[offset:offset+2])[0]
offset += 2
signature = data[offset:offset+sig_len]
return cls(
repository_id=repository_id,
ref_name=ref_name,
old_oid=old_oid,
new_oid=new_oid,
signature=signature,
)
def to_message(self) -> bytes:
"""Wrap in a full message with header."""
payload = self.encode()
header = MessageHeader(
msg_type=MessageType.REF_ANNOUNCEMENT,
timestamp=int(time.time() * 1000),
payload_length=len(payload),
)
return header.encode() + payload
@dataclass
class Ping:
"""Simple ping message for keepalive/latency measurement."""
nonce: bytes = field(default_factory=lambda: struct.pack("!Q", int(time.time() * 1000)))
def encode(self) -> bytes:
return self.nonce
@classmethod
def decode(cls, data: bytes) -> "Ping":
return cls(nonce=data[:8])
def to_message(self) -> bytes:
payload = self.encode()
header = MessageHeader(
msg_type=MessageType.PING,
timestamp=int(time.time() * 1000),
payload_length=len(payload),
)
return header.encode() + payload
@dataclass
class Pong:
"""Response to ping."""
nonce: bytes # Echo back the ping nonce
def encode(self) -> bytes:
return self.nonce
@classmethod
def decode(cls, data: bytes) -> "Pong":
return cls(nonce=data[:8])
def to_message(self) -> bytes:
payload = self.encode()
header = MessageHeader(
msg_type=MessageType.PONG,
timestamp=int(time.time() * 1000),
payload_length=len(payload),
)
return header.encode() + payload
def decode_message(data: bytes):
"""Decode a message from bytes.
Returns tuple of (header, message_object).
"""
header = MessageHeader.decode(data)
payload = data[HEADER_SIZE:HEADER_SIZE + header.payload_length]
decoders = {
MessageType.NODE_ANNOUNCEMENT: NodeAnnouncement.decode,
MessageType.INVENTORY_ANNOUNCEMENT: InventoryAnnouncement.decode,
MessageType.REF_ANNOUNCEMENT: RefAnnouncement.decode,
MessageType.PING: Ping.decode,
MessageType.PONG: Pong.decode,
}
decoder = decoders.get(header.msg_type)
if decoder is None:
raise ValueError(f"Unknown message type: {header.msg_type}")
return header, decoder(payload)

View File

@ -1,206 +0,0 @@
"""Tests for RNSTransportAdapter peer discovery logic (RNS networking mocked)."""
import time
from unittest.mock import MagicMock, patch, call
import pytest
import RNS
from radicle_reticulum.adapter import (
PeerInfo,
RNSTransportAdapter,
NODE_APP_DATA_MAGIC,
REPO_APP_DATA_MAGIC,
)
from radicle_reticulum.identity import RadicleIdentity
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _make_dest_mock(hash_bytes: bytes = b"\xaa" * 16) -> MagicMock:
dest = MagicMock()
dest.hash = hash_bytes
dest.hexhash = hash_bytes.hex()
return dest
def _make_adapter() -> RNSTransportAdapter:
"""Instantiate adapter with all RNS I/O patched out."""
identity = RadicleIdentity.generate()
dest_mock = _make_dest_mock()
with patch("radicle_reticulum.adapter.RNS.Reticulum"), \
patch("radicle_reticulum.adapter.RNS.Destination", return_value=dest_mock), \
patch("radicle_reticulum.adapter.RNS.log"):
adapter = RNSTransportAdapter(identity=identity)
return adapter
# ---------------------------------------------------------------------------
# PeerInfo
# ---------------------------------------------------------------------------
class TestPeerInfo:
def test_age_increases_over_time(self):
identity = RadicleIdentity.generate()
peer = PeerInfo(
identity=identity,
destination_hash=b"\x01" * 16,
last_seen=time.time() - 5.0,
)
assert peer.age >= 5.0
def test_age_near_zero_for_fresh_peer(self):
identity = RadicleIdentity.generate()
peer = PeerInfo(
identity=identity,
destination_hash=b"\x02" * 16,
last_seen=time.time(),
)
assert peer.age < 1.0
def test_announced_repos_defaults_empty(self):
identity = RadicleIdentity.generate()
peer = PeerInfo(identity=identity, destination_hash=b"\x03" * 16, last_seen=0)
assert peer.announced_repos == set()
# ---------------------------------------------------------------------------
# Adapter construction & peer list
# ---------------------------------------------------------------------------
class TestAdapterConstruction:
def test_identity_is_stored(self):
adapter = _make_adapter()
assert adapter.identity is not None
assert adapter.identity.did.startswith("did:key:")
def test_initial_peer_list_is_empty(self):
adapter = _make_adapter()
assert adapter.get_peers() == []
def test_get_peer_by_did_returns_none_when_absent(self):
adapter = _make_adapter()
assert adapter.get_peer_by_did("did:key:z6Mkunknown") is None
# ---------------------------------------------------------------------------
# _handle_announce
# ---------------------------------------------------------------------------
class TestHandleAnnounce:
def _make_rns_identity(self) -> RNS.Identity:
"""Create a real RNS.Identity (keypair only, no networking)."""
return RNS.Identity()
def test_ignores_own_announce(self):
adapter = _make_adapter()
own_hash = adapter.node_destination.hash # the mock's hash
discovered = []
adapter.set_on_peer_discovered(discovered.append)
rns_id = self._make_rns_identity()
with patch("radicle_reticulum.adapter.RNS.log"):
adapter._handle_announce(own_hash, rns_id, NODE_APP_DATA_MAGIC)
assert discovered == []
assert adapter.get_peers() == []
def test_ignores_non_radicle_announce(self):
adapter = _make_adapter()
rns_id = self._make_rns_identity()
foreign_hash = b"\xbb" * 16
discovered = []
adapter.set_on_peer_discovered(discovered.append)
with patch("radicle_reticulum.adapter.RNS.log"):
adapter._handle_announce(foreign_hash, rns_id, b"SOME_OTHER_APP")
assert discovered == []
def test_ignores_announce_with_no_app_data(self):
adapter = _make_adapter()
rns_id = self._make_rns_identity()
with patch("radicle_reticulum.adapter.RNS.log"):
adapter._handle_announce(b"\xcc" * 16, rns_id, None)
assert adapter.get_peers() == []
def test_discovers_valid_peer(self):
adapter = _make_adapter()
rns_id = self._make_rns_identity()
peer_hash = b"\xdd" * 16
discovered = []
adapter.set_on_peer_discovered(discovered.append)
with patch("radicle_reticulum.adapter.RNS.log"):
adapter._handle_announce(peer_hash, rns_id, NODE_APP_DATA_MAGIC)
assert len(discovered) == 1
peers = adapter.get_peers()
assert len(peers) == 1
assert peers[0].destination_hash == peer_hash
def test_second_announce_updates_last_seen_not_duplicates(self):
adapter = _make_adapter()
rns_id = self._make_rns_identity()
peer_hash = b"\xee" * 16
discovered = []
adapter.set_on_peer_discovered(discovered.append)
with patch("radicle_reticulum.adapter.RNS.log"):
adapter._handle_announce(peer_hash, rns_id, NODE_APP_DATA_MAGIC)
old_seen = adapter.get_peers()[0].last_seen
time.sleep(0.01)
adapter._handle_announce(peer_hash, rns_id, NODE_APP_DATA_MAGIC)
# Callback only fires once (first discovery)
assert len(discovered) == 1
# But last_seen was refreshed
assert adapter.get_peers()[0].last_seen >= old_seen
def test_multiple_distinct_peers_are_all_tracked(self):
adapter = _make_adapter()
with patch("radicle_reticulum.adapter.RNS.log"):
for i in range(3):
rns_id = self._make_rns_identity()
peer_hash = bytes([i]) * 16
adapter._handle_announce(peer_hash, rns_id, NODE_APP_DATA_MAGIC)
assert len(adapter.get_peers()) == 3
def test_get_peer_by_did_returns_correct_peer(self):
adapter = _make_adapter()
rns_id = self._make_rns_identity()
peer_hash = b"\xff" * 16
with patch("radicle_reticulum.adapter.RNS.log"):
adapter._handle_announce(peer_hash, rns_id, NODE_APP_DATA_MAGIC)
peers = adapter.get_peers()
did = peers[0].identity.did
found = adapter.get_peer_by_did(did)
assert found is not None
assert found.destination_hash == peer_hash
def test_extra_app_data_after_magic_still_accepted(self):
"""Announces with extra bytes after the magic prefix are valid."""
adapter = _make_adapter()
rns_id = self._make_rns_identity()
peer_hash = b"\x11" * 16
with patch("radicle_reticulum.adapter.RNS.log"):
adapter._handle_announce(
peer_hash, rns_id, NODE_APP_DATA_MAGIC + b"\x00\x01extra"
)
assert len(adapter.get_peers()) == 1

View File

@ -296,72 +296,64 @@ class TestBridgeDiscoveryIntegration:
class TestTCPTunnelIntegration:
"""Test bidirectional data forwarding through the bridge's tunnel layer."""
def test_data_forwarded_tcp_to_rns_link(self):
"""Data written to the TCP socket should be sent as an RNS.Packet."""
def test_data_forwarded_tcp_to_rns_buffer(self):
"""Data written to the TCP socket should be written to the RNS Buffer."""
bridge = _make_bridge()
bridge._running = True
# Create a real socket pair to simulate radicle-node ↔ bridge TCP
local, remote = socket.socketpair()
sent_data = []
mock_link = MagicMock()
mock_link.status = MagicMock()
mock_link.status.__eq__ = lambda s, other: other == "ACTIVE"
written_data = []
mock_buf = MagicMock()
mock_buf.write.side_effect = lambda d: written_data.append(bytes(d))
mock_packet = MagicMock()
from radicle_reticulum.bridge import TunnelConnection
tunnel = TunnelConnection(
tunnel_id=1,
tcp_socket=local,
rns_link=MagicMock(),
remote_destination=b"\x00" * 16,
buf=mock_buf,
)
with patch("radicle_reticulum.bridge.RNS.Link.ACTIVE", "ACTIVE"), \
patch("radicle_reticulum.bridge.RNS.Packet") as mock_pkt_cls, \
patch("radicle_reticulum.bridge.RNS.log"):
mock_pkt_cls.side_effect = lambda lnk, data: sent_data.append(data) or MagicMock()
mock_pkt_cls.ENCRYPTED_MDU = 383 # must be set so chunking uses the real value
from radicle_reticulum.bridge import TunnelConnection
tunnel = TunnelConnection(
tunnel_id=1,
tcp_socket=local,
rns_link=mock_link,
remote_destination=b"\x00" * 16,
)
# Write data from the "radicle-node" side
payload = b"hello from radicle-node"
remote.sendall(payload)
remote.close() # triggers EOF so forward loop exits
payload = b"hello from radicle-node"
remote.sendall(payload)
remote.close()
with patch("radicle_reticulum.bridge.RNS.log"):
bridge._forward_tcp_to_rns(tunnel)
local.close()
# Payload fits in one MDU chunk, so it should arrive as a single packet
assert sent_data == [payload], \
f"Expected [{payload!r}], got {sent_data}"
assert b"".join(written_data) == payload
def test_rns_data_forwarded_to_tcp_socket(self):
"""Data received from RNS should be written to the TCP socket."""
"""Data received from RNS Buffer should be written to the TCP socket."""
bridge = _make_bridge()
bridge._running = True
local, remote = socket.socketpair()
payload = b"hello from remote radicle-node"
mock_buf = MagicMock()
mock_buf.read.return_value = payload
from radicle_reticulum.bridge import TunnelConnection
tunnel = TunnelConnection(
tunnel_id=2,
tcp_socket=local,
rns_link=MagicMock(),
remote_destination=b"\x00" * 16,
buf=mock_buf,
)
with bridge._tunnels_lock:
bridge._tunnels[2] = tunnel
with patch("radicle_reticulum.bridge.RNS.log"):
# Create socket pair: bridge writes to `local`, test reads from `remote`
local, remote = socket.socketpair()
bridge._on_rns_data(2, len(payload))
from radicle_reticulum.bridge import TunnelConnection
tunnel = TunnelConnection(
tunnel_id=2,
tcp_socket=local,
rns_link=MagicMock(),
remote_destination=b"\x00" * 16,
)
with bridge._tunnels_lock:
bridge._tunnels[2] = tunnel
payload = b"hello from remote radicle-node"
bridge._on_rns_data(2, payload)
received = remote.recv(1024)
local.close()
remote.close()
received = remote.recv(1024)
local.close()
remote.close()
assert received == payload

View File

@ -1,196 +0,0 @@
"""Tests for RadicleLink (pure logic — no RNS networking required)."""
import threading
import time
from unittest.mock import MagicMock, patch
import pytest
from radicle_reticulum.link import RadicleLink, LinkState
def make_link(state: LinkState = LinkState.ACTIVE) -> tuple[RadicleLink, MagicMock]:
"""Create a RadicleLink with a mock RNS.Link."""
mock_rns_link = MagicMock()
mock_rns_link.rtt = None
link = RadicleLink(rns_link=mock_rns_link, state=state)
return link, mock_rns_link
class TestRadicleLinkState:
def test_active_link_is_active(self):
link, _ = make_link(LinkState.ACTIVE)
assert link.is_active
assert link.state == LinkState.ACTIVE
def test_pending_link_is_not_active(self):
link, _ = make_link(LinkState.PENDING)
assert not link.is_active
def test_on_established_sets_active(self):
link, _ = make_link(LinkState.PENDING)
link._on_established(MagicMock())
assert link.state == LinkState.ACTIVE
assert link.is_active
def test_on_closed_sets_closed(self):
link, _ = make_link(LinkState.ACTIVE)
link._on_closed(MagicMock())
assert link.state == LinkState.CLOSED
assert not link.is_active
def test_close_calls_teardown(self):
link, mock_rns = make_link(LinkState.ACTIVE)
link.close()
mock_rns.teardown.assert_called_once()
assert link.state == LinkState.CLOSED
def test_close_on_inactive_link_is_noop(self):
link, mock_rns = make_link(LinkState.CLOSED)
link.close()
mock_rns.teardown.assert_not_called()
def test_repr(self):
link, _ = make_link(LinkState.ACTIVE)
r = repr(link)
assert "active" in r
assert "RadicleLink" in r
class TestRadicleLinkSend:
def test_send_on_active_link_succeeds(self):
link, _ = make_link(LinkState.ACTIVE)
with patch("radicle_reticulum.link.RNS.Packet") as mock_packet_cls:
mock_packet = MagicMock()
mock_packet_cls.return_value = mock_packet
result = link.send(b"hello world")
assert result is True
mock_packet.send.assert_called_once()
def test_send_on_inactive_link_returns_false(self):
link, _ = make_link(LinkState.CLOSED)
result = link.send(b"data")
assert result is False
def test_send_on_pending_link_returns_false(self):
link, _ = make_link(LinkState.PENDING)
result = link.send(b"data")
assert result is False
def test_send_exception_returns_false(self):
link, _ = make_link(LinkState.ACTIVE)
with patch("radicle_reticulum.link.RNS.Packet") as mock_packet_cls:
mock_packet_cls.side_effect = RuntimeError("send failed")
result = link.send(b"data")
assert result is False
class TestRadicleLinkRecv:
def test_recv_returns_buffered_data(self):
link, _ = make_link()
link._on_packet(b"buffered", MagicMock())
result = link.recv(timeout=0.1)
assert result == b"buffered"
def test_recv_returns_in_order(self):
link, _ = make_link()
for i in range(3):
link._on_packet(f"msg{i}".encode(), MagicMock())
assert link.recv(timeout=0.1) == b"msg0"
assert link.recv(timeout=0.1) == b"msg1"
assert link.recv(timeout=0.1) == b"msg2"
def test_recv_timeout_returns_none(self):
link, _ = make_link()
start = time.time()
result = link.recv(timeout=0.05)
elapsed = time.time() - start
assert result is None
assert elapsed >= 0.04
def test_recv_woken_by_data(self):
link, _ = make_link()
results = []
def delayed_send():
time.sleep(0.05)
link._on_packet(b"delayed", MagicMock())
t = threading.Thread(target=delayed_send, daemon=True)
t.start()
result = link.recv(timeout=1.0)
t.join()
assert result == b"delayed"
def test_recv_returns_none_when_closed(self):
link, _ = make_link()
link._on_closed(MagicMock())
result = link.recv(timeout=0.1)
assert result is None
def test_recv_woken_by_close(self):
"""recv() unblocks when link closes while waiting."""
link, _ = make_link()
result_holder = []
def close_after_delay():
time.sleep(0.05)
link._on_closed(MagicMock())
t = threading.Thread(target=close_after_delay, daemon=True)
t.start()
result = link.recv(timeout=2.0)
t.join()
assert result is None
def test_on_packet_calls_on_data_callback(self):
link, _ = make_link()
received = []
link.on_data = received.append
link._on_packet(b"callback data", MagicMock())
assert received == [b"callback data"]
def test_on_closed_calls_on_close_callback(self):
link, _ = make_link()
called = []
link.on_close = lambda: called.append(True)
link._on_closed(MagicMock())
assert called == [True]
class TestRadicleLinkProperties:
def test_rtt_returns_none_when_unavailable(self):
link, mock_rns = make_link()
mock_rns.rtt = None
assert link.rtt is None
def test_rtt_returns_value_when_available(self):
link, mock_rns = make_link()
mock_rns.rtt = 0.15
assert link.rtt == pytest.approx(0.15)
def test_remote_identity_delegates_to_rns(self):
link, mock_rns = make_link()
fake_id = MagicMock()
mock_rns.get_remote_identity.return_value = fake_id
assert link.remote_identity is fake_id
class TestRadicleLinkFactories:
def test_from_incoming_is_active(self):
mock_rns = MagicMock()
link = RadicleLink.from_incoming(mock_rns)
assert link.state == LinkState.ACTIVE
def test_from_incoming_does_not_set_established_callback(self):
mock_rns = MagicMock()
RadicleLink.from_incoming(mock_rns)
mock_rns.set_link_established_callback.assert_not_called()
def test_create_outbound_is_pending(self):
mock_dest = MagicMock()
with patch("radicle_reticulum.link.RNS.Link") as mock_link_cls:
mock_rns = MagicMock()
mock_link_cls.return_value = mock_rns
link = RadicleLink.create_outbound(mock_dest)
assert link.state == LinkState.PENDING

View File

@ -1,192 +0,0 @@
"""Tests for message framing."""
import pytest
import time
from radicle_reticulum.messages import (
MessageType,
MessageHeader,
NodeAnnouncement,
InventoryAnnouncement,
RefAnnouncement,
Ping,
Pong,
decode_message,
HEADER_SIZE,
)
class TestMessageHeader:
"""Test message header encoding/decoding."""
def test_encode_decode_roundtrip(self):
"""Test header encode/decode roundtrip."""
header = MessageHeader(
msg_type=MessageType.NODE_ANNOUNCEMENT,
timestamp=1234567890123,
payload_length=42,
)
encoded = header.encode()
assert len(encoded) == HEADER_SIZE
decoded = MessageHeader.decode(encoded)
assert decoded.msg_type == header.msg_type
assert decoded.timestamp == header.timestamp
assert decoded.payload_length == header.payload_length
def test_header_too_short(self):
"""Test that short data raises error."""
with pytest.raises(ValueError, match="Header too short"):
MessageHeader.decode(b"\x00\x01")
class TestNodeAnnouncement:
"""Test NodeAnnouncement message."""
def test_encode_decode_roundtrip(self):
"""Test encode/decode roundtrip."""
msg = NodeAnnouncement(
node_id="did:key:z6MkhaXgBZDvotDkL5257faiztiGiC2QtKLGpbnnEGta2doK",
features=0x0003,
version=1,
)
encoded = msg.encode()
decoded = NodeAnnouncement.decode(encoded)
assert decoded.node_id == msg.node_id
assert decoded.features == msg.features
assert decoded.version == msg.version
def test_to_message_includes_header(self):
"""Test that to_message includes proper header."""
msg = NodeAnnouncement(node_id="did:key:z6Mk...")
full_message = msg.to_message()
header = MessageHeader.decode(full_message)
assert header.msg_type == MessageType.NODE_ANNOUNCEMENT
assert header.timestamp > 0
assert header.payload_length == len(msg.encode())
class TestInventoryAnnouncement:
"""Test InventoryAnnouncement message."""
def test_encode_decode_roundtrip(self):
"""Test encode/decode roundtrip."""
msg = InventoryAnnouncement(
node_id="did:key:z6MkhaXgBZDvotDkL5257faiztiGiC2QtKLGpbnnEGta2doK",
repositories=[
"rad:z3gqcJUoA1n9HaHKufZs5FCSGazv5",
"rad:z4gqcJUoA1n9HaHKufZs5FCSGazv6",
],
)
encoded = msg.encode()
decoded = InventoryAnnouncement.decode(encoded)
assert decoded.node_id == msg.node_id
assert decoded.repositories == msg.repositories
def test_empty_repositories(self):
"""Test with empty repository list."""
msg = InventoryAnnouncement(
node_id="did:key:z6Mk...",
repositories=[],
)
encoded = msg.encode()
decoded = InventoryAnnouncement.decode(encoded)
assert decoded.repositories == []
class TestRefAnnouncement:
"""Test RefAnnouncement message."""
def test_encode_decode_roundtrip(self):
"""Test encode/decode roundtrip."""
msg = RefAnnouncement(
repository_id="rad:z3gqcJUoA1n9HaHKufZs5FCSGazv5",
ref_name="refs/heads/main",
old_oid=b"\x00" * 20,
new_oid=bytes.fromhex("abc123def456789012345678901234567890abcd"),
signature=b"fake_signature_bytes",
)
encoded = msg.encode()
decoded = RefAnnouncement.decode(encoded)
assert decoded.repository_id == msg.repository_id
assert decoded.ref_name == msg.ref_name
assert decoded.old_oid == msg.old_oid
assert decoded.new_oid == msg.new_oid
assert decoded.signature == msg.signature
class TestPingPong:
"""Test Ping/Pong messages."""
def test_ping_encode_decode(self):
"""Test Ping encode/decode."""
ping = Ping()
encoded = ping.encode()
decoded = Ping.decode(encoded)
assert decoded.nonce == ping.nonce
def test_pong_echoes_nonce(self):
"""Test Pong echoes ping nonce."""
ping = Ping()
pong = Pong(nonce=ping.nonce)
assert pong.nonce == ping.nonce
class TestDecodeMessage:
"""Test the decode_message function."""
def test_decode_node_announcement(self):
"""Test decoding a NodeAnnouncement message."""
msg = NodeAnnouncement(node_id="did:key:z6Mk...")
full_message = msg.to_message()
header, decoded = decode_message(full_message)
assert header.msg_type == MessageType.NODE_ANNOUNCEMENT
assert isinstance(decoded, NodeAnnouncement)
assert decoded.node_id == msg.node_id
def test_decode_inventory_announcement(self):
"""Test decoding an InventoryAnnouncement message."""
msg = InventoryAnnouncement(
node_id="did:key:z6Mk...",
repositories=["repo1", "repo2"],
)
full_message = msg.to_message()
header, decoded = decode_message(full_message)
assert header.msg_type == MessageType.INVENTORY_ANNOUNCEMENT
assert isinstance(decoded, InventoryAnnouncement)
assert decoded.repositories == msg.repositories
def test_decode_ping(self):
"""Test decoding a Ping message."""
ping = Ping()
full_message = ping.to_message()
header, decoded = decode_message(full_message)
assert header.msg_type == MessageType.PING
assert isinstance(decoded, Ping)
def test_unknown_message_type_raises(self):
"""Test that unknown message types raise error."""
# Create a message with invalid type
import struct
bad_message = struct.pack("!BQH", 0xFF, 0, 0)
with pytest.raises(ValueError, match="Unknown message type"):
decode_message(bad_message)