from __future__ import annotations from dataclasses import dataclass from ipaddress import IPv4Address import threading from typing import Callable from uuid import uuid4 from app.docker_api import DockerApiError, DockerUsageResolver from app.interfaces import list_host_interfaces from app.ip_commands import CommandError, IpAddressManager from app.models import EntryView, IpEntry from app.storage import EntryStorage class ValidationError(RuntimeError): pass class NotFoundError(RuntimeError): pass class ConflictError(RuntimeError): pass class DependencyError(RuntimeError): pass @dataclass class EntryListResponse: items: list[EntryView] usage_known: bool usage_error: str | None def to_dict(self) -> dict: return { "items": [item.to_dict() for item in self.items], "usage_known": self.usage_known, "usage_error": self.usage_error, } class EntryService: def __init__( self, storage: EntryStorage, usage_resolver: DockerUsageResolver, ip_manager: IpAddressManager, interface_provider: Callable[[], list[str]] = list_host_interfaces, ): self._storage = storage self._usage_resolver = usage_resolver self._ip_manager = ip_manager self._interface_provider = interface_provider self._lock = threading.Lock() def list_interfaces(self) -> list[str]: interfaces = self._interface_provider() if not interfaces: raise DependencyError("No network interfaces discovered on host") return interfaces def list_entries(self) -> EntryListResponse: entries = self._storage.list_entries() usage_map, usage_known, usage_error = self._resolve_usage(entries) views: list[EntryView] = [] for entry in entries: containers = sorted(usage_map.get(entry.ip, set())) used = bool(containers) if usage_known else False effective_enabled = False try: effective_enabled = self._ip_manager.is_present(entry.ip, entry.cidr, entry.device) except CommandError: effective_enabled = False views.append( EntryView( id=entry.id, name=entry.name, ip=entry.ip, cidr=entry.cidr, device=entry.device, enabled=entry.enabled, effective_enabled=effective_enabled, used=used, containers=containers, usage_known=usage_known, ) ) return EntryListResponse(items=views, usage_known=usage_known, usage_error=usage_error) def create_entry(self, payload: dict) -> IpEntry: parsed = _parse_payload(payload) with self._lock: entries = self._storage.list_entries() self._assert_device_exists(parsed["device"]) self._assert_unique_binding(entries, ip=parsed["ip"], cidr=parsed["cidr"], device=parsed["device"]) created = IpEntry( id=uuid4().hex, name=parsed["name"], ip=parsed["ip"], cidr=parsed["cidr"], device=parsed["device"], enabled=False, ) entries.append(created) self._storage.save_entries(entries) return created def update_entry(self, entry_id: str, payload: dict) -> IpEntry: parsed = _parse_payload(payload) with self._lock: entries = self._storage.list_entries() index, current = _find_entry(entries, entry_id) if current.enabled: raise ConflictError("Disable entry before updating IP/cidr/device") self._assert_device_exists(parsed["device"]) self._assert_unique_binding( entries, ip=parsed["ip"], cidr=parsed["cidr"], device=parsed["device"], ignore_entry_id=entry_id, ) updated = IpEntry( id=current.id, name=parsed["name"], ip=parsed["ip"], cidr=parsed["cidr"], device=parsed["device"], enabled=False, ) entries[index] = updated self._storage.save_entries(entries) return updated def set_enabled(self, entry_id: str, enabled: bool) -> IpEntry: with self._lock: entries = self._storage.list_entries() index, entry = _find_entry(entries, entry_id) if enabled: self._ip_manager.ensure_present(entry.ip, entry.cidr, entry.device) entry.enabled = True else: self._assert_not_used(entry) self._ip_manager.ensure_absent(entry.ip, entry.cidr, entry.device) entry.enabled = False entries[index] = entry self._storage.save_entries(entries) return entry def delete_entry(self, entry_id: str) -> None: with self._lock: entries = self._storage.list_entries() _, entry = _find_entry(entries, entry_id) if entry.enabled: raise ConflictError("Disable entry before deleting") self._assert_not_used(entry) remaining = [candidate for candidate in entries if candidate.id != entry_id] self._storage.save_entries(remaining) def reconcile_enabled_entries(self) -> list[str]: errors: list[str] = [] with self._lock: entries = self._storage.list_entries() changed = False for entry in entries: if not entry.enabled: continue try: self._ip_manager.ensure_present(entry.ip, entry.cidr, entry.device) changed = True except CommandError as exc: errors.append(f"{entry.name} ({entry.ip}/{entry.cidr} {entry.device}): {exc}") if changed: self._storage.save_entries(entries) return errors def _assert_not_used(self, entry: IpEntry) -> None: try: usage = self._usage_resolver.resolve_ip_usage({entry.ip}) except DockerApiError as exc: raise DependencyError(f"Docker usage check failed: {exc}") from exc containers = sorted(usage.get(entry.ip, set())) if containers: raise ConflictError( "Entry is currently used by container port bindings: " + ", ".join(containers) ) def _resolve_usage(self, entries: list[IpEntry]) -> tuple[dict[str, set[str]], bool, str | None]: ips = {entry.ip for entry in entries} if not ips: return {}, True, None try: usage = self._usage_resolver.resolve_ip_usage(ips) return usage, True, None except DockerApiError as exc: return {ip: set() for ip in ips}, False, str(exc) def _assert_unique_binding( self, entries: list[IpEntry], ip: str, cidr: int, device: str, ignore_entry_id: str | None = None, ) -> None: for entry in entries: if ignore_entry_id and entry.id == ignore_entry_id: continue if entry.ip == ip and entry.cidr == cidr and entry.device == device: raise ConflictError("Entry with same ip/cidr/device already exists") def _assert_device_exists(self, device: str) -> None: interfaces = self.list_interfaces() if device not in interfaces: raise ValidationError(f"Unknown network device: {device}") def _find_entry(entries: list[IpEntry], entry_id: str) -> tuple[int, IpEntry]: for index, entry in enumerate(entries): if entry.id == entry_id: return index, entry raise NotFoundError(f"Entry not found: {entry_id}") def _parse_payload(payload: dict) -> dict: name = str(payload.get("name", "")).strip() ip = str(payload.get("ip", "")).strip() device = str(payload.get("device", "")).strip() if not name: raise ValidationError("Field 'name' is required") if len(name) > 96: raise ValidationError("Field 'name' is too long (max 96)") if not ip: raise ValidationError("Field 'ip' is required") try: ip_obj = IPv4Address(ip) except ValueError as exc: raise ValidationError(f"Invalid IPv4 address: {ip}") from exc if not device: raise ValidationError("Field 'device' is required") if any(ch.isspace() for ch in device): raise ValidationError("Field 'device' cannot contain whitespace") raw_cidr = payload.get("cidr") if raw_cidr is None: raise ValidationError("Field 'cidr' is required") try: cidr = int(raw_cidr) except (TypeError, ValueError) as exc: raise ValidationError("Field 'cidr' must be an integer") from exc if cidr < 0 or cidr > 32: raise ValidationError("Field 'cidr' must be in range 0..32") return { "name": name, "ip": str(ip_obj), "cidr": cidr, "device": device, }