processAsyncTree.js 1.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162
  1. /*
  2. MIT License http://www.opensource.org/licenses/mit-license.php
  3. Author Tobias Koppers @sokra
  4. */
  5. "use strict";
  6. /**
  7. * @template T
  8. * @template {Error} E
  9. * @param {Iterable<T>} items initial items
  10. * @param {number} concurrency number of items running in parallel
  11. * @param {function(T, function(T): void, function(E=): void): void} processor worker which pushes more items
  12. * @param {function(E=): void} callback all items processed
  13. * @returns {void}
  14. */
  15. const processAsyncTree = (items, concurrency, processor, callback) => {
  16. const queue = Array.from(items);
  17. if (queue.length === 0) return callback();
  18. let processing = 0;
  19. let finished = false;
  20. let processScheduled = true;
  21. const push = item => {
  22. queue.push(item);
  23. if (!processScheduled && processing < concurrency) {
  24. processScheduled = true;
  25. process.nextTick(processQueue);
  26. }
  27. };
  28. const processorCallback = err => {
  29. processing--;
  30. if (err && !finished) {
  31. finished = true;
  32. callback(err);
  33. return;
  34. }
  35. if (!processScheduled) {
  36. processScheduled = true;
  37. process.nextTick(processQueue);
  38. }
  39. };
  40. const processQueue = () => {
  41. if (finished) return;
  42. while (processing < concurrency && queue.length > 0) {
  43. processing++;
  44. const item = queue.pop();
  45. processor(item, push, processorCallback);
  46. }
  47. processScheduled = false;
  48. if (queue.length === 0 && processing === 0 && !finished) {
  49. finished = true;
  50. callback();
  51. }
  52. };
  53. processQueue();
  54. };
  55. module.exports = processAsyncTree;