diff --git a/tests/test_integration.py b/tests/test_integration.py new file mode 100644 index 0000000..f7aeb17 --- /dev/null +++ b/tests/test_integration.py @@ -0,0 +1,365 @@ +"""Integration tests: two instances wired together without RNS hardware. + +A FakeRNSNetwork routes announces and packets between instances in-process, +testing the full gossip and bridge discovery flows end-to-end. +""" + +import json +import socket +import struct +import threading +import time +from pathlib import Path +from typing import Dict, Optional +from unittest.mock import MagicMock, patch + +import pytest + +from radicle_reticulum.bridge import RadicleBridge, BRIDGE_APP_DATA_MAGIC +from radicle_reticulum.gossip import GossipRelay, GOSSIP_MAGIC +from radicle_reticulum.identity import RadicleIdentity + + +# ── Fake in-process RNS network ─────────────────────────────────────────────── + +class FakeAnnounceNetwork: + """Delivers announce calls to all registered handlers.""" + + def __init__(self): + self._handlers = [] + self._lock = threading.Lock() + + def register_handler(self, handler): + with self._lock: + self._handlers.append(handler) + + def announce(self, dest_hash: bytes, identity, app_data: Optional[bytes]): + with self._lock: + handlers = list(self._handlers) + for h in handlers: + try: + h(dest_hash, identity, app_data) + except Exception: + pass + + +class FakeGossipNetwork(FakeAnnounceNetwork): + """Routes gossip packets between relay instances.""" + + def __init__(self): + super().__init__() + self._relays: Dict[bytes, GossipRelay] = {} + + def register_relay(self, relay: GossipRelay): + self._relays[relay.destination.hash] = relay + + def deliver_packet(self, dest_hash: bytes, payload: bytes): + relay = self._relays.get(dest_hash) + if relay: + relay._on_packet(payload, None) + + +# ── Helpers ─────────────────────────────────────────────────────────────────── + +def _make_gossip_relay(tmp_path, network: FakeGossipNetwork, rids=None, **kwargs): + identity = RadicleIdentity.generate() + dest_hash = identity.rns_identity_hash + + dest_mock = MagicMock() + dest_mock.hash = dest_hash[:16] + dest_mock.hexhash = dest_hash[:16].hex() + + with patch("radicle_reticulum.gossip.RNS.Reticulum"), \ + patch("radicle_reticulum.gossip.RNS.Destination", return_value=dest_mock), \ + patch("radicle_reticulum.gossip.RNS.Transport") as mock_transport: + mock_transport.register_announce_handler.side_effect = network.register_handler + relay = GossipRelay( + identity=identity, + rids=rids or [], + storage=tmp_path / "storage", + **kwargs, + ) + relay.destination = dest_mock + + # Wire send_packet into the network + def fake_send(peer_hash, payload): + network.deliver_packet(peer_hash, payload) + return True + + relay._send_packet = fake_send + relay._known_peers = {} + network.register_relay(relay) + return relay + + +SAMPLE_REFS = { + "refs/heads/main": "aabbccdd" * 5, + "refs/rad/sigrefs": "11223344" * 5, +} + +NEW_REFS = { + "refs/heads/main": "deadbeef" * 5, + "refs/rad/sigrefs": "11223344" * 5, +} + + +# ── Gossip integration ──────────────────────────────────────────────────────── + +class TestGossipIntegration: + def test_relay_a_broadcasts_to_relay_b_on_ref_change(self, tmp_path): + network = FakeGossipNetwork() + relay_a = _make_gossip_relay(tmp_path / "a", network, rids=["rad:z3repo"], + radicle_nid="z6MkNodeA") + relay_b = _make_gossip_relay(tmp_path / "b", network, rids=["rad:z3repo"], + bridge_port=None) + + relay_a._known_refs["rad:z3repo"] = SAMPLE_REFS + relay_b._known_refs["rad:z3repo"] = SAMPLE_REFS + relay_a._known_peers[relay_b.destination.hash] = time.time() + + # _on_packet triggers sync on relay B when it receives new refs from A + synced = [] + relay_b.set_on_sync_triggered(lambda rid, nid: synced.append(rid)) + + with patch("radicle_reticulum.gossip.subprocess.run") as mock_run: + mock_run.return_value = MagicMock(returncode=0) + relay_a._broadcast("rad:z3repo", NEW_REFS) + time.sleep(0.1) # let _trigger_sync thread complete + + assert "rad:z3repo" in synced + + def test_relay_b_triggers_sync_on_new_refs_from_a(self, tmp_path): + network = FakeGossipNetwork() + relay_a = _make_gossip_relay(tmp_path / "a", network, rids=["rad:z3repo"], + bridge_port=None, radicle_nid="z6MkNodeA") + relay_b = _make_gossip_relay(tmp_path / "b", network, rids=["rad:z3repo"], + bridge_port=None) + + relay_a._known_refs["rad:z3repo"] = SAMPLE_REFS + relay_b._known_refs["rad:z3repo"] = SAMPLE_REFS + relay_a._known_peers[relay_b.destination.hash] = time.time() + + syncs = [] + relay_b.set_on_sync_triggered(lambda rid, nid: syncs.append((rid, nid))) + + with patch("radicle_reticulum.gossip.subprocess.run") as mock_run: + mock_run.return_value = MagicMock(returncode=0) + relay_a._broadcast("rad:z3repo", NEW_REFS) + time.sleep(0.1) # let _trigger_sync thread run + + assert any(rid == "rad:z3repo" for rid, _ in syncs), \ + f"Expected sync for rad:z3repo, got {syncs}" + + def test_peer_discovery_via_announce_then_broadcast(self, tmp_path): + network = FakeGossipNetwork() + relay_a = _make_gossip_relay(tmp_path / "a", network, rids=["rad:z3repo"]) + relay_b = _make_gossip_relay(tmp_path / "b", network, rids=["rad:z3repo"]) + + relay_a._known_refs["rad:z3repo"] = SAMPLE_REFS + relay_b._known_refs["rad:z3repo"] = SAMPLE_REFS + + # Simulate B announcing itself; A should discover B and send current refs + app_data = GOSSIP_MAGIC + struct.pack("!H", 8) + b"z6MkNodeB"[:8] + relay_a._on_announce(relay_b.destination.hash, MagicMock(), app_data) + + assert relay_b.destination.hash in relay_a._known_peers + + def test_no_sync_loop_when_refs_already_match(self, tmp_path): + """Receiving same refs we already know should not trigger sync.""" + network = FakeGossipNetwork() + relay_a = _make_gossip_relay(tmp_path / "a", network, rids=["rad:z3repo"], + radicle_nid="z6MkNodeA") + relay_b = _make_gossip_relay(tmp_path / "b", network, rids=["rad:z3repo"], + bridge_port=None) + + relay_a._known_refs["rad:z3repo"] = SAMPLE_REFS + relay_b._known_refs["rad:z3repo"] = SAMPLE_REFS + relay_a._known_peers[relay_b.destination.hash] = time.time() + + syncs = [] + relay_b.set_on_sync_triggered(lambda rid, nid: syncs.append((rid, nid))) + + relay_a._broadcast("rad:z3repo", SAMPLE_REFS) # same refs, no change + + time.sleep(0.05) + assert syncs == [] + + +# ── Bridge discovery integration ────────────────────────────────────────────── + +def _make_bridge(auto_connect=False, auto_seed=False, listen_port=0): + identity = RadicleIdentity.generate() + dest_hash = identity.rns_identity_hash[:16] + + dest_mock = MagicMock() + dest_mock.hash = dest_hash + dest_mock.hexhash = dest_hash.hex() + + with patch("radicle_reticulum.bridge.RNS.Reticulum"), \ + patch("radicle_reticulum.bridge.RNS.Destination", return_value=dest_mock), \ + patch("radicle_reticulum.bridge.RNS.Transport"), \ + patch("radicle_reticulum.bridge.RNS.log"): + bridge = RadicleBridge( + identity=identity, + listen_port=listen_port, + auto_connect=auto_connect, + auto_seed=auto_seed, + ) + return bridge + + +class TestBridgeDiscoveryIntegration: + def test_two_bridges_discover_each_other(self): + """Simulate mutual discovery: A announces to B and B announces to A.""" + network = FakeAnnounceNetwork() + + bridge_a = _make_bridge() + bridge_b = _make_bridge() + + # Register both announce handlers + network.register_handler(bridge_a._handle_announce) + network.register_handler(bridge_b._handle_announce) + + nid_a = "z6MkNodeA" + nid_b = "z6MkNodeB" + bridge_a.set_local_radicle_nid(nid_a) + bridge_b.set_local_radicle_nid(nid_b) + + discovered_by_a = [] + discovered_by_b = [] + bridge_a.set_on_bridge_discovered(lambda h, n: discovered_by_a.append((h, n))) + bridge_b.set_on_bridge_discovered(lambda h, n: discovered_by_b.append((h, n))) + + id_a = MagicMock() + id_b = MagicMock() + + def app_data(nid): + b = nid.encode() + return BRIDGE_APP_DATA_MAGIC + struct.pack("!H", len(b)) + b + + with patch("radicle_reticulum.bridge.RNS.log"): + network.announce(bridge_a.destination.hash, id_a, app_data(nid_a)) + network.announce(bridge_b.destination.hash, id_b, app_data(nid_b)) + + assert any(n == nid_b for _, n in discovered_by_a) + assert any(n == nid_a for _, n in discovered_by_b) + + def test_auto_seed_registers_correct_nid(self): + """auto_seed=True should call register_seed with the discovered NID.""" + bridge = _make_bridge(auto_seed=True) + + peer_hash = b"\xab" * 16 + nid = "z6MkRemoteNode" + app_data = BRIDGE_APP_DATA_MAGIC + struct.pack("!H", len(nid)) + nid.encode() + + registered = [] + + with patch("radicle_reticulum.bridge.RNS.log"), \ + patch("radicle_reticulum.bridge.threading.Thread") as mock_thread: + # Capture the thread target so we can run it synchronously + threads_started = [] + + def fake_thread(**kwargs): + t = MagicMock() + threads_started.append(kwargs) + t.start = lambda: None + return t + + mock_thread.side_effect = lambda **kw: fake_thread(**kw) + bridge._handle_announce(peer_hash, MagicMock(), app_data) + + # The discovered NID must be in bridge_nids + assert bridge.get_remote_bridge_nid(peer_hash) == nid + + def test_second_announce_does_not_double_register(self): + """Repeated announces from the same bridge should not re-register.""" + bridge = _make_bridge(auto_seed=True) + peer_hash = b"\xbc" * 16 + nid = "z6MkSameNode" + app_data = BRIDGE_APP_DATA_MAGIC + struct.pack("!H", len(nid)) + nid.encode() + + thread_starts = [] + + with patch("radicle_reticulum.bridge.RNS.log"), \ + patch("radicle_reticulum.bridge.threading.Thread") as mock_thread: + mock_t = MagicMock() + mock_thread.return_value = mock_t + bridge._handle_announce(peer_hash, MagicMock(), app_data) + bridge._handle_announce(peer_hash, MagicMock(), app_data) + + # Thread should only be started once (first announce only) + assert mock_t.start.call_count == 1 + + +# ── TCP tunnel data-flow integration ───────────────────────────────────────── + +class TestTCPTunnelIntegration: + """Test bidirectional data forwarding through the bridge's tunnel layer.""" + + def test_data_forwarded_tcp_to_rns_link(self): + """Data written to the TCP socket should be sent as an RNS.Packet.""" + bridge = _make_bridge() + bridge._running = True + + # Create a real socket pair to simulate radicle-node ↔ bridge TCP + local, remote = socket.socketpair() + + sent_data = [] + mock_link = MagicMock() + mock_link.status = MagicMock() + mock_link.status.__eq__ = lambda s, other: other == "ACTIVE" + + mock_packet = MagicMock() + + with patch("radicle_reticulum.bridge.RNS.Link.ACTIVE", "ACTIVE"), \ + patch("radicle_reticulum.bridge.RNS.Packet") as mock_pkt_cls, \ + patch("radicle_reticulum.bridge.RNS.log"): + mock_pkt_cls.side_effect = lambda lnk, data: sent_data.append(data) or MagicMock() + + from radicle_reticulum.bridge import TunnelConnection + tunnel = TunnelConnection( + tunnel_id=1, + tcp_socket=local, + rns_link=mock_link, + remote_destination=b"\x00" * 16, + ) + + # Write data from the "radicle-node" side + payload = b"hello from radicle-node" + remote.sendall(payload) + remote.close() # triggers EOF so forward loop exits + + bridge._forward_tcp_to_rns(tunnel) + + local.close() + assert any(payload in d for d in sent_data), \ + f"Expected {payload!r} in forwarded data, got {sent_data}" + + def test_rns_data_forwarded_to_tcp_socket(self): + """Data received from RNS should be written to the TCP socket.""" + bridge = _make_bridge() + bridge._running = True + + with patch("radicle_reticulum.bridge.RNS.log"): + # Create socket pair: bridge writes to `local`, test reads from `remote` + local, remote = socket.socketpair() + + from radicle_reticulum.bridge import TunnelConnection + tunnel = TunnelConnection( + tunnel_id=2, + tcp_socket=local, + rns_link=MagicMock(), + remote_destination=b"\x00" * 16, + ) + + with bridge._tunnels_lock: + bridge._tunnels[2] = tunnel + + payload = b"hello from remote radicle-node" + bridge._on_rns_data(2, payload) + + received = remote.recv(1024) + local.close() + remote.close() + + assert received == payload