Multiple Job Processors
Chronos supports running multiple job processors across different machines or processes, using a MongoDB-based locking mechanism to ensure jobs aren't processed multiple times.
Distributed Processing
Basic Setup
// processor1.js
const Chronos = require('chronos-jobs');

const scheduler = new Chronos({
  db: { address: 'mongodb://shared-db:27017/jobs' },
  name: 'processor-1' // Unique name for tracking
});

// Handler for 'email job': forwards the job's data payload to sendEmail.
scheduler.define('email job', async job => {
  await sendEmail(job.attrs.data);
});

// This file is CommonJS (it uses require), where top-level `await` is a
// syntax error. Start through the returned promise instead and exit with a
// non-zero code if startup fails, so the failure is not silently dropped.
scheduler.start().catch(error => {
  console.error('processor-1 failed to start:', error);
  process.exit(1);
});
// processor2.js
const Chronos = require('chronos-jobs');

const scheduler = new Chronos({
  db: { address: 'mongodb://shared-db:27017/jobs' },
  name: 'processor-2' // Different name
});

// Handler for 'email job': forwards the job's data payload to sendEmail.
scheduler.define('email job', async job => {
  await sendEmail(job.attrs.data);
});

// This file is CommonJS (it uses require), where top-level `await` is a
// syntax error. Start through the returned promise instead and exit with a
// non-zero code if startup fails, so the failure is not silently dropped.
scheduler.start().catch(error => {
  console.error('processor-2 failed to start:', error);
  process.exit(1);
});
Both processors can now safely process jobs from the same queue without conflicts.
Lock Configuration
Configure locking behavior to optimize for your use case:
Lock Lifetime
Specify how long a job stays locked:
// Handler for a task that might take up to 5 minutes to finish.
const runLongTask = async job => {
  await performLongTask();
};

// Hold the lock for 5 minutes; lockLifetime is expressed in milliseconds.
const longTaskOptions = { lockLifetime: 300000 };

scheduler.define('long task', runLongTask, longTaskOptions);
Important: If a job doesn't complete within lockLifetime, it will be unlocked and made available to other processors.
Lock Limits
Control how many jobs each processor can lock:
// Processor-wide lock settings.
const lockSettings = {
  db: { address: 'mongodb://localhost:27017/jobs' },
  lockLimit: 50, // Ceiling on the total number of jobs this processor can lock
  defaultLockLimit: 10 // Ceiling per job type unless a definition overrides it
};
const scheduler = new Chronos(lockSettings);

// 'heavy job' overrides the per-type default: only 5 may be locked at once.
const heavyJobOptions = { lockLimit: 5 };
scheduler.define('heavy job', heavyJobHandler, heavyJobOptions);
Concurrency vs Lock Limits
Understanding the difference is crucial for optimal performance:
- Lock Limit: How many jobs can be locked (reserved) by this processor
- Concurrency: How many jobs can actually run simultaneously
// Execution capacity and reservation depth are configured independently.
const capacitySettings = {
  maxConcurrency: 10, // At most 10 jobs run at the same time
  lockLimit: 50 // Up to 50 jobs may be locked ahead of execution
};
const scheduler = new Chronos(capacitySettings);
This allows processors to "reserve" work while having limited execution capacity.
Clustering Example
Here's a complete example using the Node.js cluster module:
// cluster-scheduler.js
const cluster = require('cluster');
const os = require('os');
// cluster.isMaster is deprecated in favour of cluster.isPrimary (Node.js 16+);
// fall back to the old flag so the example still runs on older runtimes.
const isPrimary = cluster.isPrimary ?? cluster.isMaster;

if (isPrimary) {
  const cpuCount = os.cpus().length;
  console.log(`Master ${process.pid} starting ${cpuCount} workers`);

  // Set once shutdown begins. Without it, the 'exit' handler below would
  // immediately re-fork every worker that the SIGTERM handler kills, and the
  // cluster would never actually stop.
  let shuttingDown = false;

  // Create workers
  for (let i = 0; i < cpuCount; i++) {
    cluster.fork();
  }

  // Replace dead workers (but not workers that exit as part of shutdown)
  cluster.on('exit', (worker, code, signal) => {
    if (shuttingDown) {
      console.log(`Worker ${worker.process.pid} exited during shutdown`);
      return;
    }
    console.log(`Worker ${worker.process.pid} died. Starting new worker...`);
    cluster.fork();
  });

  // Graceful shutdown
  process.on('SIGTERM', () => {
    shuttingDown = true;
    console.log('Master received SIGTERM, shutting down workers...');
    // worker.kill() signals with SIGTERM by default, which each worker
    // handles through its own gracefulShutdown below.
    for (const worker of Object.values(cluster.workers)) {
      worker.kill();
    }
  });
} else {
  // Worker process
  const Chronos = require('chronos-jobs');
  const scheduler = new Chronos({
    db: { address: process.env.MONGODB_URL },
    name: `worker-${process.pid}`,
    maxConcurrency: 5,
    lockLimit: 20
  });

  // Define jobs
  scheduler.define('email job', require('./jobs/email'));
  scheduler.define('image resize', require('./jobs/image-resize'));
  scheduler.define('data backup', require('./jobs/backup'));

  // Graceful shutdown: stop the scheduler, then exit. A failure while
  // stopping is logged and reported through a non-zero exit code instead of
  // surfacing as an unhandled rejection.
  const gracefulShutdown = async () => {
    console.log(`Worker ${process.pid} shutting down...`);
    try {
      await scheduler.stop();
      process.exit(0);
    } catch (error) {
      console.error(`Worker ${process.pid} failed to stop cleanly:`, error);
      process.exit(1);
    }
  };

  process.on('SIGTERM', gracefulShutdown);
  process.on('SIGINT', gracefulShutdown);

  // Start processing; exit non-zero if the scheduler cannot start so the
  // primary's 'exit' handler can replace this worker.
  scheduler
    .start()
    .then(() => {
      console.log(`Worker ${process.pid} started processing jobs`);
    })
    .catch(error => {
      console.error(`Worker ${process.pid} failed to start:`, error);
      process.exit(1);
    });
}
Docker Scaling
Scale processors using Docker:
# docker-compose.yml
# Nesting restored: YAML structure is defined by indentation, so the keys
# below must be indented under their parent for this file to be valid.
version: '3.8'

