Queue Package
A powerful job queue system built on bun-queue, providing job dispatching, workers, failed job management, and scheduling capabilities.
Installation
bun add @stacksjs/queue
Basic Usage
import { dispatch, Job, Queue, Worker } from '@stacksjs/queue'
await dispatch('send-email', { to: 'user@example.com', subject: 'Welcome' })
await job('SendWelcomeEmail').dispatch({ userId: 1 })
Defining Jobs
File-Based Jobs
Create jobs in app/Jobs/:
import { Job } from '@stacksjs/queue'
export default class SendWelcomeEmail extends Job {
queue = 'emails'
tries = 3
backoff = 60
timeout = 120
async handle(data: { userId: number }) {
const user = await User.find(data.userId)
await sendEmail({
to: user.email,
template: 'welcome',
data: { name: user.name }
})
}
async failed(error: Error, data: any) {
await logFailure('SendWelcomeEmail', error, data)
}
}
Inline Jobs
import { dispatch, JobBase } from '@stacksjs/queue'
await dispatch('process-order', {
orderId: 123,
handler: async (data) => {
await processOrder(data.orderId)
}
})
class ProcessPayment extends JobBase {
queue = 'payments'
tries = 5
async handle(data: { paymentId: number }) {
await processPayment(data.paymentId)
}
}
await dispatch(new ProcessPayment(), { paymentId: 456 })
Dispatching Jobs
Basic Dispatch
import { dispatch, dispatchSync } from '@stacksjs/queue'
await dispatch('job-name', { key: 'value' })
await dispatchSync('job-name', { key: 'value' })
Conditional Dispatch
import { dispatchIf, dispatchUnless } from '@stacksjs/queue'
await dispatchIf(user.isActive, 'send-notification', { userId: user.id })
await dispatchUnless(user.optedOut, 'send-marketing', { userId: user.id })
Delayed Dispatch
import { dispatchAfter } from '@stacksjs/queue'
await dispatchAfter(60, 'reminder-email', { userId: 1 })
await dispatchAfter(new Date('2024-12-25'), 'christmas-promo', {})
Job Chains
import { dispatchChain, chain } from '@stacksjs/queue'
await dispatchChain([
{ job: 'validate-order', data: { orderId: 1 } },
{ job: 'process-payment', data: { orderId: 1 } },
{ job: 'send-confirmation', data: { orderId: 1 } }
])
await chain()
.add('step-1', { data: 'a' })
.add('step-2', { data: 'b' })
.add('step-3', { data: 'c' })
.dispatch()
Batch Processing
import { batch } from '@stacksjs/queue'
const result = await batch([
{ job: 'process-user', data: { userId: 1 } },
{ job: 'process-user', data: { userId: 2 } },
{ job: 'process-user', data: { userId: 3 } }
])
.allowFailures()
.onSuccess(() => console.log('All succeeded'))
.onFailure(() => console.log('Some failed'))
.dispatch()
console.log(result.successful)
console.log(result.failed)
Queue Workers
Starting Workers
buddy queue:work
buddy queue:work --queue=emails
buddy queue:work --queue=high,default,low
buddy queue:work --max-jobs=100
buddy queue:work --stop-when-empty
Programmatic Workers
import { Worker, QueueWorker, WorkerManager } from '@stacksjs/queue'
const worker = new QueueWorker({
queues: ['default', 'emails'],
concurrency: 5,
maxJobs: 1000
})
await worker.start()
const manager = new WorkerManager()
manager.addWorker('default', { concurrency: 3 })
manager.addWorker('emails', { concurrency: 2 })
await manager.start()
process.on('SIGTERM', async () => {
await manager.stop()
})
Worker Functions
import {
startProcessor,
stopProcessor,
isWorkerRunning,
getActiveJobCount
} from '@stacksjs/queue'
await startProcessor({
queues: ['default'],
concurrency: 5
})
const running = isWorkerRunning()
const activeJobs = getActiveJobCount()
await stopProcessor()
Failed Jobs
Managing Failed Jobs
import {
FailedJobManager,
executeFailedJobs,
retryFailedJob
} from '@stacksjs/queue'
const manager = new FailedJobManager()
const failedJobs = await manager.all()
await retryFailedJob(failedJobId)
await executeFailedJobs()
await manager.delete(failedJobId)
await manager.flush()
Failed Job Notifications
import {
configureFailedJobNotifications,
FailedJobNotifier,
notifyJobFailed
} from '@stacksjs/queue'
configureFailedJobNotifications({
channels: ['email', 'slack'],
email: {
to: 'admin@example.com'
},
slack: {
webhook: 'https://hooks.slack.com/...'
}
})
const notifier = getFailedJobNotifier()
await notifyJobFailed({
job: 'SendEmail',
error: new Error('SMTP connection failed'),
data: { to: 'user@test.com' },
attempts: 3
})
Middleware
Built-in Middleware
import {
middleware,
RateLimitMiddleware,
UniqueJobMiddleware,
ThrottleMiddleware,
WithoutOverlappingMiddleware,
SkipIfMiddleware,
FailureMiddleware
} from '@stacksjs/queue'
export default class SendNotification extends Job {
middleware = [
new RateLimitMiddleware({
key: 'notifications',
maxAttempts: 100,
decayMinutes: 1
})
]
}
export default class ProcessOrder extends Job {
middleware = [
new UniqueJobMiddleware({
key: (data) => `order:${data.orderId}`,
ttl: 3600
})
]
}
export default class SendEmail extends Job {
middleware = [
new ThrottleMiddleware({
maxAttempts: 10,
seconds: 60
})
]
}
export default class GenerateReport extends Job {
middleware = [
new WithoutOverlappingMiddleware({
key: 'report-generation',
releaseAfter: 300
})
]
}
export default class SyncData extends Job {
middleware = [
new SkipIfMiddleware(async (data) => {
return await isMaintenanceMode()
})
]
}
Priority Queues
import { PriorityQueue } from '@stacksjs/queue'
const queue = new PriorityQueue('orders')
await queue.add({ orderId: 1 }, { priority: 10 })
await queue.add({ orderId: 2 }, { priority: 5 })
await queue.add({ orderId: 3 }, { priority: 1 })
Dead Letter Queue
import { DeadLetterQueue } from '@stacksjs/queue'
const dlq = new DeadLetterQueue({
maxRetries: 3,
retentionDays: 7
})
const deadJobs = await dlq.list()
await dlq.retry(deadJobId)
await dlq.discard(deadJobId)
Queue Events
import {
QueueEvents,
onQueueEvent,
OnQueueEvent,
emitQueueEvent,
QueueMetrics
} from '@stacksjs/queue'
onQueueEvent('job:completed', (payload) => {
console.log(`Job ${payload.jobId} completed`)
})
onQueueEvent('job:failed', (payload) => {
console.log(`Job ${payload.jobId} failed: ${payload.error}`)
})
const metrics = new QueueMetrics()
console.log(await metrics.getJobStats())
console.log(await metrics.getQueueDepth('default'))
Health Checks
import {
checkQueueHealth,
isQueueHealthy,
createHealthCheckHandler
} from '@stacksjs/queue'
const health = await checkQueueHealth()
console.log(health.status)
console.log(health.queues)
console.log(health.workers)
const healthy = await isQueueHealthy()
const handler = createHealthCheckHandler()
Job Scheduling
import {
startScheduler,
stopScheduler,
getSchedulerStatus,
triggerJob,
getRegisteredJobs
} from '@stacksjs/queue'
await startScheduler()
const status = getSchedulerStatus()
console.log(status.running)
console.log(status.nextRunTimes)
const jobs = getRegisteredJobs()
await triggerJob('daily-report')
await stopScheduler()
Testing
import {
fake,
restore,
QueueTester,
createQueueTester,
getFakeQueue,
isFaked,
expectJobToFail,
runTestJob
} from '@stacksjs/queue'
fake()
await dispatch('send-email', { to: 'test@example.com' })
const fakeQueue = getFakeQueue()
const jobs = fakeQueue.dispatched('send-email')
expect(jobs).toHaveLength(1)
expect(fakeQueue.hasDispatched('send-email')).toBe(true)
expect(fakeQueue.dispatched('send-email')[0].data).toEqual({
to: 'test@example.com'
})
const result = await runTestJob('send-email', { to: 'test@example.com' })
await expectJobToFail('invalid-job', { data: 'bad' })
restore()
Job Discovery
import {
discoverJobs,
getAllJobs,
getJob,
executeJob,
getScheduledJobs,
jobRegistry
} from '@stacksjs/queue'
await discoverJobs()
const jobs = getAllJobs()
const job = getJob('SendWelcomeEmail')
await executeJob('SendWelcomeEmail', { userId: 1 })
const scheduledJobs = getScheduledJobs()
Rate Limiting & Locking
import { RateLimiter, DistributedLock } from '@stacksjs/queue'
const limiter = new RateLimiter({
key: 'api-calls',
maxAttempts: 100,
decaySeconds: 60
})
if (await limiter.attempt()) {
} else {
}
const lock = new DistributedLock('resource-key')
if (await lock.acquire(30)) {
try {
await doExclusiveWork()
} finally {
await lock.release()
}
}
Leader Election
import { LeaderElection } from '@stacksjs/queue'
const election = new LeaderElection('worker-leader')
if (await election.isLeader()) {
await processScheduledJobs()
}
Edge Cases
Handling Job Timeouts
export default class LongRunningJob extends Job {
timeout = 300
async handle(data: any) {
await veryLongProcess()
}
async timedOut(data: any) {
await cleanup(data)
}
}
Graceful Shutdown
import { gracefulShutdown } from '@stacksjs/queue'
process.on('SIGTERM', async () => {
await gracefulShutdown({
timeout: 30000,
force: false
})
process.exit(0)
})
Job Retries with Backoff
export default class UnreliableJob extends Job {
tries = 5
backoff = [60, 300, 900, 3600]
backoffStrategy = 'exponential'
}
API Reference
Dispatch Functions
| Function | Description |
dispatch(job, data) | Dispatch job to queue |
dispatchSync(job, data) | Execute job immediately |
dispatchIf(condition, job, data) | Conditional dispatch |
dispatchUnless(condition, job, data) | Inverse conditional |
dispatchAfter(delay, job, data) | Delayed dispatch |
dispatchChain(jobs) | Sequential jobs |
chain() | Chain builder |
batch(jobs) | Batch processor |
Worker Methods
| Method | Description |
startProcessor(options) | Start processing |
stopProcessor() | Stop processing |
isWorkerRunning() | Check worker status |
getActiveJobCount() | Get active jobs |
Failed Job Methods
| Method | Description |
executeFailedJobs() | Retry all failed |
retryFailedJob(id) | Retry specific job |
FailedJobManager.all() | List failed jobs |
FailedJobManager.delete(id) | Delete failed job |
FailedJobManager.flush() | Clear all failed |
Job Class Properties
| Property | Description |
queue | Queue name |
tries | Max attempts |
backoff | Retry delay (seconds) |
timeout | Max execution time |
middleware | Job middleware array |
Underlying Libraries
The Stacks queue package is built on these zero-dependency libraries from the Stacks ecosystem:
External Resources