index.js 1.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152
  1. const { Readable } = require('streamx')
  2. module.exports = function (s, forks = 2) {
  3. const streams = new Array(forks)
  4. const status = new Array(forks).fill(true)
  5. let ended = false
  6. for (let i = 0; i < forks; i++) {
  7. streams[i] = new Readable({
  8. read (cb) {
  9. const check = !status[i]
  10. status[i] = true
  11. if (check && allReadable()) s.resume()
  12. cb(null)
  13. }
  14. })
  15. }
  16. s.on('end', function () {
  17. ended = true
  18. for (const stream of streams) stream.push(null)
  19. })
  20. s.on('error', function (err) {
  21. for (const stream of streams) stream.destroy(err)
  22. })
  23. s.on('close', function () {
  24. if (ended) return
  25. for (const stream of streams) stream.destroy()
  26. })
  27. s.on('data', function (data) {
  28. let needsPause = false
  29. for (let i = 0; i < streams.length; i++) {
  30. if (!(status[i] = streams[i].push(data))) {
  31. needsPause = true
  32. }
  33. }
  34. if (needsPause) s.pause()
  35. })
  36. return streams
  37. function allReadable () {
  38. for (let j = 0; j < status.length; j++) {
  39. if (!status[j]) return false
  40. }
  41. return true
  42. }
  43. }