Managing Jobs
Learn how to query, monitor, and control existing jobs in your Chronos scheduler.
Querying Jobs
scheduler.jobs(query, [sort], [limit], [skip])
Find jobs in the database using MongoDB queries:
// An empty query matches every stored job.
const allJobs = await scheduler.jobs({});

// Restrict the result to jobs registered under a single name.
const emailJobQuery = { name: 'send-email' };
const emailJobs = await scheduler.jobs(emailJobQuery);

// Failed jobs are matched by the presence of a failedAt field.
const failedJobQuery = { failedAt: { $exists: true } };
const failedJobs = await scheduler.jobs(failedJobQuery);

// Jobs scheduled for today: nextRunAt in [local midnight today, local midnight tomorrow).
const today = new Date();
today.setHours(0, 0, 0, 0);
const tomorrow = new Date(today.getTime());
tomorrow.setDate(today.getDate() + 1);
const todayWindow = { $gte: today, $lt: tomorrow };
const todaysJobs = await scheduler.jobs({ nextRunAt: todayWindow });
Sorting and Pagination
// First ten jobs ordered by next run time, soonest first (limit 10, skip 0).
const sortedJobs = await scheduler.jobs({}, { nextRunAt: 1 }, 10, 0);

// Five most recently run jobs (descending lastRunAt, no skip argument).
const recentJobs = await scheduler.jobs({}, { lastRunAt: -1 }, 5);

// Second page of results: 20 per page, so the first 20 jobs are skipped.
const pageSize = 20;
const pageNumber = 2;
const page2Jobs = await scheduler.jobs(
  {},
  { nextRunAt: 1 },
  pageSize,
  (pageNumber - 1) * pageSize
);
Advanced Queries
// Jobs with specific data (dot notation reaches into the job's data payload)
const userJobs = await scheduler.jobs({
  'data.userId': '12345'
});

// Jobs in date range: the current week, from Sunday 00:00 local time
// (inclusive) up to the following Sunday 00:00 (exclusive).
const startOfWeek = new Date();
startOfWeek.setHours(0, 0, 0, 0);
startOfWeek.setDate(startOfWeek.getDate() - startOfWeek.getDay());
const endOfWeek = new Date(startOfWeek);
endOfWeek.setDate(endOfWeek.getDate() + 7);
const thisWeekJobs = await scheduler.jobs({
  nextRunAt: {
    $gte: startOfWeek,
    // Half-open range so a job at exactly next Sunday 00:00 belongs to next week.
    $lt: endOfWeek
  }
});

// Complex queries: all three conditions must hold for a job to match.
const complexQuery = await scheduler.jobs({
  $and: [
    { name: { $in: ['email-job', 'sms-job'] } },
    { priority: { $gte: 0 } },
    { failedAt: { $exists: false } }
  ]
});
Cancelling Jobs
scheduler.cancel(query)
Remove jobs from the database:
// Cancel every job of one type and report how many were cancelled.
const newsletterQuery = { name: 'send-newsletter' };
const cancelledCount = await scheduler.cancel(newsletterQuery);
console.log(`Cancelled ${cancelledCount} newsletter jobs`);

// Cancel the jobs whose data payload references a given user.
const userJobsQuery = { 'data.userId': 'user123' };
await scheduler.cancel(userJobsQuery);

// Cancel all jobs whose failure is more than 7 days old.
const MS_PER_DAY = 24 * 60 * 60 * 1000;
const sevenDaysAgo = new Date(Date.now() - 7 * MS_PER_DAY);
const staleFailuresQuery = { failedAt: { $lt: sevenDaysAgo } };
await scheduler.cancel(staleFailuresQuery);

// Cancel only the cleanup tasks that are still scheduled in the future.
const futureCleanupTasksQuery = {
  $and: [
    { name: 'cleanup-task' },
    { nextRunAt: { $gt: new Date() } }
  ]
};
await scheduler.cancel(futureCleanupTasksQuery);
Individual Job Cancellation
// Look up the jobs for one order, then remove them one at a time.
const orderJobQuery = {
  name: 'process-order',
  'data.orderId': 'order123'
};
const jobs = await scheduler.jobs(orderJobQuery);

