# SPDX-FileCopyrightText: 2020-2026 ReproNim Team <info@repronim.org>
#
# SPDX-License-Identifier: MIT
"""
Audiocodes module for reprostim, provides functionality to
generate and parse QR-like audiocodes to be included in
psychopy based scripts.
"""
import importlib
import logging
import os
import tempfile
import time
from datetime import datetime
from enum import Enum
import numpy as np
from reedsolo import RSCodec
from scipy.io import wavfile
from scipy.io.wavfile import read, write
# optionally: import sounddevice as sd
# `import importlib` alone does not guarantee that the `importlib.util`
# submodule is loaded, so import it explicitly before calling find_spec().
import importlib.util  # noqa: E402

# `sounddevice` is an optional dependency: bind the module to `sd` when it
# can be found, otherwise leave `sd` as None.
sd = (
    importlib.import_module("sounddevice")
    if importlib.util.find_spec("sounddevice")
    else None
)

# setup logging; the level is taken from REPROSTIM_LOG_LEVEL (default INFO)
logger = logging.getLogger(__name__)
logger.setLevel(os.environ.get("REPROSTIM_LOG_LEVEL", "INFO"))
######################################
# Setup psychopy audio library
[docs]
class AudioLib(str, Enum):
    """
    Names of the audio backends that can be selected for playback.

    Members are also `str` instances, so they compare equal to their
    plain string values.
    """

    #: PsychoPy `SoundDevice` audio lib
    PSYCHOPY_SOUNDDEVICE = "psychopy_sounddevice"

    #: PsychoPy `SoundPTB` audio lib
    PSYCHOPY_PTB = "psychopy_ptb"

    #: `sounddevice` audio lib, see more at:
    #: http://python-sounddevice.readthedocs.io/
    SOUNDDEVICE = "sounddevice"
# Audio backend selected through the REPROSTIM_AUDIO_LIB environment
# variable; falls back to the PsychoPy sounddevice backend.
_audio_lib = os.environ.get("REPROSTIM_AUDIO_LIB", AudioLib.PSYCHOPY_SOUNDDEVICE)

# optionally import psychopy
try:
    from psychopy import prefs  # noqa: E402

    # skip setup under sphinx/RTD
    if os.getenv("REPROSTIM_DOCS") != "True":
        # default backend, overridden below when PTB is requested
        prefs.hardware["audioLib"] = ["sounddevice"]
        if _audio_lib == AudioLib.PSYCHOPY_SOUNDDEVICE:
            logger.debug("Set psychopy audio library: sounddevice")
            prefs.hardware["audioLib"] = ["sounddevice"]
        elif _audio_lib == AudioLib.PSYCHOPY_PTB:
            logger.debug("Set psychopy audio library: ptb")
            prefs.hardware["audioLib"] = ["ptb"]

        # Exact comparison on purpose: `x in (member)` without a trailing
        # comma is a substring test against the str-valued enum member, which
        # also matched "sounddevice" and the empty string.
        if _audio_lib == AudioLib.PSYCHOPY_SOUNDDEVICE:
            # PsychoPy 2025+ requires explicit plugin loading for sounddevice
            # Note: Review this code once psychopy-sounddevice will be back
            # into PsychoPy as reported in
            # https://github.com/psychopy/psychopy-sounddevice/issues/5
            try:
                from psychopy import plugins

                plugins.loadPlugin("psychopy-sounddevice")
                logger.debug(f"Installed psychopy plugins: {plugins.listPlugins()}")
                logger.debug(
                    f"Loaded psychopy plugins: {plugins.listPlugins(which='loaded')}"
                )
                if "psychopy-sounddevice" not in plugins.listPlugins(which="loaded"):
                    # loadPlugin silently failed (e.g. PsychoPy 2025
                    # compatibility issue) - temporary workaround: register
                    # the backend manually
                    logger.warning(
                        "psychopy-sounddevice not loaded after loadPlugin(), "
                        "attempting manual import workaround"
                    )
                    import psychopy_sounddevice
                    from psychopy import sound as _sound

                    _sound.backend_sounddevice = psychopy_sounddevice
                    importlib.reload(_sound)
                    logger.debug(
                        "Manually registered psychopy_sounddevice backend and reloaded "
                        "psychopy.sound"
                    )
                else:
                    logger.debug("Loaded psychopy-sounddevice plugin")
            except Exception as e:
                # best effort: plugin problems are reported, not fatal
                logger.warning(f"Could not load psychopy-sounddevice plugin: {e}")

        from psychopy import core, sound  # noqa: E402
        from psychtoolbox import audio  # noqa: E402
except ImportError:
    # logger.warn() is a deprecated alias of logger.warning()
    logger.warning(
        "psychopy module not found, if necessary "
        "install reprostim with [all] extra dependencies"
    )
