-
Notifications
You must be signed in to change notification settings - Fork 936
fix(groq): align instrumentation with OTel GenAI semconv v1.40.0 #4010
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 1 commit
fff1b8d
f2d08fa
e0a111c
f3431c2
e2ca494
84e336d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -35,9 +35,7 @@ | |
| ) | ||
| from opentelemetry.semconv_ai import ( | ||
| SUPPRESS_LANGUAGE_MODEL_INSTRUMENTATION_KEY, | ||
| LLMRequestTypeValues, | ||
| Meters, | ||
| SpanAttributes, | ||
| ) | ||
| from opentelemetry.trace import SpanKind, Tracer, get_tracer | ||
| from opentelemetry.trace.status import Status, StatusCode | ||
|
|
@@ -49,21 +47,21 @@ | |
|
|
||
| _instruments = ("groq >= 0.9.0",) | ||
|
|
||
| _GROQ = GenAIAttributes.GenAiProviderNameValues.GROQ.value | ||
| _CHAT = GenAIAttributes.GenAiOperationNameValues.CHAT.value | ||
|
|
||
| WRAPPED_METHODS = [ | ||
| { | ||
| "package": "groq.resources.chat.completions", | ||
| "object": "Completions", | ||
| "method": "create", | ||
| "span_name": "groq.chat", | ||
| }, | ||
| ] | ||
| WRAPPED_AMETHODS = [ | ||
| { | ||
| "package": "groq.resources.chat.completions", | ||
| "object": "AsyncCompletions", | ||
| "method": "create", | ||
| "span_name": "groq.chat", | ||
| }, | ||
| ] | ||
|
|
||
|
|
@@ -125,53 +123,93 @@ def _create_metrics(meter: Meter): | |
|
|
||
|
|
||
| def _process_streaming_chunk(chunk): | ||
| """Extract content, finish_reason and usage from a streaming chunk.""" | ||
| """Extract content, tool_calls_delta, finish_reasons and usage from a streaming chunk.""" | ||
| if not chunk.choices: | ||
| return None, None, None | ||
|
|
||
| delta = chunk.choices[0].delta | ||
| content = delta.content if hasattr(delta, "content") else None | ||
| finish_reason = chunk.choices[0].finish_reason | ||
| return None, [], [], None | ||
|
|
||
| content = "" | ||
| tool_calls_delta = [] | ||
| finish_reasons = [] | ||
| for choice in chunk.choices: | ||
| delta = choice.delta | ||
| if hasattr(delta, "content") and delta.content: | ||
| content += delta.content | ||
| if hasattr(delta, "tool_calls") and delta.tool_calls: | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same here
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. fixed |
||
| tool_calls_delta.extend(delta.tool_calls) | ||
| if choice.finish_reason: | ||
| finish_reasons.append(choice.finish_reason) | ||
|
|
||
| # Extract usage from x_groq if present in the final chunk | ||
| usage = None | ||
| if hasattr(chunk, "x_groq") and chunk.x_groq and chunk.x_groq.usage: | ||
| usage = chunk.x_groq.usage | ||
|
|
||
| return content, finish_reason, usage | ||
| return content, tool_calls_delta, finish_reasons, usage | ||
|
|
||
|
|
||
| def _handle_streaming_response( | ||
| span, accumulated_content, finish_reason, usage, event_logger | ||
| ): | ||
| set_model_streaming_response_attributes(span, usage) | ||
| def _accumulate_tool_calls(accumulated: dict, tool_calls_delta: list) -> dict: | ||
| """Merge a list of streaming tool_call delta objects into the accumulator dict. | ||
|
|
||
| The accumulator maps tool call index → {id, function: {name, arguments}}. | ||
| Delta objects may be Pydantic models or dicts; arguments arrive as JSON fragments. | ||
| """ | ||
| for tc in tool_calls_delta: | ||
| if isinstance(tc, dict): | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When is the tool calls bit a dict?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. fixed |
||
| idx = tc.get("index", 0) | ||
| tc_id = tc.get("id") or "" | ||
| fn = tc.get("function") or {} | ||
| fn_name = fn.get("name") or "" | ||
| fn_args = fn.get("arguments") or "" | ||
| else: | ||
| idx = getattr(tc, "index", 0) | ||
| tc_id = getattr(tc, "id", None) or "" | ||
| fn = getattr(tc, "function", None) | ||
| fn_name = (getattr(fn, "name", None) or "") if fn else "" | ||
| fn_args = (getattr(fn, "arguments", None) or "") if fn else "" | ||
|
|
||
| if idx not in accumulated: | ||
| accumulated[idx] = {"id": tc_id, "function": {"name": fn_name, "arguments": ""}} | ||
| else: | ||
| if tc_id: | ||
| accumulated[idx]["id"] = tc_id | ||
| if fn_name: | ||
| accumulated[idx]["function"]["name"] = fn_name | ||
| accumulated[idx]["function"]["arguments"] += fn_args | ||
| return accumulated | ||
|
|
||
|
|
||
| def _handle_streaming_response(span, accumulated_content, tool_calls, finish_reasons, usage, event_logger): | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You should state the type of each param in this function
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. fixed |
||
| # finish_reasons is a list; use first entry for message-level finish_reason | ||
| finish_reason = finish_reasons[0] if finish_reasons else None | ||
| set_model_streaming_response_attributes(span, usage, finish_reasons) | ||
| if should_emit_events() and event_logger: | ||
| emit_streaming_response_events(accumulated_content, finish_reason, event_logger) | ||
| emit_streaming_response_events(accumulated_content, finish_reason, event_logger, tool_calls=tool_calls) | ||
| else: | ||
| set_streaming_response_attributes( | ||
| span, accumulated_content, finish_reason, usage | ||
| ) | ||
| set_streaming_response_attributes(span, accumulated_content, finish_reason, usage, tool_calls=tool_calls) | ||
|
|
||
|
|
||
| def _create_stream_processor(response, span, event_logger): | ||
| """Create a generator that processes a stream while collecting telemetry.""" | ||
| accumulated_content = "" | ||
| finish_reason = None | ||
| accumulated_tool_calls: dict = {} | ||
| accumulated_finish_reasons: list = [] | ||
| usage = None | ||
|
|
||
| for chunk in response: | ||
| content, chunk_finish_reason, chunk_usage = _process_streaming_chunk(chunk) | ||
| content, tool_calls_delta, chunk_finish_reasons, chunk_usage = _process_streaming_chunk(chunk) | ||
| if content: | ||
| accumulated_content += content | ||
| if chunk_finish_reason: | ||
| finish_reason = chunk_finish_reason | ||
| if tool_calls_delta: | ||
| _accumulate_tool_calls(accumulated_tool_calls, tool_calls_delta) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. You do not use the return value here.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed: _accumulate_tool_calls now returns None — modifies the dict in-place, no return value needed. |
||
| for fr in chunk_finish_reasons: | ||
| if fr not in accumulated_finish_reasons: | ||
| accumulated_finish_reasons.append(fr) | ||
|
coderabbitai[bot] marked this conversation as resolved.
Outdated
|
||
| if chunk_usage: | ||
| usage = chunk_usage | ||
| yield chunk | ||
|
|
||
| _handle_streaming_response( | ||
| span, accumulated_content, finish_reason, usage, event_logger | ||
| ) | ||
| tool_calls = [accumulated_tool_calls[i] for i in sorted(accumulated_tool_calls)] or None | ||
| _handle_streaming_response(span, accumulated_content, tool_calls, accumulated_finish_reasons, usage, event_logger) | ||
|
|
||
| if span.is_recording(): | ||
| span.set_status(Status(StatusCode.OK)) | ||
|
|
@@ -182,22 +220,25 @@ def _create_stream_processor(response, span, event_logger): | |
| async def _create_async_stream_processor(response, span, event_logger): | ||
| """Create an async generator that processes a stream while collecting telemetry.""" | ||
| accumulated_content = "" | ||
| finish_reason = None | ||
| accumulated_tool_calls: dict = {} | ||
| accumulated_finish_reasons: list = [] | ||
| usage = None | ||
|
|
||
| async for chunk in response: | ||
| content, chunk_finish_reason, chunk_usage = _process_streaming_chunk(chunk) | ||
| content, tool_calls_delta, chunk_finish_reasons, chunk_usage = _process_streaming_chunk(chunk) | ||
| if content: | ||
| accumulated_content += content | ||
| if chunk_finish_reason: | ||
| finish_reason = chunk_finish_reason | ||
| if tool_calls_delta: | ||
| _accumulate_tool_calls(accumulated_tool_calls, tool_calls_delta) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Same here - you do not use the return value
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed: _accumulate_tool_calls now returns None — modifies the dict in-place, no return value needed. |
||
| for fr in chunk_finish_reasons: | ||
| if fr not in accumulated_finish_reasons: | ||
| accumulated_finish_reasons.append(fr) | ||
| if chunk_usage: | ||
| usage = chunk_usage | ||
| yield chunk | ||
|
|
||
| _handle_streaming_response( | ||
| span, accumulated_content, finish_reason, usage, event_logger | ||
| ) | ||
| tool_calls = [accumulated_tool_calls[i] for i in sorted(accumulated_tool_calls)] or None | ||
| _handle_streaming_response(span, accumulated_content, tool_calls, accumulated_finish_reasons, usage, event_logger) | ||
|
|
||
| if span.is_recording(): | ||
| span.set_status(Status(StatusCode.OK)) | ||
|
|
@@ -240,13 +281,14 @@ def _wrap( | |
| ): | ||
| return wrapped(*args, **kwargs) | ||
|
|
||
| name = to_wrap.get("span_name") | ||
| llm_model = kwargs.get("model", "") | ||
| span = tracer.start_span( | ||
| name, | ||
| f"{_CHAT} {llm_model}", | ||
| kind=SpanKind.CLIENT, | ||
| attributes={ | ||
| GenAIAttributes.GEN_AI_SYSTEM: "groq", | ||
| SpanAttributes.LLM_REQUEST_TYPE: LLMRequestTypeValues.COMPLETION.value, | ||
| GenAIAttributes.GEN_AI_PROVIDER_NAME: _GROQ, | ||
| GenAIAttributes.GEN_AI_OPERATION_NAME: _CHAT, | ||
| GenAIAttributes.GEN_AI_REQUEST_MODEL: llm_model, | ||
| }, | ||
| ) | ||
|
|
||
|
|
@@ -263,6 +305,9 @@ def _wrap( | |
| duration = end_time - start_time | ||
| duration_histogram.record(duration, attributes=attributes) | ||
|
|
||
| if span.is_recording(): | ||
| span.set_status(Status(StatusCode.ERROR)) | ||
| span.end() | ||
| raise e | ||
|
|
||
| end_time = time.time() | ||
|
|
@@ -322,13 +367,14 @@ async def _awrap( | |
| ): | ||
| return await wrapped(*args, **kwargs) | ||
|
|
||
| name = to_wrap.get("span_name") | ||
| llm_model = kwargs.get("model", "") | ||
| span = tracer.start_span( | ||
| name, | ||
| f"{_CHAT} {llm_model}", | ||
| kind=SpanKind.CLIENT, | ||
| attributes={ | ||
| GenAIAttributes.GEN_AI_SYSTEM: "groq", | ||
| SpanAttributes.LLM_REQUEST_TYPE: LLMRequestTypeValues.COMPLETION.value, | ||
| GenAIAttributes.GEN_AI_PROVIDER_NAME: _GROQ, | ||
| GenAIAttributes.GEN_AI_OPERATION_NAME: _CHAT, | ||
| GenAIAttributes.GEN_AI_REQUEST_MODEL: llm_model, | ||
| }, | ||
| ) | ||
|
|
||
|
|
@@ -346,13 +392,16 @@ async def _awrap( | |
| duration = end_time - start_time | ||
| duration_histogram.record(duration, attributes=attributes) | ||
|
|
||
| if span.is_recording(): | ||
| span.set_status(Status(StatusCode.ERROR)) | ||
| span.end() | ||
| raise e | ||
|
|
||
| end_time = time.time() | ||
|
|
||
| if is_streaming_response(response): | ||
| try: | ||
| return await _create_async_stream_processor(response, span, event_logger) | ||
| return _create_async_stream_processor(response, span, event_logger) | ||
| except Exception as ex: | ||
| logger.warning( | ||
| "Failed to process streaming response for groq span, error: %s", | ||
|
|
@@ -362,16 +411,23 @@ async def _awrap( | |
| span.end() | ||
| raise | ||
| elif response: | ||
| metric_attributes = shared_metrics_attributes(response) | ||
| try: | ||
| metric_attributes = shared_metrics_attributes(response) | ||
|
|
||
| if duration_histogram: | ||
| duration = time.time() - start_time | ||
| duration_histogram.record( | ||
| duration, | ||
| attributes=metric_attributes, | ||
| ) | ||
| if duration_histogram: | ||
| duration = time.time() - start_time | ||
| duration_histogram.record( | ||
| duration, | ||
| attributes=metric_attributes, | ||
| ) | ||
|
|
||
| _handle_response(span, response, token_histogram, event_logger) | ||
|
|
||
| _handle_response(span, response, token_histogram, event_logger) | ||
| except Exception as ex: # pylint: disable=broad-except | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Why do you need the pylint comment here?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. not needed, removed |
||
| logger.warning( | ||
| "Failed to set response attributes for groq span, error: %s", | ||
| str(ex), | ||
| ) | ||
|
|
||
| if span.is_recording(): | ||
| span.set_status(Status(StatusCode.OK)) | ||
|
|
@@ -424,9 +480,7 @@ def _instrument(self, **kwargs): | |
| event_logger = None | ||
| if not Config.use_legacy_attributes: | ||
| logger_provider = kwargs.get("logger_provider") | ||
| event_logger = get_logger( | ||
| __name__, __version__, logger_provider=logger_provider | ||
| ) | ||
| event_logger = get_logger(__name__, __version__, logger_provider=logger_provider) | ||
|
|
||
| for wrapped_method in WRAPPED_METHODS: | ||
| wrap_package = wrapped_method.get("package") | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Isn't this hasattr(delta, "content") the same as delta.content?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed