Source code for reprostim.capture.nosignal

# SPDX-FileCopyrightText: 2020-2026 ReproNim Team <info@repronim.org>
#
# SPDX-License-Identifier: MIT

"""
Provides functionality to search for no-signal/rainbow frames in
video files (`*.mkv`) recorded by the `reprostim-videocapture`
utility.
"""

import logging.config
import subprocess
import time
from datetime import timedelta
from typing import Optional

import cv2
import numpy as np
from pydantic import BaseModel, Field

# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Emits the module name at DEBUG level when the module is imported.
logger.debug(f"name={__name__}")


class VideoInfo(BaseModel):
    """
    Class representing information about a video file.

    Holds the basic video properties together with the statistics
    produced by the no-signal scan. ``error`` stays ``None`` unless
    the scan could not be performed.
    """

    # Optional: the default is None and is only set when a scan fails.
    error: Optional[str] = Field(None, description="Error message")
    """Error message if any."""

    fps: float = Field(..., description="Frames per second (FPS)")
    """Video frame rate (FPS)."""

    width: int = Field(..., description="Video frame width")
    """Video frame width in px."""

    height: int = Field(..., description="Video frame height")
    """Video frame height in px."""

    is_invalid_timing: bool = Field(
        False, description="Is video with invalid duration/timing"
    )
    """
    Specifies if video has invalid duration and needs a fixup
    (when calculated duration is 0 or greater than 2 days).
    """

    is_truncated: bool = Field(False, description="Is video truncated")
    """Specifies truncated video which needs a fixup."""

    frames_count: int = Field(..., description="Total number of frames")
    """Specifies total number of frames."""

    nosignal_count: int = Field(..., description="Total number of nosignal frames")
    """Specifies total number of nosignal frames."""

    nosignal_rate: float = Field(
        ..., description="Rate of nosignal frames in fraction of total frames"
    )
    """Specifies rate of nosignal frames in fraction of total frames."""

    scanned_count: int = Field(..., description="Total number of scanned frames")
    """Specifies total number of scanned frames."""

    def __str__(self):
        """Return a compact one-line summary of the scan results."""
        return (
            f"VideoInfo({self.width}x{self.height}, fps={self.fps}, "
            f"is_truncated={self.is_truncated}, "
            f"frames_count={self.frames_count}, "
            f"scanned_count={self.scanned_count}, "
            f"nosignal_count={self.nosignal_count}, "
            f"nosignal_rate={self.nosignal_rate})"
        )
# HSV bounds of the colors treated as "rainbow" pixels by has_rainbow().
# These are example values only; tune them for the actual rainbow screen.
lower_rainbow = np.array([0, 50, 50])
upper_rainbow = np.array([30, 255, 255])

# Nosignal grid size (8 bands) for the custom matching algorithm.
grid_rows: int = 6
grid_cols: int = 8

# Reference color table, one entry per grid cell. Every cell is None
# until init_grid_colors() fills it in; each row is its own list.
grid_colors = [[None] * grid_cols for _ in range(grid_rows)]
def auto_fix_video(video_path: str, temp_path: str):
    """
    Fixes `*.mkv` video file with invalid audio/video timing by
    copying only video stream data.

    Note: audio data will be lost in generated file with fixup.

    The external ``mediainfo`` and ``ffmpeg`` tools are invoked with
    argument lists (no shell), so paths containing spaces or shell
    metacharacters are passed through verbatim.

    :param video_path: The path to the video file that needs to be fixed.
    :param temp_path: The temporary path where the fixed video
                      will be saved.
    :raises subprocess.CalledProcessError: If ``mediainfo`` or ``ffmpeg``
                      exits with a non-zero status.
    :raises FileNotFoundError: If ``mediainfo`` or ``ffmpeg`` executable
                      cannot be found.

    Example
    -------
    .. code-block:: python

        auto_fix_video("path/to/video.mkv", "path/to/fixed_video.mkv")
    """
    src = str(video_path)
    dst = str(temp_path)

    logger.info(f"Run mediainfo to get video information: mediainfo -i {video_path}")
    res = subprocess.run(
        ["mediainfo", "-i", src],
        check=True,
        capture_output=True,
        text=True,
    )
    logger.info(f"[exit code] : {res.returncode}")
    logger.info(f"[stdout] : {res.stdout}")
    logger.info(f"[stderr] : {res.stderr}")

    logger.info(f"Run fixup ffmpeg : ffmpeg -i {video_path} -an -c copy {temp_path}")
    res = subprocess.run(
        # -an drops the audio stream, so only the video stream is copied
        ["ffmpeg", "-i", src, "-an", "-c", "copy", dst],
        check=True,
        capture_output=True,
        text=True,
    )
    logger.info(f"[exit code] : {res.returncode}")
    logger.info(f"[stdout] : {res.stdout}")
    logger.info(f"[stderr] : {res.stderr}")
    logger.info("Video fixup completed.")
