From 7c7374062a7a501d8f5ed397e030d0d6fa583db2 Mon Sep 17 00:00:00 2001 From: Riaz Virani Date: Thu, 9 Apr 2026 22:42:54 -0400 Subject: [PATCH 1/9] feat: add vision events SDK and CLI support (ENT-1082) Adds a thin-client adapter for the vision events API with no client-side validation, so new event types and fields work without SDK releases. Includes SDK methods on Workspace, typer CLI commands, and 21 tests. Endpoints: write, write-batch, query, list use cases, upload image. Co-Authored-By: Claude Opus 4.6 (1M context) --- roboflow/adapters/vision_events_api.py | 147 +++++++++ roboflow/cli/__init__.py | 2 + roboflow/cli/handlers/vision_events.py | 269 +++++++++++++++++ roboflow/core/workspace.py | 215 ++++++++++++- tests/test_vision_events.py | 399 +++++++++++++++++++++++++ 5 files changed, 1031 insertions(+), 1 deletion(-) create mode 100644 roboflow/adapters/vision_events_api.py create mode 100644 roboflow/cli/handlers/vision_events.py create mode 100644 tests/test_vision_events.py diff --git a/roboflow/adapters/vision_events_api.py b/roboflow/adapters/vision_events_api.py new file mode 100644 index 00000000..0f7a0396 --- /dev/null +++ b/roboflow/adapters/vision_events_api.py @@ -0,0 +1,147 @@ +import json +import os +from typing import Any, Dict, List, Optional + +import requests +from requests_toolbelt.multipart.encoder import MultipartEncoder + +from roboflow.adapters.rfapi import RoboflowError +from roboflow.config import API_URL + +_BASE = f"{API_URL}/vision-events" + + +def _auth_headers(api_key: str) -> Dict[str, str]: + return {"Authorization": f"Bearer {api_key}"} + + +def write_event(api_key: str, event: Dict[str, Any]) -> dict: + """Create a single vision event. + + Args: + api_key: Roboflow API key. + event: Event payload dict (eventId, eventType, useCaseId, timestamp, etc.). + + Returns: + Parsed JSON response with ``eventId`` and ``created``. + + Raises: + RoboflowError: On non-201 response status codes. + """ + response = requests.post(_BASE, json=event, headers=_auth_headers(api_key)) + if response.status_code != 201: + raise RoboflowError(response.text) + return response.json() + + +def write_batch(api_key: str, events: List[Dict[str, Any]]) -> dict: + """Create multiple vision events in a single request. + + Args: + api_key: Roboflow API key. + events: List of event payload dicts (max 100 per the server). + + Returns: + Parsed JSON response with ``created`` count and ``eventIds``. + + Raises: + RoboflowError: On non-201 response status codes. + """ + response = requests.post( + f"{_BASE}/batch", + json={"events": events}, + headers=_auth_headers(api_key), + ) + if response.status_code != 201: + raise RoboflowError(response.text) + return response.json() + + +def query(api_key: str, query_params: Dict[str, Any]) -> dict: + """Query vision events with filters and pagination. + + Args: + api_key: Roboflow API key. + query_params: Query payload (useCaseId, eventType, startTime, endTime, + cursor, limit, customMetadataFilters, etc.). + + Returns: + Parsed JSON response with ``events``, ``nextCursor``, ``hasMore``, + and ``lookbackDays``. + + Raises: + RoboflowError: On non-200 response status codes. + """ + response = requests.post( + f"{_BASE}/query", + json=query_params, + headers=_auth_headers(api_key), + ) + if response.status_code != 200: + raise RoboflowError(response.text) + return response.json() + + +def list_use_cases(api_key: str, status: Optional[str] = None) -> dict: + """List all use cases for a workspace. + + Args: + api_key: Roboflow API key. + status: Optional status filter (default server-side: "active"). + + Returns: + Parsed JSON response with ``useCases`` list and ``lookbackDays``. + + Raises: + RoboflowError: On non-200 response status codes. + """ + params: Dict[str, str] = {} + if status is not None: + params["status"] = status + response = requests.get( + f"{_BASE}/use-cases", + params=params, + headers=_auth_headers(api_key), + ) + if response.status_code != 200: + raise RoboflowError(response.text) + return response.json() + + +def upload_image( + api_key: str, + image_path: str, + name: Optional[str] = None, + metadata: Optional[Dict[str, Any]] = None, +) -> dict: + """Upload an image for use in vision events. + + Args: + api_key: Roboflow API key. + image_path: Local filesystem path to the image file. + name: Optional custom image name. + metadata: Optional flat dict of metadata to attach. + + Returns: + Parsed JSON response with ``sourceId`` (and optionally ``url``). + + Raises: + RoboflowError: On non-201 response status codes. + """ + filename = name or os.path.basename(image_path) + with open(image_path, "rb") as f: + fields: Dict[str, Any] = { + "file": (filename, f, "application/octet-stream"), + } + if name is not None: + fields["name"] = name + if metadata is not None: + fields["metadata"] = json.dumps(metadata) + m = MultipartEncoder(fields=fields) + headers = _auth_headers(api_key) + headers["Content-Type"] = m.content_type + response = requests.post(f"{_BASE}/upload", data=m, headers=headers) + + if response.status_code != 201: + raise RoboflowError(response.text) + return response.json() diff --git a/roboflow/cli/__init__.py b/roboflow/cli/__init__.py index 3b3d0c42..45d4cce8 100644 --- a/roboflow/cli/__init__.py +++ b/roboflow/cli/__init__.py @@ -185,6 +185,7 @@ def _walk(group: Any, prefix: str = "") -> None: from roboflow.cli.handlers.universe import universe_app # noqa: E402 from roboflow.cli.handlers.version import version_app # noqa: E402 from roboflow.cli.handlers.video import video_app # noqa: E402 +from roboflow.cli.handlers.vision_events import vision_events_app # noqa: E402 from roboflow.cli.handlers.workflow import workflow_app # noqa: E402 from roboflow.cli.handlers.workspace import workspace_app # noqa: E402 @@ -210,6 +211,7 @@ def _walk(group: Any, prefix: str = "") -> None: app.add_typer(universe_app, name="universe") app.add_typer(version_app, name="version") app.add_typer(video_app, name="video") +app.add_typer(vision_events_app, name="vision-events") app.add_typer(workflow_app, name="workflow") app.add_typer(workspace_app, name="workspace") diff --git a/roboflow/cli/handlers/vision_events.py b/roboflow/cli/handlers/vision_events.py new file mode 100644 index 00000000..17487fd1 --- /dev/null +++ b/roboflow/cli/handlers/vision_events.py @@ -0,0 +1,269 @@ +"""Vision events commands: write, query, list use cases, and upload images.""" + +from __future__ import annotations + +from typing import Annotated, Optional + +import typer + +from roboflow.cli._compat import SortedGroup, ctx_to_args + +vision_events_app = typer.Typer( + help="Create, query, and manage vision events.", + cls=SortedGroup, + no_args_is_help=True, +) + + +def _resolve(args): # noqa: ANN001 + """Return api_key or call output_error and return None.""" + from roboflow.cli._resolver import resolve_ws_and_key + + resolved = resolve_ws_and_key(args) + if resolved is None: + return None + _ws, api_key = resolved + return api_key + + +# --------------------------------------------------------------------------- +# write +# --------------------------------------------------------------------------- + + +@vision_events_app.command("write") +def write( + ctx: typer.Context, + event: Annotated[str, typer.Argument(help="JSON string of the event payload")], +) -> None: + """Create a single vision event.""" + args = ctx_to_args(ctx, event=event) + _write(args) + + +def _write(args) -> None: # noqa: ANN001 + import json + + from roboflow.adapters import vision_events_api + from roboflow.adapters.rfapi import RoboflowError + from roboflow.cli._output import output, output_error + + api_key = _resolve(args) + if api_key is None: + return + + try: + event = json.loads(args.event) + except (json.JSONDecodeError, TypeError) as exc: + output_error(args, f"Invalid JSON: {exc}", hint="Pass a valid JSON string.") + return + + try: + result = vision_events_api.write_event(api_key, event) + except RoboflowError as exc: + output_error(args, str(exc)) + return + + output(args, result, text=f"Created event {result.get('eventId', '')}") + + +# --------------------------------------------------------------------------- +# write-batch +# --------------------------------------------------------------------------- + + +@vision_events_app.command("write-batch") +def write_batch( + ctx: typer.Context, + events: Annotated[str, typer.Argument(help="JSON string of the events array")], +) -> None: + """Create multiple vision events in a single request.""" + args = ctx_to_args(ctx, events=events) + _write_batch(args) + + +def _write_batch(args) -> None: # noqa: ANN001 + import json + + from roboflow.adapters import vision_events_api + from roboflow.adapters.rfapi import RoboflowError + from roboflow.cli._output import output, output_error + + api_key = _resolve(args) + if api_key is None: + return + + try: + events = json.loads(args.events) + except (json.JSONDecodeError, TypeError) as exc: + output_error(args, f"Invalid JSON: {exc}", hint="Pass a valid JSON array string.") + return + + try: + result = vision_events_api.write_batch(api_key, events) + except RoboflowError as exc: + output_error(args, str(exc)) + return + + output(args, result, text=f"Created {result.get('created', 0)} event(s)") + + +# --------------------------------------------------------------------------- +# query +# --------------------------------------------------------------------------- + + +@vision_events_app.command("query") +def query( + ctx: typer.Context, + use_case: Annotated[str, typer.Argument(help="Use case identifier to query")], + event_type: Annotated[Optional[str], typer.Option("-t", "--event-type", help="Filter by event type")] = None, + start_time: Annotated[Optional[str], typer.Option("--start", help="ISO 8601 start time")] = None, + end_time: Annotated[Optional[str], typer.Option("--end", help="ISO 8601 end time")] = None, + limit: Annotated[Optional[int], typer.Option("-l", "--limit", help="Max events to return")] = None, + cursor: Annotated[Optional[str], typer.Option("--cursor", help="Pagination cursor")] = None, +) -> None: + """Query vision events with filters and pagination.""" + args = ctx_to_args( + ctx, + use_case=use_case, + event_type=event_type, + start_time=start_time, + end_time=end_time, + limit=limit, + cursor=cursor, + ) + _query(args) + + +def _query(args) -> None: # noqa: ANN001 + from roboflow.adapters import vision_events_api + from roboflow.adapters.rfapi import RoboflowError + from roboflow.cli._output import output, output_error + + api_key = _resolve(args) + if api_key is None: + return + + payload = {"useCaseId": args.use_case} + if args.event_type is not None: + payload["eventType"] = args.event_type + if args.start_time is not None: + payload["startTime"] = args.start_time + if args.end_time is not None: + payload["endTime"] = args.end_time + if args.limit is not None: + payload["limit"] = args.limit + if args.cursor is not None: + payload["cursor"] = args.cursor + + try: + result = vision_events_api.query(api_key, payload) + except RoboflowError as exc: + output_error(args, str(exc)) + return + + events = result.get("events", []) + lines = [f"Found {len(events)} event(s)."] + for evt in events: + lines.append(f" {evt.get('eventId', '')} [{evt.get('eventType', '')}]") + if result.get("nextCursor"): + lines.append(f"\nNext page: --cursor {result['nextCursor']}") + + output(args, result, text="\n".join(lines)) + + +# --------------------------------------------------------------------------- +# use-cases +# --------------------------------------------------------------------------- + + +@vision_events_app.command("use-cases") +def use_cases( + ctx: typer.Context, + status: Annotated[Optional[str], typer.Option("-s", "--status", help="Filter by status (active, inactive)")] = None, +) -> None: + """List vision event use cases for the workspace.""" + args = ctx_to_args(ctx, status=status) + _use_cases(args) + + +def _use_cases(args) -> None: # noqa: ANN001 + from roboflow.adapters import vision_events_api + from roboflow.adapters.rfapi import RoboflowError + from roboflow.cli._output import output, output_error + + api_key = _resolve(args) + if api_key is None: + return + + try: + result = vision_events_api.list_use_cases(api_key, status=args.status) + except RoboflowError as exc: + output_error(args, str(exc)) + return + + items = result.get("useCases") or result.get("solutions", []) + lines = [f"{len(items)} use case(s):"] + for uc in items: + name = uc.get("name", uc.get("id", "")) + if uc.get("eventCount") is not None: + detail = f" ({uc['eventCount']} events)" + elif uc.get("status"): + detail = f" [{uc['status']}]" + else: + detail = "" + lines.append(f" {name}{detail}") + + output(args, result, text="\n".join(lines)) + + +# --------------------------------------------------------------------------- +# upload-image +# --------------------------------------------------------------------------- + + +@vision_events_app.command("upload-image") +def upload_image( + ctx: typer.Context, + image: Annotated[str, typer.Argument(help="Path to the image file")], + name: Annotated[Optional[str], typer.Option("-n", "--name", help="Custom image name")] = None, + metadata: Annotated[ + Optional[str], + typer.Option("-M", "--metadata", help='JSON string of metadata (e.g. \'{"camera_id":"cam001"}\')'), + ] = None, +) -> None: + """Upload an image for use in vision events.""" + args = ctx_to_args(ctx, image=image, name=name, metadata=metadata) + _upload_image(args) + + +def _upload_image(args) -> None: # noqa: ANN001 + import json + + from roboflow.adapters import vision_events_api + from roboflow.adapters.rfapi import RoboflowError + from roboflow.cli._output import output, output_error + + api_key = _resolve(args) + if api_key is None: + return + + try: + parsed_metadata = json.loads(args.metadata) if args.metadata else None + except (json.JSONDecodeError, TypeError) as exc: + output_error(args, f"Invalid metadata JSON: {exc}", hint="Pass a valid JSON string.") + return + + try: + result = vision_events_api.upload_image( + api_key, + image_path=args.image, + name=args.name, + metadata=parsed_metadata, + ) + except RoboflowError as exc: + output_error(args, str(exc)) + return + + output(args, result, text=f"Uploaded image: sourceId={result.get('sourceId', '')}") diff --git a/roboflow/core/workspace.py b/roboflow/core/workspace.py index 698aa59d..0af3f7c3 100644 --- a/roboflow/core/workspace.py +++ b/roboflow/core/workspace.py @@ -13,7 +13,7 @@ from requests.exceptions import HTTPError from tqdm import tqdm -from roboflow.adapters import rfapi +from roboflow.adapters import rfapi, vision_events_api from roboflow.adapters.rfapi import AnnotationSaveError, ImageUploadError, RoboflowError from roboflow.config import API_URL, APP_URL, CLIP_FEATURIZE_URL, DEMO_KEYS from roboflow.core.project import Project @@ -938,6 +938,219 @@ def get_plan(self): return rfapi.get_plan_info(self.__api_key) + # --- Vision Events --- + + def write_vision_event(self, event: Dict[str, Any]) -> dict: + """Create a single vision event. + + The event dict is passed directly to the server with no client-side + validation, so new event types and fields work without an SDK update. + + Args: + event: Event payload containing at minimum ``eventId``, + ``eventType``, ``useCaseId``, and ``timestamp``. + + Returns: + Dict with ``eventId`` and ``created``. + + Example: + >>> ws = rf.workspace() + >>> ws.write_vision_event({ + ... "eventId": "evt-001", + ... "eventType": "quality_check", + ... "useCaseId": "manufacturing-qa", + ... "timestamp": "2024-01-15T10:30:00.000Z", + ... "eventData": {"result": "pass"}, + ... }) + """ + return vision_events_api.write_event( + api_key=self.__api_key, + event=event, + ) + + def write_vision_events_batch(self, events: List[Dict[str, Any]]) -> dict: + """Create multiple vision events in a single request. + + Args: + events: List of event payload dicts (server enforces max 100). + + Returns: + Dict with ``created`` count and ``eventIds`` list. + + Example: + >>> ws = rf.workspace() + >>> ws.write_vision_events_batch([ + ... {"eventId": "e1", "eventType": "custom", "useCaseId": "uc", "timestamp": "2024-01-15T10:00:00Z"}, + ... {"eventId": "e2", "eventType": "custom", "useCaseId": "uc", "timestamp": "2024-01-15T10:01:00Z"}, + ... ]) + """ + return vision_events_api.write_batch( + api_key=self.__api_key, + events=events, + ) + + def query_vision_events( + self, + use_case: str, + *, + event_type: Optional[str] = None, + event_types: Optional[List[str]] = None, + start_time: Optional[str] = None, + end_time: Optional[str] = None, + limit: Optional[int] = None, + cursor: Optional[str] = None, + **filters: Any, + ) -> dict: + """Query vision events with filters and pagination. + + Common filter kwargs are passed through to the server as-is, + supporting ``deviceId``, ``streamId``, ``workflowId``, + ``customMetadataFilters``, ``eventFieldFilters``, etc. + + Args: + use_case: Use case identifier to query. + event_type: Filter by a single event type. + event_types: Filter by multiple event types. + start_time: ISO 8601 start time filter. + end_time: ISO 8601 end time filter. + limit: Maximum number of events to return. + cursor: Pagination cursor from a previous response. + **filters: Additional filter parameters passed to the API. + + Returns: + Dict with ``events``, ``nextCursor``, ``hasMore``, and ``lookbackDays``. + + Example: + >>> ws = rf.workspace() + >>> page = ws.query_vision_events("manufacturing-qa", event_type="quality_check", limit=50) + >>> for evt in page["events"]: + ... print(evt["eventId"]) + """ + payload: Dict[str, Any] = {"useCaseId": use_case} + if event_type is not None: + payload["eventType"] = event_type + if event_types is not None: + payload["eventTypes"] = event_types + if start_time is not None: + payload["startTime"] = start_time + if end_time is not None: + payload["endTime"] = end_time + if limit is not None: + payload["limit"] = limit + if cursor is not None: + payload["cursor"] = cursor + payload.update(filters) + + return vision_events_api.query( + api_key=self.__api_key, + query_params=payload, + ) + + def query_all_vision_events( + self, + use_case: str, + *, + event_type: Optional[str] = None, + event_types: Optional[List[str]] = None, + start_time: Optional[str] = None, + end_time: Optional[str] = None, + limit: Optional[int] = None, + **filters: Any, + ) -> Generator[List[dict], None, None]: + """Paginated query across vision events, yielding one page at a time. + + Automatically follows ``nextCursor`` until all matching events have + been returned. + + Args: + use_case: Use case identifier to query. + event_type: Filter by a single event type. + event_types: Filter by multiple event types. + start_time: ISO 8601 start time filter. + end_time: ISO 8601 end time filter. + limit: Maximum events per page. + **filters: Additional filter parameters passed to the API. + + Yields: + A list of event dicts for each page. + + Example: + >>> ws = rf.workspace() + >>> for page in ws.query_all_vision_events("manufacturing-qa"): + ... for evt in page: + ... print(evt["eventId"]) + """ + cursor = None + while True: + response = self.query_vision_events( + use_case, + event_type=event_type, + event_types=event_types, + start_time=start_time, + end_time=end_time, + limit=limit, + cursor=cursor, + **filters, + ) + events = response.get("events", []) + if not events: + break + yield events + cursor = response.get("nextCursor") + if not cursor or not response.get("hasMore", False): + break + + def list_vision_event_use_cases(self, status: Optional[str] = None) -> dict: + """List all vision event use cases for the workspace. + + Args: + status: Optional status filter (e.g. "active", "inactive"). + + Returns: + Dict with ``useCases`` list and ``lookbackDays``. + + Example: + >>> ws = rf.workspace() + >>> result = ws.list_vision_event_use_cases() + >>> for uc in result["useCases"]: + ... print(uc["name"], uc.get("status")) + """ + result = vision_events_api.list_use_cases( + api_key=self.__api_key, + status=status, + ) + if "useCases" not in result and "solutions" in result: + result["useCases"] = result["solutions"] + return result + + def upload_vision_event_image( + self, + image_path: str, + name: Optional[str] = None, + metadata: Optional[Dict[str, Any]] = None, + ) -> dict: + """Upload an image for use in vision events. + + Args: + image_path: Local path to the image file. + name: Optional custom name for the image. + metadata: Optional flat dict of metadata to attach. + + Returns: + Dict with ``sourceId`` for referencing in events. + + Example: + >>> ws = rf.workspace() + >>> result = ws.upload_vision_event_image("photo.jpg") + >>> source_id = result["sourceId"] + """ + return vision_events_api.upload_image( + api_key=self.__api_key, + image_path=image_path, + name=name, + metadata=metadata, + ) + def __str__(self): projects = self.projects() json_value = {"name": self.name, "url": self.url, "projects": projects} diff --git a/tests/test_vision_events.py b/tests/test_vision_events.py new file mode 100644 index 00000000..5eefa93d --- /dev/null +++ b/tests/test_vision_events.py @@ -0,0 +1,399 @@ +import json +import os +import tempfile +import unittest + +import responses + +from roboflow.adapters.rfapi import RoboflowError +from roboflow.config import API_URL + +# The vision events API does not include workspace in the URL. +# Auth is via Bearer token; workspace is derived server-side from the API key. +_BASE = f"{API_URL}/vision-events" + + +class TestVisionEvents(unittest.TestCase): + API_KEY = "test_key" + WORKSPACE = "test-ws" + + def _make_workspace(self): + from roboflow.core.workspace import Workspace + + info = { + "workspace": { + "name": "Test", + "url": self.WORKSPACE, + "projects": [], + "members": [], + } + } + return Workspace(info, api_key=self.API_KEY, default_workspace=self.WORKSPACE, model_format="yolov8") + + def _assert_bearer_auth(self, call_index=0): + auth = responses.calls[call_index].request.headers.get("Authorization") + self.assertEqual(auth, f"Bearer {self.API_KEY}") + + # --- write_vision_event --- + + @responses.activate + def test_write_event(self): + responses.add(responses.POST, _BASE, json={"eventId": "evt-001"}, status=201) + + ws = self._make_workspace() + event = { + "eventId": "evt-001", + "eventType": "quality_check", + "useCaseId": "uc-1", + "timestamp": "2024-01-15T10:00:00Z", + "eventData": {"result": "pass"}, + } + result = ws.write_vision_event(event) + + self.assertEqual(result["eventId"], "evt-001") + self._assert_bearer_auth() + sent = json.loads(responses.calls[0].request.body) + self.assertEqual(sent["eventId"], "evt-001") + self.assertEqual(sent["eventType"], "quality_check") + self.assertEqual(sent["useCaseId"], "uc-1") + self.assertEqual(sent["eventData"], {"result": "pass"}) + + @responses.activate + def test_write_event_passthrough(self): + """The event dict must be sent to the server unchanged (no filtering or transformation).""" + responses.add(responses.POST, _BASE, json={"eventId": "e1"}, status=201) + + ws = self._make_workspace() + event = { + "eventId": "e1", + "eventType": "safety_alert", + "useCaseId": "warehouse-safety", + "timestamp": "2024-06-01T12:00:00Z", + "deviceId": "cam-5", + "streamId": "stream-a", + "workflowId": "wf-1", + "images": [{"sourceId": "src-1", "label": "frame"}], + "eventData": {"alertType": "fire", "severity": "high"}, + "customMetadata": {"zone": "B3", "temperature": 42.5, "active": True}, + } + ws.write_vision_event(event) + + sent = json.loads(responses.calls[0].request.body) + self.assertEqual(sent, event) + + @responses.activate + def test_write_event_error(self): + responses.add(responses.POST, _BASE, json={"error": "forbidden"}, status=403) + + ws = self._make_workspace() + with self.assertRaises(RoboflowError): + ws.write_vision_event({"eventId": "x", "eventType": "custom", "useCaseId": "s", "timestamp": "t"}) + + # --- write_vision_events_batch --- + + @responses.activate + def test_write_batch(self): + responses.add(responses.POST, f"{_BASE}/batch", json={"created": 2, "eventIds": ["e1", "e2"]}, status=201) + + ws = self._make_workspace() + events = [ + {"eventId": "e1", "eventType": "custom", "useCaseId": "s", "timestamp": "2024-01-15T10:00:00Z"}, + {"eventId": "e2", "eventType": "custom", "useCaseId": "s", "timestamp": "2024-01-15T10:01:00Z"}, + ] + result = ws.write_vision_events_batch(events) + + self.assertEqual(result["created"], 2) + self.assertEqual(result["eventIds"], ["e1", "e2"]) + self._assert_bearer_auth() + sent = json.loads(responses.calls[0].request.body) + self.assertEqual(len(sent["events"]), 2) + + @responses.activate + def test_write_batch_error(self): + responses.add(responses.POST, f"{_BASE}/batch", json={"error": "validation"}, status=400) + + ws = self._make_workspace() + with self.assertRaises(RoboflowError): + ws.write_vision_events_batch([{"bad": "event"}]) + + # --- query_vision_events --- + + @responses.activate + def test_query_basic(self): + body = { + "events": [{"eventId": "e1"}, {"eventId": "e2"}], + "nextCursor": None, + "hasMore": False, + "lookbackDays": 14, + } + responses.add(responses.POST, f"{_BASE}/query", json=body, status=200) + + ws = self._make_workspace() + result = ws.query_vision_events("my-use-case") + + self.assertEqual(len(result["events"]), 2) + self.assertFalse(result["hasMore"]) + self._assert_bearer_auth() + sent = json.loads(responses.calls[0].request.body) + self.assertEqual(sent["useCaseId"], "my-use-case") + + @responses.activate + def test_query_with_filters(self): + body = {"events": [], "nextCursor": None, "hasMore": False, "lookbackDays": 14} + responses.add(responses.POST, f"{_BASE}/query", json=body, status=200) + + ws = self._make_workspace() + ws.query_vision_events( + "my-uc", + event_type="quality_check", + start_time="2024-01-01T00:00:00Z", + end_time="2024-02-01T00:00:00Z", + limit=10, + cursor="abc123", + deviceId={"operator": "eq", "value": "cam-01"}, + ) + + sent = json.loads(responses.calls[0].request.body) + self.assertEqual(sent["useCaseId"], "my-uc") + self.assertEqual(sent["eventType"], "quality_check") + self.assertEqual(sent["startTime"], "2024-01-01T00:00:00Z") + self.assertEqual(sent["endTime"], "2024-02-01T00:00:00Z") + self.assertEqual(sent["limit"], 10) + self.assertEqual(sent["cursor"], "abc123") + self.assertEqual(sent["deviceId"], {"operator": "eq", "value": "cam-01"}) + + @responses.activate + def test_query_with_event_types_plural(self): + body = {"events": [], "nextCursor": None, "hasMore": False, "lookbackDays": 14} + responses.add(responses.POST, f"{_BASE}/query", json=body, status=200) + + ws = self._make_workspace() + ws.query_vision_events("uc", event_types=["quality_check", "safety_alert"]) + + sent = json.loads(responses.calls[0].request.body) + self.assertEqual(sent["eventTypes"], ["quality_check", "safety_alert"]) + self.assertNotIn("eventType", sent) + + @responses.activate + def test_query_omits_none_params(self): + """Optional params that are None must not appear in the payload.""" + body = {"events": [], "nextCursor": None, "hasMore": False, "lookbackDays": 14} + responses.add(responses.POST, f"{_BASE}/query", json=body, status=200) + + ws = self._make_workspace() + ws.query_vision_events("uc") + + sent = json.loads(responses.calls[0].request.body) + self.assertEqual(sent, {"useCaseId": "uc"}) + + @responses.activate + def test_query_error(self): + responses.add(responses.POST, f"{_BASE}/query", json={"error": "unauthorized"}, status=401) + + ws = self._make_workspace() + with self.assertRaises(RoboflowError): + ws.query_vision_events("my-uc") + + # --- query_all_vision_events --- + + @responses.activate + def test_query_all_single_page(self): + body = { + "events": [{"eventId": "e1"}], + "nextCursor": None, + "hasMore": False, + "lookbackDays": 14, + } + responses.add(responses.POST, f"{_BASE}/query", json=body, status=200) + + ws = self._make_workspace() + pages = list(ws.query_all_vision_events("my-uc")) + + self.assertEqual(len(pages), 1) + self.assertEqual(pages[0][0]["eventId"], "e1") + + @responses.activate + def test_query_all_multiple_pages(self): + page1 = {"events": [{"eventId": "e1"}], "nextCursor": "cursor2", "hasMore": True, "lookbackDays": 14} + page2 = {"events": [{"eventId": "e2"}], "nextCursor": None, "hasMore": False, "lookbackDays": 14} + responses.add(responses.POST, f"{_BASE}/query", json=page1, status=200) + responses.add(responses.POST, f"{_BASE}/query", json=page2, status=200) + + ws = self._make_workspace() + pages = list(ws.query_all_vision_events("my-uc")) + + self.assertEqual(len(pages), 2) + self.assertEqual(pages[0][0]["eventId"], "e1") + self.assertEqual(pages[1][0]["eventId"], "e2") + + # Verify cursor was sent in second request + sent2 = json.loads(responses.calls[1].request.body) + self.assertEqual(sent2["cursor"], "cursor2") + + @responses.activate + def test_query_all_forwards_filters(self): + """Filters must be forwarded to every page request, not just the first.""" + page1 = {"events": [{"eventId": "e1"}], "nextCursor": "c2", "hasMore": True, "lookbackDays": 14} + page2 = {"events": [{"eventId": "e2"}], "nextCursor": None, "hasMore": False, "lookbackDays": 14} + responses.add(responses.POST, f"{_BASE}/query", json=page1, status=200) + responses.add(responses.POST, f"{_BASE}/query", json=page2, status=200) + + ws = self._make_workspace() + list(ws.query_all_vision_events("uc", event_type="quality_check", limit=1)) + + sent1 = json.loads(responses.calls[0].request.body) + sent2 = json.loads(responses.calls[1].request.body) + + # Both requests should have the filter + self.assertEqual(sent1["eventType"], "quality_check") + self.assertEqual(sent2["eventType"], "quality_check") + # Second request should also have the cursor + self.assertNotIn("cursor", sent1) + self.assertEqual(sent2["cursor"], "c2") + + @responses.activate + def test_query_all_empty(self): + body = {"events": [], "nextCursor": None, "hasMore": False, "lookbackDays": 14} + responses.add(responses.POST, f"{_BASE}/query", json=body, status=200) + + ws = self._make_workspace() + pages = list(ws.query_all_vision_events("my-uc")) + + self.assertEqual(len(pages), 0) + + # --- list_vision_event_use_cases --- + + @responses.activate + def test_list_use_cases(self): + body = { + "useCases": [ + {"id": "uc-1", "name": "QA", "status": "active"}, + ], + "lookbackDays": 14, + } + responses.add(responses.GET, f"{_BASE}/use-cases", json=body, status=200) + + ws = self._make_workspace() + result = ws.list_vision_event_use_cases() + + self.assertEqual(len(result["useCases"]), 1) + self.assertEqual(result["useCases"][0]["name"], "QA") + self._assert_bearer_auth() + + @responses.activate + def test_list_use_cases_with_status(self): + body = {"useCases": [], "lookbackDays": 14} + responses.add(responses.GET, f"{_BASE}/use-cases", json=body, status=200) + + ws = self._make_workspace() + result = ws.list_vision_event_use_cases(status="inactive") + + self.assertEqual(len(result["useCases"]), 0) + # Verify status was sent as query param + self.assertIn("status=inactive", responses.calls[0].request.url) + + @responses.activate + def test_list_use_cases_legacy_solutions_response(self): + responses.add( + responses.GET, + f"{_BASE}/use-cases", + json={"solutions": [{"id": "uc-legacy", "name": "Legacy"}], "lookbackDays": 14}, + status=200, + ) + + ws = self._make_workspace() + result = ws.list_vision_event_use_cases() + self.assertEqual(result["useCases"][0]["id"], "uc-legacy") + + @responses.activate + def test_list_use_cases_error(self): + responses.add(responses.GET, f"{_BASE}/use-cases", json={"error": "forbidden"}, status=403) + + ws = self._make_workspace() + with self.assertRaises(RoboflowError): + ws.list_vision_event_use_cases() + + # --- upload_vision_event_image --- + + @responses.activate + def test_upload_image(self): + responses.add(responses.POST, f"{_BASE}/upload", json={"success": True, "sourceId": "src-123"}, status=201) + + ws = self._make_workspace() + + with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as f: + f.write(b"\xff\xd8\xff\xe0fake-jpeg-data") + tmp_path = f.name + + try: + result = ws.upload_vision_event_image(tmp_path) + self.assertEqual(result["sourceId"], "src-123") + self._assert_bearer_auth() + finally: + os.unlink(tmp_path) + + @responses.activate + def test_upload_image_uses_basename(self): + """When no name is provided, the multipart filename should be the basename of the path.""" + responses.add(responses.POST, f"{_BASE}/upload", json={"success": True, "sourceId": "src-789"}, status=201) + + ws = self._make_workspace() + + with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False, prefix="myimage_") as f: + f.write(b"\xff\xd8\xff\xe0fake") + tmp_path = f.name + + try: + ws.upload_vision_event_image(tmp_path) + request_body = responses.calls[0].request.body + basename = os.path.basename(tmp_path).encode() + if isinstance(request_body, bytes): + self.assertIn(basename, request_body) + finally: + os.unlink(tmp_path) + + @responses.activate + def test_upload_image_with_metadata(self): + responses.add(responses.POST, f"{_BASE}/upload", json={"success": True, "sourceId": "src-456"}, status=201) + + ws = self._make_workspace() + + with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as f: + f.write(b"\x89PNGfake-png-data") + tmp_path = f.name + + try: + result = ws.upload_vision_event_image( + tmp_path, + name="custom-name.png", + metadata={"camera_id": "cam-01"}, + ) + self.assertEqual(result["sourceId"], "src-456") + + request_body = responses.calls[0].request.body + # Verify metadata and name were included in the multipart body + if isinstance(request_body, bytes): + self.assertIn(b"cam-01", request_body) + self.assertIn(b"custom-name.png", request_body) + finally: + os.unlink(tmp_path) + + @responses.activate + def test_upload_image_error(self): + responses.add(responses.POST, f"{_BASE}/upload", json={"error": "forbidden"}, status=403) + + ws = self._make_workspace() + + with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as f: + f.write(b"data") + tmp_path = f.name + + try: + with self.assertRaises(RoboflowError): + ws.upload_vision_event_image(tmp_path) + finally: + os.unlink(tmp_path) + + +if __name__ == "__main__": + unittest.main() From d298418f8ac814203ea89cf958e08742361b49a3 Mon Sep 17 00:00:00 2001 From: Riaz Virani Date: Thu, 9 Apr 2026 23:23:13 -0400 Subject: [PATCH 2/9] feat: add custom metadata schema endpoint (ENT-1082) Adds get_custom_metadata_schema to the adapter, get_vision_event_metadata_schema to Workspace, and "roboflow vision-events metadata-schema" CLI command. Co-Authored-By: Claude Opus 4.6 (1M context) --- roboflow/adapters/vision_events_api.py | 22 +++++++++++++++ roboflow/cli/handlers/vision_events.py | 39 ++++++++++++++++++++++++++ roboflow/core/workspace.py | 23 +++++++++++++++ tests/test_vision_events.py | 39 ++++++++++++++++++++++++++ 4 files changed, 123 insertions(+) diff --git a/roboflow/adapters/vision_events_api.py b/roboflow/adapters/vision_events_api.py index 0f7a0396..f43ad34a 100644 --- a/roboflow/adapters/vision_events_api.py +++ b/roboflow/adapters/vision_events_api.py @@ -108,6 +108,28 @@ def list_use_cases(api_key: str, status: Optional[str] = None) -> dict: return response.json() +def get_custom_metadata_schema(api_key: str, use_case_id: str) -> dict: + """Get the custom metadata schema for a use case. + + Args: + api_key: Roboflow API key. + use_case_id: Use case identifier. + + Returns: + Parsed JSON response with ``fields`` mapping field names to their types. + + Raises: + RoboflowError: On non-200 response status codes. + """ + response = requests.get( + f"{_BASE}/custom-metadata-schema/{use_case_id}", + headers=_auth_headers(api_key), + ) + if response.status_code != 200: + raise RoboflowError(response.text) + return response.json() + + def upload_image( api_key: str, image_path: str, diff --git a/roboflow/cli/handlers/vision_events.py b/roboflow/cli/handlers/vision_events.py index 17487fd1..497936c0 100644 --- a/roboflow/cli/handlers/vision_events.py +++ b/roboflow/cli/handlers/vision_events.py @@ -218,6 +218,45 @@ def _use_cases(args) -> None: # noqa: ANN001 output(args, result, text="\n".join(lines)) +# --------------------------------------------------------------------------- +# metadata-schema +# --------------------------------------------------------------------------- + + +@vision_events_app.command("metadata-schema") +def metadata_schema( + ctx: typer.Context, + use_case: Annotated[str, typer.Argument(help="Use case identifier")], +) -> None: + """Get the custom metadata schema for a use case.""" + args = ctx_to_args(ctx, use_case=use_case) + _metadata_schema(args) + + +def _metadata_schema(args) -> None: # noqa: ANN001 + from roboflow.adapters import vision_events_api + from roboflow.adapters.rfapi import RoboflowError + from roboflow.cli._output import output, output_error + + api_key = _resolve(args) + if api_key is None: + return + + try: + result = vision_events_api.get_custom_metadata_schema(api_key, args.use_case) + except RoboflowError as exc: + output_error(args, str(exc)) + return + + fields = result.get("fields", {}) + lines = [f"{len(fields)} field(s):"] + for name, info in fields.items(): + types = ", ".join(info.get("types", [])) + lines.append(f" {name} ({types})") + + output(args, result, text="\n".join(lines)) + + # --------------------------------------------------------------------------- # upload-image # --------------------------------------------------------------------------- diff --git a/roboflow/core/workspace.py b/roboflow/core/workspace.py index 0af3f7c3..398d56b4 100644 --- a/roboflow/core/workspace.py +++ b/roboflow/core/workspace.py @@ -1123,6 +1123,29 @@ def list_vision_event_use_cases(self, status: Optional[str] = None) -> dict: result["useCases"] = result["solutions"] return result + def get_vision_event_metadata_schema(self, use_case: str) -> dict: + """Get the custom metadata schema for a vision event use case. + + Returns discovered field names and their types, useful for building + queries with ``customMetadataFilters``. + + Args: + use_case: Use case identifier. + + Returns: + Dict with ``fields`` mapping field names to ``{"types": [...]}``. + + Example: + >>> ws = rf.workspace() + >>> schema = ws.get_vision_event_metadata_schema("manufacturing-qa") + >>> for field, info in schema["fields"].items(): + ... print(field, info["types"]) + """ + return vision_events_api.get_custom_metadata_schema( + api_key=self.__api_key, + use_case_id=use_case, + ) + def upload_vision_event_image( self, image_path: str, diff --git a/tests/test_vision_events.py b/tests/test_vision_events.py index 5eefa93d..6982083c 100644 --- a/tests/test_vision_events.py +++ b/tests/test_vision_events.py @@ -313,6 +313,45 @@ def test_list_use_cases_error(self): with self.assertRaises(RoboflowError): ws.list_vision_event_use_cases() + # --- get_vision_event_metadata_schema --- + + @responses.activate + def test_get_metadata_schema(self): + body = { + "useCaseId": "manufacturing-qa", + "fields": { + "temperature": {"types": ["number"]}, + "zone": {"types": ["string"]}, + "active": {"types": ["boolean"]}, + }, + } + responses.add( + responses.GET, + f"{_BASE}/custom-metadata-schema/manufacturing-qa", + json=body, + status=200, + ) + + ws = self._make_workspace() + result = ws.get_vision_event_metadata_schema("manufacturing-qa") + + self.assertEqual(len(result["fields"]), 3) + self.assertEqual(result["fields"]["temperature"]["types"], ["number"]) + self._assert_bearer_auth() + + @responses.activate + def test_get_metadata_schema_error(self): + responses.add( + responses.GET, + f"{_BASE}/custom-metadata-schema/nonexistent", + json={"error": "not found"}, + status=404, + ) + + ws = self._make_workspace() + with self.assertRaises(RoboflowError): + ws.get_vision_event_metadata_schema("nonexistent") + # --- upload_vision_event_image --- @responses.activate From 41e47a56b326d2ff9cdaf4556e615ba5dff9b26c Mon Sep 17 00:00:00 2001 From: Riaz Virani Date: Tue, 14 Apr 2026 12:25:29 -0400 Subject: [PATCH 3/9] feat: add roboflow-slim package for lightweight installs (ENT-1082) Publishes a second PyPI package "roboflow-slim" from the same codebase with only lightweight dependencies (no opencv, numpy, matplotlib, Pillow). Supports vision events, workspace management, auth, and CLI. - Guard heavy imports (Project, Workspace, models) with try/except - Make image_utils a lazy import in rfapi.upload_image - Move PIL import into two_stage/two_stage_ocr methods - Add requirements-slim.txt, setup_slim.py, publish-slim CI job - Add test-slim CI job and slim compat tests pip install roboflow is completely unchanged. Co-Authored-By: Claude Opus 4.6 (1M context) --- .github/workflows/publish.yml | 23 +++++++++ .github/workflows/test.yml | 21 ++++++++ Makefile | 8 ++- requirements-slim.txt | 12 +++++ roboflow/__init__.py | 22 ++++++-- roboflow/adapters/rfapi.py | 3 +- roboflow/core/workspace.py | 5 +- setup_slim.py | 54 +++++++++++++++++++ tests/test_slim_compat.py | 97 +++++++++++++++++++++++++++++++++++ 9 files changed, 239 insertions(+), 6 deletions(-) create mode 100644 requirements-slim.txt create mode 100644 setup_slim.py create mode 100644 tests/test_slim_compat.py diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml index dcc16258..42234b56 100644 --- a/.github/workflows/publish.yml +++ b/.github/workflows/publish.yml @@ -28,6 +28,29 @@ jobs: run: | make publish -e PYPI_USERNAME=$PYPI_USERNAME -e PYPI_PASSWORD=$PYPI_PASSWORD -e PYPI_TEST_PASSWORD=$PYPI_TEST_PASSWORD + build-slim: + needs: build + runs-on: ubuntu-latest + steps: + - name: ๐Ÿ›Ž๏ธ Checkout + uses: actions/checkout@v4 + with: + ref: ${{ github.head_ref }} + - name: ๐Ÿ Set up Python + uses: actions/setup-python@v5 + with: + python-version: '3.10' + - name: ๐Ÿฆพ Install dependencies + run: | + python -m pip install --upgrade pip + pip install ".[dev]" + - name: ๐Ÿš€ Publish roboflow-slim to PyPi + env: + PYPI_USERNAME: ${{ secrets.PYPI_USERNAME }} + PYPI_PASSWORD: ${{ secrets.PYPI_PASSWORD }} + run: | + make publish-slim -e PYPI_USERNAME=$PYPI_USERNAME -e PYPI_PASSWORD=$PYPI_PASSWORD + deploy-docs: needs: build runs-on: ubuntu-latest diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index fd99017c..7726cf35 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -35,3 +35,24 @@ jobs: make check_code_quality - name: ๐Ÿงช Run tests run: "python -m unittest" + + test-slim: + runs-on: ubuntu-latest + steps: + - name: ๐Ÿ›Ž๏ธ Checkout + uses: actions/checkout@v4 + with: + ref: ${{ github.event.pull_request.head.ref }} + repository: ${{ github.event.pull_request.head.repo.full_name }} + - name: ๐Ÿ Set up Python 3.10 + uses: actions/setup-python@v5 + with: + python-version: '3.10' + - name: ๐Ÿฆพ Install slim dependencies + run: | + python -m pip install --upgrade pip + pip install -r requirements-slim.txt + pip install -e . --no-deps + pip install responses + - name: ๐Ÿงช Run slim-compatible tests + run: "python -m unittest tests.test_slim_compat tests.test_vision_events" diff --git a/Makefile b/Makefile index 1d59d41f..a5a30fcb 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,4 @@ -.PHONY: style check_code_quality publish +.PHONY: style check_code_quality publish publish-slim export PYTHONPATH = . check_dirs := roboflow @@ -16,3 +16,9 @@ publish: python setup.py sdist bdist_wheel twine check dist/* twine upload dist/* -u ${PYPI_USERNAME} -p ${PYPI_PASSWORD} --verbose + +publish-slim: + rm -rf dist/ build/ *.egg-info + python setup_slim.py sdist bdist_wheel + twine check dist/* + twine upload dist/* -u ${PYPI_USERNAME} -p ${PYPI_PASSWORD} --verbose diff --git a/requirements-slim.txt b/requirements-slim.txt new file mode 100644 index 00000000..9709e296 --- /dev/null +++ b/requirements-slim.txt @@ -0,0 +1,12 @@ +certifi +idna +requests +urllib3>=1.26.6 +tqdm>=4.41.0 +PyYAML>=5.3.1 +requests_toolbelt +filetype +typer>=0.12.0 +python-dateutil +python-dotenv +six diff --git a/roboflow/__init__.py b/roboflow/__init__.py index 03f9acb7..31e19695 100644 --- a/roboflow/__init__.py +++ b/roboflow/__init__.py @@ -10,11 +10,18 @@ from roboflow.adapters import rfapi from roboflow.config import API_URL, APP_URL, DEMO_KEYS, load_roboflow_api_key -from roboflow.core.project import Project -from roboflow.core.workspace import Workspace -from roboflow.models import CLIPModel, GazeModel # noqa: F401 from roboflow.util.general import write_line +try: + from roboflow.core.project import Project + from roboflow.core.workspace import Workspace + from roboflow.models import CLIPModel, GazeModel # noqa: F401 +except ImportError: + Project = None # type: ignore[assignment,misc] + Workspace = None # type: ignore[assignment,misc] + CLIPModel = None # type: ignore[assignment,misc] + GazeModel = None # type: ignore[assignment,misc] + __version__ = "1.3.1" @@ -226,6 +233,11 @@ def auth(self): return self def workspace(self, the_workspace=None): + if Workspace is None: + raise ImportError( + "Workspace requires additional dependencies. Install the full package: pip install roboflow" + ) + sys.stdout.write("\r" + "loading Roboflow workspace...") sys.stdout.write("\n") sys.stdout.flush() @@ -250,6 +262,10 @@ def project(self, project_name, the_workspace=None): :param the_workspace workspace name :return project object """ + if Project is None: + raise ImportError( + "Project requires additional dependencies. Install the full package: pip install roboflow" + ) if the_workspace is None: if "/" in project_name: diff --git a/roboflow/adapters/rfapi.py b/roboflow/adapters/rfapi.py index 2b7da820..5e224c02 100644 --- a/roboflow/adapters/rfapi.py +++ b/roboflow/adapters/rfapi.py @@ -8,7 +8,6 @@ from requests_toolbelt.multipart.encoder import MultipartEncoder from roboflow.config import API_URL, DEFAULT_BATCH_NAME, DEFAULT_JOB_NAME -from roboflow.util import image_utils class RoboflowError(Exception): @@ -294,6 +293,8 @@ def upload_image( # If image is not a hosted image if not hosted_image: + from roboflow.util import image_utils + image_name = os.path.basename(image_path) imgjpeg = image_utils.file2jpeg(image_path) diff --git a/roboflow/core/workspace.py b/roboflow/core/workspace.py index 398d56b4..ffcdb47e 100644 --- a/roboflow/core/workspace.py +++ b/roboflow/core/workspace.py @@ -9,7 +9,6 @@ from typing import Any, Dict, Generator, List, Optional import requests -from PIL import Image from requests.exceptions import HTTPError from tqdm import tqdm @@ -176,6 +175,8 @@ def two_stage( # TODO: fix docs dict: a json obj containing the results of the second stage detection """ # noqa: E501 // docs + from PIL import Image + results = [] # create PIL image for cropping @@ -245,6 +246,8 @@ def two_stage_ocr( # TODO: fix docs dict: a json obj containing the results of the second stage detection """ # noqa: E501 // docs + from PIL import Image + results = [] # create PIL image for cropping diff --git a/setup_slim.py b/setup_slim.py new file mode 100644 index 00000000..cf93fca9 --- /dev/null +++ b/setup_slim.py @@ -0,0 +1,54 @@ +import re + +import setuptools +from setuptools import find_packages + +with open("./roboflow/__init__.py") as f: + content = f.read() +_search_version = re.search(r'__version__\s*=\s*[\'"]([^\'"]*)[\'"]', content) +assert _search_version +version = _search_version.group(1) + + +with open("README.md") as fh: + long_description = fh.read() + +with open("requirements-slim.txt") as fh: + install_requires = fh.read().split("\n") + +setuptools.setup( + name="roboflow-slim", + version=version, + author="Roboflow", + author_email="support@roboflow.com", + description="Lightweight Roboflow SDK for vision events, workspace management, and CLI", + long_description=long_description, + long_description_content_type="text/markdown", + url="https://github.com/roboflow-ai/roboflow-python", + install_requires=install_requires, + packages=find_packages(exclude=("tests",)), + extras_require={ + "dev": [ + "mypy", + "responses", + "ruff", + "twine", + "types-pyyaml", + "types-requests", + "types-setuptools", + "types-tqdm", + "wheel", + ], + }, + entry_points={ + "console_scripts": [ + "roboflow=roboflow.roboflowpy:main", + ], + }, + classifiers=[ + "Programming Language :: Python :: 3", + "License :: OSI Approved :: Apache Software License", + "Operating System :: OS Independent", + ], + python_requires=">=3.10", +) diff --git a/tests/test_slim_compat.py b/tests/test_slim_compat.py new file mode 100644 index 00000000..adcd6a35 --- /dev/null +++ b/tests/test_slim_compat.py @@ -0,0 +1,97 @@ +"""Tests for slim install compatibility. + +Verifies that the package can be imported and lightweight features work +even when heavy dependencies (PIL, opencv, numpy, matplotlib) are missing. + +In a full install, these tests verify the guards don't break normal behavior. +In a slim install, they verify graceful degradation. +""" + +import unittest + + +class TestSlimImport(unittest.TestCase): + """Verify that importing the package always succeeds.""" + + def test_import_roboflow(self): + import roboflow + + self.assertIsNotNone(roboflow.__version__) + + def test_import_vision_events_adapter(self): + from roboflow.adapters import vision_events_api + + self.assertTrue(callable(vision_events_api.write_event)) + self.assertTrue(callable(vision_events_api.write_batch)) + self.assertTrue(callable(vision_events_api.query)) + self.assertTrue(callable(vision_events_api.list_use_cases)) + self.assertTrue(callable(vision_events_api.get_custom_metadata_schema)) + self.assertTrue(callable(vision_events_api.upload_image)) + + def test_import_config(self): + from roboflow.config import API_URL + + self.assertIsInstance(API_URL, str) + + def test_import_rfapi(self): + from roboflow.adapters.rfapi import RoboflowError + + self.assertTrue(issubclass(RoboflowError, Exception)) + + def test_import_cli(self): + from roboflow.cli import app + + self.assertIsNotNone(app) + + +class TestSlimGracefulDegradation(unittest.TestCase): + """Verify that heavy features fail with clear errors when deps are missing. + + These tests only exercise the error path when PIL/opencv are absent. + In a full install they verify the guard exists but doesn't fire. + """ + + def test_workspace_and_project_attributes_exist(self): + """Workspace and Project are either real classes or None sentinels.""" + import roboflow + + # In full install these are classes; in slim they're None + ws = roboflow.Workspace + proj = roboflow.Project + self.assertTrue(ws is None or callable(ws)) + self.assertTrue(proj is None or callable(proj)) + + def test_roboflow_workspace_guard(self): + """If Workspace is None (slim), calling workspace() raises ImportError.""" + import roboflow + + if roboflow.Workspace is not None: + self.skipTest("Full install, Workspace is available") + + rf = roboflow.Roboflow.__new__(roboflow.Roboflow) + rf.api_key = "test" + rf.current_workspace = "test" + rf.model_format = "yolov8" + + with self.assertRaises(ImportError) as ctx: + rf.workspace() + self.assertIn("pip install roboflow", str(ctx.exception)) + + def test_roboflow_project_guard(self): + """If Project is None (slim), calling project() raises ImportError.""" + import roboflow + + if roboflow.Project is not None: + self.skipTest("Full install, Project is available") + + rf = roboflow.Roboflow.__new__(roboflow.Roboflow) + rf.api_key = "test" + rf.current_workspace = "test" + + with self.assertRaises(ImportError) as ctx: + rf.project("test-project") + self.assertIn("pip install roboflow", str(ctx.exception)) + + +if __name__ == "__main__": + unittest.main() From 6ec7beea34b7d092710101539cd56cdbf5d9c116 Mon Sep 17 00:00:00 2001 From: Riaz Virani Date: Tue, 14 Apr 2026 14:08:39 -0400 Subject: [PATCH 4/9] fix: slim CI tests skip Workspace imports when PIL is missing The test-slim CI job fails because test_vision_events.py imports Workspace which pulls in PIL via its import chain. Fix by catching ImportError in _make_workspace() and skipping, plus adding adapter-level tests that work without Workspace for slim coverage. Co-Authored-By: Claude Opus 4.6 (1M context) --- tests/test_vision_events.py | 63 ++++++++++++++++++++++++++++++++++++- 1 file changed, 62 insertions(+), 1 deletion(-) diff --git a/tests/test_vision_events.py b/tests/test_vision_events.py index 6982083c..180ed7f9 100644 --- a/tests/test_vision_events.py +++ b/tests/test_vision_events.py @@ -18,7 +18,10 @@ class TestVisionEvents(unittest.TestCase): WORKSPACE = "test-ws" def _make_workspace(self): - from roboflow.core.workspace import Workspace + try: + from roboflow.core.workspace import Workspace + except ImportError: + self.skipTest("Workspace requires PIL (not available in slim install)") info = { "workspace": { @@ -434,5 +437,63 @@ def test_upload_image_error(self): os.unlink(tmp_path) +class TestVisionEventsAdapter(unittest.TestCase): + """Tests that call the adapter directly (no Workspace). Work in slim installs.""" + + API_KEY = "test_key" + + @responses.activate + def test_adapter_write_event(self): + from roboflow.adapters import vision_events_api + + responses.add(responses.POST, _BASE, json={"eventId": "e1", "created": True}, status=201) + result = vision_events_api.write_event(self.API_KEY, {"eventId": "e1", "eventType": "custom", "useCaseId": "uc"}) + self.assertEqual(result["eventId"], "e1") + self.assertEqual(responses.calls[0].request.headers["Authorization"], f"Bearer {self.API_KEY}") + + @responses.activate + def test_adapter_write_batch(self): + from roboflow.adapters import vision_events_api + + responses.add(responses.POST, f"{_BASE}/batch", json={"created": 1, "eventIds": ["e1"]}, status=201) + result = vision_events_api.write_batch(self.API_KEY, [{"eventId": "e1"}]) + self.assertEqual(result["created"], 1) + + @responses.activate + def test_adapter_query(self): + from roboflow.adapters import vision_events_api + + body = {"events": [{"eventId": "e1"}], "nextCursor": None, "hasMore": False, "lookbackDays": 14} + responses.add(responses.POST, f"{_BASE}/query", json=body, status=200) + result = vision_events_api.query(self.API_KEY, {"useCaseId": "uc"}) + self.assertEqual(len(result["events"]), 1) + + @responses.activate + def test_adapter_list_use_cases(self): + from roboflow.adapters import vision_events_api + + body = {"useCases": [{"id": "uc-1", "name": "QA"}], "lookbackDays": 14} + responses.add(responses.GET, f"{_BASE}/use-cases", json=body, status=200) + result = vision_events_api.list_use_cases(self.API_KEY) + self.assertEqual(len(result["useCases"]), 1) + + @responses.activate + def test_adapter_get_metadata_schema(self): + from roboflow.adapters import vision_events_api + + body = {"useCaseId": "uc-1", "fields": {"temp": {"types": ["number"]}}} + responses.add(responses.GET, f"{_BASE}/custom-metadata-schema/uc-1", json=body, status=200) + result = vision_events_api.get_custom_metadata_schema(self.API_KEY, "uc-1") + self.assertEqual(result["fields"]["temp"]["types"], ["number"]) + + @responses.activate + def test_adapter_error_raises_roboflow_error(self): + from roboflow.adapters import vision_events_api + + responses.add(responses.POST, _BASE, json={"error": "forbidden"}, status=403) + with self.assertRaises(RoboflowError): + vision_events_api.write_event(self.API_KEY, {"eventId": "x"}) + + if __name__ == "__main__": unittest.main() From 3488f7e0251659226f026c86eef7160359d029f2 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 14 Apr 2026 18:08:51 +0000 Subject: [PATCH 5/9] =?UTF-8?q?fix(pre=5Fcommit):=20=F0=9F=8E=A8=20auto=20?= =?UTF-8?q?format=20pre-commit=20hooks?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tests/test_vision_events.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/test_vision_events.py b/tests/test_vision_events.py index 180ed7f9..bcf480b4 100644 --- a/tests/test_vision_events.py +++ b/tests/test_vision_events.py @@ -447,7 +447,9 @@ def test_adapter_write_event(self): from roboflow.adapters import vision_events_api responses.add(responses.POST, _BASE, json={"eventId": "e1", "created": True}, status=201) - result = vision_events_api.write_event(self.API_KEY, {"eventId": "e1", "eventType": "custom", "useCaseId": "uc"}) + result = vision_events_api.write_event( + self.API_KEY, {"eventId": "e1", "eventType": "custom", "useCaseId": "uc"} + ) self.assertEqual(result["eventId"], "e1") self.assertEqual(responses.calls[0].request.headers["Authorization"], f"Bearer {self.API_KEY}") From f5bbdd6816d15fac531f8a4ab806def3c81b0ca4 Mon Sep 17 00:00:00 2001 From: Riaz Virani Date: Tue, 14 Apr 2026 14:15:32 -0400 Subject: [PATCH 6/9] fix: make Workspace importable in slim mode Move all heavy imports in workspace.py (Project, folderparser, PIL, active_learning_utils, image_utils, model_processor, two_stage_utils) to lazy imports inside the methods that use them. Workspace now imports cleanly without PIL/opencv, so vision events, search, folders, and workflows all work in slim installs. Update test mock paths to match the new import locations. Co-Authored-By: Claude Opus 4.6 (1M context) --- roboflow/__init__.py | 8 +------- roboflow/core/workspace.py | 32 +++++++++++++++++++++++--------- tests/test_project.py | 24 ++++++++++++------------ tests/test_slim_compat.py | 27 +++++++-------------------- tests/test_vision_events.py | 5 +---- 5 files changed, 44 insertions(+), 52 deletions(-) diff --git a/roboflow/__init__.py b/roboflow/__init__.py index 31e19695..30ed3fd1 100644 --- a/roboflow/__init__.py +++ b/roboflow/__init__.py @@ -10,15 +10,14 @@ from roboflow.adapters import rfapi from roboflow.config import API_URL, APP_URL, DEMO_KEYS, load_roboflow_api_key +from roboflow.core.workspace import Workspace from roboflow.util.general import write_line try: from roboflow.core.project import Project - from roboflow.core.workspace import Workspace from roboflow.models import CLIPModel, GazeModel # noqa: F401 except ImportError: Project = None # type: ignore[assignment,misc] - Workspace = None # type: ignore[assignment,misc] CLIPModel = None # type: ignore[assignment,misc] GazeModel = None # type: ignore[assignment,misc] @@ -233,11 +232,6 @@ def auth(self): return self def workspace(self, the_workspace=None): - if Workspace is None: - raise ImportError( - "Workspace requires additional dependencies. Install the full package: pip install roboflow" - ) - sys.stdout.write("\r" + "loading Roboflow workspace...") sys.stdout.write("\n") sys.stdout.flush() diff --git a/roboflow/core/workspace.py b/roboflow/core/workspace.py index ffcdb47e..916d6b04 100644 --- a/roboflow/core/workspace.py +++ b/roboflow/core/workspace.py @@ -14,15 +14,7 @@ from roboflow.adapters import rfapi, vision_events_api from roboflow.adapters.rfapi import AnnotationSaveError, ImageUploadError, RoboflowError -from roboflow.config import API_URL, APP_URL, CLIP_FEATURIZE_URL, DEMO_KEYS -from roboflow.core.project import Project -from roboflow.util import folderparser -from roboflow.util.active_learning_utils import check_box_size, clip_encode, count_comparisons -from roboflow.util.general import extract_zip as _extract_zip -from roboflow.util.image_utils import load_labelmap -from roboflow.util.model_processor import process -from roboflow.util.two_stage_utils import ocr_infer -from roboflow.util.versions import normalize_yolo_model_type +from roboflow.config import API_URL, APP_URL, DEMO_KEYS class Workspace: @@ -63,6 +55,8 @@ def projects(self): Returns: List of Project objects. """ + from roboflow.core.project import Project + projects_array = [] for a_project in self.project_list: proj = Project(self.__api_key, a_project, self.model_format) @@ -82,6 +76,8 @@ def project(self, project_id): Returns: Project Object """ + from roboflow.core.project import Project + sys.stdout.write("\r" + "loading Roboflow project...") sys.stdout.write("\n") sys.stdout.flush() @@ -112,6 +108,8 @@ def create_project(self, project_name, project_type, project_license, annotation Returns: Project Object """ # noqa: E501 // docs + from roboflow.core.project import Project + data = { "name": project_name, "type": project_type, @@ -142,6 +140,9 @@ def clip_compare(self, dir: str = "", image_ext: str = ".png", target_image: str dict: a key:value mapping of image_name:comparison_score_to_target """ # noqa: E501 // docs + from roboflow.config import CLIP_FEATURIZE_URL + from roboflow.util.active_learning_utils import clip_encode + # list to store comparison results in comparisons = [] # grab all images in a given directory with ext type @@ -248,6 +249,8 @@ def two_stage_ocr( """ # noqa: E501 // docs from PIL import Image + from roboflow.util.two_stage_utils import ocr_infer + results = [] # create PIL image for cropping @@ -310,6 +313,9 @@ def upload_dataset( num_retries (int, optional): number of times to retry uploading an image if the upload fails. Defaults to 0. is_prediction (bool, optional): whether the annotations provided in the dataset are predictions and not ground truth. Defaults to False. """ # noqa: E501 // docs + from roboflow.util import folderparser + from roboflow.util.image_utils import load_labelmap + if dataset_format != "NOT_USED": print("Warning: parameter 'dataset_format' is deprecated and will be removed in a future release") project, created = self._get_or_create_project( @@ -463,6 +469,9 @@ def active_learning( use_localhost: (bool) = determines if local http format used or remote endpoint local_server: (str) = local http address for inference server, use_localhost must be True for this to be used """ # noqa: E501 // docs + from roboflow.config import CLIP_FEATURIZE_URL + from roboflow.util.active_learning_utils import check_box_size, clip_encode, count_comparisons + if inference_endpoint is None: inference_endpoint = [] if conditionals is None: @@ -611,6 +620,9 @@ def deploy_model( filename (str, optional): The name of the weights file. Defaults to "weights/best.pt". """ + from roboflow.util.model_processor import process + from roboflow.util.versions import normalize_yolo_model_type + if not project_ids: raise ValueError("At least one project ID must be provided") @@ -805,6 +817,8 @@ def search_export( ValueError: If both *dataset* and *annotation_group* are provided. RoboflowError: On API errors or export timeout. """ + from roboflow.util.general import extract_zip as _extract_zip + if dataset is not None and annotation_group is not None: raise ValueError("dataset and annotation_group are mutually exclusive; provide only one") diff --git a/tests/test_project.py b/tests/test_project.py index 4568ed7e..87dbde92 100644 --- a/tests/test_project.py +++ b/tests/test_project.py @@ -61,15 +61,15 @@ def _setup_upload_dataset_mocks( # Create the mock objects mocks = { - "parser": patch("roboflow.core.workspace.folderparser.parsefolder", return_value=test_dataset), - "upload": patch("roboflow.core.workspace.Project.upload_image", side_effect=upload_image_side_effect) + "parser": patch("roboflow.util.folderparser.parsefolder", return_value=test_dataset), + "upload": patch("roboflow.core.project.Project.upload_image", side_effect=upload_image_side_effect) if upload_image_side_effect - else patch("roboflow.core.workspace.Project.upload_image", return_value=image_return), + else patch("roboflow.core.project.Project.upload_image", return_value=image_return), "save_annotation": patch( - "roboflow.core.workspace.Project.save_annotation", side_effect=save_annotation_side_effect + "roboflow.core.project.Project.save_annotation", side_effect=save_annotation_side_effect ) if save_annotation_side_effect - else patch("roboflow.core.workspace.Project.save_annotation", return_value=annotation_return), + else patch("roboflow.core.project.Project.save_annotation", return_value=annotation_return), "get_project": patch( "roboflow.core.workspace.Workspace._get_or_create_project", return_value=(self.project, project_created) ), @@ -348,7 +348,7 @@ def test_project_upload_dataset(self): "extra_mocks": [ ( "load_labelmap", - "roboflow.core.workspace.load_labelmap", + "roboflow.util.image_utils.load_labelmap", {"return_value": {"old_label": "new_label"}}, ) ], @@ -650,13 +650,13 @@ def capture_annotation_calls(annotation_path, **kwargs): return ({"success": True}, 0.1, 0) mocks = { - "parser": patch("roboflow.core.workspace.folderparser.parsefolder", return_value=parsed_dataset), + "parser": patch("roboflow.util.folderparser.parsefolder", return_value=parsed_dataset), "upload": patch( - "roboflow.core.workspace.Project.upload_image", + "roboflow.core.project.Project.upload_image", return_value=({"id": "test-id", "success": True}, 0.1, 0), ), "save_annotation": patch( - "roboflow.core.workspace.Project.save_annotation", side_effect=capture_annotation_calls + "roboflow.core.project.Project.save_annotation", side_effect=capture_annotation_calls ), "get_project": patch( "roboflow.core.workspace.Workspace._get_or_create_project", return_value=(self.project, False) @@ -737,13 +737,13 @@ def capture_annotation_calls(annotation_path, **kwargs): return ({"success": True}, 0.1, 0) mocks = { - "parser": patch("roboflow.core.workspace.folderparser.parsefolder", return_value=parsed_dataset), + "parser": patch("roboflow.util.folderparser.parsefolder", return_value=parsed_dataset), "upload": patch( - "roboflow.core.workspace.Project.upload_image", + "roboflow.core.project.Project.upload_image", return_value=({"id": "test-id", "success": True}, 0.1, 0), ), "save_annotation": patch( - "roboflow.core.workspace.Project.save_annotation", side_effect=capture_annotation_calls + "roboflow.core.project.Project.save_annotation", side_effect=capture_annotation_calls ), "get_project": patch( "roboflow.core.workspace.Workspace._get_or_create_project", return_value=(self.project, False) diff --git a/tests/test_slim_compat.py b/tests/test_slim_compat.py index adcd6a35..972eeacf 100644 --- a/tests/test_slim_compat.py +++ b/tests/test_slim_compat.py @@ -51,31 +51,18 @@ class TestSlimGracefulDegradation(unittest.TestCase): In a full install they verify the guard exists but doesn't fire. """ - def test_workspace_and_project_attributes_exist(self): - """Workspace and Project are either real classes or None sentinels.""" + def test_workspace_always_available(self): + """Workspace imports cleanly even in slim mode.""" import roboflow - # In full install these are classes; in slim they're None - ws = roboflow.Workspace - proj = roboflow.Project - self.assertTrue(ws is None or callable(ws)) - self.assertTrue(proj is None or callable(proj)) + self.assertIsNotNone(roboflow.Workspace) + self.assertTrue(callable(roboflow.Workspace)) - def test_roboflow_workspace_guard(self): - """If Workspace is None (slim), calling workspace() raises ImportError.""" + def test_project_guarded(self): + """Project is either a real class (full) or None (slim).""" import roboflow - if roboflow.Workspace is not None: - self.skipTest("Full install, Workspace is available") - - rf = roboflow.Roboflow.__new__(roboflow.Roboflow) - rf.api_key = "test" - rf.current_workspace = "test" - rf.model_format = "yolov8" - - with self.assertRaises(ImportError) as ctx: - rf.workspace() - self.assertIn("pip install roboflow", str(ctx.exception)) + self.assertTrue(roboflow.Project is None or callable(roboflow.Project)) def test_roboflow_project_guard(self): """If Project is None (slim), calling project() raises ImportError.""" diff --git a/tests/test_vision_events.py b/tests/test_vision_events.py index bcf480b4..42e160af 100644 --- a/tests/test_vision_events.py +++ b/tests/test_vision_events.py @@ -18,10 +18,7 @@ class TestVisionEvents(unittest.TestCase): WORKSPACE = "test-ws" def _make_workspace(self): - try: - from roboflow.core.workspace import Workspace - except ImportError: - self.skipTest("Workspace requires PIL (not available in slim install)") + from roboflow.core.workspace import Workspace info = { "workspace": { From c5ba344b26f9884fa48d15d64d49f159352cac61 Mon Sep 17 00:00:00 2001 From: Riaz Virani Date: Tue, 14 Apr 2026 14:16:23 -0400 Subject: [PATCH 7/9] docs: add roboflow-slim install option to README Co-Authored-By: Claude Opus 4.6 (1M context) --- README.md | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/README.md b/README.md index 22d04177..5e034ff1 100644 --- a/README.md +++ b/README.md @@ -51,6 +51,20 @@ pip install "roboflow[desktop]" ``` +
+ Lightweight install (roboflow-slim) + + If you only need vision events, workspace management, and the CLI (no image processing, inference, or training), install the lightweight package: + + ```bash + pip install roboflow-slim + ``` + + This skips heavy dependencies like OpenCV, NumPy, Matplotlib, and Pillow, reducing install size from ~400MB to ~50MB. Useful for embedded devices, CI pipelines, and serverless environments. + + Both packages share the same codebase and version. `pip install roboflow` includes everything. +
+
Install from source From 008038174119cffb744666fda60780a2d2abceb1 Mon Sep 17 00:00:00 2001 From: Riaz Virani Date: Tue, 14 Apr 2026 14:21:57 -0400 Subject: [PATCH 8/9] fix: add permissions block to test workflow Addresses CodeQL warning about unrestricted GITHUB_TOKEN permissions. Co-Authored-By: Claude Opus 4.6 (1M context) --- .github/workflows/test.yml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 7726cf35..d8af7f47 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -6,6 +6,9 @@ on: pull_request: branches: [main] +permissions: + contents: read + jobs: build: strategy: From a30e2a734770a7f8777e82c0b576c5a57034702a Mon Sep 17 00:00:00 2001 From: renzon Date: Tue, 14 Apr 2026 18:49:21 -0300 Subject: [PATCH 9/9] Created crud sdk for vision event use-case --- roboflow/adapters/vision_events_api.py | 91 ++++++++++++++++ roboflow/cli/handlers/vision_events.py | 133 ++++++++++++++++++++++++ roboflow/core/workspace.py | 75 ++++++++++++++ tests/test_vision_events.py | 138 +++++++++++++++++++++++++ 4 files changed, 437 insertions(+) diff --git a/roboflow/adapters/vision_events_api.py b/roboflow/adapters/vision_events_api.py index f43ad34a..358ac5be 100644 --- a/roboflow/adapters/vision_events_api.py +++ b/roboflow/adapters/vision_events_api.py @@ -130,6 +130,97 @@ def get_custom_metadata_schema(api_key: str, use_case_id: str) -> dict: return response.json() +def create_use_case(api_key: str, name: str) -> dict: + """Create a new vision event use case. + + Args: + api_key: Roboflow API key. + name: Human-readable name for the use case. + + Returns: + Parsed JSON response with ``id`` and ``name``. + + Raises: + RoboflowError: On non-201 response status codes. + """ + response = requests.post( + f"{_BASE}/use-cases", + json={"name": name}, + headers=_auth_headers(api_key), + ) + if response.status_code != 201: + raise RoboflowError(response.text) + return response.json() + + +def rename_use_case(api_key: str, use_case_id: str, name: str) -> dict: + """Rename an existing vision event use case. + + Args: + api_key: Roboflow API key. + use_case_id: Use case identifier. + name: New name for the use case. + + Returns: + Parsed JSON response with ``id`` and ``name``. + + Raises: + RoboflowError: On non-200 response status codes. + """ + response = requests.put( + f"{_BASE}/use-cases/{use_case_id}", + json={"name": name}, + headers=_auth_headers(api_key), + ) + if response.status_code != 200: + raise RoboflowError(response.text) + return response.json() + + +def archive_use_case(api_key: str, use_case_id: str) -> dict: + """Archive a vision event use case. + + Args: + api_key: Roboflow API key. + use_case_id: Use case identifier. + + Returns: + Parsed JSON response with ``success``. + + Raises: + RoboflowError: On non-200 response status codes. + """ + response = requests.post( + f"{_BASE}/use-cases/{use_case_id}/archive", + headers=_auth_headers(api_key), + ) + if response.status_code != 200: + raise RoboflowError(response.text) + return response.json() + + +def unarchive_use_case(api_key: str, use_case_id: str) -> dict: + """Unarchive a vision event use case. + + Args: + api_key: Roboflow API key. + use_case_id: Use case identifier. + + Returns: + Parsed JSON response with ``success``. + + Raises: + RoboflowError: On non-200 response status codes. + """ + response = requests.post( + f"{_BASE}/use-cases/{use_case_id}/unarchive", + headers=_auth_headers(api_key), + ) + if response.status_code != 200: + raise RoboflowError(response.text) + return response.json() + + def upload_image( api_key: str, image_path: str, diff --git a/roboflow/cli/handlers/vision_events.py b/roboflow/cli/handlers/vision_events.py index 497936c0..61a521da 100644 --- a/roboflow/cli/handlers/vision_events.py +++ b/roboflow/cli/handlers/vision_events.py @@ -218,6 +218,139 @@ def _use_cases(args) -> None: # noqa: ANN001 output(args, result, text="\n".join(lines)) +# --------------------------------------------------------------------------- +# create-use-case +# --------------------------------------------------------------------------- + + +@vision_events_app.command("create-use-case") +def create_use_case( + ctx: typer.Context, + name: Annotated[str, typer.Argument(help="Name for the new use case")], +) -> None: + """Create a new vision event use case.""" + args = ctx_to_args(ctx, name=name) + _create_use_case(args) + + +def _create_use_case(args) -> None: # noqa: ANN001 + from roboflow.adapters import vision_events_api + from roboflow.adapters.rfapi import RoboflowError + from roboflow.cli._output import output, output_error + + api_key = _resolve(args) + if api_key is None: + return + + try: + result = vision_events_api.create_use_case(api_key, args.name) + except RoboflowError as exc: + output_error(args, str(exc)) + return + + output(args, result, text=f"Created use case {result.get('id', '')} ({result.get('name', '')})") + + +# --------------------------------------------------------------------------- +# rename-use-case +# --------------------------------------------------------------------------- + + +@vision_events_app.command("rename-use-case") +def rename_use_case( + ctx: typer.Context, + use_case: Annotated[str, typer.Argument(help="Use case identifier")], + name: Annotated[str, typer.Option("-n", "--name", help="New name for the use case")], +) -> None: + """Rename an existing vision event use case.""" + args = ctx_to_args(ctx, use_case=use_case, name=name) + _rename_use_case(args) + + +def _rename_use_case(args) -> None: # noqa: ANN001 + from roboflow.adapters import vision_events_api + from roboflow.adapters.rfapi import RoboflowError + from roboflow.cli._output import output, output_error + + api_key = _resolve(args) + if api_key is None: + return + + try: + result = vision_events_api.rename_use_case(api_key, args.use_case, args.name) + except RoboflowError as exc: + output_error(args, str(exc)) + return + + output(args, result, text=f"Renamed use case {result.get('id', '')} to {result.get('name', '')}") + + +# --------------------------------------------------------------------------- +# archive-use-case +# --------------------------------------------------------------------------- + + +@vision_events_app.command("archive-use-case") +def archive_use_case( + ctx: typer.Context, + use_case: Annotated[str, typer.Argument(help="Use case identifier")], +) -> None: + """Archive a vision event use case.""" + args = ctx_to_args(ctx, use_case=use_case) + _archive_use_case(args) + + +def _archive_use_case(args) -> None: # noqa: ANN001 + from roboflow.adapters import vision_events_api + from roboflow.adapters.rfapi import RoboflowError + from roboflow.cli._output import output, output_error + + api_key = _resolve(args) + if api_key is None: + return + + try: + result = vision_events_api.archive_use_case(api_key, args.use_case) + except RoboflowError as exc: + output_error(args, str(exc)) + return + + output(args, result, text=f"Archived use case {args.use_case}") + + +# --------------------------------------------------------------------------- +# unarchive-use-case +# --------------------------------------------------------------------------- + + +@vision_events_app.command("unarchive-use-case") +def unarchive_use_case( + ctx: typer.Context, + use_case: Annotated[str, typer.Argument(help="Use case identifier")], +) -> None: + """Unarchive a vision event use case.""" + args = ctx_to_args(ctx, use_case=use_case) + _unarchive_use_case(args) + + +def _unarchive_use_case(args) -> None: # noqa: ANN001 + from roboflow.adapters import vision_events_api + from roboflow.adapters.rfapi import RoboflowError + from roboflow.cli._output import output, output_error + + api_key = _resolve(args) + if api_key is None: + return + + try: + result = vision_events_api.unarchive_use_case(api_key, args.use_case) + except RoboflowError as exc: + output_error(args, str(exc)) + return + + output(args, result, text=f"Unarchived use case {args.use_case}") + + # --------------------------------------------------------------------------- # metadata-schema # --------------------------------------------------------------------------- diff --git a/roboflow/core/workspace.py b/roboflow/core/workspace.py index 916d6b04..6790a20b 100644 --- a/roboflow/core/workspace.py +++ b/roboflow/core/workspace.py @@ -1140,6 +1140,81 @@ def list_vision_event_use_cases(self, status: Optional[str] = None) -> dict: result["useCases"] = result["solutions"] return result + def create_vision_event_use_case(self, name: str) -> dict: + """Create a new vision event use case. + + Args: + name: Human-readable name for the use case. + + Returns: + Dict with ``id`` and ``name``. + + Example: + >>> ws = rf.workspace() + >>> result = ws.create_vision_event_use_case("manufacturing-qa") + >>> use_case_id = result["id"] + """ + return vision_events_api.create_use_case( + api_key=self.__api_key, + name=name, + ) + + def rename_vision_event_use_case(self, use_case: str, name: str) -> dict: + """Rename an existing vision event use case. + + Args: + use_case: Use case identifier. + name: New name for the use case. + + Returns: + Dict with ``id`` and ``name``. + + Example: + >>> ws = rf.workspace() + >>> ws.rename_vision_event_use_case("abc123", "new-name") + """ + return vision_events_api.rename_use_case( + api_key=self.__api_key, + use_case_id=use_case, + name=name, + ) + + def archive_vision_event_use_case(self, use_case: str) -> dict: + """Archive a vision event use case. + + Args: + use_case: Use case identifier. + + Returns: + Dict with ``success``. + + Example: + >>> ws = rf.workspace() + >>> ws.archive_vision_event_use_case("abc123") + """ + return vision_events_api.archive_use_case( + api_key=self.__api_key, + use_case_id=use_case, + ) + + def unarchive_vision_event_use_case(self, use_case: str) -> dict: + """Unarchive a vision event use case. + + Args: + use_case: Use case identifier. + + Returns: + Dict with ``success``. + + Example: + >>> ws = rf.workspace() + >>> ws.unarchive_vision_event_use_case("abc123") + """ + return vision_events_api.unarchive_use_case( + api_key=self.__api_key, + use_case_id=use_case, + ) + def get_vision_event_metadata_schema(self, use_case: str) -> dict: """Get the custom metadata schema for a vision event use case. diff --git a/tests/test_vision_events.py b/tests/test_vision_events.py index 42e160af..0ab2bb7c 100644 --- a/tests/test_vision_events.py +++ b/tests/test_vision_events.py @@ -313,6 +313,112 @@ def test_list_use_cases_error(self): with self.assertRaises(RoboflowError): ws.list_vision_event_use_cases() + # --- create_vision_event_use_case --- + + @responses.activate + def test_create_use_case(self): + responses.add( + responses.POST, + f"{_BASE}/use-cases", + json={"id": "new-uc", "name": "My Use Case"}, + status=201, + ) + + ws = self._make_workspace() + result = ws.create_vision_event_use_case("My Use Case") + + self.assertEqual(result["id"], "new-uc") + self.assertEqual(result["name"], "My Use Case") + self._assert_bearer_auth() + sent = json.loads(responses.calls[0].request.body) + self.assertEqual(sent["name"], "My Use Case") + + @responses.activate + def test_create_use_case_error(self): + responses.add(responses.POST, f"{_BASE}/use-cases", json={"error": "duplicate name"}, status=409) + + ws = self._make_workspace() + with self.assertRaises(RoboflowError): + ws.create_vision_event_use_case("Existing Name") + + # --- rename_vision_event_use_case --- + + @responses.activate + def test_rename_use_case(self): + responses.add( + responses.PUT, + f"{_BASE}/use-cases/uc-1", + json={"id": "uc-1", "name": "Renamed"}, + status=200, + ) + + ws = self._make_workspace() + result = ws.rename_vision_event_use_case("uc-1", "Renamed") + + self.assertEqual(result["id"], "uc-1") + self.assertEqual(result["name"], "Renamed") + self._assert_bearer_auth() + sent = json.loads(responses.calls[0].request.body) + self.assertEqual(sent["name"], "Renamed") + + @responses.activate + def test_rename_use_case_error(self): + responses.add(responses.PUT, f"{_BASE}/use-cases/nonexistent", json={"error": "not found"}, status=404) + + ws = self._make_workspace() + with self.assertRaises(RoboflowError): + ws.rename_vision_event_use_case("nonexistent", "New Name") + + # --- archive_vision_event_use_case --- + + @responses.activate + def test_archive_use_case(self): + responses.add( + responses.POST, + f"{_BASE}/use-cases/uc-1/archive", + json={"success": True}, + status=200, + ) + + ws = self._make_workspace() + result = ws.archive_vision_event_use_case("uc-1") + + self.assertTrue(result["success"]) + self._assert_bearer_auth() + + @responses.activate + def test_archive_use_case_error(self): + responses.add(responses.POST, f"{_BASE}/use-cases/nonexistent/archive", json={"error": "not found"}, status=404) + + ws = self._make_workspace() + with self.assertRaises(RoboflowError): + ws.archive_vision_event_use_case("nonexistent") + + # --- unarchive_vision_event_use_case --- + + @responses.activate + def test_unarchive_use_case(self): + responses.add( + responses.POST, + f"{_BASE}/use-cases/uc-1/unarchive", + json={"success": True}, + status=200, + ) + + ws = self._make_workspace() + result = ws.unarchive_vision_event_use_case("uc-1") + + self.assertTrue(result["success"]) + self._assert_bearer_auth() + + @responses.activate + def test_unarchive_use_case_error(self): + responses.add(responses.POST, f"{_BASE}/use-cases/uc-1/unarchive", json={"error": "already active"}, status=400) + + ws = self._make_workspace() + with self.assertRaises(RoboflowError): + ws.unarchive_vision_event_use_case("uc-1") + # --- get_vision_event_metadata_schema --- @responses.activate @@ -485,6 +591,38 @@ def test_adapter_get_metadata_schema(self): result = vision_events_api.get_custom_metadata_schema(self.API_KEY, "uc-1") self.assertEqual(result["fields"]["temp"]["types"], ["number"]) + @responses.activate + def test_adapter_create_use_case(self): + from roboflow.adapters import vision_events_api + + responses.add(responses.POST, f"{_BASE}/use-cases", json={"id": "uc-new", "name": "Test"}, status=201) + result = vision_events_api.create_use_case(self.API_KEY, "Test") + self.assertEqual(result["id"], "uc-new") + + @responses.activate + def test_adapter_rename_use_case(self): + from roboflow.adapters import vision_events_api + + responses.add(responses.PUT, f"{_BASE}/use-cases/uc-1", json={"id": "uc-1", "name": "New"}, status=200) + result = vision_events_api.rename_use_case(self.API_KEY, "uc-1", "New") + self.assertEqual(result["name"], "New") + + @responses.activate + def test_adapter_archive_use_case(self): + from roboflow.adapters import vision_events_api + + responses.add(responses.POST, f"{_BASE}/use-cases/uc-1/archive", json={"success": True}, status=200) + result = vision_events_api.archive_use_case(self.API_KEY, "uc-1") + self.assertTrue(result["success"]) + + @responses.activate + def test_adapter_unarchive_use_case(self): + from roboflow.adapters import vision_events_api + + responses.add(responses.POST, f"{_BASE}/use-cases/uc-1/unarchive", json={"success": True}, status=200) + result = vision_events_api.unarchive_use_case(self.API_KEY, "uc-1") + self.assertTrue(result["success"]) + @responses.activate def test_adapter_error_raises_roboflow_error(self): from roboflow.adapters import vision_events_api