Real-Time AI Feature Engineering with Spark Structured Streaming and Databricks Feature Store

Building point-in-time correct, production-grade feature pipelines: from raw Kafka events to online feature serving in milliseconds, using Spark Structured Streaming and the Databricks Feature Store.

Feature engineering is where most ML projects silently fail in production. The cause is usually not a wrong model, but that the features the model sees at training time differ from the features it sees at inference time. This is called training-serving skew, and it's the #1 silent killer of ML systems. Three specific failure modes cause it, and the Databricks Feature Store, now part of Unity Catalog as Feature Engineering in Unity Catalog, is designed to solve all three.

Understanding the data model behind the Feature Store is essential for designing correct pipelines. The critical relationship is that a Model Version is bound to a Training Set, which records exactly which feature tables and which point-in-time lookups were used. This is how Databricks guarantees reproducibility: you can always re-create the exact training data that produced any model version.
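
To make "point-in-time lookup" concrete, here is a minimal sketch in plain Python, independent of Spark and the Feature Store. The feature history, the helper names `point_in_time_lookup` and `latest_lookup`, and all values are hypothetical, chosen only to show how a lookup anchored at the label timestamp differs from a lookup of the latest value.

```python
from bisect import bisect_right
from datetime import datetime

# Hypothetical feature history per user: (feature_ts, event_count_1h),
# sorted by feature_ts. The values are made up for illustration.
FEATURE_HISTORY = {
    "u_1": [
        (datetime(2024, 3, 15, 13, 0), 4),
        (datetime(2024, 3, 15, 14, 0), 9),
        (datetime(2024, 3, 15, 15, 0), 21),
    ],
}


def point_in_time_lookup(history, user_id, as_of):
    """Return the latest feature value with feature_ts <= as_of, or None."""
    rows = history.get(user_id, [])
    timestamps = [ts for ts, _ in rows]
    idx = bisect_right(timestamps, as_of)  # number of rows at or before as_of
    return rows[idx - 1][1] if idx > 0 else None


def latest_lookup(history, user_id):
    """Return the most recent feature value, ignoring the label timestamp."""
    rows = history.get(user_id, [])
    return rows[-1][1] if rows else None


# A label observed at 14:22 may only see features computed up to 14:22.
label_ts = datetime(2024, 3, 15, 14, 22)
pit_value = point_in_time_lookup(FEATURE_HISTORY, "u_1", label_ts)
leaky_value = latest_lookup(FEATURE_HISTORY, "u_1")
print(pit_value, leaky_value)  # prints "9 21"
```

The point-in-time lookup returns the 14:00 value, which is what a model could actually have seen when the label was observed. The latest-value lookup returns the 15:00 value, which leaks information from after the label timestamp into training.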

    # Databricks Runtime ML 13.x+ recommended
    # Feature Engineering in Unity Catalog (formerly Feature Store)
    %pip install databricks-feature-engineering==0.6.0 --quiet
    dbutils.library.restartPython()

    from databricks.feature_engineering import FeatureEngineeringClient, FeatureLookup
    from databricks.feature_engineering.entities.feature_serving_endpoint import (
        ServedEntity,
        EndpointCoreConfig,
    )
    from pyspark.sql import functions as F, SparkSession
    from pyspark.sql.types import (
        StructType, StructField, StringType, LongType,
        DoubleType, TimestampType, ArrayType,
    )
    import mlflow

    spark = SparkSession.builder.getOrCreate()
    fe = FeatureEngineeringClient()

    # Unity Catalog paths
    CATALOG = "prod"
    FEATURE_DB = f"{CATALOG}.feature_store"
    EVENTS_TABLE = f"{CATALOG}.silver.events_clean"
    KAFKA_BROKER = "kafka-broker.internal:9092"
    KAFKA_TOPIC = "user-events"

    # Checkpoint locations (ADLS / S3 / GCS)
    CHECKPOINT_BASE = "abfss://checkpoints@storage.dfs.core.windows.net/features"

The streaming pipeline reads from Kafka, computes windowed aggregations using Spark's stateful streaming engine, and writes features to the Feature Store via foreachBatch. This keeps the feature table continuously fresh.

    # ── Streaming Feature Pipeline ──

    # Step 1: Define the raw event schema from Kafka
    event_schema = StructType([
        StructField("user_id", StringType(), False),
        StructField("event_type", StringType(), True),
        StructField("product_id", StringType(), True),
        StructField("revenue", DoubleType(), True),
        StructField("session_id", StringType(), True),
        StructField("platform", StringType(), True),
        StructField("event_ts", TimestampType(), False),
    ])

    # Step 2: Read from Kafka
    raw_stream = (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", KAFKA_BROKER)
        .option("subscribe", KAFKA_TOPIC)
        .option("startingOffsets", "latest")
        .option("failOnDataLoss", "false")
        .load()
        .select(
            F.from_json(F.col("value").cast("string"), event_schema).alias("data"),
            F.col("timestamp").alias("kafka_ts"),
        )
        .select("data.*", "kafka_ts")
    )

    # Step 3: Apply watermark and compute windowed features
    # Watermark: tolerate up to 10 minutes of late data
    windowed_features = (
        raw_stream
        .withWatermark("event_ts", "10 minutes")
        .groupBy(
            F.col("user_id"),
            F.window(F.col("event_ts"), "1 hour", "15 minutes").alias("window"),
        )
        .agg(
            F.count("*").alias("event_count_1h"),
            F.sum(
                F.when(F.col("event_type") == "purchase", F.col("revenue")).otherwise(0)
            ).alias("revenue_1h"),
            # Exact distinct counts are not supported in streaming aggregations,
            # so the approximate variant is used here.
            F.approx_count_distinct("session_id").alias("session_count_1h"),
            F.approx_count_distinct("product_id").alias("unique_products_1h"),
            F.sum(
                F.when(F.col("event_type") == "purchase", 1).otherwise(0)
            ).alias("purchase_count_1h"),
            # Platform of the most recent event in the window
            # (F.first would return an arbitrary row).
            F.max_by("platform", "event_ts").alias("last_platform"),
        )
        # Flatten window struct to scalar columns
        .withColumn("window_start", F.col("window.start"))
        .withColumn("window_end", F.col("window.end"))
        .withColumn("feature_ts", F.col("window.end"))  # timestamp key for PIT lookup
        .drop("window")
        # Derived features
        .withColumn(
            "conversion_rate_1h",
            F.when(
                F.col("event_count_1h") > 0,
                F.col("purchase_count_1h") / F.col("event_count_1h"),
            ).otherwise(0.0),
        )
        .withColumn(
            "avg_revenue_per_purchase_1h",
            F.when(
                F.col("purchase_count_1h") > 0,
                F.col("revenue_1h") / F.col("purchase_count_1h"),
            ).otherwise(0.0),
        )
    )

    # Step 4: Write to Feature Store via foreachBatch
    # foreachBatch gives us transactional writes per micro-batch
    def write_to_feature_store(batch_df, batch_id):
        """
        Called on each micro-batch. Merges feature data into the Feature Store
        table, using merge on keys user_id + feature_ts.
        """
        if batch_df.isEmpty():
            return
        fe.write_table(
            name=f"{FEATURE_DB}.user_activity_features",
            df=batch_df,
            mode="merge",  # upsert: update existing, insert new
        )
        print(f"Batch {batch_id}: wrote {batch_df.count()} feature rows")

    # Step 5: Create the feature table (idempotent, safe to re-run)
    try:
        fe.create_table(
            name=f"{FEATURE_DB}.user_activity_features",
            primary_keys=["user_id"],
            timestamp_keys=["feature_ts"],
            schema=windowed_features.schema,
            description=(
                "Real-time user activity features computed from event stream. "
                "1-hour sliding window, refreshed every 15 minutes. "
                "Primary key: user_id. Timestamp key: feature_ts (window end)."
            ),
        )
        print("Feature table created.")
    except Exception:
        print("Feature table already exists, continuing.")

    # Step 6: Launch the streaming query
    streaming_query = (
        windowed_features.writeStream
        .queryName("user_activity_features")  # so that streaming_query.name is set
        .outputMode("update")  # update mode for stateful aggregations
        .option("checkpointLocation", f"{CHECKPOINT_BASE}/user_activity")
        .trigger(processingTime="5 minutes")  # micro-batch every 5 min
        .foreachBatch(write_to_feature_store)
        .start()
    )
    print(f"Streaming query '{streaming_query.name}' running...")
    print(f"Status: {streaming_query.status}")

This is the most critical part of the Feature Store. When creating training data, we must join labels to features at the timestamp of the label event, not the current time. This prevents data leakage.

    # ── Point-in-Time Correct Training Dataset ──

    # Step 1: Load the label dataset
    # Each row = one prediction target event, with the exact timestamp
    # at which a model would have needed to make a prediction.
    labels_df = (
        spark.table(f"{CATALOG}.gold.churn_labels")
        # Filter before the select, while observation_ts still has its original name
        .filter(F.col("observation_ts") >= "2024-01-01")
        .select(
            "user_id",
            "churn_label",  # 0 = retained, 1 = churned
            F.col("observation_ts").alias("event_timestamp"),  # point-in-time anchor
            "experiment_split",  # train/val/test
        )
    )
    print(f"Label rows: {labels_df.count():,}")
    labels_df.show(5)
    # +----------+-----------+---------------------+-----------------+
    # | user_id  |churn_label| event_timestamp     | experiment_split|
    # +----------+-----------+---------------------+-----------------+
    # | u_123456 | 0         | 2024-03-15 14:22:00 | train           |
    # | u_789012 | 1         | 2024-03-15 18:45:00 | train           |

    # Step 2: Define feature lookups
    # timestamp_lookup_key → use the label's event timestamp (point-in-time).
    # Databricks will join each label row to the feature values that were
    # valid at event_timestamp, not the latest values.
    feature_lookups = [
        # User activity features: 1h window features from the streaming pipeline
        FeatureLookup(
            table_name=f"{FEATURE_DB}.user_activity_features",
            feature_names=[
                "event_count_1h", "revenue_1h", "session_count_1h",
                "unique_products_1h", "purchase_count_1h", "conversion_rate_1h",
                "avg_revenue_per_purchase_1h", "last_platform",
            ],
            lookup_key="user_id",
            timestamp_lookup_key="event_timestamp",  # PIT anchor
        ),
        # User profile features: slower-changing, from batch pipeline
        FeatureLookup(
            table_name=f"{FEATURE_DB}.user_profile_features",
            feature_names=[
                "account_age_days", "lifetime_revenue",
                "preferred_category", "subscription_tier",
            ],
            lookup_key="user_id",
            timestamp_lookup_key="event_timestamp",  # PIT anchor
        ),
        # Transaction aggregates: 30d and 90d rolling windows
        FeatureLookup(
            table_name=f"{FEATURE_DB}.transaction_features",
            feature_names=[
                "purchase_count_30d", "purchase_count_90d", "avg_order_value_30d",
                "days_since_last_purchase", "category_diversity_score",
            ],
            lookup_key="user_id",
            timestamp_lookup_key="event_timestamp",
        ),
    ]

    # Step 3: Create training dataset (Feature Store handles the PIT join)
    # The train split is selected before the join, because excluded columns
    # are not available on the loaded DataFrame.
    training_set = fe.create_training_set(
        df=labels_df.filter(F.col("experiment_split") == "train"),
        feature_lookups=feature_lookups,
        label="churn_label",
        exclude_columns=["user_id", "event_timestamp", "experiment_split"],
    )

    # The returned DataFrame has features + labels, PIT-correct
    training_df = training_set.load_df()
    print(f"Training rows: {training_df.count():,}")
    print(f"Training cols: {len(training_df.columns)}")
    training_df.show(3)

    # Step 4: Train model and log via Feature Store (preserves lineage)
    from sklearn.compose import ColumnTransformer
    from sklearn.ensemble import GradientBoostingClassifier
    from sklearn.pipeline import Pipeline
    from sklearn.preprocessing import OneHotEncoder

    # String-valued features must be encoded before gradient boosting
    CATEGORICAL_COLS = ["last_platform", "preferred_category", "subscription_tier"]

    train_pdf = (
        training_df
        .fillna(0)          # numeric nulls
        .fillna("unknown")  # string nulls
        .toPandas()
    )
    X_train = train_pdf.drop(columns=["churn_label"])
    y_train = train_pdf["churn_label"]

    model = Pipeline([
        ("encode", ColumnTransformer(
            [("onehot", OneHotEncoder(handle_unknown="ignore"), CATEGORICAL_COLS)],
            remainder="passthrough",
        )),
        ("gbm", GradientBoostingClassifier(
            n_estimators=300,
            learning_rate=0.05,
            max_depth=5,
            subsample=0.8,
            random_state=42,
        )),
    ])

    # Register models in Unity Catalog (three-level model name)
    mlflow.set_registry_uri("databricks-uc")

    with mlflow.start_run(run_name="churn-gbm-v1") as run:
        model.fit(X_train, y_train)

        # Log model via Feature Store: this records the feature lineage
        fe.log_model(
            model=model,
            artifact_path="churn_model",
            flavor=mlflow.sklearn,
            training_set=training_set,  # binds model to its feature lookups
            registered_model_name=f"{CATALOG}.ml.user_churn_model",
        )
        print(f"Logged model with feature lineage. Run: {run.info.run_id}")

For real-time inference, the model needs features in milliseconds, not the seconds it takes to query Delta Lake. Databricks Feature Store can publish features to an online store (DynamoDB, Cosmos DB, MySQL, etc.) for low-latency reads.

    # ── Publish Features to Online Store ──
    # Online stores are configured per feature table. Here we publish
    # user_activity_features to an online table for low-latency (<5 ms) lookups.
    # NOTE: the import path and the fe.create_online_table / fe.refresh_online_table
    # calls below are reproduced as given; verify them against the SDK version
    # installed in your workspace before relying on them.
    from databricks.feature_engineering.entities.feature_store_online_table import (
        OnlineTable,
        OnlineTableSpec,
        TriggeredSchedulingPolicy,
    )

    # Create an online table spec (backed by a serverless real-time compute layer)
    online_table_spec = OnlineTableSpec(
        primary_key_columns=["user_id"],
        source_table_full_name=f"{FEATURE_DB}.user_activity_features",
        run_triggered=OnlineTableSpec.TriggeredSchedulingPolicy(),  # sync on-demand
        # OR for continuous sync:
        # run_continuous=OnlineTableSpec.ContinuousSchedulingPolicy()
    )

    # Create the online table (idempotent)
    online_table = fe.create_online_table(spec=online_table_spec)
    print(f"Online table: {online_table.name}")
    print(f"Status: {online_table.status.detailed_state}")

    # Trigger an initial sync from the offline Delta table to the online store
    fe.refresh_online_table(name=f"{FEATURE_DB}.user_activity_features")

At inference time, the Feature Store SDK performs automatic feature lookups, joining the incoming request data with features from the online store before passing them to the model.

    # ── Real-Time Feature Serving at Inference ──
    import requests, json

    WORKSPACE_URL = "https://