#!/usr/bin/env python3
"""
Unit tests for SWF Converter components.

Run with: python3 -m pytest tests.py -v
      or: python3 tests.py
"""

import hashlib
import json
import shutil
import struct
import tempfile
import time
import unittest
import zlib
from pathlib import Path
from unittest.mock import MagicMock, patch, call

from config_manager import ConfigManager, DEFAULT_CONFIG
from loop_detector import LoopDetector, StillnessDetector, FrameRecord
from interaction import InteractionController, ClickEvent
from swf_inspector import SWFInspector


# =============================================================================
# ConfigManager Tests
# =============================================================================

class TestConfigManager(unittest.TestCase):
    """Tests for ConfigManager's layered lookup and persistence.

    Every test gets a private temporary directory. ``self.config_path`` names
    a JSON file inside it that does not exist until a test writes it.
    """

    def setUp(self):
        """Create a per-test temporary directory that is removed afterwards."""
        # A bare mkdtemp() leaks one directory per test; registering the
        # cleanup guarantees removal even when the test fails.
        tmp = tempfile.TemporaryDirectory()
        self.addCleanup(tmp.cleanup)
        self.tmpdir = tmp.name
        self.config_path = str(Path(self.tmpdir) / "test_config.json")

    def _make_config(self, data: dict):
        """Write ``data`` as JSON to ``self.config_path``."""
        with open(self.config_path, "w", encoding="utf-8") as f:
            json.dump(data, f)

    def test_defaults_used_when_no_config_file(self):
        """With no config file on disk, values come from DEFAULT_CONFIG."""
        cfg_mgr = ConfigManager("/nonexistent/path.json")
        cfg = cfg_mgr.get("anything.swf")
        self.assertEqual(cfg["loop_grace"], DEFAULT_CONFIG["loop_grace"])
        self.assertEqual(cfg["max_duration"], DEFAULT_CONFIG["max_duration"])

    def test_per_file_config_overrides_defaults(self):
        """Keys set in a file's own section are returned for that file."""
        self._make_config({
            "my.swf": {"end_seconds": 42.0, "fps": 24},
        })
        cfg_mgr = ConfigManager(self.config_path)
        cfg = cfg_mgr.get("my.swf")
        self.assertEqual(cfg["end_seconds"], 42.0)
        self.assertEqual(cfg["fps"], 24)

    def test_global_defaults_section(self):
        """A value in the ``_defaults`` section applies to a file with no override."""
        self._make_config({
            "_defaults": {"loop_grace": 10.0},
            "my.swf": {},
        })
        cfg_mgr = ConfigManager(self.config_path)
        cfg = cfg_mgr.get("my.swf")
        self.assertEqual(cfg["loop_grace"], 10.0)

    def test_per_file_overrides_global_defaults(self):
        """A per-file value wins over the same key in ``_defaults``."""
        self._make_config({
            "_defaults": {"loop_grace": 10.0},
            "my.swf": {"loop_grace": 2.0},
        })
        cfg_mgr = ConfigManager(self.config_path)
        cfg = cfg_mgr.get("my.swf")
        self.assertEqual(cfg["loop_grace"], 2.0)

    def test_cli_overrides_applied(self):
        """``global_overrides`` take effect when the file has no entry."""
        cfg_mgr = ConfigManager(self.config_path)
        cfg = cfg_mgr.get("my.swf", global_overrides={"loop_grace": 99.0})
        self.assertEqual(cfg["loop_grace"], 99.0)

    def test_per_file_overrides_cli(self):
        """A per-file value wins over the same key in ``global_overrides``."""
        self._make_config({"my.swf": {"loop_grace": 7.0}})
        cfg_mgr = ConfigManager(self.config_path)
        cfg = cfg_mgr.get("my.swf", global_overrides={"loop_grace": 99.0})
        self.assertEqual(cfg["loop_grace"], 7.0)

    def test_interactions_list(self):
        """An ``interactions`` list is returned exactly as stored."""
        interactions = [
            {"t": 5.0, "x": 100, "y": 200, "label": "btn1"},
            {"t": 10.0, "x": 300, "y": 400, "label": "btn2"},
        ]
        self._make_config({"my.swf": {"interactions": interactions}})
        cfg_mgr = ConfigManager(self.config_path)
        cfg = cfg_mgr.get("my.swf")
        self.assertEqual(cfg["interactions"], interactions)

    def test_add_interaction_and_save(self):
        """An interaction added and saved is visible to a fresh ConfigManager."""
        cfg_mgr = ConfigManager(self.config_path)
        cfg_mgr.add_interaction(
            "my.swf", {"t": 1.0, "x": 50, "y": 60, "label": "click_1"}
        )
        cfg_mgr.save()
        # Reload from disk to prove the data was persisted, not just cached.
        cfg_mgr2 = ConfigManager(self.config_path)
        cfg = cfg_mgr2.get("my.swf")
        self.assertEqual(len(cfg["interactions"]), 1)
        self.assertEqual(cfg["interactions"][0]["label"], "click_1")

    def test_generate_starter_config(self):
        """generate_starter() writes a file with ``_defaults`` and one entry per SWF."""
        swf1 = Path(self.tmpdir) / "anim.swf"
        swf1.touch()
        cfg_mgr = ConfigManager(self.config_path)
        cfg_mgr.generate_starter([swf1])
        self.assertTrue(Path(self.config_path).exists())
        with open(self.config_path, encoding="utf-8") as f:
            data = json.load(f)
        self.assertIn("anim.swf", data)
        self.assertIn("_defaults", data)

    def test_missing_file_returns_defaults(self):
        """A SWF name absent from the config still yields default values."""
        cfg_mgr = ConfigManager(self.config_path)
        cfg = cfg_mgr.get("nonexistent.swf")
        self.assertEqual(cfg["emulator"], DEFAULT_CONFIG["emulator"])

    def test_unknown_keys_passed_through(self):
        """Keys the defaults do not define are preserved in the result."""
        self._make_config({"my.swf": {"custom_key": "custom_value"}})
        cfg_mgr = ConfigManager(self.config_path)
        cfg = cfg_mgr.get("my.swf")
        self.assertEqual(cfg.get("custom_key"), "custom_value")


