12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991 |
- const { EventEmitter } = require('events')
- const STREAM_DESTROYED = new Error('Stream was destroyed')
- const PREMATURE_CLOSE = new Error('Premature close')
- const queueTick = require('queue-tick')
- const FIFO = require('fast-fifo')
/* eslint-disable no-multi-spaces */

// All stream state is packed into one integer (`_duplexState`). MAX is the
// all-ones mask over the 25 bits in use; every "NOT_x" style constant below is
// MAX with the named bit(s) cleared, so `state &= NOT_x` switches x off.
const MAX = (1 << 25) - 1

// Shared state (bits 0-2)
const OPENING    = 1 << 0
const DESTROYING = 1 << 1
const DESTROYED  = 1 << 2

const NOT_OPENING = MAX & ~OPENING

// Read state (bits 3-15)
const READ_ACTIVE           = 1 << 3
const READ_PRIMARY          = 1 << 4
const READ_SYNC             = 1 << 5
const READ_QUEUED           = 1 << 6
const READ_RESUMED          = 1 << 7
const READ_PIPE_DRAINED     = 1 << 8
const READ_ENDING           = 1 << 9
const READ_EMIT_DATA        = 1 << 10
const READ_EMIT_READABLE    = 1 << 11
const READ_EMITTED_READABLE = 1 << 12
const READ_DONE             = 1 << 13
const READ_NEXT_TICK        = (1 << 14) | READ_ACTIVE // also active
const READ_NEEDS_PUSH       = 1 << 15

const READ_NOT_ACTIVE             = MAX & ~READ_ACTIVE
const READ_NON_PRIMARY            = MAX & ~READ_PRIMARY
const READ_NON_PRIMARY_AND_PUSHED = MAX & ~(READ_PRIMARY | READ_NEEDS_PUSH)
const READ_NOT_SYNC               = MAX & ~READ_SYNC
const READ_PUSHED                 = MAX & ~READ_NEEDS_PUSH
const READ_PAUSED                 = MAX & ~READ_RESUMED
const READ_NOT_QUEUED             = MAX & ~(READ_QUEUED | READ_EMITTED_READABLE)
const READ_NOT_ENDING             = MAX & ~READ_ENDING
const READ_PIPE_NOT_DRAINED       = MAX & ~(READ_RESUMED | READ_PIPE_DRAINED)
const READ_NOT_NEXT_TICK          = MAX & ~READ_NEXT_TICK

// Write state (bits 16-24)
const WRITE_ACTIVE     = 1 << 16
const WRITE_PRIMARY    = 1 << 17
const WRITE_SYNC       = 1 << 18
const WRITE_QUEUED     = 1 << 19
const WRITE_UNDRAINED  = 1 << 20
const WRITE_DONE       = 1 << 21
const WRITE_EMIT_DRAIN = 1 << 22
const WRITE_NEXT_TICK  = (1 << 23) | WRITE_ACTIVE // also active
const WRITE_FINISHING  = 1 << 24

const WRITE_NOT_ACTIVE    = MAX & ~WRITE_ACTIVE
const WRITE_NOT_SYNC      = MAX & ~WRITE_SYNC
const WRITE_NON_PRIMARY   = MAX & ~WRITE_PRIMARY
const WRITE_NOT_FINISHING = MAX & ~WRITE_FINISHING
const WRITE_DRAINED       = MAX & ~WRITE_UNDRAINED
const WRITE_NOT_QUEUED    = MAX & ~WRITE_QUEUED
const WRITE_NOT_NEXT_TICK = MAX & ~WRITE_NEXT_TICK

// Combined shared state
const ACTIVE            = READ_ACTIVE | WRITE_ACTIVE
const NOT_ACTIVE        = MAX & ~ACTIVE
const DONE              = READ_DONE | WRITE_DONE
const DESTROY_STATUS    = DESTROYING | DESTROYED
const OPEN_STATUS       = DESTROY_STATUS | OPENING
const AUTO_DESTROY      = DESTROY_STATUS | DONE
const NON_PRIMARY       = WRITE_NON_PRIMARY & READ_NON_PRIMARY
// Only the "tick scheduled" bits, with the piggy-backed active bits masked out
const TICKING           = (WRITE_NEXT_TICK | READ_NEXT_TICK) & NOT_ACTIVE
const ACTIVE_OR_TICKING = ACTIVE | TICKING
const IS_OPENING        = OPEN_STATUS | TICKING

// Combined read state
const READ_PRIMARY_STATUS                 = OPEN_STATUS | READ_ENDING | READ_DONE
const READ_STATUS                         = OPEN_STATUS | READ_DONE | READ_QUEUED
const READ_FLOWING                        = READ_RESUMED | READ_PIPE_DRAINED
const READ_ACTIVE_AND_SYNC                = READ_ACTIVE | READ_SYNC
const READ_ACTIVE_AND_SYNC_AND_NEEDS_PUSH = READ_ACTIVE | READ_SYNC | READ_NEEDS_PUSH
const READ_PRIMARY_AND_ACTIVE             = READ_PRIMARY | READ_ACTIVE
const READ_ENDING_STATUS                  = OPEN_STATUS | READ_ENDING | READ_QUEUED
const READ_EMIT_READABLE_AND_QUEUED       = READ_EMIT_READABLE | READ_QUEUED
const READ_READABLE_STATUS                = OPEN_STATUS | READ_EMIT_READABLE | READ_QUEUED | READ_EMITTED_READABLE
const SHOULD_NOT_READ                     = OPEN_STATUS | READ_ACTIVE | READ_ENDING | READ_DONE | READ_NEEDS_PUSH
const READ_BACKPRESSURE_STATUS            = DESTROY_STATUS | READ_ENDING | READ_DONE