# Function to calculate opencv2 color difference
def calc_color_diff(color1, color2):
    """
    Calculates the color difference between two colors based on their
    RGB components.

    The color difference is computed as the sum of the absolute
    differences between the `red`, `green`, and `blue` channels
    of the two colors.

    :param color1: The first color, a NumPy array, tuple or list holding
                   the `blue`, `green`, and `red` components (OpenCV
                   BGR order).
    :param color2: The second color, in the same format as `color1`.

    :return: The absolute sum of the differences between the `red`,
             `green`, and `blue` components of `color1` and `color2`.
    """
    # Convert to a signed 32-bit type first: subtracting uint8 pixel
    # values directly would wrap around instead of going negative.
    # np.asarray also accepts plain tuples/lists, not only ndarrays.
    b1, g1, r1 = np.asarray(color1, dtype=np.int32)
    b2, g2, r2 = np.asarray(color2, dtype=np.int32)
    return abs(r1 - r2) + abs(g1 - g2) + abs(b1 - b2)
def has_rainbow(frame):
    """
    Detects the presence of rainbow colors in a given frame.

    Note: produces false positive results on real data; `has_rainbow2`
    is the improved implementation.

    :param frame: The input frame (image) as a NumPy array.

    :return: `True` if a significant number of rainbow
             pixels are detected, `False` otherwise.
    """
    # Work in HSV so the rainbow colors can be selected by range
    hsv_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)

    # Pixels inside [lower_rainbow, upper_rainbow] are non-zero in the mask
    rainbow_mask = cv2.inRange(hsv_frame, lower_rainbow, upper_rainbow)

    # Frame counts as "rainbow" when enough pixels matched;
    # adjust the threshold as needed
    return cv2.countNonZero(rainbow_mask) > 10000
