Skip to content

Commit e01d31e

Browse files
committed
feat: Add pipeline management tools to MCP server
- Implemented new tools (`ListJobsTool`, `GetJobStatusTool`, `CancelJobTool`) to manage pipeline jobs (list, get status, cancel). - Registered these tools in the MCP server (`src/mcp/index.ts`). - Refactored `ScrapeTool` to accept `PipelineManager` via constructor injection, removing internal instantiation. - Updated `ScrapeTool` usage in MCP server, CLI, and tests to provide `PipelineManager`. - Added basic tests for the new pipeline management tools. - Corrected `ScrapeTool` tests to align with refactoring.
1 parent 2b345c7 commit e01d31e

33 files changed

+2032
-483
lines changed

.DS_Store

0 Bytes
Binary file not shown.

src/cli.ts

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,28 +2,37 @@
22
import "dotenv/config";
33
import { Command } from "commander";
44
import { DocumentManagementService } from "./store/DocumentManagementService";
5+
import { PipelineManager } from "./pipeline/PipelineManager"; // Import PipelineManager
56
import { FindVersionTool, ListLibrariesTool, ScrapeTool, SearchTool } from "./tools";
67

78
const formatOutput = (data: unknown) => JSON.stringify(data, null, 2);
89

910
async function main() {
10-
const docService = new DocumentManagementService();
11+
let docService: DocumentManagementService | undefined;
12+
let pipelineManager: PipelineManager | undefined;
1113

1214
try {
15+
docService = new DocumentManagementService();
1316
await docService.initialize();
1417

18+
// Instantiate PipelineManager for CLI use
19+
pipelineManager = new PipelineManager(docService); // Assign inside try
20+
// Start the manager for the CLI session
21+
await pipelineManager.start();
22+
1523
const tools = {
1624
listLibraries: new ListLibrariesTool(docService),
1725
findVersion: new FindVersionTool(docService),
18-
scrape: new ScrapeTool(docService),
26+
scrape: new ScrapeTool(docService, pipelineManager), // Pass manager
1927
search: new SearchTool(docService),
2028
};
2129

2230
const program = new Command();
2331

2432
// Handle cleanup on SIGINT
2533
process.on("SIGINT", async () => {
26-
await docService.shutdown();
34+
if (pipelineManager) await pipelineManager.stop(); // Check before stopping
35+
if (docService) await docService.shutdown(); // Check before stopping
2736
process.exit(0);
2837
});
2938

@@ -52,8 +61,15 @@ async function main() {
5261
maxConcurrency: Number.parseInt(options.maxConcurrency),
5362
ignoreErrors: options.ignoreErrors,
5463
},
64+
// CLI always waits for completion (default behavior)
5565
});
56-
console.log(`✅ Successfully scraped ${result.pagesScraped} pages`);
66+
// Type guard to satisfy TypeScript
67+
if ("pagesScraped" in result) {
68+
console.log(`✅ Successfully scraped ${result.pagesScraped} pages`);
69+
} else {
70+
// This branch should not be hit by the CLI
71+
console.log(`🚀 Scraping job started with ID: ${result.jobId}`);
72+
}
5773
});
5874

