Defining Jobs
Learn how to define job processors that handle your background tasks efficiently and reliably.
Basic Job Definition
Use scheduler.define() to specify what happens when a job executes:
scheduler.define('job-name', async (job) => {
// Your job logic here
console.log('Job executed with data:', job.attrs.data);
});
Job Function Signatures
Chronos supports multiple function signatures for different use cases:
Async/Await (Recommended)
scheduler.define('async-job', async (job) => {
const result = await someAsyncOperation();
return result; // Optional return value
});
Promise-based
scheduler.define('promise-job', (job) => {
return someAsyncOperation()
.then(result => {
console.log('Job completed');
return result;
});
});
Callback-based (Legacy)
scheduler.define('callback-job', (job, done) => {
  someAsyncOperation((err, result) => {
    if (err) {
      return done(err);
    }
    done(); // Must call done() when finished
  });
});
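If the underlying operation only exposes a Node-style callback API, one way to move off the legacy signature is to wrap the operation in a promise and define the job as an async function. The sketch below assumes someAsyncOperation follows the (err, result) callback convention; toPromise is a hypothetical helper written for illustration (Node's built-in util.promisify serves the same purpose), and the stand-in operation exists only so the snippet runs on its own.

```javascript
// Hypothetical helper: wraps a Node-style callback function so it returns a promise.
function toPromise(fn) {
  return (...args) =>
    new Promise((resolve, reject) => {
      fn(...args, (err, result) => (err ? reject(err) : resolve(result)));
    });
}

// Stand-in for the callback-style operation used above (assumption for this demo).
function someAsyncOperation(callback) {
  setTimeout(() => callback(null, 'done'), 0);
}

const someAsyncOperationAsync = toPromise(someAsyncOperation);

// With a real scheduler instance, the job could then use the async signature:
// scheduler.define('callback-job', async (job) => {
//   return await someAsyncOperationAsync();
// });
```

With this wrapper there is no done() call to forget: a rejected promise marks the failure and a resolved one marks completion.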
Synchronous
scheduler.define('sync-job', (job) => {
// Synchronous operations
const result = processSomeData(job.attrs.data);
return result;
});
Job Options
Configure job behavior with options:
scheduler.define('configured-job', jobFunction, {
concurrency: 5, // Max concurrent instances
lockLifetime: 60000, // Lock timeout (60 seconds)
priority: 'high', // Job priority
shouldSaveResult: true, // Save return value to database
lockLimit: 10 // Max locked instances
});
Concurrency Control
Limit how many instances of a job can run simultaneously:
// Only 2 instances of this job can run at once
scheduler.define('resource-intensive', async (job) => {
await heavyComputation();
}, { concurrency: 2 });
// Single instance only (no concurrency)
scheduler.define('exclusive-job', async (job) => {
await criticalOperation();
}, { concurrency: 1 });
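To make the idea of a concurrency limit concrete, here is a minimal, library-independent sketch of a limiter that starts at most `limit` tasks at once and queues the rest. It is not how Chronos implements the concurrency option; createLimiter and its methods are hypothetical names used only for illustration.

```javascript
// Illustrative limiter: starts at most `limit` tasks at once and queues the rest.
function createLimiter(limit) {
  let active = 0;
  const queue = [];

  function startNext() {
    if (active >= limit || queue.length === 0) return;
    active += 1;
    const { task, resolve, reject } = queue.shift();
    let running;
    try {
      running = Promise.resolve(task()); // start the task right away
    } catch (err) {
      running = Promise.reject(err); // a synchronous throw becomes a rejection
    }
    running.then(resolve, reject).finally(() => {
      active -= 1; // free the slot, then start the next queued task
      startNext();
    });
  }

  return {
    // Queue a task (a function returning a promise); resolves with its result.
    run(task) {
      return new Promise((resolve, reject) => {
        queue.push({ task, resolve, reject });
        startNext();
      });
    },
    // Snapshot of running and waiting task counts.
    stats() {
      return { active, queued: queue.length };
    }
  };
}
```

With a limit of 2, a third task submitted while two are still running waits in the queue until one of them settles, which is the behavior the concurrency option describes for job instances.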
Lock Management
Control how long jobs stay locked:
scheduler.define('long-running-job', async (job) => {
// This job might take up to 5 minutes
await longProcess();
}, {
lockLifetime: 300000 // 5 minutes
});
Priority Levels
Set job execution priority:
// High priority - runs before normal jobs
scheduler.define('urgent-notification', async (job) => {
await sendUrgentAlert();
}, { priority: 'high' });
// Low priority - runs after normal jobs
scheduler.define('cleanup-task', async (job) => {
await cleanupOldFiles();
}, { priority: 'low' });
Priority values:
- highest (20)
- high (10)
- normal (0), the default
- low (-10)
- lowest (-20)
- Custom numbers: any integer value
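The mapping between named levels and numbers can be sketched as a small lookup. The numeric values come from the list above; resolvePriority is a hypothetical helper, and throwing on an unknown name is an assumption made for this sketch rather than documented library behavior.

```javascript
// Named levels and their numeric values, as listed above.
const PRIORITY_LEVELS = {
  highest: 20,
  high: 10,
  normal: 0,
  low: -10,
  lowest: -20
};

// Hypothetical helper: resolve a priority option (name or integer) to a number.
function resolvePriority(priority) {
  if (Number.isInteger(priority)) return priority;
  if (Object.prototype.hasOwnProperty.call(PRIORITY_LEVELS, priority)) {
    return PRIORITY_LEVELS[priority];
  }
  // Assumption for this sketch: reject anything that is neither a known name nor an integer.
  throw new Error(`Unknown priority: ${priority}`);
}

// Order hypothetical job descriptions so higher priorities come first.
const ordered = [
  { name: 'cleanup-task', priority: 'low' },
  { name: 'urgent-notification', priority: 'high' },
  { name: 'custom-job', priority: 15 }
].sort((a, b) => resolvePriority(b.priority) - resolvePriority(a.priority));

console.log(ordered.map((j) => j.name));
```

A custom integer such as 15 slots between high (10) and highest (20), which is the main reason to use numbers instead of names.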
Result Persistence
Save job return values to the database:
scheduler.define('data-processor', async (job) => {
const result = await processData(job.attrs.data);
return result; // Saved to job.attrs.result if shouldSaveResult is true
}, {
shouldSaveResult: true
});
// Later, access the result
const jobs = await scheduler.jobs({ name: 'data-processor' });
console.log('Job result:', jobs[0].attrs.result);
Accessing Job Data
Jobs receive data through the job.attrs object:
scheduler.define('email-sender', async (job) => {
const { to, subject, body, priority } = job.attrs.data;
const jobId = job.attrs._id;
const scheduledAt = job.attrs.nextRunAt;
console.log(`Sending email ${jobId} scheduled for ${scheduledAt}`);
await emailService.send({ to, subject, body });
});
Job Attributes
Key job attributes available in job.attrs:
{
  _id: ObjectId,           // Unique job identifier
  name: String,            // Job name
  data: Object,            // Job data payload
  type: String,            // 'normal' or 'single'
  priority: Number,        // Job priority as a number (named levels span -20 to 20)
  nextRunAt: Date,         // When job should run
  lastRunAt: Date,         // When job last ran
  lastFinishedAt: Date,    // When job last completed
  lockedAt: Date,          // When job was locked for processing
  failedAt: Date,          // When job failed (if applicable)
  failReason: String,      // Failure reason
  repeatInterval: String,  // Repeat pattern (if recurring)
  result: Any              // Job return value (if shouldSaveResult: true)
}
Error Handling
Implement robust error handling in your jobs:
Basic Error Handling
scheduler.define('api-call', async (job) => {
  try {
    const response = await fetch(job.attrs.data.url);
    if (!response.ok) {
      throw new Error(`HTTP ${response.status}: ${response.statusText}`);
    }
    return await response.json();
  } catch (error) {
    console.error('API call failed:', error.message);
    throw error; // Re-throw to mark job as failed
  }
});
Custom Failure Handling
scheduler.define('payment-processor', async (job) => {
  const { orderId, amount } = job.attrs.data;
  try {
    const result = await processPayment(orderId, amount);
    return result;
  } catch (error) {
    // Custom failure reason
    job.fail(`Payment failed for order ${orderId}: ${error.message}`);
    // Schedule retry for temporary failures
    if (error.code === 'TEMPORARY_FAILURE') {
      await scheduler.schedule('in 5 minutes', 'payment-processor', {
        orderId,
        amount,
        retryCount: (job.attrs.data.retryCount || 0) + 1
      });
    }
    throw error; // Still mark original job as failed
  }
});
Retry Logic
scheduler.define('resilient-job', async (job) => {
  const maxRetries = 3;
  const retryCount = job.attrs.data.retryCount || 0;
  try {
    await unreliableOperation();
  } catch (error) {
    if (retryCount < maxRetries) {
      // Schedule retry with exponential backoff
      const delay = Math.pow(2, retryCount) * 1000; // 1s, 2s, 4s
      await scheduler.schedule(`in ${delay} milliseconds`, 'resilient-job', {
        ...job.attrs.data,
        retryCount: retryCount + 1
      });
      return; // Don't fail the original job
    }
    // Max retries exceeded
    throw new Error(`Job failed after ${maxRetries} retries: ${error.message}`);
  }
});
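The backoff arithmetic above can be pulled into a small helper so the delay schedule is easy to inspect and cap. This is a sketch under stated assumptions: backoffDelayMs, its 1-second base, and its 60-second cap are hypothetical choices for illustration, not Chronos settings.

```javascript
// Hypothetical helper: delay before retry number `retryCount` (0-based).
// The delay doubles on each retry and never exceeds `maxMs`.
function backoffDelayMs(retryCount, baseMs = 1000, maxMs = 60000) {
  return Math.min(baseMs * 2 ** retryCount, maxMs);
}

// Delays for the first four retries: 1000, 2000, 4000, 8000 milliseconds.
console.log([0, 1, 2, 3].map((n) => backoffDelayMs(n)));

// Inside the job, the result would replace the inline Math.pow expression:
// const delay = backoffDelayMs(retryCount);
```

Capping the delay keeps a high retry count from pushing the next attempt unreasonably far into the future.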
Long-Running Jobs
For jobs that take a long time, manage the lock lifetime:
scheduler.define('file-processor', async (job) => {
  const files = job.attrs.data.files;
  for (let i = 0; i < files.length; i++) {
    await processFile(files[i]);
    // Extend the lock after every 10th file
    if ((i + 1) % 10 === 0) {
      await job.touch(); // Reset lock timeout
    }
  }
}, {
  lockLifetime: 600000 // 10 minutes initial lock
});
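Counting files works when each file takes a similar amount of time. When durations vary, refreshing the lock on a time basis may be more predictable. The sketch below relies only on job.touch() as shown above; createLockRefresher and its injectable clock are hypothetical names introduced for illustration.

```javascript
// Hypothetical helper: returns a function that calls job.touch() only when
// at least `intervalMs` has elapsed since the previous refresh.
// `now` is injectable so the timing logic can be tested without waiting.
function createLockRefresher(job, intervalMs, now = Date.now) {
  let lastTouch = now();
  return async function refreshIfDue() {
    if (now() - lastTouch < intervalMs) return false; // not due yet
    lastTouch = now(); // record first so overlapping calls do not double-touch
    await job.touch();
    return true;
  };
}

// Possible use inside a long-running job (names from the example above):
// const refreshIfDue = createLockRefresher(job, 60000);
// for (const file of files) {
//   await processFile(file);
//   await refreshIfDue();
// }
```

The refresh interval should sit comfortably below lockLifetime so that one slow step does not let the lock expire between refreshes.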
Job Validation
Validate job data before processing:
function validateEmailData(data) {
  const required = ['to', 'subject', 'body'];
  for (const field of required) {
    if (!data[field]) {
      throw new Error(`Missing required field: ${field}`);
    }
  }
  if (!/^[^\s@]+@[^\s@]+\.[^\s@]+$/.test(data.to)) {
    throw new Error('Invalid email address');
  }
}
scheduler.define('send-email', async (job) => {
validateEmailData(job.attrs.data);
const { to, subject, body } = job.attrs.data;
await emailService.send({ to, subject, body });
});
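The validator above stops at the first problem it finds. When failure reasons are reviewed later, it can help to report every problem at once. The following is a sketch of that variant using the same rules; collectEmailDataErrors is a hypothetical name, and treating non-object data as a single error is an assumption added here.

```javascript
// Hypothetical variant of validateEmailData that returns every problem found
// instead of throwing on the first one.
function collectEmailDataErrors(data) {
  if (data === null || typeof data !== 'object') {
    return ['Job data must be an object'];
  }
  const errors = [];
  for (const field of ['to', 'subject', 'body']) {
    if (!data[field]) errors.push(`Missing required field: ${field}`);
  }
  // Only check the address format when an address was actually supplied.
  if (data.to && !/^[^\s@]+@[^\s@]+\.[^\s@]+$/.test(data.to)) {
    errors.push('Invalid email address');
  }
  return errors;
}

// Inside a job, the collected messages could be combined into one error:
// const errors = collectEmailDataErrors(job.attrs.data);
// if (errors.length > 0) throw new Error(errors.join('; '));
```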
Dynamic Job Definitions
Define jobs dynamically based on configuration:
const emailTemplates = {
welcome: { subject: 'Welcome!', template: 'welcome.html' },
reset: { subject: 'Password Reset', template: 'reset.html' }
};
Object.entries(emailTemplates).forEach(([type, config]) => {
scheduler.define(`send-${type}-email`, async (job) => {
const { to, data } = job.attrs.data;
const html = await renderTemplate(config.template, data);
await emailService.send({
to,
subject: config.subject,
html
});
});
});
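Because the job names are generated, it can be worth checking which names a configuration actually registers. A minimal sketch, using a hypothetical recording stand-in for the scheduler (createRecordingScheduler is not part of Chronos):

```javascript
// Minimal stand-in for the scheduler: records each definition by name.
function createRecordingScheduler() {
  const definitions = new Map();
  return {
    define(name, handler) {
      definitions.set(name, handler);
    },
    definedNames() {
      return [...definitions.keys()];
    }
  };
}

// Same configuration shape as the example above.
const templateConfig = {
  welcome: { subject: 'Welcome!', template: 'welcome.html' },
  reset: { subject: 'Password Reset', template: 'reset.html' }
};

const recorder = createRecordingScheduler();
Object.keys(templateConfig).forEach((type) => {
  recorder.define(`send-${type}-email`, async () => {}); // placeholder handler
});

console.log(recorder.definedNames()); // [ 'send-welcome-email', 'send-reset-email' ]
```

Code that schedules these jobs has to use exactly these generated names, so a check like this catches typos in the configuration keys early.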
TypeScript Support
Use TypeScript for better type safety:
interface EmailJobData {
to: string;
subject: string;
body: string;
template?: string;
}
scheduler.define('typed-email', async (job: any) => {
  const data: EmailJobData = job.attrs.data;
  // `data` is type-checked against EmailJobData from here on
  // (the job object itself is still typed as `any`)
  await emailService.send({
    to: data.to,
    subject: data.subject,
    html: data.body
  });
});
Job Definition Patterns
Factory Pattern
function createApiSyncJob(serviceName, endpoint) {
  return async (job) => {
    const { params } = job.attrs.data;
    console.log(`Syncing ${serviceName} from ${endpoint}`);
    const response = await fetch(endpoint, {
      method: 'POST',
      // Declare the JSON body so the server parses it correctly
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify(params)
    });
    return await response.json();
  };
}
scheduler.define('sync-users', createApiSyncJob('Users', '/api/users/sync'));
scheduler.define('sync-orders', createApiSyncJob('Orders', '/api/orders/sync'));
Class-Based Jobs
class EmailJob {
constructor(emailService, templateEngine) {
this.emailService = emailService;
this.templateEngine = templateEngine;
}
async execute(job) {
const { to, templateName, data } = job.attrs.data;
const html = await this.templateEngine.render(templateName, data);
return await this.emailService.send({ to, html });
}
}
const emailJob = new EmailJob(emailService, templateEngine);
scheduler.define('send-templated-email', emailJob.execute.bind(emailJob));
Best Practices
1. Keep Jobs Focused
// Good: Single responsibility
scheduler.define('send-welcome-email', async (job) => {
await sendWelcomeEmail(job.attrs.data);
});
scheduler.define('create-user-profile', async (job) => {
await createUserProfile(job.attrs.data);
});
// Avoid: Multiple responsibilities
scheduler.define('user-onboarding', async (job) => {
await sendWelcomeEmail(job.attrs.data);
await createUserProfile(job.attrs.data);
await setupUserPreferences(job.attrs.data);
});
2. Make Jobs Idempotent
scheduler.define('update-user-stats', async (job) => {
const { userId, date } = job.attrs.data;
// Check if already processed
const existing = await UserStats.findOne({ userId, date });
if (existing) {
return existing; // Already processed
}
// Process only if not already done
const stats = await calculateUserStats(userId, date);
return await UserStats.create({ userId, date, ...stats });
});
3. Use Meaningful Names
// Good: Descriptive names
scheduler.define('send-password-reset-email', handler);
scheduler.define('process-credit-card-payment', handler);
scheduler.define('cleanup-expired-sessions', handler);
// Avoid: Generic names
scheduler.define('email-job', handler);
scheduler.define('process-data', handler);
scheduler.define('cleanup', handler);
4. Handle Edge Cases
scheduler.define('file-upload-processor', async (job) => {
const { fileId } = job.attrs.data;
// Handle missing file
const file = await File.findById(fileId);
if (!file) {
console.log(`File ${fileId} not found, skipping`);
return;
}
// Handle already processed
if (file.processed) {
console.log(`File ${fileId} already processed`);
return;
}
// Process file
await processFile(file);
// Mark as processed
await File.updateOne({ _id: fileId }, { processed: true });
});