######################################
# Audio code/qr helper functions
[docs]
def bit_enumerator(data):
    """
    Yield the individual bits of the given audio data.

    A `DataMessage` is first converted to bytes with its `encode()` method.
    A string is read character by character and must consist of `0` and `1`
    only. Bytes are walked byte by byte, most significant bit first.

    :param data: The input data containing bits.
    :type data: str | bytes | DataMessage

    :yield: The extracted bits as integers (`0` or `1`).
    :rtype: int

    :raises ValueError: If a string contains characters other than `0` and `1`.
    :raises TypeError: If the input data type is not supported.
    """
    payload = data.encode() if isinstance(data, DataMessage) else data

    if isinstance(payload, str):
        for symbol in payload:
            if symbol != "0" and symbol != "1":
                raise ValueError("String must only contain '0' and '1'.")
            yield int(symbol)
        return

    if isinstance(payload, bytes):
        for octet in payload:
            # shifts 7, 6, ..., 0 walk the byte from MSB down to LSB
            yield from ((octet >> shift) & 1 for shift in range(7, -1, -1))
        return

    raise TypeError(
        "Data must be either a string or bytes. Got: " + str(type(payload))
    )
[docs]
def bits_to_bytes(detected_bits):
    """
    Pack a sequence of bits (`0` and `1`) into a bytes object.

    The bits are consumed in groups of eight; within each group the first
    bit becomes the most significant bit of the resulting byte.

    :param detected_bits: A list containing only `0` and `1`. The length
                          of the list must be a multiple of 8.
    :type detected_bits: list of int

    :returns: A bytes object representing the converted bit sequence.
    :rtype: bytes

    :raises ValueError: If the length of `detected_bits` is not a multiple of 8.

    Example
    -------
    >>> bits_to_bytes([1, 0, 0, 0, 0, 0, 0, 1])
    b'\\x81'
    """
    total = len(detected_bits)
    if total % 8:
        raise ValueError(
            f"Detected bits array must be aligned to 8 bits. Length={len(detected_bits)}"
        )

    packed = bytearray()
    offset = 0
    while offset < total:
        octet = 0
        # fold the next eight bits into one value, MSB first
        for flag in detected_bits[offset : offset + 8]:  # noqa: E203
            octet = (octet << 1) | flag
        packed.append(octet)
        offset += 8
    return bytes(packed)
[docs]
def crc8(data: bytes, polynomial: int = 0x31, init_value: int = 0x00) -> int:
    """
    Compute an 8-bit CRC over a byte sequence.

    Each input byte is XOR-ed into the running checksum, which is then
    shifted left eight times; whenever the top bit was set before a shift,
    the polynomial is XOR-ed in. The checksum is kept within 8 bits.

    :param data: The input data for which to compute the CRC-8 checksum.
    :type data: bytes

    :param polynomial: Polynomial to calculate CRC (default: 0x31).
    :type polynomial: int, optional

    :param init_value: The initial CRC value (default: 0x00).
    :type init_value: int, optional

    :returns: The computed 8-bit CRC checksum.
    :rtype: int

    Example
    -------
    >>> crc8(b"\\x01")
    49
    >>> crc8(b"\\x01", polynomial=0x07)
    7
    """
    checksum = init_value
    for octet in data:
        checksum ^= octet
        for _ in range(8):
            shifted = checksum << 1
            if checksum & 0x80:
                # top bit is about to be shifted out: apply the polynomial
                shifted ^= polynomial
            checksum = shifted & 0xFF
    return checksum
######################################
# Constants
[docs]
class AudioCodec(str, Enum):
    """
    Names of the supported audiocode modulation schemes.

    Members are also `str` instances, so they compare equal to their
    plain string values.
    """

    #: Frequency Shift Keying (FSK), where binary data is
    #: encoded as two different frequencies (f0 and f1)
    #: with a fixed bit duration (baud rate or bit_rate).
    FSK = "FSK"

    #: Numerical Frequency Encoding (NFE), where numbers
    #: are mapped directly to specific frequencies. This
    #: codec can encode only certain numeric hash values.
    NFE = "NFE"
