Skip to content

Commit f6c3baa

Browse files
committed
feat: add configurable concurrency for web scraping
- Add maxConcurrency option to control parallel page requests
- Implement batch-based concurrent scraping in DefaultScraperStrategy
- Update CLI and types to support configurable concurrency
- Document concurrency model in architecture
1 parent 6f71856 commit f6c3baa

File tree

5 files changed

+129
-75
lines changed

5 files changed

+129
-75
lines changed

ARCHITECTURE.md

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,15 @@ The Documentation MCP Server is designed with a modular architecture that ensure
1818
```
1919
src/
2020
├── cli.ts # CLI interface implementation
21-
├── index.ts # MCP server interface
22-
├── pipeline/ # Document processing pipeline
23-
├── scraper/ # Web scraping implementation
24-
│ └── strategies/ # Scraping strategies for different sources
25-
├── splitter/ # Document splitting and chunking
26-
├── store/ # Vector store and document storage
27-
├── tools/ # Core functionality tools
28-
├── types/ # Shared type definitions
29-
└── utils/ # Common utilities and helpers
21+
├── index.ts # MCP server interface
22+
├── pipeline/ # Document processing pipeline
23+
├── scraper/ # Web scraping implementation
24+
│ └── strategies/ # Scraping strategies for different sources
25+
├── splitter/ # Document splitting and chunking
26+
├── store/ # Vector store and document storage
27+
├── tools/ # Core functionality tools
28+
├── types/ # Shared type definitions
29+
└── utils/ # Common utilities and helpers
3030
```
3131

3232
## Tools Layer
@@ -67,7 +67,7 @@ The project uses a unified progress reporting system with typed callbacks for al
6767
- Provides real-time feedback across multiple levels (page, document, storage)
6868
- Ensures consistent progress tracking across components
6969
- Supports different output formats for CLI and MCP interfaces
70-
- Enables parallel processing with individual progress tracking
70+
- Enables parallel processing with individual progress tracking through configurable batch-based concurrency
7171

7272
### Logging Strategy
7373

src/cli.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,11 @@ async function main() {
4444
.description("Scrape and index documentation from a URL")
4545
.option("-p, --max-pages <number>", "Maximum pages to scrape", "100")
4646
.option("-d, --max-depth <number>", "Maximum navigation depth", "3")
47+
.option(
48+
"-c, --max-concurrency <number>",
49+
"Maximum concurrent page requests",
50+
"3"
51+
)
4752
.option(
4853
"--subpages-only",
4954
"Allow scraping pages outside the initial URL path",
@@ -57,6 +62,7 @@ async function main() {
5762
options: {
5863
maxPages: Number.parseInt(options.maxPages),
5964
maxDepth: Number.parseInt(options.maxDepth),
65+
maxConcurrency: Number.parseInt(options.maxConcurrency),
6066
},
6167
});
6268
console.log(`✅ Successfully scraped ${result.pagesScraped} pages`);

src/scraper/strategies/DefaultScraperStrategy.ts

Lines changed: 110 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,97 @@ export class DefaultScraperStrategy implements ScraperStrategy {
5555
}
5656
}
5757

