Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1385,6 +1385,84 @@ describe('ProviderClient', () => {
});
});

describe('stream_options.include_usage injection', () => {
  // Shared driver for every case in this suite: stub fetch with an empty
  // 200 response, forward the request through the client, then return the
  // JSON-parsed body that was actually handed to fetch so the test can
  // inspect what was sent to the provider.
  const forwardAndReadSentBody = async (request: {
    provider: string;
    apiKey: string;
    model: string;
    body: Record<string, unknown>;
    stream: boolean;
  }) => {
    mockFetch.mockResolvedValue(new Response('{}', { status: 200 }));
    await client.forward(request);
    return JSON.parse(mockFetch.mock.calls[0][1].body);
  };

  it('injects stream_options.include_usage for OpenAI streaming requests', async () => {
    const sentBody = await forwardAndReadSentBody({
      provider: 'openai',
      apiKey: 'sk-test',
      model: 'gpt-4o',
      body: { messages: [{ role: 'user', content: 'Hello' }] },
      stream: true,
    });

    expect(sentBody.stream_options).toEqual({ include_usage: true });
  });

  it('injects stream_options.include_usage for OpenRouter streaming requests', async () => {
    const sentBody = await forwardAndReadSentBody({
      provider: 'openrouter',
      apiKey: 'sk-or',
      model: 'openai/gpt-4o',
      body: { messages: [{ role: 'user', content: 'Hello' }] },
      stream: true,
    });

    expect(sentBody.stream_options).toEqual({ include_usage: true });
  });

  it('does not inject stream_options for non-streaming OpenAI requests', async () => {
    const sentBody = await forwardAndReadSentBody({
      provider: 'openai',
      apiKey: 'sk-test',
      model: 'gpt-4o',
      body: { messages: [{ role: 'user', content: 'Hello' }] },
      stream: false,
    });

    expect(sentBody.stream_options).toBeUndefined();
  });

  it('does not inject stream_options for non-passthrough providers', async () => {
    const sentBody = await forwardAndReadSentBody({
      provider: 'mistral',
      apiKey: 'sk-mi',
      model: 'mistral-small',
      body: { messages: [{ role: 'user', content: 'Hello' }] },
      stream: true,
    });

    expect(sentBody.stream_options).toBeUndefined();
  });

  it('preserves existing stream_options fields when injecting include_usage', async () => {
    const sentBody = await forwardAndReadSentBody({
      provider: 'openai',
      apiKey: 'sk-test',
      model: 'gpt-4o',
      body: {
        messages: [{ role: 'user', content: 'Hello' }],
        stream_options: { some_field: 'value' },
      },
      stream: true,
    });

    expect(sentBody.stream_options).toEqual({
      some_field: 'value',
      include_usage: true,
    });
  });
});

