|
| 1 | +# Copyright The OpenTelemetry Authors |
| 2 | +# |
| 3 | +# Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 | +# you may not use this file except in compliance with the License. |
| 5 | +# You may obtain a copy of the License at |
| 6 | +# |
| 7 | +# http://www.apache.org/licenses/LICENSE-2.0 |
| 8 | +# |
| 9 | +# Unless required by applicable law or agreed to in writing, software |
| 10 | +# distributed under the License is distributed on an "AS IS" BASIS, |
| 11 | +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 | +# See the License for the specific language governing permissions and |
| 13 | +# limitations under the License. |
| 14 | + |
| 15 | +from __future__ import annotations |
| 16 | + |
| 17 | +import timeit |
| 18 | +from abc import ABC, abstractmethod |
| 19 | +from contextvars import Token |
| 20 | +from dataclasses import dataclass |
| 21 | +from typing import TYPE_CHECKING, Any, Type |
| 22 | + |
| 23 | +from typing_extensions import TypeAlias |
| 24 | + |
| 25 | +from opentelemetry._logs import Logger |
| 26 | +from opentelemetry.context import Context, attach, detach |
| 27 | +from opentelemetry.semconv.attributes import error_attributes |
| 28 | +from opentelemetry.trace import INVALID_SPAN as _INVALID_SPAN |
| 29 | +from opentelemetry.trace import Span, SpanKind, Tracer, set_span_in_context |
| 30 | +from opentelemetry.trace.status import Status, StatusCode |
| 31 | + |
| 32 | +if TYPE_CHECKING: |
| 33 | + from opentelemetry.util.genai.metrics import InvocationMetricsRecorder |
| 34 | + |
| 35 | +ContextToken: TypeAlias = Token[Context] |
| 36 | + |
| 37 | + |
| 38 | +@dataclass |
| 39 | +class Error: |
| 40 | + message: str |
| 41 | + type: Type[BaseException] |
| 42 | + |
| 43 | + |
| 44 | +class GenAIInvocation(ABC): |
| 45 | + """ |
| 46 | + Base class for all GenAI invocation types. Manages the lifecycle of a single |
| 47 | + GenAI operation (LLM call, embedding, tool execution, workflow, etc.). |
| 48 | +
|
| 49 | + Use the factory methods on TelemetryHandler (start_inference, start_embedding, |
| 50 | + start_workflow, start_tool) rather than constructing invocations directly. |
| 51 | + """ |
| 52 | + |
| 53 | + def __init__( |
| 54 | + self, |
| 55 | + # Individual components instead of TelemetryHandler to avoid a circular |
| 56 | + # import between handler.py and the invocation modules. |
| 57 | + tracer: Tracer, |
| 58 | + metrics_recorder: InvocationMetricsRecorder, |
| 59 | + logger: Logger, |
| 60 | + operation_name: str, |
| 61 | + span_name: str, |
| 62 | + span_kind: SpanKind = SpanKind.CLIENT, |
| 63 | + attributes: dict[str, Any] | None = None, |
| 64 | + metric_attributes: dict[str, Any] | None = None, |
| 65 | + ) -> None: |
| 66 | + self._tracer = tracer |
| 67 | + self._metrics_recorder = metrics_recorder |
| 68 | + self._logger = logger |
| 69 | + self._operation_name: str = operation_name |
| 70 | + self.attributes: dict[str, Any] = ( |
| 71 | + {} if attributes is None else attributes |
| 72 | + ) |
| 73 | + """Additional attributes to set on spans and/or events. Not set on metrics.""" |
| 74 | + self.metric_attributes: dict[str, Any] = ( |
| 75 | + {} if metric_attributes is None else metric_attributes |
| 76 | + ) |
| 77 | + """Additional attributes to set on metrics. Must be low cardinality. Not set on spans or events.""" |
| 78 | + self.span: Span = _INVALID_SPAN |
| 79 | + self._span_context: Context |
| 80 | + self._span_name: str = span_name |
| 81 | + self._span_kind: SpanKind = span_kind |
| 82 | + self._context_token: ContextToken | None = None |
| 83 | + self._monotonic_start_s: float | None = None |
| 84 | + |
| 85 | + def _start(self) -> None: |
| 86 | + """Start the invocation span and attach it to the current context.""" |
| 87 | + self.span = self._tracer.start_span( |
| 88 | + name=self._span_name, |
| 89 | + kind=self._span_kind, |
| 90 | + ) |
| 91 | + self._span_context = set_span_in_context(self.span) |
| 92 | + self._monotonic_start_s = timeit.default_timer() |
| 93 | + self._context_token = attach(self._span_context) |
| 94 | + |
| 95 | + def _get_metric_attributes(self) -> dict[str, Any]: |
| 96 | + """Return low-cardinality attributes for metric recording.""" |
| 97 | + return self.metric_attributes |
| 98 | + |
| 99 | + def _get_metric_token_counts(self) -> dict[str, int]: # pylint: disable=no-self-use |
| 100 | + """Return {token_type: count} for token histogram recording.""" |
| 101 | + return {} |
| 102 | + |
| 103 | + def _apply_error_attributes(self, error: Error) -> None: |
| 104 | + """Apply error status and error.type attribute to the span, events, and metrics.""" |
| 105 | + error_type = error.type.__qualname__ |
| 106 | + self.span.set_status(Status(StatusCode.ERROR, error.message)) |
| 107 | + self.attributes[error_attributes.ERROR_TYPE] = error_type |
| 108 | + self.metric_attributes[error_attributes.ERROR_TYPE] = error_type |
| 109 | + |
| 110 | + @abstractmethod |
| 111 | + def _apply_finish(self, error: Error | None = None) -> None: |
| 112 | + """Apply finish telemetry (attributes, metrics, events).""" |
| 113 | + |
| 114 | + def _finish(self, error: Error | None = None) -> None: |
| 115 | + """Apply finish telemetry and end the span.""" |
| 116 | + if self._context_token is None: |
| 117 | + return |
| 118 | + try: |
| 119 | + self._apply_finish(error) |
| 120 | + finally: |
| 121 | + try: |
| 122 | + detach(self._context_token) |
| 123 | + except Exception: # pylint: disable=broad-except |
| 124 | + pass |
| 125 | + self.span.end() |
| 126 | + |
| 127 | + def stop(self) -> None: |
| 128 | + """Finalize the invocation successfully and end its span.""" |
| 129 | + self._finish() |
| 130 | + |
| 131 | + def fail(self, error: Error | BaseException) -> None: |
| 132 | + """Fail the invocation and end its span with error status.""" |
| 133 | + if isinstance(error, BaseException): |
| 134 | + error = Error(type=type(error), message=str(error)) |
| 135 | + self._finish(error) |
0 commit comments