# Source code for reprostim.qr.video_audit

# SPDX-FileCopyrightText: 2020-2026 ReproNim Team <info@repronim.org>
#
# SPDX-License-Identifier: MIT

"""
API to analyze video files recorded by reprostim-videocapture, along with
their corresponding log files and QR/audio metadata. It extracts key
information about each recording and produces a summary table (videos.tsv)
suitable for quality control, sharing, and further analysis.
"""

import csv
import fnmatch
import json
import logging
import os
import re
import shlex
import subprocess
import tempfile
import traceback
from datetime import datetime
from enum import Enum
from time import time
from typing import Dict, Generator, List, Optional, Set, Tuple

from filelock import FileLock, Timeout
from pydantic import BaseModel

from reprostim.qr.qr_parse import (
    InfoSummary,
    ParseContext,
    ParseSummary,
    VideoTimeInfo,
    do_info_file,
    do_parse,
)

# Module-level logger. No handlers or levels are configured here; where the
# output goes (e.g. stderr) depends on how the application configures logging.
logger = logging.getLogger(__name__)
# Left disabled: attaching a stderr handler to the root logger is presumably
# meant to be done by the application, not by this library module.
# logging.getLogger().addHandler(logging.StreamHandler(sys.stderr))
logger.debug(f"name={__name__}")


# Precalculated globals

# User@Host string (currently disabled; it would need getpass/socket imports)
# UPDATED_BY = f"{getpass.getuser()}@{socket.gethostname()}"

# Matches an embedded metadata record inside a log line:
#   "REPROSTIM-METADATA-JSON: <json> :REPROSTIM-METADATA-JSON"
# Group 1 captures the JSON payload. The capture is greedy, so if one line
# holds several marker pairs, group 1 spans from the first opening marker to
# the last closing marker.
JSON_PATTERN = re.compile(r"REPROSTIM-METADATA-JSON: (.*) :REPROSTIM-METADATA-JSON")


# NB: move in future to audio package or tool?
class AudioInfo(BaseModel):
    """Audio stream information extracted from the video file with ffprobe.

    Every field defaults to ``None``, meaning the value is unknown (stream
    absent, value not reported, or not filled in by the extractor).
    """

    bits_per_sample: Optional[int] = None  # Bits per sample; None when unknown
    channels: Optional[int] = None  # Number of audio channels
    codec: Optional[str] = None  # Short audio codec name (e.g. "aac")
    codec_long: Optional[str] = None  # Audio codec detailed (long) name
    codec_rfc6381: Optional[str] = None  # Audio codec in RFC 6381 format (e.g. "mp4a.40.2")
    duration_sec: Optional[float] = None  # Duration in seconds
    profile: Optional[str] = None  # Audio codec profile (e.g., "LC")
    sample_rate: Optional[int] = None  # Sample rate in Hz
    start_time: Optional[float] = None  # Start time of the audio stream in seconds
    tag_str: Optional[str] = None  # Codec tag string (e.g., "[0][0][0][0]")
class VideoInfo(BaseModel):
    """Video stream information extracted from the video file with ffprobe.

    Every field defaults to ``None``, meaning the value is unknown (stream
    absent, value not reported, or not filled in by the extractor).
    """

    bit_depth: Optional[int] = None  # Bit depth per channel, 8, 10, 12, 16 etc.
    codec: Optional[str] = None  # Short video codec name (e.g. "h264")
    codec_long: Optional[str] = None  # Video codec detailed (long) name
    codec_rfc6381: Optional[str] = None  # Video codec in RFC 6381 format (e.g. "avc1.64002A")
    duration_sec: Optional[float] = None  # Duration in seconds
    fps: Optional[float] = None  # Frames per second
    height: Optional[int] = None  # Video frame height in pixels
    level: Optional[int] = None  # Video codec level as an integer (e.g. 42 for H.264 level 4.2)
    pix_fmt: Optional[str] = None  # Pixel format (e.g., "yuv420p")
    profile: Optional[str] = None  # Video codec profile (e.g., "High")
    start_time: Optional[float] = None  # Start time of the video stream in seconds
    tag_str: Optional[str] = None  # Codec tag string (e.g., "[0][0][0][0]")
    width: Optional[int] = None  # Video frame width in pixels
class VaMode(str, Enum):
    """Video audit processing mode constants.

    Members also subclass ``str``, so each compares equal to its string value
    (e.g. ``VaMode.FULL == "full"``).
    """

    FULL = "full"
    """Process all files, overwrite existing records in TSV completely."""
    INCREMENTAL = "incremental"
    """Process only new files not present in existing TSV and keep existing records."""
    FORCE = "force"
    """Force redo/overwrite specified records in TSV"""
    # Internal (basic) auditing is skipped in this mode; only external-tool
    # columns with "n/a" values are (re)computed.
    RERUN_FOR_NA = "rerun-for-na"
    """Process only records with 'n/a' values in existing TSV in columns that are usually filled by external tools. Intended for run external slow tools like detect-noscreen or qr-parser."""
    # Internal (basic) auditing is skipped in this mode as well.
    RESET_TO_NA = "reset-to-na"
    """Reset specified columns to 'n/a' in existing TSV. Intended to clear results of external tools like detect-noscreen or qr-parser to rerun them from scratch."""
class VaRecord(BaseModel):
    """A single record in the videos.tsv audit summary.

    The declaration order of the fields below defines the column order of the
    TSV file, so fields should not be reordered casually. Most values are
    strings, with ``"n/a"`` as the "unknown / not computed" placeholder.
    Timestamps in the ``*_updated_on`` fields use the
    ``YYYY-MM-DD HH:MM:SS.mmm`` local-time format.
    """

    # File info
    path: str = "n/a"  # Path to the video file (.mkv, .mp4 or .avi)
    present: bool = False  # Whether the file is present
    complete: bool = False  # Whether the recording was completed
    # and end timestamp is present
    name: str = "n/a"  # Short base name of the file (record key when merging)

    # Start and end timestamps
    start_date: str = "n/a"  # YYYY-MM-DD
    start_time: str = "n/a"  # HH:MM:SS.mmm
    end_date: str = "n/a"  # YYYY-MM-DD
    end_time: str = "n/a"  # HH:MM:SS.mmm

    # Video detection info
    video_res_detected: str = "n/a"  # e.g., "1920x1080"
    video_fps_detected: str = "n/a"
    video_dur_detected: str = "n/a"  # video duration in seconds
    # based on timestamps
    video_res_recorded: str = "n/a"  # e.g., "1920x1080"
    video_fps_recorded: str = "n/a"
    video_dur_recorded: str = "n/a"  # video duration in seconds
    video_size_mb: str = "n/a"  # size of the video file in MB
    video_rate_mbpm: str = "n/a"  # bitrate in mbpm

    # Audio info
    # Composite description, e.g. "48000Hz 16b 2ch aac" (not only the sample rate)
    audio_sr: str = "n/a"  # sample rate
    audio_dur: str = "n/a"  # audio stream duration in seconds

    # Duration
    duration: str = "n/a"  # seconds
    duration_h: str = "n/a"  # human-readable, e.g., "01:23:45.000"

    # Analysis info
    no_signal_frames: str = "n/a"  # number of frames with no signal, or %
    qr_records_number: str = "n/a"  # number of detected QR codes

    # Coherence check
    file_log_coherent: bool = False  # whether video/audio info matches extracted

    # Update info
    no_signal_updated_on: str = (
        "n/a"  # provide separate timestamps for nosignal ext tool
    )
    qr_updated_on: str = "n/a"  # provide separate timestamps for qr ext tool
    updated_on: str = "n/a"  # last updated timestamp for basic internal processing
    # updated_by: str = "n/a"
class VaSource(str, Enum):
    """Video audit source constants.

    ``VaContext.source`` holds a *set* of these values; including ``ALL``
    enables every source.
    """

    ALL = "all"
    """Run all available audit sources."""
    # NOTE(review): the internal audit actually combines capture-log metadata,
    # the qr_parse info/parse helpers and ffprobe; "mediainfo" below may be
    # outdated wording.
    INTERNAL = "internal"
    """Basic and default behaviour to process quickly video files using only mediainfo and logs metadata."""
    NOSIGNAL = "nosignal"
    """Process video files to detect no-signal frames. This is slow process and can take some time depending on the video length."""
    QR = "qr"
    """Process video files to extract QR codes metadata. This is very slow process and time is almost the same as video duration at this moment."""
