Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changeset/activity-observers.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
'@tanstack/ai': minor
---

Add an activity-agnostic observability hook for non-chat activities (#720). The media activities — `generateImage`, `generateVideo`, `generateAudio`, `generateSpeech`, and `generateTranscription` — now accept an `observers` option taking lightweight `ActivityObserver`s (`onStart` / `onFinish` / `onError`, payload discriminated by `activity`). Observers are awaited in order and strictly non-fatal — a throwing observer is logged and skipped, never breaking the activity.

Ships `otelObserver()` on the new `@tanstack/ai/observability` subpath: it emits one `gen_ai.*` span per activity call, tagged with the correct `gen_ai.operation.name` (`image_generation`, `video_generation`, `audio_generation`, `text_to_speech`, `transcription`), and reuses the same `gen_ai.usage.*` attribute set as `otelMiddleware` — now including `tanstack.ai.usage.units_billed` for unit-billed media. With a `Meter` it also records the `gen_ai.client.operation.duration` histogram per activity. The `ActivityObserver` types are exported from the package root, while the `otelObserver` value lives on the subpath so importing `@tanstack/ai` never requires the optional `@opentelemetry/api` peer.
5 changes: 5 additions & 0 deletions .changeset/otel-full-usage-emission.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@tanstack/ai': minor
---

`otelMiddleware` now emits the rest of the reported `TokenUsage` on spans instead of only input/output tokens (#721). When the provider reports them, spans carry `gen_ai.usage.total_tokens`, `gen_ai.usage.cost` (provider-reported cost — cache discounts and gateway markup included, so backends like PostHog no longer re-derive cost from price tables), the official semconv cache/reasoning breakdowns (`gen_ai.usage.cache_read.input_tokens`, `gen_ai.usage.cache_creation.input_tokens`, `gen_ai.usage.reasoning.output_tokens`), and TanStack-namespaced attributes for duration-based billing (`tanstack.ai.usage.duration_seconds`) and the upstream cost split (`tanstack.ai.usage.upstream_cost` / `upstream_input_cost` / `upstream_output_cost`). All attributes are guarded — spans stay unchanged when a provider doesn't report a field. Media-oriented fields (`unitsBilled`, per-modality token breakdowns) and the provider-shaped `providerUsageDetails` bag are intentionally not emitted; media-activity observability is tracked in #720.
47 changes: 47 additions & 0 deletions docs/advanced/otel.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,15 @@ Iteration spans are numbered (`#0`, `#1`, ...) so distinct iterations of the sam
| iteration | `gen_ai.request.max_tokens` | from config |
| iteration | `gen_ai.usage.input_tokens` | per iteration |
| iteration | `gen_ai.usage.output_tokens` | per iteration |
| root / iteration | `gen_ai.usage.total_tokens` | provider-reported total |
| root / iteration | `gen_ai.usage.cost` | provider-reported cost, when available |
| root / iteration | `gen_ai.usage.cache_read.input_tokens` | cached prompt tokens, when reported |
| root / iteration | `gen_ai.usage.cache_creation.input_tokens` | cache-write prompt tokens, when reported |
| root / iteration | `gen_ai.usage.reasoning.output_tokens` | reasoning/thinking tokens, when reported |
| root / iteration | `tanstack.ai.usage.duration_seconds` | duration-based billing (e.g. transcription), when reported |
| root / iteration | `tanstack.ai.usage.upstream_cost` | gateway upstream cost (e.g. OpenRouter), when reported |
| root / iteration | `tanstack.ai.usage.upstream_input_cost` | upstream input cost split, when reported |
| root / iteration | `tanstack.ai.usage.upstream_output_cost` | upstream output cost split, when reported |
| iteration | `gen_ai.response.finish_reasons` | `[stop]`, `[tool_calls]`, ... |
| root | `gen_ai.usage.input_tokens` | rolled up |
| root | `gen_ai.usage.output_tokens` | rolled up |
Expand All @@ -81,6 +90,8 @@ Iteration spans are numbered (`#0`, `#1`, ...) so distinct iterations of the sam
| tool | `gen_ai.tool.type` | `function` |
| tool | `tanstack.ai.tool.outcome` | `success` / `error` |

Usage attributes beyond input/output tokens are emitted only when the provider reports them, so spans stay clean otherwise. Cache and reasoning breakdowns use the official GenAI semconv names; `gen_ai.usage.cost` and `gen_ai.usage.total_tokens` are de-facto extensions consumed directly by backends like PostHog — without them, backends re-derive cost from their own price tables and lose cache discounts and gateway markup. Fields with no established convention (duration-based billing, the upstream cost split) are TanStack-namespaced.

### Metrics

Two GenAI-standard histograms:
Expand Down Expand Up @@ -164,6 +175,42 @@ otelMiddleware({
})
```

## Beyond chat: media activities

`otelMiddleware` covers `chat()`. The media activities — `generateImage`, `generateVideo`, `generateAudio`, `generateSpeech`, and `generateTranscription` — are single request → response (or submit → poll for video), so instead of the chat middleware pipeline they take a lighter **observer**. Pass `otelObserver()` (from the `@tanstack/ai/observability` subpath) on the activity's `observers` option to emit one span per call:

```ts
import { generateImage } from '@tanstack/ai'
import { otelObserver } from '@tanstack/ai/observability'
import { openaiImage } from '@tanstack/ai-openai'
import { trace, metrics } from '@opentelemetry/api'

const observer = otelObserver({
tracer: trace.getTracer('my-app'),
meter: metrics.getMeter('my-app'),
})

const result = await generateImage({
adapter: openaiImage('gpt-image-1'),
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated
prompt: 'A serene mountain landscape at sunset',
observers: [observer],
})
```

Each call produces one `CLIENT` span tagged with the activity's `gen_ai.operation.name`:

| Activity | `gen_ai.operation.name` |
| --- | --- |
| `generateImage` | `image_generation` |
| `generateVideo` | `video_generation` |
| `generateAudio` | `audio_generation` |
| `generateSpeech` | `text_to_speech` |
| `generateTranscription` | `transcription` |

The span carries `gen_ai.system` and `gen_ai.request.model` at start and, on finish, the same `gen_ai.usage.*` / `tanstack.ai.usage.*` attributes documented above — including `tanstack.ai.usage.units_billed` for unit-billed media. When a `Meter` is supplied it records the `gen_ai.client.operation.duration` histogram, tagged per activity. For streaming video the span covers the full create → poll → complete lifecycle; for non-streaming `generateVideo` it covers job submission.

`otelObserver` supports the same `spanNameFormatter` and `attributeEnricher` extension points. For a custom backend, implement the `ActivityObserver` contract (`onStart` / `onFinish` / `onError`) directly — its event payload is discriminated by `activity`. The observer types are exported from the package root; the `otelObserver` value lives on the `@tanstack/ai/observability` subpath so importing `@tanstack/ai` never requires the optional `@opentelemetry/api` peer.

## Related

- [Middleware](./middleware) — the lifecycle this middleware hooks into
Expand Down
3 changes: 2 additions & 1 deletion docs/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,8 @@
{
"label": "OpenTelemetry",
"to": "advanced/otel",
"addedAt": "2026-05-08"
"addedAt": "2026-05-08",
"updatedAt": "2026-06-15"
}
]
},
Expand Down
4 changes: 4 additions & 0 deletions packages/ai/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@
"types": "./dist/esm/middlewares/otel.d.ts",
"import": "./dist/esm/middlewares/otel.js"
},
"./observability": {
"types": "./dist/esm/observability/index.d.ts",
"import": "./dist/esm/observability/index.js"
},
"./adapter-internals": {
"types": "./dist/esm/adapter-internals.d.ts",
"import": "./dist/esm/adapter-internals.js"
Expand Down
51 changes: 50 additions & 1 deletion packages/ai/src/activities/generateAudio/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,14 @@
import { aiEventClient } from '@tanstack/ai-event-client'
import { streamGenerationResult } from '../stream-generation-result.js'
import { resolveDebugOption } from '../../logger/resolve'
import {
notifyObserverError,
notifyObserverFinish,
notifyObserverStart,
} from '../../observability/notify'
import type { InternalLogger } from '../../logger/internal-logger'
import type { DebugOption } from '../../logger/types'
import type { ActivityObserver } from '../../observability/types'
import type { AudioAdapter } from './adapter'
import type { AudioGenerationResult, StreamChunk } from '../../types'

Expand Down Expand Up @@ -70,6 +76,12 @@ export interface AudioActivityOptions<
* control and/or a custom `Logger`.
*/
debug?: DebugOption
/**
* Observability hooks notified on start, success, and error. Pass
* `otelObserver()` to emit OpenTelemetry spans, or implement the
* `ActivityObserver` contract for a custom backend.
*/
observers?: Array<ActivityObserver>
}

// ===========================
Expand Down Expand Up @@ -135,7 +147,7 @@ async function runGenerateAudio<
>(
options: AudioActivityOptions<TAdapter, boolean>,
): Promise<AudioGenerationResult> {
const { adapter, stream: _stream, debug: _debug, ...rest } = options
const { adapter, stream: _stream, debug: _debug, observers, ...rest } = options
const model = adapter.model
const requestId = createId('audio')
const startTime = Date.now()
Expand All @@ -145,6 +157,18 @@ async function runGenerateAudio<
(adapter as { name?: string }).name ??
'unknown'

await notifyObserverStart(
observers,
{
activity: 'audio',
requestId,
provider: adapter.name,
model,
modelOptions: rest.modelOptions,
},
logger,
)

aiEventClient.emit('audio:request:started', {
requestId,
provider: adapter.name,
Expand Down Expand Up @@ -189,6 +213,19 @@ async function runGenerateAudio<
audioDuration: result.audio.duration,
})

await notifyObserverFinish(
observers,
{
activity: 'audio',
requestId,
provider: adapter.name,
model,
durationMs: elapsedMs,
usage: result.usage,
},
logger,
)

return result
} catch (error) {
const elapsedMs = Date.now() - startTime
Expand All @@ -202,6 +239,18 @@ async function runGenerateAudio<
modelOptions: rest.modelOptions as Record<string, unknown> | undefined,
timestamp: Date.now(),
})
await notifyObserverError(
observers,
{
activity: 'audio',
requestId,
provider: adapter.name,
model,
durationMs: elapsedMs,
error,
},
logger,
)
logger.errors('generateAudio activity failed', {
error,
source: 'generateAudio',
Expand Down
51 changes: 50 additions & 1 deletion packages/ai/src/activities/generateImage/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,14 @@
import { aiEventClient } from '@tanstack/ai-event-client'
import { streamGenerationResult } from '../stream-generation-result.js'
import { resolveDebugOption } from '../../logger/resolve'
import {
notifyObserverError,
notifyObserverFinish,
notifyObserverStart,
} from '../../observability/notify'
import type { InternalLogger } from '../../logger/internal-logger'
import type { DebugOption } from '../../logger/types'
import type { ActivityObserver } from '../../observability/types'
import type { ImageAdapter } from './adapter'
import type { ImageGenerationResult, StreamChunk } from '../../types'

Expand Down Expand Up @@ -92,6 +98,12 @@ export type ImageActivityOptions<
* control and/or a custom `Logger`.
*/
debug?: DebugOption
/**
* Observability hooks notified on start, success, and error. Pass
* `otelObserver()` to emit OpenTelemetry spans, or implement the
* `ActivityObserver` contract for a custom backend.
*/
observers?: Array<ActivityObserver>
} & ({} extends ImageProviderOptionsForModel<TAdapter, TAdapter['model']>
? {
/** Provider-specific options for image generation */ modelOptions?: ImageProviderOptionsForModel<
Expand Down Expand Up @@ -197,12 +209,24 @@ async function runGenerateImage<
>(
options: ImageActivityOptions<TAdapter, boolean>,
): Promise<ImageGenerationResult> {
const { adapter, stream: _stream, debug: _debug, ...rest } = options
const { adapter, stream: _stream, debug: _debug, observers, ...rest } = options
const model = adapter.model
const requestId = createId('image')
const startTime = Date.now()
const logger: InternalLogger = resolveDebugOption(options.debug)

await notifyObserverStart(
observers,
{
activity: 'image',
requestId,
provider: adapter.name,
model,
modelOptions: rest.modelOptions,
},
logger,
)

aiEventClient.emit('image:request:started', {
requestId,
provider: adapter.name,
Expand Down Expand Up @@ -255,8 +279,33 @@ async function runGenerateImage<
count: result.images.length,
})

await notifyObserverFinish(
observers,
{
activity: 'image',
requestId,
provider: adapter.name,
model,
durationMs: duration,
usage: result.usage,
},
logger,
)

return result
} catch (error) {
await notifyObserverError(
observers,
{
activity: 'image',
requestId,
provider: adapter.name,
model,
durationMs: Date.now() - startTime,
error,
},
logger,
)
logger.errors('generateImage activity failed', {
error,
source: 'generateImage',
Expand Down
51 changes: 50 additions & 1 deletion packages/ai/src/activities/generateSpeech/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,14 @@
import { aiEventClient } from '@tanstack/ai-event-client'
import { streamGenerationResult } from '../stream-generation-result.js'
import { resolveDebugOption } from '../../logger/resolve'
import {
notifyObserverError,
notifyObserverFinish,
notifyObserverStart,
} from '../../observability/notify'
import type { InternalLogger } from '../../logger/internal-logger'
import type { DebugOption } from '../../logger/types'
import type { ActivityObserver } from '../../observability/types'
import type { TTSAdapter } from './adapter'
import type { StreamChunk, TTSResult } from '../../types'

Expand Down Expand Up @@ -73,6 +79,12 @@ export interface TTSActivityOptions<
* control and/or a custom `Logger`.
*/
debug?: DebugOption
/**
* Observability hooks notified on start, success, and error. Pass
* `otelObserver()` to emit OpenTelemetry spans, or implement the
* `ActivityObserver` contract for a custom backend.
*/
observers?: Array<ActivityObserver>
}

// ===========================
Expand Down Expand Up @@ -143,7 +155,7 @@ export function generateSpeech<
async function runGenerateSpeech<
TAdapter extends TTSAdapter<string, TTSProviderOptions<TAdapter>>,
>(options: TTSActivityOptions<TAdapter, boolean>): Promise<TTSResult> {
const { adapter, stream: _stream, debug: _debug, ...rest } = options
const { adapter, stream: _stream, debug: _debug, observers, ...rest } = options
const model = adapter.model
const requestId = createId('speech')
const startTime = Date.now()
Expand All @@ -153,6 +165,18 @@ async function runGenerateSpeech<
(adapter as { name?: string }).name ??
'unknown'

await notifyObserverStart(
observers,
{
activity: 'speech',
requestId,
provider: adapter.name,
model,
modelOptions: rest.modelOptions,
},
logger,
)

aiEventClient.emit('speech:request:started', {
requestId,
provider: adapter.name,
Expand Down Expand Up @@ -202,6 +226,19 @@ async function runGenerateSpeech<
contentType: result.contentType,
})

await notifyObserverFinish(
observers,
{
activity: 'speech',
requestId,
provider: adapter.name,
model,
durationMs: duration,
usage: result.usage,
},
logger,
)

return result
} catch (error) {
const duration = Date.now() - startTime
Expand All @@ -215,6 +252,18 @@ async function runGenerateSpeech<
modelOptions: rest.modelOptions as Record<string, unknown> | undefined,
timestamp: Date.now(),
})
await notifyObserverError(
observers,
{
activity: 'speech',
requestId,
provider: adapter.name,
model,
durationMs: duration,
error,
},
logger,
)
logger.errors('generateSpeech activity failed', {
error,
source: 'generateSpeech',
Expand Down
Loading