# =============================================================================
# LoopDetector Tests
# =============================================================================

class TestLoopDetector(unittest.TestCase):
    """Tests for LoopDetector's repeated-sequence detection.

    Frames are injected directly through the detector's private
    ``_check_for_loop`` / ``_history`` members, so nothing is captured
    from a real window.
    """

    def _make_detector(self, **kwargs):
        """Build a LoopDetector bound to a placeholder window id."""
        return LoopDetector(window_id="0xfake", **kwargs)

    @staticmethod
    def _push(detector, timestamp, frame_hash):
        """Feed one frame: check it against history, then record it."""
        record = FrameRecord(timestamp=timestamp, hash=frame_hash)
        detector._check_for_loop(record)
        detector._history.append(record)

    def _feed(self, detector, hashes, step=0.5):
        """Feed ``hashes`` in order, spacing timestamps ``step`` seconds apart."""
        for index, frame_hash in enumerate(hashes):
            self._push(detector, float(index) * step, frame_hash)

    def test_no_loop_initially(self):
        """A fresh detector reports no loop and no detection time."""
        d = self._make_detector()
        self.assertFalse(d.loop_detected())
        self.assertIsNone(d.loop_detected_at())

    def test_loop_detected_on_repeated_sequence(self):
        """Replaying the first three hashes later in the stream is a loop."""
        d = self._make_detector(sequence_length=3)
        # Build history: A B C D E F A B C  <- should detect loop
        self._feed(d, ["aaa", "bbb", "ccc", "ddd", "eee", "fff",
                       "aaa", "bbb", "ccc"])
        self.assertTrue(d.loop_detected())

    def test_no_false_positive_with_unique_frames(self):
        """Fifty distinct hashes never trigger a detection."""
        d = self._make_detector(sequence_length=3)
        unique_hashes = [
            hashlib.md5(str(i).encode()).hexdigest() for i in range(50)
        ]
        self._feed(d, unique_hashes, step=0.4)
        self.assertFalse(d.loop_detected())

    def test_loop_not_detected_with_insufficient_history(self):
        """Fewer records than ``sequence_length`` cannot form a loop."""
        d = self._make_detector(sequence_length=4)
        # Only 3 records: not enough to compare
        self._feed(d, ["aaa", "bbb", "aaa"], step=1.0)
        self.assertFalse(d.loop_detected())

    def test_reset_clears_detection(self):
        """reset() clears both the detection flag and the frame history."""
        d = self._make_detector(sequence_length=2)
        self._feed(d, ["aaa", "bbb", "ccc", "aaa", "bbb"])
        # Without this precondition the test would pass even if no loop was
        # ever detected, proving nothing about reset().
        self.assertTrue(d.loop_detected())
        d.reset()
        self.assertFalse(d.loop_detected())
        self.assertEqual(len(d._history), 0)

    def test_second_detection_does_not_overwrite_first(self):
        """A later repeat of the same sequence keeps the original detection time."""
        d = self._make_detector(sequence_length=2)
        hashes = ["aaa", "bbb", "ccc", "aaa", "bbb", "ddd", "aaa", "bbb"]
        first_detection = None
        for index, frame_hash in enumerate(hashes):
            self._push(d, float(index) * 0.5, frame_hash)
            if first_detection is None and d.loop_detected():
                first_detection = d.loop_detected_at()
        # Guard against a vacuous pass: with no detection at all, both sides
        # of the equality below would be None.
        self.assertTrue(d.loop_detected())
        self.assertEqual(d.loop_detected_at(), first_detection)


