index.js 1.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899
  1. var Readable = require('readable-stream/readable');
  2. var util = require('util');
  /**
   * Checks whether a value has the shape this module requires of an input
   * stream.
   *
   * @param {*} stream - Candidate value to inspect.
   * @returns {boolean} `true` only when `stream` has a `pipe` function, a truthy
   *   `readable` flag, a `_read` function and a truthy `_readableState`;
   *   `false` for anything else, including `null` and `undefined`.
   */
  function isReadable(stream) {
    // Property access on null/undefined would throw a TypeError; report
    // "not readable" instead so callers can raise their own error.
    if (stream === null || stream === undefined) {
      return false;
    }

    return typeof stream.pipe === 'function' &&
      Boolean(stream.readable) &&
      typeof stream._read === 'function' &&
      Boolean(stream._readableState);
  }
  /**
   * Registers one input stream with the ordered output stream bound as `this`.
   *
   * Chunks read from the stream at the head of `streams` are pushed straight to
   * the output; chunks from any other stream are parked in that stream's
   * `_buffer` until it reaches the head of the queue. Errors from the input
   * stream are re-emitted on the output.
   *
   * @this {OrderedStreams} Output stream; must provide `push` and `emit`.
   * @param {Array} streams - Queue of pending input streams in output order.
   *   Mutated: the stream is appended, and ended streams are removed from the
   *   front.
   * @param {Object} stream - Input stream to register. A `_buffer` array
   *   property is attached to it.
   * @throws {Error} When `stream` does not pass `isReadable`.
   */
  function addStream (streams, stream) {
    if (!isReadable(stream)) {
      throw new Error('All input streams must be readable');
    }

    var self = this;

    // Moves every chunk a stream parked while waiting its turn to the output.
    function flushBuffer(source) {
      while (source._buffer.length) {
        self.push(source._buffer.shift());
      }
    }

    stream._buffer = [];

    stream.on('readable', function () {
      var chunk = stream.read();
      // An empty read is signalled with null. Compare explicitly so that falsy
      // chunks (0, '', false) are forwarded instead of ending the loop early.
      while (chunk !== null && chunk !== undefined) {
        if (stream === streams[0]) {
          self.push(chunk);
        } else {
          stream._buffer.push(chunk);
        }
        chunk = stream.read();
      }
    });

    stream.on('end', function () {
      // Drop every finished stream from the front of the queue, emitting what
      // it buffered.
      // NOTE(review): `ended` comes from the stream's private `_readableState`;
      // confirm it cannot be true while that stream still has unread data.
      var head = streams[0];
      while (head && head._readableState.ended) {
        flushBuffer(head);
        streams.shift();
        head = streams[0];
      }

      if (head) {
        // The new head pushes directly from now on, so emit what it parked
        // first; otherwise its later chunks would overtake its earlier ones.
        flushBuffer(head);
      } else {
        // Nothing left to wait for: end the output.
        self.push(null);
      }
    });

    stream.on('error', this.emit.bind(this, 'error'));
    streams.push(stream);
  }
  /**
   * Readable stream that emits the data of several input streams in the order
   * the inputs were given. Always runs in object mode.
   *
   * May be called with or without `new`.
   *
   * @constructor
   * @param {Object|Array} [streams] - A single readable stream, or an array
   *   whose items are readable streams or arrays of readable streams (one
   *   level of nesting is expanded).
   * @param {Object} [options] - Options forwarded to `Readable`. The object is
   *   copied, not modified; `objectMode` is forced to `true` on the copy.
   * @throws {Error} Propagated from `addStream` when an input is not readable.
   */
  function OrderedStreams (streams, options) {
    if (!(this instanceof OrderedStreams)) {
      return new OrderedStreams(streams, options);
    }

    // Work on a copy so forcing objectMode does not alter the caller's object.
    var readableOptions = {};
    for (var key in options) {
      readableOptions[key] = options[key];
    }
    readableOptions.objectMode = true;
    Readable.call(this, readableOptions);

    var inputs = streams || [];
    if (!Array.isArray(inputs)) {
      inputs = [inputs];
    }

    // Queue shared by every registered input; addStream appends to it.
    var queue = [];
    var register = addStream.bind(this, queue);
    inputs.forEach(function (item) {
      if (Array.isArray(item)) {
        item.forEach(function (nested) {
          register(nested);
        });
      } else {
        register(item);
      }
    });

    // Nothing was registered (no input, or only empty nested arrays): no 'end'
    // handler will ever run, so close the output right away.
    if (!queue.length) {
      this.push(null);
    }
  }
  // Put Readable's prototype behind OrderedStreams so instances get the
  // stream methods (push, emit, ...) used by the code above.
  util.inherits(OrderedStreams, Readable);

  // Output is produced by push() calls made from the input streams' event
  // handlers, so the on-demand read hook deliberately does nothing.
  OrderedStreams.prototype._read = function () {};

  // The constructor is the module's only export.
  module.exports = OrderedStreams;