Basic Usage Examples
Get started with Chronos using these practical examples for common use cases.
Simple Job Scheduling
One-time Job
Schedule a job to run once at a specific time:
// NOTE: top-level `await` (used below) is only valid in an ES module —
// use a .mjs file or `"type": "module"` in package.json. With CommonJS
// `require`, the top-level `await` calls would be a SyntaxError.
import Chronos from 'chronos-jobs';

const scheduler = new Chronos({
  db: { address: 'mongodb://localhost:27017/scheduler' }
});

// Define the job. The handler receives the job instance; the payload
// passed at schedule time is available on job.attrs.data.
scheduler.define('send welcome email', async job => {
  const { email, name } = job.attrs.data;
  console.log(`Sending welcome email to ${name} (${email})`);
  // Simulate email sending
  await sendWelcomeEmail(email, name);
});

// Schedule it to run in 5 minutes
await scheduler.schedule('in 5 minutes', 'send welcome email', {
  email: 'user@example.com',
  name: 'John Doe'
});

// Begin polling for and running due jobs
await scheduler.start();
Recurring Job
Schedule a job to run repeatedly:
// Recurring job: remove files in ./temp whose last modification time is
// more than one hour in the past. Files are handled one at a time so each
// stat/unlink completes before the next begins.
scheduler.define('cleanup old files', async job => {
  console.log('Cleaning up old temporary files...');
  const fs = require('fs').promises;
  const path = require('path');
  const tempDir = './temp';
  const cutoff = Date.now() - 60 * 60 * 1000; // anything older than 1h goes

  for (const file of await fs.readdir(tempDir)) {
    const filePath = path.join(tempDir, file);
    const stats = await fs.stat(filePath);
    // Keep files modified within the last hour
    if (stats.mtime.getTime() >= cutoff) continue;
    await fs.unlink(filePath);
    console.log(`Deleted: ${file}`);
  }
});

// Run every hour
await scheduler.every('1 hour', 'cleanup old files');
Cron-style Scheduling
Use cron expressions for precise timing:
// Cron-style scheduling: every() also accepts standard 5-field cron
// expressions (minute hour day-of-month month day-of-week).
scheduler.define('daily report', async job => {
  console.log('Generating daily report...');
  const report = await generateDailyReport();
  await emailReport('admin@company.com', report);
});
// Run every day at 8:00 AM
await scheduler.every('0 8 * * *', 'daily report');
// Run every weekday at 9:30 AM
// NOTE(review): 'weekday morning task' is not defined in this example —
// a matching scheduler.define() is assumed to exist elsewhere.
await scheduler.every('30 9 * * 1-5', 'weekday morning task');
// Run on the 1st of every month at midnight
// NOTE(review): 'monthly cleanup' is likewise assumed to be defined elsewhere.
await scheduler.every('0 0 1 * *', 'monthly cleanup');
Data Processing Examples
Batch Processing
Process large datasets in batches:
// Batch processing: work through unprocessed User documents in fixed-size
// chunks. Each run re-enqueues itself until a short (non-full) batch
// signals that the dataset is exhausted.
scheduler.define('process user data', async job => {
  const { batchId, batchSize = 100 } = job.attrs.data;
  console.log(`Processing batch ${batchId} with size ${batchSize}`);
  // Get users to process
  const users = await User.find({ processed: false })
    .limit(batchSize)
    .exec();
  if (users.length === 0) {
    console.log('No more users to process');
    return { processed: 0 };
  }
  let processed = 0;
  for (const user of users) {
    try {
      await processUserData(user);
      user.processed = true;
      await user.save();
      processed++;
    } catch (error) {
      // Failed users keep processed: false.
      // NOTE(review): users that fail every attempt are re-fetched by each
      // subsequent batch — consider a failure flag or retry counter so the
      // chain cannot revisit them indefinitely.
      console.error(`Failed to process user ${user._id}:`, error);
    }
  }
  // Schedule next batch if there are more users
  // (a full batch implies more rows may remain; a short batch ends the chain)
  if (users.length === batchSize) {
    await scheduler.now('process user data', {
      batchId: batchId + 1,
      batchSize
    });
  }
  return { processed };
});
// Start batch processing
await scheduler.now('process user data', { batchId: 1, batchSize: 50 });
File Processing Pipeline
Process uploaded files through multiple stages:
// Stage 1: Validate uploaded file. Invalid files are marked and the
// pipeline stops; valid files are handed to the processing stage.
scheduler.define('validate file', async job => {
  const { fileId, filePath } = job.attrs.data;
  console.log(`Validating file: ${fileId}`);

  const isValid = await validateFile(filePath);
  if (!isValid) {
    // Mark file as invalid and end the pipeline here
    await File.findByIdAndUpdate(fileId, {
      status: 'invalid',
      error: 'File validation failed'
    });
    return;
  }

  // Hand off to the next stage
  await scheduler.now('process file', { fileId, filePath });
});
// Stage 2: Process file. Transforms the validated upload, records the
// result on the File document, then schedules the temp file's deletion.
scheduler.define('process file', async job => {
  const { fileId, filePath } = job.attrs.data;
  console.log(`Processing file: ${fileId}`);
  const processedPath = await processFile(filePath);
  // Update database
  await File.findByIdAndUpdate(fileId, {
    status: 'processed',
    processedPath,
    processedAt: new Date()
  });
  // Schedule cleanup
  // NOTE(review): if processFile throws, this line never runs and the temp
  // file lingers — confirm whether the failure path should also clean up.
  await scheduler.schedule('in 24 hours', 'cleanup temp file', { filePath });
});
// Stage 3: Cleanup temporary files.
// Best-effort deletion: a failed unlink is logged, not rethrown, so the
// job is not marked failed over an already-missing file.
scheduler.define('cleanup temp file', async job => {
  const { filePath } = job.attrs.data;
  // Fix: `fs` was not in scope in this handler (the earlier require was
  // local to the 'cleanup old files' handler) — require it here.
  const fs = require('fs').promises;
  try {
    await fs.unlink(filePath);
    console.log(`Cleaned up: ${filePath}`);
  } catch (error) {
    console.error(`Failed to cleanup ${filePath}:`, error);
  }
});
Notification Systems
User Notification Queue
Process user notifications with priorities:
// Dispatch a single notification to a user over the requested channel,
// then record it in NotificationLog. Throws (failing the job) when the
// user is missing or the channel is unknown.
scheduler.define('send notification', async job => {
  const { userId, type, data, priority } = job.attrs.data;
  console.log(`Sending ${priority} priority ${type} notification to user ${userId}`);
  const user = await User.findById(userId);
  if (!user) {
    throw new Error(`User ${userId} not found`);
  }
  switch (type) {
    case 'email':
      await sendEmail(user.email, data.subject, data.body);
      break;
    case 'push':
      await sendPushNotification(user.deviceTokens, data.message);
      break;
    case 'sms':
      await sendSMS(user.phoneNumber, data.message);
      break;
    default:
      // Fix: fail fast on an unknown channel instead of falling through
      // and logging a notification that was never actually sent.
      throw new Error(`Unknown notification type: ${type}`);
  }
  // Log notification
  await NotificationLog.create({
    userId,
    type,
    sentAt: new Date(),
    data
  });
}, {
  priority: 'high',
  concurrency: 10
});

