Skip to main content

Managing Jobs

Learn how to query, monitor, and control existing jobs in your Chronos scheduler.

Querying Jobs

scheduler.jobs(query, [sort], [limit], [skip])

Find jobs in the database using MongoDB queries:

// Find all jobs
const allJobs = await scheduler.jobs({});

// Find jobs by name
const emailJobs = await scheduler.jobs({ name: 'send-email' });

// Find failed jobs
const failedJobs = await scheduler.jobs({
failedAt: { $exists: true }
});

// Find jobs scheduled for today
const today = new Date();
today.setHours(0, 0, 0, 0);
const tomorrow = new Date(today);
tomorrow.setDate(tomorrow.getDate() + 1);

const todaysJobs = await scheduler.jobs({
nextRunAt: { $gte: today, $lt: tomorrow }
});

Sorting and Pagination

// Sort by next run time (ascending)
const sortedJobs = await scheduler.jobs(
{},
{ nextRunAt: 1 },
10, // limit
0 // skip
);

// Get most recent jobs
const recentJobs = await scheduler.jobs(
{},
{ lastRunAt: -1 },
5
);

// Paginated results
const page2Jobs = await scheduler.jobs(
{},
{ nextRunAt: 1 },
20, // 20 per page
20 // skip first 20 (page 2)
);

Advanced Queries

// Jobs with specific data
const userJobs = await scheduler.jobs({
'data.userId': '12345'
});

// Jobs in date range
const thisWeekJobs = await scheduler.jobs({
nextRunAt: {
$gte: startOfWeek,
$lte: endOfWeek
}
});

// Complex queries
const complexQuery = await scheduler.jobs({
$and: [
{ name: { $in: ['email-job', 'sms-job'] } },
{ priority: { $gte: 0 } },
{ failedAt: { $exists: false } }
]
});

Cancelling Jobs

scheduler.cancel(query)

Remove jobs from the database:

// Cancel specific job type
const cancelledCount = await scheduler.cancel({
name: 'send-newsletter'
});

console.log(`Cancelled ${cancelledCount} newsletter jobs`);

// Cancel jobs for specific user
await scheduler.cancel({
'data.userId': 'user123'
});

// Cancel all failed jobs older than 7 days
const sevenDaysAgo = new Date(Date.now() - 7 * 24 * 60 * 60 * 1000);
await scheduler.cancel({
failedAt: { $lt: sevenDaysAgo }
});

// Cancel jobs with specific criteria
await scheduler.cancel({
$and: [
{ name: 'cleanup-task' },
{ nextRunAt: { $gt: new Date() } } // Only future jobs
]
});

Individual Job Cancellation

// Find and cancel specific jobs
const jobs = await scheduler.jobs({
name: 'process-order',
'data.orderId': 'order123'
});

for (const job of jobs) {
await job.remove();
}

Enabling/Disabling Jobs

scheduler.disable(query)

Disable jobs without deleting them:

// Disable all jobs of a type
const disabledCount = await scheduler.disable({
name: 'maintenance-task'
});

// Disable jobs conditionally
await scheduler.disable({
name: 'send-notification',
'data.priority': 'low'
});

scheduler.enable(query)

Re-enable disabled jobs:

// Re-enable all maintenance tasks
const enabledCount = await scheduler.enable({
name: 'maintenance-task'
});

// Enable specific jobs
await scheduler.enable({
name: 'send-notification',
disabled: true
});

Individual Job Control

const jobs = await scheduler.jobs({ name: 'backup-database' });

for (const job of jobs) {
// Disable individual job
job.disable();
await job.save();

// Later, re-enable it
job.enable();
await job.save();
}

Job Status and Monitoring

Check Job Status

const job = await scheduler.jobs({ 
name: 'process-payment',
'data.orderId': 'order123'
});

