# Real-Time AI Feature Engineering with Spark Structured Streaming and Databricks Feature Store

> Source: <https://dev.to/jubinsoni/real-time-ai-feature-engineering-with-spark-structured-streaming-and-databricks-feature-store-eii>
> Published: 2026-06-24 09:41:21+00:00

Building point-in-time correct, production-grade feature pipelines — from raw Kafka events to online feature serving in milliseconds, using Spark Structured Streaming and the Databricks Feature Store.

Feature engineering is where most ML projects silently fail in production. Not because the model is wrong — but because the **features the model sees at training time are different from the features it sees at inference time**. This is called **training-serving skew**, and it's the #1 silent killer of ML systems.

Three specific failure modes cause it:

The **Databricks Feature Store** — now part of Unity Catalog as **Feature Engineering in Unity Catalog** — solves all three by:

Understanding the data model behind the Feature Store is essential for designing correct pipelines. Here's how the entities relate:

The critical relationship: a **Model Version** is bound to a **Training Set**, which records exactly which feature tables and which point-in-time lookups were used. This is how Databricks guarantees reproducibility — you can always re-create the exact training data that produced any model version.

```
# Databricks Runtime ML 13.x+ recommended
# Feature Engineering in Unity Catalog (formerly Feature Store)

%pip install databricks-feature-engineering==0.6.0 --quiet
dbutils.library.restartPython()

from databricks.feature_engineering import FeatureEngineeringClient, FeatureLookup
from databricks.feature_engineering.entities.feature_serving_endpoint import (
    ServedEntity, EndpointCoreConfig
)
from pyspark.sql import functions as F, SparkSession
from pyspark.sql.types import (
    StructType, StructField, StringType, LongType,
    DoubleType, TimestampType, ArrayType
)
import mlflow

spark = SparkSession.builder.getOrCreate()
fe = FeatureEngineeringClient()

# Unity Catalog paths
CATALOG       = "prod"
FEATURE_DB    = f"{CATALOG}.feature_store"
EVENTS_TABLE  = f"{CATALOG}.silver.events_clean"
KAFKA_BROKER  = "kafka-broker.internal:9092"
KAFKA_TOPIC   = "user-events"

# Checkpoint locations (ADLS / S3 / GCS)
CHECKPOINT_BASE = "abfss://checkpoints@storage.dfs.core.windows.net/features"
```

The streaming pipeline reads from Kafka, computes windowed aggregations using Spark's stateful streaming engine, and writes features to the Feature Store via `foreachBatch`

. This keeps the feature table continuously fresh.

```
# ── Streaming Feature Pipeline ────────────────────────────────────────────────

# Step 1: Define the raw event schema from Kafka
event_schema = StructType([
    StructField("user_id",       StringType(),    False),
    StructField("event_type",    StringType(),    True),
    StructField("product_id",    StringType(),    True),
    StructField("revenue",       DoubleType(),    True),
    StructField("session_id",    StringType(),    True),
    StructField("platform",      StringType(),    True),
    StructField("event_ts",      TimestampType(), False),
])

# Step 2: Read from Kafka
raw_stream = (
    spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", KAFKA_BROKER)
        .option("subscribe", KAFKA_TOPIC)
        .option("startingOffsets", "latest")
        .option("failOnDataLoss", "false")
        .load()
        .select(
            F.from_json(F.col("value").cast("string"), event_schema).alias("data"),
            F.col("timestamp").alias("kafka_ts")
        )
        .select("data.*", "kafka_ts")
)

# Step 3: Apply watermark and compute windowed features
# Watermark: tolerate up to 10 minutes of late data
windowed_features = (
    raw_stream
        .withWatermark("event_ts", "10 minutes")
        .groupBy(
            F.col("user_id"),
            F.window(F.col("event_ts"), "1 hour", "15 minutes").alias("window")
        )
        .agg(
            F.count("*").alias("event_count_1h"),
            F.sum(F.when(F.col("event_type") == "purchase", F.col("revenue"))
                  .otherwise(0)).alias("revenue_1h"),
            F.countDistinct("session_id").alias("session_count_1h"),
            F.countDistinct("product_id").alias("unique_products_1h"),
            F.sum(F.when(F.col("event_type") == "purchase", 1)
                  .otherwise(0)).alias("purchase_count_1h"),
            F.first("platform").alias("last_platform"),
        )
        # Flatten window struct to scalar columns
        .withColumn("window_start", F.col("window.start"))
        .withColumn("window_end",   F.col("window.end"))
        .withColumn("feature_ts",   F.col("window.end"))   # timestamp key for PIT lookup
        .drop("window")
        # Derived features
        .withColumn("conversion_rate_1h",
            F.when(F.col("event_count_1h") > 0,
                   F.col("purchase_count_1h") / F.col("event_count_1h"))
            .otherwise(0.0))
        .withColumn("avg_revenue_per_purchase_1h",
            F.when(F.col("purchase_count_1h") > 0,
                   F.col("revenue_1h") / F.col("purchase_count_1h"))
            .otherwise(0.0))
)

# Step 4: Write to Feature Store via foreachBatch
# foreachBatch gives us transactional writes per micro-batch
def write_to_feature_store(batch_df, batch_id):
    """
    Called on each micro-batch. Merges feature data into the Feature Store
    table using merge_on keys (user_id + feature_ts).
    """
    if batch_df.isEmpty():
        return

    fe.write_table(
        name=f"{FEATURE_DB}.user_activity_features",
        df=batch_df,
        mode="merge",             # upsert: update existing, insert new
    )
    print(f"Batch {batch_id}: wrote {batch_df.count()} feature rows")

# Step 5: Create the feature table (idempotent — safe to re-run)
try:
    fe.create_table(
        name=f"{FEATURE_DB}.user_activity_features",
        primary_keys=["user_id"],
        timestamp_keys=["feature_ts"],
        schema=windowed_features.schema,
        description=(
            "Real-time user activity features computed from event stream. "
            "1-hour sliding window, refreshed every 15 minutes. "
            "Primary key: user_id. Timestamp key: feature_ts (window end)."
        ),
    )
    print("Feature table created.")
except Exception:
    print("Feature table already exists — continuing.")

# Step 6: Launch the streaming query
streaming_query = (
    windowed_features.writeStream
        .outputMode("update")               # update mode for stateful aggregations
        .option("checkpointLocation", f"{CHECKPOINT_BASE}/user_activity")
        .trigger(processingTime="5 minutes") # micro-batch every 5 min
        .foreachBatch(write_to_feature_store)
        .start()
)

print(f"Streaming query '{streaming_query.name}' running...")
print(f"Status: {streaming_query.status}")
```

