277 lines
9.2 KiB
Python
277 lines
9.2 KiB
Python
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass
|
|
from ipaddress import IPv4Address
|
|
import threading
|
|
from typing import Callable
|
|
from uuid import uuid4
|
|
|
|
from app.docker_api import DockerApiError, DockerUsageResolver
|
|
from app.interfaces import list_host_interfaces
|
|
from app.ip_commands import CommandError, IpAddressManager
|
|
from app.models import EntryView, IpEntry
|
|
from app.storage import EntryStorage
|
|
|
|
|
|
class ValidationError(RuntimeError):
    """Raised when a payload field is missing or malformed, or names an unknown network device."""
|
|
|
|
|
|
class NotFoundError(RuntimeError):
    """Raised when no stored entry matches the requested id."""
|
|
|
|
|
|
class ConflictError(RuntimeError):
    """Raised when an operation clashes with current state.

    Used for an entry that is still enabled, a duplicate ip/cidr/device
    binding, or an IP that containers are currently bound to.
    """
|
|
|
|
|
|
class DependencyError(RuntimeError):
    """Raised when something the service relies on is unavailable.

    Used when no host network interfaces are discovered or when the Docker
    usage check fails.
    """
|
|
|
|
|
|
@dataclass
class EntryListResponse:
    """Result of listing entries: the entry views plus the usage-lookup status."""

    # One view per listed entry.
    items: list[EntryView]
    # Whether the container-usage lookup succeeded for this listing.
    usage_known: bool
    # Error text from a failed usage lookup; None when there was no failure.
    usage_error: str | None

    def to_dict(self) -> dict:
        """Return a plain-dict form, converting each item through its own ``to_dict()``."""
        serialized_items = [view.to_dict() for view in self.items]
        return dict(
            items=serialized_items,
            usage_known=self.usage_known,
            usage_error=self.usage_error,
        )