if (job.length > 0) {
const paymentJob = job[0];

console.log('Job Status:', {
id: paymentJob.attrs._id,
name: paymentJob.attrs.name,
nextRun: paymentJob.attrs.nextRunAt,
lastRun: paymentJob.attrs.lastRunAt,
failed: !!paymentJob.attrs.failedAt,
locked: !!paymentJob.attrs.lockedAt,
disabled: !!paymentJob.attrs.disabled
});
}

Monitor Job Progress

scheduler.define('long-running-task', async (job) => {
const items = job.attrs.data.items;

for (let i = 0; i < items.length; i++) {
await processItem(items[i]);

// Update progress (custom field)
job.attrs.progress = {
completed: i + 1,
total: items.length,
percentage: Math.round(((i + 1) / items.length) * 100)
};

await job.save();

// Extend lock if needed
if (i % 10 === 0) {
await job.touch();
}
}
});

// Check progress
const progressJobs = await scheduler.jobs({
name: 'long-running-task',
lockedAt: { $exists: true }
});

progressJobs.forEach(job => {
if (job.attrs.progress) {
console.log(`Job ${job.attrs._id}: ${job.attrs.progress.percentage}% complete`);
}
});

Bulk Operations

Update Multiple Jobs

// Find jobs that need updating
const jobs = await scheduler.jobs({
name: 'send-email',
'data.template': 'old-template'
});

// Update them
for (const job of jobs) {
job.attrs.data.template = 'new-template';
await job.save();
}

// Or use MongoDB operations directly
const result = await scheduler._collection.updateMany(
{
name: 'send-email',
'data.template': 'old-template'
},
{
$set: { 'data.template': 'new-template' }
}
);

Reschedule Jobs

// Reschedule all pending jobs to run 1 hour later
const pendingJobs = await scheduler.jobs({
nextRunAt: { $gt: new Date() },
lockedAt: { $exists: false }
});

for (const job of pendingJobs) {
const newTime = new Date(job.attrs.nextRunAt.getTime() + 60 * 60 * 1000);
job.schedule(newTime);
await job.save();
}

Data Cleanup

scheduler.purge()

Remove jobs without defined processors:

// Remove orphaned jobs (jobs without definitions)
const removedCount = await scheduler.purge();
console.log(`Removed ${removedCount} orphaned jobs`);
warning

Only run scheduler.purge() after you've defined all your jobs. It will remove any job that doesn't have a corresponding define() call.

Custom Cleanup

// Clean up old completed jobs
async function cleanupOldJobs(daysOld = 30) {
const cutoffDate = new Date();
cutoffDate.setDate(cutoffDate.getDate() - daysOld);

const removed = await scheduler.cancel({
lastFinishedAt: { $lt: cutoffDate }
});

console.log(`Cleaned up ${removed} old jobs`);
return removed;
}

// Schedule regular cleanup
scheduler.define('cleanup-old-jobs', async () => {
await cleanupOldJobs(30);
});

scheduler.every('1 day', 'cleanup-old-jobs');

Job Statistics

Basic Statistics

async function getJobStatistics() {
const [total, pending, running, failed, completed] = await Promise.all([
scheduler.jobs({}).then(jobs => jobs.length),
scheduler.jobs({
nextRunAt: { $gte: new Date() },
lockedAt: { $exists: false }
}).then(jobs => jobs.length),
scheduler.jobs({
lockedAt: { $exists: true },
failedAt: { $exists: false }
}).then(jobs => jobs.length),
scheduler.jobs({
failedAt: { $exists: true }
}).then(jobs => jobs.length),
scheduler.jobs({
lastFinishedAt: { $exists: true },
failedAt: { $exists: false }
}).then(jobs => jobs.length)
]);

return {
total,
pending,
running,
failed,
completed,
successRate: total > 0 ? (completed / (completed + failed)) * 100 : 0
};
}

const stats = await getJobStatistics();
console.log('Job Statistics:', stats);

Job Type Breakdown

