from __future__ import annotations from dataclasses import dataclass from ipaddress import IPv4Address import threading from typing import Callable from uuid import uuid4 from app.docker_api import DockerApiError, DockerUsageResolver from app.dns_sync import DnsProvider, DnsSyncError, to_fqdn from app.interfaces import list_host_interfaces from app.ip_commands import CommandError, IpAddressManager from app.models import EntryView, IpEntry from app.storage import EntryStorage class ValidationError(RuntimeError): pass class NotFoundError(RuntimeError): pass class ConflictError(RuntimeError): pass class DependencyError(RuntimeError): pass @dataclass class EntryListResponse: items: list[EntryView] usage_known: bool usage_error: str | None def to_dict(self) -> dict: return { "items": [item.to_dict() for item in self.items], "usage_known": self.usage_known, "usage_error": self.usage_error, } class EntryService: def __init__( self, storage: EntryStorage, usage_resolver: DockerUsageResolver, ip_manager: IpAddressManager, interface_provider: Callable[[], list[str]] = list_host_interfaces, dns_provider: DnsProvider | None = None, dns_base_domain: str = "", dns_ttl_seconds: int = 120, ): self._storage = storage self._usage_resolver = usage_resolver self._ip_manager = ip_manager self._interface_provider = interface_provider self._dns_provider = dns_provider self._dns_base_domain = dns_base_domain self._dns_ttl_seconds = dns_ttl_seconds self._lock = threading.Lock() self._dns_errors_by_id: dict[str, str] = {} def list_interfaces(self) -> list[str]: interfaces = self._interface_provider() if not interfaces: raise DependencyError("No network interfaces discovered on host") return interfaces def list_entries(self) -> EntryListResponse: entries = self._storage.list_entries() usage_map, usage_known, usage_error = self._resolve_usage(entries) views: list[EntryView] = [] for entry in entries: containers = sorted(usage_map.get(entry.ip, set())) used = bool(containers) if usage_known else False effective_enabled = False try: effective_enabled = self._ip_manager.is_present(entry.ip, entry.cidr, entry.device) except CommandError: effective_enabled = False views.append( EntryView( id=entry.id, name=entry.name, ip=entry.ip, cidr=entry.cidr, device=entry.device, enabled=entry.enabled, effective_enabled=effective_enabled, used=used, containers=containers, usage_known=usage_known, dns_desired=bool(self._dns_provider) and usage_known and used and entry.enabled, dns_last_error=self._dns_errors_by_id.get(entry.id), ) ) return EntryListResponse(items=views, usage_known=usage_known, usage_error=usage_error) def create_entry(self, payload: dict) -> IpEntry: parsed = _parse_payload(payload) with self._lock: entries = self._storage.list_entries() self._assert_device_exists(parsed["device"]) self._assert_unique_binding(entries, ip=parsed["ip"], cidr=parsed["cidr"], device=parsed["device"]) self._assert_unique_name(entries, name=parsed["name"]) created = IpEntry( id=uuid4().hex, name=parsed["name"], ip=parsed["ip"], cidr=parsed["cidr"], device=parsed["device"], enabled=False, ) entries.append(created) self._storage.save_entries(entries) return created def update_entry(self, entry_id: str, payload: dict) -> IpEntry: parsed = _parse_payload(payload) with self._lock: entries = self._storage.list_entries() index, current = _find_entry(entries, entry_id) if current.enabled: raise ConflictError("Disable entry before updating IP/cidr/device") self._assert_device_exists(parsed["device"]) self._assert_unique_binding( entries, ip=parsed["ip"], cidr=parsed["cidr"], device=parsed["device"], ignore_entry_id=entry_id, ) self._assert_unique_name(entries, name=parsed["name"], ignore_entry_id=entry_id) updated = IpEntry( id=current.id, name=parsed["name"], ip=parsed["ip"], cidr=parsed["cidr"], device=parsed["device"], enabled=False, ) entries[index] = updated self._storage.save_entries(entries) return updated def set_enabled(self, entry_id: str, enabled: bool) -> IpEntry: with self._lock: entries = self._storage.list_entries() index, entry = _find_entry(entries, entry_id) previous_enabled = entry.enabled if enabled: self._ip_manager.ensure_present(entry.ip, entry.cidr, entry.device) entry.enabled = True else: self._assert_not_used(entry) self._ip_manager.ensure_absent(entry.ip, entry.cidr, entry.device) entry.enabled = False try: self._sync_dns_for_entry_locked(entry, strict=True) except Exception: # noqa: BLE001 self._rollback_enable_change(entry, previous_enabled) raise entries[index] = entry self._storage.save_entries(entries) return entry def delete_entry(self, entry_id: str) -> None: with self._lock: entries = self._storage.list_entries() _, entry = _find_entry(entries, entry_id) if entry.enabled: raise ConflictError("Disable entry before deleting") self._assert_not_used(entry) self._delete_dns_for_entry_locked(entry, strict=True) remaining = [candidate for candidate in entries if candidate.id != entry_id] self._storage.save_entries(remaining) self._dns_errors_by_id.pop(entry.id, None) def reconcile_enabled_entries(self) -> list[str]: errors: list[str] = [] with self._lock: entries = self._storage.list_entries() changed = False for entry in entries: if not entry.enabled: continue try: self._ip_manager.ensure_present(entry.ip, entry.cidr, entry.device) changed = True except CommandError as exc: errors.append(f"{entry.name} ({entry.ip}/{entry.cidr} {entry.device}): {exc}") if changed: self._storage.save_entries(entries) return errors def reconcile_dns_records(self) -> list[str]: if not self._dns_provider: return [] errors: list[str] = [] with self._lock: entries = self._storage.list_entries() usage_map, usage_known, usage_error = self._resolve_usage(entries) if not usage_known: msg = f"Docker usage check failed for DNS reconcile: {usage_error or 'unknown error'}" for entry in entries: self._dns_errors_by_id[entry.id] = msg return [msg] for entry in entries: used = bool(usage_map.get(entry.ip, set())) desired = entry.enabled and used try: self._apply_dns_state_locked(entry, desired) self._dns_errors_by_id.pop(entry.id, None) except (DnsSyncError, DependencyError, ConflictError) as exc: self._dns_errors_by_id[entry.id] = str(exc) errors.append(f"{entry.name}: {exc}") return errors def _rollback_enable_change(self, entry: IpEntry, previous_enabled: bool) -> None: try: if previous_enabled: self._ip_manager.ensure_present(entry.ip, entry.cidr, entry.device) entry.enabled = True else: self._ip_manager.ensure_absent(entry.ip, entry.cidr, entry.device) entry.enabled = False except CommandError: pass def _sync_dns_for_entry_locked(self, entry: IpEntry, strict: bool) -> None: if not self._dns_provider: return usage_map, usage_known, usage_error = self._resolve_usage([entry]) if not usage_known: msg = f"Docker usage check failed: {usage_error or 'unknown error'}" self._dns_errors_by_id[entry.id] = msg if strict: raise DependencyError(msg) return desired = entry.enabled and bool(usage_map.get(entry.ip, set())) self._apply_dns_state_locked(entry, desired) self._dns_errors_by_id.pop(entry.id, None) def _delete_dns_for_entry_locked(self, entry: IpEntry, strict: bool) -> None: if not self._dns_provider: return try: fqdn = to_fqdn(entry.name, self._dns_base_domain) self._dns_provider.delete_a_record(fqdn) self._dns_errors_by_id.pop(entry.id, None) except DnsSyncError as exc: self._dns_errors_by_id[entry.id] = str(exc) if strict: raise ConflictError(f"DNS delete failed for {entry.name}: {exc}") from exc def _apply_dns_state_locked(self, entry: IpEntry, desired: bool) -> None: if not self._dns_provider: return try: fqdn = to_fqdn(entry.name, self._dns_base_domain) if desired: self._dns_provider.upsert_a_record(fqdn, entry.ip, self._dns_ttl_seconds) else: self._dns_provider.delete_a_record(fqdn) except DnsSyncError as exc: raise ConflictError(f"DNS sync failed for {entry.name}: {exc}") from exc def _assert_not_used(self, entry: IpEntry) -> None: try: usage = self._usage_resolver.resolve_ip_usage({entry.ip}) except DockerApiError as exc: raise DependencyError(f"Docker usage check failed: {exc}") from exc containers = sorted(usage.get(entry.ip, set())) if containers: raise ConflictError( "Entry is currently used by container port bindings: " + ", ".join(containers) ) def _resolve_usage(self, entries: list[IpEntry]) -> tuple[dict[str, set[str]], bool, str | None]: ips = {entry.ip for entry in entries} if not ips: return {}, True, None try: usage = self._usage_resolver.resolve_ip_usage(ips) return usage, True, None except DockerApiError as exc: return {ip: set() for ip in ips}, False, str(exc) def _assert_unique_binding( self, entries: list[IpEntry], ip: str, cidr: int, device: str, ignore_entry_id: str | None = None, ) -> None: for entry in entries: if ignore_entry_id and entry.id == ignore_entry_id: continue if entry.ip == ip and entry.cidr == cidr and entry.device == device: raise ConflictError("Entry with same ip/cidr/device already exists") def _assert_unique_name(self, entries: list[IpEntry], name: str, ignore_entry_id: str | None = None) -> None: target = name.strip().lower() for entry in entries: if ignore_entry_id and entry.id == ignore_entry_id: continue if entry.name.strip().lower() == target: raise ConflictError("Entry name must be unique") def _assert_device_exists(self, device: str) -> None: interfaces = self.list_interfaces() if device not in interfaces: raise ValidationError(f"Unknown network device: {device}") def _find_entry(entries: list[IpEntry], entry_id: str) -> tuple[int, IpEntry]: for index, entry in enumerate(entries): if entry.id == entry_id: return index, entry raise NotFoundError(f"Entry not found: {entry_id}") def _parse_payload(payload: dict) -> dict: name = str(payload.get("name", "")).strip() ip = str(payload.get("ip", "")).strip() device = str(payload.get("device", "")).strip() if not name: raise ValidationError("Field 'name' is required") if len(name) > 96: raise ValidationError("Field 'name' is too long (max 96)") if not ip: raise ValidationError("Field 'ip' is required") try: ip_obj = IPv4Address(ip) except ValueError as exc: raise ValidationError(f"Invalid IPv4 address: {ip}") from exc if not device: raise ValidationError("Field 'device' is required") if any(ch.isspace() for ch in device): raise ValidationError("Field 'device' cannot contain whitespace") cidr_raw = payload.get("cidr") if cidr_raw is None: raise ValidationError("Field 'cidr' is required") try: cidr = int(cidr_raw) except (TypeError, ValueError) as exc: raise ValidationError("Field 'cidr' must be an integer") from exc if cidr < 0 or cidr > 32: raise ValidationError("Field 'cidr' must be between 0 and 32") return { "name": name, "ip": str(ip_obj), "cidr": cidr, "device": device, }