// AsyncQueue.js 9.4 KB
  /*
    MIT License http://www.opensource.org/licenses/mit-license.php
    Author Tobias Koppers @sokra
  */
  "use strict";

  const { SyncHook, AsyncSeriesHook } = require("tapable");
  const { makeWebpackError } = require("../HookWebpackError");
  const WebpackError = require("../WebpackError");
  const ArrayQueue = require("./ArrayQueue");
  // Lifecycle states of a queue entry, stored in its `state` field.
  const QUEUED_STATE = 0; // waiting in the queue, processing has not started
  const PROCESSING_STATE = 1; // dequeued and handed to the processor
  const DONE_STATE = 2; // a result or an error has been recorded

  // Nesting depth of synchronous callback invocations. Once it exceeds a
  // small threshold, callbacks are deferred with process.nextTick instead
  // of being called inline.
  let inHandleResult = 0;
  /**
   * @template T
   * @callback Callback
   * @param {(WebpackError | null)=} err
   * @param {T=} result
   */
  /**
   * Bookkeeping record for a single item handled by an AsyncQueue: the item
   * itself, its lifecycle state, the callbacks waiting for it and, once
   * finished, the recorded result or error.
   * @template T
   * @template K
   * @template R
   */
  class AsyncQueueEntry {
    /**
     * @param {T} item the item
     * @param {Callback<R>} callback the callback
     */
    constructor(item, callback) {
      this.item = item;
      /** @type {typeof QUEUED_STATE | typeof PROCESSING_STATE | typeof DONE_STATE} */
      this.state = QUEUED_STATE;
      // Callback passed when the entry was created.
      this.callback = callback;
      // Further callbacks registered while the entry was not yet done;
      // stays undefined until the first one is registered.
      /** @type {Callback<R>[] | undefined} */
      this.callbacks = undefined;
      /** @type {R | undefined} */
      this.result = undefined;
      /** @type {WebpackError | undefined} */
      this.error = undefined;
    }
  }
  /**
   * Queue that runs an asynchronous processor over items with a bounded
   * number of concurrently active tasks. Items are de-duplicated by key:
   * adding an item whose key is already known attaches the callback to the
   * existing entry (or replays its stored result) instead of processing it
   * again. Queues created with a `parent` share the root queue's task
   * counter and parallelism; the root's own items are started first.
   * @template T
   * @template K
   * @template R
   */
  class AsyncQueue {
    /**
     * @param {Object} options options object
     * @param {string=} options.name name of the queue
     * @param {number=} options.parallelism how many items should be processed at once
     * @param {AsyncQueue<any, any, any>=} options.parent parent queue, which will have priority over this queue and with shared parallelism
     * @param {function(T): K=} options.getKey extract key from item
     * @param {function(T, Callback<R>): void} options.processor async function to process items
     */
    constructor({ name, parallelism, parent, processor, getKey }) {
      this._name = name;
      this._parallelism = parallelism || 1;
      this._processor = processor;
      // Without a getKey function the item itself is used as the key.
      this._getKey =
        getKey || /** @type {(T) => K} */ (item => /** @type {any} */ (item));
      /** @type {Map<K, AsyncQueueEntry<T, K, R>>} */
      this._entries = new Map();
      /** @type {ArrayQueue<AsyncQueueEntry<T, K, R>>} */
      this._queued = new ArrayQueue();
      /** @type {AsyncQueue<any, any, any>[] | undefined} */
      this._children = undefined;
      this._activeTasks = 0;
      this._willEnsureProcessing = false;
      this._needProcessing = false;
      this._stopped = false;
      // Scheduling state (_activeTasks, _parallelism, _needProcessing,
      // _willEnsureProcessing) is always read and written on the root.
      this._root = parent ? parent._root : this;
      if (parent) {
        if (this._root._children === undefined) {
          this._root._children = [this];
        } else {
          this._root._children.push(this);
        }
      }

      this.hooks = {
        /** @type {AsyncSeriesHook<[T]>} */
        beforeAdd: new AsyncSeriesHook(["item"]),
        /** @type {SyncHook<[T]>} */
        added: new SyncHook(["item"]),
        /** @type {AsyncSeriesHook<[T]>} */
        beforeStart: new AsyncSeriesHook(["item"]),
        /** @type {SyncHook<[T]>} */
        started: new SyncHook(["item"]),
        /** @type {SyncHook<[T, Error, R]>} */
        result: new SyncHook(["item", "error", "result"])
      };

      // Bound once so it can be handed to setImmediate directly.
      this._ensureProcessing = this._ensureProcessing.bind(this);
    }

    /**
     * Adds an item to the queue. If an entry with the same key already
     * exists, the callback is attached to it (or called with the stored
     * outcome when the entry is done) and the item is not processed again.
     * @param {T} item an item
     * @param {Callback<R>} callback callback function
     * @returns {void}
     */
    add(item, callback) {
      if (this._stopped) return callback(new WebpackError("Queue was stopped"));
      this.hooks.beforeAdd.callAsync(item, err => {
        if (err) {
          callback(
            makeWebpackError(err, `AsyncQueue(${this._name}).hooks.beforeAdd`)
          );
          return;
        }
        const key = this._getKey(item);
        const entry = this._entries.get(key);
        if (entry !== undefined) {
          if (entry.state === DONE_STATE) {
            // Replay the stored outcome. Deep synchronous nesting is broken
            // up with nextTick; the counter is restored even when the
            // callback throws.
            try {
              if (inHandleResult++ > 3) {
                process.nextTick(() => callback(entry.error, entry.result));
              } else {
                callback(entry.error, entry.result);
              }
            } finally {
              inHandleResult--;
            }
          } else if (entry.callbacks === undefined) {
            entry.callbacks = [callback];
          } else {
            entry.callbacks.push(callback);
          }
          return;
        }
        const newEntry = new AsyncQueueEntry(item, callback);
        if (this._stopped) {
          // The queue was stopped while the beforeAdd hook was running.
          // _handleResult decrements the task counter, so bump it first.
          this.hooks.added.call(item);
          this._root._activeTasks++;
          process.nextTick(() =>
            this._handleResult(newEntry, new WebpackError("Queue was stopped"))
          );
        } else {
          this._entries.set(key, newEntry);
          this._queued.enqueue(newEntry);
          const root = this._root;
          root._needProcessing = true;
          if (root._willEnsureProcessing === false) {
            root._willEnsureProcessing = true;
            setImmediate(root._ensureProcessing);
          }
          this.hooks.added.call(item);
        }
      });
    }

    /**
     * Forgets the entry of an item so that a later `add` creates a fresh
     * entry. An entry that is still queued is also removed from the pending
     * queue. Items without an entry are ignored.
     * @param {T} item an item
     * @returns {void}
     */
    invalidate(item) {
      const key = this._getKey(item);
      const entry = this._entries.get(key);
      // Nothing to do for items that were never added (or already removed).
      if (entry === undefined) return;
      this._entries.delete(key);
      if (entry.state === QUEUED_STATE) {
        this._queued.delete(entry);
      }
    }

    /**
     * Waits for an already started item
     * @param {T} item an item
     * @param {Callback<R>} callback callback function
     * @returns {void}
     */
    waitFor(item, callback) {
      const key = this._getKey(item);
      const entry = this._entries.get(key);
      if (entry === undefined) {
        return callback(
          new WebpackError(
            "waitFor can only be called for an already started item"
          )
        );
      }
      if (entry.state === DONE_STATE) {
        process.nextTick(() => callback(entry.error, entry.result));
      } else if (entry.callbacks === undefined) {
        entry.callbacks = [callback];
      } else {
        entry.callbacks.push(callback);
      }
    }

    /**
     * Stops the queue: every entry still waiting in this queue is removed
     * and completed with a "Queue was stopped" error, and later `add` calls
     * fail with the same error.
     * @returns {void}
     */
    stop() {
      this._stopped = true;
      const queue = this._queued;
      this._queued = new ArrayQueue();
      const root = this._root;
      for (const entry of queue) {
        this._entries.delete(this._getKey(entry.item));
        // _handleResult decrements the counter, so compensate up front.
        root._activeTasks++;
        this._handleResult(entry, new WebpackError("Queue was stopped"));
      }
    }

    /**
     * Raises the root queue's parallelism by one and schedules a processing
     * pass when work is pending and none is scheduled yet.
     * @returns {void}
     */
    increaseParallelism() {
      const root = this._root;
      root._parallelism++;
      /* istanbul ignore next */
      if (root._willEnsureProcessing === false && root._needProcessing) {
        root._willEnsureProcessing = true;
        setImmediate(root._ensureProcessing);
      }
    }

    /**
     * Lowers the root queue's parallelism by one.
     * @returns {void}
     */
    decreaseParallelism() {
      const root = this._root;
      root._parallelism--;
    }

    /**
     * @param {T} item an item
     * @returns {boolean} true, if the item is currently being processed
     */
    isProcessing(item) {
      const key = this._getKey(item);
      const entry = this._entries.get(key);
      return entry !== undefined && entry.state === PROCESSING_STATE;
    }

    /**
     * @param {T} item an item
     * @returns {boolean} true, if the item is currently queued
     */
    isQueued(item) {
      const key = this._getKey(item);
      const entry = this._entries.get(key);
      return entry !== undefined && entry.state === QUEUED_STATE;
    }

    /**
     * @param {T} item an item
     * @returns {boolean} true, if the item has finished processing
     */
    isDone(item) {
      const key = this._getKey(item);
      const entry = this._entries.get(key);
      return entry !== undefined && entry.state === DONE_STATE;
    }

    /**
     * Starts queued entries until the parallelism limit is reached. This
     * queue's own entries are started first, then those of its children in
     * registration order. It is scheduled on the root queue.
     * @returns {void}
     */
    _ensureProcessing() {
      while (this._activeTasks < this._parallelism) {
        const entry = this._queued.dequeue();
        if (entry === undefined) break;
        this._activeTasks++;
        entry.state = PROCESSING_STATE;
        this._startProcessing(entry);
      }
      this._willEnsureProcessing = false;
      // Own entries remain: keep _needProcessing set so that a finishing
      // task schedules the next pass.
      if (this._queued.length > 0) return;
      if (this._children !== undefined) {
        for (const child of this._children) {
          while (this._activeTasks < this._parallelism) {
            const entry = child._queued.dequeue();
            if (entry === undefined) break;
            this._activeTasks++;
            entry.state = PROCESSING_STATE;
            child._startProcessing(entry);
          }
          if (child._queued.length > 0) return;
        }
      }
      // Everything was started; only clear the flag if no new pass was
      // scheduled while entries were being started.
      if (!this._willEnsureProcessing) this._needProcessing = false;
    }

    /**
     * Runs the beforeStart hook and then the processor for an entry.
     * @param {AsyncQueueEntry<T, K, R>} entry the entry
     * @returns {void}
     */
    _startProcessing(entry) {
      this.hooks.beforeStart.callAsync(entry.item, err => {
        if (err) {
          this._handleResult(
            entry,
            makeWebpackError(err, `AsyncQueue(${this._name}).hooks.beforeStart`)
          );
          return;
        }
        let inCallback = false;
        try {
          this._processor(entry.item, (e, r) => {
            inCallback = true;
            this._handleResult(entry, e, r);
          });
        } catch (err) {
          // An exception raised after the processor already reported its
          // outcome is not a processing failure, so it is rethrown as is.
          if (inCallback) throw err;
          this._handleResult(entry, err, null);
        }
        this.hooks.started.call(entry.item);
      });
    }

    /**
     * Records the outcome of an entry, releases its task slot and notifies
     * every callback registered for it.
     * @param {AsyncQueueEntry<T, K, R>} entry the entry
     * @param {WebpackError=} err error, if any
     * @param {R=} result result, if any
     * @returns {void}
     */
    _handleResult(entry, err, result) {
      this.hooks.result.callAsync(entry.item, err, result, hookError => {
        // An error reported by the result hook replaces the processing error.
        const error = hookError
          ? makeWebpackError(hookError, `AsyncQueue(${this._name}).hooks.result`)
          : err;

        const callback = entry.callback;
        const callbacks = entry.callbacks;
        entry.state = DONE_STATE;
        entry.callback = undefined;
        entry.callbacks = undefined;
        entry.result = result;
        entry.error = error;

        const root = this._root;
        root._activeTasks--;
        if (root._willEnsureProcessing === false && root._needProcessing) {
          root._willEnsureProcessing = true;
          setImmediate(root._ensureProcessing);
        }

        const notify = () => {
          callback(error, result);
          if (callbacks !== undefined) {
            for (const waiting of callbacks) {
              waiting(error, result);
            }
          }
        };

        // Deep synchronous nesting is broken up with nextTick; the counter
        // is restored even when one of the callbacks throws.
        try {
          if (inHandleResult++ > 3) {
            process.nextTick(notify);
          } else {
            notify();
          }
        } finally {
          inHandleResult--;
        }
      });
    }

    /**
     * Drops all entries and resets this queue's scheduling flags, task
     * counter and stopped state.
     * @returns {void}
     */
    clear() {
      this._entries.clear();
      this._queued.clear();
      this._activeTasks = 0;
      this._willEnsureProcessing = false;
      this._needProcessing = false;
      this._stopped = false;
    }
  }
  // Public export of this module: the AsyncQueue class.
  module.exports = AsyncQueue;