-
Notifications
You must be signed in to change notification settings - Fork 137
Expand file tree
/
Copy pathserver.ts
More file actions
328 lines (291 loc) · 13.9 KB
/
server.ts
File metadata and controls
328 lines (291 loc) · 13.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
/*
* Express server implementation used for standby Actor mode.
*/
import { randomUUID } from 'node:crypto';
import { InMemoryTaskStore } from '@modelcontextprotocol/sdk/experimental/tasks/stores/in-memory.js';
import { SSEServerTransport } from '@modelcontextprotocol/sdk/server/sse.js';
import { StreamableHTTPServerTransport } from '@modelcontextprotocol/sdk/server/streamableHttp.js';
import type { InitializeRequest, JSONRPCMessage } from '@modelcontextprotocol/sdk/types.js';
import type { Request, Response } from 'express';
import express from 'express';
import log from '@apify/log';
import { parseBooleanOrNull } from '@apify/utilities';
import { ApifyClient } from '../apify_client.js';
import { ActorsMcpServer } from '../mcp/server.js';
import type { ApifyRequestParams } from '../types.js';
import { parseUiMode } from '../types.js';
import { getHelpMessage, HEADER_READINESS_PROBE, Routes, TransportType } from './const.js';
import { getActorRunData } from './utils.js';
export function createExpressApp(
host: string,
): express.Express {
const app = express();
const mcpServers: { [sessionId: string]: ActorsMcpServer } = {};
const transportsSSE: { [sessionId: string]: SSEServerTransport } = {};
const transports: { [sessionId: string]: StreamableHTTPServerTransport } = {};
const taskStore = new InMemoryTaskStore();
function respondWithError(res: Response, error: unknown, logMessage: string, statusCode = 500) {
if (statusCode >= 500) {
// Server errors (>= 500) - log as exception
log.exception(error instanceof Error ? error : new Error(String(error)), 'Error in request', { logMessage, statusCode });
} else {
// Client errors (< 500) - log as softFail without stack trace
const errorMessage = error instanceof Error ? error.message : String(error);
log.softFail('Error in request', { logMessage, error: errorMessage, statusCode });
}
if (!res.headersSent) {
res.status(statusCode).json({
jsonrpc: '2.0',
error: {
code: statusCode === 500 ? -32603 : -32000,
message: statusCode === 500 ? 'Internal server error' : 'Bad Request',
},
id: null,
});
}
}
app.get(Routes.ROOT, async (req: Request, res: Response) => {
if (req.headers && req.get(HEADER_READINESS_PROBE) !== undefined) {
log.debug('Received readiness probe');
res.status(200).json({ message: 'Server is ready' }).end();
return;
}
try {
log.info('MCP API', {
mth: req.method,
rt: Routes.ROOT,
tr: TransportType.HTTP,
});
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
res.status(200).json({ message: `Actor is using Model Context Protocol. ${getHelpMessage(host)}`, data: getActorRunData() }).end();
} catch (error) {
respondWithError(res, error, `Error in GET ${Routes.ROOT}`);
}
});
app.head(Routes.ROOT, (_req: Request, res: Response) => {
res.status(200).end();
});
app.get(Routes.SSE, async (req: Request, res: Response) => {
try {
log.info('MCP API', {
mth: req.method,
rt: Routes.SSE,
tr: TransportType.SSE,
});
// Extract telemetry query parameters
const urlParams = new URL(req.url, `http://${req.headers.host}`).searchParams;
const telemetryEnabledParam = urlParams.get('telemetry-enabled');
// URL param > env var > default (true)
const telemetryEnabled = parseBooleanOrNull(telemetryEnabledParam)
?? parseBooleanOrNull(process.env.TELEMETRY_ENABLED)
?? true;
const uiMode = parseUiMode(urlParams.get('ui')) ?? parseUiMode(process.env.UI_MODE);
// Extract payment mode parameter - if payment=skyfire, enable skyfire mode
const paymentParam = urlParams.get('payment');
const skyfireMode = paymentParam === 'skyfire';
const mcpServer = new ActorsMcpServer({
taskStore,
setupSigintHandler: false,
transportType: 'sse',
telemetry: {
enabled: telemetryEnabled,
},
uiMode,
skyfireMode,
});
const transport = new SSEServerTransport(Routes.MESSAGE, res);
// Generate a unique session ID for this SSE connection
const mcpSessionId = transport.sessionId;
// Load MCP server tools
const apifyToken = process.env.APIFY_TOKEN as string;
log.debug('Loading tools from URL', { mcpSessionId: transport.sessionId, tr: TransportType.SSE });
const apifyClient = new ApifyClient({ token: apifyToken });
await mcpServer.loadToolsFromUrl(req.url, apifyClient);
transportsSSE[transport.sessionId] = transport;
mcpServers[transport.sessionId] = mcpServer;
// Create a proxy for transport.onmessage to inject session ID into all requests
const originalOnMessage = transport.onmessage;
transport.onmessage = (message: JSONRPCMessage) => {
const msgRecord = message as Record<string, unknown>;
// Inject session ID into all requests with params
if (msgRecord.params) {
const params = msgRecord.params as ApifyRequestParams;
params._meta ??= {};
params._meta.mcpSessionId = mcpSessionId;
}
// Call the original onmessage handler
if (originalOnMessage) {
originalOnMessage(message);
}
};
await mcpServer.connect(transport);
res.on('close', () => {
log.info('Connection closed, cleaning up', {
mcpSessionId: transport.sessionId,
});
delete transportsSSE[transport.sessionId];
delete mcpServers[transport.sessionId];
});
} catch (error) {
respondWithError(res, error, `Error in GET ${Routes.SSE}`);
}
});
app.post(Routes.MESSAGE, async (req: Request, res: Response) => {
try {
log.info('MCP API', {
mth: req.method,
rt: Routes.MESSAGE,
tr: TransportType.HTTP,
});
const sessionId = new URL(req.url, `http://${req.headers.host}`).searchParams.get('sessionId');
if (!sessionId) {
log.softFail('No session ID provided in POST request', { statusCode: 400 });
res.status(400).json({
jsonrpc: '2.0',
error: {
code: -32000,
message: 'Bad Request: No session ID provided',
},
id: null,
});
return;
}
const transport = transportsSSE[sessionId];
if (transport) {
await transport.handlePostMessage(req, res);
} else {
log.softFail('Server is not connected to the client.', { statusCode: 404 });
res.status(404).json({
jsonrpc: '2.0',
error: {
code: -32000,
message: 'Not Found: Server is not connected to the client. '
+ 'Connect to the server with GET request to /sse endpoint',
},
id: null,
});
}
} catch (error) {
respondWithError(res, error, `Error in POST ${Routes.MESSAGE}`);
}
});
// express.json() middleware to parse JSON bodies.
// It must be used before the POST /mcp route but after the GET /sse route :shrug:
app.use(express.json());
app.post(Routes.MCP, async (req: Request, res: Response) => {
log.info('Received MCP request:', req.body);
try {
// Check for existing session ID
const sessionId = req.headers['mcp-session-id'] as string | undefined;
let transport: StreamableHTTPServerTransport;
if (sessionId && transports[sessionId]) {
// Reuse existing transport
transport = transports[sessionId];
// Inject session ID into request params for existing sessions
if (req.body?.params) {
req.body.params._meta ??= {};
req.body.params._meta.mcpSessionId = sessionId;
}
} else if (!sessionId && isInitializeRequest(req.body)) {
// New initialization request
transport = new StreamableHTTPServerTransport({
sessionIdGenerator: () => randomUUID(),
enableJsonResponse: false, // Use SSE response mode
});
// Extract telemetry query parameters
const urlParams = new URL(req.url, `http://${req.headers.host}`).searchParams;
const telemetryEnabledParam = urlParams.get('telemetry-enabled');
// URL param > env var > default (true)
const telemetryEnabled = parseBooleanOrNull(telemetryEnabledParam)
?? parseBooleanOrNull(process.env.TELEMETRY_ENABLED)
?? true;
const uiMode = parseUiMode(urlParams.get('ui')) ?? parseUiMode(process.env.UI_MODE);
// Extract payment mode parameter - if payment=skyfire, enable skyfire mode
const paymentParam = urlParams.get('payment');
const skyfireMode = paymentParam === 'skyfire';
const mcpServer = new ActorsMcpServer({
taskStore,
setupSigintHandler: false,
initializeRequestData: req.body as InitializeRequest,
transportType: 'http',
telemetry: {
enabled: telemetryEnabled,
},
uiMode,
skyfireMode,
});
// Load MCP server tools
const apifyToken = process.env.APIFY_TOKEN as string;
log.debug('Loading tools from URL', { mcpSessionId: transport.sessionId, tr: TransportType.HTTP });
const apifyClient = new ApifyClient({ token: apifyToken });
await mcpServer.loadToolsFromUrl(req.url, apifyClient);
// Connect the transport to the MCP server BEFORE handling the request
await mcpServer.connect(transport);
// After handling the request, if we get a session ID back, store the transport
await transport.handleRequest(req, res, req.body);
// Store the transport by session ID for future requests
if (transport.sessionId) {
transports[transport.sessionId] = transport;
mcpServers[transport.sessionId] = mcpServer;
}
return; // Already handled
} else {
// Invalid request - no session ID or not initialization request
res.status(404).json({
jsonrpc: '2.0',
error: {
code: -32000,
message: 'Not Found: No valid session ID provided or not initialization request',
},
id: null,
});
return;
}
// Inject session ID into request params for all requests
if (req.body?.params && sessionId) {
req.body.params._meta ??= {};
req.body.params._meta.mcpSessionId = sessionId;
}
// Handle the request with existing transport - no need to reconnect
await transport.handleRequest(req, res, req.body);
} catch (error) {
respondWithError(res, error, 'Error handling MCP request');
}
});
// Handle GET requests for SSE streams according to spec
app.get(Routes.MCP, async (_req: Request, res: Response) => {
// We don't support GET requests for this server
// The spec requires returning 405 Method Not Allowed in this case
res.status(405).set('Allow', 'POST').send('Method Not Allowed');
});
app.delete(Routes.MCP, async (req: Request, res: Response) => {
const sessionId = req.headers['mcp-session-id'] as string | undefined;
const transport = transports[sessionId || ''] as StreamableHTTPServerTransport | undefined;
if (transport) {
log.info('MCP API', {
mth: req.method,
rt: Routes.MESSAGE,
tr: TransportType.HTTP,
mcpSessionId: sessionId,
});
await transport.handleRequest(req, res, req.body);
return;
}
log.softFail('Session not found', { mcpSessionId: sessionId, statusCode: 404 });
res.status(404).send('Not Found: Session not found').end();
});
// Catch-all for undefined routes
app.use((req: Request, res: Response) => {
res.status(404).json({ message: `There is nothing at route ${req.method} ${req.originalUrl}. ${getHelpMessage(host)}` }).end();
});
return app;
}
// Helper function to detect initialize requests
function isInitializeRequest(body: unknown): boolean {
if (Array.isArray(body)) {
return body.some((msg) => typeof msg === 'object' && msg !== null && 'method' in msg && msg.method === 'initialize');
}
return typeof body === 'object' && body !== null && 'method' in body && body.method === 'initialize';
}