Merged
54 commits
988fa6e
Add sync streaming support for Anthropic instrumentation
vasantteja Feb 1, 2026
ea0bd94
Add changelog entry for sync streaming support
vasantteja Feb 1, 2026
504d0df
Fix type checking errors with type: ignore comments
vasantteja Feb 1, 2026
8df752a
Refactor Anthropic instrumentation to improve usage tracking and erro…
vasantteja Feb 9, 2026
1b09377
Refactor utility functions and test cases for improved readability an…
vasantteja Feb 9, 2026
e6c83ac
Refactor argument handling in assert_span_attributes function
vasantteja Feb 9, 2026
1ed3c6b
Enhance tests for streaming message handling in Anthropic instrumenta…
vasantteja Feb 9, 2026
a011520
Merge branch 'main' into anthropic-sync-streaming
vasantteja Feb 9, 2026
2851e4a
Update test_sync_messages.py to disable pylint warning for too-many-l…
vasantteja Feb 9, 2026
3e5cbda
Merge branch 'anthropic-sync-streaming' of https://github.com/vasantt…
vasantteja Feb 9, 2026
38d4429
Enhance StreamWrapper and MessageStreamManagerWrapper for idempotent …
vasantteja Feb 10, 2026
685e161
Merge branch 'main' into anthropic-sync-streaming
vasantteja Feb 10, 2026
0f481c1
Enhance Anthropic instrumentation to support content capture
vasantteja Feb 11, 2026
75fcb3b
Enhance tests for sync message creation in Anthropic instrumentation
vasantteja Feb 11, 2026
8b5b20f
Remove sensitive 'anthropic-organization-id' headers from test casset…
vasantteja Feb 11, 2026
d48c7e3
Refactor tests for sync message handling in Anthropic instrumentation
vasantteja Feb 11, 2026
ee173da
Refactor utils.py for improved type safety and clarity
vasantteja Feb 12, 2026
ed46be8
Enhance Anthropic instrumentation tests for EVENT_ONLY content capture
vasantteja Feb 12, 2026
d1af778
Refactor assertion in sync messages test for clarity
vasantteja Feb 12, 2026
5093cbe
Merge branch 'main' into anthropic-sync-streaming
vasantteja Feb 13, 2026
401e1b1
Refactor content capture logic and enhance streaming tests for Anthro…
vasantteja Feb 13, 2026
15d1b05
unsetting the model.
vasantteja Feb 13, 2026
44b97a8
Merge branch 'main' into anthropic-sync-streaming
vasantteja Feb 16, 2026
7800a0e
Remove instrumentation for Messages.stream() and refactor related cod…
vasantteja Feb 16, 2026
df9911e
Refactor Anthropic instrumentation: reorganize imports, enhance utili…
vasantteja Feb 17, 2026
2590274
Add message extractors for Anthropic instrumentation.
vasantteja Feb 17, 2026
b4adeec
Refactor message extractors in Anthropic instrumentation: reorganize …
vasantteja Feb 17, 2026
e9c235a
Update test cassettes for Anthropic instrumentation: streamline reque…
vasantteja Feb 18, 2026
da275d0
Enhance Anthropic instrumentation: update MessageWrapper and StreamWr…
vasantteja Feb 19, 2026
2c2a780
Update test cassettes for Anthropic instrumentation: modify message I…
vasantteja Feb 19, 2026
cdd95b1
Merge branch 'main' into anthropic-sync-streaming
vasantteja Feb 19, 2026
1d1b7f5
Rename StreamWrapper to MessagesStreamWrapper and update references i…
vasantteja Feb 24, 2026
0793c46
Merge branch 'main' into anthropic-sync-streaming
vasantteja Feb 24, 2026
5effa69
Refactor type annotations in message extractors and wrappers for impr…
vasantteja Feb 25, 2026
89704d2
Merge branch 'anthropic-sync-streaming' of https://github.com/vasantt…
vasantteja Feb 25, 2026
a835a75
Enhance type annotations in message extractors and patch for improved…
vasantteja Feb 25, 2026
471204c
Enhance type safety and error handling in message processing. Update …
vasantteja Feb 26, 2026
d1bf3fe
Refactor assertions in test_sync_messages.py for improved readability…
vasantteja Feb 26, 2026
138bfa0
Merge branch 'main' into anthropic-sync-streaming
vasantteja Feb 26, 2026
cc0a5b0
Merge branch 'main' into anthropic-sync-streaming
vasantteja Feb 26, 2026
0dc8a9f
enforce strong typing system.
vasantteja Feb 26, 2026
f62dd6d
Update anthropic dependency version to 0.51.0 in pyproject.toml and r…
vasantteja Feb 26, 2026
1bf7695
Refactor usage token extraction to utilize a new UsageTokens dataclas…
vasantteja Feb 27, 2026
3d2bd63
Update anthropic dependency version in uv.lock to 0.51.0 for compatib…
vasantteja Feb 27, 2026
cc8e4d1
Add tests for should_capture_content function in test_events_options.py.
vasantteja Feb 27, 2026
1656950
Enhance Anthropic instrumentation by adding logging support and refin…
vasantteja Mar 3, 2026
ccf5551
Merge branch 'main' into anthropic-sync-streaming
vasantteja Mar 3, 2026
d62632f
Refactor content capturing utility function to clarify its purpose in…
vasantteja Mar 3, 2026
5c2481a
Refactor import statements in patch.py for improved readability and o…
vasantteja Mar 4, 2026
8c51037
Merge branch 'main' into anthropic-sync-streaming
vasantteja Mar 4, 2026
f2bc3cc
Merge branch 'main' into anthropic-sync-streaming
vasantteja Mar 5, 2026
d317443
Merge branch 'main' into anthropic-sync-streaming
aabmass Mar 5, 2026
07aee9f
Merge branch 'main' into anthropic-sync-streaming
vasantteja Mar 6, 2026
f5054b3
Merge branch 'main' into anthropic-sync-streaming
vasantteja Mar 9, 2026
@@ -9,6 +9,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added

- Add sync streaming support for `Messages.create(stream=True)` and `Messages.stream()`
([#4155](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4155))
- `StreamWrapper` for handling `Messages.create(stream=True)` telemetry
- `MessageStreamManagerWrapper` for handling `Messages.stream()` telemetry
- `MessageWrapper` for non-streaming response telemetry extraction
- Initial implementation of Anthropic instrumentation
([#3978](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3978))
- Implement sync `Messages.create` instrumentation with GenAI semantic convention attributes
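The wrappers listed above share one idea: proxy the underlying event iterator and finalize telemetry once the stream is exhausted, so spans close with complete usage data even for lazy consumers. A minimal standalone sketch of that pattern (`DummyHandler` and the string events are illustrative stand-ins, not APIs from this PR):

```python
# Sketch of the stream-wrapper pattern used for Messages.create(stream=True).
# DummyHandler and the event strings are illustrative, not real SDK types.
from typing import Iterator, List


class DummyHandler:
    """Stands in for the telemetry handler; records whether the span was closed."""

    def __init__(self) -> None:
        self.stopped = False

    def stop_llm(self) -> None:
        self.stopped = True


class StreamWrapperSketch:
    """Proxies an event iterator and finalizes telemetry on exhaustion."""

    def __init__(self, stream: Iterator[str], handler: DummyHandler) -> None:
        self._stream = stream
        self._handler = handler
        self.events: List[str] = []

    def __iter__(self) -> Iterator[str]:
        try:
            for event in self._stream:
                # Accumulate events so usage/finish reason can be extracted later.
                self.events.append(event)
                yield event
        finally:
            # Finalize exactly once, even if the consumer stops iterating early.
            if not self._handler.stopped:
                self._handler.stop_llm()


handler = DummyHandler()
wrapped = StreamWrapperSketch(iter(["message_start", "content_delta", "message_stop"]), handler)
events = list(wrapped)
```

The `finally` block is the key design point: it guarantees the span is closed whether the stream ends normally or the caller abandons it.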
@@ -54,7 +54,9 @@
)

from opentelemetry.instrumentation.anthropic.package import _instruments
from opentelemetry.instrumentation.anthropic.patch import messages_create
from opentelemetry.instrumentation.anthropic.patch import (
messages_create,
)
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.utils import unwrap
from opentelemetry.util.genai.handler import TelemetryHandler
@@ -89,11 +91,12 @@ def _instrument(self, **kwargs: Any) -> None:
# Get providers from kwargs
tracer_provider = kwargs.get("tracer_provider")
meter_provider = kwargs.get("meter_provider")
logger_provider = kwargs.get("logger_provider")

# TODO: Add logger_provider to TelemetryHandler to capture content events.
handler = TelemetryHandler(
tracer_provider=tracer_provider,
meter_provider=meter_provider,
logger_provider=logger_provider,
)

# Patch Messages.create
@@ -0,0 +1,202 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Get/extract helpers for Anthropic Messages instrumentation."""

from __future__ import annotations

from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, Optional, Sequence, cast
from urllib.parse import urlparse

from opentelemetry.semconv._incubating.attributes import (
gen_ai_attributes as GenAIAttributes,
)
from opentelemetry.semconv._incubating.attributes import (
server_attributes as ServerAttributes,
)
from opentelemetry.util.genai.types import (
InputMessage,
MessagePart,
OutputMessage,
)
from opentelemetry.util.types import AttributeValue

from .utils import (
_as_str,
_get_field,
as_int,
convert_content_to_parts,
normalize_finish_reason,
)

if TYPE_CHECKING:
from anthropic.resources.messages import Messages


@dataclass
class MessageRequestParams:
model: str | None = None
max_tokens: int | None = None
temperature: float | None = None
top_k: int | None = None
top_p: float | None = None
stop_sequences: Sequence[str] | None = None
messages: Any | None = None
system: Any | None = None


GEN_AI_USAGE_CACHE_CREATION_INPUT_TOKENS = (
"gen_ai.usage.cache_creation.input_tokens"
)
GEN_AI_USAGE_CACHE_READ_INPUT_TOKENS = "gen_ai.usage.cache_read.input_tokens"


def extract_usage_tokens(
usage: Any | None,
) -> tuple[int | None, int | None, int | None, int | None]:
if usage is None:
return None, None, None, None

input_tokens = as_int(getattr(usage, "input_tokens", None))
cache_creation_input_tokens = as_int(
getattr(usage, "cache_creation_input_tokens", None)
)
cache_read_input_tokens = as_int(
getattr(usage, "cache_read_input_tokens", None)
)
output_tokens = as_int(getattr(usage, "output_tokens", None))

if (
input_tokens is None
and cache_creation_input_tokens is None
and cache_read_input_tokens is None
):
total_input_tokens = None
else:
total_input_tokens = (
(input_tokens or 0)
+ (cache_creation_input_tokens or 0)
+ (cache_read_input_tokens or 0)
)

return (
total_input_tokens,
output_tokens,
cache_creation_input_tokens,
cache_read_input_tokens,
)
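The aggregation rule in `extract_usage_tokens` can be shown standalone: the reported input total is the sum of direct, cache-creation, and cache-read input tokens, and is `None` only when all three are absent. A sketch of that rule (using `SimpleNamespace` as a stand-in for the SDK's usage object; the helper name is hypothetical and omits the `as_int` coercion):

```python
from types import SimpleNamespace
from typing import Optional


def total_input_tokens(usage: Optional[object]) -> Optional[int]:
    """Illustrative mirror of the input-token aggregation rule above."""
    if usage is None:
        return None
    parts = [
        getattr(usage, name, None)
        for name in (
            "input_tokens",
            "cache_creation_input_tokens",
            "cache_read_input_tokens",
        )
    ]
    # None only when every component is missing; otherwise missing parts count as 0.
    if all(p is None for p in parts):
        return None
    return sum(p or 0 for p in parts)


full = SimpleNamespace(
    input_tokens=10, cache_creation_input_tokens=3, cache_read_input_tokens=2
)
cache_only = SimpleNamespace(
    input_tokens=None, cache_creation_input_tokens=None, cache_read_input_tokens=7
)
```

Treating a partially populated usage object as a real total (rather than `None`) matters for cache-heavy requests, where `input_tokens` alone undercounts what the request actually consumed.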


def get_input_messages(messages: Any) -> list[InputMessage]:
if not isinstance(messages, list):
return []

result: list[InputMessage] = []
for message in cast(list[Any], messages):
role = _as_str(_get_field(message, "role")) or "user"
parts = convert_content_to_parts(_get_field(message, "content"))
result.append(InputMessage(role=role, parts=parts))
return result


def get_system_instruction(system: Any) -> list[MessagePart]:
return convert_content_to_parts(system)


def get_output_messages_from_message(message: Any) -> list[OutputMessage]:
if message is None:
return []

parts = convert_content_to_parts(_get_field(message, "content"))
finish_reason = normalize_finish_reason(_get_field(message, "stop_reason"))
return [
OutputMessage(
role=_as_str(_get_field(message, "role")) or "assistant",
parts=parts,
finish_reason=finish_reason or "",
)
]


def extract_params( # pylint: disable=too-many-locals
*,
max_tokens: int | None = None,
messages: Any | None = None,
model: str | None = None,
metadata: Any | None = None,
service_tier: Any | None = None,
stop_sequences: Sequence[str] | None = None,
stream: Any | None = None,
system: Any | None = None,
temperature: float | None = None,
thinking: Any | None = None,
tool_choice: Any | None = None,
tools: Any | None = None,
top_k: int | None = None,
top_p: float | None = None,
extra_headers: Any | None = None,
extra_query: Any | None = None,
extra_body: Any | None = None,
timeout: Any | None = None,
**_kwargs: Any,
) -> MessageRequestParams:
return MessageRequestParams(
model=model,
max_tokens=max_tokens,
temperature=temperature,
top_p=top_p,
top_k=top_k,
stop_sequences=stop_sequences,
messages=messages,
system=system,
)


def _set_server_address_and_port(
client_instance: "Messages", attributes: dict[str, Any]
) -> None:
base_client = getattr(client_instance, "_client", None)
base_url = getattr(base_client, "base_url", None)
if not base_url:
return

port: Optional[int] = None
if hasattr(base_url, "host"):
attributes[ServerAttributes.SERVER_ADDRESS] = base_url.host
port = getattr(base_url, "port", None)
elif isinstance(base_url, str):
url = urlparse(base_url)
attributes[ServerAttributes.SERVER_ADDRESS] = url.hostname
port = url.port

if port and port != 443 and port > 0:
attributes[ServerAttributes.SERVER_PORT] = port
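`_set_server_address_and_port` handles two shapes of `base_url`: an httpx-style URL object with `host`/`port` attributes, and a plain string parsed with `urlparse`; in both cases the default HTTPS port 443 is omitted. The string branch behaves like this sketch (the function name and plain-string attribute keys are illustrative):

```python
from urllib.parse import urlparse


def server_attrs_from_url(base_url: str) -> dict:
    """Illustrative mirror of the string branch of _set_server_address_and_port."""
    attrs: dict = {}
    url = urlparse(base_url)
    if url.hostname:
        attrs["server.address"] = url.hostname
    # Only record a port when it is explicit and not the HTTPS default.
    if url.port and url.port != 443:
        attrs["server.port"] = url.port
    return attrs
```

Skipping 443 follows the semantic-convention guidance that the default port for the scheme carries no extra information.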


def get_llm_request_attributes(
params: MessageRequestParams, client_instance: "Messages"
) -> dict[str, AttributeValue]:
attributes = {
GenAIAttributes.GEN_AI_OPERATION_NAME: GenAIAttributes.GenAiOperationNameValues.CHAT.value,
GenAIAttributes.GEN_AI_SYSTEM: GenAIAttributes.GenAiSystemValues.ANTHROPIC.value, # pyright: ignore[reportDeprecated]
GenAIAttributes.GEN_AI_REQUEST_MODEL: params.model,
GenAIAttributes.GEN_AI_REQUEST_MAX_TOKENS: params.max_tokens,
GenAIAttributes.GEN_AI_REQUEST_TEMPERATURE: params.temperature,
GenAIAttributes.GEN_AI_REQUEST_TOP_P: params.top_p,
GenAIAttributes.GEN_AI_REQUEST_TOP_K: params.top_k,
GenAIAttributes.GEN_AI_REQUEST_STOP_SEQUENCES: params.stop_sequences,
}
_set_server_address_and_port(client_instance, attributes)
return {k: v for k, v in attributes.items() if v is not None}
@@ -14,65 +14,95 @@

"""Patching functions for Anthropic instrumentation."""

from typing import TYPE_CHECKING, Any, Callable
from typing import TYPE_CHECKING, Any, Callable, Union, cast

from opentelemetry.semconv._incubating.attributes import (
gen_ai_attributes as GenAIAttributes,
)
from opentelemetry.util.genai.handler import TelemetryHandler
from opentelemetry.util.genai.types import LLMInvocation
from opentelemetry.util.genai.types import Error, LLMInvocation
from opentelemetry.util.genai.utils import should_capture_content

from .utils import (
from .messages_extractors import (
extract_params,
get_input_messages,
get_llm_request_attributes,
get_system_instruction,
)
from .wrappers import (
MessageWrapper,
StreamWrapper,
)

if TYPE_CHECKING:
from anthropic._streaming import Stream
from anthropic.resources.messages import Messages
from anthropic.types import Message
from anthropic.types import Message, RawMessageStreamEvent


ANTHROPIC = "anthropic"


def messages_create(
handler: TelemetryHandler,
) -> Callable[..., "Message"]:
) -> Callable[..., Union["Message", "Stream[RawMessageStreamEvent]"]]:
"""Wrap the `create` method of the `Messages` class to trace it."""
capture_content = should_capture_content()

def traced_method(
wrapped: Callable[..., "Message"],
wrapped: Callable[
..., Union["Message", "Stream[RawMessageStreamEvent]"]
],
instance: "Messages",
args: tuple[Any, ...],
kwargs: dict[str, Any],
) -> "Message":
) -> Union["Message", StreamWrapper]:
params = extract_params(*args, **kwargs)
attributes = get_llm_request_attributes(params, instance)
request_model = str(
attributes.get(GenAIAttributes.GEN_AI_REQUEST_MODEL)
or params.model
or "unknown"
request_model_attribute = attributes.get(
GenAIAttributes.GEN_AI_REQUEST_MODEL
)
request_model = (
request_model_attribute
if isinstance(request_model_attribute, str)
else params.model
)

invocation = LLMInvocation(
request_model=request_model,
provider="anthropic",
provider=ANTHROPIC,
input_messages=get_input_messages(params.messages)
if capture_content
else [],
system_instruction=get_system_instruction(params.system)
if capture_content
else [],
attributes=attributes,
)

with handler.llm(invocation) as invocation:
result = wrapped(*args, **kwargs)

if result.model:
invocation.response_model_name = result.model
is_streaming = kwargs.get("stream", False)

if result.id:
invocation.response_id = result.id

if result.stop_reason:
invocation.finish_reasons = [result.stop_reason]

if result.usage:
invocation.input_tokens = result.usage.input_tokens
invocation.output_tokens = result.usage.output_tokens

return result

return traced_method
# Use manual lifecycle management for both streaming and non-streaming
handler.start_llm(invocation)
try:
result = wrapped(*args, **kwargs)
if is_streaming:
stream_result = cast("Stream[RawMessageStreamEvent]", result)
return StreamWrapper(
stream_result, handler, invocation, capture_content
)
message_result = cast("Message", result)
wrapper = MessageWrapper(message_result, capture_content)
wrapper.extract_into(invocation)
handler.stop_llm(invocation)
return wrapper.message
except Exception as exc:
handler.fail_llm(
invocation, Error(message=str(exc), type=type(exc))
)
raise

return cast(
Callable[..., Union["Message", "Stream[RawMessageStreamEvent]"]],
traced_method,
)
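The manual lifecycle in `traced_method` (start, then stop on success or fail on exception; for streaming, the stop is deferred to the wrapper) can be sketched without the real handler. `FakeHandler` and `traced_call` below are hypothetical stand-ins, not the PR's types:

```python
class FakeHandler:
    """Records which lifecycle calls were made (illustrative stand-in)."""

    def __init__(self):
        self.calls = []

    def start_llm(self, invocation):
        self.calls.append("start")

    def stop_llm(self, invocation):
        self.calls.append("stop")

    def fail_llm(self, invocation, error_type):
        self.calls.append(f"fail:{error_type}")


def traced_call(handler, fn):
    """Mirror of the start/stop/fail pattern used in traced_method."""
    invocation = object()  # stands in for LLMInvocation
    handler.start_llm(invocation)
    try:
        result = fn()
        handler.stop_llm(invocation)
        return result
    except Exception as exc:
        handler.fail_llm(invocation, type(exc).__name__)
        raise


ok = FakeHandler()
result = traced_call(ok, lambda: 42)

bad = FakeHandler()
try:
    traced_call(bad, lambda: 1 / 0)
except ZeroDivisionError:
    pass
```

Manual start/stop (rather than a context manager) is what makes the streaming branch possible: the invocation must outlive `traced_method` and be closed later by the returned wrapper.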