Skip to content

Commit fff1b8d

Browse files
committed
fix(groq): align instrumentation with OTel GenAI semconv v1.40.0
1 parent dcd5cb0 commit fff1b8d

File tree

13 files changed: +1,963 additions, −389 deletions

packages/opentelemetry-instrumentation-groq/opentelemetry/instrumentation/groq/__init__.py

Lines changed: 107 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,7 @@
3535
)
3636
from opentelemetry.semconv_ai import (
3737
SUPPRESS_LANGUAGE_MODEL_INSTRUMENTATION_KEY,
38-
LLMRequestTypeValues,
3938
Meters,
40-
SpanAttributes,
4139
)
4240
from opentelemetry.trace import SpanKind, Tracer, get_tracer
4341
from opentelemetry.trace.status import Status, StatusCode
@@ -49,21 +47,21 @@
4947

5048
_instruments = ("groq >= 0.9.0",)
5149

50+
_GROQ = GenAIAttributes.GenAiProviderNameValues.GROQ.value
51+
_CHAT = GenAIAttributes.GenAiOperationNameValues.CHAT.value
5252

5353
WRAPPED_METHODS = [
5454
{
5555
"package": "groq.resources.chat.completions",
5656
"object": "Completions",
5757
"method": "create",
58-
"span_name": "groq.chat",
5958
},
6059
]
6160
WRAPPED_AMETHODS = [
6261
{
6362
"package": "groq.resources.chat.completions",
6463
"object": "AsyncCompletions",
6564
"method": "create",
66-
"span_name": "groq.chat",
6765
},
6866
]
6967

@@ -125,53 +123,93 @@ def _create_metrics(meter: Meter):
125123

126124

127125
def _process_streaming_chunk(chunk):
128-
"""Extract content, finish_reason and usage from a streaming chunk."""
126+
"""Extract content, tool_calls_delta, finish_reasons and usage from a streaming chunk."""
129127
if not chunk.choices:
130-
return None, None, None
131-
132-
delta = chunk.choices[0].delta
133-
content = delta.content if hasattr(delta, "content") else None
134-
finish_reason = chunk.choices[0].finish_reason
128+
return None, [], [], None
129+
130+
content = ""
131+
tool_calls_delta = []
132+
finish_reasons = []
133+
for choice in chunk.choices:
134+
delta = choice.delta
135+
if hasattr(delta, "content") and delta.content:
136+
content += delta.content
137+
if hasattr(delta, "tool_calls") and delta.tool_calls:
138+
tool_calls_delta.extend(delta.tool_calls)
139+
if choice.finish_reason:
140+
finish_reasons.append(choice.finish_reason)
135141

136142
# Extract usage from x_groq if present in the final chunk
137143
usage = None
138144
if hasattr(chunk, "x_groq") and chunk.x_groq and chunk.x_groq.usage:
139145
usage = chunk.x_groq.usage
140146

141-
return content, finish_reason, usage
147+
return content, tool_calls_delta, finish_reasons, usage
142148

143149

144-
def _handle_streaming_response(
145-
span, accumulated_content, finish_reason, usage, event_logger
146-
):
147-
set_model_streaming_response_attributes(span, usage)
150+
def _accumulate_tool_calls(accumulated: dict, tool_calls_delta: list) -> dict:
151+
"""Merge a list of streaming tool_call delta objects into the accumulator dict.
152+
153+
The accumulator maps tool call index → {id, function: {name, arguments}}.
154+
Delta objects may be Pydantic models or dicts; arguments arrive as JSON fragments.
155+
"""
156+
for tc in tool_calls_delta:
157+
if isinstance(tc, dict):
158+
idx = tc.get("index", 0)
159+
tc_id = tc.get("id") or ""
160+
fn = tc.get("function") or {}
161+
fn_name = fn.get("name") or ""
162+
fn_args = fn.get("arguments") or ""
163+
else:
164+
idx = getattr(tc, "index", 0)
165+
tc_id = getattr(tc, "id", None) or ""
166+
fn = getattr(tc, "function", None)
167+
fn_name = (getattr(fn, "name", None) or "") if fn else ""
168+
fn_args = (getattr(fn, "arguments", None) or "") if fn else ""
169+
170+
if idx not in accumulated:
171+
accumulated[idx] = {"id": tc_id, "function": {"name": fn_name, "arguments": ""}}
172+
else:
173+
if tc_id:
174+
accumulated[idx]["id"] = tc_id
175+
if fn_name:
176+
accumulated[idx]["function"]["name"] = fn_name
177+
accumulated[idx]["function"]["arguments"] += fn_args
178+
return accumulated
179+
180+
181+
def _handle_streaming_response(span, accumulated_content, tool_calls, finish_reasons, usage, event_logger):
182+
# finish_reasons is a list; use first entry for message-level finish_reason
183+
finish_reason = finish_reasons[0] if finish_reasons else None
184+
set_model_streaming_response_attributes(span, usage, finish_reasons)
148185
if should_emit_events() and event_logger:
149-
emit_streaming_response_events(accumulated_content, finish_reason, event_logger)
186+
emit_streaming_response_events(accumulated_content, finish_reason, event_logger, tool_calls=tool_calls)
150187
else:
151-
set_streaming_response_attributes(
152-
span, accumulated_content, finish_reason, usage
153-
)
188+
set_streaming_response_attributes(span, accumulated_content, finish_reason, usage, tool_calls=tool_calls)
154189

