# File listing metadata: 255 lines, 9.1 KiB, Python.

"""Audio engine: device discovery, decoding/normalization, and overlapping playback.

Design decisions (see soundboard_plan.md review):

* True overlap: each play spawns its own dedicated thread + sounddevice.OutputStream,
  so sounds mix instead of cutting each other off. (The plan's sd.play() singleton
  cannot do this.)
* Cross-platform virtual device: auto-selection prefers VB-Cable on Windows and a
  null-sink / virtual device on Linux.
* Live volume: the engine's volume is read on every audio block, so moving the slider
  affects already-playing sounds.

Heavy/native imports (sounddevice, soundfile, pydub) are loaded lazily so that the pure
logic in config.py / board.py and the unit tests can run without audio libraries present.
"""
from __future__ import annotations
import os
import platform
import threading
from typing import Callable, Optional
import numpy as np
# Formats decodable directly by libsndfile (soundfile); everything else goes via pydub/ffmpeg.
_SOUNDFILE_EXTS = {"wav", "flac", "aiff", "aif"}
SUPPORTED_EXTS = _SOUNDFILE_EXTS | {"mp3", "ogg", "m4a", "aac"}
# Priority order for auto-selecting a virtual output device, case-insensitive substring match.
# Works for Windows (VB-Cable) and Linux (PulseAudio/PipeWire null sink) alike.
_AUTO_DEVICE_PRIORITY = ("cable input", "vb-audio", "vb-cable", "virtual", "null")
DEFAULT_TARGET_CHANNELS = 2
# --------------------------------------------------------------------------- decoding
def _ext(file_path: str) -> str:
return os.path.splitext(file_path)[1].lower().lstrip(".")
def match_channels(data: np.ndarray, target_channels: int) -> np.ndarray:
    """Return ``data`` reshaped to exactly ``target_channels`` columns.

    ``data`` is a (frames, channels) array; a 1-D array is treated as a single channel.

    * already the right width -> returned as-is (no copy)
    * mono -> the single channel is duplicated into every output channel
    * too many channels -> the surplus trailing channels are dropped
    * too few (but more than one) -> the first channel is repeated to fill the gap

    This keeps overlapping streams from erroring on a device that expects a fixed
    channel count (sounddevice does not up/down-mix for us).
    """
    frames = data.reshape(-1, 1) if data.ndim == 1 else data
    current = frames.shape[1]

    if current == target_channels:
        return frames
    if current == 1:
        return np.tile(frames, (1, target_channels))
    if current > target_channels:
        return frames[:, :target_channels].copy()

    # 1 < current < target_channels: append copies of channel 0 on the right.
    missing = target_channels - current
    filler = np.tile(frames[:, :1], (1, missing))
    return np.concatenate([frames, filler], axis=1)
def load_audio(file_path: str, target_channels: int = DEFAULT_TARGET_CHANNELS):
    """Decode ``file_path`` and return ``(samples, samplerate)``.

    ``samples`` is a float32 array of shape (frames, target_channels) and
    ``samplerate`` is an ``int``. Extensions in ``_SOUNDFILE_EXTS`` are read with
    soundfile; anything else is handed to pydub. Both routes end in the same dtype and
    channel layout so that mixed files don't error or pitch-shift when streamed on one
    device.
    """
    if _ext(file_path) in _SOUNDFILE_EXTS:
        import soundfile as sf

        decoded, rate = sf.read(file_path, dtype="float32", always_2d=True)
    else:
        from pydub import AudioSegment

        segment = AudioSegment.from_file(file_path)
        rate = segment.frame_rate
        flat = np.array(segment.get_array_of_samples(), dtype=np.float32)
        # The samples come back as one flat sequence; fold it into (frames, channels).
        per_channel = flat.reshape((-1, segment.channels))
        # Scale integer PCM into [-1, 1] using the actual sample width in bytes.
        full_scale = float(1 << (8 * segment.sample_width - 1))
        decoded = per_channel / full_scale

    as_float32 = np.asarray(decoded, dtype=np.float32)
    return match_channels(as_float32, target_channels), int(rate)
# ----------------------------------------------------------------- device discovery
def list_output_devices():
    """List every device that can play audio.

    Returns a list of ``{'index': int, 'name': str, 'channels': int}`` dicts, one per
    entry of ``sounddevice.query_devices()`` that reports at least one output channel.
    ``index`` is the entry's position in that query result.
    """
    import sounddevice as sd

    return [
        {
            "index": position,
            "name": info["name"],
            "channels": info["max_output_channels"],
        }
        for position, info in enumerate(sd.query_devices())
        if info.get("max_output_channels", 0) > 0
    ]
def auto_select_device(devices) -> Optional[dict]:
    """Choose the preferred virtual-output device, or ``None`` if nothing matches.

    ``devices`` has the shape produced by ``list_output_devices()``. The needles in
    ``_AUTO_DEVICE_PRIORITY`` are tried in order; for each one the devices are scanned
    in order and the first whose lower-cased name contains the needle wins. An earlier
    needle therefore always beats a later one, regardless of device order.

    Pure function (no audio imports) so it is unit-testable with plain dicts.
    """
    candidates = (
        dev
        for needle in _AUTO_DEVICE_PRIORITY
        for dev in devices
        if needle in dev["name"].lower()
    )
    return next(candidates, None)
