# SPDX-FileCopyrightText: 2020-2026 ReproNim Team <info@repronim.org>
#
# SPDX-License-Identifier: MIT
"""
API to analyze video files recorded by reprostim-videocapture, along with
their corresponding log files and QR/audio metadata. It extracts key
information about each recording and produces a summary table (videos.tsv)
suitable for quality control, sharing, and further analysis.
"""
import csv
import fnmatch
import json
import logging
import os
import re
import shlex
import subprocess
import tempfile
import traceback
from datetime import datetime
from enum import Enum
from time import time
from typing import Dict, Generator, List, Optional, Set, Tuple
from filelock import FileLock, Timeout
from pydantic import BaseModel
from reprostim.qr.qr_parse import (
InfoSummary,
ParseContext,
ParseSummary,
VideoTimeInfo,
do_info_file,
do_parse,
)
# Module-level logger named after this module.
# NOTE(review): the original note says all logs go to stderr, but the handler
# registration below is commented out — presumably logging is configured by
# the caller; confirm.
logger = logging.getLogger(__name__)
# logging.getLogger().addHandler(logging.StreamHandler(sys.stderr))
logger.debug(f"name={__name__}")
# Precalculated globals
# User@Host string (currently disabled)
# UPDATED_BY = f"{getpass.getuser()}@{socket.gethostname()}"
# Compiled regex that captures the text between the opening
# "REPROSTIM-METADATA-JSON: " and closing " :REPROSTIM-METADATA-JSON" markers.
JSON_PATTERN = re.compile(r"REPROSTIM-METADATA-JSON: (.*) :REPROSTIM-METADATA-JSON")
# NB: move in future to audio package or tool?
[docs]
class AudioInfo(BaseModel):
    """Audio stream information extracted from the video file with ffprobe.

    Every field is optional and defaults to ``None``.
    """

    bits_per_sample: Optional[int] = None  # Bits per sample
    channels: Optional[int] = None  # Number of audio channels
    codec: Optional[str] = None  # Audio codec used
    codec_long: Optional[str] = None  # Audio codec detailed name
    codec_rfc6381: Optional[str] = None  # Audio codec in RFC 6381 format
    duration_sec: Optional[float] = None  # Duration in seconds
    profile: Optional[str] = None  # Audio codec profile (e.g., "LC")
    sample_rate: Optional[int] = None  # Sample rate in Hz
    start_time: Optional[float] = None  # Start time of the audio stream in seconds
    tag_str: Optional[str] = None  # Codec tag string (e.g., "[0][0][0][0]")
[docs]
class VideoInfo(BaseModel):
    """Video stream information extracted from the video file with ffprobe.

    Every field is optional and defaults to ``None``.
    """

    bit_depth: Optional[int] = None  # Bit depth per channel, 8, 10, 12, 16 etc.
    codec: Optional[str] = None  # Video codec used
    codec_long: Optional[str] = None  # Video codec detailed name
    codec_rfc6381: Optional[str] = None  # Video codec in RFC 6381 format
    duration_sec: Optional[float] = None  # Duration in seconds
    fps: Optional[float] = None  # Frames per second
    height: Optional[int] = None  # Video frame height in pixels
    level: Optional[int] = None  # Video codec level
    pix_fmt: Optional[str] = None  # Pixel format (e.g., "yuv420p")
    profile: Optional[str] = None  # Video codec profile (e.g., "High")
    start_time: Optional[float] = None  # Start time of the video stream in seconds
    tag_str: Optional[str] = None  # Codec tag string (e.g., "[0][0][0][0]")
    width: Optional[int] = None  # Video frame width in pixels
[docs]
class VaMode(str, Enum):
    """Video audit processing mode constants.

    Members are also ``str`` instances, so they compare equal to their
    plain string values (e.g. ``"full"``).
    """

    FULL = "full"
    """Process all files, overwrite existing records in TSV
    completely."""
    INCREMENTAL = "incremental"
    """Process only new files not present in existing TSV and
    keep existing records."""
    FORCE = "force"
    """Force redo/overwrite specified records in TSV"""
    RERUN_FOR_NA = "rerun-for-na"
    """Process only records with 'n/a' values in existing TSV
    in columns that are usually filled by external tools.
    Intended for run external slow tools like detect-noscreen
    or qr-parser."""
    RESET_TO_NA = "reset-to-na"
    """Reset specified columns to 'n/a' in existing TSV.
    Intended to clear results of external tools like
    detect-noscreen or qr-parser to rerun them from scratch."""
[docs]
class VaRecord(BaseModel):
    """A single record (row) in the videos.tsv audit summary.

    Textual columns use the string ``"n/a"`` as the "no value" marker.
    The declaration order of the fields is significant: the TSV writer in
    this module takes its column list from ``VaRecord.model_fields``.
    """

    # File info
    path: str = "n/a"  # Path to the .mkv video file
    present: bool = False  # Whether the file is present
    complete: bool = False  # Whether the recording was completed
    # and end timestamp is present
    name: str = "n/a"  # Short base name of the file
    # Start and end timestamps
    start_date: str = "n/a"
    start_time: str = "n/a"
    end_date: str = "n/a"
    end_time: str = "n/a"
    # Video detection info
    video_res_detected: str = "n/a"  # e.g., "1920x1080"
    video_fps_detected: str = "n/a"
    video_dur_detected: str = "n/a"  # video duration in seconds
    # based on timestamps
    video_res_recorded: str = "n/a"  # e.g., "1920x1080"
    video_fps_recorded: str = "n/a"
    video_dur_recorded: str = "n/a"  # video duration in seconds
    video_size_mb: str = "n/a"  # size of the video file in MB
    video_rate_mbpm: str = "n/a"  # bitrate in mbpm
    # Audio info
    audio_sr: str = "n/a"  # sample rate
    audio_dur: str = "n/a"  # audio stream duration in seconds
    # Duration
    duration: str = "n/a"  # seconds
    duration_h: str = "n/a"  # human-readable, e.g., "01:23:45"
    # Analysis info
    no_signal_frames: str = "n/a"  # number of frames with no signal, or %
    qr_records_number: str = "n/a"  # number of detected QR codes
    # Coherence check
    file_log_coherent: bool = False  # whether video/audio info matches extracted
    # Update info
    no_signal_updated_on: str = (
        "n/a"  # provide separate timestamps for nosignal ext tool
    )
    qr_updated_on: str = "n/a"  # provide separate timestamps for qr ext tool
    updated_on: str = "n/a"  # last updated timestamp for basic internal processing
    # updated_by: str = "n/a"
[docs]
class VaSource(str, Enum):
    """Video audit source constants.

    Members are also ``str`` instances, so they compare equal to their
    plain string values (e.g. ``"qr"``).
    """

    ALL = "all"
    """Run all available audit sources."""
    INTERNAL = "internal"
    """Basic and default behaviour to process quickly video files
    using only mediainfo and logs metadata."""
    NOSIGNAL = "nosignal"
    """Process video files to detect no-signal frames. This is slow process and
    can take some time depending on the video length."""
    QR = "qr"
    """Process video files to extract QR codes metadata. This is very slow
    process and time is almost the same as video duration at this moment."""
[docs]
class VaContext(BaseModel):
    """Context for video audit processing.

    Holds the run configuration (mode, sources, directories, tool options)
    together with mutable run state: the ``c_*`` counters and
    ``updated_paths`` are modified by the audit functions in this module.
    """

    c_internal: Optional[int] = 0
    """Count of files processed with INTERNAL source"""
    c_nosignal: Optional[int] = 0
    """Count of files processed with NOSIGNAL source"""
    c_qr: Optional[int] = 0
    """Count of files processed with QR source"""
    log_level: Optional[str] = None
    """Logging level to be used in external tool, one of DEBUG,
    INFO, WARNING, ERROR, CRITICAL (default: None)"""
    # The same limit is checked independently against each c_* counter.
    max_counter: Optional[int] = -1
    """Max number of records to process or -1 for
    unlimited (default: -1)
    """
    mode: Optional[VaMode] = VaMode.INCREMENTAL
    """Operation mode, one of VaMode values (default: INCREMENTAL)
    """
    nosignal_data_dir: Optional[str] = "derivatives/nosignal"
    """Directory to store nosignal output data in JSON format.
    """
    nosignal_log_dir: Optional[str] = "logs/nosignal"
    """Directory to store nosignal logs.
    """
    nosignal_opts: Optional[List] = [
        "--number-of-checks",
        "100",
        "--truncated",
        "fixup",
        "--invalid-timing",
        "fixup",
        "--threshold",
        "1.01",
    ]
    """Additional options to pass to detect-noscreen tool.
    """
    # Matched against the file path with fnmatch.fnmatch.
    path_mask: Optional[str] = None
    """Optional path mask to filter video files based on their paths.
    """
    qr_data_dir: Optional[str] = "derivatives/qr"
    """Directory to store qr-parse output data in JSON format.
    """
    qr_log_dir: Optional[str] = "logs/qr"
    """Directory to store qr-parse logs.
    """
    qr_opts: Optional[List] = []
    """Additional options to pass to qr-parse tool if any.
    """
    recursive: Optional[bool] = False
    """Whether to scan directories recursively. Default: False
    """
    # Checked against both the full path and the base name of each file.
    skip_names: Optional[set] = None
    """Optional set of file base names to skip (for incremental mode)
    """
    # This is a set: several sources may be enabled at once.
    source: Optional[Set[VaSource]] = {VaSource.INTERNAL}
    """One of VaSource values to specify audit source (default: INTERNAL)
    """
    updated_paths: Optional[set] = set()
    """Optional set of updated record paths
    """
