#!/usr/bin/env bash
#
# Map Unraid container image references onto apps from a Zima inventory.
#
# Inputs:
#   --unraid-images  text file, one image reference per line; blank lines and
#                    lines starting with '#' are skipped
#   --inventory      YAML file with a top-level `apps` list; each app may carry
#                    `app_id`, `title`, `repo_url` and an `images` list
# Output:
#   --out            YAML report: per image a match_status of
#                    found / missing / ambiguous plus the matched apps
#
# Exit codes: 0 on success, 2 on bad arguments or missing input files,
# 1 when the embedded Python step rejects the inventory.
#
# Requires python3 with the PyYAML package.
set -euo pipefail

UNRAID_IMAGES=""
INVENTORY_PATH=""
OUT_PATH="artifacts/unraid-to-zima-map.yaml"

# Print the help text to stdout (callers redirect to stderr on error paths).
usage() {
  cat <<USAGE
Usage: ${0##*/} --unraid-images <path> --inventory <path> [--out <path>]

Options:
  --unraid-images <path>  Text file with one image reference per line (required)
  --inventory <path>      Inventory YAML from discover-zima-repos-and-apps.sh (required)
  --out <path>            Output YAML (default: artifacts/unraid-to-zima-map.yaml)
  -h, --help              Show this help
USAGE
}

# Report an argument error ($1 = message), show usage on stderr, exit 2.
die_usage() {
  echo "ERROR: $1" >&2
  usage >&2
  exit 2
}

# Ensure option $1 is followed by a value; $2 is the caller's remaining
# argument count. Without this, `shift 2` fails under `set -e` and the script
# would exit with no diagnostic.
need_value() {
  if (( $2 < 2 )); then
    die_usage "$1 requires a value"
  fi
}

while [[ $# -gt 0 ]]; do
  case "$1" in
    --unraid-images)
      need_value "$1" "$#"
      UNRAID_IMAGES="$2"
      shift 2
      ;;
    --inventory)
      need_value "$1" "$#"
      INVENTORY_PATH="$2"
      shift 2
      ;;
    --out)
      need_value "$1" "$#"
      OUT_PATH="$2"
      shift 2
      ;;
    -h|--help)
      usage
      exit 0
      ;;
    *)
      die_usage "Unknown argument: $1"
      ;;
  esac
done

if [[ -z "$UNRAID_IMAGES" || -z "$INVENTORY_PATH" ]]; then
  die_usage "--unraid-images and --inventory are required"
fi

# An empty --out would only fail later inside Python with a traceback.
if [[ -z "$OUT_PATH" ]]; then
  die_usage "--out requires a non-empty path"
fi

if [[ ! -f "$UNRAID_IMAGES" ]]; then
  echo "ERROR: unraid image file not found: $UNRAID_IMAGES" >&2
  exit 2
fi

if [[ ! -f "$INVENTORY_PATH" ]]; then
  echo "ERROR: inventory file not found: $INVENTORY_PATH" >&2
  exit 2
fi

mkdir -p -- "$(dirname -- "$OUT_PATH")"

# The paths are handed to Python through the environment; the quoted heredoc
# delimiter keeps the shell from expanding anything inside the program text.
UNRAID_IMAGES="$UNRAID_IMAGES" \
INVENTORY_PATH="$INVENTORY_PATH" \
OUT_PATH="$OUT_PATH" \
python3 - <<'PY'
import datetime as dt
import os
import re
import sys

try:
    import yaml
except ImportError:
    print("ERROR: PyYAML is required (python3 module 'yaml' not found)", file=sys.stderr)
    sys.exit(1)

unraid_images_path = os.environ["UNRAID_IMAGES"]
inventory_path = os.environ["INVENTORY_PATH"]
out_path = os.environ["OUT_PATH"]

# Image basename -> app name it should also be looked up under.
ALIASES = {
    "arch-sonarr": "sonarr",
    "arch-radarr": "radarr",
    "arch-prowlarr": "prowlarr",
    "arch-overseerr": "overseerr",
    "arch-flaresolverr": "flaresolverr",
    "arch-qbittorrentvpn": "qbittorrent",
    "arch-plexpass": "plex",
    "open-webui": "openwebui",
    "open_webui": "openwebui",
    "act_runner": "act-runner",
    "socket-proxy": "socketproxy",
    "postgres": "postgresql",
    "postgresql": "postgres",
}

