# Source: radicle-reticulum/src/radicle_reticulum/gossip.py (Python; listed as 444 lines, 16 KiB)

"""Gossip relay for Radicle over Reticulum.

Watches local Radicle storage for ref changes and sends tiny notification
packets to peer relays over RNS (including LoRa). On receipt, the peer runs
'rad sync --fetch' so radicle-node pulls the actual git data through the
TCP bridge.

Flow:
    refs change in ~/.radicle/storage/<rid>/
      → detected by polling (or by a watchdog filesystem event, if installed)
      → RNS packet "refs changed for rid X, nid Y" (~200-500 bytes)
      → peer relay receives it
      → peer calls: rad sync --fetch --rid X
      → radicle-node fetches via TCP bridge
"""
import json
import os
import struct
import subprocess
import threading
import time
from pathlib import Path
from typing import Callable, Dict, List, Optional, Tuple
import RNS
from radicle_reticulum.identity import RadicleIdentity
# RNS application name and aspect used for every gossip destination
# (both the local IN destination and the OUT destinations built per peer).
APP_NAME = "radicle"
ASPECT_GOSSIP = "gossip"
# Prefix of the announce app_data; announces without it are ignored.
# An optional length-prefixed Radicle NID may follow it.
GOSSIP_MAGIC = b"RADICLE_GOSSIP_V1"
# Default interval between ref polls.
DEFAULT_POLL_INTERVAL = 30 # seconds
PATH_REQUEST_TIMEOUT = 15 # seconds to wait for a path before giving up
def _radicle_storage_path() -> Path:
    """Return the Radicle storage directory.

    Resolution order:
      1. ``<output of 'rad path'>/storage`` when the ``rad`` CLI exits with
         status 0 and prints a non-empty path.
      2. ``$RAD_HOME/storage`` when the ``RAD_HOME`` environment variable is
         set (so the fallback agrees with what the CLI would have reported).
      3. ``~/.radicle/storage``.
    """
    try:
        result = subprocess.run(
            ["rad", "path"],
            capture_output=True, text=True, timeout=5,
        )
    except (OSError, subprocess.SubprocessError, ValueError):
        # rad is missing / not executable, timed out, or printed output that
        # could not be decoded. Detection is best-effort: use the fallbacks.
        result = None

    if result is not None and result.returncode == 0:
        reported_home = result.stdout.strip()
        # An empty answer would otherwise produce the relative path "storage".
        if reported_home:
            return Path(reported_home) / "storage"

    env_home = os.environ.get("RAD_HOME", "").strip()
    if env_home:
        return Path(env_home) / "storage"
    return Path.home() / ".radicle" / "storage"
def _read_refs(storage: Path, rid: str) -> Dict[str, str]:
    """Collect the git refs currently stored for one Radicle repository.

    The repository directory is ``storage/<rid without its 'rad:' prefix>``.
    Returns a ``{ref_name: sha}`` mapping parsed from ``git show-ref``. The
    mapping is empty when that directory does not exist, or when running or
    parsing git fails for any reason.
    """
    repo_dir = storage / rid.removeprefix("rad:")
    if not repo_dir.exists():
        return {}

    try:
        proc = subprocess.run(
            ["git", "show-ref"],
            cwd=repo_dir,
            capture_output=True, text=True, timeout=10,
        )
        # Each output line is "<sha> <ref name>"; lines that do not split
        # into exactly two fields are skipped.
        split_lines = (entry.split(maxsplit=1) for entry in proc.stdout.splitlines())
        return {fields[1]: fields[0] for fields in split_lines if len(fields) == 2}
    except Exception:
        return {}
