diff --git a/index.js b/index.js index 49d995a..81108ad 100755 --- a/index.js +++ b/index.js @@ -12,15 +12,11 @@ const fs = require('fs'); const nodePath = require('path'); // renamed to prevent conflicts in `scanDir` const { promisify } = require('util'); const { execFile } = require('child_process'); -const { PassThrough, Transform, ReadableStream } = require('stream'); +const { PassThrough, Transform, Readable } = require('stream'); const { Socket } = require('dgram'); const NodeClamError = require('./lib/NodeClamError'); const NodeClamTransform = require('./lib/NodeClamTransform.js'); -/** - * @typedef {ReadableStream} ReadableStream - */ - // Enable these once the FS.promises API is no longer experimental // const fsPromises = require('fs').promises; // const fsAccess = fsPromises.access; @@ -1182,7 +1178,7 @@ class NodeClam { // Ex. uploadStream.pipe().pipe(destination_stream) return new Transform({ // This should be fired on each chunk received - async transform(chunk, encoding, cb) { + transform(chunk, encoding, cb) { // DRY method for handling each chunk as it comes in const doTransform = () => { // Write data to our fork stream. If it fails, @@ -1231,134 +1227,147 @@ class NodeClam { // Setup an array to collect the responses from ClamAV this._clamavResponseChunks = []; - try { - // Get a connection to the ClamAV Socket - this._clamavSocket = await me._initSocket('passthrough'); - if (me.settings.debugMode) console.log(`${me.debugLabel}: ClamAV Socket Initialized...`); - - // Setup a pipeline that will pass chunks through our custom Tranform and on to ClamAV - this._forkStream.pipe(this._clamavTransform).pipe(this._clamavSocket); - - // When the CLamAV socket connection is closed (could be after 'end' or because of an error)... - this._clamavSocket - .on('close', (hadError) => { - if (me.settings.debugMode) - console.log( - `${me.debugLabel}: ClamAV socket has been closed! Because of Error:`, - hadError - ); - this._clamavSocket.end(); - }) - // When the ClamAV socket connection ends (receives chunk) - .on('end', () => { - this._clamavSocket.end(); - if (me.settings.debugMode) - console.log(`${me.debugLabel}: ClamAV socket has received the last chunk!`); - // Process the collected chunks - const response = Buffer.concat(this._clamavResponseChunks); - const result = me._processResult(response.toString('utf8'), null); - this._clamavResponseChunks = []; - if (me.settings.debugMode) { - console.log(`${me.debugLabel}: Result of scan:`, result); - console.log( - `${me.debugLabel}: It took ${_avScanTime} seconds to scan the file(s).` - ); - clearScanBenchmark(); - } + // Get a connection to the ClamAV Socket + me._initSocket('passthrough').then( + (socket) => { + this._clamavSocket = socket; + + if (me.settings.debugMode) console.log(`${me.debugLabel}: ClamAV Socket Initialized...`); - // If the scan timed-out - if (result.timeout === true) this.emit('timeout'); + // Setup a pipeline that will pass chunks through our custom Tranform and on to ClamAV + this._forkStream.pipe(this._clamavTransform).pipe(this._clamavSocket); - // NOTE: "scan-complete" could be called by the `handleError` method. - // We don't want to to double-emit this message. - if (_scanComplete === false) { - _scanComplete = true; + // When the CLamAV socket connection is closed (could be after 'end' or because of an error)... + this._clamavSocket + .on('close', (hadError) => { + if (me.settings.debugMode) + console.log( + `${me.debugLabel}: ClamAV socket has been closed! Because of Error:`, + hadError + ); this._clamavSocket.end(); - this.emit('scan-complete', result); - } - }) - // If connection timesout. - .on('timeout', () => { - this.emit('timeout', new Error('Connection to host/socket has timed out')); - this._clamavSocket.end(); - if (me.settings.debugMode) - console.log(`${me.debugLabel}: Connection to host/socket has timed out`); - }) - // When the ClamAV socket is ready to receive packets (this will probably never fire here) - .on('ready', () => { - if (me.settings.debugMode) - console.log(`${me.debugLabel}: ClamAV socket ready to receive`); - }) - // When we are officially connected to the ClamAV socket (probably will never fire here) - .on('connect', () => { - if (me.settings.debugMode) console.log(`${me.debugLabel}: Connected to ClamAV socket`); - }) - // If an error is emitted from the ClamAV socket - .on('error', (err) => { - console.error(`${me.debugLabel}: Error emitted from ClamAV socket: `, err); - handleError(err); - }) - // If ClamAV is sending stuff to us (ie, an "OK", "Virus FOUND", or "ERROR") - .on('data', (cvChunk) => { - // Push this chunk to our results collection array - this._clamavResponseChunks.push(cvChunk); - if (me.settings.debugMode) - console.log(`${me.debugLabel}: Got result!`, cvChunk.toString()); - - // Parse what we've gotten back from ClamAV so far... - const response = Buffer.concat(this._clamavResponseChunks); - const result = me._processResult(response.toString(), null); - - // If there's an error supplied or if we detect a virus or timeout, stop stream immediately. - if ( - result instanceof NodeClamError || - (typeof result === 'object' && - (('isInfected' in result && result.isInfected === true) || - ('timeout' in result && result.timeout === true))) - ) { - // If a virus is detected... - if ( - typeof result === 'object' && - 'isInfected' in result && - result.isInfected === true - ) { - handleError(null, true, result); + }) + // When the ClamAV socket connection ends (receives chunk) + .on('end', () => { + this._clamavSocket.end(); + if (me.settings.debugMode) + console.log(`${me.debugLabel}: ClamAV socket has received the last chunk!`); + // Process the collected chunks + const response = Buffer.concat(this._clamavResponseChunks); + const result = me._processResult(response.toString('utf8'), null); + this._clamavResponseChunks = []; + if (me.settings.debugMode) { + console.log(`${me.debugLabel}: Result of scan:`, result); + console.log( + `${me.debugLabel}: It took ${_avScanTime} seconds to scan the file(s).` + ); + clearScanBenchmark(); } - // If a timeout is detected... - else if ( - typeof result === 'object' && - 'isInfected' in result && - result.isInfected === true - ) { - this.emit('timeout'); - handleError(null, false, result); - } + // If the scan timed-out + if (result.timeout === true) this.emit('timeout'); - // If any other kind of error is detected... - else { - handleError(result); + // NOTE: "scan-complete" could be called by the `handleError` method. + // We don't want to to double-emit this message. + if (_scanComplete === false) { + _scanComplete = true; + this._clamavSocket.end(); + this.emit('scan-complete', result); } - } - // For debugging purposes, spit out what was processed (if anything). - else if (me.settings.debugMode) - console.log(`${me.debugLabel}: Processed Result: `, result, response.toString()); - }); + }) + // If connection timesout. + .on('timeout', () => { + this.emit('timeout', new Error('Connection to host/socket has timed out')); + this._clamavSocket.end(); + if (me.settings.debugMode) + console.log(`${me.debugLabel}: Connection to host/socket has timed out`); + }) + // When the ClamAV socket is ready to receive packets (this will probably never fire here) + .on('ready', () => { + if (me.settings.debugMode) + console.log(`${me.debugLabel}: ClamAV socket ready to receive`); + }) + // When we are officially connected to the ClamAV socket (probably will never fire here) + .on('connect', () => { + if (me.settings.debugMode) + console.log(`${me.debugLabel}: Connected to ClamAV socket`); + }) + // If an error is emitted from the ClamAV socket + .on('error', (err) => { + console.error(`${me.debugLabel}: Error emitted from ClamAV socket: `, err); + handleError(err); + }) + // If ClamAV is sending stuff to us (ie, an "OK", "Virus FOUND", or "ERROR") + .on('data', (cvChunk) => { + // Push this chunk to our results collection array + this._clamavResponseChunks.push(cvChunk); + if (me.settings.debugMode) + console.log(`${me.debugLabel}: Got result!`, cvChunk.toString()); + + // Parse what we've gotten back from ClamAV so far... + const response = Buffer.concat(this._clamavResponseChunks); + const result = me._processResult(response.toString(), null); + + // If there's an error supplied or if we detect a virus or timeout, stop stream immediately. + if ( + result instanceof NodeClamError || + (typeof result === 'object' && + (('isInfected' in result && result.isInfected === true) || + ('timeout' in result && result.timeout === true))) + ) { + // If a virus is detected... + if ( + typeof result === 'object' && + 'isInfected' in result && + result.isInfected === true + ) { + handleError(null, true, result); + } + + // If a timeout is detected... + else if ( + typeof result === 'object' && + 'isInfected' in result && + result.isInfected === true + ) { + this.emit('timeout'); + handleError(null, false, result); + } + + // If any other kind of error is detected... + else { + handleError(result); + } + } + // For debugging purposes, spit out what was processed (if anything). + else if (me.settings.debugMode) + console.log( + `${me.debugLabel}: Processed Result: `, + result, + response.toString() + ); + }); + + if (me.settings.debugMode) console.log(`${me.debugLabel}: Doing initial transform!`); + // Handle the chunk + doTransform(); + }, + (err) => { + // Close socket if it's currently valid + if ( + this._clamavSocket && + 'readyState' in this._clamavSocket && + this._clamavSocket.readyState + ) { + this._clamavSocket.end(); + } - if (me.settings.debugMode) console.log(`${me.debugLabel}: Doing initial transform!`); - // Handle the chunk - doTransform(); - } catch (err) { - // Close socket if it's currently valid - if (this._clamavSocket && 'readyState' in this._clamavSocket && this._clamavSocket.readyState) { - this._clamavSocket.end(); + // If there's an issue connecting to the ClamAV socket, this is where that's handled + if (me.settings.debugMode) + console.error(`${me.debugLabel}: Error initiating socket to ClamAV: `, err); + handleError(err); } - - // If there's an issue connecting to the ClamAV socket, this is where that's handled - if (me.settings.debugMode) - console.error(`${me.debugLabel}: Error initiating socket to ClamAV: `, err); - handleError(err); - } + ); } else { // if (me.settings.debugMode) console.log(`${me.debugLabel}: Doing transform: ${++counter}`); // Handle the chunk @@ -2093,7 +2102,7 @@ class NodeClam { * use of a TCP or UNIX Domain socket. In other words, this will not work if you only * have access to a local ClamAV binary. * - * @param {ReadableStream} stream - A readable stream to scan + * @param {Readable} stream - A readable stream to scan * @param {Function} [cb] - What to do when the socket response with results * @returns {Promise} Object like: `{ file: String, isInfected: Boolean, viruses: Array }` * @example diff --git a/tests/index.js b/tests/index.js index 4b3d9d2..430eb40 100755 --- a/tests/index.js +++ b/tests/index.js @@ -1530,6 +1530,23 @@ describe('passthrough', () => { }); }); + // https://github.com/kylefarris/clamscan/issues/82 + it('should not throw multiple callback error', (done) => { + // To reliably reproduce the issue in the broken code, it's important that this is an async generator + // and it emits some chunks larger than the default highWaterMark of 16 KB. + async function* gen(i = 10) { + while (i < 25) { + yield Buffer.from(new Array(i++ * 1024).fill()); + } + } + + const input = Readable.from(gen()); + const av = clamscan.passthrough(); + + // The failure case will throw an error and not finish + input.pipe(av).on('end', done).resume(); + }); + if (!process.env.CI) { it('should handle a 0-byte file', () => { const input = fs.createReadStream(emptyFile);