Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion simulation_bridge/docs/performance.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ A new row is appended to the CSV each time the bridge processes a client request
| **Timestamp** | Float (epoch seconds) | Time when `start_operation()` is called. Serves as the reference point for all time deltas. |
| **Client ID** | String | Identifier of the requesting client. |
| **Client Protocol** | String | Protocol used to communicate with the bridge (e.g., REST, MQTT). |
| **Operation ID** | String (UUID) | Unique identifier for each request. Used for cross-referencing logs and traces. Internally paired with **Client Protocol** to avoid collisions when identical IDs originate from different clients. |
| **Operation ID** | String (UUID) | Unique identifier for each request. Used for cross-referencing logs and traces. Internally tracked together with **Client Protocol**, **Client ID** and **Simulation Type** to avoid collisions when identical IDs originate from different clients or simulation types. |
| **Simulation Type** | String | Type of simulation requested (`batch`, `streaming`, or `interactive`). |
| **Request Received Time** | Float (seconds) | When the bridge's event loop first registers the incoming API call. |
| **Core Received Input Time** | Float (seconds) | The moment when the simulation core acknowledged and buffered the input payload (i.e., when it consumed the data from the bridge). The difference between `Core Received Input Time` and `Request Received Time` quantifies the signal overhead. |
Expand Down
53 changes: 34 additions & 19 deletions simulation_bridge/src/utils/performance_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,14 @@ class _Paths:
@dataclass
class _RuntimeState:
"""Internal runtime state for the monitor."""
metrics_by_operation_id: Dict[Tuple[str, str], PerformanceMetrics] = field(
default_factory=dict)
metrics_by_operation_id: Dict[
Tuple[str, str, str, str], PerformanceMetrics
] = field(
default_factory=dict
)
op_index: Dict[Tuple[str, str], Tuple[str, str, str, str]] = field(
default_factory=dict
)
metrics_history: List[PerformanceMetrics] = field(default_factory=list)
process: Optional[psutil.Process] = None

Expand Down Expand Up @@ -113,10 +119,14 @@ def history(self) -> List[PerformanceMetrics]:
"""Return the list of completed metrics."""
return list(self._state.metrics_history)

def get_metric(self, operation_id: str,
protocol: str) -> Optional[PerformanceMetrics]:
def get_metric(
self, operation_id: str, protocol: str
) -> Optional[PerformanceMetrics]:
"""Return metrics for an ongoing operation."""
return self._state.metrics_by_operation_id.get((operation_id, protocol))
key = self._state.op_index.get((protocol, operation_id))
if not key:
return None
return self._state.metrics_by_operation_id.get(key)

def _write_csv_headers(self) -> None:
if not self._enabled:
Expand Down Expand Up @@ -158,7 +168,8 @@ def _update_system_metrics(self, metric: PerformanceMetrics) -> None:

def _is_valid_operation(self, operation_id: str, protocol: str) -> bool:
return self._enabled and (
operation_id, protocol) in self._state.metrics_by_operation_id
protocol, operation_id
) in self._state.op_index

def start_operation(
self,
Expand All @@ -167,12 +178,14 @@ def start_operation(
protocol: str = "unknown",
simulation_type: str = "unknown",
) -> None:
key = (operation_id, protocol)
if not self._enabled or key in self._state.metrics_by_operation_id:
index_key = (protocol, operation_id)
full_key = (protocol, operation_id, client_id, simulation_type)
if not self._enabled or index_key in self._state.op_index:
return

now = time.time()
self._state.metrics_by_operation_id[key] = PerformanceMetrics(
self._state.op_index[index_key] = full_key
self._state.metrics_by_operation_id[full_key] = PerformanceMetrics(
client_id=client_id,
client_protocol=protocol,
operation_id=operation_id,
Expand Down Expand Up @@ -207,7 +220,10 @@ def record_event(self, operation_id: str, protocol: str,
return

now = time.time()
metric = self._state.metrics_by_operation_id[(operation_id, protocol)]
metric_key = self._state.op_index.get((protocol, operation_id))
if not metric_key:
return
metric = self._state.metrics_by_operation_id[metric_key]

if event == EventType.CORE_RECEIVED_INPUT:
metric.core_received_input_time = now
Expand All @@ -225,8 +241,8 @@ def record_event(self, operation_id: str, protocol: str,
def finalize_operation(self, operation_id: str, protocol: str) -> None:
if not self._is_valid_operation(operation_id, protocol):
return
metric = self._state.metrics_by_operation_id.pop(
(operation_id, protocol))
metric_key = self._state.op_index.pop((protocol, operation_id))
metric = self._state.metrics_by_operation_id.pop(metric_key)
metric.result_completed_time = time.time()
metric.total_duration = metric.result_completed_time - metric.request_received_time
if metric.core_sent_input_time:
Expand All @@ -243,13 +259,12 @@ def _update_timestamp(self, operation_id: str,
if not self._is_valid_operation(operation_id, protocol):
return
now = time.time()
setattr(
self._state.metrics_by_operation_id[(operation_id, protocol)],
field_name,
now)
self._update_system_metrics(
self._state.metrics_by_operation_id[(operation_id, protocol)]
)
metric_key = self._state.op_index.get((protocol, operation_id))
if not metric_key:
return
metric = self._state.metrics_by_operation_id[metric_key]
setattr(metric, field_name, now)
self._update_system_metrics(metric)

def _save_metrics_to_csv(self, metric: PerformanceMetrics) -> None:
if not self._enabled:
Expand Down
Loading