Files
swf-convertion/loop_detector.py
2026-03-24 18:11:39 -06:00

233 lines
8.0 KiB
Python

#!/usr/bin/env python3
"""
Loop Detector - Detects when a SWF has entered a repeating loop.
Strategy: Periodically capture a downsampled screenshot of the Ruffle window,
hash it, and compare against recent hashes. A repeated hash sequence indicates
a loop has started.
"""
import hashlib
import logging
import subprocess
import time
from collections import deque
from dataclasses import dataclass, field
from typing import Optional
log = logging.getLogger("loop_detector")
@dataclass
class FrameRecord:
timestamp: float
hash: str
class LoopDetector:
"""
Detects animation loops by comparing periodic frame hashes.
Parameters
----------
window_id : str
X11 window ID of the Ruffle window (hex string like '0x1234abc').
poll_interval : float
Seconds between frame captures (default 0.4 — 2-3 samples/sec is enough).
sequence_length : int
Number of consecutive matching frames required to confirm a loop (default 4).
hash_history : int
How many past hashes to retain for comparison (default 300 ≈ 2 min at 0.4s).
thumbnail_size : str
Resolution to downsample frames to before hashing (smaller = faster, less
sensitive to minor rendering differences).
"""
def __init__(
self,
window_id: str,
poll_interval: float = 0.4,
sequence_length: int = 4,
hash_history: int = 300,
thumbnail_size: str = "64x64",
):
self.window_id = window_id
self.poll_interval = poll_interval
self.sequence_length = sequence_length
self.thumbnail_size = thumbnail_size
self._history: deque[FrameRecord] = deque(maxlen=hash_history)
self._loop_detected_at: Optional[float] = None
self._running = False
# ------------------------------------------------------------------
# Public API
# ------------------------------------------------------------------
def start(self):
"""Begin background polling (call in a thread)."""
self._running = True
self._poll_loop()
def stop(self):
self._running = False
def loop_detected(self) -> bool:
return self._loop_detected_at is not None
def loop_detected_at(self) -> Optional[float]:
"""Wall-clock time when the loop was first detected, or None."""
return self._loop_detected_at
def reset(self):
self._history.clear()
self._loop_detected_at = None
# ------------------------------------------------------------------
# Internal
# ------------------------------------------------------------------
def _poll_loop(self):
while self._running:
h = self._capture_hash()
if h:
now = time.monotonic()
record = FrameRecord(timestamp=now, hash=h)
self._check_for_loop(record)
self._history.append(record)
time.sleep(self.poll_interval)
def _capture_hash(self) -> Optional[str]:
"""
Screenshot the Ruffle window (downsampled) and return an MD5 hash.
Uses ImageMagick `import` — falls back to `scrot` if unavailable.
"""
try:
result = subprocess.run(
[
"import",
"-window", self.window_id,
"-resize", self.thumbnail_size,
"-depth", "8",
"png:-",
],
capture_output=True,
timeout=2.0,
)
if result.returncode == 0 and result.stdout:
return hashlib.md5(result.stdout).hexdigest()
else:
log.debug(f"import failed: {result.stderr.decode()[:200]}")
return self._capture_hash_scrot()
except FileNotFoundError:
return self._capture_hash_scrot()
except subprocess.TimeoutExpired:
log.warning("Frame capture timed out.")
return None
def _capture_hash_scrot(self) -> Optional[str]:
"""Fallback: use scrot to capture window by ID."""
try:
import tempfile, os
with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as f:
tmp = f.name
result = subprocess.run(
["scrot", "--window", self.window_id, tmp],
capture_output=True,
timeout=2.0,
)
if result.returncode == 0:
# Downsample with ffmpeg then hash
result2 = subprocess.run(
[
"ffmpeg", "-y", "-i", tmp,
"-vf", f"scale={self.thumbnail_size.replace('x', ':')}",
"-frames:v", "1", "-f", "rawvideo", "-pix_fmt", "rgb24", "pipe:1",
],
capture_output=True,
timeout=3.0,
)
os.unlink(tmp)
if result2.returncode == 0 and result2.stdout:
return hashlib.md5(result2.stdout).hexdigest()
return None
except Exception as e:
log.debug(f"scrot fallback failed: {e}")
return None
def _check_for_loop(self, current: FrameRecord):
if self._loop_detected_at is not None:
return # already detected
# Include the current (not-yet-appended) record in the full timeline
history_list = list(self._history) + [current]
if len(history_list) < self.sequence_length * 2:
return # not enough history yet
# Build a sliding window of the last `sequence_length` hashes
recent = [r.hash for r in history_list[-self.sequence_length:]]
# Search backwards in the earlier portion for a matching sequence
search_range = history_list[: -self.sequence_length]
for i in range(len(search_range) - self.sequence_length + 1):
window = [r.hash for r in search_range[i : i + self.sequence_length]]
if window == recent:
self._loop_detected_at = current.timestamp
log.info(
f"Loop detected at t={current.timestamp:.1f}s "
f"(matched sequence starting at history index {i})"
)
return
class StillnessDetector:
"""
Simpler alternative: detect when the video has been completely static
for a given duration. Uses FFmpeg freezedetect on the recorded file.
This runs as a post-processing step on a raw recording.
"""
@staticmethod
def find_freeze_start(video_path: str, noise_tolerance: float = 0.001, min_duration: float = 5.0) -> Optional[float]:
"""
Returns the timestamp (seconds) where a freeze of >= min_duration starts,
or None if no such freeze exists.
"""
result = subprocess.run(
[
"ffmpeg", "-i", video_path,
"-vf", f"freezedetect=n={noise_tolerance}:d={min_duration}",
"-f", "null", "-",
],
capture_output=True,
text=True,
timeout=300,
)
output = result.stderr
for line in output.splitlines():
if "freeze_start" in line:
try:
t = float(line.split("freeze_start:")[1].strip().split()[0])
log.info(f"Freeze detected at {t:.2f}s in {video_path}")
return t
except (IndexError, ValueError):
pass
return None
@staticmethod
def trim_at_freeze(input_path: str, output_path: str, freeze_start: float, grace: float = 5.0):
"""Trim a video to end `grace` seconds after the freeze start."""
end_time = freeze_start + grace
subprocess.run(
[
"ffmpeg", "-y",
"-i", input_path,
"-t", str(end_time),
"-c:v", "libx264", "-crf", "18",
"-c:a", "aac",
output_path,
],
check=True,
)
log.info(f"Trimmed video saved to {output_path} (end={end_time:.1f}s)")