|
| 1 | +"""Utilities for extracting structured data from LlamaIndex raw responses.""" |
| 2 | + |
| 3 | +from dataclasses import dataclass |
| 4 | +from typing import Any, List, Optional |
| 5 | + |
| 6 | +from ._message_utils import map_finish_reason |
| 7 | + |
# Map LlamaIndex LLM class names to OTel well-known provider values.
# Keys are the exact class names of LlamaIndex LLM wrappers; values follow the
# OTel GenAI semantic-convention `gen_ai.provider.name` well-known values.
# Class names absent from this map fall back to `class_name.lower()` in
# detect_provider_name().
_PROVIDER_MAP = {
    "OpenAI": "openai",
    "AzureOpenAI": "azure.ai.openai",
    "Anthropic": "anthropic",
    "Cohere": "cohere",
    "Groq": "groq",
    "MistralAI": "mistral_ai",
    "Bedrock": "aws.bedrock",
    "Gemini": "gcp.gemini",
    "VertexAI": "gcp.vertex_ai",
    "DeepSeek": "deepseek",
    "Perplexity": "perplexity",
}
| 22 | + |
| 23 | + |
@dataclass
class TokenUsage:
    """Token counts extracted from an LLM response; None means 'not reported'."""

    # Tokens consumed by the prompt/input.
    input_tokens: Optional[int] = None
    # Tokens produced in the completion/output.
    output_tokens: Optional[int] = None
    # Reported (or computed) input + output total.
    total_tokens: Optional[int] = None
| 29 | + |
| 30 | + |
def detect_provider_name(instance_or_class_name: Any) -> Optional[str]:
    """Detect OTel provider name from a LlamaIndex LLM instance or class name string.

    Returns OTel well-known value if available, otherwise lowercase class name.
    Returns None if input is None.
    """
    if instance_or_class_name is None:
        return None
    # Accept either the class name directly or any object to introspect.
    if isinstance(instance_or_class_name, str):
        class_name = instance_or_class_name
    else:
        class_name = type(instance_or_class_name).__name__
    return _PROVIDER_MAP.get(class_name, class_name.lower())
| 45 | + |
| 46 | + |
def extract_model_from_raw(raw: Any) -> Optional[str]:
    """Extract model name from raw LLM response (object or dict)."""
    # Attribute access wins over dict lookup, matching SDK response objects.
    if hasattr(raw, "model"):
        return raw.model
    return raw.get("model") if isinstance(raw, dict) else None
| 54 | + |
| 55 | + |
def extract_response_id(raw: Any) -> Optional[str]:
    """Extract response ID from raw LLM response (object or dict)."""
    # Attribute access wins over dict lookup, matching SDK response objects.
    if hasattr(raw, "id"):
        return raw.id
    return raw.get("id") if isinstance(raw, dict) else None
| 63 | + |
| 64 | + |
def extract_token_usage(raw: Any) -> TokenUsage:
    """Extract token usage from raw response. Handles OpenAI, Cohere, and dict formats."""
    # Try the OpenAI-style `usage` payload first; only accept it when it
    # actually carried an input-token count, otherwise keep looking.
    usage_payload = _get_nested(raw, "usage")
    if usage_payload:
        openai_usage = _extract_openai_usage(usage_payload)
        if openai_usage.input_tokens is not None:
            return openai_usage

    # Fall back to the Cohere-style `meta` payload.
    meta_payload = _get_nested(raw, "meta")
    return _extract_cohere_usage(meta_payload) if meta_payload else TokenUsage()
| 78 | + |
| 79 | + |
| 80 | +def _get_nested(obj: Any, key: str) -> Any: |
| 81 | + """Get a nested attribute or dict key from obj.""" |
| 82 | + val = getattr(obj, key, None) |
| 83 | + if val is not None: |
| 84 | + return val |
| 85 | + if isinstance(obj, dict): |
| 86 | + return obj.get(key) |
| 87 | + return None |
| 88 | + |
| 89 | + |
def _extract_openai_usage(usage: Any) -> TokenUsage:
    """Extract tokens from OpenAI-style usage object/dict."""
    # Object form (attribute access) takes precedence over dict form.
    if hasattr(usage, "completion_tokens"):
        def read(field):
            return getattr(usage, field)
    elif isinstance(usage, dict):
        read = usage.get
    else:
        return TokenUsage()
    return TokenUsage(
        input_tokens=read("prompt_tokens"),
        output_tokens=read("completion_tokens"),
        total_tokens=read("total_tokens"),
    )
