diff --git a/src/radicle_reticulum/bridge.py b/src/radicle_reticulum/bridge.py index bb61a12..bf127ef 100644 --- a/src/radicle_reticulum/bridge.py +++ b/src/radicle_reticulum/bridge.py @@ -45,7 +45,10 @@ BRIDGE_APP_DATA_MAGIC = b"RADICLE_BRIDGE_V1" # Buffer sizes TCP_BUFFER_SIZE = 65536 -RNS_BUFFER_SIZE = 32768 # Smaller for RNS to avoid fragmentation +# Read TCP in large chunks for efficiency; outbound data is chunked to +# RNS.Packet.ENCRYPTED_MDU (383 B) before sending so no single RNS.Packet +# exceeds the interface MTU on LoRa or any other constrained link. +TCP_READ_SIZE = 32768 @dataclass @@ -526,13 +529,12 @@ class RadicleBridge: break if readable: - data = tcp_socket.recv(RNS_BUFFER_SIZE) + data = tcp_socket.recv(TCP_READ_SIZE) if not data: break if rns_link.status == RNS.Link.ACTIVE: - packet = RNS.Packet(rns_link, data) - packet.send() + self._send_over_link(rns_link, data) tunnel.bytes_sent += len(data) elif tunnel.remote_destination: RNS.log( @@ -558,8 +560,7 @@ class RadicleBridge: new_link.set_link_closed_callback( lambda l: self._on_tunnel_closed(tid) ) - packet = RNS.Packet(new_link, data) - packet.send() + self._send_over_link(new_link, data) tunnel.bytes_sent += len(data) else: break @@ -572,6 +573,19 @@ class RadicleBridge: self._on_tunnel_closed(tunnel.tunnel_id) + @staticmethod + def _send_over_link(link: RNS.Link, data: bytes): + """Send data over an RNS link, chunking to ENCRYPTED_MDU if needed. + + RNS.Packet raises IOError if data exceeds the interface MTU (~383 B on + LoRa). TCP chunks can be tens of kilobytes, so we split into MDU-sized + pieces. Order is preserved — the link is point-to-point and packets + from a single sender are delivered in order. + """ + mdu = RNS.Packet.ENCRYPTED_MDU + for offset in range(0, len(data), mdu): + RNS.Packet(link, data[offset:offset + mdu]).send() + def _on_rns_data(self, tunnel_id: int, data: bytes): """Handle data received from RNS link.""" with self._tunnels_lock: diff --git a/src/radicle_reticulum/gossip.py b/src/radicle_reticulum/gossip.py index b34d5b2..003b3bb 100644 --- a/src/radicle_reticulum/gossip.py +++ b/src/radicle_reticulum/gossip.py @@ -305,13 +305,8 @@ class GossipRelay: with self._refs_lock: refs = self._known_refs.get(rid) if refs: - payload = json.dumps({ - "type": "refs", - "rid": rid, - "nid": self.radicle_nid or "", - "refs": refs, - }).encode() - self._send_packet(destination_hash, payload) + for payload in self._build_ref_payloads(rid, refs, is_delta=False): + self._send_packet(destination_hash, payload) def _broadcast( self, @@ -320,26 +315,65 @@ class GossipRelay: old_refs: Optional[Dict[str, str]] = None, ): if old_refs is not None: - # Delta mode: only send refs that changed, saving bandwidth on LoRa to_send = {k: v for k, v in refs.items() if v != old_refs.get(k)} else: to_send = refs - msg: Dict = {"type": "refs", "rid": rid, "nid": self.radicle_nid or "", "refs": to_send} - if old_refs is not None: - msg["delta"] = True - payload = json.dumps(msg).encode() + is_delta = old_refs is not None + payloads = self._build_ref_payloads(rid, to_send, is_delta) with self._peers_lock: peers = list(self._known_peers.keys()) - sent = sum(1 for h in peers if self._send_packet(h, payload)) + sent = sum( + 1 for h in peers + if all(self._send_packet(h, p) for p in payloads) + ) + total_bytes = sum(len(p) for p in payloads) RNS.log( f"Broadcast refs for {rid[:20]}... → {sent}/{len(peers)} peers " - f"({'delta' if old_refs is not None else 'full'}, {len(payload)}B)", + f"({'delta' if is_delta else 'full'}, {total_bytes}B, {len(payloads)} pkt(s))", RNS.LOG_INFO, ) + def _build_ref_payloads( + self, rid: str, refs: Dict[str, str], is_delta: bool + ) -> List[bytes]: + """Split refs into RNS.Packet.ENCRYPTED_MDU-sized JSON payloads. + + LoRa interfaces cap at ~383 bytes per encrypted packet. A single ref + entry is ~70 bytes in JSON, so a repo with many refs needs multiple + packets. The receiver handles each packet independently — each one + triggers a changed-ref check and potential sync. + """ + mdu = RNS.Packet.ENCRYPTED_MDU + base: Dict = {"type": "refs", "rid": rid, "nid": self.radicle_nid or ""} + if is_delta: + base["delta"] = True + + # Fast path: everything fits in one packet + candidate = {**base, "refs": refs} + payload = json.dumps(candidate).encode() + if len(payload) <= mdu: + return [payload] + + # Split: add refs one-by-one until the packet would overflow, then flush + payloads: List[bytes] = [] + chunk: Dict[str, str] = {} + for ref_name, sha in refs.items(): + chunk[ref_name] = sha + test = json.dumps({**base, "refs": chunk}).encode() + if len(test) > mdu: + # Flush without this ref, then start new chunk with it + chunk.pop(ref_name) + if chunk: + payloads.append(json.dumps({**base, "refs": chunk}).encode()) + chunk = {ref_name: sha} + if chunk: + payloads.append(json.dumps({**base, "refs": chunk}).encode()) + + return payloads or [json.dumps({**base, "refs": {}}).encode()] + def _send_packet(self, peer_hash: bytes, payload: bytes) -> bool: try: if not RNS.Transport.has_path(peer_hash): diff --git a/tests/test_gossip.py b/tests/test_gossip.py index 0780932..f7d8359 100644 --- a/tests/test_gossip.py +++ b/tests/test_gossip.py @@ -361,6 +361,49 @@ class TestBroadcastDelta: assert sent[0]["refs"] == SAMPLE_REFS +# ── Packet splitting (_build_ref_payloads) ─────────────────────────────────── + +class TestBuildRefPayloads: + def test_small_refs_fit_in_one_packet(self, tmp_path): + relay = _make_relay(tmp_path) + refs = {"refs/heads/main": "a" * 40} + payloads = relay._build_ref_payloads("rad:z3abc123", refs, is_delta=False) + assert len(payloads) == 1 + msg = json.loads(payloads[0]) + assert msg["refs"] == refs + + def test_large_refs_split_into_multiple_packets(self, tmp_path): + relay = _make_relay(tmp_path) + # 20 refs × ~70 bytes each ≈ 1400 bytes, well over ENCRYPTED_MDU=383 + many_refs = {f"refs/heads/branch-{i:03d}": "a" * 40 for i in range(20)} + payloads = relay._build_ref_payloads("rad:z3abc123", many_refs, is_delta=False) + assert len(payloads) > 1 + # All packets must fit within MDU + import RNS + assert all(len(p) <= RNS.Packet.ENCRYPTED_MDU for p in payloads) + # Combined refs must cover every original ref exactly once + combined = {} + for p in payloads: + combined.update(json.loads(p)["refs"]) + assert combined == many_refs + + def test_delta_flag_propagated_to_all_chunks(self, tmp_path): + relay = _make_relay(tmp_path) + many_refs = {f"refs/heads/br-{i}": "b" * 40 for i in range(20)} + payloads = relay._build_ref_payloads("rad:z3abc123", many_refs, is_delta=True) + assert all(json.loads(p).get("delta") is True for p in payloads) + + def test_each_packet_under_mdu(self, tmp_path): + """Single very-long ref name must not produce an oversized packet.""" + import RNS + relay = _make_relay(tmp_path) + # pathological: ref name itself is 200 chars + refs = {"refs/heads/" + "x" * 200: "c" * 40} + payloads = relay._build_ref_payloads("rad:z3abc123", refs, is_delta=False) + # Even if we can't split a single ref below MDU, we still emit it + assert len(payloads) >= 1 + + # ── Auto-seed ───────────────────────────────────────────────────────────────── class TestAutoSeed: diff --git a/tests/test_integration.py b/tests/test_integration.py index f7aeb17..6e59142 100644 --- a/tests/test_integration.py +++ b/tests/test_integration.py @@ -315,6 +315,7 @@ class TestTCPTunnelIntegration: patch("radicle_reticulum.bridge.RNS.Packet") as mock_pkt_cls, \ patch("radicle_reticulum.bridge.RNS.log"): mock_pkt_cls.side_effect = lambda lnk, data: sent_data.append(data) or MagicMock() + mock_pkt_cls.ENCRYPTED_MDU = 383 # must be set so chunking uses the real value from radicle_reticulum.bridge import TunnelConnection tunnel = TunnelConnection( @@ -332,8 +333,9 @@ class TestTCPTunnelIntegration: bridge._forward_tcp_to_rns(tunnel) local.close() - assert any(payload in d for d in sent_data), \ - f"Expected {payload!r} in forwarded data, got {sent_data}" + # Payload fits in one MDU chunk, so it should arrive as a single packet + assert sent_data == [payload], \ + f"Expected [{payload!r}], got {sent_data}" def test_rns_data_forwarded_to_tcp_socket(self): """Data received from RNS should be written to the TCP socket."""