class GossipRelay:
    """Watches Radicle refs and notifies peers over RNS when they change.

    Designed to run alongside the TCP bridge. The bridge carries the actual
    git pack data; this relay only sends tiny "go fetch" signals.

    The relay instance is itself the RNS announce handler: it exposes the
    ``aspect_filter`` attribute and the ``received_announce`` callable that
    ``RNS.Transport.register_announce_handler`` requires of a handler object.
    """

    def __init__(
        self,
        identity: RadicleIdentity,
        rids: List[str],
        storage: Optional[Path] = None,
        radicle_nid: Optional[str] = None,
        bridge_port: Optional[int] = 8777,
        poll_interval: int = DEFAULT_POLL_INTERVAL,
        announce_retry_delays: Tuple[int, ...] = (5, 15, 30),
        config_path: Optional[str] = None,
        auto_discover: bool = False,
        rad_home: Optional[str] = None,
    ):
        """
        Args:
            identity: RNS/Radicle identity for this relay.
            rids: List of Radicle repository IDs to watch (e.g. 'rad:z3abc...').
            storage: Path to radicle storage dir. Auto-detected if None.
            radicle_nid: Local radicle NID to advertise to peers.
            bridge_port: TCP port for 'rad node connect' in _trigger_sync.
                Pass None to skip that step (seed mode: bridge's auto_seed
                already registered NIDs on correct per-bridge ports).
            poll_interval: Seconds between ref polls.
            announce_retry_delays: Startup re-announce delays (seconds).
            config_path: Reticulum config path (None = default).
            auto_discover: Scan storage each poll cycle and add new repo dirs
                to rids automatically. Useful in seed mode.
            rad_home: RAD_HOME override for rad CLI calls. None = system default.
        """
        self.identity = identity
        self.rids = list(rids)  # private copy; auto-discovery appends to it
        self.storage = storage or _radicle_storage_path()
        self.radicle_nid = radicle_nid
        self.bridge_port = bridge_port
        self.poll_interval = poll_interval
        self.announce_retry_delays = announce_retry_delays
        self.auto_discover = auto_discover
        self.rad_home = rad_home

        self.reticulum = RNS.Reticulum(config_path)
        self.destination = RNS.Destination(
            identity.rns_identity,
            RNS.Destination.IN,
            RNS.Destination.SINGLE,
            APP_NAME,
            ASPECT_GOSSIP,
        )
        self.destination.set_packet_callback(self._on_packet)

        self._known_peers: Dict[bytes, float] = {}  # dest_hash -> last_seen
        self._peers_lock = threading.Lock()
        self._known_refs: Dict[str, Dict[str, str]] = {}  # rid -> refs
        self._refs_lock = threading.Lock()
        self._running = False
        self._poll_event = threading.Event()  # set by watchdog or stop()
        self._observer = None  # watchdog Observer, if available
        self._on_sync_triggered: Optional[Callable[[str, str], None]] = None

    # ── RNS announce-handler interface ───────────────────────────────────────

    @property
    def aspect_filter(self) -> str:
        """Full destination name whose announces this handler wants."""
        return f"{APP_NAME}.{ASPECT_GOSSIP}"

    def received_announce(
        self,
        destination_hash: bytes,
        announced_identity: RNS.Identity,
        app_data: Optional[bytes],
    ):
        """Entry point called by RNS.Transport for each matching announce."""
        self._on_announce(destination_hash, announced_identity, app_data)

    # ── Lifecycle ────────────────────────────────────────────────────────────

    def start(self):
        """Start the relay: announce, begin polling, register announce handler.

        Calling start() on a relay that is already running is a no-op.
        """
        if self._running:
            return
        self._running = True

        # RNS only accepts handler *objects* (with `aspect_filter` and
        # `received_announce`); a bare bound method is silently ignored and
        # no peer would ever be discovered.
        RNS.Transport.register_announce_handler(self)

        self.announce()
        threading.Thread(target=self._startup_announce_loop, daemon=True).start()
        self._start_watcher()
        threading.Thread(target=self._poll_loop, daemon=True).start()

        RNS.log(f"Gossip relay started: {self.destination.hexhash}", RNS.LOG_INFO)
        RNS.log(
            f" Watching {len(self.rids)} repo(s), "
            f"{'inotify+' if self._observer else ''}poll every {self.poll_interval}s",
            RNS.LOG_INFO,
        )

    def stop(self):
        """Stop the relay: end polling, drop the announce handler and watcher."""
        self._running = False
        self._poll_event.set()  # wake poll loop so it exits promptly

        try:
            RNS.Transport.deregister_announce_handler(self)
        except Exception as e:
            RNS.log(f"Could not deregister announce handler: {e}", RNS.LOG_DEBUG)

        if self._observer:
            try:
                self._observer.stop()
                self._observer.join(timeout=3)
            except Exception:
                # Shutdown is best-effort; a watcher that refuses to stop
                # must not prevent the relay from stopping.
                pass
            self._observer = None
        RNS.log("Gossip relay stopped", RNS.LOG_INFO)

    def _start_watcher(self):
        """Set up a watchdog filesystem observer if the library is available.

        On any failure (library missing, storage dir not creatable, observer
        refusing to start) ``self._observer`` stays None and the relay relies
        on interval polling alone.
        """
        try:
            from watchdog.observers import Observer
            from watchdog.events import FileSystemEventHandler
        except ImportError:
            RNS.log(
                "watchdog not installed — install it for instant push detection "
                "(pip install watchdog). Falling back to polling.",
                RNS.LOG_INFO,
            )
            return

        relay = self

        class _Handler(FileSystemEventHandler):
            def on_any_event(self, event):
                # Any file-level change wakes the poll loop early.
                if not event.is_directory:
                    relay._poll_event.set()

        try:
            self.storage.mkdir(parents=True, exist_ok=True)
            observer = Observer()
            observer.schedule(_Handler(), str(self.storage), recursive=True)
            observer.start()
        except Exception as e:
            RNS.log(
                f"Watchdog could not be started ({e}). Falling back to polling.",
                RNS.LOG_WARNING,
            )
            return

        # Only publish the observer once it is actually running, so stop()
        # never has to deal with a half-initialised one.
        self._observer = observer
        RNS.log(f"Watchdog active on {self.storage}", RNS.LOG_INFO)

    # ── Public API ───────────────────────────────────────────────────────────

    def announce(self):
        """Announce this relay on the RNS network.

        The app_data is GOSSIP_MAGIC, optionally followed by a big-endian
        16-bit length and the UTF-8 encoded local Radicle NID.
        """
        app_data = GOSSIP_MAGIC
        if self.radicle_nid:
            nid_bytes = self.radicle_nid.encode()
            app_data += struct.pack("!H", len(nid_bytes)) + nid_bytes
        self.destination.announce(app_data=app_data)
        RNS.log(f"Gossip relay announced: {self.destination.hexhash}", RNS.LOG_DEBUG)

    def set_on_sync_triggered(self, callback: Callable[[str, str], None]):
        """Set callback invoked when an incoming gossip message triggers a sync.

        Callback signature: callback(rid: str, nid: str)
        """
        self._on_sync_triggered = callback

    def push_refs_now(self, rid: str):
        """Immediately broadcast current refs for a repo to all known peers.

        Does nothing when no refs can be read for the repo.
        """
        refs = _read_refs(self.storage, rid)
        if refs:
            with self._refs_lock:
                self._known_refs[rid] = refs
            self._broadcast(rid, refs)

    def get_stats(self) -> dict:
        """Return counts of known peers, watched repos and refs per repo."""
        with self._peers_lock:
            peer_count = len(self._known_peers)
        with self._refs_lock:
            refs_per_repo = {rid: len(r) for rid, r in self._known_refs.items()}
        return {
            "known_peers": peer_count,
            "watched_repos": len(self.rids),
            "refs_per_repo": refs_per_repo,
        }

    # ── Internal: polling ────────────────────────────────────────────────────

    def _startup_announce_loop(self):
        """Re-announce after each configured delay while the relay runs."""
        for delay in self.announce_retry_delays:
            time.sleep(delay)
            if not self._running:
                return
            self.announce()

    def _discover_rids(self):
        """Scan storage for new repo dirs and add them to self.rids."""
        if not self.storage.exists():
            return
        try:
            for repo_dir in self.storage.iterdir():
                if not repo_dir.is_dir():
                    continue
                rid = f"rad:{repo_dir.name}"
                if rid not in self.rids:
                    self.rids.append(rid)
                    RNS.log(f"Auto-discovered repo: {rid[:40]}", RNS.LOG_INFO)
        except Exception as e:
            RNS.log(f"Error scanning storage: {e}", RNS.LOG_DEBUG)

    def _poll_loop_once(self):
        """Check all watched repos for ref changes and broadcast any diffs.

        The first non-empty snapshot of a repo is only recorded, not
        broadcast; later differences from the recorded snapshot are both
        recorded and broadcast. An empty read never replaces a snapshot.
        """
        if self.auto_discover:
            self._discover_rids()
        for rid in self.rids:
            try:
                refs = _read_refs(self.storage, rid)
                with self._refs_lock:
                    old = self._known_refs.get(rid)
                    changed = bool(refs and refs != old)
                    first_poll = old is None
                    if changed:
                        self._known_refs[rid] = refs
                # Broadcast outside the lock: sending may block on path lookup.
                if changed and not first_poll:
                    self._broadcast(rid, refs)
            except Exception as e:
                RNS.log(f"Gossip poll error ({rid[:20]}): {e}", RNS.LOG_WARNING)

    def _poll_loop(self):
        """Poll until stopped, sleeping between rounds on the poll event."""
        while self._running:
            self._poll_loop_once()
            # Wait for next poll: woken early by watchdog event or stop()
            self._poll_event.wait(timeout=self.poll_interval)
            self._poll_event.clear()

    # ── Internal: sending ────────────────────────────────────────────────────

    def _refs_payload(self, rid: str, refs: Dict[str, str]) -> bytes:
        """Encode the JSON "refs" notification for one repo."""
        return json.dumps({
            "type": "refs",
            "rid": rid,
            "nid": self.radicle_nid or "",
            "refs": refs,
        }).encode()

    def _broadcast(self, rid: str, refs: Dict[str, str]):
        """Send the refs notification for ``rid`` to every known peer."""
        payload = self._refs_payload(rid, refs)
        with self._peers_lock:
            peers = list(self._known_peers)
        sent = sum(1 for h in peers if self._send_packet(h, payload))
        RNS.log(f"Broadcast refs for {rid[:20]}... → {sent}/{len(peers)} peers", RNS.LOG_INFO)

    def _send_packet(self, peer_hash: bytes, payload: bytes) -> bool:
        """Send one packet to a peer's gossip destination.

        Requests a path first if none is known and waits up to
        PATH_REQUEST_TIMEOUT seconds for it.

        Returns:
            True if the packet was handed to the transport; False if no path
            or identity is available, the transport refused the packet, or
            any error occurred (errors are logged, never raised).
        """
        try:
            if not RNS.Transport.has_path(peer_hash):
                RNS.Transport.request_path(peer_hash)
                deadline = time.time() + PATH_REQUEST_TIMEOUT
                while not RNS.Transport.has_path(peer_hash):
                    if time.time() > deadline:
                        RNS.log(
                            f"No path to gossip peer {peer_hash.hex()[:16]}",
                            RNS.LOG_WARNING,
                        )
                        return False
                    time.sleep(0.2)

            peer_identity = RNS.Identity.recall(peer_hash)
            if peer_identity is None:
                RNS.log(
                    f"Identity not known for {peer_hash.hex()[:16]}",
                    RNS.LOG_WARNING,
                )
                return False

            dest = RNS.Destination(
                peer_identity,
                RNS.Destination.OUT,
                RNS.Destination.SINGLE,
                APP_NAME,
                ASPECT_GOSSIP,
            )
            # send() returns False when no interface could take the packet;
            # any other value (a receipt or None) means it was sent.
            return RNS.Packet(dest, payload).send() is not False
        except Exception as e:
            RNS.log(f"Gossip send error: {e}", RNS.LOG_WARNING)
            return False

    # ── Internal: receiving ──────────────────────────────────────────────────

    def _on_packet(self, data: bytes, packet: RNS.Packet):
        """Handle an incoming gossip packet and start a sync if refs differ.

        The payload comes from the network and is untrusted: anything that is
        not a well-formed "refs" message is dropped silently.
        """
        try:
            msg = json.loads(data.decode())
        except ValueError:  # covers UnicodeDecodeError and JSONDecodeError
            return
        if not isinstance(msg, dict) or msg.get("type") != "refs":
            return

        rid = msg.get("rid", "")
        nid = msg.get("nid", "")
        remote_refs = msg.get("refs", {})
        if not isinstance(nid, str):
            nid = ""  # treat a missing/odd NID as "unknown"
        if not isinstance(rid, str) or not isinstance(remote_refs, dict):
            return
        if not rid or not remote_refs:
            return
        # rid and nid end up in the argv of the rad CLI; never let a remote
        # peer smuggle in something that parses as a command-line option.
        if rid.startswith("-") or nid.startswith("-"):
            return

        with self._refs_lock:
            local_refs = self._known_refs.get(rid, {})
            changed = any(remote_refs.get(r) != local_refs.get(r) for r in remote_refs)
        if not changed:
            return

        RNS.log(
            f"Gossip: new refs for {rid[:20]}... from {nid[:24] if nid else 'unknown'}",
            RNS.LOG_INFO,
        )
        # rad sync can take a long time; never block the RNS callback on it.
        threading.Thread(
            target=self._trigger_sync,
            args=(rid, nid),
            daemon=True,
        ).start()

    def _trigger_sync(self, rid: str, nid: str):
        """Run rad node connect (if needed) then rad sync --fetch.

        Failures of the rad CLI (non-zero exit, timeout, binary missing) are
        logged. The on-sync callback, if set, is invoked afterwards in every
        case.
        """
        env = None
        if self.rad_home:
            env = os.environ.copy()
            env["RAD_HOME"] = self.rad_home

        if nid and self.bridge_port is not None:
            try:
                subprocess.run(
                    ["rad", "node", "connect", f"{nid}@127.0.0.1:{self.bridge_port}"],
                    capture_output=True, timeout=15, env=env,
                )
            except (OSError, subprocess.SubprocessError) as e:
                # Connecting is only a helper step; still attempt the fetch.
                RNS.log(f"rad node connect failed for {nid[:24]}: {e}", RNS.LOG_WARNING)

        try:
            result = subprocess.run(
                ["rad", "sync", "--fetch", "--rid", rid],
                capture_output=True, text=True, timeout=120, env=env,
            )
        except (OSError, subprocess.SubprocessError, ValueError) as e:
            RNS.log(f"Sync failed for {rid[:20]}: {e}", RNS.LOG_WARNING)
        else:
            if result.returncode == 0:
                RNS.log(f"Sync succeeded: {rid[:20]}", RNS.LOG_INFO)
            else:
                stderr = result.stderr.strip()
                RNS.log(
                    f"Sync failed for {rid[:20]}: {stderr[:120] if stderr else '(no output)'}",
                    RNS.LOG_WARNING,
                )

        if self._on_sync_triggered:
            self._on_sync_triggered(rid, nid)

    # ── Internal: peer discovery ─────────────────────────────────────────────

    def _on_announce(
        self,
        destination_hash: bytes,
        announced_identity: RNS.Identity,
        app_data: Optional[bytes],
    ):
        """Record a gossip peer from its announce and greet new peers.

        Announces from this relay itself, or without the GOSSIP_MAGIC prefix,
        are ignored. A peer seen for the first time is sent the currently
        known refs of every watched repo.
        """
        if destination_hash == self.destination.hash:
            return
        if not app_data or not app_data.startswith(GOSSIP_MAGIC):
            return

        radicle_nid: Optional[str] = None
        if len(app_data) > len(GOSSIP_MAGIC):
            try:
                offset = len(GOSSIP_MAGIC)
                nid_len = struct.unpack("!H", app_data[offset:offset + 2])[0]
                raw = app_data[offset + 2: offset + 2 + nid_len]
                radicle_nid = raw.decode() or None
            except (struct.error, UnicodeDecodeError):
                # Malformed NID suffix: the peer is still a valid gossip peer.
                pass

        with self._peers_lock:
            is_new = destination_hash not in self._known_peers
            self._known_peers[destination_hash] = time.time()
        if not is_new:
            return

        RNS.log(
            f"Discovered gossip peer: {destination_hash.hex()[:16]}"
            + (f" (NID: {radicle_nid[:32]})" if radicle_nid else ""),
            RNS.LOG_INFO,
        )
        # Send our current refs so the peer knows our state immediately.
        # Iterate a snapshot: the poll thread may append to self.rids.
        for rid in list(self.rids):
            with self._refs_lock:
                refs = self._known_refs.get(rid)
            if refs:
                self._send_packet(destination_hash, self._refs_payload(rid, refs))