Skip to content
This repository was archived by the owner on Dec 1, 2024. It is now read-only.

Commit 12e218c

Browse files
committed
Merge pull request #84 from rvagg/multi-read-stream-close
extracted ReadStream state manager, fixes #82
2 parents 7bb7d9c + 8639f08 commit 12e218c

3 files changed

Lines changed: 99 additions & 27 deletions

File tree

lib/read-stream-state.js

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
function State () {
2+
this.ended = this._ready = this._reading = this._destroyed = this._paused = false
3+
}
4+
5+
State.prototype.end = function() {
6+
this.ended = true
7+
this._destroyed = false
8+
}
9+
10+
State.prototype.ready = function() {
11+
this._ready = true
12+
}
13+
14+
State.prototype.destroy = function() {
15+
this._destroyed = true
16+
}
17+
18+
State.prototype.pause = function() {
19+
this._paused = true
20+
}
21+
22+
State.prototype.resume = function() {
23+
this._paused = false
24+
}
25+
26+
State.prototype.read = function() {
27+
this._reading = true
28+
}
29+
30+
State.prototype.endRead = function() {
31+
this._reading = false
32+
}
33+
34+
State.prototype.canPause = function() {
35+
return !this.ended && !this._paused
36+
}
37+
38+
State.prototype.canResume = function() {
39+
return !this.ended && this._paused
40+
}
41+
42+
State.prototype.canRead = function() {
43+
return !this.ended && !this._reading && !this._paused
44+
}
45+
46+
State.prototype.canCleanup = function() {
47+
return !this.ended && !this._reading
48+
}
49+
50+
State.prototype.canEmitData = function() {
51+
return !this.ended && !this._destroyed
52+
}
53+
54+
State.prototype.canEnd = function() {
55+
return !this.ended
56+
}
57+
58+
module.exports = function () { return new State() }

lib/read-stream.js

Lines changed: 25 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ var Stream = require('stream').Stream
77
, bufferStream = require('simple-bufferstream')
88
, inherits = require('util').inherits
99
, extend = require('util')._extend
10+
, State = require('./read-stream-state')
1011

1112
, toEncoding = require('./util').toEncoding
1213
, toSlice = require('./util').toSlice
@@ -28,11 +29,11 @@ var Stream = require('stream').Stream
2829
}
2930
, makeNoData = function () { return null }
3031

31-
3232
function ReadStream (options, db, iteratorFactory) {
3333
Stream.call(this)
3434

35-
this._status = 'ready'
35+
this._state = State()
36+
3637
this._dataEvent = 'data'
3738
this.readable = true
3839
this.writable = false
@@ -59,9 +60,10 @@ function ReadStream (options, db, iteratorFactory) {
5960

6061

6162
var ready = function () {
62-
if (this._status == 'ended')
63+
if (!this._state.canEmitData())
6364
return
6465

66+
this._state.ready()
6567
this._iterator = iteratorFactory(this._options)
6668
this.emit('ready')
6769
this._read()
@@ -76,21 +78,22 @@ function ReadStream (options, db, iteratorFactory) {
7678
inherits(ReadStream, Stream)
7779

7880
ReadStream.prototype.destroy = function () {
79-
this._status = 'destroyed'
80-
this._cleanup()
81+
this._state.destroy()
82+
if (this._state.canCleanup())
83+
this._cleanup()
8184
}
8285

8386
ReadStream.prototype.pause = function () {
84-
if (this._status != 'ended' && !/\+paused$/.test(this._status)) {
87+
if (this._state.canPause()) {
88+
this._state.pause()
8589
this.emit('pause')
86-
this._status += '+paused' // preserve existing status
8790
}
8891
}
8992

9093
ReadStream.prototype.resume = function () {
91-
if (this._status != 'ended') {
94+
if (this._state.canResume()) {
9295
this.emit('resume')
93-
this._status = this._status.replace(/\+paused$/, '')
96+
this._state.resume()
9497
this._read()
9598
}
9699
}
@@ -115,44 +118,39 @@ ReadStream.prototype.pipe = function (dest) {
115118
}
116119

117120
ReadStream.prototype._read = function () {
118-
if (this._status == 'ready') {
119-
this._status = 'reading'
121+
if (this._state.canRead()) {
122+
this._state.read()
120123
this._iterator.next(this._onData.bind(this))
121124
}
122125
}
123126

124127
ReadStream.prototype._onData = function (err, key, value) {
125-
if (err)
128+
this._state.endRead()
129+
if (err || !arguments.length /* end */ || !this._state.canEmitData())
126130
return this._cleanup(err)
127-
if (!arguments.length) // end
128-
return this._cleanup()
129-
if (this._status == 'ended')
130-
return
131-
if (/^reading/.test(this._status))
132-
this._status = this._status.replace(/^reading/, 'ready')
133-
this._read()
131+
this._read() // queue another read even tho we may not need it
134132
this.emit(this._dataEvent, this._makeData(key, value))
135133
}
136134

137135
ReadStream.prototype._cleanup = function (err) {
138-
if (this._status == 'ended')
139-
return err && this.emit('error', err)
136+
if (err)
137+
this.emit('error', err)
138+
139+
if (!this._state.canEnd())
140+
return
140141

141-
var s = this._status
142-
this._status = 'ended'
142+
this._state.end()
143143
this.readable = false
144144

145145
if (this._iterator) {
146146
this._iterator.end(function () {
147+
this._iterator = null
147148
this.emit('close')
148149
}.bind(this))
149150
} else
150151
this.emit('close')
151152

152-
if (err)
153-
this.emit('error', err)
154-
else (s != 'destroyed')
155-
this.emit('end')
153+
this.emit('end')
156154
}
157155

158156
ReadStream.prototype.toString = function () {

test/read-stream-test.js

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -655,4 +655,20 @@ buster.testCase('ReadStream', {
655655
}.bind(this))
656656
}.bind(this))
657657
}
658+
659+
, 'test can only end once': function (done) {
660+
this.openTestDatabase(function (db) {
661+
db.batch(this.sourceData.slice(), function (err) {
662+
refute(err)
663+
664+
var rs = db.createReadStream()
665+
.on('close', done)
666+
667+
process.nextTick(function () {
668+
rs.destroy()
669+
})
670+
671+
}.bind(this))
672+
}.bind(this))
673+
}
658674
})

0 commit comments

Comments
 (0)