6060
6161from __future__ import annotations
6262
63+ import logging
6364import timeit
6465from contextlib import contextmanager
6566from typing import Iterator , TypeVar
8384 _apply_embedding_finish_attributes ,
8485 _apply_error_attributes ,
8586 _apply_llm_finish_attributes ,
87+ _apply_workflow_finish_attributes ,
8688 _get_embedding_span_name ,
8789 _get_llm_span_name ,
90+ _get_workflow_span_name ,
8891 _maybe_emit_llm_event ,
8992)
9093from opentelemetry .util .genai .types import (
9194 EmbeddingInvocation ,
9295 Error ,
9396 GenAIInvocation ,
9497 LLMInvocation ,
98+ WorkflowInvocation ,
9599)
96100from opentelemetry .util .genai .version import __version__
97101
102+ _logger = logging .getLogger (__name__ )
103+
104+
105+ def _safe_detach (invocation : GenAIInvocation ) -> None :
106+ """Detach the context token if still present, as a safety net."""
107+ if invocation .context_token is not None :
108+ try :
109+ otel_context .detach (invocation .context_token )
110+ except Exception : # pylint: disable=broad-except
111+ pass
112+ if invocation .span is not None :
113+ try :
114+ invocation .span .end ()
115+ except Exception : # pylint: disable=broad-except
116+ pass
117+
118+
98119_T = TypeVar ("_T" , bound = GenAIInvocation )
99120
100121
@@ -160,13 +181,19 @@ def _start(self, invocation: _T) -> _T:
160181 """Start a GenAI invocation and create a pending span entry."""
161182 if isinstance (invocation , LLMInvocation ):
162183 span_name = _get_llm_span_name (invocation )
184+ kind = SpanKind .CLIENT
163185 elif isinstance (invocation , EmbeddingInvocation ):
164186 span_name = _get_embedding_span_name (invocation )
187+ kind = SpanKind .CLIENT
188+ elif isinstance (invocation , WorkflowInvocation ):
189+ span_name = _get_workflow_span_name (invocation )
190+ kind = SpanKind .CLIENT
165191 else :
166192 span_name = ""
193+ kind = ""
167194 span = self ._tracer .start_span (
168195 name = span_name ,
169- kind = SpanKind . CLIENT ,
196+ kind = kind ,
170197 )
171198 # Record a monotonic start timestamp (seconds) for duration
172199 # calculation using timeit.default_timer.
@@ -192,6 +219,9 @@ def _stop(self, invocation: _T) -> _T:
192219 elif isinstance (invocation , EmbeddingInvocation ):
193220 _apply_embedding_finish_attributes (span , invocation )
194221 self ._record_embedding_metrics (invocation , span )
222+ elif isinstance (invocation , WorkflowInvocation ):
223+ _apply_workflow_finish_attributes (span , invocation )
224+ # TODO: Add workflow metrics when supported
195225 finally :
196226 # Detach context and end span even if finishing fails
197227 otel_context .detach (invocation .context_token )
@@ -222,6 +252,10 @@ def _fail(self, invocation: _T, error: Error) -> _T:
222252 self ._record_embedding_metrics (
223253 invocation , span , error_type = error_type
224254 )
255+ elif isinstance (invocation , WorkflowInvocation ):
256+ _apply_workflow_finish_attributes (span , invocation )
257+ _apply_error_attributes (span , error , error_type )
258+ # TODO: Add workflow metrics when supported
225259 finally :
226260 # Detach context and end span even if finishing fails
227261 otel_context .detach (invocation .context_token )
@@ -304,6 +338,46 @@ def embedding(
304338 raise
305339 self .stop (invocation )
306340
341+ @contextmanager
342+ def workflow (
343+ self , invocation : WorkflowInvocation | None = None
344+ ) -> Iterator [WorkflowInvocation ]:
345+ """Context manager for Workflow invocations.
346+
347+ Only set data attributes on the invocation object, do not modify the span or context.
348+
349+ Starts the span on entry. On normal exit, finalizes the invocation and ends the span.
350+ If an exception occurs inside the context, marks the span as error, ends it, and
351+ re-raises the original exception.
352+ """
353+ if invocation is None :
354+ invocation = WorkflowInvocation ()
355+
356+ try :
357+ self .start (invocation )
358+ except Exception : # pylint: disable=broad-except
359+ _logger .warning (
360+ "Failed to start workflow telemetry" , exc_info = True
361+ )
362+
363+ try :
364+ yield invocation
365+ except Exception as exc :
366+ try :
367+ self .fail (invocation , Error (message = str (exc ), type = type (exc )))
368+ except Exception : # pylint: disable=broad-except
369+ _logger .warning (
370+ "Failed to record workflow failure" , exc_info = True
371+ )
372+ _safe_detach (invocation )
373+ raise
374+
375+ try :
376+ self .stop (invocation )
377+ except Exception : # pylint: disable=broad-except
378+ _logger .warning ("Failed to stop workflow telemetry" , exc_info = True )
379+ _safe_detach (invocation )
380+
307381
308382def get_telemetry_handler (
309383 tracer_provider : TracerProvider | None = None ,
0 commit comments