This is the most critical part of the Feature Store. When creating training data, we must join labels to features **at the timestamp of the label event** — not the current time. This prevents data leakage.

```
# ── Point-in-Time Correct Training Dataset ────────────────────────────────────

# Step 1: Load the label dataset
# Each row = one prediction target event, with the exact timestamp
# at which a model would have needed to make a prediction.

labels_df = (
    spark.table(f"{CATALOG}.gold.churn_labels")
        .select(
            "user_id",
            "churn_label",                        # 0 = retained, 1 = churned
            F.col("observation_ts").alias("event_timestamp"),  # point-in-time anchor
            "experiment_split"                    # train/val/test
        )
        .filter(F.col("observation_ts") >= "2024-01-01")
)

print(f"Label rows: {labels_df.count():,}")
labels_df.show(5)
# +----------+-----------+---------------------+-----------------+
# | user_id  |churn_label| event_timestamp     | experiment_split|
# +----------+-----------+---------------------+-----------------+
# | u_123456 | 0         | 2024-03-15 14:22:00 | train           |
# | u_789012 | 1         | 2024-03-15 18:45:00 | train           |

# Step 2: Define feature lookups
# as_of_timestamp=None → use the label's event_timestamp (point-in-time)
# Databricks will join each label row to the feature values
# that were valid at event_timestamp — not the latest values.

feature_lookups = [
    # User activity features — 1h window features from the streaming pipeline
    FeatureLookup(
        table_name=f"{FEATURE_DB}.user_activity_features",
        feature_names=[
            "event_count_1h",
            "revenue_1h",
            "session_count_1h",
            "unique_products_1h",
            "purchase_count_1h",
            "conversion_rate_1h",
            "avg_revenue_per_purchase_1h",
            "last_platform",
        ],
        lookup_key="user_id",
        timestamp_lookup_key="event_timestamp",    # ← PIT anchor
    ),

    # User profile features — slower-changing, from batch pipeline
    FeatureLookup(
        table_name=f"{FEATURE_DB}.user_profile_features",
        feature_names=[
            "account_age_days",
            "lifetime_revenue",
            "preferred_category",
            "subscription_tier",
        ],
        lookup_key="user_id",
        timestamp_lookup_key="event_timestamp",    # ← PIT anchor
    ),

    # Transaction aggregates — 30d and 90d rolling windows
    FeatureLookup(
        table_name=f"{FEATURE_DB}.transaction_features",
        feature_names=[
            "purchase_count_30d",
            "purchase_count_90d",
            "avg_order_value_30d",
            "days_since_last_purchase",
            "category_diversity_score",
        ],
        lookup_key="user_id",
        timestamp_lookup_key="event_timestamp",
    ),
]

# Step 3: Create training dataset (Feature Store handles the PIT join)
training_set = fe.create_training_set(
    df=labels_df,
    feature_lookups=feature_lookups,
    label="churn_label",
    exclude_columns=["observation_ts", "experiment_split"],
)

# The returned DataFrame has features + labels, PIT-correct
training_df = training_set.load_df()
print(f"Training rows: {training_df.count():,}")
print(f"Training cols: {len(training_df.columns)}")
training_df.show(3)

# Step 4: Train model and log via Feature Store (preserves lineage!)
from sklearn.ensemble import GradientBoostingClassifier
import pandas as pd

train_pdf = (
    training_df
        .filter(F.col("experiment_split") == "train")
        .drop("experiment_split", "user_id")
        .fillna(0)
        .toPandas()
)

X_train = train_pdf.drop(columns=["churn_label"])
y_train = train_pdf["churn_label"]

model = GradientBoostingClassifier(
    n_estimators=300,
    learning_rate=0.05,
    max_depth=5,
    subsample=0.8,
    random_state=42,
)

with mlflow.start_run(run_name="churn-gbm-v1") as run:
    model.fit(X_train, y_train)

    # Log model via Feature Store — this records the feature lineage
    fe.log_model(
        model=model,
        artifact_path="churn_model",
        flavor=mlflow.sklearn,
        training_set=training_set,      # ← binds model to its feature lookups
        registered_model_name=f"{CATALOG}.ml.user_churn_model",
    )
    print(f"Logged model with feature lineage. Run: {run.info.run_id}")
```