// Combined write state
const WRITE_PRIMARY_STATUS       = OPEN_STATUS | WRITE_FINISHING | WRITE_DONE
const WRITE_QUEUED_AND_UNDRAINED = WRITE_QUEUED | WRITE_UNDRAINED
const WRITE_QUEUED_AND_ACTIVE    = WRITE_QUEUED | WRITE_ACTIVE
const WRITE_DRAIN_STATUS         = WRITE_QUEUED | WRITE_UNDRAINED | OPEN_STATUS | WRITE_ACTIVE
const WRITE_STATUS               = OPEN_STATUS | WRITE_ACTIVE | WRITE_QUEUED
const WRITE_PRIMARY_AND_ACTIVE   = WRITE_PRIMARY | WRITE_ACTIVE
const WRITE_ACTIVE_AND_SYNC      = WRITE_ACTIVE | WRITE_SYNC
const WRITE_FINISHING_STATUS     = OPEN_STATUS | WRITE_FINISHING | WRITE_QUEUED_AND_ACTIVE | WRITE_DONE
const WRITE_BACKPRESSURE_STATUS  = WRITE_UNDRAINED | DESTROY_STATUS | WRITE_FINISHING | WRITE_DONE

// Fall back to a private symbol on runtimes without Symbol.asyncIterator
const asyncIterator = Symbol.asyncIterator || Symbol('asyncIterator')
// Write-side bookkeeping for a stream: a FIFO of pending chunks, the amount
// currently buffered, and the logic that feeds chunks to `stream._write` and
// drives the final / destroy / open transitions stored in `stream._duplexState`.
class WritableState {
  constructor (stream, { highWaterMark = 16384, map = null, mapWritable, byteLength, byteLengthWritable } = {}) {
    this.stream = stream
    this.queue = new FIFO()
    this.highWaterMark = highWaterMark
    this.buffered = 0
    this.error = null
    this.pipeline = null
    // Write-specific options take precedence over the shared ones
    this.byteLength = byteLengthWritable || byteLength || defaultByteLength
    this.map = mapWritable || map
    this.afterWrite = afterWrite.bind(this)
    this.afterUpdateNextTick = updateWriteNT.bind(this)
  }

  // True once the WRITE_DONE flag has been set on the stream
  get ended () {
    const doneBit = this.stream._duplexState & WRITE_DONE
    return doneBit !== 0
  }

  // Queue one chunk (after the optional map). Returns true while the buffered
  // size stays under the high water mark, false once it is reached.
  push (data) {
    const chunk = this.map !== null ? this.map(data) : data

    this.buffered += this.byteLength(chunk)
    this.queue.push(chunk)

    const belowMark = this.buffered < this.highWaterMark
    this.stream._duplexState |= belowMark ? WRITE_QUEUED : WRITE_QUEUED_AND_UNDRAINED
    return belowMark
  }

  // Take the oldest queued chunk; clears WRITE_QUEUED when nothing is buffered anymore
  shift () {
    const head = this.queue.shift()

    this.buffered -= this.byteLength(head)
    if (this.buffered === 0) this.stream._duplexState &= WRITE_NOT_QUEUED

    return head
  }

  // Mark the write side as finishing. A function argument is registered as a
  // one-shot 'finish' listener; any other non-nullish argument is queued as a last chunk.
  end (data) {
    const stream = this.stream

    if (typeof data === 'function') {
      stream.once('finish', data)
    } else if (data !== undefined && data !== null) {
      this.push(data)
    }

    stream._duplexState = (stream._duplexState | WRITE_FINISHING) & WRITE_NON_PRIMARY
  }

  // Default `_write` strategy: gather `data` plus everything still queued into
  // one array and hand it to `stream._writev`.
  autoBatch (data, cb) {
    const stream = this.stream
    const batch = [data]

    while ((stream._duplexState & WRITE_STATUS) === WRITE_QUEUED_AND_ACTIVE) {
      batch.push(stream._writableState.shift())
    }

    // Any open-status bit (opening / destroying / destroyed) set: skip the write
    if ((stream._duplexState & OPEN_STATUS) !== 0) return cb(null)
    stream._writev(batch, cb)
  }

  // Feed queued chunks to `stream._write` while the state allows it, then
  // fall through to the non-primary transitions when the write side is idle.
  update () {
    const stream = this.stream

    while ((stream._duplexState & WRITE_STATUS) === WRITE_QUEUED) {
      const chunk = this.shift()
      // WRITE_SYNC is only set for the duration of the _write call, so
      // afterWrite can tell whether it was invoked synchronously
      stream._duplexState |= WRITE_ACTIVE_AND_SYNC
      stream._write(chunk, this.afterWrite)
      stream._duplexState &= WRITE_NOT_SYNC
    }

    const primaryOrActive = stream._duplexState & WRITE_PRIMARY_AND_ACTIVE
    if (primaryOrActive === 0) this.updateNonPrimary()
  }

  // Run at most one of: `_final`, `_destroy`, `_open` - in that priority order
  updateNonPrimary () {
    const stream = this.stream

    const readyToFinish = (stream._duplexState & WRITE_FINISHING_STATUS) === WRITE_FINISHING
    if (readyToFinish) {
      stream._duplexState = (stream._duplexState | WRITE_ACTIVE) & WRITE_NOT_FINISHING
      stream._final(afterFinal.bind(this))
      return
    }

    const destroying = (stream._duplexState & DESTROY_STATUS) === DESTROYING
    if (destroying) {
      // Wait until nothing is active and no tick is pending before destroying
      if ((stream._duplexState & ACTIVE_OR_TICKING) === 0) {
        stream._duplexState |= ACTIVE
        stream._destroy(afterDestroy.bind(this))
      }
      return
    }

    const needsOpen = (stream._duplexState & IS_OPENING) === OPENING
    if (needsOpen) {
      stream._duplexState = (stream._duplexState | ACTIVE) & NOT_OPENING
      stream._open(afterOpen.bind(this))
    }
  }

