Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
158 changes: 71 additions & 87 deletions services/detection/detection.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
"""
detector.py YOLOv8/v9 frame-level object detection.
detection.py YOLOv8/v9 frame-level object detection.

Usage (CLI):
python detector.py --source data/sample_videos/sample.mp4
python detector.py --source 0 # webcam
python detection.py --source data/sample_videos/sample.mp4
python detection.py --source 0 # webcam

Usage (API):
from services.detection.detector import Detector
from services.detection.detection import Detector
detector = Detector()
results = detector.detect(frame)
"""
Expand Down Expand Up @@ -48,13 +48,20 @@ class DetectionFrame:
logger = logging.getLogger(__name__)


# ─── Detector Class ──────────────────────────────────────────────────────────

class Detector:
"""Wraps a YOLO model for frame-by-frame inference."""
"""YOLOv8/v9 wrapper for frame-level object detection.

Runs inference on individual BGR frames and returns structured
DetectionFrameSchema objects with bounding boxes, labels, confidence
scores, and zone memberships.

Attributes:
PERSON_CLASS_ID: YOLO class index for 'person'.
TARGET_LABELS: Set of object labels to retain from YOLO output.
"""

PERSON_CLASS_ID = 0 # COCO class ID for 'person'
TARGET_LABELS = { # labels to pass downstream (filter noise)
PERSON_CLASS_ID = 0
TARGET_LABELS = {
"person", "backpack", "handbag", "cell phone", "laptop"
}

Expand Down Expand Up @@ -89,13 +96,20 @@ def __init__(
self.model = YOLO(model_name)
self.conf = confidence_threshold
self.device = device

def detect(self, frame: np.ndarray, frame_id: int = 0) -> DetectionFrame:
"""Run YOLO inference on a single frame.

Performs object detection on the provided BGR image, filters
detections to supported target labels, assigns zone memberships,
and returns the results in a structured DetectionFrame object.
def detect(self, frame: np.ndarray, frame_id: int = 0) -> DetectionFrameSchema:
"""Run YOLO inference on a single BGR frame.

Args:
frame: BGR image as numpy array (H, W, 3).
frame_id: Frame index for downstream tracking.

Returns:
DetectionFrameSchema with all detected objects and zone memberships.

Example:
detector = Detector()
det_frame = detector.detect(frame, frame_id=42)
"""
results = self.model(frame, device=self.device, verbose=False)
detections: list[Detection] = []
Expand All @@ -121,60 +135,48 @@ def detect(self, frame: np.ndarray, frame_id: int = 0) -> DetectionFrame:
x1, y1, x2, y2 = box.tolist()
cx, cy = (x1 + x2) / 2, (y1 + y2) / 2

zones = [z.name for z in get_zones_for_point(cx, cy, zones=active_zones)]
_ = [z.name for z in get_zones_for_point(cx, cy)]

# Use simple list bbox to match local Detection dataclass
detections.append(Detection(
detections.append(DetectionSchema(
label=label,
bbox=[x1, y1, x2, y2],
confidence=float(conf),
center=(cx, cy),
zones_present=zones,
class_id=int(cls_id),
))

return DetectionFrame(
return DetectionFrameSchema(
frame_id=frame_id,
detections=detections,
timestamp_ms=cv2.getTickCount() / cv2.getTickFrequency() * 1000,
)


# ─── Rendering ────────────────────────────────────────────────────────────────

# Drawing colour per detection label, as 3-int tuples handed straight to the
# cv2 drawing calls in draw_detections. Labels missing from this table fall
# back to the default supplied at the lookup site.
# NOTE(review): tuples are presumably in BGR order, matching the BGR frames
# they are drawn on; confirm before reusing them elsewhere.
# Each label appears exactly once: repeating a key in a dict literal silently
# keeps only the last value, which hides accidental divergence between copies.
LABEL_COLORS: dict[str, tuple[int, int, int]] = {
    "person": (0, 120, 255),
    "backpack": (255, 165, 0),
    "handbag": (255, 165, 0),
    "cell phone": (0, 200, 200),
    "laptop": (200, 0, 200),
}

def draw_detections(frame: np.ndarray, det_frame: DetectionFrame) -> np.ndarray:
"""Render detections and zone overlays on a frame.

Draws configured zone polygons, object bounding boxes, labels,
confidence scores, centroids, and frame statistics on a copy
of the input image.
def draw_detections(frame: np.ndarray, det_frame: DetectionFrameSchema) -> np.ndarray:
"""Draw bounding boxes, labels, and zone overlays onto a BGR frame.

Args:
frame: Original BGR image frame.
det_frame: Detection results generated by the detector.
Args:
frame: Original BGR image as numpy array (H, W, 3).
det_frame: DetectionFrameSchema containing all detected objects.

Returns:
Annotated image containing visualized detections and zones.
Returns:
Annotated BGR frame with boxes, labels, zones, and HUD overlay.

Example:
>>> annotated = draw_detections(frame, det_frame)
>>> cv2.imshow("Detections", annotated)
"""
Example:
annotated = draw_detections(frame, det_frame)
cv2.imshow("Output", annotated)
"""
out = frame.copy()

active_zones = get_zones()

# Draw zone polygons
for zone in active_zones:
if not getattr(zone, 'valid', True):
continue
for zone in DEFAULT_ZONES:
pts = zone.as_array().reshape((-1, 1, 2))
overlay = out.copy()
cv2.fillPoly(overlay, [pts], zone.color_bgr)
Expand All @@ -183,47 +185,41 @@ def draw_detections(frame: np.ndarray, det_frame: DetectionFrame) -> np.ndarray:
cv2.putText(out, zone.name, zone.polygon[0],
cv2.FONT_HERSHEY_SIMPLEX, 0.5, zone.color_bgr, 1)

# Draw detections
for det in det_frame.detections:
x1, y1, x2, y2 = int(det.bbox.x1), int(det.bbox.y1), int(det.bbox.x2), int(det.bbox.y2)
cx, cy = det.bbox.center
color = LABEL_COLORS.get(det.label, (200, 200, 200))
cv2.rectangle(out, (x1, y1), (x2, y2), color, 2)

label_text = f"{det.label} {det.confidence:.2f}"
if det.zones_present:
label_text += f" [{', '.join(det.zones_present)}]"

cv2.putText(out, label_text, (x1, y1 - 8),
cv2.FONT_HERSHEY_SIMPLEX, 0.55, color, 2)

# Centroid dot
cv2.circle(out, (int(det.center[0]), int(det.center[1])), 4, color, -1)
cv2.circle(out, (int(cx), int(cy)), 4, color, -1)

# HUD
cv2.putText(out, f"Frame: {det_frame.frame_id} | Detections: {len(det_frame.detections)}",
(10, 28), cv2.FONT_HERSHEY_SIMPLEX, 0.65, (255, 255, 255), 2)
cv2.putText(
out,
f"Frame: {det_frame.frame_id} | Detections: {len(det_frame.detections)}",
(10, 28),
cv2.FONT_HERSHEY_SIMPLEX,
0.65,
(255, 255, 255),
2,
)

return out


# ─── CLI Entry Point ─────────────────────────────────────────────────────────
"""Run the detection pipeline as a standalone CLI application.

Parses command-line arguments, loads a YOLO detector, processes
frames from a webcam or video source, generates scene graphs,
renders detection overlays, and optionally writes output video.

Args:
None
def main() -> None:
"""CLI entry point for running the detection demo on video or webcam.

Returns:
None
Parses arguments, initializes the Detector, and runs the inference loop.
Optionally writes annotated output to a video file.

Example:
>>> python detector.py --source data/sample_videos/sample.mp4
>>> python detector.py --source 0 --model yolov8n.pt
"""
def main() -> None:
Example:
python detection.py --source data/sample_videos/sample.mp4 --output out.mp4
"""
parser = argparse.ArgumentParser(description="Run Agentic Vision detection demo")
parser.add_argument("--source", default="0", help="Video file path or camera index")
parser.add_argument("--model", default=settings.detector_model, help="YOLO model name")
Expand All @@ -239,7 +235,7 @@ def main() -> None:
raise RuntimeError(f"Cannot open source: {source}")

fps = cap.get(cv2.CAP_PROP_FPS) or 30
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
logger.info(f"Stream: {width}x{height} @ {fps:.1f} FPS")

Expand All @@ -255,21 +251,9 @@ def main() -> None:
break

det_frame = detector.detect(frame, frame_id=frame_id)
builder = SceneGraph(det_frame)

builder.build_graph()
graph_text = builder.serialize_graph()

if frame_id % 30 == 0 and graph_text:
prompt = build_reasoning_prompt(graph_text)
print("\nLLM PROMPT:\n")
print(prompt)



annotated = draw_detections(frame, det_frame)
annotated = draw_detections(frame, det_frame)

cv2.imshow("Agentic Vision Detection", annotated)
cv2.imshow("Agentic Vision Detection", annotated)
if writer:
writer.write(annotated)

Expand Down
19 changes: 18 additions & 1 deletion services/memory/memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,17 +200,27 @@ def _handle_dead(self, event: TrackLifecycleEvent) -> None:

@staticmethod
def _track_key(camera_id: str, track_id: int) -> str:
"""Return the Redis key for a per-track state blob."""
return f"track:{camera_id}:{track_id}"

@staticmethod
def _event_key(camera_id: str, frame_id: int) -> str:
"""Return the Redis key for a per-frame event list."""
return f"event:{camera_id}:{frame_id}"

def _load_record(self, camera_id: str, track_id: int) -> Optional[dict]:
"""Load and deserialise a track record from Redis, or return None."""
raw = self._r.get(self._track_key(camera_id, track_id))
return json.loads(raw) if raw else None

def _update_record(self, event: TrackLifecycleEvent, state: str, anomalous: bool = False) -> None:
def _update_record(self, event: TrackLifecycleEvent, state: str) -> None:
"""
Update a track record's state and timing fields in Redis, starting from an empty record when none is stored yet.

Args:
event: Source lifecycle event supplying updated field values.
state: New state string (e.g. 'LOST', 'DEAD').
"""
record = self._load_record(event.camera_id, event.track_id) or {}
record.update(
{
Expand All @@ -234,6 +244,13 @@ def _append_event(
global_id: Optional[str],
anomalous: bool = False,
) -> None:
"""
Append a lifecycle event dict to the per-frame Redis event log.

Args:
event: Source lifecycle event.
global_id: Assigned global identity string, or None.
anomalous: Anomaly flag for the event; defaults to False.
"""
key = self._event_key(event.camera_id, event.frame_id)
raw = self._r.get(key)
evts: list[dict] = json.loads(raw) if raw else []
Expand Down
Loading
Loading