async function getJobTypeStats() {
// Use MongoDB aggregation for efficient grouping
const pipeline = [
{
$group: {
_id: '$name',
count: { $sum: 1 },
failed: {
$sum: {
$cond: [{ $exists: ['$failedAt', true] }, 1, 0]
}
},
completed: {
$sum: {
$cond: [
{
$and: [
{ $exists: ['$lastFinishedAt', true] },
{ $not: { $exists: ['$failedAt', true] } }
]
},
1,
0
]
}
}
}
},
{
$project: {
jobType: '$_id',
total: '$count',
failed: 1,
completed: 1,
successRate: {
$cond: [
{ $eq: [{ $add: ['$completed', '$failed'] }, 0] },
0,
{
$multiply: [
{ $divide: ['$completed', { $add: ['$completed', '$failed'] }] },
100
]
}
]
}
}
},
{ $sort: { total: -1 } }
];

return await scheduler._collection.aggregate(pipeline).toArray();
}

const typeStats = await getJobTypeStats();
console.log('Job Type Statistics:', typeStats);

Real-time Monitoring

Event-Based Monitoring

class JobMonitor {
constructor(scheduler) {
this.stats = {
started: 0,
completed: 0,
failed: 0
};

this.setupEventListeners(scheduler);
}

setupEventListeners(scheduler) {
scheduler.on('start', (job) => {
this.stats.started++;
console.log(`Started: ${job.attrs.name} [${this.stats.started} total]`);
});

scheduler.on('success', (job) => {
this.stats.completed++;
console.log(`Completed: ${job.attrs.name} [${this.stats.completed} total]`);
});

scheduler.on('fail', (err, job) => {
this.stats.failed++;
console.log(`Failed: ${job.attrs.name} [${this.stats.failed} total]`);
});
}

getStats() {
return { ...this.stats };
}

reset() {
this.stats = { started: 0, completed: 0, failed: 0 };
}
}

const monitor = new JobMonitor(scheduler);

Health Check

async function healthCheck() {
try {
// Check if scheduler is connected
if (!scheduler._ready) {
return { status: 'error', message: 'Scheduler not ready' };
}

// Check for stuck jobs (locked > 30 minutes)
const thirtyMinutesAgo = new Date(Date.now() - 30 * 60 * 1000);
const stuckJobs = await scheduler.jobs({
lockedAt: { $lt: thirtyMinutesAgo }
});

// Check failed job rate in last hour
const oneHourAgo = new Date(Date.now() - 60 * 60 * 1000);
const recentFailed = await scheduler.jobs({
failedAt: { $gt: oneHourAgo }
});

return {
status: 'healthy',
stuckJobs: stuckJobs.length,
recentFailures: recentFailed.length,
timestamp: new Date()
};

} catch (error) {
return {
status: 'error',
message: error.message,
timestamp: new Date()
};
}
}

// Run health check periodically
setInterval(async () => {
const health = await healthCheck();
console.log('Health Check:', health);
}, 5 * 60 * 1000); // Every 5 minutes

Best Practices

1. Regular Cleanup

// Schedule automatic cleanup
scheduler.define('system-cleanup', async () => {
// Remove old completed jobs
const thirtyDaysAgo = new Date(Date.now() - 30 * 24 * 60 * 60 * 1000);
const removed = await scheduler.cancel({
lastFinishedAt: { $lt: thirtyDaysAgo }
});

console.log(`Cleaned up ${removed} old jobs`);
});

scheduler.every('1 day', 'system-cleanup');

2. Graceful Job Updates

// When updating job logic, handle existing jobs gracefully
scheduler.define('updated-job-v2', async (job) => {
const version = job.attrs.data.version || 1;

if (version === 1) {
// Handle old version data
return await handleV1Job(job);
} else {
// Handle new version
return await handleV2Job(job);
}
});

3. Job Monitoring Dashboard

// Create a simple monitoring endpoint
app.get('/jobs/status', async (req, res) => {
const stats = await getJobStatistics();
const typeStats = await getJobTypeStats();
const health = await healthCheck();

res.json({
overview: stats,
byType: typeStats,
health: health
});
});