| 105 | + |
| 106 | + |
def _extract_cohere_usage(meta: Any) -> TokenUsage:
    """Extract tokens from Cohere-style meta.tokens or meta.billed_units."""
    # `tokens` is preferred; `billed_units` is the fallback container.
    for container_key in ("tokens", "billed_units"):
        container = _get_nested(meta, container_key)
        if not container:
            continue
        input_count = _get_int(container, "input_tokens")
        output_count = _get_int(container, "output_tokens")
        # Only an input-token count makes the container authoritative.
        if input_count is not None:
            return TokenUsage(
                input_tokens=input_count,
                output_tokens=output_count,
                total_tokens=_safe_sum(input_count, output_count),
            )
    return TokenUsage()
| 124 | + |
| 125 | + |
| 126 | +def _get_int(obj: Any, key: str) -> Optional[int]: |
| 127 | + """Get an integer attribute or dict key from obj.""" |
| 128 | + val = getattr(obj, key, None) |
| 129 | + if val is None and isinstance(obj, dict): |
| 130 | + val = obj.get(key) |
| 131 | + return int(val) if val is not None else None |
| 132 | + |
| 133 | + |
| 134 | +def _safe_sum(a: Optional[int], b: Optional[int]) -> Optional[int]: |
| 135 | + if a is not None and b is not None: |
| 136 | + return a + b |
| 137 | + return None |
| 138 | + |
| 139 | + |
def extract_finish_reasons(raw: Any) -> List[str]:
    """Extract and map finish reasons from raw LLM response.

    Handles OpenAI choices[], Google Gemini candidates[], Anthropic stop_reason,
    Cohere finish_reason, and Ollama done_reason.
    Returns empty list if no finish reason found.
    """
    if raw is None:
        return []

    # OpenAI format: choices[].finish_reason
    openai_choices = _get_nested(raw, "choices")
    if isinstance(openai_choices, (list, tuple)) and openai_choices:
        mapped_reasons = _collect_finish_reasons_from_choices(openai_choices)
        if mapped_reasons:
            return mapped_reasons

    # Google Gemini format: candidates[].finish_reason
    gemini_candidates = _get_nested(raw, "candidates")
    if isinstance(gemini_candidates, (list, tuple)) and gemini_candidates:
        mapped_reasons = _collect_finish_reasons_from_candidates(gemini_candidates)
        if mapped_reasons:
            return mapped_reasons

    # Scalar single-reason formats, probed in priority order:
    # Anthropic stop_reason, Cohere/generic finish_reason, Ollama done_reason.
    for scalar_key in ("stop_reason", "finish_reason", "done_reason"):
        reason_value = _get_nested(raw, scalar_key)
        if reason_value and isinstance(reason_value, str):
            mapped = map_finish_reason(reason_value)
            if mapped:
                return [mapped]

    return []
| 186 | + |
| 187 | + |
def _collect_finish_reasons_from_choices(choices: Any) -> List[str]:
    """Collect mapped finish reasons from an OpenAI-style choices array."""
    # OpenAI finish_reason is already a plain string; no enum coercion needed.
    return _collect_mapped_finish_reasons(choices, coerce_enum=False)


def _collect_finish_reasons_from_candidates(candidates: Any) -> List[str]:
    """Collect mapped finish reasons from a Google Gemini-style candidates array."""
    # Gemini finish_reason may be an enum; coerce it to its string name.
    return _collect_mapped_finish_reasons(candidates, coerce_enum=True)


def _collect_mapped_finish_reasons(items: Any, coerce_enum: bool) -> List[str]:
    """Shared walker for choices/candidates arrays.

    For each item, read ``finish_reason`` (attribute first, then dict key),
    optionally coerce a non-string value to its enum ``name`` (or ``str()``),
    and keep only the reasons that map_finish_reason() maps to a value.
    Iteration errors (non-iterable input, broken iterator) yield whatever was
    collected so far — same best-effort behavior as the original duplicates.
    """
    reasons: List[str] = []
    try:
        for item in items:
            fr = getattr(item, "finish_reason", None)
            if fr is None and isinstance(item, dict):
                fr = item.get("finish_reason")
            if coerce_enum and fr is not None and not isinstance(fr, str):
                fr = fr.name if hasattr(fr, "name") else str(fr)
            mapped = map_finish_reason(fr)
            if mapped:
                reasons.append(mapped)
    except (TypeError, StopIteration):
        pass
    return reasons
0 commit comments