feat: improve observability clients #443

Open
nborges-aws wants to merge 1 commit into main from feat/observabilityClient

Conversation

@nborges-aws
Contributor

Summary

Migrates observability functionality from the starter toolkit into the SDK across two areas:

  1. New ObservabilityClient — manages CloudWatch delivery configuration and X-Ray Transaction Search for AgentCore resources.
  2. Expanded CloudWatchSpanHelper — adds trace-based querying, runtime log querying, and session discovery.

ObservabilityClient

Ported from the starter toolkit's ObservabilityDeliveryManager and xray.py.

Methods added:

  • enable_observability_for_resource
  • disable_observability_for_resource
  • enable_transaction_search
  • get_observability_status

CloudWatchSpanHelper

Methods added:

  • query_spans_by_trace
  • query_runtime_logs_by_traces
  • get_latest_session_id

Methods expanded (both non-breaking):

  • query_log_group — added an optional agent_id parameter to filter results by agent
  • fetch_spans — added an optional agent_id parameter to filter results by agent

Testing

  • ObservabilityClient: 22 unit tests + 9 integration tests
  • CloudWatchSpanHelper: 18 new unit tests + 5 integration tests

By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.

@github-actions
Contributor

✅ No Breaking Changes Detected

No public API breaking changes found in this PR.

agent_filter = (
    f'\n | parse resource.attributes.cloud.resource_id "runtime/*/" as parsedAgentId'
    f"\n | filter parsedAgentId = '{agent_id}'"
)
Contributor

[CRITICAL] CWL Insights injection — agent_id, session_id, trace_id, endpoint_name

Every new method interpolates caller-controlled values directly into CWL Insights query strings with only quote-wrapping and no validation:

f"| filter parsedAgentId = '{agent_id}'"          # line 69
f'| filter @message like "{session_id}"'           # line 71
f"| filter traceId = '{trace_id}'"                 # lines 234, 356
f"/aws/bedrock-agentcore/runtimes/{agent_id}-{endpoint_name}"  # line 261 (log group name)
f"| filter parsedAgentId = '{agent_id}'"           # line 291

A value like x' | stats count(*) as n # rewrites the query semantics. A malicious endpoint_name can pivot the log group path to any group the caller's IAM role can read. In any multi-tenant SDK wrapper this is a cross-tenant log disclosure vector. No test exercises a hostile input.

Fix: validate all interpolated values against a strict pattern (e.g. ^[A-Za-z0-9_\-:.]{1,64}$) and reject anything else before interpolation.
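A minimal sketch of such a guard, using the pattern suggested above (the helper name is illustrative, not from the PR):

```python
import re

# Allow-list of characters plausible in AgentCore resource identifiers;
# anything outside it is rejected before query interpolation.
_SAFE_ID = re.compile(r"^[A-Za-z0-9_\-:.]{1,64}$")


def validate_query_identifier(value: str, param_name: str) -> str:
    """Raise ValueError instead of interpolating a hostile value."""
    if not _SAFE_ID.match(value):
        raise ValueError(f"Unsafe value for {param_name}: {value!r}")
    return value
```

Calling this on every caller-supplied value before building the query string makes the injection payload shown above fail fast with a ValueError instead of rewriting the query.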

start = time.time()
while True:
    if time.time() - start > timeout:
        raise TimeoutError(f"Query {query_id} timed out after {timeout} seconds")
Contributor

[CRITICAL] Running query is never cancelled on timeout, error, or exception

When TimeoutError is raised (or any exception propagates out of the polling loop), the query continues running server-side. There is no try/finally and no call to stop_query(queryId=query_id).

Per AWS docs: "queries continue to run until completion" unless StopQuery is explicitly called. Abandoned queries consume one of your 100 regional concurrent query slots (shared with scheduled queries) for up to the 60-minute server-side timeout. The _query_runtime_logs_individually fallback multiplies this: a single failed batch query spawns N individual queries that can each also leak a slot.

Fix:

status = None
try:
    while True:
        ...
        if status == "Complete":
            return result.get("results", [])
finally:
    if status != "Complete":
        self.logs_client.stop_query(queryId=query_id)

result = self.logs_client.get_query_results(queryId=query_id)
status = result["status"]
if status == "Complete":
    return result.get("results", [])
Contributor

[HIGH] No 10,000-row truncation warning — silent data loss

_execute_query returns result.get("results", []) with no check on statistics.recordsMatched. Per AWS docs, GetQueryResults has a hard ceiling of 10,000 rows; statistics.recordsMatched reflects the total matching events regardless of this cap, so truncation is detectable.

Contrast with query_log_group (lines 106–117) which explicitly checks if records_matched > 10000 and warns. All three new methods (query_spans_by_trace, query_runtime_logs_by_traces, get_latest_session_id) go through _execute_query and silently truncate. Evaluation pipelines computing metrics over truncated span data will produce incorrect results with no signal.

Fix: port the recordsMatched check from query_log_group into this method.
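A sketch of that check as a standalone helper (the result dict shape follows the GetQueryResults response; the helper name is illustrative):

```python
import logging

logger = logging.getLogger(__name__)

GET_QUERY_RESULTS_CAP = 10_000  # hard ceiling on rows GetQueryResults returns


def warn_if_truncated(result: dict) -> bool:
    """Return True when more rows matched than could be returned.

    `result` is the response dict from logs_client.get_query_results.
    statistics.recordsMatched counts all matching events regardless of
    the cap, so it exceeds the cap exactly when the returned rows are a
    truncated subset.
    """
    matched = result.get("statistics", {}).get("recordsMatched", 0)
    if matched > GET_QUERY_RESULTS_CAP:
        logger.warning(
            "Query matched %s records but at most %s are returned; results truncated",
            matched,
            GET_QUERY_RESULTS_CAP,
        )
        return True
    return False
```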

        results.extend(self._execute_query(query, log_group, start_time_ms, end_time_ms))
    except Exception as e:
        logger.warning("Failed to query runtime logs for trace %s: %s", trace_id, e)
return results
Contributor

[HIGH] Fallback silently returns partial results — no indication of which traces failed

Each per-trace except Exception is swallowed and the loop continues. The caller receives a plain List[dict] with no metadata about completeness — indistinguishable from a full successful result. Evaluation pipelines will compute error rates, latency, and span counts over an unknown partial dataset and report them as authoritative.

Fix: return a structured result such as {"results": [...], "failed_trace_ids": [...], "complete": bool}, or raise after the batch fallback already fired to ensure the caller is aware.
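One possible shape for such a structured result (illustrative, not part of the PR):

```python
from dataclasses import dataclass, field


@dataclass
class RuntimeLogQueryResult:
    """Query output that preserves completeness information."""

    results: list = field(default_factory=list)
    failed_trace_ids: list = field(default_factory=list)

    @property
    def complete(self) -> bool:
        # True only when every requested trace was queried successfully.
        return not self.failed_trace_ids
```

A caller computing metrics can then refuse to report numbers when `complete` is False, or at least annotate them with the failed trace IDs.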

    results["logs_enabled"] = True
elif resource_type in AUTO_LOG_RESOURCE_TYPES:
    results["logs_enabled"] = True
    results["deliveries"]["logs"] = {"status": "auto-created by AWS"}
Contributor

[MEDIUM] logs_enabled: True returned for runtimes with no delivery verification

elif resource_type in AUTO_LOG_RESOURCE_TYPES:
    results["logs_enabled"] = True
    results["deliveries"]["logs"] = {"status": "auto-created by AWS"}

AWS docs confirm that AgentCore does auto-create a CloudWatch log group when a runtime resource is created, so the assumption is generally correct. However, the code makes no AWS call to verify the log group actually exists before returning logs_enabled: True. If the runtime was never fully provisioned, or if auto-creation is delayed, the caller has no way to distinguish "confirmed" from "assumed."

Fix: document the assumption explicitly in the docstring, and consider a lightweight describe_log_groups check (with a short timeout) for callers who need confirmation rather than assumption.
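Such an existence check might look like the following sketch (`logs_client` is a boto3 CloudWatch Logs client, or any stub with the same describe_log_groups signature; the helper name is illustrative):

```python
def log_group_exists(logs_client, log_group_name: str) -> bool:
    """Confirm the auto-created log group actually exists.

    describe_log_groups only supports prefix matching, which can return
    sibling groups, so the exact name is matched in the response.
    """
    response = logs_client.describe_log_groups(logGroupNamePrefix=log_group_name)
    return any(
        group.get("logGroupName") == log_group_name
        for group in response.get("logGroups", [])
    )
```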


# Delete deliveries referencing this source first
try:
    deliveries = self._logs_client.describe_deliveries()
Contributor

[HIGH] describe_deliveries is not paginated

deliveries = self._logs_client.describe_deliveries()

Per the AWS API docs, DescribeDeliveries returns at most 50 results per page and includes a nextToken for subsequent pages. Accounts with >50 deliveries will have the target delivery on a later page. The subsequent delete_delivery_source then fails with ConflictException (not handled here — only ResourceNotFoundException is caught), leaving orphaned delivery sources and returning status: partial with no clear explanation.

Fix:

paginator = self._logs_client.get_paginator("describe_deliveries")
for page in paginator.paginate():
    for delivery in page.get("deliveries", []):
        ...

    results["deleted"].append(f"log_group:{lg}")
except ClientError as e:
    if e.response["Error"]["Code"] != "ResourceNotFoundException":
        results["errors"].append(f"Failed to delete log group {lg}: {e}")
Contributor

[CRITICAL] disable_observability_for_resource(delete_log_group=True) deletes log groups for ALL resource types, not just the one being disabled

disable_observability_for_resource takes no resource_type parameter. When delete_log_group=True, it iterates all of SUPPORTED_RESOURCE_TYPES and attempts to delete:

  • /aws/bedrock-agentcore/runtimes/{resource_id}
  • /aws/vendedlogs/bedrock-agentcore/gateway/APPLICATION_LOGS/{resource_id}
  • /aws/vendedlogs/bedrock-agentcore/memory/APPLICATION_LOGS/{resource_id}

Disabling a memory resource silently attempts to delete the auto-managed runtime log group and any gateway log group sharing the same ID. ResourceNotFoundException is silently swallowed so collateral deletion goes unnoticed. Note the asymmetry: enable_observability_for_resource requires resource_type and constructs exactly one path; disable does not.

Fix: add resource_type as a required parameter to disable_observability_for_resource and construct exactly one log group path, matching the enable-side logic.
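A sketch of the symmetric fix, using the three path templates listed above (the resource-type keys are assumptions about the SDK's naming, and the helper name is illustrative):

```python
# Map each supported resource type to exactly one log group template,
# mirroring what the enable side constructs.
LOG_GROUP_TEMPLATES = {
    "runtime": "/aws/bedrock-agentcore/runtimes/{resource_id}",
    "gateway": "/aws/vendedlogs/bedrock-agentcore/gateway/APPLICATION_LOGS/{resource_id}",
    "memory": "/aws/vendedlogs/bedrock-agentcore/memory/APPLICATION_LOGS/{resource_id}",
}


def log_group_for(resource_type: str, resource_id: str) -> str:
    """Return the single log group path for this resource, or raise."""
    try:
        template = LOG_GROUP_TEMPLATES[resource_type]
    except KeyError:
        raise ValueError(f"Unsupported resource_type: {resource_type!r}") from None
    return template.format(resource_id=resource_id)
```

With disable_observability_for_resource calling this instead of iterating all types, disabling a memory resource can no longer touch a runtime or gateway log group that happens to share the same ID.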

if self._need_indexing_rule():
    try:
        self._xray_client.update_indexing_rule(
            Name="Default", Rule={"Probabilistic": {"DesiredSamplingPercentage": 1}}
        )
Contributor

[MEDIUM] X-Ray indexing rule sampling hardcoded to 1% with no parameter to override

Rule={"Probabilistic": {"DesiredSamplingPercentage": 1}}

Note: _need_indexing_rule only triggers this if no Default rule exists yet — it will not overwrite an existing rule. However, on a fresh account 1% is set silently with no documentation or way to change it. A new user enabling Transaction Search would expect full (or at least high) coverage, not 1%.

Fix: expose as a parameter (e.g. sampling_percentage: int = 100) and document the cost/coverage tradeoff in the docstring.
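The suggested signature could be sketched as follows (function name and validation are illustrative; `xray_client` is a boto3 X-Ray client or any stub with the same update_indexing_rule signature):

```python
def set_indexing_rule(xray_client, sampling_percentage: int = 100) -> None:
    """Create or update the Default indexing rule at a caller-chosen rate.

    Higher percentages index more spans for Transaction Search at higher
    cost; the default of 100 matches what a new user likely expects.
    """
    if not 0 <= sampling_percentage <= 100:
        raise ValueError("sampling_percentage must be between 0 and 100")
    xray_client.update_indexing_rule(
        Name="Default",
        Rule={"Probabilistic": {"DesiredSamplingPercentage": sampling_percentage}},
    )
```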

    status[suffix]["configured"] = True
    status[suffix]["source_name"] = source_name
except ClientError:
    pass
Contributor

[HIGH] get_observability_status only checks delivery-source existence — reports configured: True when logs aren't flowing

Per the AWS docs, a delivery source, destination, and delivery are three independent objects created in sequence. A source can exist without a delivery connecting it to a destination — the docs state: "Use CreateDelivery to create a delivery by pairing exactly one delivery source and one delivery destination." Without an active delivery, logs do not flow.

This method only calls get_delivery_source. It will return configured: True even when the delivery or destination is missing (e.g., if enable_observability_for_resource crashed midway).

Fix: also check that at least one active delivery exists linking source to destination via describe_deliveries.
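A paginated check for an active delivery might look like this sketch (the deliverySourceName field follows the DescribeDeliveries response shape; the helper name is illustrative):

```python
def has_active_delivery(logs_client, source_name: str) -> bool:
    """Return True if any delivery references the given delivery source.

    Paginates because DescribeDeliveries returns at most 50 results per
    page. `logs_client` is a boto3 CloudWatch Logs client or a stub
    exposing the same get_paginator interface.
    """
    paginator = logs_client.get_paginator("describe_deliveries")
    for page in paginator.paginate():
        for delivery in page.get("deliveries", []):
            if delivery.get("deliverySourceName") == source_name:
                return True
    return False
```

get_observability_status could then report configured: True only when both the source exists and this returns True.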
