"""Git bundle generation and application for Radicle repos. Supports both full repository bundles and incremental bundles containing only new commits since a known state. Radicle stores data under these ref namespaces: - refs/heads/* - Git branches - refs/tags/* - Git tags - refs/rad/id - Repository identity - refs/rad/sigrefs - Signed refs - refs/rad/cob/* - Collaborative Objects (issues, patches, etc.) - refs/rad/cob/xyz.issue/* - refs/rad/cob/xyz.patch/* """ import os import subprocess import tempfile import hashlib from dataclasses import dataclass, field from enum import Enum from pathlib import Path from typing import Dict, List, Optional, Set, Tuple import struct import time class BundleType(Enum): """Type of Git bundle.""" FULL = "full" # Complete repository INCREMENTAL = "incremental" # Only new commits # Radicle ref patterns to include in sync RADICLE_REF_PATTERNS = [ "refs/heads/*", "refs/tags/*", "refs/rad/id", "refs/rad/sigrefs", "refs/rad/cob/*", ] @dataclass class BundleMetadata: """Metadata about a Git bundle for transport.""" bundle_type: BundleType repository_id: str # Radicle repo ID (rad:z...) source_node: str # DID of source node timestamp: int # Unix timestamp ms refs_included: List[str] # List of refs in bundle prerequisites: List[str] # Commits required (for incremental) size_bytes: int checksum: bytes # SHA-256 of bundle data def encode(self) -> bytes: """Encode metadata to bytes.""" repo_bytes = self.repository_id.encode("utf-8") node_bytes = self.source_node.encode("utf-8") refs_data = b"".join( struct.pack(f"!H{len(r)}s", len(r), r.encode("utf-8")) for r in self.refs_included ) prereq_data = b"".join( struct.pack(f"!H{len(p)}s", len(p), p.encode("utf-8")) for p in self.prerequisites ) return struct.pack( f"!BH{len(repo_bytes)}sH{len(node_bytes)}sQIH{len(refs_data)}sH{len(prereq_data)}s32s", 1 if self.bundle_type == BundleType.FULL else 2, len(repo_bytes), repo_bytes, len(node_bytes), node_bytes, self.timestamp, self.size_bytes, len(self.refs_included), refs_data, len(self.prerequisites), prereq_data, self.checksum, ) @classmethod def decode(cls, data: bytes) -> Tuple["BundleMetadata", int]: """Decode metadata from bytes. Returns (metadata, bytes_consumed).""" offset = 0 # Bundle type bundle_type_raw = struct.unpack("!B", data[offset:offset+1])[0] bundle_type = BundleType.FULL if bundle_type_raw == 1 else BundleType.INCREMENTAL offset += 1 # Repository ID repo_len = struct.unpack("!H", data[offset:offset+2])[0] offset += 2 repository_id = data[offset:offset+repo_len].decode("utf-8") offset += repo_len # Source node node_len = struct.unpack("!H", data[offset:offset+2])[0] offset += 2 source_node = data[offset:offset+node_len].decode("utf-8") offset += node_len # Timestamp and size timestamp, size_bytes = struct.unpack("!QI", data[offset:offset+12]) offset += 12 # Refs refs_count = struct.unpack("!H", data[offset:offset+2])[0] offset += 2 refs_included = [] for _ in range(refs_count): ref_len = struct.unpack("!H", data[offset:offset+2])[0] offset += 2 refs_included.append(data[offset:offset+ref_len].decode("utf-8")) offset += ref_len # Prerequisites prereq_count = struct.unpack("!H", data[offset:offset+2])[0] offset += 2 prerequisites = [] for _ in range(prereq_count): prereq_len = struct.unpack("!H", data[offset:offset+2])[0] offset += 2 prerequisites.append(data[offset:offset+prereq_len].decode("utf-8")) offset += prereq_len # Checksum checksum = data[offset:offset+32] offset += 32 return cls( bundle_type=bundle_type, repository_id=repository_id, source_node=source_node, timestamp=timestamp, refs_included=refs_included, prerequisites=prerequisites, size_bytes=size_bytes, checksum=checksum, ), offset @dataclass class GitBundle: """A Git bundle with metadata for transport.""" metadata: BundleMetadata data: bytes def encode(self) -> bytes: """Encode bundle with metadata for transport.""" meta_bytes = self.metadata.encode() return struct.pack("!I", len(meta_bytes)) + meta_bytes + self.data @classmethod def decode(cls, data: bytes) -> "GitBundle": """Decode bundle from transport format.""" meta_len = struct.unpack("!I", data[:4])[0] metadata, _ = BundleMetadata.decode(data[4:4+meta_len]) bundle_data = data[4+meta_len:] # Verify checksum actual_checksum = hashlib.sha256(bundle_data).digest() if actual_checksum != metadata.checksum: raise ValueError("Bundle checksum mismatch") return cls(metadata=metadata, data=bundle_data) def save(self, path: Path) -> None: """Save bundle data to file.""" path.write_bytes(self.data) @property def size(self) -> int: """Get total size including metadata.""" return len(self.encode()) class GitBundleGenerator: """Generates Git bundles from repositories.""" def __init__(self, repo_path: Path): """Initialize with path to Git repository.""" self.repo_path = Path(repo_path) if not (self.repo_path / ".git").exists() and not (self.repo_path / "HEAD").exists(): raise ValueError(f"Not a Git repository: {repo_path}") def _run_git(self, *args: str, check: bool = True) -> subprocess.CompletedProcess: """Run a git command in the repository.""" return subprocess.run( ["git", *args], cwd=self.repo_path, capture_output=True, text=True, check=check, ) def _run_git_binary(self, *args: str) -> bytes: """Run a git command and return binary output.""" result = subprocess.run( ["git", *args], cwd=self.repo_path, capture_output=True, check=True, ) return result.stdout def get_refs(self, patterns: Optional[List[str]] = None) -> Dict[str, str]: """Get refs matching patterns. Returns {ref_name: commit_sha}.""" if patterns is None: patterns = RADICLE_REF_PATTERNS refs = {} for pattern in patterns: result = self._run_git("for-each-ref", "--format=%(refname) %(objectname)", pattern, check=False) if result.returncode == 0: for line in result.stdout.strip().split("\n"): if line: parts = line.split() if len(parts) == 2: refs[parts[0]] = parts[1] return refs def get_radicle_repo_id(self) -> Optional[str]: """Get Radicle repository ID if this is a Radicle repo.""" # Radicle stores repo ID in .git/rad or config rad_dir = self.repo_path / ".git" / "rad" if rad_dir.exists(): # Try to read from rad config try: result = self._run_git("config", "--get", "rad.id", check=False) if result.returncode == 0: return result.stdout.strip() except Exception: pass return None def create_full_bundle( self, repository_id: str, source_node: str, output_path: Optional[Path] = None, ref_patterns: Optional[List[str]] = None, ) -> GitBundle: """Create a full bundle containing all refs. Args: repository_id: Radicle repo ID (rad:z...) source_node: DID of the source node output_path: Optional path to write bundle file ref_patterns: Ref patterns to include (default: Radicle patterns) """ refs = self.get_refs(ref_patterns) if not refs: raise ValueError("No refs to bundle") # Create bundle with all refs with tempfile.NamedTemporaryFile(suffix=".bundle", delete=False) as f: bundle_path = f.name try: # Build ref list for bundle create ref_args = list(refs.keys()) self._run_git("bundle", "create", bundle_path, *ref_args) bundle_data = Path(bundle_path).read_bytes() finally: os.unlink(bundle_path) metadata = BundleMetadata( bundle_type=BundleType.FULL, repository_id=repository_id, source_node=source_node, timestamp=int(time.time() * 1000), refs_included=list(refs.keys()), prerequisites=[], size_bytes=len(bundle_data), checksum=hashlib.sha256(bundle_data).digest(), ) bundle = GitBundle(metadata=metadata, data=bundle_data) if output_path: bundle.save(output_path) return bundle def create_incremental_bundle( self, repository_id: str, source_node: str, basis_refs: Dict[str, str], output_path: Optional[Path] = None, ref_patterns: Optional[List[str]] = None, ) -> Optional[GitBundle]: """Create an incremental bundle with only new commits. Args: repository_id: Radicle repo ID source_node: DID of the source node basis_refs: Known refs at destination {ref_name: commit_sha} output_path: Optional path to write bundle file ref_patterns: Ref patterns to include Returns: GitBundle if there are changes, None if no changes """ current_refs = self.get_refs(ref_patterns) if not current_refs: return None # Find refs that have changed or are new changed_refs = {} for ref, sha in current_refs.items(): if ref not in basis_refs or basis_refs[ref] != sha: changed_refs[ref] = sha if not changed_refs: return None # No changes # Build exclusion list (commits the destination already has) exclusions = [f"^{sha}" for sha in basis_refs.values() if sha] with tempfile.NamedTemporaryFile(suffix=".bundle", delete=False) as f: bundle_path = f.name try: # Create bundle with changed refs, excluding known commits bundle_args = list(changed_refs.keys()) + exclusions result = self._run_git("bundle", "create", bundle_path, *bundle_args, check=False) if result.returncode != 0: # May fail if no new commits (all excluded) if "empty bundle" in result.stderr.lower(): return None raise subprocess.CalledProcessError(result.returncode, "git bundle create", result.stderr) bundle_data = Path(bundle_path).read_bytes() finally: if os.path.exists(bundle_path): os.unlink(bundle_path) metadata = BundleMetadata( bundle_type=BundleType.INCREMENTAL, repository_id=repository_id, source_node=source_node, timestamp=int(time.time() * 1000), refs_included=list(changed_refs.keys()), prerequisites=list(basis_refs.values()), size_bytes=len(bundle_data), checksum=hashlib.sha256(bundle_data).digest(), ) bundle = GitBundle(metadata=metadata, data=bundle_data) if output_path: bundle.save(output_path) return bundle class GitBundleApplicator: """Applies Git bundles to repositories.""" def __init__(self, repo_path: Path): """Initialize with path to Git repository.""" self.repo_path = Path(repo_path) def _run_git(self, *args: str, check: bool = True) -> subprocess.CompletedProcess: """Run a git command in the repository.""" return subprocess.run( ["git", *args], cwd=self.repo_path, capture_output=True, text=True, check=check, ) def verify_bundle(self, bundle: GitBundle) -> Tuple[bool, str]: """Verify a bundle can be applied. Returns (success, message). """ with tempfile.NamedTemporaryFile(suffix=".bundle", delete=False) as f: f.write(bundle.data) bundle_path = f.name try: result = self._run_git("bundle", "verify", bundle_path, check=False) if result.returncode == 0: return True, "Bundle verified successfully" else: return False, result.stderr.strip() finally: os.unlink(bundle_path) def apply_bundle(self, bundle: GitBundle, fetch_all: bool = True) -> Dict[str, str]: """Apply a bundle to the repository. Args: bundle: The GitBundle to apply fetch_all: If True, fetch all refs from bundle Returns: Dict of applied refs {ref_name: commit_sha} """ with tempfile.NamedTemporaryFile(suffix=".bundle", delete=False) as f: f.write(bundle.data) bundle_path = f.name try: # Verify first ok, msg = self.verify_bundle(bundle) if not ok: raise ValueError(f"Bundle verification failed: {msg}") # List refs in bundle result = self._run_git("bundle", "list-heads", bundle_path) bundle_refs = {} for line in result.stdout.strip().split("\n"): if line: parts = line.split() if len(parts) >= 2: bundle_refs[parts[1]] = parts[0] # Fetch from bundle if fetch_all: # Fetch all refs, preserving their names for ref in bundle_refs: self._run_git("fetch", bundle_path, f"{ref}:{ref}", check=False) else: self._run_git("fetch", bundle_path) return bundle_refs finally: os.unlink(bundle_path) def get_current_refs(self, patterns: Optional[List[str]] = None) -> Dict[str, str]: """Get current refs for computing incremental basis.""" if patterns is None: patterns = RADICLE_REF_PATTERNS refs = {} for pattern in patterns: result = self._run_git("for-each-ref", "--format=%(refname) %(objectname)", pattern, check=False) if result.returncode == 0: for line in result.stdout.strip().split("\n"): if line: parts = line.split() if len(parts) == 2: refs[parts[0]] = parts[1] return refs def estimate_bundle_size(repo_path: Path, ref_patterns: Optional[List[str]] = None) -> int: """Estimate the size of a full bundle without creating it.""" result = subprocess.run( ["git", "count-objects", "-v"], cwd=repo_path, capture_output=True, text=True, ) # Parse size-pack from output for line in result.stdout.split("\n"): if line.startswith("size-pack:"): # size-pack is in KB return int(line.split(":")[1].strip()) * 1024 return 0