queue

Asynchronous function queue with adjustable concurrency

README

  1. ```
  2.    ____  __  _____  __  _____
  3.   / __ `/ / / / _ \/ / / / _ \
  4. / /_/ / /_/ /  __/ /_/ /  __/
  5. \__, /\__,_/\___/\__,_/\___/
  6.    /_/

  7. ```

Asynchronous function queue with adjustable concurrency.

This module exports a class Queuethat implements most of the ArrayAPI. Pass async functions (ones that accept a callback or return a promise) to an instance's additive array methods. Processing begins when you call q.start().

Example


Do npm run exampleor npm run devand open the example directory (and your console) to run the following program:

  1. ``` js
  2. import Queue from 'queue'

  3. const q = new Queue({ results: [] })

  4. // add jobs using the familiar Array API
  5. q.push(cb => {
  6.   const result = 'two'
  7.   cb(null, result)
  8. })

  9. q.push(
  10.   cb => {
  11.     const result = 'four'
  12.     cb(null, result)
  13.   },
  14.   cb => {
  15.     const result = 'five'
  16.     cb(null, result)
  17.   }
  18. )

  19. // jobs can accept a callback or return a promise
  20. q.push(() => {
  21.   return new Promise((resolve, reject) => {
  22.     const result = 'one'
  23.     resolve(result)
  24.   })
  25. })

  26. q.unshift(cb => {
  27.   const result = 'one'
  28.   cb(null, result)
  29. })

  30. q.splice(2, 0, cb => {
  31.   const result = 'three'
  32.   cb(null, result)
  33. })

  34. // use the timeout feature to deal with jobs that
  35. // take too long or forget to execute a callback
  36. q.timeout = 100

  37. q.addEventListener('timeout', e => {
  38.   console.log('job timed out:', e.detail.job.toString().replace(/\n/g, ''))
  39.   e.detail.next()
  40. })

  41. q.push(cb => {
  42.   setTimeout(() => {
  43.     console.log('slow job finished')
  44.     cb()
  45.   }, 200)
  46. })

  47. q.push(cb => {
  48.   console.log('forgot to execute callback')
  49. })

  50. // jobs can also override the queue's timeout
  51. // on a per-job basis
  52. function extraSlowJob (cb) {
  53.   setTimeout(() => {
  54.     console.log('extra slow job finished')
  55.     cb()
  56.   }, 400)
  57. }

  58. extraSlowJob.timeout = 500
  59. q.push(extraSlowJob)

  60. // jobs can also opt-out of the timeout altogether
  61. function superSlowJob (cb) {
  62.   setTimeout(() => {
  63.     console.log('super slow job finished')
  64.     cb()
  65.   }, 1000)
  66. }

  67. superSlowJob.timeout = null
  68. q.push(superSlowJob)

  69. // get notified when jobs complete
  70. q.addEventListener('success', e => {
  71.   console.log('job finished processing:', e.detail.toString().replace(/\n/g, ''))
  72.   console.log('The result is:', e.detail.result)
  73. })

  74. // begin processing, get notified on end / failure
  75. q.start(err => {
  76.   if (err) throw err
  77.   console.log('all done:', q.results)
  78. })
  79. ```

Install


  1. ``` null
  2. npm install queue

  3. yarn add queue

  4. ```

Test


  1. ``` null
  2. npm test

  3. npm run dev // for testing in a browser, open test directory (and your console)

  4. ```

API


const q = new Queue([opts])


Constructor. optsmay contain initial values for:

q.concurrency
q.timeout
q.autostart
q.results

Instance methods


q.start([cb])


Explicitly starts processing jobs and provides feedback to the caller when the queue empties or an error occurs. If cb is not passed a promise will be returned.

q.stop()


Stops the queue. can be resumed with q.start().

q.end([err])


Stop and empty the queue immediately.

Instance methods mixed in from Array


Mozilla has docs on how these methods work here . Note that slicedoes not copy the queue.

q.push(element1, ..., elementN)


q.unshift(element1, ..., elementN)


q.splice(index , howMany[, element1[, ...[, elementN]]])


q.pop()


q.shift()


q.slice(begin[, end])


q.reverse()


q.indexOf(searchElement[, fromIndex])


q.lastIndexOf(searchElement[, fromIndex])


Properties


q.concurrency


Max number of jobs the queue should process concurrently, defaults to Infinity.

q.timeout


Milliseconds to wait for a job to execute its callback. This can be overridden by specifying a timeoutproperty on a per-job basis.

q.autostart


Ensures the queue is always running if jobs are available. Useful in situations where you are using a queue only for concurrency control.

q.results


An array to set job callback arguments on.

q.length


Jobs pending + jobs to process (readonly).

Events


q.dispatchEvent(new QueueEvent('start', { job }))


Immediately before a job begins to execute.

q.dispatchEvent(new QueueEvent('success', { result: [...result], job }))


After a job executes its callback.

q.dispatchEvent(new QueueEvent('error', { err, job }))


After a job passes an error to its callback.

q.dispatchEvent(new QueueEvent('timeout', { next, job }))


After q.timeoutmilliseconds have elapsed and a job has not executed its callback.

q.dispatchEvent(new QueueEvent('end', { err }))


After all jobs have been processed

Releases


The latest stable release is published to npm . Abbreviated changelog below:

7.0
Modernized codebase, added new maintainer (@MaksimLavrenyuk)

6.0
Add startevent before job begins (@joelgriffith)
Add timeoutproperty on a job to override the queue's timeout (@joelgriffith)

5.0
Updated TypeScript bindings (@Codex-)

4.4
Add results feature

4.3
Add promise support (@kwolfy)

4.2
Unref timers on end

4.1
Add autostart feature

4.0
Change license to MIT

3.1.x
Add .npmignore

3.0.x
Change the default concurrency to Infinity
Allow q.start()to accept an optional callback executed on q.emit('end')

2.x
Major api changes / not backwards compatible with 1.x

1.x
Early prototype

License


MIT