[docs]
def check_coherent(vr: VaRecord) -> bool:
    """Check if the video record is coherent.

    A record is coherent when the file is present and complete, all
    timestamp, resolution, fps and duration fields are filled in (not
    ``"n/a"``), the detected and recorded resolutions are equal, and the
    detected and recorded frame rates are equal after truncation to an
    integer.

    :param vr: Record to check
    :type vr: VaRecord
    :return: True if the record is coherent, False otherwise (including
             the case where an fps value is not numeric)
    :rtype: bool
    """
    if not vr.present:
        logger.debug("check_coherent: File not present")
        return False

    if not vr.complete:
        logger.debug("check_coherent: File not complete")
        return False

    if vr.start_date == "n/a" or vr.start_time == "n/a":
        logger.debug("check_coherent: No start date/time")
        return False

    if vr.end_date == "n/a" or vr.end_time == "n/a":
        logger.debug("check_coherent: No end date/time")
        return False

    if vr.video_res_detected == "n/a" or vr.video_fps_detected == "n/a":
        logger.debug("check_coherent: No video res/fps detected")
        return False

    if vr.video_res_recorded == "n/a" or vr.video_fps_recorded == "n/a":
        logger.debug("check_coherent: No video res/fps recorded")
        return False

    if vr.duration == "n/a" or vr.duration_h == "n/a":
        logger.debug("check_coherent: No duration available")
        return False

    if vr.video_res_detected != vr.video_res_recorded:
        logger.debug(
            f"check_coherent: Video res mismatch: "
            f"detected={vr.video_res_detected}, "
            f"recorded={vr.video_res_recorded}"
        )
        return False

    # A non-numeric fps value must not abort the audit: this function is
    # called outside of the per-file exception handler.
    try:
        fps_detected = int(float(vr.video_fps_detected))
        fps_recorded = int(float(vr.video_fps_recorded))
    except (TypeError, ValueError):
        logger.debug(
            f"check_coherent: Video fps not numeric: "
            f"detected={vr.video_fps_detected}, "
            f"recorded={vr.video_fps_recorded}"
        )
        return False

    # NOTE(review): fps values are truncated, not rounded, before comparison,
    # so e.g. 29.97 vs 30.0 is reported as a mismatch — confirm intended.
    if fps_detected != fps_recorded:
        logger.debug(
            f"check_coherent: Video fps mismatch: "
            f"detected={vr.video_fps_detected}, "
            f"recorded={vr.video_fps_recorded}"
        )
        return False

    return True
