Skip to content

Commit 30e5623

Browse files
rjtokenringdsammaruga
authored andcommitted
Added bytes streamer to decouple generation and playback
1 parent 3f91a0a commit 30e5623

File tree

1 file changed

+161
-30
lines changed

1 file changed

+161
-30
lines changed

src/arduino/app_bricks/sound_generator/__init__.py

Lines changed: 161 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,15 @@
55
from arduino.app_utils import WaveGenerator, brick
66
from arduino.app_peripherals.speaker import Speaker
77
import threading
8+
from typing import Iterable
89
import numpy as np
910

1011
from .effects import *
1112
from .loaders import ABCNotationLoader
1213

1314

1415
@brick
15-
class SoundGenerator:
16+
class SoundGeneratorStreamer:
1617
SAMPLE_RATE = 16000
1718
A4_FREQUENCY = 440.0
1819

@@ -53,17 +54,15 @@ class SoundGenerator:
5354

5455
def __init__(
5556
self,
56-
output_device: Speaker = None,
5757
bpm: int = 120,
5858
time_signature: tuple = (4, 4),
5959
octaves: int = 8,
6060
wave_form: str = "sine",
6161
master_volume: float = 1.0,
6262
sound_effects: list = None,
6363
):
64-
"""Initialize the SoundGenerator.
64+
"""Initialize the SoundGeneratorStreamer. Generates sound blocks for streaming, without internal playback.
6565
Args:
66-
output_device (Speaker, optional): The output device to play sound through.
6766
wave_form (str): The type of wave form to generate. Supported values
6867
are "sine" (default), "square", "triangle" and "sawtooth".
6968
bpm (int): The tempo in beats per minute for note duration calculations.
@@ -79,12 +78,6 @@ def __init__(
7978
self.time_signature = time_signature
8079
self._master_volume = master_volume
8180
self._sound_effects = sound_effects
82-
if output_device is None:
83-
self._self_created_device = True
84-
self._output_device = Speaker(sample_rate=self.SAMPLE_RATE, format="FLOAT_LE")
85-
else:
86-
self._self_created_device = False
87-
self._output_device = output_device
8881

8982
self._cfg_lock = threading.Lock()
9083
self._notes = {}
@@ -93,12 +86,10 @@ def __init__(
9386
self._notes.update(notes)
9487

9588
def start(self):
96-
if self._self_created_device:
97-
self._output_device.start(notify_if_started=False)
89+
pass
9890

9991
def stop(self):
100-
if self._self_created_device:
101-
self._output_device.stop()
92+
pass
10293

10394
def set_master_volume(self, volume: float):
10495
"""
@@ -245,7 +236,11 @@ def _get_note(self, note: str) -> float | None:
245236
return None
246237
return self._notes.get(note.strip().upper())
247238

248-
def play_polyphonic(self, notes: list[list[tuple[str, float]]], as_tone: bool = False, volume: float = None):
239+
def _to_bytes(self, signal: np.ndarray) -> bytes:
240+
# Format: "FLOAT_LE" -> (ALSA: "PCM_FORMAT_FLOAT_LE", np.float32),
241+
return signal.astype(np.float32).tobytes()
242+
243+
def play_polyphonic(self, notes: list[list[tuple[str, float]]], as_tone: bool = False, volume: float = None) -> bytes:
249244
"""
250245
Play multiple sequences of musical notes simultaneously (poliphony).
251246
It is possible to play multi track music by providing a list of sequences,
@@ -255,6 +250,8 @@ def play_polyphonic(self, notes: list[list[tuple[str, float]]], as_tone: bool =
255250
notes (list[list[tuple[str, float]]]): List of sequences, each sequence is a list of tuples (note, duration).
256251
as_tone (bool): If True, play as tones, considering duration in seconds
257252
volume (float, optional): Volume level (0.0 to 1.0). If None, uses master volume.
253+
Returns:
254+
bytes: The audio block of the mixed sequences (float32).
258255
"""
259256
if volume is None:
260257
volume = self._master_volume
@@ -296,18 +293,17 @@ def play_polyphonic(self, notes: list[list[tuple[str, float]]], as_tone: bool =
296293
mixed /= np.max(np.abs(mixed)) # Normalize to prevent clipping
297294
blk = mixed.astype(np.float32)
298295
blk = self._apply_sound_effects(blk, base_frequency)
299-
try:
300-
self._output_device.play(blk, block_on_queue=False)
301-
except Exception as e:
302-
print(f"Error playing multiple sequences: {e}")
296+
return self._to_bytes(blk)
303297

304-
def play_chord(self, notes: list[str], note_duration: float | str = 1 / 4, volume: float = None):
298+
def play_chord(self, notes: list[str], note_duration: float | str = 1 / 4, volume: float = None) -> bytes:
305299
"""
306300
Play a chord consisting of multiple musical notes simultaneously for a specified duration and volume.
307301
Args:
308302
notes (list[str]): List of musical notes to play (e.g., ['A4', 'C#5', 'E5']).
309303
note_duration (float | str): Duration of the chord as a float (like 1/4, 1/8) or a symbol ('W', 'H', 'Q', etc.).
310304
volume (float, optional): Volume level (0.0 to 1.0). If None, uses master volume.
305+
Returns:
306+
bytes: The audio block of the mixed sequences (float32).
311307
"""
312308
duration = self._note_duration(note_duration)
313309
if len(notes) == 1:
@@ -333,18 +329,17 @@ def play_chord(self, notes: list[str], note_duration: float | str = 1 / 4, volum
333329
chord /= np.max(np.abs(chord)) # Normalize to prevent clipping
334330
blk = chord.astype(np.float32)
335331
blk = self._apply_sound_effects(blk, base_frequency)
336-
try:
337-
self._output_device.play(blk, block_on_queue=False)
338-
except Exception as e:
339-
print(f"Error playing chord {notes}: {e}")
332+
return self._to_bytes(blk)
340333

341-
def play(self, note: str, note_duration: float | str = 1 / 4, volume: float = None):
334+
def play(self, note: str, note_duration: float | str = 1 / 4, volume: float = None) -> bytes:
342335
"""
343336
Play a musical note for a specified duration and volume.
344337
Args:
345338
note (str): The musical note to play (e.g., 'A4', 'C#5', 'REST').
346339
note_duration (float | str): Duration of the note as a float (like 1/4, 1/8) or a symbol ('W', 'H', 'Q', etc.).
347340
volume (float, optional): Volume level (0.0 to 1.0). If None, uses master volume.
341+
Returns:
342+
bytes: The audio block of the played note (float32).
348343
"""
349344
duration = self._note_duration(note_duration)
350345
frequency = self._get_note(note)
@@ -353,30 +348,34 @@ def play(self, note: str, note_duration: float | str = 1 / 4, volume: float = No
353348
volume = self._master_volume
354349
data = self._wave_gen.generate_block(float(frequency), duration, volume)
355350
data = self._apply_sound_effects(data, frequency)
356-
self._output_device.play(data, block_on_queue=False)
351+
return self._to_bytes(data)
357352

358-
def play_tone(self, note: str, duration: float = 0.25, volume: float = None):
353+
def play_tone(self, note: str, duration: float = 0.25, volume: float = None) -> bytes:
359354
"""
360355
Play a musical note for a specified duration and volume.
361356
Args:
362357
note (str): The musical note to play (e.g., 'A4', 'C#5', 'REST').
363358
duration (float): Duration of the note as a float in seconds.
364359
volume (float, optional): Volume level (0.0 to 1.0). If None, uses master volume.
360+
Returns:
361+
bytes: The audio block of the played note (float32).
365362
"""
366363
frequency = self._get_note(note)
367364
if frequency is not None and frequency >= 0.0 and duration > 0.0:
368365
if volume is None:
369366
volume = self._master_volume
370367
data = self._wave_gen.generate_block(float(frequency), duration, volume)
371368
data = self._apply_sound_effects(data, frequency)
372-
self._output_device.play(data, block_on_queue=False)
369+
return self._to_bytes(data)
373370

374-
def play_abc(self, abc_string: str, volume: float = None):
371+
def play_abc(self, abc_string: str, volume: float = None) -> Iterable[bytes]:
375372
"""
376373
Play a sequence of musical notes defined in ABC notation.
377374
Args:
378375
abc_string (str): ABC notation string defining the sequence of notes.
379376
volume (float, optional): Volume level (0.0 to 1.0). If None, uses master volume.
377+
Returns:
378+
Iterable[bytes]: An iterable yielding the audio blocks of the played notes (float32).
380379
"""
381380
if not abc_string or abc_string.strip() == "":
382381
return
@@ -388,4 +387,136 @@ def play_abc(self, abc_string: str, volume: float = None):
388387
if frequency is not None and frequency >= 0.0:
389388
data = self._wave_gen.generate_block(float(frequency), duration, volume)
390389
data = self._apply_sound_effects(data, frequency)
391-
self._output_device.play(data, block_on_queue=False)
390+
yield self._to_bytes(data)
391+
392+
393+
@brick
394+
class SoundGenerator(SoundGeneratorStreamer):
395+
def __init__(
396+
self,
397+
output_device: Speaker = None,
398+
bpm: int = 120,
399+
time_signature: tuple = (4, 4),
400+
octaves: int = 8,
401+
wave_form: str = "sine",
402+
master_volume: float = 1.0,
403+
sound_effects: list = None,
404+
):
405+
"""Initialize the SoundGenerator.
406+
Args:
407+
output_device (Speaker, optional): The output device to play sound through.
408+
wave_form (str): The type of wave form to generate. Supported values
409+
are "sine" (default), "square", "triangle" and "sawtooth".
410+
bpm (int): The tempo in beats per minute for note duration calculations.
411+
master_volume (float): The master volume level (0.0 to 1.0).
412+
octaves (int): Number of octaves to generate notes for (starting from octave
413+
0 up to octaves-1).
414+
sound_effects (list, optional): List of sound effect instances to apply to the audio
415+
signal (e.g., [SoundEffect.adsr()]). See SoundEffect class for available effects.
416+
"""
417+
418+
super().__init__(
419+
bpm=bpm,
420+
time_signature=time_signature,
421+
octaves=octaves,
422+
wave_form=wave_form,
423+
master_volume=master_volume,
424+
sound_effects=sound_effects,
425+
)
426+
427+
if output_device is None:
428+
self._self_created_device = True
429+
self._output_device = Speaker(sample_rate=self.SAMPLE_RATE, format="FLOAT_LE")
430+
else:
431+
self._self_created_device = False
432+
self._output_device = output_device
433+
434+
def start(self):
435+
if self._self_created_device:
436+
self._output_device.start(notify_if_started=False)
437+
438+
def stop(self):
439+
if self._self_created_device:
440+
self._output_device.stop()
441+
442+
def set_master_volume(self, volume: float):
443+
"""
444+
Set the master volume level.
445+
Args:
446+
volume (float): Volume level (0.0 to 1.0).
447+
"""
448+
super().set_master_volume(volume)
449+
450+
def set_effects(self, effects: list):
451+
"""
452+
Set the list of sound effects to apply to the audio signal.
453+
Args:
454+
effects (list): List of sound effect instances (e.g., [SoundEffect.adsr()]).
455+
"""
456+
super().set_effects(effects)
457+
458+
def play_polyphonic(self, notes: list[list[tuple[str, float]]], as_tone: bool = False, volume: float = None):
459+
"""
460+
Play multiple sequences of musical notes simultaneously (poliphony).
461+
It is possible to play multi track music by providing a list of sequences,
462+
where each sequence is a list of tuples (note, duration).
463+
Duration is in notes fractions (e.g., 1/4 for quarter note).
464+
Args:
465+
notes (list[list[tuple[str, float]]]): List of sequences, each sequence is a list of tuples (note, duration).
466+
as_tone (bool): If True, play as tones, considering duration in seconds
467+
volume (float, optional): Volume level (0.0 to 1.0). If None, uses master volume.
468+
"""
469+
blk = super().play_polyphonic(notes, as_tone, volume)
470+
try:
471+
self._output_device.play(blk, block_on_queue=False)
472+
except Exception as e:
473+
print(f"Error playing multiple sequences: {e}")
474+
475+
def play_chord(self, notes: list[str], note_duration: float | str = 1 / 4, volume: float = None):
476+
"""
477+
Play a chord consisting of multiple musical notes simultaneously for a specified duration and volume.
478+
Args:
479+
notes (list[str]): List of musical notes to play (e.g., ['A4', 'C#5', 'E5']).
480+
note_duration (float | str): Duration of the chord as a float (like 1/4, 1/8) or a symbol ('W', 'H', 'Q', etc.).
481+
volume (float, optional): Volume level (0.0 to 1.0). If None, uses master volume.
482+
"""
483+
blk = super().play_chord(notes, note_duration, volume)
484+
try:
485+
self._output_device.play(blk, block_on_queue=False)
486+
except Exception as e:
487+
print(f"Error playing chord {notes}: {e}")
488+
489+
def play(self, note: str, note_duration: float | str = 1 / 4, volume: float = None):
490+
"""
491+
Play a musical note for a specified duration and volume.
492+
Args:
493+
note (str): The musical note to play (e.g., 'A4', 'C#5', 'REST').
494+
note_duration (float | str): Duration of the note as a float (like 1/4, 1/8) or a symbol ('W', 'H', 'Q', etc.).
495+
volume (float, optional): Volume level (0.0 to 1.0). If None, uses master volume.
496+
"""
497+
data = super().play(note, note_duration, volume)
498+
self._output_device.play(data, block_on_queue=False)
499+
500+
def play_tone(self, note: str, duration: float = 0.25, volume: float = None):
501+
"""
502+
Play a musical note for a specified duration and volume.
503+
Args:
504+
note (str): The musical note to play (e.g., 'A4', 'C#5', 'REST').
505+
duration (float): Duration of the note as a float in seconds.
506+
volume (float, optional): Volume level (0.0 to 1.0). If None, uses master volume.
507+
"""
508+
data = super().play_tone(note, duration, volume)
509+
self._output_device.play(data, block_on_queue=False)
510+
511+
def play_abc(self, abc_string: str, volume: float = None):
512+
"""
513+
Play a sequence of musical notes defined in ABC notation.
514+
Args:
515+
abc_string (str): ABC notation string defining the sequence of notes.
516+
volume (float, optional): Volume level (0.0 to 1.0). If None, uses master volume.
517+
"""
518+
if not abc_string or abc_string.strip() == "":
519+
return
520+
player = super().play_abc(abc_string, volume)
521+
for data in player:
522+
self._output_device.play(data, block_on_queue=False)

0 commit comments

Comments
 (0)