Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Unreleased

- Add LangChain workflow span support and refactor LLM invocation
([#4449](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4449))
- Fix compatibility with wrapt 2.x by using positional arguments in `wrap_function_wrapper()` calls
([#4445](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4445))
- Added span support for genAI langchain llm invocation.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
"""
LangGraph StateGraph example with an LLM node.

Similar to the manual example (../manual/main.py) but uses LangGraph's StateGraph
with a node that calls ChatOpenAI. OpenTelemetry LangChain instrumentation traces
the LLM calls made from within the graph node.
"""

from typing import Annotated

from langchain_core.messages import HumanMessage, SystemMessage
from langchain_openai import ChatOpenAI
from langgraph.graph import END, START, StateGraph
from langgraph.graph.message import add_messages
from typing_extensions import TypedDict

from opentelemetry import _logs, metrics, trace
from opentelemetry.exporter.otlp.proto.grpc._log_exporter import (
OTLPLogExporter,
)
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import (
OTLPMetricExporter,
)
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import (
OTLPSpanExporter,
)
from opentelemetry.instrumentation.langchain import LangChainInstrumentor
from opentelemetry.sdk._logs import LoggerProvider
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor

# Traces: batch-export spans over OTLP/gRPC.
trace.set_tracer_provider(TracerProvider())
trace.get_tracer_provider().add_span_processor(
    BatchSpanProcessor(OTLPSpanExporter())
)

# Logs: batch-export log records over OTLP/gRPC.
_logs.set_logger_provider(LoggerProvider())
_logs.get_logger_provider().add_log_record_processor(
    BatchLogRecordProcessor(OTLPLogExporter())
)

# Metrics: periodically push readings over OTLP/gRPC.
metric_reader = PeriodicExportingMetricReader(OTLPMetricExporter())
metrics.set_meter_provider(MeterProvider(metric_readers=[metric_reader]))


class GraphState(TypedDict):
    """State for the graph; messages are accumulated with add_messages."""

    # Annotated reducer: LangGraph merges each node's returned messages into
    # this list via add_messages rather than overwriting it.
    messages: Annotated[list, add_messages]


def build_graph(llm: ChatOpenAI):
    """Compile a StateGraph whose single node invokes *llm*.

    The node reads the accumulated ``messages`` from the state and returns
    the model's reply so LangGraph appends it via the add_messages reducer.
    """

    def invoke_llm(state: GraphState) -> dict:
        """Graph node: run the LLM over the current message history."""
        reply = llm.invoke(state["messages"])
        return {"messages": [reply]}

    graph_builder = StateGraph(GraphState)
    graph_builder.add_node("llm", invoke_llm)
    graph_builder.add_edge(START, "llm")
    graph_builder.add_edge("llm", END)
    return graph_builder.compile()


def main():
    """Run one LangGraph invocation with LangChain instrumentation enabled."""
    # Set up instrumentation (traces LLM calls from within graph nodes)
    LangChainInstrumentor().instrument()

    # ChatOpenAI setup.
    # NOTE: langchain_openai.ChatOpenAI takes ``stop`` (not ``stop_sequences``)
    # for stop tokens; an unsupported constructor kwarg fails at runtime.
    llm = ChatOpenAI(
        model="gpt-3.5-turbo",
        temperature=0.1,
        max_tokens=100,
        top_p=0.9,
        frequency_penalty=0.5,
        presence_penalty=0.5,
        stop=["\n", "Human:", "AI:"],
        seed=100,
    )

    graph = build_graph(llm)

    initial_messages = [
        SystemMessage(content="You are a helpful assistant!"),
        HumanMessage(content="What is the capital of France?"),
    ]

    result = graph.invoke({"messages": initial_messages})

    print("LangGraph output (messages):")
    for msg in result.get("messages", []):
        print(f"  {type(msg).__name__}: {msg.content}")

    # Un-instrument after use
    LangChainInstrumentor().uninstrument()


if __name__ == "__main__":
    main()
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
langchain==0.3.21
langchain_openai
langgraph
opentelemetry-sdk>=1.39.0
opentelemetry-exporter-otlp-proto-grpc>=1.39.0

# Uncomment after langchain instrumentation is released
# opentelemetry-instrumentation-langchain~=2.0b0.dev
Comment on lines +7 to +8
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can delete this if not needed anymore

Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ classifiers = [
"Programming Language :: Python :: 3.14",
]
dependencies = [
"opentelemetry-instrumentation ~= 0.57b0",
"opentelemetry-instrumentation ~= 0.60b0",
"opentelemetry-util-genai >= 0.4b0.dev",
]

[project.optional-dependencies]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@
_InvocationManager,
)
from opentelemetry.util.genai.handler import TelemetryHandler
from opentelemetry.util.genai.invocation import (
InferenceInvocation,
WorkflowInvocation,
)
from opentelemetry.util.genai.types import (
Error,
InputMessage,
LLMInvocation, # TODO: migrate to InferenceInvocation
MessagePart,
OutputMessage,
Text,
Expand All @@ -45,6 +47,82 @@ def __init__(self, telemetry_handler: TelemetryHandler) -> None:
self._telemetry_handler = telemetry_handler
self._invocation_manager = _InvocationManager()

def on_chain_start(
    self,
    serialized: dict[str, Any],
    inputs: dict[str, Any],
    *,
    run_id: UUID,
    parent_run_id: Optional[UUID] = None,
    tags: Optional[list[str]] = None,
    metadata: Optional[dict[str, Any]] = None,
    **kwargs: Any,
) -> Any:
    """Start workflow telemetry for a LangChain chain run.

    A root chain (no ``parent_run_id``) starts a workflow invocation.
    Nested chains only record placeholder state when their parent is a
    known workflow, so entries can always be cleaned up later.
    """
    payload = serialized or {}
    # Best-effort chain name: serialized metadata first, then callback
    # kwargs, then the LangGraph node name; fall back to "chain".
    name_source = (
        payload.get("name")
        or payload.get("id")
        or kwargs.get("name")
        or (metadata.get("langgraph_node") if metadata else None)
    )
    name = str(name_source or "chain")

    if parent_run_id is None:
        workflow_name_override = (
            metadata.get("workflow_name") if metadata else None
        )
        wf = self._telemetry_handler.start_workflow(
            name=workflow_name_override or name
        )
        self._invocation_manager.add_invocation_state(run_id, None, wf)
        return

    # Do not record nested chain state when the parent is unknown or not a
    # workflow; otherwise we create orphaned entries that
    # on_chain_end/on_chain_error can never clean up.
    parent_invocation = self._invocation_manager.get_invocation(
        run_id=parent_run_id
    )
    if parent_invocation is None or not isinstance(
        parent_invocation, WorkflowInvocation
    ):
        return
    # TODO: For agent invocation
    self._invocation_manager.add_invocation_state(
        run_id,
        parent_run_id,
        None,  # type: ignore[arg-type]
    )
Comment thread
wrisa marked this conversation as resolved.

Comment on lines +79 to +86
Copy link

Copilot AI Apr 23, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nested chains are recorded in the invocation manager with invocation=None, but on_chain_end() returns early when the invocation is missing/non-WorkflowInvocation. If parent_run_id is not present in the manager (e.g., out-of-order callbacks or partial instrumentation), this creates orphaned entries that will never be cleaned up. Consider deleting the invocation state on on_chain_end() / on_chain_error() when get_invocation() returns None (or when it’s not a WorkflowInvocation), or avoid storing state at all when the parent is unknown.

Suggested change
else:
# TODO: For agent invocation
self._invocation_manager.add_invocation_state(
run_id,
parent_run_id,
None, # type: ignore[arg-type]
)
parent_invocation = self._invocation_manager.get_invocation(
run_id=parent_run_id
)
if parent_invocation is None or not isinstance(
parent_invocation, WorkflowInvocation
):
# Do not record nested chain state when the parent is unknown or
# not a workflow; otherwise we can create orphaned entries that
# on_chain_end/on_chain_error cannot clean up.
return
# TODO: For agent invocation
self._invocation_manager.add_invocation_state(
run_id,
parent_run_id,
None, # type: ignore[arg-type]
)

Copilot uses AI. Check for mistakes.
def on_chain_end(
    self,
    outputs: dict[str, Any],
    *,
    run_id: UUID,
    parent_run_id: Optional[UUID] = None,
    **kwargs: Any,
) -> Any:
    """Stop the workflow invocation recorded for ``run_id``, if any."""
    invocation = self._invocation_manager.get_invocation(run_id=run_id)
    if invocation is None or not isinstance(
        invocation, WorkflowInvocation
    ):
        # No workflow to stop. Drop any placeholder state recorded for a
        # nested chain (invocation=None) so it cannot leak as an orphaned
        # entry in the invocation manager.
        self._invocation_manager.delete_invocation_state(run_id=run_id)
        return

    invocation.stop()

    if not invocation.span.is_recording():
        self._invocation_manager.delete_invocation_state(run_id)

def on_chain_error(
    self,
    error: BaseException,
    *,
    run_id: UUID,
    parent_run_id: Optional[UUID] = None,
    **kwargs: Any,
) -> Any:
    """Fail the workflow invocation recorded for ``run_id``, if any."""
    invocation = self._invocation_manager.get_invocation(run_id=run_id)
    if invocation is None or not isinstance(
        invocation, WorkflowInvocation
    ):
        # No workflow to fail. Drop any placeholder state recorded for a
        # nested chain (invocation=None) so it cannot leak as an orphaned
        # entry in the invocation manager.
        self._invocation_manager.delete_invocation_state(run_id=run_id)
        return

    invocation.fail(error)
    if not invocation.span.is_recording():
        self._invocation_manager.delete_invocation_state(run_id=run_id)

def on_chat_model_start(
self,
serialized: dict[str, Any],
Expand Down Expand Up @@ -140,25 +218,22 @@ def on_chat_model_start(
)
)

llm_invocation = LLMInvocation(
llm_invocation = self._telemetry_handler.start_inference(
provider,
request_model=request_model,
input_messages=input_messages,
provider=provider,
top_p=top_p,
frequency_penalty=frequency_penalty,
presence_penalty=presence_penalty,
stop_sequences=stop_sequences,
seed=seed,
temperature=temperature,
max_tokens=max_tokens,
)
llm_invocation = self._telemetry_handler.start_llm(
invocation=llm_invocation
)
llm_invocation.input_messages = input_messages
llm_invocation.top_p = top_p
llm_invocation.frequency_penalty = frequency_penalty
llm_invocation.presence_penalty = presence_penalty
llm_invocation.stop_sequences = stop_sequences
llm_invocation.seed = seed
llm_invocation.temperature = temperature
llm_invocation.max_tokens = max_tokens
self._invocation_manager.add_invocation_state(
run_id=run_id,
parent_run_id=parent_run_id,
invocation=llm_invocation, # pyright: ignore[reportArgumentType]
invocation=llm_invocation,
)

def on_llm_end(
Expand All @@ -172,7 +247,7 @@ def on_llm_end(
llm_invocation = self._invocation_manager.get_invocation(run_id=run_id)
if llm_invocation is None or not isinstance(
llm_invocation,
LLMInvocation,
InferenceInvocation,
):
# If the invocation does not exist, we cannot set attributes or end it
return
Expand Down Expand Up @@ -247,10 +322,8 @@ def on_llm_end(
if response_id is not None:
llm_invocation.response_id = str(response_id)

llm_invocation = self._telemetry_handler.stop_llm(
invocation=llm_invocation
)
if llm_invocation.span and not llm_invocation.span.is_recording():
llm_invocation.stop()
if not llm_invocation.span.is_recording():
self._invocation_manager.delete_invocation_state(run_id=run_id)

def on_llm_error(
Expand All @@ -264,14 +337,11 @@ def on_llm_error(
llm_invocation = self._invocation_manager.get_invocation(run_id=run_id)
if llm_invocation is None or not isinstance(
llm_invocation,
LLMInvocation,
InferenceInvocation,
):
# If the invocation does not exist, we cannot set attributes or end it
return

error_otel = Error(message=str(error), type=type(error))
llm_invocation = self._telemetry_handler.fail_llm(
invocation=llm_invocation, error=error_otel
)
if llm_invocation.span and not llm_invocation.span.is_recording():
llm_invocation.fail(error)
if not llm_invocation.span.is_recording():
self._invocation_manager.delete_invocation_state(run_id=run_id)
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

@dataclass
class _InvocationState:
    """Per-run_id bookkeeping entry held by the invocation manager."""

    # None acts as a placeholder for nested chains whose telemetry
    # invocation has not (yet) been created — TODO confirm against
    # on_chain_start/on_chain_end handling.
    invocation: Optional[GenAIInvocation]
    # run_ids of child runs registered under this entry.
    children: List[UUID] = field(default_factory=list)


Expand Down
Loading
Loading