[docs]
def check_ffprobe():
    """Tell whether the ``ffprobe`` executable can be run from PATH.

    Runs ``ffprobe -version`` with its output discarded.

    :return: True if ffprobe is available, False otherwise
    :rtype: bool
    """
    probe_cmd = ["ffprobe", "-version"]
    try:
        # A missing binary raises FileNotFoundError, a non-zero exit code
        # raises CalledProcessError (check=True).
        subprocess.run(
            probe_cmd,
            check=True,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
    except (subprocess.CalledProcessError, FileNotFoundError):
        logger.error("Error: ffprobe is not installed")
        return False
    else:
        logger.debug("ffprobe is installed")
        return True
[docs]
def audio_codec_to_rfc6381(codec: str, profile: Optional[str]) -> Optional[str]:
    """Map an ffprobe audio codec name (and profile) to an RFC 6381 string.

    :param codec: Short codec name as reported by ffprobe (e.g. ``"aac"``).
    :type codec: str
    :param profile: Codec profile string (e.g. ``"LC"``), or ``None``.
                    Only used for AAC; unknown profiles are treated as LC.
    :type profile: Optional[str]
    :return: RFC 6381 string (e.g. ``"mp4a.40.2"``), or ``None`` when the
             codec is not recognized.
    :rtype: Optional[str]
    """
    if codec != "aac":
        if codec == "opus":
            return "opus"
        return "mp4a.69" if codec in ("mp3", "mp2") else None

    # AAC: the last component is the audio object type of the profile.
    object_types = {"LC": 2, "HE-AAC": 5, "HE-AACv2": 29, "LD": 23, "ELD": 39}
    object_type = object_types.get(profile, 2)
    return f"mp4a.40.{object_type}"
[docs]
def video_codec_to_rfc6381(
    codec: str, profile: Optional[str], level: Optional[int]
) -> Optional[str]:
    """Return the RFC 6381 codec string for a video stream.

    Only H.264 is supported. The result has the form
    ``avc1.<profile_idc><constraint flags><level_idc>`` with each component
    written as two upper-case hex digits; the constraint flags are always
    ``00``.

    :param codec: Short codec name as reported by ffprobe (e.g. ``"h264"``).
    :type codec: str
    :param profile: Codec profile string (e.g. ``"High"``), or ``None``.
                    Unknown profiles fall back to Baseline.
    :type profile: Optional[str]
    :param level: Codec level integer as reported by ffprobe (e.g. ``42``),
                  or ``None``. Values outside ``0..255`` (such as a negative
                  "unknown level" sentinel) are encoded as ``00``.
    :type level: Optional[int]
    :return: RFC 6381 string (e.g. ``"avc1.64002A"``), or ``None`` when the
             codec is not recognised.
    :rtype: Optional[str]
    """
    if codec != "h264":
        return None

    profile_idc = {
        "Baseline": 0x42,
        "Main": 0x4D,
        "High": 0x64,
        "High 10": 0x6E,
        "High 4:2:2": 0x7A,
        "High 4:4:4 Predictive": 0xF4,
    }.get(profile, 0x42)

    # level_idc must fit in exactly one byte, otherwise the "02X" format
    # would emit a sign or a third digit and corrupt the codec string.
    level_idc = level if level is not None and 0 <= level <= 0xFF else 0
    return f"avc1.{profile_idc:02X}00{level_idc:02X}"
# NB: move in future to audio package or tool?
[docs]
def _ffprobe_int(value) -> Optional[int]:
    """Convert an ffprobe field to int, or return None if missing/unparsable."""
    if value is None:
        return None
    try:
        return int(value)
    except (TypeError, ValueError):
        return None


def _ffprobe_float(value) -> Optional[float]:
    """Convert an ffprobe field to float, or return None if missing/unparsable."""
    if value is None:
        return None
    try:
        return float(value)
    except (TypeError, ValueError):
        return None


def _ffprobe_duration_sec(stream: dict) -> Optional[float]:
    """Return the stream duration in seconds, or None when unavailable.

    Prefers the ``DURATION`` tag (``HH:MM:SS.fraction``) and falls back to
    the stream-level ``duration`` field.
    """
    tag = (stream.get("tags") or {}).get("DURATION")
    if tag is not None:
        try:
            h, m, sec = str(tag).split(":")
            return int(h) * 3600 + int(m) * 60 + float(sec)
        except ValueError:
            logger.debug(f"Unparsable DURATION tag: {tag}")
    return _ffprobe_float(stream.get("duration"))


def get_audio_video_info_ffprobe(path: str) -> Tuple[AudioInfo, VideoInfo]:
    """Extract audio and video stream information from the video file using ffprobe.

    Issues a single ffprobe call that reads all streams, then splits results
    into an :class:`AudioInfo` (first audio stream) and a :class:`VideoInfo`
    (first video stream).

    ffprobe failures (missing executable, non-zero exit code, unparsable JSON
    output) are logged and result in objects with all fields left as ``None``.

    :param path: Path to the video file (.mkv, .mp4, .avi)
    :type path: str
    :return: Tuple of (AudioInfo, VideoInfo) with extracted stream information.
             Fields are ``None`` when the corresponding stream is absent or a
             value cannot be parsed.
    :rtype: Tuple[AudioInfo, VideoInfo]
    """
    logger.debug(f"get_audio_video_info_ffprobe: {path}")
    ai: AudioInfo = AudioInfo()
    vi: VideoInfo = VideoInfo()
    try:
        cmd = [
            "ffprobe",
            "-v",
            "quiet",  # suppress logs
            "-print_format",
            "json",  # JSON output
            "-show_streams",  # streams
            path,
        ]
        logger.debug(f"run: {' '.join(cmd)}")
        result = subprocess.run(cmd, capture_output=True, text=True, check=True)
        o = json.loads(result.stdout)
        logger.debug(f"ffprobe output: {o}")

        streams = o.get("streams", [])
        audio_streams = [s for s in streams if s.get("codec_type") == "audio"]
        video_streams = [s for s in streams if s.get("codec_type") == "video"]

        if audio_streams:
            s = audio_streams[0]
            bps = s.get("bits_per_sample")
            # 0 is treated as "not reported" and left as None
            if bps is not None and bps != 0:
                ai.bits_per_sample = bps
            ai.channels = s.get("channels")
            ai.codec = s.get("codec_name")
            ai.codec_long = s.get("codec_long_name")
            ai.profile = s.get("profile")
            ai.sample_rate = _ffprobe_int(s.get("sample_rate"))
            ai.start_time = _ffprobe_float(s.get("start_time"))
            ai.tag_str = s.get("codec_tag_string")
            ai.codec_rfc6381 = audio_codec_to_rfc6381(ai.codec or "", ai.profile)
            ai.duration_sec = _ffprobe_duration_sec(s)

        if video_streams:
            s = video_streams[0]
            vi.codec = s.get("codec_name")
            vi.codec_long = s.get("codec_long_name")
            vi.profile = s.get("profile")
            vi.level = s.get("level")
            vi.width = s.get("width")
            vi.height = s.get("height")
            vi.pix_fmt = s.get("pix_fmt")
            vi.start_time = _ffprobe_float(s.get("start_time"))
            vi.tag_str = s.get("codec_tag_string")

            # 0 is treated as "not reported" and left as None
            bit_depth = _ffprobe_int(s.get("bits_per_raw_sample"))
            if bit_depth is not None and bit_depth != 0:
                vi.bit_depth = bit_depth

            # frame rates are rational strings like "30000/1001"; take the
            # first key that yields a usable value
            for fps_key in ("avg_frame_rate", "r_frame_rate"):
                fps_str = s.get(fps_key)
                if fps_str and fps_str != "0/0":
                    try:
                        num, den = fps_str.split("/")
                        den_int = int(den)
                        if den_int != 0:
                            vi.fps = float(int(num)) / den_int
                            break
                    except (ValueError, ZeroDivisionError):
                        pass

            vi.codec_rfc6381 = video_codec_to_rfc6381(
                vi.codec or "", vi.profile, vi.level
            )
            vi.duration_sec = _ffprobe_duration_sec(s)

    except FileNotFoundError:
        logger.error("ffprobe is not installed or not in PATH")
    except subprocess.CalledProcessError as e:
        logger.error(f"ffprobe error: {e} {e.stdout} {e.stderr}")
    except json.JSONDecodeError as e:
        logger.error(f"ffprobe returned unparsable JSON output: {e}")

    return ai, vi
def _save_tsv(records: List[VaRecord], path_out: str):
    """Write the records to ``path_out`` as a tab-separated file with a header.

    The columns are the ``VaRecord`` model fields in declaration order; any
    existing file at ``path_out`` is overwritten.
    """
    columns = list(VaRecord.model_fields.keys())
    with open(path_out, "w", newline="", encoding="utf-8") as out:
        tsv = csv.DictWriter(
            out, fieldnames=columns, delimiter="\t", lineterminator="\n"
        )
        tsv.writeheader()
        tsv.writerows(rec.model_dump() for rec in records)
def _load_tsv(path_in: str) -> List[VaRecord]:
    """Read a tab-separated file and build one ``VaRecord`` per data row.

    The header row supplies the field names passed to ``VaRecord``.
    """
    with open(path_in, "r", newline="", encoding="utf-8") as src:
        return [VaRecord(**row) for row in csv.DictReader(src, delimiter="\t")]
# Module-level cache of loaded TSV records, keyed by the TSV file path.
_tsv_cache: Dict[str, List[VaRecord]] = {}


def _get_tsv_records(
    path_in: str, cached: bool = False, use_lock: bool = True
) -> List[VaRecord]:
    """Return the records of a TSV file, optionally served from the cache.

    With ``cached=True`` an already loaded file is returned straight from
    the module-level cache, without touching the disk or the file lock.
    In every other case the file is (re)loaded and the cache entry for
    ``path_in`` is replaced with the fresh list.

    With ``use_lock=True`` the load happens while holding the
    ``<path_in>.lock`` file lock (5 seconds acquisition timeout); with
    ``use_lock=False`` the file is read directly (dirty-read mode).

    :param path_in: Path to the TSV file to load.
    :type path_in: str
    :param cached: If ``True``, return cached data when available; if
                   ``False``, always reload from disk and refresh the cache.
                   Default: ``False``.
    :type cached: bool
    :param use_lock: If ``True`` (default), acquire ``path_in.lock`` before
                     reading. If ``False``, skip the lock (dirty-read mode).
    :type use_lock: bool
    :return: List of VaRecord objects loaded from the TSV file.
    :rtype: List[VaRecord]
    """
    if cached:
        try:
            return _tsv_cache[path_in]
        except KeyError:
            # cold cache, fall through to a regular load
            pass

    if not use_lock:
        loaded = _load_tsv(path_in)
    else:
        with FileLock(f"{path_in}.lock", timeout=5):
            loaded = _load_tsv(path_in)

    _tsv_cache[path_in] = loaded
    return loaded
def _compare_rec_ts(r1: VaRecord, r2: VaRecord, field: str = "updated_on") -> int:
    """Order two records by the timestamp stored in ``field``.

    Timestamps are strings in ``%Y-%m-%d %H:%M:%S.%f`` format. The value
    ``"n/a"`` sorts before any real timestamp.

    :param r1: First VaRecord object
    :type r1: VaRecord
    :param r2: Second VaRecord object
    :type r2: VaRecord
    :param field: Field name to compare timestamps (default: "updated_on")
    :type field: str
    :return: -1 if r1 < r2, 0 if equal, 1 if r1 > r2
    :rtype: int
    """
    left = getattr(r1, field)
    right = getattr(r2, field)

    # identical strings (including both "n/a") need no parsing
    if left == right:
        return 0
    if left == "n/a":
        return -1
    if right == "n/a":
        return 1

    ts_format = "%Y-%m-%d %H:%M:%S.%f"
    left_ts = datetime.strptime(left, ts_format)
    right_ts = datetime.strptime(right, ts_format)
    return (left_ts > right_ts) - (left_ts < right_ts)
def _match_recs(recs1: List[VaRecord], recs2: List[VaRecord]) -> bool:
    """Tell whether two record lists are pairwise identical.

    Lists of different length never match; otherwise records are compared
    position by position through their JSON dumps, stopping at the first
    difference.
    """
    if len(recs1) != len(recs2):
        return False
    return all(
        left.model_dump_json() == right.model_dump_json()
        for left, right in zip(recs1, recs2)
    )
def _merge_rec(ctx: VaContext, rec_cur: VaRecord, rec_new: VaRecord) -> VaRecord:
    """Combine the current and the new version of the same record.

    The three timestamps ``updated_on``, ``no_signal_updated_on`` and
    ``qr_updated_on`` are compared independently. The basic data, the
    nosignal data and the qr data are each taken from whichever record has
    the newer (or equal) corresponding timestamp, preferring ``rec_new`` on
    ties.

    :param ctx: Processing context (not used by the merge itself)
    :type ctx: VaContext
    :param rec_cur: Record currently stored
    :type rec_cur: VaRecord
    :param rec_new: Newly produced record
    :type rec_new: VaRecord
    :return: ``rec_new`` itself when all timestamps are equal, otherwise a
             merged copy
    :rtype: VaRecord
    :raises ValueError: If the two records have different names
    """
    # the name is the record key, refuse to merge unrelated records
    if rec_cur.name != rec_new.name:
        raise ValueError(
            f"_merge_rec: Record names do not match: "
            f"{rec_cur.name} != {rec_new.name}"
        )

    cmp_internal: int = _compare_rec_ts(rec_new, rec_cur)
    cmp_nosignal: int = _compare_rec_ts(
        rec_new, rec_cur, field="no_signal_updated_on"
    )
    cmp_qr: int = _compare_rec_ts(rec_new, rec_cur, field="qr_updated_on")

    # nothing differs timestamp-wise: the new record wins as a whole
    if not (cmp_internal or cmp_nosignal or cmp_qr):
        return rec_new

    def _newest(cmp_result: int) -> VaRecord:
        return rec_cur if cmp_result < 0 else rec_new

    # start from a copy of the record holding the latest basic data
    merged: VaRecord = _newest(cmp_internal).model_copy()

    nosignal_src = _newest(cmp_nosignal)
    merged.no_signal_frames = nosignal_src.no_signal_frames
    merged.no_signal_updated_on = nosignal_src.no_signal_updated_on

    qr_src = _newest(cmp_qr)
    merged.qr_records_number = qr_src.qr_records_number
    merged.qr_updated_on = qr_src.qr_updated_on
    return merged
def _merge_recs(
    ctx: VaContext,
    recs0: List[VaRecord],  # old original videos.tsv records
    recs_cur: List[VaRecord],  # current latest transactional videos.tsv records
    recs_new: List[VaRecord],  # new records to merge based on recs0
) -> List[VaRecord]:
    """Merge newly produced records with the records currently stored.

    ``recs0`` is the snapshot the new records were computed from and
    ``recs_cur`` is the present content of the TSV. If both are identical
    nothing changed in the meantime and ``recs_new`` is returned as is.
    Otherwise the result depends on ``ctx.mode``:

    * ``full`` - ``recs_new`` replaces everything;
    * ``force`` - records of ``recs_new`` replace the records with the same
      name in ``recs_cur``, other current records are kept;
    * any other mode - records with the same name are merged field group by
      field group according to their timestamps, records present on one
      side only are kept.

    :param ctx: Processing context providing the merge mode
    :type ctx: VaContext
    :param recs0: Records originally loaded from the TSV
    :type recs0: List[VaRecord]
    :param recs_cur: Records currently stored in the TSV
    :type recs_cur: List[VaRecord]
    :param recs_new: New records computed on top of ``recs0``
    :type recs_new: List[VaRecord]
    :return: Merged list of records, never ``None``
    :rtype: List[VaRecord]
    """
    # before any merging check if recs0 and recs_cur are the same and skip merge
    if _match_recs(recs0, recs_cur):
        logger.debug("_merge_recs: No changes in recs_cur since load, skipping merge")
        return recs_new

    # (A) when mode is [full] - use recs and override everything in recs_cur
    if ctx.mode == VaMode.FULL:
        logger.debug("_merge_recs: Full mode, overriding all records")
        return recs_new

    # (B) when mode is [force] - merge all records from recs into recs_cur
    if ctx.mode == VaMode.FORCE:
        logger.debug(
            "_merge_recs: Force mode, merging all new records over existing ones"
        )
        if len(recs_cur) > 0:
            merged_dict = {r.name: r for r in recs_cur}
            merged_dict.update({r.name: r for r in recs_new})
            recs_new = list(merged_dict.values())
        return recs_new

    # (C) when mode is [rerun-for-na] or [reset-to-na]
    #     merge only records from recs where related fields are updated
    #     and use timestamps
    # or
    # (D) when mode is [incremental] - add only new records from recs if timestamp
    #     is older than in recs_cur
    #
    # Any other value (e.g. None, the field is Optional) also ends up here:
    # the timestamp based merge never drops concurrent changes, and the
    # function must not return None implicitly.
    if ctx.mode not in {VaMode.RERUN_FOR_NA, VaMode.RESET_TO_NA, VaMode.INCREMENTAL}:
        logger.warning(
            f"_merge_recs: Unexpected mode {ctx.mode}, "
            f"falling back to merge based on timestamps"
        )
    logger.debug(
        f"_merge_recs: {ctx.mode} mode, merging selectively based on timestamps"
    )
    if len(recs_cur) > 0:
        # first build dict of current records
        merged_dict = {r.name: r for r in recs_cur}
        # and then compare/merge with new records
        for rec in recs_new:
            if rec.name in merged_dict:
                # existing record, need to merge manually
                merged_dict[rec.name] = _merge_rec(ctx, merged_dict[rec.name], rec)
            else:
                # new record, just add it
                merged_dict[rec.name] = rec
        recs_new = list(merged_dict.values())
    return recs_new
def _normalize_path(path: str) -> str:
# Note: revise if further normalization for record.path when needed
# (e.g., resolve symlinks, case sensitivity, etc)
return path
def _set_updated(ctx: VaContext, vr: VaRecord, field: str = "updated_on"):
    """Mark the record as updated right now.

    Adds the record path to ``ctx.updated_paths`` and stores the current
    time, formatted with ``format_tts``, in the given timestamp field.
    """
    ctx.updated_paths.add(vr.path)
    stamp = format_tts(time())
    setattr(vr, field, stamp)
    # vr.updated_by = UPDATED_BY
def _build_dated_path(vr: VaRecord, base_dir: str, ext: str) -> str:
    """Build the output file path ``base_dir/<YYYY>/<MM>/<video name>.<ext>``.

    The year and month sub-directories are taken from ``vr.start_date`` when
    it is a well-formed ``YYYY-MM-DD`` date; otherwise the file is placed
    directly in ``base_dir``. The target directory is created if missing.

    :param vr: Record providing the start date and the video path
    :type vr: VaRecord
    :param base_dir: Root directory for the output files
    :type base_dir: str
    :param ext: Extension appended to the video base name, without leading dot
    :type ext: str
    :return: Path of the output file
    :rtype: str
    """
    out_dir: str = base_dir
    # Only a strict YYYY-MM-DD value is split into sub-directories; "n/a",
    # empty or otherwise malformed dates must neither crash nor produce
    # unexpected directory names.
    start_date = vr.start_date or ""
    if re.fullmatch(r"\d{4}-\d{2}-\d{2}", start_date):
        year, month, _ = start_date.split("-")
        out_dir = os.path.join(out_dir, year, month)
    os.makedirs(out_dir, exist_ok=True)
    file_name = f"{os.path.basename(vr.path)}.{ext}"
    return os.path.join(out_dir, file_name)
[docs]
def do_audit_file(ctx: VaContext, path: str) -> Generator[VaRecord, None, None]:
    """Audit a single video file.

    Yields at most one record. Nothing is yielded when the max files limit
    of the context is reached, or when the file is excluded by
    ``ctx.skip_names`` (full path or base name) or by ``ctx.path_mask``.

    The record is filled from the ``<path>.log`` session metadata, the
    file/time info, a quick parse without QR processing and the ffprobe
    audio info. Any exception raised while collecting this data is logged
    and the record is yielded with whatever was gathered so far.

    :param ctx: VaContext object with processing context
    :type ctx: VaContext
    :param path: Path to the video file
    :type path: str
    :return: Generator of VaRecord objects
    :rtype: Generator[VaRecord, None, None]
    """
    logger.debug(f"do_audit_file(path={path})")

    # check max files limit
    if 0 <= ctx.max_counter <= ctx.c_internal:
        logger.debug(f"Max files limit reached: {ctx.max_counter}")
        return

    if ctx.skip_names and path in ctx.skip_names:
        logger.info(f"Skipping file by path : {path}")
        return

    # filter by path mask
    if ctx.path_mask and not fnmatch.fnmatch(path, ctx.path_mask):
        logger.info(f"Skipping file by path mask : {path}")
        return

    vr: VaRecord = VaRecord()
    try:
        if os.path.exists(path):
            vr.present = True

        vr.path = _normalize_path(path)
        vr.name = os.path.basename(path)

        if ctx.skip_names and vr.name in ctx.skip_names:
            logger.info(f"Skipping file by name : {vr.name}")
            return

        # try to find session_begin in log file
        sb = find_metadata_json(path + ".log", "type", "session_begin")
        if sb is not None:
            logger.debug(f"sb: {sb}")
            # the record field is a string, the JSON value may be a number
            vr.video_fps_detected = str(sb["frameRate"])
            vr.video_res_detected = f"{sb['cx']}x{sb['cy']}"

        vi: InfoSummary
        vti: VideoTimeInfo
        vi, vti = do_info_file(path, True)
        logger.debug(f"vi: {vi}")
        logger.debug(f"vti: {vti}")
        if vti is not None:
            vr.start_date = format_date(vti.start_time)
            vr.start_time = format_time(vti.start_time)
            vr.end_date = format_date(vti.end_time)
            vr.end_time = format_time(vti.end_time)
            if vti.end_time is not None:
                vr.complete = True

        if vi is not None:
            if vi.duration_sec is not None:
                vr.video_dur_detected = str(round(vi.duration_sec, 1))
                vr.duration = vr.video_dur_detected
                vr.duration_h = format_duration(vi.duration_sec)
            if vi.size_mb is not None:
                vr.video_size_mb = str(vi.size_mb)
            if vi.rate_mbpm is not None:
                vr.video_rate_mbpm = str(vi.rate_mbpm)

        # Note: just quick parse w/o QR processing so default context is used.
        # A parser that yields nothing is treated as "no summary" so that
        # the audio info below is still collected.
        ps: Optional[ParseSummary] = next(
            do_parse(ParseContext(), path, True, True), None
        )
        logger.info(f"ps: {ps}")
        if ps is not None:
            # durations outside [0, 7 days) are considered invalid
            if (
                ps.video_duration is not None
                and 0 <= ps.video_duration < 604800.0
            ):
                vr.video_dur_recorded = str(round(ps.video_duration, 1))

            # use the recorded duration only when none was detected above
            if vr.duration is None or vr.duration == "n/a":
                video_duration = ps.video_duration
                if video_duration is not None and 0 <= video_duration < 604800.0:
                    vr.duration = str(round(video_duration, 1))
                    vr.duration_h = format_duration(video_duration)

            vr.video_res_recorded = (
                f"{ps.video_frame_width}x{ps.video_frame_height}"
            )
            vr.video_fps_recorded = f"{round(ps.video_frame_rate, 1)}"

        # try to get audio and video info
        ai: AudioInfo
        ai, _ = get_audio_video_info_ffprobe(path)
        if ai is not None:
            logger.debug(f"ai: {ai}")
            if ai.sample_rate is not None:
                vr.audio_sr = f"{ai.sample_rate}Hz"
                if ai.bits_per_sample is not None:
                    vr.audio_sr += f" {ai.bits_per_sample}b"
                if ai.channels is not None:
                    vr.audio_sr += f" {ai.channels}ch"
                if ai.codec is not None:
                    vr.audio_sr += f" {ai.codec}"
            if ai.duration_sec is not None:
                vr.audio_dur = str(round(ai.duration_sec, 1))
                # last resort: fall back to the audio stream duration
                if vr.duration == "n/a" or vr.duration is None:
                    vr.duration = vr.audio_dur
                    vr.duration_h = format_duration(ai.duration_sec)

    # catch all exceptions: one broken file must not stop the whole audit
    except Exception as e:
        logger.error(f"Unhandled exception occurred when processing file: {path}")
        logger.error(f"Details: {e}")
        logger.error(traceback.format_exc())  # logs full stack trace

    vr.file_log_coherent = check_coherent(vr)
    _set_updated(ctx, vr)
    ctx.c_internal += 1
    logger.debug(f"c_internal -> {ctx.c_internal}")
    yield vr
[docs]
def do_audit_dir(ctx: VaContext, path: str) -> Generator[VaRecord, None, None]:
    """Audit the .mkv, .mp4 and .avi video files found in a directory.

    Entries are visited in sorted name order. Sub-directories are descended
    into only when ``ctx.recursive`` is set. Nothing is yielded when the max
    files limit is already reached, when ``path`` is missing or not a
    directory, or when ``path`` is listed in ``ctx.skip_names``.

    :param ctx: VaContext object with processing context
    :type ctx: VaContext
    :param path: Path to the directory
    :type path: str
    :return: Generator of VaRecord objects
    :rtype: Generator[VaRecord, None, None]
    """
    logger.debug(f"do_audit_dir(path={path}, recursive={ctx.recursive})")

    # check max files limit
    limit_reached = 0 <= ctx.max_counter <= ctx.c_internal
    if limit_reached:
        logger.debug(f"Max files limit reached: {ctx.max_counter}")
        return

    # the path must exist ...
    if not os.path.exists(path):
        logger.error(f"Path does not exist: {path}")
        return

    # ... and must be a directory
    if not os.path.isdir(path):
        logger.error(f"Path is not a directory: {path}")
        return

    if ctx.skip_names and path in ctx.skip_names:
        logger.info(f"Skipping directory by path : {path}")
        return

    video_exts = (".mkv", ".mp4", ".avi")
    for entry in sorted(os.listdir(path)):
        child = os.path.join(path, entry)
        if os.path.isfile(child) and entry.lower().endswith(video_exts):
            logger.debug(f"Found video: {child}")
            yield from do_audit_file(ctx, child)
            continue
        if ctx.recursive and os.path.isdir(child):
            logger.debug(f"Descending into directory: {child}")
            yield from do_audit_dir(ctx, child)
[docs]
def do_audit_internal(
    ctx: VaContext, paths_dir_or_file: List[str]
) -> Generator[VaRecord, None, None]:
    """Audit a single video file or all video files in a directory.

    Nothing is yielded unless the context source includes INTERNAL or ALL,
    and nothing is yielded in the rerun-for-na and reset-to-na modes.
    A path that does not exist is logged as an error and skipped; the
    remaining paths are still processed.

    :param ctx: VaContext object with processing context
    :type ctx: VaContext
    :param paths_dir_or_file: List of path to the video file or directory
    :type paths_dir_or_file: List[str]
    :return: Generator of VaRecord objects
    :rtype: Generator[VaRecord, None, None]
    """
    logger.debug(
        f"do_audit_internal(paths_dir_or_file={paths_dir_or_file}, "
        f"recursive={ctx.recursive})"
    )

    # check source is INTERNAL or ALL
    if not ctx.source & {VaSource.INTERNAL, VaSource.ALL}:
        logger.debug("Skipping internal source as per context")
        return

    # prevent run for RERUN_FOR_NA mode
    if ctx.mode == VaMode.RERUN_FOR_NA:
        logger.debug("Skipping internal source for rerun-for-na mode")
        return

    # prevent run for RESET_TO_NA mode
    if ctx.mode == VaMode.RESET_TO_NA:
        logger.debug("Skipping internal source for reset-to-na mode")
        return

    for path in paths_dir_or_file:
        if not os.path.exists(path):
            # one bad path must not silently drop all the following ones
            logger.error(f"Path does not exist: {path}")
            continue

        if os.path.isfile(path):
            yield from do_audit_file(ctx, path)
        elif os.path.isdir(path):
            yield from do_audit_dir(ctx, path)
[docs]
def run_ext_nosignal(ctx: VaContext, vr: VaRecord) -> VaRecord:
    """Run detect-noscreen external tools on the specified VaRecord.

    The record is returned unchanged when the context source does not
    include NOSIGNAL or ALL, when the nosignal limit is reached, when the
    path does not match ``ctx.path_mask``, or (rerun-for-na mode) when
    ``no_signal_frames`` is already filled. In reset-to-na mode the field is
    only reset to ``"n/a"`` and the tool is not run.

    Otherwise ``reprostim detect-noscreen`` is executed under a
    ``<video path>.nosignal.lock`` file lock, its output is written to a log
    file and the resulting JSON is read back to fill ``no_signal_frames``
    with the no-signal rate in percent. Failures to start or run the tool,
    to read its output, or to acquire the lock are logged and leave the
    record untouched.

    :param ctx: VaContext object with processing context
    :type ctx: VaContext
    :param vr: VaRecord object to process
    :type vr: VaRecord
    :return: Updated VaRecord object
    :rtype: VaRecord
    """
    logger.debug(
        f"run_ext_nosignal(path={vr.path}, no_signal_frames={vr.no_signal_frames})"
    )

    # check mode is NOSIGNAL or ALL
    if not ctx.source & {VaSource.NOSIGNAL, VaSource.ALL}:
        logger.debug("Skipping nosignal source as per context")
        return vr

    # check max files limit
    if 0 <= ctx.max_counter <= ctx.c_nosignal:
        logger.debug(f"Max nosignal limit reached: {ctx.max_counter}")
        return vr

    # filter by path mask
    if ctx.path_mask and not fnmatch.fnmatch(vr.path, ctx.path_mask):
        logger.info(f"Skipping nosignal by path mask : {vr.path}")
        return vr

    # reset related fields to n/a if any
    if ctx.mode == VaMode.RESET_TO_NA:
        if vr.no_signal_frames != "n/a":
            vr.no_signal_frames = "n/a"
            logger.debug(f"Reset no_signal_frames -> {vr.no_signal_frames}")
            _set_updated(ctx, vr, field="no_signal_updated_on")
            ctx.c_nosignal += 1
            logger.debug(f"c_nosignal -> {ctx.c_nosignal}")
        return vr

    if ctx.mode == VaMode.RERUN_FOR_NA:
        if vr.no_signal_frames != "n/a":
            logger.debug("Skipping nosignal source as per context (not n/a)")
            return vr

    # make sure data and logs dirs exist
    os.makedirs(ctx.nosignal_data_dir, exist_ok=True)
    os.makedirs(ctx.nosignal_log_dir, exist_ok=True)

    # build paths
    json_path = _build_dated_path(vr, ctx.nosignal_data_dir, "nosignal.json")
    log_path = _build_dated_path(vr, ctx.nosignal_log_dir, "nosignal.log")

    # prepare command-line to run
    cmd = ["reprostim"]
    # optionally add log level
    if ctx.log_level is not None:
        cmd += ["--log-level", ctx.log_level]
    cmd += ["detect-noscreen"]
    cmd += ctx.nosignal_opts
    cmd += ["--output", json_path]
    cmd += [vr.path]
    logger.debug(f"cmd: {' '.join(cmd)}")

    # use lock file
    path_lock: str = f"{vr.path}.nosignal.lock"
    logger.debug(f"use lock file : {path_lock}")
    lock = FileLock(path_lock, timeout=5)
    try:
        with lock:
            # run the command, both stdout and stderr go to the log file
            try:
                with open(log_path, "w", encoding="utf-8") as log_file:
                    result = subprocess.run(
                        cmd,
                        stdout=log_file,
                        stderr=subprocess.STDOUT,
                        text=True,
                        check=True,
                    )
                logger.debug(
                    f"detect-noscreen completed with return code {result.returncode}"
                )
            except subprocess.CalledProcessError as e:
                logger.error(f"detect-noscreen failed: {e} {e.stdout} {e.stderr}")
                return vr
            except OSError as e:
                # e.g. the reprostim executable is missing or the log file
                # cannot be created
                logger.error(f"detect-noscreen could not be started: {e}")
                return vr

            # now read the output JSON file
            if os.path.exists(json_path):
                try:
                    with open(json_path, "r", encoding="utf-8") as f:
                        data = json.load(f)
                    logger.debug(f"detect-noscreen output data: {data}")
                    if "nosignal_rate" in data:
                        # rate is a fraction, stored as percent
                        vr.no_signal_frames = (
                            f"{float(data['nosignal_rate']) * 100:.1f}"
                        )
                    else:
                        vr.no_signal_frames = "0.0"
                    logger.debug(f"Set no_signal_frames -> {vr.no_signal_frames}")
                    _set_updated(ctx, vr, field="no_signal_updated_on")
                    ctx.c_nosignal += 1
                    logger.debug(f"c_nosignal -> {ctx.c_nosignal}")
                except (json.JSONDecodeError, IOError, ValueError, TypeError) as e:
                    # also covers a nosignal_rate value that is not numeric
                    logger.error(f"Failed to read/parse nosignal JSON output: {e}")
    except Timeout as el:
        logger.error(f"File is already locked by another process ({vr.path}) : {el}")
    return vr
[docs]
def _find_qr_parse_summary(jsonl_path: str) -> Optional[dict]:
    """Return the first ``ParseSummary`` record of a qr-parse JSONL file.

    Blank lines and lines that do not hold a JSON object are skipped so
    that they do not abort the lookup.

    :param jsonl_path: Path to the JSONL file written by qr-parse.
    :type jsonl_path: str

    :return: The first record whose ``type`` is ``"ParseSummary"``,
             or ``None`` when the file has no such record.
    :rtype: Optional[dict]

    :raises json.JSONDecodeError: If a non-blank line is not valid JSON.
    :raises OSError: If the file cannot be read.
    """
    with open(jsonl_path, "r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            record = json.loads(line)
            if isinstance(record, dict) and record.get("type") == "ParseSummary":
                return record
    return None


def run_ext_qr(ctx: VaContext, vr: VaRecord) -> VaRecord:
    """Run qr-parse external tool on the specified VaRecord.

    The video is first copied without audio into a temporary directory
    with ffmpeg, then ``reprostim qr-parse`` is executed on that copy.
    Standard output of qr-parse is stored as a JSONL file under
    ``ctx.qr_data_dir``, tool logs go under ``ctx.qr_log_dir``.

    ``vr.qr_records_number`` is updated as follows:

    * ``"n/a"`` - reset requested (``VaMode.RESET_TO_NA``);
    * ``"-2"`` - set before ffmpeg starts, stays when ffmpeg or
      qr-parse exits with non-zero code;
    * ``"-1"`` - set once qr-parse finished, stays when no
      ``ParseSummary`` record could be read from its output;
    * otherwise the ``qr_count`` value of the ``ParseSummary`` record.

    :param ctx: VaContext object with processing context
    :type ctx: VaContext

    :param vr: VaRecord object to process
    :type vr: VaRecord

    :return: Updated VaRecord object
    :rtype: VaRecord
    """
    logger.debug(
        f"run_ext_qr(path={vr.path}, qr_records_number={vr.qr_records_number})"
    )

    # check mode is QR or ALL
    if not ctx.source & {VaSource.QR, VaSource.ALL}:
        logger.debug("Skipping qr source as per context")
        return vr

    # check max files limit
    if 0 <= ctx.max_counter <= ctx.c_qr:
        logger.debug(f"Max qr limit reached: {ctx.max_counter}")
        return vr

    # filter by path mask
    if ctx.path_mask and not fnmatch.fnmatch(vr.path, ctx.path_mask):
        logger.info(f"Skipping qr by path mask : {vr.path}")
        return vr

    # reset related fields to n/a if any, the tool is never run in this mode
    if ctx.mode == VaMode.RESET_TO_NA:
        if vr.qr_records_number != "n/a":
            vr.qr_records_number = "n/a"
            logger.debug(f"Reset qr_records_number -> {vr.qr_records_number}")
            _set_updated(ctx, vr, field="qr_updated_on")
            ctx.c_qr += 1
            logger.debug(f"c_qr -> {ctx.c_qr}")
        return vr

    # in rerun mode only records still marked as n/a are processed
    if ctx.mode == VaMode.RERUN_FOR_NA and vr.qr_records_number != "n/a":
        logger.debug("Skipping qr source as per context (not n/a)")
        return vr

    # make sure data and logs dirs exist
    os.makedirs(ctx.qr_data_dir, exist_ok=True)
    os.makedirs(ctx.qr_log_dir, exist_ok=True)

    # build paths
    base_name = os.path.basename(vr.path)
    jsonl_path = _build_dated_path(vr, ctx.qr_data_dir, "qrinfo.jsonl")
    log_path = _build_dated_path(vr, ctx.qr_log_dir, "qrinfo.log")
    ffmpeg_log_path = _build_dated_path(vr, ctx.qr_log_dir, "ffmpeg.log")

    # use lock file
    path_lock: str = f"{vr.path}.qr.lock"
    logger.debug(f"use lock file : {path_lock}")
    lock = FileLock(path_lock, timeout=5)
    try:
        with lock:
            with tempfile.TemporaryDirectory() as tmpdir:
                logger.debug(f"tmpdir : {tmpdir}")
                tmp_video: str = os.path.join(tmpdir, base_name)
                logger.debug(f"tmp_video : {tmp_video}")

                # set default value first to prevent stale n/a data for ffmpeg
                vr.qr_records_number = "-2"
                _set_updated(ctx, vr, field="qr_updated_on")
                try:
                    # convert to mkv without audio
                    # like: ffmpeg -i "$file" -an -c copy "$tmp_mkv_file"
                    logger.debug(f"ffmpeg_log_path : {ffmpeg_log_path}")
                    with open(
                        ffmpeg_log_path, "w", encoding="utf-8"
                    ) as ffmpeg_log_file:
                        cmd = ["ffmpeg", "-i", vr.path, "-an", "-c", "copy", tmp_video]
                        logger.debug(f"cmd: {' '.join(cmd)}")
                        result = subprocess.run(
                            cmd,
                            stdout=ffmpeg_log_file,
                            stderr=subprocess.STDOUT,
                            text=True,
                            check=True,
                        )
                        logger.debug(
                            f"ffmpeg completed with return code {result.returncode}"
                        )

                    # execute qr-parse action like below:
                    #
                    # reprostim --log-level $LOG_LEVEL
                    #   qr-parse "$tmp_mkv_file"
                    #   >"$OUT_DIR"/"$base_name".qrinfo.jsonl
                    #   2>"$OUT_DIR"/"$base_name".qrinfo.log
                    #
                    # prepare command-line to run
                    cmd = ["reprostim"]
                    # optionally add log level
                    if ctx.log_level is not None:
                        cmd += ["--log-level", ctx.log_level]
                    cmd += ["qr-parse"]
                    cmd += ctx.qr_opts
                    cmd += [tmp_video]
                    logger.debug(f"cmd: {' '.join(cmd)}")

                    # run reprostim qr-parse command and capture output
                    logger.debug(f"log_path: {log_path}")
                    logger.debug(f"jsonl_path: {jsonl_path}")
                    with open(log_path, "w", encoding="utf-8") as log_file:
                        with open(jsonl_path, "w", encoding="utf-8") as jsonl_file:
                            result = subprocess.run(
                                cmd,
                                stdout=jsonl_file,
                                stderr=log_file,
                                text=True,
                                check=True,
                            )
                            logger.debug(
                                f"qr-parse completed with return code "
                                f"{result.returncode}"
                            )

                    # set default value first to prevent stale n/a data for qr-parse
                    vr.qr_records_number = "-1"
                    _set_updated(ctx, vr, field="qr_updated_on")

                    # now read the output JSON file
                    if os.path.exists(jsonl_path):
                        try:
                            summary = _find_qr_parse_summary(jsonl_path)
                            if summary is not None:
                                logger.debug(f"qr-parse summary: {summary}")
                                vr.qr_records_number = str(
                                    summary.get("qr_count", "0")
                                )
                                logger.debug(
                                    f"Set qr_records_number -> "
                                    f"{vr.qr_records_number}"
                                )
                                _set_updated(ctx, vr, field="qr_updated_on")
                                ctx.c_qr += 1
                                logger.debug(f"c_qr -> {ctx.c_qr}")
                        except (json.JSONDecodeError, IOError) as e:
                            logger.error(f"Failed to read/parse qr JSON output: {e}")
                    else:
                        logger.info(f"No qr-parse output JSON file found: {jsonl_path}")
                except subprocess.CalledProcessError as e:
                    # child output is redirected to the log files, so
                    # e.stdout / e.stderr are None here; refer to the logs
                    logger.error(
                        f"qr failed: {e}; see logs: {ffmpeg_log_path}, {log_path}"
                    )
    except Timeout as el:
        logger.error(f"File is already locked by another process ({vr.path}) : {el}")
    return vr
[docs]
def run_ext_all(ctx: VaContext, vr: VaRecord) -> VaRecord:
    """Apply every external tool to a single record.

    The record is passed through :func:`run_ext_nosignal` first and the
    result is then passed through :func:`run_ext_qr`.

    :param ctx: VaContext object with processing context
    :type ctx: VaContext

    :param vr: VaRecord object to process
    :type vr: VaRecord

    :return: Record returned by the last tool in the chain
    :rtype: VaRecord
    """
    logger.debug(f"run_ext_all(path={vr.path})")
    after_nosignal = run_ext_nosignal(ctx, vr)
    after_qr = run_ext_qr(ctx, after_nosignal)
    return after_qr
[docs]
def do_audit(
    ctx: VaContext, paths_dir_or_file: List[str]
) -> Generator[VaRecord, None, None]:
    """Audit the given paths and post-process each record lazily.

    Every record produced by :func:`do_audit_internal` is passed through
    :func:`run_ext_all` before it is yielded to the caller.

    :param ctx: VaContext object with processing context
    :type ctx: VaContext

    :param paths_dir_or_file: Paths handed over to
                              :func:`do_audit_internal`
    :type paths_dir_or_file: List[str]

    :return: Generator of processed VaRecord objects
    :rtype: Generator[VaRecord, None, None]
    """
    logger.debug(f"do_audit(paths_dir_or_file={paths_dir_or_file})")
    audited = do_audit_internal(ctx, paths_dir_or_file)
    yield from (run_ext_all(ctx, record) for record in audited)
[docs]
def do_ext(
    ctx: VaContext, recs: List[VaRecord], paths_dir_or_file: List[str]
) -> Generator[VaRecord, None, None]:
    """Generator that runs external tools on existing records
    depending on context and options.

    ``paths_dir_or_file`` acts as an optional filter:

    * empty list or a ``"*"`` entry - every record is processed;
    * existing file - only the record with that (normalized) path;
    * existing directory - records located under that directory.

    Records that do not match the filter are yielded unchanged, so the
    output always contains one item per input record, in input order.

    :param ctx: VaContext object with processing context
    :type ctx: VaContext

    :param recs: Existing records to process
    :type recs: List[VaRecord]

    :param paths_dir_or_file: File and/or directory paths used as filter
    :type paths_dir_or_file: List[str]

    :return: Generator of VaRecord objects
    :rtype: Generator[VaRecord, None, None]
    """
    logger.debug("do_ext(...)")
    has_filter = bool(paths_dir_or_file)

    # create separate sets of dirs and files from paths_dir_or_file
    path_dirs: Set[str] = set()
    path_files: Set[str] = set()
    for path in paths_dir_or_file:
        if path == "*":
            # wildcard disables filtering completely
            has_filter = False
            break
        if not os.path.exists(path):
            logger.error(f"Path does not exist: {path}")
            continue
        if os.path.isfile(path):
            path_files.add(_normalize_path(path))
        elif os.path.isdir(path):
            path_dirs.add(_normalize_path(path))

    # Directory prefixes always end with a path separator, otherwise
    # "/data/vid" would also match "/data/videos/x.mkv".
    dir_prefixes = tuple(os.path.join(d, "") for d in path_dirs)

    for rec in recs:
        # filter by paths when specified
        if has_filter:
            if rec.path in path_files:
                logger.debug(f"Matched ext record by PATH filter: {rec.path}")
            elif rec.path.startswith(dir_prefixes):
                # str.startswith() with an empty tuple is always False
                logger.debug(f"Matched ext record by DIR filter: {rec.path}")
            else:
                logger.debug(f"Skipping ext record by PATH / DIR filters: {rec.path}")
                yield rec
                continue

        # process matched ext record
        yield run_ext_all(ctx, rec)
[docs]
def get_file_video_audit(
    path: str,
    path_tsv: Optional[str] = None,
    cached: bool = False,
    use_lock: bool = True,
) -> Optional[VaRecord]:
    """Get a single VaRecord by auditing a single video file.

    When ``path_tsv`` points to an existing TSV file, the record whose
    ``path`` equals ``path`` is returned from it. If the TSV has no such
    record, or it cannot be loaded, the video file is audited directly
    with the internal source only.

    :param path: Path to the video file.
    :type path: str

    :param path_tsv: Optional path to existing TSV file to load existing
                     records from. Default: None.
    :type path_tsv: Optional[str]

    :param cached: If ``True``, return cached TSV data when available instead
                   of reloading from disk. Passed through to
                   :func:`_get_tsv_records`. Default: ``False``.
    :type cached: bool

    :param use_lock: If ``True`` (default), acquire the advisory file lock
                     before reading ``path_tsv``. If ``False``, skip the lock
                     (dirty-read mode).
    :type use_lock: bool

    :return: VaRecord object, or ``None`` when the direct audit yields
             no record.
    :rtype: Optional[VaRecord]
    """
    logger.debug(
        f"get_file_video_audit(path={path}, path_tsv={path_tsv},"
        f" cached={cached}, use_lock={use_lock})"
    )

    # If path_tsv is provided and exists, try to load record from TSV
    if path_tsv and os.path.exists(path_tsv):
        logger.debug(f"Loading video audit record from TSV: {path_tsv}")
        try:
            records: List[VaRecord] = _get_tsv_records(
                path_tsv, cached=cached, use_lock=use_lock
            )
            found = next((rec for rec in records if rec.path == path), None)
            if found is not None:
                logger.debug(f"Found matching record in TSV for: {path}")
                return found
            logger.debug(
                f"No matching record found in TSV for: {path}, falling back to audit"
            )
        except Timeout:
            logger.warning(
                f"Timeout acquiring lock for {path_tsv}, falling back to audit"
            )
        except Exception as e:
            # best-effort lookup: any TSV problem falls back to direct audit
            logger.warning(
                f"Error loading TSV file {path_tsv}: {e}, falling back to audit"
            )

    # Fall back to auditing the file directly
    logger.debug(f"Auditing video file directly: {path}")
    ctx: VaContext = VaContext(
        skip_names=None,
        c_internal=0,
        c_nosignal=0,
        c_qr=0,
        # the variable is optional: None means no explicit log level
        log_level=os.environ.get("REPROSTIM_LOG_LEVEL"),
        max_counter=1,
        mode=VaMode.INCREMENTAL,
        path_mask=None,
        recursive=False,
        source={VaSource.INTERNAL},
    )
    return next(do_audit_file(ctx, path), None)
def _parse_rec_datetime(date_str: str, time_str: str) -> Optional[datetime]:
    """Combine a VaRecord date and time string into one datetime.

    :param date_str: Date string in ``'YYYY-MM-DD'`` format, or ``'n/a'``.
    :type date_str: str

    :param time_str: Time string in ``'HH:MM:SS.mmm'`` format, or ``'n/a'``.
    :type time_str: str

    :return: Parsed datetime object, or ``None`` when either value is
             ``'n/a'`` or the combined text does not match the format.
    :rtype: Optional[datetime]
    """
    parsed: Optional[datetime] = None
    if date_str != "n/a" and time_str != "n/a":
        stamp = "{} {}".format(date_str, time_str)
        try:
            parsed = datetime.strptime(stamp, "%Y-%m-%d %H:%M:%S.%f")
        except ValueError:
            # malformed value is reported as "no timestamp"
            parsed = None
    return parsed
[docs]
def find_video_audit_by_timerange(
    path_tsv: str,
    start: datetime,
    end: datetime,
    cached: bool = False,
    use_lock: bool = True,
) -> List[VaRecord]:
    """Find all VaRecords whose recording interval intersects the given
    time range, sorted by record start time ascending.

    Only records with both ``present`` and ``complete`` flags set to ``True``
    are considered. A record intersects the range when its start is
    strictly before ``end`` and its end is strictly after ``start``, so
    a record that merely touches a boundary is not returned.

    Records with ``'n/a'`` start/end values or an unparsable start are
    skipped. A record whose end cannot be parsed is kept as long as its
    start is before ``end``.

    :param path_tsv: Path to the TSV file to search.
    :type path_tsv: str

    :param start: Start of the query time range.
    :type start: datetime

    :param end: End of the query time range.
    :type end: datetime

    :param cached: If ``True``, return cached TSV data when available instead
                   of reloading from disk. Default: ``False``.
    :type cached: bool

    :param use_lock: If ``True`` (default), acquire the advisory lock before
                     loading from disk. If ``False``, skip the lock
                     (dirty-read mode). Default: ``True``.
    :type use_lock: bool

    :return: List of matching VaRecord objects sorted by start time ascending.
    :rtype: List[VaRecord]
    """
    logger.debug(
        f"find_video_audit_by_timerange(path_tsv={path_tsv},"
        f" start={start}, end={end}, cached={cached}, use_lock={use_lock})"
    )

    records = _get_tsv_records(path_tsv, cached=cached, use_lock=use_lock)

    # Note: in the future, consider optimizing this by caching parsed start/end_ts
    # in VaRecord and/or using a more efficient data structure for lookups

    # keep the parsed start next to each match so sorting needs no re-parsing
    matches: List[Tuple[datetime, VaRecord]] = []
    for rec in records:
        if not rec.present or not rec.complete:
            continue
        if rec.end_date == "n/a" or rec.end_time == "n/a":
            continue

        # returns None for 'n/a' as well as for malformed values
        rec_start = _parse_rec_datetime(rec.start_date, rec.start_time)
        if rec_start is None or rec_start >= end:
            continue

        rec_end = _parse_rec_datetime(rec.end_date, rec.end_time)
        if rec_end is not None and rec_end <= start:
            continue

        matches.append((rec_start, rec))

    # stable sort: records with equal start keep their TSV order
    matches.sort(key=lambda item: item[0])
    return [rec for _, rec in matches]
[docs]
def do_main(
    paths: List[str],
    path_tsv: str,
    recursive: bool = False,
    mode: VaMode = VaMode.INCREMENTAL,
    va_src: Optional[Set[VaSource]] = None,
    max_files: int = -1,
    path_mask: Optional[str] = None,
    verbose: bool = False,
    out_func=print,
    nosignal_opts: Optional[str] = None,
    qr_opts: Optional[str] = None,
) -> int:
    """The main function invoked by CLI to analyze video files with
    logs and save the results to a TSV file.

    :param paths: One or more paths to the video file or directory
    :type paths: List[str]

    :param path_tsv: Path to the output TSV file.
    :type path_tsv: str

    :param recursive: Whether to scan directories recursively. Default: False
    :type recursive: bool

    :param mode: Operation mode, one of VaMode values (default: INCREMENTAL)
    :type mode: VaMode

    :param va_src: Set of VaSource values to specify audit sources.
                   Default: ``{VaSource.INTERNAL}``.
    :type va_src: Optional[Set[VaSource]]

    :param max_files: Maximum number of video files/records to process.
                      Use -1 for unlimited (default: -1)
    :type max_files: int

    :param path_mask: Optional fnmatch-style mask to filter files
    :type path_mask: Optional[str]

    :param verbose: Whether to print verbose JSON output
                    to stdout (default: False)
    :type verbose: bool

    :param out_func: Function to stdout results (default: print)
    :type out_func: Callable[[str], None]

    :param nosignal_opts: Optional string of extra options to pass to
                          detect-noscreen, parsed via shlex. Overrides
                          the built-in defaults when provided.
    :type nosignal_opts: Optional[str]

    :param qr_opts: Optional string of extra options to pass to qr-parse,
                    parsed via shlex. No extra options by default.
    :type qr_opts: Optional[str]

    :return: 0 on success, 1 on failure (a path does not exist or
             ffprobe is not available)
    :rtype: int
    """
    if va_src is None:
        va_src = {VaSource.INTERNAL}

    logger.debug("video-audit command")
    logger.debug(f"paths : {paths}")
    logger.debug(f"path_tsv : {path_tsv}")

    # double validate each path is valid and exists
    for path in paths:
        if not os.path.exists(path):
            logger.error(f"Path does not exist: {path}")
            return 1

    if not check_ffprobe():
        out_func(
            "Error: ffprobe is not installed. Make sure" " ffmpeg package is installed."
        )
        logger.error(
            "!!! ffprobe is required to parse audio but"
            " not found. Make sure ffmpeg package is installed."
        )
        # report the failure instead of continuing and returning success
        return 1

    lock = FileLock(f"{path_tsv}.lock")

    recs0: List[VaRecord] = []
    # in case path_tsv exists, and mode is not FULL,
    # load existing records
    if mode != VaMode.FULL and os.path.exists(path_tsv):
        logger.info(f"Loading existing TSV file: {path_tsv}")
        with lock:
            recs0 = _load_tsv(path_tsv)
        logger.info(f"Loaded {len(recs0)} existing records from TSV")

    # skip files set in case of INCREMENTAL mode
    skip_names = None
    if mode == VaMode.INCREMENTAL and recs0:
        skip_names = {r.name for r in recs0}

    # collect all records from generator into a list
    # setup audit context first
    ctx: VaContext = VaContext(
        skip_names=skip_names,
        c_internal=0,
        c_nosignal=0,
        c_qr=0,
        # the variable is optional: None means no explicit log level
        # is passed to the external tools
        log_level=os.environ.get("REPROSTIM_LOG_LEVEL"),
        max_counter=max_files,
        mode=mode,
        nosignal_opts=(
            shlex.split(nosignal_opts)
            if nosignal_opts is not None
            else VaContext.model_fields["nosignal_opts"].default
        ),
        path_mask=path_mask,
        qr_opts=(
            shlex.split(qr_opts)
            if qr_opts is not None
            else VaContext.model_fields["qr_opts"].default
        ),
        recursive=recursive,
        source=va_src,
    )

    recs1: List[VaRecord] = list(do_audit(ctx, paths))

    if verbose:
        for vr in recs1:
            out_func(f"{vr.model_dump_json()}")

    # merge recs0 and recs1, with recs1 taking precedence by .name key
    recs = recs1
    if recs0:
        merged_dict = {r.name: r for r in recs0}
        merged_dict.update({r.name: r for r in recs1})
        recs = list(merged_dict.values())

    # in these modes external tools are applied to already known records too
    if mode in {VaMode.RERUN_FOR_NA, VaMode.RESET_TO_NA}:
        recs = list(do_ext(ctx, recs, paths))

    logger.info(f"Audited records count: {len(ctx.updated_paths)}")
    logger.info(f"Saving results to TSV file : {path_tsv}")
    logger.info(f"Total records to save : {len(recs)}")

    # sort records by name
    recs.sort(key=lambda r: r.name)

    with lock:
        # re-read the TSV under the lock and merge with it before saving;
        # presumably this preserves changes written by other processes
        # while the audit was running - TODO confirm against _merge_recs
        recs_cur: List[VaRecord] = (
            _load_tsv(path_tsv) if os.path.exists(path_tsv) else []
        )
        recs = _merge_recs(ctx, recs0, recs_cur, recs)
        # sort records by name again
        recs.sort(key=lambda r: r.name)
        _save_tsv(recs, path_tsv)
    return 0