# Image basenames that only count as a match when the app's own id/title
# matches the image name as well.
GENERIC_BASE_IMAGES = {
    "redis",
    "postgres",
    "postgresql",
    "mariadb",
    "mysql",
    "mongo",
    "mongodb",
    "nginx",
    "memcached",
    "rabbitmq",
    "valkey",
}


def normalize_text(value: str) -> str:
    """Lower-case `value` and drop every character outside [a-z0-9]."""
    value = (value or "").strip().lower()
    return re.sub(r"[^a-z0-9]+", "", value)


def normalize_image_ref(ref: str) -> str:
    """Strip quotes, digest (@...) and tag (:...) from an image reference.

    The tag is only removed from the last path component, so a registry
    host:port prefix is left intact. The result is lower-cased; an empty or
    missing reference yields "".
    """
    value = (ref or "").strip().strip('"\'')
    if not value:
        return ""
    if "@" in value:
        value = value.split("@", 1)[0]
    if "/" in value:
        prefix, tail = value.rsplit("/", 1)
        if ":" in tail:
            tail = tail.split(":", 1)[0]
        value = f"{prefix}/{tail}"
    elif ":" in value:
        value = value.split(":", 1)[0]
    return value.lower()


def basename_image(normalized_ref: str) -> str:
    """Return the last '/'-separated component of a normalized reference."""
    if not normalized_ref:
        return ""
    return normalized_ref.rsplit("/", 1)[-1]


def candidate_names_from_unraid_image(image_ref: str):
    """Return the set of app names an image reference might correspond to.

    Built from the image basename, its underscore->hyphen variant, the name
    without a leading "arch-", and any ALIASES entry for either variant.
    """
    base = basename_image(normalize_image_ref(image_ref))
    candidates = set()
    if base:
        hyphenated = base.replace("_", "-")
        candidates.add(base)
        candidates.add(hyphenated)
        if base.startswith("arch-"):
            candidates.add(base[5:])
        if base in ALIASES:
            candidates.add(ALIASES[base])
        if hyphenated in ALIASES:
            candidates.add(ALIASES[hyphenated])
    return {c for c in candidates if c}


def app_name_matches(app, candidate_base: str) -> bool:
    """True when the app's id or title equals `candidate_base` or an alias.

    Aliases are followed in both directions (source -> target and
    target -> source); comparison is done on normalize_text() keys.
    """
    key = normalize_text(candidate_base)
    if not key:
        return False
    candidate_keys = {key}
    alias = ALIASES.get(candidate_base)
    if alias:
        candidate_keys.add(normalize_text(alias))
    for alias_source, alias_target in ALIASES.items():
        if alias_target == candidate_base:
            candidate_keys.add(normalize_text(alias_source))
    if app["id_key"] in candidate_keys:
        return True
    # An empty title key must never count as a match.
    return bool(app["title_key"]) and app["title_key"] in candidate_keys


def record_match(matched: dict, app: dict, reason: str) -> None:
    """Add `reason` to the entry for `app` in `matched`, creating it if needed.

    Entries are keyed by (repo_url, app_id) so the same app found through
    several strategies is reported once with all its reasons.
    """
    entry = matched.setdefault(
        (app["repo_url"], app["app_id"]),
        {
            "app_id": app["app_id"],
            "title": app["title"],
            "repo_url": app["repo_url"],
            "reasons": set(),
        },
    )
    entry["reasons"].add(reason)


with open(inventory_path, "r", encoding="utf-8") as handle:
    inventory = yaml.safe_load(handle) or {}

if not isinstance(inventory, dict):
    print("ERROR: inventory must be a mapping", file=sys.stderr)
    sys.exit(1)

apps = inventory.get("apps") or []
if not isinstance(apps, list):
    print("ERROR: inventory apps must be a list", file=sys.stderr)
    sys.exit(1)

