#!/usr/bin/env python3
"""
Recorder - Launches Ruffle, captures the window with FFmpeg,
injects clicks, and stops recording based on loop detection
or configured duration.
"""

import contextlib
import logging
import os
import shutil
import signal
import subprocess
import tempfile
import threading
import time
from pathlib import Path
from typing import List, Optional, Tuple

from loop_detector import LoopDetector, StillnessDetector
from interaction import InteractionController

log = logging.getLogger("recorder")

# Default Ruffle binary locations to try (in order)
RUFFLE_CANDIDATES = [
    "ruffle",
    os.path.expanduser("~/.local/bin/ruffle"),
    "/usr/local/bin/ruffle",
    "/opt/ruffle/ruffle",
    os.path.expanduser("~/ruffle"),
]

# Default Lightspark binary (fallback emulator)
LIGHTSPARK_CANDIDATES = [
    "lightspark",
    "/usr/local/bin/lightspark",
]

# Geometry keys that callers of get_window_geometry() rely on.
_REQUIRED_GEOMETRY_KEYS = frozenset({"x", "y", "width", "height"})


def find_binary(candidates: List[str]) -> Optional[str]:
    """Return the first candidate that refers to an executable, else None.

    Each candidate is either a bare command name (looked up on PATH) or a
    filesystem path. The candidate string itself is returned, not the
    resolved location.
    """
    for candidate in candidates:
        # shutil.which handles both PATH lookups and explicit paths without
        # spawning an external `which` process.
        if shutil.which(candidate):
            return candidate
        if os.path.isfile(candidate) and os.access(candidate, os.X_OK):
            return candidate
    return None


