Skip to content

Commit 158cbaf

Browse files
SebConejo and claude authored
fix(proxy): capture streaming token usage for passthrough providers (mnfst#1567)
Two fixes for streaming responses logging 0 tokens: 1. Flush the passthrough buffer after stream ends so usage from the final SSE chunk is captured even without trailing \n\n. 2. Inject stream_options.include_usage for OpenAI/OpenRouter streaming requests so providers always send usage data — needed for both Manifest DB logging and downstream client context management. Closes mnfst#1477, closes mnfst#1502 Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 34ab548 commit 158cbaf

File tree

4 files changed

+174
-0
lines changed

4 files changed

+174
-0
lines changed

packages/backend/src/routing/proxy/__tests__/provider-client.spec.ts

Lines changed: 78 additions & 0 deletions
Original file line number · Diff line number · Diff line change
@@ -1385,6 +1385,84 @@ describe('ProviderClient', () => {
13851385
});
13861386
});
13871387

1388+
describe('stream_options.include_usage injection', () => {
1389+
it('injects stream_options.include_usage for OpenAI streaming requests', async () => {
1390+
mockFetch.mockResolvedValue(new Response('{}', { status: 200 }));
1391+
await client.forward({
1392+
provider: 'openai',
1393+
apiKey: 'sk-test',
1394+
model: 'gpt-4o',
1395+
body: { messages: [{ role: 'user', content: 'Hello' }] },
1396+
stream: true,
1397+
});
1398+
1399+
const sentBody = JSON.parse(mockFetch.mock.calls[0][1].body);
1400+
expect(sentBody.stream_options).toEqual({ include_usage: true });
1401+
});
1402+
1403+
it('injects stream_options.include_usage for OpenRouter streaming requests', async () => {
1404+
mockFetch.mockResolvedValue(new Response('{}', { status: 200 }));
1405+
await client.forward({
1406+
provider: 'openrouter',
1407+
apiKey: 'sk-or',
1408+
model: 'openai/gpt-4o',
1409+
body: { messages: [{ role: 'user', content: 'Hello' }] },
1410+
stream: true,
1411+
});
1412+
1413+
const sentBody = JSON.parse(mockFetch.mock.calls[0][1].body);
1414+
expect(sentBody.stream_options).toEqual({ include_usage: true });
1415+
});
1416+
1417+
it('does not inject stream_options for non-streaming OpenAI requests', async () => {
1418+
mockFetch.mockResolvedValue(new Response('{}', { status: 200 }));
1419+
await client.forward({
1420+
provider: 'openai',
1421+
apiKey: 'sk-test',
1422+
model: 'gpt-4o',
1423+
body: { messages: [{ role: 'user', content: 'Hello' }] },
1424+
stream: false,
1425+
});
1426+
1427+
const sentBody = JSON.parse(mockFetch.mock.calls[0][1].body);
1428+
expect(sentBody.stream_options).toBeUndefined();
1429+
});
1430+
1431+
it('does not inject stream_options for non-passthrough providers', async () => {
1432+
mockFetch.mockResolvedValue(new Response('{}', { status: 200 }));
1433+
await client.forward({
1434+
provider: 'mistral',
1435+
apiKey: 'sk-mi',
1436+
model: 'mistral-small',
1437+
body: { messages: [{ role: 'user', content: 'Hello' }] },
1438+
stream: true,
1439+
});
1440+
1441+
const sentBody = JSON.parse(mockFetch.mock.calls[0][1].body);
1442+
expect(sentBody.stream_options).toBeUndefined();
1443+
});
1444+
1445+
it('preserves existing stream_options fields when injecting include_usage', async () => {
1446+
mockFetch.mockResolvedValue(new Response('{}', { status: 200 }));
1447+
await client.forward({
1448+
provider: 'openai',
1449+
apiKey: 'sk-test',
1450+
model: 'gpt-4o',
1451+
body: {
1452+
messages: [{ role: 'user', content: 'Hello' }],
1453+
stream_options: { some_field: 'value' },
1454+
},
1455+
stream: true,
1456+
});
1457+
1458+
const sentBody = JSON.parse(mockFetch.mock.calls[0][1].body);
1459+
expect(sentBody.stream_options).toEqual({
1460+
some_field: 'value',
1461+
include_usage: true,
1462+
});
1463+
});
1464+
});
1465+
13881466
describe('Custom endpoint', () => {
13891467
it('uses custom endpoint instead of resolving by provider name', async () => {
13901468
mockFetch.mockResolvedValue(new Response('{}', { status: 200 }));

packages/backend/src/routing/proxy/__tests__/stream-writer.spec.ts

Lines changed: 58 additions & 0 deletions
Original file line number · Diff line number · Diff line change
@@ -484,6 +484,64 @@ describe('pipeStream', () => {
484484
});
485485
});
486486

487+
it('should capture usage from leftover passthrough buffer', async () => {
488+
const { res } = mockResponse();
489+
const usagePayload = JSON.stringify({
490+
choices: [],
491+
usage: { prompt_tokens: 31, completion_tokens: 10, cache_read_tokens: 5 },
492+
});
493+
// Final chunk with usage does NOT end with \n\n — stays in passthroughBuffer
494+
const stream = createReadableStream([
495+
`data: ${JSON.stringify({ choices: [{ delta: { content: 'hi' } }] })}\n\n`,
496+
`data: ${usagePayload}`,
497+
]);
498+
499+
const usage = await pipeStream(stream, res as never);
500+
501+
expect(usage).toEqual({
502+
prompt_tokens: 31,
503+
completion_tokens: 10,
504+
cache_read_tokens: 5,
505+
cache_creation_tokens: undefined,
506+
});
507+
});
508+
509+
it('should handle non-JSON leftover in passthrough buffer', async () => {
510+
const { res } = mockResponse();
511+
const stream = createReadableStream([
512+
`data: ${JSON.stringify({ choices: [{ delta: { content: 'hi' } }] })}\n\n`,
513+
'data: not-valid-json',
514+
]);
515+
516+
const usage = await pipeStream(stream, res as never);
517+
518+
expect(usage).toBeNull();
519+
});
520+
521+
it('should skip [DONE] in passthrough flush buffer', async () => {
522+
const { res } = mockResponse();
523+
const stream = createReadableStream([
524+
`data: ${JSON.stringify({ choices: [{ delta: { content: 'hi' } }] })}\n\n`,
525+
'data: [DONE]',
526+
]);
527+
528+
const usage = await pipeStream(stream, res as never);
529+
530+
expect(usage).toBeNull();
531+
});
532+
533+
it('should not flush passthrough buffer when whitespace-only', async () => {
534+
const { res } = mockResponse();
535+
const stream = createReadableStream([
536+
`data: ${JSON.stringify({ choices: [{ delta: { content: 'hi' } }] })}\n\n`,
537+
' \n ',
538+
]);
539+
540+
const usage = await pipeStream(stream, res as never);
541+
542+
expect(usage).toBeNull();
543+
});
544+
487545
it('should capture usage from leftover buffer in flush section with transform', async () => {
488546
const { res } = mockResponse();
489547
const usagePayload = JSON.stringify({

packages/backend/src/routing/proxy/provider-client.ts

Lines changed: 12 additions & 0 deletions
Original file line number · Diff line number · Diff line change
@@ -119,6 +119,18 @@ export class ProviderClient {
119119
url = `${endpoint.baseUrl}${endpoint.buildPath(bareModel)}`;
120120
headers = endpoint.buildHeaders(apiKey, authType);
121121
const sanitized = sanitizeOpenAiBody(body, endpointKey, model);
122+
123+
// Inject stream_options.include_usage so providers always send token
124+
// usage in streaming responses — needed for both DB logging and
125+
// downstream clients (e.g. OpenClaw context management).
126+
if (stream && (endpointKey === 'openai' || endpointKey === 'openrouter')) {
127+
const existing =
128+
typeof sanitized.stream_options === 'object' && sanitized.stream_options !== null
129+
? (sanitized.stream_options as Record<string, unknown>)
130+
: {};
131+
sanitized.stream_options = { ...existing, include_usage: true };
132+
}
133+
122134
requestBody = { ...sanitized, model: bareModel, stream };
123135

124136
// Inject cache_control for OpenRouter requests targeting Anthropic models

packages/backend/src/routing/proxy/stream-writer.ts

Lines changed: 26 additions & 0 deletions
Original file line number · Diff line number · Diff line change
@@ -144,6 +144,32 @@ export async function pipeStream(
144144
}
145145
}
146146

147+
// Flush any remaining passthrough buffer content for usage extraction.
148+
// The final SSE chunk (containing usage) may not end with \n\n,
149+
// leaving it unparsed in passthroughBuffer.
150+
if (!transform && passthroughBuffer.trim()) {
151+
const payload = passthroughBuffer
152+
.split('\n')
153+
.map((line) => (line.startsWith('data: ') ? line.slice(6) : line))
154+
.join('\n')
155+
.trim();
156+
if (payload && payload !== '[DONE]') {
157+
try {
158+
const obj = JSON.parse(payload);
159+
if (obj.usage && typeof obj.usage.prompt_tokens === 'number') {
160+
capturedUsage = {
161+
prompt_tokens: obj.usage.prompt_tokens,
162+
completion_tokens: obj.usage.completion_tokens ?? 0,
163+
cache_read_tokens: obj.usage.cache_read_tokens,
164+
cache_creation_tokens: obj.usage.cache_creation_tokens,
165+
};
166+
}
167+
} catch {
168+
/* ignore non-JSON remaining content */
169+
}
170+
}
171+
}
172+
147173
// Flush any remaining buffer content through the transform
148174
if (transform && sseBuffer.trim()) {
149175
const payload = sseBuffer

0 commit comments

Comments (0)