from __future__ import annotations import http.client import json import socket from urllib.parse import urlencode, urlparse class DockerApiError(RuntimeError): pass class _UnixHTTPConnection(http.client.HTTPConnection): def __init__(self, socket_path: str, timeout: float): super().__init__(host="localhost", timeout=timeout) self._socket_path = socket_path def connect(self) -> None: sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) sock.settimeout(self.timeout) sock.connect(self._socket_path) self.sock = sock class DockerApiClient: def __init__(self, base_url: str, timeout_seconds: float = 3.0): parsed = urlparse(base_url) self._timeout = timeout_seconds if parsed.scheme == "unix": self._mode = "unix" self._socket_path = parsed.path or "/var/run/docker.sock" self._base_path = "" elif parsed.scheme in {"http", "https"}: if not parsed.netloc: raise ValueError(f"Invalid Docker URL (missing host): {base_url}") self._mode = parsed.scheme self._host = parsed.hostname or "localhost" self._port = parsed.port self._base_path = parsed.path.rstrip("/") else: raise ValueError(f"Unsupported Docker URL scheme: {parsed.scheme}") def list_containers(self, include_stopped: bool = True) -> list[dict]: query = urlencode({"all": "1" if include_stopped else "0"}) return self._get_json(f"/containers/json?{query}") def inspect_container(self, container_id: str) -> dict: return self._get_json(f"/containers/{container_id}/json") def _get_json(self, path: str): body = self._get(path) try: return json.loads(body) except json.JSONDecodeError as exc: raise DockerApiError(f"Docker API returned invalid JSON for {path}: {exc}") from exc def _get(self, path: str) -> str: if not path.startswith("/"): path = "/" + path if self._mode == "unix": conn = _UnixHTTPConnection(self._socket_path, timeout=self._timeout) request_path = f"{self._base_path}{path}" else: if self._mode == "https": conn = http.client.HTTPSConnection(self._host, self._port, timeout=self._timeout) else: conn = http.client.HTTPConnection(self._host, self._port, timeout=self._timeout) request_path = f"{self._base_path}{path}" try: conn.request("GET", request_path) response = conn.getresponse() payload = response.read().decode("utf-8", errors="replace") except OSError as exc: raise DockerApiError(f"Docker API request failed for {path}: {exc}") from exc finally: conn.close() if response.status < 200 or response.status >= 300: raise DockerApiError( f"Docker API error for {path}: HTTP {response.status} {response.reason}; body={payload[:400]}" ) return payload class DockerUsageResolver: def __init__(self, docker_client: DockerApiClient): self._docker = docker_client def resolve_ip_usage(self, ips: set[str]) -> dict[str, set[str]]: usage: dict[str, set[str]] = {ip: set() for ip in ips} if not ips: return usage containers = self._docker.list_containers(include_stopped=True) for container in containers: container_id = container.get("Id") if not container_id: continue inspect_payload = self._docker.inspect_container(container_id) container_name = _display_name(container, inspect_payload) ports = ((inspect_payload.get("NetworkSettings") or {}).get("Ports") or {}) if not isinstance(ports, dict): continue for bindings in ports.values(): if not bindings: continue for binding in bindings: if not isinstance(binding, dict): continue host_ip = binding.get("HostIp") if host_ip in usage: usage[host_ip].add(container_name) return usage def _display_name(container_summary: dict, container_inspect: dict) -> str: inspect_name = container_inspect.get("Name") if isinstance(inspect_name, str) and inspect_name.strip("/"): return inspect_name.strip("/") names = container_summary.get("Names") if isinstance(names, list) and names: first_name = str(names[0]).strip("/") if first_name: return first_name container_id = container_summary.get("Id", "") return str(container_id)[:12] or "unknown"