# =============================================================================
# InteractionController Tests
# =============================================================================

class TestInteractionController(unittest.TestCase):
    """Tests for InteractionController's click scheduling and firing.

    Tests that could fire a click patch ``subprocess.run`` so no real
    command is executed.
    """

    def test_clicks_sorted_by_time(self):
        """Clicks supplied out of order are exposed sorted by ``t``."""
        clicks = [
            {"t": 10.0, "x": 1, "y": 1, "label": "b"},
            {"t": 2.0, "x": 2, "y": 2, "label": "a"},
            {"t": 5.0, "x": 3, "y": 3, "label": "c"},
        ]
        ctrl = InteractionController("0xfake", clicks)
        times = [c.t for c in ctrl.clicks]
        self.assertEqual(times, [2.0, 5.0, 10.0])

    def test_click_event_defaults(self):
        """Omitted fields default to button 1, single click, empty label."""
        ctrl = InteractionController("0xfake", [{"t": 1.0, "x": 10, "y": 20}])
        c = ctrl.clicks[0]
        self.assertEqual(c.button, 1)
        self.assertFalse(c.double)
        self.assertEqual(c.label, "")

    def test_empty_clicks_list(self):
        """An empty click list yields an empty schedule."""
        ctrl = InteractionController("0xfake", [])
        self.assertEqual(ctrl.clicks, [])

    @patch("subprocess.run")
    def test_fire_click_calls_xdotool(self, mock_run):
        """_fire_click issues a mousemove to (x, y) followed by a click."""
        mock_run.return_value = MagicMock(returncode=0)
        ctrl = InteractionController("0x1234", [])
        click = ClickEvent(t=0.0, x=100, y=200, label="test", button=1, double=False)
        ctrl._fire_click(click)

        calls = mock_run.call_args_list
        # Fail with a readable message instead of an IndexError below.
        self.assertGreaterEqual(len(calls), 2)
        # First call should be mousemove
        self.assertIn("mousemove", calls[0][0][0])
        self.assertIn("100", calls[0][0][0])
        self.assertIn("200", calls[0][0][0])
        # Second call should be click
        self.assertIn("click", calls[1][0][0])

    @patch("subprocess.run")
    def test_double_click_fires_twice(self, mock_run):
        """A double click results in exactly two click commands."""
        mock_run.return_value = MagicMock(returncode=0)
        ctrl = InteractionController("0x1234", [])
        click = ClickEvent(t=0.0, x=50, y=50, label="dbl", button=1, double=True)
        ctrl._fire_click(click)
        # mousemove + click + click = 3 calls
        click_calls = [c for c in mock_run.call_args_list if "click" in str(c)]
        self.assertEqual(len(click_calls), 2)

    @patch("subprocess.run")
    def test_stop_cancels_pending_clicks(self, mock_run):
        """stop() returns cleanly and a click scheduled for t=60s never fires."""
        mock_run.return_value = MagicMock(returncode=0)
        clicks = [{"t": 60.0, "x": 1, "y": 1, "label": "late"}]
        ctrl = InteractionController("0xfake", clicks)
        ctrl.start(recording_start_time=time.monotonic())
        try:
            # Give the controller a moment to begin waiting on the click.
            time.sleep(0.1)
        finally:
            # Always stop, so a failure here cannot leave the controller running.
            ctrl.stop()
        # Only click commands are checked; other commands are tolerated.
        click_calls = [c for c in mock_run.call_args_list if "click" in str(c)]
        self.assertEqual(click_calls, [])


