Building point-in-time correct, production-grade feature pipelines, from raw Kafka events to online feature serving in milliseconds, using Spark Structured Streaming and the Databricks Feature Store.
Feature engineering is where most ML projects silently fail in production. The cause is rarely a wrong model; more often, the features the model sees at training time differ from the features it sees at inference time. This is called training-serving skew, and it is among the most common silent failure modes in ML systems.
Three specific failure modes cause it:
The Databricks Feature Store (now part of Unity Catalog as Feature Engineering in Unity Catalog) solves all three by:
Understanding the data model behind the Feature Store is essential for designing correct pipelines. Here's how the entities relate:
The critical relationship: a Model Version is bound to a Training Set, which records exactly which feature tables and which point-in-time lookups were used. This is how Databricks guarantees reproducibility: you can always re-create the exact training data that produced any model version.
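To make the relationship concrete, here is a minimal sketch of the lineage chain as plain Python dataclasses. The class names (`FeatureTableRef`, `LookupSpec`, `TrainingSetSpec`, `ModelVersionRecord`) are hypothetical and exist only for illustration; they are not Databricks classes, and the real metadata is stored by Unity Catalog and MLflow.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass(frozen=True)
class FeatureTableRef:
    name: str
    primary_keys: List[str]
    timestamp_key: Optional[str] = None


@dataclass(frozen=True)
class LookupSpec:
    table: FeatureTableRef
    feature_names: List[str]
    lookup_key: str
    timestamp_lookup_key: Optional[str] = None  # set for point-in-time lookups


@dataclass(frozen=True)
class TrainingSetSpec:
    label: str
    lookups: List[LookupSpec]

    def feature_columns(self) -> List[str]:
        """All feature columns the training set pulls in, in lookup order."""
        return [name for lookup in self.lookups for name in lookup.feature_names]


@dataclass(frozen=True)
class ModelVersionRecord:
    model_name: str
    version: int
    training_set: TrainingSetSpec  # the binding that makes retraining reproducible


activity = FeatureTableRef(
    name="prod.feature_store.user_activity_features",
    primary_keys=["user_id"],
    timestamp_key="feature_ts",
)
spec = TrainingSetSpec(
    label="churn_label",
    lookups=[
        LookupSpec(activity, ["event_count_1h", "revenue_1h"], "user_id", "event_timestamp")
    ],
)
model_version = ModelVersionRecord("prod.ml.user_churn_model", 1, spec)
print(model_version.training_set.feature_columns())
```

Because the model version carries its training set specification, anyone holding the model version can recover which tables, keys, and timestamp anchors produced its inputs.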
# Notebook cell 1: install the client, then restart Python so the new version loads.
%pip install databricks-feature-engineering==0.6.0 --quiet
dbutils.library.restartPython()

# Notebook cell 2: imports and shared configuration.
from databricks.feature_engineering import FeatureEngineeringClient, FeatureLookup
from databricks.feature_engineering.entities.feature_serving_endpoint import (
    ServedEntity, EndpointCoreConfig
)
from pyspark.sql import functions as F, SparkSession
from pyspark.sql.types import (
    StructType, StructField, StringType, LongType,
    DoubleType, TimestampType, ArrayType
)
import mlflow

spark = SparkSession.builder.getOrCreate()
fe = FeatureEngineeringClient()

CATALOG = "prod"
FEATURE_DB = f"{CATALOG}.feature_store"
EVENTS_TABLE = f"{CATALOG}.silver.events_clean"
KAFKA_BROKER = "kafka-broker.internal:9092"
KAFKA_TOPIC = "user-events"
CHECKPOINT_BASE = "abfss://checkpoints@storage.dfs.core.windows.net/features"
The streaming pipeline reads from Kafka, computes windowed aggregations using Spark's stateful streaming engine, and writes features to the Feature Store via foreachBatch. This keeps the feature table continuously fresh.
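The windowed aggregation is easier to reason about with a small standalone example. The sketch below lists the sliding windows a single event belongs to for a 1-hour window that slides every 15 minutes. It assumes windows are half-open intervals aligned to the Unix epoch, and `sliding_windows` is a hypothetical helper written for this illustration, not a Spark function.

```python
from datetime import datetime, timedelta
from typing import List, Tuple

EPOCH = datetime(1970, 1, 1)


def sliding_windows(
    event_ts: datetime,
    size: timedelta = timedelta(hours=1),
    slide: timedelta = timedelta(minutes=15),
) -> List[Tuple[datetime, datetime]]:
    """Return every [start, end) window that contains event_ts."""
    # Latest window start at or before the event, aligned to the slide interval.
    last_start = event_ts - ((event_ts - EPOCH) % slide)
    windows = []
    start = last_start
    # Walk backwards while the window still covers the event.
    while start > event_ts - size:
        windows.append((start, start + size))
        start -= slide
    return sorted(windows)


windows = sliding_windows(datetime(2024, 1, 1, 10, 7))
for start, end in windows:
    print(start.time(), "->", end.time())
```

An event at 10:07 lands in four overlapping windows, so each event contributes to four feature rows per user. This is why sliding windows cost more state than tumbling windows of the same size.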
event_schema = StructType([
    StructField("user_id", StringType(), False),
    StructField("event_type", StringType(), True),
    StructField("product_id", StringType(), True),
    StructField("revenue", DoubleType(), True),
    StructField("session_id", StringType(), True),
    StructField("platform", StringType(), True),
    StructField("event_ts", TimestampType(), False),
])

raw_stream = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", KAFKA_BROKER)
    .option("subscribe", KAFKA_TOPIC)
    .option("startingOffsets", "latest")
    .option("failOnDataLoss", "false")
    .load()
    .select(
        F.from_json(F.col("value").cast("string"), event_schema).alias("data"),
        F.col("timestamp").alias("kafka_ts")
    )
    .select("data.*", "kafka_ts")
)
windowed_features = (
    raw_stream
    .withWatermark("event_ts", "10 minutes")
    .groupBy(
        F.col("user_id"),
        F.window(F.col("event_ts"), "1 hour", "15 minutes").alias("window")
    )
    .agg(
        F.count("*").alias("event_count_1h"),
        F.sum(F.when(F.col("event_type") == "purchase", F.col("revenue"))
              .otherwise(0)).alias("revenue_1h"),
        # Exact countDistinct is not supported in streaming aggregations,
        # so approximate distinct counts are used instead.
        F.approx_count_distinct("session_id").alias("session_count_1h"),
        F.approx_count_distinct("product_id").alias("unique_products_1h"),
        F.sum(F.when(F.col("event_type") == "purchase", 1)
              .otherwise(0)).alias("purchase_count_1h"),
        # Platform of the most recent event in the window (F.first is order-dependent).
        F.max_by("platform", "event_ts").alias("last_platform"),
    )
    .withColumn("window_start", F.col("window.start"))
    .withColumn("window_end", F.col("window.end"))
    .withColumn("feature_ts", F.col("window.end"))  # timestamp key for PIT lookup
    .drop("window")
    .withColumn("conversion_rate_1h",
                F.when(F.col("event_count_1h") > 0,
                       F.col("purchase_count_1h") / F.col("event_count_1h"))
                .otherwise(0.0))
    .withColumn("avg_revenue_per_purchase_1h",
                F.when(F.col("purchase_count_1h") > 0,
                       F.col("revenue_1h") / F.col("purchase_count_1h"))
                .otherwise(0.0))
)
def write_to_feature_store(batch_df, batch_id):
    """
    Called on each micro-batch. Merges feature data into the Feature Store
    table using merge_on keys (user_id + feature_ts).
    """
    if batch_df.isEmpty():
        return
    fe.write_table(
        name=f"{FEATURE_DB}.user_activity_features",
        df=batch_df,
        mode="merge",  # upsert: update existing, insert new
    )
    print(f"Batch {batch_id}: wrote {batch_df.count()} feature rows")
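try:
    # Argument names vary between client releases (for example, some releases
    # use timeseries_columns and expect the timestamp column in primary_keys);
    # check the API reference for the version pinned above.
    fe.create_table(
        name=f"{FEATURE_DB}.user_activity_features",
        primary_keys=["user_id"],
        timestamp_keys=["feature_ts"],
        schema=windowed_features.schema,
        description=(
            "Real-time user activity features computed from event stream. "
            "1-hour sliding window, refreshed every 15 minutes. "
            "Primary key: user_id. Timestamp key: feature_ts (window end)."
        ),
    )
    print("Feature table created.")
except Exception as exc:
    # Report the actual error instead of assuming the table already exists.
    print(f"Feature table not created (it may already exist): {exc}")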
streaming_query = (
    windowed_features.writeStream
    .queryName("user_activity_features")  # without a name, streaming_query.name is None
    .outputMode("update")  # update mode for stateful aggregations
    .option("checkpointLocation", f"{CHECKPOINT_BASE}/user_activity")
    .trigger(processingTime="5 minutes")  # micro-batch every 5 min
    .foreachBatch(write_to_feature_store)
    .start()
)
print(f"Streaming query '{streaming_query.name}' running...")
print(f"Status: {streaming_query.status}")
This is the most critical part of the Feature Store. When creating training data, we must join labels to features at the timestamp of the label event, not the current time. This prevents data leakage.
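A point-in-time lookup can be illustrated without Spark. The sketch below picks, for a given label timestamp, the most recent feature value whose timestamp is at or before the label. The function name `point_in_time_lookup` and the sample values are hypothetical, and the inclusive comparison is an assumption about how the as-of join behaves.

```python
from bisect import bisect_right
from datetime import datetime
from typing import List, Optional, Tuple


def point_in_time_lookup(
    feature_rows: List[Tuple[datetime, float]], label_ts: datetime
) -> Optional[float]:
    """Return the latest feature value with feature_ts <= label_ts.

    feature_rows must be sorted by timestamp. Returns None when no
    feature value existed yet at label_ts.
    """
    timestamps = [ts for ts, _ in feature_rows]
    idx = bisect_right(timestamps, label_ts)
    return feature_rows[idx - 1][1] if idx > 0 else None


history = [
    (datetime(2024, 1, 1), 3.0),
    (datetime(2024, 1, 5), 8.0),
    (datetime(2024, 1, 9), 20.0),
]

label_ts = datetime(2024, 1, 6)
pit_value = point_in_time_lookup(history, label_ts)
leaky_value = history[-1][1]  # naive "latest value" join ignores the label time

print(f"point-in-time: {pit_value}, naive latest: {leaky_value}")
```

The naive join returns 20.0, a value computed three days after the label was observed. A model trained on it learns from information that will not exist at inference time.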
labels_df = (
    spark.table(f"{CATALOG}.gold.churn_labels")
    .filter(F.col("observation_ts") >= "2024-01-01")  # filter before the column is renamed
    .select(
        "user_id",
        "churn_label",  # 0 = retained, 1 = churned
        F.col("observation_ts").alias("event_timestamp"),  # point-in-time anchor
        "experiment_split",  # train/val/test
    )
)
print(f"Label rows: {labels_df.count():,}")
labels_df.show(5)
feature_lookups = [
    FeatureLookup(
        table_name=f"{FEATURE_DB}.user_activity_features",
        feature_names=[
            "event_count_1h",
            "revenue_1h",
            "session_count_1h",
            "unique_products_1h",
            "purchase_count_1h",
            "conversion_rate_1h",
            "avg_revenue_per_purchase_1h",
            "last_platform",
        ],
        lookup_key="user_id",
        timestamp_lookup_key="event_timestamp",  # PIT anchor
    ),
    FeatureLookup(
        table_name=f"{FEATURE_DB}.user_profile_features",
        feature_names=[
            "account_age_days",
            "lifetime_revenue",
            "preferred_category",
            "subscription_tier",
        ],
        lookup_key="user_id",
        timestamp_lookup_key="event_timestamp",  # PIT anchor
    ),
    FeatureLookup(
        table_name=f"{FEATURE_DB}.transaction_features",
        feature_names=[
            "purchase_count_30d",
            "purchase_count_90d",
            "avg_order_value_30d",
            "days_since_last_purchase",
            "category_diversity_score",
        ],
        lookup_key="user_id",
        timestamp_lookup_key="event_timestamp",  # PIT anchor
    ),
]
# Build the training set from the "train" split only. The split column and the
# lookup keys are excluded so they never become model inputs.
train_labels_df = labels_df.filter(F.col("experiment_split") == "train")

training_set = fe.create_training_set(
    df=train_labels_df,
    feature_lookups=feature_lookups,
    label="churn_label",
    exclude_columns=["user_id", "event_timestamp", "experiment_split"],
)
training_df = training_set.load_df()
print(f"Training rows: {training_df.count():,}")
print(f"Training cols: {len(training_df.columns)}")
training_df.show(3)

from sklearn.ensemble import GradientBoostingClassifier
import pandas as pd

train_pdf = (
    training_df
    .fillna(0)  # fills nulls in numeric columns only
    .toPandas()
)
X_train = train_pdf.drop(columns=["churn_label"])
y_train = train_pdf["churn_label"]
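from sklearn.compose import ColumnTransformer, make_column_selector
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder

# String features (for example last_platform) are one-hot encoded inside the
# pipeline, because GradientBoostingClassifier only accepts numeric inputs.
encoder = ColumnTransformer(
    [("categorical", OneHotEncoder(handle_unknown="ignore"),
      make_column_selector(dtype_include=object))],
    remainder="passthrough",
    sparse_threshold=0,  # always return a dense array
)
gbm = GradientBoostingClassifier(
    n_estimators=300,
    learning_rate=0.05,
    max_depth=5,
    subsample=0.8,
    random_state=42,
)
model = Pipeline([("encode", encoder), ("gbm", gbm)])

with mlflow.start_run(run_name="churn-gbm-v1") as run:
    model.fit(X_train, y_train)
    fe.log_model(
        model=model,
        artifact_path="churn_model",
        flavor=mlflow.sklearn,
        training_set=training_set,  # binds model to its feature lookups
        registered_model_name=f"{CATALOG}.ml.user_churn_model",
    )
    print(f"Logged model with feature lineage. Run: {run.info.run_id}")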
For real-time inference, the model needs features in milliseconds, not the seconds it takes to query Delta Lake. Databricks Feature Store can publish features to an online store (DynamoDB, Cosmos DB, MySQL, etc.) for low-latency reads.
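Conceptually, publishing to an online store turns a timestamped history into a key-value view. The sketch below assumes the online store keeps only the most recent row per primary key; `materialize_latest` and the sample rows are hypothetical and stand in for the managed sync.

```python
from datetime import datetime
from typing import Dict, Iterable


def materialize_latest(
    offline_rows: Iterable[dict], key: str = "user_id", ts: str = "feature_ts"
) -> Dict[str, dict]:
    """Collapse a timestamped feature history to the newest row per key."""
    latest: Dict[str, dict] = {}
    for row in offline_rows:
        current = latest.get(row[key])
        if current is None or row[ts] > current[ts]:
            latest[row[key]] = row
    return latest


offline_rows = [
    {"user_id": "u_1", "feature_ts": datetime(2024, 1, 1, 10, 0), "event_count_1h": 4},
    {"user_id": "u_1", "feature_ts": datetime(2024, 1, 1, 10, 15), "event_count_1h": 7},
    {"user_id": "u_2", "feature_ts": datetime(2024, 1, 1, 9, 45), "event_count_1h": 2},
]

online_store = materialize_latest(offline_rows)
print(online_store["u_1"]["event_count_1h"])
```

A read is then a single key lookup rather than a scan over history, which is where the latency difference comes from. The offline table keeps the full history for point-in-time training joins.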
# Online table APIs have changed across client releases; confirm this import
# path and the create/refresh calls against your installed version.
from databricks.feature_engineering.entities.feature_store_online_table import (
    OnlineTable, OnlineTableSpec, TriggeredSchedulingPolicy
)

online_table_spec = OnlineTableSpec(
    primary_key_columns=["user_id"],
    source_table_full_name=f"{FEATURE_DB}.user_activity_features",
    run_triggered=TriggeredSchedulingPolicy(),  # sync on demand
)
online_table = fe.create_online_table(spec=online_table_spec)
print(f"Online table: {online_table.name}")
print(f"Status: {online_table.status.detailed_state}")

# Trigger a sync from the offline feature table.
fe.refresh_online_table(name=f"{FEATURE_DB}.user_activity_features")
At inference time, the Feature Store SDK performs automatic feature lookups, joining the incoming request data with features from the online store before passing them to the model.
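The lookup-then-predict flow can be sketched in a few lines. This is an illustration of the idea only, not how the serving endpoint is implemented: `serve_with_lookup`, the in-memory `online_store` dictionary, and `toy_model` are all hypothetical.

```python
from typing import Callable, Dict, List


def serve_with_lookup(
    records: List[dict],
    online_store: Dict[str, dict],
    feature_names: List[str],
    predict_fn: Callable[[List[list]], list],
    key: str = "user_id",
) -> list:
    """Enrich each request record with stored features, then call the model."""
    rows = []
    for record in records:
        features = online_store.get(record[key], {})
        # Missing users or features become None; a real system needs a policy here.
        rows.append([features.get(name) for name in feature_names])
    return predict_fn(rows)


online_store = {
    "u_1": {"event_count_1h": 7, "purchase_count_1h": 0},
    "u_2": {"event_count_1h": 2, "purchase_count_1h": 1},
}


def toy_model(rows: List[list]) -> List[int]:
    # Hypothetical rule standing in for a trained model: flag users with no purchases.
    return [1 if purchases == 0 else 0 for _, purchases in rows]


predictions = serve_with_lookup(
    [{"user_id": "u_1"}, {"user_id": "u_2"}],
    online_store,
    ["event_count_1h", "purchase_count_1h"],
    toy_model,
)
print(predictions)
```

The caller sends only keys. Feature names and their order come from the lookup specification stored with the model, so the client cannot drift from what the model was trained on.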
import json
import requests

WORKSPACE_URL = "https://<workspace>.azuredatabricks.net"
TOKEN = dbutils.secrets.get("prod-scope", "databricks-token")


def predict_churn(user_ids: list) -> list:
    """
    Send only user_id; the serving endpoint fetches features automatically
    from the online store and runs inference.
    """
    payload = {
        "dataframe_records": [
            {"user_id": uid} for uid in user_ids
        ]
    }
    resp = requests.post(
        f"{WORKSPACE_URL}/serving-endpoints/churn-predictor/invocations",
        headers={
            "Authorization": f"Bearer {TOKEN}",
            "Content-Type": "application/json",
        },
        data=json.dumps(payload),
        timeout=5,
    )
    resp.raise_for_status()
    return resp.json()["predictions"]


user_ids = ["u_123456", "u_789012", "u_345678"]
predictions = predict_churn(user_ids)
for uid, pred in zip(user_ids, predictions):
    # The sklearn flavor serves predict(), so each value is a class label (0 or 1).
    print(f"{uid}: churn_prediction = {pred}")
def get_features(user_ids: list) -> dict:
    """Fetch raw feature values for the given users from the feature serving endpoint."""
    payload = {
        "dataframe_records": [{"user_id": uid} for uid in user_ids]
    }
    resp = requests.post(
        f"{WORKSPACE_URL}/serving-endpoints/user-features-serving/invocations",
        headers={
            "Authorization": f"Bearer {TOKEN}",
            "Content-Type": "application/json",
        },
        data=json.dumps(payload),
        timeout=5,
    )
    resp.raise_for_status()  # surface HTTP errors instead of parsing an error body
    return resp.json()
# Daily batch scoring: score_batch looks up features for each user_id as of
# event_timestamp, using the lookups recorded with the model.
batch_labels = (
    spark.table(f"{CATALOG}.gold.users_to_score_today")
    .select("user_id", F.current_timestamp().alias("event_timestamp"))
)
batch_predictions = fe.score_batch(
    model_uri=f"models:/{CATALOG}.ml.user_churn_model@champion",
    df=batch_labels,
    result_type="double",
)
(
    batch_predictions.select("user_id", "prediction")
    .write.format("delta").mode("overwrite")
    .saveAsTable(f"{CATALOG}.gold.churn_scores_daily")
)
A summary of the feature tables in our pipeline, their update cadence, and their role in the ML lifecycle:
| Feature Table | Primary Key | Timestamp Key | Update Method | Latency | Used In |
|---|---|---|---|---|---|
| user_activity_features | user_id | feature_ts | Spark Structured Streaming | ~5 min | Real-time churn, recommendation |
| transaction_features | user_id | feature_ts | Scheduled batch (hourly) | ~60 min | Churn, LTV prediction |
| user_profile_features | user_id | updated_at | CDC from OLTP (near real-time) | ~2 min | All models |
| product_features | product_id | feature_ts | Scheduled batch (daily) | ~24 hr | Recommendation, search ranking |
| session_features | session_id | session_end_ts | Streaming (micro-batch) | ~1 min | Click-through rate, abandon prediction |
| cohort_features | cohort_id | computed_at | Weekly batch | ~7 days | Segmentation, A/B analysis |
Freshness vs cost tradeoff: Streaming features are ~10× more expensive to compute than batch features (continuous cluster vs scheduled job). Only promote a feature to streaming if your model's performance degrades meaningfully with stale data; validate this with an offline ablation study first.
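One way to run such an ablation offline is to replay the point-in-time lookup with an artificial delay and compare a metric across delays. The sketch below is a toy version under stated assumptions: timestamps are integer minutes, `predict` is a hypothetical threshold rule standing in for a trained model, and the data is invented purely to show the mechanics.

```python
from bisect import bisect_right
from typing import Callable, Dict, List, Optional, Tuple


def value_as_of(history: List[Tuple[int, float]], ts: int) -> Optional[float]:
    """Latest feature value with timestamp <= ts (history sorted by time)."""
    times = [t for t, _ in history]
    idx = bisect_right(times, ts)
    return history[idx - 1][1] if idx > 0 else None


def accuracy_with_staleness(
    examples: List[Tuple[str, int, int]],
    history_by_user: Dict[str, List[Tuple[int, float]]],
    staleness: int,
    predict: Callable[[Optional[float]], int],
) -> float:
    """Score examples using features as they looked `staleness` minutes earlier."""
    correct = 0
    for user_id, label_ts, label in examples:
        feature = value_as_of(history_by_user[user_id], label_ts - staleness)
        correct += int(predict(feature) == label)
    return correct / len(examples)


# Invented data: (timestamp in minutes, feature value) per user.
history_by_user = {
    "u1": [(0, 1.0), (50, 9.0)],
    "u2": [(0, 2.0), (10, 1.0)],
}
examples = [("u1", 60, 1), ("u2", 60, 0)]  # (user_id, label_ts, label)


def predict(value: Optional[float]) -> int:
    # Hypothetical rule: predict the positive class when the feature is >= 5.
    return 1 if value is not None and value >= 5 else 0


fresh = accuracy_with_staleness(examples, history_by_user, staleness=5, predict=predict)
stale = accuracy_with_staleness(examples, history_by_user, staleness=60, predict=predict)
print(f"5 min stale: {fresh:.2f}, 60 min stale: {stale:.2f}")
```

If the metric barely moves between the two delays on real data, the hourly batch version of the feature is probably sufficient and the streaming cost is not justified.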
Point-in-time lookups with timestamp_lookup_key are non-negotiable for any model trained on time-series data. A missing event_timestamp in your label table is a data leakage bug waiting to happen.
fe.log_model() is the right model logging call, rather than mlflow.sklearn.log_model(). It records feature lineage, enabling reproducible re-training and automatic feature lookup at serving time.
fe.score_batch()
This concludes the 4-part Databricks series: