diff --git a/benchmark-scripts/poi_stream_density.py b/benchmark-scripts/poi_stream_density.py index 6b1cf43..64694c3 100644 --- a/benchmark-scripts/poi_stream_density.py +++ b/benchmark-scripts/poi_stream_density.py @@ -40,6 +40,7 @@ SCENE_INCREMENT Scenes to add per iteration (default: 1) INIT_DURATION Warm-up seconds after restart (default: 90) STABILISE_DURATION Extra wait for pipeline to stabilise (default: 30) +BENCHMARK_DURATION Max wait for single benchmark in seconds (default: 120) RESULTS_DIR Where to write results (default: ./results) MAX_ITERATIONS Safety cap on iterations (default: 50) """ @@ -65,7 +66,10 @@ import psutil -from consolidate_multiple_run_of_metrics import get_vlm_application_latency +from consolidate_multiple_run_of_metrics import ( + get_vlm_application_latency, + get_vlm_application_latency_stream_density, +) # --------------------------------------------------------------------------- # Logging @@ -613,13 +617,23 @@ def _collect_poi_latency_from_docker_logs(app_dir: str, duration_secs: int = 30) return stats -def _collect_poi_e2e_latency_from_alerts() -> Dict[str, float]: +def _collect_poi_e2e_latency_from_alerts( + since: Optional[datetime] = None, +) -> Dict[str, float]: """Compute real end-to-end latency from POI alerts API. Each alert contains: - - ``timestamp``: when the person was detected by the camera (MQTT msg) + - ``mqtt_received_at``: when POI backend received the MQTT message (preferred) + - ``timestamp``: DLStreamer frame capture time (fallback, includes pipeline latency) - ``dispatched_at``: when the alert was actually dispatched + Uses ``mqtt_received_at`` when available to measure POI application latency + only, excluding DLStreamer pipeline latency (~6-8s). + + Args: + since: If provided, only include alerts dispatched after this time. + Filters out stale alerts from previous benchmark runs. + Returns dict with ``poi_e2e_latency_avg_ms``, ``poi_e2e_latency_max_ms``, ``poi_e2e_latency_min_ms``, and ``poi_e2e_alert_count``. """ @@ -637,26 +651,50 @@ def _collect_poi_e2e_latency_from_alerts() -> Dict[str, float]: if not isinstance(alerts, list) or not alerts: return {} - from datetime import datetime as _dt + from datetime import datetime as _dt, timezone as _tz latencies_ms: list[float] = [] + skipped = 0 + used_mqtt_received = 0 + used_frame_timestamp = 0 for alert in alerts: - ts_str = alert.get("timestamp", "") + # Prefer mqtt_received_at (POI application latency only, excludes + # DLStreamer pipeline latency) over timestamp (frame capture time). + mqtt_recv = alert.get("mqtt_received_at", "") + start_str = mqtt_recv or alert.get("timestamp", "") dispatched_str = alert.get("dispatched_at", "") - if not ts_str or not dispatched_str: + if not start_str or not dispatched_str: continue try: - # Parse ISO timestamps — handle both Z suffix and +00:00 - ts_str = ts_str.replace("Z", "+00:00") + start_str = start_str.replace("Z", "+00:00") dispatched_str = dispatched_str.replace("Z", "+00:00") - ts = _dt.fromisoformat(ts_str) + start = _dt.fromisoformat(start_str) dispatched = _dt.fromisoformat(dispatched_str) - delta_ms = (dispatched - ts).total_seconds() * 1000 + + # Normalize both to UTC-aware to avoid mixed tz subtraction errors + start_utc = start.astimezone(_tz.utc) if start.tzinfo else start.replace(tzinfo=_tz.utc) + dispatched_utc = dispatched.astimezone(_tz.utc) if dispatched.tzinfo else dispatched.replace(tzinfo=_tz.utc) + + # Filter out alerts from before the benchmark started + if since is not None: + since_aware = since.astimezone(_tz.utc) if since.tzinfo else since.replace(tzinfo=_tz.utc) + if dispatched_utc < since_aware: + skipped += 1 + continue + + delta_ms = (dispatched_utc - start_utc).total_seconds() * 1000 if delta_ms >= 0: latencies_ms.append(delta_ms) + if mqtt_recv: + used_mqtt_received += 1 + else: + used_frame_timestamp += 1 except (ValueError, TypeError): continue + if skipped: + logger.info("Filtered out %d stale alerts (before benchmark start)", skipped) + if not latencies_ms: return {} @@ -666,9 +704,14 @@ def _collect_poi_e2e_latency_from_alerts() -> Dict[str, float]: "poi_e2e_latency_min_ms": min(latencies_ms), "poi_e2e_alert_count": len(latencies_ms), } + label = "MQTT receive → alert dispatch" + if used_frame_timestamp and not used_mqtt_received: + label = "frame capture → alert dispatch (includes DLStreamer latency)" + elif used_frame_timestamp: + label = "start → alert dispatch (mixed sources)" logger.info( - "E2E latency (MQTT detection → alert dispatch): " - "avg=%.0fms, min=%.0fms, max=%.0fms (%d alerts)", + "E2E latency (%s): avg=%.0fms, min=%.0fms, max=%.0fms (%d alerts)", + label, stats["poi_e2e_latency_avg_ms"], stats["poi_e2e_latency_min_ms"], stats["poi_e2e_latency_max_ms"], @@ -677,9 +720,14 @@ def _collect_poi_e2e_latency_from_alerts() -> Dict[str, float]: return stats -def _save_alert_thumbnails(results_dir: str, iteration: int = 1) -> int: +def _save_alert_thumbnails( + results_dir: str, iteration: int = 1, since: Optional[datetime] = None, +) -> int: """Fetch alerts and their thumbnails from the POI API and save to results_dir. + Args: + since: If provided, only save thumbnails for alerts dispatched after this time. + Returns the number of thumbnails saved. """ import urllib.request @@ -701,7 +749,22 @@ def _save_alert_thumbnails(results_dir: str, iteration: int = 1) -> int: logger.info("No alerts found — no thumbnails to save") return 0 + from datetime import datetime as _dt, timezone as _tz + for i, alert in enumerate(alerts): + # Filter stale alerts + if since is not None: + dispatched_str = alert.get("dispatched_at", "") + if dispatched_str: + try: + d_str = dispatched_str.replace("Z", "+00:00") + dispatched = _dt.fromisoformat(d_str) + since_aware = since.astimezone(_tz.utc) if since.tzinfo else since.replace(tzinfo=_tz.utc) + dispatched_utc = dispatched.astimezone(_tz.utc) if dispatched.tzinfo else dispatched.replace(tzinfo=_tz.utc) + if dispatched_utc < since_aware: + continue + except (ValueError, TypeError): + pass # Extract fields from nested alert structure match_data = alert.get("match", {}) thumb_url = match_data.get("thumbnail_path") or alert.get("thumbnail_path") or "" @@ -740,38 +803,52 @@ def _save_alert_thumbnails(results_dir: str, iteration: int = 1) -> int: return saved -def _collect_poi_latency_from_metrics_files(results_dir: str) -> Dict[str, float]: +def _collect_poi_latency_from_metrics_files( + results_dir: str, stream_density: bool = False +) -> Dict[str, float]: """ Extract POI detection-to-alert latency from vlm_application_metrics files. These files are written by the vlm_metrics_logger package via user_log_start_time (detection) and log_end_time (alert dispatch) calls in the poi-backend. + + For single benchmarks uses ``get_vlm_application_latency`` (all pairs). + For stream density uses ``get_vlm_application_latency_stream_density`` + (last 20 pairs) to reflect current-iteration performance. """ all_stats: Dict[str, float] = {} search_dirs = [results_dir, "/tmp"] - for d in search_dirs: - if not os.path.isdir(d): - continue - - # Find vlm_application_metrics files - pattern = os.path.join(d, "vlm_application_metrics_*.txt") - files = sorted(glob.glob(pattern), key=os.path.getmtime) - if not files: - continue - - # Use the most recent file - latest = files[-1] - try: - stats = get_vlm_application_latency(latest) - if stats: - for key, avg_ms in stats.items(): - # Extract a clean key name - all_stats[f"vlm_{key}"] = avg_ms - logger.info("VLM metrics file latency (%s): %s", latest, stats) - except Exception as e: - logger.warning("Failed to parse VLM metrics in %s: %s", d, e) + if stream_density: + for d in search_dirs: + if not os.path.isdir(d): + continue + try: + stats = get_vlm_application_latency_stream_density(d, last_n_pairs=20) + if stats: + for app_id, avg_ms in stats.items(): + all_stats[f"vlm_{app_id}_avg_ms"] = avg_ms + logger.info("VLM stream-density latency (%s): %s", d, stats) + except Exception as e: + logger.warning("Failed to parse VLM metrics in %s: %s", d, e) + else: + for d in search_dirs: + if not os.path.isdir(d): + continue + pattern = os.path.join(d, "vlm_application_metrics_*.txt") + files = sorted(glob.glob(pattern), key=os.path.getmtime) + if not files: + continue + latest = files[-1] + try: + stats = get_vlm_application_latency(latest) + if stats: + for key, avg_ms in stats.items(): + all_stats[f"vlm_{key}"] = avg_ms + logger.info("VLM metrics file latency (%s): %s", latest, stats) + except Exception as e: + logger.warning("Failed to parse VLM metrics in %s: %s", d, e) return all_stats @@ -781,9 +858,9 @@ def _extract_poi_latency(stats: Dict[str, float], metric: str) -> float: Extract a single representative POI latency value from collected stats. Priority: - 1. Real E2E latency from alerts API (MQTT detection → alert dispatch, - millisecond precision) - 2. vlm_application_metrics file values (SAD pipeline) + 1. vlm_application_metrics file values (MQTT receive → alert dispatch, + measures POI application latency only, excludes DLStreamer pipeline) + 2. Alerts API fallback (``mqtt_received_at`` → ``dispatched_at``) 3. Returns 0 if no data available Note: Docker-log-based ``log_detection_to_alert_ms`` is excluded because @@ -791,15 +868,7 @@ def _extract_poi_latency(stats: Dict[str, float], metric: str) -> float: first-alert gap includes dedup delay (60 s TTL), making it unreliable as a per-event latency metric. """ - # Primary: real E2E latency from alerts API - e2e_avg = stats.get("poi_e2e_latency_avg_ms", 0.0) - e2e_max = stats.get("poi_e2e_latency_max_ms", 0.0) - if e2e_avg > 0: - if metric == "max": - return e2e_max - return e2e_avg - - # Fallback: vlm_application_metrics file-based values + # Primary: vlm_application_metrics file-based values vlm_values = [v for k, v in stats.items() if k.startswith("vlm_") and isinstance(v, (int, float)) and v > 0] if vlm_values: @@ -807,6 +876,14 @@ def _extract_poi_latency(stats: Dict[str, float], metric: str) -> float: return max(vlm_values) return mean(vlm_values) + # Fallback: alerts API E2E latency + e2e_avg = stats.get("poi_e2e_latency_avg_ms", 0.0) + e2e_max = stats.get("poi_e2e_latency_max_ms", 0.0) + if e2e_avg > 0: + if metric == "max": + return e2e_max + return e2e_avg + return 0.0 @@ -862,6 +939,7 @@ def __init__( max_iterations: int, single_run: bool = False, single_run_scenes: int = 1, + benchmark_duration: int = 120, ): self.app_dir = os.path.abspath(app_dir) self.target_latency_ms = target_latency_ms @@ -873,8 +951,55 @@ def __init__( self.max_iterations = max_iterations self.single_run = single_run self.single_run_scenes = single_run_scenes + self.benchmark_duration = benchmark_duration os.makedirs(self.results_dir, exist_ok=True) + def _services_running(self) -> bool: + """Check if key POI pipeline services are already running.""" + for container in ("poi-backend", "storewide-lp-lp-video-1"): + result = subprocess.run( + f"docker inspect {container} --format '{{{{.State.Running}}}}'", + shell=True, capture_output=True, text=True) + if result.stdout.strip() != "true": + return False + return True + + def _wait_for_alert_or_timeout(self, duration: int) -> None: + """Poll for alerts during single benchmark, exit early on first alert. + + For single benchmarks the goal is to measure time-to-first-alert. + Instead of sleeping the full duration, poll every 5 seconds and + return as soon as at least one alert is found. + """ + import urllib.request + import urllib.error + + poll_interval = 5 + elapsed = 0 + logger.info("Waiting up to %ds for alert (polling every %ds) …", + duration, poll_interval) + + while elapsed < duration: + sleep_time = min(poll_interval, duration - elapsed) + time.sleep(sleep_time) + elapsed += sleep_time + + try: + req = urllib.request.Request("http://localhost:8000/api/v1/alerts") + with urllib.request.urlopen(req, timeout=10) as resp: + alerts = json.loads(resp.read().decode()) + if isinstance(alerts, list) and len(alerts) > 0: + logger.info("Alert received after %ds — stopping early", elapsed) + # Brief extra wait for metrics files to flush + time.sleep(5) + return + except Exception: + pass + + logger.info("No alerts yet (%d/%ds elapsed)", elapsed, duration) + + logger.info("Benchmark duration reached (%ds) — collecting results", duration) + def run(self) -> StreamDensityResult: """Execute the POI stream-density loop.""" self._print_header() @@ -893,24 +1018,42 @@ def run(self) -> StreamDensityResult: logger.warning("Memory threshold exceeded – stopping.") break + # Record iteration start time for filtering stale alerts + iteration_start = datetime.utcnow() + # Clean old metrics before each measurement _clean_metrics(self.results_dir) - # Scale to desired scene count - _scale_pipeline_services(self.app_dir, num_scenes, wait=self.init_duration) + if self.single_run and self._services_running(): + # Single benchmark: services already up, skip scaling + logger.info("Services already running — skipping scaling for single benchmark") + else: + # Scale to desired scene count + _scale_pipeline_services(self.app_dir, num_scenes, wait=self.init_duration) + + # Wait for data collection + if self.single_run: + # Single benchmark: poll for alerts with early exit + self._wait_for_alert_or_timeout(self.benchmark_duration) + else: + # Stream density: fixed stabilise wait per iteration + logger.info("Collecting data for %ds …", self.stabilise_duration) + time.sleep(self.stabilise_duration) - # Wait for pipeline to stabilise and collect data - logger.info("Collecting data for %ds …", self.stabilise_duration) - time.sleep(self.stabilise_duration) + # Use actual elapsed time for log collection window + elapsed_seconds = int((datetime.utcnow() - iteration_start).total_seconds()) + log_window = elapsed_seconds if self.single_run else self.stabilise_duration # Collect latency from metrics files + docker logs log_stats = _collect_poi_latency_from_docker_logs( - self.app_dir, self.stabilise_duration) - file_stats = _collect_poi_latency_from_metrics_files(self.results_dir) - e2e_stats = _collect_poi_e2e_latency_from_alerts() + self.app_dir, log_window) + file_stats = _collect_poi_latency_from_metrics_files( + self.results_dir, stream_density=not self.single_run) + e2e_stats = _collect_poi_e2e_latency_from_alerts(since=iteration_start) # Save alert thumbnails to results directory - _save_alert_thumbnails(self.results_dir, iteration=iteration) + _save_alert_thumbnails(self.results_dir, iteration=iteration, + since=iteration_start) # Merge all stats stats: Dict[str, float] = {} @@ -994,7 +1137,10 @@ def _print_header(self) -> None: print(f" Latency Metric: {self.latency_metric}") print(f" Scene Increment: +{self.scene_increment}") print(f" Init Duration: {self.init_duration}s") - print(f" Stabilise: {self.stabilise_duration}s") + if self.single_run: + print(f" Benchmark Duration:{self.benchmark_duration}s (exits early on alert)") + else: + print(f" Stabilise: {self.stabilise_duration}s") print(f" Results Dir: {self.results_dir}") print(f" Single-run Mode: {self.single_run}") print("=" * 70) @@ -1103,6 +1249,7 @@ def cmd_run(args) -> None: max_iterations=args.max_iterations, single_run=args.single_run, single_run_scenes=args.scenes, + benchmark_duration=args.benchmark_duration, ) result = tester.run() sys.exit(0 if result.met_target else 1) @@ -1143,6 +1290,7 @@ def main() -> None: scene_increment = _env_int("SCENE_INCREMENT", 1) init_duration = _env_int("INIT_DURATION", 90) stabilise_duration = _env_int("STABILISE_DURATION", 30) + benchmark_duration = _env_int("BENCHMARK_DURATION", 120) results_dir = _env_str("RESULTS_DIR", "./results") max_iterations = _env_int("MAX_ITERATIONS", 50) @@ -1164,6 +1312,9 @@ def main() -> None: p_run.add_argument("--max_iterations", type=int, default=max_iterations) p_run.add_argument("--single_run", action="store_true", help="Run once with --scenes scenes (benchmark mode)") + p_run.add_argument("--benchmark_duration", type=int, default=benchmark_duration, + help="Max duration in seconds for single benchmark (default: 120). " + "Exits early when an alert is received.") p_run.add_argument("--scenes", type=int, default=1, help="Number of scenes for single-run mode") p_run.set_defaults(func=cmd_run)