# =============================================================================
# SWFInspector Tests
# =============================================================================

class TestSWFInspector(unittest.TestCase):
    """Tests for SWFInspector's header parsing, using synthetic SWF files."""

    def _make_swf(self, tmpdir, fps=24.0, frame_count=100, width=550, height=400,
                  compressed=False):
        """Create a minimal SWF file (header only, no tags) and return its path.

        Layout written: 3-byte signature (``FWS``, or ``CWS`` when
        ``compressed``), 1-byte version (8), UI32 little-endian file length,
        then the stage RECT, frame rate and frame count. For ``CWS`` everything
        after the 8-byte header is zlib-compressed.

        Args:
            tmpdir: Directory in which ``test.swf`` is written.
            fps: Frame rate, stored as 8.8 fixed point.
            frame_count: Stored as UI16.
            width: Stage width in pixels.
            height: Stage height in pixels.
            compressed: Write a zlib-compressed (``CWS``) file when true.

        Returns:
            Path of the written ``test.swf``.
        """
        # Stage bounds are stored in twips (1/20 of a pixel).
        xmax, ymax = width * 20, height * 20

        # RECT fields are *signed* bit values, so one bit beyond the largest
        # magnitude is needed for the sign; otherwise 11000 twips in 14 bits
        # would decode as a negative number.
        nbits = max(xmax, ymax).bit_length() + 1
        total_bits = 5 + 4 * nbits
        total_bytes = (total_bits + 7) // 8

        # Pack MSB-first: 5-bit Nbits, then Xmin, Xmax, Ymin, Ymax.
        field_mask = (1 << nbits) - 1
        packed = nbits
        for value in (0, xmax, 0, ymax):
            packed = (packed << nbits) | (value & field_mask)
        # Left-align so the padding bits land at the end of the last byte.
        packed <<= total_bytes * 8 - total_bits
        rect_bytes = packed.to_bytes(total_bytes, "big")

        # FrameRate: FIXED8 little-endian
        frame_rate_bytes = struct.pack("<H", int(fps * 256))

        # FrameCount: UI16 little-endian
        frame_count_bytes = struct.pack("<H", frame_count)

        body = rect_bytes + frame_rate_bytes + frame_count_bytes

        sig = b"CWS" if compressed else b"FWS"
        version = 8
        payload = zlib.compress(body) if compressed else body
        # FileLength is always the *uncompressed* total size, even for CWS.
        file_length = 8 + len(body)
        header = sig + bytes([version]) + struct.pack("<I", file_length)

        path = Path(tmpdir) / "test.swf"
        with open(path, "wb") as f:
            f.write(header + payload)
        return path

    def setUp(self):
        """Create a per-test temporary directory that is removed afterwards."""
        # A bare mkdtemp() leaks one directory per test; registering the
        # cleanup guarantees removal even when the test fails.
        tmp = tempfile.TemporaryDirectory()
        self.addCleanup(tmp.cleanup)
        self.tmpdir = tmp.name

    def test_inspect_uncompressed_swf(self):
        """An FWS file reports version, compression, fps and frame count."""
        swf = self._make_swf(self.tmpdir, fps=24.0, frame_count=120)
        inspector = SWFInspector()
        info = inspector.inspect(swf)
        self.assertIsNone(info["error"])
        self.assertEqual(info["version"], 8)
        self.assertEqual(info["compression"], "none")
        self.assertAlmostEqual(info["fps"], 24.0, places=0)
        self.assertEqual(info["frame_count"], 120)

    def test_inspect_zlib_swf(self):
        """A CWS file is reported as zlib with the right fps and frame count."""
        swf = self._make_swf(self.tmpdir, fps=30.0, frame_count=240, compressed=True)
        inspector = SWFInspector()
        info = inspector.inspect(swf)
        self.assertEqual(info["compression"], "zlib")
        self.assertAlmostEqual(info["fps"], 30.0, places=0)
        self.assertEqual(info["frame_count"], 240)

    def test_inspect_nonexistent_file(self):
        """A missing file produces a populated ``error`` entry."""
        inspector = SWFInspector()
        info = inspector.inspect(Path("/nonexistent.swf"))
        self.assertIsNotNone(info["error"])

    def test_estimated_duration(self):
        """240 frames at 24 fps is estimated at roughly 10 seconds."""
        swf = self._make_swf(self.tmpdir, fps=24.0, frame_count=240)
        inspector = SWFInspector()
        info = inspector.inspect(swf)
        self.assertAlmostEqual(info["estimated_duration_seconds"], 10.0, places=0)

    def test_inspect_empty_file(self):
        """A zero-byte file produces a populated ``error`` entry."""
        path = Path(self.tmpdir) / "empty.swf"
        path.write_bytes(b"")
        inspector = SWFInspector()
        info = inspector.inspect(path)
        self.assertIsNotNone(info["error"])

    def test_inspect_many(self):
        """inspect_many() keys its results by file name."""
        swf1 = self._make_swf(self.tmpdir, fps=24.0, frame_count=48)
        # _make_swf always writes "test.swf", so copy it to get a second file.
        swf2_path = Path(self.tmpdir) / "test2.swf"
        shutil.copy(swf1, swf2_path)
        inspector = SWFInspector()
        results = inspector.inspect_many([swf1, swf2_path])
        self.assertIn(swf1.name, results)
        self.assertIn(swf2_path.name, results)


