stream-api.js 2.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798
  1. 'use strict';
  2. var stream = require('readable-stream');
  3. var util = require('util');
  4. var Readable = stream.Readable;
  5. module.exports = ReaddirpReadable;
  6. util.inherits(ReaddirpReadable, Readable);
  7. function ReaddirpReadable (opts) {
  8. if (!(this instanceof ReaddirpReadable)) return new ReaddirpReadable(opts);
  9. opts = opts || {};
  10. opts.objectMode = true;
  11. Readable.call(this, opts);
  12. // backpressure not implemented at this point
  13. this.highWaterMark = Infinity;
  14. this._destroyed = false;
  15. this._paused = false;
  16. this._warnings = [];
  17. this._errors = [];
  18. this._pauseResumeErrors();
  19. }
  20. var proto = ReaddirpReadable.prototype;
  21. proto._pauseResumeErrors = function () {
  22. var self = this;
  23. self.on('pause', function () { self._paused = true });
  24. self.on('resume', function () {
  25. if (self._destroyed) return;
  26. self._paused = false;
  27. self._warnings.forEach(function (err) { self.emit('warn', err) });
  28. self._warnings.length = 0;
  29. self._errors.forEach(function (err) { self.emit('error', err) });
  30. self._errors.length = 0;
  31. })
  32. }
  33. // called for each entry
  34. proto._processEntry = function (entry) {
  35. if (this._destroyed) return;
  36. this.push(entry);
  37. }
  38. proto._read = function () { }
  39. proto.destroy = function () {
  40. // when stream is destroyed it will emit nothing further, not even errors or warnings
  41. this.push(null);
  42. this.readable = false;
  43. this._destroyed = true;
  44. this.emit('close');
  45. }
  46. proto._done = function () {
  47. this.push(null);
  48. }
  49. // we emit errors and warnings async since we may handle errors like invalid args
  50. // within the initial event loop before any event listeners subscribed
  51. proto._handleError = function (err) {
  52. var self = this;
  53. setImmediate(function () {
  54. if (self._paused) return self._warnings.push(err);
  55. if (!self._destroyed) self.emit('warn', err);
  56. });
  57. }
  58. proto._handleFatalError = function (err) {
  59. var self = this;
  60. setImmediate(function () {
  61. if (self._paused) return self._errors.push(err);
  62. if (!self._destroyed) self.emit('error', err);
  63. });
  64. }
  65. function createStreamAPI () {
  66. var stream = new ReaddirpReadable();
  67. return {
  68. stream : stream
  69. , processEntry : stream._processEntry.bind(stream)
  70. , done : stream._done.bind(stream)
  71. , handleError : stream._handleError.bind(stream)
  72. , handleFatalError : stream._handleFatalError.bind(stream)
  73. };
  74. }
  75. module.exports = createStreamAPI;