// Helper function to queue notifications with a per-job priority
const queueNotification = async (userId, type, data, priority = 'normal') => {
  const job = scheduler.create('send notification', {
    userId,
    type,
    data,
    priority
  });
  job.priority(priority);
  await job.save();
};

// Usage
await queueNotification('user123', 'email', {
  subject: 'Welcome to our platform!',
  body: 'Thanks for signing up...'
}, 'high');
Scheduled Reminders
Send reminders at specific intervals:
// Send an event reminder email, then self-schedule the next reminder
// (up to 3 attempts) while the event is still in the future.
scheduler.define('send reminder', async job => {
  const { userId, reminderType, eventId, attempt = 1 } = job.attrs.data;
  // The two lookups are independent — fetch them in parallel.
  const [user, event] = await Promise.all([
    User.findById(userId),
    Event.findById(eventId)
  ]);
  if (!event || event.cancelled) {
    console.log(`Event ${eventId} cancelled, skipping reminder`);
    return;
  }
  // Fix: guard against a missing user (previously user.email would throw
  // an opaque TypeError), matching the 'send notification' handler.
  if (!user) {
    throw new Error(`User ${userId} not found`);
  }
  console.log(`Sending reminder ${attempt} for event ${eventId} to user ${userId}`);
  await sendReminderEmail(user.email, event, attempt);
  // Schedule next reminder if needed
  if (attempt < 3 && event.date > new Date()) {
    const nextReminderTime = getNextReminderTime(event.date, attempt + 1);
    await scheduler.schedule(nextReminderTime, 'send reminder', {
      userId,
      reminderType,
      eventId,
      attempt: attempt + 1
    });
  }
});

