radicle-reticulum/tests/test_gossip.py

435 lines
17 KiB
Python

"""Tests for GossipRelay."""
import json
import subprocess
import time
from pathlib import Path
from unittest.mock import MagicMock, patch, call
import pytest
from radicle_reticulum.gossip import GossipRelay, _read_refs, _radicle_storage_path
from radicle_reticulum.identity import RadicleIdentity
# ── Helpers ──────────────────────────────────────────────────────────────────
def _make_relay(tmp_path, rids=None, **kwargs):
identity = RadicleIdentity.generate()
with patch("radicle_reticulum.gossip.RNS.Reticulum"), \
patch("radicle_reticulum.gossip.RNS.Destination") as mock_dest_cls, \
patch("radicle_reticulum.gossip.RNS.Transport"):
mock_dest = MagicMock()
mock_dest.hash = b"\x00" * 16
mock_dest.hexhash = "0" * 32
mock_dest_cls.return_value = mock_dest
relay = GossipRelay(
identity=identity,
rids=rids or ["rad:z3abc123"],
storage=tmp_path / "storage",
**kwargs,
)
relay.destination = mock_dest
return relay
SAMPLE_REFS = {
"refs/heads/main": "abc123def456" * 3,
"refs/rad/sigrefs": "def456abc123" * 3,
}
# ── _read_refs ────────────────────────────────────────────────────────────────
class TestReadRefs:
def test_missing_storage_returns_empty(self, tmp_path):
result = _read_refs(tmp_path / "storage", "rad:z3nonexistent")
assert result == {}
def test_reads_refs_from_bare_repo(self, tmp_path):
rid = "rad:z3testrepo"
repo_path = tmp_path / "z3testrepo"
repo_path.mkdir()
show_ref_output = (
"abc123def456abc123def456abc123def456abc1 refs/heads/main\n"
"def456abc123def456abc123def456abc123def4 refs/rad/sigrefs\n"
)
with patch("radicle_reticulum.gossip.subprocess.run") as mock_run:
mock_run.return_value = MagicMock(returncode=0, stdout=show_ref_output)
result = _read_refs(tmp_path, rid)
assert result["refs/heads/main"] == "abc123def456abc123def456abc123def456abc1"
assert result["refs/rad/sigrefs"] == "def456abc123def456abc123def456abc123def4"
def test_strips_rad_prefix(self, tmp_path):
# rid with "rad:" prefix should map to directory without it
repo_path = tmp_path / "z3abc"
repo_path.mkdir()
result = _read_refs(tmp_path, "rad:z3abc")
assert result == {} # empty repo, no refs — just confirming no crash
# ── GossipRelay construction ──────────────────────────────────────────────────
class TestGossipRelayInit:
def test_defaults(self, tmp_path):
relay = _make_relay(tmp_path)
assert relay.rids == ["rad:z3abc123"]
assert relay.poll_interval == 30
assert relay.bridge_port == 8777
assert relay.announce_retry_delays == (5, 15, 30)
def test_custom_params(self, tmp_path):
relay = _make_relay(
tmp_path,
rids=["rad:z3a", "rad:z3b"],
bridge_port=9000,
poll_interval=60,
)
assert len(relay.rids) == 2
assert relay.bridge_port == 9000
assert relay.poll_interval == 60
# ── Broadcast ────────────────────────────────────────────────────────────────
class TestBroadcast:
def test_broadcast_sends_to_known_peers(self, tmp_path):
relay = _make_relay(tmp_path, radicle_nid="z6Mktest")
peer_hash = b"\x01" * 16
relay._known_peers[peer_hash] = time.time()
with patch.object(relay, "_send_packet", return_value=True) as mock_send:
relay._broadcast("rad:z3abc123", SAMPLE_REFS)
mock_send.assert_called_once()
args = mock_send.call_args[0]
assert args[0] == peer_hash
msg = json.loads(args[1].decode())
assert msg["rid"] == "rad:z3abc123"
assert msg["nid"] == "z6Mktest"
assert msg["refs"] == SAMPLE_REFS
def test_broadcast_no_peers_sends_nothing(self, tmp_path):
relay = _make_relay(tmp_path)
with patch.object(relay, "_send_packet") as mock_send:
relay._broadcast("rad:z3abc123", SAMPLE_REFS)
mock_send.assert_not_called()
def test_broadcast_multiple_peers(self, tmp_path):
relay = _make_relay(tmp_path)
for i in range(3):
relay._known_peers[bytes([i]) * 16] = time.time()
with patch.object(relay, "_send_packet", return_value=True) as mock_send:
relay._broadcast("rad:z3abc123", SAMPLE_REFS)
assert mock_send.call_count == 3
# ── Poll loop ─────────────────────────────────────────────────────────────────
class TestPollLoop:
def test_broadcasts_on_ref_change(self, tmp_path):
relay = _make_relay(tmp_path)
relay._known_refs["rad:z3abc123"] = {"refs/heads/main": "old_sha"}
new_refs = {"refs/heads/main": "new_sha"}
with patch("radicle_reticulum.gossip._read_refs", return_value=new_refs), \
patch.object(relay, "_broadcast") as mock_broadcast:
# Simulate one poll iteration
for rid in relay.rids:
refs = new_refs
old = relay._known_refs.get(rid)
if refs and refs != old:
if old is not None:
relay._broadcast(rid, refs)
relay._known_refs[rid] = refs
mock_broadcast.assert_called_once_with("rad:z3abc123", new_refs)
def test_no_broadcast_on_first_poll(self, tmp_path):
relay = _make_relay(tmp_path)
# No existing refs — first poll should not broadcast
with patch("radicle_reticulum.gossip._read_refs", return_value=SAMPLE_REFS), \
patch.object(relay, "_broadcast") as mock_broadcast:
for rid in relay.rids:
refs = SAMPLE_REFS
old = relay._known_refs.get(rid)
if refs and refs != old:
if old is not None:
relay._broadcast(rid, refs)
relay._known_refs[rid] = refs
mock_broadcast.assert_not_called()
assert relay._known_refs["rad:z3abc123"] == SAMPLE_REFS
def test_no_broadcast_when_refs_unchanged(self, tmp_path):
relay = _make_relay(tmp_path)
relay._known_refs["rad:z3abc123"] = SAMPLE_REFS
with patch("radicle_reticulum.gossip._read_refs", return_value=SAMPLE_REFS), \
patch.object(relay, "_broadcast") as mock_broadcast:
for rid in relay.rids:
refs = SAMPLE_REFS
old = relay._known_refs.get(rid)
if refs and refs != old:
if old is not None:
relay._broadcast(rid, refs)
mock_broadcast.assert_not_called()
# ── Incoming packets ──────────────────────────────────────────────────────────
class TestOnPacket:
def _make_packet(self, rid, nid, refs):
return json.dumps({"type": "refs", "rid": rid, "nid": nid, "refs": refs}).encode()
def test_triggers_sync_on_new_refs(self, tmp_path):
relay = _make_relay(tmp_path)
relay._known_refs["rad:z3abc123"] = {"refs/heads/main": "old"}
mock_packet = MagicMock()
with patch.object(relay, "_trigger_sync") as mock_sync:
relay._on_packet(
self._make_packet("rad:z3abc123", "z6Mkpeer", {"refs/heads/main": "new"}),
mock_packet,
)
mock_sync.assert_called_once_with("rad:z3abc123", "z6Mkpeer")
def test_no_sync_when_refs_match(self, tmp_path):
relay = _make_relay(tmp_path)
relay._known_refs["rad:z3abc123"] = SAMPLE_REFS
mock_packet = MagicMock()
with patch.object(relay, "_trigger_sync") as mock_sync:
relay._on_packet(
self._make_packet("rad:z3abc123", "z6Mkpeer", SAMPLE_REFS),
mock_packet,
)
mock_sync.assert_not_called()
def test_ignores_invalid_json(self, tmp_path):
relay = _make_relay(tmp_path)
mock_packet = MagicMock()
# Should not raise
relay._on_packet(b"not json at all", mock_packet)
def test_ignores_wrong_type(self, tmp_path):
relay = _make_relay(tmp_path)
mock_packet = MagicMock()
with patch.object(relay, "_trigger_sync") as mock_sync:
relay._on_packet(
json.dumps({"type": "ping", "rid": "rad:z3abc123"}).encode(),
mock_packet,
)
mock_sync.assert_not_called()
def test_triggers_sync_for_unknown_repo(self, tmp_path):
relay = _make_relay(tmp_path)
# No prior known refs — any incoming refs are "new"
mock_packet = MagicMock()
with patch.object(relay, "_trigger_sync") as mock_sync:
relay._on_packet(
self._make_packet("rad:z3unknown", "z6Mkpeer", SAMPLE_REFS),
mock_packet,
)
mock_sync.assert_called_once()
def test_sync_callback_invoked(self, tmp_path):
relay = _make_relay(tmp_path)
received = []
relay.set_on_sync_triggered(lambda rid, nid: received.append((rid, nid)))
with patch("radicle_reticulum.gossip.subprocess.run") as mock_run:
mock_run.return_value = MagicMock(returncode=0)
relay._trigger_sync("rad:z3abc123", "z6Mkpeer")
assert received == [("rad:z3abc123", "z6Mkpeer")]
# ── Trigger sync ──────────────────────────────────────────────────────────────
class TestTriggerSync:
def test_calls_rad_node_connect_when_nid_given(self, tmp_path):
relay = _make_relay(tmp_path, bridge_port=8777)
with patch("radicle_reticulum.gossip.subprocess.run") as mock_run:
mock_run.return_value = MagicMock(returncode=0)
relay._trigger_sync("rad:z3abc123", "z6Mkpeer123")
calls = [c[0][0] for c in mock_run.call_args_list]
connect_calls = [c for c in calls if "connect" in c]
assert connect_calls
assert "z6Mkpeer123@127.0.0.1:8777" in connect_calls[0]
def test_skips_connect_when_no_nid(self, tmp_path):
relay = _make_relay(tmp_path)
with patch("radicle_reticulum.gossip.subprocess.run") as mock_run:
mock_run.return_value = MagicMock(returncode=0)
relay._trigger_sync("rad:z3abc123", "")
calls = [c[0][0] for c in mock_run.call_args_list]
connect_calls = [c for c in calls if "connect" in c]
assert not connect_calls
def test_calls_rad_sync_fetch(self, tmp_path):
relay = _make_relay(tmp_path)
with patch("radicle_reticulum.gossip.subprocess.run") as mock_run:
mock_run.return_value = MagicMock(returncode=0)
relay._trigger_sync("rad:z3abc123", "")
all_calls = [c[0][0] for c in mock_run.call_args_list]
sync_calls = [c for c in all_calls if "sync" in c]
assert sync_calls
assert "--fetch" in sync_calls[0]
assert "--rid" in sync_calls[0]
# ── Peer discovery ────────────────────────────────────────────────────────────
class TestPeerDiscovery:
def _make_app_data(self, nid=None):
import struct
from radicle_reticulum.gossip import GOSSIP_MAGIC
data = GOSSIP_MAGIC
if nid:
b = nid.encode()
data += struct.pack("!H", len(b)) + b
return data
def test_new_peer_added(self, tmp_path):
relay = _make_relay(tmp_path)
peer_hash = b"\x02" * 16
relay._known_refs["rad:z3abc123"] = SAMPLE_REFS
with patch.object(relay, "_send_packet"):
relay._on_announce(peer_hash, MagicMock(), self._make_app_data("z6Mkpeer"))
assert peer_hash in relay._known_peers
def test_own_announce_ignored(self, tmp_path):
relay = _make_relay(tmp_path)
own_hash = relay.destination.hash
with patch.object(relay, "_send_packet") as mock_send:
relay._on_announce(own_hash, MagicMock(), self._make_app_data())
mock_send.assert_not_called()
assert own_hash not in relay._known_peers
def test_non_gossip_announce_ignored(self, tmp_path):
relay = _make_relay(tmp_path)
peer_hash = b"\x03" * 16
with patch.object(relay, "_send_packet") as mock_send:
relay._on_announce(peer_hash, MagicMock(), b"OTHER_APP_DATA")
mock_send.assert_not_called()
assert peer_hash not in relay._known_peers
def test_sends_current_refs_to_new_peer(self, tmp_path):
relay = _make_relay(tmp_path)
relay._known_refs["rad:z3abc123"] = SAMPLE_REFS
peer_hash = b"\x04" * 16
with patch.object(relay, "_send_packet") as mock_send:
relay._on_announce(peer_hash, MagicMock(), self._make_app_data())
mock_send.assert_called_once()
assert mock_send.call_args[0][0] == peer_hash
def test_duplicate_announce_not_re_sent(self, tmp_path):
relay = _make_relay(tmp_path)
relay._known_refs["rad:z3abc123"] = SAMPLE_REFS
peer_hash = b"\x05" * 16
relay._known_peers[peer_hash] = time.time() # already known
with patch.object(relay, "_send_packet") as mock_send:
relay._on_announce(peer_hash, MagicMock(), self._make_app_data())
mock_send.assert_not_called()
# ── push_refs_now ─────────────────────────────────────────────────────────────
class TestPushRefsNow:
def test_push_refs_now_broadcasts_immediately(self, tmp_path):
relay = _make_relay(tmp_path)
peer_hash = b"\x06" * 16
relay._known_peers[peer_hash] = time.time()
with patch("radicle_reticulum.gossip._read_refs", return_value=SAMPLE_REFS), \
patch.object(relay, "_send_packet", return_value=True) as mock_send:
relay.push_refs_now("rad:z3abc123")
mock_send.assert_called_once()
def test_push_refs_now_updates_known_refs(self, tmp_path):
relay = _make_relay(tmp_path)
with patch("radicle_reticulum.gossip._read_refs", return_value=SAMPLE_REFS), \
patch.object(relay, "_send_packet", return_value=True):
relay.push_refs_now("rad:z3abc123")
assert relay._known_refs["rad:z3abc123"] == SAMPLE_REFS
# ── Watchdog / poll event ──────────────────────────────────────────────────────
class TestWatchdog:
def test_poll_event_wakes_poll_loop_early(self, tmp_path):
relay = _make_relay(tmp_path)
relay._known_refs["rad:z3abc123"] = {"refs/heads/main": "old"}
new_refs = {"refs/heads/main": "new"}
broadcasts = []
with patch("radicle_reticulum.gossip._read_refs", return_value=new_refs), \
patch.object(relay, "_broadcast", side_effect=lambda r, refs: broadcasts.append(r)):
# Signal the event (simulates watchdog firing)
relay._poll_event.set()
# Run one poll iteration
relay._poll_loop_once()
assert broadcasts == ["rad:z3abc123"]
def test_stop_sets_poll_event(self, tmp_path):
relay = _make_relay(tmp_path)
relay._running = True
with patch("radicle_reticulum.gossip.RNS.log"):
relay.stop()
assert relay._poll_event.is_set()
def test_start_watcher_graceful_without_watchdog(self, tmp_path):
relay = _make_relay(tmp_path)
with patch.dict("sys.modules", {"watchdog": None, "watchdog.observers": None,
"watchdog.events": None}), \
patch("radicle_reticulum.gossip.RNS.log"):
relay._start_watcher()
assert relay._observer is None
def test_auto_discover_adds_new_repos(self, tmp_path):
relay = _make_relay(tmp_path, rids=[])
storage = tmp_path / "storage"
storage.mkdir()
(storage / "z3newrepo").mkdir()
relay.storage = storage
relay._discover_rids()
assert "rad:z3newrepo" in relay.rids
def test_auto_discover_does_not_add_duplicates(self, tmp_path):
relay = _make_relay(tmp_path, rids=["rad:z3existing"])
storage = tmp_path / "storage"
storage.mkdir()
(storage / "z3existing").mkdir()
relay.storage = storage
relay._discover_rids()
assert relay.rids.count("rad:z3existing") == 1