for (const orderJob of jobs) {
  // Sequential on purpose: each removal finishes before the next starts.
  await orderJob.remove();
}
Enabling/Disabling Jobs
scheduler.disable(query)
Disable jobs without deleting them:
// Disable every job of one type; the awaited result is kept as disabledCount.
const maintenanceTasksToDisable = { name: 'maintenance-task' };
const disabledCount = await scheduler.disable(maintenanceTasksToDisable);

// Disable only the notifications whose data marks them as low priority.
const lowPriorityNotifications = {
  name: 'send-notification',
  'data.priority': 'low'
};
await scheduler.disable(lowPriorityNotifications);
scheduler.enable(query)
Re-enable disabled jobs:
// Re-enable every maintenance task; the awaited result is kept as enabledCount.
const maintenanceTasksToEnable = { name: 'maintenance-task' };
const enabledCount = await scheduler.enable(maintenanceTasksToEnable);

// Re-enable only the notification jobs that are currently disabled.
const disabledNotifications = {
  name: 'send-notification',
  disabled: true
};
await scheduler.enable(disabledNotifications);
Individual Job Control
// Toggle individual job instances; each change is followed by save().
const backupJobQuery = { name: 'backup-database' };
const jobs = await scheduler.jobs(backupJobQuery);

for (const backupJob of jobs) {
  // Disable this job and save it.
  backupJob.disable();
  await backupJob.save();

  // Later, re-enable it and save again.
  backupJob.enable();
  await backupJob.save();
}
Job Status and Monitoring
Check Job Status
// scheduler.jobs() yields a list, so `job` holds every match for this order.
const job = await scheduler.jobs({
  name: 'process-payment',
  'data.orderId': 'order123'
});

if (job.length > 0) {
  // Report on the first match only.
  const [paymentJob] = job;
  const {
    _id,
    name,
    nextRunAt,
    lastRunAt,
    failedAt,
    lockedAt,
    disabled
  } = paymentJob.attrs;

  console.log('Job Status:', {
    id: _id,
    name,
    nextRun: nextRunAt,
    lastRun: lastRunAt,
    // Presence of these fields is reported as a boolean flag.
    failed: Boolean(failedAt),
    locked: Boolean(lockedAt),
    disabled: Boolean(disabled)
  });
}
Monitor Job Progress
// Processor that works through job.attrs.data.items and records its progress
// on the job after every item.
scheduler.define('long-running-task', async (job) => {
  const { items } = job.attrs.data;
  let done = 0;

  while (done < items.length) {
    const index = done;
    await processItem(items[index]);
    done = index + 1;

    // Progress is a custom field stored on the job's attributes.
    job.attrs.progress = {
      completed: done,
      total: items.length,
      percentage: Math.round((done / items.length) * 100)
    };
    await job.save();

    // Call touch() on the first item and then on every tenth one.
    if (index % 10 === 0) {
      await job.touch();
    }
  }
});
// Print the recorded progress of every long-running task that has a lockedAt field.
const progressJobs = await scheduler.jobs({
  name: 'long-running-task',
  lockedAt: { $exists: true }
});

for (const job of progressJobs) {
  const { _id, progress } = job.attrs;
  // Jobs that have not recorded any progress yet are skipped.
  if (!progress) {
    continue;
  }
  console.log(`Job ${_id}: ${progress.percentage}% complete`);
}
Bulk Operations
Update Multiple Jobs
// Builds a fresh filter for email jobs that still use the old template.
const outdatedTemplateFilter = () => ({
  name: 'send-email',
  'data.template': 'old-template'
});

// Option 1: load the matching jobs, edit each one and save it.
const jobs = await scheduler.jobs(outdatedTemplateFilter());
for (const emailJob of jobs) {
  emailJob.attrs.data.template = 'new-template';
  await emailJob.save();
}

