Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 73 additions & 16 deletions benchmark-scripts/poi_stream_density.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,14 @@
poi-alert-service, and poi-ui stay running

Latency is measured from ``vlm_application_metrics`` files written by the
``vlm_metrics_logger`` package (user_log_start_time / log_end_time calls
in poi-backend).
``vlm_metrics_logger`` package. In ``alert_service.py``:
- **start**: ``user_log_start_time(frame_ts_ms, ...)`` — the DLStreamer frame
capture timestamp from the MQTT payload (epoch ms). This is the moment
the camera frame was decoded by the pipeline.
- **end**: ``log_end_time(...)`` — wall-clock time at alert dispatch.

This measures **true end-to-end latency**:
camera frame capture → DLStreamer decode/detect/reid → MQTT → FAISS match → alert dispatch.

Sub-commands
------------
Expand All @@ -43,6 +49,12 @@
BENCHMARK_DURATION Max wait for single benchmark in seconds (default: 120)
RESULTS_DIR Where to write results (default: ./results)
MAX_ITERATIONS Safety cap on iterations (default: 50)
RESOURCE_CONFIG Path to device resource config file relative to app_dir
(e.g. configs/res/all-gpu.env). Passed to init.sh on
every re-init so device and model precision are preserved
across stream-density iterations. Prefer passing
--resource_config on the CLI (set automatically by
``make benchmark DEVICE=...``).
"""

import argparse
Expand Down Expand Up @@ -444,16 +456,33 @@ def _generate_dlstreamer_config(app_dir: str, num_scenes: int) -> None:
logger.info("Generated DLStreamer configs for %d total cameras", base_camera_count + num_scenes - 1)


def _reinit_env(app_dir: str) -> None:
"""Re-run init.sh to regenerate .env with updated config."""
def _reinit_env(app_dir: str, resource_config: str = "") -> None:
"""Re-run init.sh to regenerate .env with updated config.

Parameters
----------
app_dir:
Absolute path to the person-of-interest/ directory.
resource_config:
Path to the device resource config file (e.g. configs/res/all-gpu.env).
Passed as the ``RESOURCE_CONFIG`` env var to init.sh so that device,
precision, and pre-process settings are preserved across stream-density
iterations. When empty, init.sh uses its own default (all-gpu-cpu.env).
"""
init_script = Path(app_dir) / ".." / "scenescape" / "scripts" / "init.sh"
if not init_script.exists():
logger.warning("init.sh not found at %s — skipping .env regeneration", init_script)
return

env = os.environ.copy()
if resource_config:
env["RESOURCE_CONFIG"] = resource_config
logger.info("Re-running init.sh with RESOURCE_CONFIG=%s …", resource_config)
else:
logger.info("Re-running init.sh to update .env …")

cmd = f"bash {shlex.quote(str(init_script))} {shlex.quote(app_dir)}"
logger.info("Re-running init.sh to update .env …")
result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
result = subprocess.run(cmd, shell=True, capture_output=True, text=True, env=env)
if result.returncode != 0:
logger.warning("init.sh returned non-zero:\n%s", result.stderr[-500:])
else:
Expand All @@ -479,7 +508,7 @@ def _wait_for_web_healthy(timeout: int = 300) -> None:
logger.warning("Web container did not become healthy after %ds — continuing anyway", timeout)


def _scale_pipeline_services(app_dir: str, num_scenes: int, wait: int = 90) -> None:
def _scale_pipeline_services(app_dir: str, num_scenes: int, wait: int = 90, resource_config: str = "") -> None:
"""
Scale the POI video pipeline to N scenes.

Expand All @@ -497,7 +526,7 @@ def _scale_pipeline_services(app_dir: str, num_scenes: int, wait: int = 90) -> N

_set_stream_density(app_dir, num_scenes)
_generate_cameras_override(app_dir, num_scenes)
_reinit_env(app_dir)
_reinit_env(app_dir, resource_config=resource_config)
_generate_dlstreamer_config(app_dir, num_scenes)

