# Building a LangGraph RAG Agent from Scratch — with a Live UI That Shows Every Step

> Source: <https://dev.to/ameya_joshi_68fa01c3a1a16/building-a-langgraph-rag-agent-from-scratch-with-a-live-ui-that-shows-every-step-4nle>
> Published: 2026-06-06 20:06:02+00:00

I built a learning project that teaches LangChain and LangGraph step by step — starting from a raw LLM call and ending with a full ReAct agent backed by RAG, streamed over SSE to a React UI that **visualises every node in the agent loop in real time**.

This post walks through the whole thing: what each concept does, how it connects to the next, and how the live pipeline view works.

```
frontend/   ← React + Vite chat UI (live agent loop visualisation)
backend/    ← FastAPI server wrapping the RAG agent
step*.py    ← 6 progressive learning files
```

The agent answers questions about rate limiting algorithms. That's just the domain — the real goal is to understand **how LangChain and LangGraph fit together**.

| File | Concept introduced |
|---|---|
`step1_llm_basics.py` |
Chat models, messages, `.invoke()` , statelessness |
`step2_prompts_and_chains.py` |
Prompt templates, LCEL `\ |
{% raw %}`step3_tools.py`
|
`@tool` decorator, `bind_tools()` , manual tool loop |
`step4_langgraph_intro.py` |
`StateGraph` , nodes, edges, conditional routing |
`step5_full_agent.py` |
Full ReAct loop with `ToolNode`
|
`step6_rag_agent.py` |
RAG — FAISS, HuggingFace embeddings, retriever tool |

The simplest possible thing: call a model and read the reply.

``` python
from langchain_groq import ChatGroq
from langchain_core.messages import SystemMessage, HumanMessage

llm = ChatGroq(model="llama-3.3-70b-versatile")

messages = [
    SystemMessage(content="You are a rate limiting expert."),
    HumanMessage(content="What is token bucket?"),
]

response = llm.invoke(messages)
print(response.content)
```

**Key insight:** The LLM is stateless. Every call is independent. You manage the conversation history yourself by passing the full message list each time.

LangChain Expression Language (LCEL) lets you compose components with the `|`

pipe operator — the same way Unix pipes work.

``` python
from langchain_core.prompts import ChatPromptTemplate

prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a rate limiting expert."),
    ("human", "{question}"),
])

# Chain: prompt → LLM
chain = prompt | llm

# Invoke
response = chain.invoke({"question": "Compare token bucket and leaky bucket"})

# Stream tokens as they arrive
for chunk in chain.stream({"question": "What is sliding window log?"}):
    print(chunk.content, end="", flush=True)
```

**Key insight:** LCEL chains are lazy. `.stream()`

and `.batch()`

are first-class — no extra code needed.

Tools let the LLM take actions. The `@tool`

decorator turns a Python function into something the model can call.

``` python
from langchain_core.tools import tool
from langchain_groq import ChatGroq

@tool
def get_algorithm_info(algorithm: str) -> str:
    """Return a brief description of a rate limiting algorithm."""
    descriptions = {
        "token_bucket":    "Tokens refill at a fixed rate up to a capacity cap. Allows bursts.",
        "fixed_window":    "Counts requests in fixed time windows. Simple but has boundary spikes.",
        "sliding_window":  "Precise per-request log. High memory, no boundary spikes.",
        "leaky_bucket":    "Queue drains at a constant rate. Smooths traffic, no bursts allowed.",
    }
    return descriptions.get(algorithm, "Unknown algorithm.")

# Bind tools to the model — it now knows what tools exist and their signatures
llm_with_tools = ChatGroq(model="meta-llama/llama-4-scout-17b-16e-instruct").bind_tools(
    [get_algorithm_info]
)

response = llm_with_tools.invoke("Tell me about token bucket")
# response.tool_calls → [{"name": "get_algorithm_info", "args": {"algorithm": "token_bucket"}}]
```

**Key insight:** `bind_tools()`

sends the tool schemas to the model. The model returns a structured `tool_calls`

list — it does not execute the tools itself. *You* run them and send the results back.

LangGraph models the agent as a **state machine**. You define:

``` python
from langgraph.graph import StateGraph, END
from langgraph.graph.message import add_messages
from typing import Annotated
from typing_extensions import TypedDict

class State(TypedDict):
    messages: Annotated[list, add_messages]  # reducer: appends, never replaces

def node_a(state: State):
    return {"messages": ["Hello from node A"]}

def node_b(state: State):
    return {"messages": ["Hello from node B"]}

def route(state: State):
    return "b" if len(state["messages"]) < 3 else END