services:
  mongodb:
    image: mongo:5
    ports:
      - "27017:27017"
    volumes:
      - mongo-data:/data/db

  job-processor:
    build: .
    depends_on:
      - mongodb
    environment:
      - MONGODB_URL=mongodb://mongodb:27017/jobs
      - NODE_ENV=production
    deploy:
      replicas: 3 # Run 3 instances
    restart: unless-stopped

volumes:
  mongo-data:
Health Monitoring
Monitor processor health across multiple instances:
const Chronos = require('chronos-jobs');
const scheduler = new Chronos({
db: { address: process.env.MONGODB_URL },
name: `processor-${os.hostname()}-${process.pid}`
});
// Health check endpoint
app.get('/health', async (req, res) => {
try {
const stats = {
processorName: scheduler._name,
isRunning: scheduler._processInterval !== null,
lockedJobs: scheduler._lockedJobs.length,
runningJobs: scheduler._runningJobs.length,
uptime: process.uptime(),
memory: process.memoryUsage()
};
res.json({ status: 'healthy', stats });
} catch (error) {
res.status(500).json({ status: 'unhealthy', error: error.message });
}
});
Best Practices
1. Unique Processor Names
Always use unique names for identification:
// Combine host name and process id to build this processor's name.
const processorName = `${os.hostname()}-${process.pid}`;
const scheduler = new Chronos({ name: processorName });
2. Appropriate Lock Lifetimes
Set lock lifetimes based on actual job duration:
// Lock lifetimes in milliseconds, chosen per job type.
const QUICK_JOB_LOCK_MS = 30000; // 30 seconds
const SLOW_JOB_LOCK_MS = 600000; // 10 minutes

scheduler.define('quick-job', handler, { lockLifetime: QUICK_JOB_LOCK_MS });
scheduler.define('slow-job', handler, { lockLifetime: SLOW_JOB_LOCK_MS });
3. Monitor Dead Locks
Clean up jobs locked by dead processors:
// Cleanup script
/**
 * Clears the lock on jobs that have been locked for longer than
 * `maxLockAgeMs` and have no `lastFinishedAt` value.
 *
 * A failure to save one job is logged and does not stop the remaining jobs
 * from being unlocked. A failure of the query itself still rejects.
 *
 * NOTE(review): the `lastFinishedAt: { $exists: false }` filter only matches
 * jobs that have never finished; a job with an earlier completed run would
 * not be selected. Confirm this is the intended scope.
 *
 * @param {number} [maxLockAgeMs=300000] Minimum lock age in milliseconds
 *   before a job is treated as dead. This should presumably exceed the
 *   longest lockLifetime configured for any job, otherwise work that is
 *   still running could be unlocked.
 * @returns {Promise<number>} Number of jobs that were unlocked and saved.
 */
const cleanupDeadLocks = async (maxLockAgeMs = 5 * 60 * 1000) => {
  const cutoff = new Date(Date.now() - maxLockAgeMs);
  const deadJobs = await scheduler.jobs({
    lockedAt: { $lt: cutoff },
    lastFinishedAt: { $exists: false }
  });

  let unlockedCount = 0;
  for (const job of deadJobs) {
    console.log(`Unlocking dead job: ${job.attrs.name} (${job.attrs._id})`);
    try {
      job.attrs.lockedAt = null;
      await job.save();
      unlockedCount += 1;
    } catch (error) {
      // Keep going: one job that cannot be saved must not block the rest.
      console.error(
        `Failed to unlock job ${job.attrs.name} (${job.attrs._id}): ${error.message}`
      );
    }
  }
  return unlockedCount;
};
// Run cleanup every 5 minutes.
// setInterval discards the promise returned by the async cleanup, so a
// rejection would otherwise become an unhandled rejection (which terminates
// the process by default on current Node.js versions). Handle it here.
setInterval(() => {
  cleanupDeadLocks().catch(error => {
    console.error('Dead lock cleanup failed:', error);
  });
}, 5 * 60 * 1000);
4. Load Balancing Jobs
Distribute different job types across processors:
// Processor A - handles email jobs
const emailProcessor = new Chronos({ name: 'email-processor' });
emailProcessor.define('send email', emailHandler);

// Processor B - handles image jobs
const imageProcessor = new Chronos({ name: 'image-processor' });

// Job name -> handler, registered in the order listed.
const imageJobHandlers = [
  ['resize image', imageHandler],
  ['generate thumbnail', thumbnailHandler]
];
for (const [jobName, jobHandler] of imageJobHandlers) {
  imageProcessor.define(jobName, jobHandler);
}