######################################
# Classes
[docs]
class DataMessage:
    """
    Class representing an audio data message in big-endian format,
    optionally protected with Reed-Solomon error correction.

    The (unprotected) message is structured as follows:
        - 1st byte: CRC-8 checksum of the data
        - 2nd byte: Length of the data
        - 3rd byte onward: The data itself
    """

    def __init__(self):
        """
        Initializes a new DataMessage instance.

        Attributes are set to default values: empty data, length 0, and CRC-8 0.
        Reed-Solomon error correction is enabled by default and uses
        `RSCodec(4)`.
        """
        self.value: bytes = b""
        self.length: int = 0
        self.crc8: int = 0
        self.use_ecc: bool = True
        self.rsc = RSCodec(4)

    def decode(self, data: bytes):
        """
        Decodes the given bytes into this message.

        If error correction is enabled, the input is first passed through the
        Reed-Solomon decoder. The result is then split into CRC-8, length and
        data, and the CRC-8 of the data is verified.

        :param data: The encoded byte data to decode, including the CRC-8 checksum
                     and length.
        :type data: bytes

        :raises ValueError: If the message is shorter than the 2-byte header, or
                            if the CRC-8 checksum does not match the computed
                            checksum of the data.
        """
        if self.use_ecc:
            dec, _dec_full, _errata_pos_all = self.rsc.decode(data)
            data = bytes(dec)

        # Guard the header access: indexing a truncated message would
        # otherwise surface as IndexError instead of the documented ValueError.
        if len(data) < 2:
            raise ValueError(
                f"Message too short: expected at least 2 header bytes, "
                f"but got {len(data)}"
            )

        self.crc8 = data[0]
        self.length = data[1]
        self.value = data[2:]
        n = crc8(self.value)
        if self.crc8 != n:
            raise ValueError(f"CRC-8 checksum mismatch: {self.crc8} <-> {n}")

    def encode(self) -> bytes:
        """
        Encodes the data message as CRC-8 byte, length byte and data, and
        applies Reed-Solomon error correction if enabled.

        :returns: The encoded byte sequence with the CRC-8 checksum, length,
                  and optionally Reed-Solomon error correction.
        :rtype: bytes
        """
        logger.debug("size info")
        logger.debug(f" - data : {len(self.value)} bytes, {self.value}")
        b: bytes = bytes([self.crc8, self.length]) + self.value
        logger.debug(f" - message : {len(b)} bytes, {b}")
        if self.use_ecc:
            b = bytes(self.rsc.encode(b))
            logger.debug(f" - ecc : {len(b)} bytes, {b}")
        return b

    def get_bytes(self) -> bytes:
        """
        Returns the data value as a bytes object.

        :returns: The data stored in the message.
        :rtype: bytes
        """
        return self.value

    def get_str(self) -> str:
        """
        Returns the data value as a UTF-8 decoded string.

        :returns: The decoded string from the message data.
        :rtype: str
        """
        return self.value.decode("utf-8")

    def _read_uint(self, size: int) -> int:
        """Return the data as a big-endian unsigned int of exactly `size` bytes."""
        if len(self.value) != size:
            raise ValueError(
                f"Data length for uint{size * 8} must be {size} bytes, "
                f"but was {len(self.value)}"
            )
        return int.from_bytes(self.value, "big")

    def get_uint(self) -> int:
        """
        Returns the stored data as an unsigned integer.

        The width is chosen from the data length: 2, 4, or 8 bytes are read
        as `uint16`, `uint32`, or `uint64` respectively.

        :returns: The decoded unsigned integer.
        :rtype: int

        :raises ValueError: If the data length is not 2, 4, or 8 bytes.
        """
        c = len(self.value)
        if c == 2:
            return self.get_uint16()
        if c == 4:
            return self.get_uint32()
        if c == 8:
            return self.get_uint64()
        raise ValueError(f"Data length must be 2, 4, or 8 bytes, " f"but was {c}")

    def get_uint16(self) -> int:
        """
        Returns the stored data as a 16-bit unsigned integer (uint16).

        :returns: The decoded 16-bit unsigned integer.
        :rtype: int

        :raises ValueError: If the data length is not 2 bytes.
        """
        return self._read_uint(2)

    def get_uint32(self) -> int:
        """
        Returns the stored data as a 32-bit unsigned integer (uint32).

        :returns: The decoded 32-bit unsigned integer.
        :rtype: int

        :raises ValueError: If the data length is not 4 bytes.
        """
        return self._read_uint(4)

    def get_uint64(self) -> int:
        """
        Returns the stored data as a 64-bit unsigned integer (uint64).

        :returns: The decoded 64-bit unsigned integer.
        :rtype: int

        :raises ValueError: If the data length is not 8 bytes.
        """
        return self._read_uint(8)

    def set_bytes(self, data: bytes):
        """
        Sets the bytes data value and refreshes the stored length and CRC-8.

        :param data: The data to be stored in the message.
        :type data: bytes
        """
        self.value = data
        self.length = len(data)
        self.crc8 = crc8(data)

    def set_str(self, s: str):
        """
        Sets the string data value (stored as UTF-8 bytes).

        :param s: The string to be stored in the message.
        :type s: str
        """
        self.set_bytes(s.encode("utf-8"))

    def set_uint16(self, i: int):
        """
        Sets the 16-bit unsigned integer data value (2 bytes, big-endian).

        :param i: The uint16 to be stored in the message.
        :type i: int
        """
        self.set_bytes(i.to_bytes(2, "big"))

    def set_uint32(self, i: int):
        """
        Sets the 32-bit unsigned integer data value (4 bytes, big-endian).

        :param i: The uint32 to be stored in the message.
        :type i: int
        """
        self.set_bytes(i.to_bytes(4, "big"))

    def set_uint64(self, i: int):
        """
        Sets the 64-bit unsigned integer data value (8 bytes, big-endian).

        :param i: The uint64 to be stored in the message.
        :type i: int
        """
        self.set_bytes(i.to_bytes(8, "big"))
