feat(ai): activity-agnostic observability hook for media activities (#720) #760
season179 wants to merge 3 commits into
…g details)

otelMiddleware only emitted gen_ai.usage.input_tokens/output_tokens even though TokenUsage already carries provider-reported cost, total tokens, cache/reasoning breakdowns, and duration-based billing. Backends like PostHog had to re-derive cost from their own price tables, losing cache discounts and gateway markup (OpenRouter), and duration-billed activities had no cost signal at all.

A shared usageAttributes() helper now builds the full guarded attribute set at all three emission sites (RUN_FINISHED chunk, onUsage, onFinish rollup):

- gen_ai.usage.total_tokens / gen_ai.usage.cost (de-facto extensions consumed directly by PostHog and LiteLLM-style backends)
- gen_ai.usage.cache_read.input_tokens, cache_creation.input_tokens, reasoning.output_tokens (official GenAI semconv names)
- tanstack.ai.usage.duration_seconds and the upstream cost split (no semconv equivalent exists)

E2E: new /api/otel-usage route drives the existing openai-usage-details and openrouter-cost aimock mounts through otelMiddleware with a local capture tracer; middleware.spec.ts asserts the attributes land on iteration and root spans.

Fixes TanStack#721
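The guarded attribute set described in this commit message can be sketched roughly as follows. This is a minimal illustration, not the PR's actual `usageAttributes()` implementation: the `UsageLike` field names are hypothetical (the real `TokenUsage` shape is not shown here), and "guarded" is assumed to mean that only finite numeric values are emitted. The attribute keys are the ones named above; the upstream cost split is omitted because its key is not given.

```typescript
// Hypothetical input shape: these field names are assumptions for illustration,
// not the real TokenUsage type.
interface UsageLike {
  inputTokens?: number
  outputTokens?: number
  totalTokens?: number
  cost?: number
  cacheReadInputTokens?: number
  cacheCreationInputTokens?: number
  reasoningOutputTokens?: number
  durationSeconds?: number
}

type AttributeMap = Record<string, number>

// Guard: only finite numbers pass, so undefined or NaN never reaches a span.
function isFiniteNumber(value: unknown): value is number {
  return typeof value === 'number' && Number.isFinite(value)
}

// Builds the attribute map from whatever usage fields are present.
function usageAttributesSketch(usage: UsageLike | undefined): AttributeMap {
  const attrs: AttributeMap = {}
  if (!usage) return attrs

  const candidates: Array<[string, number | undefined]> = [
    ['gen_ai.usage.input_tokens', usage.inputTokens],
    ['gen_ai.usage.output_tokens', usage.outputTokens],
    ['gen_ai.usage.total_tokens', usage.totalTokens],
    ['gen_ai.usage.cost', usage.cost],
    ['gen_ai.usage.cache_read.input_tokens', usage.cacheReadInputTokens],
    ['gen_ai.usage.cache_creation.input_tokens', usage.cacheCreationInputTokens],
    ['gen_ai.usage.reasoning.output_tokens', usage.reasoningOutputTokens],
    ['tanstack.ai.usage.duration_seconds', usage.durationSeconds],
  ]

  for (const [key, value] of candidates) {
    if (isFiniteNumber(value)) attrs[key] = value
  }
  return attrs
}
```

Routing all three emission sites through one builder like this keeps the attribute names from drifting apart between the chunk, `onUsage`, and `onFinish` paths.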
…anStack#720)

Add an `ActivityObserver` contract (onStart/onFinish/onError, payload discriminated by `activity`) registerable on any activity via a new `observers` option. Observers are awaited in order and strictly non-fatal: a throwing observer is logged and skipped, never breaking the activity. Wire it into generateImage, generateVideo, generateAudio, generateSpeech, and generateTranscription. Chat stays on otelMiddleware; the ActivityKind type includes `chat` for a future unified contract.

Ship `otelObserver()` on a new `@tanstack/ai/observability` subpath: one gen_ai.* span per call tagged with the correct gen_ai.operation.name, reusing the shared usage-attribute set (now including tanstack.ai.usage.units_billed) and the gen_ai.client.operation.duration histogram when a Meter is supplied. The observer types are exported from the package root while the otelObserver value lives only on the subpath, so importing @tanstack/ai never requires the optional @opentelemetry/api peer.

Streaming generateVideo covers the full create/poll/complete lifecycle and ends its span even if the consumer abandons the stream mid-poll; non-streaming generateVideo records a submit span.
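The "awaited in order and strictly non-fatal" rule from this commit message can be sketched as below. This is an assumption-laden illustration rather than the PR's `notifyObserverStart`: the `ObserverSketch`, `StartEventSketch`, and `LoggerSketch` types are hypothetical stand-ins, and only the start callback is shown.

```typescript
// Hypothetical, simplified types: the real ActivityObserver payloads carry more fields.
interface StartEventSketch {
  activity: 'image' | 'video' | 'audio' | 'speech' | 'transcription'
  requestId: string
}

interface ObserverSketch {
  onStart?: (event: StartEventSketch) => void | Promise<void>
}

interface LoggerSketch {
  warn: (message: string, error: unknown) => void
}

// Calls each observer in registration order, awaiting one before starting the next.
// A throwing or rejecting observer is logged and skipped, so the activity never fails.
async function notifyStartSketch(
  observers: ReadonlyArray<ObserverSketch> | undefined,
  event: StartEventSketch,
  logger: LoggerSketch,
): Promise<void> {
  for (const observer of observers ?? []) {
    try {
      await observer.onStart?.(event)
    } catch (error) {
      logger.warn('observer onStart failed', error)
    }
  }
}
```

Awaiting sequentially trades a little latency for deterministic ordering; the per-observer `try`/`catch` is what keeps one faulty observer from hiding the rest.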
No actionable comments were generated in the recent review. 🎉
ℹ️ Recent review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID:
📒 Files selected for processing (5)
✅ Files skipped from review due to trivial changes (1)
🚧 Files skipped from review as they are similar to previous changes (4)
📝 Walkthrough
Adds activity-agnostic observability to
Changes
Activity Observability and Full OTel Usage
Sequence Diagram(s)
sequenceDiagram
participant Caller
participant MediaActivity as Media Activity<br/>(generateImage/etc.)
participant notifyObserver
participant ActivityObserver as ActivityObserver<br/>(e.g. otelObserver)
participant Adapter
Caller->>MediaActivity: generateImage({ observers: [otelObserver] })
MediaActivity->>notifyObserver: notifyObserverStart(observers, startEvent)
notifyObserver->>ActivityObserver: onStart(ActivityStartEvent)
ActivityObserver->>ActivityObserver: startSpan(gen_ai.operation.name=image_generation)
MediaActivity->>Adapter: adapter.generateImage(request)
Adapter-->>MediaActivity: result + usage
MediaActivity->>notifyObserver: notifyObserverFinish(observers, finishEvent + durationMs + usage)
notifyObserver->>ActivityObserver: onFinish(ActivityFinishEvent)
ActivityObserver->>ActivityObserver: span.setAttributes(usageAttributes)<br/>span.end()<br/>recordDuration()
MediaActivity-->>Caller: result
Estimated code review effort
🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks | ✅ 4 | ❌ 1
❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
Actionable comments posted: 3
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@docs/advanced/otel.md`:
- Line 194: The openaiImage adapter example in the documentation is using an
outdated model identifier. In the openaiImage function call, replace the model
parameter from 'gpt-image-1' to 'gpt-image-2' to reflect the latest OpenAI image
generation model as defined in the adapter's model-meta.ts file.
In `@packages/ai/src/activities/generateVideo/index.ts`:
- Around line 375-385: The observer lifecycle is mismanaged: notifyObserverStart
at lines 375 and around 395-397 occurs after yields, allowing early breaks to
emit onError without onStart, and notifyObserverFinish at lines 459-472 occurs
after yielding generation:result, causing the finally block (lines 518-539) to
see settled === false and emit a synthetic cancellation error. Move both
notifyObserverStart calls to execute before any yields in the function, and move
notifyObserverFinish to execute before the generation:result yield (or set
settled === true before yielding), so that the finally block's settled flag
accurately reflects whether the observer lifecycle was properly started and
completed.
In `@testing/e2e/src/routes/api.otel-media.ts`:
- Around line 117-127: The payload validation is missing before destructuring
properties from the data object. Currently, if body.forwardedProps, body.data,
or body is null or a primitive value, or if required fields (prompt, provider)
are missing, the destructuring assignment will throw an unhandled error
resulting in a 500 response instead of the proper error response. Add validation
logic before the destructuring of prompt, provider, testId, and aimockPort from
the data object to check that data is a valid object with the required
properties, and return { ok: false, error } with an appropriate error message if
validation fails. Only proceed to createImageAdapter and
createLocalCaptureTracer after successful validation.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 3e1da7aa-1e1b-402b-98f8-1b412af95984
📒 Files selected for processing (27)
- .changeset/activity-observers.md
- .changeset/otel-full-usage-emission.md
- docs/advanced/otel.md
- docs/config.json
- packages/ai/package.json
- packages/ai/src/activities/generateAudio/index.ts
- packages/ai/src/activities/generateImage/index.ts
- packages/ai/src/activities/generateSpeech/index.ts
- packages/ai/src/activities/generateTranscription/index.ts
- packages/ai/src/activities/generateVideo/index.ts
- packages/ai/src/index.ts
- packages/ai/src/middlewares/otel.ts
- packages/ai/src/observability/index.ts
- packages/ai/src/observability/notify.ts
- packages/ai/src/observability/otel.ts
- packages/ai/src/observability/types.ts
- packages/ai/src/observability/usage-attributes.ts
- packages/ai/src/utilities/errors.ts
- packages/ai/src/utilities/numbers.ts
- packages/ai/tests/middlewares/otel.test.ts
- packages/ai/tests/observability/activity-observers.test.ts
- packages/ai/tests/observability/otel-observer.test.ts
- packages/ai/vite.config.ts
- testing/e2e/src/routeTree.gen.ts
- testing/e2e/src/routes/api.otel-media.ts
- testing/e2e/src/routes/api.otel-usage.ts
- testing/e2e/tests/middleware.spec.ts
    await notifyObserverStart(
      observers,
      {
        activity: 'video',
        requestId,
        provider: adapter.name,
        model,
        modelOptions,
      },
      logger,
    )
Observer terminal state can be misreported as cancelled on successful completion.
At Line 459, notifyObserverFinish runs after yielding generation:result; if the consumer breaks at that yield, finally (Line 519) still sees settled === false and emits a synthetic cancellation error. Also, notifyObserverStart is currently after the first yield (Line 375), so an early break can produce onError without onStart.
Suggested fix
-    yield {
-      type: 'RUN_STARTED',
-      runId,
-      threadId,
-      timestamp: Date.now(),
-    } as StreamChunk
-
     await notifyObserverStart(
       observers,
       {
         activity: 'video',
         requestId,
         provider: adapter.name,
         model,
         modelOptions,
       },
       logger,
     )
+
+    yield {
+      type: 'RUN_STARTED',
+      runId,
+      threadId,
+      timestamp: Date.now(),
+    } as StreamChunk
 ...
-    yield {
-      type: 'CUSTOM',
-      name: 'generation:result',
-      value: {
-        jobId: jobResult.jobId,
-        status: 'completed',
-        url: urlResult.url,
-        expiresAt: urlResult.expiresAt,
-        ...(urlResult.usage ? { usage: urlResult.usage } : {}),
-      },
-      timestamp: Date.now(),
-    } as StreamChunk
-
+    settled = true
     await notifyObserverFinish(
       observers,
       {
         activity: 'video',
         requestId,
         provider: adapter.name,
         model,
         durationMs: Date.now() - obsStartTime,
         usage: urlResult.usage,
       },
       logger,
     )
-    settled = true
+
+    yield {
+      type: 'CUSTOM',
+      name: 'generation:result',
+      value: {
+        jobId: jobResult.jobId,
+        status: 'completed',
+        url: urlResult.url,
+        expiresAt: urlResult.expiresAt,
+        ...(urlResult.usage ? { usage: urlResult.usage } : {}),
+      },
+      timestamp: Date.now(),
+    } as StreamChunk
Also applies to: 395-397, 459-472, 518-539
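The ordering problem in this comment comes from how async generators behave when a consumer stops early: breaking out of `for await` calls the generator's `return()`, which runs the `finally` block at the suspended `yield`, and any code after that `yield` never executes. The sketch below reproduces this with a hypothetical, stripped-down stream; the chunk types and the `finishBeforeYield` switch exist only for the demonstration and are not part of generateVideo.

```typescript
// Simplified chunk types for the demo; the real stream yields StreamChunk values.
type ChunkSketch =
  | { type: 'RUN_STARTED' }
  | { type: 'RESULT'; url: string }
  | { type: 'RUN_FINISHED' }

async function* streamSketch(
  events: string[],
  finishBeforeYield: boolean,
): AsyncGenerator<ChunkSketch> {
  let settled = false
  events.push('onStart') // notified before the first yield
  try {
    yield { type: 'RUN_STARTED' }
    if (finishBeforeYield) {
      settled = true
      events.push('onFinish')
    }
    yield { type: 'RESULT', url: 'https://example.invalid/video.mp4' }
    if (!finishBeforeYield) {
      // Never reached when the consumer breaks at the RESULT chunk.
      settled = true
      events.push('onFinish')
    }
    yield { type: 'RUN_FINISHED' }
  } finally {
    // Runs on normal completion and on early abandonment.
    if (!settled) events.push('onError(cancelled)')
  }
}

// Consumer that stops reading as soon as it has the result.
async function consumeUntilResult(finishBeforeYield: boolean): Promise<string[]> {
  const events: string[] = []
  for await (const chunk of streamSketch(events, finishBeforeYield)) {
    if (chunk.type === 'RESULT') break
  }
  return events
}
```

With `finishBeforeYield` set to `false` the recorded lifecycle is `onStart` followed by `onError(cancelled)`; with `true` it is `onStart` followed by `onFinish`, which is the behavior the suggested fix aims for.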
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@packages/ai/src/activities/generateVideo/index.ts` around lines 375 - 385,
The observer lifecycle is mismanaged: notifyObserverStart at lines 375 and
around 395-397 occurs after yields, allowing early breaks to emit onError
without onStart, and notifyObserverFinish at lines 459-472 occurs after yielding
generation:result, causing the finally block (lines 518-539) to see settled ===
false and emit a synthetic cancellation error. Move both notifyObserverStart
calls to execute before any yields in the function, and move
notifyObserverFinish to execute before the generation:result yield (or set
settled === true before yielding), so that the finally block's settled flag
accurately reflects whether the observer lifecycle was properly started and
completed.
    const body = await request.json()
    const data = body.forwardedProps ?? body.data ?? body
    const { prompt, provider, testId, aimockPort } = data as {
      prompt: string
      provider: Provider
      testId?: string
      aimockPort?: number
    }

    const adapter = createImageAdapter(provider, aimockPort, testId)
    const { tracer, spans } = createLocalCaptureTracer()
Guard payload shape before property access/destructuring.
body.forwardedProps ?? body.data ?? body and the typed destructure run before the handler’s error response path. A null/primitive payload (or missing required fields) can throw and surface as an unhandled 500 instead of { ok: false, error }.
Suggested fix
-    const body = await request.json()
-    const data = body.forwardedProps ?? body.data ?? body
-    const { prompt, provider, testId, aimockPort } = data as {
-      prompt: string
-      provider: Provider
-      testId?: string
-      aimockPort?: number
-    }
-
-    const adapter = createImageAdapter(provider, aimockPort, testId)
+    let prompt: string
+    let provider: Provider
+    let testId: string | undefined
+    let aimockPort: number | undefined
+    let adapter: ReturnType<typeof createImageAdapter>
+
+    try {
+      const body: unknown = await request.json()
+      const container =
+        body && typeof body === 'object'
+          ? ((body as Record<string, unknown>).forwardedProps ??
+            (body as Record<string, unknown>).data ??
+            body)
+          : null
+      if (!container || typeof container !== 'object') {
+        throw new Error('Invalid request body')
+      }
+      const raw = container as Record<string, unknown>
+      if (typeof raw.prompt !== 'string' || typeof raw.provider !== 'string') {
+        throw new Error('Missing required fields: prompt/provider')
+      }
+      prompt = raw.prompt
+      provider = raw.provider as Provider
+      testId = typeof raw.testId === 'string' ? raw.testId : undefined
+      aimockPort = typeof raw.aimockPort === 'number' ? raw.aimockPort : undefined
+      adapter = createImageAdapter(provider, aimockPort, testId)
+    } catch (error) {
+      return new Response(
+        JSON.stringify({
+          ok: false,
+          error: error instanceof Error ? error.message : String(error),
+        }),
+        { status: 200, headers: { 'Content-Type': 'application/json' } },
+      )
+    }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@testing/e2e/src/routes/api.otel-media.ts` around lines 117 - 127, The payload
validation is missing before destructuring properties from the data object.
Currently, if body.forwardedProps, body.data, or body is null or a primitive
value, or if required fields (prompt, provider) are missing, the destructuring
assignment will throw an unhandled error resulting in a 500 response instead of
the proper error response. Add validation logic before the destructuring of
prompt, provider, testId, and aimockPort from the data object to check that data
is a valid object with the required properties, and return { ok: false, error }
with an appropriate error message if validation fails. Only proceed to
createImageAdapter and createLocalCaptureTracer after successful validation.
Move notifyObserverFinish + settled=true ahead of the generation:result yield in streaming generateVideo. Previously they ran after the yield, so a consumer that stopped reading once it had the result (without pulling RUN_FINISHED) tripped the finally cleanup with settled still false, reporting a spurious cancellation onError and never firing onFinish. Add a regression test for that abandonment case.

Also use the latest gpt-image-2 model id in the otel docs and otelObserver JSDoc examples, and clear lint (import order, redundant casts) in the otel-media e2e route.
Summary
Closes #720.
Adds an activity-agnostic observability hook so non-chat activities (image, video, audio, speech, transcription) get the same OpenTelemetry coverage chat already has through `otelMiddleware`.
An `ActivityObserver` is a small contract with three optional callbacks (`onStart`, `onFinish`, `onError`) whose payloads are discriminated by an `activity` field. Observers are registered per call via a new `observers` option, awaited in registration order, and strictly non-fatal: a throwing observer is logged and skipped, never breaking the activity.
The first observer shipped is `otelObserver()`, exported from a new `@tanstack/ai/observability` subpath. It opens one `gen_ai.*` span per call, tagged with the right `gen_ai.operation.name` for the activity (`image_generation`, `video_generation`, `audio_generation`, `text_to_speech`, `transcription`), and reuses the shared usage-attribute set so cost, totals, cache/reasoning detail, duration billing, and media unit counts land identically across activities. When a `Meter` is supplied it also records the `gen_ai.client.operation.duration` histogram, the same metric the chat middleware emits.
Stacked on #747
This is stacked on #747 (`feat/otel-full-usage-emission`), which introduces the shared `usageAttributes()` helper this PR consumes. Merge #747 first.
Until #747 merges, the diff below includes its single commit (`c7df2a33`, "emit full usage on otel spans") in addition to the #720 work; review that commit in #747, not here. Once #747 lands in `main`, this PR's diff narrows automatically to just the activity-observer changes.
Test plan
- `packages/ai/tests/observability/`: observer notification semantics, `otelObserver` span/metric behavior, and a regression test for the streaming-video abandonment case (20 tests, green).
- `testing/e2e` route + spec exercising `otelObserver` on `generateImage` through the mock transport.
- `pnpm test:pr` green locally (sherif, knip, docs, eslint, lib, types, build) plus the E2E suite.
Summary by CodeRabbit
- `otelObserver()` to automatically emit OpenTelemetry `gen_ai.*` spans per activity, including correct operation names and optional duration metrics.
- `@tanstack/ai/observability` subpath.
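As a rough illustration of the one-span-per-call behavior described in the Summary, the sketch below maps each activity to its operation name and ends the span exactly once. It is not the PR's `otelObserver()`: the tracer and span interfaces are minimal local stand-ins rather than `@opentelemetry/api` types, the activity keys other than `video` are assumed, and usage attributes and the duration histogram are left out.

```typescript
// Activity keys are assumed; the operation names are the ones listed in the PR description.
type MediaActivitySketch = 'image' | 'video' | 'audio' | 'speech' | 'transcription'

const OPERATION_NAME: Record<MediaActivitySketch, string> = {
  image: 'image_generation',
  video: 'video_generation',
  audio: 'audio_generation',
  speech: 'text_to_speech',
  transcription: 'transcription',
}

// Minimal stand-ins for a tracer and span, so the sketch has no dependencies.
interface SpanSketch {
  setAttribute(key: string, value: string | number): void
  end(): void
}
interface TracerSketch {
  startSpan(name: string): SpanSketch
}

interface ActivityEventSketch {
  activity: MediaActivitySketch
  requestId: string
}

function spanObserverSketch(tracer: TracerSketch) {
  const openSpans = new Map<string, SpanSketch>()

  // Ends the span for a request at most once, whichever terminal callback fires first.
  const endSpan = (requestId: string, errorType?: string): void => {
    const span = openSpans.get(requestId)
    if (!span) return
    openSpans.delete(requestId)
    if (errorType !== undefined) span.setAttribute('error.type', errorType)
    span.end()
  }

  return {
    onStart(event: ActivityEventSketch): void {
      const operation = OPERATION_NAME[event.activity]
      const span = tracer.startSpan(operation)
      span.setAttribute('gen_ai.operation.name', operation)
      openSpans.set(event.requestId, span)
    },
    onFinish(event: ActivityEventSketch): void {
      endSpan(event.requestId)
    },
    onError(event: ActivityEventSketch & { error: unknown }): void {
      const errorType = event.error instanceof Error ? event.error.name : 'unknown'
      endSpan(event.requestId, errorType)
    },
  }
}
```

Keying open spans by `requestId` is one way to let a single observer instance serve concurrent calls; whether the real observer tracks spans this way is not stated in the PR text.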