# aegis-gov: a small Python library for multi-agent task graphs and circuit breakers

> Source: <https://dev.to/th19930828/aegis-gov-a-small-python-library-for-multi-agent-task-graphs-and-circuit-breakers-45h2>
> Published: 2026-06-17 13:09:52+00:00

Multi-agent LLM systems have a coordination problem that most tutorials skip past. You can string together a few `asyncio.gather`

calls or a list of prompts, but once you need three or four agents to hand work to each other in a defined order — and you need the whole thing to degrade gracefully when one call fails — the scaffolding grows quickly and gets tangled with provider-specific SDK code.

I wrote `aegis-gov`

to separate that coordination scaffolding from the business logic. It is a small Python library (one hard dependency: `requests`

) that provides:

This article walks through each of those pieces, shows you the actual code, and is honest about what is not there yet.

Suppose you have four agents: a researcher, a writer, a translator, and a publisher. The writer depends on the researcher. The translator and publisher both depend on the writer but can run in parallel. The publisher should not run at all if the writer failed.

Without a scheduler, you write this by hand every time. The failure-cascade logic especially tends to become a set of nested conditionals that grows with each new dependency edge. And if your LLM provider returns a string of 429s or 5xx errors, there is nothing to stop the loop from hammering the endpoint until you kill the process.

`aegis-gov`

addresses both problems with two focused classes: `TaskQueue`

and `AgentPool`

.

```
pip install aegis-gov                      # requests only
pip install "aegis-gov[anthropic]"         # + Anthropic SDK
pip install "aegis-gov[openai]"            # + OpenAI SDK
pip install "aegis-gov[all]"               # both LLM SDKs
```

Python 3.10+ is required.

The `LLMAdapter`

protocol has two methods: `generate()`

returns a string, `stream()`

yields string chunks. All three concrete adapters satisfy this protocol:

``` python
from aegis_gov import AnthropicAdapter, OpenAIAdapter, OllamaAdapter

# Anthropic
adapter = AnthropicAdapter(model="claude-sonnet-4-6")

# OpenAI or any compatible endpoint (LM Studio, vLLM, Azure, etc.)
adapter = OpenAIAdapter(model="gpt-4o-mini", base_url="http://localhost:1234/v1")

# Ollama — no extra package needed, communicates over HTTP
adapter = OllamaAdapter(model="qwen2.5:14b")
```

The adapter is a field on `AgentConfig`

, so switching providers for a single agent is a one-line change. The rest of your orchestration code does not need to know which provider is in use.

``` python
from aegis_gov import OpenMultiAgent, AgentConfig, AnthropicAdapter

oma = OpenMultiAgent()
result = oma.run_agent(
    AgentConfig(
        name="analyst",
        system_prompt="You are a concise market analyst.",
        adapter=AnthropicAdapter(model="claude-haiku-4-5-20251001"),
    ),
    task="Top 3 open-source multi-agent frameworks in 2026?",
)
print(result)
```

`TaskQueue`

takes a list of `Task`

objects, validates the dependency graph for cycles at construction time (raises `CyclicDependencyError`

if one is found), and exposes a `ready()`

method that returns the tasks whose dependencies are all in `done`

state.

``` python
from aegis_gov import OpenMultiAgent, Task
from aegis_gov import AgentConfig, AnthropicAdapter

team = OpenMultiAgent.create_team("pipeline", [
    AgentConfig(name="researcher",  system_prompt="Research topics thoroughly."),
    AgentConfig(name="writer",      system_prompt="Write clear reports."),
    AgentConfig(name="translator",  system_prompt="Translate to Japanese."),
    AgentConfig(name="publisher",   system_prompt="Format output as Markdown."),
])

tasks = [
    Task(id="research",   description="Research AI trends",        agent="researcher"),
    Task(id="draft",      description="Write a report draft",      agent="writer",     depends_on=["research"]),
    Task(id="translate",  description="Translate to Japanese",     agent="translator", depends_on=["draft"]),
    Task(id="publish",    description="Format as Markdown",        agent="publisher",  depends_on=["draft"]),
]

oma = OpenMultiAgent()
results = oma.run_tasks(tasks, team=team)
```

`translate`

and `publish`

both depend only on `draft`

, so they execute in parallel once `draft`

completes.

When `stop_on_failure=False`

(the default), only tasks that directly or transitively depend on a failed task are skipped. Independent branches continue:

``` python
from aegis_gov import TaskQueue, Task

q = TaskQueue([
    Task(id="fetch",   description="Fetch data"),
    Task(id="process", description="Process data",  depends_on=["fetch"]),
    Task(id="report",  description="Write report",  depends_on=["process"]),
], stop_on_failure=False)

q.complete("fetch", success=False)
print(q.skipped_tasks())  # ["process", "report"]
print(q.summary())        # {"failed": 1, "skipped": 2}
```

Setting `stop_on_failure=True`

halts the entire queue on the first failure.

`AgentPool`

wraps a `threading.Semaphore`

to bound how many agents run simultaneously, and tracks consecutive failures to open the circuit:

``` python
from aegis_gov import AgentPool, OpenMultiAgent

pool = AgentPool(
    max_concurrent=4,
    consecutive_failure_limit=5,
    recovery_timeout_s=30.0,
)
oma = OpenMultiAgent(pool=pool)
print(oma.get_status())
# {"pool_state": "closed", "pool_consecutive_failures": 0, ...}
```

The state machine has three states: `CLOSED`

(normal), `OPEN`

(rejecting new work, raising `CircuitOpenError`

), and `HALF_OPEN`

(sending one probe call after `recovery_timeout_s`

elapses). A successful probe returns the circuit to `CLOSED`

; another failure reopens it.

Agents can be given callable tools:

``` python
from aegis_gov import ToolRegistry

registry = ToolRegistry()
# five built-ins are registered automatically:
# file_read, http_get, shell, memory_store, memory_retrieve

registry.define_tool(
    name="search_web",
    description="Search the web for recent information",
    fn=my_search_fn,
    schema={"type": "object", "properties": {"query": {"type": "string"}}, "required": ["query"]},
)
```

The built-in tools are thin wrappers. They are not hardened for production use — treat them as stubs you replace with your own implementations.

Being honest about scope matters:

`threading.Semaphore`

and `ThreadPoolExecutor`

. If you need `asyncio`

-native agents, this is not the right library yet.`stream()`

, but `run_agent()`

and `run_tasks()`

collect the full response before returning.`shell`

, `http_get`

, etc. are thin wrappers without sandboxing, rate limiting, or error enrichment.Areas I plan to work on next, in rough priority order:

`asyncio.Semaphore`

, `async def generate()`

)`run_agent()`

Contributions and issue reports are welcome. The test suite uses pytest; see `pyproject.toml`

for the dev extras.

`pip install "aegis-gov[all]"`
