Skip to main content

Defining Jobs

Learn how to define job processors that handle your background tasks efficiently and reliably.

Basic Job Definition

Use scheduler.define() to specify what happens when a job executes:

scheduler.define('job-name', async (job) => {
// Your job logic here
console.log('Job executed with data:', job.attrs.data);
});

Job Function Signatures

Chronos supports multiple function signatures for different use cases:

scheduler.define('async-job', async (job) => {
const result = await someAsyncOperation();
return result; // Optional return value
});

Promise-based

scheduler.define('promise-job', (job) => {
return someAsyncOperation()
.then(result => {
console.log('Job completed');
return result;
});
});

Callback-based (Legacy)

scheduler.define('callback-job', (job, done) => {
someAsyncOperation((err, result) => {
if (err) {
return done(err);
}
done(); // Must call done() when finished
});
});

Synchronous

scheduler.define('sync-job', (job) => {
// Synchronous operations
const result = processSomeData(job.attrs.data);
return result;
});

Job Options

Configure job behavior with options:

scheduler.define('configured-job', jobFunction, {
concurrency: 5, // Max concurrent instances
lockLifetime: 60000, // Lock timeout (60 seconds)
priority: 'high', // Job priority
shouldSaveResult: true, // Save return value to database
lockLimit: 10 // Max locked instances
});

Concurrency Control

Limit how many instances of a job can run simultaneously:

// Only 2 instances of this job can run at once
scheduler.define('resource-intensive', async (job) => {
await heavyComputation();
}, { concurrency: 2 });

// Single instance only (no concurrency)
scheduler.define('exclusive-job', async (job) => {
await criticalOperation();
}, { concurrency: 1 });

Lock Management

Control how long jobs stay locked:

scheduler.define('long-running-job', async (job) => {
// This job might take up to 5 minutes
await longProcess();
}, {
lockLifetime: 300000 // 5 minutes
});

Priority Levels

Set job execution priority:

// High priority - runs before normal jobs
scheduler.define('urgent-notification', async (job) => {
await sendUrgentAlert();
}, { priority: 'high' });

// Low priority - runs after normal jobs
scheduler.define('cleanup-task', async (job) => {
await cleanupOldFiles();
}, { priority: 'low' });

Priority values:

  • highest (20)
  • high (10)
  • normal (0) - default
  • low (-10)
  • lowest (-20)
  • Custom numbers: any integer value

Result Persistence

Save job return values to the database:

scheduler.define('data-processor', async (job) => {
const result = await processData(job.attrs.data);
return result; // Saved to job.attrs.result if shouldSaveResult is true
}, {
shouldSaveResult: true
});

// Later, access the result
const jobs = await scheduler.jobs({ name: 'data-processor' });
console.log('Job result:', jobs[0].attrs.result);

Accessing Job Data

Jobs receive data through the job.attrs object:

scheduler.define('email-sender', async (job) => {
const { to, subject, body, priority } = job.attrs.data;
const jobId = job.attrs._id;
const scheduledAt = job.attrs.nextRunAt;

console.log(`Sending email ${jobId} scheduled for ${scheduledAt}`);
await emailService.send({ to, subject, body });
});

Job Attributes

Key job attributes available in job.attrs:

{
_id: ObjectId, // Unique job identifier
name: String, // Job name
data: Object, // Job data payload
type: String, // 'normal' or 'single'
priority: Number, // Job priority (-20 to 20)
nextRunAt: Date, // When job should run
lastRunAt: Date, // When job last ran
lastFinishedAt: Date, // When job last completed
lockedAt: Date, // When job was locked for processing
failedAt: Date, // When job failed (if applicable)
failReason: String, // Failure reason
repeatInterval: String, // Repeat pattern (if recurring)
result: Any // Job return value (if shouldSaveResult: true)
}

Error Handling

Implement robust error handling in your jobs:

Basic Error Handling

scheduler.define('api-call', async (job) => {
try {
const response = await fetch(job.attrs.data.url);
if (!response.ok) {
throw new Error(`HTTP ${response.status}: ${response.statusText}`);
}
return await response.json();
} catch (error) {
console.error('API call failed:', error.message);
throw error; // Re-throw to mark job as failed
}
});

Custom Failure Handling

scheduler.define('payment-processor', async (job) => {
const { orderId, amount } = job.attrs.data;

try {
const result = await processPayment(orderId, amount);
return result;
} catch (error) {
// Custom failure reason
job.fail(`Payment failed for order ${orderId}: ${error.message}`);

// Schedule retry for temporary failures
if (error.code === 'TEMPORARY_FAILURE') {
await scheduler.schedule('in 5 minutes', 'payment-processor', {
orderId,
amount,
retryCount: (job.attrs.data.retryCount || 0) + 1
});
}

throw error; // Still mark original job as failed
}
});

Retry Logic

