toad-scheduler

In-memory Node.js and browser job scheduler


In-memory TypeScript job scheduler that repeatedly executes given tasks at specified intervals (e.g. "every 20 seconds"). Cron syntax is also supported in case you need it.

Node.js 12+ and modern browsers are supported.

Getting started


First install the package:

```bash
npm i toad-scheduler
```

Next, set up your jobs:

```js
const { ToadScheduler, SimpleIntervalJob, Task } = require('toad-scheduler')

const scheduler = new ToadScheduler()

// Example state that the task updates on every run
let counter = 0

const task = new Task('simple task', () => { counter++ })
const job = new SimpleIntervalJob({ seconds: 20 }, task)

scheduler.addSimpleIntervalJob(job)

// when stopping your app
scheduler.stop()
```


Usage with async tasks


In order to avoid unhandled rejections, make sure to use AsyncTask if your task is asynchronous:

```js
const { ToadScheduler, SimpleIntervalJob, AsyncTask } = require('toad-scheduler')

const scheduler = new ToadScheduler()

// `db` stands in for your own data-access object
const task = new AsyncTask(
    'simple task',
    () => { return db.pollForSomeData().then((result) => { /* continue the promise chain */ }) },
    (err) => { /* handle error here */ }
)
const job = new SimpleIntervalJob({ seconds: 20 }, task)

scheduler.addSimpleIntervalJob(job)

// when stopping your app
scheduler.stop()
```

Note that in order to avoid memory leaks, it is recommended to use promise chains instead of async/await inside the task definition. See the talk on common Promise mistakes for more details.

Asynchronous error handling


Note that your error handlers can be asynchronous and return a promise. In that case an additional catch block is attached to the returned promise, and if an error occurs while resolving it, that error is logged using the default error handler (console.error).
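
The behaviour described above can be modelled in a few lines of plain JavaScript. This is a simplified sketch and not toad-scheduler's actual source: `executeWithErrorHandler` and `fallbackLogger` are hypothetical names introduced for illustration.

```javascript
// Simplified model of the documented behaviour; not the library's implementation.
function executeWithErrorHandler(handler, errorHandler, fallbackLogger = console.error) {
  return Promise.resolve()
    .then(() => handler())
    .catch((err) => {
      // The user-supplied error handler may itself be asynchronous.
      const handled = errorHandler(err)
      if (handled instanceof Promise) {
        // Extra catch: a failure inside the async error handler
        // is passed to the fallback logger instead of going unhandled.
        return handled.catch(fallbackLogger)
      }
      return handled
    })
}

// Both the task and its async error handler fail here,
// so the second error ends up in console.error.
executeWithErrorHandler(
  () => Promise.reject(new Error('task failed')),
  () => Promise.reject(new Error('error handler failed'))
)
```

The point of the extra catch is that a failing error handler never produces an unhandled rejection of its own.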

Preventing task run overruns


If you want to prevent a second instance of a task from being started while the first one is still executing, you can use the preventOverrun option:

```js
import { ToadScheduler, SimpleIntervalJob, Task } from 'toad-scheduler';

const scheduler = new ToadScheduler();

const task = new Task('simple task', () => {
    // if this task runs long, second one won't be started until this one concludes
    console.log('Task triggered');
});

const job = new SimpleIntervalJob(
    { seconds: 20, runImmediately: true },
    task,
    {
        id: 'id_1',
        preventOverrun: true,
    }
);

// create and start jobs
scheduler.addSimpleIntervalJob(job);
```
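
To make the idea behind overrun prevention concrete, here is a minimal sketch of a guard that skips a trigger while the previous run is still pending. It is an illustration under assumptions, not how toad-scheduler implements the option; `withOverrunGuard` is a hypothetical name.

```javascript
// Wraps a task so that a new run is skipped while the previous one is unfinished.
// Returns true if the task was started, false if the trigger was skipped.
function withOverrunGuard(taskFn) {
  let isRunning = false

  return function trigger() {
    if (isRunning) {
      return false
    }
    isRunning = true

    // The executor runs synchronously; a synchronous throw becomes a rejection.
    new Promise((resolve) => resolve(taskFn()))
      .catch((err) => console.error('Task failed:', err))
      .finally(() => {
        isRunning = false
      })

    return true
  }
}

// A task whose promise stays pending, standing in for a long-running job
const guardedTask = withOverrunGuard(() => new Promise(() => {}))

console.log(guardedTask()) // true: the first run starts
console.log(guardedTask()) // false: skipped, the first run has not finished
```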

Using IDs and ES6-style imports


You can attach IDs to jobs to identify them later. This is helpful in projects that run a lot of jobs, especially if you want to target some of them specifically (e.g. in order to stop or restart them, or to check their status).

```js
import { ToadScheduler, SimpleIntervalJob, Task } from 'toad-scheduler';

const scheduler = new ToadScheduler();

const task = new Task('simple task', () => {
    console.log('Task triggered');
});

const job1 = new SimpleIntervalJob(
    { seconds: 20, runImmediately: true },
    task,
    { id: 'id_1' }
);

const job2 = new SimpleIntervalJob(
    { seconds: 15, runImmediately: true },
    task,
    { id: 'id_2' }
);

// create and start jobs
scheduler.addSimpleIntervalJob(job1);
scheduler.addSimpleIntervalJob(job2);

// stop job with ID: id_2
scheduler.stopById('id_2');

// remove job with ID: id_1
scheduler.removeById('id_1');

// check status of jobs
console.log(scheduler.getById('id_1').getStatus()); // throws an Error (job not found)

console.log(scheduler.getById('id_2').getStatus()); // returns "stopped" and can be started again
```
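
Since looking up a removed job ends in an error (the id_1 case above), a defensive lookup can check existsById first. The helper below is a hypothetical convenience function, not part of toad-scheduler; it relies only on the existsById, getById and getStatus methods listed in the API sections, and works with any object that exposes them.

```javascript
// Hypothetical helper: returns the job status,
// or null when no job with the given id is registered.
function getStatusOrNull(scheduler, id) {
  if (!scheduler.existsById(id)) {
    return null
  }
  return scheduler.getById(id).getStatus()
}
```

With the scheduler from the example above, `getStatusOrNull(scheduler, 'id_1')` would be expected to yield null after the job has been removed.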

Cron support


You can use CronJob instances for handling Cron-style scheduling:

```ts
import { ToadScheduler, AsyncTask, CronJob } from 'toad-scheduler'

const scheduler = new ToadScheduler()

const task = new AsyncTask('simple task', () => {
  // Execute your asynchronous logic here and return the resulting promise
  return Promise.resolve()
})
const job = new CronJob(
  {
    cronExpression: '*/2 * * * * *',
  },
  task,
  {
    preventOverrun: true,
  }
)
scheduler.addCronJob(job)
```

Note that you need to install the croner library for this to work. Run npm i croner to install this dependency.
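
The expression in the example has six fields rather than the classic five. The sketch below labels each field, assuming the leading field is seconds (so `*/2` reads as "every two seconds") and that a five-field expression omits it. `labelCronFields` is a hypothetical helper for illustration only; it does not validate expressions and is not part of toad-scheduler or croner.

```javascript
// Field names for a six-field cron expression with a leading seconds field
const CRON_FIELD_NAMES = ['second', 'minute', 'hour', 'dayOfMonth', 'month', 'dayOfWeek']

// Splits a cron expression and pairs every field with its name.
function labelCronFields(cronExpression) {
  const parts = cronExpression.trim().split(/\s+/)
  if (parts.length !== 5 && parts.length !== 6) {
    throw new Error(`Expected 5 or 6 fields, got ${parts.length}`)
  }
  // Assumption: with five fields, the seconds field is omitted and treated as 0
  const fields = parts.length === 5 ? ['0', ...parts] : parts
  return Object.fromEntries(CRON_FIELD_NAMES.map((name, index) => [name, fields[index]]))
}

console.log(labelCronFields('*/2 * * * * *').second) // */2
```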

Usage in clustered environments


toad-scheduler does not persist its state by design, and has no out-of-the-box concurrency management features. If you need to prevent parallel execution of jobs in a clustered environment, it is highly recommended to use redis-semaphore in your tasks.

Here is an example:

```ts
import { randomUUID } from 'node:crypto'

import type { Redis } from 'ioredis'
import type { LockOptions } from 'redis-semaphore'
import { Mutex } from 'redis-semaphore'
import { AsyncTask } from 'toad-scheduler'

export type BackgroundJobConfiguration = {
    jobId: string
}

export type LockConfiguration = {
    lockName?: string
    refreshInterval?: number
    lockTimeout: number
}

// Abstract Job
export abstract class AbstractBackgroundJob {
    public readonly jobId: string
    protected readonly redis: Redis

    protected constructor(
        options: BackgroundJobConfiguration,
        redis: Redis,
    ) {
        this.jobId = options.jobId
        this.redis = redis
    }

    protected abstract processInternal(executionUuid: string): Promise<void>

    async process() {
        const uuid = randomUUID()

        try {
            await this.processInternal(uuid)
        } catch (err) {
            // Log the failure together with the job and execution identifiers
            console.error({ jobId: this.jobId, executionUuid: uuid, error: err })
        }
    }

    protected getJobMutex(key: string, options: LockOptions) {
        return new Mutex(this.redis, this.getJobLockName(key), options)
    }

    protected async tryAcquireExclusiveLock(lockConfiguration: LockConfiguration) {
        const mutex = this.getJobMutex(lockConfiguration.lockName ?? 'exclusive', {
            acquireAttemptsLimit: 1,
            refreshInterval: lockConfiguration.refreshInterval,
            lockTimeout: lockConfiguration.lockTimeout,
        })

        const lock = await mutex.tryAcquire()
        // If someone else already has this lock, skip
        if (!lock) {
            return
        }

        return mutex
    }

    protected getJobLockName(key: string) {
        return `${this.jobId}:locks:${key}`
    }
}

// Job example

const LOCK_TIMEOUT_IN_MSECS = 60 * 1000
const LOCK_REFRESH_IN_MSECS = 10 * 1000

export class SampleJob extends AbstractBackgroundJob {
  constructor(redis: Redis) {
    super(
      {
        jobId: 'SampleJob',
      },
      redis,
    )
  }

  protected async processInternal(executionUuid: string): Promise<void> {
    // We only want a single instance of this job running in entire cluster, let's see if someone else is already processing it
    const lock = await this.tryAcquireExclusiveLock({
      lockTimeout: LOCK_TIMEOUT_IN_MSECS,
      refreshInterval: LOCK_REFRESH_IN_MSECS,
    })

    // Job is already running, skip
    if (!lock) {
      console.debug(`Job already running in another node, skipping (${executionUuid})`)
      return
    }

    try {
      // Process job logic here
      await this.sampleJobLogic()
    } finally {
      await lock.release()
    }
  }

  private async sampleJobLogic() {
    // dummy processing logic
    return Promise.resolve()
  }
}

// Job registration
function createTask(job: AbstractBackgroundJob): AsyncTask {
    return new AsyncTask(
        job.jobId,
        () => {
            return job.process()
        },
    )
}
```

API for schedule


days?: number - how many days to wait before executing the job for the next time;
hours?: number - how many hours to wait before executing the job for the next time;
minutes?: number - how many minutes to wait before executing the job for the next time;
seconds?: number - how many seconds to wait before executing the job for the next time;
milliseconds?: number - how many milliseconds to wait before executing the job for the next time;
runImmediately?: boolean - if set to true, in addition to being executed on a given interval, job will also be executed immediately when added or restarted.
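
As a rough illustration of how these fields relate to each other, the sketch below converts a schedule object into a single interval in milliseconds. It assumes the fields are additive, so `{ minutes: 1, seconds: 30 }` means 90 seconds; `scheduleToMilliseconds` is a hypothetical helper name, not a documented toad-scheduler export.

```javascript
// Sums the time fields of a schedule into milliseconds.
// Missing fields count as 0; runImmediately does not affect the interval length.
function scheduleToMilliseconds(schedule) {
  const { days = 0, hours = 0, minutes = 0, seconds = 0, milliseconds = 0 } = schedule
  return (
    milliseconds +
    seconds * 1000 +
    minutes * 60 * 1000 +
    hours * 60 * 60 * 1000 +
    days * 24 * 60 * 60 * 1000
  )
}

console.log(scheduleToMilliseconds({ minutes: 1, seconds: 30 })) // 90000
```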

API for jobs


start(): void - starts, or restarts (if it's already running) the job;
stop(): void - stops the job. Can be restarted again with start command;
getStatus(): JobStatus - returns the status of the job, which is one of: running, stopped.
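
Because start() also restarts a job that is already running, it can be useful to start a job only when it is stopped. The helper below is a hypothetical sketch built on the start and getStatus methods listed above, assuming the status values are the strings "running" and "stopped".

```javascript
// Hypothetical helper: starts the job only if it is currently stopped.
// Returns true when the job was started, false when it was left alone.
function startIfStopped(job) {
  if (job.getStatus() === 'stopped') {
    job.start()
    return true
  }
  return false
}
```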

API for scheduler


addSimpleIntervalJob(job: SimpleIntervalJob): void - registers and starts a new job;
addLongIntervalJob(job: LongIntervalJob): void - registers and starts a new job with support for intervals longer than 24.85 days;
addIntervalJob(job: SimpleIntervalJob | LongIntervalJob): void - registers and starts a new interval-based job;
stop(): void - stops all jobs registered in the scheduler;
getById(id: string): Job - returns the job with a given id.
existsById(id: string): boolean - returns true if job with given id exists, false otherwise.
getAllJobs(): Job[] - returns all registered jobs.
getAllJobsByStatus(status: JobStatus): Job[] - returns all registered jobs with a given status.
stopById(id: string): void - stops the job with a given id.
removeById(id: string): Job | undefined - stops the job with a given id and removes it from the scheduler. If no such job exists, returns undefined, otherwise returns the job.
startById(id: string): void - starts, or restarts (if it's already running) the job with a given id.
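
The 24.85-day threshold mentioned for addLongIntervalJob appears to correspond to the largest delay that JavaScript timers accept, 2^31 - 1 milliseconds. The sketch below shows the arithmetic; `exceedsTimerLimit` is a hypothetical helper and not part of toad-scheduler.

```javascript
// Largest delay supported by setTimeout/setInterval: a signed 32-bit integer
const MAX_TIMER_DELAY_MS = 2 ** 31 - 1 // 2147483647
const MS_PER_DAY = 24 * 60 * 60 * 1000

// Returns true when an interval is too long for a plain timer.
function exceedsTimerLimit(intervalMs) {
  return intervalMs > MAX_TIMER_DELAY_MS
}

console.log((MAX_TIMER_DELAY_MS / MS_PER_DAY).toFixed(3)) // 24.855
console.log(exceedsTimerLimit(30 * MS_PER_DAY)) // true
```

Under this reading, an interval of 30 days would call for a LongIntervalJob, while one of 20 days would still fit a SimpleIntervalJob.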
