feat: improve observability clients #443
Conversation
✅ No Breaking Changes Detected: No public API breaking changes found in this PR.
```python
agent_filter = (
    f'\n | parse resource.attributes.cloud.resource_id "runtime/*/" as parsedAgentId'
    f"\n | filter parsedAgentId = '{agent_id}'"
)
```
[CRITICAL] CWL Insights injection — agent_id, session_id, trace_id, endpoint_name
Every new method interpolates caller-controlled values directly into CWL Insights query strings with only quote-wrapping and no validation:
```python
f"| filter parsedAgentId = '{agent_id}'"   # line 69
f'| filter @message like "{session_id}"'   # line 71
f"| filter traceId = '{trace_id}'"         # lines 234, 356
f"/aws/bedrock-agentcore/runtimes/{agent_id}-{endpoint_name}"  # line 261 (log group name)
f"| filter parsedAgentId = '{agent_id}'"   # line 291
```

A value like `x' | stats count(*) as n #` rewrites the query semantics. A malicious endpoint_name can pivot the log group path to any group the caller's IAM role can read. In any multi-tenant SDK wrapper this is a cross-tenant log disclosure vector. No test exercises a hostile input.
Fix: validate all interpolated values against a strict pattern (e.g. ^[A-Za-z0-9_\-:.]{1,64}$) and reject anything else before interpolation.
```python
start = time.time()
while True:
    if time.time() - start > timeout:
        raise TimeoutError(f"Query {query_id} timed out after {timeout} seconds")
```
[CRITICAL] Running query is never cancelled on timeout, error, or exception
When TimeoutError is raised (or any exception propagates out of the polling loop), the query continues running server-side. There is no try/finally and no call to stop_query(queryId=query_id).
Per AWS docs: "queries continue to run until completion" unless StopQuery is explicitly called. Abandoned queries consume one of your 100 regional concurrent query slots (shared with scheduled queries) for up to the 60-minute server-side timeout. The _query_runtime_logs_individually fallback multiplies this: a single failed batch query spawns N individual queries that can each also leak a slot.
Fix:

```python
status = None
try:
    while True:
        ...
        if status == "Complete":
            return result.get("results", [])
finally:
    if status != "Complete":
        self.logs_client.stop_query(queryId=query_id)
```

```python
result = self.logs_client.get_query_results(queryId=query_id)
status = result["status"]
if status == "Complete":
    return result.get("results", [])
```
[HIGH] No 10,000-row truncation warning — silent data loss
_execute_query returns result.get("results", []) with no check on statistics.recordsMatched. Per AWS docs, GetQueryResults has a hard ceiling of 10,000 rows; statistics.recordsMatched reflects the total matching events regardless of this cap, so truncation is detectable.
Contrast with query_log_group (lines 106–117) which explicitly checks if records_matched > 10000 and warns. All three new methods (query_spans_by_trace, query_runtime_logs_by_traces, get_latest_session_id) go through _execute_query and silently truncate. Evaluation pipelines computing metrics over truncated span data will produce incorrect results with no signal.
Fix: port the recordsMatched check from query_log_group into this method.
```python
results.extend(self._execute_query(query, log_group, start_time_ms, end_time_ms))
except Exception as e:
    logger.warning("Failed to query runtime logs for trace %s: %s", trace_id, e)
return results
```
[HIGH] Fallback silently returns partial results — no indication of which traces failed
Each per-trace except Exception is swallowed and the loop continues. The caller receives a plain List[dict] with no metadata about completeness — indistinguishable from a full successful result. Evaluation pipelines will compute error rates, latency, and span counts over an unknown partial dataset and report them as authoritative.
Fix: return a structured result such as {"results": [...], "failed_trace_ids": [...], "complete": bool}, or raise once the batch fallback has also failed, so the caller is aware.
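One way to shape that structured result (class and function names here are illustrative, not the PR's API):

```python
from dataclasses import dataclass, field

@dataclass
class RuntimeLogQueryResult:
    """Per-trace query results plus an explicit record of which traces failed."""
    results: list = field(default_factory=list)
    failed_trace_ids: list = field(default_factory=list)

    @property
    def complete(self) -> bool:
        return not self.failed_trace_ids

def query_individually(trace_ids, run_query) -> RuntimeLogQueryResult:
    """Fallback loop that records failures instead of silently swallowing them."""
    out = RuntimeLogQueryResult()
    for trace_id in trace_ids:
        try:
            out.results.extend(run_query(trace_id))
        except Exception:
            out.failed_trace_ids.append(trace_id)
    return out
```

Callers in evaluation pipelines can then branch on `complete` (or inspect `failed_trace_ids`) before treating the metrics as authoritative.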
```python
results["logs_enabled"] = True
elif resource_type in AUTO_LOG_RESOURCE_TYPES:
    results["logs_enabled"] = True
    results["deliveries"]["logs"] = {"status": "auto-created by AWS"}
```
[MEDIUM] logs_enabled: True returned for runtimes with no delivery verification
```python
elif resource_type in AUTO_LOG_RESOURCE_TYPES:
    results["logs_enabled"] = True
    results["deliveries"]["logs"] = {"status": "auto-created by AWS"}
```

AWS docs confirm that AgentCore does auto-create a CloudWatch log group when a runtime resource is created, so the assumption is generally correct. However, the code makes no AWS call to verify the log group actually exists before returning logs_enabled: True. If the runtime was never fully provisioned, or if auto-creation is delayed, the caller has no way to distinguish "confirmed" from "assumed."
Fix: document the assumption explicitly in the docstring, and consider a lightweight describe_log_groups check (with a short timeout) for callers who need confirmation rather than assumption.
```python
# Delete deliveries referencing this source first
try:
    deliveries = self._logs_client.describe_deliveries()
```
[HIGH] describe_deliveries is not paginated
```python
deliveries = self._logs_client.describe_deliveries()
```

Per the AWS API docs, DescribeDeliveries returns at most 50 results per page and includes a nextToken for subsequent pages. Accounts with >50 deliveries will have the target delivery on a later page. The subsequent delete_delivery_source then fails with ConflictException (not handled here — only ResourceNotFoundException is caught), leaving orphaned delivery sources and returning status: partial with no clear explanation.
Fix:

```python
paginator = self._logs_client.get_paginator("describe_deliveries")
for page in paginator.paginate():
    for delivery in page.get("deliveries", []):
        ...
```

```python
results["deleted"].append(f"log_group:{lg}")
except ClientError as e:
    if e.response["Error"]["Code"] != "ResourceNotFoundException":
        results["errors"].append(f"Failed to delete log group {lg}: {e}")
```
[CRITICAL] disable_observability_for_resource(delete_log_group=True) deletes log groups for ALL resource types, not just the one being disabled
disable_observability_for_resource takes no resource_type parameter. When delete_log_group=True, it iterates all of SUPPORTED_RESOURCE_TYPES and attempts to delete:
```
/aws/bedrock-agentcore/runtimes/{resource_id}
/aws/vendedlogs/bedrock-agentcore/gateway/APPLICATION_LOGS/{resource_id}
/aws/vendedlogs/bedrock-agentcore/memory/APPLICATION_LOGS/{resource_id}
```
Disabling a memory resource silently attempts to delete the auto-managed runtime log group and any gateway log group sharing the same ID. ResourceNotFoundException is silently swallowed so collateral deletion goes unnoticed. Note the asymmetry: enable_observability_for_resource requires resource_type and constructs exactly one path; disable does not.
Fix: add resource_type as a required parameter to disable_observability_for_resource and construct exactly one log group path, matching the enable-side logic.
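A sketch of the path construction the fix implies (the template mapping mirrors the three paths listed above; the function name and dict are illustrative, not the PR's code):

```python
# One template per supported resource type, matching the enable-side convention.
LOG_GROUP_TEMPLATES = {
    "runtime": "/aws/bedrock-agentcore/runtimes/{resource_id}",
    "gateway": "/aws/vendedlogs/bedrock-agentcore/gateway/APPLICATION_LOGS/{resource_id}",
    "memory": "/aws/vendedlogs/bedrock-agentcore/memory/APPLICATION_LOGS/{resource_id}",
}

def log_group_for(resource_type: str, resource_id: str) -> str:
    """Build exactly one log group path for the resource being disabled."""
    try:
        template = LOG_GROUP_TEMPLATES[resource_type]
    except KeyError:
        raise ValueError(f"Unsupported resource_type: {resource_type!r}")
    return template.format(resource_id=resource_id)
```

With `resource_type` required, disabling a memory resource can only ever touch the memory log group, restoring symmetry with the enable side.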
```python
if self._need_indexing_rule():
    try:
        self._xray_client.update_indexing_rule(
            Name="Default", Rule={"Probabilistic": {"DesiredSamplingPercentage": 1}}
```
[MEDIUM] X-Ray indexing rule sampling hardcoded to 1% with no parameter to override
```python
Rule={"Probabilistic": {"DesiredSamplingPercentage": 1}}
```

Note: _need_indexing_rule only triggers this if no Default rule exists yet — it will not overwrite an existing rule. However, on a fresh account 1% is set silently with no documentation or way to change it. A new user enabling Transaction Search would expect full (or at least high) coverage, not 1%.
Fix: expose as a parameter (e.g. sampling_percentage: int = 100) and document the cost/coverage tradeoff in the docstring.
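A sketch of surfacing the parameter (the wrapper name is mine; `update_indexing_rule` with a `Probabilistic` rule is the call the PR already makes):

```python
def enable_transaction_search(xray_client, sampling_percentage: int = 100):
    """Create the Default indexing rule with a caller-chosen sampling rate.

    Higher percentages index more spans for Transaction Search queries but
    increase CloudWatch ingestion cost; 100 gives full coverage.
    """
    if not 0 <= sampling_percentage <= 100:
        raise ValueError("sampling_percentage must be in [0, 100]")
    xray_client.update_indexing_rule(
        Name="Default",
        Rule={"Probabilistic": {"DesiredSamplingPercentage": sampling_percentage}},
    )
```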
```python
status[suffix]["configured"] = True
status[suffix]["source_name"] = source_name
except ClientError:
    pass
```
[HIGH] get_observability_status only checks delivery-source existence — reports configured: True when logs aren't flowing
Per the AWS docs, a delivery source, destination, and delivery are three independent objects created in sequence. A source can exist without a delivery connecting it to a destination — the docs state: "Use CreateDelivery to create a delivery by pairing exactly one delivery source and one delivery destination." Without an active delivery, logs do not flow.
This method only calls get_delivery_source. It will return configured: True even when the delivery or destination is missing (e.g., if enable_observability_for_resource crashed midway).
Fix: also check that at least one active delivery exists linking source to destination via describe_deliveries.
Summary
Migrates observability functionality from the starter toolkit into the SDK across two areas:
- `ObservabilityClient`: manages CloudWatch delivery configuration and X-Ray Transaction Search for AgentCore resources.
- `CloudWatchSpanHelper`: adds trace-based querying, runtime log querying, and session discovery.

ObservabilityClient

Ported from the starter toolkit's `ObservabilityDeliveryManager` and `xray.py`.

Methods added:

CloudWatchSpanHelper

Methods added:

Methods expanded (both non-breaking changes):
Testing
By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.