feat: replace parallel git-bundle layer with real Radicle seed bridging

Rips out the custom git-bundle/sync/adaptive/qr parallel sync stack and
replaces it with a proper seed-to-seed architecture: two real radicle-node
instances (each with seedingPolicy=allow) bridge over Reticulum/LoRa.
Users connect their own radicle-node to the local seed once; all mesh
sync happens in the background using Radicle's own protocol, so atomicity,
Noise XK encryption, and pack verification are all handled natively.

New modules:
- seed.py: SeedNode — manages a dedicated radicle-node subprocess under its
  own RAD_HOME (~/.radicle-rns/seed/), writes a seed config only on first
  run, uses DEVNULL I/O to prevent pipe-buffer deadlocks
- gossip.py: GossipRelay — polls ~/.radicle/storage/<rid>/ for ref changes,
  broadcasts ~200-500 byte RNS packets to known peers, on receipt calls
  `rad sync --fetch --rid X`; thread-safe via separate peers/refs locks

New CLI commands:
- `radicle-rns seed`: starts seed radicle-node + bridge as one command;
  auto-discovers remote seeds over the mesh and connects them
- `radicle-rns gossip`: runs ref-watching notification relay

bridge.py: added rad_home parameter so `rad node connect` is called with
the seed's RAD_HOME when auto-connecting remote seeds.

Bug fixes applied during review:
- seed.py: PIPE→DEVNULL (64 KB pipe buffer deadlock)
- seed.py: missing wait() after kill() (zombie process)
- seed.py: write_config() now idempotent (preserves user customisations)
- seed.py: is_initialized() wraps TimeoutExpired in except Exception
- seed.py: removed hardcoded "network":"main" (breaks testnet)
- seed.py: moved `import socket` to top-level
- cli.py: bridge.start() inside try/finally (orphaned seed on start error)
- cli.py: status line gated on known_bridges change (not every 5 s)
- cli.py: removed dead get_remote_bridges() call in bridge status loop
- gossip.py: added _refs_lock protecting _known_refs across threads

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Maciek "mab122" Bator 2026-04-22 15:44:59 +02:00
parent be25772602
commit e051d82af1
15 changed files with 1197 additions and 3282 deletions

View File

@ -15,14 +15,10 @@ dependencies = [
]
[project.optional-dependencies]
qr = [
"qrcode>=7.0",
]
dev = [
"pytest>=7.0.0",
"pytest-asyncio>=0.21.0",
"mypy>=1.0.0",
"qrcode>=7.0",
]
[project.scripts]
@ -36,7 +32,6 @@ dev = [
"pytest>=7.0.0",
"pytest-asyncio>=0.21.0",
"mypy>=1.0.0",
"qrcode>=8.2",
]
[build-system]

View File

