fix: reconnect after tunnel drop — path maintenance + link re-establishment

- _path_maintenance_loop: runs every 60s, re-requests stale paths to all
  known bridges so radicle-node retries are fast after a LoRa glitch
- _reconnect_link: attempts to re-establish an RNS link after it drops
  mid-transfer; splits 20s timeout between path recovery and handshake
- _forward_tcp_to_rns: on link CLOSED/FAILED, tries _reconnect_link once
  before closing the TCP socket — preserves the TCP connection on brief
  glitches, re-registers packet/close callbacks on the new link
- _stop_event wakes the maintenance loop immediately on stop()

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Maciek "mab122" Bator 2026-04-23 13:31:42 +02:00
parent 05dc078f31
commit 4d3fdcf5f9
1 changed files with 97 additions and 1 deletions

View File

@ -162,6 +162,8 @@ class RadicleBridge:
# Remote bridge NIDs: bridge_hash -> radicle_nid (guarded by _remote_bridges_lock) # Remote bridge NIDs: bridge_hash -> radicle_nid (guarded by _remote_bridges_lock)
self._bridge_nids: Dict[bytes, str] = {} self._bridge_nids: Dict[bytes, str] = {}
self._stop_event = threading.Event()
def start(self): def start(self):
"""Start the bridge.""" """Start the bridge."""
self._running = True self._running = True
@ -174,6 +176,7 @@ class RadicleBridge:
# after us don't miss it due to interface initialisation timing # after us don't miss it due to interface initialisation timing
self.announce() self.announce()
threading.Thread(target=self._startup_announce_loop, daemon=True).start() threading.Thread(target=self._startup_announce_loop, daemon=True).start()
threading.Thread(target=self._path_maintenance_loop, daemon=True).start()
RNS.log("Radicle bridge started", RNS.LOG_INFO) RNS.log("Radicle bridge started", RNS.LOG_INFO)
RNS.log(f" RNS hash: {self.destination.hexhash}", RNS.LOG_INFO) RNS.log(f" RNS hash: {self.destination.hexhash}", RNS.LOG_INFO)
@ -191,9 +194,75 @@ class RadicleBridge:
return return
self.announce() self.announce()
def _path_maintenance_loop(self):
    """Background loop that refreshes missing paths to known remote bridges.

    Roughly once a minute, every bridge hash currently in
    ``self._remote_bridges`` is checked against the RNS path table, and a
    path request is issued for each one that has no path. The intent is
    to keep the path table warm so the next radicle-node connection
    attempt finds a path immediately instead of waiting 15-30s for
    re-discovery.

    The loop ends once ``self._running`` is false; setting
    ``self._stop_event`` cuts the current sleep short.
    """
    while self._running:
        # Sleep for up to 60s; an early wake-up means the event was set.
        self._stop_event.wait(timeout=60)
        self._stop_event.clear()
        if not self._running:
            break

        # Snapshot the keys under the lock so the RNS calls below run
        # without holding it.
        with self._remote_bridges_lock:
            known_hashes = list(self._remote_bridges.keys())

        for bridge_hash in known_hashes:
            if RNS.Transport.has_path(bridge_hash):
                continue
            RNS.log(
                f"Path stale for {bridge_hash.hex()[:16]}, requesting refresh",
                RNS.LOG_INFO,
            )
            RNS.Transport.request_path(bridge_hash)
def _reconnect_link(
    self, bridge_hash: bytes, timeout: float = 20.0
) -> Optional[RNS.Link]:
    """Re-establish an RNS link to a known bridge after a drop.

    The timeout budget is split evenly: up to half is spent waiting for
    a path to reappear, and up to half waiting for the link handshake.

    Args:
        bridge_hash: Destination hash of the remote bridge.
        timeout: Total time budget in seconds for path recovery plus
            link establishment.

    Returns:
        A link whose status is ``RNS.Link.ACTIVE`` on success. ``None``
        if no path appeared in time, the remote identity could not be
        recalled, the link reached a terminal state, or the handshake
        timed out.
    """
    half = timeout / 2

    # Deadlines use the monotonic clock so wall-clock adjustments (NTP,
    # manual changes) cannot stretch or truncate the wait.
    if not RNS.Transport.has_path(bridge_hash):
        RNS.Transport.request_path(bridge_hash)
        deadline = time.monotonic() + half
        while not RNS.Transport.has_path(bridge_hash):
            if time.monotonic() > deadline:
                RNS.log(
                    f"Reconnect: no path to {bridge_hash.hex()[:16]} after {half:.0f}s",
                    RNS.LOG_WARNING,
                )
                return None
            time.sleep(0.5)

    remote_identity = RNS.Identity.recall(bridge_hash)
    if remote_identity is None:
        return None

    remote_dest = RNS.Destination(
        remote_identity,
        RNS.Destination.OUT,
        RNS.Destination.SINGLE,
        APP_NAME,
        ASPECT_BRIDGE,
    )

    # Terminal link states. CLOSED is always included; FAILED is looked up
    # defensively so a missing attribute cannot raise AttributeError here.
    # NOTE(review): upstream Reticulum's Link appears to define only
    # PENDING/HANDSHAKE/ACTIVE/STALE/CLOSED - confirm against the
    # installed RNS version.
    dead_states = {RNS.Link.CLOSED}
    failed_state = getattr(RNS.Link, "FAILED", None)
    if failed_state is not None:
        dead_states.add(failed_state)

    link = RNS.Link(remote_dest)
    deadline = time.monotonic() + half
    while link.status != RNS.Link.ACTIVE:
        if link.status in dead_states:
            return None
        if time.monotonic() > deadline:
            RNS.log(
                f"Reconnect: link to {bridge_hash.hex()[:16]} not active after {half:.0f}s",
                RNS.LOG_WARNING,
            )
            # Tear down the pending link so it is not left behind to
            # finish its handshake later with nobody using it.
            link.teardown()
            return None
        time.sleep(0.1)
    return link
def stop(self): def stop(self):
"""Stop the bridge.""" """Stop the bridge."""
self._running = False self._running = False
self._stop_event.set()
# Close all tunnels # Close all tunnels
with self._tunnels_lock: with self._tunnels_lock:
@ -394,11 +463,11 @@ class RadicleBridge:
def _forward_tcp_to_rns(self, tunnel: TunnelConnection): def _forward_tcp_to_rns(self, tunnel: TunnelConnection):
"""Forward data from TCP socket to RNS link.""" """Forward data from TCP socket to RNS link."""
tcp_socket = tunnel.tcp_socket tcp_socket = tunnel.tcp_socket
rns_link = tunnel.rns_link
tcp_socket.setblocking(False) tcp_socket.setblocking(False)
while tunnel.active and self._running: while tunnel.active and self._running:
try: try:
rns_link = tunnel.rns_link # read each iteration: may be updated by reconnect
readable, _, errored = select.select([tcp_socket], [], [tcp_socket], 1.0) readable, _, errored = select.select([tcp_socket], [], [tcp_socket], 1.0)
if errored: if errored:
@ -413,6 +482,33 @@ class RadicleBridge:
packet = RNS.Packet(rns_link, data) packet = RNS.Packet(rns_link, data)
packet.send() packet.send()
tunnel.bytes_sent += len(data) tunnel.bytes_sent += len(data)
elif tunnel.remote_destination:
RNS.log(
f"Tunnel {tunnel.tunnel_id}: link dropped, reconnecting...",
RNS.LOG_WARNING,
)
new_link = self._reconnect_link(tunnel.remote_destination)
if new_link is None:
RNS.log(
f"Tunnel {tunnel.tunnel_id}: reconnect failed",
RNS.LOG_WARNING,
)
break
RNS.log(
f"Tunnel {tunnel.tunnel_id}: reconnected",
RNS.LOG_INFO,
)
tunnel.rns_link = new_link
tid = tunnel.tunnel_id
new_link.set_packet_callback(
lambda d, p: self._on_rns_data(tid, d)
)
new_link.set_link_closed_callback(
lambda l: self._on_tunnel_closed(tid)
)
packet = RNS.Packet(new_link, data)
packet.send()
tunnel.bytes_sent += len(data)
else: else:
break break