Multiple Job Processors

Chronos supports running multiple job processors across different machines or processes, using a MongoDB-based locking mechanism to ensure jobs aren't processed multiple times.

Distributed Processing

Basic Setup

// processor1.js
const Chronos = require('chronos-jobs');

const scheduler = new Chronos({
  db: { address: 'mongodb://shared-db:27017/jobs' },
  name: 'processor-1' // Unique name for tracking
});

scheduler.define('email job', async job => {
  await sendEmail(job.attrs.data);
});

scheduler.start(); // returns a promise; await it if you are in an async context

// processor2.js
const Chronos = require('chronos-jobs');

const scheduler = new Chronos({
  db: { address: 'mongodb://shared-db:27017/jobs' },
  name: 'processor-2' // Different name so locks can be attributed per processor
});

scheduler.define('email job', async job => {
  await sendEmail(job.attrs.data);
});

scheduler.start();

Both processors can now safely process jobs from the same queue without conflicts.
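Under the hood, this kind of coordination relies on an atomic check-and-set: a processor claims a job only if no one else holds it. Here is a minimal sketch of that pattern in plain JavaScript, with an in-memory Map standing in for MongoDB (in the real store, a single findOneAndUpdate that sets lockedAt only when it is null provides the atomicity; the names below are illustrative, not Chronos internals):

```javascript
// Minimal sketch of the claim-if-unlocked pattern behind distributed locking.
// jobs: jobId -> { name, lockedAt, lockedBy }
const jobs = new Map();

function tryLock(jobId, processorName, now = Date.now()) {
  const job = jobs.get(jobId);
  if (!job || job.lockedAt !== null) return null; // someone else holds the lock
  job.lockedAt = now;        // in MongoDB this check-and-set is one atomic
  job.lockedBy = processorName; // findOneAndUpdate, so two processors can't both win
  return job;
}

jobs.set('job-1', { name: 'email job', lockedAt: null, lockedBy: null });
const a = tryLock('job-1', 'processor-1');
const b = tryLock('job-1', 'processor-2');
// a holds the job; b is null because processor-1 claimed it first
```

The same shape scales to any number of processors: each one races to flip lockedAt, and exactly one wins per job.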

Lock Configuration

Configure locking behavior to optimize for your use case:

Lock Lifetime

Specify how long a job stays locked:

scheduler.define('long task', async job => {
  // This task might take up to 5 minutes
  await performLongTask();
}, {
  lockLifetime: 300000 // 5 minutes in milliseconds
});

Important: If a job doesn't complete within lockLifetime, its lock expires and other processors can pick it up, which may cause the job to run twice. Size lockLifetime generously for long-running work.
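A practical rule of thumb is to set lockLifetime with comfortable headroom over the slowest run you expect, so a healthy-but-slow job isn't unlocked mid-flight. A small sketch (the helper name is ours, not part of Chronos):

```javascript
// Pick a lockLifetime with headroom over the worst-case job duration.
// expectedWorstCaseMs: your measured worst-case (e.g. p99) runtime.
function lockLifetimeFor(expectedWorstCaseMs, headroomFactor = 2) {
  return Math.ceil(expectedWorstCaseMs * headroomFactor);
}

// A task that can take up to 5 minutes gets a 10-minute lock:
const lifetime = lockLifetimeFor(5 * 60 * 1000); // 600000 ms
```

Double the worst case is a reasonable default: short enough that crashed processors release work quickly, long enough that slow runs are not duplicated.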

Lock Limits

Control how many jobs each processor can lock:

const scheduler = new Chronos({
  db: { address: 'mongodb://localhost:27017/jobs' },
  lockLimit: 50, // Total jobs this processor can lock
  defaultLockLimit: 10 // Default per job type
});

// Override for a specific job type
scheduler.define('heavy job', heavyJobHandler, {
  lockLimit: 5 // Only lock 5 of these at once
});

Concurrency vs Lock Limits

Understanding the difference is crucial for optimal performance:

  • Lock Limit: How many jobs can be locked (reserved) by this processor
  • Concurrency: How many jobs can actually run simultaneously

const scheduler = new Chronos({
  maxConcurrency: 10, // Run at most 10 jobs at once
  lockLimit: 50 // But lock up to 50 jobs in advance
});

This allows processors to "reserve" work while having limited execution capacity.
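To see the interplay, here is an illustrative model of one scheduling cycle (our own sketch, not Chronos internals): the processor first fills its reservation buffer up to lockLimit, then moves at most maxConcurrency of those jobs into execution, with the rest waiting for a slot.

```javascript
// Illustrative model: locking reserves work, concurrency bounds execution.
function planCycle(queuedJobs, { lockLimit, maxConcurrency }) {
  const locked = queuedJobs.slice(0, lockLimit);    // reserved for this processor
  const running = locked.slice(0, maxConcurrency);  // actually executing now
  const waiting = locked.slice(maxConcurrency);     // reserved, runs as slots free up
  return { locked, running, waiting };
}

const queue = Array.from({ length: 100 }, (_, i) => `job-${i}`);
const { locked, running, waiting } =
  planCycle(queue, { lockLimit: 50, maxConcurrency: 10 });
// locked.length === 50, running.length === 10, waiting.length === 40
```

The 40 waiting jobs are invisible to other processors even though they are not yet running, which is exactly the "reserve work in advance" behavior the settings above describe.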

Clustering Example

Here's a complete example using Node.js cluster module:

// cluster-scheduler.js
const cluster = require('cluster');
const os = require('os');

if (cluster.isPrimary) { // `cluster.isMaster` on Node.js < 16
  const cpuCount = os.cpus().length;

  console.log(`Primary ${process.pid} starting ${cpuCount} workers`);

  // Create one worker per CPU core
  for (let i = 0; i < cpuCount; i++) {
    cluster.fork();
  }

  // Replace dead workers
  cluster.on('exit', (worker, code, signal) => {
    console.log(`Worker ${worker.process.pid} died. Starting new worker...`);
    cluster.fork();
  });

  // Graceful shutdown
  process.on('SIGTERM', () => {
    console.log('Primary received SIGTERM, shutting down workers...');
    for (const id in cluster.workers) {
      cluster.workers[id].kill();
    }
  });

} else {
  // Worker process
  const Chronos = require('chronos-jobs');

  const scheduler = new Chronos({
    db: { address: process.env.MONGODB_URL },
    name: `worker-${process.pid}`,
    maxConcurrency: 5,
    lockLimit: 20
  });

  // Define jobs
  scheduler.define('email job', require('./jobs/email'));
  scheduler.define('image resize', require('./jobs/image-resize'));
  scheduler.define('data backup', require('./jobs/backup'));

  // Graceful shutdown
  async function gracefulShutdown() {
    console.log(`Worker ${process.pid} shutting down...`);
    await scheduler.stop();
    process.exit(0);
  }

  process.on('SIGTERM', gracefulShutdown);
  process.on('SIGINT', gracefulShutdown);

  // Start processing
  scheduler.start().then(() => {
    console.log(`Worker ${process.pid} started processing jobs`);
  });
}

Docker Scaling

Scale processors using Docker:

# docker-compose.yml
version: '3.8'
services:
  mongodb:
    image: mongo:5
    ports:
      - "27017:27017"
    volumes:
      - mongo-data:/data/db

  job-processor:
    build: .
    depends_on:
      - mongodb
    environment:
      - MONGODB_URL=mongodb://mongodb:27017/jobs
      - NODE_ENV=production
    deploy:
      replicas: 3 # Run 3 instances
    restart: unless-stopped

volumes:
  mongo-data:

Health Monitoring

Monitor processor health across multiple instances:

const os = require('os');
const express = require('express');
const Chronos = require('chronos-jobs');

const app = express();

const scheduler = new Chronos({
  db: { address: process.env.MONGODB_URL },
  name: `processor-${os.hostname()}-${process.pid}`
});

// Health check endpoint. The underscore-prefixed fields are internal state,
// so treat them as informational rather than a stable API.
app.get('/health', async (req, res) => {
  try {
    const stats = {
      processorName: scheduler._name,
      isRunning: scheduler._processInterval !== null,
      lockedJobs: scheduler._lockedJobs.length,
      runningJobs: scheduler._runningJobs.length,
      uptime: process.uptime(),
      memory: process.memoryUsage()
    };

    res.json({ status: 'healthy', stats });
  } catch (error) {
    res.status(500).json({ status: 'unhealthy', error: error.message });
  }
});

Best Practices

1. Unique Processor Names

Always use unique names for identification:

const scheduler = new Chronos({
  name: `${os.hostname()}-${process.pid}`
});

2. Appropriate Lock Lifetimes

Set lock lifetimes based on actual job duration:

// For quick jobs
scheduler.define('quick-job', handler, { lockLifetime: 30000 }); // 30 seconds

// For slow jobs
scheduler.define('slow-job', handler, { lockLifetime: 600000 }); // 10 minutes

3. Monitor Dead Locks

Clean up jobs locked by dead processors:

// Cleanup script: unlock jobs whose lock is older than any sane lockLifetime
const cleanupDeadLocks = async () => {
  const fiveMinutesAgo = new Date(Date.now() - 5 * 60 * 1000);

  const deadJobs = await scheduler.jobs({
    lockedAt: { $lt: fiveMinutesAgo },
    lastFinishedAt: { $exists: false }
  });

  for (const job of deadJobs) {
    console.log(`Unlocking dead job: ${job.attrs.name} (${job.attrs._id})`);
    job.attrs.lockedAt = null;
    await job.save();
  }
};

// Run cleanup every 5 minutes
setInterval(cleanupDeadLocks, 5 * 60 * 1000);

4. Load Balancing Jobs

Distribute different job types across processors:

// Processor A - handles email jobs
const emailProcessor = new Chronos({ name: 'email-processor' });
emailProcessor.define('send email', emailHandler);

// Processor B - handles image jobs
const imageProcessor = new Chronos({ name: 'image-processor' });
imageProcessor.define('resize image', imageHandler);
imageProcessor.define('generate thumbnail', thumbnailHandler);