Source code for reprostim.qr.psychopy

# SPDX-FileCopyrightText: 2020-2026 ReproNim Team <info@repronim.org>
#
# SPDX-License-Identifier: MIT

"""
API to generate visual QR-codes and audio codes
and embed them into PsychoPy scripts for fMRI experiments
"""
import json
import os
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from time import time
from typing import Any

from psychopy import core, visual
from pydantic import BaseModel, Field, model_validator
from qrcode import QRCode

# All audio-related imports are deferred until audio is enabled


#######################################################
# Constants

#######################################################
# Aux Functions


[docs] def get_iso_time(t): """Converts a timestamp into an ISO 8601 formatted string with local timezone. This function takes a timestamp in seconds and converts it into a datetime object with local timezone. :param t: The timestamp to be converted in seconds. :type t: float :return: The ISO 8601 formatted string with local timezone. :rtype: str """ return datetime.fromtimestamp(t).astimezone().isoformat()
[docs] def get_times(t=None): """Get the current time and its ISO formatted string. This function retrieves the current time in seconds since the epoch and formats it into an ISO 8601 string with local timezone. :param t: Optional timestamp to be converted. If not provided, the current time will be used. :return: A tuple containing the current time in seconds and its ISO formatted string. :rtype: tuple[float, str] """ if t is None: t = time() return t, get_iso_time(t)
[docs] def to_json(obj) -> str: """Convert an object to a JSON string. This function checks if the provided object has a `model_dump_json` method (for Pydantic v2) and uses it to convert the object to a JSON string. If the method is not available, it falls back to using the standard `json.dumps` function. :param obj: The object to be converted to a JSON string. :type obj: Any :return: The JSON string representation of the object. :rtype: str """ if hasattr(obj, "model_dump_json"): # Pydantic v2 return obj.model_dump_json() return json.dumps(obj)
####################################################### # Classes # Enum for the event types
[docs] class EventType(str, Enum): """Enum for known event types for QR code.""" SESSION_START = "started" # ?? "session_start" """Session start event.""" SESSION_END = "session_end" """Session end event.""" SERIES_START = "series_begin" # ?? "series_start" """Series start event.""" SERIES_END = "series_end" """Series end event.""" MRI_TRIGGER_WAITING = "mri_trigger_waiting" """Waiting for MRI trigger event.""" MRI_TRIGGER_RECEIVED = "trigger" # ?? "mri_trigger_received" """MRI trigger received event."""
# original QR code pseudo-data class def _QrCode_mkrec(**kwargs): """Create a basic QR record dictionary.""" t, tstr = get_times() kwargs.update( { "time": t, "time_formatted": tstr, } ) return kwargs
[docs] class QrCode(dict): """Class to hold QR code data. Dict-based QR code record.""" def __init__(self, event: EventType, **kwargs: Any): if not isinstance(event, EventType): raise TypeError(f"event must be an EventType, got {type(event)}") t, tstr = get_times() # Inject mandatory fields kwargs.setdefault("time", t) kwargs.setdefault("time_formatted", tstr) kwargs["event"] = event # Apply keys if provided implicitly if "keys" in kwargs: self.apply_keys(kwargs.pop("keys")) super().__init__(kwargs) # Convenience properties @property def event(self) -> EventType: """Type of the event""" return self["event"] @property def time(self) -> float: """Time of the event, measured in seconds since the Epoch""" return self["time"] @property def time_formatted(self) -> str: """ISO 8601 time of the event""" return self["time_formatted"]
[docs] def apply_keys(self, keys, keys_time=None): """Apply keys and timestamp to the QR code data. :param keys: The keys to be applied to the QR code data. :param keys_time: Optional timestamp for the keys. If not provided, the current time will be used. """ self["keys"] = keys self.apply_times("keys_time", "keys_time_str", keys_time)
[docs] def apply_times(self, t_prop: str, t_str_prop: str, t=None): """Apply custom timestamp to the QR code data. :param t_prop: The key under which to store the timestamp. :param t_str_prop: The key under which to store the ISO formatted timestamp. :param t: Optional timestamp to be applied. If not provided, the current time will be used. """ t_val, t_str = get_times(t) self[t_prop] = t_val self[t_str_prop] = t_str
class _QrCode_Pydantic(BaseModel): """Pydantic model proposal to hold QR code data.""" event: EventType time: float = Field(..., description="Time of the event, seconds since the Epoch") time_formatted: str = Field(..., description="ISO 8601 time of the event") model_config = { "extra": "allow", "frozen": False, # allow assignment } @model_validator(mode="before") @classmethod def inject_defaults(cls, data: Any): """Inject mandatory fields if missing.""" if isinstance(data, dict): t, tstr = get_times() data.setdefault("time", t) data.setdefault("time_formatted", tstr) return data def __getitem__(self, key: str): return getattr(self, key) def __setitem__(self, key: str, value: Any): setattr(self, key, value) def __contains__(self, key: str) -> bool: return hasattr(self, key) # Convenience properties (optional, since Pydantic gives direct attr access) @property def event_type(self) -> EventType: return self.event @property def timestamp(self) -> float: return self.time @property def timestamp_str(self) -> str: return self.time_formatted
[docs] @dataclass class QrConfig: """Class representing default QR and audio codes configuration and system properties.""" align: str = "center" """Alignment of the QR code, default is 'center' .""" anchor: str = "center" """Anchor position of the QR code, default is 'center' .""" audio_codec: Any = None """Audio codec to use for audio code playback, default is AudioCodec.FSK, can be an AudioCodec enum when audio is enabled.""" audio_data_field: str = "seq" """Field in the QR code data to encode as audio data, default is 'seq' .""" audio_enabled: bool = False """Flag to enable audio code playback, default is False .""" audio_sample_rate: int = 44100 """Audio sample rate for audio code playback, default is 44100 Hz .""" audio_volume: float = 0.8 """Audio playback volume. Should be a value between 0.0 and 1.0, where 1.0 is the maximum volume. Default is 0.8 .""" back_color: str = "white" """Background color of the QR code, default is 'white'. Use 'transparent' for opaque background .""" border: int = 2 """Border size of the QR code, default is 2 .""" duration: float = 0.5 """Duration to display the QR code in seconds, default is 0.5 .""" fill_color: str = "black" """Fill color of the QR code, default is 'black' .""" padding: int = 10 """Window padding of QR code, default is 10.""" pos = None """Position of the QR code center, default is (0.0, 0.0) .""" retina_scale: float = 1.0 """Retina scale factor for high-DPI displays, default is 1.0 .""" scale: float = 1.0 """Scale factor for the QR code size, default is 1.0 ."""
[docs] class QrStim(visual.ImageStim): """Class to represent/draw/play QR and audio code stimulus.""" def __init__( self, win, qr_code: QrCode, qr_config: QrConfig = None, **kwargs: Any, ): """Initialize the QR code stimulus. :param win: The PsychoPy window where the QR code will be displayed. :param qr_code: The QR code data to be encoded and displayed. :param qr_config: Configuration for the QR code appearance and position. :param kwargs: Additional keyword arguments for the PsychoPy ImageStim. """ self.qr_code = qr_code self.qr_config = QrConfig() if qr_config is None else qr_config # generate wav file with audio code if enabled and update qr_code data if self.qr_config.audio_enabled: # Lazy import - only load heavy audio processing when needed from reprostim.audio.audiocodes import AudioCodec, save_audiocode ac: AudioCodec = ( self.qr_config.audio_codec if self.qr_config.audio_codec is not None else AudioCodec.FSK ) self.audio_data = int(self.qr_code.get(self.qr_config.audio_data_field, 0)) # logging.debug(f"audio_data: {self.audio_data}") self.audio_file, self.audio_info = save_audiocode( code_uint16=self.audio_data, codec=ac, code_duration=self.qr_config.duration, ) audio_time, audio_time_str = get_times() # NOTE: for better accuracy, consider to specify time # close to QR code draw action in future self.qr_code["a_time"] = audio_time self.qr_code["a_time_str"] = audio_time_str self.qr_code["a_data"] = self.audio_data self.qr_code["a_codec"] = ac.value self.qr_code["a_f0"] = self.audio_info.f0 self.qr_code["a_f1"] = self.audio_info.f1 self.qr_code["a_pre_delay"] = self.audio_info.pre_delay self.qr_code["a_post_delay"] = self.audio_info.post_delay self.qr_code["a_duration"] = self.audio_info.duration if ac == AudioCodec.NFE: self.qr_code["a_freq"] = self.audio_info.nfe_freq self.qr_code["a_df"] = self.audio_info.nfe_df pos = self.qr_config.pos if self.qr_config.pos else (0.0, 0.0) image = self.make_qr(border=self.qr_config.border) size = ( image.size[0] * self.qr_config.scale, image.size[1] * self.qr_config.scale, ) # logging.debug(f"Image size={image.size}, scaled size={size}") super().__init__( win, image, anchor=self.qr_config.anchor, pos=pos, units="pix", size=size, **kwargs, ) dx = int((win.size[0] / self.qr_config.retina_scale - self.size[0]) / 2) dy = int((win.size[1] / self.qr_config.retina_scale - self.size[1]) / 2) x = pos[0] y = pos[1] if self.qr_config.align.startswith("left-"): x = -dx + self.qr_config.padding if self.qr_config.align.startswith("right-"): x = dx - self.qr_config.padding if self.qr_config.align.endswith("-bottom"): y = -dy + self.qr_config.padding if self.qr_config.align.endswith("-top"): y = dy - self.qr_config.padding # logging.debug("win size={win.size},qr size={self.size}, # pos={pos},dx={dx},dy={dy},x={x},y={y}") self.pos = (x, y) def __del__(self): """Cleanup temporary audio file if created.""" if self.qr_config.audio_enabled and self.audio_file: try: os.remove(self.audio_file) except Exception: pass # override draw to play audio if enabled
[docs] def draw(self, win=None): """Draw the QR code stimulus and play audio code if enabled. :param win: Optional PsychoPy window where the QR code will be displayed. If not provided, the window used during initialization will be used. """ super().draw(win) self.play_audio()
[docs] def make_qr(self, data=None, **kwargs): """Generate the QR code image from the QR code data. :param data: Optional data to encode in the QR code. If not provided, the QR code will be generated from the `qr_code` attribute. :param kwargs: Additional keyword arguments for the QRCode constructor. :return: The generated QR code image. """ qr = QRCode(**kwargs) qr.add_data(data if data else to_json(self.qr_code)) return qr.make_image( fill_color=self.qr_config.fill_color, back_color=self.qr_config.back_color, )
[docs] def play_audio(self): """Play the audio code if audio is enabled.""" if self.qr_config.audio_enabled and self.audio_file: # Lazy import - only load heavy audio playback when needed from reprostim.audio.audiocodes import play_audio play_audio( self.audio_file, sample_rate=self.qr_config.audio_sample_rate, volume=self.qr_config.audio_volume, async_=True, )
[docs] def wait(self): """Wait for the duration specified in the QR code configuration.""" core.wait(self.qr_config.duration)