# SPDX-FileCopyrightText: 2020-2026 ReproNim Team <info@repronim.org>
#
# SPDX-License-Identifier: MIT
"""
PsychoPy-based script to produce time calibration session
with embedded video QR-codes and audiocodes integrated
with `MRI`/`BIRCH`/`Magewell USB capture` devices.
API to parse `(*.mkv)` video files recorded by `reprostim-videocapture`
utility and extract embedded video media info, QR-codes and audiocodes into
JSONL format.
"""
import types
from dataclasses import dataclass
from time import sleep, time
t0 = time()
import importlib.util
import logging
import os
import shutil
import signal
from datetime import datetime
from enum import Enum
from ..__about__ import __version__
# setup logging
logger = logging.getLogger(__name__)
logger.info("reprostim timesync-stimuli script started")
#######################################################
# Constants
MAX_TR_TIMEOUT: float = 4.0
[docs]
class Mode(str, Enum):
"""Enum for the mode of the script operation."""
EVENT = "event"
"""Listen for keyboard events or MRI pulse and generate QR/audio codes."""
INTERVAL = "interval"
"""Produce QR/audio codes at regular intervals."""
# Just play a beep
BEEP = "beep"
"""Play a beep sound for audio test purposes."""
DEVICES = "devices"
"""List available audio devices to the console output."""
#######################################################
# Classes
[docs]
@dataclass
class SeriesData:
"""Class to hold series data for trigger events."""
num: int
"""Series number."""
tr_count: int = 0
"""Trigger events count in the series."""
tr_last_time: float = None
"""Last trigger event time."""
tr_timeout: float = MAX_TR_TIMEOUT
"""Trigger event max interval/timeout in seconds."""
#######################################################
# Functions
[docs]
def get_output_file_name(
prefix: str, start_ts: datetime, end_ts: datetime = None
) -> str:
"""
Generates an output file name based on the given prefix and timestamps.
This function creates a file name by formatting the provided start and
optional end timestamps into a string and appending them to the provided
prefix. The timestamps are converted to strings using the `get_ts_str`
function.
:param prefix: The prefix to use for the file name.
:type prefix: str
:param start_ts: The start timestamp to be included in the file name.
:type start_ts: datetime
:param end_ts: The optional end timestamp to be included in the file name.
:type end_ts: datetime, optional
:return: The generated file name.
:rtype: str
"""
start_str: str = get_ts_str(start_ts)
end_str: str = get_ts_str(end_ts) if end_ts else ""
return f"{prefix}{start_str}--{end_str}.log"
[docs]
def get_ts_str(ts: datetime) -> str:
"""Get a formatted string representation of a timestamp.
This function formats the provided timestamp into a string using
the specified format (`%Y.%m.%d-%H.%M.%S.%f`). The format includes
year, month, day, hour, minute, second, and microsecond.
:param ts: The timestamp to be formatted.
:type ts: datetime
:return: The formatted string representation of the timestamp.
:rtype: str
"""
ts_format = "%Y.%m.%d-%H.%M.%S.%f"
return f"{ts.strftime(ts_format)[:-3]}"
[docs]
def safe_remove(file_name: str):
"""Safely remove a file if it exists.
This function checks if the specified file exists and is a valid file.
If it is, it attempts to remove the file. If the removal fails,
it logs an error message.
:param file_name: The name of the file to be removed.
:type file_name: str
"""
if file_name:
if os.path.isfile(file_name): # Check if it's a file
try:
os.remove(file_name)
logger.debug(f"File {file_name} deleted successfully.")
except Exception as e:
logger.error(f"Failed delete file {file_name}: {e}")
else:
logger.warning(f"File {file_name} does not exist or is not a valid file.")
[docs]
def store_audiocode(audio_file: str, audio_data: int, logfn: str):
"""Store the audio code data in to the standalone `*.wav` file.
:param audio_file: The path to the audio file to be copied.
:type audio_file: str
:param audio_data: The audio data used in audio code.
:type audio_data: int
:param logfn: The current log file name.
:type logfn: str
"""
# if audio_file exits, copy it to {logfn}audiocode_{audio_data}.wav
if os.path.isfile(audio_file):
sfile = f"{os.path.splitext(logfn)[0]}audiocode_{audio_data}.wav"
shutil.copy(audio_file, sfile)
#######################################################
# Main script code
[docs]
def do_init(logfn: str) -> bool:
"""
Initializes a log file.
:param logfn: The path to the log file to be checked.
:type logfn: str
:return: `True` if the log file does not exist and can be
initialized, `False` otherwise.
:rtype: bool
"""
if os.path.exists(logfn):
logger.error(f"Log file {logfn} already exists")
return False
return True
[docs]
def do_main(
mode: Mode,
logfn: str,
is_fullscreen: bool,
win_size: tuple[int, int],
display: int,
qr_scale: float,
qr_duration: float,
qr_async: bool,
audio_codec: str,
mute: bool,
ntrials: int,
duration: float,
interval: float,
keep_audiocode: bool,
out_func=print,
) -> int:
"""Main function to run the `timesync-stimuli` PsychoPy-based script.
This function initializes the PsychoPy environment, sets up the window,
and handles the main loop for displaying QR codes and audio codes.
It also handles keyboard events and manages the series of trigger events.
The function takes various parameters to configure the behavior of the script.
:param mode: The mode of operation (e.g., EVENT, INTERVAL, BEEP, DEVICES).
:type mode: Mode
:param logfn: The name of the log file to write to.
:type logfn: str
:param is_fullscreen: Whether to run the window in fullscreen mode.
:type is_fullscreen: bool
:param win_size: The size of the window (width, height) in pixels.
:type win_size: tuple[int, int]
:param display: The display number to use.
:type display: int
:param qr_scale: The scale factor for the QR code size.
:type qr_scale: float
:param qr_duration: The duration of the QR code display in seconds.
:type qr_duration: float
:param qr_async: Whether to display the QR code asynchronously.
:type qr_async: bool
:param audio_codec: The audio codec to use for the audio code.
:type audio_codec: str
:param mute: Whether to mute the audio output.
:type mute: bool
:param ntrials: The number of trials to run.
:type ntrials: int
:param duration: The duration of the experiment in seconds.
:type duration: float
:param interval: The interval between trials in seconds.
:type interval: float
:param keep_audiocode: Whether to keep the audio code file after use.
:type keep_audiocode: bool
:param out_func: The function to use for output (default is `print`).
:type out_func: callable
:return: 0 on success, -1 on error.
:rtype: int
"""
logger.info("main script started")
if not importlib.util.find_spec("psychopy"):
logger.error(
"Module 'psychopy' not found, re-install environment with [all] extras"
)
return -1
# late psychopy init
# setup psychopy logs
from psychopy import logging as pl
from reprostim.qr.psychopy import (
EventType,
QrCode,
QrConfig,
QrStim,
get_iso_time,
get_times,
to_json,
)
# psychopy logging doesn't support filtering by message content,
# so this is a patch to filter out flooding messages like "No keypress"
_pl_log_method = pl.root.log
def pl_filtered_log(self, message_, level, t=None, obj=None, **kwargs):
if "No keypress (maxWait exceeded)" in str(message_):
return
_pl_log_method(message_, level, t, obj, **kwargs)
pl.root.log = types.MethodType(pl_filtered_log, pl.root)
# pl.console.setLevel(pl.NOTSET)
pl.console.setLevel(pl.DEBUG)
def log(f, rec):
"""Log a QR record to a file.
This function takes a file handle and a record dictionary,
converts the record to a JSON string, and writes it to the file.
:param f: The file handle to write the log to.
:type f: file object
:param rec: The record dictionary to be logged.
:type rec: dict
"""
s = to_json(rec).rstrip()
f.write(s + os.linesep)
logger.debug(f"LOG {s}")
from psychopy import core, event, visual
# from psychopy.hardware import keyboard
# late audio init
from ..audio.audiocodes import (
AudioCodec,
AudioCodeInfo,
beep,
list_audio_devices,
play_audio,
save_audiocode,
)
sys_shutdown: bool = False
w_keys: list[str] = []
def check_keys(max_wait: float = 0) -> list[str]:
nonlocal w_keys
keys: list[str]
# use cached keys if any
if w_keys:
keys = w_keys
w_keys = []
return keys
if max_wait > 0:
keys = event.waitKeys(maxWait=max_wait)
else:
keys = event.getKeys()
logger.debug(f"keys={keys}")
return keys if keys else []
def on_signal(signum, frame):
logger.info(f"Received system signal: {signum}")
if signum in [2, 15]:
nonlocal sys_shutdown
logger.debug("setting sys_shutdown=True")
sys_shutdown = True
def series_begin(series_num: int) -> SeriesData:
sd = SeriesData(num=series_num, tr_count=0, tr_timeout=MAX_TR_TIMEOUT)
logger.debug(f"series begin: {sd.num}")
# log series begin event
nonlocal f
log(
f,
QrCode(
event=EventType.SERIES_START,
mode=mode,
logfn=logfn,
series_num=sd.num,
),
)
out_func(f"Series [{sd.num}] started")
return sd
def series_end(sd: SeriesData) -> SeriesData:
if sd:
logger.debug(f"series end: {sd.num}")
# log series end event
nonlocal f
log(
f,
QrCode(
event=EventType.SERIES_END,
mode=mode,
logfn=logfn,
series_num=sd.num,
trigger_count=sd.tr_count,
),
)
out_func(f"Series [{sd.num}] ended, trigger count: {sd.tr_count}")
return None
def wait_or_keys(
timeout: float = 0,
async_: bool = False,
break_keys=None,
) -> list[str]:
if async_:
# use manual wait loop
start_time = core.getTime()
# wait loop
while core.getTime() - start_time < timeout:
keys = event.getKeys(keyList=break_keys)
if keys:
return keys
core.wait(0.001) # reduce CPU usage and pass control to other threads
else:
core.wait(timeout)
return []
if mode == Mode.BEEP:
for _ in range(ntrials):
beep(interval * 0.5, async_=True)
sleep(interval)
return 0
if mode == Mode.DEVICES:
list_audio_devices()
return 0
# register signal hook
signal.signal(signal.SIGINT, on_signal)
signal.signal(signal.SIGTERM, on_signal)
audio_data: int = 0
audio_file: str = None
audio_info: AudioCodeInfo = None
f = open(logfn, "w")
qr_config: QrConfig = QrConfig(scale=qr_scale)
win = visual.Window(
fullscr=is_fullscreen,
title=f"ReproStim timesync-stimuli v{__version__}",
name="timesync-stimuli",
size=win_size,
screen=display,
)
logger.debug(f"win.winHandle class: {type(win.winHandle).__name__}")
win.winHandle.on_activate = lambda: out_func("Window activated")
win.winHandle.on_activate()
win.winHandle.on_deactivate = lambda: out_func("Window deactivated")
# win.winHandle.set_caption(win.title)
win.mouseVisible = False # hides the mouse pointer
# win.winHandle.activate()
if win.monitor:
logger.info(f"display [{display}] info:")
fr = win.getActualFrameRate()
logger.info(
f" {win.size[0]}x{win.size[1]} px, " f" {round(fr, 2)} Hz"
if fr
else " N/A Hz"
)
# log script started event
log(
f,
QrCode(
event=EventType.SESSION_START,
mode=mode,
logfn=logfn,
start_time=t0,
start_time_formatted=get_iso_time(t0),
),
)
# test audio first on startup
if not mute:
logger.info("check audio code and output...")
test_duration: float = 0.05
# decrease volume
test_volume: float = 0.05
a_file, a_info = save_audiocode(
code_uint16=0, codec=AudioCodec.NFE, code_duration=test_duration
)
logger.info(f" {a_info}")
play_audio(a_file, volume=test_volume, async_=True)
wait_or_keys(test_duration, async_=False)
safe_remove(a_file)
message = visual.TextStim(
win,
text="""Waiting for scanner trigger.\nInstructions
for Participant...""",
)
message.draw()
fixation = visual.TextStim(win, text="+")
visual.TextStim(win, text="", pos=(0, -0.7), height=0.05)
win.flip()
fixation.draw() # Change properties of existing stim
win.flip()
# spd = 0.500 # Stimulus Presentation Duration
# soa = 6.000 # Stimulus Onset Asynchrony
# iwt = 5 # Initial Wait Time between scanner trigger and first stimulus
# stim_images = []
# stim_names = []
# keys = [] # None received/expected
# kb = keyboard.Keyboard()
t_start = time()
t_end = t_start + duration if duration > 0 else None
logger.debug(f"warming time: {(t_start-t0):.6f} sec")
logger.debug(f"starting loop with {ntrials} trials...")
terminate: bool = False
series_num: int = 0
cur_series: SeriesData = None
for acqNum in range(ntrials):
# check the duration time limit
if t_end and time() > t_end:
out_func("Time is up!")
terminate = True
# exit processing loop if requested
if terminate:
break
logger.debug(f"trial {acqNum}")
# prepare audio code file if any
if not mute:
if keep_audiocode and audio_file:
store_audiocode(audio_file, audio_data, logfn)
safe_remove(audio_file)
audio_data = acqNum
audio_file, audio_info_ = save_audiocode(
code_uint16=audio_data, codec=audio_codec, code_duration=qr_duration
)
audio_info = audio_info_
logger.debug(f" {audio_info}")
if mode == Mode.EVENT:
out_func("Waiting for an event...")
while True:
if sys_shutdown:
break
# check the duration time limit
if t_end and time() > t_end:
out_func("Time is up!")
terminate = True
break
keys = check_keys(0.2)
# only break if 5 or exit keys are pressed
if (
keys
and len(keys) > 0
and (
"q" in keys
or "escape" in keys
or "5" in keys
or "num_5" in keys
)
):
break
# additional check the series if any expired
# inside the trigger pulse waiting loop
if (
cur_series
and cur_series.tr_count > 0
and (time() - cur_series.tr_last_time) > cur_series.tr_timeout
):
logger.debug("series expired")
cur_series = series_end(cur_series)
# break external loop/terminate
if terminate:
break
if sys_shutdown:
out_func("Shutdown timesync-stimuli...")
break
if cur_series and cur_series.tr_count > 0:
# calculate time delta from last impulse if any
dt: float = time() - cur_series.tr_last_time
if dt > cur_series.tr_timeout:
logger.debug(f"timed out after {dt} sec, renew series")
cur_series = series_end(cur_series)
else:
# after receiving two first consecutive triggers
# pulses -- take the temporal distance between
# them +50% as the next value of tr_timeout
if cur_series.tr_count == 1:
cur_series.tr_timeout = dt * 1.5
logger.debug(f"update tr_timeout: {cur_series.tr_timeout}")
elif mode == Mode.INTERVAL:
# keys = kb.getKeys(waitRelease=False)
keys = check_keys()
target_time = t_start + acqNum * interval
to_wait = target_time - time()
# sleep some part of it if long enough
if to_wait >= 0.2:
sleep(to_wait * 0.7)
# busy loop without sleep to not miss it
while time() < target_time:
sleep(0) # pass CPU to other threads
if sys_shutdown:
break
# 2nd break
if sys_shutdown:
out_func("Shutdown timesync-stimuli...")
break
else:
raise ValueError(mode)
# start series if not started
if not cur_series:
cur_series = series_begin(series_num)
series_num += 1
# trigger record,
rec = QrCode(
event=EventType.MRI_TRIGGER_RECEIVED, mode=mode, logfn=logfn, acqNum=acqNum
)
if cur_series:
rec["series_num"] = cur_series.num
if mode == Mode.INTERVAL:
rec["interval"] = interval
if not mute:
play_audio(audio_file, async_=True)
audio_time, audio_time_str = get_times()
rec["a_time"] = audio_time
rec["a_time_str"] = audio_time_str
rec["a_data"] = audio_data
rec["a_codec"] = audio_codec
rec["a_f0"] = audio_info.f0
rec["a_f1"] = audio_info.f1
rec["a_pre_delay"] = audio_info.pre_delay
rec["a_post_delay"] = audio_info.post_delay
rec["a_duration"] = audio_info.duration
if audio_codec == AudioCodec.NFE:
rec["a_freq"] = audio_info.nfe_freq
rec["a_df"] = audio_info.nfe_df
# NOTE: should we add codec info to the log?
# like f0, f1, sampleRate, bit_duration, duration, etc
tkeys = time()
rec.apply_keys(keys, tkeys)
qr = QrStim(win, rec, qr_config)
qr.draw()
win.flip()
rec.apply_times("time_flip", "time_flip_formatted")
w_keys = wait_or_keys(qr_duration, qr_async, ["5", "num_5", "escape", "q"])
fixation.draw()
win.flip()
rec.apply_times("prior_time_off", "prior_time_off_str")
log(f, rec)
out_func(
f"Trigger pulse: acq={acqNum}, "
f"series={cur_series.num if cur_series else 'N/A'}, "
f"keys={keys}"
)
# update trigger event series data
if cur_series:
cur_series.tr_count = cur_series.tr_count + 1
cur_series.tr_last_time = tkeys
if "q" in keys or "escape" in keys:
break
cur_series = series_end(cur_series)
f.close()
if keep_audiocode and audio_file:
store_audiocode(audio_file, audio_data, logfn)
# cleanup temporary audio code file if any
safe_remove(audio_file)
logger.info("main script finished")
return 0