jasonbutz.info

Node.js Job Scheduling with Agenda

node, javascript, agenda.js

I have a freelance project I’ve been working on that needed to be setup to allow tasks to be retried automatically. The application depends upon an integration between two 3rd-party systems, and those systems only sync up at certain intervals. If one system didn’t have the information I needed to wait until it did. So I started looking at different scheduling and queueing systems to see if something already existed that would work for my situation. I came across Agenda.js, and it seemed to fit the bill well. I already had a MongoDB in place for my client and this wouldn’t require me using any new services.

I installed the npm package as usual and then went about setting everything up. I wanted to keep as much of the code for agenda and the tasks separate as possible so I created a new directory for all the code, in my case server/lib/processingQueue.

I added an index.js file where I would do most of the setup. I don’t expect to have many items using agenda for the moment so I turned the concurrency way down.

const Agenda = require('agenda');

const agenda = new Agenda({
  db: { address: process.env.MONGODB_URI },
  maxConcurrency: 5,
  defaultConcurrency: 1,
});

agenda.on('ready', function () {
  agenda.define('myJobName', function (job, done) {
    // ...
    done();
  });

  agenda.start();
});

module.exports = {
  agenda,
};

This was a good start, but I didn’t want to clutter the ready event listener with function definitions when I added more tasks in the future. So I created another file in the server/lib/processingQueue directory and gave it the same name as the job. This is where I would define the job and have all the logic.

const { tryDoTheThing } = require('../myLibraryFile');
const { db } = require('../db');
const JOB_NAME = 'myJobName';

function defineJob(agenda) {
  console.log(`Defining ${JOB_NAME} job`);
  agenda.define(JOB_NAME, jobFunction);
}

async function jobFunction(job, done) {
  let { itemId } = job.attrs.data;
  if (!itemId) {
    return done();
  }
  let item = db.findById(itemId);

  let success = await tryDoTheThing(item);

  if (!success) {
    throw new Error(
      `Failed to do the thing in myJobName job, itemId ${itemId}`
    );
  }
  done(success);
}

function scheduleJob(agenda, itemId) {
  var job = agenda.create(JOB_NAME, { itemId });
  job.schedule('in 1 hour');
  job.unique({ 'data.itemId': 'itemId' });
  job.save(function (err) {
    if (err) {
      console.error(`Error saving job for itemId ${itemId}`, err);
    }
    console.log(`Job successfully saved for itemId ${itemId}`);
  });
}

module.exports = {
  JOB_NAME,
  defineJob,
  scheduleJob,
};

After that I went back to my main file for Agenda and updated it to use the new module.

const Agenda = require('agenda');
const myJobName = require('./myJobName');

const agenda = new Agenda({
  db: { address: process.env.MONGODB_URI },
  maxConcurrency: 5,
  defaultConcurrency: 1,
});

agenda.on('ready', function () {
  myJobName.defineJob(agenda);

  agenda.start();
});

module.exports = {
  agenda,
  scheduleMyJobName: async (itemId) => myJobName.scheduleJob(agenda, itemId),
};

Note the exports at the bottom. I’m exporting an arrow function that calls myJobName.scheduleJob. I did this so that no matter where I was I’d only have to import one file to get the Agenda tasks. This works out very nicely because it also meant that if I forgot to import the application in my main JavaScript file it would still start Agenda anywhere. Importing Agenda in multiple places won’t be a problem like it might seem at first because Node only evaluates the file once, then it just passes the same reference after that first import.