Commit 4df92e2

Add response wrappers for OpenAI Responses API streams. (#4280)
* Add response wrappers for OpenAI Responses API streams.
* Enhance docstrings for ResponseStreamWrapper and ResponseStreamManagerWrapper to include references to the OpenAI SDK source. This improves clarity on the functionality and origin of the wrappers.
* Add wrappers for OpenAI Responses API streams and response stream managers in CHANGELOG.md.
* Refactor response handling in OpenAI response wrappers to improve event processing and error handling. Introduced conditional imports for response events and error types, ensuring compatibility with missing modules. Updated event processing logic to utilize a consolidated response events tuple.
* Refactor OpenAI response wrappers to enhance error handling and event processing. Introduced context managers for safe instrumentation, improved import handling for response events, and streamlined response attribute extraction logic.
* Add unit tests for ResponseStreamManagerWrapper to validate error handling and stream finalization behavior.
* Remove unnecessary blank line in test_response_wrappers.py to improve code cleanliness.
* Refactor event handling in ResponseStreamWrapper to simplify type checks for response events. Removed unnecessary conditional checks for event types, enhancing code clarity and maintainability.
* Enhance ResponseStreamWrapper and ResponseStreamManagerWrapper with improved error handling and context management. Introduced ExitStack for better resource management and added NotImplementedError for unimplemented parse methods. Updated TODO comments for future refactoring with wrapt.ObjectProxy.
1 parent ff7f60e commit 4df92e2
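
For orientation, the sketch below shows roughly how an instrumentation layer would hand an OpenAI ResponseStreamManager to these wrappers. It is not part of this commit: the TelemetryHandler/LLMInvocation setup, the start_llm call, and the model name are assumptions for illustration, and the import path for the wrappers (elided in this diff) is omitted. The real patch point lives inside opentelemetry-instrumentation-openai-v2.

    # Illustrative only -- not from this commit. Assumes default construction
    # of TelemetryHandler and LLMInvocation (request_model is the field the
    # wrappers themselves set) and that the invocation is started via
    # start_llm before streaming begins.
    from openai import OpenAI

    from opentelemetry.util.genai.handler import TelemetryHandler
    from opentelemetry.util.genai.types import LLMInvocation

    client = OpenAI()
    handler = TelemetryHandler()
    invocation = LLMInvocation(request_model="gpt-4.1")
    handler.start_llm(invocation)

    manager = client.responses.stream(model="gpt-4.1", input="Say hello")
    wrapped = ResponseStreamManagerWrapper(  # defined in the new module below
        manager, handler, invocation, capture_content=False
    )

    with wrapped as stream:  # __enter__ yields a ResponseStreamWrapper
        for event in stream:  # process_event() records telemetry per event
            pass
        final = stream.get_final_response()
    # Exiting the manager stops (or fails) the LLM invocation exactly once.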

3 files changed

Lines changed: 413 additions & 0 deletions

instrumentation-genai/opentelemetry-instrumentation-openai-v2/CHANGELOG.md

Lines changed: 2 additions & 0 deletions
@@ -13,6 +13,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 `OTEL_SEMCONV_STABILITY_OPT_IN` to `gen_ai_latest_experimental` to enable.
 Add dependency on `opentelemetry-util-genai` pypi package.
 ([#3715](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3715))
+- Add wrappers for OpenAI Responses API streams and response stream managers
+  ([#4280](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4280))

 ## Version 2.3b0 (2025-12-24)

@@ -0,0 +1,305 @@
"""Wrappers for OpenAI Responses API streams and stream managers."""

from __future__ import annotations

import logging
from contextlib import ExitStack, contextmanager
from types import TracebackType
from typing import TYPE_CHECKING, Callable, Generator, Generic, TypeVar

from opentelemetry.util.genai.handler import TelemetryHandler
from opentelemetry.util.genai.types import Error, LLMInvocation

# OpenAI Responses internals are version-gated (added in openai>=1.66.0), so
# pylint may not resolve them in all lint environments even though we guard
# runtime usage with ImportError fallbacks below.
try:
    from openai.lib.streaming.responses._events import (  # pylint: disable=no-name-in-module
        ResponseCompletedEvent,
    )
    from openai.types.responses import (  # pylint: disable=no-name-in-module
        ResponseCreatedEvent,
        ResponseErrorEvent,
        ResponseFailedEvent,
        ResponseIncompleteEvent,
        ResponseInProgressEvent,
    )

    _RESPONSE_EVENTS_WITH_RESPONSE = (
        ResponseCreatedEvent,
        ResponseInProgressEvent,
        ResponseFailedEvent,
        ResponseIncompleteEvent,
        ResponseCompletedEvent,
    )
except ImportError:
    ResponseCompletedEvent = None
    ResponseCreatedEvent = None
    ResponseErrorEvent = None
    ResponseFailedEvent = None
    ResponseIncompleteEvent = None
    ResponseInProgressEvent = None
    _RESPONSE_EVENTS_WITH_RESPONSE = ()

try:
    from opentelemetry.instrumentation.openai_v2.response_extractors import (  # pylint: disable=no-name-in-module
        _set_invocation_response_attributes,
    )
except ImportError:
    _set_invocation_response_attributes = None

if TYPE_CHECKING:
    from openai.lib.streaming.responses._events import (  # pylint: disable=no-name-in-module
        ResponseStreamEvent,
    )
    from openai.lib.streaming.responses._responses import (
        ResponseStream,
        ResponseStreamManager,
    )  # pylint: disable=no-name-in-module
    from openai.types.responses import (  # pylint: disable=no-name-in-module
        ParsedResponse,
        Response,
    )

_logger = logging.getLogger(__name__)
TextFormatT = TypeVar("TextFormatT")
ResponseT = TypeVar("ResponseT")


def _set_response_attributes(
    invocation: "LLMInvocation",
    result: "ParsedResponse[TextFormatT] | Response | None",
    capture_content: bool,
) -> None:
    # No-op when the extractor module is unavailable (version-gated import).
    if _set_invocation_response_attributes is None:
        return
    _set_invocation_response_attributes(invocation, result, capture_content)


class _ResponseProxy(Generic[ResponseT]):
    """Proxies a raw response so that closing it also finalizes telemetry."""

    def __init__(self, response: ResponseT, finalize: Callable[[], None]):
        self._response = response
        self._finalize = finalize

    def close(self) -> None:
        try:
            self._response.close()
        finally:
            self._finalize()

    def __getattr__(self, name: str):
        return getattr(self._response, name)


class ResponseStreamWrapper(Generic[TextFormatT]):
    """Wrapper for OpenAI Responses API stream objects.

    Wraps ResponseStream from the OpenAI SDK:
    https://github.com/openai/openai-python/blob/656e3cab4a18262a49b961d41293367e45ee71b9/src/openai/_streaming.py#L55
    """

    def __init__(
        self,
        stream: "ResponseStream[TextFormatT]",
        handler: TelemetryHandler,
        invocation: "LLMInvocation",
        capture_content: bool,
    ):
        self.stream = stream
        self.handler = handler
        self.invocation = invocation
        self._capture_content = capture_content
        self._finalized = False

    def __enter__(self) -> "ResponseStreamWrapper":
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> bool:
        try:
            if exc_type is not None:
                self._fail(
                    str(exc_val), type(exc_val) if exc_val else Exception
                )
        finally:
            self.close()
        return False

    def close(self) -> None:
        try:
            self.stream.close()
        finally:
            self._stop(None)

    def __iter__(self) -> "ResponseStreamWrapper":
        return self

    def __next__(self) -> "ResponseStreamEvent[TextFormatT]":
        try:
            event = next(self.stream)
        except StopIteration:
            self._stop(None)
            raise
        except Exception as error:
            self._fail(str(error), type(error))
            raise
        with self._safe_instrumentation("event processing"):
            self.process_event(event)
        return event

    def get_final_response(self) -> "ParsedResponse[TextFormatT]":
        self.until_done()
        return self.stream.get_final_response()

    def until_done(self) -> "ResponseStreamWrapper":
        for _ in self:
            pass
        return self

    def parse(self) -> "ResponseStreamWrapper":
        raise NotImplementedError(
            "ResponseStreamWrapper.parse() is not implemented"
        )

    # TODO: Replace __getattr__ passthrough with wrapt.ObjectProxy in a future
    # cleanup once wrapt 2 typing support is available (wrapt PR #3903).
    def __getattr__(self, name: str):
        return getattr(self.stream, name)

    @property
    def response(self):
        response = self.stream.response
        if response is None:
            return None
        return _ResponseProxy(response, lambda: self._stop(None))

    def _stop(
        self, result: "ParsedResponse[TextFormatT] | Response | None"
    ) -> None:
        if self._finalized:
            return
        with self._safe_instrumentation("response attribute extraction"):
            _set_response_attributes(
                self.invocation, result, self._capture_content
            )
        with self._safe_instrumentation("stop_llm"):
            self.handler.stop_llm(self.invocation)
        self._finalized = True

    def _fail(self, message: str, error_type: type[BaseException]) -> None:
        if self._finalized:
            return
        with self._safe_instrumentation("fail_llm"):
            self.handler.fail_llm(
                self.invocation, Error(message=message, type=error_type)
            )
        self._finalized = True

    @staticmethod
    @contextmanager
    def _safe_instrumentation(context: str) -> Generator[None, None, None]:
        # Swallow instrumentation errors so telemetry bugs never break the
        # user's stream; failures are only visible at debug log level.
        try:
            yield
        except Exception:  # pylint: disable=broad-exception-caught
            _logger.debug(
                "OpenAI responses instrumentation error during %s",
                context,
                exc_info=True,
                stacklevel=2,
            )

    def process_event(self, event: "ResponseStreamEvent[TextFormatT]") -> None:
        # Only ever called inside _safe_instrumentation, so isinstance checks
        # against the version-gated event classes (None when the imports
        # above failed) cannot propagate a TypeError to the caller.
        event_type = event.type
        response: "ParsedResponse[TextFormatT] | Response | None" = None

        if isinstance(event, _RESPONSE_EVENTS_WITH_RESPONSE):
            response = event.response

        if response and not self.invocation.request_model:
            model = response.model
            if model:
                self.invocation.request_model = model

        if isinstance(event, ResponseCompletedEvent):
            self._stop(response)
            return

        if isinstance(event, (ResponseFailedEvent, ResponseIncompleteEvent)):
            with self._safe_instrumentation("response attribute extraction"):
                _set_response_attributes(
                    self.invocation, response, self._capture_content
                )
            self._fail(event_type, RuntimeError)
            return

        if isinstance(event, ResponseErrorEvent):
            error_type = event.code or "response.error"
            message = event.message or error_type
            self._fail(message, RuntimeError)


class ResponseStreamManagerWrapper(Generic[TextFormatT]):
    """Wrapper for OpenAI Responses API stream managers.

    Wraps ResponseStreamManager from the OpenAI SDK:
    https://github.com/openai/openai-python/blob/656e3cab4a18262a49b961d41293367e45ee71b9/src/openai/lib/streaming/responses/_responses.py#L95
    """

    def __init__(
        self,
        manager: "ResponseStreamManager[TextFormatT]",
        handler: TelemetryHandler,
        invocation: "LLMInvocation",
        capture_content: bool,
    ):
        self._manager = manager
        self._handler = handler
        self._invocation = invocation
        self._capture_content = capture_content
        self._stream_wrapper: ResponseStreamWrapper[TextFormatT] | None = None

    def __enter__(self) -> ResponseStreamWrapper[TextFormatT]:
        stream = self._manager.__enter__()
        self._stream_wrapper = ResponseStreamWrapper(
            stream,
            self._handler,
            self._invocation,
            self._capture_content,
        )
        return self._stream_wrapper

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> bool:
        suppressed = False
        stream_wrapper = self._stream_wrapper
        self._stream_wrapper = None
        with ExitStack() as cleanup:
            if stream_wrapper is not None:

                def finalize_stream_wrapper() -> None:
                    # Runs when the ExitStack unwinds, i.e. after the
                    # manager's own __exit__, so it sees the final value of
                    # `suppressed` and only forwards the exception when the
                    # manager did not suppress it.
                    if suppressed:
                        stream_wrapper.__exit__(None, None, None)
                    else:
                        stream_wrapper.__exit__(exc_type, exc_val, exc_tb)

                cleanup.callback(finalize_stream_wrapper)
            suppressed = self._manager.__exit__(exc_type, exc_val, exc_tb)
        return suppressed

    def parse(self) -> "ResponseStreamManagerWrapper[TextFormatT]":
        raise NotImplementedError(
            "ResponseStreamManagerWrapper.parse() is not implemented"
        )

    # TODO: Replace __getattr__ passthrough with wrapt.ObjectProxy in a future
    # cleanup once wrapt 2 typing support is available (wrapt PR #3903).
    def __getattr__(self, name: str):
        return getattr(self._manager, name)

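The commit message mentions unit tests for ResponseStreamManagerWrapper covering error handling and stream finalization; that test file belongs to the commit but is not reproduced above. Below is a minimal sketch of the kind of finalization check it describes, assuming pytest-style tests and unittest.mock; the actual test names and structure are not shown here, and the import of ResponseStreamManagerWrapper from the wrappers module is left implicit because the file path is elided in this diff.

    # Illustrative sketch only -- the commit's real test file is not shown
    # above. Verifies the finalization contract: leaving the manager
    # wrapper's context must exit the underlying manager and stop the LLM
    # invocation exactly once, even when no events were consumed.
    from unittest import mock


    def test_manager_exit_finalizes_stream_wrapper():
        manager = mock.MagicMock()
        handler = mock.MagicMock()
        invocation = mock.MagicMock()

        wrapper = ResponseStreamManagerWrapper(
            manager, handler, invocation, capture_content=False
        )
        with wrapper as stream:
            pass  # consume nothing; exiting must still finalize

        manager.__exit__.assert_called_once()
        handler.stop_llm.assert_called_once_with(invocation)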