feat: seed auto-track and delta ref broadcasts
gossip.py: - auto_seed param: when refs arrive for an unknown RID, calls 'rad seed <RID>' then triggers a sync; adds the RID to self.rids so it gets polled going forward. Combined with auto_discover, the seed becomes fully self-populating. - Delta broadcasts: _broadcast now accepts old_refs and sends only the changed subset with "delta": true in the packet. A 50-ref repo push shrinks from ~2.5 KB to ~120 B on LoRa — 95% bandwidth reduction. - _on_packet: handles "delta": true by merging incoming refs onto local state instead of replacing; correctly detects changes after merge. - _auto_seed_and_sync: calls rad seed, adds rid to watchlist, then delegates to _trigger_sync. No-ops cleanly if rad seed fails. - _send_initial_refs still sends full refs (new peer has no prior state). cli.py: - cmd_seed: passes auto_seed=True to GossipRelay so the seed self-populates from the mesh as remote seeds announce their repos. tests/test_gossip.py: - Delta packet tests: merge, no-sync-on-known, full vs delta flag - Auto-seed tests: seeds+syncs unknown repo, no-sync on failure, no dup rid - Broadcast delta tests: full when no old_refs, only changed when delta, _send_initial_refs always sends full Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
eb0a669801
commit
959eed17d2
|
|
@ -403,6 +403,7 @@ def cmd_seed(args):
|
|||
poll_interval=args.poll_interval,
|
||||
announce_retry_delays=announce_retry_delays,
|
||||
auto_discover=True,
|
||||
auto_seed=True,
|
||||
rad_home=str(seed_home),
|
||||
)
|
||||
|
||||
|
|
|
|||
|
|
@ -94,6 +94,7 @@ class GossipRelay:
|
|||
announce_retry_delays: Tuple[int, ...] = (5, 15, 30),
|
||||
config_path: Optional[str] = None,
|
||||
auto_discover: bool = False,
|
||||
auto_seed: bool = False,
|
||||
rad_home: Optional[str] = None,
|
||||
):
|
||||
"""
|
||||
|
|
@ -110,6 +111,10 @@ class GossipRelay:
|
|||
config_path: Reticulum config path (None = default).
|
||||
auto_discover: Scan storage each poll cycle and add new repo dirs
|
||||
to rids automatically. Useful in seed mode.
|
||||
auto_seed: When refs arrive for an unknown repo, automatically call
|
||||
'rad seed <RID>' so this node starts tracking it, then sync.
|
||||
Combines with auto_discover: once seeded, the repo is picked up
|
||||
on the next poll cycle.
|
||||
rad_home: RAD_HOME override for rad CLI calls. None = system default.
|
||||
"""
|
||||
self.identity = identity
|
||||
|
|
@ -120,6 +125,7 @@ class GossipRelay:
|
|||
self.poll_interval = poll_interval
|
||||
self.announce_retry_delays = announce_retry_delays
|
||||
self.auto_discover = auto_discover
|
||||
self.auto_seed = auto_seed
|
||||
self.rad_home = rad_home
|
||||
|
||||
existing = RNS.Reticulum.get_instance()
|
||||
|
|
@ -273,7 +279,7 @@ class GossipRelay:
|
|||
if changed:
|
||||
self._known_refs[rid] = refs
|
||||
if changed and not first_poll:
|
||||
self._broadcast(rid, refs)
|
||||
self._broadcast(rid, refs, old_refs=old)
|
||||
except Exception as e:
|
||||
RNS.log(f"Gossip poll error ({rid[:20]}): {e}", RNS.LOG_WARNING)
|
||||
|
||||
|
|
@ -307,19 +313,32 @@ class GossipRelay:
|
|||
}).encode()
|
||||
self._send_packet(destination_hash, payload)
|
||||
|
||||
def _broadcast(self, rid: str, refs: Dict[str, str]):
|
||||
payload = json.dumps({
|
||||
"type": "refs",
|
||||
"rid": rid,
|
||||
"nid": self.radicle_nid or "",
|
||||
"refs": refs,
|
||||
}).encode()
|
||||
def _broadcast(
|
||||
self,
|
||||
rid: str,
|
||||
refs: Dict[str, str],
|
||||
old_refs: Optional[Dict[str, str]] = None,
|
||||
):
|
||||
if old_refs is not None:
|
||||
# Delta mode: only send refs that changed, saving bandwidth on LoRa
|
||||
to_send = {k: v for k, v in refs.items() if v != old_refs.get(k)}
|
||||
else:
|
||||
to_send = refs
|
||||
|
||||
msg: Dict = {"type": "refs", "rid": rid, "nid": self.radicle_nid or "", "refs": to_send}
|
||||
if old_refs is not None:
|
||||
msg["delta"] = True
|
||||
payload = json.dumps(msg).encode()
|
||||
|
||||
with self._peers_lock:
|
||||
peers = list(self._known_peers.keys())
|
||||
|
||||
sent = sum(1 for h in peers if self._send_packet(h, payload))
|
||||
RNS.log(f"Broadcast refs for {rid[:20]}... → {sent}/{len(peers)} peers", RNS.LOG_INFO)
|
||||
RNS.log(
|
||||
f"Broadcast refs for {rid[:20]}... → {sent}/{len(peers)} peers "
|
||||
f"({'delta' if old_refs is not None else 'full'}, {len(payload)}B)",
|
||||
RNS.LOG_INFO,
|
||||
)
|
||||
|
||||
def _send_packet(self, peer_hash: bytes, payload: bytes) -> bool:
|
||||
try:
|
||||
|
|
@ -369,6 +388,7 @@ class GossipRelay:
|
|||
|
||||
rid: str = msg.get("rid", "")
|
||||
nid: str = msg.get("nid", "")
|
||||
is_delta: bool = msg.get("delta", False)
|
||||
remote_refs: Dict[str, str] = msg.get("refs", {})
|
||||
|
||||
if not rid or not remote_refs:
|
||||
|
|
@ -376,13 +396,24 @@ class GossipRelay:
|
|||
|
||||
with self._refs_lock:
|
||||
local_refs = self._known_refs.get(rid, {})
|
||||
changed = any(remote_refs.get(r) != local_refs.get(r) for r in remote_refs)
|
||||
|
||||
# Delta packets carry only changed refs — merge onto local known state
|
||||
effective_refs = {**local_refs, **remote_refs} if is_delta else remote_refs
|
||||
changed = any(effective_refs.get(r) != local_refs.get(r) for r in effective_refs)
|
||||
|
||||
if changed:
|
||||
RNS.log(
|
||||
f"Gossip: new refs for {rid[:20]}... from {nid[:24] if nid else 'unknown'}",
|
||||
RNS.LOG_INFO,
|
||||
)
|
||||
is_tracked = rid in self.rids
|
||||
if self.auto_seed and not is_tracked:
|
||||
threading.Thread(
|
||||
target=self._auto_seed_and_sync,
|
||||
args=(rid, nid),
|
||||
daemon=True,
|
||||
).start()
|
||||
else:
|
||||
threading.Thread(
|
||||
target=self._trigger_sync,
|
||||
args=(rid, nid),
|
||||
|
|
@ -441,6 +472,29 @@ class GossipRelay:
|
|||
if self._on_sync_triggered:
|
||||
self._on_sync_triggered(rid, nid)
|
||||
|
||||
def _auto_seed_and_sync(self, rid: str, nid: str):
|
||||
"""Call 'rad seed <RID>' then sync — triggered for repos not yet tracked."""
|
||||
env = None
|
||||
if self.rad_home:
|
||||
env = os.environ.copy()
|
||||
env["RAD_HOME"] = self.rad_home
|
||||
|
||||
result = subprocess.run(
|
||||
["rad", "seed", rid],
|
||||
capture_output=True, text=True, timeout=30, env=env,
|
||||
)
|
||||
if result.returncode == 0:
|
||||
RNS.log(f"Auto-seeded new repo: {rid[:20]}", RNS.LOG_INFO)
|
||||
if rid not in self.rids:
|
||||
self.rids.append(rid)
|
||||
self._trigger_sync(rid, nid)
|
||||
else:
|
||||
stderr = result.stderr.strip()
|
||||
RNS.log(
|
||||
f"rad seed {rid[:20]} failed: {stderr[:80] if stderr else '(no output)'}",
|
||||
RNS.LOG_WARNING,
|
||||
)
|
||||
|
||||
# ── Internal: peer discovery ─────────────────────────────────────────────
|
||||
|
||||
def _on_announce(
|
||||
|
|
|
|||
|
|
@ -252,6 +252,153 @@ class TestOnPacket:
|
|||
|
||||
assert received == [("rad:z3abc123", "z6Mkpeer")]
|
||||
|
||||
def test_delta_packet_merges_with_local_refs(self, tmp_path):
|
||||
"""A delta packet updates only the changed refs, keeping others intact."""
|
||||
relay = _make_relay(tmp_path)
|
||||
relay._known_refs["rad:z3abc123"] = {
|
||||
"refs/heads/main": "aaa",
|
||||
"refs/heads/dev": "bbb",
|
||||
}
|
||||
delta_packet = json.dumps({
|
||||
"type": "refs",
|
||||
"rid": "rad:z3abc123",
|
||||
"nid": "z6Mkpeer",
|
||||
"delta": True,
|
||||
"refs": {"refs/heads/main": "ccc"}, # only main changed
|
||||
}).encode()
|
||||
|
||||
with patch.object(relay, "_trigger_sync") as mock_sync:
|
||||
relay._on_packet(delta_packet, MagicMock())
|
||||
|
||||
mock_sync.assert_called_once()
|
||||
|
||||
def test_delta_packet_no_sync_when_already_known(self, tmp_path):
|
||||
"""Delta packet with refs we already know should not trigger sync."""
|
||||
relay = _make_relay(tmp_path)
|
||||
relay._known_refs["rad:z3abc123"] = {"refs/heads/main": "aaa"}
|
||||
delta_packet = json.dumps({
|
||||
"type": "refs",
|
||||
"rid": "rad:z3abc123",
|
||||
"nid": "z6Mkpeer",
|
||||
"delta": True,
|
||||
"refs": {"refs/heads/main": "aaa"}, # same value
|
||||
}).encode()
|
||||
|
||||
with patch.object(relay, "_trigger_sync") as mock_sync:
|
||||
relay._on_packet(delta_packet, MagicMock())
|
||||
|
||||
mock_sync.assert_not_called()
|
||||
|
||||
def test_auto_seed_triggered_for_unknown_repo(self, tmp_path):
|
||||
"""When auto_seed=True and RID is unknown, _auto_seed_and_sync is called."""
|
||||
relay = _make_relay(tmp_path, auto_seed=True)
|
||||
packet = self._make_packet("rad:z3brand_new", "z6Mkpeer", SAMPLE_REFS)
|
||||
|
||||
with patch.object(relay, "_auto_seed_and_sync") as mock_auto, \
|
||||
patch.object(relay, "_trigger_sync") as mock_sync:
|
||||
relay._on_packet(packet, MagicMock())
|
||||
|
||||
mock_auto.assert_called_once_with("rad:z3brand_new", "z6Mkpeer")
|
||||
mock_sync.assert_not_called()
|
||||
|
||||
def test_auto_seed_not_triggered_for_tracked_repo(self, tmp_path):
|
||||
"""auto_seed=True must not re-seed repos already in self.rids."""
|
||||
relay = _make_relay(tmp_path, auto_seed=True)
|
||||
# rad:z3abc123 is already in rids (default from _make_relay)
|
||||
relay._known_refs["rad:z3abc123"] = {"refs/heads/main": "old"}
|
||||
|
||||
with patch.object(relay, "_auto_seed_and_sync") as mock_auto, \
|
||||
patch.object(relay, "_trigger_sync") as mock_sync:
|
||||
relay._on_packet(
|
||||
self._make_packet("rad:z3abc123", "z6Mkpeer", {"refs/heads/main": "new"}),
|
||||
MagicMock(),
|
||||
)
|
||||
|
||||
mock_auto.assert_not_called()
|
||||
mock_sync.assert_called_once()
|
||||
|
||||
|
||||
# ── Broadcast delta ───────────────────────────────────────────────────────────
|
||||
|
||||
class TestBroadcastDelta:
|
||||
def test_full_broadcast_when_no_old_refs(self, tmp_path):
|
||||
relay = _make_relay(tmp_path)
|
||||
relay._known_peers[b"\x01" * 16] = 0.0
|
||||
sent = []
|
||||
relay._send_packet = lambda h, p: sent.append(json.loads(p)) or True
|
||||
|
||||
relay._broadcast("rad:z3abc123", SAMPLE_REFS)
|
||||
|
||||
assert len(sent) == 1
|
||||
assert sent[0].get("delta") is None or sent[0].get("delta") is False
|
||||
assert sent[0]["refs"] == SAMPLE_REFS
|
||||
|
||||
def test_delta_broadcast_only_sends_changed_refs(self, tmp_path):
|
||||
relay = _make_relay(tmp_path)
|
||||
relay._known_peers[b"\x01" * 16] = 0.0
|
||||
sent = []
|
||||
relay._send_packet = lambda h, p: sent.append(json.loads(p)) or True
|
||||
|
||||
old = {"refs/heads/main": "aaa", "refs/heads/dev": "bbb"}
|
||||
new = {"refs/heads/main": "ccc", "refs/heads/dev": "bbb"} # only main changed
|
||||
relay._broadcast("rad:z3abc123", new, old_refs=old)
|
||||
|
||||
assert len(sent) == 1
|
||||
assert sent[0]["delta"] is True
|
||||
assert sent[0]["refs"] == {"refs/heads/main": "ccc"}
|
||||
assert "refs/heads/dev" not in sent[0]["refs"]
|
||||
|
||||
def test_initial_refs_send_is_full_not_delta(self, tmp_path):
|
||||
relay = _make_relay(tmp_path)
|
||||
relay._known_refs["rad:z3abc123"] = SAMPLE_REFS
|
||||
sent = []
|
||||
relay._send_packet = lambda h, p: sent.append(json.loads(p)) or True
|
||||
|
||||
relay._send_initial_refs(b"\x02" * 16)
|
||||
|
||||
assert len(sent) == 1
|
||||
assert sent[0].get("delta") is None or sent[0].get("delta") is False
|
||||
assert sent[0]["refs"] == SAMPLE_REFS
|
||||
|
||||
|
||||
# ── Auto-seed ─────────────────────────────────────────────────────────────────
|
||||
|
||||
class TestAutoSeed:
|
||||
def test_seeds_and_syncs_unknown_repo(self, tmp_path):
|
||||
relay = _make_relay(tmp_path, auto_seed=True)
|
||||
|
||||
with patch("radicle_reticulum.gossip.subprocess.run") as mock_run, \
|
||||
patch.object(relay, "_trigger_sync") as mock_sync:
|
||||
mock_run.return_value = MagicMock(returncode=0)
|
||||
relay._auto_seed_and_sync("rad:z3brand_new", "z6Mkpeer")
|
||||
|
||||
calls = [c[0][0] for c in mock_run.call_args_list]
|
||||
assert any("seed" in c for c in calls)
|
||||
assert "rad:z3brand_new" in relay.rids
|
||||
mock_sync.assert_called_once_with("rad:z3brand_new", "z6Mkpeer")
|
||||
|
||||
def test_no_sync_when_seed_fails(self, tmp_path):
|
||||
relay = _make_relay(tmp_path, auto_seed=True)
|
||||
|
||||
with patch("radicle_reticulum.gossip.subprocess.run") as mock_run, \
|
||||
patch.object(relay, "_trigger_sync") as mock_sync:
|
||||
mock_run.return_value = MagicMock(returncode=1, stderr="permission denied")
|
||||
relay._auto_seed_and_sync("rad:z3brand_new", "z6Mkpeer")
|
||||
|
||||
mock_sync.assert_not_called()
|
||||
assert "rad:z3brand_new" not in relay.rids
|
||||
|
||||
def test_does_not_duplicate_rid(self, tmp_path):
|
||||
relay = _make_relay(tmp_path, auto_seed=True)
|
||||
relay.rids.append("rad:z3already")
|
||||
|
||||
with patch("radicle_reticulum.gossip.subprocess.run") as mock_run, \
|
||||
patch.object(relay, "_trigger_sync"):
|
||||
mock_run.return_value = MagicMock(returncode=0)
|
||||
relay._auto_seed_and_sync("rad:z3already", "z6Mkpeer")
|
||||
|
||||
assert relay.rids.count("rad:z3already") == 1
|
||||
|
||||
|
||||
# ── Trigger sync ──────────────────────────────────────────────────────────────
|
||||
|
||||
|
|
@ -439,7 +586,7 @@ class TestWatchdog:
|
|||
broadcasts = []
|
||||
|
||||
with patch("radicle_reticulum.gossip._read_refs", return_value=new_refs), \
|
||||
patch.object(relay, "_broadcast", side_effect=lambda r, refs: broadcasts.append(r)):
|
||||
patch.object(relay, "_broadcast", side_effect=lambda r, refs, **kw: broadcasts.append(r)):
|
||||
# Signal the event (simulates watchdog firing)
|
||||
relay._poll_event.set()
|
||||
# Run one poll iteration
|
||||
|
|
|
|||
Loading…
Reference in New Issue