Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Unreleased

- Add LangChain workflow span support and refactor LLM invocation
([#4449](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4449))
- Fix compatibility with wrapt 2.x by using positional arguments in `wrap_function_wrapper()` calls
([#4445](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4445))
- Added span support for genAI langchain llm invocation.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
"""
LangGraph StateGraph example with an LLM node.

Similar to the manual example (../manual/main.py) but uses LangGraph's StateGraph
with a node that calls ChatOpenAI. OpenTelemetry LangChain instrumentation traces
the LLM calls made from within the graph node.
"""

from typing import Annotated

from langchain_core.messages import HumanMessage, SystemMessage
from langchain_openai import ChatOpenAI
from langgraph.graph import END, START, StateGraph
from langgraph.graph.message import add_messages
from typing_extensions import TypedDict

from opentelemetry import _logs, metrics, trace
from opentelemetry.exporter.otlp.proto.grpc._log_exporter import (
OTLPLogExporter,
)
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import (
OTLPMetricExporter,
)
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import (
OTLPSpanExporter,
)
from opentelemetry.instrumentation.langchain import LangChainInstrumentor
from opentelemetry.sdk._logs import LoggerProvider
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor

# Configure tracing
trace.set_tracer_provider(TracerProvider())
span_processor = BatchSpanProcessor(OTLPSpanExporter())
trace.get_tracer_provider().add_span_processor(span_processor)

# Configure logging
_logs.set_logger_provider(LoggerProvider())
_logs.get_logger_provider().add_log_record_processor(
BatchLogRecordProcessor(OTLPLogExporter())
)

# Configure metrics
metrics.set_meter_provider(
MeterProvider(
metric_readers=[
PeriodicExportingMetricReader(
OTLPMetricExporter(),
),
]
)
)


class GraphState(TypedDict):
"""State for the graph; messages are accumulated with add_messages."""

messages: Annotated[list, add_messages]


def build_graph(llm: ChatOpenAI):
"""Build a StateGraph with a single LLM node."""

def llm_node(state: GraphState) -> dict:
"""Node that invokes the LLM with the current messages."""
response = llm.invoke(state["messages"])
return {"messages": [response]}

builder = StateGraph(GraphState)
builder.add_node("llm", llm_node)
builder.add_edge(START, "llm")
builder.add_edge("llm", END)
return builder.compile()


def main():
# Set up instrumentation (traces LLM calls from within graph nodes)
LangChainInstrumentor().instrument()

# ChatOpenAI setup
llm = ChatOpenAI(
model="gpt-3.5-turbo",
temperature=0.1,
max_tokens=100,
top_p=0.9,
frequency_penalty=0.5,
presence_penalty=0.5,
stop_sequences=["\n", "Human:", "AI:"],
seed=100,
)

graph = build_graph(llm)

initial_messages = [
SystemMessage(content="You are a helpful assistant!"),
HumanMessage(content="What is the capital of France?"),
]

result = graph.invoke({"messages": initial_messages})

print("LangGraph output (messages):")
for msg in result.get("messages", []):
print(f" {type(msg).__name__}: {msg.content}")

# Un-instrument after use
LangChainInstrumentor().uninstrument()


if __name__ == "__main__":
main()
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
langchain==0.3.21
langchain_openai
langgraph
opentelemetry-sdk>=1.39.0
opentelemetry-exporter-otlp-proto-grpc>=1.39.0

# Uncomment after langchain instrumentation is released
# opentelemetry-instrumentation-langchain~=2.0b0.dev
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ classifiers = [
"Programming Language :: Python :: 3.14",
]
dependencies = [
"opentelemetry-instrumentation ~= 0.57b0",
"opentelemetry-instrumentation ~= 0.60b0",
"opentelemetry-util-genai >= 0.4b0.dev",
]

[project.optional-dependencies]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@
_InvocationManager,
)
from opentelemetry.util.genai.handler import TelemetryHandler
from opentelemetry.util.genai.invocation import (
InferenceInvocation,
WorkflowInvocation,
)
from opentelemetry.util.genai.types import (
Error,
InputMessage,
LLMInvocation, # TODO: migrate to InferenceInvocation
MessagePart,
OutputMessage,
Text,
Expand All @@ -45,6 +47,82 @@ def __init__(self, telemetry_handler: TelemetryHandler) -> None:
self._telemetry_handler = telemetry_handler
self._invocation_manager = _InvocationManager()

def on_chain_start(
self,
serialized: dict[str, Any],
inputs: dict[str, Any],
*,
run_id: UUID,
parent_run_id: Optional[UUID] = None,
tags: Optional[list[str]] = None,
metadata: Optional[dict[str, Any]] = None,
**kwargs: Any,
) -> Any:
payload = serialized or {}
name_source = (
payload.get("name")
or payload.get("id")
or kwargs.get("name")
or (metadata.get("langgraph_node") if metadata else None)
)
name = str(name_source or "chain")

if parent_run_id is None:
workflow_name_override = (
metadata.get("workflow_name") if metadata else None
)
wf = self._telemetry_handler.start_workflow(
name=workflow_name_override or name
)
self._invocation_manager.add_invocation_state(run_id, None, wf)
return
else:
# TODO: For agent invocation
self._invocation_manager.add_invocation_state(
run_id,
parent_run_id,
None, # type: ignore[arg-type]
)
Comment thread
wrisa marked this conversation as resolved.

def on_chain_end(
self,
outputs: dict[str, Any],
*,
run_id: UUID,
parent_run_id: Optional[UUID] = None,
**kwargs: Any,
) -> Any:
invocation = self._invocation_manager.get_invocation(run_id=run_id)
if invocation is None or not isinstance(
invocation, WorkflowInvocation
):
# If the invocation does not exist, we cannot set attributes or end it
return

invocation.stop()

if not invocation.span.is_recording():
self._invocation_manager.delete_invocation_state(run_id)

def on_chain_error(
self,
error: BaseException,
*,
run_id: UUID,
parent_run_id: Optional[UUID] = None,
**kwargs: Any,
) -> Any:
invocation = self._invocation_manager.get_invocation(run_id=run_id)
if invocation is None or not isinstance(
invocation, WorkflowInvocation
):
# If the invocation does not exist, we cannot set attributes or end it
return

invocation.fail(error)
if not invocation.span.is_recording():
self._invocation_manager.delete_invocation_state(run_id=run_id)

def on_chat_model_start(
self,
serialized: dict[str, Any],
Expand Down Expand Up @@ -140,25 +218,22 @@ def on_chat_model_start(
)
)

llm_invocation = LLMInvocation(
llm_invocation = self._telemetry_handler.start_inference(
provider,
request_model=request_model,
input_messages=input_messages,
provider=provider,
top_p=top_p,
frequency_penalty=frequency_penalty,
presence_penalty=presence_penalty,
stop_sequences=stop_sequences,
seed=seed,
temperature=temperature,
max_tokens=max_tokens,
)
llm_invocation = self._telemetry_handler.start_llm(
invocation=llm_invocation
)
llm_invocation.input_messages = input_messages
llm_invocation.top_p = top_p
llm_invocation.frequency_penalty = frequency_penalty
llm_invocation.presence_penalty = presence_penalty
llm_invocation.stop_sequences = stop_sequences
llm_invocation.seed = seed
llm_invocation.temperature = temperature
llm_invocation.max_tokens = max_tokens
self._invocation_manager.add_invocation_state(
run_id=run_id,
parent_run_id=parent_run_id,
invocation=llm_invocation, # pyright: ignore[reportArgumentType]
invocation=llm_invocation,
)

def on_llm_end(
Expand All @@ -172,7 +247,7 @@ def on_llm_end(
llm_invocation = self._invocation_manager.get_invocation(run_id=run_id)
if llm_invocation is None or not isinstance(
llm_invocation,
LLMInvocation,
InferenceInvocation,
):
# If the invocation does not exist, we cannot set attributes or end it
return
Expand Down Expand Up @@ -247,10 +322,8 @@ def on_llm_end(
if response_id is not None:
llm_invocation.response_id = str(response_id)

llm_invocation = self._telemetry_handler.stop_llm(
invocation=llm_invocation
)
if llm_invocation.span and not llm_invocation.span.is_recording():
llm_invocation.stop()
if not llm_invocation.span.is_recording():
self._invocation_manager.delete_invocation_state(run_id=run_id)

def on_llm_error(
Expand All @@ -264,14 +337,11 @@ def on_llm_error(
llm_invocation = self._invocation_manager.get_invocation(run_id=run_id)
if llm_invocation is None or not isinstance(
llm_invocation,
LLMInvocation,
InferenceInvocation,
):
# If the invocation does not exist, we cannot set attributes or end it
return

error_otel = Error(message=str(error), type=type(error))
llm_invocation = self._telemetry_handler.fail_llm(
invocation=llm_invocation, error=error_otel
)
if llm_invocation.span and not llm_invocation.span.is_recording():
llm_invocation.fail(error)
if not llm_invocation.span.is_recording():
self._invocation_manager.delete_invocation_state(run_id=run_id)
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

@dataclass
class _InvocationState:
invocation: GenAIInvocation
invocation: Optional[GenAIInvocation]
children: List[UUID] = field(default_factory=lambda: list())


Expand Down
Loading
Loading