OAuth 2.0 Authorization Server framework for MCP servers. Issue and manage tokens that protect MCP tool access.
Pair with mcp-authflow-resource on the resource server side.
- Token storage with PostgreSQL and in-memory backends
- RFC 6749 standardized OAuth error responses
- RFC 7523 `private_key_jwt` client authentication with algorithm allowlist and JTI replay protection (Redis or in-memory)
- RFC 7636 PKCE verification (`S256` + `plain`) and input validation for the token endpoint
- RFC 8628 Device Authorization Grant: sans-IO polling state machine and code generators
- Sliding-window rate limiting for token endpoints
- Input validation for client IDs and scopes
- CORS helpers with origin allowlisting
- Async-first design, built on Starlette
```bash
pip install mcp-authflow

# With the postgres extra (quoted so shells such as zsh do not expand the brackets)
pip install "mcp-authflow[postgres]"
```
Build an OAuth authorization server that issues tokens for MCP clients:
```python
import secrets
import time
from contextlib import asynccontextmanager

from starlette.applications import Starlette
from starlette.requests import Request
from starlette.responses import JSONResponse
from starlette.routing import Route

from mcp_authflow.rate_limiting import SlidingWindowRateLimiter
from mcp_authflow.responses import invalid_request, rate_limit_exceeded
from mcp_authflow.storage import MemoryTokenStorage
from mcp_authflow.validation import parse_scope_field, validate_client_id

storage = MemoryTokenStorage()  # Use PostgresTokenStorage for production
limiter = SlidingWindowRateLimiter(requests_per_window=60, window_seconds=3600)


async def token_endpoint(request: Request) -> JSONResponse:
    form = await request.form()
    client_id = str(form.get("client_id", ""))

    # Throttle per client before doing any further work.
    if not limiter.is_allowed(client_id):
        return rate_limit_exceeded(
            "Too many requests",
            retry_after=limiter.get_retry_after(client_id),
        )

    if not validate_client_id(client_id):
        return invalid_request("Invalid client_id format")

    token = secrets.token_urlsafe(32)
    scopes = parse_scope_field(form.get("scope"))  # Space-separated string
    expires_at = int(time.time()) + 3600

    await storage.store_token(
        token=token,
        client_id=client_id,
        scopes=scopes.split(),
        expires_at=expires_at,
        resource=str(form.get("resource", "")),
    )

    return JSONResponse({
        "access_token": token,
        "token_type": "bearer",
        "expires_in": 3600,
        "scope": scopes,
    })


async def introspect_endpoint(request: Request) -> JSONResponse:
    form = await request.form()
    token = str(form.get("token", ""))
    token_data = await storage.load_token(token)

    # Unknown and expired tokens are both reported as inactive.
    if not token_data or token_data["expires_at"] < time.time():
        return JSONResponse({"active": False})

    return JSONResponse({
        "active": True,
        "client_id": token_data["client_id"],
        "scope": " ".join(token_data["scopes"]),
        "exp": token_data["expires_at"],
        "aud": token_data.get("resource", ""),
    })


@asynccontextmanager
async def lifespan(app):
    await storage.initialize()
    yield
    await storage.close()


app = Starlette(
    routes=[
        Route("/token", token_endpoint, methods=["POST"]),
        Route("/introspect", introspect_endpoint, methods=["POST"]),
    ],
    lifespan=lifespan,
)
```
Run with: `uvicorn myapp:app --port 8000`
```
MCP Client (Claude, etc.)
        |
        | 1. Authorization request
        |
        v
+---------------------+
|  Auth Server        |  <-- this package
|  (mcp-authflow)     |
|                     |
|  /token             |  2. Issues access token
|  /introspect        |  4. Validates token
+---------------------+
        ^
        |
        | 4. Token introspection (RFC 7662)
        |
+---------------------+
|  Resource Server    |  <-- mcp-authflow-resource
|  (MCP tools)        |
|                     |
|  3. Client calls    |
|     MCP tools with  |
|     Bearer token    |
+---------------------+
```
- MCP client authenticates with the auth server
- Auth server issues an access token (stored in PostgreSQL or memory)
- Client calls MCP tools on the resource server with the Bearer token
- Resource server validates the token by calling the auth server's `/introspect` endpoint
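The final step can be sketched from the resource server's side. This is a minimal standard-library illustration, not the mcp-authflow-resource implementation: the URL assumes the example server above is running on port 8000, and the helper names `build_introspection_request`, `parse_introspection_response` and `introspect` are hypothetical.

```python
import json
import urllib.parse
import urllib.request
from typing import Optional

# Assumption: the example auth server above is listening on this address.
INTROSPECT_URL = "http://localhost:8000/introspect"


def build_introspection_request(token: str, url: str = INTROSPECT_URL) -> urllib.request.Request:
    """Build an RFC 7662 style request: a form-encoded POST carrying the token."""
    body = urllib.parse.urlencode({"token": token}).encode("ascii")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        method="POST",
    )


def parse_introspection_response(raw: bytes) -> Optional[dict]:
    """Return the token claims if the server reports the token as active, else None."""
    payload = json.loads(raw)
    if payload.get("active") is not True:
        return None
    return payload


def introspect(token: str) -> Optional[dict]:
    """Send the request and parse the reply (performs network I/O)."""
    with urllib.request.urlopen(build_introspection_request(token), timeout=5) as resp:
        return parse_introspection_response(resp.read())
```

Keeping request construction and response parsing separate from the network call makes both easy to test without a running server.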
Abstract base class with two implementations:
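```python
from mcp_authflow.storage import MemoryTokenStorage, PostgresTokenStorage

# In-memory backend
storage = MemoryTokenStorage()

# PostgreSQL backend with an explicit connection string
storage = PostgresTokenStorage(database_url="postgresql://user:pass@host/db")

# PostgreSQL backend without an explicit URL (see DATABASE_URL in the configuration table)
storage = PostgresTokenStorage()

await storage.initialize()  # Open the connection pool (in-memory needs no setup)
```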
`PostgresTokenStorage` does not create or migrate its schema. It expects the tables to already exist, so you stay in control of migrations. Apply this DDL (e.g. via your migration tool) before first use:
```sql
CREATE TABLE IF NOT EXISTS mcp_access_tokens (
    token       TEXT PRIMARY KEY,
    client_id   TEXT NOT NULL,
    scopes      TEXT NOT NULL DEFAULT '',
    resource    TEXT,
    expires_at  TIMESTAMPTZ NOT NULL,
    created_at  TIMESTAMPTZ NOT NULL DEFAULT now(),
    user_id     INTEGER
);

-- Only needed if you use the refresh-token methods.
CREATE TABLE IF NOT EXISTS mcp_refresh_tokens (
    token       TEXT PRIMARY KEY,
    client_id   TEXT NOT NULL,
    scopes      TEXT NOT NULL DEFAULT '',
    resource    TEXT,
    expires_at  TIMESTAMPTZ NOT NULL,
    created_at  TIMESTAMPTZ NOT NULL DEFAULT now(),
    user_id     INTEGER
);
```
Storage interface:
| Method | Description |
|---|---|
| `store_token(token, client_id, scopes, expires_at, resource?, user_id?)` | Store an access token |
| `load_token(token) -> dict \| None` | Look up a token |
| `delete_token(token)` | Revoke a token |
| `cleanup_expired_tokens() -> int` | Purge expired tokens, returns count |
| `get_token_count() -> int` | Count active tokens |
| `store_refresh_token(...)` | Store a refresh token (same interface) |
| `load_refresh_token(token) -> dict \| None` | Look up a refresh token |
| `delete_refresh_token(token)` | Revoke a refresh token |
| `cleanup_expired_refresh_tokens() -> int` | Purge expired refresh tokens, returns count |
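The two cleanup methods in the table lend themselves to a periodic background task. The sketch below assumes they are coroutines, like `store_token` and `load_token` in the example server; the function name `purge_expired_periodically` and the five-minute default interval are illustrative choices, not part of the package.

```python
import asyncio


async def purge_expired_periodically(storage, interval_seconds: float = 300.0) -> None:
    """Call both cleanup methods on a fixed interval until the task is cancelled."""
    while True:
        # Assumption: both methods are awaitable and return the number of rows removed.
        removed = await storage.cleanup_expired_tokens()
        removed += await storage.cleanup_expired_refresh_tokens()
        if removed:
            print(f"purged {removed} expired tokens")
        await asyncio.sleep(interval_seconds)
```

One way to use it is to start the task with `asyncio.create_task(purge_expired_periodically(storage))` after `storage.initialize()` in the lifespan handler and cancel it before `storage.close()`.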
Token data returned by `load_token()`:

```python
{
    "token": str,
    "client_id": str,
    "scopes": list[str],
    "resource": str | None,  # RFC 8707 resource binding
    "expires_at": int,       # Unix timestamp
    "created_at": int,       # Unix timestamp
    "user_id": int | None,
}
```
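A caller typically checks three things on a loaded record: expiry, scope, and the resource binding. The helper below is a hypothetical sketch built only on the fields listed above; `token_permits` is not a package function, and accepting tokens with no resource binding for any resource is a policy assumption you may want to tighten.

```python
import time
from typing import Optional


def token_permits(
    token_data: Optional[dict],
    required_scope: str,
    resource: Optional[str] = None,
    now: Optional[float] = None,
) -> bool:
    """Return True if the record is unexpired, carries the scope, and matches the resource."""
    if token_data is None:
        return False
    current = time.time() if now is None else now
    if token_data["expires_at"] <= current:
        return False
    if required_scope not in token_data["scopes"]:
        return False
    bound = token_data.get("resource")
    # Assumption: a token without a resource binding is accepted for any resource.
    if resource is not None and bound and bound != resource:
        return False
    return True
```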
Standardized error helpers following RFC 6749:
```python
from mcp_authflow.responses import (
    invalid_request,      # 400 - Missing/invalid parameters
    invalid_client,       # 401 - Authentication failure
    invalid_grant,        # 400 - Expired/invalid code or token
    invalid_scope,        # 400 - Scope violation
    slow_down,            # 400 - Device flow rate limiting
    rate_limit_exceeded,  # 429 - Too many requests
    server_error,         # 500 - Internal error
    backend_timeout,      # 504 - Upstream timeout
)
```
Each returns a Starlette `JSONResponse` with the appropriate status code and a `Cache-Control: no-store` header.
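For orientation, RFC 6749 section 5.2 defines the error body as a JSON object with an `error` code and an optional `error_description`. The framework-free sketch below shows that shape; `oauth_error_response` is a hypothetical name, and the exact fields emitted by the package's helpers are not shown in this document, so treat the output as an approximation.

```python
import json
from typing import Dict, Tuple


def oauth_error_response(error: str, description: str, status: int = 400) -> Tuple[int, Dict[str, str], str]:
    """Return (status, headers, body) for an RFC 6749 section 5.2 style error."""
    headers = {
        "Content-Type": "application/json",
        "Cache-Control": "no-store",  # Error responses must not be cached
    }
    body = json.dumps({"error": error, "error_description": description})
    return status, headers, body


status, headers, body = oauth_error_response("invalid_grant", "Authorization code expired")
```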
```python
from mcp_authflow.rate_limiting import SlidingWindowRateLimiter

limiter = SlidingWindowRateLimiter(
    requests_per_window=60,  # Max requests per window
    window_seconds=3600,     # Window duration (1 hour)
)

if not limiter.is_allowed(client_id):
    retry_after = limiter.get_retry_after(client_id)  # Seconds until next allowed request
```
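To make the sliding-window idea concrete, here is one common way to implement it: keep a queue of request timestamps per key and drop those older than the window. This is an independent sketch, not the package's implementation; the class name `SketchSlidingWindowLimiter` and the injectable `clock` parameter are assumptions added for illustration and testing.

```python
import math
import time
from collections import defaultdict, deque
from typing import Callable, Deque, Dict


class SketchSlidingWindowLimiter:
    def __init__(
        self,
        requests_per_window: int,
        window_seconds: float,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.limit = requests_per_window
        self.window = window_seconds
        self.clock = clock
        self._hits: Dict[str, Deque[float]] = defaultdict(deque)

    def _evict(self, key: str, now: float) -> Deque[float]:
        """Drop timestamps that have fallen out of the window."""
        hits = self._hits[key]
        while hits and hits[0] <= now - self.window:
            hits.popleft()
        return hits

    def is_allowed(self, key: str) -> bool:
        """Record the request and return True if the key is under its limit."""
        now = self.clock()
        hits = self._evict(key, now)
        if len(hits) >= self.limit:
            return False
        hits.append(now)
        return True

    def get_retry_after(self, key: str) -> int:
        """Whole seconds until the oldest recorded request leaves the window."""
        now = self.clock()
        hits = self._evict(key, now)
        if len(hits) < self.limit:
            return 0
        return max(1, math.ceil(hits[0] + self.window - now))
```

Unlike a fixed window that resets on a boundary, this approach never admits more than `requests_per_window` requests in any span of `window_seconds`, at the cost of storing one timestamp per admitted request.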
```python
from mcp_authflow.validation import validate_client_id, parse_scope_field

validate_client_id("my-client-123")   # True (alphanumeric + hyphens/underscores)
validate_client_id("")                # False

parse_scope_field("read write")       # "read write"
parse_scope_field(["read", "write"])  # "read write"
parse_scope_field(None)               # "read" (default)
```
```python
from mcp_authflow.cors import parse_allowed_origins, build_cors_headers

origins = parse_allowed_origins()
headers = build_cors_headers(request, origins)
```
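Origin allowlisting generally means echoing the request's `Origin` back only when it appears in a configured set. The sketch below illustrates that idea with hypothetical `sketch_*` functions. It is not the package's code: it takes the origin string directly rather than a request object, and the specific headers and methods returned are assumptions.

```python
import os
from typing import Dict, FrozenSet, Optional


def sketch_parse_allowed_origins(raw: Optional[str] = None) -> FrozenSet[str]:
    """Split a comma-separated origin list, defaulting to ALLOWED_MCP_ORIGINS."""
    if raw is None:
        raw = os.environ.get("ALLOWED_MCP_ORIGINS", "")
    return frozenset(part.strip() for part in raw.split(",") if part.strip())


def sketch_build_cors_headers(origin: Optional[str], allowed: FrozenSet[str]) -> Dict[str, str]:
    """Return CORS headers for an allowlisted origin, or no headers at all."""
    if not origin or origin not in allowed:
        return {}
    return {
        "Access-Control-Allow-Origin": origin,  # Echo the exact origin, never "*"
        "Vary": "Origin",                       # Keep caches from mixing origins
        "Access-Control-Allow-Methods": "POST, OPTIONS",
        "Access-Control-Allow-Headers": "Authorization, Content-Type",
    }
```

With an empty allowlist every origin is rejected, which matches the "Empty (no CORS)" default for `ALLOWED_MCP_ORIGINS`.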
| Env Variable | Description | Default |
|---|---|---|
| `DATABASE_URL` | PostgreSQL connection string (for `PostgresTokenStorage`) | Required for postgres |
| `ALLOWED_MCP_ORIGINS` | Comma-separated allowed CORS origins | Empty (no CORS) |
License: MIT