diff --git a/.changeset/activity-observers.md b/.changeset/activity-observers.md new file mode 100644 index 000000000..5e68fa6a9 --- /dev/null +++ b/.changeset/activity-observers.md @@ -0,0 +1,7 @@ +--- +'@tanstack/ai': minor +--- + +Add an activity-agnostic observability hook for non-chat activities (#720). The media activities — `generateImage`, `generateVideo`, `generateAudio`, `generateSpeech`, and `generateTranscription` — now accept an `observers` option taking lightweight `ActivityObserver`s (`onStart` / `onFinish` / `onError`, payload discriminated by `activity`). Observers are awaited in order and strictly non-fatal — a throwing observer is logged and skipped, never breaking the activity. + +Ships `otelObserver()` on the new `@tanstack/ai/observability` subpath: it emits one `gen_ai.*` span per activity call, tagged with the correct `gen_ai.operation.name` (`image_generation`, `video_generation`, `audio_generation`, `text_to_speech`, `transcription`), and reuses the same `gen_ai.usage.*` attribute set as `otelMiddleware` — now including `tanstack.ai.usage.units_billed` for unit-billed media. With a `Meter` it also records the `gen_ai.client.operation.duration` histogram per activity. The `ActivityObserver` types are exported from the package root, while the `otelObserver` value lives on the subpath so importing `@tanstack/ai` never requires the optional `@opentelemetry/api` peer. diff --git a/.changeset/otel-full-usage-emission.md b/.changeset/otel-full-usage-emission.md new file mode 100644 index 000000000..bda815e31 --- /dev/null +++ b/.changeset/otel-full-usage-emission.md @@ -0,0 +1,5 @@ +--- +'@tanstack/ai': minor +--- + +`otelMiddleware` now emits the rest of the reported `TokenUsage` on spans instead of only input/output tokens (#721). 
When the provider reports them, spans carry `gen_ai.usage.total_tokens`, `gen_ai.usage.cost` (provider-reported cost — cache discounts and gateway markup included, so backends like PostHog no longer re-derive cost from price tables), the official semconv cache/reasoning breakdowns (`gen_ai.usage.cache_read.input_tokens`, `gen_ai.usage.cache_creation.input_tokens`, `gen_ai.usage.reasoning.output_tokens`), and TanStack-namespaced attributes for duration-based billing (`tanstack.ai.usage.duration_seconds`) and the upstream cost split (`tanstack.ai.usage.upstream_cost` / `upstream_input_cost` / `upstream_output_cost`). All attributes are guarded — spans stay unchanged when a provider doesn't report a field. Media-oriented fields (`unitsBilled`, per-modality token breakdowns) and the provider-shaped `providerUsageDetails` bag are intentionally not emitted; media-activity observability is tracked in #720. diff --git a/docs/advanced/otel.md b/docs/advanced/otel.md index cf5f3ac2c..0eeb1051d 100644 --- a/docs/advanced/otel.md +++ b/docs/advanced/otel.md @@ -72,6 +72,15 @@ Iteration spans are numbered (`#0`, `#1`, ...) so distinct iterations of the sam | iteration | `gen_ai.request.max_tokens` | from config | | iteration | `gen_ai.usage.input_tokens` | per iteration | | iteration | `gen_ai.usage.output_tokens` | per iteration | +| root / iteration | `gen_ai.usage.total_tokens` | provider-reported total | +| root / iteration | `gen_ai.usage.cost` | provider-reported cost, when available | +| root / iteration | `gen_ai.usage.cache_read.input_tokens` | cached prompt tokens, when reported | +| root / iteration | `gen_ai.usage.cache_creation.input_tokens` | cache-write prompt tokens, when reported | +| root / iteration | `gen_ai.usage.reasoning.output_tokens` | reasoning/thinking tokens, when reported | +| root / iteration | `tanstack.ai.usage.duration_seconds` | duration-based billing (e.g. 
transcription), when reported | +| root / iteration | `tanstack.ai.usage.upstream_cost` | gateway upstream cost (e.g. OpenRouter), when reported | +| root / iteration | `tanstack.ai.usage.upstream_input_cost` | upstream input cost split, when reported | +| root / iteration | `tanstack.ai.usage.upstream_output_cost` | upstream output cost split, when reported | | iteration | `gen_ai.response.finish_reasons` | `[stop]`, `[tool_calls]`, ... | | root | `gen_ai.usage.input_tokens` | rolled up | | root | `gen_ai.usage.output_tokens` | rolled up | @@ -81,6 +90,8 @@ Iteration spans are numbered (`#0`, `#1`, ...) so distinct iterations of the sam | tool | `gen_ai.tool.type` | `function` | | tool | `tanstack.ai.tool.outcome` | `success` / `error` | +Usage attributes beyond input/output tokens are emitted only when the provider reports them, so spans stay clean otherwise. Cache and reasoning breakdowns use the official GenAI semconv names; `gen_ai.usage.cost` and `gen_ai.usage.total_tokens` are de-facto extensions consumed directly by backends like PostHog — without them, backends re-derive cost from their own price tables and lose cache discounts and gateway markup. Fields with no established convention (duration-based billing, the upstream cost split) are TanStack-namespaced. + ### Metrics Two GenAI-standard histograms: @@ -164,6 +175,42 @@ otelMiddleware({ }) ``` +## Beyond chat: media activities + +`otelMiddleware` covers `chat()`. The media activities — `generateImage`, `generateVideo`, `generateAudio`, `generateSpeech`, and `generateTranscription` — are single request → response (or submit → poll for video), so instead of the chat middleware pipeline they take a lighter **observer**. 
Pass `otelObserver()` (from the `@tanstack/ai/observability` subpath) on the activity's `observers` option to emit one span per call: + +```ts +import { generateImage } from '@tanstack/ai' +import { otelObserver } from '@tanstack/ai/observability' +import { openaiImage } from '@tanstack/ai-openai' +import { trace, metrics } from '@opentelemetry/api' + +const observer = otelObserver({ + tracer: trace.getTracer('my-app'), + meter: metrics.getMeter('my-app'), +}) + +const result = await generateImage({ + adapter: openaiImage('gpt-image-2'), + prompt: 'A serene mountain landscape at sunset', + observers: [observer], +}) +``` + +Each call produces one `CLIENT` span tagged with the activity's `gen_ai.operation.name`: + +| Activity | `gen_ai.operation.name` | +| --- | --- | +| `generateImage` | `image_generation` | +| `generateVideo` | `video_generation` | +| `generateAudio` | `audio_generation` | +| `generateSpeech` | `text_to_speech` | +| `generateTranscription` | `transcription` | + +The span carries `gen_ai.system` and `gen_ai.request.model` at start and, on finish, the same `gen_ai.usage.*` / `tanstack.ai.usage.*` attributes documented above, plus `tanstack.ai.usage.units_billed` for unit-billed media. When a `Meter` is supplied it records the `gen_ai.client.operation.duration` histogram, tagged per activity. For streaming video the span covers the full create → poll → complete lifecycle; for non-streaming `generateVideo` it covers job submission. + +`otelObserver` supports the same `spanNameFormatter` and `attributeEnricher` extension points as `otelMiddleware`. For a custom backend, implement the `ActivityObserver` contract (`onStart` / `onFinish` / `onError`) directly — its event payload is discriminated by `activity`. The observer types are exported from the package root; the `otelObserver` value lives on the `@tanstack/ai/observability` subpath so importing `@tanstack/ai` never requires the optional `@opentelemetry/api` peer. 
+ ## Related - [Middleware](./middleware) — the lifecycle this middleware hooks into diff --git a/docs/config.json b/docs/config.json index e3fc3b712..83ad44c2f 100644 --- a/docs/config.json +++ b/docs/config.json @@ -280,7 +280,8 @@ { "label": "OpenTelemetry", "to": "advanced/otel", - "addedAt": "2026-05-08" + "addedAt": "2026-05-08", + "updatedAt": "2026-06-15" } ] }, diff --git a/packages/ai/package.json b/packages/ai/package.json index 1ffd3b4f8..13185fc00 100644 --- a/packages/ai/package.json +++ b/packages/ai/package.json @@ -33,6 +33,10 @@ "types": "./dist/esm/middlewares/otel.d.ts", "import": "./dist/esm/middlewares/otel.js" }, + "./observability": { + "types": "./dist/esm/observability/index.d.ts", + "import": "./dist/esm/observability/index.js" + }, "./adapter-internals": { "types": "./dist/esm/adapter-internals.d.ts", "import": "./dist/esm/adapter-internals.js" diff --git a/packages/ai/src/activities/generateAudio/index.ts b/packages/ai/src/activities/generateAudio/index.ts index c377726ab..eb2545b84 100644 --- a/packages/ai/src/activities/generateAudio/index.ts +++ b/packages/ai/src/activities/generateAudio/index.ts @@ -8,8 +8,14 @@ import { aiEventClient } from '@tanstack/ai-event-client' import { streamGenerationResult } from '../stream-generation-result.js' import { resolveDebugOption } from '../../logger/resolve' +import { + notifyObserverError, + notifyObserverFinish, + notifyObserverStart, +} from '../../observability/notify' import type { InternalLogger } from '../../logger/internal-logger' import type { DebugOption } from '../../logger/types' +import type { ActivityObserver } from '../../observability/types' import type { AudioAdapter } from './adapter' import type { AudioGenerationResult, StreamChunk } from '../../types' @@ -70,6 +76,12 @@ export interface AudioActivityOptions< * control and/or a custom `Logger`. */ debug?: DebugOption + /** + * Observability hooks notified on start, success, and error. 
Pass + * `otelObserver()` to emit OpenTelemetry spans, or implement the + * `ActivityObserver` contract for a custom backend. + */ + observers?: Array } // =========================== @@ -135,7 +147,7 @@ async function runGenerateAudio< >( options: AudioActivityOptions, ): Promise { - const { adapter, stream: _stream, debug: _debug, ...rest } = options + const { adapter, stream: _stream, debug: _debug, observers, ...rest } = options const model = adapter.model const requestId = createId('audio') const startTime = Date.now() @@ -145,6 +157,18 @@ async function runGenerateAudio< (adapter as { name?: string }).name ?? 'unknown' + await notifyObserverStart( + observers, + { + activity: 'audio', + requestId, + provider: adapter.name, + model, + modelOptions: rest.modelOptions, + }, + logger, + ) + aiEventClient.emit('audio:request:started', { requestId, provider: adapter.name, @@ -189,6 +213,19 @@ async function runGenerateAudio< audioDuration: result.audio.duration, }) + await notifyObserverFinish( + observers, + { + activity: 'audio', + requestId, + provider: adapter.name, + model, + durationMs: elapsedMs, + usage: result.usage, + }, + logger, + ) + return result } catch (error) { const elapsedMs = Date.now() - startTime @@ -202,6 +239,18 @@ async function runGenerateAudio< modelOptions: rest.modelOptions as Record | undefined, timestamp: Date.now(), }) + await notifyObserverError( + observers, + { + activity: 'audio', + requestId, + provider: adapter.name, + model, + durationMs: elapsedMs, + error, + }, + logger, + ) logger.errors('generateAudio activity failed', { error, source: 'generateAudio', diff --git a/packages/ai/src/activities/generateImage/index.ts b/packages/ai/src/activities/generateImage/index.ts index b8d173b09..d0fd7608a 100644 --- a/packages/ai/src/activities/generateImage/index.ts +++ b/packages/ai/src/activities/generateImage/index.ts @@ -8,8 +8,14 @@ import { aiEventClient } from '@tanstack/ai-event-client' import { streamGenerationResult } from 
'../stream-generation-result.js' import { resolveDebugOption } from '../../logger/resolve' +import { + notifyObserverError, + notifyObserverFinish, + notifyObserverStart, +} from '../../observability/notify' import type { InternalLogger } from '../../logger/internal-logger' import type { DebugOption } from '../../logger/types' +import type { ActivityObserver } from '../../observability/types' import type { ImageAdapter } from './adapter' import type { ImageGenerationResult, StreamChunk } from '../../types' @@ -92,6 +98,12 @@ export type ImageActivityOptions< * control and/or a custom `Logger`. */ debug?: DebugOption + /** + * Observability hooks notified on start, success, and error. Pass + * `otelObserver()` to emit OpenTelemetry spans, or implement the + * `ActivityObserver` contract for a custom backend. + */ + observers?: Array } & ({} extends ImageProviderOptionsForModel ? { /** Provider-specific options for image generation */ modelOptions?: ImageProviderOptionsForModel< @@ -197,12 +209,24 @@ async function runGenerateImage< >( options: ImageActivityOptions, ): Promise { - const { adapter, stream: _stream, debug: _debug, ...rest } = options + const { adapter, stream: _stream, debug: _debug, observers, ...rest } = options const model = adapter.model const requestId = createId('image') const startTime = Date.now() const logger: InternalLogger = resolveDebugOption(options.debug) + await notifyObserverStart( + observers, + { + activity: 'image', + requestId, + provider: adapter.name, + model, + modelOptions: rest.modelOptions, + }, + logger, + ) + aiEventClient.emit('image:request:started', { requestId, provider: adapter.name, @@ -255,8 +279,33 @@ async function runGenerateImage< count: result.images.length, }) + await notifyObserverFinish( + observers, + { + activity: 'image', + requestId, + provider: adapter.name, + model, + durationMs: duration, + usage: result.usage, + }, + logger, + ) + return result } catch (error) { + await notifyObserverError( + 
observers, + { + activity: 'image', + requestId, + provider: adapter.name, + model, + durationMs: Date.now() - startTime, + error, + }, + logger, + ) logger.errors('generateImage activity failed', { error, source: 'generateImage', diff --git a/packages/ai/src/activities/generateSpeech/index.ts b/packages/ai/src/activities/generateSpeech/index.ts index 9512b3299..b2583d6ce 100644 --- a/packages/ai/src/activities/generateSpeech/index.ts +++ b/packages/ai/src/activities/generateSpeech/index.ts @@ -8,8 +8,14 @@ import { aiEventClient } from '@tanstack/ai-event-client' import { streamGenerationResult } from '../stream-generation-result.js' import { resolveDebugOption } from '../../logger/resolve' +import { + notifyObserverError, + notifyObserverFinish, + notifyObserverStart, +} from '../../observability/notify' import type { InternalLogger } from '../../logger/internal-logger' import type { DebugOption } from '../../logger/types' +import type { ActivityObserver } from '../../observability/types' import type { TTSAdapter } from './adapter' import type { StreamChunk, TTSResult } from '../../types' @@ -73,6 +79,12 @@ export interface TTSActivityOptions< * control and/or a custom `Logger`. */ debug?: DebugOption + /** + * Observability hooks notified on start, success, and error. Pass + * `otelObserver()` to emit OpenTelemetry spans, or implement the + * `ActivityObserver` contract for a custom backend. + */ + observers?: Array } // =========================== @@ -143,7 +155,7 @@ export function generateSpeech< async function runGenerateSpeech< TAdapter extends TTSAdapter>, >(options: TTSActivityOptions): Promise { - const { adapter, stream: _stream, debug: _debug, ...rest } = options + const { adapter, stream: _stream, debug: _debug, observers, ...rest } = options const model = adapter.model const requestId = createId('speech') const startTime = Date.now() @@ -153,6 +165,18 @@ async function runGenerateSpeech< (adapter as { name?: string }).name ?? 
'unknown' + await notifyObserverStart( + observers, + { + activity: 'speech', + requestId, + provider: adapter.name, + model, + modelOptions: rest.modelOptions, + }, + logger, + ) + aiEventClient.emit('speech:request:started', { requestId, provider: adapter.name, @@ -202,6 +226,19 @@ async function runGenerateSpeech< contentType: result.contentType, }) + await notifyObserverFinish( + observers, + { + activity: 'speech', + requestId, + provider: adapter.name, + model, + durationMs: duration, + usage: result.usage, + }, + logger, + ) + return result } catch (error) { const duration = Date.now() - startTime @@ -215,6 +252,18 @@ async function runGenerateSpeech< modelOptions: rest.modelOptions as Record | undefined, timestamp: Date.now(), }) + await notifyObserverError( + observers, + { + activity: 'speech', + requestId, + provider: adapter.name, + model, + durationMs: duration, + error, + }, + logger, + ) logger.errors('generateSpeech activity failed', { error, source: 'generateSpeech', diff --git a/packages/ai/src/activities/generateTranscription/index.ts b/packages/ai/src/activities/generateTranscription/index.ts index 90262e9e9..584faa404 100644 --- a/packages/ai/src/activities/generateTranscription/index.ts +++ b/packages/ai/src/activities/generateTranscription/index.ts @@ -8,8 +8,14 @@ import { aiEventClient } from '@tanstack/ai-event-client' import { streamGenerationResult } from '../stream-generation-result.js' import { resolveDebugOption } from '../../logger/resolve' +import { + notifyObserverError, + notifyObserverFinish, + notifyObserverStart, +} from '../../observability/notify' import type { InternalLogger } from '../../logger/internal-logger' import type { DebugOption } from '../../logger/types' +import type { ActivityObserver } from '../../observability/types' import type { TranscriptionAdapter } from './adapter' import type { StreamChunk, TranscriptionResult } from '../../types' @@ -76,6 +82,12 @@ export interface TranscriptionActivityOptions< * control 
and/or a custom `Logger`. */ debug?: DebugOption + /** + * Observability hooks notified on start, success, and error. Pass + * `otelObserver()` to emit OpenTelemetry spans, or implement the + * `ActivityObserver` contract for a custom backend. + */ + observers?: Array } // =========================== @@ -174,7 +186,7 @@ async function runGenerateTranscription< >( options: TranscriptionActivityOptions, ): Promise { - const { adapter, stream: _stream, debug: _debug, ...rest } = options + const { adapter, stream: _stream, debug: _debug, observers, ...rest } = options const model = adapter.model const requestId = createId('transcription') const startTime = Date.now() @@ -184,6 +196,18 @@ async function runGenerateTranscription< (adapter as { name?: string }).name ?? 'unknown' + await notifyObserverStart( + observers, + { + activity: 'transcription', + requestId, + provider: adapter.name, + model, + modelOptions: rest.modelOptions, + }, + logger, + ) + aiEventClient.emit('transcription:request:started', { requestId, provider: adapter.name, @@ -220,6 +244,19 @@ async function runGenerateTranscription< { hasText: !!result.text }, ) + await notifyObserverFinish( + observers, + { + activity: 'transcription', + requestId, + provider: adapter.name, + model, + durationMs: duration, + usage: result.usage, + }, + logger, + ) + return result } catch (error) { const duration = Date.now() - startTime @@ -233,6 +270,18 @@ async function runGenerateTranscription< modelOptions: rest.modelOptions as Record | undefined, timestamp: Date.now(), }) + await notifyObserverError( + observers, + { + activity: 'transcription', + requestId, + provider: adapter.name, + model, + durationMs: duration, + error, + }, + logger, + ) logger.errors('generateTranscription activity failed', { error, source: 'generateTranscription', diff --git a/packages/ai/src/activities/generateVideo/index.ts b/packages/ai/src/activities/generateVideo/index.ts index 4e0e48896..016ee9a37 100644 --- 
a/packages/ai/src/activities/generateVideo/index.ts +++ b/packages/ai/src/activities/generateVideo/index.ts @@ -10,8 +10,14 @@ import { aiEventClient } from '@tanstack/ai-event-client' import { toRunErrorPayload } from '../error-payload' import { resolveDebugOption } from '../../logger/resolve' +import { + notifyObserverError, + notifyObserverFinish, + notifyObserverStart, +} from '../../observability/notify' import type { InternalLogger } from '../../logger/internal-logger' import type { DebugOption } from '../../logger/types' +import type { ActivityObserver } from '../../observability/types' import type { VideoAdapter } from './adapter' import type { StreamChunk, @@ -111,6 +117,14 @@ export type VideoCreateOptions< * control and/or a custom `Logger`. */ debug?: DebugOption + /** + * Observability hooks notified on start, success, and error. Pass + * `otelObserver()` to emit OpenTelemetry spans, or implement the + * `ActivityObserver` contract for a custom backend. In streaming mode the + * span covers the full create→poll→complete lifecycle; in non-streaming mode + * it covers job submission. + */ + observers?: Array } & ({} extends VideoProviderOptions ? { /** Provider-specific options for video generation */ modelOptions?: VideoProviderOptions @@ -250,14 +264,28 @@ export function generateVideo< async function runCreateVideoJob< TAdapter extends VideoAdapter, >(options: VideoCreateOptions): Promise { - const { adapter, prompt, size, duration, modelOptions } = options + const { adapter, prompt, size, duration, modelOptions, observers } = options const model = adapter.model + const requestId = createId('video') + const startTime = Date.now() const logger: InternalLogger = resolveDebugOption(options.debug) const providerName = (adapter as { name?: string; provider?: string }).provider ?? (adapter as { name?: string }).name ?? 
'unknown' + await notifyObserverStart( + observers, + { + activity: 'video', + requestId, + provider: adapter.name, + model, + modelOptions, + }, + logger, + ) + logger.request(`activity=generateVideo provider=${providerName}`, { provider: providerName, model, @@ -276,8 +304,33 @@ async function runCreateVideoJob< jobId: result.jobId, model: result.model, }) + // Non-streaming create only submits the job; usage isn't known until the + // job completes via polling, so the span covers submission only. + await notifyObserverFinish( + observers, + { + activity: 'video', + requestId, + provider: adapter.name, + model, + durationMs: Date.now() - startTime, + }, + logger, + ) return result } catch (error) { + await notifyObserverError( + observers, + { + activity: 'video', + requestId, + provider: adapter.name, + model, + durationMs: Date.now() - startTime, + error, + }, + logger, + ) logger.errors('generateVideo activity failed', { error, source: 'generateVideo', @@ -297,9 +350,11 @@ function sleep(ms: number): Promise { async function* runStreamingVideoGeneration< TAdapter extends VideoAdapter, >(options: VideoCreateOptions): AsyncIterable { - const { adapter, prompt, size, duration, modelOptions } = options + const { adapter, prompt, size, duration, modelOptions, observers } = options const model = adapter.model const runId = options.runId ?? createId('run') + const requestId = createId('video') + const obsStartTime = Date.now() const pollingInterval = options.pollingInterval ?? 2000 const maxDuration = options.maxDuration ?? 
600_000 const logger: InternalLogger = resolveDebugOption(options.debug) @@ -317,6 +372,18 @@ async function* runStreamingVideoGeneration< timestamp: Date.now(), } as StreamChunk + await notifyObserverStart( + observers, + { + activity: 'video', + requestId, + provider: adapter.name, + model, + modelOptions, + }, + logger, + ) + logger.request( `activity=generateVideo provider=${providerName} stream=true`, { @@ -325,6 +392,9 @@ async function* runStreamingVideoGeneration< }, ) + // Tracks whether a terminal observer event (finish/error) has already fired, + // so the `finally` below can fire one on abandonment without double-firing. + let settled = false try { // Create the video generation job const jobResult = await adapter.createVideoJob({ @@ -373,6 +443,24 @@ async function* runStreamingVideoGeneration< }, ) + // Fire finish before yielding the terminal chunks: the generation has + // succeeded, so a consumer that stops reading after `generation:result` + // (without pulling `RUN_FINISHED`) must not trip the abandonment path in + // `finally`, which would otherwise report a spurious cancellation. 
+ await notifyObserverFinish( + observers, + { + activity: 'video', + requestId, + provider: adapter.name, + model, + durationMs: Date.now() - obsStartTime, + usage: urlResult.usage, + }, + logger, + ) + settled = true + yield { type: 'CUSTOM', name: 'generation:result', @@ -404,6 +492,19 @@ async function* runStreamingVideoGeneration< throw new Error('Video generation timed out') } catch (error: unknown) { const payload = toRunErrorPayload(error, 'Video generation failed') + await notifyObserverError( + observers, + { + activity: 'video', + requestId, + provider: adapter.name, + model, + durationMs: Date.now() - obsStartTime, + error, + }, + logger, + ) + settled = true logger.errors('generateVideo activity failed', { message: payload.message, code: payload.code, @@ -418,6 +519,28 @@ async function* runStreamingVideoGeneration< error: payload, timestamp: Date.now(), } as StreamChunk + } finally { + if (!settled) { + // The consumer abandoned the stream (broke the `for await` loop or + // disconnected) before completion, so the generator is being unwound at + // a `yield` without reaching finish/error. Fire a terminal cancellation + // event so otelObserver ends its span instead of leaking it. + await notifyObserverError( + observers, + { + activity: 'video', + requestId, + provider: adapter.name, + model, + durationMs: Date.now() - obsStartTime, + error: { + name: 'cancelled', + message: 'Video generation stream abandoned before completion', + }, + }, + logger, + ) + } } } diff --git a/packages/ai/src/index.ts b/packages/ai/src/index.ts index dbb38722b..580a51485 100644 --- a/packages/ai/src/index.ts +++ b/packages/ai/src/index.ts @@ -118,6 +118,18 @@ export type { ErrorInfo, } from './activities/chat/middleware/index' +// Observability — activity-agnostic observer types. Pure types only; the +// `otelObserver` value lives at `@tanstack/ai/observability` so the root barrel +// never requires the optional `@opentelemetry/api` peer dependency. 
+export type { + ActivityObserver, + ActivityKind, + ActivityEventBase, + ActivityStartEvent, + ActivityFinishEvent, + ActivityErrorEvent, +} from './observability/types' + // All types export * from './types' diff --git a/packages/ai/src/middlewares/otel.ts b/packages/ai/src/middlewares/otel.ts index c953e3f2f..af136f085 100644 --- a/packages/ai/src/middlewares/otel.ts +++ b/packages/ai/src/middlewares/otel.ts @@ -8,6 +8,9 @@ import { MAX_TOKENS_KEYS, NESTED_MAX_TOKENS_KEY, } from '../utilities/sampling-keys' +import { firstNumber } from '../utilities/numbers' +import { errorMessage, errorTypeName } from '../utilities/errors' +import { usageAttributes } from '../observability/usage-attributes' import type { AttributeValue, Exception, @@ -166,38 +169,6 @@ function messageEventName(role: string): string { } } -/** - * Return the first candidate that is a finite `number`, or `undefined`. Used to - * pick a sampling attribute from among the several provider-native spellings. - */ -function firstNumber(...candidates: Array): number | undefined { - for (const candidate of candidates) { - if (typeof candidate === 'number' && Number.isFinite(candidate)) { - return candidate - } - } - return undefined -} - -function errorMessage(err: unknown): string | undefined { - if (err instanceof Error) return err.message - if (typeof err === 'string') return err - if (err && typeof err === 'object' && 'message' in err) { - const m = (err as { message?: unknown }).message - if (typeof m === 'string') return m - } - return undefined -} - -function errorTypeName(err: unknown): string { - if (err instanceof Error) return err.name || 'Error' - if (err && typeof err === 'object' && 'name' in err) { - const n = (err as { name?: unknown }).name - if (typeof n === 'string') return n - } - return 'Error' -} - function safeCall(label: string, fn: () => T): T | undefined { try { return fn() @@ -524,10 +495,7 @@ export function otelMiddleware(options: OtelMiddlewareOptions): ChatMiddleware { // 
`runOnUsage` when `chunk.usage` is present, and `onUsage` is the // canonical place for the metric. Recording in both would double-count. if (chunk.usage) { - span.setAttributes({ - 'gen_ai.usage.input_tokens': chunk.usage.promptTokens, - 'gen_ai.usage.output_tokens': chunk.usage.completionTokens, - }) + span.setAttributes(usageAttributes(chunk.usage)) } if (captureContent && state.assistantTextBuffer.length > 0) { @@ -584,10 +552,7 @@ export function otelMiddleware(options: OtelMiddlewareOptions): ChatMiddleware { } const span = state.currentIterationSpan ?? state.rootSpan - span.setAttributes({ - 'gen_ai.usage.input_tokens': usage.promptTokens, - 'gen_ai.usage.output_tokens': usage.completionTokens, - }) + span.setAttributes(usageAttributes(usage)) }) }, @@ -905,10 +870,7 @@ export function otelMiddleware(options: OtelMiddlewareOptions): ChatMiddleware { } if (info.usage) { - state.rootSpan.setAttributes({ - 'gen_ai.usage.input_tokens': info.usage.promptTokens, - 'gen_ai.usage.output_tokens': info.usage.completionTokens, - }) + state.rootSpan.setAttributes(usageAttributes(info.usage)) } if (info.finishReason) { state.rootSpan.setAttribute('gen_ai.response.finish_reasons', [ diff --git a/packages/ai/src/observability/index.ts b/packages/ai/src/observability/index.ts new file mode 100644 index 000000000..0c10901e8 --- /dev/null +++ b/packages/ai/src/observability/index.ts @@ -0,0 +1,17 @@ +// Public entry for the `@tanstack/ai/observability` subpath. +// +// Exposed here (not from the package root) so that importing `@tanstack/ai` +// never eagerly requires `@opentelemetry/api`, which is an optional peer +// dependency — mirroring how `otelMiddleware` lives at `@tanstack/ai/middlewares/otel`. +// The pure observer types are re-exported from the root for ergonomics; the +// `otelObserver` value lives only here. 
+export { otelObserver } from './otel' +export type { OtelObserverOptions } from './otel' +export type { + ActivityKind, + ActivityObserver, + ActivityEventBase, + ActivityStartEvent, + ActivityFinishEvent, + ActivityErrorEvent, +} from './types' diff --git a/packages/ai/src/observability/notify.ts b/packages/ai/src/observability/notify.ts new file mode 100644 index 000000000..faf2bb9e1 --- /dev/null +++ b/packages/ai/src/observability/notify.ts @@ -0,0 +1,60 @@ +import type { InternalLogger } from '../logger/internal-logger' +import type { + ActivityErrorEvent, + ActivityFinishEvent, + ActivityObserver, + ActivityStartEvent, +} from './types' + +/** + * Fan an event out to each observer's hook in registration order, awaiting each + * one. A hook that throws is logged and skipped — observers are strictly + * non-fatal, so a broken observer can never break the activity it watches. + */ +async function runHooks( + observers: ReadonlyArray | undefined, + event: TEvent, + select: ( + observer: ActivityObserver, + ) => ((event: TEvent) => void | Promise) | undefined, + phase: 'onStart' | 'onFinish' | 'onError', + logger: InternalLogger, +): Promise { + if (!observers || observers.length === 0) return + for (const observer of observers) { + const hook = select(observer) + if (!hook) continue + try { + await hook.call(observer, event) + } catch (error) { + logger.errors( + `observer "${observer.name ?? 
'anonymous'}" ${phase} hook failed`, + { error, source: 'observer' }, + ) + } + } +} + +export function notifyObserverStart( + observers: ReadonlyArray | undefined, + event: ActivityStartEvent, + logger: InternalLogger, +): Promise { + return runHooks(observers, event, (o) => o.onStart, 'onStart', logger) +} + +export function notifyObserverFinish( + observers: ReadonlyArray | undefined, + event: ActivityFinishEvent, + logger: InternalLogger, +): Promise { + return runHooks(observers, event, (o) => o.onFinish, 'onFinish', logger) +} + +export function notifyObserverError( + observers: ReadonlyArray | undefined, + event: ActivityErrorEvent, + logger: InternalLogger, +): Promise { + return runHooks(observers, event, (o) => o.onError, 'onError', logger) +} diff --git a/packages/ai/src/observability/otel.ts b/packages/ai/src/observability/otel.ts new file mode 100644 index 000000000..4694d8229 --- /dev/null +++ b/packages/ai/src/observability/otel.ts @@ -0,0 +1,175 @@ +import { SpanKind, SpanStatusCode } from '@opentelemetry/api' +import { errorMessage, errorTypeName } from '../utilities/errors' +import { usageAttributes } from './usage-attributes' +import type { + AttributeValue, + Exception, + Meter, + Span, + Tracer, +} from '@opentelemetry/api' +import type { ActivityKind, ActivityObserver, ActivityStartEvent } from './types' + +/** + * `gen_ai.operation.name` per activity. Chat uses the GenAI semconv value; + * media operations have no semconv entry yet, so these are the de-facto names + * consumed by GenAI backends (PostHog, Langfuse, …). Documented in + * `docs/advanced/otel.md`. + */ +const OPERATION_NAME: Record = { + chat: 'chat', + image: 'image_generation', + video: 'video_generation', + audio: 'audio_generation', + speech: 'text_to_speech', + transcription: 'transcription', +} + +export interface OtelObserverOptions { + /** OTel `Tracer` used to start one span per activity call. */ + tracer: Tracer + /** + * Optional OTel `Meter`. 
/**
 * `gen_ai.operation.name` per activity. Chat uses the GenAI semconv value;
 * media operations have no semconv entry yet, so these are the de-facto names
 * consumed by GenAI backends (PostHog, Langfuse, …). Documented in
 * `docs/advanced/otel.md`.
 */
const OPERATION_NAME: Record<ActivityKind, string> = {
  chat: 'chat',
  image: 'image_generation',
  video: 'video_generation',
  audio: 'audio_generation',
  speech: 'text_to_speech',
  transcription: 'transcription',
}

export interface OtelObserverOptions {
  /** OTel `Tracer` used to start one span per activity call. */
  tracer: Tracer
  /**
   * Optional OTel `Meter`. When provided, the observer records the
   * `gen_ai.client.operation.duration` histogram (seconds) — the same metric
   * the chat `otelMiddleware` emits. Omit to disable metrics.
   */
  meter?: Meter
  /** Override the default span name (`"<operation> <model>"`). */
  spanNameFormatter?: (event: ActivityStartEvent) => string
  /** Add extra attributes to the span at start. */
  attributeEnricher?: (
    event: ActivityStartEvent,
  ) => Record<string, AttributeValue>
}

/**
 * Run a user-supplied extension callback, converting a throw into `undefined`.
 *
 * @param label - Callback name, included in the warning.
 * @param fn - Callback to invoke.
 * @returns The callback's result, or `undefined` when it threw.
 */
function safeCall<T>(label: string, fn: () => T): T | undefined {
  try {
    return fn()
  } catch (err) {
    // Keep the observer non-fatal but surface broken extension callbacks, so a
    // throwing spanNameFormatter/attributeEnricher never loses the whole span.
    console.warn(`[otelObserver] ${label} failed`, err)
    return undefined
  }
}

/**
 * An {@link ActivityObserver} that emits one OpenTelemetry span per activity
 * call, tagged with the right `gen_ai.operation.name` for the activity
 * (`image_generation`, `text_to_speech`, …). Register it on any media activity
 * via its `observers` option.
 *
 * Reuses the same `gen_ai.usage.*` attribute set as the chat `otelMiddleware`,
 * so cost, totals, cache/reasoning details, duration billing, and media unit
 * counts land identically across activities.
 *
 * @example
 * ```ts
 * import { generateImage } from '@tanstack/ai'
 * import { otelObserver } from '@tanstack/ai/observability'
 * import { openaiImage } from '@tanstack/ai-openai'
 * import { trace, metrics } from '@opentelemetry/api'
 *
 * const observer = otelObserver({
 *   tracer: trace.getTracer('my-app'),
 *   meter: metrics.getMeter('my-app'),
 * })
 *
 * await generateImage({
 *   adapter: openaiImage('gpt-image-2'),
 *   prompt: 'A serene mountain landscape at sunset',
 *   observers: [observer],
 * })
 * ```
 */
export function otelObserver(options: OtelObserverOptions): ActivityObserver {
  const { tracer, meter, spanNameFormatter, attributeEnricher } = options

  const durationHistogram = meter?.createHistogram(
    'gen_ai.client.operation.duration',
    {
      description: 'GenAI client operation duration',
      unit: 's',
    },
  )

  // Spans live only for the duration of a single activity call; keyed by the
  // event's requestId so concurrent calls don't collide.
  const spans = new Map<string, Span>()

  // Records one histogram sample (in seconds); a no-op without a meter.
  const recordDuration = (
    activity: ActivityKind,
    provider: string,
    model: string,
    durationMs: number,
    errorType?: string,
  ): void => {
    if (!durationHistogram) return
    durationHistogram.record(durationMs / 1000, {
      'gen_ai.system': provider,
      'gen_ai.operation.name': OPERATION_NAME[activity],
      'gen_ai.request.model': model,
      ...(errorType ? { 'error.type': errorType } : {}),
    })
  }

  return {
    name: 'otel',

    onStart(event) {
      const operationName = OPERATION_NAME[event.activity]
      // A missing or throwing formatter falls back to "<operation> <model>".
      const name =
        safeCall('spanNameFormatter', () => spanNameFormatter?.(event)) ??
        `${operationName} ${event.model}`
      const span = tracer.startSpan(name, {
        kind: SpanKind.CLIENT,
        attributes: {
          'gen_ai.system': event.provider,
          'gen_ai.operation.name': operationName,
          'gen_ai.request.model': event.model,
        },
      })
      const enriched = safeCall('attributeEnricher', () =>
        attributeEnricher?.(event),
      )
      if (enriched) span.setAttributes(enriched)
      spans.set(event.requestId, span)
    },

    onFinish(event) {
      const span = spans.get(event.requestId)
      spans.delete(event.requestId)
      try {
        if (span && event.usage) {
          span.setAttributes(usageAttributes(event.usage))
        }
      } finally {
        // Always close the span and record the metric, even if attaching the
        // usage attributes threw — otherwise the span would stay open forever.
        span?.end()
        recordDuration(
          event.activity,
          event.provider,
          event.model,
          event.durationMs,
        )
      }
    },

    onError(event) {
      const span = spans.get(event.requestId)
      spans.delete(event.requestId)
      try {
        if (span) {
          span.recordException(event.error as Exception)
          const message = errorMessage(event.error)
          span.setStatus({
            code: SpanStatusCode.ERROR,
            ...(message !== undefined ? { message } : {}),
          })
        }
      } finally {
        // Same guarantee as onFinish: the span is ended and the duration is
        // recorded no matter what the span decoration above did.
        span?.end()
        recordDuration(
          event.activity,
          event.provider,
          event.model,
          event.durationMs,
          errorTypeName(event.error),
        )
      }
    },
  }
}
/**
 * The kind of activity an {@link ActivityObserver} event describes.
 *
 * `chat` is included so the same observer contract can cover chat in the
 * future; today the activity functions that fire observer events are the media
 * activities (`image`, `video`, `audio`, `speech`, `transcription`). Chat
 * observability is served by `otelMiddleware`.
 */
export type ActivityKind =
  | 'chat'
  | 'image'
  | 'video'
  | 'audio'
  | 'speech'
  | 'transcription'

/** Fields present on every activity observer event. */
export interface ActivityEventBase {
  /**
   * Stable id correlating the `onStart` / `onFinish` / `onError` events of a
   * single activity call.
   */
  requestId: string
  /** Adapter/provider name (e.g. `"openai"`). Emitted as `gen_ai.system`. */
  provider: string
  /** Model id. Emitted as `gen_ai.request.model`. */
  model: string
  /** Provider-specific options passed to the activity, if any. */
  modelOptions?: unknown
}

/**
 * Fired before the adapter request begins. Carries the common identity fields
 * plus the `activity` discriminator; `otelObserver` uses it to open a span.
 *
 * Request inputs (prompt, size, voice, …) are intentionally not duplicated onto
 * this event — they are already published on the `aiEventClient`
 * `*:request:started` events for anyone who needs them.
 */
export interface ActivityStartEvent extends ActivityEventBase {
  activity: ActivityKind
}

/** Fired after the activity completes successfully. */
export interface ActivityFinishEvent extends ActivityEventBase {
  activity: ActivityKind
  /** Wall-clock duration of the activity call, in milliseconds. */
  durationMs: number
  /** Unified usage, when the provider reported it. */
  usage?: TokenUsage
}

/** Fired when the activity throws before completing. */
export interface ActivityErrorEvent extends ActivityEventBase {
  activity: ActivityKind
  /** Wall-clock duration until the failure, in milliseconds. */
  durationMs: number
  /** The thrown value (typically an `Error`). */
  error: unknown
}

/**
 * Activity-agnostic observability hook.
 *
 * A thin lifecycle observer registerable on any activity via its `observers`
 * option. Unlike the chat middleware pipeline (which can rewrite config,
 * chunks, and tool calls), an observer is read-only and single request →
 * response shaped — the right fit for media activities. Ship `otelObserver()`
 * for OpenTelemetry, or implement the three hooks directly for custom backends.
 *
 * Hooks are awaited in registration order and are non-fatal: a hook that throws
 * is logged and skipped, never breaking the activity. Keep them cheap — they
 * run inline with the request.
 */
export interface ActivityObserver {
  /** Optional name, surfaced in diagnostics when a hook throws. */
  name?: string
  /** Called before the adapter request begins. */
  onStart?: (event: ActivityStartEvent) => void | Promise<void>
  /** Called after the activity completes successfully. */
  onFinish?: (event: ActivityFinishEvent) => void | Promise<void>
  /** Called when the activity throws before completing. */
  onError?: (event: ActivityErrorEvent) => void | Promise<void>
}
/**
 * Build the full set of `gen_ai.usage.*` span attributes from a `TokenUsage`.
 *
 * Input and output token counts are always emitted. Every other field —
 * provider-reported cost, total tokens, cache and reasoning breakdowns,
 * duration-based billing, media unit counts, and the upstream cost split — is
 * emitted only when it is a finite number, so spans stay clean when a provider
 * doesn't report it while genuine zeros (e.g. cost `0`) are preserved.
 *
 * Cache and reasoning use the official GenAI semconv names;
 * `gen_ai.usage.cost` and `gen_ai.usage.total_tokens` are de-facto extensions
 * consumed by backends like PostHog (which otherwise re-derive cost from their
 * own price tables, losing cache discounts and gateway markup). Fields with no
 * semconv or de-facto convention (`costDetails`, `durationSeconds`,
 * `unitsBilled`) are TanStack-namespaced.
 *
 * Shared by the chat `otelMiddleware` and the activity-agnostic `otelObserver`
 * so usage lands identically whichever surface produced the span.
 *
 * Deliberately not emitted: `providerUsageDetails` (a provider-shaped bag,
 * unsafe to spread onto spans) and the per-modality token breakdowns
 * (`promptTokensDetails.audioTokens`, etc.) — those can balloon the attribute
 * set and have no agreed convention yet.
 *
 * @param usage - Unified usage reported for one call.
 * @returns A flat attribute map suitable for `span.setAttributes`.
 */
export function usageAttributes(
  usage: TokenUsage,
): Record<string, AttributeValue> {
  const attrs: Record<string, AttributeValue> = {
    'gen_ai.usage.input_tokens': usage.promptTokens,
    'gen_ai.usage.output_tokens': usage.completionTokens,
  }
  const optional: Array<[key: string, value: unknown]> = [
    ['gen_ai.usage.total_tokens', usage.totalTokens],
    ['gen_ai.usage.cost', usage.cost],
    [
      'gen_ai.usage.cache_read.input_tokens',
      usage.promptTokensDetails?.cachedTokens,
    ],
    [
      'gen_ai.usage.cache_creation.input_tokens',
      usage.promptTokensDetails?.cacheWriteTokens,
    ],
    [
      'gen_ai.usage.reasoning.output_tokens',
      usage.completionTokensDetails?.reasoningTokens,
    ],
    ['tanstack.ai.usage.duration_seconds', usage.durationSeconds],
    ['tanstack.ai.usage.units_billed', usage.unitsBilled],
    ['tanstack.ai.usage.upstream_cost', usage.costDetails?.upstreamCost],
    [
      'tanstack.ai.usage.upstream_input_cost',
      usage.costDetails?.upstreamInputCost,
    ],
    [
      'tanstack.ai.usage.upstream_output_cost',
      usage.costDetails?.upstreamOutputCost,
    ],
  ]
  for (const [key, value] of optional) {
    // `firstNumber` filters out undefined/non-finite values but keeps 0.
    const num = firstNumber(value)
    if (num !== undefined) attrs[key] = num
  }
  return attrs
}
/**
 * Best-effort extraction of a human-readable message from an unknown thrown
 * value, returning `undefined` when none can be found.
 *
 * Shared by the OpenTelemetry surfaces (`otelMiddleware` and `otelObserver`) so
 * error reporting stays identical across chat and media spans.
 *
 * @param err - Any thrown value: an `Error`, a string, or an object with a
 *   string `message` property.
 * @returns The message, or `undefined` for anything else.
 */
export function errorMessage(err: unknown): string | undefined {
  if (err instanceof Error) return err.message
  if (typeof err === 'string') return err
  if (err && typeof err === 'object' && 'message' in err) {
    const m = (err as { message?: unknown }).message
    if (typeof m === 'string') return m
  }
  return undefined
}

/**
 * Best-effort extraction of an error's type name (used for the `error.type`
 * metric attribute), falling back to `'Error'` when no name is available.
 *
 * An empty `name` counts as "not available" for `Error` instances and plain
 * error-like objects alike, so the attribute is never an empty string.
 *
 * @param err - Any thrown value.
 * @returns A non-empty type name.
 */
export function errorTypeName(err: unknown): string {
  if (err instanceof Error) return err.name || 'Error'
  if (err && typeof err === 'object' && 'name' in err) {
    const n = (err as { name?: unknown }).name
    if (typeof n === 'string' && n !== '') return n
  }
  return 'Error'
}
/**
 * Return the first candidate that is a finite `number`, or `undefined`.
 *
 * Handy for picking a value from among several possible spellings/sources where
 * only some are populated — e.g. the provider-native sampling option names read
 * by the OTel middleware, or the optional numeric fields on `TokenUsage`.
 *
 * `0` is a valid result; `NaN`, `±Infinity`, numeric strings, and every other
 * non-number are skipped.
 *
 * @param candidates - Values to inspect, in priority order.
 * @returns The first finite number, or `undefined` when there is none.
 */
export function firstNumber(
  ...candidates: Array<unknown>
): number | undefined {
  for (const candidate of candidates) {
    if (typeof candidate === 'number' && Number.isFinite(candidate)) {
      return candidate
    }
  }
  return undefined
}
describe('otelMiddleware — full usage emission', () => {
  // A usage report populating every field `TokenUsage` carries beyond plain
  // input/output tokens: cost, totals, cache/reasoning breakdowns,
  // duration-based billing, and the upstream cost split. Backends like PostHog
  // read `gen_ai.usage.cost` directly; without it they re-derive cost from
  // their own price tables and lose cache discounts / gateway markup
  // (OpenRouter).
  const fullUsage = {
    promptTokens: 100,
    completionTokens: 50,
    totalTokens: 165,
    promptTokensDetails: { cachedTokens: 80, cacheWriteTokens: 10 },
    completionTokensDetails: { reasoningTokens: 15 },
    durationSeconds: 2.5,
    cost: 0.0123,
    costDetails: {
      upstreamCost: 0.01,
      upstreamInputCost: 0.004,
      upstreamOutputCost: 0.006,
    },
  }

  // Attribute → value pairs a span must carry after receiving `fullUsage`.
  const fullUsageExpectations: ReadonlyArray<readonly [string, number]> = [
    ['gen_ai.usage.input_tokens', 100],
    ['gen_ai.usage.output_tokens', 50],
    ['gen_ai.usage.total_tokens', 165],
    ['gen_ai.usage.cost', 0.0123],
    ['gen_ai.usage.cache_read.input_tokens', 80],
    ['gen_ai.usage.cache_creation.input_tokens', 10],
    ['gen_ai.usage.reasoning.output_tokens', 15],
    ['tanstack.ai.usage.duration_seconds', 2.5],
    ['tanstack.ai.usage.upstream_cost', 0.01],
    ['tanstack.ai.usage.upstream_input_cost', 0.004],
    ['tanstack.ai.usage.upstream_output_cost', 0.006],
  ]

  // Attributes that must stay unset when the provider reports nothing extra.
  const optionalAttributeKeys = [
    'gen_ai.usage.cost',
    'gen_ai.usage.cache_read.input_tokens',
    'gen_ai.usage.cache_creation.input_tokens',
    'gen_ai.usage.reasoning.output_tokens',
    'tanstack.ai.usage.duration_seconds',
    'tanstack.ai.usage.upstream_cost',
    'tanstack.ai.usage.upstream_input_cost',
    'tanstack.ai.usage.upstream_output_cost',
  ]

  function expectFullUsageAttrs(span: FakeSpan): void {
    for (const [attribute, expected] of fullUsageExpectations) {
      expect(span.attributes[attribute]).toBe(expected)
    }
  }

  // Shared arrangement: fresh tracer + middleware, driven to iteration start.
  // spans[0] is the root span, spans[1] the iteration span.
  async function startIteration() {
    const { tracer, spans } = createFakeTracer()
    const mw = otelMiddleware({ tracer })
    const ctx = makeCtx()
    await runToIterationStart(mw, ctx)
    return { mw, ctx, spans }
  }

  it('emits cost, totals, and detail breakdowns from RUN_FINISHED chunk.usage', async () => {
    const { mw, ctx, spans } = await startIteration()

    const runFinished = {
      ...ev.runFinished('stop'),
      model: 'gpt-4o',
      usage: fullUsage,
    }
    await mw.onChunk?.(ctx, runFinished)

    expectFullUsageAttrs(spans[1]!)
  })

  it('emits cost, totals, and detail breakdowns from onUsage', async () => {
    const { mw, ctx, spans } = await startIteration()

    await mw.onUsage?.(ctx, fullUsage)

    expectFullUsageAttrs(spans[1]!)
  })

  it('rolls up cost, totals, and detail breakdowns onto the root span on onFinish', async () => {
    const { mw, ctx, spans } = await startIteration()

    await mw.onChunk?.(ctx, { ...ev.runFinished('stop'), model: 'gpt-4o' })
    await mw.onFinish?.(ctx, {
      finishReason: 'stop',
      duration: 1250,
      content: '',
      usage: fullUsage,
    })

    expectFullUsageAttrs(spans[0]!)
  })

  it('omits optional usage attributes when the provider does not report them', async () => {
    const { mw, ctx, spans } = await startIteration()

    await mw.onUsage?.(ctx, {
      promptTokens: 100,
      completionTokens: 50,
      totalTokens: 150,
    })

    const iterationSpan = spans[1]!
    expect(iterationSpan.attributes['gen_ai.usage.input_tokens']).toBe(100)
    expect(iterationSpan.attributes['gen_ai.usage.output_tokens']).toBe(50)
    expect(iterationSpan.attributes['gen_ai.usage.total_tokens']).toBe(150)
    for (const attribute of optionalAttributeKeys) {
      expect(iterationSpan.attributes[attribute]).toBeUndefined()
    }
  })

  it('emits zero-valued usage fields instead of dropping them', async () => {
    // cost 0 is a real report (OpenRouter free models), and the OpenRouter
    // extractor deliberately preserves it. Pin that the presence guard is
    // `!== undefined`, not truthiness — a truthy guard would drop zeros.
    const { mw, ctx, spans } = await startIteration()

    await mw.onUsage?.(ctx, {
      promptTokens: 100,
      completionTokens: 50,
      totalTokens: 150,
      cost: 0,
      promptTokensDetails: { cachedTokens: 0 },
    })

    const iterationSpan = spans[1]!
    expect(iterationSpan.attributes['gen_ai.usage.cost']).toBe(0)
    expect(
      iterationSpan.attributes['gen_ai.usage.cache_read.input_tokens'],
    ).toBe(0)
  })
})
/**
 * Build an observer that records every event it receives, grouped by hook, so
 * tests can assert on which lifecycle events an activity fired.
 */
function recordingObserver() {
  const events = {
    start: [] as Array<ActivityStartEvent>,
    finish: [] as Array<ActivityFinishEvent>,
    error: [] as Array<ActivityErrorEvent>,
  }
  const observer: ActivityObserver = {
    name: 'rec',
    onStart: (e) => {
      events.start.push(e)
    },
    onFinish: (e) => {
      events.finish.push(e)
    },
    onError: (e) => {
      events.error.push(e)
    },
  }
  return { observer, events }
}

describe('activity observers — wiring', () => {
  it('generateImage fires start then finish with usage', async () => {
    const { observer, events } = recordingObserver()
    const adapter = {
      kind: 'image' as const,
      name: 'openai',
      model: 'gpt-image-1',
      generateImages: vi.fn(async () => ({
        images: [{ url: 'https://example.com/i.png' }],
        usage: {
          promptTokens: 0,
          completionTokens: 0,
          totalTokens: 0,
          unitsBilled: 1,
          cost: 0.04,
        },
      })),
    }

    const result = await generateImage({
      adapter: adapter as any,
      prompt: 'a sunset',
      observers: [observer],
    })

    expect(result.images).toHaveLength(1)
    expect(events.start).toHaveLength(1)
    expect(events.start[0]!.activity).toBe('image')
    expect(events.start[0]!.provider).toBe('openai')
    expect(events.finish).toHaveLength(1)
    expect(events.finish[0]!.usage?.cost).toBe(0.04)
    expect(events.error).toHaveLength(0)
    // start/finish share the correlation id
    expect(events.finish[0]!.requestId).toBe(events.start[0]!.requestId)
  })

  it('generateImage fires error and rethrows', async () => {
    const { observer, events } = recordingObserver()
    const adapter = {
      kind: 'image' as const,
      name: 'openai',
      model: 'gpt-image-1',
      generateImages: vi.fn(async () => {
        throw new Error('image boom')
      }),
    }

    await expect(
      generateImage({
        adapter: adapter as any,
        prompt: 'x',
        observers: [observer],
        debug: false,
      }),
    ).rejects.toThrow('image boom')

    expect(events.start).toHaveLength(1)
    expect(events.finish).toHaveLength(0)
    expect(events.error).toHaveLength(1)
    expect((events.error[0]!.error as Error).message).toBe('image boom')
  })

  it('generateImage with otelObserver produces a span', async () => {
    const { tracer, spans } = createFakeTracer()
    const adapter = {
      kind: 'image' as const,
      name: 'openai',
      model: 'gpt-image-1',
      generateImages: vi.fn(async () => ({
        images: [{ url: 'https://example.com/i.png' }],
        usage: {
          promptTokens: 0,
          completionTokens: 0,
          totalTokens: 0,
          cost: 0.02,
        },
      })),
    }

    await generateImage({
      adapter: adapter as any,
      prompt: 'a sunset',
      observers: [otelObserver({ tracer })],
    })

    expect(spans).toHaveLength(1)
    expect(spans[0]!.attributes['gen_ai.operation.name']).toBe(
      'image_generation',
    )
    expect(spans[0]!.attributes['gen_ai.usage.cost']).toBe(0.02)
    expect(spans[0]!.ended).toBe(true)
  })

  it('generateSpeech fires start/finish', async () => {
    const { observer, events } = recordingObserver()
    const adapter = {
      kind: 'tts' as const,
      name: 'openai',
      model: 'gpt-4o-mini-tts',
      generateSpeech: vi.fn(async () => ({
        audio: 'base64',
        format: 'mp3',
        contentType: 'audio/mpeg',
        usage: { promptTokens: 5, completionTokens: 0, totalTokens: 5 },
      })),
    }

    await generateSpeech({
      adapter: adapter as any,
      text: 'hello',
      observers: [observer],
    })

    expect(events.start[0]!.activity).toBe('speech')
    expect(events.finish[0]!.usage?.promptTokens).toBe(5)
  })

  it('generateTranscription fires start/finish', async () => {
    const { observer, events } = recordingObserver()
    const adapter = {
      kind: 'transcription' as const,
      name: 'openai',
      model: 'whisper-1',
      transcribe: vi.fn(async () => ({
        text: 'hello world',
        language: 'en',
        usage: {
          promptTokens: 0,
          completionTokens: 0,
          totalTokens: 0,
          durationSeconds: 4,
        },
      })),
    }

    await generateTranscription({
      adapter: adapter as any,
      audio: 'base64',
      observers: [observer],
    })

    expect(events.start[0]!.activity).toBe('transcription')
    expect(events.finish[0]!.usage?.durationSeconds).toBe(4)
  })

  it('generateAudio fires start/finish', async () => {
    const { observer, events } = recordingObserver()
    const adapter = {
      kind: 'audio' as const,
      name: 'fal',
      model: 'fal-ai/diffrhythm',
      generateAudio: vi.fn(async () => ({
        audio: { url: 'https://example.com/a.mp3', contentType: 'audio/mpeg' },
        usage: {
          promptTokens: 0,
          completionTokens: 0,
          totalTokens: 0,
          unitsBilled: 1,
        },
      })),
    }

    await generateAudio({
      adapter: adapter as any,
      prompt: 'an upbeat track',
      observers: [observer],
    })

    expect(events.start[0]!.activity).toBe('audio')
    expect(events.finish[0]!.usage?.unitsBilled).toBe(1)
  })

  it('generateVideo (non-streaming) fires start/finish for the submit', async () => {
    const { observer, events } = recordingObserver()
    const adapter = {
      kind: 'video' as const,
      name: 'openai',
      model: 'sora-2',
      createVideoJob: vi.fn(async () => ({ jobId: 'job-1', model: 'sora-2' })),
      getVideoStatus: vi.fn(),
      getVideoUrl: vi.fn(),
    }

    const job = await generateVideo({
      adapter: adapter as any,
      prompt: 'a cat',
      observers: [observer],
    })

    expect(job.jobId).toBe('job-1')
    expect(events.start[0]!.activity).toBe('video')
    expect(events.finish).toHaveLength(1)
    expect(events.finish[0]!.usage).toBeUndefined()
  })

  it('generateVideo (streaming) fires finish with usage at completion', async () => {
    const { observer, events } = recordingObserver()
    const adapter = {
      kind: 'video' as const,
      name: 'openai',
      model: 'sora-2',
      createVideoJob: vi.fn(async () => ({ jobId: 'job-1', model: 'sora-2' })),
      getVideoStatus: vi.fn(async () => ({ status: 'completed' as const })),
      getVideoUrl: vi.fn(async () => ({
        url: 'https://example.com/v.mp4',
        usage: {
          promptTokens: 0,
          completionTokens: 0,
          totalTokens: 0,
          unitsBilled: 1,
        },
      })),
    }

    const stream = generateVideo({
      adapter: adapter as any,
      prompt: 'a cat',
      stream: true,
      pollingInterval: 1,
      observers: [observer],
    })
    for await (const _chunk of stream) {
      // drain
    }

    expect(events.start[0]!.activity).toBe('video')
    expect(events.finish).toHaveLength(1)
    expect(events.finish[0]!.usage?.unitsBilled).toBe(1)
    expect(events.error).toHaveLength(0)
  })

  it('generateVideo (streaming) fires error when the job fails', async () => {
    const { observer, events } = recordingObserver()
    const adapter = {
      kind: 'video' as const,
      name: 'openai',
      model: 'sora-2',
      createVideoJob: vi.fn(async () => ({ jobId: 'job-1', model: 'sora-2' })),
      getVideoStatus: vi.fn(async () => ({
        status: 'failed' as const,
        error: 'generation failed',
      })),
      getVideoUrl: vi.fn(),
    }

    const stream = generateVideo({
      adapter: adapter as any,
      prompt: 'a cat',
      stream: true,
      pollingInterval: 1,
      observers: [observer],
      debug: false,
    })
    for await (const _chunk of stream) {
      // drain — error surfaces as a RUN_ERROR chunk, not a throw
    }

    expect(events.finish).toHaveLength(0)
    expect(events.error).toHaveLength(1)
    expect(events.error[0]!.activity).toBe('video')
  })

  it('generateVideo (streaming) fires a terminal error if the consumer abandons mid-poll', async () => {
    const { observer, events } = recordingObserver()
    const adapter = {
      kind: 'video' as const,
      name: 'openai',
      model: 'sora-2',
      createVideoJob: vi.fn(async () => ({ jobId: 'job-1', model: 'sora-2' })),
      // Never completes, so the poll loop keeps running until we abandon it.
      getVideoStatus: vi.fn(async () => ({ status: 'in_progress' as const })),
      getVideoUrl: vi.fn(),
    }

    const stream = generateVideo({
      adapter: adapter as any,
      prompt: 'a cat',
      stream: true,
      pollingInterval: 1,
      observers: [observer],
      debug: false,
    })
    for await (const chunk of stream) {
      // Abandon once the job is created — onStart has fired and the span is open.
      if ((chunk as { name?: string }).name === 'video:job:created') break
    }

    expect(events.start).toHaveLength(1)
    expect(events.finish).toHaveLength(0)
    // The `finally` cleanup fires a cancellation error so the span is ended.
    expect(events.error).toHaveLength(1)
  })

  it('generateVideo (streaming) fires finish (not a cancellation) when the consumer stops after the result', async () => {
    const { observer, events } = recordingObserver()
    const adapter = {
      kind: 'video' as const,
      name: 'openai',
      model: 'sora-2',
      createVideoJob: vi.fn(async () => ({ jobId: 'job-1', model: 'sora-2' })),
      getVideoStatus: vi.fn(async () => ({ status: 'completed' as const })),
      getVideoUrl: vi.fn(async () => ({
        url: 'https://example.com/v.mp4',
        usage: {
          promptTokens: 0,
          completionTokens: 0,
          totalTokens: 0,
          unitsBilled: 1,
        },
      })),
    }

    const stream = generateVideo({
      adapter: adapter as any,
      prompt: 'a cat',
      stream: true,
      pollingInterval: 1,
      observers: [observer],
    })
    for await (const chunk of stream) {
      // The generation succeeded; stop reading before pulling RUN_FINISHED.
      if ((chunk as { name?: string }).name === 'generation:result') break
    }

    expect(events.start).toHaveLength(1)
    expect(events.finish).toHaveLength(1)
    expect(events.finish[0]!.usage?.unitsBilled).toBe(1)
    // Abandoning after success must not be reported as a cancellation.
    expect(events.error).toHaveLength(0)
  })

  it('a throwing observer never breaks the activity', async () => {
    const adapter = {
      kind: 'image' as const,
      name: 'openai',
      model: 'gpt-image-1',
      generateImages: vi.fn(async () => ({
        images: [{ url: 'https://example.com/i.png' }],
      })),
    }
    const brokenObserver: ActivityObserver = {
      name: 'broken',
      onStart: () => {
        throw new Error('observer broke')
      },
    }

    const result = await generateImage({
      adapter: adapter as any,
      prompt: 'x',
      observers: [brokenObserver],
      debug: false,
    })

    expect(result.images).toHaveLength(1)
  })
})
/**
 * Build an `ActivityStartEvent` for an image call, with any field overridable.
 * The declared return type makes the compiler check the literal, so no `as`
 * assertion is needed.
 */
function startEvent(
  overrides: Partial<ActivityStartEvent> = {},
): ActivityStartEvent {
  return {
    activity: 'image',
    requestId: 'req-1',
    provider: 'openai',
    model: 'gpt-image-1',
    ...overrides,
  }
}

describe('otelObserver', () => {
  it('opens a CLIENT span with gen_ai attributes on start', () => {
    const { tracer, spans } = createFakeTracer()
    const observer = otelObserver({ tracer })

    observer.onStart?.(startEvent())

    expect(spans).toHaveLength(1)
    const span = spans[0]!
    expect(span.kind).toBe(SpanKind.CLIENT)
    expect(span.name).toBe('image_generation gpt-image-1')
    expect(span.attributes['gen_ai.system']).toBe('openai')
    expect(span.attributes['gen_ai.operation.name']).toBe('image_generation')
    expect(span.attributes['gen_ai.request.model']).toBe('gpt-image-1')
    expect(span.ended).toBe(false)
  })

  it('maps each activity to the right gen_ai.operation.name', () => {
    const cases: Array<[ActivityKind, string]> = [
      ['image', 'image_generation'],
      ['video', 'video_generation'],
      ['audio', 'audio_generation'],
      ['speech', 'text_to_speech'],
      ['transcription', 'transcription'],
      ['chat', 'chat'],
    ]
    for (const [activity, operation] of cases) {
      const { tracer, spans } = createFakeTracer()
      const observer = otelObserver({ tracer })
      observer.onStart?.(startEvent({ activity, requestId: activity }))
      expect(spans[0]!.attributes['gen_ai.operation.name']).toBe(operation)
    }
  })

  it('attaches usage attributes and ends the span on finish', () => {
    const { tracer, spans } = createFakeTracer()
    const { meter, records } = createFakeMeter()
    const observer = otelObserver({ tracer, meter })

    observer.onStart?.(startEvent())
    observer.onFinish?.({
      activity: 'image',
      requestId: 'req-1',
      provider: 'openai',
      model: 'gpt-image-1',
      durationMs: 1500,
      usage: {
        promptTokens: 0,
        completionTokens: 0,
        totalTokens: 0,
        unitsBilled: 1,
        cost: 0.04,
      },
    })

    const span = spans[0]!
    expect(span.ended).toBe(true)
    expect(span.attributes['gen_ai.usage.cost']).toBe(0.04)
    expect(span.attributes['tanstack.ai.usage.units_billed']).toBe(1)

    expect(records).toHaveLength(1)
    expect(records[0]!.name).toBe('gen_ai.client.operation.duration')
    // Histogram unit is seconds: 1500 ms → 1.5 s.
    expect(records[0]!.value).toBe(1.5)
    expect(records[0]!.attributes?.['gen_ai.operation.name']).toBe(
      'image_generation',
    )
  })

  it('finishes cleanly when no usage is reported', () => {
    const { tracer, spans } = createFakeTracer()
    const observer = otelObserver({ tracer })

    observer.onStart?.(startEvent({ activity: 'video' }))
    observer.onFinish?.({
      activity: 'video',
      requestId: 'req-1',
      provider: 'openai',
      model: 'gpt-image-1',
      durationMs: 10,
    })

    expect(spans[0]!.ended).toBe(true)
    expect(spans[0]!.attributes['gen_ai.usage.cost']).toBeUndefined()
  })

  it('records the exception and ERROR status on error', () => {
    const { tracer, spans } = createFakeTracer()
    const { meter, records } = createFakeMeter()
    const observer = otelObserver({ tracer, meter })

    observer.onStart?.(startEvent())
    const error = new TypeError('boom')
    observer.onError?.({
      activity: 'image',
      requestId: 'req-1',
      provider: 'openai',
      model: 'gpt-image-1',
      durationMs: 200,
      error,
    })

    const span = spans[0]!
    expect(span.ended).toBe(true)
    expect(span.status.code).toBe(SpanStatusCode.ERROR)
    expect(span.status.message).toBe('boom')
    expect(span.exceptions[0]!.exception).toBe(error)
    expect(records[0]!.attributes?.['error.type']).toBe('TypeError')
  })

  it('keys spans by requestId so concurrent calls do not collide', () => {
    const { tracer, spans } = createFakeTracer()
    const observer = otelObserver({ tracer })

    observer.onStart?.(startEvent({ requestId: 'a', model: 'model-a' }))
    observer.onStart?.(startEvent({ requestId: 'b', model: 'model-b' }))
    // Finish the second one first — should end the model-b span, not model-a.
    observer.onFinish?.({
      activity: 'image',
      requestId: 'b',
      provider: 'openai',
      model: 'model-b',
      durationMs: 5,
    })

    const spanA = spans.find(
      (s) => s.attributes['gen_ai.request.model'] === 'model-a',
    )!
    const spanB = spans.find(
      (s) => s.attributes['gen_ai.request.model'] === 'model-b',
    )!
    expect(spanB.ended).toBe(true)
    expect(spanA.ended).toBe(false)
  })

  it('applies spanNameFormatter and attributeEnricher', () => {
    const { tracer, spans } = createFakeTracer()
    const observer = otelObserver({
      tracer,
      spanNameFormatter: (e) => `custom:${e.activity}`,
      attributeEnricher: () => ({ 'app.tenant': 'acme' }),
    })

    observer.onStart?.(startEvent())
    expect(spans[0]!.name).toBe('custom:image')
    expect(spans[0]!.attributes['app.tenant']).toBe('acme')
  })

  it('keeps the span when a formatter throws', () => {
    const { tracer, spans } = createFakeTracer()
    const observer = otelObserver({
      tracer,
      spanNameFormatter: () => {
        throw new Error('formatter broke')
      },
    })

    observer.onStart?.(startEvent())
    // Falls back to the default name instead of losing the span.
    expect(spans).toHaveLength(1)
    expect(spans[0]!.name).toBe('image_generation gpt-image-1')
  })
})

describe('usageAttributes', () => {
  it('emits guarded media + cost fields, omitting absent ones', () => {
    const usage: TokenUsage = {
      promptTokens: 10,
      completionTokens: 0,
      totalTokens: 10,
      durationSeconds: 12.5,
      unitsBilled: 3,
    }
    const attrs = usageAttributes(usage)

    expect(attrs['gen_ai.usage.input_tokens']).toBe(10)
    expect(attrs['tanstack.ai.usage.duration_seconds']).toBe(12.5)
    expect(attrs['tanstack.ai.usage.units_billed']).toBe(3)
    // No cost reported → key absent.
    expect('gen_ai.usage.cost' in attrs).toBe(false)
  })
})
+ expect('gen_ai.usage.cost' in attrs).toBe(false) + }) +}) diff --git a/packages/ai/vite.config.ts b/packages/ai/vite.config.ts index 5189bd6eb..e79f76973 100644 --- a/packages/ai/vite.config.ts +++ b/packages/ai/vite.config.ts @@ -35,6 +35,7 @@ export default mergeConfig( './src/activities/index.ts', './src/middlewares/index.ts', './src/middlewares/otel.ts', + './src/observability/index.ts', './src/adapter-internals.ts', ], srcDir: './src', diff --git a/testing/e2e/src/routeTree.gen.ts b/testing/e2e/src/routeTree.gen.ts index 23f0cc4ff..8e293306b 100644 --- a/testing/e2e/src/routeTree.gen.ts +++ b/testing/e2e/src/routeTree.gen.ts @@ -26,6 +26,8 @@ import { Route as ApiTranscriptionRouteImport } from './routes/api.transcription import { Route as ApiToolsTestRouteImport } from './routes/api.tools-test' import { Route as ApiToolCallLifecycleWireRouteImport } from './routes/api.tool-call-lifecycle-wire' import { Route as ApiSummarizeRouteImport } from './routes/api.summarize' +import { Route as ApiOtelUsageRouteImport } from './routes/api.otel-usage' +import { Route as ApiOtelMediaRouteImport } from './routes/api.otel-media' import { Route as ApiOpenrouterWebToolsWireRouteImport } from './routes/api.openrouter-web-tools-wire' import { Route as ApiOpenrouterCostRouteImport } from './routes/api.openrouter-cost' import { Route as ApiOpenaiUsageDetailsRouteImport } from './routes/api.openai-usage-details' @@ -136,6 +138,16 @@ const ApiSummarizeRoute = ApiSummarizeRouteImport.update({ path: '/api/summarize', getParentRoute: () => rootRouteImport, } as any) +const ApiOtelUsageRoute = ApiOtelUsageRouteImport.update({ + id: '/api/otel-usage', + path: '/api/otel-usage', + getParentRoute: () => rootRouteImport, +} as any) +const ApiOtelMediaRoute = ApiOtelMediaRouteImport.update({ + id: '/api/otel-media', + path: '/api/otel-media', + getParentRoute: () => rootRouteImport, +} as any) const ApiOpenrouterWebToolsWireRoute = ApiOpenrouterWebToolsWireRouteImport.update({ id: 
'/api/openrouter-web-tools-wire', @@ -284,6 +296,8 @@ export interface FileRoutesByFullPath { '/api/openai-usage-details': typeof ApiOpenaiUsageDetailsRoute '/api/openrouter-cost': typeof ApiOpenrouterCostRoute '/api/openrouter-web-tools-wire': typeof ApiOpenrouterWebToolsWireRoute + '/api/otel-media': typeof ApiOtelMediaRoute + '/api/otel-usage': typeof ApiOtelUsageRoute '/api/summarize': typeof ApiSummarizeRoute '/api/tool-call-lifecycle-wire': typeof ApiToolCallLifecycleWireRoute '/api/tools-test': typeof ApiToolsTestRoute @@ -326,6 +340,8 @@ export interface FileRoutesByTo { '/api/openai-usage-details': typeof ApiOpenaiUsageDetailsRoute '/api/openrouter-cost': typeof ApiOpenrouterCostRoute '/api/openrouter-web-tools-wire': typeof ApiOpenrouterWebToolsWireRoute + '/api/otel-media': typeof ApiOtelMediaRoute + '/api/otel-usage': typeof ApiOtelUsageRoute '/api/summarize': typeof ApiSummarizeRoute '/api/tool-call-lifecycle-wire': typeof ApiToolCallLifecycleWireRoute '/api/tools-test': typeof ApiToolsTestRoute @@ -369,6 +385,8 @@ export interface FileRoutesById { '/api/openai-usage-details': typeof ApiOpenaiUsageDetailsRoute '/api/openrouter-cost': typeof ApiOpenrouterCostRoute '/api/openrouter-web-tools-wire': typeof ApiOpenrouterWebToolsWireRoute + '/api/otel-media': typeof ApiOtelMediaRoute + '/api/otel-usage': typeof ApiOtelUsageRoute '/api/summarize': typeof ApiSummarizeRoute '/api/tool-call-lifecycle-wire': typeof ApiToolCallLifecycleWireRoute '/api/tools-test': typeof ApiToolsTestRoute @@ -413,6 +431,8 @@ export interface FileRouteTypes { | '/api/openai-usage-details' | '/api/openrouter-cost' | '/api/openrouter-web-tools-wire' + | '/api/otel-media' + | '/api/otel-usage' | '/api/summarize' | '/api/tool-call-lifecycle-wire' | '/api/tools-test' @@ -455,6 +475,8 @@ export interface FileRouteTypes { | '/api/openai-usage-details' | '/api/openrouter-cost' | '/api/openrouter-web-tools-wire' + | '/api/otel-media' + | '/api/otel-usage' | '/api/summarize' | 
'/api/tool-call-lifecycle-wire' | '/api/tools-test' @@ -497,6 +519,8 @@ export interface FileRouteTypes { | '/api/openai-usage-details' | '/api/openrouter-cost' | '/api/openrouter-web-tools-wire' + | '/api/otel-media' + | '/api/otel-usage' | '/api/summarize' | '/api/tool-call-lifecycle-wire' | '/api/tools-test' @@ -540,6 +564,8 @@ export interface RootRouteChildren { ApiOpenaiUsageDetailsRoute: typeof ApiOpenaiUsageDetailsRoute ApiOpenrouterCostRoute: typeof ApiOpenrouterCostRoute ApiOpenrouterWebToolsWireRoute: typeof ApiOpenrouterWebToolsWireRoute + ApiOtelMediaRoute: typeof ApiOtelMediaRoute + ApiOtelUsageRoute: typeof ApiOtelUsageRoute ApiSummarizeRoute: typeof ApiSummarizeRoute ApiToolCallLifecycleWireRoute: typeof ApiToolCallLifecycleWireRoute ApiToolsTestRoute: typeof ApiToolsTestRoute @@ -670,6 +696,20 @@ declare module '@tanstack/react-router' { preLoaderRoute: typeof ApiSummarizeRouteImport parentRoute: typeof rootRouteImport } + '/api/otel-usage': { + id: '/api/otel-usage' + path: '/api/otel-usage' + fullPath: '/api/otel-usage' + preLoaderRoute: typeof ApiOtelUsageRouteImport + parentRoute: typeof rootRouteImport + } + '/api/otel-media': { + id: '/api/otel-media' + path: '/api/otel-media' + fullPath: '/api/otel-media' + preLoaderRoute: typeof ApiOtelMediaRouteImport + parentRoute: typeof rootRouteImport + } '/api/openrouter-web-tools-wire': { id: '/api/openrouter-web-tools-wire' path: '/api/openrouter-web-tools-wire' @@ -921,6 +961,8 @@ const rootRouteChildren: RootRouteChildren = { ApiOpenaiUsageDetailsRoute: ApiOpenaiUsageDetailsRoute, ApiOpenrouterCostRoute: ApiOpenrouterCostRoute, ApiOpenrouterWebToolsWireRoute: ApiOpenrouterWebToolsWireRoute, + ApiOtelMediaRoute: ApiOtelMediaRoute, + ApiOtelUsageRoute: ApiOtelUsageRoute, ApiSummarizeRoute: ApiSummarizeRoute, ApiToolCallLifecycleWireRoute: ApiToolCallLifecycleWireRoute, ApiToolsTestRoute: ApiToolsTestRoute, diff --git a/testing/e2e/src/routes/api.otel-media.ts 
b/testing/e2e/src/routes/api.otel-media.ts new file mode 100644 index 000000000..95a140797 --- /dev/null +++ b/testing/e2e/src/routes/api.otel-media.ts @@ -0,0 +1,152 @@ +import { createFileRoute } from '@tanstack/react-router' +import { generateImage } from '@tanstack/ai' +import { otelObserver } from '@tanstack/ai/observability' +import type { Provider } from '@/lib/types' +import type { + AttributeValue, + Context, + Span, + SpanContext, + SpanStatus, + Tracer, +} from '@opentelemetry/api' +import { createImageAdapter } from '@/lib/media-providers' + +interface CapturedSpan { + name: string + kind?: number + attributes: Record + status: SpanStatus + ended: boolean +} + +/** + * Single-request in-memory tracer (mirrors `api.otel-usage.ts`). Everything + * happens inside one POST, so spans collect into a local array returned in the + * response body. + */ +function createLocalCaptureTracer(): { + tracer: Tracer + spans: Array +} { + const spans: Array = [] + let spanSeq = 0 + const tracer: Tracer = { + startSpan(name, options = {}, _ctx?: Context): Span { + const id = `span-${spanSeq++}` + const attributes: Record = {} + for (const [k, v] of Object.entries(options.attributes ?? 
{})) { + if (v !== undefined) attributes[k] = v + } + const captured: CapturedSpan = { + name, + kind: options.kind, + attributes, + status: { code: 0 }, + ended: false, + } + spans.push(captured) + const span: Span = { + spanContext(): SpanContext { + return { traceId: 'otel-media-trace', spanId: id, traceFlags: 1 } + }, + setAttribute(key, value) { + captured.attributes[key] = value + return span + }, + setAttributes(next) { + for (const [k, v] of Object.entries(next)) { + captured.attributes[k] = v as AttributeValue + } + return span + }, + addEvent() { + return span + }, + addLink() { + return span + }, + addLinks() { + return span + }, + setStatus(status) { + captured.status = status + return span + }, + updateName(next) { + captured.name = next + return span + }, + end() { + captured.ended = true + }, + isRecording() { + return !captured.ended + }, + recordException() {}, + } + return span + }, + + startActiveSpan(...args: Array) { + const fn = args[args.length - 1] as (span: Span) => unknown + const name = args[0] as string + const span = tracer.startSpan(name, {}) + try { + return fn(span) + } finally { + span.end() + } + }, + } + return { tracer, spans } +} + +/** + * Drives `generateImage` with an `otelObserver` against the same aimock mount + * the image-gen feature tests use, and returns the captured spans. End-to-end + * proof for #720: the activity-agnostic observer emits a `gen_ai.*` span tagged + * `image_generation` for a non-chat activity, through the public + * `@tanstack/ai/observability` subpath. + */ +export const Route = createFileRoute('/api/otel-media')({ + server: { + handlers: { + POST: async ({ request }) => { + await import('@/lib/llmock-server').then((m) => m.ensureLLMock()) + const body = await request.json() + const data = body.forwardedProps ?? body.data ?? 
body + const { prompt, provider, testId, aimockPort } = data as { + prompt: string + provider: Provider + testId?: string + aimockPort?: number + } + + const adapter = createImageAdapter(provider, aimockPort, testId) + const { tracer, spans } = createLocalCaptureTracer() + + try { + await generateImage({ + adapter, + prompt, + observers: [otelObserver({ tracer })], + }) + } catch (error) { + return new Response( + JSON.stringify({ + ok: false, + error: error instanceof Error ? error.message : String(error), + }), + { status: 200, headers: { 'Content-Type': 'application/json' } }, + ) + } + + return new Response(JSON.stringify({ ok: true, spans }), { + status: 200, + headers: { 'Content-Type': 'application/json' }, + }) + }, + }, + }, +}) diff --git a/testing/e2e/src/routes/api.otel-usage.ts b/testing/e2e/src/routes/api.otel-usage.ts new file mode 100644 index 000000000..0171fdd2b --- /dev/null +++ b/testing/e2e/src/routes/api.otel-usage.ts @@ -0,0 +1,167 @@ +import { createFileRoute } from '@tanstack/react-router' +import { chat, createChatOptions } from '@tanstack/ai' +import { otelMiddleware } from '@tanstack/ai/middlewares/otel' +import { createOpenaiChatCompletions } from '@tanstack/ai-openai' +import { createOpenRouterText } from '@tanstack/ai-openrouter' +import type { + AttributeValue, + Context, + Span, + SpanContext, + Tracer, +} from '@opentelemetry/api' + +const LLMOCK_DEFAULT_BASE = process.env.LLMOCK_URL || 'http://127.0.0.1:4010' +const DUMMY_KEY = 'sk-e2e-test-dummy-key' + +interface CapturedSpan { + name: string + kind?: number + attributes: Record + ended: boolean +} + +/** + * Single-request in-memory tracer. Unlike the per-testId capture in + * `api.middleware-test.ts`, everything here happens inside one POST, so spans + * collect into a local array returned directly in the response body. 
+ */ +function createLocalCaptureTracer(): { + tracer: Tracer + spans: Array +} { + const spans: Array = [] + let spanSeq = 0 + const tracer: Tracer = { + startSpan(name, options = {}, _ctx?: Context): Span { + const id = `span-${spanSeq++}` + const attributes: Record = {} + for (const [k, v] of Object.entries(options.attributes ?? {})) { + if (v !== undefined) attributes[k] = v as AttributeValue + } + const captured: CapturedSpan = { + name, + kind: options.kind, + attributes, + ended: false, + } + spans.push(captured) + const span: Span = { + spanContext(): SpanContext { + return { traceId: 'otel-usage-trace', spanId: id, traceFlags: 1 } + }, + setAttribute(key, value) { + captured.attributes[key] = value as AttributeValue + return span + }, + setAttributes(next) { + for (const [k, v] of Object.entries(next)) { + captured.attributes[k] = v as AttributeValue + } + return span + }, + addEvent() { + return span + }, + addLink() { + return span + }, + addLinks() { + return span + }, + setStatus() { + return span + }, + updateName(next) { + captured.name = next + return span + }, + end() { + captured.ended = true + }, + isRecording() { + return !captured.ended + }, + recordException() {}, + } + return span + }, + // Minimal implementation — otelMiddleware never calls startActiveSpan. + // eslint-disable-next-line @typescript-eslint/no-explicit-any + startActiveSpan(...args: Array) { + const fn = args[args.length - 1] as (span: Span) => unknown + const name = args[0] as string + const span = tracer.startSpan(name, {}) + try { + return fn(span) + } finally { + span.end() + } + }, + } + return { tracer, spans } +} + +/** + * Drives a chat adapter with `otelMiddleware` against the existing + * hand-crafted aimock mounts that report rich usage, and returns the captured + * spans. 
Companion E2E proof for full-usage span emission (#721): + * + * - `provider: 'openai'` → `/openai-usage-details` mount, whose trailing usage + * chunk carries `total_tokens`, `prompt_tokens_details.cached_tokens`, and + * `completion_tokens_details.reasoning_tokens`. + * - `provider: 'openrouter'` → `/openrouter-cost` mount, whose trailing usage + * chunk carries `cost` / `cost_details`. + * + * The spec asserts the corresponding `gen_ai.usage.*` / `tanstack.ai.usage.*` + * attributes land on the iteration and root spans. + */ +export const Route = createFileRoute('/api/otel-usage')({ + server: { + handlers: { + POST: async ({ request }) => { + let provider = 'openai' + try { + const body = (await request.json()) as { provider?: string } + if (typeof body.provider === 'string') provider = body.provider + } catch { + // No/invalid body — default provider. + } + + const adapter = + provider === 'openrouter' + ? createOpenRouterText('openai/gpt-4o' as never, DUMMY_KEY, { + serverURL: `${LLMOCK_DEFAULT_BASE}/openrouter-cost/v1`, + }) + : createOpenaiChatCompletions('gpt-4o', DUMMY_KEY, { + baseURL: `${LLMOCK_DEFAULT_BASE}/openai-usage-details/v1`, + }) + + const { tracer, spans } = createLocalCaptureTracer() + + try { + for await (const _chunk of chat({ + ...createChatOptions({ adapter }), + messages: [{ role: 'user', content: 'hi' }], + middleware: [otelMiddleware({ tracer })], + })) { + // Drain — the assertions live on the captured spans. + } + } catch (error) { + return new Response( + JSON.stringify({ + ok: false, + error: error instanceof Error ? 
error.message : String(error), + }), + { status: 200, headers: { 'Content-Type': 'application/json' } }, + ) + } + + return new Response(JSON.stringify({ ok: true, spans }), { + status: 200, + headers: { 'Content-Type': 'application/json' }, + }) + }, + }, + }, +}) diff --git a/testing/e2e/tests/middleware.spec.ts b/testing/e2e/tests/middleware.spec.ts index e039d78ab..237f8df12 100644 --- a/testing/e2e/tests/middleware.spec.ts +++ b/testing/e2e/tests/middleware.spec.ts @@ -193,6 +193,113 @@ test.describe('Middleware Lifecycle', () => { } }) + test('otel middleware emits total/cache/reasoning usage details on spans', async ({ + request, + }) => { + // `/api/otel-usage` drives the OpenAI adapter against the + // `/openai-usage-details` aimock mount (total_tokens + cached_tokens + + // reasoning_tokens) with otelMiddleware attached, and returns the + // captured spans. End-to-end proof for #721: the full TokenUsage reaches + // span attributes, not just input/output tokens. + const res = await request.post('/api/otel-usage', { + data: { provider: 'openai' }, + }) + expect(res.ok()).toBe(true) + const { ok, error, spans } = await res.json() + expect(error ?? null).toBeNull() + expect(ok).toBe(true) + + const iterationSpans = spans.filter( + (s: any) => s.kind === SpanKind.CLIENT && s.ended, + ) + expect(iterationSpans.length).toBeGreaterThanOrEqual(1) + expect(iterationSpans[0].attributes).toMatchObject({ + 'gen_ai.usage.input_tokens': 100, + 'gen_ai.usage.output_tokens': 50, + 'gen_ai.usage.total_tokens': 150, + 'gen_ai.usage.cache_read.input_tokens': 80, + 'gen_ai.usage.reasoning.output_tokens': 30, + }) + + // The root span rolls up the final usage on onFinish. 
+ const rootSpans = spans.filter((s: any) => s.kind === SpanKind.INTERNAL) + expect(rootSpans).toHaveLength(1) + expect(rootSpans[0].attributes).toMatchObject({ + 'gen_ai.usage.total_tokens': 150, + 'gen_ai.usage.cache_read.input_tokens': 80, + 'gen_ai.usage.reasoning.output_tokens': 30, + }) + }) + + test('otel middleware emits provider-reported cost on spans', async ({ + request, + }) => { + // OpenRouter adapter against the `/openrouter-cost` mount, whose trailing + // usage chunk carries `cost` / `cost_details`. Backends like PostHog read + // `gen_ai.usage.cost` directly instead of re-deriving from price tables. + const res = await request.post('/api/otel-usage', { + data: { provider: 'openrouter' }, + }) + expect(res.ok()).toBe(true) + const { ok, error, spans } = await res.json() + expect(error ?? null).toBeNull() + expect(ok).toBe(true) + + const iterationSpans = spans.filter( + (s: any) => s.kind === SpanKind.CLIENT && s.ended, + ) + expect(iterationSpans.length).toBeGreaterThanOrEqual(1) + expect(iterationSpans[0].attributes).toMatchObject({ + 'gen_ai.usage.input_tokens': 11, + 'gen_ai.usage.output_tokens': 3, + 'gen_ai.usage.total_tokens': 14, + 'gen_ai.usage.cost': 0.0042, + 'tanstack.ai.usage.upstream_cost': 0.0038, + 'tanstack.ai.usage.upstream_input_cost': 0.0012, + 'tanstack.ai.usage.upstream_output_cost': 0.0026, + }) + + const rootSpans = spans.filter((s: any) => s.kind === SpanKind.INTERNAL) + expect(rootSpans).toHaveLength(1) + expect(rootSpans[0].attributes['gen_ai.usage.cost']).toBe(0.0042) + }) + + test('otelObserver emits an image_generation span for generateImage', async ({ + request, + testId, + aimockPort, + }) => { + // `/api/otel-media` drives generateImage with an otelObserver against the + // image-gen aimock mount and returns the captured spans. 
End-to-end proof + // for #720: the activity-agnostic observer tags a non-chat activity with the + // right `gen_ai.operation.name`, through the public + // `@tanstack/ai/observability` subpath. + const res = await request.post('/api/otel-media', { + data: { + prompt: 'a guitar in a music store', + provider: 'openai', + testId, + aimockPort, + }, + }) + expect(res.ok()).toBe(true) + const { ok, error, spans } = await res.json() + expect(error ?? null).toBeNull() + expect(ok).toBe(true) + + const mediaSpans = spans.filter( + (s: any) => s.attributes['gen_ai.operation.name'] === 'image_generation', + ) + expect(mediaSpans).toHaveLength(1) + expect(mediaSpans[0].kind).toBe(SpanKind.CLIENT) + expect(mediaSpans[0].ended).toBe(true) + expect(mediaSpans[0].attributes).toMatchObject({ + 'gen_ai.system': 'openai', + 'gen_ai.operation.name': 'image_generation', + 'gen_ai.request.model': 'gpt-image-1', + }) + }) + test('no middleware passes content through unchanged', async ({ page, testId,