From 912b7847b6936f0f72bd098fd1d5f4aaa1884f2d Mon Sep 17 00:00:00 2001 From: Wrisa Date: Thu, 16 Apr 2026 13:18:31 -0700 Subject: [PATCH 01/10] Add workflow and refactor LLM for langchain --- .../examples/workflow/main.py | 115 +++++++ .../examples/workflow/requirements.txt | 8 + .../langchain/callback_handler.py | 45 +-- .../tests/test_workflow_chain.py | 307 ++++++++++++++++++ 4 files changed, 448 insertions(+), 27 deletions(-) create mode 100644 instrumentation-genai/opentelemetry-instrumentation-langchain/examples/workflow/main.py create mode 100644 instrumentation-genai/opentelemetry-instrumentation-langchain/examples/workflow/requirements.txt create mode 100644 instrumentation-genai/opentelemetry-instrumentation-langchain/tests/test_workflow_chain.py diff --git a/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/workflow/main.py b/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/workflow/main.py new file mode 100644 index 0000000000..d74f5efa73 --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/workflow/main.py @@ -0,0 +1,115 @@ +""" +LangGraph StateGraph example with an LLM node. + +Similar to the manual example (../manual/main.py) but uses LangGraph's StateGraph +with a node that calls ChatOpenAI. OpenTelemetry LangChain instrumentation traces +the LLM calls made from within the graph node. +""" + +from typing import Annotated + + +from langchain_core.messages import HumanMessage, SystemMessage +from langchain_openai import ChatOpenAI +from langgraph.graph import END, START, StateGraph +from langgraph.graph.message import add_messages +from typing_extensions import TypedDict + +from opentelemetry import _logs, metrics, trace +from opentelemetry.exporter.otlp.proto.grpc._log_exporter import ( + OTLPLogExporter, +) +from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import ( + OTLPMetricExporter, +) +from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import ( + OTLPSpanExporter, +) +from opentelemetry.instrumentation.langchain import LangChainInstrumentor +from opentelemetry.sdk._logs import LoggerProvider +from opentelemetry.sdk._logs.export import BatchLogRecordProcessor +from opentelemetry.sdk.metrics import MeterProvider +from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import BatchSpanProcessor + + +# Configure tracing +trace.set_tracer_provider(TracerProvider()) +span_processor = BatchSpanProcessor(OTLPSpanExporter()) +trace.get_tracer_provider().add_span_processor(span_processor) + +# Configure logging +_logs.set_logger_provider(LoggerProvider()) +_logs.get_logger_provider().add_log_record_processor( + BatchLogRecordProcessor(OTLPLogExporter()) +) + +# Configure metrics +metrics.set_meter_provider( + MeterProvider( + metric_readers=[ + PeriodicExportingMetricReader( + OTLPMetricExporter(), + ), + ] + ) +) + + +class GraphState(TypedDict): + """State for the graph; messages are accumulated with add_messages.""" + + messages: Annotated[list, add_messages] + + +def build_graph(llm: ChatOpenAI): + """Build a StateGraph with a single LLM node.""" + + def llm_node(state: GraphState) -> dict: + """Node that invokes the LLM with the current messages.""" + response = llm.invoke(state["messages"]) + return {"messages": [response]} + + builder = StateGraph(GraphState) + builder.add_node("llm", llm_node) + builder.add_edge(START, "llm") + builder.add_edge("llm", END) + return builder.compile() + + +def main(): + # Set up instrumentation (traces LLM calls from within graph nodes) + LangChainInstrumentor().instrument() + + # ChatOpenAI setup + llm = ChatOpenAI( + model="gpt-3.5-turbo", + temperature=0.1, + max_tokens=100, + top_p=0.9, + frequency_penalty=0.5, + presence_penalty=0.5, + stop_sequences=["\n", "Human:", "AI:"], + seed=100, + ) + + graph = build_graph(llm) + + initial_messages = [ + SystemMessage(content="You are a helpful assistant!"), + HumanMessage(content="What is the capital of France?"), + ] + + result = graph.invoke({"messages": initial_messages}) + + print("LangGraph output (messages):") + for msg in result.get("messages", []): + print(f" {type(msg).__name__}: {msg.content}") + + # Un-instrument after use + LangChainInstrumentor().uninstrument() + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/workflow/requirements.txt b/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/workflow/requirements.txt new file mode 100644 index 0000000000..f27cb4a3c1 --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/workflow/requirements.txt @@ -0,0 +1,8 @@ +langchain==0.3.21 +langchain_openai +langgraph +opentelemetry-sdk>=1.39.0 +opentelemetry-exporter-otlp-proto-grpc>=1.39.0 + +# Uncomment after langchain instrumentation is released +# opentelemetry-instrumentation-langchain~=2.0b0.dev \ No newline at end of file diff --git a/instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/callback_handler.py b/instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/callback_handler.py index 5235af3c7f..42ba85bd8b 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/callback_handler.py +++ b/instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/callback_handler.py @@ -25,10 +25,9 @@ _InvocationManager, ) from opentelemetry.util.genai.handler import TelemetryHandler +from opentelemetry.util.genai.invocation import InferenceInvocation from opentelemetry.util.genai.types import ( - Error, InputMessage, - LLMInvocation, # TODO: migrate to InferenceInvocation MessagePart, OutputMessage, Text, @@ -140,25 +139,22 @@ def on_chat_model_start( ) ) - llm_invocation = LLMInvocation( + llm_invocation = self._telemetry_handler.start_inference( + provider, request_model=request_model, - input_messages=input_messages, - provider=provider, - top_p=top_p, - frequency_penalty=frequency_penalty, - presence_penalty=presence_penalty, - stop_sequences=stop_sequences, - seed=seed, - temperature=temperature, - max_tokens=max_tokens, - ) - llm_invocation = self._telemetry_handler.start_llm( - invocation=llm_invocation ) + llm_invocation.input_messages = input_messages + llm_invocation.top_p = top_p + llm_invocation.frequency_penalty = frequency_penalty + llm_invocation.presence_penalty = presence_penalty + llm_invocation.stop_sequences = stop_sequences + llm_invocation.seed = seed + llm_invocation.temperature = temperature + llm_invocation.max_tokens = max_tokens self._invocation_manager.add_invocation_state( run_id=run_id, parent_run_id=parent_run_id, - invocation=llm_invocation, # pyright: ignore[reportArgumentType] + invocation=llm_invocation, ) def on_llm_end( @@ -172,7 +168,7 @@ def on_llm_end( llm_invocation = self._invocation_manager.get_invocation(run_id=run_id) if llm_invocation is None or not isinstance( llm_invocation, - LLMInvocation, + InferenceInvocation, ): # If the invocation does not exist, we cannot set attributes or end it return @@ -247,10 +243,8 @@ def on_llm_end( if response_id is not None: llm_invocation.response_id = str(response_id) - llm_invocation = self._telemetry_handler.stop_llm( - invocation=llm_invocation - ) - if llm_invocation.span and not llm_invocation.span.is_recording(): + llm_invocation.stop() + if not llm_invocation.span.is_recording(): self._invocation_manager.delete_invocation_state(run_id=run_id) def on_llm_error( @@ -264,14 +258,11 @@ def on_llm_error( llm_invocation = self._invocation_manager.get_invocation(run_id=run_id) if llm_invocation is None or not isinstance( llm_invocation, - LLMInvocation, + InferenceInvocation, ): # If the invocation does not exist, we cannot set attributes or end it return - error_otel = Error(message=str(error), type=type(error)) - llm_invocation = self._telemetry_handler.fail_llm( - invocation=llm_invocation, error=error_otel - ) - if llm_invocation.span and not llm_invocation.span.is_recording(): + llm_invocation.fail(error) + if not llm_invocation.span.is_recording(): self._invocation_manager.delete_invocation_state(run_id=run_id) diff --git a/instrumentation-genai/opentelemetry-instrumentation-langchain/tests/test_workflow_chain.py b/instrumentation-genai/opentelemetry-instrumentation-langchain/tests/test_workflow_chain.py new file mode 100644 index 0000000000..d407378e7f --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-langchain/tests/test_workflow_chain.py @@ -0,0 +1,307 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Unit tests verifying CSA propagation through the LangChain callback handler. + +These tests exercise the OpenTelemetryLangChainCallbackHandler directly using +mock LangChain callback payloads, so they do not require live models or VCR +cassettes. The key behaviour under test: + + on_chain_start(parent_run_id=None) + → WorkflowInvocation created + → TelemetryHandler.start() called + → gen_ai.workflow.name written to context-scoped attributes (CSA) + + on_chat_model_start(...) + → LLMInvocation created + → TelemetryHandler.start() called + → gen_ai.workflow.name read from CSA and stamped on the LLM invocation + + on_llm_end(...) + → LLM span closed with gen_ai.workflow.name attribute set + + on_chain_end(...) + → workflow span closed, CSA scope ends +""" + +from __future__ import annotations + +import os +import uuid +from unittest import TestCase +from unittest.mock import patch + +from langchain_core.outputs import ChatGeneration, LLMResult +from langchain_core.messages import AIMessage + +from opentelemetry import baggage +from opentelemetry import context as otel_context +from opentelemetry.instrumentation.langchain.callback_handler import ( + OpenTelemetryLangChainCallbackHandler, +) +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import SimpleSpanProcessor +from opentelemetry.sdk.trace.export.in_memory_span_exporter import ( + InMemorySpanExporter, +) +from opentelemetry.trace import SpanKind +from opentelemetry.util.genai.context_attributes import ( + get_context_scoped_attributes, +) +from opentelemetry.util.genai.handler import TelemetryHandler + + +# --------------------------------------------------------------------------- +# Shared helpers +# --------------------------------------------------------------------------- + +def _make_serialized(name: str) -> dict: + """Minimal serialized dict that on_chain_start / on_chat_model_start expect.""" + return {"name": name} + + +def _make_llm_result(content: str = "hello") -> LLMResult: + """Minimal LLMResult with one generation.""" + msg = AIMessage(content=content) + gen = ChatGeneration(message=msg, text=content) + gen.generation_info = {"finish_reason": "stop"} + return LLMResult(generations=[[gen]], llm_output={"model_name": "gpt-3.5-turbo"}) + + +def _make_chat_invocation_params(model_name: str = "gpt-3.5-turbo") -> dict: + """kwargs dict that on_chat_model_start receives for a ChatOpenAI call.""" + return {"invocation_params": {"model_name": model_name, "params": {"model_name": model_name}}} + + +# --------------------------------------------------------------------------- +# Base test class +# --------------------------------------------------------------------------- + +class _CallbackHandlerTestBase(TestCase): + def setUp(self) -> None: + self.span_exporter = InMemorySpanExporter() + tracer_provider = TracerProvider() + tracer_provider.add_span_processor( + SimpleSpanProcessor(self.span_exporter) + ) + telemetry_handler = TelemetryHandler(tracer_provider=tracer_provider) + self.handler = OpenTelemetryLangChainCallbackHandler( + telemetry_handler=telemetry_handler + ) + + def _finished_spans(self): + return self.span_exporter.get_finished_spans() + + def _spans_by_kind(self, kind: SpanKind): + return [s for s in self._finished_spans() if s.kind == kind] + + +# --------------------------------------------------------------------------- +# Tests +# --------------------------------------------------------------------------- + +class TestWorkflowSpanCreation(_CallbackHandlerTestBase): + """Verify that a workflow span is created for top-level chains.""" + + def test_workflow_span_created_for_top_level_chain(self) -> None: + chain_run_id = uuid.uuid4() + + self.handler.on_chain_start( + serialized=_make_serialized("MyChain"), + inputs={}, + run_id=chain_run_id, + parent_run_id=None, + ) + self.handler.on_chain_end( + outputs={}, + run_id=chain_run_id, + parent_run_id=None, + ) + + internal_spans = self._spans_by_kind(SpanKind.INTERNAL) + self.assertEqual(len(internal_spans), 1) + self.assertEqual(internal_spans[0].name, "invoke_workflow MyChain") + + def test_no_workflow_span_for_nested_chain(self) -> None: + """Chains with a parent_run_id are nested — no extra workflow span.""" + parent_run_id = uuid.uuid4() + child_run_id = uuid.uuid4() + + # Start parent (top-level) + self.handler.on_chain_start( + serialized=_make_serialized("ParentChain"), + inputs={}, + run_id=parent_run_id, + parent_run_id=None, + ) + # Start child (nested — should NOT create a workflow span) + self.handler.on_chain_start( + serialized=_make_serialized("ChildChain"), + inputs={}, + run_id=child_run_id, + parent_run_id=parent_run_id, + ) + self.handler.on_chain_end( + outputs={}, + run_id=child_run_id, + parent_run_id=parent_run_id, + ) + self.handler.on_chain_end( + outputs={}, + run_id=parent_run_id, + parent_run_id=None, + ) + + # Only one INTERNAL (workflow) span — for the parent, not the child + internal_spans = self._spans_by_kind(SpanKind.INTERNAL) + self.assertEqual(len(internal_spans), 1) + self.assertEqual(internal_spans[0].name, "invoke_workflow ParentChain") + + +class TestLLMSpanGetsWorkflowName(_CallbackHandlerTestBase): + """Verify gen_ai.workflow.name is propagated to the LLM span via CSA.""" + + def test_llm_span_inside_chain_gets_workflow_name(self) -> None: + chain_run_id = uuid.uuid4() + llm_run_id = uuid.uuid4() + + self.handler.on_chain_start( + serialized=_make_serialized("MyPipeline"), + inputs={}, + run_id=chain_run_id, + parent_run_id=None, + ) + self.handler.on_chat_model_start( + serialized=_make_serialized("ChatOpenAI"), + messages=[[AIMessage(content="hi")]], + run_id=llm_run_id, + parent_run_id=chain_run_id, + metadata={"ls_provider": "openai"}, + **_make_chat_invocation_params("gpt-3.5-turbo"), + ) + self.handler.on_llm_end( + response=_make_llm_result(), + run_id=llm_run_id, + parent_run_id=chain_run_id, + ) + self.handler.on_chain_end( + outputs={}, + run_id=chain_run_id, + parent_run_id=None, + ) + + client_spans = self._spans_by_kind(SpanKind.CLIENT) + self.assertEqual(len(client_spans), 1) + self.assertEqual( + client_spans[0].attributes.get("gen_ai.workflow.name"), + "MyPipeline", + ) + + def test_workflow_name_from_metadata_override(self) -> None: + """metadata['workflow_name'] overrides the serialized chain name.""" + chain_run_id = uuid.uuid4() + llm_run_id = uuid.uuid4() + + self.handler.on_chain_start( + serialized=_make_serialized("InternalChainName"), + inputs={}, + run_id=chain_run_id, + parent_run_id=None, + metadata={"workflow_name": "my_custom_wf"}, + ) + self.handler.on_chat_model_start( + serialized=_make_serialized("ChatOpenAI"), + messages=[[AIMessage(content="hi")]], + run_id=llm_run_id, + parent_run_id=chain_run_id, + metadata={"ls_provider": "openai"}, + **_make_chat_invocation_params("gpt-3.5-turbo"), + ) + self.handler.on_llm_end( + response=_make_llm_result(), + run_id=llm_run_id, + parent_run_id=chain_run_id, + ) + self.handler.on_chain_end( + outputs={}, + run_id=chain_run_id, + parent_run_id=None, + ) + + internal_spans = self._spans_by_kind(SpanKind.INTERNAL) + self.assertEqual(len(internal_spans), 1) + self.assertEqual(internal_spans[0].name, "invoke_workflow my_custom_wf") + + client_spans = self._spans_by_kind(SpanKind.CLIENT) + self.assertEqual(len(client_spans), 1) + self.assertEqual( + client_spans[0].attributes.get("gen_ai.workflow.name"), + "my_custom_wf", + ) + + +class TestCSANotLeakedToBaggage(_CallbackHandlerTestBase): + """Verify that gen_ai.workflow.name is NOT written to W3C Baggage by default.""" + + def test_csa_not_leaked_to_baggage(self) -> None: + env = {k: v for k, v in os.environ.items() if k != "OTEL_PYTHON_GENAI_CAPTURE_BAGGAGE"} + with patch.dict(os.environ, env, clear=True): + chain_run_id = uuid.uuid4() + + self.handler.on_chain_start( + serialized=_make_serialized("BaggageTestChain"), + inputs={}, + run_id=chain_run_id, + parent_run_id=None, + ) + try: + # While workflow is active, baggage should NOT contain the workflow name + baggage_value = baggage.get_baggage("gen_ai.workflow.name") + self.assertIsNone( + baggage_value, + "gen_ai.workflow.name must not be leaked to W3C Baggage by default", + ) + finally: + self.handler.on_chain_end( + outputs={}, + run_id=chain_run_id, + parent_run_id=None, + ) + + +class TestCSAScopeEndsAfterChain(_CallbackHandlerTestBase): + """Verify that the CSA is no longer visible after the chain ends.""" + + def test_csa_not_visible_outside_workflow_scope(self) -> None: + chain_run_id = uuid.uuid4() + + self.handler.on_chain_start( + serialized=_make_serialized("ScopedChain"), + inputs={}, + run_id=chain_run_id, + parent_run_id=None, + ) + self.handler.on_chain_end( + outputs={}, + run_id=chain_run_id, + parent_run_id=None, + ) + + # After on_chain_end the context token is detached — CSA should be gone + attrs = get_context_scoped_attributes() + self.assertIsNone( + attrs.get("gen_ai.workflow.name"), + "gen_ai.workflow.name should not be visible after workflow scope ends", + ) From 1ce4cf5ee9e2b86bbc41b0c98f7ee51bc7e09188 Mon Sep 17 00:00:00 2001 From: Wrisa Date: Thu, 16 Apr 2026 14:09:45 -0700 Subject: [PATCH 02/10] add workflow support and genai dependancy --- .../pyproject.toml | 3 +- .../langchain/callback_handler.py | 71 +++++ .../tests/test_workflow_chain.py | 252 ++++++++++++++++++ 3 files changed, 325 insertions(+), 1 deletion(-) diff --git a/instrumentation-genai/opentelemetry-instrumentation-langchain/pyproject.toml b/instrumentation-genai/opentelemetry-instrumentation-langchain/pyproject.toml index 80a406b7da..a87b82f999 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-langchain/pyproject.toml +++ b/instrumentation-genai/opentelemetry-instrumentation-langchain/pyproject.toml @@ -25,7 +25,8 @@ classifiers = [ "Programming Language :: Python :: 3.14", ] dependencies = [ - "opentelemetry-instrumentation ~= 0.57b0", + "opentelemetry-instrumentation ~= 0.60b0", + "opentelemetry-util-genai >= 0.4b0.dev", ] [project.optional-dependencies] diff --git a/instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/callback_handler.py b/instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/callback_handler.py index 42ba85bd8b..7ac8decd9b 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/callback_handler.py +++ b/instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/callback_handler.py @@ -26,6 +26,7 @@ ) from opentelemetry.util.genai.handler import TelemetryHandler from opentelemetry.util.genai.invocation import InferenceInvocation +from opentelemetry.util.genai.invocation import WorkflowInvocation from opentelemetry.util.genai.types import ( InputMessage, MessagePart, @@ -44,6 +45,75 @@ def __init__(self, telemetry_handler: TelemetryHandler) -> None: self._telemetry_handler = telemetry_handler self._invocation_manager = _InvocationManager() + def on_chain_start( + self, + serialized: dict[str, Any], + inputs: dict[str, Any], + *, + run_id: UUID, + parent_run_id: Optional[UUID] = None, + tags: Optional[list[str]] = None, + metadata: Optional[dict[str, Any]] = None, + **kwargs: Any, + ) -> Any: + payload = serialized or {} + name_source = ( + payload.get("name") + or payload.get("id") + or kwargs.get("name") + or (metadata.get("langgraph_node") if metadata else None) + ) + name = str(name_source or "chain") + + if parent_run_id is None: + workflow_name_override = metadata.get("workflow_name") if metadata else None + wf = self._telemetry_handler.start_workflow(name=workflow_name_override or name) + self._invocation_manager.add_invocation_state(run_id, None, wf) + return + else: + self._invocation_manager.add_invocation_state(run_id, parent_run_id) + + + def on_chain_end( + self, + outputs: dict[str, Any], + *, + run_id: UUID, + parent_run_id: Optional[UUID] = None, + **kwargs: Any, + ) -> Any: + invocation = self._invocation_manager.get_invocation(run_id=run_id) + if invocation is None or not isinstance( + invocation, WorkflowInvocation + ): + # If the invocation does not exist, we cannot set attributes or end it + return + + invocation.stop() + + if not invocation.span.is_recording(): + self._invocation_manager.delete_invocation_state(run_id) + + + def on_chain_error( + self, + error: BaseException, + *, + run_id: UUID, + parent_run_id: Optional[UUID] = None, + **kwargs: Any, + ) -> Any: + invocation = self._invocation_manager.get_invocation(run_id=run_id) + if invocation is None or not isinstance( + invocation, WorkflowInvocation + ): + # If the invocation does not exist, we cannot set attributes or end it + return + + invocation.fail(error) + if not invocation.span.is_recording(): + self._invocation_manager.delete_invocation_state(run_id=run_id) + def on_chat_model_start( self, serialized: dict[str, Any], @@ -266,3 +336,4 @@ def on_llm_error( llm_invocation.fail(error) if not llm_invocation.span.is_recording(): self._invocation_manager.delete_invocation_state(run_id=run_id) + diff --git a/instrumentation-genai/opentelemetry-instrumentation-langchain/tests/test_workflow_chain.py b/instrumentation-genai/opentelemetry-instrumentation-langchain/tests/test_workflow_chain.py index d407378e7f..86c76eb685 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-langchain/tests/test_workflow_chain.py +++ b/instrumentation-genai/opentelemetry-instrumentation-langchain/tests/test_workflow_chain.py @@ -57,6 +57,7 @@ InMemorySpanExporter, ) from opentelemetry.trace import SpanKind +from opentelemetry.trace.status import StatusCode from opentelemetry.util.genai.context_attributes import ( get_context_scoped_attributes, ) @@ -305,3 +306,254 @@ def test_csa_not_visible_outside_workflow_scope(self) -> None: attrs.get("gen_ai.workflow.name"), "gen_ai.workflow.name should not be visible after workflow scope ends", ) + + +class TestWorkflowErrorPath(_CallbackHandlerTestBase): + """Verify on_chain_error records error status and cleans up state.""" + + def test_workflow_span_has_error_status_on_chain_error(self) -> None: + chain_run_id = uuid.uuid4() + + self.handler.on_chain_start( + serialized=_make_serialized("FailingChain"), + inputs={}, + run_id=chain_run_id, + parent_run_id=None, + ) + self.handler.on_chain_error( + error=ValueError("something went wrong"), + run_id=chain_run_id, + parent_run_id=None, + ) + + internal_spans = self._spans_by_kind(SpanKind.INTERNAL) + self.assertEqual(len(internal_spans), 1) + span = internal_spans[0] + self.assertEqual(span.name, "invoke_workflow FailingChain") + self.assertEqual(span.status.status_code, StatusCode.ERROR) + self.assertIn("something went wrong", span.status.description) + + def test_workflow_span_error_type_attribute(self) -> None: + chain_run_id = uuid.uuid4() + + self.handler.on_chain_start( + serialized=_make_serialized("FailingChain"), + inputs={}, + run_id=chain_run_id, + parent_run_id=None, + ) + self.handler.on_chain_error( + error=RuntimeError("boom"), + run_id=chain_run_id, + parent_run_id=None, + ) + + internal_spans = self._spans_by_kind(SpanKind.INTERNAL) + self.assertEqual(len(internal_spans), 1) + self.assertEqual( + internal_spans[0].attributes.get("error.type"), "RuntimeError" + ) + + def test_chain_error_cleans_up_invocation_state(self) -> None: + chain_run_id = uuid.uuid4() + + self.handler.on_chain_start( + serialized=_make_serialized("FailingChain"), + inputs={}, + run_id=chain_run_id, + parent_run_id=None, + ) + self.handler.on_chain_error( + error=ValueError("oops"), + run_id=chain_run_id, + parent_run_id=None, + ) + + # Invocation should have been removed — a second error call is a no-op + self.handler.on_chain_error( + error=ValueError("duplicate"), + run_id=chain_run_id, + parent_run_id=None, + ) + # Only one span — the second call was silently ignored + self.assertEqual(len(self._finished_spans()), 1) + + def test_chain_error_csa_scope_ends(self) -> None: + """CSA should be gone after on_chain_error, same as on_chain_end.""" + chain_run_id = uuid.uuid4() + + self.handler.on_chain_start( + serialized=_make_serialized("FailingChain"), + inputs={}, + run_id=chain_run_id, + parent_run_id=None, + ) + self.handler.on_chain_error( + error=ValueError("oops"), + run_id=chain_run_id, + parent_run_id=None, + ) + + attrs = get_context_scoped_attributes() + self.assertIsNone( + attrs.get("gen_ai.workflow.name"), + "gen_ai.workflow.name should not be visible after workflow error", + ) + + def test_chain_error_unknown_run_id_is_noop(self) -> None: + """on_chain_error with an unknown run_id must not raise.""" + self.handler.on_chain_error( + error=ValueError("no matching invocation"), + run_id=uuid.uuid4(), + parent_run_id=None, + ) + self.assertEqual(len(self._finished_spans()), 0) + + def test_chain_end_unknown_run_id_is_noop(self) -> None: + """on_chain_end with an unknown run_id must not raise.""" + self.handler.on_chain_end( + outputs={}, + run_id=uuid.uuid4(), + parent_run_id=None, + ) + self.assertEqual(len(self._finished_spans()), 0) + + +class TestLLMErrorInsideWorkflow(_CallbackHandlerTestBase): + """Verify on_llm_error inside a workflow doesn't break the parent workflow span.""" + + def test_llm_error_inside_workflow_records_error_on_llm_span(self) -> None: + chain_run_id = uuid.uuid4() + llm_run_id = uuid.uuid4() + + self.handler.on_chain_start( + serialized=_make_serialized("MyPipeline"), + inputs={}, + run_id=chain_run_id, + parent_run_id=None, + ) + self.handler.on_chat_model_start( + serialized=_make_serialized("ChatOpenAI"), + messages=[[AIMessage(content="hi")]], + run_id=llm_run_id, + parent_run_id=chain_run_id, + metadata={"ls_provider": "openai"}, + **_make_chat_invocation_params("gpt-3.5-turbo"), + ) + self.handler.on_llm_error( + error=RuntimeError("model timeout"), + run_id=llm_run_id, + parent_run_id=chain_run_id, + ) + self.handler.on_chain_end( + outputs={}, + run_id=chain_run_id, + parent_run_id=None, + ) + + client_spans = self._spans_by_kind(SpanKind.CLIENT) + self.assertEqual(len(client_spans), 1) + self.assertEqual(client_spans[0].status.status_code, StatusCode.ERROR) + + # Workflow span still finishes (not in error state) + internal_spans = self._spans_by_kind(SpanKind.INTERNAL) + self.assertEqual(len(internal_spans), 1) + self.assertNotEqual(internal_spans[0].status.status_code, StatusCode.ERROR) + + def test_llm_error_inside_workflow_llm_span_is_child_of_workflow(self) -> None: + chain_run_id = uuid.uuid4() + llm_run_id = uuid.uuid4() + + self.handler.on_chain_start( + serialized=_make_serialized("MyPipeline"), + inputs={}, + run_id=chain_run_id, + parent_run_id=None, + ) + self.handler.on_chat_model_start( + serialized=_make_serialized("ChatOpenAI"), + messages=[[AIMessage(content="hi")]], + run_id=llm_run_id, + parent_run_id=chain_run_id, + metadata={"ls_provider": "openai"}, + **_make_chat_invocation_params("gpt-3.5-turbo"), + ) + self.handler.on_llm_end( + response=_make_llm_result(), + run_id=llm_run_id, + parent_run_id=chain_run_id, + ) + self.handler.on_chain_end( + outputs={}, + run_id=chain_run_id, + parent_run_id=None, + ) + + internal_spans = self._spans_by_kind(SpanKind.INTERNAL) + client_spans = self._spans_by_kind(SpanKind.CLIENT) + self.assertEqual(len(internal_spans), 1) + self.assertEqual(len(client_spans), 1) + + workflow_span = internal_spans[0] + llm_span = client_spans[0] + self.assertEqual( + llm_span.context.trace_id, + workflow_span.context.trace_id, + "LLM span and workflow span must share the same trace", + ) + self.assertEqual( + llm_span.parent.span_id, + workflow_span.context.span_id, + "LLM span must be a child of the workflow span", + ) + + +class TestWorkflowNameFallback(_CallbackHandlerTestBase): + """Verify the name resolution fallback chain in on_chain_start.""" + + def test_name_falls_back_to_id_list(self) -> None: + chain_run_id = uuid.uuid4() + + self.handler.on_chain_start( + serialized={"id": ["pkg", "mod", "MyRunnableClass"]}, + inputs={}, + run_id=chain_run_id, + parent_run_id=None, + ) + self.handler.on_chain_end(outputs={}, run_id=chain_run_id, parent_run_id=None) + + internal_spans = self._spans_by_kind(SpanKind.INTERNAL) + self.assertEqual(len(internal_spans), 1) + # id is a list — _safe_str(list) produces a string; just verify a span was created + self.assertTrue(internal_spans[0].name.startswith("invoke_workflow ")) + + def test_name_falls_back_to_langgraph_node(self) -> None: + chain_run_id = uuid.uuid4() + + self.handler.on_chain_start( + serialized={}, + inputs={}, + run_id=chain_run_id, + parent_run_id=None, + metadata={"langgraph_node": "my_node"}, + ) + self.handler.on_chain_end(outputs={}, run_id=chain_run_id, parent_run_id=None) + + internal_spans = self._spans_by_kind(SpanKind.INTERNAL) + self.assertEqual(len(internal_spans), 1) + self.assertEqual(internal_spans[0].name, "invoke_workflow my_node") + + def test_name_defaults_to_chain_when_nothing_provided(self) -> None: + chain_run_id = uuid.uuid4() + + self.handler.on_chain_start( + serialized={}, + inputs={}, + run_id=chain_run_id, + parent_run_id=None, + ) + self.handler.on_chain_end(outputs={}, run_id=chain_run_id, parent_run_id=None) + + internal_spans = self._spans_by_kind(SpanKind.INTERNAL) + self.assertEqual(len(internal_spans), 1) + self.assertEqual(internal_spans[0].name, "invoke_workflow chain") From e9fff126f51d3630f9f6a8c8d854a23527facfda Mon Sep 17 00:00:00 2001 From: wrisa Date: Mon, 20 Apr 2026 11:27:36 -0700 Subject: [PATCH 03/10] fixed errors --- .../examples/workflow/main.py | 4 +- .../langchain/callback_handler.py | 29 ++++++----- .../tests/test_workflow_chain.py | 48 ++++++++++++++----- uv.lock | 2 + 4 files changed, 56 insertions(+), 27 deletions(-) diff --git a/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/workflow/main.py b/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/workflow/main.py index d74f5efa73..b48865ee21 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/workflow/main.py +++ b/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/workflow/main.py @@ -8,7 +8,6 @@ from typing import Annotated - from langchain_core.messages import HumanMessage, SystemMessage from langchain_openai import ChatOpenAI from langgraph.graph import END, START, StateGraph @@ -33,7 +32,6 @@ from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.export import BatchSpanProcessor - # Configure tracing trace.set_tracer_provider(TracerProvider()) span_processor = BatchSpanProcessor(OTLPSpanExporter()) @@ -112,4 +110,4 @@ def main(): if __name__ == "__main__": - main() \ No newline at end of file + main() diff --git a/instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/callback_handler.py b/instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/callback_handler.py index 7ac8decd9b..c55b93481a 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/callback_handler.py +++ b/instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/callback_handler.py @@ -25,8 +25,10 @@ _InvocationManager, ) from opentelemetry.util.genai.handler import TelemetryHandler -from opentelemetry.util.genai.invocation import InferenceInvocation -from opentelemetry.util.genai.invocation import WorkflowInvocation +from opentelemetry.util.genai.invocation import ( + InferenceInvocation, + WorkflowInvocation, +) from opentelemetry.util.genai.types import ( InputMessage, MessagePart, @@ -58,21 +60,26 @@ def on_chain_start( ) -> Any: payload = serialized or {} name_source = ( - payload.get("name") - or payload.get("id") - or kwargs.get("name") - or (metadata.get("langgraph_node") if metadata else None) + payload.get("name") + or payload.get("id") + or kwargs.get("name") + or (metadata.get("langgraph_node") if metadata else None) ) name = str(name_source or "chain") if parent_run_id is None: - workflow_name_override = metadata.get("workflow_name") if metadata else None - wf = self._telemetry_handler.start_workflow(name=workflow_name_override or name) + workflow_name_override = ( + metadata.get("workflow_name") if metadata else None + ) + wf = self._telemetry_handler.start_workflow( + name=workflow_name_override or name + ) self._invocation_manager.add_invocation_state(run_id, None, wf) return else: - self._invocation_manager.add_invocation_state(run_id, parent_run_id) - + self._invocation_manager.add_invocation_state( + run_id, parent_run_id + ) def on_chain_end( self, @@ -94,7 +101,6 @@ def on_chain_end( if not invocation.span.is_recording(): self._invocation_manager.delete_invocation_state(run_id) - def on_chain_error( self, error: BaseException, @@ -336,4 +342,3 @@ def on_llm_error( llm_invocation.fail(error) if not llm_invocation.span.is_recording(): self._invocation_manager.delete_invocation_state(run_id=run_id) - diff --git a/instrumentation-genai/opentelemetry-instrumentation-langchain/tests/test_workflow_chain.py b/instrumentation-genai/opentelemetry-instrumentation-langchain/tests/test_workflow_chain.py index 86c76eb685..e1ce91a564 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-langchain/tests/test_workflow_chain.py +++ b/instrumentation-genai/opentelemetry-instrumentation-langchain/tests/test_workflow_chain.py @@ -43,11 +43,10 @@ from unittest import TestCase from unittest.mock import patch -from langchain_core.outputs import ChatGeneration, LLMResult from langchain_core.messages import AIMessage +from langchain_core.outputs import ChatGeneration, LLMResult from opentelemetry import baggage -from opentelemetry import context as otel_context from opentelemetry.instrumentation.langchain.callback_handler import ( OpenTelemetryLangChainCallbackHandler, ) @@ -63,11 +62,11 @@ ) from opentelemetry.util.genai.handler import TelemetryHandler - # --------------------------------------------------------------------------- # Shared helpers # --------------------------------------------------------------------------- + def _make_serialized(name: str) -> dict: """Minimal serialized dict that on_chain_start / on_chat_model_start expect.""" return {"name": name} @@ -78,18 +77,26 @@ def _make_llm_result(content: str = "hello") -> LLMResult: msg = AIMessage(content=content) gen = ChatGeneration(message=msg, text=content) gen.generation_info = {"finish_reason": "stop"} - return LLMResult(generations=[[gen]], llm_output={"model_name": "gpt-3.5-turbo"}) + return LLMResult( + generations=[[gen]], llm_output={"model_name": "gpt-3.5-turbo"} + ) def _make_chat_invocation_params(model_name: str = "gpt-3.5-turbo") -> dict: """kwargs dict that on_chat_model_start receives for a ChatOpenAI call.""" - return {"invocation_params": {"model_name": model_name, "params": {"model_name": model_name}}} + return { + "invocation_params": { + "model_name": model_name, + "params": {"model_name": model_name}, + } + } # --------------------------------------------------------------------------- # Base test class # --------------------------------------------------------------------------- + class _CallbackHandlerTestBase(TestCase): def setUp(self) -> None: self.span_exporter = InMemorySpanExporter() @@ -113,6 +120,7 @@ def _spans_by_kind(self, kind: SpanKind): # Tests # --------------------------------------------------------------------------- + class TestWorkflowSpanCreation(_CallbackHandlerTestBase): """Verify that a workflow span is created for top-level chains.""" @@ -243,7 +251,9 @@ def test_workflow_name_from_metadata_override(self) -> None: internal_spans = self._spans_by_kind(SpanKind.INTERNAL) self.assertEqual(len(internal_spans), 1) - self.assertEqual(internal_spans[0].name, "invoke_workflow my_custom_wf") + self.assertEqual( + internal_spans[0].name, "invoke_workflow my_custom_wf" + ) client_spans = self._spans_by_kind(SpanKind.CLIENT) self.assertEqual(len(client_spans), 1) @@ -257,7 +267,11 @@ class TestCSANotLeakedToBaggage(_CallbackHandlerTestBase): """Verify that gen_ai.workflow.name is NOT written to W3C Baggage by default.""" def test_csa_not_leaked_to_baggage(self) -> None: - env = {k: v for k, v in os.environ.items() if k != "OTEL_PYTHON_GENAI_CAPTURE_BAGGAGE"} + env = { + k: v + for k, v in os.environ.items() + if k != "OTEL_PYTHON_GENAI_CAPTURE_BAGGAGE" + } with patch.dict(os.environ, env, clear=True): chain_run_id = uuid.uuid4() @@ -458,9 +472,13 @@ def test_llm_error_inside_workflow_records_error_on_llm_span(self) -> None: # Workflow span still finishes (not in error state) internal_spans = self._spans_by_kind(SpanKind.INTERNAL) self.assertEqual(len(internal_spans), 1) - self.assertNotEqual(internal_spans[0].status.status_code, StatusCode.ERROR) + self.assertNotEqual( + internal_spans[0].status.status_code, StatusCode.ERROR + ) - def test_llm_error_inside_workflow_llm_span_is_child_of_workflow(self) -> None: + def test_llm_error_inside_workflow_llm_span_is_child_of_workflow( + self, + ) -> None: chain_run_id = uuid.uuid4() llm_run_id = uuid.uuid4() @@ -520,7 +538,9 @@ def test_name_falls_back_to_id_list(self) -> None: run_id=chain_run_id, parent_run_id=None, ) - self.handler.on_chain_end(outputs={}, run_id=chain_run_id, parent_run_id=None) + self.handler.on_chain_end( + outputs={}, run_id=chain_run_id, parent_run_id=None + ) internal_spans = self._spans_by_kind(SpanKind.INTERNAL) self.assertEqual(len(internal_spans), 1) @@ -537,7 +557,9 @@ def test_name_falls_back_to_langgraph_node(self) -> None: parent_run_id=None, metadata={"langgraph_node": "my_node"}, ) - self.handler.on_chain_end(outputs={}, run_id=chain_run_id, parent_run_id=None) + self.handler.on_chain_end( + outputs={}, run_id=chain_run_id, parent_run_id=None + ) internal_spans = self._spans_by_kind(SpanKind.INTERNAL) self.assertEqual(len(internal_spans), 1) @@ -552,7 +574,9 @@ def test_name_defaults_to_chain_when_nothing_provided(self) -> None: run_id=chain_run_id, parent_run_id=None, ) - self.handler.on_chain_end(outputs={}, run_id=chain_run_id, parent_run_id=None) + self.handler.on_chain_end( + outputs={}, run_id=chain_run_id, parent_run_id=None + ) internal_spans = self._spans_by_kind(SpanKind.INTERNAL) self.assertEqual(len(internal_spans), 1) diff --git a/uv.lock b/uv.lock index 14d5ea35b6..b532c37e77 100644 --- a/uv.lock +++ b/uv.lock @@ -3530,6 +3530,7 @@ name = "opentelemetry-instrumentation-langchain" source = { editable = "instrumentation-genai/opentelemetry-instrumentation-langchain" } dependencies = [ { name = "opentelemetry-instrumentation" }, + { name = "opentelemetry-util-genai" }, ] [package.optional-dependencies] @@ -3541,6 +3542,7 @@ instruments = [ requires-dist = [ { name = "langchain", marker = "extra == 'instruments'", specifier = ">=0.3.21" }, { name = "opentelemetry-instrumentation", editable = "opentelemetry-instrumentation" }, + { name = "opentelemetry-util-genai", editable = "util/opentelemetry-util-genai" }, ] provides-extras = ["instruments"] From 6e3462025413841076b957d419050bd08dbdddf0 Mon Sep 17 00:00:00 2001 From: wrisa Date: Mon, 20 Apr 2026 11:41:52 -0700 Subject: [PATCH 04/10] fixed changelog --- .../opentelemetry-instrumentation-langchain/CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/instrumentation-genai/opentelemetry-instrumentation-langchain/CHANGELOG.md b/instrumentation-genai/opentelemetry-instrumentation-langchain/CHANGELOG.md index 767dfcc7ed..a23d8b33d5 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-langchain/CHANGELOG.md +++ b/instrumentation-genai/opentelemetry-instrumentation-langchain/CHANGELOG.md @@ -7,6 +7,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## Unreleased +- Add LangChain workflow span support and refactor LLM invocation + ([#4449](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4449)) - Fix compatibility with wrapt 2.x by using positional arguments in `wrap_function_wrapper()` calls ([#4445](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4445)) - Added span support for genAI langchain llm invocation. From c17607d97883f5b1d7a8857791addff18f3383c6 Mon Sep 17 00:00:00 2001 From: wrisa Date: Mon, 20 Apr 2026 11:55:18 -0700 Subject: [PATCH 05/10] fixed error --- .../instrumentation/langchain/callback_handler.py | 3 ++- .../instrumentation/langchain/invocation_manager.py | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/callback_handler.py b/instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/callback_handler.py index c55b93481a..f99a91a4be 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/callback_handler.py +++ b/instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/callback_handler.py @@ -77,8 +77,9 @@ def on_chain_start( self._invocation_manager.add_invocation_state(run_id, None, wf) return else: + # TODO: For agent invocation self._invocation_manager.add_invocation_state( - run_id, parent_run_id + run_id, parent_run_id, ) def on_chain_end( diff --git a/instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/invocation_manager.py b/instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/invocation_manager.py index e8d2293bae..0431934f1e 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/invocation_manager.py +++ b/instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/invocation_manager.py @@ -39,7 +39,7 @@ def add_invocation_state( self, run_id: UUID, parent_run_id: Optional[UUID], - invocation: GenAIInvocation, + invocation: GenAIInvocation = None, ): invocation_state = _InvocationState(invocation=invocation) self._invocations[run_id] = invocation_state From a63f271dd81f6de1863ba97c2dec62a24582e65d Mon Sep 17 00:00:00 2001 From: wrisa Date: Mon, 20 Apr 2026 12:02:55 -0700 Subject: [PATCH 06/10] fixed type error --- .../instrumentation/langchain/callback_handler.py | 3 ++- .../instrumentation/langchain/invocation_manager.py | 4 ++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/callback_handler.py b/instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/callback_handler.py index f99a91a4be..31a7db329d 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/callback_handler.py +++ b/instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/callback_handler.py @@ -79,7 +79,8 @@ def on_chain_start( else: # TODO: For agent invocation self._invocation_manager.add_invocation_state( - run_id, parent_run_id, + run_id, + parent_run_id, ) def on_chain_end( diff --git a/instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/invocation_manager.py b/instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/invocation_manager.py index 0431934f1e..1383c861e8 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/invocation_manager.py +++ b/instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/invocation_manager.py @@ -23,7 +23,7 @@ @dataclass class _InvocationState: - invocation: GenAIInvocation + invocation: Optional[GenAIInvocation] children: List[UUID] = field(default_factory=lambda: list()) @@ -39,7 +39,7 @@ def add_invocation_state( self, run_id: UUID, parent_run_id: Optional[UUID], - invocation: GenAIInvocation = None, + invocation: Optional[GenAIInvocation] = None, ): invocation_state = _InvocationState(invocation=invocation) self._invocations[run_id] = invocation_state From e8be64160a552f041f627ba1efa3ce0d91c6ad46 Mon Sep 17 00:00:00 2001 From: Wrisa Date: Tue, 21 Apr 2026 09:22:05 -0700 Subject: [PATCH 07/10] removed optional --- .../opentelemetry/instrumentation/langchain/callback_handler.py | 1 + .../instrumentation/langchain/invocation_manager.py | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/callback_handler.py b/instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/callback_handler.py index 31a7db329d..8b000ee7c6 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/callback_handler.py +++ b/instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/callback_handler.py @@ -81,6 +81,7 @@ def on_chain_start( self._invocation_manager.add_invocation_state( run_id, parent_run_id, + None ) def on_chain_end( diff --git a/instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/invocation_manager.py b/instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/invocation_manager.py index 1383c861e8..139e9f4a6c 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/invocation_manager.py +++ b/instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/invocation_manager.py @@ -39,7 +39,7 @@ def add_invocation_state( self, run_id: UUID, parent_run_id: Optional[UUID], - invocation: Optional[GenAIInvocation] = None, + invocation: GenAIInvocation, ): invocation_state = _InvocationState(invocation=invocation) self._invocations[run_id] = invocation_state From a9dc1885b14387ec4a345f0265cf95fd568fc4c8 Mon Sep 17 00:00:00 2001 From: Wrisa Date: Tue, 21 Apr 2026 11:49:07 -0700 Subject: [PATCH 08/10] fixed error --- .../instrumentation/langchain/callback_handler.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/callback_handler.py b/instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/callback_handler.py index 8b000ee7c6..d04654a73a 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/callback_handler.py +++ b/instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/callback_handler.py @@ -79,9 +79,7 @@ def on_chain_start( else: # TODO: For agent invocation self._invocation_manager.add_invocation_state( - run_id, - parent_run_id, - None + run_id, parent_run_id, None ) def on_chain_end( From 36056317051d40ead8aec371aef49425e32f74d2 Mon Sep 17 00:00:00 2001 From: Wrisa Date: Tue, 21 Apr 2026 12:05:54 -0700 Subject: [PATCH 09/10] ignore --- .../opentelemetry/instrumentation/langchain/callback_handler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/callback_handler.py b/instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/callback_handler.py index d04654a73a..0223467643 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/callback_handler.py +++ b/instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/callback_handler.py @@ -79,7 +79,7 @@ def on_chain_start( else: # TODO: For agent invocation self._invocation_manager.add_invocation_state( - run_id, parent_run_id, None + run_id, parent_run_id, None # type: ignore[arg-type] ) def on_chain_end( From 4b7808fdbc5cb00461e5990f70354c45fdf9186d Mon Sep 17 00:00:00 2001 From: Wrisa Date: Tue, 21 Apr 2026 12:26:30 -0700 Subject: [PATCH 10/10] fixed requests --- .../instrumentation/langchain/callback_handler.py | 4 +++- tests/opentelemetry-docker-tests/tests/test-requirements.txt | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/callback_handler.py b/instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/callback_handler.py index 0223467643..3c214d9e65 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/callback_handler.py +++ b/instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/callback_handler.py @@ -79,7 +79,9 @@ def on_chain_start( else: # TODO: For agent invocation self._invocation_manager.add_invocation_state( - run_id, parent_run_id, None # type: ignore[arg-type] + run_id, + parent_run_id, + None, # type: ignore[arg-type] ) def on_chain_end( diff --git a/tests/opentelemetry-docker-tests/tests/test-requirements.txt b/tests/opentelemetry-docker-tests/tests/test-requirements.txt index 434f44d597..50dfa259c9 100644 --- a/tests/opentelemetry-docker-tests/tests/test-requirements.txt +++ b/tests/opentelemetry-docker-tests/tests/test-requirements.txt @@ -58,7 +58,7 @@ python-dotenv==0.21.1 pytz==2024.1 PyYAML==5.3.1 redis==5.0.1 -requests==2.25.0 +requests==2.32.4 six==1.16.0 SQLAlchemy==1.4.52 texttable==1.7.0