12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152 |
- const { Readable } = require('streamx')
- module.exports = function (s, forks = 2) {
- const streams = new Array(forks)
- const status = new Array(forks).fill(true)
- let ended = false
- for (let i = 0; i < forks; i++) {
- streams[i] = new Readable({
- read (cb) {
- const check = !status[i]
- status[i] = true
- if (check && allReadable()) s.resume()
- cb(null)
- }
- })
- }
- s.on('end', function () {
- ended = true
- for (const stream of streams) stream.push(null)
- })
- s.on('error', function (err) {
- for (const stream of streams) stream.destroy(err)
- })
- s.on('close', function () {
- if (ended) return
- for (const stream of streams) stream.destroy()
- })
- s.on('data', function (data) {
- let needsPause = false
- for (let i = 0; i < streams.length; i++) {
- if (!(status[i] = streams[i].push(data))) {
- needsPause = true
- }
- }
- if (needsPause) s.pause()
- })
- return streams
- function allReadable () {
- for (let j = 0; j < status.length; j++) {
- if (!status[j]) return false
- }
- return true
- }
- }
|