@ -12,44 +12,15 @@ from radicle_reticulum.messages import (
Pong,
decode_message,
)
from radicle_reticulum.git_bundle import (
GitBundle,
GitBundleGenerator,
GitBundleApplicator,
BundleType,
BundleMetadata,
)
from radicle_reticulum.sync import (
SyncManager,
SyncMode,
RefsAnnouncement,
create_dead_drop_bundle,
apply_dead_drop_bundle,
)
from radicle_reticulum.adaptive import (
SyncStrategy,
LinkQuality,
BandwidthProbe,
AdaptiveSyncManager,
select_strategy,
)
from radicle_reticulum.bridge import RadicleBridge
from radicle_reticulum.qr import (
encode_bundle_to_qr,
decode_bundle_from_qr_data,
decode_bundle_from_qr_image,
BundleTooLargeForQR,
QR_MAX_BYTES as QR_BUNDLE_MAX_BYTES,
)
from radicle_reticulum.gossip import GossipRelay
from radicle_reticulum.seed import SeedNode
__version__ = "0.1.0"
__all__ = [
# Identity
"RadicleIdentity",
# Transport
"RNSTransportAdapter",
"RadicleLink",
# Messages
"MessageType",
"NodeAnnouncement",
"InventoryAnnouncement",
@ -57,30 +28,7 @@ __all__ = [
"Ping",
"Pong",
"decode_message",
# Git bundles
"GitBundle",
"GitBundleGenerator",
"GitBundleApplicator",
"BundleType",
"BundleMetadata",
# Sync
"SyncManager",
"SyncMode",
"RefsAnnouncement",
"create_dead_drop_bundle",
"apply_dead_drop_bundle",
# Adaptive sync
"SyncStrategy",
"LinkQuality",
"BandwidthProbe",
"AdaptiveSyncManager",
"select_strategy",
# Bridge
"RadicleBridge",
# QR
"encode_bundle_to_qr",
"decode_bundle_from_qr_data",
"decode_bundle_from_qr_image",
"BundleTooLargeForQR",
"QR_BUNDLE_MAX_BYTES",
"GossipRelay",
"SeedNode",
]

View File

@ -1,347 +0,0 @@
"""Adaptive sync with automatic bandwidth detection.
Automatically selects sync strategy based on:
- Link RTT (round-trip time)
- Measured throughput
- Bundle size estimation
Strategies:
- FULL: Fast links (>100 Kbps) - send complete bundles
- INCREMENTAL: Medium links (1-100 Kbps) - send only changes
- MINIMAL: Slow links (<1 Kbps, LoRa) - send only refs + request
- QR: Tiny payloads (<3KB) - encode as QR for visual transfer
"""
import time
import struct
from dataclasses import dataclass
from enum import Enum
from typing import Optional, Tuple
import RNS
from radicle_reticulum.link import RadicleLink
class SyncStrategy(Enum):
"""Sync strategy based on link quality."""
FULL = "full" # >100 Kbps - send everything
INCREMENTAL = "incremental" # 1-100 Kbps - only changes
MINIMAL = "minimal" # <1 Kbps - refs only, request pulls
QR = "qr" # <3 KB payload - QR code capable
@dataclass
class LinkQuality:
"""Measured link quality metrics."""
rtt_ms: float # Round-trip time in milliseconds
throughput_bps: float # Estimated throughput in bits per second
packet_loss: float # Packet loss ratio (0-1)
is_lora: bool # Detected as LoRa link
strategy: SyncStrategy # Recommended strategy
@property
def throughput_kbps(self) -> float:
return self.throughput_bps / 1000
def __repr__(self) -> str:
return (
f"LinkQuality(rtt={self.rtt_ms:.0f}ms, "
f"throughput={self.throughput_kbps:.1f}Kbps, "
f"strategy={self.strategy.value})"
)
# Bandwidth thresholds (bits per second)
THRESHOLD_FULL = 100_000 # 100 Kbps - use full sync
THRESHOLD_INCREMENTAL = 1_000 # 1 Kbps - use incremental
# Below 1 Kbps - use minimal/QR
# RTT thresholds (milliseconds) - used as heuristic
RTT_FAST = 100 # <100ms likely ethernet/wifi
RTT_MEDIUM = 1000 # <1s likely internet
RTT_SLOW = 10000 # <10s likely LoRa/satellite
# QR code capacity
QR_MAX_BYTES = 2953 # QR version 40, binary mode
class BandwidthProbe:
"""Probes link quality to determine optimal sync strategy."""
# Probe packet sizes
PROBE_SMALL = 64 # Minimum probe
PROBE_MEDIUM = 512 # Medium probe
PROBE_LARGE = 2048 # Large probe (if link allows)
def __init__(self, link: RadicleLink):
self.link = link
self._measurements: list = []
def measure_rtt(self, samples: int = 3) -> float:
"""Measure round-trip time with multiple samples."""
rtts = []
for _ in range(samples):
# Send probe packet
probe_data = struct.pack("!BQ", 0x01, int(time.time() * 1000000))
start = time.time()
if not self.link.send(probe_data):
continue
# Wait for response
response = self.link.recv(timeout=30.0)
if response:
rtt = (time.time() - start) * 1000 # ms
rtts.append(rtt)
time.sleep(0.1) # Brief pause between probes
if not rtts:
return float('inf')
# Use median RTT
rtts.sort()
return rtts[len(rtts) // 2]
def measure_throughput(self, duration: float = 2.0) -> float:
"""Measure throughput by sending test data."""
# Start with small packets, increase if successful
packet_size = self.PROBE_SMALL
bytes_sent = 0
start = time.time()
while time.time() - start < duration:
# Create probe packet
probe_data = struct.pack("!BI", 0x02, packet_size) + b"\x00" * (packet_size - 5)
if self.link.send(probe_data):
bytes_sent += len(probe_data)
# Try larger packets if successful
if packet_size < self.PROBE_LARGE:
packet_size = min(packet_size * 2, self.PROBE_LARGE)
else:
# Reduce packet size on failure
packet_size = max(packet_size // 2, self.PROBE_SMALL)
time.sleep(0.01) # Small delay
elapsed = time.time() - start
if elapsed > 0:
return (bytes_sent * 8) / elapsed # bits per second
return 0
def detect_link_type(self) -> bool:
"""Detect if link is LoRa based on characteristics."""
# LoRa typically has:
# - High RTT (>1s)
# - Low throughput (<10 Kbps)
# - Link properties may indicate
if hasattr(self.link.rns_link, 'get_establishment_rate'):
rate = self.link.rns_link.get_establishment_rate()
if rate and rate < 10000: # <10 Kbps
return True
# Check RTT heuristic
if self.link.rtt and self.link.rtt > 1.0: # >1 second RTT
return True
return False
def probe(self, quick: bool = False) -> LinkQuality:
"""Probe link and return quality assessment.
Args:
quick: If True, use faster but less accurate probing
"""
# Quick mode uses existing RTT if available
if quick and self.link.rtt:
rtt_ms = self.link.rtt * 1000
# Estimate throughput from RTT
if rtt_ms < RTT_FAST:
throughput = THRESHOLD_FULL * 10 # Assume fast
elif rtt_ms < RTT_MEDIUM:
throughput = THRESHOLD_FULL
elif rtt_ms < RTT_SLOW:
throughput = THRESHOLD_INCREMENTAL * 10
else:
throughput = THRESHOLD_INCREMENTAL / 10
else:
# Full measurement
rtt_ms = self.measure_rtt(samples=2 if quick else 3)
throughput = self.measure_throughput(duration=1.0 if quick else 2.0)
is_lora = self.detect_link_type()
# Determine strategy
if throughput >= THRESHOLD_FULL and rtt_ms < RTT_MEDIUM:
strategy = SyncStrategy.FULL
elif throughput >= THRESHOLD_INCREMENTAL:
strategy = SyncStrategy.INCREMENTAL
else:
strategy = SyncStrategy.MINIMAL
return LinkQuality(
rtt_ms=rtt_ms,
throughput_bps=throughput,
packet_loss=0.0, # TODO: measure
is_lora=is_lora,
strategy=strategy,
)
def estimate_transfer_time(size_bytes: int, quality: LinkQuality) -> float:
"""Estimate transfer time in seconds for given size and quality."""
if quality.throughput_bps <= 0:
return float('inf')
# Account for protocol overhead (~20%)
effective_throughput = quality.throughput_bps * 0.8
bits = size_bytes * 8
return bits / effective_throughput
def select_strategy(
bundle_size: int,
incremental_size: Optional[int],
quality: LinkQuality,
max_transfer_time: float = 3600, # 1 hour default max
) -> Tuple[SyncStrategy, str]:
"""Select optimal sync strategy based on sizes and link quality.
Returns (strategy, reason).
"""
# Check if QR is viable for tiny payloads
if incremental_size and incremental_size <= QR_MAX_BYTES:
return SyncStrategy.QR, f"Incremental fits in QR ({incremental_size} bytes)"
# Calculate transfer times
full_time = estimate_transfer_time(bundle_size, quality)
incr_time = estimate_transfer_time(incremental_size or bundle_size, quality)
# If LoRa detected, prefer minimal/incremental
if quality.is_lora:
if incremental_size and incr_time < max_transfer_time:
return SyncStrategy.INCREMENTAL, f"LoRa link, incremental viable ({incr_time:.0f}s)"
return SyncStrategy.MINIMAL, "LoRa link, request-based sync recommended"
# For fast links, full sync if reasonable
if quality.strategy == SyncStrategy.FULL:
if full_time < 60: # Under 1 minute
return SyncStrategy.FULL, f"Fast link, full sync in {full_time:.0f}s"
elif incremental_size and incr_time < full_time / 2:
return SyncStrategy.INCREMENTAL, f"Large repo, incremental faster ({incr_time:.0f}s vs {full_time:.0f}s)"
return SyncStrategy.FULL, f"Fast link, full sync in {full_time:.0f}s"
# For medium links, prefer incremental
if incremental_size and incr_time < max_transfer_time:
return SyncStrategy.INCREMENTAL, f"Medium link, incremental in {incr_time:.0f}s"
if full_time < max_transfer_time:
return SyncStrategy.FULL, f"No incremental available, full in {full_time:.0f}s"
return SyncStrategy.MINIMAL, f"Transfer too slow ({full_time:.0f}s), use request-based"
class AdaptiveSyncManager:
"""Sync manager with automatic bandwidth adaptation."""
def __init__(self, sync_manager):
"""Wrap a SyncManager with adaptive capabilities."""
self.sync_manager = sync_manager
self._link_qualities: dict = {} # peer_hash -> LinkQuality
def probe_peer(self, link: RadicleLink, quick: bool = True) -> LinkQuality:
"""Probe a peer's link quality."""
prober = BandwidthProbe(link)
quality = prober.probe(quick=quick)
# Cache result
if link.remote_identity:
self._link_qualities[link.remote_identity.hash] = quality
return quality
def get_cached_quality(self, peer_hash: bytes) -> Optional[LinkQuality]:
"""Get cached link quality for a peer."""
return self._link_qualities.get(peer_hash)
def adaptive_sync(
self,
repository_id: str,
link: RadicleLink,
peer_refs: Optional[dict] = None,
) -> Tuple[bool, str]:
"""Perform adaptive sync based on link quality.
Returns (success, description).
"""
from radicle_reticulum.git_bundle import GitBundleGenerator, estimate_bundle_size
from radicle_reticulum.sync import SyncMode
# Get repository info
state = self.sync_manager._repos.get(repository_id)
if not state:
return False, "Repository not registered"
# Probe link quality
quality = self.probe_peer(link, quick=True)
RNS.log(f"Link quality: {quality}", RNS.LOG_INFO)
# Estimate bundle sizes
generator = GitBundleGenerator(state.local_path)
full_size = estimate_bundle_size(state.local_path)
# Estimate incremental size (rough: compare ref counts)
current_refs = generator.get_refs()
if peer_refs:
changed = sum(1 for r, s in current_refs.items()
if r not in peer_refs or peer_refs[r] != s)
# Rough estimate: assume ~10KB per changed ref average
incr_size = changed * 10240
else:
incr_size = None
# Select strategy
strategy, reason = select_strategy(full_size, incr_size, quality)
RNS.log(f"Selected strategy: {strategy.value} - {reason}", RNS.LOG_INFO)
# Execute strategy
if strategy == SyncStrategy.QR:
bundle = self.sync_manager.create_sync_bundle(
repository_id, peer_refs=peer_refs, mode=SyncMode.INCREMENTAL
)
if bundle:
# Generate QR (handled by caller)
return True, f"QR: {len(bundle.encode())} bytes"
return False, "No changes for QR"
elif strategy == SyncStrategy.FULL:
bundle = self.sync_manager.create_sync_bundle(
repository_id, mode=SyncMode.FULL
)
elif strategy == SyncStrategy.INCREMENTAL:
bundle = self.sync_manager.create_sync_bundle(
repository_id, peer_refs=peer_refs, mode=SyncMode.INCREMENTAL
)
else: # MINIMAL
# Just announce refs, let peer request what they need
self.sync_manager.announce_refs(repository_id)
return True, "Announced refs for request-based sync"
if bundle is None:
return True, "No changes to sync"
# Send bundle
if link.remote_identity:
success = self.sync_manager.send_bundle(
bundle, link.remote_identity.hash
)
if success:
return True, f"Sent {bundle.metadata.bundle_type.value} bundle ({bundle.metadata.size_bytes} bytes)"
return False, "Failed to send bundle"
return False, "No remote identity on link"

View File

@ -15,6 +15,7 @@ The bridge:
4. Remote bridges forward to their local radicle-node
"""
import os
import socket
import select
import struct
@ -90,6 +91,7 @@ class RadicleBridge:
auto_connect: bool = True,
auto_seed: bool = True,
announce_retry_delays: Tuple[int, ...] = (5, 15, 30),
rad_home: Optional[str] = None,
):
"""Initialize the bridge.
@ -103,6 +105,8 @@ class RadicleBridge:
auto_seed: Automatically register discovered NIDs with radicle-node
announce_retry_delays: Seconds between startup re-announces. Use longer
intervals on LoRa to respect duty cycle limits, e.g. (60, 300, 900).
rad_home: RAD_HOME for rad CLI calls. None = use system default.
Set to seed home when bridging a dedicated seed node.
"""
self.listen_port = listen_port
self.radicle_host = radicle_host
@ -110,6 +114,7 @@ class RadicleBridge:
self.auto_connect = auto_connect
self.auto_seed = auto_seed
self.announce_retry_delays = announce_retry_delays
self.rad_home = rad_home
# Initialize Reticulum
self.reticulum = RNS.Reticulum(config_path)
@ -557,12 +562,18 @@ class RadicleBridge:
addr = f"{radicle_nid}@127.0.0.1:{self.listen_port}"
RNS.log(f"Registering seed: {addr}", RNS.LOG_INFO)
env = None
if self.rad_home:
env = os.environ.copy()
env["RAD_HOME"] = str(self.rad_home)
try:
result = subprocess.run(
["rad", "node", "connect", addr],
capture_output=True,
text=True,
timeout=30,
env=env,
)
if result.returncode == 0:
RNS.log(f"Seed registered: {radicle_nid[:16]}...", RNS.LOG_INFO)

View File

@ -1,7 +1,6 @@
"""Command-line interface for Radicle-Reticulum adapter."""
import argparse
import json
import os
import subprocess
import sys
@ -16,24 +15,9 @@ import RNS
from radicle_reticulum.adapter import RNSTransportAdapter, PeerInfo
from radicle_reticulum.identity import RadicleIdentity
from radicle_reticulum.git_bundle import (
GitBundleGenerator,
GitBundleApplicator,
BundleType,
estimate_bundle_size,
)
from radicle_reticulum.sync import (
SyncManager,
SyncMode,
create_dead_drop_bundle,
apply_dead_drop_bundle,
)
from radicle_reticulum.adaptive import (
SyncStrategy,
LinkQuality,
select_strategy,
)
from radicle_reticulum.bridge import RadicleBridge
from radicle_reticulum.gossip import GossipRelay
from radicle_reticulum.seed import SeedNode, DEFAULT_SEED_HOME, DEFAULT_SEED_PORT
def detect_radicle_nid() -> Optional[str]:
@ -235,319 +219,208 @@ def cmd_peers(args):
adapter.stop()
def cmd_bundle_create(args):
"""Create a bundle from a repository."""
repo_path = Path(args.repo).resolve()
if not repo_path.exists():
print(f"Error: Repository not found: {repo_path}", file=sys.stderr)
sys.exit(1)
# Generate identity for signing
identity = RadicleIdentity.generate()
source_node = identity.did
# Repository ID
repo_id = args.repo_id or f"rad:local:{repo_path.name}"
def _detect_rid(repo_path: Path) -> Optional[str]:
"""Detect the Radicle RID for the repo at repo_path via 'rad inspect'."""
try:
generator = GitBundleGenerator(repo_path)
# Get current refs
refs = generator.get_refs()
print(f"Repository: {repo_path}")
print(f"Refs found: {len(refs)}")
for ref, sha in refs.items():
print(f" {ref}: {sha[:12]}")
print()
# Determine output path
if args.output:
output_path = Path(args.output)
else:
timestamp = int(time.time())
output_path = Path(f"{repo_path.name}-{timestamp}.bundle")
# Create bundle
if args.incremental and args.basis:
# Load basis refs from file
basis_refs = json.loads(Path(args.basis).read_text())
bundle = generator.create_incremental_bundle(
repository_id=repo_id,
source_node=source_node,
basis_refs=basis_refs,
output_path=output_path,
)
if bundle is None:
print("No changes to bundle (repository is up to date)")
sys.exit(0)
else:
bundle = generator.create_full_bundle(
repository_id=repo_id,
source_node=source_node,
output_path=output_path,
)
# Also save transport format
transport_path = output_path.with_suffix(".radicle-bundle")
transport_path.write_bytes(bundle.encode())
print(f"Bundle created: {output_path}")
print(f"Transport file: {transport_path}")
print(f"Type: {bundle.metadata.bundle_type.value}")
print(f"Size: {bundle.metadata.size_bytes:,} bytes")
print(f"Refs: {len(bundle.metadata.refs_included)}")
# Save current refs for future incremental
refs_path = output_path.with_suffix(".refs.json")
refs_path.write_text(json.dumps(refs, indent=2))
print(f"Refs saved: {refs_path}")
except Exception as e:
print(f"Error creating bundle: {e}", file=sys.stderr)
sys.exit(1)
def cmd_bundle_apply(args):
"""Apply a bundle to a repository."""
bundle_path = Path(args.bundle).resolve()
repo_path = Path(args.repo).resolve()
if not bundle_path.exists():
print(f"Error: Bundle not found: {bundle_path}", file=sys.stderr)
sys.exit(1)
if not repo_path.exists():
print(f"Error: Repository not found: {repo_path}", file=sys.stderr)
sys.exit(1)
try:
# Determine bundle format
if bundle_path.suffix == ".radicle-bundle":
# Full transport format with metadata
applied = apply_dead_drop_bundle(bundle_path, repo_path)
else:
# Raw git bundle
applicator = GitBundleApplicator(repo_path)
# Create a minimal GitBundle wrapper
from radicle_reticulum.git_bundle import GitBundle, BundleMetadata
import hashlib
bundle_data = bundle_path.read_bytes()
metadata = BundleMetadata(
bundle_type=BundleType.FULL,
repository_id="unknown",
source_node="unknown",
timestamp=int(time.time() * 1000),
refs_included=[],
prerequisites=[],
size_bytes=len(bundle_data),
checksum=hashlib.sha256(bundle_data).digest(),
)
bundle = GitBundle(metadata=metadata, data=bundle_data)
applied = applicator.apply_bundle(bundle)
print(f"Bundle applied successfully!")
print(f"Applied refs: {len(applied)}")
for ref, sha in applied.items():
print(f" {ref}: {sha[:12]}")
except Exception as e:
print(f"Error applying bundle: {e}", file=sys.stderr)
sys.exit(1)
def cmd_bundle_info(args):
"""Show information about a bundle."""
bundle_path = Path(args.bundle).resolve()
if not bundle_path.exists():
print(f"Error: Bundle not found: {bundle_path}", file=sys.stderr)
sys.exit(1)
try:
if bundle_path.suffix == ".radicle-bundle":
# Full transport format
from radicle_reticulum.git_bundle import GitBundle
bundle = GitBundle.decode(bundle_path.read_bytes())
print(f"Bundle: {bundle_path}")
print(f"Format: Radicle transport bundle")
print(f"Type: {bundle.metadata.bundle_type.value}")
print(f"Repository: {bundle.metadata.repository_id}")
print(f"Source: {bundle.metadata.source_node}")
print(f"Timestamp: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(bundle.metadata.timestamp / 1000))}")
print(f"Size: {bundle.metadata.size_bytes:,} bytes")
print(f"Checksum: {bundle.metadata.checksum.hex()}")
print(f"Refs ({len(bundle.metadata.refs_included)}):")
for ref in bundle.metadata.refs_included:
print(f" {ref}")
if bundle.metadata.prerequisites:
print(f"Prerequisites ({len(bundle.metadata.prerequisites)}):")
for prereq in bundle.metadata.prerequisites:
print(f" {prereq}")
else:
# Raw git bundle - use git to inspect
import subprocess
result = subprocess.run(
["git", "bundle", "list-heads", str(bundle_path)],
capture_output=True,
text=True,
["rad", "inspect", "--rid"],
cwd=repo_path,
capture_output=True, text=True, timeout=5,
)
print(f"Bundle: {bundle_path}")
print(f"Format: Raw Git bundle")
print(f"Size: {bundle_path.stat().st_size:,} bytes")
print(f"Refs:")
for line in result.stdout.strip().split("\n"):
if line:
print(f" {line}")
except Exception as e:
print(f"Error reading bundle: {e}", file=sys.stderr)
sys.exit(1)
if result.returncode == 0:
rid = result.stdout.strip()
if rid.startswith("rad:"):
return rid
except Exception:
pass
return None
def cmd_bundle_qr_encode(args):
"""Encode a bundle as a QR code for visual / air-gapped transfer."""
from radicle_reticulum.qr import encode_bundle_to_qr, BundleTooLargeForQR, QR_MAX_BYTES
from radicle_reticulum.git_bundle import GitBundle
bundle_path = Path(args.bundle).resolve()
if not bundle_path.exists():
print(f"Error: Bundle not found: {bundle_path}", file=sys.stderr)
sys.exit(1)
try:
bundle = GitBundle.decode(bundle_path.read_bytes())
except Exception as e:
print(f"Error reading bundle: {e}", file=sys.stderr)
sys.exit(1)
output_path = Path(args.output) if args.output else None
try:
ascii_art = encode_bundle_to_qr(
bundle,
output_path=output_path,
error_correction=args.error_correction,
)
except BundleTooLargeForQR as e:
print(f"Error: {e}", file=sys.stderr)
print(f"Tip: Use 'bundle create --incremental' to reduce bundle size.", file=sys.stderr)
sys.exit(1)
except ImportError as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
print(ascii_art)
if output_path:
print(f"QR image saved: {output_path}")
print(f"Bundle size: {len(bundle.encode())} / {QR_MAX_BYTES} bytes")
print(f"Repository: {bundle.metadata.repository_id}")
print(f"Refs: {len(bundle.metadata.refs_included)}")
def cmd_bundle_qr_decode(args):
"""Decode a bundle from a QR code image file."""
from radicle_reticulum.qr import decode_bundle_from_qr_image
image_path = Path(args.image).resolve()
if not image_path.exists():
print(f"Error: Image not found: {image_path}", file=sys.stderr)
sys.exit(1)
try:
bundle = decode_bundle_from_qr_image(image_path)
except ImportError as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
except ValueError as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
output_path = Path(args.output) if args.output else image_path.with_suffix(".radicle-bundle")
output_path.write_bytes(bundle.encode())
print(f"Bundle decoded successfully!")
print(f" Repository: {bundle.metadata.repository_id}")
print(f" Refs: {len(bundle.metadata.refs_included)}")
print(f" Size: {bundle.metadata.size_bytes:,} bytes")
print(f" Saved to: {output_path}")
print()
print(f"Apply with: radicle-rns bundle apply {output_path} <repo-path>")
def cmd_sync(args):
"""Sync a repository with a peer."""
repo_path = Path(args.repo).resolve()
if not repo_path.exists():
print(f"Error: Repository not found: {repo_path}", file=sys.stderr)
sys.exit(1)
def cmd_gossip(args):
"""Run the gossip relay daemon."""
identity = RadicleIdentity.load_or_generate(args.identity)
repo_id = args.repo_id or f"rad:local:{repo_path.name}"
_print_identity_info(args.identity)
print(f"Starting sync manager...")
print(f"Repository: {repo_path}")
print(f"Node ID: {identity.did}")
sync_manager = SyncManager(identity=identity)
sync_manager.start()
sync_manager.register_repository(repo_id, repo_path)
if args.peer:
# Connect to specific peer
try:
dest_hash = bytes.fromhex(args.peer)
except ValueError:
print(f"Error: Invalid peer hash", file=sys.stderr)
# Collect RIDs: explicit args + auto-detect from CWD
rids = list(args.rids)
if not rids:
rid = _detect_rid(Path.cwd())
if rid:
print(f"Auto-detected RID: {rid}")
rids.append(rid)
else:
print("Error: no RIDs given and could not auto-detect from current directory.", file=sys.stderr)
print("Pass one or more RIDs as arguments, or run from inside a radicle repo.", file=sys.stderr)
sys.exit(1)
print(f"Creating bundle for peer {args.peer}...")
nid = args.nid or detect_radicle_nid()
if nid:
print(f"Local NID: {nid}")
# Load peer's known refs if available
peer_refs = None
if args.peer_refs:
peer_refs = json.loads(Path(args.peer_refs).read_text())
try:
announce_retry_delays = tuple(
int(x.strip()) for x in args.announce_retry_delays.split(",") if x.strip()
)
except ValueError:
print("Error: --announce-retry-delays must be comma-separated integers.", file=sys.stderr)
sys.exit(1)
mode = SyncMode.INCREMENTAL if peer_refs else SyncMode.FULL
bundle = sync_manager.create_sync_bundle(repo_id, peer_refs=peer_refs, mode=mode)
relay = GossipRelay(
identity=identity,
rids=rids,
radicle_nid=nid,
bridge_port=args.bridge_port,
poll_interval=args.poll_interval,
announce_retry_delays=announce_retry_delays,
)
if bundle:
print(f"Bundle created: {bundle.metadata.size_bytes:,} bytes")
print(f"Type: {bundle.metadata.bundle_type.value}")
def on_sync(rid, peer_nid):
print(f"[sync] {rid[:24]}... from {peer_nid[:24] if peer_nid else 'peer'}")
if args.send:
print(f"Sending to peer...")
if sync_manager.send_bundle(bundle, dest_hash):
print("Bundle sent successfully!")
else:
print("Failed to send bundle")
else:
print("No changes to sync")
else:
# Listen for incoming syncs
print("Listening for sync requests... Press Ctrl+C to stop.")
relay.set_on_sync_triggered(on_sync)
relay.start()
def on_bundle(bundle):
print(f"Received bundle: {bundle.metadata.repository_id}")
print(f" From: {bundle.metadata.source_node}")
print(f" Size: {bundle.metadata.size_bytes:,} bytes")
sync_manager.set_on_bundle_received(on_bundle)
print()
print(f"Gossip relay running:")
print(f" RNS address: {relay.destination.hexhash}")
print(f" Repos: {', '.join(r[:28] for r in rids)}")
print(f" Poll: every {args.poll_interval}s")
print()
print("Press Ctrl+C to stop.")
print()
running = True
def signal_handler(sig, frame):
nonlocal running
print("\nShutting down...")
running = False
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
last_stats = None
try:
while running:
time.sleep(0.5)
stats = relay.get_stats()
if stats != last_stats:
print(f"[Status] Peers: {stats['known_peers']}, "
f"Repos: {stats['watched_repos']}, "
f"Refs: {stats['refs_per_repo']}")
last_stats = dict(stats)
time.sleep(5)
finally:
relay.stop()
print("Gossip relay stopped.")
sync_manager.stop()
def cmd_seed(args):
"""Start a dedicated seed radicle-node and bridge it to the mesh."""
seed_home = Path(args.seed_home)
seed = SeedNode(seed_home=seed_home, port=args.seed_port)
# First-time setup: guide the user
if not seed.is_initialized():
seed.write_config()
print(f"Seed home: {seed_home}")
print()
print("Seed identity not found. Initialize it with:")
print(f" RAD_HOME={seed_home} rad auth")
print()
print("Then run 'radicle-rns seed' again.")
sys.exit(1)
# Start seed radicle-node
print(f"Starting seed radicle-node (port {args.seed_port})...")
try:
seed.start()
except RuntimeError as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
nid = seed.get_nid()
if not nid:
print("Error: could not get seed NID.", file=sys.stderr)
seed.stop()
sys.exit(1)
# Start bridge pointing at the seed
identity = RadicleIdentity.load_or_generate(args.identity)
_print_identity_info(args.identity)
try:
announce_retry_delays = tuple(
int(x.strip()) for x in args.announce_retry_delays.split(",") if x.strip()
)
except ValueError:
print("Error: --announce-retry-delays must be comma-separated integers.", file=sys.stderr)
seed.stop()
sys.exit(1)
bridge = RadicleBridge(
identity=identity,
listen_port=args.bridge_port,
radicle_host="127.0.0.1",
radicle_port=args.seed_port,
auto_connect=True,
auto_seed=True,
announce_retry_delays=announce_retry_delays,
rad_home=str(seed_home),
)
bridge.set_local_radicle_nid(nid)
def on_peer_seed_discovered(dest_hash, remote_nid=None):
nid_info = f" (NID: {remote_nid[:32]})" if remote_nid else ""
print(f"[+] Discovered remote seed: {dest_hash.hex()[:16]}{nid_info}")
bridge.set_on_bridge_discovered(on_peer_seed_discovered)
running = True
def signal_handler(sig, frame):
nonlocal running
print("\nShutting down...")
running = False
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
try:
bridge.start()
print()
print("Seed node running.")
print(f" Seed NID: {nid}")
print(f" Seed port: {args.seed_port}")
print(f" Bridge hash: {bridge.destination.hexhash}")
print()
print("Add this seed to your radicle node:")
print(f" rad node connect {nid}@127.0.0.1:{args.seed_port}")
print()
print("Other machines running 'radicle-rns seed' will discover this")
print("seed automatically and sync over the mesh.")
print()
print("Press Ctrl+C to stop.")
print()
last_known_bridges = -1
while running:
if not seed.is_running():
print("Seed radicle-node exited unexpectedly.", file=sys.stderr)
break
stats = bridge.get_stats()
if stats["known_bridges"] != last_known_bridges:
last_known_bridges = stats["known_bridges"]
print(f"[Status] Remote seeds: {stats['known_bridges']}, "
f"Tunnels: {stats['active_tunnels']}")
time.sleep(5)
finally:
bridge.stop()
seed.stop()
print("Seed stopped.")
def cmd_bridge(args):
@ -666,7 +539,6 @@ def cmd_bridge(args):
# Show status changes
stats = bridge.get_stats()
if stats != last_stats:
bridges = bridge.get_remote_bridges()
print(f"[Status] Tunnels: {stats['active_tunnels']}, "
f"Remote bridges: {stats['known_bridges']}, "
f"TX: {stats['bytes_sent']}, RX: {stats['bytes_received']}")
@ -740,57 +612,78 @@ def main():
)
add_identity_arg(peers_parser)
# bundle command group
bundle_parser = subparsers.add_parser("bundle", help="Git bundle operations")
bundle_subparsers = bundle_parser.add_subparsers(dest="bundle_command")
# bundle create
bundle_create_parser = bundle_subparsers.add_parser("create", help="Create a bundle")
bundle_create_parser.add_argument("repo", help="Path to Git repository")
bundle_create_parser.add_argument("-o", "--output", help="Output bundle path")
bundle_create_parser.add_argument("--repo-id", help="Radicle repository ID")
bundle_create_parser.add_argument("--incremental", action="store_true",
help="Create incremental bundle")
bundle_create_parser.add_argument("--basis", help="Basis refs JSON file for incremental")
# bundle apply
bundle_apply_parser = bundle_subparsers.add_parser("apply", help="Apply a bundle")
bundle_apply_parser.add_argument("bundle", help="Path to bundle file")
bundle_apply_parser.add_argument("repo", help="Path to Git repository")
# bundle info
bundle_info_parser = bundle_subparsers.add_parser("info", help="Show bundle info")
bundle_info_parser.add_argument("bundle", help="Path to bundle file")
# bundle qr-encode
bundle_qr_enc_parser = bundle_subparsers.add_parser(
"qr-encode", help=f"Encode bundle as QR code (max {2953} bytes)"
# gossip command
gossip_parser = subparsers.add_parser(
"gossip",
help="Run gossip relay: watch refs, notify peers, trigger rad sync",
)
bundle_qr_enc_parser.add_argument("bundle", help="Path to .radicle-bundle file")
bundle_qr_enc_parser.add_argument("-o", "--output", help="Output PNG path (requires Pillow)")
bundle_qr_enc_parser.add_argument(
"--ec", dest="error_correction", default="L",
choices=["L", "M", "Q", "H"],
help="QR error correction level (default: L = max capacity)",
gossip_parser.add_argument(
"rids",
nargs="*",
metavar="RID",
help="Radicle repo IDs to watch (auto-detected from CWD if omitted)",
)
# bundle qr-decode
bundle_qr_dec_parser = bundle_subparsers.add_parser(
"qr-decode", help="Decode bundle from QR code image (requires pyzbar + Pillow)"
gossip_parser.add_argument(
"--nid",
help="Local radicle NID to advertise (auto-detected if omitted)",
)
bundle_qr_dec_parser.add_argument("image", help="Path to QR code image")
bundle_qr_dec_parser.add_argument("-o", "--output", help="Output .radicle-bundle path")
gossip_parser.add_argument(
"--bridge-port",
type=int,
default=8777,
metavar="PORT",
help="TCP port of the local bridge (default: 8777)",
)
gossip_parser.add_argument(
"--poll-interval",
type=int,
default=30,
metavar="SECONDS",
help="Seconds between ref polls (default: 30)",
)
gossip_parser.add_argument(
"--announce-retry-delays",
default="5,15,30",
metavar="SECONDS",
help="Startup re-announce delays, comma-separated (default: 5,15,30). "
"Use longer values on LoRa, e.g. 60,300,900",
)
add_identity_arg(gossip_parser)
# sync command
sync_parser = subparsers.add_parser("sync", help="Sync repository with peers")
sync_parser.add_argument("repo", help="Path to Git repository")
sync_parser.add_argument("--repo-id", help="Radicle repository ID")
sync_parser.add_argument("--peer", help="Peer RNS hash to sync with")
sync_parser.add_argument("--peer-refs", help="JSON file with peer's known refs")
sync_parser.add_argument("--send", action="store_true", help="Send bundle to peer")
add_identity_arg(sync_parser)
# seed command
seed_parser = subparsers.add_parser(
"seed",
help="Start a Radicle seed node and bridge it to the mesh over Reticulum",
)
seed_parser.add_argument(
"--seed-home",
default=str(DEFAULT_SEED_HOME),
metavar="PATH",
help=f"RAD_HOME for the seed node (default: {DEFAULT_SEED_HOME})",
)
seed_parser.add_argument(
"--seed-port",
type=int,
default=DEFAULT_SEED_PORT,
metavar="PORT",
help=f"TCP port for the seed radicle-node (default: {DEFAULT_SEED_PORT})",
)
seed_parser.add_argument(
"--bridge-port",
type=int,
default=8778,
metavar="PORT",
help="TCP listen port for the seed bridge (default: 8778)",
)
seed_parser.add_argument(
"--announce-retry-delays",
default="5,15,30",
metavar="SECONDS",
help="Startup re-announce delays, comma-separated (default: 5,15,30). "
"Use longer values on LoRa, e.g. 60,300,900",
)
add_identity_arg(seed_parser)
# bridge command
bridge_parser = subparsers.add_parser("bridge", help="Run Radicle-Reticulum bridge")
bridge_parser.add_argument(
"-l", "--listen-port",
@ -855,22 +748,10 @@ def main():
cmd_ping(args)
elif args.command == "peers":
cmd_peers(args)
elif args.command == "bundle":
if args.bundle_command == "create":
cmd_bundle_create(args)
elif args.bundle_command == "apply":
cmd_bundle_apply(args)
elif args.bundle_command == "info":
cmd_bundle_info(args)
elif args.bundle_command == "qr-encode":
cmd_bundle_qr_encode(args)
elif args.bundle_command == "qr-decode":
cmd_bundle_qr_decode(args)
else:
bundle_parser.print_help()
sys.exit(1)
elif args.command == "sync":
cmd_sync(args)
elif args.command == "gossip":
cmd_gossip(args)
elif args.command == "seed":
cmd_seed(args)
elif args.command == "bridge":
cmd_bridge(args)
else:

View File

@ -1,467 +0,0 @@
"""Git bundle generation and application for Radicle repos.
Supports both full repository bundles and incremental bundles
containing only new commits since a known state.
Radicle stores data under these ref namespaces:
- refs/heads/* - Git branches
- refs/tags/* - Git tags
- refs/rad/id - Repository identity
- refs/rad/sigrefs - Signed refs
- refs/rad/cob/* - Collaborative Objects (issues, patches, etc.)
- refs/rad/cob/xyz.issue/*
- refs/rad/cob/xyz.patch/*
"""
import os
import subprocess
import tempfile
import hashlib
from dataclasses import dataclass, field
from enum import Enum
from pathlib import Path
from typing import Dict, List, Optional, Set, Tuple
import struct
import time
class BundleType(Enum):
"""Type of Git bundle."""
FULL = "full" # Complete repository
INCREMENTAL = "incremental" # Only new commits
# Radicle ref patterns to include in sync
RADICLE_REF_PATTERNS = [
"refs/heads/*",
"refs/tags/*",
"refs/rad/id",
"refs/rad/sigrefs",
"refs/rad/cob/*",
]
@dataclass
class BundleMetadata:
"""Metadata about a Git bundle for transport."""
bundle_type: BundleType
repository_id: str # Radicle repo ID (rad:z...)
source_node: str # DID of source node
timestamp: int # Unix timestamp ms
refs_included: List[str] # List of refs in bundle
prerequisites: List[str] # Commits required (for incremental)
size_bytes: int
checksum: bytes # SHA-256 of bundle data
def encode(self) -> bytes:
"""Encode metadata to bytes."""
repo_bytes = self.repository_id.encode("utf-8")
node_bytes = self.source_node.encode("utf-8")
refs_data = b"".join(
struct.pack(f"!H{len(r)}s", len(r), r.encode("utf-8"))
for r in self.refs_included
)
prereq_data = b"".join(
struct.pack(f"!H{len(p)}s", len(p), p.encode("utf-8"))
for p in self.prerequisites
)
return struct.pack(
f"!BH{len(repo_bytes)}sH{len(node_bytes)}sQIH{len(refs_data)}sH{len(prereq_data)}s32s",
1 if self.bundle_type == BundleType.FULL else 2,
len(repo_bytes), repo_bytes,
len(node_bytes), node_bytes,
self.timestamp,
self.size_bytes,
len(self.refs_included), refs_data,
len(self.prerequisites), prereq_data,
self.checksum,
)
@classmethod
def decode(cls, data: bytes) -> Tuple["BundleMetadata", int]:
"""Decode metadata from bytes. Returns (metadata, bytes_consumed)."""
offset = 0
# Bundle type
bundle_type_raw = struct.unpack("!B", data[offset:offset+1])[0]
bundle_type = BundleType.FULL if bundle_type_raw == 1 else BundleType.INCREMENTAL
offset += 1
# Repository ID
repo_len = struct.unpack("!H", data[offset:offset+2])[0]
offset += 2
repository_id = data[offset:offset+repo_len].decode("utf-8")
offset += repo_len
# Source node
node_len = struct.unpack("!H", data[offset:offset+2])[0]
offset += 2
source_node = data[offset:offset+node_len].decode("utf-8")
offset += node_len
# Timestamp and size
timestamp, size_bytes = struct.unpack("!QI", data[offset:offset+12])
offset += 12
# Refs
refs_count = struct.unpack("!H", data[offset:offset+2])[0]
offset += 2
refs_included = []
for _ in range(refs_count):
ref_len = struct.unpack("!H", data[offset:offset+2])[0]
offset += 2
refs_included.append(data[offset:offset+ref_len].decode("utf-8"))
offset += ref_len
# Prerequisites
prereq_count = struct.unpack("!H", data[offset:offset+2])[0]
offset += 2
prerequisites = []
for _ in range(prereq_count):
prereq_len = struct.unpack("!H", data[offset:offset+2])[0]
offset += 2
prerequisites.append(data[offset:offset+prereq_len].decode("utf-8"))
offset += prereq_len
# Checksum
checksum = data[offset:offset+32]
offset += 32
return cls(
bundle_type=bundle_type,
repository_id=repository_id,
source_node=source_node,
timestamp=timestamp,
refs_included=refs_included,
prerequisites=prerequisites,
size_bytes=size_bytes,
checksum=checksum,
), offset
@dataclass
class GitBundle:
"""A Git bundle with metadata for transport."""
metadata: BundleMetadata
data: bytes
def encode(self) -> bytes:
"""Encode bundle with metadata for transport."""
meta_bytes = self.metadata.encode()
return struct.pack("!I", len(meta_bytes)) + meta_bytes + self.data
@classmethod
def decode(cls, data: bytes) -> "GitBundle":
"""Decode bundle from transport format."""
meta_len = struct.unpack("!I", data[:4])[0]
metadata, _ = BundleMetadata.decode(data[4:4+meta_len])
bundle_data = data[4+meta_len:]
# Verify checksum
actual_checksum = hashlib.sha256(bundle_data).digest()
if actual_checksum != metadata.checksum:
raise ValueError("Bundle checksum mismatch")
return cls(metadata=metadata, data=bundle_data)
def save(self, path: Path) -> None:
"""Save bundle data to file."""
path.write_bytes(self.data)
@property
def size(self) -> int:
"""Get total size including metadata."""
return len(self.encode())
class GitBundleGenerator:
"""Generates Git bundles from repositories."""
def __init__(self, repo_path: Path):
"""Initialize with path to Git repository."""
self.repo_path = Path(repo_path)
if not (self.repo_path / ".git").exists() and not (self.repo_path / "HEAD").exists():
raise ValueError(f"Not a Git repository: {repo_path}")
def _run_git(self, *args: str, check: bool = True) -> subprocess.CompletedProcess:
"""Run a git command in the repository."""
return subprocess.run(
["git", *args],
cwd=self.repo_path,
capture_output=True,
text=True,
check=check,
)
def _run_git_binary(self, *args: str) -> bytes:
"""Run a git command and return binary output."""
result = subprocess.run(
["git", *args],
cwd=self.repo_path,
capture_output=True,
check=True,
)
return result.stdout
def get_refs(self, patterns: Optional[List[str]] = None) -> Dict[str, str]:
"""Get refs matching patterns. Returns {ref_name: commit_sha}."""
if patterns is None:
patterns = RADICLE_REF_PATTERNS
refs = {}
for pattern in patterns:
result = self._run_git("for-each-ref", "--format=%(refname) %(objectname)", pattern, check=False)
if result.returncode == 0:
for line in result.stdout.strip().split("\n"):
if line:
parts = line.split()
if len(parts) == 2:
refs[parts[0]] = parts[1]
return refs
def get_radicle_repo_id(self) -> Optional[str]:
"""Get Radicle repository ID if this is a Radicle repo."""
# Radicle stores repo ID in .git/rad or config
rad_dir = self.repo_path / ".git" / "rad"
if rad_dir.exists():
# Try to read from rad config
try:
result = self._run_git("config", "--get", "rad.id", check=False)
if result.returncode == 0:
return result.stdout.strip()
except Exception:
pass
return None
def create_full_bundle(
self,
repository_id: str,
source_node: str,
output_path: Optional[Path] = None,
ref_patterns: Optional[List[str]] = None,
) -> GitBundle:
"""Create a full bundle containing all refs.
Args:
repository_id: Radicle repo ID (rad:z...)
source_node: DID of the source node
output_path: Optional path to write bundle file
ref_patterns: Ref patterns to include (default: Radicle patterns)
"""
refs = self.get_refs(ref_patterns)
if not refs:
raise ValueError("No refs to bundle")
# Create bundle with all refs
with tempfile.NamedTemporaryFile(suffix=".bundle", delete=False) as f:
bundle_path = f.name
try:
# Build ref list for bundle create
ref_args = list(refs.keys())
self._run_git("bundle", "create", bundle_path, *ref_args)
bundle_data = Path(bundle_path).read_bytes()
finally:
os.unlink(bundle_path)
metadata = BundleMetadata(
bundle_type=BundleType.FULL,
repository_id=repository_id,
source_node=source_node,
timestamp=int(time.time() * 1000),
refs_included=list(refs.keys()),
prerequisites=[],
size_bytes=len(bundle_data),
checksum=hashlib.sha256(bundle_data).digest(),
)
bundle = GitBundle(metadata=metadata, data=bundle_data)
if output_path:
bundle.save(output_path)
return bundle
def create_incremental_bundle(
self,
repository_id: str,
source_node: str,
basis_refs: Dict[str, str],
output_path: Optional[Path] = None,
ref_patterns: Optional[List[str]] = None,
) -> Optional[GitBundle]:
"""Create an incremental bundle with only new commits.
Args:
repository_id: Radicle repo ID
source_node: DID of the source node
basis_refs: Known refs at destination {ref_name: commit_sha}
output_path: Optional path to write bundle file
ref_patterns: Ref patterns to include
Returns:
GitBundle if there are changes, None if no changes
"""
current_refs = self.get_refs(ref_patterns)
if not current_refs:
return None
# Find refs that have changed or are new
changed_refs = {}
for ref, sha in current_refs.items():
if ref not in basis_refs or basis_refs[ref] != sha:
changed_refs[ref] = sha
if not changed_refs:
return None # No changes
# Build exclusion list (commits the destination already has)
exclusions = [f"^{sha}" for sha in basis_refs.values() if sha]
with tempfile.NamedTemporaryFile(suffix=".bundle", delete=False) as f:
bundle_path = f.name
try:
# Create bundle with changed refs, excluding known commits
bundle_args = list(changed_refs.keys()) + exclusions
result = self._run_git("bundle", "create", bundle_path, *bundle_args, check=False)
if result.returncode != 0:
# May fail if no new commits (all excluded)
if "empty bundle" in result.stderr.lower():
return None
raise subprocess.CalledProcessError(result.returncode, "git bundle create", result.stderr)
bundle_data = Path(bundle_path).read_bytes()
finally:
if os.path.exists(bundle_path):
os.unlink(bundle_path)
metadata = BundleMetadata(
bundle_type=BundleType.INCREMENTAL,
repository_id=repository_id,
source_node=source_node,
timestamp=int(time.time() * 1000),
refs_included=list(changed_refs.keys()),
prerequisites=list(basis_refs.values()),
size_bytes=len(bundle_data),
checksum=hashlib.sha256(bundle_data).digest(),
)
bundle = GitBundle(metadata=metadata, data=bundle_data)
if output_path:
bundle.save(output_path)
return bundle
class GitBundleApplicator:
"""Applies Git bundles to repositories."""
def __init__(self, repo_path: Path):
"""Initialize with path to Git repository."""
self.repo_path = Path(repo_path)
def _run_git(self, *args: str, check: bool = True) -> subprocess.CompletedProcess:
"""Run a git command in the repository."""
return subprocess.run(
["git", *args],
cwd=self.repo_path,
capture_output=True,
text=True,
check=check,
)
def verify_bundle(self, bundle: GitBundle) -> Tuple[bool, str]:
"""Verify a bundle can be applied.
Returns (success, message).
"""
with tempfile.NamedTemporaryFile(suffix=".bundle", delete=False) as f:
f.write(bundle.data)
bundle_path = f.name
try:
result = self._run_git("bundle", "verify", bundle_path, check=False)
if result.returncode == 0:
return True, "Bundle verified successfully"
else:
return False, result.stderr.strip()
finally:
os.unlink(bundle_path)
def apply_bundle(self, bundle: GitBundle, fetch_all: bool = True) -> Dict[str, str]:
"""Apply a bundle to the repository.
Args:
bundle: The GitBundle to apply
fetch_all: If True, fetch all refs from bundle
Returns:
Dict of applied refs {ref_name: commit_sha}
"""
with tempfile.NamedTemporaryFile(suffix=".bundle", delete=False) as f:
f.write(bundle.data)
bundle_path = f.name
try:
# Verify first
ok, msg = self.verify_bundle(bundle)
if not ok:
raise ValueError(f"Bundle verification failed: {msg}")
# List refs in bundle
result = self._run_git("bundle", "list-heads", bundle_path)
bundle_refs = {}
for line in result.stdout.strip().split("\n"):
if line:
parts = line.split()
if len(parts) >= 2:
bundle_refs[parts[1]] = parts[0]
# Fetch from bundle
if fetch_all:
# Fetch all refs, preserving their names
for ref in bundle_refs:
self._run_git("fetch", bundle_path, f"{ref}:{ref}", check=False)
else:
self._run_git("fetch", bundle_path)
return bundle_refs
finally:
os.unlink(bundle_path)
def get_current_refs(self, patterns: Optional[List[str]] = None) -> Dict[str, str]:
"""Get current refs for computing incremental basis."""
if patterns is None:
patterns = RADICLE_REF_PATTERNS
refs = {}
for pattern in patterns:
result = self._run_git("for-each-ref", "--format=%(refname) %(objectname)", pattern, check=False)
if result.returncode == 0:
for line in result.stdout.strip().split("\n"):
if line:
parts = line.split()
if len(parts) == 2:
refs[parts[0]] = parts[1]
return refs
def estimate_bundle_size(repo_path: Path, ref_patterns: Optional[List[str]] = None) -> int:
"""Estimate the size of a full bundle without creating it."""
result = subprocess.run(
["git", "count-objects", "-v"],
cwd=repo_path,
capture_output=True,
text=True,
)
# Parse size-pack from output
for line in result.stdout.split("\n"):
if line.startswith("size-pack:"):
# size-pack is in KB
return int(line.split(":")[1].strip()) * 1024
return 0

View File

@ -0,0 +1,374 @@
"""Gossip relay for Radicle over Reticulum.
Watches local Radicle storage for ref changes and sends tiny notification
packets to peer relays over RNS (including LoRa). On receipt, calls
'rad sync --fetch' so radicle-node pulls the actual git data through the
TCP bridge.
Flow:
refs change in ~/.radicle/storage/<rid>/
detect via poll
RNS packet "refs changed for rid X, nid Y" (~200-500 bytes)
peer relay receives it
peer calls: rad sync --fetch --rid X
radicle-node fetches via TCP bridge
"""
import json
import struct
import subprocess
import threading
import time
from pathlib import Path
from typing import Callable, Dict, List, Optional, Tuple
import RNS
from radicle_reticulum.identity import RadicleIdentity
APP_NAME = "radicle"
ASPECT_GOSSIP = "gossip"
GOSSIP_MAGIC = b"RADICLE_GOSSIP_V1"
DEFAULT_POLL_INTERVAL = 30 # seconds
PATH_REQUEST_TIMEOUT = 15 # seconds to wait for a path before giving up
def _radicle_storage_path() -> Path:
"""Return ~/.radicle/storage, using 'rad path' if available."""
try:
result = subprocess.run(
["rad", "path"],
capture_output=True, text=True, timeout=5,
)
if result.returncode == 0:
return Path(result.stdout.strip()) / "storage"
except Exception:
pass
return Path.home() / ".radicle" / "storage"
def _read_refs(storage: Path, rid: str) -> Dict[str, str]:
"""Read current git refs from radicle storage for a repo.
Returns {ref_name: sha} or empty dict if the repo isn't in storage yet.
"""
rid_hash = rid.removeprefix("rad:")
repo_path = storage / rid_hash
if not repo_path.exists():
return {}
try:
result = subprocess.run(
["git", "show-ref"],
cwd=repo_path,
capture_output=True, text=True, timeout=10,
)
refs: Dict[str, str] = {}
for line in result.stdout.splitlines():
parts = line.split(maxsplit=1)
if len(parts) == 2:
refs[parts[1]] = parts[0]
return refs
except Exception:
return {}
class GossipRelay:
"""Watches Radicle refs and notifies peers over RNS when they change.
Designed to run alongside the TCP bridge. The bridge carries the actual
git pack data; this relay only sends tiny "go fetch" signals.
"""
def __init__(
self,
identity: RadicleIdentity,
rids: List[str],
storage: Optional[Path] = None,
radicle_nid: Optional[str] = None,
bridge_port: int = 8777,
poll_interval: int = DEFAULT_POLL_INTERVAL,
announce_retry_delays: Tuple[int, ...] = (5, 15, 30),
config_path: Optional[str] = None,
):
"""
Args:
identity: RNS/Radicle identity for this relay.
rids: List of Radicle repository IDs to watch (e.g. 'rad:z3abc...').
storage: Path to radicle storage dir. Auto-detected if None.
radicle_nid: Local radicle NID to advertise to peers.
bridge_port: TCP port the bridge listens on (for rad node connect).
poll_interval: Seconds between ref polls.
announce_retry_delays: Startup re-announce delays (seconds).
config_path: Reticulum config path (None = default).
"""
self.identity = identity
self.rids = list(rids)
self.storage = storage or _radicle_storage_path()
self.radicle_nid = radicle_nid
self.bridge_port = bridge_port
self.poll_interval = poll_interval
self.announce_retry_delays = announce_retry_delays
self.reticulum = RNS.Reticulum(config_path)
self.destination = RNS.Destination(
identity.rns_identity,
RNS.Destination.IN,
RNS.Destination.SINGLE,
APP_NAME,
ASPECT_GOSSIP,
)
self.destination.set_packet_callback(self._on_packet)
self._known_peers: Dict[bytes, float] = {} # dest_hash -> last_seen
self._peers_lock = threading.Lock()
self._known_refs: Dict[str, Dict[str, str]] = {} # rid -> refs
self._refs_lock = threading.Lock()
self._running = False
self._on_sync_triggered: Optional[Callable[[str, str], None]] = None
# ── Lifecycle ────────────────────────────────────────────────────────────
def start(self):
"""Start the relay: announce, begin polling, register announce handler."""
self._running = True
RNS.Transport.register_announce_handler(self._on_announce)
self.announce()
threading.Thread(target=self._startup_announce_loop, daemon=True).start()
threading.Thread(target=self._poll_loop, daemon=True).start()
RNS.log(f"Gossip relay started: {self.destination.hexhash}", RNS.LOG_INFO)
RNS.log(
f" Watching {len(self.rids)} repo(s), polling every {self.poll_interval}s",
RNS.LOG_INFO,
)
def stop(self):
"""Stop the relay."""
self._running = False
RNS.log("Gossip relay stopped", RNS.LOG_INFO)
# ── Public API ───────────────────────────────────────────────────────────
def announce(self):
"""Announce this relay on the RNS network."""
app_data = GOSSIP_MAGIC
if self.radicle_nid:
nid_bytes = self.radicle_nid.encode()
app_data += struct.pack("!H", len(nid_bytes)) + nid_bytes
self.destination.announce(app_data=app_data)
RNS.log(f"Gossip relay announced: {self.destination.hexhash}", RNS.LOG_DEBUG)
def set_on_sync_triggered(self, callback: Callable[[str, str], None]):
"""Set callback invoked when an incoming gossip message triggers a sync.
Callback signature: callback(rid: str, nid: str)
"""
self._on_sync_triggered = callback
def push_refs_now(self, rid: str):
"""Immediately broadcast current refs for a repo to all known peers."""
refs = _read_refs(self.storage, rid)
if refs:
with self._refs_lock:
self._known_refs[rid] = refs
self._broadcast(rid, refs)
def get_stats(self) -> dict:
with self._peers_lock:
peer_count = len(self._known_peers)
with self._refs_lock:
refs_per_repo = {rid: len(r) for rid, r in self._known_refs.items()}
return {
"known_peers": peer_count,
"watched_repos": len(self.rids),
"refs_per_repo": refs_per_repo,
}
# ── Internal: polling ────────────────────────────────────────────────────
def _startup_announce_loop(self):
for delay in self.announce_retry_delays:
time.sleep(delay)
if not self._running:
return
self.announce()
def _poll_loop(self):
while self._running:
for rid in self.rids:
try:
refs = _read_refs(self.storage, rid)
with self._refs_lock:
old = self._known_refs.get(rid)
changed = bool(refs and refs != old)
first_poll = old is None
if changed:
self._known_refs[rid] = refs
if changed and not first_poll:
self._broadcast(rid, refs)
except Exception as e:
RNS.log(f"Gossip poll error ({rid[:20]}): {e}", RNS.LOG_WARNING)
# Interruptible sleep
for _ in range(self.poll_interval):
if not self._running:
return
time.sleep(1)
# ── Internal: sending ────────────────────────────────────────────────────
def _broadcast(self, rid: str, refs: Dict[str, str]):
payload = json.dumps({
"type": "refs",
"rid": rid,
"nid": self.radicle_nid or "",
"refs": refs,
}).encode()
with self._peers_lock:
peers = list(self._known_peers.keys())
sent = sum(1 for h in peers if self._send_packet(h, payload))
RNS.log(f"Broadcast refs for {rid[:20]}... → {sent}/{len(peers)} peers", RNS.LOG_INFO)
def _send_packet(self, peer_hash: bytes, payload: bytes) -> bool:
try:
if not RNS.Transport.has_path(peer_hash):
RNS.Transport.request_path(peer_hash)
deadline = time.time() + PATH_REQUEST_TIMEOUT
while not RNS.Transport.has_path(peer_hash):
if time.time() > deadline:
RNS.log(
f"No path to gossip peer {peer_hash.hex()[:16]}",
RNS.LOG_WARNING,
)
return False
time.sleep(0.2)
peer_identity = RNS.Identity.recall(peer_hash)
if peer_identity is None:
RNS.log(
f"Identity not known for {peer_hash.hex()[:16]}",
RNS.LOG_WARNING,
)
return False
dest = RNS.Destination(
peer_identity,
RNS.Destination.OUT,
RNS.Destination.SINGLE,
APP_NAME,
ASPECT_GOSSIP,
)
RNS.Packet(dest, payload).send()
return True
except Exception as e:
RNS.log(f"Gossip send error: {e}", RNS.LOG_WARNING)
return False
# ── Internal: receiving ──────────────────────────────────────────────────
def _on_packet(self, data: bytes, packet: RNS.Packet):
try:
msg = json.loads(data.decode())
except Exception:
return
if msg.get("type") != "refs":
return
rid: str = msg.get("rid", "")
nid: str = msg.get("nid", "")
remote_refs: Dict[str, str] = msg.get("refs", {})
if not rid or not remote_refs:
return
with self._refs_lock:
local_refs = self._known_refs.get(rid, {})
changed = any(remote_refs.get(r) != local_refs.get(r) for r in remote_refs)
if changed:
RNS.log(
f"Gossip: new refs for {rid[:20]}... from {nid[:24] if nid else 'unknown'}",
RNS.LOG_INFO,
)
threading.Thread(
target=self._trigger_sync,
args=(rid, nid),
daemon=True,
).start()
def _trigger_sync(self, rid: str, nid: str):
"""Run rad node connect (if needed) then rad sync --fetch."""
if nid:
subprocess.run(
["rad", "node", "connect", f"{nid}@127.0.0.1:{self.bridge_port}"],
capture_output=True, timeout=15,
)
result = subprocess.run(
["rad", "sync", "--fetch", "--rid", rid],
capture_output=True, text=True, timeout=120,
)
if result.returncode == 0:
RNS.log(f"Sync succeeded: {rid[:20]}", RNS.LOG_INFO)
else:
stderr = result.stderr.strip()
RNS.log(
f"Sync failed for {rid[:20]}: {stderr[:120] if stderr else '(no output)'}",
RNS.LOG_WARNING,
)
if self._on_sync_triggered:
self._on_sync_triggered(rid, nid)
# ── Internal: peer discovery ─────────────────────────────────────────────
def _on_announce(
self,
destination_hash: bytes,
announced_identity: RNS.Identity,
app_data: Optional[bytes],
):
if destination_hash == self.destination.hash:
return
if not app_data or not app_data.startswith(GOSSIP_MAGIC):
return
radicle_nid: Optional[str] = None
if len(app_data) > len(GOSSIP_MAGIC):
try:
offset = len(GOSSIP_MAGIC)
nid_len = struct.unpack("!H", app_data[offset:offset + 2])[0]
raw = app_data[offset + 2: offset + 2 + nid_len]
radicle_nid = raw.decode() or None
except Exception:
pass
with self._peers_lock:
is_new = destination_hash not in self._known_peers
self._known_peers[destination_hash] = time.time()
if is_new:
RNS.log(
f"Discovered gossip peer: {destination_hash.hex()[:16]}"
+ (f" (NID: {radicle_nid[:32]})" if radicle_nid else ""),
RNS.LOG_INFO,
)
# Send our current refs so the peer knows our state immediately
for rid in self.rids:
with self._refs_lock:
refs = self._known_refs.get(rid)
if refs:
payload = json.dumps({
"type": "refs",
"rid": rid,
"nid": self.radicle_nid or "",
"refs": refs,
}).encode()
self._send_packet(destination_hash, payload)

View File

@ -1,191 +0,0 @@
"""QR code encoding/decoding for Radicle bundles.
Enables visual transfer of tiny bundles (2953 bytes) without any network
connection useful for air-gapped machines, paper backups, or "dead drop"
workflows where two people briefly see each other's screens.
Requires the optional 'qrcode' package:
pip install radicle-reticulum[qr] # or: pip install qrcode
"""
import base64
import hashlib
from pathlib import Path
from typing import Optional
from radicle_reticulum.git_bundle import GitBundle
# QR code capacity: version 40, binary mode, error correction L
QR_MAX_BYTES = 2953
# Magic header to identify Radicle QR payloads (8 bytes)
QR_MAGIC = b"RADQR\x01\x00\x00"
class BundleTooLargeForQR(ValueError):
"""Raised when a bundle exceeds QR capacity."""
pass
def encode_bundle_to_qr(
bundle: GitBundle,
output_path: Optional[Path] = None,
error_correction: str = "L",
) -> str:
"""Encode a GitBundle as a QR code.
The bundle data is base64-encoded and wrapped with a magic prefix + SHA-256
checksum so the receiver can verify integrity after scanning.
Args:
bundle: The GitBundle to encode. Must be QR_MAX_BYTES when serialised.
output_path: If given, write a PNG image to this path (requires qrcode[pil]).
If None, returns terminal-printable ASCII art.
error_correction: QR error correction level: L, M, Q, or H (default L
gives maximum data capacity).
Returns:
ASCII art string for terminal display (always), and also writes PNG if
output_path is specified.
Raises:
BundleTooLargeForQR: If the serialised bundle exceeds 2953 bytes.
ImportError: If the 'qrcode' package is not installed.
"""
try:
import qrcode
from qrcode.constants import ERROR_CORRECT_L, ERROR_CORRECT_M, ERROR_CORRECT_Q, ERROR_CORRECT_H
except ImportError as e:
raise ImportError(
"The 'qrcode' package is required for QR encoding. "
"Install it with: pip install 'radicle-reticulum[qr]' or pip install qrcode"
) from e
ec_map = {
"L": ERROR_CORRECT_L,
"M": ERROR_CORRECT_M,
"Q": ERROR_CORRECT_Q,
"H": ERROR_CORRECT_H,
}
ec = ec_map.get(error_correction.upper(), ERROR_CORRECT_L)
bundle_bytes = bundle.encode()
if len(bundle_bytes) > QR_MAX_BYTES:
raise BundleTooLargeForQR(
f"Bundle is {len(bundle_bytes)} bytes, exceeds QR capacity of {QR_MAX_BYTES} bytes. "
"Use a different sync strategy for larger bundles."
)
# Payload: magic + 4-byte length + checksum (32 bytes) + bundle data
checksum = hashlib.sha256(bundle_bytes).digest()
length_prefix = len(bundle_bytes).to_bytes(4, "big")
payload = QR_MAGIC + length_prefix + checksum + bundle_bytes
qr = qrcode.QRCode(
version=None, # auto-select
error_correction=ec,
box_size=10,
border=4,
)
qr.add_data(payload)
qr.make(fit=True)
if output_path is not None:
try:
from PIL import Image
img = qr.make_image(fill_color="black", back_color="white")
img.save(str(output_path))
except ImportError:
raise ImportError(
"Saving QR images requires Pillow. "
"Install it with: pip install 'qrcode[pil]'"
)
# Always return ASCII art for terminal
import io
buf = io.StringIO()
qr.print_ascii(out=buf, invert=True)
return buf.getvalue()
def decode_bundle_from_qr_data(data: bytes) -> GitBundle:
"""Decode a GitBundle from raw QR payload bytes (as scanned from a QR code).
This is the inverse of encode_bundle_to_qr. The data should be the raw
bytes scanned from the QR code.
Args:
data: Raw bytes decoded from a QR code scan.
Returns:
The decoded GitBundle.
Raises:
ValueError: If the data is not a valid Radicle QR payload.
"""
if not data.startswith(QR_MAGIC):
raise ValueError(
f"Not a Radicle QR payload (expected magic {QR_MAGIC!r}, "
f"got {data[:len(QR_MAGIC)]!r})"
)
offset = len(QR_MAGIC)
bundle_len = int.from_bytes(data[offset:offset + 4], "big")
offset += 4
stored_checksum = data[offset:offset + 32]
offset += 32
bundle_bytes = data[offset:offset + bundle_len]
if len(bundle_bytes) != bundle_len:
raise ValueError(
f"Truncated QR payload: expected {bundle_len} bytes, got {len(bundle_bytes)}"
)
actual_checksum = hashlib.sha256(bundle_bytes).digest()
if actual_checksum != stored_checksum:
raise ValueError(
f"QR payload checksum mismatch: data may be corrupted"
)
return GitBundle.decode(bundle_bytes)
def decode_bundle_from_qr_image(image_path: Path) -> GitBundle:
"""Decode a GitBundle from a QR code image file.
Requires pyzbar and Pillow:
pip install pyzbar Pillow
Args:
image_path: Path to the QR code image (PNG, JPG, etc.)
Returns:
The decoded GitBundle.
Raises:
ImportError: If pyzbar or Pillow are not installed.
ValueError: If no valid Radicle QR code is found in the image.
"""
try:
from pyzbar.pyzbar import decode as pyzbar_decode
from PIL import Image
except ImportError as e:
raise ImportError(
"Decoding QR images requires pyzbar and Pillow. "
"Install them with: pip install pyzbar Pillow"
) from e
image = Image.open(str(image_path))
decoded_objects = pyzbar_decode(image)
for obj in decoded_objects:
try:
return decode_bundle_from_qr_data(obj.data)
except ValueError:
continue
raise ValueError(
f"No valid Radicle QR code found in {image_path}. "
"Make sure the image contains a QR code created by 'radicle-rns bundle qr-encode'."
)

View File

@ -0,0 +1,174 @@
"""Radicle seed node manager.
Starts and manages a dedicated radicle-node configured as a seed server
(storage.policy = allow). The seed runs under its own RAD_HOME so it
doesn't interfere with the user's own radicle identity.
Users add the seed as a peer once:
rad node connect <seed-NID>@127.0.0.1:<seed-port>
The bridge then connects this seed to remote seeds over Reticulum.
"""
import json
import os
import socket
import subprocess
import time
from pathlib import Path
from typing import Optional
DEFAULT_SEED_HOME = Path.home() / ".radicle-rns" / "seed"
DEFAULT_SEED_PORT = 8779
SEED_CONFIG = {
"node": {
"alias": "radicle-rns-seed",
"listen": [], # filled in by write_config()
"connect": [],
"externalAddresses": [],
"seedingPolicy": {
"default": "allow",
"repos": {}
},
"db": {"journalMode": "wal"},
}
}
class SeedNode:
"""Manages a dedicated radicle-node process configured as a seed."""
def __init__(
self,
seed_home: Path = DEFAULT_SEED_HOME,
port: int = DEFAULT_SEED_PORT,
):
self.seed_home = Path(seed_home)
self.port = port
self._process: Optional[subprocess.Popen] = None
# ── Setup ────────────────────────────────────────────────────────────────
def is_initialized(self) -> bool:
"""Return True if rad auth has been run for this seed home."""
try:
result = subprocess.run(
["rad", "self"],
env=self._env(),
capture_output=True,
text=True,
timeout=5,
)
return result.returncode == 0 and "NID" in result.stdout
except Exception:
return False
def write_config(self):
"""Write config.json only if it does not already exist."""
self.seed_home.mkdir(parents=True, exist_ok=True)
config_path = self.seed_home / "config.json"
if config_path.exists():
return
config = json.loads(json.dumps(SEED_CONFIG)) # deep copy
config["node"]["listen"] = [f"127.0.0.1:{self.port}"]
config_path.write_text(json.dumps(config, indent=2))
# ── Lifecycle ─────────────────────────────────────────────────────────────
def start(self):
"""Start radicle-node for the seed. Raises if not initialized."""
if not self.is_initialized():
raise RuntimeError(
f"Seed identity not found. Initialize it with:\n"
f" RAD_HOME={self.seed_home} rad auth\n"
f"Then run 'radicle-rns seed' again."
)
self.write_config()
# Discard stdout/stderr — if kept as PIPE the buffer fills and the
# process deadlocks once radicle-node produces more than ~64 KB of logs.
self._process = subprocess.Popen(
["radicle-node"],
env=self._env(),
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
# Wait up to 10s for the port to open.
deadline = time.time() + 10
while time.time() < deadline:
if self._process.poll() is not None:
raise RuntimeError(
f"Seed radicle-node exited immediately after launch. "
f"Check: RAD_HOME={self.seed_home} radicle-node"
)
if self._port_open():
return
time.sleep(0.3)
if self._process.poll() is not None:
raise RuntimeError("Seed radicle-node exited before port opened.")
# Still running but port not open — may be slow on first run; continue.
print(f"Warning: seed port {self.port} not yet open after 10s, continuing anyway.")
def stop(self):
"""Stop the seed radicle-node."""
if self._process and self._process.poll() is None:
self._process.terminate()
try:
self._process.wait(timeout=5)
except subprocess.TimeoutExpired:
self._process.kill()
self._process.wait() # reap zombie
def is_running(self) -> bool:
return self._process is not None and self._process.poll() is None
# ── Queries ───────────────────────────────────────────────────────────────
def get_nid(self) -> Optional[str]:
"""Return the seed's radicle NID (z6Mk...)."""
result = subprocess.run(
["rad", "self"],
env=self._env(),
capture_output=True,
text=True,
timeout=5,
)
if result.returncode != 0:
return None
for line in result.stdout.splitlines():
parts = line.split()
if len(parts) >= 2 and parts[0] == "NID":
return parts[1]
return None
def connect_peer(self, nid: str, addr: str) -> bool:
"""Tell the seed to connect to a remote peer."""
result = subprocess.run(
["rad", "node", "connect", f"{nid}@{addr}"],
env=self._env(),
capture_output=True,
text=True,
timeout=30,
)
return result.returncode == 0
# ── Internal ──────────────────────────────────────────────────────────────
def _env(self) -> dict:
env = os.environ.copy()
env["RAD_HOME"] = str(self.seed_home)
return env
def _port_open(self) -> bool:
try:
with socket.create_connection(("127.0.0.1", self.port), timeout=0.5):
return True
except OSError:
return False

View File

@ -1,770 +0,0 @@
"""Sync manager for Radicle repositories over Reticulum.
Handles repository synchronization using:
- Full bundles for initial clone or fast networks
- Incremental bundles for updates over low-bandwidth links (LoRa)
- LXMF store-and-forward for offline peers
"""
import os
import threading
import time
import hashlib
from dataclasses import dataclass, field
from enum import Enum
from pathlib import Path
from typing import Callable, Dict, List, Optional, Set, Tuple
import RNS
import LXMF
from radicle_reticulum.identity import RadicleIdentity
from radicle_reticulum.git_bundle import (
GitBundle,
GitBundleGenerator,
GitBundleApplicator,
BundleMetadata,
BundleType,
RADICLE_REF_PATTERNS,
estimate_bundle_size,
)
# LXMF content type for Radicle bundles
CONTENT_TYPE_BUNDLE = 0x52 # 'R' for Radicle — complete bundle
CONTENT_TYPE_BUNDLE_CHUNK = 0x53 # chunked fragment of a large bundle
CONTENT_TYPE_BUNDLE_REQUEST = 0x55
CONTENT_TYPE_REFS_ANNOUNCE = 0x54
# Chunk header layout: bundle_id(16) + chunk_num(2) + total_chunks(2) = 20 bytes
CHUNK_HEADER_SIZE = 20
# Maximum LXMF message size (configurable, default ~500KB for LoRa-friendly chunks)
DEFAULT_MAX_LXMF_SIZE = 500 * 1024
# For LoRa, much smaller chunks
LORA_MAX_LXMF_SIZE = 32 * 1024
class SyncMode(Enum):
"""Sync mode based on transport capability."""
FULL = "full" # Fast network - send full bundles
INCREMENTAL = "incremental" # Slow network - send only changes
AUTO = "auto" # Detect based on link quality
@dataclass
class RepoSyncState:
"""Tracks sync state for a repository."""
repository_id: str
local_path: Path
known_peers: Dict[str, Dict[str, str]] = field(default_factory=dict) # peer_did -> {ref: sha}
last_sync: Dict[str, float] = field(default_factory=dict) # peer_did -> timestamp
pending_bundles: List[bytes] = field(default_factory=list)
@dataclass
class RefsAnnouncement:
"""Announces current ref state for a repository."""
repository_id: str
node_id: str
refs: Dict[str, str] # {ref_name: commit_sha}
timestamp: int
def encode(self) -> bytes:
"""Encode to bytes for LXMF transport."""
import struct
repo_bytes = self.repository_id.encode("utf-8")
node_bytes = self.node_id.encode("utf-8")
refs_data = b""
for ref, sha in self.refs.items():
ref_bytes = ref.encode("utf-8")
sha_bytes = sha.encode("utf-8")
refs_data += struct.pack(f"!H{len(ref_bytes)}sH{len(sha_bytes)}s",
len(ref_bytes), ref_bytes,
len(sha_bytes), sha_bytes)
return struct.pack(
f"!H{len(repo_bytes)}sH{len(node_bytes)}sQH",
len(repo_bytes), repo_bytes,
len(node_bytes), node_bytes,
self.timestamp,
len(self.refs),
) + refs_data
@classmethod
def decode(cls, data: bytes) -> "RefsAnnouncement":
"""Decode from bytes."""
import struct
offset = 0
repo_len = struct.unpack("!H", data[offset:offset+2])[0]
offset += 2
repository_id = data[offset:offset+repo_len].decode("utf-8")
offset += repo_len
node_len = struct.unpack("!H", data[offset:offset+2])[0]
offset += 2
node_id = data[offset:offset+node_len].decode("utf-8")
offset += node_len
timestamp, refs_count = struct.unpack("!QH", data[offset:offset+10])
offset += 10
refs = {}
for _ in range(refs_count):
ref_len = struct.unpack("!H", data[offset:offset+2])[0]
offset += 2
ref = data[offset:offset+ref_len].decode("utf-8")
offset += ref_len
sha_len = struct.unpack("!H", data[offset:offset+2])[0]
offset += 2
sha = data[offset:offset+sha_len].decode("utf-8")
offset += sha_len
refs[ref] = sha
return cls(
repository_id=repository_id,
node_id=node_id,
refs=refs,
timestamp=timestamp,
)
class SyncManager:
"""Manages repository synchronization over Reticulum.
Features:
- Full bundle sync for initial clone / fast networks
- Incremental sync for LoRa / low-bandwidth
- LXMF store-and-forward for offline peers
- Automatic chunking for large bundles
"""
def __init__(
self,
identity: RadicleIdentity,
storage_path: Optional[Path] = None,
max_bundle_size: int = DEFAULT_MAX_LXMF_SIZE,
auto_push: bool = False,
):
"""Initialize sync manager.
Args:
identity: Local node identity
storage_path: Path for storing sync state and pending bundles
max_bundle_size: Maximum bundle size before chunking
auto_push: If True, speculatively push incremental bundles to peers
when we receive their RefsAnnouncement and we have newer data.
Eliminates the request round-trip for common sync patterns.
"""
self.identity = identity
self.max_bundle_size = max_bundle_size
self.auto_push = auto_push
# Storage for sync state
if storage_path is None:
storage_path = Path.home() / ".radicle-rns"
self.storage_path = Path(storage_path)
self.storage_path.mkdir(parents=True, exist_ok=True)
# Repository tracking
self._repos: Dict[str, RepoSyncState] = {}
self._repos_lock = threading.Lock()
# LXMF setup
self._lxmf_router: Optional[LXMF.LXMRouter] = None
self._lxmf_destination: Optional[LXMF.LXMFDeliveryDestination] = None
# Known peers: lxmf_source_hash (bytes) -> last_seen (float)
self._known_peers: Dict[bytes, float] = {}
self._peers_lock = threading.Lock()
# Chunk reassembly buffers: bundle_id (16 bytes) -> {chunk_num: data}
self._chunk_buffers: Dict[bytes, Dict[int, bytes]] = {}
self._chunk_totals: Dict[bytes, int] = {} # bundle_id -> total_chunks
self._chunk_lock = threading.Lock()
# Callbacks
self._on_bundle_received: Optional[Callable[[GitBundle], None]] = None
self._on_refs_announced: Optional[Callable[[RefsAnnouncement], None]] = None
def start(self, reticulum: Optional[RNS.Reticulum] = None):
"""Start the sync manager and LXMF router."""
if reticulum is None:
reticulum = RNS.Reticulum()
# Create LXMF router for store-and-forward
self._lxmf_router = LXMF.LXMRouter(
identity=self.identity.rns_identity,
storagepath=str(self.storage_path / "lxmf"),
)
# Create our LXMF destination
self._lxmf_destination = self._lxmf_router.register_delivery_identity(
self.identity.rns_identity,
display_name=f"Radicle Node {self.identity.rns_hash_hex[:8]}",
)
self._lxmf_destination.set_delivery_callback(self._on_lxmf_delivery)
RNS.log(f"Sync manager started", RNS.LOG_INFO)
RNS.log(f" LXMF address: {self._lxmf_destination.hash.hex()}", RNS.LOG_INFO)
def stop(self):
"""Stop the sync manager."""
if self._lxmf_router:
# LXMF router cleanup
pass
RNS.log("Sync manager stopped", RNS.LOG_INFO)
def register_peer(self, destination_hash: bytes):
"""Register a known peer by their LXMF source hash.
Peers are also learned automatically from incoming messages.
"""
with self._peers_lock:
self._known_peers[destination_hash] = time.time()
RNS.log(f"Registered peer: {destination_hash.hex()}", RNS.LOG_DEBUG)
def get_known_peers(self) -> List[bytes]:
"""Return list of known peer hashes."""
with self._peers_lock:
return list(self._known_peers.keys())
def _send_lxmf_message(
self,
destination_hash: bytes,
content_type: int,
content: bytes,
propagate: bool = True,
) -> bool:
"""Send an LXMF message to a destination hash.
Args:
destination_hash: RNS identity hash of the recipient.
content_type: Radicle content type constant.
content: Message payload bytes.
propagate: Use LXMF propagation (store-and-forward) if True.
Returns:
True if the message was queued successfully.
"""
if self._lxmf_router is None or self._lxmf_destination is None:
RNS.log("Sync manager not started, cannot send LXMF message", RNS.LOG_WARNING)
return False
try:
destination_identity = RNS.Identity.recall(destination_hash)
if destination_identity is None:
RNS.log(f"Unknown peer identity: {destination_hash.hex()}", RNS.LOG_WARNING)
return False
lxmf_dest = RNS.Destination(
destination_identity,
RNS.Destination.OUT,
RNS.Destination.SINGLE,
"lxmf",
"delivery",
)
message = LXMF.LXMessage(
lxmf_dest,
self._lxmf_destination,
content,
desired_method=(
LXMF.LXMessage.PROPAGATED if propagate else LXMF.LXMessage.DIRECT
),
)
message.fields[LXMF.FIELD_CUSTOM_TYPE] = content_type
self._lxmf_router.handle_outbound(message)
return True
except Exception as e:
RNS.log(f"Failed to send LXMF message: {e}", RNS.LOG_ERROR)
return False
def register_repository(self, repository_id: str, local_path: Path) -> RepoSyncState:
"""Register a repository for syncing.
Args:
repository_id: Radicle repo ID (rad:z...)
local_path: Path to local Git repository
"""
with self._repos_lock:
if repository_id in self._repos:
return self._repos[repository_id]
state = RepoSyncState(
repository_id=repository_id,
local_path=Path(local_path),
)
self._repos[repository_id] = state
RNS.log(f"Registered repository: {repository_id}", RNS.LOG_INFO)
return state
def announce_refs(self, repository_id: str) -> bool:
"""Announce current refs for a repository.
This allows peers to determine if they need updates.
"""
with self._repos_lock:
if repository_id not in self._repos:
return False
state = self._repos[repository_id]
try:
generator = GitBundleGenerator(state.local_path)
refs = generator.get_refs()
announcement = RefsAnnouncement(
repository_id=repository_id,
node_id=self.identity.did,
refs=refs,
timestamp=int(time.time() * 1000),
)
ann_bytes = announcement.encode()
# Broadcast to all known peers via LXMF (store-and-forward)
peers = self.get_known_peers()
if not peers:
RNS.log(
f"Refs announcement: {repository_id} ({len(refs)} refs) — no peers known",
RNS.LOG_DEBUG,
)
return True
sent = sum(
1 for peer_hash in peers
if self._send_lxmf_message(
peer_hash, CONTENT_TYPE_REFS_ANNOUNCE, ann_bytes, propagate=True
)
)
RNS.log(
f"Refs announcement: {repository_id} ({len(refs)} refs) sent to {sent}/{len(peers)} peers",
RNS.LOG_INFO,
)
return True
except Exception as e:
RNS.log(f"Failed to announce refs: {e}", RNS.LOG_ERROR)
return False
def create_sync_bundle(
self,
repository_id: str,
peer_refs: Optional[Dict[str, str]] = None,
mode: SyncMode = SyncMode.AUTO,
) -> Optional[GitBundle]:
"""Create a bundle for syncing to a peer.
Args:
repository_id: Repository to sync
peer_refs: Known refs at peer (for incremental sync)
mode: Sync mode (full, incremental, or auto)
Returns:
GitBundle ready for transport, or None if no changes
"""
with self._repos_lock:
if repository_id not in self._repos:
raise ValueError(f"Repository not registered: {repository_id}")
state = self._repos[repository_id]
generator = GitBundleGenerator(state.local_path)
# Decide sync mode
if mode == SyncMode.AUTO:
if peer_refs:
# Have peer state, can do incremental
mode = SyncMode.INCREMENTAL
else:
mode = SyncMode.FULL
if mode == SyncMode.FULL:
return generator.create_full_bundle(
repository_id=repository_id,
source_node=self.identity.did,
)
else:
if peer_refs is None:
peer_refs = {}
return generator.create_incremental_bundle(
repository_id=repository_id,
source_node=self.identity.did,
basis_refs=peer_refs,
)
def send_bundle(
self,
bundle: GitBundle,
destination_hash: bytes,
propagate: bool = True,
) -> bool:
"""Send a bundle to a peer via LXMF.
Args:
bundle: The bundle to send
destination_hash: RNS destination hash of recipient
propagate: If True, use LXMF propagation for offline delivery
"""
if self._lxmf_router is None:
raise RuntimeError("Sync manager not started")
try:
# Encode bundle for transport
bundle_data = bundle.encode()
# Check if we need to chunk
if len(bundle_data) > self.max_bundle_size:
return self._send_chunked_bundle(bundle_data, destination_hash, propagate)
# Create LXMF message
destination_identity = RNS.Identity.recall(destination_hash)
if destination_identity is None:
RNS.log(f"Unknown destination: {destination_hash.hex()}", RNS.LOG_WARNING)
return False
lxmf_dest = RNS.Destination(
destination_identity,
RNS.Destination.OUT,
RNS.Destination.SINGLE,
"lxmf",
"delivery",
)
message = LXMF.LXMessage(
lxmf_dest,
self._lxmf_destination,
bundle_data,
desired_method=LXMF.LXMessage.PROPAGATED if propagate else LXMF.LXMessage.DIRECT,
)
message.fields[LXMF.FIELD_CUSTOM_TYPE] = CONTENT_TYPE_BUNDLE
self._lxmf_router.handle_outbound(message)
RNS.log(f"Sent bundle ({len(bundle_data)} bytes) to {destination_hash.hex()}", RNS.LOG_INFO)
return True
except Exception as e:
RNS.log(f"Failed to send bundle: {e}", RNS.LOG_ERROR)
return False
def _send_chunked_bundle(
self,
bundle_data: bytes,
destination_hash: bytes,
propagate: bool,
) -> bool:
"""Send a large bundle as sequential LXMF chunks.
Each chunk has a 20-byte header: bundle_id(16) + chunk_num(2) + total(2).
The receiver reassembles chunks ordered by chunk_num.
"""
import struct
chunk_size = self.max_bundle_size - CHUNK_HEADER_SIZE
total_chunks = (len(bundle_data) + chunk_size - 1) // chunk_size
bundle_id = hashlib.sha256(bundle_data).digest()[:16]
RNS.log(f"Sending bundle in {total_chunks} chunks ({len(bundle_data)} bytes)", RNS.LOG_INFO)
for i in range(total_chunks):
start = i * chunk_size
end = min(start + chunk_size, len(bundle_data))
chunk_data = bundle_data[start:end]
chunk_msg = struct.pack("!16sHH", bundle_id, i, total_chunks) + chunk_data
if not self._send_lxmf_message(
destination_hash, CONTENT_TYPE_BUNDLE_CHUNK, chunk_msg, propagate
):
RNS.log(f"Failed to send chunk {i+1}/{total_chunks}", RNS.LOG_WARNING)
return False
RNS.log(f" Chunk {i+1}/{total_chunks} ({len(chunk_data)} bytes)", RNS.LOG_DEBUG)
return True
def apply_bundle(self, bundle: GitBundle) -> Dict[str, str]:
"""Apply a received bundle to local repository.
Returns dict of applied refs.
"""
with self._repos_lock:
repo_id = bundle.metadata.repository_id
if repo_id not in self._repos:
raise ValueError(f"Repository not registered: {repo_id}")
state = self._repos[repo_id]
applicator = GitBundleApplicator(state.local_path)
# Verify and apply
ok, msg = applicator.verify_bundle(bundle)
if not ok:
raise ValueError(f"Bundle verification failed: {msg}")
applied_refs = applicator.apply_bundle(bundle)
# Update known peer state
source_node = bundle.metadata.source_node
with self._repos_lock:
state.known_peers[source_node] = applied_refs
state.last_sync[source_node] = time.time()
RNS.log(f"Applied bundle from {source_node}: {len(applied_refs)} refs", RNS.LOG_INFO)
return applied_refs
def _on_lxmf_delivery(self, message: LXMF.LXMessage):
"""Handle incoming LXMF message."""
import struct
# Learn the peer's address for future announcements
if hasattr(message, "source_hash") and message.source_hash:
with self._peers_lock:
self._known_peers[message.source_hash] = time.time()
content_type = message.fields.get(LXMF.FIELD_CUSTOM_TYPE, 0)
if content_type == CONTENT_TYPE_BUNDLE:
self._handle_bundle_message(message.content)
elif content_type == CONTENT_TYPE_BUNDLE_CHUNK:
self._handle_chunk_message(message.content)
elif content_type == CONTENT_TYPE_REFS_ANNOUNCE:
try:
announcement = RefsAnnouncement.decode(message.content)
RNS.log(
f"Received refs announcement for {announcement.repository_id}",
RNS.LOG_DEBUG,
)
if self._on_refs_announced:
self._on_refs_announced(announcement)
if self.auto_push and message.source_hash:
self._maybe_push_to_peer(message.source_hash, announcement)
except Exception as e:
RNS.log(f"Failed to process refs announcement: {e}", RNS.LOG_ERROR)
def _handle_bundle_message(self, content: bytes):
"""Process a complete bundle payload."""
try:
bundle = GitBundle.decode(content)
RNS.log(f"Received bundle for {bundle.metadata.repository_id}", RNS.LOG_INFO)
if self._on_bundle_received:
self._on_bundle_received(bundle)
elif bundle.metadata.repository_id in self._repos:
self.apply_bundle(bundle)
except Exception as e:
RNS.log(f"Failed to process bundle: {e}", RNS.LOG_ERROR)
def _handle_chunk_message(self, content: bytes):
"""Reassemble a chunked bundle fragment."""
import struct
if len(content) < CHUNK_HEADER_SIZE:
RNS.log("Received malformed chunk (too short)", RNS.LOG_WARNING)
return
bundle_id, chunk_num, total_chunks = struct.unpack("!16sHH", content[:CHUNK_HEADER_SIZE])
chunk_data = content[CHUNK_HEADER_SIZE:]
with self._chunk_lock:
if bundle_id not in self._chunk_buffers:
self._chunk_buffers[bundle_id] = {}
self._chunk_totals[bundle_id] = total_chunks
self._chunk_buffers[bundle_id][chunk_num] = chunk_data
RNS.log(
f"Chunk {chunk_num+1}/{total_chunks} received for bundle {bundle_id.hex()[:8]}",
RNS.LOG_DEBUG,
)
if len(self._chunk_buffers[bundle_id]) == total_chunks:
# All chunks received — reassemble
ordered = [self._chunk_buffers[bundle_id][i] for i in range(total_chunks)]
bundle_data = b"".join(ordered)
del self._chunk_buffers[bundle_id]
del self._chunk_totals[bundle_id]
else:
return
RNS.log(f"All {total_chunks} chunks received, reassembling bundle", RNS.LOG_INFO)
self._handle_bundle_message(bundle_data)
# ------------------------------------------------------------------
# Phase 4: speculative push
# ------------------------------------------------------------------
def _should_push_to_peer(
self,
our_refs: Dict[str, str],
peer_refs: Dict[str, str],
) -> bool:
"""Return True if we have commits the peer doesn't.
True when any ref we hold differs from what the peer announced,
or when we have refs entirely absent from their announcement.
"""
for ref, sha in our_refs.items():
if peer_refs.get(ref) != sha:
return True
return False
def _maybe_push_to_peer(
self,
source_hash: bytes,
announcement: RefsAnnouncement,
):
"""Speculatively push an incremental bundle to a peer if we have newer data.
Called when auto_push=True and we receive a RefsAnnouncement. We
compute an incremental bundle relative to the peer's announced refs.
If the repo produces a non-empty bundle we send it immediately
eliminating the request round-trip that would otherwise be needed.
"""
with self._repos_lock:
state = self._repos.get(announcement.repository_id)
if state is None:
return
try:
generator = GitBundleGenerator(state.local_path)
our_refs = generator.get_refs()
if not self._should_push_to_peer(our_refs, announcement.refs):
RNS.log(
f"Speculative push skipped: peer {source_hash.hex()[:8]} "
f"is up-to-date for {announcement.repository_id}",
RNS.LOG_DEBUG,
)
return
bundle = generator.create_incremental_bundle(
repository_id=announcement.repository_id,
source_node=self.identity.did,
basis_refs=announcement.refs,
)
if bundle is None:
return # git reported no changes despite differing refs
bundle_data = bundle.encode()
if len(bundle_data) > self.max_bundle_size:
success = self._send_chunked_bundle(bundle_data, source_hash, propagate=True)
else:
success = self._send_lxmf_message(
source_hash, CONTENT_TYPE_BUNDLE, bundle_data, propagate=True
)
if success:
RNS.log(
f"Speculative push: sent {len(bundle_data)}-byte incremental bundle "
f"to {source_hash.hex()[:8]} for {announcement.repository_id}",
RNS.LOG_INFO,
)
except Exception as e:
RNS.log(f"Speculative push failed: {e}", RNS.LOG_WARNING)
def set_on_bundle_received(self, callback: Callable[[GitBundle], None]):
"""Set callback for received bundles."""
self._on_bundle_received = callback
def set_on_refs_announced(self, callback: Callable[[RefsAnnouncement], None]):
"""Set callback for refs announcements."""
self._on_refs_announced = callback
def get_sync_status(self, repository_id: str) -> Optional[Dict]:
"""Get sync status for a repository."""
with self._repos_lock:
if repository_id not in self._repos:
return None
state = self._repos[repository_id]
return {
"repository_id": repository_id,
"local_path": str(state.local_path),
"known_peers": len(state.known_peers),
"last_syncs": {
peer: time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(ts))
for peer, ts in state.last_sync.items()
},
}
def create_dead_drop_bundle(
repo_path: Path,
repository_id: str,
source_node: str,
output_path: Path,
incremental_basis: Optional[Dict[str, str]] = None,
) -> GitBundle:
"""Create a bundle file for dead-drop transfer (USB, etc).
Args:
repo_path: Path to Git repository
repository_id: Radicle repo ID
source_node: Source node DID
output_path: Where to write the bundle file
incremental_basis: Known refs for incremental bundle
Returns:
The created GitBundle
"""
generator = GitBundleGenerator(repo_path)
if incremental_basis:
bundle = generator.create_incremental_bundle(
repository_id=repository_id,
source_node=source_node,
basis_refs=incremental_basis,
output_path=output_path,
)
else:
bundle = generator.create_full_bundle(
repository_id=repository_id,
source_node=source_node,
output_path=output_path,
)
if bundle is None:
raise ValueError("No changes to bundle")
# Also save the full transport format
transport_path = output_path.with_suffix(".radicle-bundle")
transport_path.write_bytes(bundle.encode())
RNS.log(f"Created dead-drop bundle: {output_path}", RNS.LOG_INFO)
RNS.log(f" Type: {bundle.metadata.bundle_type.value}", RNS.LOG_INFO)
RNS.log(f" Size: {bundle.metadata.size_bytes} bytes", RNS.LOG_INFO)
RNS.log(f" Refs: {len(bundle.metadata.refs_included)}", RNS.LOG_INFO)
return bundle
def apply_dead_drop_bundle(
bundle_path: Path,
repo_path: Path,
) -> Dict[str, str]:
"""Apply a dead-drop bundle to a repository.
Args:
bundle_path: Path to .radicle-bundle file
repo_path: Path to target Git repository
Returns:
Dict of applied refs
"""
# Load bundle
bundle_data = bundle_path.read_bytes()
bundle = GitBundle.decode(bundle_data)
# Apply
applicator = GitBundleApplicator(repo_path)
applied = applicator.apply_bundle(bundle)
RNS.log(f"Applied dead-drop bundle: {len(applied)} refs", RNS.LOG_INFO)
return applied

View File

@ -1,153 +0,0 @@
"""Tests for adaptive sync strategy selection."""
import pytest
from radicle_reticulum.adaptive import (
SyncStrategy,
LinkQuality,
estimate_transfer_time,
select_strategy,
THRESHOLD_FULL,
THRESHOLD_INCREMENTAL,
RTT_FAST,
RTT_MEDIUM,
RTT_SLOW,
QR_MAX_BYTES,
)
def make_quality(
rtt_ms: float = 50,
throughput_bps: float = 1_000_000,
is_lora: bool = False,
strategy: SyncStrategy = SyncStrategy.FULL,
) -> LinkQuality:
return LinkQuality(
rtt_ms=rtt_ms,
throughput_bps=throughput_bps,
packet_loss=0.0,
is_lora=is_lora,
strategy=strategy,
)
class TestLinkQuality:
def test_throughput_kbps(self):
q = make_quality(throughput_bps=50_000)
assert q.throughput_kbps == pytest.approx(50.0)
def test_repr(self):
q = make_quality(rtt_ms=100, throughput_bps=1000)
r = repr(q)
assert "100ms" in r
assert "1.0Kbps" in r
class TestEstimateTransferTime:
def test_zero_throughput_is_infinite(self):
q = make_quality(throughput_bps=0)
assert estimate_transfer_time(1024, q) == float("inf")
def test_known_value(self):
# 1 MB at 1 Mbps effective = 8s, but 80% efficiency → 10s
q = make_quality(throughput_bps=1_000_000)
t = estimate_transfer_time(1_000_000, q)
assert t == pytest.approx(10.0, rel=0.01)
def test_larger_file_takes_longer(self):
q = make_quality(throughput_bps=10_000)
t_small = estimate_transfer_time(1_000, q)
t_large = estimate_transfer_time(100_000, q)
assert t_large > t_small
class TestSelectStrategy:
def test_fast_link_small_repo_uses_full(self):
q = make_quality(rtt_ms=10, throughput_bps=10_000_000, strategy=SyncStrategy.FULL)
strategy, reason = select_strategy(
bundle_size=100_000,
incremental_size=None,
quality=q,
)
assert strategy == SyncStrategy.FULL
assert "full" in reason.lower() or "fast" in reason.lower()
def test_lora_link_prefers_incremental_when_viable(self):
q = make_quality(rtt_ms=5000, throughput_bps=2400, is_lora=True, strategy=SyncStrategy.MINIMAL)
strategy, reason = select_strategy(
bundle_size=500_000,
incremental_size=5_000,
quality=q,
max_transfer_time=3600,
)
assert strategy == SyncStrategy.INCREMENTAL
def test_lora_link_falls_back_to_minimal_when_too_large(self):
q = make_quality(rtt_ms=5000, throughput_bps=100, is_lora=True, strategy=SyncStrategy.MINIMAL)
strategy, reason = select_strategy(
bundle_size=10_000_000,
incremental_size=5_000_000,
quality=q,
max_transfer_time=3600,
)
assert strategy == SyncStrategy.MINIMAL
def test_tiny_incremental_uses_qr(self):
q = make_quality(rtt_ms=10, throughput_bps=10_000_000, strategy=SyncStrategy.FULL)
strategy, reason = select_strategy(
bundle_size=50_000,
incremental_size=QR_MAX_BYTES - 1,
quality=q,
)
assert strategy == SyncStrategy.QR
assert str(QR_MAX_BYTES - 1) in reason
def test_qr_not_selected_when_incremental_too_large(self):
q = make_quality(rtt_ms=10, throughput_bps=10_000_000, strategy=SyncStrategy.FULL)
strategy, _ = select_strategy(
bundle_size=50_000,
incremental_size=QR_MAX_BYTES + 1,
quality=q,
)
assert strategy != SyncStrategy.QR
def test_medium_link_prefers_incremental_over_full(self):
q = make_quality(
rtt_ms=500,
throughput_bps=50_000,
strategy=SyncStrategy.INCREMENTAL,
)
strategy, _ = select_strategy(
bundle_size=5_000_000,
incremental_size=50_000,
quality=q,
max_transfer_time=3600,
)
assert strategy == SyncStrategy.INCREMENTAL
def test_no_incremental_available_falls_back_to_full_or_minimal(self):
q = make_quality(rtt_ms=50, throughput_bps=10_000_000, strategy=SyncStrategy.FULL)
strategy, _ = select_strategy(
bundle_size=100_000,
incremental_size=None,
quality=q,
)
assert strategy in (SyncStrategy.FULL, SyncStrategy.INCREMENTAL)
def test_unreachably_slow_link_uses_minimal(self):
q = make_quality(rtt_ms=30000, throughput_bps=10, strategy=SyncStrategy.MINIMAL)
strategy, _ = select_strategy(
bundle_size=10_000_000,
incremental_size=None,
quality=q,
max_transfer_time=3600,
)
assert strategy == SyncStrategy.MINIMAL
def test_fast_link_large_repo_prefers_incremental_if_faster(self):
q = make_quality(rtt_ms=10, throughput_bps=500_000, strategy=SyncStrategy.FULL)
strategy, _ = select_strategy(
bundle_size=100_000_000,
incremental_size=100_000,
quality=q,
)
assert strategy == SyncStrategy.INCREMENTAL

View File

@ -1,267 +0,0 @@
"""Tests for Git bundle generation and application."""
import os
import subprocess
import tempfile
from pathlib import Path
import pytest
from radicle_reticulum.git_bundle import (
BundleMetadata,
BundleType,
GitBundle,
GitBundleGenerator,
GitBundleApplicator,
RADICLE_REF_PATTERNS,
)
@pytest.fixture
def temp_git_repo():
"""Create a temporary Git repository for testing."""
with tempfile.TemporaryDirectory() as tmpdir:
repo_path = Path(tmpdir) / "test_repo"
repo_path.mkdir()
# Initialize repo
subprocess.run(["git", "init"], cwd=repo_path, check=True, capture_output=True)
subprocess.run(
["git", "config", "user.email", "test@test.com"],
cwd=repo_path, check=True, capture_output=True
)
subprocess.run(
["git", "config", "user.name", "Test User"],
cwd=repo_path, check=True, capture_output=True
)
# Create initial commit
(repo_path / "README.md").write_text("# Test Repo\n")
subprocess.run(["git", "add", "README.md"], cwd=repo_path, check=True, capture_output=True)
subprocess.run(
["git", "commit", "-m", "Initial commit"],
cwd=repo_path, check=True, capture_output=True
)
yield repo_path
@pytest.fixture
def temp_bare_repo():
"""Create a temporary bare Git repository for testing."""
with tempfile.TemporaryDirectory() as tmpdir:
repo_path = Path(tmpdir) / "bare_repo.git"
subprocess.run(["git", "init", "--bare", str(repo_path)], check=True, capture_output=True)
yield repo_path
class TestBundleMetadata:
"""Test BundleMetadata encoding/decoding."""
def test_encode_decode_full_bundle(self):
"""Test encode/decode of full bundle metadata."""
metadata = BundleMetadata(
bundle_type=BundleType.FULL,
repository_id="rad:z3gqcJUoA1n9HaHKufZs5FCSGazv5",
source_node="did:key:z6MkhaXgBZDvotDkL5257faiztiGiC2QtKLGpbnnEGta2doK",
timestamp=1234567890123,
refs_included=["refs/heads/main", "refs/rad/id"],
prerequisites=[],
size_bytes=1024,
checksum=b"\x00" * 32,
)
encoded = metadata.encode()
decoded, consumed = BundleMetadata.decode(encoded)
assert decoded.bundle_type == metadata.bundle_type
assert decoded.repository_id == metadata.repository_id
assert decoded.source_node == metadata.source_node
assert decoded.timestamp == metadata.timestamp
assert decoded.refs_included == metadata.refs_included
assert decoded.prerequisites == metadata.prerequisites
assert decoded.size_bytes == metadata.size_bytes
assert decoded.checksum == metadata.checksum
def test_encode_decode_incremental_bundle(self):
"""Test encode/decode of incremental bundle metadata."""
metadata = BundleMetadata(
bundle_type=BundleType.INCREMENTAL,
repository_id="rad:test",
source_node="did:key:test",
timestamp=1000,
refs_included=["refs/heads/feature"],
prerequisites=["abc123", "def456"],
size_bytes=512,
checksum=b"\xff" * 32,
)
encoded = metadata.encode()
decoded, _ = BundleMetadata.decode(encoded)
assert decoded.bundle_type == BundleType.INCREMENTAL
assert decoded.prerequisites == ["abc123", "def456"]
class TestGitBundle:
"""Test GitBundle encoding/decoding."""
def test_encode_decode_roundtrip(self):
"""Test bundle encode/decode roundtrip."""
import hashlib
bundle_data = b"fake git bundle data"
metadata = BundleMetadata(
bundle_type=BundleType.FULL,
repository_id="rad:test",
source_node="did:key:test",
timestamp=1000,
refs_included=["refs/heads/main"],
prerequisites=[],
size_bytes=len(bundle_data),
checksum=hashlib.sha256(bundle_data).digest(),
)
bundle = GitBundle(metadata=metadata, data=bundle_data)
encoded = bundle.encode()
decoded = GitBundle.decode(encoded)
assert decoded.data == bundle_data
assert decoded.metadata.repository_id == metadata.repository_id
def test_checksum_verification_fails_on_corruption(self):
"""Test that checksum verification catches corruption."""
import hashlib
bundle_data = b"original data"
metadata = BundleMetadata(
bundle_type=BundleType.FULL,
repository_id="rad:test",
source_node="did:key:test",
timestamp=1000,
refs_included=[],
prerequisites=[],
size_bytes=len(bundle_data),
checksum=hashlib.sha256(bundle_data).digest(),
)
bundle = GitBundle(metadata=metadata, data=bundle_data)
encoded = bytearray(bundle.encode())
# Corrupt the data portion
encoded[-1] ^= 0xFF
with pytest.raises(ValueError, match="checksum mismatch"):
GitBundle.decode(bytes(encoded))
class TestGitBundleGenerator:
"""Test GitBundleGenerator."""
def test_get_refs(self, temp_git_repo):
"""Test getting refs from repository."""
generator = GitBundleGenerator(temp_git_repo)
refs = generator.get_refs(["refs/heads/*"])
assert "refs/heads/main" in refs or "refs/heads/master" in refs
def test_create_full_bundle(self, temp_git_repo):
"""Test creating a full bundle."""
generator = GitBundleGenerator(temp_git_repo)
bundle = generator.create_full_bundle(
repository_id="rad:test",
source_node="did:key:test",
)
assert bundle is not None
assert bundle.metadata.bundle_type == BundleType.FULL
assert bundle.metadata.size_bytes > 0
assert len(bundle.data) > 0
def test_create_incremental_bundle_no_changes(self, temp_git_repo):
"""Test that incremental bundle returns None when no changes."""
generator = GitBundleGenerator(temp_git_repo)
# Get current refs as basis
current_refs = generator.get_refs()
# Create incremental with same refs - should be None
bundle = generator.create_incremental_bundle(
repository_id="rad:test",
source_node="did:key:test",
basis_refs=current_refs,
)
assert bundle is None
def test_create_incremental_bundle_with_changes(self, temp_git_repo):
"""Test creating incremental bundle with new commits."""
generator = GitBundleGenerator(temp_git_repo)
# Get current refs as basis
basis_refs = generator.get_refs()
# Add new commit
(temp_git_repo / "new_file.txt").write_text("new content")
subprocess.run(["git", "add", "new_file.txt"], cwd=temp_git_repo, check=True, capture_output=True)
subprocess.run(
["git", "commit", "-m", "Add new file"],
cwd=temp_git_repo, check=True, capture_output=True
)
# Create incremental
bundle = generator.create_incremental_bundle(
repository_id="rad:test",
source_node="did:key:test",
basis_refs=basis_refs,
)
assert bundle is not None
assert bundle.metadata.bundle_type == BundleType.INCREMENTAL
def test_invalid_repo_path(self):
"""Test that invalid repo path raises error."""
with pytest.raises(ValueError, match="Not a Git repository"):
GitBundleGenerator(Path("/nonexistent/path"))
class TestGitBundleApplicator:
"""Test GitBundleApplicator."""
def test_apply_bundle(self, temp_git_repo, temp_bare_repo):
"""Test applying a bundle to a repository."""
# Create bundle from source repo
generator = GitBundleGenerator(temp_git_repo)
bundle = generator.create_full_bundle(
repository_id="rad:test",
source_node="did:key:test",
)
# Apply to bare repo
applicator = GitBundleApplicator(temp_bare_repo)
applied_refs = applicator.apply_bundle(bundle)
assert len(applied_refs) > 0
assert any("main" in ref or "master" in ref for ref in applied_refs)
def test_verify_bundle(self, temp_git_repo, temp_bare_repo):
"""Test bundle verification."""
generator = GitBundleGenerator(temp_git_repo)
bundle = generator.create_full_bundle(
repository_id="rad:test",
source_node="did:key:test",
)
applicator = GitBundleApplicator(temp_bare_repo)
ok, msg = applicator.verify_bundle(bundle)
assert ok
assert "verified" in msg.lower() or msg == ""
def test_get_current_refs(self, temp_git_repo):
"""Test getting current refs."""
applicator = GitBundleApplicator(temp_git_repo)
refs = applicator.get_current_refs(["refs/heads/*"])
assert len(refs) > 0

378
tests/test_gossip.py Normal file
View File

@ -0,0 +1,378 @@
"""Tests for GossipRelay."""
import json
import subprocess
import time
from pathlib import Path
from unittest.mock import MagicMock, patch, call
import pytest
from radicle_reticulum.gossip import GossipRelay, _read_refs, _radicle_storage_path
from radicle_reticulum.identity import RadicleIdentity
# ── Helpers ──────────────────────────────────────────────────────────────────
def _make_relay(tmp_path, rids=None, **kwargs):
identity = RadicleIdentity.generate()
with patch("radicle_reticulum.gossip.RNS.Reticulum"), \
patch("radicle_reticulum.gossip.RNS.Destination") as mock_dest_cls, \
patch("radicle_reticulum.gossip.RNS.Transport"):
mock_dest = MagicMock()
mock_dest.hash = b"\x00" * 16
mock_dest.hexhash = "0" * 32
mock_dest_cls.return_value = mock_dest
relay = GossipRelay(
identity=identity,
rids=rids or ["rad:z3abc123"],
storage=tmp_path / "storage",
**kwargs,
)
relay.destination = mock_dest
return relay
SAMPLE_REFS = {
"refs/heads/main": "abc123def456" * 3,
"refs/rad/sigrefs": "def456abc123" * 3,
}
# ── _read_refs ────────────────────────────────────────────────────────────────
class TestReadRefs:
def test_missing_storage_returns_empty(self, tmp_path):
result = _read_refs(tmp_path / "storage", "rad:z3nonexistent")
assert result == {}
def test_reads_refs_from_bare_repo(self, tmp_path):
rid = "rad:z3testrepo"
repo_path = tmp_path / "z3testrepo"
repo_path.mkdir()
show_ref_output = (
"abc123def456abc123def456abc123def456abc1 refs/heads/main\n"
"def456abc123def456abc123def456abc123def4 refs/rad/sigrefs\n"
)
with patch("radicle_reticulum.gossip.subprocess.run") as mock_run:
mock_run.return_value = MagicMock(returncode=0, stdout=show_ref_output)
result = _read_refs(tmp_path, rid)
assert result["refs/heads/main"] == "abc123def456abc123def456abc123def456abc1"
assert result["refs/rad/sigrefs"] == "def456abc123def456abc123def456abc123def4"
def test_strips_rad_prefix(self, tmp_path):
# rid with "rad:" prefix should map to directory without it
repo_path = tmp_path / "z3abc"
repo_path.mkdir()
result = _read_refs(tmp_path, "rad:z3abc")
assert result == {} # empty repo, no refs — just confirming no crash
# ── GossipRelay construction ──────────────────────────────────────────────────
class TestGossipRelayInit:
def test_defaults(self, tmp_path):
relay = _make_relay(tmp_path)
assert relay.rids == ["rad:z3abc123"]
assert relay.poll_interval == 30
assert relay.bridge_port == 8777
assert relay.announce_retry_delays == (5, 15, 30)
def test_custom_params(self, tmp_path):
relay = _make_relay(
tmp_path,
rids=["rad:z3a", "rad:z3b"],
bridge_port=9000,
poll_interval=60,
)
assert len(relay.rids) == 2
assert relay.bridge_port == 9000
assert relay.poll_interval == 60
# ── Broadcast ────────────────────────────────────────────────────────────────
class TestBroadcast:
def test_broadcast_sends_to_known_peers(self, tmp_path):
relay = _make_relay(tmp_path, radicle_nid="z6Mktest")
peer_hash = b"\x01" * 16
relay._known_peers[peer_hash] = time.time()
with patch.object(relay, "_send_packet", return_value=True) as mock_send:
relay._broadcast("rad:z3abc123", SAMPLE_REFS)
mock_send.assert_called_once()
args = mock_send.call_args[0]
assert args[0] == peer_hash
msg = json.loads(args[1].decode())
assert msg["rid"] == "rad:z3abc123"
assert msg["nid"] == "z6Mktest"
assert msg["refs"] == SAMPLE_REFS
def test_broadcast_no_peers_sends_nothing(self, tmp_path):
relay = _make_relay(tmp_path)
with patch.object(relay, "_send_packet") as mock_send:
relay._broadcast("rad:z3abc123", SAMPLE_REFS)
mock_send.assert_not_called()
def test_broadcast_multiple_peers(self, tmp_path):
relay = _make_relay(tmp_path)
for i in range(3):
relay._known_peers[bytes([i]) * 16] = time.time()
with patch.object(relay, "_send_packet", return_value=True) as mock_send:
relay._broadcast("rad:z3abc123", SAMPLE_REFS)
assert mock_send.call_count == 3
# ── Poll loop ─────────────────────────────────────────────────────────────────
class TestPollLoop:
def test_broadcasts_on_ref_change(self, tmp_path):
relay = _make_relay(tmp_path)
relay._known_refs["rad:z3abc123"] = {"refs/heads/main": "old_sha"}
new_refs = {"refs/heads/main": "new_sha"}
with patch("radicle_reticulum.gossip._read_refs", return_value=new_refs), \
patch.object(relay, "_broadcast") as mock_broadcast:
# Simulate one poll iteration
for rid in relay.rids:
refs = new_refs
old = relay._known_refs.get(rid)
if refs and refs != old:
if old is not None:
relay._broadcast(rid, refs)
relay._known_refs[rid] = refs
mock_broadcast.assert_called_once_with("rad:z3abc123", new_refs)
def test_no_broadcast_on_first_poll(self, tmp_path):
relay = _make_relay(tmp_path)
# No existing refs — first poll should not broadcast
with patch("radicle_reticulum.gossip._read_refs", return_value=SAMPLE_REFS), \
patch.object(relay, "_broadcast") as mock_broadcast:
for rid in relay.rids:
refs = SAMPLE_REFS
old = relay._known_refs.get(rid)
if refs and refs != old:
if old is not None:
relay._broadcast(rid, refs)
relay._known_refs[rid] = refs
mock_broadcast.assert_not_called()
assert relay._known_refs["rad:z3abc123"] == SAMPLE_REFS
def test_no_broadcast_when_refs_unchanged(self, tmp_path):
relay = _make_relay(tmp_path)
relay._known_refs["rad:z3abc123"] = SAMPLE_REFS
with patch("radicle_reticulum.gossip._read_refs", return_value=SAMPLE_REFS), \
patch.object(relay, "_broadcast") as mock_broadcast:
for rid in relay.rids:
refs = SAMPLE_REFS
old = relay._known_refs.get(rid)
if refs and refs != old:
if old is not None:
relay._broadcast(rid, refs)
mock_broadcast.assert_not_called()
# ── Incoming packets ──────────────────────────────────────────────────────────
class TestOnPacket:
def _make_packet(self, rid, nid, refs):
return json.dumps({"type": "refs", "rid": rid, "nid": nid, "refs": refs}).encode()
def test_triggers_sync_on_new_refs(self, tmp_path):
relay = _make_relay(tmp_path)
relay._known_refs["rad:z3abc123"] = {"refs/heads/main": "old"}
mock_packet = MagicMock()
with patch.object(relay, "_trigger_sync") as mock_sync:
relay._on_packet(
self._make_packet("rad:z3abc123", "z6Mkpeer", {"refs/heads/main": "new"}),
mock_packet,
)
mock_sync.assert_called_once_with("rad:z3abc123", "z6Mkpeer")
def test_no_sync_when_refs_match(self, tmp_path):
relay = _make_relay(tmp_path)
relay._known_refs["rad:z3abc123"] = SAMPLE_REFS
mock_packet = MagicMock()
with patch.object(relay, "_trigger_sync") as mock_sync:
relay._on_packet(
self._make_packet("rad:z3abc123", "z6Mkpeer", SAMPLE_REFS),
mock_packet,
)
mock_sync.assert_not_called()
def test_ignores_invalid_json(self, tmp_path):
relay = _make_relay(tmp_path)
mock_packet = MagicMock()
# Should not raise
relay._on_packet(b"not json at all", mock_packet)
def test_ignores_wrong_type(self, tmp_path):
relay = _make_relay(tmp_path)
mock_packet = MagicMock()
with patch.object(relay, "_trigger_sync") as mock_sync:
relay._on_packet(
json.dumps({"type": "ping", "rid": "rad:z3abc123"}).encode(),
mock_packet,
)
mock_sync.assert_not_called()
def test_triggers_sync_for_unknown_repo(self, tmp_path):
relay = _make_relay(tmp_path)
# No prior known refs — any incoming refs are "new"
mock_packet = MagicMock()
with patch.object(relay, "_trigger_sync") as mock_sync:
relay._on_packet(
self._make_packet("rad:z3unknown", "z6Mkpeer", SAMPLE_REFS),
mock_packet,
)
mock_sync.assert_called_once()
def test_sync_callback_invoked(self, tmp_path):
relay = _make_relay(tmp_path)
received = []
relay.set_on_sync_triggered(lambda rid, nid: received.append((rid, nid)))
with patch("radicle_reticulum.gossip.subprocess.run") as mock_run:
mock_run.return_value = MagicMock(returncode=0)
relay._trigger_sync("rad:z3abc123", "z6Mkpeer")
assert received == [("rad:z3abc123", "z6Mkpeer")]
# ── Trigger sync ──────────────────────────────────────────────────────────────
class TestTriggerSync:
def test_calls_rad_node_connect_when_nid_given(self, tmp_path):
relay = _make_relay(tmp_path, bridge_port=8777)
with patch("radicle_reticulum.gossip.subprocess.run") as mock_run:
mock_run.return_value = MagicMock(returncode=0)
relay._trigger_sync("rad:z3abc123", "z6Mkpeer123")
calls = [c[0][0] for c in mock_run.call_args_list]
connect_calls = [c for c in calls if "connect" in c]
assert connect_calls
assert "z6Mkpeer123@127.0.0.1:8777" in connect_calls[0]
def test_skips_connect_when_no_nid(self, tmp_path):
relay = _make_relay(tmp_path)
with patch("radicle_reticulum.gossip.subprocess.run") as mock_run:
mock_run.return_value = MagicMock(returncode=0)
relay._trigger_sync("rad:z3abc123", "")
calls = [c[0][0] for c in mock_run.call_args_list]
connect_calls = [c for c in calls if "connect" in c]
assert not connect_calls
def test_calls_rad_sync_fetch(self, tmp_path):
relay = _make_relay(tmp_path)
with patch("radicle_reticulum.gossip.subprocess.run") as mock_run:
mock_run.return_value = MagicMock(returncode=0)
relay._trigger_sync("rad:z3abc123", "")
all_calls = [c[0][0] for c in mock_run.call_args_list]
sync_calls = [c for c in all_calls if "sync" in c]
assert sync_calls
assert "--fetch" in sync_calls[0]
assert "--rid" in sync_calls[0]
# ── Peer discovery ────────────────────────────────────────────────────────────
class TestPeerDiscovery:
def _make_app_data(self, nid=None):
import struct
from radicle_reticulum.gossip import GOSSIP_MAGIC
data = GOSSIP_MAGIC
if nid:
b = nid.encode()
data += struct.pack("!H", len(b)) + b
return data
def test_new_peer_added(self, tmp_path):
relay = _make_relay(tmp_path)
peer_hash = b"\x02" * 16
relay._known_refs["rad:z3abc123"] = SAMPLE_REFS
with patch.object(relay, "_send_packet"):
relay._on_announce(peer_hash, MagicMock(), self._make_app_data("z6Mkpeer"))
assert peer_hash in relay._known_peers
def test_own_announce_ignored(self, tmp_path):
relay = _make_relay(tmp_path)
own_hash = relay.destination.hash
with patch.object(relay, "_send_packet") as mock_send:
relay._on_announce(own_hash, MagicMock(), self._make_app_data())
mock_send.assert_not_called()
assert own_hash not in relay._known_peers
def test_non_gossip_announce_ignored(self, tmp_path):
relay = _make_relay(tmp_path)
peer_hash = b"\x03" * 16
with patch.object(relay, "_send_packet") as mock_send:
relay._on_announce(peer_hash, MagicMock(), b"OTHER_APP_DATA")
mock_send.assert_not_called()
assert peer_hash not in relay._known_peers
def test_sends_current_refs_to_new_peer(self, tmp_path):
relay = _make_relay(tmp_path)
relay._known_refs["rad:z3abc123"] = SAMPLE_REFS
peer_hash = b"\x04" * 16
with patch.object(relay, "_send_packet") as mock_send:
relay._on_announce(peer_hash, MagicMock(), self._make_app_data())
mock_send.assert_called_once()
assert mock_send.call_args[0][0] == peer_hash
def test_duplicate_announce_not_re_sent(self, tmp_path):
relay = _make_relay(tmp_path)
relay._known_refs["rad:z3abc123"] = SAMPLE_REFS
peer_hash = b"\x05" * 16
relay._known_peers[peer_hash] = time.time() # already known
with patch.object(relay, "_send_packet") as mock_send:
relay._on_announce(peer_hash, MagicMock(), self._make_app_data())
mock_send.assert_not_called()
# ── push_refs_now ─────────────────────────────────────────────────────────────
class TestPushRefsNow:
def test_push_refs_now_broadcasts_immediately(self, tmp_path):
relay = _make_relay(tmp_path)
peer_hash = b"\x06" * 16
relay._known_peers[peer_hash] = time.time()
with patch("radicle_reticulum.gossip._read_refs", return_value=SAMPLE_REFS), \
patch.object(relay, "_send_packet", return_value=True) as mock_send:
relay.push_refs_now("rad:z3abc123")
mock_send.assert_called_once()
def test_push_refs_now_updates_known_refs(self, tmp_path):
relay = _make_relay(tmp_path)
with patch("radicle_reticulum.gossip._read_refs", return_value=SAMPLE_REFS), \
patch.object(relay, "_send_packet", return_value=True):
relay.push_refs_now("rad:z3abc123")
assert relay._known_refs["rad:z3abc123"] == SAMPLE_REFS

View File

@ -1,108 +0,0 @@
"""Tests for QR bundle encoding/decoding."""
import hashlib
import pytest
from radicle_reticulum.git_bundle import GitBundle, BundleMetadata, BundleType
from radicle_reticulum.qr import (
encode_bundle_to_qr,
decode_bundle_from_qr_data,
BundleTooLargeForQR,
QR_MAX_BYTES,
QR_MAGIC,
)
def make_small_bundle(data: bytes = b"tiny git bundle data") -> GitBundle:
"""Create a minimal GitBundle for testing."""
metadata = BundleMetadata(
bundle_type=BundleType.INCREMENTAL,
repository_id="rad:z3test",
source_node="did:key:z6Mktest",
timestamp=1000,
refs_included=["refs/heads/main"],
prerequisites=["abc123"],
size_bytes=len(data),
checksum=hashlib.sha256(data).digest(),
)
return GitBundle(metadata=metadata, data=data)
class TestQRPayloadRoundtrip:
"""Test the binary payload encode/decode without QR rendering."""
def _encode_payload(self, bundle: GitBundle) -> bytes:
"""Extract the raw bytes that would be put into a QR code."""
import struct
bundle_bytes = bundle.encode()
checksum = hashlib.sha256(bundle_bytes).digest()
length_prefix = len(bundle_bytes).to_bytes(4, "big")
return QR_MAGIC + length_prefix + checksum + bundle_bytes
def test_decode_roundtrip(self):
bundle = make_small_bundle()
payload = self._encode_payload(bundle)
decoded = decode_bundle_from_qr_data(payload)
assert decoded.metadata.repository_id == bundle.metadata.repository_id
assert decoded.data == bundle.data
def test_decode_rejects_wrong_magic(self):
bundle = make_small_bundle()
payload = b"WRONGMAGIC" + self._encode_payload(bundle)[len(QR_MAGIC):]
with pytest.raises(ValueError, match="Not a Radicle QR payload"):
decode_bundle_from_qr_data(payload)
def test_decode_detects_corruption(self):
bundle = make_small_bundle()
payload = bytearray(self._encode_payload(bundle))
payload[-1] ^= 0xFF # flip last bit of bundle data
with pytest.raises(ValueError, match="checksum mismatch"):
decode_bundle_from_qr_data(bytes(payload))
def test_decode_rejects_truncated_payload(self):
bundle = make_small_bundle()
payload = self._encode_payload(bundle)
# Truncate the bundle data portion
truncated = payload[:-10]
with pytest.raises(ValueError, match="Truncated"):
decode_bundle_from_qr_data(truncated)
class TestBundleTooLarge:
def test_oversized_bundle_raises(self):
large_data = b"x" * (QR_MAX_BYTES + 1)
bundle = make_small_bundle(data=large_data)
with pytest.raises(BundleTooLargeForQR, match="QR capacity"):
encode_bundle_to_qr(bundle)
def test_exact_limit_would_include_overhead(self):
# QR_MAX_BYTES is the limit for the serialised bundle, including metadata.
# A bundle with data just under the limit should still fail due to metadata overhead.
# This test verifies the check catches realistic oversized bundles.
oversized_data = b"a" * QR_MAX_BYTES
bundle = make_small_bundle(data=oversized_data)
with pytest.raises(BundleTooLargeForQR):
encode_bundle_to_qr(bundle)
class TestQREncodeOutput:
"""Test QR rendering (requires qrcode package)."""
def test_encode_returns_ascii_art(self):
bundle = make_small_bundle()
result = encode_bundle_to_qr(bundle)
assert isinstance(result, str)
assert len(result) > 0
# ASCII art QR codes use block characters
assert any(c in result for c in ("", "", " ", "\n"))
def test_encode_small_bundle_succeeds(self):
bundle = make_small_bundle(b"small")
result = encode_bundle_to_qr(bundle)
assert result # non-empty
def test_error_correction_levels(self):
bundle = make_small_bundle()
for level in ("L", "M", "Q", "H"):
result = encode_bundle_to_qr(bundle, error_correction=level)
assert isinstance(result, str)

View File

@ -1,543 +0,0 @@
"""Tests for sync data structures and SyncManager logic (no RNS networking)."""
import struct
import time
import tempfile
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
import LXMF
from radicle_reticulum.sync import (
RefsAnnouncement,
SyncManager,
SyncMode,
CONTENT_TYPE_BUNDLE,
CONTENT_TYPE_BUNDLE_CHUNK,
CONTENT_TYPE_REFS_ANNOUNCE,
CHUNK_HEADER_SIZE,
)
from radicle_reticulum.identity import RadicleIdentity
from radicle_reticulum.git_bundle import GitBundle, BundleMetadata, BundleType
import hashlib
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _make_manager(tmp_path: Path) -> SyncManager:
"""Instantiate SyncManager with LXMF/RNS components patched out."""
identity = RadicleIdentity.generate()
mock_router = MagicMock()
mock_dest = MagicMock()
mock_dest.hash = b"\x00" * 32
mock_dest.hash_hex = "00" * 32
with patch("radicle_reticulum.sync.RNS.Reticulum"), \
patch("radicle_reticulum.sync.LXMF.LXMRouter", return_value=mock_router), \
patch("radicle_reticulum.sync.RNS.log"):
mock_router.register_delivery_identity.return_value = mock_dest
manager = SyncManager(identity=identity, storage_path=tmp_path)
manager._lxmf_router = mock_router
manager._lxmf_destination = mock_dest
return manager
def _make_bundle(data: bytes = b"git bundle") -> GitBundle:
checksum = hashlib.sha256(data).digest()
metadata = BundleMetadata(
bundle_type=BundleType.FULL,
repository_id="rad:z3test",
source_node="did:key:z6Mktest",
timestamp=1000,
refs_included=["refs/heads/main"],
prerequisites=[],
size_bytes=len(data),
checksum=checksum,
)
return GitBundle(metadata=metadata, data=data)
class TestRefsAnnouncement:
def test_encode_decode_roundtrip(self):
ann = RefsAnnouncement(
repository_id="rad:z3gqcJUoA1n9HaHKufZs5FCSGazv5",
node_id="did:key:z6MkhaXgBZDvotDkL5257faiztiGiC2QtKLGpbnnEGta2doK",
refs={
"refs/heads/main": "abc123def456789012345678901234567890abcd",
"refs/rad/id": "0000000000000000000000000000000000000001",
},
timestamp=1234567890123,
)
encoded = ann.encode()
decoded = RefsAnnouncement.decode(encoded)
assert decoded.repository_id == ann.repository_id
assert decoded.node_id == ann.node_id
assert decoded.refs == ann.refs
assert decoded.timestamp == ann.timestamp
def test_empty_refs(self):
ann = RefsAnnouncement(
repository_id="rad:test",
node_id="did:key:z6Mk...",
refs={},
timestamp=0,
)
decoded = RefsAnnouncement.decode(ann.encode())
assert decoded.refs == {}
def test_many_refs(self):
refs = {f"refs/heads/branch-{i}": "a" * 40 for i in range(50)}
ann = RefsAnnouncement(
repository_id="rad:z3test",
node_id="did:key:z6Mktest",
refs=refs,
timestamp=9999,
)
decoded = RefsAnnouncement.decode(ann.encode())
assert decoded.refs == refs
def test_special_characters_in_ref_names(self):
refs = {"refs/heads/feature/my-branch_v2.0": "b" * 40}
ann = RefsAnnouncement(
repository_id="rad:z3test",
node_id="did:key:z6Mktest",
refs=refs,
timestamp=1,
)
decoded = RefsAnnouncement.decode(ann.encode())
assert decoded.refs == refs
def test_encode_produces_bytes(self):
ann = RefsAnnouncement(
repository_id="rad:test",
node_id="did:key:z6Mk",
refs={"refs/heads/main": "a" * 40},
timestamp=1000,
)
assert isinstance(ann.encode(), bytes)
assert len(ann.encode()) > 0
# ---------------------------------------------------------------------------
# SyncManager state management
# ---------------------------------------------------------------------------
class TestSyncManagerPeers:
def test_initial_peer_list_empty(self, tmp_path):
manager = _make_manager(tmp_path)
assert manager.get_known_peers() == []
def test_register_peer_adds_to_list(self, tmp_path):
manager = _make_manager(tmp_path)
with patch("radicle_reticulum.sync.RNS.log"):
manager.register_peer(b"\x01" * 16)
assert b"\x01" * 16 in manager.get_known_peers()
def test_register_multiple_peers(self, tmp_path):
manager = _make_manager(tmp_path)
hashes = [bytes([i]) * 16 for i in range(3)]
with patch("radicle_reticulum.sync.RNS.log"):
for h in hashes:
manager.register_peer(h)
assert set(manager.get_known_peers()) == set(hashes)
def test_peer_learned_from_incoming_message(self, tmp_path):
manager = _make_manager(tmp_path)
source_hash = b"\xab" * 16
bundle = _make_bundle()
msg = MagicMock()
msg.source_hash = source_hash
msg.fields = {LXMF.FIELD_CUSTOM_TYPE: CONTENT_TYPE_BUNDLE}
msg.content = bundle.encode()
with patch("radicle_reticulum.sync.RNS.log"):
manager._on_lxmf_delivery(msg)
assert source_hash in manager.get_known_peers()
def test_message_without_source_hash_still_processed(self, tmp_path):
manager = _make_manager(tmp_path)
bundle = _make_bundle()
msg = MagicMock()
msg.source_hash = None
msg.fields = {LXMF.FIELD_CUSTOM_TYPE: CONTENT_TYPE_BUNDLE}
msg.content = bundle.encode()
received = []
manager.set_on_bundle_received(received.append)
with patch("radicle_reticulum.sync.RNS.log"):
manager._on_lxmf_delivery(msg)
assert len(received) == 1
# ---------------------------------------------------------------------------
# SyncManager delivery callbacks
# ---------------------------------------------------------------------------
class TestSyncManagerDelivery:
def test_bundle_callback_fires_on_bundle_message(self, tmp_path):
manager = _make_manager(tmp_path)
bundle = _make_bundle()
received = []
manager.set_on_bundle_received(received.append)
msg = MagicMock()
msg.source_hash = b"\x01" * 16
msg.fields = {LXMF.FIELD_CUSTOM_TYPE: CONTENT_TYPE_BUNDLE}
msg.content = bundle.encode()
with patch("radicle_reticulum.sync.RNS.log"):
manager._on_lxmf_delivery(msg)
assert len(received) == 1
assert received[0].metadata.repository_id == "rad:z3test"
def test_refs_callback_fires_on_refs_announce_message(self, tmp_path):
manager = _make_manager(tmp_path)
ann = RefsAnnouncement(
repository_id="rad:z3test",
node_id="did:key:z6Mktest",
refs={"refs/heads/main": "a" * 40},
timestamp=999,
)
received = []
manager.set_on_refs_announced(received.append)
msg = MagicMock()
msg.source_hash = b"\x02" * 16
msg.fields = {LXMF.FIELD_CUSTOM_TYPE: CONTENT_TYPE_REFS_ANNOUNCE}
msg.content = ann.encode()
with patch("radicle_reticulum.sync.RNS.log"):
manager._on_lxmf_delivery(msg)
assert len(received) == 1
assert received[0].repository_id == "rad:z3test"
def test_unknown_content_type_silently_ignored(self, tmp_path):
manager = _make_manager(tmp_path)
msg = MagicMock()
msg.source_hash = None
msg.fields = {LXMF.FIELD_CUSTOM_TYPE: 0xFF}
msg.content = b"garbage"
with patch("radicle_reticulum.sync.RNS.log"):
manager._on_lxmf_delivery(msg) # should not raise
# ---------------------------------------------------------------------------
# Chunk reassembly
# ---------------------------------------------------------------------------
class TestChunkReassembly:
def _make_chunks(self, data: bytes, chunk_size: int = 10):
"""Split data into chunk messages as _send_chunked_bundle would."""
bundle_id = hashlib.sha256(data).digest()[:16]
total = (len(data) + chunk_size - 1) // chunk_size
msgs = []
for i in range(total):
chunk = data[i * chunk_size: (i + 1) * chunk_size]
header = struct.pack("!16sHH", bundle_id, i, total)
msgs.append(header + chunk)
return msgs
def test_reassemble_two_chunks(self, tmp_path):
manager = _make_manager(tmp_path)
bundle = _make_bundle(b"first half second half ")
bundle_bytes = bundle.encode()
received = []
manager.set_on_bundle_received(received.append)
chunks = self._make_chunks(bundle_bytes, chunk_size=len(bundle_bytes) // 2 + 1)
assert len(chunks) == 2
for chunk_data in chunks:
msg = MagicMock()
msg.source_hash = b"\x01" * 16
msg.fields = {LXMF.FIELD_CUSTOM_TYPE: CONTENT_TYPE_BUNDLE_CHUNK}
msg.content = chunk_data
with patch("radicle_reticulum.sync.RNS.log"):
manager._on_lxmf_delivery(msg)
assert len(received) == 1
assert received[0].data == bundle.data
def test_out_of_order_chunks_reassemble_correctly(self, tmp_path):
manager = _make_manager(tmp_path)
bundle = _make_bundle(b"abcdefghijklmnopqrstuvwxyz0123456789")
bundle_bytes = bundle.encode()
received = []
manager.set_on_bundle_received(received.append)
chunks = self._make_chunks(bundle_bytes, chunk_size=8)
assert len(chunks) >= 3
for chunk_data in reversed(chunks): # deliver in reverse order
msg = MagicMock()
msg.source_hash = None
msg.fields = {LXMF.FIELD_CUSTOM_TYPE: CONTENT_TYPE_BUNDLE_CHUNK}
msg.content = chunk_data
with patch("radicle_reticulum.sync.RNS.log"):
manager._on_lxmf_delivery(msg)
assert len(received) == 1
assert received[0].data == bundle.data
def test_malformed_chunk_too_short_is_ignored(self, tmp_path):
manager = _make_manager(tmp_path)
msg = MagicMock()
msg.source_hash = None
msg.fields = {LXMF.FIELD_CUSTOM_TYPE: CONTENT_TYPE_BUNDLE_CHUNK}
msg.content = b"\x00" * (CHUNK_HEADER_SIZE - 1) # too short
with patch("radicle_reticulum.sync.RNS.log"):
manager._on_lxmf_delivery(msg) # should not raise
assert manager._chunk_buffers == {}
def test_partial_chunks_not_delivered_until_complete(self, tmp_path):
manager = _make_manager(tmp_path)
bundle = _make_bundle(b"partial test data here")
bundle_bytes = bundle.encode()
received = []
manager.set_on_bundle_received(received.append)
chunks = self._make_chunks(bundle_bytes, chunk_size=5)
assert len(chunks) >= 2
# Send only first chunk
msg = MagicMock()
msg.source_hash = None
msg.fields = {LXMF.FIELD_CUSTOM_TYPE: CONTENT_TYPE_BUNDLE_CHUNK}
msg.content = chunks[0]
with patch("radicle_reticulum.sync.RNS.log"):
manager._on_lxmf_delivery(msg)
assert received == [] # not yet complete
# ---------------------------------------------------------------------------
# SyncManager repository management
# ---------------------------------------------------------------------------
class TestSyncManagerRepositories:
def test_register_repository(self, tmp_path):
manager = _make_manager(tmp_path)
repo_path = tmp_path / "repo"
repo_path.mkdir()
with patch("radicle_reticulum.sync.RNS.log"):
state = manager.register_repository("rad:z3test", repo_path)
assert state.repository_id == "rad:z3test"
assert state.local_path == repo_path
def test_register_same_repo_twice_returns_same_state(self, tmp_path):
manager = _make_manager(tmp_path)
repo_path = tmp_path / "repo"
repo_path.mkdir()
with patch("radicle_reticulum.sync.RNS.log"):
s1 = manager.register_repository("rad:z3test", repo_path)
s2 = manager.register_repository("rad:z3test", repo_path)
assert s1 is s2
def test_get_sync_status_returns_none_for_unknown(self, tmp_path):
manager = _make_manager(tmp_path)
assert manager.get_sync_status("rad:unknown") is None
def test_get_sync_status_after_register(self, tmp_path):
manager = _make_manager(tmp_path)
repo_path = tmp_path / "repo"
repo_path.mkdir()
with patch("radicle_reticulum.sync.RNS.log"):
manager.register_repository("rad:z3test", repo_path)
status = manager.get_sync_status("rad:z3test")
assert status is not None
assert status["repository_id"] == "rad:z3test"
assert status["known_peers"] == 0
# ---------------------------------------------------------------------------
# Phase 4: speculative push
# ---------------------------------------------------------------------------
def _make_manager_auto_push(tmp_path: Path) -> SyncManager:
identity = RadicleIdentity.generate()
mock_router = MagicMock()
mock_dest = MagicMock()
mock_dest.hash = b"\x00" * 32
mock_dest.hash_hex = "00" * 32
with patch("radicle_reticulum.sync.RNS.Reticulum"), \
patch("radicle_reticulum.sync.LXMF.LXMRouter", return_value=mock_router), \
patch("radicle_reticulum.sync.RNS.log"):
mock_router.register_delivery_identity.return_value = mock_dest
manager = SyncManager(identity=identity, storage_path=tmp_path, auto_push=True)
manager._lxmf_router = mock_router
manager._lxmf_destination = mock_dest
return manager
class TestShouldPushToPeer:
def test_no_push_when_peer_up_to_date(self, tmp_path):
manager = _make_manager(tmp_path)
our_refs = {"refs/heads/main": "a" * 40}
peer_refs = {"refs/heads/main": "a" * 40}
assert manager._should_push_to_peer(our_refs, peer_refs) is False
def test_push_when_peer_behind(self, tmp_path):
manager = _make_manager(tmp_path)
our_refs = {"refs/heads/main": "b" * 40}
peer_refs = {"refs/heads/main": "a" * 40}
assert manager._should_push_to_peer(our_refs, peer_refs) is True
def test_push_when_peer_missing_ref(self, tmp_path):
manager = _make_manager(tmp_path)
our_refs = {"refs/heads/main": "a" * 40, "refs/heads/dev": "b" * 40}
peer_refs = {"refs/heads/main": "a" * 40}
assert manager._should_push_to_peer(our_refs, peer_refs) is True
def test_no_push_when_our_refs_empty(self, tmp_path):
manager = _make_manager(tmp_path)
assert manager._should_push_to_peer({}, {"refs/heads/main": "a" * 40}) is False
def test_no_push_when_both_empty(self, tmp_path):
manager = _make_manager(tmp_path)
assert manager._should_push_to_peer({}, {}) is False
class TestSpeculativePush:
def _make_announcement(self, repo_id="rad:z3test", refs=None):
return RefsAnnouncement(
repository_id=repo_id,
node_id="did:key:z6Mkpeer",
refs=refs or {"refs/heads/main": "a" * 40},
timestamp=1000,
)
def test_auto_push_false_does_not_call_maybe_push(self, tmp_path):
manager = _make_manager(tmp_path) # auto_push=False by default
ann = self._make_announcement()
msg = MagicMock()
msg.source_hash = b"\x01" * 16
msg.fields = {LXMF.FIELD_CUSTOM_TYPE: CONTENT_TYPE_REFS_ANNOUNCE}
msg.content = ann.encode()
with patch("radicle_reticulum.sync.RNS.log"), \
patch.object(manager, "_maybe_push_to_peer") as mock_push:
manager._on_lxmf_delivery(msg)
mock_push.assert_not_called()
def test_auto_push_true_calls_maybe_push_on_refs_announce(self, tmp_path):
manager = _make_manager_auto_push(tmp_path)
ann = self._make_announcement()
msg = MagicMock()
msg.source_hash = b"\x02" * 16
msg.fields = {LXMF.FIELD_CUSTOM_TYPE: CONTENT_TYPE_REFS_ANNOUNCE}
msg.content = ann.encode()
with patch("radicle_reticulum.sync.RNS.log"), \
patch.object(manager, "_maybe_push_to_peer") as mock_push:
manager._on_lxmf_delivery(msg)
mock_push.assert_called_once_with(b"\x02" * 16, mock_push.call_args[0][1])
def test_auto_push_skipped_when_source_hash_none(self, tmp_path):
manager = _make_manager_auto_push(tmp_path)
ann = self._make_announcement()
msg = MagicMock()
msg.source_hash = None
msg.fields = {LXMF.FIELD_CUSTOM_TYPE: CONTENT_TYPE_REFS_ANNOUNCE}
msg.content = ann.encode()
with patch("radicle_reticulum.sync.RNS.log"), \
patch.object(manager, "_maybe_push_to_peer") as mock_push:
manager._on_lxmf_delivery(msg)
mock_push.assert_not_called()
def test_maybe_push_skips_unknown_repository(self, tmp_path):
manager = _make_manager_auto_push(tmp_path)
ann = self._make_announcement(repo_id="rad:unknown")
with patch("radicle_reticulum.sync.RNS.log"), \
patch("radicle_reticulum.sync.GitBundleGenerator") as mock_gen:
manager._maybe_push_to_peer(b"\x03" * 16, ann)
mock_gen.assert_not_called()
def test_maybe_push_skips_when_peer_up_to_date(self, tmp_path):
manager = _make_manager_auto_push(tmp_path)
repo_path = tmp_path / "repo"
repo_path.mkdir()
with patch("radicle_reticulum.sync.RNS.log"):
manager.register_repository("rad:z3test", repo_path)
our_refs = {"refs/heads/main": "a" * 40}
peer_refs = {"refs/heads/main": "a" * 40}
ann = self._make_announcement(refs=peer_refs)
with patch("radicle_reticulum.sync.RNS.log"), \
patch("radicle_reticulum.sync.GitBundleGenerator") as mock_gen_cls:
mock_gen = MagicMock()
mock_gen_cls.return_value = mock_gen
mock_gen.get_refs.return_value = our_refs
manager._maybe_push_to_peer(b"\x04" * 16, ann)
mock_gen.create_incremental_bundle.assert_not_called()
def test_maybe_push_sends_bundle_when_ahead(self, tmp_path):
manager = _make_manager_auto_push(tmp_path)
repo_path = tmp_path / "repo"
repo_path.mkdir()
with patch("radicle_reticulum.sync.RNS.log"):
manager.register_repository("rad:z3test", repo_path)
our_refs = {"refs/heads/main": "b" * 40}
peer_refs = {"refs/heads/main": "a" * 40}
ann = self._make_announcement(refs=peer_refs)
bundle = _make_bundle(b"incremental data")
with patch("radicle_reticulum.sync.RNS.log"), \
patch("radicle_reticulum.sync.GitBundleGenerator") as mock_gen_cls, \
patch.object(manager, "_send_lxmf_message", return_value=True) as mock_send:
mock_gen = MagicMock()
mock_gen_cls.return_value = mock_gen
mock_gen.get_refs.return_value = our_refs
mock_gen.create_incremental_bundle.return_value = bundle
manager._maybe_push_to_peer(b"\x05" * 16, ann)
mock_send.assert_called_once()
call_args = mock_send.call_args[0]
assert call_args[0] == b"\x05" * 16
assert call_args[1] == CONTENT_TYPE_BUNDLE