From e051d82af1f63fbd93afd763a47cf5d3cce663bc Mon Sep 17 00:00:00 2001 From: "Maciek \"mab122\" Bator" Date: Wed, 22 Apr 2026 15:44:59 +0200 Subject: [PATCH] feat: replace parallel git-bundle layer with real Radicle seed bridging MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Rips out the custom git-bundle/sync/adaptive/qr parallel sync stack and replaces it with a proper seed-to-seed architecture: two real radicle-node instances (each with seedingPolicy=allow) bridge over Reticulum/LoRa. Users connect their own radicle-node to the local seed once; all mesh sync happens in the background using Radicle's own protocol, so atomicity, Noise XK encryption, and pack verification are all handled natively. New modules: - seed.py: SeedNode — manages a dedicated radicle-node subprocess under its own RAD_HOME (~/.radicle-rns/seed/), writes a seed config only on first run, uses DEVNULL I/O to prevent pipe-buffer deadlocks - gossip.py: GossipRelay — polls ~/.radicle/storage// for ref changes, broadcasts ~200-500 byte RNS packets to known peers, on receipt calls `rad sync --fetch --rid X`; thread-safe via separate peers/refs locks New CLI commands: - `radicle-rns seed`: starts seed radicle-node + bridge as one command; auto-discovers remote seeds over the mesh and connects them - `radicle-rns gossip`: runs ref-watching notification relay bridge.py: added rad_home parameter so `rad node connect` is called with the seed's RAD_HOME when auto-connecting remote seeds. Bug fixes applied during review: - seed.py: PIPE→DEVNULL (64 KB pipe buffer deadlock) - seed.py: missing wait() after kill() (zombie process) - seed.py: write_config() now idempotent (preserves user customisations) - seed.py: is_initialized() wraps TimeoutExpired in except Exception - seed.py: removed hardcoded "network":"main" (breaks testnet) - seed.py: moved `import socket` to top-level - cli.py: bridge.start() inside try/finally (orphaned seed on start error) - cli.py: status line gated on known_bridges change (not every 5 s) - cli.py: removed dead get_remote_bridges() call in bridge status loop - gossip.py: added _refs_lock protecting _known_refs across threads Co-Authored-By: Claude Sonnet 4.6 --- pyproject.toml | 5 - src/radicle_reticulum/__init__.py | 60 +-- src/radicle_reticulum/adaptive.py | 347 ------------- src/radicle_reticulum/bridge.py | 11 + src/radicle_reticulum/cli.py | 631 +++++++++-------------- src/radicle_reticulum/git_bundle.py | 467 ----------------- src/radicle_reticulum/gossip.py | 374 ++++++++++++++ src/radicle_reticulum/qr.py | 191 ------- src/radicle_reticulum/seed.py | 174 +++++++ src/radicle_reticulum/sync.py | 770 ---------------------------- tests/test_adaptive.py | 153 ------ tests/test_git_bundle.py | 267 ---------- tests/test_gossip.py | 378 ++++++++++++++ tests/test_qr.py | 108 ---- tests/test_sync.py | 543 -------------------- 15 files changed, 1197 insertions(+), 3282 deletions(-) delete mode 100644 src/radicle_reticulum/adaptive.py delete mode 100644 src/radicle_reticulum/git_bundle.py create mode 100644 src/radicle_reticulum/gossip.py delete mode 100644 src/radicle_reticulum/qr.py create mode 100644 src/radicle_reticulum/seed.py delete mode 100644 src/radicle_reticulum/sync.py delete mode 100644 tests/test_adaptive.py delete mode 100644 tests/test_git_bundle.py create mode 100644 tests/test_gossip.py delete mode 100644 tests/test_qr.py delete mode 100644 tests/test_sync.py diff --git a/pyproject.toml b/pyproject.toml index 0faa688..9747817 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -15,14 +15,10 @@ dependencies = [ ] [project.optional-dependencies] -qr = [ - "qrcode>=7.0", -] dev = [ "pytest>=7.0.0", "pytest-asyncio>=0.21.0", "mypy>=1.0.0", - "qrcode>=7.0", ] [project.scripts] @@ -36,7 +32,6 @@ dev = [ "pytest>=7.0.0", "pytest-asyncio>=0.21.0", "mypy>=1.0.0", - "qrcode>=8.2", ] [build-system] diff --git a/src/radicle_reticulum/__init__.py b/src/radicle_reticulum/__init__.py index 75a4e0b..38de9d2 100644 --- a/src/radicle_reticulum/__init__.py +++ b/src/radicle_reticulum/__init__.py @@ -12,44 +12,15 @@ from radicle_reticulum.messages import ( Pong, decode_message, ) -from radicle_reticulum.git_bundle import ( - GitBundle, - GitBundleGenerator, - GitBundleApplicator, - BundleType, - BundleMetadata, -) -from radicle_reticulum.sync import ( - SyncManager, - SyncMode, - RefsAnnouncement, - create_dead_drop_bundle, - apply_dead_drop_bundle, -) -from radicle_reticulum.adaptive import ( - SyncStrategy, - LinkQuality, - BandwidthProbe, - AdaptiveSyncManager, - select_strategy, -) from radicle_reticulum.bridge import RadicleBridge -from radicle_reticulum.qr import ( - encode_bundle_to_qr, - decode_bundle_from_qr_data, - decode_bundle_from_qr_image, - BundleTooLargeForQR, - QR_MAX_BYTES as QR_BUNDLE_MAX_BYTES, -) +from radicle_reticulum.gossip import GossipRelay +from radicle_reticulum.seed import SeedNode __version__ = "0.1.0" __all__ = [ - # Identity "RadicleIdentity", - # Transport "RNSTransportAdapter", "RadicleLink", - # Messages "MessageType", "NodeAnnouncement", "InventoryAnnouncement", @@ -57,30 +28,7 @@ __all__ = [ "Ping", "Pong", "decode_message", - # Git bundles - "GitBundle", - "GitBundleGenerator", - "GitBundleApplicator", - "BundleType", - "BundleMetadata", - # Sync - "SyncManager", - "SyncMode", - "RefsAnnouncement", - "create_dead_drop_bundle", - "apply_dead_drop_bundle", - # Adaptive sync - "SyncStrategy", - "LinkQuality", - "BandwidthProbe", - "AdaptiveSyncManager", - "select_strategy", - # Bridge "RadicleBridge", - # QR - "encode_bundle_to_qr", - "decode_bundle_from_qr_data", - "decode_bundle_from_qr_image", - "BundleTooLargeForQR", - "QR_BUNDLE_MAX_BYTES", + "GossipRelay", + "SeedNode", ] diff --git a/src/radicle_reticulum/adaptive.py b/src/radicle_reticulum/adaptive.py deleted file mode 100644 index 39f88e3..0000000 --- a/src/radicle_reticulum/adaptive.py +++ /dev/null @@ -1,347 +0,0 @@ -"""Adaptive sync with automatic bandwidth detection. - -Automatically selects sync strategy based on: -- Link RTT (round-trip time) -- Measured throughput -- Bundle size estimation - -Strategies: -- FULL: Fast links (>100 Kbps) - send complete bundles -- INCREMENTAL: Medium links (1-100 Kbps) - send only changes -- MINIMAL: Slow links (<1 Kbps, LoRa) - send only refs + request -- QR: Tiny payloads (<3KB) - encode as QR for visual transfer -""" - -import time -import struct -from dataclasses import dataclass -from enum import Enum -from typing import Optional, Tuple - -import RNS - -from radicle_reticulum.link import RadicleLink - - -class SyncStrategy(Enum): - """Sync strategy based on link quality.""" - FULL = "full" # >100 Kbps - send everything - INCREMENTAL = "incremental" # 1-100 Kbps - only changes - MINIMAL = "minimal" # <1 Kbps - refs only, request pulls - QR = "qr" # <3 KB payload - QR code capable - - -@dataclass -class LinkQuality: - """Measured link quality metrics.""" - rtt_ms: float # Round-trip time in milliseconds - throughput_bps: float # Estimated throughput in bits per second - packet_loss: float # Packet loss ratio (0-1) - is_lora: bool # Detected as LoRa link - strategy: SyncStrategy # Recommended strategy - - @property - def throughput_kbps(self) -> float: - return self.throughput_bps / 1000 - - def __repr__(self) -> str: - return ( - f"LinkQuality(rtt={self.rtt_ms:.0f}ms, " - f"throughput={self.throughput_kbps:.1f}Kbps, " - f"strategy={self.strategy.value})" - ) - - -# Bandwidth thresholds (bits per second) -THRESHOLD_FULL = 100_000 # 100 Kbps - use full sync -THRESHOLD_INCREMENTAL = 1_000 # 1 Kbps - use incremental -# Below 1 Kbps - use minimal/QR - -# RTT thresholds (milliseconds) - used as heuristic -RTT_FAST = 100 # <100ms likely ethernet/wifi -RTT_MEDIUM = 1000 # <1s likely internet -RTT_SLOW = 10000 # <10s likely LoRa/satellite - -# QR code capacity -QR_MAX_BYTES = 2953 # QR version 40, binary mode - - -class BandwidthProbe: - """Probes link quality to determine optimal sync strategy.""" - - # Probe packet sizes - PROBE_SMALL = 64 # Minimum probe - PROBE_MEDIUM = 512 # Medium probe - PROBE_LARGE = 2048 # Large probe (if link allows) - - def __init__(self, link: RadicleLink): - self.link = link - self._measurements: list = [] - - def measure_rtt(self, samples: int = 3) -> float: - """Measure round-trip time with multiple samples.""" - rtts = [] - - for _ in range(samples): - # Send probe packet - probe_data = struct.pack("!BQ", 0x01, int(time.time() * 1000000)) - start = time.time() - - if not self.link.send(probe_data): - continue - - # Wait for response - response = self.link.recv(timeout=30.0) - if response: - rtt = (time.time() - start) * 1000 # ms - rtts.append(rtt) - - time.sleep(0.1) # Brief pause between probes - - if not rtts: - return float('inf') - - # Use median RTT - rtts.sort() - return rtts[len(rtts) // 2] - - def measure_throughput(self, duration: float = 2.0) -> float: - """Measure throughput by sending test data.""" - # Start with small packets, increase if successful - packet_size = self.PROBE_SMALL - bytes_sent = 0 - start = time.time() - - while time.time() - start < duration: - # Create probe packet - probe_data = struct.pack("!BI", 0x02, packet_size) + b"\x00" * (packet_size - 5) - - if self.link.send(probe_data): - bytes_sent += len(probe_data) - - # Try larger packets if successful - if packet_size < self.PROBE_LARGE: - packet_size = min(packet_size * 2, self.PROBE_LARGE) - else: - # Reduce packet size on failure - packet_size = max(packet_size // 2, self.PROBE_SMALL) - - time.sleep(0.01) # Small delay - - elapsed = time.time() - start - if elapsed > 0: - return (bytes_sent * 8) / elapsed # bits per second - return 0 - - def detect_link_type(self) -> bool: - """Detect if link is LoRa based on characteristics.""" - # LoRa typically has: - # - High RTT (>1s) - # - Low throughput (<10 Kbps) - # - Link properties may indicate - - if hasattr(self.link.rns_link, 'get_establishment_rate'): - rate = self.link.rns_link.get_establishment_rate() - if rate and rate < 10000: # <10 Kbps - return True - - # Check RTT heuristic - if self.link.rtt and self.link.rtt > 1.0: # >1 second RTT - return True - - return False - - def probe(self, quick: bool = False) -> LinkQuality: - """Probe link and return quality assessment. - - Args: - quick: If True, use faster but less accurate probing - """ - # Quick mode uses existing RTT if available - if quick and self.link.rtt: - rtt_ms = self.link.rtt * 1000 - # Estimate throughput from RTT - if rtt_ms < RTT_FAST: - throughput = THRESHOLD_FULL * 10 # Assume fast - elif rtt_ms < RTT_MEDIUM: - throughput = THRESHOLD_FULL - elif rtt_ms < RTT_SLOW: - throughput = THRESHOLD_INCREMENTAL * 10 - else: - throughput = THRESHOLD_INCREMENTAL / 10 - else: - # Full measurement - rtt_ms = self.measure_rtt(samples=2 if quick else 3) - throughput = self.measure_throughput(duration=1.0 if quick else 2.0) - - is_lora = self.detect_link_type() - - # Determine strategy - if throughput >= THRESHOLD_FULL and rtt_ms < RTT_MEDIUM: - strategy = SyncStrategy.FULL - elif throughput >= THRESHOLD_INCREMENTAL: - strategy = SyncStrategy.INCREMENTAL - else: - strategy = SyncStrategy.MINIMAL - - return LinkQuality( - rtt_ms=rtt_ms, - throughput_bps=throughput, - packet_loss=0.0, # TODO: measure - is_lora=is_lora, - strategy=strategy, - ) - - -def estimate_transfer_time(size_bytes: int, quality: LinkQuality) -> float: - """Estimate transfer time in seconds for given size and quality.""" - if quality.throughput_bps <= 0: - return float('inf') - - # Account for protocol overhead (~20%) - effective_throughput = quality.throughput_bps * 0.8 - bits = size_bytes * 8 - return bits / effective_throughput - - -def select_strategy( - bundle_size: int, - incremental_size: Optional[int], - quality: LinkQuality, - max_transfer_time: float = 3600, # 1 hour default max -) -> Tuple[SyncStrategy, str]: - """Select optimal sync strategy based on sizes and link quality. - - Returns (strategy, reason). - """ - # Check if QR is viable for tiny payloads - if incremental_size and incremental_size <= QR_MAX_BYTES: - return SyncStrategy.QR, f"Incremental fits in QR ({incremental_size} bytes)" - - # Calculate transfer times - full_time = estimate_transfer_time(bundle_size, quality) - incr_time = estimate_transfer_time(incremental_size or bundle_size, quality) - - # If LoRa detected, prefer minimal/incremental - if quality.is_lora: - if incremental_size and incr_time < max_transfer_time: - return SyncStrategy.INCREMENTAL, f"LoRa link, incremental viable ({incr_time:.0f}s)" - return SyncStrategy.MINIMAL, "LoRa link, request-based sync recommended" - - # For fast links, full sync if reasonable - if quality.strategy == SyncStrategy.FULL: - if full_time < 60: # Under 1 minute - return SyncStrategy.FULL, f"Fast link, full sync in {full_time:.0f}s" - elif incremental_size and incr_time < full_time / 2: - return SyncStrategy.INCREMENTAL, f"Large repo, incremental faster ({incr_time:.0f}s vs {full_time:.0f}s)" - return SyncStrategy.FULL, f"Fast link, full sync in {full_time:.0f}s" - - # For medium links, prefer incremental - if incremental_size and incr_time < max_transfer_time: - return SyncStrategy.INCREMENTAL, f"Medium link, incremental in {incr_time:.0f}s" - - if full_time < max_transfer_time: - return SyncStrategy.FULL, f"No incremental available, full in {full_time:.0f}s" - - return SyncStrategy.MINIMAL, f"Transfer too slow ({full_time:.0f}s), use request-based" - - -class AdaptiveSyncManager: - """Sync manager with automatic bandwidth adaptation.""" - - def __init__(self, sync_manager): - """Wrap a SyncManager with adaptive capabilities.""" - self.sync_manager = sync_manager - self._link_qualities: dict = {} # peer_hash -> LinkQuality - - def probe_peer(self, link: RadicleLink, quick: bool = True) -> LinkQuality: - """Probe a peer's link quality.""" - prober = BandwidthProbe(link) - quality = prober.probe(quick=quick) - - # Cache result - if link.remote_identity: - self._link_qualities[link.remote_identity.hash] = quality - - return quality - - def get_cached_quality(self, peer_hash: bytes) -> Optional[LinkQuality]: - """Get cached link quality for a peer.""" - return self._link_qualities.get(peer_hash) - - def adaptive_sync( - self, - repository_id: str, - link: RadicleLink, - peer_refs: Optional[dict] = None, - ) -> Tuple[bool, str]: - """Perform adaptive sync based on link quality. - - Returns (success, description). - """ - from radicle_reticulum.git_bundle import GitBundleGenerator, estimate_bundle_size - from radicle_reticulum.sync import SyncMode - - # Get repository info - state = self.sync_manager._repos.get(repository_id) - if not state: - return False, "Repository not registered" - - # Probe link quality - quality = self.probe_peer(link, quick=True) - RNS.log(f"Link quality: {quality}", RNS.LOG_INFO) - - # Estimate bundle sizes - generator = GitBundleGenerator(state.local_path) - full_size = estimate_bundle_size(state.local_path) - - # Estimate incremental size (rough: compare ref counts) - current_refs = generator.get_refs() - if peer_refs: - changed = sum(1 for r, s in current_refs.items() - if r not in peer_refs or peer_refs[r] != s) - # Rough estimate: assume ~10KB per changed ref average - incr_size = changed * 10240 - else: - incr_size = None - - # Select strategy - strategy, reason = select_strategy(full_size, incr_size, quality) - RNS.log(f"Selected strategy: {strategy.value} - {reason}", RNS.LOG_INFO) - - # Execute strategy - if strategy == SyncStrategy.QR: - bundle = self.sync_manager.create_sync_bundle( - repository_id, peer_refs=peer_refs, mode=SyncMode.INCREMENTAL - ) - if bundle: - # Generate QR (handled by caller) - return True, f"QR: {len(bundle.encode())} bytes" - return False, "No changes for QR" - - elif strategy == SyncStrategy.FULL: - bundle = self.sync_manager.create_sync_bundle( - repository_id, mode=SyncMode.FULL - ) - elif strategy == SyncStrategy.INCREMENTAL: - bundle = self.sync_manager.create_sync_bundle( - repository_id, peer_refs=peer_refs, mode=SyncMode.INCREMENTAL - ) - else: # MINIMAL - # Just announce refs, let peer request what they need - self.sync_manager.announce_refs(repository_id) - return True, "Announced refs for request-based sync" - - if bundle is None: - return True, "No changes to sync" - - # Send bundle - if link.remote_identity: - success = self.sync_manager.send_bundle( - bundle, link.remote_identity.hash - ) - if success: - return True, f"Sent {bundle.metadata.bundle_type.value} bundle ({bundle.metadata.size_bytes} bytes)" - return False, "Failed to send bundle" - - return False, "No remote identity on link" diff --git a/src/radicle_reticulum/bridge.py b/src/radicle_reticulum/bridge.py index acd30fa..4cf5add 100644 --- a/src/radicle_reticulum/bridge.py +++ b/src/radicle_reticulum/bridge.py @@ -15,6 +15,7 @@ The bridge: 4. Remote bridges forward to their local radicle-node """ +import os import socket import select import struct @@ -90,6 +91,7 @@ class RadicleBridge: auto_connect: bool = True, auto_seed: bool = True, announce_retry_delays: Tuple[int, ...] = (5, 15, 30), + rad_home: Optional[str] = None, ): """Initialize the bridge. @@ -103,6 +105,8 @@ class RadicleBridge: auto_seed: Automatically register discovered NIDs with radicle-node announce_retry_delays: Seconds between startup re-announces. Use longer intervals on LoRa to respect duty cycle limits, e.g. (60, 300, 900). + rad_home: RAD_HOME for rad CLI calls. None = use system default. + Set to seed home when bridging a dedicated seed node. """ self.listen_port = listen_port self.radicle_host = radicle_host @@ -110,6 +114,7 @@ class RadicleBridge: self.auto_connect = auto_connect self.auto_seed = auto_seed self.announce_retry_delays = announce_retry_delays + self.rad_home = rad_home # Initialize Reticulum self.reticulum = RNS.Reticulum(config_path) @@ -557,12 +562,18 @@ class RadicleBridge: addr = f"{radicle_nid}@127.0.0.1:{self.listen_port}" RNS.log(f"Registering seed: {addr}", RNS.LOG_INFO) + env = None + if self.rad_home: + env = os.environ.copy() + env["RAD_HOME"] = str(self.rad_home) + try: result = subprocess.run( ["rad", "node", "connect", addr], capture_output=True, text=True, timeout=30, + env=env, ) if result.returncode == 0: RNS.log(f"Seed registered: {radicle_nid[:16]}...", RNS.LOG_INFO) diff --git a/src/radicle_reticulum/cli.py b/src/radicle_reticulum/cli.py index 2d2a2c3..03dfffd 100644 --- a/src/radicle_reticulum/cli.py +++ b/src/radicle_reticulum/cli.py @@ -1,7 +1,6 @@ """Command-line interface for Radicle-Reticulum adapter.""" import argparse -import json import os import subprocess import sys @@ -16,24 +15,9 @@ import RNS from radicle_reticulum.adapter import RNSTransportAdapter, PeerInfo from radicle_reticulum.identity import RadicleIdentity -from radicle_reticulum.git_bundle import ( - GitBundleGenerator, - GitBundleApplicator, - BundleType, - estimate_bundle_size, -) -from radicle_reticulum.sync import ( - SyncManager, - SyncMode, - create_dead_drop_bundle, - apply_dead_drop_bundle, -) -from radicle_reticulum.adaptive import ( - SyncStrategy, - LinkQuality, - select_strategy, -) from radicle_reticulum.bridge import RadicleBridge +from radicle_reticulum.gossip import GossipRelay +from radicle_reticulum.seed import SeedNode, DEFAULT_SEED_HOME, DEFAULT_SEED_PORT def detect_radicle_nid() -> Optional[str]: @@ -235,319 +219,208 @@ def cmd_peers(args): adapter.stop() -def cmd_bundle_create(args): - """Create a bundle from a repository.""" - repo_path = Path(args.repo).resolve() - if not repo_path.exists(): - print(f"Error: Repository not found: {repo_path}", file=sys.stderr) - sys.exit(1) - - # Generate identity for signing - identity = RadicleIdentity.generate() - source_node = identity.did - - # Repository ID - repo_id = args.repo_id or f"rad:local:{repo_path.name}" - +def _detect_rid(repo_path: Path) -> Optional[str]: + """Detect the Radicle RID for the repo at repo_path via 'rad inspect'.""" try: - generator = GitBundleGenerator(repo_path) - - # Get current refs - refs = generator.get_refs() - print(f"Repository: {repo_path}") - print(f"Refs found: {len(refs)}") - for ref, sha in refs.items(): - print(f" {ref}: {sha[:12]}") - print() - - # Determine output path - if args.output: - output_path = Path(args.output) - else: - timestamp = int(time.time()) - output_path = Path(f"{repo_path.name}-{timestamp}.bundle") - - # Create bundle - if args.incremental and args.basis: - # Load basis refs from file - basis_refs = json.loads(Path(args.basis).read_text()) - bundle = generator.create_incremental_bundle( - repository_id=repo_id, - source_node=source_node, - basis_refs=basis_refs, - output_path=output_path, - ) - if bundle is None: - print("No changes to bundle (repository is up to date)") - sys.exit(0) - else: - bundle = generator.create_full_bundle( - repository_id=repo_id, - source_node=source_node, - output_path=output_path, - ) - - # Also save transport format - transport_path = output_path.with_suffix(".radicle-bundle") - transport_path.write_bytes(bundle.encode()) - - print(f"Bundle created: {output_path}") - print(f"Transport file: {transport_path}") - print(f"Type: {bundle.metadata.bundle_type.value}") - print(f"Size: {bundle.metadata.size_bytes:,} bytes") - print(f"Refs: {len(bundle.metadata.refs_included)}") - - # Save current refs for future incremental - refs_path = output_path.with_suffix(".refs.json") - refs_path.write_text(json.dumps(refs, indent=2)) - print(f"Refs saved: {refs_path}") - - except Exception as e: - print(f"Error creating bundle: {e}", file=sys.stderr) - sys.exit(1) - - -def cmd_bundle_apply(args): - """Apply a bundle to a repository.""" - bundle_path = Path(args.bundle).resolve() - repo_path = Path(args.repo).resolve() - - if not bundle_path.exists(): - print(f"Error: Bundle not found: {bundle_path}", file=sys.stderr) - sys.exit(1) - - if not repo_path.exists(): - print(f"Error: Repository not found: {repo_path}", file=sys.stderr) - sys.exit(1) - - try: - # Determine bundle format - if bundle_path.suffix == ".radicle-bundle": - # Full transport format with metadata - applied = apply_dead_drop_bundle(bundle_path, repo_path) - else: - # Raw git bundle - applicator = GitBundleApplicator(repo_path) - - # Create a minimal GitBundle wrapper - from radicle_reticulum.git_bundle import GitBundle, BundleMetadata - import hashlib - - bundle_data = bundle_path.read_bytes() - metadata = BundleMetadata( - bundle_type=BundleType.FULL, - repository_id="unknown", - source_node="unknown", - timestamp=int(time.time() * 1000), - refs_included=[], - prerequisites=[], - size_bytes=len(bundle_data), - checksum=hashlib.sha256(bundle_data).digest(), - ) - bundle = GitBundle(metadata=metadata, data=bundle_data) - applied = applicator.apply_bundle(bundle) - - print(f"Bundle applied successfully!") - print(f"Applied refs: {len(applied)}") - for ref, sha in applied.items(): - print(f" {ref}: {sha[:12]}") - - except Exception as e: - print(f"Error applying bundle: {e}", file=sys.stderr) - sys.exit(1) - - -def cmd_bundle_info(args): - """Show information about a bundle.""" - bundle_path = Path(args.bundle).resolve() - - if not bundle_path.exists(): - print(f"Error: Bundle not found: {bundle_path}", file=sys.stderr) - sys.exit(1) - - try: - if bundle_path.suffix == ".radicle-bundle": - # Full transport format - from radicle_reticulum.git_bundle import GitBundle - bundle = GitBundle.decode(bundle_path.read_bytes()) - - print(f"Bundle: {bundle_path}") - print(f"Format: Radicle transport bundle") - print(f"Type: {bundle.metadata.bundle_type.value}") - print(f"Repository: {bundle.metadata.repository_id}") - print(f"Source: {bundle.metadata.source_node}") - print(f"Timestamp: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(bundle.metadata.timestamp / 1000))}") - print(f"Size: {bundle.metadata.size_bytes:,} bytes") - print(f"Checksum: {bundle.metadata.checksum.hex()}") - print(f"Refs ({len(bundle.metadata.refs_included)}):") - for ref in bundle.metadata.refs_included: - print(f" {ref}") - if bundle.metadata.prerequisites: - print(f"Prerequisites ({len(bundle.metadata.prerequisites)}):") - for prereq in bundle.metadata.prerequisites: - print(f" {prereq}") - else: - # Raw git bundle - use git to inspect - import subprocess - result = subprocess.run( - ["git", "bundle", "list-heads", str(bundle_path)], - capture_output=True, - text=True, - ) - print(f"Bundle: {bundle_path}") - print(f"Format: Raw Git bundle") - print(f"Size: {bundle_path.stat().st_size:,} bytes") - print(f"Refs:") - for line in result.stdout.strip().split("\n"): - if line: - print(f" {line}") - - except Exception as e: - print(f"Error reading bundle: {e}", file=sys.stderr) - sys.exit(1) - - -def cmd_bundle_qr_encode(args): - """Encode a bundle as a QR code for visual / air-gapped transfer.""" - from radicle_reticulum.qr import encode_bundle_to_qr, BundleTooLargeForQR, QR_MAX_BYTES - from radicle_reticulum.git_bundle import GitBundle - - bundle_path = Path(args.bundle).resolve() - if not bundle_path.exists(): - print(f"Error: Bundle not found: {bundle_path}", file=sys.stderr) - sys.exit(1) - - try: - bundle = GitBundle.decode(bundle_path.read_bytes()) - except Exception as e: - print(f"Error reading bundle: {e}", file=sys.stderr) - sys.exit(1) - - output_path = Path(args.output) if args.output else None - - try: - ascii_art = encode_bundle_to_qr( - bundle, - output_path=output_path, - error_correction=args.error_correction, + result = subprocess.run( + ["rad", "inspect", "--rid"], + cwd=repo_path, + capture_output=True, text=True, timeout=5, ) - except BundleTooLargeForQR as e: - print(f"Error: {e}", file=sys.stderr) - print(f"Tip: Use 'bundle create --incremental' to reduce bundle size.", file=sys.stderr) - sys.exit(1) - except ImportError as e: - print(f"Error: {e}", file=sys.stderr) - sys.exit(1) - - print(ascii_art) - if output_path: - print(f"QR image saved: {output_path}") - print(f"Bundle size: {len(bundle.encode())} / {QR_MAX_BYTES} bytes") - print(f"Repository: {bundle.metadata.repository_id}") - print(f"Refs: {len(bundle.metadata.refs_included)}") + if result.returncode == 0: + rid = result.stdout.strip() + if rid.startswith("rad:"): + return rid + except Exception: + pass + return None -def cmd_bundle_qr_decode(args): - """Decode a bundle from a QR code image file.""" - from radicle_reticulum.qr import decode_bundle_from_qr_image - - image_path = Path(args.image).resolve() - if not image_path.exists(): - print(f"Error: Image not found: {image_path}", file=sys.stderr) - sys.exit(1) - - try: - bundle = decode_bundle_from_qr_image(image_path) - except ImportError as e: - print(f"Error: {e}", file=sys.stderr) - sys.exit(1) - except ValueError as e: - print(f"Error: {e}", file=sys.stderr) - sys.exit(1) - - output_path = Path(args.output) if args.output else image_path.with_suffix(".radicle-bundle") - output_path.write_bytes(bundle.encode()) - - print(f"Bundle decoded successfully!") - print(f" Repository: {bundle.metadata.repository_id}") - print(f" Refs: {len(bundle.metadata.refs_included)}") - print(f" Size: {bundle.metadata.size_bytes:,} bytes") - print(f" Saved to: {output_path}") - print() - print(f"Apply with: radicle-rns bundle apply {output_path} ") - - -def cmd_sync(args): - """Sync a repository with a peer.""" - repo_path = Path(args.repo).resolve() - - if not repo_path.exists(): - print(f"Error: Repository not found: {repo_path}", file=sys.stderr) - sys.exit(1) - +def cmd_gossip(args): + """Run the gossip relay daemon.""" identity = RadicleIdentity.load_or_generate(args.identity) - repo_id = args.repo_id or f"rad:local:{repo_path.name}" + _print_identity_info(args.identity) - print(f"Starting sync manager...") - print(f"Repository: {repo_path}") - print(f"Node ID: {identity.did}") - - sync_manager = SyncManager(identity=identity) - sync_manager.start() - sync_manager.register_repository(repo_id, repo_path) - - if args.peer: - # Connect to specific peer - try: - dest_hash = bytes.fromhex(args.peer) - except ValueError: - print(f"Error: Invalid peer hash", file=sys.stderr) + # Collect RIDs: explicit args + auto-detect from CWD + rids = list(args.rids) + if not rids: + rid = _detect_rid(Path.cwd()) + if rid: + print(f"Auto-detected RID: {rid}") + rids.append(rid) + else: + print("Error: no RIDs given and could not auto-detect from current directory.", file=sys.stderr) + print("Pass one or more RIDs as arguments, or run from inside a radicle repo.", file=sys.stderr) sys.exit(1) - print(f"Creating bundle for peer {args.peer}...") + nid = args.nid or detect_radicle_nid() + if nid: + print(f"Local NID: {nid}") - # Load peer's known refs if available - peer_refs = None - if args.peer_refs: - peer_refs = json.loads(Path(args.peer_refs).read_text()) + try: + announce_retry_delays = tuple( + int(x.strip()) for x in args.announce_retry_delays.split(",") if x.strip() + ) + except ValueError: + print("Error: --announce-retry-delays must be comma-separated integers.", file=sys.stderr) + sys.exit(1) - mode = SyncMode.INCREMENTAL if peer_refs else SyncMode.FULL - bundle = sync_manager.create_sync_bundle(repo_id, peer_refs=peer_refs, mode=mode) + relay = GossipRelay( + identity=identity, + rids=rids, + radicle_nid=nid, + bridge_port=args.bridge_port, + poll_interval=args.poll_interval, + announce_retry_delays=announce_retry_delays, + ) - if bundle: - print(f"Bundle created: {bundle.metadata.size_bytes:,} bytes") - print(f"Type: {bundle.metadata.bundle_type.value}") + def on_sync(rid, peer_nid): + print(f"[sync] {rid[:24]}... from {peer_nid[:24] if peer_nid else 'peer'}") - if args.send: - print(f"Sending to peer...") - if sync_manager.send_bundle(bundle, dest_hash): - print("Bundle sent successfully!") - else: - print("Failed to send bundle") - else: - print("No changes to sync") - else: - # Listen for incoming syncs - print("Listening for sync requests... Press Ctrl+C to stop.") + relay.set_on_sync_triggered(on_sync) + relay.start() - def on_bundle(bundle): - print(f"Received bundle: {bundle.metadata.repository_id}") - print(f" From: {bundle.metadata.source_node}") - print(f" Size: {bundle.metadata.size_bytes:,} bytes") + print() + print(f"Gossip relay running:") + print(f" RNS address: {relay.destination.hexhash}") + print(f" Repos: {', '.join(r[:28] for r in rids)}") + print(f" Poll: every {args.poll_interval}s") + print() + print("Press Ctrl+C to stop.") + print() - sync_manager.set_on_bundle_received(on_bundle) + running = True - running = True - def signal_handler(sig, frame): - nonlocal running - running = False + def signal_handler(sig, frame): + nonlocal running + print("\nShutting down...") + running = False - signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + last_stats = None + try: while running: - time.sleep(0.5) + stats = relay.get_stats() + if stats != last_stats: + print(f"[Status] Peers: {stats['known_peers']}, " + f"Repos: {stats['watched_repos']}, " + f"Refs: {stats['refs_per_repo']}") + last_stats = dict(stats) + time.sleep(5) + finally: + relay.stop() + print("Gossip relay stopped.") - sync_manager.stop() + +def cmd_seed(args): + """Start a dedicated seed radicle-node and bridge it to the mesh.""" + seed_home = Path(args.seed_home) + + seed = SeedNode(seed_home=seed_home, port=args.seed_port) + + # First-time setup: guide the user + if not seed.is_initialized(): + seed.write_config() + print(f"Seed home: {seed_home}") + print() + print("Seed identity not found. Initialize it with:") + print(f" RAD_HOME={seed_home} rad auth") + print() + print("Then run 'radicle-rns seed' again.") + sys.exit(1) + + # Start seed radicle-node + print(f"Starting seed radicle-node (port {args.seed_port})...") + try: + seed.start() + except RuntimeError as e: + print(f"Error: {e}", file=sys.stderr) + sys.exit(1) + + nid = seed.get_nid() + if not nid: + print("Error: could not get seed NID.", file=sys.stderr) + seed.stop() + sys.exit(1) + + # Start bridge pointing at the seed + identity = RadicleIdentity.load_or_generate(args.identity) + _print_identity_info(args.identity) + + try: + announce_retry_delays = tuple( + int(x.strip()) for x in args.announce_retry_delays.split(",") if x.strip() + ) + except ValueError: + print("Error: --announce-retry-delays must be comma-separated integers.", file=sys.stderr) + seed.stop() + sys.exit(1) + + bridge = RadicleBridge( + identity=identity, + listen_port=args.bridge_port, + radicle_host="127.0.0.1", + radicle_port=args.seed_port, + auto_connect=True, + auto_seed=True, + announce_retry_delays=announce_retry_delays, + rad_home=str(seed_home), + ) + bridge.set_local_radicle_nid(nid) + + def on_peer_seed_discovered(dest_hash, remote_nid=None): + nid_info = f" (NID: {remote_nid[:32]})" if remote_nid else "" + print(f"[+] Discovered remote seed: {dest_hash.hex()[:16]}{nid_info}") + + bridge.set_on_bridge_discovered(on_peer_seed_discovered) + + running = True + + def signal_handler(sig, frame): + nonlocal running + print("\nShutting down...") + running = False + + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + + try: + bridge.start() + + print() + print("Seed node running.") + print(f" Seed NID: {nid}") + print(f" Seed port: {args.seed_port}") + print(f" Bridge hash: {bridge.destination.hexhash}") + print() + print("Add this seed to your radicle node:") + print(f" rad node connect {nid}@127.0.0.1:{args.seed_port}") + print() + print("Other machines running 'radicle-rns seed' will discover this") + print("seed automatically and sync over the mesh.") + print() + print("Press Ctrl+C to stop.") + print() + + last_known_bridges = -1 + while running: + if not seed.is_running(): + print("Seed radicle-node exited unexpectedly.", file=sys.stderr) + break + + stats = bridge.get_stats() + if stats["known_bridges"] != last_known_bridges: + last_known_bridges = stats["known_bridges"] + print(f"[Status] Remote seeds: {stats['known_bridges']}, " + f"Tunnels: {stats['active_tunnels']}") + + time.sleep(5) + finally: + bridge.stop() + seed.stop() + print("Seed stopped.") def cmd_bridge(args): @@ -666,7 +539,6 @@ def cmd_bridge(args): # Show status changes stats = bridge.get_stats() if stats != last_stats: - bridges = bridge.get_remote_bridges() print(f"[Status] Tunnels: {stats['active_tunnels']}, " f"Remote bridges: {stats['known_bridges']}, " f"TX: {stats['bytes_sent']}, RX: {stats['bytes_received']}") @@ -740,57 +612,78 @@ def main(): ) add_identity_arg(peers_parser) - # bundle command group - bundle_parser = subparsers.add_parser("bundle", help="Git bundle operations") - bundle_subparsers = bundle_parser.add_subparsers(dest="bundle_command") - - # bundle create - bundle_create_parser = bundle_subparsers.add_parser("create", help="Create a bundle") - bundle_create_parser.add_argument("repo", help="Path to Git repository") - bundle_create_parser.add_argument("-o", "--output", help="Output bundle path") - bundle_create_parser.add_argument("--repo-id", help="Radicle repository ID") - bundle_create_parser.add_argument("--incremental", action="store_true", - help="Create incremental bundle") - bundle_create_parser.add_argument("--basis", help="Basis refs JSON file for incremental") - - # bundle apply - bundle_apply_parser = bundle_subparsers.add_parser("apply", help="Apply a bundle") - bundle_apply_parser.add_argument("bundle", help="Path to bundle file") - bundle_apply_parser.add_argument("repo", help="Path to Git repository") - - # bundle info - bundle_info_parser = bundle_subparsers.add_parser("info", help="Show bundle info") - bundle_info_parser.add_argument("bundle", help="Path to bundle file") - - # bundle qr-encode - bundle_qr_enc_parser = bundle_subparsers.add_parser( - "qr-encode", help=f"Encode bundle as QR code (max {2953} bytes)" + # gossip command + gossip_parser = subparsers.add_parser( + "gossip", + help="Run gossip relay: watch refs, notify peers, trigger rad sync", ) - bundle_qr_enc_parser.add_argument("bundle", help="Path to .radicle-bundle file") - bundle_qr_enc_parser.add_argument("-o", "--output", help="Output PNG path (requires Pillow)") - bundle_qr_enc_parser.add_argument( - "--ec", dest="error_correction", default="L", - choices=["L", "M", "Q", "H"], - help="QR error correction level (default: L = max capacity)", + gossip_parser.add_argument( + "rids", + nargs="*", + metavar="RID", + help="Radicle repo IDs to watch (auto-detected from CWD if omitted)", ) - - # bundle qr-decode - bundle_qr_dec_parser = bundle_subparsers.add_parser( - "qr-decode", help="Decode bundle from QR code image (requires pyzbar + Pillow)" + gossip_parser.add_argument( + "--nid", + help="Local radicle NID to advertise (auto-detected if omitted)", ) - bundle_qr_dec_parser.add_argument("image", help="Path to QR code image") - bundle_qr_dec_parser.add_argument("-o", "--output", help="Output .radicle-bundle path") + gossip_parser.add_argument( + "--bridge-port", + type=int, + default=8777, + metavar="PORT", + help="TCP port of the local bridge (default: 8777)", + ) + gossip_parser.add_argument( + "--poll-interval", + type=int, + default=30, + metavar="SECONDS", + help="Seconds between ref polls (default: 30)", + ) + gossip_parser.add_argument( + "--announce-retry-delays", + default="5,15,30", + metavar="SECONDS", + help="Startup re-announce delays, comma-separated (default: 5,15,30). " + "Use longer values on LoRa, e.g. 60,300,900", + ) + add_identity_arg(gossip_parser) - # sync command - sync_parser = subparsers.add_parser("sync", help="Sync repository with peers") - sync_parser.add_argument("repo", help="Path to Git repository") - sync_parser.add_argument("--repo-id", help="Radicle repository ID") - sync_parser.add_argument("--peer", help="Peer RNS hash to sync with") - sync_parser.add_argument("--peer-refs", help="JSON file with peer's known refs") - sync_parser.add_argument("--send", action="store_true", help="Send bundle to peer") - add_identity_arg(sync_parser) + # seed command + seed_parser = subparsers.add_parser( + "seed", + help="Start a Radicle seed node and bridge it to the mesh over Reticulum", + ) + seed_parser.add_argument( + "--seed-home", + default=str(DEFAULT_SEED_HOME), + metavar="PATH", + help=f"RAD_HOME for the seed node (default: {DEFAULT_SEED_HOME})", + ) + seed_parser.add_argument( + "--seed-port", + type=int, + default=DEFAULT_SEED_PORT, + metavar="PORT", + help=f"TCP port for the seed radicle-node (default: {DEFAULT_SEED_PORT})", + ) + seed_parser.add_argument( + "--bridge-port", + type=int, + default=8778, + metavar="PORT", + help="TCP listen port for the seed bridge (default: 8778)", + ) + seed_parser.add_argument( + "--announce-retry-delays", + default="5,15,30", + metavar="SECONDS", + help="Startup re-announce delays, comma-separated (default: 5,15,30). " + "Use longer values on LoRa, e.g. 60,300,900", + ) + add_identity_arg(seed_parser) - # bridge command bridge_parser = subparsers.add_parser("bridge", help="Run Radicle-Reticulum bridge") bridge_parser.add_argument( "-l", "--listen-port", @@ -855,22 +748,10 @@ def main(): cmd_ping(args) elif args.command == "peers": cmd_peers(args) - elif args.command == "bundle": - if args.bundle_command == "create": - cmd_bundle_create(args) - elif args.bundle_command == "apply": - cmd_bundle_apply(args) - elif args.bundle_command == "info": - cmd_bundle_info(args) - elif args.bundle_command == "qr-encode": - cmd_bundle_qr_encode(args) - elif args.bundle_command == "qr-decode": - cmd_bundle_qr_decode(args) - else: - bundle_parser.print_help() - sys.exit(1) - elif args.command == "sync": - cmd_sync(args) + elif args.command == "gossip": + cmd_gossip(args) + elif args.command == "seed": + cmd_seed(args) elif args.command == "bridge": cmd_bridge(args) else: diff --git a/src/radicle_reticulum/git_bundle.py b/src/radicle_reticulum/git_bundle.py deleted file mode 100644 index beac0e7..0000000 --- a/src/radicle_reticulum/git_bundle.py +++ /dev/null @@ -1,467 +0,0 @@ -"""Git bundle generation and application for Radicle repos. - -Supports both full repository bundles and incremental bundles -containing only new commits since a known state. - -Radicle stores data under these ref namespaces: -- refs/heads/* - Git branches -- refs/tags/* - Git tags -- refs/rad/id - Repository identity -- refs/rad/sigrefs - Signed refs -- refs/rad/cob/* - Collaborative Objects (issues, patches, etc.) - - refs/rad/cob/xyz.issue/* - - refs/rad/cob/xyz.patch/* -""" - -import os -import subprocess -import tempfile -import hashlib -from dataclasses import dataclass, field -from enum import Enum -from pathlib import Path -from typing import Dict, List, Optional, Set, Tuple -import struct -import time - - -class BundleType(Enum): - """Type of Git bundle.""" - FULL = "full" # Complete repository - INCREMENTAL = "incremental" # Only new commits - - -# Radicle ref patterns to include in sync -RADICLE_REF_PATTERNS = [ - "refs/heads/*", - "refs/tags/*", - "refs/rad/id", - "refs/rad/sigrefs", - "refs/rad/cob/*", -] - - -@dataclass -class BundleMetadata: - """Metadata about a Git bundle for transport.""" - bundle_type: BundleType - repository_id: str # Radicle repo ID (rad:z...) - source_node: str # DID of source node - timestamp: int # Unix timestamp ms - refs_included: List[str] # List of refs in bundle - prerequisites: List[str] # Commits required (for incremental) - size_bytes: int - checksum: bytes # SHA-256 of bundle data - - def encode(self) -> bytes: - """Encode metadata to bytes.""" - repo_bytes = self.repository_id.encode("utf-8") - node_bytes = self.source_node.encode("utf-8") - refs_data = b"".join( - struct.pack(f"!H{len(r)}s", len(r), r.encode("utf-8")) - for r in self.refs_included - ) - prereq_data = b"".join( - struct.pack(f"!H{len(p)}s", len(p), p.encode("utf-8")) - for p in self.prerequisites - ) - - return struct.pack( - f"!BH{len(repo_bytes)}sH{len(node_bytes)}sQIH{len(refs_data)}sH{len(prereq_data)}s32s", - 1 if self.bundle_type == BundleType.FULL else 2, - len(repo_bytes), repo_bytes, - len(node_bytes), node_bytes, - self.timestamp, - self.size_bytes, - len(self.refs_included), refs_data, - len(self.prerequisites), prereq_data, - self.checksum, - ) - - @classmethod - def decode(cls, data: bytes) -> Tuple["BundleMetadata", int]: - """Decode metadata from bytes. Returns (metadata, bytes_consumed).""" - offset = 0 - - # Bundle type - bundle_type_raw = struct.unpack("!B", data[offset:offset+1])[0] - bundle_type = BundleType.FULL if bundle_type_raw == 1 else BundleType.INCREMENTAL - offset += 1 - - # Repository ID - repo_len = struct.unpack("!H", data[offset:offset+2])[0] - offset += 2 - repository_id = data[offset:offset+repo_len].decode("utf-8") - offset += repo_len - - # Source node - node_len = struct.unpack("!H", data[offset:offset+2])[0] - offset += 2 - source_node = data[offset:offset+node_len].decode("utf-8") - offset += node_len - - # Timestamp and size - timestamp, size_bytes = struct.unpack("!QI", data[offset:offset+12]) - offset += 12 - - # Refs - refs_count = struct.unpack("!H", data[offset:offset+2])[0] - offset += 2 - refs_included = [] - for _ in range(refs_count): - ref_len = struct.unpack("!H", data[offset:offset+2])[0] - offset += 2 - refs_included.append(data[offset:offset+ref_len].decode("utf-8")) - offset += ref_len - - # Prerequisites - prereq_count = struct.unpack("!H", data[offset:offset+2])[0] - offset += 2 - prerequisites = [] - for _ in range(prereq_count): - prereq_len = struct.unpack("!H", data[offset:offset+2])[0] - offset += 2 - prerequisites.append(data[offset:offset+prereq_len].decode("utf-8")) - offset += prereq_len - - # Checksum - checksum = data[offset:offset+32] - offset += 32 - - return cls( - bundle_type=bundle_type, - repository_id=repository_id, - source_node=source_node, - timestamp=timestamp, - refs_included=refs_included, - prerequisites=prerequisites, - size_bytes=size_bytes, - checksum=checksum, - ), offset - - -@dataclass -class GitBundle: - """A Git bundle with metadata for transport.""" - metadata: BundleMetadata - data: bytes - - def encode(self) -> bytes: - """Encode bundle with metadata for transport.""" - meta_bytes = self.metadata.encode() - return struct.pack("!I", len(meta_bytes)) + meta_bytes + self.data - - @classmethod - def decode(cls, data: bytes) -> "GitBundle": - """Decode bundle from transport format.""" - meta_len = struct.unpack("!I", data[:4])[0] - metadata, _ = BundleMetadata.decode(data[4:4+meta_len]) - bundle_data = data[4+meta_len:] - - # Verify checksum - actual_checksum = hashlib.sha256(bundle_data).digest() - if actual_checksum != metadata.checksum: - raise ValueError("Bundle checksum mismatch") - - return cls(metadata=metadata, data=bundle_data) - - def save(self, path: Path) -> None: - """Save bundle data to file.""" - path.write_bytes(self.data) - - @property - def size(self) -> int: - """Get total size including metadata.""" - return len(self.encode()) - - -class GitBundleGenerator: - """Generates Git bundles from repositories.""" - - def __init__(self, repo_path: Path): - """Initialize with path to Git repository.""" - self.repo_path = Path(repo_path) - if not (self.repo_path / ".git").exists() and not (self.repo_path / "HEAD").exists(): - raise ValueError(f"Not a Git repository: {repo_path}") - - def _run_git(self, *args: str, check: bool = True) -> subprocess.CompletedProcess: - """Run a git command in the repository.""" - return subprocess.run( - ["git", *args], - cwd=self.repo_path, - capture_output=True, - text=True, - check=check, - ) - - def _run_git_binary(self, *args: str) -> bytes: - """Run a git command and return binary output.""" - result = subprocess.run( - ["git", *args], - cwd=self.repo_path, - capture_output=True, - check=True, - ) - return result.stdout - - def get_refs(self, patterns: Optional[List[str]] = None) -> Dict[str, str]: - """Get refs matching patterns. Returns {ref_name: commit_sha}.""" - if patterns is None: - patterns = RADICLE_REF_PATTERNS - - refs = {} - for pattern in patterns: - result = self._run_git("for-each-ref", "--format=%(refname) %(objectname)", pattern, check=False) - if result.returncode == 0: - for line in result.stdout.strip().split("\n"): - if line: - parts = line.split() - if len(parts) == 2: - refs[parts[0]] = parts[1] - return refs - - def get_radicle_repo_id(self) -> Optional[str]: - """Get Radicle repository ID if this is a Radicle repo.""" - # Radicle stores repo ID in .git/rad or config - rad_dir = self.repo_path / ".git" / "rad" - if rad_dir.exists(): - # Try to read from rad config - try: - result = self._run_git("config", "--get", "rad.id", check=False) - if result.returncode == 0: - return result.stdout.strip() - except Exception: - pass - return None - - def create_full_bundle( - self, - repository_id: str, - source_node: str, - output_path: Optional[Path] = None, - ref_patterns: Optional[List[str]] = None, - ) -> GitBundle: - """Create a full bundle containing all refs. - - Args: - repository_id: Radicle repo ID (rad:z...) - source_node: DID of the source node - output_path: Optional path to write bundle file - ref_patterns: Ref patterns to include (default: Radicle patterns) - """ - refs = self.get_refs(ref_patterns) - if not refs: - raise ValueError("No refs to bundle") - - # Create bundle with all refs - with tempfile.NamedTemporaryFile(suffix=".bundle", delete=False) as f: - bundle_path = f.name - - try: - # Build ref list for bundle create - ref_args = list(refs.keys()) - self._run_git("bundle", "create", bundle_path, *ref_args) - - bundle_data = Path(bundle_path).read_bytes() - finally: - os.unlink(bundle_path) - - metadata = BundleMetadata( - bundle_type=BundleType.FULL, - repository_id=repository_id, - source_node=source_node, - timestamp=int(time.time() * 1000), - refs_included=list(refs.keys()), - prerequisites=[], - size_bytes=len(bundle_data), - checksum=hashlib.sha256(bundle_data).digest(), - ) - - bundle = GitBundle(metadata=metadata, data=bundle_data) - - if output_path: - bundle.save(output_path) - - return bundle - - def create_incremental_bundle( - self, - repository_id: str, - source_node: str, - basis_refs: Dict[str, str], - output_path: Optional[Path] = None, - ref_patterns: Optional[List[str]] = None, - ) -> Optional[GitBundle]: - """Create an incremental bundle with only new commits. - - Args: - repository_id: Radicle repo ID - source_node: DID of the source node - basis_refs: Known refs at destination {ref_name: commit_sha} - output_path: Optional path to write bundle file - ref_patterns: Ref patterns to include - - Returns: - GitBundle if there are changes, None if no changes - """ - current_refs = self.get_refs(ref_patterns) - if not current_refs: - return None - - # Find refs that have changed or are new - changed_refs = {} - for ref, sha in current_refs.items(): - if ref not in basis_refs or basis_refs[ref] != sha: - changed_refs[ref] = sha - - if not changed_refs: - return None # No changes - - # Build exclusion list (commits the destination already has) - exclusions = [f"^{sha}" for sha in basis_refs.values() if sha] - - with tempfile.NamedTemporaryFile(suffix=".bundle", delete=False) as f: - bundle_path = f.name - - try: - # Create bundle with changed refs, excluding known commits - bundle_args = list(changed_refs.keys()) + exclusions - result = self._run_git("bundle", "create", bundle_path, *bundle_args, check=False) - - if result.returncode != 0: - # May fail if no new commits (all excluded) - if "empty bundle" in result.stderr.lower(): - return None - raise subprocess.CalledProcessError(result.returncode, "git bundle create", result.stderr) - - bundle_data = Path(bundle_path).read_bytes() - finally: - if os.path.exists(bundle_path): - os.unlink(bundle_path) - - metadata = BundleMetadata( - bundle_type=BundleType.INCREMENTAL, - repository_id=repository_id, - source_node=source_node, - timestamp=int(time.time() * 1000), - refs_included=list(changed_refs.keys()), - prerequisites=list(basis_refs.values()), - size_bytes=len(bundle_data), - checksum=hashlib.sha256(bundle_data).digest(), - ) - - bundle = GitBundle(metadata=metadata, data=bundle_data) - - if output_path: - bundle.save(output_path) - - return bundle - - -class GitBundleApplicator: - """Applies Git bundles to repositories.""" - - def __init__(self, repo_path: Path): - """Initialize with path to Git repository.""" - self.repo_path = Path(repo_path) - - def _run_git(self, *args: str, check: bool = True) -> subprocess.CompletedProcess: - """Run a git command in the repository.""" - return subprocess.run( - ["git", *args], - cwd=self.repo_path, - capture_output=True, - text=True, - check=check, - ) - - def verify_bundle(self, bundle: GitBundle) -> Tuple[bool, str]: - """Verify a bundle can be applied. - - Returns (success, message). - """ - with tempfile.NamedTemporaryFile(suffix=".bundle", delete=False) as f: - f.write(bundle.data) - bundle_path = f.name - - try: - result = self._run_git("bundle", "verify", bundle_path, check=False) - if result.returncode == 0: - return True, "Bundle verified successfully" - else: - return False, result.stderr.strip() - finally: - os.unlink(bundle_path) - - def apply_bundle(self, bundle: GitBundle, fetch_all: bool = True) -> Dict[str, str]: - """Apply a bundle to the repository. - - Args: - bundle: The GitBundle to apply - fetch_all: If True, fetch all refs from bundle - - Returns: - Dict of applied refs {ref_name: commit_sha} - """ - with tempfile.NamedTemporaryFile(suffix=".bundle", delete=False) as f: - f.write(bundle.data) - bundle_path = f.name - - try: - # Verify first - ok, msg = self.verify_bundle(bundle) - if not ok: - raise ValueError(f"Bundle verification failed: {msg}") - - # List refs in bundle - result = self._run_git("bundle", "list-heads", bundle_path) - bundle_refs = {} - for line in result.stdout.strip().split("\n"): - if line: - parts = line.split() - if len(parts) >= 2: - bundle_refs[parts[1]] = parts[0] - - # Fetch from bundle - if fetch_all: - # Fetch all refs, preserving their names - for ref in bundle_refs: - self._run_git("fetch", bundle_path, f"{ref}:{ref}", check=False) - else: - self._run_git("fetch", bundle_path) - - return bundle_refs - finally: - os.unlink(bundle_path) - - def get_current_refs(self, patterns: Optional[List[str]] = None) -> Dict[str, str]: - """Get current refs for computing incremental basis.""" - if patterns is None: - patterns = RADICLE_REF_PATTERNS - - refs = {} - for pattern in patterns: - result = self._run_git("for-each-ref", "--format=%(refname) %(objectname)", pattern, check=False) - if result.returncode == 0: - for line in result.stdout.strip().split("\n"): - if line: - parts = line.split() - if len(parts) == 2: - refs[parts[0]] = parts[1] - return refs - - -def estimate_bundle_size(repo_path: Path, ref_patterns: Optional[List[str]] = None) -> int: - """Estimate the size of a full bundle without creating it.""" - result = subprocess.run( - ["git", "count-objects", "-v"], - cwd=repo_path, - capture_output=True, - text=True, - ) - # Parse size-pack from output - for line in result.stdout.split("\n"): - if line.startswith("size-pack:"): - # size-pack is in KB - return int(line.split(":")[1].strip()) * 1024 - return 0 diff --git a/src/radicle_reticulum/gossip.py b/src/radicle_reticulum/gossip.py new file mode 100644 index 0000000..75cec17 --- /dev/null +++ b/src/radicle_reticulum/gossip.py @@ -0,0 +1,374 @@ +"""Gossip relay for Radicle over Reticulum. + +Watches local Radicle storage for ref changes and sends tiny notification +packets to peer relays over RNS (including LoRa). On receipt, calls +'rad sync --fetch' so radicle-node pulls the actual git data through the +TCP bridge. + +Flow: + refs change in ~/.radicle/storage// + → detect via poll + → RNS packet "refs changed for rid X, nid Y" (~200-500 bytes) + → peer relay receives it + → peer calls: rad sync --fetch --rid X + → radicle-node fetches via TCP bridge +""" + +import json +import struct +import subprocess +import threading +import time +from pathlib import Path +from typing import Callable, Dict, List, Optional, Tuple + +import RNS + +from radicle_reticulum.identity import RadicleIdentity + + +APP_NAME = "radicle" +ASPECT_GOSSIP = "gossip" +GOSSIP_MAGIC = b"RADICLE_GOSSIP_V1" + +DEFAULT_POLL_INTERVAL = 30 # seconds +PATH_REQUEST_TIMEOUT = 15 # seconds to wait for a path before giving up + + +def _radicle_storage_path() -> Path: + """Return ~/.radicle/storage, using 'rad path' if available.""" + try: + result = subprocess.run( + ["rad", "path"], + capture_output=True, text=True, timeout=5, + ) + if result.returncode == 0: + return Path(result.stdout.strip()) / "storage" + except Exception: + pass + return Path.home() / ".radicle" / "storage" + + +def _read_refs(storage: Path, rid: str) -> Dict[str, str]: + """Read current git refs from radicle storage for a repo. + + Returns {ref_name: sha} or empty dict if the repo isn't in storage yet. + """ + rid_hash = rid.removeprefix("rad:") + repo_path = storage / rid_hash + if not repo_path.exists(): + return {} + try: + result = subprocess.run( + ["git", "show-ref"], + cwd=repo_path, + capture_output=True, text=True, timeout=10, + ) + refs: Dict[str, str] = {} + for line in result.stdout.splitlines(): + parts = line.split(maxsplit=1) + if len(parts) == 2: + refs[parts[1]] = parts[0] + return refs + except Exception: + return {} + + +class GossipRelay: + """Watches Radicle refs and notifies peers over RNS when they change. + + Designed to run alongside the TCP bridge. The bridge carries the actual + git pack data; this relay only sends tiny "go fetch" signals. + """ + + def __init__( + self, + identity: RadicleIdentity, + rids: List[str], + storage: Optional[Path] = None, + radicle_nid: Optional[str] = None, + bridge_port: int = 8777, + poll_interval: int = DEFAULT_POLL_INTERVAL, + announce_retry_delays: Tuple[int, ...] = (5, 15, 30), + config_path: Optional[str] = None, + ): + """ + Args: + identity: RNS/Radicle identity for this relay. + rids: List of Radicle repository IDs to watch (e.g. 'rad:z3abc...'). + storage: Path to radicle storage dir. Auto-detected if None. + radicle_nid: Local radicle NID to advertise to peers. + bridge_port: TCP port the bridge listens on (for rad node connect). + poll_interval: Seconds between ref polls. + announce_retry_delays: Startup re-announce delays (seconds). + config_path: Reticulum config path (None = default). + """ + self.identity = identity + self.rids = list(rids) + self.storage = storage or _radicle_storage_path() + self.radicle_nid = radicle_nid + self.bridge_port = bridge_port + self.poll_interval = poll_interval + self.announce_retry_delays = announce_retry_delays + + self.reticulum = RNS.Reticulum(config_path) + + self.destination = RNS.Destination( + identity.rns_identity, + RNS.Destination.IN, + RNS.Destination.SINGLE, + APP_NAME, + ASPECT_GOSSIP, + ) + self.destination.set_packet_callback(self._on_packet) + + self._known_peers: Dict[bytes, float] = {} # dest_hash -> last_seen + self._peers_lock = threading.Lock() + self._known_refs: Dict[str, Dict[str, str]] = {} # rid -> refs + self._refs_lock = threading.Lock() + self._running = False + + self._on_sync_triggered: Optional[Callable[[str, str], None]] = None + + # ── Lifecycle ──────────────────────────────────────────────────────────── + + def start(self): + """Start the relay: announce, begin polling, register announce handler.""" + self._running = True + RNS.Transport.register_announce_handler(self._on_announce) + self.announce() + threading.Thread(target=self._startup_announce_loop, daemon=True).start() + threading.Thread(target=self._poll_loop, daemon=True).start() + RNS.log(f"Gossip relay started: {self.destination.hexhash}", RNS.LOG_INFO) + RNS.log( + f" Watching {len(self.rids)} repo(s), polling every {self.poll_interval}s", + RNS.LOG_INFO, + ) + + def stop(self): + """Stop the relay.""" + self._running = False + RNS.log("Gossip relay stopped", RNS.LOG_INFO) + + # ── Public API ─────────────────────────────────────────────────────────── + + def announce(self): + """Announce this relay on the RNS network.""" + app_data = GOSSIP_MAGIC + if self.radicle_nid: + nid_bytes = self.radicle_nid.encode() + app_data += struct.pack("!H", len(nid_bytes)) + nid_bytes + self.destination.announce(app_data=app_data) + RNS.log(f"Gossip relay announced: {self.destination.hexhash}", RNS.LOG_DEBUG) + + def set_on_sync_triggered(self, callback: Callable[[str, str], None]): + """Set callback invoked when an incoming gossip message triggers a sync. + + Callback signature: callback(rid: str, nid: str) + """ + self._on_sync_triggered = callback + + def push_refs_now(self, rid: str): + """Immediately broadcast current refs for a repo to all known peers.""" + refs = _read_refs(self.storage, rid) + if refs: + with self._refs_lock: + self._known_refs[rid] = refs + self._broadcast(rid, refs) + + def get_stats(self) -> dict: + with self._peers_lock: + peer_count = len(self._known_peers) + with self._refs_lock: + refs_per_repo = {rid: len(r) for rid, r in self._known_refs.items()} + return { + "known_peers": peer_count, + "watched_repos": len(self.rids), + "refs_per_repo": refs_per_repo, + } + + # ── Internal: polling ──────────────────────────────────────────────────── + + def _startup_announce_loop(self): + for delay in self.announce_retry_delays: + time.sleep(delay) + if not self._running: + return + self.announce() + + def _poll_loop(self): + while self._running: + for rid in self.rids: + try: + refs = _read_refs(self.storage, rid) + with self._refs_lock: + old = self._known_refs.get(rid) + changed = bool(refs and refs != old) + first_poll = old is None + if changed: + self._known_refs[rid] = refs + if changed and not first_poll: + self._broadcast(rid, refs) + except Exception as e: + RNS.log(f"Gossip poll error ({rid[:20]}): {e}", RNS.LOG_WARNING) + + # Interruptible sleep + for _ in range(self.poll_interval): + if not self._running: + return + time.sleep(1) + + # ── Internal: sending ──────────────────────────────────────────────────── + + def _broadcast(self, rid: str, refs: Dict[str, str]): + payload = json.dumps({ + "type": "refs", + "rid": rid, + "nid": self.radicle_nid or "", + "refs": refs, + }).encode() + + with self._peers_lock: + peers = list(self._known_peers.keys()) + + sent = sum(1 for h in peers if self._send_packet(h, payload)) + RNS.log(f"Broadcast refs for {rid[:20]}... → {sent}/{len(peers)} peers", RNS.LOG_INFO) + + def _send_packet(self, peer_hash: bytes, payload: bytes) -> bool: + try: + if not RNS.Transport.has_path(peer_hash): + RNS.Transport.request_path(peer_hash) + deadline = time.time() + PATH_REQUEST_TIMEOUT + while not RNS.Transport.has_path(peer_hash): + if time.time() > deadline: + RNS.log( + f"No path to gossip peer {peer_hash.hex()[:16]}", + RNS.LOG_WARNING, + ) + return False + time.sleep(0.2) + + peer_identity = RNS.Identity.recall(peer_hash) + if peer_identity is None: + RNS.log( + f"Identity not known for {peer_hash.hex()[:16]}", + RNS.LOG_WARNING, + ) + return False + + dest = RNS.Destination( + peer_identity, + RNS.Destination.OUT, + RNS.Destination.SINGLE, + APP_NAME, + ASPECT_GOSSIP, + ) + RNS.Packet(dest, payload).send() + return True + except Exception as e: + RNS.log(f"Gossip send error: {e}", RNS.LOG_WARNING) + return False + + # ── Internal: receiving ────────────────────────────────────────────────── + + def _on_packet(self, data: bytes, packet: RNS.Packet): + try: + msg = json.loads(data.decode()) + except Exception: + return + + if msg.get("type") != "refs": + return + + rid: str = msg.get("rid", "") + nid: str = msg.get("nid", "") + remote_refs: Dict[str, str] = msg.get("refs", {}) + + if not rid or not remote_refs: + return + + with self._refs_lock: + local_refs = self._known_refs.get(rid, {}) + changed = any(remote_refs.get(r) != local_refs.get(r) for r in remote_refs) + + if changed: + RNS.log( + f"Gossip: new refs for {rid[:20]}... from {nid[:24] if nid else 'unknown'}", + RNS.LOG_INFO, + ) + threading.Thread( + target=self._trigger_sync, + args=(rid, nid), + daemon=True, + ).start() + + def _trigger_sync(self, rid: str, nid: str): + """Run rad node connect (if needed) then rad sync --fetch.""" + if nid: + subprocess.run( + ["rad", "node", "connect", f"{nid}@127.0.0.1:{self.bridge_port}"], + capture_output=True, timeout=15, + ) + + result = subprocess.run( + ["rad", "sync", "--fetch", "--rid", rid], + capture_output=True, text=True, timeout=120, + ) + + if result.returncode == 0: + RNS.log(f"Sync succeeded: {rid[:20]}", RNS.LOG_INFO) + else: + stderr = result.stderr.strip() + RNS.log( + f"Sync failed for {rid[:20]}: {stderr[:120] if stderr else '(no output)'}", + RNS.LOG_WARNING, + ) + + if self._on_sync_triggered: + self._on_sync_triggered(rid, nid) + + # ── Internal: peer discovery ───────────────────────────────────────────── + + def _on_announce( + self, + destination_hash: bytes, + announced_identity: RNS.Identity, + app_data: Optional[bytes], + ): + if destination_hash == self.destination.hash: + return + if not app_data or not app_data.startswith(GOSSIP_MAGIC): + return + + radicle_nid: Optional[str] = None + if len(app_data) > len(GOSSIP_MAGIC): + try: + offset = len(GOSSIP_MAGIC) + nid_len = struct.unpack("!H", app_data[offset:offset + 2])[0] + raw = app_data[offset + 2: offset + 2 + nid_len] + radicle_nid = raw.decode() or None + except Exception: + pass + + with self._peers_lock: + is_new = destination_hash not in self._known_peers + self._known_peers[destination_hash] = time.time() + + if is_new: + RNS.log( + f"Discovered gossip peer: {destination_hash.hex()[:16]}" + + (f" (NID: {radicle_nid[:32]})" if radicle_nid else ""), + RNS.LOG_INFO, + ) + # Send our current refs so the peer knows our state immediately + for rid in self.rids: + with self._refs_lock: + refs = self._known_refs.get(rid) + if refs: + payload = json.dumps({ + "type": "refs", + "rid": rid, + "nid": self.radicle_nid or "", + "refs": refs, + }).encode() + self._send_packet(destination_hash, payload) diff --git a/src/radicle_reticulum/qr.py b/src/radicle_reticulum/qr.py deleted file mode 100644 index d521efa..0000000 --- a/src/radicle_reticulum/qr.py +++ /dev/null @@ -1,191 +0,0 @@ -"""QR code encoding/decoding for Radicle bundles. - -Enables visual transfer of tiny bundles (≤2953 bytes) without any network -connection — useful for air-gapped machines, paper backups, or "dead drop" -workflows where two people briefly see each other's screens. - -Requires the optional 'qrcode' package: - pip install radicle-reticulum[qr] # or: pip install qrcode -""" - -import base64 -import hashlib -from pathlib import Path -from typing import Optional - -from radicle_reticulum.git_bundle import GitBundle - -# QR code capacity: version 40, binary mode, error correction L -QR_MAX_BYTES = 2953 - -# Magic header to identify Radicle QR payloads (8 bytes) -QR_MAGIC = b"RADQR\x01\x00\x00" - - -class BundleTooLargeForQR(ValueError): - """Raised when a bundle exceeds QR capacity.""" - pass - - -def encode_bundle_to_qr( - bundle: GitBundle, - output_path: Optional[Path] = None, - error_correction: str = "L", -) -> str: - """Encode a GitBundle as a QR code. - - The bundle data is base64-encoded and wrapped with a magic prefix + SHA-256 - checksum so the receiver can verify integrity after scanning. - - Args: - bundle: The GitBundle to encode. Must be ≤ QR_MAX_BYTES when serialised. - output_path: If given, write a PNG image to this path (requires qrcode[pil]). - If None, returns terminal-printable ASCII art. - error_correction: QR error correction level: L, M, Q, or H (default L - gives maximum data capacity). - - Returns: - ASCII art string for terminal display (always), and also writes PNG if - output_path is specified. - - Raises: - BundleTooLargeForQR: If the serialised bundle exceeds 2953 bytes. - ImportError: If the 'qrcode' package is not installed. - """ - try: - import qrcode - from qrcode.constants import ERROR_CORRECT_L, ERROR_CORRECT_M, ERROR_CORRECT_Q, ERROR_CORRECT_H - except ImportError as e: - raise ImportError( - "The 'qrcode' package is required for QR encoding. " - "Install it with: pip install 'radicle-reticulum[qr]' or pip install qrcode" - ) from e - - ec_map = { - "L": ERROR_CORRECT_L, - "M": ERROR_CORRECT_M, - "Q": ERROR_CORRECT_Q, - "H": ERROR_CORRECT_H, - } - ec = ec_map.get(error_correction.upper(), ERROR_CORRECT_L) - - bundle_bytes = bundle.encode() - if len(bundle_bytes) > QR_MAX_BYTES: - raise BundleTooLargeForQR( - f"Bundle is {len(bundle_bytes)} bytes, exceeds QR capacity of {QR_MAX_BYTES} bytes. " - "Use a different sync strategy for larger bundles." - ) - - # Payload: magic + 4-byte length + checksum (32 bytes) + bundle data - checksum = hashlib.sha256(bundle_bytes).digest() - length_prefix = len(bundle_bytes).to_bytes(4, "big") - payload = QR_MAGIC + length_prefix + checksum + bundle_bytes - - qr = qrcode.QRCode( - version=None, # auto-select - error_correction=ec, - box_size=10, - border=4, - ) - qr.add_data(payload) - qr.make(fit=True) - - if output_path is not None: - try: - from PIL import Image - img = qr.make_image(fill_color="black", back_color="white") - img.save(str(output_path)) - except ImportError: - raise ImportError( - "Saving QR images requires Pillow. " - "Install it with: pip install 'qrcode[pil]'" - ) - - # Always return ASCII art for terminal - import io - buf = io.StringIO() - qr.print_ascii(out=buf, invert=True) - return buf.getvalue() - - -def decode_bundle_from_qr_data(data: bytes) -> GitBundle: - """Decode a GitBundle from raw QR payload bytes (as scanned from a QR code). - - This is the inverse of encode_bundle_to_qr. The data should be the raw - bytes scanned from the QR code. - - Args: - data: Raw bytes decoded from a QR code scan. - - Returns: - The decoded GitBundle. - - Raises: - ValueError: If the data is not a valid Radicle QR payload. - """ - if not data.startswith(QR_MAGIC): - raise ValueError( - f"Not a Radicle QR payload (expected magic {QR_MAGIC!r}, " - f"got {data[:len(QR_MAGIC)]!r})" - ) - - offset = len(QR_MAGIC) - bundle_len = int.from_bytes(data[offset:offset + 4], "big") - offset += 4 - - stored_checksum = data[offset:offset + 32] - offset += 32 - - bundle_bytes = data[offset:offset + bundle_len] - if len(bundle_bytes) != bundle_len: - raise ValueError( - f"Truncated QR payload: expected {bundle_len} bytes, got {len(bundle_bytes)}" - ) - - actual_checksum = hashlib.sha256(bundle_bytes).digest() - if actual_checksum != stored_checksum: - raise ValueError( - f"QR payload checksum mismatch: data may be corrupted" - ) - - return GitBundle.decode(bundle_bytes) - - -def decode_bundle_from_qr_image(image_path: Path) -> GitBundle: - """Decode a GitBundle from a QR code image file. - - Requires pyzbar and Pillow: - pip install pyzbar Pillow - - Args: - image_path: Path to the QR code image (PNG, JPG, etc.) - - Returns: - The decoded GitBundle. - - Raises: - ImportError: If pyzbar or Pillow are not installed. - ValueError: If no valid Radicle QR code is found in the image. - """ - try: - from pyzbar.pyzbar import decode as pyzbar_decode - from PIL import Image - except ImportError as e: - raise ImportError( - "Decoding QR images requires pyzbar and Pillow. " - "Install them with: pip install pyzbar Pillow" - ) from e - - image = Image.open(str(image_path)) - decoded_objects = pyzbar_decode(image) - - for obj in decoded_objects: - try: - return decode_bundle_from_qr_data(obj.data) - except ValueError: - continue - - raise ValueError( - f"No valid Radicle QR code found in {image_path}. " - "Make sure the image contains a QR code created by 'radicle-rns bundle qr-encode'." - ) diff --git a/src/radicle_reticulum/seed.py b/src/radicle_reticulum/seed.py new file mode 100644 index 0000000..bb53c58 --- /dev/null +++ b/src/radicle_reticulum/seed.py @@ -0,0 +1,174 @@ +"""Radicle seed node manager. + +Starts and manages a dedicated radicle-node configured as a seed server +(storage.policy = allow). The seed runs under its own RAD_HOME so it +doesn't interfere with the user's own radicle identity. + +Users add the seed as a peer once: + rad node connect @127.0.0.1: + +The bridge then connects this seed to remote seeds over Reticulum. +""" + +import json +import os +import socket +import subprocess +import time +from pathlib import Path +from typing import Optional + + +DEFAULT_SEED_HOME = Path.home() / ".radicle-rns" / "seed" +DEFAULT_SEED_PORT = 8779 + + +SEED_CONFIG = { + "node": { + "alias": "radicle-rns-seed", + "listen": [], # filled in by write_config() + "connect": [], + "externalAddresses": [], + "seedingPolicy": { + "default": "allow", + "repos": {} + }, + "db": {"journalMode": "wal"}, + } +} + + +class SeedNode: + """Manages a dedicated radicle-node process configured as a seed.""" + + def __init__( + self, + seed_home: Path = DEFAULT_SEED_HOME, + port: int = DEFAULT_SEED_PORT, + ): + self.seed_home = Path(seed_home) + self.port = port + self._process: Optional[subprocess.Popen] = None + + # ── Setup ──────────────────────────────────────────────────────────────── + + def is_initialized(self) -> bool: + """Return True if rad auth has been run for this seed home.""" + try: + result = subprocess.run( + ["rad", "self"], + env=self._env(), + capture_output=True, + text=True, + timeout=5, + ) + return result.returncode == 0 and "NID" in result.stdout + except Exception: + return False + + def write_config(self): + """Write config.json only if it does not already exist.""" + self.seed_home.mkdir(parents=True, exist_ok=True) + config_path = self.seed_home / "config.json" + if config_path.exists(): + return + config = json.loads(json.dumps(SEED_CONFIG)) # deep copy + config["node"]["listen"] = [f"127.0.0.1:{self.port}"] + config_path.write_text(json.dumps(config, indent=2)) + + # ── Lifecycle ───────────────────────────────────────────────────────────── + + def start(self): + """Start radicle-node for the seed. Raises if not initialized.""" + if not self.is_initialized(): + raise RuntimeError( + f"Seed identity not found. Initialize it with:\n" + f" RAD_HOME={self.seed_home} rad auth\n" + f"Then run 'radicle-rns seed' again." + ) + + self.write_config() + + # Discard stdout/stderr — if kept as PIPE the buffer fills and the + # process deadlocks once radicle-node produces more than ~64 KB of logs. + self._process = subprocess.Popen( + ["radicle-node"], + env=self._env(), + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + ) + + # Wait up to 10s for the port to open. + deadline = time.time() + 10 + while time.time() < deadline: + if self._process.poll() is not None: + raise RuntimeError( + f"Seed radicle-node exited immediately after launch. " + f"Check: RAD_HOME={self.seed_home} radicle-node" + ) + if self._port_open(): + return + time.sleep(0.3) + + if self._process.poll() is not None: + raise RuntimeError("Seed radicle-node exited before port opened.") + + # Still running but port not open — may be slow on first run; continue. + print(f"Warning: seed port {self.port} not yet open after 10s, continuing anyway.") + + def stop(self): + """Stop the seed radicle-node.""" + if self._process and self._process.poll() is None: + self._process.terminate() + try: + self._process.wait(timeout=5) + except subprocess.TimeoutExpired: + self._process.kill() + self._process.wait() # reap zombie + + def is_running(self) -> bool: + return self._process is not None and self._process.poll() is None + + # ── Queries ─────────────────────────────────────────────────────────────── + + def get_nid(self) -> Optional[str]: + """Return the seed's radicle NID (z6Mk...).""" + result = subprocess.run( + ["rad", "self"], + env=self._env(), + capture_output=True, + text=True, + timeout=5, + ) + if result.returncode != 0: + return None + for line in result.stdout.splitlines(): + parts = line.split() + if len(parts) >= 2 and parts[0] == "NID": + return parts[1] + return None + + def connect_peer(self, nid: str, addr: str) -> bool: + """Tell the seed to connect to a remote peer.""" + result = subprocess.run( + ["rad", "node", "connect", f"{nid}@{addr}"], + env=self._env(), + capture_output=True, + text=True, + timeout=30, + ) + return result.returncode == 0 + + # ── Internal ────────────────────────────────────────────────────────────── + + def _env(self) -> dict: + env = os.environ.copy() + env["RAD_HOME"] = str(self.seed_home) + return env + + def _port_open(self) -> bool: + try: + with socket.create_connection(("127.0.0.1", self.port), timeout=0.5): + return True + except OSError: + return False diff --git a/src/radicle_reticulum/sync.py b/src/radicle_reticulum/sync.py deleted file mode 100644 index 5faee9f..0000000 --- a/src/radicle_reticulum/sync.py +++ /dev/null @@ -1,770 +0,0 @@ -"""Sync manager for Radicle repositories over Reticulum. - -Handles repository synchronization using: -- Full bundles for initial clone or fast networks -- Incremental bundles for updates over low-bandwidth links (LoRa) -- LXMF store-and-forward for offline peers -""" - -import os -import threading -import time -import hashlib -from dataclasses import dataclass, field -from enum import Enum -from pathlib import Path -from typing import Callable, Dict, List, Optional, Set, Tuple - -import RNS -import LXMF - -from radicle_reticulum.identity import RadicleIdentity -from radicle_reticulum.git_bundle import ( - GitBundle, - GitBundleGenerator, - GitBundleApplicator, - BundleMetadata, - BundleType, - RADICLE_REF_PATTERNS, - estimate_bundle_size, -) - - -# LXMF content type for Radicle bundles -CONTENT_TYPE_BUNDLE = 0x52 # 'R' for Radicle — complete bundle -CONTENT_TYPE_BUNDLE_CHUNK = 0x53 # chunked fragment of a large bundle -CONTENT_TYPE_BUNDLE_REQUEST = 0x55 -CONTENT_TYPE_REFS_ANNOUNCE = 0x54 - -# Chunk header layout: bundle_id(16) + chunk_num(2) + total_chunks(2) = 20 bytes -CHUNK_HEADER_SIZE = 20 - -# Maximum LXMF message size (configurable, default ~500KB for LoRa-friendly chunks) -DEFAULT_MAX_LXMF_SIZE = 500 * 1024 - -# For LoRa, much smaller chunks -LORA_MAX_LXMF_SIZE = 32 * 1024 - - -class SyncMode(Enum): - """Sync mode based on transport capability.""" - FULL = "full" # Fast network - send full bundles - INCREMENTAL = "incremental" # Slow network - send only changes - AUTO = "auto" # Detect based on link quality - - -@dataclass -class RepoSyncState: - """Tracks sync state for a repository.""" - repository_id: str - local_path: Path - known_peers: Dict[str, Dict[str, str]] = field(default_factory=dict) # peer_did -> {ref: sha} - last_sync: Dict[str, float] = field(default_factory=dict) # peer_did -> timestamp - pending_bundles: List[bytes] = field(default_factory=list) - - -@dataclass -class RefsAnnouncement: - """Announces current ref state for a repository.""" - repository_id: str - node_id: str - refs: Dict[str, str] # {ref_name: commit_sha} - timestamp: int - - def encode(self) -> bytes: - """Encode to bytes for LXMF transport.""" - import struct - - repo_bytes = self.repository_id.encode("utf-8") - node_bytes = self.node_id.encode("utf-8") - - refs_data = b"" - for ref, sha in self.refs.items(): - ref_bytes = ref.encode("utf-8") - sha_bytes = sha.encode("utf-8") - refs_data += struct.pack(f"!H{len(ref_bytes)}sH{len(sha_bytes)}s", - len(ref_bytes), ref_bytes, - len(sha_bytes), sha_bytes) - - return struct.pack( - f"!H{len(repo_bytes)}sH{len(node_bytes)}sQH", - len(repo_bytes), repo_bytes, - len(node_bytes), node_bytes, - self.timestamp, - len(self.refs), - ) + refs_data - - @classmethod - def decode(cls, data: bytes) -> "RefsAnnouncement": - """Decode from bytes.""" - import struct - offset = 0 - - repo_len = struct.unpack("!H", data[offset:offset+2])[0] - offset += 2 - repository_id = data[offset:offset+repo_len].decode("utf-8") - offset += repo_len - - node_len = struct.unpack("!H", data[offset:offset+2])[0] - offset += 2 - node_id = data[offset:offset+node_len].decode("utf-8") - offset += node_len - - timestamp, refs_count = struct.unpack("!QH", data[offset:offset+10]) - offset += 10 - - refs = {} - for _ in range(refs_count): - ref_len = struct.unpack("!H", data[offset:offset+2])[0] - offset += 2 - ref = data[offset:offset+ref_len].decode("utf-8") - offset += ref_len - - sha_len = struct.unpack("!H", data[offset:offset+2])[0] - offset += 2 - sha = data[offset:offset+sha_len].decode("utf-8") - offset += sha_len - - refs[ref] = sha - - return cls( - repository_id=repository_id, - node_id=node_id, - refs=refs, - timestamp=timestamp, - ) - - -class SyncManager: - """Manages repository synchronization over Reticulum. - - Features: - - Full bundle sync for initial clone / fast networks - - Incremental sync for LoRa / low-bandwidth - - LXMF store-and-forward for offline peers - - Automatic chunking for large bundles - """ - - def __init__( - self, - identity: RadicleIdentity, - storage_path: Optional[Path] = None, - max_bundle_size: int = DEFAULT_MAX_LXMF_SIZE, - auto_push: bool = False, - ): - """Initialize sync manager. - - Args: - identity: Local node identity - storage_path: Path for storing sync state and pending bundles - max_bundle_size: Maximum bundle size before chunking - auto_push: If True, speculatively push incremental bundles to peers - when we receive their RefsAnnouncement and we have newer data. - Eliminates the request round-trip for common sync patterns. - """ - self.identity = identity - self.max_bundle_size = max_bundle_size - self.auto_push = auto_push - - # Storage for sync state - if storage_path is None: - storage_path = Path.home() / ".radicle-rns" - self.storage_path = Path(storage_path) - self.storage_path.mkdir(parents=True, exist_ok=True) - - # Repository tracking - self._repos: Dict[str, RepoSyncState] = {} - self._repos_lock = threading.Lock() - - # LXMF setup - self._lxmf_router: Optional[LXMF.LXMRouter] = None - self._lxmf_destination: Optional[LXMF.LXMFDeliveryDestination] = None - - # Known peers: lxmf_source_hash (bytes) -> last_seen (float) - self._known_peers: Dict[bytes, float] = {} - self._peers_lock = threading.Lock() - - # Chunk reassembly buffers: bundle_id (16 bytes) -> {chunk_num: data} - self._chunk_buffers: Dict[bytes, Dict[int, bytes]] = {} - self._chunk_totals: Dict[bytes, int] = {} # bundle_id -> total_chunks - self._chunk_lock = threading.Lock() - - # Callbacks - self._on_bundle_received: Optional[Callable[[GitBundle], None]] = None - self._on_refs_announced: Optional[Callable[[RefsAnnouncement], None]] = None - - def start(self, reticulum: Optional[RNS.Reticulum] = None): - """Start the sync manager and LXMF router.""" - if reticulum is None: - reticulum = RNS.Reticulum() - - # Create LXMF router for store-and-forward - self._lxmf_router = LXMF.LXMRouter( - identity=self.identity.rns_identity, - storagepath=str(self.storage_path / "lxmf"), - ) - - # Create our LXMF destination - self._lxmf_destination = self._lxmf_router.register_delivery_identity( - self.identity.rns_identity, - display_name=f"Radicle Node {self.identity.rns_hash_hex[:8]}", - ) - self._lxmf_destination.set_delivery_callback(self._on_lxmf_delivery) - - RNS.log(f"Sync manager started", RNS.LOG_INFO) - RNS.log(f" LXMF address: {self._lxmf_destination.hash.hex()}", RNS.LOG_INFO) - - def stop(self): - """Stop the sync manager.""" - if self._lxmf_router: - # LXMF router cleanup - pass - RNS.log("Sync manager stopped", RNS.LOG_INFO) - - def register_peer(self, destination_hash: bytes): - """Register a known peer by their LXMF source hash. - - Peers are also learned automatically from incoming messages. - """ - with self._peers_lock: - self._known_peers[destination_hash] = time.time() - RNS.log(f"Registered peer: {destination_hash.hex()}", RNS.LOG_DEBUG) - - def get_known_peers(self) -> List[bytes]: - """Return list of known peer hashes.""" - with self._peers_lock: - return list(self._known_peers.keys()) - - def _send_lxmf_message( - self, - destination_hash: bytes, - content_type: int, - content: bytes, - propagate: bool = True, - ) -> bool: - """Send an LXMF message to a destination hash. - - Args: - destination_hash: RNS identity hash of the recipient. - content_type: Radicle content type constant. - content: Message payload bytes. - propagate: Use LXMF propagation (store-and-forward) if True. - - Returns: - True if the message was queued successfully. - """ - if self._lxmf_router is None or self._lxmf_destination is None: - RNS.log("Sync manager not started, cannot send LXMF message", RNS.LOG_WARNING) - return False - - try: - destination_identity = RNS.Identity.recall(destination_hash) - if destination_identity is None: - RNS.log(f"Unknown peer identity: {destination_hash.hex()}", RNS.LOG_WARNING) - return False - - lxmf_dest = RNS.Destination( - destination_identity, - RNS.Destination.OUT, - RNS.Destination.SINGLE, - "lxmf", - "delivery", - ) - - message = LXMF.LXMessage( - lxmf_dest, - self._lxmf_destination, - content, - desired_method=( - LXMF.LXMessage.PROPAGATED if propagate else LXMF.LXMessage.DIRECT - ), - ) - message.fields[LXMF.FIELD_CUSTOM_TYPE] = content_type - self._lxmf_router.handle_outbound(message) - return True - - except Exception as e: - RNS.log(f"Failed to send LXMF message: {e}", RNS.LOG_ERROR) - return False - - def register_repository(self, repository_id: str, local_path: Path) -> RepoSyncState: - """Register a repository for syncing. - - Args: - repository_id: Radicle repo ID (rad:z...) - local_path: Path to local Git repository - """ - with self._repos_lock: - if repository_id in self._repos: - return self._repos[repository_id] - - state = RepoSyncState( - repository_id=repository_id, - local_path=Path(local_path), - ) - self._repos[repository_id] = state - - RNS.log(f"Registered repository: {repository_id}", RNS.LOG_INFO) - return state - - def announce_refs(self, repository_id: str) -> bool: - """Announce current refs for a repository. - - This allows peers to determine if they need updates. - """ - with self._repos_lock: - if repository_id not in self._repos: - return False - state = self._repos[repository_id] - - try: - generator = GitBundleGenerator(state.local_path) - refs = generator.get_refs() - - announcement = RefsAnnouncement( - repository_id=repository_id, - node_id=self.identity.did, - refs=refs, - timestamp=int(time.time() * 1000), - ) - ann_bytes = announcement.encode() - - # Broadcast to all known peers via LXMF (store-and-forward) - peers = self.get_known_peers() - if not peers: - RNS.log( - f"Refs announcement: {repository_id} ({len(refs)} refs) — no peers known", - RNS.LOG_DEBUG, - ) - return True - - sent = sum( - 1 for peer_hash in peers - if self._send_lxmf_message( - peer_hash, CONTENT_TYPE_REFS_ANNOUNCE, ann_bytes, propagate=True - ) - ) - RNS.log( - f"Refs announcement: {repository_id} ({len(refs)} refs) sent to {sent}/{len(peers)} peers", - RNS.LOG_INFO, - ) - return True - except Exception as e: - RNS.log(f"Failed to announce refs: {e}", RNS.LOG_ERROR) - return False - - def create_sync_bundle( - self, - repository_id: str, - peer_refs: Optional[Dict[str, str]] = None, - mode: SyncMode = SyncMode.AUTO, - ) -> Optional[GitBundle]: - """Create a bundle for syncing to a peer. - - Args: - repository_id: Repository to sync - peer_refs: Known refs at peer (for incremental sync) - mode: Sync mode (full, incremental, or auto) - - Returns: - GitBundle ready for transport, or None if no changes - """ - with self._repos_lock: - if repository_id not in self._repos: - raise ValueError(f"Repository not registered: {repository_id}") - state = self._repos[repository_id] - - generator = GitBundleGenerator(state.local_path) - - # Decide sync mode - if mode == SyncMode.AUTO: - if peer_refs: - # Have peer state, can do incremental - mode = SyncMode.INCREMENTAL - else: - mode = SyncMode.FULL - - if mode == SyncMode.FULL: - return generator.create_full_bundle( - repository_id=repository_id, - source_node=self.identity.did, - ) - else: - if peer_refs is None: - peer_refs = {} - return generator.create_incremental_bundle( - repository_id=repository_id, - source_node=self.identity.did, - basis_refs=peer_refs, - ) - - def send_bundle( - self, - bundle: GitBundle, - destination_hash: bytes, - propagate: bool = True, - ) -> bool: - """Send a bundle to a peer via LXMF. - - Args: - bundle: The bundle to send - destination_hash: RNS destination hash of recipient - propagate: If True, use LXMF propagation for offline delivery - """ - if self._lxmf_router is None: - raise RuntimeError("Sync manager not started") - - try: - # Encode bundle for transport - bundle_data = bundle.encode() - - # Check if we need to chunk - if len(bundle_data) > self.max_bundle_size: - return self._send_chunked_bundle(bundle_data, destination_hash, propagate) - - # Create LXMF message - destination_identity = RNS.Identity.recall(destination_hash) - if destination_identity is None: - RNS.log(f"Unknown destination: {destination_hash.hex()}", RNS.LOG_WARNING) - return False - - lxmf_dest = RNS.Destination( - destination_identity, - RNS.Destination.OUT, - RNS.Destination.SINGLE, - "lxmf", - "delivery", - ) - - message = LXMF.LXMessage( - lxmf_dest, - self._lxmf_destination, - bundle_data, - desired_method=LXMF.LXMessage.PROPAGATED if propagate else LXMF.LXMessage.DIRECT, - ) - message.fields[LXMF.FIELD_CUSTOM_TYPE] = CONTENT_TYPE_BUNDLE - - self._lxmf_router.handle_outbound(message) - RNS.log(f"Sent bundle ({len(bundle_data)} bytes) to {destination_hash.hex()}", RNS.LOG_INFO) - return True - - except Exception as e: - RNS.log(f"Failed to send bundle: {e}", RNS.LOG_ERROR) - return False - - def _send_chunked_bundle( - self, - bundle_data: bytes, - destination_hash: bytes, - propagate: bool, - ) -> bool: - """Send a large bundle as sequential LXMF chunks. - - Each chunk has a 20-byte header: bundle_id(16) + chunk_num(2) + total(2). - The receiver reassembles chunks ordered by chunk_num. - """ - import struct - - chunk_size = self.max_bundle_size - CHUNK_HEADER_SIZE - total_chunks = (len(bundle_data) + chunk_size - 1) // chunk_size - bundle_id = hashlib.sha256(bundle_data).digest()[:16] - - RNS.log(f"Sending bundle in {total_chunks} chunks ({len(bundle_data)} bytes)", RNS.LOG_INFO) - - for i in range(total_chunks): - start = i * chunk_size - end = min(start + chunk_size, len(bundle_data)) - chunk_data = bundle_data[start:end] - - chunk_msg = struct.pack("!16sHH", bundle_id, i, total_chunks) + chunk_data - - if not self._send_lxmf_message( - destination_hash, CONTENT_TYPE_BUNDLE_CHUNK, chunk_msg, propagate - ): - RNS.log(f"Failed to send chunk {i+1}/{total_chunks}", RNS.LOG_WARNING) - return False - - RNS.log(f" Chunk {i+1}/{total_chunks} ({len(chunk_data)} bytes)", RNS.LOG_DEBUG) - - return True - - def apply_bundle(self, bundle: GitBundle) -> Dict[str, str]: - """Apply a received bundle to local repository. - - Returns dict of applied refs. - """ - with self._repos_lock: - repo_id = bundle.metadata.repository_id - if repo_id not in self._repos: - raise ValueError(f"Repository not registered: {repo_id}") - state = self._repos[repo_id] - - applicator = GitBundleApplicator(state.local_path) - - # Verify and apply - ok, msg = applicator.verify_bundle(bundle) - if not ok: - raise ValueError(f"Bundle verification failed: {msg}") - - applied_refs = applicator.apply_bundle(bundle) - - # Update known peer state - source_node = bundle.metadata.source_node - with self._repos_lock: - state.known_peers[source_node] = applied_refs - state.last_sync[source_node] = time.time() - - RNS.log(f"Applied bundle from {source_node}: {len(applied_refs)} refs", RNS.LOG_INFO) - return applied_refs - - def _on_lxmf_delivery(self, message: LXMF.LXMessage): - """Handle incoming LXMF message.""" - import struct - - # Learn the peer's address for future announcements - if hasattr(message, "source_hash") and message.source_hash: - with self._peers_lock: - self._known_peers[message.source_hash] = time.time() - - content_type = message.fields.get(LXMF.FIELD_CUSTOM_TYPE, 0) - - if content_type == CONTENT_TYPE_BUNDLE: - self._handle_bundle_message(message.content) - - elif content_type == CONTENT_TYPE_BUNDLE_CHUNK: - self._handle_chunk_message(message.content) - - elif content_type == CONTENT_TYPE_REFS_ANNOUNCE: - try: - announcement = RefsAnnouncement.decode(message.content) - RNS.log( - f"Received refs announcement for {announcement.repository_id}", - RNS.LOG_DEBUG, - ) - if self._on_refs_announced: - self._on_refs_announced(announcement) - if self.auto_push and message.source_hash: - self._maybe_push_to_peer(message.source_hash, announcement) - except Exception as e: - RNS.log(f"Failed to process refs announcement: {e}", RNS.LOG_ERROR) - - def _handle_bundle_message(self, content: bytes): - """Process a complete bundle payload.""" - try: - bundle = GitBundle.decode(content) - RNS.log(f"Received bundle for {bundle.metadata.repository_id}", RNS.LOG_INFO) - - if self._on_bundle_received: - self._on_bundle_received(bundle) - elif bundle.metadata.repository_id in self._repos: - self.apply_bundle(bundle) - except Exception as e: - RNS.log(f"Failed to process bundle: {e}", RNS.LOG_ERROR) - - def _handle_chunk_message(self, content: bytes): - """Reassemble a chunked bundle fragment.""" - import struct - - if len(content) < CHUNK_HEADER_SIZE: - RNS.log("Received malformed chunk (too short)", RNS.LOG_WARNING) - return - - bundle_id, chunk_num, total_chunks = struct.unpack("!16sHH", content[:CHUNK_HEADER_SIZE]) - chunk_data = content[CHUNK_HEADER_SIZE:] - - with self._chunk_lock: - if bundle_id not in self._chunk_buffers: - self._chunk_buffers[bundle_id] = {} - self._chunk_totals[bundle_id] = total_chunks - - self._chunk_buffers[bundle_id][chunk_num] = chunk_data - RNS.log( - f"Chunk {chunk_num+1}/{total_chunks} received for bundle {bundle_id.hex()[:8]}", - RNS.LOG_DEBUG, - ) - - if len(self._chunk_buffers[bundle_id]) == total_chunks: - # All chunks received — reassemble - ordered = [self._chunk_buffers[bundle_id][i] for i in range(total_chunks)] - bundle_data = b"".join(ordered) - del self._chunk_buffers[bundle_id] - del self._chunk_totals[bundle_id] - else: - return - - RNS.log(f"All {total_chunks} chunks received, reassembling bundle", RNS.LOG_INFO) - self._handle_bundle_message(bundle_data) - - # ------------------------------------------------------------------ - # Phase 4: speculative push - # ------------------------------------------------------------------ - - def _should_push_to_peer( - self, - our_refs: Dict[str, str], - peer_refs: Dict[str, str], - ) -> bool: - """Return True if we have commits the peer doesn't. - - True when any ref we hold differs from what the peer announced, - or when we have refs entirely absent from their announcement. - """ - for ref, sha in our_refs.items(): - if peer_refs.get(ref) != sha: - return True - return False - - def _maybe_push_to_peer( - self, - source_hash: bytes, - announcement: RefsAnnouncement, - ): - """Speculatively push an incremental bundle to a peer if we have newer data. - - Called when auto_push=True and we receive a RefsAnnouncement. We - compute an incremental bundle relative to the peer's announced refs. - If the repo produces a non-empty bundle we send it immediately — - eliminating the request round-trip that would otherwise be needed. - """ - with self._repos_lock: - state = self._repos.get(announcement.repository_id) - if state is None: - return - - try: - generator = GitBundleGenerator(state.local_path) - our_refs = generator.get_refs() - - if not self._should_push_to_peer(our_refs, announcement.refs): - RNS.log( - f"Speculative push skipped: peer {source_hash.hex()[:8]} " - f"is up-to-date for {announcement.repository_id}", - RNS.LOG_DEBUG, - ) - return - - bundle = generator.create_incremental_bundle( - repository_id=announcement.repository_id, - source_node=self.identity.did, - basis_refs=announcement.refs, - ) - if bundle is None: - return # git reported no changes despite differing refs - - bundle_data = bundle.encode() - if len(bundle_data) > self.max_bundle_size: - success = self._send_chunked_bundle(bundle_data, source_hash, propagate=True) - else: - success = self._send_lxmf_message( - source_hash, CONTENT_TYPE_BUNDLE, bundle_data, propagate=True - ) - - if success: - RNS.log( - f"Speculative push: sent {len(bundle_data)}-byte incremental bundle " - f"to {source_hash.hex()[:8]} for {announcement.repository_id}", - RNS.LOG_INFO, - ) - - except Exception as e: - RNS.log(f"Speculative push failed: {e}", RNS.LOG_WARNING) - - def set_on_bundle_received(self, callback: Callable[[GitBundle], None]): - """Set callback for received bundles.""" - self._on_bundle_received = callback - - def set_on_refs_announced(self, callback: Callable[[RefsAnnouncement], None]): - """Set callback for refs announcements.""" - self._on_refs_announced = callback - - def get_sync_status(self, repository_id: str) -> Optional[Dict]: - """Get sync status for a repository.""" - with self._repos_lock: - if repository_id not in self._repos: - return None - state = self._repos[repository_id] - - return { - "repository_id": repository_id, - "local_path": str(state.local_path), - "known_peers": len(state.known_peers), - "last_syncs": { - peer: time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(ts)) - for peer, ts in state.last_sync.items() - }, - } - - -def create_dead_drop_bundle( - repo_path: Path, - repository_id: str, - source_node: str, - output_path: Path, - incremental_basis: Optional[Dict[str, str]] = None, -) -> GitBundle: - """Create a bundle file for dead-drop transfer (USB, etc). - - Args: - repo_path: Path to Git repository - repository_id: Radicle repo ID - source_node: Source node DID - output_path: Where to write the bundle file - incremental_basis: Known refs for incremental bundle - - Returns: - The created GitBundle - """ - generator = GitBundleGenerator(repo_path) - - if incremental_basis: - bundle = generator.create_incremental_bundle( - repository_id=repository_id, - source_node=source_node, - basis_refs=incremental_basis, - output_path=output_path, - ) - else: - bundle = generator.create_full_bundle( - repository_id=repository_id, - source_node=source_node, - output_path=output_path, - ) - - if bundle is None: - raise ValueError("No changes to bundle") - - # Also save the full transport format - transport_path = output_path.with_suffix(".radicle-bundle") - transport_path.write_bytes(bundle.encode()) - - RNS.log(f"Created dead-drop bundle: {output_path}", RNS.LOG_INFO) - RNS.log(f" Type: {bundle.metadata.bundle_type.value}", RNS.LOG_INFO) - RNS.log(f" Size: {bundle.metadata.size_bytes} bytes", RNS.LOG_INFO) - RNS.log(f" Refs: {len(bundle.metadata.refs_included)}", RNS.LOG_INFO) - - return bundle - - -def apply_dead_drop_bundle( - bundle_path: Path, - repo_path: Path, -) -> Dict[str, str]: - """Apply a dead-drop bundle to a repository. - - Args: - bundle_path: Path to .radicle-bundle file - repo_path: Path to target Git repository - - Returns: - Dict of applied refs - """ - # Load bundle - bundle_data = bundle_path.read_bytes() - bundle = GitBundle.decode(bundle_data) - - # Apply - applicator = GitBundleApplicator(repo_path) - applied = applicator.apply_bundle(bundle) - - RNS.log(f"Applied dead-drop bundle: {len(applied)} refs", RNS.LOG_INFO) - return applied diff --git a/tests/test_adaptive.py b/tests/test_adaptive.py deleted file mode 100644 index 15a3443..0000000 --- a/tests/test_adaptive.py +++ /dev/null @@ -1,153 +0,0 @@ -"""Tests for adaptive sync strategy selection.""" - -import pytest -from radicle_reticulum.adaptive import ( - SyncStrategy, - LinkQuality, - estimate_transfer_time, - select_strategy, - THRESHOLD_FULL, - THRESHOLD_INCREMENTAL, - RTT_FAST, - RTT_MEDIUM, - RTT_SLOW, - QR_MAX_BYTES, -) - - -def make_quality( - rtt_ms: float = 50, - throughput_bps: float = 1_000_000, - is_lora: bool = False, - strategy: SyncStrategy = SyncStrategy.FULL, -) -> LinkQuality: - return LinkQuality( - rtt_ms=rtt_ms, - throughput_bps=throughput_bps, - packet_loss=0.0, - is_lora=is_lora, - strategy=strategy, - ) - - -class TestLinkQuality: - def test_throughput_kbps(self): - q = make_quality(throughput_bps=50_000) - assert q.throughput_kbps == pytest.approx(50.0) - - def test_repr(self): - q = make_quality(rtt_ms=100, throughput_bps=1000) - r = repr(q) - assert "100ms" in r - assert "1.0Kbps" in r - - -class TestEstimateTransferTime: - def test_zero_throughput_is_infinite(self): - q = make_quality(throughput_bps=0) - assert estimate_transfer_time(1024, q) == float("inf") - - def test_known_value(self): - # 1 MB at 1 Mbps effective = 8s, but 80% efficiency → 10s - q = make_quality(throughput_bps=1_000_000) - t = estimate_transfer_time(1_000_000, q) - assert t == pytest.approx(10.0, rel=0.01) - - def test_larger_file_takes_longer(self): - q = make_quality(throughput_bps=10_000) - t_small = estimate_transfer_time(1_000, q) - t_large = estimate_transfer_time(100_000, q) - assert t_large > t_small - - -class TestSelectStrategy: - def test_fast_link_small_repo_uses_full(self): - q = make_quality(rtt_ms=10, throughput_bps=10_000_000, strategy=SyncStrategy.FULL) - strategy, reason = select_strategy( - bundle_size=100_000, - incremental_size=None, - quality=q, - ) - assert strategy == SyncStrategy.FULL - assert "full" in reason.lower() or "fast" in reason.lower() - - def test_lora_link_prefers_incremental_when_viable(self): - q = make_quality(rtt_ms=5000, throughput_bps=2400, is_lora=True, strategy=SyncStrategy.MINIMAL) - strategy, reason = select_strategy( - bundle_size=500_000, - incremental_size=5_000, - quality=q, - max_transfer_time=3600, - ) - assert strategy == SyncStrategy.INCREMENTAL - - def test_lora_link_falls_back_to_minimal_when_too_large(self): - q = make_quality(rtt_ms=5000, throughput_bps=100, is_lora=True, strategy=SyncStrategy.MINIMAL) - strategy, reason = select_strategy( - bundle_size=10_000_000, - incremental_size=5_000_000, - quality=q, - max_transfer_time=3600, - ) - assert strategy == SyncStrategy.MINIMAL - - def test_tiny_incremental_uses_qr(self): - q = make_quality(rtt_ms=10, throughput_bps=10_000_000, strategy=SyncStrategy.FULL) - strategy, reason = select_strategy( - bundle_size=50_000, - incremental_size=QR_MAX_BYTES - 1, - quality=q, - ) - assert strategy == SyncStrategy.QR - assert str(QR_MAX_BYTES - 1) in reason - - def test_qr_not_selected_when_incremental_too_large(self): - q = make_quality(rtt_ms=10, throughput_bps=10_000_000, strategy=SyncStrategy.FULL) - strategy, _ = select_strategy( - bundle_size=50_000, - incremental_size=QR_MAX_BYTES + 1, - quality=q, - ) - assert strategy != SyncStrategy.QR - - def test_medium_link_prefers_incremental_over_full(self): - q = make_quality( - rtt_ms=500, - throughput_bps=50_000, - strategy=SyncStrategy.INCREMENTAL, - ) - strategy, _ = select_strategy( - bundle_size=5_000_000, - incremental_size=50_000, - quality=q, - max_transfer_time=3600, - ) - assert strategy == SyncStrategy.INCREMENTAL - - def test_no_incremental_available_falls_back_to_full_or_minimal(self): - q = make_quality(rtt_ms=50, throughput_bps=10_000_000, strategy=SyncStrategy.FULL) - strategy, _ = select_strategy( - bundle_size=100_000, - incremental_size=None, - quality=q, - ) - assert strategy in (SyncStrategy.FULL, SyncStrategy.INCREMENTAL) - - def test_unreachably_slow_link_uses_minimal(self): - q = make_quality(rtt_ms=30000, throughput_bps=10, strategy=SyncStrategy.MINIMAL) - strategy, _ = select_strategy( - bundle_size=10_000_000, - incremental_size=None, - quality=q, - max_transfer_time=3600, - ) - assert strategy == SyncStrategy.MINIMAL - - def test_fast_link_large_repo_prefers_incremental_if_faster(self): - q = make_quality(rtt_ms=10, throughput_bps=500_000, strategy=SyncStrategy.FULL) - strategy, _ = select_strategy( - bundle_size=100_000_000, - incremental_size=100_000, - quality=q, - ) - assert strategy == SyncStrategy.INCREMENTAL diff --git a/tests/test_git_bundle.py b/tests/test_git_bundle.py deleted file mode 100644 index 028d90a..0000000 --- a/tests/test_git_bundle.py +++ /dev/null @@ -1,267 +0,0 @@ -"""Tests for Git bundle generation and application.""" - -import os -import subprocess -import tempfile -from pathlib import Path - -import pytest - -from radicle_reticulum.git_bundle import ( - BundleMetadata, - BundleType, - GitBundle, - GitBundleGenerator, - GitBundleApplicator, - RADICLE_REF_PATTERNS, -) - - -@pytest.fixture -def temp_git_repo(): - """Create a temporary Git repository for testing.""" - with tempfile.TemporaryDirectory() as tmpdir: - repo_path = Path(tmpdir) / "test_repo" - repo_path.mkdir() - - # Initialize repo - subprocess.run(["git", "init"], cwd=repo_path, check=True, capture_output=True) - subprocess.run( - ["git", "config", "user.email", "test@test.com"], - cwd=repo_path, check=True, capture_output=True - ) - subprocess.run( - ["git", "config", "user.name", "Test User"], - cwd=repo_path, check=True, capture_output=True - ) - - # Create initial commit - (repo_path / "README.md").write_text("# Test Repo\n") - subprocess.run(["git", "add", "README.md"], cwd=repo_path, check=True, capture_output=True) - subprocess.run( - ["git", "commit", "-m", "Initial commit"], - cwd=repo_path, check=True, capture_output=True - ) - - yield repo_path - - -@pytest.fixture -def temp_bare_repo(): - """Create a temporary bare Git repository for testing.""" - with tempfile.TemporaryDirectory() as tmpdir: - repo_path = Path(tmpdir) / "bare_repo.git" - subprocess.run(["git", "init", "--bare", str(repo_path)], check=True, capture_output=True) - yield repo_path - - -class TestBundleMetadata: - """Test BundleMetadata encoding/decoding.""" - - def test_encode_decode_full_bundle(self): - """Test encode/decode of full bundle metadata.""" - metadata = BundleMetadata( - bundle_type=BundleType.FULL, - repository_id="rad:z3gqcJUoA1n9HaHKufZs5FCSGazv5", - source_node="did:key:z6MkhaXgBZDvotDkL5257faiztiGiC2QtKLGpbnnEGta2doK", - timestamp=1234567890123, - refs_included=["refs/heads/main", "refs/rad/id"], - prerequisites=[], - size_bytes=1024, - checksum=b"\x00" * 32, - ) - - encoded = metadata.encode() - decoded, consumed = BundleMetadata.decode(encoded) - - assert decoded.bundle_type == metadata.bundle_type - assert decoded.repository_id == metadata.repository_id - assert decoded.source_node == metadata.source_node - assert decoded.timestamp == metadata.timestamp - assert decoded.refs_included == metadata.refs_included - assert decoded.prerequisites == metadata.prerequisites - assert decoded.size_bytes == metadata.size_bytes - assert decoded.checksum == metadata.checksum - - def test_encode_decode_incremental_bundle(self): - """Test encode/decode of incremental bundle metadata.""" - metadata = BundleMetadata( - bundle_type=BundleType.INCREMENTAL, - repository_id="rad:test", - source_node="did:key:test", - timestamp=1000, - refs_included=["refs/heads/feature"], - prerequisites=["abc123", "def456"], - size_bytes=512, - checksum=b"\xff" * 32, - ) - - encoded = metadata.encode() - decoded, _ = BundleMetadata.decode(encoded) - - assert decoded.bundle_type == BundleType.INCREMENTAL - assert decoded.prerequisites == ["abc123", "def456"] - - -class TestGitBundle: - """Test GitBundle encoding/decoding.""" - - def test_encode_decode_roundtrip(self): - """Test bundle encode/decode roundtrip.""" - import hashlib - - bundle_data = b"fake git bundle data" - metadata = BundleMetadata( - bundle_type=BundleType.FULL, - repository_id="rad:test", - source_node="did:key:test", - timestamp=1000, - refs_included=["refs/heads/main"], - prerequisites=[], - size_bytes=len(bundle_data), - checksum=hashlib.sha256(bundle_data).digest(), - ) - - bundle = GitBundle(metadata=metadata, data=bundle_data) - encoded = bundle.encode() - decoded = GitBundle.decode(encoded) - - assert decoded.data == bundle_data - assert decoded.metadata.repository_id == metadata.repository_id - - def test_checksum_verification_fails_on_corruption(self): - """Test that checksum verification catches corruption.""" - import hashlib - - bundle_data = b"original data" - metadata = BundleMetadata( - bundle_type=BundleType.FULL, - repository_id="rad:test", - source_node="did:key:test", - timestamp=1000, - refs_included=[], - prerequisites=[], - size_bytes=len(bundle_data), - checksum=hashlib.sha256(bundle_data).digest(), - ) - - bundle = GitBundle(metadata=metadata, data=bundle_data) - encoded = bytearray(bundle.encode()) - - # Corrupt the data portion - encoded[-1] ^= 0xFF - - with pytest.raises(ValueError, match="checksum mismatch"): - GitBundle.decode(bytes(encoded)) - - -class TestGitBundleGenerator: - """Test GitBundleGenerator.""" - - def test_get_refs(self, temp_git_repo): - """Test getting refs from repository.""" - generator = GitBundleGenerator(temp_git_repo) - refs = generator.get_refs(["refs/heads/*"]) - - assert "refs/heads/main" in refs or "refs/heads/master" in refs - - def test_create_full_bundle(self, temp_git_repo): - """Test creating a full bundle.""" - generator = GitBundleGenerator(temp_git_repo) - - bundle = generator.create_full_bundle( - repository_id="rad:test", - source_node="did:key:test", - ) - - assert bundle is not None - assert bundle.metadata.bundle_type == BundleType.FULL - assert bundle.metadata.size_bytes > 0 - assert len(bundle.data) > 0 - - def test_create_incremental_bundle_no_changes(self, temp_git_repo): - """Test that incremental bundle returns None when no changes.""" - generator = GitBundleGenerator(temp_git_repo) - - # Get current refs as basis - current_refs = generator.get_refs() - - # Create incremental with same refs - should be None - bundle = generator.create_incremental_bundle( - repository_id="rad:test", - source_node="did:key:test", - basis_refs=current_refs, - ) - - assert bundle is None - - def test_create_incremental_bundle_with_changes(self, temp_git_repo): - """Test creating incremental bundle with new commits.""" - generator = GitBundleGenerator(temp_git_repo) - - # Get current refs as basis - basis_refs = generator.get_refs() - - # Add new commit - (temp_git_repo / "new_file.txt").write_text("new content") - subprocess.run(["git", "add", "new_file.txt"], cwd=temp_git_repo, check=True, capture_output=True) - subprocess.run( - ["git", "commit", "-m", "Add new file"], - cwd=temp_git_repo, check=True, capture_output=True - ) - - # Create incremental - bundle = generator.create_incremental_bundle( - repository_id="rad:test", - source_node="did:key:test", - basis_refs=basis_refs, - ) - - assert bundle is not None - assert bundle.metadata.bundle_type == BundleType.INCREMENTAL - - def test_invalid_repo_path(self): - """Test that invalid repo path raises error.""" - with pytest.raises(ValueError, match="Not a Git repository"): - GitBundleGenerator(Path("/nonexistent/path")) - - -class TestGitBundleApplicator: - """Test GitBundleApplicator.""" - - def test_apply_bundle(self, temp_git_repo, temp_bare_repo): - """Test applying a bundle to a repository.""" - # Create bundle from source repo - generator = GitBundleGenerator(temp_git_repo) - bundle = generator.create_full_bundle( - repository_id="rad:test", - source_node="did:key:test", - ) - - # Apply to bare repo - applicator = GitBundleApplicator(temp_bare_repo) - applied_refs = applicator.apply_bundle(bundle) - - assert len(applied_refs) > 0 - assert any("main" in ref or "master" in ref for ref in applied_refs) - - def test_verify_bundle(self, temp_git_repo, temp_bare_repo): - """Test bundle verification.""" - generator = GitBundleGenerator(temp_git_repo) - bundle = generator.create_full_bundle( - repository_id="rad:test", - source_node="did:key:test", - ) - - applicator = GitBundleApplicator(temp_bare_repo) - ok, msg = applicator.verify_bundle(bundle) - - assert ok - assert "verified" in msg.lower() or msg == "" - - def test_get_current_refs(self, temp_git_repo): - """Test getting current refs.""" - applicator = GitBundleApplicator(temp_git_repo) - refs = applicator.get_current_refs(["refs/heads/*"]) - - assert len(refs) > 0 diff --git a/tests/test_gossip.py b/tests/test_gossip.py new file mode 100644 index 0000000..fd67933 --- /dev/null +++ b/tests/test_gossip.py @@ -0,0 +1,378 @@ +"""Tests for GossipRelay.""" + +import json +import subprocess +import time +from pathlib import Path +from unittest.mock import MagicMock, patch, call + +import pytest + +from radicle_reticulum.gossip import GossipRelay, _read_refs, _radicle_storage_path +from radicle_reticulum.identity import RadicleIdentity + + +# ── Helpers ────────────────────────────────────────────────────────────────── + +def _make_relay(tmp_path, rids=None, **kwargs): + identity = RadicleIdentity.generate() + with patch("radicle_reticulum.gossip.RNS.Reticulum"), \ + patch("radicle_reticulum.gossip.RNS.Destination") as mock_dest_cls, \ + patch("radicle_reticulum.gossip.RNS.Transport"): + mock_dest = MagicMock() + mock_dest.hash = b"\x00" * 16 + mock_dest.hexhash = "0" * 32 + mock_dest_cls.return_value = mock_dest + relay = GossipRelay( + identity=identity, + rids=rids or ["rad:z3abc123"], + storage=tmp_path / "storage", + **kwargs, + ) + relay.destination = mock_dest + return relay + + +SAMPLE_REFS = { + "refs/heads/main": "abc123def456" * 3, + "refs/rad/sigrefs": "def456abc123" * 3, +} + + +# ── _read_refs ──────────────────────────────────────────────────────────────── + +class TestReadRefs: + def test_missing_storage_returns_empty(self, tmp_path): + result = _read_refs(tmp_path / "storage", "rad:z3nonexistent") + assert result == {} + + def test_reads_refs_from_bare_repo(self, tmp_path): + rid = "rad:z3testrepo" + repo_path = tmp_path / "z3testrepo" + repo_path.mkdir() + show_ref_output = ( + "abc123def456abc123def456abc123def456abc1 refs/heads/main\n" + "def456abc123def456abc123def456abc123def4 refs/rad/sigrefs\n" + ) + with patch("radicle_reticulum.gossip.subprocess.run") as mock_run: + mock_run.return_value = MagicMock(returncode=0, stdout=show_ref_output) + result = _read_refs(tmp_path, rid) + + assert result["refs/heads/main"] == "abc123def456abc123def456abc123def456abc1" + assert result["refs/rad/sigrefs"] == "def456abc123def456abc123def456abc123def4" + + def test_strips_rad_prefix(self, tmp_path): + # rid with "rad:" prefix should map to directory without it + repo_path = tmp_path / "z3abc" + repo_path.mkdir() + result = _read_refs(tmp_path, "rad:z3abc") + assert result == {} # empty repo, no refs — just confirming no crash + + +# ── GossipRelay construction ────────────────────────────────────────────────── + +class TestGossipRelayInit: + def test_defaults(self, tmp_path): + relay = _make_relay(tmp_path) + assert relay.rids == ["rad:z3abc123"] + assert relay.poll_interval == 30 + assert relay.bridge_port == 8777 + assert relay.announce_retry_delays == (5, 15, 30) + + def test_custom_params(self, tmp_path): + relay = _make_relay( + tmp_path, + rids=["rad:z3a", "rad:z3b"], + bridge_port=9000, + poll_interval=60, + ) + assert len(relay.rids) == 2 + assert relay.bridge_port == 9000 + assert relay.poll_interval == 60 + + +# ── Broadcast ──────────────────────────────────────────────────────────────── + +class TestBroadcast: + def test_broadcast_sends_to_known_peers(self, tmp_path): + relay = _make_relay(tmp_path, radicle_nid="z6Mktest") + peer_hash = b"\x01" * 16 + relay._known_peers[peer_hash] = time.time() + + with patch.object(relay, "_send_packet", return_value=True) as mock_send: + relay._broadcast("rad:z3abc123", SAMPLE_REFS) + + mock_send.assert_called_once() + args = mock_send.call_args[0] + assert args[0] == peer_hash + msg = json.loads(args[1].decode()) + assert msg["rid"] == "rad:z3abc123" + assert msg["nid"] == "z6Mktest" + assert msg["refs"] == SAMPLE_REFS + + def test_broadcast_no_peers_sends_nothing(self, tmp_path): + relay = _make_relay(tmp_path) + with patch.object(relay, "_send_packet") as mock_send: + relay._broadcast("rad:z3abc123", SAMPLE_REFS) + mock_send.assert_not_called() + + def test_broadcast_multiple_peers(self, tmp_path): + relay = _make_relay(tmp_path) + for i in range(3): + relay._known_peers[bytes([i]) * 16] = time.time() + + with patch.object(relay, "_send_packet", return_value=True) as mock_send: + relay._broadcast("rad:z3abc123", SAMPLE_REFS) + + assert mock_send.call_count == 3 + + +# ── Poll loop ───────────────────────────────────────────────────────────────── + +class TestPollLoop: + def test_broadcasts_on_ref_change(self, tmp_path): + relay = _make_relay(tmp_path) + relay._known_refs["rad:z3abc123"] = {"refs/heads/main": "old_sha"} + + new_refs = {"refs/heads/main": "new_sha"} + + with patch("radicle_reticulum.gossip._read_refs", return_value=new_refs), \ + patch.object(relay, "_broadcast") as mock_broadcast: + # Simulate one poll iteration + for rid in relay.rids: + refs = new_refs + old = relay._known_refs.get(rid) + if refs and refs != old: + if old is not None: + relay._broadcast(rid, refs) + relay._known_refs[rid] = refs + + mock_broadcast.assert_called_once_with("rad:z3abc123", new_refs) + + def test_no_broadcast_on_first_poll(self, tmp_path): + relay = _make_relay(tmp_path) + # No existing refs — first poll should not broadcast + + with patch("radicle_reticulum.gossip._read_refs", return_value=SAMPLE_REFS), \ + patch.object(relay, "_broadcast") as mock_broadcast: + for rid in relay.rids: + refs = SAMPLE_REFS + old = relay._known_refs.get(rid) + if refs and refs != old: + if old is not None: + relay._broadcast(rid, refs) + relay._known_refs[rid] = refs + + mock_broadcast.assert_not_called() + assert relay._known_refs["rad:z3abc123"] == SAMPLE_REFS + + def test_no_broadcast_when_refs_unchanged(self, tmp_path): + relay = _make_relay(tmp_path) + relay._known_refs["rad:z3abc123"] = SAMPLE_REFS + + with patch("radicle_reticulum.gossip._read_refs", return_value=SAMPLE_REFS), \ + patch.object(relay, "_broadcast") as mock_broadcast: + for rid in relay.rids: + refs = SAMPLE_REFS + old = relay._known_refs.get(rid) + if refs and refs != old: + if old is not None: + relay._broadcast(rid, refs) + + mock_broadcast.assert_not_called() + + +# ── Incoming packets ────────────────────────────────────────────────────────── + +class TestOnPacket: + def _make_packet(self, rid, nid, refs): + return json.dumps({"type": "refs", "rid": rid, "nid": nid, "refs": refs}).encode() + + def test_triggers_sync_on_new_refs(self, tmp_path): + relay = _make_relay(tmp_path) + relay._known_refs["rad:z3abc123"] = {"refs/heads/main": "old"} + mock_packet = MagicMock() + + with patch.object(relay, "_trigger_sync") as mock_sync: + relay._on_packet( + self._make_packet("rad:z3abc123", "z6Mkpeer", {"refs/heads/main": "new"}), + mock_packet, + ) + + mock_sync.assert_called_once_with("rad:z3abc123", "z6Mkpeer") + + def test_no_sync_when_refs_match(self, tmp_path): + relay = _make_relay(tmp_path) + relay._known_refs["rad:z3abc123"] = SAMPLE_REFS + mock_packet = MagicMock() + + with patch.object(relay, "_trigger_sync") as mock_sync: + relay._on_packet( + self._make_packet("rad:z3abc123", "z6Mkpeer", SAMPLE_REFS), + mock_packet, + ) + + mock_sync.assert_not_called() + + def test_ignores_invalid_json(self, tmp_path): + relay = _make_relay(tmp_path) + mock_packet = MagicMock() + # Should not raise + relay._on_packet(b"not json at all", mock_packet) + + def test_ignores_wrong_type(self, tmp_path): + relay = _make_relay(tmp_path) + mock_packet = MagicMock() + with patch.object(relay, "_trigger_sync") as mock_sync: + relay._on_packet( + json.dumps({"type": "ping", "rid": "rad:z3abc123"}).encode(), + mock_packet, + ) + mock_sync.assert_not_called() + + def test_triggers_sync_for_unknown_repo(self, tmp_path): + relay = _make_relay(tmp_path) + # No prior known refs — any incoming refs are "new" + mock_packet = MagicMock() + with patch.object(relay, "_trigger_sync") as mock_sync: + relay._on_packet( + self._make_packet("rad:z3unknown", "z6Mkpeer", SAMPLE_REFS), + mock_packet, + ) + mock_sync.assert_called_once() + + def test_sync_callback_invoked(self, tmp_path): + relay = _make_relay(tmp_path) + received = [] + relay.set_on_sync_triggered(lambda rid, nid: received.append((rid, nid))) + + with patch("radicle_reticulum.gossip.subprocess.run") as mock_run: + mock_run.return_value = MagicMock(returncode=0) + relay._trigger_sync("rad:z3abc123", "z6Mkpeer") + + assert received == [("rad:z3abc123", "z6Mkpeer")] + + +# ── Trigger sync ────────────────────────────────────────────────────────────── + +class TestTriggerSync: + def test_calls_rad_node_connect_when_nid_given(self, tmp_path): + relay = _make_relay(tmp_path, bridge_port=8777) + with patch("radicle_reticulum.gossip.subprocess.run") as mock_run: + mock_run.return_value = MagicMock(returncode=0) + relay._trigger_sync("rad:z3abc123", "z6Mkpeer123") + + calls = [c[0][0] for c in mock_run.call_args_list] + connect_calls = [c for c in calls if "connect" in c] + assert connect_calls + assert "z6Mkpeer123@127.0.0.1:8777" in connect_calls[0] + + def test_skips_connect_when_no_nid(self, tmp_path): + relay = _make_relay(tmp_path) + with patch("radicle_reticulum.gossip.subprocess.run") as mock_run: + mock_run.return_value = MagicMock(returncode=0) + relay._trigger_sync("rad:z3abc123", "") + + calls = [c[0][0] for c in mock_run.call_args_list] + connect_calls = [c for c in calls if "connect" in c] + assert not connect_calls + + def test_calls_rad_sync_fetch(self, tmp_path): + relay = _make_relay(tmp_path) + with patch("radicle_reticulum.gossip.subprocess.run") as mock_run: + mock_run.return_value = MagicMock(returncode=0) + relay._trigger_sync("rad:z3abc123", "") + + all_calls = [c[0][0] for c in mock_run.call_args_list] + sync_calls = [c for c in all_calls if "sync" in c] + assert sync_calls + assert "--fetch" in sync_calls[0] + assert "--rid" in sync_calls[0] + + +# ── Peer discovery ──────────────────────────────────────────────────────────── + +class TestPeerDiscovery: + def _make_app_data(self, nid=None): + import struct + from radicle_reticulum.gossip import GOSSIP_MAGIC + data = GOSSIP_MAGIC + if nid: + b = nid.encode() + data += struct.pack("!H", len(b)) + b + return data + + def test_new_peer_added(self, tmp_path): + relay = _make_relay(tmp_path) + peer_hash = b"\x02" * 16 + relay._known_refs["rad:z3abc123"] = SAMPLE_REFS + + with patch.object(relay, "_send_packet"): + relay._on_announce(peer_hash, MagicMock(), self._make_app_data("z6Mkpeer")) + + assert peer_hash in relay._known_peers + + def test_own_announce_ignored(self, tmp_path): + relay = _make_relay(tmp_path) + own_hash = relay.destination.hash + + with patch.object(relay, "_send_packet") as mock_send: + relay._on_announce(own_hash, MagicMock(), self._make_app_data()) + + mock_send.assert_not_called() + assert own_hash not in relay._known_peers + + def test_non_gossip_announce_ignored(self, tmp_path): + relay = _make_relay(tmp_path) + peer_hash = b"\x03" * 16 + + with patch.object(relay, "_send_packet") as mock_send: + relay._on_announce(peer_hash, MagicMock(), b"OTHER_APP_DATA") + + mock_send.assert_not_called() + assert peer_hash not in relay._known_peers + + def test_sends_current_refs_to_new_peer(self, tmp_path): + relay = _make_relay(tmp_path) + relay._known_refs["rad:z3abc123"] = SAMPLE_REFS + peer_hash = b"\x04" * 16 + + with patch.object(relay, "_send_packet") as mock_send: + relay._on_announce(peer_hash, MagicMock(), self._make_app_data()) + + mock_send.assert_called_once() + assert mock_send.call_args[0][0] == peer_hash + + def test_duplicate_announce_not_re_sent(self, tmp_path): + relay = _make_relay(tmp_path) + relay._known_refs["rad:z3abc123"] = SAMPLE_REFS + peer_hash = b"\x05" * 16 + relay._known_peers[peer_hash] = time.time() # already known + + with patch.object(relay, "_send_packet") as mock_send: + relay._on_announce(peer_hash, MagicMock(), self._make_app_data()) + + mock_send.assert_not_called() + + +# ── push_refs_now ───────────────────────────────────────────────────────────── + +class TestPushRefsNow: + def test_push_refs_now_broadcasts_immediately(self, tmp_path): + relay = _make_relay(tmp_path) + peer_hash = b"\x06" * 16 + relay._known_peers[peer_hash] = time.time() + + with patch("radicle_reticulum.gossip._read_refs", return_value=SAMPLE_REFS), \ + patch.object(relay, "_send_packet", return_value=True) as mock_send: + relay.push_refs_now("rad:z3abc123") + + mock_send.assert_called_once() + + def test_push_refs_now_updates_known_refs(self, tmp_path): + relay = _make_relay(tmp_path) + with patch("radicle_reticulum.gossip._read_refs", return_value=SAMPLE_REFS), \ + patch.object(relay, "_send_packet", return_value=True): + relay.push_refs_now("rad:z3abc123") + + assert relay._known_refs["rad:z3abc123"] == SAMPLE_REFS diff --git a/tests/test_qr.py b/tests/test_qr.py deleted file mode 100644 index db8264b..0000000 --- a/tests/test_qr.py +++ /dev/null @@ -1,108 +0,0 @@ -"""Tests for QR bundle encoding/decoding.""" - -import hashlib -import pytest - -from radicle_reticulum.git_bundle import GitBundle, BundleMetadata, BundleType -from radicle_reticulum.qr import ( - encode_bundle_to_qr, - decode_bundle_from_qr_data, - BundleTooLargeForQR, - QR_MAX_BYTES, - QR_MAGIC, -) - - -def make_small_bundle(data: bytes = b"tiny git bundle data") -> GitBundle: - """Create a minimal GitBundle for testing.""" - metadata = BundleMetadata( - bundle_type=BundleType.INCREMENTAL, - repository_id="rad:z3test", - source_node="did:key:z6Mktest", - timestamp=1000, - refs_included=["refs/heads/main"], - prerequisites=["abc123"], - size_bytes=len(data), - checksum=hashlib.sha256(data).digest(), - ) - return GitBundle(metadata=metadata, data=data) - - -class TestQRPayloadRoundtrip: - """Test the binary payload encode/decode without QR rendering.""" - - def _encode_payload(self, bundle: GitBundle) -> bytes: - """Extract the raw bytes that would be put into a QR code.""" - import struct - bundle_bytes = bundle.encode() - checksum = hashlib.sha256(bundle_bytes).digest() - length_prefix = len(bundle_bytes).to_bytes(4, "big") - return QR_MAGIC + length_prefix + checksum + bundle_bytes - - def test_decode_roundtrip(self): - bundle = make_small_bundle() - payload = self._encode_payload(bundle) - decoded = decode_bundle_from_qr_data(payload) - assert decoded.metadata.repository_id == bundle.metadata.repository_id - assert decoded.data == bundle.data - - def test_decode_rejects_wrong_magic(self): - bundle = make_small_bundle() - payload = b"WRONGMAGIC" + self._encode_payload(bundle)[len(QR_MAGIC):] - with pytest.raises(ValueError, match="Not a Radicle QR payload"): - decode_bundle_from_qr_data(payload) - - def test_decode_detects_corruption(self): - bundle = make_small_bundle() - payload = bytearray(self._encode_payload(bundle)) - payload[-1] ^= 0xFF # flip last bit of bundle data - with pytest.raises(ValueError, match="checksum mismatch"): - decode_bundle_from_qr_data(bytes(payload)) - - def test_decode_rejects_truncated_payload(self): - bundle = make_small_bundle() - payload = self._encode_payload(bundle) - # Truncate the bundle data portion - truncated = payload[:-10] - with pytest.raises(ValueError, match="Truncated"): - decode_bundle_from_qr_data(truncated) - - -class TestBundleTooLarge: - def test_oversized_bundle_raises(self): - large_data = b"x" * (QR_MAX_BYTES + 1) - bundle = make_small_bundle(data=large_data) - with pytest.raises(BundleTooLargeForQR, match="QR capacity"): - encode_bundle_to_qr(bundle) - - def test_exact_limit_would_include_overhead(self): - # QR_MAX_BYTES is the limit for the serialised bundle, including metadata. - # A bundle with data just under the limit should still fail due to metadata overhead. - # This test verifies the check catches realistic oversized bundles. - oversized_data = b"a" * QR_MAX_BYTES - bundle = make_small_bundle(data=oversized_data) - with pytest.raises(BundleTooLargeForQR): - encode_bundle_to_qr(bundle) - - -class TestQREncodeOutput: - """Test QR rendering (requires qrcode package).""" - - def test_encode_returns_ascii_art(self): - bundle = make_small_bundle() - result = encode_bundle_to_qr(bundle) - assert isinstance(result, str) - assert len(result) > 0 - # ASCII art QR codes use block characters - assert any(c in result for c in ("█", "░", " ", "\n")) - - def test_encode_small_bundle_succeeds(self): - bundle = make_small_bundle(b"small") - result = encode_bundle_to_qr(bundle) - assert result # non-empty - - def test_error_correction_levels(self): - bundle = make_small_bundle() - for level in ("L", "M", "Q", "H"): - result = encode_bundle_to_qr(bundle, error_correction=level) - assert isinstance(result, str) diff --git a/tests/test_sync.py b/tests/test_sync.py deleted file mode 100644 index 3111cee..0000000 --- a/tests/test_sync.py +++ /dev/null @@ -1,543 +0,0 @@ -"""Tests for sync data structures and SyncManager logic (no RNS networking).""" - -import struct -import time -import tempfile -from pathlib import Path -from unittest.mock import MagicMock, patch - -import pytest - -import LXMF - -from radicle_reticulum.sync import ( - RefsAnnouncement, - SyncManager, - SyncMode, - CONTENT_TYPE_BUNDLE, - CONTENT_TYPE_BUNDLE_CHUNK, - CONTENT_TYPE_REFS_ANNOUNCE, - CHUNK_HEADER_SIZE, -) -from radicle_reticulum.identity import RadicleIdentity -from radicle_reticulum.git_bundle import GitBundle, BundleMetadata, BundleType -import hashlib - - -# --------------------------------------------------------------------------- -# Helpers -# --------------------------------------------------------------------------- - -def _make_manager(tmp_path: Path) -> SyncManager: - """Instantiate SyncManager with LXMF/RNS components patched out.""" - identity = RadicleIdentity.generate() - - mock_router = MagicMock() - mock_dest = MagicMock() - mock_dest.hash = b"\x00" * 32 - mock_dest.hash_hex = "00" * 32 - - with patch("radicle_reticulum.sync.RNS.Reticulum"), \ - patch("radicle_reticulum.sync.LXMF.LXMRouter", return_value=mock_router), \ - patch("radicle_reticulum.sync.RNS.log"): - mock_router.register_delivery_identity.return_value = mock_dest - manager = SyncManager(identity=identity, storage_path=tmp_path) - manager._lxmf_router = mock_router - manager._lxmf_destination = mock_dest - - return manager - - -def _make_bundle(data: bytes = b"git bundle") -> GitBundle: - checksum = hashlib.sha256(data).digest() - metadata = BundleMetadata( - bundle_type=BundleType.FULL, - repository_id="rad:z3test", - source_node="did:key:z6Mktest", - timestamp=1000, - refs_included=["refs/heads/main"], - prerequisites=[], - size_bytes=len(data), - checksum=checksum, - ) - return GitBundle(metadata=metadata, data=data) - - -class TestRefsAnnouncement: - def test_encode_decode_roundtrip(self): - ann = RefsAnnouncement( - repository_id="rad:z3gqcJUoA1n9HaHKufZs5FCSGazv5", - node_id="did:key:z6MkhaXgBZDvotDkL5257faiztiGiC2QtKLGpbnnEGta2doK", - refs={ - "refs/heads/main": "abc123def456789012345678901234567890abcd", - "refs/rad/id": "0000000000000000000000000000000000000001", - }, - timestamp=1234567890123, - ) - - encoded = ann.encode() - decoded = RefsAnnouncement.decode(encoded) - - assert decoded.repository_id == ann.repository_id - assert decoded.node_id == ann.node_id - assert decoded.refs == ann.refs - assert decoded.timestamp == ann.timestamp - - def test_empty_refs(self): - ann = RefsAnnouncement( - repository_id="rad:test", - node_id="did:key:z6Mk...", - refs={}, - timestamp=0, - ) - decoded = RefsAnnouncement.decode(ann.encode()) - assert decoded.refs == {} - - def test_many_refs(self): - refs = {f"refs/heads/branch-{i}": "a" * 40 for i in range(50)} - ann = RefsAnnouncement( - repository_id="rad:z3test", - node_id="did:key:z6Mktest", - refs=refs, - timestamp=9999, - ) - decoded = RefsAnnouncement.decode(ann.encode()) - assert decoded.refs == refs - - def test_special_characters_in_ref_names(self): - refs = {"refs/heads/feature/my-branch_v2.0": "b" * 40} - ann = RefsAnnouncement( - repository_id="rad:z3test", - node_id="did:key:z6Mktest", - refs=refs, - timestamp=1, - ) - decoded = RefsAnnouncement.decode(ann.encode()) - assert decoded.refs == refs - - def test_encode_produces_bytes(self): - ann = RefsAnnouncement( - repository_id="rad:test", - node_id="did:key:z6Mk", - refs={"refs/heads/main": "a" * 40}, - timestamp=1000, - ) - assert isinstance(ann.encode(), bytes) - assert len(ann.encode()) > 0 - - -# --------------------------------------------------------------------------- -# SyncManager state management -# --------------------------------------------------------------------------- - -class TestSyncManagerPeers: - def test_initial_peer_list_empty(self, tmp_path): - manager = _make_manager(tmp_path) - assert manager.get_known_peers() == [] - - def test_register_peer_adds_to_list(self, tmp_path): - manager = _make_manager(tmp_path) - with patch("radicle_reticulum.sync.RNS.log"): - manager.register_peer(b"\x01" * 16) - assert b"\x01" * 16 in manager.get_known_peers() - - def test_register_multiple_peers(self, tmp_path): - manager = _make_manager(tmp_path) - hashes = [bytes([i]) * 16 for i in range(3)] - with patch("radicle_reticulum.sync.RNS.log"): - for h in hashes: - manager.register_peer(h) - assert set(manager.get_known_peers()) == set(hashes) - - def test_peer_learned_from_incoming_message(self, tmp_path): - manager = _make_manager(tmp_path) - source_hash = b"\xab" * 16 - - bundle = _make_bundle() - msg = MagicMock() - msg.source_hash = source_hash - msg.fields = {LXMF.FIELD_CUSTOM_TYPE: CONTENT_TYPE_BUNDLE} - msg.content = bundle.encode() - - with patch("radicle_reticulum.sync.RNS.log"): - manager._on_lxmf_delivery(msg) - - assert source_hash in manager.get_known_peers() - - def test_message_without_source_hash_still_processed(self, tmp_path): - manager = _make_manager(tmp_path) - bundle = _make_bundle() - - msg = MagicMock() - msg.source_hash = None - msg.fields = {LXMF.FIELD_CUSTOM_TYPE: CONTENT_TYPE_BUNDLE} - msg.content = bundle.encode() - - received = [] - manager.set_on_bundle_received(received.append) - - with patch("radicle_reticulum.sync.RNS.log"): - manager._on_lxmf_delivery(msg) - - assert len(received) == 1 - - -# --------------------------------------------------------------------------- -# SyncManager delivery callbacks -# --------------------------------------------------------------------------- - -class TestSyncManagerDelivery: - def test_bundle_callback_fires_on_bundle_message(self, tmp_path): - manager = _make_manager(tmp_path) - bundle = _make_bundle() - - received = [] - manager.set_on_bundle_received(received.append) - - msg = MagicMock() - msg.source_hash = b"\x01" * 16 - msg.fields = {LXMF.FIELD_CUSTOM_TYPE: CONTENT_TYPE_BUNDLE} - msg.content = bundle.encode() - - with patch("radicle_reticulum.sync.RNS.log"): - manager._on_lxmf_delivery(msg) - - assert len(received) == 1 - assert received[0].metadata.repository_id == "rad:z3test" - - def test_refs_callback_fires_on_refs_announce_message(self, tmp_path): - manager = _make_manager(tmp_path) - ann = RefsAnnouncement( - repository_id="rad:z3test", - node_id="did:key:z6Mktest", - refs={"refs/heads/main": "a" * 40}, - timestamp=999, - ) - - received = [] - manager.set_on_refs_announced(received.append) - - msg = MagicMock() - msg.source_hash = b"\x02" * 16 - msg.fields = {LXMF.FIELD_CUSTOM_TYPE: CONTENT_TYPE_REFS_ANNOUNCE} - msg.content = ann.encode() - - with patch("radicle_reticulum.sync.RNS.log"): - manager._on_lxmf_delivery(msg) - - assert len(received) == 1 - assert received[0].repository_id == "rad:z3test" - - def test_unknown_content_type_silently_ignored(self, tmp_path): - manager = _make_manager(tmp_path) - - msg = MagicMock() - msg.source_hash = None - msg.fields = {LXMF.FIELD_CUSTOM_TYPE: 0xFF} - msg.content = b"garbage" - - with patch("radicle_reticulum.sync.RNS.log"): - manager._on_lxmf_delivery(msg) # should not raise - - -# --------------------------------------------------------------------------- -# Chunk reassembly -# --------------------------------------------------------------------------- - -class TestChunkReassembly: - def _make_chunks(self, data: bytes, chunk_size: int = 10): - """Split data into chunk messages as _send_chunked_bundle would.""" - bundle_id = hashlib.sha256(data).digest()[:16] - total = (len(data) + chunk_size - 1) // chunk_size - msgs = [] - for i in range(total): - chunk = data[i * chunk_size: (i + 1) * chunk_size] - header = struct.pack("!16sHH", bundle_id, i, total) - msgs.append(header + chunk) - return msgs - - def test_reassemble_two_chunks(self, tmp_path): - manager = _make_manager(tmp_path) - bundle = _make_bundle(b"first half second half ") - bundle_bytes = bundle.encode() - - received = [] - manager.set_on_bundle_received(received.append) - - chunks = self._make_chunks(bundle_bytes, chunk_size=len(bundle_bytes) // 2 + 1) - assert len(chunks) == 2 - - for chunk_data in chunks: - msg = MagicMock() - msg.source_hash = b"\x01" * 16 - msg.fields = {LXMF.FIELD_CUSTOM_TYPE: CONTENT_TYPE_BUNDLE_CHUNK} - msg.content = chunk_data - with patch("radicle_reticulum.sync.RNS.log"): - manager._on_lxmf_delivery(msg) - - assert len(received) == 1 - assert received[0].data == bundle.data - - def test_out_of_order_chunks_reassemble_correctly(self, tmp_path): - manager = _make_manager(tmp_path) - bundle = _make_bundle(b"abcdefghijklmnopqrstuvwxyz0123456789") - bundle_bytes = bundle.encode() - - received = [] - manager.set_on_bundle_received(received.append) - - chunks = self._make_chunks(bundle_bytes, chunk_size=8) - assert len(chunks) >= 3 - - for chunk_data in reversed(chunks): # deliver in reverse order - msg = MagicMock() - msg.source_hash = None - msg.fields = {LXMF.FIELD_CUSTOM_TYPE: CONTENT_TYPE_BUNDLE_CHUNK} - msg.content = chunk_data - with patch("radicle_reticulum.sync.RNS.log"): - manager._on_lxmf_delivery(msg) - - assert len(received) == 1 - assert received[0].data == bundle.data - - def test_malformed_chunk_too_short_is_ignored(self, tmp_path): - manager = _make_manager(tmp_path) - - msg = MagicMock() - msg.source_hash = None - msg.fields = {LXMF.FIELD_CUSTOM_TYPE: CONTENT_TYPE_BUNDLE_CHUNK} - msg.content = b"\x00" * (CHUNK_HEADER_SIZE - 1) # too short - - with patch("radicle_reticulum.sync.RNS.log"): - manager._on_lxmf_delivery(msg) # should not raise - - assert manager._chunk_buffers == {} - - def test_partial_chunks_not_delivered_until_complete(self, tmp_path): - manager = _make_manager(tmp_path) - bundle = _make_bundle(b"partial test data here") - bundle_bytes = bundle.encode() - - received = [] - manager.set_on_bundle_received(received.append) - - chunks = self._make_chunks(bundle_bytes, chunk_size=5) - assert len(chunks) >= 2 - - # Send only first chunk - msg = MagicMock() - msg.source_hash = None - msg.fields = {LXMF.FIELD_CUSTOM_TYPE: CONTENT_TYPE_BUNDLE_CHUNK} - msg.content = chunks[0] - with patch("radicle_reticulum.sync.RNS.log"): - manager._on_lxmf_delivery(msg) - - assert received == [] # not yet complete - - -# --------------------------------------------------------------------------- -# SyncManager repository management -# --------------------------------------------------------------------------- - -class TestSyncManagerRepositories: - def test_register_repository(self, tmp_path): - manager = _make_manager(tmp_path) - repo_path = tmp_path / "repo" - repo_path.mkdir() - - with patch("radicle_reticulum.sync.RNS.log"): - state = manager.register_repository("rad:z3test", repo_path) - - assert state.repository_id == "rad:z3test" - assert state.local_path == repo_path - - def test_register_same_repo_twice_returns_same_state(self, tmp_path): - manager = _make_manager(tmp_path) - repo_path = tmp_path / "repo" - repo_path.mkdir() - - with patch("radicle_reticulum.sync.RNS.log"): - s1 = manager.register_repository("rad:z3test", repo_path) - s2 = manager.register_repository("rad:z3test", repo_path) - - assert s1 is s2 - - def test_get_sync_status_returns_none_for_unknown(self, tmp_path): - manager = _make_manager(tmp_path) - assert manager.get_sync_status("rad:unknown") is None - - def test_get_sync_status_after_register(self, tmp_path): - manager = _make_manager(tmp_path) - repo_path = tmp_path / "repo" - repo_path.mkdir() - - with patch("radicle_reticulum.sync.RNS.log"): - manager.register_repository("rad:z3test", repo_path) - - status = manager.get_sync_status("rad:z3test") - assert status is not None - assert status["repository_id"] == "rad:z3test" - assert status["known_peers"] == 0 - - -# --------------------------------------------------------------------------- -# Phase 4: speculative push -# --------------------------------------------------------------------------- - -def _make_manager_auto_push(tmp_path: Path) -> SyncManager: - identity = RadicleIdentity.generate() - mock_router = MagicMock() - mock_dest = MagicMock() - mock_dest.hash = b"\x00" * 32 - mock_dest.hash_hex = "00" * 32 - - with patch("radicle_reticulum.sync.RNS.Reticulum"), \ - patch("radicle_reticulum.sync.LXMF.LXMRouter", return_value=mock_router), \ - patch("radicle_reticulum.sync.RNS.log"): - mock_router.register_delivery_identity.return_value = mock_dest - manager = SyncManager(identity=identity, storage_path=tmp_path, auto_push=True) - manager._lxmf_router = mock_router - manager._lxmf_destination = mock_dest - - return manager - - -class TestShouldPushToPeer: - def test_no_push_when_peer_up_to_date(self, tmp_path): - manager = _make_manager(tmp_path) - our_refs = {"refs/heads/main": "a" * 40} - peer_refs = {"refs/heads/main": "a" * 40} - assert manager._should_push_to_peer(our_refs, peer_refs) is False - - def test_push_when_peer_behind(self, tmp_path): - manager = _make_manager(tmp_path) - our_refs = {"refs/heads/main": "b" * 40} - peer_refs = {"refs/heads/main": "a" * 40} - assert manager._should_push_to_peer(our_refs, peer_refs) is True - - def test_push_when_peer_missing_ref(self, tmp_path): - manager = _make_manager(tmp_path) - our_refs = {"refs/heads/main": "a" * 40, "refs/heads/dev": "b" * 40} - peer_refs = {"refs/heads/main": "a" * 40} - assert manager._should_push_to_peer(our_refs, peer_refs) is True - - def test_no_push_when_our_refs_empty(self, tmp_path): - manager = _make_manager(tmp_path) - assert manager._should_push_to_peer({}, {"refs/heads/main": "a" * 40}) is False - - def test_no_push_when_both_empty(self, tmp_path): - manager = _make_manager(tmp_path) - assert manager._should_push_to_peer({}, {}) is False - - -class TestSpeculativePush: - def _make_announcement(self, repo_id="rad:z3test", refs=None): - return RefsAnnouncement( - repository_id=repo_id, - node_id="did:key:z6Mkpeer", - refs=refs or {"refs/heads/main": "a" * 40}, - timestamp=1000, - ) - - def test_auto_push_false_does_not_call_maybe_push(self, tmp_path): - manager = _make_manager(tmp_path) # auto_push=False by default - ann = self._make_announcement() - - msg = MagicMock() - msg.source_hash = b"\x01" * 16 - msg.fields = {LXMF.FIELD_CUSTOM_TYPE: CONTENT_TYPE_REFS_ANNOUNCE} - msg.content = ann.encode() - - with patch("radicle_reticulum.sync.RNS.log"), \ - patch.object(manager, "_maybe_push_to_peer") as mock_push: - manager._on_lxmf_delivery(msg) - - mock_push.assert_not_called() - - def test_auto_push_true_calls_maybe_push_on_refs_announce(self, tmp_path): - manager = _make_manager_auto_push(tmp_path) - ann = self._make_announcement() - - msg = MagicMock() - msg.source_hash = b"\x02" * 16 - msg.fields = {LXMF.FIELD_CUSTOM_TYPE: CONTENT_TYPE_REFS_ANNOUNCE} - msg.content = ann.encode() - - with patch("radicle_reticulum.sync.RNS.log"), \ - patch.object(manager, "_maybe_push_to_peer") as mock_push: - manager._on_lxmf_delivery(msg) - - mock_push.assert_called_once_with(b"\x02" * 16, mock_push.call_args[0][1]) - - def test_auto_push_skipped_when_source_hash_none(self, tmp_path): - manager = _make_manager_auto_push(tmp_path) - ann = self._make_announcement() - - msg = MagicMock() - msg.source_hash = None - msg.fields = {LXMF.FIELD_CUSTOM_TYPE: CONTENT_TYPE_REFS_ANNOUNCE} - msg.content = ann.encode() - - with patch("radicle_reticulum.sync.RNS.log"), \ - patch.object(manager, "_maybe_push_to_peer") as mock_push: - manager._on_lxmf_delivery(msg) - - mock_push.assert_not_called() - - def test_maybe_push_skips_unknown_repository(self, tmp_path): - manager = _make_manager_auto_push(tmp_path) - ann = self._make_announcement(repo_id="rad:unknown") - - with patch("radicle_reticulum.sync.RNS.log"), \ - patch("radicle_reticulum.sync.GitBundleGenerator") as mock_gen: - manager._maybe_push_to_peer(b"\x03" * 16, ann) - - mock_gen.assert_not_called() - - def test_maybe_push_skips_when_peer_up_to_date(self, tmp_path): - manager = _make_manager_auto_push(tmp_path) - repo_path = tmp_path / "repo" - repo_path.mkdir() - - with patch("radicle_reticulum.sync.RNS.log"): - manager.register_repository("rad:z3test", repo_path) - - our_refs = {"refs/heads/main": "a" * 40} - peer_refs = {"refs/heads/main": "a" * 40} - ann = self._make_announcement(refs=peer_refs) - - with patch("radicle_reticulum.sync.RNS.log"), \ - patch("radicle_reticulum.sync.GitBundleGenerator") as mock_gen_cls: - mock_gen = MagicMock() - mock_gen_cls.return_value = mock_gen - mock_gen.get_refs.return_value = our_refs - manager._maybe_push_to_peer(b"\x04" * 16, ann) - - mock_gen.create_incremental_bundle.assert_not_called() - - def test_maybe_push_sends_bundle_when_ahead(self, tmp_path): - manager = _make_manager_auto_push(tmp_path) - repo_path = tmp_path / "repo" - repo_path.mkdir() - - with patch("radicle_reticulum.sync.RNS.log"): - manager.register_repository("rad:z3test", repo_path) - - our_refs = {"refs/heads/main": "b" * 40} - peer_refs = {"refs/heads/main": "a" * 40} - ann = self._make_announcement(refs=peer_refs) - bundle = _make_bundle(b"incremental data") - - with patch("radicle_reticulum.sync.RNS.log"), \ - patch("radicle_reticulum.sync.GitBundleGenerator") as mock_gen_cls, \ - patch.object(manager, "_send_lxmf_message", return_value=True) as mock_send: - mock_gen = MagicMock() - mock_gen_cls.return_value = mock_gen - mock_gen.get_refs.return_value = our_refs - mock_gen.create_incremental_bundle.return_value = bundle - manager._maybe_push_to_peer(b"\x05" * 16, ann) - - mock_send.assert_called_once() - call_args = mock_send.call_args[0] - assert call_args[0] == b"\x05" * 16 - assert call_args[1] == CONTENT_TYPE_BUNDLE