radicle-reticulum/src/radicle_reticulum/bridge.py

651 lines
24 KiB
Python

"""TCP-to-Reticulum bridge for Radicle.

Bridges radicle-node's TCP connections over a Reticulum mesh network.
Allows using Radicle normally while syncing over LoRa, packet radio, etc.

Architecture:

    ┌─────────────┐     TCP      ┌─────────────┐   Reticulum   ┌─────────────┐
    │ radicle-node│ ◄──────────► │   Bridge    │ ◄───────────► │Remote Bridge│
    └─────────────┘  localhost   └─────────────┘   LoRa/mesh   └─────────────┘

The bridge:
1. Allocates a dedicated TCP port per discovered remote bridge
2. Accepts connections from local radicle-node on those ports
3. Tunnels traffic over Reticulum to the correct remote bridge
4. Remote bridges forward to their local radicle-node
"""
import os
import socket
import select
import struct
import subprocess
import threading
import time
from dataclasses import dataclass, field
from typing import Callable, Dict, List, Optional, Set, Tuple
import RNS
from radicle_reticulum.identity import RadicleIdentity
# Default TCP ports
RADICLE_DEFAULT_PORT = 8776  # Default for RadicleBridge's radicle_port argument
BRIDGE_DEFAULT_PORT = 8777  # Base listen port (first bridge gets this)

# App name and aspect used to build the RNS destinations of bridges
APP_NAME = "radicle"
ASPECT_BRIDGE = "bridge"

# Prefix of the app_data carried in bridge announces; announces whose
# app_data does not start with it are ignored by the bridge.
BRIDGE_APP_DATA_MAGIC = b"RADICLE_BRIDGE_V1"

# Buffer sizes (bytes)
TCP_BUFFER_SIZE = 65536
# Upper bound on bytes read per recv() from the local TCP socket when
# forwarding to RNS.
# NOTE(review): RNS link packets have a much smaller payload limit
# (RNS.Link.MDU) — confirm the sender caps or chunks reads accordingly.
RNS_BUFFER_SIZE = 32768  # Smaller for RNS to avoid fragmentation
@dataclass
class TunnelConnection:
    """State for a single TCP stream tunneled over a Reticulum link.

    Holds both endpoints of the tunnel (local TCP socket and RNS link)
    together with simple traffic counters.
    """

    tunnel_id: int
    tcp_socket: Optional[socket.socket]
    rns_link: Optional[RNS.Link]
    remote_destination: Optional[bytes]
    created_at: float = field(default_factory=time.time)
    bytes_sent: int = 0
    bytes_received: int = 0
    active: bool = True

    def close(self):
        """Mark the tunnel inactive and release both endpoints, best effort."""
        self.active = False
        sock, link = self.tcp_socket, self.rns_link

        # Errors while releasing either endpoint are deliberately ignored:
        # closing must always run to completion.
        if sock:
            try:
                sock.close()
            except OSError:
                pass

        if link:
            try:
                link.teardown()
            except Exception:
                pass
