Queue.js

import kue from 'kue';
import _ from 'lodash';
import System from './System';

async function processorWrapper(job, processor, resolve, reject) {
	let res;
	let jobDetails;
	if (job.state() === 'active') {
		reject(new Error('Job already processing'));
		return;
	}
	if (job.state() === 'inactive') {
		job.active();
		job.attempt(() => {});
		try {
			res = await processor(job.data.input);
		}
		catch (e) {
			job.error(e.message);
			job._error = e.message;
			if (!job.data.options.noFailure) {
				job.failed();
				reject(new Error('Job failed ' + e));
				return;
			}
		}
		if (!_.isNil(res)) {
			job.result = res;
			job.set('result', JSON.stringify(res));
		}
		job.complete();
		jobDetails = _.pick(job.toJSON(), ['id', 'type', 'data', 'result', 'state', 'error', 'created_at', 'updated_at', 'attempts']);
		jobDetails.attempts.made++;
		jobDetails.attempts.remaining--;
	}
	// Job is already complete/failed
	else {
		jobDetails = _.pick(job.toJSON(), ['id', 'type', 'data', 'result', 'state', 'error', 'created_at', 'updated_at', 'attempts']);
	}
	resolve(jobDetails);
}

async function iterateOverJobs(queue, jobType, numOfJobs, olderThan, action) {
	const now = Date.now();
	return new Promise((resolve, reject) => {
		kue.Job.rangeByType(queue, jobType, 0, numOfJobs, 'asc', async (err, jobs) => {
			if (err) {
				reject(new Error('Could not fetch jobs: ' + err));
				return;
			}
			for (let i = 0; i < jobs.length; i++) {
				if (now - jobs[i].created_at > olderThan) {
					jobs[i].log(`Doing ${action}`);
					jobs[i][action](() => {});
				}
				else break;
			}
			resolve();
		});
	});
}

/**
 * Job Queue
 */
class Queue {
	static jobs;
	static queues = {};
	static logger = console;

	/**
	 * Initialise the redis connection
	 * @param {object} [redis={port: 6379, host: '127.0.0.1'}] Redis connection settings object
	 * @param {boolean} [enableWatchdog=false] Will watch for stuck jobs due to any connection issues
	 * @see https://github.com/Automattic/kue#unstable-redis-connections
	 */
	static init(redis, enableWatchdog) {
		if (!Queue.jobs) {
			Queue.jobs = kue.createQueue({
				redis,
			});
			if (enableWatchdog)	Queue.jobs.watchStuckJobs(10000);
			Queue.jobs.on('error', (err) => {
				Queue.logger.error(`[Queue] ${err}`);
			});

			System.onExit(Queue.exit);
		}
	}

	/**
	 * @typedef {object} queueOpts
	 * @property {boolean} [enableWatchdog=false] Will watch for stuck jobs default: false
	 * @property {Console} [logger] default logs to console
	 */

	/**
	 * Create a new Queue
	 * The redis and enableWatchdog settings are required only the first time to init
	 * Can also be set beforehand by calling Queue.init()
	 * @param {string} name Name of the queue
	 * @param {object} [redis={port: 6379, host: '127.0.0.1'}] Redis connection settings object
	 * @param {boolean|queueOpts} [options={}] enableWatchdog or opts object (default = {})
	 * Read more here :  https://github.com/Automattic/kue#unstable-redis-connections
	 */
	constructor(name, redis = {port: 6379, host: '127.0.0.1'}, options = {}) {
		this.name = `${name}${process.env.NODE_ENV ? '-' + process.env.NODE_ENV : ''}`;

		if (typeof options !== 'object') options = {enableWatchdog: !!options};
		else options.enableWatchdog = !!options.enableWatchdog;
		if (!Queue.queues[this.name]) Queue.queues[this.name] = {processorAdded: false};

		this.logger = options.logger || Queue.logger;
		this.paused = undefined;
		this.kueCtx = undefined;

		Queue.init(redis, options.enableWatchdog);
	}


	/**
	 * @typedef {object} addOpts
	 * @property {number|string} [priority=0] Priority of the job, lower number is better
	 * Options are : low: 10, normal: 0, medium: -5, high: -10, critical: -15 | Or any integer
	 * @property {number} [attempts] Number of attempts
	 * @property {number} [delay] Delay in between jobs
	 * @property {number} [ttl] Time to live for job
	 * @property {boolean} [removeOnComplete] Remove job on completion
	 * @property {boolean} [noFailure] Mark job as complete even if it fails
	 */

