Source: dev.to

Real-Time AI Feature Engineering with Spark Structured Streaming and Databricks Feature Store

This article walks through real-time AI feature engineering with Spark Structured Streaming and the Databricks Feature Store, now part of Unity Catalog as Feature Engineering in Unity Catalog. The approach addresses training-serving skew by keeping features point-in-time correct from raw Kafka events through to online serving in milliseconds. The streaming pipeline reads from Kafka, computes windowed aggregations, and writes features to the Feature Store via foreachBatch.

9 min read · Published Jun 24, 2026

Building point-in-time correct, production-grade feature pipelines: from raw Kafka events to online feature serving in milliseconds, using Spark Structured Streaming and the Databricks Feature Store.

Feature engineering is where many ML projects silently fail in production. Not because the model is wrong, but because the features the model sees at training time differ from the features it sees at inference time. This is called training-serving skew, and it is one of the most common silent killers of ML systems.

Three specific failure modes cause it:

The Databricks Feature Store (now part of Unity Catalog as Feature Engineering in Unity Catalog) solves all three by:

Understanding the data model behind the Feature Store is essential for designing correct pipelines. Here's how the entities relate:

The critical relationship: a Model Version is bound to a Training Set, which records exactly which feature tables and which point-in-time lookups were used. This is how Databricks guarantees reproducibility: you can always re-create the exact training data that produced any model version.
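To make the lineage chain concrete, here is a minimal sketch of the relationships as plain Python dataclasses. The class and field names (FeatureTable, FeatureLookupSpec, TrainingSetRecord, ModelVersionRecord) are hypothetical and only model the concepts described above; they are not the Databricks SDK's classes.

```python
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass(frozen=True)
class FeatureTable:
    """A feature table: primary keys identify an entity, the timestamp key versions it."""
    name: str
    primary_keys: Tuple[str, ...]
    timestamp_key: Optional[str] = None


@dataclass(frozen=True)
class FeatureLookupSpec:
    """Which features to pull from a table, and which label column anchors the PIT join."""
    table: FeatureTable
    feature_names: Tuple[str, ...]
    lookup_key: str
    timestamp_lookup_key: Optional[str] = None


@dataclass(frozen=True)
class TrainingSetRecord:
    """Records exactly which lookups (tables, features, PIT anchors) built the training data."""
    lookups: Tuple[FeatureLookupSpec, ...]
    label: str


@dataclass(frozen=True)
class ModelVersionRecord:
    """A model version is bound to the training set that produced it."""
    name: str
    version: int
    training_set: TrainingSetRecord

    def source_tables(self) -> Tuple[str, ...]:
        # Walk the lineage: model version -> training set -> lookups -> feature tables
        return tuple(lookup.table.name for lookup in self.training_set.lookups)


activity = FeatureTable("prod.feature_store.user_activity_features",
                        primary_keys=("user_id", "feature_ts"), timestamp_key="feature_ts")
lookup = FeatureLookupSpec(activity, ("event_count_1h", "revenue_1h"),
                           lookup_key="user_id", timestamp_lookup_key="event_timestamp")
mv = ModelVersionRecord("prod.ml.user_churn_model", 1,
                        TrainingSetRecord(lookups=(lookup,), label="churn_label"))
print(mv.source_tables())  # → ('prod.feature_store.user_activity_features',)
```

Because every link is an immutable record, re-creating the training data for a model version only requires following these references back to the tables and anchors they name.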


%pip install databricks-feature-engineering==0.6.0 --quiet
dbutils.library.restartPython()

from databricks.feature_engineering import FeatureEngineeringClient, FeatureLookup
from databricks.feature_engineering.entities.feature_serving_endpoint import (
    ServedEntity, EndpointCoreConfig
)
from pyspark.sql import functions as F, SparkSession
from pyspark.sql.types import (
    StructType, StructField, StringType, LongType,
    DoubleType, TimestampType, ArrayType
)
import mlflow

spark = SparkSession.builder.getOrCreate()
fe = FeatureEngineeringClient()

CATALOG       = "prod"
FEATURE_DB    = f"{CATALOG}.feature_store"
EVENTS_TABLE  = f"{CATALOG}.silver.events_clean"
KAFKA_BROKER  = "kafka-broker.internal:9092"
KAFKA_TOPIC   = "user-events"

CHECKPOINT_BASE = "abfss://checkpoints@storage.dfs.core.windows.net/features"

The streaming pipeline reads from Kafka, computes windowed aggregations using Spark's stateful streaming engine, and writes features to the Feature Store via foreachBatch. This keeps the feature table continuously fresh.
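The window arithmetic behind F.window(event_ts, "1 hour", "15 minutes") can be sketched in plain Python: with a 1-hour window sliding every 15 minutes, each event contributes to 60 / 15 = 4 overlapping windows. The sketch also shows a simplified form of the 10-minute watermark rule (a window is closed once the max event time seen, minus the delay, passes the window end). The helper names are hypothetical, and this illustrates the semantics under those assumptions rather than Spark's implementation.

```python
from datetime import datetime, timedelta
from typing import List, Tuple

WINDOW = timedelta(hours=1)       # window duration, as in F.window(..., "1 hour", ...)
SLIDE = timedelta(minutes=15)     # slide interval, as in F.window(..., "15 minutes")
WATERMARK_DELAY = timedelta(minutes=10)
EPOCH = datetime(1970, 1, 1)      # windows are aligned to the epoch (no startTime offset)


def windows_for(event_ts: datetime) -> List[Tuple[datetime, datetime]]:
    """Return every [start, end) sliding window that contains event_ts, oldest first."""
    # Latest window start at or before the event, aligned to the slide interval
    latest_start = event_ts - (event_ts - EPOCH) % SLIDE
    windows = []
    start = latest_start
    while start > event_ts - WINDOW:  # window must still cover the event (end is exclusive)
        windows.append((start, start + WINDOW))
        start -= SLIDE
    return sorted(windows)


def is_too_late(window_end: datetime, max_event_ts_seen: datetime) -> bool:
    """Simplified watermark check: the window is closed once the watermark passes its end."""
    watermark = max_event_ts_seen - WATERMARK_DELAY
    return window_end <= watermark


event = datetime(2024, 3, 1, 10, 7)
for start, end in windows_for(event):
    print(start.time(), "->", end.time())
# 09:15:00 -> 10:15:00
# 09:30:00 -> 10:30:00
# 09:45:00 -> 10:45:00
# 10:00:00 -> 11:00:00
```

Since the pipeline sets feature_ts to window.end, each user gets a new feature row every 15 minutes, and each row summarizes the preceding hour of events.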


event_schema = StructType([
    StructField("user_id",       StringType(),    False),
    StructField("event_type",    StringType(),    True),
    StructField("product_id",    StringType(),    True),
    StructField("revenue",       DoubleType(),    True),
    StructField("session_id",    StringType(),    True),
    StructField("platform",      StringType(),    True),
    StructField("event_ts",      TimestampType(), False),
])

raw_stream = (
    spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", KAFKA_BROKER)
        .option("subscribe", KAFKA_TOPIC)
        .option("startingOffsets", "latest")
        .option("failOnDataLoss", "false")
        .load()
        .select(
            F.from_json(F.col("value").cast("string"), event_schema).alias("data"),
            F.col("timestamp").alias("kafka_ts")
        )
        .select("data.*", "kafka_ts")
)

windowed_features = (
    raw_stream
        .withWatermark("event_ts", "10 minutes")
        .groupBy(
            F.col("user_id"),
            F.window(F.col("event_ts"), "1 hour", "15 minutes").alias("window")
        )
        .agg(
            F.count("*").alias("event_count_1h"),
            F.sum(F.when(F.col("event_type") == "purchase", F.col("revenue"))
                  .otherwise(0)).alias("revenue_1h"),
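            # Exact countDistinct is not supported in streaming aggregations;
            # approx_count_distinct is the streaming-safe alternative.
            F.approx_count_distinct("session_id").alias("session_count_1h"),
            F.approx_count_distinct("product_id").alias("unique_products_1h"),
            F.sum(F.when(F.col("event_type") == "purchase", 1)
                  .otherwise(0)).alias("purchase_count_1h"),
            # Platform of the most recent event in the window (first() is arbitrary here)
            F.max_by("platform", "event_ts").alias("last_platform"),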
        )
        .withColumn("window_start", F.col("window.start"))
        .withColumn("window_end",   F.col("window.end"))
        .withColumn("feature_ts",   F.col("window.end"))   # timestamp key for PIT lookup
        .drop("window")
        .withColumn("conversion_rate_1h",
            F.when(F.col("event_count_1h") > 0,
                   F.col("purchase_count_1h") / F.col("event_count_1h"))
            .otherwise(0.0))
        .withColumn("avg_revenue_per_purchase_1h",
            F.when(F.col("purchase_count_1h") > 0,
                   F.col("revenue_1h") / F.col("purchase_count_1h"))
            .otherwise(0.0))
)

def write_to_feature_store(batch_df, batch_id):
    """
    Called on each micro-batch. Upserts the batch into the feature table;
    mode="merge" matches rows on the table's primary keys (user_id + feature_ts).
    """
    if batch_df.isEmpty():
        return

    fe.write_table(
        name=f"{FEATURE_DB}.user_activity_features",
        df=batch_df,
        mode="merge",             # upsert: update existing, insert new
    )
    print(f"Batch {batch_id}: wrote {batch_df.count()} feature rows")

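FEATURE_TABLE = f"{FEATURE_DB}.user_activity_features"

# Create the table once, before the stream starts writing to it.
# In Unity Catalog the timestamp key must also be one of the primary keys:
# (user_id, feature_ts) keeps one row per user per window, which is the
# history that point-in-time lookups need.
if not spark.catalog.tableExists(FEATURE_TABLE):
    fe.create_table(
        name=FEATURE_TABLE,
        primary_keys=["user_id", "feature_ts"],
        timestamp_keys=["feature_ts"],
        schema=windowed_features.schema,
        description=(
            "Real-time user activity features computed from event stream. "
            "1-hour sliding window, refreshed every 15 minutes. "
            "Primary keys: user_id, feature_ts. Timestamp key: feature_ts (window end)."
        ),
    )
    print("Feature table created.")
else:
    print("Feature table already exists, continuing.")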

streaming_query = (
    windowed_features.writeStream
        .queryName("user_activity_features")  # without a name, streaming_query.name is None
        .outputMode("update")               # update mode for stateful aggregations
        .option("checkpointLocation", f"{CHECKPOINT_BASE}/user_activity")
        .trigger(processingTime="5 minutes") # micro-batch every 5 min
        .foreachBatch(write_to_feature_store)
        .start()
)

print(f"Streaming query '{streaming_query.name}' running...")
print(f"Status: {streaming_query.status}")
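Why the merge key matters: write_table with mode="merge" upserts on the feature table's primary keys. The minimal sketch below uses a dict as a stand-in for the Delta table and a hypothetical upsert helper to compare keying on user_id alone with keying on (user_id, feature_ts). Only the composite key keeps one row per window, which is the history point-in-time lookups depend on, while still letting update-mode re-emissions overwrite their own window.

```python
from typing import Dict, List, Tuple

Row = Dict[str, object]


def upsert(table: Dict[Tuple, Row], rows: List[Row], keys: Tuple[str, ...]) -> None:
    """Merge rows into table: replace the row with the same key values, else insert."""
    for row in rows:
        table[tuple(row[k] for k in keys)] = row


micro_batches = [
    [{"user_id": "u1", "feature_ts": "10:00", "event_count_1h": 3}],
    [{"user_id": "u1", "feature_ts": "10:15", "event_count_1h": 5}],
    # Update mode re-emits the 10:15 window after more events arrive
    [{"user_id": "u1", "feature_ts": "10:15", "event_count_1h": 6}],
]

by_user: Dict[Tuple, Row] = {}
by_user_and_ts: Dict[Tuple, Row] = {}
for batch in micro_batches:
    upsert(by_user, batch, keys=("user_id",))
    upsert(by_user_and_ts, batch, keys=("user_id", "feature_ts"))

print(len(by_user))         # → 1  (history collapsed to the latest window)
print(len(by_user_and_ts))  # → 2  (one row per window; 10:15 updated to 6 events)
```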

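This is the most critical part of the Feature Store: point-in-time correctness. When creating training data, each label must be joined to the feature values that were available at the timestamp of the label event, not at the current time. Joining on the current time lets feature values computed after the label leak into training, which is data leakage.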
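The point-in-time semantics can be illustrated with pandas.merge_asof: for each label row, take the most recent feature row for the same user whose feature_ts is at or before the label's event_timestamp. This is a sketch of the join semantics on toy data, not a description of how the Feature Store implements the join internally.

```python
import pandas as pd

labels = pd.DataFrame({
    "user_id": ["u1", "u1", "u2"],
    "event_timestamp": pd.to_datetime(["2024-03-01 10:20", "2024-03-01 11:05",
                                       "2024-03-01 10:20"]),
    "churn_label": [0, 1, 0],
})

features = pd.DataFrame({
    "user_id": ["u1", "u1", "u1", "u2"],
    "feature_ts": pd.to_datetime(["2024-03-01 10:00", "2024-03-01 10:45",
                                  "2024-03-01 11:30", "2024-03-01 10:30"]),
    "event_count_1h": [3, 7, 12, 4],
})

# merge_asof requires both frames to be sorted by their "on" keys.
training = pd.merge_asof(
    labels.sort_values("event_timestamp"),
    features.sort_values("feature_ts"),
    left_on="event_timestamp",
    right_on="feature_ts",
    by="user_id",            # only match rows for the same entity
    direction="backward",    # latest feature_ts <= event_timestamp: never look ahead
)
print(training[["user_id", "event_timestamp", "feature_ts", "event_count_1h"]])
```

The u1 label at 11:05 gets the 10:45 value (7), not the 11:30 value (12), and u2 gets a missing value because its only feature row is newer than the label. A plain join on user_id would have leaked those later values into training.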



labels_df = (
    spark.table(f"{CATALOG}.gold.churn_labels")
        # Filter on observation_ts before it is renamed in the select below
        .filter(F.col("observation_ts") >= "2024-01-01")
        .select(
            "user_id",
            "churn_label",                        # 0 = retained, 1 = churned
            F.col("observation_ts").alias("event_timestamp"),  # point-in-time anchor
            "experiment_split"                    # train/val/test
        )
)

print(f"Label rows: {labels_df.count():,}")
labels_df.show(5)


feature_lookups = [
    FeatureLookup(
        table_name=f"{FEATURE_DB}.user_activity_features",
        feature_names=[
            "event_count_1h",
            "revenue_1h",
            "session_count_1h",
            "unique_products_1h",
            "purchase_count_1h",
            "conversion_rate_1h",
            "avg_revenue_per_purchase_1h",
            "last_platform",
        ],
        lookup_key="user_id",
        timestamp_lookup_key="event_timestamp",    # ← PIT anchor
    ),

    FeatureLookup(
        table_name=f"{FEATURE_DB}.user_profile_features",
        feature_names=[
            "account_age_days",
            "lifetime_revenue",
            "preferred_category",
            "subscription_tier",
        ],
        lookup_key="user_id",
        timestamp_lookup_key="event_timestamp",    # ← PIT anchor
    ),

    FeatureLookup(
        table_name=f"{FEATURE_DB}.transaction_features",
        feature_names=[
            "purchase_count_30d",
            "purchase_count_90d",
            "avg_order_value_30d",
            "days_since_last_purchase",
            "category_diversity_score",
        ],
        lookup_key="user_id",
        timestamp_lookup_key="event_timestamp",
    ),
]

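# Restrict to the training split before building the training set. experiment_split
# is bookkeeping, not a feature, so it must not become a model input.
train_labels_df = (
    labels_df
        .filter(F.col("experiment_split") == "train")
        .drop("experiment_split")
)

training_set = fe.create_training_set(
    df=train_labels_df,
    feature_lookups=feature_lookups,
    label="churn_label",
    # user_id and event_timestamp drive the point-in-time join but are not features
    exclude_columns=["user_id", "event_timestamp"],
)

training_df = training_set.load_df()
print(f"Training rows: {training_df.count():,}")
print(f"Training cols: {len(training_df.columns)}")
training_df.show(3)

from sklearn.compose import ColumnTransformer
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline, make_pipeline
from sklearn.preprocessing import OneHotEncoder

train_pdf = training_df.toPandas()
X_train = train_pdf.drop(columns=["churn_label"])
y_train = train_pdf["churn_label"]

# String features (last_platform, preferred_category, subscription_tier) must be
# encoded, and nulls imputed, inside the model itself so that the serving endpoint
# applies exactly the same preprocessing as training did.
categorical_cols = X_train.select_dtypes(include="object").columns.tolist()
numeric_cols = [c for c in X_train.columns if c not in categorical_cols]

preprocess = ColumnTransformer([
    ("num", SimpleImputer(strategy="constant", fill_value=0), numeric_cols),
    ("cat", make_pipeline(
        SimpleImputer(missing_values=None, strategy="constant", fill_value="unknown"),
        OneHotEncoder(handle_unknown="ignore"),
    ), categorical_cols),
])

model = Pipeline([
    ("preprocess", preprocess),
    ("gbm", GradientBoostingClassifier(
        n_estimators=300,
        learning_rate=0.05,
        max_depth=5,
        subsample=0.8,
        random_state=42,
    )),
])

mlflow.set_registry_uri("databricks-uc")   # register the model in Unity Catalog

with mlflow.start_run(run_name="churn-gbm-v1") as run:
    model.fit(X_train, y_train)

    fe.log_model(
        model=model,
        artifact_path="churn_model",
        flavor=mlflow.sklearn,
        training_set=training_set,      # ← binds model to its feature lookups
        registered_model_name=f"{CATALOG}.ml.user_churn_model",
    )
    print(f"Logged model with feature lineage. Run: {run.info.run_id}")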

For real-time inference, the model needs features in milliseconds, not the seconds it takes to query Delta Lake. The Databricks Feature Store can publish features to an online store (DynamoDB, Cosmos DB, MySQL, etc.) for low-latency reads; the code below uses a Databricks-managed online table synced from the Delta feature table.
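Conceptually, an online store is a key-value view of the offline feature table that holds only the latest row per primary key, so a request needs a single point read instead of a table scan. A minimal in-memory sketch, assuming a dict as the store and hypothetical sync_latest and lookup helpers (this illustrates the semantics only; it is not Databricks code):

```python
from datetime import datetime
from typing import Dict, Iterable, Optional


def sync_latest(online: Dict[str, dict], rows: Iterable[dict],
                key: str = "user_id", ts_key: str = "feature_ts") -> None:
    """Keep only the newest row per key; older or equal timestamps never overwrite."""
    for row in rows:
        current = online.get(row[key])
        if current is None or row[ts_key] > current[ts_key]:
            online[row[key]] = row


def lookup(online: Dict[str, dict], user_id: str) -> Optional[dict]:
    """Point read by primary key, the access pattern an online store is built for."""
    return online.get(user_id)


offline_rows = [
    {"user_id": "u1", "feature_ts": datetime(2024, 3, 1, 10, 0), "event_count_1h": 3},
    {"user_id": "u1", "feature_ts": datetime(2024, 3, 1, 10, 45), "event_count_1h": 7},
    {"user_id": "u2", "feature_ts": datetime(2024, 3, 1, 10, 30), "event_count_1h": 4},
]

store: Dict[str, dict] = {}
sync_latest(store, reversed(offline_rows))    # arrival order does not matter
print(lookup(store, "u1")["event_count_1h"])  # → 7
```

The offline table keeps the full history for point-in-time training joins; the online copy deliberately keeps only the present.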


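from databricks.sdk import WorkspaceClient
from databricks.sdk.service.catalog import (
    OnlineTable, OnlineTableSpec, OnlineTableSpecTriggeredSchedulingPolicy
)

w = WorkspaceClient()
SOURCE_TABLE = f"{FEATURE_DB}.user_activity_features"
ONLINE_TABLE = f"{FEATURE_DB}.user_activity_features_online"   # must differ from the source

# Triggered and continuous sync modes read the source table's Change Data Feed.
spark.sql(f"ALTER TABLE {SOURCE_TABLE} SET TBLPROPERTIES (delta.enableChangeDataFeed = true)")

online_table_spec = OnlineTableSpec(
    primary_key_columns=["user_id"],
    timeseries_key="feature_ts",             # keep only the latest window per user online
    source_table_full_name=SOURCE_TABLE,
    run_triggered=OnlineTableSpecTriggeredSchedulingPolicy.from_dict({"triggered": "true"}),  # sync on demand
)

try:
    # Recent databricks-sdk releases take an OnlineTable; older ones take name= and spec=.
    w.online_tables.create(table=OnlineTable(name=ONLINE_TABLE, spec=online_table_spec))
except Exception as e:
    if "already exists" not in str(e):
        raise

online_table = w.online_tables.get(ONLINE_TABLE)
print(f"Online table: {online_table.name}")
print(f"Status:       {online_table.status.detailed_state}")

# An on-demand refresh is an update of the online table's sync pipeline.
w.pipelines.start_update(pipeline_id=online_table.spec.pipeline_id)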

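At inference time, a Model Serving endpoint for a model logged with fe.log_model() performs automatic feature lookups, joining the incoming request data with features from the online store before passing them to the model. The example below assumes an endpoint named churn-predictor already serves the registered model.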
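What the endpoint does with a request that carries only user_id can be sketched as a join: enrich each record with the online features for its key, in the column order the model was trained on, and only then call the model. The store contents, the FEATURE_ORDER subset, and the toy_model function below are hypothetical stand-ins chosen to mirror the dataframe_records request and predictions response used by predict_churn.

```python
from typing import Callable, Dict, List, Optional

FEATURE_ORDER = ["event_count_1h", "revenue_1h", "conversion_rate_1h"]  # hypothetical subset
Matrix = List[List[Optional[float]]]


def enrich(records: List[dict], online: Dict[str, dict]) -> Matrix:
    """Join request records with online features; unknown users get missing values."""
    rows = []
    for record in records:
        features = online.get(record["user_id"], {})
        rows.append([features.get(name) for name in FEATURE_ORDER])
    return rows


def serve(payload: dict, online: Dict[str, dict],
          model: Callable[[Matrix], List[float]]) -> dict:
    """dataframe_records in, predictions out: lookup first, then inference."""
    matrix = enrich(payload["dataframe_records"], online)
    return {"predictions": model(matrix)}


def toy_model(rows: Matrix) -> List[float]:
    """Toy stand-in for the churn model: flags users with zero conversion (illustration only)."""
    return [1.0 if (r[2] or 0.0) == 0.0 else 0.0 for r in rows]


online_store = {
    "u_123456": {"event_count_1h": 10, "revenue_1h": 0.0, "conversion_rate_1h": 0.0},
    "u_789012": {"event_count_1h": 4, "revenue_1h": 59.0, "conversion_rate_1h": 0.25},
}

request = {"dataframe_records": [{"user_id": "u_123456"}, {"user_id": "u_789012"}]}
print(serve(request, online_store, toy_model))  # → {'predictions': [1.0, 0.0]}
```

Keeping the lookup inside the serving path is what removes the skew: the client never computes features itself, so it cannot compute them differently from training.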


import requests, json

WORKSPACE_URL = "https://<workspace>.azuredatabricks.net"
TOKEN = dbutils.secrets.get("prod-scope", "databricks-token")


def predict_churn(user_ids: list) -> list:
    """
    Send only user_id; the serving endpoint fetches features automatically
    from the online store and runs inference.
    """
    payload = {
        "dataframe_records": [
            {"user_id": uid} for uid in user_ids
        ]
    }
    resp = requests.post(
        f"{WORKSPACE_URL}/serving-endpoints/churn-predictor/invocations",
        headers={
            "Authorization": f"Bearer {TOKEN}",
            "Content-Type":  "application/json",
        },
        data=json.dumps(payload),
        timeout=5,
    )
    resp.raise_for_status()
    return resp.json()["predictions"]

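user_ids = ["u_123456", "u_789012", "u_345678"]
predictions = predict_churn(user_ids)
for uid, pred in zip(user_ids, predictions):
    # A scikit-learn model served through pyfunc returns predict() output,
    # i.e. class labels (0/1), not probabilities.
    print(f"{uid}: churn_prediction = {pred}")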

def get_features(user_ids: list) -> dict:
    """
    Fetch raw feature values (no model) from a Feature Serving endpoint.
    Assumes an endpoint named user-features-serving has already been created.
    """
    payload = {
        "dataframe_records": [{"user_id": uid} for uid in user_ids]
    }
    resp = requests.post(
        f"{WORKSPACE_URL}/serving-endpoints/user-features-serving/invocations",
        headers={
            "Authorization": f"Bearer {TOKEN}",
            "Content-Type":  "application/json",
        },
        data=json.dumps(payload),
        timeout=5,
    )
    resp.raise_for_status()
    return resp.json()

batch_labels = spark.table(f"{CATALOG}.gold.users_to_score_today") \
    .select("user_id", F.current_timestamp().alias("event_timestamp"))

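# Assumes the "champion" alias has been assigned to a registered model version,
# e.g. with mlflow.MlflowClient().set_registered_model_alias(name, "champion", version).
batch_predictions = fe.score_batch(
    model_uri=f"models:/{CATALOG}.ml.user_churn_model@champion",
    df=batch_labels,                 # needs user_id + event_timestamp (the PIT anchor)
    result_type="double",
)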

batch_predictions.select("user_id", "prediction") \
    .write.format("delta").mode("overwrite") \
    .saveAsTable(f"{CATALOG}.gold.churn_scores_daily")

A summary of the feature tables in our pipeline, their update cadence, and their role in the ML lifecycle:

| Feature Table | Primary Key | Timestamp Key | Update Method | Latency | Used In |
|---|---|---|---|---|---|
| user_activity_features | user_id | feature_ts | Spark Structured Streaming | ~5 min | Real-time churn, recommendation |
| transaction_features | user_id | feature_ts | Scheduled batch (hourly) | ~60 min | Churn, LTV prediction |
| user_profile_features | user_id | updated_at | CDC from OLTP (near real-time) | ~2 min | All models |
| product_features | product_id | feature_ts | Scheduled batch (daily) | ~24 hr | Recommendation, search ranking |
| session_features | session_id | session_end_ts | Streaming (micro-batch) | ~1 min | Click-through rate, abandon prediction |
| cohort_features | cohort_id | computed_at | Weekly batch | ~7 days | Segmentation, A/B analysis |

Freshness vs cost tradeoff: streaming features can be roughly 10× more expensive to compute than batch features (a continuously running cluster versus a scheduled job). Only promote a feature to streaming if your model's performance degrades meaningfully with stale data, and validate this with an offline ablation study first.
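One way to prepare such an ablation is to rebuild the training data as if features were refreshed less often: shift each label's point-in-time anchor back by a candidate staleness before the as-of join, then train and evaluate once per variant. A minimal sketch of the data-preparation half with pandas, on toy data; the asof_features helper and the 5-minute and 1-hour lags are illustrative assumptions, and treating the full lag as staleness is a worst-case simplification.

```python
import pandas as pd


def asof_features(labels: pd.DataFrame, features: pd.DataFrame,
                  staleness: pd.Timedelta) -> pd.DataFrame:
    """Point-in-time join as of (event_timestamp - staleness) to simulate a slower refresh."""
    shifted = labels.assign(lookup_ts=labels["event_timestamp"] - staleness)
    return pd.merge_asof(
        shifted.sort_values("lookup_ts"),
        features.sort_values("feature_ts"),
        left_on="lookup_ts",
        right_on="feature_ts",
        by="user_id",
        direction="backward",   # still never look past the (shifted) anchor
    )


features = pd.DataFrame({
    "user_id": ["u1", "u1", "u1"],
    "feature_ts": pd.to_datetime(["2024-03-01 09:00", "2024-03-01 10:45",
                                  "2024-03-01 11:00"]),
    "event_count_1h": [2, 7, 9],
})
labels = pd.DataFrame({
    "user_id": ["u1"],
    "event_timestamp": pd.to_datetime(["2024-03-01 11:05"]),
})

fresh = asof_features(labels, features, pd.Timedelta(minutes=5))  # streaming-like
stale = asof_features(labels, features, pd.Timedelta(hours=1))    # hourly-batch-like
print(fresh["event_count_1h"].iloc[0], stale["event_count_1h"].iloc[0])  # → 9 2
```

Training the same model on each variant and comparing a validation metric such as AUC shows how much the extra freshness is actually worth before paying for a streaming cluster.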

timestamp_lookup_key is non-negotiable for any model trained on time-series data. A missing event_timestamp in your label table is a data leakage bug waiting to happen.

fe.log_model() is the right model logging call, not mlflow.sklearn.log_model(). It records feature lineage, enabling reproducible re-training and automatic feature lookup at serving time.

fe.score_batch()


This concludes the 4-part Databricks series.
