# I rebuilt Zo Computer's seven subsystems in 800 lines of Python — here's the architecture, the tradeoffs, and what I cut

> Source: <https://dev.to/aman_sachan_126d19c4a2773/i-rebuilt-zo-computers-seven-subsystems-in-800-lines-of-python-heres-the-architecture-the-2757>
> Published: 2026-06-13 01:48:38+00:00

I've been using [Zo Computer](https://zo.computer) as my primary AI workspace for a few months. The piece I kept coming back to wasn't the model — it was the *substrate*: the agent manager that spawns parallel sessions, the skills registry that auto-loads `SKILL.md`

files, the memory engine that compresses old context, the rrule-based scheduler, the compute pool that turns idle machines into workers, the BYOK client that swaps between Groq/OpenAI/Anthropic, and the headless browser that actually clicks things.

So I asked the obvious question: how much of that is *concept* and how much is platform glue? Could a single Python package on a laptop give a developer 80% of the same shape?

[ZoClone](https://github.com/AmSach/ZoClone) is my answer. Seven files in `src/`

, ~800 lines of dependency-light Python, and every subsystem above is wired up. No daemon, no Docker, no Postgres — just `~/.zoclone/*.db`

and a `ThreadPoolExecutor`

.

Here's the architecture, what I learned about which parts are easy to clone and which ones are doing real work, and the shortcuts I had to take to fit the whole thing in a single repo.

```
ZoClone/
├── src/
│   ├── zo.py              # top-level orchestrator + ask() loop
│   ├── agent_manager.py   # parallel async agents via Zo /zo/ask
│   ├── skills.py          # SKILL.md auto-loader + handler dispatch
│   ├── memory.py          # TF-IDF fallback embeddings + context recall
│   ├── automation.py      # rrule scheduler with minute/hour/day cadences
│   ├── compute_pool.py    # node registry + priority FIFO dispatch
│   ├── browser.py         # Playwright headless + navigate/screenshot/eval
│   ├── byok.py            # key vault for Groq/OpenAI/Anthropic/Ollama
│   ├── zo_client.py       # OpenAI-compatible chat() abstraction
│   └── services.py        # process supervisor (start/stop/logs)
```

Total LoC: **775**. No `__init__.py`

magic, no metaclass tricks, no plugin discovery beyond a directory scan. The constraint forced every interface to be a plain function or a class with three methods.

`zo.py`

Everything threads through a single `ZoClone`

class that owns the DB connection, a thread pool, and a `AIClient`

that's lazily constructed on first call to `ask()`

.

``` python
class ZoClone:
    def __init__(self):
        self.db = init_db()
        self.executor = ThreadPoolExecutor(max_workers=10)
        self.ai_client = None
        self.pool = pool        # module-level singleton
        self.hosting = hosting  # module-level singleton
        self.memory = memory
        self.scheduler = scheduler

    def ask(self, conv_id: str, message: str, provider: str = "groq",
            model: str = "", tools: list[dict] = None) -> dict:
        if not self.ai_client:
            key = get_key(provider)
            m = model or PROVIDERS[provider]["models"][0]
            self.ai_client = AIClient(provider, m, key)

        messages = self.memory.get_context(conv_id)
        messages.append({"role": "user", "content": message})
        system = f"You are Sentience, an advanced AI running locally. Workspace: {os.getcwd()}."

        resp = self.ai_client.chat(
            [{"role": "system", "content": system}] + messages[-20:],
            tools or [],
        )
        # ... persist + return
```

The trick is `AIClient`

— it's the *only* piece that has to be OpenAI-compatible, because every modern provider (Groq, Together, OpenRouter, Ollama, LM Studio) has converged on the chat completions schema. Anthropic needed a tiny shim, but Groq works out of the box.

`SKILL.md`

This is the part I'm proudest of. The directory scan is six lines:

``` python
def load_all_skills():
    global SKILLS
    SKILLS = {}
    if not SKILL_DIR.exists():
        return
    for item in SKILL_DIR.iterdir():
        if item.is_dir() and (item / "SKILL.md").exists():
            skill = load_skill(item.name, item / "SKILL.md")
            if skill:
                SKILLS[skill.name] = skill
```

The interesting bit is the SKILL.md parser. It accepts the same frontmatter shape as the Agent Skills spec — `name`

, `description`

, `triggers`

(comma-separated) — and looks for `scripts/<name>.py`

to find a `run()`

or `execute()`

callable. That's the entire plugin API. There's no registration, no decorator, no manifest; drop a folder in `skills/`

and the next `import`

picks it up.

The price: there's no versioning, no dependency declaration, no per-skill sandbox. If you want a skill to be hermetic, you have to do that yourself. For a single-user laptop, that's fine. For a multi-tenant platform, it's not.

`aiohttp`

over `/zo/ask`

I cheated here, and I'm fine with it. The original "spawn a parallel agent" primitive is *itself* a remote call to a model, and Zo's `/zo/ask`

endpoint is open to anyone with a token. So:

``` python
async def spawn(self, agent_id: str, prompt: str, callback=None):
    async with aiohttp.ClientSession() as session:
        async with session.post(
            "https://api.zo.computer/zo/ask",
            headers={"authorization": self.api_token, "content-type": "application/json"},
            json={"input": prompt, "model_name": "vercel:minimax/minimax-m2.7"},
        ) as resp:
            return {"agent_id": agent_id, "output": (await resp.json())["output"]}

async def spawn_all(self, agents: list) -> list:
    return await asyncio.gather(*[self.spawn(a["id"], a["prompt"]) for a in agents])
```

`spawn_all`

fires N concurrent requests, asyncio.gather waits for the slowest, and you get a list of outputs back. A `ThreadPoolExecutor(max_workers=10)`

is the sync equivalent for callers that don't want to be async. In practice the bottleneck is the model, not the network — 10 parallel calls saturate the rate limiter long before they saturate `asyncio`

.

I'll be honest: this is the weakest subsystem. `embed_tfidf`

hashes tokens into a 512-dim vector, `cosine`

does the math, and `recall()`

returns the top-k nodes whose embedding has the highest similarity. It works for short prompts and small corpora, but it is *not* semantic — `database`

and `sql`

don't cluster the way they would with a real embedding model.

The reason I shipped it anyway: a real embedding model (sentence-transformers, or a remote call) is one swap away, and the *interface* — `memorize(content, meta) -> nid`

, `recall(query, top_k) -> [{id, content, meta}]`

— doesn't change. When I get around to plugging in `nomic-embed-text`

via Ollama, nothing in `zo.py`

needs to move. The trick was defining the right shape first and being honest about which fields the placeholder is faking.

The rrule spec is a 50-page document. I needed three frequencies and a count. So:

``` php
def parse_rrule(rrule: str) -> dict:
    result = {"interval": 86400, "count": 0}  # default daily
    if "FREQ=DAILY" in rrule: result["interval"] = 86400
    elif "FREQ=HOURLY" in rrule: result["interval"] = 3600
    elif "FREQ=MINUTELY" in rrule: result["interval"] = 60
    if "COUNT=" in rrule:
        m = re.search(r"COUNT=(\d+)", rrule)
        if m: result["count"] = int(m.group(1))
    return result
```

A daemon thread wakes once a minute, asks SQLite for `WHERE enabled=1 AND next_run <= now`

, fires each one's `handler`

, and bumps `next_run`

by the interval. That's the entire automation system. It's missing timezones, exceptions, and DST handling, but for "run this every hour" it is correct and reliable.

`ComputePool`

keeps `self.jobs`

and `self.nodes`

as in-memory dicts protected by a `threading.Lock`

. Heartbeats update `last_heartbeat`

; dispatch sorts pending jobs by `-priority`

and assigns the top one to the next polling node. No leader election, no Raft, no gossip protocol.

``` php
def assign_job(self, node_id: str) -> dict | None:
    with self.lock:
        pending = [j for j in self.jobs.values() if j["status"] == "pending"]
        if not pending: return None
        pending.sort(key=lambda x: -x["priority"])
        job = pending[0]
        job["status"] = "assigned"
        job["assigned_node"] = node_id
        if node_id in self.nodes:
            self.nodes[node_id]["status"] = "busy"
        return job
```

This is a real footgun: in-process state means a process restart loses every pending job. For a *real* grid you'd want this in Postgres with row-level locks. But for "let me run a job on my second laptop", `pip install`

is the whole onboarding.

Three things are *not* in the package and probably never will be:

`zo`

and call `zo.ask(...)`

from a Flask route, a Tk window, a Discord bot, a cron job.`whoami()`

returns the local username. If you want a team plan, fork the repo.`nomic-embed-text`

(private, free, runs on the same box) and the interface stays the same.

```
git clone https://github.com/AmSach/ZoClone
cd ZoClone && pip install aiohttp playwright
python -m playwright install chromium
python -c "from src.zo import zo; print(zo.ask('test-conv', 'hi'))"
```

If you want a skill added, drop a folder in `skills/`

with a `SKILL.md`

+ `scripts/foo.py`

and open a PR. I merge in 24 hours. If you find a real bug in one of the seven subsystems, open an issue with a minimal repro — there are only 775 lines to search.

*Seven files, one Python process, no cloud dependency. The shape matters more than the scale.*
