Multi-agent LLM systems have a coordination problem that most tutorials skip past. You can string together a few asyncio.gather calls or a list of prompts, but once you need three or four agents to hand work to each other in a defined order, and you need the whole thing to degrade gracefully when one call fails, the scaffolding grows quickly and gets tangled with provider-specific SDK code.
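I wrote aegis-gov to separate that coordination scaffolding from the business logic. It is a small Python library (one hard dependency: requests) that provides provider adapters behind a common protocol, a dependency-aware task queue, a concurrency-bounded agent pool with a circuit breaker, and a tool registry.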
This article walks through each of those pieces, shows you the actual code, and is honest about what is not there yet.
Suppose you have four agents: a researcher, a writer, a translator, and a publisher. The writer depends on the researcher. The translator and publisher both depend on the writer but can run in parallel. The publisher should not run at all if the writer failed.
Without a scheduler, you write this by hand every time. The failure-cascade logic especially tends to become a set of nested conditionals that grows with each new dependency edge. And if your LLM provider returns a string of 429s or 5xx errors, there is nothing to stop the loop from hammering the endpoint until you kill the process.
aegis-gov addresses both problems with two focused classes: TaskQueue and AgentPool.
pip install aegis-gov # requests only
pip install "aegis-gov[anthropic]" # + Anthropic SDK
pip install "aegis-gov[openai]" # + OpenAI SDK
pip install "aegis-gov[all]" # both LLM SDKs
Python 3.10+ is required.
The LLMAdapter protocol has two methods: generate() returns a string, stream() yields string chunks. All three concrete adapters satisfy this protocol:
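from aegis_gov import AnthropicAdapter, OpenAIAdapter, OllamaAdapter

# Each assignment below is an alternative; keep the one you need.
adapter = AnthropicAdapter(model="claude-sonnet-4-6")
# base_url points the OpenAI adapter at an OpenAI-compatible server (here, a local one).
adapter = OpenAIAdapter(model="gpt-4o-mini", base_url="http://localhost:1234/v1")
adapter = OllamaAdapter(model="qwen2.5:14b")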
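To make the two-method protocol concrete, here is a minimal standalone sketch using typing.Protocol. It does not import aegis-gov: the class below is a local stand-in, the method signatures (a single prompt string) are assumptions, and EchoAdapter is a hypothetical toy that never touches the network.

```python
from typing import Iterator, Protocol, runtime_checkable


@runtime_checkable
class LLMAdapter(Protocol):
    """Local stand-in for the protocol; the exact signatures are assumed."""

    def generate(self, prompt: str) -> str: ...

    def stream(self, prompt: str) -> Iterator[str]: ...


class EchoAdapter:
    """Hypothetical toy adapter: echoes the prompt instead of calling a model."""

    def generate(self, prompt: str) -> str:
        return f"echo: {prompt}"

    def stream(self, prompt: str) -> Iterator[str]:
        # Yield the same text word by word to imitate streamed chunks.
        for word in self.generate(prompt).split():
            yield word + " "


adapter = EchoAdapter()
full = adapter.generate("hello world")
chunks = list(adapter.stream("hello world"))
```

Because the protocol is structural, EchoAdapter satisfies it without inheriting from anything, which is what makes a fake adapter like this convenient in unit tests.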
The adapter is a field on AgentConfig, so switching providers for a single agent is a one-line change. The rest of your orchestration code does not need to know which provider is in use.
from aegis_gov import OpenMultiAgent, AgentConfig, AnthropicAdapter

oma = OpenMultiAgent()
result = oma.run_agent(
    AgentConfig(
        name="analyst",
        system_prompt="You are a concise market analyst.",
        adapter=AnthropicAdapter(model="claude-haiku-4-5-20251001"),
    ),
    task="Top 3 open-source multi-agent frameworks in 2026?",
)
print(result)
TaskQueue takes a list of Task objects, validates the dependency graph for cycles at construction time (raises CyclicDependencyError if one is found), and exposes a ready() method that returns the tasks whose dependencies are all in done state.
from aegis_gov import OpenMultiAgent, Task
from aegis_gov import AgentConfig, AnthropicAdapter

team = OpenMultiAgent.create_team("pipeline", [
    AgentConfig(name="researcher", system_prompt="Research topics thoroughly."),
    AgentConfig(name="writer", system_prompt="Write clear reports."),
    AgentConfig(name="translator", system_prompt="Translate to Japanese."),
    AgentConfig(name="publisher", system_prompt="Format output as Markdown."),
])

tasks = [
    Task(id="research", description="Research AI trends", agent="researcher"),
    Task(id="draft", description="Write a report draft", agent="writer", depends_on=["research"]),
    Task(id="translate", description="Translate to Japanese", agent="translator", depends_on=["draft"]),
    Task(id="publish", description="Format as Markdown", agent="publisher", depends_on=["draft"]),
]

oma = OpenMultiAgent()
results = oma.run_tasks(tasks, team=team)
translate and publish both depend only on draft, so they execute in parallel once draft completes.
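The cycle check and the ready() rule can be illustrated without the library. The sketch below is not aegis-gov's implementation: SketchTask, has_cycle, and ready are hypothetical names, the cycle check uses Kahn's algorithm as one reasonable choice, and it assumes every depends_on entry names a task that exists.

```python
from dataclasses import dataclass, field


@dataclass
class SketchTask:
    """Hypothetical stand-in for a task: just an id and its dependencies."""
    id: str
    depends_on: list[str] = field(default_factory=list)


def has_cycle(tasks: list[SketchTask]) -> bool:
    """Kahn's algorithm: if not every task can be peeled off, there is a cycle."""
    indegree = {t.id: len(t.depends_on) for t in tasks}
    dependents: dict[str, list[str]] = {t.id: [] for t in tasks}
    for t in tasks:
        for dep in t.depends_on:  # assumes dep is a known task id
            dependents[dep].append(t.id)
    frontier = [tid for tid, n in indegree.items() if n == 0]
    visited = 0
    while frontier:
        current = frontier.pop()
        visited += 1
        for child in dependents[current]:
            indegree[child] -= 1
            if indegree[child] == 0:
                frontier.append(child)
    return visited != len(tasks)


def ready(tasks: list[SketchTask], done: set[str], started: set[str]) -> list[str]:
    """Tasks not yet started whose dependencies are all done."""
    return [
        t.id for t in tasks
        if t.id not in started and all(d in done for d in t.depends_on)
    ]


pipeline = [
    SketchTask("research"),
    SketchTask("draft", ["research"]),
    SketchTask("translate", ["draft"]),
    SketchTask("publish", ["draft"]),
]
first_wave = ready(pipeline, done=set(), started=set())
last_wave = ready(pipeline, done={"research", "draft"}, started={"research", "draft"})
looped = [SketchTask("a", ["b"]), SketchTask("b", ["a"])]
```

Here last_wave contains both translate and publish, which is the point at which a scheduler can hand the two tasks to separate workers.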
When stop_on_failure=False (the default), only tasks that directly or transitively depend on a failed task are skipped. Independent branches continue:
from aegis_gov import TaskQueue, Task

q = TaskQueue([
    Task(id="fetch", description="Fetch data"),
    Task(id="process", description="Process data", depends_on=["fetch"]),
    Task(id="report", description="Write report", depends_on=["process"]),
], stop_on_failure=False)

q.complete("fetch", success=False)
print(q.skipped_tasks())  # ["process", "report"]
print(q.summary())        # {"failed": 1, "skipped": 2}
Setting stop_on_failure=True halts the entire queue on the first failure.
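The skip rule is a reachability question: which tasks can be reached from the failed one by following dependency edges in reverse? A rough standalone sketch, assuming a plain dict of task id to dependency list (transitive_dependents and the extra "audit" task are hypothetical, and this is not how aegis-gov necessarily does it):

```python
from collections import deque


def transitive_dependents(depends_on: dict[str, list[str]], failed: str) -> list[str]:
    """Return every task that directly or transitively depends on `failed`."""
    # Invert the edges: task id -> tasks that list it as a dependency.
    dependents: dict[str, list[str]] = {task: [] for task in depends_on}
    for task, deps in depends_on.items():
        for dep in deps:
            dependents[dep].append(task)

    skipped: list[str] = []
    seen = {failed}
    queue = deque([failed])
    while queue:  # breadth-first walk over the inverted edges
        current = queue.popleft()
        for child in dependents[current]:
            if child not in seen:
                seen.add(child)
                skipped.append(child)
                queue.append(child)
    return skipped


graph = {
    "fetch": [],
    "process": ["fetch"],
    "report": ["process"],
    "audit": [],  # hypothetical independent branch, added for illustration
}
skipped = transitive_dependents(graph, "fetch")
```

The independent "audit" task never appears in the skipped list, which is the behaviour the stop_on_failure=False mode describes.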
AgentPool wraps a threading.Semaphore to bound how many agents run simultaneously, and counts consecutive failures so it can open a circuit breaker once a limit is reached:
from aegis_gov import AgentPool, OpenMultiAgent

pool = AgentPool(
    max_concurrent=4,
    consecutive_failure_limit=5,
    recovery_timeout_s=30.0,
)
oma = OpenMultiAgent(pool=pool)
print(oma.get_status())
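For readers who have not used a semaphore as a concurrency cap, the idea looks roughly like this. The sketch uses only the standard library; fake_agent_call, the stats dict, and the limit of 2 are illustrative assumptions, not part of aegis-gov.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

MAX_CONCURRENT = 2  # illustrative limit
slots = threading.Semaphore(MAX_CONCURRENT)
lock = threading.Lock()
stats = {"running": 0, "peak": 0}


def fake_agent_call(n: int) -> int:
    """Stand-in for an LLM call; sleeps briefly instead of hitting a provider."""
    with slots:  # blocks while MAX_CONCURRENT calls are already in flight
        with lock:
            stats["running"] += 1
            stats["peak"] = max(stats["peak"], stats["running"])
        time.sleep(0.01)
        with lock:
            stats["running"] -= 1
    return n * 2


# The executor has more workers than slots, so the semaphore is the real limit.
with ThreadPoolExecutor(max_workers=8) as executor:
    results = list(executor.map(fake_agent_call, range(6)))
```

Even with eight worker threads available, stats["peak"] never exceeds MAX_CONCURRENT, because each call must hold a semaphore slot for its whole duration.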
The state machine has three states: CLOSED (normal), OPEN (rejecting new work, raising CircuitOpenError), and HALF_OPEN (sending one probe call after recovery_timeout_s elapses). A successful probe returns the circuit to CLOSED; another failure reopens it.
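The three-state machine described above can be sketched in a few dozen lines. This is a simplified, single-threaded illustration under stated assumptions: SketchBreaker and CircuitOpen are hypothetical local names (not the library's classes), and the clock is injectable so the example can skip the 30-second wait.

```python
import time


class CircuitOpen(Exception):
    """Local stand-in raised when the breaker rejects a call."""


class SketchBreaker:
    """Minimal circuit breaker; not thread-safe, for illustration only."""

    def __init__(self, failure_limit: int, recovery_timeout_s: float, clock=time.monotonic):
        self.failure_limit = failure_limit
        self.recovery_timeout_s = recovery_timeout_s
        self.clock = clock
        self.failures = 0
        self.state = "CLOSED"
        self.opened_at = 0.0

    def call(self, fn):
        if self.state == "OPEN":
            if self.clock() - self.opened_at < self.recovery_timeout_s:
                raise CircuitOpen("circuit is open")
            self.state = "HALF_OPEN"  # timeout elapsed: let one probe through
        try:
            result = fn()
        except Exception:
            self.failures += 1
            # A failed probe, or too many consecutive failures, opens the circuit.
            if self.state == "HALF_OPEN" or self.failures >= self.failure_limit:
                self.state = "OPEN"
                self.opened_at = self.clock()
            raise
        self.failures = 0
        self.state = "CLOSED"
        return result


def failing():
    raise RuntimeError("provider returned 503")


now = [0.0]  # fake clock so the example does not need to sleep
breaker = SketchBreaker(failure_limit=2, recovery_timeout_s=30.0, clock=lambda: now[0])
trace = []
for _ in range(2):
    try:
        breaker.call(failing)
    except RuntimeError:
        pass
trace.append(breaker.state)
try:
    breaker.call(lambda: "ok")
except CircuitOpen:
    trace.append("rejected")
now[0] = 31.0  # pretend the recovery timeout has passed
trace.append(breaker.call(lambda: "ok"))
trace.append(breaker.state)
```

The trace records the breaker opening after two failures, rejecting a call while open, then closing again once a probe succeeds after the timeout.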
Agents can be given callable tools:
from aegis_gov import ToolRegistry

registry = ToolRegistry()
registry.define_tool(
    name="search_web",
    description="Search the web for recent information",
    fn=my_search_fn,  # your own callable; it is not provided by aegis-gov
    # JSON Schema describing the arguments the tool accepts
    schema={"type": "object", "properties": {"query": {"type": "string"}}, "required": ["query"]},
)
The built-in tools are thin wrappers. They are not hardened for production use — treat them as stubs you replace with your own implementations.
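If you do replace the stubs, the shape of a registry is simple enough to reason about directly. The following is a standalone sketch, not aegis-gov's ToolRegistry: SketchRegistry, its call() method, and fake_search are hypothetical, and the only validation shown is a check of the schema's "required" keys.

```python
from typing import Any, Callable


class SketchRegistry:
    """Hypothetical minimal registry mapping tool names to callables."""

    def __init__(self) -> None:
        self._tools: dict[str, dict[str, Any]] = {}

    def define_tool(self, name: str, description: str,
                    fn: Callable[..., Any], schema: dict[str, Any]) -> None:
        self._tools[name] = {"description": description, "fn": fn, "schema": schema}

    def call(self, name: str, arguments: dict[str, Any]) -> Any:
        tool = self._tools[name]
        # Reject calls that omit any key listed under "required" in the schema.
        missing = [k for k in tool["schema"].get("required", []) if k not in arguments]
        if missing:
            raise ValueError(f"missing required arguments: {missing}")
        return tool["fn"](**arguments)


def fake_search(query: str) -> str:
    """Placeholder search function; a real one would call a search API."""
    return f"results for {query!r}"


registry = SketchRegistry()
registry.define_tool(
    name="search_web",
    description="Search the web for recent information",
    fn=fake_search,
    schema={"type": "object", "properties": {"query": {"type": "string"}}, "required": ["query"]},
)
answer = registry.call("search_web", {"query": "agents"})

try:
    registry.call("search_web", {})
    rejected = False
except ValueError:
    rejected = True
```

Validating arguments before invoking the function keeps malformed model output from reaching your own code, which matters more once the tool does something with side effects.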
Being honest about scope matters:
- Concurrency is thread-based, built on threading.Semaphore and ThreadPoolExecutor. If you need asyncio-native agents, this is not the right library yet.
- Adapters expose stream(), but run_agent() and run_tasks() collect the full response before returning.
- Built-in tools such as shell, http_get, etc. are thin wrappers without sandboxing, rate limiting, or error enrichment.

Areas I plan to work on next, in rough priority order:
- Async support (asyncio.Semaphore, async def generate())
- Streaming through run_agent()
Contributions and issue reports are welcome. The test suite uses pytest; see pyproject.toml for the dev extras.
pip install "aegis-gov[all]"