# SchemaFlow: Agentic Database Change Impact Analysis, SQL Gen and Eval Guardrails

OpenAI released SchemaFlow, an AI-assisted database change workflow built on the OpenAI Agents SDK. It converts natural-language requests into structured JSON, performs impact analysis, generates SQL, and validates the output with guardrails. The system aims to reduce the risk of database changes by producing auditable bundles with traceable analysis and implementation artifacts.

This cookbook walks through an end-to-end AI-assisted database change workflow using the OpenAI Agents SDK. It demonstrates how OpenAI’s tooling ecosystem can be applied to orchestrate complex, data-intensive workflows across modern enterprise infrastructures. The current implementation focuses on a retail-oriented schema change and impact-analysis use case, but the underlying architectural patterns are domain-agnostic and extensible. The same workflow design can be adapted across industries such as manufacturing, pharmaceuticals, healthcare, logistics, finance, and supply chain operations, wherever structured data workflows, operational reasoning, retrieval-augmented analysis, and automated validation are required.

The running example is a retail loyalty-tier change, but the same pattern applies to many database-change requests where teams need traceable impact analysis and reviewable implementation output.

The workflow starts from a natural-language database change request, converts it into structured JSON, optionally grounds impact analysis with PDF-based File Search context, generates a safe rollout plan, drafts SQL across data platform layers, validates the output with deterministic guardrails, saves a reusable artifact, and optionally evaluates the flow with Promptfoo.

The notebook is intentionally self-contained: all core workflow logic, prompts, guardrails, artifact generation, and eval runtime files are created from notebook cells.

## Overview

Schema changes are deceptively simple.
A request like “add a nullable column and backfill it” can affect landing tables, staging models, dimensional tables, marts, reporting logic, lineage assumptions, validation checks, rollback procedures, and release sequencing. The examples use retail customer data because the dependencies are easy to see, but the same kinds of handoffs show up in many analytics and platform teams.

This cookbook demonstrates a practical pattern for using agents as a change-analysis and implementation assistant for database engineering work. Instead of asking one model to produce a final SQL script directly, the workflow breaks the task into explicit stages:

- Parse the natural-language request into structured JSON.
- Analyze impacted objects and operational risks.
- Create a rollout plan with prechecks, postchecks, and rollback guidance.
- Generate SQL across platform layers.
- Run deterministic sanity checks.
- Save a machine-readable artifact.
- Optionally run Promptfoo evals against the current flow.

The result is not just a generated SQL script. It is an auditable bundle containing the interpreted change request, impact analysis, plan, SQL, validation results, optional RAG evidence summaries, and eval outputs.

## Why This Matters

Database change requests often move through several handoffs: product owners describe the need, data engineers interpret it, platform teams assess risk, analytics engineers propagate the field downstream, and reviewers check whether the change is safe. Important context can be lost at each step.

SchemaFlow addresses this by turning a free-form change request into a structured, inspectable workflow.

This matters because database changes can create hidden failure modes:

- A column added to ODS may not be propagated into staging, core, or marts.
- A nullable field may accidentally be generated as `NOT NULL`.
- Backfill logic may be omitted even though the request asks for historical population.
- Index requirements may be missed.
- Downstream reporting dependencies may be unknown unless reference documentation is consulted.
- Generated SQL may look plausible but fail basic consistency checks.

This cookbook shows a pattern for reducing those risks with staged agent reasoning, typed outputs, optional retrieval context, deterministic guardrails, saved artifacts, and repeatable evals.

## Key Benefits

- Structured interpretation: Converts natural-language database requests into a normalized `change_json` contract.
- Separation of responsibilities: Uses specialized agents for parse, impact analysis, rollout planning, and SQL generation.
- Optional RAG grounding: Lets the impact-analysis agent use File Search over an uploaded PDF, such as an IFD, schema spec, or lineage document.
- Typed stage outputs: Uses Pydantic models and Agents SDK output schemas for parse, impact, and plan stages.
- Guardrail-first workflow: Adds deterministic checks between stages so obvious failures are caught before downstream steps consume bad state.
- Traceability: Emits OpenAI Agents SDK traces and spans for agent runs, guardrails, artifact generation, and eval execution.
- Portable artifacts: Saves the final workflow bundle as JSON under `artifacts/notebook_runs/`.
- Eval-ready design: Generates Promptfoo provider, assertion, config, and result files from the live notebook state.
- No database side effects: Produces draft SQL and validation output without executing against a live database.
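The guardrail-first idea can be sketched without any agent calls. The example below is a minimal illustration under stated assumptions, not the notebook's implementation: `run_staged`, `fake_parse`, `fake_sql`, and `check_parse` are hypothetical names, and plain functions stand in for the agents. It shows one way a deterministic check can sit between stages so that a later stage never runs on bad state.

```python
from typing import Callable

# Hypothetical stand-ins: each "stage" is a plain function, not an agent call.
Stage = Callable[[dict], dict]
Check = Callable[[dict], list]


def run_staged(state: dict, stages: list[tuple[str, Stage, Check]]) -> dict:
    """Run (name, stage, check) triples in order and stop at the first failed gate."""
    for name, stage, check in stages:
        output = stage(state)
        failures = check(output)
        state = {**state, name: output}
        if failures:
            # Record why the run stopped so a reviewer can see the failing gate.
            return {**state, "stopped_at": name, "failures": failures}
    return {**state, "stopped_at": None, "failures": []}


def fake_parse(state: dict) -> dict:
    # A real parse stage would call a model; this stub returns a deliberately incomplete result.
    return {"target_schema": "ODS", "target_table": None, "operations": []}


def check_parse(output: dict) -> list:
    failures = []
    if not output.get("target_table"):
        failures.append("missing target_table")
    if not output.get("operations"):
        failures.append("no operations parsed")
    return failures


def fake_sql(state: dict) -> dict:
    return {"sql": "ALTER TABLE ..."}


result = run_staged(
    {"request": "add a nullable column"},
    [("parse", fake_parse, check_parse), ("sql", fake_sql, lambda out: [])],
)
print(result["stopped_at"], result["failures"])
```

Because the parse gate fails, the SQL stage is never invoked, which is the property the staged design is after.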
## What You’ll Build

By the end of this notebook, you will have a working SchemaFlow pipeline that produces:

- A parsed database change request:
  - title
  - domain
  - target schema
  - target table
  - normalized operations
  - notes
- An impact-analysis report:
  - impacted tables, columns, indexes, views, or relationships
  - risks
  - assumptions
  - optional File Search evidence summaries
- A rollout plan:
  - implementation steps
  - prechecks
  - postchecks
  - rollback actions
- A draft SQL script with four required sections:
  - `LANDING (ODS)`
  - `STAGING (STG)`
  - `CORE (DIM/FACT/VIEW)`
  - `MARTS (SERVING)`
- A validation result:
  - expected table checks
  - expected column checks
  - required keyword checks such as `ALTER TABLE`, `UPDATE`, or `CREATE INDEX`
- A saved JSON artifact:
  - change request
  - impact analysis
  - plan
  - SQL
  - validation
  - optional RAG metadata
- A Promptfoo eval harness:
  - Python provider
  - Python assertion file
  - generated Promptfoo config
  - parse-only eval case
  - full-flow eval case
  - timestamped JSON and HTML eval reports

## Introduction: Use Case and Solution

This cookbook focuses on a common enterprise data-engineering scenario: a stakeholder requests a database schema change in natural language, and the data team needs to turn that request into an implementation-ready plan. Here, the retail domain is just a concrete way to make the workflow tangible. The same staged approach can be adapted to other source systems, data products, and review processes.

The default request in this notebook is:

> Add LOYALTY_TIER VARCHAR(20) to ODS.ODS_CUSTOMER_PROFILE as nullable. Backfill from CORE.DIM_CUSTOMER on CUSTOMER_ID where IS_CURRENT=true. Add a non-unique index on (CUSTOMER_ID, LOYALTY_TIER).

A human data engineer would typically need to answer several questions before writing production SQL:

- What table and schema are being changed?
- What exact column, type, and nullability were requested?
- Is historical backfill required?
- Does the request imply an index?
- Which downstream layers need the field propagated?
- What risks should reviewers look for?
- What checks should be run before and after deployment?
- What rollback steps are reasonable?
- Does the generated SQL include the required elements?

SchemaFlow implements this as a staged agent workflow. Each stage creates a typed intermediate output that the next stage consumes. Deterministic checks then validate the outputs before the notebook saves the final bundle and optionally runs evals.

## Workflow Overview

At a high level, SchemaFlow follows this sequence: parse the request, analyze impact, plan the rollout, generate SQL, run deterministic sanity checks, save the artifact, and optionally run Promptfoo evals.

The notebook is organized so readers can run the core workflow first and then decide whether they want to run the optional Promptfoo evaluation section.

## Table of Contents

- Conceptual Guide
  - [Overview](#overview)
  - [Why This Matters](#why-this-matters)
  - [Key Benefits](#key-benefits)
  - [What You’ll Build](#what-youll-build)
  - [Introduction: Use Case and Solution](#introduction-use-case-and-solution)
  - [Workflow Overview](#workflow-overview)
  - [Architecture - Design Patterns](#architecture-design-patterns)
  - [System Design](#system-design)
  - [Execution Workflow](#execution-workflow)
- Notebook Implementation
  - [Environment Setup](#environment-setup)
  - [Input](#input)
  - [Optional PDF RAG Context](#optional-pdf-rag-context)
  - [Stages 1-2: Parse Change Request + Impact Analysis](#stages-1-2-parse-change-request--impact-analysis)
  - [Stages 3-4: Execution Plan + SQL Generation](#stages-3-4-execution-plan--sql-generation)
  - [Stage 5: Lightweight SQL Sanity Checks](#stage-5-lightweight-sql-sanity-checks)
  - [Final Bundle](#final-bundle)
  - [Save Artifact](#save-artifact)
  - [Optional Cleanup](#optional-cleanup)
  - [Evaluate the Flow with Promptfoo](#evaluate-the-flow-with-promptfoo)
- Reference

## Architecture - Design Patterns

SchemaFlow uses a staged, contract-driven agent architecture. The goal is to avoid treating the model as a single black-box SQL generator. Instead, each stage has a narrow responsibility and produces an output that can be inspected, validated, traced, and reused.

### 1. Agent Specialization

Each agent performs one primary task:

| Agent | Responsibility | Main Output |
|---|---|---|
| Parse Agent | Extract structured fields from the natural-language request | `change_json` |
| Impact Agent | Identify affected objects, assumptions, and risks | `impact_json` |
| Plan Agent | Convert the change and impact into rollout steps | `plan_json` |
| SQL Agent | Draft SQL across data platform layers | `sql_text` |

This specialization makes the workflow easier to debug. If SQL is missing a column, you can inspect whether the issue started in parsing, impact analysis, planning, or SQL generation.

### 2. Typed Output Contracts

The notebook defines Pydantic models for the structured stages:

- `ChangeRequestModel`
- `ImpactModel`
- `PlanModel`

Those models are wrapped with `AgentOutputSchema` so the Agents SDK knows the expected output shape. The workflow also normalizes outputs after model calls to ensure expected keys exist before downstream stages run.

### 3. Retrieval-Augmented Impact Analysis

The PDF RAG section is optional. When `PDF_PATH` is set, the notebook:

- Creates an OpenAI vector store.
- Uploads the PDF.
- Lets OpenAI parse, chunk, embed, and index it.
- Gives the Impact Agent a `FileSearchTool`.
- Captures a summary of returned File Search results.

This is useful when the change request needs grounding in an IFD, schema document, lineage file, data contract, or architecture reference.

### 4. Guardrail Gates Between Stages

The notebook adds deterministic checks after major stages:

- Stages 1-2 guardrails validate parse and impact outputs.
- Stages 3-4 guardrails validate plan completeness, data type propagation, and nullability handling.
- Stage 5 SQL checks validate expected table, column, and SQL keyword presence.
- Post-artifact checks verify the saved JSON artifact exists and round-trips.
- Pre-Promptfoo checks verify the notebook state is ready for evals.

These checks do not replace human review, but they catch common silent failures early.

### 5. Artifact-Centered Execution

The final bundle is the main workflow artifact. It captures the state needed to review or debug the run:

```python
bundle = {
    "summary": ...,
    "rag": ...,
    "change_json": ...,
    "impact_json": ...,
    "plan": ...,
    "sql": ...,
    "validation": ...,
}
```

The notebook saves this bundle under `artifacts/notebook_runs/`.

### 6. Eval Runtime Generated from Notebook State

Promptfoo runs in a separate process, so it cannot directly read variables from the active notebook kernel. To solve this, Section 10 writes a small reusable Python module and Promptfoo runtime files from the current notebook state.

This ensures that prompt edits, `CHANGE_TEXT` edits, and model configuration changes are reflected when the eval files are regenerated.

## System Design

### Component Architecture

### Primary Runtime Objects

| Object | Created in | Purpose |
|---|---|---|
| `CHANGE_TEXT` | Input section | The natural-language database change request |
| `change_json` | Stage 1 | Structured interpretation of the request |
| `rag_vector_store_id` | Optional PDF RAG section | Hosted vector store ID for uploaded PDF context |
| `rag_file_search_results` | Stage 2 | Summary of File Search results returned to the Impact Agent |
| `impact_json` | Stage 2 | Impacted objects, risks, and assumptions |
| `plan_json` | Stage 3 | Rollout plan, checks, and rollback guidance |
| `sql_text` | Stage 4 | Draft SQL script |
| `validation` | Stage 5 | Deterministic SQL sanity-check result |
| `bundle` | Final Bundle section | Consolidated workflow output |
| `out_path` | Save Artifact section | Saved JSON artifact path |
| `promptfoo_config` | Promptfoo section | Generated eval configuration |

### Important Boundary

SchemaFlow generates draft implementation artifacts. It does not execute SQL against a database, apply migrations, open pull requests, or modify production systems.

## Execution Workflow

Run the notebook in order.

### Core Workflow

- Environment Setup
  - Imports dependencies.
  - Verifies the OpenAI Agents SDK version.
  - Reads `OPENAI_API_KEY`.
  - Configures tracing and model selection.
- Input
  - Defines `CHANGE_TEXT`.
  - This is the only required business input for the core workflow.
- Optional PDF RAG Context
  - Leave `PDF_PATH = None` to run without retrieval.
  - Set `PDF_PATH` to a local PDF to enable File Search context for impact analysis.
- Stages 1-2
  - Parse the change request.
  - Analyze impact.
  - Optionally use File Search during impact analysis.
- Stages 1-2 Guardrails
  - Confirm parse output is well-formed.
  - Confirm impact output includes the target.
  - Confirm impacted objects contain required fields.
- Stages 3-4
  - Generate an execution plan.
  - Generate SQL across landing, staging, core, and mart layers.
- Stages 3-4 Guardrails
  - Confirm plan sections are populated.
  - Confirm data type propagation.
  - Confirm nullability behavior matches the request.
- Stage 5 SQL Sanity Checks
  - Check for empty SQL.
  - Check expected target table and columns.
  - Check required SQL actions implied by the request.
- Final Bundle and Artifact
  - Assemble the full output bundle.
  - Save it as JSON.
  - Verify the artifact round-trips successfully.

### Optional Eval Workflow

- Pre-Promptfoo Checks
  - Confirm the notebook state is ready for evals.
- Promptfoo Runtime Generation
  - Create a reusable SchemaFlow core module.
  - Write a Promptfoo provider.
  - Write a Promptfoo assertion file.
  - Generate Promptfoo test cases and config.
- Promptfoo Eval Execution
  - Run parse-only and full-flow evals.
  - Save timestamped JSON and HTML reports.
  - Refresh `schemaflow_cookbook_eval_latest.*` aliases.

## 1 Environment Setup

This section prepares the runtime for the SchemaFlow workflow.

The setup cell does the following:

- Imports standard Python utilities used throughout the notebook.
- Imports the OpenAI client.
- Imports the OpenAI Agents SDK primitives:
  - `Agent`
  - `Runner`
  - `RunConfig`
  - `AgentOutputSchema`
  - `FileSearchTool`
  - tracing and span helpers
- Verifies that the installed `openai-agents` package meets the minimum required version.
- Reads `OPENAI_API_KEY` from the environment or prompts for it.
- Sets the model with `OPENAI_MODEL`, defaulting to `gpt-5.5`.
- Creates a trace group ID so all related agent runs and guardrail spans can be grouped together.

The workflow can include sensitive trace payloads so that prompts, outputs, eval bundles, and tool data are visible in traces. In the setup code this is controlled by `OPENAI_AGENTS_TRACE_INCLUDE_SENSITIVE_DATA`, which is treated as `false` when the variable is unset. For production usage, review this setting before handling private data.

```python
%pip install --quiet -U "openai" "openai-agents>=0.17.0"
```

```python
import os
import json
import re
import uuid
from datetime import datetime, timezone
from getpass import getpass
from importlib.metadata import PackageNotFoundError, version

try:
    from openai import OpenAI
except Exception as e:
    raise RuntimeError("Install dependency first: pip install -U openai") from e

MIN_AGENTS_SDK_VERSION = "0.17.0"

try:
    from agents import (
        Agent,
        AgentOutputSchema,
        FileSearchTool,
        Runner,
        RunConfig,
        custom_span,
        flush_traces,
        function_span,
        guardrail_span,
        trace,
    )
except Exception as e:
    raise RuntimeError('Install or upgrade the OpenAI Agents SDK first: pip install -U "openai-agents>=0.17.0"') from e


def _version_tuple(value):
    # Compare only the leading MAJOR.MINOR.PATCH digits; anything unparsable sorts lowest.
    match = re.match(r"^(\d+)\.(\d+)\.(\d+)", str(value or ""))
    return tuple(int(part) for part in match.groups()) if match else (0, 0, 0)


try:
    AGENTS_SDK_VERSION = version("openai-agents")
except PackageNotFoundError as e:
    raise RuntimeError('Install the OpenAI Agents SDK first: pip install -U "openai-agents>=0.17.0"') from e

if _version_tuple(AGENTS_SDK_VERSION) < _version_tuple(MIN_AGENTS_SDK_VERSION):
    raise RuntimeError(
        f'OpenAI Agents SDK {MIN_AGENTS_SDK_VERSION}+ is required; found {AGENTS_SDK_VERSION}. '
        'Upgrade with: pip install -U "openai-agents>=0.17.0"'
    )


def _clean_openai_api_key(value):
    key = (value or "").strip()
    if not key:
        raise RuntimeError("OPENAI_API_KEY is required.")
    return key


if not os.getenv("OPENAI_API_KEY", "").strip():
    os.environ["OPENAI_API_KEY"] = getpass("Enter your OpenAI API key: ")
os.environ["OPENAI_API_KEY"] = _clean_openai_api_key(os.getenv("OPENAI_API_KEY"))

OPENAI_ORG_ID = os.getenv("OPENAI_ORG_ID", "").strip()
if OPENAI_ORG_ID:
    os.environ["OPENAI_ORG_ID"] = OPENAI_ORG_ID

MODEL = os.getenv("OPENAI_MODEL", "gpt-5.5")
TRACE_INCLUDE_SENSITIVE_DATA = os.getenv("OPENAI_AGENTS_TRACE_INCLUDE_SENSITIVE_DATA", "false").lower() in {"1", "true", "yes", "on"}
os.environ["OPENAI_AGENTS_TRACE_INCLUDE_SENSITIVE_DATA"] = "true" if TRACE_INCLUDE_SENSITIVE_DATA else "false"
SCHEMAFLOW_TRACE_GROUP_ID = os.getenv("SCHEMAFLOW_TRACE_GROUP_ID") or (
    "schemaflow-cookbook-" + datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ") + "-" + uuid.uuid4().hex[:8]
)
os.environ["SCHEMAFLOW_TRACE_GROUP_ID"] = SCHEMAFLOW_TRACE_GROUP_ID

client = OpenAI(api_key=os.environ["OPENAI_API_KEY"])

print("Using model:", MODEL)
print("OpenAI Agents SDK:", AGENTS_SDK_VERSION)
print("OpenAI organization:", os.getenv("OPENAI_ORG_ID") or "(default for API key)")
print("Trace group:", SCHEMAFLOW_TRACE_GROUP_ID)
print("Trace payloads include prompts/outputs:", TRACE_INCLUDE_SENSITIVE_DATA)
```

```python
from concurrent.futures import ThreadPoolExecutor
from pydantic import BaseModel, ConfigDict, Field


class SchemaFlowBaseModel(BaseModel):
    # Allow extra keys so unexpected model output does not fail validation.
    model_config = ConfigDict(extra="allow")


class OperationModel(SchemaFlowBaseModel):
    op: str
    details: dict = Field(default_factory=dict)


class ChangeRequestModel(SchemaFlowBaseModel):
    title: str | None = None
    domain: str | None = None
    target_schema: str | None = None
    target_table: str | None = None
    operations: list[OperationModel] = Field(default_factory=list)
    notes: list = Field(default_factory=list)


class ImpactObjectModel(SchemaFlowBaseModel):
    type: str
    name: str
    reason: str
    source: str


class ImpactModel(SchemaFlowBaseModel):
    impacted_objects: list[ImpactObjectModel] = Field(default_factory=list)
    risks: list[str] = Field(default_factory=list)
    assumptions: list[str] = Field(default_factory=list)


class PlanStepModel(SchemaFlowBaseModel):
    id: str
    description: str


class PlanModel(SchemaFlowBaseModel):
    plan_steps: list[PlanStepModel] = Field(default_factory=list)
    prechecks: list[str] = Field(default_factory=list)
    postchecks: list[str] = Field(default_factory=list)
    rollback: list[str] = Field(default_factory=list)


CHANGE_OUTPUT_SCHEMA = AgentOutputSchema(ChangeRequestModel, strict_json_schema=False)
IMPACT_OUTPUT_SCHEMA = AgentOutputSchema(ImpactModel, strict_json_schema=False)
PLAN_OUTPUT_SCHEMA = AgentOutputSchema(PlanModel, strict_json_schema=False)


def _parse_json_text(text: str):
    text = (text or "{}").strip()
    # Strip a surrounding Markdown code fence if the model added one.
    if text.startswith("```"):
        text = re.sub(r"^```(?:json)?\s*", "", text)
        text = re.sub(r"\s*```$", "", text).strip()
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        # Fall back to the outermost {...} span embedded in surrounding prose.
        match = re.search(r"\{.*\}", text, flags=re.DOTALL)
        if not match:
            raise
        return json.loads(match.group(0))


def _model_dump(value):
    if value is None or isinstance(value, (str, int, float, bool, bytes)):
        return value
    if isinstance(value, type):
        return value
    if hasattr(value, "model_dump"):
        try:
            return value.model_dump()
        except TypeError:
            pass
    if hasattr(value, "to_dict"):
        try:
            return value.to_dict()
        except TypeError:
            pass
    if hasattr(value, "__dict__"):
        try:
            return {k: v for k, v in vars(value).items() if not k.startswith("_")}
        except TypeError:
            pass
    return value


def _agent_output_to_json(value):
    value = _model_dump(value)
    if isinstance(value, dict):
        return value
    if isinstance(value, str):
        return _parse_json_text(value)
    return json.loads(json.dumps(value, default=str))


def _agent_output_to_text(value):
    value = _model_dump(value)
    if isinstance(value, str):
        return value.strip()
    return json.dumps(value, ensure_ascii=False)


def _trace_metadata(metadata: dict | None = None):
    # Trace metadata values are flattened to strings.
    cleaned = {}
    for key, value in (metadata or {}).items():
        if value is None:
            cleaned[str(key)] = ""
        elif isinstance(value, bool):
            cleaned[str(key)] = "true" if value else "false"
        elif isinstance(value, (dict, list, tuple, set)):
            cleaned[str(key)] = json.dumps(value, ensure_ascii=False, default=str)
        else:
            cleaned[str(key)] = str(value)
    return cleaned


def _schemaflow_run_config(workflow_name: str, metadata: dict | None = None):
    return RunConfig(
        workflow_name=workflow_name,
        group_id=SCHEMAFLOW_TRACE_GROUP_ID,
        trace_include_sensitive_data=TRACE_INCLUDE_SENSITIVE_DATA,
        trace_metadata=_trace_metadata({"notebook": "schemaflow_cookbook", **(metadata or {})}),
    )


def _runner_run_sync(agent, prompt: str, *, workflow_name: str, metadata: dict | None = None, max_turns: int = 4):
    kwargs = {"run_config": _schemaflow_run_config(workflow_name, metadata), "max_turns": max_turns}
    try:
        return Runner.run_sync(agent, prompt, **kwargs)
    except RuntimeError as exc:
        if "event loop" not in str(exc).lower():
            raise
        # Notebooks already run an event loop; retry in a worker thread.
        with ThreadPoolExecutor(max_workers=1) as pool:
            return pool.submit(lambda: Runner.run_sync(agent, prompt, **kwargs)).result()


def run_schemaflow_json_agent(*, name, instructions, prompt, output_schema, model=MODEL, tools=None, workflow_name=None, metadata=None):
    agent = Agent(name=name, instructions=instructions, model=model, output_type=output_schema, tools=tools or [])
    result = _runner_run_sync(agent, prompt, workflow_name=workflow_name or name, metadata={"agent": name, **(metadata or {})})
    return _agent_output_to_json(result.final_output), result


def run_schemaflow_text_agent(*, name, instructions, prompt, model=MODEL, tools=None, workflow_name=None, metadata=None):
    agent = Agent(name=name, instructions=instructions, model=model, tools=tools or [])
    result = _runner_run_sync(agent, prompt, workflow_name=workflow_name or name, metadata={"agent": name, **(metadata or {})})
    return _agent_output_to_text(result.final_output), result


def _collect_file_search_results(value):
    results = []
    seen = set()

    def visit(node):
        if node is None or isinstance(node, (str, int, float, bool, bytes)):
            return
        if isinstance(node, type) or callable(node):
            return
        node_id = id(node)
        if node_id in seen:
            return
        seen.add(node_id)
        node = _model_dump(node)
        if node is None or isinstance(node, (str, int, float, bool, bytes)):
            return
        if isinstance(node, type) or callable(node):
            return
        if isinstance(node, dict):
            if node.get("type") == "file_search_call":
                for result in node.get("results", []) or []:
                    result = _model_dump(result)
                    if isinstance(result, dict):
                        text = result.get("text") or result.get("content") or ""
                        if isinstance(text, list):
                            text = "\n".join(str(x) for x in text)
                        results.append({
                            "file_id": result.get("file_id"),
                            "filename": result.get("filename") or result.get("file_name") or result.get("title"),
                            "score": result.get("score"),
                            "text_preview": str(text)[:1200],
                        })
            for child in node.values():
                visit(child)
        elif isinstance(node, (list, tuple, set)):
            for child in node:
                visit(child)

    visit(value)
    return results


def agent_file_search_results(run_result):
    return _collect_file_search_results(run_result)


def trace_function_result(name: str, *, input_obj=None, output_obj=None):
    with function_span(
        name,
        input=json.dumps(input_obj, ensure_ascii=False, default=str) if input_obj is not None else None,
        output=json.dumps(output_obj, ensure_ascii=False, default=str) if output_obj is not None else None,
    ):
        pass


def pretty(obj):
    print(json.dumps(obj, indent=2, ensure_ascii=False))
```

## 2 Input

This section defines the database change request that SchemaFlow will process. Think of it as the compact ticket, issue, or message a data team might receive before turning the request into implementation details.

The default request asks the workflow to:

- Add `LOYALTY_TIER VARCHAR(20)` to `ODS.ODS_CUSTOMER_PROFILE`.
- Treat the new column as nullable.
- Backfill from `CORE.DIM_CUSTOMER`.
- Join on `CUSTOMER_ID`.
- Filter the source to current records with `IS_CURRENT=true`.
- Add a non-unique index on `(CUSTOMER_ID, LOYALTY_TIER)`.
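For orientation, the request above might be represented after parsing roughly as follows. This is an illustrative sketch only: the op names (`add_column`, `backfill`, `create_index`), the keys inside `details`, and the `title` and `domain` values are assumptions, since the Parse Agent chooses its own values at run time. The helper `required_sql_keywords` and the `KEYWORD_BY_OP` mapping are likewise hypothetical and only hint at how a deterministic check could derive expected SQL keywords from the parsed operations.

```python
# Illustrative only: op names and detail keys are assumed, not the notebook's actual output.
example_change_json = {
    "title": "Add LOYALTY_TIER to ODS_CUSTOMER_PROFILE",
    "domain": "retail",
    "target_schema": "ODS",
    "target_table": "ODS_CUSTOMER_PROFILE",
    "operations": [
        {"op": "add_column", "details": {"column": "LOYALTY_TIER", "type": "VARCHAR(20)", "nullable": True}},
        {"op": "backfill", "details": {"source": "CORE.DIM_CUSTOMER", "join_key": "CUSTOMER_ID", "filter": "IS_CURRENT = true"}},
        {"op": "create_index", "details": {"columns": ["CUSTOMER_ID", "LOYALTY_TIER"], "unique": False}},
    ],
    "notes": [],
}

# Hypothetical mapping from op name to a SQL keyword a reviewer would expect to see.
KEYWORD_BY_OP = {"add_column": "ALTER TABLE", "backfill": "UPDATE", "create_index": "CREATE INDEX"}


def required_sql_keywords(change_json: dict) -> list:
    """Return expected SQL keywords in operation order, skipping unknown ops."""
    keywords = []
    for operation in change_json.get("operations", []):
        keyword = KEYWORD_BY_OP.get(str(operation.get("op", "")).lower())
        if keyword and keyword not in keywords:
            keywords.append(keyword)
    return keywords


print(required_sql_keywords(example_change_json))
```

Keeping the mapping explicit means an unrecognized op simply contributes no keyword, so a check built this way can under-report but will not demand keywords the request never implied.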
This input is intentionally compact but rich enough to exercise the full workflow:

- parsing target schema and table
- extracting column name, type, and nullability
- recognizing backfill requirements
- recognizing index requirements
- generating multi-layer SQL
- running validation checks for expected table, column, and SQL actions

```python
CHANGE_TEXT = """Add LOYALTY_TIER VARCHAR(20) to ODS.ODS_CUSTOMER_PROFILE as nullable. Backfill from CORE.DIM_CUSTOMER on CUSTOMER_ID where IS_CURRENT=true. Add a non-unique index on (CUSTOMER_ID, LOYALTY_TIER)."""
print(CHANGE_TEXT)
```

## 3 Optional PDF RAG Context

SchemaFlow can run with or without retrieval context, so readers can start with the request alone and add reference docs only when the change needs them.

The sample PDF path in the code cell below points to a file included in the cookbook folder under `data/`, not to bytes embedded inside the notebook. Leave `PDF_PATH = None` for static article previews or generic runs.

With the default `PDF_PATH = None`, the notebook uses only the natural-language change request. This is enough to demonstrate the core staged workflow.

Set `PDF_PATH` to a local PDF when you want the Impact Agent to ground its analysis in reference material, such as:

- interface design documents
- schema specifications
- lineage documentation
- data contracts
- platform architecture notes
- downstream dependency documentation

When a PDF is configured, this section:

- Validates that the file exists and is a PDF.
- Creates an OpenAI vector store with a one-day expiration policy.
- Uploads the PDF to the vector store.
- Lets OpenAI handle parsing, chunking, embedding, and retrieval.
- Stores the vector store ID for the Impact Agent.
- Later summarizes any File Search results returned during impact analysis.

This keeps the cookbook lightweight because it does not require local embedding models, Chroma, Neo4j, LangGraph, or project-specific Python modules.

```python
from pathlib import Path

# Optional PDF RAG example.
# The GitHub repo includes this sample PDF under schemaflow_cookbook/data.
# When running from a repo checkout in the cookbook folder, uncomment the path
# below to upload it to File Search. For meaningful retrieval hits, pair it with
# the LOYALTY_TIER change request used in this notebook.
PDF_PATH = None
# PDF_PATH = "data/sample_customer_loyalty_ifd.pdf"
RAG_MAX_RESULTS = 6

rag_vector_store = None
rag_vector_store_id = None
rag_vector_store_file = None
rag_file_search_results = []
impact_response = None


def create_pdf_vector_store(pdf_path):
    pdf_path = Path(pdf_path).expanduser().resolve()
    if not pdf_path.exists():
        raise FileNotFoundError(f"PDF not found: {pdf_path}")
    if pdf_path.suffix.lower() != ".pdf":
        raise ValueError(f"Expected a PDF file, got: {pdf_path}")

    with trace("SchemaFlow PDF Vector Store", group_id=SCHEMAFLOW_TRACE_GROUP_ID, metadata={"step": "pdf_vector_store", "pdf_path": str(pdf_path)}):
        with custom_span("Create vector store", {"pdf_path": str(pdf_path)}):
            # The store expires one day after it was last active.
            vector_store = client.vector_stores.create(
                name=f"schemaflow-cookbook-{datetime.now(timezone.utc).strftime('%Y%m%dT%H%M%SZ')}",
                expires_after={"anchor": "last_active_at", "days": 1},
            )
        with custom_span("Upload PDF to vector store", {"vector_store_id": vector_store.id, "pdf_path": str(pdf_path)}):
            with pdf_path.open("rb") as handle:
                vector_store_file = client.vector_stores.files.upload_and_poll(vector_store_id=vector_store.id, file=handle)
        trace_function_result(
            "PDF vector store ready",
            input_obj={"pdf_path": str(pdf_path)},
            output_obj={"vector_store_id": vector_store.id, "status": getattr(vector_store_file, "status", "unknown")},
        )
    flush_traces()
    return vector_store, vector_store_file


if PDF_PATH:
    rag_vector_store, rag_vector_store_file = create_pdf_vector_store(PDF_PATH)
    rag_vector_store_id = rag_vector_store.id
    print("Created vector store:", rag_vector_store_id)
    print("Uploaded PDF status:", getattr(rag_vector_store_file, "status", "unknown"))
else:
    print("No PDF configured. Leave PDF_PATH as None to run without RAG, or set it to a local PDF path.")
```

## 4 Stages 1-2 - Parse Change Request + Impact Analysis

This section runs the first two agent stages back to back. Together, they answer two practical questions: what exactly was requested, and what else could be affected?

### Stage 1: Parse Change Request

The Parse Agent converts `CHANGE_TEXT` into a structured `change_json` object.

Expected fields include:

- `title`
- `domain`
- `target_schema`
- `target_table`
- `operations`
- `notes`

This stage creates the normalized contract that every downstream stage consumes. If the parse step misses the target table, column, data type, nullability, backfill, or index intent, later stages may produce incomplete output. That is why the notebook validates this stage immediately afterward.

### Stage 2: Impact Analysis

The Impact Agent consumes `change_json` and produces `impact_json`.

Expected fields include:

- `impacted_objects`
- `risks`
- `assumptions`

If `PDF_PATH` was configured earlier, the Impact Agent also receives a `FileSearchTool` connected to the uploaded PDF vector store. This lets the model search reference documentation before returning impact claims.

The output is intentionally conservative. When the agent is uncertain, it should call out assumptions and risks instead of inventing undocumented certainty.

### Impact Dashboard Preview

The impact-analysis stage produces structured `impact_json` that can be visualized as a graph of affected objects and relationships. The dashboard preview shows the kind of customer loyalty lineage graph built in the optional Neo4j dashboard section later in the notebook. Run that section to generate the local graph UI from the sample knowledge-graph seed and inspect impacted objects interactively.
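Independently of any dashboard, the graph idea can be illustrated with a few lines of standard-library Python. The sketch below is not the notebook's Neo4j code: `impact_edges` and `group_by_type` are hypothetical helpers, and `sample_impact_json` is invented data shaped like the Stage 2 output (the index name `IDX_CUSTOMER_LOYALTY` and all `reason` strings are assumptions). It turns the impacted objects into edges radiating from the target table, which is roughly the structure a lineage graph would render.

```python
from collections import defaultdict

# Invented sample shaped like impact_json; object details are assumptions for illustration.
sample_impact_json = {
    "impacted_objects": [
        {"type": "table", "name": "ODS.ODS_CUSTOMER_PROFILE", "reason": "target of the change", "source": "inference"},
        {"type": "column", "name": "ODS.ODS_CUSTOMER_PROFILE.LOYALTY_TIER", "reason": "new column", "source": "inference"},
        {"type": "table", "name": "CORE.DIM_CUSTOMER", "reason": "backfill source", "source": "inference"},
        {"type": "index", "name": "IDX_CUSTOMER_LOYALTY", "reason": "requested index", "source": "inference"},
    ],
    "risks": ["Backfill may leave NULLs for customers without a current row."],
    "assumptions": [],
}


def impact_edges(target: str, impact_json: dict) -> list:
    """Return (target, object_name, object_type) edges, skipping the target itself."""
    edges = []
    for obj in impact_json.get("impacted_objects", []):
        name = obj.get("name", "")
        if name and name.upper() != target.upper():
            edges.append((target, name, obj.get("type", "unknown")))
    return edges


def group_by_type(edges: list) -> dict:
    """Group impacted object names by their declared type."""
    grouped = defaultdict(list)
    for _, name, obj_type in edges:
        grouped[obj_type].append(name)
    return dict(grouped)


edges = impact_edges("ODS.ODS_CUSTOMER_PROFILE", sample_impact_json)
print(group_by_type(edges))
```

An edge list like this is easy to load into most graph tools, and grouping by type gives reviewers a quick count of affected tables, columns, and indexes.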
```python
# =============================================================
# Stage 1 - Parse Change Request
# =============================================================
print("=" * 60)
print("Stage 1 - Parse Change Request")
print("=" * 60)

PARSE_SYSTEM = """
You are a precise information extraction system for database change requests.
Return STRICT JSON only (no prose, no code fences, no comments).

Required keys:
{
  "title": str,
  "domain": str|null,
  "target_schema": str|null,
  "target_table": str|null,
  "operations": [{"op": str, "details": object}],
  "notes": []
}

Rules:
- Use lowercase op names.
- If schema/table unknown, set null.
- Keep details explicit and typed where possible.
""".strip()

parse_user = "Change Request:\n\n" + CHANGE_TEXT

change_json, parse_agent_result = run_schemaflow_json_agent(
    name="SchemaFlow Parse Agent",
    instructions=PARSE_SYSTEM,
    prompt=parse_user,
    output_schema=CHANGE_OUTPUT_SCHEMA,
    workflow_name="SchemaFlow Stage 1 Parse",
    metadata={"stage": "parse_change_request"},
)

# Normalize the parse output so downstream stages can rely on these keys.
if isinstance(change_json, dict):
    change_json.setdefault("title", None)
    change_json.setdefault("domain", None)
    change_json.setdefault("target_schema", None)
    change_json.setdefault("target_table", None)
    if not isinstance(change_json.get("operations"), list):
        change_json["operations"] = [change_json.get("operations")] if change_json.get("operations") else []
    if not isinstance(change_json.get("notes"), list):
        change_json["notes"] = []

pretty(change_json)

# =============================================================
# Stage 2 - Impact Analysis
# =============================================================
print("\n" + "=" * 60)
print("Stage 2 - Impact Analysis")
print("=" * 60)

IMPACT_SYSTEM = """
You are a cautious impact analysis assistant.

Inputs:
- change_json: normalized change request.
- optional File Search context from an uploaded IFD/reference PDF.

Task:
Return JSON exactly as:
{
  "impacted_objects": [{"type":"table|column|fk|index|view","name":str,"reason":str,"source":"file_search|ifd|inference"}],
  "risks": [str],
  "assumptions": [str]
}

Rules:
- Be conservative when uncertain.
- Call out data quality/backfill risks explicitly.
- If File Search context is available, use it to ground table, column, and downstream-impact claims.
""".strip()

impact_user_parts = ["CHANGE_JSON:\n" + json.dumps(change_json, ensure_ascii=False)]
impact_tools = []
if rag_vector_store_id:
    impact_tools.append(
        FileSearchTool(
            vector_store_ids=[rag_vector_store_id],
            max_num_results=RAG_MAX_RESULTS,
            include_search_results=True,
        )
    )
    impact_user_parts.append(
        "Use the file_search tool against the uploaded PDF to look for relevant IFD, schema, table, column, lineage, and downstream dependency context before returning JSON."
    )

impact_json, impact_agent_result = run_schemaflow_json_agent(
    name="SchemaFlow Impact Agent",
    instructions=IMPACT_SYSTEM,
    prompt="\n\n".join(impact_user_parts),
    output_schema=IMPACT_OUTPUT_SCHEMA,
    tools=impact_tools,
    workflow_name="SchemaFlow Stage 2 Impact Analysis",
    metadata={"stage": "impact_analysis", "rag_enabled": bool(rag_vector_store_id)},
)
impact_response = impact_agent_result

try:
    rag_file_search_results = agent_file_search_results(impact_agent_result)
except Exception as exc:
    rag_file_search_results = []
    print(f"File Search result summary skipped: {type(exc).__name__}: {exc}")

if rag_vector_store_id:
    print("File Search results returned:", len(rag_file_search_results))
    for i, result in enumerate(rag_file_search_results, start=1):
        print(f"{i}. {result.get('filename') or result.get('file_id')} score={result.get('score')}")

if isinstance(impact_json, dict):
    impact_json.setdefault("impacted_objects", [])
    impact_json.setdefault("risks", [])
    impact_json.setdefault("assumptions", [])

pretty(impact_json)
flush_traces()
```

### Stages 1-2 Output Guardrails

This guardrail cell performs deterministic checks on the Parse and Impact outputs before the workflow continues.
The checks verify that:

- change_json contains a target schema.
- change_json contains a target table.
- change_json.operations is a non-empty list.
- impact_json.impacted_objects contains at least one object.
- The impact output references the parsed target table.
- Each impacted object has basic required fields such as type, name, and reason.

These checks are deliberately lightweight. They do not prove that the analysis is complete, but they catch obvious failure modes before the Plan Agent or SQL Agent consumes malformed or incomplete state.

    # Stages 1-2 Output Guardrails - inspects change_json (Parse) and impact_json (Impact).
    stages_1_2_guardrails = []

    with trace("SchemaFlow Stages 1-2 Guardrails", group_id=SCHEMAFLOW_TRACE_GROUP_ID, metadata={"stage": "stages_1_2_guardrails"}):

        def _check(name, ok, detail=""):
            # Record the result and emit a guardrail span so failures show up in traces.
            ok = bool(ok)
            stages_1_2_guardrails.append({"name": name, "ok": ok, "detail": detail})
            with guardrail_span(name, triggered=not ok):
                trace_function_result(name + "_detail", output_obj={"ok": ok, "detail": detail})

        _target_schema = (change_json.get("target_schema") or "").strip() if isinstance(change_json, dict) else ""
        _target_table = (change_json.get("target_table") or "").strip() if isinstance(change_json, dict) else ""
        _ops = change_json.get("operations") if isinstance(change_json, dict) else None
        _check(
            "parse_output_well_formed",
            bool(_target_schema) and bool(_target_table) and isinstance(_ops, list) and len(_ops) > 0,
            f"target={_target_schema}.{_target_table}, ops={len(_ops or [])}",
        )

        _impacted = impact_json.get("impacted_objects") if isinstance(impact_json, dict) else []
        _target_fqn = f"{_target_schema}.{_target_table}" if _target_schema and _target_table else ""
        # Match either the fully qualified name or any object name containing the table name.
        _target_in_impact = any(
            isinstance(o, dict)
            and (
                o.get("name", "").upper() == _target_fqn.upper()
                or (_target_table and _target_table.upper() in o.get("name", "").upper())
            )
            for o in (_impacted or [])
        )
        _check(
            "impact_includes_target",
            bool(_impacted) and _target_in_impact,
            f"{len(_impacted or [])} impacted object(s), target_match={_target_in_impact}",
        )

        _malformed = [
            i
            for i, o in enumerate(_impacted or [])
            if not (isinstance(o, dict) and o.get("type") and o.get("name") and o.get("reason"))
        ]
        _check(
            "impacted_objects_well_formed",
            not _malformed,
            "all populated" if not _malformed else f"missing fields at indices {_malformed[:5]}",
        )

        stages_1_2_guardrails_passed = all(c["ok"] for c in stages_1_2_guardrails)
        trace_function_result(
            "Stages 1-2 guardrails summary",
            output_obj={"passed": stages_1_2_guardrails_passed, "checks": stages_1_2_guardrails},
        )

    flush_traces()
    print(f"Stages 1-2 Output Guardrails: {'PASS' if stages_1_2_guardrails_passed else 'FAIL'}")
    for _c in stages_1_2_guardrails:
        _flag = "OK  " if _c["ok"] else "FAIL"
        print(f"  {_flag} {_c['name']:35s} {_c['detail']}")

5 Stages 3-4 - Execution Plan + SQL Generation

This section runs the implementation-planning and SQL-generation stages. At this point the workflow shifts from understanding the request to drafting an implementation handoff.

Stage 3: Execution Plan

The Plan Agent consumes:

- change_json
- impact_json

It returns plan_json with four sections:

- plan_steps
- prechecks
- postchecks
- rollback

The goal is to make the implementation strategy explicit before generating SQL. This helps separate “what should be done” from “what exact SQL should be drafted.”

Stage 4: SQL Generation

The SQL Agent consumes:

- change_json
- plan_json

It returns a single plaintext SQL script. The prompt requires four sections in order:

    -- === LANDING (ODS) ===
    -- === STAGING (STG) ===
    -- === CORE (DIM/FACT/VIEW) ===
    -- === MARTS (SERVING) ===

The generated SQL is intended as a reviewable draft. It should be checked by engineers before any production use.

    # =============================================================
    # Stage 3 - Execution Plan
    # =============================================================
    print("=" * 60)
    print("Stage 3 - Execution Plan")
    print("=" * 60)

    PLAN_SYSTEM = """
    You are a senior data engineer creating a safe execution plan.

    Inputs:
    - change_json
    - impact_json

    Return JSON:
    {
      "plan_steps": [{"id": "str", "description": "str"}],
      "prechecks": [str],
      "postchecks": [str],
      "rollback": [str]
    }

    Guidance:
    - Include practical pre/post checks.
    - Keep steps executable and concise.
    """.strip()

    plan_user = "\n\n".join([
        "CHANGE_JSON:\n" + json.dumps(change_json, ensure_ascii=False),
        "IMPACT_JSON:\n" + json.dumps(impact_json, ensure_ascii=False),
    ])

    plan_json, plan_agent_result = run_schemaflow_json_agent(
        name="SchemaFlow Plan Agent",
        instructions=PLAN_SYSTEM,
        prompt=plan_user,
        output_schema=PLAN_OUTPUT_SCHEMA,
        workflow_name="SchemaFlow Stage 3 Execution Plan",
        metadata={"stage": "execution_plan"},
    )

    if isinstance(plan_json, dict):
        plan_json.setdefault("plan_steps", [])
        plan_json.setdefault("prechecks", [])
        plan_json.setdefault("postchecks", [])
        plan_json.setdefault("rollback", [])

    pretty(plan_json)

    # =============================================================
    # Stage 4 - SQL Generation
    # =============================================================
    print("\n" + "=" * 60)
    print("Stage 4 - SQL Generation")
    print("=" * 60)

    SQL_SYSTEM = """
    You are a senior data engineer producing SQL for multi-layer data stacks.

    Output a SINGLE plaintext script with FOUR sections in order:
    1) -- === LANDING (ODS) ===
    2) -- === STAGING (STG) ===
    3) -- === CORE (DIM/FACT/VIEW) ===
    4) -- === MARTS (SERVING) ===

    Rules:
    - PostgreSQL dialect.
    - Prefer idempotent DDL where possible.
    - Propagate requested changes through downstream layers.
    - Include concise assumptions as comments.
    """.strip()

    sql_user = "\n\n".join([
        "CHANGE_JSON:\n" + json.dumps(change_json, ensure_ascii=False),
        "PLAN_JSON:\n" + json.dumps(plan_json, ensure_ascii=False),
    ])

    sql_text, sql_agent_result = run_schemaflow_text_agent(
        name="SchemaFlow SQL Agent",
        instructions=SQL_SYSTEM,
        prompt=sql_user,
        workflow_name="SchemaFlow Stage 4 SQL Generation",
        metadata={"stage": "sql_generation"},
    )

    # Print at most the first 5000 characters of the draft.
    print(sql_text[:5000])
    flush_traces()

Stages 3-4 Output Guardrails

This guardrail cell validates the plan and SQL draft before the notebook moves to the final SQL sanity checks. The checks verify that:

- all four plan sections are populated: plan_steps, prechecks, postchecks, rollback
- the data type requested in CHANGE_TEXT appears in the generated SQL
- nullable requests do not accidentally create NOT NULL constraints
- explicit NOT NULL requests are reflected when present

These checks complement Stage 5. Stages 3-4 guardrails focus on plan completeness and semantic consistency, while Stage 5 focuses on expected SQL terms and actions.

Stages 3-4 Output Guardrails - inspects plan_json (Plan) and sql_text (SQL).
    import re as _re

    stages_3_4_guardrails = []

    with trace("SchemaFlow Stages 3-4 Guardrails", group_id=SCHEMAFLOW_TRACE_GROUP_ID, metadata={"stage": "stages_3_4_guardrails"}):

        def _check(name, ok, detail=""):
            ok = bool(ok)
            stages_3_4_guardrails.append({"name": name, "ok": ok, "detail": detail})
            with guardrail_span(name, triggered=not ok):
                trace_function_result(name + "_detail", output_obj={"ok": ok, "detail": detail})

        _plan = plan_json if isinstance(plan_json, dict) else {}
        _plan_missing = [k for k in ("plan_steps", "prechecks", "postchecks", "rollback") if not _plan.get(k)]
        _check(
            "plan_sections_populated",
            not _plan_missing,
            "all four populated" if not _plan_missing else f"empty: {_plan_missing}",
        )

        # Capture the first data type that follows "add <name>" or "column <name>" in the request.
        _dtype_match = _re.search(
            r"\b(?:add\s+\w+\s+|column\s+\w+\s+)((?:VAR)?CHAR\s*\([^)]*\)|TEXT|INTEGER|INT|BIGINT|BOOLEAN|DATE|TIMESTAMP|NUMERIC\s*\([^)]*\)|DECIMAL\s*\([^)]*\)|FLOAT|DOUBLE)",
            CHANGE_TEXT,
            flags=_re.IGNORECASE,
        )
        if _dtype_match:
            _dtype = " ".join(_dtype_match.group(1).upper().split())
            _check("data_type_propagated_to_sql", _dtype.lower() in sql_text.lower(), f"expected '{_dtype}' in SQL")
        else:
            _check("data_type_propagated_to_sql", True, "no data type referenced in CHANGE_TEXT (skipped)")

        _change_lower = CHANGE_TEXT.lower()
        _sql_lower = sql_text.lower()

        # Collect the column names the parsed request expects to see in DDL.
        _expected_cols = []
        for _op in (change_json.get("operations") if isinstance(change_json, dict) else []) or []:
            _details = _op.get("details") if isinstance(_op, dict) else None
            if isinstance(_details, dict):
                for _key in ("column", "column_name", "name"):
                    _val = _details.get(_key)
                    if isinstance(_val, str) and _val.strip():
                        _expected_cols.append(_val.strip().lower())

        if "not null" in _change_lower:
            _check("nullability_matches_request", "not null" in _sql_lower, "request: NOT NULL")
        elif "nullable" in _change_lower:
            # Only inspect lines that look like DDL for one of the expected columns.
            _ddl_lines = []
            for _raw_line in sql_text.split("\n"):
                _line = _raw_line.strip().lower()
                if not any(c in _line for c in _expected_cols):
                    continue
                if "add column" in _line or any(_line.startswith(c + " ") or _line.startswith(c + "\t") for c in _expected_cols):
                    _ddl_lines.append(_raw_line.strip())
            _bad_lines = [line for line in _ddl_lines if "not null" in line.lower()]
            _check(
                "nullability_matches_request",
                not _bad_lines,
                "no NOT NULL on nullable column DDL" if not _bad_lines else f"NOT NULL conflict in {len(_bad_lines)} DDL line(s)",
            )
        else:
            _check("nullability_matches_request", True, "no explicit nullability requested (skipped)")

        stages_3_4_guardrails_passed = all(c["ok"] for c in stages_3_4_guardrails)
        trace_function_result(
            "Stages 3-4 guardrails summary",
            output_obj={"passed": stages_3_4_guardrails_passed, "checks": stages_3_4_guardrails},
        )

    flush_traces()
    print(f"Stages 3-4 Output Guardrails: {'PASS' if stages_3_4_guardrails_passed else 'FAIL'}")
    for _c in stages_3_4_guardrails:
        _flag = "OK  " if _c["ok"] else "FAIL"
        print(f"  {_flag} {_c['name']:35s} {_c['detail']}")

6 Stage 5 - Lightweight SQL Sanity Checks

This section runs deterministic checks against the generated SQL for the current notebook run.

This is not a full SQL parser and it does not execute the SQL. Instead, it checks for obvious mismatches between the original request, parsed change object, and generated script. These checks are intentionally small and explainable, so a reader can see exactly what passed or failed before the result is saved or evaluated.

The checks look for:

- empty SQL output
- missing target table
- missing expected columns
- required SQL keywords inferred from the request:
  - ALTER TABLE
  - UPDATE when the request implies backfill or source-based population
  - CREATE INDEX when the request mentions an index

The output is stored in validation, which becomes part of the final bundle and is also used by the Promptfoo full-flow assertion.
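The keyword-inference idea in the list above can be sketched in isolation. This is a minimal illustration under stated assumptions, not the notebook's exact cell: the function names `infer_required_keywords` and `missing_keywords`, the sample request, and the sample SQL draft are all hypothetical.

```python
def infer_required_keywords(change_text):
    """Infer SQL keywords a draft should contain from the wording of the request."""
    text = change_text.lower()
    required = ["ALTER TABLE"]
    # Backfill-style wording implies the script should also populate data.
    if any(term in text for term in ("backfill", "update", "source it from")):
        required.append("UPDATE")
    if "index" in text:
        required.append("CREATE INDEX")
    return required


def missing_keywords(change_text, sql_text):
    """Return the inferred keywords that do not appear anywhere in the SQL draft."""
    sql_lower = sql_text.lower()
    return [kw for kw in infer_required_keywords(change_text) if kw.lower() not in sql_lower]


# Hypothetical request and an incomplete draft that only adds the column.
request = "Add a nullable loyalty_tier column to ods.customer, backfill it from the CRM source, and add an index on it."
draft_sql = "ALTER TABLE ods.customer ADD COLUMN IF NOT EXISTS loyalty_tier VARCHAR(20);"

print(missing_keywords(request, draft_sql))  # ['UPDATE', 'CREATE INDEX']
```

Plain substring matching keeps the check easy to explain, at the cost of false matches: a column named `updated_at` in the SQL would satisfy the UPDATE requirement. That trade-off is consistent with the section's framing of these checks as a sanity screen, not a SQL parser.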
    with trace("SchemaFlow Stage 5 SQL Sanity Checks", group_id=SCHEMAFLOW_TRACE_GROUP_ID, metadata={"stage": "sql_sanity_checks"}):
        issues = []
        sql_lower = sql_text.lower()
        change_lower = CHANGE_TEXT.lower()

        if not sql_text.strip():
            issues.append("SQL output is empty")

        expected_schema = (change_json.get("target_schema") or "").strip()
        expected_table = (change_json.get("target_table") or "").strip()
        if expected_table and expected_table.lower() not in sql_lower:
            issues.append(f"Expected target table missing from SQL: {expected_table}")

        # Gather expected column names from the parsed operations.
        expected_columns = []
        for operation in change_json.get("operations", []):
            details = operation.get("details") if isinstance(operation, dict) else None
            if not isinstance(details, dict):
                continue
            for key in ("column", "column_name", "name"):
                value = details.get(key)
                if isinstance(value, str) and value.strip():
                    expected_columns.append(value.strip())

        # dict.fromkeys de-duplicates while preserving order.
        for column in dict.fromkeys(expected_columns):
            if column.lower() not in sql_lower:
                issues.append(f"Expected column missing from SQL: {column}")

        required_keywords = ["ALTER TABLE"]
        if any(term in change_lower for term in ("backfill", "update", "source it from")):
            required_keywords.append("UPDATE")
        if "index" in change_lower:
            required_keywords.append("CREATE INDEX")

        for keyword in dict.fromkeys(required_keywords):
            if keyword.lower() not in sql_lower:
                issues.append(f"Expected keyword missing: {keyword}")

        validation = {
            "valid": len(issues) == 0,
            "issues": issues,
            "checks": {
                "expected_schema": expected_schema or None,
                "expected_table": expected_table or None,
                "expected_columns": list(dict.fromkeys(expected_columns)),
                "required_keywords": list(dict.fromkeys(required_keywords)),
            },
        }
        with guardrail_span("stage5_sql_sanity", triggered=not validation["valid"]):
            trace_function_result("Stage 5 SQL sanity result", output_obj=validation)

    flush_traces()
    pretty(validation)

7 Final Bundle

This section assembles the main SchemaFlow output object.
The final bundle contains:

- summary
- rag
- change_json
- impact_json
- plan
- sql
- validation

This object is the reviewable handoff artifact for the notebook run. It collects the model-generated outputs, deterministic validation results, and optional retrieval metadata in one place, so a reviewer does not have to reconstruct the flow from separate cells.

The printed summary gives a compact view of the most important run-level information:

- parsed title
- parsed target
- number of RAG hits
- number of plan steps
- validation status
- validation issues

    with trace("SchemaFlow Final Bundle", group_id=SCHEMAFLOW_TRACE_GROUP_ID, metadata={"stage": "final_bundle"}):
        bundle = {
            "summary": {
                "matched_tables": [],
                "impact_risks": impact_json.get("risks", []),
                "rag_hits": len(rag_file_search_results),
            },
            "rag": {"vector_store_id": rag_vector_store_id, "file_search_results": rag_file_search_results},
            "change_json": change_json,
            "impact_json": impact_json,
            "plan": plan_json,
            "sql": sql_text,
            "validation": validation,
        }
        trace_function_result("Final bundle assembled", output_obj=bundle)

    flush_traces()
    pretty({
        "title": change_json.get("title"),
        "target": ".".join(x for x in [change_json.get("target_schema"), change_json.get("target_table")] if x),
        "rag_hits": len(rag_file_search_results),
        "plan_steps": len(plan_json.get("plan_steps", [])),
        "valid": validation.get("valid"),
        "issues": validation.get("issues", []),
    })

8 Save Artifact

This section writes the final bundle to disk as JSON. Artifacts are saved under:

    artifacts/notebook_runs/

Each run receives a timestamped filename, which makes it easy to compare outputs across different prompts, models, inputs, or retrieval documents.
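One way to use timestamped artifacts for comparison is to diff two saved bundles at the top level. The sketch below is a hypothetical helper, not part of the notebook: `changed_sections` and the two stand-in bundles are assumptions made for illustration, and real bundles would carry all seven keys listed above.

```python
import json
import tempfile
from pathlib import Path


def changed_sections(path_a, path_b):
    """Return the sorted top-level keys whose values differ between two saved bundles."""
    a = json.loads(Path(path_a).read_text(encoding="utf-8"))
    b = json.loads(Path(path_b).read_text(encoding="utf-8"))
    return sorted(key for key in set(a) | set(b) if a.get(key) != b.get(key))


# Two tiny stand-in bundles written to a temporary directory for the demo.
with tempfile.TemporaryDirectory() as tmp:
    run_a = Path(tmp) / "run_a.json"
    run_b = Path(tmp) / "run_b.json"
    run_a.write_text(
        json.dumps({"sql": "ALTER TABLE t ADD COLUMN c TEXT;", "validation": {"valid": True}}),
        encoding="utf-8",
    )
    run_b.write_text(
        json.dumps({"sql": "ALTER TABLE t ADD COLUMN c TEXT NOT NULL;", "validation": {"valid": True}}),
        encoding="utf-8",
    )
    diff = changed_sections(run_a, run_b)

print(diff)  # ['sql']
```

A top-level diff like this only says which section moved; reviewing the actual change still means opening the two `sql` or `plan` values side by side.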
The saved artifact is useful for:

- code review
- audit trails
- debugging
- regression comparison
- eval fixture creation
- downstream automation

    from pathlib import Path

    with trace("SchemaFlow Save Artifact", group_id=SCHEMAFLOW_TRACE_GROUP_ID, metadata={"stage": "save_artifact"}):
        out_dir = Path("artifacts/notebook_runs")
        out_dir.mkdir(parents=True, exist_ok=True)

        # UTC timestamp keeps filenames sortable and unique per run.
        ts = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
        out_path = out_dir / f"schemaflow_cookbook_run_{ts}.json"
        out_path.write_text(json.dumps(bundle, indent=2, ensure_ascii=False), encoding="utf-8")

        trace_function_result(
            "Notebook artifact saved",
            input_obj={"bundle_keys": sorted(bundle.keys())},
            output_obj={"path": str(out_path.resolve()), "bytes": out_path.stat().st_size},
        )

    flush_traces()
    print("Saved artifact:", out_path.resolve())

Post-Artifact Generation Sanity Check

This cell verifies that the saved artifact is usable. It checks that:

- the artifact file exists
- the artifact file is non-empty
- the file can be loaded with json.loads
- the top-level keys on disk match the in-memory bundle

This catches file-write issues immediately instead of letting a later review, eval, or automation step consume a missing or malformed artifact.

Post-Artifact Generation Sanity Check - re-reads the file Save Artifact wrote.
    post_artifact_checks = []

    with trace("SchemaFlow Post-Artifact Guardrails", group_id=SCHEMAFLOW_TRACE_GROUP_ID, metadata={"stage": "post_artifact_guardrails"}):

        def _check(name, ok, detail=""):
            ok = bool(ok)
            post_artifact_checks.append({"name": name, "ok": ok, "detail": detail})
            with guardrail_span(name, triggered=not ok):
                trace_function_result(name + "_detail", output_obj={"ok": ok, "detail": detail})

        _size = out_path.stat().st_size if out_path.exists() else 0
        _check("artifact_file_persisted", out_path.exists() and _size > 0, f"{_size} bytes")

        if out_path.exists():
            try:
                # Re-read from disk and compare top-level keys with the in-memory bundle.
                _roundtrip = json.loads(out_path.read_text(encoding="utf-8"))
                _check(
                    "artifact_roundtrip_keys_match",
                    set(_roundtrip.keys()) == set(bundle.keys()),
                    f"disk_keys={sorted(_roundtrip.keys())}",
                )
            except Exception as exc:
                _check("artifact_roundtrip_keys_match", False, str(exc))
        else:
            _check("artifact_roundtrip_keys_match", False, "saved file missing")

        post_artifact_sanity_passed = all(c["ok"] for c in post_artifact_checks)
        trace_function_result(
            "Post-artifact guardrails summary",
            output_obj={"passed": post_artifact_sanity_passed, "checks": post_artifact_checks},
        )

    flush_traces()
    print(f"Post-Artifact Sanity Check: {'PASS' if post_artifact_sanity_passed else 'FAIL'}")
    for _c in post_artifact_checks:
        _flag = "OK  " if _c["ok"] else "FAIL"
        print(f"  {_flag} {_c['name']:30s} {_c['detail']}")

9 Optional Cleanup

This section handles cleanup for the optional PDF vector store.

By default, DELETE_VECTOR_STORE_AFTER_RUN = False. That default is safe for interactive notebook usage because the vector store is created with a one-day expiration policy. Keeping it temporarily can be useful if you want to inspect traces, rerun downstream stages, or debug File Search behavior.

Set DELETE_VECTOR_STORE_AFTER_RUN = True before running this cell if you want to delete the vector store immediately after the notebook run. If no PDF was configured, this cell simply reports that no vector store was created.
    DELETE_VECTOR_STORE_AFTER_RUN = False

    with trace(
        "SchemaFlow Optional Cleanup",
        group_id=SCHEMAFLOW_TRACE_GROUP_ID,
        metadata=_trace_metadata({"stage": "optional_cleanup", "delete_vector_store_after_run": DELETE_VECTOR_STORE_AFTER_RUN}),
    ):
        if rag_vector_store_id and DELETE_VECTOR_STORE_AFTER_RUN:
            with custom_span("Delete vector store", {"vector_store_id": rag_vector_store_id}):
                client.vector_stores.delete(vector_store_id=rag_vector_store_id)
            print("Deleted vector store:", rag_vector_store_id)
        elif rag_vector_store_id:
            trace_function_result("Vector store retained", output_obj={"vector_store_id": rag_vector_store_id, "expiration": "1 day"})
            print("Vector store retained with one-day expiration:", rag_vector_store_id)
        else:
            trace_function_result("No vector store cleanup", output_obj={"created": False})
            print("No vector store was created.")

    flush_traces()

Pre-Promptfoo Checks / Guardrails

This cell is the readiness gate before running Promptfoo. Promptfoo runs the workflow in a separate process, so it is important to confirm that the notebook state is complete and internally consistent before generating eval files.

The preflight checks verify that:

- bundle exists in the notebook kernel.
- bundle reflects the current change_json and plan_json.
- Stage 5 validation passed.
- Stages 1-2 guardrails passed.
- Stages 3-4 guardrails passed.
- The saved artifact sanity check passed.
- CHANGE_TEXT is consistent with the parsed bundle target.
- OPENAI_API_KEY is present.
- The installed Agents SDK version meets the minimum requirement.

If this section reports failures, rerun or fix the earlier notebook sections before running Promptfoo.

Pre-Promptfoo Checks / Guardrails - deterministic, no LLM calls.
    import os
    import re as _re

    pre_promptfoo_checks = []

    with trace("SchemaFlow Pre-Promptfoo Guardrails", group_id=SCHEMAFLOW_TRACE_GROUP_ID, metadata={"stage": "pre_promptfoo_guardrails"}):

        def _check(name, ok, detail=""):
            ok = bool(ok)
            pre_promptfoo_checks.append({"name": name, "ok": ok, "detail": detail})
            with guardrail_span(name, triggered=not ok):
                trace_function_result(name + "_detail", output_obj={"ok": ok, "detail": detail})

        _bundle = globals().get("bundle")
        _check(
            "bundle_in_scope",
            isinstance(_bundle, dict) and "validation" in _bundle,
            f"keys={sorted(_bundle.keys()) if isinstance(_bundle, dict) else 'n/a'}",
        )
        # Fall back to an empty dict so the remaining checks report FAIL instead of raising
        # when the bundle is missing from the kernel.
        _bundle = _bundle if isinstance(_bundle, dict) else {}
        _check(
            "bundle_in_sync_with_kernel",
            _bundle.get("change_json") == change_json and _bundle.get("plan") == plan_json,
            "bundle reflects current change_json + plan_json",
        )
        _check(
            "stage5_validation_passed",
            bool(validation.get("valid")),
            f"{len(validation.get('issues', []))} issue(s) recorded by Stage 5",
        )
        _check(
            "stages_1_2_guardrails_passed",
            bool(globals().get("stages_1_2_guardrails_passed", False)),
            "consumed from Stages 1-2 Output Guardrails cell",
        )
        _check(
            "stages_3_4_guardrails_passed",
            bool(globals().get("stages_3_4_guardrails_passed", False)),
            "consumed from Stages 3-4 Output Guardrails cell",
        )
        _check(
            "post_artifact_sanity_passed",
            bool(globals().get("post_artifact_sanity_passed", False)),
            "consumed from Post-Artifact Sanity Check cell",
        )

        # Look for a schema.table reference such as "to ods.customer" in the live request text.
        _target_match = _re.search(
            r"\b(?:to|from|in|on)\s+([A-Za-z_][\w$]*)\.([A-Za-z_][\w$]*)",
            CHANGE_TEXT,
            flags=_re.IGNORECASE,
        )
        if _target_match:
            _live_target = _target_match.group(2).upper()
            _bundle_target = (_bundle.get("change_json", {}).get("target_table") or "").upper()
            _check(
                "change_text_consistent_with_bundle",
                _live_target == _bundle_target,
                f"live='{_live_target}', bundle='{_bundle_target}'",
            )
        else:
            _check("change_text_consistent_with_bundle", True, "no extractable target in CHANGE_TEXT (skipped)")

        _check(
            "openai_api_key_set_in_env",
            bool(os.getenv("OPENAI_API_KEY")),
            "present" if os.getenv("OPENAI_API_KEY") else "missing",
        )
        _check(
            "agents_sdk_min_version",
            version_tuple(AGENTS_SDK_VERSION) >= version_tuple(MIN_AGENTS_SDK_VERSION),
            f"found={AGENTS_SDK_VERSION}, required>={MIN_AGENTS_SDK_VERSION}",
        )

        pre_promptfoo_passed = all(c["ok"] for c in pre_promptfoo_checks)
        trace_function_result(
            "Pre-Promptfoo readiness summary",
            output_obj={"passed": pre_promptfoo_passed, "checks": pre_promptfoo_checks},
        )

    flush_traces()
    print("=" * 60)
    print(f"Pre-Promptfoo Readiness: {'PASS' if pre_promptfoo_passed else 'FAIL'}")
    print("=" * 60)
    for _c in pre_promptfoo_checks:
        _flag = "OK  " if _c["ok"] else "FAIL"
        print(f"  {_flag} {_c['name']:35s} {_c['detail']}")

    if not pre_promptfoo_passed:
        print()
        print("One or more readiness checks failed. Promptfoo will likely fail or eval stale state.")
        print("Investigate the failed checks above before running Section 10.")

10 Evaluate the Flow with Promptfoo

Promptfoo is now part of OpenAI. This section uses Promptfoo’s Jupyter/Colab pattern to run evals from notebook cells while keeping the SchemaFlow logic readable in Python. Promptfoo itself runs via Node.js, and the evaluated flow is provided through Promptfoo’s Python file:// provider and Python assertion integrations.

This optional section turns the notebook workflow into a repeatable eval. The core notebook run validates one live example.
Promptfoo adds a reusable eval harness that can run parse-only and full-flow checks using generated provider and assertion files, which is useful when you want to keep the same workflow stable as prompts, models, or inputs change.

Because Promptfoo launches a separate Python process, it cannot directly access variables that only exist inside the active notebook kernel. To solve that, the next cells publish runtime files from the current notebook state:

- a reusable schemaflow_cookbook_core.py module
- a Python Promptfoo provider
- a Python Promptfoo assertion file
- generated eval cases
- a generated Promptfoo config

This section includes three validation layers:

- Input preflight
  - deterministic checks before writing the config
  - no model calls
- Parse-only eval
  - checks Stage 1 behavior
  - verifies target, operation presence, expected added column, and expected data type
- Full-flow eval
  - checks downstream impact, SQL terms, and validation status

Eval results are printed in the notebook and exported as timestamped JSON and HTML files under:

    artifacts/promptfoo/results/

The latest successful run also refreshes:

- schemaflow_cookbook_eval_latest.json
- schemaflow_cookbook_eval_latest.html

Runtime note: the core SchemaFlow cells require Python and an OpenAI API key. The Promptfoo cells additionally require Node.js and npm in the same executable notebook runtime.

After the eval runs, Promptfoo provides a compact view of the current change request, expected fields, parse-only check, and full-flow check. Use this view to answer questions such as:

- Did the Parse Agent extract the expected target table?
- Did it detect the expected added column?
- Did it preserve the requested data type?
- Did the full flow produce impact risks?
- Did the SQL include required terms?
- Did deterministic validation pass?

Promptfoo Runtime Directory Setup

This cell creates notebook-local directories for Promptfoo config, logs, cache, npm cache, and results.
Keeping these directories under artifacts/promptfoo/ makes the eval runtime portable and avoids relying on global Promptfoo state under the user’s home directory.

The cell also exports environment variables so the generated provider, assertion, and Promptfoo command all use the same trace group and local runtime paths.

    from pathlib import Path
    import os

    PROMPTFOO_DIR = Path("artifacts/promptfoo")
    PROMPTFOO_DIR.mkdir(parents=True, exist_ok=True)

    PROMPTFOO_CONFIG_DIR = PROMPTFOO_DIR / ".promptfoo"
    PROMPTFOO_LOG_DIR = PROMPTFOO_CONFIG_DIR / "logs"
    PROMPTFOO_CACHE_DIR = PROMPTFOO_CONFIG_DIR / "cache"
    PROMPTFOO_RESULTS_DIR = PROMPTFOO_DIR / "results"
    NPM_CACHE_DIR = PROMPTFOO_DIR / ".npm-cache"

    for path in [PROMPTFOO_CONFIG_DIR, PROMPTFOO_LOG_DIR, PROMPTFOO_CACHE_DIR, PROMPTFOO_RESULTS_DIR, NPM_CACHE_DIR]:
        path.mkdir(parents=True, exist_ok=True)

    # Point Promptfoo and npm at notebook-local directories instead of global state.
    os.environ["PROMPTFOO_CONFIG_DIR"] = str(PROMPTFOO_CONFIG_DIR.resolve())
    os.environ["PROMPTFOO_LOG_DIR"] = str(PROMPTFOO_LOG_DIR.resolve())
    os.environ["PROMPTFOO_CACHE_PATH"] = str(PROMPTFOO_CACHE_DIR.resolve())
    os.environ["npm_config_cache"] = str(NPM_CACHE_DIR.resolve())
    os.environ["npm_config_update_notifier"] = "false"
    os.environ["npm_config_loglevel"] = "error"

    # Share the trace group and sensitive-data setting with the Promptfoo subprocess.
    os.environ["SCHEMAFLOW_TRACE_GROUP_ID"] = SCHEMAFLOW_TRACE_GROUP_ID
    os.environ["OPENAI_AGENTS_TRACE_INCLUDE_SENSITIVE_DATA"] = os.getenv("OPENAI_AGENTS_TRACE_INCLUDE_SENSITIVE_DATA", "false")

    print("Promptfoo runtime dir:", PROMPTFOO_DIR.resolve())
    print("Promptfoo config dir:", PROMPTFOO_CONFIG_DIR.resolve())
    print("Promptfoo results dir:", PROMPTFOO_RESULTS_DIR.resolve())
    print("Notebook-local npm cache:", NPM_CACHE_DIR.resolve())
    print("Promptfoo trace group:", SCHEMAFLOW_TRACE_GROUP_ID)

Node.js and npm Runtime Check

Promptfoo runs through Node.js, even though the SchemaFlow provider and assertion logic are written in Python. This cell verifies that the notebook runtime has a supported node and npm available.

The check is intentionally explicit.
The notebook does not silently install or upgrade Node because that depends on the execution environment. For local macOS notebooks, the cell prefers a supported nvm Node runtime before common Homebrew paths. This helps ensure that the notebook and terminal use the same Node ABI and avoids stale native dependencies.

If this check fails, fix the runtime first and then rerun the Promptfoo section.

    import os
    import re
    import shutil
    import subprocess
    from pathlib import Path

    REQUIRED_NODE = "^20.20.0 or >=22.22.0"
    COMMON_NODE_DIRS = ["/opt/homebrew/bin", "/usr/local/bin"]


    def nvm_node_dirs():
        """Return bin directories of supported nvm-managed Node versions, newest first."""
        root = Path.home() / ".nvm" / "versions" / "node"
        if not root.exists():
            return []
        candidates = []
        for node_bin in root.glob("*/bin/node"):
            version = node_version(str(node_bin))
            if version and node_is_supported(version[1]):
                candidates.append((version[1], str(node_bin.parent)))
        return [path for _, path in sorted(candidates, reverse=True)]


    def node_version(node_cmd="node"):
        """Return (raw_version_string, (major, minor, patch)) or None if Node cannot be run."""
        try:
            raw = subprocess.check_output([node_cmd, "--version"], text=True).strip()
        except (OSError, subprocess.CalledProcessError):
            return None
        match = re.match(r"v?(\d+)\.(\d+)\.(\d+)", raw)
        if not match:
            return None
        return raw, tuple(int(part) for part in match.groups())


    def node_is_supported(version_tuple):
        # Mirrors REQUIRED_NODE: 20.x from 20.20.0, or anything from 22.22.0 upward.
        major, minor, patch = version_tuple
        return (major == 20 and minor >= 20) or (major == 22 and minor >= 22) or major > 22


    def prepend_path(path_dir):
        """Move path_dir to the front of PATH, dropping any earlier duplicate entry."""
        parts = os.environ.get("PATH", "").split(os.pathsep)
        parts = [p for p in parts if p and p != path_dir]
        os.environ["PATH"] = path_dir + os.pathsep + os.pathsep.join(parts)


    def ensure_promptfoo_node_runtime():
        node_path = shutil.which("node")
        npm_path = shutil.which("npm")
        current = node_version("node") if node_path else None

        if current and npm_path and node_is_supported(current[1]):
            print(f"Node OK: {node_path} ({current[0]})")
            print(f"npm: {npm_path}")
            return

        # Fall back to nvm-managed versions first, then common Homebrew locations.
        for candidate_dir in [*nvm_node_dirs(), *COMMON_NODE_DIRS]:
            candidate_node = Path(candidate_dir) / "node"
            candidate_npm = Path(candidate_dir) / "npm"
            if not candidate_node.exists() or not candidate_npm.exists():
                continue
            candidate = node_version(str(candidate_node))
            if candidate and node_is_supported(candidate[1]):
                prepend_path(candidate_dir)
                print(f"Switched notebook PATH to supported Node: {candidate_node} ({candidate[0]})")
                print(f"npm: {candidate_npm}")
                return

        detected = current[0] if current else "not found"
        raise RuntimeError(
            "Promptfoo requires Node.js " + REQUIRED_NODE + ".\n"
            f"Detected Node: {detected}.\n\n"
            "Use an executable runtime with supported Node/npm before continuing.\n"
            "Examples:\n"
            "- Google Colab or Codespaces: run the notebook in that runtime and rerun this cell.\n"
            "- macOS nvm: `nvm install 22 && nvm use 22`, then start Jupyter from that terminal.\n"
            "- macOS Homebrew: `brew install node`, then start Jupyter from a terminal where the intended Node is first on PATH.\n"
            "- nvm: `nvm install 22 && nvm use 22`, then start Jupyter from that same shell.\n\n"
            "Static notebook preview in a browser cannot run Promptfoo evals."
        )


    ensure_promptfoo_node_runtime()

Publish SchemaFlow Core Runtime

Promptfoo runs the evaluated flow in a separate Python process.
This cell writes a reusable Python module named:

artifacts/promptfoo/schemaflow_cookbook_core.py

The generated module contains the same core SchemaFlow logic used by the notebook:

- Pydantic models
- Agents SDK setup
- output normalization helpers
- Parse Agent execution
- Impact Agent execution
- optional PDF vector store creation
- Plan Agent execution
- SQL Agent execution
- SQL validation
- parse-only eval entrypoint
- full-flow eval entrypoint

The prompt strings are injected from the current notebook variables. If you edit the Parse, Impact, Plan, or SQL prompts above and rerun this cell, the Promptfoo runtime receives the updated prompts.

```python
from pathlib import Path

CORE_MODULE_TEMPLATE = r'''
import json
import os
import re
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timezone
from importlib.metadata import PackageNotFoundError, version
from pathlib import Path

from openai import OpenAI
from pydantic import BaseModel, ConfigDict, Field
from agents import (
    Agent,
    AgentOutputSchema,
    FileSearchTool,
    Runner,
    RunConfig,
    custom_span,
    flush_traces,
    function_span,
    guardrail_span,
    trace,
)

MODEL = os.getenv("OPENAI_MODEL", __MODEL_DEFAULT__)
PARSE_SYSTEM = __PARSE_SYSTEM__
IMPACT_SYSTEM = __IMPACT_SYSTEM__
PLAN_SYSTEM = __PLAN_SYSTEM__
SQL_SYSTEM = __SQL_SYSTEM__

MIN_AGENTS_SDK_VERSION = "0.17.0"
TRACE_INCLUDE_SENSITIVE_DATA = os.getenv(
    "OPENAI_AGENTS_TRACE_INCLUDE_SENSITIVE_DATA", "false"
).lower() in {"1", "true", "yes", "on"}
SCHEMAFLOW_TRACE_GROUP_ID = os.getenv("SCHEMAFLOW_TRACE_GROUP_ID", "schemaflow-cookbook-promptfoo")


def version_tuple(value):
    match = re.match(r"^(\d+)\.(\d+)\.(\d+)", str(value or ""))
    return tuple(int(part) for part in match.groups()) if match else (0, 0, 0)


try:
    AGENTS_SDK_VERSION = version("openai-agents")
except PackageNotFoundError as exc:
    raise RuntimeError('Install the OpenAI Agents SDK: pip install -U "openai-agents>=0.17.0"') from exc

if version_tuple(AGENTS_SDK_VERSION) < version_tuple(MIN_AGENTS_SDK_VERSION):
    raise RuntimeError(
        f"OpenAI Agents SDK {MIN_AGENTS_SDK_VERSION}+ is required; found {AGENTS_SDK_VERSION}."
    )


class SchemaFlowBaseModel(BaseModel):
    model_config = ConfigDict(extra="allow")


class OperationModel(SchemaFlowBaseModel):
    op: str
    details: dict = Field(default_factory=dict)


class ChangeRequestModel(SchemaFlowBaseModel):
    title: str | None = None
    domain: str | None = None
    target_schema: str | None = None
    target_table: str | None = None
    operations: list[OperationModel] = Field(default_factory=list)
    notes: list = Field(default_factory=list)


class ImpactObjectModel(SchemaFlowBaseModel):
    type: str
    name: str
    reason: str
    source: str


class ImpactModel(SchemaFlowBaseModel):
    impacted_objects: list[ImpactObjectModel] = Field(default_factory=list)
    risks: list[str] = Field(default_factory=list)
    assumptions: list[str] = Field(default_factory=list)


class PlanStepModel(SchemaFlowBaseModel):
    id: str
    description: str


class PlanModel(SchemaFlowBaseModel):
    plan_steps: list[PlanStepModel] = Field(default_factory=list)
    prechecks: list[str] = Field(default_factory=list)
    postchecks: list[str] = Field(default_factory=list)
    rollback: list[str] = Field(default_factory=list)


CHANGE_OUTPUT_SCHEMA = AgentOutputSchema(ChangeRequestModel, strict_json_schema=False)
IMPACT_OUTPUT_SCHEMA = AgentOutputSchema(ImpactModel, strict_json_schema=False)
PLAN_OUTPUT_SCHEMA = AgentOutputSchema(PlanModel, strict_json_schema=False)


def clean_openai_api_key(value):
    key = (value or "").strip()
    if not key:
        raise RuntimeError("OPENAI_API_KEY is required for SchemaFlow evals")
    return key


def ensure_openai_api_key(api_key=None):
    if api_key is not None:
        os.environ["OPENAI_API_KEY"] = clean_openai_api_key(api_key)
    else:
        os.environ["OPENAI_API_KEY"] = clean_openai_api_key(os.getenv("OPENAI_API_KEY"))
    org_id = os.getenv("OPENAI_ORG_ID", "").strip()
    if org_id:
        os.environ["OPENAI_ORG_ID"] = org_id


def get_client(api_key=None):
    ensure_openai_api_key(api_key)
    return OpenAI(api_key=os.environ["OPENAI_API_KEY"])


def parse_json_text(text):
    text = (text or "{}").strip()
    # Strip a surrounding Markdown code fence if the model added one.
    if text.startswith("```"):
        text = re.sub(r"^```(?:json)?\s*", "", text)
        text = re.sub(r"\s*```$", "", text).strip()
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        match = re.search(r"\{.*\}", text, flags=re.DOTALL)
        if not match:
            raise
        return json.loads(match.group(0))


def model_dump(value):
    if value is None or isinstance(value, (str, int, float, bool, bytes)):
        return value
    if isinstance(value, type):
        return value
    if hasattr(value, "model_dump"):
        try:
            return value.model_dump()
        except TypeError:
            pass
    if hasattr(value, "to_dict"):
        try:
            return value.to_dict()
        except TypeError:
            pass
    if hasattr(value, "__dict__"):
        try:
            return {k: v for k, v in vars(value).items() if not k.startswith("_")}
        except TypeError:
            pass
    return value


def agent_output_to_json(value):
    value = model_dump(value)
    if isinstance(value, dict):
        return value
    if isinstance(value, str):
        return parse_json_text(value)
    return json.loads(json.dumps(value, default=str))


def agent_output_to_text(value):
    value = model_dump(value)
    if isinstance(value, str):
        return value.strip()
    return json.dumps(value, ensure_ascii=False)


def trace_metadata(metadata=None):
    # Trace metadata values must be strings, so coerce everything.
    cleaned = {}
    for key, value in (metadata or {}).items():
        if value is None:
            cleaned[str(key)] = ""
        elif isinstance(value, bool):
            cleaned[str(key)] = "true" if value else "false"
        elif isinstance(value, (dict, list, tuple, set)):
            cleaned[str(key)] = json.dumps(value, ensure_ascii=False, default=str)
        else:
            cleaned[str(key)] = str(value)
    return cleaned


def schemaflow_run_config(workflow_name, metadata=None):
    return RunConfig(
        workflow_name=workflow_name,
        group_id=SCHEMAFLOW_TRACE_GROUP_ID,
        trace_include_sensitive_data=TRACE_INCLUDE_SENSITIVE_DATA,
        trace_metadata=trace_metadata({"runtime": "promptfoo", **(metadata or {})}),
    )


def runner_run_sync(agent, prompt, *, workflow_name, metadata=None, max_turns=4):
    kwargs = {"run_config": schemaflow_run_config(workflow_name, metadata), "max_turns": max_turns}
    try:
        return Runner.run_sync(agent, prompt, **kwargs)
    except RuntimeError as exc:
        if "event loop" not in str(exc).lower():
            raise
        # An event loop is already running; retry in a worker thread.
        with ThreadPoolExecutor(max_workers=1) as pool:
            return pool.submit(lambda: Runner.run_sync(agent, prompt, **kwargs)).result()


def run_schemaflow_json_agent(*, name, instructions, prompt, output_schema, model=None, tools=None, workflow_name=None, metadata=None):
    agent = Agent(
        name=name,
        instructions=instructions,
        model=model or MODEL,
        output_type=output_schema,
        tools=tools or [],
    )
    result = runner_run_sync(
        agent,
        prompt,
        workflow_name=workflow_name or name,
        metadata={"agent": name, **(metadata or {})},
    )
    return agent_output_to_json(result.final_output), result


def run_schemaflow_text_agent(*, name, instructions, prompt, model=None, tools=None, workflow_name=None, metadata=None):
    agent = Agent(name=name, instructions=instructions, model=model or MODEL, tools=tools or [])
    result = runner_run_sync(
        agent,
        prompt,
        workflow_name=workflow_name or name,
        metadata={"agent": name, **(metadata or {})},
    )
    return agent_output_to_text(result.final_output), result


def trace_function_result(name, *, input_obj=None, output_obj=None):
    with function_span(
        name,
        input=json.dumps(input_obj, ensure_ascii=False, default=str) if input_obj is not None else None,
        output=json.dumps(output_obj, ensure_ascii=False, default=str) if output_obj is not None else None,
    ):
        pass


def collect_file_search_results(value):
    results = []
    seen = set()

    def visit(node):
        if node is None or isinstance(node, (str, int, float, bool, bytes)):
            return
        if isinstance(node, type) or callable(node):
            return
        node_id = id(node)
        if node_id in seen:
            return
        seen.add(node_id)
        node = model_dump(node)
        if node is None or isinstance(node, (str, int, float, bool, bytes)):
            return
        if isinstance(node, type) or callable(node):
            return
        if isinstance(node, dict):
            if node.get("type") == "file_search_call":
                for result in node.get("results", []) or []:
                    result = model_dump(result)
                    if isinstance(result, dict):
                        text = result.get("text") or result.get("content") or ""
                        if isinstance(text, list):
                            text = "\n".join(str(x) for x in text)
                        results.append({
                            "file_id": result.get("file_id"),
                            "filename": result.get("filename") or result.get("file_name") or result.get("title"),
                            "score": result.get("score"),
                            "text_preview": str(text)[:1200],
                        })
            for child in node.values():
                visit(child)
        elif isinstance(node, (list, tuple, set)):
            for child in node:
                visit(child)

    visit(value)
    return results


def normalize_change(change_json):
    if not isinstance(change_json, dict):
        change_json = {}
    change_json.setdefault("title", None)
    change_json.setdefault("domain", None)
    change_json.setdefault("target_schema", None)
    change_json.setdefault("target_table", None)
    if not isinstance(change_json.get("operations"), list):
        change_json["operations"] = [change_json.get("operations")] if change_json.get("operations") else []
    if not isinstance(change_json.get("notes"), list):
        change_json["notes"] = []
    return change_json


def parse_change(change_text, model=None):
    change_json, _ = run_schemaflow_json_agent(
        name="SchemaFlow Parse Agent",
        instructions=PARSE_SYSTEM,
        prompt="Change Request:\n\n" + change_text,
        output_schema=CHANGE_OUTPUT_SCHEMA,
        model=model,
        workflow_name="SchemaFlow Eval Parse",
        metadata={"eval_stage": "parse"},
    )
    return normalize_change(change_json)


def run_schemaflow_parse(change_text, *, model=None, api_key=None):
    ensure_openai_api_key(api_key)
    with custom_span("SchemaFlow Promptfoo Parse Eval", {"eval_mode": "parse_only", "group_id": SCHEMAFLOW_TRACE_GROUP_ID}):
        try:
            change_json = parse_change(change_text, model=model)
            bundle = {
                "eval_mode": "parse_only",
                "change_text": change_text,
                "change_json": change_json,
                "validation": {"valid": True, "issues": []},
            }
            trace_function_result("Promptfoo parse bundle", input_obj={"change_text": change_text}, output_obj=bundle)
            return bundle
        finally:
            flush_traces()


def normalize_impact(impact_json):
    if not isinstance(impact_json, dict):
        impact_json = {}
    impact_json.setdefault("impacted_objects", [])
    impact_json.setdefault("risks", [])
    impact_json.setdefault("assumptions", [])
    return impact_json


def normalize_plan(plan_json):
    if not isinstance(plan_json, dict):
        plan_json = {}
    plan_json.setdefault("plan_steps", [])
    plan_json.setdefault("prechecks", [])
    plan_json.setdefault("postchecks", [])
    plan_json.setdefault("rollback", [])
    return plan_json


def resolve_eval_pdf_path(pdf_path):
    requested = Path(pdf_path).expanduser()
    module_dir = Path(__file__).resolve().parent
    candidates = [requested]
    if not requested.is_absolute():
        candidates.extend([
            module_dir / requested,
            module_dir.parent / requested,
            module_dir.parent.parent / requested,
        ])
    resolved_candidates = []
    for candidate in candidates:
        resolved = candidate.resolve()
        if resolved in resolved_candidates:
            continue
        resolved_candidates.append(resolved)
        if resolved.exists():
            return resolved
    attempted = ", ".join(str(candidate) for candidate in resolved_candidates)
    raise FileNotFoundError(f"PDF not found: {pdf_path}. Tried: {attempted}")


def create_pdf_vector_store(client, pdf_path, name_prefix="schemaflow-cookbook"):
    pdf_path = resolve_eval_pdf_path(pdf_path)
    if pdf_path.suffix.lower() != ".pdf":
        raise ValueError(f"Expected a PDF file, got: {pdf_path}")
    with custom_span("Promptfoo create vector store", {"pdf_path": str(pdf_path)}):
        vector_store = client.vector_stores.create(
            name=f"{name_prefix}-{datetime.now(timezone.utc).strftime('%Y%m%dT%H%M%SZ')}",
            expires_after={"anchor": "last_active_at", "days": 1},
        )
    with custom_span("Promptfoo upload PDF to vector store", {"vector_store_id": vector_store.id, "pdf_path": str(pdf_path)}):
        with pdf_path.open("rb") as handle:
            vector_store_file = client.vector_stores.files.upload_and_poll(
                vector_store_id=vector_store.id,
                file=handle,
            )
    trace_function_result(
        "Promptfoo vector store ready",
        input_obj={"pdf_path": str(pdf_path)},
        output_obj={"vector_store_id": vector_store.id, "status": getattr(vector_store_file, "status", "unknown")},
    )
    return vector_store, vector_store_file


def delete_vector_store(client, vector_store_id):
    if not vector_store_id:
        return
    try:
        with custom_span("Promptfoo delete vector store", {"vector_store_id": vector_store_id}):
            client.vector_stores.delete(vector_store_id=vector_store_id)
    except Exception:
        pass


def validate_sql(sql_text, required_keywords=None):
    issues = []
    if not (sql_text or "").strip():
        issues.append("SQL output is empty")
    for keyword in required_keywords or ["ALTER TABLE"]:
        if keyword.lower() not in (sql_text or "").lower():
            issues.append(f"Expected keyword missing: {keyword}")
    validation = {"valid": len(issues) == 0, "issues": issues}
    with guardrail_span("promptfoo_sql_validation", triggered=not validation["valid"]):
        trace_function_result("Promptfoo SQL validation", output_obj=validation)
    return validation


def run_schemaflow_case(change_text, *, pdf_path=None, rag_max_results=6, model=None, api_key=None, validation_keywords=None, delete_vector_store_after_run=True):
    client = get_client(api_key=api_key)
    vector_store_id = None
    rag_file_search_results = []
    with custom_span("SchemaFlow Promptfoo Full Flow Eval", {"eval_mode": "full_flow", "pdf_path": pdf_path or "", "group_id": SCHEMAFLOW_TRACE_GROUP_ID}):
        try:
            change_json = parse_change(change_text, model=model)

            impact_user_parts = ["CHANGE_JSON:\n" + json.dumps(change_json, ensure_ascii=False)]
            impact_tools = []
            if pdf_path:
                vector_store, _ = create_pdf_vector_store(client, pdf_path, name_prefix="schemaflow-promptfoo")
                vector_store_id = vector_store.id
                impact_tools.append(
                    FileSearchTool(
                        vector_store_ids=[vector_store_id],
                        max_num_results=rag_max_results,
                        include_search_results=True,
                    )
                )
                impact_user_parts.append(
                    "Use the file_search tool against the uploaded PDF to look for relevant IFD, schema, table, column, lineage, and downstream dependency context before returning JSON."
                )

            impact_json, impact_result = run_schemaflow_json_agent(
                name="SchemaFlow Impact Agent",
                instructions=IMPACT_SYSTEM,
                prompt="\n\n".join(impact_user_parts),
                output_schema=IMPACT_OUTPUT_SCHEMA,
                model=model,
                tools=impact_tools,
                workflow_name="SchemaFlow Eval Impact",
                metadata={"eval_stage": "impact", "rag_enabled": bool(vector_store_id)},
            )
            impact_json = normalize_impact(impact_json)
            try:
                rag_file_search_results = collect_file_search_results(impact_result)
            except Exception as exc:
                rag_file_search_results = []
                trace_function_result(
                    "Promptfoo File Search summary skipped",
                    output_obj={"error": f"{type(exc).__name__}: {exc}"},
                )

            plan_user = "\n\n".join([
                "CHANGE_JSON:\n" + json.dumps(change_json, ensure_ascii=False),
                "IMPACT_JSON:\n" + json.dumps(impact_json, ensure_ascii=False),
            ])
            plan_json, _ = run_schemaflow_json_agent(
                name="SchemaFlow Plan Agent",
                instructions=PLAN_SYSTEM,
                prompt=plan_user,
                output_schema=PLAN_OUTPUT_SCHEMA,
                model=model,
                workflow_name="SchemaFlow Eval Plan",
                metadata={"eval_stage": "plan"},
            )
            plan_json = normalize_plan(plan_json)

            sql_user = "\n\n".join([
                "CHANGE_JSON:\n" + json.dumps(change_json, ensure_ascii=False),
                "PLAN_JSON:\n" + json.dumps(plan_json, ensure_ascii=False),
            ])
            sql_text, _ = run_schemaflow_text_agent(
                name="SchemaFlow SQL Agent",
                instructions=SQL_SYSTEM,
                prompt=sql_user,
                model=model,
                workflow_name="SchemaFlow Eval SQL",
                metadata={"eval_stage": "sql"},
            )
            validation = validate_sql(sql_text, required_keywords=validation_keywords)

            bundle = {
                "summary": {
                    "matched_tables": [],
                    "impact_risks": impact_json.get("risks", []),
                    "rag_hits": len(rag_file_search_results),
                },
                "rag": {
                    "enabled": bool(vector_store_id),
                    "vector_store_id": vector_store_id,
                    "hits": len(rag_file_search_results),
                    "file_search_results": rag_file_search_results,
                },
                "change_json": change_json,
                "impact_json": impact_json,
                "plan": plan_json,
                "sql": sql_text,
                "validation": validation,
            }
            trace_function_result("Promptfoo full-flow bundle", input_obj={"change_text": change_text}, output_obj=bundle)
            return bundle
        finally:
            if delete_vector_store_after_run:
                delete_vector_store(client, vector_store_id)
            flush_traces()
'''

# Inject the current notebook model name and prompts into the template.
core_module = (
    CORE_MODULE_TEMPLATE
    .replace("__MODEL_DEFAULT__", repr(MODEL))
    .replace("__PARSE_SYSTEM__", repr(PARSE_SYSTEM))
    .replace("__IMPACT_SYSTEM__", repr(IMPACT_SYSTEM))
    .replace("__PLAN_SYSTEM__", repr(PLAN_SYSTEM))
    .replace("__SQL_SYSTEM__", repr(SQL_SYSTEM))
)

core_path = PROMPTFOO_DIR / "schemaflow_cookbook_core.py"
core_path.write_text(core_module, encoding="utf-8")
print("Published SchemaFlow core:", core_path.resolve())
```

Promptfoo Provider Runtime

This cell writes the Promptfoo provider file:

artifacts/promptfoo/schemaflow_cookbook_eval_provider.py

The provider is the bridge between Promptfoo and SchemaFlow. For each Promptfoo test case, it reads variables such as:

- change_text
- eval_mode
- optional pdf_path
- optional rag_max_results
- validation_keywords_json

Then it chooses one of two execution paths:

- parse_only runs only Stage 1 and returns a parse bundle.
- full_flow runs the complete SchemaFlow pipeline and returns the full bundle.

The provider returns JSON so Promptfoo assertions can inspect structured fields instead of parsing notebook text output.

```python
%%writefile artifacts/promptfoo/schemaflow_cookbook_eval_provider.py
import json
import os

from agents import flush_traces, function_span, trace
from schemaflow_cookbook_core import run_schemaflow_case, run_schemaflow_parse

SCHEMAFLOW_TRACE_GROUP_ID = os.getenv("SCHEMAFLOW_TRACE_GROUP_ID", "schemaflow-cookbook-promptfoo")


def json_list(value):
    if value is None:
        return []
    if isinstance(value, list):
        return value
    try:
        parsed = json.loads(value)
    except Exception:
        return [value]
    return parsed if isinstance(parsed, list) else [parsed]


def trace_function_result(name, *, input_obj=None, output_obj=None):
    with function_span(
        name,
        input=json.dumps(input_obj, ensure_ascii=False, default=str) if input_obj is not None else None,
        output=json.dumps(output_obj, ensure_ascii=False, default=str) if output_obj is not None else None,
    ):
        pass


def call_api(prompt, options, context):
    vars_ = (context or {}).get("vars", {})
    change_text = vars_.get("change_text") or prompt
    eval_mode = vars_.get("eval_mode", "full_flow")
    with trace("SchemaFlow Promptfoo Provider", group_id=SCHEMAFLOW_TRACE_GROUP_ID, metadata={"eval_mode": eval_mode}):
        try:
            if eval_mode == "parse_only":
                bundle = run_schemaflow_parse(change_text)
            elif eval_mode == "full_flow":
                bundle = run_schemaflow_case(
                    change_text,
                    pdf_path=vars_.get("pdf_path"),
                    rag_max_results=int(vars_.get("rag_max_results") or 6),
                    validation_keywords=json_list(vars_.get("validation_keywords_json")),
                )
                bundle["eval_mode"] = "full_flow"
            else:
                raise ValueError(f"Unsupported eval mode: {eval_mode}")
            trace_function_result(
                "Promptfoo provider output",
                input_obj={"eval_mode": eval_mode, "change_text": change_text, "vars": vars_},
                output_obj=bundle,
            )
            return {"output": json.dumps(bundle, ensure_ascii=False)}
        finally:
            flush_traces()
```

Promptfoo Assertion Runtime

This cell writes the Promptfoo assertion file:

artifacts/promptfoo/schemaflow_cookbook_eval_assert.py

The assertion file validates provider output for both eval modes.

For parse_only, it checks:

- output is valid JSON
- target schema and table match expectations
- at least one parsed operation is present
- expected added column appears in parsed operations
- expected data type appears structurally in parsed operations

For full_flow, it checks:

- output is valid JSON
- target schema and table match expectations
- at least one parsed operation is present
- impact risks are present
- required SQL terms are present
- validation passed

The assertion also emits guardrail spans so eval failures are visible in traces.
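As a rough illustration of how a checklist like the one above can collapse into a single result, here is a minimal sketch. The `aggregate_checks` helper and the sample check names are hypothetical and are not part of the generated SchemaFlow files; only the result keys (`pass`, `score`, `reason`) mirror what the assertion file returns.

```python
def aggregate_checks(checks):
    """Collapse (name, ok, reason) tuples into a pass/score/reason dict.

    Hypothetical helper for illustration only.
    """
    passed = [name for name, ok, _ in checks if ok]
    failures = [f"{name}: {reason}" for name, ok, reason in checks if not ok]
    # Score is the fraction of checks that passed; an empty checklist scores 0.
    score = len(passed) / len(checks) if checks else 0
    return {
        "pass": bool(checks) and not failures,
        "score": score,
        "reason": "All checks passed" if not failures else "; ".join(failures),
    }


# Illustrative inputs, not real eval output.
sample_checks = [
    ("target_matches_expected", True, "ok"),
    ("sql_terms_present", False, "missing SQL terms: UPDATE"),
]
result = aggregate_checks(sample_checks)
print(result)
```

Reporting a fractional score alongside the boolean makes partial regressions visible in the eval report even though any single failing check still fails the test case.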
```python
%%writefile artifacts/promptfoo/schemaflow_cookbook_eval_assert.py
import json
import os
import re

from agents import flush_traces, function_span, guardrail_span, trace

SCHEMAFLOW_TRACE_GROUP_ID = os.getenv("SCHEMAFLOW_TRACE_GROUP_ID", "schemaflow-cookbook-promptfoo")


def json_list(value):
    if value is None:
        return []
    if isinstance(value, list):
        return value
    try:
        parsed = json.loads(value)
    except Exception:
        return [value]
    return parsed if isinstance(parsed, list) else [parsed]


def normalize_name(value):
    return (value or "").replace('"', "").replace("'", "").strip().upper()


def normalize_text(value):
    return " ".join(str(value or "").upper().replace('"', "").replace("'", "").split())


def compact_text(value):
    return re.sub(r"\s+", "", normalize_text(value))


def operation_text(bundle):
    operations = bundle.get("change_json", {}).get("operations", [])
    return normalize_text(json.dumps(operations, ensure_ascii=False))


def trace_function_result(name, *, input_obj=None, output_obj=None):
    with function_span(
        name,
        input=json.dumps(input_obj, ensure_ascii=False, default=str) if input_obj is not None else None,
        output=json.dumps(output_obj, ensure_ascii=False, default=str) if output_obj is not None else None,
    ):
        pass


def check_target(bundle, expected_schema, expected_table):
    if not expected_schema or not expected_table:
        return True, "target expectation not configured"
    change = bundle.get("change_json", {})
    actual_schema = normalize_name(change.get("target_schema"))
    actual_table = normalize_name(change.get("target_table"))
    return (
        actual_schema == normalize_name(expected_schema) and actual_table == normalize_name(expected_table),
        f"expected target {normalize_name(expected_schema)}.{normalize_name(expected_table)}, got {actual_schema}.{actual_table}",
    )


def check_expected_text(bundle, value, label):
    if not value:
        return True, f"{label} expectation not configured"
    haystack = operation_text(bundle)
    needle = normalize_text(value)
    return needle in haystack, f"expected parsed {label} {needle} in operations"


def check_expected_data_type(bundle, value):
    if not value:
        return True, "data type expectation not configured"
    haystack = operation_text(bundle)
    compact_haystack = compact_text(haystack)
    compact_needle = compact_text(value)
    if compact_needle in compact_haystack:
        return True, "data type matched"
    # Fall back to a structural match: base type plus each size component.
    match = re.match(r"([A-Z]+)\(?([0-9,]*)\)?", compact_needle)
    if not match:
        return False, f"expected parsed data type {value} in operations"
    base_type, size = match.groups()
    if base_type and base_type not in compact_haystack:
        return False, f"expected parsed data type base {base_type} in operations"
    if size:
        missing_sizes = [part for part in size.split(",") if part and part not in compact_haystack]
        if missing_sizes:
            return False, f"expected parsed data type size {size} in operations"
    return True, "data type matched structurally"


def get_assert(output, context):
    vars_ = (context or {}).get("vars", {})
    eval_mode = vars_.get("eval_mode", "full_flow")
    with trace("SchemaFlow Promptfoo Assertion", group_id=SCHEMAFLOW_TRACE_GROUP_ID, metadata={"eval_mode": eval_mode}):
        try:
            try:
                bundle = json.loads(output)
            except Exception as exc:
                result = {"pass": False, "score": 0, "reason": f"Provider output was not JSON: {exc}"}
                with guardrail_span("provider_output_json", triggered=True):
                    trace_function_result("Promptfoo assertion parse failure", input_obj={"output": output}, output_obj=result)
                return result

            checks = []
            ok, reason = check_target(bundle, vars_.get("expected_schema"), vars_.get("expected_table"))
            checks.append(("target_matches_expected", ok, reason))

            operations = bundle.get("change_json", {}).get("operations", [])
            checks.append((
                "parsed_operation_present",
                isinstance(operations, list) and len(operations) > 0,
                "expected at least one parsed operation",
            ))

            if eval_mode == "parse_only":
                ok, reason = check_expected_text(bundle, vars_.get("expected_added_column"), "added column")
                checks.append(("expected_added_column", ok, reason))
                ok, reason = check_expected_data_type(bundle, vars_.get("expected_data_type"))
                checks.append(("expected_data_type", ok, reason))
            else:
                risks = bundle.get("impact_json", {}).get("risks", [])
                checks.append((
                    "impact_risks_present",
                    isinstance(risks, list) and len(risks) > 0,
                    "expected at least one impact risk",
                ))
                sql_text = bundle.get("sql") or bundle.get("sql_text") or ""
                missing_terms = [
                    term for term in json_list(vars_.get("sql_terms_json"))
                    if term.lower() not in sql_text.lower()
                ]
                checks.append(("sql_terms_present", not missing_terms, "missing SQL terms: " + ", ".join(missing_terms)))
                validation = bundle.get("validation", {})
                checks.append((
                    "validation_passed",
                    bool(validation.get("valid")),
                    "validation issues: " + "; ".join(validation.get("issues", [])),
                ))

            for name, ok, reason in checks:
                with guardrail_span(name, triggered=not ok):
                    trace_function_result("Promptfoo assertion check", output_obj={"name": name, "ok": ok, "reason": reason})

            passed = [ok for _, ok, _ in checks if ok]
            failures = [reason for _, ok, reason in checks if not ok]
            score = len(passed) / len(checks) if checks else 0
            result = {
                "pass": score == 1,
                "score": score,
                "reason": "All checks passed" if not failures else "; ".join(failures),
            }
            trace_function_result("Promptfoo assertion result", input_obj={"vars": vars_}, output_obj=result)
            return result
        finally:
            flush_traces()
```

Build Promptfoo Test Cases and Config

This cell builds Promptfoo test cases from the current notebook input.

By default, it creates two test cases from the current CHANGE_TEXT, carrying through PDF_PATH when a PDF is configured:

- a parse-only test
- a full-flow test

The helper functions infer expectations from the change request, including:

- expected schema
- expected table
- expected added column
- expected data type
- expected SQL terms
- expected validation keywords

The cell also includes optional regression fixtures. Set:

RUN_EXTRA_REGRESSION_CASES = True

to add those extra cases to the generated config.

Before writing promptfooconfig.yaml, the cell runs deterministic input preflight checks. This prevents obviously malformed eval inputs from producing confusing Promptfoo failures.
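The inference step described above can be sketched in isolation. This is a simplified illustration, not the notebook's helper: `sketch_expectations`, the trimmed-down patterns, and the sample request (including the `ODS.ODS_CUSTOMER` table name) are assumptions made for the example.

```python
import re

# Simplified patterns: "<preposition> SCHEMA.TABLE" and "add COLUMN TYPE".
TARGET_RE = re.compile(
    r"\b(?:to|from|in|on)\s+([A-Za-z_]\w*)\.([A-Za-z_]\w*)", re.IGNORECASE
)
COLUMN_RE = re.compile(
    r"\badd\s+([A-Za-z_]\w*)\s+"
    r"((?:VAR)?CHAR\s*\(\d+\)|TEXT|INTEGER|INT|BIGINT|BOOLEAN|DATE|TIMESTAMP)",
    re.IGNORECASE,
)


def sketch_expectations(change_text):
    """Infer eval expectations from a free-form change request (illustrative)."""
    target = TARGET_RE.search(change_text)
    column = COLUMN_RE.search(change_text)
    lower_text = change_text.lower()
    return {
        "expected_schema": target.group(1).upper() if target else None,
        "expected_table": target.group(2).upper() if target else None,
        "expected_added_column": column.group(1).upper() if column else None,
        "expected_data_type": column.group(2).upper() if column else None,
        # A backfill request implies the generated SQL should contain an UPDATE.
        "needs_update": any(term in lower_text for term in ("backfill", "update")),
    }


# Hypothetical request used only for this sketch.
request = "Add LOYALTY_TIER VARCHAR(20) to ODS.ODS_CUSTOMER as nullable and backfill it."
expectations = sketch_expectations(request)
print(expectations)
```

Deriving expectations from the request text keeps the eval cases in sync with whatever change request is currently loaded in the notebook, at the cost of depending on fairly regular phrasing; the preflight checks exist to catch requests the patterns cannot read.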
```python
import json
import re
import sys
from pathlib import Path


def infer_eval_expectations(change_text):
    target_match = re.search(
        r"\b(?:to|from|in|on)\s+([A-Za-z_][\w$]*)\.([A-Za-z_][\w$]*)",
        change_text,
        flags=re.IGNORECASE,
    )
    column_type_match = re.search(
        r"\badd\s+([A-Za-z_][\w$]*)\s+((?:VAR)?CHAR\s*\([^)]*\)|TEXT|INTEGER|INT|BIGINT|BOOLEAN|DATE|TIMESTAMP|NUMERIC\s*\([^)]*\)|DECIMAL\s*\([^)]*\)|FLOAT|DOUBLE)",
        change_text,
        flags=re.IGNORECASE,
    )
    expected_schema = target_match.group(1).upper() if target_match else None
    expected_table = target_match.group(2).upper() if target_match else None
    added_column = column_type_match.group(1).upper() if column_type_match else None
    data_type = " ".join(column_type_match.group(2).upper().split()) if column_type_match else None

    sql_terms = []
    validation_keywords = ["ALTER TABLE"]
    if expected_table:
        sql_terms.append(expected_table)
    if added_column:
        sql_terms.append(added_column)
    if data_type:
        sql_terms.append(data_type)
    sql_terms.append("ALTER TABLE")

    lower_text = change_text.lower()
    if any(term in lower_text for term in ["backfill", "update", "source it from"]):
        sql_terms.append("UPDATE")
        validation_keywords.append("UPDATE")
    if "index" in lower_text:
        sql_terms.append("CREATE INDEX")
        validation_keywords.append("CREATE INDEX")

    return {
        "expected_schema": expected_schema,
        "expected_table": expected_table,
        "expected_added_column": added_column,
        "expected_data_type": data_type,
        # dict.fromkeys removes duplicates while preserving order.
        "sql_terms": list(dict.fromkeys(sql_terms)),
        "validation_keywords": list(dict.fromkeys(validation_keywords)),
    }


def build_eval_case(description, change_text, **overrides):
    expectations = infer_eval_expectations(change_text)
    vars_ = {
        "change_text": change_text,
        "sql_terms_json": json.dumps(expectations["sql_terms"], ensure_ascii=False),
        "validation_keywords_json": json.dumps(expectations["validation_keywords"], ensure_ascii=False),
    }
    for key in ["expected_schema", "expected_table", "expected_added_column", "expected_data_type"]:
        if expectations.get(key):
            vars_[key] = expectations[key]
    vars_.update({k: v for k, v in overrides.items() if v is not None})
    return {"description": description, "vars": vars_}


def json_list(value):
    if value is None:
        return []
    if isinstance(value, list):
        return value
    parsed = json.loads(value)
    return parsed if isinstance(parsed, list) else [parsed]


def preflight_eval_case(case):
    vars_ = case["vars"]
    errors = []
    warnings = []
    change_text = vars_.get("change_text", "")
    sql_terms = json_list(vars_.get("sql_terms_json"))

    if len(change_text.strip()) < 20:
        errors.append("change_text is missing or too short")
    if not vars_.get("expected_schema") or not vars_.get("expected_table"):
        errors.append("could not infer target schema/table")
    if not vars_.get("expected_added_column"):
        warnings.append("could not infer added column")
    if not vars_.get("expected_data_type"):
        warnings.append("could not infer added column data type")
    if len(sql_terms) <= 1:
        warnings.append("few SQL terms inferred")
    if vars_.get("pdf_path"):
        pdf_path = Path(vars_["pdf_path"]).expanduser()
        if not pdf_path.exists():
            errors.append(f"pdf_path does not exist: {pdf_path}")
        elif pdf_path.suffix.lower() != ".pdf":
            errors.append(f"pdf_path is not a PDF: {pdf_path}")

    return {"description": case["description"], "errors": errors, "warnings": warnings}


def as_promptfoo_test(case, eval_mode):
    vars_ = dict(case["vars"])
    vars_["eval_mode"] = eval_mode
    label = "Parse-only" if eval_mode == "parse_only" else "Full flow"
    return {"description": f"{label}: {case['description']}", "vars": vars_}


CURRENT_NOTEBOOK_EVAL_CASE = build_eval_case(
    "Current notebook change request",
    CHANGE_TEXT,
    pdf_path=str(Path(PDF_PATH).expanduser().resolve()) if PDF_PATH else None,
)

RUN_EXTRA_REGRESSION_CASES = False

EXTRA_REGRESSION_CASES = [
    build_eval_case(
        "Product style color propagation",
        """Add COLOR_CODE VARCHAR(10) to ODS.ODS_PLIM_STYLE as nullable. Source it from FLEX.STYLE.COLOR_CODE when available and propagate the field through staging and mart outputs used by product reporting.""",
    ),
    build_eval_case(
        "Optional customer note field",
        """Add CUSTOMER_SEGMENT_NOTE VARCHAR(255) to ODS.ODS_CUSTOMER_PROFILE as nullable. No historical backfill is required. The field is optional metadata for analyst annotations and should not block existing loads.""",
    ),
]

ADDITIONAL_EVAL_CASES = EXTRA_REGRESSION_CASES if RUN_EXTRA_REGRESSION_CASES else []
INPUT_EVAL_CASES = [CURRENT_NOTEBOOK_EVAL_CASE, *ADDITIONAL_EVAL_CASES]

INPUT_PREFLIGHT_RESULTS = [preflight_eval_case(case) for case in INPUT_EVAL_CASES]
INPUT_PREFLIGHT_ERRORS = [
    f"{result['description']}: {error}"
    for result in INPUT_PREFLIGHT_RESULTS
    for error in result["errors"]
]

print("Input preflight:")
for result in INPUT_PREFLIGHT_RESULTS:
    status = "PASS" if not result["errors"] else "FAIL"
    print(f"- {status}: {result['description']}")
    for warning in result["warnings"]:
        print(f"  warning: {warning}")
    for error in result["errors"]:
        print(f"  error: {error}")

if INPUT_PREFLIGHT_ERRORS:
    raise ValueError("Input preflight failed:\n" + "\n".join(INPUT_PREFLIGHT_ERRORS))

PROMPTFOO_PARSE_EVAL_CASES = [as_promptfoo_test(case, "parse_only") for case in INPUT_EVAL_CASES]
PROMPTFOO_FULL_FLOW_EVAL_CASES = [as_promptfoo_test(case, "full_flow") for case in INPUT_EVAL_CASES]
PROMPTFOO_EVAL_CASES = [*PROMPTFOO_PARSE_EVAL_CASES, *PROMPTFOO_FULL_FLOW_EVAL_CASES]

promptfoo_config = {
    "description": "SchemaFlow cookbook evals",
    "prompts": ["{{change_text}}"],
    "providers": [
        {
            "id": "file://schemaflow_cookbook_eval_provider.py",
            "config": {"pythonExecutable": sys.executable},
        }
    ],
    "defaultTest": {
        "assert": [
            {
                "type": "python",
                "value": "file://schemaflow_cookbook_eval_assert.py",
            }
        ],
    },
    "tests": PROMPTFOO_EVAL_CASES,
}

config_path = PROMPTFOO_DIR / "promptfooconfig.yaml"
# JSON is valid YAML, so the config is serialized with json.dumps.
config_text = (
    "# yaml-language-server: $schema=https://promptfoo.dev/config-schema.json\n"
    + json.dumps(promptfoo_config, indent=2, ensure_ascii=False)
)
config_path.write_text(config_text, encoding="utf-8")

print("Promptfoo config:", config_path.resolve())
print("Promptfoo Python executable:", sys.executable)
print("Promptfoo eval cases:", len(PROMPTFOO_EVAL_CASES))
for case in PROMPTFOO_EVAL_CASES:
    vars_ = case["vars"]
    target = ".".join(part for part in [vars_.get("expected_schema"), vars_.get("expected_table")] if part)
    print("-", case["description"], "->", target or "target not inferred")
```

Run Promptfoo Eval

This cell runs Promptfoo non-interactively from the notebook.

The command:

- runs from artifacts/promptfoo/
- uses the generated promptfooconfig.yaml
- uses notebook-local Promptfoo config, cache, logs, and npm cache
- runs with concurrency 1 for predictable notebook behavior
- keeps CLI output visible in the notebook
- writes timestamped JSON and HTML reports
- refreshes latest-result aliases after a successful run

The exported result files are saved under:

artifacts/promptfoo/results/

If the Node.js/npm runtime check failed earlier, fix the runtime before running this cell.
```bash
%%bash
set -euo pipefail

cd artifacts/promptfoo

export PROMPTFOO_CONFIG_DIR="$PWD/.promptfoo"
export PROMPTFOO_LOG_DIR="$PWD/.promptfoo/logs"
export PROMPTFOO_CACHE_PATH="$PWD/.promptfoo/cache"
export npm_config_cache="$PWD/.npm-cache"
export npm_config_update_notifier=false
export npm_config_loglevel=error
export SCHEMAFLOW_TRACE_GROUP_ID="${SCHEMAFLOW_TRACE_GROUP_ID:-schemaflow-cookbook-promptfoo}"
export OPENAI_AGENTS_TRACE_INCLUDE_SENSITIVE_DATA="${OPENAI_AGENTS_TRACE_INCLUDE_SENSITIVE_DATA:-false}"

mkdir -p "$PROMPTFOO_LOG_DIR" "$PROMPTFOO_CACHE_PATH" results

RUN_ID="$(date -u +%Y%m%dT%H%M%SZ)"
RESULT_JSON="results/schemaflow_cookbook_eval_${RUN_ID}.json"
RESULT_HTML="results/schemaflow_cookbook_eval_${RUN_ID}.html"

npx --yes promptfoo@latest eval \
  -c promptfooconfig.yaml \
  --max-concurrency 1 \
  --no-progress-bar \
  --description "SchemaFlow cookbook eval ${RUN_ID}" \
  -o "$RESULT_JSON" "$RESULT_HTML"

cp "$RESULT_JSON" results/schemaflow_cookbook_eval_latest.json
cp "$RESULT_HTML" results/schemaflow_cookbook_eval_latest.html

printf '\nSaved Promptfoo results:\n  %s\n  %s\n' "$RESULT_JSON" "$RESULT_HTML"
printf 'Latest aliases:\n  %s\n  %s\n' "results/schemaflow_cookbook_eval_latest.json" "results/schemaflow_cookbook_eval_latest.html"
printf 'Trace group:\n  %s\n' "$SCHEMAFLOW_TRACE_GROUP_ID"
```

Review Latest Promptfoo Results

This cell checks whether the latest Promptfoo result aliases exist and prints their paths and sizes.

Expected files:

- artifacts/promptfoo/results/schemaflow_cookbook_eval_latest.json
- artifacts/promptfoo/results/schemaflow_cookbook_eval_latest.html

If the latest JSON file exists, the cell also prints available eval metadata such as the eval ID and aggregate stats.

Use this section as a quick confirmation that the eval completed and exported artifacts successfully.
    from pathlib import Path
    import json

    results_dir = Path("artifacts/promptfoo/results")
    latest_json = results_dir / "schemaflow_cookbook_eval_latest.json"
    latest_html = results_dir / "schemaflow_cookbook_eval_latest.html"

    for path in (latest_json, latest_html):
        if path.exists():
            print(f"{path.name}: {path.resolve()} ({path.stat().st_size:,} bytes)")
        else:
            print(f"Missing expected Promptfoo result: {path.resolve()}")

    if latest_json.exists():
        data = json.loads(latest_json.read_text())
        eval_id = data.get("evalId")
        results = data.get("results", {})
        stats = results.get("stats", {}) if isinstance(results, dict) else {}
        if eval_id:
            print("Eval ID:", eval_id)
        if stats:
            print("Stats:", stats)

11 Optional Neo4j Knowledge Graph & Dashboard

This optional section is fully self-contained and does not affect the core pipeline above. It uses a small synthetic customer-loyalty graph seed plus inline dashboard code so the cookbook stays portable. Readers can treat it as a visual appendix: the core workflow works without Neo4j, but graph views make lineage and downstream impact easier to inspect.

What this section does, in order:

- Step 1 - Seed: define a synthetic customer-loyalty graph with ODS, staging, core, mart, and CRM objects, their columns, lineage, and joins as an inline Python data structure - no external files.
- Step 2 - AI Enrichment: use the OpenAI client already loaded in Section 1 to fill in semantic_meaning (a short 2-5 word tag like natural-key, foreign-key, monetary-amount, timestamp) for every column.
- Step 3 - Upsert to Neo4j: write the enriched data to a running Neo4j instance via idempotent MERGE Cypher. Nodes are labeled SchemaFlowCookbook so the dashboard ignores stale sample data from older local runs.
- Dashboard: write a small FastAPI server + D3.js page next to the notebook, launch it on http://127.0.0.1:8005, and print a clickable link.

Prerequisites for this section:

- A running Neo4j instance, e.g.
via Docker: docker run -d -p 7687:7687 -p 7474:7474 -e NEO4J_AUTH=neo4j/change-me-please neo4j:5 (Neo4j Desktop or AuraDB free tier also work).
- NEO4J_URI, NEO4J_USER, NEO4J_PASSWORD (loaded via env, or entered at the prompt below).
- A free local port 8005 for the dashboard (override via NEO4J_DASHBOARD_PORT).
- Optional packages neo4j, fastapi, uvicorn - the next cell will install them lazily if missing.

If any prerequisite is missing, every cell below short-circuits with a clear message; nothing throws and the rest of the notebook is unaffected.

11.1 Environment Setup & Optional Dependencies

Mirrors Section 1's OpenAI env loading pattern. Lazy-installs neo4j, fastapi, and uvicorn only when they are not importable, then reads NEO4J_URI, NEO4J_USER, and NEO4J_PASSWORD from the environment (or prompts via getpass if missing). If you press Enter at any prompt without typing, the section is disabled (NEO4J_SECTION_ENABLED = False) and the remaining cells skip cleanly.

    import os
    import subprocess
    import sys
    from getpass import getpass
    from urllib.parse import urlparse

    NEO4J_SECTION_ENABLED = True


    def _ensure_pkg(pkg, import_name=None):
        name = import_name or pkg
        try:
            __import__(name)
            return True
        except Exception:
            print(f"Installing {pkg} (only needed for Section 11)...", flush=True)
            rc = subprocess.call([sys.executable, "-m", "pip", "install", "-q", pkg])
            if rc != 0:
                print(f"  pip install {pkg} failed (rc={rc}); Section 11 will be skipped.")
                return False
            try:
                __import__(name)
                return True
            except Exception as e:
                print(f"  Import still failing after install of {pkg}: {e}")
                return False


    for pkg, imp in [("neo4j", "neo4j"), ("fastapi", "fastapi"), ("uvicorn", "uvicorn")]:
        if not _ensure_pkg(pkg, imp):
            NEO4J_SECTION_ENABLED = False

    if not os.getenv("NEO4J_URI"):
        os.environ["NEO4J_URI"] = getpass("Enter NEO4J_URI (e.g. neo4j://127.0.0.1:7687) or press Enter to skip: ")
    if not os.getenv("NEO4J_USER"):
        os.environ["NEO4J_USER"] = getpass("Enter NEO4J_USER (default 'neo4j') or press Enter to skip: ") or "neo4j"
    if not os.getenv("NEO4J_PASSWORD"):
        os.environ["NEO4J_PASSWORD"] = getpass("Enter NEO4J_PASSWORD or press Enter to skip: ")

    NEO4J_URI = (os.getenv("NEO4J_URI") or "").strip()
    NEO4J_USER = (os.getenv("NEO4J_USER") or "").strip()
    NEO4J_PASSWORD = os.getenv("NEO4J_PASSWORD") or ""


    def _normalize_neo4j_uri(uri):
        parsed = urlparse(uri)
        if parsed.scheme in {"bolt", "bolt+ssc", "bolt+s", "neo4j", "neo4j+ssc", "neo4j+s"}:
            return uri
        if parsed.scheme in {"http", "https"} and parsed.hostname in {"127.0.0.1", "localhost", "::1"}:
            return f"neo4j://{parsed.hostname}:7687"
        return uri


    _normalized_neo4j_uri = _normalize_neo4j_uri(NEO4J_URI)
    if _normalized_neo4j_uri != NEO4J_URI:
        print(f"Converted Neo4j browser URL {NEO4J_URI!r} to driver URI {_normalized_neo4j_uri!r}.")
        NEO4J_URI = _normalized_neo4j_uri
        os.environ["NEO4J_URI"] = NEO4J_URI

    if not (NEO4J_URI and NEO4J_USER and NEO4J_PASSWORD):
        print("Neo4j credentials not fully provided. Section 11 will be skipped (other cells will short-circuit safely).")
        NEO4J_SECTION_ENABLED = False
    else:
        print(f"Neo4j configured: {NEO4J_USER}@{NEO4J_URI}")

    print(f"NEO4J_SECTION_ENABLED = {NEO4J_SECTION_ENABLED}")

11.2 Step 1 - Seed: Define the Knowledge Graph Data

Builds the in-memory data structure for the graph: schemas, tables (with description, primary_key), columns (with type, nullable, is_primary_key, optional description, and a semantic_meaning placeholder to be filled by AI in the next step), foreign keys, views, lineage edges (DERIVED_FROM), and joins.

This inline synthetic retail graph is aligned to the cookbook change request: LOYALTY_TIER is added to ODS.ODS_CUSTOMER_PROFILE, sourced from CORE.DIM_CUSTOMER, and propagated into downstream staging, core, mart, and CRM consumers.

No Neo4j calls here - this cell only prepares Python data structures.
Nothing is written until Step 3.

    SCHEMAS = ["ODS", "STG", "CORE", "MARTS", "CRM"]

    TABLES = {
        "ODS.ODS_CUSTOMER_PROFILE": {
            "description": "Raw customer profile table that receives LOYALTY_TIER in this change.",
            "primary_key": ["CUSTOMER_ID"],
            "columns": [
                {"name": "CUSTOMER_ID", "type": "VARCHAR(32)", "nullable": False, "description": "Stable customer identifier from the source system.", "semantic_meaning": "customer-identifier"},
                {"name": "EMAIL_HASH", "type": "VARCHAR(64)", "nullable": True, "description": "Hashed email value used for matching without exposing PII."},
                {"name": "CUSTOMER_STATUS", "type": "VARCHAR(20)", "nullable": True},
                {"name": "LOYALTY_TIER", "type": "VARCHAR(20)", "nullable": True, "description": "Nullable loyalty segment added by the change request and backfilled from CORE.DIM_CUSTOMER.", "semantic_meaning": "loyalty-segment"},
                {"name": "UPDATED_AT", "type": "TIMESTAMP", "nullable": True},
                {"name": "INGESTED_AT", "type": "TIMESTAMP", "nullable": True},
            ],
        },
        "ODS.ODS_ORDER": {
            "description": "Raw order header feed used to measure loyalty-tier revenue impact.",
            "primary_key": ["ORDER_ID"],
            "columns": [
                {"name": "ORDER_ID", "type": "VARCHAR(40)", "nullable": False, "semantic_meaning": "order-identifier"},
                {"name": "ORDER_TS", "type": "TIMESTAMP", "nullable": False},
                {"name": "CUSTOMER_ID", "type": "VARCHAR(32)", "nullable": True},
                {"name": "ORDER_STATUS", "type": "VARCHAR(30)", "nullable": True},
                {"name": "NET_AMOUNT", "type": "NUMERIC(12,2)", "nullable": True},
            ],
        },
        "STG.STG_CUSTOMER_PROFILE": {
            "description": "Staging table that normalizes customer profile rows for downstream dimensions and views.",
            "primary_key": ["CUSTOMER_ID"],
            "columns": [
                {"name": "CUSTOMER_ID", "type": "VARCHAR(32)", "nullable": False, "semantic_meaning": "customer-identifier"},
                {"name": "CUSTOMER_STATUS", "type": "VARCHAR(20)", "nullable": True},
                {"name": "LOYALTY_TIER", "type": "VARCHAR(20)", "nullable": True, "description": "Propagated loyalty segment from ODS.ODS_CUSTOMER_PROFILE.", "semantic_meaning": "loyalty-segment"},
                {"name": "PROFILE_UPDATED_AT", "type": "TIMESTAMP", "nullable": True},
            ],
        },
        "STG.STG_ORDER_ENRICHED": {
            "description": "Staging order table enriched with customer status and loyalty tier for metrics.",
            "primary_key": ["ORDER_ID"],
            "columns": [
                {"name": "ORDER_ID", "type": "VARCHAR(40)", "nullable": False},
                {"name": "CUSTOMER_ID", "type": "VARCHAR(32)", "nullable": True},
                {"name": "LOYALTY_TIER", "type": "VARCHAR(20)", "nullable": True, "semantic_meaning": "loyalty-segment"},
                {"name": "ORDER_TS", "type": "TIMESTAMP", "nullable": False},
                {"name": "NET_AMOUNT", "type": "NUMERIC(12,2)", "nullable": True},
            ],
        },
        "CORE.DIM_CUSTOMER": {
            "description": "Conformed customer dimension and source for the LOYALTY_TIER backfill.",
            "primary_key": ["CUSTOMER_SK"],
            "columns": [
                {"name": "CUSTOMER_SK", "type": "BIGINT", "nullable": False, "semantic_meaning": "surrogate-key"},
                {"name": "CUSTOMER_ID", "type": "VARCHAR(32)", "nullable": False, "semantic_meaning": "customer-identifier"},
                {"name": "EMAIL_HASH", "type": "VARCHAR(64)", "nullable": True},
                {"name": "COUNTRY_CODE", "type": "VARCHAR(2)", "nullable": True},
                {"name": "LOYALTY_TIER", "type": "VARCHAR(20)", "nullable": True, "description": "Current loyalty segment used as the backfill source.", "semantic_meaning": "loyalty-segment"},
                {"name": "IS_CURRENT", "type": "BOOLEAN", "nullable": False, "semantic_meaning": "current-row-flag"},
                {"name": "VALID_FROM_TS", "type": "TIMESTAMP", "nullable": True},
                {"name": "VALID_TO_TS", "type": "TIMESTAMP", "nullable": True},
            ],
        },
        "CORE.DIM_LOYALTY_TIER": {
            "description": "Reference dimension for loyalty tier labels, rank, and benefits.",
            "primary_key": ["LOYALTY_TIER"],
            "columns": [
                {"name": "LOYALTY_TIER", "type": "VARCHAR(20)", "nullable": False, "semantic_meaning": "loyalty-segment"},
                {"name": "TIER_RANK", "type": "INTEGER", "nullable": True},
                {"name": "TIER_DESCRIPTION", "type": "VARCHAR(255)", "nullable": True},
                {"name": "ACTIVE_FLAG", "type": "BOOLEAN", "nullable": True},
            ],
        },
        "CORE.FACT_ORDER": {
            "description": "Order fact table used by revenue and customer 360 marts.",
            "primary_key": ["ORDER_ID"],
            "columns": [
                {"name": "ORDER_ID", "type": "VARCHAR(40)", "nullable": False},
                {"name": "CUSTOMER_SK", "type": "BIGINT", "nullable": True},
                {"name": "ORDER_TS", "type": "TIMESTAMP", "nullable": False},
                {"name": "NET_AMOUNT", "type": "NUMERIC(12,2)", "nullable": True, "semantic_meaning": "monetary-amount"},
            ],
        },
        "CORE.FACT_CUSTOMER_ACTIVITY": {
            "description": "Daily customer activity fact used for retention and loyalty reporting.",
            "primary_key": ["CUSTOMER_SK", "ACTIVITY_DATE"],
            "columns": [
                {"name": "CUSTOMER_SK", "type": "BIGINT", "nullable": False},
                {"name": "ACTIVITY_DATE", "type": "DATE", "nullable": False},
                {"name": "LOYALTY_TIER", "type": "VARCHAR(20)", "nullable": True, "semantic_meaning": "loyalty-segment"},
                {"name": "ORDER_COUNT", "type": "INTEGER", "nullable": True},
                {"name": "NET_AMOUNT", "type": "NUMERIC(12,2)", "nullable": True},
            ],
        },
        "CRM.CUSTOMER_SEGMENT_EXPORT": {
            "description": "Activation export consumed by marketing journeys and retention campaigns.",
            "primary_key": ["CUSTOMER_ID"],
            "columns": [
                {"name": "CUSTOMER_ID", "type": "VARCHAR(32)", "nullable": False},
                {"name": "LOYALTY_TIER", "type": "VARCHAR(20)", "nullable": True, "semantic_meaning": "loyalty-segment"},
                {"name": "SEGMENT_CODE", "type": "VARCHAR(40)", "nullable": True},
                {"name": "EXPORT_BATCH_ID", "type": "VARCHAR(40)", "nullable": True},
            ],
        },
    }

    VIEWS = {
        "MARTS.VW_CUSTOMER_360": {"description": "Customer 360 view with profile, loyalty tier, and recent activity."},
        "MARTS.VW_LOYALTY_REVENUE": {"description": "Revenue by loyalty tier for dashboarding and finance checks."},
        "MARTS.VW_RETENTION_BY_TIER": {"description": "Retention metrics grouped by current loyalty tier."},
    }

    # (from_schema, from_table, from_col, to_schema, to_table, to_col)
    FOREIGN_KEYS = [
        ("ODS", "ODS_ORDER", "CUSTOMER_ID", "ODS", "ODS_CUSTOMER_PROFILE", "CUSTOMER_ID"),
        ("STG", "STG_CUSTOMER_PROFILE", "CUSTOMER_ID", "ODS", "ODS_CUSTOMER_PROFILE", "CUSTOMER_ID"),
        ("STG", "STG_ORDER_ENRICHED", "CUSTOMER_ID", "STG", "STG_CUSTOMER_PROFILE", "CUSTOMER_ID"),
        ("CORE", "DIM_CUSTOMER", "CUSTOMER_ID", "ODS", "ODS_CUSTOMER_PROFILE", "CUSTOMER_ID"),
        ("CORE", "DIM_CUSTOMER", "LOYALTY_TIER", "CORE", "DIM_LOYALTY_TIER", "LOYALTY_TIER"),
        ("CORE", "FACT_ORDER", "CUSTOMER_SK", "CORE", "DIM_CUSTOMER", "CUSTOMER_SK"),
        ("CORE", "FACT_CUSTOMER_ACTIVITY", "CUSTOMER_SK", "CORE", "DIM_CUSTOMER", "CUSTOMER_SK"),
        ("CORE", "FACT_CUSTOMER_ACTIVITY", "LOYALTY_TIER", "CORE", "DIM_LOYALTY_TIER", "LOYALTY_TIER"),
        ("CRM", "CUSTOMER_SEGMENT_EXPORT", "CUSTOMER_ID", "ODS", "ODS_CUSTOMER_PROFILE", "CUSTOMER_ID"),
    ]

    DERIVED_FROM = [
        ("ODS.ODS_CUSTOMER_PROFILE", "CORE.DIM_CUSTOMER"),
        ("STG.STG_CUSTOMER_PROFILE", "ODS.ODS_CUSTOMER_PROFILE"),
        ("STG.STG_ORDER_ENRICHED", "ODS.ODS_ORDER"),
        ("STG.STG_ORDER_ENRICHED", "STG.STG_CUSTOMER_PROFILE"),
        ("CORE.DIM_CUSTOMER", "STG.STG_CUSTOMER_PROFILE"),
        ("CORE.DIM_CUSTOMER", "CORE.DIM_LOYALTY_TIER"),
        ("CORE.FACT_ORDER", "STG.STG_ORDER_ENRICHED"),
        ("CORE.FACT_ORDER", "CORE.DIM_CUSTOMER"),
        ("CORE.FACT_CUSTOMER_ACTIVITY", "CORE.FACT_ORDER"),
        ("CORE.FACT_CUSTOMER_ACTIVITY", "CORE.DIM_CUSTOMER"),
        ("MARTS.VW_CUSTOMER_360", "CORE.DIM_CUSTOMER"),
        ("MARTS.VW_CUSTOMER_360", "CORE.FACT_CUSTOMER_ACTIVITY"),
        ("MARTS.VW_LOYALTY_REVENUE", "CORE.FACT_ORDER"),
        ("MARTS.VW_LOYALTY_REVENUE", "CORE.DIM_LOYALTY_TIER"),
        ("MARTS.VW_RETENTION_BY_TIER", "CORE.FACT_CUSTOMER_ACTIVITY"),
        ("MARTS.VW_RETENTION_BY_TIER", "CORE.DIM_LOYALTY_TIER"),
        ("CRM.CUSTOMER_SEGMENT_EXPORT", "MARTS.VW_CUSTOMER_360"),
        ("CRM.CUSTOMER_SEGMENT_EXPORT", "MARTS.VW_RETENTION_BY_TIER"),
    ]

    JOINS = [
        ("STG.STG_ORDER_ENRICHED", "STG.STG_CUSTOMER_PROFILE"),
        ("CORE.FACT_ORDER", "CORE.DIM_CUSTOMER"),
        ("CORE.FACT_CUSTOMER_ACTIVITY", "CORE.DIM_CUSTOMER"),
        ("CORE.FACT_CUSTOMER_ACTIVITY", "CORE.DIM_LOYALTY_TIER"),
        ("MARTS.VW_CUSTOMER_360", "CORE.DIM_CUSTOMER"),
        ("MARTS.VW_LOYALTY_REVENUE", "CORE.DIM_LOYALTY_TIER"),
        ("MARTS.VW_RETENTION_BY_TIER", "CORE.DIM_LOYALTY_TIER"),
    ]

    for tid, meta in TABLES.items():
        pk = set(meta.get("primary_key", []) or [])
        for c in meta["columns"]:
            c.setdefault("is_primary_key", c["name"] in pk)
            c.setdefault("description", None)
            c.setdefault("semantic_meaning", None)

    _total_cols = sum(len(t["columns"]) for t in TABLES.values())
    _total_pks = sum(len(t.get("primary_key", []) or []) for t in TABLES.values())
    print(
        f"Seed prepared: {len(SCHEMAS)} schemas, {len(TABLES)} tables, {len(VIEWS)} views, "
        f"{_total_cols} columns ({_total_pks} primary keys), {len(FOREIGN_KEYS)} FKs, "
        f"{len(DERIVED_FROM)} DERIVED_FROM edges, {len(JOINS)} JOINS edges."
    )

11.3 Step 2 - AI Enrichment

Uses the OpenAI client and MODEL already initialized in Section 1 to generate a short semantic_meaning tag (2-5 words, e.g. natural-key, foreign-key, monetary-amount, timestamp, descriptive-text) for every column whose value is currently None. One LLM call per column, plain-text response. The prompt is defined inline so this cell remains self-contained.

Cost control: capped at MAX_ENRICH_COLS = 30 columns per run (override via env SEED_AI_ENRICH_LIMIT). Set SEED_AI_ENRICH=0 to skip enrichment entirely. Per-column failures are caught and logged; the cell never raises. Skipped entirely if Section 11 was disabled in Step 0.

    ENRICH_SYSTEM = (
        "You are a data architect assistant. Your task is to provide concise semantic-meaning "
        "tags (2-5 words) for database columns. Do not add any preamble or explanation."
    )


    def _enrich_one(client, model, table_name, c):
        user_prompt = (
            f"Provide a short (2-5 word) semantic-meaning tag for a database column "
            f"named '{c['name']}'. It is part of the table '{table_name}'. "
            f"It has a data type of '{c.get('type','UNKNOWN')}'. "
            f"Examples of valid tags: natural-key, foreign-key, surrogate-key, "
            f"monetary-amount, timestamp, descriptive-text, category-code, count, boolean-flag. "
            f"Return only the tag, with no quotes, no punctuation at the end, no extra prose."
        )
        try:
            resp = client.responses.create(
                model=model,
                input=[
                    {"role": "system", "content": ENRICH_SYSTEM},
                    {"role": "user", "content": user_prompt},
                ],
            )
            text = (getattr(resp, "output_text", None) or "").strip()
            if not text:
                for item in getattr(resp, "output", []) or []:
                    for sub in getattr(item, "content", []) or []:
                        t = getattr(sub, "text", None)
                        if isinstance(t, str):
                            text += t
                        elif isinstance(sub, dict):
                            text += sub.get("text", "")
            text = text.strip().strip('"').strip("'").strip(".")
            return text or None
        except Exception as e:
            print(f"  enrichment failed for {table_name}.{c['name']}: {type(e).__name__}: {e}")
            return None


    def run_ai_enrichment():
        if not NEO4J_SECTION_ENABLED:
            print("Section 11 disabled (see Step 0). Skipping AI enrichment.")
            return
        enabled = os.getenv("SEED_AI_ENRICH", "1").strip().lower() in ("1", "true", "yes", "on")
        max_cols = int(os.getenv("SEED_AI_ENRICH_LIMIT", "30"))
        if not enabled:
            print("AI enrichment disabled via SEED_AI_ENRICH=0. Columns will be upserted without semantic_meaning.")
            return
        try:
            _c = client
            _m = MODEL
        except NameError:
            print("OpenAI client/MODEL not found. Run Section 1 first, then re-run this cell.")
            return
        missing = []
        for tid, meta in TABLES.items():
            for col in meta["columns"]:
                if not col.get("semantic_meaning"):
                    missing.append((tid, col))
        cap = min(len(missing), max_cols)
        print(
            f"AI enrichment: {len(missing)} column(s) need semantic_meaning; processing first {cap} "
            f"(cap via SEED_AI_ENRICH_LIMIT, model='{_m}')."
        )
        updated = 0
        for tid, col in missing[:cap]:
            tag = _enrich_one(_c, _m, tid, col)
            if tag:
                col["semantic_meaning"] = tag
                updated += 1
                print(f