class RadicleBridge:
    """Bridges Radicle TCP connections over Reticulum.

    Each discovered remote bridge gets its own dedicated TCP listen port so
    that radicle-node connections are always routed to the correct peer.
    The first bridge gets listen_port; subsequent bridges get OS-assigned ports.

    The instance also acts as its own Reticulum announce handler (see
    ``aspect_filter`` and ``received_announce``).
    """

    # Announce-handler filter read by RNS.Transport. None means every announce
    # is delivered; bridge announces are then selected by their
    # BRIDGE_APP_DATA_MAGIC prefix in _handle_announce().
    aspect_filter = None

    def __init__(
        self,
        identity: Optional[RadicleIdentity] = None,
        listen_port: int = BRIDGE_DEFAULT_PORT,
        radicle_host: str = "127.0.0.1",
        radicle_port: int = RADICLE_DEFAULT_PORT,
        config_path: Optional[str] = None,
        auto_connect: bool = True,
        auto_seed: bool = True,
        announce_retry_delays: Tuple[int, ...] = (5, 15, 30),
        rad_home: Optional[str] = None,
    ):
        """Initialize the bridge.

        Args:
            identity: RNS identity for this bridge. A fresh one is generated
                when omitted.
            listen_port: Base TCP port; first discovered bridge gets this port,
                subsequent ones get OS-assigned ports.
            radicle_host: Host where radicle-node listens (for incoming tunnels)
            radicle_port: Port where radicle-node listens
            config_path: Path to Reticulum config
            auto_connect: Automatically connect to discovered bridges
            auto_seed: Automatically register discovered NIDs with radicle-node
            announce_retry_delays: Seconds between startup re-announces. Use longer
                intervals on LoRa to respect duty cycle limits, e.g. (60, 300, 900).
            rad_home: RAD_HOME for rad CLI calls. None = use system default.
                Set to seed home when bridging a dedicated seed node.
        """
        self.listen_port = listen_port
        self.radicle_host = radicle_host
        self.radicle_port = radicle_port
        self.auto_connect = auto_connect
        self.auto_seed = auto_seed
        self.announce_retry_delays = announce_retry_delays
        self.rad_home = rad_home

        # Initialize Reticulum
        self.reticulum = RNS.Reticulum(config_path)

        # Identity
        if identity is None:
            identity = RadicleIdentity.generate()
        self.identity = identity

        # Create RNS destination for incoming tunnels
        self.destination = RNS.Destination(
            self.identity.rns_identity,
            RNS.Destination.IN,
            RNS.Destination.SINGLE,
            APP_NAME,
            ASPECT_BRIDGE,
        )
        self.destination.set_link_established_callback(self._on_incoming_link)

        # Known remote bridges: hash -> last_seen
        self._remote_bridges: Dict[bytes, float] = {}
        self._remote_bridges_lock = threading.Lock()

        # Per-bridge dedicated TCP servers: bridge_hash -> server socket / port
        self._bridge_servers: Dict[bytes, socket.socket] = {}
        self._bridge_ports: Dict[bytes, int] = {}
        self._servers_lock = threading.Lock()

        # Active tunnels
        self._tunnels: Dict[int, TunnelConnection] = {}
        self._tunnel_counter = 0
        self._tunnels_lock = threading.Lock()

        self._running = False

        # Callbacks
        self._tunnel_opened_cb: Optional[Callable[[TunnelConnection], None]] = None
        self._tunnel_closed_cb: Optional[Callable[[TunnelConnection], None]] = None
        self._on_bridge_discovered: Optional[Callable[[bytes, Optional[str]], None]] = None

        # Local radicle node NID (for announcing to remote bridges)
        self._local_radicle_nid: Optional[str] = None

        # Remote bridge NIDs: bridge_hash -> radicle_nid (guarded by _remote_bridges_lock)
        self._bridge_nids: Dict[bytes, str] = {}

    # ── Lifecycle ────────────────────────────────────────────────────────────

    def start(self):
        """Start the bridge: listen for announces and announce ourselves."""
        self._running = True

        # RNS.Transport only accepts handler *objects* exposing `aspect_filter`
        # and a callable `received_announce`; anything else (such as a bare
        # bound method) is silently ignored. Register the bridge itself.
        RNS.Transport.register_announce_handler(self)
        RNS.log("Registered announce handler for bridge discovery", RNS.LOG_INFO)

        # Announce presence; repeat a few times so peers that come up shortly
        # after us don't miss it due to interface initialisation timing
        self.announce()
        threading.Thread(target=self._startup_announce_loop, daemon=True).start()

        RNS.log("Radicle bridge started", RNS.LOG_INFO)
        RNS.log(f" RNS hash: {self.destination.hexhash}", RNS.LOG_INFO)
        RNS.log(f" Radicle target: {self.radicle_host}:{self.radicle_port}", RNS.LOG_INFO)
        RNS.log(
            f" TCP ports: allocated per remote bridge (base port {self.listen_port})",
            RNS.LOG_INFO,
        )

    def _startup_announce_loop(self):
        """Re-announce after startup to catch peers that come up slightly later."""
        for delay in self.announce_retry_delays:
            time.sleep(delay)
            if not self._running:
                return
            self.announce()

    def stop(self):
        """Stop the bridge, closing every tunnel and per-bridge TCP server."""
        self._running = False
        RNS.Transport.deregister_announce_handler(self)

        # Detach the tunnels under the lock but close them outside of it:
        # tearing down a link runs its link-closed callback, which calls
        # _on_tunnel_closed() and takes _tunnels_lock again. Closing while
        # holding the (non-reentrant) lock would deadlock.
        with self._tunnels_lock:
            tunnels = list(self._tunnels.values())
            self._tunnels.clear()
        for tunnel in tunnels:
            tunnel.close()

        # Close all per-bridge TCP servers
        with self._servers_lock:
            for srv in self._bridge_servers.values():
                try:
                    srv.close()
                except OSError:
                    pass
            self._bridge_servers.clear()
            self._bridge_ports.clear()

        RNS.log("Radicle bridge stopped", RNS.LOG_INFO)

    def set_local_radicle_nid(self, nid: str):
        """Set the local radicle node's NID for announcement."""
        self._local_radicle_nid = nid
        RNS.log(f"Local radicle NID set: {nid[:32]}...", RNS.LOG_INFO)

    def get_remote_bridge_nid(self, bridge_hash: bytes) -> Optional[str]:
        """Get the radicle NID served by a remote bridge, or None if unknown."""
        with self._remote_bridges_lock:
            return self._bridge_nids.get(bridge_hash)

    def announce(self):
        """Announce this bridge on the network.

        The app_data is BRIDGE_APP_DATA_MAGIC, optionally followed by a
        big-endian 16-bit length and the UTF-8 encoded local radicle NID.
        """
        app_data = BRIDGE_APP_DATA_MAGIC
        if self._local_radicle_nid:
            nid_bytes = self._local_radicle_nid.encode("utf-8")
            app_data += struct.pack("!H", len(nid_bytes)) + nid_bytes
        self.destination.announce(app_data=app_data)
        RNS.log(
            f"Announced bridge: {self.destination.hexhash} (app_data={len(app_data)} bytes)",
            RNS.LOG_INFO,
        )

    def connect_to_bridge(self, destination_hash: bytes, timeout: float = 30.0) -> bool:
        """Connect to a remote bridge (establish RNS path).

        Actual per-connection tunneling is set up lazily when radicle-node
        connects to the allocated TCP port.

        Returns:
            True once a path is known and the bridge has been recorded,
            False if no path appeared within ``timeout`` seconds.
        """
        if not RNS.Transport.has_path(destination_hash):
            RNS.Transport.request_path(destination_hash)
            deadline = time.time() + timeout
            while not RNS.Transport.has_path(destination_hash):
                if time.time() > deadline:
                    RNS.log(f"Path timeout: {destination_hash.hex()}", RNS.LOG_WARNING)
                    return False
                time.sleep(0.1)

        with self._remote_bridges_lock:
            self._remote_bridges[destination_hash] = time.time()
        RNS.log(f"Registered remote bridge: {destination_hash.hex()}", RNS.LOG_INFO)
        return True

    def get_remote_bridges(self) -> List[bytes]:
        """Get list of known remote bridge hashes."""
        with self._remote_bridges_lock:
            return list(self._remote_bridges.keys())

    # ── Per-bridge TCP server ────────────────────────────────────────────────

    def _allocate_port_for_bridge(self, bridge_hash: bytes) -> int:
        """Allocate a dedicated TCP listen port for a remote bridge.

        The first bridge claims listen_port if available; subsequent bridges
        get OS-assigned ephemeral ports. Idempotent — returns the same port
        on repeated calls for the same bridge_hash.

        Raises:
            OSError: if no listening socket could be set up.
        """
        with self._servers_lock:
            if bridge_hash in self._bridge_ports:
                return self._bridge_ports[bridge_hash]

            # Try the configured listen_port first (first bridge keeps the
            # well-known port); fall back to OS-assigned if already taken.
            preferred = self.listen_port if not self._bridge_ports else 0

            srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            try:
                srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                try:
                    srv.bind(("127.0.0.1", preferred))
                except OSError:
                    if preferred == 0:
                        raise
                    srv.bind(("127.0.0.1", 0))
                srv.listen(5)
                srv.setblocking(False)
            except OSError:
                # Don't leak the socket when it cannot be turned into a server.
                srv.close()
                raise

            port = srv.getsockname()[1]
            self._bridge_servers[bridge_hash] = srv
            self._bridge_ports[bridge_hash] = port

        threading.Thread(
            target=self._accept_loop_for_bridge,
            args=(srv, bridge_hash),
            daemon=True,
        ).start()

        RNS.log(
            f"Allocated TCP port {port} for bridge {bridge_hash.hex()[:16]}",
            RNS.LOG_INFO,
        )
        return port

    def _accept_loop_for_bridge(self, server: socket.socket, bridge_hash: bytes):
        """Accept TCP connections destined for a specific remote bridge.

        Runs until the bridge is stopped; each accepted connection is handled
        on its own daemon thread.
        """
        while self._running:
            try:
                # 1s timeout so the loop notices stop() promptly.
                readable, _, _ = select.select([server], [], [], 1.0)
                if readable:
                    client_socket, addr = server.accept()
                    RNS.log(
                        f"TCP from {addr} → bridge {bridge_hash.hex()[:16]}",
                        RNS.LOG_DEBUG,
                    )
                    threading.Thread(
                        target=self._handle_local_connection,
                        args=(client_socket, bridge_hash),
                        daemon=True,
                    ).start()
            except Exception as e:
                if self._running:
                    RNS.log(
                        f"Accept error for bridge {bridge_hash.hex()[:16]}: {e}",
                        RNS.LOG_ERROR,
                    )

    def _register_tunnel(
        self,
        tcp_socket: socket.socket,
        rns_link: RNS.Link,
        remote_destination: Optional[bytes],
    ) -> TunnelConnection:
        """Create a TunnelConnection with a fresh id and track it as active."""
        with self._tunnels_lock:
            self._tunnel_counter += 1
            tunnel = TunnelConnection(
                tunnel_id=self._tunnel_counter,
                tcp_socket=tcp_socket,
                rns_link=rns_link,
                remote_destination=remote_destination,
            )
            self._tunnels[tunnel.tunnel_id] = tunnel
        return tunnel

    def _handle_local_connection(self, tcp_socket: socket.socket, bridge_hash: bytes):
        """Handle a connection from local radicle-node, tunneled to bridge_hash.

        Establishes an RNS link to the remote bridge, registers the tunnel and
        then forwards TCP data over the link until the tunnel ends. Runs on
        the per-connection thread started by the accept loop.
        """
        remote_identity = RNS.Identity.recall(bridge_hash)
        if not remote_identity:
            RNS.log(
                f"Cannot recall identity for {bridge_hash.hex()[:16]}",
                RNS.LOG_WARNING,
            )
            tcp_socket.close()
            return

        remote_dest = RNS.Destination(
            remote_identity,
            RNS.Destination.OUT,
            RNS.Destination.SINGLE,
            APP_NAME,
            ASPECT_BRIDGE,
        )
        rns_link = RNS.Link(remote_dest)

        # Wait for link establishment
        deadline = time.time() + 30.0
        while rns_link.status != RNS.Link.ACTIVE:
            # RNS.Link has no FAILED status; a link that could not be
            # established ends up CLOSED.
            if rns_link.status == RNS.Link.CLOSED:
                RNS.log(
                    "Link closed/failed before becoming active",
                    RNS.LOG_WARNING,
                )
                tcp_socket.close()
                return
            if time.time() > deadline:
                RNS.log("Link establishment timed out after 30s", RNS.LOG_WARNING)
                # Release the pending link instead of leaving it dangling.
                try:
                    rns_link.teardown()
                except Exception as e:
                    RNS.log(f"Link teardown error: {e}", RNS.LOG_DEBUG)
                tcp_socket.close()
                return
            time.sleep(0.1)

        tunnel = self._register_tunnel(tcp_socket, rns_link, bridge_hash)
        tunnel_id = tunnel.tunnel_id
        RNS.log(f"Tunnel {tunnel_id} opened to {bridge_hash.hex()[:16]}", RNS.LOG_INFO)

        if self._tunnel_opened_cb:
            self._tunnel_opened_cb(tunnel)

        rns_link.set_packet_callback(
            lambda data, pkt: self._on_rns_data(tunnel_id, data)
        )
        rns_link.set_link_closed_callback(
            lambda link: self._on_tunnel_closed(tunnel_id)
        )

        self._forward_tcp_to_rns(tunnel)

    def _forward_tcp_to_rns(self, tunnel: TunnelConnection):
        """Forward data from the tunnel's TCP socket to its RNS link.

        Loops until the TCP side closes or errors, the link stops being
        active, or the bridge is stopped; then closes the tunnel.
        """
        tcp_socket = tunnel.tcp_socket
        rns_link = tunnel.rns_link
        tcp_socket.setblocking(False)

        # Every read is sent as one link packet, so never read more than a
        # link packet can carry: a larger payload fails to pack and would
        # kill the tunnel. TCP is a byte stream, so smaller reads are safe.
        # NOTE(review): plain link packets are presumably not retransmitted
        # by RNS — confirm whether RNS.Channel/Buffer is needed for reliable
        # delivery.
        read_size = min(RNS_BUFFER_SIZE, RNS.Link.MDU)

        while tunnel.active and self._running:
            try:
                readable, _, errored = select.select([tcp_socket], [], [tcp_socket], 1.0)
                if errored:
                    break
                if readable:
                    data = tcp_socket.recv(read_size)
                    if not data:
                        # Orderly shutdown by the TCP peer.
                        break
                    if rns_link.status != RNS.Link.ACTIVE:
                        break
                    RNS.Packet(rns_link, data).send()
                    tunnel.bytes_sent += len(data)
            except socket.error:
                break
            except Exception as e:
                RNS.log(f"Forward error: {e}", RNS.LOG_DEBUG)
                break

        self._on_tunnel_closed(tunnel.tunnel_id)

    def _on_rns_data(self, tunnel_id: int, data: bytes):
        """Handle data received from an RNS link: write it to the TCP socket."""
        with self._tunnels_lock:
            tunnel = self._tunnels.get(tunnel_id)

        if tunnel and tunnel.active and tunnel.tcp_socket:
            try:
                tunnel.tcp_socket.sendall(data)
                tunnel.bytes_received += len(data)
            except Exception as e:
                RNS.log(f"TCP send error: {e}", RNS.LOG_DEBUG)
                self._on_tunnel_closed(tunnel_id)

    def _on_tunnel_closed(self, tunnel_id: int):
        """Handle tunnel closure.

        Safe to call more than once for the same id: only the call that
        actually removes the tunnel closes it and fires the closed callback.
        """
        with self._tunnels_lock:
            tunnel = self._tunnels.pop(tunnel_id, None)

        if tunnel:
            # Closed outside the lock; the link-closed callback re-enters
            # this method and must be able to take _tunnels_lock.
            tunnel.close()
            RNS.log(
                f"Tunnel {tunnel_id} closed "
                f"(sent: {tunnel.bytes_sent}, recv: {tunnel.bytes_received})",
                RNS.LOG_INFO,
            )
            if self._tunnel_closed_cb:
                self._tunnel_closed_cb(tunnel)

    def _on_incoming_link(self, link: RNS.Link):
        """Handle incoming RNS link from remote bridge.

        Connects to the local radicle-node and wires the link and the TCP
        socket together as a new tunnel. The link is torn down if the local
        node cannot be reached.
        """
        RNS.log("Incoming link from remote bridge", RNS.LOG_DEBUG)

        tcp_socket = None
        try:
            tcp_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            tcp_socket.connect((self.radicle_host, self.radicle_port))
        except Exception as e:
            RNS.log(f"Cannot connect to local radicle-node: {e}", RNS.LOG_ERROR)
            if tcp_socket is not None:
                # Don't leak the half-opened socket.
                tcp_socket.close()
            link.teardown()
            return

        tunnel = self._register_tunnel(
            tcp_socket,
            link,
            link.destination.hash if link.destination else None,
        )
        tunnel_id = tunnel.tunnel_id
        RNS.log(f"Incoming tunnel {tunnel_id} opened", RNS.LOG_INFO)

        link.set_packet_callback(
            lambda data, pkt: self._on_rns_data(tunnel_id, data)
        )
        link.set_link_closed_callback(
            lambda l: self._on_tunnel_closed(tunnel_id)
        )

        threading.Thread(
            target=self._forward_tcp_to_rns,
            args=(tunnel,),
            daemon=True,
        ).start()

    # ── Bridge discovery ─────────────────────────────────────────────────────

    def set_on_bridge_discovered(
        self, callback: Optional[Callable[[bytes, Optional[str]], None]]
    ):
        """Set callback for when a new bridge is discovered.

        Callback: (destination_hash: bytes, radicle_nid: Optional[str])
        """
        self._on_bridge_discovered = callback

    def received_announce(
        self,
        destination_hash: bytes,
        announced_identity: RNS.Identity,
        app_data: Optional[bytes],
    ):
        """Announce-handler entry point invoked by RNS.Transport.

        The parameter names are part of the handler contract, so they must
        stay as they are. Delegates to _handle_announce().
        """
        self._handle_announce(destination_hash, announced_identity, app_data)

    def _handle_announce(
        self,
        destination_hash: bytes,
        announced_identity: RNS.Identity,
        app_data: Optional[bytes],
    ):
        """Handle RNS announce — process only bridge announces.

        Records the announcing bridge and its NID (when present). For a
        newly seen bridge it fires the discovery callback and, if enabled,
        starts auto-connect; for a bridge whose NID was not known before it
        starts auto-seeding, if enabled.
        """
        RNS.log(
            f"Received announce: {destination_hash.hex()[:16]}... "
            f"app_data={app_data[:20] if app_data else None}",
            RNS.LOG_VERBOSE,
        )

        if destination_hash == self.destination.hash:
            RNS.log("Ignoring own announcement", RNS.LOG_VERBOSE)
            return

        if app_data is None or not app_data.startswith(BRIDGE_APP_DATA_MAGIC):
            RNS.log("Ignoring non-bridge announce (no magic)", RNS.LOG_VERBOSE)
            return

        # Optional payload after the magic: 16-bit big-endian length + NID.
        radicle_nid = None
        if len(app_data) > len(BRIDGE_APP_DATA_MAGIC):
            try:
                offset = len(BRIDGE_APP_DATA_MAGIC)
                nid_len = struct.unpack("!H", app_data[offset:offset + 2])[0]
                offset += 2
                # Tolerate a declared length longer than the data present.
                available = len(app_data) - offset
                nid_len = min(nid_len, available)
                nid_str = app_data[offset:offset + nid_len].decode("utf-8")
                radicle_nid = nid_str if nid_str else None
            except Exception as e:
                RNS.log(f"Failed to parse bridge app_data: {e}", RNS.LOG_DEBUG)

        with self._remote_bridges_lock:
            is_new = destination_hash not in self._remote_bridges
            self._remote_bridges[destination_hash] = time.time()
            nid_is_new = bool(
                radicle_nid and destination_hash not in self._bridge_nids
            )
            if radicle_nid:
                self._bridge_nids[destination_hash] = radicle_nid

        if is_new:
            nid_info = f" (NID: {radicle_nid[:32]}...)" if radicle_nid else ""
            RNS.log(
                f"Discovered bridge: {destination_hash.hex()}{nid_info}",
                RNS.LOG_INFO,
            )
            if self._on_bridge_discovered:
                self._on_bridge_discovered(destination_hash, radicle_nid)
            if self.auto_connect:
                threading.Thread(
                    target=self._auto_connect_to_bridge,
                    args=(destination_hash,),
                    daemon=True,
                ).start()

        if self.auto_seed and nid_is_new:
            threading.Thread(
                target=self._auto_register_seed,
                args=(radicle_nid, destination_hash),
                daemon=True,
            ).start()

    def _auto_connect_to_bridge(self, destination_hash: bytes):
        """Auto-connect to a discovered bridge in background."""
        RNS.log(f"Auto-connecting to bridge: {destination_hash.hex()}", RNS.LOG_INFO)
        if self.connect_to_bridge(destination_hash, timeout=30.0):
            RNS.log(f"Auto-connected to bridge: {destination_hash.hex()}", RNS.LOG_INFO)
        else:
            RNS.log(
                f"Auto-connect failed for bridge: {destination_hash.hex()}",
                RNS.LOG_WARNING,
            )

    def register_seed(self, radicle_nid: str, port: Optional[int] = None) -> bool:
        """Register a remote radicle NID as reachable through this bridge.

        Calls 'rad node connect <NID>@127.0.0.1:<port>' to tell radicle-node
        that the given NID is reachable through our bridge TCP server.

        Args:
            radicle_nid: NID of the remote radicle node.
            port: the bridge's per-NID listen port. Falls back to
                self.listen_port if not specified (backward-compatible for
                single-bridge setups).

        Returns:
            True if the rad command exited with status 0, False otherwise
            (including when rad is missing or times out).
        """
        if port is None:
            port = self.listen_port
        addr = f"{radicle_nid}@127.0.0.1:{port}"
        RNS.log(f"Registering seed: {addr}", RNS.LOG_INFO)

        # env=None makes the child inherit our environment unchanged.
        env = None
        if self.rad_home:
            env = os.environ.copy()
            env["RAD_HOME"] = str(self.rad_home)

        try:
            result = subprocess.run(
                ["rad", "node", "connect", addr],
                capture_output=True,
                text=True,
                timeout=30,
                env=env,
            )
        except FileNotFoundError:
            RNS.log("'rad' command not found — cannot auto-register seed", RNS.LOG_WARNING)
            return False
        except subprocess.TimeoutExpired:
            RNS.log("Timeout registering seed", RNS.LOG_WARNING)
            return False
        except Exception as e:
            RNS.log(f"Error registering seed: {e}", RNS.LOG_WARNING)
            return False

        if result.returncode == 0:
            RNS.log(f"Seed registered: {radicle_nid[:16]}...", RNS.LOG_INFO)
            return True
        RNS.log(f"Failed to register seed: {result.stderr}", RNS.LOG_WARNING)
        return False

    def _auto_register_seed(self, radicle_nid: str, bridge_hash: bytes):
        """Allocate a dedicated TCP port for this bridge and register its NID."""
        time.sleep(2.0)
        port = self._allocate_port_for_bridge(bridge_hash)
        self.register_seed(radicle_nid, port)

    def get_stats(self) -> dict:
        """Get bridge statistics.

        Returns:
            Dict with "active_tunnels", "known_bridges", "bytes_sent",
            "bytes_received" (both summed over currently active tunnels)
            and "rns_hash".
        """
        with self._tunnels_lock:
            active_tunnels = len(self._tunnels)
            total_sent = sum(t.bytes_sent for t in self._tunnels.values())
            total_recv = sum(t.bytes_received for t in self._tunnels.values())
        with self._remote_bridges_lock:
            known_bridges = len(self._remote_bridges)
        return {
            "active_tunnels": active_tunnels,
            "known_bridges": known_bridges,
            "bytes_sent": total_sent,
            "bytes_received": total_recv,
            "rns_hash": self.destination.hexhash,
        }