  // Schedule `update()` via queueTick unless any WRITE_NEXT_TICK bit is already
  // set (that mask also contains the write-active bit)
  updateNextTick () {
    const stream = this.stream
    if ((stream._duplexState & WRITE_NEXT_TICK) !== 0) return

    stream._duplexState |= WRITE_NEXT_TICK
    queueTick(this.afterUpdateNextTick)
  }
}
// Read-side bookkeeping for a stream: a FIFO of readable chunks, the amount
// currently buffered, an optional pipe destination, and the logic that calls
// `stream._read` and drives the end / destroy / open transitions.
class ReadableState {
  constructor (stream, { highWaterMark = 16384, map = null, mapReadable, byteLength, byteLengthReadable } = {}) {
    this.stream = stream
    this.queue = new FIFO()
    this.highWaterMark = highWaterMark
    this.buffered = 0
    this.error = null
    this.pipeline = null
    // Read-specific options take precedence over the shared ones
    this.byteLength = byteLengthReadable || byteLength || defaultByteLength
    this.map = mapReadable || map
    this.pipeTo = null
    this.afterRead = afterRead.bind(this)
    this.afterUpdateNextTick = updateReadNT.bind(this)
  }

  // True once the READ_DONE flag has been set on the stream
  get ended () {
    const doneBit = this.stream._duplexState & READ_DONE
    return doneBit !== 0
  }

  // Attach a single destination. Throws if one is already attached.
  // `cb` is only used when it is a function.
  pipe (pipeTo, cb) {
    if (this.pipeTo !== null) throw new Error('Can only pipe to one destination')

    const source = this.stream
    const onPiped = typeof cb === 'function' ? cb : null

    source._duplexState |= READ_PIPE_DRAINED
    this.pipeTo = pipeTo

    const pipeline = new Pipeline(source, pipeTo, onPiped)
    this.pipeline = pipeline
    const markFinished = pipeline.finished.bind(pipeline)

    if (onPiped) source.on('error', noop) // We already error handle this so supress crashes

    if (isStreamx(pipeTo)) {
      pipeTo._writableState.pipeline = pipeline
      if (onPiped) pipeTo.on('error', noop) // We already error handle this so supress crashes
      pipeTo.on('finish', markFinished) // TODO: just call finished from pipeTo itself
    } else {
      // Destinations that are not streamx report completion through their own events
      pipeTo.on('error', pipeline.done.bind(pipeline, pipeTo))
      pipeTo.on('close', pipeline.done.bind(pipeline, pipeTo, null)) // onclose has a weird bool arg
      pipeTo.on('finish', markFinished)
    }

    pipeTo.on('drain', afterDrain.bind(this))
    source.emit('piping', pipeTo)
    pipeTo.emit('pipe', source)
  }

  // Queue a chunk for reading, or signal end-of-stream with `null`.
  // Returns true while the buffered size is under the high water mark.
  push (data) {
    const stream = this.stream

    if (data === null) {
      this.highWaterMark = 0
      stream._duplexState = (stream._duplexState | READ_ENDING) & READ_NON_PRIMARY_AND_PUSHED
      return false
    }

    const chunk = this.map !== null ? this.map(data) : data
    this.buffered += this.byteLength(chunk)
    this.queue.push(chunk)
    stream._duplexState = (stream._duplexState | READ_QUEUED) & READ_PUSHED

    return this.buffered < this.highWaterMark
  }

  // Take the oldest queued chunk; clears the queued flags when nothing is buffered anymore
  shift () {
    const head = this.queue.shift()

    this.buffered -= this.byteLength(head)
    if (this.buffered === 0) this.stream._duplexState &= READ_NOT_QUEUED

    return head
  }

  // Put `data` at the front of the queue: empty the queue, push `data`
  // through the normal path, then re-append what was taken out.
  unshift (data) {
    const rest = []
    for (let item = this.queue.shift(); item !== undefined; item = this.queue.shift()) {
      rest.push(item)
    }

    this.push(data)

    for (const item of rest) this.queue.push(item)
  }

  // Hand out one queued chunk (also forwarding it to the pipe destination and
  // 'data' listeners), or null when the state does not allow reading.
  read () {
    const stream = this.stream
    if ((stream._duplexState & READ_STATUS) !== READ_QUEUED) return null

    const chunk = this.shift()

    // A destination that returns false from write() stops the flow until it drains
    if (this.pipeTo !== null && this.pipeTo.write(chunk) === false) {
      stream._duplexState &= READ_PIPE_NOT_DRAINED
    }
    if ((stream._duplexState & READ_EMIT_DATA) !== 0) stream.emit('data', chunk)

    return chunk
  }

  // Deliver queued chunks for as long as data is queued and the stream is flowing
  drain () {
    const stream = this.stream

    while ((stream._duplexState & READ_STATUS) === READ_QUEUED && (stream._duplexState & READ_FLOWING) !== 0) {
      const chunk = this.shift()

      if (this.pipeTo !== null && this.pipeTo.write(chunk) === false) {
        stream._duplexState &= READ_PIPE_NOT_DRAINED
      }
      if ((stream._duplexState & READ_EMIT_DATA) !== 0) stream.emit('data', chunk)
    }
  }