5975
program
@@ -102,13 +118,15 @@ async function main() {
102118
"-v, --version <string>", // Add optional version flag
103119
"Target version to match (optional, supports ranges)",
104120
)
105-
.action(async (library, options) => { // Update action parameters
121+
.action(async (library, options) => {
122+
// Update action parameters
106123
const versionInfo = await tools.findVersion.execute({
107124
library,
108125
targetVersion: options.version, // Get version from options
109126
});
110127
// findVersion.execute now returns a string, handle potential error messages within it
111-
if (!versionInfo) { // Should not happen with current tool logic, but good practice
128+
if (!versionInfo) {
129+
// Should not happen with current tool logic, but good practice
112130
throw new Error("Failed to get version information");
113131
}
114132
console.log(versionInfo); // Log the descriptive string from the tool
@@ -117,11 +135,13 @@ async function main() {
117135
await program.parseAsync();
118136
} catch (error) {
119137
console.error("Error:", error instanceof Error ? error.message : String(error));
120-
await docService.shutdown();
138+
if (pipelineManager) await pipelineManager.stop(); // Check before stopping
139+
if (docService) await docService.shutdown();
121140
process.exit(1);
122141
}
123142

124143
// Clean shutdown after successful execution
144+
if (pipelineManager) await pipelineManager.stop(); // Check before stopping
125145
await docService.shutdown();
126146
process.exit(0);
127147
}

src/mcp/index.ts

Lines changed: 131 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,27 +4,46 @@ import { McpServer, ResourceTemplate } from "@modelcontextprotocol/sdk/server/mc
44
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
55
import { z } from "zod";
66
import { DocumentManagementService } from "../store/DocumentManagementService";
7+
import { PipelineManager } from "../pipeline/PipelineManager"; // Import PipelineManager
8+
import { PipelineJobStatus } from "../pipeline/types"; // Import PipelineJobStatus
79
import {
10+
CancelJobTool, // Import new tool
811
FindVersionTool,
12+
GetJobStatusTool, // Import new tool
13+
ListJobsTool, // Import new tool
914
ListLibrariesTool,
10-
ScrapeTool,
15+
ScrapeTool, // Keep existing tools
1116
SearchTool,
1217
VersionNotFoundError,
13-
} from "../tools";
18+
} from "../tools"; // Ensure this path is correct
1419
import { createError, createResponse } from "./utils";
1520
import { logger } from "../utils/logger";
21+
import { PipelineStateError } from "../pipeline/errors"; // Import error type
1622

1723
export async function startServer() {
1824
const docService = new DocumentManagementService();
1925

2026
try {
2127
await docService.initialize();
2228

29+
// Instantiate PipelineManager
30+
// TODO: Check if concurrency needs to be configurable
31+
const pipelineManager = new PipelineManager(docService);
32+
// Start the pipeline manager to process jobs
33+
await pipelineManager.start(); // Assuming start is async and needed
34+
35+
// Instantiate tools, passing dependencies
2336
const tools = {
2437
listLibraries: new ListLibrariesTool(docService),
2538
findVersion: new FindVersionTool(docService),
26-
scrape: new ScrapeTool(docService),
39+
// TODO: Update ScrapeTool constructor if needed to accept PipelineManager
40+
// ScrapeTool currently uses docService.getPipelineManager() which doesn't exist.
41+
// Pass both docService and pipelineManager to ScrapeTool constructor
42+
scrape: new ScrapeTool(docService, pipelineManager),
2743
search: new SearchTool(docService),
44+
listJobs: new ListJobsTool(pipelineManager), // Instantiate new tool
45+
getJobStatus: new GetJobStatusTool(pipelineManager), // Instantiate new tool
46+
cancelJob: new CancelJobTool(pipelineManager), // Instantiate new tool
2847
};
2948

3049
const server = new McpServer(
@@ -41,7 +60,9 @@ export async function startServer() {
4160
},
4261
);
4362

44-
// Scrape docs tool
63+
// --- Existing Tool Definitions ---
64+
65+
// Scrape docs tool (Keep as is for now, but likely needs ScrapeTool refactor)
4566
server.tool(
4667
"scrape_docs",
4768
"Scrape and index documentation from a URL",
@@ -61,23 +82,35 @@ export async function startServer() {
6182
.default(true)
6283
.describe("Only scrape pages under the initial URL path"),
6384
},
85+
// Remove context as it's not used without progress reporting
6486
async ({ url, library, version, maxPages, maxDepth, subpagesOnly }) => {
6587
try {
88+
// Execute scrape tool without waiting and without progress callback
89+
// NOTE: This might fail if ScrapeTool relies on docService.getPipelineManager()
6690
const result = await tools.scrape.execute({
6791
url,
6892
library,
6993
version,
94+
waitForCompletion: false, // Don't wait for completion
95+
// onProgress: undefined, // Explicitly undefined or omitted
7096
options: {
7197
maxPages,
7298
maxDepth,
7399
subpagesOnly,
74100
},
75101
});
76102

103+
// Check the type of result
104+
if ("jobId" in result) {
105+
// If we got a jobId back, report that
106+
return createResponse(`🚀 Scraping job started with ID: ${result.jobId}.`);
107+
}
108+
// This case shouldn't happen if waitForCompletion is false, but handle defensively
77109
return createResponse(
78-
`Successfully scraped ${result.pagesScraped} pages from ${url} for ${library} v${version}.`,
110+
`Scraping finished immediately (unexpectedly) with ${result.pagesScraped} pages.`,
79111
);
80112
} catch (error) {
113+
// Handle errors during job *enqueueing* or initial setup
81114
return createError(
82115
`Failed to scrape documentation: ${
83116
error instanceof Error ? error.message : String(error)
@@ -87,7 +120,7 @@ export async function startServer() {
87120
},
88121
);
89122

90-
// Search docs tool
123+
// Search docs tool (Keep as is)
91124
server.tool(
92125
"search_docs",
93126
"Search indexed documentation. Examples:\n" +
@@ -154,7 +187,7 @@ ${formattedResults.join("")}`,
154187
},
155188
);
156189

157-
// List libraries tool
190+
// List libraries tool (Keep as is)
158191
server.tool("list_libraries", "List all indexed libraries", {}, async () => {
159192
try {
160193
const result = await tools.listLibraries.execute();
@@ -171,7 +204,7 @@ ${formattedResults.join("")}`,
171204
}
172205
});
173206

174-
// Find version tool
207+
// Find version tool (Keep as is)
175208
server.tool(
176209
"find_version",
177210
"Find best matching version for a library",
@@ -206,6 +239,94 @@ ${formattedResults.join("")}`,
206239
},
207240
);
208241

242+
// List jobs tool
243+
server.tool(
244+
"list_jobs",
245+
"List pipeline jobs, optionally filtering by status.",
246+
{
247+
status: z
248+
.nativeEnum(PipelineJobStatus)
249+
.optional()
250+
.describe("Optional status to filter jobs by."),
251+
},
252+
async ({ status }) => {
253+
try {
254+
const result = await tools.listJobs.execute({ status });
255+
// Format the job list for display
256+
const formattedJobs = result.jobs
257+
.map(
258+
(job) =>
259+
`- ID: ${job.id}\n Status: ${job.status}\n Library: ${job.library}@${job.version}\n Created: ${job.createdAt.toISOString()}${job.startedAt ? `\n Started: ${job.startedAt.toISOString()}` : ""}${job.finishedAt ? `\n Finished: ${job.finishedAt.toISOString()}` : ""}${job.error ? `\n Error: ${job.error.message}` : ""}`,
260+
)
261+
.join("\n\n");
262+
return createResponse(
263+
result.jobs.length > 0
264+
? `Current Jobs:\n\n${formattedJobs}`
265+
: "No jobs found matching criteria.",
266+
);
267+
} catch (error) {
268+
return createError(
269+
`Failed to list jobs: ${
270+
error instanceof Error ? error.message : String(error)
271+
}`,
272+
);
273+
}
274+
},
275+
);
276+
277+
// Get job status tool
278+
server.tool(
279+
"get_job_status",
280+
"Get the status and details of a specific pipeline job.",
281+
{
282+
jobId: z.string().uuid().describe("The ID of the job to query."),
283+
},
284+
async ({ jobId }) => {
285+
try {
286+
const result = await tools.getJobStatus.execute({ jobId });
287+
if (!result.job) {
288+
return createError(`Job with ID ${jobId} not found.`);
289+
}
290+
const job = result.job;
291+
const formattedJob = `- ID: ${job.id}\n Status: ${job.status}\n Library: ${job.library}@${job.version}\n Created: ${job.createdAt.toISOString()}${job.startedAt ? `\n Started: ${job.startedAt.toISOString()}` : ""}${job.finishedAt ? `\n Finished: ${job.finishedAt.toISOString()}` : ""}${job.error ? `\n Error: ${job.error.message}` : ""}`;
292+
return createResponse(`Job Status:\n\n${formattedJob}`);
293+
} catch (error) {
294+
return createError(
295+
`Failed to get job status for ${jobId}: ${
296+
error instanceof Error ? error.message : String(error)
297+
}`,
298+
);
299+
}
300+
},
301+
);
302+
303+
// Cancel job tool
304+
server.tool(
305+
"cancel_job",
306+
"Attempt to cancel a queued or running pipeline job.",
307+
{
308+
jobId: z.string().uuid().describe("The ID of the job to cancel."),
309+
},
310+
async ({ jobId }) => {
311+
try {
312+
const result = await tools.cancelJob.execute({ jobId });
313+
// Use the message and success status from the tool's result
314+
if (result.success) {
315+
return createResponse(result.message);
316+
}
317+
// If not successful according to the tool, treat it as an error in MCP
318+
return createError(result.message);
319+
} catch (error) {
320+
// Catch any unexpected errors during the tool execution itself
321+
return createError(
322+
`Failed to cancel job ${jobId}: ${
323+
error instanceof Error ? error.message : String(error)
324+
}`,
325+
);
326+
}
327+
},
328+
);
329+
209330
server.prompt(
210331
"docs",
211332
"Search indexed documentation",
@@ -279,12 +400,13 @@ ${formattedResults.join("")}`,
279400

280401
// Handle cleanup
281402
process.on("SIGINT", async () => {
403+
await pipelineManager.stop(); // Stop the pipeline manager
282404
await docService.shutdown();
283405
await server.close();
284406
process.exit(0);
285407
});
286408
} catch (error) {
287-
await docService.shutdown();
409+
await docService.shutdown(); // Ensure docService shutdown on error too
288410
logger.error(`❌ Fatal Error: ${error}`);
289411
process.exit(1);
290412
}

0 commit comments

Comments
 (0)