diff --git a/src/radicle_reticulum/cli.py b/src/radicle_reticulum/cli.py index 5da8def..3e1ff31 100644 --- a/src/radicle_reticulum/cli.py +++ b/src/radicle_reticulum/cli.py @@ -403,6 +403,7 @@ def cmd_seed(args): poll_interval=args.poll_interval, announce_retry_delays=announce_retry_delays, auto_discover=True, + auto_seed=True, rad_home=str(seed_home), ) diff --git a/src/radicle_reticulum/gossip.py b/src/radicle_reticulum/gossip.py index c52594e..b34d5b2 100644 --- a/src/radicle_reticulum/gossip.py +++ b/src/radicle_reticulum/gossip.py @@ -94,6 +94,7 @@ class GossipRelay: announce_retry_delays: Tuple[int, ...] = (5, 15, 30), config_path: Optional[str] = None, auto_discover: bool = False, + auto_seed: bool = False, rad_home: Optional[str] = None, ): """ @@ -110,6 +111,10 @@ class GossipRelay: config_path: Reticulum config path (None = default). auto_discover: Scan storage each poll cycle and add new repo dirs to rids automatically. Useful in seed mode. + auto_seed: When refs arrive for an unknown repo, automatically call + 'rad seed ' so this node starts tracking it, then sync. + Combines with auto_discover: once seeded, the repo is picked up + on the next poll cycle. rad_home: RAD_HOME override for rad CLI calls. None = system default. """ self.identity = identity @@ -120,6 +125,7 @@ class GossipRelay: self.poll_interval = poll_interval self.announce_retry_delays = announce_retry_delays self.auto_discover = auto_discover + self.auto_seed = auto_seed self.rad_home = rad_home existing = RNS.Reticulum.get_instance() @@ -273,7 +279,7 @@ class GossipRelay: if changed: self._known_refs[rid] = refs if changed and not first_poll: - self._broadcast(rid, refs) + self._broadcast(rid, refs, old_refs=old) except Exception as e: RNS.log(f"Gossip poll error ({rid[:20]}): {e}", RNS.LOG_WARNING) @@ -307,19 +313,32 @@ class GossipRelay: }).encode() self._send_packet(destination_hash, payload) - def _broadcast(self, rid: str, refs: Dict[str, str]): - payload = json.dumps({ - "type": "refs", - "rid": rid, - "nid": self.radicle_nid or "", - "refs": refs, - }).encode() + def _broadcast( + self, + rid: str, + refs: Dict[str, str], + old_refs: Optional[Dict[str, str]] = None, + ): + if old_refs is not None: + # Delta mode: only send refs that changed, saving bandwidth on LoRa + to_send = {k: v for k, v in refs.items() if v != old_refs.get(k)} + else: + to_send = refs + + msg: Dict = {"type": "refs", "rid": rid, "nid": self.radicle_nid or "", "refs": to_send} + if old_refs is not None: + msg["delta"] = True + payload = json.dumps(msg).encode() with self._peers_lock: peers = list(self._known_peers.keys()) sent = sum(1 for h in peers if self._send_packet(h, payload)) - RNS.log(f"Broadcast refs for {rid[:20]}... → {sent}/{len(peers)} peers", RNS.LOG_INFO) + RNS.log( + f"Broadcast refs for {rid[:20]}... → {sent}/{len(peers)} peers " + f"({'delta' if old_refs is not None else 'full'}, {len(payload)}B)", + RNS.LOG_INFO, + ) def _send_packet(self, peer_hash: bytes, payload: bytes) -> bool: try: @@ -369,6 +388,7 @@ class GossipRelay: rid: str = msg.get("rid", "") nid: str = msg.get("nid", "") + is_delta: bool = msg.get("delta", False) remote_refs: Dict[str, str] = msg.get("refs", {}) if not rid or not remote_refs: @@ -376,18 +396,29 @@ class GossipRelay: with self._refs_lock: local_refs = self._known_refs.get(rid, {}) - changed = any(remote_refs.get(r) != local_refs.get(r) for r in remote_refs) + + # Delta packets carry only changed refs — merge onto local known state + effective_refs = {**local_refs, **remote_refs} if is_delta else remote_refs + changed = any(effective_refs.get(r) != local_refs.get(r) for r in effective_refs) if changed: RNS.log( f"Gossip: new refs for {rid[:20]}... from {nid[:24] if nid else 'unknown'}", RNS.LOG_INFO, ) - threading.Thread( - target=self._trigger_sync, - args=(rid, nid), - daemon=True, - ).start() + is_tracked = rid in self.rids + if self.auto_seed and not is_tracked: + threading.Thread( + target=self._auto_seed_and_sync, + args=(rid, nid), + daemon=True, + ).start() + else: + threading.Thread( + target=self._trigger_sync, + args=(rid, nid), + daemon=True, + ).start() def _trigger_sync(self, rid: str, nid: str): """Run rad sync --fetch targeting the specific NID that sent the gossip. @@ -441,6 +472,29 @@ class GossipRelay: if self._on_sync_triggered: self._on_sync_triggered(rid, nid) + def _auto_seed_and_sync(self, rid: str, nid: str): + """Call 'rad seed ' then sync — triggered for repos not yet tracked.""" + env = None + if self.rad_home: + env = os.environ.copy() + env["RAD_HOME"] = self.rad_home + + result = subprocess.run( + ["rad", "seed", rid], + capture_output=True, text=True, timeout=30, env=env, + ) + if result.returncode == 0: + RNS.log(f"Auto-seeded new repo: {rid[:20]}", RNS.LOG_INFO) + if rid not in self.rids: + self.rids.append(rid) + self._trigger_sync(rid, nid) + else: + stderr = result.stderr.strip() + RNS.log( + f"rad seed {rid[:20]} failed: {stderr[:80] if stderr else '(no output)'}", + RNS.LOG_WARNING, + ) + # ── Internal: peer discovery ───────────────────────────────────────────── def _on_announce( diff --git a/tests/test_gossip.py b/tests/test_gossip.py index af10c96..0780932 100644 --- a/tests/test_gossip.py +++ b/tests/test_gossip.py @@ -252,6 +252,153 @@ class TestOnPacket: assert received == [("rad:z3abc123", "z6Mkpeer")] + def test_delta_packet_merges_with_local_refs(self, tmp_path): + """A delta packet updates only the changed refs, keeping others intact.""" + relay = _make_relay(tmp_path) + relay._known_refs["rad:z3abc123"] = { + "refs/heads/main": "aaa", + "refs/heads/dev": "bbb", + } + delta_packet = json.dumps({ + "type": "refs", + "rid": "rad:z3abc123", + "nid": "z6Mkpeer", + "delta": True, + "refs": {"refs/heads/main": "ccc"}, # only main changed + }).encode() + + with patch.object(relay, "_trigger_sync") as mock_sync: + relay._on_packet(delta_packet, MagicMock()) + + mock_sync.assert_called_once() + + def test_delta_packet_no_sync_when_already_known(self, tmp_path): + """Delta packet with refs we already know should not trigger sync.""" + relay = _make_relay(tmp_path) + relay._known_refs["rad:z3abc123"] = {"refs/heads/main": "aaa"} + delta_packet = json.dumps({ + "type": "refs", + "rid": "rad:z3abc123", + "nid": "z6Mkpeer", + "delta": True, + "refs": {"refs/heads/main": "aaa"}, # same value + }).encode() + + with patch.object(relay, "_trigger_sync") as mock_sync: + relay._on_packet(delta_packet, MagicMock()) + + mock_sync.assert_not_called() + + def test_auto_seed_triggered_for_unknown_repo(self, tmp_path): + """When auto_seed=True and RID is unknown, _auto_seed_and_sync is called.""" + relay = _make_relay(tmp_path, auto_seed=True) + packet = self._make_packet("rad:z3brand_new", "z6Mkpeer", SAMPLE_REFS) + + with patch.object(relay, "_auto_seed_and_sync") as mock_auto, \ + patch.object(relay, "_trigger_sync") as mock_sync: + relay._on_packet(packet, MagicMock()) + + mock_auto.assert_called_once_with("rad:z3brand_new", "z6Mkpeer") + mock_sync.assert_not_called() + + def test_auto_seed_not_triggered_for_tracked_repo(self, tmp_path): + """auto_seed=True must not re-seed repos already in self.rids.""" + relay = _make_relay(tmp_path, auto_seed=True) + # rad:z3abc123 is already in rids (default from _make_relay) + relay._known_refs["rad:z3abc123"] = {"refs/heads/main": "old"} + + with patch.object(relay, "_auto_seed_and_sync") as mock_auto, \ + patch.object(relay, "_trigger_sync") as mock_sync: + relay._on_packet( + self._make_packet("rad:z3abc123", "z6Mkpeer", {"refs/heads/main": "new"}), + MagicMock(), + ) + + mock_auto.assert_not_called() + mock_sync.assert_called_once() + + +# ── Broadcast delta ─────────────────────────────────────────────────────────── + +class TestBroadcastDelta: + def test_full_broadcast_when_no_old_refs(self, tmp_path): + relay = _make_relay(tmp_path) + relay._known_peers[b"\x01" * 16] = 0.0 + sent = [] + relay._send_packet = lambda h, p: sent.append(json.loads(p)) or True + + relay._broadcast("rad:z3abc123", SAMPLE_REFS) + + assert len(sent) == 1 + assert sent[0].get("delta") is None or sent[0].get("delta") is False + assert sent[0]["refs"] == SAMPLE_REFS + + def test_delta_broadcast_only_sends_changed_refs(self, tmp_path): + relay = _make_relay(tmp_path) + relay._known_peers[b"\x01" * 16] = 0.0 + sent = [] + relay._send_packet = lambda h, p: sent.append(json.loads(p)) or True + + old = {"refs/heads/main": "aaa", "refs/heads/dev": "bbb"} + new = {"refs/heads/main": "ccc", "refs/heads/dev": "bbb"} # only main changed + relay._broadcast("rad:z3abc123", new, old_refs=old) + + assert len(sent) == 1 + assert sent[0]["delta"] is True + assert sent[0]["refs"] == {"refs/heads/main": "ccc"} + assert "refs/heads/dev" not in sent[0]["refs"] + + def test_initial_refs_send_is_full_not_delta(self, tmp_path): + relay = _make_relay(tmp_path) + relay._known_refs["rad:z3abc123"] = SAMPLE_REFS + sent = [] + relay._send_packet = lambda h, p: sent.append(json.loads(p)) or True + + relay._send_initial_refs(b"\x02" * 16) + + assert len(sent) == 1 + assert sent[0].get("delta") is None or sent[0].get("delta") is False + assert sent[0]["refs"] == SAMPLE_REFS + + +# ── Auto-seed ───────────────────────────────────────────────────────────────── + +class TestAutoSeed: + def test_seeds_and_syncs_unknown_repo(self, tmp_path): + relay = _make_relay(tmp_path, auto_seed=True) + + with patch("radicle_reticulum.gossip.subprocess.run") as mock_run, \ + patch.object(relay, "_trigger_sync") as mock_sync: + mock_run.return_value = MagicMock(returncode=0) + relay._auto_seed_and_sync("rad:z3brand_new", "z6Mkpeer") + + calls = [c[0][0] for c in mock_run.call_args_list] + assert any("seed" in c for c in calls) + assert "rad:z3brand_new" in relay.rids + mock_sync.assert_called_once_with("rad:z3brand_new", "z6Mkpeer") + + def test_no_sync_when_seed_fails(self, tmp_path): + relay = _make_relay(tmp_path, auto_seed=True) + + with patch("radicle_reticulum.gossip.subprocess.run") as mock_run, \ + patch.object(relay, "_trigger_sync") as mock_sync: + mock_run.return_value = MagicMock(returncode=1, stderr="permission denied") + relay._auto_seed_and_sync("rad:z3brand_new", "z6Mkpeer") + + mock_sync.assert_not_called() + assert "rad:z3brand_new" not in relay.rids + + def test_does_not_duplicate_rid(self, tmp_path): + relay = _make_relay(tmp_path, auto_seed=True) + relay.rids.append("rad:z3already") + + with patch("radicle_reticulum.gossip.subprocess.run") as mock_run, \ + patch.object(relay, "_trigger_sync"): + mock_run.return_value = MagicMock(returncode=0) + relay._auto_seed_and_sync("rad:z3already", "z6Mkpeer") + + assert relay.rids.count("rad:z3already") == 1 + # ── Trigger sync ────────────────────────────────────────────────────────────── @@ -439,7 +586,7 @@ class TestWatchdog: broadcasts = [] with patch("radicle_reticulum.gossip._read_refs", return_value=new_refs), \ - patch.object(relay, "_broadcast", side_effect=lambda r, refs: broadcasts.append(r)): + patch.object(relay, "_broadcast", side_effect=lambda r, refs, **kw: broadcasts.append(r)): # Signal the event (simulates watchdog firing) relay._poll_event.set() # Run one poll iteration