[docs]
class AudioCodeInfo:
    """
    Plain record describing a generated audiocode.

    All attributes start as `None` and are filled in by whoever produces
    the audiocode.
    """

    def __init__(self):
        #: The codec used for the audiocode (e.g., `FSK`, `NFE`).
        self.codec = None
        #: The frequency corresponding to logic `1` in FSK modulation.
        self.f1 = None
        #: The frequency corresponding to logic `0` in FSK modulation.
        self.f0 = None
        #: Frequency difference in Hz used by NFE codec.
        self.nfe_df = None
        #: The sample rate of the audio signal
        self.sample_rate = None
        #: Bit duration in seconds for FSK audiocode.
        self.bit_duration = None
        #: Duration of NFE signal in seconds.
        self.nfe_duration = None
        #: Frequency for NFE modulation when this codec is used.
        self.nfe_freq = None
        #: The number of bits in the encoded data.
        self.bit_count = None
        #: The volume level of the audio signal on range 0..1 .
        self.volume = None
        #: The total duration of the audio signal in seconds.
        self.duration = None
        #: Pre-delay time before the audio code starts in seconds.
        self.pre_delay = None
        #: Post-delay time after the audio code ends in seconds.
        self.post_delay = None

    def __str__(self):
        # Fragments are joined verbatim; separators are part of each fragment.
        fragments = (
            f"codec={self.codec}, ",
            f"f1={self.f1}, ",
            f"f0={self.f0}, ",
            f"nfe_df={self.nfe_df}, ",
            f"rate={self.sample_rate}, ",
            f"bit_duration={self.bit_duration}, ",
            f"bit_count={self.bit_count}, ",
            f"nfe_freq={self.nfe_freq}, ",
            f"volume={self.volume}, ",
            f"duration={self.duration},",
            f"pre_delay={self.pre_delay}, ",
            f"post_delay={self.post_delay}",
        )
        return "AudioCodeInfo(" + "".join(fragments) + ")"
