Skip to content

Commit 09a85a8

Browse files
fix: call onerror for silently swallowed transport errors (#1580)
Co-authored-by: Felix Weinberger <fweinberger@anthropic.com>
1 parent e79d14a commit 09a85a8

2 files changed

Lines changed: 183 additions & 1 deletion

File tree

src/server/webStandardStreamableHttp.ts

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -383,6 +383,7 @@ export class WebStandardStreamableHTTPServerTransport implements Transport {
383383
// The client MUST include an Accept header, listing text/event-stream as a supported content type.
384384
const acceptHeader = req.headers.get('accept');
385385
if (!acceptHeader?.includes('text/event-stream')) {
386+
this.onerror?.(new Error('Not Acceptable: Client must accept text/event-stream'));
386387
return this.createJsonErrorResponse(406, -32000, 'Not Acceptable: Client must accept text/event-stream');
387388
}
388389

@@ -409,6 +410,7 @@ export class WebStandardStreamableHTTPServerTransport implements Transport {
409410
// Check if there's already an active standalone SSE stream for this session
410411
if (this._streamMapping.get(this._standaloneSseStreamId) !== undefined) {
411412
// Only one GET SSE stream is allowed per session
413+
this.onerror?.(new Error('Conflict: Only one SSE stream is allowed per session'));
412414
return this.createJsonErrorResponse(409, -32000, 'Conflict: Only one SSE stream is allowed per session');
413415
}
414416

@@ -460,6 +462,7 @@ export class WebStandardStreamableHTTPServerTransport implements Transport {
460462
*/
461463
private async replayEvents(lastEventId: string): Promise<Response> {
462464
if (!this._eventStore) {
465+
this.onerror?.(new Error('Event store not configured'));
463466
return this.createJsonErrorResponse(400, -32000, 'Event store not configured');
464467
}
465468

@@ -470,11 +473,13 @@ export class WebStandardStreamableHTTPServerTransport implements Transport {
470473
streamId = await this._eventStore.getStreamIdForEventId(lastEventId);
471474

472475
if (!streamId) {
476+
this.onerror?.(new Error('Invalid event ID format'));
473477
return this.createJsonErrorResponse(400, -32000, 'Invalid event ID format');
474478
}
475479

476480
// Check conflict with the SAME streamId we'll use for mapping
477481
if (this._streamMapping.get(streamId) !== undefined) {
482+
this.onerror?.(new Error('Conflict: Stream already has an active connection'));
478483
return this.createJsonErrorResponse(409, -32000, 'Conflict: Stream already has an active connection');
479484
}
480485
}
@@ -556,7 +561,8 @@ export class WebStandardStreamableHTTPServerTransport implements Transport {
556561
eventData += `data: ${JSON.stringify(message)}\n\n`;
557562
controller.enqueue(encoder.encode(eventData));
558563
return true;
559-
} catch {
564+
} catch (error) {
565+
this.onerror?.(error as Error);
560566
return false;
561567
}
562568
}
@@ -565,6 +571,7 @@ export class WebStandardStreamableHTTPServerTransport implements Transport {
565571
* Handles unsupported requests (PUT, PATCH, etc.)
566572
*/
567573
private handleUnsupportedRequest(): Response {
574+
this.onerror?.(new Error('Method not allowed.'));
568575
return new Response(
569576
JSON.stringify({
570577
jsonrpc: '2.0',
@@ -593,6 +600,7 @@ export class WebStandardStreamableHTTPServerTransport implements Transport {
593600
const acceptHeader = req.headers.get('accept');
594601
// The client MUST include an Accept header, listing both application/json and text/event-stream as supported content types.
595602
if (!acceptHeader?.includes('application/json') || !acceptHeader.includes('text/event-stream')) {
603+
this.onerror?.(new Error('Not Acceptable: Client must accept both application/json and text/event-stream'));
596604
return this.createJsonErrorResponse(
597605
406,
598606
-32000,
@@ -602,6 +610,7 @@ export class WebStandardStreamableHTTPServerTransport implements Transport {
602610

603611
const ct = req.headers.get('content-type');
604612
if (!ct || !ct.includes('application/json')) {
613+
this.onerror?.(new Error('Unsupported Media Type: Content-Type must be application/json'));
605614
return this.createJsonErrorResponse(415, -32000, 'Unsupported Media Type: Content-Type must be application/json');
606615
}
607616

@@ -618,6 +627,7 @@ export class WebStandardStreamableHTTPServerTransport implements Transport {
618627
try {
619628
rawMessage = await req.json();
620629
} catch {
630+
this.onerror?.(new Error('Parse error: Invalid JSON'));
621631
return this.createJsonErrorResponse(400, -32700, 'Parse error: Invalid JSON');
622632
}
623633
}
@@ -632,6 +642,7 @@ export class WebStandardStreamableHTTPServerTransport implements Transport {
632642
messages = [JSONRPCMessageSchema.parse(rawMessage)];
633643
}
634644
} catch {
645+
this.onerror?.(new Error('Parse error: Invalid JSON-RPC message'));
635646
return this.createJsonErrorResponse(400, -32700, 'Parse error: Invalid JSON-RPC message');
636647
}
637648

@@ -642,9 +653,11 @@ export class WebStandardStreamableHTTPServerTransport implements Transport {
642653
// If it's a server with session management and the session ID is already set we should reject the request
643654
// to avoid re-initialization.
644655
if (this._initialized && this.sessionId !== undefined) {
656+
this.onerror?.(new Error('Invalid Request: Server already initialized'));
645657
return this.createJsonErrorResponse(400, -32600, 'Invalid Request: Server already initialized');
646658
}
647659
if (messages.length > 1) {
660+
this.onerror?.(new Error('Invalid Request: Only one initialization request is allowed'));
648661
return this.createJsonErrorResponse(400, -32600, 'Invalid Request: Only one initialization request is allowed');
649662
}
650663
this.sessionId = this.sessionIdGenerator?.();
@@ -824,18 +837,21 @@ export class WebStandardStreamableHTTPServerTransport implements Transport {
824837
}
825838
if (!this._initialized) {
826839
// If the server has not been initialized yet, reject all requests
840+
this.onerror?.(new Error('Bad Request: Server not initialized'));
827841
return this.createJsonErrorResponse(400, -32000, 'Bad Request: Server not initialized');
828842
}
829843

830844
const sessionId = req.headers.get('mcp-session-id');
831845

832846
if (!sessionId) {
833847
// Non-initialization requests without a session ID should return 400 Bad Request
848+
this.onerror?.(new Error('Bad Request: Mcp-Session-Id header is required'));
834849
return this.createJsonErrorResponse(400, -32000, 'Bad Request: Mcp-Session-Id header is required');
835850
}
836851

837852
if (sessionId !== this.sessionId) {
838853
// Reject requests with invalid session ID with 404 Not Found
854+
this.onerror?.(new Error('Session not found'));
839855
return this.createJsonErrorResponse(404, -32001, 'Session not found');
840856
}
841857

@@ -859,6 +875,12 @@ export class WebStandardStreamableHTTPServerTransport implements Transport {
859875
const protocolVersion = req.headers.get('mcp-protocol-version');
860876

861877
if (protocolVersion !== null && !SUPPORTED_PROTOCOL_VERSIONS.includes(protocolVersion)) {
878+
this.onerror?.(
879+
new Error(
880+
`Bad Request: Unsupported protocol version: ${protocolVersion}` +
881+
` (supported versions: ${SUPPORTED_PROTOCOL_VERSIONS.join(', ')})`
882+
)
883+
);
862884
return this.createJsonErrorResponse(
863885
400,
864886
-32000,

test/server/streamableHttp.test.ts

Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import { createServer, type Server, IncomingMessage, ServerResponse } from 'node
22
import { AddressInfo, createServer as netCreateServer } from 'node:net';
33
import { randomUUID } from 'node:crypto';
44
import { EventStore, StreamableHTTPServerTransport, EventId, StreamId } from '../../src/server/streamableHttp.js';
5+
import { WebStandardStreamableHTTPServerTransport } from '../../src/server/webStandardStreamableHttp.js';
56
import { McpServer } from '../../src/server/mcp.js';
67
import { CallToolResult, JSONRPCMessage } from '../../src/types.js';
78
import { AuthInfo } from '../../src/server/auth/types.js';
@@ -3112,3 +3113,162 @@ async function createTestServerWithDnsProtection(config: {
31123113
baseUrl: serverUrl
31133114
};
31143115
}
3116+
3117+
describe('WebStandardStreamableHTTPServerTransport - onerror callback', () => {
3118+
let transport: WebStandardStreamableHTTPServerTransport;
3119+
let mcpServer: McpServer;
3120+
let onerrorSpy: ReturnType<typeof vi.fn<(error: Error) => void>>;
3121+
3122+
/** Shorthand to build a Web Standard Request for direct transport testing. */
3123+
function req(method: string, opts?: { body?: unknown; headers?: Record<string, string> }): Request {
3124+
const headers: Record<string, string> = { ...opts?.headers };
3125+
if (method === 'POST') {
3126+
headers['Accept'] ??= 'application/json, text/event-stream';
3127+
headers['Content-Type'] ??= 'application/json';
3128+
} else if (method === 'GET') {
3129+
headers['Accept'] ??= 'text/event-stream';
3130+
}
3131+
return new Request('http://localhost/mcp', {
3132+
method,
3133+
headers,
3134+
body: opts?.body !== undefined ? (typeof opts.body === 'string' ? opts.body : JSON.stringify(opts.body)) : undefined
3135+
});
3136+
}
3137+
3138+
function withSession(sessionId: string, extra?: Record<string, string>): Record<string, string> {
3139+
return { 'mcp-session-id': sessionId, 'mcp-protocol-version': '2025-11-25', ...extra };
3140+
}
3141+
3142+
beforeEach(async () => {
3143+
onerrorSpy = vi.fn<(error: Error) => void>();
3144+
mcpServer = new McpServer({ name: 'test-server', version: '1.0.0' });
3145+
transport = new WebStandardStreamableHTTPServerTransport({ sessionIdGenerator: () => randomUUID() });
3146+
transport.onerror = onerrorSpy;
3147+
await mcpServer.connect(transport);
3148+
});
3149+
3150+
afterEach(async () => {
3151+
await transport.close();
3152+
});
3153+
3154+
async function initializeServer(): Promise<string> {
3155+
onerrorSpy.mockClear();
3156+
const response = await transport.handleRequest(req('POST', { body: TEST_MESSAGES.initialize }));
3157+
expect(response.status).toBe(200);
3158+
return response.headers.get('mcp-session-id') as string;
3159+
}
3160+
3161+
it('should call onerror for invalid JSON in POST', async () => {
3162+
await initializeServer();
3163+
await transport.handleRequest(req('POST', { body: 'not valid json' }));
3164+
expect(onerrorSpy).toHaveBeenCalled();
3165+
expect(onerrorSpy.mock.calls[0]![0]!.message).toMatch(/Invalid JSON/);
3166+
});
3167+
3168+
it('should call onerror for invalid JSON-RPC message', async () => {
3169+
const sid = await initializeServer();
3170+
await transport.handleRequest(req('POST', { body: { not: 'valid' }, headers: withSession(sid) }));
3171+
expect(onerrorSpy).toHaveBeenCalled();
3172+
expect(onerrorSpy.mock.calls[0]![0]!.message).toMatch(/Invalid JSON-RPC message/);
3173+
});
3174+
3175+
it('should call onerror for missing Accept header on POST', async () => {
3176+
await transport.handleRequest(
3177+
req('POST', { body: TEST_MESSAGES.initialize, headers: { Accept: 'application/json', 'Content-Type': 'application/json' } })
3178+
);
3179+
expect(onerrorSpy).toHaveBeenCalled();
3180+
expect(onerrorSpy.mock.calls[0]![0]!.message).toMatch(/Not Acceptable/);
3181+
});
3182+
3183+
it('should call onerror for unsupported Content-Type', async () => {
3184+
await transport.handleRequest(
3185+
req('POST', {
3186+
body: TEST_MESSAGES.initialize,
3187+
headers: { Accept: 'application/json, text/event-stream', 'Content-Type': 'text/plain' }
3188+
})
3189+
);
3190+
expect(onerrorSpy).toHaveBeenCalled();
3191+
expect(onerrorSpy.mock.calls[0]![0]!.message).toMatch(/Unsupported Media Type/);
3192+
});
3193+
3194+
it('should call onerror when server is not initialized', async () => {
3195+
await transport.handleRequest(req('POST', { body: TEST_MESSAGES.toolsList }));
3196+
expect(onerrorSpy).toHaveBeenCalledTimes(1);
3197+
expect(onerrorSpy.mock.calls[0]![0]!.message).toMatch(/Server not initialized/);
3198+
});
3199+
3200+
it('should call onerror for invalid session ID', async () => {
3201+
await initializeServer();
3202+
await transport.handleRequest(req('POST', { body: TEST_MESSAGES.toolsList, headers: withSession('invalid-session-id') }));
3203+
expect(onerrorSpy).toHaveBeenCalled();
3204+
expect(onerrorSpy.mock.calls[0]![0]!.message).toMatch(/Session not found/);
3205+
});
3206+
3207+
it('should call onerror for re-initialization attempt', async () => {
3208+
await initializeServer();
3209+
await transport.handleRequest(req('POST', { body: TEST_MESSAGES.initialize }));
3210+
expect(onerrorSpy).toHaveBeenCalled();
3211+
expect(onerrorSpy.mock.calls[0]![0]!.message).toMatch(/Server already initialized/);
3212+
});
3213+
3214+
it('should call onerror for missing Accept header on GET', async () => {
3215+
const sid = await initializeServer();
3216+
await transport.handleRequest(req('GET', { headers: { Accept: 'application/json', ...withSession(sid) } }));
3217+
expect(onerrorSpy).toHaveBeenCalled();
3218+
expect(onerrorSpy.mock.calls[0]![0]!.message).toMatch(/Not Acceptable/);
3219+
});
3220+
3221+
it('should call onerror for concurrent SSE streams', async () => {
3222+
const sid = await initializeServer();
3223+
const response1 = await transport.handleRequest(req('GET', { headers: withSession(sid) }));
3224+
expect(response1.status).toBe(200);
3225+
await transport.handleRequest(req('GET', { headers: withSession(sid) }));
3226+
expect(onerrorSpy).toHaveBeenCalled();
3227+
expect(onerrorSpy.mock.calls[0]![0]!.message).toMatch(/Only one SSE stream/);
3228+
});
3229+
3230+
it('should call onerror for unsupported protocol version', async () => {
3231+
const sid = await initializeServer();
3232+
await transport.handleRequest(
3233+
req('POST', { body: TEST_MESSAGES.toolsList, headers: withSession(sid, { 'mcp-protocol-version': 'unsupported-version' }) })
3234+
);
3235+
expect(onerrorSpy).toHaveBeenCalled();
3236+
expect(onerrorSpy.mock.calls[0]![0]!.message).toMatch(/Unsupported protocol version/);
3237+
});
3238+
3239+
it('should call onerror for unsupported HTTP methods', async () => {
3240+
await transport.handleRequest(req('PUT'));
3241+
expect(onerrorSpy).toHaveBeenCalledTimes(1);
3242+
expect(onerrorSpy.mock.calls[0]![0]!.message).toMatch(/Method not allowed/);
3243+
});
3244+
3245+
it('should call onerror for invalid event ID in replay', async () => {
3246+
const eventStore: EventStore = {
3247+
async storeEvent(): Promise<EventId> {
3248+
return 'evt-1';
3249+
},
3250+
async getStreamIdForEventId(): Promise<StreamId | undefined> {
3251+
return undefined;
3252+
},
3253+
async replayEventsAfter(): Promise<StreamId> {
3254+
return 'stream-1';
3255+
}
3256+
};
3257+
const storeTransport = new WebStandardStreamableHTTPServerTransport({ sessionIdGenerator: () => randomUUID(), eventStore });
3258+
const storeSpy = vi.fn<(error: Error) => void>();
3259+
storeTransport.onerror = storeSpy;
3260+
await new McpServer({ name: 'test', version: '1.0.0' }).connect(storeTransport);
3261+
3262+
const initResp = await storeTransport.handleRequest(req('POST', { body: TEST_MESSAGES.initialize }));
3263+
const sid = initResp.headers.get('mcp-session-id') as string;
3264+
storeSpy.mockClear();
3265+
3266+
const response = await storeTransport.handleRequest(
3267+
req('GET', { headers: { ...withSession(sid), 'Last-Event-ID': 'unknown-event-id' } })
3268+
);
3269+
expect(response.status).toBe(400);
3270+
expect(storeSpy).toHaveBeenCalledTimes(1);
3271+
expect(storeSpy.mock.calls[0]![0]!.message).toMatch(/Invalid event ID format/);
3272+
await storeTransport.close();
3273+
});
3274+
});

0 commit comments

Comments
 (0)