|
|
|
|
|
|
class EntryService:
    """Keeps stored IP entries, host address state and Docker usage in step.

    Every operation that writes to storage (create, update, enable/disable,
    delete, reconcile) runs while holding one ``threading.Lock``.
    ``list_entries`` and ``list_interfaces`` do not take the lock.
    """

    def __init__(
        self,
        storage: EntryStorage,
        usage_resolver: DockerUsageResolver,
        ip_manager: IpAddressManager,
        interface_provider: Callable[[], list[str]] = list_host_interfaces,
    ):
        """Store the collaborators and create the lock guarding storage writes.

        Args:
            storage: Source and sink for the persisted entry list.
            usage_resolver: Resolves which containers use a set of IPs.
            ip_manager: Checks, adds and removes addresses on a device.
            interface_provider: Zero-argument callable returning the host's
                interface names; defaults to ``list_host_interfaces``.
        """
        self._lock = threading.Lock()
        self._interface_provider = interface_provider
        self._ip_manager = ip_manager
        self._usage_resolver = usage_resolver
        self._storage = storage

    def list_interfaces(self) -> list[str]:
        """Return the interface names reported by the interface provider.

        Raises:
            DependencyError: if the provider returns nothing.
        """
        discovered = self._interface_provider()
        if discovered:
            return discovered
        raise DependencyError("No network interfaces discovered on host")

    def list_entries(self) -> EntryListResponse:
        """Build a view of every stored entry, annotated with live state.

        Usage is resolved once for all entries before any per-entry presence
        probe runs. A failed usage lookup does not fail the listing; it is
        reported through ``usage_known`` / ``usage_error`` instead.
        """
        stored = self._storage.list_entries()
        usage_by_ip, usage_known, usage_error = self._resolve_usage(stored)
        rendered = [self._build_view(entry, usage_by_ip, usage_known) for entry in stored]
        return EntryListResponse(items=rendered, usage_known=usage_known, usage_error=usage_error)

    def _build_view(self, entry: IpEntry, usage_by_ip: dict[str, set[str]], usage_known: bool) -> EntryView:
        """Assemble the ``EntryView`` for one entry from usage data and a presence probe."""
        container_names = sorted(usage_by_ip.get(entry.ip, set()))
        # Without a successful usage lookup nothing is reported as used.
        in_use = usage_known and bool(container_names)
        try:
            present = self._ip_manager.is_present(entry.ip, entry.cidr, entry.device)
        except CommandError:
            # Best effort: a failing probe is shown as "not present".
            present = False
        return EntryView(
            id=entry.id,
            name=entry.name,
            ip=entry.ip,
            cidr=entry.cidr,
            device=entry.device,
            enabled=entry.enabled,
            effective_enabled=present,
            used=in_use,
            containers=container_names,
            usage_known=usage_known,
        )

    def create_entry(self, payload: dict) -> IpEntry:
        """Validate *payload* and persist a new, disabled entry.

        Raises:
            ValidationError: for a bad payload or an unknown device.
            DependencyError: if no host interfaces can be discovered.
            ConflictError: if the same ip/cidr/device is already stored.
        """
        fields = _parse_payload(payload)
        ip, cidr, device = fields["ip"], fields["cidr"], fields["device"]
        with self._lock:
            entries = self._storage.list_entries()
            self._assert_device_exists(device)
            self._assert_unique_binding(entries, ip=ip, cidr=cidr, device=device)

            new_entry = IpEntry(
                id=uuid4().hex,
                name=fields["name"],
                ip=ip,
                cidr=cidr,
                device=device,
                enabled=False,
            )
            entries.append(new_entry)
            self._storage.save_entries(entries)
            return new_entry

    def update_entry(self, entry_id: str, payload: dict) -> IpEntry:
        """Replace the fields of a disabled entry with the validated *payload*.

        Raises:
            ValidationError: for a bad payload or an unknown device.
            NotFoundError: if *entry_id* is not stored.
            ConflictError: if the entry is enabled, or the new
                ip/cidr/device duplicates another entry.
            DependencyError: if no host interfaces can be discovered.
        """
        fields = _parse_payload(payload)
        ip, cidr, device = fields["ip"], fields["cidr"], fields["device"]
        with self._lock:
            entries = self._storage.list_entries()
            position, existing = _find_entry(entries, entry_id)
            if existing.enabled:
                raise ConflictError("Disable entry before updating IP/cidr/device")

            self._assert_device_exists(device)
            # The entry being edited must not collide with itself.
            self._assert_unique_binding(entries, ip=ip, cidr=cidr, device=device, ignore_entry_id=entry_id)
            replacement = IpEntry(
                id=existing.id,
                name=fields["name"],
                ip=ip,
                cidr=cidr,
                device=device,
                enabled=False,
            )
            entries[position] = replacement
            self._storage.save_entries(entries)
            return replacement

    def set_enabled(self, entry_id: str, enabled: bool) -> IpEntry:
        """Add or remove the entry's address on its device and persist the flag.

        Disabling is refused while containers are bound to the IP. The stored
        flag is only written after the address operation has returned.

        Raises:
            NotFoundError: if *entry_id* is not stored.
            ConflictError: when disabling an IP that containers use.
            DependencyError: when disabling and the Docker check fails.
        """
        with self._lock:
            entries = self._storage.list_entries()
            position, target = _find_entry(entries, entry_id)

            if not enabled:
                self._assert_not_used(target)
                self._ip_manager.ensure_absent(target.ip, target.cidr, target.device)
                target.enabled = False
            else:
                self._ip_manager.ensure_present(target.ip, target.cidr, target.device)
                target.enabled = True

            entries[position] = target
            self._storage.save_entries(entries)
            return target

    def delete_entry(self, entry_id: str) -> None:
        """Remove a disabled, unused entry from storage.

        Raises:
            NotFoundError: if *entry_id* is not stored.
            ConflictError: if the entry is enabled or its IP is in use.
            DependencyError: if the Docker usage check fails.
        """
        with self._lock:
            entries = self._storage.list_entries()
            _, doomed = _find_entry(entries, entry_id)
            if doomed.enabled:
                raise ConflictError("Disable entry before deleting")

            self._assert_not_used(doomed)
            survivors = [kept for kept in entries if kept.id != entry_id]
            self._storage.save_entries(survivors)

    def reconcile_enabled_entries(self) -> list[str]:
        """Re-apply the address of every enabled entry and collect failures.

        Returns:
            One message per entry whose address could not be applied; an empty
            list when every enabled entry succeeded.
        """
        failures: list[str] = []
        with self._lock:
            entries = self._storage.list_entries()
            any_applied = False
            for entry in entries:
                if not entry.enabled:
                    continue
                try:
                    self._ip_manager.ensure_present(entry.ip, entry.cidr, entry.device)
                except CommandError as exc:
                    failures.append(f"{entry.name} ({entry.ip}/{entry.cidr} {entry.device}): {exc}")
                else:
                    any_applied = True
            # NOTE(review): no entry is modified in the loop above, so this
            # writes back what was just read. Kept as-is; confirm the write
            # is wanted.
            if any_applied:
                self._storage.save_entries(entries)
        return failures

    def _assert_not_used(self, entry: IpEntry) -> None:
        """Raise unless the Docker lookup reports no containers on the entry's IP.

        Raises:
            DependencyError: if the lookup itself fails.
            ConflictError: if at least one container uses the IP.
        """
        try:
            usage = self._usage_resolver.resolve_ip_usage({entry.ip})
        except DockerApiError as exc:
            raise DependencyError(f"Docker usage check failed: {exc}") from exc

        bound = sorted(usage.get(entry.ip, set()))
        if not bound:
            return
        raise ConflictError(
            "Entry is currently used by container port bindings: " + ", ".join(bound)
        )

    def _resolve_usage(self, entries: list[IpEntry]) -> tuple[dict[str, set[str]], bool, str | None]:
        """Look up container usage for the IPs of *entries* without raising.

        Returns:
            ``(usage_by_ip, usage_known, usage_error)``. On a Docker failure
            every IP maps to an empty set, ``usage_known`` is False and
            ``usage_error`` carries the exception text.
        """
        distinct_ips = {entry.ip for entry in entries}
        if not distinct_ips:
            # Nothing to ask Docker about.
            return {}, True, None

        try:
            resolved = self._usage_resolver.resolve_ip_usage(distinct_ips)
        except DockerApiError as exc:
            return {address: set() for address in distinct_ips}, False, str(exc)
        else:
            return resolved, True, None

    def _assert_unique_binding(
        self,
        entries: list[IpEntry],
        ip: str,
        cidr: int,
        device: str,
        ignore_entry_id: str | None = None,
    ) -> None:
        """Raise ``ConflictError`` if another entry has the same ip/cidr/device.

        The entry whose id equals *ignore_entry_id* (when given) is skipped.
        """
        wanted = (ip, cidr, device)
        for other in entries:
            if ignore_entry_id and other.id == ignore_entry_id:
                continue
            if (other.ip, other.cidr, other.device) == wanted:
                raise ConflictError("Entry with same ip/cidr/device already exists")

    def _assert_device_exists(self, device: str) -> None:
        """Raise ``ValidationError`` unless *device* is a known host interface.

        ``DependencyError`` from ``list_interfaces`` propagates unchanged.
        """
        if device in self.list_interfaces():
            return
        raise ValidationError(f"Unknown network device: {device}")