155190

156191
def _create_stream_processor(response, span, event_logger):
157192
"""Create a generator that processes a stream while collecting telemetry."""
158193
accumulated_content = ""
159-
finish_reason = None
194+
accumulated_tool_calls: dict = {}
195+
accumulated_finish_reasons: list = []
160196
usage = None
161197

162198
for chunk in response:
163-
content, chunk_finish_reason, chunk_usage = _process_streaming_chunk(chunk)
199+
content, tool_calls_delta, chunk_finish_reasons, chunk_usage = _process_streaming_chunk(chunk)
164200
if content:
165201
accumulated_content += content
166-
if chunk_finish_reason:
167-
finish_reason = chunk_finish_reason
202+
if tool_calls_delta:
203+
_accumulate_tool_calls(accumulated_tool_calls, tool_calls_delta)
204+
for fr in chunk_finish_reasons:
205+
if fr not in accumulated_finish_reasons:
206+
accumulated_finish_reasons.append(fr)
168207
if chunk_usage:
169208
usage = chunk_usage
170209
yield chunk
171210

172-
_handle_streaming_response(
173-
span, accumulated_content, finish_reason, usage, event_logger
174-
)
211+
tool_calls = [accumulated_tool_calls[i] for i in sorted(accumulated_tool_calls)] or None
212+
_handle_streaming_response(span, accumulated_content, tool_calls, accumulated_finish_reasons, usage, event_logger)
175213

176214
if span.is_recording():
177215
span.set_status(Status(StatusCode.OK))
@@ -182,22 +220,25 @@ def _create_stream_processor(response, span, event_logger):
182220
async def _create_async_stream_processor(response, span, event_logger):
183221
"""Create an async generator that processes a stream while collecting telemetry."""
184222
accumulated_content = ""
185-
finish_reason = None
223+
accumulated_tool_calls: dict = {}
224+
accumulated_finish_reasons: list = []
186225
usage = None
187226

188227
async for chunk in response:
189-
content, chunk_finish_reason, chunk_usage = _process_streaming_chunk(chunk)
228+
content, tool_calls_delta, chunk_finish_reasons, chunk_usage = _process_streaming_chunk(chunk)
190229
if content:
191230
accumulated_content += content
192-
if chunk_finish_reason:
193-
finish_reason = chunk_finish_reason
231+
if tool_calls_delta:
232+
_accumulate_tool_calls(accumulated_tool_calls, tool_calls_delta)
233+
for fr in chunk_finish_reasons:
234+
if fr not in accumulated_finish_reasons:
235+
accumulated_finish_reasons.append(fr)
194236
if chunk_usage:
195237
usage = chunk_usage
196238
yield chunk
197239

198-
_handle_streaming_response(
199-
span, accumulated_content, finish_reason, usage, event_logger
200-
)
240+
tool_calls = [accumulated_tool_calls[i] for i in sorted(accumulated_tool_calls)] or None
241+
_handle_streaming_response(span, accumulated_content, tool_calls, accumulated_finish_reasons, usage, event_logger)
201242

202243
if span.is_recording():
203244
span.set_status(Status(StatusCode.OK))
@@ -240,13 +281,14 @@ def _wrap(
240281
):
241282
return wrapped(*args, **kwargs)
242283

