206 lines
7.3 KiB
Python
206 lines
7.3 KiB
Python
#!/usr/bin/env python3
|
|
import importlib.util
|
|
import json
|
|
import tempfile
|
|
from pathlib import Path
|
|
|
|
|
|
# Repository root: the directory one level above the folder holding this script.
ROOT_DIR = Path(__file__).resolve().parents[1]

# Source file of the discovery agent that the checks below load and exercise.
AGENT_PATH = ROOT_DIR.joinpath("agent", "discovery_agent.py")
|
|
|
|
|
|
def load_agent_module():
    """Load the discovery agent source file as a standalone module.

    The file at ``AGENT_PATH`` is executed under the module name
    ``discovery_agent`` through ``importlib``'s file-location machinery.

    Returns:
        The executed module object.

    Raises:
        ImportError: If no import spec, or no loader for that spec, can be
            created for ``AGENT_PATH``.
    """
    spec = importlib.util.spec_from_file_location("discovery_agent", AGENT_PATH)
    # Validate before the spec is used: spec_from_file_location can return
    # None and a spec can lack a loader. A real exception (rather than an
    # assert) keeps this guard active when Python runs with -O.
    if spec is None or spec.loader is None:
        raise ImportError(f"cannot load discovery agent module from {AGENT_PATH}")
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    return module
|
|
|
|
|
|
def assert_true(condition, message):
    """Raise ``AssertionError(message)`` unless ``condition`` is truthy.

    Unlike a bare ``assert`` statement, this check is an ordinary function
    call and so is not removed when Python runs with optimisations enabled.
    """
    if condition:
        return
    raise AssertionError(message)
|
|
|
|
|
|
def test_optin_route_selection(module):
    """Check that only opted-in containers are routed, and only on the target port.

    ``module._get_json`` is temporarily replaced by a canned Docker API that
    serves two containers: ``frigate`` (``LABEL_CADDY_ENABLE=true``, target
    port 5000, several other published ports) and ``hidden-service``
    (``LABEL_CADDY_ENABLE=false``). The original ``_get_json`` is restored
    even if route collection raises.

    The routes returned by ``module._collect_routes`` and the text returned by
    ``module._generate_caddyfile`` are then checked with ``assert_true``.
    """
    frigate_env = [
        "LABEL_CADDY_ENABLE=true",
        "LABEL_CADDY_TARGET_PORT=5000",
        "LABEL_CADDY_SCHEME=http",
    ]
    frigate_ports = {
        "5000/tcp": [{"HostIp": "0.0.0.0", "HostPort": "5000"}],
        "8554/tcp": [{"HostIp": "0.0.0.0", "HostPort": "8554"}],
        "8555/tcp": [{"HostIp": "0.0.0.0", "HostPort": "8555"}],
        "8555/udp": [{"HostIp": "0.0.0.0", "HostPort": "8555"}],
    }
    hidden_env = [
        "LABEL_CADDY_ENABLE=false",
        "LABEL_CADDY_TARGET_PORT=8080",
    ]
    hidden_ports = {
        "8080/tcp": [{"HostIp": "0.0.0.0", "HostPort": "8080"}],
    }

    # Payload for the container listing endpoint.
    listing = [
        {"Id": "frigate123", "Names": ["/frigate"], "Image": "ghcr.io/blakeblackshear/frigate:stable"},
        {"Id": "hidden123", "Names": ["/hidden-service"], "Image": "nginx:1.27"},
    ]
    # Payloads for the per-container inspect endpoint, keyed by container id.
    inspections = {
        "frigate123": {
            "Config": {"Env": frigate_env},
            "NetworkSettings": {"Ports": frigate_ports},
        },
        "hidden123": {
            "Config": {"Env": hidden_env},
            "NetworkSettings": {"Ports": hidden_ports},
        },
    }

    def canned_get_json(url: str):
        """Answer the two Docker API URL shapes used here; reject anything else."""
        if url.endswith("/containers/json?all=0"):
            return listing
        if "/containers/" not in url or not url.endswith("/json"):
            raise AssertionError(f"unexpected URL in test: {url}")
        # The container id sits between ".../containers/" and the trailing "/json".
        after_prefix = url.rsplit("/containers/", 1)[1]
        return inspections[after_prefix.rsplit("/json", 1)[0]]

    saved_get_json = module._get_json
    module._get_json = canned_get_json
    try:
        routes = module._collect_routes(
            docker_api_url="http://docker.test",
            env_prefix="LABEL_CADDY_",
            denylist={"caddy-autogen", "caddy-autogen-discovery", "caddy-autogen-socket-proxy"},
            base_domain="home.example.test",
            default_scheme="http",
            default_path="/",
            default_health_uri="",
        )
    finally:
        module._get_json = saved_get_json

    route_count = len(routes)
    assert_true(route_count == 1, f"expected 1 route, got {route_count}")

    only_route = routes[0]
    fqdn = only_route["fqdn"]
    assert_true(fqdn == "frigate.home.example.test", f"unexpected fqdn: {fqdn}")
    upstream = only_route["upstream"]
    assert_true(upstream == "host.docker.internal:5000", f"unexpected upstream: {upstream}")

    caddyfile = module._generate_caddyfile(
        routes=routes,
        token="dummy-token",
        require_cloudflare=True,
        allow_internal_tls_fallback=False,
        wildcard_domain="home.example.test",
        cert_email="",
        status_ui_port=31820,
        status_upstream="discovery-agent:8089",
    )

    # Fragments that must appear in the generated Caddyfile, checked in order.
    required_fragments = (
        (":31820", "expected status UI server block"),
        ("remote_ip private_ranges", "expected LAN-only restriction"),
        ("reverse_proxy discovery-agent:8089", "expected status upstream"),
        ("frigate.home.example.test", "expected frigate host in caddyfile"),
        ("reverse_proxy http://host.docker.internal:5000", "expected web port route"),
    )
    for fragment, failure_message in required_fragments:
        assert_true(fragment in caddyfile, failure_message)

    # The other published ports and the non-enabled container must not show up.
    assert_true(
        not ("8554" in caddyfile or "8555" in caddyfile),
        "media ports must not be routed",
    )
    assert_true(
        "hidden-service.home.example.test" not in caddyfile,
        "non opt-in container must stay hidden",
    )
