I'm using Cloudflare Workers, where an API handler can return a ReadableStream (via new Response(new ReadableStream(...), { status: 200 })). I want to return more complex objects that contain ReadableStreams, so I used seroval to serialize them:
import { toCrossJSONStream } from "seroval";
import { ReadableStreamPlugin } from "seroval-plugins/web";

export function serializeStream(object: unknown) {
  let cancel!: () => void;
  return new ReadableStream<string>({
    start(controller) {
      cancel = toCrossJSONStream(object, {
        plugins: [ReadableStreamPlugin],
        onParse(node, initial) {
          const line = (initial ? "" : "\n") + JSON.stringify(node);
          // console.log("[server-function]", "onParse", line);
          controller.enqueue(line);
        },
        onDone() {
          // console.log("[server-function]", "onDone");
          controller.close();
        },
      });
    },
    cancel() {
      cancel();
    },
  }).pipeThrough(new TextEncoderStream());
}
However, there is one thing I lost: when the client disconnects before receiving the full response, the cancel method above is called, but it doesn't actually cancel the ReadableStream being serialized:
import { toCrossJSONStream } from "seroval";
import { ReadableStreamPlugin } from "seroval-plugins/web";

const cancel = toCrossJSONStream(
  new ReadableStream({
    async start(controller) {
      while (true) {
        await new Promise((resolve) => {
          setTimeout(resolve, 1000);
        });
        console.log("ReadableStream enqueue");
        controller.enqueue(0);
      }
    },
    cancel() {
      console.log("ReadableStream cancel");
    },
  }),
  {
    plugins: [ReadableStreamPlugin],
    onParse(node, initial) {
      console.log("onParse", JSON.stringify(node), initial);
    },
  },
);

setTimeout(() => {
  console.log("call cancel");
  cancel();
}, 5000);
After 5 seconds, it stops printing onParse, but "ReadableStream cancel" is never printed, and "ReadableStream enqueue" continues indefinitely.
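A possible workaround on my side, as a rough sketch only: wrap each stream before handing it to seroval, so that an AbortSignal can cancel the underlying source even while the wrapper is locked by the serializer. The helper name cancellable is hypothetical and not part of seroval or seroval-plugins; it uses only standard Web Streams and AbortSignal APIs. I have not checked how ReadableStreamPlugin consumes the stream internally, so this assumes it simply reads until the stream closes.

```typescript
// Hypothetical helper (not a seroval API): forwards chunks from `source`
// and cancels `source` as soon as `signal` aborts.
function cancellable<T>(
  source: ReadableStream<T>,
  signal: AbortSignal,
): ReadableStream<T> {
  // We hold the only reader of `source`, so we can always cancel it,
  // no matter who locks the wrapper stream returned below.
  const reader = source.getReader();
  let finished = false;

  return new ReadableStream<T>({
    start(controller) {
      const onAbort = () => {
        if (finished) return;
        finished = true;
        // End the wrapper so its consumer sees a normal close.
        controller.close();
        // Propagate cancellation to the underlying source.
        void reader.cancel(signal.reason).catch(() => {});
      };
      if (signal.aborted) onAbort();
      else signal.addEventListener("abort", onAbort, { once: true });
    },
    async pull(controller) {
      const result = await reader.read();
      if (finished) return; // aborted or cancelled while the read was pending
      if (result.done) {
        finished = true;
        controller.close();
      } else {
        controller.enqueue(result.value);
      }
    },
    cancel(reason) {
      // The consumer cancelled the wrapper directly: forward it as well.
      finished = true;
      return reader.cancel(reason);
    },
  });
}
```

With something like this, serializeStream could create an AbortController, the object passed in would contain cancellable(stream, controller.signal) instead of the raw stream, and the outer cancel() would call controller.abort() in addition to the function returned by toCrossJSONStream. It would still be nicer if that returned function cancelled the streams itself.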