For real-time inference, the model needs features in milliseconds — not the seconds it takes to query Delta Lake. Databricks Feature Store can publish features to an **online store** (DynamoDB, Cosmos DB, MySQL, etc.) for low-latency reads.

```
# ── Publish Features to Online Store ─────────────────────────────────────────
# Online stores are configured per feature table.
# Here we publish user_activity_features to DynamoDB for <5ms lookups.

from databricks.feature_engineering.entities.feature_store_online_table import (
    OnlineTable, OnlineTableSpec, TriggeredSchedulingPolicy
)

# Create an online table spec (backed by a serverless real-time compute layer)
online_table_spec = OnlineTableSpec(
    primary_key_columns=["user_id"],
    source_table_full_name=f"{FEATURE_DB}.user_activity_features",
    run_triggered=OnlineTableSpec.TriggeredSchedulingPolicy(),  # sync on-demand
    # OR for continuous sync:
    # run_continuous=OnlineTableSpec.ContinuousSchedulingPolicy()
)

# Create the online table (idempotent)
online_table = fe.create_online_table(spec=online_table_spec)
print(f"Online table: {online_table.name}")
print(f"Status:       {online_table.status.detailed_state}")

# Trigger an initial sync from the offline Delta table to the online store
fe.refresh_online_table(name=f"{FEATURE_DB}.user_activity_features")
```

At inference time, the Feature Store SDK performs automatic feature lookups, joining the incoming request data with features from the online store before passing them to the model.

```
# ── Real-Time Feature Serving at Inference ────────────────────────────────────

import requests, json

WORKSPACE_URL = "https://<workspace>.azuredatabricks.net"
TOKEN = dbutils.secrets.get("prod-scope", "databricks-token")

# Option 1: Model Serving with automatic feature lookup
# When you logged the model with fe.log_model(), Databricks knows which
# features to fetch. You only send the lookup key (user_id) at inference time.

def predict_churn(user_ids: list) -> list:
    """
    Send only user_id — the serving endpoint fetches features automatically
    from the online store and runs inference.
    """
    payload = {
        "dataframe_records": [
            {"user_id": uid} for uid in user_ids
        ]
    }
    resp = requests.post(
        f"{WORKSPACE_URL}/serving-endpoints/churn-predictor/invocations",
        headers={
            "Authorization": f"Bearer {TOKEN}",
            "Content-Type":  "application/json",
        },
        data=json.dumps(payload),
        timeout=5,
    )
    resp.raise_for_status()
    return resp.json()["predictions"]

# Example usage
predictions = predict_churn(["u_123456", "u_789012", "u_345678"])
for uid, pred in zip(["u_123456", "u_789012", "u_345678"], predictions):
    print(f"{uid}: churn_probability = {pred:.4f}")
# u_123456: churn_probability = 0.0821
# u_789012: churn_probability = 0.7643
# u_345678: churn_probability = 0.1209

# Option 2: Direct feature lookup via the Feature Serving endpoint
# Useful when you want raw features without running inference
def get_features(user_ids: list) -> dict:
    payload = {
        "dataframe_records": [{"user_id": uid} for uid in user_ids]
    }
    resp = requests.post(
        f"{WORKSPACE_URL}/serving-endpoints/user-features-serving/invocations",
        headers={
            "Authorization": f"Bearer {TOKEN}",
            "Content-Type":  "application/json",
        },
        data=json.dumps(payload),
        timeout=5,
    )
    return resp.json()

# Option 3: Batch scoring (offline) — uses Delta offline store
# No online store needed; reads directly from the feature table with PIT lookup
batch_labels = spark.table(f"{CATALOG}.gold.users_to_score_today") \
    .select("user_id", F.current_timestamp().alias("event_timestamp"))

batch_predictions = fe.score_batch(
    model_uri=f"models:/{CATALOG}.ml.user_churn_model@champion",
    df=batch_labels,
    result_type="double",
)

batch_predictions.select("user_id", "prediction") \
    .write.format("delta").mode("overwrite") \
    .saveAsTable(f"{CATALOG}.gold.churn_scores_daily")
```

