| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108 | var Stream = require('stream')// through//// a stream that does nothing but re-emit the input.// useful for aggregating a series of changing but not ending streams into one stream)exports = module.exports = throughthrough.through = through//create a readable writable stream.function through (write, end, opts) {  write = write || function (data) { this.queue(data) }  end = end || function () { this.queue(null) }  var ended = false, destroyed = false, buffer = [], _ended = false  var stream = new Stream()  stream.readable = stream.writable = true  stream.paused = false//  stream.autoPause   = !(opts && opts.autoPause   === false)  stream.autoDestroy = !(opts && opts.autoDestroy === false)  stream.write = function (data) {    write.call(this, data)    return !stream.paused  }  function drain() {    while(buffer.length && !stream.paused) {      var data = buffer.shift()      if(null === data)        return stream.emit('end')      else        stream.emit('data', data)    }  }  stream.queue = stream.push = function (data) {//    console.error(ended)    if(_ended) return stream    if(data === null) _ended = true    buffer.push(data)    drain()    return stream  }  //this will be registered as the first 'end' listener  //must call destroy next tick, to make sure we're after any  //stream piped from here.  //this is only a problem if end is not emitted synchronously.  //a nicer way to do this is to make sure this is the last listener for 'end'  stream.on('end', function () {    stream.readable = false    if(!stream.writable && stream.autoDestroy)      process.nextTick(function () {        stream.destroy()      })  })  function _end () {    stream.writable = false    end.call(stream)    if(!stream.readable && stream.autoDestroy)      stream.destroy()  }  stream.end = function (data) {    if(ended) return    ended = true    if(arguments.length) stream.write(data)    _end() // will emit or queue    return stream  }  stream.destroy = function () {    if(destroyed) return    destroyed = true    ended = true    buffer.length = 0    stream.writable = stream.readable = false    stream.emit('close')    return stream  }  stream.pause = function () {    if(stream.paused) return    stream.paused = true    return stream  }  stream.resume = function () {    if(stream.paused) {      stream.paused = false      stream.emit('resume')    }    drain()    //may have become paused again,    //as drain emits 'data'.    if(!stream.paused)      stream.emit('drain')    return stream  }  return stream}
 |