'use strict'

var PassThrough = require('readable-stream').PassThrough
var inherits = require('inherits')
var p = require('process-nextick-args')

/**
 * Wrap a readable stream so that it can be cloned before it starts flowing.
 *
 * The wrapper is a PassThrough. The original stream is only piped into it
 * once the wrapper and every clone created through `clone()` have been
 * counted as "ready" (see `clonePiped`).
 *
 * Can be called with or without `new`.
 *
 * @param {Object} stream - source stream; its `_readableState.objectMode`
 *   decides the object mode of the wrapper.
 * @param {Object} [opts] - options forwarded to PassThrough. NOTE: the object
 *   passed in is modified in place (`objectMode` is overwritten).
 * @returns {Cloneable}
 */
function Cloneable (stream, opts) {
  if (!(this instanceof Cloneable)) {
    return new Cloneable(stream, opts)
  }

  var objectMode = stream._readableState.objectMode
  this._original = stream
  // the wrapper itself counts as one pending consumer
  this._clonesCount = 1

  opts = opts || {}
  opts.objectMode = objectMode

  PassThrough.call(this, opts)

  forwardDestroy(stream, this)

  this.on('newListener', onData)
  this.once('resume', onResume)

  // true while the 'newListener' hook has not fired yet; `clone()` uses it
  // to decide whether the hook must be re-installed
  this._hasListener = true
}

inherits(Cloneable, PassThrough)

/**
 * 'newListener' handler of the wrapper: the first 'data' or 'readable'
 * listener marks the wrapper as ready. Both one-shot hooks are removed so
 * the wrapper is counted exactly once.
 */
function onData (event, listener) {
  if (event === 'data' || event === 'readable') {
    this._hasListener = false
    this.removeListener('newListener', onData)
    this.removeListener('resume', onResume)
    p.nextTick(clonePiped, this)
  }
}

/**
 * 'resume' handler of the wrapper (registered with `once`): marks the
 * wrapper as ready and drops the 'newListener' hook.
 */
function onResume () {
  this._hasListener = false
  this.removeListener('newListener', onData)
  p.nextTick(clonePiped, this)
}

/**
 * Create a new clone of this stream.
 *
 * @returns {Clone}
 * @throws {Error} 'already started' once the original stream has been piped
 *   into the wrapper.
 */
Cloneable.prototype.clone = function () {
  if (!this._original) {
    throw new Error('already started')
  }

  this._clonesCount++

  // the events added by the clone should not count
  // for starting the flow
  this.removeListener('newListener', onData)
  var clone = new Clone(this)
  if (this._hasListener) {
    this.on('newListener', onData)
  }

  return clone
}

/**
 * Destroy hook. Without an error the stream is ended on both sides and
 * 'close' is emitted; the callback is always invoked on the next tick.
 */
Cloneable.prototype._destroy = function (err, cb) {
  if (!err) {
    this.push(null)
    this.end()
    this.emit('close')
  }

  p.nextTick(cb, err)
}

/**
 * Propagate termination from `src` to `dest`: an 'error' on `src` destroys
 * `dest` with that error, a 'close' on `src` ends `dest`.
 */
function forwardDestroy (src, dest) {
  src.on('error', destroy)
  src.on('close', onClose)

  function destroy (err) {
    // after an error, a later 'close' must not end `dest` as well
    src.removeListener('close', onClose)
    dest.destroy(err)
  }

  function onClose () {
    dest.end()
  }
}

/**
 * Record that one pending consumer of `that` is ready. When the counter
 * reaches zero and the wrapper has not been destroyed, the original stream
 * is piped into the wrapper and the reference to it is dropped (which makes
 * further `clone()` calls throw).
 */
function clonePiped (that) {
  if (--that._clonesCount === 0 && !that._readableState.destroyed) {
    that._original.pipe(that)
    that._original = undefined
  }
}

/**
 * A clone of a Cloneable: a PassThrough fed by `parent.pipe(this)`.
 *
 * Can be called with or without `new`.
 *
 * @param {Cloneable} parent - the stream being cloned.
 * @param {Object} [opts] - options forwarded to PassThrough. NOTE: the object
 *   passed in is modified in place (`objectMode` is overwritten).
 * @returns {Clone}
 */
function Clone (parent, opts) {
  if (!(this instanceof Clone)) {
    return new Clone(parent, opts)
  }

  var objectMode = parent._readableState.objectMode

  opts = opts || {}
  opts.objectMode = objectMode

  this.parent = parent

  PassThrough.call(this, opts)

  forwardDestroy(parent, this)

  parent.pipe(this)

  // the events added by the clone should not count
  // for starting the flow
  // so we add the newListener handle after we are done
  this.on('newListener', onDataClone)
  // `once`, like the parent's own resume hook: a clone that is paused and
  // resumed several times must still be counted a single time
  this.once('resume', onResumeClone)
}

/**
 * 'newListener' handler of a clone: the first 'data', 'readable' or 'close'
 * listener marks the clone as ready.
 */
function onDataClone (event, listener) {
  // We start the flow once all clones are piped or destroyed
  if (event === 'data' || event === 'readable' || event === 'close') {
    p.nextTick(clonePiped, this.parent)
    this.removeListener('newListener', onDataClone)
    // a later 'resume' on this clone must not decrement the parent's
    // counter a second time
    this.removeListener('resume', onResumeClone)
  }
}

/**
 * 'resume' handler of a clone (registered with `once`): marks the clone as
 * ready and drops the 'newListener' hook.
 */
function onResumeClone () {
  this.removeListener('newListener', onDataClone)
  p.nextTick(clonePiped, this.parent)
}

inherits(Clone, PassThrough)

/**
 * Cloning a clone delegates to the parent Cloneable.
 *
 * @returns {Clone}
 */
Clone.prototype.clone = function () {
  return this.parent.clone()
}

/**
 * @param {*} stream
 * @returns {boolean} true when `stream` is a Cloneable or a Clone.
 */
Cloneable.isCloneable = function (stream) {
  return stream instanceof Cloneable || stream instanceof Clone
}

/**
 * Destroy hook. Without an error the stream is ended on both sides and
 * 'close' is emitted; the callback is always invoked on the next tick.
 */
Clone.prototype._destroy = function (err, cb) {
  if (!err) {
    this.push(null)
    this.end()
    this.emit('close')
  }

  p.nextTick(cb, err)
}

module.exports = Cloneable