243-
name = to_wrap.get("span_name")
284+
llm_model = kwargs.get("model", "")
244285
span = tracer.start_span(
245-
name,
286+
f"{_CHAT} {llm_model}",
246287
kind=SpanKind.CLIENT,
247288
attributes={
248-
GenAIAttributes.GEN_AI_SYSTEM: "groq",
249-
SpanAttributes.LLM_REQUEST_TYPE: LLMRequestTypeValues.COMPLETION.value,
289+
GenAIAttributes.GEN_AI_PROVIDER_NAME: _GROQ,
290+
GenAIAttributes.GEN_AI_OPERATION_NAME: _CHAT,
291+
GenAIAttributes.GEN_AI_REQUEST_MODEL: llm_model,
250292
},
251293
)
252294

@@ -263,6 +305,9 @@ def _wrap(
263305
duration = end_time - start_time
264306
duration_histogram.record(duration, attributes=attributes)
265307

308+
if span.is_recording():
309+
span.set_status(Status(StatusCode.ERROR))
310+
span.end()
266311
raise e
267312

268313
end_time = time.time()
@@ -322,13 +367,14 @@ async def _awrap(
322367
):
323368
return await wrapped(*args, **kwargs)
324369

325-
name = to_wrap.get("span_name")
370+
llm_model = kwargs.get("model", "")
326371
span = tracer.start_span(
327-
name,
372+
f"{_CHAT} {llm_model}",
328373
kind=SpanKind.CLIENT,
329374
attributes={
330-
GenAIAttributes.GEN_AI_SYSTEM: "groq",
331-
SpanAttributes.LLM_REQUEST_TYPE: LLMRequestTypeValues.COMPLETION.value,
375+
GenAIAttributes.GEN_AI_PROVIDER_NAME: _GROQ,
376+
GenAIAttributes.GEN_AI_OPERATION_NAME: _CHAT,
377+
GenAIAttributes.GEN_AI_REQUEST_MODEL: llm_model,
332378
},
333379
)
334380

@@ -346,13 +392,16 @@ async def _awrap(
346392
duration = end_time - start_time
347393
duration_histogram.record(duration, attributes=attributes)
348394

395+
if span.is_recording():
396+
span.set_status(Status(StatusCode.ERROR))
397+
span.end()
349398
raise e
350399

351400
end_time = time.time()
352401

353402
if is_streaming_response(response):
354403
try:
355-
return await _create_async_stream_processor(response, span, event_logger)
404+
return _create_async_stream_processor(response, span, event_logger)
356405
except Exception as ex:
357406
logger.warning(
358407
"Failed to process streaming response for groq span, error: %s",
@@ -362,16 +411,23 @@ async def _awrap(
362411
span.end()
363412
raise
364413
elif response:
365-
metric_attributes = shared_metrics_attributes(response)
414+
try:
415+
metric_attributes = shared_metrics_attributes(response)
366416

367-
if duration_histogram:
368-
duration = time.time() - start_time
369-
duration_histogram.record(
370-
duration,
371-
attributes=metric_attributes,
372-
)
417+
if duration_histogram:
418+
duration = time.time() - start_time
419+
duration_histogram.record(
420+
duration,
421+
attributes=metric_attributes,
422+
)
423+
424+
_handle_response(span, response, token_histogram, event_logger)
373425

374-
_handle_response(span, response, token_histogram, event_logger)
426+
except Exception as ex: # pylint: disable=broad-except
427+
logger.warning(
428+
"Failed to set response attributes for groq span, error: %s",
429+
str(ex),
430+
)
375431

376432
if span.is_recording():
377433
span.set_status(Status(StatusCode.OK))
@@ -424,9 +480,7 @@ def _instrument(self, **kwargs):
424480
event_logger = None
425481
if not Config.use_legacy_attributes:
426482
logger_provider = kwargs.get("logger_provider")
427-
event_logger = get_logger(
428-
__name__, __version__, logger_provider=logger_provider
429-
)
483+
event_logger = get_logger(__name__, __version__, logger_provider=logger_provider)
430484

431485
for wrapped_method in WRAPPED_METHODS:
432486
wrap_package = wrapped_method.get("package")

packages/opentelemetry-instrumentation-groq/opentelemetry/instrumentation/groq/event_emitter.py

Lines changed: 17 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
from opentelemetry._logs import Logger, LogRecord
66
from opentelemetry.instrumentation.groq.event_models import ChoiceEvent, MessageEvent
7+
from opentelemetry.instrumentation.groq.span_utils import _map_groq_finish_reason
78
from opentelemetry.instrumentation.groq.utils import (
89
dont_throw,
910
should_emit_events,
@@ -26,10 +27,7 @@ class Roles(Enum):
2627
VALID_MESSAGE_ROLES = {role.value for role in Roles}
2728
"""The valid roles for naming the message event."""
2829

29-
EVENT_ATTRIBUTES = {
30-
# Should be GenAIAttributes.GenAiSystemValues.GROQ.value but it's not defined in the opentelemetry-semconv package
31-
GenAIAttributes.GEN_AI_SYSTEM: "groq"
32-
}
30+
EVENT_ATTRIBUTES = {GenAIAttributes.GEN_AI_PROVIDER_NAME: GenAIAttributes.GenAiProviderNameValues.GROQ.value}
3331
"""The attributes to be used for the event."""
3432

3533

@@ -38,7 +36,9 @@ def emit_message_events(kwargs: dict, event_logger):
3836
for message in kwargs.get("messages", []):
3937
emit_event(
4038
MessageEvent(
41-
content=message.get("content"), role=message.get("role", "unknown")
39+
content=message.get("content"),
40+
role=message.get("role", "unknown"),
41+
tool_calls=message.get("tool_calls"),
4242
),
4343
event_logger=event_logger,
4444
)
@@ -54,30 +54,33 @@ def emit_choice_events(response: ChatCompletion, event_logger):
5454
"content": choice.message.content,
5555
"role": choice.message.role or "unknown",
5656
},
57-
finish_reason=choice.finish_reason,
57+
finish_reason=_map_groq_finish_reason(choice.finish_reason) or "",
58+
tool_calls=choice.message.tool_calls or None,
5859
),
5960
event_logger=event_logger,
6061
)
6162

6263

6364
@dont_throw
6465
def emit_streaming_response_events(
65-
accumulated_content: str, finish_reason: Union[str, None], event_logger
66+
accumulated_content: str,
67+
finish_reason: Union[str, None],
68+
event_logger,
69+
tool_calls=None,
6670
):
6771
"""Emit events for streaming response."""
6872
emit_event(
6973
ChoiceEvent(
7074
index=0,
7175
message={"content": accumulated_content, "role": "assistant"},
72-
finish_reason=finish_reason or "unknown",
76+
finish_reason=_map_groq_finish_reason(finish_reason) or "",
77+
tool_calls=tool_calls,
7378
),
7479
event_logger,
7580
)
7681

7782

78-
def emit_event(
79-
event: Union[MessageEvent, ChoiceEvent], event_logger: Union[Logger, None]
80-
) -> None:
83+
def emit_event(event: Union[MessageEvent, ChoiceEvent], event_logger: Union[Logger, None]) -> None:
8184
"""
8285
Emit an event to the OpenTelemetry SDK.
8386
@@ -119,11 +122,7 @@ def _emit_message_event(event: MessageEvent, event_logger: Logger) -> None:
119122
for tool_call in body["tool_calls"]:
120123
tool_call["function"].pop("arguments", None)
121124

122-
log_record = LogRecord(
123-
body=body,
124-
attributes=EVENT_ATTRIBUTES,
125-
event_name=name
126-
)
125+
log_record = LogRecord(body=body, attributes=EVENT_ATTRIBUTES, event_name=name)
127126
event_logger.emit(log_record)
128127

129128

@@ -139,14 +138,10 @@ def _emit_choice_event(event: ChoiceEvent, event_logger: Logger) -> None:
139138

140139
if not should_send_prompts():
141140
body["message"].pop("content", None)
141+
body["message"].pop("role", None)
142142
if body.get("tool_calls") is not None:
143143
for tool_call in body["tool_calls"]:
144144
tool_call["function"].pop("arguments", None)
145145

146-
log_record = LogRecord(
147-
body=body,
148-
attributes=EVENT_ATTRIBUTES,
149-
event_name="gen_ai.choice"
150-
151-
)
146+
log_record = LogRecord(body=body, attributes=EVENT_ATTRIBUTES, event_name="gen_ai.choice")
152147
event_logger.emit(log_record)

packages/opentelemetry-instrumentation-groq/opentelemetry/instrumentation/groq/event_models.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,5 +37,5 @@ class ChoiceEvent:
3737

3838
index: int
3939
message: CompletionMessage
40-
finish_reason: str = "unknown"
40+
finish_reason: str = ""
4141
tool_calls: Optional[List[ToolCall]] = None

Comments (0)