fix: chunk RNS packets to ENCRYPTED_MDU (383 B) — unblocks real LoRa use
bridge.py: - _send_over_link: splits TCP data into RNS.Packet.ENCRYPTED_MDU-sized chunks before sending. RNS.Packet.pack() raises IOError on oversized data; a 32 KB TCP read would silently kill the tunnel on any LoRa or constrained interface. Order is safe — link is point-to-point, single sender per tunnel. - Renamed RNS_BUFFER_SIZE → TCP_READ_SIZE (reads stay large for TCP efficiency; only outbound RNS direction is chunked). gossip.py: - _build_ref_payloads: packs refs into JSON payloads that each fit within ENCRYPTED_MDU. For >5 refs (>383 B), produces multiple packets. The receiver handles each independently — each triggers a change check and potential sync. - _broadcast and _send_initial_refs now use _build_ref_payloads instead of building a single possibly-oversized payload. tests: - test_integration: set mock_pkt_cls.ENCRYPTED_MDU=383 so chunk size is correct under patch; assert single-packet delivery for small payloads - test_gossip: TestBuildRefPayloads — small fits in 1 packet, 20 refs split across multiple packets all ≤ MDU, delta flag propagated to all chunks Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
959eed17d2
commit
d7b124e830
|
|
@ -45,7 +45,10 @@ BRIDGE_APP_DATA_MAGIC = b"RADICLE_BRIDGE_V1"
|
||||||
|
|
||||||
# Buffer sizes
|
# Buffer sizes
|
||||||
TCP_BUFFER_SIZE = 65536
|
TCP_BUFFER_SIZE = 65536
|
||||||
RNS_BUFFER_SIZE = 32768 # Smaller for RNS to avoid fragmentation
|
# Read TCP in large chunks for efficiency; outbound data is chunked to
|
||||||
|
# RNS.Packet.ENCRYPTED_MDU (383 B) before sending so no single RNS.Packet
|
||||||
|
# exceeds the interface MTU on LoRa or any other constrained link.
|
||||||
|
TCP_READ_SIZE = 32768
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
|
|
@ -526,13 +529,12 @@ class RadicleBridge:
|
||||||
break
|
break
|
||||||
|
|
||||||
if readable:
|
if readable:
|
||||||
data = tcp_socket.recv(RNS_BUFFER_SIZE)
|
data = tcp_socket.recv(TCP_READ_SIZE)
|
||||||
if not data:
|
if not data:
|
||||||
break
|
break
|
||||||
|
|
||||||
if rns_link.status == RNS.Link.ACTIVE:
|
if rns_link.status == RNS.Link.ACTIVE:
|
||||||
packet = RNS.Packet(rns_link, data)
|
self._send_over_link(rns_link, data)
|
||||||
packet.send()
|
|
||||||
tunnel.bytes_sent += len(data)
|
tunnel.bytes_sent += len(data)
|
||||||
elif tunnel.remote_destination:
|
elif tunnel.remote_destination:
|
||||||
RNS.log(
|
RNS.log(
|
||||||
|
|
@ -558,8 +560,7 @@ class RadicleBridge:
|
||||||
new_link.set_link_closed_callback(
|
new_link.set_link_closed_callback(
|
||||||
lambda l: self._on_tunnel_closed(tid)
|
lambda l: self._on_tunnel_closed(tid)
|
||||||
)
|
)
|
||||||
packet = RNS.Packet(new_link, data)
|
self._send_over_link(new_link, data)
|
||||||
packet.send()
|
|
||||||
tunnel.bytes_sent += len(data)
|
tunnel.bytes_sent += len(data)
|
||||||
else:
|
else:
|
||||||
break
|
break
|
||||||
|
|
@ -572,6 +573,19 @@ class RadicleBridge:
|
||||||
|
|
||||||
self._on_tunnel_closed(tunnel.tunnel_id)
|
self._on_tunnel_closed(tunnel.tunnel_id)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _send_over_link(link: RNS.Link, data: bytes):
|
||||||
|
"""Send data over an RNS link, chunking to ENCRYPTED_MDU if needed.
|
||||||
|
|
||||||
|
RNS.Packet raises IOError if data exceeds the interface MTU (~383 B on
|
||||||
|
LoRa). TCP chunks can be tens of kilobytes, so we split into MDU-sized
|
||||||
|
pieces. Order is preserved — the link is point-to-point and packets
|
||||||
|
from a single sender are delivered in order.
|
||||||
|
"""
|
||||||
|
mdu = RNS.Packet.ENCRYPTED_MDU
|
||||||
|
for offset in range(0, len(data), mdu):
|
||||||
|
RNS.Packet(link, data[offset:offset + mdu]).send()
|
||||||
|
|
||||||
def _on_rns_data(self, tunnel_id: int, data: bytes):
|
def _on_rns_data(self, tunnel_id: int, data: bytes):
|
||||||
"""Handle data received from RNS link."""
|
"""Handle data received from RNS link."""
|
||||||
with self._tunnels_lock:
|
with self._tunnels_lock:
|
||||||
|
|
|
||||||
|
|
@ -305,13 +305,8 @@ class GossipRelay:
|
||||||
with self._refs_lock:
|
with self._refs_lock:
|
||||||
refs = self._known_refs.get(rid)
|
refs = self._known_refs.get(rid)
|
||||||
if refs:
|
if refs:
|
||||||
payload = json.dumps({
|
for payload in self._build_ref_payloads(rid, refs, is_delta=False):
|
||||||
"type": "refs",
|
self._send_packet(destination_hash, payload)
|
||||||
"rid": rid,
|
|
||||||
"nid": self.radicle_nid or "",
|
|
||||||
"refs": refs,
|
|
||||||
}).encode()
|
|
||||||
self._send_packet(destination_hash, payload)
|
|
||||||
|
|
||||||
def _broadcast(
|
def _broadcast(
|
||||||
self,
|
self,
|
||||||
|
|
@ -320,26 +315,65 @@ class GossipRelay:
|
||||||
old_refs: Optional[Dict[str, str]] = None,
|
old_refs: Optional[Dict[str, str]] = None,
|
||||||
):
|
):
|
||||||
if old_refs is not None:
|
if old_refs is not None:
|
||||||
# Delta mode: only send refs that changed, saving bandwidth on LoRa
|
|
||||||
to_send = {k: v for k, v in refs.items() if v != old_refs.get(k)}
|
to_send = {k: v for k, v in refs.items() if v != old_refs.get(k)}
|
||||||
else:
|
else:
|
||||||
to_send = refs
|
to_send = refs
|
||||||
|
|
||||||
msg: Dict = {"type": "refs", "rid": rid, "nid": self.radicle_nid or "", "refs": to_send}
|
is_delta = old_refs is not None
|
||||||
if old_refs is not None:
|
payloads = self._build_ref_payloads(rid, to_send, is_delta)
|
||||||
msg["delta"] = True
|
|
||||||
payload = json.dumps(msg).encode()
|
|
||||||
|
|
||||||
with self._peers_lock:
|
with self._peers_lock:
|
||||||
peers = list(self._known_peers.keys())
|
peers = list(self._known_peers.keys())
|
||||||
|
|
||||||
sent = sum(1 for h in peers if self._send_packet(h, payload))
|
sent = sum(
|
||||||
|
1 for h in peers
|
||||||
|
if all(self._send_packet(h, p) for p in payloads)
|
||||||
|
)
|
||||||
|
total_bytes = sum(len(p) for p in payloads)
|
||||||
RNS.log(
|
RNS.log(
|
||||||
f"Broadcast refs for {rid[:20]}... → {sent}/{len(peers)} peers "
|
f"Broadcast refs for {rid[:20]}... → {sent}/{len(peers)} peers "
|
||||||
f"({'delta' if old_refs is not None else 'full'}, {len(payload)}B)",
|
f"({'delta' if is_delta else 'full'}, {total_bytes}B, {len(payloads)} pkt(s))",
|
||||||
RNS.LOG_INFO,
|
RNS.LOG_INFO,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def _build_ref_payloads(
|
||||||
|
self, rid: str, refs: Dict[str, str], is_delta: bool
|
||||||
|
) -> List[bytes]:
|
||||||
|
"""Split refs into RNS.Packet.ENCRYPTED_MDU-sized JSON payloads.
|
||||||
|
|
||||||
|
LoRa interfaces cap at ~383 bytes per encrypted packet. A single ref
|
||||||
|
entry is ~70 bytes in JSON, so a repo with many refs needs multiple
|
||||||
|
packets. The receiver handles each packet independently — each one
|
||||||
|
triggers a changed-ref check and potential sync.
|
||||||
|
"""
|
||||||
|
mdu = RNS.Packet.ENCRYPTED_MDU
|
||||||
|
base: Dict = {"type": "refs", "rid": rid, "nid": self.radicle_nid or ""}
|
||||||
|
if is_delta:
|
||||||
|
base["delta"] = True
|
||||||
|
|
||||||
|
# Fast path: everything fits in one packet
|
||||||
|
candidate = {**base, "refs": refs}
|
||||||
|
payload = json.dumps(candidate).encode()
|
||||||
|
if len(payload) <= mdu:
|
||||||
|
return [payload]
|
||||||
|
|
||||||
|
# Split: add refs one-by-one until the packet would overflow, then flush
|
||||||
|
payloads: List[bytes] = []
|
||||||
|
chunk: Dict[str, str] = {}
|
||||||
|
for ref_name, sha in refs.items():
|
||||||
|
chunk[ref_name] = sha
|
||||||
|
test = json.dumps({**base, "refs": chunk}).encode()
|
||||||
|
if len(test) > mdu:
|
||||||
|
# Flush without this ref, then start new chunk with it
|
||||||
|
chunk.pop(ref_name)
|
||||||
|
if chunk:
|
||||||
|
payloads.append(json.dumps({**base, "refs": chunk}).encode())
|
||||||
|
chunk = {ref_name: sha}
|
||||||
|
if chunk:
|
||||||
|
payloads.append(json.dumps({**base, "refs": chunk}).encode())
|
||||||
|
|
||||||
|
return payloads or [json.dumps({**base, "refs": {}}).encode()]
|
||||||
|
|
||||||
def _send_packet(self, peer_hash: bytes, payload: bytes) -> bool:
|
def _send_packet(self, peer_hash: bytes, payload: bytes) -> bool:
|
||||||
try:
|
try:
|
||||||
if not RNS.Transport.has_path(peer_hash):
|
if not RNS.Transport.has_path(peer_hash):
|
||||||
|
|
|
||||||
|
|
@ -361,6 +361,49 @@ class TestBroadcastDelta:
|
||||||
assert sent[0]["refs"] == SAMPLE_REFS
|
assert sent[0]["refs"] == SAMPLE_REFS
|
||||||
|
|
||||||
|
|
||||||
|
# ── Packet splitting (_build_ref_payloads) ───────────────────────────────────
|
||||||
|
|
||||||
|
class TestBuildRefPayloads:
|
||||||
|
def test_small_refs_fit_in_one_packet(self, tmp_path):
|
||||||
|
relay = _make_relay(tmp_path)
|
||||||
|
refs = {"refs/heads/main": "a" * 40}
|
||||||
|
payloads = relay._build_ref_payloads("rad:z3abc123", refs, is_delta=False)
|
||||||
|
assert len(payloads) == 1
|
||||||
|
msg = json.loads(payloads[0])
|
||||||
|
assert msg["refs"] == refs
|
||||||
|
|
||||||
|
def test_large_refs_split_into_multiple_packets(self, tmp_path):
|
||||||
|
relay = _make_relay(tmp_path)
|
||||||
|
# 20 refs × ~70 bytes each ≈ 1400 bytes, well over ENCRYPTED_MDU=383
|
||||||
|
many_refs = {f"refs/heads/branch-{i:03d}": "a" * 40 for i in range(20)}
|
||||||
|
payloads = relay._build_ref_payloads("rad:z3abc123", many_refs, is_delta=False)
|
||||||
|
assert len(payloads) > 1
|
||||||
|
# All packets must fit within MDU
|
||||||
|
import RNS
|
||||||
|
assert all(len(p) <= RNS.Packet.ENCRYPTED_MDU for p in payloads)
|
||||||
|
# Combined refs must cover every original ref exactly once
|
||||||
|
combined = {}
|
||||||
|
for p in payloads:
|
||||||
|
combined.update(json.loads(p)["refs"])
|
||||||
|
assert combined == many_refs
|
||||||
|
|
||||||
|
def test_delta_flag_propagated_to_all_chunks(self, tmp_path):
|
||||||
|
relay = _make_relay(tmp_path)
|
||||||
|
many_refs = {f"refs/heads/br-{i}": "b" * 40 for i in range(20)}
|
||||||
|
payloads = relay._build_ref_payloads("rad:z3abc123", many_refs, is_delta=True)
|
||||||
|
assert all(json.loads(p).get("delta") is True for p in payloads)
|
||||||
|
|
||||||
|
def test_each_packet_under_mdu(self, tmp_path):
|
||||||
|
"""Single very-long ref name must not produce an oversized packet."""
|
||||||
|
import RNS
|
||||||
|
relay = _make_relay(tmp_path)
|
||||||
|
# pathological: ref name itself is 200 chars
|
||||||
|
refs = {"refs/heads/" + "x" * 200: "c" * 40}
|
||||||
|
payloads = relay._build_ref_payloads("rad:z3abc123", refs, is_delta=False)
|
||||||
|
# Even if we can't split a single ref below MDU, we still emit it
|
||||||
|
assert len(payloads) >= 1
|
||||||
|
|
||||||
|
|
||||||
# ── Auto-seed ─────────────────────────────────────────────────────────────────
|
# ── Auto-seed ─────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
class TestAutoSeed:
|
class TestAutoSeed:
|
||||||
|
|
|
||||||
|
|
@ -315,6 +315,7 @@ class TestTCPTunnelIntegration:
|
||||||
patch("radicle_reticulum.bridge.RNS.Packet") as mock_pkt_cls, \
|
patch("radicle_reticulum.bridge.RNS.Packet") as mock_pkt_cls, \
|
||||||
patch("radicle_reticulum.bridge.RNS.log"):
|
patch("radicle_reticulum.bridge.RNS.log"):
|
||||||
mock_pkt_cls.side_effect = lambda lnk, data: sent_data.append(data) or MagicMock()
|
mock_pkt_cls.side_effect = lambda lnk, data: sent_data.append(data) or MagicMock()
|
||||||
|
mock_pkt_cls.ENCRYPTED_MDU = 383 # must be set so chunking uses the real value
|
||||||
|
|
||||||
from radicle_reticulum.bridge import TunnelConnection
|
from radicle_reticulum.bridge import TunnelConnection
|
||||||
tunnel = TunnelConnection(
|
tunnel = TunnelConnection(
|
||||||
|
|
@ -332,8 +333,9 @@ class TestTCPTunnelIntegration:
|
||||||
bridge._forward_tcp_to_rns(tunnel)
|
bridge._forward_tcp_to_rns(tunnel)
|
||||||
|
|
||||||
local.close()
|
local.close()
|
||||||
assert any(payload in d for d in sent_data), \
|
# Payload fits in one MDU chunk, so it should arrive as a single packet
|
||||||
f"Expected {payload!r} in forwarded data, got {sent_data}"
|
assert sent_data == [payload], \
|
||||||
|
f"Expected [{payload!r}], got {sent_data}"
|
||||||
|
|
||||||
def test_rns_data_forwarded_to_tcp_socket(self):
|
def test_rns_data_forwarded_to_tcp_socket(self):
|
||||||
"""Data received from RNS should be written to the TCP socket."""
|
"""Data received from RNS should be written to the TCP socket."""
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue