#!/usr/bin/env python3
"""
Loop Detector - Detects when a SWF has entered a repeating loop.

Strategy: Periodically capture a downsampled screenshot of the Ruffle window,
hash it, and compare against recent hashes. A repeated hash sequence
indicates a loop has started.
"""

import hashlib
import logging
import os
import subprocess
import tempfile
import time
from collections import deque
from dataclasses import dataclass, field
from typing import Optional

log = logging.getLogger("loop_detector")


@dataclass
class FrameRecord:
    """One captured frame: when it was sampled and the hash of its pixels."""

    timestamp: float  # time.monotonic() value taken right after the capture
    hash: str  # hex MD5 digest of the downsampled frame data


class LoopDetector:
    """
    Detects animation loops by comparing periodic frame hashes.

    Parameters
    ----------
    window_id : str
        X11 window ID of the Ruffle window (hex string like '0x1234abc').
    poll_interval : float
        Seconds between frame captures (default 0.4 — 2-3 samples/sec is enough).
    sequence_length : int
        Number of consecutive matching frames required to confirm a loop
        (default 4).
    hash_history : int
        How many past hashes to retain for comparison (default 300 ≈ 2 min
        at 0.4s). Detection needs `2 * sequence_length` samples including
        the newest one, so this must be at least `2 * sequence_length - 1`.
    thumbnail_size : str
        Resolution to downsample frames to before hashing (smaller = faster,
        less sensitive to minor rendering differences).
    """

    def __init__(
        self,
        window_id: str,
        poll_interval: float = 0.4,
        sequence_length: int = 4,
        hash_history: int = 300,
        thumbnail_size: str = "64x64",
    ):
        self.window_id = window_id
        self.poll_interval = poll_interval
        self.sequence_length = sequence_length
        self.thumbnail_size = thumbnail_size

        # Bounded history: the oldest records fall off automatically.
        self._history: deque[FrameRecord] = deque(maxlen=hash_history)
        self._loop_detected_at: Optional[float] = None
        self._running = False

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def start(self) -> None:
        """Poll until stop() is called. Blocks, so call it in a thread."""
        self._running = True
        self._poll_loop()

    def stop(self) -> None:
        """Ask the polling loop to exit after its current iteration."""
        self._running = False

    def loop_detected(self) -> bool:
        """Return True once a repeated hash sequence has been seen."""
        return self._loop_detected_at is not None

    def loop_detected_at(self) -> Optional[float]:
        """
        Timestamp at which the loop was first detected, or None.

        The value comes from `time.monotonic()`, so it is only meaningful
        relative to other monotonic readings, not as wall-clock time.
        """
        return self._loop_detected_at

    def reset(self) -> None:
        """Forget all recorded frames and any previous detection."""
        self._history.clear()
        self._loop_detected_at = None

    # ------------------------------------------------------------------
    # Internal
    # ------------------------------------------------------------------

    def _poll_loop(self) -> None:
        """Capture, check and record one frame per poll interval."""
        while self._running:
            frame_hash = self._capture_hash()
            if frame_hash:
                record = FrameRecord(timestamp=time.monotonic(), hash=frame_hash)
                # Check before appending: the check adds `record` itself.
                self._check_for_loop(record)
                self._history.append(record)
            time.sleep(self.poll_interval)

    def _capture_hash(self) -> Optional[str]:
        """
        Screenshot the Ruffle window (downsampled) and return an MD5 hash.

        Uses ImageMagick `import` — falls back to `scrot` if it is
        unavailable or fails. Returns None when no frame could be captured.
        """
        command = [
            "import",
            "-window", self.window_id,
            "-resize", self.thumbnail_size,
            "-depth", "8",
            # Raw pixels rather than PNG: the hash should depend only on
            # pixel content, not on whatever the encoder writes around it.
            "rgb:-",
        ]
        try:
            result = subprocess.run(command, capture_output=True, timeout=2.0)
        except OSError:
            # `import` is missing or cannot be executed.
            return self._capture_hash_scrot()
        except subprocess.TimeoutExpired:
            log.warning("Frame capture timed out.")
            return None

        if result.returncode == 0 and result.stdout:
            return hashlib.md5(result.stdout).hexdigest()

        # errors="replace": undecodable stderr must not kill the poll thread.
        log.debug(
            "import failed: %s",
            result.stderr.decode(errors="replace")[:200],
        )
        return self._capture_hash_scrot()

    def _capture_hash_scrot(self) -> Optional[str]:
        """
        Fallback: use scrot to capture window by ID.

        The screenshot is downsampled to raw RGB with ffmpeg and hashed.
        Returns None on any failure.
        """
        try:
            # The target path must not exist yet (scrot's man page says it
            # does not overwrite without --overwrite). The directory context
            # also removes the file on every exit path.
            with tempfile.TemporaryDirectory(prefix="loop_detector_") as tmp_dir:
                shot_path = os.path.join(tmp_dir, "frame.png")
                shot = subprocess.run(
                    ["scrot", "--window", self.window_id, shot_path],
                    capture_output=True,
                    timeout=2.0,
                )
                if shot.returncode != 0:
                    return None

                # Downsample with ffmpeg then hash
                scale = self.thumbnail_size.replace("x", ":")
                scaled = subprocess.run(
                    [
                        "ffmpeg", "-y", "-i", shot_path,
                        "-vf", f"scale={scale}",
                        "-frames:v", "1",
                        "-f", "rawvideo", "-pix_fmt", "rgb24",
                        "pipe:1",
                    ],
                    capture_output=True,
                    timeout=3.0,
                )
            if scaled.returncode == 0 and scaled.stdout:
                return hashlib.md5(scaled.stdout).hexdigest()
            return None
        except Exception as exc:
            # Deliberately broad: this is a best-effort fallback and must
            # never take the polling loop down.
            log.debug("scrot fallback failed: %s", exc)
            return None

    def _check_for_loop(self, current: FrameRecord) -> None:
        """
        Record a detection if the newest hashes repeat an earlier run.

        The last `sequence_length` hashes (ending with `current`, which is
        not yet in the history) are compared with every window of the same
        length in the older part of the timeline. The first match sets the
        detection timestamp; later calls are no-ops until reset().
        """
        if self._loop_detected_at is not None:
            return  # already detected

        run_length = self.sequence_length

        # Include the current (not-yet-appended) record in the full timeline
        hashes = [record.hash for record in self._history]
        hashes.append(current.hash)
        if len(hashes) < run_length * 2:
            return  # not enough history yet

        recent = hashes[-run_length:]

        # Scan the older portion oldest-first; windows never overlap `recent`.
        for start in range(len(hashes) - 2 * run_length + 1):
            if hashes[start:start + run_length] == recent:
                self._loop_detected_at = current.timestamp
                log.info(
                    "Loop detected at t=%.1fs "
                    "(matched sequence starting at history index %d)",
                    current.timestamp,
                    start,
                )
                return


