Skip to content

Commit 0d32c17

Browse files
committed
fix(core): When piping a stream to multiple destinations, do so safely
1 parent ed37ec8 commit 0d32c17

File tree

4 files changed

+76
-47
lines changed

4 files changed

+76
-47
lines changed

package.json

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
"_test": "tsdx test --runInBand",
3838
"test": "yarn _test --ci",
3939
"test-watch": "yarn _test --watch",
40-
"lint": "tsdx lint scripts src test types website",
40+
"lint": "tsdx lint scripts src test website",
4141
"lint-fix": "yarn lint --fix",
4242
"extract-api": "api-extractor run --local",
4343
"check-api": "api-extractor run",
@@ -49,6 +49,7 @@
4949
},
5050
"dependencies": {
5151
"@scarf/scarf": "^1.0.6",
52+
"cloneable-readable": "^2.0.1",
5253
"express": "^4.17.1",
5354
"http-proxy-middleware": "^1.0.4",
5455
"merge-stream": "^2.0.0",
@@ -64,6 +65,7 @@
6465
"@microsoft/api-extractor": "^7.9.0",
6566
"@semantic-release/changelog": "^5.0.1",
6667
"@semantic-release/git": "^9.0.0",
68+
"@types/cloneable-readable": "^2.0.0",
6769
"@types/express": "^4.17.6",
6870
"@types/merge-stream": "^1.1.2",
6971
"@types/node-fetch": "^2.5.7",

src/core/Service.ts

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { PassThrough } from 'stream'
2+
import cloneable from 'cloneable-readable'
23
import { ServiceProcess } from './ServiceProcess'
34
import { NormalizedServiceConfig } from './validateAndNormalizeConfig'
45
import { ReadyContext } from './ReadyContext'
@@ -10,9 +11,10 @@ import { Logger } from './Logger'
1011
export class Service {
1112
public readonly id: string
1213
public readonly config: NormalizedServiceConfig
13-
public readonly output = new PassThrough({ objectMode: true })
14+
public readonly output = cloneable(new PassThrough({ objectMode: true }))
1415
private readonly logger: Logger
1516
private readonly die: (message: string) => Promise<never>
17+
private readonly outputClone = this.output.clone()
1618
private ready: Promise<void> | undefined
1719
private process: ServiceProcess | undefined
1820
private startResult: Promise<void> | undefined
@@ -46,11 +48,16 @@ export class Service {
4648
return this.startResult
4749
}
4850
private defineReady() {
49-
const output = this.output.pipe(new PassThrough({ objectMode: true }))
50-
const ctx: ReadyContext = { output }
51-
this.ready = promiseTry(() => this.config.ready(ctx)).catch(error =>
52-
this.die(`Error from ready function: ${maybeErrorText(error)}`)
53-
)
51+
const ctx: ReadyContext = {
52+
output: this.outputClone,
53+
}
54+
this.ready = promiseTry(() => this.config.ready(ctx))
55+
.catch(error =>
56+
this.die(`Error from ready function: ${maybeErrorText(error)}`)
57+
)
58+
.then(() => {
59+
this.outputClone.destroy()
60+
})
5461
}
5562
private async startProcess() {
5663
const proc = new ServiceProcess(this.config, () => {
@@ -103,7 +110,7 @@ export class Service {
103110
}
104111
public stop() {
105112
if (!this.stopResult) {
106-
if (!this.process || this.process.isEnded) {
113+
if (!this.process || !this.process.isRunning()) {
107114
this.stopResult = Promise.resolve()
108115
} else {
109116
this.logger.info(`Stopping service '${this.id}'...`)

src/core/ServiceProcess.ts

Lines changed: 43 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1,70 +1,75 @@
1-
import { PassThrough } from 'stream'
1+
import { Readable, PassThrough } from 'stream'
22
import { ChildProcessWithoutNullStreams } from 'child_process'
3-
import { once } from 'events'
43
import mergeStream from 'merge-stream'
54
import splitStream from 'split'
65
import { NormalizedServiceConfig } from './validateAndNormalizeConfig'
76
import { spawnProcess } from './spawnProcess'
87

9-
const split = () => splitStream((line: string) => `${line}\n`)
10-
118
export class ServiceProcess {
129
public readonly output = new PassThrough({ objectMode: true })
1310
public readonly started: Promise<void>
14-
public isEnded = false
1511
public logTail: string[] = []
1612
private readonly process: ChildProcessWithoutNullStreams
13+
private didError = false
14+
private didEnd = false
1715
private readonly ended: Promise<void>
1816
private wasEndCalled = false
1917
constructor(config: NormalizedServiceConfig, onCrash: () => void) {
2018
this.process = spawnProcess(config)
21-
const childOutput = mergeStream(
22-
this.process.stdout.setEncoding('utf8').pipe(split()),
23-
this.process.stderr.setEncoding('utf8').pipe(split())
24-
)
25-
childOutput.pipe(this.output)
26-
if (config.logTailLength > 0) {
27-
this.output.on('data', line => {
28-
this.logTail.push(line)
29-
if (this.logTail.length > config.logTailLength) {
30-
this.logTail.shift()
31-
}
32-
})
33-
}
34-
const error = new Promise(resolve => this.process.on('error', resolve))
3519
this.started = Promise.race([
36-
error,
37-
new Promise(resolve => setTimeout(resolve, 100)),
20+
new Promise(resolve => this.process.once('error', resolve)),
21+
new Promise(resolve => setTimeout(() => resolve(), 100)),
3822
]).then(error => {
39-
if (!error) {
40-
return
23+
if (error) {
24+
this.didError = true
25+
throw error
4126
}
42-
childOutput.unpipe(this.output)
43-
this.output.end()
44-
return Promise.reject(error)
4527
})
46-
const didStart = this.started.then(
47-
() => true,
48-
() => false
28+
const processOutput = mergeStream(
29+
transformStream(this.process.stdout),
30+
transformStream(this.process.stderr)
4931
)
50-
this.ended = Promise.race([error, once(childOutput, 'end')]).then(() => {
51-
this.isEnded = true
52-
if (!this.wasEndCalled) {
53-
didStart.then(didStart => {
54-
if (didStart) {
55-
onCrash()
32+
this.ended = (async () => {
33+
for await (const line of processOutput as AsyncIterable<string>) {
34+
if (this.didError) {
35+
break
36+
}
37+
this.output.write(line)
38+
if (config.logTailLength > 0) {
39+
this.logTail.push(line)
40+
if (this.logTail.length > config.logTailLength) {
41+
this.logTail.shift()
5642
}
57-
})
43+
}
44+
}
45+
this.didEnd = true
46+
this.output.end()
47+
})()
48+
Promise.all([this.started.catch(() => {}), this.ended]).then(() => {
49+
if (!this.didError && !this.wasEndCalled) {
50+
onCrash()
5851
}
5952
})
6053
}
61-
end() {
54+
public isRunning() {
55+
return !this.didError && !this.didEnd
56+
}
57+
public end() {
6258
if (!this.wasEndCalled) {
6359
this.wasEndCalled = true
64-
if (!this.isEnded) {
60+
if (this.isRunning()) {
6561
this.process.kill('SIGINT')
6662
}
6763
}
6864
return this.ended
6965
}
7066
}
67+
68+
/**
69+
* Split input into stream of utf8 strings ending in '\n'
70+
* */
71+
function transformStream(input: Readable): Readable {
72+
return input
73+
.setEncoding('utf8')
74+
.pipe(splitStream((line: string) => `${line}\n`))
75+
}

yarn.lock

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1372,6 +1372,13 @@
13721372
"@types/connect" "*"
13731373
"@types/node" "*"
13741374

1375+
"@types/cloneable-readable@^2.0.0":
1376+
version "2.0.0"
1377+
resolved "https://registry.yarnpkg.com/@types/cloneable-readable/-/cloneable-readable-2.0.0.tgz#b5bc6602d4771a5db9d80f3867427e9da5f68a63"
1378+
integrity sha512-Q9fTsA3hEbOXmGIZ7StMunr/SCvtdfXDfJcDadYk/MPbS3Xh/fWCsdhW26NVx1XNNcX3SkdBqPkfbOiD6p3q2Q==
1379+
dependencies:
1380+
"@types/node" "*"
1381+
13751382
"@types/color-name@^1.1.1":
13761383
version "1.1.1"
13771384
resolved "https://registry.yarnpkg.com/@types/color-name/-/color-name-1.1.1.tgz#1c1261bbeaa10a8055bbc5d8ab84b7b2afc846a0"
@@ -2613,6 +2620,14 @@ clone@^1.0.2:
26132620
resolved "https://registry.yarnpkg.com/clone/-/clone-1.0.4.tgz#da309cc263df15994c688ca902179ca3c7cd7c7e"
26142621
integrity sha1-2jCcwmPfFZlMaIypAheco8fNfH4=
26152622

2623+
cloneable-readable@^2.0.1:
2624+
version "2.0.1"
2625+
resolved "https://registry.yarnpkg.com/cloneable-readable/-/cloneable-readable-2.0.1.tgz#fc2240beddbe5621b872acad8104dcc86574e225"
2626+
integrity sha512-1ke/wckhpSevGPQzKb+qGHMsuFrSUKQlsKh0PTmscmfAzw8MgONqrg5a0e0Un1YO/cOSS4wAepfXSGus5RoonQ==
2627+
dependencies:
2628+
inherits "^2.0.1"
2629+
readable-stream "^3.3.0"
2630+
26162631
cmd-shim@^3.0.0, cmd-shim@^3.0.3:
26172632
version "3.0.3"
26182633
resolved "https://registry.yarnpkg.com/cmd-shim/-/cmd-shim-3.0.3.tgz#2c35238d3df37d98ecdd7d5f6b8dc6b21cadc7cb"
@@ -7708,7 +7723,7 @@ read@1, read@~1.0.1, read@~1.0.7:
77087723
string_decoder "~1.1.1"
77097724
util-deprecate "~1.0.1"
77107725

7711-
"readable-stream@2 || 3", readable-stream@^3.6.0:
7726+
"readable-stream@2 || 3", readable-stream@^3.3.0, readable-stream@^3.6.0:
77127727
version "3.6.0"
77137728
resolved "https://registry.yarnpkg.com/readable-stream/-/readable-stream-3.6.0.tgz#337bbda3adc0706bd3e024426a286d4b4b2c9198"
77147729
integrity sha512-BViHy7LKeTz4oNnkcLJ+lVSL6vpiFeX6/d3oSH8zCW7UxP2onchk+vTGB143xuFjHS3deTgkKoXXymXqymiIdA==

0 commit comments

Comments
 (0)