# Anthropic Batch API for Asynchronous Multi-Tenant AI Processing: Cutting Claude Costs by 50% Without Sacrificing Feature Responsiveness

> Source: <https://dev.to/uaslimcreate/anthropic-batch-api-for-asynchronous-multi-tenant-ai-processing-cutting-claude-costs-by-50-18gb>
> Published: 2026-06-06 08:47:58+00:00

I'm going to be blunt: if you're running a SaaS that uses Claude, and you haven't implemented the Batch API yet, you're hemorrhaging money. Not metaphorically—literally 50% off the per-token cost for any request that doesn't need a response in the next 60 seconds.

At CitizenApp, we process thousands of document summaries, compliance classifications, and policy analyses daily. Our first instinct was synchronous: user uploads document → Claude responds in real-time → dashboard updates. It felt responsive. It also felt like burning cash.

Then we realized: 80% of those requests don't actually need synchronous responses. A user uploads a document for summarization? They're fine waiting 5–30 minutes. A tenant wants to bulk-classify 500 policies? That's explicitly an async job. The Batch API is built for this, and it's absurdly underutilized.

Here's how I wired it into CitizenApp—and why you should too.

The 50% cost reduction is the headline, but the real win is architectural. Batching forces you to separate concerns:

This separation is *healthy*. You stop cramming everything through Claude's sync API and actually think about what needs to be urgent.

In CitizenApp's case, we reduced our monthly Claude bill from ~$8k to ~$4k while actually *increasing* feature coverage. The key was offloading 65% of our workload to batches.

Here's the flow:

```
User Request
    ↓
FastAPI Endpoint (Validate, Enqueue)
    ↓
PostgreSQL Queue Table
    ↓
Batch Processor (reads queue, submits to Anthropic)
    ↓
Anthropic Batch Job (runs in background)
    ↓
Webhook/Polling Handler (gets results)
    ↓
PostgreSQL LISTEN/NOTIFY
    ↓
WebSocket → React 19 Dashboard (real-time update)
```

Let's build it.

``` python
# models.py
from sqlalchemy import Column, String, Integer, Text, DateTime, Enum, ForeignKey
from sqlalchemy.orm import declarative_base
from datetime import datetime
import enum

Base = declarative_base()

class AIJobStatus(str, enum.Enum):
    QUEUED = "queued"
    SUBMITTED = "submitted"
    PROCESSING = "processing"
    COMPLETED = "completed"
    FAILED = "failed"

class AIJob(Base):
    __tablename__ = "ai_jobs"

    id = Column(String, primary_key=True)
    tenant_id = Column(String, ForeignKey("tenants.id"), nullable=False, index=True)
    user_id = Column(String, nullable=False)
    job_type = Column(String, nullable=False)  # "summarize", "classify", etc.

    status = Column(Enum(AIJobStatus), default=AIJobStatus.QUEUED, index=True)

    input_data = Column(Text, nullable=False)  # JSON stringified
    result = Column(Text, nullable=True)  # Result from Claude

    batch_id = Column(String, nullable=True, index=True)  # Anthropic batch ID
    request_id = Column(String, nullable=True)  # Within batch

    created_at = Column(DateTime, default=datetime.utcnow, index=True)
    completed_at = Column(DateTime, nullable=True)

    error_message = Column(Text, nullable=True)
python
# api/ai.py
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel
import uuid
import json
from sqlalchemy.orm import Session
from database import get_db
from models import AIJob, AIJobStatus

router = APIRouter()

class SummarizeRequest(BaseModel):
    document_text: str
    max_length: int = 500

@router.post("/api/ai/summarize")
async def queue_summarize(
    request: SummarizeRequest,
    db: Session = Depends(get_db),
    tenant_id: str = Depends(get_tenant_id),
    user_id: str = Depends(get_user_id),
):
    """
    Queue a document for summarization (async).
    Returns immediately with job ID.
    User receives result via WebSocket when batch completes.
    """

    job_id = str(uuid.uuid4())

    job = AIJob(
        id=job_id,
        tenant_id=tenant_id,
        user_id=user_id,
        job_type="summarize",
        input_data=json.dumps({
            "document_text": request.document_text,
            "max_length": request.max_length,
        }),
        status=AIJobStatus.QUEUED,
    )

    db.add(job)
    db.commit()

    return {
        "job_id": job_id,
        "status": "queued",
        "message": "Your request is queued. You'll receive results in 5-30 minutes.",
    }
```

