# LangGraph vs Locus StateGraph — same workflow, same model (gpt-4o-mini), side by side. LangGraph→real OpenAI; Locus→OCI GenAI via BOAT-OC1.

> Source: <https://gist.github.com/fede-kamel/cb45aeac259bb995f8be23a5d7ca6965>
> Published: 2026-05-22 00:59:29+00:00

| #!/usr/bin/env python3 | |
| """LangGraph vs Locus StateGraph — same workflow, same model, side by side. | |
| Workflow (in both frameworks) | |
| ============================= | |
| START → research → write → review →┐ | |
| ▲ │ | |
| └── if confidence < 0.85 ──┘ (max 3 iterations) | |
| │ | |
| END | |
| State (in both) | |
| =============== | |
| topic: str — user input | |
| notes: list[str] — accumulates across research iterations (REDUCER) | |
| draft: str | |
| confidence: float — review node's score | |
| iter: int — loop counter | |
| Model | |
| ===== | |
| Both sides call ``gpt-4o-mini``: | |
| - LangGraph → real OpenAI API (``OPENAI_API_KEY``) | |
| - Locus → OCI GenAI ``openai.gpt-4o-mini`` (BOAT-OC1 session token, | |
| saasobservai compartment, ``us-chicago-1``) | |
| Plus, this script verifies the API equivalence claims from the prose | |
| comparison I wrote — every claim is exercised and any failures are | |
| collected into ISSUES at the end. | |
| Install | |
| ======= | |
| uv pip install "locus-sdk[oci]==0.2.0b19" langgraph langchain-openai | |
| Env | |
| === | |
| OPENAI_API_KEY=sk-proj-… | |
| OCI_PROFILE=BOAT-OC1 | |
| OCI_COMPARTMENT_ID=ocid1.compartment.oc1..… # saasobservai prod | |
| OCI_REGION=us-chicago-1 | |
| """ | |
| from __future__ import annotations | |
| import asyncio | |
| import operator | |
| import os | |
| import time | |
| from typing import Annotated, Any, TypedDict | |
| from pydantic import BaseModel, Field | |
| # ---------- LangGraph ------------------------------------------------------ | |
| from langgraph.graph import StateGraph as LGStateGraph, START as LG_START, END as LG_END | |
| from langgraph.checkpoint.memory import MemorySaver as LGMemorySaver | |
| from langchain_openai import ChatOpenAI | |
| # ---------- Locus ---------------------------------------------------------- | |
| from locus.multiagent import StateGraph as LocusStateGraph | |
| from locus.multiagent.graph import START as LOCUS_START, END as LOCUS_END | |
| from locus.models import get_model as locus_get_model | |
| from locus.core.reducers import Reducer # noqa: F401 — exists check | |
# --- Experiment parameters ---------------------------------------------------
# Question that both graphs research, write up, and score.
TOPIC = "Should a backend team standardising on Python adopt LangGraph in production?"
# The review loop stops once the reviewer's score reaches this value ...
TARGET_CONFIDENCE = 0.85
# ... or once this many research passes have run, whichever comes first.
MAX_ITER = 3

# --- Prompt templates (placeholders are filled with ``str.format``) ----------
# Research step: asks for two fresh findings given the notes gathered so far.
RESEARCH_PROMPT = "\n".join(
    [
        "You are a tight research analyst. Topic: {topic}",
        "Existing notes so far ({n_existing}): {notes_so_far}",
        "Produce TWO new, non-overlapping bullet-point findings (≤ 25 words each). "
        "Numeric or factual claims preferred. Output the two bullets only.",
    ]
)

# Write step: turns the accumulated notes into a short verdict paragraph.
WRITE_PROMPT = "\n".join(
    [
        "You are a technical writer. Topic: {topic}",
        "Research notes:",
        "{notes}",
        "",
        "Write a 3-sentence verdict paragraph. Be concrete and decisive.",
    ]
)

