index.js 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207
  1. 'use strict';
  2. const Stream = require('readable-stream');
  3. const isStream = require('isstream');
  4. const util = require('util');
  5. // Inherit of Readable stream
  6. util.inherits(StreamQueue, Stream.Readable);
  7. // Constructor
  8. function StreamQueue(options) {
  9. const _this = this;
  10. options = options || {};
  11. // Ensure new were used
  12. if (!(_this instanceof StreamQueue)) {
  13. return new (StreamQueue.bind.apply(
  14. StreamQueue, // eslint-disable-line
  15. [StreamQueue].concat([].slice.call(arguments, 0))
  16. ))();
  17. }
  18. // Set queue state object
  19. _this._queueState = {
  20. _pauseFlowingStream: true,
  21. _resumeFlowingStream: true,
  22. _objectMode: false,
  23. _streams: [],
  24. _running: false,
  25. _ending: false,
  26. _awaitDrain: null,
  27. _internalStream: null,
  28. _curStream: null,
  29. };
  30. // Options
  31. if (!(isStream(options) || 'function' === typeof options)) {
  32. if ('boolean' == typeof options.pauseFlowingStream) {
  33. _this._queueState._pauseFlowingStream = options.pauseFlowingStream;
  34. delete options.pauseFlowingStream;
  35. }
  36. if ('boolean' == typeof options.resumeFlowingStream) {
  37. _this._queueState._resumeFlowingStream = options.resumeFlowingStream;
  38. delete options.resumeFlowingStream;
  39. }
  40. if ('boolean' == typeof options.objectMode) {
  41. _this._queueState._objectMode = options.objectMode;
  42. }
  43. }
  44. // Prepare the stream to pipe in
  45. this._queueState._internalStream = new Stream.Writable(
  46. isStream(options) || 'function' === typeof options ? {}.undef : options
  47. );
  48. this._queueState._internalStream._write = (chunk, encoding, cb) => {
  49. if (_this.push(chunk)) {
  50. cb();
  51. return true;
  52. }
  53. _this._queueState._awaitDrain = cb;
  54. return false;
  55. };
  56. // Parent constructor
  57. Stream.Readable.call(
  58. this,
  59. isStream(options) || 'function' === typeof options ? {}.undef : options
  60. );
  61. // Queue given streams and ends
  62. if (
  63. 1 < arguments.length ||
  64. isStream(options) ||
  65. 'function' === typeof options
  66. ) {
  67. _this.done.apply(
  68. this,
  69. [].slice.call(
  70. arguments,
  71. isStream(options) || 'function' === typeof options ? 0 : 1
  72. )
  73. );
  74. }
  75. }
  76. // Queue each stream given in argument
  77. StreamQueue.prototype.queue = function sqQueue() {
  78. let streams = [].slice.call(arguments, 0);
  79. const _this = this;
  80. if (_this._queueState._ending) {
  81. throw new Error('Cannot add more streams to the queue.');
  82. }
  83. streams = streams.map(stream => {
  84. function wrapper(stream) {
  85. stream.on('error', err => {
  86. _this.emit('error', err);
  87. });
  88. if ('undefined' == typeof stream._readableState) {
  89. stream = new Stream.Readable({
  90. objectMode: _this._queueState._objectMode,
  91. }).wrap(stream);
  92. }
  93. if (
  94. _this._queueState._pauseFlowingStream &&
  95. stream._readableState.flowing
  96. ) {
  97. stream.pause();
  98. }
  99. return stream;
  100. }
  101. if ('function' === typeof stream) {
  102. return () => wrapper(stream());
  103. }
  104. return wrapper(stream);
  105. });
  106. _this._queueState._streams = _this._queueState._streams.length
  107. ? _this._queueState._streams.concat(streams)
  108. : streams;
  109. if (!_this._queueState._running) {
  110. _this._pipeNextStream();
  111. }
  112. return _this;
  113. };
  114. // Pipe the next available stream
  115. StreamQueue.prototype._read = function sqRead() {
  116. if (this._queueState._awaitDrain) {
  117. this._queueState._awaitDrain();
  118. this._queueState._awaitDrain = null;
  119. this._queueState._internalStream.emit('drain');
  120. }
  121. };
  122. // Pipe the next available stream
  123. StreamQueue.prototype._pipeNextStream = function _sqPipe() {
  124. const _this = this;
  125. if (!_this._queueState._streams.length) {
  126. if (_this._queueState._ending) {
  127. _this.push(null);
  128. } else {
  129. _this._queueState._running = false;
  130. }
  131. return;
  132. }
  133. _this._queueState._running = true;
  134. if ('function' === typeof _this._queueState._streams[0]) {
  135. _this._queueState._curStream = _this._queueState._streams.shift()();
  136. } else {
  137. _this._queueState._curStream = _this._queueState._streams.shift();
  138. }
  139. _this._queueState._curStream.once('end', () => {
  140. _this._pipeNextStream();
  141. });
  142. if (
  143. _this._queueState._resumeFlowingStream &&
  144. _this._queueState._curStream._readableState.flowing
  145. ) {
  146. _this._queueState._curStream.resume();
  147. }
  148. _this._queueState._curStream.pipe(_this._queueState._internalStream, {
  149. end: false,
  150. });
  151. };
  152. // Queue each stream given in argument
  153. StreamQueue.prototype.done = function sqDone() {
  154. const _this = this;
  155. if (_this._queueState._ending) {
  156. throw new Error('streamqueue: The queue is already ending.');
  157. }
  158. if (arguments.length) {
  159. _this.queue.apply(_this, arguments);
  160. }
  161. _this._queueState._ending = true;
  162. if (!_this._queueState._running) {
  163. _this.push(null);
  164. }
  165. return this;
  166. };
  167. // Length
  168. Object.defineProperty(StreamQueue.prototype, 'length', {
  169. get() {
  170. return (
  171. this._queueState._streams.length + (this._queueState._running ? 1 : 0)
  172. );
  173. },
  174. });
  175. StreamQueue.obj = function streamQueueObj(options) {
  176. const firstArgumentIsAStream = !options || isStream(options);
  177. const streams = [].slice.call(arguments, firstArgumentIsAStream ? 0 : 1);
  178. options = firstArgumentIsAStream ? {} : options;
  179. options.objectMode = true;
  180. return StreamQueue.apply({}.undef, [options].concat(streams));
  181. };
  182. module.exports = StreamQueue;