def has_rainbow2(frame):
    """
    Detects the presence of rainbow colors in a given frame (recommended).

    Samples the pixel at the center of every cell of the nosignal grid
    and compares it with the reference color stored in `grid_colors`.
    The frame matches when the mean per-cell color difference is
    below a fixed threshold.

    :param frame: The input frame (image) as a NumPy array.

    :return: `True` if the frame matches the reference rainbow
             pattern, `False` otherwise.
    """
    frame_h, frame_w, _ = frame.shape
    cell_h = frame_h // grid_rows
    cell_w = frame_w // grid_cols

    # Sum of color differences over the centers of all grid cells
    total_diff = sum(
        calc_color_diff(
            grid_colors[row][col],
            frame[
                int(row * cell_h + cell_h // 2),
                int(col * cell_w + cell_w // 2),
            ],
        )
        for row in range(grid_rows)
        for col in range(grid_cols)
    )

    mean_diff = total_diff / (grid_rows * grid_cols)
    logger.debug(f"diff={mean_diff}")

    # NOTE: tune the threshold value as needed
    return mean_diff < 35
def init_grid_colors(ref_image_path: str):
    """
    Initializes rainbow screen image color table based on
    frame reference image.

    The image is split into a `grid_rows` x `grid_cols` grid and the
    color of the pixel at the center of each cell is stored in the
    module-level `grid_colors` table.

    Note: necessary for has_rainbow2 API only.

    :param ref_image_path: The reference image path.
    :raises ValueError: If the reference image cannot be read.
    """
    logger.debug(f"ref_image_path={ref_image_path}")

    # cv2.imread does not raise on failure, it returns None instead
    image = cv2.imread(str(ref_image_path))
    if image is None:
        raise ValueError(f"Couldn't read reference image: {ref_image_path}")

    height, width, _ = image.shape

    # Size of a single grid cell in pixels
    cell_height = height // grid_rows
    cell_width = width // grid_cols

    for i in range(grid_rows):
        for j in range(grid_cols):
            # Center of the cell
            x = int(j * cell_width + cell_width // 2)
            y = int(i * cell_height + cell_height // 2)
            # OpenCV uses (row, column) coordinates; copy the pixel so the
            # table does not keep the whole reference image alive
            grid_colors[i][j] = image[y, x].copy()

    # Dump the grid colors; skipped entirely unless DEBUG is enabled
    if logger.isEnabledFor(logging.DEBUG):
        logger.debug("Nosignal reference grid colors:")
        for i, row in enumerate(grid_colors):
            for j, color in enumerate(row):
                # OpenCV stores colors as BGR
                b, g, r = color.astype(np.int32)
                logger.debug(f" grid_colors[{i+1}, {j+1}] : RGB({r}, {g}, {b})")
def find_no_signal(
    video_path: str,
    step: int = 1,
    number_of_checks: int = 0,
    show_progress_sec: float = 0.0,
    check_first_frames: int = 0,
) -> VideoInfo:
    """
    Scans a video `*.mkv` file to detect frames that contain a rainbow
    pattern (no signal frames).

    This function opens a video file, scans frames for a specific
    nosignal image pattern, and returns statistics on the frames
    scanned, the frames with no signal, and the rate of
    no-signal frames.

    Problems are reported through the `error` field of the returned
    object rather than by raising.

    :param video_path: The path to the video `*.mkv` file to scan.
    :param step: The step size for frame scanning. The function skips
                 frames by this step. Ignored when `number_of_checks`
                 is set.
    :param number_of_checks: The number of checks to perform. If greater
                 than 0, this limits the number of frames to check in the
                 video in range from 1st to the last one. Ignored when
                 `check_first_frames` is set.
    :param show_progress_sec: The interval (in seconds) to display progress
                 during the scan in the log. Progress is shown when
                 `show_progress_sec` is greater than 0.0.
    :param check_first_frames: The number of frames to check at the start
                 of the video. If greater than 0, the scan will stop right
                 after analyzing these frames. Also allows scanning videos
                 detected as truncated or with invalid timing.

    :return: A `VideoInfo` object with scan details.

    Example
    -------
    .. code-block:: python

        video_info = find_no_signal("path/to/video.mp4", number_of_checks=5)
        print(video_info.nosignal_rate)
    """
    vi: VideoInfo = VideoInfo(
        fps=0,
        width=0,
        height=0,
        is_invalid_timing=False,
        is_truncated=False,
        frames_count=0,
        nosignal_count=0,
        nosignal_rate=0.0,
        scanned_count=0,
    )

    if step < 1:
        vi.error = "Step must be greater than 0"
        return vi

    if check_first_frames > 0 and number_of_checks > 0:
        logger.warning(
            f"check_first_frames value specified, "
            f"ignore number_of_checks({number_of_checks}) value."
        )
        number_of_checks = 0

    if number_of_checks < 0:
        vi.error = "Number of checks must be 0 or greater than 0"
        return vi

    if number_of_checks > 0 and step > 1:
        logger.warning(
            f"Number of checks is set, specified step value({step}) will be ignored."
        )
        step = 1

    cap = cv2.VideoCapture(video_path)
    # try/finally guarantees the capture is released on every exit path,
    # including the early error returns below and unexpected exceptions
    try:
        if not cap.isOpened():
            vi.error = f"Couldn't open the video file: {video_path}"
            return vi

        raw_frame_count = cap.get(cv2.CAP_PROP_FRAME_COUNT)
        logger.debug(f"n1={str(raw_frame_count)}")
        frame_count = int(raw_frame_count)
        logger.debug(f"frame_count={str(frame_count)}")

        fps = cap.get(cv2.CAP_PROP_FPS)
        frame_width: int = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        frame_height: int = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        # -1.0 marks a duration that cannot be calculated (no valid fps)
        duration_sec: float = frame_count / fps if fps > 0 else -1.0
        logger.debug(f"duration_sec={duration_sec}")

        vi.width = frame_width
        vi.height = frame_height
        vi.fps = round(fps, 2)

        logger.info(
            f"Video resolution={frame_width}x{frame_height}, fps={str(fps)}, "
            f"frames count={str(frame_count)}"
        )
        logger.info(
            f"pos ms={cap.get(cv2.CAP_PROP_POS_MSEC)}, "
            f"pos frames={cap.get(cv2.CAP_PROP_POS_FRAMES)}"
        )

        pos_first_frame: int = 0
        pos_last_frame: int = pos_first_frame + frame_count

        # A negative frame count is treated as a truncated video; it can
        # only be scanned in check_first_frames mode
        if pos_first_frame > pos_last_frame:
            vi.is_truncated = True
            if check_first_frames == 0:
                vi.error = (
                    f"Invalid frame range: {pos_first_frame} - {pos_last_frame}"
                )
                return vi

        # Unknown duration or longer than 2 days is treated as invalid timing
        if duration_sec < 0 or duration_sec > 2 * 24 * 60 * 60:
            vi.is_invalid_timing = True
            if check_first_frames == 0:
                vi.error = f"Invalid video duration: {duration_sec} seconds"
                return vi

        pos_cur_frame: int = pos_first_frame
        nosignal_counter: int = 0
        scan_counter: int = 0
        ts_progress: float = time.time()

        while True:
            if show_progress_sec > 0.0 and time.time() > ts_progress:
                dt: str = str(
                    timedelta(milliseconds=int(cap.get(cv2.CAP_PROP_POS_MSEC)))
                )
                logger.info(
                    f"Scanning progress: {pos_cur_frame} / {pos_last_frame}, {dt}"
                )
                ts_progress = time.time() + show_progress_sec

            # fps may be 0 for videos with invalid timing scanned in
            # check_first_frames mode, so avoid dividing by it
            pos_sec = round(pos_cur_frame / fps, 1) if fps > 0 else -1.0
            logger.debug(f"pos_frame={pos_cur_frame}, " f"time={pos_sec}")

            cap.set(cv2.CAP_PROP_POS_FRAMES, pos_cur_frame)
            ret, frame = cap.read()
            if ret is False:
                logger.debug(f"Failed reading frame {pos_cur_frame}")
                break

            # for some rare videos, opencv continues to read frames even
            # after the end of video
            # e.g. Videos/2024/03/2024.03.18.14.39.38.336_2024.03.18.14.44.02.541.mkv
            # to fix this cases, just break the loop.
            # The guard relies on a valid frame count, so it is skipped for
            # truncated videos where pos_last_frame is negative.
            if not vi.is_truncated and pos_cur_frame > pos_last_frame:
                logger.debug(
                    f"Failed reading frame sequencer, "
                    f"pos_cur_frame={pos_cur_frame} > "
                    f"pos_last_frame={pos_last_frame}"
                )
                break

            # also double check number_of_checks for similar cases
            if 0 < number_of_checks <= scan_counter:
                logger.debug(
                    f"Failed reading frame, number_of_checks={number_of_checks} "
                    f"limit reached"
                )
                break

            scan_counter += 1

            if has_rainbow2(frame):
                logger.debug("rainbow-yes")
                nosignal_counter += 1
            else:
                logger.debug("rainbow-no")

            # stop only after the current frame has been analyzed, so that
            # exactly check_first_frames frames contribute to the statistics
            if check_first_frames > 0 and scan_counter >= check_first_frames:
                logger.debug(f"check_first_frames={check_first_frames} reached")
                break

            # set next frame position
            if number_of_checks > 0:
                # spread the checks evenly over the whole video
                pos_cur_frame = pos_first_frame + int(
                    frame_count * scan_counter / number_of_checks
                )
            else:
                pos_cur_frame = pos_cur_frame + step

        logger.debug(f"pos_frame={pos_cur_frame}")

        # The frame position is not meaningful for truncated videos, use
        # the number of frames actually scanned instead
        vi.frames_count = scan_counter if vi.is_truncated else pos_cur_frame
        vi.nosignal_count = nosignal_counter
        vi.scanned_count = scan_counter
        if scan_counter > 0:
            vi.nosignal_rate = round(nosignal_counter / scan_counter, 3)

        return vi
    finally:
        cap.release()