[docs]
class AudioCodeEngine:
    """
    Class to generate and parse QR-like audiocodes with Frequency Shift Keying
    (FSK) or Numerical Frequency Encoding (NFE) modulations.

    This class handles the encoding and decoding of audio data using FSK or
    NFE modulation schemes, where data is represented as special audio tones.

    :param codec: The codec used for encoding the audio data. Defaults to
        `AudioCodec.FSK`.
    :type codec: AudioCodec

    :param f0: The frequency for representing a binary `0` in FSK modulation.
        Defaults to 1000 Hz.
    :type f0: int

    :param f1: The frequency for representing a binary `1` in FSK modulation.
        Defaults to 5000 Hz.
    :type f1: int

    :param nfe_df: The frequency difference used for Numerical Frequency Encoding
        (NFE). Defaults to 100 Hz.
    :type nfe_df: int

    :param sample_rate: The sample rate used for audio generation. Defaults
        to 44100 Hz.
    :type sample_rate: int

    :param bit_duration: The duration of each bit in FSK modulation. Defaults
        to 0.0070 seconds.
    :type bit_duration: float

    :param nfe_duration: The duration of the tone in NFE encoding.
        Defaults to 0.5 seconds.
    :type nfe_duration: float

    :param volume: The volume level for the audio output in range 0.0..1.0.
        Defaults to 0.80.
    :type volume: float

    :param pre_delay: The pre-delay before the audio signal starts.
        Defaults to 0.1 seconds.
    :type pre_delay: float

    :param pre_f: The frequency of the tone emitted during the pre-delay.
        Defaults to 0 Hz, which produces silence.
    :type pre_f: int

    :param post_delay: The post-delay after the audio signal ends. Defaults
        to 0.1 seconds.
    :type post_delay: float

    :param post_f: The frequency of the tone emitted during the post-delay.
        Defaults to 0 Hz, which produces silence.
    :type post_f: int

    :raises ValueError: If `volume` is outside the range 0.0..1.0.
    """

    def __init__(
        self,
        codec=AudioCodec.FSK,
        f0=1000,
        f1=5000,
        nfe_df=100,  # used only in NFE
        sample_rate=44100,
        bit_duration=0.0070,  # used only in FSK
        nfe_duration=0.5,  # used only in NFE
        volume=0.80,
        pre_delay=0.1,
        pre_f=0,  # 1780
        post_delay=0.1,
        post_f=0,  # 3571
    ):
        self.codec = codec
        self.f0 = f0
        self.f1 = f1
        self.nfe_df = nfe_df
        self.sample_rate = sample_rate
        self.bit_duration = bit_duration
        self.nfe_duration = nfe_duration
        if volume < 0.0 or volume > 1.0:
            raise ValueError("Volume must be between 0.0 and 1.0.")
        self.volume = volume
        self.pre_delay = pre_delay
        self.pre_f = pre_f
        self.post_delay = post_delay
        self.post_f = post_f

    def generate_sin(self, freq_hz, duration_sec):
        """
        Generates a sine wave audio signal with a specified frequency and duration,
        scaled by the engine volume.

        :param freq_hz: The frequency of the sine wave in Hz.
        :type freq_hz: float

        :param duration_sec: The duration of the sine wave signal in seconds.
        :type duration_sec: float

        :return: A numpy array representing the generated sine wave audio signal.
        :rtype: numpy.ndarray
        """
        t = np.linspace(
            0, duration_sec, int(self.sample_rate * duration_sec), endpoint=False
        )
        return self.volume * np.sin(2 * np.pi * freq_hz * t)

    def _frame_signal(self, signal):
        """Surround `signal` with the pre/post delay tones and normalize it."""
        if self.pre_delay > 0 or self.post_delay > 0:
            empty = np.array([])
            pre_signal = (
                self.generate_sin(self.pre_f, self.pre_delay)
                if self.pre_delay > 0
                else empty
            )
            post_signal = (
                self.generate_sin(self.post_f, self.post_delay)
                if self.post_delay > 0
                else empty
            )
            signal = np.concatenate((pre_signal, signal, post_signal))

        # Normalize the signal for 100% volume. An empty or all-zero signal
        # is left untouched: np.max() fails on empty input and dividing by a
        # zero peak would fill the signal with NaN.
        if self.volume == 1.0 and signal.size:
            peak = np.max(np.abs(signal))
            if peak > 0:
                signal = signal / peak
        return signal

    def generate_fsk(self, data) -> "tuple[np.ndarray, AudioCodeInfo]":
        """
        Generates `FSK` audio signal from the given data.

        Every bit produced by `bit_enumerator(data)` becomes a tone of
        `bit_duration` seconds at `f1` (bit 1) or `f0` (bit 0).

        :param data: The data to be encoded into the FSK signal; anything
                     accepted by `bit_enumerator`.
        :type data: str | bytes | DataMessage

        :return: A tuple with FSK audio signal and related audiocode information.
        :rtype: tuple(np.ndarray, AudioCodeInfo)
        """
        logger.debug(
            f"audio config : f1={self.f1} Hz, f0={self.f0} Hz, "
            f"rate={self.sample_rate} Hz, "
            f"bit duration={self.bit_duration} sec, "
            f"volume={self.volume}"
        )

        t = np.linspace(
            0,
            self.bit_duration,
            int(self.sample_rate * self.bit_duration),
            endpoint=False,
        )
        # Both tones are loop invariants: compute them once.
        tone0 = self.volume * np.sin(2 * np.pi * self.f0 * t)
        tone1 = self.volume * np.sin(2 * np.pi * self.f1 * t)

        bits = list(bit_enumerator(data))
        c: int = len(bits)
        sb: str = "".join(str(bit) for bit in bits)

        # Single concatenation instead of growing the array once per bit;
        # the leading empty array keeps the call valid when there are no bits.
        fsk_signal = np.concatenate(
            [np.array([])] + [tone1 if bit == 1 else tone0 for bit in bits]
        )
        fsk_signal = self._frame_signal(fsk_signal)

        sci: AudioCodeInfo = AudioCodeInfo()
        sci.codec = AudioCodec.FSK
        sci.f1 = self.f1
        sci.f0 = self.f0
        sci.sample_rate = self.sample_rate
        sci.bit_duration = self.bit_duration
        sci.bit_count = c
        sci.volume = self.volume
        sci.duration = c * self.bit_duration + self.pre_delay + self.post_delay
        sci.pre_delay = self.pre_delay
        sci.post_delay = self.post_delay

        logger.debug(f"audio raw bits: count={c}, {sb}")
        logger.debug(f"audio duration: {sci.duration:.6f} seconds")
        return (fsk_signal, sci)

    def generate_nfe(self, data) -> "tuple[np.ndarray, AudioCodeInfo]":
        """
        Generates `NFE` audio signal from the given data.

        The number returned by `data.get_uint()` is mapped onto one of the
        frequencies `f0, f0 + nfe_df, ...` up to `f1` (modulo the number of
        available steps) and emitted as a single tone of `nfe_duration` seconds.

        :param data: The message to encode; must provide `get_uint()`.
        :type data: DataMessage

        :return: A tuple with NFE audio signal and related audiocode information.
        :rtype: tuple(np.ndarray, AudioCodeInfo)
        """
        n = data.get_uint()
        # number of distinct frequencies available between f0 and f1
        c = int((self.f1 - self.f0) / self.nfe_df) + 1
        freq = self.f0 + (n % c) * self.nfe_df
        logger.debug(f" n={n}, c={c}, freq={freq}")

        nfe_signal = self._frame_signal(self.generate_sin(freq, self.nfe_duration))

        sci: AudioCodeInfo = AudioCodeInfo()
        sci.codec = AudioCodec.NFE
        sci.f1 = self.f1
        sci.f0 = self.f0
        sci.nfe_df = self.nfe_df
        sci.sample_rate = self.sample_rate
        sci.nfe_duration = self.nfe_duration
        sci.nfe_freq = freq
        sci.volume = self.volume
        sci.duration = self.nfe_duration + self.pre_delay + self.post_delay
        sci.pre_delay = self.pre_delay
        sci.post_delay = self.post_delay

        logger.debug(f"audio duration: {sci.duration:.6f} seconds")
        return nfe_signal, sci

    def generate(self, data) -> "tuple[np.ndarray, AudioCodeInfo]":
        """
        Generates an audio signal based on the specified codec
        and provided data.

        Dispatches to `generate_fsk` or `generate_nfe` according to the
        engine codec.

        :param data: The data to be encoded into the audio signal.
                     The format of the data depends on the selected codec.
        :type data: str | bytes | DataMessage

        :return: A tuple containing the generated audio signal and related
                 audiocode information.
        :rtype: tuple(np.ndarray, AudioCodeInfo)

        :raises ValueError: If the codec is not supported.
        """
        if self.codec == AudioCodec.FSK:
            return self.generate_fsk(data)
        if self.codec == AudioCodec.NFE:
            return self.generate_nfe(data)
        raise ValueError(f"Unsupported codec: {self.codec}")

    # play audio data with sounddevice
    def play_data_sd(self, data):
        """
        Generates the audio signal for `data` and plays it with the
        `sounddevice` library, blocking until playback has finished.

        :param data: The data to be encoded into the audio signal and played.
        :type data: str | bytes | DataMessage

        :return: None
        """
        ts = time.perf_counter()
        fsk_signal, sci = self.generate(data)
        ts = time.perf_counter() - ts
        logger.debug(f"generate time : {ts:.6f} seconds")

        ts = time.perf_counter()
        # Play the signal
        sd.play(fsk_signal, samplerate=self.sample_rate)
        # Wait until audio is finished playing
        sd.wait()
        ts = time.perf_counter() - ts
        logger.debug(f"play time : {ts:.6f} seconds")

    def save(self, data, filename):
        """
        Generates and saves the audio signal to `*.wav` file as 16-bit PCM.

        :param data: The data to be encoded into the audio signal.
        :type data: str | bytes | DataMessage

        :param filename: The name of the `*.wav` file where the
                         signal will be saved.
        :type filename: str

        :return: The AudioCodeInfo object containing information
                 about the saved audiocode.
        :rtype: AudioCodeInfo
        """
        fsk_signal, sci = self.generate(data)

        # Scale the float signal to the int16 range before writing
        write(filename, self.sample_rate, (fsk_signal * 32767).astype(np.int16))
        return sci

    def parse(self, filename):
        """
        Parses the audiocode file to detect and decode stored FSK data.

        The audio is cut into chunks of one bit duration; the dominant
        frequency of each chunk is compared with `f1` and `f0` (within 50 Hz).
        Chunks matching neither frequency are skipped.

        :param filename: The name of the `*.wav` file to parse.
        :type filename: str

        :return: The decoded data as bytes.
        :rtype: bytes

        :raises ValueError: If audiocode is not possible to parse nor decode.
        """
        # Read the WAV file
        rate, data = wavfile.read(filename)

        # Check if audio is stereo and convert to mono if necessary
        if len(data.shape) > 1:
            data = data.mean(axis=1)

        # Chunk size follows the sample rate of the file being read, so the
        # chunking stays consistent with the FFT frequency axis below even
        # when the file rate differs from the engine's own sample rate.
        samples_per_bit = int(rate * self.bit_duration)
        if samples_per_bit <= 0:
            raise ValueError(
                f"Bit duration {self.bit_duration} is too short for "
                f"sample rate {rate}"
            )

        detected_bits = []

        # Analyze the audio in whole chunks; a trailing partial chunk is ignored
        for i in range(0, len(data) - samples_per_bit + 1, samples_per_bit):
            chunk = data[i : i + samples_per_bit]  # noqa: E203

            # Perform FFT on the chunk
            fourier = np.fft.fft(chunk)
            frequencies = np.fft.fftfreq(len(fourier), 1 / rate)

            # Keep only the positive half of the spectrum
            half = len(frequencies) // 2
            positive_frequencies = frequencies[:half]
            positive_magnitudes = np.abs(fourier)[:half]

            # Find the peak frequency
            peak_freq = positive_frequencies[np.argmax(positive_magnitudes)]

            # Determine if the peak frequency corresponds to a '1' or '0'
            if abs(peak_freq - self.f1) < 50:  # Frequency for '1'
                detected_bits.append(1)
            elif abs(peak_freq - self.f0) < 50:  # Frequency for '0'
                detected_bits.append(0)

        dbg_bits: str = "".join([str(bit) for bit in detected_bits])
        logger.debug(f"detected bits : count={len(dbg_bits)}, {dbg_bits}")
        return bits_to_bytes(detected_bits)
