index.js 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173
  1. /**
  2. * Module dependencies.
  3. */
  4. try {
  5. var EventEmitter = require('events').EventEmitter;
  6. if (!EventEmitter) throw new Error();
  7. } catch (err) {
  8. var Emitter = require('emitter');
  9. }
  /**
   * Schedule `fn` to run asynchronously.
   *
   * Uses `process.nextTick` when a usable `process` global exists,
   * otherwise falls back to a zero-delay `setTimeout`.
   */

  var defer;
  if (typeof process !== 'undefined' && process && typeof process.nextTick === 'function') {
    defer = process.nextTick;
  } else {
    defer = function(fn){ setTimeout(fn); };
  }
  /**
   * No-op placeholder used when the caller supplies no callback.
   */

  function noop() {
    // intentionally empty
  }
  20. /**
  21. * Expose `Batch`.
  22. */
  23. module.exports = Batch;
  /**
   * Create a new Batch.
   *
   * Works with or without `new`. Every argument is queued as a task
   * via `push()`. Defaults: unlimited concurrency (`Infinity`) and
   * fail-fast error handling (`throws(true)`).
   *
   * @param {...Function} fns tasks to queue immediately
   * @return {Batch}
   * @api public
   */

  function Batch() {
    // Called as a plain function: build the instance ourselves and forward
    // the arguments, so `Batch(a, b)` queues the same tasks as `new Batch(a, b)`.
    if (!(this instanceof Batch)) {
      var batch = Object.create(Batch.prototype);
      Batch.apply(batch, arguments);
      return batch;
    }

    this.fns = [];
    this.concurrency(Infinity);
    this.throws(true);

    for (var i = 0, len = arguments.length; i < len; ++i) {
      this.push(arguments[i]);
    }
  }
  /**
   * Give `Batch` event-emitter behaviour.
   *
   * When `EventEmitter` was loaded, chain `Batch.prototype` to
   * `EventEmitter.prototype`; otherwise apply the `Emitter` mixin
   * to `Batch.prototype`.
   */

  if (EventEmitter) {
    if (typeof Object.setPrototypeOf === 'function') {
      // Preferred: the `__proto__` accessor is legacy and may be absent
      // (for example when Node runs with `--disable-proto=delete`).
      Object.setPrototypeOf(Batch.prototype, EventEmitter.prototype);
    } else {
      // Older runtimes without Object.setPrototypeOf.
      Batch.prototype.__proto__ = EventEmitter.prototype;
    }
  } else {
    Emitter(Batch.prototype);
  }
  /**
   * Limit how many queued functions `end()` starts at once.
   *
   * Stores `n` on the batch as `this.n`.
   *
   * @param {Number} n
   * @return {Batch} this batch, for chaining
   * @api public
   */

  Batch.prototype.concurrency = function(n){
    var batch = this;
    batch.n = n;
    return batch;
  };
  /**
   * Append `fn` to the list of queued functions.
   *
   * @param {Function} fn
   * @return {Batch} this batch, for chaining
   * @api public
   */

  Batch.prototype.push = function(fn){
    var queue = this.fns;
    queue.push(fn);
    return this;
  };
  /**
   * Set whether the batch stops at the first error.
   *
   * The value is coerced to a boolean and stored as `this.e`.
   *
   * @param {Boolean} throws
   * @return {Batch} this batch, for chaining
   * @api public
   */

  Batch.prototype.throws = function(throws) {
    this.e = throws ? true : false;
    return this;
  };
  77. /**
  78. * Execute all queued functions in parallel,
  79. * executing `cb(err, results)`.
  80. *
  81. * @param {Function} cb
  82. * @return {Batch}
  83. * @api public
  84. */
  85. Batch.prototype.end = function(cb){
  86. var self = this
  87. , total = this.fns.length
  88. , pending = total
  89. , results = []
  90. , errors = []
  91. , cb = cb || noop
  92. , fns = this.fns
  93. , max = this.n
  94. , throws = this.e
  95. , index = 0
  96. , done;
  97. // empty
  98. if (!fns.length) return defer(function(){
  99. cb(null, results);
  100. });
  101. // process
  102. function next() {
  103. var i = index++;
  104. var fn = fns[i];
  105. if (!fn) return;
  106. var start = new Date;
  107. try {
  108. fn(callback);
  109. } catch (err) {
  110. callback(err);
  111. }
  112. function callback(err, res){
  113. if (done) return;
  114. if (err && throws) return done = true, defer(function(){
  115. cb(err);
  116. });
  117. var complete = total - pending + 1;
  118. var end = new Date;
  119. results[i] = res;
  120. errors[i] = err;
  121. self.emit('progress', {
  122. index: i,
  123. value: res,
  124. error: err,
  125. pending: pending,
  126. total: total,
  127. complete: complete,
  128. percent: complete / total * 100 | 0,
  129. start: start,
  130. end: end,
  131. duration: end - start
  132. });
  133. if (--pending) next();
  134. else defer(function(){
  135. if(!throws) cb(errors, results);
  136. else cb(null, results);
  137. });
  138. }
  139. }
  140. // concurrency
  141. for (var i = 0; i < fns.length; i++) {
  142. if (i == max) break;
  143. next();
  144. }
  145. return this;
  146. };