"""Tests for GossipRelay.""" import json import subprocess import time from pathlib import Path from unittest.mock import MagicMock, patch, call import pytest from radicle_reticulum.gossip import GossipRelay, _read_refs, _radicle_storage_path from radicle_reticulum.identity import RadicleIdentity # ── Helpers ────────────────────────────────────────────────────────────────── def _make_relay(tmp_path, rids=None, **kwargs): identity = RadicleIdentity.generate() with patch("radicle_reticulum.gossip.RNS.Reticulum"), \ patch("radicle_reticulum.gossip.RNS.Destination") as mock_dest_cls, \ patch("radicle_reticulum.gossip.RNS.Transport"): mock_dest = MagicMock() mock_dest.hash = b"\x00" * 16 mock_dest.hexhash = "0" * 32 mock_dest_cls.return_value = mock_dest relay = GossipRelay( identity=identity, rids=rids or ["rad:z3abc123"], storage=tmp_path / "storage", **kwargs, ) relay.destination = mock_dest return relay SAMPLE_REFS = { "refs/heads/main": "abc123def456" * 3, "refs/rad/sigrefs": "def456abc123" * 3, } # ── _read_refs ──────────────────────────────────────────────────────────────── class TestReadRefs: def test_missing_storage_returns_empty(self, tmp_path): result = _read_refs(tmp_path / "storage", "rad:z3nonexistent") assert result == {} def test_reads_refs_from_bare_repo(self, tmp_path): rid = "rad:z3testrepo" repo_path = tmp_path / "z3testrepo" repo_path.mkdir() show_ref_output = ( "abc123def456abc123def456abc123def456abc1 refs/heads/main\n" "def456abc123def456abc123def456abc123def4 refs/rad/sigrefs\n" ) with patch("radicle_reticulum.gossip.subprocess.run") as mock_run: mock_run.return_value = MagicMock(returncode=0, stdout=show_ref_output) result = _read_refs(tmp_path, rid) assert result["refs/heads/main"] == "abc123def456abc123def456abc123def456abc1" assert result["refs/rad/sigrefs"] == "def456abc123def456abc123def456abc123def4" def test_strips_rad_prefix(self, tmp_path): # rid with "rad:" prefix should map to directory without it repo_path = tmp_path / "z3abc" repo_path.mkdir() result = _read_refs(tmp_path, "rad:z3abc") assert result == {} # empty repo, no refs — just confirming no crash # ── GossipRelay construction ────────────────────────────────────────────────── class TestGossipRelayInit: def test_defaults(self, tmp_path): relay = _make_relay(tmp_path) assert relay.rids == ["rad:z3abc123"] assert relay.poll_interval == 30 assert relay.bridge_port == 8777 assert relay.announce_retry_delays == (5, 15, 30) def test_custom_params(self, tmp_path): relay = _make_relay( tmp_path, rids=["rad:z3a", "rad:z3b"], bridge_port=9000, poll_interval=60, ) assert len(relay.rids) == 2 assert relay.bridge_port == 9000 assert relay.poll_interval == 60 # ── Broadcast ──────────────────────────────────────────────────────────────── class TestBroadcast: def test_broadcast_sends_to_known_peers(self, tmp_path): relay = _make_relay(tmp_path, radicle_nid="z6Mktest") peer_hash = b"\x01" * 16 relay._known_peers[peer_hash] = time.time() with patch.object(relay, "_send_packet", return_value=True) as mock_send: relay._broadcast("rad:z3abc123", SAMPLE_REFS) mock_send.assert_called_once() args = mock_send.call_args[0] assert args[0] == peer_hash msg = json.loads(args[1].decode()) assert msg["rid"] == "rad:z3abc123" assert msg["nid"] == "z6Mktest" assert msg["refs"] == SAMPLE_REFS def test_broadcast_no_peers_sends_nothing(self, tmp_path): relay = _make_relay(tmp_path) with patch.object(relay, "_send_packet") as mock_send: relay._broadcast("rad:z3abc123", SAMPLE_REFS) mock_send.assert_not_called() def test_broadcast_multiple_peers(self, tmp_path): relay = _make_relay(tmp_path) for i in range(3): relay._known_peers[bytes([i]) * 16] = time.time() with patch.object(relay, "_send_packet", return_value=True) as mock_send: relay._broadcast("rad:z3abc123", SAMPLE_REFS) assert mock_send.call_count == 3 # ── Poll loop ───────────────────────────────────────────────────────────────── class TestPollLoop: def test_broadcasts_on_ref_change(self, tmp_path): relay = _make_relay(tmp_path) relay._known_refs["rad:z3abc123"] = {"refs/heads/main": "old_sha"} new_refs = {"refs/heads/main": "new_sha"} with patch("radicle_reticulum.gossip._read_refs", return_value=new_refs), \ patch.object(relay, "_broadcast") as mock_broadcast: # Simulate one poll iteration for rid in relay.rids: refs = new_refs old = relay._known_refs.get(rid) if refs and refs != old: if old is not None: relay._broadcast(rid, refs) relay._known_refs[rid] = refs mock_broadcast.assert_called_once_with("rad:z3abc123", new_refs) def test_no_broadcast_on_first_poll(self, tmp_path): relay = _make_relay(tmp_path) # No existing refs — first poll should not broadcast with patch("radicle_reticulum.gossip._read_refs", return_value=SAMPLE_REFS), \ patch.object(relay, "_broadcast") as mock_broadcast: for rid in relay.rids: refs = SAMPLE_REFS old = relay._known_refs.get(rid) if refs and refs != old: if old is not None: relay._broadcast(rid, refs) relay._known_refs[rid] = refs mock_broadcast.assert_not_called() assert relay._known_refs["rad:z3abc123"] == SAMPLE_REFS def test_no_broadcast_when_refs_unchanged(self, tmp_path): relay = _make_relay(tmp_path) relay._known_refs["rad:z3abc123"] = SAMPLE_REFS with patch("radicle_reticulum.gossip._read_refs", return_value=SAMPLE_REFS), \ patch.object(relay, "_broadcast") as mock_broadcast: for rid in relay.rids: refs = SAMPLE_REFS old = relay._known_refs.get(rid) if refs and refs != old: if old is not None: relay._broadcast(rid, refs) mock_broadcast.assert_not_called() # ── Incoming packets ────────────────────────────────────────────────────────── class TestOnPacket: def _make_packet(self, rid, nid, refs): return json.dumps({"type": "refs", "rid": rid, "nid": nid, "refs": refs}).encode() def test_triggers_sync_on_new_refs(self, tmp_path): relay = _make_relay(tmp_path) relay._known_refs["rad:z3abc123"] = {"refs/heads/main": "old"} mock_packet = MagicMock() with patch.object(relay, "_trigger_sync") as mock_sync: relay._on_packet( self._make_packet("rad:z3abc123", "z6Mkpeer", {"refs/heads/main": "new"}), mock_packet, ) mock_sync.assert_called_once_with("rad:z3abc123", "z6Mkpeer") def test_no_sync_when_refs_match(self, tmp_path): relay = _make_relay(tmp_path) relay._known_refs["rad:z3abc123"] = SAMPLE_REFS mock_packet = MagicMock() with patch.object(relay, "_trigger_sync") as mock_sync: relay._on_packet( self._make_packet("rad:z3abc123", "z6Mkpeer", SAMPLE_REFS), mock_packet, ) mock_sync.assert_not_called() def test_ignores_invalid_json(self, tmp_path): relay = _make_relay(tmp_path) mock_packet = MagicMock() # Should not raise relay._on_packet(b"not json at all", mock_packet) def test_ignores_wrong_type(self, tmp_path): relay = _make_relay(tmp_path) mock_packet = MagicMock() with patch.object(relay, "_trigger_sync") as mock_sync: relay._on_packet( json.dumps({"type": "ping", "rid": "rad:z3abc123"}).encode(), mock_packet, ) mock_sync.assert_not_called() def test_triggers_sync_for_unknown_repo(self, tmp_path): relay = _make_relay(tmp_path) # No prior known refs — any incoming refs are "new" mock_packet = MagicMock() with patch.object(relay, "_trigger_sync") as mock_sync: relay._on_packet( self._make_packet("rad:z3unknown", "z6Mkpeer", SAMPLE_REFS), mock_packet, ) mock_sync.assert_called_once() def test_sync_callback_invoked(self, tmp_path): relay = _make_relay(tmp_path) received = [] relay.set_on_sync_triggered(lambda rid, nid: received.append((rid, nid))) with patch("radicle_reticulum.gossip.subprocess.run") as mock_run: mock_run.return_value = MagicMock(returncode=0) relay._trigger_sync("rad:z3abc123", "z6Mkpeer") assert received == [("rad:z3abc123", "z6Mkpeer")] def test_delta_packet_merges_with_local_refs(self, tmp_path): """A delta packet updates only the changed refs, keeping others intact.""" relay = _make_relay(tmp_path) relay._known_refs["rad:z3abc123"] = { "refs/heads/main": "aaa", "refs/heads/dev": "bbb", } delta_packet = json.dumps({ "type": "refs", "rid": "rad:z3abc123", "nid": "z6Mkpeer", "delta": True, "refs": {"refs/heads/main": "ccc"}, # only main changed }).encode() with patch.object(relay, "_trigger_sync") as mock_sync: relay._on_packet(delta_packet, MagicMock()) mock_sync.assert_called_once() def test_delta_packet_no_sync_when_already_known(self, tmp_path): """Delta packet with refs we already know should not trigger sync.""" relay = _make_relay(tmp_path) relay._known_refs["rad:z3abc123"] = {"refs/heads/main": "aaa"} delta_packet = json.dumps({ "type": "refs", "rid": "rad:z3abc123", "nid": "z6Mkpeer", "delta": True, "refs": {"refs/heads/main": "aaa"}, # same value }).encode() with patch.object(relay, "_trigger_sync") as mock_sync: relay._on_packet(delta_packet, MagicMock()) mock_sync.assert_not_called() def test_auto_seed_triggered_for_unknown_repo(self, tmp_path): """When auto_seed=True and RID is unknown, _auto_seed_and_sync is called.""" relay = _make_relay(tmp_path, auto_seed=True) packet = self._make_packet("rad:z3brand_new", "z6Mkpeer", SAMPLE_REFS) with patch.object(relay, "_auto_seed_and_sync") as mock_auto, \ patch.object(relay, "_trigger_sync") as mock_sync: relay._on_packet(packet, MagicMock()) mock_auto.assert_called_once_with("rad:z3brand_new", "z6Mkpeer") mock_sync.assert_not_called() def test_auto_seed_not_triggered_for_tracked_repo(self, tmp_path): """auto_seed=True must not re-seed repos already in self.rids.""" relay = _make_relay(tmp_path, auto_seed=True) # rad:z3abc123 is already in rids (default from _make_relay) relay._known_refs["rad:z3abc123"] = {"refs/heads/main": "old"} with patch.object(relay, "_auto_seed_and_sync") as mock_auto, \ patch.object(relay, "_trigger_sync") as mock_sync: relay._on_packet( self._make_packet("rad:z3abc123", "z6Mkpeer", {"refs/heads/main": "new"}), MagicMock(), ) mock_auto.assert_not_called() mock_sync.assert_called_once() # ── Broadcast delta ─────────────────────────────────────────────────────────── class TestBroadcastDelta: def test_full_broadcast_when_no_old_refs(self, tmp_path): relay = _make_relay(tmp_path) relay._known_peers[b"\x01" * 16] = 0.0 sent = [] relay._send_packet = lambda h, p: sent.append(json.loads(p)) or True relay._broadcast("rad:z3abc123", SAMPLE_REFS) assert len(sent) == 1 assert sent[0].get("delta") is None or sent[0].get("delta") is False assert sent[0]["refs"] == SAMPLE_REFS def test_delta_broadcast_only_sends_changed_refs(self, tmp_path): relay = _make_relay(tmp_path) relay._known_peers[b"\x01" * 16] = 0.0 sent = [] relay._send_packet = lambda h, p: sent.append(json.loads(p)) or True old = {"refs/heads/main": "aaa", "refs/heads/dev": "bbb"} new = {"refs/heads/main": "ccc", "refs/heads/dev": "bbb"} # only main changed relay._broadcast("rad:z3abc123", new, old_refs=old) assert len(sent) == 1 assert sent[0]["delta"] is True assert sent[0]["refs"] == {"refs/heads/main": "ccc"} assert "refs/heads/dev" not in sent[0]["refs"] def test_initial_refs_send_is_full_not_delta(self, tmp_path): relay = _make_relay(tmp_path) relay._known_refs["rad:z3abc123"] = SAMPLE_REFS sent = [] relay._send_packet = lambda h, p: sent.append(json.loads(p)) or True relay._send_initial_refs(b"\x02" * 16) assert len(sent) == 1 assert sent[0].get("delta") is None or sent[0].get("delta") is False assert sent[0]["refs"] == SAMPLE_REFS # ── Packet splitting (_build_ref_payloads) ─────────────────────────────────── class TestBuildRefPayloads: def test_small_refs_fit_in_one_packet(self, tmp_path): relay = _make_relay(tmp_path) refs = {"refs/heads/main": "a" * 40} payloads = relay._build_ref_payloads("rad:z3abc123", refs, is_delta=False) assert len(payloads) == 1 msg = json.loads(payloads[0]) assert msg["refs"] == refs def test_large_refs_split_into_multiple_packets(self, tmp_path): relay = _make_relay(tmp_path) # 20 refs × ~70 bytes each ≈ 1400 bytes, well over ENCRYPTED_MDU=383 many_refs = {f"refs/heads/branch-{i:03d}": "a" * 40 for i in range(20)} payloads = relay._build_ref_payloads("rad:z3abc123", many_refs, is_delta=False) assert len(payloads) > 1 # All packets must fit within MDU import RNS assert all(len(p) <= RNS.Packet.ENCRYPTED_MDU for p in payloads) # Combined refs must cover every original ref exactly once combined = {} for p in payloads: combined.update(json.loads(p)["refs"]) assert combined == many_refs def test_delta_flag_propagated_to_all_chunks(self, tmp_path): relay = _make_relay(tmp_path) many_refs = {f"refs/heads/br-{i}": "b" * 40 for i in range(20)} payloads = relay._build_ref_payloads("rad:z3abc123", many_refs, is_delta=True) assert all(json.loads(p).get("delta") is True for p in payloads) def test_each_packet_under_mdu(self, tmp_path): """Single very-long ref name must not produce an oversized packet.""" import RNS relay = _make_relay(tmp_path) # pathological: ref name itself is 200 chars refs = {"refs/heads/" + "x" * 200: "c" * 40} payloads = relay._build_ref_payloads("rad:z3abc123", refs, is_delta=False) # Even if we can't split a single ref below MDU, we still emit it assert len(payloads) >= 1 # ── Auto-seed ───────────────────────────────────────────────────────────────── class TestAutoSeed: def test_seeds_and_syncs_unknown_repo(self, tmp_path): relay = _make_relay(tmp_path, auto_seed=True) with patch("radicle_reticulum.gossip.subprocess.run") as mock_run, \ patch.object(relay, "_trigger_sync") as mock_sync: mock_run.return_value = MagicMock(returncode=0) relay._auto_seed_and_sync("rad:z3brand_new", "z6Mkpeer") calls = [c[0][0] for c in mock_run.call_args_list] assert any("seed" in c for c in calls) assert "rad:z3brand_new" in relay.rids mock_sync.assert_called_once_with("rad:z3brand_new", "z6Mkpeer") def test_no_sync_when_seed_fails(self, tmp_path): relay = _make_relay(tmp_path, auto_seed=True) with patch("radicle_reticulum.gossip.subprocess.run") as mock_run, \ patch.object(relay, "_trigger_sync") as mock_sync: mock_run.return_value = MagicMock(returncode=1, stderr="permission denied") relay._auto_seed_and_sync("rad:z3brand_new", "z6Mkpeer") mock_sync.assert_not_called() assert "rad:z3brand_new" not in relay.rids def test_does_not_duplicate_rid(self, tmp_path): relay = _make_relay(tmp_path, auto_seed=True) relay.rids.append("rad:z3already") with patch("radicle_reticulum.gossip.subprocess.run") as mock_run, \ patch.object(relay, "_trigger_sync"): mock_run.return_value = MagicMock(returncode=0) relay._auto_seed_and_sync("rad:z3already", "z6Mkpeer") assert relay.rids.count("rad:z3already") == 1 # ── Trigger sync ────────────────────────────────────────────────────────────── class TestTriggerSync: def test_calls_rad_node_connect_when_nid_given(self, tmp_path): relay = _make_relay(tmp_path, bridge_port=8777) with patch("radicle_reticulum.gossip.subprocess.run") as mock_run: mock_run.return_value = MagicMock(returncode=0) relay._trigger_sync("rad:z3abc123", "z6Mkpeer123") calls = [c[0][0] for c in mock_run.call_args_list] connect_calls = [c for c in calls if "connect" in c] assert connect_calls assert "z6Mkpeer123@127.0.0.1:8777" in connect_calls[0] def test_skips_connect_when_no_nid(self, tmp_path): relay = _make_relay(tmp_path) with patch("radicle_reticulum.gossip.subprocess.run") as mock_run: mock_run.return_value = MagicMock(returncode=0) relay._trigger_sync("rad:z3abc123", "") calls = [c[0][0] for c in mock_run.call_args_list] connect_calls = [c for c in calls if "connect" in c] assert not connect_calls def test_calls_rad_sync_fetch(self, tmp_path): relay = _make_relay(tmp_path) with patch("radicle_reticulum.gossip.subprocess.run") as mock_run: mock_run.return_value = MagicMock(returncode=0) relay._trigger_sync("rad:z3abc123", "") all_calls = [c[0][0] for c in mock_run.call_args_list] sync_calls = [c for c in all_calls if "sync" in c] assert sync_calls assert "--fetch" in sync_calls[0] assert "--rid" in sync_calls[0] def test_seed_flag_added_when_connect_succeeds(self, tmp_path): relay = _make_relay(tmp_path, bridge_port=8777) with patch("radicle_reticulum.gossip.subprocess.run") as mock_run: mock_run.return_value = MagicMock(returncode=0) relay._trigger_sync("rad:z3abc123", "z6Mkpeer") all_calls = [c[0][0] for c in mock_run.call_args_list] sync_calls = [c for c in all_calls if "sync" in c] assert "--seed" in sync_calls[0] assert any("z6Mkpeer@127.0.0.1:8777" in arg for arg in sync_calls[0]) def test_no_seed_flag_when_connect_fails(self, tmp_path): relay = _make_relay(tmp_path, bridge_port=8777) call_count = 0 def fake_run(cmd, **kwargs): nonlocal call_count call_count += 1 # First call (connect) fails; second call (sync) succeeds return MagicMock(returncode=1 if call_count == 1 else 0) with patch("radicle_reticulum.gossip.subprocess.run", side_effect=fake_run): relay._trigger_sync("rad:z3abc123", "z6Mkpeer") # Sync should not include --seed when connect failed # (we can't easily inspect the call args here, but the function should not raise) assert call_count == 2 # connect attempted + sync still ran def test_debounce_clears_event_on_early_wakeup(self, tmp_path): """Poll loop should absorb rapid events and only poll once after debounce.""" relay = _make_relay(tmp_path) relay._running = True polled = [] original_once = relay._poll_loop_once relay._poll_loop_once = lambda: polled.append(1) # Simulate: event already set when loop starts relay._poll_event.set() import threading, time t = threading.Thread(target=relay._poll_loop, daemon=True) t.start() time.sleep(0.1) relay._running = False relay._poll_event.set() t.join(timeout=3) # Should have polled at least once (startup poll) and at most twice assert 1 <= len(polled) <= 2 # ── Peer discovery ──────────────────────────────────────────────────────────── class TestPeerDiscovery: def _make_app_data(self, nid=None): import struct from radicle_reticulum.gossip import GOSSIP_MAGIC data = GOSSIP_MAGIC if nid: b = nid.encode() data += struct.pack("!H", len(b)) + b return data def test_new_peer_added(self, tmp_path): relay = _make_relay(tmp_path) peer_hash = b"\x02" * 16 relay._known_refs["rad:z3abc123"] = SAMPLE_REFS with patch.object(relay, "_send_packet"): relay._on_announce(peer_hash, MagicMock(), self._make_app_data("z6Mkpeer")) assert peer_hash in relay._known_peers def test_own_announce_ignored(self, tmp_path): relay = _make_relay(tmp_path) own_hash = relay.destination.hash with patch.object(relay, "_send_packet") as mock_send: relay._on_announce(own_hash, MagicMock(), self._make_app_data()) mock_send.assert_not_called() assert own_hash not in relay._known_peers def test_non_gossip_announce_ignored(self, tmp_path): relay = _make_relay(tmp_path) peer_hash = b"\x03" * 16 with patch.object(relay, "_send_packet") as mock_send: relay._on_announce(peer_hash, MagicMock(), b"OTHER_APP_DATA") mock_send.assert_not_called() assert peer_hash not in relay._known_peers def test_sends_current_refs_to_new_peer(self, tmp_path): relay = _make_relay(tmp_path) relay._known_refs["rad:z3abc123"] = SAMPLE_REFS peer_hash = b"\x04" * 16 with patch.object(relay, "_send_packet") as mock_send: relay._on_announce(peer_hash, MagicMock(), self._make_app_data()) mock_send.assert_called_once() assert mock_send.call_args[0][0] == peer_hash def test_duplicate_announce_not_re_sent(self, tmp_path): relay = _make_relay(tmp_path) relay._known_refs["rad:z3abc123"] = SAMPLE_REFS peer_hash = b"\x05" * 16 relay._known_peers[peer_hash] = time.time() # already known with patch.object(relay, "_send_packet") as mock_send: relay._on_announce(peer_hash, MagicMock(), self._make_app_data()) mock_send.assert_not_called() # ── push_refs_now ───────────────────────────────────────────────────────────── class TestPushRefsNow: def test_push_refs_now_broadcasts_immediately(self, tmp_path): relay = _make_relay(tmp_path) peer_hash = b"\x06" * 16 relay._known_peers[peer_hash] = time.time() with patch("radicle_reticulum.gossip._read_refs", return_value=SAMPLE_REFS), \ patch.object(relay, "_send_packet", return_value=True) as mock_send: relay.push_refs_now("rad:z3abc123") mock_send.assert_called_once() def test_push_refs_now_updates_known_refs(self, tmp_path): relay = _make_relay(tmp_path) with patch("radicle_reticulum.gossip._read_refs", return_value=SAMPLE_REFS), \ patch.object(relay, "_send_packet", return_value=True): relay.push_refs_now("rad:z3abc123") assert relay._known_refs["rad:z3abc123"] == SAMPLE_REFS # ── Watchdog / poll event ────────────────────────────────────────────────────── class TestWatchdog: def test_poll_event_wakes_poll_loop_early(self, tmp_path): relay = _make_relay(tmp_path) relay._known_refs["rad:z3abc123"] = {"refs/heads/main": "old"} new_refs = {"refs/heads/main": "new"} broadcasts = [] with patch("radicle_reticulum.gossip._read_refs", return_value=new_refs), \ patch.object(relay, "_broadcast", side_effect=lambda r, refs, **kw: broadcasts.append(r)): # Signal the event (simulates watchdog firing) relay._poll_event.set() # Run one poll iteration relay._poll_loop_once() assert broadcasts == ["rad:z3abc123"] def test_stop_sets_poll_event(self, tmp_path): relay = _make_relay(tmp_path) relay._running = True with patch("radicle_reticulum.gossip.RNS.log"): relay.stop() assert relay._poll_event.is_set() def test_start_watcher_graceful_without_watchdog(self, tmp_path): relay = _make_relay(tmp_path) with patch.dict("sys.modules", {"watchdog": None, "watchdog.observers": None, "watchdog.events": None}), \ patch("radicle_reticulum.gossip.RNS.log"): relay._start_watcher() assert relay._observer is None def test_auto_discover_adds_new_repos(self, tmp_path): relay = _make_relay(tmp_path, rids=[]) storage = tmp_path / "storage" storage.mkdir() (storage / "z3newrepo").mkdir() relay.storage = storage relay._discover_rids() assert "rad:z3newrepo" in relay.rids def test_auto_discover_does_not_add_duplicates(self, tmp_path): relay = _make_relay(tmp_path, rids=["rad:z3existing"]) storage = tmp_path / "storage" storage.mkdir() (storage / "z3existing").mkdir() relay.storage = storage relay._discover_rids() assert relay.rids.count("rad:z3existing") == 1