|
|
|
|
|
|
def test_fail_closed_and_internal_fallback(module):
    """Check Caddyfile generation with an empty Cloudflare token in both modes.

    First call: ``require_cloudflare=True`` with an empty token must raise a
    ``RuntimeError`` whose text contains ``CLOUDFLARE_API_TOKEN``.

    Second call: ``require_cloudflare=False`` with
    ``allow_internal_tls_fallback=True`` must return text containing both
    ``local_certs`` and ``tls internal``.
    """
    routes = [
        {
            "name": "demo",
            "fqdn": "demo.home.example.test",
            "scheme": "http",
            "upstream": "host.docker.internal:5000",
            "path": "/",
            "health_uri": "",
        }
    ]

    # Arguments that are identical for both calls below.
    shared_kwargs = {
        "routes": routes,
        "token": "",
        "wildcard_domain": "",
        "cert_email": "",
        "status_ui_port": 31820,
        "status_upstream": "discovery-agent:8089",
    }

    caught = None
    try:
        module._generate_caddyfile(
            require_cloudflare=True,
            allow_internal_tls_fallback=False,
            **shared_kwargs,
        )
    except RuntimeError as err:
        caught = err
    assert_true(caught is not None, "expected fail-closed RuntimeError without Cloudflare token")
    assert_true("CLOUDFLARE_API_TOKEN" in str(caught), "expected explicit token error")

    fallback_caddyfile = module._generate_caddyfile(
        require_cloudflare=False,
        allow_internal_tls_fallback=True,
        **shared_kwargs,
    )
    for directive, failure_message in (
        ("local_certs", "expected local_certs in fallback mode"),
        ("tls internal", "expected internal tls in fallback mode"),
    ):
        assert_true(directive in fallback_caddyfile, failure_message)
|
|
|
|
|
|
def test_cloudflare_verify_and_cert_discovery(module):
    """Check token verification against a canned response, then cert host discovery.

    Part one replaces ``module.urllib.request.urlopen`` with a stub that
    always answers ``{"success": true}``, calls
    ``module._verify_cloudflare_token`` and expects both ``reachable`` and
    ``token_valid`` to be ``True``. The real ``urlopen`` is restored afterwards.

    Part two writes two ``.crt`` files (one concrete host, one ``_.``-prefixed)
    into a temporary directory tree and checks what
    ``module._collect_letsencrypt_hosts`` and ``module._has_matching_le_cert``
    report for them.
    """

    class CannedResponse:
        """Context-manager response whose ``read()`` returns a JSON-encoded payload."""

        def __init__(self, payload):
            self._payload = payload

        def __enter__(self):
            return self

        def __exit__(self, exc_type, exc, tb):
            # Returning False means exceptions from the ``with`` body propagate.
            return False

        def read(self):
            return json.dumps(self._payload).encode("utf-8")

    def fake_urlopen(req, timeout=0):
        # Both arguments are accepted for call compatibility and then ignored.
        del req, timeout
        return CannedResponse({"success": True})

    request_api = module.urllib.request
    real_urlopen = request_api.urlopen
    request_api.urlopen = fake_urlopen
    try:
        status = module._verify_cloudflare_token(
            "https://api.cloudflare.com/client/v4/user/tokens/verify",
            "token",
        )
    finally:
        request_api.urlopen = real_urlopen

    for field, failure_message in (
        ("reachable", "cloudflare should be reachable in mocked success"),
        ("token_valid", "token should be valid in mocked success"),
    ):
        assert_true(status[field] is True, failure_message)

    with tempfile.TemporaryDirectory() as data_root:
        issuer_dir = Path(data_root).joinpath(
            "caddy",
            "certificates",
            "acme-v02.api.letsencrypt.org-directory",
            "example.com",
        )
        issuer_dir.mkdir(parents=True, exist_ok=True)
        for cert_stem in ("demo.home.example.test", "_.home.example.test"):
            (issuer_dir / f"{cert_stem}.crt").write_text("fake", encoding="utf-8")

        hosts = module._collect_letsencrypt_hosts(data_root)
        assert_true("demo.home.example.test" in hosts, "expected concrete cert host")
        assert_true("*.home.example.test" in hosts, "expected wildcard cert host conversion")
        assert_true(
            module._has_matching_le_cert("api.home.example.test", hosts),
            "wildcard should match",
        )
        assert_true(
            module._has_matching_le_cert("demo.home.example.test", hosts),
            "exact cert should match",
        )
|
|
|
|
|
|
def main():
    """Load the discovery agent once and run every scenario against it in order.

    Any failing check raises and stops the run; the success line is printed
    only when all scenarios complete.
    """
    agent = load_agent_module()
    scenarios = (
        test_optin_route_selection,
        test_fail_closed_and_internal_fallback,
        test_cloudflare_verify_and_cert_discovery,
    )
    for scenario in scenarios:
        scenario(agent)
    print("Integration tests passed")
|
|
|
|
|
|
# Run the scenarios only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|