11/*! create-torrent. MIT License. WebTorrent LLC <https://webtorrent.io/opensource> */
22const bencode = require ( 'bencode' )
3- const BlockStream = require ( 'block-stream2 ' )
3+ const blockIterator = require ( 'block-iterator ' )
44const calcPieceLength = require ( 'piece-length' )
55const corePath = require ( 'path' )
6- const { BlobReadStream } = require ( 'fast-blob-stream' )
76const isFile = require ( 'is-file' )
87const junk = require ( 'junk' )
98const joinIterator = require ( 'join-async-iterator' )
10- const once = require ( 'once' )
119const parallel = require ( 'run-parallel' )
1210const queueMicrotask = require ( 'queue-microtask' )
1311const { sha1 } = require ( 'uint8-util' )
14- const { Transform , PassThrough , Readable } = require ( 'streamx ' )
12+ require ( 'fast-readable-async-iterator ' )
1513
1614const getFiles = require ( './get-files' ) // browser exclude
1715
@@ -176,10 +174,10 @@ function _parseInput (input, opts, cb) {
176174 const file = { }
177175
178176 if ( isBlob ( item ) ) {
179- file . getStream = getBlobStream ( item )
177+ file . getStream = item . stream ( )
180178 file . length = item . size
181179 } else if ( Buffer . isBuffer ( item ) ) {
182- file . getStream = getBufferStream ( item )
180+ file . getStream = [ item ] // wrap in iterable to write entire buffer at once instead of unwrapping all bytes
183181 file . length = item . length
184182 } else if ( isReadable ( item ) ) {
185183 file . getStream = getStreamStream ( item , file )
@@ -206,73 +204,42 @@ function _parseInput (input, opts, cb) {
206204
// Maximum number of sha1 digests allowed in flight before we stop pulling
// more chunks from the block iterator (simple backpressure).
const MAX_OUTSTANDING_HASHES = 5

/**
 * Hash the concatenation of all file streams into fixed-size pieces.
 *
 * Pulls `pieceLength`-sized chunks from the joined file streams, sha1-hashes
 * up to MAX_OUTSTANDING_HASHES chunks concurrently, and reports progress as
 * bytes are hashed.
 *
 * @param {Array<Object>} file objects; each `file.getStream` is an (async) iterable of chunks
 * @param {number} pieceLength size of each piece in bytes (last piece may be shorter)
 * @param {number} estimatedTorrentLength total byte estimate, passed to `opts.onProgress`
 * @param {Object} opts may contain `onProgress(hashedLength, estimatedTorrentLength)`
 * @param {function} cb called once as `cb(err)` or `cb(null, piecesBuffer, totalLength)`
 */
async function getPieceList (files, pieceLength, estimatedTorrentLength, opts, cb) {
  const pieces = []
  let length = 0
  let hashedLength = 0
  let remainingHashes = 0
  let pieceNum = 0
  let ended = false
  let called = false

  // The callback-based rewrite dropped the old once() wrapper; without a
  // guard, a callback that throws inside the try block would be re-invoked
  // with its own error by the catch. Ensure cb fires at most once.
  const done = err => {
    if (called) return
    called = true
    if (err) return cb(err)
    cb(null, Buffer.from(pieces.join(''), 'hex'), length)
  }

  const streams = files.map(file => file.getStream)
  const onProgress = opts.onProgress

  const iterator = blockIterator(joinIterator(streams), pieceLength, { zeroPadding: false })
  try {
    for await (const chunk of iterator) {
      await new Promise(resolve => {
        length += chunk.length
        const i = pieceNum++
        // Keep pulling while fewer than MAX_OUTSTANDING_HASHES digests are
        // in flight; otherwise wait for a hash to complete before resuming.
        if (++remainingHashes < MAX_OUTSTANDING_HASHES) resolve()
        sha1(chunk, hash => {
          pieces[i] = hash
          remainingHashes--
          hashedLength += chunk.length
          if (onProgress) onProgress(hashedLength, estimatedTorrentLength)
          resolve()
          // If input is exhausted and this was the last outstanding hash,
          // this callback is responsible for finishing the job.
          if (ended && remainingHashes === 0) done(null)
        })
      })
    }
    ended = true
  } catch (err) {
    return done(err)
  }
  // No hashes still in flight means everything (possibly zero chunks) is
  // done; otherwise the final sha1 callback above completes via done().
  if (remainingHashes === 0) done(null)
}
277244
278245function onFiles ( files , opts , cb ) {
@@ -409,45 +376,18 @@ function isReadable (obj) {
409376}
410377
/**
 * Wrap a readable stream in a lazy async generator that counts bytes.
 * Every chunk passing through is added to `file.length` before being
 * yielded onward unchanged.
 *
 * @generator
 * @param {Stream} readable source stream (any async-iterable of chunks)
 * @param {Object} file object whose `length` property accumulates byte count
 * @return {Uint8Array} stream data/chunk
 */
async function * getStreamStream (readable, file) {
  for await (const data of readable) {
    file.length += data.length
    yield data
  }
}
453393
0 commit comments