Standardize Endpoint invoke lifecycle with centralized error handling#61

Closed
acere wants to merge 2 commits into awslabs:main from acere:standardize-endpoint-invoke-lifecycle

Conversation


@acere acere commented Apr 16, 2026

Summary

Refactors the Endpoint base class to eliminate duplicated error handling, timing, and metadata boilerplate across all 12 endpoint implementations.

Closes #60

What changed

Base class (base.py)

The Endpoint class now provides a structured invoke lifecycle via __init_subclass__ wrapping. Subclasses define three methods:

| Method | Required | Purpose |
| --- | --- | --- |
| invoke(payload) | Yes | API call + parse_response() |
| parse_response(raw_response, start_t) | Yes | Extract text, tokens, metadata |
| prepare_payload(payload, **kwargs) | No | Merge kwargs, inject model_id, etc. |

The wrapper automatically handles:

  • Error handling — exceptions → error InvocationResponse with payload attached
  • Timing — time_to_last_token back-filled for non-streaming endpoints
  • Metadata — input_payload, input_prompt, id always populated
  • _parse_payload — extracts human-readable prompt for observability and token counting fallback
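The wrapping described above can be sketched roughly as follows. This is a minimal illustration, not the actual base class: InvocationResponse is a simplified stand-in, and the hook and attribute names (_wrap_invoke, _start_t) are assumptions beyond the names given in this PR description.

```python
import time
import uuid


class InvocationResponse:
    """Simplified stand-in for the real response model (illustrative only)."""

    def __init__(self, output=None, error=None):
        self.output = output
        self.error = error
        self.id = None
        self.input_payload = None
        self.time_to_last_token = None


class Endpoint:
    """Sketch of the structured invoke lifecycle via __init_subclass__ wrapping."""

    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        # Wrap each subclass's own invoke() with the shared lifecycle.
        if "invoke" in cls.__dict__:
            cls.invoke = Endpoint._wrap_invoke(cls.__dict__["invoke"])

    @staticmethod
    def _wrap_invoke(raw_invoke):
        def wrapper(self, payload, **kwargs):
            payload = self.prepare_payload(payload, **kwargs)
            self._start_t = time.perf_counter()
            try:
                response = raw_invoke(self, payload)
            except Exception as exc:
                # Centralized error handling: exceptions -> error response.
                response = InvocationResponse(error=str(exc))
            # Metadata always populated; timing back-filled when missing.
            response.input_payload = payload
            if response.id is None:
                response.id = str(uuid.uuid4())
            if response.time_to_last_token is None:
                response.time_to_last_token = time.perf_counter() - self._start_t
            return response

        return wrapper

    def prepare_payload(self, payload, **kwargs):
        # Default: merge extra kwargs into the payload.
        return {**kwargs, **payload}


class DemoEndpoint(Endpoint):
    def invoke(self, payload):
        if payload.get("fail"):
            raise TimeoutError("simulated timeout")
        return InvocationResponse(output="ok")


ep = DemoEndpoint()
ok = ep.invoke({"prompt": "hi"}, temperature=0)
err = ep.invoke({"fail": True})
```

With this shape, a subclass's invoke() can raise freely; the wrapper converts the exception into an error response with the payload attached rather than letting it propagate.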

InvocationResponse new field

  • num_tokens_input_cached — input tokens served from prompt cache. Populated by Bedrock (cacheReadInputTokens) and OpenAI (cached_tokens).
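A rough sketch of how the new field might be populated from each provider's usage block. The helper name is hypothetical; cacheReadInputTokens is the Bedrock key named above, and the OpenAI nesting under prompt_tokens_details follows the OpenAI usage shape but is an assumption here.

```python
def extract_cached_input_tokens(usage: dict, provider: str):
    """Hypothetical helper mapping provider usage dicts to num_tokens_input_cached."""
    if provider == "bedrock":
        # Bedrock Converse usage block
        return usage.get("cacheReadInputTokens", 0)
    if provider == "openai":
        # OpenAI reports cache hits nested under prompt_tokens_details
        return usage.get("prompt_tokens_details", {}).get("cached_tokens", 0)
    return None


bedrock_hits = extract_cached_input_tokens({"cacheReadInputTokens": 1024}, "bedrock")
openai_hits = extract_cached_input_tokens(
    {"prompt_tokens_details": {"cached_tokens": 512}}, "openai"
)
```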

Endpoint improvements

  • All Bedrock + SageMaker endpoints now extract ResponseMetadata.RequestId as the response ID
  • SageMaker now captures RetryAttempts (Bedrock already had this)
  • Bedrock streaming preserves partial data on mid-stream errors instead of discarding it
  • BEDROCK_STREAM_ERROR_TYPES defined as a shared frozenset constant, used by both Converse and InvokeModel stream parsers
  • Unknown stream events are skipped gracefully (forward-compatible with new Bedrock event types)
  • All redundant try/except blocks removed from _parse_response methods
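The streaming changes in the list above could look roughly like this sketch. The frozenset members and event dict shapes are assumptions modeled on Bedrock Converse stream events, not the actual constant from the PR.

```python
# Assumed membership; the real BEDROCK_STREAM_ERROR_TYPES lives in the endpoint module.
BEDROCK_STREAM_ERROR_TYPES = frozenset(
    {
        "internalServerException",
        "modelStreamErrorException",
        "throttlingException",
        "validationException",
        "serviceUnavailableException",
    }
)


def parse_stream(events):
    """Collect text deltas, preserving partial output on mid-stream errors."""
    chunks, error = [], None
    for event in events:
        name, body = next(iter(event.items()))
        if name in BEDROCK_STREAM_ERROR_TYPES:
            error = body.get("message", name)
            break  # keep the partial data instead of discarding it
        if name == "contentBlockDelta":
            chunks.append(body["delta"]["text"])
        # any other (possibly new) event type is skipped gracefully
    return {"text": "".join(chunks), "error": error}


events = [
    {"contentBlockDelta": {"delta": {"text": "Hel"}}},
    {"contentBlockDelta": {"delta": {"text": "lo"}}},
    {"modelStreamErrorException": {"message": "stream interrupted"}},
]
partial = parse_stream(events)
```

Skipping unrecognized event names (rather than raising) is what makes the parser forward-compatible with new Bedrock event types.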

Before/after (e.g. OpenAIResponseEndpoint.invoke)

Before (27 lines with 5 duplicate except handlers):

def invoke(self, payload, **kwargs):
    payload = {**kwargs, **payload}
    payload["model"] = self.model_id
    start_t = time.perf_counter()
    try:
        client_response = self._client.responses.create(**payload)
    except APIConnectionError as e:
        logger.exception(e)
        return InvocationResponse.error_output(...)
    except AuthenticationError as e:
        ...  # 4 more identical except blocks
    response = self._parse_response(client_response, start_t)
    response.input_payload = payload
    response.input_prompt = self._parse_payload(payload)
    return response

After (3 lines):

def invoke(self, payload):
    client_response = self._client.responses.create(**payload)
    return self.parse_response(client_response, self._start_t)
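For context, the companion parse_response for this endpoint might look like the sketch below. Field names on the client response (output_text, usage.input_tokens, usage.output_tokens) follow the OpenAI Responses API shape but are assumptions here, as is returning a plain dict in place of the real InvocationResponse.

```python
import time
from types import SimpleNamespace


def parse_response(raw_response, start_t):
    # Hypothetical sketch: pull text, token counts, and timing into one record.
    return {
        "output": raw_response.output_text,
        "num_tokens_input": raw_response.usage.input_tokens,
        "num_tokens_output": raw_response.usage.output_tokens,
        "time_to_last_token": time.perf_counter() - start_t,
    }


# Exercise the parser with a faked client response.
fake = SimpleNamespace(
    output_text="hello",
    usage=SimpleNamespace(input_tokens=3, output_tokens=1),
)
record = parse_response(fake, time.perf_counter())
```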

Documentation

  • metrics.md — Per-request fields table expanded from 6 to 12 fields
  • key_concepts.md — Endpoint description explains the invoke lifecycle
  • connect_endpoints.md — Added custom endpoint example with the new abstract methods

Testing

  • 666 unit tests pass
  • Integration tests updated with stronger ID assertions (AWS RequestId format)
  • New test for partial data preservation on streaming errors
  • Import sorting and formatting applied via ruff

acere added 2 commits April 16, 2026 22:44
Refactor the Endpoint base class to provide a structured invoke lifecycle
via __init_subclass__ wrapping:

- prepare_payload(payload, **kwargs) → merge kwargs, inject provider fields
- invoke(payload) → API call + parse_response() (abstract)
- parse_response(raw_response, start_t) → extract text/tokens/metadata (abstract)

The base class wrapper automatically provides:
- Error handling: exceptions → error InvocationResponse with partial data
- Timing: time_to_last_token back-fill for non-streaming endpoints
- Metadata: input_payload, input_prompt, id always populated
- _parse_payload for input prompt extraction (token counting fallback)

Additional improvements:
- Add num_tokens_input_cached field for prompt caching (Bedrock + OpenAI)
- Extract AWS RequestId as response ID for Bedrock and SageMaker
- Extract RetryAttempts for SageMaker (Bedrock already had this)
- Preserve partial data on streaming errors instead of discarding
- Define BEDROCK_STREAM_ERROR_TYPES as shared constant
- Skip unknown stream events gracefully (forward-compatible)
- Remove redundant try/except from all _parse_response methods
- Remove uuid4/error handling boilerplate from all endpoint subclasses
- Update docs: metrics table, key concepts, custom endpoint guide

Closes awslabs#60

- Add num_tokens_input_cached to Result.stats aggregation metrics
  and total_cached_input_tokens to run-level stats
- Add integration test for ConverseStream prompt caching with
  unique-per-run prefix to avoid stale cache hits
- Add 6 unit tests verifying mid-stream errors (TimeoutError,
  ConnectionError) are caught by the invoke wrapper for
  BedrockConverseStream, BedrockInvokeStream, and
  OpenAICompletionStreamEndpoint
- Add demo notebook comparing TTFT with/without prompt caching,
  using a CacheBuster callback to guarantee cache misses
- Sort imports across codebase (ruff --select I)
- Update metrics documentation with new stats fields

acere commented Apr 17, 2026

Superseded by PR #58, which now includes all changes from this PR (endpoint lifecycle refactor, prompt caching metrics, mid-stream error handling, demo notebook). The combined branch was force-pushed to feature/time-bound-runs.
