# Service Spec: ai-worker

## What this service is

A Python worker that processes AI jobs. It consumes from the `ai-jobs` BullMQ queue, either via a Node.js bridge (BullMQ is Node-native) or directly from Redis using a Python BullMQ-compatible client. It calls LLM APIs, writes results back to the database, and notifies `core-api` when done.
## Tech stack

- Runtime: Python 3.11
- Framework: FastAPI (for the health endpoint only)
- Queue: `bullmq` Python client (`python-bullmq`), compatible with Node BullMQ jobs
- LLM: `openai` SDK (GPT-4o) or `anthropic` SDK (Claude), configurable
- Database: `asyncpg` + raw SQL (or the `prisma` Python client)
- HTTP client: `httpx` (async)
- Validation: `pydantic` v2
## Folder structure

```
apps/ai-worker/
├── src/
│   ├── main.py                  # start worker + health server
│   ├── worker.py                # BullMQ worker setup
│   ├── processors/
│   │   ├── rewrite_resume.py            # AI resume rewrite
│   │   ├── suggest_improvements.py      # AI resume suggestions
│   │   ├── generate_summary.py          # AI student summary
│   │   └── generate_analytics_summary.py
│   ├── llm/
│   │   ├── client.py            # unified LLM client (OpenAI or Anthropic)
│   │   └── prompts.py           # all prompts as constants
│   ├── db/
│   │   └── client.py            # asyncpg connection pool
│   └── health.py                # FastAPI health endpoint
├── Dockerfile
├── requirements.txt
└── pyproject.toml
```

## Queue setup
```python
# worker.py
from bullmq import Worker

from processors.rewrite_resume import process_rewrite_resume
from processors.suggest_improvements import process_suggest_improvements
from processors.generate_summary import process_generate_summary


async def process_job(job, job_token):
    match job.name:
        case "rewrite-resume":
            return await process_rewrite_resume(job.data)
        case "suggest-improvements":
            return await process_suggest_improvements(job.data)
        case "generate-student-summary":
            return await process_generate_summary(job.data)
        case _:
            raise ValueError(f"Unknown job type: {job.name}")


# REDIS_HOST / REDIS_PORT come from config (parsed from REDIS_URL,
# see "Environment variables" below)
worker = Worker("ai-jobs", process_job, {
    "connection": {
        "host": REDIS_HOST,
        "port": REDIS_PORT,
    },
    "concurrency": 3,  # LLM calls are slow, don't over-parallelise
})
```

## Job types and payloads
### rewrite-resume

```python
from typing import Literal

from pydantic import BaseModel


class RewriteResumeJob(BaseModel):
    tenant_id: str
    student_id: str
    resume_id: str
    job_id: str  # the job they're applying to (for context)
    job_title: str
    job_description: str
    tone: Literal["concise", "detailed", "professional"]
```

Processor logic (sketched after this list):
- Fetch resume JSON from the DB
- Fetch the job description
- Build the prompt (see below)
- Call the LLM
- Parse the response as resume JSON (same schema)
- Write the new resume to the DB with `is_ai_variant: true`, `base_resume_id: original_id`
- Mark all bullets as `verified: true` (AI variants are auto-verified)
- Call the `core-api` internal endpoint to notify the student (POST /internal/notifications/in-app)
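Putting those steps together, the processor might look roughly like this. This is a sketch, not the implementation: `fetch_resume_json`, `parse_resume_response`, `mark_bullets_verified`, `insert_resume_variant`, and `notify_core_api` are hypothetical helpers named here for illustration.

```python
# processors/rewrite_resume.py (illustrative flow; helper functions are hypothetical)
from llm.client import get_llm_client
from llm.prompts import REWRITE_RESUME_PROMPT


async def process_rewrite_resume(data: dict) -> str:
    job = RewriteResumeJob.model_validate(data)

    # Fetch the resume JSON (the job description travels in the payload)
    resume_json = await fetch_resume_json(job.tenant_id, job.resume_id)  # hypothetical helper

    # Build the prompt (REWRITE_RESUME_PROMPT is shown below)
    prompt = REWRITE_RESUME_PROMPT.format(
        job_title=job.job_title,
        job_description=job.job_description,
        resume_json=resume_json,
        tone=job.tone,
    )

    # Call the LLM and parse the response back into the resume schema
    response_text = await get_llm_client().complete(prompt)
    rewritten = parse_resume_response(response_text)  # see "Response parsing" below

    # Persist as an auto-verified AI variant of the original
    mark_bullets_verified(rewritten)              # hypothetical helper
    new_resume_id = await insert_resume_variant(  # hypothetical helper
        rewritten, is_ai_variant=True, base_resume_id=job.resume_id
    )

    # Notify the student via core-api (see "Calling back to core-api")
    await notify_core_api(job, new_resume_id)     # hypothetical helper
    return new_resume_id
```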
Prompt:

```python
REWRITE_RESUME_PROMPT = """
You are a professional resume editor helping a student tailor their resume for a specific job.

Job Title: {job_title}
Job Description: {job_description}

Student's current resume (JSON format):
{resume_json}

Instructions:
- Rewrite the bullet points in the experience and projects sections to better match the job description
- Keep all factual information (dates, company names, titles) exactly the same
- Do not add any experience or skills the student does not have
- Improve phrasing to use active verbs and quantify impact where data is already present
- Tone: {tone}

Return ONLY valid JSON in exactly the same schema as the input resume. No preamble, no explanation.
"""
```

Response parsing:

````python
response_text = await llm_client.complete(prompt)

# Strip any markdown fences if the model adds them
clean = response_text.strip().removeprefix("```json").removesuffix("```").strip()
rewritten_resume = ResumeSchema.model_validate_json(clean)
````

### suggest-improvements
```python
from typing import Optional


class SuggestImprovementsJob(BaseModel):
    tenant_id: str
    student_id: str
    resume_id: str
    job_id: Optional[str]  # optional: if provided, tailor suggestions to this job
```

Returns a list of suggestions (not a rewritten resume; the student reviews and accepts or rejects each one):

```python
class ResumeSuggestion(BaseModel):
    section: str    # e.g. "experience.0.bullets.2"
    original: str
    suggested: str
    reason: str     # why this change improves the resume
```

Suggestions are written to a `resume_suggestions` table; the frontend shows a diff view for accepting or rejecting them. A sketch of the write follows.
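A minimal sketch of persisting suggestions with `asyncpg`. The column names mirror the model fields above, but the actual table shape is an assumption.

```python
# Persist suggestions; table columns assumed to mirror ResumeSuggestion fields.
import asyncpg


async def save_suggestions(
    pool: asyncpg.Pool,
    tenant_id: str,
    resume_id: str,
    suggestions: list[ResumeSuggestion],
) -> None:
    async with pool.acquire() as conn:
        await conn.executemany(
            """
            INSERT INTO resume_suggestions
                (tenant_id, resume_id, section, original, suggested, reason)
            VALUES ($1, $2, $3, $4, $5, $6)
            """,
            [
                (tenant_id, resume_id, s.section, s.original, s.suggested, s.reason)
                for s in suggestions
            ],
        )
```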
### generate-student-summary

```python
class GenerateStudentSummaryJob(BaseModel):
    tenant_id: str
    student_id: str
```

Generates a 3-4 sentence AI summary of the student for the admin "Student detail" view. Stored in the `student_ai_summaries` table with a `generated_at` timestamp, and regenerated when the student's profile is verified or significantly updated.

Prompt:

```python
STUDENT_SUMMARY_PROMPT = """
Based on this student's profile data, write a brief 3-4 sentence professional summary
for a placement officer reviewing the student's candidacy. Mention their academic
background, most notable experience, key skills, and anything distinctive.
Be factual and concise. Do not embellish.

Student profile:
{profile_json}
"""
```
## LLM client abstraction

```python
# llm/client.py
import os
from abc import ABC, abstractmethod

from anthropic import AsyncAnthropic
from openai import AsyncOpenAI


class LLMClient(ABC):
    @abstractmethod
    async def complete(self, prompt: str, max_tokens: int = 2000) -> str:
        pass


class OpenAIClient(LLMClient):
    def __init__(self, api_key: str):
        self.client = AsyncOpenAI(api_key=api_key)

    async def complete(self, prompt: str, max_tokens: int = 2000) -> str:
        response = await self.client.chat.completions.create(
            model="gpt-4o",
            messages=[{"role": "user", "content": prompt}],
            max_tokens=max_tokens,
            temperature=0.3,  # low temp for structured-output tasks
        )
        return response.choices[0].message.content


class AnthropicClient(LLMClient):
    def __init__(self, api_key: str):
        self.client = AsyncAnthropic(api_key=api_key)

    async def complete(self, prompt: str, max_tokens: int = 2000) -> str:
        response = await self.client.messages.create(
            model="claude-sonnet-4-20250514",
            max_tokens=max_tokens,
            messages=[{"role": "user", "content": prompt}],
        )
        return response.content[0].text


# Select client from env var
def get_llm_client() -> LLMClient:
    provider = os.environ.get("LLM_PROVIDER", "openai")
    if provider == "anthropic":
        return AnthropicClient(api_key=os.environ["ANTHROPIC_API_KEY"])
    return OpenAIClient(api_key=os.environ["OPENAI_API_KEY"])
```

## Calling back to core-api
When an AI job completes, the worker notifies core-api via an internal HTTP call (not via the queue; this is a direct, fast call):

```python
# After writing the result to the DB:
async with httpx.AsyncClient() as client:
    await client.post(
        f"{CORE_API_INTERNAL_URL}/internal/ai-jobs/complete",
        json={
            "jobId": job.id,
            "jobType": job.name,
            "tenantId": data.tenant_id,
            "resultId": new_resume_id,  # or summary ID etc.
        },
        headers={"x-internal-secret": CORE_API_INTERNAL_SECRET},
    )
```

core-api then enqueues an in-app notification for the student.
## Error handling

LLM calls can fail in these ways (classification sketched after this list):

- Rate limit (429) → BullMQ retries with backoff. Max 3 attempts.
- Invalid JSON response → log the raw response and mark the job as failed. Do not retry (the same prompt will likely fail again). Write the error to the `ai_job_errors` table so the student sees "AI rewrite failed, please try again".
- Context too long → truncate the resume to fit the context window. Log the truncation.
- Timeout (>30s) → treat as a failure; BullMQ retries.
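A sketch of the classification, assuming the OpenAI provider. `write_ai_job_error` is a hypothetical helper, and the no-retry mechanics are an assumption: Node BullMQ uses `UnrecoverableError` to skip retries, and whether `python-bullmq` exposes an equivalent should be checked.

```python
# Illustrative failure classification for a processor (OpenAI provider assumed).
import openai
from pydantic import ValidationError


async def process_with_error_handling(job, data: dict):
    try:
        return await process_rewrite_resume(data)
    except (openai.RateLimitError, openai.APITimeoutError):
        # Retryable: re-raise so BullMQ retries with backoff
        # (attempts/backoff are set by core-api when the job is enqueued).
        raise
    except ValidationError as exc:
        # The model returned invalid JSON; the same prompt will likely fail
        # again, so record the error for the student-facing message and fail
        # the job. Raise python-bullmq's UnrecoverableError equivalent here,
        # if one exists, to suppress retries (assumption).
        await write_ai_job_error(job.id, str(exc))  # hypothetical helper
        raise
```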
## Health check

FastAPI endpoint on port 4002:

```python
@app.get("/health")
async def health():
    return {
        "status": "ok",
        "worker_active": worker.is_running(),
        "llm_provider": os.environ.get("LLM_PROVIDER", "openai"),
    }
```
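For completeness, a minimal `main.py` that runs the health server alongside the worker in one process. This is a sketch: it assumes python-bullmq's pattern of constructing the `Worker` and keeping the event loop alive, plus a standard programmatic `uvicorn` setup.

```python
# main.py: minimal wiring; graceful-shutdown details omitted.
import asyncio

import uvicorn

from health import app     # FastAPI app exposing /health
from worker import worker  # constructing the Worker starts it processing (assumption)


async def main() -> None:
    server = uvicorn.Server(uvicorn.Config(app, host="0.0.0.0", port=4002))
    try:
        # serve() blocks until the process is signalled to stop
        await server.serve()
    finally:
        await worker.close()  # stop pulling jobs before the process exits


if __name__ == "__main__":
    asyncio.run(main())
```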
## Environment variables

```
LLM_PROVIDER=openai            # or "anthropic"
OPENAI_API_KEY=sk-...
ANTHROPIC_API_KEY=sk-ant-...   # if using Anthropic
REDIS_URL=redis://redis:6379
DATABASE_URL=postgresql://...
CORE_API_INTERNAL_URL=http://core-api:4000
CORE_API_INTERNAL_SECRET=...
```