const { Readable } = require('streamx') module.exports = function (s, forks = 2) { const streams = new Array(forks) const status = new Array(forks).fill(true) let ended = false for (let i = 0; i < forks; i++) { streams[i] = new Readable({ read (cb) { const check = !status[i] status[i] = true if (check && allReadable()) s.resume() cb(null) } }) } s.on('end', function () { ended = true for (const stream of streams) stream.push(null) }) s.on('error', function (err) { for (const stream of streams) stream.destroy(err) }) s.on('close', function () { if (ended) return for (const stream of streams) stream.destroy() }) s.on('data', function (data) { let needsPause = false for (let i = 0; i < streams.length; i++) { if (!(status[i] = streams[i].push(data))) { needsPause = true } } if (needsPause) s.pause() }) return streams function allReadable () { for (let j = 0; j < status.length; j++) { if (!status[j]) return false } return true } }