class VaContext(BaseModel):
    """Context for video audit processing.

    Holds both configuration (mode, sources, filters, external tool options
    and output directories) and mutable run state (the ``c_*`` counters and
    ``updated_paths``), which the audit functions update as they run.
    """

    # Run-state counters; each one is compared against max_counter on its own.
    c_internal: Optional[int] = 0
    """Count of files processed with INTERNAL source"""
    c_nosignal: Optional[int] = 0
    """Count of files processed with NOSIGNAL source"""
    c_qr: Optional[int] = 0
    """Count of files processed with QR source"""
    # Passed to the external command as "--log-level <value>" when not None.
    log_level: Optional[str] = None
    """Logging level to be used in external tool, one of DEBUG, INFO, WARNING, ERROR, CRITICAL (default: None)"""
    # Limit applied separately to c_internal, c_nosignal and c_qr.
    max_counter: Optional[int] = -1
    """Max number of records to process or -1 for unlimited (default: -1) """
    mode: Optional[VaMode] = VaMode.INCREMENTAL
    """Operation mode, one of VaMode values (default: INCREMENTAL) """
    # Output JSON files are placed under <dir>/<YYYY>/<MM>/ when the record
    # start date is known.
    nosignal_data_dir: Optional[str] = "derivatives/nosignal"
    """Directory to store nosignal output data in JSON format. """
    nosignal_log_dir: Optional[str] = "logs/nosignal"
    """Directory to store nosignal logs. """
    # Inserted verbatim after "detect-noscreen" on the command line.
    nosignal_opts: Optional[List] = [
        "--number-of-checks",
        "100",
        "--truncated",
        "fixup",
        "--invalid-timing",
        "fixup",
        "--threshold",
        "1.01",
    ]
    """Additional options to pass to detect-noscreen tool. """
    # fnmatch-style pattern matched against the full file path.
    path_mask: Optional[str] = None
    """Optional path mask to filter video files based on their paths. """
    qr_data_dir: Optional[str] = "derivatives/qr"
    """Directory to store qr-parse output data in JSON format. """
    qr_log_dir: Optional[str] = "logs/qr"
    """Directory to store qr-parse logs. """
    qr_opts: Optional[List] = []
    """Additional options to pass to qr-parse tool if any. """
    recursive: Optional[bool] = False
    """Whether to scan directories recursively. Default: False """
    # Entries are matched against full paths (files and directories) and
    # against file base names.
    skip_names: Optional[set] = None
    """Optional set of file base names to skip (for incremental mode) """
    # A set of sources, not a single value; VaSource.ALL enables every source.
    source: Optional[Set[VaSource]] = {VaSource.INTERNAL}
    """One of VaSource values to specify audit source (default: INTERNAL) """
    # Record paths touched during this run (filled each time a record is
    # marked as updated).
    updated_paths: Optional[set] = set()
    """Optional set of updated record paths """
def check_coherent(vr: VaRecord) -> bool:
    """Check whether a video audit record is internally coherent.

    A record is coherent when the file is present and complete, start/end
    timestamps and a duration are known, and the resolution and frame rate
    detected from the capture log match those recorded in the video itself.

    Frame rates are compared after rounding to the nearest integer, so that
    e.g. a detected ``60`` matches a recorded ``59.9``. Truncation would report
    a spurious mismatch there. Values that cannot be parsed as numbers make
    the record incoherent instead of raising; the caller invokes this outside
    its error handling.

    :param vr: Record to check.
    :type vr: VaRecord
    :return: True if the record is coherent, False otherwise.
    :rtype: bool
    """
    if not vr.present:
        logger.debug("check_coherent: File not present")
        return False
    if not vr.complete:
        logger.debug("check_coherent: File not complete")
        return False
    if vr.start_date == "n/a" or vr.start_time == "n/a":
        logger.debug("check_coherent: No start date/time")
        return False
    if vr.end_date == "n/a" or vr.end_time == "n/a":
        logger.debug("check_coherent: No end date/time")
        return False
    if vr.video_res_detected == "n/a" or vr.video_fps_detected == "n/a":
        logger.debug("check_coherent: No video res/fps detected")
        return False
    if vr.video_res_recorded == "n/a" or vr.video_fps_recorded == "n/a":
        logger.debug("check_coherent: No video res/fps recorded")
        return False
    if vr.duration == "n/a" or vr.duration_h == "n/a":
        logger.debug("check_coherent: No duration available")
        return False
    if vr.video_res_detected != vr.video_res_recorded:
        logger.debug(
            f"check_coherent: Video res mismatch: "
            f"detected={vr.video_res_detected}, "
            f"recorded={vr.video_res_recorded}"
        )
        return False
    try:
        # round(), not int(): 59.9 vs 60 must not count as a mismatch
        fps_detected = round(float(vr.video_fps_detected))
        fps_recorded = round(float(vr.video_fps_recorded))
    except (TypeError, ValueError, OverflowError):
        # non-numeric strings, NaN or infinity
        logger.debug(
            f"check_coherent: Unparseable video fps: "
            f"detected={vr.video_fps_detected}, "
            f"recorded={vr.video_fps_recorded}"
        )
        return False
    if fps_detected != fps_recorded:
        logger.debug(
            f"check_coherent: Video fps mismatch: "
            f"detected={vr.video_fps_detected}, "
            f"recorded={vr.video_fps_recorded}"
        )
        return False
    return True
def check_ffprobe():
    """Check whether ffprobe is installed and can be executed.

    Runs ``ffprobe -version`` with all output discarded.

    :return: True if ffprobe ran successfully, False otherwise
    :rtype: bool
    """
    try:
        # Try running `ffprobe -version` to see if it's installed
        subprocess.run(
            ["ffprobe", "-version"],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            check=True,
        )
    except (subprocess.CalledProcessError, OSError):
        # OSError covers FileNotFoundError (not on PATH) and also
        # PermissionError (found but not executable), which previously
        # escaped as an unhandled exception.
        logger.error("Error: ffprobe is not installed")
        return False
    logger.debug("ffprobe is installed")
    return True
def format_date(dt: datetime) -> str:
    """Return the calendar date part of *dt* as ``YYYY-MM-DD``.

    Timezone information is ignored: the date is taken exactly as stored in
    *dt*, without conversion. The year is always zero-padded to four digits.

    :param dt: datetime object, or None
    :type dt: datetime
    :return: ``YYYY-MM-DD`` string, or ``"n/a"`` when *dt* is None
    :rtype: str
    """
    return "n/a" if dt is None else f"{dt.year:04d}-{dt.month:02d}-{dt.day:02d}"
def format_duration(duration_sec: float) -> str:
    """Convert a duration in seconds to a ``HH:MM:SS.mmm`` string.

    The value is rounded to whole milliseconds *before* it is split into
    hours, minutes and seconds, so carries propagate correctly. For example,
    ``59.9996`` becomes ``"00:01:00.000"`` rather than ``"00:00:60.000"``.
    Negative durations get a leading ``-``. Hours are not wrapped at 24 and
    may use more than two digits.

    :param duration_sec: Duration in seconds, or None
    :type duration_sec: float
    :return: Formatted duration string, or ``"n/a"`` if *duration_sec* is None
    :rtype: str
    """
    if duration_sec is None:
        return "n/a"
    total_ms = round(duration_sec * 1000)
    sign = "-" if total_ms < 0 else ""
    total_sec, millis = divmod(abs(total_ms), 1000)
    total_min, seconds = divmod(total_sec, 60)
    hours, minutes = divmod(total_min, 60)
    return f"{sign}{hours:02d}:{minutes:02d}:{seconds:02d}.{millis:03d}"
def format_time(dt: datetime) -> str:
    """Return the wall-clock time part of *dt* as ``HH:MM:SS.mmm``.

    Timezone information is ignored (no UTC offset is appended). Sub-millisecond
    precision is truncated, not rounded.

    :param dt: datetime object, or None
    :type dt: datetime
    :return: ``HH:MM:SS.mmm`` string, or ``"n/a"`` when *dt* is None
    :rtype: str
    """
    if dt is None:
        return "n/a"
    # .time() drops tzinfo, so isoformat() never adds an offset suffix;
    # timespec="milliseconds" truncates microseconds to three digits.
    return dt.time().isoformat(timespec="milliseconds")
def format_tts(t: float) -> str:
    """Format a ``time.time()`` timestamp as ``YYYY-MM-DD HH:MM:SS.mmm``.

    The timestamp is converted with :meth:`datetime.fromtimestamp` without a
    ``tz`` argument, i.e. into the local time of the running process. The
    resulting string carries no UTC offset and uses a space separator.

    :param t: Timestamp in seconds since the epoch.
    :type t: float
    :return: Local date and time string.
    :rtype: str
    """
    local_dt = datetime.fromtimestamp(t)
    return " ".join((format_date(local_dt), format_time(local_dt)))
def iter_metadata_json(log_path: str) -> Generator[Dict, None, None]:
    """Iterate over all REPROSTIM-METADATA-JSON records in a log file.

    Each line is searched with ``JSON_PATTERN``. When it matches, the captured
    payload is decoded with :func:`json.loads` and yielded if it is a JSON
    object.

    Lines whose payload is invalid JSON, or is valid JSON but not an object,
    are logged and skipped. Undecodable bytes are replaced (U+FFFD) instead of
    aborting the scan, so one corrupted line cannot hide all later metadata.

    :param log_path: Path to the log file
    :type log_path: str
    :return: Generator of parsed JSON dictionaries; yields nothing if the
        file does not exist.
    :rtype: Generator[Dict, None, None]
    """
    if not os.path.exists(log_path):
        logger.error(f"Log file does not exist: {log_path}")
        return  # file missing, generator yields nothing

    with open(log_path, "r", encoding="utf-8", errors="replace") as f:
        for line in f:
            match = JSON_PATTERN.search(line)
            if not match:
                continue
            try:
                data = json.loads(match.group(1))
            except json.JSONDecodeError:
                logger.error(f"Failed to parse JSON in line: {line.rstrip()}")
                continue
            if not isinstance(data, dict):
                # callers use dict access (e.g. .get); skip other JSON values
                logger.error(f"Metadata JSON is not an object in line: {line.rstrip()}")
                continue
            yield data
def find_metadata_json(path: str, key: str, value) -> Optional[Dict]:
    """Return the first metadata record in a log file where ``record[key] == value``.

    Records are scanned lazily via :func:`iter_metadata_json`; scanning stops
    at the first match.

    :param path: Path to the log file
    :type path: str
    :param key: Key to compare in each record
    :type key: str
    :param value: Value the key must equal
    :type value: Any
    :return: The first matching record, or None if no record matches
    :rtype: Optional[Dict]
    """
    for entry in iter_metadata_json(path):
        if entry.get(key) == value:
            return entry
    return None
