initial commit

This commit is contained in:
2026-03-24 18:11:39 -06:00
commit 352c8e0b73
11 changed files with 2576 additions and 0 deletions
+202
View File
@@ -0,0 +1,202 @@
# SWF → MP4 Converter
Converts SWF (Flash) files to MP4 videos on Fedora, with:
- **Automatic loop detection** — stops recording after 5 seconds of a repeated loop
- **Interactive branch capture** — records separate MP4s for each clickable path
- **Batch processing** — handles entire directories of SWF files
- **GUI interaction mapper** — click-record tool to find interaction coordinates
---
## File Structure
```
swf_converter/
├── convert.py # Main entry point — run this
├── recorder.py # FFmpeg + Ruffle orchestration
├── loop_detector.py # Frame-hash loop detection + FFmpeg freeze detection
├── interaction.py # xdotool click injection + interaction recording
├── config_manager.py # JSON config loading and validation
├── swf_inspector.py # Reads SWF binary headers (FPS, frame count, dimensions)
├── map_interactions.py # GUI tool to record interaction coordinates
├── install.sh # Fedora dependency installer
├── tests.py # Unit tests
└── configs/
└── swf_config.json # Per-file settings and interaction definitions
```
---
## Installation
```bash
bash install.sh
```
This installs: `ffmpeg`, `xdotool`, `ImageMagick`, `scrot`, `python3-tkinter`,
`Xvfb`, and downloads the latest **Ruffle** binary to `~/.local/bin/ruffle`.
---
## Quick Start
### 1. Inspect your SWF files
```bash
python3 convert.py --inspect *.swf
```
Shows version, FPS, frame count, dimensions, and estimated duration for each file.
### 2. Generate a starter config
```bash
python3 convert.py --generate-config *.swf
```
Creates `configs/swf_config.json` pre-populated with each SWF's metadata.
Edit it to add `end_seconds` and interaction points.
### 3. Map interaction points (optional)
For interactive SWFs, use the GUI tool to record where/when clicks happen:
```bash
python3 map_interactions.py my_interactive.swf
```
1. Click **Launch & Start** — opens the SWF in Ruffle and starts a timer
2. Watch the SWF — when an interactive moment appears, click **Record Click**
3. Coordinates are auto-detected from your mouse position
4. Click **Save Config** — writes interactions to `configs/swf_config.json`
### 4. Convert
```bash
# Single file
python3 convert.py my.swf
# Directory of SWF files
python3 convert.py ./swf_files/ -o ./output/
# Dry run (shows what would happen without recording)
python3 convert.py ./swf_files/ --dry-run
# Override loop grace period
python3 convert.py *.swf --loop-grace 8.0
```
---
## Output Structure
For a SWF called `lesson.swf` with two interaction points:
```
output/
├── lesson_base.mp4 ← Linear content, no clicks
├── lesson_button_yes.mp4 ← Branch: click "Yes" at t=12.5s
└── lesson_button_no.mp4 ← Branch: click "No" at t=12.5s
```
---
## Configuration Reference
```json
{
"_defaults": {
"emulator": "ruffle", // "ruffle" or "lightspark"
"loop_detection": "hash", // "hash", "freeze", or "none"
"loop_grace": 5.0, // Seconds after loop detected before stopping
"max_duration": 600.0, // Hard cap in seconds
"fps": 30, // Capture frame rate
"crf": 18, // FFmpeg quality (051, lower=better)
"audio": true, // Capture audio via PulseAudio
"startup_delay": 3.0 // Wait after launching emulator
},
"my_animation.swf": {
"end_seconds": 42.0, // Force-stop at this timestamp
"interactions": [
{
"t": 12.5, // Seconds from SWF start
"x": 320, // X coordinate in window
"y": 240, // Y coordinate in window
"label": "button_yes", // Output filename suffix
"button": 1, // 1=left, 2=middle, 3=right
"double": false // Double-click?
}
]
}
}
```
---
## Loop Detection Methods
| Method | How it works | Best for |
|---------|-------------|----------|
| `hash` | Compares MD5 hashes of downsampled screenshots every 0.4s. Detects repeated frame sequences. | Most animations |
| `freeze` | Runs FFmpeg `freezedetect` on the raw recording as post-processing. Catches visual stills. | Animations that fade to a static frame |
| `none` | No loop detection — relies on `end_seconds` or `max_duration`. | When you know exact duration |
All methods add a configurable **grace period** (default 5 seconds) after detection,
so the loop point itself is visible in the output.
---
## Headless / Server Use
If running without a desktop display:
```bash
# Start virtual display
Xvfb :99 -screen 0 1024x768x24 &
export DISPLAY=:99
# Start virtual audio sink
pulseaudio --start
pactl load-module module-null-sink sink_name=virtual
# Convert
python3 convert.py ./swf_files/ -o ./output/
```
---
## Running Tests
```bash
python3 -m pytest tests.py -v
# or
python3 tests.py
```
---
## Troubleshooting
**"No Flash emulator found"**
→ Download Ruffle from https://github.com/ruffle-rs/ruffle/releases and place at `~/.local/bin/ruffle`
**"Could not find emulator window"**
→ Ensure a display is available (`echo $DISPLAY` should return `:0` or `:99`)
→ Try increasing `startup_delay` in config (some SWFs load slowly)
**Audio missing from output**
→ Ensure PulseAudio is running: `pulseaudio --start`
→ Or disable audio: set `"audio": false` in config
**Loop not detected**
→ Try `"loop_detection": "freeze"` for animations that end on a static frame
→ Or set `"end_seconds"` manually after inspecting the SWF
**SWF doesn't work in Ruffle**
→ Set `"emulator": "lightspark"` in config for that file
→ Install Lightspark: `sudo dnf install lightspark`
**xdotool clicks not registering**
→ Check window focus — the Ruffle window must be in the foreground
→ Use `map_interactions.py` to verify coordinates match what you see on screen
+192
View File
@@ -0,0 +1,192 @@
#!/usr/bin/env python3
"""
Config Manager - Loads, validates, and merges per-SWF configuration.
"""
import json
import logging
from pathlib import Path
from typing import Optional
log = logging.getLogger("config_manager")
# Schema documentation for config files
CONFIG_SCHEMA = {
# Per-file keys (all optional — omit to use global defaults)
"emulator": "str: 'ruffle' or 'lightspark'",
"loop_detection": "str: 'hash' | 'freeze' | 'none'",
"loop_grace": "float: seconds to continue after loop detected (default 5.0)",
"max_duration": "float: hard cap in seconds (default 600)",
"end_seconds": "float: stop at this timestamp (overrides loop detection)",
"end_frame": "int: stop at this frame number (overrides loop detection)",
"fps": "int: capture FPS (default 30)",
"crf": "int: FFmpeg quality 051, lower=better (default 18)",
"window_size": "str: e.g. '800x600' fallback if geometry detection fails",
"startup_delay": "float: seconds to wait after launching emulator (default 3.0)",
"audio": "bool: capture audio (default true)",
"interactions": [
{
"t": "float: seconds from SWF start to fire click",
"x": "int: X coordinate in window",
"y": "int: Y coordinate in window",
"label": "str: output file suffix (e.g. 'button_yes')",
"button": "int: mouse button 1=left 2=mid 3=right (default 1)",
"double": "bool: double-click (default false)",
}
],
}
DEFAULT_CONFIG = {
"emulator": "ruffle",
"loop_detection": "hash",
"loop_grace": 5.0,
"max_duration": 600.0,
"fps": 30,
"crf": 18,
"window_size": "800x600",
"startup_delay": 3.0,
"audio": True,
"interactions": [],
}
class ConfigManager:
"""
Loads a JSON config file mapping SWF filenames to per-file settings.
Config file format:
{
"_defaults": { ... global defaults ... },
"my_animation.swf": {
"end_seconds": 42,
"interactions": [
{ "t": 12.5, "x": 320, "y": 240, "label": "button_yes" }
]
},
"menu.swf": {
"loop_detection": "none",
"end_seconds": 30
}
}
"""
def __init__(self, config_path: str = "configs/swf_config.json"):
self.config_path = Path(config_path)
self._data: dict = {}
self._load()
def _load(self):
if self.config_path.exists():
try:
with open(self.config_path) as f:
self._data = json.load(f)
log.info(f"Loaded config from {self.config_path}")
except json.JSONDecodeError as e:
log.error(f"Config JSON parse error: {e}. Using defaults.")
self._data = {}
else:
log.info(f"Config file not found ({self.config_path}). Using defaults for all files.")
self._data = {}
def get(self, swf_filename: str, global_overrides: Optional[dict] = None) -> dict:
"""
Return the merged config for a SWF file.
Priority (highest first):
1. Per-file config in JSON
2. _defaults section in JSON
3. global_overrides (from CLI args)
4. DEFAULT_CONFIG (hardcoded fallbacks)
"""
cfg = DEFAULT_CONFIG.copy()
if global_overrides:
cfg.update({k: v for k, v in global_overrides.items() if v is not None})
file_defaults = self._data.get("_defaults", {})
cfg.update(file_defaults)
file_cfg = self._data.get(swf_filename, {})
cfg.update(file_cfg)
self._validate(cfg, swf_filename)
return cfg
def _validate(self, cfg: dict, swf_filename: str):
"""Log warnings for obviously wrong config values."""
if cfg.get("crf") is not None and not (0 <= cfg["crf"] <= 51):
log.warning(f"{swf_filename}: crf={cfg['crf']} is out of range 051.")
if cfg.get("fps") is not None and not (1 <= cfg["fps"] <= 120):
log.warning(f"{swf_filename}: fps={cfg['fps']} is unusual.")
for i, interaction in enumerate(cfg.get("interactions", [])):
for required in ("t", "x", "y"):
if required not in interaction:
log.warning(
f"{swf_filename}: interaction[{i}] missing required key '{required}'."
)
def generate_starter(self, swf_files: list, inspector=None) -> dict:
"""
Generate a starter config file for the given SWF files.
Optionally uses SWFInspector to pre-populate FPS and estimated duration.
"""
config = {
"_defaults": {
"loop_detection": "hash",
"loop_grace": 5.0,
"max_duration": 600.0,
"fps": 30,
"audio": True,
},
"_schema": CONFIG_SCHEMA,
}
for swf in swf_files:
entry = {}
if inspector:
info = inspector.inspect(swf)
if info.get("fps"):
entry["fps"] = info["fps"]
if info.get("frame_count"):
estimated_seconds = round(info["frame_count"] / max(info.get("fps", 24), 1), 1)
entry["_estimated_duration_seconds"] = estimated_seconds
entry["_comment"] = (
f"SWF: {info.get('frame_count')} frames @ {info.get('fps')} fps "
f"{estimated_seconds}s. Set end_seconds to control stop time."
)
entry["interactions"] = [
{
"_comment": "Add click interactions here. Example:",
"t": 10.0,
"x": 400,
"y": 300,
"label": "example_click",
}
]
config[swf.name] = entry
self.config_path.parent.mkdir(parents=True, exist_ok=True)
with open(self.config_path, "w") as f:
json.dump(config, f, indent=2)
log.info(f"Starter config written to {self.config_path}")
return config
def save(self):
with open(self.config_path, "w") as f:
json.dump(self._data, f, indent=2)
log.info(f"Config saved to {self.config_path}")
def set_file_config(self, swf_filename: str, key: str, value):
"""Programmatically update a single config key for a SWF file."""
if swf_filename not in self._data:
self._data[swf_filename] = {}
self._data[swf_filename][key] = value
def add_interaction(self, swf_filename: str, interaction: dict):
"""Append an interaction entry for a SWF file."""
if swf_filename not in self._data:
self._data[swf_filename] = {}
if "interactions" not in self._data[swf_filename]:
self._data[swf_filename]["interactions"] = []
self._data[swf_filename]["interactions"].append(interaction)
+209
View File
@@ -0,0 +1,209 @@
#!/usr/bin/env python3
"""
SWF to MP4 Converter - Main Orchestrator
Handles loop detection and interactive branch capture.
"""
import argparse
import json
import logging
import os
import sys
import time
from pathlib import Path
from recorder import Recorder
from loop_detector import LoopDetector
from interaction import InteractionController
from config_manager import ConfigManager
from swf_inspector import SWFInspector
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
handlers=[
logging.StreamHandler(sys.stdout),
logging.FileHandler("logs/convert.log"),
],
)
log = logging.getLogger("convert")
def convert_swf(swf_path: Path, cfg: dict, output_dir: Path, dry_run: bool = False):
"""Convert a single SWF file, including all interactive branches."""
swf_name = swf_path.stem
log.info(f"=== Converting: {swf_path.name} ===")
recorder = Recorder(swf_path, cfg)
interactions = cfg.get("interactions", [])
results = []
# --- Pass 1: Base recording (no clicks) ---
base_output = output_dir / f"{swf_name}_base.mp4"
log.info(f" Pass 1/{ 1 + len(interactions) }: Base recording → {base_output.name}")
if not dry_run:
success = recorder.record(
output_path=base_output,
clicks=[],
)
if success:
results.append({"label": "base", "file": str(base_output)})
log.info(f" ✓ Base recording saved: {base_output.name}")
else:
log.error(f" ✗ Base recording failed for {swf_path.name}")
else:
log.info(f" [DRY RUN] Would record base pass → {base_output.name}")
results.append({"label": "base", "file": str(base_output), "dry_run": True})
# --- Pass N: One recording per interaction point ---
for i, interaction in enumerate(interactions, 1):
label = interaction.get("label", f"interaction_{i}")
branch_output = output_dir / f"{swf_name}_{label}.mp4"
log.info(
f" Pass {i + 1}/{ 1 + len(interactions) }: Branch '{label}' "
f"(click at t={interaction['t']}s, x={interaction['x']}, y={interaction['y']}) "
f"{branch_output.name}"
)
if not dry_run:
success = recorder.record(
output_path=branch_output,
clicks=[interaction],
)
if success:
results.append({"label": label, "file": str(branch_output)})
log.info(f" ✓ Branch '{label}' saved: {branch_output.name}")
else:
log.error(f" ✗ Branch '{label}' failed for {swf_path.name}")
else:
log.info(f" [DRY RUN] Would record branch '{label}'{branch_output.name}")
results.append({"label": label, "file": str(branch_output), "dry_run": True})
return results
def main():
parser = argparse.ArgumentParser(
description="Convert SWF files to MP4 with loop detection and interaction capture."
)
parser.add_argument(
"input",
nargs="+",
help="SWF file(s) or directory containing SWF files",
)
parser.add_argument(
"-o", "--output",
default="output",
help="Output directory for MP4 files (default: output/)",
)
parser.add_argument(
"-c", "--config",
default="configs/swf_config.json",
help="Path to JSON config file (default: configs/swf_config.json)",
)
parser.add_argument(
"--inspect",
action="store_true",
help="Inspect SWF files and print metadata without converting",
)
parser.add_argument(
"--generate-config",
action="store_true",
help="Generate a starter config file from the given SWF files",
)
parser.add_argument(
"--dry-run",
action="store_true",
help="Show what would be done without actually recording",
)
parser.add_argument(
"--loop-grace",
type=float,
default=5.0,
help="Seconds to continue recording after a loop is detected (default: 5.0)",
)
parser.add_argument(
"--max-duration",
type=float,
default=600.0,
help="Hard cap on recording duration in seconds (default: 600)",
)
args = parser.parse_args()
# Collect SWF files
swf_files = []
for inp in args.input:
p = Path(inp)
if p.is_dir():
swf_files.extend(sorted(p.glob("**/*.swf")))
elif p.suffix.lower() == ".swf":
swf_files.append(p)
else:
log.warning(f"Skipping non-SWF file: {p}")
if not swf_files:
log.error("No SWF files found.")
sys.exit(1)
log.info(f"Found {len(swf_files)} SWF file(s).")
# Inspect mode
if args.inspect:
inspector = SWFInspector()
for swf in swf_files:
info = inspector.inspect(swf)
print(json.dumps({str(swf): info}, indent=2))
return
# Generate config mode
if args.generate_config:
cfg_mgr = ConfigManager(args.config)
inspector = SWFInspector()
cfg_mgr.generate_starter(swf_files, inspector)
print(f"Starter config written to: {args.config}")
print("Edit it to add interaction points, then run without --generate-config.")
return
# Load config
cfg_mgr = ConfigManager(args.config)
global_defaults = {
"loop_grace": args.loop_grace,
"max_duration": args.max_duration,
}
output_dir = Path(args.output)
output_dir.mkdir(parents=True, exist_ok=True)
# Convert each file
all_results = {}
for swf_path in swf_files:
cfg = cfg_mgr.get(swf_path.name, global_defaults)
try:
results = convert_swf(swf_path, cfg, output_dir, dry_run=args.dry_run)
all_results[swf_path.name] = results
except Exception as e:
log.exception(f"Unexpected error converting {swf_path.name}: {e}")
all_results[swf_path.name] = {"error": str(e)}
# Summary
print("\n" + "=" * 60)
print("CONVERSION SUMMARY")
print("=" * 60)
for swf_name, results in all_results.items():
print(f"\n{swf_name}:")
if isinstance(results, list):
for r in results:
status = "[DRY RUN]" if r.get("dry_run") else ""
print(f" {status} [{r['label']}] → {r['file']}")
else:
print(f" ✗ ERROR: {results.get('error')}")
# Write results JSON
results_path = output_dir / "conversion_results.json"
with open(results_path, "w") as f:
json.dump(all_results, f, indent=2)
log.info(f"Results written to {results_path}")
if __name__ == "__main__":
main()
+162
View File
@@ -0,0 +1,162 @@
#!/usr/bin/env bash
# =============================================================================
# SWF Converter — Fedora Setup Script
# =============================================================================
# Run with: bash install.sh
# =============================================================================
set -euo pipefail
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
CYAN='\033[0;36m'
BOLD='\033[1m'
NC='\033[0m'
info() { echo -e "${CYAN}[INFO]${NC} $*"; }
success() { echo -e "${GREEN}[OK]${NC} $*"; }
warn() { echo -e "${YELLOW}[WARN]${NC} $*"; }
error() { echo -e "${RED}[ERR]${NC} $*"; }
echo -e "\n${BOLD}SWF → MP4 Converter Setup (Fedora)${NC}"
echo "=================================================="
# --- System dependencies ---
info "Installing system packages via dnf..."
sudo dnf install -y \
ffmpeg \
xdotool \
ImageMagick \
scrot \
python3 \
python3-pip \
python3-tkinter \
xorg-x11-server-Xvfb
success "System packages installed."
# --- Python dependencies ---
info "Installing Python packages..."
pip3 install --user pillow
success "Python packages installed."
# --- Ruffle ---
echo ""
info "Checking for Ruffle..."
RUFFLE_BIN="$HOME/.local/bin/ruffle"
if command -v ruffle &>/dev/null || [ -f "$RUFFLE_BIN" ]; then
success "Ruffle found: $(command -v ruffle 2>/dev/null || echo $RUFFLE_BIN)"
else
warn "Ruffle not found. Attempting to download latest release..."
mkdir -p "$HOME/.local/bin"
# Detect arch
ARCH=$(uname -m)
case "$ARCH" in
x86_64) RUFFLE_ARCH="x86_64-linux" ;;
aarch64) RUFFLE_ARCH="aarch64-linux" ;;
*)
error "Unknown architecture: $ARCH"
echo "Download Ruffle manually from: https://github.com/ruffle-rs/ruffle/releases"
echo "Place the binary at: $RUFFLE_BIN"
RUFFLE_ARCH=""
;;
esac
if [ -n "$RUFFLE_ARCH" ]; then
# Get latest release URL
RELEASE_URL=$(curl -s https://api.github.com/repos/ruffle-rs/ruffle/releases/latest \
| grep "browser_download_url" \
| grep "$RUFFLE_ARCH" \
| grep -v ".sha256" \
| head -1 \
| cut -d '"' -f 4)
if [ -n "$RELEASE_URL" ]; then
info "Downloading: $RELEASE_URL"
TMP_DIR=$(mktemp -d)
curl -L "$RELEASE_URL" -o "$TMP_DIR/ruffle.tar.gz"
tar -xzf "$TMP_DIR/ruffle.tar.gz" -C "$TMP_DIR"
RUFFLE_EXE=$(find "$TMP_DIR" -name "ruffle" -type f | head -1)
if [ -n "$RUFFLE_EXE" ]; then
cp "$RUFFLE_EXE" "$RUFFLE_BIN"
chmod +x "$RUFFLE_BIN"
rm -rf "$TMP_DIR"
success "Ruffle installed to $RUFFLE_BIN"
else
error "Could not find ruffle binary in archive."
echo "Download manually: https://github.com/ruffle-rs/ruffle/releases"
fi
else
error "Could not find Ruffle release for $RUFFLE_ARCH."
echo "Download manually: https://github.com/ruffle-rs/ruffle/releases"
fi
fi
fi
# Ensure ~/.local/bin is on PATH
if [[ ":$PATH:" != *":$HOME/.local/bin:"* ]]; then
warn "~/.local/bin is not in your PATH."
echo " Add this to your ~/.bashrc or ~/.zshrc:"
echo ' export PATH="$HOME/.local/bin:$PATH"'
fi
# --- Lightspark (optional fallback) ---
echo ""
info "Lightspark (optional fallback emulator)..."
if command -v lightspark &>/dev/null; then
success "Lightspark found."
else
warn "Lightspark not found. Optional — only needed if Ruffle can't run a SWF."
echo " To install: sudo dnf install lightspark"
echo " Or build from source: https://github.com/lightspark/lightspark"
fi
# --- Verify ffmpeg capabilities ---
echo ""
info "Verifying FFmpeg x11grab support..."
if ffmpeg -f x11grab -i :0 -t 0.1 /dev/null 2>&1 | grep -q "x11grab"; then
warn "x11grab reported an issue — ensure a display (:0) is available when recording."
else
success "FFmpeg x11grab available."
fi
# --- Verify PulseAudio ---
info "Verifying PulseAudio..."
if pactl info &>/dev/null; then
success "PulseAudio running."
else
warn "PulseAudio not running. Audio recording may fail."
echo " Start with: pulseaudio --start"
echo " Or use '--no-audio' flag when converting (set audio: false in config)."
fi
# --- Virtual display setup (for headless/server use) ---
echo ""
info "Virtual display (Xvfb) — for headless environments only..."
echo " If running on a server without a display, start Xvfb with:"
echo " Xvfb :99 -screen 0 1024x768x24 &"
echo " export DISPLAY=:99"
echo " Then run conversions normally."
# --- Final check ---
echo ""
echo -e "${BOLD}Setup complete! Quick-start:${NC}"
echo ""
echo " # Inspect a SWF file:"
echo " python3 convert.py --inspect my.swf"
echo ""
echo " # Generate a config template:"
echo " python3 convert.py --generate-config *.swf"
echo ""
echo " # Map interaction points interactively:"
echo " python3 map_interactions.py my.swf"
echo ""
echo " # Convert a single SWF:"
echo " python3 convert.py my.swf"
echo ""
echo " # Convert a directory of SWFs:"
echo " python3 convert.py ./swf_files/ -o ./output/"
echo ""
+178
View File
@@ -0,0 +1,178 @@
#!/usr/bin/env python3
"""
Interaction Controller - Injects mouse clicks into a running Ruffle window
at specified timestamps using xdotool.
"""
import logging
import subprocess
import threading
import time
from dataclasses import dataclass
from typing import List, Optional
log = logging.getLogger("interaction")
@dataclass
class ClickEvent:
"""A single click interaction to perform."""
t: float # Time (seconds from start of recording) to fire this click
x: int # X coordinate relative to the window
y: int # Y coordinate relative to the window
label: str = "" # Human-readable label
button: int = 1 # Mouse button (1=left, 2=middle, 3=right)
double: bool = False # Whether to double-click
class InteractionController:
"""
Schedules and dispatches click events into a running Ruffle window.
Usage
-----
ctrl = InteractionController(window_id="0x1234abc", clicks=[...])
ctrl.start(recording_start_time=time.monotonic())
# ... recording happens ...
ctrl.stop()
"""
def __init__(self, window_id: str, clicks: List[dict]):
self.window_id = window_id
self.clicks = [
ClickEvent(
t=c["t"],
x=c["x"],
y=c["y"],
label=c.get("label", ""),
button=c.get("button", 1),
double=c.get("double", False),
)
for c in sorted(clicks, key=lambda c: c["t"])
]
self._thread: Optional[threading.Thread] = None
self._stop_event = threading.Event()
self._recording_start: float = 0.0
def start(self, recording_start_time: float):
"""Begin the click scheduler. Call immediately after recording starts."""
self._recording_start = recording_start_time
self._stop_event.clear()
self._thread = threading.Thread(target=self._schedule_clicks, daemon=True)
self._thread.start()
log.info(f"InteractionController started with {len(self.clicks)} click(s).")
def stop(self):
"""Cancel any pending clicks."""
self._stop_event.set()
if self._thread:
self._thread.join(timeout=2.0)
def wait_until_done(self):
"""Block until all clicks have been dispatched."""
if self._thread:
self._thread.join()
# ------------------------------------------------------------------
# Internal
# ------------------------------------------------------------------
def _schedule_clicks(self):
for click in self.clicks:
if self._stop_event.is_set():
break
elapsed = time.monotonic() - self._recording_start
wait = click.t - elapsed
if wait > 0:
# Sleep in small increments so we can respond to stop events
deadline = time.monotonic() + wait
while time.monotonic() < deadline:
if self._stop_event.is_set():
return
time.sleep(min(0.05, deadline - time.monotonic()))
if not self._stop_event.is_set():
self._fire_click(click)
def _fire_click(self, click: ClickEvent):
"""Execute a click using xdotool."""
try:
# Move mouse to position within window
subprocess.run(
[
"xdotool", "mousemove",
"--window", self.window_id,
str(click.x), str(click.y),
],
check=True,
timeout=2.0,
)
time.sleep(0.05) # Brief settle time
# Fire the click(s)
click_cmd = ["xdotool", "click", "--window", self.window_id, str(click.button)]
if click.double:
subprocess.run(click_cmd, check=True, timeout=2.0)
time.sleep(0.1)
subprocess.run(click_cmd, check=True, timeout=2.0)
log.info(
f" Click fired: label='{click.label}' "
f"t={click.t:.2f}s x={click.x} y={click.y} "
f"btn={click.button} double={click.double}"
)
except subprocess.CalledProcessError as e:
log.error(f"xdotool click failed: {e}")
except FileNotFoundError:
log.error("xdotool not found. Install with: sudo dnf install xdotool")
except subprocess.TimeoutExpired:
log.error("xdotool click timed out.")
class InteractionMapper:
"""
Helper for discovering interaction points interactively.
Runs Ruffle in the foreground and records the user's clicks + timestamps
so they can be saved to a config file.
"""
def __init__(self, window_id: str):
self.window_id = window_id
self._recorded: list = []
self._start_time: float = 0.0
self._running = False
def start_recording(self):
"""
Listen for clicks in the Ruffle window via xdotool key event tracking.
Note: This is a best-effort approach — for accurate mapping, use the
interactive GUI tool (gui.py) which shows a live overlay.
"""
self._start_time = time.monotonic()
self._running = True
log.info("Interaction recording started. Press Ctrl+C to stop.")
try:
while self._running:
# Poll for click position using xdotool
result = subprocess.run(
["xdotool", "getmouselocation", "--window", self.window_id],
capture_output=True,
text=True,
timeout=1.0,
)
# This is polled, not event-driven — use GUI for real click capture
time.sleep(0.1)
except KeyboardInterrupt:
self._running = False
log.info("Interaction recording stopped.")
def get_recorded(self) -> list:
return self._recorded
def record_click_at(self, x: int, y: int, label: str = ""):
"""Manually register a click event (called from GUI or external hook)."""
t = time.monotonic() - self._start_time
entry = {"t": round(t, 2), "x": x, "y": y, "label": label or f"click_{len(self._recorded)+1}"}
self._recorded.append(entry)
log.info(f"Recorded interaction: {entry}")
return entry
+232
View File
@@ -0,0 +1,232 @@
#!/usr/bin/env python3
"""
Loop Detector - Detects when a SWF has entered a repeating loop.
Strategy: Periodically capture a downsampled screenshot of the Ruffle window,
hash it, and compare against recent hashes. A repeated hash sequence indicates
a loop has started.
"""
import hashlib
import logging
import subprocess
import time
from collections import deque
from dataclasses import dataclass, field
from typing import Optional
log = logging.getLogger("loop_detector")
@dataclass
class FrameRecord:
timestamp: float
hash: str
class LoopDetector:
"""
Detects animation loops by comparing periodic frame hashes.
Parameters
----------
window_id : str
X11 window ID of the Ruffle window (hex string like '0x1234abc').
poll_interval : float
Seconds between frame captures (default 0.4 — 2-3 samples/sec is enough).
sequence_length : int
Number of consecutive matching frames required to confirm a loop (default 4).
hash_history : int
How many past hashes to retain for comparison (default 300 ≈ 2 min at 0.4s).
thumbnail_size : str
Resolution to downsample frames to before hashing (smaller = faster, less
sensitive to minor rendering differences).
"""
def __init__(
self,
window_id: str,
poll_interval: float = 0.4,
sequence_length: int = 4,
hash_history: int = 300,
thumbnail_size: str = "64x64",
):
self.window_id = window_id
self.poll_interval = poll_interval
self.sequence_length = sequence_length
self.thumbnail_size = thumbnail_size
self._history: deque[FrameRecord] = deque(maxlen=hash_history)
self._loop_detected_at: Optional[float] = None
self._running = False
# ------------------------------------------------------------------
# Public API
# ------------------------------------------------------------------
def start(self):
"""Begin background polling (call in a thread)."""
self._running = True
self._poll_loop()
def stop(self):
self._running = False
def loop_detected(self) -> bool:
return self._loop_detected_at is not None
def loop_detected_at(self) -> Optional[float]:
"""Wall-clock time when the loop was first detected, or None."""
return self._loop_detected_at
def reset(self):
self._history.clear()
self._loop_detected_at = None
# ------------------------------------------------------------------
# Internal
# ------------------------------------------------------------------
def _poll_loop(self):
while self._running:
h = self._capture_hash()
if h:
now = time.monotonic()
record = FrameRecord(timestamp=now, hash=h)
self._check_for_loop(record)
self._history.append(record)
time.sleep(self.poll_interval)
def _capture_hash(self) -> Optional[str]:
"""
Screenshot the Ruffle window (downsampled) and return an MD5 hash.
Uses ImageMagick `import` — falls back to `scrot` if unavailable.
"""
try:
result = subprocess.run(
[
"import",
"-window", self.window_id,
"-resize", self.thumbnail_size,
"-depth", "8",
"png:-",
],
capture_output=True,
timeout=2.0,
)
if result.returncode == 0 and result.stdout:
return hashlib.md5(result.stdout).hexdigest()
else:
log.debug(f"import failed: {result.stderr.decode()[:200]}")
return self._capture_hash_scrot()
except FileNotFoundError:
return self._capture_hash_scrot()
except subprocess.TimeoutExpired:
log.warning("Frame capture timed out.")
return None
def _capture_hash_scrot(self) -> Optional[str]:
"""Fallback: use scrot to capture window by ID."""
try:
import tempfile, os
with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as f:
tmp = f.name
result = subprocess.run(
["scrot", "--window", self.window_id, tmp],
capture_output=True,
timeout=2.0,
)
if result.returncode == 0:
# Downsample with ffmpeg then hash
result2 = subprocess.run(
[
"ffmpeg", "-y", "-i", tmp,
"-vf", f"scale={self.thumbnail_size.replace('x', ':')}",
"-frames:v", "1", "-f", "rawvideo", "-pix_fmt", "rgb24", "pipe:1",
],
capture_output=True,
timeout=3.0,
)
os.unlink(tmp)
if result2.returncode == 0 and result2.stdout:
return hashlib.md5(result2.stdout).hexdigest()
return None
except Exception as e:
log.debug(f"scrot fallback failed: {e}")
return None
def _check_for_loop(self, current: FrameRecord):
if self._loop_detected_at is not None:
return # already detected
# Include the current (not-yet-appended) record in the full timeline
history_list = list(self._history) + [current]
if len(history_list) < self.sequence_length * 2:
return # not enough history yet
# Build a sliding window of the last `sequence_length` hashes
recent = [r.hash for r in history_list[-self.sequence_length:]]
# Search backwards in the earlier portion for a matching sequence
search_range = history_list[: -self.sequence_length]
for i in range(len(search_range) - self.sequence_length + 1):
window = [r.hash for r in search_range[i : i + self.sequence_length]]
if window == recent:
self._loop_detected_at = current.timestamp
log.info(
f"Loop detected at t={current.timestamp:.1f}s "
f"(matched sequence starting at history index {i})"
)
return
class StillnessDetector:
"""
Simpler alternative: detect when the video has been completely static
for a given duration. Uses FFmpeg freezedetect on the recorded file.
This runs as a post-processing step on a raw recording.
"""
@staticmethod
def find_freeze_start(video_path: str, noise_tolerance: float = 0.001, min_duration: float = 5.0) -> Optional[float]:
"""
Returns the timestamp (seconds) where a freeze of >= min_duration starts,
or None if no such freeze exists.
"""
result = subprocess.run(
[
"ffmpeg", "-i", video_path,
"-vf", f"freezedetect=n={noise_tolerance}:d={min_duration}",
"-f", "null", "-",
],
capture_output=True,
text=True,
timeout=300,
)
output = result.stderr
for line in output.splitlines():
if "freeze_start" in line:
try:
t = float(line.split("freeze_start:")[1].strip().split()[0])
log.info(f"Freeze detected at {t:.2f}s in {video_path}")
return t
except (IndexError, ValueError):
pass
return None
@staticmethod
def trim_at_freeze(input_path: str, output_path: str, freeze_start: float, grace: float = 5.0):
"""Trim a video to end `grace` seconds after the freeze start."""
end_time = freeze_start + grace
subprocess.run(
[
"ffmpeg", "-y",
"-i", input_path,
"-t", str(end_time),
"-c:v", "libx264", "-crf", "18",
"-c:a", "aac",
output_path,
],
check=True,
)
log.info(f"Trimmed video saved to {output_path} (end={end_time:.1f}s)")
+308
View File
@@ -0,0 +1,308 @@
#!/usr/bin/env python3
"""
Interaction Mapper GUI
A Tkinter-based tool that:
1. Launches a SWF file in Ruffle
2. Shows a transparent overlay with a timer
3. Records every click (timestamp + coordinates) as you interact
4. Saves the interactions to the config file
Run:
python3 map_interactions.py my_animation.swf
"""
import argparse
import json
import logging
import subprocess
import sys
import threading
import time
import tkinter as tk
from pathlib import Path
from tkinter import messagebox, simpledialog
from config_manager import ConfigManager
from recorder import find_binary, find_window_id, get_window_geometry, RUFFLE_CANDIDATES
log = logging.getLogger("map_interactions")
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
class InteractionMapperGUI:
def __init__(self, swf_path: Path, config_path: str):
self.swf_path = swf_path
self.config_mgr = ConfigManager(config_path)
self.recorded_clicks = []
self.start_time = None
self.emulator_proc = None
self.ruffle_window_id = None
self._running = False
self.root = tk.Tk()
self.root.title(f"Interaction Mapper — {swf_path.name}")
self.root.configure(bg="#1a1a2e")
self.root.resizable(False, False)
self._build_ui()
def _build_ui(self):
root = self.root
PAD = 16
FONT_MONO = ("Courier New", 11)
FONT_LABEL = ("Helvetica", 10)
BG = "#1a1a2e"
FG = "#e2e8f0"
ACCENT = "#7c3aed"
GREEN = "#22c55e"
RED = "#ef4444"
CARD = "#16213e"
root.configure(bg=BG)
# Title
tk.Label(
root, text="⚡ SWF Interaction Mapper", font=("Helvetica", 14, "bold"),
bg=BG, fg=ACCENT
).pack(pady=(PAD, 4))
tk.Label(
root, text=f"File: {self.swf_path.name}", font=FONT_LABEL,
bg=BG, fg="#94a3b8"
).pack()
# Timer display
timer_frame = tk.Frame(root, bg=CARD, padx=PAD, pady=PAD)
timer_frame.pack(fill="x", padx=PAD, pady=(PAD, 0))
tk.Label(timer_frame, text="Elapsed Time", font=FONT_LABEL, bg=CARD, fg="#94a3b8").pack()
self.timer_label = tk.Label(
timer_frame, text="00:00.000", font=("Courier New", 28, "bold"),
bg=CARD, fg=GREEN
)
self.timer_label.pack()
# Instructions
instr_frame = tk.Frame(root, bg=CARD, padx=PAD, pady=8)
instr_frame.pack(fill="x", padx=PAD, pady=4)
instructions = (
"1. Click 'Launch & Start' to open the SWF\n"
"2. Click 'Record Click' each time an interaction fires\n"
"3. Enter coordinates from the Ruffle window\n"
"4. Click 'Save Config' when done"
)
tk.Label(instr_frame, text=instructions, font=("Helvetica", 9), bg=CARD, fg="#94a3b8",
justify="left").pack(anchor="w")
# Recorded clicks list
list_frame = tk.Frame(root, bg=CARD, padx=PAD, pady=PAD)
list_frame.pack(fill="both", expand=True, padx=PAD, pady=4)
tk.Label(list_frame, text="Recorded Interactions", font=("Helvetica", 10, "bold"),
bg=CARD, fg=FG).pack(anchor="w")
self.clicks_listbox = tk.Listbox(
list_frame, font=FONT_MONO, bg="#0f172a", fg=FG,
selectbackground=ACCENT, height=10, width=52,
borderwidth=0, highlightthickness=1, highlightbackground=ACCENT
)
self.clicks_listbox.pack(fill="both", expand=True, pady=4)
# Buttons
btn_frame = tk.Frame(root, bg=BG)
btn_frame.pack(fill="x", padx=PAD, pady=PAD)
def btn(parent, text, cmd, color=ACCENT):
return tk.Button(
parent, text=text, command=cmd,
bg=color, fg="white", font=("Helvetica", 10, "bold"),
relief="flat", padx=12, pady=6, cursor="hand2",
activebackground=color, activeforeground="white",
)
btn(btn_frame, "▶ Launch & Start", self._launch_and_start, GREEN).pack(side="left", padx=2)
btn(btn_frame, "⊕ Record Click", self._record_click_dialog, ACCENT).pack(side="left", padx=2)
btn(btn_frame, "✕ Delete Selected", self._delete_selected, RED).pack(side="left", padx=2)
btn(btn_frame, "💾 Save Config", self._save_config, "#0ea5e9").pack(side="right", padx=2)
self.status_label = tk.Label(
root, text="Ready. Click 'Launch & Start' to begin.",
font=("Helvetica", 9), bg=BG, fg="#94a3b8"
)
self.status_label.pack(pady=(0, PAD))
root.protocol("WM_DELETE_WINDOW", self._on_close)
root.after(100, self._update_timer)
def _launch_and_start(self):
if self._running:
self._set_status("Already running!")
return
binary = find_binary(RUFFLE_CANDIDATES)
if not binary:
messagebox.showerror(
"Ruffle Not Found",
"Ruffle binary not found.\n\n"
"Download from: https://github.com/ruffle-rs/ruffle/releases\n"
"Place at: ~/.local/bin/ruffle"
)
return
self._set_status(f"Launching {self.swf_path.name}...")
self.emulator_proc = subprocess.Popen(
[binary, str(self.swf_path)],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
self.start_time = time.monotonic()
self._running = True
self._set_status(f"Running. Record clicks as they happen in Ruffle.")
def _record_click_dialog(self):
if not self._running:
self._set_status("Launch the SWF first!")
return
elapsed = time.monotonic() - self.start_time
# Try to auto-detect mouse position in Ruffle window
auto_x, auto_y = None, None
if self.ruffle_window_id is None:
self.ruffle_window_id = find_window_id("Ruffle", timeout=1.0)
if self.ruffle_window_id:
try:
result = subprocess.run(
["xdotool", "getmouselocation", "--shell"],
capture_output=True, text=True, timeout=1.0
)
geo = get_window_geometry(self.ruffle_window_id)
if result.returncode == 0 and geo:
d = {}
for line in result.stdout.splitlines():
if "=" in line:
k, v = line.split("=", 1)
d[k.strip().lower()] = int(v.strip())
auto_x = d.get("x", 0) - geo.get("x", 0)
auto_y = d.get("y", 0) - geo.get("y", 0)
except Exception:
pass
# Dialog
dialog = tk.Toplevel(self.root)
dialog.title("Record Interaction")
dialog.configure(bg="#1a1a2e")
dialog.grab_set()
dialog.resizable(False, False)
tk.Label(dialog, text=f"Time: {elapsed:.2f}s", font=("Courier New", 12, "bold"),
bg="#1a1a2e", fg="#22c55e").pack(pady=(12, 4))
fields = {}
for label, default in [
("X coordinate", str(auto_x or 0)),
("Y coordinate", str(auto_y or 0)),
("Label", f"click_{len(self.recorded_clicks)+1}"),
]:
f = tk.Frame(dialog, bg="#1a1a2e")
f.pack(fill="x", padx=16, pady=2)
tk.Label(f, text=label, width=14, anchor="w", bg="#1a1a2e", fg="#e2e8f0",
font=("Helvetica", 10)).pack(side="left")
e = tk.Entry(f, bg="#0f172a", fg="#e2e8f0", insertbackground="white",
font=("Courier New", 10), width=20)
e.insert(0, default)
e.pack(side="left", padx=4)
fields[label] = e
def confirm():
try:
x = int(fields["X coordinate"].get())
y = int(fields["Y coordinate"].get())
label = fields["Label"].get().strip() or f"click_{len(self.recorded_clicks)+1}"
entry = {"t": round(elapsed, 3), "x": x, "y": y, "label": label}
self.recorded_clicks.append(entry)
self.clicks_listbox.insert(
tk.END,
f" t={entry['t']:7.3f}s x={x:4d} y={y:4d} [{label}]"
)
self._set_status(f"Recorded: {label} at t={elapsed:.2f}s ({x},{y})")
dialog.destroy()
except ValueError:
messagebox.showerror("Invalid Input", "X and Y must be integers.", parent=dialog)
tk.Button(
dialog, text="✓ Confirm", command=confirm,
bg="#7c3aed", fg="white", font=("Helvetica", 10, "bold"),
relief="flat", padx=12, pady=6
).pack(pady=12)
def _delete_selected(self):
sel = self.clicks_listbox.curselection()
if not sel:
return
idx = sel[0]
self.clicks_listbox.delete(idx)
del self.recorded_clicks[idx]
self._set_status(f"Deleted interaction {idx + 1}.")
def _save_config(self):
if not self.recorded_clicks:
if not messagebox.askyesno("No Interactions", "No interactions recorded. Save empty entry?"):
return
for click in self.recorded_clicks:
self.config_mgr.add_interaction(self.swf_path.name, click)
self.config_mgr.save()
self._set_status(f"✓ Saved {len(self.recorded_clicks)} interaction(s) to config.")
messagebox.showinfo(
"Saved",
f"Saved {len(self.recorded_clicks)} interaction(s) to:\n{self.config_mgr.config_path}"
)
def _update_timer(self):
if self._running and self.start_time:
elapsed = time.monotonic() - self.start_time
minutes = int(elapsed // 60)
seconds = elapsed % 60
self.timer_label.config(text=f"{minutes:02d}:{seconds:06.3f}")
self.root.after(50, self._update_timer)
def _set_status(self, msg: str):
self.status_label.config(text=msg)
log.info(msg)
def _on_close(self):
if self.emulator_proc:
try:
self.emulator_proc.terminate()
except Exception:
pass
self.root.destroy()
def run(self):
self.root.mainloop()
def main():
parser = argparse.ArgumentParser(
description="GUI tool for recording SWF interaction points."
)
parser.add_argument("swf", help="SWF file to map interactions for")
parser.add_argument(
"-c", "--config",
default="configs/swf_config.json",
help="Config file to save interactions to",
)
args = parser.parse_args()
swf_path = Path(args.swf)
if not swf_path.exists():
print(f"Error: SWF file not found: {swf_path}", file=sys.stderr)
sys.exit(1)
app = InteractionMapperGUI(swf_path, args.config)
app.run()
if __name__ == "__main__":
main()
+390
View File
@@ -0,0 +1,390 @@
#!/usr/bin/env python3
"""
Recorder - Launches Ruffle, captures the window with FFmpeg, injects clicks,
and stops recording based on loop detection or configured duration.
"""
import logging
import os
import signal
import subprocess
import threading
import time
import tempfile
from pathlib import Path
from typing import List, Optional
from loop_detector import LoopDetector, StillnessDetector
from interaction import InteractionController
log = logging.getLogger("recorder")
# Default Ruffle binary locations to try (in order)
RUFFLE_CANDIDATES = [
"ruffle",
os.path.expanduser("~/.local/bin/ruffle"),
"/usr/local/bin/ruffle",
"/opt/ruffle/ruffle",
os.path.expanduser("~/ruffle"),
]
# Default Lightspark binary (fallback emulator)
LIGHTSPARK_CANDIDATES = [
"lightspark",
"/usr/local/bin/lightspark",
]
def find_binary(candidates: list) -> Optional[str]:
for c in candidates:
try:
result = subprocess.run(["which", c], capture_output=True, text=True)
if result.returncode == 0:
return c
except Exception:
pass
if os.path.isfile(c) and os.access(c, os.X_OK):
return c
return None
def find_window_id(title_fragment: str, timeout: float = 15.0) -> Optional[str]:
"""Poll xdotool until a window with the given title fragment appears."""
deadline = time.monotonic() + timeout
while time.monotonic() < deadline:
try:
result = subprocess.run(
["xdotool", "search", "--name", title_fragment],
capture_output=True,
text=True,
timeout=2.0,
)
if result.returncode == 0 and result.stdout.strip():
wid = result.stdout.strip().splitlines()[0]
log.info(f"Found window '{title_fragment}': ID={wid}")
return wid
except Exception:
pass
time.sleep(0.5)
log.error(f"Window '{title_fragment}' not found within {timeout}s.")
return None
def get_window_geometry(window_id: str) -> Optional[dict]:
"""Return {'x': int, 'y': int, 'width': int, 'height': int} for a window."""
try:
result = subprocess.run(
["xdotool", "getwindowgeometry", "--shell", window_id],
capture_output=True, text=True, timeout=3.0,
)
if result.returncode == 0:
geo = {}
for line in result.stdout.splitlines():
if "=" in line:
k, v = line.split("=", 1)
geo[k.strip().lower()] = int(v.strip())
return geo
except Exception as e:
log.debug(f"getwindowgeometry failed: {e}")
return None
class Recorder:
"""
Manages a single SWF→MP4 conversion pass.
One Recorder is created per SWF file. `.record()` can be called multiple
times (once per interaction branch).
"""
def __init__(self, swf_path: Path, cfg: dict):
self.swf_path = swf_path
self.cfg = cfg
# Config values with sensible defaults
self.loop_grace = cfg.get("loop_grace", 5.0)
self.max_duration = cfg.get("max_duration", 600.0)
self.fps = cfg.get("fps", 30)
self.crf = cfg.get("crf", 18) # FFmpeg quality (lower = better)
self.window_size = cfg.get("window_size", "800x600")
self.emulator = cfg.get("emulator", "ruffle") # "ruffle" or "lightspark"
self.end_seconds = cfg.get("end_seconds") # Hard stop time, if known
self.end_frame = cfg.get("end_frame") # Hard stop frame, if known
self.loop_detection = cfg.get("loop_detection", "hash") # "hash", "freeze", or "none"
self.startup_delay = cfg.get("startup_delay", 3.0) # Wait after launch before recording
self.audio = cfg.get("audio", True)
def record(self, output_path: Path, clicks: list) -> bool:
"""
Run one recording pass.
Parameters
----------
output_path : Path
Where to save the resulting MP4.
clicks : list
List of click dicts {t, x, y, label, ...} to inject.
Returns
-------
bool
True on success.
"""
log.info(f"Starting recording pass: {output_path.name}")
output_path.parent.mkdir(parents=True, exist_ok=True)
# If using freeze detection, we record raw first then post-process
use_freeze_postprocess = (
self.loop_detection == "freeze"
and self.end_seconds is None
and self.end_frame is None
)
raw_path = output_path
if use_freeze_postprocess:
tmp = tempfile.NamedTemporaryFile(suffix=".mp4", delete=False)
raw_path = Path(tmp.name)
tmp.close()
try:
return self._do_record(raw_path, output_path, clicks, use_freeze_postprocess)
except Exception as e:
log.exception(f"Recording failed: {e}")
return False
def _do_record(
self,
raw_path: Path,
final_path: Path,
clicks: list,
use_freeze_postprocess: bool,
) -> bool:
# --- Find emulator ---
if self.emulator == "lightspark":
binary = find_binary(LIGHTSPARK_CANDIDATES)
if not binary:
log.error("Lightspark not found. Falling back to Ruffle.")
binary = find_binary(RUFFLE_CANDIDATES)
else:
binary = find_binary(RUFFLE_CANDIDATES)
if not binary:
log.error(
"No Flash emulator found. Install Ruffle:\n"
" https://github.com/ruffle-rs/ruffle/releases\n"
" Place the binary at ~/.local/bin/ruffle"
)
return False
# --- Launch emulator ---
env = os.environ.copy()
env["DISPLAY"] = env.get("DISPLAY", ":0")
emulator_cmd = [binary, str(self.swf_path)]
log.info(f"Launching: {' '.join(emulator_cmd)}")
emulator_proc = subprocess.Popen(
emulator_cmd,
env=env,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
# --- Find window ---
window_id = find_window_id("Ruffle", timeout=15.0)
if not window_id:
window_id = find_window_id(self.swf_path.stem, timeout=5.0)
if not window_id:
log.error("Could not find emulator window. Is a display available?")
emulator_proc.kill()
return False
# --- Wait for startup ---
log.info(f"Waiting {self.startup_delay}s for emulator startup...")
time.sleep(self.startup_delay)
# Get window geometry for FFmpeg
geo = get_window_geometry(window_id)
if geo:
capture_size = f"{geo['width']}x{geo['height']}"
capture_offset = f"{geo['x']},{geo['y']}"
else:
capture_size = self.window_size
capture_offset = "0,0"
# --- Start FFmpeg recording ---
display = env.get("DISPLAY", ":0")
ffmpeg_cmd = self._build_ffmpeg_cmd(
display=display,
capture_size=capture_size,
capture_offset=capture_offset,
output_path=raw_path,
)
log.info(f"FFmpeg command: {' '.join(ffmpeg_cmd)}")
ffmpeg_proc = subprocess.Popen(
ffmpeg_cmd,
stdin=subprocess.PIPE,
stdout=subprocess.DEVNULL,
stderr=subprocess.PIPE,
)
recording_start = time.monotonic()
# --- Start click injector ---
interaction_ctrl = None
if clicks:
interaction_ctrl = InteractionController(window_id=window_id, clicks=clicks)
interaction_ctrl.start(recording_start_time=recording_start)
# --- Start loop detector (hash-based) ---
loop_detector = None
stop_event = threading.Event()
if self.loop_detection == "hash" and self.end_seconds is None:
loop_detector = LoopDetector(window_id=window_id)
loop_thread = threading.Thread(
target=loop_detector.start, daemon=True
)
loop_thread.start()
# --- Monitor recording ---
stop_reason = self._monitor_recording(
emulator_proc=emulator_proc,
ffmpeg_proc=ffmpeg_proc,
loop_detector=loop_detector,
recording_start=recording_start,
)
# --- Stop everything ---
log.info(f"Stopping recording. Reason: {stop_reason}")
if loop_detector:
loop_detector.stop()
if interaction_ctrl:
interaction_ctrl.stop()
# Send 'q' to FFmpeg to gracefully finalize the MP4
try:
ffmpeg_proc.stdin.write(b"q")
ffmpeg_proc.stdin.flush()
ffmpeg_proc.wait(timeout=10)
except Exception:
ffmpeg_proc.kill()
# Kill emulator
try:
emulator_proc.terminate()
emulator_proc.wait(timeout=5)
except Exception:
emulator_proc.kill()
# --- Post-process: freeze detection trim ---
if use_freeze_postprocess and raw_path.exists():
log.info("Running freeze detection post-process...")
freeze_t = StillnessDetector.find_freeze_start(
str(raw_path),
min_duration=3.0,
)
if freeze_t:
StillnessDetector.trim_at_freeze(
str(raw_path),
str(final_path),
freeze_t,
grace=self.loop_grace,
)
raw_path.unlink(missing_ok=True)
else:
log.info("No freeze detected; using full recording.")
raw_path.rename(final_path)
elif not use_freeze_postprocess and raw_path != final_path:
raw_path.rename(final_path)
success = final_path.exists() and final_path.stat().st_size > 0
if success:
duration = time.monotonic() - recording_start
log.info(f"Recording complete: {final_path} ({duration:.1f}s recorded)")
else:
log.error(f"Output file missing or empty: {final_path}")
return success
def _monitor_recording(
self,
emulator_proc,
ffmpeg_proc,
loop_detector: Optional[LoopDetector],
recording_start: float,
) -> str:
"""
Block until we decide to stop recording. Returns a string describing why.
"""
loop_grace_remaining = self.loop_grace
loop_stop_at = None
while True:
elapsed = time.monotonic() - recording_start
# Hard stop: configured end_seconds
if self.end_seconds is not None and elapsed >= self.end_seconds:
return f"end_seconds={self.end_seconds}"
# Hard stop: max_duration
if elapsed >= self.max_duration:
return f"max_duration={self.max_duration}"
# Emulator crashed
if emulator_proc.poll() is not None:
return "emulator_exited"
# FFmpeg crashed
if ffmpeg_proc.poll() is not None:
return "ffmpeg_exited"
# Loop detection
if loop_detector and loop_detector.loop_detected():
if loop_stop_at is None:
loop_stop_at = time.monotonic() + self.loop_grace
log.info(f"Loop confirmed; will stop in {self.loop_grace}s at t={elapsed + self.loop_grace:.1f}s")
if time.monotonic() >= loop_stop_at:
return f"loop_detected+{self.loop_grace}s_grace"
time.sleep(0.2)
def _build_ffmpeg_cmd(
self,
display: str,
capture_size: str,
capture_offset: str,
output_path: Path,
) -> list:
"""Build the FFmpeg command for x11grab capture."""
cmd = [
"ffmpeg", "-y",
# Input: X11 screen capture
"-f", "x11grab",
"-framerate", str(self.fps),
"-video_size", capture_size,
"-i", f"{display}+{capture_offset}",
]
if self.audio:
cmd += [
# Input: PulseAudio capture
"-f", "pulse",
"-i", "default",
]
cmd += [
# Video encoding
"-c:v", "libx264",
"-preset", "fast",
"-crf", str(self.crf),
"-pix_fmt", "yuv420p",
]
if self.audio:
cmd += ["-c:a", "aac", "-b:a", "128k"]
else:
cmd += ["-an"]
cmd += [str(output_path)]
return cmd
+102
View File
@@ -0,0 +1,102 @@
{
"_comment": "SWF Converter Configuration File. Keys with _ prefix are comments/metadata.",
"_schema_reference": "See config_manager.py for full schema documentation.",
"_defaults": {
"_comment": "These apply to ALL SWF files unless overridden per-file.",
"emulator": "ruffle",
"loop_detection": "hash",
"loop_grace": 5.0,
"max_duration": 600.0,
"fps": 30,
"crf": 18,
"window_size": "800x600",
"startup_delay": 3.0,
"audio": true
},
"simple_animation.swf": {
"_comment": "A basic non-interactive animation with a known loop point.",
"end_seconds": 45.0,
"interactions": []
},
"looping_intro.swf": {
"_comment": "An animation that loops endlessly. Use hash detection + grace period.",
"loop_detection": "hash",
"loop_grace": 5.0,
"interactions": []
},
"interactive_lesson.swf": {
"_comment": "An interactive lesson with two choice buttons appearing at t=12.5s.",
"loop_detection": "hash",
"loop_grace": 5.0,
"interactions": [
{
"t": 12.5,
"x": 320,
"y": 240,
"label": "button_yes",
"_comment": "Clicking 'Yes' at the choice screen"
},
{
"t": 12.5,
"x": 320,
"y": 340,
"label": "button_no",
"_comment": "Clicking 'No' at the choice screen"
}
]
},
"quiz.swf": {
"_comment": "A quiz with multiple questions. Each question has clickable answer areas.",
"loop_detection": "hash",
"loop_grace": 3.0,
"interactions": [
{
"t": 8.0,
"x": 200,
"y": 300,
"label": "q1_answer_a"
},
{
"t": 8.0,
"x": 200,
"y": 380,
"label": "q1_answer_b"
},
{
"t": 8.0,
"x": 200,
"y": 460,
"label": "q1_answer_c"
}
]
},
"menu.swf": {
"_comment": "A menu SWF with known end time. No loop detection needed.",
"loop_detection": "none",
"end_seconds": 30.0,
"interactions": []
},
"problematic.swf": {
"_comment": "A SWF that Ruffle struggles with — use Lightspark as fallback.",
"emulator": "lightspark",
"loop_detection": "freeze",
"loop_grace": 5.0,
"interactions": []
},
"high_quality_export.swf": {
"_comment": "Export at higher quality for archival purposes.",
"crf": 12,
"fps": 60,
"window_size": "1280x720",
"loop_detection": "hash",
"interactions": []
}
}
+194
View File
@@ -0,0 +1,194 @@
#!/usr/bin/env python3
"""
SWF Inspector - Reads SWF binary headers to extract metadata without
running the file.
Parses: SWF version, frame rate, frame count, dimensions, compression type.
Reference: https://open-flash.github.io/mirrors/swf-spec-19.pdf
"""
import logging
import struct
import zlib
from pathlib import Path
from typing import Optional
log = logging.getLogger("swf_inspector")
# SWF signatures
SIG_UNCOMPRESSED = b"FWS"
SIG_ZLIB = b"CWS"
SIG_LZMA = b"ZWS"
class SWFInspector:
"""
Reads SWF file headers and returns useful metadata.
Usage
-----
info = SWFInspector().inspect(Path("my.swf"))
print(info)
# {'version': 8, 'compression': 'none', 'fps': 24.0,
# 'frame_count': 300, 'width': 550, 'height': 400}
"""
def inspect(self, swf_path: Path) -> dict:
"""Return a dict of metadata about the SWF file."""
result = {
"path": str(swf_path),
"size_bytes": swf_path.stat().st_size if swf_path.exists() else None,
"version": None,
"compression": None,
"fps": None,
"frame_count": None,
"width": None,
"height": None,
"estimated_duration_seconds": None,
"error": None,
}
if not swf_path.exists():
result["error"] = "File not found"
return result
try:
with open(swf_path, "rb") as f:
header = f.read(8)
if len(header) < 8:
result["error"] = "File too small to be a valid SWF"
return result
sig = header[:3]
version = header[3]
file_length = struct.unpack_from("<I", header, 4)[0]
result["version"] = version
result["file_length"] = file_length
if sig == SIG_UNCOMPRESSED:
result["compression"] = "none"
self._parse_body(swf_path, result, offset=8)
elif sig == SIG_ZLIB:
result["compression"] = "zlib"
self._parse_zlib_body(swf_path, result)
elif sig == SIG_LZMA:
result["compression"] = "lzma"
result["error"] = "LZMA-compressed SWF: metadata extraction limited (version 13+)"
# Still try to get basic info from what's available
else:
result["error"] = f"Unknown SWF signature: {sig!r}"
except Exception as e:
result["error"] = str(e)
log.debug(f"SWF inspection error for {swf_path}: {e}", exc_info=True)
# Compute estimated duration
if result["fps"] and result["frame_count"]:
result["estimated_duration_seconds"] = round(
result["frame_count"] / result["fps"], 2
)
return result
def _parse_body(self, swf_path: Path, result: dict, offset: int, data: bytes = None):
"""Parse the uncompressed body of a SWF starting at `offset`."""
try:
if data is None:
with open(swf_path, "rb") as f:
f.seek(offset)
data = f.read(64) # Only need first ~32 bytes
pos = 0
# RECT structure: Nbits (5 bits) then 4 values each Nbits wide
if len(data) < 4:
return
nbits = (data[pos] >> 3) & 0x1F # top 5 bits of first byte
rect_bits = 5 + 4 * nbits
rect_bytes = (rect_bits + 7) // 8
if len(data) < rect_bytes + 4:
return
# Parse RECT fields (Xmin, Xmax, Ymin, Ymax) in twips (1/20 px)
bits = int.from_bytes(data[pos : pos + rect_bytes], "big")
total_bits = rect_bytes * 8
shift = total_bits - 5 - nbits
xmin = self._signed_bits(bits >> shift, nbits)
shift -= nbits
xmax = self._signed_bits(bits >> shift, nbits)
shift -= nbits
ymin = self._signed_bits(bits >> shift, nbits)
shift -= nbits
ymax = self._signed_bits(bits >> shift, nbits)
result["width"] = round((xmax - xmin) / 20)
result["height"] = round((ymax - ymin) / 20)
pos += rect_bytes
# FrameRate: FIXED8 (8.8 fixed point, little-endian)
if pos + 2 > len(data):
return
frame_rate_raw = struct.unpack_from("<H", data, pos)[0]
result["fps"] = round(frame_rate_raw / 256.0, 2)
pos += 2
# FrameCount: UI16
if pos + 2 > len(data):
return
result["frame_count"] = struct.unpack_from("<H", data, pos)[0]
except Exception as e:
log.debug(f"Body parse error: {e}")
def _parse_zlib_body(self, swf_path: Path, result: dict):
"""Decompress zlib SWF body and parse it."""
try:
with open(swf_path, "rb") as f:
f.seek(8) # skip signature + version + file_length
compressed = f.read()
decompressed = zlib.decompress(compressed)
self._parse_body(swf_path, result, offset=0, data=decompressed)
except zlib.error as e:
result["error"] = f"zlib decompression failed: {e}"
except Exception as e:
result["error"] = str(e)
@staticmethod
def _signed_bits(value: int, nbits: int) -> int:
"""Convert an unsigned integer to signed given bit width."""
value &= (1 << nbits) - 1
if value >= (1 << (nbits - 1)):
value -= (1 << nbits)
return value
def inspect_many(self, paths: list) -> dict:
"""Inspect multiple SWF files, returning {filename: info} dict."""
return {p.name: self.inspect(p) for p in paths}
def print_report(self, swf_path: Path):
"""Pretty-print inspection results for a single SWF."""
info = self.inspect(swf_path)
print(f"\n{'='*50}")
print(f"SWF: {info['path']}")
print(f"{'='*50}")
if info.get("error"):
print(f" ⚠ Error: {info['error']}")
print(f" Version : {info['version']}")
print(f" Compression: {info['compression']}")
print(f" Dimensions : {info['width']} × {info['height']} px")
print(f" FPS : {info['fps']}")
print(f" Frames : {info['frame_count']}")
print(f" Est. length: {info['estimated_duration_seconds']}s")
print(f" File size : {info['size_bytes']:,} bytes" if info['size_bytes'] else " File size : N/A")
print()
if __name__ == "__main__":
import sys
inspector = SWFInspector()
for path in sys.argv[1:]:
inspector.print_report(Path(path))
+407
View File
@@ -0,0 +1,407 @@
#!/usr/bin/env python3
"""
Unit tests for SWF Converter components.
Run with: python3 -m pytest tests.py -v
or: python3 tests.py
"""
import json
import struct
import tempfile
import time
import unittest
import zlib
from pathlib import Path
from unittest.mock import MagicMock, patch, call
from config_manager import ConfigManager, DEFAULT_CONFIG
from loop_detector import LoopDetector, StillnessDetector, FrameRecord
from interaction import InteractionController, ClickEvent
from swf_inspector import SWFInspector
# =============================================================================
# ConfigManager Tests
# =============================================================================
class TestConfigManager(unittest.TestCase):
def setUp(self):
self.tmpdir = tempfile.mkdtemp()
self.config_path = str(Path(self.tmpdir) / "test_config.json")
def _make_config(self, data: dict):
with open(self.config_path, "w") as f:
json.dump(data, f)
def test_defaults_used_when_no_config_file(self):
cfg_mgr = ConfigManager("/nonexistent/path.json")
cfg = cfg_mgr.get("anything.swf")
self.assertEqual(cfg["loop_grace"], DEFAULT_CONFIG["loop_grace"])
self.assertEqual(cfg["max_duration"], DEFAULT_CONFIG["max_duration"])
def test_per_file_config_overrides_defaults(self):
self._make_config({
"my.swf": {"end_seconds": 42.0, "fps": 24}
})
cfg_mgr = ConfigManager(self.config_path)
cfg = cfg_mgr.get("my.swf")
self.assertEqual(cfg["end_seconds"], 42.0)
self.assertEqual(cfg["fps"], 24)
def test_global_defaults_section(self):
self._make_config({
"_defaults": {"loop_grace": 10.0},
"my.swf": {}
})
cfg_mgr = ConfigManager(self.config_path)
cfg = cfg_mgr.get("my.swf")
self.assertEqual(cfg["loop_grace"], 10.0)
def test_per_file_overrides_global_defaults(self):
self._make_config({
"_defaults": {"loop_grace": 10.0},
"my.swf": {"loop_grace": 2.0}
})
cfg_mgr = ConfigManager(self.config_path)
cfg = cfg_mgr.get("my.swf")
self.assertEqual(cfg["loop_grace"], 2.0)
def test_cli_overrides_applied(self):
cfg_mgr = ConfigManager(self.config_path)
cfg = cfg_mgr.get("my.swf", global_overrides={"loop_grace": 99.0})
self.assertEqual(cfg["loop_grace"], 99.0)
def test_per_file_overrides_cli(self):
self._make_config({"my.swf": {"loop_grace": 7.0}})
cfg_mgr = ConfigManager(self.config_path)
cfg = cfg_mgr.get("my.swf", global_overrides={"loop_grace": 99.0})
self.assertEqual(cfg["loop_grace"], 7.0)
def test_interactions_list(self):
interactions = [
{"t": 5.0, "x": 100, "y": 200, "label": "btn1"},
{"t": 10.0, "x": 300, "y": 400, "label": "btn2"},
]
self._make_config({"my.swf": {"interactions": interactions}})
cfg_mgr = ConfigManager(self.config_path)
cfg = cfg_mgr.get("my.swf")
self.assertEqual(cfg["interactions"], interactions)
def test_add_interaction_and_save(self):
cfg_mgr = ConfigManager(self.config_path)
cfg_mgr.add_interaction("my.swf", {"t": 1.0, "x": 50, "y": 60, "label": "click_1"})
cfg_mgr.save()
cfg_mgr2 = ConfigManager(self.config_path)
cfg = cfg_mgr2.get("my.swf")
self.assertEqual(len(cfg["interactions"]), 1)
self.assertEqual(cfg["interactions"][0]["label"], "click_1")
def test_generate_starter_config(self):
swf1 = Path(self.tmpdir) / "anim.swf"
swf1.touch()
cfg_mgr = ConfigManager(self.config_path)
cfg_mgr.generate_starter([swf1])
self.assertTrue(Path(self.config_path).exists())
with open(self.config_path) as f:
data = json.load(f)
self.assertIn("anim.swf", data)
self.assertIn("_defaults", data)
def test_missing_file_returns_defaults(self):
cfg_mgr = ConfigManager(self.config_path)
cfg = cfg_mgr.get("nonexistent.swf")
self.assertEqual(cfg["emulator"], DEFAULT_CONFIG["emulator"])
def test_unknown_keys_passed_through(self):
self._make_config({"my.swf": {"custom_key": "custom_value"}})
cfg_mgr = ConfigManager(self.config_path)
cfg = cfg_mgr.get("my.swf")
self.assertEqual(cfg.get("custom_key"), "custom_value")
# =============================================================================
# LoopDetector Tests
# =============================================================================
class TestLoopDetector(unittest.TestCase):
def _make_detector(self, **kwargs):
return LoopDetector(window_id="0xfake", **kwargs)
def test_no_loop_initially(self):
d = self._make_detector()
self.assertFalse(d.loop_detected())
self.assertIsNone(d.loop_detected_at())
def test_loop_detected_on_repeated_sequence(self):
d = self._make_detector(sequence_length=3)
# Build history: A B C D E ... A B C ← should detect loop
hashes = ["aaa", "bbb", "ccc", "ddd", "eee", "fff",
"aaa", "bbb", "ccc"]
t = 0.0
for h in hashes:
record = FrameRecord(timestamp=t, hash=h)
d._check_for_loop(record)
d._history.append(record)
t += 0.5
self.assertTrue(d.loop_detected())
def test_no_false_positive_with_unique_frames(self):
d = self._make_detector(sequence_length=3)
import hashlib
for i in range(50):
h = hashlib.md5(str(i).encode()).hexdigest()
record = FrameRecord(timestamp=float(i) * 0.4, hash=h)
d._check_for_loop(record)
d._history.append(record)
self.assertFalse(d.loop_detected())
def test_loop_not_detected_with_insufficient_history(self):
d = self._make_detector(sequence_length=4)
# Only 3 records — not enough to compare
for i, h in enumerate(["aaa", "bbb", "aaa"]):
record = FrameRecord(timestamp=float(i), hash=h)
d._check_for_loop(record)
d._history.append(record)
self.assertFalse(d.loop_detected())
def test_reset_clears_detection(self):
d = self._make_detector(sequence_length=2)
hashes = ["aaa", "bbb", "ccc", "aaa", "bbb"]
t = 0.0
for h in hashes:
record = FrameRecord(timestamp=t, hash=h)
d._check_for_loop(record)
d._history.append(record)
t += 0.5
d.reset()
self.assertFalse(d.loop_detected())
self.assertEqual(len(d._history), 0)
def test_second_detection_does_not_overwrite_first(self):
d = self._make_detector(sequence_length=2)
hashes = ["aaa", "bbb", "ccc", "aaa", "bbb", "ddd", "aaa", "bbb"]
t = 0.0
first_detection = None
for h in hashes:
record = FrameRecord(timestamp=t, hash=h)
d._check_for_loop(record)
d._history.append(record)
if d.loop_detected() and first_detection is None:
first_detection = d.loop_detected_at()
t += 0.5
self.assertEqual(d.loop_detected_at(), first_detection)
# =============================================================================
# InteractionController Tests
# =============================================================================
class TestInteractionController(unittest.TestCase):
def test_clicks_sorted_by_time(self):
clicks = [
{"t": 10.0, "x": 1, "y": 1, "label": "b"},
{"t": 2.0, "x": 2, "y": 2, "label": "a"},
{"t": 5.0, "x": 3, "y": 3, "label": "c"},
]
ctrl = InteractionController("0xfake", clicks)
times = [c.t for c in ctrl.clicks]
self.assertEqual(times, [2.0, 5.0, 10.0])
def test_click_event_defaults(self):
ctrl = InteractionController("0xfake", [{"t": 1.0, "x": 10, "y": 20}])
c = ctrl.clicks[0]
self.assertEqual(c.button, 1)
self.assertFalse(c.double)
self.assertEqual(c.label, "")
def test_empty_clicks_list(self):
ctrl = InteractionController("0xfake", [])
self.assertEqual(ctrl.clicks, [])
@patch("subprocess.run")
def test_fire_click_calls_xdotool(self, mock_run):
mock_run.return_value = MagicMock(returncode=0)
ctrl = InteractionController("0x1234", [])
click = ClickEvent(t=0.0, x=100, y=200, label="test", button=1, double=False)
ctrl._fire_click(click)
calls = mock_run.call_args_list
# First call should be mousemove
self.assertIn("mousemove", calls[0][0][0])
self.assertIn("100", calls[0][0][0])
self.assertIn("200", calls[0][0][0])
# Second call should be click
self.assertIn("click", calls[1][0][0])
@patch("subprocess.run")
def test_double_click_fires_twice(self, mock_run):
mock_run.return_value = MagicMock(returncode=0)
ctrl = InteractionController("0x1234", [])
click = ClickEvent(t=0.0, x=50, y=50, label="dbl", button=1, double=True)
ctrl._fire_click(click)
# mousemove + click + click = 3 calls
click_calls = [c for c in mock_run.call_args_list if "click" in str(c)]
self.assertEqual(len(click_calls), 2)
def test_stop_cancels_pending_clicks(self):
clicks = [{"t": 60.0, "x": 1, "y": 1, "label": "late"}]
ctrl = InteractionController("0xfake", clicks)
ctrl.start(recording_start_time=time.monotonic())
time.sleep(0.1)
ctrl.stop()
# Should not have fired (click is at t=60s)
# Just verify stop() doesn't raise
self.assertTrue(True)
# =============================================================================
# SWFInspector Tests
# =============================================================================
class TestSWFInspector(unittest.TestCase):
def _make_swf(self, tmpdir, fps=24.0, frame_count=100, width=550, height=400,
compressed=False):
"""Create a minimal valid SWF binary."""
# RECT: nbits=15 for values up to 16384 (550*20=11000, 400*20=8000)
# We'll use nbits=15, all values in twips
nbits = 15
xmin, xmax, ymin, ymax = 0, width * 20, 0, height * 20
total_bits = 5 + 4 * nbits
total_bytes = (total_bits + 7) // 8
# Build the bitstream
bits = nbits << (total_bytes * 8 - 5)
for val in [xmin, xmax, ymin, ymax]:
shift_pos = total_bytes * 8 - 5 - (([xmin, xmax, ymin, ymax].index(val) + 1) * nbits)
# Redo properly:
pass
# Simpler approach: use a known valid RECT for 550x400
# nbits=15: 0b01111 = 0x0F in top 5 bits
# Then 4 x 15-bit values: 0, 11000, 0, 8000
# Pack as big-endian
nbits = 14 # enough for 11000 (14 bits = 16383 max)
total_bits_n = 5 + 4 * nbits # = 61 bits
total_bytes_n = (total_bits_n + 7) // 8 # = 8 bytes
big_val = (nbits << (total_bytes_n * 8 - 5))
big_val |= (0 & ((1 << nbits) - 1)) << (total_bytes_n * 8 - 5 - nbits)
big_val |= (xmax & ((1 << nbits) - 1)) << (total_bytes_n * 8 - 5 - 2*nbits)
big_val |= (0 & ((1 << nbits) - 1)) << (total_bytes_n * 8 - 5 - 3*nbits)
big_val |= (ymax & ((1 << nbits) - 1)) << (total_bytes_n * 8 - 5 - 4*nbits)
rect_bytes = big_val.to_bytes(total_bytes_n, "big")
# FrameRate: FIXED8 little-endian
frame_rate_raw = int(fps * 256)
frame_rate_bytes = struct.pack("<H", frame_rate_raw)
# FrameCount: UI16 little-endian
frame_count_bytes = struct.pack("<H", frame_count)
body = rect_bytes + frame_rate_bytes + frame_count_bytes
sig = b"CWS" if compressed else b"FWS"
version = 8
payload = body if not compressed else zlib.compress(body)
file_length = 8 + len(payload)
header = sig + bytes([version]) + struct.pack("<I", file_length)
path = Path(tmpdir) / "test.swf"
with open(path, "wb") as f:
f.write(header + payload)
return path
def setUp(self):
self.tmpdir = tempfile.mkdtemp()
def test_inspect_uncompressed_swf(self):
swf = self._make_swf(self.tmpdir, fps=24.0, frame_count=120)
inspector = SWFInspector()
info = inspector.inspect(swf)
self.assertIsNone(info["error"])
self.assertEqual(info["version"], 8)
self.assertEqual(info["compression"], "none")
self.assertAlmostEqual(info["fps"], 24.0, places=0)
self.assertEqual(info["frame_count"], 120)
def test_inspect_zlib_swf(self):
swf = self._make_swf(self.tmpdir, fps=30.0, frame_count=240, compressed=True)
inspector = SWFInspector()
info = inspector.inspect(swf)
self.assertEqual(info["compression"], "zlib")
self.assertAlmostEqual(info["fps"], 30.0, places=0)
self.assertEqual(info["frame_count"], 240)
def test_inspect_nonexistent_file(self):
inspector = SWFInspector()
info = inspector.inspect(Path("/nonexistent.swf"))
self.assertIsNotNone(info["error"])
def test_estimated_duration(self):
swf = self._make_swf(self.tmpdir, fps=24.0, frame_count=240)
inspector = SWFInspector()
info = inspector.inspect(swf)
self.assertAlmostEqual(info["estimated_duration_seconds"], 10.0, places=0)
def test_inspect_empty_file(self):
path = Path(self.tmpdir) / "empty.swf"
path.write_bytes(b"")
inspector = SWFInspector()
info = inspector.inspect(path)
self.assertIsNotNone(info["error"])
def test_inspect_many(self):
swf1 = self._make_swf(self.tmpdir, fps=24.0, frame_count=48)
swf2_path = Path(self.tmpdir) / "test2.swf"
import shutil
shutil.copy(swf1, swf2_path)
inspector = SWFInspector()
results = inspector.inspect_many([swf1, swf2_path])
self.assertIn(swf1.name, results)
self.assertIn(swf2_path.name, results)
# =============================================================================
# StillnessDetector Tests
# =============================================================================
class TestStillnessDetector(unittest.TestCase):
@patch("subprocess.run")
def test_find_freeze_start_parses_output(self, mock_run):
mock_run.return_value = MagicMock(
returncode=0,
stderr="[freezedetect @ 0x...] freeze_start: 42.5\n",
)
result = StillnessDetector.find_freeze_start("fake.mp4")
self.assertAlmostEqual(result, 42.5)
@patch("subprocess.run")
def test_find_freeze_start_returns_none_when_no_freeze(self, mock_run):
mock_run.return_value = MagicMock(returncode=0, stderr="no freeze here\n")
result = StillnessDetector.find_freeze_start("fake.mp4")
self.assertIsNone(result)
@patch("subprocess.run")
def test_trim_at_freeze_calls_ffmpeg(self, mock_run):
mock_run.return_value = MagicMock(returncode=0)
StillnessDetector.trim_at_freeze("in.mp4", "out.mp4", freeze_start=30.0, grace=5.0)
call_args = mock_run.call_args[0][0]
self.assertIn("ffmpeg", call_args)
self.assertIn("35.0", call_args) # end_time = 30 + 5
# =============================================================================
# Main
# =============================================================================
if __name__ == "__main__":
loader = unittest.TestLoader()
suite = loader.discover(".", pattern="tests.py")
runner = unittest.TextTestRunner(verbosity=2)
result = runner.run(suite)
exit(0 if result.wasSuccessful() else 1)