// Option 2: issue a single updateMany against the underlying collection.
const templateUpdate = { $set: { 'data.template': 'new-template' } };
const result = await scheduler._collection.updateMany(
  outdatedTemplateFilter(),
  templateUpdate
);
Reschedule Jobs
// Push every pending job (future nextRunAt, no lockedAt field) back by one hour.
const ONE_HOUR_MS = 60 * 60 * 1000;
const pendingJobs = await scheduler.jobs({
  nextRunAt: { $gt: new Date() },
  lockedAt: { $exists: false }
});

for (const pendingJob of pendingJobs) {
  const currentRunMs = pendingJob.attrs.nextRunAt.getTime();
  pendingJob.schedule(new Date(currentRunMs + ONE_HOUR_MS));
  await pendingJob.save();
}
Data Cleanup
scheduler.purge()
Remove jobs without defined processors:
// Remove orphaned jobs (jobs without definitions).
// The awaited result of purge() is reported below as the number of jobs removed.
const removedCount = await scheduler.purge();
console.log(`Removed ${removedCount} orphaned jobs`);
Warning: Only run scheduler.purge() after you've defined all your jobs.
It will remove any job that doesn't have a corresponding define() call.
Custom Cleanup
/**
 * Cancels jobs whose lastFinishedAt is older than the given number of days.
 *
 * @param {number} [daysOld=30] - Age threshold in calendar days.
 * @returns {Promise<*>} Whatever scheduler.cancel() resolves to (logged as the
 *   number of cleaned-up jobs).
 */
async function cleanupOldJobs(daysOld = 30) {
  // Step back whole calendar days from the current local date and time.
  const threshold = new Date();
  threshold.setDate(threshold.getDate() - daysOld);

  const finishedBeforeThreshold = { lastFinishedAt: { $lt: threshold } };
  const removed = await scheduler.cancel(finishedBeforeThreshold);

  console.log(`Cleaned up ${removed} old jobs`);
  return removed;
}
// Register the cleanup as a job of its own and repeat it at a '1 day' interval.
const CLEANUP_JOB_NAME = 'cleanup-old-jobs';
const CLEANUP_RETENTION_DAYS = 30;

scheduler.define(CLEANUP_JOB_NAME, async () => {
  await cleanupOldJobs(CLEANUP_RETENTION_DAYS);
});

scheduler.every('1 day', CLEANUP_JOB_NAME);
Job Statistics
Basic Statistics
/**
 * Computes headline counters for the jobs known to the scheduler.
 *
 * Categories are derived from which timestamp fields a job has:
 * - pending:   nextRunAt is now or later and there is no lockedAt field
 * - running:   lockedAt exists and failedAt does not
 * - failed:    failedAt exists
 * - completed: lastFinishedAt exists and failedAt does not
 *
 * @returns {Promise<{total: number, pending: number, running: number,
 *   failed: number, completed: number, successRate: number}>}
 *   successRate is completed / (completed + failed) as a percentage, or 0
 *   when no job has completed or failed yet.
 */
async function getJobStatistics() {
  // NOTE: each count loads the matching jobs and measures the array length.
  const countMatching = async (query) => {
    const matches = await scheduler.jobs(query);
    return matches.length;
  };

  // The five queries are independent, so run them concurrently.
  const [total, pending, running, failed, completed] = await Promise.all([
    countMatching({}),
    countMatching({
      nextRunAt: { $gte: new Date() },
      lockedAt: { $exists: false },
    }),
    countMatching({
      lockedAt: { $exists: true },
      failedAt: { $exists: false },
    }),
    countMatching({
      failedAt: { $exists: true },
    }),
    countMatching({
      lastFinishedAt: { $exists: true },
      failedAt: { $exists: false },
    }),
  ]);

  // Guard on the actual denominator: jobs can exist (total > 0) while none
  // has completed or failed yet, which would otherwise yield 0 / 0 = NaN.
  const finished = completed + failed;
  const successRate = finished > 0 ? (completed / finished) * 100 : 0;

  return {
    total,
    pending,
    running,
    failed,
    completed,
    successRate,
  };
}
// Compute the overall counters once and print them.
const stats = await getJobStatistics();
console.log('Job Statistics:', stats);
Job Type Breakdown
/**
 * Groups jobs by name and reports per-type totals, failures, completions
 * and success rate, sorted by total in descending order.
 *
 * The grouping runs as an aggregation pipeline on the scheduler's underlying
 * collection (scheduler._collection).
 *
 * @returns {Promise<Array<object>>} One document per job name with the fields
 *   jobType, total, failed, completed and successRate (a percentage; 0 when
 *   the type has neither completed nor failed jobs).
 */