scheduler.define('resilient-job', async (job) => {
const maxRetries = 3;
const retryCount = job.attrs.data.retryCount || 0;

try {
await unreliableOperation();
} catch (error) {
if (retryCount < maxRetries) {
// Schedule retry with exponential backoff
const delay = Math.pow(2, retryCount) * 1000; // 1s, 2s, 4s
await scheduler.schedule(`in ${delay} milliseconds`, 'resilient-job', {
...job.attrs.data,
retryCount: retryCount + 1
});
return; // Don't fail the original job
}

// Max retries exceeded
throw new Error(`Job failed after ${maxRetries} retries: ${error.message}`);
}
});

Long-Running Jobs

For jobs that take a long time, manage the lock lifetime:

scheduler.define('file-processor', async (job) => {
const files = job.attrs.data.files;

for (let i = 0; i < files.length; i++) {
await processFile(files[i]);

// Extend lock every 10 files
if (i % 10 === 0) {
await job.touch(); // Reset lock timeout
}
}
}, {
lockLifetime: 600000 // 10 minutes initial lock
});

Job Validation

Validate job data before processing:

function validateEmailData(data) {
const required = ['to', 'subject', 'body'];
for (const field of required) {
if (!data[field]) {
throw new Error(`Missing required field: ${field}`);
}
}

if (!/^[^\s@]+@[^\s@]+\.[^\s@]+$/.test(data.to)) {
throw new Error('Invalid email address');
}
}

scheduler.define('send-email', async (job) => {
validateEmailData(job.attrs.data);

const { to, subject, body } = job.attrs.data;
await emailService.send({ to, subject, body });
});

Dynamic Job Definitions

Define jobs dynamically based on configuration:

const emailTemplates = {
welcome: { subject: 'Welcome!', template: 'welcome.html' },
reset: { subject: 'Password Reset', template: 'reset.html' }
};

Object.entries(emailTemplates).forEach(([type, config]) => {
scheduler.define(`send-${type}-email`, async (job) => {
const { to, data } = job.attrs.data;

const html = await renderTemplate(config.template, data);
await emailService.send({
to,
subject: config.subject,
html
});
});
});

TypeScript Support

Use TypeScript for better type safety:

interface EmailJobData {
to: string;
subject: string;
body: string;
template?: string;
}

scheduler.define('typed-email', async (job: any) => {
const data: EmailJobData = job.attrs.data;

// Full type checking and IntelliSense
await emailService.send({
to: data.to,
subject: data.subject,
html: data.body
});
});

Job Definition Patterns

Factory Pattern

function createApiSyncJob(serviceName, endpoint) {
return async (job) => {
const { params } = job.attrs.data;
console.log(`Syncing ${serviceName} from ${endpoint}`);

const response = await fetch(endpoint, {
method: 'POST',
body: JSON.stringify(params)
});

return await response.json();
};
}

scheduler.define('sync-users', createApiSyncJob('Users', '/api/users/sync'));
scheduler.define('sync-orders', createApiSyncJob('Orders', '/api/orders/sync'));

Class-Based Jobs

class EmailJob {
constructor(emailService, templateEngine) {
this.emailService = emailService;
this.templateEngine = templateEngine;
}

async execute(job) {
const { to, templateName, data } = job.attrs.data;

const html = await this.templateEngine.render(templateName, data);
return await this.emailService.send({ to, html });
}
}

const emailJob = new EmailJob(emailService, templateEngine);
scheduler.define('send-templated-email', emailJob.execute.bind(emailJob));

Best Practices

1. Keep Jobs Focused

// Good: Single responsibility
scheduler.define('send-welcome-email', async (job) => {
await sendWelcomeEmail(job.attrs.data);
});

scheduler.define('create-user-profile', async (job) => {
await createUserProfile(job.attrs.data);
});

// Avoid: Multiple responsibilities
scheduler.define('user-onboarding', async (job) => {
await sendWelcomeEmail(job.attrs.data);
await createUserProfile(job.attrs.data);
await setupUserPreferences(job.attrs.data);
});

2. Make Jobs Idempotent

scheduler.define('update-user-stats', async (job) => {
const { userId, date } = job.attrs.data;

// Check if already processed
const existing = await UserStats.findOne({ userId, date });
if (existing) {
return existing; // Already processed
}

// Process only if not already done
const stats = await calculateUserStats(userId, date);
return await UserStats.create({ userId, date, ...stats });
});

3. Use Meaningful Names

// Good: Descriptive names
scheduler.define('send-password-reset-email', handler);
scheduler.define('process-credit-card-payment', handler);
scheduler.define('cleanup-expired-sessions', handler);

// Avoid: Generic names
scheduler.define('email-job', handler);
scheduler.define('process-data', handler);
scheduler.define('cleanup', handler);

4. Handle Edge Cases

scheduler.define('file-upload-processor', async (job) => {
const { fileId } = job.attrs.data;

// Handle missing file
const file = await File.findById(fileId);
if (!file) {
console.log(`File ${fileId} not found, skipping`);
return;
}

// Handle already processed
if (file.processed) {
console.log(`File ${fileId} already processed`);
return;
}

// Process file
await processFile(file);

// Mark as processed
await File.updateOne({ _id: fileId }, { processed: true });
});