Jobs & Queues
Stacks provides a powerful job queue system for handling background tasks, built on bun-queue. Process emails, notifications, reports, and other time-consuming tasks asynchronously.
Overview
The job system helps you:
- Defer work - Process tasks in the background
- Handle failures - Automatic retries with backoff
- Scale processing - Run multiple workers
- Monitor progress - Track job status and failures
Quick Start
Dispatching Jobs
import { dispatch } from '@stacksjs/queue'
// Dispatch a job
await dispatch('send-email', {
to: 'user@example.com',
subject: 'Welcome!',
template: 'welcome',
})
// Using job classes
await job('SendWelcomeEmail').dispatch({ userId: 1 })
Creating Jobs
Create job files in app/Jobs/:
// app/Jobs/SendWelcomeEmail.ts
import { Job } from '@stacksjs/queue'
import { mail } from '@stacksjs/email'
export default class SendWelcomeEmail extends Job {
// Queue name
queue = 'emails'
// Retry attempts
tries = 3
// Seconds between retries
backoff = 60
// Maximum execution time (seconds)
timeout = 120
async handle(data: { userId: number }) {
const user = await User.find(data.userId)
await mail.send({
to: user.email,
subject: 'Welcome to our app!',
template: 'emails/welcome',
data: { name: user.name },
})
}
// Called when all retries fail
async failed(error: Error, data: { userId: number }) {
console.error(`Failed to send welcome email to user ${data.userId}:`, error)
await notifyAdmin('Welcome email failed', { userId: data.userId, error })
}
}
Dispatch Methods
Basic Dispatch
import { dispatch, dispatchSync } from '@stacksjs/queue'
// Queue the job (async processing)
await dispatch('process-order', { orderId: 123 })
// Execute immediately (sync)
await dispatchSync('process-order', { orderId: 123 })
Conditional Dispatch
import { dispatchIf, dispatchUnless } from '@stacksjs/queue'
// Dispatch only if condition is true
await dispatchIf(
user.wantsNotifications,
'send-notification',
{ userId: user.id }
)
// Dispatch unless condition is true
await dispatchUnless(
user.optedOut,
'send-marketing',
{ userId: user.id }
)
Delayed Dispatch
import { dispatchAfter } from '@stacksjs/queue'
// Dispatch after 60 seconds
await dispatchAfter(60, 'send-reminder', { userId: 1 })
// Dispatch at specific time
await dispatchAfter(
new Date('2024-12-25T00:00:00Z'),
'send-christmas-promo',
{ userId: 1 }
)
Job Chains
Execute jobs in sequence:
import { chain } from '@stacksjs/queue'
await chain()
.add('validate-order', { orderId: 1 })
.add('process-payment', { orderId: 1 })
.add('send-confirmation', { orderId: 1 })
.add('notify-warehouse', { orderId: 1 })
.dispatch()
If any job fails, the chain stops.
Batch Processing
Process jobs in parallel:
import { batch } from '@stacksjs/queue'
const result = await batch([
{ job: 'process-user', data: { userId: 1 } },
{ job: 'process-user', data: { userId: 2 } },
{ job: 'process-user', data: { userId: 3 } },
])
.allowFailures() // Continue even if some fail
.onSuccess(() => console.log('All completed'))
.onFailure((failed) => console.log(`${failed.length} failed`))
.dispatch()
console.log(`Successful: ${result.successful}`)
console.log(`Failed: ${result.failed}`)
Job Configuration
Retry Settings
export default class UnreliableJob extends Job {
// Number of retry attempts
tries = 5
// Seconds between retries (fixed)
backoff = 60
// Or exponential backoff (array of delays)
backoff = [60, 300, 900, 3600] // 1min, 5min, 15min, 1hr
// Or use backoff strategy
backoffStrategy = 'exponential' // 'linear' | 'exponential'
}
Timeouts
export default class LongRunningJob extends Job {
// Maximum execution time in seconds
timeout = 300 // 5 minutes
async handle(data: any) {
await veryLongProcess()
}
// Called if job times out
async timedOut(data: any) {
await cleanup(data)
await notifyTimeout(data)
}
}
Unique Jobs
Prevent duplicate jobs:
import { UniqueJobMiddleware } from '@stacksjs/queue'
export default class ProcessOrder extends Job {
middleware = [
new UniqueJobMiddleware({
key: (data) => `order:${data.orderId}`,
ttl: 3600, // Unique for 1 hour
})
]
}
Rate Limiting
import { RateLimitMiddleware } from '@stacksjs/queue'
export default class SendNotification extends Job {
middleware = [
new RateLimitMiddleware({
key: 'notifications',
maxAttempts: 100,
decayMinutes: 1, // 100 per minute
})
]
}
Running Workers
CLI Commands
# Start queue worker
buddy queue:work
# Specify queue
buddy queue:work --queue=emails
# Multiple queues with priority (left = highest)
buddy queue:work --queue=high,default,low
# Limit jobs processed
buddy queue:work --max-jobs=100
# Stop when empty
buddy queue:work --stop-when-empty
Programmatic Workers
import { QueueWorker, WorkerManager } from '@stacksjs/queue'
// Single worker
const worker = new QueueWorker({
queues: ['default', 'emails'],
concurrency: 5,
maxJobs: 1000,
})
await worker.start()
// Multiple workers with manager
const manager = new WorkerManager()
manager.addWorker('default', { concurrency: 3 })
manager.addWorker('emails', { concurrency: 2 })
manager.addWorker('reports', { concurrency: 1 })
await manager.start()
// Graceful shutdown
process.on('SIGTERM', async () => {
await manager.stop()
process.exit(0)
})
Failed Jobs
Handling Failures
export default class ImportData extends Job {
tries = 3
async handle(data: { fileId: number }) {
const file = await Storage.get(data.fileId)
await processFile(file)
}
async failed(error: Error, data: { fileId: number }) {
// Log the failure
console.error('Import failed:', error)
// Notify admins
await slack.send({
channel: '#alerts',
text: `Data import failed for file ${data.fileId}: ${error.message}`,
})
// Mark file as failed
await File.update(data.fileId, { status: 'failed' })
}
}
Managing Failed Jobs
import { FailedJobManager, retryFailedJob, executeFailedJobs } from '@stacksjs/queue'
const manager = new FailedJobManager()
// List failed jobs
const failedJobs = await manager.all()
// Retry a specific job
await retryFailedJob(failedJobId)
// Retry all failed jobs
await executeFailedJobs()
// Delete a failed job
await manager.delete(failedJobId)
// Clear all failed jobs
await manager.flush()
Failed Job Notifications
import { configureFailedJobNotifications } from '@stacksjs/queue'
configureFailedJobNotifications({
channels: ['email', 'slack'],
email: {
to: 'admin@example.com',
},
slack: {
webhook: process.env.SLACK_WEBHOOK_URL,
},
})
Scheduling Jobs
Schedule recurring jobs with cron expressions:
// app/Jobs/DailyReport.ts
export default class DailyReport extends Job {
// Run at 8am every day
schedule = '0 8 _ _ *'
async handle() {
const report = await generateDailyReport()
await sendReportEmail(report)
}
}
Starting the Scheduler
buddy queue:schedule
Or programmatically:
import { startScheduler, stopScheduler, getSchedulerStatus } from '@stacksjs/queue'
await startScheduler()
// Check status
const status = getSchedulerStatus()
console.log(status.running)
console.log(status.nextRunTimes)
// Manually trigger a scheduled job
await triggerJob('daily-report')
// Stop scheduler
await stopScheduler()
Queue Events
Listen for job events:
import { onQueueEvent } from '@stacksjs/queue'
onQueueEvent('job:dispatched', (payload) => {
console.log(`Job dispatched: ${payload.jobName}`)
})
onQueueEvent('job:started', (payload) => {
console.log(`Job started: ${payload.jobId}`)
})
onQueueEvent('job:completed', (payload) => {
console.log(`Job completed: ${payload.jobId} in ${payload.duration}ms`)
})
onQueueEvent('job:failed', (payload) => {
console.error(`Job failed: ${payload.jobId}`, payload.error)
})
onQueueEvent('job:retrying', (payload) => {
console.log(`Retrying job: ${payload.jobId}, attempt ${payload.attempt}`)
})
Health Checks
Monitor queue health:
import { checkQueueHealth, isQueueHealthy } from '@stacksjs/queue'
// Detailed health check
const health = await checkQueueHealth()
console.log(health.status) // 'healthy' | 'degraded' | 'unhealthy'
console.log(health.queues) // Per-queue status
console.log(health.workers) // Worker status
// Simple boolean check
if (!await isQueueHealthy()) {
await alertOps('Queue system unhealthy')
}
Testing Jobs
import { describe, expect, it } from 'bun:test'
import { fake, getFakeQueue, restore, runTestJob } from '@stacksjs/queue'
describe('SendWelcomeEmail', () => {
it('dispatches email job', async () => {
// Fake the queue (jobs won't actually process)
fake()
// Dispatch job
await dispatch('SendWelcomeEmail', { userId: 1 })
// Assert job was dispatched
const fakeQueue = getFakeQueue()
expect(fakeQueue.hasDispatched('SendWelcomeEmail')).toBe(true)
// Check job data
const jobs = fakeQueue.dispatched('SendWelcomeEmail')
expect(jobs[0].data).toEqual({ userId: 1 })
// Restore real queue
restore()
})
it('handles job execution', async () => {
// Actually run the job in test
const result = await runTestJob('SendWelcomeEmail', { userId: 1 })
expect(result.success).toBe(true)
})
})
Best Practices
DO
- Keep jobs small - One responsibility per job
- Make jobs idempotent - Safe to retry multiple times
- Set appropriate timeouts - Prevent hung jobs
- Handle failures gracefully - Implement
failed()method - Use unique constraints - Prevent duplicate processing
DON'T
- Don't process user input directly - Validate in the web request
- Don't store sensitive data - Use IDs and fetch fresh data
- Don't use too many retries - 3-5 is usually sufficient
- Don't ignore failures - Set up notifications
Related Resources
- Queue Package - Full API reference
- Scheduler Package - Cron scheduling
- Error Handling - Handling job failures
- Logging - Logging job activity