  // Drain, refill through `stream._read` while under the high water mark,
  // emit 'readable' if requested, then run the non-primary transitions when idle.
  update () {
    const stream = this.stream

    this.drain()

    while (this.buffered < this.highWaterMark && (stream._duplexState & SHOULD_NOT_READ) === 0) {
      // READ_SYNC is only set for the duration of the _read call, so
      // afterRead can tell whether it was invoked synchronously
      stream._duplexState |= READ_ACTIVE_AND_SYNC_AND_NEEDS_PUSH
      stream._read(this.afterRead)
      stream._duplexState &= READ_NOT_SYNC

      const readCompleted = (stream._duplexState & READ_ACTIVE) === 0
      if (readCompleted) this.drain()
    }

    if ((stream._duplexState & READ_READABLE_STATUS) === READ_EMIT_READABLE_AND_QUEUED) {
      stream._duplexState |= READ_EMITTED_READABLE
      stream.emit('readable')
    }

    const primaryOrActive = stream._duplexState & READ_PRIMARY_AND_ACTIVE
    if (primaryOrActive === 0) this.updateNonPrimary()
  }

  // Handle end-of-stream first, then at most one of `_destroy` / `_open`
  updateNonPrimary () {
    const stream = this.stream

    const readyToEnd = (stream._duplexState & READ_ENDING_STATUS) === READ_ENDING
    if (readyToEnd) {
      stream._duplexState = (stream._duplexState | READ_DONE) & READ_NOT_ENDING
      stream.emit('end')
      // Both sides done and no destroy in progress: start destroying
      if ((stream._duplexState & AUTO_DESTROY) === DONE) stream._duplexState |= DESTROYING
      if (this.pipeTo !== null) this.pipeTo.end()
    }

    const destroying = (stream._duplexState & DESTROY_STATUS) === DESTROYING
    if (destroying) {
      // Wait until nothing is active and no tick is pending before destroying
      if ((stream._duplexState & ACTIVE_OR_TICKING) === 0) {
        stream._duplexState |= ACTIVE
        stream._destroy(afterDestroy.bind(this))
      }
      return
    }

    const needsOpen = (stream._duplexState & IS_OPENING) === OPENING
    if (needsOpen) {
      stream._duplexState = (stream._duplexState | ACTIVE) & NOT_OPENING
      stream._open(afterOpen.bind(this))
    }
  }

  // Schedule `update()` via queueTick unless any READ_NEXT_TICK bit is already
  // set (that mask also contains the read-active bit)
  updateNextTick () {
    const stream = this.stream
    if ((stream._duplexState & READ_NEXT_TICK) !== 0) return

    stream._duplexState |= READ_NEXT_TICK
    queueTick(this.afterUpdateNextTick)
  }
}
// Per-transform scratch state: a parked chunk waiting for read-side capacity,
// the transform completion callback bound to the stream, and the pending
// `_final` callback.
class TransformState {
  constructor (stream) {
    const onTransformed = afterTransform.bind(stream)

    this.data = null
    this.afterTransform = onTransformed
    this.afterFinal = null
  }
}
// Tracks one source -> destination pipe and reports its outcome through an
// optional callback once both ends have checked in via `done`.
class Pipeline {
  constructor (src, dst, cb) {
    this.from = src
    this.to = dst
    this.afterPipe = cb
    this.error = null
    this.pipeToFinished = false
  }

  // Record that the destination emitted 'finish'
  finished () {
    this.pipeToFinished = true
  }

