123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283 |
- 'use strict'
- /* eslint-disable no-var */
- var reusify = require('reusify')
- function fastqueue (context, worker, concurrency) {
- if (typeof context === 'function') {
- concurrency = worker
- worker = context
- context = null
- }
- if (concurrency < 1) {
- throw new Error('fastqueue concurrency must be greater than 1')
- }
- var cache = reusify(Task)
- var queueHead = null
- var queueTail = null
- var _running = 0
- var errorHandler = null
- var self = {
- push: push,
- drain: noop,
- saturated: noop,
- pause: pause,
- paused: false,
- concurrency: concurrency,
- running: running,
- resume: resume,
- idle: idle,
- length: length,
- getQueue: getQueue,
- unshift: unshift,
- empty: noop,
- kill: kill,
- killAndDrain: killAndDrain,
- error: error
- }
- return self
- function running () {
- return _running
- }
- function pause () {
- self.paused = true
- }
- function length () {
- var current = queueHead
- var counter = 0
- while (current) {
- current = current.next
- counter++
- }
- return counter
- }
- function getQueue () {
- var current = queueHead
- var tasks = []
- while (current) {
- tasks.push(current.value)
- current = current.next
- }
- return tasks
- }
- function resume () {
- if (!self.paused) return
- self.paused = false
- for (var i = 0; i < self.concurrency; i++) {
- _running++
- release()
- }
- }
- function idle () {
- return _running === 0 && self.length() === 0
- }
- function push (value, done) {
- var current = cache.get()
- current.context = context
- current.release = release
- current.value = value
- current.callback = done || noop
- current.errorHandler = errorHandler
- if (_running === self.concurrency || self.paused) {
- if (queueTail) {
- queueTail.next = current
- queueTail = current
- } else {
- queueHead = current
- queueTail = current
- self.saturated()
- }
- } else {
- _running++
- worker.call(context, current.value, current.worked)
- }
- }
- function unshift (value, done) {
- var current = cache.get()
- current.context = context
- current.release = release
- current.value = value
- current.callback = done || noop
- if (_running === self.concurrency || self.paused) {
- if (queueHead) {
- current.next = queueHead
- queueHead = current
- } else {
- queueHead = current
- queueTail = current
- self.saturated()
- }
- } else {
- _running++
- worker.call(context, current.value, current.worked)
- }
- }
- function release (holder) {
- if (holder) {
- cache.release(holder)
- }
- var next = queueHead
- if (next) {
- if (!self.paused) {
- if (queueTail === queueHead) {
- queueTail = null
- }
- queueHead = next.next
- next.next = null
- worker.call(context, next.value, next.worked)
- if (queueTail === null) {
- self.empty()
- }
- } else {
- _running--
- }
- } else if (--_running === 0) {
- self.drain()
- }
- }
- function kill () {
- queueHead = null
- queueTail = null
- self.drain = noop
- }
- function killAndDrain () {
- queueHead = null
- queueTail = null
- self.drain()
- self.drain = noop
- }
- function error (handler) {
- errorHandler = handler
- }
- }
- function noop () {}
- function Task () {
- this.value = null
- this.callback = noop
- this.next = null
- this.release = noop
- this.context = null
- this.errorHandler = null
- var self = this
- this.worked = function worked (err, result) {
- var callback = self.callback
- var errorHandler = self.errorHandler
- var val = self.value
- self.value = null
- self.callback = noop
- if (self.errorHandler) {
- errorHandler(err, val)
- }
- callback.call(self.context, err, result)
- self.release(self)
- }
- }
- function queueAsPromised (context, worker, concurrency) {
- if (typeof context === 'function') {
- concurrency = worker
- worker = context
- context = null
- }
- function asyncWrapper (arg, cb) {
- worker.call(this, arg)
- .then(function (res) {
- cb(null, res)
- }, cb)
- }
- var queue = fastqueue(context, asyncWrapper, concurrency)
- var pushCb = queue.push
- var unshiftCb = queue.unshift
- queue.push = push
- queue.unshift = unshift
- queue.drained = drained
- return queue
- function push (value) {
- var p = new Promise(function (resolve, reject) {
- pushCb(value, function (err, result) {
- if (err) {
- reject(err)
- return
- }
- resolve(result)
- })
- })
- // Let's fork the promise chain to
- // make the error bubble up to the user but
- // not lead to a unhandledRejection
- p.catch(noop)
- return p
- }
- function unshift (value) {
- var p = new Promise(function (resolve, reject) {
- unshiftCb(value, function (err, result) {
- if (err) {
- reject(err)
- return
- }
- resolve(result)
- })
- })
- // Let's fork the promise chain to
- // make the error bubble up to the user but
- // not lead to a unhandledRejection
- p.catch(noop)
- return p
- }
- function drained () {
- var previousDrain = queue.drain
- var p = new Promise(function (resolve) {
- queue.drain = function () {
- previousDrain()
- resolve()
- }
- })
- return p
- }
- }
- module.exports = fastqueue
- module.exports.promise = queueAsPromised
|