# How I Tamed AI API Rate Limits with a Simple Queue

> Source: <https://dev.to/__c1b9e06dc90a7e0a676b/how-i-tamed-ai-api-rate-limits-with-a-simple-queue-198k>
> Published: 2026-06-17 02:00:33+00:00

A few months back, I was building a content generation tool. The idea was simple: take a list of topics, hit the OpenAI API, and get SEO-optimized articles. My prototype worked great with 5 topics. Then I scaled to 50. Then 200.

That’s when the 429s started flooding my logs. Rate limited. Overloaded. Blocked.

I was frustrated. Not because the API was unstable — it’s actually very reliable — but because I hadn’t thought about the *pace* of my requests. Every failed call meant lost time, wasted retries, and eventually a complete stall while I waited for the cooldown to end.

My first attempt was naïve: just wrap the call in a `try/except` and retry after a fixed 5 seconds.

``` python
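import time

import openai  # legacy pre-1.0 SDK, which exposes openai.error


def call_api(prompt):
    while True:
        try:
            response = openai.Completion.create(...)
            return response
        except openai.error.RateLimitError:
            # Fixed pause, with no cap on the number of attempts
            time.sleep(5)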
```

This worked… until I had 10 concurrent threads all sleeping at the same time, then waking up together and slamming the API again. The 429s came back in waves. Plus the fixed delay was either too short (still getting rate limited) or too long (wasting time).

I tried increasing the sleep to 30 seconds. That helped, but now my throughput was abysmal. One request every 30 seconds? For 200 topics, that’s nearly two hours. I needed something smarter.

I knew the theory — exponential backoff with jitter — but I’d never implemented it properly. Here’s what I built step-by-step.

Instead of firing uncontrolled parallel requests, I put all tasks into a `queue.Queue` and spawned a fixed number of worker threads. Each worker would pull a task, call the API, and if it failed, push the task back onto the queue with a delay.

``` python
import queue
import threading
import time
import random
from functools import wraps

import openai  # legacy pre-1.0 SDK (provides openai.error.RateLimitError)

def retry_with_exponential_backoff(max_retries=5, base_delay=1, max_delay=60):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            retries = 0
            while True:
                try:
                    return func(*args, **kwargs)
                except openai.error.RateLimitError:
                    if retries >= max_retries:
                        raise
                    delay = min(base_delay * (2 ** retries) + random.uniform(0, 1), max_delay)
                    time.sleep(delay)
                    retries += 1
        return wrapper
    return decorator

@retry_with_exponential_backoff(max_retries=5, base_delay=2)
def call_openai(prompt):
    # ... actual API call
    ...  # placeholder so the function body is valid Python
```
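To make the delay formula concrete, here is a small sketch that computes the range each retry can sleep for with `base_delay=2` and `max_delay=60`. The helper name `backoff_bounds` is my own and not part of the original code; it only separates the deterministic part of the formula from the 0 to 1 second of jitter.

```python
def backoff_bounds(retries, base_delay=2, max_delay=60):
    """Return (lowest, highest) possible sleep for a given retry count.

    Mirrors min(base_delay * 2**retries + uniform(0, 1), max_delay)
    from the decorator; `backoff_bounds` itself is a hypothetical helper.
    """
    exponential = base_delay * (2 ** retries)
    lowest = min(exponential, max_delay)       # jitter draws 0
    highest = min(exponential + 1, max_delay)  # jitter draws 1
    return lowest, highest


if __name__ == "__main__":
    # With max_retries=5 the decorator sleeps for retries 0..4, then re-raises
    for retries in range(5):
        low, high = backoff_bounds(retries)
        print(f"retry {retries}: sleep between {low}s and {high}s")
```

With these defaults, a prompt that keeps getting rate limited gives up after five sleeps totalling between 62 and 67 seconds. The cap only starts to matter from the sixth retry onward, where `2 * 2**5 = 64` would exceed `max_delay`.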

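The decorator handles individual retries, but I still needed to prevent all workers from hitting the API at the same time. I added a global rate limiter, shared by every worker behind a lock, that spaces calls at a fixed minimum interval (a simpler cousin of a token bucket).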

``` python
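from threading import Lock
import time

class RateLimiter:
    """Spaces calls at least 60 / calls_per_minute seconds apart across all threads."""

    def __init__(self, calls_per_minute=10):
        self.calls_per_minute = calls_per_minute
        self.interval = 60.0 / calls_per_minute
        # monotonic() is unaffected by system clock adjustments
        self.last_call = time.monotonic()
        self.lock = Lock()

    def wait(self):
        # Sleeping while holding the lock is deliberate: it serializes the
        # workers so only one call is released per interval.
        with self.lock:
            elapsed = time.monotonic() - self.last_call
            if elapsed < self.interval:
                time.sleep(self.interval - elapsed)
            self.last_call = time.monotonic()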
```
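The class above enforces a fixed gap between calls rather than a true token bucket. For comparison, here is a minimal token-bucket sketch that lets short bursts through while keeping the same average rate. The `TokenBucket` name, the `capacity` parameter, and the injectable `clock` and `sleep` arguments are my own assumptions, added so the logic can be exercised without real waiting.

```python
import threading
import time


class TokenBucket:
    """Hypothetical token bucket: refills at `rate` tokens/second up to `capacity`."""

    def __init__(self, calls_per_minute=10, capacity=3,
                 clock=time.monotonic, sleep=time.sleep):
        self.rate = calls_per_minute / 60.0   # tokens added per second
        self.capacity = capacity              # largest burst allowed
        self.tokens = float(capacity)         # start with a full bucket
        self.clock = clock
        self.sleep = sleep
        self.updated = clock()
        self.lock = threading.Lock()

    def _refill(self):
        # Credit the tokens earned since the last update, capped at capacity
        now = self.clock()
        self.tokens = min(self.capacity,
                          self.tokens + (now - self.updated) * self.rate)
        self.updated = now

    def acquire(self):
        """Block until one token is available, then consume it."""
        with self.lock:
            self._refill()
            if self.tokens < 1:
                # Wait just long enough for the missing fraction to refill
                self.sleep((1 - self.tokens) / self.rate)
                self._refill()
            self.tokens -= 1
```

Swapping this in would mean calling `bucket.acquire()` where the worker currently calls `rate_limiter.wait()`. Whether bursts are acceptable depends on how the provider counts requests, so treat `capacity` as something to tune rather than a recommended value.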

Then I used this inside each worker:

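``` python
rate_limiter = RateLimiter(calls_per_minute=10)

# Note: call_openai already carries its own retry decorator, so decorating
# safe_call as well nests two retry loops, and a persistent 429 can be
# retried far more than max_retries times.
@retry_with_exponential_backoff()
def safe_call(prompt):
    rate_limiter.wait()
    return call_openai(prompt)
```

``` python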
def worker():
    while True:
        try:
            prompt = task_queue.get(timeout=5)
        except queue.Empty:
            break
        try:
            result = safe_call(prompt)
            # store result
        except Exception as e:
            # log and potentially re-queue
            pass
        finally:
            task_queue.task_done()

num_workers = 4
task_queue = queue.Queue()

# Fill the queue before starting the workers, so a worker never hits
# the 5-second timeout and exits while tasks are still being added
for prompt in prompts:
    task_queue.put(prompt)

threads = []
for _ in range(num_workers):
    t = threading.Thread(target=worker)
    t.start()
    threads.append(t)

task_queue.join()  # wait for all tasks to complete
```
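The worker above only logs failures. If you want the "push it back onto the queue" behaviour, one possible shape is to carry an attempt counter with each task. This is a sketch under my own assumptions: `run_with_requeue`, `handler`, `max_attempts`, and `requeue_delay` are illustrative names, and `handler` stands in for `safe_call`.

```python
import queue
import threading
import time


def run_with_requeue(prompts, handler, num_workers=4, max_attempts=3,
                     requeue_delay=0.0):
    """Process prompts with worker threads, re-queueing failures.

    Returns (results, failed): results maps prompt -> handler output,
    failed lists prompts that still raised after max_attempts tries.
    """
    task_queue = queue.Queue()
    results, failed = {}, []
    lock = threading.Lock()

    for prompt in prompts:
        task_queue.put((prompt, 1))  # (task, attempt number)

    def worker():
        while True:
            try:
                prompt, attempt = task_queue.get(timeout=0.5)
            except queue.Empty:
                return
            try:
                output = handler(prompt)
                with lock:
                    results[prompt] = output
            except Exception:
                if attempt < max_attempts:
                    time.sleep(requeue_delay)  # crude pause before retrying
                    task_queue.put((prompt, attempt + 1))
                else:
                    with lock:
                        failed.append(prompt)
            finally:
                # The re-queued copy was put() first, so join() keeps waiting
                task_queue.task_done()

    threads = [threading.Thread(target=worker) for _ in range(num_workers)]
    for t in threads:
        t.start()
    task_queue.join()
    for t in threads:
        t.join()
    return results, failed
```

Calling `results, failed = run_with_requeue(prompts, safe_call)` would then leave you with an explicit list of prompts to inspect or rerun, instead of silently dropping them.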

With this setup, my 200 topics finished in about 20 minutes, roughly 5x faster than the 100 minutes the fixed 30-second sleep would have needed, and I never hit a 429 past the first retry.

This pattern works great for batch jobs where latency isn’t critical. For real-time user-facing applications (like chat), you’d want a different approach — maybe pre-allocate a pool of connections or use a managed gateway that handles retries and throttling for you.

If you don’t want to build the queue and rate limiter yourself, there are services that wrap all this (like the one at `https://ai.interwestinfo.com/`). They handle concurrency, retries, and even load balancing across multiple API keys. But for most personal projects, the 50 lines above are enough.

If I were starting over, I’d build on the queue from day one. I’d also add proper structured logging so I can trace each request’s retry history. And I’d use `asyncio` instead of threading to keep the code simpler: `asyncio`’s `wait_for` and `sleep` make the rate limiter cleaner.
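For a rough idea of what that could look like, here is a minimal `asyncio` sketch. It is not the code I ran: `AsyncRateLimiter`, `fetch_all`, and the `call` coroutine (a stand-in for the real API call) are assumed names, and retries are left out to keep it short.

```python
import asyncio


class AsyncRateLimiter:
    """Hypothetical asyncio counterpart of the RateLimiter class."""

    def __init__(self, calls_per_minute=10):
        self.interval = 60.0 / calls_per_minute
        self._next_slot = 0.0  # loop time at which the next call may start

    async def wait(self):
        now = asyncio.get_running_loop().time()
        start = max(now, self._next_slot)
        # No await between reading and writing _next_slot, so no lock is
        # needed: the event loop cannot switch tasks in between.
        self._next_slot = start + self.interval
        await asyncio.sleep(start - now)


async def fetch_all(prompts, call, calls_per_minute=10, timeout=30):
    """Run `call(prompt)` for every prompt, paced by the limiter."""
    limiter = AsyncRateLimiter(calls_per_minute)

    async def one(prompt):
        await limiter.wait()
        # wait_for cancels a call that hangs longer than `timeout` seconds
        return await asyncio.wait_for(call(prompt), timeout=timeout)

    # gather returns results in the same order as `prompts`
    return await asyncio.gather(*(one(p) for p in prompts))
```

Each task reserves its slot up front and then sleeps until that slot arrives, so there are no worker threads to size and no lock to hold while sleeping.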

Also, I’d benchmark the optimal number of workers for my rate limit. Too many workers and they’re all sleeping; too few and you’re underutilizing the quota.
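Before benchmarking, a back-of-the-envelope estimate can give a starting point: the number of requests in flight is roughly the request rate times the time each request takes (Little's law). The helper below is a hypothetical sketch, and the latencies in the loop are invented for illustration, not measurements from my runs.

```python
import math


def estimate_workers(calls_per_minute, avg_latency_seconds):
    """Rough worker count needed to keep the quota busy (Little's law).

    Hypothetical helper: in-flight requests = request rate * time per request.
    """
    in_flight = calls_per_minute * avg_latency_seconds / 60.0
    return max(1, math.ceil(in_flight))


if __name__ == "__main__":
    # Made-up latencies, only to show how the estimate moves
    for latency in (3, 12, 30):
        workers = estimate_workers(10, latency)
        print(f"{latency}s per call -> {workers} worker(s)")
```

At 10 calls per minute, anything beyond this estimate just adds threads that sit in the rate limiter, which matches the "too many workers and they’re all sleeping" observation.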

Have you hit similar walls with API rate limits? What’s your go-to pattern for handling them? I’m curious if anyone’s used a different backoff formula or a distributed queue like Redis for cross-process rate limiting. Let me know in the comments!
