diff --git a/trackstudio/calibration/calibration.py b/trackstudio/calibration/calibration.py index 9e4bbcc..7f1721c 100644 --- a/trackstudio/calibration/calibration.py +++ b/trackstudio/calibration/calibration.py @@ -3,137 +3,151 @@ Handles all camera calibration functionality for the vision system """ -import json import logging +import numpy as np +import cv2 import time -import traceback -from pathlib import Path -from typing import Any +import json +import os +from typing import Dict, List, Tuple, Optional, Any -import cv2 -import numpy as np +# Assuming ServerConfig is in the same package (e.g., your_project_name.config) +from ..core.config import ServerConfig logger = logging.getLogger(__name__) - class CameraCalibration: - """Handles camera calibration for BEV transformation""" - + """ + Handles camera calibration for BEV transformation and intrinsic (undistortion) parameters. + Homography matrices are loaded from `calibration_file`. + Intrinsic parameters are loaded from paths specified in `ServerConfig.CALIBRATION_FILES`. + """ + def __init__(self, calibration_file: str = "calibration_data.json"): - # Convert to absolute path to avoid working directory issues - self.calibration_file = str(Path(calibration_file).resolve()) - self.homography_matrices: dict[int, np.ndarray] = {} - - # Initialize default homography matrices + # Path for BEV/Homography calibration data + self.calibration_file = calibration_file + self.homography_matrices: Dict[int, np.ndarray] = {} + + # Dictionary to store intrinsic calibration data {stream_id: loaded_json_data} + self.intrinsic_calibration_data: Dict[int, Dict[str, Any]] = {} + + # Initialize default homography matrices (BEV) self._initialize_default_homography() - - # Load existing calibration data - self.load_calibration_data() - - logger.info("πŸ“ Camera calibration module initialized") - + + # Load existing homography calibration data from self.calibration_file + self.load_homography_calibration_data() + + # NEW: Load intrinsic calibration data based on ServerConfig + self._load_intrinsic_from_server_config() + + logger.info("πŸ“ Camera calibration module initialized.") + def _initialize_default_homography(self): - """Initialize default homography matrices for cameras""" + """Initialize default homography matrices for cameras for BEV transformation.""" # Improved default homography matrices that preserve aspect ratios # Camera frame: 720x480, BEV canvas: 600x600 # Scale factor to maintain aspect ratio: 600/720 = 0.833 - scale_x = 0.833 # Scale down to fit width - scale_y = 1.25 # Scale up to account for perspective - + scale_x = 0.833 # Scale down to fit width + scale_y = 1.25 # Scale up to account for perspective (example) + self.homography_matrices = { - 0: np.array( - [ # Camera 0 (top-left) - better default transformation - [scale_x, 0.0, 50.0], # Scale x and offset slightly - [0.0, scale_y, 50.0], # Scale y and offset slightly - [0.0, 0.001, 1.0], # Minimal perspective distortion - ], - dtype=np.float32, - ), - 1: np.array( - [ # Camera 1 (top-right) - [scale_x, 0.0, 150.0], # More x offset for right camera - [0.0, scale_y, 50.0], - [0.0, 0.001, 1.0], - ], - dtype=np.float32, - ), - 2: np.array( - [ # Camera 2 (bottom-left) - [scale_x, 0.0, 50.0], - [0.0, scale_y, 350.0], # Y offset for bottom row - [0.0, 0.001, 1.0], - ], - dtype=np.float32, - ), - 3: np.array( - [ # Camera 3 (bottom-right) - [scale_x, 0.0, 150.0], - [0.0, scale_y, 350.0], # Both x and y offset - [0.0, 0.001, 1.0], - ], - dtype=np.float32, - ), + 0: np.array([ # Camera 0 
(top-left) - better default transformation + [scale_x, 0.0, 50.0], # Scale x and offset slightly + [0.0, scale_y, 50.0], # Scale y and offset slightly + [0.0, 0.001, 1.0] # Minimal perspective distortion + ], dtype=np.float32), + 1: np.array([ # Camera 1 (top-right) + [scale_x, 0.0, 150.0], # More x offset for right camera + [0.0, scale_y, 50.0], + [0.0, 0.001, 1.0] + ], dtype=np.float32), + 2: np.array([ # Camera 2 (bottom-left) + [scale_x, 0.0, 50.0], + [0.0, scale_y, 350.0], # Y offset for bottom row + [0.0, 0.001, 1.0] + ], dtype=np.float32), + 3: np.array([ # Camera 3 (bottom-right) + [scale_x, 0.0, 150.0], + [0.0, scale_y, 350.0], # Both x and y offset + [0.0, 0.001, 1.0] + ], dtype=np.float32) } - - def calibrate_camera( - self, - camera_id: int, - image_points: list[tuple[float, float]], - bev_points: list[tuple[float, float]], - bev_size: int = 600, - ) -> tuple[bool, str, np.ndarray | None]: + + def _load_intrinsic_from_server_config(self): """ - Calibrate camera using 4-point correspondence - + NEW: Loads intrinsic calibration data from files specified in ServerConfig.CALIBRATION_FILES. + This data is stored in self.intrinsic_calibration_data. + """ + if ServerConfig.CALIBRATION_FILES: + logger.info(f"Loading intrinsic calibration files from ServerConfig: {ServerConfig.CALIBRATION_FILES}") + for stream_id, file_path in ServerConfig.CALIBRATION_FILES.items(): + try: + if os.path.exists(file_path): + with open(file_path, 'r') as f: + data = json.load(f) + self.intrinsic_calibration_data[stream_id] = data + logger.info(f"βœ… Loaded intrinsic data for stream {stream_id} from {file_path}") + else: + logger.warning(f"Intrinsic calibration file not found for stream {stream_id}: {file_path}") + except json.JSONDecodeError: + logger.error(f"Invalid JSON in intrinsic calibration file for stream {stream_id}: {file_path}") + except Exception as e: + logger.error(f"Error loading intrinsic calibration for stream {stream_id} from {file_path}: {e}") + else: + logger.info("No intrinsic calibration files specified in ServerConfig.CALIBRATION_FILES.") + + def calibrate_camera(self, camera_id: int, image_points: List[Tuple[float, float]], + bev_points: List[Tuple[float, float]], bev_size: int = 600) -> Tuple[bool, str, Optional[np.ndarray]]: + """ + Calibrate camera for BEV transformation using 4-point correspondence. + Args: - camera_id: Camera ID (0 or 1) + camera_id: Camera ID (0-3) image_points: List of 4 points in image coordinates [(x, y), ...] bev_points: List of 4 corresponding points in normalized BEV coordinates [0-1] bev_size: Size of BEV map in pixels for transformation - + Returns: Tuple of (success, message, homography_matrix) """ try: if len(image_points) != 4 or len(bev_points) != 4: return False, "Exactly 4 point pairs are required for calibration", None - + # Convert to numpy arrays img_pts = np.array(image_points, dtype=np.float32) bev_pts = np.array(bev_points, dtype=np.float32) - + # Convert normalized BEV points [0-1] to actual BEV coordinates bev_pts_pixel = bev_pts * bev_size - + # Compute homography matrix directly without aspect ratio correction - # The frontend coordinate system now handles aspect ratios properly homography_matrix, mask = cv2.findHomography(img_pts, bev_pts_pixel, cv2.RANSAC) - + if homography_matrix is None: return False, "Failed to compute homography matrix. 
Check point correspondences.", None - + # Store the homography matrix self.homography_matrices[camera_id] = homography_matrix - - logger.info(f"πŸ“ Camera {camera_id} calibrated successfully") - return True, f"Camera {camera_id} calibrated successfully", homography_matrix - + + logger.info(f"πŸ“ Camera {camera_id} BEV calibrated successfully") + return True, f"Camera {camera_id} BEV calibrated successfully", homography_matrix + except Exception as e: - error_msg = f"Calibration failed: {str(e)}" + error_msg = f"BEV calibration failed: {str(e)}" logger.error(f"❌ {error_msg}") return False, error_msg, None - - def transform_image_with_homography( - self, image: np.ndarray, camera_id: int, output_size: tuple[int, int] = (400, 400) - ) -> np.ndarray | None: + + def transform_image_with_homography(self, image: np.ndarray, camera_id: int, + output_size: Tuple[int, int] = (400, 400)) -> Optional[np.ndarray]: """ - Transform an image using the calibrated homography matrix - + Transform an image using the calibrated homography matrix (for BEV). + Args: image: Input image to transform camera_id: Camera ID to get homography matrix for output_size: Output image size (width, height) - + Returns: Transformed image or None if no homography available """ @@ -141,161 +155,203 @@ def transform_image_with_homography( if camera_id not in self.homography_matrices: logger.warning(f"No homography matrix available for camera {camera_id}") return None - + homography_matrix = self.homography_matrices[camera_id] - + # Apply homography transformation - return cv2.warpPerspective(image, homography_matrix, output_size) - + transformed_image = cv2.warpPerspective(image, homography_matrix, output_size) + + return transformed_image + except Exception as e: - logger.error(f"❌ Error transforming image: {e}") + logger.error(f"❌ Error transforming image with homography: {e}") return None - - def transform_points_to_bev(self, points: list[tuple[float, float]], camera_id: int) -> list[tuple[float, float]]: + + def transform_points_to_bev(self, points: List[Tuple[float, float]], camera_id: int) -> List[Tuple[float, float]]: """ - Transform image points to BEV coordinates using homography - + Transform image points to BEV coordinates using homography. 
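
A minimal usage sketch of the 4-point BEV calibration flow above. The pixel coordinates are illustrative placeholders, not values from a real calibration session; only the call signatures come from this file.

```python
from trackstudio.calibration.calibration import CameraCalibration

calib = CameraCalibration("calibration_data.json")

# Four image-space corners of the tracked area (placeholder values) and the
# four matching BEV corners in normalized [0-1] coordinates, in the same order.
image_points = [(120.0, 410.0), (610.0, 395.0), (560.0, 120.0), (170.0, 135.0)]
bev_points = [(0.0, 1.0), (1.0, 1.0), (1.0, 0.0), (0.0, 0.0)]

ok, message, H = calib.calibrate_camera(0, image_points, bev_points, bev_size=600)
if ok:
    calib.save_calibration_data(0, image_points, bev_points, H, bev_size=600)
```
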
+ Args: points: List of (x, y) points in image coordinates camera_id: Camera ID to get homography matrix for - + Returns: List of transformed points in BEV coordinates """ if camera_id not in self.homography_matrices or not points: return [] - + try: homography_matrix = self.homography_matrices[camera_id] - + # Convert points to numpy array format expected by cv2.perspectiveTransform pts_array = np.array(points, dtype=np.float32).reshape(-1, 1, 2) - + # Apply homography transformation transformed_pts = cv2.perspectiveTransform(pts_array, homography_matrix) - + # Convert back to list of tuples return [(float(pt[0][0]), float(pt[0][1])) for pt in transformed_pts] - + except Exception as e: - logger.error(f"❌ Error transforming points: {e}") + logger.error(f"❌ Error transforming points to BEV: {e}") return [] - - def get_homography_matrix(self, camera_id: int) -> np.ndarray | None: + + def get_homography_matrix(self, camera_id: int) -> Optional[np.ndarray]: """Get the current homography matrix for a camera""" return self.homography_matrices.get(camera_id, None) - + def update_homography(self, camera_id: int, homography_matrix: np.ndarray): - """Update homography matrix for a specific camera""" + """Update homography matrix for a specific camera (for BEV).""" self.homography_matrices[camera_id] = homography_matrix logger.info(f"πŸ“ Updated homography matrix for camera {camera_id}") - - def save_calibration_data( - self, - camera_id: int, - image_points: list[tuple[float, float]], - bev_points: list[tuple[float, float]], - homography_matrix: np.ndarray, - bev_size: int = 400, - ): - """Save calibration data to file""" - logger.info(f"πŸ”§ Attempting to save calibration data for camera {camera_id} to {self.calibration_file}") + + def save_calibration_data(self, camera_id: int, image_points: List[Tuple[float, float]], + bev_points: List[Tuple[float, float]], homography_matrix: np.ndarray, + bev_size: int = 400): + """Save BEV calibration data to file (homography matrices, points).""" try: # Load existing data WITHOUT updating in-memory matrices calibration_data = {} - if Path(self.calibration_file).exists(): - logger.info(f"πŸ“– Loading existing calibration data from {self.calibration_file}") - with Path(self.calibration_file).open() as f: + if os.path.exists(self.calibration_file): + with open(self.calibration_file, 'r') as f: calibration_data = json.load(f) - else: - logger.info(f"πŸ“ Creating new calibration data file at {self.calibration_file}") - - # Update with new calibration + + # Update with new calibration for this camera + # Store camera ID as string key in JSON for consistency calibration_data[f"camera{camera_id}"] = { "homography_matrix": homography_matrix.tolist(), "image_points": image_points, "bev_points": bev_points, "bev_size": bev_size, - "calibrated_at": time.time(), + "calibrated_at": time.time() } - + # Save to file - logger.info(f"πŸ’Ύ Writing calibration data to {self.calibration_file}") - with Path(self.calibration_file).open("w") as f: + with open(self.calibration_file, 'w') as f: json.dump(calibration_data, f, indent=2) - - logger.info(f"βœ… Successfully saved calibration data for camera {camera_id} to {self.calibration_file}") - + + logger.info(f"πŸ’Ύ Saved BEV calibration data for camera {camera_id} to {self.calibration_file}") + except Exception as e: - logger.error(f"❌ Error saving calibration data to {self.calibration_file}: {e}") - - logger.error(f"πŸ“ Traceback: {traceback.format_exc()}") - - def load_calibration_data(self) -> dict: - """Load calibration data 
from file and update homography matrices""" - if Path(self.calibration_file).exists(): + logger.error(f"❌ Error saving BEV calibration data: {e}") + + def load_homography_calibration_data(self) -> Dict: + """Load homography calibration data from file and update in-memory matrices.""" + if os.path.exists(self.calibration_file): try: - with Path(self.calibration_file).open() as f: + with open(self.calibration_file, 'r') as f: data = json.load(f) - - # Load homography matrices + + # Load homography matrices from the file for camera_key, calibration in data.items(): - if camera_key.startswith("camera") and "homography_matrix" in calibration: - camera_id = int(camera_key.replace("camera", "")) - matrix = np.array(calibration["homography_matrix"], dtype=np.float32) + if camera_key.startswith('camera') and 'homography_matrix' in calibration: + # Extract integer camera_id from string key (e.g., "camera0" -> 0) + camera_id = int(camera_key.replace('camera', '')) + matrix = np.array(calibration['homography_matrix'], dtype=np.float32) self.homography_matrices[camera_id] = matrix - logger.info(f"πŸ“ Loaded homography matrix for camera {camera_id}") - + logger.info(f"πŸ“ Loaded homography matrix for camera {camera_id} from {self.calibration_file}") + return data - + + except json.JSONDecodeError: + logger.error(f"❌ Invalid JSON in homography calibration file: {self.calibration_file}. Returning empty data.") + return {} except Exception as e: - logger.error(f"❌ Error loading calibration file: {e}") + logger.error(f"❌ Error loading homography calibration file: {e}. Returning empty data.") return {} - + + logger.info(f"No homography calibration file found at {self.calibration_file}. Using default matrices.") return {} - + def clear_calibration_data(self): - """Clear all calibration data and reset to defaults""" + """Clear all calibration data (homography and intrinsic) and reset homographies to defaults.""" try: - if Path(self.calibration_file).exists(): - Path(self.calibration_file).unlink() - + if os.path.exists(self.calibration_file): + os.remove(self.calibration_file) + # Reset homography matrices to defaults self._initialize_default_homography() + + # Clear intrinsic calibration data + self.intrinsic_calibration_data = {} - logger.info("πŸ—‘οΈ Cleared all calibration data") - + logger.info("πŸ—‘οΈ Cleared all calibration data (homography and intrinsic).") + except Exception as e: logger.error(f"❌ Error clearing calibration data: {e}") - - def get_calibration_status(self) -> dict[str, Any]: - """Get calibration status for all cameras""" - calibration_data = self.load_calibration_data() - - return { - "camera0": { - "calibrated": "camera0" in calibration_data, - "calibrated_at": calibration_data.get("camera0", {}).get("calibrated_at", None), - "has_homography": 0 in self.homography_matrices, - }, - "camera1": { - "calibrated": "camera1" in calibration_data, - "calibrated_at": calibration_data.get("camera1", {}).get("calibrated_at", None), - "has_homography": 1 in self.homography_matrices, - }, - "camera2": { - "calibrated": "camera2" in calibration_data, - "calibrated_at": calibration_data.get("camera2", {}).get("calibrated_at", None), - "has_homography": 2 in self.homography_matrices, - }, - "camera3": { - "calibrated": "camera3" in calibration_data, - "calibrated_at": calibration_data.get("camera3", {}).get("calibrated_at", None), - "has_homography": 3 in self.homography_matrices, - }, - } - + + def get_calibration_status(self) -> Dict[str, Any]: + """Get calibration status for all cameras 
(including both BEV and intrinsic)."""
+        # Reload homography data to ensure status is fresh from disk
+        homography_data_from_file = self.load_homography_calibration_data()
+
+        status = {}
+        # Iterate over all possible stream IDs (based on default homography keys or ServerConfig enabled streams)
+        all_stream_ids = sorted(set(self.homography_matrices.keys()) | {stream["id"] for stream in ServerConfig.get_enabled_streams()})
+
+        for stream_id in all_stream_ids:
+            camera_key = f"camera{stream_id}"
+
+            # Check BEV calibration status
+            bev_calibrated = camera_key in homography_data_from_file and stream_id in self.homography_matrices
+            bev_calibrated_at = homography_data_from_file.get(camera_key, {}).get("calibrated_at", None)
+
+            # Check intrinsic calibration status
+            intrinsic_calibrated = stream_id in self.intrinsic_calibration_data
+            intrinsic_data_timestamp = self.intrinsic_calibration_data.get(stream_id, {}).get("timestamp", None) # Assuming timestamp in intrinsic JSON
+
+            status[camera_key] = {
+                "bev_calibrated": bev_calibrated,
+                "bev_calibrated_at": bev_calibrated_at,
+                "has_homography_matrix": stream_id in self.homography_matrices,
+                "intrinsic_calibrated": intrinsic_calibrated,
+                "intrinsic_data_timestamp": intrinsic_data_timestamp # Show when the intrinsic data was generated
+            }
+
+        return status
+
     def is_camera_calibrated(self, camera_id: int) -> bool:
-        """Check if a camera is properly calibrated"""
-        calibration_data = self.load_calibration_data()
-        return f"camera{camera_id}" in calibration_data and camera_id in self.homography_matrices
+        """
+        Check if a camera has both BEV homography and intrinsic calibration loaded.
+        Relax this check if only one type of calibration is required.
+        """
+        # Checks if BEV homography is loaded AND intrinsic data is loaded
+        return camera_id in self.homography_matrices and camera_id in self.intrinsic_calibration_data
+
+    def load_intrinsic_calibration(self, camera_id: int) -> Tuple[Optional[np.ndarray], Optional[np.ndarray], Tuple[int, int], Optional[np.ndarray]]:
+        """
+        Load intrinsic calibration (K matrix and distortion coefficients) for a given camera_id.
+        Data comes from `self.intrinsic_calibration_data`, which is populated
+        by `_load_intrinsic_from_server_config()`.
+
+        Returns:
+            Tuple of (K, D, image_size, new_K_from_file)
+            Returns (None, None, (2560, 1440), None) on failure or if data is not found.
+        """
+        data = self.intrinsic_calibration_data.get(camera_id)
+        if data:
+            try:
+                K = np.array(data['camera_matrix_K'], dtype=np.float32)
+                D = np.array(data['distortion_coefficients_D'], dtype=np.float32).flatten()
+                image_size = (data['image_width'], data['image_height'])
+
+                new_K_from_file = None
+                if 'new_camera_matrix' in data:
+                    new_K_from_file = np.array(data['new_camera_matrix'], dtype=np.float32)
+
+                logger.info(f"πŸ“· Loaded intrinsic calibration for camera {camera_id} from in-memory data.")
+                logger.info(f"   Image size: {image_size[0]}x{image_size[1]}")
+                logger.info(f"   Distortion coefficients: {len(D)} (model: {'fisheye' if len(D) == 4 else ('rational' if len(D) > 5 else 'standard')})")
+
+                return K, D, image_size, new_K_from_file
+
+            except KeyError as e:
+                logger.warning(f"Intrinsic data for camera {camera_id} missing key: {e}. 
Data structure might be incorrect.") + return None, None, (2560, 1440), None # Return default size and None for matrices + except Exception as e: + logger.warning(f"Error parsing intrinsic calibration data for camera {camera_id}: {e}") + return None, None, (2560, 1440), None # Return default size and None for matrices + + logger.warning(f"No intrinsic calibration data found for camera {camera_id} in in-memory storage.") + return None, None, (2560, 1440), None # Default size \ No newline at end of file diff --git a/trackstudio/cli.py b/trackstudio/cli.py index b62b2c0..7cf0832 100644 --- a/trackstudio/cli.py +++ b/trackstudio/cli.py @@ -4,6 +4,7 @@ import json import sys +import logging # Import logging from pathlib import Path import click @@ -13,7 +14,6 @@ console = Console() - @click.group() @click.version_option() def cli(): @@ -28,7 +28,7 @@ def cli(): "--tracker", "-t", default="rfdetr", - type=str, # Allow any string, validation happens later + type=str, help="Vision tracker to use (rfdetr, dummy, or custom)", ) @click.option("--merger", "-m", default="bev_cluster", help="Cross-camera merger to use") @@ -37,37 +37,125 @@ def cli(): @click.option("--share", is_flag=True, help="Create public URL") @click.option("--no-browser", is_flag=True, help="Do not open browser automatically") @click.option("--vision-fps", default=10.0, type=float, help="Vision processing FPS") -@click.option("--calibration-file", type=click.Path(exists=True), help="Calibration data file") +# MODIFIED: Allow multiple calibration files, format ID=PATH +@click.option( + "--calibration-files", + "-cf", # Added short option for convenience + multiple=True, + type=str, + metavar="ID=PATH", + help="Calibration data files (e.g., -cf 0=cam0.json -cf 1=cam1.json). IDs must match stream order in config or --streams." +) +# NEW: Option to enable undistortion +@click.option( + "--enable-undistortion", + "-eu", # Added short option + is_flag=True, + help="Enable camera undistortion based on calibration data." 
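
The per-stream files passed via `--calibration-files` are the intrinsic JSON files read by `CameraCalibration.load_intrinsic_calibration()`. A plausible shape, inferred from the keys that loader reads; all numeric values are placeholders, and `new_camera_matrix` and `timestamp` are optional:

```json
{
  "camera_matrix_K": [[1250.0, 0.0, 1280.0], [0.0, 1250.0, 720.0], [0.0, 0.0, 1.0]],
  "distortion_coefficients_D": [-0.32, 0.11, 0.0004, -0.0002, -0.017],
  "image_width": 2560,
  "image_height": 1440,
  "new_camera_matrix": [[1180.0, 0.0, 1275.0], [0.0, 1185.0, 715.0], [0.0, 0.0, 1.0]],
  "timestamp": 1700000000.0
}
```

Four entries in `distortion_coefficients_D` select the fisheye path in the stream combiner; five or more select the standard/rational path.
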
+)
 @click.option("--debug", is_flag=True, help="Enable debug logging")
-def run(streams, config, tracker, merger, port, host, share, no_browser, vision_fps, calibration_file, debug):
+def run(
+    streams, config, tracker, merger, port, host, share, no_browser,
+    vision_fps, calibration_files, enable_undistortion, debug
+):
     """Run TrackStudio server"""
+    app_logger = logging.getLogger(__name__)  # Logger for this module
+
+    # Preserve --debug behaviour: the inline logging setup removed below is
+    # replaced by configuring the log level here
+    if debug:
+        logging.basicConfig(level=logging.DEBUG)
+
     # Show banner
     console.print(
         Panel.fit("[bold blue]TrackStudio[/bold blue] πŸŽ₯\nMulti-Camera Vision Tracking System", border_style="blue")
     )
 
-    # Set up logging
-    if debug:
-        import logging  # noqa: PLC0415
-
-        logging.basicConfig(level=logging.DEBUG)
-
     # Load config file if provided
     config_data = {}
     if config:
-        with Path(config).open() as f:
-            config_data = json.load(f)
+        try:
+            with Path(config).open() as f:
+                config_data = json.load(f)
             console.print(f"[green]βœ“[/green] Loaded config from {config}")
+        except FileNotFoundError:
+            console.print(f"[red]Error:[/red] Config file not found at {config}")
+            app_logger.error(f"Config file not found: {config}")
+            sys.exit(1)
+        except json.JSONDecodeError as e:
+            console.print(f"[red]Error:[/red] Invalid JSON in config file {config}: {e}")
+            app_logger.error(f"Invalid JSON in config file {config}: {e}")
+            sys.exit(1)
+        except Exception as e:
+            console.print(f"[red]Error:[/red] Failed to load config file {config}: {e}")
+            app_logger.error(f"Failed to load config file {config}: {e}")
+            sys.exit(1)
+
     # Use streams from command line or config
-    if config_data.get("rtsp_streams") is None:
-        config_data["rtsp_streams"] = (
-            [*streams] if streams else ["rtsp://localhost:8554/camera0", "rtsp://localhost:8554/camera1"]
-        )
+    # Command line --streams takes precedence over config_data.rtsp_streams
+    if streams:
+        config_data["rtsp_streams"] = list(streams)
+    elif "rtsp_streams" not in config_data or not config_data["rtsp_streams"]:
+        # Fallback to defaults if neither CLI nor config provides streams
+        config_data["rtsp_streams"] = ["rtsp://localhost:8554/camera0", "rtsp://localhost:8554/camera1"]
+        app_logger.info("No RTSP streams specified, using defaults.")
+
+    # Process camera names (can be from config or automatically generated)
+    if "camera_names" not in config_data or not config_data["camera_names"]:
+        num_streams_defined = len(config_data.get("rtsp_streams", []))
+        config_data["camera_names"] = [f"Camera {i}" for i in range(num_streams_defined)]
+        app_logger.info(f"No camera names specified, generating {num_streams_defined} default names.")
+
+    # --- NEW: Handle calibration files and undistortion setting ---
+    # Prepare calibration file paths dictionary
+    cli_calibration_paths = {}
+    if calibration_files:
+        for item in calibration_files:
+            try:
+                stream_id_str, file_path = item.split('=', 1)
+                stream_id = int(stream_id_str)
+                cli_calibration_paths[stream_id] = file_path
+            except ValueError:
+                console.print(f"[red]Error:[/red] Invalid format for --calibration-files: '{item}'. Expected 'ID=PATH'.")
+                app_logger.error(f"Invalid format for --calibration-files: '{item}'. 
Expected 'ID=PATH'.") + sys.exit(1) + + # Merge CLI calibration paths with any from config_data, with CLI taking precedence + if "calibration_files" not in config_data: + config_data["calibration_files"] = {} + + # Ensure config_data["calibration_files"] is a dictionary and contains integer keys + if isinstance(config_data["calibration_files"], dict): + # Convert keys from string to int if they are strings (typical for JSON) + config_data["calibration_files"] = { + int(k): v for k, v in config_data["calibration_files"].items() + } + else: + app_logger.warning("Config file's 'calibration_files' is not a dictionary, ignoring it.") + config_data["calibration_files"] = {} # Reset to empty dict if malformed + + config_data["calibration_files"].update(cli_calibration_paths) + + # Set enable_undistortion flag. CLI argument takes precedence. + if enable_undistortion: + config_data["enable_undistortion"] = True + elif "enable_undistortion" not in config_data: + config_data["enable_undistortion"] = False # Default to False if not in config or CLI + + # --- End NEW: Handle calibration files and undistortion setting --- + + + # Import here to avoid circular imports and ensure ServerConfig is fully loaded + from . import launch + from .core.config import ServerConfig # Import ServerConfig here + + # Populate ServerConfig with the resolved values + ServerConfig.VISION_API_ENABLED = config_data.get("vision_api_enabled", ServerConfig.VISION_API_ENABLED) + ServerConfig.ENABLE_UNDISTORTION = config_data.get("enable_undistortion", False) # Ensure default is False + ServerConfig.CALIBRATION_FILES = config_data["calibration_files"] # Set the populated dict + + # Also ensure streams and names are set for ServerConfig.get_enabled_streams() + ServerConfig._config_data["rtsp_streams"] = config_data["rtsp_streams"] + ServerConfig._config_data["camera_names"] = config_data["camera_names"] - # Import here to avoid circular imports - from . import launch # noqa: PLC0415 # Display configuration table = Table(title="Configuration", show_header=False) @@ -80,6 +168,14 @@ def run(streams, config, tracker, merger, port, host, share, no_browser, vision_ table.add_row("Server", f"{config_data.get('server_name', host)}:{config_data.get('server_port', port)}") table.add_row("Share", "Yes" if config_data.get("share", share) else "No") table.add_row("Streams", str(len(config_data.get("rtsp_streams", [])))) + table.add_row("Enable Undistortion", "Yes" if ServerConfig.ENABLE_UNDISTORTION else "No") # Display new setting + + # Display calibration files if any + if ServerConfig.CALIBRATION_FILES: + cal_files_str = "\n".join([f" {k}: {v}" for k, v in ServerConfig.CALIBRATION_FILES.items()]) + table.add_row("Calibration Files", cal_files_str) + else: + table.add_row("Calibration Files", "None specified") console.print(table) console.print() @@ -91,7 +187,9 @@ def run(streams, config, tracker, merger, port, host, share, no_browser, vision_ console.print() try: - # Launch TrackStudio + # Launch TrackStudio. The 'launch' function should then read from ServerConfig. + # No need to pass calibration_file or rtsp_streams directly here anymore, + # as ServerConfig should hold the definitive truth. 
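
An example invocation wiring two calibrated RTSP cameras through the new options; the stream URLs and file paths are placeholders:

```bash
trackstudio run \
    --streams rtsp://192.168.1.50:554/cam rtsp://192.168.1.51:554/cam \
    -cf 0=calib/cam0_intrinsics.json \
    -cf 1=calib/cam1_intrinsics.json \
    --enable-undistortion
```

CLI values take precedence; the same ID-to-path mapping can instead live under `calibration_files` in a JSON config.
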
app = launch( tracker=config_data.get("tracker", tracker), merger=config_data.get("merger", merger), @@ -100,8 +198,9 @@ def run(streams, config, tracker, merger, port, host, share, no_browser, vision_ server_port=config_data.get("server_port", port), share=config_data.get("share", share), open_browser=config_data.get("open_browser", not no_browser), - calibration_file=config_data.get("calibration_file", calibration_file), - rtsp_streams=config_data.get("rtsp_streams", streams), + # calibration_file=config_data.get("calibration_file", calibration_file), # REMOVED + # rtsp_streams=config_data.get("rtsp_streams", streams), # REMOVED + # The launch function should implicitly use ServerConfig now ) # Keep running until interrupted @@ -113,8 +212,7 @@ def run(streams, config, tracker, merger, port, host, share, no_browser, vision_ except Exception as e: console.print(f"\n[red]Error: {e}[/red]") if debug: - import traceback # noqa: PLC0415 - + import traceback traceback.print_exc() sys.exit(1) @@ -122,7 +220,7 @@ def run(streams, config, tracker, merger, port, host, share, no_browser, vision_ @cli.command() def demo(): """Run TrackStudio with demo configuration""" - from . import demo as run_demo # noqa: PLC0415 + from . import demo as run_demo console.print( Panel.fit( @@ -137,7 +235,7 @@ def demo(): @cli.command() def list(): """List available trackers and mergers""" - from . import list_mergers, list_trackers # noqa: PLC0415 + from . import list_mergers, list_trackers # Create trackers table trackers_table = Table(title="Available Trackers") @@ -170,10 +268,36 @@ def list(): @click.argument("stream_urls", nargs=-1, required=True) @click.option("--output", "-o", default="config.json", help="Output configuration file") @click.option("--names", "-n", multiple=True, help="Camera names (same order as streams)") -def config(stream_urls, output, names): +@click.option( + "--calibration-files", + "-cf", + multiple=True, + type=str, + metavar="ID=PATH", + help="Calibration data files to include in the config (e.g., -cf 0=cam0.json -cf 1=cam1.json)." +) +@click.option( + "--enable-undistortion", + "-eu", + is_flag=True, + help="Include enable_undistortion flag in the config file." +) +def config(stream_urls, output, names, calibration_files, enable_undistortion): """Generate configuration file""" - # Create config + # Process CLI calibration files for config generation + config_cal_files = {} + if calibration_files: + for item in calibration_files: + try: + stream_id_str, file_path = item.split('=', 1) + stream_id = int(stream_id_str) + config_cal_files[str(stream_id)] = file_path # Store as string keys for JSON + except ValueError: + console.print(f"[red]Error:[/red] Invalid format for --calibration-files: '{item}'. 
Expected 'ID=PATH'.") + sys.exit(1) + + # Create config data config_data = { "rtsp_streams": list(stream_urls), "camera_names": list(names) if names else [f"Camera {i}" for i in range(len(stream_urls))], @@ -182,16 +306,21 @@ def config(stream_urls, output, names): "vision_fps": 10.0, "server_port": 8000, "server_name": "127.0.0.1", + "calibration_files": config_cal_files, # Add calibration files to config + "enable_undistortion": enable_undistortion # Add enable_undistortion to config } # Write config - with Path(output).open("w") as f: - json.dump(config_data, f, indent=2) - - console.print(f"[green]βœ“[/green] Configuration saved to {output}") - console.print("\nGenerated configuration:") - console.print(json.dumps(config_data, indent=2)) - console.print(f"\nRun with: [cyan]trackstudio run --config {output}[/cyan]") + try: + with Path(output).open("w") as f: + json.dump(config_data, f, indent=2) + console.print(f"[green]βœ“[/green] Configuration saved to {output}") + console.print("\nGenerated configuration:") + console.print(json.dumps(config_data, indent=2)) + console.print(f"\nRun with: [cyan]trackstudio run --config {output}[/cyan]") + except Exception as e: + console.print(f"[red]Error:[/red] Failed to write config file {output}: {e}") + sys.exit(1) def main(): @@ -200,4 +329,4 @@ def main(): if __name__ == "__main__": - main() + main() \ No newline at end of file diff --git a/trackstudio/core/config.py b/trackstudio/core/config.py index e0b0f82..d49f3ff 100644 --- a/trackstudio/core/config.py +++ b/trackstudio/core/config.py @@ -3,192 +3,352 @@ """ import os -from typing import Any +import json # Added import for json +import logging # Added import for logging +from pathlib import Path +from typing import List, Dict, Any, Tuple, Optional -class ServerConfig: - """Server-specific configuration""" +# Initialize logger for this module +logger = logging.getLogger(__name__) +class ServerConfig: + """ + Server-specific configuration settings for TrackStudio. + Configuration can be sourced from environment variables, defaults, + and a loaded JSON configuration file. CLI arguments will then override these. 
+ """ + # Server Settings - use SERVER_IP environment variable SERVER_IP = os.getenv("SERVER_IP", "localhost") SERVER_NAME = os.getenv("SERVER_NAME", "localhost") SERVER_PORT = int(os.getenv("SERVER_PORT", "8002")) SERVER_RELOAD = os.getenv("SERVER_RELOAD", "true").lower() == "true" - + # CORS Settings - include both localhost and SERVER_IP CORS_ORIGINS = [ "http://localhost:5174", # Vite dev server "http://localhost:3000", # Alternative dev server f"http://{SERVER_IP}:3000", f"http://{SERVER_IP}:5173", - f"http://{SERVER_IP}:5174", + f"http://{SERVER_IP}:5174" ] - + # Add localhost variants if SERVER_IP is different if SERVER_IP != "localhost": - CORS_ORIGINS.extend(["http://127.0.0.1:3000", "http://127.0.0.1:5173", "http://127.0.0.1:5174"]) - + CORS_ORIGINS.extend([ + "http://127.0.0.1:3000", + "http://127.0.0.1:5173", + "http://127.0.0.1:5174" + ]) + # WebRTC Settings - STUN_SERVERS = ["stun:stun.l.google.com:19302", "stun:stun1.l.google.com:19302", "stun:stun2.l.google.com:19302"] - + STUN_SERVERS = [ + "stun:stun.l.google.com:19302", + "stun:stun1.l.google.com:19302", + "stun:stun2.l.google.com:19302" + ] + # WebRTC Connection Timeouts (prevent ICE transaction issues) WEBRTC_TIMEOUTS = { "answer_timeout": 20.0, # Increased from 15s for multi-stream - "ice_timeout": 30.0, # ICE gathering timeout + "ice_timeout": 30.0, # ICE gathering timeout "connection_timeout": 45.0, # Overall connection timeout - "close_timeout": 5.0, # Peer connection close timeout + "close_timeout": 5.0 # Peer connection close timeout } - + # Integration with Vision Package - use SERVER_IP for stream URLs VISION_API_ENABLED = True STREAM_SERVER_URL = f"rtmp://{SERVER_IP}:1936" # Default RTMP server for backward compatibility STREAM_STAT_URL = f"http://{SERVER_IP}:8085/stat" # Stream statistics URL - + # Camera Resolution Configuration CAMERA_RESOLUTION = { "individual_width": int(os.getenv("CAMERA_WIDTH", "720")), "individual_height": int(os.getenv("CAMERA_HEIGHT", "480")), "combined_width": int(os.getenv("COMBINED_WIDTH", "1440")), "combined_height": int(os.getenv("COMBINED_HEIGHT", "480")), - "fps": int(os.getenv("CAMERA_FPS", "15")), + "fps": int(os.getenv("CAMERA_FPS", "15")) } - - # BEV Coordinate System Configuration + + # BEV Coordinate System Configuration # Adjust these to match your real-world calibration scale BEV_CONFIG = { - "calibration_canvas_size": int(os.getenv("BEV_CANVAS_SIZE", "600")), # Calibration canvas size (pixels) - "real_world_area_meters": float(os.getenv("BEV_AREA_METERS", "12.0")), # Real world area represented (meters) - "max_coordinate_range": float(os.getenv("BEV_MAX_RANGE", "10.0")), # Maximum coordinate range (Β±meters) + "calibration_canvas_size": int(os.getenv("BEV_CANVAS_SIZE", "600")), # Calibration canvas size (pixels) + "real_world_area_meters": float(os.getenv("BEV_AREA_METERS", "12.0")), # Real world area represented (meters) + "max_coordinate_range": float(os.getenv("BEV_MAX_RANGE", "10.0")) # Maximum coordinate range (Β±meters) } - + # Stream Configuration (Configurable RTMP/RTSP streams, max 4) STREAM_CONFIG = { "max_streams": 4, - "active_streams": int(os.getenv("NUM_STREAMS", "2")), # Number of active streams (1-4) - "layout_mode": os.getenv("LAYOUT_MODE", "auto"), # auto, grid, horizontal, vertical + "active_streams": int(os.getenv("NUM_STREAMS", "2")), # Number of active streams (1-4) + "layout_mode": os.getenv("LAYOUT_MODE", "auto"), # auto, grid, horizontal, vertical } - + # Stream Sources Configuration (RTMP/RTSP with codec specification) STREAM_SOURCES = [ { 
"id": 0, "name": os.getenv("STREAM_0_NAME", "Stream 0"), - "type": os.getenv("STREAM_0_TYPE", "rtmp"), # rtmp or rtsp + "type": os.getenv("STREAM_0_TYPE", "rtmp"), # rtmp or rtsp "url": os.getenv("STREAM_0_URL", f"{STREAM_SERVER_URL}/live/stream0"), - "codec": os.getenv("STREAM_0_CODEC", "h264"), # h264, h265, mjpeg, auto + "codec": os.getenv("STREAM_0_CODEC", "h264"), # h264, h265, mjpeg, auto "enabled": os.getenv("STREAM_0_ENABLED", "true").lower() == "true", - "position": {"x": 0, "y": 0}, # Grid position for layout + "position": {"x": 0, "y": 0} # Grid position for layout }, { "id": 1, - "name": os.getenv("STREAM_1_NAME", "Stream 1"), - "type": os.getenv("STREAM_1_TYPE", "rtmp"), # rtmp or rtsp + "name": os.getenv("STREAM_1_NAME", "Stream 1"), + "type": os.getenv("STREAM_1_TYPE", "rtmp"), # rtmp or rtsp "url": os.getenv("STREAM_1_URL", f"{STREAM_SERVER_URL}/live/stream1"), - "codec": os.getenv("STREAM_1_CODEC", "h264"), # h264, h265, mjpeg, auto + "codec": os.getenv("STREAM_1_CODEC", "h264"), # h264, h265, mjpeg, auto "enabled": os.getenv("STREAM_1_ENABLED", "true").lower() == "true", - "position": {"x": 1, "y": 0}, # Grid position for layout + "position": {"x": 1, "y": 0} # Grid position for layout }, { "id": 2, "name": os.getenv("STREAM_2_NAME", "Stream 2"), - "type": os.getenv("STREAM_2_TYPE", "rtsp"), # rtmp or rtsp + "type": os.getenv("STREAM_2_TYPE", "rtsp"), # rtmp or rtsp "url": os.getenv("STREAM_2_URL", "rtsp://192.168.1.100:554/stream"), - "codec": os.getenv("STREAM_2_CODEC", "h264"), # h264, h265, mjpeg, auto + "codec": os.getenv("STREAM_2_CODEC", "h264"), # h264, h265, mjpeg, auto "enabled": os.getenv("STREAM_2_ENABLED", "false").lower() == "true", - "position": {"x": 0, "y": 1}, # Grid position for layout + "position": {"x": 0, "y": 1} # Grid position for layout }, { "id": 3, "name": os.getenv("STREAM_3_NAME", "Stream 3"), - "type": os.getenv("STREAM_3_TYPE", "rtsp"), # rtmp or rtsp + "type": os.getenv("STREAM_3_TYPE", "rtsp"), # rtmp or rtsp "url": os.getenv("STREAM_3_URL", "rtsp://192.168.1.101:554/stream"), - "codec": os.getenv("STREAM_3_CODEC", "h264"), # h264, h265, mjpeg, auto + "codec": os.getenv("STREAM_3_CODEC", "h264"), # h264, h265, mjpeg, auto "enabled": os.getenv("STREAM_3_ENABLED", "false").lower() == "true", - "position": {"x": 1, "y": 1}, # Grid position for layout - }, + "position": {"x": 1, "y": 1} # Grid position for layout + } ] - # Helper methods for stream configuration + # Undistortion settings - now configurable via config file or CLI + # This default will be overridden by load_config_from_file and then by CLI + ENABLE_UNDISTORTION = os.getenv("ENABLE_UNDISTORTION", "false").lower() == "true" # Default to false unless explicitly enabled + + # NEW: Dictionary to store paths to calibration files, keyed by stream ID + CALIBRATION_FILES: Dict[int, str] = {} # This will be populated by load_config_from_file or directly by cli.py + + # Internal dictionary to hold raw config data loaded from a JSON file + _config_data: Dict[str, Any] = {} + @classmethod - def get_enabled_streams(cls) -> list[dict[str, Any]]: - """Get list of enabled streams""" - enabled = [stream for stream in cls.STREAM_SOURCES if stream["enabled"]] - # Limit to active_streams count - return enabled[: cls.STREAM_CONFIG["active_streams"]] + def load_config_from_file(cls, file_path: str): + """ + Loads configuration from a JSON file into _config_data and applies + relevant settings to class attributes. 
+ """ + try: + with Path(file_path).open('r') as f: + loaded_data = json.load(f) + cls._config_data.update(loaded_data) # Update internal config data + logger.info(f"Loaded server configuration from {file_path}") + cls._apply_config_data() + except FileNotFoundError: + logger.warning(f"Config file not found at {file_path}. Using environment variables and defaults.") + except json.JSONDecodeError: + logger.error(f"Invalid JSON in config file: {file_path}. Please check its format.") + except Exception as e: + logger.error(f"Error loading config file {file_path}: {e}") @classmethod - def get_active_stream_count(cls) -> int: - """Get number of active streams""" - return min(cls.STREAM_CONFIG["active_streams"], len([s for s in cls.STREAM_SOURCES if s["enabled"]])) + def _apply_config_data(cls): + """ + Applies loaded raw config data from _config_data to relevant class attributes. + This method is typically called internally after loading a config file. + """ + # --- Apply general server settings from config file if present --- + cls.SERVER_IP = cls._config_data.get("server_ip", cls.SERVER_IP) + cls.SERVER_NAME = cls._config_data.get("server_name", cls.SERVER_NAME) + cls.SERVER_PORT = cls._config_data.get("server_port", cls.SERVER_PORT) + # Convert reload string to boolean + if "server_reload" in cls._config_data: + cls.SERVER_RELOAD = str(cls._config_data["server_reload"]).lower() == "true" + + # --- Apply Stream Sources from config file if present --- + # This allows defining streams directly in the config JSON + if "stream_sources" in cls._config_data and isinstance(cls._config_data["stream_sources"], list): + # Clear existing default streams and load from config + cls.STREAM_SOURCES = [] + for stream_entry in cls._config_data["stream_sources"]: + # Ensure 'enabled' is boolean, and 'id' is int + if isinstance(stream_entry, dict) and 'id' in stream_entry and 'url' in stream_entry: + stream_entry['enabled'] = str(stream_entry.get('enabled', 'true')).lower() == 'true' + stream_entry['id'] = int(stream_entry['id']) + cls.STREAM_SOURCES.append(stream_entry) + else: + logger.warning(f"Malformed stream entry in config: {stream_entry}. 
Skipping.") + logger.info(f"Loaded {len(cls.STREAM_SOURCES)} stream sources from config file.") + + # --- Apply Camera Resolution from config file if present --- + if "camera_resolution" in cls._config_data and isinstance(cls._config_data["camera_resolution"], dict): + cls.CAMERA_RESOLUTION.update(cls._config_data["camera_resolution"]) + logger.info("Loaded camera resolution from config file.") + + # --- Apply BEV config from config file if present --- + if "bev_config" in cls._config_data and isinstance(cls._config_data["bev_config"], dict): + cls.BEV_CONFIG.update(cls._config_data["bev_config"]) + logger.info("Loaded BEV configuration from config file.") + + # --- Apply Stream Config (max_streams, active_streams, layout_mode) from config file if present --- + if "stream_config" in cls._config_data and isinstance(cls._config_data["stream_config"], dict): + cls.STREAM_CONFIG.update(cls._config_data["stream_config"]) + # Ensure active_streams is an int and within bounds + if isinstance(cls.STREAM_CONFIG.get("active_streams"), (str, int)): + cls.STREAM_CONFIG["active_streams"] = min(int(cls.STREAM_CONFIG["active_streams"]), cls.STREAM_CONFIG["max_streams"]) + logger.info("Loaded stream configuration (max_streams, active_streams, layout_mode) from config file.") + + # --- NEW: Apply ENABLE_UNDISTORTION from config file --- + if "enable_undistortion" in cls._config_data: + cls.ENABLE_UNDISTORTION = str(cls._config_data["enable_undistortion"]).lower() == "true" + logger.info(f"ENABLE_UNDISTORTION set to {cls.ENABLE_UNDISTORTION} from config.") + + # --- NEW: Apply CALIBRATION_FILES from config file --- + if "calibration_files" in cls._config_data and isinstance(cls._config_data["calibration_files"], dict): + # Convert keys to integers as stream IDs are integers + cls.CALIBRATION_FILES = { + int(k): v for k, v in cls._config_data["calibration_files"].items() + } + logger.info(f"CALIBRATION_FILES loaded from config: {cls.CALIBRATION_FILES}") + else: + cls.CALIBRATION_FILES = {} # Ensure it's empty if not found or malformed + if "calibration_files" in cls._config_data: + logger.warning("Config file's 'calibration_files' is not a dictionary or malformed, ignoring it.") + + # --- For backward compatibility with simpler config structures: --- + # If 'rtsp_streams' and 'camera_names' are in the main _config_data, + # ensure they are handled by get_enabled_streams. + # These are used as a fallback if STREAM_SOURCES is not explicitly defined in the config. + # The cli.py will set these directly if --streams is used. + # No explicit processing needed here for _config_data["rtsp_streams"] and _config_data["camera_names"] + # as get_enabled_streams accesses them directly from _config_data. + + + # Helper methods for stream configuration @classmethod - def get_stream_by_id(cls, stream_id: int) -> dict[str, Any]: - """Get stream configuration by ID""" + def get_enabled_streams(cls) -> List[Dict[str, Any]]: + """ + Get list of enabled streams based on STREAM_SOURCES. + Prioritizes streams from STREAM_SOURCES with 'enabled: true'. + If STREAM_SOURCES is not configured, it tries to use 'rtsp_streams' and 'camera_names' from _config_data. + """ + # If STREAM_SOURCES were loaded or are defined, use them as primary source + if cls.STREAM_SOURCES: + enabled_from_sources = [stream for stream in cls.STREAM_SOURCES if stream.get("enabled", False)] + # Limit to active_streams count, ensure order if position is used + # For simplicity, we just limit by count, not specific positions here. 
+ return enabled_from_sources[:cls.STREAM_CONFIG["active_streams"]] + + # Fallback for older config styles that use "rtsp_streams" and "camera_names" + # This branch is primarily for when STREAM_SOURCES is not defined in the config file. + streams = [] + rtsp_urls = cls._config_data.get("rtsp_streams") + camera_names = cls._config_data.get("camera_names") + + if rtsp_urls and isinstance(rtsp_urls, list): + for i, url in enumerate(rtsp_urls): + stream_name = f"Camera {i}" + if camera_names and i < len(camera_names) and isinstance(camera_names[i], str): + stream_name = camera_names[i] + + # Check if stream is implicitly enabled (e.g., within active_streams limit) + # For this fallback, all streams in rtsp_urls are considered "enabled" up to active_streams limit + if i < cls.STREAM_CONFIG["active_streams"]: + streams.append({ + "id": i, + "name": stream_name, + "type": "rtsp", # Assume RTSP for this fallback structure + "url": url, + "codec": "h264", # Assume H264 for this fallback + "enabled": True, + "position": {"x": i % 2, "y": i // 2} # Simple grid position + }) + return streams + + @classmethod + def get_active_stream_count(cls) -> int: + """Get number of active streams configured to be used.""" + return len(cls.get_enabled_streams()) # Calculated from the result of get_enabled_streams + + @classmethod + def get_stream_by_id(cls, stream_id: int) -> Optional[Dict[str, Any]]: + """Get stream configuration by ID from STREAM_SOURCES.""" for stream in cls.STREAM_SOURCES: if stream["id"] == stream_id: return stream - raise ValueError(f"Stream {stream_id} not found in configuration") - + logger.warning(f"Stream {stream_id} not found in STREAM_SOURCES configuration.") + return None + # Legacy camera compatibility (for backward compatibility) + # This builds a list of "cameras" from the enabled streams. @classmethod - def get_default_cameras(cls) -> list[dict[str, Any]]: - """Build legacy camera list from stream sources""" + def get_default_cameras(cls) -> List[Dict[str, Any]]: + """Build legacy camera list from enabled stream sources for backward compatibility.""" return [ { "id": stream["id"], "name": stream["name"], - "stream_url": stream["url"], + "stream_url": stream["url"], "enabled": stream["enabled"], "resolution": { "width": cls.CAMERA_RESOLUTION["individual_width"], "height": cls.CAMERA_RESOLUTION["individual_height"], - "fps": cls.CAMERA_RESOLUTION["fps"], - }, - } - for stream in cls.get_enabled_streams() + "fps": cls.CAMERA_RESOLUTION["fps"] + } + } for stream in cls.get_enabled_streams() # Uses the modern get_enabled_streams now ] + + # Initialize DEFAULT_CAMERAS dynamically when the class is loaded for first time. + # This ensures it's always up-to-date with initial config. 
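
A small sketch of consuming the resolved stream configuration, assuming `ServerConfig` was populated by the CLI or a config file beforehand:

```python
from trackstudio.core.config import ServerConfig

ServerConfig.load_config_from_file("config.json")

# STREAM_SOURCES entries win when present; otherwise the rtsp_streams/
# camera_names fallback synthesizes equivalent stream dicts.
for stream in ServerConfig.get_enabled_streams():
    print(f"stream {stream['id']}: {stream['name']} -> {stream['url']} ({stream['type']})")

print("active streams:", ServerConfig.get_active_stream_count())
```
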
+ DEFAULT_CAMERAS: List[Dict[str, Any]] = [] # Placeholder - # Initialize DEFAULT_CAMERAS dynamically for backward compatibility - DEFAULT_CAMERAS = [] # Will be populated dynamically - + @classmethod def _update_default_cameras(cls): - """Update DEFAULT_CAMERAS list dynamically""" + """Force update of DEFAULT_CAMERAS list dynamically.""" cls.DEFAULT_CAMERAS = cls.get_default_cameras() - + @classmethod - def get_camera_config(cls, camera_id: int) -> dict[str, Any]: - """Get configuration for a specific camera""" - # Update DEFAULT_CAMERAS if needed + def get_camera_config(cls, camera_id: int) -> Dict[str, Any]: + """Get configuration for a specific camera from the DEFAULT_CAMERAS list.""" + # Ensure DEFAULT_CAMERAS is populated (e.g., after loading config) if not cls.DEFAULT_CAMERAS: cls._update_default_cameras() - + for camera in cls.DEFAULT_CAMERAS: if camera["id"] == camera_id: return camera - raise ValueError(f"Camera {camera_id} not found in configuration") - + raise ValueError(f"Camera {camera_id} not found in configuration's DEFAULT_CAMERAS.") + @classmethod - def get_camera_resolution(cls) -> dict[str, Any]: - """Get camera resolution configuration""" + def get_camera_resolution(cls) -> Dict[str, Any]: + """Get camera resolution configuration.""" return cls.CAMERA_RESOLUTION.copy() - + @classmethod - def get_combined_resolution(cls) -> tuple[int, int]: - """Get combined stream resolution (width, height)""" + def get_combined_resolution(cls) -> Tuple[int, int]: + """Get combined stream resolution (width, height).""" return (cls.CAMERA_RESOLUTION["combined_width"], cls.CAMERA_RESOLUTION["combined_height"]) - + @classmethod - def get_individual_resolution(cls) -> tuple[int, int]: - """Get individual camera resolution (width, height)""" + def get_individual_resolution(cls) -> Tuple[int, int]: + """Get individual camera resolution (width, height).""" return (cls.CAMERA_RESOLUTION["individual_width"], cls.CAMERA_RESOLUTION["individual_height"]) - + @classmethod def get_stream_url(cls, stream_name: str) -> str: - """Get stream URL for a specific stream (RTMP by default)""" + """Get stream URL for a specific stream (RTMP by default for legacy).""" return f"{cls.STREAM_SERVER_URL}/live/{stream_name}" - - @classmethod + + @classmethod def get_rtsp_url(cls, stream_name: str) -> str: - """Get RTSP URL for a specific stream (legacy method for backward compatibility)""" - return cls.get_stream_url(stream_name) + """Get RTSP URL for a specific stream (legacy method for backward compatibility).""" + # This is a legacy method. Ideally, streams should be configured directly in STREAM_SOURCES + # with their specific URL and type. + logger.warning("Using legacy get_rtsp_url. 
Configure streams directly in STREAM_SOURCES for better control.") + return cls.get_stream_url(stream_name) \ No newline at end of file diff --git a/trackstudio/core/stream_combiner.py b/trackstudio/core/stream_combiner.py index d3d4b56..4af9770 100644 --- a/trackstudio/core/stream_combiner.py +++ b/trackstudio/core/stream_combiner.py @@ -6,97 +6,201 @@ """ import asyncio -import contextlib import logging +import cv2 +import numpy as np +import os import time +import contextlib # Import contextlib for suppress +from typing import Optional, Dict, List, Deque, Any from collections import deque from fractions import Fraction -from typing import Optional -import cv2 -import numpy as np +# Assuming these are in your project's structure relative to this file from aiortc import VideoStreamTrack -from .config import ServerConfig -from .vision_api import VisionResult +from .vision_api import VisionResult, get_vision_api +from .config import ServerConfig # Import ServerConfig logger = logging.getLogger(__name__) -# Vision API will be set by TrackStudioApp -vision_api = None - +# Global VisionAPI instance, set externally via StreamCombinerManager.set_vision_api +vision_api = None class StreamFrame: """Container for a frame with its capture timestamp and stream ID""" - def __init__(self, frame: np.ndarray, timestamp: float, stream_id: int): self.frame = frame self.timestamp = timestamp self.stream_id = stream_id - class StreamCombinerTrack(VideoStreamTrack): """VideoTrack that captures individual RTMP/RTSP streams and combines them with manual delays""" - + def __init__(self): super().__init__() - + # Individual stream captures self.stream_caps = {} # {stream_id: cv2.VideoCapture} self.is_running = False - + # Initialize timing attributes required by aiortc self._timestamp = 0 self._start_time = None - + # Simple vision processing flag self.vision_processing = False - - # Get active streams from config + + # Get active streams from config - these are set via ServerConfig from cli.py self.enabled_streams = ServerConfig.get_enabled_streams() self.active_stream_ids = [stream["id"] for stream in self.enabled_streams] - + # Time-shift delay buffer system (smooth delayed video) - dynamic streams self.stream_delays = dict.fromkeys(self.active_stream_ids, 0) self.frame_buffers = {stream_id: deque() for stream_id in self.active_stream_ids} self.last_frames = dict.fromkeys(self.active_stream_ids) - + # Stream status tracking self.stream_status = dict.fromkeys(self.active_stream_ids, "initializing") # initializing, ready, error - self.stream_init_timeout = { - stream_id: time.time() + 30.0 for stream_id in self.active_stream_ids - } # 30 second timeout - + self.stream_init_timeout = {stream_id: time.time() + 30.0 for stream_id in self.active_stream_ids} # 30 second timeout + # Frame read statistics for debugging flickering - self.frame_read_stats = { - stream_id: {"success": 0, "failures": 0, "last_failure_time": 0} for stream_id in self.active_stream_ids - } + self.frame_read_stats = {stream_id: {"success": 0, "failures": 0, "last_failure_time": 0} for stream_id in self.active_stream_ids} self.stats_report_interval = 100 # Report every 100 frames self.frame_counter_for_stats = 0 - + logger.info(f"πŸ“Œ Initialized per-stream delays: {self.stream_delays}") - + # FPS calculation self.fps = 30 # target FPS self.frame_count = 0 self.last_fps_time = time.time() self.measured_fps = 0.0 - + # Vision processor result - self.latest_vision_result: VisionResult | None = None - - # Background vision processing task - + 
self.latest_vision_result: Optional['VisionResult'] = None + logger.info("🎬 Video track created - streams will be initialized on-demand") - + # Vision tracking is now opt-in - must be explicitly enabled via API logger.info("🧠 Vision tracking is disabled by default - enable via /api/vision/enable if needed") + # Add undistortion maps storage + self.undistort_maps = {} # {stream_id: (map1, map2)} + self.new_camera_matrices = {} # Store the optimized K matrices + + # Initialize undistortion after other init + logger.info("πŸ”§ Initializing undistortion maps...") + self._initialize_undistortion() + + def _initialize_undistortion(self): + """Initialize undistortion maps for each camera""" + try: + # Check ServerConfig to see if undistortion is enabled + if not ServerConfig.ENABLE_UNDISTORTION: + logger.info("ℹ️ Undistortion is disabled in ServerConfig.") + return + + # Import CameraCalibration from the appropriate path (e.g., .calibration if in same package) + from ..calibration import CameraCalibration + + # Initialize CameraCalibration. It will automatically try to load from ServerConfig.CALIBRATION_FILES + # if no specific paths are provided here, which is what we want with CLI integration. + calibration = CameraCalibration() + + for stream_id in self.active_stream_ids: + K, D, original_size, new_K_from_file = calibration.load_intrinsic_calibration(stream_id) + if K is not None and D is not None: + # Create undistortion maps at ORIGINAL resolution + orig_w, orig_h = original_size # e.g., 2560x1440 + + # Check distortion model based on coefficient count + if len(D) == 4: # Fisheye has 4 distortion coefficients (k1, k2, k3, k4) + logger.info(f"🐟 Using fisheye undistortion for stream {stream_id}") + # Fisheye undistortion + if new_K_from_file is not None: + new_K = new_K_from_file + else: + # If new_K is not in the calibration file, estimate it + new_K = cv2.fisheye.estimateNewCameraMatrixForUndistortRectify( + K, D, (orig_w, orig_h), np.eye(3), balance=0.0 # balance=0.0 means no black borders + ) + map1, map2 = cv2.fisheye.initUndistortRectifyMap( + K, D, np.eye(3), new_K, (orig_w, orig_h), cv2.CV_16SC2 + ) + else: + # For rational model (8 or 14 coefficients) or standard (5 coefficients) + logger.info(f"πŸ“· Using standard undistortion for stream {stream_id} ({len(D)} coefficients)") + + # USE THE PRE-COMPUTED new_K IF AVAILABLE + if new_K_from_file is not None: + new_K = new_K_from_file + logger.info(f"βœ… Using pre-computed optimal camera matrix from calibration (alpha=0)") + else: + # Calculate it if not provided (fallback) + # cv2.getOptimalNewCameraMatrix can handle D with more than 5 coefficients + new_K, roi = cv2.getOptimalNewCameraMatrix( + K, D, (orig_w, orig_h), 0.0, (orig_w, orig_h) # alpha=0.0 means minimal black regions + ) + logger.info(f"πŸ“Š Calculated optimal camera matrix with alpha=0.0") + + # Create undistortion maps - use all coefficients for initUndistortRectifyMap + map1, map2 = cv2.initUndistortRectifyMap( + K, D, None, new_K, (orig_w, orig_h), cv2.CV_16SC2 + ) + + self.undistort_maps[stream_id] = (map1, map2) + self.new_camera_matrices[stream_id] = new_K + + logger.info(f"βœ… Initialized undistortion for stream {stream_id} at {orig_w}x{orig_h}") + logger.info(f" Using new camera matrix: {new_K is not None}") + + except Exception as e: + logger.error(f"Error initializing undistortion: {e}") + import traceback + logger.error(traceback.format_exc()) + + def _undistort_frame(self, frame: np.ndarray, stream_id: int) -> np.ndarray: + """Apply undistortion to a frame if 
+    def _undistort_frame(self, frame: np.ndarray, stream_id: int) -> np.ndarray:
+        """Apply undistortion to a frame if calibration is available and enabled."""
+        # Only proceed if undistortion is enabled in ServerConfig
+        if not ServerConfig.ENABLE_UNDISTORTION or stream_id not in self.undistort_maps:
+            return frame  # Return original frame if disabled or no maps for this stream
+
+        try:
+            map1, map2 = self.undistort_maps[stream_id]
+
+            # Get current frame dimensions
+            frame_h, frame_w = frame.shape[:2]
+
+            # Calibration resolution is hardcoded to 2560x1440 for consistency
+            calib_w, calib_h = 2560, 1440
+
+            # Always resize to calibration resolution before undistorting
+            # This ensures consistent results regardless of input resolution
+            if (frame_w, frame_h) != (calib_w, calib_h):
+                frame_full = cv2.resize(frame, (calib_w, calib_h), interpolation=cv2.INTER_LINEAR)
+            else:
+                frame_full = frame
+
+            # Apply undistortion at full resolution
+            undistorted_full = cv2.remap(frame_full, map1, map2, cv2.INTER_LINEAR)
+
+            # Now resize to target resolution for display (720x480)
+            undistorted_small = cv2.resize(undistorted_full, (720, 480), interpolation=cv2.INTER_LINEAR)
+
+            return undistorted_small
+
+        except Exception as e:
+            logger.error(f"Error undistorting frame for stream {stream_id}: {e}")
+            import traceback
+            logger.error(traceback.format_exc())
+            return frame  # Return original frame on error
+
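The maps only make sense at the resolution they were built for, which is why `_undistort_frame` normalizes every frame to the calibration resolution before `cv2.remap` and only then shrinks to the display size. The same pipeline isolated as a sketch (assumes `map1`/`map2` built as in the snippet above; the sizes mirror the hardcoded 2560x1440 and 720x480):

def undistort_at_calib_res(frame, map1, map2,
                           calib_size=(2560, 1440), out_size=(720, 480)):
    """Resize to the resolution the maps were built for, remap, then shrink."""
    if (frame.shape[1], frame.shape[0]) != calib_size:
        frame = cv2.resize(frame, calib_size, interpolation=cv2.INTER_LINEAR)
    undistorted = cv2.remap(frame, map1, map2, cv2.INTER_LINEAR)
    return cv2.resize(undistorted, out_size, interpolation=cv2.INTER_LINEAR)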
     def _create_status_frame(self, width: int, height: int, stream_id: int, status: str) -> np.ndarray:
         """Create a status frame with text overlay"""
         # Create black frame
         frame = np.zeros((height, width, 3), dtype=np.uint8)
 
         # Add some color for visual feedback
         if status == "initializing":
             # Blue tint for initializing
@@ -127,77 +231,63 @@ def _create_status_frame(self, width: int, height: int, stream_id: int, status:
         else:
             text = f"Stream {stream_id} - {status}"
             color = (255, 255, 255)  # White text
 
         try:
-            import cv2  # noqa: PLC0415
-
             # Add main text
             font = cv2.FONT_HERSHEY_SIMPLEX
             font_scale = 0.8
             thickness = 2
 
             # Get text size for centering
             (text_width, text_height), baseline = cv2.getTextSize(text, font, font_scale, thickness)
             text_x = (width - text_width) // 2
             text_y = (height + text_height) // 2
 
             # Add text with outline for better visibility
-            cv2.putText(
-                frame, text, (text_x - 1, text_y - 1), font, font_scale, (0, 0, 0), thickness + 1
-            )  # Black outline
+            cv2.putText(frame, text, (text_x-1, text_y-1), font, font_scale, (0, 0, 0), thickness+1)  # Black outline
             cv2.putText(frame, text, (text_x, text_y), font, font_scale, color, thickness)
 
             # Add smaller status text below
             status_text = f"Status: {status.title()}"
             font_scale_small = 0.5
             (status_width, status_height), _ = cv2.getTextSize(status_text, font, font_scale_small, 1)
             status_x = (width - status_width) // 2
             status_y = text_y + 40
 
-            cv2.putText(
-                frame, status_text, (status_x - 1, status_y - 1), font, font_scale_small, (0, 0, 0), 2
-            )  # Black outline
-            cv2.putText(
-                frame, status_text, (status_x, status_y), font, font_scale_small, (200, 200, 200), 1
-            )  # Light gray text
-
+            cv2.putText(frame, status_text, (status_x-1, status_y-1), font, font_scale_small, (0, 0, 0), 2)  # Black outline
+            cv2.putText(frame, status_text, (status_x, status_y), font, font_scale_small, (200, 200, 200), 1)  # Light gray text
+
         except Exception as e:
             logger.warning(f"Could not add text overlay to status frame: {e}")
 
         return frame
 
     def _build_stream_captures(self):
         """Build simple OpenCV VideoCapture objects for each stream"""
         logger.info(f"🎬 Creating stream captures for {len(self.enabled_streams)} streams")
-        print(f"🎬 Creating stream captures for {len(self.enabled_streams)} streams")
-
+        print(f"🎬 Creating stream captures for {len(self.enabled_streams)} streams")  # Keep print for immediate feedback
+
         for stream in self.enabled_streams:
             stream_id = stream["id"]
             stream_url = stream["url"]
             stream_name = stream["name"]
 
             try:
                 # Simple direct capture - OpenCV will use FFmpeg backend automatically
                 logger.info(f"πŸ“Ή Opening stream {stream_name} (ID: {stream_id}): {stream_url}")
-                print(f"πŸ“Ή Opening stream {stream_name} (ID: {stream_id}): {stream_url}")
-
+                print(f"πŸ“Ή Opening stream {stream_name} (ID: {stream_id}): {stream_url}")  # Keep print
+
                 # Set environment variable to control FFmpeg timeout
-                import os  # noqa: PLC0415
-
                 # Increase timeout to 60 seconds for IP cameras, use TCP transport
-                os.environ["OPENCV_FFMPEG_CAPTURE_OPTIONS"] = (
-                    "rtsp_transport;tcp|timeout;60000000|buffer_size;1024000|max_delay;5000000"
-                )
-
+                os.environ['OPENCV_FFMPEG_CAPTURE_OPTIONS'] = 'rtsp_transport;tcp|timeout;60000000|buffer_size;1024000|max_delay;5000000'
+
                 # Explicitly use FFmpeg backend to avoid GStreamer warnings
                 self.stream_caps[stream_id] = cv2.VideoCapture(stream_url, cv2.CAP_FFMPEG)
 
                 # Set timeout and buffer properties before checking if opened
                 self.stream_caps[stream_id].set(cv2.CAP_PROP_OPEN_TIMEOUT_MSEC, 60000)  # 60 second timeout
-                self.stream_caps[stream_id].set(
-                    cv2.CAP_PROP_READ_TIMEOUT_MSEC, 5000
-                )  # 5 second read timeout (shorter for responsiveness)
-
+                self.stream_caps[stream_id].set(cv2.CAP_PROP_READ_TIMEOUT_MSEC, 5000)  # 5 second read timeout (shorter for responsiveness)
+
                 # Set some buffer properties to reduce latency but allow some buffering for stability
                 if self.stream_caps[stream_id].isOpened():
                     # Set buffer size to 2-3 frames to handle network jitter while maintaining low latency
@@ -205,18 +295,18 @@ def _build_stream_captures(self):
                     # Set FPS to match expected camera FPS
                     self.stream_caps[stream_id].set(cv2.CAP_PROP_FPS, 30)
                     logger.info(f"βœ… Stream {stream_id} opened successfully")
-                    print(f"βœ… Stream {stream_id} opened successfully")
+                    print(f"βœ… Stream {stream_id} opened successfully")  # Keep print
                 else:
                     logger.warning(f"⚠️ Stream {stream_id} failed to open immediately, will retry...")
-                    print(f"⚠️ Stream {stream_id} failed to open immediately, will retry...")
-
+                    print(f"⚠️ Stream {stream_id} failed to open immediately, will retry...")  # Keep print
+
                 # Initialize stream status
                 self.stream_status[stream_id] = "initializing"
                 # Set timeout based on 4G mode
                 is_4g_stream = os.getenv("4G_MODE", "false").lower() == "true"
                 timeout_seconds = 60.0 if is_4g_stream else 30.0
                 self.stream_init_timeout[stream_id] = time.time() + timeout_seconds
 
             except Exception as e:
                 logger.error(f"❌ Failed to initialize stream {stream_id} ({stream_name}): {e}")
                 self.stream_status[stream_id] = "error"
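OpenCV forwards `OPENCV_FFMPEG_CAPTURE_OPTIONS` to its FFmpeg backend as `key;value` pairs joined by `|`, and the variable is read when the `VideoCapture` is constructed, so it must be set first. The timeout values are in microseconds per FFmpeg's RTSP demuxer. The same setup in isolation (the RTSP URL is a placeholder):

import os
import cv2

# Must be set BEFORE cv2.VideoCapture is created; key;value pairs joined by '|'
os.environ["OPENCV_FFMPEG_CAPTURE_OPTIONS"] = (
    "rtsp_transport;tcp"      # force TCP instead of lossy UDP
    "|timeout;60000000"       # socket timeout, microseconds (60 s)
    "|buffer_size;1024000"    # socket buffer, bytes
    "|max_delay;5000000"      # max demux delay, microseconds (5 s)
)
cap = cv2.VideoCapture("rtsp://camera.example/stream", cv2.CAP_FFMPEG)  # placeholder URL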
@@ -224,19 +314,19 @@ def _build_stream_captures(self):
     async def recv(self):
         """Receive frames from individual streams, apply delays via frame skipping, and combine manually"""
         current_time = time.time()
 
         # Calculate relative timestamp for both video and vision (always needed)
         if self._start_time is None:
             self._start_time = current_time
         relative_timestamp = current_time - self._start_time
 
         # Initialize capture in background if not already done
         if not self.stream_caps:
             asyncio.create_task(self._initialize_capture())
 
         # Always return a frame - black/status frames if not ready, real video when ready
         # NEVER return None to prevent aiortc crashes
 
         # Calculate layout based on number of active streams (default to 2 if not initialized)
         num_streams = len(self.active_stream_ids) if self.active_stream_ids else 2
         if num_streams == 1:
@@ -245,36 +335,39 @@ async def recv(self):
             combined_width, combined_height = 1440, 480
         else:
             combined_width, combined_height = 1440, 960
 
         # If no streams are ready, return black frame
         if not self.stream_caps or not any(cap.isOpened() for cap in self.stream_caps.values()):
             # Create simple black frame
             black_frame = np.zeros((combined_height, combined_width, 3), dtype=np.uint8)
             rgb_frame = cv2.cvtColor(black_frame, cv2.COLOR_BGR2RGB)
             rgb_frame = np.ascontiguousarray(rgb_frame, dtype=np.uint8)
 
-            from av import VideoFrame  # noqa: PLC0415
-
+            from av import VideoFrame
             av_frame = VideoFrame.from_ndarray(rgb_frame, format="rgb24")
 
             pts = int(relative_timestamp * 90000)
             av_frame.pts = pts
             av_frame.time_base = Fraction(1, 90000)
 
             return av_frame
 
         try:
             current_time = time.time()
 
             # === CAPTURE FRESH FRAMES FROM EACH STREAM ===
             fresh_frames = {}
             for stream_id, cap in self.stream_caps.items():
                 ret, frame = cap.read()
                 if ret and frame is not None:
-                    # Resize frame to 720x480 if needed
+                    # Apply undistortion FIRST if enabled
+                    if ServerConfig.ENABLE_UNDISTORTION:
+                        frame = self._undistort_frame(frame, stream_id)
+
+                    # Ensure final frame is 720x480 after any processing (undistortion includes resize)
                     if frame.shape[:2] != (480, 720):
                         frame = cv2.resize(frame, (720, 480))
 
                     fresh_frames[stream_id] = frame
                     # Always update last_frames with fresh data
                     self.last_frames[stream_id] = frame
@@ -282,19 +375,16 @@ async def recv(self):
                     if self.stream_status[stream_id] == "initializing":
                         self.stream_status[stream_id] = "ready"
                         logger.info(f"βœ… Stream {stream_id} is now ready")
 
                     # Track successful frame reads
                     self.frame_read_stats[stream_id]["success"] += 1
                 else:
                     # Track failed frame reads
                     self.frame_read_stats[stream_id]["failures"] += 1
                     self.frame_read_stats[stream_id]["last_failure_time"] = current_time
 
                     # Check for timeout
-                    if (
-                        current_time > self.stream_init_timeout[stream_id]
-                        and self.stream_status[stream_id] == "initializing"
-                    ):
+                    if current_time > self.stream_init_timeout[stream_id] and self.stream_status[stream_id] == "initializing":
                         self.stream_status[stream_id] = "timeout"
                         logger.warning(f"⏰ Stream {stream_id} initialization timeout")
                     elif self.stream_status[stream_id] == "ready":
@@ -303,7 +393,7 @@ async def recv(self):
                         logger.warning(f"πŸ“‘ Stream {stream_id} stopped receiving frames")
                         # Schedule reconnection attempt
                         asyncio.create_task(self._reconnect_stream(stream_id))
 
             # Report frame read statistics periodically to identify flickering causes
             self.frame_counter_for_stats += 1
             if self.frame_counter_for_stats % self.stats_report_interval == 0:
@@ -311,45 +401,36 @@ async def recv(self):
                     stats = self.frame_read_stats[stream_id]
                     total = stats["success"] + stats["failures"]
                     if total > 0:
-                        (stats["success"] / total) * 100
                         failure_rate = (stats["failures"] / total) * 100
                         if failure_rate > 5.0:  # More than 5% failure rate
-                            logger.warning(
-                                f"⚠️ Stream {stream_id} has high failure rate ({failure_rate:.1f}%) - this could cause flickering!"
-                            )
+                            logger.warning(f"⚠️ Stream {stream_id} has high failure rate ({failure_rate:.1f}%) - this could cause flickering!")
                 # Reset stats for next interval
                 for stream_id in self.active_stream_ids:
-                    self.frame_read_stats[stream_id] = {
-                        "success": 0,
-                        "failures": 0,
-                        "last_failure_time": self.frame_read_stats[stream_id]["last_failure_time"],
-                    }
-
+                    self.frame_read_stats[stream_id] = {"success": 0, "failures": 0, "last_failure_time": self.frame_read_stats[stream_id]["last_failure_time"]}
+
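The 5% threshold above is a heuristic for spotting captures that silently drop reads. The same bookkeeping as a tiny standalone helper (hypothetical name, same dict shape as `frame_read_stats`):

def flag_flaky_streams(stats: dict, threshold_pct: float = 5.0) -> list:
    """Return stream ids whose read-failure rate exceeds the threshold."""
    flaky = []
    for stream_id, s in stats.items():
        total = s["success"] + s["failures"]
        if total and (s["failures"] / total) * 100 > threshold_pct:
            flaky.append(stream_id)
    return flaky

print(flag_flaky_streams({0: {"success": 94, "failures": 6},   # 6% -> flagged
                          1: {"success": 99, "failures": 1}}))  # 1% -> fine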
             # === APPLY TIME-SHIFT DELAY (SMOOTH DELAYED VIDEO) ===
             output_frames = {}
             for stream_id in self.active_stream_ids if self.active_stream_ids else []:
                 delay_ms = self.stream_delays[stream_id]
                 stream_status = self.stream_status[stream_id]
 
                 # Check if stream is ready or if we need a status frame
                 if stream_status in ["initializing", "timeout", "error"]:
                     # Generate status frame instead of waiting for real frame
                     status_frame = self._create_status_frame(720, 480, stream_id, stream_status)
                     output_frames[stream_id] = status_frame
                     continue
 
                 # Add fresh frame to buffer with timestamp
                 if stream_id in fresh_frames:
                     frame_with_time = StreamFrame(fresh_frames[stream_id], current_time, stream_id)
                     self.frame_buffers[stream_id].append(frame_with_time)
 
                 # Clean old frames (keep max 10 seconds)
-                while (
-                    self.frame_buffers[stream_id]
-                    and current_time - self.frame_buffers[stream_id][0].timestamp > 10.0
-                ):
+                while (self.frame_buffers[stream_id] and
+                       current_time - self.frame_buffers[stream_id][0].timestamp > 10.0):
                     self.frame_buffers[stream_id].popleft()
 
                 if delay_ms == 0:
                     # No delay - use fresh frame or recent cached frame
                     if stream_id in fresh_frames:
@@ -360,10 +441,8 @@ async def recv(self):
                         # Keep a small buffer of recent frames even with no delay to smooth over frame read failures
                         self.frame_buffers[stream_id].append(frame_with_time)
                         # Only keep last 2 seconds of frames for anti-flicker buffering
-                        while (
-                            self.frame_buffers[stream_id]
-                            and current_time - self.frame_buffers[stream_id][0].timestamp > 2.0
-                        ):
+                        while (self.frame_buffers[stream_id] and
+                               current_time - self.frame_buffers[stream_id][0].timestamp > 2.0):
                             self.frame_buffers[stream_id].popleft()
                         output_frames[stream_id] = self.last_frames[stream_id]
                 else:
@@ -373,17 +452,17 @@ async def recv(self):
                     # Time-shift delay - find frame closest to target delay time
                     delay_seconds = delay_ms / 1000.0
                     target_time = current_time - delay_seconds
 
                     best_frame = None
-                    best_time_diff = float("inf")
-
+                    best_time_diff = float('inf')
+
                     # Find frame closest to target delay time (prevents drift during delay changes)
                     for buffered_frame in self.frame_buffers[stream_id]:
                         time_diff = abs(buffered_frame.timestamp - target_time)
                         if time_diff < best_time_diff:
                             best_time_diff = time_diff
                             best_frame = buffered_frame.frame
 
                     # Use best available frame or fallback to last frame
                     if best_frame is not None:
                         output_frames[stream_id] = best_frame
@@ -392,59 +471,52 @@ async def recv(self):
                     else:
                         # Generate preparing frame if no data yet
                         output_frames[stream_id] = self._create_status_frame(720, 480, stream_id, "preparing")
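The delayed path above is a linear scan for the buffered frame whose capture time is nearest `now - delay`; with a 10-second cap at 30 fps that is at most ~300 comparisons per stream. The idea reduced to a runnable sketch (toy frames, assumed 30 fps spacing):

import time
from collections import deque

class TimedFrame:
    def __init__(self, frame, timestamp):
        self.frame = frame
        self.timestamp = timestamp

def pick_delayed_frame(buffer: deque, delay_ms: int, now: float):
    """Return the buffered frame captured closest to (now - delay)."""
    target = now - delay_ms / 1000.0
    best, best_diff = None, float("inf")
    for item in buffer:  # linear scan; buffer holds at most ~10s of frames
        diff = abs(item.timestamp - target)
        if diff < best_diff:
            best, best_diff = item.frame, diff
    return best

buf = deque(TimedFrame(f"frame{i}", 1000.0 + i / 30) for i in range(300))
print(pick_delayed_frame(buf, delay_ms=2000, now=1010.0))  # ~2s behind -> "frame240"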
 
             # === COMBINE FRAMES WITH DYNAMIC LAYOUT ===
             # Calculate layout based on number of active streams
             num_streams = len(self.active_stream_ids)
             if num_streams == 1:
-                grid_cols, _grid_rows = 1, 1
+                grid_cols, grid_rows = 1, 1
                 combined_width, combined_height = 720, 480
             elif num_streams == 2:
-                grid_cols, _grid_rows = 2, 1
+                grid_cols, grid_rows = 2, 1
                 combined_width, combined_height = 1440, 480
-            elif num_streams in {3, 4}:
-                grid_cols, _grid_rows = 2, 2
-                combined_width, combined_height = 1440, 960
             else:
-                grid_cols, _grid_rows = 2, 2  # Default
+                grid_cols, grid_rows = 2, 2  # Default for 3 or 4 streams
                 combined_width, combined_height = 1440, 960
 
             # Create combined frame (dynamic size)
             combined_frame = np.zeros((combined_height, combined_width, 3), dtype=np.uint8)
 
             # If no frames at all, return black frame (startup case)
             if not output_frames:
                 pass  # No frames available yet
 
             # Place frames in grid layout
             for i, stream_id in enumerate(self.active_stream_ids if self.active_stream_ids else []):
                 if stream_id in output_frames:
                     # Calculate grid position
                     col = i % grid_cols
                     row = i // grid_cols
 
                     # Calculate position in combined frame
                     x_pos = col * 720
                     y_pos = row * 480
 
                     # Ensure we don't exceed frame bounds
                     if y_pos + 480 <= combined_height and x_pos + 720 <= combined_width:
-                        combined_frame[y_pos : y_pos + 480, x_pos : x_pos + 720] = output_frames[stream_id]
-
+                        combined_frame[y_pos:y_pos+480, x_pos:x_pos+720] = output_frames[stream_id]
+
             # Process vision (re-enabled for tracking) - detect what we're actually sending
             if vision_api and vision_api.is_tracking_enabled() and not self.vision_processing:
                 # Process the exact combined frame that we're sending to WebRTC
                 # Use current time - detections match what's displayed on screen
-                asyncio.create_task(
-                    self._process_frame_vision(
-                        combined_frame.copy(), current_time, self.frame_count + 1, relative_timestamp
-                    )
-                )
-
+                asyncio.create_task(self._process_frame_vision(combined_frame.copy(), current_time, self.frame_count + 1, relative_timestamp))
+
             # Convert BGR to RGB for WebRTC
             rgb_frame = cv2.cvtColor(combined_frame, cv2.COLOR_BGR2RGB)
             rgb_frame = np.ascontiguousarray(rgb_frame, dtype=np.uint8)
 
             # Update FPS counter
             self.frame_count += 1
             now = time.time()
@@ -452,70 +524,66 @@ async def recv(self):
                 self.measured_fps = self.frame_count / (now - self.last_fps_time)
                 self.frame_count = 0
                 self.last_fps_time = now
 
             # Create VideoFrame for aiortc
-            from av import VideoFrame  # noqa: PLC0415
-
+            from av import VideoFrame
             av_frame = VideoFrame.from_ndarray(rgb_frame, format="rgb24")
 
             # Set proper timestamps for smooth playback
             # Use pts in 90kHz timebase (standard for video)
             pts = int(relative_timestamp * 90000)
             av_frame.pts = pts
             av_frame.time_base = Fraction(1, 90000)
 
             return av_frame
 
         except Exception as e:
             logger.error(f"Error in recv: {e}")
             # Return black frame on error
             black_frame = np.zeros((combined_height, combined_width, 3), dtype=np.uint8)
             rgb_frame = cv2.cvtColor(black_frame, cv2.COLOR_BGR2RGB)
 
-            from av import VideoFrame  # noqa: PLC0415
-
+            from av import VideoFrame
             av_frame = VideoFrame.from_ndarray(rgb_frame, format="rgb24")
             av_frame.pts = int(relative_timestamp * 90000)
             av_frame.time_base = Fraction(1, 90000)
             return av_frame
 
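`recv` stamps frames on the conventional 90 kHz video clock: with `time_base = 1/90000`, one second of wall time advances `pts` by 90000 ticks, which is what keeps playback smooth on the aiortc side. The stamping in isolation (PyAV `VideoFrame`, as used above):

from fractions import Fraction
import numpy as np
from av import VideoFrame

def stamp_frame(rgb: np.ndarray, seconds_since_start: float) -> VideoFrame:
    """Wrap an RGB ndarray and stamp it on the 90 kHz video clock."""
    frame = VideoFrame.from_ndarray(np.ascontiguousarray(rgb), format="rgb24")
    frame.pts = int(seconds_since_start * 90000)  # 1 s -> 90000 ticks
    frame.time_base = Fraction(1, 90000)
    return frame

f = stamp_frame(np.zeros((480, 720, 3), dtype=np.uint8), 2.5)
print(f.pts)  # 225000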
     async def _initialize_capture(self):
         """Initialize the stream captures"""
         if self.stream_caps:
             return  # Already initialized
 
         try:
             logger.info("πŸ”§ Initializing stream captures...")
             self._build_stream_captures()
 
             # Mark as running
             self.is_running = True
 
             # Count how many actually opened
             open_count = sum(1 for cap in self.stream_caps.values() if cap.isOpened())
             total_streams = len(self.stream_caps)
 
             if open_count > 0:
                 logger.info(f"βœ… {open_count}/{total_streams} stream captures opened successfully")
             else:
                 logger.warning("⚠️ No streams opened immediately, will retry in background")
 
         except Exception as e:
             logger.error(f"Failed to initialize stream captures: {e}")
 
-    async def _process_frame_vision(
-        self, frame: np.ndarray, timestamp: float, frame_id: int, relative_timestamp: float
-    ):
+    async def _process_frame_vision(self, frame: np.ndarray, timestamp: float, frame_id: int, relative_timestamp: float):
         """Multi-stream vision processing with synchronized timestamps"""
         if self.vision_processing:
             return  # Skip if already processing
 
         self.vision_processing = True
         try:
             # Process frame with vision API - pass stream information
             num_streams = len(self.active_stream_ids)
             stream_ids = list(self.active_stream_ids)
 
             if vision_api:
                 result = vision_api.process_combined_frame(frame, timestamp, num_streams, stream_ids)
                 if result:
@@ -523,64 +591,59 @@ async def _process_frame_vision(
                     # Simple timestamp - just what we need
                     result.timestamp = timestamp
                     self.latest_vision_result = result
-
                 else:
                     logger.warning(f"⚠️ Vision processing returned None for frame {frame_id}")
             else:
                 logger.warning("Vision API not available for processing.")
 
         except Exception as e:
             logger.error(f"❌ Vision processing error for frame {frame_id}: {e}")
         finally:
             self.vision_processing = False
 
     async def _reconnect_stream(self, stream_id: int):
         """Attempt to reconnect a failed stream"""
         try:
             self.stream_status[stream_id] = "reconnecting"
             logger.info(f"πŸ”„ Attempting to reconnect stream {stream_id}...")
 
             # Wait a bit before reconnecting (fixed pause; not a true exponential backoff)
             await asyncio.sleep(5)
 
             # Close existing capture
             if stream_id in self.stream_caps:
-                with contextlib.suppress(Exception):
+                with contextlib.suppress(Exception):  # Use contextlib.suppress for cleaner error handling
                     self.stream_caps[stream_id].release()
 
             # Find stream configuration
             stream = next((s for s in self.enabled_streams if s["id"] == stream_id), None)
             if not stream:
                 logger.error(f"Stream {stream_id} not found in configuration")
                 return
 
             # Reconnect with simple OpenCV VideoCapture
             try:
                 logger.info(f"🎬 Reconnecting stream {stream_id}...")
 
                 # Set environment variable to control FFmpeg timeout
-                import os  # noqa: PLC0415
-
                 # Increase timeout to 60 seconds for IP cameras, use TCP transport
-                os.environ["OPENCV_FFMPEG_CAPTURE_OPTIONS"] = (
-                    "rtsp_transport;tcp|timeout;60000000|buffer_size;1024000|max_delay;5000000"
-                )
-
+                os.environ['OPENCV_FFMPEG_CAPTURE_OPTIONS'] = 'rtsp_transport;tcp|timeout;60000000|buffer_size;1024000|max_delay;5000000'
+
                 # Explicitly use FFmpeg backend to avoid GStreamer warnings
                 self.stream_caps[stream_id] = cv2.VideoCapture(stream["url"], cv2.CAP_FFMPEG)
 
                 # Set timeout and buffer properties
                 self.stream_caps[stream_id].set(cv2.CAP_PROP_OPEN_TIMEOUT_MSEC, 60000)
                 self.stream_caps[stream_id].set(cv2.CAP_PROP_READ_TIMEOUT_MSEC, 5000)
 
                 # Set buffer size for stability
                 if self.stream_caps[stream_id].isOpened():
                     self.stream_caps[stream_id].set(cv2.CAP_PROP_BUFFERSIZE, 3)
                     self.stream_caps[stream_id].set(cv2.CAP_PROP_FPS, 30)
 
                 # Give it time to initialize
                 await asyncio.sleep(3)
 
                 # Check if reconnection successful
                 if self.stream_caps[stream_id].isOpened():
                     self.stream_status[stream_id] = "initializing"
@@ -593,17 +656,15 @@ async def _reconnect_stream(self, stream_id: int):
                     await asyncio.sleep(30)
                     if self.stream_status[stream_id] == "error":  # Still in error state
                         asyncio.create_task(self._reconnect_stream(stream_id))
 
             except Exception as e:
                 logger.error(f"Error creating capture for stream {stream_id}: {e}")
                 self.stream_status[stream_id] = "error"
 
         except Exception as e:
             logger.error(f"Error reconnecting stream {stream_id}: {e}")
             self.stream_status[stream_id] = "error"
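The retry logic above uses fixed pauses (5 s before reopening, 30 s before rescheduling). If cameras flap often, a capped exponential backoff is the usual refinement; a sketch of how that could wrap the existing reconnect, with illustrative timings (not part of this patch):

import asyncio
import random

async def try_reconnect() -> bool:
    """Stand-in for reopening a VideoCapture; succeeds ~30% of the time."""
    return random.random() < 0.3

async def reconnect_with_backoff(max_attempts: int = 6) -> bool:
    delay = 5.0
    for _ in range(max_attempts):
        if await try_reconnect():
            return True
        await asyncio.sleep(min(delay, 60.0))  # 5s, 10s, 20s, 40s, 60s, 60s
        delay *= 2
    return False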
-
-    # Background vision processing removed - not needed and was causing conflicts
-
+
     def cleanup(self):
         """Cleanup resources and reset delays"""
         if self.stream_caps:
@@ -611,57 +672,56 @@ def cleanup(self):
             cap.release()
         self.stream_caps = {}
         self.is_running = False
 
         # Reset all delays to 0 when stopping
         old_delays = self.stream_delays.copy()
         self.stream_delays = dict.fromkeys(self.active_stream_ids, 0)
 
         # Clear all frame buffers
         for stream_id in self.frame_buffers:
             self.frame_buffers[stream_id].clear()
         self.last_frames = dict.fromkeys(self.active_stream_ids)
 
         # Reset stream status
         self.stream_status = dict.fromkeys(self.active_stream_ids, "initializing")
         self.stream_init_timeout = {stream_id: time.time() + 30.0 for stream_id in self.active_stream_ids}
 
         if any(old_delays.values()):
             logger.info(f"πŸ”„ Reset delays from {old_delays} to {self.stream_delays} (clean restart)")
 
         logger.info("πŸ›‘ Stream captures released and delays reset")
 
     async def set_stream_delay(self, stream_id: int, delay_ms: int) -> bool:
         """Set delay for a specific stream in milliseconds (INSTANT - no pipeline restart!)"""
         if stream_id not in self.active_stream_ids:
             logger.warning(f"Invalid stream_id: {stream_id} (active streams: {self.active_stream_ids})")
             return False
 
         # Store the delay setting
         old_delay = self.stream_delays[stream_id]
         self.stream_delays[stream_id] = delay_ms
         logger.info(f"πŸ• Stream {stream_id} delay changing from {old_delay}ms to {delay_ms}ms")
 
         # Time-shift delay - smooth delayed video with immediate adjustment
         if delay_ms > 0:
-            logger.info(
-                f"βœ… Time-shift delay updated instantly! Stream {stream_id} will jump to {delay_ms}ms delay (prevents drift)"
-            )
+            logger.info(f"βœ… Time-shift delay updated instantly! Stream {stream_id} will jump to {delay_ms}ms delay (prevents drift)")
         else:
             logger.info(f"βœ… Delay cleared for stream {stream_id} - real-time video")
 
         # Clear old frames outside new delay range to prevent confusion
         if delay_ms > 0:
             max_age = (delay_ms / 1000.0) + 2.0  # Keep 2 extra seconds
             current_time = time.time()
 
             # Remove frames older than needed
-            while self.frame_buffers[stream_id] and current_time - self.frame_buffers[stream_id][0].timestamp > max_age:
+            while (self.frame_buffers[stream_id] and
+                   current_time - self.frame_buffers[stream_id][0].timestamp > max_age):
                 self.frame_buffers[stream_id].popleft()
 
             logger.info(f"πŸ”§ Optimized buffer for {delay_ms}ms delay - immediate sync adjustment")
 
         return True
 
     def get_stream_delays(self) -> dict:
         """Get current delay settings for all streams"""
         return self.stream_delays.copy()
@@ -669,89 +729,87 @@ def get_stream_delays(self) -> dict:
 
 class StreamCombinerManager:
     """Manages Stream Combiner - simplified from stream combiner version"""
 
     def __init__(self):
-        self.track: StreamCombinerTrack | None = None
+        self.track: Optional[StreamCombinerTrack] = None
         self.is_running = False
 
     @property
     def vision_api(self):
         """Access the global vision API instance"""
         return vision_api
 
     async def start(self) -> bool:
         """Start the stream combiner (non-blocking)"""
         if self.is_running:
             logger.info("Stream combiner is already running")
             return True
 
         try:
             logger.info("🎬 Starting stream combiner in background...")
 
             # Get or create track (track is created immediately in get_video_track now)
             if not self.track:
                 self.track = StreamCombinerTrack()
 
             # Start initialization in background
             asyncio.create_task(self._background_start())
 
             # Return immediately - WebRTC can start with black frames
             return True
 
         except Exception as e:
             logger.error(f"Error starting stream combiner: {e}")
             return False
 
     async def _background_start(self):
         """Background initialization of streams"""
         try:
             if not self.track:
                 logger.error("No track available for background initialization")
                 return
 
             logger.info("πŸ”§ Initializing streams in background...")
             await self.track._initialize_capture()
 
             if self.track.is_running:
                 self.is_running = True
                 logger.info("βœ… Stream combiner started successfully")
             else:
                 logger.warning("⚠️ Stream combiner initialization incomplete")
 
         except Exception as e:
             logger.error(f"Error in background stream start: {e}")
 
     async def stop(self):
         """Stop the stream combiner"""
         if not self.is_running:
             return
 
         logger.info("πŸ›‘ Stopping stream combiner...")
 
         if self.track:
             self.track.cleanup()
             self.track = None
 
         self.is_running = False
         logger.info("βœ… Stream combiner stopped")
 
-    def get_video_track(self) -> StreamCombinerTrack | None:
+    def get_video_track(self) -> Optional[StreamCombinerTrack]:
         """Get the video track for WebRTC - always returns a track (black frames if not ready)"""
         if not self.track:
             # Create track immediately, even if streams aren't ready
             self.track = StreamCombinerTrack()
             logger.info("🎬 Created video track (will show black frames until streams are ready)")
         return self.track
 
     def is_alive(self) -> bool:
         """Check if the combiner is alive"""
-        return (
-            self.is_running
-            and self.track is not None
-            and bool(self.track.stream_caps)
-            and all(cap.isOpened() for cap in self.track.stream_caps.values())
-        )
-
+        return (self.is_running and
+                self.track is not None and
+                bool(self.track.stream_caps) and
+                all(cap.isOpened() for cap in self.track.stream_caps.values()))
+
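`start()` deliberately returns before any RTSP socket is opened: the track exists immediately (serving black frames) while `_background_start` warms the captures up on the event loop. The fire-and-forget shape of that, reduced to a toy (names simplified; the sleep stands in for opening captures):

import asyncio

class Combiner:
    def __init__(self):
        self.ready = False

    async def start(self) -> bool:
        # Schedule the slow part and return immediately
        asyncio.create_task(self._background_start())
        return True

    async def _background_start(self):
        await asyncio.sleep(2)  # stand-in for opening RTSP captures
        self.ready = True

async def main():
    c = Combiner()
    await c.start()                     # returns instantly
    print("started, ready =", c.ready)  # False: init still running
    await asyncio.sleep(2.5)
    print("later, ready =", c.ready)    # True

asyncio.run(main())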
    def get_stats(self) -> dict:
         """Get current stream statistics"""
         if self.track and self.is_running:
@@ -759,130 +817,155 @@ def get_stats(self) -> dict:
                 "fps": round(self.track.measured_fps, 1),
                 "frameCount": self.track.frame_count,
                 "isRunning": True,
-                "timestamp": time.time(),
+                "timestamp": time.time()
             }
-        return {"fps": 0, "frameCount": 0, "isRunning": False, "timestamp": time.time()}
-
-    def get_latest_vision_result(self) -> Optional["VisionResult"]:
+        return {
+            "fps": 0,
+            "frameCount": 0,
+            "isRunning": False,
+            "timestamp": time.time()
+        }
+
+    def get_latest_vision_result(self) -> Optional['VisionResult']:
         """Get the latest vision processing result"""
         if self.track:
             return self.track.latest_vision_result
         return None
 
-    def set_vision_api(self, api):
+    def set_vision_api(self, api: Any):
         """Set the VisionAPI instance to use"""
-        global vision_api  # noqa: PLW0603
+        global vision_api
         vision_api = api
-        logger.info(f"πŸ”— StreamCombinerManager received VisionAPI with {api.tracker.__class__.__name__}")
-
+        # Log the API object's own class; 'api' may not expose a 'tracker' attribute
+        logger.info(f"πŸ”— StreamCombinerManager received VisionAPI: {api.__class__.__name__}")
+
     def enable_vision_tracking(self):
         """Enable vision processing"""
         if vision_api:
             vision_api.enable_tracking()
-
-        # Don't start background processing - let WebRTC handle vision processing
-        # Background processing was causing conflicts with main vision pipeline
         logger.info("βœ… Vision tracking enabled (WebRTC will handle processing)")
 
     def disable_vision_tracking(self):
         """Disable vision processing"""
         if vision_api:
             vision_api.disable_tracking()
         logger.info("πŸ›‘ Vision tracking disabled")
 
     def is_vision_tracking_enabled(self) -> bool:
         """Check if vision tracking is enabled"""
         return vision_api.is_tracking_enabled() if vision_api else False
 
     def get_vision_statistics(self) -> dict:
         """Get vision processing statistics"""
         return vision_api.get_statistics() if vision_api else {}
 
-    def get_latest_frame(self) -> np.ndarray | None:
+    def get_latest_frame(self) -> Optional[np.ndarray]:
         """Get the latest frame from the combiner for calibration purposes"""
-        if (
-            not self.track
-            or not self.track.stream_caps
-            or not all(cap.isOpened() for cap in self.track.stream_caps.values())
-        ):
+        # Ensure track and captures are initialized before trying to read frames
+        if not self.track or not self.track.stream_caps or not all(cap.isOpened() for cap in self.track.stream_caps.values()):
+            logger.debug("get_latest_frame: Track or stream captures not ready.")
             return None
 
         try:
-            # Read current frame from each stream
             frames = []
-            for stream_id, cap in self.track.stream_caps.items():
-                ret, frame = cap.read()
-                if ret and frame is not None:
-                    # Resize frame to 720x480 if needed
-                    if frame.shape[:2] != (480, 720):
-                        frame = cv2.resize(frame, (720, 480))
-                    frames.append((stream_id, frame))
-
+            # Iterate through the streams that are actually configured and open
+            # It's safer to iterate through self.track.active_stream_ids and get the cap
+            for stream_id in self.track.active_stream_ids:
+                cap = self.track.stream_caps.get(stream_id)
+                if cap and cap.isOpened():
+                    ret, frame = cap.read()
+                    if ret and frame is not None:
+                        # Apply undistortion to calibration frame if enabled
+                        if ServerConfig.ENABLE_UNDISTORTION:
+                            frame = self.track._undistort_frame(frame, stream_id)
+                        # Resize frame to 720x480 for consistency, if it's not already
+                        if frame.shape[:2] != (480, 720):
+                            frame = cv2.resize(frame, (720, 480))
+                        frames.append((stream_id, frame))
+                    else:
+                        logger.warning(f"get_latest_frame: Failed to read frame from stream {stream_id}")
+                else:
+                    logger.warning(f"get_latest_frame: Stream {stream_id} capture not open or available.")
+
             if frames:
-                # Calculate layout based on number of active streams
-                num_streams = len(self.track.active_stream_ids)
-                if num_streams == 1:
-                    grid_cols, _grid_rows = 1, 1
+                # Calculate layout based on number of active streams (can be different from actual captured frames)
+                # Use total active streams for layout to maintain consistent grid
+                num_streams_for_layout = len(self.track.active_stream_ids)
+
+                # Default to 2x2 for more than 2 streams to avoid a long single row
+                if num_streams_for_layout == 1:
+                    grid_cols, grid_rows = 1, 1
                     combined_width, combined_height = 720, 480
-                elif num_streams == 2:
-                    grid_cols, _grid_rows = 2, 1
+                elif num_streams_for_layout == 2:
+                    grid_cols, grid_rows = 2, 1
                     combined_width, combined_height = 1440, 480
-                elif num_streams in {3, 4}:
-                    grid_cols, _grid_rows = 2, 2
-                    combined_width, combined_height = 1440, 960
-                else:
-                    grid_cols, _grid_rows = 2, 2  # Default
+                else:  # 3 or more streams, assume 2x2 layout for up to 4 streams
+                    grid_cols, grid_rows = 2, 2
                     combined_width, combined_height = 1440, 960
 
-                # Combine frames manually with dynamic layout
+
                 combined_frame = np.zeros((combined_height, combined_width, 3), dtype=np.uint8)
-                for i, (_stream_id, frame) in enumerate(frames):
-                    # Calculate grid position
-                    col = i % grid_cols
-                    row = i // grid_cols
-
-                    # Calculate position in combined frame
+
+                # Place captured frames into the combined_frame
+                # Sort frames by stream_id for a consistent order regardless of capture order
+                frames.sort(key=lambda x: x[0])
+                for i, (stream_id, frame) in enumerate(frames):
+                    # Find its original index in active_stream_ids to get the correct grid position
+                    try:
+                        original_idx = self.track.active_stream_ids.index(stream_id)
+                    except ValueError:
+                        logger.warning(f"Stream ID {stream_id} not found in active_stream_ids for layout mapping. Using sequential index {i}.")
+                        original_idx = i  # Fallback to sequential index (should not happen if active_stream_ids is correctly populated)
+
+                    col = original_idx % grid_cols
+                    row = original_idx // grid_cols
+
                     x_pos = col * 720
                     y_pos = row * 480
 
                     # Ensure we don't exceed frame bounds
                     if y_pos + 480 <= combined_height and x_pos + 720 <= combined_width:
-                        combined_frame[y_pos : y_pos + 480, x_pos : x_pos + 720] = frame
-
+                        combined_frame[y_pos:y_pos+480, x_pos:x_pos+720] = frame
+                    else:
+                        logger.warning(f"Frame for stream {stream_id} at grid position ({col},{row}) exceeds combined frame bounds. Skipping placement.")
+
                 return combined_frame
-                logger.warning("No valid frames to combine")
-                return None
+            else:
+                logger.warning("No valid frames to combine for get_latest_frame.")
+                return None
         except Exception as e:
-            logger.error(f"Error capturing frame: {e}")
-
+            logger.error(f"Error in get_latest_frame: {e}")
+            import traceback
+            logger.error(traceback.format_exc())
+
             return None
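Indexing tiles by a stream's position in `active_stream_ids` (after sorting the captured frames) keeps each camera in a stable grid cell even when another stream fails to deliver. The cell arithmetic on its own:

def tile_origin(index: int, grid_cols: int = 2, tile_w: int = 720, tile_h: int = 480):
    """Top-left pixel of the tile for the stream at this layout index."""
    col = index % grid_cols
    row = index // grid_cols
    return col * tile_w, row * tile_h

active_stream_ids = [0, 1, 2, 3]                 # layout order fixed by config
print(tile_origin(active_stream_ids.index(2)))   # stream 2 -> (0, 480) in a 2x2 grid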
     async def set_stream_delay(self, stream_id: int, delay_ms: int) -> bool:
         """Set delay for a specific stream"""
         if not self.track:
             logger.warning("No active track to set delay on")
             return False
 
         return await self.track.set_stream_delay(stream_id, delay_ms)
 
     def get_stream_delays(self) -> dict:
         """Get current delay settings for all streams"""
         if not self.track:
             try:
+                # If track is not initialized, try to get enabled streams from ServerConfig
                 enabled_streams = ServerConfig.get_enabled_streams()
                 return {stream["id"]: 0 for stream in enabled_streams}
             except Exception as e:
-                logger.warning(f"Could not get enabled streams: {e}")
-                # Return default for 2 streams
-                return {0: 0, 1: 0}
+                logger.warning(f"Could not get enabled streams for get_stream_delays (no track): {e}")
+                # Fallback to a default structure if ServerConfig access also fails
+                return {0: 0, 1: 0}
         return self.track.get_stream_delays()
 
     async def set_all_delays(self, delays: dict) -> bool:
         """Set delays for all streams at once"""
         if not self.track:
             logger.warning("No active track to set delays on")
             return False
 
         success = True
         for stream_id, delay_ms in delays.items():
             if not await self.set_stream_delay(stream_id, delay_ms):
@@ -891,4 +974,4 @@ async def set_all_delays(self, delays: dict) -> bool:
 
 # Global manager instance
-stream_combiner_manager = StreamCombinerManager()
+stream_combiner_manager = StreamCombinerManager()
\ No newline at end of file
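Typical call sequence against the module-level manager, as the API above is shaped: start non-blocking, hand the track to aiortc, then retune per-stream delays at runtime. The two-stream config and the 250 ms value are illustrative; delays only apply to stream ids enabled in ServerConfig.

import asyncio
from trackstudio.core.stream_combiner import stream_combiner_manager

async def main():
    await stream_combiner_manager.start()               # non-blocking; captures warm up in background
    track = stream_combiner_manager.get_video_track()   # hand this to the aiortc peer connection
    await stream_combiner_manager.set_all_delays({0: 0, 1: 250})  # shift camera 1 back 250 ms
    print(stream_combiner_manager.get_stream_delays())

asyncio.run(main())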