Files
zima-apps/Apps/docker-ip-addr-manager/backend/app/docker_api.py
T
2026-03-18 21:43:59 +01:00

135 lines
4.8 KiB
Python

from __future__ import annotations
import http.client
import json
import socket
from urllib.parse import urlencode, urlparse
class DockerApiError(RuntimeError):
pass
class _UnixHTTPConnection(http.client.HTTPConnection):
def __init__(self, socket_path: str, timeout: float):
super().__init__(host="localhost", timeout=timeout)
self._socket_path = socket_path
def connect(self) -> None:
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
sock.settimeout(self.timeout)
sock.connect(self._socket_path)
self.sock = sock
class DockerApiClient:
def __init__(self, base_url: str, timeout_seconds: float = 3.0):
parsed = urlparse(base_url)
self._timeout = timeout_seconds
if parsed.scheme == "unix":
self._mode = "unix"
self._socket_path = parsed.path or "/var/run/docker.sock"
self._base_path = ""
elif parsed.scheme in {"http", "https"}:
if not parsed.netloc:
raise ValueError(f"Invalid Docker URL (missing host): {base_url}")
self._mode = parsed.scheme
self._host = parsed.hostname or "localhost"
self._port = parsed.port
self._base_path = parsed.path.rstrip("/")
else:
raise ValueError(f"Unsupported Docker URL scheme: {parsed.scheme}")
def list_containers(self, include_stopped: bool = True) -> list[dict]:
query = urlencode({"all": "1" if include_stopped else "0"})
return self._get_json(f"/containers/json?{query}")
def inspect_container(self, container_id: str) -> dict:
return self._get_json(f"/containers/{container_id}/json")
def _get_json(self, path: str):
body = self._get(path)
try:
return json.loads(body)
except json.JSONDecodeError as exc:
raise DockerApiError(f"Docker API returned invalid JSON for {path}: {exc}") from exc
def _get(self, path: str) -> str:
if not path.startswith("/"):
path = "/" + path
if self._mode == "unix":
conn = _UnixHTTPConnection(self._socket_path, timeout=self._timeout)
request_path = f"{self._base_path}{path}"
else:
if self._mode == "https":
conn = http.client.HTTPSConnection(self._host, self._port, timeout=self._timeout)
else:
conn = http.client.HTTPConnection(self._host, self._port, timeout=self._timeout)
request_path = f"{self._base_path}{path}"
try:
conn.request("GET", request_path)
response = conn.getresponse()
payload = response.read().decode("utf-8", errors="replace")
except OSError as exc:
raise DockerApiError(f"Docker API request failed for {path}: {exc}") from exc
finally:
conn.close()
if response.status < 200 or response.status >= 300:
raise DockerApiError(
f"Docker API error for {path}: HTTP {response.status} {response.reason}; body={payload[:400]}"
)
return payload
class DockerUsageResolver:
def __init__(self, docker_client: DockerApiClient):
self._docker = docker_client
def resolve_ip_usage(self, ips: set[str]) -> dict[str, set[str]]:
usage: dict[str, set[str]] = {ip: set() for ip in ips}
if not ips:
return usage
containers = self._docker.list_containers(include_stopped=True)
for container in containers:
container_id = container.get("Id")
if not container_id:
continue
inspect_payload = self._docker.inspect_container(container_id)
container_name = _display_name(container, inspect_payload)
ports = ((inspect_payload.get("NetworkSettings") or {}).get("Ports") or {})
if not isinstance(ports, dict):
continue
for bindings in ports.values():
if not bindings:
continue
for binding in bindings:
if not isinstance(binding, dict):
continue
host_ip = binding.get("HostIp")
if host_ip in usage:
usage[host_ip].add(container_name)
return usage
def _display_name(container_summary: dict, container_inspect: dict) -> str:
inspect_name = container_inspect.get("Name")
if isinstance(inspect_name, str) and inspect_name.strip("/"):
return inspect_name.strip("/")
names = container_summary.get("Names")
if isinstance(names, list) and names:
first_name = str(names[0]).strip("/")
if first_name:
return first_name
container_id = container_summary.get("Id", "")
return str(container_id)[:12] or "unknown"