	/**
	 * Add a job to the Queue
	 * @param {*} input Job data
	 * @param {addOpts} opts
	 * @return {number} The ID of the job created
	 */
	async addJob(input, {
		priority = 0,
		attempts = this.attempts,
		delay = this.delay,
		ttl = this.ttl,
		removeOnComplete = this.removeOnComplete,
		noFailure = this.noFailure ? true : undefined,
		_getResult = false,
		_timeout,
		_dummy,
	} = {}) {
		return new Promise((resolve, reject) => {
			let completed = false;
			const options = {
				noFailure,
				_dummy,
				_timeout,
			};
			const job = Queue.jobs
				.create(this.name, {input, options})
				.priority(priority);

			// default = 1
			if (attempts) {
				job.attempts(this.attempts);
			}
			// default = 0
			if (delay) {
				job.delay(this.delay).backoff(true);
			}
			// default = 0, i.e. infinite
			if (ttl > 0) {
				job.ttl(this.ttl);
			}
			// default = false
			if (removeOnComplete) {
				job.removeOnComplete(true);
			}

			if (_getResult) {
				job.on('complete', (res) => {
					completed = true;
					resolve(res);
				})
					.on('failed', (errMsg) => {
						if (!completed) {
							completed = true;
							reject(new Error(errMsg));
						}
					})
					.on('remove', () => {
						if (!completed) {
							completed = true;
							reject(new Error('Job Removed before completion'));
						}
					});
			}

			job.save((err) => {
				if (err) reject(new Error(err));
				else if (!_getResult) resolve(job.id);
				else if (!_.isNil(_timeout)) {
					setTimeout(() => {
						if (!completed) {
							completed = true;
							job.log('Error: Timed out');
							job.failed();
							reject(new Error('Timed out'));
						}
					}, _timeout);
				}
			});
		});
	}

	/**
	 * Add a job to the Queue, wait for it to process and return result
	 * Preferably set PRIORITY HIGH or it might timeout if lots of other tasks are in queue
	 * Queue will process job only if timeout is not passed when processing begins
	 * @param {any} input Job data
	 * @param {addOpts} opts
	 * @param {number} [timeout=180000] wait for this time else throw err
	 * @return {any} result
	 */
	async addAndProcess(input, opts = {}, timeout = 180000) {
		opts._getResult = true;
		opts._timeout = timeout;
		return this.addJob(input, opts);
	}

	/**
	 * Set default number of retry attempts for any job added later
	 * @param {number} attempts Number of attempts (>= 0), default = 1
	 */
	setAttempts(attempts) {
		this.attempts = attempts;
	}

	/**
	 * Set delay b/w successive jobs for any job added later
	 * @param {number} delay Delay b/w jobs, milliseconds, default = 0
	 */
	setDelay(delay) {
		this.delay = delay;
	}

	/**
	 * Set default TTL (time to live) for new jobs added from now on,
	 * will fail job if not completed in TTL time
	 * @param {number} ttl Time in milliseconds, infinite when 0. default = 0
	 */
	setTTL(ttl) {
		this.ttl = ttl;
	}

	/**
	 * Sets default removeOnComplete for any job added to this Queue from now on
	 * @param {boolean} removeOnComplete default = false
	 */
	setRemoveOnCompletion(removeOnComplete) {
		this.removeOnComplete = removeOnComplete;
	}

	/**
	 * Sets default noFailure for any job added to this Queue from now on.
	 * This will mark the job complete even if it fails when true
	 * @param {boolean} noFailure default = false
	 */
	setNoFailure(noFailure) {
		this.noFailure = noFailure;
	}

	/**
	 * An async function which will be called to process the job data
	 * @callback processorCallback
	 * @param {*} jobData The information saved in the job during adding of job
	 * @return {*} Will be saved in return field in JobDetails
	 */

