cd /news/large-language-models/anthropic-batch-api-for-asynchronous… · home topics large-language-models article
[ARTICLE · art-23253] src=dev.to pub= topic=large-language-models verified=true sentiment=↑ positive

Anthropic Batch API for Asynchronous Multi-Tenant AI Processing: Cutting Claude Costs by 50% Without Sacrificing Feature Responsiveness

Anthropic's Batch API can cut Claude AI costs by 50% for asynchronous workloads, according to a developer at CitizenApp. The team reduced their monthly Claude bill from approximately $8,000 to $4,000 by offloading 65% of their document summarization, compliance classification, and policy analysis requests to batch processing. The architecture separates synchronous user-facing features from background jobs, enabling the cost savings without sacrificing responsiveness for requests that need immediate responses.

read4 min publishedJun 6, 2026

I'm going to be blunt: if you're running a SaaS that uses Claude, and you haven't implemented the Batch API yet, you're hemorrhaging money. Not metaphorically—literally 50% off the per-token cost for any request that doesn't need a response in the next 60 seconds.

At CitizenApp, we process thousands of document summaries, compliance classifications, and policy analyses daily. Our first instinct was synchronous: user uploads document → Claude responds in real-time → dashboard updates. It felt responsive. It also felt like burning cash.

Then we realized: 80% of those requests don't actually need synchronous responses. A user uploads a document for summarization? They're fine waiting 5–30 minutes. A tenant wants to bulk-classify 500 policies? That's explicitly an async job. The Batch API is built for this, and it's absurdly underutilized.

Here's how I wired it into CitizenApp—and why you should too.

The 50% cost reduction is the headline, but the real win is architectural. Batching forces you to separate concerns:

This separation is healthy. You stop cramming everything through Claude's sync API and actually think about what needs to be urgent.

In CitizenApp's case, we reduced our monthly Claude bill from ~$8k to ~$4k while actually increasing feature coverage. The key was off 65% of our workload to batches.

Here's the flow:

User Request
    ↓
FastAPI Endpoint (Validate, Enqueue)
    ↓
PostgreSQL Queue Table
    ↓
Batch Processor (reads queue, submits to Anthropic)
    ↓
Anthropic Batch Job (runs in background)
    ↓
Webhook/Polling Handler (gets results)
    ↓
PostgreSQL LISTEN/NOTIFY
    ↓
WebSocket → React 19 Dashboard (real-time update)

Let's build it.

from sqlalchemy import Column, String, Integer, Text, DateTime, Enum, ForeignKey
from sqlalchemy.orm import declarative_base
from datetime import datetime
import enum

Base = declarative_base()

class AIJobStatus(str, enum.Enum):
    QUEUED = "queued"
    SUBMITTED = "submitted"
    PROCESSING = "processing"
    COMPLETED = "completed"
    FAILED = "failed"

class AIJob(Base):
    __tablename__ = "ai_jobs"

    id = Column(String, primary_key=True)
    tenant_id = Column(String, ForeignKey("tenants.id"), nullable=False, index=True)
    user_id = Column(String, nullable=False)
    job_type = Column(String, nullable=False)  # "summarize", "classify", etc.

    status = Column(Enum(AIJobStatus), default=AIJobStatus.QUEUED, index=True)

    input_data = Column(Text, nullable=False)  # JSON stringified
    result = Column(Text, nullable=True)  # Result from Claude

    batch_id = Column(String, nullable=True, index=True)  # Anthropic batch ID
    request_id = Column(String, nullable=True)  # Within batch

    created_at = Column(DateTime, default=datetime.utcnow, index=True)
    completed_at = Column(DateTime, nullable=True)

    error_message = Column(Text, nullable=True)
python
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel
import uuid
import json
from sqlalchemy.orm import Session
from database import get_db
from models import AIJob, AIJobStatus

router = APIRouter()

class SummarizeRequest(BaseModel):
    document_text: str
    max_length: int = 500

