Skip to content

Commit 3ae887b

Browse files
authored
fix(bedrock): throw APIError for error events delivered in chunk frames (#1021)
1 parent 74ac150 commit 3ae887b

2 files changed

Lines changed: 67 additions & 2 deletions

File tree

packages/bedrock-sdk/src/core/streaming.ts

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ export class Stream<Item> extends CoreStream<Item> {
6363
}
6464
}
6565

66-
// Note: this function is copied entirely from the core SDK
66+
// Note: this function is adapted from the core SDK
6767
async function* iterator(): AsyncIterator<Item, any, undefined> {
6868
if (consumed) {
6969
throw new Error('Cannot iterate over a consumed stream, use `.tee()` to split the stream.');
@@ -73,13 +73,19 @@ export class Stream<Item> extends CoreStream<Item> {
7373
try {
7474
for await (const sse of iterMessages()) {
7575
if (sse.event === 'chunk') {
76+
let parsed;
7677
try {
77-
yield JSON.parse(sse.data);
78+
parsed = JSON.parse(sse.data);
7879
} catch (e) {
7980
logger.error(`Could not parse message into JSON:`, sse.data);
8081
logger.error(`From chunk:`, sse.raw);
8182
throw e;
8283
}
84+
if (parsed && typeof parsed === 'object' && parsed.type === 'error') {
85+
// Anthropic-format error delivered inside a Bedrock chunk frame
86+
throw new APIError(undefined, parsed, undefined, response.headers, parsed.error?.type);
87+
}
88+
yield parsed;
8389
}
8490

8591
if (sse.event === 'error') {
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
import { Readable } from 'node:stream';
2+
import { EventStreamMarshaller } from '@smithy/eventstream-serde-node';
3+
import { APIError } from '@anthropic-ai/sdk';
4+
import { Stream, fromUtf8, toUtf8 } from '../src/core/streaming';
5+
6+
function encodeChunkFrame(payload: unknown): ReadableStream {
7+
const marshaller = new EventStreamMarshaller({ utf8Encoder: toUtf8, utf8Decoder: fromUtf8 });
8+
const inner = JSON.stringify(payload);
9+
const body = fromUtf8(JSON.stringify({ bytes: Buffer.from(inner).toString('base64') }));
10+
const serialized = marshaller.serialize(
11+
(async function* () {
12+
yield {
13+
headers: {
14+
':message-type': { type: 'string', value: 'event' },
15+
':event-type': { type: 'string', value: 'chunk' },
16+
':content-type': { type: 'string', value: 'application/json' },
17+
},
18+
body,
19+
};
20+
})(),
21+
(msg: any) => msg,
22+
);
23+
return Readable.toWeb(Readable.from(serialized)) as ReadableStream;
24+
}
25+
26+
describe('Bedrock Stream.fromSSEResponse', () => {
27+
test('throws APIError when a chunk frame contains an Anthropic error payload', async () => {
28+
const response = new Response(
29+
encodeChunkFrame({ type: 'error', error: { type: 'overloaded_error', message: 'test' } }),
30+
);
31+
const stream = Stream.fromSSEResponse(response, new AbortController());
32+
33+
let caught: unknown;
34+
try {
35+
for await (const _ of stream) {
36+
// consume
37+
}
38+
} catch (e) {
39+
caught = e;
40+
}
41+
42+
expect(caught).toBeInstanceOf(APIError);
43+
expect(String(caught)).not.toContain('Unexpected event order');
44+
expect((caught as APIError).type).toBe('overloaded_error');
45+
});
46+
47+
test('yields normal chunk payloads unchanged', async () => {
48+
const response = new Response(
49+
encodeChunkFrame({ type: 'message_start', message: { id: 'msg_1', role: 'assistant' } }),
50+
);
51+
const stream = Stream.fromSSEResponse<any>(response, new AbortController());
52+
53+
const events: any[] = [];
54+
for await (const ev of stream) events.push(ev);
55+
56+
expect(events).toHaveLength(1);
57+
expect(events[0].type).toBe('message_start');
58+
});
59+
});

0 commit comments

Comments
 (0)