	/**
	 * Attach a processor to the Queue which will keep getting jobs as it completes them
	 * @param {processorCallback} processor Function to be called to process the job data
	 * @param {number} [concurrency=1] The number of jobs this processor can handle parallely
	 */
	async addProcessor(processor, concurrency = 1) {
		if (Queue.queues[this.name].processorAdded) {
			throw new Error(`Processor already added for queue ${this.name}, can only be set once per queue.`);
		}
		// Increase max event listeners limit
		Queue.jobs.setMaxListeners(Queue.jobs.getMaxListeners() + concurrency);

		// ctx Can be used to pause and resume worker,
		// For detailed info : https://github.com/Automattic/kue#pause-processing
		Queue.jobs.process(this.name, concurrency, async (job, ctx, done) => {
			if (!this.kueCtx) this.kueCtx = ctx;
			if (job.data.options._dummy) {
				done(null, true);
				return;
			}
			if (job.data.options._timeout !== undefined &&
				(Date.now() - job.created_at) > job.data.options._timeout) {
				job.log(`Time passed: ${(Date.now() - job.created_at)}, Timeout: ${job.data.options._timeout}`);
				done(new Error('Timed out'));
				return;
			}
			job.log('Start processing');
			let res;
			try {
				res = await processor(job.data.input);
			}
			catch (e) {
				job.log('Errored: ' + e.message);
				if (job.data.options.noFailure) {
					job.error(e);
				}
				else {
					done(new Error(this.name + ' Job failed: ' + e.message));
					return;
				}
			}
			job.log('Done');
			done(null, res);
		});

		this.paused = false;
		Queue.queues[this.name].processorAdded = true;
		try {
			// We add this so that keuCtx gets set without having to wait for a job to be added
			await this.addAndProcess({}, {
				_dummy: true,
				priority: Number.MIN_SAFE_INTEGER,
				removeOnComplete: true,
			}, 10000);
		}
		catch (err) {
			this.logger.error('[Queue] Could not set kue ctx');
		}
	}

	/**
	 * Pause Queue processing
	 * Gives timeout time to all workers to complete their current jobs then stops them
	 * @param {number} [timeout=5000] Time to complete current jobs in ms
	 */
	async pauseProcessor(timeout = 5000) {
		if (!Queue.queues[this.name].processorAdded) throw new Error('No processor present');
		if (this.paused) return;
		await new Promise((resolve, reject) => {
			if (!this.kueCtx) {
				reject(new Error('Worker context not yet available, please add atleast one job before pausing/resuming'));
				return;
			}
			this.kueCtx.pause(timeout, (err) => {
				if (err) reject(err);
				else resolve();
			});
		});
		this.paused = true;
	}

	/**
	 * Resume Queue processing
	 */
	resumeProcessor() {
		if (!Queue.queues[this.name].processorAdded) throw new Error('No processor present');
		if (!this.paused) return;
		if (!this.kueCtx) {
			throw new Error('Worker context not yet available, please add atleast one job before pausing/resuming');
		}
		this.kueCtx.resume();
		this.paused = false;
	}

	/**
	 * Return count of jobs in Queue of JobType
	 * @param {string} queue Queue name
	 * @param {string} jobType One of {'inactive', 'delayed' ,'active', 'complete', 'failed'}
	 * @return {number} count
	 */
	static async getCount(queue, jobType) {
		return new Promise((resolve, reject) => {
			Queue.jobs[jobType + 'Count'](queue, (err, total) => {
				if (err) reject(new Error('Could not get total ' + jobType + ' jobs: ' + err));
				else resolve(total);
			});
		});
	}

	/**
	 * Return count of inactive jobs in Queue
	 * @return {number} inactiveCount
	 */
	async inactiveJobs() {
		return Queue.getCount(this.name, 'inactive');
	}

	/**
	 * Alias for inactiveJobs
 	 * @return {number} inactiveCount
	 */
	async pendingJobs() {
		return this.inactiveJobs();
	}

	/**
	 * Return count of completed jobs in Queue
	 * Might return 0 if removeOnComplete was true
	 * @return {number} completeCount
	 */
	async completedJobs() {
		return Queue.getCount(this.name, 'complete');
	}

	/**
	 * Return count of failed jobs in Queue
	 * @return {number} failedCount
	 */
	async failedJobs() {
		return Queue.getCount(this.name, 'failed');
	}

	/**
	 * Return count of delayed jobs in Queue
	 * @return {number} delayedCount
	 */
	async delayedJobs() {
		return Queue.getCount(this.name, 'delayed');
	}

	/**
	 * Return count of active jobs in Queue
	 * @return {number} activeCount
	 */
	async activeJobs() {
		return Queue.getCount(this.name, 'active');
	}

	/**
	 * Internal data object
	 * @typedef {object} internalData
	 * @property {*} input Input data given to job
	 * @property {object} options Internal options used to set noFailure and extra properties
	 */

