# File: zima-apps/Apps/docker-ip-addr-manager/backend/app/service.py
# Timestamp: 2026-03-18 21:43:59 +01:00
# Size: 277 lines, 9.2 KiB
# Language: Python

from __future__ import annotations
from dataclasses import dataclass
from ipaddress import IPv4Address
import threading
from typing import Callable
from uuid import uuid4
from app.docker_api import DockerApiError, DockerUsageResolver
from app.interfaces import list_host_interfaces
from app.ip_commands import CommandError, IpAddressManager
from app.models import EntryView, IpEntry
from app.storage import EntryStorage
class ValidationError(RuntimeError):
    """Raised for invalid entry data: a bad payload field or an unknown network device."""
class NotFoundError(RuntimeError):
    """Raised when no stored entry matches the requested entry id."""
class ConflictError(RuntimeError):
    """Raised when an operation clashes with current state.

    Used for duplicate ip/cidr/device bindings, for editing or deleting an
    entry that is still enabled, and for entries in use by containers.
    """
class DependencyError(RuntimeError):
    """Raised when something the service relies on is unavailable.

    Used when no host network interfaces are discovered and when the Docker
    usage check fails.
    """
@dataclass
class EntryListResponse:
    """Result of listing entries: the entry views plus Docker usage status."""

    # One view per stored entry.
    items: list[EntryView]
    # False when the Docker usage lookup failed while building the list.
    usage_known: bool
    # Text of the usage lookup failure, or None when there was none.
    usage_error: str | None

    def to_dict(self) -> dict:
        """Return a plain-dict form, converting each item via its ``to_dict()``."""
        serialized_items = [view.to_dict() for view in self.items]
        return dict(
            items=serialized_items,
            usage_known=self.usage_known,
            usage_error=self.usage_error,
        )
