300 lines
11 KiB
Python
300 lines
11 KiB
Python
"""Tests for RadicleBridge announce filtering and state logic (RNS mocked)."""
|
|
|
|
import struct
|
|
import time
|
|
from unittest.mock import MagicMock, patch, call
|
|
|
|
import pytest
|
|
import RNS
|
|
|
|
from radicle_reticulum.bridge import (
|
|
RadicleBridge,
|
|
BRIDGE_APP_DATA_MAGIC,
|
|
TunnelConnection,
|
|
)
|
|
from radicle_reticulum.identity import RadicleIdentity
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _make_bridge(auto_connect: bool = False, auto_seed: bool = False) -> RadicleBridge:
    """Construct a RadicleBridge while the RNS entry points it uses are patched.

    ``RNS.Reticulum``, ``RNS.Destination``, ``RNS.Transport`` and ``RNS.log``
    (as looked up through ``radicle_reticulum.bridge``) are replaced only for
    the duration of the constructor call. The patched ``RNS.Destination``
    returns a mock whose ``hash`` is sixteen 0xAA bytes and whose ``hexhash``
    is the matching hex string.

    Args:
        auto_connect: Forwarded unchanged to ``RadicleBridge``.
        auto_seed: Forwarded unchanged to ``RadicleBridge``.

    Returns:
        The bridge built with a freshly generated ``RadicleIdentity``.
    """
    fake_destination = MagicMock(hash=b"\xaa" * 16, hexhash="aa" * 16)
    fresh_identity = RadicleIdentity.generate()

    with patch("radicle_reticulum.bridge.RNS.Reticulum"), \
            patch("radicle_reticulum.bridge.RNS.Destination", return_value=fake_destination), \
            patch("radicle_reticulum.bridge.RNS.Transport"), \
            patch("radicle_reticulum.bridge.RNS.log"):
        # Returning from inside the block still unwinds all four patches.
        return RadicleBridge(
            identity=fresh_identity,
            auto_connect=auto_connect,
            auto_seed=auto_seed,
        )
|
|
|
|
|
|
def _nid_app_data(nid: str) -> bytes:
    """Return bridge announce app_data that carries the radicle NID *nid*.

    Layout, in order: ``BRIDGE_APP_DATA_MAGIC``, the UTF-8 byte length of the
    NID packed as a network-order unsigned 16-bit integer, then the UTF-8
    encoded NID itself.
    """
    encoded_nid = nid.encode("utf-8")
    length_prefix = struct.pack("!H", len(encoded_nid))
    return BRIDGE_APP_DATA_MAGIC + length_prefix + encoded_nid
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# TunnelConnection
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestTunnelConnection:
    """Exercises ``TunnelConnection.close()`` against mocked socket/link objects."""

    def test_close_closes_tcp_socket(self):
        """close() closes the socket, tears down the link and clears ``active``."""
        sock = MagicMock()
        link = MagicMock()
        connection = TunnelConnection(
            tunnel_id=1,
            tcp_socket=sock,
            rns_link=link,
            remote_destination=b"\x00" * 16,
        )

        connection.close()

        sock.close.assert_called_once()
        link.teardown.assert_called_once()
        assert not connection.active

    def test_close_tolerates_already_closed_socket(self):
        """close() must not propagate an OSError raised by the socket's close()."""
        # Dotted kwargs configure the child mock: sock.close raises when called.
        failing_sock = MagicMock(**{"close.side_effect": OSError("already closed")})
        connection = TunnelConnection(
            tunnel_id=2,
            tcp_socket=failing_sock,
            rns_link=None,
            remote_destination=None,
        )

        # The test passes as long as this call returns without raising.
        connection.close()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Bridge construction
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestBridgeConstruction:
    """State of a bridge straight after construction, before any announce."""

    def test_initial_state_is_empty(self):
        """A new bridge knows no remote bridges and reports zeroed counters."""
        fresh = _make_bridge()

        assert fresh.get_remote_bridges() == []

        snapshot = fresh.get_stats()
        for counter in ("active_tunnels", "known_bridges"):
            assert snapshot[counter] == 0

    def test_local_nid_is_none_initially(self):
        """The local radicle NID is unset until explicitly provided."""
        assert _make_bridge()._local_radicle_nid is None

    def test_set_local_radicle_nid(self):
        """set_local_radicle_nid() stores the value on the bridge."""
        local_nid = "z6Mktest"
        fresh = _make_bridge()

        with patch("radicle_reticulum.bridge.RNS.log"):
            fresh.set_local_radicle_nid(local_nid)

        assert fresh._local_radicle_nid == local_nid

    def test_get_remote_bridge_nid_returns_none_for_unknown(self):
        """Looking up a destination hash that was never announced yields None."""
        never_announced = b"\x01" * 16
        assert _make_bridge().get_remote_bridge_nid(never_announced) is None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# _handle_announce
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestBridgeHandleAnnounce:
    """Calls ``RadicleBridge._handle_announce`` directly with hand-built announces.

    Each test passes a destination hash, an ``RNS.Identity`` and an app_data
    payload, then inspects the discovery callback and the bridge's bookkeeping.
    """

    def _rns_id(self):
        """Return a new ``RNS.Identity`` to use as the announcing identity."""
        return RNS.Identity()

    def test_ignores_own_announce(self):
        """An announce carrying the bridge's own destination hash is dropped."""
        bridge = _make_bridge()
        seen_hashes = []

        def record(dest_hash, nid):
            seen_hashes.append(dest_hash)

        bridge.set_on_bridge_discovered(record)
        # Hash of the mocked destination installed by _make_bridge.
        local_hash = bridge.destination.hash

        with patch("radicle_reticulum.bridge.RNS.log"):
            bridge._handle_announce(local_hash, self._rns_id(), BRIDGE_APP_DATA_MAGIC)

        assert seen_hashes == []
        assert bridge.get_remote_bridges() == []

    def test_ignores_non_bridge_announce(self):
        """app_data that is not the bridge magic does not register a bridge."""
        bridge = _make_bridge()
        foreign_payload = b"SOME_OTHER_APP"

        with patch("radicle_reticulum.bridge.RNS.log"):
            bridge._handle_announce(b"\xbb" * 16, self._rns_id(), foreign_payload)

        assert bridge.get_remote_bridges() == []

    def test_ignores_announce_with_no_app_data(self):
        """An announce whose app_data is None does not register a bridge."""
        bridge = _make_bridge()

        with patch("radicle_reticulum.bridge.RNS.log"):
            bridge._handle_announce(b"\xcc" * 16, self._rns_id(), None)

        assert bridge.get_remote_bridges() == []

    def test_discovers_bridge_with_magic_only(self):
        """Magic-only app_data registers the peer and reports a None NID."""
        bridge = _make_bridge()
        remote_hash = b"\xdd" * 16
        events = []

        def record(dest_hash, nid):
            events.append((dest_hash, nid))

        bridge.set_on_bridge_discovered(record)

        with patch("radicle_reticulum.bridge.RNS.log"):
            bridge._handle_announce(remote_hash, self._rns_id(), BRIDGE_APP_DATA_MAGIC)

        assert len(events) == 1
        reported_hash, reported_nid = events[0]
        assert reported_hash == remote_hash
        assert reported_nid is None
        assert remote_hash in bridge.get_remote_bridges()

    def test_extracts_nid_from_app_data(self):
        """A NID embedded after the magic reaches the callback and the lookup."""
        bridge = _make_bridge()
        remote_hash = b"\xee" * 16
        expected_nid = "z6MktestNID123"
        events = []

        def record(dest_hash, nid):
            events.append((dest_hash, nid))

        bridge.set_on_bridge_discovered(record)

        with patch("radicle_reticulum.bridge.RNS.log"):
            bridge._handle_announce(
                remote_hash, self._rns_id(), _nid_app_data(expected_nid)
            )

        _, reported_nid = events[0]
        assert reported_nid == expected_nid
        assert bridge.get_remote_bridge_nid(remote_hash) == expected_nid

    def test_second_announce_does_not_fire_callback_again(self):
        """Re-announcing a known peer neither re-fires the callback nor duplicates it."""
        bridge = _make_bridge()
        remote_hash = b"\xff" * 16
        seen_hashes = []

        def record(dest_hash, nid):
            seen_hashes.append(dest_hash)

        bridge.set_on_bridge_discovered(record)

        with patch("radicle_reticulum.bridge.RNS.log"):
            # Same destination hash twice, each time with a new identity object.
            for _ in range(2):
                bridge._handle_announce(
                    remote_hash, self._rns_id(), BRIDGE_APP_DATA_MAGIC
                )

        assert len(seen_hashes) == 1
        assert len(bridge.get_remote_bridges()) == 1

    def test_multiple_distinct_bridges_all_tracked(self):
        """Four announces with four different hashes yield four known bridges."""
        bridge = _make_bridge()
        remote_hashes = [bytes([index]) * 16 for index in range(4)]

        with patch("radicle_reticulum.bridge.RNS.log"):
            for remote_hash in remote_hashes:
                bridge._handle_announce(
                    remote_hash, self._rns_id(), BRIDGE_APP_DATA_MAGIC
                )

        assert len(bridge.get_remote_bridges()) == 4

    def test_auto_connect_spawns_thread(self):
        """With auto_connect on, a bridge announce starts a (patched) thread."""
        bridge = _make_bridge(auto_connect=True)
        remote_hash = b"\x11" * 16

        with patch("radicle_reticulum.bridge.RNS.log"):
            with patch("radicle_reticulum.bridge.threading.Thread") as thread_cls:
                # Whatever the bridge constructs via Thread(...) is this mock.
                worker = thread_cls.return_value
                bridge._handle_announce(
                    remote_hash, self._rns_id(), BRIDGE_APP_DATA_MAGIC
                )

        worker.start.assert_called()

    def test_auto_seed_spawns_thread_when_nid_present(self):
        """With auto_seed on, an announce that carries a NID starts a thread."""
        bridge = _make_bridge(auto_seed=True)
        remote_hash = b"\x22" * 16
        remote_nid = "z6MkAutoSeed"

        with patch("radicle_reticulum.bridge.RNS.log"):
            with patch("radicle_reticulum.bridge.threading.Thread") as thread_cls:
                # Whatever the bridge constructs via Thread(...) is this mock.
                worker = thread_cls.return_value
                bridge._handle_announce(
                    remote_hash, self._rns_id(), _nid_app_data(remote_nid)
                )

        worker.start.assert_called()

    def test_malformed_nid_in_app_data_is_ignored_gracefully(self):
        """A length prefix with no NID bytes behind it still registers the bridge."""
        bridge = _make_bridge()
        remote_hash = b"\x33" * 16
        # Length field claims 65535 bytes of NID, but the payload ends here.
        truncated_payload = BRIDGE_APP_DATA_MAGIC + b"\xff\xff"
        events = []

        def record(dest_hash, nid):
            events.append((dest_hash, nid))

        bridge.set_on_bridge_discovered(record)

        with patch("radicle_reticulum.bridge.RNS.log"):
            bridge._handle_announce(remote_hash, self._rns_id(), truncated_payload)

        # The bridge is still reported; only the NID could not be parsed.
        assert len(events) == 1
        _, reported_nid = events[0]
        assert reported_nid is None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# register_seed
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestRegisterSeed:
    """``RadicleBridge.register_seed`` with ``subprocess.run`` patched out."""

    def test_register_seed_success(self):
        """Exit code 0 gives True, and the command mentions ``rad`` and the NID."""
        bridge = _make_bridge()
        completed = MagicMock(returncode=0)

        with patch("radicle_reticulum.bridge.RNS.log"):
            with patch(
                "radicle_reticulum.bridge.subprocess.run", return_value=completed
            ) as run_mock:
                outcome = bridge.register_seed("z6MktestNID")

        assert outcome is True
        # call_args unpacks to (positional args, keyword args); the command is
        # the first positional argument given to subprocess.run.
        positional, _ = run_mock.call_args
        command = positional[0]
        assert "rad" in command
        assert "z6MktestNID" in " ".join(command)

    def test_register_seed_rad_not_found(self):
        """FileNotFoundError from subprocess.run is reported as False."""
        bridge = _make_bridge()

        with patch("radicle_reticulum.bridge.RNS.log"):
            with patch(
                "radicle_reticulum.bridge.subprocess.run",
                side_effect=FileNotFoundError,
            ):
                outcome = bridge.register_seed("z6MkNID")

        assert outcome is False

    def test_register_seed_nonzero_exit(self):
        """A non-zero return code is reported as False."""
        bridge = _make_bridge()
        failed = MagicMock(returncode=1, stderr="error")

        with patch("radicle_reticulum.bridge.RNS.log"):
            with patch(
                "radicle_reticulum.bridge.subprocess.run", return_value=failed
            ):
                outcome = bridge.register_seed("z6MkNID")

        assert outcome is False

    def test_register_seed_timeout(self):
        """subprocess.TimeoutExpired from subprocess.run is reported as False."""
        import subprocess

        bridge = _make_bridge()
        timeout_error = subprocess.TimeoutExpired(cmd="rad", timeout=30)

        with patch("radicle_reticulum.bridge.RNS.log"):
            with patch(
                "radicle_reticulum.bridge.subprocess.run", side_effect=timeout_error
            ):
                outcome = bridge.register_seed("z6MkNID")

        assert outcome is False
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# get_stats
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestBridgeStats:
    """Shape and counters of the dict returned by ``RadicleBridge.get_stats``."""

    def test_stats_with_no_tunnels(self):
        """A fresh bridge reports all-zero counters plus its own hex hash."""
        bridge = _make_bridge()

        reported = bridge.get_stats()

        expected = {
            "active_tunnels": 0,
            "known_bridges": 0,
            "bytes_sent": 0,
            "bytes_received": 0,
            "rns_hash": bridge.destination.hexhash,
        }
        assert reported == expected

    def test_stats_counts_known_bridges(self):
        """Two announces from distinct hashes show up as two known bridges."""
        bridge = _make_bridge()

        with patch("radicle_reticulum.bridge.RNS.log"):
            for remote_hash in (b"\x01" * 16, b"\x02" * 16):
                bridge._handle_announce(
                    remote_hash, RNS.Identity(), BRIDGE_APP_DATA_MAGIC
                )

        assert bridge.get_stats()["known_bridges"] == 2
|