// Schedule the first reminder (24h before the event) for each user.
const scheduleEventReminders = async (eventId, userIds) => {
  const event = await Event.findById(eventId);
  // Fix: without this guard, event.date below throws a TypeError
  // when the event does not exist.
  if (!event) {
    throw new Error(`Event ${eventId} not found`);
  }
  for (const userId of userIds) {
    // First reminder: 24 hours before
    const reminder1 = new Date(event.date.getTime() - 24 * 60 * 60 * 1000);
    if (reminder1 > new Date()) {
      await scheduler.schedule(reminder1, 'send reminder', {
        userId,
        reminderType: 'event',
        eventId,
        attempt: 1
      });
    }
  }
};
Integration Examples
Webhook Processing
Process incoming webhooks asynchronously:
// Express route handler: acknowledge the webhook immediately, then hand
// the payload to the scheduler for asynchronous processing.
app.post('/webhook/payment', express.json(), (req, res) => {
  // Respond immediately
  res.status(200).json({ received: true });
  // Fix: scheduler.now() returns a promise (it is awaited elsewhere in
  // these examples) — attach a catch so a failed enqueue cannot become an
  // unhandled rejection after the response has been sent.
  scheduler.now('process payment webhook', {
    payload: req.body,
    headers: req.headers,
    timestamp: new Date()
  }).catch(error => {
    console.error('Failed to enqueue payment webhook:', error);
  });
});

scheduler.define('process payment webhook', async job => {
  const { payload, headers, timestamp } = job.attrs.data;
  console.log('Processing payment webhook:', payload.id);
  // Verify webhook signature
  // NOTE(review): most providers sign the RAW request body; verifying the
  // parsed payload may not match — confirm against the provider's docs
  // (express.raw() would be needed to keep the raw bytes).
  if (!verifyWebhookSignature(payload, headers['x-signature'])) {
    throw new Error('Invalid webhook signature');
  }
  // Process payment event
  switch (payload.event) {
    case 'payment.completed':
      await handlePaymentCompleted(payload.data);
      break;
    case 'payment.failed':
      await handlePaymentFailed(payload.data);
      break;
    case 'subscription.cancelled':
      await handleSubscriptionCancelled(payload.data);
      break;
  }
  // Log webhook processing
  await WebhookLog.create({
    eventId: payload.id,
    event: payload.event,
    // Fix: job data may round-trip through persistence as a string rather
    // than a Date — normalize before calling getTime().
    processedAt: new Date(),
    processingTime: Date.now() - new Date(timestamp).getTime()
  });
});
API Rate Limiting
Implement rate-limited API calls:
// Rate-limited sync: pull data from an external API and, on HTTP 429,
// reschedule itself with exponential backoff (up to 3 retries). Any other
// error — or a 429 past the retry budget — fails the job.
scheduler.define('sync external data', async job => {
  const { apiEndpoint, params, retryCount = 0 } = job.attrs.data;
  console.log(`Syncing data from ${apiEndpoint} (attempt ${retryCount + 1})`);
  try {
    // Make rate-limited API call and process the response
    const response = await makeRateLimitedRequest(apiEndpoint, params);
    await processApiResponse(response.data);
    console.log(`Successfully synced data from ${apiEndpoint}`);
    return;
  } catch (error) {
    const shouldRetry = error.status === 429 && retryCount < 3;
    if (!shouldRetry) throw error;
    // Rate limited: back off exponentially before the next attempt
    const delay = Math.pow(2, retryCount) * 60000; // 1min, 2min, 4min
    console.log(`Rate limited, retrying in ${delay/1000} seconds`);
    await scheduler.schedule(
      new Date(Date.now() + delay),
      'sync external data',
      { ...job.attrs.data, retryCount: retryCount + 1 }
    );
  }
}, {
  concurrency: 1 // Ensure sequential processing for rate limiting
});

// Schedule regular syncing
await scheduler.every('5 minutes', 'sync external data', {
  apiEndpoint: 'https://api.example.com/data',
  params: { limit: 100 }
});
Error Handling Patterns
Retry with Exponential Backoff
// Retry with exponential backoff: on failure, self-schedule another
// attempt (delay doubles each time, capped at 5 minutes) until maxRetries
// is exhausted, then record the permanent failure and rethrow.
scheduler.define('reliable task', async job => {
  const { taskData, attempt = 1, maxRetries = 5 } = job.attrs.data;
  try {
    console.log(`Executing task (attempt ${attempt})`);
    await performUnreliableTask(taskData);
    console.log('Task completed successfully');
    return;
  } catch (error) {
    console.error(`Task failed (attempt ${attempt}):`, error.message);
    if (attempt >= maxRetries) {
      // Out of retries: record the permanent failure, then fail the job
      await handlePermanentFailure(taskData, error);
      throw new Error(`Task permanently failed after ${maxRetries} attempts: ${error.message}`);
    }
    // Schedule the next attempt with exponential backoff
    const delay = Math.min(Math.pow(2, attempt) * 1000, 300000); // Max 5 minutes
    await scheduler.schedule(
      new Date(Date.now() + delay),
      'reliable task',
      { taskData, attempt: attempt + 1, maxRetries }
    );
    console.log(`Scheduled retry ${attempt + 1} in ${delay}ms`);
  }
});
These examples demonstrate common patterns and best practices for using Chronos in real-world applications. Each example can be adapted to fit your specific use case and requirements.