graph = StateGraph(State)
graph.add_node("a", node_a)
graph.add_node("b", node_b)
graph.set_entry_point("a")
graph.add_conditional_edges("a", route, {"b": "b", END: END})
graph.add_edge("b", "a")

app = graph.compile()
```

**Key insight:** `add_messages`

is a **reducer**. When a node returns `{"messages": [new_msg]}`

, LangGraph appends it to the list instead of replacing it. This is how the conversation history accumulates automatically.

The ReAct pattern (Reason + Act) is: LLM decides what to do → tools execute it → LLM sees the result → repeat.

LangGraph's `ToolNode`

handles the execution side automatically.

``` python
from langgraph.graph import StateGraph, END
from langgraph.graph.message import add_messages
from langgraph.prebuilt import ToolNode
from langchain_groq import ChatGroq
from langchain_core.messages import HumanMessage
from typing import Annotated
from typing_extensions import TypedDict

tools = [get_algorithm_info, recommend_algorithm, calculate_token_bucket]
llm   = ChatGroq(model="meta-llama/llama-4-scout-17b-16e-instruct").bind_tools(tools)

class State(TypedDict):
    messages: Annotated[list, add_messages]

def llm_node(state: State):
    return {"messages": [llm.invoke(state["messages"])]}

def tools_condition(state: State):
    return "tools" if state["messages"][-1].tool_calls else END

graph = StateGraph(State)
graph.add_node("llm",   llm_node)
graph.add_node("tools", ToolNode(tools))
graph.set_entry_point("llm")
graph.add_conditional_edges("llm", tools_condition)
graph.add_edge("tools", "llm")  # always loop back after tool execution

agent = graph.compile()

result = agent.invoke({"messages": [HumanMessage(content="What algorithm for bursty traffic?")]})
print(result["messages"][-1].content)
```

**The loop:**

```
START → [llm] → has tool_calls? → YES → [tools] → back to [llm]
                                → NO  → END
```

Retrieval-Augmented Generation (RAG) gives the agent long-form knowledge from documents. We embed documents into a FAISS vector store and expose it as a tool.

``` python
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_community.vectorstores import FAISS
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_core.tools import tool

# Index documents once at startup
embeddings = HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2")

docs = load_knowledge_base()           # returns list of Document objects
splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=50)
chunks = splitter.split_documents(docs)

vectorstore = FAISS.from_documents(chunks, embeddings)
retriever   = vectorstore.as_retriever(search_kwargs={"k": 3})

# Expose retrieval as a tool
@tool
def search_knowledge_base(query: str) -> str:
    """Search the rate limiting knowledge base for relevant information."""
    docs = retriever.invoke(query)
    return "\n---\n".join(d.page_content for d in docs)
```

**Key insight:** RAG is just a tool from the agent's perspective. The LLM decides *when* to call it based on the question. The retriever converts the query to an embedding, finds the nearest chunks in FAISS, and returns them as context.

The backend wraps the agent in a FastAPI server. The interesting part is the streaming endpoint, which uses `agent.astream_events()`

— a granular async generator that fires events for every internal state change in the graph.

``` python
from fastapi.responses import StreamingResponse
from langchain_core.messages import HumanMessage
import json

@app.post("/chat/stream")
async def chat_stream(request: ChatRequest):
    async def generate():
        llm_call_count = 0
        graph_started  = False

        async for event in agent.astream_events(
            {"messages": [HumanMessage(content=request.message)]},
            version="v2",
        ):
            kind = event["event"]
            node = event.get("metadata", {}).get("langgraph_node", "")

            # LLM node starting
            if kind == "on_chat_model_start" and node == "llm":
                if not graph_started:
                    graph_started = True
                    yield sse({"type": "pipeline", "phase": "graph_start"})
                llm_call_count += 1
                yield sse({"type": "pipeline", "phase": "llm_start", "call": llm_call_count})

            # LLM done — emit routing decision
            elif kind == "on_chat_model_end" and node == "llm":
                output     = event["data"].get("output")
                tool_calls = getattr(output, "tool_calls", []) if output else []
                yield sse({
                    "type":       "pipeline",
                    "phase":      "llm_end",
                    "decision":   "tools" if tool_calls else "answer",
                    "tool_names": [tc["name"] for tc in tool_calls],
                })

            # Tool executing
            elif kind == "on_tool_start":
                yield sse({"type": "pipeline", "phase": "tool_start",
                           "tool": event["name"], "args": event["data"].get("input", {})})

            # Tool done
            elif kind == "on_tool_end":
                out     = event["data"].get("output", "")
                content = out.content if hasattr(out, "content") else str(out)
                yield sse({"type": "pipeline", "phase": "tool_end",
                           "tool": event["name"], "preview": content[:120]})

            # Individual LLM output tokens (final answer only)
            elif kind == "on_chat_model_stream" and node == "llm":
                chunk = event["data"]["chunk"]
                if chunk.content and not getattr(chunk, "tool_call_chunks", []):
                    yield sse({"type": "token", "content": chunk.content})

        yield sse({"type": "pipeline", "phase": "graph_end"})
        yield "data: [DONE]\n\n"

    return StreamingResponse(generate(), media_type="text/event-stream")
