
feat: Add sync streaming support for Anthropic instrumentation #4155

Merged
xrmx merged 54 commits into open-telemetry:main from vasantteja:anthropic-sync-streaming
Mar 9, 2026

Conversation

@vasantteja
Contributor

@vasantteja vasantteja commented Feb 1, 2026

Description

This PR adds sync streaming support for the Anthropic instrumentation. It enables telemetry capture for:

  1. Messages.create(stream=True) - Streaming responses via the create method with stream parameter
  2. Messages.stream() - The dedicated streaming method that returns a MessageStreamManager

Key changes:

  • Added StreamWrapper class to wrap Stream[RawMessageStreamEvent] and extract telemetry from streaming chunks
  • Added MessageStreamManagerWrapper to wrap MessageStreamManager context manager
  • Added MessageWrapper for non-streaming response telemetry extraction
  • Renamed MessageCreateParams to MessageRequestParams to reflect broader API coverage
  • Modified messages_create to use manual lifecycle management (start_llm/stop_llm) instead of context manager to support both streaming and non-streaming
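
The reason for manual lifecycle management in the last bullet can be sketched roughly as follows. This is only an illustration, not the PR's code: `traced_stream` and its `start`/`stop`/`fail` callbacks are hypothetical stand-ins for `start_llm`/`stop_llm` and the error path, and a generator is used here in place of the `StreamWrapper` class to keep the sketch short. The point is that the span has to stay open after the call returns and can only be closed once the stream is exhausted or fails.

```python
from typing import Callable, Iterable, Iterator, List


def traced_stream(
    chunks: Iterable[str],
    start: Callable[[], None],
    stop: Callable[[List[str]], None],
    fail: Callable[[Exception], None],
) -> Iterator[str]:
    """Start telemetry immediately, finish it only when the stream ends.

    All names are hypothetical; they only mirror the start/stop idea
    described in the PR description.
    """
    # Called at request time, before any chunk has been consumed.
    start()

    def _iterate() -> Iterator[str]:
        collected: List[str] = []
        try:
            for chunk in chunks:
                collected.append(chunk)
                yield chunk
        except Exception as exc:
            # The stream broke midway: report the failure, then re-raise.
            fail(exc)
            raise
        else:
            # The stream was fully consumed: final data is known only now.
            stop(collected)

    return _iterate()
```

A `with` block around the request would end the span as soon as the call returned, before any chunk was read. Note that this sketch does not finalize anything if the caller abandons the stream early; a real wrapper would need to handle that case as well.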

Fixes #3949 partially.

Type of change

Please delete options that are not relevant.

  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to not work as expected)
  • This change requires a documentation update

How Has This Been Tested?

Added comprehensive tests for sync streaming functionality:

  • test_sync_messages_create_streaming - Tests streaming with context manager
  • test_sync_messages_create_streaming_iteration - Tests direct iteration without context manager
  • test_sync_messages_create_streaming_connection_error - Tests error handling for streaming
  • test_sync_messages_stream_basic - Tests Messages.stream() method
  • test_sync_messages_stream_with_params - Tests stream with additional parameters (temperature, top_p, top_k)
  • test_sync_messages_stream_token_usage - Tests token usage capture in streaming
  • test_sync_messages_stream_connection_error - Tests error handling for stream method

All tests use VCR cassettes for reproducible HTTP interaction replay.

Does This PR Require a Core Repo Change?

  • Yes. - Link to PR:
  • No.

Checklist:

See contributing.md for styleguide, changelog guidelines, and more.

  • Followed the style guidelines of this project
  • Changelogs have been updated
  • Unit tests have been added
  • Documentation has been updated

- Add support for Messages.create(stream=True) with StreamWrapper
- Add support for Messages.stream() with MessageStreamManagerWrapper
- Add MessageWrapper for non-streaming response telemetry
- Rename MessageCreateParams to MessageRequestParams
- Add comprehensive tests for sync streaming functionality
- Add type: ignore[arg-type] for Union type narrowing in messages_create
- Add type: ignore[return-value] for wrapper return types
- Add type: ignore[return-value] for __exit__ returning None
@vasantteja vasantteja force-pushed the anthropic-sync-streaming branch from 99a2596 to 504d0df Compare February 1, 2026 16:57
@vasantteja vasantteja removed their assignment Feb 5, 2026
@lmolkova
Member

lmolkova commented Feb 8, 2026

tagging @anirudha who was interested to review the PR :)

@anirudha

anirudha commented Feb 8, 2026

Thanks. Taking a look today

…r handling

- Introduce constants for provider name and cache token attributes.
- Normalize stop reasons and aggregate cache token fields in MessageWrapper and StreamWrapper.
- Enhance tests to validate input token aggregation and stop reason normalization.
- Update cassettes for new request and response structures in streaming scenarios.
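
A minimal sketch of what stop reason normalization could look like. The mapping table below is an assumption for illustration (the PR's actual table is not shown in this conversation), and `normalize_stop_reason` is a hypothetical name; the keys are `stop_reason` values returned by the Anthropic Messages API.

```python
from typing import Dict, Optional

# Assumed mapping from Anthropic stop reasons to generic finish reasons.
# This table is illustrative and is not taken from the PR.
_STOP_REASON_TO_FINISH_REASON: Dict[str, str] = {
    "end_turn": "stop",
    "stop_sequence": "stop",
    "max_tokens": "length",
    "tool_use": "tool_calls",
}


def normalize_stop_reason(stop_reason: Optional[str]) -> Optional[str]:
    """Map a provider stop reason to a normalized finish reason.

    Unknown values pass through unchanged so that new provider values
    are still recorded rather than dropped.
    """
    if stop_reason is None:
        return None
    return _STOP_REASON_TO_FINISH_REASON.get(stop_reason, stop_reason)
```

Passing unknown values through is a design choice of this sketch; a stricter variant could map them to a fixed fallback instead.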
@vasantteja vasantteja removed their assignment Feb 9, 2026
…d consistency

- Simplify constant definitions and normalize function calls in utils.py.
- Enhance test cases by removing unnecessary line breaks and improving formatting.
- Ensure consistent usage of type hints and comments in test functions.
@vasantteja vasantteja removed their assignment Feb 9, 2026
- Update the pylint directive to disable too-many-arguments warning for better clarity.
- Maintain consistency in function signature and improve code readability.

@anirudha anirudha left a comment

Tests all pass locally. Nice work overall — the wrapper separation is clean. One bug to fix (double finalize), rest are suggestions.

Note: conftest.py isn't in this diff so I can't leave a line comment, but scrub_response_headers is a no-op and all new cassettes leak anthropic-organization-id: 455ea6be-bd92-4199-83ec-0c6b39c5c169. Worth scrubbing that or adding it to filter_headers.

Also, the PR description says Fixes #3949 but async streaming isn't covered. Totally fine to scope this to sync only, but Fixes will auto-close the issue on merge. Maybe Partially addresses #3949 instead?
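
For the header leak mentioned above, a scrubber along these lines might work. This is a sketch under assumptions, not the `conftest.py` in this repository: `scrub_sensitive_response_headers` is a hypothetical name, and it assumes the recorded response is a dict whose `"headers"` entry maps header names to lists of values, which is the shape vcrpy passes to a `before_record_response` callback.

```python
import copy
from typing import Any, Dict, Iterable

# Header names to redact; extend as needed.
SENSITIVE_RESPONSE_HEADERS = ("anthropic-organization-id",)


def scrub_sensitive_response_headers(
    response: Dict[str, Any],
    names: Iterable[str] = SENSITIVE_RESPONSE_HEADERS,
) -> Dict[str, Any]:
    """Return a copy of the response with sensitive header values redacted.

    Assumes the response dict has a "headers" mapping of name -> list of
    values; header names are compared case-insensitively.
    """
    scrubbed = copy.deepcopy(response)
    sensitive = {name.lower() for name in names}
    headers = scrubbed.get("headers", {})
    for key in list(headers):
        if key.lower() in sensitive:
            headers[key] = ["REDACTED"]
    return scrubbed
```

With vcrpy, such a function would typically be registered through the `before_record_response` option, since `filter_headers` applies to request headers. Already recorded cassettes would still need to be re-recorded or edited by hand.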

…tion

- Update test cases to validate streaming behavior with various parameters, including token usage and stop reasons.
- Introduce new cassettes for different scenarios, ensuring comprehensive coverage of streaming interactions.
- Refactor existing tests for clarity and consistency in structure and assertions.
…ocals in test_stream_wrapper_finalize_idempotent function
…equirements.oldest.txt for compatibility improvements.
…s for improved clarity and type safety. Update extract_usage_tokens function to return UsageTokens instead of a tuple, and adjust related invocations in MessageWrapper and MessagesStreamWrapper accordingly.
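
The move from a tuple to a `UsageTokens` dataclass might look roughly like this. The field names follow the usage fields of the Anthropic Messages API, but the dataclass layout and the aggregation rule (input tokens reported as the sum of uncached, cache-creation, and cache-read tokens) are assumptions made for illustration, not the PR's confirmed implementation.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class UsageTokens:
    """Named container replacing a positional tuple (fields are assumed)."""

    input_tokens: Optional[int] = None
    output_tokens: Optional[int] = None
    cache_creation_input_tokens: Optional[int] = None
    cache_read_input_tokens: Optional[int] = None


def _int_or_none(value: object) -> Optional[int]:
    """Keep real integers only; anything else counts as missing."""
    if isinstance(value, int) and not isinstance(value, bool):
        return value
    return None


def extract_usage_tokens(usage: Optional[object]) -> UsageTokens:
    """Read token counts from a usage object, tolerating missing fields."""
    if usage is None:
        return UsageTokens()
    base = _int_or_none(getattr(usage, "input_tokens", None))
    creation = _int_or_none(getattr(usage, "cache_creation_input_tokens", None))
    read = _int_or_none(getattr(usage, "cache_read_input_tokens", None))
    output = _int_or_none(getattr(usage, "output_tokens", None))
    # Assumption: the reported input total includes cached tokens.
    parts = [value for value in (base, creation, read) if value is not None]
    total_input = sum(parts) if parts else None
    return UsageTokens(
        input_tokens=total_input,
        output_tokens=output,
        cache_creation_input_tokens=creation,
        cache_read_input_tokens=read,
    )
```

Compared with a tuple, named fields make call sites such as `tokens.output_tokens` harder to get wrong when fields are added later.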
Member

@Cirilla-zmh Cirilla-zmh left a comment

LGTM, thank you!

@vasantteja vasantteja removed their assignment Feb 27, 2026
…ing type hints in messages_create function. Update test cassettes for improved accuracy and consistency in response data.
@MikeGoldsmith MikeGoldsmith moved this from Approved PRs that need fixes to Approved PRs in Python PR digest Mar 3, 2026
… experimental mode. Update related tests to reflect the new function name and ensure accurate assertions for content capturing behavior.
Contributor

@nagkumar91 nagkumar91 left a comment

Looks good — the double-finalization issue I flagged previously has been properly addressed with _finalized guards on both _stop() and _fail(). __exit__ now correctly checks exc_type before calling _fail(), and close() is idempotent via the guard. Clean lifecycle management overall.
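
The guard pattern described in this review can be sketched as below. Only `_finalized`, `_stop()`, `_fail()`, `__exit__`, and `close()` are named in the comment; the class name `GuardedStream` and the `on_stop`/`on_fail` callbacks are hypothetical, and the body is an illustration rather than the merged code.

```python
from types import TracebackType
from typing import Callable, Optional, Type


class GuardedStream:
    """Finalizes telemetry exactly once, however the stream is closed."""

    def __init__(
        self,
        on_stop: Callable[[], None],
        on_fail: Callable[[Optional[BaseException]], None],
    ) -> None:
        self._on_stop = on_stop
        self._on_fail = on_fail
        self._finalized = False

    def _stop(self) -> None:
        # The guard makes repeated success finalization a no-op.
        if self._finalized:
            return
        self._finalized = True
        self._on_stop()

    def _fail(self, exc: Optional[BaseException]) -> None:
        # The same guard covers the error path.
        if self._finalized:
            return
        self._finalized = True
        self._on_fail(exc)

    def close(self) -> None:
        self._stop()

    def __enter__(self) -> "GuardedStream":
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc: Optional[BaseException],
        tb: Optional[TracebackType],
    ) -> bool:
        # Only report a failure when an exception actually occurred.
        if exc_type is not None:
            self._fail(exc)
        else:
            self._stop()
        # Never swallow the caller's exception.
        return False
```

With this shape, calling `close()` inside a `with` block and then leaving the block emits a single finalization, and a failure followed by `close()` does not turn into a second, successful one.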

Just needs a rebase onto latest main.

@vasantteja vasantteja removed their assignment Mar 9, 2026
@xrmx xrmx enabled auto-merge (squash) March 9, 2026 14:30
@xrmx xrmx merged commit ff7f60e into open-telemetry:main Mar 9, 2026
784 checks passed
@github-project-automation github-project-automation Bot moved this from Approved PRs to Done in Python PR digest Mar 9, 2026
sightseeker added a commit to sightseeker/opentelemetry-python-contrib that referenced this pull request Mar 11, 2026
…telemetry#4155)

* Add sync streaming support for Anthropic instrumentation

- Add support for Messages.create(stream=True) with StreamWrapper
- Add support for Messages.stream() with MessageStreamManagerWrapper
- Add MessageWrapper for non-streaming response telemetry
- Rename MessageCreateParams to MessageRequestParams
- Add comprehensive tests for sync streaming functionality

* Add changelog entry for sync streaming support

* Fix type checking errors with type: ignore comments

- Add type: ignore[arg-type] for Union type narrowing in messages_create
- Add type: ignore[return-value] for wrapper return types
- Add type: ignore[return-value] for __exit__ returning None

* Refactor Anthropic instrumentation to improve usage tracking and error handling

- Introduce constants for provider name and cache token attributes.
- Normalize stop reasons and aggregate cache token fields in MessageWrapper and StreamWrapper.
- Enhance tests to validate input token aggregation and stop reason normalization.
- Update cassettes for new request and response structures in streaming scenarios.

* Refactor utility functions and test cases for improved readability and consistency

- Simplify constant definitions and normalize function calls in utils.py.
- Enhance test cases by removing unnecessary line breaks and improving formatting.
- Ensure consistent usage of type hints and comments in test functions.

* Refactor argument handling in assert_span_attributes function

- Update the pylint directive to disable too-many-arguments warning for better clarity.
- Maintain consistency in function signature and improve code readability.

* Enhance tests for streaming message handling in Anthropic instrumentation

- Update test cases to validate streaming behavior with various parameters, including token usage and stop reasons.
- Introduce new cassettes for different scenarios, ensuring comprehensive coverage of streaming interactions.
- Refactor existing tests for clarity and consistency in structure and assertions.

* Update test_sync_messages.py to disable pylint warning for too-many-locals in test_stream_wrapper_finalize_idempotent function

* Enhance StreamWrapper and MessageStreamManagerWrapper for idempotent finalization

- Refactor finalization logic in StreamWrapper and MessageStreamManagerWrapper to ensure idempotent behavior during context exit.
- Introduce new methods for successful and error finalization, improving clarity and reducing code duplication.
- Add tests to validate double exit idempotency in streaming scenarios, ensuring only one span is emitted.
- Update cassettes to reflect new request and response structures for streaming interactions.

* Enhance Anthropic instrumentation to support content capture

- Added logger_provider to TelemetryHandler for improved logging capabilities.
- Implemented content capture logic in messages_create and messages_stream functions, allowing for the extraction of input messages and system instructions.
- Introduced utility functions for content conversion and message handling in utils.py.
- Updated tests to validate content capture functionality for both synchronous and streaming message creation.
- Added new cassettes to reflect the changes in request and response structures for content capture scenarios.

* Enhance tests for sync message creation in Anthropic instrumentation

- Added checks for the presence of 'tools' and 'thinking' parameters in the installed anthropic SDK.
- Updated test cases to skip if the SDK version does not support these parameters, ensuring compatibility with older versions.
- Improved test robustness by dynamically determining parameter support.

* Remove sensitive 'anthropic-organization-id' headers from test cassettes and update header scrubbing logic in tests. This enhances security by ensuring sensitive information is not recorded in test artifacts.

* Refactor tests for sync message handling in Anthropic instrumentation

- Simplified detection of 'tools' and 'thinking' parameters by directly accessing the _Messages class.
- Improved readability of test cases by formatting input message loading.
- Enhanced test function signatures for better clarity and maintainability.

* Refactor utils.py for improved type safety and clarity

- Added type casting for dictionary access to enhance type safety.
- Simplified content block conversion logic to improve readability and maintainability.
- Updated test cases to ensure consistent handling of content types and structures.

* Enhance Anthropic instrumentation tests for EVENT_ONLY content capture

- Introduced a new fixture to instrument Anthropic with EVENT_ONLY content capture mode.
- Added tests to verify that content is not captured in span attributes while ensuring log events are emitted correctly.
- Updated cassettes to reflect new request and response structures for EVENT_ONLY scenarios.
- Enhanced existing tests to cover various content capture scenarios, including streaming and tool usage.

* Refactor assertion in sync messages test for clarity

- Simplified the assertion statement in the test_sync_messages_create_event_only_no_content_in_span function to improve readability.

* Refactor content capture logic and enhance streaming tests for Anthropic instrumentation.

* unsetting the model.

* Remove instrumentation for Messages.stream() and refactor related code. Introduced MessageWrapper and StreamWrapper classes for telemetry handling. Updated tests to reflect changes in instrumentation behavior.

* Refactor Anthropic instrumentation: reorganize imports, enhance utility functions, and update wrapper classes for better clarity and maintainability. Removed unused code and improved type safety in utility functions. Updated tests to reflect changes in the instrumentation behavior.

* Add message extractors for Anthropic instrumentation.

* Refactor message extractors in Anthropic instrumentation: reorganize imports and streamline finish reason normalization for improved clarity and maintainability.

* Update test cassettes for Anthropic instrumentation: streamline request and response structures, enhance error handling scenarios, and ensure consistency in message formats across various test cases. Removed outdated data and improved clarity in test interactions.

* Enhance Anthropic instrumentation: update MessageWrapper and StreamWrapper to include content capture logic, improve type safety with explicit casting, and streamline test cases for better clarity. Added new test for streaming response attributes and refined existing tests to ensure consistency in message handling.

* Update test cassettes for Anthropic instrumentation: modify message IDs, timestamps, and token usage across various test cases. Refine content capture logic and ensure consistency in message formats, including adjustments to event data and headers for improved clarity and accuracy.

* Rename StreamWrapper to MessagesStreamWrapper and update references in code and tests

* Refactor type annotations in message extractors and wrappers for improved type safety. Replace 'Any' with 'object' in several function signatures and class attributes. Introduce logging for error handling in MessagesStreamWrapper to enhance instrumentation reliability.

* Enhance type annotations in message extractors and patch for improved clarity and safety. Update function signatures to use specific types instead of 'object', including changes to parameters in extract_params, get_input_messages, and get_system_instruction. Refactor messages_create to ensure correct type handling for streaming and non-streaming responses. Additionally, streamline message handling in MessagesStreamWrapper for better performance and reliability.

* Enhance type safety and error handling in message processing. Update function signatures in `messages_extractors.py` and `wrappers.py` to include specific types, improving clarity and reliability. Introduce handling for `None` values in `get_input_messages` and `get_system_instruction`. Refactor `MessagesStreamWrapper` to better manage usage updates and ensure correct type handling for streaming responses. Add new test cases for aggregating cache tokens and handling streaming errors.

* Refactor assertions in test_sync_messages.py for improved readability. Simplify assertion statements by removing unnecessary parentheses, enhancing code clarity in cache token tests.

* enforce strong typing system.

* Update anthropic dependency version to 0.51.0 in pyproject.toml and requirements.oldest.txt for compatibility improvements.

* Refactor usage token extraction to utilize a new UsageTokens dataclass for improved clarity and type safety. Update extract_usage_tokens function to return UsageTokens instead of a tuple, and adjust related invocations in MessageWrapper and MessagesStreamWrapper accordingly.

* Update anthropic dependency version in uv.lock to 0.51.0 for compatibility improvements.

* Add tests for should_capture_content function in test_events_options.py.

* Enhance Anthropic instrumentation by adding logging support and refining type hints in messages_create function. Update test cassettes for improved accuracy and consistency in response data.

* Refactor content capturing utility function to clarify its purpose in experimental mode. Update related tests to reflect the new function name and ensure accurate assertions for content capturing behavior.

* Refactor import statements in patch.py for improved readability and organization.

---------

Co-authored-by: Aaron Abbott <aaronabbott@google.com>

Projects

Status: Done

Development

Successfully merging this pull request may close these issues.

Add OpenTelemetry instrumentation for the Anthropic Claude Python SDK