A summary of the feature tables in our pipeline, their update cadence, and their role in the ML lifecycle:

| Feature Table | Primary Key | Timestamp Key | Update Method | Latency | Used In |
|---|---|---|---|---|---|
`user_activity_features` |
`user_id` |
`feature_ts` |
Spark Structured Streaming | ~5 min | Real-time churn, recommendation |
`transaction_features` |
`user_id` |
`feature_ts` |
Scheduled batch (hourly) | ~60 min | Churn, LTV prediction |
`user_profile_features` |
`user_id` |
`updated_at` |
CDC from OLTP (near real-time) | ~2 min | All models |
`product_features` |
`product_id` |
`feature_ts` |
Scheduled batch (daily) | ~24 hr | Recommendation, search ranking |
`session_features` |
`session_id` |
`session_end_ts` |
Streaming (micro-batch) | ~1 min | Click-through rate, abandon prediction |
`cohort_features` |
`cohort_id` |
`computed_at` |
Weekly batch | ~7 days | Segmentation, A/B analysis |

Freshness vs cost tradeoff:Streaming features are ~10× more expensive to compute than batch features (continuous cluster vs scheduled job). Only promote a feature to streaming if your model's performance degrades meaningfully with stale data — validate this with an offline ablation study first.

`timestamp_lookup_key`

are non-negotiable for any model trained on time-series data. A missing `event_timestamp`

in your label table is a data leakage bug waiting to happen.`fe.log_model()`

is the right model logging call`mlflow.sklearn.log_model()`

. It records feature lineage, enabling reproducible re-training and automatic feature lookup at serving time.`fe.score_batch()`

**Databricks — Feature Engineering in Unity Catalog (Overview)**

🔗 [https://docs.databricks.com/en/machine-learning/feature-store/uc/feature-tables-uc.html](https://docs.databricks.com/en/machine-learning/feature-store/uc/feature-tables-uc.html)

**Databricks — Create and Manage Online Tables**

🔗 [https://docs.databricks.com/en/machine-learning/feature-store/online-tables.html](https://docs.databricks.com/en/machine-learning/feature-store/online-tables.html)

**Databricks — Point-in-Time Feature Lookups**

🔗 [https://docs.databricks.com/en/machine-learning/feature-store/time-series.html](https://docs.databricks.com/en/machine-learning/feature-store/time-series.html)

**Apache Spark — Structured Streaming Programming Guide**

🔗 [https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html)

**Apache Spark — Streaming Watermarks for Late Data Handling**

🔗 [https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#handling-late-data-and-watermarking](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#handling-late-data-and-watermarking)

**Databricks — Feature Store Python API Reference**

🔗 [https://docs.databricks.com/en/machine-learning/feature-store/python-api.html](https://docs.databricks.com/en/machine-learning/feature-store/python-api.html)

**Databricks — Score Batch with Feature Store**

🔗 [https://docs.databricks.com/en/machine-learning/feature-store/score-batch.html](https://docs.databricks.com/en/machine-learning/feature-store/score-batch.html)

**"Feature Stores for ML" — Feast Documentation (open-source reference)**

🔗 [https://docs.feast.dev/](https://docs.feast.dev/)

**"Rethinking Feature Stores" — Chip Huyen (huyenchip.com)**

🔗 [https://huyenchip.com/2023/01/08/feature-store.html](https://huyenchip.com/2023/01/08/feature-store.html)

**Databricks — Model Serving with Automatic Feature Lookup**

🔗 [https://docs.databricks.com/en/machine-learning/model-serving/feature-store-model-serving.html](https://docs.databricks.com/en/machine-learning/model-serving/feature-store-model-serving.html)

**"Building Machine Learning Pipelines" — Hannes Hapke & Catherine Nelson (O'Reilly)**

🔗 [https://www.oreilly.com/library/view/building-machine-learning/9781492053187/](https://www.oreilly.com/library/view/building-machine-learning/9781492053187/)

*This concludes the 4-part Databricks series:*
