Every Python Concept a Generative AI Developer Actually Needs to Know

A comprehensive guide to the essential Python concepts every generative AI developer needs, with an emphasis on async/await for concurrent LLM calls, streaming, and RAG pipelines. It shows how async coroutines can achieve up to 60x speedups by handling multiple API calls concurrently without extra hardware.

From async coroutines that power real-time LLM streaming to memory tricks that let you process million-document datasets: the complete map, written for engineers building with AI today.

Most Python tutorials teach you the language. This one teaches you the language as a GenAI engineer uses it, where every concept has a direct line to a real problem you will hit building LLM pipelines, RAG systems, and AI agents.

Here is the brutal truth about building LLM applications: your code spends most of its time waiting. Waiting for the LLM to respond. Waiting for an embedding API. Waiting for a vector database. Without async, you serve one user at a time. With async, you serve thousands concurrently on a single thread.

What actually happens when you write await

When Python hits an await expression whose result is not ready yet, it pauses the current coroutine and hands control back to the event loop. The event loop looks at everything else that's ready to run, makes progress on it, and returns to your coroutine once the awaited result is available. No threads. No OS context switches. Pure cooperative multitasking.
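The hand-off described above can be watched directly. This is a minimal sketch rather than anything from an LLM SDK: `worker` and `order` are illustrative names, and `asyncio.sleep(0)` stands in for a real network wait.

```python
import asyncio

order: list[str] = []  # records which coroutine ran at each step (illustrative)

async def worker(name: str, steps: int) -> None:
    for i in range(steps):
        order.append(f"{name}{i}")
        # Yield to the event loop; any other ready coroutine runs before we resume.
        await asyncio.sleep(0)

async def main() -> None:
    # Both workers share one thread and take turns at each await.
    await asyncio.gather(worker("A", 2), worker("B", 2))

asyncio.run(main())
print(order)  # ['A0', 'B0', 'A1', 'B1']
```

The two workers interleave step by step even though neither runs in a separate thread, which is the cooperative scheduling the paragraph describes.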
```python
import asyncio
import anthropic

client = anthropic.AsyncAnthropic()

async def ask_claude(prompt: str, label: str) -> str:
    # Each await is a point where the event loop can run other coroutines
    message = await client.messages.create(
        model="claude-opus-4-5",
        max_tokens=512,
        messages=[{"role": "user", "content": prompt}],
    )
    return f"[{label}] {message.content[0].text}"

async def main():
    questions = [
        ("What is a transformer architecture?", "A"),
        ("Explain RAG in one paragraph.", "B"),
        ("What is chain-of-thought prompting?", "C"),
        ("Describe the attention mechanism briefly.", "D"),
        ("What is a vector database used for?", "E"),
    ]
    # All 5 fire at once - total time ≈ slowest single call (~2s)
    # Sequential would take ~10s
    results = await asyncio.gather(*[ask_claude(q, l) for q, l in questions])
    for r in results:
        print(r)

asyncio.run(main())
```

⚡ Real-World Impact

Sequential LLM calls for 100 documents × 3 seconds each = 5 minutes. With asyncio.gather they run concurrently and finish in roughly 3 to 5 seconds, provided the API's rate limits allow that many requests in flight. That's a 60× speedup or better with zero extra hardware.

Tasks: fire and forget, then collect later

asyncio.create_task schedules a coroutine immediately without waiting for it. This lets you kick off parallel work and collect results later, which is perfect for RAG pipelines where you retrieve from a vector store and a web search at the same time.
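A self-contained sketch of that scheduling behaviour, with `asyncio.sleep` standing in for the two retrievals. `fake_search` and its latencies are hypothetical stand-ins, not real vector-store or web-search clients.

```python
import asyncio
import time

async def fake_search(source: str, latency: float) -> str:
    # Stand-in for a retrieval call; the sleep simulates network latency.
    await asyncio.sleep(latency)
    return f"{source} hits"

async def main() -> tuple[list[str], float]:
    start = time.perf_counter()
    # Both tasks are scheduled now and begin running as soon as this coroutine awaits.
    vector_task = asyncio.create_task(fake_search("vector", 0.2))
    web_task = asyncio.create_task(fake_search("web", 0.3))
    # Other preparation could happen here while both searches are in flight.
    hits = [await vector_task, await web_task]
    return hits, time.perf_counter() - start

hits, elapsed = asyncio.run(main())
print(hits, f"{elapsed:.2f}s")
```

The elapsed time should land close to the slower of the two simulated latencies (about 0.3 s here) rather than their 0.5 s sum.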
```python
# search_vector_db, search_web, build_context and call_llm are placeholders
# for your own retrieval and generation functions.
async def rag_pipeline(query: str) -> str:
    # Phase 1: kick off both retrievals simultaneously
    task_vector = asyncio.create_task(search_vector_db(query))
    task_web = asyncio.create_task(search_web(query))

    # Both run concurrently while we do other prep work
    system_prompt = "You are a helpful research assistant"

    # Collect results - each await blocks only until that task is ready
    vector_hits, web_hits = await task_vector, await task_web
    context = build_context(vector_hits, web_hits)

    # Phase 2: single LLM call with full context
    return await call_llm(system_prompt, context, query)

# Total: max(vector_latency, web_latency) + llm_latency
```

Streaming tokens in real time with async generators

ChatGPT-style streaming, where tokens appear as they're generated, requires async generators. Instead of waiting for the full response, you yield each token as it arrives and forward it to the client immediately.

```python
import asyncio
import anthropic

client = anthropic.AsyncAnthropic()

async def stream_response(prompt: str):
    """Async generator - yields tokens as they arrive from the LLM."""
    async with client.messages.stream(
        model="claude-opus-4-5",
        max_tokens=1024,
        messages=[{"role": "user", "content": prompt}],
    ) as stream:
        async for text in stream.text_stream:
            yield text  # each token arrives here ~50 ms apart

async def handle_request(prompt: str) -> str:
    full_text = ""
    async for token in stream_response(prompt):
        print(token, end="", flush=True)  # real-time display
        full_text += token
    print()
    return full_text

asyncio.run(handle_request("Explain diffusion models simply."))
```

Locks: protecting shared state across coroutines

Even though asyncio is single-threaded, race conditions exist. If two coroutines both read-then-write a shared counter and an await sits between the read and the write, another coroutine can run in the gap and you'll get wrong results. asyncio.Lock ensures only one coroutine is inside the critical section at a time.
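The lost-update problem is easy to reproduce in isolation. The sketch below uses hypothetical helper names (`unsafe_increment`, `safe_increment`) and `asyncio.sleep(0)` as a stand-in for any awaited I/O between the read and the write.

```python
import asyncio

async def unsafe_increment(state: dict) -> None:
    current = state["count"]      # read
    await asyncio.sleep(0)        # another coroutine may run here
    state["count"] = current + 1  # write back a possibly stale value

async def safe_increment(state: dict, lock: asyncio.Lock) -> None:
    async with lock:              # read-await-write is now one critical section
        current = state["count"]
        await asyncio.sleep(0)
        state["count"] = current + 1

async def main() -> tuple[int, int]:
    unsafe, safe = {"count": 0}, {"count": 0}
    lock = asyncio.Lock()
    await asyncio.gather(*(unsafe_increment(unsafe) for _ in range(100)))
    await asyncio.gather(*(safe_increment(safe, lock) for _ in range(100)))
    return unsafe["count"], safe["count"]

unsafe_total, safe_total = asyncio.run(main())
print(unsafe_total, safe_total)
```

Without the lock, most of the 100 increments are lost because every coroutine reads the counter before any of them writes it back. With the lock, the counter reaches 100.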
```python
import asyncio
from collections import defaultdict

request_counts: dict[str, int] = defaultdict(int)
lock = asyncio.Lock()

# call_embedding_api is a placeholder for your own async embedding client.
async def tracked_embed(text: str, model: str) -> list[float]:
    async with lock:
        request_counts[model] += 1
        if request_counts[model] > 1000:
            raise RuntimeError(f"Daily limit hit for {model}")
    return await call_embedding_api(text, model)
```

Many powerful Python libraries (requests, some database drivers, HuggingFace's synchronous API) are blocking. You can't just slap await on them. But you also can't leave performance on the table. Threading is the answer.

The GIL: what it blocks and what it doesn't

The Global Interpreter Lock (GIL) is a mutex in CPython that prevents more than one thread from running Python bytecode at the same time. It sounds like threading is useless, but it isn't, because the GIL is released during:

I/O operations: network calls, file reads/writes, socket operations. The GIL is released while the OS handles I/O. → Threads work great here

C extensions: NumPy, PyTorch ops, SciPy. Their heavy routines run C code that releases the GIL for the duration. → Threads work great here

Pure Python CPU work: loops, string operations, pure Python math. Only one thread can execute bytecode at a time, so threads don't help.
→ Use multiprocessing instead

threadpool_embedding.py

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from sentence_transformers import SentenceTransformer

# Blocking library - can't use asyncio, but threads work fine
model = SentenceTransformer("all-MiniLM-L6-v2")

def embed_text(text: str, idx: int) -> tuple:
    embedding = model.encode(text)  # GIL released - C extension runs
    return idx, embedding.tolist()

texts = [f"Document chunk {i}" for i in range(50)]

with ThreadPoolExecutor(max_workers=8) as pool:
    futures = {pool.submit(embed_text, t, i): i for i, t in enumerate(texts)}
    results = {}
    for future in as_completed(futures):
        idx, embedding = future.result()
        results[idx] = embedding

print(f"Embedded {len(results)} chunks")
```

```python
import threading, time

model_ready = threading.Event()
api_sem = threading.Semaphore(5)  # max 5 concurrent inferences

def load_model():
    print("Loading model weights...")
    time.sleep(3)       # simulate loading 7B param model
    model_ready.set()   # unblocks ALL waiting threads at once
    print("Model ready")

def inference_worker(worker_id: int):
    model_ready.wait()  # block here until model is loaded
    with api_sem:       # at most 5 simultaneous inference calls
        print(f"Worker {worker_id}: running inference")
        time.sleep(0.5)  # simulate inference

loader = threading.Thread(target=load_model, daemon=True)
workers = [threading.Thread(target=inference_worker, args=(i,)) for i in range(12)]

loader.start()
for w in workers:
    w.start()
for w in workers:
    w.join()
```

Tokenising 10 million documents. Computing cosine similarity across a 100K-embedding matrix. Running feature extraction before model training. This is CPU-bound work, and threading won't help you. You need multiprocessing: separate OS processes, each with their own Python interpreter and GIL, running truly in parallel.

```python
from concurrent.futures import ProcessPoolExecutor
from transformers import AutoTokenizer
import multiprocessing as mp

_tokenizer = None  # process-local - each worker initialises its own

def init_worker():
    """Called once per process. Avoids re-loading the tokenizer for every job."""
    global _tokenizer
    _tokenizer = AutoTokenizer.from_pretrained("gpt2")
    # GPT-2 ships without a pad token; padding="max_length" needs one
    _tokenizer.pad_token = _tokenizer.eos_token

def tokenise_chunk(text: str) -> dict:
    # Pure CPU work - runs across all cores simultaneously
    tokens = _tokenizer(
        text,
        truncation=True,
        max_length=512,
        padding="max_length",
        return_tensors=None,
    )
    return {
        "input_ids": tokens["input_ids"],
        "attention_mask": tokens["attention_mask"],
        "token_count": sum(tokens["attention_mask"]),
    }

def preprocess_dataset(texts: list[str]) -> list[dict]:
    n = mp.cpu_count()
    print(f"Using {n} cores to process {len(texts)} texts")
    with ProcessPoolExecutor(max_workers=n, initializer=init_worker) as pool:
        return list(pool.map(tokenise_chunk, texts, chunksize=128))

if __name__ == "__main__":  # REQUIRED on Windows / macOS
    docs = [f"Training document number {i}" for i in range(100_000)]
    tokenised = preprocess_dataset(docs)
    print(f"Done - {len(tokenised)} chunks tokenised")
```

Passing large NumPy arrays through a Queue serialises them with pickle, which is slow and memory-intensive. multiprocessing.shared_memory lets all processes read and write the same raw memory block. For embedding matrices, this is transformative.
```python
import numpy as np
from multiprocessing import shared_memory, Process

def fill_shard(shm_name: str, shape: tuple, start: int, end: int):
    # Attach to existing shared block - no data copying
    shm = shared_memory.SharedMemory(name=shm_name)
    matrix = np.ndarray(shape, dtype=np.float32, buffer=shm.buf)
    for i in range(start, end):
        matrix[i] = np.random.randn(1536).astype(np.float32)
    del matrix   # drop the view first; close() fails while the buffer is still exported
    shm.close()

if __name__ == "__main__":
    N, DIM = 50_000, 1536
    shape = (N, DIM)
    # One allocation shared by all processes - ~300MB once, not 4×300MB
    shm = shared_memory.SharedMemory(create=True, size=N * DIM * 4)
    matrix = np.ndarray(shape, dtype=np.float32, buffer=shm.buf)
    chunk = N // 4
    procs = [
        Process(target=fill_shard, args=(shm.name, shape, i * chunk, (i + 1) * chunk))
        for i in range(4)
    ]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    print(f"Matrix ready: {matrix.shape}")
    del matrix
    shm.close()
    shm.unlink()  # free the OS-level shared block
```

Pre-training data for a modern LLM is measured in terabytes. You cannot load it into RAM. You need to stream it, process it, and feed it to the model one batch at a time, and generators are exactly the right tool for this.

The fundamental idea: yield is a pause button

A regular function runs to completion and returns once. A generator function runs to a yield, returns that value, and then pauses, preserving all local state, until the caller asks for the next value. No list is ever built. Memory usage stays constant.

```python
import json
import itertools

def stream_jsonl(path: str):
    """Yield one record at a time from a multi-GB JSONL file."""
    with open(path, encoding="utf-8") as f:
        for line in f:
            if line.strip():
                yield json.loads(line)  # one record in memory at a time

def clean(records):
    """Filter + transform - another generator, no new list."""
    for r in records:
        text = r.get("text", "").strip()
        if len(text) >= 100:
            yield {"text": text, "source": r.get("url", "unknown")}

def batch(iterable, size: int):
    """Yield fixed-size batches. Classic pattern for mini-batch training."""
    buf = []
    for item in iterable:
        buf.append(item)
        if len(buf) == size:
            yield buf
            buf = []
    if buf:
        yield buf

# Compose: the ENTIRE file is processed with O(batch_size) memory
pipeline = batch(clean(stream_jsonl("training.jsonl")), size=32)
for mini_batch in pipeline:
    print(f"Training on batch of {len(mini_batch)} documents")
    # trainer.step(mini_batch)  # trainer is a placeholder for your training loop

# Bonus: chain multiple datasets seamlessly with itertools
multi_dataset = itertools.chain.from_iterable(
    stream_jsonl(p) for p in ["data1.jsonl", "data2.jsonl", "data3.jsonl"]
)
```

💡 Generator vs List Comprehension

[x for x in data] builds a full list in RAM. (x for x in data) is a lazy generator expression; use it when you only need to iterate once, especially over large datasets. For 1M embeddings at dim=1536 (float32), the difference is about 6 GB vs a few hundred bytes.

Every LLM call needs logging. Every external API call needs retries. Every expensive computation needs caching. You don't want this logic scattered through your code; you want it applied cleanly via decorators.

```python
import functools, time, asyncio, logging
from typing import Callable

# A Callable is anything you can put parentheses after and execute: standard
# functions, lambda expressions, and even classes that implement __call__.
def trace_llm(func: Callable) -> Callable:
    """Log every LLM call: inputs, latency, and any errors."""
    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        start = time.perf_counter()
        logging.info(f"→ {func.__name__}")
        try:
            result = await func(*args, **kwargs)
            ms = (time.perf_counter() - start) * 1000
            logging.info(f"✓ {func.__name__} completed in {ms:.0f}ms")
            return result
        except Exception as e:
            ms = (time.perf_counter() - start) * 1000
            logging.error(f"✗ {func.__name__} failed after {ms:.0f}ms: {e}")
            raise
    return wrapper

def retry(max_attempts: int = 3, base_delay: float = 1.0):
    """Exponential backoff retry for async functions."""
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            for attempt in range(max_attempts):
                try:
                    return await func(*args, **kwargs)
                except (ConnectionError, TimeoutError) as e:
                    if attempt == max_attempts - 1:
                        raise
                    delay = base_delay * (2 ** attempt)
                    print(f"Retry {attempt + 1}/{max_attempts} in {delay}s: {e}")
                    await asyncio.sleep(delay)
        return wrapper
    return decorator

@trace_llm
@retry(max_attempts=3)
async def generate(prompt: str) -> str:
    # Your actual LLM call goes here
    return "response"
```

Handling Dynamic or Unknown Arguments

Sometimes you are writing something highly dynamic, like a decorator that logs latency or adds a retry loop (exactly like the @trace_llm and @retry implementations). You don't know ahead of time what arguments the underlying function will take. For those cases, you write Callable[..., Any]: the ... (an ellipsis) tells the type checker, "This is a function, but it can accept any arguments."

```python
from typing import Callable, Any
import functools

def simple_logger(func: Callable[..., Any]) -> Callable[..., Any]:
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        print(f"Calling function: {func.__name__}")
        return func(*args, **kwargs)
    return wrapper
```

lru_cache: free performance for embedding lookups

Embedding API calls are expensive. If your app re-embeds the same query text multiple times, you're burning money. @functools.lru_cache memoizes function results in memory: identical inputs return the cached result instantly.

```python
import functools

# call_embedding_api is a placeholder for your own (synchronous) embedding client.
@functools.lru_cache(maxsize=10000)
def get_embedding(text: str, model: str) -> tuple[float, ...]:
    # lru_cache needs hashable arguments (str is fine). Returning an immutable
    # tuple instead of a list stops callers from mutating the shared cached value.
    embedding = call_embedding_api(text, model)
    return tuple(embedding)

# First call: hits the API (~100 ms)
v1 = get_embedding("What is RAG?", "text-embedding-3-small")
# Second call with identical arguments: returns instantly from cache (~0 ms)
v2 = get_embedding("What is RAG?", "text-embedding-3-small")
print(get_embedding.cache_info())
# CacheInfo(hits=1, misses=1, maxsize=10000, currsize=1)

# partial: create specialised LLM callers from a general function
def call_llm(prompt, model, temperature, max_tokens): ...

creative = functools.partial(call_llm, model="claude-opus-4-5", temperature=0.9, max_tokens=2048)
analyst = functools.partial(call_llm, model="claude-opus-4-5", temperature=0.1, max_tokens=512)
```

Functools

Think of functools as a modifier toolkit for functions. In Python, functions are "first-class citizens", meaning you can pass them around like variables and return them from other functions. The functools module provides built-in tools to adapt, enhance, and cache those functions without rewriting their core code. Here are a few important tools in the functools module.

1. functools.lru_cache

The memory saver: a built-in cache decorator. "LRU" stands for Least Recently Used.

How it works: It acts like a sticky note on a function. The first time the function is called with a specific input, Python does the heavy work and saves the result. The next time it is called with the same input, Python skips running the function and instantly hands back the saved result.

```python
import functools

@functools.lru_cache(maxsize=128)
def fetch_embedding(text: str):
    # Pretend this is a slow, expensive API call to OpenAI
    return call_embedding_api(text)
```

2. functools.wraps

The identity preserver: a decorator used inside custom decorators.

How it works: When you wrap a function in a decorator, you accidentally overwrite its metadata (like its name and docstring) with that of the decorator's internal wrapper function. @functools.wraps copies the original function's identity back onto the final product.

GenAI Use Case: If you are building a custom @trace_llm or @retry_api decorator for an agent framework, you must use @wraps.
Without it, debugging tools, logging frameworks, and IDE autocompletes will think every single decorated function in your codebase is named wrapper.

```python
import functools

def my_decorator(func):
    @functools.wraps(func)  # <-- Keeps the original function's name intact
    def wrapper(*args, **kwargs):
        return func(*args, **kwargs)
    return wrapper
```

3. functools.partial

The preset factory

What it is: A way to freeze a few arguments of an existing function to create a new, specialized function.

How it works: Imagine a generalized function that takes 4 arguments. You can use partial to lock in 3 of those arguments, giving you a new simplified function that only requires 1 argument to run.

GenAI Use Case: Configuring different flavors of an LLM. You can take a base call_llm(prompt, model, temperature) function and instantly manufacture a creative writer (high temperature) and a code analyst (low temperature) without writing multiple distinct functions.

```python
from functools import partial

def call_llm(prompt, model, temperature): ...

# Freeze the model and temperature to make specific tools
creative_bot = partial(call_llm, model="claude-3-5", temperature=0.9)
strict_bot = partial(call_llm, model="claude-3-5", temperature=0.1)

# Now you only need to pass the prompt
creative_bot(prompt="Write a poem about a GPU")
```

4. functools.reduce

The Chain Reaction

What it is: A tool that applies a function cumulatively to a list of items from left to right, reducing the list down to a single value.

How it works: If you have a list [1, 2, 3, 4] and an addition function, reduce will add 1 + 2 (3), then add that result to 3 (6), then add that result to 4 (10).

GenAI Use Case: Sequential processing pipelines. If you have an initial user prompt and an array of text-cleaning steps (strip whitespace → lower case → filter profanity → add system context), you can use reduce to cleanly thread the text through the entire pipeline array in a single line.
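The running totals in the addition walkthrough above can be checked with the standard library alone. `itertools.accumulate` is used here only to expose the intermediate values that `reduce` carries from one step to the next.

```python
from functools import reduce
from itertools import accumulate
from operator import add

numbers = [1, 2, 3, 4]

total = reduce(add, numbers)              # ((1 + 2) + 3) + 4
running = list(accumulate(numbers, add))  # every intermediate result along the way

print(total)    # 10
print(running)  # [1, 3, 6, 10]
```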
```python
from functools import reduce

# remove_profanity is a placeholder for your own cleaning function.
funcs = [str.strip, str.lower, remove_profanity]
raw_prompt = "   URGENT: Fix this code   "

# Applies each cleaning function to the result of the last one
clean_prompt = reduce(lambda text, func: func(text), funcs, raw_prompt)
```

AI applications manage expensive, limited resources: GPU memory, HTTP connection pools, DB connection pools, temporary model checkpoints. When an exception happens mid-pipeline (and it will), you need to guarantee cleanup. Context managers are that guarantee.

```python
from contextlib import contextmanager, ExitStack
import time, uuid

@contextmanager
def pipeline_span(name: str):
    """Trace any code block with start/end timing and error capture."""
    span_id = str(uuid.uuid4())[:8]
    start = time.perf_counter()
    print(f"[START] {name} ({span_id})")
    try:
        yield span_id
    except Exception as e:
        print(f"[ERROR] {name}: {e}")
        raise
    finally:
        ms = (time.perf_counter() - start) * 1000
        print(f"[END] {name} - {ms:.1f}ms")

# Nest spans to build a full trace tree
with pipeline_span("full_rag"):
    with pipeline_span("retrieval"):
        time.sleep(0.05)
    with pipeline_span("llm_call"):
        time.sleep(0.12)

# For dynamically many resources: ExitStack
def load_model_shards(shard_paths: list[str]):
    with ExitStack() as stack:
        # Callbacks run in reverse order of registration, so register this
        # first to have it fire after every file has been closed.
        stack.callback(lambda: print("All shards closed"))
        # Open an unknown number of shards - all cleaned up on exit
        handles = [stack.enter_context(open(p, "rb")) for p in shard_paths]
        for i, fh in enumerate(handles):
            header = fh.read(256)
            print(f"Shard {i}: read {len(header)} bytes")
```

LLMs are probabilistic. They don't always produce valid JSON, the right fields, or values in the expected range. Your code has to validate, parse, and handle errors, and the Python type system with Pydantic makes this robust and readable.
```python
from pydantic import BaseModel, Field, validator
from dataclasses import dataclass, field
from typing import Literal
import json, anthropic

# Dataclasses: lightweight typed containers for internal use
@dataclass
class LLMResponse:
    content: str
    model: str
    input_tokens: int
    output_tokens: int
    latency_ms: float
    finish_reason: Literal["end_turn", "max_tokens", "stop_sequence"]
    metadata: dict = field(default_factory=dict)

    @property
    def total_tokens(self) -> int:
        return self.input_tokens + self.output_tokens

# Pydantic: for LLM-produced JSON that must be validated at runtime
class ExtractedFact(BaseModel):
    # BaseModel is a blueprint that defines exactly what fields a piece of
    # data must have, along with their expected types.
    claim: str = Field(description="The factual claim")
    confidence: float = Field(ge=0.0, le=1.0, description="0-1 confidence")
    source: str = Field(description="Quote from source text")

class FactExtractionResult(BaseModel):
    facts: list[ExtractedFact]
    summary: str

    # While type hints (str, float, int) check the shape of the data,
    # validators check the quality and rules of the data.
    # Note: @validator is the Pydantic v1 API; Pydantic v2 deprecates it
    # in favour of @field_validator.
    @validator("facts")
    def at_least_one(cls, v):
        if not v:
            raise ValueError("Must extract at least one fact")
        return v

async def extract_facts(text: str) -> FactExtractionResult:
    client = anthropic.AsyncAnthropic()
    resp = await client.messages.create(
        model="claude-opus-4-5",
        max_tokens=1024,
        system="Extract facts. Return ONLY valid JSON, no markdown.",
        messages=[{"role": "user", "content": text}],
    )
    raw = resp.content[0].text
    return FactExtractionResult(**json.loads(raw))  # validates or raises
```

Behind the Scenes: What Happens on Failure?

When your pipeline calls FactExtractionResult(**json.loads(raw_llm_text)), Pydantic evaluates the types first, then runs your validator functions sequentially. If any condition fails, it raises a ValidationError.
Instead of crashing or silently poisoning your database with invalid entries, you can catch this specific error in a try/except block and automatically trigger a retry prompt back to the LLM (e.g., "Your previous output failed validation because the confidence score was out of bounds. Please fix it.").

Protocols: write components that work with any LLM backend

Protocol introduces duck typing directly into Python's static type-hinting system. It allows you to define a contract based entirely on behavior (methods and properties) rather than on inheritance, the class's family history. It stems from the old engineering phrase: "If it walks like a duck and quacks like a duck, treat it like a duck."

What is a Protocol? The Structural Blueprint

A Protocol is an invisible contract. It defines a list of methods and attributes that a class must have to be considered valid, but your classes never have to explicitly inherit from it. They just have to match the design.

Real-World Example

Imagine you are building an AI agent framework and want to support multiple vector databases (like Chroma, Pinecone, or Milvus). Instead of forcing every database driver to inherit from a shared base class, you define a structural Protocol:

```python
from typing import Protocol, runtime_checkable

@runtime_checkable
class Embedder(Protocol):
    """Any class that implements embed() satisfies this - no inheritance."""
    async def embed(self, text: str) -> list[float]: ...

    @property
    def dimension(self) -> int: ...

@runtime_checkable
class VectorStore(Protocol):
    async def upsert(self, doc_id: str, vec: list[float], meta: dict) -> None: ...
    async def search(self, vec: list[float], top_k: int) -> list[dict]: ...

class RAGPipeline:
    """Backend-agnostic: works with OpenAI, Cohere, or any compliant Embedder."""
    def __init__(self, embedder: Embedder, store: VectorStore):
        self.embedder = embedder
        self.store = store

    async def ingest(self, doc_id: str, text: str):
        vec = await self.embedder.embed(text)
        await self.store.upsert(doc_id, vec, {"text": text})

    async def retrieve(self, query: str, top_k: int = 5) -> list[dict]:
        q_vec = await self.embedder.embed(query)
        return await self.store.search(q_vec, top_k)
```

What is @runtime_checkable? The Reality Check

By default, a Protocol only matters to static type checkers (the squiggly lines in VS Code or your pre-commit checks). Once your Python script is actually running on a production server, nothing enforces it. If you try to use a standard isinstance check at runtime to verify that a class fits your protocol, Python raises: TypeError: Instance and class checks can only be used with @runtime_checkable protocols.

Adding the @runtime_checkable decorator solves this. It tells Python's runtime: "When I run isinstance(obj, Protocol), actually inspect the object, look at its available methods, and see if it qualifies." The check only confirms that the required members exist; it does not verify their signatures or return types.

```python
from typing import Any, Protocol, runtime_checkable

@runtime_checkable
class VectorStore(Protocol):
    async def search(self, vec: list[float], top_k: int) -> list[dict]: ...

# --- Inside your core agent pipeline ---
def initialize_pipeline(db_plugin: Any):
    # This live check ONLY works because we used @runtime_checkable
    if not isinstance(db_plugin, VectorStore):
        raise ValueError("The provided plugin is missing a valid search method")
```

Why this matters for GenAI Developers

When building production-grade AI applications, vendor lock-in is a constant risk. LLM providers change, vector databases evolve, and embedding models shift. Using Protocol combined with @runtime_checkable allows you to write perfectly swappable components.
Your core RAG pipeline can accept any object a developer passes to it, as long as it provides the methods your framework requires.

A 7B parameter model in float16 occupies 14 GB of RAM. A 70B model needs 140 GB. Even working with embeddings at scale is a memory challenge. Understanding Python's memory model isn't academic: it directly determines what you can run and how fast.

```python
import sys, gc, weakref

# __slots__: eliminates the per-instance __dict__ - saves ~40% memory
class TokenSlotted:
    __slots__ = ('id', 'text', 'logprob')
    def __init__(self, id, text, logprob):
        self.id = id; self.text = text; self.logprob = logprob

class TokenDict:
    def __init__(self, id, text, logprob):
        self.id = id; self.text = text; self.logprob = logprob

s = TokenSlotted(1, "hello", -0.5)
d = TokenDict(1, "hello", -0.5)
# getsizeof is shallow, so add the instance __dict__ for a fair comparison.
# Exact numbers depend on the Python version.
print(f"Slotted: {sys.getsizeof(s)} bytes")                          # ~56 bytes
print(f"Dict: {sys.getsizeof(d) + sys.getsizeof(d.__dict__)} bytes")  # ~232 bytes
# For 1M tokens: saves ~176MB RAM

# weakref: cache without preventing garbage collection
class EmbeddingCache:
    def __init__(self):
        self._cache: dict[str, weakref.ref] = {}

    def put(self, key: str, arr):
        # arr must support weak references (NumPy arrays do; plain lists and dicts do not)
        self._cache[key] = weakref.ref(arr)

    def get(self, key: str):
        ref = self._cache.get(key)
        return ref() if ref else None  # Returns None if the array was garbage-collected

# tracemalloc: find exactly what's consuming memory
import tracemalloc
tracemalloc.start()

import numpy as np
embeddings = np.random.randn(10_000, 1536).astype(np.float32)  # ~60MB

snapshot = tracemalloc.take_snapshot()
top_stats = snapshot.statistics("lineno")
print(f"Top allocation: {top_stats[0].size / 1e6:.1f}MB")
```

sys (system inspector): the gateway to Python's internal engine and the OS. sys.getsizeof looks at a Python object and tells you how many bytes of RAM the object itself is consuming (not the objects it refers to). __slots__ drops an object's memory footprint from ~232 bytes down to ~56 bytes in the example above. When processing millions of tokens or database chunks, sys lets you audit your RAM savings with actual numbers.

gc (garbage collector): the automated cleaning crew.
Python automatically deletes objects from memory when no one is using them anymore (via reference counting). Reference counting alone cannot free objects that refer to each other in a loop, and the gc module is the specialized sub-system that hunts down these "reference cycles" and clears them out. gc.collect() forces an immediate, manual hard-purge of unreachable objects to free up VRAM/RAM.

weakref (ghost pointer): a way to reference an object without keeping it alive. Normally, if you put an object into a dictionary (a cache), that dictionary holds a "strong reference" to it. Even if the rest of the app has completely deleted that object, it stays trapped alive in RAM because the cache dictionary is still holding onto it. weakref creates a "ghost pointer" that allows the cache to see and use the data, but if the rest of the application deletes the original object, the garbage collector is allowed to destroy it anyway. The cache entry simply turns into None.

These are the techniques used inside the frameworks you use every day. Understanding them lets you write code that feels like a framework: extensible, expressive, and self-documenting.

Metaclasses: auto-registering model providers

The factory blueprints

If a regular class is a blueprint for creating objects, a metaclass is a blueprint for creating classes. It is a piece of code that hooks into Python at the exact moment a class is being born (defined) and allows you to modify it before it is finalized.

How they work: When Python reads a file and hits class MyLLMProvider:, a metaclass intercepts that event, inspects the class's properties, and can dynamically inject methods, rewrite names, or catalog it.

```python
class ModelRegistry(type):
    """Every subclass with a model_id gets registered automatically."""
    _registry: dict[str, type] = {}

    def __new__(mcs, name, bases, ns):
        cls = super().__new__(mcs, name, bases, ns)
        if "model_id" in ns:
            mcs._registry[ns["model_id"]] = cls
        return cls

    @classmethod
    def get(mcs, model_id: str) -> type:
        if model_id not in mcs._registry:
            raise KeyError(f"Unknown: {model_id}. Options: {list(mcs._registry)}")
        return mcs._registry[model_id]

class BaseProvider(metaclass=ModelRegistry):
    pass

class ClaudeProvider(BaseProvider):
    model_id = "claude-opus-4-5"
    async def generate(self, prompt): ...  # auto-registered on class creation

class GPT4Provider(BaseProvider):
    model_id = "gpt-4o"
    async def generate(self, prompt): ...  # auto-registered on class creation

# Dynamic selection from config - no if/elif chains
provider = ModelRegistry.get("claude-opus-4-5")
```

💡 The Takeaway: This eliminates complex if/elif routing logic across your codebase, making your framework completely plug-and-play for new open-source models.

_data: "I am a private variable. Please leave me alone." (A warning to developers.)

__data__: "I am a magic system hook. Python uses me to make syntax work." (Built-in framework behavior.)

Dunder methods: pipelines with the | operator

Short for "Double Underscore" methods, these are built-in hooks that always start and end with two underscores, like __init__, __call__, or __or__. They allow you to define how your custom objects react to native Python operators. For instance, if you want to be able to use the + sign between two custom data objects, you define the __add__ dunder method inside your class.

```python
from __future__ import annotations
from typing import Callable, Any

class Step:
    def __init__(self, fn: Callable, name=""):
        self.fn = fn
        self.name = name or fn.__name__

    def __call__(self, data: Any) -> Any:
        return self.fn(data)

    def __or__(self, other: Step) -> Step:
        # step1 | step2 → a new step that chains both
        def chained(data):
            return other(self(data))
        return Step(chained, f"{self.name}|{other.name}")

    def __repr__(self):
        return f"Step({self.name!r})"

# Define individual steps
strip = Step(str.strip, "strip")
lower = Step(str.lower, "lower")
tokenise = Step(str.split, "tokenise")
count = Step(len, "count")

# Compose with | - reads like Unix pipes
preprocess = strip | lower | tokenise | count
print(preprocess("  Hello World from GenAI  "))  # 4
print(repr(preprocess))  # Step('strip|lower|tokenise|count')
```

Descriptors: The Smart Gatekeepers

```python
class BoundedTemperature:
    def __set_name__(self, owner, name):
        # Called once when the owning class is created; remembers the attribute name
        self.name = name

    def __set__(self, instance, value):
        # The gatekeeper catches the value BEFORE it gets saved
        if not (0.0 <= value <= 2.0):
            raise ValueError("LLM Temperature must be between 0.0 and 2.0")
        instance.__dict__[self.name] = value

class OpenAIModel:
    temperature = BoundedTemperature()  # Reusable gatekeeper applied instantly
```

The Impact: It acts as a defensive shield. It stops bad data or AI hallucinations from breaking your database or crashing your application downstream.

This is where theory meets the real world. LLM APIs have rate limits. Workers crash. Tasks fail. Production AI systems need to handle all of this gracefully, not just in the happy path.

Async rate limiter with token bucket

```python
import asyncio, time
from collections import deque

class RateLimiter:
    """Rate limiter: max N calls per M seconds.

    Strictly speaking this keeps a sliding window of recent call
    timestamps rather than a bucket of tokens; the effect is the same cap.
    """
    def __init__(self, max_calls: int, period: float = 60.0):
        self.max_calls = max_calls
        self.period = period
        self._calls: deque[float] = deque()
        self._lock = asyncio.Lock()

    async def acquire(self):
        async with self._lock:
            now = time.monotonic()
            # Drop timestamps that have aged out of the window
            while self._calls and now - self._calls[0] > self.period:
                self._calls.popleft()
            if len(self._calls) >= self.max_calls:
                wait = self.period - (now - self._calls[0])
                await asyncio.sleep(wait)
                self._calls.popleft()  # the oldest call has now left the window
            self._calls.append(time.monotonic())

    async def __aenter__(self):
        await self.acquire()
        return self

    async def __aexit__(self, *_):
        pass

# 60 RPM cap, max 10 simultaneous connections
limiter = RateLimiter(max_calls=60, period=60.0)
sem = asyncio.Semaphore(10)

async def safe_llm_call(prompt: str, idx: int) -> str:
    async with sem:          # cap concurrency
        async with limiter:  # cap rate
            await asyncio.sleep(0.5)  # stand-in for the real LLM call
            return f"[{idx}] response"

async def process(prompts: list[str]) -> list[str]:
    tasks = [safe_llm_call(p, i) for i, p in enumerate(prompts)]
    return await asyncio.gather(*tasks)
```

Mixing asyncio + multiprocessing for hybrid workloads

In a high-performance GenAI application, you often have a hybrid workload, meaning your code has to do two completely different types of tasks back-to-back: I/O-bound work (network calls to LLM and embedding APIs) and CPU-bound work (heavy math on the results).

If you try to do both on a single thread, the heavy math phase will completely freeze your async loop, causing all your incoming user chat requests to lag or time out. Here is how mixing them solves this, explained simply.

The Analogy: The Head Chef & The Prep Cooks

Imagine your master kitchen has one Head Chef (Asyncio) and a team of 4 Prep Cooks in separate back kitchens (Multiprocessing).

The I/O Phase: The Head Chef writes down 100 different prompt orders and fires them off over the internet to a supplier. Because the chef is using asyncio, they don't sit around waiting by the phone. They effortlessly keep handling new incoming restaurant orders while the supplier processes the request.

The Hand-off: Suddenly, the supplier ships back a massive truckload of raw data matrices. The Head Chef needs to calculate the mathematical similarity between all of them.

The Trap: If the Head Chef sits down to crunch those numbers manually, they will be stuck at their desk for seconds. The kitchen gridlocks. No new orders can be taken.

The Multi-Hybrid Solution: Instead, the Head Chef stays at the front counter. They take the massive data payload, chop it into 4 pieces, and throw it into a chute labeled loop.run_in_executor().
Down the chute, the 4 Prep Cooks (the process pool) grab the math data. Each cook uses their own independent kitchen (a CPU core) to crunch the numbers. While they are sweating over the heavy math, the Head Chef is still at the front counter, completely unblocked, happily streaming tokens and taking new user requests. When a Prep Cook finishes their math matrix, they throw the result back up the chute. The Head Chef catches it seamlessly using the await keyword.

python
    import asyncio
    from concurrent.futures import ProcessPoolExecutor
    import numpy as np

    def cpu_similarity(embeddings: list[list[float]]) -> list[list[float]]:
        """CPU-bound work - runs in a separate process, bypasses GIL."""
        arr = np.array(embeddings, dtype=np.float32)
        # Normalise each row; the epsilon guards against division by zero
        norm = arr / (np.linalg.norm(arr, axis=1, keepdims=True) + 1e-8)
        return (norm @ norm.T).tolist()

    async def full_pipeline(texts: list[str]) -> list[list[float]]:
        loop = asyncio.get_running_loop()
        # Phase 1: I/O-bound - async embedding fetch
        # (fetch_embeddings_async is assumed to be defined elsewhere)
        embeddings = await fetch_embeddings_async(texts)
        # Phase 2: CPU-bound - offload to process pool (non-blocking)
        with ProcessPoolExecutor(max_workers=4) as pool:
            sim_matrix = await loop.run_in_executor(pool, cpu_similarity, embeddings)
        return sim_matrix

    # run_in_executor bridges asyncio and multiprocessing cleanly:
    # - the event loop remains unblocked during CPU work
    # - CPU work runs in worker processes on other cores
    #   (one job here; submit several jobs to keep all 4 workers busy)
    # - result is awaited naturally

Why this is Necessary for GenAI (The Impact)

Without this hybrid pattern, you face an engineering nightmare: async loop starvation. The moment your Python script tries to calculate a large cosine similarity matrix or tokenize a massive block of text inside a coroutine, the event loop's only thread is busy and nothing else gets scheduled. Because of the Global Interpreter Lock (GIL), moving pure-Python number crunching to another thread does not help either. Your real-time streaming tokens will stutter, web sockets will drop connections, and your health-check endpoints will fail.

By bridging them together with run_in_executor:
You get the ultra-low memory overhead of asyncio for network scaling.
You get the raw CPU power of multiprocessing across all your machine's hardware cores.
Your application stays responsive to users, even while crunching millions of data points in the background.

Building production-grade Generative AI applications requires more than just knowing how to prompt an LLM or construct a basic RAG pipeline. As we scale these systems, the traditional bottlenecks of software engineering shift. Our programs spend massive amounts of time bound by network latency waiting for API providers, while concurrently demanding high-performance CPU and memory efficiency to manage massive vector datasets, token structures, and local model shards.

Every Python Concept a Generative AI Developer Actually Needs to Know (https://pub.towardsai.net/every-python-concept-a-generative-ai-developer-actually-needs-to-know-ba278864877b) was originally published in Towards AI (https://pub.towardsai.net) on Medium.
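
As a closing sketch of the hybrid pattern from the run_in_executor section, here is one way the "chop it into 4 pieces" step from the Head Chef analogy might look in code. This is a minimal illustration under stated assumptions, not the article's implementation: fake_fetch_embedding is a hypothetical stand-in for a real embedding API, the similarity math is written in pure Python rather than NumPy so the block has no third-party dependencies, and the chunks and executor parameters are names introduced here for illustration. Passing executor=None falls back to the event loop's default thread pool, which is handy for a quick check but does not give true parallelism for pure-Python math.

```python
import asyncio
import math
from concurrent.futures import Executor, ProcessPoolExecutor
from typing import Optional


def normalize(vec: list[float]) -> list[float]:
    """Scale a vector to unit length (zero vectors are returned unchanged)."""
    length = math.sqrt(sum(x * x for x in vec))
    return [x / length for x in vec] if length else list(vec)


def similarity_rows(rows: list[list[float]],
                    all_vecs: list[list[float]]) -> list[list[float]]:
    """CPU-bound: dot product of each row with every unit vector (cosine similarity)."""
    return [[sum(a * b for a, b in zip(r, v)) for v in all_vecs] for r in rows]


async def fake_fetch_embedding(text: str) -> list[float]:
    """Hypothetical stand-in for an embedding API call (I/O-bound)."""
    await asyncio.sleep(0.01)  # simulated network latency
    return [float(len(text)), float(text.count("a")), 1.0]


async def full_pipeline(texts: list[str],
                        executor: Optional[Executor] = None,
                        chunks: int = 4) -> list[list[float]]:
    loop = asyncio.get_running_loop()
    # Phase 1: I/O-bound. All fetches are in flight at the same time.
    raw = await asyncio.gather(*(fake_fetch_embedding(t) for t in texts))
    unit = [normalize(v) for v in raw]
    # Phase 2: CPU-bound. One executor job per chunk of rows, so several
    # workers can run in parallel while the event loop stays free.
    size = max(1, math.ceil(len(unit) / chunks))
    jobs = [
        loop.run_in_executor(executor, similarity_rows, unit[i:i + size], unit)
        for i in range(0, len(unit), size)
    ]
    parts = await asyncio.gather(*jobs)  # gather preserves submission order
    return [row for part in parts for row in part]


def main() -> None:
    texts = ["alpha", "beta", "gamma", "delta", "banana"]
    # Create the pool once and reuse it; starting worker processes is expensive.
    with ProcessPoolExecutor(max_workers=4) as pool:
        matrix = asyncio.run(full_pipeline(texts, executor=pool))
    print(f"{len(matrix)}x{len(matrix[0])} similarity matrix")


if __name__ == "__main__":
    main()
```

Two design choices are worth noting. The pool is created outside the coroutine and passed in, so a long-running service can share one pool across requests instead of paying process start-up costs on every call. And the worker functions live at module level, because a process pool has to pickle them to send work to the child processes.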