def audio_codec_to_rfc6381(codec: str, profile: Optional[str]) -> Optional[str]:
    """Return the RFC 6381 codec string for an audio stream.

    For AAC, the MPEG-4 Audio Object Type (AOT) is derived from the profile
    name reported by ffprobe. Unknown or missing profiles fall back to
    AAC-LC (AOT 2).

    :param codec: Short codec name as reported by ffprobe (e.g. ``"aac"``).
    :type codec: str
    :param profile: Codec profile string (e.g. ``"LC"``), or ``None``.
    :type profile: Optional[str]
    :return: RFC 6381 string (e.g. ``"mp4a.40.2"``), or ``None`` when the
        codec is not recognized.
    :rtype: Optional[str]
    """
    if codec == "aac":
        # ISO/IEC 14496-3 Audio Object Types keyed by ffprobe profile names.
        # Main/SSR/LTP used to be missing and were silently reported as LC.
        aot = {
            "Main": 1,
            "LC": 2,
            "SSR": 3,
            "LTP": 4,
            "HE-AAC": 5,
            "LD": 23,
            "HE-AACv2": 29,
            "ELD": 39,
        }.get(profile, 2)
        return f"mp4a.40.{aot}"
    if codec in ("mp3", "mp2"):
        return "mp4a.69"
    if codec == "opus":
        return "opus"
    return None
def video_codec_to_rfc6381(
    codec: str, profile: Optional[str], level: Optional[int]
) -> Optional[str]:
    """Return the RFC 6381 codec string for a video stream.

    Only H.264 is supported. The string has the form ``avc1.PPCCLL``:

    * ``PP`` is the ``profile_idc`` derived from the ffprobe profile name
      (unknown or missing profiles default to Baseline, ``0x42``).
    * ``CC`` holds the constraint flags, which ffprobe does not expose, so it
      is always ``00``.
    * ``LL`` is the level.

    :param codec: Short codec name as reported by ffprobe (e.g. ``"h264"``).
    :type codec: str
    :param profile: Codec profile string (e.g. ``"High"``), or ``None``.
    :type profile: Optional[str]
    :param level: Codec level integer as reported by ffprobe (e.g. ``42``),
        or ``None``. Values outside ``0..255`` (ffprobe uses ``-99`` for
        "unknown") are encoded as ``00``.
    :type level: Optional[int]
    :return: RFC 6381 string (e.g. ``"avc1.64002A"``), or ``None`` when the
        codec is not recognised.
    :rtype: Optional[str]
    """
    if codec == "h264":
        # profile_idc values from ITU-T H.264 Annex A, keyed by ffprobe names
        profile_idc = {
            "Baseline": 0x42,
            "Constrained Baseline": 0x42,
            "Main": 0x4D,
            "Extended": 0x58,
            "High": 0x64,
            "High 10": 0x6E,
            "High 10 Intra": 0x6E,
            "High 4:2:2": 0x7A,
            "High 4:2:2 Intra": 0x7A,
            "High 4:4:4 Predictive": 0xF4,
            "High 4:4:4 Intra": 0xF4,
            "CAVLC 4:4:4": 0x2C,
            "Multiview High": 0x76,
            "Stereo High": 0x80,
        }.get(profile, 0x42)
        # level_idc must fit in one byte; a negative "unknown" level would
        # otherwise produce a malformed string such as "avc1.4200-63".
        level_idc = level if level is not None and 0 <= level <= 0xFF else 0
        return f"avc1.{profile_idc:02X}00{level_idc:02X}"
    return None
def _ffprobe_float(value) -> Optional[float]:
    """Parse an ffprobe numeric field as float; None if missing or unparseable."""
    if value is None:
        return None
    try:
        return float(value)
    except (TypeError, ValueError):
        return None


def _ffprobe_int(value) -> Optional[int]:
    """Parse an ffprobe numeric field as int; None if missing or unparseable."""
    if value is None:
        return None
    try:
        return int(value)
    except (TypeError, ValueError):
        return None


def _ffprobe_duration(stream: Dict) -> Optional[float]:
    """Return a stream duration in seconds from an ffprobe stream dict.

    Prefers the ``DURATION`` tag (``HH:MM:SS.fffffffff``, as written e.g. by
    Matroska muxers). Falls back to the generic ``duration`` field when the
    tag is absent or malformed, and returns None when neither can be parsed.
    """
    tag = (stream.get("tags") or {}).get("DURATION")
    if tag is not None:
        parts = str(tag).split(":")
        if len(parts) == 3:
            try:
                h, m, sec = parts
                return int(h) * 3600 + int(m) * 60 + float(sec)
            except ValueError:
                pass
    return _ffprobe_float(stream.get("duration"))


def _ffprobe_fps(stream: Dict) -> Optional[float]:
    """Return frames per second from ``avg_frame_rate`` or ``r_frame_rate``.

    Rates are ``"num/den"`` strings; ``"0/0"``, zero denominators and
    unparseable values are skipped. Returns None if no usable rate is found.
    """
    for fps_key in ("avg_frame_rate", "r_frame_rate"):
        fps_str = stream.get(fps_key)
        if not fps_str or fps_str == "0/0":
            continue
        try:
            num, den = fps_str.split("/")
            den_int = int(den)
            if den_int != 0:
                return float(int(num)) / den_int
        except (AttributeError, ValueError):
            pass
    return None


def _fill_audio_info(ai: AudioInfo, s: Dict) -> None:
    """Copy fields of one ffprobe audio stream dict into *ai*."""
    bps = _ffprobe_int(s.get("bits_per_sample"))
    if bps:  # a reported 0 is treated as unknown
        ai.bits_per_sample = bps
    ai.channels = s.get("channels")
    ai.codec = s.get("codec_name")
    ai.codec_long = s.get("codec_long_name")
    ai.profile = s.get("profile")
    ai.sample_rate = _ffprobe_int(s.get("sample_rate"))
    ai.start_time = _ffprobe_float(s.get("start_time"))
    ai.tag_str = s.get("codec_tag_string")
    ai.codec_rfc6381 = audio_codec_to_rfc6381(ai.codec or "", ai.profile)
    ai.duration_sec = _ffprobe_duration(s)


def _fill_video_info(vi: VideoInfo, s: Dict) -> None:
    """Copy fields of one ffprobe video stream dict into *vi*."""
    vi.codec = s.get("codec_name")
    vi.codec_long = s.get("codec_long_name")
    vi.profile = s.get("profile")
    vi.level = s.get("level")
    vi.width = s.get("width")
    vi.height = s.get("height")
    vi.pix_fmt = s.get("pix_fmt")
    vi.start_time = _ffprobe_float(s.get("start_time"))
    vi.tag_str = s.get("codec_tag_string")
    bit_depth = _ffprobe_int(s.get("bits_per_raw_sample"))
    if bit_depth:  # a reported 0 is treated as unknown
        vi.bit_depth = bit_depth
    vi.fps = _ffprobe_fps(s)
    vi.codec_rfc6381 = video_codec_to_rfc6381(vi.codec or "", vi.profile, vi.level)
    vi.duration_sec = _ffprobe_duration(s)


# NB: move in future to audio package or tool?
def get_audio_video_info_ffprobe(path: str) -> Tuple[AudioInfo, VideoInfo]:
    """Extract audio and video stream information from the video file using ffprobe.

    Issues a single ffprobe call that reads all streams, then splits the
    results into an :class:`AudioInfo` (first audio stream) and a
    :class:`VideoInfo` (first video stream).

    ffprobe being missing or failing, or producing invalid JSON, is logged
    and yields empty infos. Individual values that cannot be parsed are left
    as ``None`` instead of raising.

    :param path: Path to the video file (.mkv, .mp4, .avi)
    :type path: str
    :return: Tuple of (AudioInfo, VideoInfo) with extracted stream
        information. Fields are ``None`` when the corresponding stream is
        absent or a value cannot be parsed.
    :rtype: Tuple[AudioInfo, VideoInfo]
    """
    logger.debug(f"get_audio_video_info_ffprobe: {path}")
    ai: AudioInfo = AudioInfo()
    vi: VideoInfo = VideoInfo()
    cmd = [
        "ffprobe",
        "-v",
        "quiet",  # suppress logs
        "-print_format",
        "json",  # JSON output
        "-show_streams",  # streams
        path,
    ]
    logger.debug(f"run: {' '.join(cmd)}")
    try:
        result = subprocess.run(cmd, capture_output=True, text=True, check=True)
    except FileNotFoundError:
        logger.error("ffprobe is not installed or not in PATH")
        return ai, vi
    except subprocess.CalledProcessError as e:
        logger.error(f"ffprobe error: {e} {e.stdout} {e.stderr}")
        return ai, vi

    try:
        o = json.loads(result.stdout)
    except json.JSONDecodeError as e:
        logger.error(f"ffprobe returned invalid JSON for {path}: {e}")
        return ai, vi
    logger.debug(f"ffprobe output: {o}")

    streams = o.get("streams", []) if isinstance(o, dict) else []
    audio = next((s for s in streams if s.get("codec_type") == "audio"), None)
    video = next((s for s in streams if s.get("codec_type") == "video"), None)
    if audio is not None:
        _fill_audio_info(ai, audio)
    if video is not None:
        _fill_video_info(vi, video)
    return ai, vi