# ------------------------------------------------------------------------ playback
class _Playback:
"""A single sound playing on its own stream in its own thread."""
def __init__(self, engine: "AudioEngine", pid: int, file_path: str,
on_start: Optional[Callable], on_finish: Optional[Callable]):
self._engine = engine
self.pid = pid
self.file_path = file_path
self._on_start = on_start
self._on_finish = on_finish
self._stop = threading.Event()
self._thread = threading.Thread(target=self._run, name=f"playback-{pid}", daemon=True)
def start(self):
self._thread.start()
def stop(self):
self._stop.set()
def _run(self):
import sounddevice as sd
try:
data, sr = load_audio(self.file_path, self._engine.target_channels)
except Exception as exc: # decoding failed — report and bail without crashing GUI
self._engine._remove(self.pid)
if self._on_finish:
self._on_finish(self.pid, exc)
return
if self._on_start:
self._on_start(self.pid)
error: Optional[Exception] = None
blocksize = 2048
try:
with sd.OutputStream(
samplerate=sr,
device=self._engine.device_index,
channels=data.shape[1],
dtype="float32",
) as stream:
i = 0
n = len(data)
while i < n and not self._stop.is_set():
chunk = data[i : i + blocksize] * self._engine.volume
stream.write(np.ascontiguousarray(chunk, dtype=np.float32))
i += blocksize
except Exception as exc:
error = exc
finally:
self._engine._remove(self.pid)
if self._on_finish:
self._on_finish(self.pid, error)
class AudioEngine:
    """Owns the selected output device, master volume, and the set of active playbacks.

    Callbacks (on_start/on_finish) fire from playback worker threads — GUI callers must
    marshal back to the UI thread themselves (e.g. tkinter ``root.after``).

    Attributes:
        volume: master gain in [0.0, 1.0], read by workers for every audio block.
        device_name: the name last passed successfully to :meth:`set_device`.
        device_index: index of the resolved device, or ``None`` when no device is set.
        target_channels: channel count that new playbacks decode to.
    """

    def __init__(self, device_name: Optional[str] = None, volume: float = 1.0):
        """Create an engine.

        Args:
            device_name: optional name substring of the output device to use.
            volume: initial master volume; clamped to [0.0, 1.0] like ``set_volume``.

        Raises:
            ValueError: if ``device_name`` is given but matches no output device.
        """
        # Go through set_volume so the constructor honours the same [0, 1] range.
        self.set_volume(volume)
        self.device_name: Optional[str] = None
        self.device_index: Optional[int] = None
        self.target_channels = DEFAULT_TARGET_CHANNELS
        self._playbacks: dict[int, _Playback] = {}
        self._counter = 0
        # Guards _playbacks and _counter, which worker threads also touch via _remove.
        self._lock = threading.Lock()
        if device_name:
            self.set_device(device_name)

    # ---- configuration -----------------------------------------------------
    def set_device(self, device_name: Optional[str]):
        """Resolve a device by name substring and remember its index + channel count.

        The first output device whose name contains ``device_name`` (case-insensitive)
        is selected. A falsy ``device_name`` clears the selection. The stored name,
        index and channel count are updated together and only once a match is found,
        so a failed lookup leaves the previous selection fully intact.

        Raises:
            ValueError: if no output device matches ``device_name``.
        """
        if not device_name:
            self.device_name = device_name
            self.device_index = None
            self.target_channels = DEFAULT_TARGET_CHANNELS
            return
        needle = device_name.lower()
        for dev in list_output_devices():
            if needle in dev["name"].lower():
                self.device_name = device_name
                self.device_index = dev["index"]
                # Never ask for more channels than the default, nor fewer than one.
                self.target_channels = min(DEFAULT_TARGET_CHANNELS, max(1, dev["channels"]))
                return
        raise ValueError(f"Output device not found: {device_name!r}")

    def set_volume(self, volume: float):
        """Set master volume in [0.0, 1.0]; applied live to playing sounds."""
        self.volume = max(0.0, min(1.0, float(volume)))

    # ---- playback ----------------------------------------------------------
    def play(self, file_path: str, on_start=None, on_finish=None) -> int:
        """Start a sound (non-blocking, overlapping). Returns a playback id.

        ``on_start(pid)`` and ``on_finish(pid, error)`` are optional and are invoked
        from the playback's worker thread.
        """
        with self._lock:
            pid = self._counter
            self._counter += 1
            pb = _Playback(self, pid, file_path, on_start, on_finish)
            self._playbacks[pid] = pb
        try:
            pb.start()
        except Exception:
            # The worker never ran, so it will never deregister itself.
            self._remove(pid)
            raise
        return pid

    def stop(self, pid: int):
        """Request the playback with id ``pid`` to stop; unknown ids are ignored."""
        with self._lock:
            pb = self._playbacks.get(pid)
        if pb:
            pb.stop()

    def stop_file(self, file_path: str):
        """Stop every active playback of a given file path."""
        with self._lock:
            targets = [pb for pb in self._playbacks.values() if pb.file_path == file_path]
        for pb in targets:
            pb.stop()

    def stop_all(self):
        """Request every active playback to stop."""
        with self._lock:
            targets = list(self._playbacks.values())
        for pb in targets:
            pb.stop()

    def active_count(self) -> int:
        """Return how many playbacks are currently registered as active."""
        with self._lock:
            return len(self._playbacks)

    def _remove(self, pid: int):
        """Drop ``pid`` from the active set; a no-op if it is already gone."""
        with self._lock:
            self._playbacks.pop(pid, None)