Infra: Queue
Overview
BullMQ backed by Redis. No separate broker process — Redis is already running.
Two queues:
notifications — consumed by notification-service
ai-jobs — consumed by ai-worker
Job definitions (types + options) live in the shared packages/queue package so both producers (core-api) and consumers (notification-service, ai-worker) share the same types.
packages/queue structure
packages/queue/
├── src/
│ ├── index.ts # exports everything
│ ├── connection.ts # shared Redis connection config
│ ├── queues.ts # Queue instances
│ ├── jobs/
│ │ ├── notifications.ts # notification job types + options
│ │ └── ai.ts # AI job types + options
└── package.json
Shared connection
// connection.ts
import { ConnectionOptions } from 'bullmq';
/**
 * Resolves the Redis port from the raw `REDIS_PORT` value.
 *
 * Falls back to 6379 when the variable is unset, empty, or not a valid TCP
 * port, instead of passing `0` or `NaN` on to the Redis client.
 */
function resolveRedisPort(raw: string | undefined): number {
  // Number(undefined) and Number('abc') are NaN; Number('') is 0 — none is usable.
  const port = Number(raw);
  return Number.isInteger(port) && port > 0 && port <= 65535 ? port : 6379;
}

/**
 * Redis connection settings for the BullMQ queues.
 *
 * Host and port are read from `REDIS_HOST` / `REDIS_PORT` and default to
 * `redis:6379`.
 */
export const redisConnection: ConnectionOptions = {
  host: process.env.REDIS_HOST ?? 'redis',
  port: resolveRedisPort(process.env.REDIS_PORT),
  maxRetriesPerRequest: null, // required for BullMQ workers
};
Queue instances
// queues.ts
import { Queue } from 'bullmq';
import { redisConnection } from './connection';
/**
 * Queue instance named `notifications`.
 *
 * Defaults applied to every job: up to 3 attempts with exponential backoff
 * starting at 2 seconds; completed jobs are kept for 24 hours and failed jobs
 * for 7 days.
 */
export const notificationsQueue = new Queue('notifications', {
  connection: redisConnection,
  defaultJobOptions: {
    attempts: 3,
    backoff: { type: 'exponential', delay: 2 * 1000 },
    // Retention ages below are in seconds.
    removeOnComplete: { age: 24 * 60 * 60 }, // keep 24h
    removeOnFail: { age: 7 * 24 * 60 * 60 }, // keep 7 days (for debugging)
  },
});
/**
 * Queue instance named `ai-jobs`.
 *
 * Defaults applied to every job: up to 3 attempts with exponential backoff
 * starting at 5 seconds; completed jobs are kept for 1 hour and failed jobs
 * for 7 days.
 */
export const aiJobsQueue = new Queue('ai-jobs', {
  connection: redisConnection,
  defaultJobOptions: {
    attempts: 3,
    backoff: { type: 'exponential', delay: 5000 },
    // Retention ages below are in seconds.
    removeOnComplete: { age: 3600 }, // keep 1h
    removeOnFail: { age: 604800 }, // keep 7 days
  },
});
Notification job definitions
// jobs/notifications.ts
/**
 * Union of the notification jobs, discriminated by `name`.
 * Each `name` is paired with the payload type carried in `data`.
 */
export type NotificationJob =
  | { name: 'send-email'; data: SendEmailJobData }
  | { name: 'send-in-app'; data: SendInAppJobData }
  | { name: 'trigger-webhook'; data: TriggerWebhookJobData };
/** Payload of a `send-email` job. */
export interface SendEmailJobData {
  /** Tenant id the job is enqueued for. */
  tenantId: string;
  /** A single recipient or a list of recipients. */
  to: string | string[];
  /** Which email template the job refers to. */
  template: EmailTemplate;
  /** Free-form values sent with the template — presumably its variables; TODO confirm against the consumer. */
  data: Record<string, unknown>;
  /** Optional reply-to value. */
  replyTo?: string;
  /** Optional list of cc recipients. */
  cc?: string[];
}
/**
 * Closed set of template identifiers accepted in `SendEmailJobData.template`.
 * NOTE(review): the templates themselves are not defined here — presumably the
 * consumer must handle every value listed; confirm when adding one.
 */
export type EmailTemplate =
  | 'otp'
  | 'application-submitted'
  | 'application-status-changed'
  | 'job-posted'
  | 'verification-approved'
  | 'verification-rejected'
  | 'announcement'
  | 'cycle-enrollment'
  | 'event-reminder'
  | 'debarment-notice';
