| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991 | const { EventEmitter } = require('events')const STREAM_DESTROYED = new Error('Stream was destroyed')const PREMATURE_CLOSE = new Error('Premature close')const queueTick = require('queue-tick')const FIFO = require('fast-fifo')/* eslint-disable no-multi-spaces */const MAX = ((1 << 25) - 1)// Shared stateconst OPENING     = 0b001const DESTROYING  = 0b010const DESTROYED   = 0b100const NOT_OPENING = MAX ^ OPENING// Read stateconst READ_ACTIVE           = 0b0000000000001 << 3const READ_PRIMARY          = 0b0000000000010 << 3const READ_SYNC             = 0b0000000000100 << 3const READ_QUEUED           = 0b0000000001000 << 3const READ_RESUMED          = 0b0000000010000 << 3const READ_PIPE_DRAINED     = 0b0000000100000 << 3const READ_ENDING           = 0b0000001000000 << 3const READ_EMIT_DATA        = 0b0000010000000 << 3const READ_EMIT_READABLE    = 0b0000100000000 << 3const READ_EMITTED_READABLE = 0b0001000000000 << 3const READ_DONE             = 0b0010000000000 << 3const READ_NEXT_TICK        = 0b0100000000001 << 3 // also activeconst READ_NEEDS_PUSH       = 0b1000000000000 << 3const READ_NOT_ACTIVE             = MAX ^ READ_ACTIVEconst READ_NON_PRIMARY            = MAX ^ READ_PRIMARYconst READ_NON_PRIMARY_AND_PUSHED = MAX ^ (READ_PRIMARY | READ_NEEDS_PUSH)const READ_NOT_SYNC               = MAX ^ READ_SYNCconst READ_PUSHED                 = MAX ^ READ_NEEDS_PUSHconst READ_PAUSED                 = MAX ^ READ_RESUMEDconst READ_NOT_QUEUED             = MAX ^ (READ_QUEUED | READ_EMITTED_READABLE)const READ_NOT_ENDING             = MAX ^ READ_ENDINGconst READ_PIPE_NOT_DRAINED       = MAX ^ (READ_RESUMED | READ_PIPE_DRAINED)const READ_NOT_NEXT_TICK          = MAX ^ READ_NEXT_TICK// Write stateconst WRITE_ACTIVE     = 0b000000001 << 16const WRITE_PRIMARY    = 0b000000010 << 16const WRITE_SYNC       = 0b000000100 << 16const WRITE_QUEUED     = 0b000001000 << 16const WRITE_UNDRAINED  = 0b000010000 << 16const WRITE_DONE       = 0b000100000 << 16const WRITE_EMIT_DRAIN = 0b001000000 << 16const WRITE_NEXT_TICK  = 0b010000001 << 16 // also activeconst WRITE_FINISHING  = 0b100000000 << 16const WRITE_NOT_ACTIVE    = MAX ^ WRITE_ACTIVEconst WRITE_NOT_SYNC      = MAX ^ WRITE_SYNCconst WRITE_NON_PRIMARY   = MAX ^ WRITE_PRIMARYconst WRITE_NOT_FINISHING = MAX ^ WRITE_FINISHINGconst WRITE_DRAINED       = MAX ^ WRITE_UNDRAINEDconst WRITE_NOT_QUEUED    = MAX ^ WRITE_QUEUEDconst WRITE_NOT_NEXT_TICK = MAX ^ WRITE_NEXT_TICK// Combined shared stateconst ACTIVE = READ_ACTIVE | WRITE_ACTIVEconst NOT_ACTIVE = MAX ^ ACTIVEconst DONE = READ_DONE | WRITE_DONEconst DESTROY_STATUS = DESTROYING | DESTROYEDconst OPEN_STATUS = DESTROY_STATUS | OPENINGconst AUTO_DESTROY = DESTROY_STATUS | DONEconst NON_PRIMARY = WRITE_NON_PRIMARY & READ_NON_PRIMARYconst TICKING = (WRITE_NEXT_TICK | READ_NEXT_TICK) & NOT_ACTIVEconst ACTIVE_OR_TICKING = ACTIVE | TICKINGconst IS_OPENING = OPEN_STATUS | TICKING// Combined read stateconst READ_PRIMARY_STATUS = OPEN_STATUS | READ_ENDING | READ_DONEconst READ_STATUS = OPEN_STATUS | READ_DONE | READ_QUEUEDconst READ_FLOWING = READ_RESUMED | READ_PIPE_DRAINEDconst READ_ACTIVE_AND_SYNC = READ_ACTIVE | READ_SYNCconst READ_ACTIVE_AND_SYNC_AND_NEEDS_PUSH = READ_ACTIVE | READ_SYNC | READ_NEEDS_PUSHconst READ_PRIMARY_AND_ACTIVE = READ_PRIMARY | READ_ACTIVEconst READ_ENDING_STATUS = OPEN_STATUS | READ_ENDING | READ_QUEUEDconst READ_EMIT_READABLE_AND_QUEUED = READ_EMIT_READABLE | READ_QUEUEDconst READ_READABLE_STATUS = OPEN_STATUS | READ_EMIT_READABLE | READ_QUEUED | READ_EMITTED_READABLEconst SHOULD_NOT_READ = OPEN_STATUS | READ_ACTIVE | READ_ENDING | READ_DONE | READ_NEEDS_PUSHconst READ_BACKPRESSURE_STATUS = DESTROY_STATUS | READ_ENDING | READ_DONE// Combined write stateconst WRITE_PRIMARY_STATUS = OPEN_STATUS | WRITE_FINISHING | WRITE_DONEconst WRITE_QUEUED_AND_UNDRAINED = WRITE_QUEUED | WRITE_UNDRAINEDconst WRITE_QUEUED_AND_ACTIVE = WRITE_QUEUED | WRITE_ACTIVEconst WRITE_DRAIN_STATUS = WRITE_QUEUED | WRITE_UNDRAINED | OPEN_STATUS | WRITE_ACTIVEconst WRITE_STATUS = OPEN_STATUS | WRITE_ACTIVE | WRITE_QUEUEDconst WRITE_PRIMARY_AND_ACTIVE = WRITE_PRIMARY | WRITE_ACTIVEconst WRITE_ACTIVE_AND_SYNC = WRITE_ACTIVE | WRITE_SYNCconst WRITE_FINISHING_STATUS = OPEN_STATUS | WRITE_FINISHING | WRITE_QUEUED_AND_ACTIVE | WRITE_DONEconst WRITE_BACKPRESSURE_STATUS = WRITE_UNDRAINED | DESTROY_STATUS | WRITE_FINISHING | WRITE_DONEconst asyncIterator = Symbol.asyncIterator || Symbol('asyncIterator')class WritableState {  constructor (stream, { highWaterMark = 16384, map = null, mapWritable, byteLength, byteLengthWritable } = {}) {    this.stream = stream    this.queue = new FIFO()    this.highWaterMark = highWaterMark    this.buffered = 0    this.error = null    this.pipeline = null    this.byteLength = byteLengthWritable || byteLength || defaultByteLength    this.map = mapWritable || map    this.afterWrite = afterWrite.bind(this)    this.afterUpdateNextTick = updateWriteNT.bind(this)  }  get ended () {    return (this.stream._duplexState & WRITE_DONE) !== 0  }  push (data) {    if (this.map !== null) data = this.map(data)    this.buffered += this.byteLength(data)    this.queue.push(data)    if (this.buffered < this.highWaterMark) {      this.stream._duplexState |= WRITE_QUEUED      return true    }    this.stream._duplexState |= WRITE_QUEUED_AND_UNDRAINED    return false  }  shift () {    const data = this.queue.shift()    const stream = this.stream    this.buffered -= this.byteLength(data)    if (this.buffered === 0) stream._duplexState &= WRITE_NOT_QUEUED    return data  }  end (data) {    if (typeof data === 'function') this.stream.once('finish', data)    else if (data !== undefined && data !== null) this.push(data)    this.stream._duplexState = (this.stream._duplexState | WRITE_FINISHING) & WRITE_NON_PRIMARY  }  autoBatch (data, cb) {    const buffer = []    const stream = this.stream    buffer.push(data)    while ((stream._duplexState & WRITE_STATUS) === WRITE_QUEUED_AND_ACTIVE) {      buffer.push(stream._writableState.shift())    }    if ((stream._duplexState & OPEN_STATUS) !== 0) return cb(null)    stream._writev(buffer, cb)  }  update () {    const stream = this.stream    while ((stream._duplexState & WRITE_STATUS) === WRITE_QUEUED) {      const data = this.shift()      stream._duplexState |= WRITE_ACTIVE_AND_SYNC      stream._write(data, this.afterWrite)      stream._duplexState &= WRITE_NOT_SYNC    }    if ((stream._duplexState & WRITE_PRIMARY_AND_ACTIVE) === 0) this.updateNonPrimary()  }  updateNonPrimary () {    const stream = this.stream    if ((stream._duplexState & WRITE_FINISHING_STATUS) === WRITE_FINISHING) {      stream._duplexState = (stream._duplexState | WRITE_ACTIVE) & WRITE_NOT_FINISHING      stream._final(afterFinal.bind(this))      return    }    if ((stream._duplexState & DESTROY_STATUS) === DESTROYING) {      if ((stream._duplexState & ACTIVE_OR_TICKING) === 0) {        stream._duplexState |= ACTIVE        stream._destroy(afterDestroy.bind(this))      }      return    }    if ((stream._duplexState & IS_OPENING) === OPENING) {      stream._duplexState = (stream._duplexState | ACTIVE) & NOT_OPENING      stream._open(afterOpen.bind(this))    }  }  updateNextTick () {    if ((this.stream._duplexState & WRITE_NEXT_TICK) !== 0) return    this.stream._duplexState |= WRITE_NEXT_TICK    queueTick(this.afterUpdateNextTick)  }}class ReadableState {  constructor (stream, { highWaterMark = 16384, map = null, mapReadable, byteLength, byteLengthReadable } = {}) {    this.stream = stream    this.queue = new FIFO()    this.highWaterMark = highWaterMark    this.buffered = 0    this.error = null    this.pipeline = null    this.byteLength = byteLengthReadable || byteLength || defaultByteLength    this.map = mapReadable || map    this.pipeTo = null    this.afterRead = afterRead.bind(this)    this.afterUpdateNextTick = updateReadNT.bind(this)  }  get ended () {    return (this.stream._duplexState & READ_DONE) !== 0  }  pipe (pipeTo, cb) {    if (this.pipeTo !== null) throw new Error('Can only pipe to one destination')    if (typeof cb !== 'function') cb = null    this.stream._duplexState |= READ_PIPE_DRAINED    this.pipeTo = pipeTo    this.pipeline = new Pipeline(this.stream, pipeTo, cb)    if (cb) this.stream.on('error', noop) // We already error handle this so supress crashes    if (isStreamx(pipeTo)) {      pipeTo._writableState.pipeline = this.pipeline      if (cb) pipeTo.on('error', noop) // We already error handle this so supress crashes      pipeTo.on('finish', this.pipeline.finished.bind(this.pipeline)) // TODO: just call finished from pipeTo itself    } else {      const onerror = this.pipeline.done.bind(this.pipeline, pipeTo)      const onclose = this.pipeline.done.bind(this.pipeline, pipeTo, null) // onclose has a weird bool arg      pipeTo.on('error', onerror)      pipeTo.on('close', onclose)      pipeTo.on('finish', this.pipeline.finished.bind(this.pipeline))    }    pipeTo.on('drain', afterDrain.bind(this))    this.stream.emit('piping', pipeTo)    pipeTo.emit('pipe', this.stream)  }  push (data) {    const stream = this.stream    if (data === null) {      this.highWaterMark = 0      stream._duplexState = (stream._duplexState | READ_ENDING) & READ_NON_PRIMARY_AND_PUSHED      return false    }    if (this.map !== null) data = this.map(data)    this.buffered += this.byteLength(data)    this.queue.push(data)    stream._duplexState = (stream._duplexState | READ_QUEUED) & READ_PUSHED    return this.buffered < this.highWaterMark  }  shift () {    const data = this.queue.shift()    this.buffered -= this.byteLength(data)    if (this.buffered === 0) this.stream._duplexState &= READ_NOT_QUEUED    return data  }  unshift (data) {    let tail    const pending = []    while ((tail = this.queue.shift()) !== undefined) {      pending.push(tail)    }    this.push(data)    for (let i = 0; i < pending.length; i++) {      this.queue.push(pending[i])    }  }  read () {    const stream = this.stream    if ((stream._duplexState & READ_STATUS) === READ_QUEUED) {      const data = this.shift()      if (this.pipeTo !== null && this.pipeTo.write(data) === false) stream._duplexState &= READ_PIPE_NOT_DRAINED      if ((stream._duplexState & READ_EMIT_DATA) !== 0) stream.emit('data', data)      return data    }    return null  }  drain () {    const stream = this.stream    while ((stream._duplexState & READ_STATUS) === READ_QUEUED && (stream._duplexState & READ_FLOWING) !== 0) {      const data = this.shift()      if (this.pipeTo !== null && this.pipeTo.write(data) === false) stream._duplexState &= READ_PIPE_NOT_DRAINED      if ((stream._duplexState & READ_EMIT_DATA) !== 0) stream.emit('data', data)    }  }  update () {    const stream = this.stream    this.drain()    while (this.buffered < this.highWaterMark && (stream._duplexState & SHOULD_NOT_READ) === 0) {      stream._duplexState |= READ_ACTIVE_AND_SYNC_AND_NEEDS_PUSH      stream._read(this.afterRead)      stream._duplexState &= READ_NOT_SYNC      if ((stream._duplexState & READ_ACTIVE) === 0) this.drain()    }    if ((stream._duplexState & READ_READABLE_STATUS) === READ_EMIT_READABLE_AND_QUEUED) {      stream._duplexState |= READ_EMITTED_READABLE      stream.emit('readable')    }    if ((stream._duplexState & READ_PRIMARY_AND_ACTIVE) === 0) this.updateNonPrimary()  }  updateNonPrimary () {    const stream = this.stream    if ((stream._duplexState & READ_ENDING_STATUS) === READ_ENDING) {      stream._duplexState = (stream._duplexState | READ_DONE) & READ_NOT_ENDING      stream.emit('end')      if ((stream._duplexState & AUTO_DESTROY) === DONE) stream._duplexState |= DESTROYING      if (this.pipeTo !== null) this.pipeTo.end()    }    if ((stream._duplexState & DESTROY_STATUS) === DESTROYING) {      if ((stream._duplexState & ACTIVE_OR_TICKING) === 0) {        stream._duplexState |= ACTIVE        stream._destroy(afterDestroy.bind(this))      }      return    }    if ((stream._duplexState & IS_OPENING) === OPENING) {      stream._duplexState = (stream._duplexState | ACTIVE) & NOT_OPENING      stream._open(afterOpen.bind(this))    }  }  updateNextTick () {    if ((this.stream._duplexState & READ_NEXT_TICK) !== 0) return    this.stream._duplexState |= READ_NEXT_TICK    queueTick(this.afterUpdateNextTick)  }}class TransformState {  constructor (stream) {    this.data = null    this.afterTransform = afterTransform.bind(stream)    this.afterFinal = null  }}class Pipeline {  constructor (src, dst, cb) {    this.from = src    this.to = dst    this.afterPipe = cb    this.error = null    this.pipeToFinished = false  }  finished () {    this.pipeToFinished = true  }  done (stream, err) {    if (err) this.error = err    if (stream === this.to) {      this.to = null      if (this.from !== null) {        if ((this.from._duplexState & READ_DONE) === 0 || !this.pipeToFinished) {          this.from.destroy(this.error || new Error('Writable stream closed prematurely'))        }        return      }    }    if (stream === this.from) {      this.from = null      if (this.to !== null) {        if ((stream._duplexState & READ_DONE) === 0) {          this.to.destroy(this.error || new Error('Readable stream closed before ending'))        }        return      }    }    if (this.afterPipe !== null) this.afterPipe(this.error)    this.to = this.from = this.afterPipe = null  }}function afterDrain () {  this.stream._duplexState |= READ_PIPE_DRAINED  if ((this.stream._duplexState & READ_ACTIVE_AND_SYNC) === 0) this.updateNextTick()  else this.drain()}function afterFinal (err) {  const stream = this.stream  if (err) stream.destroy(err)  if ((stream._duplexState & DESTROY_STATUS) === 0) {    stream._duplexState |= WRITE_DONE    stream.emit('finish')  }  if ((stream._duplexState & AUTO_DESTROY) === DONE) {    stream._duplexState |= DESTROYING  }  stream._duplexState &= WRITE_NOT_ACTIVE  this.update()}function afterDestroy (err) {  const stream = this.stream  if (!err && this.error !== STREAM_DESTROYED) err = this.error  if (err) stream.emit('error', err)  stream._duplexState |= DESTROYED  stream.emit('close')  const rs = stream._readableState  const ws = stream._writableState  if (rs !== null && rs.pipeline !== null) rs.pipeline.done(stream, err)  if (ws !== null && ws.pipeline !== null) ws.pipeline.done(stream, err)}function afterWrite (err) {  const stream = this.stream  if (err) stream.destroy(err)  stream._duplexState &= WRITE_NOT_ACTIVE  if ((stream._duplexState & WRITE_DRAIN_STATUS) === WRITE_UNDRAINED) {    stream._duplexState &= WRITE_DRAINED    if ((stream._duplexState & WRITE_EMIT_DRAIN) === WRITE_EMIT_DRAIN) {      stream.emit('drain')    }  }  if ((stream._duplexState & WRITE_SYNC) === 0) this.update()}function afterRead (err) {  if (err) this.stream.destroy(err)  this.stream._duplexState &= READ_NOT_ACTIVE  if ((this.stream._duplexState & READ_SYNC) === 0) this.update()}function updateReadNT () {  this.stream._duplexState &= READ_NOT_NEXT_TICK  this.update()}function updateWriteNT () {  this.stream._duplexState &= WRITE_NOT_NEXT_TICK  this.update()}function afterOpen (err) {  const stream = this.stream  if (err) stream.destroy(err)  if ((stream._duplexState & DESTROYING) === 0) {    if ((stream._duplexState & READ_PRIMARY_STATUS) === 0) stream._duplexState |= READ_PRIMARY    if ((stream._duplexState & WRITE_PRIMARY_STATUS) === 0) stream._duplexState |= WRITE_PRIMARY    stream.emit('open')  }  stream._duplexState &= NOT_ACTIVE  if (stream._writableState !== null) {    stream._writableState.update()  }  if (stream._readableState !== null) {    stream._readableState.update()  }}function afterTransform (err, data) {  if (data !== undefined && data !== null) this.push(data)  this._writableState.afterWrite(err)}class Stream extends EventEmitter {  constructor (opts) {    super()    this._duplexState = 0    this._readableState = null    this._writableState = null    if (opts) {      if (opts.open) this._open = opts.open      if (opts.destroy) this._destroy = opts.destroy      if (opts.predestroy) this._predestroy = opts.predestroy      if (opts.signal) {        opts.signal.addEventListener('abort', abort.bind(this))      }    }  }  _open (cb) {    cb(null)  }  _destroy (cb) {    cb(null)  }  _predestroy () {    // does nothing  }  get readable () {    return this._readableState !== null ? true : undefined  }  get writable () {    return this._writableState !== null ? true : undefined  }  get destroyed () {    return (this._duplexState & DESTROYED) !== 0  }  get destroying () {    return (this._duplexState & DESTROY_STATUS) !== 0  }  destroy (err) {    if ((this._duplexState & DESTROY_STATUS) === 0) {      if (!err) err = STREAM_DESTROYED      this._duplexState = (this._duplexState | DESTROYING) & NON_PRIMARY      if (this._readableState !== null) {        this._readableState.error = err        this._readableState.updateNextTick()      }      if (this._writableState !== null) {        this._writableState.error = err        this._writableState.updateNextTick()      }      this._predestroy()    }  }  on (name, fn) {    if (this._readableState !== null) {      if (name === 'data') {        this._duplexState |= (READ_EMIT_DATA | READ_RESUMED)        this._readableState.updateNextTick()      }      if (name === 'readable') {        this._duplexState |= READ_EMIT_READABLE        this._readableState.updateNextTick()      }    }    if (this._writableState !== null) {      if (name === 'drain') {        this._duplexState |= WRITE_EMIT_DRAIN        this._writableState.updateNextTick()      }    }    return super.on(name, fn)  }}class Readable extends Stream {  constructor (opts) {    super(opts)    this._duplexState |= OPENING | WRITE_DONE    this._readableState = new ReadableState(this, opts)    if (opts) {      if (opts.read) this._read = opts.read      if (opts.eagerOpen) this.resume().pause()    }  }  _read (cb) {    cb(null)  }  pipe (dest, cb) {    this._readableState.pipe(dest, cb)    this._readableState.updateNextTick()    return dest  }  read () {    this._readableState.updateNextTick()    return this._readableState.read()  }  push (data) {    this._readableState.updateNextTick()    return this._readableState.push(data)  }  unshift (data) {    this._readableState.updateNextTick()    return this._readableState.unshift(data)  }  resume () {    this._duplexState |= READ_RESUMED    this._readableState.updateNextTick()    return this  }  pause () {    this._duplexState &= READ_PAUSED    return this  }  static _fromAsyncIterator (ite, opts) {    let destroy    const rs = new Readable({      ...opts,      read (cb) {        ite.next().then(push).then(cb.bind(null, null)).catch(cb)      },      predestroy () {        destroy = ite.return()      },      destroy (cb) {        if (!destroy) return cb(null)        destroy.then(cb.bind(null, null)).catch(cb)      }    })    return rs    function push (data) {      if (data.done) rs.push(null)      else rs.push(data.value)    }  }  static from (data, opts) {    if (isReadStreamx(data)) return data    if (data[asyncIterator]) return this._fromAsyncIterator(data[asyncIterator](), opts)    if (!Array.isArray(data)) data = data === undefined ? [] : [data]    let i = 0    return new Readable({      ...opts,      read (cb) {        this.push(i === data.length ? null : data[i++])        cb(null)      }    })  }  static isBackpressured (rs) {    return (rs._duplexState & READ_BACKPRESSURE_STATUS) !== 0 || rs._readableState.buffered >= rs._readableState.highWaterMark  }  static isPaused (rs) {    return (rs._duplexState & READ_RESUMED) === 0  }  [asyncIterator] () {    const stream = this    let error = null    let promiseResolve = null    let promiseReject = null    this.on('error', (err) => { error = err })    this.on('readable', onreadable)    this.on('close', onclose)    return {      [asyncIterator] () {        return this      },      next () {        return new Promise(function (resolve, reject) {          promiseResolve = resolve          promiseReject = reject          const data = stream.read()          if (data !== null) ondata(data)          else if ((stream._duplexState & DESTROYED) !== 0) ondata(null)        })      },      return () {        return destroy(null)      },      throw (err) {        return destroy(err)      }    }    function onreadable () {      if (promiseResolve !== null) ondata(stream.read())    }    function onclose () {      if (promiseResolve !== null) ondata(null)    }    function ondata (data) {      if (promiseReject === null) return      if (error) promiseReject(error)      else if (data === null && (stream._duplexState & READ_DONE) === 0) promiseReject(STREAM_DESTROYED)      else promiseResolve({ value: data, done: data === null })      promiseReject = promiseResolve = null    }    function destroy (err) {      stream.destroy(err)      return new Promise((resolve, reject) => {        if (stream._duplexState & DESTROYED) return resolve({ value: undefined, done: true })        stream.once('close', function () {          if (err) reject(err)          else resolve({ value: undefined, done: true })        })      })    }  }}class Writable extends Stream {  constructor (opts) {    super(opts)    this._duplexState |= OPENING | READ_DONE    this._writableState = new WritableState(this, opts)    if (opts) {      if (opts.writev) this._writev = opts.writev      if (opts.write) this._write = opts.write      if (opts.final) this._final = opts.final    }  }  _writev (batch, cb) {    cb(null)  }  _write (data, cb) {    this._writableState.autoBatch(data, cb)  }  _final (cb) {    cb(null)  }  static isBackpressured (ws) {    return (ws._duplexState & WRITE_BACKPRESSURE_STATUS) !== 0  }  write (data) {    this._writableState.updateNextTick()    return this._writableState.push(data)  }  end (data) {    this._writableState.updateNextTick()    this._writableState.end(data)    return this  }}class Duplex extends Readable { // and Writable  constructor (opts) {    super(opts)    this._duplexState = OPENING    this._writableState = new WritableState(this, opts)    if (opts) {      if (opts.writev) this._writev = opts.writev      if (opts.write) this._write = opts.write      if (opts.final) this._final = opts.final    }  }  _writev (batch, cb) {    cb(null)  }  _write (data, cb) {    this._writableState.autoBatch(data, cb)  }  _final (cb) {    cb(null)  }  write (data) {    this._writableState.updateNextTick()    return this._writableState.push(data)  }  end (data) {    this._writableState.updateNextTick()    this._writableState.end(data)    return this  }}class Transform extends Duplex {  constructor (opts) {    super(opts)    this._transformState = new TransformState(this)    if (opts) {      if (opts.transform) this._transform = opts.transform      if (opts.flush) this._flush = opts.flush    }  }  _write (data, cb) {    if (this._readableState.buffered >= this._readableState.highWaterMark) {      this._transformState.data = data    } else {      this._transform(data, this._transformState.afterTransform)    }  }  _read (cb) {    if (this._transformState.data !== null) {      const data = this._transformState.data      this._transformState.data = null      cb(null)      this._transform(data, this._transformState.afterTransform)    } else {      cb(null)    }  }  _transform (data, cb) {    cb(null, data)  }  _flush (cb) {    cb(null)  }  _final (cb) {    this._transformState.afterFinal = cb    this._flush(transformAfterFlush.bind(this))  }}class PassThrough extends Transform {}function transformAfterFlush (err, data) {  const cb = this._transformState.afterFinal  if (err) return cb(err)  if (data !== null && data !== undefined) this.push(data)  this.push(null)  cb(null)}function pipelinePromise (...streams) {  return new Promise((resolve, reject) => {    return pipeline(...streams, (err) => {      if (err) return reject(err)      resolve()    })  })}function pipeline (stream, ...streams) {  const all = Array.isArray(stream) ? [...stream, ...streams] : [stream, ...streams]  const done = (all.length && typeof all[all.length - 1] === 'function') ? all.pop() : null  if (all.length < 2) throw new Error('Pipeline requires at least 2 streams')  let src = all[0]  let dest = null  let error = null  for (let i = 1; i < all.length; i++) {    dest = all[i]    if (isStreamx(src)) {      src.pipe(dest, onerror)    } else {      errorHandle(src, true, i > 1, onerror)      src.pipe(dest)    }    src = dest  }  if (done) {    let fin = false    dest.on('finish', () => { fin = true })    dest.on('error', err => { error = error || err })    dest.on('close', () => done(error || (fin ? null : PREMATURE_CLOSE)))  }  return dest  function errorHandle (s, rd, wr, onerror) {    s.on('error', onerror)    s.on('close', onclose)    function onclose () {      if (rd && s._readableState && !s._readableState.ended) return onerror(PREMATURE_CLOSE)      if (wr && s._writableState && !s._writableState.ended) return onerror(PREMATURE_CLOSE)    }  }  function onerror (err) {    if (!err || error) return    error = err    for (const s of all) {      s.destroy(err)    }  }}function isStream (stream) {  return !!stream._readableState || !!stream._writableState}function isStreamx (stream) {  return typeof stream._duplexState === 'number' && isStream(stream)}function isReadStreamx (stream) {  return isStreamx(stream) && stream.readable}function isTypedArray (data) {  return typeof data === 'object' && data !== null && typeof data.byteLength === 'number'}function defaultByteLength (data) {  return isTypedArray(data) ? data.byteLength : 1024}function noop () {}function abort () {  this.destroy(new Error('Stream aborted.'))}module.exports = {  pipeline,  pipelinePromise,  isStream,  isStreamx,  Stream,  Writable,  Readable,  Duplex,  Transform,  // Export PassThrough for compatibility with Node.js core's stream module  PassThrough}
 |