######################################
# Public functions
[docs]
def beep(duration: float = 2.0, async_: bool = False):
    """
    Play a beep sound for the given duration.

    The beep is requested from `play_audio` as the sound named `"A"`.

    :param duration: The duration of the beep in seconds. Default is 2.0 seconds.
    :type duration: float

    :param async_: If True, play the sound asynchronously. Default is False.
    :type async_: bool
    """
    logger.debug(f"beep(duration={duration})")
    # async_ must go by keyword: the third positional parameter of
    # play_audio() is `volume`, so passing it positionally would set the
    # volume to True/False instead of selecting asynchronous playback.
    play_audio("A", duration, async_=async_)
[docs]
def list_audio_devices():
    """
    Log all available audio devices.

    The PsychoPy audio preferences, the devices reported by `sounddevice`
    (together with its default input/output devices) and the devices
    reported by `psychtoolbox` are written to the module logger at debug
    level. Nothing is returned.

    :returns: None
    :rtype: None

    Example
    -------
    >>> list_audio_devices()
    # Logs detailed information about available audio devices
    """
    logger.debug("list_audio_devices()")

    logger.debug("[psychopy]")
    hardware = prefs.hardware
    logger.debug(f"audioLib : {hardware['audioLib']}")
    logger.debug(f"audioDevice : {hardware['audioDevice']}")

    logger.debug("[sounddevice]")
    for idx, dev in enumerate(sd.query_devices()):  # all known devices
        logger.debug(f"device [{idx}] : {dev['name']}")
    defaults = sd.default.device  # current default input/output devices
    logger.debug(f"default in : {defaults[0]}")
    logger.debug(f"default out : {defaults[1]}")

    logger.debug("[psytoolbox]")
    for idx, dev in enumerate(audio.get_devices()):
        logger.debug(f"device [{idx}] : {dev}")

    logger.debug("[psychopy.backend_ptb]")
    # TODO: investigate why only single out device listed from
    # USB capture but default one is not shown
    # logger.debug(sound.backend_ptb.getDevices())