  // Called when `stream` (one end of the pipe) is finished with. The first end
  // to report tears down the other end if the pipe did not complete cleanly;
  // the last end to report invokes the callback with the recorded error.
  done (stream, err) {
    if (err) this.error = err

    if (stream === this.to) {
      this.to = null

      if (this.from !== null) {
        const sourceEnded = (this.from._duplexState & READ_DONE) !== 0
        if (!sourceEnded || !this.pipeToFinished) {
          this.from.destroy(this.error || new Error('Writable stream closed prematurely'))
        }
        // The source still has to report in
        return
      }
    }

    if (stream === this.from) {
      this.from = null

      if (this.to !== null) {
        const sourceEnded = (stream._duplexState & READ_DONE) !== 0
        if (!sourceEnded) {
          this.to.destroy(this.error || new Error('Readable stream closed before ending'))
        }
        // The destination still has to report in
        return
      }
    }

    if (this.afterPipe !== null) this.afterPipe(this.error)

    // Drop all references so a later call is a no-op
    this.afterPipe = null
    this.from = null
    this.to = null
  }
}
// 'drain' listener installed on a pipe destination (bound to the ReadableState).
// Marks the pipe as drained again and resumes delivery: on the next tick when
// the read side is neither active nor inside a synchronous read, otherwise
// by draining right away.
function afterDrain () {
  const stream = this.stream
  stream._duplexState |= READ_PIPE_DRAINED

  const activeOrSync = (stream._duplexState & READ_ACTIVE_AND_SYNC) !== 0
  if (activeOrSync) this.drain()
  else this.updateNextTick()
}
// Callback handed to `stream._final` (bound to the WritableState).
// Destroys on error, otherwise marks the write side done and emits 'finish';
// starts auto-destroy when both sides are done, then clears the active flag
// and re-runs the update loop.
function afterFinal (err) {
  const stream = this.stream

  if (err) stream.destroy(err)

  const destroyInProgress = (stream._duplexState & DESTROY_STATUS) !== 0
  if (!destroyInProgress) {
    stream._duplexState |= WRITE_DONE
    stream.emit('finish')
  }

  // Re-read the state: the 'finish' listeners above may have changed it
  const bothSidesDone = (stream._duplexState & AUTO_DESTROY) === DONE
  if (bothSidesDone) stream._duplexState |= DESTROYING

  stream._duplexState &= WRITE_NOT_ACTIVE
  this.update()
}
// Callback handed to `stream._destroy` (bound to a Readable- or WritableState).
// Emits 'error' when there is one to report, flags the stream as destroyed,
// emits 'close' and notifies any pipelines the stream takes part in.
function afterDestroy (err) {
  const stream = this.stream

  // Fall back to the error recorded by destroy(), except for the
  // STREAM_DESTROYED sentinel which stands for "destroyed without an error"
  let failure = err
  if (!failure && this.error !== STREAM_DESTROYED) failure = this.error

  if (failure) stream.emit('error', failure)

  stream._duplexState |= DESTROYED
  stream.emit('close')

  const sides = [stream._readableState, stream._writableState]
  for (const side of sides) {
    if (side !== null && side.pipeline !== null) side.pipeline.done(stream, failure)
  }
}
// Callback handed to `stream._write` (bound to the WritableState).
// Clears the active flag, emits 'drain' when the backlog has been worked off
// and a listener asked for it, and continues the update loop unless this
// callback ran synchronously inside `update()`.
function afterWrite (err) {
  const stream = this.stream

  if (err) stream.destroy(err)
  stream._duplexState &= WRITE_NOT_ACTIVE

  // Undrained, with nothing queued, nothing active and no open-status bit set
  const backlogCleared = (stream._duplexState & WRITE_DRAIN_STATUS) === WRITE_UNDRAINED
  if (backlogCleared) {
    stream._duplexState &= WRITE_DRAINED
    if ((stream._duplexState & WRITE_EMIT_DRAIN) !== 0) stream.emit('drain')
  }

  // When called synchronously the loop in update() is still running
  const calledSync = (stream._duplexState & WRITE_SYNC) !== 0
  if (!calledSync) this.update()
}
// Callback handed to `stream._read` (bound to the ReadableState).
// Clears the active flag and continues the update loop unless this callback
// ran synchronously inside `update()`, whose loop is then still running.
function afterRead (err) {
  const stream = this.stream

  if (err) stream.destroy(err)
  stream._duplexState &= READ_NOT_ACTIVE

  const calledSync = (stream._duplexState & READ_SYNC) !== 0
  if (!calledSync) this.update()
}
// queueTick target for the read side (bound to the ReadableState):
// clears the READ_NEXT_TICK bits and runs the update loop.
function updateReadNT () {
  const stream = this.stream
  stream._duplexState &= READ_NOT_NEXT_TICK
  this.update()
}
// queueTick target for the write side (bound to the WritableState):
// clears the WRITE_NEXT_TICK bits and runs the update loop.
function updateWriteNT () {
  const stream = this.stream
  stream._duplexState &= WRITE_NOT_NEXT_TICK
  this.update()
}
// Callback handed to `stream._open` (bound to a Readable- or WritableState).
// Unless the stream is being destroyed, promotes each side that is eligible to
// its primary mode and emits 'open'; then clears the active flags and kicks
// both update loops.
function afterOpen (err) {
  const stream = this.stream

  if (err) stream.destroy(err)

  if ((stream._duplexState & DESTROYING) === 0) {
    // Neither check looks at the primary bits, so both can be evaluated
    // against the same state before applying them together
    let promote = 0
    if ((stream._duplexState & READ_PRIMARY_STATUS) === 0) promote |= READ_PRIMARY
    if ((stream._duplexState & WRITE_PRIMARY_STATUS) === 0) promote |= WRITE_PRIMARY
    stream._duplexState |= promote

    stream.emit('open')
  }

  stream._duplexState &= NOT_ACTIVE

  if (stream._writableState !== null) stream._writableState.update()
  if (stream._readableState !== null) stream._readableState.update()
}
// Completion callback for `_transform` (bound to the stream).
// Pushes the produced value to the read side unless it is null/undefined,
// then completes the pending write with `err`.
function afterTransform (err, data) {
  const hasOutput = data !== undefined && data !== null
  if (hasOutput) this.push(data)

  this._writableState.afterWrite(err)
}
// Base class shared by all stream flavours. Owns the packed `_duplexState`
// integer and the (initially absent) read/write state objects, and provides
// the open/destroy lifecycle hooks.
class Stream extends EventEmitter {
  // Recognised options: `open`, `destroy`, `predestroy` (override the hooks of
  // the same name) and `signal` (its 'abort' event destroys the stream).
  constructor (opts) {
    super()

    this._duplexState = 0
    this._readableState = null
    this._writableState = null

    if (!opts) return

    if (opts.open) this._open = opts.open
    if (opts.destroy) this._destroy = opts.destroy
    if (opts.predestroy) this._predestroy = opts.predestroy
    if (opts.signal) opts.signal.addEventListener('abort', abort.bind(this))
  }

  // Default open hook: completes immediately
  _open (cb) {
    cb(null)
  }

  // Default destroy hook: completes immediately
  _destroy (cb) {
    cb(null)
  }

  // Default predestroy hook
  _predestroy () {
    // does nothing
  }

  // true when a read side exists, undefined otherwise
  get readable () {
    if (this._readableState === null) return undefined
    return true
  }

  // true when a write side exists, undefined otherwise
  get writable () {
    if (this._writableState === null) return undefined
    return true
  }