def find_window_id(title_fragment: str, timeout: float = 15.0) -> Optional[str]:
    """Poll xdotool until a window with the given title fragment appears.

    Returns the first matching window ID as a string, or None when no window
    shows up within `timeout` seconds or xdotool is not installed.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            result = subprocess.run(
                ["xdotool", "search", "--name", title_fragment],
                capture_output=True,
                text=True,
                timeout=2.0,
            )
        except FileNotFoundError:
            # Polling cannot succeed without the tool; fail fast instead of
            # silently burning the whole timeout.
            log.error("xdotool is not installed; cannot locate windows.")
            return None
        except (OSError, subprocess.SubprocessError) as exc:
            log.debug("xdotool search failed: %s", exc)
        else:
            matches = result.stdout.strip()
            if result.returncode == 0 and matches:
                wid = matches.splitlines()[0]
                log.info(f"Found window '{title_fragment}': ID={wid}")
                return wid
        time.sleep(0.5)
    log.error(f"Window '{title_fragment}' not found within {timeout}s.")
    return None


def get_window_geometry(window_id: str) -> Optional[dict]:
    """Return {'x': int, 'y': int, 'width': int, 'height': int} for a window.

    The dict holds every integer KEY=VALUE pair printed by
    `xdotool getwindowgeometry --shell` (keys lower-cased). Returns None when
    xdotool fails or any of x / y / width / height is missing.
    """
    try:
        result = subprocess.run(
            ["xdotool", "getwindowgeometry", "--shell", window_id],
            capture_output=True,
            text=True,
            timeout=3.0,
        )
    except (OSError, subprocess.SubprocessError) as e:
        log.debug(f"getwindowgeometry failed: {e}")
        return None

    if result.returncode != 0:
        return None

    geo = {}
    for line in result.stdout.splitlines():
        key, sep, value = line.partition("=")
        if not sep:
            continue
        try:
            geo[key.strip().lower()] = int(value.strip())
        except ValueError:
            log.debug("Ignoring non-integer geometry line: %r", line)

    missing = _REQUIRED_GEOMETRY_KEYS - geo.keys()
    if missing:
        # Guard callers against KeyError on an incomplete answer.
        log.debug("getwindowgeometry output lacks %s", sorted(missing))
        return None
    return geo


class Recorder:
    """
    Manages a single SWF→MP4 conversion pass.

    One Recorder is created per SWF file. `.record()` can be called
    multiple times (once per interaction branch).
    """

    def __init__(self, swf_path: Path, cfg: dict):
        """Store the SWF path and read recording options from `cfg`."""
        self.swf_path = swf_path
        self.cfg = cfg

        # Config values with sensible defaults
        self.loop_grace = cfg.get("loop_grace", 5.0)
        self.max_duration = cfg.get("max_duration", 600.0)
        self.fps = cfg.get("fps", 30)
        self.crf = cfg.get("crf", 18)  # FFmpeg quality (lower = better)
        self.window_size = cfg.get("window_size", "800x600")
        self.emulator = cfg.get("emulator", "ruffle")  # "ruffle" or "lightspark"
        self.end_seconds = cfg.get("end_seconds")  # Hard stop time, if known
        # NOTE(review): end_frame only disables the freeze post-process in
        # record(); nothing in this class stops the recording at that frame.
        self.end_frame = cfg.get("end_frame")  # Hard stop frame, if known
        self.loop_detection = cfg.get("loop_detection", "hash")  # "hash", "freeze", or "none"
        self.startup_delay = cfg.get("startup_delay", 3.0)  # Wait after launch before recording
        self.audio = cfg.get("audio", True)

    def record(self, output_path: Path, clicks: list) -> bool:
        """
        Run one recording pass.

        Parameters
        ----------
        output_path : Path
            Where to save the resulting MP4.
        clicks : list
            List of click dicts {t, x, y, label, ...} to inject.

        Returns
        -------
        bool
            True on success. Exceptions raised while recording are logged
            and reported as False.
        """
        log.info(f"Starting recording pass: {output_path.name}")
        output_path.parent.mkdir(parents=True, exist_ok=True)

        # If using freeze detection, we record raw first then post-process
        use_freeze_postprocess = (
            self.loop_detection == "freeze"
            and self.end_seconds is None
            and self.end_frame is None
        )

        raw_path = output_path
        if use_freeze_postprocess:
            with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as tmp:
                raw_path = Path(tmp.name)

        try:
            return self._do_record(raw_path, output_path, clicks, use_freeze_postprocess)
        except Exception as e:
            log.exception(f"Recording failed: {e}")
            return False
        finally:
            # Never leave the temporary raw capture behind, whatever happened.
            if raw_path != output_path:
                raw_path.unlink(missing_ok=True)

    def _resolve_emulator(self) -> Tuple[Optional[str], str]:
        """Pick the emulator binary and the window title to search for.

        Returns (binary, title); binary is None when no emulator is installed.
        """
        if self.emulator == "lightspark":
            binary = find_binary(LIGHTSPARK_CANDIDATES)
            if binary:
                # NOTE(review): assumes Lightspark's window title contains
                # "Lightspark" - confirm. A miss still falls through to the
                # SWF-stem search in _do_record.
                return binary, "Lightspark"
            log.error("Lightspark not found. Falling back to Ruffle.")
        return find_binary(RUFFLE_CANDIDATES), "Ruffle"

    @staticmethod
    def _drain_pipe(stream, tail: bytearray, limit: int = 8192) -> None:
        """Read `stream` until EOF, keeping only the last `limit` bytes in `tail`."""
        try:
            for chunk in iter(lambda: stream.read1(4096), b""):
                tail += chunk
                del tail[:-limit]
        except (OSError, ValueError):
            # The pipe was closed underneath us; nothing more to read.
            pass

    @staticmethod
    def _stop_ffmpeg(proc) -> None:
        """Ask FFmpeg to finalize the MP4 ('q' on stdin); kill it if that fails."""
        try:
            if proc.poll() is None and proc.stdin:
                proc.stdin.write(b"q")
                proc.stdin.flush()
            proc.wait(timeout=10)
        except (OSError, ValueError, subprocess.TimeoutExpired):
            proc.kill()
            with contextlib.suppress(subprocess.TimeoutExpired):
                proc.wait(timeout=5)  # reap the killed process
        finally:
            if proc.stdin:
                with contextlib.suppress(OSError, ValueError):
                    proc.stdin.close()

    @staticmethod
    def _stop_process(proc) -> None:
        """Terminate a child process, escalating to kill after 5 seconds."""
        try:
            proc.terminate()
            proc.wait(timeout=5)
        except (OSError, subprocess.TimeoutExpired):
            proc.kill()
            with contextlib.suppress(subprocess.TimeoutExpired):
                proc.wait(timeout=5)  # reap the killed process

    def _finalize_output(
        self,
        raw_path: Path,
        final_path: Path,
        use_freeze_postprocess: bool,
    ) -> None:
        """Produce `final_path` from `raw_path`, trimming at a freeze if enabled."""
        if use_freeze_postprocess and raw_path.exists():
            log.info("Running freeze detection post-process...")
            freeze_t = StillnessDetector.find_freeze_start(
                str(raw_path),
                min_duration=3.0,
            )
            # NOTE(review): a freeze starting at exactly 0.0 is treated as
            # "no freeze" by this truthiness test - confirm that is intended.
            if freeze_t:
                StillnessDetector.trim_at_freeze(
                    str(raw_path),
                    str(final_path),
                    freeze_t,
                    grace=self.loop_grace,
                )
                raw_path.unlink(missing_ok=True)
            else:
                log.info("No freeze detected; using full recording.")
                # The temp dir and the output dir may sit on different
                # filesystems, where a plain rename fails.
                shutil.move(str(raw_path), str(final_path))
        elif not use_freeze_postprocess and raw_path != final_path:
            shutil.move(str(raw_path), str(final_path))

    def _do_record(
        self,
        raw_path: Path,
        final_path: Path,
        clicks: list,
        use_freeze_postprocess: bool,
    ) -> bool:
        """Launch the emulator, capture it with FFmpeg, then finalize the output.

        FFmpeg writes to `raw_path`; the finished video ends up at
        `final_path`. Child processes and helper controllers are stopped on
        every exit path, including exceptions. Returns True when `final_path`
        exists and is non-empty.
        """
        # --- Find emulator ---
        binary, window_title = self._resolve_emulator()
        if not binary:
            log.error(
                "No Flash emulator found. Install Ruffle:\n"
                " https://github.com/ruffle-rs/ruffle/releases\n"
                " Place the binary at ~/.local/bin/ruffle"
            )
            return False

        env = os.environ.copy()
        env.setdefault("DISPLAY", ":0")
        display = env["DISPLAY"]

        ffmpeg_tail = bytearray()  # last bytes of FFmpeg's stderr, for diagnostics
        drain_thread = None

        # Callbacks run in reverse registration order on exit: loop detector,
        # click injector, FFmpeg, then the emulator.
        with contextlib.ExitStack() as cleanup:
            # --- Launch emulator ---
            emulator_cmd = [binary, str(self.swf_path)]
            log.info(f"Launching: {' '.join(emulator_cmd)}")
            emulator_proc = subprocess.Popen(
                emulator_cmd,
                env=env,
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
            )
            cleanup.callback(self._stop_process, emulator_proc)

            # --- Find window ---
            window_id = find_window_id(window_title, timeout=15.0)
            if not window_id:
                window_id = find_window_id(self.swf_path.stem, timeout=5.0)
            if not window_id:
                log.error("Could not find emulator window. Is a display available?")
                return False

            # --- Wait for startup ---
            log.info(f"Waiting {self.startup_delay}s for emulator startup...")
            time.sleep(self.startup_delay)

            # Get window geometry for FFmpeg
            geo = get_window_geometry(window_id)
            if geo:
                # libx264 with yuv420p rejects odd dimensions, so crop at
                # most one pixel off each axis.
                width = max(2, geo["width"] - geo["width"] % 2)
                height = max(2, geo["height"] - geo["height"] % 2)
                capture_size = f"{width}x{height}"
                capture_offset = f"{geo['x']},{geo['y']}"
            else:
                capture_size = self.window_size
                capture_offset = "0,0"

            # --- Start FFmpeg recording ---
            ffmpeg_cmd = self._build_ffmpeg_cmd(
                display=display,
                capture_size=capture_size,
                capture_offset=capture_offset,
                output_path=raw_path,
            )
            log.info(f"FFmpeg command: {' '.join(ffmpeg_cmd)}")
            ffmpeg_proc = subprocess.Popen(
                ffmpeg_cmd,
                stdin=subprocess.PIPE,
                stdout=subprocess.DEVNULL,
                stderr=subprocess.PIPE,
            )
            cleanup.callback(self._stop_ffmpeg, ffmpeg_proc)
            recording_start = time.monotonic()

            # An unread stderr pipe eventually fills up and blocks FFmpeg, so
            # keep draining it in the background.
            drain_thread = threading.Thread(
                target=self._drain_pipe,
                args=(ffmpeg_proc.stderr, ffmpeg_tail),
                daemon=True,
            )
            drain_thread.start()

            # --- Start click injector ---
            if clicks:
                interaction_ctrl = InteractionController(window_id=window_id, clicks=clicks)
                interaction_ctrl.start(recording_start_time=recording_start)
                cleanup.callback(interaction_ctrl.stop)

            # --- Start loop detector (hash-based) ---
            loop_detector = None
            if self.loop_detection == "hash" and self.end_seconds is None:
                loop_detector = LoopDetector(window_id=window_id)
                loop_thread = threading.Thread(
                    target=loop_detector.start, daemon=True
                )
                loop_thread.start()
                cleanup.callback(loop_detector.stop)

            # --- Monitor recording ---
            stop_reason = self._monitor_recording(
                emulator_proc=emulator_proc,
                ffmpeg_proc=ffmpeg_proc,
                loop_detector=loop_detector,
                recording_start=recording_start,
            )
            # Measure now so post-processing time is not counted as recording.
            duration = time.monotonic() - recording_start

            # --- Stop everything (performed by the ExitStack on exit) ---
            log.info(f"Stopping recording. Reason: {stop_reason}")

        # FFmpeg has exited, so its stderr reaches EOF and the drain finishes.
        if drain_thread is not None:
            drain_thread.join(timeout=2.0)

        # --- Post-process: freeze detection trim ---
        self._finalize_output(raw_path, final_path, use_freeze_postprocess)

        success = final_path.exists() and final_path.stat().st_size > 0
        if success:
            log.info(f"Recording complete: {final_path} ({duration:.1f}s recorded)")
        else:
            log.error(f"Output file missing or empty: {final_path}")
            if ffmpeg_tail:
                log.error(
                    "FFmpeg stderr (tail):\n%s",
                    ffmpeg_tail.decode("utf-8", "replace"),
                )
        return success

    def _monitor_recording(
        self,
        emulator_proc,
        ffmpeg_proc,
        loop_detector: Optional[LoopDetector],
        recording_start: float,
    ) -> str:
        """
        Block until we decide to stop recording.

        Checks, every 0.2 s and in this order: the configured end_seconds,
        max_duration, emulator exit, FFmpeg exit, and a detected loop
        followed by the loop_grace period.

        Returns a string describing why.
        """
        loop_stop_at = None

        while True:
            elapsed = time.monotonic() - recording_start

            # Hard stop: configured end_seconds
            if self.end_seconds is not None and elapsed >= self.end_seconds:
                return f"end_seconds={self.end_seconds}"

            # Hard stop: max_duration
            if elapsed >= self.max_duration:
                return f"max_duration={self.max_duration}"

            # Emulator crashed
            if emulator_proc.poll() is not None:
                return "emulator_exited"

            # FFmpeg crashed
            if ffmpeg_proc.poll() is not None:
                return "ffmpeg_exited"

            # Loop detection
            if loop_detector and loop_detector.loop_detected():
                if loop_stop_at is None:
                    # First sighting: start the grace countdown.
                    loop_stop_at = time.monotonic() + self.loop_grace
                    log.info(f"Loop confirmed; will stop in {self.loop_grace}s at t={elapsed + self.loop_grace:.1f}s")
                if time.monotonic() >= loop_stop_at:
                    return f"loop_detected+{self.loop_grace}s_grace"

            time.sleep(0.2)

    def _build_ffmpeg_cmd(
        self,
        display: str,
        capture_size: str,
        capture_offset: str,
        output_path: Path,
    ) -> list:
        """Build the FFmpeg command for x11grab capture.

        `capture_size` is "WIDTHxHEIGHT" and `capture_offset` is "X,Y"; the
        PulseAudio input and AAC encoding are included only when audio is
        enabled.
        """
        cmd = [
            "ffmpeg", "-y",
            # Input: X11 screen capture
            "-f", "x11grab",
            "-framerate", str(self.fps),
            "-video_size", capture_size,
            "-i", f"{display}+{capture_offset}",
        ]

        if self.audio:
            cmd += [
                # Input: PulseAudio capture
                "-f", "pulse",
                "-i", "default",
            ]

        cmd += [
            # Video encoding
            "-c:v", "libx264",
            "-preset", "fast",
            "-crf", str(self.crf),
            "-pix_fmt", "yuv420p",
        ]

        if self.audio:
            cmd += ["-c:a", "aac", "-b:a", "128k"]
        else:
            cmd += ["-an"]

        cmd += [str(output_path)]
        return cmd