Multi-Agent — Implementing the Orchestrator Worker Pattern A developer implemented a multi-agent architecture using the Orchestrator Worker pattern, where an orchestrator delegates tasks to specialized workers for search and quality checking. The approach, inspired by Anthropic research showing a 90% improvement in task completion rates, replaces a single-agent system with role-specific agents to handle complex tasks more effectively. Through Chapter 6 Fine-tuning https://dev.to/hiroki-kameyama/fine-tuning-domain-specializing-models-with-lora-180g , we focused on improving a single AI system. This chapter introduces multi-agent design, where multiple Agents collaborate on complex tasks. Our previous Agent implementations used a "one Agent does everything" approach. For complex tasks, putting all responsibility on a single Agent has limits. Single Agent before User → Agent → Tools → Answer One Agent makes all decisions and executes everything Multi-Agent now User → Orchestrator → Search Agent → Answer Generation Agent → Quality Check Agent → Final Answer Each role is specialized and they collaborate Multi-agent is effective when: 2026 Trend:Anthropic research shows multi-agent architectures improve task completion rates by 90%. The Orchestrator × Worker pattern has become the production standard. User's question ↓ Orchestrator Analyzes task and delegates to two workers ↓ ↓ Search Worker Quality Check Worker Searches pgvector Evaluates answer quality ↓ ↓ Orchestrator Integrates results and generates final answer ↓ Final Answer pgvector-tutorial/ ├── existing files └── multiagent/ ├── search worker.py ★ Search specialist worker ├── quality worker.py ★ Quality check specialist worker ├── orchestrator.py ★ Orchestrator └── 14 multiagent.py ★ Execution script multiagent/search worker.py A worker focused exclusively on pgvector search. Single responsibility search only keeps the prompt simple and improves accuracy. multiagent/search worker.py """ Search specialist worker Role: Search and return documents from pgvector Responsibility: Search only no answer generation """ import sys import os sys.path.append os.path.dirname os.path.dirname os.path.abspath file import psycopg2 from google import genai from google.genai import types from dotenv import load dotenv import time load dotenv client = genai.Client api key=os.getenv "GEMINI API KEY" conn = psycopg2.connect host=os.getenv "DB HOST" , port=os.getenv "DB PORT" , dbname=os.getenv "DB NAME" , user=os.getenv "DB USER" , password=os.getenv "DB PASSWORD" , cur = conn.cursor SEARCH WORKER SYSTEM = """You are a document search specialist. Search for documents related to the given question from pgvector and return the results as-is. Constraints: - Do not generate answers just return search results - If no results found, return "No relevant documents found" - Use category filtering when the category is clear """ def get embedding text: str - list float : result = client.models.embed content model="gemini-embedding-001", contents=text, config=types.EmbedContentConfig task type="RETRIEVAL QUERY", output dimensionality=768, , return result.embeddings 0 .values def search documents query: str, top k: int = 3 - list dict : """Search all categories.""" query embedding = get embedding query cur.execute """ SELECT title, body, category, 1 - embedding <= %s::vector AS similarity FROM documents ORDER BY embedding <= %s::vector LIMIT %s; """, query embedding, query embedding, top k rows = cur.fetchall return {"title": r 0 , "body": r 1 , "category": r 2 , "similarity": round r 3 , 4 } for r in rows def search by category query: str, category: str, top k: int = 3 - list dict : """Category-filtered search.""" query embedding = get embedding query cur.execute """ SELECT title, body, category, 1 - embedding <= %s::vector AS similarity FROM documents WHERE category = %s ORDER BY embedding <= %s::vector LIMIT %s; """, query embedding, category, query embedding, top k rows = cur.fetchall return {"title": r 0 , "body": r 1 , "category": r 2 , "similarity": round r 3 , 4 } for r in rows def list categories - list dict : """Get category list.""" cur.execute """ SELECT category, COUNT as count FROM documents GROUP BY category ORDER BY count DESC; """ rows = cur.fetchall return {"category": r 0 , "count": r 1 } for r in rows SEARCH TOOLS = types.Tool function declarations= types.FunctionDeclaration name="search documents", description="Search all categories for documents related to a query.", parameters=types.Schema type=types.Type.OBJECT, properties={ "query": types.Schema type=types.Type.STRING, description="Search query" , "top k": types.Schema type=types.Type.INTEGER, description="Number of results" , }, required= "query" , , , types.FunctionDeclaration name="search by category", description="Search documents in a specific category only.", parameters=types.Schema type=types.Type.OBJECT, properties={ "query": types.Schema type=types.Type.STRING , "category": types.Schema type=types.Type.STRING, description="ML / Python / Cloud" , "top k": types.Schema type=types.Type.INTEGER , }, required= "query", "category" , , , types.FunctionDeclaration name="list categories", description="Return the list of categories in the DB.", parameters=types.Schema type=types.Type.OBJECT, properties={} , , def dispatch func name: str, func args: dict : if func name == "search documents": return search documents func args elif func name == "search by category": return search by category func args elif func name == "list categories": return list categories return {"error": f"unknown: {func name}"} def run search worker question: str - dict: """ Main function for the search worker. Returns: { "docs": list of retrieved documents , "steps": number of steps executed , "worker": "search" } """ print f" Search Worker Question: {question :40 }..." contents = types.Content role="user", parts= types.Part text=question docs = steps = 0 for in range 5 : for attempt in range 3 : try: response = client.models.generate content model="gemini-2.5-flash", contents=contents, config=types.GenerateContentConfig system instruction=SEARCH WORKER SYSTEM, tools= SEARCH TOOLS , , break except Exception as e: if "503" in str e or "429" in str e and attempt < 2: time.sleep attempt + 1 10 else: raise candidates = response.candidates if not candidates or not candidates 0 .content.parts: break part = candidates 0 .content.parts 0 if part.function call: func name = part.function call.name func args = dict part.function call.args result = dispatch func name, func args steps += 1 print f" Search Worker {func name} → {len result if isinstance result, list else result} results" if func name in "search documents", "search by category" and isinstance result, list : docs.extend result contents.append types.Content role="model", parts= types.Part function call=part.function call contents.append types.Content role="user", parts= types.Part function response=types.FunctionResponse name=func name, response={"result": result}, else: break Deduplicate by title seen = set unique docs = for doc in docs: if doc "title" not in seen: seen.add doc "title" unique docs.append doc print f" Search Worker Done: retrieved {len unique docs } documents" return {"docs": unique docs, "steps": steps, "worker": "search"} multiagent/quality worker.py A worker focused exclusively on evaluating answer quality. multiagent/quality worker.py """ Quality check specialist worker Role: Evaluate the quality of generated answers Responsibility: Quality evaluation only no search or answer generation """ import sys import os sys.path.append os.path.dirname os.path.dirname os.path.abspath file from google import genai from google.genai import types from dotenv import load dotenv import time import json import re load dotenv client = genai.Client api key=os.getenv "GEMINI API KEY" QUALITY WORKER SYSTEM = """You are a quality evaluation specialist for AI answers. Evaluate the given "question, reference documents, and answer" set. Evaluation criteria: 1. Faithfulness 0.0–1.0 : Is the answer based on the documents? 2. Relevancy 0.0–1.0 : Does it correctly answer the question? 3. Completeness 0.0–1.0 : Does it include all necessary information? Return ONLY the following JSON format: { "faithfulness": 0.0–1.0, "relevancy": 0.0–1.0, "completeness": 0.0–1.0, "overall": 0.0–1.0, "feedback": "Note any areas for improvement" } """ def run quality worker question: str, docs: list dict , answer: str - dict: """ Main function for the quality check worker. Returns: { "faithfulness": float, "relevancy": float, "completeness": float, "overall": float, "feedback": str, "worker": "quality" } """ print f" Quality Worker Evaluating answer quality..." context = "\n\n".join f" {d 'title' } \n{d 'body' }" for d in docs prompt = f"""Please evaluate the following. Question {question} Reference Documents {context} Answer to Evaluate {answer} Return the evaluation result in JSON format.""" for attempt in range 3 : try: response = client.models.generate content model="gemini-2.5-flash", contents=prompt, config=types.GenerateContentConfig system instruction=QUALITY WORKER SYSTEM, , break except Exception as e: if "503" in str e or "429" in str e and attempt < 2: time.sleep attempt + 1 10 else: raise raw = response.text.strip match = re.search r'\{. \}', raw, re.DOTALL if match: try: result = json.loads match.group result "worker" = "quality" print f" Quality Worker Overall: {result.get 'overall', 'N/A' }" return result except json.JSONDecodeError: pass print f" Quality Worker Parse failed, using default values" return { "faithfulness": 0.5, "relevancy": 0.5, "completeness": 0.5, "overall": 0.5, "feedback": "Could not retrieve evaluation", "worker": "quality", } multiagent/orchestrator.py The command center that coordinates the two workers. multiagent/orchestrator.py """ Orchestrator Role: Receive user questions, direct workers, and integrate results Responsibility: Task decomposition, worker invocation, result integration """ import sys import os sys.path.append os.path.dirname os.path.dirname os.path.abspath file from google import genai from google.genai import types from dotenv import load dotenv from multiagent.search worker import run search worker from multiagent.quality worker import run quality worker import time load dotenv client = genai.Client api key=os.getenv "GEMINI API KEY" ORCHESTRATOR SYSTEM = """You are an orchestrator that decomposes tasks and directs multiple workers. Your role: 1. Receive the user's question 2. Generate an answer based on documents retrieved by the search worker 3. Review quality check results and improve the answer if needed 4. Return the final answer to the user Constraints: - Base answers on the reference documents - Improve the answer if quality score is below 0.7 - Aim for concise and accurate answers """ def generate answer question: str, docs: list dict - str: time.sleep 15 Rate limit safety 5 requests/min free tier context = "\n\n".join f" {d 'title' } \n{d 'body' }" for d in docs prompt = f"""Answer the question based on the following documents. Reference Documents {context} Question {question} Answer concisely, based on the documents """ for attempt in range 3 : try: response = client.models.generate content model="gemini-2.5-flash", contents=prompt, config=types.GenerateContentConfig system instruction=ORCHESTRATOR SYSTEM, , return response.text except Exception as e: if "503" in str e or "429" in str e and attempt < 2: time.sleep attempt + 1 10 else: raise return "Could not generate an answer." def improve answer question: str, docs: list dict , answer: str, feedback: str - str: context = "\n\n".join f" {d 'title' } \n{d 'body' }" for d in docs prompt = f"""Please improve the following answer. Question {question} Reference Documents {context} Current Answer {answer} Improvement Feedback {feedback} Improved Answer""" for attempt in range 3 : try: response = client.models.generate content model="gemini-2.5-flash", contents=prompt, return response.text except Exception as e: if "503" in str e or "429" in str e and attempt < 2: time.sleep attempt + 1 10 else: raise return answer def run orchestrator question: str - dict: """ Main orchestrator function. Flow: 1. Request search from search worker 2. Generate answer from search results 3. Request evaluation from quality check worker 4. Improve answer if score is low 5. Return final answer """ print f"\n{'=' 60}" print f"Orchestrator started" print f"Question: {question}" print f"{'=' 60}" ── Step 1: Request search from search worker ───────────── print "\n Step 1 Delegating to search worker..." search result = run search worker question docs = search result "docs" if not docs: return { "answer": "No relevant documents found.", "quality": {"overall": 0.0}, "docs count": 0, "improved": False, } print f" → Retrieved {len docs } documents" ── Step 2: Generate answer ─────────────────────────────── print "\n Step 2 Generating answer..." answer = generate answer question, docs print f" → Answer generated {len answer } chars " ── Step 3: Request evaluation from quality worker ──────── print "\n Step 3 Delegating to quality check worker..." quality = run quality worker question, docs, answer improved = False ── Step 4: Improve if quality is low ──────────────────── if quality.get "overall", 0 < 0.7: print f"\n Step 4 Quality score {quality.get 'overall' } < 0.7 → Improving answer..." feedback = quality.get "feedback", "Please improve the answer" answer = improve answer question, docs, answer, feedback improved = True print f" → Answer improvement complete" else: print f"\n Step 4 Quality score {quality.get 'overall' } ≥ 0.7 → No improvement needed" print f"\n{'=' 60}" print "Orchestrator complete" print f"{'=' 60}" return { "answer": answer, "quality": quality, "docs count": len docs , "improved": improved, } 14 multiagent.py python 14 multiagent.py import time import sys import os sys.path.append os.path.dirname os.path.abspath file from multiagent.orchestrator import run orchestrator def main : questions = "Tell me in detail about ML evaluation metrics", "What are the methods for reducing AWS costs?", for question in questions: result = run orchestrator question print f"\nFinal Answer:" print result "answer" print f"\nQuality Scores:" q = result "quality" print f" Faithfulness: {q.get 'faithfulness', 'N/A' }" print f" Relevancy: {q.get 'relevancy', 'N/A' }" print f" Completeness: {q.get 'completeness', 'N/A' }" print f" Overall: {q.get 'overall', 'N/A' }" print f" Improved: {'Yes' if result 'improved' else 'No'}" print f" Docs used: {result 'docs count' }" print "\n" + "=" 60 time.sleep 30 30s between questions rate limit safety if name == " main ": main mkdir multiagent touch multiagent/ init .py python 14 multiagent.py Sample output: ============================================================ Orchestrator started Question: Tell me in detail about ML evaluation metrics ============================================================ Step 1 Delegating to search worker... Search Worker Question: Tell me in detail about ML ev... Search Worker list categories → 3 results Search Worker search by category → 2 results Search Worker Done: retrieved 2 documents Step 2 Generating answer... → Answer generated 324 chars Step 3 Delegating to quality check worker... Quality Worker Evaluating answer quality... Quality Worker Overall: 0.92 Step 4 Quality score 0.92 ≥ 0.7 → No improvement needed ============================================================ | Pattern | Structure | Best For | |---|---|---| Orchestrator × Worker this chapter | 1 command center + multiple specialists | General task decomposition | Sequential Pipeline | Agent A → Agent B → Agent C | Document processing, contract generation | Parallel Fan-out | Multiple Agents execute simultaneously | High-speed research, comparison analysis | Hierarchical | Upper orchestrator → Lower orchestrator → Workers | Large-scale complex tasks | Watch out for cost explosions When the orchestrator receives full results from all workers, the context window balloons. Always have workers return structured JSON summaries — never pass the full text. Set step limits Always configure a max steps limit on each Agent to prevent runaway loops. Error handling Use try/except per worker so that one worker's failure doesn't halt the entire system. | Error | Cause | Fix | |---|---|---| ModuleNotFoundError: multiagent | Missing init .py | Run touch multiagent/ init .py | NameError: name 'time' is not defined | Missing import time | Add import time at top of 14 multiagent.py | 429 RESOURCE EXHAUSTED per-minute | Gemini free tier: 5 req/min | Add time.sleep 15 at start of generate answer | 429 RESOURCE EXHAUSTED daily | Gemini free tier: 20 req/day | Re-run the next day | | Quality worker JSON parse failure | LLM returns malformed JSON | Extract JSON block with re.search | | Cost overrun | Sending full text to workers | Pass summaries / structured JSON | asyncio.gather to parallelize the search and quality workers for speed