# Review step: asks for a bare numeric score in [0.0, 1.0].
REVIEW_PROMPT = "\n".join(
    [
        "You are a sceptical editor. Topic: {topic}",
        "Draft:",
        "{draft}",
        "",
        "Score the draft from 0.0 to 1.0 on (factual confidence + decisiveness + "
        "coverage). Reply with ONLY the number, e.g. '0.82'. No explanation.",
    ]
)
| # =========================================================================== | |
| # Model adapters — give each framework a sync `complete(prompt) -> str` lambda | |
| # =========================================================================== | |
def _make_openai_complete():
    """Build a synchronous ``complete(prompt) -> str`` callable backed by OpenAI.

    The chat client is created once and captured by the returned closure.
    Generation settings mirror the OCI adapter in this file
    (``temperature=0.2``, ``max_tokens=400``) so both sides of the comparison
    generate under the same limits.

    Returns:
        A function that sends one prompt and returns the stripped reply text.

    Raises:
        KeyError: if ``OPENAI_API_KEY`` is not set in the environment.
    """
    llm = ChatOpenAI(
        model="gpt-4o-mini",
        api_key=os.environ["OPENAI_API_KEY"],
        temperature=0.2,
        # Same output cap as the OCI side; without it only one framework's
        # replies could be truncated, skewing drafts and timings.
        max_tokens=400,
    )

    def complete(prompt: str) -> str:
        """Send ``prompt`` and return the model's reply with outer whitespace removed."""
        return llm.invoke(prompt).content.strip()

    return complete
def _make_oci_complete():
    """Build a synchronous ``complete(prompt) -> str`` callable backed by OCI GenAI.

    The model handle is created once via ``locus_get_model`` and captured by
    the returned closure. The underlying ``model.complete`` is a coroutine, so
    the closure drives it to completion before returning.

    Returns:
        A function that sends one user message and returns the stripped reply
        text (an empty string when the response has no content).

    Raises:
        KeyError: if ``OCI_COMPARTMENT_ID`` is not set in the environment.
    """
    model = locus_get_model(
        "oci:openai.gpt-4o-mini",
        profile=os.environ.get("OCI_PROFILE", "BOAT-OC1"),
        compartment_id=os.environ["OCI_COMPARTMENT_ID"],
        region=os.environ.get("OCI_REGION", "us-chicago-1"),
        max_tokens=400,
        temperature=0.2,
    )
    # locus.ModelProtocol exposes async `complete(messages, tools=None)`.
    from locus.core.messages import Message

    def _run_blocking(coro):
        """Run ``coro`` to completion and return its result from sync code."""
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            # No loop in this thread: the plain path, same as before.
            return asyncio.run(coro)
        # A loop is already running in this thread (e.g. the node was invoked
        # from inside `graph.ainvoke`). `asyncio.run` would raise RuntimeError
        # here, so give the coroutine its own loop on a worker thread instead.
        import concurrent.futures

        with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
            return pool.submit(asyncio.run, coro).result()

    def complete(prompt: str) -> str:
        """Send ``prompt`` as a single user message and return the reply text."""
        msgs = [Message(role="user", content=prompt)]
        resp = _run_blocking(model.complete(msgs))
        # ModelResponse → text
        return (resp.content or "").strip()

    return complete
def _parse_score(text: str) -> float:
    """Extract the first number in ``[0.0, 1.0]`` from a model reply.

    The reply is split on whitespace and commas. Each token is tried as-is and
    then again with wrapping quotes, brackets, markdown emphasis and a trailing
    sentence period removed, so replies such as ``'0.82'``, ``0.82.`` or
    ``**0.9**`` are understood rather than silently scored as the fallback.

    Args:
        text: Raw reply from the review prompt.

    Returns:
        The first in-range value found, or ``0.5`` when the reply contains no
        usable number.
    """
    # Characters that commonly wrap a bare number in chat-style replies.
    wrappers = "'\"`*()[]{}<>:;!?"
    for tok in text.replace(",", " ").split():
        bare = tok.strip(wrappers).rstrip(".").strip(wrappers)
        # Try the raw token first so anything that parsed before still parses
        # to the same value.
        candidates = (tok,) if bare == tok else (tok, bare)
        for cand in candidates:
            try:
                v = float(cand)
            except ValueError:
                continue
            if 0.0 <= v <= 1.0:
                return v
    return 0.5  # fallback
| # =========================================================================== | |
| # IMPLEMENTATION A — LangGraph | |
| # =========================================================================== | |
class LGState(TypedDict, total=False):
    """State schema handed to the LangGraph ``StateGraph`` builder.

    Declared with ``total=False``, so no key is required; the node functions
    read the loop-dependent keys with ``.get`` and a default.
    """

    topic: str  # user input
    # Annotated with `operator.add` as the reducer: the research node returns
    # only its new notes and they are added onto the existing list.
    notes: Annotated[list[str], operator.add]  # reducer = list append
    draft: str  # latest verdict paragraph produced by the write node
    confidence: float  # review node's score
    iter: int  # loop counter, incremented by the research node