async function getJobTypeStats() {
  // `$exists` is a query operator and is not valid inside an aggregation
  // expression. The expression-level equivalent is to test whether the
  // field's $type is 'missing'.
  const fieldExists = (path) => ({ $ne: [{ $type: path }, 'missing'] });
  const fieldMissing = (path) => ({ $eq: [{ $type: path }, 'missing'] });

  const pipeline = [
    {
      $group: {
        _id: '$name',
        count: { $sum: 1 },
        // A job counts as failed when it carries a failedAt field.
        failed: {
          $sum: {
            $cond: [fieldExists('$failedAt'), 1, 0],
          },
        },
        // A job counts as completed when it has finished and never failed.
        completed: {
          $sum: {
            $cond: [
              {
                $and: [
                  fieldExists('$lastFinishedAt'),
                  fieldMissing('$failedAt'),
                ],
              },
              1,
              0,
            ],
          },
        },
      },
    },
    {
      $project: {
        jobType: '$_id',
        total: '$count',
        failed: 1,
        completed: 1,
        successRate: {
          $cond: [
            // Avoid dividing by zero for types with no finished jobs.
            { $eq: [{ $add: ['$completed', '$failed'] }, 0] },
            0,
            {
              $multiply: [
                { $divide: ['$completed', { $add: ['$completed', '$failed'] }] },
                100,
              ],
            },
          ],
        },
      },
    },
    { $sort: { total: -1 } },
  ];

  return await scheduler._collection.aggregate(pipeline).toArray();
}
// Fetch the per-job-type breakdown and print it.
const typeStats = await getJobTypeStats();
console.log('Job Type Statistics:', typeStats);
Real-time Monitoring
Event-Based Monitoring
/**
 * Keeps running counters of job lifecycle events emitted by a scheduler.
 *
 * Listens for 'start', 'success' and 'fail' and logs one line per event
 * together with the running total for that kind of event.
 */
class JobMonitor {
  /**
   * @param {object} scheduler - Event source exposing on(eventName, listener).
   */
  constructor(scheduler) {
    this.stats = { started: 0, completed: 0, failed: 0 };
    this.setupEventListeners(scheduler);
  }

  /**
   * Subscribes to the three lifecycle events of the given scheduler.
   *
   * @param {object} scheduler - Event source exposing on(eventName, listener).
   */
  setupEventListeners(scheduler) {
    // Bump one counter and log the event with its new running total.
    const record = (counter, label, job) => {
      this.stats[counter] += 1;
      console.log(`${label}: ${job.attrs.name} [${this.stats[counter]} total]`);
    };

    scheduler.on('start', (job) => record('started', 'Started', job));
    scheduler.on('success', (job) => record('completed', 'Completed', job));
    // 'fail' listeners receive the error first and the job second.
    scheduler.on('fail', (_err, job) => record('failed', 'Failed', job));
  }

  /**
   * @returns {{started: number, completed: number, failed: number}} A shallow
   *   copy of the counters, so callers cannot alter the monitor's state.
   */
  getStats() {
    return Object.assign({}, this.stats);
  }