class EntryService:
    """Application service for managing stored IP entries.

    Works with three collaborators: an entry storage, a Docker usage
    resolver and an IP address manager. Every method that writes to storage
    runs while holding one internal ``threading.Lock``; ``list_entries`` and
    ``list_interfaces`` do not take it.
    """

    def __init__(
        self,
        storage: EntryStorage,
        usage_resolver: DockerUsageResolver,
        ip_manager: IpAddressManager,
        interface_provider: Callable[[], list[str]] = list_host_interfaces,
    ):
        """Keep references to the collaborators and create the mutation lock.

        Args:
            storage: Provides ``list_entries()`` and ``save_entries(entries)``.
            usage_resolver: Provides ``resolve_ip_usage(ips)``.
            ip_manager: Provides ``is_present``, ``ensure_present`` and
                ``ensure_absent`` for an ip/cidr/device triple.
            interface_provider: Zero-argument callable returning interface names.
        """
        self._lock = threading.Lock()
        self._interface_provider = interface_provider
        self._ip_manager = ip_manager
        self._usage_resolver = usage_resolver
        self._storage = storage

    def list_interfaces(self) -> list[str]:
        """Return the interface names reported by the interface provider.

        Raises:
            DependencyError: If the provider returns nothing.
        """
        discovered = self._interface_provider()
        if discovered:
            return discovered
        raise DependencyError("No network interfaces discovered on host")

    def list_entries(self) -> EntryListResponse:
        """Return a view of every stored entry together with usage status."""
        stored = self._storage.list_entries()
        usage_by_ip, usage_known, usage_error = self._resolve_usage(stored)
        views = [self._build_view(item, usage_by_ip, usage_known) for item in stored]
        return EntryListResponse(items=views, usage_known=usage_known, usage_error=usage_error)

    def _build_view(
        self,
        entry: IpEntry,
        usage_by_ip: dict[str, set[str]],
        usage_known: bool,
    ) -> EntryView:
        """Build the view for one entry from usage data and the IP manager's answer."""
        container_names = sorted(usage_by_ip.get(entry.ip, set()))
        # Without reliable usage data the entry is never reported as used.
        in_use = False
        if usage_known:
            in_use = bool(container_names)
        try:
            live = self._ip_manager.is_present(entry.ip, entry.cidr, entry.device)
        except CommandError:
            # Best effort: a failing presence check is reported as "not present".
            live = False
        return EntryView(
            id=entry.id,
            name=entry.name,
            ip=entry.ip,
            cidr=entry.cidr,
            device=entry.device,
            enabled=entry.enabled,
            effective_enabled=live,
            used=in_use,
            containers=container_names,
            usage_known=usage_known,
        )

    def create_entry(self, payload: dict) -> IpEntry:
        """Validate ``payload`` and store a new entry, initially disabled.

        Raises:
            ValidationError: If the payload is invalid or the device is unknown.
            DependencyError: If no host interfaces are discovered.
            ConflictError: If the same ip/cidr/device is already stored.
        """
        fields = _parse_payload(payload)
        with self._lock:
            stored = self._storage.list_entries()
            self._assert_device_exists(fields["device"])
            self._assert_unique_binding(
                stored,
                ip=fields["ip"],
                cidr=fields["cidr"],
                device=fields["device"],
            )
            new_entry = IpEntry(
                id=uuid4().hex,
                name=fields["name"],
                ip=fields["ip"],
                cidr=fields["cidr"],
                device=fields["device"],
                enabled=False,
            )
            stored.append(new_entry)
            self._storage.save_entries(stored)
            return new_entry

    def update_entry(self, entry_id: str, payload: dict) -> IpEntry:
        """Replace the fields of a disabled entry with the validated ``payload``.

        Raises:
            ValidationError: If the payload is invalid or the device is unknown.
            NotFoundError: If ``entry_id`` is not stored.
            ConflictError: If the entry is enabled, or the new ip/cidr/device
                duplicates another entry.
            DependencyError: If no host interfaces are discovered.
        """
        fields = _parse_payload(payload)
        with self._lock:
            stored = self._storage.list_entries()
            position, existing = _find_entry(stored, entry_id)
            if existing.enabled:
                raise ConflictError("Disable entry before updating IP/cidr/device")
            self._assert_device_exists(fields["device"])
            self._assert_unique_binding(
                stored,
                ip=fields["ip"],
                cidr=fields["cidr"],
                device=fields["device"],
                ignore_entry_id=entry_id,
            )
            replacement = IpEntry(
                id=existing.id,
                name=fields["name"],
                ip=fields["ip"],
                cidr=fields["cidr"],
                device=fields["device"],
                enabled=False,
            )
            stored[position] = replacement
            self._storage.save_entries(stored)
            return replacement

    def set_enabled(self, entry_id: str, enabled: bool) -> IpEntry:
        """Enable or disable an entry and persist the new flag.

        Enabling calls ``ensure_present`` on the IP manager. Disabling first
        checks that no container uses the IP, then calls ``ensure_absent``.
        Exceptions raised by the IP manager propagate unchanged and nothing
        is saved in that case.

        Raises:
            NotFoundError: If ``entry_id`` is not stored.
            ConflictError: When disabling an entry that containers still use.
            DependencyError: When disabling and the Docker usage check fails.
        """
        with self._lock:
            stored = self._storage.list_entries()
            position, target = _find_entry(stored, entry_id)
            if not enabled:
                self._assert_not_used(target)
                self._ip_manager.ensure_absent(target.ip, target.cidr, target.device)
                target.enabled = False
            else:
                self._ip_manager.ensure_present(target.ip, target.cidr, target.device)
                target.enabled = True
            stored[position] = target
            self._storage.save_entries(stored)
            return target

    def delete_entry(self, entry_id: str) -> None:
        """Remove a disabled, unused entry from storage.

        Raises:
            NotFoundError: If ``entry_id`` is not stored.
            ConflictError: If the entry is enabled or used by containers.
            DependencyError: If the Docker usage check fails.
        """
        with self._lock:
            stored = self._storage.list_entries()
            _, doomed = _find_entry(stored, entry_id)
            if doomed.enabled:
                raise ConflictError("Disable entry before deleting")
            self._assert_not_used(doomed)
            kept = [item for item in stored if item.id != entry_id]
            self._storage.save_entries(kept)

    def reconcile_enabled_entries(self) -> list[str]:
        """Call ``ensure_present`` for every entry flagged as enabled.

        Returns:
            One message per entry whose ``ensure_present`` raised
            ``CommandError``; an empty list when all calls succeeded.
        """
        failures: list[str] = []
        with self._lock:
            stored = self._storage.list_entries()
            any_applied = False
            for item in stored:
                if item.enabled:
                    try:
                        self._ip_manager.ensure_present(item.ip, item.cidr, item.device)
                    except CommandError as exc:
                        failures.append(
                            f"{item.name} ({item.ip}/{item.cidr} {item.device}): {exc}"
                        )
                    else:
                        any_applied = True
            # NOTE(review): the loop above does not modify any entry, so this
            # writes the list back as it was loaded - confirm it is still needed.
            if any_applied:
                self._storage.save_entries(stored)
        return failures

    def _assert_not_used(self, entry: IpEntry) -> None:
        """Raise unless the usage resolver reports no containers for ``entry.ip``.

        Raises:
            DependencyError: If the usage lookup raises ``DockerApiError``.
            ConflictError: If at least one container is reported for the IP.
        """
        try:
            usage_by_ip = self._usage_resolver.resolve_ip_usage({entry.ip})
        except DockerApiError as exc:
            raise DependencyError(f"Docker usage check failed: {exc}") from exc
        container_names = sorted(usage_by_ip.get(entry.ip, set()))
        if not container_names:
            return
        raise ConflictError(
            "Entry is currently used by container port bindings: " + ", ".join(container_names)
        )

    def _resolve_usage(self, entries: list[IpEntry]) -> tuple[dict[str, set[str]], bool, str | None]:
        """Look up usage for all entry IPs in one resolver call.

        Returns:
            ``(usage_by_ip, usage_known, usage_error)``. With no entries the
            resolver is not called. When the lookup raises ``DockerApiError``
            every IP maps to an empty set, ``usage_known`` is False and
            ``usage_error`` carries the error text.
        """
        wanted_ips = {item.ip for item in entries}
        if not wanted_ips:
            return {}, True, None
        try:
            usage_by_ip = self._usage_resolver.resolve_ip_usage(wanted_ips)
        except DockerApiError as exc:
            return {address: set() for address in wanted_ips}, False, str(exc)
        else:
            return usage_by_ip, True, None

    def _assert_unique_binding(
        self,
        entries: list[IpEntry],
        ip: str,
        cidr: int,
        device: str,
        ignore_entry_id: str | None = None,
    ) -> None:
        """Raise ``ConflictError`` if another entry has the same ip/cidr/device.

        ``ignore_entry_id`` excludes one entry from the comparison, so an
        entry being updated does not clash with itself.
        """
        wanted = (ip, cidr, device)
        clash = any(
            (other.ip, other.cidr, other.device) == wanted
            for other in entries
            if not (ignore_entry_id and other.id == ignore_entry_id)
        )
        if clash:
            raise ConflictError("Entry with same ip/cidr/device already exists")

    def _assert_device_exists(self, device: str) -> None:
        """Raise ``ValidationError`` if ``device`` is not a listed interface."""
        if device not in self.list_interfaces():
            raise ValidationError(f"Unknown network device: {device}")