def _save_tsv(records: List[VaRecord], path_out: str):
    """Write records to a tab-separated file with a header row.

    Columns follow the declaration order of the ``VaRecord`` fields.
    """
    fields = list(VaRecord.model_fields.keys())
    with open(path_out, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(
            f, fieldnames=fields, delimiter="\t", lineterminator="\n"
        )
        writer.writeheader()
        for r in records:
            writer.writerow(r.model_dump())


def _load_tsv(path_in: str) -> List[VaRecord]:
    """Load records from a TSV file (as written by ``_save_tsv``).

    Ragged rows are tolerated:

    * Cells beyond the header are ignored. ``csv.DictReader`` stores them
      under the ``None`` key, which cannot be passed as a keyword argument.
    * Missing trailing cells (``None`` values) fall back to the ``VaRecord``
      defaults instead of failing validation.
    """
    records = []
    with open(path_in, "r", newline="", encoding="utf-8") as f:
        reader = csv.DictReader(f, delimiter="\t")
        for row in reader:
            clean = {k: v for k, v in row.items() if k is not None and v is not None}
            records.append(VaRecord(**clean))
    return records


# Module-level cache of loaded TSV files, keyed by the path passed in.
_tsv_cache: Dict[str, List[VaRecord]] = {}


def _get_tsv_records(
    path_in: str, cached: bool = False, use_lock: bool = True
) -> List[VaRecord]:
    """Load TSV records from a file, optionally using a module-level cache.

    When ``cached=True`` and the file has already been loaded, returns the
    cached list immediately without acquiring the file lock. If the cache is
    cold, falls back to loading from disk and populates the cache.

    When ``cached=False`` (default), always reloads from disk and refreshes
    the cache entry for ``path_in``.

    When ``use_lock=False``, the file is read directly without acquiring the
    advisory lock (dirty-read mode). This is safe for read-only access when
    the lock file is owned by a different OS user.

    :param path_in: Path to the TSV file to load.
    :type path_in: str
    :param cached: If ``True``, return cached data when available; if
        ``False``, always reload from disk and refresh the cache.
        Default: ``False``.
    :type cached: bool
    :param use_lock: If ``True`` (default), acquire ``path_in.lock`` (5 s
        timeout) before reading. If ``False``, skip the lock (dirty-read mode).
    :type use_lock: bool
    :return: List of VaRecord objects loaded from the TSV file. The cached
        list object itself is returned, so callers should not mutate it.
    :rtype: List[VaRecord]
    """
    if cached and path_in in _tsv_cache:
        return _tsv_cache[path_in]
    if use_lock:
        lock = FileLock(f"{path_in}.lock", timeout=5)
        with lock:
            records = _load_tsv(path_in)
    else:
        records = _load_tsv(path_in)
    _tsv_cache[path_in] = records
    return records


def _compare_rec_ts(r1: VaRecord, r2: VaRecord, field: str = "updated_on") -> int:
    """Compare two VaRecord objects by one of their timestamp fields.

    ``"n/a"`` (never updated) sorts before any real timestamp. Real
    timestamps use the ``YYYY-MM-DD HH:MM:SS.fff`` format.

    :param r1: First VaRecord object
    :type r1: VaRecord
    :param r2: Second VaRecord object
    :type r2: VaRecord
    :param field: Field name to compare timestamps (default: "updated_on")
    :type field: str
    :return: -1 if r1 < r2, 0 if equal, 1 if r1 > r2
    :rtype: int
    :raises ValueError: if a non-"n/a" value is not a valid timestamp
    """
    t1_str = getattr(r1, field)
    t2_str = getattr(r2, field)
    # identical strings (including both "n/a") need no parsing
    if t1_str == t2_str:
        return 0
    if t1_str == "n/a":
        return -1
    if t2_str == "n/a":
        return 1
    t1 = datetime.strptime(t1_str, "%Y-%m-%d %H:%M:%S.%f")
    t2 = datetime.strptime(t2_str, "%Y-%m-%d %H:%M:%S.%f")
    if t1 < t2:
        return -1
    if t1 > t2:
        return 1
    return 0


def _match_recs(recs1: List[VaRecord], recs2: List[VaRecord]) -> bool:
    """Return True if both lists hold equal records in the same order."""
    if len(recs1) != len(recs2):
        return False
    return all(
        r1.model_dump_json() == r2.model_dump_json() for r1, r2 in zip(recs1, recs2)
    )


def _merge_rec(ctx: VaContext, rec_cur: VaRecord, rec_new: VaRecord) -> VaRecord:
    """Merge two versions of the same record (same ``name``).

    Each independently updated part of the record is taken from whichever
    version has the newer timestamp (ties favour ``rec_new``):

    * basic/internal fields, by ``updated_on``;
    * ``no_signal_frames``, by ``no_signal_updated_on``;
    * ``qr_records_number``, by ``qr_updated_on``.

    :param ctx: processing context (currently unused)
    :param rec_cur: record currently stored in the TSV
    :param rec_new: newly produced record
    :return: merged record (``rec_new`` itself when all timestamps are equal)
    :raises ValueError: if the record names differ
    """
    # provide additional check for record key
    if rec_cur.name != rec_new.name:
        raise ValueError(
            f"_merge_rec: Record names do not match: "
            f"{rec_cur.name} != {rec_new.name}"
        )

    c_internal: int = _compare_rec_ts(rec_new, rec_cur)
    c_nosignal: int = _compare_rec_ts(rec_new, rec_cur, field="no_signal_updated_on")
    c_qr: int = _compare_rec_ts(rec_new, rec_cur, field="qr_updated_on")

    # when timestamps are the same, select the latest record:
    if c_internal == 0 and c_nosignal == 0 and c_qr == 0:
        return rec_new

    # otherwise build merged record first by internal basic data:
    rec: VaRecord = (rec_new if c_internal >= 0 else rec_cur).model_copy()

    # then select the latest nosignal data:
    src = rec_new if c_nosignal >= 0 else rec_cur
    rec.no_signal_frames = src.no_signal_frames
    rec.no_signal_updated_on = src.no_signal_updated_on

    # and select the latest qr data:
    src = rec_new if c_qr >= 0 else rec_cur
    rec.qr_records_number = src.qr_records_number
    rec.qr_updated_on = src.qr_updated_on
    return rec


def _merge_recs(
    ctx: VaContext,
    recs0: List[VaRecord],  # old original videos.tsv records
    recs_cur: List[VaRecord],  # current latest transactional videos.tsv records
    recs_new: List[VaRecord],  # new records to merge based on recs0
) -> List[VaRecord]:
    """Merge newly produced records into the current TSV content.

    If the TSV did not change since it was loaded (``recs0 == recs_cur``),
    ``recs_new`` is returned as is. Otherwise, depending on ``ctx.mode``:
    FULL keeps only ``recs_new``, FORCE overlays ``recs_new`` onto
    ``recs_cur`` by name, and the incremental/n-a modes merge each record
    part by timestamp via ``_merge_rec``.
    """
    # before any merging check if recs0 and recs_cur are the same and skip merge
    if _match_recs(recs0, recs_cur):
        logger.debug("_merge_recs: No changes in recs_cur since load, skipping merge")
        return recs_new

    # (A) when mode is [full] - use recs and override everything in recs_cur
    if ctx.mode == VaMode.FULL:
        logger.debug("_merge_recs: Full mode, overriding all records")
        return recs_new

    # (B) when mode is [force] - merge all records from recs into recs_cur
    if ctx.mode == VaMode.FORCE:
        logger.debug(
            "_merge_recs: Force mode, merging all new records over existing ones"
        )
        if len(recs_cur) > 0:
            merged_dict = {r.name: r for r in recs_cur}
            merged_dict.update({r.name: r for r in recs_new})
            recs_new = list(merged_dict.values())
        return recs_new

    # (C) when mode is [rerun-for-na] or [reset-to-na]
    # merge only records from recs where related fields are updated
    # and use timestamps
    # or
    # (D) when mode is [incremental] - add only new records from recs if timestamp
    # is older than in recs_cur
    if ctx.mode in {VaMode.RERUN_FOR_NA, VaMode.RESET_TO_NA, VaMode.INCREMENTAL}:
        logger.debug(
            f"_merge_recs: {ctx.mode} mode, merging selectively based on timestamps"
        )
        if len(recs_cur) > 0:
            # first build dict of current records
            merged_dict = {r.name: r for r in recs_cur}
            # and then compare/merge with new records
            for rec in recs_new:
                if rec.name in merged_dict:
                    # existing record, need to merge manually
                    merged_dict[rec.name] = _merge_rec(ctx, merged_dict[rec.name], rec)
                else:
                    # new record, just add it
                    merged_dict[rec.name] = rec
            recs_new = list(merged_dict.values())
    return recs_new


def _normalize_path(path: str) -> str:
    """Return the path used as ``VaRecord.path`` (currently unchanged)."""
    # Note: revise if further normalization for record.path when needed
    # (e.g., resolve symlinks, case sensitivity, etc)
    return path


def _set_updated(ctx: VaContext, vr: VaRecord, field: str = "updated_on"):
    """Stamp *field* of *vr* with the current local time and remember its path."""
    ctx.updated_paths.add(vr.path)
    setattr(vr, field, format_tts(time()))
    # vr.updated_by = UPDATED_BY


def _build_dated_path(vr: VaRecord, base_dir: str, ext: str) -> str:
    """Build ``base_dir/<YYYY>/<MM>/<video basename>.<ext>``, creating the directory.

    When ``vr.start_date`` is not a ``YYYY-MM-DD`` date, the file is placed
    directly in ``base_dir``. The target directory is created if missing.
    """
    path: str = base_dir
    start_date = vr.start_date
    # strict check: a plain length test would accept malformed 10-char values
    if start_date and re.fullmatch(r"[0-9]{4}-[0-9]{2}-[0-9]{2}", start_date):
        year, month, _day = start_date.split("-")
        path = os.path.join(base_dir, year, month)
    os.makedirs(path, exist_ok=True)
    return os.path.join(path, f"{os.path.basename(vr.path)}.{ext}")
# Recorded durations outside [0, one week) are treated as bogus.
_MAX_VIDEO_DURATION_SEC = 604800.0


def _audit_session_log(vr: VaRecord, path: str) -> None:
    """Fill detected fps/resolution from the ``session_begin`` entry of ``<path>.log``."""
    sb = find_metadata_json(path + ".log", "type", "session_begin")
    if sb is None:
        return
    logger.debug(f"sb: {sb}")
    frame_rate = sb.get("frameRate")
    if frame_rate is not None:
        vr.video_fps_detected = str(frame_rate)
    cx, cy = sb.get("cx"), sb.get("cy")
    if cx is not None and cy is not None:
        vr.video_res_detected = f"{cx}x{cy}"


def _audit_info(vr: VaRecord, path: str) -> None:
    """Fill start/end timestamps, detected duration, size and bitrate via do_info_file."""
    vi: InfoSummary
    vti: VideoTimeInfo
    vi, vti = do_info_file(path, True)
    logger.debug(f"vi: {vi}")
    logger.debug(f"vti: {vti}")
    if vti is not None:
        vr.start_date = format_date(vti.start_time)
        vr.start_time = format_time(vti.start_time)
        vr.end_date = format_date(vti.end_time)
        vr.end_time = format_time(vti.end_time)
        if vti.end_time is not None:
            vr.complete = True
    if vi is not None:
        if vi.duration_sec is not None:
            vr.video_dur_detected = str(round(vi.duration_sec, 1))
            vr.duration = vr.video_dur_detected
            vr.duration_h = format_duration(vi.duration_sec)
        if vi.size_mb is not None:
            vr.video_size_mb = str(vi.size_mb)
        if vi.rate_mbpm is not None:
            vr.video_rate_mbpm = str(vi.rate_mbpm)


def _audit_parse(vr: VaRecord, path: str) -> None:
    """Fill recorded duration, resolution and fps from a quick do_parse pass."""
    # Note: just quick parse w/o QR processing so default context is used
    ps: Optional[ParseSummary] = next(
        do_parse(ParseContext(), path, True, True), None
    )
    logger.debug(f"ps: {ps}")
    if ps is None:
        return
    dur = ps.video_duration
    if dur is not None and 0 <= dur < _MAX_VIDEO_DURATION_SEC:
        vr.video_dur_recorded = str(round(dur, 1))
        # use the recorded duration only when none was detected earlier
        if vr.duration is None or vr.duration == "n/a":
            vr.duration = vr.video_dur_recorded
            vr.duration_h = format_duration(dur)
    # leave "n/a" rather than writing "NonexNone" / crashing on missing values
    if ps.video_frame_width is not None and ps.video_frame_height is not None:
        vr.video_res_recorded = f"{ps.video_frame_width}x{ps.video_frame_height}"
    if ps.video_frame_rate is not None:
        vr.video_fps_recorded = f"{round(ps.video_frame_rate, 1)}"


def _audit_ffprobe(vr: VaRecord, path: str) -> None:
    """Fill audio description/duration via ffprobe; last-resort duration source."""
    ai: AudioInfo
    ai, _ = get_audio_video_info_ffprobe(path)
    if ai is None:
        return
    logger.debug(f"ai: {ai}")
    if ai.sample_rate is not None:
        # e.g. "48000Hz 16b 2ch aac"
        parts = [f"{ai.sample_rate}Hz"]
        if ai.bits_per_sample is not None:
            parts.append(f"{ai.bits_per_sample}b")
        if ai.channels is not None:
            parts.append(f"{ai.channels}ch")
        if ai.codec is not None:
            parts.append(ai.codec)
        vr.audio_sr = " ".join(parts)
    if ai.duration_sec is not None:
        vr.audio_dur = str(round(ai.duration_sec, 1))
        if vr.duration == "n/a" or vr.duration is None:
            vr.duration = vr.audio_dur
            vr.duration_h = format_duration(ai.duration_sec)


def do_audit_file(ctx: VaContext, path: str) -> Generator[VaRecord, None, None]:
    """Audit a single video file.

    Yields nothing in either of these cases:

    * the internal max counter is reached;
    * the path or its base name is in ``ctx.skip_names``;
    * the path does not match ``ctx.path_mask``.

    Otherwise it yields exactly one record. For an existing file the record
    is filled, in order, from:

    1. the capture log;
    2. ``do_info_file``;
    3. a quick ``do_parse``;
    4. ffprobe.

    Any unexpected error stops further extraction for that file but still
    yields the partially filled record.

    :param ctx: VaContext object with processing context
    :type ctx: VaContext
    :param path: Path to the video file
    :type path: str
    :return: Generator of VaRecord objects
    :rtype: Generator[VaRecord, None, None]
    """
    logger.debug(f"do_audit_file(path={path})")
    # check max files limit
    if 0 <= ctx.max_counter <= ctx.c_internal:
        logger.debug(f"Max files limit reached: {ctx.max_counter}")
        return

    if ctx.skip_names and path in ctx.skip_names:
        logger.info(f"Skipping file by path : {path}")
        return

    # filter by path mask
    if ctx.path_mask and not fnmatch.fnmatch(path, ctx.path_mask):
        logger.info(f"Skipping file by path mask : {path}")
        return

    vr: VaRecord = VaRecord()
    # identify the record even if the file is missing, so it is keyed by its
    # real name instead of "n/a"
    vr.path = _normalize_path(path)
    vr.name = os.path.basename(path)
    if ctx.skip_names and vr.name in ctx.skip_names:
        logger.info(f"Skipping file by name : {vr.name}")
        return

    try:
        if os.path.exists(path):
            vr.present = True
            _audit_session_log(vr, path)
            _audit_info(vr, path)
            _audit_parse(vr, path)
            _audit_ffprobe(vr, path)
    # catch all exceptions: one broken file must not stop the whole audit
    except Exception as e:
        logger.error(f"Unhandled exception occurred when processing file: {path}")
        logger.error(f"Details: {e}")
        logger.error(traceback.format_exc())  # logs full stack trace

    vr.file_log_coherent = check_coherent(vr)
    _set_updated(ctx, vr)
    ctx.c_internal += 1
    logger.debug(f"c_internal -> {ctx.c_internal}")
    yield vr
def do_audit_dir(ctx: VaContext, path: str) -> Generator[VaRecord, None, None]:
    """Audit video files in a directory with .mkv, .mp4, .avi extensions.

    Entries are visited in sorted name order; the extension check is
    case-insensitive. Sub-directories are descended into only when
    ``ctx.recursive`` is set. A directory that cannot be listed (e.g.
    permission denied) is logged and skipped instead of aborting the whole
    audit.

    :param ctx: VaContext object with processing context
    :type ctx: VaContext
    :param path: Path to the directory
    :type path: str
    :return: Generator of VaRecord objects
    :rtype: Generator[VaRecord, None, None]
    """
    logger.debug(f"do_audit_dir(path={path}, recursive={ctx.recursive})")

    # check max files limit
    if 0 <= ctx.max_counter <= ctx.c_internal:
        logger.debug(f"Max files limit reached: {ctx.max_counter}")
        return

    # check if path exists
    if not os.path.exists(path):
        logger.error(f"Path does not exist: {path}")
        return

    # check if path is a directory
    if not os.path.isdir(path):
        logger.error(f"Path is not a directory: {path}")
        return

    if ctx.skip_names and path in ctx.skip_names:
        logger.info(f"Skipping directory by path : {path}")
        return

    try:
        names = sorted(os.listdir(path))
    except OSError as e:
        logger.error(f"Cannot list directory: {path} : {e}")
        return

    for name in names:
        path2 = os.path.join(path, name)
        if os.path.isfile(path2) and name.lower().endswith((".mkv", ".mp4", ".avi")):
            logger.debug(f"Found video: {path2}")
            yield from do_audit_file(ctx, path2)
        elif ctx.recursive and os.path.isdir(path2):
            logger.debug(f"Descending into directory: {path2}")
            yield from do_audit_dir(ctx, path2)
def do_audit_internal(
    ctx: VaContext, paths_dir_or_file: List[str]
) -> Generator[VaRecord, None, None]:
    """Audit video files and/or directories with the INTERNAL source.

    Nothing is yielded in either of these cases:

    * neither INTERNAL nor ALL is in ``ctx.source``;
    * the mode is ``rerun-for-na`` or ``reset-to-na``.

    Each entry of *paths_dir_or_file* is audited as a file or a directory.
    A missing path is logged and skipped; the remaining paths are still
    processed.

    :param ctx: VaContext object with processing context
    :type ctx: VaContext
    :param paths_dir_or_file: List of paths to video files or directories
    :type paths_dir_or_file: List[str]
    :return: Generator of VaRecord objects
    :rtype: Generator[VaRecord, None, None]
    """
    logger.debug(
        f"do_audit_internal(paths_dir_or_file={paths_dir_or_file}, "
        f"recursive={ctx.recursive})"
    )

    # check source is INTERNAL or ALL
    if not ctx.source & {VaSource.INTERNAL, VaSource.ALL}:
        logger.debug("Skipping internal source as per context")
        return

    # prevent run for RERUN_FOR_NA mode
    if ctx.mode == VaMode.RERUN_FOR_NA:
        logger.debug("Skipping internal source for rerun-for-na mode")
        return

    # prevent run for RESET_TO_NA mode
    if ctx.mode == VaMode.RESET_TO_NA:
        logger.debug("Skipping internal source for reset-to-na mode")
        return

    for path in paths_dir_or_file:
        if not os.path.exists(path):
            logger.error(f"Path does not exist: {path}")
            # skip only this path; previously `return` dropped all later paths
            continue
        if os.path.isfile(path):
            yield from do_audit_file(ctx, path)
        elif os.path.isdir(path):
            yield from do_audit_dir(ctx, path)
        else:
            logger.warning(f"Path is neither a file nor a directory: {path}")
def run_ext_nosignal(ctx: VaContext, vr: VaRecord) -> VaRecord:
    """Run the ``reprostim detect-noscreen`` external tool for one record.

    The record is returned untouched when any of these hold:

    * NOSIGNAL/ALL is not in ``ctx.source``;
    * the nosignal counter reached ``ctx.max_counter``;
    * the path does not match ``ctx.path_mask``;
    * the mode is ``rerun-for-na`` and a value is already present.

    In ``reset-to-na`` mode the value is only reset to ``"n/a"``.

    Otherwise the tool runs under ``<video>.nosignal.lock``, with its output
    written to a dated log file. ``no_signal_frames`` is then set from
    ``nosignal_rate`` in the produced JSON, as a percentage with one decimal,
    or ``"0.0"`` if the key is absent.

    Failures are logged and leave the record unchanged: tool missing or
    failing, lock timeout, or missing/invalid output JSON.

    :param ctx: VaContext object with processing context
    :type ctx: VaContext
    :param vr: VaRecord object to process
    :type vr: VaRecord
    :return: Updated VaRecord object
    :rtype: VaRecord
    """
    logger.debug(
        f"run_ext_nosignal(path={vr.path}, no_signal_frames={vr.no_signal_frames})"
    )

    # check mode is NOSIGNAL or ALL
    if not ctx.source & {VaSource.NOSIGNAL, VaSource.ALL}:
        logger.debug("Skipping nosignal source as per context")
        return vr

    # check max files limit
    if 0 <= ctx.max_counter <= ctx.c_nosignal:
        logger.debug(f"Max nosignal limit reached: {ctx.max_counter}")
        return vr

    # filter by path mask
    if ctx.path_mask and not fnmatch.fnmatch(vr.path, ctx.path_mask):
        logger.info(f"Skipping nosignal by path mask : {vr.path}")
        return vr

    # reset related fields to n/a if any
    if ctx.mode == VaMode.RESET_TO_NA:
        if vr.no_signal_frames != "n/a":
            vr.no_signal_frames = "n/a"
            logger.debug(f"Reset no_signal_frames -> {vr.no_signal_frames}")
            _set_updated(ctx, vr, field="no_signal_updated_on")
            ctx.c_nosignal += 1
            logger.debug(f"c_nosignal -> {ctx.c_nosignal}")
        return vr

    if ctx.mode == VaMode.RERUN_FOR_NA and vr.no_signal_frames != "n/a":
        logger.debug("Skipping nosignal source as per context (not n/a)")
        return vr

    # make sure data and logs dirs exist
    os.makedirs(ctx.nosignal_data_dir, exist_ok=True)
    os.makedirs(ctx.nosignal_log_dir, exist_ok=True)

    # build paths
    json_path = _build_dated_path(vr, ctx.nosignal_data_dir, "nosignal.json")
    log_path = _build_dated_path(vr, ctx.nosignal_log_dir, "nosignal.log")

    # prepare command-line to run
    cmd = ["reprostim"]
    if ctx.log_level is not None:
        cmd += ["--log-level", ctx.log_level]
    cmd += ["detect-noscreen"]
    cmd += list(ctx.nosignal_opts or [])
    cmd += ["--output", json_path]
    cmd += [vr.path]
    logger.debug(f"cmd: {' '.join(cmd)}")

    # use lock file so concurrent audits do not process the same video
    path_lock: str = f"{vr.path}.nosignal.lock"
    logger.debug(f"use lock file : {path_lock}")
    lock = FileLock(path_lock, timeout=5)
    try:
        with lock:
            try:
                with open(log_path, "w", encoding="utf-8") as log_file:
                    result = subprocess.run(
                        cmd,
                        stdout=log_file,
                        stderr=subprocess.STDOUT,
                        text=True,
                        check=True,
                    )
                logger.debug(
                    f"detect-noscreen completed with return code {result.returncode}"
                )
            except subprocess.CalledProcessError as e:
                # output went to log_path, so e.stdout/e.stderr are None
                logger.error(f"detect-noscreen failed: {e} (see log: {log_path})")
                return vr
            except OSError as e:
                # e.g. "reprostim" not on PATH or log file not writable
                logger.error(f"detect-noscreen could not be run: {e}")
                return vr

            if not os.path.exists(json_path):
                logger.warning(f"detect-noscreen produced no output JSON: {json_path}")
                return vr

            # now read the output JSON file
            try:
                with open(json_path, "r", encoding="utf-8") as f:
                    data = json.load(f)
                logger.debug(f"detect-noscreen output data: {data}")
                if "nosignal_rate" in data:
                    no_signal_frames = f"{float(data['nosignal_rate']) * 100:.1f}"
                else:
                    no_signal_frames = "0.0"
            except (json.JSONDecodeError, OSError, TypeError, ValueError) as e:
                logger.error(f"Failed to read/parse nosignal JSON output: {e}")
                return vr

            vr.no_signal_frames = no_signal_frames
            logger.debug(f"Set no_signal_frames -> {vr.no_signal_frames}")
            _set_updated(ctx, vr, field="no_signal_updated_on")
            ctx.c_nosignal += 1
            logger.debug(f"c_nosignal -> {ctx.c_nosignal}")
    except Timeout as el:
        logger.error(f"File is already locked by another process ({vr.path}) : {el}")
    return vr
def run_ext_qr(ctx: VaContext, vr: VaRecord) -> VaRecord:
    """Run the ``qr-parse`` external tool on the specified VaRecord.

    The record is returned untouched when ``ctx.source`` contains neither
    ``VaSource.QR`` nor ``VaSource.ALL``, when the ``max_counter`` limit has
    been reached, when ``vr.path`` does not match ``ctx.path_mask``, or, in
    ``VaMode.RERUN_FOR_NA`` mode, when ``qr_records_number`` is not
    ``"n/a"``. In ``VaMode.RESET_TO_NA`` mode ``qr_records_number`` is reset
    to ``"n/a"`` without running any tool.

    Otherwise, while holding the ``<vr.path>.qr.lock`` file lock (5 s
    timeout), the video is first copied without audio into a temporary
    directory with ``ffmpeg``. Then ``reprostim qr-parse`` is run on that
    copy, writing JSON lines to ``ctx.qr_data_dir`` and its stderr to
    ``ctx.qr_log_dir``. ``qr_records_number`` is set as follows:

    - ``"-2"``: ffmpeg or qr-parse failed.
    - ``"-1"``: qr-parse succeeded but no ``ParseSummary`` record was read.
    - otherwise the summary's ``qr_count``.

    :param ctx: VaContext object with processing context
    :type ctx: VaContext
    :param vr: VaRecord object to process (updated in place)
    :type vr: VaRecord
    :return: The same VaRecord object
    :rtype: VaRecord
    """
    logger.debug(
        f"run_ext_qr(path={vr.path}, qr_records_number={vr.qr_records_number})"
    )

    # check mode is QR or ALL
    if not ctx.source & {VaSource.QR, VaSource.ALL}:
        logger.debug("Skipping qr source as per context")
        return vr

    # check max files limit (a negative max_counter means unlimited)
    if 0 <= ctx.max_counter <= ctx.c_qr:
        logger.debug(f"Max qr limit reached: {ctx.max_counter}")
        return vr

    # filter by path mask
    if ctx.path_mask and not fnmatch.fnmatch(vr.path, ctx.path_mask):
        logger.info(f"Skipping qr by path mask : {vr.path}")
        return vr

    # reset related fields to n/a if any
    if ctx.mode == VaMode.RESET_TO_NA:
        if vr.qr_records_number != "n/a":
            vr.qr_records_number = "n/a"
            logger.debug(f"Reset qr_records_number -> {vr.qr_records_number}")
            _set_updated(ctx, vr, field="qr_updated_on")
            ctx.c_qr += 1
            logger.debug(f"c_qr -> {ctx.c_qr}")
        return vr

    # in rerun-for-na mode only records still at "n/a" are processed
    if ctx.mode == VaMode.RERUN_FOR_NA and vr.qr_records_number != "n/a":
        logger.debug("Skipping qr source as per context (not n/a)")
        return vr

    # make sure data and logs dirs exist
    os.makedirs(ctx.qr_data_dir, exist_ok=True)
    os.makedirs(ctx.qr_log_dir, exist_ok=True)

    # build paths
    base_name = os.path.basename(vr.path)
    jsonl_path = _build_dated_path(vr, ctx.qr_data_dir, "qrinfo.jsonl")
    log_path = _build_dated_path(vr, ctx.qr_log_dir, "qrinfo.log")
    ffmpeg_log_path = _build_dated_path(vr, ctx.qr_log_dir, "ffmpeg.log")

    # per-video lock file
    path_lock: str = f"{vr.path}.qr.lock"
    logger.debug(f"use lock file : {path_lock}")
    lock = FileLock(path_lock, timeout=5)
    try:
        with lock:
            with tempfile.TemporaryDirectory() as tmpdir:
                logger.debug(f"tmpdir : {tmpdir}")
                tmp_video: str = os.path.join(tmpdir, base_name)
                logger.debug(f"tmp_video : {tmp_video}")

                # set default value first to prevent stale n/a data; it
                # stays "-2" if ffmpeg or qr-parse fails below
                vr.qr_records_number = "-2"
                _set_updated(ctx, vr, field="qr_updated_on")

                try:
                    # copy the video without audio, like:
                    # ffmpeg -i "$file" -an -c copy "$tmp_mkv_file"
                    logger.debug(f"ffmpeg_log_path : {ffmpeg_log_path}")
                    with open(
                        ffmpeg_log_path, "w", encoding="utf-8"
                    ) as ffmpeg_log_file:
                        cmd = ["ffmpeg", "-i", vr.path, "-an", "-c", "copy", tmp_video]
                        logger.debug(f"cmd: {' '.join(cmd)}")
                        result = subprocess.run(
                            cmd,
                            stdout=ffmpeg_log_file,
                            stderr=subprocess.STDOUT,
                            text=True,
                            check=True,
                        )
                    logger.debug(
                        f"ffmpeg completed with return code {result.returncode}"
                    )

                    # execute qr-parse action like below:
                    #
                    #   reprostim --log-level $LOG_LEVEL
                    #       qr-parse "$tmp_mkv_file"
                    #       >"$OUT_DIR"/"$base_name".qrinfo.jsonl
                    #       2>"$OUT_DIR"/"$base_name".qrinfo.log
                    cmd = ["reprostim"]
                    # optionally add log level
                    if ctx.log_level is not None:
                        cmd += ["--log-level", ctx.log_level]
                    cmd += ["qr-parse"]
                    cmd += ctx.qr_opts
                    cmd += [tmp_video]
                    logger.debug(f"cmd: {' '.join(cmd)}")

                    # run reprostim qr-parse command and capture output
                    logger.debug(f"log_path: {log_path}")
                    logger.debug(f"jsonl_path: {jsonl_path}")
                    with open(log_path, "w", encoding="utf-8") as log_file:
                        with open(jsonl_path, "w", encoding="utf-8") as jsonl_file:
                            result = subprocess.run(
                                cmd,
                                stdout=jsonl_file,
                                stderr=log_file,
                                text=True,
                                check=True,
                            )
                    logger.debug(
                        f"qr-parse completed with return code "
                        f"{result.returncode}"
                    )

                    # qr-parse succeeded; stays "-1" if no summary is found
                    vr.qr_records_number = "-1"
                    _set_updated(ctx, vr, field="qr_updated_on")

                    # now read the output JSON lines file
                    if os.path.exists(jsonl_path):
                        try:
                            with open(jsonl_path, "r", encoding="utf-8") as f:
                                for line in f:
                                    # a blank line is not valid JSON and
                                    # would otherwise abort the whole scan
                                    if not line.strip():
                                        continue
                                    record = json.loads(line)
                                    if (
                                        not isinstance(record, dict)
                                        or record.get("type") != "ParseSummary"
                                    ):
                                        continue
                                    logger.debug(f"qr-parse summary: {record}")
                                    vr.qr_records_number = str(
                                        record.get("qr_count", "0")
                                    )
                                    logger.debug(
                                        f"Set qr_records_number -> "
                                        f"{vr.qr_records_number}"
                                    )
                                    _set_updated(ctx, vr, field="qr_updated_on")
                                    ctx.c_qr += 1
                                    logger.debug(f"c_qr -> {ctx.c_qr}")
                                    break
                        except (json.JSONDecodeError, IOError) as e:
                            logger.error(f"Failed to read/parse qr JSON output: {e}")
                    else:
                        logger.info(f"No qr-parse output JSON file found: {jsonl_path}")
                except subprocess.CalledProcessError as e:
                    # output was redirected to files, so e.stdout and e.stderr
                    # are always None here; point at the log files instead
                    logger.error(
                        f"qr failed: {e} (see logs: {ffmpeg_log_path}, {log_path})"
                    )
    except Timeout as el:
        logger.error(f"File is already locked by another process ({vr.path}) : {el}")
    return vr
def run_ext_all(ctx: VaContext, vr: VaRecord) -> VaRecord:
    """Apply every external tool to a VaRecord, one after the other.

    The record is first handed to :func:`run_ext_nosignal`, and whatever
    that returns is then handed to :func:`run_ext_qr`. Each step consults
    ``ctx`` on its own and may leave the record untouched.

    :param ctx: VaContext object with processing context
    :type ctx: VaContext
    :param vr: VaRecord object to process
    :type vr: VaRecord
    :return: The record returned by the last tool step
    :rtype: VaRecord
    """
    logger.debug(f"run_ext_all(path={vr.path})")
    after_nosignal: VaRecord = run_ext_nosignal(ctx, vr)
    return run_ext_qr(ctx, after_nosignal)
def do_audit(
    ctx: VaContext, paths_dir_or_file: List[str]
) -> Generator[VaRecord, None, None]:
    """Audit files/directories and run the external tools on each record.

    Records produced by :func:`do_audit_internal` are passed one at a time
    through :func:`run_ext_all`; which tools actually run is decided by the
    tool functions themselves from ``ctx``.

    :param ctx: VaContext object with processing context
    :type ctx: VaContext
    :param paths_dir_or_file: Paths to video files and/or directories
    :type paths_dir_or_file: List[str]
    :return: Generator of (possibly updated) VaRecord objects
    :rtype: Generator[VaRecord, None, None]
    """
    logger.debug(f"do_audit(paths_dir_or_file={paths_dir_or_file})")
    audited_records = do_audit_internal(ctx, paths_dir_or_file)
    for audited in audited_records:
        enriched = run_ext_all(ctx, audited)
        yield enriched
def do_ext(
    ctx: VaContext, recs: List[VaRecord], paths_dir_or_file: List[str]
) -> Generator[VaRecord, None, None]:
    """Run external tools on existing records, optionally filtered by path.

    Every record in ``recs`` is yielded exactly once, in order. A record is
    passed through :func:`run_ext_all` in three cases:

    - no path filter is active (``paths_dir_or_file`` is empty or contains
      ``"*"``);
    - its ``path`` equals one of the given (normalized) file paths;
    - its ``path`` lies inside one of the given (normalized) directories.

    Otherwise it is yielded unchanged.

    :param ctx: VaContext object with processing context
    :type ctx: VaContext
    :param recs: Existing records to process
    :type recs: List[VaRecord]
    :param paths_dir_or_file: Files and/or directories to restrict
        processing to; non-existent paths are logged and ignored
    :type paths_dir_or_file: List[str]
    :return: Generator of (possibly updated) VaRecord objects
    :rtype: Generator[VaRecord, None, None]
    """
    logger.debug("do_ext(...)")

    has_filter = bool(paths_dir_or_file)

    # create separate sets of dirs and files from paths_dir_or_file
    path_dirs: Set[str] = set()
    path_files: Set[str] = set()
    for path in paths_dir_or_file:
        if path == "*":
            # "*" disables path filtering altogether
            has_filter = False
            break
        if not os.path.exists(path):
            logger.error(f"Path does not exist: {path}")
        elif os.path.isfile(path):
            path_files.add(_normalize_path(path))
        elif os.path.isdir(path):
            path_dirs.add(_normalize_path(path))

    # terminate every directory with a separator so that a plain prefix test
    # does not let "/data/vid" match "/data/videos/x.mkv"
    dir_prefixes = tuple(d if d.endswith(os.sep) else d + os.sep for d in path_dirs)

    for rec in recs:
        if has_filter:
            if rec.path in path_files:
                logger.debug(f"Matched ext record by PATH filter: {rec.path}")
            elif dir_prefixes and rec.path.startswith(dir_prefixes):
                logger.debug(f"Matched ext record by DIR filter: {rec.path}")
            else:
                logger.debug(f"Skipping ext record by PATH / DIR filters: {rec.path}")
                yield rec
                continue

        # process matched ext record
        yield run_ext_all(ctx, rec)
def get_file_video_audit(
    path: str,
    path_tsv: Optional[str] = None,
    cached: bool = False,
    use_lock: bool = True,
) -> Optional[VaRecord]:
    """Get a single VaRecord for a video file.

    When ``path_tsv`` is given and exists, the record whose ``path`` equals
    ``path`` is looked up there first. On lock timeout, load error, or no
    match, the file is audited directly with the internal auditor only
    (no external tools).

    :param path: Path to the video file.
    :type path: str
    :param path_tsv: Optional path to existing TSV file to load existing
        records from. Default: None.
    :type path_tsv: Optional[str]
    :param cached: If ``True``, return cached TSV data when available instead
        of reloading from disk. Passed through to :func:`_get_tsv_records`.
        Default: ``False``.
    :type cached: bool
    :param use_lock: If ``True`` (default), acquire the advisory file lock
        before reading ``path_tsv``. If ``False``, skip the lock (dirty-read
        mode).
    :type use_lock: bool
    :return: VaRecord object, or ``None`` if the direct audit yields no record.
    :rtype: Optional[VaRecord]
    """
    logger.debug(
        f"get_file_video_audit(path={path}, path_tsv={path_tsv},"
        f" cached={cached}, use_lock={use_lock})"
    )

    # If path_tsv is provided and exists, try to load record from TSV
    if path_tsv and os.path.exists(path_tsv):
        logger.debug(f"Loading video audit record from TSV: {path_tsv}")
        try:
            records: List[VaRecord] = _get_tsv_records(
                path_tsv, cached=cached, use_lock=use_lock
            )
            for rec in records:
                if rec.path == path:
                    logger.debug(f"Found matching record in TSV for: {path}")
                    return rec
            logger.debug(
                f"No matching record found in TSV for: {path}, falling back to audit"
            )
        except Timeout:
            logger.warning(
                f"Timeout acquiring lock for {path_tsv}, falling back to audit"
            )
        except Exception as e:
            # deliberate best-effort: any TSV problem falls back to auditing
            logger.warning(
                f"Error loading TSV file {path_tsv}: {e}, falling back to audit"
            )

    # Fall back to auditing the file directly
    logger.debug(f"Auditing video file directly: {path}")
    ctx: VaContext = VaContext(
        skip_names=None,
        c_internal=0,
        c_nosignal=0,
        c_qr=0,
        # .get(): this is a library API and the variable may not be set
        # outside the CLI; a None log level is skipped when building commands
        log_level=os.environ.get("REPROSTIM_LOG_LEVEL"),
        max_counter=1,
        mode=VaMode.INCREMENTAL,
        path_mask=None,
        recursive=False,
        source={VaSource.INTERNAL},
    )
    return next(do_audit_file(ctx, path), None)
def _parse_rec_datetime(date_str: str, time_str: str) -> Optional[datetime]: """Parse a VaRecord date/time string pair into a datetime object. :param date_str: Date string in ``'YYYY-MM-DD'`` format, or ``'n/a'``. :type date_str: str :param time_str: Time string in ``'HH:MM:SS.mmm'`` format, or ``'n/a'``. :type time_str: str :return: Parsed datetime object, or ``None`` if either value is ``'n/a'`` or parsing fails. :rtype: Optional[datetime] """ if date_str == "n/a" or time_str == "n/a": return None try: return datetime.strptime(f"{date_str} {time_str}", "%Y-%m-%d %H:%M:%S.%f") except ValueError: return None
def find_video_audit_by_timerange(
    path_tsv: str,
    start: datetime,
    end: datetime,
    cached: bool = False,
    use_lock: bool = True,
) -> List[VaRecord]:
    """Find all VaRecords whose recording interval overlaps the given time
    range, sorted by record start time ascending.

    Only records with both ``present`` and ``complete`` flags set to ``True``
    are considered. Records whose start or end date/time is ``n/a`` or cannot
    be parsed are skipped. A record matches when its start is strictly before
    ``end`` and its end is strictly after ``start``. A record that only
    touches the range (it ends exactly at ``start`` or begins exactly at
    ``end``) is therefore not returned.

    Record times are parsed as naive datetimes, so ``start`` and ``end``
    should be naive as well; comparing with timezone-aware values raises
    ``TypeError``.

    :param path_tsv: Path to the TSV file to search.
    :type path_tsv: str
    :param start: Start of the query time range.
    :type start: datetime
    :param end: End of the query time range.
    :type end: datetime
    :param cached: If ``True``, return cached TSV data when available instead
        of reloading from disk. Default: ``False``.
    :type cached: bool
    :param use_lock: If ``True`` (default), acquire the advisory lock before
        loading from disk. If ``False``, skip the lock (dirty-read mode).
        Default: ``True``.
    :type use_lock: bool
    :return: List of matching VaRecord objects sorted by start time ascending.
    :rtype: List[VaRecord]
    """
    logger.debug(
        f"find_video_audit_by_timerange(path_tsv={path_tsv},"
        f" start={start}, end={end}, cached={cached}, use_lock={use_lock})"
    )
    records = _get_tsv_records(path_tsv, cached=cached, use_lock=use_lock)

    # Note: in the future, consider caching parsed start/end timestamps
    # in VaRecord and/or using a more efficient data structure for lookups
    matches: List[Tuple[datetime, VaRecord]] = []
    for rec in records:
        if not rec.present or not rec.complete:
            continue
        # _parse_rec_datetime returns None for "n/a" or malformed values
        rec_start = _parse_rec_datetime(rec.start_date, rec.start_time)
        if rec_start is None or rec_start >= end:
            continue
        # an unparseable end is skipped like an unparseable start instead of
        # being treated as matching every later query
        rec_end = _parse_rec_datetime(rec.end_date, rec.end_time)
        if rec_end is None or rec_end <= start:
            continue
        matches.append((rec_start, rec))

    # sort on the already-parsed start; list.sort is stable for equal starts
    matches.sort(key=lambda item: item[0])
    return [rec for _, rec in matches]
def do_main(
    paths: List[str],
    path_tsv: str,
    recursive: bool = False,
    mode: VaMode = VaMode.INCREMENTAL,
    va_src: Optional[Set[VaSource]] = None,
    max_files: int = -1,
    path_mask: Optional[str] = None,
    verbose: bool = False,
    out_func=print,
    nosignal_opts: Optional[str] = None,
    qr_opts: Optional[str] = None,
) -> int:
    """The main function invoked by CLI to analyze video files with logs
    and save the results to a TSV file.

    :param paths: One or more paths to the video file or directory
    :type paths: List[str]
    :param path_tsv: Path to the output TSV file, default 'videos.tsv'.
    :type path_tsv: str
    :param recursive: Whether to scan directories recursively. Default: False
    :type recursive: bool
    :param mode: Operation mode, one of VaMode values (default: INCREMENTAL)
    :type mode: VaMode
    :param va_src: Set of VaSource values to specify audit sources.
        Default: ``{VaSource.INTERNAL}``.
    :type va_src: Optional[Set[VaSource]]
    :param max_files: Maximum number of video files/records to process.
        Use -1 for unlimited (default: -1)
    :type max_files: int
    :param path_mask: Optional fnmatch-style mask to filter files
    :type path_mask: Optional[str]
    :param verbose: Whether to print verbose JSON output to stdout
        (default: False)
    :type verbose: bool
    :param out_func: Function to stdout results (default: print)
    :type out_func: Callable[[str], None]
    :param nosignal_opts: Optional string of extra options to pass to
        detect-noscreen, parsed via shlex. Overrides the built-in defaults
        when provided.
    :type nosignal_opts: Optional[str]
    :param qr_opts: Optional string of extra options to pass to qr-parse,
        parsed via shlex. No extra options by default.
    :type qr_opts: Optional[str]
    :return: 0 on success, 1 on failure (a path does not exist)
    :rtype: int
    """
    if va_src is None:
        va_src = {VaSource.INTERNAL}

    logger.debug("video-audit command")
    logger.debug(f"paths : {paths}")
    logger.debug(f"path_tsv : {path_tsv}")

    # double validate each path is valid and exists
    for path in paths:
        if not os.path.exists(path):
            logger.error(f"Path does not exist: {path}")
            return 1

    # a missing ffprobe is reported but not fatal: processing continues
    if not check_ffprobe():
        out_func(
            "Error: ffprobe is not installed. Make sure"
            " ffmpeg package is installed."
        )
        logger.error(
            "!!! ffprobe is required to parse audio but"
            " not found. Make sure ffmpeg package is installed."
        )

    # advisory lock guarding reads and writes of the TSV file
    lock = FileLock(f"{path_tsv}.lock")

    recs0: List[VaRecord] = []
    # in case path_tsv exists, and mode is not FULL,
    # load existing records
    if mode != VaMode.FULL and os.path.exists(path_tsv):
        logger.info(f"Loading existing TSV file: {path_tsv}")
        with lock:
            recs0 = _load_tsv(path_tsv)
        logger.info(f"Loaded {len(recs0)} existing records from TSV")

    # skip files set in case of INCREMENTAL mode
    skip_names = None
    if mode == VaMode.INCREMENTAL and len(recs0) > 0:
        skip_names = {r.name for r in recs0}

    # collect all records from generator into a list
    # setup audit context first
    ctx: VaContext = VaContext(
        skip_names=skip_names,
        c_internal=0,
        c_nosignal=0,
        c_qr=0,
        # .get(): do_main may be called without the CLI having set this
        # variable; a None log level is skipped when building tool commands
        log_level=os.environ.get("REPROSTIM_LOG_LEVEL"),
        max_counter=max_files,
        mode=mode,
        nosignal_opts=(
            shlex.split(nosignal_opts)
            if nosignal_opts is not None
            else VaContext.model_fields["nosignal_opts"].default
        ),
        path_mask=path_mask,
        qr_opts=(
            shlex.split(qr_opts)
            if qr_opts is not None
            else VaContext.model_fields["qr_opts"].default
        ),
        recursive=recursive,
        source=va_src,
    )
    recs1: List[VaRecord] = list(do_audit(ctx, paths))

    if verbose:
        for vr in recs1:
            out_func(f"{vr.model_dump_json()}")

    # merge recs0 and recs1, with recs1 taking precedence by .name key
    recs = recs1
    if len(recs0) > 0:
        merged_dict = {r.name: r for r in recs0}
        merged_dict.update({r.name: r for r in recs1})
        recs = list(merged_dict.values())

    # these modes skip the internal audit and instead re-run the external
    # tools over the already-known records
    if mode in {VaMode.RERUN_FOR_NA, VaMode.RESET_TO_NA}:
        recs = list(do_ext(ctx, recs, paths))

    logger.info(f"Audited records count: {len(ctx.updated_paths)}")
    logger.info(f"Saving results to TSV file : {path_tsv}")
    logger.info(f"Total records to save : {len(recs)}")

    # sort records by name
    recs.sort(key=lambda r: r.name)

    # re-read the TSV under the lock and merge, so changes written by other
    # processes since the initial load are taken into account
    with lock:
        recs_cur: List[VaRecord] = (
            _load_tsv(path_tsv) if os.path.exists(path_tsv) else []
        )
        recs = _merge_recs(ctx, recs0, recs_cur, recs)
        # sort records by name again
        recs.sort(key=lambda r: r.name)
        _save_tsv(recs, path_tsv)
    return 0