  /** Sets all counters back to zero. */
  reset() {
    this.stats = { started: 0, completed: 0, failed: 0 };
  }
}
// Attach a monitor to the scheduler; its constructor registers the event listeners.
const monitor = new JobMonitor(scheduler);
Health Check
/**
 * Produces a health snapshot of the scheduler.
 *
 * Never rejects: any error raised while querying is converted into an
 * `{ status: 'error' }` result. Every result carries a `timestamp`.
 *
 * @returns {Promise<
 *   {status: 'healthy', stuckJobs: number, recentFailures: number, timestamp: Date} |
 *   {status: 'error', message: string, timestamp: Date}
 * >} stuckJobs counts jobs with lockedAt more than 30 minutes ago;
 *   recentFailures counts jobs with failedAt within the last hour.
 */
async function healthCheck() {
  try {
    // NOTE(review): assumes `_ready` is a boolean-like flag. If it is a
    // Promise, this check can never trigger. Confirm against the scheduler.
    if (!scheduler._ready) {
      return {
        status: 'error',
        message: 'Scheduler not ready',
        timestamp: new Date(),
      };
    }

    const now = Date.now();
    const thirtyMinutesAgo = new Date(now - 30 * 60 * 1000);
    const oneHourAgo = new Date(now - 60 * 60 * 1000);

    // The two lookups are independent, so run them concurrently.
    const [stuckJobs, recentFailed] = await Promise.all([
      // Stuck jobs: lockedAt more than 30 minutes ago.
      scheduler.jobs({ lockedAt: { $lt: thirtyMinutesAgo } }),
      // Jobs whose failedAt falls within the last hour.
      scheduler.jobs({ failedAt: { $gt: oneHourAgo } }),
    ]);

    return {
      status: 'healthy',
      stuckJobs: stuckJobs.length,
      recentFailures: recentFailed.length,
      timestamp: new Date(),
    };
  } catch (error) {
    return {
      status: 'error',
      message: error.message,
      timestamp: new Date(),
    };
  }
}
// Log a health snapshot every 5 minutes.
const HEALTH_CHECK_INTERVAL_MS = 5 * 60 * 1000;

async function reportHealth() {
  const health = await healthCheck();
  console.log('Health Check:', health);
}

setInterval(reportHealth, HEALTH_CHECK_INTERVAL_MS);
Best Practices
1. Regular Cleanup
// Daily housekeeping job: cancels jobs whose lastFinishedAt is over 30 days old.
const SYSTEM_CLEANUP_JOB = 'system-cleanup';
const THIRTY_DAYS_MS = 30 * 24 * 60 * 60 * 1000;

scheduler.define(SYSTEM_CLEANUP_JOB, async () => {
  const thirtyDaysAgo = new Date(Date.now() - THIRTY_DAYS_MS);
  const finishedLongAgo = { lastFinishedAt: { $lt: thirtyDaysAgo } };

  const removed = await scheduler.cancel(finishedLongAgo);
  console.log(`Cleaned up ${removed} old jobs`);
});

scheduler.every('1 day', SYSTEM_CLEANUP_JOB);
2. Graceful Job Updates
// One processor serves both payload generations, so jobs queued before the
// logic changed still run. A payload without a version is treated as version 1.
scheduler.define('updated-job-v2', async (job) => {
  const version = job.attrs.data.version || 1;

  // Version 1 goes to the legacy handler; everything else to the new one.
  const handle = version === 1 ? handleV1Job : handleV2Job;
  return await handle(job);
});
3. Job Monitoring Dashboard
// Create a simple monitoring endpoint that returns the overall statistics,
// the per-type breakdown and the health snapshot as one JSON document.
app.get('/jobs/status', async (req, res, next) => {
  try {
    // The three lookups are independent, so run them concurrently.
    const [stats, typeStats, health] = await Promise.all([
      getJobStatistics(),
      getJobTypeStats(),
      healthCheck(),
    ]);

    res.json({
      overview: stats,
      byType: typeStats,
      health,
    });
  } catch (error) {
    // Forward failures to the framework's error handling instead of leaving
    // the rejection unhandled and the request hanging.
    next(error);
  }
});