Service Spec: ai-worker

What this service is

A Python worker that processes AI jobs. It consumes from the ai-jobs BullMQ queue using the Python bullmq client, which reads the same Redis data structures as Node BullMQ, so no Node.js bridge is required. It calls LLM APIs, writes results back to the database, and notifies core-api when done.


Tech stack

  • Runtime: Python 3.11
  • Framework: FastAPI (for health endpoint only)
  • Queue: bullmq Python client (published on PyPI as bullmq) — compatible with jobs enqueued by Node BullMQ
  • LLM: openai SDK (GPT-4o) or anthropic SDK (Claude) — configurable
  • Database: asyncpg + raw SQL (or prisma Python client)
  • HTTP client: httpx (async)
  • Validation: pydantic v2

Folder structure

apps/ai-worker/
├── src/
│   ├── main.py                    # start worker + health server
│   ├── worker.py                  # BullMQ worker setup
│   ├── processors/
│   │   ├── rewrite_resume.py      # AI resume rewrite
│   │   ├── suggest_improvements.py # AI resume suggestions
│   │   ├── generate_summary.py    # AI student summary
│   │   └── generate_analytics_summary.py
│   ├── llm/
│   │   ├── client.py              # unified LLM client (OpenAI or Anthropic)
│   │   └── prompts.py             # all prompts as constants
│   ├── db/
│   │   └── client.py              # asyncpg connection pool
│   └── health.py                  # FastAPI health endpoint
├── Dockerfile
├── requirements.txt
└── pyproject.toml
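
The dependency list follows directly from the stack above. A plausible requirements.txt (package names only; pin exact versions in the real file; uvicorn is assumed here for serving the FastAPI health app):

# requirements.txt (sketch)
bullmq
fastapi
uvicorn
openai
anthropic
asyncpg
httpx
pydantic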

Queue setup

# worker.py
import os

from bullmq import Worker

from processors.rewrite_resume import process_rewrite_resume
from processors.suggest_improvements import process_suggest_improvements
from processors.generate_summary import process_generate_summary

async def process_job(job, job_token):
    match job.name:
        case "rewrite-resume":
            return await process_rewrite_resume(job.data)
        case "suggest-improvements":
            return await process_suggest_improvements(job.data)
        case "generate-student-summary":
            return await process_generate_summary(job.data)
        case _:
            raise ValueError(f"Unknown job type: {job.name}")

def create_worker() -> Worker:
    # Called from main.py inside the running event loop
    return Worker("ai-jobs", process_job, {
        "connection": os.environ["REDIS_URL"],
        "concurrency": 3,   # LLM calls are slow, don't over-parallelise
    })
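
main.py ties the worker and the health server together. A minimal sketch, assuming uvicorn serves the FastAPI app from health.py on port 4002 (per the health check section below):

# main.py (sketch)
import asyncio

import uvicorn

from worker import create_worker
from health import app

async def main():
    worker = create_worker()   # starts polling ai-jobs immediately
    # (wiring the worker handle into health.py's endpoint is elided here)
    server = uvicorn.Server(uvicorn.Config(app, host="0.0.0.0", port=4002))
    try:
        await server.serve()   # blocks until SIGTERM/SIGINT
    finally:
        await worker.close()   # let in-flight jobs finish, then disconnect

if __name__ == "__main__":
    asyncio.run(main())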

Job types and payloads

rewrite-resume

from typing import Literal

from pydantic import BaseModel

class RewriteResumeJob(BaseModel):
    tenant_id: str
    student_id: str
    resume_id: str
    job_id: str           # the job they're applying to (for context)
    job_title: str
    job_description: str
    tone: Literal["concise", "detailed", "professional"]

Processor logic:

  1. Fetch resume JSON from DB
  2. Fetch job description
  3. Build prompt (see below)
  4. Call LLM
  5. Parse response as resume JSON (same schema)
  6. Write new resume to DB with is_ai_variant: true, base_resume_id: original_id
  7. Mark all bullets as verified: true (AI variants are auto-verified)
  8. Call the core-api internal completion endpoint (POST /internal/ai-jobs/complete; see "Calling back to core-api" below), which enqueues the in-app notification for the student
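
A condensed sketch of that flow. llm_client is the module-level client from llm/client.py; fetch_resume, strip_markdown_fences, iter_bullets, insert_resume and notify_core_api are illustrative helper names, not an existing API:

# processors/rewrite_resume.py (sketch of steps 1-8)
async def process_rewrite_resume(data: dict) -> str:
    job = RewriteResumeJob.model_validate(data)

    resume = await fetch_resume(job.tenant_id, job.resume_id)     # 1
    # 2: the job description already arrives in the payload
    prompt = REWRITE_RESUME_PROMPT.format(                        # 3
        job_title=job.job_title,
        job_description=job.job_description,
        resume_json=resume.model_dump_json(),
        tone=job.tone,
    )
    raw = await llm_client.complete(prompt)                       # 4
    rewritten = ResumeSchema.model_validate_json(                 # 5: see "Response parsing"
        strip_markdown_fences(raw)
    )
    for bullet in iter_bullets(rewritten):                        # 7
        bullet.verified = True
    new_resume_id = await insert_resume(                          # 6
        rewritten, is_ai_variant=True, base_resume_id=job.resume_id,
    )
    await notify_core_api(job, new_resume_id)                     # 8
    return new_resume_id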

Prompt:

REWRITE_RESUME_PROMPT = """
You are a professional resume editor helping a student tailor their resume for a specific job.
 
Job Title: {job_title}
Job Description: {job_description}
 
Student's current resume (JSON format):
{resume_json}
 
Instructions:
- Rewrite the bullet points in the experience and projects sections to better match the job description
- Keep all factual information (dates, company names, titles) exactly the same
- Do not add any experience or skills the student does not have
- Improve phrasing to use active verbs and quantify impact where data is already present
- Tone: {tone}
 
Return ONLY valid JSON in exactly the same schema as the input resume. No preamble, no explanation.
"""

Response parsing:

response_text = await llm_client.complete(prompt)
# Strip any markdown fences if the model adds them
clean = response_text.strip().removeprefix("```json").removesuffix("```").strip()
# Raises pydantic.ValidationError on malformed output (handled as error case 2 below)
rewritten_resume = ResumeSchema.model_validate_json(clean)

suggest-improvements

from typing import Optional

from pydantic import BaseModel

class SuggestImprovementsJob(BaseModel):
    tenant_id: str
    student_id: str
    resume_id: str
    job_id: Optional[str] = None   # if provided, tailor suggestions to this job

Returns a list of suggestions (not a rewritten resume — the student reviews and accepts/rejects each):

class ResumeSuggestion(BaseModel):
    section: str              # JSON-path-style pointer into the resume, e.g. "experience.0.bullets.2"
    original: str
    suggested: str
    reason: str               # why this change improves the resume

Written to the resume_suggestions table. The frontend shows a diff view so the student can accept or reject each suggestion.
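
Parsing and persistence for this job could look like the following sketch. The model is asked to return a JSON array, pydantic's TypeAdapter validates the whole list, and the column names are assumed to mirror the model fields:

# processors/suggest_improvements.py (persistence sketch)
from pydantic import TypeAdapter

# ResumeSuggestion imported from wherever the shared models live
_suggestions = TypeAdapter(list[ResumeSuggestion])

async def save_suggestions(db, resume_id: str, raw: str) -> None:
    clean = raw.strip().removeprefix("```json").removesuffix("```").strip()
    suggestions = _suggestions.validate_json(clean)
    await db.executemany(
        """
        INSERT INTO resume_suggestions (resume_id, section, original, suggested, reason)
        VALUES ($1, $2, $3, $4, $5)
        """,
        [(resume_id, s.section, s.original, s.suggested, s.reason) for s in suggestions],
    )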


generate-student-summary

class GenerateStudentSummaryJob(BaseModel):
    tenant_id: str
    student_id: str

Generates a 3-4 sentence AI summary of the student for the admin "Student detail" view. Stored in the student_ai_summaries table with a generated_at timestamp. Regenerated when the student's profile is verified or significantly updated.

Prompt:

STUDENT_SUMMARY_PROMPT = """
Based on this student's profile data, write a brief 3-4 sentence professional summary 
for a placement officer reviewing the student's candidacy. Mention their academic 
background, most notable experience, key skills, and anything distinctive.
Be factual and concise. Do not embellish.
 
Student profile:
{profile_json}
"""

LLM client abstraction

# llm/client.py
import os
from abc import ABC, abstractmethod

from anthropic import AsyncAnthropic
from openai import AsyncOpenAI

class LLMClient(ABC):
    @abstractmethod
    async def complete(self, prompt: str, max_tokens: int = 2000) -> str:
        ...

class OpenAIClient(LLMClient):
    def __init__(self, api_key: str):
        self.client = AsyncOpenAI(api_key=api_key)

    async def complete(self, prompt: str, max_tokens: int = 2000) -> str:
        response = await self.client.chat.completions.create(
            model="gpt-4o",
            messages=[{"role": "user", "content": prompt}],
            max_tokens=max_tokens,
            temperature=0.3,    # low temp for structured output tasks
        )
        return response.choices[0].message.content

class AnthropicClient(LLMClient):
    def __init__(self, api_key: str):
        self.client = AsyncAnthropic(api_key=api_key)

    async def complete(self, prompt: str, max_tokens: int = 2000) -> str:
        response = await self.client.messages.create(
            model="claude-sonnet-4-20250514",
            max_tokens=max_tokens,
            messages=[{"role": "user", "content": prompt}],
        )
        return response.content[0].text

# Select client from env var
def get_llm_client() -> LLMClient:
    provider = os.environ.get("LLM_PROVIDER", "openai")
    if provider == "anthropic":
        return AnthropicClient(api_key=os.environ["ANTHROPIC_API_KEY"])
    return OpenAIClient(api_key=os.environ["OPENAI_API_KEY"])

Calling back to core-api

When an AI job completes, the worker notifies core-api via an internal HTTP call (not via the queue; this is a direct, low-latency call):

# After writing result to DB:
async with httpx.AsyncClient() as client:
    await client.post(
        f"{CORE_API_INTERNAL_URL}/internal/ai-jobs/complete",
        json={
            "jobId": job.id,
            "jobType": job.name,
            "tenantId": data.tenant_id,
            "resultId": new_resume_id,    # or summary ID etc
        },
        headers={"x-internal-secret": CORE_API_INTERNAL_SECRET},
    )

core-api then enqueues an in-app notification for the student.


Error handling

LLM calls can fail in these ways:

  1. Rate limit (429) → BullMQ retries with backoff. Max 3 attempts.
  2. Invalid JSON response → log the raw response and mark the job as failed. Do not retry (the same prompt will very likely fail the same way). Write the error to the ai_job_errors table so the student sees "AI rewrite failed, please try again" (see the sketch after this list).
  3. Context too long → truncate resume to fit context window. Log truncation.
  4. Timeout (>30s) → treat as failure, BullMQ retries.
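
For case 2, a guard around the parse step. This is a sketch: LLMResponseError and the db handle are illustrative, and the ai_job_errors columns are assumed from the description above. Whether raising skips BullMQ's retry depends on the client's unrecoverable-error support; otherwise the attempts cap (max 3) bounds the damage:

# llm/parsing.py (guard for error case 2, sketch)
import logging

from pydantic import ValidationError

# ResumeSchema imported from wherever the shared resume schema lives
logger = logging.getLogger(__name__)

class LLMResponseError(Exception):
    """Model output could not be parsed. Deliberately not retried."""

async def parse_resume_response(raw: str, job_id: str, db) -> ResumeSchema:
    clean = raw.strip().removeprefix("```json").removesuffix("```").strip()
    try:
        return ResumeSchema.model_validate_json(clean)
    except ValidationError as exc:
        logger.error("Invalid LLM JSON for job %s: %r", job_id, raw)
        await db.execute(
            "INSERT INTO ai_job_errors (job_id, error) VALUES ($1, $2)",
            job_id, str(exc),
        )
        raise LLMResponseError("unparseable model output") from exc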

Health check

FastAPI endpoint on port 4002:

@app.get("/health")
async def health():
    return {
        "status": "ok",
        "worker_active": worker.is_running(),
        "llm_provider": os.environ.get("LLM_PROVIDER", "openai"),
    }

Environment variables

LLM_PROVIDER=openai              # or "anthropic"
OPENAI_API_KEY=sk-...
ANTHROPIC_API_KEY=sk-ant-...     # if using Anthropic
REDIS_URL=redis://redis:6379
DATABASE_URL=postgresql://...
CORE_API_INTERNAL_URL=http://core-api:4000
CORE_API_INTERNAL_SECRET=...
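
These could be centralised in a single typed settings object so that missing variables fail at startup rather than mid-job. A sketch using pydantic-settings (an assumed extra dependency, not part of the stack listed above):

# config.py (sketch; assumes the pydantic-settings package)
from pydantic_settings import BaseSettings

class Settings(BaseSettings):
    llm_provider: str = "openai"
    openai_api_key: str = ""
    anthropic_api_key: str = ""
    redis_url: str
    database_url: str
    core_api_internal_url: str
    core_api_internal_secret: str

settings = Settings()   # reads LLM_PROVIDER, REDIS_URL, ... from the environment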