I've been using Zo Computer as my primary AI workspace for a few months. The piece I kept coming back to wasn't the model but the substrate: the agent manager that spawns parallel sessions, the skills registry that auto-loads SKILL.md files, the memory engine that compresses old context, the rrule-based scheduler, the compute pool that turns idle machines into workers, the BYOK client that swaps between Groq/OpenAI/Anthropic, and the headless browser that actually clicks things.
So I asked the obvious question: how much of that is concept and how much is platform glue? Could a single Python package on a laptop give a developer 80% of the same shape?
ZoClone is my answer: seven subsystems in ten files under src/, ~800 lines of dependency-light Python, and every subsystem above is wired up. No daemon, no Docker, no Postgres; just ~/.zoclone/*.db and a ThreadPoolExecutor.
Here's the architecture, what I learned about which parts are easy to clone and which ones are doing real work, and the shortcuts I had to take to fit the whole thing in a single repo.
ZoClone/
├── src/
│ ├── zo.py # top-level orchestrator + ask() loop
│ ├── agent_manager.py # parallel async agents via Zo /zo/ask
│ ├── skills.py # SKILL.md auto-load + handler dispatch
│ ├── memory.py # TF-IDF fallback embeddings + context recall
│ ├── automation.py # rrule scheduler with minute/hour/day cadences
│ ├── compute_pool.py # node registry + priority FIFO dispatch
│ ├── browser.py # Playwright headless + navigate/screenshot/eval
│ ├── byok.py # key vault for Groq/OpenAI/Anthropic/Ollama
│ ├── zo_client.py # OpenAI-compatible chat() abstraction
│ └── services.py # process supervisor (start/stop/logs)
Total LoC: 775. No __init__.py magic, no metaclass tricks, no plugin discovery beyond a directory scan. The constraint forced every interface to be a plain function or a class with three methods.
The orchestrator: zo.py
Everything threads through a single ZoClone class that owns the DB connection, a thread pool, and an AIClient that's lazily constructed on the first call to ask().
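import os
from concurrent.futures import ThreadPoolExecutor

# init_db, get_key, PROVIDERS, AIClient and the pool/hosting/memory/scheduler
# singletons are imported from the sibling modules in src/.

class ZoClone:
    def __init__(self):
        self.db = init_db()
        self.executor = ThreadPoolExecutor(max_workers=10)
        self.ai_client = None        # built lazily on the first ask()
        self.pool = pool             # module-level singleton
        self.hosting = hosting       # module-level singleton
        self.memory = memory
        self.scheduler = scheduler

    def ask(self, conv_id: str, message: str, provider: str = "groq",
            model: str = "", tools: list[dict] | None = None) -> dict:
        if not self.ai_client:
            # Cached after the first call: later calls reuse this provider/model.
            key = get_key(provider)
            m = model or PROVIDERS[provider]["models"][0]  # provider's first model by default
            self.ai_client = AIClient(provider, m, key)
        messages = self.memory.get_context(conv_id)
        messages.append({"role": "user", "content": message})
        system = f"You are Sentience, an advanced AI running locally. Workspace: {os.getcwd()}."
        resp = self.ai_client.chat(
            [{"role": "system", "content": system}] + messages[-20:],  # system prompt + last 20 messages
            tools or [],
        )
        # ... (the rest of ask(), which handles resp, is not shown)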
The trick is AIClient: it's the only piece that has to be OpenAI-compatible, because most providers (Groq, Together, OpenRouter, Ollama, LM Studio) have converged on the chat completions schema. Anthropic needed a tiny shim; Groq works out of the box.
Skills: SKILL.md auto-loading
This is the part I'm proudest of. The directory scan is a single short function:
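# SKILL_DIR (a pathlib.Path pointing at skills/), SKILLS and load_skill()
# are defined elsewhere in skills.py.
def load_all_skills():
    global SKILLS
    SKILLS = {}  # rebuild the registry from scratch
    if not SKILL_DIR.exists():
        return
    for item in SKILL_DIR.iterdir():
        # a skill is any subfolder that contains a SKILL.md
        if item.is_dir() and (item / "SKILL.md").exists():
            skill = load_skill(item.name, item / "SKILL.md")
            if skill:  # load_skill() may return None; skip those
                SKILLS[skill.name] = skill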
The interesting bit is the SKILL.md parser. It reads the frontmatter fields from the Agent Skills spec (name, description) plus a comma-separated triggers field, and looks for scripts/<name>.py to find a run() or execute() callable. That's the entire plugin API. There's no registration, no decorator, no manifest: drop a folder in skills/ and the next import picks it up.
The price: there's no versioning, no dependency declaration, no per-skill sandbox. If you want a skill to be hermetic, you have to do that yourself. For a single-user laptop, that's fine. For a multi-tenant platform, it's not.
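For a feel of how little that parser needs, here is a sketch of a load_skill() along those lines. It is an illustration, not the repo's skills.py: it assumes flat key: value frontmatter between --- fences (no YAML library), a script named after the skill's folder, and a Skill dataclass I made up. importlib.util loads the script straight from its path, so the skills folder never has to be on sys.path.

```python
import importlib.util
from dataclasses import dataclass, field
from pathlib import Path
from typing import Callable, Optional


@dataclass
class Skill:  # hypothetical container, for illustration only
    name: str
    description: str = ""
    triggers: list[str] = field(default_factory=list)
    handler: Optional[Callable] = None


def parse_frontmatter(text: str) -> dict[str, str]:
    """Parse flat `key: value` lines between a leading pair of '---' fences."""
    lines = text.splitlines()
    if not lines or lines[0].strip() != "---":
        return {}
    meta: dict[str, str] = {}
    for line in lines[1:]:
        if line.strip() == "---":
            break
        key, sep, value = line.partition(":")
        if sep:
            meta[key.strip()] = value.strip()
    return meta


def load_skill(folder_name: str, skill_md: Path) -> Optional[Skill]:
    meta = parse_frontmatter(skill_md.read_text(encoding="utf-8"))
    if not meta:
        return None  # no frontmatter: not a skill
    skill = Skill(
        name=meta.get("name", folder_name),
        description=meta.get("description", ""),
        triggers=[t.strip() for t in meta.get("triggers", "").split(",") if t.strip()],
    )
    script = skill_md.parent / "scripts" / f"{folder_name}.py"
    if script.exists():
        spec = importlib.util.spec_from_file_location(f"skill_{folder_name}", script)
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)  # runs the script's top-level code
        # prefer run(), fall back to execute(); either may be missing
        skill.handler = getattr(module, "run", None) or getattr(module, "execute", None)
    return skill
```

Note that exec_module runs arbitrary code from the skill folder, which is exactly the missing sandbox described above.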
Parallel agents: aiohttp over /zo/ask
I cheated here, and I'm fine with it. The original "spawn a parallel agent" primitive is itself a remote call to a model, and Zo's /zo/ask endpoint is open to anyone with a token. So:
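import asyncio

import aiohttp

# Methods of the agent manager in agent_manager.py (excerpt).
async def spawn(self, agent_id: str, prompt: str, callback=None):
    async with aiohttp.ClientSession() as session:
        async with session.post(
            "https://api.zo.computer/zo/ask",
            headers={"authorization": self.api_token, "content-type": "application/json"},
            json={"input": prompt, "model_name": "vercel:minimax/minimax-m2.7"},
        ) as resp:
            resp.raise_for_status()  # surface HTTP errors instead of a KeyError on "output"
            result = {"agent_id": agent_id, "output": (await resp.json())["output"]}
    if callback:
        callback(result)  # optional per-agent hook
    return result

async def spawn_all(self, agents: list) -> list:
    # gather() runs the requests concurrently and returns results in input order
    return await asyncio.gather(*[self.spawn(a["id"], a["prompt"]) for a in agents])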
spawn_all fires N concurrent requests, asyncio.gather waits for the slowest, and you get the outputs back as a list in the same order as the input. A ThreadPoolExecutor(max_workers=10) is the sync equivalent for callers that don't want to be async. In practice the bottleneck is the model, not the network: 10 parallel calls hit the provider's rate limit long before they strain asyncio.
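The thread-pool counterpart might look like this. It's a sketch: spawn_all_sync and call_model are hypothetical names, with call_model standing in for any blocking function that takes a prompt and returns text (for example, a synchronous HTTP call). Executor.map yields results in input order, so the output lines up with asyncio.gather's.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable


def spawn_all_sync(agents: list[dict], call_model: Callable[[str], str],
                   max_workers: int = 10) -> list[dict]:
    """Run one blocking model call per agent in parallel; results keep input order."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # map() submits every prompt up front and yields results in submission order
        outputs = pool.map(call_model, [a["prompt"] for a in agents])
        return [{"agent_id": a["id"], "output": out} for a, out in zip(agents, outputs)]
```

max_workers doubles as a crude concurrency cap, which is handy when the provider's rate limit is the real constraint.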
I'll be honest: the memory engine is the weakest subsystem. embed_tfidf hashes tokens into a 512-dim vector, cosine does the math, and recall() returns the top-k nodes whose embeddings are most similar to the query. It works for short prompts and small corpora, but it is not semantic: database and sql share no tokens, so (barring hash collisions) their vectors are orthogonal, and they don't cluster the way they would with a real embedding model.
The reason I shipped it anyway: a real embedding model (sentence-transformers, or a remote call) is one swap away, and the interface (memorize(content, meta) -> nid and recall(query, top_k) -> [{id, content, meta}]) doesn't change. When I get around to plugging in nomic-embed-text via Ollama, nothing in zo.py needs to move. The trick was defining the right shape first and being honest about which fields the placeholder is faking.
The scheduler takes the same shortcut. The RRULE grammar in RFC 5545 covers far more than I needed: three frequencies and a count. So:
import re

def parse_rrule(rrule: str) -> dict:
    result = {"interval": 86400, "count": 0}  # default: daily; count 0 = no COUNT given
    if "FREQ=DAILY" in rrule:
        result["interval"] = 86400
    elif "FREQ=HOURLY" in rrule:
        result["interval"] = 3600
    elif "FREQ=MINUTELY" in rrule:
        result["interval"] = 60
    # INTERVAL=, BYDAY=, etc. are ignored
    m = re.search(r"COUNT=(\d+)", rrule)
    if m:
        result["count"] = int(m.group(1))
    return result
A daemon thread wakes once a minute, asks SQLite for rows WHERE enabled=1 AND next_run <= now, fires each due job's handler, and bumps next_run by the interval. That's the entire automation system. It's missing timezones, exception dates (EXDATE), and DST handling, and because it polls once a minute a job can fire up to a minute late, but for "run this every hour" it is correct and reliable.
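ComputePool keeps self.jobs and self.nodes as in-memory dicts protected by a threading.Lock. Heartbeats update last_heartbeat; dispatch sorts pending jobs by -priority and assigns the top one to whichever node polls next. Because Python's sort is stable, jobs with equal priority keep their insertion order, which is what makes the dispatch FIFO within a priority level. No leader election, no Raft, no gossip protocol.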
def assign_job(self, node_id: str) -> dict | None:
    with self.lock:
        pending = [j for j in self.jobs.values() if j["status"] == "pending"]
        if not pending:
            return None
        # stable sort: equal-priority jobs keep their insertion order (FIFO)
        pending.sort(key=lambda x: -x["priority"])
        job = pending[0]
        job["status"] = "assigned"
        job["assigned_node"] = node_id
        if node_id in self.nodes:
            self.nodes[node_id]["status"] = "busy"
        return job
This is a real footgun: in-process state means a process restart loses every pending job. For a real grid you'd want this in Postgres with row-level locks. But for "let me run a job on my second laptop", pip install is the whole onboarding.
Three things are not in the package and probably never will be:
1. A UI. Import zo and call zo.ask(...) from a Flask route, a Tk window, a Discord bot, or a cron job.
2. Multi-user accounts. whoami() returns the local username. If you want a team plan, fork the repo.
3. A hosted embedding service. Run nomic-embed-text locally (private, free, runs on the same box) and the interface stays the same.
git clone https://github.com/AmSach/ZoClone
cd ZoClone && pip install aiohttp playwright
python -m playwright install chromium
python -c "from src.zo import zo; print(zo.ask('test-conv', 'hi'))"
If you want a skill added, drop a folder into skills/ with a SKILL.md and a scripts/foo.py, then open a PR. I merge within 24 hours. If you find a real bug in one of the seven subsystems, open an issue with a minimal repro: there are only 775 lines to search.
Seven subsystems, one Python process, no infrastructure to run beyond the model API you point it at. The shape matters more than the scale.