This article was originally published on
[aifoss.dev]
TL;DR: ComfyUI's built-in HTTP server accepts workflow JSON on port 8188 β the same JSON the GUI exports when you click "Save (API Format)". You can queue a prompt, poll for completion, and download the result in under 50 lines of Python. No GUI, no browser tab, no manual clicking.
What you'll have running after this guide:
The GUI is fine for building and testing workflows. It's a problem once you need more than a few images, or need to integrate generation into a pipeline.
Common use cases where the API makes sense:
If you're generating one image at a time and want to tweak nodes visually, stay in the GUI. The API is for automation.
pip install requests websocket-client
If you haven't installed ComfyUI yet, the ComfyUI review covers installation from scratch. For GPU hardware requirements, the Stable Diffusion 8GB VRAM guide has a good breakdown of what each model tier needs.
By default, ComfyUI only listens on 127.0.0.1
β accessible only from the same machine. That's fine if you're scripting locally. For a remote GPU box or Docker container, add --listen
:
python main.py --port 8188
python main.py --listen 0.0.0.0 --port 8188
python main.py --listen 0.0.0.0 --port 8188 --lowvram
Once running, verify it's up:
curl http://127.0.0.1:8188/system_stats
The server starts a REST API and a WebSocket server on the same port. There is no authentication by default β if you expose this to a network, use a firewall rule or reverse proxy with auth.
The GUI workflow JSON and the API workflow JSON are different formats. The GUI format includes node positions, colors, and UI state. The API format is stripped down to just inputs and connections β which is all the server needs.
To export:
workflow_api.json
The API format uses numeric string keys ("1"
, "2"
, "3"
) as node IDs. Each node has a class_type
and an inputs
object. Connections between nodes are expressed as ["source_node_id", output_index]
arrays rather than named references.
Here is a minimal SDXL baseline workflow in API format:
{
"4": {
"class_type": "CheckpointSimple",
"inputs": {
"ckpt_name": "sd_xl_base_1.0.safetensors"
}
},
"5": {
"class_type": "EmptyLatentImage",
"inputs": {
"batch_size": 1,
"height": 1024,
"width": 1024
}
},
"6": {
"class_type": "CLIPTextEncode",
"inputs": {
"clip": ["4", 1],
"text": "a photorealistic red fox in snow, golden hour lighting"
}
},
"7": {
"class_type": "CLIPTextEncode",
"inputs": {
"clip": ["4", 1],
"text": "blurry, low quality, watermark, text"
}
},
"3": {
"class_type": "KSampler",
"inputs": {
"cfg": 7.0,
"denoise": 1.0,
"latent_image": ["5", 0],
"model": ["4", 0],
"negative": ["7", 0],
"positive": ["6", 0],
"sampler_name": "euler",
"scheduler": "normal",
"seed": 42,
"steps": 20
}
},
"8": {
"class_type": "VAEDecode",
"inputs": {
"samples": ["3", 0],
"vae": ["4", 2]
}
},
"9": {
"class_type": "SaveImage",
"inputs": {
"filename_prefix": "api_output",
"images": ["8", 0]
}
}
}
For SDXL specifically, node "4"
connects to both CLIP encoders and the KSampler via its three outputs: [0]
= model, [1]
= CLIP, [2]
= VAE. The connection syntax ["4", 1]
means "output index 1 of node 4."
import uuid
import json
import requests
SERVER = "127.0.0.1:8188"
CLIENT_ID = str(uuid.uuid4())
def queue_prompt(workflow: dict) -> str:
"""Submit a workflow. Returns the prompt_id for tracking."""
payload = {"prompt": workflow, "client_id": CLIENT_ID}
r = requests.post(f"http://{SERVER}/prompt", json=payload)
r.raise_for_status()
return r.json()["prompt_id"]
The client_id
is a UUID you generate once per session. It ties your WebSocket connection to your HTTP requests so the server routes status messages back to you. You can skip it for pure HTTP polling, but you need it for WebSocket tracking.
The server responds immediately with {"prompt_id": "<uuid>", "number": <queue_position>}
. Generation hasn't started yet β the prompt is in queue.
Two approaches β choose based on your use case:
| Approach | Latency | Complexity | Best for |
|---|---|---|---|
HTTP polling /history/{id} |
|||
| ~1s overhead | Low β no extra library | Scripts, batch jobs | |
WebSocket /ws?clientId=... |
|||
| Near-real-time | Medium β event loop | Apps that show progress |
import time
def wait_for_completion(prompt_id: str, poll_secs: float = 1.0) -> dict:
"""Block until the prompt finishes. Returns the history entry."""
url = f"http://{SERVER}/history/{prompt_id}"
while True:
r = requests.get(url)
r.raise_for_status()
history = r.json()
if prompt_id in history:
return history[prompt_id]
time.sleep(poll_secs)
The /history/{prompt_id}
endpoint returns an empty dict {}
while the prompt is queued or running, and the full result object once done. Polling every second adds at most 1 second of latency to your total generation time β acceptable for batch scripts.
import websocket
def generate_with_ws(workflow: dict) -> str:
"""Queue and wait for completion via WebSocket. Returns prompt_id."""
ws = websocket.WebSocket()
ws.connect(f"ws://{SERVER}/ws?clientId={CLIENT_ID}")
prompt_id = queue_prompt(workflow)
while True:
raw = ws.recv()
if not isinstance(raw, str):
continue # binary preview frames β skip
msg = json.loads(raw)
if msg["type"] == "executing":
data = msg["data"]
if data["node"] is None and data["prompt_id"] == prompt_id:
break # null node = generation finished
ws.close()
return prompt_id
The executing
message fires for each node as it runs. When node
is None
and the prompt_id
matches yours, the entire graph has finished executing.
python
import os
def download_images(history_entry: dict, output_dir: str = "output") -> list[str]:
"""Download all output images from a completed prompt."""
os.makedirs(output_dir, exist_ok=True)
saved = []