Sandboxed Workers (Fork Mode)
Execute jobs in separate child processes for better isolation and fault tolerance.
Overview
Sandboxed workers allow you to run jobs in isolated child processes, providing several benefits:
- Process isolation - Job crashes don't affect the main application
- Memory isolation - Each job has its own memory space
- Resource limits - Control CPU and memory usage per job
- Better debugging - Easier to identify problematic jobs
- Fault tolerance - Main process continues even if jobs crash
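The isolation guarantee is just Node's child-process model at work. As a minimal standalone sketch (independent of chronos-jobs, with a hypothetical file name), a crash in a forked child surfaces as an exit event in the parent rather than taking the parent down:

// isolation-demo.js - standalone sketch, not part of chronos-jobs
const { fork } = require('child_process');

if (process.send) {
  // Child process (has an IPC channel): crash on purpose
  throw new Error('job blew up');
}

// Parent process: fork this same file and observe the crash from outside
const child = fork(__filename);
child.on('exit', (code) => {
  console.log(`child exited with code ${code}; parent is still running`);
});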
Basic Setup
1. Create Child Worker Script
First, create a worker script that handles job execution in child processes:
// childWorker.js
const Chronos = require('chronos-jobs');
process.on('message', message => {
if (message === 'cancel') {
console.log('Received cancellation signal');
process.exit(2);
}
});
(async () => {
try {
// Get process arguments (name, jobId, and path to job definitions)
const [, , name, jobId, definitionsPath] = process.argv;
if (!name || !jobId) {
throw new Error(`Invalid parameters: ${JSON.stringify(process.argv)}`);
}
// Set descriptive process title
process.title = `${process.title} (worker: ${name}/${jobId})`;
// Connect to database
const scheduler = new Chronos({
name: `worker-${name}-${jobId}`,
forkedWorker: true,
db: { address: process.env.MONGODB_URL }
});
// Load job definitions
if (definitionsPath) {
const loadDefinitions = require(definitionsPath);
loadDefinitions(scheduler, true); // true = definitions only, no scheduling
}
// Run the specific job
await scheduler.runForkedJob(jobId);
console.log(`Job ${name}/${jobId} completed successfully`);
process.exit(0);
  } catch (error) {
    console.error('Worker error:', error);
    // Report the failure to the parent if an IPC channel exists.
    // Serialize the fields explicitly: JSON.stringify on a raw Error
    // yields '{}' because Error properties are non-enumerable.
    if (process.send) {
      process.send(JSON.stringify({ type: 'error', message: error.message, stack: error.stack }));
    }
    process.exit(1);
  }
})();
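Before wiring this into the scheduler, you can exercise the script by hand. A hedged smoke test (the job id, connection string, and file name are placeholders you'd substitute with real values):

// testWorker.js - hypothetical manual test for childWorker.js
const { fork } = require('child_process');
const path = require('path');

const child = fork(
  path.resolve(__dirname, 'childWorker.js'),
  ['send email', 'someExistingJobId', path.resolve(__dirname, 'jobDefinitions.js')],
  { env: { ...process.env, MONGODB_URL: 'mongodb://localhost:27017/scheduler' } }
);

child.on('message', (msg) => console.log('worker said:', msg));
child.on('exit', (code) => console.log('worker exited with code', code));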
2. Job Definitions File
Create a definitions file that can be loaded by workers:
// jobDefinitions.js
module.exports = (scheduler, definitionsOnly = false) => {
scheduler.define('send email', async (job) => {
const { to, subject, body } = job.attrs.data;
// Simulate email sending
console.log(`Sending email to ${to}: ${subject}`);
await new Promise(resolve => setTimeout(resolve, 1000));
return { sent: true, timestamp: new Date() };
});
scheduler.define('process image', async (job) => {
const { imageUrl, operations } = job.attrs.data;
// Simulate image processing
console.log(`Processing image ${imageUrl} with operations:`, operations);
await new Promise(resolve => setTimeout(resolve, 3000));
return { processed: true, outputUrl: `processed_${imageUrl}` };
});
scheduler.define('heavy calculation', async (job) => {
const { numbers } = job.attrs.data;
// CPU-intensive task
let result = 0;
for (let i = 0; i < numbers.length; i++) {
result += Math.pow(numbers[i], 2);
}
return { result, processedCount: numbers.length };
});
  // Workers load with definitionsOnly = true, so recurring jobs are
  // scheduled by the main process only
  if (!definitionsOnly) {
    scheduler.every('1 hour', 'cleanup task'); // assumes a 'cleanup task' definition exists
  }
};
3. Enable Fork Mode
Configure jobs to run in fork mode:
// main.js
const Chronos = require('chronos-jobs');
const path = require('path');
const scheduler = new Chronos({
db: { address: 'mongodb://localhost:27017/scheduler' }
});
// Load job definitions in main process too
require('./jobDefinitions')(scheduler);
// Create and configure jobs for fork mode
const createForkedJob = async (name, data) => {
const job = scheduler.create(name, data);
// Enable fork mode
job.forkMode(true);
// Optional: specify worker script path
job.workerScript(path.resolve(__dirname, 'childWorker.js'));
// Optional: specify job definitions path
job.definitionsPath(path.resolve(__dirname, 'jobDefinitions.js'));
await job.save();
return job;
};
// Usage
(async () => {
await scheduler.start();
// Create forked jobs
await createForkedJob('send email', {
to: 'user@example.com',
subject: 'Welcome!',
body: 'Thanks for signing up!'
});
await createForkedJob('process image', {
imageUrl: 'image1.jpg',
operations: ['resize', 'compress']
});
})();
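To confirm jobs are actually being handed off to child processes, you can listen for the forked-job lifecycle events (the same event names the monitoring section below uses):

scheduler.on('start:forked-job', (job) => {
  console.log(`forked job started: ${job.attrs.name}`);
});
scheduler.on('fail:forked-job', (error, job) => {
  console.error(`forked job failed: ${job.attrs.name}`, error.message);
});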
Advanced Configuration
Resource Limits
Control resource usage for child processes:
const { spawn } = require('child_process');
class ResourceLimitedWorker {
constructor(options = {}) {
    this.maxMemory = options.maxMemory || 512; // MB, enforced below via --max-old-space-size
    this.maxCpu = options.maxCpu || 50; // CPU percentage; informational only - Node has no built-in CPU cap, enforce via cgroups or OS tools
    this.timeout = options.timeout || 300000; // 5 minutes
}
async runJob(jobId, workerScript, definitionsPath) {
return new Promise((resolve, reject) => {
      const child = spawn('node', [
        '--max-old-space-size=' + this.maxMemory, // hard heap ceiling for the child
        workerScript,
        'job', // fills the job-name argv slot that childWorker.js destructures
        jobId,
        definitionsPath
      ], {
stdio: ['pipe', 'pipe', 'pipe', 'ipc'],
env: {
...process.env,
MONGODB_URL: process.env.MONGODB_URL
}
});
let output = '';
let errorOutput = '';
const timer = setTimeout(() => {
child.kill('SIGTERM');
reject(new Error(`Job ${jobId} timed out after ${this.timeout}ms`));
}, this.timeout);
child.stdout.on('data', (data) => {
output += data.toString();
});
child.stderr.on('data', (data) => {
errorOutput += data.toString();
});
      child.on('message', (message) => {
        try {
          const parsed = JSON.parse(message);
          // childWorker.js serializes failures as { type: 'error', ... }
          if (parsed && parsed.type === 'error') {
            reject(new Error(parsed.message || 'Worker error'));
            return;
          }
          console.log('Worker message:', parsed);
        } catch (e) {
          // Not JSON; treat as an informational message
          console.log('Worker message:', message);
        }
      });
child.on('exit', (code, signal) => {
clearTimeout(timer);
if (code === 0) {
resolve({ success: true, output });
} else if (signal === 'SIGTERM') {
reject(new Error(`Job ${jobId} was terminated`));
} else {
reject(new Error(`Job ${jobId} exited with code ${code}: ${errorOutput}`));
}
});
child.on('error', (error) => {
clearTimeout(timer);
reject(error);
});
});
}
}
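Using the class is a one-liner per job; for instance (the job id and paths are illustrative):

const worker = new ResourceLimitedWorker({ maxMemory: 256, timeout: 120000 });

worker.runJob('someJobId', './childWorker.js', './jobDefinitions.js')
  .then(({ output }) => console.log('job finished:', output))
  .catch((error) => console.error('job failed:', error.message));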
Worker Pool Management
Manage a pool of worker processes:
class WorkerPool {
constructor(options = {}) {
this.maxWorkers = options.maxWorkers || require('os').cpus().length;
this.workers = new Map();
this.queue = [];
this.activeJobs = 0;
}
async executeJob(jobId, workerScript, definitionsPath) {
return new Promise((resolve, reject) => {
this.queue.push({ jobId, workerScript, definitionsPath, resolve, reject });
this.processQueue();
});
}
async processQueue() {
if (this.queue.length === 0 || this.activeJobs >= this.maxWorkers) {
return;
}
    const { jobId, workerScript, definitionsPath, resolve, reject } = this.queue.shift();
    this.activeJobs++;
    // Track the running worker so health monitoring (see below) can inspect it
    this.workers.set(jobId, { jobId, startTime: Date.now() });
    try {
      const worker = new ResourceLimitedWorker({
        maxMemory: 256,
        timeout: 600000 // 10 minutes
      });
      const result = await worker.runJob(jobId, workerScript, definitionsPath);
      resolve(result);
    } catch (error) {
      reject(error);
    } finally {
      this.activeJobs--;
      this.workers.delete(jobId);
      // Process next job in queue
      setImmediate(() => this.processQueue());
    }
}
getStats() {
return {
activeJobs: this.activeJobs,
queuedJobs: this.queue.length,
maxWorkers: this.maxWorkers
};
}
}
const workerPool = new WorkerPool({ maxWorkers: 4 });
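The pool serializes anything beyond maxWorkers concurrent jobs; queued work starts as slots free up. A quick sketch (job ids are placeholders):

// Submit more jobs than the pool allows concurrently; extras wait in the queue
const jobs = ['jobA', 'jobB', 'jobC', 'jobD', 'jobE'].map((id) =>
  workerPool.executeJob(id, './childWorker.js', './jobDefinitions.js')
);

console.log(workerPool.getStats()); // e.g. { activeJobs: 4, queuedJobs: 1, maxWorkers: 4 }

Promise.allSettled(jobs).then((results) => {
  const ok = results.filter((r) => r.status === 'fulfilled').length;
  console.log(`finished: ${ok}/${jobs.length}`);
});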
Error Handling and Monitoring
Implement comprehensive error handling:
class ForkJobManager {
constructor(scheduler) {
this.scheduler = scheduler;
this.workerPool = new WorkerPool();
this.jobMetrics = new Map();
this.setupEventHandlers();
}
setupEventHandlers() {
this.scheduler.on('start:forked-job', (job) => {
console.log(`Starting forked job: ${job.attrs.name} (${job.attrs._id})`);
this.jobMetrics.set(job.attrs._id, {
startTime: Date.now(),
name: job.attrs.name,
status: 'running'
});
});
this.scheduler.on('complete:forked-job', (job) => {
const metrics = this.jobMetrics.get(job.attrs._id);
if (metrics) {
metrics.duration = Date.now() - metrics.startTime;
metrics.status = 'completed';
console.log(`Forked job completed: ${job.attrs.name} in ${metrics.duration}ms`);
}
});
this.scheduler.on('fail:forked-job', (error, job) => {
const metrics = this.jobMetrics.get(job.attrs._id);
if (metrics) {
metrics.duration = Date.now() - metrics.startTime;
metrics.status = 'failed';
metrics.error = error.message;
console.error(`Forked job failed: ${job.attrs.name} after ${metrics.duration}ms`, error);
}
});
}
async runForkedJob(job) {
try {
const result = await this.workerPool.executeJob(
job.attrs._id,
job.attrs.workerScript || './childWorker.js',
job.attrs.definitionsPath || './jobDefinitions.js'
);
// Store result if configured
if (job.attrs.shouldSaveResult) {
job.attrs.result = result;
await job.save();
}
return result;
} catch (error) {
// Log error details
console.error('Forked job execution failed:', {
jobId: job.attrs._id,
jobName: job.attrs.name,
error: error.message,
workerPoolStats: this.workerPool.getStats()
});
throw error;
}
}
getMetrics() {
const metrics = Array.from(this.jobMetrics.values());
const completed = metrics.filter(m => m.status === 'completed');
const failed = metrics.filter(m => m.status === 'failed');
return {
total: metrics.length,
completed: completed.length,
failed: failed.length,
running: metrics.filter(m => m.status === 'running').length,
avgDuration: completed.length > 0
? completed.reduce((sum, m) => sum + m.duration, 0) / completed.length
: 0,
workerPool: this.workerPool.getStats()
};
}
}
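One way to wire the manager up, as a sketch (how chronos-jobs dispatches fork-mode jobs to a custom runner depends on your integration; the reporting interval is arbitrary):

const manager = new ForkJobManager(scheduler);

// Periodically report throughput and failures
setInterval(() => {
  const { total, completed, failed, running, avgDuration } = manager.getMetrics();
  console.log(`jobs: ${completed}/${total} done, ${failed} failed, ${running} running, avg ${Math.round(avgDuration)}ms`);
}, 30000);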
Best Practices
1. Minimize Data Transfer
Keep job data small to reduce serialization overhead:
// ❌ Bad - large data objects
const job = scheduler.create('process-data', {
largeDataset: massiveArray,
configuration: complexConfig
});
// ✅ Good - use IDs and fetch data in worker
const job = scheduler.create('process-data', {
datasetId: 'dataset-123',
configId: 'config-456'
});
2. Proper Cleanup
Ensure proper cleanup in worker processes:
// childWorker.js
const mongoose = require('mongoose');

const cleanup = () => {
  // Close database connections
  if (mongoose.connection.readyState === 1) {
    mongoose.connection.close();
  }
  // Clear any intervals/timeouts (heartbeatInterval stands in for
  // whatever periodic timers your worker runs)
  clearInterval(heartbeatInterval);
  // Release other resources (placeholder for your own teardown)
  releaseResources();
};
process.on('SIGTERM', () => {
console.log('Received SIGTERM, cleaning up...');
cleanup();
process.exit(0);
});
process.on('SIGINT', () => {
console.log('Received SIGINT, cleaning up...');
cleanup();
process.exit(0);
});
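It's also worth catching errors that would otherwise bypass your signal handlers, so the worker still cleans up before dying:

process.on('uncaughtException', (error) => {
  console.error('Uncaught exception in worker:', error);
  cleanup();
  process.exit(1);
});

process.on('unhandledRejection', (reason) => {
  console.error('Unhandled rejection in worker:', reason);
  cleanup();
  process.exit(1);
});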
3. Health Monitoring
Monitor worker health:
// Health check for forked workers
const monitorWorkers = () => {
const activeWorkers = Array.from(workerPool.workers.values());
activeWorkers.forEach(worker => {
const runtime = Date.now() - worker.startTime;
// Check for long-running workers
if (runtime > 600000) { // 10 minutes
console.warn(`Long-running worker detected: ${worker.jobId} (${runtime}ms)`);
}
// Check memory usage if available
if (worker.memoryUsage && worker.memoryUsage > 1024 * 1024 * 512) { // 512MB
console.warn(`High memory usage worker: ${worker.jobId} (${worker.memoryUsage} bytes)`);
}
});
};
setInterval(monitorWorkers, 60000); // Check every minute
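The memoryUsage field above is only populated if you sample it yourself. One option is the pidusage package from npm, sketched here under the assumption that each worker entry records the child's pid at spawn time:

const pidusage = require('pidusage');

const sampleWorkerMemory = async () => {
  for (const worker of workerPool.workers.values()) {
    if (!worker.pid) continue; // pid must be recorded when the child is spawned
    try {
      const stats = await pidusage(worker.pid);
      worker.memoryUsage = stats.memory; // bytes, picked up by monitorWorkers above
    } catch (e) {
      // Process exited between samples; ignore
    }
  }
};

setInterval(sampleWorkerMemory, 60000);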
Sandboxed workers provide excellent isolation and fault tolerance, making them ideal for processing untrusted code or resource-intensive jobs that might crash or consume excessive resources.