def _play_audio_psychopy(
    name: str,
    duration: float = None,
    volume: float = 0.8,
    sample_rate: int = 44100,
    async_: bool = False,
):
    """
    Play a sound through PsychoPy's `sound.Sound`.

    :param name: Value handed to `sound.Sound` as the sound to play.
    :param duration: Playback length in seconds; when falsy, no `secs`
                     argument is passed to `sound.Sound`.
    :param volume: Volume passed to `sound.Sound`.
    :param sample_rate: Sample rate passed to `sound.Sound`.
    :param async_: When False, block for the sound's duration after
                   starting playback.
    """
    logger.debug(
        f"_play_audio_psychopy(name={name}, duration={duration}, async_={async_})"
    )

    # Options shared by both variants; `secs` is added only when requested.
    options = {"sampleRate": sample_rate, "stereo": True, "volume": volume}
    if duration:
        options["secs"] = duration
    snd = sound.Sound(name, **options)

    logger.debug(f"Play audio '{snd.sound}' with psychopy {prefs.hardware['audioLib']}")
    snd.play()
    logger.debug(
        f" sampleRate={snd.sampleRate}, duration={snd.getDuration()}, "
        f"volume={snd.getVolume()}"
    )

    if async_:
        return
    logger.debug("Waiting for audio to finish playing...")
    core.wait(snd.getDuration())
    logger.debug(f"Audio '{snd.sound}' has finished playing.")
def _play_audio_sd(
    name: str,
    duration: float = None,
    volume: float = 0.8,
    sample_rate: int = 44100,
    async_: bool = False,
):
    """
    Play audio through the `sounddevice` library.

    When `name` is an existing file it is read as a WAV file and played at
    the sample rate stored in that file. Otherwise `name` itself is handed
    to `sounddevice` together with `sample_rate`.

    NOTE(review): `duration` and `volume` are accepted for signature parity
    with the PsychoPy player but are not applied here.

    :param name: Path of the WAV file to play.
    :param duration: Currently unused.
    :param volume: Currently unused.
    :param sample_rate: Sample rate used when `name` is not a readable file.
    :param async_: When False, block until playback has finished.
    """
    logger.debug(f"_play_audio_sd(name={name}, duration={duration}, async_={async_})")
    data = name
    rate = sample_rate
    if os.path.exists(name):
        # Use the file's own rate: playing it at an unrelated `sample_rate`
        # would change the speed and pitch of the audio.
        rate, signal = read(name)
        logger.debug(f"Read audio file: {name}, rate={rate}")
        # Convert from int16 to float32
        # signal = signal.astype(np.float32) / 32767
        data = signal
    sd.play(data, samplerate=rate)
    if not async_:
        # sd.play() returns immediately; honour the synchronous contract
        sd.wait()