def build_langgraph():
    """Assemble and compile the LangGraph research → write → review loop.

    All three nodes share one OpenAI-backed completion callable. After each
    review the graph either finishes (score at or above ``TARGET_CONFIDENCE``,
    or ``MAX_ITER`` research passes used) or loops back to ``research``.

    Returns:
        The compiled graph, checkpointed with an in-memory saver.
    """
    ask = _make_openai_complete()

    def research(state: LGState) -> dict:
        """Ask for new findings; emit at most two notes and bump the counter."""
        joined = "\n".join(state.get("notes", [])) or "(none)"
        prompt = RESEARCH_PROMPT.format(
            topic=state["topic"],
            n_existing=len(state.get("notes", [])),
            notes_so_far=joined,
        )
        findings = []
        for line in ask(prompt).splitlines():
            if line.strip():
                # Drop bullet markers and padding around each finding.
                findings.append(line.strip("-• ").strip())
        # Only the new notes are returned; the state's reducer merges them in.
        return {"notes": findings[:2], "iter": state.get("iter", 0) + 1}

    def write(state: LGState) -> dict:
        """Draft the verdict paragraph from every note gathered so far."""
        bullet_list = "\n".join(f"- {note}" for note in state["notes"])
        prompt = WRITE_PROMPT.format(topic=state["topic"], notes=bullet_list)
        return {"draft": ask(prompt)}

    def review(state: LGState) -> dict:
        """Score the current draft and store the parsed score."""
        prompt = REVIEW_PROMPT.format(topic=state["topic"], draft=state["draft"])
        reply = ask(prompt)
        return {"confidence": _parse_score(reply)}

    def decide(state: LGState) -> str:
        """Route after review: finish, or go round the loop again."""
        if state["confidence"] >= TARGET_CONFIDENCE:
            return LG_END
        if state.get("iter", 0) >= MAX_ITER:
            return LG_END
        return "research"

    builder = LGStateGraph(LGState)
    for node_name, node_fn in (("research", research), ("write", write), ("review", review)):
        builder.add_node(node_name, node_fn)
    builder.add_edge(LG_START, "research")
    builder.add_edge("research", "write")
    builder.add_edge("write", "review")
    routes = {"research": "research", LG_END: LG_END}
    builder.add_conditional_edges("review", decide, routes)
    return builder.compile(checkpointer=LGMemorySaver())
| # =========================================================================== | |
| # IMPLEMENTATION B — Locus StateGraph | |
| # =========================================================================== | |
class LocusState(BaseModel):
    """State schema handed to the Locus ``StateGraph`` builder.

    Carries the same five keys as the LangGraph state, each with an explicit
    default so a run can start from just a topic.
    """

    topic: str = ""  # user input
    # No reducer is declared here; the Locus research node appends manually and
    # returns the full merged list.
    notes: list[str] = Field(default_factory=list)
    draft: str = ""  # latest verdict paragraph produced by the write node
    confidence: float = 0.0  # review node's score
    iter: int = 0  # loop counter, incremented by the research node
def build_locus_graph():
    """Assemble and compile the Locus research → write → review loop.

    All three nodes share one OCI-backed completion callable. After each review
    the graph either finishes (score at or above ``TARGET_CONFIDENCE``, or
    ``MAX_ITER`` research passes used) or loops back to ``research``. Missing
    or ``None`` state values are treated as empty / zero throughout.

    Returns:
        The compiled Locus graph.
    """
    ask = _make_oci_complete()

    def research(state: dict) -> dict:
        """Ask for new findings and return the full, merged note list."""
        joined = "\n".join(state.get("notes", []) or []) or "(none)"
        prompt = RESEARCH_PROMPT.format(
            topic=state["topic"],
            n_existing=len(state.get("notes", []) or []),
            notes_so_far=joined,
        )
        findings = []
        for line in ask(prompt).splitlines():
            if line.strip():
                # Drop bullet markers and padding around each finding.
                findings.append(line.strip("-• ").strip())
        fresh = findings[:2]
        # Manual append — Locus' reducer behavior is verified separately below.
        combined = [*(state.get("notes", []) or []), *fresh]
        next_iter = (state.get("iter", 0) or 0) + 1
        return {"notes": combined, "iter": next_iter}

    def write(state: dict) -> dict:
        """Draft the verdict paragraph from every note gathered so far."""
        bullet_list = "\n".join(f"- {note}" for note in (state.get("notes") or []))
        prompt = WRITE_PROMPT.format(topic=state["topic"], notes=bullet_list)
        return {"draft": ask(prompt)}

    def review(state: dict) -> dict:
        """Score the current draft and store the parsed score."""
        prompt = REVIEW_PROMPT.format(topic=state["topic"], draft=state["draft"])
        reply = ask(prompt)
        return {"confidence": _parse_score(reply)}

    def decide(state: dict) -> str:
        """Route after review: finish, or go round the loop again."""
        if (state.get("confidence") or 0.0) >= TARGET_CONFIDENCE:
            return LOCUS_END
        if (state.get("iter") or 0) >= MAX_ITER:
            return LOCUS_END
        return "research"

    builder = LocusStateGraph(state_schema=LocusState)
    for node_name, node_fn in (("research", research), ("write", write), ("review", review)):
        builder.add_node(node_name, node_fn)
    builder.add_edge(LOCUS_START, "research")
    builder.add_edge("research", "write")
    builder.add_edge("write", "review")
    routes = {"research": "research", LOCUS_END: LOCUS_END}
    builder.add_conditional_edges("review", decide, routes)
    return builder.compile()