/** Payload of a `send-in-app` job. */
export interface SendInAppJobData {
  /** Tenant id the job is enqueued for. */
  tenantId: string;
  /** Id of the user the notification is addressed to. */
  userId: string;
  /** Notification title text. */
  title: string;
  /** Notification body text. */
  body: string;
  /** Optional link — presumably where the notification should lead; TODO confirm format (absolute URL vs. app route). */
  link?: string;
  /** Category of the notification; one of the four literals below. */
  type: 'info' | 'success' | 'warning' | 'action_required';
}
/** Payload of a `trigger-webhook` job. */
export interface TriggerWebhookJobData {
  /** Tenant id the job is enqueued for. */
  tenantId: string;
  /** Webhook URL. */
  url: string;
  /**
   * Webhook secret.
   * NOTE(review): this travels inside the job data, so it is stored with the
   * job in Redis — presumably used to sign the request; confirm, and consider
   * whether it should be looked up by the consumer instead.
   */
  secret: string;
  /** Name of the event being delivered. */
  event: string;
  /** Free-form event payload. */
  payload: Record<string, unknown>;
}
AI job definitions
// jobs/ai.ts
/**
 * Union of the AI jobs, discriminated by `name`.
 * Each `name` is paired with the payload type carried in `data`.
 */
export type AiJob =
  | { name: 'rewrite-resume'; data: RewriteResumeJobData }
  | { name: 'suggest-improvements'; data: SuggestImprovementsJobData }
  | { name: 'generate-student-summary'; data: GenerateStudentSummaryJobData };
/** Payload of a `rewrite-resume` job. */
export interface RewriteResumeJobData {
  /** Tenant id the job is enqueued for. */
  tenantId: string;
  /** Id of the student. */
  studentId: string;
  /** Id of the resume. */
  resumeId: string;
  /** Id of the job posting (a domain id, not the BullMQ job id). */
  jobId: string;
  /** Title of the job posting. */
  jobTitle: string;
  /** Description text of the job posting. */
  jobDescription: string;
  /** Requested tone; one of the three literals below. */
  tone: 'concise' | 'detailed' | 'professional';
}
/** Payload of a `suggest-improvements` job. */
export interface SuggestImprovementsJobData {
  /** Tenant id the job is enqueued for. */
  tenantId: string;
  /** Id of the student. */
  studentId: string;
  /** Id of the resume. */
  resumeId: string;
  /** Optional job posting id — presumably narrows suggestions to that posting; TODO confirm. */
  jobId?: string;
}
/** Payload of a `generate-student-summary` job. */
export interface GenerateStudentSummaryJobData {
  /** Tenant id the job is enqueued for. */
  tenantId: string;
  /** Id of the student. */
  studentId: string;
}
Enqueuing jobs in core-api
// In any module in core-api:
import { notificationsQueue, aiJobsQueue } from '@placement/queue';
// Send email when application is submitted
// Assemble the payload first, then enqueue it as a 'send-email' job.
const applicationSubmittedEmail = {
  tenantId: req.tenant.id,
  to: student.email,
  template: 'application-submitted',
  data: {
    studentName: student.name,
    jobTitle: job.title,
    companyName: company.name,
    applicationId: application.id,
  },
};
await notificationsQueue.add('send-email', applicationSubmittedEmail);
// Trigger AI resume rewrite
// Enqueue a 'rewrite-resume' job with a custom job id (third argument) so the
// same resume + job posting pair maps to a single queued job.
await aiJobsQueue.add('rewrite-resume', {
  tenantId: req.tenant.id,
  studentId: student.id,
  resumeId: resume.id,
  jobId: job.id,
  jobTitle: job.title,
  jobDescription: job.description,
  tone: req.body.tone,
}, {
  // NOTE(review): the id does not include `tone`, so a request with a different
  // tone for the same resume + job is treated as a duplicate — confirm intended.
  jobId: `rewrite-${resume.id}-${job.id}`, // deduplicate: same resume+job = one job
});
BullBoard (queue monitoring UI)
In development, mount BullBoard on core-api for visibility:
// In core-api server.ts (dev only)
import { createBullBoard } from '@bull-board/api';
import { BullMQAdapter } from '@bull-board/api/bullMQAdapter';
import { FastifyAdapter } from '@bull-board/fastify';
// Mount the BullBoard UI for both queues whenever NODE_ENV is not 'production'.
if (process.env.NODE_ENV !== 'production') {
  const boardBasePath = '/admin/queues';
  const serverAdapter = new FastifyAdapter();
  createBullBoard({
    queues: [
      new BullMQAdapter(notificationsQueue),
      new BullMQAdapter(aiJobsQueue),
    ],
    serverAdapter,
  });
  // The adapter has to be told the path it is mounted under; without this the
  // UI builds its asset and API URLs without the prefix.
  serverAdapter.setBasePath(boardBasePath);
  app.register(serverAdapter.registerPlugin(), { prefix: boardBasePath });
}
Access at http://localhost:4000/admin/queues in local dev.
Dead letter handling
BullMQ moves jobs to the "failed" state after all retry attempts are exhausted. Failed jobs stay in Redis for 7 days (per removeOnFail config).
For production alerting: add a worker.on('failed') handler that logs to your observability stack (Datadog, Sentry, etc.) and alerts when a job is still failing after all 3 attempts have been used.
// Write one structured error log entry for each 'failed' event of the worker.
worker.on('failed', (job, err) => {
  // `job` may be undefined, hence the optional chaining on every job field.
  const failure = {
    event: 'job_failed',
    queue: worker.name,
    jobId: job?.id,
    jobName: job?.name,
    tenantId: job?.data?.tenantId,
    attempts: job?.attemptsMade,
    error: err.message,
  };
  console.error(failure);
});