  // true once the DESTROYED flag is set
  get destroyed () {
    const bit = this._duplexState & DESTROYED
    return bit !== 0
  }

  // true while destroying and also after being destroyed
  get destroying () {
    const bits = this._duplexState & DESTROY_STATUS
    return bits !== 0
  }

  // Start destroying the stream. Has no effect when a destroy is already in
  // progress or finished. Without an error the STREAM_DESTROYED sentinel is recorded.
  destroy (err) {
    if ((this._duplexState & DESTROY_STATUS) !== 0) return

    const reason = err || STREAM_DESTROYED
    this._duplexState = (this._duplexState | DESTROYING) & NON_PRIMARY

    const rs = this._readableState
    if (rs !== null) {
      rs.error = reason
      rs.updateNextTick()
    }

    const ws = this._writableState
    if (ws !== null) {
      ws.error = reason
      ws.updateNextTick()
    }

    this._predestroy()
  }

  // Same as EventEmitter#on, with side effects for a few event names:
  // 'data' resumes the read side, 'readable' and 'drain' enable emitting
  // those events, and each schedules an update on the affected side.
  on (name, fn) {
    switch (name) {
      case 'data':
        if (this._readableState !== null) {
          this._duplexState |= (READ_EMIT_DATA | READ_RESUMED)
          this._readableState.updateNextTick()
        }
        break
      case 'readable':
        if (this._readableState !== null) {
          this._duplexState |= READ_EMIT_READABLE
          this._readableState.updateNextTick()
        }
        break
      case 'drain':
        if (this._writableState !== null) {
          this._duplexState |= WRITE_EMIT_DRAIN
          this._writableState.updateNextTick()
        }
        break
    }

    return super.on(name, fn)
  }
}
// Readable stream. Data is produced by `_read(cb)` (or the `read` option),
// which calls `push()` and then `cb`; consumers use `read()`, 'data' /
// 'readable' events, `pipe()` or async iteration.
class Readable extends Stream {
  // Options (besides those handled by Stream and ReadableState):
  //   read      - replaces `_read`
  //   eagerOpen - resumes and immediately pauses, which schedules an update
  //               without leaving the stream in flowing mode
  constructor (opts) {
    super(opts)

    // A pure readable has no write side, so that side counts as done from the start
    this._duplexState |= OPENING | WRITE_DONE
    this._readableState = new ReadableState(this, opts)

    if (opts) {
      if (opts.read) this._read = opts.read
      if (opts.eagerOpen) this.resume().pause()
    }
  }

  // Default read hook: produces nothing and completes immediately
  _read (cb) {
    cb(null)
  }

  // Pipe into `dest`; `cb`, when a function, is passed to the pipeline created
  // for this pipe. Returns `dest` so calls can be chained.
  pipe (dest, cb) {
    this._readableState.pipe(dest, cb)
    this._readableState.updateNextTick()
    return dest
  }

  // Returns the next buffered chunk, or null when none can be read right now
  read () {
    this._readableState.updateNextTick()
    return this._readableState.read()
  }

  // Queue `data` for consumers; `null` signals end-of-stream.
  // Returns false once the buffer has reached the high water mark.
  push (data) {
    this._readableState.updateNextTick()
    return this._readableState.push(data)
  }

  // Put `data` back at the front of the read buffer
  unshift (data) {
    this._readableState.updateNextTick()
    return this._readableState.unshift(data)
  }

  // Switch to flowing mode. Returns `this`.
  resume () {
    this._duplexState |= READ_RESUMED
    this._readableState.updateNextTick()
    return this
  }

  // Leave flowing mode. Returns `this`.
  pause () {
    this._duplexState &= READ_PAUSED
    return this
  }

  // Wrap an async iterator in a Readable: each `_read` pulls one result from
  // the iterator; destroying the stream asks the iterator to finish early.
  static _fromAsyncIterator (ite, opts) {
    let destroy

    const rs = new Readable({
      ...opts,
      read (cb) {
        ite.next().then(push).then(cb.bind(null, null)).catch(cb)
      },
      predestroy () {
        // `return` is an optional member of the async iterator interface, so
        // only call it when present. Promise.resolve keeps a promise result
        // as-is and lifts a plain value so `.then` below is always available.
        if (typeof ite.return === 'function') destroy = Promise.resolve(ite.return())
      },
      destroy (cb) {
        if (!destroy) return cb(null)
        destroy.then(cb.bind(null, null)).catch(cb)
      }
    })

    return rs

    function push (data) {
      if (data.done) rs.push(null)
      else rs.push(data.value)
    }
  }

  // Build a Readable from `data`:
  //   - a readable streamx stream is returned unchanged
  //   - an object exposing the async-iterator symbol is wrapped via `_fromAsyncIterator`
  //   - an array yields its items in order
  //   - `undefined` yields an empty stream
  //   - any other value yields that single value
  static from (data, opts) {
    // Must be handled first: the probes below dereference `data`, which used
    // to make from(undefined) throw instead of producing the empty stream
    if (data === undefined) data = []

    if (isReadStreamx(data)) return data
    if (data[asyncIterator]) return this._fromAsyncIterator(data[asyncIterator](), opts)
    if (!Array.isArray(data)) data = [data]

    let i = 0
    return new Readable({
      ...opts,
      read (cb) {
        this.push(i === data.length ? null : data[i++])
        cb(null)
      }
    })
  }

  // True when the read side is ending, done, being destroyed, or its buffer
  // has reached the high water mark
  static isBackpressured (rs) {
    return (rs._duplexState & READ_BACKPRESSURE_STATUS) !== 0 || rs._readableState.buffered >= rs._readableState.highWaterMark
  }