| # =========================================================================== | |
| # Driver | |
| # =========================================================================== | |
def run_langgraph(graph) -> dict:
    """Run the compiled LangGraph graph once on ``TOPIC`` and time it.

    Args:
        graph: Compiled graph exposing a synchronous ``invoke``.

    Returns:
        The final state with an extra ``"_elapsed"`` key holding the wall-clock
        seconds spent inside ``invoke``.
    """
    started = time.monotonic()
    run_config = {"configurable": {"thread_id": "t1"}, "recursion_limit": 50}
    final_state = graph.invoke(
        {"topic": TOPIC, "notes": [], "iter": 0},
        config=run_config,
    )
    elapsed = time.monotonic() - started
    return {**final_state, "_elapsed": elapsed}
def run_locus(graph) -> dict:
    """Run the compiled Locus graph once on ``TOPIC`` and time it.

    Args:
        graph: Compiled graph exposing an async ``ainvoke``.

    Returns:
        The final state as a plain dict with an extra ``"_elapsed"`` key
        holding the wall-clock seconds spent inside the run.
    """
    # NOTE: docs example uses `graph.compile().run_sync(...)` but only async
    # entry points exist (`ainvoke`, `astream`, `execute`, `stream`). Adapter:
    started = time.monotonic()
    outcome = asyncio.run(graph.ainvoke({"topic": TOPIC, "notes": [], "iter": 0}))
    # The result may wrap the state in `.final_state`; otherwise it is the state.
    final_state = getattr(outcome, "final_state", outcome)
    if hasattr(final_state, "model_dump"):
        # Model-style state object → plain dict so it can be unpacked below.
        final_state = final_state.model_dump()
    elapsed = time.monotonic() - started
    return {**final_state, "_elapsed": elapsed}