	/**
	 * Job status object.
	 * @typedef {object} jobDetails
	 * @property {number} id
	 * @property {string} type Name of the Queue
	 * @property {internalData} data Internal data object, includes input and options
	 * @property {*} result Result of the processor callback
	 * @property {string} state One of {'inactive', 'delayed' ,'active', 'complete', 'failed'}
	 * @property {*} error
	 * @property {number} created_at unix time stamp
	 * @property {number} updated_at unix time stamp
	 * @property {object} attempts Attempts Object
	 */

	/**
	 * Process a single job in the Queue and mark it complete or failed,
	 * for when you want to manually process jobs
	 * @param {processorCallback} processor Function to be called to process the job data, without ctx
	 * @return {jobDetails} Job object of completed job
	 */
	async processJob(processor) {
		return new Promise((resolve, reject) => {
			kue.Job.rangeByType(this.name, 'inactive', 0, 1, 'asc', async (err, jobs) => {
				if (jobs.length === 0 || err) {
					reject(err || new Error('Queue empty'));
					return;
				}
				const job = jobs[0];
				await processorWrapper(job, processor, resolve, reject);
			});
		});
	}

	/**
	 * Cleanup function to be called during startup,
	 * resets active jobs older than specified time
	 * @param {number} [olderThan=5000] Time in milliseconds, default = 5000
	 */
	async cleanup(olderThan = 5000) {
		const n = await Queue.getCount(this.name, 'active');
		await iterateOverJobs(this.name, 'active', n, olderThan, 'inactive');
	}

	/**
	 * Removes any old jobs from queue
	 * older than specified time
	 * @param {number} [olderThan=3600000] Time in milliseconds, default = 3600000 (1 hr)
	 */
	async delete(olderThan = 3600000) {
		const completed = await this.completedJobs();
		const removeComplete = iterateOverJobs(this.name, 'complete', completed, olderThan, 'remove');

		const failed = await this.failedJobs();
		const removeFailed = iterateOverJobs(this.name, 'failed', failed, olderThan, 'remove');

		const inactive = await this.pendingJobs();
		const removeInactive = iterateOverJobs(this.name, 'inactive', inactive, olderThan, 'remove');

		const delayed = await this.delayedJobs();
		const removeDelayed = iterateOverJobs(this.name, 'delayed', delayed, olderThan, 'remove');

		const active = await this.activeJobs();
		const removeActive = iterateOverJobs(this.name, 'active', active, olderThan, 'remove');

		await Promise.all([removeComplete, removeFailed, removeInactive, removeDelayed, removeActive]);
	}

	/**
	 * Function to query the status of a job
	 * @param {number} jobId Job id for which status info is required
	 * @return {jobDetails} Object full of job details like state, time, attempts, etc.
	 */
	static async status(jobId) {
		return new Promise((resolve, reject) => {
			kue.Job.get(jobId, (err, job) => {
				if (err || !job) {
					reject(new Error('Job not found ' + err));
					return;
				}
				job = _.pick(job.toJSON(), ['id', 'type', 'data', 'result', 'state', 'error', 'created_at', 'updated_at', 'attempts']);
				resolve(job);
			});
		});
	}

	/**
	 * Manualy process a specific Job. Returns existing result if job already processed
	 * @param {number} jobId Id of the job to be processed
	 * @param {processorCallback} processor Function to be called to process the job data, without ctx
	 * @return {jobDetails} Result of processor function and job object of completed job
	 */
	static async processJobById(jobId, processor) {
		return new Promise((resolve, reject) => {
			kue.Job.get(jobId, async (err, job) => {
				if (err || !job) {
					reject(new Error('Could not fetch job' + err));
					return;
				}
				await processorWrapper(job, processor, resolve, reject);
			});
		});
	}

	/**
	 * Function shuts down the Queue gracefully.
	 * Waits for active jobs to complete until timeout, then marks them failed.
	 * @param {number} [timeout=10000] Time in milliseconds, default = 10000
	 * @return {boolean}
	 */
	static async exit(timeout = 10000) {
		return new Promise((resolve) => {
			if (Queue.jobs === undefined) {
				resolve(true);
			}
			else {
				Queue.logger.log('[Queue] Shutting down redis queue');
				Queue.jobs.shutdown(timeout, () => {
					Queue.jobs = undefined;
					resolve(true);
				});
			}
		});
	}
}

export default Queue;