Skip to content

Commit aa697dc

Browse files
addaleaxjuanarbol
authored andcommitted
quic: implement sendFD() support
Fixes: nodejs#75 PR-URL: nodejs#150 Reviewed-By: James M Snell <jasnell@gmail.com>
1 parent 17b2cb9 commit aa697dc

2 files changed

Lines changed: 141 additions & 4 deletions

File tree

doc/api/quic.md

Lines changed: 49 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -943,7 +943,7 @@ socket.on('ready', () => {
943943
});
944944
```
945945

946-
#### Call Results#
946+
#### Call Results
947947

948948
A call on a socket that is not ready to send or no longer open may throw a
949949
Not running Error.
@@ -1106,6 +1106,54 @@ added: REPLACEME
11061106

11071107
The `QuicServerSession` or `QuicClientSession`.
11081108

1109+
### quicstream.sendFD(fd[, options])
1110+
<!-- YAML
1111+
added: REPLACEME
1112+
-->
1113+
1114+
* `fd` {number|FileHandle} A readable file descriptor.
1115+
* `options` {Object}
1116+
* `offset` {number} The offset position at which to begin reading.
1117+
Default: `-1`.
1118+
* `length` {number} The amount of data from the fd to send.
1119+
Default: `-1`.
1120+
1121+
Instead of using a `Quicstream` as a writable stream, send data from a given file
1122+
descriptor.
1123+
1124+
If `offset` is set to a non-negative number, reading starts from that position
1125+
and the file offset will not be advanced.
1126+
If `length` is set to a non-negative number, it gives the maximum number of
1127+
bytes that are read from the file.
1128+
1129+
The file descriptor or `FileHandle` is not closed when the stream is closed,
1130+
so it will need to be closed manually once it is no longer needed.
1131+
Using the same file descriptor concurrently for multiple streams
1132+
is not supported and may result in data loss. Re-using a file descriptor
1133+
after a stream has finished is supported.
1134+
1135+
### quicstream.sendFile(path[, options])
1136+
<!-- YAML
1137+
added: REPLACEME
1138+
-->
1139+
1140+
* `path` {string|Buffer|URL}
1141+
* `options` {Object}
1142+
* `onError` {Function} Callback function invoked in the case of an
1143+
error before send.
1144+
* `offset` {number} The offset position at which to begin reading.
1145+
Default: `-1`.
1146+
* `length` {number} The amount of data from the fd to send.
1147+
Default: `-1`.
1148+
1149+
Instead of using a `QuicStream` as a writable stream, send data from a given file
1150+
path.
1151+
1152+
The `options.onError` callback will be called if the file could not be opened.
1153+
If `offset` is set to a non-negative number, reading starts from that position.
1154+
If `length` is set to a non-negative number, it gives the maximum number of
1155+
bytes that are read from the file.
1156+
11091157
### quicstream.unidirectional
11101158
<!-- YAML
11111159
added: REPLACEME

lib/internal/quic/core.js

Lines changed: 92 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,12 @@ const {
2121
validateQuicClientSessionOptions,
2222
validateQuicSocketOptions,
2323
} = require('internal/quic/util');
24+
const { validateNumber } = require('internal/validators');
2425
const util = require('util');
2526
const assert = require('internal/assert');
2627
const EventEmitter = require('events');
28+
const fs = require('fs');
29+
const fsPromisesInternal = require('internal/fs/promises');
2730
const { Duplex } = require('stream');
2831
const {
2932
createSecureContext: _createSecureContext
@@ -32,7 +35,7 @@ const {
3235
translatePeerCertificate
3336
} = require('_tls_common');
3437
const {
35-
defaultTriggerAsyncIdScope, // eslint-disable-line no-unused-vars
38+
defaultTriggerAsyncIdScope,
3639
symbols: {
3740
async_id_symbol,
3841
owner_symbol,
@@ -52,14 +55,15 @@ const {
5255

5356
const {
5457
ShutdownWrap,
55-
kReadBytesOrError, // eslint-disable-line no-unused-vars
56-
streamBaseState // eslint-disable-line no-unused-vars
58+
kReadBytesOrError,
59+
streamBaseState
5760
} = internalBinding('stream_wrap');
5861

5962
const {
6063
codes: {
6164
ERR_INVALID_ARG_TYPE,
6265
ERR_INVALID_ARG_VALUE,
66+
ERR_INVALID_OPT_VALUE,
6367
ERR_INVALID_CALLBACK,
6468
ERR_OUT_OF_RANGE,
6569
ERR_QUIC_ERROR,
@@ -78,6 +82,10 @@ const {
7882
exceptionWithHostPort
7983
} = require('internal/errors');
8084

85+
const { FileHandle } = internalBinding('fs');
86+
const { StreamPipe } = internalBinding('stream_pipe');
87+
const { UV_EOF } = internalBinding('uv');
88+
8189
const {
8290
QuicSocket: QuicSocketHandle,
8391
initSecureContext,
@@ -2253,6 +2261,87 @@ class QuicStream extends Duplex {
22532261
streamOnResume.call(this);
22542262
}
22552263

2264+
sendFile(path, options = {}) {
2265+
fs.open(path, 'r', QuicStream.#onFileOpened.bind(this, options));
2266+
}
2267+
2268+
static #onFileOpened = function(options, err, fd) {
2269+
const onError = options.onError;
2270+
if (err) {
2271+
if (onError) {
2272+
this.close();
2273+
onError(err);
2274+
} else {
2275+
this.destroy(err);
2276+
}
2277+
return;
2278+
}
2279+
2280+
if (this.destroyed || this.closed) {
2281+
fs.close(fd, (err) => { if (err) throw err; });
2282+
return;
2283+
}
2284+
2285+
this.sendFD(fd, options, true);
2286+
}
2287+
2288+
sendFD(fd, { offset = -1, length = -1 } = {}, ownsFd = false) {
2289+
if (this.destroyed || this.#closed)
2290+
return;
2291+
2292+
if (typeof offset !== 'number')
2293+
throw new ERR_INVALID_OPT_VALUE('options.offset', offset);
2294+
if (typeof length !== 'number')
2295+
throw new ERR_INVALID_OPT_VALUE('options.length', length);
2296+
2297+
if (fd instanceof fsPromisesInternal.FileHandle)
2298+
fd = fd.fd;
2299+
else if (typeof fd !== 'number')
2300+
throw new ERR_INVALID_ARG_TYPE('fd', ['number', 'FileHandle'], fd);
2301+
2302+
this[kUpdateTimer]();
2303+
this.ownsFd = ownsFd;
2304+
2305+
// Close the writable side of the stream, but only as far as the writable
2306+
// stream implementation is concerned.
2307+
this._final = null;
2308+
this.end();
2309+
2310+
defaultTriggerAsyncIdScope(this[async_id_symbol],
2311+
QuicStream.#startFilePipe,
2312+
this, fd, offset, length);
2313+
}
2314+
2315+
static #startFilePipe = (stream, fd, offset, length) => {
2316+
const handle = new FileHandle(fd, offset, length);
2317+
handle.onread = QuicStream.#onPipedFileHandleRead;
2318+
handle.stream = stream;
2319+
2320+
const pipe = new StreamPipe(handle, stream[kHandle]);
2321+
pipe.onunpipe = QuicStream.#onFileUnpipe;
2322+
pipe.start();
2323+
2324+
// Exact length of the file doesn't matter here, since the
2325+
// stream is closing anyway - just use 1 to signify that
2326+
// a write does exist
2327+
stream[kTrackWriteState](stream, 1);
2328+
}
2329+
2330+
static #onFileUnpipe = function() { // Called on the StreamPipe instance.
2331+
const stream = this.sink[owner_symbol];
2332+
if (stream.ownsFd)
2333+
this.source.close().catch(stream.destroy.bind(stream));
2334+
else
2335+
this.source.releaseFD();
2336+
}
2337+
2338+
static #onPipedFileHandleRead = function() {
2339+
const err = streamBaseState[kReadBytesOrError];
2340+
if (err < 0 && err !== UV_EOF) {
2341+
this.stream.destroy(errnoException(err, 'sendFD'));
2342+
}
2343+
}
2344+
22562345
get resetReceived() {
22572346
return (this.#resetCode !== undefined) ?
22582347
{ code: this.#resetCode | 0, finalSize: this.#resetFinalSize | 0 } :

0 commit comments

Comments
 (0)