"""Audio engine: device discovery, decoding/normalization, and overlapping playback. Design decisions (see soundboard_plan.md review): * True overlap: each play spawns its own dedicated thread + sounddevice.OutputStream, so sounds mix instead of cutting each other off. (The plan's sd.play() singleton cannot do this.) * Cross-platform virtual device: auto-selection prefers VB-Cable on Windows and a null-sink / virtual device on Linux. * Live volume: the engine's volume is read on every audio block, so moving the slider affects already-playing sounds. Heavy/native imports (sounddevice, soundfile, pydub) are loaded lazily so that the pure logic in config.py / board.py and the unit tests can run without audio libraries present. """ from __future__ import annotations import os import platform import threading from typing import Callable, Optional import numpy as np # Formats decodable directly by libsndfile (soundfile); everything else goes via pydub/ffmpeg. _SOUNDFILE_EXTS = {"wav", "flac", "aiff", "aif"} SUPPORTED_EXTS = _SOUNDFILE_EXTS | {"mp3", "ogg", "m4a", "aac"} # Priority order for auto-selecting a virtual output device, case-insensitive substring match. # Works for Windows (VB-Cable) and Linux (PulseAudio/PipeWire null sink) alike. _AUTO_DEVICE_PRIORITY = ("cable input", "vb-audio", "vb-cable", "virtual", "null") DEFAULT_TARGET_CHANNELS = 2 # --------------------------------------------------------------------------- decoding def _ext(file_path: str) -> str: return os.path.splitext(file_path)[1].lower().lstrip(".") def match_channels(data: np.ndarray, target_channels: int) -> np.ndarray: """Coerce a 2-D (frames, channels) float array to ``target_channels`` channels. Mono -> duplicated across channels; extra channels -> truncated; in-between -> the first channel is tiled to fill. Keeps overlapping streams from erroring on a device that expects a fixed channel count (sounddevice does not up/down-mix for us). """ if data.ndim == 1: data = data.reshape(-1, 1) have = data.shape[1] if have == target_channels: return data if have == 1: return np.tile(data, (1, target_channels)) if have > target_channels: return data[:, :target_channels].copy() # 1 < have < target: pad by repeating the first channel pad = np.tile(data[:, :1], (1, target_channels - have)) return np.concatenate([data, pad], axis=1) def load_audio(file_path: str, target_channels: int = DEFAULT_TARGET_CHANNELS): """Load any supported format -> (float32 ndarray shape (frames, target_channels), samplerate). Both code paths are normalized to the same dtype and channel layout so that mixed files don't error or pitch-shift when streamed on one device. """ ext = _ext(file_path) if ext in _SOUNDFILE_EXTS: import soundfile as sf data, sr = sf.read(file_path, dtype="float32", always_2d=True) else: from pydub import AudioSegment seg = AudioSegment.from_file(file_path) sr = seg.frame_rate samples = np.array(seg.get_array_of_samples(), dtype=np.float32) samples = samples.reshape((-1, seg.channels)) # Normalize integer PCM to [-1, 1] using the actual sample width. max_val = float(1 << (8 * seg.sample_width - 1)) data = samples / max_val data = match_channels(np.asarray(data, dtype=np.float32), target_channels) return data, int(sr) # ----------------------------------------------------------------- device discovery def list_output_devices(): """Return [{'index': int, 'name': str, 'channels': int}, ...] for output-capable devices.""" import sounddevice as sd out = [] for i, dev in enumerate(sd.query_devices()): if dev.get("max_output_channels", 0) > 0: out.append( { "index": i, "name": dev["name"], "channels": dev["max_output_channels"], } ) return out def auto_select_device(devices) -> Optional[dict]: """Pick the best virtual-output device from list_output_devices() output, or None. Pure function (no audio imports) so it is unit-testable with plain dicts. """ for needle in _AUTO_DEVICE_PRIORITY: for dev in devices: if needle in dev["name"].lower(): return dev return None # ------------------------------------------------------------------------ playback class _Playback: """A single sound playing on its own stream in its own thread.""" def __init__(self, engine: "AudioEngine", pid: int, file_path: str, on_start: Optional[Callable], on_finish: Optional[Callable]): self._engine = engine self.pid = pid self.file_path = file_path self._on_start = on_start self._on_finish = on_finish self._stop = threading.Event() self._thread = threading.Thread(target=self._run, name=f"playback-{pid}", daemon=True) def start(self): self._thread.start() def stop(self): self._stop.set() def _run(self): import sounddevice as sd try: data, sr = load_audio(self.file_path, self._engine.target_channels) except Exception as exc: # decoding failed — report and bail without crashing GUI self._engine._remove(self.pid) if self._on_finish: self._on_finish(self.pid, exc) return if self._on_start: self._on_start(self.pid) error: Optional[Exception] = None blocksize = 2048 try: with sd.OutputStream( samplerate=sr, device=self._engine.device_index, channels=data.shape[1], dtype="float32", ) as stream: i = 0 n = len(data) while i < n and not self._stop.is_set(): chunk = data[i : i + blocksize] * self._engine.volume stream.write(np.ascontiguousarray(chunk, dtype=np.float32)) i += blocksize except Exception as exc: error = exc finally: self._engine._remove(self.pid) if self._on_finish: self._on_finish(self.pid, error) class AudioEngine: """Owns the selected output device, master volume, and the set of active playbacks. Callbacks (on_start/on_finish) fire from playback worker threads — GUI callers must marshal back to the UI thread themselves (e.g. tkinter ``root.after``). """ def __init__(self, device_name: Optional[str] = None, volume: float = 1.0): self.volume = float(volume) self.device_name: Optional[str] = None self.device_index: Optional[int] = None self.target_channels = DEFAULT_TARGET_CHANNELS self._playbacks: dict[int, _Playback] = {} self._counter = 0 self._lock = threading.Lock() if device_name: self.set_device(device_name) # ---- configuration ----------------------------------------------------- def set_device(self, device_name: Optional[str]): """Resolve a device by name substring and remember its index + channel count.""" self.device_name = device_name if not device_name: self.device_index = None self.target_channels = DEFAULT_TARGET_CHANNELS return for dev in list_output_devices(): if device_name.lower() in dev["name"].lower(): self.device_index = dev["index"] self.target_channels = min(DEFAULT_TARGET_CHANNELS, max(1, dev["channels"])) return raise ValueError(f"Output device not found: {device_name!r}") def set_volume(self, volume: float): """Set master volume in [0.0, 1.0]; applied live to playing sounds.""" self.volume = max(0.0, min(1.0, float(volume))) # ---- playback ---------------------------------------------------------- def play(self, file_path: str, on_start=None, on_finish=None) -> int: """Start a sound (non-blocking, overlapping). Returns a playback id.""" with self._lock: pid = self._counter self._counter += 1 pb = _Playback(self, pid, file_path, on_start, on_finish) self._playbacks[pid] = pb pb.start() return pid def stop(self, pid: int): with self._lock: pb = self._playbacks.get(pid) if pb: pb.stop() def stop_file(self, file_path: str): """Stop every active playback of a given file path.""" with self._lock: targets = [pb for pb in self._playbacks.values() if pb.file_path == file_path] for pb in targets: pb.stop() def stop_all(self): with self._lock: targets = list(self._playbacks.values()) for pb in targets: pb.stop() def active_count(self) -> int: with self._lock: return len(self._playbacks) def _remove(self, pid: int): with self._lock: self._playbacks.pop(pid, None)