[docs]
def play_audio(
    name: str,
    duration: float = None,
    volume: float = 0.8,
    sample_rate: int = 44100,
    async_: bool = False,
):
    """
    Play an audio file with the configured audio library.

    The call is logged and then forwarded, with all arguments, to the
    player matching the module-level audio library selection: the PsychoPy
    player for both PsychoPy backends, or the `sounddevice` player.

    :param name: The name of the audio file to play.
    :type name: str

    :param duration: The duration (in seconds) for which to play the audio.
                     If not specified, the audio will play in its entirety.
    :type duration: float, optional

    :param volume: The volume level to set for the audio. Should be a value
                   between 0.0 and 1.0, where 1.0 is the maximum volume.
                   Default is 0.8.
    :type volume: float, optional

    :param sample_rate: The sample rate (in Hz) for the audio playback.
                        Default is 44100.
    :type sample_rate: int, optional

    :param async_: Whether to play the audio asynchronously.
                   If set to `True`, the audio will play in the background.
                   Default is `False`.
    :type async_: bool, optional

    :raises ValueError: If the selected audio library is unsupported.

    Example
    -------
    >>> play_audio("sound.wav", duration=5, volume=0.5)
    # Plays the audio file "sound.wav" for 5 seconds at half-volume.
    """
    logger.info(
        f"play_audio: name={name}, duration={duration}, "
        f"volume={volume}, sample_rate={sample_rate}, "
        f"async_={async_}"
    )

    # Pick the player first, then make a single forwarding call.
    if _audio_lib == AudioLib.SOUNDDEVICE:
        player = _play_audio_sd
    elif _audio_lib in (AudioLib.PSYCHOPY_SOUNDDEVICE, AudioLib.PSYCHOPY_PTB):
        player = _play_audio_psychopy
    else:
        raise ValueError(f"Unsupported audio library: {_audio_lib}")
    player(name, duration, volume, sample_rate, async_)
[docs]
def save_audiocode(
    fname: str = None,
    code_uint16: int = None,
    code_uint32: int = None,
    code_uint64: int = None,
    code_str: str = None,
    code_bytes: bytes = None,
    code_duration: float = 0.5,
    codec: AudioCodec = AudioCodec.FSK,
    engine=None,
) -> "tuple[str, AudioCodeInfo]":
    """
    Save an audiocode to a file (`*.wav`).

    The code is wrapped into a `DataMessage` and written with the given
    engine. Exactly one code value is used, chosen in this order:
    `code_uint16`, `code_uint32`, `code_uint64`, `code_str`, `code_bytes`.

    :param fname: The name of the file where the audio code will be saved.
                  If not provided, a temporary file is created.
    :type fname: str, optional

    :param code_uint16: The audio code as a 16-bit unsigned integer.
    :type code_uint16: int, optional

    :param code_uint32: The audio code as a 32-bit unsigned integer.
    :type code_uint32: int, optional

    :param code_uint64: The audio code as a 64-bit unsigned integer.
    :type code_uint64: int, optional

    :param code_str: The audio code as a string.
    :type code_str: str, optional

    :param code_bytes: The audio code as bytes.
    :type code_bytes: bytes, optional

    :param code_duration: The duration of the audio code in seconds,
                          used only for NFE codec ATM. Default is 0.5
                          seconds.
    :type code_duration: float, optional

    :param codec: The audio codec to use for encoding the audio code.
                  Default is `AudioCodec.FSK`.
    :type codec: AudioCodec, optional

    :param engine: The encoding engine to use. If not specified, an
                   `AudioCodeEngine` is created using the provided
                   codec.
    :type engine: AudioCodeEngine, optional

    :returns: A tuple containing the file name where the audio code
              was saved and the corresponding `AudioCodeInfo` object.
    :rtype: tuple of (str, AudioCodeInfo)

    :raises ValueError: If no code data is provided.

    Example
    -------
    >>> save_audiocode(fname="code.wav", code_uint16=12345)
    ('code.wav', <AudioCodeInfo object>)
    """
    logger.debug(f"save_audiocode(fname={fname}...)")

    # Build the message first so that no temporary file is left behind when
    # no code data was supplied.
    data = DataMessage()
    if code_uint16 is not None:
        data.set_uint16(code_uint16)
    elif code_uint32 is not None:
        data.set_uint32(code_uint32)
    elif code_uint64 is not None:
        data.set_uint64(code_uint64)
    elif code_str:
        data.set_str(code_str)
    elif code_bytes:
        data.set_bytes(code_bytes)
    else:
        raise ValueError("No code data provided.")

    if not fname:
        # mkstemp() creates the file atomically, unlike the deprecated and
        # race-prone mktemp(); the descriptor is closed right away because
        # the engine reopens the file by name.
        fd, fname = tempfile.mkstemp(
            prefix=f"audiocode_{datetime.now().strftime('%Y%m%d_%H%M%S%f')}_",
            suffix=".wav",
        )
        os.close(fd)

    if engine is None:
        engine = AudioCodeEngine(codec=codec, nfe_duration=code_duration)
    sci: AudioCodeInfo = engine.save(data, fname)
    logger.debug(f" -> {fname}")
    return (fname, sci)