# Flatten the inventory into records with pre-computed comparison keys.
# Entries that are not mappings or have no app_id are ignored.
app_records = []
for app in apps:
    if not isinstance(app, dict):
        continue
    app_id = str(app.get("app_id") or "").strip()
    if not app_id:
        continue
    title = str(app.get("title") or "").strip()
    repo_url = str(app.get("repo_url") or "").strip()
    images = app.get("images") if isinstance(app.get("images"), list) else []

    normalized_images = []
    seen_images = set()
    for image in images:
        if not isinstance(image, str):
            continue
        norm = normalize_image_ref(image)
        if not norm or norm in seen_images:
            continue
        seen_images.add(norm)
        normalized_images.append(norm)

    app_records.append(
        {
            "app_id": app_id,
            "title": title,
            "repo_url": repo_url,
            "images": normalized_images,
            "id_key": normalize_text(app_id),
            "title_key": normalize_text(title),
        }
    )

# Lookup tables: full normalized reference -> apps, and basename -> apps.
index_exact = {}
index_basename = {}
for app in app_records:
    for image in app["images"]:
        index_exact.setdefault(image, []).append(app)
        bname = basename_image(image)
        if bname:
            index_basename.setdefault(bname, []).append(app)

with open(unraid_images_path, "r", encoding="utf-8") as handle:
    unraid_images = [
        line.strip()
        for line in handle
        if line.strip() and not line.strip().startswith("#")
    ]

mapping = []
for raw_image in unraid_images:
    normalized = normalize_image_ref(raw_image)
    base = basename_image(normalized)
    matched = {}

    # Strong match: exact normalized image reference from app compose.
    # For generic bases (redis/postgres/etc), only accept exact matches when
    # the app name matches too.
    for app in index_exact.get(normalized, []):
        if base in GENERIC_BASE_IMAGES and not app_name_matches(app, base):
            continue
        record_match(matched, app, "image_exact")

    # Medium match: same image basename. A basename hit is accepted only when
    # the app name matches as well (for generic and non-generic images alike),
    # which avoids false positives from sidecar dependencies.
    if base:
        for app in index_basename.get(base, []):
            if not app_name_matches(app, base):
                continue
            record_match(matched, app, "image_basename")

    # Fallback match: inferred app name from image aliases.
    for candidate_name in candidate_names_from_unraid_image(raw_image):
        candidate_key = normalize_text(candidate_name)
        if not candidate_key:
            continue
        for app in app_records:
            if candidate_key == app["id_key"] or (
                app["title_key"] and candidate_key == app["title_key"]
            ):
                record_match(matched, app, "name_alias")

    # Deterministic output order: by repo_url, then app_id, case-insensitive.
    matched_apps = []
    for key in sorted(matched.keys(), key=lambda item: (item[0].lower(), item[1].lower())):
        info = matched[key]
        matched_apps.append(
            {
                "app_id": info["app_id"],
                "title": info["title"],
                "repo_url": info["repo_url"],
                "reasons": sorted(info["reasons"]),
            }
        )

    if not matched_apps:
        status = "missing"
    elif len(matched_apps) == 1:
        status = "found"
    else:
        status = "ambiguous"

    mapping.append(
        {
            "unraid_image": raw_image,
            "normalized_image": normalized,
            "match_status": status,
            "matched_apps": matched_apps,
        }
    )

summary = {
    "total": len(mapping),
    "found": sum(1 for item in mapping if item["match_status"] == "found"),
    "missing": sum(1 for item in mapping if item["match_status"] == "missing"),
    "ambiguous": sum(1 for item in mapping if item["match_status"] == "ambiguous"),
}

output = {
    "schema_version": 1,
    "generated_at": dt.datetime.now(dt.timezone.utc).isoformat(),
    "inventory_source": inventory_path,
    "summary": summary,
    "mapping": mapping,
}

with open(out_path, "w", encoding="utf-8") as handle:
    yaml.safe_dump(output, handle, sort_keys=False, allow_unicode=False)

print(f"Wrote {out_path}")
print(f"Summary: total={summary['total']} found={summary['found']} missing={summary['missing']} ambiguous={summary['ambiguous']}")
PY