@router.post("/api/ai/summarize")
async def queue_summarize(
    request: SummarizeRequest,
    db: Session = Depends(get_db),
    tenant_id: str = Depends(get_tenant_id),
    user_id: str = Depends(get_user_id),
):
    """
    Queue a document for summarization (async).
    Returns immediately with job ID.
    User receives result via WebSocket when batch completes.
    """

    job_id = str(uuid.uuid4())

    job = AIJob(
        id=job_id,
        tenant_id=tenant_id,
        user_id=user_id,
        job_type="summarize",
        input_data=json.dumps({
            "document_text": request.document_text,
            "max_length": request.max_length,
        }),
        status=AIJobStatus.QUEUED,
    )

    db.add(job)
    db.commit()

    return {
        "job_id": job_id,
        "status": "queued",
        "message": "Your request is queued. You'll receive results in 5-30 minutes.",
    }

I prefer a separate worker process (via Celery or APScheduler) that runs periodically. Here's APScheduler for simplicity:

python
from anthropic import Anthropic
from sqlalchemy.orm import Session
from database import SessionLocal
from models import AIJob, AIJobStatus
import json
from datetime import datetime

client = Anthropic()

def process_batch_jobs():
    """
    Run every 5 minutes (via APScheduler).
    Collects queued jobs and submits to Anthropic Batch API.
    """
    db = SessionLocal()

    queued_jobs = db.query(AIJob).filter(
        AIJob.status == AIJobStatus.QUEUED
    ).limit(100).all()

    if not queued_jobs:
        db.close()
        return

    requests = []
    job_map = {}

    for job in queued_jobs:
        input_data = json.loads(job.input_data)

        if job.job_type == "summarize":
            message = f"Summarize the following document in {input_data['max_length']} words:\n\n{input_data['document_text']}"
        elif job.job_type == "classify":
            message = f"Classify this text into one of: {input_data['categories']}\n\nText: {input_data['text']}"
        else:
            continue

        request_id = job.id
        job_map[request_id] = job.id

        requests.append({
            "custom_id": request_id,
            "params": {
                "model": "claude-3-5-sonnet-20241022",
                "max_tokens": 1024,
                "messages": [{"role": "user", "content": message}],
            },
        })

    batch = client.beta.messages.batches.create(
        requests=requests,
        betas=["batch-2024-09-24"],
    )

    for job in queued_jobs:
        job.status = AIJobStatus.SUBMITTED
        job.batch_id = batch.id

    db.commit()
    print(f"Submitted batch {batch.id} with {len(requests)} requests")
    db.close()

def poll_batch_results():
    """
    Run every 30 seconds.
    Checks submitted batches for completion, stores results.
    """
    db = SessionLocal()

    submitted_jobs = db.query(AIJob).filter(
        AIJob.status == AIJobStatus.SUBMITTED
    ).all()

    batch_ids = set(job.batch_id for job in submitted_jobs)

    for batch_id in batch_ids:
        batch = client.beta.messages.batches.retrieve(batch_id, betas=["batch-2024-09-24"])

        if batch.processing_status == "in_progress":
            continue

        if batch.processing_status == "expired":
            for job in submitted_jobs:
                if job.batch_id == batch_id:
                    job.status = AIJobStatus.FAILED
                    job.error_message = "Batch expired"
            db.commit()
            continue

        results = client.beta.messages.batches.results(batch_id, betas=["batch-2024-09-24"])

        for result in results:
            request_id = result.custom_id
            job = db.query(AIJob).filter(AIJob.id == request_id).first()

            if not job:
                continue

            if result.result.type == "succeeded":
                job.status = AIJobStatus.COMPLETED
                job.result = result.result.message.content[0].text
            elif result.result.type == "errored":
                job.status = AIJobStatus.FAILED
                job.error_message = result.result.error.message

            job.completed_at = datetime.utcnow()

        db.commit()
        notify_clients_batch_complete(batch_id)

    db.close()
── more in #large-language-models 4 stories · sorted by recency
sponsored brought to you by zahid.host 4,200+ EU-deployed projects
reading about agents? ship yours in a single git push.

Run your AI side-project on zahid.host

EU-based hosting, git-push deploys, automatic HTTPS, no cold starts. Free tier with a custom domain — perfect for shipping the agent you just read about.

$git push zahid main
Live at https://your-agent.zahid.host
Get free account → Pricing
from €0/mo · no card required
LIVE [news/anthropic-batch-api-…] indexed:0 read:4min 2026-06-06 ·