From 352c8e0b735998603a7cfd91e6093fb9f991264e Mon Sep 17 00:00:00 2001 From: Brian Rogers Date: Tue, 24 Mar 2026 18:11:39 -0600 Subject: [PATCH] initial commit --- README.md | 202 ++++++++++++++++++++++ config_manager.py | 192 +++++++++++++++++++++ convert.py | 209 +++++++++++++++++++++++ install.sh | 162 ++++++++++++++++++ interaction.py | 178 +++++++++++++++++++ loop_detector.py | 232 +++++++++++++++++++++++++ map_interactions.py | 308 +++++++++++++++++++++++++++++++++ recorder.py | 390 ++++++++++++++++++++++++++++++++++++++++++ swf_config.json | 102 +++++++++++ swf_inspector.py | 194 +++++++++++++++++++++ tests.py | 407 ++++++++++++++++++++++++++++++++++++++++++++ 11 files changed, 2576 insertions(+) create mode 100644 README.md create mode 100644 config_manager.py create mode 100644 convert.py create mode 100644 install.sh create mode 100644 interaction.py create mode 100644 loop_detector.py create mode 100644 map_interactions.py create mode 100644 recorder.py create mode 100644 swf_config.json create mode 100644 swf_inspector.py create mode 100644 tests.py diff --git a/README.md b/README.md new file mode 100644 index 0000000..9a53de6 --- /dev/null +++ b/README.md @@ -0,0 +1,202 @@ +# SWF → MP4 Converter + +Converts SWF (Flash) files to MP4 videos on Fedora, with: +- **Automatic loop detection** — stops recording after 5 seconds of a repeated loop +- **Interactive branch capture** — records separate MP4s for each clickable path +- **Batch processing** — handles entire directories of SWF files +- **GUI interaction mapper** — click-record tool to find interaction coordinates + +--- + +## File Structure + +``` +swf_converter/ +├── convert.py # Main entry point — run this +├── recorder.py # FFmpeg + Ruffle orchestration +├── loop_detector.py # Frame-hash loop detection + FFmpeg freeze detection +├── interaction.py # xdotool click injection + interaction recording +├── config_manager.py # JSON config loading and validation +├── swf_inspector.py # Reads SWF binary headers (FPS, frame count, dimensions) +├── map_interactions.py # GUI tool to record interaction coordinates +├── install.sh # Fedora dependency installer +├── tests.py # Unit tests +└── configs/ + └── swf_config.json # Per-file settings and interaction definitions +``` + +--- + +## Installation + +```bash +bash install.sh +``` + +This installs: `ffmpeg`, `xdotool`, `ImageMagick`, `scrot`, `python3-tkinter`, +`Xvfb`, and downloads the latest **Ruffle** binary to `~/.local/bin/ruffle`. + +--- + +## Quick Start + +### 1. Inspect your SWF files + +```bash +python3 convert.py --inspect *.swf +``` + +Shows version, FPS, frame count, dimensions, and estimated duration for each file. + +### 2. Generate a starter config + +```bash +python3 convert.py --generate-config *.swf +``` + +Creates `configs/swf_config.json` pre-populated with each SWF's metadata. +Edit it to add `end_seconds` and interaction points. + +### 3. Map interaction points (optional) + +For interactive SWFs, use the GUI tool to record where/when clicks happen: + +```bash +python3 map_interactions.py my_interactive.swf +``` + +1. Click **Launch & Start** — opens the SWF in Ruffle and starts a timer +2. Watch the SWF — when an interactive moment appears, click **Record Click** +3. Coordinates are auto-detected from your mouse position +4. Click **Save Config** — writes interactions to `configs/swf_config.json` + +### 4. Convert + +```bash +# Single file +python3 convert.py my.swf + +# Directory of SWF files +python3 convert.py ./swf_files/ -o ./output/ + +# Dry run (shows what would happen without recording) +python3 convert.py ./swf_files/ --dry-run + +# Override loop grace period +python3 convert.py *.swf --loop-grace 8.0 +``` + +--- + +## Output Structure + +For a SWF called `lesson.swf` with two interaction points: + +``` +output/ +├── lesson_base.mp4 ← Linear content, no clicks +├── lesson_button_yes.mp4 ← Branch: click "Yes" at t=12.5s +└── lesson_button_no.mp4 ← Branch: click "No" at t=12.5s +``` + +--- + +## Configuration Reference + +```json +{ + "_defaults": { + "emulator": "ruffle", // "ruffle" or "lightspark" + "loop_detection": "hash", // "hash", "freeze", or "none" + "loop_grace": 5.0, // Seconds after loop detected before stopping + "max_duration": 600.0, // Hard cap in seconds + "fps": 30, // Capture frame rate + "crf": 18, // FFmpeg quality (0–51, lower=better) + "audio": true, // Capture audio via PulseAudio + "startup_delay": 3.0 // Wait after launching emulator + }, + + "my_animation.swf": { + "end_seconds": 42.0, // Force-stop at this timestamp + "interactions": [ + { + "t": 12.5, // Seconds from SWF start + "x": 320, // X coordinate in window + "y": 240, // Y coordinate in window + "label": "button_yes", // Output filename suffix + "button": 1, // 1=left, 2=middle, 3=right + "double": false // Double-click? + } + ] + } +} +``` + +--- + +## Loop Detection Methods + +| Method | How it works | Best for | +|---------|-------------|----------| +| `hash` | Compares MD5 hashes of downsampled screenshots every 0.4s. Detects repeated frame sequences. | Most animations | +| `freeze` | Runs FFmpeg `freezedetect` on the raw recording as post-processing. Catches visual stills. | Animations that fade to a static frame | +| `none` | No loop detection — relies on `end_seconds` or `max_duration`. | When you know exact duration | + +All methods add a configurable **grace period** (default 5 seconds) after detection, +so the loop point itself is visible in the output. + +--- + +## Headless / Server Use + +If running without a desktop display: + +```bash +# Start virtual display +Xvfb :99 -screen 0 1024x768x24 & +export DISPLAY=:99 + +# Start virtual audio sink +pulseaudio --start +pactl load-module module-null-sink sink_name=virtual + +# Convert +python3 convert.py ./swf_files/ -o ./output/ +``` + +--- + +## Running Tests + +```bash +python3 -m pytest tests.py -v +# or +python3 tests.py +``` + +--- + +## Troubleshooting + +**"No Flash emulator found"** +→ Download Ruffle from https://github.com/ruffle-rs/ruffle/releases and place at `~/.local/bin/ruffle` + +**"Could not find emulator window"** +→ Ensure a display is available (`echo $DISPLAY` should return `:0` or `:99`) +→ Try increasing `startup_delay` in config (some SWFs load slowly) + +**Audio missing from output** +→ Ensure PulseAudio is running: `pulseaudio --start` +→ Or disable audio: set `"audio": false` in config + +**Loop not detected** +→ Try `"loop_detection": "freeze"` for animations that end on a static frame +→ Or set `"end_seconds"` manually after inspecting the SWF + +**SWF doesn't work in Ruffle** +→ Set `"emulator": "lightspark"` in config for that file +→ Install Lightspark: `sudo dnf install lightspark` + +**xdotool clicks not registering** +→ Check window focus — the Ruffle window must be in the foreground +→ Use `map_interactions.py` to verify coordinates match what you see on screen diff --git a/config_manager.py b/config_manager.py new file mode 100644 index 0000000..19cb0bc --- /dev/null +++ b/config_manager.py @@ -0,0 +1,192 @@ +#!/usr/bin/env python3 +""" +Config Manager - Loads, validates, and merges per-SWF configuration. +""" + +import json +import logging +from pathlib import Path +from typing import Optional + +log = logging.getLogger("config_manager") + +# Schema documentation for config files +CONFIG_SCHEMA = { + # Per-file keys (all optional — omit to use global defaults) + "emulator": "str: 'ruffle' or 'lightspark'", + "loop_detection": "str: 'hash' | 'freeze' | 'none'", + "loop_grace": "float: seconds to continue after loop detected (default 5.0)", + "max_duration": "float: hard cap in seconds (default 600)", + "end_seconds": "float: stop at this timestamp (overrides loop detection)", + "end_frame": "int: stop at this frame number (overrides loop detection)", + "fps": "int: capture FPS (default 30)", + "crf": "int: FFmpeg quality 0–51, lower=better (default 18)", + "window_size": "str: e.g. '800x600' fallback if geometry detection fails", + "startup_delay": "float: seconds to wait after launching emulator (default 3.0)", + "audio": "bool: capture audio (default true)", + "interactions": [ + { + "t": "float: seconds from SWF start to fire click", + "x": "int: X coordinate in window", + "y": "int: Y coordinate in window", + "label": "str: output file suffix (e.g. 'button_yes')", + "button": "int: mouse button 1=left 2=mid 3=right (default 1)", + "double": "bool: double-click (default false)", + } + ], +} + +DEFAULT_CONFIG = { + "emulator": "ruffle", + "loop_detection": "hash", + "loop_grace": 5.0, + "max_duration": 600.0, + "fps": 30, + "crf": 18, + "window_size": "800x600", + "startup_delay": 3.0, + "audio": True, + "interactions": [], +} + + +class ConfigManager: + """ + Loads a JSON config file mapping SWF filenames to per-file settings. + + Config file format: + { + "_defaults": { ... global defaults ... }, + "my_animation.swf": { + "end_seconds": 42, + "interactions": [ + { "t": 12.5, "x": 320, "y": 240, "label": "button_yes" } + ] + }, + "menu.swf": { + "loop_detection": "none", + "end_seconds": 30 + } + } + """ + + def __init__(self, config_path: str = "configs/swf_config.json"): + self.config_path = Path(config_path) + self._data: dict = {} + self._load() + + def _load(self): + if self.config_path.exists(): + try: + with open(self.config_path) as f: + self._data = json.load(f) + log.info(f"Loaded config from {self.config_path}") + except json.JSONDecodeError as e: + log.error(f"Config JSON parse error: {e}. Using defaults.") + self._data = {} + else: + log.info(f"Config file not found ({self.config_path}). Using defaults for all files.") + self._data = {} + + def get(self, swf_filename: str, global_overrides: Optional[dict] = None) -> dict: + """ + Return the merged config for a SWF file. + + Priority (highest first): + 1. Per-file config in JSON + 2. _defaults section in JSON + 3. global_overrides (from CLI args) + 4. DEFAULT_CONFIG (hardcoded fallbacks) + """ + cfg = DEFAULT_CONFIG.copy() + + if global_overrides: + cfg.update({k: v for k, v in global_overrides.items() if v is not None}) + + file_defaults = self._data.get("_defaults", {}) + cfg.update(file_defaults) + + file_cfg = self._data.get(swf_filename, {}) + cfg.update(file_cfg) + + self._validate(cfg, swf_filename) + return cfg + + def _validate(self, cfg: dict, swf_filename: str): + """Log warnings for obviously wrong config values.""" + if cfg.get("crf") is not None and not (0 <= cfg["crf"] <= 51): + log.warning(f"{swf_filename}: crf={cfg['crf']} is out of range 0–51.") + if cfg.get("fps") is not None and not (1 <= cfg["fps"] <= 120): + log.warning(f"{swf_filename}: fps={cfg['fps']} is unusual.") + for i, interaction in enumerate(cfg.get("interactions", [])): + for required in ("t", "x", "y"): + if required not in interaction: + log.warning( + f"{swf_filename}: interaction[{i}] missing required key '{required}'." + ) + + def generate_starter(self, swf_files: list, inspector=None) -> dict: + """ + Generate a starter config file for the given SWF files. + Optionally uses SWFInspector to pre-populate FPS and estimated duration. + """ + config = { + "_defaults": { + "loop_detection": "hash", + "loop_grace": 5.0, + "max_duration": 600.0, + "fps": 30, + "audio": True, + }, + "_schema": CONFIG_SCHEMA, + } + + for swf in swf_files: + entry = {} + if inspector: + info = inspector.inspect(swf) + if info.get("fps"): + entry["fps"] = info["fps"] + if info.get("frame_count"): + estimated_seconds = round(info["frame_count"] / max(info.get("fps", 24), 1), 1) + entry["_estimated_duration_seconds"] = estimated_seconds + entry["_comment"] = ( + f"SWF: {info.get('frame_count')} frames @ {info.get('fps')} fps " + f"≈ {estimated_seconds}s. Set end_seconds to control stop time." + ) + entry["interactions"] = [ + { + "_comment": "Add click interactions here. Example:", + "t": 10.0, + "x": 400, + "y": 300, + "label": "example_click", + } + ] + config[swf.name] = entry + + self.config_path.parent.mkdir(parents=True, exist_ok=True) + with open(self.config_path, "w") as f: + json.dump(config, f, indent=2) + + log.info(f"Starter config written to {self.config_path}") + return config + + def save(self): + with open(self.config_path, "w") as f: + json.dump(self._data, f, indent=2) + log.info(f"Config saved to {self.config_path}") + + def set_file_config(self, swf_filename: str, key: str, value): + """Programmatically update a single config key for a SWF file.""" + if swf_filename not in self._data: + self._data[swf_filename] = {} + self._data[swf_filename][key] = value + + def add_interaction(self, swf_filename: str, interaction: dict): + """Append an interaction entry for a SWF file.""" + if swf_filename not in self._data: + self._data[swf_filename] = {} + if "interactions" not in self._data[swf_filename]: + self._data[swf_filename]["interactions"] = [] + self._data[swf_filename]["interactions"].append(interaction) diff --git a/convert.py b/convert.py new file mode 100644 index 0000000..e44e028 --- /dev/null +++ b/convert.py @@ -0,0 +1,209 @@ +#!/usr/bin/env python3 +""" +SWF to MP4 Converter - Main Orchestrator +Handles loop detection and interactive branch capture. +""" + +import argparse +import json +import logging +import os +import sys +import time +from pathlib import Path + +from recorder import Recorder +from loop_detector import LoopDetector +from interaction import InteractionController +from config_manager import ConfigManager +from swf_inspector import SWFInspector + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", + handlers=[ + logging.StreamHandler(sys.stdout), + logging.FileHandler("logs/convert.log"), + ], +) +log = logging.getLogger("convert") + + +def convert_swf(swf_path: Path, cfg: dict, output_dir: Path, dry_run: bool = False): + """Convert a single SWF file, including all interactive branches.""" + swf_name = swf_path.stem + log.info(f"=== Converting: {swf_path.name} ===") + + recorder = Recorder(swf_path, cfg) + interactions = cfg.get("interactions", []) + + results = [] + + # --- Pass 1: Base recording (no clicks) --- + base_output = output_dir / f"{swf_name}_base.mp4" + log.info(f" Pass 1/{ 1 + len(interactions) }: Base recording → {base_output.name}") + if not dry_run: + success = recorder.record( + output_path=base_output, + clicks=[], + ) + if success: + results.append({"label": "base", "file": str(base_output)}) + log.info(f" ✓ Base recording saved: {base_output.name}") + else: + log.error(f" ✗ Base recording failed for {swf_path.name}") + else: + log.info(f" [DRY RUN] Would record base pass → {base_output.name}") + results.append({"label": "base", "file": str(base_output), "dry_run": True}) + + # --- Pass N: One recording per interaction point --- + for i, interaction in enumerate(interactions, 1): + label = interaction.get("label", f"interaction_{i}") + branch_output = output_dir / f"{swf_name}_{label}.mp4" + log.info( + f" Pass {i + 1}/{ 1 + len(interactions) }: Branch '{label}' " + f"(click at t={interaction['t']}s, x={interaction['x']}, y={interaction['y']}) " + f"→ {branch_output.name}" + ) + if not dry_run: + success = recorder.record( + output_path=branch_output, + clicks=[interaction], + ) + if success: + results.append({"label": label, "file": str(branch_output)}) + log.info(f" ✓ Branch '{label}' saved: {branch_output.name}") + else: + log.error(f" ✗ Branch '{label}' failed for {swf_path.name}") + else: + log.info(f" [DRY RUN] Would record branch '{label}' → {branch_output.name}") + results.append({"label": label, "file": str(branch_output), "dry_run": True}) + + return results + + +def main(): + parser = argparse.ArgumentParser( + description="Convert SWF files to MP4 with loop detection and interaction capture." + ) + parser.add_argument( + "input", + nargs="+", + help="SWF file(s) or directory containing SWF files", + ) + parser.add_argument( + "-o", "--output", + default="output", + help="Output directory for MP4 files (default: output/)", + ) + parser.add_argument( + "-c", "--config", + default="configs/swf_config.json", + help="Path to JSON config file (default: configs/swf_config.json)", + ) + parser.add_argument( + "--inspect", + action="store_true", + help="Inspect SWF files and print metadata without converting", + ) + parser.add_argument( + "--generate-config", + action="store_true", + help="Generate a starter config file from the given SWF files", + ) + parser.add_argument( + "--dry-run", + action="store_true", + help="Show what would be done without actually recording", + ) + parser.add_argument( + "--loop-grace", + type=float, + default=5.0, + help="Seconds to continue recording after a loop is detected (default: 5.0)", + ) + parser.add_argument( + "--max-duration", + type=float, + default=600.0, + help="Hard cap on recording duration in seconds (default: 600)", + ) + args = parser.parse_args() + + # Collect SWF files + swf_files = [] + for inp in args.input: + p = Path(inp) + if p.is_dir(): + swf_files.extend(sorted(p.glob("**/*.swf"))) + elif p.suffix.lower() == ".swf": + swf_files.append(p) + else: + log.warning(f"Skipping non-SWF file: {p}") + + if not swf_files: + log.error("No SWF files found.") + sys.exit(1) + + log.info(f"Found {len(swf_files)} SWF file(s).") + + # Inspect mode + if args.inspect: + inspector = SWFInspector() + for swf in swf_files: + info = inspector.inspect(swf) + print(json.dumps({str(swf): info}, indent=2)) + return + + # Generate config mode + if args.generate_config: + cfg_mgr = ConfigManager(args.config) + inspector = SWFInspector() + cfg_mgr.generate_starter(swf_files, inspector) + print(f"Starter config written to: {args.config}") + print("Edit it to add interaction points, then run without --generate-config.") + return + + # Load config + cfg_mgr = ConfigManager(args.config) + global_defaults = { + "loop_grace": args.loop_grace, + "max_duration": args.max_duration, + } + + output_dir = Path(args.output) + output_dir.mkdir(parents=True, exist_ok=True) + + # Convert each file + all_results = {} + for swf_path in swf_files: + cfg = cfg_mgr.get(swf_path.name, global_defaults) + try: + results = convert_swf(swf_path, cfg, output_dir, dry_run=args.dry_run) + all_results[swf_path.name] = results + except Exception as e: + log.exception(f"Unexpected error converting {swf_path.name}: {e}") + all_results[swf_path.name] = {"error": str(e)} + + # Summary + print("\n" + "=" * 60) + print("CONVERSION SUMMARY") + print("=" * 60) + for swf_name, results in all_results.items(): + print(f"\n{swf_name}:") + if isinstance(results, list): + for r in results: + status = "[DRY RUN]" if r.get("dry_run") else "✓" + print(f" {status} [{r['label']}] → {r['file']}") + else: + print(f" ✗ ERROR: {results.get('error')}") + + # Write results JSON + results_path = output_dir / "conversion_results.json" + with open(results_path, "w") as f: + json.dump(all_results, f, indent=2) + log.info(f"Results written to {results_path}") + + +if __name__ == "__main__": + main() diff --git a/install.sh b/install.sh new file mode 100644 index 0000000..f01ab35 --- /dev/null +++ b/install.sh @@ -0,0 +1,162 @@ +#!/usr/bin/env bash +# ============================================================================= +# SWF Converter — Fedora Setup Script +# ============================================================================= +# Run with: bash install.sh +# ============================================================================= +set -euo pipefail + +RED='\033[0;31m' +GREEN='\033[0;32m' +YELLOW='\033[1;33m' +CYAN='\033[0;36m' +BOLD='\033[1m' +NC='\033[0m' + +info() { echo -e "${CYAN}[INFO]${NC} $*"; } +success() { echo -e "${GREEN}[OK]${NC} $*"; } +warn() { echo -e "${YELLOW}[WARN]${NC} $*"; } +error() { echo -e "${RED}[ERR]${NC} $*"; } + +echo -e "\n${BOLD}SWF → MP4 Converter Setup (Fedora)${NC}" +echo "==================================================" + +# --- System dependencies --- +info "Installing system packages via dnf..." +sudo dnf install -y \ + ffmpeg \ + xdotool \ + ImageMagick \ + scrot \ + python3 \ + python3-pip \ + python3-tkinter \ + xorg-x11-server-Xvfb + +success "System packages installed." + +# --- Python dependencies --- +info "Installing Python packages..." +pip3 install --user pillow + +success "Python packages installed." + +# --- Ruffle --- +echo "" +info "Checking for Ruffle..." + +RUFFLE_BIN="$HOME/.local/bin/ruffle" +if command -v ruffle &>/dev/null || [ -f "$RUFFLE_BIN" ]; then + success "Ruffle found: $(command -v ruffle 2>/dev/null || echo $RUFFLE_BIN)" +else + warn "Ruffle not found. Attempting to download latest release..." + mkdir -p "$HOME/.local/bin" + + # Detect arch + ARCH=$(uname -m) + case "$ARCH" in + x86_64) RUFFLE_ARCH="x86_64-linux" ;; + aarch64) RUFFLE_ARCH="aarch64-linux" ;; + *) + error "Unknown architecture: $ARCH" + echo "Download Ruffle manually from: https://github.com/ruffle-rs/ruffle/releases" + echo "Place the binary at: $RUFFLE_BIN" + RUFFLE_ARCH="" + ;; + esac + + if [ -n "$RUFFLE_ARCH" ]; then + # Get latest release URL + RELEASE_URL=$(curl -s https://api.github.com/repos/ruffle-rs/ruffle/releases/latest \ + | grep "browser_download_url" \ + | grep "$RUFFLE_ARCH" \ + | grep -v ".sha256" \ + | head -1 \ + | cut -d '"' -f 4) + + if [ -n "$RELEASE_URL" ]; then + info "Downloading: $RELEASE_URL" + TMP_DIR=$(mktemp -d) + curl -L "$RELEASE_URL" -o "$TMP_DIR/ruffle.tar.gz" + tar -xzf "$TMP_DIR/ruffle.tar.gz" -C "$TMP_DIR" + RUFFLE_EXE=$(find "$TMP_DIR" -name "ruffle" -type f | head -1) + if [ -n "$RUFFLE_EXE" ]; then + cp "$RUFFLE_EXE" "$RUFFLE_BIN" + chmod +x "$RUFFLE_BIN" + rm -rf "$TMP_DIR" + success "Ruffle installed to $RUFFLE_BIN" + else + error "Could not find ruffle binary in archive." + echo "Download manually: https://github.com/ruffle-rs/ruffle/releases" + fi + else + error "Could not find Ruffle release for $RUFFLE_ARCH." + echo "Download manually: https://github.com/ruffle-rs/ruffle/releases" + fi + fi +fi + +# Ensure ~/.local/bin is on PATH +if [[ ":$PATH:" != *":$HOME/.local/bin:"* ]]; then + warn "~/.local/bin is not in your PATH." + echo " Add this to your ~/.bashrc or ~/.zshrc:" + echo ' export PATH="$HOME/.local/bin:$PATH"' +fi + +# --- Lightspark (optional fallback) --- +echo "" +info "Lightspark (optional fallback emulator)..." +if command -v lightspark &>/dev/null; then + success "Lightspark found." +else + warn "Lightspark not found. Optional — only needed if Ruffle can't run a SWF." + echo " To install: sudo dnf install lightspark" + echo " Or build from source: https://github.com/lightspark/lightspark" +fi + +# --- Verify ffmpeg capabilities --- +echo "" +info "Verifying FFmpeg x11grab support..." +if ffmpeg -f x11grab -i :0 -t 0.1 /dev/null 2>&1 | grep -q "x11grab"; then + warn "x11grab reported an issue — ensure a display (:0) is available when recording." +else + success "FFmpeg x11grab available." +fi + +# --- Verify PulseAudio --- +info "Verifying PulseAudio..." +if pactl info &>/dev/null; then + success "PulseAudio running." +else + warn "PulseAudio not running. Audio recording may fail." + echo " Start with: pulseaudio --start" + echo " Or use '--no-audio' flag when converting (set audio: false in config)." +fi + +# --- Virtual display setup (for headless/server use) --- +echo "" +info "Virtual display (Xvfb) — for headless environments only..." +echo " If running on a server without a display, start Xvfb with:" +echo " Xvfb :99 -screen 0 1024x768x24 &" +echo " export DISPLAY=:99" +echo " Then run conversions normally." + +# --- Final check --- +echo "" +echo -e "${BOLD}Setup complete! Quick-start:${NC}" +echo "" +echo " # Inspect a SWF file:" +echo " python3 convert.py --inspect my.swf" +echo "" +echo " # Generate a config template:" +echo " python3 convert.py --generate-config *.swf" +echo "" +echo " # Map interaction points interactively:" +echo " python3 map_interactions.py my.swf" +echo "" +echo " # Convert a single SWF:" +echo " python3 convert.py my.swf" +echo "" +echo " # Convert a directory of SWFs:" +echo " python3 convert.py ./swf_files/ -o ./output/" +echo "" diff --git a/interaction.py b/interaction.py new file mode 100644 index 0000000..f87ea43 --- /dev/null +++ b/interaction.py @@ -0,0 +1,178 @@ +#!/usr/bin/env python3 +""" +Interaction Controller - Injects mouse clicks into a running Ruffle window +at specified timestamps using xdotool. +""" + +import logging +import subprocess +import threading +import time +from dataclasses import dataclass +from typing import List, Optional + +log = logging.getLogger("interaction") + + +@dataclass +class ClickEvent: + """A single click interaction to perform.""" + t: float # Time (seconds from start of recording) to fire this click + x: int # X coordinate relative to the window + y: int # Y coordinate relative to the window + label: str = "" # Human-readable label + button: int = 1 # Mouse button (1=left, 2=middle, 3=right) + double: bool = False # Whether to double-click + + +class InteractionController: + """ + Schedules and dispatches click events into a running Ruffle window. + + Usage + ----- + ctrl = InteractionController(window_id="0x1234abc", clicks=[...]) + ctrl.start(recording_start_time=time.monotonic()) + # ... recording happens ... + ctrl.stop() + """ + + def __init__(self, window_id: str, clicks: List[dict]): + self.window_id = window_id + self.clicks = [ + ClickEvent( + t=c["t"], + x=c["x"], + y=c["y"], + label=c.get("label", ""), + button=c.get("button", 1), + double=c.get("double", False), + ) + for c in sorted(clicks, key=lambda c: c["t"]) + ] + self._thread: Optional[threading.Thread] = None + self._stop_event = threading.Event() + self._recording_start: float = 0.0 + + def start(self, recording_start_time: float): + """Begin the click scheduler. Call immediately after recording starts.""" + self._recording_start = recording_start_time + self._stop_event.clear() + self._thread = threading.Thread(target=self._schedule_clicks, daemon=True) + self._thread.start() + log.info(f"InteractionController started with {len(self.clicks)} click(s).") + + def stop(self): + """Cancel any pending clicks.""" + self._stop_event.set() + if self._thread: + self._thread.join(timeout=2.0) + + def wait_until_done(self): + """Block until all clicks have been dispatched.""" + if self._thread: + self._thread.join() + + # ------------------------------------------------------------------ + # Internal + # ------------------------------------------------------------------ + + def _schedule_clicks(self): + for click in self.clicks: + if self._stop_event.is_set(): + break + elapsed = time.monotonic() - self._recording_start + wait = click.t - elapsed + if wait > 0: + # Sleep in small increments so we can respond to stop events + deadline = time.monotonic() + wait + while time.monotonic() < deadline: + if self._stop_event.is_set(): + return + time.sleep(min(0.05, deadline - time.monotonic())) + + if not self._stop_event.is_set(): + self._fire_click(click) + + def _fire_click(self, click: ClickEvent): + """Execute a click using xdotool.""" + try: + # Move mouse to position within window + subprocess.run( + [ + "xdotool", "mousemove", + "--window", self.window_id, + str(click.x), str(click.y), + ], + check=True, + timeout=2.0, + ) + time.sleep(0.05) # Brief settle time + + # Fire the click(s) + click_cmd = ["xdotool", "click", "--window", self.window_id, str(click.button)] + if click.double: + subprocess.run(click_cmd, check=True, timeout=2.0) + time.sleep(0.1) + subprocess.run(click_cmd, check=True, timeout=2.0) + + log.info( + f" Click fired: label='{click.label}' " + f"t={click.t:.2f}s x={click.x} y={click.y} " + f"btn={click.button} double={click.double}" + ) + except subprocess.CalledProcessError as e: + log.error(f"xdotool click failed: {e}") + except FileNotFoundError: + log.error("xdotool not found. Install with: sudo dnf install xdotool") + except subprocess.TimeoutExpired: + log.error("xdotool click timed out.") + + +class InteractionMapper: + """ + Helper for discovering interaction points interactively. + Runs Ruffle in the foreground and records the user's clicks + timestamps + so they can be saved to a config file. + """ + + def __init__(self, window_id: str): + self.window_id = window_id + self._recorded: list = [] + self._start_time: float = 0.0 + self._running = False + + def start_recording(self): + """ + Listen for clicks in the Ruffle window via xdotool key event tracking. + Note: This is a best-effort approach — for accurate mapping, use the + interactive GUI tool (gui.py) which shows a live overlay. + """ + self._start_time = time.monotonic() + self._running = True + log.info("Interaction recording started. Press Ctrl+C to stop.") + try: + while self._running: + # Poll for click position using xdotool + result = subprocess.run( + ["xdotool", "getmouselocation", "--window", self.window_id], + capture_output=True, + text=True, + timeout=1.0, + ) + # This is polled, not event-driven — use GUI for real click capture + time.sleep(0.1) + except KeyboardInterrupt: + self._running = False + log.info("Interaction recording stopped.") + + def get_recorded(self) -> list: + return self._recorded + + def record_click_at(self, x: int, y: int, label: str = ""): + """Manually register a click event (called from GUI or external hook).""" + t = time.monotonic() - self._start_time + entry = {"t": round(t, 2), "x": x, "y": y, "label": label or f"click_{len(self._recorded)+1}"} + self._recorded.append(entry) + log.info(f"Recorded interaction: {entry}") + return entry diff --git a/loop_detector.py b/loop_detector.py new file mode 100644 index 0000000..60cff30 --- /dev/null +++ b/loop_detector.py @@ -0,0 +1,232 @@ +#!/usr/bin/env python3 +""" +Loop Detector - Detects when a SWF has entered a repeating loop. + +Strategy: Periodically capture a downsampled screenshot of the Ruffle window, +hash it, and compare against recent hashes. A repeated hash sequence indicates +a loop has started. +""" + +import hashlib +import logging +import subprocess +import time +from collections import deque +from dataclasses import dataclass, field +from typing import Optional + +log = logging.getLogger("loop_detector") + + +@dataclass +class FrameRecord: + timestamp: float + hash: str + + +class LoopDetector: + """ + Detects animation loops by comparing periodic frame hashes. + + Parameters + ---------- + window_id : str + X11 window ID of the Ruffle window (hex string like '0x1234abc'). + poll_interval : float + Seconds between frame captures (default 0.4 — 2-3 samples/sec is enough). + sequence_length : int + Number of consecutive matching frames required to confirm a loop (default 4). + hash_history : int + How many past hashes to retain for comparison (default 300 ≈ 2 min at 0.4s). + thumbnail_size : str + Resolution to downsample frames to before hashing (smaller = faster, less + sensitive to minor rendering differences). + """ + + def __init__( + self, + window_id: str, + poll_interval: float = 0.4, + sequence_length: int = 4, + hash_history: int = 300, + thumbnail_size: str = "64x64", + ): + self.window_id = window_id + self.poll_interval = poll_interval + self.sequence_length = sequence_length + self.thumbnail_size = thumbnail_size + self._history: deque[FrameRecord] = deque(maxlen=hash_history) + self._loop_detected_at: Optional[float] = None + self._running = False + + # ------------------------------------------------------------------ + # Public API + # ------------------------------------------------------------------ + + def start(self): + """Begin background polling (call in a thread).""" + self._running = True + self._poll_loop() + + def stop(self): + self._running = False + + def loop_detected(self) -> bool: + return self._loop_detected_at is not None + + def loop_detected_at(self) -> Optional[float]: + """Wall-clock time when the loop was first detected, or None.""" + return self._loop_detected_at + + def reset(self): + self._history.clear() + self._loop_detected_at = None + + # ------------------------------------------------------------------ + # Internal + # ------------------------------------------------------------------ + + def _poll_loop(self): + while self._running: + h = self._capture_hash() + if h: + now = time.monotonic() + record = FrameRecord(timestamp=now, hash=h) + self._check_for_loop(record) + self._history.append(record) + time.sleep(self.poll_interval) + + def _capture_hash(self) -> Optional[str]: + """ + Screenshot the Ruffle window (downsampled) and return an MD5 hash. + Uses ImageMagick `import` — falls back to `scrot` if unavailable. + """ + try: + result = subprocess.run( + [ + "import", + "-window", self.window_id, + "-resize", self.thumbnail_size, + "-depth", "8", + "png:-", + ], + capture_output=True, + timeout=2.0, + ) + if result.returncode == 0 and result.stdout: + return hashlib.md5(result.stdout).hexdigest() + else: + log.debug(f"import failed: {result.stderr.decode()[:200]}") + return self._capture_hash_scrot() + except FileNotFoundError: + return self._capture_hash_scrot() + except subprocess.TimeoutExpired: + log.warning("Frame capture timed out.") + return None + + def _capture_hash_scrot(self) -> Optional[str]: + """Fallback: use scrot to capture window by ID.""" + try: + import tempfile, os + with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as f: + tmp = f.name + result = subprocess.run( + ["scrot", "--window", self.window_id, tmp], + capture_output=True, + timeout=2.0, + ) + if result.returncode == 0: + # Downsample with ffmpeg then hash + result2 = subprocess.run( + [ + "ffmpeg", "-y", "-i", tmp, + "-vf", f"scale={self.thumbnail_size.replace('x', ':')}", + "-frames:v", "1", "-f", "rawvideo", "-pix_fmt", "rgb24", "pipe:1", + ], + capture_output=True, + timeout=3.0, + ) + os.unlink(tmp) + if result2.returncode == 0 and result2.stdout: + return hashlib.md5(result2.stdout).hexdigest() + return None + except Exception as e: + log.debug(f"scrot fallback failed: {e}") + return None + + def _check_for_loop(self, current: FrameRecord): + if self._loop_detected_at is not None: + return # already detected + + # Include the current (not-yet-appended) record in the full timeline + history_list = list(self._history) + [current] + if len(history_list) < self.sequence_length * 2: + return # not enough history yet + + # Build a sliding window of the last `sequence_length` hashes + recent = [r.hash for r in history_list[-self.sequence_length:]] + + # Search backwards in the earlier portion for a matching sequence + search_range = history_list[: -self.sequence_length] + for i in range(len(search_range) - self.sequence_length + 1): + window = [r.hash for r in search_range[i : i + self.sequence_length]] + if window == recent: + self._loop_detected_at = current.timestamp + log.info( + f"Loop detected at t={current.timestamp:.1f}s " + f"(matched sequence starting at history index {i})" + ) + return + + +class StillnessDetector: + """ + Simpler alternative: detect when the video has been completely static + for a given duration. Uses FFmpeg freezedetect on the recorded file. + + This runs as a post-processing step on a raw recording. + """ + + @staticmethod + def find_freeze_start(video_path: str, noise_tolerance: float = 0.001, min_duration: float = 5.0) -> Optional[float]: + """ + Returns the timestamp (seconds) where a freeze of >= min_duration starts, + or None if no such freeze exists. + """ + result = subprocess.run( + [ + "ffmpeg", "-i", video_path, + "-vf", f"freezedetect=n={noise_tolerance}:d={min_duration}", + "-f", "null", "-", + ], + capture_output=True, + text=True, + timeout=300, + ) + output = result.stderr + for line in output.splitlines(): + if "freeze_start" in line: + try: + t = float(line.split("freeze_start:")[1].strip().split()[0]) + log.info(f"Freeze detected at {t:.2f}s in {video_path}") + return t + except (IndexError, ValueError): + pass + return None + + @staticmethod + def trim_at_freeze(input_path: str, output_path: str, freeze_start: float, grace: float = 5.0): + """Trim a video to end `grace` seconds after the freeze start.""" + end_time = freeze_start + grace + subprocess.run( + [ + "ffmpeg", "-y", + "-i", input_path, + "-t", str(end_time), + "-c:v", "libx264", "-crf", "18", + "-c:a", "aac", + output_path, + ], + check=True, + ) + log.info(f"Trimmed video saved to {output_path} (end={end_time:.1f}s)") diff --git a/map_interactions.py b/map_interactions.py new file mode 100644 index 0000000..bb11e95 --- /dev/null +++ b/map_interactions.py @@ -0,0 +1,308 @@ +#!/usr/bin/env python3 +""" +Interaction Mapper GUI + +A Tkinter-based tool that: +1. Launches a SWF file in Ruffle +2. Shows a transparent overlay with a timer +3. Records every click (timestamp + coordinates) as you interact +4. Saves the interactions to the config file + +Run: + python3 map_interactions.py my_animation.swf +""" + +import argparse +import json +import logging +import subprocess +import sys +import threading +import time +import tkinter as tk +from pathlib import Path +from tkinter import messagebox, simpledialog + +from config_manager import ConfigManager +from recorder import find_binary, find_window_id, get_window_geometry, RUFFLE_CANDIDATES + +log = logging.getLogger("map_interactions") +logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s") + + +class InteractionMapperGUI: + def __init__(self, swf_path: Path, config_path: str): + self.swf_path = swf_path + self.config_mgr = ConfigManager(config_path) + self.recorded_clicks = [] + self.start_time = None + self.emulator_proc = None + self.ruffle_window_id = None + self._running = False + + self.root = tk.Tk() + self.root.title(f"Interaction Mapper — {swf_path.name}") + self.root.configure(bg="#1a1a2e") + self.root.resizable(False, False) + self._build_ui() + + def _build_ui(self): + root = self.root + PAD = 16 + FONT_MONO = ("Courier New", 11) + FONT_LABEL = ("Helvetica", 10) + BG = "#1a1a2e" + FG = "#e2e8f0" + ACCENT = "#7c3aed" + GREEN = "#22c55e" + RED = "#ef4444" + CARD = "#16213e" + + root.configure(bg=BG) + + # Title + tk.Label( + root, text="⚡ SWF Interaction Mapper", font=("Helvetica", 14, "bold"), + bg=BG, fg=ACCENT + ).pack(pady=(PAD, 4)) + + tk.Label( + root, text=f"File: {self.swf_path.name}", font=FONT_LABEL, + bg=BG, fg="#94a3b8" + ).pack() + + # Timer display + timer_frame = tk.Frame(root, bg=CARD, padx=PAD, pady=PAD) + timer_frame.pack(fill="x", padx=PAD, pady=(PAD, 0)) + + tk.Label(timer_frame, text="Elapsed Time", font=FONT_LABEL, bg=CARD, fg="#94a3b8").pack() + self.timer_label = tk.Label( + timer_frame, text="00:00.000", font=("Courier New", 28, "bold"), + bg=CARD, fg=GREEN + ) + self.timer_label.pack() + + # Instructions + instr_frame = tk.Frame(root, bg=CARD, padx=PAD, pady=8) + instr_frame.pack(fill="x", padx=PAD, pady=4) + instructions = ( + "1. Click 'Launch & Start' to open the SWF\n" + "2. Click 'Record Click' each time an interaction fires\n" + "3. Enter coordinates from the Ruffle window\n" + "4. Click 'Save Config' when done" + ) + tk.Label(instr_frame, text=instructions, font=("Helvetica", 9), bg=CARD, fg="#94a3b8", + justify="left").pack(anchor="w") + + # Recorded clicks list + list_frame = tk.Frame(root, bg=CARD, padx=PAD, pady=PAD) + list_frame.pack(fill="both", expand=True, padx=PAD, pady=4) + tk.Label(list_frame, text="Recorded Interactions", font=("Helvetica", 10, "bold"), + bg=CARD, fg=FG).pack(anchor="w") + + self.clicks_listbox = tk.Listbox( + list_frame, font=FONT_MONO, bg="#0f172a", fg=FG, + selectbackground=ACCENT, height=10, width=52, + borderwidth=0, highlightthickness=1, highlightbackground=ACCENT + ) + self.clicks_listbox.pack(fill="both", expand=True, pady=4) + + # Buttons + btn_frame = tk.Frame(root, bg=BG) + btn_frame.pack(fill="x", padx=PAD, pady=PAD) + + def btn(parent, text, cmd, color=ACCENT): + return tk.Button( + parent, text=text, command=cmd, + bg=color, fg="white", font=("Helvetica", 10, "bold"), + relief="flat", padx=12, pady=6, cursor="hand2", + activebackground=color, activeforeground="white", + ) + + btn(btn_frame, "▶ Launch & Start", self._launch_and_start, GREEN).pack(side="left", padx=2) + btn(btn_frame, "⊕ Record Click", self._record_click_dialog, ACCENT).pack(side="left", padx=2) + btn(btn_frame, "✕ Delete Selected", self._delete_selected, RED).pack(side="left", padx=2) + btn(btn_frame, "💾 Save Config", self._save_config, "#0ea5e9").pack(side="right", padx=2) + + self.status_label = tk.Label( + root, text="Ready. Click 'Launch & Start' to begin.", + font=("Helvetica", 9), bg=BG, fg="#94a3b8" + ) + self.status_label.pack(pady=(0, PAD)) + + root.protocol("WM_DELETE_WINDOW", self._on_close) + root.after(100, self._update_timer) + + def _launch_and_start(self): + if self._running: + self._set_status("Already running!") + return + + binary = find_binary(RUFFLE_CANDIDATES) + if not binary: + messagebox.showerror( + "Ruffle Not Found", + "Ruffle binary not found.\n\n" + "Download from: https://github.com/ruffle-rs/ruffle/releases\n" + "Place at: ~/.local/bin/ruffle" + ) + return + + self._set_status(f"Launching {self.swf_path.name}...") + self.emulator_proc = subprocess.Popen( + [binary, str(self.swf_path)], + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + ) + self.start_time = time.monotonic() + self._running = True + self._set_status(f"Running. Record clicks as they happen in Ruffle.") + + def _record_click_dialog(self): + if not self._running: + self._set_status("Launch the SWF first!") + return + + elapsed = time.monotonic() - self.start_time + + # Try to auto-detect mouse position in Ruffle window + auto_x, auto_y = None, None + if self.ruffle_window_id is None: + self.ruffle_window_id = find_window_id("Ruffle", timeout=1.0) + + if self.ruffle_window_id: + try: + result = subprocess.run( + ["xdotool", "getmouselocation", "--shell"], + capture_output=True, text=True, timeout=1.0 + ) + geo = get_window_geometry(self.ruffle_window_id) + if result.returncode == 0 and geo: + d = {} + for line in result.stdout.splitlines(): + if "=" in line: + k, v = line.split("=", 1) + d[k.strip().lower()] = int(v.strip()) + auto_x = d.get("x", 0) - geo.get("x", 0) + auto_y = d.get("y", 0) - geo.get("y", 0) + except Exception: + pass + + # Dialog + dialog = tk.Toplevel(self.root) + dialog.title("Record Interaction") + dialog.configure(bg="#1a1a2e") + dialog.grab_set() + dialog.resizable(False, False) + + tk.Label(dialog, text=f"Time: {elapsed:.2f}s", font=("Courier New", 12, "bold"), + bg="#1a1a2e", fg="#22c55e").pack(pady=(12, 4)) + + fields = {} + for label, default in [ + ("X coordinate", str(auto_x or 0)), + ("Y coordinate", str(auto_y or 0)), + ("Label", f"click_{len(self.recorded_clicks)+1}"), + ]: + f = tk.Frame(dialog, bg="#1a1a2e") + f.pack(fill="x", padx=16, pady=2) + tk.Label(f, text=label, width=14, anchor="w", bg="#1a1a2e", fg="#e2e8f0", + font=("Helvetica", 10)).pack(side="left") + e = tk.Entry(f, bg="#0f172a", fg="#e2e8f0", insertbackground="white", + font=("Courier New", 10), width=20) + e.insert(0, default) + e.pack(side="left", padx=4) + fields[label] = e + + def confirm(): + try: + x = int(fields["X coordinate"].get()) + y = int(fields["Y coordinate"].get()) + label = fields["Label"].get().strip() or f"click_{len(self.recorded_clicks)+1}" + entry = {"t": round(elapsed, 3), "x": x, "y": y, "label": label} + self.recorded_clicks.append(entry) + self.clicks_listbox.insert( + tk.END, + f" t={entry['t']:7.3f}s x={x:4d} y={y:4d} [{label}]" + ) + self._set_status(f"Recorded: {label} at t={elapsed:.2f}s ({x},{y})") + dialog.destroy() + except ValueError: + messagebox.showerror("Invalid Input", "X and Y must be integers.", parent=dialog) + + tk.Button( + dialog, text="✓ Confirm", command=confirm, + bg="#7c3aed", fg="white", font=("Helvetica", 10, "bold"), + relief="flat", padx=12, pady=6 + ).pack(pady=12) + + def _delete_selected(self): + sel = self.clicks_listbox.curselection() + if not sel: + return + idx = sel[0] + self.clicks_listbox.delete(idx) + del self.recorded_clicks[idx] + self._set_status(f"Deleted interaction {idx + 1}.") + + def _save_config(self): + if not self.recorded_clicks: + if not messagebox.askyesno("No Interactions", "No interactions recorded. Save empty entry?"): + return + + for click in self.recorded_clicks: + self.config_mgr.add_interaction(self.swf_path.name, click) + self.config_mgr.save() + self._set_status(f"✓ Saved {len(self.recorded_clicks)} interaction(s) to config.") + messagebox.showinfo( + "Saved", + f"Saved {len(self.recorded_clicks)} interaction(s) to:\n{self.config_mgr.config_path}" + ) + + def _update_timer(self): + if self._running and self.start_time: + elapsed = time.monotonic() - self.start_time + minutes = int(elapsed // 60) + seconds = elapsed % 60 + self.timer_label.config(text=f"{minutes:02d}:{seconds:06.3f}") + self.root.after(50, self._update_timer) + + def _set_status(self, msg: str): + self.status_label.config(text=msg) + log.info(msg) + + def _on_close(self): + if self.emulator_proc: + try: + self.emulator_proc.terminate() + except Exception: + pass + self.root.destroy() + + def run(self): + self.root.mainloop() + + +def main(): + parser = argparse.ArgumentParser( + description="GUI tool for recording SWF interaction points." + ) + parser.add_argument("swf", help="SWF file to map interactions for") + parser.add_argument( + "-c", "--config", + default="configs/swf_config.json", + help="Config file to save interactions to", + ) + args = parser.parse_args() + + swf_path = Path(args.swf) + if not swf_path.exists(): + print(f"Error: SWF file not found: {swf_path}", file=sys.stderr) + sys.exit(1) + + app = InteractionMapperGUI(swf_path, args.config) + app.run() + + +if __name__ == "__main__": + main() diff --git a/recorder.py b/recorder.py new file mode 100644 index 0000000..a4bd833 --- /dev/null +++ b/recorder.py @@ -0,0 +1,390 @@ +#!/usr/bin/env python3 +""" +Recorder - Launches Ruffle, captures the window with FFmpeg, injects clicks, +and stops recording based on loop detection or configured duration. +""" + +import logging +import os +import signal +import subprocess +import threading +import time +import tempfile +from pathlib import Path +from typing import List, Optional + +from loop_detector import LoopDetector, StillnessDetector +from interaction import InteractionController + +log = logging.getLogger("recorder") + +# Default Ruffle binary locations to try (in order) +RUFFLE_CANDIDATES = [ + "ruffle", + os.path.expanduser("~/.local/bin/ruffle"), + "/usr/local/bin/ruffle", + "/opt/ruffle/ruffle", + os.path.expanduser("~/ruffle"), +] + +# Default Lightspark binary (fallback emulator) +LIGHTSPARK_CANDIDATES = [ + "lightspark", + "/usr/local/bin/lightspark", +] + + +def find_binary(candidates: list) -> Optional[str]: + for c in candidates: + try: + result = subprocess.run(["which", c], capture_output=True, text=True) + if result.returncode == 0: + return c + except Exception: + pass + if os.path.isfile(c) and os.access(c, os.X_OK): + return c + return None + + +def find_window_id(title_fragment: str, timeout: float = 15.0) -> Optional[str]: + """Poll xdotool until a window with the given title fragment appears.""" + deadline = time.monotonic() + timeout + while time.monotonic() < deadline: + try: + result = subprocess.run( + ["xdotool", "search", "--name", title_fragment], + capture_output=True, + text=True, + timeout=2.0, + ) + if result.returncode == 0 and result.stdout.strip(): + wid = result.stdout.strip().splitlines()[0] + log.info(f"Found window '{title_fragment}': ID={wid}") + return wid + except Exception: + pass + time.sleep(0.5) + log.error(f"Window '{title_fragment}' not found within {timeout}s.") + return None + + +def get_window_geometry(window_id: str) -> Optional[dict]: + """Return {'x': int, 'y': int, 'width': int, 'height': int} for a window.""" + try: + result = subprocess.run( + ["xdotool", "getwindowgeometry", "--shell", window_id], + capture_output=True, text=True, timeout=3.0, + ) + if result.returncode == 0: + geo = {} + for line in result.stdout.splitlines(): + if "=" in line: + k, v = line.split("=", 1) + geo[k.strip().lower()] = int(v.strip()) + return geo + except Exception as e: + log.debug(f"getwindowgeometry failed: {e}") + return None + + +class Recorder: + """ + Manages a single SWF→MP4 conversion pass. + + One Recorder is created per SWF file. `.record()` can be called multiple + times (once per interaction branch). + """ + + def __init__(self, swf_path: Path, cfg: dict): + self.swf_path = swf_path + self.cfg = cfg + + # Config values with sensible defaults + self.loop_grace = cfg.get("loop_grace", 5.0) + self.max_duration = cfg.get("max_duration", 600.0) + self.fps = cfg.get("fps", 30) + self.crf = cfg.get("crf", 18) # FFmpeg quality (lower = better) + self.window_size = cfg.get("window_size", "800x600") + self.emulator = cfg.get("emulator", "ruffle") # "ruffle" or "lightspark" + self.end_seconds = cfg.get("end_seconds") # Hard stop time, if known + self.end_frame = cfg.get("end_frame") # Hard stop frame, if known + self.loop_detection = cfg.get("loop_detection", "hash") # "hash", "freeze", or "none" + self.startup_delay = cfg.get("startup_delay", 3.0) # Wait after launch before recording + self.audio = cfg.get("audio", True) + + def record(self, output_path: Path, clicks: list) -> bool: + """ + Run one recording pass. + + Parameters + ---------- + output_path : Path + Where to save the resulting MP4. + clicks : list + List of click dicts {t, x, y, label, ...} to inject. + + Returns + ------- + bool + True on success. + """ + log.info(f"Starting recording pass: {output_path.name}") + output_path.parent.mkdir(parents=True, exist_ok=True) + + # If using freeze detection, we record raw first then post-process + use_freeze_postprocess = ( + self.loop_detection == "freeze" + and self.end_seconds is None + and self.end_frame is None + ) + + raw_path = output_path + if use_freeze_postprocess: + tmp = tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) + raw_path = Path(tmp.name) + tmp.close() + + try: + return self._do_record(raw_path, output_path, clicks, use_freeze_postprocess) + except Exception as e: + log.exception(f"Recording failed: {e}") + return False + + def _do_record( + self, + raw_path: Path, + final_path: Path, + clicks: list, + use_freeze_postprocess: bool, + ) -> bool: + # --- Find emulator --- + if self.emulator == "lightspark": + binary = find_binary(LIGHTSPARK_CANDIDATES) + if not binary: + log.error("Lightspark not found. Falling back to Ruffle.") + binary = find_binary(RUFFLE_CANDIDATES) + else: + binary = find_binary(RUFFLE_CANDIDATES) + + if not binary: + log.error( + "No Flash emulator found. Install Ruffle:\n" + " https://github.com/ruffle-rs/ruffle/releases\n" + " Place the binary at ~/.local/bin/ruffle" + ) + return False + + # --- Launch emulator --- + env = os.environ.copy() + env["DISPLAY"] = env.get("DISPLAY", ":0") + + emulator_cmd = [binary, str(self.swf_path)] + log.info(f"Launching: {' '.join(emulator_cmd)}") + emulator_proc = subprocess.Popen( + emulator_cmd, + env=env, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + ) + + # --- Find window --- + window_id = find_window_id("Ruffle", timeout=15.0) + if not window_id: + window_id = find_window_id(self.swf_path.stem, timeout=5.0) + if not window_id: + log.error("Could not find emulator window. Is a display available?") + emulator_proc.kill() + return False + + # --- Wait for startup --- + log.info(f"Waiting {self.startup_delay}s for emulator startup...") + time.sleep(self.startup_delay) + + # Get window geometry for FFmpeg + geo = get_window_geometry(window_id) + if geo: + capture_size = f"{geo['width']}x{geo['height']}" + capture_offset = f"{geo['x']},{geo['y']}" + else: + capture_size = self.window_size + capture_offset = "0,0" + + # --- Start FFmpeg recording --- + display = env.get("DISPLAY", ":0") + ffmpeg_cmd = self._build_ffmpeg_cmd( + display=display, + capture_size=capture_size, + capture_offset=capture_offset, + output_path=raw_path, + ) + log.info(f"FFmpeg command: {' '.join(ffmpeg_cmd)}") + ffmpeg_proc = subprocess.Popen( + ffmpeg_cmd, + stdin=subprocess.PIPE, + stdout=subprocess.DEVNULL, + stderr=subprocess.PIPE, + ) + + recording_start = time.monotonic() + + # --- Start click injector --- + interaction_ctrl = None + if clicks: + interaction_ctrl = InteractionController(window_id=window_id, clicks=clicks) + interaction_ctrl.start(recording_start_time=recording_start) + + # --- Start loop detector (hash-based) --- + loop_detector = None + stop_event = threading.Event() + + if self.loop_detection == "hash" and self.end_seconds is None: + loop_detector = LoopDetector(window_id=window_id) + loop_thread = threading.Thread( + target=loop_detector.start, daemon=True + ) + loop_thread.start() + + # --- Monitor recording --- + stop_reason = self._monitor_recording( + emulator_proc=emulator_proc, + ffmpeg_proc=ffmpeg_proc, + loop_detector=loop_detector, + recording_start=recording_start, + ) + + # --- Stop everything --- + log.info(f"Stopping recording. Reason: {stop_reason}") + + if loop_detector: + loop_detector.stop() + if interaction_ctrl: + interaction_ctrl.stop() + + # Send 'q' to FFmpeg to gracefully finalize the MP4 + try: + ffmpeg_proc.stdin.write(b"q") + ffmpeg_proc.stdin.flush() + ffmpeg_proc.wait(timeout=10) + except Exception: + ffmpeg_proc.kill() + + # Kill emulator + try: + emulator_proc.terminate() + emulator_proc.wait(timeout=5) + except Exception: + emulator_proc.kill() + + # --- Post-process: freeze detection trim --- + if use_freeze_postprocess and raw_path.exists(): + log.info("Running freeze detection post-process...") + freeze_t = StillnessDetector.find_freeze_start( + str(raw_path), + min_duration=3.0, + ) + if freeze_t: + StillnessDetector.trim_at_freeze( + str(raw_path), + str(final_path), + freeze_t, + grace=self.loop_grace, + ) + raw_path.unlink(missing_ok=True) + else: + log.info("No freeze detected; using full recording.") + raw_path.rename(final_path) + elif not use_freeze_postprocess and raw_path != final_path: + raw_path.rename(final_path) + + success = final_path.exists() and final_path.stat().st_size > 0 + if success: + duration = time.monotonic() - recording_start + log.info(f"Recording complete: {final_path} ({duration:.1f}s recorded)") + else: + log.error(f"Output file missing or empty: {final_path}") + + return success + + def _monitor_recording( + self, + emulator_proc, + ffmpeg_proc, + loop_detector: Optional[LoopDetector], + recording_start: float, + ) -> str: + """ + Block until we decide to stop recording. Returns a string describing why. + """ + loop_grace_remaining = self.loop_grace + loop_stop_at = None + + while True: + elapsed = time.monotonic() - recording_start + + # Hard stop: configured end_seconds + if self.end_seconds is not None and elapsed >= self.end_seconds: + return f"end_seconds={self.end_seconds}" + + # Hard stop: max_duration + if elapsed >= self.max_duration: + return f"max_duration={self.max_duration}" + + # Emulator crashed + if emulator_proc.poll() is not None: + return "emulator_exited" + + # FFmpeg crashed + if ffmpeg_proc.poll() is not None: + return "ffmpeg_exited" + + # Loop detection + if loop_detector and loop_detector.loop_detected(): + if loop_stop_at is None: + loop_stop_at = time.monotonic() + self.loop_grace + log.info(f"Loop confirmed; will stop in {self.loop_grace}s at t={elapsed + self.loop_grace:.1f}s") + if time.monotonic() >= loop_stop_at: + return f"loop_detected+{self.loop_grace}s_grace" + + time.sleep(0.2) + + def _build_ffmpeg_cmd( + self, + display: str, + capture_size: str, + capture_offset: str, + output_path: Path, + ) -> list: + """Build the FFmpeg command for x11grab capture.""" + cmd = [ + "ffmpeg", "-y", + # Input: X11 screen capture + "-f", "x11grab", + "-framerate", str(self.fps), + "-video_size", capture_size, + "-i", f"{display}+{capture_offset}", + ] + + if self.audio: + cmd += [ + # Input: PulseAudio capture + "-f", "pulse", + "-i", "default", + ] + + cmd += [ + # Video encoding + "-c:v", "libx264", + "-preset", "fast", + "-crf", str(self.crf), + "-pix_fmt", "yuv420p", + ] + + if self.audio: + cmd += ["-c:a", "aac", "-b:a", "128k"] + else: + cmd += ["-an"] + + cmd += [str(output_path)] + return cmd diff --git a/swf_config.json b/swf_config.json new file mode 100644 index 0000000..4d78f84 --- /dev/null +++ b/swf_config.json @@ -0,0 +1,102 @@ +{ + "_comment": "SWF Converter Configuration File. Keys with _ prefix are comments/metadata.", + "_schema_reference": "See config_manager.py for full schema documentation.", + + "_defaults": { + "_comment": "These apply to ALL SWF files unless overridden per-file.", + "emulator": "ruffle", + "loop_detection": "hash", + "loop_grace": 5.0, + "max_duration": 600.0, + "fps": 30, + "crf": 18, + "window_size": "800x600", + "startup_delay": 3.0, + "audio": true + }, + + "simple_animation.swf": { + "_comment": "A basic non-interactive animation with a known loop point.", + "end_seconds": 45.0, + "interactions": [] + }, + + "looping_intro.swf": { + "_comment": "An animation that loops endlessly. Use hash detection + grace period.", + "loop_detection": "hash", + "loop_grace": 5.0, + "interactions": [] + }, + + "interactive_lesson.swf": { + "_comment": "An interactive lesson with two choice buttons appearing at t=12.5s.", + "loop_detection": "hash", + "loop_grace": 5.0, + "interactions": [ + { + "t": 12.5, + "x": 320, + "y": 240, + "label": "button_yes", + "_comment": "Clicking 'Yes' at the choice screen" + }, + { + "t": 12.5, + "x": 320, + "y": 340, + "label": "button_no", + "_comment": "Clicking 'No' at the choice screen" + } + ] + }, + + "quiz.swf": { + "_comment": "A quiz with multiple questions. Each question has clickable answer areas.", + "loop_detection": "hash", + "loop_grace": 3.0, + "interactions": [ + { + "t": 8.0, + "x": 200, + "y": 300, + "label": "q1_answer_a" + }, + { + "t": 8.0, + "x": 200, + "y": 380, + "label": "q1_answer_b" + }, + { + "t": 8.0, + "x": 200, + "y": 460, + "label": "q1_answer_c" + } + ] + }, + + "menu.swf": { + "_comment": "A menu SWF with known end time. No loop detection needed.", + "loop_detection": "none", + "end_seconds": 30.0, + "interactions": [] + }, + + "problematic.swf": { + "_comment": "A SWF that Ruffle struggles with — use Lightspark as fallback.", + "emulator": "lightspark", + "loop_detection": "freeze", + "loop_grace": 5.0, + "interactions": [] + }, + + "high_quality_export.swf": { + "_comment": "Export at higher quality for archival purposes.", + "crf": 12, + "fps": 60, + "window_size": "1280x720", + "loop_detection": "hash", + "interactions": [] + } +} diff --git a/swf_inspector.py b/swf_inspector.py new file mode 100644 index 0000000..11613ee --- /dev/null +++ b/swf_inspector.py @@ -0,0 +1,194 @@ +#!/usr/bin/env python3 +""" +SWF Inspector - Reads SWF binary headers to extract metadata without +running the file. + +Parses: SWF version, frame rate, frame count, dimensions, compression type. +Reference: https://open-flash.github.io/mirrors/swf-spec-19.pdf +""" + +import logging +import struct +import zlib +from pathlib import Path +from typing import Optional + +log = logging.getLogger("swf_inspector") + +# SWF signatures +SIG_UNCOMPRESSED = b"FWS" +SIG_ZLIB = b"CWS" +SIG_LZMA = b"ZWS" + + +class SWFInspector: + """ + Reads SWF file headers and returns useful metadata. + + Usage + ----- + info = SWFInspector().inspect(Path("my.swf")) + print(info) + # {'version': 8, 'compression': 'none', 'fps': 24.0, + # 'frame_count': 300, 'width': 550, 'height': 400} + """ + + def inspect(self, swf_path: Path) -> dict: + """Return a dict of metadata about the SWF file.""" + result = { + "path": str(swf_path), + "size_bytes": swf_path.stat().st_size if swf_path.exists() else None, + "version": None, + "compression": None, + "fps": None, + "frame_count": None, + "width": None, + "height": None, + "estimated_duration_seconds": None, + "error": None, + } + + if not swf_path.exists(): + result["error"] = "File not found" + return result + + try: + with open(swf_path, "rb") as f: + header = f.read(8) + + if len(header) < 8: + result["error"] = "File too small to be a valid SWF" + return result + + sig = header[:3] + version = header[3] + file_length = struct.unpack_from("> 3) & 0x1F # top 5 bits of first byte + rect_bits = 5 + 4 * nbits + rect_bytes = (rect_bits + 7) // 8 + + if len(data) < rect_bytes + 4: + return + + # Parse RECT fields (Xmin, Xmax, Ymin, Ymax) in twips (1/20 px) + bits = int.from_bytes(data[pos : pos + rect_bytes], "big") + total_bits = rect_bytes * 8 + shift = total_bits - 5 - nbits + xmin = self._signed_bits(bits >> shift, nbits) + shift -= nbits + xmax = self._signed_bits(bits >> shift, nbits) + shift -= nbits + ymin = self._signed_bits(bits >> shift, nbits) + shift -= nbits + ymax = self._signed_bits(bits >> shift, nbits) + + result["width"] = round((xmax - xmin) / 20) + result["height"] = round((ymax - ymin) / 20) + + pos += rect_bytes + + # FrameRate: FIXED8 (8.8 fixed point, little-endian) + if pos + 2 > len(data): + return + frame_rate_raw = struct.unpack_from(" len(data): + return + result["frame_count"] = struct.unpack_from(" int: + """Convert an unsigned integer to signed given bit width.""" + value &= (1 << nbits) - 1 + if value >= (1 << (nbits - 1)): + value -= (1 << nbits) + return value + + def inspect_many(self, paths: list) -> dict: + """Inspect multiple SWF files, returning {filename: info} dict.""" + return {p.name: self.inspect(p) for p in paths} + + def print_report(self, swf_path: Path): + """Pretty-print inspection results for a single SWF.""" + info = self.inspect(swf_path) + print(f"\n{'='*50}") + print(f"SWF: {info['path']}") + print(f"{'='*50}") + if info.get("error"): + print(f" ⚠ Error: {info['error']}") + print(f" Version : {info['version']}") + print(f" Compression: {info['compression']}") + print(f" Dimensions : {info['width']} × {info['height']} px") + print(f" FPS : {info['fps']}") + print(f" Frames : {info['frame_count']}") + print(f" Est. length: {info['estimated_duration_seconds']}s") + print(f" File size : {info['size_bytes']:,} bytes" if info['size_bytes'] else " File size : N/A") + print() + + +if __name__ == "__main__": + import sys + inspector = SWFInspector() + for path in sys.argv[1:]: + inspector.print_report(Path(path)) diff --git a/tests.py b/tests.py new file mode 100644 index 0000000..ab17ad4 --- /dev/null +++ b/tests.py @@ -0,0 +1,407 @@ +#!/usr/bin/env python3 +""" +Unit tests for SWF Converter components. +Run with: python3 -m pytest tests.py -v + or: python3 tests.py +""" + +import json +import struct +import tempfile +import time +import unittest +import zlib +from pathlib import Path +from unittest.mock import MagicMock, patch, call + +from config_manager import ConfigManager, DEFAULT_CONFIG +from loop_detector import LoopDetector, StillnessDetector, FrameRecord +from interaction import InteractionController, ClickEvent +from swf_inspector import SWFInspector + + +# ============================================================================= +# ConfigManager Tests +# ============================================================================= + +class TestConfigManager(unittest.TestCase): + + def setUp(self): + self.tmpdir = tempfile.mkdtemp() + self.config_path = str(Path(self.tmpdir) / "test_config.json") + + def _make_config(self, data: dict): + with open(self.config_path, "w") as f: + json.dump(data, f) + + def test_defaults_used_when_no_config_file(self): + cfg_mgr = ConfigManager("/nonexistent/path.json") + cfg = cfg_mgr.get("anything.swf") + self.assertEqual(cfg["loop_grace"], DEFAULT_CONFIG["loop_grace"]) + self.assertEqual(cfg["max_duration"], DEFAULT_CONFIG["max_duration"]) + + def test_per_file_config_overrides_defaults(self): + self._make_config({ + "my.swf": {"end_seconds": 42.0, "fps": 24} + }) + cfg_mgr = ConfigManager(self.config_path) + cfg = cfg_mgr.get("my.swf") + self.assertEqual(cfg["end_seconds"], 42.0) + self.assertEqual(cfg["fps"], 24) + + def test_global_defaults_section(self): + self._make_config({ + "_defaults": {"loop_grace": 10.0}, + "my.swf": {} + }) + cfg_mgr = ConfigManager(self.config_path) + cfg = cfg_mgr.get("my.swf") + self.assertEqual(cfg["loop_grace"], 10.0) + + def test_per_file_overrides_global_defaults(self): + self._make_config({ + "_defaults": {"loop_grace": 10.0}, + "my.swf": {"loop_grace": 2.0} + }) + cfg_mgr = ConfigManager(self.config_path) + cfg = cfg_mgr.get("my.swf") + self.assertEqual(cfg["loop_grace"], 2.0) + + def test_cli_overrides_applied(self): + cfg_mgr = ConfigManager(self.config_path) + cfg = cfg_mgr.get("my.swf", global_overrides={"loop_grace": 99.0}) + self.assertEqual(cfg["loop_grace"], 99.0) + + def test_per_file_overrides_cli(self): + self._make_config({"my.swf": {"loop_grace": 7.0}}) + cfg_mgr = ConfigManager(self.config_path) + cfg = cfg_mgr.get("my.swf", global_overrides={"loop_grace": 99.0}) + self.assertEqual(cfg["loop_grace"], 7.0) + + def test_interactions_list(self): + interactions = [ + {"t": 5.0, "x": 100, "y": 200, "label": "btn1"}, + {"t": 10.0, "x": 300, "y": 400, "label": "btn2"}, + ] + self._make_config({"my.swf": {"interactions": interactions}}) + cfg_mgr = ConfigManager(self.config_path) + cfg = cfg_mgr.get("my.swf") + self.assertEqual(cfg["interactions"], interactions) + + def test_add_interaction_and_save(self): + cfg_mgr = ConfigManager(self.config_path) + cfg_mgr.add_interaction("my.swf", {"t": 1.0, "x": 50, "y": 60, "label": "click_1"}) + cfg_mgr.save() + cfg_mgr2 = ConfigManager(self.config_path) + cfg = cfg_mgr2.get("my.swf") + self.assertEqual(len(cfg["interactions"]), 1) + self.assertEqual(cfg["interactions"][0]["label"], "click_1") + + def test_generate_starter_config(self): + swf1 = Path(self.tmpdir) / "anim.swf" + swf1.touch() + cfg_mgr = ConfigManager(self.config_path) + cfg_mgr.generate_starter([swf1]) + self.assertTrue(Path(self.config_path).exists()) + with open(self.config_path) as f: + data = json.load(f) + self.assertIn("anim.swf", data) + self.assertIn("_defaults", data) + + def test_missing_file_returns_defaults(self): + cfg_mgr = ConfigManager(self.config_path) + cfg = cfg_mgr.get("nonexistent.swf") + self.assertEqual(cfg["emulator"], DEFAULT_CONFIG["emulator"]) + + def test_unknown_keys_passed_through(self): + self._make_config({"my.swf": {"custom_key": "custom_value"}}) + cfg_mgr = ConfigManager(self.config_path) + cfg = cfg_mgr.get("my.swf") + self.assertEqual(cfg.get("custom_key"), "custom_value") + + +# ============================================================================= +# LoopDetector Tests +# ============================================================================= + +class TestLoopDetector(unittest.TestCase): + + def _make_detector(self, **kwargs): + return LoopDetector(window_id="0xfake", **kwargs) + + def test_no_loop_initially(self): + d = self._make_detector() + self.assertFalse(d.loop_detected()) + self.assertIsNone(d.loop_detected_at()) + + def test_loop_detected_on_repeated_sequence(self): + d = self._make_detector(sequence_length=3) + # Build history: A B C D E ... A B C ← should detect loop + hashes = ["aaa", "bbb", "ccc", "ddd", "eee", "fff", + "aaa", "bbb", "ccc"] + t = 0.0 + for h in hashes: + record = FrameRecord(timestamp=t, hash=h) + d._check_for_loop(record) + d._history.append(record) + t += 0.5 + + self.assertTrue(d.loop_detected()) + + def test_no_false_positive_with_unique_frames(self): + d = self._make_detector(sequence_length=3) + import hashlib + for i in range(50): + h = hashlib.md5(str(i).encode()).hexdigest() + record = FrameRecord(timestamp=float(i) * 0.4, hash=h) + d._check_for_loop(record) + d._history.append(record) + self.assertFalse(d.loop_detected()) + + def test_loop_not_detected_with_insufficient_history(self): + d = self._make_detector(sequence_length=4) + # Only 3 records — not enough to compare + for i, h in enumerate(["aaa", "bbb", "aaa"]): + record = FrameRecord(timestamp=float(i), hash=h) + d._check_for_loop(record) + d._history.append(record) + self.assertFalse(d.loop_detected()) + + def test_reset_clears_detection(self): + d = self._make_detector(sequence_length=2) + hashes = ["aaa", "bbb", "ccc", "aaa", "bbb"] + t = 0.0 + for h in hashes: + record = FrameRecord(timestamp=t, hash=h) + d._check_for_loop(record) + d._history.append(record) + t += 0.5 + d.reset() + self.assertFalse(d.loop_detected()) + self.assertEqual(len(d._history), 0) + + def test_second_detection_does_not_overwrite_first(self): + d = self._make_detector(sequence_length=2) + hashes = ["aaa", "bbb", "ccc", "aaa", "bbb", "ddd", "aaa", "bbb"] + t = 0.0 + first_detection = None + for h in hashes: + record = FrameRecord(timestamp=t, hash=h) + d._check_for_loop(record) + d._history.append(record) + if d.loop_detected() and first_detection is None: + first_detection = d.loop_detected_at() + t += 0.5 + self.assertEqual(d.loop_detected_at(), first_detection) + + +# ============================================================================= +# InteractionController Tests +# ============================================================================= + +class TestInteractionController(unittest.TestCase): + + def test_clicks_sorted_by_time(self): + clicks = [ + {"t": 10.0, "x": 1, "y": 1, "label": "b"}, + {"t": 2.0, "x": 2, "y": 2, "label": "a"}, + {"t": 5.0, "x": 3, "y": 3, "label": "c"}, + ] + ctrl = InteractionController("0xfake", clicks) + times = [c.t for c in ctrl.clicks] + self.assertEqual(times, [2.0, 5.0, 10.0]) + + def test_click_event_defaults(self): + ctrl = InteractionController("0xfake", [{"t": 1.0, "x": 10, "y": 20}]) + c = ctrl.clicks[0] + self.assertEqual(c.button, 1) + self.assertFalse(c.double) + self.assertEqual(c.label, "") + + def test_empty_clicks_list(self): + ctrl = InteractionController("0xfake", []) + self.assertEqual(ctrl.clicks, []) + + @patch("subprocess.run") + def test_fire_click_calls_xdotool(self, mock_run): + mock_run.return_value = MagicMock(returncode=0) + ctrl = InteractionController("0x1234", []) + click = ClickEvent(t=0.0, x=100, y=200, label="test", button=1, double=False) + ctrl._fire_click(click) + + calls = mock_run.call_args_list + # First call should be mousemove + self.assertIn("mousemove", calls[0][0][0]) + self.assertIn("100", calls[0][0][0]) + self.assertIn("200", calls[0][0][0]) + # Second call should be click + self.assertIn("click", calls[1][0][0]) + + @patch("subprocess.run") + def test_double_click_fires_twice(self, mock_run): + mock_run.return_value = MagicMock(returncode=0) + ctrl = InteractionController("0x1234", []) + click = ClickEvent(t=0.0, x=50, y=50, label="dbl", button=1, double=True) + ctrl._fire_click(click) + # mousemove + click + click = 3 calls + click_calls = [c for c in mock_run.call_args_list if "click" in str(c)] + self.assertEqual(len(click_calls), 2) + + def test_stop_cancels_pending_clicks(self): + clicks = [{"t": 60.0, "x": 1, "y": 1, "label": "late"}] + ctrl = InteractionController("0xfake", clicks) + ctrl.start(recording_start_time=time.monotonic()) + time.sleep(0.1) + ctrl.stop() + # Should not have fired (click is at t=60s) + # Just verify stop() doesn't raise + self.assertTrue(True) + + +# ============================================================================= +# SWFInspector Tests +# ============================================================================= + +class TestSWFInspector(unittest.TestCase): + + def _make_swf(self, tmpdir, fps=24.0, frame_count=100, width=550, height=400, + compressed=False): + """Create a minimal valid SWF binary.""" + # RECT: nbits=15 for values up to 16384 (550*20=11000, 400*20=8000) + # We'll use nbits=15, all values in twips + nbits = 15 + xmin, xmax, ymin, ymax = 0, width * 20, 0, height * 20 + total_bits = 5 + 4 * nbits + total_bytes = (total_bits + 7) // 8 + + # Build the bitstream + bits = nbits << (total_bytes * 8 - 5) + for val in [xmin, xmax, ymin, ymax]: + shift_pos = total_bytes * 8 - 5 - (([xmin, xmax, ymin, ymax].index(val) + 1) * nbits) + # Redo properly: + pass + + # Simpler approach: use a known valid RECT for 550x400 + # nbits=15: 0b01111 = 0x0F in top 5 bits + # Then 4 x 15-bit values: 0, 11000, 0, 8000 + # Pack as big-endian + nbits = 14 # enough for 11000 (14 bits = 16383 max) + total_bits_n = 5 + 4 * nbits # = 61 bits + total_bytes_n = (total_bits_n + 7) // 8 # = 8 bytes + big_val = (nbits << (total_bytes_n * 8 - 5)) + big_val |= (0 & ((1 << nbits) - 1)) << (total_bytes_n * 8 - 5 - nbits) + big_val |= (xmax & ((1 << nbits) - 1)) << (total_bytes_n * 8 - 5 - 2*nbits) + big_val |= (0 & ((1 << nbits) - 1)) << (total_bytes_n * 8 - 5 - 3*nbits) + big_val |= (ymax & ((1 << nbits) - 1)) << (total_bytes_n * 8 - 5 - 4*nbits) + rect_bytes = big_val.to_bytes(total_bytes_n, "big") + + # FrameRate: FIXED8 little-endian + frame_rate_raw = int(fps * 256) + frame_rate_bytes = struct.pack("