I'm going to be blunt: if you're running a SaaS that uses Claude, and you haven't implemented the Batch API yet, you're hemorrhaging money. Not metaphorically—the Batch API literally cuts the per-token cost by 50% for any request that doesn't need a response in the next 60 seconds.
At CitizenApp, we process thousands of document summaries, compliance classifications, and policy analyses daily. Our first instinct was synchronous: user uploads document → Claude responds in real-time → dashboard updates. It felt responsive. It also felt like burning cash.
Then we realized: 80% of those requests don't actually need synchronous responses. A user uploads a document for summarization? They're fine waiting 5–30 minutes. A tenant wants to bulk-classify 500 policies? That's explicitly an async job. The Batch API is built for this, and it's absurdly underutilized.
Here's how I wired it into CitizenApp—and why you should too.
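The 50% cost reduction is the headline, but the real win is architectural. Batching forces you to separate concerns: requests that genuinely need a synchronous answer go through the real-time API, and everything that can wait becomes a queued background job.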
This separation is healthy. You stop cramming everything through Claude's sync API and actually think about what needs to be urgent.
In CitizenApp's case, we reduced our monthly Claude bill from ~$8k to ~$4k while actually increasing feature coverage. The key was offloading 65% of our workload to batches.
Here's the flow:
User Request
↓
FastAPI Endpoint (Validate, Enqueue)
↓
PostgreSQL Queue Table
↓
Batch Processor (reads queue, submits to Anthropic)
↓
Anthropic Batch Job (runs in background)
↓
Webhook/Polling Handler (gets results)
↓
PostgreSQL LISTEN/NOTIFY
↓
WebSocket → React 19 Dashboard (real-time update)
Let's build it.
from sqlalchemy import Column, String, Integer, Text, DateTime, Enum, ForeignKey
from sqlalchemy.orm import declarative_base
from datetime import datetime
import enum
# Declarative base class that the ORM models below (e.g. AIJob) inherit from.
Base = declarative_base()
class AIJobStatus(str, enum.Enum):
    """String-valued lifecycle states of an AI job."""

    # Set when the API endpoint first records the job.
    QUEUED = "queued"
    # Set once the job has been included in a batch sent to Anthropic.
    SUBMITTED = "submitted"
    # Not assigned anywhere in the code shown here.
    PROCESSING = "processing"
    # Set when a batch result for the job succeeded.
    COMPLETED = "completed"
    # Set when the job's batch request errored or the batch expired.
    FAILED = "failed"
class AIJob(Base):
    """ORM row for one queued AI request and, once processed, its outcome."""

    __tablename__ = "ai_jobs"

    # --- Identity and ownership -------------------------------------------
    id = Column(String, primary_key=True)
    tenant_id = Column(String, ForeignKey("tenants.id"), nullable=False, index=True)
    user_id = Column(String, nullable=False)

    # --- What was asked ----------------------------------------------------
    # "summarize", "classify", etc.
    job_type = Column(String, nullable=False)
    status = Column(Enum(AIJobStatus), default=AIJobStatus.QUEUED, index=True)
    # JSON stringified
    input_data = Column(Text, nullable=False)

    # --- What came back ----------------------------------------------------
    # Result from Claude
    result = Column(Text, nullable=True)
    error_message = Column(Text, nullable=True)

    # --- Batch bookkeeping -------------------------------------------------
    # Anthropic batch ID
    batch_id = Column(String, nullable=True, index=True)
    # Within batch
    request_id = Column(String, nullable=True)

    # --- Timestamps --------------------------------------------------------
    # Both timestamps are written via datetime.utcnow, i.e. naive UTC values.
    created_at = Column(DateTime, default=datetime.utcnow, index=True)
    completed_at = Column(DateTime, nullable=True)
python
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel
import uuid
import json
from sqlalchemy.orm import Session
from database import get_db
from models import AIJob, AIJobStatus
# Router on which the AI endpoints below register themselves.
router = APIRouter()
class SummarizeRequest(BaseModel):
    # Request body for the summarize endpoint.
    # (Documented with comments rather than a docstring so the generated
    # schema description stays unchanged.)

    # Full text of the document to summarize.
    document_text: str
    # Word count interpolated into the summarization prompt.
    max_length: int = 500
# NOTE(review): get_tenant_id and get_user_id are not imported in the visible
# part of this file — confirm they are provided elsewhere.
# NOTE(review): db.commit() looks like a blocking call inside an `async def`
# handler (Session is the synchronous SQLAlchemy type) — confirm this is
# acceptable for the event loop, or declare the handler with plain `def`.
@router.post("/api/ai/summarize")
async def queue_summarize(
    request: SummarizeRequest,
    db: Session = Depends(get_db),
    tenant_id: str = Depends(get_tenant_id),
    user_id: str = Depends(get_user_id),
):
    """
    Queue a document for summarization (async).
    Returns immediately with job ID.
    User receives result via WebSocket when batch completes.
    """
    # Serialize the request payload first; the worker parses it back later.
    payload = json.dumps({
        "document_text": request.document_text,
        "max_length": request.max_length,
    })

    new_job_id = str(uuid.uuid4())
    db.add(
        AIJob(
            id=new_job_id,
            tenant_id=tenant_id,
            user_id=user_id,
            job_type="summarize",
            input_data=payload,
            status=AIJobStatus.QUEUED,
        )
    )
    db.commit()

    # No model call happens here: the response only acknowledges the enqueue.
    response = {
        "job_id": new_job_id,
        "status": "queued",
        "message": "Your request is queued. You'll receive results in 5-30 minutes.",
    }
    return response
I prefer a separate worker process (via Celery or APScheduler) that runs periodically. Here's APScheduler for simplicity:
python
from anthropic import Anthropic
from sqlalchemy.orm import Session
from database import SessionLocal
from models import AIJob, AIJobStatus
import json
from datetime import datetime
# Module-level Anthropic client shared by the batch submit and poll functions.
# NOTE(review): constructed without arguments — presumably it reads its API
# key from the environment; confirm for your deployment.
client = Anthropic()
def _build_batch_prompt(job_type, input_data):
    """Return the user prompt for a job, or None if job_type is unsupported.

    Raises KeyError/TypeError when input_data lacks the fields that the
    job type interpolates into its prompt.
    """
    if job_type == "summarize":
        return f"Summarize the following document in {input_data['max_length']} words:\n\n{input_data['document_text']}"
    if job_type == "classify":
        return f"Classify this text into one of: {input_data['categories']}\n\nText: {input_data['text']}"
    return None


def _reject_job(job, reason):
    """Mark a job as failed with the given reason so it leaves the queue."""
    job.status = AIJobStatus.FAILED
    job.error_message = reason
    job.completed_at = datetime.utcnow()


def process_batch_jobs(batch_size=100, model="claude-3-5-sonnet-20241022"):
    """
    Run every 5 minutes (via APScheduler).
    Collects queued jobs and submits to Anthropic Batch API.

    Args:
        batch_size: Maximum number of queued jobs picked up per run.
        model: Model name sent with every request in the batch.

    Jobs that cannot be turned into a request (unparseable or incomplete
    input_data, unsupported job_type) are marked FAILED with an explanatory
    error_message instead of being submitted. Only jobs actually included in
    the batch are marked SUBMITTED.

    Raises:
        Whatever the Anthropic client or the database session raises; the
        session is always closed, and uncommitted changes are discarded.
    """
    db = SessionLocal()
    try:
        # Oldest first, so the queue drains in FIFO order.
        queued_jobs = (
            db.query(AIJob)
            .filter(AIJob.status == AIJobStatus.QUEUED)
            .order_by(AIJob.created_at)
            .limit(batch_size)
            .all()
        )
        if not queued_jobs:
            return

        requests = []
        accepted_jobs = []
        for job in queued_jobs:
            try:
                input_data = json.loads(job.input_data)
                message = _build_batch_prompt(job.job_type, input_data)
            except (ValueError, KeyError, TypeError) as exc:
                # A bad payload would otherwise crash every run and block
                # the whole queue behind it.
                _reject_job(job, f"Invalid input data: {exc!r}")
                continue
            if message is None:
                _reject_job(job, f"Unsupported job type: {job.job_type}")
                continue

            requests.append({
                # The job id doubles as the per-request id inside the batch.
                "custom_id": job.id,
                "params": {
                    "model": model,
                    "max_tokens": 1024,
                    "messages": [{"role": "user", "content": message}],
                },
            })
            accepted_jobs.append(job)

        if not requests:
            # Nothing to send; just persist the rejections.
            db.commit()
            return

        batch = client.beta.messages.batches.create(
            requests=requests,
            betas=["batch-2024-09-24"],
        )

        for job in accepted_jobs:
            job.status = AIJobStatus.SUBMITTED
            job.batch_id = batch.id
            job.request_id = job.id
        db.commit()
        print(f"Submitted batch {batch.id} with {len(requests)} requests")
    finally:
        db.close()
def _batch_error_message(error):
    """Extract a readable message from an errored batch result.

    Handles both an error object that carries `message` directly and one
    that nests the detail under a further `error` attribute.
    """
    detail = getattr(error, "error", None) or error
    return getattr(detail, "message", None) or str(error)


def _store_batch_result(job, outcome):
    """Copy one per-request batch outcome onto its job row."""
    if outcome.type == "succeeded":
        job.status = AIJobStatus.COMPLETED
        # Concatenate the text blocks; tolerates an empty content list.
        job.result = "".join(
            block.text
            for block in outcome.message.content
            if getattr(block, "type", None) == "text"
        )
    else:
        # Every non-success outcome is terminal, so the job must leave the
        # SUBMITTED state or it would be polled forever.
        job.status = AIJobStatus.FAILED
        if outcome.type == "errored":
            job.error_message = _batch_error_message(outcome.error)
        elif outcome.type == "expired":
            job.error_message = "Batch expired"
        else:
            job.error_message = f"Batch request {outcome.type}"
    job.completed_at = datetime.utcnow()


def poll_batch_results():
    """
    Run every 30 seconds.
    Checks submitted batches for completion, stores results.

    For every batch that has ended, each SUBMITTED job belonging to it is
    moved to COMPLETED or FAILED: succeeded results store the response text,
    while errored, expired and canceled results (and jobs for which the
    batch returned no result at all) are marked FAILED. Clients are notified
    once per finished batch after its results are committed.

    A failure while handling one batch is logged and rolled back so the
    remaining batches are still processed; the session is always closed.
    """
    import logging

    logger = logging.getLogger(__name__)
    db = SessionLocal()
    try:
        submitted_jobs = db.query(AIJob).filter(
            AIJob.status == AIJobStatus.SUBMITTED
        ).all()

        # batch_id -> {job id -> job}; avoids one query per result below.
        jobs_by_batch = {}
        for job in submitted_jobs:
            if job.batch_id:
                jobs_by_batch.setdefault(job.batch_id, {})[job.id] = job

        for batch_id, pending in jobs_by_batch.items():
            try:
                batch = client.beta.messages.batches.retrieve(
                    batch_id, betas=["batch-2024-09-24"]
                )
                # Anything other than "ended" (in progress, canceling) means
                # results are not final yet.
                if batch.processing_status != "ended":
                    continue

                results = client.beta.messages.batches.results(
                    batch_id, betas=["batch-2024-09-24"]
                )
                for result in results:
                    job = pending.pop(result.custom_id, None)
                    if job is None:
                        continue
                    _store_batch_result(job, result.result)

                # Jobs attached to this batch that got no result back.
                for job in pending.values():
                    job.status = AIJobStatus.FAILED
                    job.error_message = "No result returned for this request"
                    job.completed_at = datetime.utcnow()

                db.commit()
            except Exception:
                # Boundary of a scheduled task: keep going with other batches.
                logger.exception("Failed to process batch %s", batch_id)
                db.rollback()
                continue

            notify_clients_batch_complete(batch_id)
    finally:
        db.close()