  // True when the stream is not in resumed (flowing) mode
  static isPaused (rs) {
    return (rs._duplexState & READ_RESUMED) === 0
  }

  // Async iteration support. `next()` resolves with the next chunk, resolves
  // with `done: true` after a clean end, and rejects with the stream's error
  // (or STREAM_DESTROYED when the stream closed before ending). `return()` and
  // `throw()` destroy the stream and settle once it has closed.
  [asyncIterator] () {
    const stream = this

    let error = null
    // Resolver pair of the `next()` call currently waiting for data, if any
    let promiseResolve = null
    let promiseReject = null

    this.on('error', (err) => { error = err })
    this.on('readable', onreadable)
    this.on('close', onclose)

    return {
      [asyncIterator] () {
        return this
      },
      next () {
        return new Promise(function (resolve, reject) {
          promiseResolve = resolve
          promiseReject = reject
          const data = stream.read()
          if (data !== null) ondata(data)
          // Nothing will arrive anymore once destroyed, so settle right away
          else if ((stream._duplexState & DESTROYED) !== 0) ondata(null)
        })
      },
      return () {
        return destroy(null)
      },
      throw (err) {
        return destroy(err)
      }
    }

    function onreadable () {
      if (promiseResolve !== null) ondata(stream.read())
    }

    function onclose () {
      if (promiseResolve !== null) ondata(null)
    }

    // Settle the pending `next()` promise, if there is one
    function ondata (data) {
      if (promiseReject === null) return
      if (error) promiseReject(error)
      else if (data === null && (stream._duplexState & READ_DONE) === 0) promiseReject(STREAM_DESTROYED)
      else promiseResolve({ value: data, done: data === null })
      promiseReject = promiseResolve = null
    }

    function destroy (err) {
      stream.destroy(err)
      return new Promise((resolve, reject) => {
        if (stream._duplexState & DESTROYED) return resolve({ value: undefined, done: true })
        stream.once('close', function () {
          if (err) reject(err)
          else resolve({ value: undefined, done: true })
        })
      })
    }
  }
}
// Writable stream. Chunks passed to `write()` are queued and handed to
// `_write(data, cb)`, which by default batches them into `_writev(batch, cb)`.
class Writable extends Stream {
  // Options (besides those handled by Stream and WritableState):
  // `writev`, `write`, `final` - replace the hooks of the same name
  constructor (opts) {
    super(opts)

    // A pure writable has no read side, so that side counts as done from the start
    this._duplexState |= OPENING | READ_DONE
    this._writableState = new WritableState(this, opts)

    if (!opts) return

    if (opts.writev) this._writev = opts.writev
    if (opts.write) this._write = opts.write
    if (opts.final) this._final = opts.final
  }

  // Default batch hook: discards the batch and completes immediately
  _writev (batch, cb) {
    cb(null)
  }

  // Default write hook: batch with whatever else is queued
  _write (data, cb) {
    this._writableState.autoBatch(data, cb)
  }

  // Default final hook: completes immediately
  _final (cb) {
    cb(null)
  }

  // True when the write side is undrained, finishing, done or being destroyed
  static isBackpressured (ws) {
    const bits = ws._duplexState & WRITE_BACKPRESSURE_STATUS
    return bits !== 0
  }

  // Queue a chunk. Returns false once the buffer has reached the high water mark.
  write (data) {
    const state = this._writableState
    state.updateNextTick()
    return state.push(data)
  }

  // Finish the write side, optionally with a last chunk or a 'finish' callback.
  // Returns `this`.
  end (data) {
    const state = this._writableState
    state.updateNextTick()
    state.end(data)
    return this
  }
}
// Stream with both a read and a write side. Inherits the read API from
// Readable and re-implements the write API of Writable.
class Duplex extends Readable { // and Writable
  // Options: everything Readable accepts plus `writev`, `write`, `final`
  constructor (opts) {
    super(opts)

    // Replace the state set up by Readable: on a duplex the write side is live
    this._duplexState = OPENING
    this._writableState = new WritableState(this, opts)

    if (!opts) return

    if (opts.writev) this._writev = opts.writev
    if (opts.write) this._write = opts.write
    if (opts.final) this._final = opts.final
  }

  // Default batch hook: discards the batch and completes immediately
  _writev (batch, cb) {
    cb(null)
  }

  // Default write hook: batch with whatever else is queued
  _write (data, cb) {
    this._writableState.autoBatch(data, cb)
  }

  // Default final hook: completes immediately
  _final (cb) {
    cb(null)
  }

  // Queue a chunk. Returns false once the buffer has reached the high water mark.
  write (data) {
    const state = this._writableState
    state.updateNextTick()
    return state.push(data)
  }

  // Finish the write side, optionally with a last chunk or a 'finish' callback.
  // Returns `this`.
  end (data) {
    const state = this._writableState
    state.updateNextTick()
    state.end(data)
    return this
  }
}
// Duplex whose read side is fed by running every written chunk through
// `_transform(data, cb)`; `_flush(cb)` runs once when the write side finishes.
class Transform extends Duplex {
  // Options: everything Duplex accepts plus `transform` and `flush`
  constructor (opts) {
    super(opts)

    this._transformState = new TransformState(this)

    if (!opts) return

    if (opts.transform) this._transform = opts.transform
    if (opts.flush) this._flush = opts.flush
  }

  // Transform the chunk right away while the read buffer has room; otherwise
  // park it until `_read` is called. The write completes through
  // `afterTransform`, not through the `cb` argument.
  _write (data, cb) {
    const rs = this._readableState
    const ts = this._transformState

    if (rs.buffered >= rs.highWaterMark) {
      ts.data = data
      return
    }

    this._transform(data, ts.afterTransform)
  }

