#!/usr/bin/env python3
"""
Frigate Config Wizard v1.1
A guided setup tool for Frigate NVR configuration files.
New in v1.1:
- RTSP connection test (ffprobe preferred, OpenCV fallback, raw TCP last resort)
- Local network camera scanner (nmap fast scan OR Python TCP sweep)
- Scan consent + sudo auth gate before any scanning
- Per-camera quick-test button on camera cards
- Dependency status banner on Overview page
"""
import tkinter as tk
from tkinter import ttk, messagebox, filedialog
import yaml
import re
import os
import ipaddress
import subprocess
import threading
import socket
import shutil
from copy import deepcopy
# ── Optional heavy deps (graceful degradation) ────────────────────────────────
try:
import cv2
_HAVE_CV2 = True
except ImportError:
_HAVE_CV2 = False
try:
from onvif import ONVIFCamera
_HAVE_ONVIF = True
except ImportError:
_HAVE_ONVIF = False
# ─── Default config path ──────────────────────────────────────────────────────
DEFAULT_CONFIG_PATH = "/opt/frigate/config.yml"
# ─── Colour palette ───────────────────────────────────────────────────────────
BG_DARK = "#0d1117"
BG_PANEL = "#161b22"
BG_CARD = "#1c2128"
ACCENT = "#00b4d8"
ACCENT2 = "#0077b6"
SUCCESS = "#3fb950"
WARNING = "#d29922"
ERROR = "#f85149"
TEXT_PRI = "#e6edf3"
TEXT_SEC = "#8b949e"
BORDER = "#30363d"
BTN_BG = "#21262d"
BTN_HOV = "#30363d"
FONT_HEAD = ("Courier New", 22, "bold")
FONT_SUB = ("Courier New", 11)
FONT_LABEL = ("Courier New", 10, "bold")
FONT_INPUT = ("Courier New", 10)
FONT_SMALL = ("Courier New", 9)
FONT_MONO = ("Courier New", 9)
# ═══════════════════════════════════════════════════════════════════════════════
# VALIDATION HELPERS
# ═══════════════════════════════════════════════════════════════════════════════
def validate_ip(ip: str) -> bool:
try:
ipaddress.ip_address(ip)
return True
except ValueError:
return False
def validate_rtsp_url(url: str) -> bool:
return bool(re.match(r'^rtsp[s]?://', url, re.IGNORECASE))
def validate_camera_name(name: str) -> bool:
return bool(re.match(r'^[a-zA-Z][a-zA-Z0-9_]*$', name))
def extract_ip_from_rtsp(url: str) -> str:
m = re.search(r'rtsp[s]?://(?:[^@]+@)?([0-9a-fA-F.:]+)', url)
return m.group(1) if m else ""
def get_local_subnet() -> str:
try:
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(("8.8.8.8", 80))
ip = s.getsockname()[0]
s.close()
parts = ip.split(".")
return f"{parts[0]}.{parts[1]}.{parts[2]}.0/24"
except Exception:
return "192.168.1.0/24"
# ═══════════════════════════════════════════════════════════════════════════════
# RTSP CONNECTION TESTER
# ═══════════════════════════════════════════════════════════════════════════════
class RTSPTester:
"""
Tests an RTSP URL by actually connecting to the stream.
Method priority:
1. ffprobe — fastest, no decode, gives codec/resolution info
2. OpenCV — if python3-opencv is installed
3. TCP port check — last resort, just proves the port is reachable
Returns (success: bool, message: str)
"""
TIMEOUT = 8
@classmethod
def test(cls, url: str):
if not validate_rtsp_url(url):
return False, "URL must start with rtsp:// or rtsps://"
if shutil.which("ffprobe"):
return cls._ffprobe(url)
if _HAVE_CV2:
return cls._opencv(url)
return cls._tcp(url)
@classmethod
def _ffprobe(cls, url: str):
cmd = [
"ffprobe", "-v", "error",
"-rtsp_transport", "tcp",
"-i", url,
"-show_entries", "stream=codec_type,width,height,codec_name",
"-of", "default=noprint_wrappers=1",
"-timeout", str(cls.TIMEOUT * 1_000_000),
]
try:
r = subprocess.run(cmd, capture_output=True, text=True,
timeout=cls.TIMEOUT + 2)
if r.returncode == 0 and r.stdout.strip():
info = {k: v for k, v in
(ln.split("=", 1) for ln in r.stdout.strip().splitlines()
if "=" in ln)}
codec = info.get("codec_name", "?")
w = info.get("width", "?")
h = info.get("height", "?")
return True, f"✓ Stream OK | codec={codec} {w}×{h}"
err = (r.stderr or "")[:200]
return False, f"ffprobe failed: {err}"
except subprocess.TimeoutExpired:
return False, f"Timed out after {cls.TIMEOUT}s"
except Exception as e:
return False, f"ffprobe error: {e}"
@classmethod
def _opencv(cls, url: str):
try:
cap = cv2.VideoCapture(url, cv2.CAP_FFMPEG)
cap.set(cv2.CAP_PROP_OPEN_TIMEOUT_MSEC, cls.TIMEOUT * 1000)
ok = cap.isOpened()
if ok:
w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
cap.release()
return True, f"✓ Stream OK | {w}×{h} (OpenCV)"
cap.release()
return False, "OpenCV could not open stream"
except Exception as e:
return False, f"OpenCV error: {e}"
@classmethod
def _tcp(cls, url: str):
m = re.search(r'rtsp[s]?://(?:[^@]+@)?([0-9a-fA-F.:]+):?(\d*)', url)
if not m:
return False, "Could not parse host/port from URL"
host = m.group(1)
port = int(m.group(2)) if m.group(2) else 554
try:
s = socket.create_connection((host, port), timeout=cls.TIMEOUT)
s.close()
return True, (f"✓ Port {port} reachable on {host} "
"(install ffprobe for full stream test)")
except (socket.timeout, ConnectionRefusedError, OSError) as e:
return False, f"TCP connect failed: {e}"
# ═══════════════════════════════════════════════════════════════════════════════
# NETWORK SCANNER
# ═══════════════════════════════════════════════════════════════════════════════
RTSP_PORTS = [554, 8554, 10554, 1935, 5543]
RTSP_PATH_HINTS = [
"/stream1", "/stream2", "/h264", "/live",
"/cam/realmonitor?channel=1&subtype=0", # Dahua
"/Streaming/Channels/101", # Hikvision
"/live/ch00_0", # Reolink
"/videoMain", # Axis older
"/1", "/live.sdp",
]
class NetworkScanner:
@staticmethod
def check_nmap() -> bool:
return shutil.which("nmap") is not None
@staticmethod
def scan_subnet(subnet: str, progress_cb=None) -> list:
"""Pure-Python TCP sweep — no root needed, ~254 hosts × 5 ports."""
try:
hosts = [str(h) for h in
ipaddress.ip_network(subnet, strict=False).hosts()]
except ValueError:
return []
results = []
total = len(hosts)
for i, ip in enumerate(hosts):
if progress_cb:
progress_cb(i, total, f"Probing {ip}…")
for port in RTSP_PORTS:
try:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(0.4)
if s.connect_ex((ip, port)) == 0:
results.append({
"ip": ip, "port": port,
"rtsp_guesses": [f"rtsp://{ip}:{port}{p}"
for p in RTSP_PATH_HINTS],
"onvif_uri": None, "onvif_name": None,
})
s.close()
except Exception:
pass
if progress_cb:
progress_cb(total, total, "Scan complete.")
return results
@staticmethod
def scan_with_nmap(subnet: str, sudo_pw: str = "",
progress_cb=None) -> list:
"""nmap SYN scan — much faster; needs root for -sS."""
ports_str = ",".join(str(p) for p in RTSP_PORTS)
if sudo_pw:
cmd = ["sudo", "-S", "nmap"]
else:
cmd = ["nmap"]
cmd += ["-p", ports_str, "--open", "-T4",
"--host-timeout", "3s", "-oG", "-", subnet]
if progress_cb:
progress_cb(0, 1, "Running nmap…")
try:
inp = (sudo_pw + "\n") if sudo_pw else None
r = subprocess.run(cmd, input=inp, capture_output=True,
text=True, timeout=120)
raw = r.stdout
except Exception:
return []
results = []
for line in raw.splitlines():
if "Ports:" not in line:
continue
ip_m = re.search(r'Host: (\d+\.\d+\.\d+\.\d+)', line)
if not ip_m:
continue
ip = ip_m.group(1)
for p_str in re.findall(r'(\d+)/open', line):
results.append({
"ip": ip, "port": int(p_str),
"rtsp_guesses": [f"rtsp://{ip}:{p_str}{p}"
for p in RTSP_PATH_HINTS],
"onvif_uri": None, "onvif_name": None,
})
if progress_cb:
progress_cb(1, 1, f"nmap found {len(results)} open port(s).")
return results
# ═══════════════════════════════════════════════════════════════════════════════
# CONFIG MANAGER
# ═══════════════════════════════════════════════════════════════════════════════
class FrigateConfig:
DEFAULTS = {
"mqtt": {
"enabled": False, "host": "127.0.0.1", "port": 1883,
"topic_prefix": "frigate", "client_id": "frigate",
"user": "", "password": ""
},
"detectors": {"cpu": {"type": "cpu", "num_threads": 3}},
"record": {
"enabled": False,
"retain": {"days": 0, "mode": "all"},
"events": {"retain": {"default": 10, "mode": "motion"}}
},
"snapshots": {"enabled": False, "retain": {"default": 10}},
"cameras": {}
}
def __init__(self):
self.data = deepcopy(self.DEFAULTS)
def load(self, path: str):
with open(path, "r") as f:
self.data = yaml.safe_load(f) or {}
self.data.setdefault("cameras", {})
def save(self, path: str):
"""
Write config.yml. Falls back to a sudo tee write if the file or its
parent directory is not writable by the current user (common when the
wizard lives in /opt/ and was set up as root).
Raises PermissionError with a human-readable message on failure.
"""
abs_path = os.path.abspath(path)
dir_path = os.path.dirname(abs_path)
# Make sure the directory exists (may need sudo too)
if not os.path.isdir(dir_path):
try:
os.makedirs(dir_path, exist_ok=True)
except PermissionError:
# Try via sudo
r = subprocess.run(["sudo", "mkdir", "-p", dir_path],
capture_output=True)
if r.returncode != 0:
raise PermissionError(
f"Cannot create directory {dir_path}.\n"
"Run the wizard with sudo, or fix the directory permissions:\n"
f" sudo mkdir -p {dir_path}\n"
f" sudo chown $USER {dir_path}")
content = yaml.dump(self.data, default_flow_style=False,
sort_keys=False)
# Try a normal write first
try:
with open(abs_path, "w") as f:
f.write(content)
return
except PermissionError:
pass
# Fallback: write via sudo tee (non-interactive — only works if the
# user already has a cached sudo token or NOPASSWD sudo rights)
r = subprocess.run(
["sudo", "tee", abs_path],
input=content, capture_output=True, text=True
)
if r.returncode == 0:
return
raise PermissionError(
f"Cannot write to {abs_path} — permission denied.\n\n"
"Fix options:\n"
" 1. Run the wizard with sudo:\n"
" sudo python3 /opt/frigate/camera_wizard.py\n"
" 2. Give your user write access to the file:\n"
f" sudo chown $USER {abs_path}\n"
" 3. Change the config path (top-right of the wizard) to a\n"
" location your user can write to, e.g. ~/frigate_config.yml"
)
def get_mqtt(self):
return self.data.get("mqtt", deepcopy(self.DEFAULTS["mqtt"]))
def set_mqtt(self, cfg):
self.data["mqtt"] = cfg
def get_detector_type(self) -> str:
for v in self.data.get("detectors", {}).values():
return v.get("type", "cpu")
return "cpu"
def set_detector(self, det_type: str, threads: int = 3):
if det_type == "cpu":
self.data["detectors"] = {"cpu": {"type": "cpu",
"num_threads": threads}}
else:
self.data["detectors"] = {det_type: {"type": det_type}}
def get_cameras(self) -> dict:
return self.data.get("cameras", {})
def add_camera(self, name: str, cfg: dict):
self.data.setdefault("cameras", {})[name] = cfg
def remove_camera(self, name: str):
self.data.get("cameras", {}).pop(name, None)
def all_ips(self, exclude_name: str = "") -> set:
ips = set()
for name, cam in self.get_cameras().items():
if name == exclude_name:
continue
try:
ip = extract_ip_from_rtsp(
cam["ffmpeg"]["inputs"][0]["path"])
if ip:
ips.add(ip)
except (KeyError, IndexError):
pass
return ips
def all_names(self, exclude: str = "") -> set:
return {n for n in self.get_cameras() if n != exclude}
def build_camera_cfg(self, rtsp_url, roles, detect_w, detect_h,
fps, record, snapshots):
return {
"ffmpeg": {"inputs": [{"path": rtsp_url, "roles": roles}]},
"detect": {"width": detect_w, "height": detect_h,
"fps": fps, "enabled": "detect" in roles},
"record": {"enabled": record},
"snapshots": {"enabled": snapshots},
"motion": {"mask": []},
}
# ═══════════════════════════════════════════════════════════════════════════════
# WIDGET HELPERS
# ═══════════════════════════════════════════════════════════════════════════════
def styled_frame(parent, **kw):
return tk.Frame(parent,
bg=kw.pop("bg", BG_CARD),
highlightbackground=kw.pop("hbg", BORDER),
highlightthickness=kw.pop("ht", 1), **kw)
def label(parent, text, font=FONT_LABEL, fg=TEXT_PRI, bg=BG_CARD, **kw):
return tk.Label(parent, text=text, font=font, fg=fg, bg=bg, **kw)
def entry(parent, textvariable=None, width=30, show=None,
bg=BTN_BG, fg=TEXT_PRI, **kw):
"""Styled tk.Entry. bg/fg are named args so callers can override without
hitting a \'multiple values for keyword argument\' collision."""
return tk.Entry(parent, textvariable=textvariable, width=width,
font=FONT_INPUT, bg=bg, fg=fg,
insertbackground=ACCENT, relief="flat",
highlightbackground=BORDER, highlightthickness=1,
show=show or "", **kw)
def btn(parent, text, command, bg=BTN_BG, fg=TEXT_PRI,
accent=False, danger=False, **kw):
color = ACCENT if accent else (ERROR if danger else bg)
fg_col = BG_DARK if accent else fg
b = tk.Button(parent, text=text, command=command,
font=FONT_LABEL, bg=color, fg=fg_col,
relief="flat", cursor="hand2",
activebackground=ACCENT2, activeforeground=BG_DARK,
padx=12, pady=6, **kw)
b.bind("<Enter>", lambda e: b.config(bg=ACCENT2 if accent else BTN_HOV))
b.bind("<Leave>", lambda e: b.config(bg=color))
return b
def section_header(parent, text, bg=BG_CARD):
f = tk.Frame(parent, bg=bg)
tk.Label(f, text="▸ " + text, font=("Courier New", 12, "bold"),
fg=ACCENT, bg=bg).pack(side="left")
tk.Frame(f, bg=BORDER, height=1).pack(
side="left", fill="x", expand=True, padx=(8, 0))
return f
# ═══════════════════════════════════════════════════════════════════════════════
# TOAST
# ═══════════════════════════════════════════════════════════════════════════════
class Toast:
def __init__(self, root):
self.root = root
self._win = None
def show(self, msg, color=SUCCESS, duration=2800):
if self._win:
try: self._win.destroy()
except Exception: pass
w = tk.Toplevel(self.root)
w.overrideredirect(True)
w.attributes("-topmost", True)
w.configure(bg=color)
tk.Label(w, text=msg, font=FONT_LABEL, fg=BG_DARK,
bg=color, padx=16, pady=8).pack()
rx = self.root.winfo_x() + self.root.winfo_width() - 340
ry = self.root.winfo_y() + self.root.winfo_height() - 80
w.geometry(f"+{rx}+{ry}")
self._win = w
self.root.after(duration, self._dismiss)
def _dismiss(self):
try:
if self._win:
self._win.destroy()
self._win = None
except Exception:
pass
# ═══════════════════════════════════════════════════════════════════════════════
# SCAN CONSENT + SUDO AUTH GATE
# ═══════════════════════════════════════════════════════════════════════════════
class ScanConsentDialog(tk.Toplevel):
"""
Explains exactly what will happen before any packets are sent,
and gates sudo credentials when nmap needs elevated privileges.
self.result:
None → user cancelled, do nothing
("nosudo", "") → proceed without root (Python TCP or unprivileged nmap)
("sudo", "<pw>") → proceed with sudo nmap
"""
def __init__(self, parent, subnet: str, use_nmap: bool):
super().__init__(parent)
self.title("Network Scan — Review & Authorise")
self.configure(bg=BG_DARK)
self.resizable(False, False)
self.grab_set()
self.result = None
self._pw_var = tk.StringVar()
self._needs_sudo = use_nmap and (os.geteuid() != 0)
self._build(subnet, use_nmap)
self._center()
def _center(self):
self.update_idletasks()
pw = self.master.winfo_x() + self.master.winfo_width() // 2
ph = self.master.winfo_y() + self.master.winfo_height() // 2
self.geometry(
f"+{pw - self.winfo_width()//2}+{ph - self.winfo_height()//2}")
def _build(self, subnet, use_nmap):
# ── Warning header ────────────────────────────────────────────────────
tk.Frame(self, bg=WARNING).pack(fill="x")
tk.Label(self, text="⚠ BEFORE YOU SCAN",
font=("Courier New", 13, "bold"),
fg=BG_DARK, bg=WARNING, pady=10).pack(fill="x")
body = tk.Frame(self, bg=BG_PANEL)
body.pack(fill="both", expand=True)
# ── What will happen ──────────────────────────────────────────────────
section_header(body, "What this scan will do",
BG_PANEL).pack(fill="x", padx=20, pady=(16, 6))
method = ("nmap TCP/SYN port scan" if use_nmap
else "Python TCP socket connect (no root needed)")
details = (
f" • Subnet scanned : {subnet}\n"
f" • Scan method : {method}\n"
f" • Ports probed : {', '.join(str(p) for p in RTSP_PORTS)}\n"
f" • What is sent : One TCP SYN packet per port per host\n"
f" • Credentials : None are sent during scanning\n"
f" • Disk writes : None\n"
f" • Scope : Your local subnet only\n"
)
label(body, details, font=FONT_SMALL, fg=TEXT_PRI,
bg=BG_PANEL, justify="left").pack(padx=28, anchor="w")
# ── Legal notice ──────────────────────────────────────────────────────
section_header(body, "Important notices",
BG_PANEL).pack(fill="x", padx=20, pady=(12, 6))
warns = (
" • Only scan networks you own or have explicit permission to scan.\n"
" • Unauthorised port scanning may be illegal in your jurisdiction.\n"
" • Some cameras or routers may log or temporarily block the scan.\n"
" • Results are best-effort; not every camera will be found.\n"
)
label(body, warns, font=FONT_SMALL, fg=WARNING,
bg=BG_PANEL, justify="left").pack(padx=28, anchor="w")
# ── Sudo section (only when needed) ───────────────────────────────────
if self._needs_sudo:
section_header(body, "Authentication required",
BG_PANEL).pack(fill="x", padx=20, pady=(12, 6))
label(body,
" nmap SYN scan requires root. Enter your sudo password\n"
" to use the faster scan, or click 'Scan without sudo'\n"
" for a slower Python TCP sweep that needs no extra privileges.",
font=FONT_SMALL, fg=TEXT_SEC,
bg=BG_PANEL, justify="left").pack(padx=28, anchor="w")
pw_row = tk.Frame(body, bg=BG_PANEL)
pw_row.pack(padx=28, pady=8, anchor="w")
label(pw_row, "sudo password:", bg=BG_PANEL).pack(side="left")
entry(pw_row, textvariable=self._pw_var, width=22,
show="•").pack(side="left", padx=8)
# ── Buttons ───────────────────────────────────────────────────────────
bf = tk.Frame(body, bg=BG_PANEL)
bf.pack(fill="x", padx=20, pady=(12, 20))
btn(bf, "✕ Cancel", self._cancel,
danger=True).pack(side="right", padx=4)
if self._needs_sudo:
btn(bf, "▶ Scan with sudo",
self._confirm_sudo, accent=True).pack(side="right", padx=4)
btn(bf, "▶ Scan without sudo (slower)",
self._confirm_nosudo).pack(side="right", padx=4)
else:
btn(bf, "▶ Yes, start scan",
self._confirm_nosudo, accent=True).pack(side="right", padx=4)
def _cancel(self):
self.result = None
self.destroy()
def _confirm_sudo(self):
pw = self._pw_var.get()
if not pw:
messagebox.showwarning("Password required",
"Enter your sudo password first.",
parent=self)
return
try:
r = subprocess.run(["sudo", "-S", "true"],
input=pw + "\n",
capture_output=True, text=True, timeout=5)
if r.returncode != 0:
messagebox.showerror("Authentication failed",
"Incorrect sudo password.", parent=self)
return
except Exception as e:
messagebox.showerror("Error", str(e), parent=self)
return
self.result = ("sudo", pw)
self.destroy()
def _confirm_nosudo(self):
self.result = ("nosudo", "")
self.destroy()
# ═══════════════════════════════════════════════════════════════════════════════
# SCAN PROGRESS DIALOG
# ═══════════════════════════════════════════════════════════════════════════════
# ScanProgressDialog has been replaced by an inline scan banner inside
# CamerasPage._show_scan_banner() — no separate Toplevel needed.
# ═══════════════════════════════════════════════════════════════════════════════
# SCAN RESULTS DIALOG
# ═══════════════════════════════════════════════════════════════════════════════
class ScanResultsDialog(tk.Toplevel):
def __init__(self, parent, results: list, config: FrigateConfig,
on_added=None):
super().__init__(parent)
self.title("Scan Results — Discovered Cameras")
self.configure(bg=BG_DARK)
self.geometry("760x520")
self.grab_set()
self._results = results
self._config = config
self._on_added = on_added
self._build()
self._center()
def _center(self):
self.update_idletasks()
pw = self.master.winfo_x() + self.master.winfo_width() // 2
ph = self.master.winfo_y() + self.master.winfo_height() // 2
self.geometry(
f"+{pw - self.winfo_width()//2}+{ph - self.winfo_height()//2}")
def _build(self):
tk.Label(self,
text=f"🔍 Found {len(self._results)} host(s) with RTSP ports open",
font=("Courier New", 12, "bold"),
fg=BG_DARK, bg=ACCENT2, pady=10).pack(fill="x")
if not self._results:
label(self,
"No cameras found on the local subnet.\n"
"Try entering the RTSP URL manually.",
font=FONT_SUB, fg=TEXT_SEC, bg=BG_DARK).pack(pady=40)
btn(self, "Close", self.destroy).pack()
return
canvas = tk.Canvas(self, bg=BG_PANEL, highlightthickness=0)
sb = ttk.Scrollbar(self, orient="vertical", command=canvas.yview)
inner = tk.Frame(canvas, bg=BG_PANEL)
inner.bind("<Configure>",
lambda e: canvas.configure(
scrollregion=canvas.bbox("all")))
canvas.create_window((0, 0), window=inner, anchor="nw")
canvas.configure(yscrollcommand=sb.set)
canvas.pack(side="left", fill="both", expand=True)
sb.pack(side="right", fill="y")
for item in self._results:
self._result_card(inner, item)
tk.Frame(self, bg=BG_DARK).pack(side="bottom", fill="x")
btn(self, "Close", self.destroy).pack(
side="bottom", padx=16, pady=8, anchor="e")
def _result_card(self, parent, item: dict):
card = styled_frame(parent, bg=BG_CARD, ht=1)
card.pack(fill="x", padx=12, pady=4)
ip, port = item["ip"], item["port"]
top = tk.Frame(card, bg=BG_CARD)
top.pack(fill="x", padx=14, pady=(10, 4))
label(top, f"🌐 {ip}:{port}",
font=("Courier New", 11, "bold"),
fg=TEXT_PRI, bg=BG_CARD).pack(side="left")
if item.get("onvif_uri"):
label(card, f"ONVIF: {item['onvif_uri']}",
font=FONT_SMALL, fg=ACCENT, bg=BG_CARD).pack(
padx=14, anchor="w")
guesses = item["rtsp_guesses"]
if item.get("onvif_uri"):
guesses = [item["onvif_uri"]] + guesses
url_var = tk.StringVar(value=guesses[0] if guesses
else f"rtsp://{ip}:{port}/")
url_row = tk.Frame(card, bg=BG_CARD)
url_row.pack(fill="x", padx=14, pady=(0, 4))
label(url_row, "URL:", font=FONT_SMALL,
fg=TEXT_SEC, bg=BG_CARD).pack(side="left")
entry(url_row, textvariable=url_var, width=46).pack(
side="left", padx=6)
ttk.Combobox(url_row, textvariable=url_var,
values=guesses, width=4,
state="readonly",
font=FONT_SMALL).pack(side="left")
status_lbl = label(card, "", font=FONT_SMALL,
fg=TEXT_SEC, bg=BG_CARD)
status_lbl.pack(padx=14, anchor="w")
bf = tk.Frame(card, bg=BG_CARD)
bf.pack(padx=14, pady=(4, 10), anchor="w")
def _test(u=url_var, sl=status_lbl):
sl.config(text="⏳ Testing…", fg=TEXT_SEC)
card.update_idletasks()
def worker():
ok, msg = RTSPTester.test(u.get())
sl.after(0, lambda: sl.config(
text=msg, fg=SUCCESS if ok else ERROR))
threading.Thread(target=worker, daemon=True).start()
def _add(u=url_var):
CameraDialog(self, self._config,
prefill_rtsp=u.get(),
on_save=self._on_added)
btn(bf, "⚡ Test Connection", _test).pack(side="left", padx=(0, 6))
btn(bf, "+ Add Camera", _add, accent=True).pack(side="left")
# ═══════════════════════════════════════════════════════════════════════════════
# CAMERA FORM DIALOG
# ═══════════════════════════════════════════════════════════════════════════════
class CameraDialog(tk.Toplevel):
def __init__(self, parent, config: FrigateConfig,
existing_name: str = "",
prefill_rtsp: str = "",
on_save=None):
super().__init__(parent)
self.config = config
self.existing_name = existing_name
self.prefill_rtsp = prefill_rtsp
self.on_save = on_save
self.title("Edit Camera" if existing_name else "Add Camera")
self.configure(bg=BG_DARK)
self.resizable(False, False)
self.grab_set()
self._build()
self._center()
if existing_name:
self._populate(existing_name)
elif prefill_rtsp:
self.v_rtsp.set(prefill_rtsp)
def _center(self):
self.update_idletasks()
pw = self.master.winfo_x() + self.master.winfo_width() // 2
ph = self.master.winfo_y() + self.master.winfo_height() // 2
self.geometry(
f"+{pw - self.winfo_width()//2}+{ph - self.winfo_height()//2}")
def _build(self):
pad = dict(padx=16, pady=6)
main = styled_frame(self, bg=BG_PANEL, ht=0)
main.pack(fill="both", expand=True)
tk.Frame(main, bg=ACCENT2).pack(fill="x")
tk.Label(main, text="📷 CAMERA SETUP",
font=("Courier New", 13, "bold"),
fg=BG_DARK, bg=ACCENT2, pady=10).pack(fill="x")
body = tk.Frame(main, bg=BG_PANEL)
body.pack(fill="both", expand=True, padx=20, pady=16)
# Name
section_header(body, "Identification", BG_PANEL).grid(
row=0, column=0, columnspan=2, sticky="ew", pady=(0, 4))
label(body, "Camera Name *", bg=BG_PANEL).grid(
row=1, column=0, sticky="w", **pad)
self.v_name = tk.StringVar()
entry(body, textvariable=self.v_name, width=28).grid(
row=1, column=1, sticky="ew", **pad)
label(body, "Letters, numbers, underscores — must start with a letter",
font=FONT_SMALL, fg=TEXT_SEC, bg=BG_PANEL).grid(
row=2, column=1, sticky="w", padx=16)
# RTSP
section_header(body, "Stream", BG_PANEL).grid(
row=3, column=0, columnspan=2, sticky="ew", pady=(12, 4))
label(body, "RTSP URL *", bg=BG_PANEL).grid(
row=4, column=0, sticky="w", **pad)
self.v_rtsp = tk.StringVar()
self.v_rtsp.trace_add("write", self._on_rtsp_change)
entry(body, textvariable=self.v_rtsp, width=38).grid(
row=4, column=1, sticky="ew", **pad)
label(body, "e.g. rtsp://admin:pass@192.168.1.100:554/stream",
font=FONT_SMALL, fg=TEXT_SEC, bg=BG_PANEL).grid(
row=5, column=1, sticky="w", padx=16)
self.lbl_ip = label(body, "", font=FONT_SMALL, fg=ACCENT, bg=BG_PANEL)
self.lbl_ip.grid(row=6, column=1, sticky="w", padx=16)
# ── Test connection ───────────────────────────────────────────────────
test_row = tk.Frame(body, bg=BG_PANEL)
test_row.grid(row=7, column=0, columnspan=2, sticky="w",
padx=16, pady=(4, 8))
self._test_btn = btn(test_row, "⚡ Test Connection", self._run_test)
self._test_btn.pack(side="left")
self._test_lbl = label(test_row, " (not tested yet)",
font=FONT_SMALL, fg=TEXT_SEC, bg=BG_PANEL)
self._test_lbl.pack(side="left", padx=8)
# Detection
section_header(body, "Detection Settings", BG_PANEL).grid(
row=8, column=0, columnspan=2, sticky="ew", pady=(4, 4))
label(body, "Resolution W×H", bg=BG_PANEL).grid(
row=9, column=0, sticky="w", **pad)
res_f = tk.Frame(body, bg=BG_PANEL)
res_f.grid(row=9, column=1, sticky="w", padx=16)
self.v_w = tk.StringVar(value="1280")
self.v_h = tk.StringVar(value="720")
entry(res_f, textvariable=self.v_w, width=6).pack(side="left")
label(res_f, " × ", fg=TEXT_SEC, bg=BG_PANEL).pack(side="left")
entry(res_f, textvariable=self.v_h, width=6).pack(side="left")
label(body, "FPS", bg=BG_PANEL).grid(
row=10, column=0, sticky="w", **pad)
self.v_fps = tk.StringVar(value="5")
entry(body, textvariable=self.v_fps, width=6).grid(
row=10, column=1, sticky="w", padx=16, pady=6)
# Roles
section_header(body, "Roles & Features", BG_PANEL).grid(
row=11, column=0, columnspan=2, sticky="ew", pady=(8, 4))
chk_f = tk.Frame(body, bg=BG_PANEL)
chk_f.grid(row=12, column=0, columnspan=2, sticky="w", padx=16)
self.v_detect = tk.BooleanVar(value=True)
self.v_record = tk.BooleanVar(value=False)
self.v_snap = tk.BooleanVar(value=True)
self.v_audio = tk.BooleanVar(value=False)
for var, txt in [(self.v_detect, "🔍 Detect"),
(self.v_record, "⏺ Record"),
(self.v_snap, "📸 Snapshots"),
(self.v_audio, "🔊 Audio")]:
tk.Checkbutton(chk_f, text=txt, variable=var,
font=FONT_LABEL, bg=BG_PANEL, fg=TEXT_PRI,
selectcolor=BG_CARD,
activebackground=BG_PANEL,
activeforeground=ACCENT).pack(side="left", padx=8)
self.lbl_err = label(body, "", font=FONT_SMALL, fg=ERROR, bg=BG_PANEL)
self.lbl_err.grid(row=13, column=0, columnspan=2,
padx=16, pady=(8, 0))
bf = tk.Frame(main, bg=BG_PANEL)
bf.pack(fill="x", padx=20, pady=(0, 16))
btn(bf, "✕ Cancel", self.destroy).pack(side="right", padx=4)
btn(bf, "✓ Save Camera", self._save,
accent=True).pack(side="right", padx=4)
body.columnconfigure(1, weight=1)
def _run_test(self):
url = self.v_rtsp.get().strip()
if not url:
self._test_lbl.config(text="Enter a URL first.", fg=WARNING)
return
self._test_btn.config(state="disabled")
self._test_lbl.config(text="⏳ Connecting…", fg=TEXT_SEC)
self.update_idletasks()
def worker():
ok, msg = RTSPTester.test(url)
self.after(0, lambda: self._test_done(ok, msg))
threading.Thread(target=worker, daemon=True).start()
def _test_done(self, ok, msg):
self._test_lbl.config(text=msg, fg=SUCCESS if ok else ERROR)
self._test_btn.config(state="normal")
def _on_rtsp_change(self, *_):
ip = extract_ip_from_rtsp(self.v_rtsp.get())
self.lbl_ip.config(
text=f"Detected IP: {ip}" if ip else "", fg=ACCENT)
self._test_lbl.config(text=" (not tested)", fg=TEXT_SEC)
def _populate(self, name: str):
cam = self.config.get_cameras().get(name, {})
self.v_name.set(name)
try:
url = cam["ffmpeg"]["inputs"][0]["path"]
roles = cam["ffmpeg"]["inputs"][0].get("roles", ["detect"])
self.v_rtsp.set(url)
self.v_detect.set("detect" in roles)
self.v_record.set("record" in roles)
self.v_audio.set("audio" in roles)
except (KeyError, IndexError):
pass
d = cam.get("detect", {})
self.v_w.set(str(d.get("width", 1280)))
self.v_h.set(str(d.get("height", 720)))
self.v_fps.set(str(d.get("fps", 5)))
self.v_snap.set(cam.get("snapshots", {}).get("enabled", True))
self.v_record.set(cam.get("record", {}).get("enabled", False))
def _save(self):
name = self.v_name.get().strip()
rtsp = self.v_rtsp.get().strip()
w_str = self.v_w.get().strip()
h_str = self.v_h.get().strip()
fps_s = self.v_fps.get().strip()
if not name:
self.lbl_err.config(text="❌ Camera name is required.")
return
if not validate_camera_name(name):
self.lbl_err.config(
text="❌ Name: letters/numbers/underscores, must start with letter.")
return
if name in self.config.all_names(exclude=self.existing_name):
self.lbl_err.config(text=f"❌ Name '{name}' already exists.")
return
if not rtsp:
self.lbl_err.config(text="❌ RTSP URL is required.")
return
if not validate_rtsp_url(rtsp):
self.lbl_err.config(text="❌ URL must start with rtsp:// or rtsps://")
return
ip = extract_ip_from_rtsp(rtsp)
if ip:
if not validate_ip(ip):
self.lbl_err.config(text=f"❌ Invalid IP: {ip}")
return
if ip in self.config.all_ips(exclude_name=self.existing_name):
self.lbl_err.config(
text=f"❌ IP {ip} already used by another camera.")
return
try:
w, h, fps = int(w_str), int(h_str), int(fps_s)
assert 64 <= w <= 3840 and 64 <= h <= 2160 and 1 <= fps <= 30
except (ValueError, AssertionError):
self.lbl_err.config(text="❌ Invalid resolution or FPS.")
return
roles = [r for r, v in [("detect", self.v_detect),
("record", self.v_record),
("audio", self.v_audio)] if v.get()]
if not roles:
roles = ["detect"]
cam_cfg = self.config.build_camera_cfg(
rtsp_url=rtsp, roles=roles,
detect_w=w, detect_h=h, fps=fps,
record=self.v_record.get(), snapshots=self.v_snap.get()
)
if self.existing_name and self.existing_name != name:
self.config.remove_camera(self.existing_name)
self.config.add_camera(name, cam_cfg)
if self.on_save:
self.on_save()
self.destroy()
# ═══════════════════════════════════════════════════════════════════════════════
# MQTT PAGE
# ═══════════════════════════════════════════════════════════════════════════════
class MQTTPage(tk.Frame):
def __init__(self, parent, config: FrigateConfig, **kw):
super().__init__(parent, bg=BG_DARK, **kw)
self.config = config
self._build()
def _build(self):
wrap = styled_frame(self, bg=BG_PANEL, ht=0)
wrap.pack(fill="both", expand=True, padx=24, pady=20)
section_header(wrap, "MQTT Broker", BG_PANEL).pack(
fill="x", padx=16, pady=(16, 8))
label(wrap,
"MQTT is optional but enables real-time events, Home Assistant\n"
"integration, and mobile notifications via automation platforms.",
font=FONT_SMALL, fg=TEXT_SEC, bg=BG_PANEL,
justify="left").pack(padx=16, anchor="w", pady=(0, 12))
form = tk.Frame(wrap, bg=BG_PANEL)
form.pack(padx=16, fill="x")
mqtt = self.config.get_mqtt()
self.v_enabled = tk.BooleanVar(value=mqtt.get("enabled", False))
tk.Checkbutton(form, text="Enable MQTT",
variable=self.v_enabled,
font=FONT_LABEL, bg=BG_PANEL, fg=TEXT_PRI,
selectcolor=BG_CARD, activebackground=BG_PANEL,
command=self._toggle).grid(
row=0, column=0, columnspan=2,
sticky="w", pady=8)
rows = [
("Host / IP", "v_host", mqtt.get("host", "127.0.0.1"), False),
("Port", "v_port", str(mqtt.get("port", 1883)), False),
("Topic Prefix", "v_topic", mqtt.get("topic_prefix","frigate"), False),
("Username", "v_user", mqtt.get("user", ""), False),
("Password", "v_pass", mqtt.get("password", ""), True),
]
self._vars = {}
self._entries = {}
for i, (lbl_txt, var_name, default, is_pw) in enumerate(rows, 1):
label(form, lbl_txt, bg=BG_PANEL).grid(
row=i, column=0, sticky="w", padx=4, pady=5)
v = tk.StringVar(value=str(default))
self._vars[var_name] = v
e = entry(form, textvariable=v, width=28,
show="•" if is_pw else "")
e.grid(row=i, column=1, sticky="ew", padx=8, pady=5)
self._entries[var_name] = e
form.columnconfigure(1, weight=1)
self._toggle()
def _toggle(self):
s = "normal" if self.v_enabled.get() else "disabled"
for e in self._entries.values():
e.config(state=s)
def apply(self):
self.config.set_mqtt({
"enabled": self.v_enabled.get(),
"host": self._vars["v_host"].get().strip(),
"port": int(self._vars["v_port"].get().strip() or 1883),
"topic_prefix": self._vars["v_topic"].get().strip(),
"client_id": "frigate",
"user": self._vars["v_user"].get().strip(),
"password": self._vars["v_pass"].get().strip(),
})
# ═══════════════════════════════════════════════════════════════════════════════
# DETECTOR PAGE
# ═══════════════════════════════════════════════════════════════════════════════
class DetectorPage(tk.Frame):
def __init__(self, parent, config: FrigateConfig, **kw):
super().__init__(parent, bg=BG_DARK, **kw)
self.config = config
self._build()
def _build(self):
wrap = styled_frame(self, bg=BG_PANEL, ht=0)
wrap.pack(fill="both", expand=True, padx=24, pady=20)
section_header(wrap, "Object Detector", BG_PANEL).pack(
fill="x", padx=16, pady=(16, 8))
self.v_det = tk.StringVar(value=self.config.get_detector_type())
for val, display, desc in [
("cpu", "🖥 CPU", "Any system. Slower, no special hardware."),
("edgetpu", "⚡ Coral TPU", "Google Coral USB/PCIe. Very fast, ~4W."),
("openvino", "🔷 OpenVINO", "Intel iGPU/CPU via OpenVINO runtime."),
("onnx", "🧠 ONNX", "ONNX runtime — various models/hardware."),
]:
row = tk.Frame(wrap, bg=BG_CARD,
highlightbackground=BORDER, highlightthickness=1)
row.pack(fill="x", padx=16, pady=4)
tk.Radiobutton(row, text=display, variable=self.v_det, value=val,
font=("Courier New", 11, "bold"),
bg=BG_CARD, fg=TEXT_PRI, selectcolor=BG_DARK,
activebackground=BG_CARD).pack(
side="left", padx=12, pady=8)
label(row, desc, font=FONT_SMALL,
fg=TEXT_SEC, bg=BG_CARD).pack(side="left", padx=4)
thr_f = tk.Frame(wrap, bg=BG_PANEL)
thr_f.pack(padx=16, pady=8, anchor="w")
label(thr_f, "CPU threads (for CPU detector):",
bg=BG_PANEL).pack(side="left")
self.v_threads = tk.StringVar(value="3")
entry(thr_f, textvariable=self.v_threads, width=4).pack(
side="left", padx=8)
def apply(self):
try:
t = int(self.v_threads.get())
except ValueError:
t = 3
self.config.set_detector(self.v_det.get(), t)
# ═══════════════════════════════════════════════════════════════════════════════
# CAMERAS PAGE
# ═══════════════════════════════════════════════════════════════════════════════
class CamerasPage(tk.Frame):
def __init__(self, parent, config: FrigateConfig, toast: Toast, **kw):
super().__init__(parent, bg=BG_DARK, **kw)
self.config = config
self.toast = toast
self._build()
self.refresh()
def _build(self):
tb = tk.Frame(self, bg=BG_DARK)
tb.pack(fill="x", padx=24, pady=(16, 0))
label(tb, "CAMERAS", font=("Courier New", 14, "bold"),
fg=ACCENT, bg=BG_DARK).pack(side="left")
btn(tb, "+ Add Camera",
self._add_camera, accent=True).pack(side="right", padx=(4, 0))
btn(tb, "🔍 Scan Network",
self._start_scan).pack(side="right", padx=4)
self.list_frame = styled_frame(self, bg=BG_PANEL, ht=1)
self.list_frame.pack(fill="both", expand=True, padx=24, pady=12)
canvas = tk.Canvas(self.list_frame, bg=BG_PANEL, highlightthickness=0)
sb = ttk.Scrollbar(self.list_frame, orient="vertical",
command=canvas.yview)
self.inner = tk.Frame(canvas, bg=BG_PANEL)
self.inner.bind("<Configure>",
lambda e: canvas.configure(
scrollregion=canvas.bbox("all")))
canvas.create_window((0, 0), window=self.inner, anchor="nw")
canvas.configure(yscrollcommand=sb.set)
canvas.pack(side="left", fill="both", expand=True)
sb.pack(side="right", fill="y")
self.canvas = canvas
def refresh(self):
for w in self.inner.winfo_children():
w.destroy()
cams = self.config.get_cameras()
if not cams:
label(self.inner,
"No cameras configured yet.\n"
"Click '+ Add Camera' or '🔍 Scan Network' to begin.",
font=FONT_SUB, fg=TEXT_SEC, bg=BG_PANEL,
justify="center").pack(pady=60)
return
for name, cam in cams.items():
self._camera_card(name, cam)
def _camera_card(self, name: str, cam: dict):
card = styled_frame(self.inner, bg=BG_CARD, ht=1)
card.pack(fill="x", padx=12, pady=5)
try:
url = cam["ffmpeg"]["inputs"][0]["path"]
roles = cam["ffmpeg"]["inputs"][0].get("roles", [])
ip = extract_ip_from_rtsp(url) or "?"
except (KeyError, IndexError):
url, roles, ip = "unknown", [], "?"
d = cam.get("detect", {})
w_res = d.get("width", "?")
h_res = d.get("height", "?")
fps = d.get("fps", "?")
left = tk.Frame(card, bg=BG_CARD)
left.pack(side="left", fill="both", expand=True, padx=14, pady=10)
label(left, f"📷 {name}",
font=("Courier New", 12, "bold"),
fg=TEXT_PRI, bg=BG_CARD).pack(anchor="w")
label(left, f"IP: {ip} {w_res}×{h_res} @{fps}fps",
font=FONT_SMALL, fg=TEXT_SEC, bg=BG_CARD).pack(anchor="w", pady=2)
label(left, " ".join(f"[{r}]" for r in roles),
font=FONT_SMALL, fg=ACCENT, bg=BG_CARD).pack(anchor="w")
right = tk.Frame(card, bg=BG_CARD)
right.pack(side="right", padx=10, pady=10)
test_lbl = label(right, "", font=FONT_SMALL, fg=TEXT_SEC, bg=BG_CARD)
test_lbl.pack(anchor="e", pady=(0, 4))
def _test_cam(u=url, lbl=test_lbl):
lbl.config(text="⏳…", fg=TEXT_SEC)
self.update_idletasks()
def worker():
ok, msg = RTSPTester.test(u)
lbl.after(0, lambda: lbl.config(
text="✓ OK" if ok else "✗ Fail",
fg=SUCCESS if ok else ERROR))
threading.Thread(target=worker, daemon=True).start()
btn_row = tk.Frame(right, bg=BG_CARD)
btn_row.pack()
btn(btn_row, "⚡ Test", _test_cam).pack(side="left", padx=2)
btn(btn_row, "✏ Edit",
lambda n=name: self._edit_camera(n)).pack(side="left", padx=2)
btn(btn_row, "🗑",
lambda n=name: self._remove_camera(n),
danger=True).pack(side="left", padx=2)
def _add_camera(self):
CameraDialog(self, self.config, on_save=self.refresh)
def _edit_camera(self, name: str):
CameraDialog(self, self.config, existing_name=name,
on_save=self.refresh)
def _remove_camera(self, name: str):
if not messagebox.askyesno("Remove Camera",
f"Remove camera '{name}'?", parent=self):
return
self.config.remove_camera(name)
self.refresh()
# Persist immediately so the deletion survives a restart.
# If the config file is root-owned we catch the error here so the
# in-memory state (already updated) stays consistent and the user
# gets a clear explanation rather than a bare Python traceback.
path = self._get_config_path()
if path:
try:
self.config.save(path)
self.toast.show(f"Camera '{name}' removed.", color=WARNING)
except PermissionError as e:
self.toast.show(f"Removed in memory — save failed (see below)",
color=ERROR, duration=4000)
messagebox.showerror(
"Permission Error",
f"Camera '{name}' was removed from the current session but "
f"could not be written to disk:\n\n{e}",
parent=self
)
else:
self.toast.show(f"Camera '{name}' removed (unsaved session).",
color=WARNING)
def _get_config_path(self) -> str:
"""Walk up to the root FrigateWizard window to get the config path."""
w = self
while w is not None:
if hasattr(w, "config_path"):
return w.config_path.get().strip()
try:
w = w.master
except Exception:
break
return ""
# ── Network scan flow ─────────────────────────────────────────────────────
def _start_scan(self):
subnet = get_local_subnet()
use_nmap = NetworkScanner.check_nmap()
consent = ScanConsentDialog(self, subnet, use_nmap)
self.wait_window(consent)
if consent.result is None:
return
self._run_scan(subnet, consent.result)
def _run_scan(self, subnet: str, auth: tuple):
"""Launch scan in a background thread and show an inline banner."""
self._scan_cancelled = False
# ── Build the inline scan banner ──────────────────────────────────────
# Sits between the toolbar and the camera list; destroyed when done.
banner = tk.Frame(self, bg=ACCENT2,
highlightbackground=BORDER, highlightthickness=1)
# Insert it after the toolbar (index 1) and before the list frame
banner.pack(fill="x", padx=24, pady=(0, 4), before=self.list_frame)
self._scan_banner = banner
# Row 1 – icon + status text + cancel button
row1 = tk.Frame(banner, bg=ACCENT2)
row1.pack(fill="x", padx=12, pady=(8, 2))
self._scan_icon_lbl = tk.Label(row1, text="⬡",
font=("Courier New", 14, "bold"),
fg=BG_DARK, bg=ACCENT2)
self._scan_icon_lbl.pack(side="left")
self._scan_status_lbl = tk.Label(row1,
text=f"Scanning {subnet}…",
font=FONT_LABEL, fg=BG_DARK,
bg=ACCENT2, anchor="w")
self._scan_status_lbl.pack(side="left", padx=10, fill="x",
expand=True)
cancel_btn = tk.Button(row1, text="✕ Cancel",
font=FONT_SMALL, bg=BG_DARK, fg=ERROR,
relief="flat", cursor="hand2",
activebackground=BTN_HOV,
command=self._cancel_scan)
cancel_btn.pack(side="right")
# Row 2 – progress bar
self._scan_bar = ttk.Progressbar(banner, length=400,
mode="determinate")
self._scan_bar.pack(fill="x", padx=12, pady=(2, 8))
# Start the animated pulse (spins the icon while bar is indeterminate)
self._scan_pulse_step = 0
self._pulse_icons = ["⬡", "◈", "⬡", "◈"]
self._animate_scan_icon()
# ── Start scan thread ─────────────────────────────────────────────────
method, pw = auth
has_nmap = NetworkScanner.check_nmap()
threading.Thread(
target=self._scan_worker,
args=(subnet, method, pw, has_nmap),
daemon=True
).start()
def _animate_scan_icon(self):
"""Pulse the hex icon every 400 ms until scan finishes."""
if not hasattr(self, "_scan_banner") or \
not self._scan_banner.winfo_exists():
return
icons = ["⬡", "◈", "⊛", "◈"]
self._scan_pulse_step = (self._scan_pulse_step + 1) % len(icons)
self._scan_icon_lbl.config(text=icons[self._scan_pulse_step])
self._scan_anim_id = self.after(380, self._animate_scan_icon)
def _cancel_scan(self):
self._scan_cancelled = True
self._hide_scan_banner()
def _hide_scan_banner(self):
if hasattr(self, "_scan_anim_id"):
try:
self.after_cancel(self._scan_anim_id)
except Exception:
pass
if hasattr(self, "_scan_banner"):
try:
self._scan_banner.destroy()
except Exception:
pass
def _scan_progress(self, cur, total, msg):
if self._scan_cancelled:
return
def _upd():
if not hasattr(self, "_scan_banner") or \
not self._scan_banner.winfo_exists():
return
self._scan_status_lbl.config(text=msg)
if total > 0:
self._scan_bar["value"] = int(cur / total * 100)
self.after(0, _upd)
def _scan_worker(self, subnet, method, pw, has_nmap):
try:
if method == "sudo":
results = NetworkScanner.scan_with_nmap(
subnet, sudo_pw=pw, progress_cb=self._scan_progress)
elif has_nmap:
results = NetworkScanner.scan_with_nmap(
subnet, progress_cb=self._scan_progress)
else:
results = NetworkScanner.scan_subnet(
subnet, progress_cb=self._scan_progress)
except Exception:
results = []
if not self._scan_cancelled:
self.after(0, lambda: self._scan_done(results))
def _scan_done(self, results: list):
self._hide_scan_banner()
ScanResultsDialog(self, results, self.config, on_added=self.refresh)
# ═══════════════════════════════════════════════════════════════════════════════
# YAML PREVIEW PAGE
# ═══════════════════════════════════════════════════════════════════════════════
class PreviewPage(tk.Frame):
def __init__(self, parent, config: FrigateConfig, **kw):
super().__init__(parent, bg=BG_DARK, **kw)
self.config = config
self._build()
def _build(self):
wrap = tk.Frame(self, bg=BG_DARK)
wrap.pack(fill="both", expand=True, padx=24, pady=16)
label(wrap, "CONFIG PREVIEW",
font=("Courier New", 14, "bold"),
fg=ACCENT, bg=BG_DARK).pack(anchor="w", pady=(0, 8))
self.text = tk.Text(wrap, font=FONT_MONO, bg=BG_CARD, fg=TEXT_PRI,
insertbackground=ACCENT, relief="flat",
highlightbackground=BORDER, highlightthickness=1,
wrap="none", state="disabled")
sb_y = ttk.Scrollbar(wrap, command=self.text.yview)
sb_x = ttk.Scrollbar(wrap, orient="horizontal",
command=self.text.xview)
self.text.config(yscrollcommand=sb_y.set,
xscrollcommand=sb_x.set)
self.text.pack(side="left", fill="both", expand=True)
sb_y.pack(side="right", fill="y")
sb_x.pack(side="bottom", fill="x")
def refresh(self):
content = yaml.dump(self.config.data,
default_flow_style=False, sort_keys=False)
self.text.config(state="normal")
self.text.delete("1.0", "end")
self.text.insert("1.0", content)
self.text.config(state="disabled")
# ═══════════════════════════════════════════════════════════════════════════════
# MAIN APPLICATION
# ═══════════════════════════════════════════════════════════════════════════════
class FrigateWizard(tk.Tk):
def __init__(self):
super().__init__()
self.title("Frigate Config Wizard")
self.geometry("960x700")
self.minsize(820, 580)
self.configure(bg=BG_DARK)
self.config_obj = FrigateConfig()
self.config_path = tk.StringVar(value=DEFAULT_CONFIG_PATH)
self.toast = Toast(self)
if os.path.exists(DEFAULT_CONFIG_PATH):
try:
self.config_obj.load(DEFAULT_CONFIG_PATH)
except Exception:
pass
self._build_ui()
def _build_ui(self):
topbar = tk.Frame(self, bg=ACCENT2, height=56)
topbar.pack(fill="x")
topbar.pack_propagate(False)
tk.Label(topbar, text="⬡ FRIGATE CONFIG WIZARD",
font=("Courier New", 16, "bold"),
fg=BG_DARK, bg=ACCENT2).pack(side="left", padx=20)
path_f = tk.Frame(topbar, bg=ACCENT2)
path_f.pack(side="right", padx=12)
entry(path_f, textvariable=self.config_path, width=36,
bg=BG_DARK, fg=TEXT_SEC).pack(side="left", ipady=2)
btn(path_f, "📂", self._browse_path, bg=BG_DARK).pack(
side="left", padx=2)
body = tk.Frame(self, bg=BG_DARK)
body.pack(fill="both", expand=True)
sidebar = tk.Frame(body, bg=BG_PANEL, width=184)
sidebar.pack(side="left", fill="y")
sidebar.pack_propagate(False)
self.content = tk.Frame(body, bg=BG_DARK)
self.content.pack(side="left", fill="both", expand=True)
botbar = tk.Frame(self, bg=BG_PANEL, height=48)
botbar.pack(fill="x", side="bottom")
botbar.pack_propagate(False)
btn(botbar, "💾 Save Config",
self._save_config, accent=True).pack(side="right", padx=12, pady=8)
btn(botbar, "📥 Load Config",
self._load_config).pack(side="right", padx=4, pady=8)
btn(botbar, "👁 Preview YAML",
self._show_preview).pack(side="right", padx=4, pady=8)
self.status_lbl = label(botbar, "Ready.", font=FONT_SMALL,
fg=TEXT_SEC, bg=BG_PANEL)
self.status_lbl.pack(side="left", padx=12)
self.pages = {}
self._active_page = None
for display, key in [("🏠 Overview", "overview"),
("📡 MQTT", "mqtt"),
("🧠 Detector", "detector"),
("📷 Cameras", "cameras")]:
b = tk.Button(sidebar, text=display, font=FONT_LABEL,
bg=BG_PANEL, fg=TEXT_SEC, relief="flat",
anchor="w", padx=16, pady=10, cursor="hand2",
activebackground=BG_CARD, activeforeground=ACCENT,
command=lambda k=key: self._show_page(k))
b.pack(fill="x")
self.pages[key] = {"btn": b}
tk.Frame(sidebar, bg=BORDER, height=1).pack(fill="x", pady=8)
label(sidebar, "v1.1 | Frigate 0.14+",
font=FONT_SMALL, fg=TEXT_SEC, bg=BG_PANEL).pack(padx=12)
self.overview_page = self._build_overview()
self.mqtt_page = MQTTPage(self.content, self.config_obj)
self.detector_page = DetectorPage(self.content, self.config_obj)
self.cameras_page = CamerasPage(self.content, self.config_obj, self.toast)
self.preview_page = PreviewPage(self.content, self.config_obj)
self.page_frames = {
"overview": self.overview_page,
"mqtt": self.mqtt_page,
"detector": self.detector_page,
"cameras": self.cameras_page,
}
self._show_page("overview")
def _build_overview(self):
f = tk.Frame(self.content, bg=BG_DARK)
hero = tk.Frame(f, bg=BG_DARK)
hero.pack(pady=(32, 14))
tk.Label(hero, text="⬡", font=("Courier New", 48),
fg=ACCENT, bg=BG_DARK).pack()
tk.Label(hero, text="FRIGATE CONFIG WIZARD",
font=("Courier New", 20, "bold"),
fg=TEXT_PRI, bg=BG_DARK).pack()
tk.Label(hero, text="guided setup for your home NVR",
font=("Courier New", 11),
fg=TEXT_SEC, bg=BG_DARK).pack(pady=3)
steps_wrap = tk.Frame(f, bg=BG_DARK)
steps_wrap.pack()
for num, title, desc in [
("1", "MQTT", "Optional broker\nfor HA integration"),
("2", "Detector", "CPU or hardware\nAI acceleration"),
("3", "Cameras", "Add cameras or\nscan the network"),
("4", "Save", "Export config.yml\nto Frigate volume"),
]:
card = styled_frame(steps_wrap, bg=BG_CARD, ht=1)
card.pack(side="left", padx=8, ipadx=12, ipady=10)
tk.Label(card, text=num, font=("Courier New", 24, "bold"),
fg=ACCENT, bg=BG_CARD).pack()
label(card, title, font=("Courier New", 11, "bold"),
fg=TEXT_PRI, bg=BG_CARD).pack(pady=2)
label(card, desc, font=FONT_SMALL, fg=TEXT_SEC,
bg=BG_CARD, justify="center").pack()
qa = tk.Frame(f, bg=BG_DARK)
qa.pack(pady=16)
btn(qa, "→ Start: MQTT Setup",
lambda: self._show_page("mqtt"), accent=True).pack(
side="left", padx=6)
btn(qa, "📷 Jump to Cameras",
lambda: self._show_page("cameras")).pack(side="left", padx=6)
info = tk.Frame(f, bg=BG_PANEL,
highlightbackground=BORDER, highlightthickness=1)
info.pack(padx=40, pady=3, fill="x")
label(info,
"Config path : /opt/frigate/config.yml\n"
"Docker volume : -v /opt/frigate:/config",
font=FONT_SMALL, fg=TEXT_SEC, bg=BG_PANEL,
justify="left").pack(padx=16, pady=8, anchor="w")
# Dependency status
deps = tk.Frame(f, bg=BG_PANEL,
highlightbackground=BORDER, highlightthickness=1)
deps.pack(padx=40, pady=3, fill="x")
label(deps, "Detected optional dependencies:",
font=FONT_SMALL, fg=TEXT_SEC,
bg=BG_PANEL).pack(padx=16, pady=(8, 2), anchor="w")
row = tk.Frame(deps, bg=BG_PANEL)
row.pack(padx=16, pady=(0, 8), anchor="w")
for dep_name, present, note in [
("ffprobe", shutil.which("ffprobe") is not None,
"RTSP stream test"),
("nmap", shutil.which("nmap") is not None,
"Fast network scan"),
("OpenCV", _HAVE_CV2, "Fallback stream test"),
("onvif-zeep", _HAVE_ONVIF, "ONVIF discovery"),
]:
label(row,
f"{'✓' if present else '○'} {dep_name} ",
font=FONT_SMALL,
fg=SUCCESS if present else TEXT_SEC,
bg=BG_PANEL).pack(side="left")
return f
def _show_page(self, key: str):
if self._active_page == "mqtt":
try: self.mqtt_page.apply()
except Exception: pass
if self._active_page == "detector":
try: self.detector_page.apply()
except Exception: pass
for frame in self.page_frames.values():
frame.pack_forget()
self.preview_page.pack_forget()
for k, info in self.pages.items():
info["btn"].config(
bg=BG_CARD if k == key else BG_PANEL,
fg=ACCENT if k == key else TEXT_SEC)
frame = self.page_frames.get(key)
if frame:
frame.pack(fill="both", expand=True)
self._active_page = key
def _show_preview(self):
try: self.mqtt_page.apply()
except Exception: pass
try: self.detector_page.apply()
except Exception: pass
self.preview_page.refresh()
for frame in self.page_frames.values():
frame.pack_forget()
self.preview_page.pack(fill="both", expand=True)
def _browse_path(self):
path = filedialog.asksaveasfilename(
defaultextension=".yml",
filetypes=[("YAML", "*.yml *.yaml"), ("All", "*.*")],
initialfile="config.yml",
title="Choose config.yml save location"
)
if path:
self.config_path.set(path)
def _save_config(self):
try: self.mqtt_page.apply()
except Exception: pass
try: self.detector_page.apply()
except Exception: pass
path = self.config_path.get().strip()
if not path:
messagebox.showerror("Error", "Please set a config file path.")
return
if not self.config_obj.get_cameras():
if not messagebox.askyesno("No Cameras",
"No cameras configured. Save anyway?"):
return
try:
self.config_obj.save(path)
self.toast.show(f"✓ Saved to {path}", color=SUCCESS)
self.status_lbl.config(text=f"Saved: {path}", fg=SUCCESS)
except Exception as e:
messagebox.showerror("Save Failed", str(e))
def _load_config(self):
path = filedialog.askopenfilename(
filetypes=[("YAML", "*.yml *.yaml"), ("All", "*.*")],
title="Load existing config.yml"
)
if not path:
return
try:
self.config_obj.load(path)
self.config_path.set(path)
self.cameras_page.refresh()
self.toast.show(f"✓ Loaded: {os.path.basename(path)}", color=SUCCESS)
self._show_page("cameras")
except Exception as e:
messagebox.showerror("Load Failed", str(e))
# ═══════════════════════════════════════════════════════════════════════════════
if __name__ == "__main__":
app = FrigateWizard()
app.mainloop()