From 42fd1227fdd44d4370c6550dc6fef05c3f8805e4 Mon Sep 17 00:00:00 2001 From: Marco Colombo Date: Fri, 24 Oct 2025 15:44:01 +0200 Subject: [PATCH 01/22] Sound generator Brick --- .../app_bricks/sound_generator/README.md | 42 +++ .../app_bricks/sound_generator/__init__.py | 335 ++++++++++++++++++ .../sound_generator/brick_config.yaml | 7 + .../app_bricks/sound_generator/effects.py | 243 +++++++++++++ .../examples/1_play_sequence.py | 51 +++ .../app_bricks/sound_generator/loaders.py | 181 ++++++++++ .../app_peripherals/speaker/__init__.py | 11 +- src/arduino/app_utils/audio.py | 64 ++++ 8 files changed, 931 insertions(+), 3 deletions(-) create mode 100644 src/arduino/app_bricks/sound_generator/README.md create mode 100644 src/arduino/app_bricks/sound_generator/__init__.py create mode 100644 src/arduino/app_bricks/sound_generator/brick_config.yaml create mode 100644 src/arduino/app_bricks/sound_generator/effects.py create mode 100644 src/arduino/app_bricks/sound_generator/examples/1_play_sequence.py create mode 100644 src/arduino/app_bricks/sound_generator/loaders.py diff --git a/src/arduino/app_bricks/sound_generator/README.md b/src/arduino/app_bricks/sound_generator/README.md new file mode 100644 index 00000000..d78021d3 --- /dev/null +++ b/src/arduino/app_bricks/sound_generator/README.md @@ -0,0 +1,42 @@ +# Sound Generator Brick + +Play sounds and melodies + +## Code example and usage + +```python +from arduino.app_bricks.sound_generator import SoundGenerator, SoundEffect +from arduino.app_utils import App + +player = SoundGenerator(sound_effects=[SoundEffect.adsr()]) + +fur_elise = [ + ("E5", 1/4), ("D#5", 1/4), ("E5", 1/4), ("D#5", 1/4), ("E5", 1/4), + ("B4", 1/4), ("D5", 1/4), ("C5", 1/4), ("A4", 1/2), + + ("C4", 1/4), ("E4", 1/4), ("A4", 1/4), ("B4", 1/2), + ("E4", 1/4), ("G#4", 1/4), ("B4", 1/4), ("C5", 1/2), + + ("E4", 1/4), ("E5", 1/4), ("D#5", 1/4), ("E5", 1/4), ("D#5", 1/4), ("E5", 1/4), + ("B4", 1/4), ("D5", 1/4), ("C5", 1/4), ("A4", 1/2), + + ("C4", 1/4), ("E4", 1/4), ("A4", 1/4), ("B4", 1/2), + ("E4", 1/4), ("C5", 1/4), ("B4", 1/4), ("A4", 1.0), +] +for note, duration in fur_elise: + player.play(note, duration) + +App.run() +``` + +waveform can be customized to change effect. For example, for a retro-gaming sound, you can configure "square" wave form. + +```python +player = SoundGenerator(wave_form="square") +``` + +instead, to have a more "rock" like sound, you can add effect + +```python +player = SoundGenerator(sound_effects=[SoundEffect.adsr(), SoundEffect.overdrive(drive=180.0), SoundEffect.chorus(depth_ms=15, rate_hz=0.2, mix=0.4)]) +``` diff --git a/src/arduino/app_bricks/sound_generator/__init__.py b/src/arduino/app_bricks/sound_generator/__init__.py new file mode 100644 index 00000000..7b809400 --- /dev/null +++ b/src/arduino/app_bricks/sound_generator/__init__.py @@ -0,0 +1,335 @@ +# SPDX-FileCopyrightText: Copyright (C) 2025 ARDUINO SA +# +# SPDX-License-Identifier: MPL-2.0 + +from arduino.app_utils import WaveGenerator, brick +from arduino.app_peripherals.speaker import Speaker +import threading +import numpy as np + +from .effects import * +from .loaders import ABCNotationLoader + + +@brick +class SoundGenerator: + SAMPLE_RATE = 16000 + A4_FREQUENCY = 440.0 + + # Semitone mapping for the 12 notes (0 = C, 11 = B). + # This is used to determine the relative position within an octave. + SEMITONE_MAP = { + "C": 0, + "C#": 1, + "DB": 1, + "D": 2, + "D#": 3, + "EB": 3, + "E": 4, + "F": 5, + "F#": 6, + "GB": 6, + "G": 7, + "G#": 8, + "AB": 8, + "A": 9, + "A#": 10, + "BB": 10, + "B": 11, + } + + NOTE_DURATTION = { + "W": 1.0, # Whole + "H": 0.5, # Half + "Q": 0.25, # Quarter + "E": 0.125, # Eighth + "S": 0.0625, # Sixteenth + "T": 0.03125, # Thirty-second + "X": 0.015625, # Sixty-fourth + } + + # The reference point in the overall semitone count from C0. A4 is (4 * 12) + 9 semitones from C0. + A4_SEMITONE_INDEX = (4 * 12) + 9 + + def __init__( + self, + output_device: Speaker = None, + bpm: int = 120, + time_signature: tuple = (4, 4), + octaves: int = 8, + wave_form: str = "sine", + master_volume: float = 1.0, + sound_effects: list = None, + ): + """Initialize the SoundGenerator. + Args: + output_device (Speaker, optional): The output device to play sound through. + wave_form (str): The type of wave form to generate. Supported values + are "sine" (default), "square", "triangle" and "sawtooth". + bpm (int): The tempo in beats per minute for note duration calculations. + master_volume (float): The master volume level (0.0 to 1.0). + octaves (int): Number of octaves to generate notes for (starting from octave + 0 up to octaves-1). + sound_effects (list, optional): List of sound effect instances to apply to the audio + signal (e.g., [SoundEffect.adsr()]). See SoundEffect class for available effects. + """ + + self._wave_gen = WaveGenerator(sample_rate=self.SAMPLE_RATE, wave_form=wave_form) + self._bpm = bpm + self.time_signature = time_signature + self._master_volume = master_volume + self._sound_effects = sound_effects + if output_device is None: + self._self_created_device = True + self._output_device = Speaker(sample_rate=self.SAMPLE_RATE, format="FLOAT_LE") + else: + self._self_created_device = False + self._output_device = output_device + + self._cfg_lock = threading.Lock() + self._notes = {} + for octave in range(octaves): + notes = self._fill_node_frequencies(octave) + self._notes.update(notes) + + def start(self): + if self._self_created_device: + self._output_device.start(notify_if_started=False) + + def stop(self): + if self._self_created_device: + self._output_device.stop() + + def set_master_volume(self, volume: float): + """ + Set the master volume level. + Args: + volume (float): Volume level (0.0 to 1.0). + """ + self._master_volume = max(0.0, min(1.0, volume)) + + def set_effects(self, effects: list): + """ + Set the list of sound effects to apply to the audio signal. + Args: + effects (list): List of sound effect instances (e.g., [SoundEffect.adsr()]). + """ + with self._cfg_lock: + self._sound_effects = effects + + def _fill_node_frequencies(self, octave: int) -> dict: + """ + Given a sequence of notes with their names and octaves, fill in their frequencies. + + """ + notes = {} + + notes[f"REST"] = 0.0 # Rest note + + # Generate frequencies for all notes in the given octave + for note_name in self.SEMITONE_MAP: + frequency = self._note_to_frequency(note_name, octave) + notes[f"{note_name}{octave}"] = frequency + + return notes + + def _note_to_frequency(self, note_name: str, octave: int) -> float: + """ + Calculates the frequency (in Hz) of a musical note based on its name and octave. + + It uses the standard 12-tone equal temperament formula: f = f0 * 2^(n/12), + where f0 is the reference frequency (A4=440Hz) and n is the number of + semitones from the reference note. + + Args: + note_name: The name of the note (e.g., 'A', 'C#', 'Bb', case-insensitive). + octave: The octave number (e.g., 4 for A4, 5 for C5). + + Returns: + The frequency in Hertz (float). + """ + # 1. Normalize the note name for lookup + normalized_note = note_name.strip().upper() + if len(normalized_note) > 1 and normalized_note[1] == "#": + # Ensure sharps are treated correctly (e.g., 'C#' is fine) + pass + elif len(normalized_note) > 1 and normalized_note[1].lower() == "b": + # Replace 'B' (flat) with 'B' for consistent dictionary key + normalized_note = normalized_note[0] + "B" + + # 2. Look up the semitone count within the octave + if normalized_note not in self.SEMITONE_MAP: + raise ValueError(f"Invalid note name: {note_name}. Please use notes like 'A', 'C#', 'Eb', etc.") + + semitones_in_octave = self.SEMITONE_MAP[normalized_note] + + # 3. Calculate the absolute semitone index (from C0) + # Total semitones = (octave number * 12) + semitones_from_C_in_octave + target_semitone_index = (octave * 12) + semitones_in_octave + + # 4. Calculate 'n', the number of semitones from the reference pitch (A4) + # A4 is the reference, so n is the distance from A4. + semitones_from_a4 = target_semitone_index - self.A4_SEMITONE_INDEX + + # 5. Calculate the frequency + # f = 440 * 2^(n/12) + frequency_hz = self.A4_FREQUENCY * (2.0 ** (semitones_from_a4 / 12.0)) + + return frequency_hz + + def _note_duration(self, symbol: str | float | int) -> float: + """ + Decode a note duration symbol into its corresponding fractional value. + Args: + symbol (str | float | int): Note duration symbol (e.g., 'W', 'H', 'Q', etc.) or a float/int value. + Returns: + float: Corresponding fractional duration value or the float itself if provided. + """ + + if isinstance(symbol, float) or isinstance(symbol, int): + return self._compute_time_duration(symbol) + + duration = self.NOTE_DURATTION.get(symbol.upper(), None) + if duration is not None: + return self._compute_time_duration(duration) + + return self._compute_time_duration(1 / 4) # Default to quarter note + + def _compute_time_duration(self, note_fraction: float) -> float: + """ + Compute the time duration in seconds for a given note fraction and time signature. + Args: + note_fraction (float): The fraction of the note (e.g., 1.0 for whole, 0.5 for half). + time_signature (tuple): The time signature as (numerator, denominator). + Returns: + float: Duration in seconds. + """ + + numerator, denominator = self.time_signature + + # For compound time signatures (6/8, 9/8, 12/8), the beat is the dotted quarter note (3/8) + if denominator == 8 and numerator % 3 == 0: + beat_value = 3 / 8 + else: + beat_value = 1 / denominator # es. 1/4 in 4/4 + + # Calculate the duration of a single beat in seconds + beat_duration = 60.0 / self._bpm + + # Compute the total duration + return beat_duration * (note_fraction / beat_value) + + def _apply_sound_effects(self, signal: np.ndarray, frequency: float) -> np.ndarray: + """ + Apply the configured sound effects to the audio signal. + Args: + signal (np.ndarray): Input audio signal. + Returns: + np.ndarray: Processed audio signal with sound effects applied. + """ + with self._cfg_lock: + if self._sound_effects is None: + return signal + + processed_signal = signal + for effect in self._sound_effects: + if hasattr(effect, "apply_with_tone"): + processed_signal = effect.apply_with_tone(processed_signal, frequency) + else: + processed_signal = effect.apply(processed_signal) + + return processed_signal + + def _get_note(self, note: str) -> float | None: + if note is None: + return None + return self._notes.get(note.strip().upper()) + + def play_chord(self, notes: list[str], note_duration: float | str = 1 / 4, volume: float = None): + """ + Play a chord consisting of multiple musical notes simultaneously for a specified duration and volume. + Args: + notes (list[str]): List of musical notes to play (e.g., ['A4', 'C#5', 'E5']). + note_duration (float | str): Duration of the chord as a float (like 1/4, 1/8) or a symbol ('W', 'H', 'Q', etc.). + volume (float, optional): Volume level (0.0 to 1.0). If None, uses master volume. + """ + duration = self._note_duration(note_duration) + if len(notes) == 1: + self.play(notes[0], duration, volume) + return + + waves = [] + base_frequency = None + for note in notes: + frequency = self._get_note(note) + if frequency: + if base_frequency is None: + base_frequency = frequency + if volume is None: + volume = self._master_volume + data = self._wave_gen.generate_block(float(frequency), duration, volume) + waves.append(data) + else: + continue + if len(waves) == 0: + return + chord = np.sum(waves, axis=0, dtype=np.float32) + chord /= np.max(np.abs(chord)) # Normalize to prevent clipping + blk = chord.astype(np.float32) + blk = self._apply_sound_effects(blk, base_frequency) + try: + self._output_device.play(blk, block_on_queue=False) + except Exception as e: + print(f"Error playing chord {notes}: {e}") + + def play(self, note: str, note_duration: float | str = 1 / 4, volume: float = None): + """ + Play a musical note for a specified duration and volume. + Args: + note (str): The musical note to play (e.g., 'A4', 'C#5', 'REST'). + note_duration (float | str): Duration of the note as a float (like 1/4, 1/8) or a symbol ('W', 'H', 'Q', etc.). + volume (float, optional): Volume level (0.0 to 1.0). If None, uses master volume. + """ + duration = self._note_duration(note_duration) + frequency = self._get_note(note) + if frequency is not None and frequency >= 0.0: + if volume is None: + volume = self._master_volume + data = self._wave_gen.generate_block(float(frequency), duration, volume) + data = self._apply_sound_effects(data, frequency) + self._output_device.play(data, block_on_queue=False) + + def play_tone(self, note: str, duration: float = 0.25, volume: float = None): + """ + Play a musical note for a specified duration and volume. + Args: + note (str): The musical note to play (e.g., 'A4', 'C#5', 'REST'). + duration (float): Duration of the note as a float in seconds. + volume (float, optional): Volume level (0.0 to 1.0). If None, uses master volume. + """ + frequency = self._get_note(note) + if frequency is not None and frequency >= 0.0 and duration > 0.0: + if volume is None: + volume = self._master_volume + data = self._wave_gen.generate_block(float(frequency), duration, volume) + data = self._apply_sound_effects(data, frequency) + self._output_device.play(data, block_on_queue=False) + + def play_abc(self, abc_string: str, volume: float = None): + """ + Play a sequence of musical notes defined in ABC notation. + Args: + abc_string (str): ABC notation string defining the sequence of notes. + volume (float, optional): Volume level (0.0 to 1.0). If None, uses master volume. + """ + if not abc_string or abc_string.strip() == "": + return + if volume is None: + volume = self._master_volume + metadata, notes = ABCNotationLoader.parse_abc_notation(abc_string) + for note, duration in notes: + frequency = self._get_note(note) + if frequency is not None and frequency >= 0.0: + data = self._wave_gen.generate_block(float(frequency), duration, volume) + data = self._apply_sound_effects(data, frequency) + self._output_device.play(data, block_on_queue=False) diff --git a/src/arduino/app_bricks/sound_generator/brick_config.yaml b/src/arduino/app_bricks/sound_generator/brick_config.yaml new file mode 100644 index 00000000..ccaf9380 --- /dev/null +++ b/src/arduino/app_bricks/sound_generator/brick_config.yaml @@ -0,0 +1,7 @@ +id: arduino:sound_generator +name: Sound Generator +description: Generate sounds like notes, tones, or melodies using waveforms. +category: audio +required_devices: + - microphone + \ No newline at end of file diff --git a/src/arduino/app_bricks/sound_generator/effects.py b/src/arduino/app_bricks/sound_generator/effects.py new file mode 100644 index 00000000..c4e9cf20 --- /dev/null +++ b/src/arduino/app_bricks/sound_generator/effects.py @@ -0,0 +1,243 @@ +# SPDX-FileCopyrightText: Copyright (C) 2025 ARDUINO SA +# +# SPDX-License-Identifier: MPL-2.0 + +import numpy as np + + +class SoundEffect: + @staticmethod + def overdrive(drive: float = 1.0) -> np.ndarray: + """ + Apply overdrive effect to the audio signal. + Args: + signal (np.ndarray): Input audio signal. + drive (float): Overdrive intensity factor. + Returns: + np.ndarray: Processed audio signal with overdrive effect. + """ + + class SoundEffectOverdrive: + def __init__(self, drive: float = 1.0): + pass + + def apply(self, signal: np.ndarray) -> np.ndarray: + signal = signal * drive + # soft clipping + return (2 / 3) * np.tanh(signal) + + return SoundEffectOverdrive(drive) + + @staticmethod + def chorus(fs: int = 16000, depth_ms=10, rate_hz: float = 0.25, mix: float = 0.5) -> np.ndarray: + """ + Apply chorus effect to the audio signal. + Args: + signal (np.ndarray): Input audio signal. + fs (int): Sampling frequency in Hz. + depth_ms (float): Depth of the chorus effect in milliseconds. + rate_hz (float): Rate of the LFO in Hz. + mix (float): Mix ratio between dry and wet signals (0.0 to 1.0). + Returns: + np.ndarray: Processed audio signal with chorus effect. + """ + + class SoundEffectChorus: + def __init__(self, fs: int = 16000, depth_ms: int = 10, rate_hz: float = 0.25, mix: float = 0.5): + self.fs = fs + self.depth_ms = depth_ms + self.rate_hz = rate_hz + self.mix = mix + pass + + def apply(self, signal: np.ndarray) -> np.ndarray: + n = len(signal) + depth = (self.depth_ms / 1000.0) * self.fs # in samples + t = np.arange(n) + + lfo = (np.sin(2 * np.pi * self.rate_hz * t / self.fs) + 1) / 2 # [0..1] + delay = (lfo * depth).astype(int) + + out = np.zeros_like(signal) + for i in range(n): + d = delay[i] + if i - d >= 0: + out[i] = signal[i - d] + + # mix dry/wet + return ((1 - self.mix) * signal + self.mix * out).astype(np.float32) + + return SoundEffectChorus(fs, depth_ms, rate_hz, mix) + + @staticmethod + def adsr(attack: float = 0.015, decay: float = 0.2, sustain: float = 0.5, release: float = 0.35): + """ + Apply ADSR (attack/decay/sustain/release) envelope to the audio signal. + Args: + fs (int): Sampling frequency in Hz. + attack (float): Attack time in seconds. + decay (float): Decay time in seconds. + sustain (float): Sustain level (0.0 to 1.0). + release (float): Release time in seconds. + """ + + class SoundEffectADSR: + def __init__(self, attack: float = 0.015, decay: float = 0.2, sustain: float = 0.5, release: float = 0.35): + """ + Initialize ADSR effect. + Args: + attack (float): Attack time in seconds. + decay (float): Decay time in seconds. + sustain (float): Sustain level (0.0 to 1.0). + release (float): Release time in seconds. + """ + self.attack = attack + self.decay = decay + self.sustain = sustain + self.release = release + + def apply(self, signal: np.ndarray) -> np.ndarray: + """ + Apply ADSR filter on signal. + Args: + signal: np.ndarray float32 (audio) + """ + n = len(signal) + env = np.zeros(n) + + a = int(n * self.attack) + d = int(n * self.decay) + r = int(n * self.release) + + s = max(0, n - (a + d + r)) + + env[:a] = np.linspace(0, 1, a, endpoint=False) # Attack + env[a : a + d] = np.linspace(1, self.sustain, d, endpoint=False) # Decay + env[a + d : a + d + s] = self.sustain # Sustain + env[a + d + s :] = np.linspace(self.sustain, 0, n - (a + d + s), endpoint=False) # Release + + return (signal * env).astype(np.float32) + + return SoundEffectADSR(attack, decay, sustain, release) + + @staticmethod + def tremolo(depth: float = 0.5, rate: float = 5.0): + class SoundEffectTremolo: + def __init__(self, depth: float = 0.5, rate: float = 5.0): + """ + Tremolo effect block-local. + Args: + depth (float): modulation depth (0=no effect, 1=full) + rate (float): rate in cycles per block + """ + self.depth = np.clip(depth, 0.0, 1.0) + self.rate = rate # cicli di tremolo per blocco + + def apply(self, signal: np.ndarray) -> np.ndarray: + """ + Apply tremolo to a block of audio. + Args: + signal (np.ndarray): input block + """ + n = len(signal) + t = np.linspace(0, 1, n, endpoint=False) # normalizzato al blocco + lfo = (1 - self.depth) + self.depth * np.sin(2 * np.pi * self.rate * t) + return (signal * lfo).astype(np.float32) + + return SoundEffectTremolo(depth, rate) + + @staticmethod + def vibrato(depth: float = 0.02, rate: float = 0.5): + class SoundEffectVibrato: + def __init__(self, depth: float = 0.02, rate: float = 2.0): + """ + Vibrato effect + Args: + depth (float): max deviation (0=no effect, 0.5=max) + rate (float): number of cycles per block + """ + self.depth = np.clip(depth, 0.0, 0.5) + self.rate = rate + + def apply(self, signal: np.ndarray) -> np.ndarray: + n = len(signal) + t = np.linspace(0, 1, n, endpoint=False) + lfo = self.depth * n * np.sin(2 * np.pi * self.rate * t) + indices = np.arange(n) + lfo + indices = np.clip(indices, 0, n - 1.001) + i0 = np.floor(indices).astype(int) + i1 = np.ceil(indices).astype(int) + frac = indices - i0 + output = (1 - frac) * signal[i0] + frac * signal[i1] + return output.astype(np.float32) + + return SoundEffectVibrato(depth=depth, rate=rate) + + @staticmethod + def bitcrusher(bits: int = 4, reduction: int = 6): + class SoundEffectBitcrusher: + def __init__(self, bits: int = 4, reduction: int = 4): + """ + Bitcrusher effect. + Args: + bit_depth (int): Number of bits for quantization (1-16). + reduction (int): Redeuction factor for downsampling (>=1). + """ + self.bit_depth = np.clip(bits, 1, 16) + self.reduction = max(1, reduction) + + def apply(self, signal: np.ndarray) -> np.ndarray: + # Downsampling + reduced = signal[:: self.reduction] + expanded = np.repeat(reduced, self.reduction) + expanded = expanded[: len(signal)] # taglia se serve + + # Quantization + levels = 2**self.bit_depth + crushed = np.round(expanded * (levels / 2)) / (levels / 2) + crushed = np.clip(crushed, -1.0, 1.0) + return crushed.astype(np.float32) + + return SoundEffectBitcrusher(bits, reduction) + + @staticmethod + def octaver(oct_up: bool = True, oct_down: bool = False): + class SoundEffectOctaver: + def __init__(self, oct_up: bool = True, oct_down: bool = True): + """ + Octaver effect. + Args: + oct_up (bool): Add one octave above the original signal. + oct_down (bool): Add one octave below the original signal. + """ + self.oct_up = oct_up + self.oct_down = oct_down + + def apply(self, signal: np.ndarray) -> np.ndarray: + """ + Apply the octaver effect to a mono audio signal. + signal: numpy array with float values in range [-1, 1] + """ + output = signal.astype(np.float32) + n = len(signal) + + # Upper octave + if self.oct_up: + up = np.zeros(n, dtype=np.float32) + up[: n // 2] = signal[::2] + output += up + + # Lower octave + if self.oct_down: + down = np.zeros(n, dtype=np.float32) + down[::2] = signal[: n // 2] + output += down + + # Normalize to prevent clipping + max_val = np.max(np.abs(output)) + if max_val > 1.0: + output /= max_val + + return output + + return SoundEffectOctaver(oct_up, oct_down) diff --git a/src/arduino/app_bricks/sound_generator/examples/1_play_sequence.py b/src/arduino/app_bricks/sound_generator/examples/1_play_sequence.py new file mode 100644 index 00000000..906f6b89 --- /dev/null +++ b/src/arduino/app_bricks/sound_generator/examples/1_play_sequence.py @@ -0,0 +1,51 @@ +# SPDX-FileCopyrightText: Copyright (C) 2025 ARDUINO SA +# +# SPDX-License-Identifier: MPL-2.0 + +# EXAMPLE_NAME: Play a sequence of notes (Fur Elise) +from arduino.app_bricks.sound_generator import SoundGenerator, SoundEffect +from arduino.app_utils import App + +player = SoundGenerator(sound_effects=[SoundEffect.adsr()]) + +fur_elise = [ + ("E5", 1 / 8), + ("D#5", 1 / 8), + ("E5", 1 / 8), + ("D#5", 1 / 8), + ("E5", 1 / 8), + ("B4", 1 / 8), + ("D5", 1 / 8), + ("C5", 1 / 8), + ("A4", 1 / 4), + ("C4", 1 / 8), + ("E4", 1 / 8), + ("A4", 1 / 8), + ("B4", 1 / 8), + ("E4", 1 / 8), + ("G#4", 1 / 8), + ("B4", 1 / 8), + ("C5", 1 / 8), + ("E4", 1 / 8), + ("E5", 1 / 8), + ("D#5", 1 / 8), + ("E5", 1 / 8), + ("D#5", 1 / 8), + ("E5", 1 / 8), + ("B4", 1 / 8), + ("D5", 1 / 8), + ("C5", 1 / 8), + ("A4", 1 / 4), + ("C4", 1 / 8), + ("E4", 1 / 8), + ("A4", 1 / 8), + ("B4", 1 / 4), + ("E4", 1 / 8), + ("C5", 1 / 8), + ("B4", 1 / 8), + ("A4", 1), +] +for note, duration in fur_elise: + player.play(note, duration) + +App.run() diff --git a/src/arduino/app_bricks/sound_generator/loaders.py b/src/arduino/app_bricks/sound_generator/loaders.py new file mode 100644 index 00000000..2e2636ec --- /dev/null +++ b/src/arduino/app_bricks/sound_generator/loaders.py @@ -0,0 +1,181 @@ +# SPDX-FileCopyrightText: Copyright (C) 2025 ARDUINO SA +# +# SPDX-License-Identifier: MPL-2.0 + +from arduino.app_utils.logger import Logger +from typing import List, Tuple +import re + +logger = Logger(__name__) + + +class ABCNotationLoader: + NOT_HANDLED_RESERVED_LINES = r"^[A-Za-z]:" + + @staticmethod + def _parse_abc_duration(duration_str: str, default_duration_in_seconds: float) -> float: + """ + Parse ABC duration notation (e.g., '2', '/2', '3/2'). + The returned duration is in absolute seconds. + - default_duration_in_seconds: The absolute duration (in seconds) of the + note specified by the L: field, calculated using the Q: field (BPM). + Args: + duration_str (str): Duration string from ABC notation. + default_duration_in_seconds (float): Default duration in seconds for a single unit. + Returns: + float: Calculated duration in seconds. + """ + if not duration_str: + return default_duration_in_seconds + + # Handle fractions (e.g., C/2, C/4) + if "/" in duration_str: + parts = duration_str.split("/") + # Handles C/ (division by 2) and C4/2 (multiplication by 4, division by 2) + numerator = int(parts[0]) if parts[0] else 1 + denominator = int(parts[1]) if len(parts) > 1 and parts[1] else 2 + + return default_duration_in_seconds * numerator / denominator + + try: + multiplier = int(duration_str) + return default_duration_in_seconds * multiplier + except ValueError: + return default_duration_in_seconds + + @staticmethod + def parse_abc_notation(abc_string: str, default_octave: int = 4) -> Tuple[dict, List[Tuple[str, float]]]: + """ + Parse ABC notation and convert to an array of (note, duration_in_seconds) tuples. + Args: + abc_string (str): ABC notation string. + default_octave (int): Default octave for uppercase notes (C4). + + Returns: + Tuple[dict, List[Tuple[str, float]]]: Metadata dictionary and list of (note, duration) tuples. + """ + + metadata = {} + + lines = abc_string.split("\n") + music_lines = [] + + # --- Parse Header Fields --- + for line in lines: + line = line.strip() + if re.match(ABCNotationLoader.NOT_HANDLED_RESERVED_LINES, line): + if line.startswith("X:"): + metadata["reference"] = line[2:].strip() + elif line.startswith("T:"): + metadata["title"] = line[2:].strip() + elif line.startswith("K:"): + metadata["key"] = line[2:].strip() + elif line.startswith("L:"): + metadata["default_length"] = line[2:].strip() + elif line.startswith("Q:"): + metadata["tempo"] = line[2:].strip() + elif line.startswith("M:"): + metadata["meter"] = line[2:].strip() + elif line.startswith("C:"): + metadata["composer"] = line[2:].strip() + elif line.startswith("R:"): + metadata["rhythm"] = line[2:].strip() + elif not line.startswith("%") and line: + music_lines.append(line) + + # Standard ABC default for L: is 1/8 if not specified. + default_unit_fraction = 1 / 8 + + if "default_length" in metadata and metadata["default_length"]: + match_L = re.match(r"(\d+)/(\d+)", metadata["default_length"]) + if match_L: + num, denom = int(match_L.group(1)), int(match_L.group(2)) + default_unit_fraction = num / denom + + bpm = 120 # Default BPM if Q: is not specified + beat_unit_fraction = 0.25 # Default beat unit (1/4 or quarter note) + + if "tempo" in metadata and metadata["tempo"]: + # Q: field is typically 'note_fraction=BPM', e.g. '1/4=120' + match_Q = re.match(r"(\d+/\d+)=(\d+)", metadata["tempo"].replace(" ", "")) + + if match_Q: + note_str, bpm_str = match_Q.groups() + bpm = int(bpm_str) + + q_num, q_denom = map(int, note_str.split("/")) + beat_unit_fraction = q_num / q_denom + else: + try: + bpm = int(metadata["tempo"].replace(" ", "")) + except ValueError: + pass # Keep default BPM + + # Duration in seconds of the note specified as the beat unit (Q: note) + duration_of_beat_unit = 60.0 / bpm + + # Calculate the ratio between the default L: unit and the Q: beat unit. + # This handles cases where L: and Q: define different note values (e.g., L:1/16, Q:1/4=120) + ratio_to_beat_unit = default_unit_fraction / beat_unit_fraction + + # The absolute duration in seconds of the note defined by L: + default_duration_in_seconds = ratio_to_beat_unit * duration_of_beat_unit + + # Informational output + if "title" in metadata: + logger.info(f"Playing: {metadata['title']}") + logger.info(f"BPM: {bpm}, Beat Unit Fraction: {beat_unit_fraction:.3f}, Default L: {default_unit_fraction:.3f}") + logger.info(f"Duration of 1 beat: {duration_of_beat_unit:.3f}s. Default L: Duration: {default_duration_in_seconds:.3f}s") + + # --- 5. Parse Music Lines --- + music_string = " ".join(music_lines) + result = [] + + # Tokenize notes, rests, and bar lines + music_string = re.sub(r'"[^"]*"', "", music_string) # Remove chord annotations + tokens = re.findall(r"[A-Ga-g][',]*[#b]?[0-9]*/?[0-9]*|z[0-9]*/?[0-9]*|\|", music_string) + + for token in tokens: + if token == "|": + continue + + # Parse Rest + if token.startswith("z"): + # Use the duration in seconds as the base unit + duration = ABCNotationLoader._parse_abc_duration(token[1:], default_duration_in_seconds) + result.append(("REST", duration)) + continue + + # Parse Note + note_char = token[0] + rest = token[1:] + + octave = default_octave + if note_char.islower(): + octave = octave + 1 + note_char = note_char.upper() + + # Handle octave markers (',) - adjust octave accordingly - increase/decrease octave + octave_markers = re.findall(r"[',]", rest) + for marker in octave_markers: + if marker == "'": + octave += 1 + elif marker == ",": + octave -= 1 + + rest = re.sub(r"[',]", "", rest) + + # Handle accidentals (# sharp, b flat) + accidental = "" + if rest and rest[0] in ["#", "b"]: + accidental = rest[0].upper() + rest = rest[1:] + + duration = ABCNotationLoader._parse_abc_duration(rest, default_duration_in_seconds) + + # Build note name (e.g., C#4) + note_name = f"{note_char}{accidental}{octave}" + result.append((note_name, duration)) + + metadata["actual_bpm"] = bpm + return metadata, result diff --git a/src/arduino/app_peripherals/speaker/__init__.py b/src/arduino/app_peripherals/speaker/__init__.py index d5c423fb..63a9edab 100644 --- a/src/arduino/app_peripherals/speaker/__init__.py +++ b/src/arduino/app_peripherals/speaker/__init__.py @@ -352,10 +352,15 @@ def _clear_queue(self): break logger.debug("Playback queue cleared.") - def start(self): + def start(self, notify_if_started: bool = True): """Start the spaker stream by opening the PCM device.""" if self._is_reproducing.is_set(): - raise RuntimeError("Spaker is already reproducing audio, cannot start again.") + if notify_if_started: + raise RuntimeError("Spaker is already reproducing audio, cannot start again.") + else: + logger.debug("Spaker is already reproducing audio, start() call ignored.") + return + self._clear_queue() self._open_pcm() self._is_reproducing.set() @@ -367,7 +372,7 @@ def start(self): def stop(self): """Close the PCM device if open.""" if not self._is_reproducing.is_set(): - logger.warning("Spaker is not recording, nothing to stop.") + logger.debug("Spaker is not recording, nothing to stop.") return # Stop the playback thread diff --git a/src/arduino/app_utils/audio.py b/src/arduino/app_utils/audio.py index aac98256..3e8815e8 100644 --- a/src/arduino/app_utils/audio.py +++ b/src/arduino/app_utils/audio.py @@ -6,6 +6,70 @@ import numpy as np +class WaveGenerator: + """Generate wave audio blocks. + + This class produces wave blocks as NumPy buffers. + + Attributes: + sample_rate (int): Audio sample rate in Hz. + """ + + def __init__(self, wave_form: str = "sine", sample_rate: int = 16000): + """Create a new WaveGenerator. + + Args: + wave_form (str): The type of wave form to generate. Supported values + are "sine", "square", "triangle", "white_noise" and "sawtooth". + sample_rate (int): The playback sample rate (Hz) used to compute + phase increments and buffer sizes. + """ + self.wave_form = wave_form.lower() + self.sample_rate = int(sample_rate) + + def generate_block(self, freq: float, block_dur: float, master_volume: float = 1.0): + """Generate a block of float32 audio samples. + + Returned buffer is a NumPy view (float32) into an internal preallocated array and is valid + until the next call to this method. + + Args: + freq (float): Target frequency in Hz for this block. + block_dur (float): Duration of the requested block in seconds. + master_volume (float, optional): Global gain multiplier. Defaults + to 1.0. + + Returns: + numpy.ndarray: A 1-D float32 NumPy array containing the generated + audio samples for the requested block. + """ + N = max(1, int(self.sample_rate * block_dur)) + + # compute wave form based on selected type + t = np.arange(N, dtype=np.float32) / self.sample_rate + + match self.wave_form: + case "square": + samples = 0.5 * (1 + np.sign(np.sin(2.0 * np.pi * freq * t))) + case "triangle": + samples = 2.0 * np.abs(2.0 * (freq * t % 1) - 1.0) - 1.0 + case "sawtooth": + samples = 2.0 * (freq * t % 1.0) - 1.0 + case "white_noise": + samples = np.random.uniform(-1.0, 1.0, size=N).astype(np.float32) + case _: # "sine" e default + samples = np.sin(2.0 * np.pi * freq * t) + + samples = samples.astype(np.float32) + + # apply gain + mg = float(master_volume) + if mg != 1.0: + np.multiply(samples, mg, out=samples) + + return samples + + class SineGenerator: """Generate sine-wave audio blocks with amplitude envelope smoothing. From e3e6ac4e60d2dfa6d4a373b8793d3144b78c45fa Mon Sep 17 00:00:00 2001 From: Marco Colombo Date: Fri, 24 Oct 2025 17:20:48 +0200 Subject: [PATCH 02/22] support for transpose directive --- src/arduino/app_bricks/sound_generator/loaders.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/src/arduino/app_bricks/sound_generator/loaders.py b/src/arduino/app_bricks/sound_generator/loaders.py index 2e2636ec..6b454dac 100644 --- a/src/arduino/app_bricks/sound_generator/loaders.py +++ b/src/arduino/app_bricks/sound_generator/loaders.py @@ -80,6 +80,15 @@ def parse_abc_notation(abc_string: str, default_octave: int = 4) -> Tuple[dict, metadata["composer"] = line[2:].strip() elif line.startswith("R:"): metadata["rhythm"] = line[2:].strip() + elif line.startswith("%%transpose"): + # Handle transpose directive if needed + matched = re.match(r"%%transpose\s+(-?\d+)", line) + if matched: + # only octave transposition is supported + octaves = int(matched.group(1)) / 12 + if octaves + default_octave < 0: + octaves = 0 + metadata["transpose"] = int(octaves) elif not line.startswith("%") and line: music_lines.append(line) @@ -126,6 +135,8 @@ def parse_abc_notation(abc_string: str, default_octave: int = 4) -> Tuple[dict, logger.info(f"Playing: {metadata['title']}") logger.info(f"BPM: {bpm}, Beat Unit Fraction: {beat_unit_fraction:.3f}, Default L: {default_unit_fraction:.3f}") logger.info(f"Duration of 1 beat: {duration_of_beat_unit:.3f}s. Default L: Duration: {default_duration_in_seconds:.3f}s") + if "transpose" in metadata: + logger.info(f"Transposing by {metadata['transpose']} octaves. Target default octave: {default_octave + metadata['transpose']}") # --- 5. Parse Music Lines --- music_string = " ".join(music_lines) @@ -151,6 +162,8 @@ def parse_abc_notation(abc_string: str, default_octave: int = 4) -> Tuple[dict, rest = token[1:] octave = default_octave + if "transpose" in metadata: + octave += metadata["transpose"] if note_char.islower(): octave = octave + 1 note_char = note_char.upper() From 149f389ba950936fd282dd63e39b921d2ffed8f9 Mon Sep 17 00:00:00 2001 From: Marco Colombo Date: Mon, 27 Oct 2025 11:51:12 +0100 Subject: [PATCH 03/22] Add play poliphonic sounds --- .../app_bricks/sound_generator/__init__.py | 56 +++++++++++++++++++ 1 file changed, 56 insertions(+) diff --git a/src/arduino/app_bricks/sound_generator/__init__.py b/src/arduino/app_bricks/sound_generator/__init__.py index 7b809400..7bd52066 100644 --- a/src/arduino/app_bricks/sound_generator/__init__.py +++ b/src/arduino/app_bricks/sound_generator/__init__.py @@ -245,6 +245,62 @@ def _get_note(self, note: str) -> float | None: return None return self._notes.get(note.strip().upper()) + def play_polyphonic(self, notes: list[list[tuple[str, float]]], as_tone: bool = False, volume: float = None): + """ + Play multiple sequences of musical notes simultaneously (poliphony). + It is possible to play multi track music by providing a list of sequences, + where each sequence is a list of tuples (note, duration). + Duration is in notes fractions (e.g., 1/4 for quarter note). + Args: + notes (list[list[tuple[str, float]]]): List of sequences, each sequence is a list of tuples (note, duration). + as_tone (bool): If True, play as tones, considering duration in seconds + volume (float, optional): Volume level (0.0 to 1.0). If None, uses master volume. + """ + if volume is None: + volume = self._master_volume + + # Multi track mixing + sequences_data = [] + base_frequency = None + for sequence in notes: + sequence_waves = [] + for note, duration in sequence: + frequency = self._get_note(note) + if frequency >= 0.0: + if base_frequency is None: + base_frequency = frequency + if as_tone == False: + duration = self._note_duration(duration) + data = self._wave_gen.generate_block(float(frequency), duration, volume) + sequence_waves.append(data) + else: + continue + if len(sequence_waves) > 0: + single_track_data = np.concatenate(sequence_waves) + sequences_data.append(single_track_data) + + if len(sequences_data) == 0: + return + + # Mix sequences - align lengths + max_length = max(len(seq) for seq in sequences_data) + # Pad shorter sequences with zeros + for i in range(len(sequences_data)): + seq = sequences_data[i] + if len(seq) < max_length: + padding = np.zeros(max_length - len(seq), dtype=np.float32) + sequences_data[i] = np.concatenate((seq, padding)) + + # Sum all sequences + mixed = np.sum(sequences_data, axis=0, dtype=np.float32) + mixed /= np.max(np.abs(mixed)) # Normalize to prevent clipping + blk = mixed.astype(np.float32) + blk = self._apply_sound_effects(blk, base_frequency) + try: + self._output_device.play(blk, block_on_queue=False) + except Exception as e: + print(f"Error playing multiple sequences: {e}") + def play_chord(self, notes: list[str], note_duration: float | str = 1 / 4, volume: float = None): """ Play a chord consisting of multiple musical notes simultaneously for a specified duration and volume. From 3f91a0a4e42d6616da1c0b4d42afd739b55b0405 Mon Sep 17 00:00:00 2001 From: Marco Colombo Date: Mon, 27 Oct 2025 14:55:45 +0100 Subject: [PATCH 04/22] tests --- .../app_bricks/sound_generator/test_abc.py | 95 +++++++++++++++++++ .../sound_generator/test_effects.py | 68 +++++++++++++ 2 files changed, 163 insertions(+) create mode 100644 tests/arduino/app_bricks/sound_generator/test_abc.py create mode 100644 tests/arduino/app_bricks/sound_generator/test_effects.py diff --git a/tests/arduino/app_bricks/sound_generator/test_abc.py b/tests/arduino/app_bricks/sound_generator/test_abc.py new file mode 100644 index 00000000..d1f3f007 --- /dev/null +++ b/tests/arduino/app_bricks/sound_generator/test_abc.py @@ -0,0 +1,95 @@ +from arduino.app_bricks.sound_generator import ABCNotationLoader + + +def test_abc_loader(): + full_abc = """ + X:1 + T:Main Theme + M:4/4 + L:1/8 + Q:1/4=130 + K:Cm + "Cm"E2 E2 E2 "Ab"C>G | "Cm"E2 "Ab"C>G "Cm"E4 | + "Cm"B2 B2 B2 "Ab"c>G | "Fm"^D#2 "Ab"C>G "Cm"E4 | + """ + + reference_notes = [ + ("E4", 60 / 130), + ("E4", 60 / 130), + ("E4", 60 / 130), + ("C4", (60 / 130) / 2), + ("G4", (60 / 130) / 2), + ("E4", 60 / 130), + ("C4", (60 / 130) / 2), + ("G4", (60 / 130) / 2), + ("E4", (60 / 130) * 2), + ("B4", 60 / 130), + ("B4", 60 / 130), + ("B4", 60 / 130), + ("C5", (60 / 130) / 2), + ("G4", (60 / 130) / 2), + ("D#4", 60 / 130), + ("C4", (60 / 130) / 2), + ("G4", (60 / 130) / 2), + ("E4", (60 / 130) * 2), + ] + + metadata, loaded = ABCNotationLoader.parse_abc_notation(full_abc) + assert metadata["title"] == "Main Theme" + assert "transpose" not in metadata + assert metadata["tempo"] == "1/4=130" + + i_ref = 0 + for note, duration in loaded: + print(f"Note: {note}, Duration: {duration}") + assert note == reference_notes[i_ref][0] + assert abs(duration - reference_notes[i_ref][1]) < 0.01 + i_ref += 1 + + +def test_abc_loader_with_transpose(): + full_abc = """ + X:1 + T:Main Theme + M:4/4 + L:1/8 + Q:1/4=130 + K:Cm + %%transpose -12 + "Cm"E2 E2 E2 "Ab"C>G | "Cm"E2 "Ab"C>G "Cm"E4 | + "Cm"B2 B2 B2 "Ab"c>G | "Fm"^D#2 "Ab"C>G "Cm"E4 | + """ + + reference_notes = [ + ("E3", 60 / 130), + ("E3", 60 / 130), + ("E3", 60 / 130), + ("C3", (60 / 130) / 2), + ("G3", (60 / 130) / 2), + ("E3", 60 / 130), + ("C3", (60 / 130) / 2), + ("G3", (60 / 130) / 2), + ("E3", (60 / 130) * 2), + ("B3", 60 / 130), + ("B3", 60 / 130), + ("B3", 60 / 130), + ("C4", (60 / 130) / 2), + ("G3", (60 / 130) / 2), + ("D#3", 60 / 130), + ("C3", (60 / 130) / 2), + ("G3", (60 / 130) / 2), + ("E3", (60 / 130) * 2), + ] + + metadata, loaded = ABCNotationLoader.parse_abc_notation(full_abc) + assert metadata["title"] == "Main Theme" + assert "transpose" in metadata + assert metadata["transpose"] == -1 + assert metadata["tempo"] == "1/4=130" + + i_ref = 0 + for note, duration in loaded: + print(f"Note: {note}, Duration: {duration}") + assert note == reference_notes[i_ref][0] + assert abs(duration - reference_notes[i_ref][1]) < 0.01 + i_ref += 1 diff --git a/tests/arduino/app_bricks/sound_generator/test_effects.py b/tests/arduino/app_bricks/sound_generator/test_effects.py new file mode 100644 index 00000000..781af770 --- /dev/null +++ b/tests/arduino/app_bricks/sound_generator/test_effects.py @@ -0,0 +1,68 @@ +# SPDX-FileCopyrightText: Copyright (C) 2025 ARDUINO SA +# +# SPDX-License-Identifier: MPL-2.0 + +import pytest +from arduino.app_bricks.sound_generator.effects import SoundEffect +from arduino.app_bricks.sound_generator import SoundGenerator +from arduino.app_utils.audio import WaveGenerator + + +def test_adsr_effect(): + generator = WaveGenerator(sample_rate=16000, wave_form="square") + adsr = SoundEffect.adsr() + blk = generator.generate_block(440.0, 1 / 8, 1.0) # Generate a block to initialize + assert adsr is not None + + # Apply ADSR effect + processed = adsr.apply(blk) + + assert processed is not None + assert len(processed) == len(blk) + + +def test_available_notes(): + note_sequence = [ + ("E5", 0.125), + ("E5", 0.125), + ("REST", 0.125), + ("E5", 0.125), + ("REST", 0.125), + ("C5", 0.125), + ("E5", 0.125), + ("REST", 0.125), + ("G5", 0.25), + ("REST", 0.25), + ("G4", 0.25), + ("REST", 0.25), + ("C5", 0.25), + ("REST", 0.125), + ("G4", 0.25), + ("REST", 0.125), + ("E4", 0.25), + ("REST", 0.125), + ("A4", 0.25), + ("B4", 0.25), + ("Bb4", 0.125), + ("A4", 0.25), + ("G4", 0.125), + ("E5", 0.125), + ("G5", 0.125), + ("A5", 0.25), + ("F5", 0.125), + ("G5", 0.125), + ("REST", 0.125), + ("E5", 0.25), + ("C5", 0.125), + ("D5", 0.125), + ("B4", 0.25), + ] + + generator = SoundGenerator() + for note, duration in note_sequence: + print(f"Testing note: {note}") + frequency = generator._get_note(note) + if "REST" != note: + assert frequency is not None and frequency > 0.0 + else: + assert frequency is not None and frequency == 0.0 From 30e56236387aa86f90b70d587e9c98201e8b6a79 Mon Sep 17 00:00:00 2001 From: Marco Colombo Date: Tue, 28 Oct 2025 16:12:33 +0100 Subject: [PATCH 05/22] Added bytes streamer to decouple generation and playback --- .../app_bricks/sound_generator/__init__.py | 191 +++++++++++++++--- 1 file changed, 161 insertions(+), 30 deletions(-) diff --git a/src/arduino/app_bricks/sound_generator/__init__.py b/src/arduino/app_bricks/sound_generator/__init__.py index 7bd52066..322be07c 100644 --- a/src/arduino/app_bricks/sound_generator/__init__.py +++ b/src/arduino/app_bricks/sound_generator/__init__.py @@ -5,6 +5,7 @@ from arduino.app_utils import WaveGenerator, brick from arduino.app_peripherals.speaker import Speaker import threading +from typing import Iterable import numpy as np from .effects import * @@ -12,7 +13,7 @@ @brick -class SoundGenerator: +class SoundGeneratorStreamer: SAMPLE_RATE = 16000 A4_FREQUENCY = 440.0 @@ -53,7 +54,6 @@ class SoundGenerator: def __init__( self, - output_device: Speaker = None, bpm: int = 120, time_signature: tuple = (4, 4), octaves: int = 8, @@ -61,9 +61,8 @@ def __init__( master_volume: float = 1.0, sound_effects: list = None, ): - """Initialize the SoundGenerator. + """Initialize the SoundGeneratorStreamer. Generates sound blocks for streaming, without internal playback. Args: - output_device (Speaker, optional): The output device to play sound through. wave_form (str): The type of wave form to generate. Supported values are "sine" (default), "square", "triangle" and "sawtooth". bpm (int): The tempo in beats per minute for note duration calculations. @@ -79,12 +78,6 @@ def __init__( self.time_signature = time_signature self._master_volume = master_volume self._sound_effects = sound_effects - if output_device is None: - self._self_created_device = True - self._output_device = Speaker(sample_rate=self.SAMPLE_RATE, format="FLOAT_LE") - else: - self._self_created_device = False - self._output_device = output_device self._cfg_lock = threading.Lock() self._notes = {} @@ -93,12 +86,10 @@ def __init__( self._notes.update(notes) def start(self): - if self._self_created_device: - self._output_device.start(notify_if_started=False) + pass def stop(self): - if self._self_created_device: - self._output_device.stop() + pass def set_master_volume(self, volume: float): """ @@ -245,7 +236,11 @@ def _get_note(self, note: str) -> float | None: return None return self._notes.get(note.strip().upper()) - def play_polyphonic(self, notes: list[list[tuple[str, float]]], as_tone: bool = False, volume: float = None): + def _to_bytes(self, signal: np.ndarray) -> bytes: + # Format: "FLOAT_LE" -> (ALSA: "PCM_FORMAT_FLOAT_LE", np.float32), + return signal.astype(np.float32).tobytes() + + def play_polyphonic(self, notes: list[list[tuple[str, float]]], as_tone: bool = False, volume: float = None) -> bytes: """ Play multiple sequences of musical notes simultaneously (poliphony). It is possible to play multi track music by providing a list of sequences, @@ -255,6 +250,8 @@ def play_polyphonic(self, notes: list[list[tuple[str, float]]], as_tone: bool = notes (list[list[tuple[str, float]]]): List of sequences, each sequence is a list of tuples (note, duration). as_tone (bool): If True, play as tones, considering duration in seconds volume (float, optional): Volume level (0.0 to 1.0). If None, uses master volume. + Returns: + bytes: The audio block of the mixed sequences (float32). """ if volume is None: volume = self._master_volume @@ -296,18 +293,17 @@ def play_polyphonic(self, notes: list[list[tuple[str, float]]], as_tone: bool = mixed /= np.max(np.abs(mixed)) # Normalize to prevent clipping blk = mixed.astype(np.float32) blk = self._apply_sound_effects(blk, base_frequency) - try: - self._output_device.play(blk, block_on_queue=False) - except Exception as e: - print(f"Error playing multiple sequences: {e}") + return self._to_bytes(blk) - def play_chord(self, notes: list[str], note_duration: float | str = 1 / 4, volume: float = None): + def play_chord(self, notes: list[str], note_duration: float | str = 1 / 4, volume: float = None) -> bytes: """ Play a chord consisting of multiple musical notes simultaneously for a specified duration and volume. Args: notes (list[str]): List of musical notes to play (e.g., ['A4', 'C#5', 'E5']). note_duration (float | str): Duration of the chord as a float (like 1/4, 1/8) or a symbol ('W', 'H', 'Q', etc.). volume (float, optional): Volume level (0.0 to 1.0). If None, uses master volume. + Returns: + bytes: The audio block of the mixed sequences (float32). """ duration = self._note_duration(note_duration) if len(notes) == 1: @@ -333,18 +329,17 @@ def play_chord(self, notes: list[str], note_duration: float | str = 1 / 4, volum chord /= np.max(np.abs(chord)) # Normalize to prevent clipping blk = chord.astype(np.float32) blk = self._apply_sound_effects(blk, base_frequency) - try: - self._output_device.play(blk, block_on_queue=False) - except Exception as e: - print(f"Error playing chord {notes}: {e}") + return self._to_bytes(blk) - def play(self, note: str, note_duration: float | str = 1 / 4, volume: float = None): + def play(self, note: str, note_duration: float | str = 1 / 4, volume: float = None) -> bytes: """ Play a musical note for a specified duration and volume. Args: note (str): The musical note to play (e.g., 'A4', 'C#5', 'REST'). note_duration (float | str): Duration of the note as a float (like 1/4, 1/8) or a symbol ('W', 'H', 'Q', etc.). volume (float, optional): Volume level (0.0 to 1.0). If None, uses master volume. + Returns: + bytes: The audio block of the played note (float32). """ duration = self._note_duration(note_duration) frequency = self._get_note(note) @@ -353,15 +348,17 @@ def play(self, note: str, note_duration: float | str = 1 / 4, volume: float = No volume = self._master_volume data = self._wave_gen.generate_block(float(frequency), duration, volume) data = self._apply_sound_effects(data, frequency) - self._output_device.play(data, block_on_queue=False) + return self._to_bytes(data) - def play_tone(self, note: str, duration: float = 0.25, volume: float = None): + def play_tone(self, note: str, duration: float = 0.25, volume: float = None) -> bytes: """ Play a musical note for a specified duration and volume. Args: note (str): The musical note to play (e.g., 'A4', 'C#5', 'REST'). duration (float): Duration of the note as a float in seconds. volume (float, optional): Volume level (0.0 to 1.0). If None, uses master volume. + Returns: + bytes: The audio block of the played note (float32). """ frequency = self._get_note(note) if frequency is not None and frequency >= 0.0 and duration > 0.0: @@ -369,14 +366,16 @@ def play_tone(self, note: str, duration: float = 0.25, volume: float = None): volume = self._master_volume data = self._wave_gen.generate_block(float(frequency), duration, volume) data = self._apply_sound_effects(data, frequency) - self._output_device.play(data, block_on_queue=False) + return self._to_bytes(data) - def play_abc(self, abc_string: str, volume: float = None): + def play_abc(self, abc_string: str, volume: float = None) -> Iterable[bytes]: """ Play a sequence of musical notes defined in ABC notation. Args: abc_string (str): ABC notation string defining the sequence of notes. volume (float, optional): Volume level (0.0 to 1.0). If None, uses master volume. + Returns: + Iterable[bytes]: An iterable yielding the audio blocks of the played notes (float32). """ if not abc_string or abc_string.strip() == "": return @@ -388,4 +387,136 @@ def play_abc(self, abc_string: str, volume: float = None): if frequency is not None and frequency >= 0.0: data = self._wave_gen.generate_block(float(frequency), duration, volume) data = self._apply_sound_effects(data, frequency) - self._output_device.play(data, block_on_queue=False) + yield self._to_bytes(data) + + +@brick +class SoundGenerator(SoundGeneratorStreamer): + def __init__( + self, + output_device: Speaker = None, + bpm: int = 120, + time_signature: tuple = (4, 4), + octaves: int = 8, + wave_form: str = "sine", + master_volume: float = 1.0, + sound_effects: list = None, + ): + """Initialize the SoundGenerator. + Args: + output_device (Speaker, optional): The output device to play sound through. + wave_form (str): The type of wave form to generate. Supported values + are "sine" (default), "square", "triangle" and "sawtooth". + bpm (int): The tempo in beats per minute for note duration calculations. + master_volume (float): The master volume level (0.0 to 1.0). + octaves (int): Number of octaves to generate notes for (starting from octave + 0 up to octaves-1). + sound_effects (list, optional): List of sound effect instances to apply to the audio + signal (e.g., [SoundEffect.adsr()]). See SoundEffect class for available effects. + """ + + super().__init__( + bpm=bpm, + time_signature=time_signature, + octaves=octaves, + wave_form=wave_form, + master_volume=master_volume, + sound_effects=sound_effects, + ) + + if output_device is None: + self._self_created_device = True + self._output_device = Speaker(sample_rate=self.SAMPLE_RATE, format="FLOAT_LE") + else: + self._self_created_device = False + self._output_device = output_device + + def start(self): + if self._self_created_device: + self._output_device.start(notify_if_started=False) + + def stop(self): + if self._self_created_device: + self._output_device.stop() + + def set_master_volume(self, volume: float): + """ + Set the master volume level. + Args: + volume (float): Volume level (0.0 to 1.0). + """ + super().set_master_volume(volume) + + def set_effects(self, effects: list): + """ + Set the list of sound effects to apply to the audio signal. + Args: + effects (list): List of sound effect instances (e.g., [SoundEffect.adsr()]). + """ + super().set_effects(effects) + + def play_polyphonic(self, notes: list[list[tuple[str, float]]], as_tone: bool = False, volume: float = None): + """ + Play multiple sequences of musical notes simultaneously (poliphony). + It is possible to play multi track music by providing a list of sequences, + where each sequence is a list of tuples (note, duration). + Duration is in notes fractions (e.g., 1/4 for quarter note). + Args: + notes (list[list[tuple[str, float]]]): List of sequences, each sequence is a list of tuples (note, duration). + as_tone (bool): If True, play as tones, considering duration in seconds + volume (float, optional): Volume level (0.0 to 1.0). If None, uses master volume. + """ + blk = super().play_polyphonic(notes, as_tone, volume) + try: + self._output_device.play(blk, block_on_queue=False) + except Exception as e: + print(f"Error playing multiple sequences: {e}") + + def play_chord(self, notes: list[str], note_duration: float | str = 1 / 4, volume: float = None): + """ + Play a chord consisting of multiple musical notes simultaneously for a specified duration and volume. + Args: + notes (list[str]): List of musical notes to play (e.g., ['A4', 'C#5', 'E5']). + note_duration (float | str): Duration of the chord as a float (like 1/4, 1/8) or a symbol ('W', 'H', 'Q', etc.). + volume (float, optional): Volume level (0.0 to 1.0). If None, uses master volume. + """ + blk = super().play_chord(notes, note_duration, volume) + try: + self._output_device.play(blk, block_on_queue=False) + except Exception as e: + print(f"Error playing chord {notes}: {e}") + + def play(self, note: str, note_duration: float | str = 1 / 4, volume: float = None): + """ + Play a musical note for a specified duration and volume. + Args: + note (str): The musical note to play (e.g., 'A4', 'C#5', 'REST'). + note_duration (float | str): Duration of the note as a float (like 1/4, 1/8) or a symbol ('W', 'H', 'Q', etc.). + volume (float, optional): Volume level (0.0 to 1.0). If None, uses master volume. + """ + data = super().play(note, note_duration, volume) + self._output_device.play(data, block_on_queue=False) + + def play_tone(self, note: str, duration: float = 0.25, volume: float = None): + """ + Play a musical note for a specified duration and volume. + Args: + note (str): The musical note to play (e.g., 'A4', 'C#5', 'REST'). + duration (float): Duration of the note as a float in seconds. + volume (float, optional): Volume level (0.0 to 1.0). If None, uses master volume. + """ + data = super().play_tone(note, duration, volume) + self._output_device.play(data, block_on_queue=False) + + def play_abc(self, abc_string: str, volume: float = None): + """ + Play a sequence of musical notes defined in ABC notation. + Args: + abc_string (str): ABC notation string defining the sequence of notes. + volume (float, optional): Volume level (0.0 to 1.0). If None, uses master volume. + """ + if not abc_string or abc_string.strip() == "": + return + player = super().play_abc(abc_string, volume) + for data in player: + self._output_device.play(data, block_on_queue=False) From 7f3997699297987b7008f1060eabb4f9b670ab65 Mon Sep 17 00:00:00 2001 From: Marco Colombo Date: Wed, 29 Oct 2025 16:01:55 +0100 Subject: [PATCH 06/22] Work ok effects --- .../app_bricks/sound_generator/README.md | 4 +- .../app_bricks/sound_generator/__init__.py | 50 ++++++++++---- .../app_bricks/sound_generator/effects.py | 12 ++-- .../examples/2_stream_sequence.py | 66 ++++++++++++++++++ .../examples/3_play_abc_notation.py | 31 +++++++++ .../sound_generator/examples/4_effects.py | 67 +++++++++++++++++++ 6 files changed, 209 insertions(+), 21 deletions(-) create mode 100644 src/arduino/app_bricks/sound_generator/examples/2_stream_sequence.py create mode 100644 src/arduino/app_bricks/sound_generator/examples/3_play_abc_notation.py create mode 100644 src/arduino/app_bricks/sound_generator/examples/4_effects.py diff --git a/src/arduino/app_bricks/sound_generator/README.md b/src/arduino/app_bricks/sound_generator/README.md index d78021d3..7b85e650 100644 --- a/src/arduino/app_bricks/sound_generator/README.md +++ b/src/arduino/app_bricks/sound_generator/README.md @@ -1,6 +1,6 @@ # Sound Generator Brick -Play sounds and melodies +Play sounds and melodies ## Code example and usage @@ -35,7 +35,7 @@ waveform can be customized to change effect. For example, for a retro-gaming sou player = SoundGenerator(wave_form="square") ``` -instead, to have a more "rock" like sound, you can add effect +instead, to have a more "rock" like sound, you can add effects like: ```python player = SoundGenerator(sound_effects=[SoundEffect.adsr(), SoundEffect.overdrive(drive=180.0), SoundEffect.chorus(depth_ms=15, rate_hz=0.2, mix=0.4)]) diff --git a/src/arduino/app_bricks/sound_generator/__init__.py b/src/arduino/app_bricks/sound_generator/__init__.py index 322be07c..07ec0efa 100644 --- a/src/arduino/app_bricks/sound_generator/__init__.py +++ b/src/arduino/app_bricks/sound_generator/__init__.py @@ -7,6 +7,7 @@ import threading from typing import Iterable import numpy as np +import time from .effects import * from .loaders import ABCNotationLoader @@ -73,13 +74,14 @@ def __init__( signal (e.g., [SoundEffect.adsr()]). See SoundEffect class for available effects. """ - self._wave_gen = WaveGenerator(sample_rate=self.SAMPLE_RATE, wave_form=wave_form) + self._cfg_lock = threading.Lock() + self._init_wave_generator(wave_form) + self._bpm = bpm self.time_signature = time_signature self._master_volume = master_volume self._sound_effects = sound_effects - self._cfg_lock = threading.Lock() self._notes = {} for octave in range(octaves): notes = self._fill_node_frequencies(octave) @@ -91,6 +93,19 @@ def start(self): def stop(self): pass + def _init_wave_generator(self, wave_form: str): + with self._cfg_lock: + self._wave_gen = WaveGenerator(sample_rate=self.SAMPLE_RATE, wave_form=wave_form) + + def set_wave_form(self, wave_form: str): + """ + Set the wave form type for sound generation. + Args: + wave_form (str): The type of wave form to generate. Supported values + are "sine", "square", "triangle" and "sawtooth". + """ + self._init_wave_generator(wave_form) + def set_master_volume(self, volume: float): """ Set the master volume level. @@ -368,14 +383,14 @@ def play_tone(self, note: str, duration: float = 0.25, volume: float = None) -> data = self._apply_sound_effects(data, frequency) return self._to_bytes(data) - def play_abc(self, abc_string: str, volume: float = None) -> Iterable[bytes]: + def play_abc(self, abc_string: str, volume: float = None) -> Iterable[tuple[bytes, float]]: """ Play a sequence of musical notes defined in ABC notation. Args: abc_string (str): ABC notation string defining the sequence of notes. volume (float, optional): Volume level (0.0 to 1.0). If None, uses master volume. Returns: - Iterable[bytes]: An iterable yielding the audio blocks of the played notes (float32). + Iterable[tuple[bytes, float]]: An iterable yielding the audio blocks of the played notes (float32) and its duration. """ if not abc_string or abc_string.strip() == "": return @@ -387,7 +402,7 @@ def play_abc(self, abc_string: str, volume: float = None) -> Iterable[bytes]: if frequency is not None and frequency >= 0.0: data = self._wave_gen.generate_block(float(frequency), duration, volume) data = self._apply_sound_effects(data, frequency) - yield self._to_bytes(data) + yield (self._to_bytes(data), duration) @brick @@ -424,20 +439,26 @@ def __init__( sound_effects=sound_effects, ) + self._started = threading.Event() if output_device is None: - self._self_created_device = True + self.external_speaker = False self._output_device = Speaker(sample_rate=self.SAMPLE_RATE, format="FLOAT_LE") + self.start() else: - self._self_created_device = False + self.external_speaker = True self._output_device = output_device def start(self): - if self._self_created_device: + if self._started.is_set(): + return + if self.external_speaker == False: self._output_device.start(notify_if_started=False) + self._started.set() def stop(self): - if self._self_created_device: + if self.external_speaker == False: self._output_device.stop() + self._started.clear() def set_master_volume(self, volume: float): """ @@ -508,15 +529,20 @@ def play_tone(self, note: str, duration: float = 0.25, volume: float = None): data = super().play_tone(note, duration, volume) self._output_device.play(data, block_on_queue=False) - def play_abc(self, abc_string: str, volume: float = None): + def play_abc(self, abc_string: str, volume: float = None, wait_completion: bool = False): """ Play a sequence of musical notes defined in ABC notation. Args: abc_string (str): ABC notation string defining the sequence of notes. volume (float, optional): Volume level (0.0 to 1.0). If None, uses master volume. + wait_completion (bool): If True, block until the entire sequence has been played. """ if not abc_string or abc_string.strip() == "": return player = super().play_abc(abc_string, volume) - for data in player: - self._output_device.play(data, block_on_queue=False) + overall_duration = 0.0 + for data, duration in player: + self._output_device.play(data, block_on_queue=True) + overall_duration += duration + if wait_completion: + time.sleep(overall_duration) diff --git a/src/arduino/app_bricks/sound_generator/effects.py b/src/arduino/app_bricks/sound_generator/effects.py index c4e9cf20..be220388 100644 --- a/src/arduino/app_bricks/sound_generator/effects.py +++ b/src/arduino/app_bricks/sound_generator/effects.py @@ -7,7 +7,7 @@ class SoundEffect: @staticmethod - def overdrive(drive: float = 1.0) -> np.ndarray: + def overdrive(drive: float = 100.0) -> np.ndarray: """ Apply overdrive effect to the audio signal. Args: @@ -29,12 +29,11 @@ def apply(self, signal: np.ndarray) -> np.ndarray: return SoundEffectOverdrive(drive) @staticmethod - def chorus(fs: int = 16000, depth_ms=10, rate_hz: float = 0.25, mix: float = 0.5) -> np.ndarray: + def chorus(depth_ms=10, rate_hz: float = 0.25, mix: float = 0.5) -> np.ndarray: """ Apply chorus effect to the audio signal. Args: signal (np.ndarray): Input audio signal. - fs (int): Sampling frequency in Hz. depth_ms (float): Depth of the chorus effect in milliseconds. rate_hz (float): Rate of the LFO in Hz. mix (float): Mix ratio between dry and wet signals (0.0 to 1.0). @@ -43,8 +42,8 @@ def chorus(fs: int = 16000, depth_ms=10, rate_hz: float = 0.25, mix: float = 0.5 """ class SoundEffectChorus: - def __init__(self, fs: int = 16000, depth_ms: int = 10, rate_hz: float = 0.25, mix: float = 0.5): - self.fs = fs + def __init__(self, depth_ms: int = 10, rate_hz: float = 0.25, mix: float = 0.5): + self.fs = 16000 # sample rate self.depth_ms = depth_ms self.rate_hz = rate_hz self.mix = mix @@ -67,14 +66,13 @@ def apply(self, signal: np.ndarray) -> np.ndarray: # mix dry/wet return ((1 - self.mix) * signal + self.mix * out).astype(np.float32) - return SoundEffectChorus(fs, depth_ms, rate_hz, mix) + return SoundEffectChorus(depth_ms, rate_hz, mix) @staticmethod def adsr(attack: float = 0.015, decay: float = 0.2, sustain: float = 0.5, release: float = 0.35): """ Apply ADSR (attack/decay/sustain/release) envelope to the audio signal. Args: - fs (int): Sampling frequency in Hz. attack (float): Attack time in seconds. decay (float): Decay time in seconds. sustain (float): Sustain level (0.0 to 1.0). diff --git a/src/arduino/app_bricks/sound_generator/examples/2_stream_sequence.py b/src/arduino/app_bricks/sound_generator/examples/2_stream_sequence.py new file mode 100644 index 00000000..74718a95 --- /dev/null +++ b/src/arduino/app_bricks/sound_generator/examples/2_stream_sequence.py @@ -0,0 +1,66 @@ +# SPDX-FileCopyrightText: Copyright (C) 2025 ARDUINO SA +# +# SPDX-License-Identifier: MPL-2.0 + +# EXAMPLE_NAME: Stream a sequence of notes over websocket via WebUI +import time +from arduino.app_utils import * +from arduino.app_bricks.web_ui import WebUI +from arduino.app_bricks.sound_generator import SoundGeneratorStreamer, SoundEffect + +ui = WebUI() + +player = SoundGeneratorStreamer(master_volume=1.0, wave_form="square", bpm=120, sound_effects=[SoundEffect.adsr()]) + +tune_sequence = [ + ("E5", 0.125), + ("E5", 0.125), + ("REST", 0.125), + ("E5", 0.125), + ("REST", 0.125), + ("C5", 0.125), + ("E5", 0.125), + ("REST", 0.125), + ("G5", 0.25), + ("REST", 0.25), + ("G4", 0.25), + ("REST", 0.25), + ("C5", 0.25), + ("REST", 0.125), + ("G4", 0.25), + ("REST", 0.125), + ("E4", 0.25), + ("REST", 0.125), + ("A4", 0.25), + ("B4", 0.25), + ("Bb4", 0.125), + ("A4", 0.25), + ("G4", 0.125), + ("E5", 0.125), + ("G5", 0.125), + ("A5", 0.25), + ("F5", 0.125), + ("G5", 0.125), + ("REST", 0.125), + ("E5", 0.25), + ("C5", 0.125), + ("D5", 0.125), + ("B4", 0.25), +] + + +def user_lp(): + while True: + overall_time = 0 + for note, duration in tune_sequence: + frame = player.play_tone(note, duration) + entry = { + "raw_data": frame, + } + ui.send_message("audio_frame", entry) + overall_time += duration + + time.sleep(overall_time) # wait for the whole sequence to finish before restarting + + +App.run(user_loop=user_lp) diff --git a/src/arduino/app_bricks/sound_generator/examples/3_play_abc_notation.py b/src/arduino/app_bricks/sound_generator/examples/3_play_abc_notation.py new file mode 100644 index 00000000..b455130d --- /dev/null +++ b/src/arduino/app_bricks/sound_generator/examples/3_play_abc_notation.py @@ -0,0 +1,31 @@ +# SPDX-FileCopyrightText: Copyright (C) 2025 ARDUINO SA +# +# SPDX-License-Identifier: MPL-2.0 + +# EXAMPLE_NAME: Play music in ABC notation +from arduino.app_bricks.sound_generator import SoundGenerator, SoundEffect +from arduino.app_utils import App + +player = SoundGenerator(sound_effects=[SoundEffect.adsr()]) + + +def play_melody(): + abc_music = """ + X:1 + T:Twinkle, Twinkle Little Star - #11 + T:Alphabet Song + C:Traditional Kid's Song + M:4/4 + L:1/4 + K:D + |"D"D D A A|"G"B B "D"A2 + |"G"G G "D"F F|"A"E/2E/2E/2E/2 "D"D2 + |A A "G"G G|"D"F F "A"E2 + |"D"A A "G"G G|"D"F F "A"E2 + |"D"D D A A|"G"B B "D"A2 + |"G"G G "D"F F|"A"E E "D"D2| + """ + player.play_abc(abc_music, wait_completion=True) + + +App.run(user_loop=play_melody) diff --git a/src/arduino/app_bricks/sound_generator/examples/4_effects.py b/src/arduino/app_bricks/sound_generator/examples/4_effects.py new file mode 100644 index 00000000..1c182bc4 --- /dev/null +++ b/src/arduino/app_bricks/sound_generator/examples/4_effects.py @@ -0,0 +1,67 @@ +# SPDX-FileCopyrightText: Copyright (C) 2025 ARDUINO SA +# +# SPDX-License-Identifier: MPL-2.0 + +# EXAMPLE_NAME: Play a sequence using effects +from arduino.app_bricks.sound_generator import SoundGenerator, SoundEffect +from arduino.app_utils import App + +player = SoundGenerator() + +tune_sequence = [ + ("A4", 0.25), + ("C5", 0.25), + ("E5", 0.25), + ("C5", 0.25), + ("A4", 0.25), + ("C5", 0.25), + ("E5", 0.25), + ("REST", 0.25), + ("G4", 0.25), + ("B4", 0.25), + ("D5", 0.25), + ("B4", 0.25), + ("G4", 0.25), + ("B4", 0.25), + ("D5", 0.25), + ("REST", 0.25), + ("A4", 0.25), + ("A4", 0.25), + ("C5", 0.25), + ("E5", 0.25), + ("F5", 0.5), + ("E5", 0.25), + ("REST", 0.25), + ("D5", 0.25), + ("C5", 0.25), + ("B4", 0.25), + ("A4", 0.25), + ("G4", 0.5), + ("B4", 0.5), + ("REST", 1), +] + +# Play as a retro-game sound +player.set_wave_form("square") +player.set_effects([SoundEffect.adsr()]) # For a more synththetic sound, add SoundEffect.bitcrusher() effect +for note, duration in tune_sequence: + player.play_tone(note, duration) + +# Play with distortion +player.set_wave_form("sine") +player.set_effects([SoundEffect.adsr(), SoundEffect.chorus(), SoundEffect.overdrive(drive=200.0)]) +for note, duration in tune_sequence: + player.play_tone(note, duration) + +# Vibrato effect +player.set_effects([SoundEffect.adsr(), SoundEffect.vibrato()]) +for note, duration in tune_sequence: + player.play_tone(note, duration) + +# Tremolo effect +player.set_wave_form("triangle") +player.set_effects([SoundEffect.adsr(), SoundEffect.tremolo(), SoundEffect.chorus()]) +for note, duration in tune_sequence: + player.play_tone(note, duration) + +App.run() From 6a9113f007bb5a3fde90c346c6057e04d4d11581 Mon Sep 17 00:00:00 2001 From: Marco Colombo Date: Wed, 29 Oct 2025 16:16:15 +0100 Subject: [PATCH 07/22] wait completion on polyphonic sequences --- .../app_bricks/sound_generator/__init__.py | 31 ++++++++++--------- 1 file changed, 17 insertions(+), 14 deletions(-) diff --git a/src/arduino/app_bricks/sound_generator/__init__.py b/src/arduino/app_bricks/sound_generator/__init__.py index 07ec0efa..e76b9c3d 100644 --- a/src/arduino/app_bricks/sound_generator/__init__.py +++ b/src/arduino/app_bricks/sound_generator/__init__.py @@ -255,7 +255,7 @@ def _to_bytes(self, signal: np.ndarray) -> bytes: # Format: "FLOAT_LE" -> (ALSA: "PCM_FORMAT_FLOAT_LE", np.float32), return signal.astype(np.float32).tobytes() - def play_polyphonic(self, notes: list[list[tuple[str, float]]], as_tone: bool = False, volume: float = None) -> bytes: + def play_polyphonic(self, notes: list[list[tuple[str, float]]], as_tone: bool = False, volume: float = None) -> tuple[bytes, float]: """ Play multiple sequences of musical notes simultaneously (poliphony). It is possible to play multi track music by providing a list of sequences, @@ -266,7 +266,7 @@ def play_polyphonic(self, notes: list[list[tuple[str, float]]], as_tone: bool = as_tone (bool): If True, play as tones, considering duration in seconds volume (float, optional): Volume level (0.0 to 1.0). If None, uses master volume. Returns: - bytes: The audio block of the mixed sequences (float32). + tuple[bytes, float]: The audio block of the mixed sequences (float32) and its duration in seconds. """ if volume is None: volume = self._master_volume @@ -274,9 +274,12 @@ def play_polyphonic(self, notes: list[list[tuple[str, float]]], as_tone: bool = # Multi track mixing sequences_data = [] base_frequency = None + max_duration = 0.0 for sequence in notes: sequence_waves = [] + sequence_duration = 0.0 for note, duration in sequence: + sequence_duration += duration frequency = self._get_note(note) if frequency >= 0.0: if base_frequency is None: @@ -287,9 +290,12 @@ def play_polyphonic(self, notes: list[list[tuple[str, float]]], as_tone: bool = sequence_waves.append(data) else: continue + if len(sequence_waves) > 0: single_track_data = np.concatenate(sequence_waves) sequences_data.append(single_track_data) + if sequence_duration > max_duration: + max_duration = sequence_duration if len(sequences_data) == 0: return @@ -308,8 +314,8 @@ def play_polyphonic(self, notes: list[list[tuple[str, float]]], as_tone: bool = mixed /= np.max(np.abs(mixed)) # Normalize to prevent clipping blk = mixed.astype(np.float32) blk = self._apply_sound_effects(blk, base_frequency) - return self._to_bytes(blk) - + return (self._to_bytes(blk), max_duration) + def play_chord(self, notes: list[str], note_duration: float | str = 1 / 4, volume: float = None) -> bytes: """ Play a chord consisting of multiple musical notes simultaneously for a specified duration and volume. @@ -476,7 +482,7 @@ def set_effects(self, effects: list): """ super().set_effects(effects) - def play_polyphonic(self, notes: list[list[tuple[str, float]]], as_tone: bool = False, volume: float = None): + def play_polyphonic(self, notes: list[list[tuple[str, float]]], as_tone: bool = False, volume: float = None, wait_completion: bool = False): """ Play multiple sequences of musical notes simultaneously (poliphony). It is possible to play multi track music by providing a list of sequences, @@ -486,12 +492,12 @@ def play_polyphonic(self, notes: list[list[tuple[str, float]]], as_tone: bool = notes (list[list[tuple[str, float]]]): List of sequences, each sequence is a list of tuples (note, duration). as_tone (bool): If True, play as tones, considering duration in seconds volume (float, optional): Volume level (0.0 to 1.0). If None, uses master volume. + wait_completion (bool): If True, block until the entire sequence has been played. """ - blk = super().play_polyphonic(notes, as_tone, volume) - try: - self._output_device.play(blk, block_on_queue=False) - except Exception as e: - print(f"Error playing multiple sequences: {e}") + blk, duration = super().play_polyphonic(notes, as_tone, volume) + self._output_device.play(blk, block_on_queue=False) + if wait_completion and duration > 0.0: + time.sleep(duration) def play_chord(self, notes: list[str], note_duration: float | str = 1 / 4, volume: float = None): """ @@ -502,10 +508,7 @@ def play_chord(self, notes: list[str], note_duration: float | str = 1 / 4, volum volume (float, optional): Volume level (0.0 to 1.0). If None, uses master volume. """ blk = super().play_chord(notes, note_duration, volume) - try: - self._output_device.play(blk, block_on_queue=False) - except Exception as e: - print(f"Error playing chord {notes}: {e}") + self._output_device.play(blk, block_on_queue=False) def play(self, note: str, note_duration: float | str = 1 / 4, volume: float = None): """ From d3559d56a9e997af3b450daa13e929c2c707df0f Mon Sep 17 00:00:00 2001 From: Marco Colombo Date: Fri, 31 Oct 2025 10:44:16 +0100 Subject: [PATCH 08/22] Move utils code into brick --- .../app_bricks/sound_generator/__init__.py | 7 +- .../sound_generator/examples/4_effects.py | 2 +- .../app_bricks/sound_generator/generator.py | 69 +++++++++++++++++++ src/arduino/app_utils/audio.py | 64 ----------------- 4 files changed, 74 insertions(+), 68 deletions(-) create mode 100644 src/arduino/app_bricks/sound_generator/generator.py diff --git a/src/arduino/app_bricks/sound_generator/__init__.py b/src/arduino/app_bricks/sound_generator/__init__.py index e76b9c3d..54ed4bb6 100644 --- a/src/arduino/app_bricks/sound_generator/__init__.py +++ b/src/arduino/app_bricks/sound_generator/__init__.py @@ -2,13 +2,14 @@ # # SPDX-License-Identifier: MPL-2.0 -from arduino.app_utils import WaveGenerator, brick +from arduino.app_utils import brick from arduino.app_peripherals.speaker import Speaker import threading from typing import Iterable import numpy as np import time +from .generator import WaveSamplesBuilder from .effects import * from .loaders import ABCNotationLoader @@ -95,7 +96,7 @@ def stop(self): def _init_wave_generator(self, wave_form: str): with self._cfg_lock: - self._wave_gen = WaveGenerator(sample_rate=self.SAMPLE_RATE, wave_form=wave_form) + self._wave_gen = WaveSamplesBuilder(sample_rate=self.SAMPLE_RATE, wave_form=wave_form) def set_wave_form(self, wave_form: str): """ @@ -315,7 +316,7 @@ def play_polyphonic(self, notes: list[list[tuple[str, float]]], as_tone: bool = blk = mixed.astype(np.float32) blk = self._apply_sound_effects(blk, base_frequency) return (self._to_bytes(blk), max_duration) - + def play_chord(self, notes: list[str], note_duration: float | str = 1 / 4, volume: float = None) -> bytes: """ Play a chord consisting of multiple musical notes simultaneously for a specified duration and volume. diff --git a/src/arduino/app_bricks/sound_generator/examples/4_effects.py b/src/arduino/app_bricks/sound_generator/examples/4_effects.py index 1c182bc4..16962fc0 100644 --- a/src/arduino/app_bricks/sound_generator/examples/4_effects.py +++ b/src/arduino/app_bricks/sound_generator/examples/4_effects.py @@ -43,7 +43,7 @@ # Play as a retro-game sound player.set_wave_form("square") -player.set_effects([SoundEffect.adsr()]) # For a more synththetic sound, add SoundEffect.bitcrusher() effect +player.set_effects([SoundEffect.adsr()]) # For a more synththetic sound, add SoundEffect.bitcrusher() effect for note, duration in tune_sequence: player.play_tone(note, duration) diff --git a/src/arduino/app_bricks/sound_generator/generator.py b/src/arduino/app_bricks/sound_generator/generator.py new file mode 100644 index 00000000..13596294 --- /dev/null +++ b/src/arduino/app_bricks/sound_generator/generator.py @@ -0,0 +1,69 @@ +# SPDX-FileCopyrightText: Copyright (C) 2025 ARDUINO SA +# +# SPDX-License-Identifier: MPL-2.0 + +import numpy as np + + +class WaveSamplesBuilder: + """Generate wave audio blocks. + + This class produces wave blocks as NumPy buffers. + + Attributes: + sample_rate (int): Audio sample rate in Hz. + """ + + def __init__(self, wave_form: str = "sine", sample_rate: int = 16000): + """Create a new WaveGenerator. + + Args: + wave_form (str): The type of wave form to generate. Supported values + are "sine", "square", "triangle", "white_noise" and "sawtooth". + sample_rate (int): The playback sample rate (Hz) used to compute + phase increments and buffer sizes. + """ + self.wave_form = wave_form.lower() + self.sample_rate = int(sample_rate) + + def generate_block(self, freq: float, block_dur: float, master_volume: float = 1.0): + """Generate a block of float32 audio samples. + + Returned buffer is a NumPy view (float32) into an internal preallocated array and is valid + until the next call to this method. + + Args: + freq (float): Target frequency in Hz for this block. + block_dur (float): Duration of the requested block in seconds. + master_volume (float, optional): Global gain multiplier. Defaults + to 1.0. + + Returns: + numpy.ndarray: A 1-D float32 NumPy array containing the generated + audio samples for the requested block. + """ + N = max(1, int(self.sample_rate * block_dur)) + + # compute wave form based on selected type + t = np.arange(N, dtype=np.float32) / self.sample_rate + + match self.wave_form: + case "square": + samples = 0.5 * (1 + np.sign(np.sin(2.0 * np.pi * freq * t))) + case "triangle": + samples = 2.0 * np.abs(2.0 * (freq * t % 1) - 1.0) - 1.0 + case "sawtooth": + samples = 2.0 * (freq * t % 1.0) - 1.0 + case "white_noise": + samples = np.random.uniform(-1.0, 1.0, size=N).astype(np.float32) + case _: # "sine" e default + samples = np.sin(2.0 * np.pi * freq * t) + + samples = samples.astype(np.float32) + + # apply gain + mg = float(master_volume) + if mg != 1.0: + np.multiply(samples, mg, out=samples) + + return samples diff --git a/src/arduino/app_utils/audio.py b/src/arduino/app_utils/audio.py index 3e8815e8..aac98256 100644 --- a/src/arduino/app_utils/audio.py +++ b/src/arduino/app_utils/audio.py @@ -6,70 +6,6 @@ import numpy as np -class WaveGenerator: - """Generate wave audio blocks. - - This class produces wave blocks as NumPy buffers. - - Attributes: - sample_rate (int): Audio sample rate in Hz. - """ - - def __init__(self, wave_form: str = "sine", sample_rate: int = 16000): - """Create a new WaveGenerator. - - Args: - wave_form (str): The type of wave form to generate. Supported values - are "sine", "square", "triangle", "white_noise" and "sawtooth". - sample_rate (int): The playback sample rate (Hz) used to compute - phase increments and buffer sizes. - """ - self.wave_form = wave_form.lower() - self.sample_rate = int(sample_rate) - - def generate_block(self, freq: float, block_dur: float, master_volume: float = 1.0): - """Generate a block of float32 audio samples. - - Returned buffer is a NumPy view (float32) into an internal preallocated array and is valid - until the next call to this method. - - Args: - freq (float): Target frequency in Hz for this block. - block_dur (float): Duration of the requested block in seconds. - master_volume (float, optional): Global gain multiplier. Defaults - to 1.0. - - Returns: - numpy.ndarray: A 1-D float32 NumPy array containing the generated - audio samples for the requested block. - """ - N = max(1, int(self.sample_rate * block_dur)) - - # compute wave form based on selected type - t = np.arange(N, dtype=np.float32) / self.sample_rate - - match self.wave_form: - case "square": - samples = 0.5 * (1 + np.sign(np.sin(2.0 * np.pi * freq * t))) - case "triangle": - samples = 2.0 * np.abs(2.0 * (freq * t % 1) - 1.0) - 1.0 - case "sawtooth": - samples = 2.0 * (freq * t % 1.0) - 1.0 - case "white_noise": - samples = np.random.uniform(-1.0, 1.0, size=N).astype(np.float32) - case _: # "sine" e default - samples = np.sin(2.0 * np.pi * freq * t) - - samples = samples.astype(np.float32) - - # apply gain - mg = float(master_volume) - if mg != 1.0: - np.multiply(samples, mg, out=samples) - - return samples - - class SineGenerator: """Generate sine-wave audio blocks with amplitude envelope smoothing. From 7dcb9377ff3ea590661968e983b91e20e51fc6d5 Mon Sep 17 00:00:00 2001 From: Marco Colombo Date: Fri, 31 Oct 2025 11:53:42 +0100 Subject: [PATCH 09/22] updated readme --- src/arduino/app_bricks/sound_generator/README.md | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/src/arduino/app_bricks/sound_generator/README.md b/src/arduino/app_bricks/sound_generator/README.md index 7b85e650..084bee93 100644 --- a/src/arduino/app_bricks/sound_generator/README.md +++ b/src/arduino/app_bricks/sound_generator/README.md @@ -1,6 +1,14 @@ # Sound Generator Brick -Play sounds and melodies +Sound Generator is a lightweight and expressive audio generation brick that lets you create, manipulate, and play sounds programmatically. +You can write musical notes, generate tones, and compose melodies — all while shaping the sound through custom waveforms and effects. + +Features: +* *Generate tones and melodies from notes or frequencies. +* Choose your waveform — sine, square, triangle, sawtooth. +* Add sound effects such as chorus, overdrive, delay, vibrato, or distortion. +* Compose procedural music directly from code. +* Real-time playback over speaker ## Code example and usage From 50bc0c401694fbf0a0701100ec6691b7f8443664 Mon Sep 17 00:00:00 2001 From: Marco Colombo Date: Fri, 31 Oct 2025 17:07:20 +0100 Subject: [PATCH 10/22] Fix example --- src/arduino/app_bricks/sound_generator/__init__.py | 1 - .../sound_generator/examples/1_play_sequence.py | 8 +++++--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/src/arduino/app_bricks/sound_generator/__init__.py b/src/arduino/app_bricks/sound_generator/__init__.py index 54ed4bb6..5e5bb868 100644 --- a/src/arduino/app_bricks/sound_generator/__init__.py +++ b/src/arduino/app_bricks/sound_generator/__init__.py @@ -450,7 +450,6 @@ def __init__( if output_device is None: self.external_speaker = False self._output_device = Speaker(sample_rate=self.SAMPLE_RATE, format="FLOAT_LE") - self.start() else: self.external_speaker = True self._output_device = output_device diff --git a/src/arduino/app_bricks/sound_generator/examples/1_play_sequence.py b/src/arduino/app_bricks/sound_generator/examples/1_play_sequence.py index 906f6b89..1e6e524c 100644 --- a/src/arduino/app_bricks/sound_generator/examples/1_play_sequence.py +++ b/src/arduino/app_bricks/sound_generator/examples/1_play_sequence.py @@ -45,7 +45,9 @@ ("B4", 1 / 8), ("A4", 1), ] -for note, duration in fur_elise: - player.play(note, duration) -App.run() +def user_lp(): + for note, duration in fur_elise: + player.play(note, duration) + +App.run(user_loop=user_lp) From 26b927f379a812712644a0faf40678e9753ab276 Mon Sep 17 00:00:00 2001 From: Marco Colombo Date: Mon, 3 Nov 2025 10:09:45 +0100 Subject: [PATCH 11/22] Added playback queue clear --- src/arduino/app_bricks/sound_generator/__init__.py | 6 ++++++ .../sound_generator/examples/1_play_sequence.py | 2 ++ src/arduino/app_peripherals/speaker/__init__.py | 12 ++++++++++++ 3 files changed, 20 insertions(+) diff --git a/src/arduino/app_bricks/sound_generator/__init__.py b/src/arduino/app_bricks/sound_generator/__init__.py index 5e5bb868..9e504639 100644 --- a/src/arduino/app_bricks/sound_generator/__init__.py +++ b/src/arduino/app_bricks/sound_generator/__init__.py @@ -549,3 +549,9 @@ def play_abc(self, abc_string: str, volume: float = None, wait_completion: bool overall_duration += duration if wait_completion: time.sleep(overall_duration) + + def clear_playback_queue(self): + """ + Clear the playback queue of the output device. + """ + self._output_device.clear_playback_queue() diff --git a/src/arduino/app_bricks/sound_generator/examples/1_play_sequence.py b/src/arduino/app_bricks/sound_generator/examples/1_play_sequence.py index 1e6e524c..58971085 100644 --- a/src/arduino/app_bricks/sound_generator/examples/1_play_sequence.py +++ b/src/arduino/app_bricks/sound_generator/examples/1_play_sequence.py @@ -46,8 +46,10 @@ ("A4", 1), ] + def user_lp(): for note, duration in fur_elise: player.play(note, duration) + App.run(user_loop=user_lp) diff --git a/src/arduino/app_peripherals/speaker/__init__.py b/src/arduino/app_peripherals/speaker/__init__.py index 63a9edab..7ae59f16 100644 --- a/src/arduino/app_peripherals/speaker/__init__.py +++ b/src/arduino/app_peripherals/speaker/__init__.py @@ -504,3 +504,15 @@ def play(self, data: bytes | np.ndarray, block_on_queue: bool = False): except queue.Full: # logger.warning("Playback queue is full, dropping oldest data.") self._playing_queue.get_nowait() + + def is_reproducing(self) -> bool: + """Check if the speaker is currently reproducing audio. + + Returns: + bool: True if reproducing, False otherwise. + """ + return self._is_reproducing.is_set() + + def clear_playback_queue(self): + """Clear the playback queue.""" + self._clear_queue() From 5d3f7b9dd92c2abb574402b583662b6c21e0eebc Mon Sep 17 00:00:00 2001 From: Marco Colombo Date: Mon, 3 Nov 2025 11:00:04 +0100 Subject: [PATCH 12/22] Added wav playback --- .../app_bricks/sound_generator/__init__.py | 59 ++++++++++++++++--- 1 file changed, 50 insertions(+), 9 deletions(-) diff --git a/src/arduino/app_bricks/sound_generator/__init__.py b/src/arduino/app_bricks/sound_generator/__init__.py index 9e504639..b5bd84a1 100644 --- a/src/arduino/app_bricks/sound_generator/__init__.py +++ b/src/arduino/app_bricks/sound_generator/__init__.py @@ -8,6 +8,7 @@ from typing import Iterable import numpy as np import time +from pathlib import Path from .generator import WaveSamplesBuilder from .effects import * @@ -411,6 +412,27 @@ def play_abc(self, abc_string: str, volume: float = None) -> Iterable[tuple[byte data = self._apply_sound_effects(data, frequency) yield (self._to_bytes(data), duration) + def play_wav(self, wav_file: str) -> tuple[bytes, float]: + """ + Play a WAV audio data block. + Args: + wav_file (str): The WAV audio file path. + Returns: + tuple[bytes, float]: The audio block of the WAV file (float32) and its duration in seconds. + """ + import wave + + file_path = Path(wav_file) + if not file_path.exists() or not file_path.is_file(): + raise FileNotFoundError(f"WAV file not found: {wav_file}") + + with wave.open(wav_file, "rb") as wav: + # Read all frames (raw PCM data) + duration = wav.getnframes() / wav.getframerate() + return (wav.readframes(wav.getnframes()), duration) + + return (None, None) + @brick class SoundGenerator(SoundGeneratorStreamer): @@ -482,7 +504,7 @@ def set_effects(self, effects: list): """ super().set_effects(effects) - def play_polyphonic(self, notes: list[list[tuple[str, float]]], as_tone: bool = False, volume: float = None, wait_completion: bool = False): + def play_polyphonic(self, notes: list[list[tuple[str, float]]], as_tone: bool = False, volume: float = None, block: bool = False): """ Play multiple sequences of musical notes simultaneously (poliphony). It is possible to play multi track music by providing a list of sequences, @@ -492,53 +514,66 @@ def play_polyphonic(self, notes: list[list[tuple[str, float]]], as_tone: bool = notes (list[list[tuple[str, float]]]): List of sequences, each sequence is a list of tuples (note, duration). as_tone (bool): If True, play as tones, considering duration in seconds volume (float, optional): Volume level (0.0 to 1.0). If None, uses master volume. - wait_completion (bool): If True, block until the entire sequence has been played. + block (bool): If True, block until the entire sequence has been played. """ blk, duration = super().play_polyphonic(notes, as_tone, volume) self._output_device.play(blk, block_on_queue=False) - if wait_completion and duration > 0.0: + if block and duration > 0.0: time.sleep(duration) - def play_chord(self, notes: list[str], note_duration: float | str = 1 / 4, volume: float = None): + def play_chord(self, notes: list[str], note_duration: float | str = 1 / 4, volume: float = None, block: bool = False): """ Play a chord consisting of multiple musical notes simultaneously for a specified duration and volume. Args: notes (list[str]): List of musical notes to play (e.g., ['A4', 'C#5', 'E5']). note_duration (float | str): Duration of the chord as a float (like 1/4, 1/8) or a symbol ('W', 'H', 'Q', etc.). volume (float, optional): Volume level (0.0 to 1.0). If None, uses master volume. + block (bool): If True, block until the entire chord has been played. """ blk = super().play_chord(notes, note_duration, volume) self._output_device.play(blk, block_on_queue=False) + if block: + duration = self._note_duration(note_duration) + if duration > 0.0: + time.sleep(duration) - def play(self, note: str, note_duration: float | str = 1 / 4, volume: float = None): + def play(self, note: str, note_duration: float | str = 1 / 4, volume: float = None, block: bool = False): """ Play a musical note for a specified duration and volume. Args: note (str): The musical note to play (e.g., 'A4', 'C#5', 'REST'). note_duration (float | str): Duration of the note as a float (like 1/4, 1/8) or a symbol ('W', 'H', 'Q', etc.). volume (float, optional): Volume level (0.0 to 1.0). If None, uses master volume. + block (bool): If True, block until the entire note has been played. """ data = super().play(note, note_duration, volume) self._output_device.play(data, block_on_queue=False) + if block: + duration = self._note_duration(note_duration) + if duration > 0.0: + time.sleep(duration) - def play_tone(self, note: str, duration: float = 0.25, volume: float = None): + def play_tone(self, note: str, duration: float = 0.25, volume: float = None, block: bool = False): """ Play a musical note for a specified duration and volume. Args: note (str): The musical note to play (e.g., 'A4', 'C#5', 'REST'). duration (float): Duration of the note as a float in seconds. volume (float, optional): Volume level (0.0 to 1.0). If None, uses master volume. + block (bool): If True, block until the entire note has been played. """ data = super().play_tone(note, duration, volume) self._output_device.play(data, block_on_queue=False) + if block and duration > 0.0: + time.sleep(duration) - def play_abc(self, abc_string: str, volume: float = None, wait_completion: bool = False): + def play_abc(self, abc_string: str, volume: float = None, block: bool = False): """ Play a sequence of musical notes defined in ABC notation. Args: abc_string (str): ABC notation string defining the sequence of notes. volume (float, optional): Volume level (0.0 to 1.0). If None, uses master volume. - wait_completion (bool): If True, block until the entire sequence has been played. + block (bool): If True, block until the entire sequence has been played. """ if not abc_string or abc_string.strip() == "": return @@ -547,9 +582,15 @@ def play_abc(self, abc_string: str, volume: float = None, wait_completion: bool for data, duration in player: self._output_device.play(data, block_on_queue=True) overall_duration += duration - if wait_completion: + if block: time.sleep(overall_duration) + def play_wav(self, wav_file: str, volume: float = None, block: bool = False): + to_play, duration = super().play_wav(wav_file) + self._output_device.play(to_play, block_on_queue=False) + if block and duration > 0.0: + time.sleep(duration) + def clear_playback_queue(self): """ Clear the playback queue of the output device. From 660f713911d9c99421a83a65e0f6203a2ecbc8ff Mon Sep 17 00:00:00 2001 From: Marco Colombo Date: Mon, 3 Nov 2025 11:28:17 +0100 Subject: [PATCH 13/22] structuring cache --- .../app_bricks/sound_generator/__init__.py | 21 +++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) diff --git a/src/arduino/app_bricks/sound_generator/__init__.py b/src/arduino/app_bricks/sound_generator/__init__.py index b5bd84a1..5bf8d602 100644 --- a/src/arduino/app_bricks/sound_generator/__init__.py +++ b/src/arduino/app_bricks/sound_generator/__init__.py @@ -9,6 +9,7 @@ import numpy as np import time from pathlib import Path +from collections import OrderedDict from .generator import WaveSamplesBuilder from .effects import * @@ -426,10 +427,20 @@ def play_wav(self, wav_file: str) -> tuple[bytes, float]: if not file_path.exists() or not file_path.is_file(): raise FileNotFoundError(f"WAV file not found: {wav_file}") + wav_cache = OrderedDict() + + if wav_file in wav_cache: + return wav_cache[wav_file] + with wave.open(wav_file, "rb") as wav: # Read all frames (raw PCM data) duration = wav.getnframes() / wav.getframerate() - return (wav.readframes(wav.getnframes()), duration) + wav_data = wav.readframes(wav.getnframes()) + if len(wav_cache) < 250 * 1024: # 250 KB cache limit + wav_cache[wav_file] = (wav_data, duration) + if len(wav_cache) > 10: + wav_cache.popitem(last=False) + return (wav_data, duration) return (None, None) @@ -585,7 +596,13 @@ def play_abc(self, abc_string: str, volume: float = None, block: bool = False): if block: time.sleep(overall_duration) - def play_wav(self, wav_file: str, volume: float = None, block: bool = False): + def play_wav(self, wav_file: str, block: bool = False): + """ + Play a WAV audio data block. + Args: + wav_file (str): The WAV audio file path. + block (bool): If True, block until the entire WAV file has been played. + """ to_play, duration = super().play_wav(wav_file) self._output_device.play(to_play, block_on_queue=False) if block and duration > 0.0: From d1f86238aa5051ffc6453f86aeff77eaf9241af4 Mon Sep 17 00:00:00 2001 From: Marco Colombo Date: Wed, 5 Nov 2025 09:13:44 +0100 Subject: [PATCH 14/22] wav cache to reduce disk reads --- .../app_bricks/sound_generator/__init__.py | 37 +++++++++++++++---- 1 file changed, 29 insertions(+), 8 deletions(-) diff --git a/src/arduino/app_bricks/sound_generator/__init__.py b/src/arduino/app_bricks/sound_generator/__init__.py index 5bf8d602..695314b2 100644 --- a/src/arduino/app_bricks/sound_generator/__init__.py +++ b/src/arduino/app_bricks/sound_generator/__init__.py @@ -16,6 +16,29 @@ from .loaders import ABCNotationLoader +class LRUDict(OrderedDict): + """A dictionary-like object with a fixed size that evicts the least recently used items.""" + + def __init__(self, maxsize=128, *args, **kwargs): + self.maxsize = maxsize + super().__init__(*args, **kwargs) + + def __getitem__(self, key): + value = super().__getitem__(key) + self.move_to_end(key) + return value + + def __setitem__(self, key, value): + if key in self: + self.move_to_end(key) + + super().__setitem__(key, value) + + if len(self) > self.maxsize: + # Evict the least recently used item (the first item) + self.popitem(last=False) + + @brick class SoundGeneratorStreamer: SAMPLE_RATE = 16000 @@ -90,6 +113,8 @@ def __init__( notes = self._fill_node_frequencies(octave) self._notes.update(notes) + self._wav_cache = LRUDict(maxsize=10) + def start(self): pass @@ -427,19 +452,15 @@ def play_wav(self, wav_file: str) -> tuple[bytes, float]: if not file_path.exists() or not file_path.is_file(): raise FileNotFoundError(f"WAV file not found: {wav_file}") - wav_cache = OrderedDict() - - if wav_file in wav_cache: - return wav_cache[wav_file] + if wav_file in self._wav_cache: + return self._wav_cache[wav_file] with wave.open(wav_file, "rb") as wav: # Read all frames (raw PCM data) duration = wav.getnframes() / wav.getframerate() wav_data = wav.readframes(wav.getnframes()) - if len(wav_cache) < 250 * 1024: # 250 KB cache limit - wav_cache[wav_file] = (wav_data, duration) - if len(wav_cache) > 10: - wav_cache.popitem(last=False) + if len(self._wav_cache) < 250 * 1024: # 250 KB cache limit + self._wav_cache[wav_file] = (wav_data, duration) return (wav_data, duration) return (None, None) From 37fbe03580d5b3b3c4ad08cc5b2e273f8037cdb8 Mon Sep 17 00:00:00 2001 From: Marco Colombo Date: Tue, 11 Nov 2025 16:09:30 +0100 Subject: [PATCH 15/22] Code lint --- .../app_bricks/sound_generator/__init__.py | 15 +++++++++------ src/arduino/app_bricks/sound_generator/effects.py | 2 +- .../app_bricks/sound_generator/test_effects.py | 1 - 3 files changed, 10 insertions(+), 8 deletions(-) diff --git a/src/arduino/app_bricks/sound_generator/__init__.py b/src/arduino/app_bricks/sound_generator/__init__.py index 695314b2..6167b8cc 100644 --- a/src/arduino/app_bricks/sound_generator/__init__.py +++ b/src/arduino/app_bricks/sound_generator/__init__.py @@ -90,12 +90,13 @@ def __init__( ): """Initialize the SoundGeneratorStreamer. Generates sound blocks for streaming, without internal playback. Args: - wave_form (str): The type of wave form to generate. Supported values - are "sine" (default), "square", "triangle" and "sawtooth". bpm (int): The tempo in beats per minute for note duration calculations. - master_volume (float): The master volume level (0.0 to 1.0). + time_signature (tuple): The time signature as (numerator, denominator). octaves (int): Number of octaves to generate notes for (starting from octave 0 up to octaves-1). + wave_form (str): The type of wave form to generate. Supported values + are "sine" (default), "square", "triangle" and "sawtooth". + master_volume (float): The master volume level (0.0 to 1.0). sound_effects (list, optional): List of sound effect instances to apply to the audio signal (e.g., [SoundEffect.adsr()]). See SoundEffect class for available effects. """ @@ -258,6 +259,7 @@ def _apply_sound_effects(self, signal: np.ndarray, frequency: float) -> np.ndarr Apply the configured sound effects to the audio signal. Args: signal (np.ndarray): Input audio signal. + frequency (float): Frequency of the note being played. Returns: np.ndarray: Processed audio signal with sound effects applied. """ @@ -312,7 +314,7 @@ def play_polyphonic(self, notes: list[list[tuple[str, float]]], as_tone: bool = if frequency >= 0.0: if base_frequency is None: base_frequency = frequency - if as_tone == False: + if not as_tone: duration = self._note_duration(duration) data = self._wave_gen.generate_block(float(frequency), duration, volume) sequence_waves.append(data) @@ -489,6 +491,7 @@ def __init__( 0 up to octaves-1). sound_effects (list, optional): List of sound effect instances to apply to the audio signal (e.g., [SoundEffect.adsr()]). See SoundEffect class for available effects. + time_signature (tuple): The time signature as (numerator, denominator). """ super().__init__( @@ -511,12 +514,12 @@ def __init__( def start(self): if self._started.is_set(): return - if self.external_speaker == False: + if not self.external_speaker: self._output_device.start(notify_if_started=False) self._started.set() def stop(self): - if self.external_speaker == False: + if not self.external_speaker: self._output_device.stop() self._started.clear() diff --git a/src/arduino/app_bricks/sound_generator/effects.py b/src/arduino/app_bricks/sound_generator/effects.py index be220388..622b095a 100644 --- a/src/arduino/app_bricks/sound_generator/effects.py +++ b/src/arduino/app_bricks/sound_generator/effects.py @@ -178,7 +178,7 @@ def __init__(self, bits: int = 4, reduction: int = 4): """ Bitcrusher effect. Args: - bit_depth (int): Number of bits for quantization (1-16). + bits (int): Bit depth for quantization (1-16). reduction (int): Redeuction factor for downsampling (>=1). """ self.bit_depth = np.clip(bits, 1, 16) diff --git a/tests/arduino/app_bricks/sound_generator/test_effects.py b/tests/arduino/app_bricks/sound_generator/test_effects.py index 781af770..35b3aed6 100644 --- a/tests/arduino/app_bricks/sound_generator/test_effects.py +++ b/tests/arduino/app_bricks/sound_generator/test_effects.py @@ -2,7 +2,6 @@ # # SPDX-License-Identifier: MPL-2.0 -import pytest from arduino.app_bricks.sound_generator.effects import SoundEffect from arduino.app_bricks.sound_generator import SoundGenerator from arduino.app_utils.audio import WaveGenerator From 47e8b1a3004f76c5592ddc9b7bd2ff58188b40d6 Mon Sep 17 00:00:00 2001 From: Marco Colombo Date: Tue, 11 Nov 2025 16:15:05 +0100 Subject: [PATCH 16/22] fix test --- tests/arduino/app_bricks/sound_generator/test_effects.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/tests/arduino/app_bricks/sound_generator/test_effects.py b/tests/arduino/app_bricks/sound_generator/test_effects.py index 35b3aed6..be50bc9b 100644 --- a/tests/arduino/app_bricks/sound_generator/test_effects.py +++ b/tests/arduino/app_bricks/sound_generator/test_effects.py @@ -3,12 +3,11 @@ # SPDX-License-Identifier: MPL-2.0 from arduino.app_bricks.sound_generator.effects import SoundEffect -from arduino.app_bricks.sound_generator import SoundGenerator -from arduino.app_utils.audio import WaveGenerator +from arduino.app_bricks.sound_generator import SoundGenerator, WaveSamplesBuilder def test_adsr_effect(): - generator = WaveGenerator(sample_rate=16000, wave_form="square") + generator = WaveSamplesBuilder(sample_rate=16000, wave_form="square") adsr = SoundEffect.adsr() blk = generator.generate_block(440.0, 1 / 8, 1.0) # Generate a block to initialize assert adsr is not None From 84f327a8fa49756ceefe07088c03f8e2e2192b12 Mon Sep 17 00:00:00 2001 From: Marco Colombo Date: Tue, 11 Nov 2025 16:34:48 +0100 Subject: [PATCH 17/22] fix test --- tests/arduino/app_bricks/sound_generator/test_effects.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/arduino/app_bricks/sound_generator/test_effects.py b/tests/arduino/app_bricks/sound_generator/test_effects.py index be50bc9b..03ad7070 100644 --- a/tests/arduino/app_bricks/sound_generator/test_effects.py +++ b/tests/arduino/app_bricks/sound_generator/test_effects.py @@ -3,7 +3,7 @@ # SPDX-License-Identifier: MPL-2.0 from arduino.app_bricks.sound_generator.effects import SoundEffect -from arduino.app_bricks.sound_generator import SoundGenerator, WaveSamplesBuilder +from arduino.app_bricks.sound_generator import SoundGeneratorStreamer, WaveSamplesBuilder def test_adsr_effect(): @@ -56,7 +56,7 @@ def test_available_notes(): ("B4", 0.25), ] - generator = SoundGenerator() + generator = SoundGeneratorStreamer() for note, duration in note_sequence: print(f"Testing note: {note}") frequency = generator._get_note(note) From 0ed8c6c60b27765ecf5009f8e0623cad3c758273 Mon Sep 17 00:00:00 2001 From: Marco Colombo Date: Tue, 11 Nov 2025 16:43:33 +0100 Subject: [PATCH 18/22] Fix license header --- tests/arduino/app_bricks/sound_generator/test_abc.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tests/arduino/app_bricks/sound_generator/test_abc.py b/tests/arduino/app_bricks/sound_generator/test_abc.py index d1f3f007..07f4830f 100644 --- a/tests/arduino/app_bricks/sound_generator/test_abc.py +++ b/tests/arduino/app_bricks/sound_generator/test_abc.py @@ -1,3 +1,7 @@ +# SPDX-FileCopyrightText: Copyright (C) 2025 ARDUINO SA +# +# SPDX-License-Identifier: MPL-2.0 + from arduino.app_bricks.sound_generator import ABCNotationLoader From 579d9b857769a5b57ce2b95e461c896e21802ae4 Mon Sep 17 00:00:00 2001 From: Marco Colombo Date: Fri, 28 Nov 2025 11:10:45 +0100 Subject: [PATCH 19/22] Fix license --- src/arduino/app_bricks/sound_generator/__init__.py | 2 +- src/arduino/app_bricks/sound_generator/effects.py | 2 +- .../app_bricks/sound_generator/examples/1_play_sequence.py | 2 +- .../app_bricks/sound_generator/examples/2_stream_sequence.py | 2 +- .../app_bricks/sound_generator/examples/3_play_abc_notation.py | 2 +- src/arduino/app_bricks/sound_generator/examples/4_effects.py | 2 +- src/arduino/app_bricks/sound_generator/generator.py | 2 +- src/arduino/app_bricks/sound_generator/loaders.py | 2 +- tests/arduino/app_bricks/sound_generator/test_abc.py | 2 +- tests/arduino/app_bricks/sound_generator/test_effects.py | 2 +- 10 files changed, 10 insertions(+), 10 deletions(-) diff --git a/src/arduino/app_bricks/sound_generator/__init__.py b/src/arduino/app_bricks/sound_generator/__init__.py index 6167b8cc..f59fd46b 100644 --- a/src/arduino/app_bricks/sound_generator/__init__.py +++ b/src/arduino/app_bricks/sound_generator/__init__.py @@ -1,4 +1,4 @@ -# SPDX-FileCopyrightText: Copyright (C) 2025 ARDUINO SA +# SPDX-FileCopyrightText: Copyright (C) ARDUINO SRL (http://www.arduino.cc) # # SPDX-License-Identifier: MPL-2.0 diff --git a/src/arduino/app_bricks/sound_generator/effects.py b/src/arduino/app_bricks/sound_generator/effects.py index 622b095a..dc68cd89 100644 --- a/src/arduino/app_bricks/sound_generator/effects.py +++ b/src/arduino/app_bricks/sound_generator/effects.py @@ -1,4 +1,4 @@ -# SPDX-FileCopyrightText: Copyright (C) 2025 ARDUINO SA +# SPDX-FileCopyrightText: Copyright (C) ARDUINO SRL (http://www.arduino.cc) # # SPDX-License-Identifier: MPL-2.0 diff --git a/src/arduino/app_bricks/sound_generator/examples/1_play_sequence.py b/src/arduino/app_bricks/sound_generator/examples/1_play_sequence.py index 58971085..5f659a8d 100644 --- a/src/arduino/app_bricks/sound_generator/examples/1_play_sequence.py +++ b/src/arduino/app_bricks/sound_generator/examples/1_play_sequence.py @@ -1,4 +1,4 @@ -# SPDX-FileCopyrightText: Copyright (C) 2025 ARDUINO SA +# SPDX-FileCopyrightText: Copyright (C) ARDUINO SRL (http://www.arduino.cc) # # SPDX-License-Identifier: MPL-2.0 diff --git a/src/arduino/app_bricks/sound_generator/examples/2_stream_sequence.py b/src/arduino/app_bricks/sound_generator/examples/2_stream_sequence.py index 74718a95..1571de8c 100644 --- a/src/arduino/app_bricks/sound_generator/examples/2_stream_sequence.py +++ b/src/arduino/app_bricks/sound_generator/examples/2_stream_sequence.py @@ -1,4 +1,4 @@ -# SPDX-FileCopyrightText: Copyright (C) 2025 ARDUINO SA +# SPDX-FileCopyrightText: Copyright (C) ARDUINO SRL (http://www.arduino.cc) # # SPDX-License-Identifier: MPL-2.0 diff --git a/src/arduino/app_bricks/sound_generator/examples/3_play_abc_notation.py b/src/arduino/app_bricks/sound_generator/examples/3_play_abc_notation.py index b455130d..f691a89b 100644 --- a/src/arduino/app_bricks/sound_generator/examples/3_play_abc_notation.py +++ b/src/arduino/app_bricks/sound_generator/examples/3_play_abc_notation.py @@ -1,4 +1,4 @@ -# SPDX-FileCopyrightText: Copyright (C) 2025 ARDUINO SA +# SPDX-FileCopyrightText: Copyright (C) ARDUINO SRL (http://www.arduino.cc) # # SPDX-License-Identifier: MPL-2.0 diff --git a/src/arduino/app_bricks/sound_generator/examples/4_effects.py b/src/arduino/app_bricks/sound_generator/examples/4_effects.py index 16962fc0..c31cc8b3 100644 --- a/src/arduino/app_bricks/sound_generator/examples/4_effects.py +++ b/src/arduino/app_bricks/sound_generator/examples/4_effects.py @@ -1,4 +1,4 @@ -# SPDX-FileCopyrightText: Copyright (C) 2025 ARDUINO SA +# SPDX-FileCopyrightText: Copyright (C) ARDUINO SRL (http://www.arduino.cc) # # SPDX-License-Identifier: MPL-2.0 diff --git a/src/arduino/app_bricks/sound_generator/generator.py b/src/arduino/app_bricks/sound_generator/generator.py index 13596294..d0b3dc05 100644 --- a/src/arduino/app_bricks/sound_generator/generator.py +++ b/src/arduino/app_bricks/sound_generator/generator.py @@ -1,4 +1,4 @@ -# SPDX-FileCopyrightText: Copyright (C) 2025 ARDUINO SA +# SPDX-FileCopyrightText: Copyright (C) ARDUINO SRL (http://www.arduino.cc) # # SPDX-License-Identifier: MPL-2.0 diff --git a/src/arduino/app_bricks/sound_generator/loaders.py b/src/arduino/app_bricks/sound_generator/loaders.py index 6b454dac..ef1dfe9b 100644 --- a/src/arduino/app_bricks/sound_generator/loaders.py +++ b/src/arduino/app_bricks/sound_generator/loaders.py @@ -1,4 +1,4 @@ -# SPDX-FileCopyrightText: Copyright (C) 2025 ARDUINO SA +# SPDX-FileCopyrightText: Copyright (C) ARDUINO SRL (http://www.arduino.cc) # # SPDX-License-Identifier: MPL-2.0 diff --git a/tests/arduino/app_bricks/sound_generator/test_abc.py b/tests/arduino/app_bricks/sound_generator/test_abc.py index 07f4830f..bc3579ea 100644 --- a/tests/arduino/app_bricks/sound_generator/test_abc.py +++ b/tests/arduino/app_bricks/sound_generator/test_abc.py @@ -1,4 +1,4 @@ -# SPDX-FileCopyrightText: Copyright (C) 2025 ARDUINO SA +# SPDX-FileCopyrightText: Copyright (C) ARDUINO SRL (http://www.arduino.cc) # # SPDX-License-Identifier: MPL-2.0 diff --git a/tests/arduino/app_bricks/sound_generator/test_effects.py b/tests/arduino/app_bricks/sound_generator/test_effects.py index 03ad7070..33831c5b 100644 --- a/tests/arduino/app_bricks/sound_generator/test_effects.py +++ b/tests/arduino/app_bricks/sound_generator/test_effects.py @@ -1,4 +1,4 @@ -# SPDX-FileCopyrightText: Copyright (C) 2025 ARDUINO SA +# SPDX-FileCopyrightText: Copyright (C) ARDUINO SRL (http://www.arduino.cc) # # SPDX-License-Identifier: MPL-2.0 From 5acf584cdfbdcc264b788c9c6521c8af87b77b83 Mon Sep 17 00:00:00 2001 From: Dario Sammaruga Date: Wed, 10 Dec 2025 22:49:52 +0100 Subject: [PATCH 20/22] improvements --- .../app_bricks/sound_generator/__init__.py | 213 +++++++++++++++++- .../app_peripherals/speaker/__init__.py | 37 ++- 2 files changed, 245 insertions(+), 5 deletions(-) diff --git a/src/arduino/app_bricks/sound_generator/__init__.py b/src/arduino/app_bricks/sound_generator/__init__.py index f59fd46b..6b0ccd92 100644 --- a/src/arduino/app_bricks/sound_generator/__init__.py +++ b/src/arduino/app_bricks/sound_generator/__init__.py @@ -2,12 +2,13 @@ # # SPDX-License-Identifier: MPL-2.0 -from arduino.app_utils import brick +from arduino.app_utils import brick, Logger from arduino.app_peripherals.speaker import Speaker import threading from typing import Iterable import numpy as np import time +import logging from pathlib import Path from collections import OrderedDict @@ -15,6 +16,8 @@ from .effects import * from .loaders import ABCNotationLoader +logger = Logger("SoundGenerator", logging.DEBUG) + class LRUDict(OrderedDict): """A dictionary-like object with a fixed size that evicts the least recently used items.""" @@ -357,6 +360,7 @@ def play_chord(self, notes: list[str], note_duration: float | str = 1 / 4, volum bytes: The audio block of the mixed sequences (float32). """ duration = self._note_duration(note_duration) + logger.debug(f"play_chord: notes={notes}, note_duration={note_duration}, duration={duration}s, volume={volume}") if len(notes) == 1: self.play(notes[0], duration, volume) return @@ -372,6 +376,7 @@ def play_chord(self, notes: list[str], note_duration: float | str = 1 / 4, volum volume = self._master_volume data = self._wave_gen.generate_block(float(frequency), duration, volume) waves.append(data) + logger.debug(f" Generated wave for {note} @ {frequency}Hz, {len(data)} samples") else: continue if len(waves) == 0: @@ -380,7 +385,9 @@ def play_chord(self, notes: list[str], note_duration: float | str = 1 / 4, volum chord /= np.max(np.abs(chord)) # Normalize to prevent clipping blk = chord.astype(np.float32) blk = self._apply_sound_effects(blk, base_frequency) - return self._to_bytes(blk) + audio_bytes = self._to_bytes(blk) + logger.debug(f" Chord generated: {len(audio_bytes)} bytes") + return audio_bytes def play(self, note: str, note_duration: float | str = 1 / 4, volume: float = None) -> bytes: """ @@ -394,12 +401,15 @@ def play(self, note: str, note_duration: float | str = 1 / 4, volume: float = No """ duration = self._note_duration(note_duration) frequency = self._get_note(note) + logger.debug(f"play: note={note}, note_duration={note_duration}, duration={duration}s, frequency={frequency}Hz, volume={volume}") if frequency is not None and frequency >= 0.0: if volume is None: volume = self._master_volume data = self._wave_gen.generate_block(float(frequency), duration, volume) data = self._apply_sound_effects(data, frequency) - return self._to_bytes(data) + audio_bytes = self._to_bytes(data) + logger.debug(f" Generated audio: {len(audio_bytes)} bytes") + return audio_bytes def play_tone(self, note: str, duration: float = 0.25, volume: float = None) -> bytes: """ @@ -506,11 +516,26 @@ def __init__( self._started = threading.Event() if output_device is None: self.external_speaker = False - self._output_device = Speaker(sample_rate=self.SAMPLE_RATE, format="FLOAT_LE") + # Configure periodsize and queue for very responsive stop operations + # Use 62.5ms periods (1000 frames @ 16kHz) for quick response to stop commands + # Very small queue (maxsize=3) = ~190ms total buffer for ultra-responsive stop + period_size = int(self.SAMPLE_RATE * 0.0625) # 1000 frames = 62.5ms + self._output_device = Speaker( + sample_rate=self.SAMPLE_RATE, + format="FLOAT_LE", + periodsize=period_size, + queue_maxsize=3, # Ultra-low latency: 3 × 62.5ms = ~190ms max buffer + ) else: self.external_speaker = True self._output_device = output_device + # Step sequencer state + self._sequence_thread = None + self._sequence_stop_event = threading.Event() + self._sequence_lock = threading.Lock() + self._playback_session_id = 0 # Incremented each playback to invalidate stale callbacks + def start(self): if self._started.is_set(): return @@ -565,8 +590,10 @@ def play_chord(self, notes: list[str], note_duration: float | str = 1 / 4, volum volume (float, optional): Volume level (0.0 to 1.0). If None, uses master volume. block (bool): If True, block until the entire chord has been played. """ + logger.debug(f"SoundGenerator.play_chord: notes={notes}, block_on_queue=False") blk = super().play_chord(notes, note_duration, volume) self._output_device.play(blk, block_on_queue=False) + logger.debug(f" Audio sent to device queue") if block: duration = self._note_duration(note_duration) if duration > 0.0: @@ -581,8 +608,10 @@ def play(self, note: str, note_duration: float | str = 1 / 4, volume: float = No volume (float, optional): Volume level (0.0 to 1.0). If None, uses master volume. block (bool): If True, block until the entire note has been played. """ + logger.debug(f"SoundGenerator.play: note={note}, block_on_queue=False") data = super().play(note, note_duration, volume) self._output_device.play(data, block_on_queue=False) + logger.debug(f" Audio sent to device queue") if block: duration = self._note_duration(note_duration) if duration > 0.0: @@ -637,3 +666,179 @@ def clear_playback_queue(self): Clear the playback queue of the output device. """ self._output_device.clear_playback_queue() + + def play_step_sequence( + self, + sequence: list[list[str]], + note_duration: float | str = 1 / 8, + bpm: int = None, + loop: bool = False, + on_step_callback: callable = None, + on_complete_callback: callable = None, + volume: float = None, + ): + """ + Play a step sequence with automatic timing, pre-buffering, and lookahead. + This method handles all the complexity of buffer management internally, + allowing the app to simply provide the sequence and let the brick manage playback. + + Args: + sequence (list[list[str]]): List of steps, where each step is a list of notes. + Empty list or None means REST (silence) for that step. + Example: [['C4'], ['E4', 'G4'], [], ['C5']] + note_duration (float | str): Duration of each step as a float (like 1/8) or symbol ('E', 'Q', etc.). + bpm (int, optional): Tempo in beats per minute. If None, uses instance BPM. + loop (bool): If True, the sequence will loop indefinitely until stop_sequence() is called. + on_step_callback (callable, optional): Callback function called for each step. + Signature: on_step_callback(current_step: int, total_steps: int) + on_complete_callback (callable, optional): Callback function called when sequence completes (only if loop=False). + Signature: on_complete_callback() + volume (float, optional): Volume level (0.0 to 1.0). If None, uses master volume. + + Returns: + None: Returns immediately after starting playback thread. + + Example: + ```python + # Simple melody with chords + sequence = [ + ["C4"], # Step 0: Single note + ["E4", "G4"], # Step 1: Chord + [], # Step 2: REST + ["C5"], # Step 3: High note + ] + sound_gen.play_step_sequence(sequence, note_duration=1 / 8, bpm=120) + ``` + """ + # Stop any existing sequence + self.stop_sequence() + + # Use instance BPM if not specified + if bpm is None: + bpm = self._bpm + + # Validate sequence + if not sequence or len(sequence) == 0: + logger.warning("play_step_sequence: Empty sequence provided") + return + + # Start playback thread with new session ID + self._sequence_stop_event.clear() + self._playback_session_id += 1 + session_id = self._playback_session_id + self._sequence_thread = threading.Thread( + target=self._playback_sequence_thread, + args=(sequence, note_duration, bpm, loop, on_step_callback, on_complete_callback, volume, session_id), + daemon=True, + name="SoundGen-StepSeq", + ) + self._sequence_thread.start() + logger.info(f"Step sequence started: {len(sequence)} steps at {bpm} BPM (session {session_id})") + + def stop_sequence(self): + """ + Stop the currently playing step sequence. + This method signals the playback thread to stop and clears the queue immediately. + The thread will detect the stop signal and exit at the next check point. + """ + logger.info("stop_sequence() called") + with self._sequence_lock: + if self._sequence_thread and self._sequence_thread.is_alive(): + logger.info("Stopping step sequence playback - calling drop_playback()") + # Increment session ID to invalidate the running thread immediately + self._playback_session_id += 1 + self._sequence_stop_event.set() + # Clear reference immediately - thread will clean itself up + self._sequence_thread = None + self._output_device.drop_playback() + else: + logger.warning("stop_sequence called but no active sequence thread") + + def is_sequence_playing(self) -> bool: + """ + Check if a step sequence is currently playing. + + Returns: + bool: True if a sequence is playing, False otherwise. + """ + with self._sequence_lock: + return self._sequence_thread is not None and self._sequence_thread.is_alive() + + def _playback_sequence_thread( + self, + sequence: list[list[str]], + note_duration: float | str, + bpm: int, + loop: bool, + on_step_callback: callable, + on_complete_callback: callable, + volume: float, + session_id: int, + ): + """Internal thread for step sequence playback. + + Simple approach: generate step-by-step, use block_on_queue=True for natural + synchronization with ALSA consumption. Callbacks are emitted immediately after + queuing each step, ensuring perfect sync with audio playback. + """ + from itertools import cycle + import numpy as np + + try: + duration = self._note_duration(note_duration) + total_steps = len(sequence) + + logger.info(f"Starting sequence: {total_steps} steps at {bpm} BPM") + + # PRE-FILL: Queue one period of silence to prevent first-note underrun + # This gives ALSA something to consume while we generate the first real note + silence_frames = int(duration * self._output_device.sample_rate) + silence = np.zeros(silence_frames, dtype=np.float32).tobytes() + self._output_device.play(silence, block_on_queue=False) + logger.debug(f"Pre-filled queue with {len(silence)} bytes of silence") + + # Create infinite iterator if looping, otherwise single pass + step_iterator = cycle(enumerate(sequence)) if loop else enumerate(sequence) + + for step_index, notes in step_iterator: + # Check for stop signal + if self._sequence_stop_event.is_set(): + logger.debug(f"Sequence stopped at step {step_index}") + break + + # Generate audio for this step + if notes and len(notes) > 0: + if len(notes) == 1: + data = super(SoundGenerator, self).play(notes[0], note_duration, volume) + else: + data = super(SoundGenerator, self).play_chord(notes, note_duration, volume) + else: + # REST: silence + data = super(SoundGenerator, self).play("REST", note_duration, volume) + + # Queue audio - BLOCKS until there's space (natural sync with ALSA!) + if data: + self._output_device.play(data, block_on_queue=True) + + # Emit callback IMMEDIATELY after queuing + # This is synchronized with actual playback timing via blocking + if on_step_callback: + try: + on_step_callback(step_index, total_steps) + except Exception as e: + logger.error(f"Error in step callback: {e}") + + logger.info("Sequence playback ended") + + # Call completion callback if provided and not looping + if not loop and on_complete_callback: + try: + on_complete_callback() + except Exception as e: + logger.error(f"Error in complete callback: {e}") + + except Exception as e: + logger.error(f"Error in sequence playback: {e}", exc_info=True) + finally: + with self._sequence_lock: + self._sequence_thread = None diff --git a/src/arduino/app_peripherals/speaker/__init__.py b/src/arduino/app_peripherals/speaker/__init__.py index 7ae59f16..5e8ea6b0 100644 --- a/src/arduino/app_peripherals/speaker/__init__.py +++ b/src/arduino/app_peripherals/speaker/__init__.py @@ -7,9 +7,10 @@ import threading import queue import re +import logging from arduino.app_utils import Logger -logger = Logger("Speaker") +logger = Logger("Speaker", logging.DEBUG) class SpeakerException(Exception): @@ -97,6 +98,7 @@ def __init__( self._pcm_lock = threading.Lock() self._native_rate = None self._is_reproducing = threading.Event() + self._is_dropping = threading.Event() # Signal to playback loop to pause during drop self._periodsize = periodsize # Store configured periodsize (None = hardware default) self._playing_queue: bytes = queue.Queue(maxsize=queue_maxsize) # Queue for audio data to play with limited capacity self.device = self._resolve_device(device) @@ -432,6 +434,7 @@ def _playback_loop(self): try: data = self._playing_queue.get(timeout=1) # Wait for audio data if data is None: + logger.debug("Got None from queue, skipping") continue # Skip if no data is available # Check queue depth periodically @@ -444,7 +447,14 @@ def _playback_loop(self): with self._pcm_lock: if self._pcm is not None: try: + # Skip writing if drop is in progress + if self._is_dropping.is_set(): + logger.debug("Skipping PCM write during drop operation") + continue + + logger.debug(f"Writing {len(data)} bytes to PCM device") written = self._pcm.write(data) + logger.debug(f"Successfully wrote {len(data)} bytes to PCM device") # Check for ALSA errors (negative return values) if written < 0: @@ -516,3 +526,28 @@ def is_reproducing(self) -> bool: def clear_playback_queue(self): """Clear the playback queue.""" self._clear_queue() + + def drop_playback(self): + """Drop all pending audio data immediately (both queue and hardware buffer). + + This method clears both the software queue and the ALSA hardware buffer, + stopping audio playback immediately. Use this for responsive stop operations. + """ + # Signal playback loop to stop writing temporarily + self._is_dropping.set() + + # Clear software queue first + self._clear_queue() + + # Then drop ALSA hardware buffer + with self._pcm_lock: + if self._pcm is not None: + try: + self._pcm.drop() # Immediately stop PCM, drop pending frames + logger.debug("ALSA PCM buffer dropped") + except Exception as e: + logger.warning(f"Failed to drop PCM buffer: {e}") + + # Allow playback to resume + self._is_dropping.clear() + logger.debug("Playback queue and PCM buffer cleared") From db8bef16954e0d06db31c0de54a2231f47b68933 Mon Sep 17 00:00:00 2001 From: Dario Sammaruga Date: Thu, 11 Dec 2025 12:04:29 +0100 Subject: [PATCH 21/22] rename method Speaker.clear() --- src/arduino/app_bricks/sound_generator/__init__.py | 13 +++++++++++-- src/arduino/app_peripherals/speaker/__init__.py | 13 ++++++------- 2 files changed, 17 insertions(+), 9 deletions(-) diff --git a/src/arduino/app_bricks/sound_generator/__init__.py b/src/arduino/app_bricks/sound_generator/__init__.py index 6b0ccd92..b452fbca 100644 --- a/src/arduino/app_bricks/sound_generator/__init__.py +++ b/src/arduino/app_bricks/sound_generator/__init__.py @@ -146,6 +146,16 @@ def set_master_volume(self, volume: float): """ self._master_volume = max(0.0, min(1.0, volume)) + def set_bpm(self, bpm: int): + """ + Set the tempo in beats per minute. + Args: + bpm (int): Tempo in beats per minute. + """ + with self._cfg_lock: + self._bpm = bpm + logger.debug(f"BPM updated to {bpm}") + def set_effects(self, effects: list): """ Set the list of sound effects to apply to the audio signal. @@ -750,7 +760,7 @@ def stop_sequence(self): self._sequence_stop_event.set() # Clear reference immediately - thread will clean itself up self._sequence_thread = None - self._output_device.drop_playback() + self._output_device.clear() else: logger.warning("stop_sequence called but no active sequence thread") @@ -782,7 +792,6 @@ def _playback_sequence_thread( queuing each step, ensuring perfect sync with audio playback. """ from itertools import cycle - import numpy as np try: duration = self._note_duration(note_duration) diff --git a/src/arduino/app_peripherals/speaker/__init__.py b/src/arduino/app_peripherals/speaker/__init__.py index 5e8ea6b0..2e1bb1a1 100644 --- a/src/arduino/app_peripherals/speaker/__init__.py +++ b/src/arduino/app_peripherals/speaker/__init__.py @@ -7,10 +7,9 @@ import threading import queue import re -import logging from arduino.app_utils import Logger -logger = Logger("Speaker", logging.DEBUG) +logger = Logger("Speaker") class SpeakerException(Exception): @@ -527,8 +526,8 @@ def clear_playback_queue(self): """Clear the playback queue.""" self._clear_queue() - def drop_playback(self): - """Drop all pending audio data immediately (both queue and hardware buffer). + def clear(self): + """Clear all pending audio data immediately (both queue and hardware buffer). This method clears both the software queue and the ALSA hardware buffer, stopping audio playback immediately. Use this for responsive stop operations. @@ -539,14 +538,14 @@ def drop_playback(self): # Clear software queue first self._clear_queue() - # Then drop ALSA hardware buffer + # Then clear ALSA hardware buffer with self._pcm_lock: if self._pcm is not None: try: self._pcm.drop() # Immediately stop PCM, drop pending frames - logger.debug("ALSA PCM buffer dropped") + logger.debug("ALSA PCM buffer cleared") except Exception as e: - logger.warning(f"Failed to drop PCM buffer: {e}") + logger.warning(f"Failed to clear PCM buffer: {e}") # Allow playback to resume self._is_dropping.clear() From 31802a1c504821afaa16dbb86cf46131eb03fa95 Mon Sep 17 00:00:00 2001 From: Dario Sammaruga Date: Thu, 11 Dec 2025 15:14:12 +0100 Subject: [PATCH 22/22] Add MusicComposition dataclass and play_composition method --- .../app_bricks/sound_generator/__init__.py | 34 ++++++++++ .../app_bricks/sound_generator/composition.py | 63 +++++++++++++++++++ 2 files changed, 97 insertions(+) create mode 100644 src/arduino/app_bricks/sound_generator/composition.py diff --git a/src/arduino/app_bricks/sound_generator/__init__.py b/src/arduino/app_bricks/sound_generator/__init__.py index b452fbca..eb8050b3 100644 --- a/src/arduino/app_bricks/sound_generator/__init__.py +++ b/src/arduino/app_bricks/sound_generator/__init__.py @@ -15,6 +15,7 @@ from .generator import WaveSamplesBuilder from .effects import * from .loaders import ABCNotationLoader +from .composition import MusicComposition as MusicComposition logger = Logger("SoundGenerator", logging.DEBUG) @@ -591,6 +592,39 @@ def play_polyphonic(self, notes: list[list[tuple[str, float]]], as_tone: bool = if block and duration > 0.0: time.sleep(duration) + def play_composition(self, composition: "MusicComposition", block: bool = False): + """ + Play a MusicComposition object. + + This method configures the SoundGenerator with the composition's settings + and plays the polyphonic sequence. + + Args: + composition (MusicComposition): The composition to play. + block (bool): If True, block until the entire composition has been played. + + Example: + ```python + from arduino.app_bricks.sound_generator import MusicComposition, SoundGenerator, SoundEffect + + comp = MusicComposition( + composition=[[("C4", 0.25), ("E4", 0.25)], [("G4", 0.5)]], bpm=120, waveform="square", volume=0.8, effects=[SoundEffect.adsr()] + ) + + gen = SoundGenerator() + gen.start() + gen.play_composition(comp, block=True) + ``` + """ + # Configure the generator with composition settings + self.set_bpm(composition.bpm) + self.set_wave_form(composition.waveform) + self.set_master_volume(composition.volume) + self.set_effects(composition.effects) + + # Play the composition + self.play_polyphonic(composition.composition, volume=composition.volume, block=block) + def play_chord(self, notes: list[str], note_duration: float | str = 1 / 4, volume: float = None, block: bool = False): """ Play a chord consisting of multiple musical notes simultaneously for a specified duration and volume. diff --git a/src/arduino/app_bricks/sound_generator/composition.py b/src/arduino/app_bricks/sound_generator/composition.py new file mode 100644 index 00000000..1604be8f --- /dev/null +++ b/src/arduino/app_bricks/sound_generator/composition.py @@ -0,0 +1,63 @@ +# SPDX-FileCopyrightText: Copyright (C) ARDUINO SRL (http://www.arduino.cc) +# +# SPDX-License-Identifier: MPL-2.0 + +from dataclasses import dataclass, field +from .effects import SoundEffect + + +@dataclass +class MusicComposition: + """ + A structured representation of a musical composition for SoundGenerator. + + This class encapsulates all the parameters needed to play a polyphonic composition, + making it easy to save, load, and share musical sequences. + + Attributes: + composition (list[list[tuple[str, float]]]): Polyphonic sequence as a list of tracks. + Each track is a list of tuples (note, duration). + Duration is in note fractions (1/4 = quarter note, 1/8 = eighth note). + Example: [[("C4", 0.25), ("E4", 0.25)], [("G4", 0.5)]] + bpm (int): Tempo in beats per minute. Default: 120. + waveform (str): Wave form type ("sine", "square", "triangle", "sawtooth"). Default: "sine". + volume (float): Master volume level (0.0 to 1.0). Default: 0.8. + effects (list): List of SoundEffect instances to apply. Default: [SoundEffect.adsr()]. + + Example: + ```python + from arduino.app_bricks.sound_generator import MusicComposition, SoundGenerator, SoundEffect + + # Create a composition + comp = MusicComposition( + composition=[ + [("C4", 0.25), ("E4", 0.25), ("G4", 0.25)], # Track 1 + [("REST", 0.25), ("C5", 0.5)], # Track 2 + ], + bpm=120, + waveform="square", + volume=0.8, + effects=[SoundEffect.adsr(), SoundEffect.tremolo(depth=0.5, rate=5.0)], + ) + + # Configure and play with SoundGenerator + gen = SoundGenerator() + gen.start() + gen.play_composition(comp, block=True) + + # Alternatively, set parameters manually and play + gen = SoundGenerator() + gen.start() + gen.set_bpm(comp.bpm) + gen.set_wave_form(comp.waveform) + gen.set_master_volume(comp.volume) + gen.set_effects(comp.effects) + gen.play_polyphonic(comp.composition, volume=comp.volume, block=True) + ``` + """ + + composition: list[list[tuple[str, float]]] + bpm: int = 120 + waveform: str = "sine" + volume: float = 0.8 + effects: list = field(default_factory=lambda: [SoundEffect.adsr()])