# Bring up any new camera services
Expand Down Expand Up @@ -734,7 +763,16 @@ def _save_alert_thumbnails(
import urllib.error

thumbs_dir = os.path.join(results_dir, f"thumbnails_iter{iteration}")
os.makedirs(thumbs_dir, exist_ok=True)
try:
os.makedirs(thumbs_dir, exist_ok=True)
except PermissionError:
logger.warning(
"Cannot create thumbnails directory %s (permission denied). "
"The results/ directory may be owned by root (written by a Docker container). "
"Run: sudo chown -R $USER results/",
thumbs_dir,
)
return 0
saved = 0

try:
Expand Down Expand Up @@ -858,10 +896,15 @@ def _extract_poi_latency(stats: Dict[str, float], metric: str) -> float:
Extract a single representative POI latency value from collected stats.

Priority:
1. vlm_application_metrics file values (MQTT receive → alert dispatch,
measures POI application latency only, excludes DLStreamer pipeline)
2. Alerts API fallback (``mqtt_received_at`` → ``dispatched_at``)
3. Returns 0 if no data available
1. vlm_application_metrics file values — TRUE end-to-end latency:
start = DLStreamer frame capture timestamp (set by user_log_start_time
in alert_service.py using the MQTT payload's frame timestamp field),
end = wall-clock time at alert dispatch (log_end_time).
This spans: camera frame capture → DLStreamer pipeline → FAISS match
→ alert dispatch.
2. Alerts API fallback (``mqtt_received_at`` → ``dispatched_at``) —
POI application latency only, excludes DLStreamer pipeline latency.
3. Returns 0 if no data available.

Note: Docker-log-based ``log_detection_to_alert_ms`` is excluded because
log timestamps have only second-level precision and the first-match-to-
Expand Down Expand Up @@ -940,6 +983,7 @@ def __init__(
single_run: bool = False,
single_run_scenes: int = 1,
benchmark_duration: int = 120,
resource_config: str = "",
):
self.app_dir = os.path.abspath(app_dir)
self.target_latency_ms = target_latency_ms
Expand All @@ -952,6 +996,7 @@ def __init__(
self.single_run = single_run
self.single_run_scenes = single_run_scenes
self.benchmark_duration = benchmark_duration
self.resource_config = resource_config
os.makedirs(self.results_dir, exist_ok=True)

def _services_running(self) -> bool:
Expand Down Expand Up @@ -1029,7 +1074,8 @@ def run(self) -> StreamDensityResult:
logger.info("Services already running — skipping scaling for single benchmark")
else:
# Scale to desired scene count
_scale_pipeline_services(self.app_dir, num_scenes, wait=self.init_duration)
_scale_pipeline_services(self.app_dir, num_scenes, wait=self.init_duration,
resource_config=self.resource_config)

# Wait for data collection
if self.single_run:
Expand Down Expand Up @@ -1250,6 +1296,7 @@ def cmd_run(args) -> None:
single_run=args.single_run,
single_run_scenes=args.scenes,
benchmark_duration=args.benchmark_duration,
resource_config=args.resource_config,
)
result = tester.run()
sys.exit(0 if result.met_target else 1)
Expand All @@ -1260,7 +1307,7 @@ def cmd_generate(args) -> None:
_set_stream_density(args.app_dir, num)
_generate_dlstreamer_config(args.app_dir, num)
_generate_cameras_override(args.app_dir, num)
_reinit_env(args.app_dir)
_reinit_env(args.app_dir, resource_config=args.resource_config)
print(f"Generated overrides for {num} scene(s). Run 'make demo' to start.")


Expand All @@ -1275,7 +1322,7 @@ def cmd_clean(args) -> None:
_set_stream_density(app_dir, 1)
_generate_dlstreamer_config(app_dir, 1)
_clean_cameras_override(app_dir)
_reinit_env(app_dir)
_reinit_env(app_dir, resource_config=getattr(args, "resource_config", ""))
print("Cleaned up – stream_density reset to 1.")


Expand Down Expand Up @@ -1317,17 +1364,27 @@ def main() -> None:
"Exits early when an alert is received.")
p_run.add_argument("--scenes", type=int, default=1,
help="Number of scenes for single-run mode")
p_run.add_argument("--resource_config", default="",
help="Absolute path to device resource config "
"(e.g. /path/to/configs/res/all-gpu.env). "
"Passed as RESOURCE_CONFIG to init.sh on every "
"re-init so device and precision are preserved "
"across stream-density iterations.")
p_run.set_defaults(func=cmd_run)

# --- generate ---
p_gen = sub.add_parser("generate", help="Generate overrides for N scenes")
p_gen.add_argument("app_dir")
p_gen.add_argument("--scenes", type=int, default=1)
p_gen.add_argument("--resource_config", default="",
help="Absolute path to device resource config file.")
p_gen.set_defaults(func=cmd_generate)

# --- clean ---
p_clean = sub.add_parser("clean", help="Revert to single scene")
p_clean.add_argument("app_dir")
p_clean.add_argument("--resource_config", default="",
help="Absolute path to device resource config file.")
p_clean.set_defaults(func=cmd_clean)

# --- down ---
Expand Down
Loading