58+
private async processUrl(
59+
item: { url: string; depth: number },
60+
options: ScrapeOptions,
61+
progressCallback?: ProgressCallback<ScrapingProgress>
62+
): Promise<string[]> {
63+
const { url, depth } = item;
64+
const normalizedUrl = normalizeUrl(url, this.urlNormalizerOptions);
65+
66+
logger.info(
67+
`🌐 Scraping page ${this.pageCount}/${options.maxPages} (depth ${depth}/${options.maxDepth}): ${normalizedUrl}`
68+
);
69+
70+
try {
71+
const result = await this.htmlScraper.scrapePageWithRetry(url);
72+
73+
// Convert and emit the document immediately
74+
await progressCallback?.({
75+
pagesScraped: this.pageCount,
76+
maxPages: options.maxPages,
77+
currentUrl: normalizedUrl,
78+
depth,
79+
maxDepth: options.maxDepth,
80+
document: {
81+
content: result.content,
82+
metadata: {
83+
url: result.url,
84+
title: result.title,
85+
library: options.library,
86+
version: options.version,
87+
} satisfies ScraperMetadata,
88+
},
89+
});
90+
91+
// Return links to be processed by the main loop
92+
return result.links;
93+
} catch (error) {
94+
logger.error(`Failed to scrape page ${url}: ${error}`);
95+
return [];
96+
}
97+
}
98+
99+
private async processBatch(
100+
batch: Array<{ url: string; depth: number }>,
101+
baseUrl: URL,
102+
options: ScrapeOptions,
103+
progressCallback?: ProgressCallback<ScrapingProgress>
104+
): Promise<Array<{ url: string; depth: number }>> {
105+
// Process all URLs in the batch concurrently
106+
const results = await Promise.all(
107+
batch.map(async (item) => {
108+
// Increment page count before processing each URL
109+
this.pageCount++;
110+
const links = await this.processUrl(item, options, progressCallback);
111+
112+
if (item.depth < options.maxDepth) {
113+
return links
114+
.map((link) => {
115+
try {
116+
const targetUrl = new URL(link);
117+
const normalizedLink = normalizeUrl(
118+
link,
119+
this.urlNormalizerOptions
120+
);
121+
122+
if (
123+
!this.visited.has(normalizedLink) &&
124+
(!options.subpagesOnly ||
125+
this.isSubpage(baseUrl, targetUrl)) &&
126+
(!this.shouldFollowLinkFn ||
127+
this.shouldFollowLinkFn(baseUrl, targetUrl))
128+
) {
129+
this.visited.add(normalizedLink);
130+
return { url: link, depth: item.depth + 1 };
131+
}
132+
} catch (error) {
133+
// Invalid URL
134+
}
135+
return null;
136+
})
137+
.filter(
138+
(item): item is { url: string; depth: number } => item !== null
139+
);
140+
}
141+
return [];
142+
})
143+
);
144+
145+
// Flatten and return all new URLs to process
146+
return results.flat();
147+
}
148+
58149
async scrape(
59150
options: ScrapeOptions,
60151
progressCallback?: ProgressCallback<ScrapingProgress>
@@ -68,78 +159,32 @@ export class DefaultScraperStrategy implements ScraperStrategy {
68159
];
69160

70161
// Track URLs we've seen (either queued or visited)
71-
// Add starting URL to the tracking set
72162
this.visited.add(normalizeUrl(options.url, this.urlNormalizerOptions));
73163

74164
while (queue.length > 0 && this.pageCount < options.maxPages) {
75-
const current = queue.shift();
76-
if (!current) continue;
77-
78-
const { url, depth } = current;
79-
const normalizedUrl = normalizeUrl(url, this.urlNormalizerOptions);
80-
81-
// Since we track at queueing time, this check is mostly
82-
// for safety in case of URL normalization differences
83-
if (!this.visited.has(normalizedUrl)) {
84-
// This shouldn't happen if our normalization is consistent,
85-
// but let's add it to visited to be safe
86-
this.visited.add(normalizedUrl);
165+
// Take a batch of URLs to process
166+
const remainingPages = options.maxPages - this.pageCount;
167+
if (remainingPages <= 0) {
168+
break;
87169
}
88170

89-
this.pageCount++;
90-
91-
logger.info(
92-
`🌐 Scraping page ${this.pageCount}/${options.maxPages} (depth ${depth}/${options.maxDepth}): ${normalizedUrl}`
171+
const batchSize = Math.min(
172+
options.maxConcurrency ?? 3,
173+
remainingPages,
174+
queue.length
175+
);
176+
const batch = queue.splice(0, batchSize);
177+
178+
// Process the batch and get new URLs
179+
const newUrls = await this.processBatch(
180+
batch,
181+
baseUrl,
182+
options,
183+
progressCallback
93184
);
94185

95-
try {
96-
const result = await this.htmlScraper.scrapePageWithRetry(url);
97-
98-
// Convert and emit the document immediately
99-
await progressCallback?.({
100-
pagesScraped: this.pageCount,
101-
maxPages: options.maxPages,
102-
currentUrl: normalizedUrl,
103-
depth,
104-
maxDepth: options.maxDepth,
105-
document: {
106-
content: result.content,
107-
metadata: {
108-
url: result.url,
109-
title: result.title,
110-
library: options.library,
111-
version: options.version,
112-
} satisfies ScraperMetadata,
113-
},
114-
});
115-
116-
// Queue child pages if we haven't reached max depth
117-
if (depth < options.maxDepth) {
118-
for (const link of result.links) {
119-
const targetUrl = new URL(link);
120-
const normalizedLink = normalizeUrl(
121-
link,
122-
this.urlNormalizerOptions
123-
);
124-
125-
// Skip if already visited or queued (now combined in this.visited)
126-
if (
127-
this.visited.has(normalizedLink) ||
128-
(options.subpagesOnly && !this.isSubpage(baseUrl, targetUrl)) ||
129-
(this.shouldFollowLinkFn &&
130-
!this.shouldFollowLinkFn(baseUrl, targetUrl))
131-
) {
132-
continue;
133-
}
134-
135-
// Add to queue and track immediately in visited set
136-
queue.push({ url: link, depth: depth + 1 });
137-
this.visited.add(normalizedLink);
138-
}
139-
}
140-
} catch (error) {
141-
logger.error(`Failed to scrape page ${url}: ${error}`);
142-
}
186+
// Add new URLs to the queue
187+
queue.push(...newUrls);
143188
}
144189
}
145190
}

src/tools/ScrapeTool.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ export interface ScrapeToolOptions {
1111
options?: {
1212
maxPages?: number;
1313
maxDepth?: number;
14+
maxConcurrency?: number;
1415
};
1516
}
1617

@@ -83,6 +84,7 @@ export class ScrapeTool {
8384
version: version,
8485
maxPages: scraperOptions?.maxPages ?? 100,
8586
maxDepth: scraperOptions?.maxDepth ?? 3,
87+
maxConcurrency: scraperOptions?.maxConcurrency ?? 3,
8688
subpagesOnly: true,
8789
});
8890

src/types/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ export interface ScrapeOptions {
2020
maxPages: number;
2121
maxDepth: number;
2222
subpagesOnly?: boolean;
23+
maxConcurrency?: number;
2324
}
2425

2526
export interface PageResult {

0 commit comments

Comments
 (0)