  // Complete the read, then transform the parked chunk if there is one
  _read (cb) {
    const ts = this._transformState
    const parked = ts.data

    if (parked === null) {
      cb(null)
      return
    }

    ts.data = null
    cb(null)
    this._transform(parked, ts.afterTransform)
  }

  // Default transform: pass the chunk through unchanged
  _transform (data, cb) {
    cb(null, data)
  }

  // Default flush hook: completes immediately with no trailing data
  _flush (cb) {
    cb(null)
  }

  // Remember the final callback and run the flush hook first
  _final (cb) {
    this._transformState.afterFinal = cb
    this._flush(transformAfterFlush.bind(this))
  }
}
// Transform that adds nothing of its own, so the default `_transform`
// (which forwards each chunk unchanged) applies.
class PassThrough extends Transform {
}
// Completion callback for `_flush` (bound to the Transform).
// On error forwards it to the pending final callback; otherwise pushes the
// optional trailing data, ends the read side and completes `_final`.
function transformAfterFlush (err, data) {
  const finish = this._transformState.afterFinal

  if (err) return finish(err)

  const hasTail = data !== null && data !== undefined
  if (hasTail) this.push(data)

  this.push(null)
  finish(null)
}
// Promise flavour of `pipeline`: resolves with no value when the pipeline
// completes, rejects with the error it reports.
function pipelinePromise (...streams) {
  return new Promise((resolve, reject) => {
    const settle = (err) => {
      if (err) reject(err)
      else resolve()
    }

    pipeline(...streams, settle)
  })
}
// Pipe a chain of streams together, given either as separate arguments or as
// an array, optionally followed by a completion callback. An error on any
// stream destroys all of them. Returns the last stream of the chain.
// Throws when fewer than two streams are supplied.
function pipeline (stream, ...streams) {
  const all = Array.isArray(stream) ? [...stream] : [stream]
  all.push(...streams)

  // A trailing function is the completion callback, not a stream
  const last = all[all.length - 1]
  const done = typeof last === 'function' ? all.pop() : null

  if (all.length < 2) throw new Error('Pipeline requires at least 2 streams')

  let error = null
  let dest = null

  for (let i = 1; i < all.length; i++) {
    const src = all[i - 1]
    dest = all[i]

    if (isStreamx(src)) {
      src.pipe(dest, onerror)
    } else {
      // Sources that are not streamx get explicit error / premature-close
      // handling; the first one is only ever read from
      errorHandle(src, true, i > 1, onerror)
      src.pipe(dest)
    }
  }

  if (done) {
    let finished = false

    dest.on('finish', () => { finished = true })
    dest.on('error', (err) => { error = error || err })
    dest.on('close', () => {
      const outcome = error || (finished ? null : PREMATURE_CLOSE)
      done(outcome)
    })
  }

  return dest

  // Report an error on `s`, or a close that happens before the side(s) we
  // rely on (`rd` = readable, `wr` = writable) have ended
  function errorHandle (s, rd, wr, onerror) {
    s.on('error', onerror)
    s.on('close', onclose)

    function onclose () {
      if (rd && s._readableState && !s._readableState.ended) return onerror(PREMATURE_CLOSE)
      if (wr && s._writableState && !s._writableState.ended) return onerror(PREMATURE_CLOSE)
    }
  }

  // Record the first error only and tear down every stream in the chain
  function onerror (err) {
    if (!err || error) return
    error = err

    for (const s of all) {
      s.destroy(err)
    }
  }
}
// Returns true when `stream` carries a readable and/or writable state object
// (`_readableState` / `_writableState`). Any other value, including null and
// undefined, yields false.
function isStream (stream) {
  // Property access on null/undefined would throw; a predicate should just say "no"
  if (stream === null || stream === undefined) return false
  return !!stream._readableState || !!stream._writableState
}
// Returns true when `stream` is a stream created by this module, recognised by
// its numeric `_duplexState` in addition to the generic `isStream` check.
// Any other value, including null and undefined, yields false.
function isStreamx (stream) {
  // Property access on null/undefined would throw; a predicate should just say "no"
  if (stream === null || stream === undefined) return false
  return typeof stream._duplexState === 'number' && isStream(stream)
}
// Like `isStreamx`, additionally requiring a read side: for a stream of this
// module the result is the value of its `readable` property, otherwise false.
function isReadStreamx (stream) {
  if (!isStreamx(stream)) return false
  return stream.readable
}
// Returns true for any non-null object that has a numeric `byteLength`
// property - the check is structural, not an instanceof test.
function isTypedArray (data) {
  if (data === null || typeof data !== 'object') return false
  return typeof data.byteLength === 'number'
}
// Default size estimate for a queued chunk: its `byteLength` when
// `isTypedArray` accepts it, otherwise a flat 1024.
function defaultByteLength (data) {
  if (isTypedArray(data)) return data.byteLength
  return 1024
}
// Does nothing; installed as an 'error' listener where errors are already
// handled elsewhere.
function noop () {
  // intentionally empty
}
// 'abort' listener for the `signal` option (bound to the stream):
// destroys the stream with an abort error.
function abort () {
  const reason = new Error('Stream aborted.')
  this.destroy(reason)
}
- module.exports = {
- pipeline,
- pipelinePromise,
- isStream,
- isStreamx,
- Stream,
- Writable,
- Readable,
- Duplex,
- Transform,
- // Export PassThrough for compatibility with Node.js core's stream module
- PassThrough
- }
|