# =============================================================================
# StillnessDetector Tests
# =============================================================================

class TestStillnessDetector(unittest.TestCase):
    """Tests for StillnessDetector's helpers with ``subprocess.run`` mocked."""

    def test_find_freeze_start_parses_output(self):
        """A ``freeze_start: 42.5`` line on stderr is expected to yield 42.5."""
        fake_proc = MagicMock(
            returncode=0,
            stderr="[freezedetect @ 0x...] freeze_start: 42.5\n",
        )
        with patch("subprocess.run", return_value=fake_proc):
            freeze_at = StillnessDetector.find_freeze_start("fake.mp4")
        self.assertAlmostEqual(freeze_at, 42.5)

    def test_find_freeze_start_returns_none_when_no_freeze(self):
        """Stderr without a freeze marker is expected to yield None."""
        fake_proc = MagicMock(returncode=0, stderr="no freeze here\n")
        with patch("subprocess.run", return_value=fake_proc):
            freeze_at = StillnessDetector.find_freeze_start("fake.mp4")
        self.assertIsNone(freeze_at)

    def test_trim_at_freeze_calls_ffmpeg(self):
        """trim_at_freeze runs ffmpeg with an end time of freeze_start + grace."""
        with patch("subprocess.run") as mock_run:
            mock_run.return_value = MagicMock(returncode=0)
            StillnessDetector.trim_at_freeze(
                "in.mp4", "out.mp4", freeze_start=30.0, grace=5.0
            )
        # call_args unpacks to (positional args, keyword args) of the last call.
        positional, _keywords = mock_run.call_args
        command = positional[0]
        self.assertIn("ffmpeg", command)
        self.assertIn("35.0", command)  # end_time = 30 + 5


# =============================================================================
# Main
# =============================================================================

if __name__ == "__main__":
    # Run the tests defined in this module, verbosely. unittest.main() exits
    # with status 0 on success and 1 on failure. Unlike discover("."), it does
    # not depend on the current working directory and does not import this
    # file a second time under a different module name.
    unittest.main(verbosity=2)