index.js 26 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991
  1. const { EventEmitter } = require('events')
  2. const STREAM_DESTROYED = new Error('Stream was destroyed')
  3. const PREMATURE_CLOSE = new Error('Premature close')
  4. const queueTick = require('queue-tick')
  5. const FIFO = require('fast-fifo')
  /* eslint-disable no-multi-spaces */

  // The whole stream state is packed into one 25-bit integer (`_duplexState`).
  // Bits 0-2 are shared, bits 3-15 describe the read side, bits 16-24 the write side.
  // Every `NOT_*` style constant is a mask that clears the named bit(s) when AND-ed in.
  const MAX = (1 << 25) - 1

  // Shared state
  const OPENING = 1 << 0
  const DESTROYING = 1 << 1
  const DESTROYED = 1 << 2

  const NOT_OPENING = MAX & ~OPENING

  // Read state
  const READ_ACTIVE = 1 << 3
  const READ_PRIMARY = 1 << 4
  const READ_SYNC = 1 << 5
  const READ_QUEUED = 1 << 6
  const READ_RESUMED = 1 << 7
  const READ_PIPE_DRAINED = 1 << 8
  const READ_ENDING = 1 << 9
  const READ_EMIT_DATA = 1 << 10
  const READ_EMIT_READABLE = 1 << 11
  const READ_EMITTED_READABLE = 1 << 12
  const READ_DONE = 1 << 13
  const READ_NEXT_TICK = (1 << 14) | READ_ACTIVE // also active
  const READ_NEEDS_PUSH = 1 << 15

  const READ_NOT_ACTIVE = MAX & ~READ_ACTIVE
  const READ_NON_PRIMARY = MAX & ~READ_PRIMARY
  const READ_NON_PRIMARY_AND_PUSHED = MAX & ~(READ_PRIMARY | READ_NEEDS_PUSH)
  const READ_NOT_SYNC = MAX & ~READ_SYNC
  const READ_PUSHED = MAX & ~READ_NEEDS_PUSH
  const READ_PAUSED = MAX & ~READ_RESUMED
  const READ_NOT_QUEUED = MAX & ~(READ_QUEUED | READ_EMITTED_READABLE)
  const READ_NOT_ENDING = MAX & ~READ_ENDING
  const READ_PIPE_NOT_DRAINED = MAX & ~(READ_RESUMED | READ_PIPE_DRAINED)
  const READ_NOT_NEXT_TICK = MAX & ~READ_NEXT_TICK

  // Write state
  const WRITE_ACTIVE = 1 << 16
  const WRITE_PRIMARY = 1 << 17
  const WRITE_SYNC = 1 << 18
  const WRITE_QUEUED = 1 << 19
  const WRITE_UNDRAINED = 1 << 20
  const WRITE_DONE = 1 << 21
  const WRITE_EMIT_DRAIN = 1 << 22
  const WRITE_NEXT_TICK = (1 << 23) | WRITE_ACTIVE // also active
  const WRITE_FINISHING = 1 << 24

  const WRITE_NOT_ACTIVE = MAX & ~WRITE_ACTIVE
  const WRITE_NOT_SYNC = MAX & ~WRITE_SYNC
  const WRITE_NON_PRIMARY = MAX & ~WRITE_PRIMARY
  const WRITE_NOT_FINISHING = MAX & ~WRITE_FINISHING
  const WRITE_DRAINED = MAX & ~WRITE_UNDRAINED
  const WRITE_NOT_QUEUED = MAX & ~WRITE_QUEUED
  const WRITE_NOT_NEXT_TICK = MAX & ~WRITE_NEXT_TICK

  // Combined shared state
  const ACTIVE = READ_ACTIVE | WRITE_ACTIVE
  const NOT_ACTIVE = MAX & ~ACTIVE
  const DONE = READ_DONE | WRITE_DONE
  const DESTROY_STATUS = DESTROYING | DESTROYED
  const OPEN_STATUS = DESTROY_STATUS | OPENING
  const AUTO_DESTROY = DESTROY_STATUS | DONE
  const NON_PRIMARY = WRITE_NON_PRIMARY & READ_NON_PRIMARY
  // Only the "tick scheduled" bits; the ACTIVE bits folded into *_NEXT_TICK are masked out
  const TICKING = (WRITE_NEXT_TICK | READ_NEXT_TICK) & NOT_ACTIVE
  const ACTIVE_OR_TICKING = ACTIVE | TICKING
  const IS_OPENING = OPEN_STATUS | TICKING

  // Combined read state
  const READ_PRIMARY_STATUS = OPEN_STATUS | READ_ENDING | READ_DONE
  const READ_STATUS = OPEN_STATUS | READ_DONE | READ_QUEUED
  const READ_FLOWING = READ_RESUMED | READ_PIPE_DRAINED
  const READ_ACTIVE_AND_SYNC = READ_ACTIVE | READ_SYNC
  const READ_ACTIVE_AND_SYNC_AND_NEEDS_PUSH = READ_ACTIVE | READ_SYNC | READ_NEEDS_PUSH
  const READ_PRIMARY_AND_ACTIVE = READ_PRIMARY | READ_ACTIVE
  const READ_ENDING_STATUS = OPEN_STATUS | READ_ENDING | READ_QUEUED
  const READ_EMIT_READABLE_AND_QUEUED = READ_EMIT_READABLE | READ_QUEUED
  const READ_READABLE_STATUS = OPEN_STATUS | READ_EMIT_READABLE | READ_QUEUED | READ_EMITTED_READABLE
  const SHOULD_NOT_READ = OPEN_STATUS | READ_ACTIVE | READ_ENDING | READ_DONE | READ_NEEDS_PUSH
  const READ_BACKPRESSURE_STATUS = DESTROY_STATUS | READ_ENDING | READ_DONE

  // Combined write state
  const WRITE_PRIMARY_STATUS = OPEN_STATUS | WRITE_FINISHING | WRITE_DONE
  const WRITE_QUEUED_AND_UNDRAINED = WRITE_QUEUED | WRITE_UNDRAINED
  const WRITE_QUEUED_AND_ACTIVE = WRITE_QUEUED | WRITE_ACTIVE
  const WRITE_DRAIN_STATUS = WRITE_QUEUED | WRITE_UNDRAINED | OPEN_STATUS | WRITE_ACTIVE
  const WRITE_STATUS = OPEN_STATUS | WRITE_ACTIVE | WRITE_QUEUED
  const WRITE_PRIMARY_AND_ACTIVE = WRITE_PRIMARY | WRITE_ACTIVE
  const WRITE_ACTIVE_AND_SYNC = WRITE_ACTIVE | WRITE_SYNC
  const WRITE_FINISHING_STATUS = OPEN_STATUS | WRITE_FINISHING | WRITE_QUEUED_AND_ACTIVE | WRITE_DONE
  const WRITE_BACKPRESSURE_STATUS = WRITE_UNDRAINED | DESTROY_STATUS | WRITE_FINISHING | WRITE_DONE

  // Falls back to a private symbol on runtimes without Symbol.asyncIterator
  const asyncIterator = Symbol.asyncIterator || Symbol('asyncIterator')
  /**
   * Write-side bookkeeping for a stream: the pending chunk queue, the number of
   * buffered bytes and the logic that feeds chunks to the stream's `_write`.
   * All flags are kept in the owning stream's `_duplexState` bit field.
   */
  class WritableState {
    /**
     * @param {object} stream - the stream that owns this state
     * @param {object} [opts]
     * @param {number} [opts.highWaterMark=16384] - buffered size at which `push` starts returning false
     * @param {Function|null} [opts.map] - applied to every written chunk (overridden by `mapWritable`)
     * @param {Function} [opts.mapWritable]
     * @param {Function} [opts.byteLength] - sizes a chunk (overridden by `byteLengthWritable`)
     * @param {Function} [opts.byteLengthWritable]
     */
    constructor (stream, { highWaterMark = 16384, map = null, mapWritable, byteLength, byteLengthWritable } = {}) {
      this.stream = stream
      this.queue = new FIFO()
      this.highWaterMark = highWaterMark
      this.buffered = 0
      this.error = null
      this.pipeline = null
      this.byteLength = byteLengthWritable || byteLength || defaultByteLength
      this.map = mapWritable || map
      this.afterWrite = afterWrite.bind(this)
      this.afterUpdateNextTick = updateWriteNT.bind(this)
    }

    /** True once the WRITE_DONE flag is set on the stream. */
    get ended () {
      const doneBit = this.stream._duplexState & WRITE_DONE
      return doneBit !== 0
    }

    /**
     * Queue a chunk for writing.
     * @returns {boolean} false when the buffer has reached the high water mark
     */
    push (data) {
      const chunk = this.map === null ? data : this.map(data)

      this.buffered += this.byteLength(chunk)
      this.queue.push(chunk)

      const belowMark = this.buffered < this.highWaterMark
      // Reaching the mark additionally flags the stream as undrained
      this.stream._duplexState |= belowMark ? WRITE_QUEUED : WRITE_QUEUED_AND_UNDRAINED
      return belowMark
    }

    /** Remove and return the oldest queued chunk, clearing WRITE_QUEUED when nothing is buffered. */
    shift () {
      const head = this.queue.shift()

      this.buffered -= this.byteLength(head)
      if (this.buffered === 0) this.stream._duplexState &= WRITE_NOT_QUEUED

      return head
    }

    /**
     * Mark the write side as finishing.
     * @param {*} [data] - a final chunk to write, or a function to run once on 'finish'
     */
    end (data) {
      const { stream } = this

      if (typeof data === 'function') {
        stream.once('finish', data)
      } else if (data !== undefined && data !== null) {
        this.push(data)
      }

      stream._duplexState = (stream._duplexState | WRITE_FINISHING) & WRITE_NON_PRIMARY
    }

    /**
     * Default `_write` strategy: collect `data` plus everything currently queued
     * into one array and hand it to the stream's `_writev`.
     */
    autoBatch (data, cb) {
      const { stream } = this
      const batch = [data]

      while ((stream._duplexState & WRITE_STATUS) === WRITE_QUEUED_AND_ACTIVE) {
        batch.push(stream._writableState.shift())
      }

      // Opening or destroy flag set: complete the write without calling _writev
      if ((stream._duplexState & OPEN_STATUS) !== 0) return cb(null)
      stream._writev(batch, cb)
    }

    /** Feed queued chunks to `_write` for as long as the write side is queued and idle. */
    update () {
      const { stream } = this

      while ((stream._duplexState & WRITE_STATUS) === WRITE_QUEUED) {
        const chunk = this.shift()
        // WRITE_SYNC is only set for the duration of the _write call, so afterWrite
        // can tell a synchronous completion from a later one
        stream._duplexState |= WRITE_ACTIVE_AND_SYNC
        stream._write(chunk, this.afterWrite)
        stream._duplexState &= WRITE_NOT_SYNC
      }

      const primaryOrActive = stream._duplexState & WRITE_PRIMARY_AND_ACTIVE
      if (primaryOrActive === 0) this.updateNonPrimary()
    }

    /** Handle the less common transitions: finishing, destroying and opening. */
    updateNonPrimary () {
      const { stream } = this
      // No flag is modified between the checks below unless the matching branch is
      // taken (and every taken branch ends the method), so one snapshot is enough.
      const state = stream._duplexState

      if ((state & WRITE_FINISHING_STATUS) === WRITE_FINISHING) {
        stream._duplexState = (state | WRITE_ACTIVE) & WRITE_NOT_FINISHING
        stream._final(afterFinal.bind(this))
        return
      }

      if ((state & DESTROY_STATUS) === DESTROYING) {
        if ((state & ACTIVE_OR_TICKING) === 0) {
          stream._duplexState |= ACTIVE
          stream._destroy(afterDestroy.bind(this))
        }
        return
      }

      if ((state & IS_OPENING) === OPENING) {
        stream._duplexState = (state | ACTIVE) & NOT_OPENING
        stream._open(afterOpen.bind(this))
      }
    }

    /** Schedule `update` via queueTick unless a write tick is already pending. */
    updateNextTick () {
      const { stream } = this
      if ((stream._duplexState & WRITE_NEXT_TICK) !== 0) return

      stream._duplexState |= WRITE_NEXT_TICK
      queueTick(this.afterUpdateNextTick)
    }
  }
  /**
   * Read-side bookkeeping for a stream: the buffered chunk queue, the optional
   * pipe destination and the logic that calls the stream's `_read`.
   * All flags are kept in the owning stream's `_duplexState` bit field.
   */
  class ReadableState {
    /**
     * @param {object} stream - the stream that owns this state
     * @param {object} [opts]
     * @param {number} [opts.highWaterMark=16384] - buffered size below which `_read` keeps being called
     * @param {Function|null} [opts.map] - applied to every pushed chunk (overridden by `mapReadable`)
     * @param {Function} [opts.mapReadable]
     * @param {Function} [opts.byteLength] - sizes a chunk (overridden by `byteLengthReadable`)
     * @param {Function} [opts.byteLengthReadable]
     */
    constructor (stream, { highWaterMark = 16384, map = null, mapReadable, byteLength, byteLengthReadable } = {}) {
      this.stream = stream
      this.queue = new FIFO()
      this.highWaterMark = highWaterMark
      this.buffered = 0
      this.error = null
      this.pipeline = null
      this.byteLength = byteLengthReadable || byteLength || defaultByteLength
      this.map = mapReadable || map
      this.pipeTo = null
      this.afterRead = afterRead.bind(this)
      this.afterUpdateNextTick = updateReadNT.bind(this)
    }

    /** True once the READ_DONE flag is set on the stream. */
    get ended () {
      const doneBit = this.stream._duplexState & READ_DONE
      return doneBit !== 0
    }

    /**
     * Attach a single destination that every chunk read from now on is written to.
     * @param {object} pipeTo - destination stream
     * @param {Function} [cb] - handed to the Pipeline as its completion callback
     * @throws {Error} when a destination is already attached
     */
    pipe (pipeTo, cb) {
      if (this.pipeTo !== null) throw new Error('Can only pipe to one destination')

      const done = typeof cb === 'function' ? cb : null
      const source = this.stream

      source._duplexState |= READ_PIPE_DRAINED
      this.pipeTo = pipeTo

      const pipeline = new Pipeline(source, pipeTo, done)
      this.pipeline = pipeline

      if (done) source.on('error', noop) // We already error handle this so supress crashes

      const onfinished = pipeline.finished.bind(pipeline)

      if (isStreamx(pipeTo)) {
        pipeTo._writableState.pipeline = pipeline
        if (done) pipeTo.on('error', noop) // We already error handle this so supress crashes
        pipeTo.on('finish', onfinished) // TODO: just call finished from pipeTo itself
      } else {
        pipeTo.on('error', pipeline.done.bind(pipeline, pipeTo))
        // onclose has a weird bool arg, so the error slot is pinned to null
        pipeTo.on('close', pipeline.done.bind(pipeline, pipeTo, null))
        pipeTo.on('finish', onfinished)
      }

      pipeTo.on('drain', afterDrain.bind(this))
      source.emit('piping', pipeTo)
      pipeTo.emit('pipe', source)
    }

    /**
     * Buffer a chunk, or flag the read side as ending when `data` is null.
     * @returns {boolean} true while the buffer is below the high water mark
     */
    push (data) {
      const { stream } = this

      if (data === null) {
        // With a zero mark the update loop no longer asks for data
        this.highWaterMark = 0
        stream._duplexState = (stream._duplexState | READ_ENDING) & READ_NON_PRIMARY_AND_PUSHED
        return false
      }

      const chunk = this.map === null ? data : this.map(data)

      this.buffered += this.byteLength(chunk)
      this.queue.push(chunk)
      stream._duplexState = (stream._duplexState | READ_QUEUED) & READ_PUSHED

      return this.buffered < this.highWaterMark
    }

    /** Remove and return the oldest buffered chunk, clearing the queued flags when nothing is buffered. */
    shift () {
      const head = this.queue.shift()

      this.buffered -= this.byteLength(head)
      if (this.buffered === 0) this.stream._duplexState &= READ_NOT_QUEUED

      return head
    }

    /** Put `data` at the front of the buffer by draining the queue and re-queueing it behind `data`. */
    unshift (data) {
      const rest = []

      for (let item = this.queue.shift(); item !== undefined; item = this.queue.shift()) {
        rest.push(item)
      }

      this.push(data)

      // The drained chunks bypass push(), so `buffered` still accounts for them exactly once
      for (const item of rest) this.queue.push(item)
    }

    /**
     * Take one chunk off the buffer, forwarding it to the pipe destination and to
     * 'data' listeners when those are active.
     * @returns {*} the chunk, or null when nothing is readable
     */
    read () {
      const { stream } = this
      if ((stream._duplexState & READ_STATUS) !== READ_QUEUED) return null

      const chunk = this.shift()

      if (this.pipeTo !== null && this.pipeTo.write(chunk) === false) {
        stream._duplexState &= READ_PIPE_NOT_DRAINED
      }
      if ((stream._duplexState & READ_EMIT_DATA) !== 0) stream.emit('data', chunk)

      return chunk
    }

    /** Flush buffered chunks for as long as the stream is resumed or its pipe is drained. */
    drain () {
      const { stream } = this

      while ((stream._duplexState & READ_STATUS) === READ_QUEUED && (stream._duplexState & READ_FLOWING) !== 0) {
        const chunk = this.shift()

        if (this.pipeTo !== null && this.pipeTo.write(chunk) === false) {
          stream._duplexState &= READ_PIPE_NOT_DRAINED
        }
        if ((stream._duplexState & READ_EMIT_DATA) !== 0) stream.emit('data', chunk)
      }
    }

    /** Drain, refill through `_read` while below the high water mark, then emit 'readable' if due. */
    update () {
      const { stream } = this

      this.drain()

      while (this.buffered < this.highWaterMark && (stream._duplexState & SHOULD_NOT_READ) === 0) {
        // READ_SYNC is only set for the duration of the _read call, so afterRead
        // can tell a synchronous completion from a later one
        stream._duplexState |= READ_ACTIVE_AND_SYNC_AND_NEEDS_PUSH
        stream._read(this.afterRead)
        stream._duplexState &= READ_NOT_SYNC

        const stillActive = (stream._duplexState & READ_ACTIVE) !== 0
        if (!stillActive) this.drain()
      }

      if ((stream._duplexState & READ_READABLE_STATUS) === READ_EMIT_READABLE_AND_QUEUED) {
        stream._duplexState |= READ_EMITTED_READABLE
        stream.emit('readable')
      }

      const primaryOrActive = stream._duplexState & READ_PRIMARY_AND_ACTIVE
      if (primaryOrActive === 0) this.updateNonPrimary()
    }

    /** Handle the less common transitions: ending, destroying and opening. */
    updateNonPrimary () {
      const { stream } = this

      if ((stream._duplexState & READ_ENDING_STATUS) === READ_ENDING) {
        stream._duplexState = (stream._duplexState | READ_DONE) & READ_NOT_ENDING
        stream.emit('end')

        // Both sides done and no destroy flag set yet: start destroying
        if ((stream._duplexState & AUTO_DESTROY) === DONE) stream._duplexState |= DESTROYING
        if (this.pipeTo !== null) this.pipeTo.end()
      }

      if ((stream._duplexState & DESTROY_STATUS) === DESTROYING) {
        const idle = (stream._duplexState & ACTIVE_OR_TICKING) === 0
        if (idle) {
          stream._duplexState |= ACTIVE
          stream._destroy(afterDestroy.bind(this))
        }
        return
      }

      if ((stream._duplexState & IS_OPENING) === OPENING) {
        stream._duplexState = (stream._duplexState | ACTIVE) & NOT_OPENING
        stream._open(afterOpen.bind(this))
      }
    }

    /** Schedule `update` via queueTick unless a read tick is already pending. */
    updateNextTick () {
      const { stream } = this
      if ((stream._duplexState & READ_NEXT_TICK) !== 0) return

      stream._duplexState |= READ_NEXT_TICK
      queueTick(this.afterUpdateNextTick)
    }
  }
  /**
   * Per-stream scratch state for Transform: a single parked chunk plus the
   * callbacks used while transforming and finalising.
   */
  class TransformState {
    /**
     * @param {object} stream - the transform stream; `afterTransform` is bound to it
     */
    constructor (stream) {
      const boundAfterTransform = afterTransform.bind(stream)

      this.data = null // chunk held back by the transform's _write
      this.afterTransform = boundAfterTransform
      this.afterFinal = null // set by the transform's _final
    }
  }
  /**
   * Links a source and a destination created by `pipe`, destroying the
   * surviving side when the other one goes away early and invoking the optional
   * callback once both have reported in.
   */
  class Pipeline {
    /**
     * @param {object} src - source stream
     * @param {object} dst - destination stream
     * @param {Function|null} cb - called with the recorded error (or null) when the pipeline is over
     */
    constructor (src, dst, cb) {
      this.from = src
      this.to = dst
      this.afterPipe = cb
      this.error = null
      this.pipeToFinished = false
    }

    /** Record that the destination finished. */
    finished () {
      this.pipeToFinished = true
    }

    /**
     * Report that one side of the pipeline is gone.
     * @param {object} stream - the side that closed (`from` or `to`)
     * @param {Error} [err] - remembered and later passed to the callback
     */
    done (stream, err) {
      if (err) this.error = err

      if (stream === this.to) {
        this.to = null

        if (this.from !== null) {
          const sourceEnded = (this.from._duplexState & READ_DONE) !== 0
          // Destination went away before the source ended or before it finished itself
          if (!sourceEnded || !this.pipeToFinished) {
            this.from.destroy(this.error || new Error('Writable stream closed prematurely'))
          }
          return
        }
      }

      if (stream === this.from) {
        this.from = null

        if (this.to !== null) {
          const sourceEnded = (stream._duplexState & READ_DONE) !== 0
          if (!sourceEnded) {
            this.to.destroy(this.error || new Error('Readable stream closed before ending'))
          }
          return
        }
      }

      // Both sides have reported: fire the callback and drop all references
      if (this.afterPipe !== null) this.afterPipe(this.error)
      this.to = this.from = this.afterPipe = null
    }
  }
  /**
   * 'drain' listener for the pipe destination; `this` is the ReadableState.
   * Flags the pipe as drained again and resumes flushing buffered chunks.
   */
  function afterDrain () {
    const { stream } = this

    stream._duplexState |= READ_PIPE_DRAINED

    const activeOrSync = (stream._duplexState & READ_ACTIVE_AND_SYNC) !== 0
    if (activeOrSync) this.drain()
    else this.updateNextTick()
  }
  /**
   * Callback handed to `_final`; `this` is the WritableState.
   * @param {Error} [err] - destroys the stream when set
   */
  function afterFinal (err) {
    const { stream } = this

    if (err) stream.destroy(err)

    // 'finish' is only emitted when no destroy flag is set
    const notDestroying = (stream._duplexState & DESTROY_STATUS) === 0
    if (notDestroying) {
      stream._duplexState |= WRITE_DONE
      stream.emit('finish')
    }

    // Both sides done and no destroy flag set yet: start destroying
    const bothDone = (stream._duplexState & AUTO_DESTROY) === DONE
    if (bothDone) stream._duplexState |= DESTROYING

    stream._duplexState &= WRITE_NOT_ACTIVE
    this.update()
  }
  /**
   * Callback handed to `_destroy`; `this` is the readable or writable state
   * that triggered the destroy.
   * @param {Error} [err] - error reported by `_destroy`
   */
  function afterDestroy (err) {
    const { stream } = this

    // Fall back to the error stored on the state, unless it is the generic destroy marker
    let failure = err
    if (!failure && this.error !== STREAM_DESTROYED) failure = this.error

    if (failure) stream.emit('error', failure)

    stream._duplexState |= DESTROYED
    stream.emit('close')

    // Tell any pipeline attached to either side that this stream is gone
    const sides = [stream._readableState, stream._writableState]
    for (const side of sides) {
      if (side !== null && side.pipeline !== null) side.pipeline.done(stream, failure)
    }
  }
  /**
   * Callback handed to `_write`; `this` is the WritableState.
   * @param {Error} [err] - destroys the stream when set
   */
  function afterWrite (err) {
    const { stream } = this

    if (err) stream.destroy(err)
    stream._duplexState &= WRITE_NOT_ACTIVE

    // Undrained with nothing queued, nothing active and no open/destroy flag: drained again
    if ((stream._duplexState & WRITE_DRAIN_STATUS) === WRITE_UNDRAINED) {
      stream._duplexState &= WRITE_DRAINED
      if ((stream._duplexState & WRITE_EMIT_DRAIN) !== 0) stream.emit('drain')
    }

    // While WRITE_SYNC is set the update loop that called _write is still running
    if ((stream._duplexState & WRITE_SYNC) !== 0) return
    this.update()
  }
  /**
   * Callback handed to `_read`; `this` is the ReadableState.
   * @param {Error} [err] - destroys the stream when set
   */
  function afterRead (err) {
    const { stream } = this

    if (err) stream.destroy(err)
    stream._duplexState &= READ_NOT_ACTIVE

    // While READ_SYNC is set the update loop that called _read is still running
    if ((stream._duplexState & READ_SYNC) !== 0) return
    this.update()
  }
  /** Queued read tick; `this` is the ReadableState. Clears the tick flags, then updates. */
  function updateReadNT () {
    const { stream } = this
    stream._duplexState &= READ_NOT_NEXT_TICK
    this.update()
  }
  /** Queued write tick; `this` is the WritableState. Clears the tick flags, then updates. */
  function updateWriteNT () {
    const { stream } = this
    stream._duplexState &= WRITE_NOT_NEXT_TICK
    this.update()
  }
  /**
   * Callback handed to `_open`; `this` is the state object that started the open.
   * @param {Error} [err] - destroys the stream when set
   */
  function afterOpen (err) {
    const { stream } = this

    if (err) stream.destroy(err)

    const destroying = (stream._duplexState & DESTROYING) !== 0
    if (!destroying) {
      // A side becomes primary only when it is not already ending, finishing or done
      if ((stream._duplexState & READ_PRIMARY_STATUS) === 0) stream._duplexState |= READ_PRIMARY
      if ((stream._duplexState & WRITE_PRIMARY_STATUS) === 0) stream._duplexState |= WRITE_PRIMARY
      stream.emit('open')
    }

    stream._duplexState &= NOT_ACTIVE

    // Kick both sides, write side first
    const ws = stream._writableState
    if (ws !== null) ws.update()

    const rs = stream._readableState
    if (rs !== null) rs.update()
  }
  /**
   * Callback handed to `_transform`; `this` is the transform stream.
   * @param {Error} [err] - forwarded to the write side's afterWrite
   * @param {*} [data] - transformed chunk; pushed unless null or undefined
   */
  function afterTransform (err, data) {
    const hasOutput = data !== undefined && data !== null
    if (hasOutput) this.push(data)

    this._writableState.afterWrite(err)
  }
  /**
   * Base class shared by all stream kinds. Holds the `_duplexState` bit field
   * and the open/destroy lifecycle hooks; the read and write state objects are
   * attached by subclasses.
   */
  class Stream extends EventEmitter {
    /**
     * @param {object} [opts]
     * @param {Function} [opts.open] - replaces `_open`
     * @param {Function} [opts.destroy] - replaces `_destroy`
     * @param {Function} [opts.predestroy] - replaces `_predestroy`
     * @param {object} [opts.signal] - its 'abort' event destroys the stream
     */
    constructor (opts) {
      super()

      this._duplexState = 0
      this._readableState = null
      this._writableState = null

      if (!opts) return

      if (opts.open) this._open = opts.open
      if (opts.destroy) this._destroy = opts.destroy
      if (opts.predestroy) this._predestroy = opts.predestroy
      if (opts.signal) opts.signal.addEventListener('abort', abort.bind(this))
    }

    /** Default open hook: completes immediately. */
    _open (cb) {
      cb(null)
    }

    /** Default destroy hook: completes immediately. */
    _destroy (cb) {
      cb(null)
    }

    /** Default predestroy hook, called synchronously from destroy(). */
    _predestroy () {
      // does nothing
    }

    /** `true` when a readable state is attached, otherwise `undefined`. */
    get readable () {
      if (this._readableState === null) return undefined
      return true
    }

    /** `true` when a writable state is attached, otherwise `undefined`. */
    get writable () {
      if (this._writableState === null) return undefined
      return true
    }

    /** True once the DESTROYED flag is set. */
    get destroyed () {
      const flag = this._duplexState & DESTROYED
      return flag !== 0
    }

    /** True as soon as either destroy flag (DESTROYING or DESTROYED) is set. */
    get destroying () {
      const flags = this._duplexState & DESTROY_STATUS
      return flags !== 0
    }

    /**
     * Begin destroying the stream. Does nothing when a destroy flag is already set.
     * @param {Error} [err] - stored on both state objects; defaults to STREAM_DESTROYED
     */
    destroy (err) {
      if ((this._duplexState & DESTROY_STATUS) !== 0) return

      const reason = err || STREAM_DESTROYED
      this._duplexState = (this._duplexState | DESTROYING) & NON_PRIMARY

      const rs = this._readableState
      if (rs !== null) {
        rs.error = reason
        rs.updateNextTick()
      }

      const ws = this._writableState
      if (ws !== null) {
        ws.error = reason
        ws.updateNextTick()
      }

      this._predestroy()
    }

    /**
     * Same as EventEmitter#on, but listening for 'data', 'readable' or 'drain'
     * also sets the matching emit flag and schedules an update.
     */
    on (name, fn) {
      const rs = this._readableState
      if (rs !== null) {
        if (name === 'data') {
          // A 'data' listener also resumes the stream
          this._duplexState |= (READ_EMIT_DATA | READ_RESUMED)
          rs.updateNextTick()
        }
        if (name === 'readable') {
          this._duplexState |= READ_EMIT_READABLE
          rs.updateNextTick()
        }
      }

      const ws = this._writableState
      if (ws !== null && name === 'drain') {
        this._duplexState |= WRITE_EMIT_DRAIN
        ws.updateNextTick()
      }

      return super.on(name, fn)
    }
  }
  /**
   * Readable stream. Data is produced by `_read` (or `opts.read`) calling
   * `push`, and consumed through `read()`, 'data'/'readable' events, `pipe`
   * or async iteration.
   */
  class Readable extends Stream {
    /**
     * @param {object} [opts] - also forwarded to Stream and ReadableState
     * @param {Function} [opts.read] - replaces `_read`
     * @param {boolean} [opts.eagerOpen] - schedule an update right away
     */
    constructor (opts) {
      super(opts)
      // There is no write side here, so writing is flagged as already done
      this._duplexState |= OPENING | WRITE_DONE
      this._readableState = new ReadableState(this, opts)
      if (opts) {
        if (opts.read) this._read = opts.read
        // resume() schedules an update tick; pause() clears the resumed flag again right after
        if (opts.eagerOpen) this.resume().pause()
      }
    }

    /** Default read hook: produces nothing. */
    _read (cb) {
      cb(null)
    }

    /**
     * Pipe this stream into `dest`.
     * @param {object} dest - destination stream
     * @param {Function} [cb] - forwarded to ReadableState#pipe
     * @returns {object} `dest`
     */
    pipe (dest, cb) {
      this._readableState.pipe(dest, cb)
      this._readableState.updateNextTick()
      return dest
    }

    /** Schedule an update and return the next buffered chunk, or null when none is readable. */
    read () {
      this._readableState.updateNextTick()
      return this._readableState.read()
    }

    /**
     * Buffer a chunk; `null` flags the stream as ending.
     * @returns {boolean} true while the buffer is below the high water mark
     */
    push (data) {
      this._readableState.updateNextTick()
      return this._readableState.push(data)
    }

    /** Put a chunk back at the front of the buffer. */
    unshift (data) {
      this._readableState.updateNextTick()
      return this._readableState.unshift(data)
    }

    /** Set the resumed flag and schedule an update. Returns `this`. */
    resume () {
      this._duplexState |= READ_RESUMED
      this._readableState.updateNextTick()
      return this
    }

    /** Clear the resumed flag. Returns `this`. */
    pause () {
      this._duplexState &= READ_PAUSED
      return this
    }

    /**
     * Wrap an async iterator in a Readable.
     * @param {object} ite - async iterator (must have `next`, may have `return`)
     * @param {object} [opts] - extra Readable options
     * @returns {Readable}
     */
    static _fromAsyncIterator (ite, opts) {
      let destroy

      const rs = new Readable({
        ...opts,
        read (cb) {
          ite.next().then(push).then(cb.bind(null, null)).catch(cb)
        },
        predestroy () {
          // `return` is optional in the iterator protocol, so only call it when present.
          // Promise.resolve also copes with a `return` that answers synchronously.
          if (typeof ite.return === 'function') destroy = Promise.resolve(ite.return())
        },
        destroy (cb) {
          if (!destroy) return cb(null)
          destroy.then(cb.bind(null, null)).catch(cb)
        }
      })

      return rs

      function push (data) {
        if (data.done) rs.push(null)
        else rs.push(data.value)
      }
    }

    /**
     * Build a Readable from an existing stream, an async iterable, an array or
     * a single value.
     * - a readable stream of this library is returned as is
     * - an async iterable is consumed lazily
     * - an array yields its items; any other value yields itself as one item
     * - `undefined` yields nothing; `null` is pushed as the end marker, so it
     *   also produces an empty stream
     * @param {*} data
     * @param {object} [opts] - extra Readable options
     * @returns {Readable}
     */
    static from (data, opts) {
      // Property access on null/undefined would throw before the fallback below is reached
      if (data !== undefined && data !== null) {
        if (isReadStreamx(data)) return data
        if (data[asyncIterator]) return this._fromAsyncIterator(data[asyncIterator](), opts)
      }
      if (!Array.isArray(data)) data = data === undefined ? [] : [data]

      let i = 0
      return new Readable({
        ...opts,
        read (cb) {
          this.push(i === data.length ? null : data[i++])
          cb(null)
        }
      })
    }

    /** True when `rs` is ending, done, being destroyed, or has a full buffer. */
    static isBackpressured (rs) {
      return (rs._duplexState & READ_BACKPRESSURE_STATUS) !== 0 || rs._readableState.buffered >= rs._readableState.highWaterMark
    }

    /** True when the resumed flag of `rs` is not set. */
    static isPaused (rs) {
      return (rs._duplexState & READ_RESUMED) === 0
    }

    /**
     * Async iteration support (`for await (const chunk of stream)`).
     * Each `next()` settles with the next chunk, with `done: true` after the
     * stream ended, or rejects when the stream errored or closed early.
     */
    [asyncIterator] () {
      const stream = this

      let error = null
      // Settle functions of the pending next() promise, or null when none is pending
      let promiseResolve = null
      let promiseReject = null

      this.on('error', (err) => { error = err })
      this.on('readable', onreadable)
      this.on('close', onclose)

      return {
        [asyncIterator] () {
          return this
        },
        next () {
          return new Promise(function (resolve, reject) {
            promiseResolve = resolve
            promiseReject = reject
            const data = stream.read()
            if (data !== null) ondata(data)
            // Nothing more will arrive from a destroyed stream, so settle right away
            else if ((stream._duplexState & DESTROYED) !== 0) ondata(null)
          })
        },
        return () {
          return destroy(null)
        },
        throw (err) {
          return destroy(err)
        }
      }

      function onreadable () {
        if (promiseResolve !== null) ondata(stream.read())
      }

      function onclose () {
        if (promiseResolve !== null) ondata(null)
      }

      function ondata (data) {
        if (promiseReject === null) return
        if (error) promiseReject(error)
        // null without READ_DONE means the stream closed before it ended
        else if (data === null && (stream._duplexState & READ_DONE) === 0) promiseReject(STREAM_DESTROYED)
        else promiseResolve({ value: data, done: data === null })
        promiseReject = promiseResolve = null
      }

      function destroy (err) {
        stream.destroy(err)
        return new Promise((resolve, reject) => {
          if (stream._duplexState & DESTROYED) return resolve({ value: undefined, done: true })
          stream.once('close', function () {
            if (err) reject(err)
            else resolve({ value: undefined, done: true })
          })
        })
      }
    }
  }
  /**
   * Writable stream. Chunks passed to `write` are queued and handed to
   * `_write` (or `opts.write`); by default writes are batched into `_writev`.
   */
  class Writable extends Stream {
    /**
     * @param {object} [opts] - also forwarded to Stream and WritableState
     * @param {Function} [opts.writev] - replaces `_writev`
     * @param {Function} [opts.write] - replaces `_write`
     * @param {Function} [opts.final] - replaces `_final`
     */
    constructor (opts) {
      super(opts)

      // There is no read side here, so reading is flagged as already done
      this._duplexState |= OPENING | READ_DONE
      this._writableState = new WritableState(this, opts)

      if (!opts) return
      if (opts.writev) this._writev = opts.writev
      if (opts.write) this._write = opts.write
      if (opts.final) this._final = opts.final
    }

    /** Default batch hook: accepts the batch and completes immediately. */
    _writev (batch, cb) {
      cb(null)
    }

    /** Default write hook: batch this chunk with everything queued and pass it to `_writev`. */
    _write (data, cb) {
      const ws = this._writableState
      ws.autoBatch(data, cb)
    }

    /** Default final hook: completes immediately. */
    _final (cb) {
      cb(null)
    }

    /** True when `ws` is undrained, finishing, done or being destroyed. */
    static isBackpressured (ws) {
      const flags = ws._duplexState & WRITE_BACKPRESSURE_STATUS
      return flags !== 0
    }

    /**
     * Queue a chunk for writing.
     * @returns {boolean} false when the buffer has reached the high water mark
     */
    write (data) {
      const ws = this._writableState
      ws.updateNextTick()
      return ws.push(data)
    }

    /**
     * Finish the write side.
     * @param {*} [data] - a final chunk, or a function to run once on 'finish'
     * @returns {this}
     */
    end (data) {
      const ws = this._writableState
      ws.updateNextTick()
      ws.end(data)
      return this
    }
  }
  /**
   * Stream with both a read and a write side. Inherits the read side from
   * Readable and re-implements the Writable API on top of it.
   */
  class Duplex extends Readable { // and Writable
    /**
     * @param {object} [opts] - also forwarded to Readable and WritableState
     * @param {Function} [opts.writev] - replaces `_writev`
     * @param {Function} [opts.write] - replaces `_write`
     * @param {Function} [opts.final] - replaces `_final`
     */
    constructor (opts) {
      super(opts)

      // Plain assignment: replaces the flags set by the Readable constructor, including WRITE_DONE
      this._duplexState = OPENING
      this._writableState = new WritableState(this, opts)

      if (!opts) return
      if (opts.writev) this._writev = opts.writev
      if (opts.write) this._write = opts.write
      if (opts.final) this._final = opts.final
    }

    /** Default batch hook: accepts the batch and completes immediately. */
    _writev (batch, cb) {
      cb(null)
    }

    /** Default write hook: batch this chunk with everything queued and pass it to `_writev`. */
    _write (data, cb) {
      const ws = this._writableState
      ws.autoBatch(data, cb)
    }

    /** Default final hook: completes immediately. */
    _final (cb) {
      cb(null)
    }

    /**
     * Queue a chunk for writing.
     * @returns {boolean} false when the buffer has reached the high water mark
     */
    write (data) {
      const ws = this._writableState
      ws.updateNextTick()
      return ws.push(data)
    }

    /**
     * Finish the write side.
     * @param {*} [data] - a final chunk, or a function to run once on 'finish'
     * @returns {this}
     */
    end (data) {
      const ws = this._writableState
      ws.updateNextTick()
      ws.end(data)
      return this
    }
  }
  /**
   * Duplex whose readable output is produced from its writable input through
   * `_transform`. The default `_transform` passes chunks through unchanged.
   */
  class Transform extends Duplex {
    /**
     * @param {object} [opts] - also forwarded to Duplex
     * @param {Function} [opts.transform] - replaces `_transform`
     * @param {Function} [opts.flush] - replaces `_flush`
     */
    constructor (opts) {
      super(opts)
      this._transformState = new TransformState(this)

      if (!opts) return
      if (opts.transform) this._transform = opts.transform
      if (opts.flush) this._flush = opts.flush
    }

    /**
     * Transform the chunk right away, or park it while the read buffer is full.
     * The parked chunk is picked up by `_read`; completion always goes through
     * the state's `afterTransform`, so `cb` itself is not used here.
     */
    _write (data, cb) {
      const rs = this._readableState

      if (rs.buffered < rs.highWaterMark) {
        this._transform(data, this._transformState.afterTransform)
        return
      }

      this._transformState.data = data
    }

    /** Complete the read, then transform the parked chunk if there is one. */
    _read (cb) {
      const state = this._transformState
      const parked = state.data

      if (parked === null) {
        cb(null)
        return
      }

      state.data = null
      cb(null)
      this._transform(parked, state.afterTransform)
    }

    /** Default transform: identity. */
    _transform (data, cb) {
      cb(null, data)
    }

    /** Default flush hook: nothing left to emit. */
    _flush (cb) {
      cb(null)
    }

    /** Run `_flush` before finishing; `cb` is kept for transformAfterFlush to call. */
    _final (cb) {
      this._transformState.afterFinal = cb
      this._flush(transformAfterFlush.bind(this))
    }
  }

  /** Transform that relies on the inherited identity `_transform`. */
  class PassThrough extends Transform {}
  /**
   * Callback handed to `_flush`; `this` is the transform stream.
   * Pushes the optional trailing chunk, ends the read side, then completes `_final`.
   * @param {Error} [err] - passed straight to the final callback
   * @param {*} [data] - trailing chunk; pushed unless null or undefined
   */
  function transformAfterFlush (err, data) {
    const finish = this._transformState.afterFinal

    if (err) return finish(err)

    const hasTail = data !== null && data !== undefined
    if (hasTail) this.push(data)

    this.push(null)
    finish(null)
  }
  /**
   * Promise flavour of `pipeline`.
   * @param {...object} streams - the streams to connect, in order
   * @returns {Promise<void>} resolves when the pipeline completes, rejects with its error
   */
  function pipelinePromise (...streams) {
    return new Promise((resolve, reject) => {
      const settle = (err) => {
        if (err) reject(err)
        else resolve()
      }
      pipeline(...streams, settle)
    })
  }
  /**
   * Pipe a chain of streams into each other and destroy all of them when one fails.
   * @param {object|object[]} stream - first stream, or an array of streams
   * @param {...*} streams - remaining streams, optionally followed by a callback
   *   that is called with an error or null when the last stream closes
   * @returns {object} the last stream of the chain
   * @throws {Error} when fewer than two streams are given
   */
  function pipeline (stream, ...streams) {
    const all = Array.isArray(stream) ? [...stream, ...streams] : [stream, ...streams]
    const hasCallback = all.length && typeof all[all.length - 1] === 'function'
    const done = hasCallback ? all.pop() : null

    if (all.length < 2) throw new Error('Pipeline requires at least 2 streams')

    let error = null
    let dest = null
    let upstream = all[0]

    all.slice(1).forEach((next, idx) => {
      dest = next
      if (isStreamx(upstream)) {
        upstream.pipe(next, onerror)
      } else {
        // Foreign streams get manual error/close tracking; every one except the
        // very first is also checked on its write side
        errorHandle(upstream, true, idx > 0, onerror)
        upstream.pipe(next)
      }
      upstream = next
    })

    if (done) {
      let finished = false
      dest.on('finish', () => { finished = true })
      dest.on('error', (err) => { error = error || err })
      dest.on('close', () => {
        const fallback = finished ? null : PREMATURE_CLOSE
        done(error || fallback)
      })
    }

    return dest

    // Reports errors of `s`, and a premature close of its read (rd) / write (wr) side
    function errorHandle (s, rd, wr, onerror) {
      const unfinished = (side) => Boolean(side) && !side.ended

      s.on('error', onerror)
      s.on('close', () => {
        if ((rd && unfinished(s._readableState)) || (wr && unfinished(s._writableState))) {
          onerror(PREMATURE_CLOSE)
        }
      })
    }

    // Only the first error is kept; it tears down every stream in the chain
    function onerror (err) {
      if (!err || error) return
      error = err

      for (const s of all) {
        s.destroy(err)
      }
    }
  }
  /**
   * True when `stream` carries a readable or writable state object.
   * Returns false for null and undefined instead of throwing.
   * @param {*} stream
   * @returns {boolean}
   */
  function isStream (stream) {
    if (stream === null || stream === undefined) return false
    return !!stream._readableState || !!stream._writableState
  }

  /**
   * True when `stream` is a stream of this library, recognised by its numeric
   * `_duplexState`. Returns false for null and undefined instead of throwing.
   * @param {*} stream
   * @returns {boolean}
   */
  function isStreamx (stream) {
    // isStream runs first so that null/undefined never reach the property access
    return isStream(stream) && typeof stream._duplexState === 'number'
  }

  /**
   * Truthy when `stream` is a stream of this library with a read side.
   * For such a stream the result is the value of its `readable` property.
   * @param {*} stream
   */
  function isReadStreamx (stream) {
    return isStreamx(stream) && stream.readable
  }
  /**
   * Duck-type check: true for any non-null object with a numeric `byteLength`.
   * @param {*} data
   * @returns {boolean}
   */
  function isTypedArray (data) {
    if (data === null || typeof data !== 'object') return false
    return typeof data.byteLength === 'number'
  }
  /**
   * Default chunk size estimate: the chunk's own `byteLength` when it has a
   * numeric one, otherwise a flat 1024.
   * @param {*} data
   * @returns {number}
   */
  function defaultByteLength (data) {
    if (isTypedArray(data)) return data.byteLength
    return 1024
  }
  /** Does nothing; used as an 'error' listener so that piped streams do not crash on errors. */
  function noop () {
    // intentionally empty
  }
  /** 'abort' listener for `opts.signal`; `this` is the stream, which gets destroyed. */
  function abort () {
    const reason = new Error('Stream aborted.')
    this.destroy(reason)
  }
  794. module.exports = {
  795. pipeline,
  796. pipelinePromise,
  797. isStream,
  798. isStreamx,
  799. Stream,
  800. Writable,
  801. Readable,
  802. Duplex,
  803. Transform,
  804. // Export PassThrough for compatibility with Node.js core's stream module
  805. PassThrough
  806. }