[ARTICLE · art-15497] src=github.com pub= topic=ai-tools verified=true sentiment=· neutral

Show HN: OAuth 2.0 framework for MCP servers

A new open-source Python framework, mcp-authflow, provides an OAuth 2.0 authorization server for Model Context Protocol (MCP) servers, enabling token issuance and management to protect MCP tool access. The framework supports RFC 6749, RFC 7523, RFC 7636 PKCE, and RFC 8628 Device Authorization Grant, with PostgreSQL and in-memory token storage, sliding-window rate limiting, and CORS helpers. Developers can install the package via pip and build an authorization server that issues tokens for MCP clients, with endpoints for token issuance and introspection.

5 min read · published May 27, 2026

OAuth 2.0 Authorization Server framework for MCP servers. Issue and manage tokens that protect MCP tool access.

Pair with mcp-authflow-resource on the resource server side.

  • Token storage with PostgreSQL and in-memory backends
  • RFC 6749 standardized OAuth error responses
  • RFC 7523 private_key_jwt client authentication with algorithm allowlist and JTI replay protection (Redis or in-memory)
  • RFC 7636 PKCE verification (S256 + plain) and input validation for the token endpoint
  • RFC 8628 Device Authorization Grant: sans-IO polling state machine and code generators
  • Sliding-window rate limiting for token endpoints
  • Input validation for client IDs and scopes
  • CORS helpers with origin allowlisting
  • Async-first design, built on Starlette
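To make the PKCE item concrete, here is a minimal standard-library sketch of the S256 and plain checks defined by RFC 7636. It is not the package's own code: the function names `make_s256_challenge` and `verify_pkce` are hypothetical and chosen only for illustration.

```python
import base64
import hashlib
import hmac
import secrets


def make_s256_challenge(code_verifier: str) -> str:
    """Derive the S256 code_challenge from a code_verifier (RFC 7636, section 4.2)."""
    digest = hashlib.sha256(code_verifier.encode("ascii")).digest()
    # base64url encoding with the '=' padding stripped, as the RFC requires
    return base64.urlsafe_b64encode(digest).rstrip(b"=").decode("ascii")


def verify_pkce(code_verifier: str, code_challenge: str, method: str = "S256") -> bool:
    """Check a verifier against the challenge stored with the authorization code."""
    try:
        if method == "S256":
            expected = make_s256_challenge(code_verifier)
        elif method == "plain":
            expected = code_verifier
        else:
            return False  # unknown challenge method
        # Constant-time comparison avoids leaking how many characters matched
        return hmac.compare_digest(expected.encode("ascii"), code_challenge.encode("ascii"))
    except UnicodeEncodeError:
        return False  # verifiers and challenges must be ASCII


# Client side: generate a verifier, send only the challenge in the authorization request
verifier = secrets.token_urlsafe(48)  # 64 characters, inside the 43-128 range the RFC allows
challenge = make_s256_challenge(verifier)
```

The client keeps `verifier` secret until the token request; the server stores `challenge` with the authorization code and runs the equivalent of `verify_pkce` before issuing a token.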

pip install mcp-authflow

With PostgreSQL support (quoted so shells such as zsh do not expand the brackets):

pip install "mcp-authflow[postgres]"

Build an OAuth authorization server that issues tokens for MCP clients:

import secrets
import time
from contextlib import asynccontextmanager

from starlette.applications import Starlette
from starlette.requests import Request
from starlette.responses import JSONResponse
from starlette.routing import Route

from mcp_authflow.rate_limiting import SlidingWindowRateLimiter
from mcp_authflow.responses import invalid_request, rate_limit_exceeded
from mcp_authflow.storage import MemoryTokenStorage
from mcp_authflow.validation import parse_scope_field, validate_client_id


storage = MemoryTokenStorage()  # Use PostgresTokenStorage for production
limiter = SlidingWindowRateLimiter(requests_per_window=60, window_seconds=3600)


async def token_endpoint(request: Request) -> JSONResponse:
    form = await request.form()
    client_id = str(form.get("client_id", ""))

    if not limiter.is_allowed(client_id):
        return rate_limit_exceeded(
            "Too many requests",
            retry_after=limiter.get_retry_after(client_id),
        )

    if not validate_client_id(client_id):
        return invalid_request("Invalid client_id format")

    token = secrets.token_urlsafe(32)
    scopes = parse_scope_field(form.get("scope"))
    expires_at = int(time.time()) + 3600

    await storage.store_token(
        token=token,
        client_id=client_id,
        scopes=scopes.split(),
        expires_at=expires_at,
        resource=str(form.get("resource", "")),
    )

    return JSONResponse({
        "access_token": token,
        "token_type": "bearer",
        "expires_in": 3600,
        "scope": scopes,
    })


async def introspect_endpoint(request: Request) -> JSONResponse:
    form = await request.form()
    token = str(form.get("token", ""))

    token_data = await storage.load_token(token)
    if not token_data or token_data["expires_at"] < time.time():
        return JSONResponse({"active": False})

    return JSONResponse({
        "active": True,
        "client_id": token_data["client_id"],
        "scope": " ".join(token_data["scopes"]),
        "exp": token_data["expires_at"],
        "aud": token_data.get("resource", ""),
    })

@asynccontextmanager
async def lifespan(app):
    await storage.initialize()
    yield
    await storage.close()

app = Starlette(
    routes=[
        Route("/token", token_endpoint, methods=["POST"]),
        Route("/introspect", introspect_endpoint, methods=["POST"]),
    ],
    lifespan=lifespan,
)

Run with: uvicorn myapp:app --port 8000
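Once the server is running, a client can exercise both endpoints with form-encoded POST requests. The sketch below only builds the requests with the standard library and does not send them; the helper names are hypothetical, and the base URL assumes the uvicorn command above on localhost.

```python
import urllib.parse
import urllib.request

FORM_HEADERS = {"Content-Type": "application/x-www-form-urlencoded"}


def build_token_request(base_url: str, client_id: str, scope: str) -> urllib.request.Request:
    """Form-encoded POST for the quick-start /token endpoint."""
    body = urllib.parse.urlencode({"client_id": client_id, "scope": scope}).encode("ascii")
    return urllib.request.Request(f"{base_url}/token", data=body, headers=FORM_HEADERS, method="POST")


def build_introspect_request(base_url: str, token: str) -> urllib.request.Request:
    """Form-encoded POST for the quick-start /introspect endpoint."""
    body = urllib.parse.urlencode({"token": token}).encode("ascii")
    return urllib.request.Request(f"{base_url}/introspect", data=body, headers=FORM_HEADERS, method="POST")


token_req = build_token_request("http://localhost:8000", "my-client-123", "read write")
introspect_req = build_introspect_request("http://localhost:8000", "abc")

# To send a request against a running server:
#   import json
#   with urllib.request.urlopen(token_req) as resp:
#       payload = json.load(resp)
```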

                         MCP Client (Claude, etc.)
                                |
                  1. Authorization request
                                |
                                v
                    +---------------------+
                    |   Auth Server       |   <-- this package
                    |   (mcp-authflow)    |
                    |                     |
                    |  /token             |   2. Issues access token
                    |  /introspect        |   4. Validates token
                    +---------------------+
                                ^
                                |
                     4. Token introspection (RFC 7662)
                                |
                    +---------------------+
                    |   Resource Server   |   <-- mcp-authflow-resource
                    |   (MCP tools)       |
                    |                     |
                    |  3. Client calls    |
                    |     MCP tools with  |
                    |     Bearer token    |
                    +---------------------+
  • MCP client authenticates with the auth server
  • Auth server issues an access token (stored in PostgreSQL or memory)
  • Client calls MCP tools on the resource server with the Bearer token
  • Resource server validates the token by calling the auth server's /introspect endpoint
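The last step can be sketched from the resource server's point of view. Assuming an introspection response shaped like the quick-start example (`active`, `scope`, `exp`, `aud`), a hypothetical helper might decide whether to accept the token as follows. This is not the mcp-authflow-resource API, and accepting or rejecting tokens with no audience is a policy choice made explicit here.

```python
import time


def is_token_acceptable(
    introspection: dict,
    required_scope: str,
    expected_audience: str,
    allow_unbound: bool = False,
) -> bool:
    """Decide whether an introspection response authorizes a tool call."""
    if not introspection.get("active"):
        return False
    exp = introspection.get("exp")
    if exp is not None and exp < time.time():
        return False  # defensive re-check; the auth server already tests expiry
    granted = set((introspection.get("scope") or "").split())
    if required_scope not in granted:
        return False
    audience = introspection.get("aud") or ""
    if not audience:
        # Token was not bound to a resource; accept only if policy allows it
        return allow_unbound
    return audience == expected_audience


response = {
    "active": True,
    "client_id": "my-client-123",
    "scope": "read write",
    "exp": int(time.time()) + 60,
    "aud": "https://tools.example.com",  # hypothetical resource identifier
}
allowed = is_token_acceptable(response, "read", "https://tools.example.com")
```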

Abstract base class with two implementations:

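from mcp_authflow.storage import MemoryTokenStorage, PostgresTokenStorage

# In-memory backend (development and tests)
storage = MemoryTokenStorage()

# PostgreSQL backend with an explicit connection string
storage = PostgresTokenStorage(database_url="postgresql://user:pass@host/db")
# No argument: presumably falls back to the DATABASE_URL environment variable
storage = PostgresTokenStorage()

await storage.initialize()  # Open the connection pool (in-memory needs no setup)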

PostgresTokenStorage does not create or migrate its schema. It expects the tables to already exist, so you stay in control of migrations. Apply this DDL (e.g. via your migration tool) before first use:

CREATE TABLE IF NOT EXISTS mcp_access_tokens (
    token       TEXT PRIMARY KEY,
    client_id   TEXT NOT NULL,
    scopes      TEXT NOT NULL DEFAULT '',
    resource    TEXT,
    expires_at  TIMESTAMPTZ NOT NULL,
    created_at  TIMESTAMPTZ NOT NULL DEFAULT now(),
    user_id     INTEGER
);

-- Only needed if you use the refresh-token methods.
CREATE TABLE IF NOT EXISTS mcp_refresh_tokens (
    token       TEXT PRIMARY KEY,
    client_id   TEXT NOT NULL,
    scopes      TEXT NOT NULL DEFAULT '',
    resource    TEXT,
    expires_at  TIMESTAMPTZ NOT NULL,
    created_at  TIMESTAMPTZ NOT NULL DEFAULT now(),
    user_id     INTEGER
);

Storage interface:

Method                                                                  Description
store_token(token, client_id, scopes, expires_at, resource?, user_id?)  Store an access token
load_token(token) -> dict | None                                        Look up a token
delete_token(token)                                                     Revoke a token
cleanup_expired_tokens() -> int                                         Purge expired tokens, returns count
get_token_count() -> int                                                Count active tokens
store_refresh_token(...)                                                Store a refresh token (same interface)
load_refresh_token(token) -> dict | None                                Look up a refresh token
delete_refresh_token(token)                                             Revoke a refresh token
cleanup_expired_refresh_tokens() -> int                                 Purge expired refresh tokens, returns count
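To show how the access-token half of this interface fits together, here is a minimal dict-backed sketch. `DictTokenStorage` is a hypothetical stand-in written for illustration, not the library's MemoryTokenStorage. The method names and parameters follow the table above, and the assumption that the methods are coroutines comes from the `await` calls in the quick start.

```python
import asyncio
import time
from typing import List, Optional


class DictTokenStorage:
    """Hypothetical dict-backed storage mirroring the access-token methods."""

    def __init__(self) -> None:
        self._tokens = {}  # token string -> token data dict

    async def store_token(
        self,
        token: str,
        client_id: str,
        scopes: List[str],
        expires_at: int,
        resource: Optional[str] = None,
        user_id: Optional[int] = None,
    ) -> None:
        self._tokens[token] = {
            "token": token,
            "client_id": client_id,
            "scopes": list(scopes),
            "resource": resource,
            "expires_at": expires_at,
            "created_at": int(time.time()),
            "user_id": user_id,
        }

    async def load_token(self, token: str) -> Optional[dict]:
        return self._tokens.get(token)

    async def delete_token(self, token: str) -> None:
        self._tokens.pop(token, None)

    async def cleanup_expired_tokens(self) -> int:
        now = time.time()
        expired = [t for t, data in self._tokens.items() if data["expires_at"] < now]
        for t in expired:
            del self._tokens[t]
        return len(expired)

    async def get_token_count(self) -> int:
        return len(self._tokens)


async def demo() -> tuple:
    storage = DictTokenStorage()
    await storage.store_token("live", "my-client-123", ["read"], int(time.time()) + 3600)
    await storage.store_token("stale", "my-client-123", ["read"], int(time.time()) - 10)
    purged = await storage.cleanup_expired_tokens()
    return purged, await storage.get_token_count(), await storage.load_token("stale")


result = asyncio.run(demo())
```

Note that `load_token` in this sketch returns expired entries until they are purged, which is why the quick-start introspection endpoint compares `expires_at` against the current time itself.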

Token data returned by load_token():

{
    "token": str,
    "client_id": str,
    "scopes": list[str],
    "resource": str | None,       # RFC 8707 resource binding
    "expires_at": int,            # Unix timestamp
    "created_at": int,            # Unix timestamp
    "user_id": int | None,
}
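The DDL stores `expires_at` and `created_at` as TIMESTAMPTZ and `scopes` as TEXT, while the dictionary above uses Unix integers and a list. The source does not say how PostgresTokenStorage converts between the two. One plausible mapping, assuming scopes are stored space-separated, is sketched below with hypothetical helper names.

```python
from datetime import datetime, timezone


def row_to_token_data(row: dict) -> dict:
    """Convert a database row (as a dict) into the load_token() shape."""
    return {
        "token": row["token"],
        "client_id": row["client_id"],
        "scopes": row["scopes"].split(),  # assumption: space-separated TEXT
        "resource": row["resource"],
        "expires_at": int(row["expires_at"].timestamp()),
        "created_at": int(row["created_at"].timestamp()),
        "user_id": row["user_id"],
    }


def to_timestamptz(unix_seconds: int) -> datetime:
    """Timezone-aware UTC datetime suitable for a TIMESTAMPTZ parameter."""
    return datetime.fromtimestamp(unix_seconds, tz=timezone.utc)


row = {
    "token": "abc",
    "client_id": "my-client-123",
    "scopes": "read write",
    "resource": None,
    "expires_at": to_timestamptz(1_800_000_000),
    "created_at": to_timestamptz(1_799_996_400),
    "user_id": None,
}
token_data = row_to_token_data(row)
```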

Standardized error helpers following RFC 6749:

from mcp_authflow.responses import (
    invalid_request,       # 400 - Missing/invalid parameters
    invalid_client,        # 401 - Authentication failure
    invalid_grant,         # 400 - Expired/invalid code or token
    invalid_scope,         # 400 - Scope violation
    slow_down,             # 400 - Device flow rate limiting
    rate_limit_exceeded,   # 429 - Too many requests
    server_error,          # 500 - Internal error
    backend_timeout,       # 504 - Upstream timeout
)

Each returns a Starlette JSONResponse with the appropriate status code and a Cache-Control: no-store header.
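For readers unfamiliar with the wire format, the sketch below shows roughly what such a helper would produce, following RFC 6749 section 5.2: a JSON body with `error` and `error_description`, plus the no-store header. `oauth_error` is a hypothetical function using only the standard library. The package's helpers return Starlette responses, and their exact bodies are not shown in the source.

```python
import json
from typing import Dict, Tuple


def oauth_error(error: str, description: str, status: int = 400) -> Tuple[int, Dict[str, str], bytes]:
    """Build the status, headers, and JSON body of an OAuth 2.0 error response."""
    body = json.dumps({"error": error, "error_description": description}).encode("utf-8")
    headers = {
        "Content-Type": "application/json",
        "Cache-Control": "no-store",  # error responses must not be cached
    }
    return status, headers, body


status, headers, body = oauth_error("invalid_grant", "Authorization code has expired")
client_status, _, _ = oauth_error("invalid_client", "Unknown client", status=401)
```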

from mcp_authflow.rate_limiting import SlidingWindowRateLimiter

limiter = SlidingWindowRateLimiter(
    requests_per_window=60,   # Max requests per window
    window_seconds=3600,      # Window duration (1 hour)
)

if not limiter.is_allowed(client_id):
    retry_after = limiter.get_retry_after(client_id)  # Seconds until next allowed request
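The sliding-window technique itself is simple to sketch: keep the timestamps of recent requests per key, drop those older than the window, and allow a request only while fewer than the limit remain. The class below is a hypothetical illustration of that idea, not the package's SlidingWindowRateLimiter. It mirrors the two method names used above and adds an injectable clock (an assumption) so the behaviour can be tested without waiting.

```python
import math
import time
from collections import defaultdict, deque


class SketchSlidingWindowLimiter:
    """Illustrative in-memory sliding-window limiter keyed by client."""

    def __init__(self, requests_per_window: int, window_seconds: float, clock=time.monotonic):
        self.limit = requests_per_window
        self.window = window_seconds
        self.clock = clock
        self._hits = defaultdict(deque)  # key -> timestamps of accepted requests

    def _prune(self, key: str, now: float) -> deque:
        hits = self._hits[key]
        # Drop timestamps that have slid out of the window
        while hits and hits[0] <= now - self.window:
            hits.popleft()
        return hits

    def is_allowed(self, key: str) -> bool:
        now = self.clock()
        hits = self._prune(key, now)
        if len(hits) >= self.limit:
            return False
        hits.append(now)
        return True

    def get_retry_after(self, key: str) -> int:
        now = self.clock()
        hits = self._prune(key, now)
        if len(hits) < self.limit or not hits:
            return 0
        # The next slot opens when the oldest recorded request leaves the window
        return max(0, math.ceil(hits[0] + self.window - now))
```

Unlike a fixed window, this never lets a client send a full burst at the end of one window and another at the start of the next, at the cost of storing one timestamp per accepted request.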
Input validation for client IDs and scopes:

from mcp_authflow.validation import validate_client_id, parse_scope_field

validate_client_id("my-client-123")  # True (alphanumeric + hyphens/underscores)
validate_client_id("")               # False

parse_scope_field("read write")      # "read write"
parse_scope_field(["read", "write"]) # "read write"
parse_scope_field(None)              # "read" (default)
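The behaviour documented in those comments can be approximated in a few lines. The functions below are hypothetical re-implementations for illustration only. The 128-character cap is an assumption, since the source states only that client IDs are alphanumeric plus hyphens and underscores.

```python
import re

# Assumption: length cap of 128; the source only documents the allowed characters
_CLIENT_ID_RE = re.compile(r"[A-Za-z0-9_-]{1,128}")


def sketch_validate_client_id(client_id) -> bool:
    """True for non-empty IDs made of letters, digits, hyphens, and underscores."""
    return isinstance(client_id, str) and bool(_CLIENT_ID_RE.fullmatch(client_id))


def sketch_parse_scope_field(value, default: str = "read") -> str:
    """Normalize a scope field (string, list, or None) to a space-separated string."""
    if value is None:
        return default
    if isinstance(value, (list, tuple)):
        parts = [str(item) for item in value]
    else:
        parts = str(value).split()
    return " ".join(part for part in parts if part) or default
```

Validating the client ID before using it as a rate-limiter or storage key keeps arbitrary attacker-chosen strings out of those structures.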
CORS helpers with origin allowlisting:

from mcp_authflow.cors import parse_allowed_origins, build_cors_headers

# Allowed origins; presumably read from the ALLOWED_MCP_ORIGINS variable listed below
origins = parse_allowed_origins()

# CORS headers for this request, given the allowlist
headers = build_cors_headers(request, origins)

Env Variable          Description                                               Default
DATABASE_URL          PostgreSQL connection string (for PostgresTokenStorage)   Required for postgres
ALLOWED_MCP_ORIGINS   Comma-separated allowed CORS origins                      Empty (no CORS)
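Origin allowlisting generally means echoing the request's Origin header back only when it appears in a configured set. The sketch below illustrates that pattern with hypothetical functions. Unlike the package's build_cors_headers, which takes the request object, this version takes the Origin string directly, and the exact headers the package emits are not shown in the source.

```python
import os
from typing import Dict, Optional, Set


def sketch_parse_allowed_origins(raw: Optional[str] = None) -> Set[str]:
    """Parse a comma-separated origin list; defaults to ALLOWED_MCP_ORIGINS."""
    if raw is None:
        raw = os.environ.get("ALLOWED_MCP_ORIGINS", "")
    return {item.strip().rstrip("/") for item in raw.split(",") if item.strip()}


def sketch_build_cors_headers(origin: Optional[str], allowed: Set[str]) -> Dict[str, str]:
    """Return CORS headers for an allowlisted origin, or no headers at all."""
    if not origin or origin.rstrip("/") not in allowed:
        return {}  # empty allowlist or unknown origin: no CORS
    return {
        "Access-Control-Allow-Origin": origin,
        "Vary": "Origin",  # caches must not reuse this response for other origins
        "Access-Control-Allow-Methods": "POST, OPTIONS",
        "Access-Control-Allow-Headers": "Authorization, Content-Type",
    }


allowed = sketch_parse_allowed_origins("https://app.example.com, https://admin.example.com/")
headers = sketch_build_cors_headers("https://app.example.com", allowed)
```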

License: MIT