```

**Why astream_events instead of astream?**

`astream()`

gives you one event per *node* that completes — coarse-grained. `astream_events(version="v2")`

fires for every internal lifecycle hook: model start/stream/end, tool start/end, chain start/end. This is what lets us show individual tokens and the routing decision in real time.

Every assistant response shows a collapsible **Agent Loop** panel. Each node card appears and updates live as the corresponding event arrives from the SSE stream.

```
🚀 StateGraph Initialized          [langgraph]
   StateGraph.compile() · add_messages reducer
   ↓
🧠 LLM Node — Call #1  ⟳           [langchain]   ← spinning while active
   ChatGroq(llama-4-scout) · bind_tools(4)
   AIMessage has tool_calls → selected: search_knowledge_base
   ↓
◆  Conditional Edge → tools node   [langgraph]
   add_conditional_edges · tools_condition(state)
   has tool_calls → route to tools
   ↓
🔍 ToolNode: search_knowledge_base ⟳ [langchain]
   FAISS vector search · HuggingFace embeddings
   query: HTTP headers rate limiting
   → Retrieved 3 relevant chunk(s)
   ↓
🧠 LLM Node — Call #2  ✓           [langchain]
   LLM sees ToolMessage in state
   no tool_calls → generating final answer
   ↓
◆  Conditional Edge → END          [langgraph]
   no tool_calls → route to END
   ↓
🏁 Graph END                       [langgraph]
   messages[-1].content → response
```

Nodes are **colour-coded**:

Badges identify which framework is responsible: `langgraph`

(purple) vs `langchain`

(orange).

Tokens from the LLM arrive in bursts over SSE. Rather than applying them immediately, a character queue drains at a fixed pace (18ms/char) so the text types out at a readable speed:

``` js
const CHAR_DELAY = 18  // ms per character

// When a token event arrives, push each character into the queue
if (ev.type === 'token') {
  tokenQueue.current.push(...ev.content.split(''))
  startTicker(assistantId)
}

// Ticker drains one char at a time
const startTicker = (id) => {
  tickerRef.current = setInterval(() => {
    if (!tokenQueue.current.length) return
    const ch = tokenQueue.current.shift()
    setMessages(prev => prev.map(m =>
      m.id === id ? { ...m, content: (m.content || '') + ch } : m
    ))
  }, CHAR_DELAY)
}
```

| Layer | Technology |
|---|---|
| LLM | Groq — `llama-4-scout-17b` (tool calling), `llama-3.3-70b` (text) |
| Agent framework | LangGraph — `StateGraph` , `ToolNode` , `add_conditional_edges`
|
| RAG | LangChain + HuggingFace `all-MiniLM-L6-v2` embeddings + FAISS |
| Streaming |
`astream_events(version="v2")` → Server-Sent Events |
| Backend | FastAPI + uvicorn |
| Frontend | React 18 + Vite + react-markdown |

```
# Python deps (uses uv to avoid system Python issues)
uv venv .venv --python 3.12
uv pip install -r requirements.txt

# Frontend deps
cd frontend && npm install && cd ..

# Terminal 1 — backend
cd backend
GROQ_API_KEY=your_key uvicorn main:app --port 8000 --reload

# Terminal 2 — frontend
cd frontend && npm run dev
```

Open ** http://localhost:5173**. The first run downloads the embedding model (~90 MB) and caches it.

**LangChain** gives you the building blocks: models, prompt templates, tools, LCEL chains, vector stores.

**LangGraph** gives you the control flow: a state machine where you decide the loop, the branching, and when to stop.

The two fit together naturally — LangGraph nodes call LangChain components, and LangChain tools feed results back into LangGraph state via `add_messages`

.

The most clarifying thing was building the UI that shows the loop. When you watch the graph execute in real time — LLM node lights up, routing decision fires, ToolNode spins, LLM node fires again — the ReAct pattern stops being abstract and becomes something you can see.

*The full source is on GitHub. The step*.py files are designed to be read in order — each one is self-contained and introduces exactly one new concept.*