| # =========================================================================== | |
| # Equivalence verifier — runs my prose claims past reality | |
| # =========================================================================== | |
def verify_equivalence_claims() -> list[str]:
    """Check the Locus API claims from the prose comparison against the installed package.

    Each claim is exercised on its own (an import, a constructor call, or an
    attribute probe on a compiled graph). A claim that does not hold is
    recorded as a human-readable message rather than raised, so one broken
    claim never hides the others and the function always returns.

    Returns:
        A list of failure strings — empty means every claim checks out.
    """
    fails: list[str] = []
    add = fails.append

    # Claim: builder name + signature parity
    try:
        from locus.multiagent import StateGraph as LG_locus  # noqa: N814
        LG_locus(state_schema=LocusState)
    except Exception as e:  # noqa: BLE001
        add(f"StateGraph(state_schema=…) builder shape failed in Locus: {e!r}")

    # Claim: Send / Command / interrupt importable from claimed paths
    try:
        from locus.core.send import Send  # noqa: F401
        from locus.core.command import Command  # noqa: F401
        from locus.core.interrupt import interrupt  # noqa: F401
    except Exception as e:  # noqa: BLE001
        add(f"Send/Command/interrupt import paths failed: {e!r}")

    # Claim: Functional API (@entrypoint / @task)
    try:
        from locus.multiagent.functional import entrypoint, task  # noqa: F401
    except Exception as e:  # noqa: BLE001
        add(f"@entrypoint / @task not importable from locus.multiagent.functional: {e!r}")

    # Claim: Per-node RetryPolicy + CachePolicy
    try:
        from locus.multiagent.graph import RetryPolicy, CachePolicy  # noqa: F401
    except Exception as e:  # noqa: BLE001
        add(f"RetryPolicy/CachePolicy not at locus.multiagent.graph: {e!r}")

    # Claim: draw_mermaid + draw_ascii both ship
    try:
        from locus.multiagent.visualize import draw_mermaid, draw_ascii  # noqa: F401
    except Exception as e:  # noqa: BLE001
        add(f"draw_mermaid/draw_ascii not importable: {e!r}")

    # Claim: GraphConfig holds interrupt_before/interrupt_after/checkpointer
    try:
        from locus.multiagent.graph import GraphConfig
        cfg = GraphConfig(interrupt_before=["a"], interrupt_after=["b"], checkpointer=None)
        # Explicit comparison rather than `assert`: asserts are stripped under
        # `python -O`, which would silently skip this claim.
        if not (cfg.interrupt_before == ["a"] and cfg.interrupt_after == ["b"]):
            add(
                "GraphConfig interrupt_before/after wiring failed: stored "
                f"interrupt_before={cfg.interrupt_before!r}, "
                f"interrupt_after={cfg.interrupt_after!r}"
            )
    except Exception as e:  # noqa: BLE001
        add(f"GraphConfig interrupt_before/after wiring failed: {e!r}")

    # Claim: StreamMode enum has values / updates / nodes / custom
    try:
        from locus.multiagent.graph import StreamMode
        expected = {"values", "updates", "nodes", "custom"}
        present = {m.value for m in StreamMode}
        missing = expected - present
        if missing:
            add(f"StreamMode missing modes: {missing} (got {present})")
    except Exception as e:  # noqa: BLE001
        add(f"StreamMode not importable: {e!r}")

    # Claim: Reducer-from-Pydantic extraction (extract_reducers_from_model)
    try:
        from locus.core.reducers import extract_reducers_from_model  # noqa: F401
    except Exception as e:  # noqa: BLE001
        add(f"extract_reducers_from_model missing from locus.core.reducers: {e!r}")

    # Build a compiled graph once for the next several checks. Guarded like
    # every other check: if the builder itself is broken, report that and stop,
    # because the remaining probes all need the compiled object.
    try:
        from locus.multiagent.graph import START as _S, END as _E
        g0 = LocusStateGraph(state_schema=LocusState)
        g0.add_node("n", lambda s: {})
        g0.add_edge(_S, "n")
        g0.add_edge("n", _E)
        compiled = g0.compile()
    except Exception as e:  # noqa: BLE001
        add(
            "Could not build a minimal compiled Locus graph; "
            f"compiled-graph checks skipped: {e!r}"
        )
        return fails

    # Claim from docs/concepts/multi-agent/graph.md (line ~73):
    # `result = graph.compile().run_sync({...})`
    if not hasattr(compiled, "run_sync"):
        add("docs/concepts/multi-agent/graph.md shows `compiled.run_sync(...)` "
            "but no such method exists on the compiled graph "
            "(only async: ainvoke/astream/execute/stream).")

    # Claim from same doc: `graph.compile().get_mermaid()`
    if not hasattr(compiled, "get_mermaid"):
        add("docs/concepts/multi-agent/graph.md shows `compiled.get_mermaid()` "
            "but the actual API is `from locus.multiagent.visualize import "
            "draw_mermaid; draw_mermaid(compiled)`.")

    # Claim from prose: sync `invoke()` parity with LangGraph's CompiledGraph
    if not hasattr(compiled, "invoke"):
        add("No sync `compiled.invoke(...)` — LangGraph users expect this "
            "as the standard sync entry. Locus exposes only async ainvoke().")

    # Verify get_graph() returns something useful for rendering.
    if hasattr(compiled, "get_graph"):
        try:
            sub = compiled.get_graph()
        except Exception as e:  # noqa: BLE001
            add(f"`compiled.get_graph()` raised: {e!r}")
        else:
            if not hasattr(sub, "draw_mermaid"):
                add("`compiled.get_graph()` returns a StateGraph rather than a "
                    "render-capable object — LangGraph's CompiledGraph.get_graph() "
                    "returns a Graph with `.draw_mermaid()` / `.draw_ascii()` / "
                    "`.draw_png()`. Convenience gap.")

    return fails
