radicle-reticulum/src/radicle_reticulum/bridge.py

606 lines
22 KiB
Python

"""TCP-to-Reticulum bridge for Radicle.
Bridges radicle-node's TCP connections over Reticulum mesh network.
Allows using Radicle normally while syncing over LoRa, packet radio, etc.
Architecture:
┌─────────────┐ TCP ┌─────────────┐ Reticulum ┌─────────────┐
│ radicle-node│ ◄──────────► │ Bridge │ ◄───────────► │Remote Bridge│
└─────────────┘ localhost └─────────────┘ LoRa/mesh └─────────────┘
The bridge:
1. Listens on localhost TCP (default: 8776, Radicle's default port)
2. Accepts connections from local radicle-node
3. Tunnels traffic over Reticulum to remote bridges
4. Remote bridges forward to their local radicle-node
"""
import socket
import select
import struct
import subprocess
import threading
import time
from dataclasses import dataclass, field
from typing import Callable, Dict, List, Optional, Set, Tuple
import RNS
from radicle_reticulum.identity import RadicleIdentity
# Default ports
RADICLE_DEFAULT_PORT = 8776
BRIDGE_DEFAULT_PORT = 8777 # Local listen port for bridge
# App name for RNS destinations
APP_NAME = "radicle"
ASPECT_BRIDGE = "bridge"
# App data identifier for bridge announces (used for filtering)
BRIDGE_APP_DATA_MAGIC = b"RADICLE_BRIDGE_V1"
# Buffer sizes
TCP_BUFFER_SIZE = 65536
RNS_BUFFER_SIZE = 32768 # Smaller for RNS to avoid fragmentation
@dataclass
class TunnelConnection:
"""Represents a tunneled TCP connection."""
tunnel_id: int
tcp_socket: Optional[socket.socket]
rns_link: Optional[RNS.Link]
remote_destination: Optional[bytes]
created_at: float = field(default_factory=time.time)
bytes_sent: int = 0
bytes_received: int = 0
active: bool = True
def close(self):
"""Close the tunnel."""
self.active = False
if self.tcp_socket:
try:
self.tcp_socket.close()
except:
pass
if self.rns_link:
try:
self.rns_link.teardown()
except:
pass
class RadicleBridge:
"""Bridges Radicle TCP connections over Reticulum.
Modes:
- Server mode: Listens for TCP from local radicle-node, tunnels to remote bridges
- Accepts incoming tunnels from remote bridges, forwards to local radicle-node
"""
def __init__(
self,
identity: Optional[RadicleIdentity] = None,
listen_port: int = BRIDGE_DEFAULT_PORT,
radicle_host: str = "127.0.0.1",
radicle_port: int = RADICLE_DEFAULT_PORT,
config_path: Optional[str] = None,
auto_connect: bool = True,
auto_seed: bool = True,
announce_retry_delays: Tuple[int, ...] = (5, 15, 30),
):
"""Initialize the bridge.
Args:
identity: RNS identity for this bridge
listen_port: TCP port to listen on for local radicle-node
radicle_host: Host where radicle-node listens (for incoming tunnels)
radicle_port: Port where radicle-node listens
config_path: Path to Reticulum config
auto_connect: Automatically connect to discovered bridges
auto_seed: Automatically register discovered NIDs with radicle-node
announce_retry_delays: Seconds between startup re-announces. Use longer
intervals on LoRa to respect duty cycle limits, e.g. (60, 300, 900).
"""
self.listen_port = listen_port
self.radicle_host = radicle_host
self.radicle_port = radicle_port
self.auto_connect = auto_connect
self.auto_seed = auto_seed
self.announce_retry_delays = announce_retry_delays
# Initialize Reticulum
self.reticulum = RNS.Reticulum(config_path)
# Identity
if identity is None:
identity = RadicleIdentity.generate()
self.identity = identity
# Create RNS destination for incoming tunnels
self.destination = RNS.Destination(
self.identity.rns_identity,
RNS.Destination.IN,
RNS.Destination.SINGLE,
APP_NAME,
ASPECT_BRIDGE,
)
self.destination.set_link_established_callback(self._on_incoming_link)
# Known remote bridges: hash -> last_seen
self._remote_bridges: Dict[bytes, float] = {}
self._remote_bridges_lock = threading.Lock()
# Active tunnels
self._tunnels: Dict[int, TunnelConnection] = {}
self._tunnel_counter = 0
self._tunnels_lock = threading.Lock()
# TCP server
self._tcp_server: Optional[socket.socket] = None
self._running = False
# Callbacks
self._tunnel_opened_cb: Optional[Callable[[TunnelConnection], None]] = None
self._tunnel_closed_cb: Optional[Callable[[TunnelConnection], None]] = None
self._on_bridge_discovered: Optional[Callable[[bytes, Optional[str]], None]] = None
# Local radicle node NID (for announcing to remote bridges)
self._local_radicle_nid: Optional[str] = None
# Remote bridge NIDs: bridge_hash -> radicle_nid
self._bridge_nids: Dict[bytes, str] = {}
def start(self):
"""Start the bridge."""
self._running = True
# Register announce handler to discover other bridges
RNS.Transport.register_announce_handler(self._handle_announce)
RNS.log(f"Registered announce handler for bridge discovery", RNS.LOG_INFO)
# Start TCP server
self._tcp_server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._tcp_server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self._tcp_server.bind(("127.0.0.1", self.listen_port))
self._tcp_server.listen(5)
self._tcp_server.setblocking(False)
# Start accept thread
self._accept_thread = threading.Thread(target=self._accept_loop, daemon=True)
self._accept_thread.start()
# Announce presence; repeat a few times so peers that come up shortly
# after us don't miss it due to interface initialisation timing
self.announce()
threading.Thread(target=self._startup_announce_loop, daemon=True).start()
RNS.log(f"Radicle bridge started", RNS.LOG_INFO)
RNS.log(f" TCP listen: 127.0.0.1:{self.listen_port}", RNS.LOG_INFO)
RNS.log(f" RNS hash: {self.destination.hexhash}", RNS.LOG_INFO)
RNS.log(f" Radicle target: {self.radicle_host}:{self.radicle_port}", RNS.LOG_INFO)
def _startup_announce_loop(self):
"""Re-announce after startup to catch peers that come up slightly later.
Delays are configurable — use long values on LoRa to respect duty cycle.
RNS itself also rate-limits announces per interface.
"""
for delay in self.announce_retry_delays:
time.sleep(delay)
if not self._running:
return
self.announce()
def stop(self):
"""Stop the bridge."""
self._running = False
# Close all tunnels
with self._tunnels_lock:
for tunnel in list(self._tunnels.values()):
tunnel.close()
self._tunnels.clear()
# Close TCP server
if self._tcp_server:
self._tcp_server.close()
RNS.log("Radicle bridge stopped", RNS.LOG_INFO)
def set_local_radicle_nid(self, nid: str):
"""Set the local radicle node's NID for announcement.
This allows remote bridges to know which radicle NID is reachable
through this bridge.
"""
self._local_radicle_nid = nid
RNS.log(f"Local radicle NID set: {nid[:32]}...", RNS.LOG_INFO)
def get_remote_bridge_nid(self, bridge_hash: bytes) -> Optional[str]:
"""Get the radicle NID served by a remote bridge."""
return self._bridge_nids.get(bridge_hash)
def announce(self):
"""Announce this bridge on the network."""
# Build app_data: magic + optional NID
app_data = BRIDGE_APP_DATA_MAGIC
if self._local_radicle_nid:
nid_bytes = self._local_radicle_nid.encode("utf-8")
app_data += struct.pack("!H", len(nid_bytes)) + nid_bytes
self.destination.announce(app_data=app_data)
RNS.log(f"Announced bridge: {self.destination.hexhash} (app_data={len(app_data)} bytes)", RNS.LOG_INFO)
def connect_to_bridge(self, destination_hash: bytes, timeout: float = 30.0) -> bool:
"""Connect to a remote bridge.
This establishes the RNS link. Actual tunneling happens when
local radicle-node connects to our TCP port.
"""
# Check if we have a path
if not RNS.Transport.has_path(destination_hash):
RNS.Transport.request_path(destination_hash)
deadline = time.time() + timeout
while not RNS.Transport.has_path(destination_hash):
if time.time() > deadline:
RNS.log(f"Path timeout: {destination_hash.hex()}", RNS.LOG_WARNING)
return False
time.sleep(0.1)
# Remember this bridge
with self._remote_bridges_lock:
self._remote_bridges[destination_hash] = time.time()
RNS.log(f"Registered remote bridge: {destination_hash.hex()}", RNS.LOG_INFO)
return True
def get_remote_bridges(self) -> List[bytes]:
"""Get list of known remote bridge hashes."""
with self._remote_bridges_lock:
return list(self._remote_bridges.keys())
def _accept_loop(self):
"""Accept incoming TCP connections from local radicle-node."""
while self._running:
try:
readable, _, _ = select.select([self._tcp_server], [], [], 1.0)
if readable:
client_socket, addr = self._tcp_server.accept()
RNS.log(f"TCP connection from {addr}", RNS.LOG_DEBUG)
# Handle in new thread
thread = threading.Thread(
target=self._handle_local_connection,
args=(client_socket,),
daemon=True,
)
thread.start()
except Exception as e:
if self._running:
RNS.log(f"Accept error: {e}", RNS.LOG_ERROR)
def _handle_local_connection(self, tcp_socket: socket.socket):
"""Handle a connection from local radicle-node.
Creates tunnel to remote bridge and forwards traffic.
"""
# Get a remote bridge to tunnel to
remote_bridges = self.get_remote_bridges()
if not remote_bridges:
RNS.log("No remote bridges available", RNS.LOG_WARNING)
tcp_socket.close()
return
# Use first available bridge (could add load balancing later)
remote_hash = remote_bridges[0]
# Create RNS link to remote bridge
remote_identity = RNS.Identity.recall(remote_hash)
if not remote_identity:
RNS.log(f"Cannot recall identity for {remote_hash.hex()}", RNS.LOG_WARNING)
tcp_socket.close()
return
remote_dest = RNS.Destination(
remote_identity,
RNS.Destination.OUT,
RNS.Destination.SINGLE,
APP_NAME,
ASPECT_BRIDGE,
)
rns_link = RNS.Link(remote_dest)
# Wait for link establishment
deadline = time.time() + 30.0
while rns_link.status != RNS.Link.ACTIVE:
if rns_link.status == RNS.Link.CLOSED:
RNS.log("Link closed before becoming active (remote refused or unreachable)", RNS.LOG_WARNING)
tcp_socket.close()
return
if time.time() > deadline:
RNS.log("Link establishment timed out after 30s", RNS.LOG_WARNING)
tcp_socket.close()
return
time.sleep(0.1)
# Create tunnel
with self._tunnels_lock:
self._tunnel_counter += 1
tunnel_id = self._tunnel_counter
tunnel = TunnelConnection(
tunnel_id=tunnel_id,
tcp_socket=tcp_socket,
rns_link=rns_link,
remote_destination=remote_hash,
)
with self._tunnels_lock:
self._tunnels[tunnel_id] = tunnel
RNS.log(f"Tunnel {tunnel_id} opened to {remote_hash.hex()[:16]}", RNS.LOG_INFO)
if self._tunnel_opened_cb:
self._tunnel_opened_cb(tunnel)
# Set up bidirectional forwarding
rns_link.set_packet_callback(
lambda data, pkt: self._on_rns_data(tunnel_id, data)
)
rns_link.set_link_closed_callback(
lambda link: self._on_tunnel_closed(tunnel_id)
)
# Forward TCP to RNS
self._forward_tcp_to_rns(tunnel)
def _forward_tcp_to_rns(self, tunnel: TunnelConnection):
"""Forward data from TCP socket to RNS link."""
tcp_socket = tunnel.tcp_socket
rns_link = tunnel.rns_link
tcp_socket.setblocking(False)
while tunnel.active and self._running:
try:
readable, _, errored = select.select([tcp_socket], [], [tcp_socket], 1.0)
if errored:
break
if readable:
data = tcp_socket.recv(RNS_BUFFER_SIZE)
if not data:
break # Connection closed
# Send over RNS
if rns_link.status == RNS.Link.ACTIVE:
packet = RNS.Packet(rns_link, data)
packet.send()
tunnel.bytes_sent += len(data)
else:
break
except socket.error:
break
except Exception as e:
RNS.log(f"Forward error: {e}", RNS.LOG_DEBUG)
break
self._on_tunnel_closed(tunnel.tunnel_id)
def _on_rns_data(self, tunnel_id: int, data: bytes):
"""Handle data received from RNS link."""
with self._tunnels_lock:
tunnel = self._tunnels.get(tunnel_id)
if tunnel and tunnel.active and tunnel.tcp_socket:
try:
tunnel.tcp_socket.sendall(data)
tunnel.bytes_received += len(data)
except Exception as e:
RNS.log(f"TCP send error: {e}", RNS.LOG_DEBUG)
self._on_tunnel_closed(tunnel_id)
def _on_tunnel_closed(self, tunnel_id: int):
"""Handle tunnel closure."""
with self._tunnels_lock:
tunnel = self._tunnels.pop(tunnel_id, None)
if tunnel:
tunnel.close()
RNS.log(
f"Tunnel {tunnel_id} closed (sent: {tunnel.bytes_sent}, recv: {tunnel.bytes_received})",
RNS.LOG_INFO
)
if self._tunnel_closed_cb:
self._tunnel_closed_cb(tunnel)
def _on_incoming_link(self, link: RNS.Link):
"""Handle incoming RNS link from remote bridge."""
RNS.log(f"Incoming link from remote bridge", RNS.LOG_DEBUG)
# Connect to local radicle-node
try:
tcp_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
tcp_socket.connect((self.radicle_host, self.radicle_port))
except Exception as e:
RNS.log(f"Cannot connect to local radicle-node: {e}", RNS.LOG_ERROR)
link.teardown()
return
# Create tunnel
with self._tunnels_lock:
self._tunnel_counter += 1
tunnel_id = self._tunnel_counter
tunnel = TunnelConnection(
tunnel_id=tunnel_id,
tcp_socket=tcp_socket,
rns_link=link,
remote_destination=link.destination.hash if link.destination else None,
)
with self._tunnels_lock:
self._tunnels[tunnel_id] = tunnel
RNS.log(f"Incoming tunnel {tunnel_id} opened", RNS.LOG_INFO)
# Set up bidirectional forwarding
link.set_packet_callback(
lambda data, pkt: self._on_rns_data(tunnel_id, data)
)
link.set_link_closed_callback(
lambda l: self._on_tunnel_closed(tunnel_id)
)
# Forward TCP to RNS in background
thread = threading.Thread(
target=self._forward_tcp_to_rns,
args=(tunnel,),
daemon=True,
)
thread.start()
def set_on_bridge_discovered(self, callback: Optional[Callable[[bytes, Optional[str]], None]]):
"""Set callback for when a new bridge is discovered.
Callback receives (destination_hash, radicle_nid) where radicle_nid
may be None if the remote bridge hasn't set one.
"""
self._on_bridge_discovered = callback
def _handle_announce(
self,
destination_hash: bytes,
announced_identity: RNS.Identity,
app_data: Optional[bytes],
):
"""Handle announce - only process if it's a bridge announce."""
RNS.log(f"Received announce: {destination_hash.hex()[:16]}... app_data={app_data[:20] if app_data else None}", RNS.LOG_VERBOSE)
# Ignore our own announcements
if destination_hash == self.destination.hash:
RNS.log("Ignoring own announcement", RNS.LOG_VERBOSE)
return
# Filter: only accept announces with bridge magic in app_data
if app_data is None or not app_data.startswith(BRIDGE_APP_DATA_MAGIC):
# Not a bridge announce, ignore
RNS.log(f"Ignoring non-bridge announce (no magic)", RNS.LOG_VERBOSE)
return
# Extract radicle NID if present
radicle_nid = None
if len(app_data) > len(BRIDGE_APP_DATA_MAGIC):
try:
offset = len(BRIDGE_APP_DATA_MAGIC)
nid_len = struct.unpack("!H", app_data[offset:offset+2])[0]
offset += 2
nid_str = app_data[offset:offset+nid_len].decode("utf-8")
radicle_nid = nid_str if nid_str else None
except Exception as e:
RNS.log(f"Failed to parse bridge app_data: {e}", RNS.LOG_DEBUG)
with self._remote_bridges_lock:
is_new = destination_hash not in self._remote_bridges
self._remote_bridges[destination_hash] = time.time()
# Check and update NID mapping under the same lock to avoid double-registering
nid_is_new = bool(radicle_nid and destination_hash not in self._bridge_nids)
if radicle_nid:
self._bridge_nids[destination_hash] = radicle_nid
if is_new:
nid_info = f" (NID: {radicle_nid[:32]}...)" if radicle_nid else ""
RNS.log(f"Discovered bridge: {destination_hash.hex()}{nid_info}", RNS.LOG_INFO)
# Notify callback
if self._on_bridge_discovered:
self._on_bridge_discovered(destination_hash, radicle_nid)
# Auto-connect if enabled
if self.auto_connect:
threading.Thread(
target=self._auto_connect_to_bridge,
args=(destination_hash,),
daemon=True,
).start()
# Auto-register seed whenever we first learn the NID (covers --connect path
# where the bridge is pre-registered before its announce arrives)
if self.auto_seed and nid_is_new:
threading.Thread(
target=self._auto_register_seed,
args=(radicle_nid,),
daemon=True,
).start()
def _auto_connect_to_bridge(self, destination_hash: bytes):
"""Auto-connect to a discovered bridge in background."""
RNS.log(f"Auto-connecting to bridge: {destination_hash.hex()}", RNS.LOG_INFO)
if self.connect_to_bridge(destination_hash, timeout=30.0):
RNS.log(f"Auto-connected to bridge: {destination_hash.hex()}", RNS.LOG_INFO)
else:
RNS.log(f"Auto-connect failed for bridge: {destination_hash.hex()}", RNS.LOG_WARNING)
def register_seed(self, radicle_nid: str) -> bool:
"""Register a remote radicle NID as a seed through this bridge.
Calls 'rad node connect <NID>@127.0.0.1:<listen_port>' to tell
radicle-node that the given NID is reachable through our bridge.
Returns True if successful.
"""
addr = f"{radicle_nid}@127.0.0.1:{self.listen_port}"
RNS.log(f"Registering seed: {addr}", RNS.LOG_INFO)
try:
result = subprocess.run(
["rad", "node", "connect", addr],
capture_output=True,
text=True,
timeout=30,
)
if result.returncode == 0:
RNS.log(f"Seed registered: {radicle_nid[:16]}...", RNS.LOG_INFO)
return True
else:
RNS.log(f"Failed to register seed: {result.stderr}", RNS.LOG_WARNING)
return False
except FileNotFoundError:
RNS.log("'rad' command not found - cannot auto-register seed", RNS.LOG_WARNING)
return False
except subprocess.TimeoutExpired:
RNS.log("Timeout registering seed", RNS.LOG_WARNING)
return False
except Exception as e:
RNS.log(f"Error registering seed: {e}", RNS.LOG_WARNING)
return False
def _auto_register_seed(self, radicle_nid: str):
"""Auto-register a seed in background after discovery."""
# Small delay to allow bridge connection to establish first
time.sleep(2.0)
self.register_seed(radicle_nid)
def get_stats(self) -> dict:
"""Get bridge statistics."""
with self._tunnels_lock:
active_tunnels = len(self._tunnels)
total_sent = sum(t.bytes_sent for t in self._tunnels.values())
total_recv = sum(t.bytes_received for t in self._tunnels.values())
with self._remote_bridges_lock:
known_bridges = len(self._remote_bridges)
return {
"active_tunnels": active_tunnels,
"known_bridges": known_bridges,
"bytes_sent": total_sent,
"bytes_received": total_recv,
"rns_hash": self.destination.hexhash,
}