Compare commits


No commits in common. "a14f39f93a01300ef169b5650c3f4a0e40acc6e6" and "f71b87e34ae4855a4d1b3cf46a82ba453800989e" have entirely different histories.

14 changed files with 2346 additions and 649 deletions

README.md

@@ -1,104 +1,208 @@
# radicle-reticulum
Bridges [Radicle](https://radicle.xyz) (decentralized Git) over [Reticulum](https://reticulum.network) mesh networking — LoRa, packet radio, serial, I2P, and more. Enables offline-first code collaboration without internet infrastructure.
**Why:** Radicle requires publicly reachable seed nodes; Reticulum routes over any physical medium. Both use Ed25519 keys — a natural fit.
---
## Prerequisites
Both machines need:
- [Radicle](https://radicle.xyz/install) (`rad` CLI + `radicle-node`)
- [uv](https://docs.astral.sh/uv/getting-started/installation/) (Python package manager)
- Git
---
## Install
On **each machine**, clone this repo and install:
```sh
git clone rad:z4NMdcKbw2TETQ56fbQfbibFHtZqZ  # via radicle
# or: git clone https://github.com/youruser/radicle-reticulum
cd radicle-reticulum
uv sync
```
Optional — faster push detection on Linux/macOS via filesystem events:
```sh
uv sync --extra watch
```
---
## Setup: connect two machines over mesh
Do this once on each machine.
### Step 1 — Configure radicle-node to listen on localhost
Edit `~/.radicle/config.json`, find the `"node"` section, set `"listen"`:
```json
"node": {
  "listen": ["127.0.0.1:8776"],
  ...
}
```
Then (re)start radicle-node:
```sh
rad node start
rad node status  # confirm: "listening for inbound connections on 127.0.0.1:8776"
```
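Provisioning several machines? The edit above is a one-key JSON patch; a minimal sketch (the `set_listen` helper is ours, not part of this repo):

```python
import json
from pathlib import Path

def set_listen(config_path, addr="127.0.0.1:8776"):
    """Point "node.listen" at localhost, preserving all other settings."""
    path = Path(config_path)
    config = json.loads(path.read_text())
    config.setdefault("node", {})["listen"] = [addr]
    path.write_text(json.dumps(config, indent=2) + "\n")
    return config
```

For example, `set_listen(Path.home() / ".radicle" / "config.json")` before restarting the node.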
### Step 2 — Start the bridge on both machines
Run on each machine:
```sh
uv run radicle-rns bridge
```
Within ~30 seconds the bridges discover each other via RNS announce, connect automatically, and register each other's NIDs with radicle-node. You'll see:
```
[+] Discovered bridge: <hash> (NID: z6Mk...)
[Status] Tunnels: 0, Remote bridges: 1, TX: 0, RX: 0
```
Once radicle-node connects through the bridge, tunnels open automatically:
```
Tunnel 1 opened / Incoming tunnel 1 opened
[Status] Tunnels: 1, Remote bridges: 1, TX: 1551, RX: 1831
```
Bytes in TX/RX confirm radicle gossip is flowing over the mesh.
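Discovery works because each bridge tags its RNS announces with a magic prefix (`RADICLE_BRIDGE_V1`, from `bridge.py`) and ignores announces that lack it. A minimal sketch of that filter (the helper name is ours):

```python
BRIDGE_APP_DATA_MAGIC = b"RADICLE_BRIDGE_V1"  # same tag bridge.py puts in its announces

def parse_bridge_announce(app_data):
    """Return the payload after the magic prefix, or None for foreign announces."""
    if app_data is None or not app_data.startswith(BRIDGE_APP_DATA_MAGIC):
        return None  # some other Reticulum application's announce
    return app_data[len(BRIDGE_APP_DATA_MAGIC):]
```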
> **LoRa note:** The bridge re-announces at t+5s, t+15s, t+30s after startup. On LoRa,
> use `--announce-retry-delays 60,300,900` to respect duty cycle limits.
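The delay list is plain comma-separated seconds, parsed the way `_parse_delays` in `cli.py` does:

```python
def parse_delays(spec: str):
    """Parse a --announce-retry-delays value like "60,300,900" into seconds."""
    return tuple(int(x.strip()) for x in spec.split(",") if x.strip())
```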
---
## Share a repository
### Machine A — init and push a repo
```sh
mkdir myproject && cd myproject
git init
git commit --allow-empty -m "init"
rad init --name myproject --description "my project" --default-branch main
```
`rad init` prints the repository ID (`rad:z3...`). Share it with Machine B.
```sh
# make a commit and push
echo "hello" > hello.txt
git add . && git commit -m "hello"
rad push
```
### Machine B — clone it
```sh
rad clone rad:z3... # use the RID from Machine A
cd myproject
cat hello.txt # hello
```
### Machine A — fetch updates from Machine B
```sh
# Machine B: edit, commit, push
echo "world" >> hello.txt
git add . && git commit -m "world"
rad push
# Machine A:
rad fetch
git pull
```
---
## Commands
```
radicle-rns bridge # TCP↔RNS bridge (main command)
radicle-rns node # lightweight peer-announce node
radicle-rns peers # discover peers on the mesh
radicle-rns ping <hash> # RTT probe to a peer
radicle-rns identity generate # create/show identity
radicle-rns sync <repo> # LXMF store-and-forward sync
radicle-rns bundle create <repo> # pack repo into a bundle file
radicle-rns bundle apply <bundle> <repo> # unpack a bundle into a repo
radicle-rns bundle info <bundle> # inspect bundle metadata
radicle-rns bundle qr-encode <bundle> # print ASCII QR (≤2953 bytes)
radicle-rns bundle qr-decode <image.png> # decode QR back to bundle
```
Global flags: `-v` verbose logging, `--identity PATH` (default `~/.radicle-rns/identity`).
### Bridge flags
| Flag | Default | Description |
|------|---------|-------------|
| `-l, --listen-port` | 8777 | TCP port radicle-node connects to |
| `--radicle-port` | 8776 | Port radicle-node listens on |
| `-c, --connect <hash>` | — | Connect to a remote bridge by RNS hash |
| `--nid <NID>` | auto-detect | Override local radicle NID |
| `--no-auto-connect` | — | Disable auto-connect on discovery |
| `--no-auto-seed` | — | Disable auto-registering remote NIDs |
| `--announce-retry-delays` | 5,15,30 | Startup re-announce delays (seconds, comma-separated) |
---
## Air-gapped / QR transfer
For links too slow even for Reticulum, transfer tiny incremental bundles via QR code (max 2953 bytes):
```sh
# Sender — create a small incremental bundle and encode it
uv run radicle-rns bundle create ./myrepo --incremental --basis myrepo.refs.json
uv run radicle-rns bundle qr-encode myrepo-*.radicle-bundle # prints ASCII QR to terminal
# Receiver — photograph the QR, then decode and apply
uv run radicle-rns bundle qr-decode qr-photo.png -o received.radicle-bundle
uv run radicle-rns bundle apply received.radicle-bundle ./myrepo
```
QR image output (PNG) and image decoding require optional deps:
```sh
uv sync --extra qr # qrcode for PNG output
pip install pillow pyzbar # for qr-decode from image file
```
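The 2953-byte ceiling is the binary capacity of a version-40 QR code at error-correction level L. Larger bundles would have to be split across several codes; a sketch of such chunking (hypothetical helper, not a current CLI feature):

```python
QR_MAX_BYTES = 2953  # version-40 QR code, byte mode, EC level L

def chunk_for_qr(data: bytes, limit: int = QR_MAX_BYTES):
    """Split a bundle into QR-sized pieces; join them in order to reassemble."""
    return [data[i:i + limit] for i in range(0, len(data), limit)]
```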
---
## Architecture
```
radicle-node ──TCP:8777── RadicleBridge ──RNS Link── RadicleBridge ──TCP:8776── radicle-node
(Machine A) (Machine A) (Machine B) (Machine B)
```
- **Identity** (`identity.py`) — Ed25519 DID ↔ RNS destination; saved to `~/.radicle-rns/identity`
- **Bridge** (`bridge.py`) — TCP↔RNS tunnel, announces itself, discovers peers
- **SyncManager** (`sync.py`) — LXMF store-and-forward bundles; auto-pushes on refs announce
- **AdaptiveSyncManager** (`adaptive.py`) — selects FULL/INCREMENTAL/MINIMAL/QR by RTT + throughput
- **GitBundle** (`git_bundle.py`) — full and incremental Git bundles for delay-tolerant transfer
- **QR** (`qr.py`) — visual air-gap transfer for tiny bundles
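Per tunnel, the bridge's TCP side is a select-and-copy loop; stripped of the RNS half it reduces to a few lines. A standalone sketch (plain sockets stand in for the mesh side):

```python
import select
import socket

def pump(src: socket.socket, dst: socket.socket, chunk: int = 32768) -> int:
    """Copy bytes src -> dst until EOF, as the bridge does per tunnel direction."""
    total = 0
    while True:
        readable, _, errored = select.select([src], [], [src], 1.0)
        if errored:
            break
        if readable:
            data = src.recv(chunk)
            if not data:  # peer closed its end
                break
            dst.sendall(data)
            total += len(data)
    return total
```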
---
## Development
```sh
uv run pytest # 158 tests
uv run pytest -x -q # stop on first failure
```
---
## Reticulum interfaces
On the same LAN, Reticulum auto-discovers peers via UDP multicast — no config needed. For LoRa / serial / I2P, edit `~/.reticulum/config`:
```ini
[[lora_interface]]
```
@@ -110,65 +214,4 @@ For LoRa, serial, or I2P, edit `~/.reticulum/config`:
```ini
codingrate = 5
```
See [Reticulum docs](https://reticulum.network/manual/) for the full interface list.
Initial clones over LoRa are impractical (pack objects can be megabytes; LoRa is ~15 kbps). Clone over a fast link first, then sync incrementally over the mesh.
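The arithmetic behind that advice, as a best-case estimate (it ignores LoRa duty-cycle limits and protocol overhead, so real transfers are slower):

```python
def transfer_seconds(num_bytes: int, link_bps: int = 15_000) -> float:
    """Best-case transfer time: payload bits over raw link rate."""
    return num_bytes * 8 / link_bps
```

A 1 MB pack needs roughly nine minutes of continuous airtime at 15 kbps, while a 2953-byte QR-sized bundle fits in under two seconds.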


@@ -1,12 +1,34 @@
"""Radicle transport adapter for Reticulum mesh networking.""" """Radicle transport adapter for Reticulum mesh networking."""
from radicle_reticulum.identity import RadicleIdentity from radicle_reticulum.identity import RadicleIdentity
from radicle_reticulum.adapter import RNSTransportAdapter
from radicle_reticulum.link import RadicleLink
from radicle_reticulum.messages import (
MessageType,
NodeAnnouncement,
InventoryAnnouncement,
RefAnnouncement,
Ping,
Pong,
decode_message,
)
from radicle_reticulum.bridge import RadicleBridge from radicle_reticulum.bridge import RadicleBridge
from radicle_reticulum.gossip import GossipRelay from radicle_reticulum.gossip import GossipRelay
from radicle_reticulum.seed import SeedNode
__version__ = "0.1.0" __version__ = "0.1.0"
__all__ = [ __all__ = [
"RadicleIdentity", "RadicleIdentity",
"RNSTransportAdapter",
"RadicleLink",
"MessageType",
"NodeAnnouncement",
"InventoryAnnouncement",
"RefAnnouncement",
"Ping",
"Pong",
"decode_message",
"RadicleBridge", "RadicleBridge",
"GossipRelay", "GossipRelay",
"SeedNode",
] ]


@@ -0,0 +1,331 @@
"""RNS Transport Adapter for Radicle.
This is the core adapter that allows Radicle to use Reticulum as a transport.
It provides:
- Destination registration and announcement
- Incoming connection handling
- Outbound connection establishment
- Peer discovery via RNS announcements
"""
import threading
import time
from dataclasses import dataclass, field
from typing import Callable, Dict, List, Optional, Set
import RNS
from radicle_reticulum.identity import RadicleIdentity
from radicle_reticulum.link import RadicleLink, LinkState
# Radicle app name for RNS destination
APP_NAME = "radicle"
# Aspect for node connections
ASPECT_NODE = "node"
# Aspect for repository-specific endpoints
ASPECT_REPO = "repo"
# App data identifiers for filtering announces
NODE_APP_DATA_MAGIC = b"RADICLE_NODE_V1"
REPO_APP_DATA_MAGIC = b"RADICLE_REPO_V1"
@dataclass
class PeerInfo:
"""Information about a discovered peer."""
identity: RadicleIdentity
destination_hash: bytes
last_seen: float
announced_repos: Set[str] = field(default_factory=set)
@property
def age(self) -> float:
"""Seconds since last seen."""
return time.time() - self.last_seen
class RNSTransportAdapter:
"""Reticulum transport adapter for Radicle.
Provides the bridge between Radicle's network layer and Reticulum.
Manages identity, destinations, links, and peer discovery.
"""
def __init__(
self,
identity: Optional[RadicleIdentity] = None,
config_path: Optional[str] = None,
):
"""Initialize the adapter.
Args:
identity: Radicle identity to use. Generated if not provided.
config_path: Path to Reticulum config. Uses default if not provided.
"""
# Initialize Reticulum
self.reticulum = RNS.Reticulum(config_path)
# Set up identity
if identity is None:
identity = RadicleIdentity.generate()
self.identity = identity
# Create main node destination
self.node_destination = RNS.Destination(
self.identity.rns_identity,
RNS.Destination.IN,
RNS.Destination.SINGLE,
APP_NAME,
ASPECT_NODE,
)
# Set up link handling
self.node_destination.set_link_established_callback(self._on_incoming_link)
# Peer tracking
self._peers: Dict[bytes, PeerInfo] = {}
self._peers_lock = threading.Lock()
# Active links
self._links: Dict[bytes, RadicleLink] = {}
self._links_lock = threading.Lock()
# Callbacks
self._on_peer_discovered: Optional[Callable[[PeerInfo], None]] = None
self._on_incoming_connection: Optional[Callable[[RadicleLink], None]] = None
# Repository destinations (hash -> destination)
self._repo_destinations: Dict[str, RNS.Destination] = {}
RNS.log(f"RNS Transport Adapter initialized", RNS.LOG_INFO)
RNS.log(f" Node ID: {self.identity.did}", RNS.LOG_INFO)
RNS.log(f" RNS Hash: {self.identity.rns_hash_hex}", RNS.LOG_INFO)
def start(self):
"""Start the adapter and begin announcing presence."""
# Register announce handler to discover peers
RNS.Transport.register_announce_handler(self._handle_announce)
# Announce our node destination
self.announce()
RNS.log("RNS Transport Adapter started", RNS.LOG_INFO)
def stop(self):
"""Stop the adapter and clean up."""
# Close all active links
with self._links_lock:
for link in list(self._links.values()):
link.close()
self._links.clear()
RNS.log("RNS Transport Adapter stopped", RNS.LOG_INFO)
def announce(self, app_data: Optional[bytes] = None):
"""Announce this node's presence on the network.
Args:
app_data: Optional additional application data to include.
Could include repository list, capabilities, etc.
"""
# Prepend node magic for filtering, then append any extra app_data
full_app_data = NODE_APP_DATA_MAGIC
if app_data:
full_app_data += app_data
self.node_destination.announce(app_data=full_app_data)
RNS.log(f"Announced node: {self.identity.rns_hash_hex}", RNS.LOG_DEBUG)
def announce_repository(self, repo_id: str, repo_data: Optional[bytes] = None):
"""Announce availability of a specific repository.
Args:
repo_id: Repository identifier (typically a hash or DID).
repo_data: Optional metadata about the repository.
"""
if repo_id not in self._repo_destinations:
# Create destination for this repository
dest = RNS.Destination(
self.identity.rns_identity,
RNS.Destination.IN,
RNS.Destination.SINGLE,
APP_NAME,
ASPECT_REPO,
repo_id,
)
dest.set_link_established_callback(
lambda link: self._on_incoming_link(link, repo_id)
)
self._repo_destinations[repo_id] = dest
# Prepend repo magic for filtering
full_app_data = REPO_APP_DATA_MAGIC
if repo_data:
full_app_data += repo_data
self._repo_destinations[repo_id].announce(app_data=full_app_data)
RNS.log(f"Announced repository: {repo_id}", RNS.LOG_DEBUG)
def connect(
self,
destination_hash: bytes,
timeout: float = 30.0,
) -> Optional[RadicleLink]:
"""Connect to a peer by destination hash.
Args:
destination_hash: 16-byte RNS destination hash.
timeout: Connection timeout in seconds.
Returns:
RadicleLink if connection successful, None otherwise.
"""
# Check if we already have a path to this destination
if not RNS.Transport.has_path(destination_hash):
RNS.Transport.request_path(destination_hash)
# Wait for path to be established
deadline = time.time() + timeout
while not RNS.Transport.has_path(destination_hash):
if time.time() > deadline:
RNS.log(f"Path request timeout: {destination_hash.hex()}", RNS.LOG_WARNING)
return None
time.sleep(0.1)
# Get the destination
identity = RNS.Identity.recall(destination_hash)
if identity is None:
RNS.log(f"Could not recall identity for: {destination_hash.hex()}", RNS.LOG_WARNING)
return None
destination = RNS.Destination(
identity,
RNS.Destination.OUT,
RNS.Destination.SINGLE,
APP_NAME,
ASPECT_NODE,
)
# Create link
link = RadicleLink.create_outbound(destination)
# Wait for link to establish
deadline = time.time() + timeout
while link.state == LinkState.PENDING:
if time.time() > deadline:
RNS.log(f"Link timeout: {destination_hash.hex()}", RNS.LOG_WARNING)
return None
time.sleep(0.1)
if link.state != LinkState.ACTIVE:
return None
# Track the link
with self._links_lock:
self._links[destination_hash] = link
return link
def connect_to_peer(
self,
peer: PeerInfo,
timeout: float = 30.0,
) -> Optional[RadicleLink]:
"""Connect to a discovered peer.
Args:
peer: PeerInfo from peer discovery.
timeout: Connection timeout in seconds.
"""
return self.connect(peer.destination_hash, timeout)
def get_peers(self) -> List[PeerInfo]:
"""Get list of discovered peers."""
with self._peers_lock:
return list(self._peers.values())
def get_peer_by_did(self, did: str) -> Optional[PeerInfo]:
"""Look up a peer by their Radicle DID."""
with self._peers_lock:
for peer in self._peers.values():
if peer.identity.did == did:
return peer
return None
def set_on_peer_discovered(self, callback: Callable[[PeerInfo], None]):
"""Set callback for peer discovery events."""
self._on_peer_discovered = callback
def set_on_incoming_connection(self, callback: Callable[[RadicleLink], None]):
"""Set callback for incoming connections."""
self._on_incoming_connection = callback
def _handle_announce(
self,
destination_hash: bytes,
announced_identity: RNS.Identity,
app_data: Optional[bytes],
) -> None:
"""Handle incoming announce from another node."""
# Ignore our own announcements
if destination_hash == self.node_destination.hash:
return
# Filter: only accept announces with node magic in app_data
if app_data is None or not app_data.startswith(NODE_APP_DATA_MAGIC):
# Not a radicle node announce, ignore
return
# Create RadicleIdentity from announced identity
try:
radicle_id = RadicleIdentity.from_rns_identity(announced_identity)
except Exception as e:
RNS.log(f"Failed to create RadicleIdentity from announce: {e}", RNS.LOG_WARNING)
return
# Update or create peer info
with self._peers_lock:
if destination_hash in self._peers:
peer = self._peers[destination_hash]
peer.last_seen = time.time()
else:
peer = PeerInfo(
identity=radicle_id,
destination_hash=destination_hash,
last_seen=time.time(),
)
self._peers[destination_hash] = peer
RNS.log(f"Discovered peer: {radicle_id.did}", RNS.LOG_INFO)
# Notify callback
if self._on_peer_discovered:
self._on_peer_discovered(peer)
def _on_incoming_link(self, link: RNS.Link, repo_id: Optional[str] = None):
"""Handle incoming link establishment."""
radicle_link = RadicleLink.from_incoming(link)
# Track the link
remote_hash = link.get_remote_identity().hash if link.get_remote_identity() else None
if remote_hash:
with self._links_lock:
self._links[remote_hash] = radicle_link
RNS.log(f"Incoming connection established", RNS.LOG_INFO)
# Notify callback
if self._on_incoming_connection:
self._on_incoming_connection(radicle_link)
@property
def node_hash(self) -> bytes:
"""Get this node's destination hash."""
return self.node_destination.hash
@property
def node_hash_hex(self) -> str:
"""Get this node's destination hash as hex."""
return self.node_destination.hexhash


@@ -15,7 +15,6 @@ The bridge:
4. Remote bridges forward to their local radicle-node
"""
import json
import os
import socket
@@ -26,7 +25,7 @@ import threading
import time
from dataclasses import dataclass, field
from pathlib import Path
from typing import Callable, Dict, List, Optional, Set, Tuple
import RNS
@@ -44,14 +43,9 @@ ASPECT_BRIDGE = "bridge"
# App data identifier for bridge announces (used for filtering)
BRIDGE_APP_DATA_MAGIC = b"RADICLE_BRIDGE_V1"
# Buffer sizes
TCP_BUFFER_SIZE = 65536
RNS_BUFFER_SIZE = 32768  # Smaller for RNS to avoid fragmentation

@dataclass
@@ -65,16 +59,10 @@ class TunnelConnection:
    bytes_sent: int = 0
    bytes_received: int = 0
    active: bool = True

    def close(self):
        """Close the tunnel."""
        self.active = False
        if self.tcp_socket:
            try:
                self.tcp_socket.close()
@@ -87,17 +75,6 @@ class TunnelConnection:
            pass
class RadicleBridge:
    """Bridges Radicle TCP connections over Reticulum.
@@ -199,7 +176,7 @@ class RadicleBridge:
        self._running = True
        # Register announce handler to discover other bridges
        RNS.Transport.register_announce_handler(self._handle_announce)
        RNS.log("Registered announce handler for bridge discovery", RNS.LOG_INFO)
        # Load persisted NIDs first so radicle-node is ready for reconnects
@@ -326,7 +303,7 @@ class RadicleBridge:
        link = RNS.Link(remote_dest)
        deadline = time.time() + half
        while link.status != RNS.Link.ACTIVE:
            if link.status in (RNS.Link.CLOSED, RNS.Link.FAILED):
                return None
            if time.time() > deadline:
                return None
@@ -494,9 +471,9 @@
        # Wait for link establishment (Noise XK handshake)
        deadline = time.time() + 30.0
        while rns_link.status != RNS.Link.ACTIVE:
            if rns_link.status in (RNS.Link.CLOSED, RNS.Link.FAILED):
                RNS.log(
                    "Link closed/failed before becoming active",
                    RNS.LOG_WARNING,
                )
                tcp_socket.close()
@@ -526,12 +503,8 @@
        if self._tunnel_opened_cb:
            self._tunnel_opened_cb(tunnel)
        rns_link.set_packet_callback(
            lambda data, pkt: self._on_rns_data(tunnel_id, data)
        )
        rns_link.set_link_closed_callback(
            lambda link: self._on_tunnel_closed(tunnel_id)
@@ -540,49 +513,77 @@
        self._forward_tcp_to_rns(tunnel)
    def _forward_tcp_to_rns(self, tunnel: TunnelConnection):
        """Forward data from TCP socket to RNS link."""
        tcp_socket = tunnel.tcp_socket
        tcp_socket.setblocking(False)
        while tunnel.active and self._running:
            try:
                rns_link = tunnel.rns_link  # read each iteration: may be updated by reconnect
                readable, _, errored = select.select([tcp_socket], [], [tcp_socket], 1.0)
                if errored:
                    break
                if readable:
                    data = tcp_socket.recv(RNS_BUFFER_SIZE)
                    if not data:
                        break
                    if rns_link.status == RNS.Link.ACTIVE:
                        packet = RNS.Packet(rns_link, data)
                        packet.send()
                        tunnel.bytes_sent += len(data)
                    elif tunnel.remote_destination:
                        RNS.log(
                            f"Tunnel {tunnel.tunnel_id}: link dropped, reconnecting...",
                            RNS.LOG_WARNING,
                        )
                        new_link = self._reconnect_link(tunnel.remote_destination)
                        if new_link is None:
                            RNS.log(
                                f"Tunnel {tunnel.tunnel_id}: reconnect failed",
                                RNS.LOG_WARNING,
                            )
                            break
                        RNS.log(
                            f"Tunnel {tunnel.tunnel_id}: reconnected",
                            RNS.LOG_INFO,
                        )
                        tunnel.rns_link = new_link
                        tid = tunnel.tunnel_id
                        new_link.set_packet_callback(
                            lambda d, p: self._on_rns_data(tid, d)
                        )
                        new_link.set_link_closed_callback(
                            lambda l: self._on_tunnel_closed(tid)
                        )
                        packet = RNS.Packet(new_link, data)
                        packet.send()
                        tunnel.bytes_sent += len(data)
                    else:
                        break
            except socket.error:
                break
            except Exception as e:
                RNS.log(f"Forward error: {e}", RNS.LOG_DEBUG)
                break
        self._on_tunnel_closed(tunnel.tunnel_id)
    def _on_rns_data(self, tunnel_id: int, data: bytes):
        """Handle data received from RNS link."""
        with self._tunnels_lock:
            tunnel = self._tunnels.get(tunnel_id)
            if tunnel and tunnel.active and tunnel.tcp_socket:
                try:
                    tunnel.tcp_socket.sendall(data)
                    tunnel.bytes_received += len(data)
                except Exception as e:
                    RNS.log(f"TCP send error: {e}", RNS.LOG_DEBUG)
                    self._on_tunnel_closed(tunnel_id)

    def _on_tunnel_closed(self, tunnel_id: int):
        """Handle tunnel closure."""
@@ -627,12 +628,8 @@
        RNS.log(f"Incoming tunnel {tunnel_id} opened", RNS.LOG_INFO)
        link.set_packet_callback(
            lambda data, pkt: self._on_rns_data(tunnel_id, data)
        )
        link.set_link_closed_callback(
            lambda l: self._on_tunnel_closed(tunnel_id)


@@ -13,9 +13,11 @@ DEFAULT_IDENTITY_PATH = Path.home() / ".radicle-rns" / "identity"
import RNS
from radicle_reticulum.adapter import RNSTransportAdapter, PeerInfo
from radicle_reticulum.identity import RadicleIdentity
from radicle_reticulum.bridge import RadicleBridge
from radicle_reticulum.gossip import GossipRelay
from radicle_reticulum.seed import SeedNode, DEFAULT_SEED_HOME, DEFAULT_SEED_PORT

def detect_radicle_nid() -> Optional[str]:
@@ -47,6 +49,24 @@ def detect_radicle_nid() -> Optional[str]:
    return None
LORA_ANNOUNCE_DELAYS = "60,300,900"
LORA_POLL_INTERVAL = 120
def _apply_lora_defaults(args) -> None:
"""Override delay/poll defaults with LoRa-safe values when --lora is set.
Only overrides fields that were left at their defaults (argparse default
strings), so explicit user flags always take precedence.
"""
if not getattr(args, "lora", False):
return
if getattr(args, "announce_retry_delays", None) == "5,15,30":
args.announce_retry_delays = LORA_ANNOUNCE_DELAYS
if getattr(args, "poll_interval", None) == 30:
args.poll_interval = LORA_POLL_INTERVAL
def _parse_delays(s: str) -> Tuple[int, ...]:
    try:
        return tuple(int(x.strip()) for x in s.split(",") if x.strip())
@@ -55,6 +75,61 @@ def _parse_delays(s: str) -> Tuple[int, ...]:
        sys.exit(1)
def on_peer_discovered(peer: PeerInfo):
"""Callback when a new peer is discovered."""
print(f"[+] Discovered peer: {peer.identity.did}")
print(f" RNS hash: {peer.destination_hash.hex()}")
def cmd_node(args):
"""Run a Radicle-RNS node."""
print("Starting Radicle-RNS node...")
identity = RadicleIdentity.load_or_generate(args.identity)
_print_identity_info(args.identity)
# Create adapter
adapter = RNSTransportAdapter(identity=identity)
adapter.set_on_peer_discovered(on_peer_discovered)
print(f"Node ID: {identity.did}")
print(f"RNS Hash: {adapter.node_hash_hex}")
print()
# Start adapter
adapter.start()
print("Node running. Press Ctrl+C to stop.")
print("Announcing every 60 seconds...")
print()
# Handle graceful shutdown
running = True
def signal_handler(sig, frame):
nonlocal running
print("\nShutting down...")
running = False
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
# Main loop
last_announce = 0
try:
while running:
now = time.time()
# Periodic announce
if now - last_announce > 60:
adapter.announce()
last_announce = now
time.sleep(0.5)
finally:
adapter.stop()
print("Node stopped.")
def _print_identity_info(identity_path: Path):
    """Print identity file location (new or loaded)."""
    path = Path(identity_path)
@@ -100,6 +175,76 @@ def cmd_identity(args):
    sys.exit(1)
def cmd_ping(args):
"""Ping a peer by RNS hash."""
print(f"Connecting to {args.destination}...")
identity = RadicleIdentity.load_or_generate(args.identity)
adapter = RNSTransportAdapter(identity=identity)
adapter.start()
try:
dest_hash = bytes.fromhex(args.destination)
except ValueError:
print("Error: Invalid destination hash (must be hex)", file=sys.stderr)
sys.exit(1)
link = adapter.connect(dest_hash, timeout=args.timeout)
if link is None:
print("Failed to connect")
sys.exit(1)
print(f"Connected! RTT: {link.rtt:.3f}s" if link.rtt else "Connected!")
# Send ping
from radicle_reticulum.messages import Ping, Pong, decode_message, MessageType
import struct
ping = Ping()
ping_time = time.time()
link.send(ping.to_message())
print("Ping sent, waiting for pong...")
response = link.recv(timeout=10.0)
if response:
header, msg = decode_message(response)
if header.msg_type == MessageType.PONG:
rtt = (time.time() - ping_time) * 1000
print(f"Pong received! RTT: {rtt:.1f}ms")
else:
print(f"Unexpected response: {header.msg_type}")
else:
print("No response (timeout)")
link.close()
adapter.stop()
def cmd_peers(args):
"""List discovered peers."""
identity = RadicleIdentity.load_or_generate(args.identity)
adapter = RNSTransportAdapter(identity=identity)
adapter.set_on_peer_discovered(on_peer_discovered)
adapter.start()
print(f"Listening for peers for {args.timeout} seconds...")
print()
time.sleep(args.timeout)
peers = adapter.get_peers()
if peers:
print(f"\nDiscovered {len(peers)} peer(s):")
for peer in peers:
print(f" {peer.identity.did}")
print(f" Hash: {peer.destination_hash.hex()}")
print(f" Age: {peer.age:.1f}s")
else:
print("\nNo peers discovered.")
adapter.stop()
def _detect_rid(repo_path: Path) -> Optional[str]:
"""Detect the Radicle RID for the repo at repo_path via 'rad inspect'."""
try:
@@ -119,6 +264,7 @@ def _detect_rid(repo_path: Path) -> Optional[str]:
def cmd_gossip(args):
"""Run the gossip relay daemon."""
_apply_lora_defaults(args)
identity = RadicleIdentity.load_or_generate(args.identity)
_print_identity_info(args.identity)
@@ -189,8 +335,131 @@ def cmd_gossip(args):
print("Gossip relay stopped.")
def cmd_seed(args):
"""Start a dedicated seed radicle-node, bridge, and gossip relay."""
_apply_lora_defaults(args)
seed_home = Path(args.seed_home)
# Validate args before starting any processes
announce_retry_delays = _parse_delays(args.announce_retry_delays)
seed = SeedNode(seed_home=seed_home, port=args.seed_port)
# First-time setup: guide the user
if not seed.is_initialized():
seed.write_config()
print(f"Seed home: {seed_home}")
print()
print("Seed identity not found. Initialize it with:")
print(f" RAD_HOME={seed_home} rad auth")
print()
print("Then run 'radicle-rns seed' again.")
sys.exit(1)
# Start seed radicle-node
print(f"Starting seed radicle-node (port {args.seed_port})...")
try:
seed.start()
except RuntimeError as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
nid = seed.get_nid()
if not nid:
print("Error: could not get seed NID.", file=sys.stderr)
seed.stop()
sys.exit(1)
identity = RadicleIdentity.load_or_generate(args.identity)
_print_identity_info(args.identity)
bridge = RadicleBridge(
identity=identity,
listen_port=args.bridge_port,
radicle_host="127.0.0.1",
radicle_port=args.seed_port,
auto_connect=True,
auto_seed=True,
announce_retry_delays=announce_retry_delays,
rad_home=str(seed_home),
state_path=seed_home / "bridge_state.json",
)
bridge.set_local_radicle_nid(nid)
def on_peer_seed_discovered(dest_hash, remote_nid=None):
nid_info = f" (NID: {remote_nid[:32]})" if remote_nid else ""
print(f"[+] Discovered remote seed: {dest_hash.hex()[:16]}{nid_info}")
bridge.set_on_bridge_discovered(on_peer_seed_discovered)
# Gossip relay: watches seed's storage, notifies remote seeds of ref changes.
# bridge_port=None: bridge's auto_seed already registered NIDs on correct ports.
gossip = GossipRelay(
identity=identity,
rids=[],
storage=seed_home / "storage",
radicle_nid=nid,
bridge_port=None,
poll_interval=args.poll_interval,
announce_retry_delays=announce_retry_delays,
auto_discover=True,
rad_home=str(seed_home),
)
running = True
def signal_handler(sig, frame):
nonlocal running
print("\nShutting down...")
running = False
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
try:
bridge.start()
gossip.start()
print()
print("Seed node running.")
print(f" Seed NID: {nid}")
print(f" Seed port: {args.seed_port}")
print(f" Bridge hash: {bridge.destination.hexhash}")
print(f" Gossip hash: {gossip.destination.hexhash}")
print()
print("Add this seed to your radicle node (one-time setup):")
print(f" rad node connect {nid}@127.0.0.1:{args.seed_port}")
print()
print("Other machines running 'radicle-rns seed' will discover this")
print("seed automatically and sync over the mesh.")
print()
print("Press Ctrl+C to stop.")
print()
last_known_bridges = -1
while running:
if not seed.is_running():
print("Seed radicle-node exited unexpectedly.", file=sys.stderr)
break
stats = bridge.get_stats()
if stats["known_bridges"] != last_known_bridges:
last_known_bridges = stats["known_bridges"]
print(f"[Status] Remote seeds: {stats['known_bridges']}, "
f"Tunnels: {stats['active_tunnels']}, "
f"Gossip peers: {gossip.get_stats()['known_peers']}")
time.sleep(5)
finally:
gossip.stop()
bridge.stop()
seed.stop()
print("Seed stopped.")
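The seed status loop prints a `[Status]` line only when the watched counter changes, rather than on every 5-second tick. That change-detection logic, reduced to plain data (helper name hypothetical):

```python
def status_lines(samples):
    # Emit a line only when the watched counter changes,
    # mirroring the last_known_bridges check in the seed loop.
    last = -1
    out = []
    for s in samples:
        if s != last:
            last = s
            out.append(f"[Status] Remote seeds: {s}")
    return out

assert status_lines([0, 0, 1, 1, 2]) == [
    "[Status] Remote seeds: 0",
    "[Status] Remote seeds: 1",
    "[Status] Remote seeds: 2",
]
```

Initialising the sentinel to `-1` guarantees the first real sample always prints, since the counter can never legitimately be negative.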
def cmd_bridge(args):
"""Run Radicle-Reticulum bridge."""
_apply_lora_defaults(args)
print("Starting Radicle-Reticulum bridge...")
identity = RadicleIdentity.load_or_generate(args.identity)
@@ -313,6 +582,9 @@ def cmd_bridge(args):
def cmd_setup(args):
"""Check prerequisites and print setup instructions."""
seed_home = Path(args.seed_home)
seed_port = args.seed_port
ok = True
def check(label: str, passed: bool, fix: Optional[str] = None) -> bool:
@@ -324,6 +596,7 @@ def cmd_setup(args):
print("Checking prerequisites...")
# rad CLI
try:
r = subprocess.run(["rad", "--version"], capture_output=True, text=True, timeout=5)
ver = r.stdout.strip().split("\n")[0] if r.returncode == 0 else None
@@ -331,18 +604,21 @@ def cmd_setup(args):
except FileNotFoundError:
ok &= check("rad CLI", False, "Install Radicle: https://radicle.xyz")
# radicle-node
try:
r = subprocess.run(["radicle-node", "--version"], capture_output=True, text=True, timeout=5)
ok &= check("radicle-node", r.returncode == 0, "Install Radicle: https://radicle.xyz")
except FileNotFoundError:
ok &= check("radicle-node", False, "Install Radicle: https://radicle.xyz")
# RNS Python library
try:
import RNS as _rns  # noqa: F401
ok &= check("Reticulum (RNS)", True)
except ImportError:
ok &= check("Reticulum (RNS)", False, "pip install rns (or install Reticulum: https://reticulum.network)")
ok &= check("Reticulum (RNS)", False, "pip install rns")
# watchdog (optional)
try:
import watchdog  # noqa: F401
check("watchdog (instant push detection)", True)
@@ -350,40 +626,75 @@ def cmd_setup(args):
check(
"watchdog (optional — enables instant push detection)",
False,
"uv sync --extra watch",
"pip install watchdog  # or: uv add watchdog",
)
print()
print("Radicle identity...")
print("Seed identity...")
seed_node = SeedNode(seed_home=seed_home, port=seed_port)
seed_initialized = seed_node.is_initialized()
ok &= check(
f"Seed identity at {seed_home}",
seed_initialized,
f"RAD_HOME={seed_home} rad auth",
)
seed_nid: Optional[str] = None
if seed_initialized:
seed_nid = seed_node.get_nid()
check(
f"Seed NID: {seed_nid[:48] if seed_nid else '(could not read)'}",
bool(seed_nid),
)
print()
print("User radicle-node configuration...")
user_nid = detect_radicle_nid()
ok &= check(
f"radicle identity{f' ({user_nid[:32]}...)' if user_nid else ''}",
f"Your radicle identity{f' ({user_nid[:32]}...)' if user_nid else ''}",
bool(user_nid),
"rad auth",
"rad auth  # initialise your radicle identity first",
)
nid_in_config = False
try:
import json as _json
cfg = Path.home() / ".radicle" / "config.json"
if cfg.exists():
data = _json.loads(cfg.read_text())
listen = data.get("node", {}).get("listen", [])
nid_in_config = any("127.0.0.1" in addr for addr in listen)
except Exception:
pass
ok &= check(
"radicle-node listens on 127.0.0.1",
nid_in_config,
'set "node": {"listen": ["127.0.0.1:8776"]} in ~/.radicle/config.json',
)
# Check whether the seed is registered in the user's radicle-node.
# We do this by calling 'rad node' and looking for the seed NID.
seed_registered = False
if seed_nid:
try:
r = subprocess.run(
["rad", "node"], capture_output=True, text=True, timeout=5
)
seed_registered = seed_nid in r.stdout
except Exception:
pass
ok &= check(
"Seed registered in your radicle node",
seed_registered,
f"rad node connect {seed_nid}@127.0.0.1:{seed_port}",
)
print()
print("Seed process...")
seed_listening = seed_node._port_open()
check(
f"Seed radicle-node listening on port {seed_port}",
seed_listening,
f"radicle-rns seed --seed-home {seed_home} --seed-port {seed_port}",
)
print()
if ok:
print("All checks passed. Run: radicle-rns bridge")
else:
print("Setup incomplete. Follow the instructions above, then re-run: radicle-rns setup")
if ok and seed_listening:
print("All checks passed. Seed is running.")
elif ok:
print("All checks passed. Start the seed with:")
print(f"  radicle-rns seed")
else:
print("Setup incomplete. Follow the instructions above, then run:")
print(f"  radicle-rns setup  # re-check")
print(f"  radicle-rns seed   # once all checks pass")
def main():
@@ -408,6 +719,10 @@ def main():
help=f"Identity file (default: {DEFAULT_IDENTITY_PATH})",
)
# node command
node_parser = subparsers.add_parser("node", help="Run a Radicle-RNS node")
add_identity_arg(node_parser)
# identity command
id_parser = subparsers.add_parser("identity", help="Identity operations")
id_parser.add_argument(
@@ -423,6 +738,27 @@ def main():
help="Overwrite existing identity file (for generate)"
)
# ping command
ping_parser = subparsers.add_parser("ping", help="Ping a peer")
ping_parser.add_argument("destination", help="RNS destination hash (hex)")
ping_parser.add_argument(
"-t", "--timeout",
type=float,
default=30.0,
help="Connection timeout (seconds)"
)
add_identity_arg(ping_parser)
# peers command
peers_parser = subparsers.add_parser("peers", help="Discover peers")
peers_parser.add_argument(
"-t", "--timeout",
type=float,
default=10.0,
help="Discovery timeout (seconds)"
)
add_identity_arg(peers_parser)
# gossip command
gossip_parser = subparsers.add_parser(
"gossip",
@@ -456,14 +792,81 @@ def main():
"--announce-retry-delays",
default="5,15,30",
metavar="SECONDS",
help="Startup re-announce delays in seconds, comma-separated (default: 5,15,30).",
help="Startup re-announce delays, comma-separated (default: 5,15,30 for "
"WiFi/Ethernet). On LoRa use 60,300,900 to respect duty-cycle limits.",
)
gossip_parser.add_argument(
"--lora",
action="store_true",
help="Shortcut for LoRa-safe settings: sets --announce-retry-delays=60,300,900 "
"and --poll-interval=120 (unless overridden explicitly).",
)
add_identity_arg(gossip_parser)
subparsers.add_parser(
# seed command
seed_parser = subparsers.add_parser(
"seed",
help="Start a Radicle seed node and bridge it to the mesh over Reticulum",
)
seed_parser.add_argument(
"--seed-home",
default=str(DEFAULT_SEED_HOME),
metavar="PATH",
help=f"RAD_HOME for the seed node (default: {DEFAULT_SEED_HOME})",
)
seed_parser.add_argument(
"--seed-port",
type=int,
default=DEFAULT_SEED_PORT,
metavar="PORT",
help=f"TCP port for the seed radicle-node (default: {DEFAULT_SEED_PORT})",
)
seed_parser.add_argument(
"--bridge-port",
type=int,
default=8778,
metavar="PORT",
help="TCP listen port for the seed bridge (default: 8778)",
)
seed_parser.add_argument(
"--poll-interval",
type=int,
default=30,
metavar="SECONDS",
help="Seconds between gossip ref polls (default: 30)",
)
seed_parser.add_argument(
"--announce-retry-delays",
default="5,15,30",
metavar="SECONDS",
help="Startup re-announce delays, comma-separated (default: 5,15,30 for "
"WiFi/Ethernet). On LoRa use 60,300,900 to respect duty-cycle limits.",
)
seed_parser.add_argument(
"--lora",
action="store_true",
help="Shortcut for LoRa-safe settings: sets --announce-retry-delays=60,300,900 "
"and --poll-interval=120 (unless overridden explicitly).",
)
add_identity_arg(seed_parser)
setup_parser = subparsers.add_parser(
"setup",
help="Check prerequisites and print setup instructions",
)
setup_parser.add_argument(
"--seed-home",
default=str(DEFAULT_SEED_HOME),
metavar="PATH",
help=f"RAD_HOME for the seed node (default: {DEFAULT_SEED_HOME})",
)
setup_parser.add_argument(
"--seed-port",
type=int,
default=DEFAULT_SEED_PORT,
metavar="PORT",
help=f"Seed TCP port (default: {DEFAULT_SEED_PORT})",
)
bridge_parser = subparsers.add_parser("bridge", help="Run Radicle-Reticulum bridge")
bridge_parser.add_argument(
@@ -505,7 +908,16 @@ def main():
"--announce-retry-delays",
default="5,15,30",
metavar="SECONDS",
help="Startup re-announce delays in seconds, comma-separated (default: 5,15,30).",
help=(
"Comma-separated delays for startup re-announces (default: 5,15,30 for "
"WiFi/Ethernet). On LoRa use 60,300,900 to respect duty-cycle limits."
),
)
bridge_parser.add_argument(
"--lora",
action="store_true",
help="Shortcut for LoRa-safe settings: sets --announce-retry-delays=60,300,900 "
"(unless overridden explicitly).",
)
add_identity_arg(bridge_parser)
@@ -518,10 +930,18 @@ def main():
RNS.loglevel = RNS.LOG_INFO
# Dispatch command
if args.command == "identity":
cmd_identity(args)
elif args.command == "gossip":
cmd_gossip(args)
elif args.command == "setup":
cmd_setup(args)
elif args.command == "bridge":
if args.command == "node":
cmd_node(args)
elif args.command == "identity":
cmd_identity(args)
elif args.command == "ping":
cmd_ping(args)
elif args.command == "peers":
cmd_peers(args)
elif args.command == "gossip":
cmd_gossip(args)
elif args.command == "seed":
cmd_seed(args)
elif args.command == "setup":
cmd_setup(args)
elif args.command == "bridge":


@@ -34,18 +34,6 @@ GOSSIP_MAGIC = b"RADICLE_GOSSIP_V1"
DEFAULT_POLL_INTERVAL = 30  # seconds
PATH_REQUEST_TIMEOUT = 15  # seconds to wait for a path before giving up
WATCHDOG_DEBOUNCE = 2.0 # seconds to absorb rapid filesystem events before polling
class _AnnounceHandler:
"""Wraps a callback into the object interface RNS.Transport requires."""
aspect_filter = f"{APP_NAME}.{ASPECT_GOSSIP}"
def __init__(self, callback):
self._callback = callback
def received_announce(self, destination_hash, announced_identity, app_data):
self._callback(destination_hash, announced_identity, app_data)
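The `_AnnounceHandler` wrapper exists because `RNS.Transport.register_announce_handler` expects an object exposing an `aspect_filter` attribute and a `received_announce` method, not a bare callable. The adapter shape can be exercised without RNS at all (aspect string hypothetical):

```python
class AnnounceHandler:
    """Adapts a plain callback to the handler-object interface RNS expects."""
    aspect_filter = "radicle.gossip"  # hypothetical aspect string

    def __init__(self, callback):
        self._callback = callback

    def received_announce(self, destination_hash, announced_identity, app_data):
        # Forward the announce to the wrapped callback unchanged.
        self._callback(destination_hash, announced_identity, app_data)

seen = []
h = AnnounceHandler(lambda dh, ident, data: seen.append((dh, data)))
h.received_announce(b"\x01\x02", None, b"payload")
assert seen == [(b"\x01\x02", b"payload")]
```

This also explains why passing `self._on_announce` directly (as the other side of this diff does) would silently receive no announces: the transport filters on `aspect_filter`, which a bound method does not carry.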
def _radicle_storage_path() -> Path:
@@ -105,7 +93,6 @@ class GossipRelay:
announce_retry_delays: Tuple[int, ...] = (5, 15, 30),
config_path: Optional[str] = None,
auto_discover: bool = False,
auto_seed: bool = False,
rad_home: Optional[str] = None,
):
"""
@@ -122,10 +109,6 @@ class GossipRelay:
config_path: Reticulum config path (None = default).
auto_discover: Scan storage each poll cycle and add new repo dirs
to rids automatically. Useful in seed mode.
auto_seed: When refs arrive for an unknown repo, automatically call
'rad seed <RID>' so this node starts tracking it, then sync.
Combines with auto_discover: once seeded, the repo is picked up
on the next poll cycle.
rad_home: RAD_HOME override for rad CLI calls. None = system default.
"""
self.identity = identity
@@ -136,7 +119,6 @@ class GossipRelay:
self.poll_interval = poll_interval
self.announce_retry_delays = announce_retry_delays
self.auto_discover = auto_discover
self.auto_seed = auto_seed
self.rad_home = rad_home
existing = RNS.Reticulum.get_instance()
@@ -166,7 +148,7 @@ class GossipRelay:
def start(self):
"""Start the relay: announce, begin polling, register announce handler."""
self._running = True
RNS.Transport.register_announce_handler(_AnnounceHandler(self._on_announce))
RNS.Transport.register_announce_handler(self._on_announce)
self.announce()
threading.Thread(target=self._startup_announce_loop, daemon=True).start()
self._start_watcher()
@@ -290,100 +272,32 @@ class GossipRelay:
if changed:
self._known_refs[rid] = refs
if changed and not first_poll:
self._broadcast(rid, refs, old_refs=old)
self._broadcast(rid, refs)
except Exception as e:
RNS.log(f"Gossip poll error ({rid[:20]}): {e}", RNS.LOG_WARNING)
def _poll_loop(self):
self._poll_loop_once()
while self._running:
# Returns True if woken early (watchdog event), False if poll interval elapsed
triggered_early = self._poll_event.wait(timeout=self.poll_interval)
self._poll_event.clear()
if not self._running:
return
if triggered_early:
# Debounce: absorb any rapid-fire events from a multi-commit push
time.sleep(WATCHDOG_DEBOUNCE)
self._poll_event.clear()
self._poll_loop_once()
def _poll_loop(self):
# establish baseline refs without broadcasting on startup
while self._running:
self._poll_loop_once()
# Wait for next poll: woken early by watchdog event or stop()
self._poll_event.wait(timeout=self.poll_interval)
self._poll_event.clear()
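Both variants of `_poll_loop` share one idea: sleep for up to `poll_interval`, but wake immediately when an event fires (a watchdog filesystem event or `stop()`). The wait/clear mechanics can be exercised in isolation; a minimal sketch with a hypothetical `Poller` class:

```python
import threading
import time

class Poller:
    def __init__(self, poll_interval: float):
        self.poll_interval = poll_interval
        self._event = threading.Event()
        self._running = True
        self.polls = 0

    def trigger(self):
        self._event.set()            # wake the loop early (e.g. watchdog event)

    def stop(self):
        self._running = False
        self._event.set()            # also wakes the loop so it can exit

    def run(self):
        while self._running:
            self.polls += 1          # stands in for _poll_loop_once()
            self._event.wait(timeout=self.poll_interval)
            self._event.clear()

p = Poller(poll_interval=5.0)
t = threading.Thread(target=p.run, daemon=True)
t.start()
time.sleep(0.1)
p.trigger()                          # forces a second poll well before 5 s
time.sleep(0.1)
p.stop()
t.join(timeout=1.0)
assert p.polls >= 2
```

Setting the event inside `stop()` is what keeps shutdown prompt: without it, the loop would block for up to a full `poll_interval` before noticing `_running` is false.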
# ── Internal: sending ────────────────────────────────────────────────────
def _broadcast(self, rid: str, refs: Dict[str, str]):
payload = json.dumps({
"type": "refs",
"rid": rid,
"nid": self.radicle_nid or "",
"refs": refs,
}).encode()
with self._peers_lock:
peers = list(self._known_peers.keys())
sent = sum(1 for h in peers if self._send_packet(h, payload))
RNS.log(f"Broadcast refs for {rid[:20]}... → {sent}/{len(peers)} peers", RNS.LOG_INFO)
def _send_initial_refs(self, destination_hash: bytes):
"""Push our current known refs to a newly discovered peer."""
for rid in list(self.rids):
with self._refs_lock:
refs = self._known_refs.get(rid)
if refs:
for payload in self._build_ref_payloads(rid, refs, is_delta=False):
self._send_packet(destination_hash, payload)
def _broadcast(
self,
rid: str,
refs: Dict[str, str],
old_refs: Optional[Dict[str, str]] = None,
):
if old_refs is not None:
to_send = {k: v for k, v in refs.items() if v != old_refs.get(k)}
else:
to_send = refs
is_delta = old_refs is not None
payloads = self._build_ref_payloads(rid, to_send, is_delta)
with self._peers_lock:
peers = list(self._known_peers.keys())
sent = sum(
1 for h in peers
if all(self._send_packet(h, p) for p in payloads)
)
total_bytes = sum(len(p) for p in payloads)
RNS.log(
f"Broadcast refs for {rid[:20]}... → {sent}/{len(peers)} peers "
f"({'delta' if is_delta else 'full'}, {total_bytes}B, {len(payloads)} pkt(s))",
RNS.LOG_INFO,
)
def _build_ref_payloads(
self, rid: str, refs: Dict[str, str], is_delta: bool
) -> List[bytes]:
"""Split refs into RNS.Packet.ENCRYPTED_MDU-sized JSON payloads.
LoRa interfaces cap at ~383 bytes per encrypted packet. A single ref
entry is ~70 bytes in JSON, so a repo with many refs needs multiple
packets. The receiver handles each packet independently each one
triggers a changed-ref check and potential sync.
"""
mdu = RNS.Packet.ENCRYPTED_MDU
base: Dict = {"type": "refs", "rid": rid, "nid": self.radicle_nid or ""}
if is_delta:
base["delta"] = True
# Fast path: everything fits in one packet
candidate = {**base, "refs": refs}
payload = json.dumps(candidate).encode()
if len(payload) <= mdu:
return [payload]
# Split: add refs one-by-one until the packet would overflow, then flush
payloads: List[bytes] = []
chunk: Dict[str, str] = {}
for ref_name, sha in refs.items():
chunk[ref_name] = sha
test = json.dumps({**base, "refs": chunk}).encode()
if len(test) > mdu:
# Flush without this ref, then start new chunk with it
chunk.pop(ref_name)
if chunk:
payloads.append(json.dumps({**base, "refs": chunk}).encode())
chunk = {ref_name: sha}
if chunk:
payloads.append(json.dumps({**base, "refs": chunk}).encode())
return payloads or [json.dumps({**base, "refs": {}}).encode()]
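The greedy split in `_build_ref_payloads` can be reproduced without RNS by fixing a small packet limit. The sketch below uses a hypothetical 120-byte MDU in place of `RNS.Packet.ENCRYPTED_MDU` and drops the `nid`/`delta` fields for brevity:

```python
import json
from typing import Dict, List

MDU = 120  # hypothetical limit; the real code uses RNS.Packet.ENCRYPTED_MDU

def build_payloads(rid: str, refs: Dict[str, str]) -> List[bytes]:
    base = {"type": "refs", "rid": rid}
    # Fast path: everything fits in one packet.
    payload = json.dumps({**base, "refs": refs}).encode()
    if len(payload) <= MDU:
        return [payload]
    # Greedy split: add refs until the encoded chunk would overflow, then flush.
    payloads, chunk = [], {}
    for name, sha in refs.items():
        chunk[name] = sha
        if len(json.dumps({**base, "refs": chunk}).encode()) > MDU:
            chunk.pop(name)              # flush without the overflowing ref
            if chunk:
                payloads.append(json.dumps({**base, "refs": chunk}).encode())
            chunk = {name: sha}
    if chunk:
        payloads.append(json.dumps({**base, "refs": chunk}).encode())
    return payloads

refs = {f"refs/heads/b{i}": "a" * 40 for i in range(4)}
out = build_payloads("rad:z123", refs)
assert all(len(p) <= MDU for p in out)
assert sum(len(json.loads(p)["refs"]) for p in out) == len(refs)
```

With a 40-character SHA each ref entry costs roughly 60 bytes of JSON, so at this toy MDU every payload carries exactly one ref; on a real LoRa MDU (~383 bytes) several refs fit per packet.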
def _send_packet(self, peer_hash: bytes, payload: bytes) -> bool:
try:
@@ -433,7 +347,6 @@ class GossipRelay:
rid: str = msg.get("rid", "")
nid: str = msg.get("nid", "")
is_delta: bool = msg.get("delta", False)
remote_refs: Dict[str, str] = msg.get("refs", {})
if not rid or not remote_refs:
@@ -441,68 +354,35 @@ class GossipRelay:
with self._refs_lock:
local_refs = self._known_refs.get(rid, {})
# Delta packets carry only changed refs — merge onto local known state
effective_refs = {**local_refs, **remote_refs} if is_delta else remote_refs
changed = any(effective_refs.get(r) != local_refs.get(r) for r in effective_refs)
changed = any(remote_refs.get(r) != local_refs.get(r) for r in remote_refs)
if changed:
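The delta handling (merge incoming changed refs onto the locally known state, then look for differences) works on plain dicts and can be sketched standalone (helper name hypothetical):

```python
from typing import Dict

def effective(local: Dict[str, str], incoming: Dict[str, str], is_delta: bool) -> Dict[str, str]:
    # Delta packets carry only the refs that changed; merge them onto
    # the locally known state. Full packets replace the view entirely.
    return {**local, **incoming} if is_delta else incoming

local = {"refs/heads/main": "aaa", "refs/heads/dev": "bbb"}

delta = {"refs/heads/main": "ccc"}
eff = effective(local, delta, is_delta=True)
assert eff == {"refs/heads/main": "ccc", "refs/heads/dev": "bbb"}
assert any(eff.get(r) != local.get(r) for r in eff)          # change detected

full = {"refs/heads/main": "aaa"}  # full snapshot that matches local state
eff = effective(local, full, is_delta=False)
assert not any(eff.get(r) != local.get(r) for r in eff)      # nothing new
```

Note the changed-ref check only iterates over refs present in the incoming view, so a ref that merely disappeared from a full snapshot does not by itself trigger a sync.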
RNS.log(
f"Gossip: new refs for {rid[:20]}... from {nid[:24] if nid else 'unknown'}",
RNS.LOG_INFO,
)
is_tracked = rid in self.rids
if self.auto_seed and not is_tracked:
threading.Thread(
target=self._auto_seed_and_sync,
args=(rid, nid),
daemon=True,
).start()
else:
threading.Thread(
target=self._trigger_sync,
args=(rid, nid),
daemon=True,
).start()
threading.Thread(
target=self._trigger_sync,
args=(rid, nid),
daemon=True,
).start()
def _trigger_sync(self, rid: str, nid: str):
"""Run rad sync --fetch targeting the specific NID that sent the gossip.
In bridge mode (bridge_port set): first ensures the NID is reachable via
'rad node connect', then runs 'rad sync --fetch --seed NID@...' to target
only that peer rather than all known seeds.
In seed mode (bridge_port=None): skips the connect step; the bridge's
auto_seed already registered the NID at its dedicated port. We still
pass --seed so radicle-node fetches from the right peer.
"""
env = None
if self.rad_home:
env = os.environ.copy()
env["RAD_HOME"] = self.rad_home
seed_addr: Optional[str] = None
if nid and self.bridge_port is not None:
addr = f"{nid}@127.0.0.1:{self.bridge_port}"
result = subprocess.run(
["rad", "node", "connect", addr],
capture_output=True, timeout=15, env=env,
)
if result.returncode == 0:
seed_addr = addr
else:
RNS.log(
f"rad node connect {addr} failed (rc={result.returncode}); "
"falling back to any available seed",
RNS.LOG_WARNING,
)
cmd = ["rad", "sync", "--fetch", "--rid", rid]
if seed_addr:
cmd += ["--seed", seed_addr]
result = subprocess.run(
cmd, capture_output=True, text=True, timeout=120, env=env,
)
def _trigger_sync(self, rid: str, nid: str):
"""Run rad node connect (if needed) then rad sync --fetch."""
env = None
if self.rad_home:
env = os.environ.copy()
env["RAD_HOME"] = self.rad_home
if nid and self.bridge_port is not None:
subprocess.run(
["rad", "node", "connect", f"{nid}@127.0.0.1:{self.bridge_port}"],
capture_output=True, timeout=15, env=env,
)
result = subprocess.run(
["rad", "sync", "--fetch", "--rid", rid],
capture_output=True, text=True, timeout=120, env=env,
)
if result.returncode == 0:
@@ -517,29 +397,6 @@ class GossipRelay:
if self._on_sync_triggered:
self._on_sync_triggered(rid, nid)
def _auto_seed_and_sync(self, rid: str, nid: str):
"""Call 'rad seed <RID>' then sync — triggered for repos not yet tracked."""
env = None
if self.rad_home:
env = os.environ.copy()
env["RAD_HOME"] = self.rad_home
result = subprocess.run(
["rad", "seed", rid],
capture_output=True, text=True, timeout=30, env=env,
)
if result.returncode == 0:
RNS.log(f"Auto-seeded new repo: {rid[:20]}", RNS.LOG_INFO)
if rid not in self.rids:
self.rids.append(rid)
self._trigger_sync(rid, nid)
else:
stderr = result.stderr.strip()
RNS.log(
f"rad seed {rid[:20]} failed: {stderr[:80] if stderr else '(no output)'}",
RNS.LOG_WARNING,
)
# ── Internal: peer discovery ─────────────────────────────────────────────
def _on_announce(
@@ -574,11 +431,15 @@ class GossipRelay:
+ (f" (NID: {radicle_nid[:32]})" if radicle_nid else ""),
RNS.LOG_INFO,
)
# Send current refs in a background thread — _send_packet may block
# up to PATH_REQUEST_TIMEOUT waiting for a path, which would stall
# the announce handler if called inline here.
threading.Thread(
target=self._send_initial_refs,
args=(destination_hash,),
daemon=True,
).start()
# Send our current refs so the peer knows our state immediately
for rid in self.rids:
with self._refs_lock:
refs = self._known_refs.get(rid)
if refs:
payload = json.dumps({
"type": "refs",
"rid": rid,
"nid": self.radicle_nid or "",
"refs": refs,
}).encode()
self._send_packet(destination_hash, payload)


@@ -0,0 +1,177 @@
"""RNS Link wrapper providing Radicle-compatible connection interface.
Maps Radicle's Noise XK sessions to RNS Links, which provide:
- Encrypted bidirectional channels
- Forward secrecy via ephemeral ECDH
- Reliable ordered message delivery
"""
import threading
import time
from dataclasses import dataclass, field
from enum import Enum
from typing import Callable, Optional
from collections import deque
import RNS
class LinkState(Enum):
"""Connection state for RadicleLink."""
PENDING = "pending"
ACTIVE = "active"
CLOSED = "closed"
FAILED = "failed"
@dataclass
class RadicleLink:
"""Wrapper around RNS.Link providing Radicle-compatible interface.
RNS Links provide:
- ECDH key exchange (X25519)
- Symmetric encryption with forward secrecy
- Reliable delivery with automatic retransmission
This maps to Radicle's Noise XK sessions conceptually.
"""
rns_link: RNS.Link
state: LinkState = LinkState.PENDING
on_data: Optional[Callable[[bytes], None]] = None
on_close: Optional[Callable[[], None]] = None
_receive_buffer: deque = field(default_factory=deque)
_buffer_lock: threading.Lock = field(default_factory=threading.Lock)
_data_available: threading.Event = field(default_factory=threading.Event)
def __post_init__(self):
"""Set up RNS link callbacks."""
# Only set established callback for pending links (outbound)
# For already-established links (incoming), this callback already fired
if self.state == LinkState.PENDING:
self.rns_link.set_link_established_callback(self._on_established)
self.rns_link.set_link_closed_callback(self._on_closed)
self.rns_link.set_packet_callback(self._on_packet)
@classmethod
def create_outbound(
cls,
destination: RNS.Destination,
on_data: Optional[Callable[[bytes], None]] = None,
on_close: Optional[Callable[[], None]] = None,
) -> "RadicleLink":
"""Create an outbound link to a destination."""
rns_link = RNS.Link(destination)
return cls(
rns_link=rns_link,
on_data=on_data,
on_close=on_close,
)
@classmethod
def from_incoming(
cls,
rns_link: RNS.Link,
on_data: Optional[Callable[[bytes], None]] = None,
on_close: Optional[Callable[[], None]] = None,
) -> "RadicleLink":
"""Wrap an incoming RNS link."""
link = cls(
rns_link=rns_link,
state=LinkState.ACTIVE, # Incoming links are already established
on_data=on_data,
on_close=on_close,
)
return link
def _on_established(self, link: RNS.Link):
"""Called when link is established."""
self.state = LinkState.ACTIVE
RNS.log(f"Link established: {link}", RNS.LOG_DEBUG)
def _on_closed(self, link: RNS.Link):
"""Called when link is closed."""
self.state = LinkState.CLOSED
self._data_available.set() # Wake up any waiting readers
if self.on_close:
self.on_close()
RNS.log(f"Link closed: {link}", RNS.LOG_DEBUG)
def _on_packet(self, message: bytes, packet: RNS.Packet):
"""Called when a packet is received."""
with self._buffer_lock:
self._receive_buffer.append(message)
self._data_available.set()
if self.on_data:
self.on_data(message)
def send(self, data: bytes) -> bool:
"""Send data over the link.
Returns True if packet was sent successfully.
"""
if self.state != LinkState.ACTIVE:
return False
try:
packet = RNS.Packet(self.rns_link, data)
packet.send()
return True
except Exception as e:
RNS.log(f"Send failed: {e}", RNS.LOG_ERROR)
return False
def recv(self, timeout: Optional[float] = None) -> Optional[bytes]:
    """Receive data from the link.
    Blocks until data is available or timeout expires.
    Returns None on timeout or if link is closed.
    """
    deadline = time.time() + timeout if timeout is not None else None
    while True:
        with self._buffer_lock:
            if self._receive_buffer:
                return self._receive_buffer.popleft()
            if self.state == LinkState.CLOSED:
                return None
            # Clear under the lock: _on_packet appends under the same lock,
            # so a set() can never be lost between this check and the wait.
            self._data_available.clear()
        remaining = None
        if deadline is not None:
            remaining = deadline - time.time()
            if remaining <= 0:
                return None
        if not self._data_available.wait(timeout=remaining):
            return None
def close(self):
"""Close the link."""
if self.state == LinkState.ACTIVE:
self.rns_link.teardown()
self.state = LinkState.CLOSED
@property
def is_active(self) -> bool:
"""Check if link is active."""
return self.state == LinkState.ACTIVE
@property
def remote_identity(self) -> Optional[RNS.Identity]:
"""Get the remote peer's identity if known."""
return self.rns_link.get_remote_identity()
@property
def rtt(self) -> Optional[float]:
"""Get current round-trip time estimate in seconds."""
if hasattr(self.rns_link, 'rtt') and self.rns_link.rtt:
return self.rns_link.rtt
return None
def __repr__(self) -> str:
return f"RadicleLink(state={self.state.value}, rtt={self.rtt})"
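The blocking `recv` loop above pairs a deque with a `threading.Event`. A minimal standalone sketch of the same wait pattern (the names `deliver`/`recv` here are illustrative, not from the module; the event is cleared under the lock so a concurrent producer wakeup cannot be lost):

```python
import threading
import time
from collections import deque

_buffer = deque()
_lock = threading.Lock()
_data_available = threading.Event()

def deliver(data: bytes):
    # Producer side (what _on_packet does): append, then wake readers.
    with _lock:
        _buffer.append(data)
    _data_available.set()

def recv(timeout=None):
    # Consumer side: re-check the buffer after every wakeup, and honour an
    # absolute deadline so repeated wakeups cannot extend the timeout.
    deadline = time.monotonic() + timeout if timeout is not None else None
    while True:
        with _lock:
            if _buffer:
                return _buffer.popleft()
            _data_available.clear()  # cleared under the lock: no lost wakeups
        remaining = None
        if deadline is not None:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                return None
        if not _data_available.wait(timeout=remaining):
            return None

deliver(b"hello")
assert recv(timeout=0.1) == b"hello"
assert recv(timeout=0.05) is None  # empty buffer: times out
```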

"""Message framing layer for Radicle protocol over RNS.
Implements the gossip message types:
- Node Announcements: Broadcast Node IDs and network addresses
- Inventory Announcements: Share repository inventories for routing
- Reference Announcements: Push repository updates to subscribers
Messages are serialized to a compact binary format suitable for
low-bandwidth Reticulum transports.
"""
import struct
import hashlib
import time
from dataclasses import dataclass, field
from enum import IntEnum
from typing import List, Optional, Set
class MessageType(IntEnum):
"""Radicle gossip message types."""
NODE_ANNOUNCEMENT = 0x01
INVENTORY_ANNOUNCEMENT = 0x02
REF_ANNOUNCEMENT = 0x03
PING = 0x10
PONG = 0x11
# Message header format: type (1 byte) + timestamp (8 bytes) + payload length (2 bytes)
HEADER_FORMAT = "!BQH"
HEADER_SIZE = struct.calcsize(HEADER_FORMAT)
# Maximum payload size (total message capped at 65535 bytes, minus header)
MAX_PAYLOAD_SIZE = 65535 - HEADER_SIZE
@dataclass
class MessageHeader:
"""Common header for all Radicle messages."""
msg_type: MessageType
timestamp: int # Unix timestamp in milliseconds
payload_length: int
def encode(self) -> bytes:
"""Encode header to bytes."""
return struct.pack(
HEADER_FORMAT,
self.msg_type,
self.timestamp,
self.payload_length,
)
@classmethod
def decode(cls, data: bytes) -> "MessageHeader":
"""Decode header from bytes."""
if len(data) < HEADER_SIZE:
raise ValueError(f"Header too short: {len(data)} < {HEADER_SIZE}")
msg_type_raw, timestamp, payload_length = struct.unpack(
HEADER_FORMAT, data[:HEADER_SIZE]
)
try:
msg_type = MessageType(msg_type_raw)
except ValueError:
raise ValueError(f"Unknown message type: {msg_type_raw}")
return cls(
msg_type=msg_type,
timestamp=timestamp,
payload_length=payload_length,
)
@dataclass
class NodeAnnouncement:
"""Announces a node's presence and capabilities.
Broadcast periodically to enable peer discovery.
"""
node_id: str # DID (did:key:z6Mk...)
features: int = 0 # Bitmask of supported features
version: int = 1 # Protocol version
def encode(self) -> bytes:
"""Encode to bytes."""
node_id_bytes = self.node_id.encode("utf-8")
return struct.pack(
f"!HH{len(node_id_bytes)}s",
self.features,
self.version,
node_id_bytes,
)
@classmethod
def decode(cls, data: bytes) -> "NodeAnnouncement":
"""Decode from bytes."""
features, version = struct.unpack("!HH", data[:4])
node_id = data[4:].decode("utf-8")
return cls(node_id=node_id, features=features, version=version)
def to_message(self) -> bytes:
"""Wrap in a full message with header."""
payload = self.encode()
header = MessageHeader(
msg_type=MessageType.NODE_ANNOUNCEMENT,
timestamp=int(time.time() * 1000),
payload_length=len(payload),
)
return header.encode() + payload
@dataclass
class InventoryAnnouncement:
"""Announces repositories hosted by a node.
Used to build routing tables for repository discovery.
"""
node_id: str # DID of the announcing node
repositories: List[str] # List of repository IDs (hashes)
def encode(self) -> bytes:
"""Encode to bytes."""
node_id_bytes = self.node_id.encode("utf-8")
repo_data = b""
for repo_id in self.repositories:
repo_bytes = repo_id.encode("utf-8")
repo_data += struct.pack(f"!H{len(repo_bytes)}s", len(repo_bytes), repo_bytes)
return struct.pack(
f"!H{len(node_id_bytes)}sH",
len(node_id_bytes),
node_id_bytes,
len(self.repositories),
) + repo_data
@classmethod
def decode(cls, data: bytes) -> "InventoryAnnouncement":
"""Decode from bytes."""
offset = 0
# Node ID
node_id_len = struct.unpack("!H", data[offset:offset+2])[0]
offset += 2
node_id = data[offset:offset+node_id_len].decode("utf-8")
offset += node_id_len
# Repository count
repo_count = struct.unpack("!H", data[offset:offset+2])[0]
offset += 2
# Repositories
repositories = []
for _ in range(repo_count):
repo_len = struct.unpack("!H", data[offset:offset+2])[0]
offset += 2
repo_id = data[offset:offset+repo_len].decode("utf-8")
offset += repo_len
repositories.append(repo_id)
return cls(node_id=node_id, repositories=repositories)
def to_message(self) -> bytes:
"""Wrap in a full message with header."""
payload = self.encode()
header = MessageHeader(
msg_type=MessageType.INVENTORY_ANNOUNCEMENT,
timestamp=int(time.time() * 1000),
payload_length=len(payload),
)
return header.encode() + payload
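The inventory encoding above frames each variable-length field with a 2-byte big-endian length prefix. A self-contained sketch of just the repeated-list part of that scheme (helper names are illustrative; the real message also carries the node ID first):

```python
import struct

def encode_strings(items):
    # 2-byte big-endian count, then each string as <2-byte length><utf-8 bytes>,
    # mirroring the InventoryAnnouncement repository list layout.
    out = struct.pack("!H", len(items))
    for s in items:
        raw = s.encode("utf-8")
        out += struct.pack("!H", len(raw)) + raw
    return out

def decode_strings(data):
    count = struct.unpack("!H", data[:2])[0]
    offset, items = 2, []
    for _ in range(count):
        n = struct.unpack("!H", data[offset:offset + 2])[0]
        offset += 2
        items.append(data[offset:offset + n].decode("utf-8"))
        offset += n
    return items

repos = ["rad:z3abc123", "rad:z4def456"]
assert decode_strings(encode_strings(repos)) == repos
```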
@dataclass
class RefAnnouncement:
"""Announces a reference update in a repository.
Pushed to subscribers when refs change (commits, branches, etc).
"""
repository_id: str # Repository ID
ref_name: str # Reference name (e.g., "refs/heads/main")
old_oid: bytes # Previous object ID (20 bytes, all zeros if new)
new_oid: bytes # New object ID (20 bytes)
signature: bytes = b"" # Ed25519 signature of the update
def encode(self) -> bytes:
"""Encode to bytes."""
repo_bytes = self.repository_id.encode("utf-8")
ref_bytes = self.ref_name.encode("utf-8")
return struct.pack(
f"!H{len(repo_bytes)}sH{len(ref_bytes)}s20s20sH{len(self.signature)}s",
len(repo_bytes),
repo_bytes,
len(ref_bytes),
ref_bytes,
self.old_oid,
self.new_oid,
len(self.signature),
self.signature,
)
@classmethod
def decode(cls, data: bytes) -> "RefAnnouncement":
"""Decode from bytes."""
offset = 0
# Repository ID
repo_len = struct.unpack("!H", data[offset:offset+2])[0]
offset += 2
repository_id = data[offset:offset+repo_len].decode("utf-8")
offset += repo_len
# Ref name
ref_len = struct.unpack("!H", data[offset:offset+2])[0]
offset += 2
ref_name = data[offset:offset+ref_len].decode("utf-8")
offset += ref_len
# OIDs
old_oid = data[offset:offset+20]
offset += 20
new_oid = data[offset:offset+20]
offset += 20
# Signature
sig_len = struct.unpack("!H", data[offset:offset+2])[0]
offset += 2
signature = data[offset:offset+sig_len]
return cls(
repository_id=repository_id,
ref_name=ref_name,
old_oid=old_oid,
new_oid=new_oid,
signature=signature,
)
def to_message(self) -> bytes:
"""Wrap in a full message with header."""
payload = self.encode()
header = MessageHeader(
msg_type=MessageType.REF_ANNOUNCEMENT,
timestamp=int(time.time() * 1000),
payload_length=len(payload),
)
return header.encode() + payload
@dataclass
class Ping:
"""Simple ping message for keepalive/latency measurement."""
nonce: bytes = field(default_factory=lambda: struct.pack("!Q", int(time.time() * 1000)))
def encode(self) -> bytes:
return self.nonce
@classmethod
def decode(cls, data: bytes) -> "Ping":
return cls(nonce=data[:8])
def to_message(self) -> bytes:
payload = self.encode()
header = MessageHeader(
msg_type=MessageType.PING,
timestamp=int(time.time() * 1000),
payload_length=len(payload),
)
return header.encode() + payload
@dataclass
class Pong:
"""Response to ping."""
nonce: bytes # Echo back the ping nonce
def encode(self) -> bytes:
return self.nonce
@classmethod
def decode(cls, data: bytes) -> "Pong":
return cls(nonce=data[:8])
def to_message(self) -> bytes:
payload = self.encode()
header = MessageHeader(
msg_type=MessageType.PONG,
timestamp=int(time.time() * 1000),
payload_length=len(payload),
)
return header.encode() + payload
def decode_message(data: bytes):
"""Decode a message from bytes.
Returns tuple of (header, message_object).
"""
header = MessageHeader.decode(data)
payload = data[HEADER_SIZE:HEADER_SIZE + header.payload_length]
decoders = {
MessageType.NODE_ANNOUNCEMENT: NodeAnnouncement.decode,
MessageType.INVENTORY_ANNOUNCEMENT: InventoryAnnouncement.decode,
MessageType.REF_ANNOUNCEMENT: RefAnnouncement.decode,
MessageType.PING: Ping.decode,
MessageType.PONG: Pong.decode,
}
decoder = decoders.get(header.msg_type)
if decoder is None:
raise ValueError(f"Unknown message type: {header.msg_type}")
return header, decoder(payload)
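Every message shares the fixed 11-byte `!BQH` header defined above. A stdlib-only round trip of that header layout (the DID string is a placeholder, not a real key):

```python
import struct
import time

HEADER_FORMAT = "!BQH"  # type (1 B) + timestamp in ms (8 B) + payload length (2 B)
HEADER_SIZE = struct.calcsize(HEADER_FORMAT)

payload = b"did:key:z6MkexampleNodeId"  # placeholder DID
header = struct.pack(HEADER_FORMAT, 0x01, int(time.time() * 1000), len(payload))
message = header + payload

# Decode: fixed-size header first, then exactly payload_length bytes of body.
msg_type, timestamp, length = struct.unpack(HEADER_FORMAT, message[:HEADER_SIZE])
body = message[HEADER_SIZE:HEADER_SIZE + length]

assert HEADER_SIZE == 11
assert msg_type == 0x01  # NODE_ANNOUNCEMENT
assert body == payload
```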

"""Radicle seed node manager.
Starts and manages a dedicated radicle-node configured as a seed server
(storage.policy = allow). The seed runs under its own RAD_HOME so it
doesn't interfere with the user's own radicle identity.
Users add the seed as a peer once:
rad node connect <seed-NID>@127.0.0.1:<seed-port>
The bridge then connects this seed to remote seeds over Reticulum.
"""
import json
import os
import socket
import subprocess
import time
from pathlib import Path
from typing import Optional
DEFAULT_SEED_HOME = Path.home() / ".radicle-rns" / "seed"
DEFAULT_SEED_PORT = 8779
SEED_CONFIG = {
"node": {
"alias": "radicle-rns-seed",
"listen": [], # filled in by write_config()
"connect": [],
"externalAddresses": [],
"seedingPolicy": {
"default": "allow",
"repos": {}
},
"db": {"journalMode": "wal"},
}
}
class SeedNode:
"""Manages a dedicated radicle-node process configured as a seed."""
def __init__(
self,
seed_home: Path = DEFAULT_SEED_HOME,
port: int = DEFAULT_SEED_PORT,
):
self.seed_home = Path(seed_home)
self.port = port
self._process: Optional[subprocess.Popen] = None
# ── Setup ────────────────────────────────────────────────────────────────
def is_initialized(self) -> bool:
"""Return True if rad auth has been run for this seed home."""
try:
result = subprocess.run(
["rad", "self"],
env=self._env(),
capture_output=True,
text=True,
timeout=5,
)
return result.returncode == 0 and "NID" in result.stdout
except Exception:
return False
def write_config(self):
"""Write config.json only if it does not already exist."""
self.seed_home.mkdir(parents=True, exist_ok=True)
config_path = self.seed_home / "config.json"
if config_path.exists():
return
config = json.loads(json.dumps(SEED_CONFIG)) # deep copy
config["node"]["listen"] = [f"127.0.0.1:{self.port}"]
config_path.write_text(json.dumps(config, indent=2))
# ── Lifecycle ─────────────────────────────────────────────────────────────
def start(self):
"""Start radicle-node for the seed. Raises if not initialized."""
if not self.is_initialized():
raise RuntimeError(
f"Seed identity not found. Initialize it with:\n"
f" RAD_HOME={self.seed_home} rad auth\n"
f"Then run 'radicle-rns seed' again."
)
self.write_config()
# Discard stdout/stderr — if kept as PIPE the buffer fills and the
# process deadlocks once radicle-node produces more than ~64 KB of logs.
self._process = subprocess.Popen(
["radicle-node"],
env=self._env(),
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
# Wait up to 10s for the port to open.
deadline = time.time() + 10
while time.time() < deadline:
if self._process.poll() is not None:
raise RuntimeError(
f"Seed radicle-node exited immediately after launch. "
f"Check: RAD_HOME={self.seed_home} radicle-node"
)
if self._port_open():
return
time.sleep(0.3)
if self._process.poll() is not None:
raise RuntimeError("Seed radicle-node exited before port opened.")
# Still running but port not open — may be slow on first run; continue.
print(f"Warning: seed port {self.port} not yet open after 10s, continuing anyway.")
def stop(self):
"""Stop the seed radicle-node."""
if self._process and self._process.poll() is None:
self._process.terminate()
try:
self._process.wait(timeout=5)
except subprocess.TimeoutExpired:
self._process.kill()
self._process.wait() # reap zombie
def is_running(self) -> bool:
return self._process is not None and self._process.poll() is None
# ── Queries ───────────────────────────────────────────────────────────────
def get_nid(self) -> Optional[str]:
"""Return the seed's radicle NID (z6Mk...)."""
result = subprocess.run(
["rad", "self"],
env=self._env(),
capture_output=True,
text=True,
timeout=5,
)
if result.returncode != 0:
return None
for line in result.stdout.splitlines():
parts = line.split()
if len(parts) >= 2 and parts[0] == "NID":
return parts[1]
return None
def connect_peer(self, nid: str, addr: str) -> bool:
"""Tell the seed to connect to a remote peer."""
result = subprocess.run(
["rad", "node", "connect", f"{nid}@{addr}"],
env=self._env(),
capture_output=True,
text=True,
timeout=30,
)
return result.returncode == 0
# ── Internal ──────────────────────────────────────────────────────────────
def _env(self) -> dict:
env = os.environ.copy()
env["RAD_HOME"] = str(self.seed_home)
return env
def _port_open(self) -> bool:
try:
with socket.create_connection(("127.0.0.1", self.port), timeout=0.5):
return True
except OSError:
return False
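`SeedNode.start()` waits for the node's TCP port with a poll-until-deadline loop. The same readiness check as a standalone helper (the name `wait_for_port` and its defaults are illustrative, not part of the module):

```python
import socket
import time

def wait_for_port(host, port, timeout=10.0, interval=0.3):
    # Poll until the port accepts connections or the deadline passes,
    # like the loop in SeedNode.start().
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            with socket.create_connection((host, port), timeout=0.5):
                return True
        except OSError:
            time.sleep(interval)
    return False

# A port nothing is listening on should come back False after the timeout:
assert wait_for_port("127.0.0.1", 1, timeout=0.5, interval=0.1) is False
```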

tests/test_adapter.py
"""Tests for RNSTransportAdapter peer discovery logic (RNS networking mocked)."""
import time
from unittest.mock import MagicMock, patch, call
import pytest
import RNS
from radicle_reticulum.adapter import (
PeerInfo,
RNSTransportAdapter,
NODE_APP_DATA_MAGIC,
REPO_APP_DATA_MAGIC,
)
from radicle_reticulum.identity import RadicleIdentity
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _make_dest_mock(hash_bytes: bytes = b"\xaa" * 16) -> MagicMock:
dest = MagicMock()
dest.hash = hash_bytes
dest.hexhash = hash_bytes.hex()
return dest
def _make_adapter() -> RNSTransportAdapter:
"""Instantiate adapter with all RNS I/O patched out."""
identity = RadicleIdentity.generate()
dest_mock = _make_dest_mock()
with patch("radicle_reticulum.adapter.RNS.Reticulum"), \
patch("radicle_reticulum.adapter.RNS.Destination", return_value=dest_mock), \
patch("radicle_reticulum.adapter.RNS.log"):
adapter = RNSTransportAdapter(identity=identity)
return adapter
# ---------------------------------------------------------------------------
# PeerInfo
# ---------------------------------------------------------------------------
class TestPeerInfo:
def test_age_increases_over_time(self):
identity = RadicleIdentity.generate()
peer = PeerInfo(
identity=identity,
destination_hash=b"\x01" * 16,
last_seen=time.time() - 5.0,
)
assert peer.age >= 5.0
def test_age_near_zero_for_fresh_peer(self):
identity = RadicleIdentity.generate()
peer = PeerInfo(
identity=identity,
destination_hash=b"\x02" * 16,
last_seen=time.time(),
)
assert peer.age < 1.0
def test_announced_repos_defaults_empty(self):
identity = RadicleIdentity.generate()
peer = PeerInfo(identity=identity, destination_hash=b"\x03" * 16, last_seen=0)
assert peer.announced_repos == set()
# ---------------------------------------------------------------------------
# Adapter construction & peer list
# ---------------------------------------------------------------------------
class TestAdapterConstruction:
def test_identity_is_stored(self):
adapter = _make_adapter()
assert adapter.identity is not None
assert adapter.identity.did.startswith("did:key:")
def test_initial_peer_list_is_empty(self):
adapter = _make_adapter()
assert adapter.get_peers() == []
def test_get_peer_by_did_returns_none_when_absent(self):
adapter = _make_adapter()
assert adapter.get_peer_by_did("did:key:z6Mkunknown") is None
# ---------------------------------------------------------------------------
# _handle_announce
# ---------------------------------------------------------------------------
class TestHandleAnnounce:
def _make_rns_identity(self) -> RNS.Identity:
"""Create a real RNS.Identity (keypair only, no networking)."""
return RNS.Identity()
def test_ignores_own_announce(self):
adapter = _make_adapter()
own_hash = adapter.node_destination.hash # the mock's hash
discovered = []
adapter.set_on_peer_discovered(discovered.append)
rns_id = self._make_rns_identity()
with patch("radicle_reticulum.adapter.RNS.log"):
adapter._handle_announce(own_hash, rns_id, NODE_APP_DATA_MAGIC)
assert discovered == []
assert adapter.get_peers() == []
def test_ignores_non_radicle_announce(self):
adapter = _make_adapter()
rns_id = self._make_rns_identity()
foreign_hash = b"\xbb" * 16
discovered = []
adapter.set_on_peer_discovered(discovered.append)
with patch("radicle_reticulum.adapter.RNS.log"):
adapter._handle_announce(foreign_hash, rns_id, b"SOME_OTHER_APP")
assert discovered == []
def test_ignores_announce_with_no_app_data(self):
adapter = _make_adapter()
rns_id = self._make_rns_identity()
with patch("radicle_reticulum.adapter.RNS.log"):
adapter._handle_announce(b"\xcc" * 16, rns_id, None)
assert adapter.get_peers() == []
def test_discovers_valid_peer(self):
adapter = _make_adapter()
rns_id = self._make_rns_identity()
peer_hash = b"\xdd" * 16
discovered = []
adapter.set_on_peer_discovered(discovered.append)
with patch("radicle_reticulum.adapter.RNS.log"):
adapter._handle_announce(peer_hash, rns_id, NODE_APP_DATA_MAGIC)
assert len(discovered) == 1
peers = adapter.get_peers()
assert len(peers) == 1
assert peers[0].destination_hash == peer_hash
def test_second_announce_updates_last_seen_not_duplicates(self):
adapter = _make_adapter()
rns_id = self._make_rns_identity()
peer_hash = b"\xee" * 16
discovered = []
adapter.set_on_peer_discovered(discovered.append)
with patch("radicle_reticulum.adapter.RNS.log"):
adapter._handle_announce(peer_hash, rns_id, NODE_APP_DATA_MAGIC)
old_seen = adapter.get_peers()[0].last_seen
time.sleep(0.01)
adapter._handle_announce(peer_hash, rns_id, NODE_APP_DATA_MAGIC)
# Callback only fires once (first discovery)
assert len(discovered) == 1
# But last_seen was refreshed
assert adapter.get_peers()[0].last_seen >= old_seen
def test_multiple_distinct_peers_are_all_tracked(self):
adapter = _make_adapter()
with patch("radicle_reticulum.adapter.RNS.log"):
for i in range(3):
rns_id = self._make_rns_identity()
peer_hash = bytes([i]) * 16
adapter._handle_announce(peer_hash, rns_id, NODE_APP_DATA_MAGIC)
assert len(adapter.get_peers()) == 3
def test_get_peer_by_did_returns_correct_peer(self):
adapter = _make_adapter()
rns_id = self._make_rns_identity()
peer_hash = b"\xff" * 16
with patch("radicle_reticulum.adapter.RNS.log"):
adapter._handle_announce(peer_hash, rns_id, NODE_APP_DATA_MAGIC)
peers = adapter.get_peers()
did = peers[0].identity.did
found = adapter.get_peer_by_did(did)
assert found is not None
assert found.destination_hash == peer_hash
def test_extra_app_data_after_magic_still_accepted(self):
"""Announces with extra bytes after the magic prefix are valid."""
adapter = _make_adapter()
rns_id = self._make_rns_identity()
peer_hash = b"\x11" * 16
with patch("radicle_reticulum.adapter.RNS.log"):
adapter._handle_announce(
peer_hash, rns_id, NODE_APP_DATA_MAGIC + b"\x00\x01extra"
)
assert len(adapter.get_peers()) == 1
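The announce tests above hinge on a prefix check against `NODE_APP_DATA_MAGIC`: missing app data and foreign apps are rejected, while trailing bytes after the magic are fine. A sketch of that filter (the magic value here is a placeholder, not the real constant from `adapter.py`):

```python
NODE_APP_DATA_MAGIC = b"radicle/node"  # placeholder value for illustration

def is_radicle_announce(app_data) -> bool:
    # Mirrors the behaviour the tests assert: reject missing app data and
    # foreign apps, accept the magic prefix even with extra trailing bytes.
    return app_data is not None and app_data.startswith(NODE_APP_DATA_MAGIC)

assert is_radicle_announce(NODE_APP_DATA_MAGIC)
assert is_radicle_announce(NODE_APP_DATA_MAGIC + b"\x00\x01extra")
assert not is_radicle_announce(b"SOME_OTHER_APP")
assert not is_radicle_announce(None)
```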


@ -252,196 +252,6 @@ class TestOnPacket:
assert received == [("rad:z3abc123", "z6Mkpeer")]
def test_delta_packet_merges_with_local_refs(self, tmp_path):
"""A delta packet updates only the changed refs, keeping others intact."""
relay = _make_relay(tmp_path)
relay._known_refs["rad:z3abc123"] = {
"refs/heads/main": "aaa",
"refs/heads/dev": "bbb",
}
delta_packet = json.dumps({
"type": "refs",
"rid": "rad:z3abc123",
"nid": "z6Mkpeer",
"delta": True,
"refs": {"refs/heads/main": "ccc"}, # only main changed
}).encode()
with patch.object(relay, "_trigger_sync") as mock_sync:
relay._on_packet(delta_packet, MagicMock())
mock_sync.assert_called_once()
def test_delta_packet_no_sync_when_already_known(self, tmp_path):
"""Delta packet with refs we already know should not trigger sync."""
relay = _make_relay(tmp_path)
relay._known_refs["rad:z3abc123"] = {"refs/heads/main": "aaa"}
delta_packet = json.dumps({
"type": "refs",
"rid": "rad:z3abc123",
"nid": "z6Mkpeer",
"delta": True,
"refs": {"refs/heads/main": "aaa"}, # same value
}).encode()
with patch.object(relay, "_trigger_sync") as mock_sync:
relay._on_packet(delta_packet, MagicMock())
mock_sync.assert_not_called()
def test_auto_seed_triggered_for_unknown_repo(self, tmp_path):
"""When auto_seed=True and RID is unknown, _auto_seed_and_sync is called."""
relay = _make_relay(tmp_path, auto_seed=True)
packet = self._make_packet("rad:z3brand_new", "z6Mkpeer", SAMPLE_REFS)
with patch.object(relay, "_auto_seed_and_sync") as mock_auto, \
patch.object(relay, "_trigger_sync") as mock_sync:
relay._on_packet(packet, MagicMock())
mock_auto.assert_called_once_with("rad:z3brand_new", "z6Mkpeer")
mock_sync.assert_not_called()
def test_auto_seed_not_triggered_for_tracked_repo(self, tmp_path):
"""auto_seed=True must not re-seed repos already in self.rids."""
relay = _make_relay(tmp_path, auto_seed=True)
# rad:z3abc123 is already in rids (default from _make_relay)
relay._known_refs["rad:z3abc123"] = {"refs/heads/main": "old"}
with patch.object(relay, "_auto_seed_and_sync") as mock_auto, \
patch.object(relay, "_trigger_sync") as mock_sync:
relay._on_packet(
self._make_packet("rad:z3abc123", "z6Mkpeer", {"refs/heads/main": "new"}),
MagicMock(),
)
mock_auto.assert_not_called()
mock_sync.assert_called_once()
# ── Broadcast delta ───────────────────────────────────────────────────────────
class TestBroadcastDelta:
def test_full_broadcast_when_no_old_refs(self, tmp_path):
relay = _make_relay(tmp_path)
relay._known_peers[b"\x01" * 16] = 0.0
sent = []
relay._send_packet = lambda h, p: sent.append(json.loads(p)) or True
relay._broadcast("rad:z3abc123", SAMPLE_REFS)
assert len(sent) == 1
assert sent[0].get("delta") is None or sent[0].get("delta") is False
assert sent[0]["refs"] == SAMPLE_REFS
def test_delta_broadcast_only_sends_changed_refs(self, tmp_path):
relay = _make_relay(tmp_path)
relay._known_peers[b"\x01" * 16] = 0.0
sent = []
relay._send_packet = lambda h, p: sent.append(json.loads(p)) or True
old = {"refs/heads/main": "aaa", "refs/heads/dev": "bbb"}
new = {"refs/heads/main": "ccc", "refs/heads/dev": "bbb"} # only main changed
relay._broadcast("rad:z3abc123", new, old_refs=old)
assert len(sent) == 1
assert sent[0]["delta"] is True
assert sent[0]["refs"] == {"refs/heads/main": "ccc"}
assert "refs/heads/dev" not in sent[0]["refs"]
def test_initial_refs_send_is_full_not_delta(self, tmp_path):
relay = _make_relay(tmp_path)
relay._known_refs["rad:z3abc123"] = SAMPLE_REFS
sent = []
relay._send_packet = lambda h, p: sent.append(json.loads(p)) or True
relay._send_initial_refs(b"\x02" * 16)
assert len(sent) == 1
assert sent[0].get("delta") is None or sent[0].get("delta") is False
assert sent[0]["refs"] == SAMPLE_REFS
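The delta broadcasts exercised above carry only refs whose OIDs differ from the last known set. In sketch form (the helper name `ref_delta` is illustrative; deleted refs are not represented in this simplified version):

```python
def ref_delta(old: dict, new: dict) -> dict:
    # Keep refs that changed or are new — everything the peer already
    # knows is left out of the delta payload.
    return {name: oid for name, oid in new.items() if old.get(name) != oid}

old = {"refs/heads/main": "aaa", "refs/heads/dev": "bbb"}
new = {"refs/heads/main": "ccc", "refs/heads/dev": "bbb"}
assert ref_delta(old, new) == {"refs/heads/main": "ccc"}
assert ref_delta(new, new) == {}  # no changes, nothing to broadcast
```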
# ── Packet splitting (_build_ref_payloads) ───────────────────────────────────
class TestBuildRefPayloads:
def test_small_refs_fit_in_one_packet(self, tmp_path):
relay = _make_relay(tmp_path)
refs = {"refs/heads/main": "a" * 40}
payloads = relay._build_ref_payloads("rad:z3abc123", refs, is_delta=False)
assert len(payloads) == 1
msg = json.loads(payloads[0])
assert msg["refs"] == refs
def test_large_refs_split_into_multiple_packets(self, tmp_path):
relay = _make_relay(tmp_path)
# 20 refs × ~70 bytes each ≈ 1400 bytes, well over ENCRYPTED_MDU=383
many_refs = {f"refs/heads/branch-{i:03d}": "a" * 40 for i in range(20)}
payloads = relay._build_ref_payloads("rad:z3abc123", many_refs, is_delta=False)
assert len(payloads) > 1
# All packets must fit within MDU
import RNS
assert all(len(p) <= RNS.Packet.ENCRYPTED_MDU for p in payloads)
# Combined refs must cover every original ref exactly once
combined = {}
for p in payloads:
combined.update(json.loads(p)["refs"])
assert combined == many_refs
def test_delta_flag_propagated_to_all_chunks(self, tmp_path):
relay = _make_relay(tmp_path)
many_refs = {f"refs/heads/br-{i}": "b" * 40 for i in range(20)}
payloads = relay._build_ref_payloads("rad:z3abc123", many_refs, is_delta=True)
assert all(json.loads(p).get("delta") is True for p in payloads)
def test_each_packet_under_mdu(self, tmp_path):
"""Single very-long ref name must not produce an oversized packet."""
import RNS
relay = _make_relay(tmp_path)
# pathological: ref name itself is 200 chars
refs = {"refs/heads/" + "x" * 200: "c" * 40}
payloads = relay._build_ref_payloads("rad:z3abc123", refs, is_delta=False)
# Even if we can't split a single ref below MDU, we still emit it
assert len(payloads) >= 1
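The splitting behaviour tested above can be sketched as a greedy chunker that keeps each JSON payload under the encrypted MDU (383 bytes, per the test comment; `split_refs` is an illustrative stand-in for the real `_build_ref_payloads`). A single oversized ref is still emitted on its own, matching the pathological-ref test:

```python
import json

ENCRYPTED_MDU = 383  # value quoted in the tests for RNS.Packet.ENCRYPTED_MDU

def split_refs(rid, refs, budget=ENCRYPTED_MDU):
    # Greedily pack refs into payloads that each stay under the budget.
    chunks, current = [], {}
    for name, oid in refs.items():
        candidate = {**current, name: oid}
        payload = json.dumps({"type": "refs", "rid": rid, "refs": candidate}).encode()
        if current and len(payload) > budget:
            chunks.append(current)      # current chunk is full
            current = {name: oid}       # start a new one with this ref
        else:
            current = candidate
    if current:
        chunks.append(current)
    return chunks

refs = {f"refs/heads/branch-{i:03d}": "a" * 40 for i in range(20)}
chunks = split_refs("rad:z3abc123", refs)
assert len(chunks) > 1
merged = {}
for c in chunks:
    merged.update(c)
assert merged == refs  # every ref appears exactly once across chunks
```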
# ── Auto-seed ─────────────────────────────────────────────────────────────────
class TestAutoSeed:
def test_seeds_and_syncs_unknown_repo(self, tmp_path):
relay = _make_relay(tmp_path, auto_seed=True)
with patch("radicle_reticulum.gossip.subprocess.run") as mock_run, \
patch.object(relay, "_trigger_sync") as mock_sync:
mock_run.return_value = MagicMock(returncode=0)
relay._auto_seed_and_sync("rad:z3brand_new", "z6Mkpeer")
calls = [c[0][0] for c in mock_run.call_args_list]
assert any("seed" in c for c in calls)
assert "rad:z3brand_new" in relay.rids
mock_sync.assert_called_once_with("rad:z3brand_new", "z6Mkpeer")
def test_no_sync_when_seed_fails(self, tmp_path):
relay = _make_relay(tmp_path, auto_seed=True)
with patch("radicle_reticulum.gossip.subprocess.run") as mock_run, \
patch.object(relay, "_trigger_sync") as mock_sync:
mock_run.return_value = MagicMock(returncode=1, stderr="permission denied")
relay._auto_seed_and_sync("rad:z3brand_new", "z6Mkpeer")
mock_sync.assert_not_called()
assert "rad:z3brand_new" not in relay.rids
def test_does_not_duplicate_rid(self, tmp_path):
relay = _make_relay(tmp_path, auto_seed=True)
relay.rids.append("rad:z3already")
with patch("radicle_reticulum.gossip.subprocess.run") as mock_run, \
patch.object(relay, "_trigger_sync"):
mock_run.return_value = MagicMock(returncode=0)
relay._auto_seed_and_sync("rad:z3already", "z6Mkpeer")
assert relay.rids.count("rad:z3already") == 1
# ── Trigger sync ──────────────────────────────────────────────────────────────
@ -479,57 +289,6 @@ class TestTriggerSync:
assert "--fetch" in sync_calls[0]
assert "--rid" in sync_calls[0]
def test_seed_flag_added_when_connect_succeeds(self, tmp_path):
relay = _make_relay(tmp_path, bridge_port=8777)
with patch("radicle_reticulum.gossip.subprocess.run") as mock_run:
mock_run.return_value = MagicMock(returncode=0)
relay._trigger_sync("rad:z3abc123", "z6Mkpeer")
all_calls = [c[0][0] for c in mock_run.call_args_list]
sync_calls = [c for c in all_calls if "sync" in c]
assert "--seed" in sync_calls[0]
assert any("z6Mkpeer@127.0.0.1:8777" in arg for arg in sync_calls[0])
def test_no_seed_flag_when_connect_fails(self, tmp_path):
relay = _make_relay(tmp_path, bridge_port=8777)
call_count = 0
def fake_run(cmd, **kwargs):
nonlocal call_count
call_count += 1
# First call (connect) fails; second call (sync) succeeds
return MagicMock(returncode=1 if call_count == 1 else 0)
with patch("radicle_reticulum.gossip.subprocess.run", side_effect=fake_run):
relay._trigger_sync("rad:z3abc123", "z6Mkpeer")
# Sync should not include --seed when connect failed
# (we can't easily inspect the call args here, but the function should not raise)
assert call_count == 2 # connect attempted + sync still ran
def test_debounce_clears_event_on_early_wakeup(self, tmp_path):
"""Poll loop should absorb rapid events and only poll once after debounce."""
relay = _make_relay(tmp_path)
relay._running = True
polled = []
original_once = relay._poll_loop_once
relay._poll_loop_once = lambda: polled.append(1)
# Simulate: event already set when loop starts
relay._poll_event.set()
import threading, time
t = threading.Thread(target=relay._poll_loop, daemon=True)
t.start()
time.sleep(0.1)
relay._running = False
relay._poll_event.set()
t.join(timeout=3)
# Should have polled at least once (startup poll) and at most twice
assert 1 <= len(polled) <= 2
# ── Peer discovery ────────────────────────────────────────────────────────────
@ -629,7 +388,7 @@ class TestWatchdog:
broadcasts = []
with patch("radicle_reticulum.gossip._read_refs", return_value=new_refs), \
     patch.object(relay, "_broadcast", side_effect=lambda r, refs: broadcasts.append(r)):
    # Signal the event (simulates watchdog firing)
    relay._poll_event.set()
    # Run one poll iteration

View File

@ -296,64 +296,70 @@ class TestBridgeDiscoveryIntegration:
class TestTCPTunnelIntegration: class TestTCPTunnelIntegration:
"""Test bidirectional data forwarding through the bridge's tunnel layer.""" """Test bidirectional data forwarding through the bridge's tunnel layer."""
def test_data_forwarded_tcp_to_rns_buffer(self): def test_data_forwarded_tcp_to_rns_link(self):
"""Data written to the TCP socket should be written to the RNS Buffer.""" """Data written to the TCP socket should be sent as an RNS.Packet."""
bridge = _make_bridge() bridge = _make_bridge()
bridge._running = True bridge._running = True
# Create a real socket pair to simulate radicle-node ↔ bridge TCP
local, remote = socket.socketpair() local, remote = socket.socketpair()
written_data = [] sent_data = []
mock_buf = MagicMock() mock_link = MagicMock()
mock_buf.write.side_effect = lambda d: written_data.append(bytes(d)) mock_link.status = MagicMock()
mock_link.status.__eq__ = lambda s, other: other == "ACTIVE"
from radicle_reticulum.bridge import TunnelConnection mock_packet = MagicMock()
tunnel = TunnelConnection(
tunnel_id=1,
tcp_socket=local,
rns_link=MagicMock(),
remote_destination=b"\x00" * 16,
buf=mock_buf,
)
payload = b"hello from radicle-node" with patch("radicle_reticulum.bridge.RNS.Link.ACTIVE", "ACTIVE"), \
remote.sendall(payload) patch("radicle_reticulum.bridge.RNS.Packet") as mock_pkt_cls, \
remote.close() patch("radicle_reticulum.bridge.RNS.log"):
mock_pkt_cls.side_effect = lambda lnk, data: sent_data.append(data) or MagicMock()
from radicle_reticulum.bridge import TunnelConnection
tunnel = TunnelConnection(
tunnel_id=1,
tcp_socket=local,
rns_link=mock_link,
remote_destination=b"\x00" * 16,
)
# Write data from the "radicle-node" side
payload = b"hello from radicle-node"
remote.sendall(payload)
remote.close() # triggers EOF so forward loop exits
with patch("radicle_reticulum.bridge.RNS.log"):
bridge._forward_tcp_to_rns(tunnel) bridge._forward_tcp_to_rns(tunnel)
local.close() local.close()
assert b"".join(written_data) == payload assert any(payload in d for d in sent_data), \
f"Expected {payload!r} in forwarded data, got {sent_data}"
def test_rns_data_forwarded_to_tcp_socket(self):
    """Data received from RNS should be written to the TCP socket."""
    bridge = _make_bridge()
    bridge._running = True
    with patch("radicle_reticulum.bridge.RNS.log"):
        # Create socket pair: bridge writes to `local`, test reads from `remote`
        local, remote = socket.socketpair()
        from radicle_reticulum.bridge import TunnelConnection
        tunnel = TunnelConnection(
            tunnel_id=2,
            tcp_socket=local,
            rns_link=MagicMock(),
            remote_destination=b"\x00" * 16,
        )
        with bridge._tunnels_lock:
            bridge._tunnels[2] = tunnel
        payload = b"hello from remote radicle-node"
        bridge._on_rns_data(2, payload)
        received = remote.recv(1024)
        local.close()
        remote.close()
        assert received == payload
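The TCP-to-RNS direction under test — read from the socket until EOF, hand each chunk to the link — can be sketched as a plain forwarding loop (a hypothetical minimal version; `forward_tcp` and the `send_packet` callback are illustrative names, and the real bridge adds tunnel bookkeeping and logging):

```python
import socket


def forward_tcp(sock: socket.socket, send_packet) -> None:
    """Read until EOF and hand each chunk to send_packet (e.g. an RNS packet send)."""
    while True:
        chunk = sock.recv(4096)
        if not chunk:  # empty read means the peer closed: exit the loop
            break
        send_packet(chunk)


# Usage with a socketpair, mirroring the test above
local, remote = socket.socketpair()
sent = []
remote.sendall(b"hello")
remote.close()  # triggers EOF so the loop exits
forward_tcp(local, sent.append)
local.close()
# sent now holds the forwarded bytes
```

The socketpair trick is what lets these tests exercise the loop without any real network: one end plays radicle-node, the other is handed to the bridge.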

tests/test_link.py (new file, 196 lines)
@@ -0,0 +1,196 @@
"""Tests for RadicleLink (pure logic — no RNS networking required)."""
import threading
import time
from unittest.mock import MagicMock, patch
import pytest
from radicle_reticulum.link import RadicleLink, LinkState
def make_link(state: LinkState = LinkState.ACTIVE) -> tuple[RadicleLink, MagicMock]:
"""Create a RadicleLink with a mock RNS.Link."""
mock_rns_link = MagicMock()
mock_rns_link.rtt = None
link = RadicleLink(rns_link=mock_rns_link, state=state)
return link, mock_rns_link
class TestRadicleLinkState:
def test_active_link_is_active(self):
link, _ = make_link(LinkState.ACTIVE)
assert link.is_active
assert link.state == LinkState.ACTIVE
def test_pending_link_is_not_active(self):
link, _ = make_link(LinkState.PENDING)
assert not link.is_active
def test_on_established_sets_active(self):
link, _ = make_link(LinkState.PENDING)
link._on_established(MagicMock())
assert link.state == LinkState.ACTIVE
assert link.is_active
def test_on_closed_sets_closed(self):
link, _ = make_link(LinkState.ACTIVE)
link._on_closed(MagicMock())
assert link.state == LinkState.CLOSED
assert not link.is_active
def test_close_calls_teardown(self):
link, mock_rns = make_link(LinkState.ACTIVE)
link.close()
mock_rns.teardown.assert_called_once()
assert link.state == LinkState.CLOSED
def test_close_on_inactive_link_is_noop(self):
link, mock_rns = make_link(LinkState.CLOSED)
link.close()
mock_rns.teardown.assert_not_called()
def test_repr(self):
link, _ = make_link(LinkState.ACTIVE)
r = repr(link)
assert "active" in r
assert "RadicleLink" in r
class TestRadicleLinkSend:
def test_send_on_active_link_succeeds(self):
link, _ = make_link(LinkState.ACTIVE)
with patch("radicle_reticulum.link.RNS.Packet") as mock_packet_cls:
mock_packet = MagicMock()
mock_packet_cls.return_value = mock_packet
result = link.send(b"hello world")
assert result is True
mock_packet.send.assert_called_once()
def test_send_on_inactive_link_returns_false(self):
link, _ = make_link(LinkState.CLOSED)
result = link.send(b"data")
assert result is False
def test_send_on_pending_link_returns_false(self):
link, _ = make_link(LinkState.PENDING)
result = link.send(b"data")
assert result is False
def test_send_exception_returns_false(self):
link, _ = make_link(LinkState.ACTIVE)
with patch("radicle_reticulum.link.RNS.Packet") as mock_packet_cls:
mock_packet_cls.side_effect = RuntimeError("send failed")
result = link.send(b"data")
assert result is False
class TestRadicleLinkRecv:
def test_recv_returns_buffered_data(self):
link, _ = make_link()
link._on_packet(b"buffered", MagicMock())
result = link.recv(timeout=0.1)
assert result == b"buffered"
def test_recv_returns_in_order(self):
link, _ = make_link()
for i in range(3):
link._on_packet(f"msg{i}".encode(), MagicMock())
assert link.recv(timeout=0.1) == b"msg0"
assert link.recv(timeout=0.1) == b"msg1"
assert link.recv(timeout=0.1) == b"msg2"
def test_recv_timeout_returns_none(self):
link, _ = make_link()
start = time.time()
result = link.recv(timeout=0.05)
elapsed = time.time() - start
assert result is None
assert elapsed >= 0.04
def test_recv_woken_by_data(self):
link, _ = make_link()
results = []
def delayed_send():
time.sleep(0.05)
link._on_packet(b"delayed", MagicMock())
t = threading.Thread(target=delayed_send, daemon=True)
t.start()
result = link.recv(timeout=1.0)
t.join()
assert result == b"delayed"
def test_recv_returns_none_when_closed(self):
link, _ = make_link()
link._on_closed(MagicMock())
result = link.recv(timeout=0.1)
assert result is None
def test_recv_woken_by_close(self):
"""recv() unblocks when link closes while waiting."""
link, _ = make_link()
def close_after_delay():
time.sleep(0.05)
link._on_closed(MagicMock())
t = threading.Thread(target=close_after_delay, daemon=True)
t.start()
result = link.recv(timeout=2.0)
t.join()
assert result is None
def test_on_packet_calls_on_data_callback(self):
link, _ = make_link()
received = []
link.on_data = received.append
link._on_packet(b"callback data", MagicMock())
assert received == [b"callback data"]
def test_on_closed_calls_on_close_callback(self):
link, _ = make_link()
called = []
link.on_close = lambda: called.append(True)
link._on_closed(MagicMock())
assert called == [True]
class TestRadicleLinkProperties:
def test_rtt_returns_none_when_unavailable(self):
link, mock_rns = make_link()
mock_rns.rtt = None
assert link.rtt is None
def test_rtt_returns_value_when_available(self):
link, mock_rns = make_link()
mock_rns.rtt = 0.15
assert link.rtt == pytest.approx(0.15)
def test_remote_identity_delegates_to_rns(self):
link, mock_rns = make_link()
fake_id = MagicMock()
mock_rns.get_remote_identity.return_value = fake_id
assert link.remote_identity is fake_id
class TestRadicleLinkFactories:
def test_from_incoming_is_active(self):
mock_rns = MagicMock()
link = RadicleLink.from_incoming(mock_rns)
assert link.state == LinkState.ACTIVE
def test_from_incoming_does_not_set_established_callback(self):
mock_rns = MagicMock()
RadicleLink.from_incoming(mock_rns)
mock_rns.set_link_established_callback.assert_not_called()
def test_create_outbound_is_pending(self):
mock_dest = MagicMock()
with patch("radicle_reticulum.link.RNS.Link") as mock_link_cls:
mock_rns = MagicMock()
mock_link_cls.return_value = mock_rns
link = RadicleLink.create_outbound(mock_dest)
assert link.state == LinkState.PENDING

tests/test_messages.py (new file, 192 lines)
@@ -0,0 +1,192 @@
"""Tests for message framing."""
import pytest
from radicle_reticulum.messages import (
MessageType,
MessageHeader,
NodeAnnouncement,
InventoryAnnouncement,
RefAnnouncement,
Ping,
Pong,
decode_message,
HEADER_SIZE,
)
class TestMessageHeader:
"""Test message header encoding/decoding."""
def test_encode_decode_roundtrip(self):
"""Test header encode/decode roundtrip."""
header = MessageHeader(
msg_type=MessageType.NODE_ANNOUNCEMENT,
timestamp=1234567890123,
payload_length=42,
)
encoded = header.encode()
assert len(encoded) == HEADER_SIZE
decoded = MessageHeader.decode(encoded)
assert decoded.msg_type == header.msg_type
assert decoded.timestamp == header.timestamp
assert decoded.payload_length == header.payload_length
def test_header_too_short(self):
"""Test that short data raises error."""
with pytest.raises(ValueError, match="Header too short"):
MessageHeader.decode(b"\x00\x01")
class TestNodeAnnouncement:
"""Test NodeAnnouncement message."""
def test_encode_decode_roundtrip(self):
"""Test encode/decode roundtrip."""
msg = NodeAnnouncement(
node_id="did:key:z6MkhaXgBZDvotDkL5257faiztiGiC2QtKLGpbnnEGta2doK",
features=0x0003,
version=1,
)
encoded = msg.encode()
decoded = NodeAnnouncement.decode(encoded)
assert decoded.node_id == msg.node_id
assert decoded.features == msg.features
assert decoded.version == msg.version
def test_to_message_includes_header(self):
"""Test that to_message includes proper header."""
msg = NodeAnnouncement(node_id="did:key:z6Mk...")
full_message = msg.to_message()
header = MessageHeader.decode(full_message)
assert header.msg_type == MessageType.NODE_ANNOUNCEMENT
assert header.timestamp > 0
assert header.payload_length == len(msg.encode())
class TestInventoryAnnouncement:
"""Test InventoryAnnouncement message."""
def test_encode_decode_roundtrip(self):
"""Test encode/decode roundtrip."""
msg = InventoryAnnouncement(
node_id="did:key:z6MkhaXgBZDvotDkL5257faiztiGiC2QtKLGpbnnEGta2doK",
repositories=[
"rad:z3gqcJUoA1n9HaHKufZs5FCSGazv5",
"rad:z4gqcJUoA1n9HaHKufZs5FCSGazv6",
],
)
encoded = msg.encode()
decoded = InventoryAnnouncement.decode(encoded)
assert decoded.node_id == msg.node_id
assert decoded.repositories == msg.repositories
def test_empty_repositories(self):
"""Test with empty repository list."""
msg = InventoryAnnouncement(
node_id="did:key:z6Mk...",
repositories=[],
)
encoded = msg.encode()
decoded = InventoryAnnouncement.decode(encoded)
assert decoded.repositories == []
class TestRefAnnouncement:
"""Test RefAnnouncement message."""
def test_encode_decode_roundtrip(self):
"""Test encode/decode roundtrip."""
msg = RefAnnouncement(
repository_id="rad:z3gqcJUoA1n9HaHKufZs5FCSGazv5",
ref_name="refs/heads/main",
old_oid=b"\x00" * 20,
new_oid=bytes.fromhex("abc123def456789012345678901234567890abcd"),
signature=b"fake_signature_bytes",
)
encoded = msg.encode()
decoded = RefAnnouncement.decode(encoded)
assert decoded.repository_id == msg.repository_id
assert decoded.ref_name == msg.ref_name
assert decoded.old_oid == msg.old_oid
assert decoded.new_oid == msg.new_oid
assert decoded.signature == msg.signature
class TestPingPong:
"""Test Ping/Pong messages."""
def test_ping_encode_decode(self):
"""Test Ping encode/decode."""
ping = Ping()
encoded = ping.encode()
decoded = Ping.decode(encoded)
assert decoded.nonce == ping.nonce
def test_pong_echoes_nonce(self):
"""Test Pong echoes ping nonce."""
ping = Ping()
pong = Pong(nonce=ping.nonce)
assert pong.nonce == ping.nonce
class TestDecodeMessage:
"""Test the decode_message function."""
def test_decode_node_announcement(self):
"""Test decoding a NodeAnnouncement message."""
msg = NodeAnnouncement(node_id="did:key:z6Mk...")
full_message = msg.to_message()
header, decoded = decode_message(full_message)
assert header.msg_type == MessageType.NODE_ANNOUNCEMENT
assert isinstance(decoded, NodeAnnouncement)
assert decoded.node_id == msg.node_id
def test_decode_inventory_announcement(self):
"""Test decoding an InventoryAnnouncement message."""
msg = InventoryAnnouncement(
node_id="did:key:z6Mk...",
repositories=["repo1", "repo2"],
)
full_message = msg.to_message()
header, decoded = decode_message(full_message)
assert header.msg_type == MessageType.INVENTORY_ANNOUNCEMENT
assert isinstance(decoded, InventoryAnnouncement)
assert decoded.repositories == msg.repositories
def test_decode_ping(self):
"""Test decoding a Ping message."""
ping = Ping()
full_message = ping.to_message()
header, decoded = decode_message(full_message)
assert header.msg_type == MessageType.PING
assert isinstance(decoded, Ping)
def test_unknown_message_type_raises(self):
"""Test that unknown message types raise error."""
# Create a message with invalid type
import struct
bad_message = struct.pack("!BQH", 0xFF, 0, 0)
with pytest.raises(ValueError, match="Unknown message type"):
decode_message(bad_message)
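The header layout implied by these tests — a 1-byte type, 8-byte millisecond timestamp, and 2-byte payload length, big-endian (the `struct.pack("!BQH", ...)` in the last test) — can be sketched as follows. This is a hypothetical minimal reimplementation (`MiniHeader`, `HEADER_FMT` are illustrative names), not the module under test:

```python
import struct
from dataclasses import dataclass

HEADER_FMT = "!BQH"  # msg_type (u8), timestamp in ms (u64), payload_length (u16)
HEADER_SIZE = struct.calcsize(HEADER_FMT)  # 11 bytes


@dataclass
class MiniHeader:
    msg_type: int
    timestamp: int
    payload_length: int

    def encode(self) -> bytes:
        return struct.pack(
            HEADER_FMT, self.msg_type, self.timestamp, self.payload_length
        )

    @classmethod
    def decode(cls, data: bytes) -> "MiniHeader":
        if len(data) < HEADER_SIZE:
            raise ValueError("Header too short")
        return cls(*struct.unpack(HEADER_FMT, data[:HEADER_SIZE]))
```

The fixed-size header lets a receiver read exactly `HEADER_SIZE` bytes, learn `payload_length`, and then read the payload — which is why `test_to_message_includes_header` can decode the header straight off the front of `to_message()` output.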