|
|
|
|
|
|
def _find_entry(entries: list[IpEntry], entry_id: str) -> tuple[int, IpEntry]:
|
|
for index, entry in enumerate(entries):
|
|
if entry.id == entry_id:
|
|
return index, entry
|
|
raise NotFoundError(f"Entry not found: {entry_id}")
|
|
|
|
|
|
def _parse_payload(payload: dict) -> dict:
|
|
name = str(payload.get("name", "")).strip()
|
|
ip = str(payload.get("ip", "")).strip()
|
|
device = str(payload.get("device", "")).strip()
|
|
|
|
if not name:
|
|
raise ValidationError("Field 'name' is required")
|
|
if len(name) > 96:
|
|
raise ValidationError("Field 'name' is too long (max 96)")
|
|
|
|
if not ip:
|
|
raise ValidationError("Field 'ip' is required")
|
|
try:
|
|
ip_obj = IPv4Address(ip)
|
|
except ValueError as exc:
|
|
raise ValidationError(f"Invalid IPv4 address: {ip}") from exc
|
|
|
|
if not device:
|
|
raise ValidationError("Field 'device' is required")
|
|
if any(ch.isspace() for ch in device):
|
|
raise ValidationError("Field 'device' cannot contain whitespace")
|
|
|
|
raw_cidr = payload.get("cidr")
|
|
if raw_cidr is None:
|
|
raise ValidationError("Field 'cidr' is required")
|
|
try:
|
|
cidr = int(raw_cidr)
|
|
except (TypeError, ValueError) as exc:
|
|
raise ValidationError("Field 'cidr' must be an integer") from exc
|
|
if cidr < 0 or cidr > 32:
|
|
raise ValidationError("Field 'cidr' must be in range 0..32")
|
|
|
|
return {
|
|
"name": name,
|
|
"ip": str(ip_obj),
|
|
"cidr": cidr,
|
|
"device": device,
|
|
}
|