|
63 | 63 |
|
64 | 64 | from __future__ import annotations |
65 | 65 |
|
| 66 | +import logging |
66 | 67 | import timeit |
67 | 68 | from contextlib import contextmanager |
68 | 69 | from typing import Iterator, TypeVar |
|
88 | 89 | _apply_error_attributes, |
89 | 90 | _apply_llm_finish_attributes, |
90 | 91 | _apply_tool_call_attributes, |
| 92 | + _apply_workflow_finish_attributes, |
91 | 93 | _finish_tool_call_span, |
92 | 94 | _get_embedding_span_name, |
93 | 95 | _get_llm_span_name, |
94 | 96 | _get_tool_call_span_name, |
| 97 | + _get_workflow_span_name, |
95 | 98 | _maybe_emit_llm_event, |
96 | 99 | ) |
97 | 100 | from opentelemetry.util.genai.types import ( |
|
100 | 103 | GenAIInvocation, |
101 | 104 | LLMInvocation, |
102 | 105 | ToolCall, |
| 106 | + WorkflowInvocation, |
103 | 107 | ) |
104 | 108 | from opentelemetry.util.genai.version import __version__ |
105 | 109 |
|
| 110 | +_logger = logging.getLogger(__name__) |
| 111 | + |
| 112 | + |
| 113 | +def _safe_detach(invocation: GenAIInvocation) -> None: |
| 114 | + """Detach the context token if still present, as a safety net.""" |
| 115 | + if invocation.context_token is not None: |
| 116 | + try: |
| 117 | + otel_context.detach(invocation.context_token) |
| 118 | + except Exception: # pylint: disable=broad-except |
| 119 | + pass |
| 120 | + if invocation.span is not None: |
| 121 | + try: |
| 122 | + invocation.span.end() |
| 123 | + except Exception: # pylint: disable=broad-except |
| 124 | + pass |
| 125 | + |
| 126 | + |
106 | 127 | _T = TypeVar("_T", bound=GenAIInvocation) |
107 | 128 |
|
108 | 129 |
|
@@ -158,20 +179,24 @@ def _record_metrics( |
158 | 179 |
|
159 | 180 | def _start(self, invocation: _T) -> _T: |
160 | 181 | """Start a GenAI invocation and create a pending span entry.""" |
161 | | - span_kind = SpanKind.CLIENT |
162 | 182 | if isinstance(invocation, LLMInvocation): |
163 | 183 | span_name = _get_llm_span_name(invocation) |
| 184 | + kind = SpanKind.CLIENT |
164 | 185 | elif isinstance(invocation, EmbeddingInvocation): |
165 | 186 | span_name = _get_embedding_span_name(invocation) |
| 187 | + kind = SpanKind.CLIENT |
166 | 188 | elif isinstance(invocation, ToolCall): |
167 | 189 | span_name = _get_tool_call_span_name(invocation) |
168 | | - span_kind = SpanKind.INTERNAL |
| 190 | + kind = SpanKind.INTERNAL |
| 191 | + elif isinstance(invocation, WorkflowInvocation): |
| 192 | + span_name = _get_workflow_span_name(invocation) |
| 193 | + kind = SpanKind.INTERNAL |
169 | 194 | else: |
170 | 195 | span_name = "" |
171 | | - |
| 196 | + kind = SpanKind.CLIENT |
172 | 197 | span = self._tracer.start_span( |
173 | 198 | name=span_name, |
174 | | - kind=span_kind, |
| 199 | + kind=kind, |
175 | 200 | ) |
176 | 201 | if isinstance(invocation, ToolCall): |
177 | 202 | _apply_tool_call_attributes( |
@@ -203,6 +228,9 @@ def _stop(self, invocation: _T) -> _T: |
203 | 228 | elif isinstance(invocation, ToolCall): |
204 | 229 | _finish_tool_call_span(span, invocation, capture_content=True) |
205 | 230 | self._record_metrics(invocation, span) |
| 231 | + elif isinstance(invocation, WorkflowInvocation): |
| 232 | + _apply_workflow_finish_attributes(span, invocation) |
| 233 | + # TODO: Add workflow metrics when supported |
206 | 234 | finally: |
207 | 235 | # Detach context and end span even if finishing fails |
208 | 236 | otel_context.detach(invocation.context_token) |
@@ -234,6 +262,10 @@ def _fail(self, invocation: _T, error: Error) -> _T: |
234 | 262 | _finish_tool_call_span(span, invocation, capture_content=True) |
235 | 263 | self._record_metrics(invocation, span, error_type=error_type) |
236 | 264 | span.set_status(Status(StatusCode.ERROR, error.message)) |
| 265 | + elif isinstance(invocation, WorkflowInvocation): |
| 266 | + _apply_workflow_finish_attributes(span, invocation) |
| 267 | + _apply_error_attributes(span, error, error_type) |
| 268 | + # TODO: Add workflow metrics when supported |
237 | 269 | finally: |
238 | 270 | # Detach context and end span even if finishing fails |
239 | 271 | otel_context.detach(invocation.context_token) |
@@ -347,6 +379,46 @@ def embedding( |
347 | 379 | raise |
348 | 380 | self.stop(invocation) |
349 | 381 |
|
| 382 | + @contextmanager |
| 383 | + def workflow( |
| 384 | + self, invocation: WorkflowInvocation | None = None |
| 385 | + ) -> Iterator[WorkflowInvocation]: |
| 386 | + """Context manager for Workflow invocations. |
| 387 | +
|
| 388 | + Only set data attributes on the invocation object, do not modify the span or context. |
| 389 | +
|
| 390 | + Starts the span on entry. On normal exit, finalizes the invocation and ends the span. |
| 391 | + If an exception occurs inside the context, marks the span as error, ends it, and |
| 392 | + re-raises the original exception. |
| 393 | + """ |
| 394 | + if invocation is None: |
| 395 | + invocation = WorkflowInvocation() |
| 396 | + |
| 397 | + try: |
| 398 | + self.start(invocation) |
| 399 | + except Exception: # pylint: disable=broad-except |
| 400 | + _logger.warning( |
| 401 | + "Failed to start workflow telemetry", exc_info=True |
| 402 | + ) |
| 403 | + |
| 404 | + try: |
| 405 | + yield invocation |
| 406 | + except Exception as exc: |
| 407 | + try: |
| 408 | + self.fail(invocation, Error(message=str(exc), type=type(exc))) |
| 409 | + except Exception: # pylint: disable=broad-except |
| 410 | + _logger.warning( |
| 411 | + "Failed to record workflow failure", exc_info=True |
| 412 | + ) |
| 413 | + _safe_detach(invocation) |
| 414 | + raise |
| 415 | + |
| 416 | + try: |
| 417 | + self.stop(invocation) |
| 418 | + except Exception: # pylint: disable=broad-except |
| 419 | + _logger.warning("Failed to stop workflow telemetry", exc_info=True) |
| 420 | + _safe_detach(invocation) |
| 421 | + |
350 | 422 |
|
351 | 423 | def get_telemetry_handler( |
352 | 424 | tracer_provider: TracerProvider | None = None, |
|
0 commit comments