class StillnessDetector:
    """
    Simpler alternative: detect when the video has been completely static
    for a given duration. Uses FFmpeg freezedetect on the recorded file.

    This runs as a post-processing step on a raw recording.
    """

    @staticmethod
    def find_freeze_start(
        video_path: str,
        noise_tolerance: float = 0.001,
        min_duration: float = 5.0,
    ) -> Optional[float]:
        """
        Returns the timestamp (seconds) where a freeze of >= min_duration
        starts, or None if no such freeze exists.

        Only the first freeze reported by ffmpeg is returned. Raises
        `subprocess.TimeoutExpired` if ffmpeg runs longer than 300 seconds.
        """
        result = subprocess.run(
            [
                "ffmpeg", "-i", video_path,
                "-vf", f"freezedetect=n={noise_tolerance}:d={min_duration}",
                "-f", "null", "-",
            ],
            capture_output=True,
            text=True,
            timeout=300,
        )
        # The freezedetect report is read from ffmpeg's stderr.
        for line in result.stderr.splitlines():
            if "freeze_start" not in line:
                continue
            try:
                start = float(line.split("freeze_start:")[1].strip().split()[0])
            except (IndexError, ValueError):
                continue  # unparseable line; keep looking
            log.info("Freeze detected at %.2fs in %s", start, video_path)
            return start
        return None

    @staticmethod
    def trim_at_freeze(
        input_path: str,
        output_path: str,
        freeze_start: float,
        grace: float = 5.0,
    ) -> None:
        """
        Trim a video to end `grace` seconds after the freeze start.

        Re-encodes with libx264/aac and overwrites `output_path`. Raises
        `subprocess.CalledProcessError` if ffmpeg exits with an error.
        """
        end_time = freeze_start + grace
        subprocess.run(
            [
                "ffmpeg", "-y", "-i", input_path,
                "-t", str(end_time),
                "-c:v", "libx264", "-crf", "18",
                "-c:a", "aac",
                output_path,
            ],
            check=True,
        )
        log.info("Trimmed video saved to %s (end=%.1fs)", output_path, end_time)