Skip to content

Commit 9008dfd

Browse files
authored
feat: add WebWritableStream for Web Streams API support (#2376)
The existing WritableStream wraps Node.js stream.Writable, which isn't compatible with the Web Streams API. This makes it impossible to pipe fetch() response bodies directly into the parser. Add a new WebWritableStream class exposed via the subpath export "htmlparser2/WebWritableStream" that wraps the Parser using the standard Web Streams API WritableStream. It accepts both string and Uint8Array chunks, using TextDecoder in streaming mode for proper handling of fragmented multi-byte UTF-8 sequences. Closes #1862
1 parent 21dedfa commit 9008dfd

4 files changed

Lines changed: 6003 additions & 0 deletions

File tree

package.json

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,10 @@
3232
"types": "./dist/index.d.ts",
3333
"default": "./dist/index.js"
3434
},
35+
"./WebWritableStream": {
36+
"types": "./dist/WebWritableStream.d.ts",
37+
"default": "./dist/WebWritableStream.js"
38+
},
3539
"./WritableStream": {
3640
"types": "./dist/WritableStream.d.ts",
3741
"default": "./dist/WritableStream.js"

src/WebWritableStream.spec.ts

Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
import * as fs from "node:fs/promises";
2+
import { describe, expect, it, vi } from "vitest";
3+
import * as helper from "./__fixtures__/testHelper.js";
4+
import type { Handler, ParserOptions } from "./Parser.js";
5+
import { WebWritableStream } from "./WebWritableStream.js";
6+
7+
describe("WebWritableStream", () => {
8+
it("should decode fragmented unicode characters", async () => {
9+
const ontext = vi.fn();
10+
const stream = new WebWritableStream({ ontext });
11+
12+
const writer = stream.getWriter();
13+
// € is U+20AC, encoded as 0xE2 0x82 0xAC in UTF-8
14+
await writer.write(new Uint8Array([0xe2, 0x82]));
15+
await writer.write(new Uint8Array([0xac]));
16+
await writer.write("");
17+
await writer.close();
18+
19+
expect(ontext).toHaveBeenCalledWith("€");
20+
});
21+
22+
it("should handle string chunks", async () => {
23+
const ontext = vi.fn();
24+
const stream = new WebWritableStream({ ontext });
25+
26+
const writer = stream.getWriter();
27+
await writer.write("hello");
28+
await writer.close();
29+
30+
expect(ontext).toHaveBeenCalledWith("hello");
31+
});
32+
33+
it("should handle empty stream", async () => {
34+
const onend = vi.fn();
35+
const stream = new WebWritableStream({ onend });
36+
37+
const writer = stream.getWriter();
38+
await writer.close();
39+
40+
expect(onend).toHaveBeenCalledTimes(1);
41+
});
42+
43+
it("should handle abort signal", async () => {
44+
const ontext = vi.fn();
45+
const onend = vi.fn();
46+
const stream = new WebWritableStream({ ontext, onend });
47+
48+
const writer = stream.getWriter();
49+
await writer.abort(new Error("aborted"));
50+
51+
expect(ontext).not.toHaveBeenCalled();
52+
expect(onend).not.toHaveBeenCalled();
53+
});
54+
55+
it("should work with ReadableStream.pipeTo", async () => {
56+
const onopentag = vi.fn();
57+
const ontext = vi.fn();
58+
const stream = new WebWritableStream({ onopentag, ontext });
59+
60+
const html = "<div>hello</div>";
61+
const readable = new ReadableStream<string>({
62+
start(controller) {
63+
controller.enqueue(html);
64+
controller.close();
65+
},
66+
});
67+
68+
await readable.pipeTo(stream);
69+
70+
expect(onopentag).toHaveBeenCalledWith("div", {}, false);
71+
expect(ontext).toHaveBeenCalledWith("hello");
72+
});
73+
74+
it("Basic html", () => testStream("Basic.html"));
75+
it("Attributes", () => testStream("Attributes.html"));
76+
it("SVG", () => testStream("Svg.html"));
77+
it("RSS feed", () => testStream("RSS_Example.xml", { xmlMode: true }));
78+
it("Atom feed", () => testStream("Atom_Example.xml", { xmlMode: true }));
79+
it("RDF feed", () => testStream("RDF_Example.xml", { xmlMode: true }));
80+
});
81+
82+
function getPromiseEventCollector(): [
83+
handler: Partial<Handler>,
84+
promise: Promise<unknown>,
85+
] {
86+
let handler: Partial<Handler> | undefined;
87+
const promise = new Promise<unknown>((resolve, reject) => {
88+
handler = helper.getEventCollector((error, events) => {
89+
if (error) {
90+
reject(error);
91+
} else {
92+
resolve(events);
93+
}
94+
});
95+
});
96+
97+
if (!handler) {
98+
throw new Error("Failed to initialize event handler");
99+
}
100+
101+
return [handler, promise];
102+
}
103+
104+
async function testStream(
105+
file: string,
106+
options?: ParserOptions,
107+
): Promise<void> {
108+
const filePath = new URL(`__fixtures__/Documents/${file}`, import.meta.url);
109+
110+
const [streamHandler, eventsPromise] = getPromiseEventCollector();
111+
112+
const fileContents = await fs.readFile(filePath);
113+
114+
// Pipe file contents through a ReadableStream into the WebWritableStream
115+
const readable = new ReadableStream<Uint8Array>({
116+
start(controller) {
117+
controller.enqueue(new Uint8Array(fileContents));
118+
controller.close();
119+
},
120+
});
121+
122+
await readable.pipeTo(new WebWritableStream(streamHandler, options));
123+
124+
const events = await eventsPromise;
125+
126+
expect(events).toMatchSnapshot();
127+
128+
// Verify single-pass produces identical results
129+
const [singlePassHandler, singlePassPromise] = getPromiseEventCollector();
130+
131+
const singlePassReadable = new ReadableStream<string>({
132+
start(controller) {
133+
controller.enqueue(fileContents.toString());
134+
controller.close();
135+
},
136+
});
137+
138+
await singlePassReadable.pipeTo(
139+
new WebWritableStream(singlePassHandler, options),
140+
);
141+
142+
expect(await singlePassPromise).toStrictEqual(events);
143+
}

src/WebWritableStream.ts

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
import { type Handler, Parser, type ParserOptions } from "./Parser.js";
2+
3+
/**
4+
* WebWritableStream makes the `Parser` interface available as a Web Streams API WritableStream.
5+
*
6+
* This is useful for piping `fetch()` response bodies directly into the parser.
7+
* @see Parser
8+
* @example
9+
* ```typescript
10+
* import { WebWritableStream } from "htmlparser2/WebWritableStream";
11+
*
12+
* const stream = new WebWritableStream({
13+
* onopentag(name, attribs) {
14+
* console.log("Opened:", name);
15+
* },
16+
* });
17+
*
18+
* const response = await fetch("https://example.com");
19+
* await response.body.pipeTo(stream);
20+
* ```
21+
*/
22+
// eslint-disable-next-line n/no-unsupported-features/node-builtins -- Web Streams API; requires a runtime with WritableStream (browsers, Deno, Node ≥18.0)
23+
export class WebWritableStream extends WritableStream<string | Uint8Array> {
24+
constructor(cbs: Partial<Handler>, options?: ParserOptions) {
25+
const parser = new Parser(cbs, options);
26+
const decoder = new TextDecoder();
27+
const streamOption: TextDecodeOptions = { stream: true };
28+
29+
super({
30+
write(chunk) {
31+
parser.write(
32+
typeof chunk === "string"
33+
? chunk
34+
: decoder.decode(chunk, streamOption),
35+
);
36+
},
37+
close() {
38+
// Flush any remaining bytes in the decoder
39+
const remaining = decoder.decode();
40+
if (remaining) {
41+
parser.write(remaining);
42+
}
43+
parser.end();
44+
},
45+
});
46+
}
47+
}

0 commit comments

Comments
 (0)