| # =========================================================================== | |
| # Main | |
| # =========================================================================== | |
def main() -> None:
    """Verify the API claims, run both graphs on ``TOPIC``, and print a report.

    The report contains the claim-check results, a one-line summary per
    framework, a side-by-side table, both drafts, both Mermaid diagrams, and a
    final verdict listing any claim failures.

    Raises:
        SystemExit: if ``OPENAI_API_KEY`` or ``OCI_COMPARTMENT_ID`` is unset or
            empty.
    """
    unset = [name for name in ("OPENAI_API_KEY", "OCI_COMPARTMENT_ID") if not os.environ.get(name)]
    if unset:
        # Report the first missing variable, in declaration order.
        raise SystemExit(f"{unset[0]} not set")

    rule = "=" * 76

    def one_line(state: dict) -> str:
        """Format the per-framework summary line."""
        return (
            f" iters={state['iter']} notes={len(state['notes'])} "
            f"conf={state['confidence']:.2f} elapsed={state['_elapsed']:.1f}s"
        )

    def indent(text: str) -> str:
        """Prefix every line of ``text`` with a single space."""
        return " " + text.replace("\n", "\n ")

    print(rule)
    print(" LANGGRAPH vs LOCUS — same workflow, same model family (gpt-4o-mini)")
    print(f" topic : {TOPIC}")
    print(f" target conf : {TARGET_CONFIDENCE} max iter: {MAX_ITER}")
    print(rule)

    # ----- Equivalence claim check ----------------------------------------
    print("\n— API equivalence claims (verified against current Locus source) —")
    fails = verify_equivalence_claims()
    if fails:
        print(f" ✗ {len(fails)} claim(s) didn't hold up:")
        for failure in fails:
            print(f" • {failure}")
    else:
        print(" ✓ Every claim from the prose comparison resolves cleanly.")

    # ----- Run both graphs -------------------------------------------------
    print("\n— Building + running LangGraph (real OpenAI gpt-4o-mini) —")
    lg_graph = build_langgraph()
    lg_state = run_langgraph(lg_graph)
    print(one_line(lg_state))

    print("\n— Building + running Locus StateGraph (OCI gpt-4o-mini via BOAT-OC1) —")
    locus_graph = build_locus_graph()
    locus_state = run_locus(locus_graph)
    print(one_line(locus_state))

    # ----- Side-by-side ----------------------------------------------------
    print("\n" + rule)
    print(" SIDE-BY-SIDE FINAL STATE")
    print(rule)
    rows: list[tuple[str, Any, Any]] = [
        ("iters", lg_state["iter"], locus_state["iter"]),
        ("notes", len(lg_state["notes"]), len(locus_state["notes"])),
        ("confidence", f"{lg_state['confidence']:.2f}", f"{locus_state['confidence']:.2f}"),
        ("elapsed s", f"{lg_state['_elapsed']:.2f}", f"{locus_state['_elapsed']:.2f}"),
    ]
    print(f" {'metric':<14}{'LangGraph':<20}{'Locus':<20}")
    for label, lg_value, locus_value in rows:
        print(f" {label:<14}{lg_value!s:<20}{locus_value!s:<20}")

    print("\n— LangGraph draft —")
    print(indent(lg_state["draft"]))
    print("\n— Locus draft —")
    print(indent(locus_state["draft"]))

    # ----- Mermaid from each ----------------------------------------------
    # Rendering is best-effort: a failure is printed, never raised.
    print("\n— Mermaid (LangGraph) —")
    try:
        print(lg_graph.get_graph().draw_mermaid())
    except Exception as e:  # noqa: BLE001
        print(f" (LangGraph Mermaid raised: {e!r})")
    print("— Mermaid (Locus) —")
    try:
        from locus.multiagent.visualize import draw_mermaid as locus_draw_mermaid
        print(locus_draw_mermaid(locus_graph))
    except Exception as e:  # noqa: BLE001
        print(f" (Locus Mermaid raised: {e!r})")

    # ----- Verdict ---------------------------------------------------------
    print("\n" + rule)
    if not fails:
        print(" RESULT: workflow ran in both; every doc claim verified.")
        print(rule)
    else:
        print(f" RESULT: workflow ran in both; {len(fails)} doc/API gap(s) for Locus follow-up:")
        print(rule)
        for number, failure in enumerate(fails, 1):
            print(f"\n Issue #{number}\n {failure}")


if __name__ == "__main__":
    main()