def _find_entry(entries: list[IpEntry], entry_id: str) -> tuple[int, IpEntry]:
for index, entry in enumerate(entries):
if entry.id == entry_id:
return index, entry
raise NotFoundError(f"Entry not found: {entry_id}")
def _parse_payload(payload: dict) -> dict:
name = str(payload.get("name", "")).strip()
ip = str(payload.get("ip", "")).strip()
device = str(payload.get("device", "")).strip()
if not name:
raise ValidationError("Field 'name' is required")
if len(name) > 96:
raise ValidationError("Field 'name' is too long (max 96)")
if not ip:
raise ValidationError("Field 'ip' is required")
try:
ip_obj = IPv4Address(ip)
except ValueError as exc:
raise ValidationError(f"Invalid IPv4 address: {ip}") from exc
if not device:
raise ValidationError("Field 'device' is required")
if any(ch.isspace() for ch in device):
raise ValidationError("Field 'device' cannot contain whitespace")
raw_cidr = payload.get("cidr")
if raw_cidr is None:
raise ValidationError("Field 'cidr' is required")
try:
cidr = int(raw_cidr)
except (TypeError, ValueError) as exc:
raise ValidationError("Field 'cidr' must be an integer") from exc
if cidr < 0 or cidr > 32:
raise ValidationError("Field 'cidr' must be in range 0..32")
return {
"name": name,
"ip": str(ip_obj),
"cidr": cidr,
"device": device,
}