describe('Custom endpoint', () => {
it('uses custom endpoint instead of resolving by provider name', async () => {
mockFetch.mockResolvedValue(new Response('{}', { status: 200 }));
Expand Down
58 changes: 58 additions & 0 deletions packages/backend/src/routing/proxy/__tests__/stream-writer.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,64 @@ describe('pipeStream', () => {
});
});

it('should capture usage from leftover passthrough buffer', async () => {
  const { res } = mockResponse();

  // A normal, fully-terminated delta event followed by a trailing usage
  // event that deliberately lacks the \n\n SSE terminator, so it should
  // remain in passthroughBuffer until the final flush extracts it.
  const deltaEvent = `data: ${JSON.stringify({ choices: [{ delta: { content: 'hi' } }] })}\n\n`;
  const trailingUsageEvent = `data: ${JSON.stringify({
    choices: [],
    usage: { prompt_tokens: 31, completion_tokens: 10, cache_read_tokens: 5 },
  })}`;

  const usage = await pipeStream(
    createReadableStream([deltaEvent, trailingUsageEvent]),
    res as never,
  );

  expect(usage).toEqual({
    prompt_tokens: 31,
    completion_tokens: 10,
    cache_read_tokens: 5,
    cache_creation_tokens: undefined,
  });
});

it('should handle non-JSON leftover in passthrough buffer', async () => {
  const { res } = mockResponse();

  // The unterminated trailing chunk is not parseable JSON; the flush path
  // must swallow it gracefully and report no usage rather than throwing.
  const chunks = [
    `data: ${JSON.stringify({ choices: [{ delta: { content: 'hi' } }] })}\n\n`,
    'data: not-valid-json',
  ];

  const usage = await pipeStream(createReadableStream(chunks), res as never);

  expect(usage).toBeNull();
});

it('should skip [DONE] in passthrough flush buffer', async () => {
  const { res } = mockResponse();

  // An unterminated [DONE] sentinel left in the buffer is not a usage
  // payload and must be skipped by the flush logic.
  const chunks = [
    `data: ${JSON.stringify({ choices: [{ delta: { content: 'hi' } }] })}\n\n`,
    'data: [DONE]',
  ];

  const usage = await pipeStream(createReadableStream(chunks), res as never);

  expect(usage).toBeNull();
});

it('should not flush passthrough buffer when whitespace-only', async () => {
  const { res } = mockResponse();

  // Trailing whitespace alone should not trigger the leftover-buffer
  // parsing path, so no usage is reported.
  const chunks = [
    `data: ${JSON.stringify({ choices: [{ delta: { content: 'hi' } }] })}\n\n`,
    ' \n ',
  ];

  const usage = await pipeStream(createReadableStream(chunks), res as never);

  expect(usage).toBeNull();
});

it('should capture usage from leftover buffer in flush section with transform', async () => {
const { res } = mockResponse();
const usagePayload = JSON.stringify({
Expand Down
12 changes: 12 additions & 0 deletions packages/backend/src/routing/proxy/provider-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,18 @@ export class ProviderClient {
url = `${endpoint.baseUrl}${endpoint.buildPath(bareModel)}`;
headers = endpoint.buildHeaders(apiKey, authType);
const sanitized = sanitizeOpenAiBody(body, endpointKey, model);

// Inject stream_options.include_usage so providers always send token
// usage in streaming responses — needed for both DB logging and
// downstream clients (e.g. OpenClaw context management).
if (stream && (endpointKey === 'openai' || endpointKey === 'openrouter')) {
const existing =
typeof sanitized.stream_options === 'object' && sanitized.stream_options !== null
? (sanitized.stream_options as Record<string, unknown>)
: {};
sanitized.stream_options = { ...existing, include_usage: true };
}

requestBody = { ...sanitized, model: bareModel, stream };

// Inject cache_control for OpenRouter requests targeting Anthropic models
Expand Down
26 changes: 26 additions & 0 deletions packages/backend/src/routing/proxy/stream-writer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,32 @@ export async function pipeStream(
}
}

// Flush any remaining passthrough buffer content for usage extraction.
// The final SSE chunk (containing usage) may not end with \n\n,
// leaving it unparsed in passthroughBuffer.
if (!transform && passthroughBuffer.trim()) {
const payload = passthroughBuffer
.split('\n')
.map((line) => (line.startsWith('data: ') ? line.slice(6) : line))
.join('\n')
.trim();
if (payload && payload !== '[DONE]') {
try {
const obj = JSON.parse(payload);
if (obj.usage && typeof obj.usage.prompt_tokens === 'number') {
capturedUsage = {
prompt_tokens: obj.usage.prompt_tokens,
completion_tokens: obj.usage.completion_tokens ?? 0,
cache_read_tokens: obj.usage.cache_read_tokens,
cache_creation_tokens: obj.usage.cache_creation_tokens,
};
}
} catch {
/* ignore non-JSON remaining content */
}
}
}

// Flush any remaining buffer content through the transform
if (transform && sseBuffer.trim()) {
const payload = sseBuffer
Expand Down
Loading