I prefer a separate worker process (via Celery or APScheduler) that runs periodically. Here's APScheduler for simplicity:

``` python
python
# workers/batch_processor.py
from anthropic import Anthropic
from sqlalchemy.orm import Session
from database import SessionLocal
from models import AIJob, AIJobStatus
import json
from datetime import datetime

client = Anthropic()

def process_batch_jobs():
    """
    Run every 5 minutes (via APScheduler).
    Collects queued jobs and submits to Anthropic Batch API.
    """
    db = SessionLocal()

    # Get all queued jobs (batch size: 10k requests per batch max)
    queued_jobs = db.query(AIJob).filter(
        AIJob.status == AIJobStatus.QUEUED
    ).limit(100).all()

    if not queued_jobs:
        db.close()
        return

    # Build batch request
    requests = []
    job_map = {}

    for job in queued_jobs:
        input_data = json.loads(job.input_data)

        if job.job_type == "summarize":
            message = f"Summarize the following document in {input_data['max_length']} words:\n\n{input_data['document_text']}"
        elif job.job_type == "classify":
            message = f"Classify this text into one of: {input_data['categories']}\n\nText: {input_data['text']}"
        else:
            continue

        request_id = job.id
        job_map[request_id] = job.id

        requests.append({
            "custom_id": request_id,
            "params": {
                "model": "claude-3-5-sonnet-20241022",
                "max_tokens": 1024,
                "messages": [{"role": "user", "content": message}],
            },
        })

    # Submit batch to Anthropic
    batch = client.beta.messages.batches.create(
        requests=requests,
        betas=["batch-2024-09-24"],
    )

    # Mark all jobs as submitted
    for job in queued_jobs:
        job.status = AIJobStatus.SUBMITTED
        job.batch_id = batch.id

    db.commit()
    print(f"Submitted batch {batch.id} with {len(requests)} requests")
    db.close()

def poll_batch_results():
    """
    Run every 30 seconds.
    Checks submitted batches for completion, stores results.
    """
    db = SessionLocal()

    # Get all submitted jobs
    submitted_jobs = db.query(AIJob).filter(
        AIJob.status == AIJobStatus.SUBMITTED
    ).all()

    batch_ids = set(job.batch_id for job in submitted_jobs)

    for batch_id in batch_ids:
        batch = client.beta.messages.batches.retrieve(batch_id, betas=["batch-2024-09-24"])

        if batch.processing_status == "in_progress":
            continue

        if batch.processing_status == "expired":
            # Mark jobs as failed
            for job in submitted_jobs:
                if job.batch_id == batch_id:
                    job.status = AIJobStatus.FAILED
                    job.error_message = "Batch expired"
            db.commit()
            continue

        # Batch complete — fetch results
        results = client.beta.messages.batches.results(batch_id, betas=["batch-2024-09-24"])

        for result in results:
            request_id = result.custom_id
            job = db.query(AIJob).filter(AIJob.id == request_id).first()

            if not job:
                continue

            if result.result.type == "succeeded":
                job.status = AIJobStatus.COMPLETED
                job.result = result.result.message.content[0].text
            elif result.result.type == "errored":
                job.status = AIJobStatus.FAILED
                job.error_message = result.result.error.message

            job.completed_at = datetime.utcnow()

        db.commit()
        # Notify connected clients (next step)
        notify_clients_batch_complete(batch_id)

    db.close()
```


