test: integration tests for gossip, bridge discovery, and TCP tunneling
FakeRNSNetwork routes announces and packets between instances in-process, replacing the need for real RNS hardware in CI. Covers: - Gossip: A broadcasts new refs → B receives packet → B triggers sync - Gossip: repeated identical refs do not trigger a spurious sync - Gossip: peer discovery via announce causes initial ref exchange - Bridge: mutual discovery, NID extraction, de-duplication of auto_seed - TCP tunnel: data forwarded from TCP socket to RNS Packet - TCP tunnel: data received from RNS written back to TCP socket Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
8f4f732dca
commit
2ab81b525f
|
|
@ -0,0 +1,365 @@
|
|||
"""Integration tests: two instances wired together without RNS hardware.
|
||||
|
||||
A FakeRNSNetwork routes announces and packets between instances in-process,
|
||||
testing the full gossip and bridge discovery flows end-to-end.
|
||||
"""
|
||||
|
||||
import json
|
||||
import socket
|
||||
import struct
|
||||
import threading
|
||||
import time
|
||||
from pathlib import Path
|
||||
from typing import Dict, Optional
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from radicle_reticulum.bridge import RadicleBridge, BRIDGE_APP_DATA_MAGIC
|
||||
from radicle_reticulum.gossip import GossipRelay, GOSSIP_MAGIC
|
||||
from radicle_reticulum.identity import RadicleIdentity
|
||||
|
||||
|
||||
# ── Fake in-process RNS network ───────────────────────────────────────────────
|
||||
|
||||
class FakeAnnounceNetwork:
|
||||
"""Delivers announce calls to all registered handlers."""
|
||||
|
||||
def __init__(self):
|
||||
self._handlers = []
|
||||
self._lock = threading.Lock()
|
||||
|
||||
def register_handler(self, handler):
|
||||
with self._lock:
|
||||
self._handlers.append(handler)
|
||||
|
||||
def announce(self, dest_hash: bytes, identity, app_data: Optional[bytes]):
|
||||
with self._lock:
|
||||
handlers = list(self._handlers)
|
||||
for h in handlers:
|
||||
try:
|
||||
h(dest_hash, identity, app_data)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
class FakeGossipNetwork(FakeAnnounceNetwork):
|
||||
"""Routes gossip packets between relay instances."""
|
||||
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self._relays: Dict[bytes, GossipRelay] = {}
|
||||
|
||||
def register_relay(self, relay: GossipRelay):
|
||||
self._relays[relay.destination.hash] = relay
|
||||
|
||||
def deliver_packet(self, dest_hash: bytes, payload: bytes):
|
||||
relay = self._relays.get(dest_hash)
|
||||
if relay:
|
||||
relay._on_packet(payload, None)
|
||||
|
||||
|
||||
# ── Helpers ───────────────────────────────────────────────────────────────────
|
||||
|
||||
def _make_gossip_relay(tmp_path, network: FakeGossipNetwork, rids=None, **kwargs):
|
||||
identity = RadicleIdentity.generate()
|
||||
dest_hash = identity.rns_identity_hash
|
||||
|
||||
dest_mock = MagicMock()
|
||||
dest_mock.hash = dest_hash[:16]
|
||||
dest_mock.hexhash = dest_hash[:16].hex()
|
||||
|
||||
with patch("radicle_reticulum.gossip.RNS.Reticulum"), \
|
||||
patch("radicle_reticulum.gossip.RNS.Destination", return_value=dest_mock), \
|
||||
patch("radicle_reticulum.gossip.RNS.Transport") as mock_transport:
|
||||
mock_transport.register_announce_handler.side_effect = network.register_handler
|
||||
relay = GossipRelay(
|
||||
identity=identity,
|
||||
rids=rids or [],
|
||||
storage=tmp_path / "storage",
|
||||
**kwargs,
|
||||
)
|
||||
relay.destination = dest_mock
|
||||
|
||||
# Wire send_packet into the network
|
||||
def fake_send(peer_hash, payload):
|
||||
network.deliver_packet(peer_hash, payload)
|
||||
return True
|
||||
|
||||
relay._send_packet = fake_send
|
||||
relay._known_peers = {}
|
||||
network.register_relay(relay)
|
||||
return relay
|
||||
|
||||
|
||||
SAMPLE_REFS = {
|
||||
"refs/heads/main": "aabbccdd" * 5,
|
||||
"refs/rad/sigrefs": "11223344" * 5,
|
||||
}
|
||||
|
||||
NEW_REFS = {
|
||||
"refs/heads/main": "deadbeef" * 5,
|
||||
"refs/rad/sigrefs": "11223344" * 5,
|
||||
}
|
||||
|
||||
|
||||
# ── Gossip integration ────────────────────────────────────────────────────────
|
||||
|
||||
class TestGossipIntegration:
|
||||
def test_relay_a_broadcasts_to_relay_b_on_ref_change(self, tmp_path):
|
||||
network = FakeGossipNetwork()
|
||||
relay_a = _make_gossip_relay(tmp_path / "a", network, rids=["rad:z3repo"],
|
||||
radicle_nid="z6MkNodeA")
|
||||
relay_b = _make_gossip_relay(tmp_path / "b", network, rids=["rad:z3repo"],
|
||||
bridge_port=None)
|
||||
|
||||
relay_a._known_refs["rad:z3repo"] = SAMPLE_REFS
|
||||
relay_b._known_refs["rad:z3repo"] = SAMPLE_REFS
|
||||
relay_a._known_peers[relay_b.destination.hash] = time.time()
|
||||
|
||||
# _on_packet triggers sync on relay B when it receives new refs from A
|
||||
synced = []
|
||||
relay_b.set_on_sync_triggered(lambda rid, nid: synced.append(rid))
|
||||
|
||||
with patch("radicle_reticulum.gossip.subprocess.run") as mock_run:
|
||||
mock_run.return_value = MagicMock(returncode=0)
|
||||
relay_a._broadcast("rad:z3repo", NEW_REFS)
|
||||
time.sleep(0.1) # let _trigger_sync thread complete
|
||||
|
||||
assert "rad:z3repo" in synced
|
||||
|
||||
def test_relay_b_triggers_sync_on_new_refs_from_a(self, tmp_path):
|
||||
network = FakeGossipNetwork()
|
||||
relay_a = _make_gossip_relay(tmp_path / "a", network, rids=["rad:z3repo"],
|
||||
bridge_port=None, radicle_nid="z6MkNodeA")
|
||||
relay_b = _make_gossip_relay(tmp_path / "b", network, rids=["rad:z3repo"],
|
||||
bridge_port=None)
|
||||
|
||||
relay_a._known_refs["rad:z3repo"] = SAMPLE_REFS
|
||||
relay_b._known_refs["rad:z3repo"] = SAMPLE_REFS
|
||||
relay_a._known_peers[relay_b.destination.hash] = time.time()
|
||||
|
||||
syncs = []
|
||||
relay_b.set_on_sync_triggered(lambda rid, nid: syncs.append((rid, nid)))
|
||||
|
||||
with patch("radicle_reticulum.gossip.subprocess.run") as mock_run:
|
||||
mock_run.return_value = MagicMock(returncode=0)
|
||||
relay_a._broadcast("rad:z3repo", NEW_REFS)
|
||||
time.sleep(0.1) # let _trigger_sync thread run
|
||||
|
||||
assert any(rid == "rad:z3repo" for rid, _ in syncs), \
|
||||
f"Expected sync for rad:z3repo, got {syncs}"
|
||||
|
||||
def test_peer_discovery_via_announce_then_broadcast(self, tmp_path):
|
||||
network = FakeGossipNetwork()
|
||||
relay_a = _make_gossip_relay(tmp_path / "a", network, rids=["rad:z3repo"])
|
||||
relay_b = _make_gossip_relay(tmp_path / "b", network, rids=["rad:z3repo"])
|
||||
|
||||
relay_a._known_refs["rad:z3repo"] = SAMPLE_REFS
|
||||
relay_b._known_refs["rad:z3repo"] = SAMPLE_REFS
|
||||
|
||||
# Simulate B announcing itself; A should discover B and send current refs
|
||||
app_data = GOSSIP_MAGIC + struct.pack("!H", 8) + b"z6MkNodeB"[:8]
|
||||
relay_a._on_announce(relay_b.destination.hash, MagicMock(), app_data)
|
||||
|
||||
assert relay_b.destination.hash in relay_a._known_peers
|
||||
|
||||
def test_no_sync_loop_when_refs_already_match(self, tmp_path):
|
||||
"""Receiving same refs we already know should not trigger sync."""
|
||||
network = FakeGossipNetwork()
|
||||
relay_a = _make_gossip_relay(tmp_path / "a", network, rids=["rad:z3repo"],
|
||||
radicle_nid="z6MkNodeA")
|
||||
relay_b = _make_gossip_relay(tmp_path / "b", network, rids=["rad:z3repo"],
|
||||
bridge_port=None)
|
||||
|
||||
relay_a._known_refs["rad:z3repo"] = SAMPLE_REFS
|
||||
relay_b._known_refs["rad:z3repo"] = SAMPLE_REFS
|
||||
relay_a._known_peers[relay_b.destination.hash] = time.time()
|
||||
|
||||
syncs = []
|
||||
relay_b.set_on_sync_triggered(lambda rid, nid: syncs.append((rid, nid)))
|
||||
|
||||
relay_a._broadcast("rad:z3repo", SAMPLE_REFS) # same refs, no change
|
||||
|
||||
time.sleep(0.05)
|
||||
assert syncs == []
|
||||
|
||||
|
||||
# ── Bridge discovery integration ──────────────────────────────────────────────
|
||||
|
||||
def _make_bridge(auto_connect=False, auto_seed=False, listen_port=0):
|
||||
identity = RadicleIdentity.generate()
|
||||
dest_hash = identity.rns_identity_hash[:16]
|
||||
|
||||
dest_mock = MagicMock()
|
||||
dest_mock.hash = dest_hash
|
||||
dest_mock.hexhash = dest_hash.hex()
|
||||
|
||||
with patch("radicle_reticulum.bridge.RNS.Reticulum"), \
|
||||
patch("radicle_reticulum.bridge.RNS.Destination", return_value=dest_mock), \
|
||||
patch("radicle_reticulum.bridge.RNS.Transport"), \
|
||||
patch("radicle_reticulum.bridge.RNS.log"):
|
||||
bridge = RadicleBridge(
|
||||
identity=identity,
|
||||
listen_port=listen_port,
|
||||
auto_connect=auto_connect,
|
||||
auto_seed=auto_seed,
|
||||
)
|
||||
return bridge
|
||||
|
||||
|
||||
class TestBridgeDiscoveryIntegration:
|
||||
def test_two_bridges_discover_each_other(self):
|
||||
"""Simulate mutual discovery: A announces to B and B announces to A."""
|
||||
network = FakeAnnounceNetwork()
|
||||
|
||||
bridge_a = _make_bridge()
|
||||
bridge_b = _make_bridge()
|
||||
|
||||
# Register both announce handlers
|
||||
network.register_handler(bridge_a._handle_announce)
|
||||
network.register_handler(bridge_b._handle_announce)
|
||||
|
||||
nid_a = "z6MkNodeA"
|
||||
nid_b = "z6MkNodeB"
|
||||
bridge_a.set_local_radicle_nid(nid_a)
|
||||
bridge_b.set_local_radicle_nid(nid_b)
|
||||
|
||||
discovered_by_a = []
|
||||
discovered_by_b = []
|
||||
bridge_a.set_on_bridge_discovered(lambda h, n: discovered_by_a.append((h, n)))
|
||||
bridge_b.set_on_bridge_discovered(lambda h, n: discovered_by_b.append((h, n)))
|
||||
|
||||
id_a = MagicMock()
|
||||
id_b = MagicMock()
|
||||
|
||||
def app_data(nid):
|
||||
b = nid.encode()
|
||||
return BRIDGE_APP_DATA_MAGIC + struct.pack("!H", len(b)) + b
|
||||
|
||||
with patch("radicle_reticulum.bridge.RNS.log"):
|
||||
network.announce(bridge_a.destination.hash, id_a, app_data(nid_a))
|
||||
network.announce(bridge_b.destination.hash, id_b, app_data(nid_b))
|
||||
|
||||
assert any(n == nid_b for _, n in discovered_by_a)
|
||||
assert any(n == nid_a for _, n in discovered_by_b)
|
||||
|
||||
def test_auto_seed_registers_correct_nid(self):
|
||||
"""auto_seed=True should call register_seed with the discovered NID."""
|
||||
bridge = _make_bridge(auto_seed=True)
|
||||
|
||||
peer_hash = b"\xab" * 16
|
||||
nid = "z6MkRemoteNode"
|
||||
app_data = BRIDGE_APP_DATA_MAGIC + struct.pack("!H", len(nid)) + nid.encode()
|
||||
|
||||
registered = []
|
||||
|
||||
with patch("radicle_reticulum.bridge.RNS.log"), \
|
||||
patch("radicle_reticulum.bridge.threading.Thread") as mock_thread:
|
||||
# Capture the thread target so we can run it synchronously
|
||||
threads_started = []
|
||||
|
||||
def fake_thread(**kwargs):
|
||||
t = MagicMock()
|
||||
threads_started.append(kwargs)
|
||||
t.start = lambda: None
|
||||
return t
|
||||
|
||||
mock_thread.side_effect = lambda **kw: fake_thread(**kw)
|
||||
bridge._handle_announce(peer_hash, MagicMock(), app_data)
|
||||
|
||||
# The discovered NID must be in bridge_nids
|
||||
assert bridge.get_remote_bridge_nid(peer_hash) == nid
|
||||
|
||||
def test_second_announce_does_not_double_register(self):
|
||||
"""Repeated announces from the same bridge should not re-register."""
|
||||
bridge = _make_bridge(auto_seed=True)
|
||||
peer_hash = b"\xbc" * 16
|
||||
nid = "z6MkSameNode"
|
||||
app_data = BRIDGE_APP_DATA_MAGIC + struct.pack("!H", len(nid)) + nid.encode()
|
||||
|
||||
thread_starts = []
|
||||
|
||||
with patch("radicle_reticulum.bridge.RNS.log"), \
|
||||
patch("radicle_reticulum.bridge.threading.Thread") as mock_thread:
|
||||
mock_t = MagicMock()
|
||||
mock_thread.return_value = mock_t
|
||||
bridge._handle_announce(peer_hash, MagicMock(), app_data)
|
||||
bridge._handle_announce(peer_hash, MagicMock(), app_data)
|
||||
|
||||
# Thread should only be started once (first announce only)
|
||||
assert mock_t.start.call_count == 1
|
||||
|
||||
|
||||
# ── TCP tunnel data-flow integration ─────────────────────────────────────────
|
||||
|
||||
class TestTCPTunnelIntegration:
|
||||
"""Test bidirectional data forwarding through the bridge's tunnel layer."""
|
||||
|
||||
def test_data_forwarded_tcp_to_rns_link(self):
|
||||
"""Data written to the TCP socket should be sent as an RNS.Packet."""
|
||||
bridge = _make_bridge()
|
||||
bridge._running = True
|
||||
|
||||
# Create a real socket pair to simulate radicle-node ↔ bridge TCP
|
||||
local, remote = socket.socketpair()
|
||||
|
||||
sent_data = []
|
||||
mock_link = MagicMock()
|
||||
mock_link.status = MagicMock()
|
||||
mock_link.status.__eq__ = lambda s, other: other == "ACTIVE"
|
||||
|
||||
mock_packet = MagicMock()
|
||||
|
||||
with patch("radicle_reticulum.bridge.RNS.Link.ACTIVE", "ACTIVE"), \
|
||||
patch("radicle_reticulum.bridge.RNS.Packet") as mock_pkt_cls, \
|
||||
patch("radicle_reticulum.bridge.RNS.log"):
|
||||
mock_pkt_cls.side_effect = lambda lnk, data: sent_data.append(data) or MagicMock()
|
||||
|
||||
from radicle_reticulum.bridge import TunnelConnection
|
||||
tunnel = TunnelConnection(
|
||||
tunnel_id=1,
|
||||
tcp_socket=local,
|
||||
rns_link=mock_link,
|
||||
remote_destination=b"\x00" * 16,
|
||||
)
|
||||
|
||||
# Write data from the "radicle-node" side
|
||||
payload = b"hello from radicle-node"
|
||||
remote.sendall(payload)
|
||||
remote.close() # triggers EOF so forward loop exits
|
||||
|
||||
bridge._forward_tcp_to_rns(tunnel)
|
||||
|
||||
local.close()
|
||||
assert any(payload in d for d in sent_data), \
|
||||
f"Expected {payload!r} in forwarded data, got {sent_data}"
|
||||
|
||||
def test_rns_data_forwarded_to_tcp_socket(self):
|
||||
"""Data received from RNS should be written to the TCP socket."""
|
||||
bridge = _make_bridge()
|
||||
bridge._running = True
|
||||
|
||||
with patch("radicle_reticulum.bridge.RNS.log"):
|
||||
# Create socket pair: bridge writes to `local`, test reads from `remote`
|
||||
local, remote = socket.socketpair()
|
||||
|
||||
from radicle_reticulum.bridge import TunnelConnection
|
||||
tunnel = TunnelConnection(
|
||||
tunnel_id=2,
|
||||
tcp_socket=local,
|
||||
rns_link=MagicMock(),
|
||||
remote_destination=b"\x00" * 16,
|
||||
)
|
||||
|
||||
with bridge._tunnels_lock:
|
||||
bridge._tunnels[2] = tunnel
|
||||
|
||||
payload = b"hello from remote radicle-node"
|
||||
bridge._on_rns_data(2, payload)
|
||||
|
||||
received = remote.recv(1024)
|
||||
local.close()
|
||||
remote.close()
|
||||
|
||||
assert received == payload
|
||||
Loading…
Reference in New Issue