diff --git a/simulation_bridge/docs/performance.md b/simulation_bridge/docs/performance.md index 1fe1ce32..a9512641 100644 --- a/simulation_bridge/docs/performance.md +++ b/simulation_bridge/docs/performance.md @@ -23,7 +23,7 @@ A new row is appended to the CSV each time the bridge processes a client request | **Timestamp** | Float (epoch seconds) | Time when `start_operation()` is called. Serves as the reference point for all time deltas. | | **Client ID** | String | Identifier of the requesting client. | | **Client Protocol** | String | Protocol used to communicate with the bridge (e.g., REST, MQTT). | -| **Operation ID** | String (UUID) | Unique identifier for each request. Used for cross-referencing logs and traces. Internally paired with **Client Protocol** to avoid collisions when identical IDs originate from different clients. | +| **Operation ID** | String (UUID) | Unique identifier for each request. Used for cross-referencing logs and traces. Internally, each in-flight operation is stored under the combination of **Client Protocol**, **Operation ID**, **Client ID**, and **Simulation Type**, and is looked up by **Client Protocol** and **Operation ID**; identical IDs arriving over different protocols therefore do not collide, while a second request that reuses an ID already in flight on the same protocol is ignored until the first one is finalized. | | **Simulation Type** | String | Type of simulation requested (`batch`, `streaming`, or `interactive`). | | **Request Received Time** | Float (seconds) | When the bridge's event loop first registers the incoming API call. | | **Core Received Input Time** | Float (seconds) | The moment when the simulation core acknowledged and buffered the input payload (i.e., when it consumed the data from the bridge). The difference between `Core Received Input Time` and `Request Received Time` quantifies the signal overhead. 
| diff --git a/simulation_bridge/src/utils/performance_monitor.py b/simulation_bridge/src/utils/performance_monitor.py index 854ee148..7d5693ff 100644 --- a/simulation_bridge/src/utils/performance_monitor.py +++ b/simulation_bridge/src/utils/performance_monitor.py @@ -49,8 +49,14 @@ class _Paths: @dataclass class _RuntimeState: """Internal runtime state for the monitor.""" - metrics_by_operation_id: Dict[Tuple[str, str], PerformanceMetrics] = field( - default_factory=dict) + metrics_by_operation_id: Dict[ + Tuple[str, str, str, str], PerformanceMetrics + ] = field( + default_factory=dict + ) + op_index: Dict[Tuple[str, str], Tuple[str, str, str, str]] = field( + default_factory=dict + ) metrics_history: List[PerformanceMetrics] = field(default_factory=list) process: Optional[psutil.Process] = None @@ -113,10 +119,14 @@ def history(self) -> List[PerformanceMetrics]: """Return the list of completed metrics.""" return list(self._state.metrics_history) - def get_metric(self, operation_id: str, - protocol: str) -> Optional[PerformanceMetrics]: + def get_metric( + self, operation_id: str, protocol: str + ) -> Optional[PerformanceMetrics]: """Return metrics for an ongoing operation.""" - return self._state.metrics_by_operation_id.get((operation_id, protocol)) + key = self._state.op_index.get((protocol, operation_id)) + if not key: + return None + return self._state.metrics_by_operation_id.get(key) def _write_csv_headers(self) -> None: if not self._enabled: @@ -158,7 +168,8 @@ def _update_system_metrics(self, metric: PerformanceMetrics) -> None: def _is_valid_operation(self, operation_id: str, protocol: str) -> bool: return self._enabled and ( - operation_id, protocol) in self._state.metrics_by_operation_id + protocol, operation_id + ) in self._state.op_index def start_operation( self, @@ -167,12 +178,14 @@ def start_operation( protocol: str = "unknown", simulation_type: str = "unknown", ) -> None: - key = (operation_id, protocol) - if not self._enabled or key in 
self._state.metrics_by_operation_id: + index_key = (protocol, operation_id) + full_key = (protocol, operation_id, client_id, simulation_type) + if not self._enabled or index_key in self._state.op_index: return now = time.time() - self._state.metrics_by_operation_id[key] = PerformanceMetrics( + self._state.op_index[index_key] = full_key + self._state.metrics_by_operation_id[full_key] = PerformanceMetrics( client_id=client_id, client_protocol=protocol, operation_id=operation_id, @@ -207,7 +220,10 @@ def record_event(self, operation_id: str, protocol: str, return now = time.time() - metric = self._state.metrics_by_operation_id[(operation_id, protocol)] + metric_key = self._state.op_index.get((protocol, operation_id)) + if not metric_key: + return + metric = self._state.metrics_by_operation_id[metric_key] if event == EventType.CORE_RECEIVED_INPUT: metric.core_received_input_time = now @@ -225,8 +241,8 @@ def record_event(self, operation_id: str, protocol: str, def finalize_operation(self, operation_id: str, protocol: str) -> None: if not self._is_valid_operation(operation_id, protocol): return - metric = self._state.metrics_by_operation_id.pop( - (operation_id, protocol)) + metric_key = self._state.op_index.pop((protocol, operation_id)) + metric = self._state.metrics_by_operation_id.pop(metric_key) metric.result_completed_time = time.time() metric.total_duration = metric.result_completed_time - metric.request_received_time if metric.core_sent_input_time: @@ -243,13 +259,12 @@ def _update_timestamp(self, operation_id: str, if not self._is_valid_operation(operation_id, protocol): return now = time.time() - setattr( - self._state.metrics_by_operation_id[(operation_id, protocol)], - field_name, - now) - self._update_system_metrics( - self._state.metrics_by_operation_id[(operation_id, protocol)] - ) + metric_key = self._state.op_index.get((protocol, operation_id)) + if not metric_key: + return + metric = self._state.metrics_by_operation_id[metric_key] + setattr(metric, 
field_name, now) + self._update_system_metrics(metric) def _save_metrics_to_csv(self, metric: PerformanceMetrics) -> None: if not self._enabled: