Infra: Queue

Overview

BullMQ backed by Redis. No separate broker process — Redis is already running.

Two queues:

  • notifications — consumed by notification-service
  • ai-jobs — consumed by ai-worker

Job definitions (types + options) live in the shared packages/queue package so both producers (core-api) and consumers (notification-service, ai-worker) share the same types.


packages/queue structure

packages/queue/
├── src/
│   ├── index.ts          # exports everything
│   ├── connection.ts     # shared Redis connection config
│   ├── queues.ts         # Queue instances
│   ├── jobs/
│   │   ├── notifications.ts   # notification job types + options
│   │   └── ai.ts              # AI job types + options
└── package.json

Shared connection

// connection.ts
import { ConnectionOptions } from 'bullmq';
 
export const redisConnection: ConnectionOptions = {
  host: process.env.REDIS_HOST ?? 'redis',
  port: Number(process.env.REDIS_PORT ?? 6379),
  maxRetriesPerRequest: null,  // required for BullMQ workers
};

Queue instances

// queues.ts
import { Queue } from 'bullmq';
import { redisConnection } from './connection';
 
export const notificationsQueue = new Queue('notifications', {
  connection: redisConnection,
  defaultJobOptions: {
    attempts: 3,
    backoff: { type: 'exponential', delay: 2000 },
    removeOnComplete: { age: 86400 },    // keep 24h
    removeOnFail: { age: 604800 },       // keep 7 days (for debugging)
  },
});
 
export const aiJobsQueue = new Queue('ai-jobs', {
  connection: redisConnection,
  defaultJobOptions: {
    attempts: 3,
    backoff: { type: 'exponential', delay: 5000 },
    removeOnComplete: { age: 3600 },
    removeOnFail: { age: 604800 },
  },
});

Notification job definitions

// jobs/notifications.ts
 
export type NotificationJob =
  | { name: 'send-email';      data: SendEmailJobData }
  | { name: 'send-in-app';     data: SendInAppJobData }
  | { name: 'trigger-webhook'; data: TriggerWebhookJobData };
 
export interface SendEmailJobData {
  tenantId: string;
  to: string | string[];
  template: EmailTemplate;
  data: Record<string, unknown>;
  replyTo?: string;
  cc?: string[];
}
 
export type EmailTemplate =
  | 'otp'
  | 'application-submitted'
  | 'application-status-changed'
  | 'job-posted'
  | 'verification-approved'
  | 'verification-rejected'
  | 'announcement'
  | 'cycle-enrollment'
  | 'event-reminder'
  | 'debarment-notice';
 
export interface SendInAppJobData {
  tenantId: string;
  userId: string;
  title: string;
  body: string;
  link?: string;
  type: 'info' | 'success' | 'warning' | 'action_required';
}
 
export interface TriggerWebhookJobData {
  tenantId: string;
  url: string;
  secret: string;
  event: string;
  payload: Record<string, unknown>;
}

AI job definitions

// jobs/ai.ts
 
export type AiJob =
  | { name: 'rewrite-resume';           data: RewriteResumeJobData }
  | { name: 'suggest-improvements';     data: SuggestImprovementsJobData }
  | { name: 'generate-student-summary'; data: GenerateStudentSummaryJobData };
 
export interface RewriteResumeJobData {
  tenantId: string;
  studentId: string;
  resumeId: string;
  jobId: string;
  jobTitle: string;
  jobDescription: string;
  tone: 'concise' | 'detailed' | 'professional';
}
 
export interface SuggestImprovementsJobData {
  tenantId: string;
  studentId: string;
  resumeId: string;
  jobId?: string;
}
 
export interface GenerateStudentSummaryJobData {
  tenantId: string;
  studentId: string;
}

Enqueuing jobs in core-api

// In any module in core-api:
import { notificationsQueue, aiJobsQueue } from '@placement/queue';
 
// Send email when application is submitted
await notificationsQueue.add('send-email', {
  tenantId: req.tenant.id,
  to: student.email,
  template: 'application-submitted',
  data: {
    studentName: student.name,
    jobTitle: job.title,
    companyName: company.name,
    applicationId: application.id,
  },
});
 
// Trigger AI resume rewrite
await aiJobsQueue.add('rewrite-resume', {
  tenantId: req.tenant.id,
  studentId: student.id,
  resumeId: resume.id,
  jobId: job.id,
  jobTitle: job.title,
  jobDescription: job.description,
  tone: req.body.tone,
}, {
  jobId: `rewrite-${resume.id}-${job.id}`,  // deduplicate: same resume+job = one job
});

BullBoard (queue monitoring UI)

In development, mount BullBoard on core-api for visibility:

// In core-api server.ts (dev only)
import { createBullBoard } from '@bull-board/api';
import { BullMQAdapter } from '@bull-board/api/bullMQAdapter';
import { FastifyAdapter } from '@bull-board/fastify';
 
if (process.env.NODE_ENV !== 'production') {
  const serverAdapter = new FastifyAdapter();
  createBullBoard({
    queues: [
      new BullMQAdapter(notificationsQueue),
      new BullMQAdapter(aiJobsQueue),
    ],
    serverAdapter,
  });
  app.register(serverAdapter.registerPlugin(), { prefix: '/admin/queues' });
}

Access at http://localhost:4000/admin/queues in local dev.


Dead letter handling

BullMQ moves jobs to the "failed" state after all retry attempts are exhausted. Failed jobs stay in Redis for 7 days (per removeOnFail config).

For production alerting: add a worker.on('failed') handler that logs to your observability stack (Datadog, Sentry, etc.) and alerts if a job has been retried 3 times and still failing.

worker.on('failed', (job, err) => {
  console.error({
    event: 'job_failed',
    queue: worker.name,
    jobId: job?.id,
    jobName: job?.name,
    tenantId: job?.data?.tenantId,
    attempts: job?.attemptsMade,
    error: err.message,
  });
});