Raw data doesn't win model competitions. Features do. And when your raw data is tens of billions of rows sitting across multiple sources, you can't afford to run pandas in a notebook and call it a day.
In this tutorial I'll walk through building a production-grade feature engineering pipeline on Azure Databricks using Apache Spark, Delta Lake, and MLflow.
The use case is a customer churn prediction system, but the patterns apply to any ML feature pipeline.
The pipeline follows the Medallion Architecture — a layered approach where data gets progressively cleaner and more feature-ready as it moves from Bronze to Silver to Gold. MLflow sits across all three layers tracking every run.
| Layer | Delta Table | What happens here | Typical latency |
|---|---|---|---|
| Bronze | `churn.bronze.events` | Raw ingest, no transforms, append only | Minutes |
| Silver | `churn.silver.customers` | Deduplication, null handling, schema enforcement | Minutes |
| Gold | `churn.gold.features` | Aggregations, window functions, encoding | Minutes to hours |
| MLflow Run | N/A | Training, metric logging, artifact storage | Hours |
| Registry | N/A | Versioned model store, stage promotion | On demand |
The Bronze layer is append-only. No transforms. No business logic. Just get the data in and preserve it exactly as it arrived so you can always replay from source.
from pyspark.sql import SparkSession
from pyspark.sql.functions import current_timestamp, lit
from delta.tables import DeltaTable
spark = SparkSession.builder.getOrCreate()
# Read the raw JSON events exactly as they landed in storage
raw_events = spark.read.format('json').load('abfss://raw@yourstorage.dfs.core.windows.net/events/')
# Add ingestion metadata only; no business logic belongs in this layer
bronze_df = raw_events.withColumn('_ingested_at', current_timestamp()) \
    .withColumn('_source', lit('events_api'))
# Append so earlier loads are never overwritten; mergeSchema tolerates new source fields
bronze_df.write \
    .format('delta') \
    .mode('append') \
    .option('mergeSchema', 'true') \
    .saveAsTable('churn.bronze.events')
# Note: count() triggers a second read of the source files
print(f"Bronze rows written: {bronze_df.count()}")
Why append-only? If your downstream pipeline produces bad features, you want to replay from Bronze without re-ingesting from source. Overwriting Bronze breaks that ability.
Silver is where you enforce schema, handle nulls, deduplicate, and standardize. Think of it as your canonical, trusted dataset.
from pyspark.sql.functions import col, lit, to_timestamp, when, trim, upper
from delta.tables import DeltaTable
bronze = spark.table('churn.bronze.events')
# Drop unusable rows, deduplicate, and standardize types and casing
silver_df = bronze \
    .filter(col('customer_id').isNotNull()) \
    .filter(col('event_type').isNotNull()) \
    .dropDuplicates(['customer_id', 'event_id']) \
    .withColumn('event_ts', to_timestamp(col('event_timestamp'))) \
    .withColumn('event_type', upper(trim(col('event_type')))) \
    .withColumn('country_code', when(col('country').isNull(), lit('UNKNOWN'))
                .otherwise(upper(col('country')))) \
    .select(
        'customer_id',
        'event_id',
        'event_type',
        'event_ts',
        'country_code',
        'product_id',
        'session_id',
        '_ingested_at',
    )
# DeltaTable.isDeltaTable() expects a storage path, so check the catalog by table name instead
if spark.catalog.tableExists('churn.silver.customers'):
    # Insert-only MERGE: rows already in Silver are left untouched, so reruns are idempotent
    silver_table = DeltaTable.forName(spark, 'churn.silver.customers')
    silver_table.alias('tgt').merge(
        silver_df.alias('src'),
        'tgt.customer_id = src.customer_id AND tgt.event_id = src.event_id'
    ).whenNotMatchedInsertAll().execute()
else:
    # First run: create the Silver table
    silver_df.write.format('delta').saveAsTable('churn.silver.customers')
print(f"Silver table updated. Total rows: {spark.table('churn.silver.customers').count()}")
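To see why the insert-only MERGE makes the Silver load safe to rerun, here is a minimal pure-Python sketch of the same semantics, with no Spark required. The `clean_event` and `merge_insert_only` helpers and the sample records are hypothetical stand-ins for the DataFrame operations above, not part of Delta Lake's API.

```python
from typing import Optional

Key = tuple[str, str]  # (customer_id, event_id), the MERGE join key


def clean_event(raw: dict) -> Optional[dict]:
    """Apply the Silver rules to one record; return None if it is dropped."""
    if raw.get("customer_id") is None or raw.get("event_type") is None:
        return None  # mirrors the isNotNull() filters
    country = raw.get("country")
    return {
        "customer_id": raw["customer_id"],
        "event_id": raw["event_id"],
        "event_type": raw["event_type"].strip().upper(),
        "country_code": "UNKNOWN" if country is None else country.upper(),
    }


def merge_insert_only(target: dict[Key, dict], batch: list[dict]) -> int:
    """Insert rows whose key is absent from target; return the insert count."""
    inserted = 0
    for raw in batch:
        row = clean_event(raw)
        if row is None:
            continue
        key = (row["customer_id"], row["event_id"])
        if key not in target:  # like whenNotMatchedInsertAll: matched rows are left alone
            target[key] = row
            inserted += 1
    return inserted


silver: dict[Key, dict] = {}
batch = [
    {"customer_id": "c1", "event_id": "e1", "event_type": " login ", "country": "de"},
    {"customer_id": "c1", "event_id": "e1", "event_type": "login", "country": "de"},  # duplicate key
    {"customer_id": None, "event_id": "e2", "event_type": "login", "country": "us"},  # dropped
    {"customer_id": "c2", "event_id": "e3", "event_type": "purchase", "country": None},
]

first_run = merge_insert_only(silver, batch)
second_run = merge_insert_only(silver, batch)  # replaying the same batch
print(first_run, second_run, len(silver))  # 2 0 2
```

The second call inserts nothing because every key already exists in the target. That is the property that lets you rerun the Silver job after a failure without creating duplicates.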
This is the heart of the pipeline. We compute aggregated, windowed, and encoded features that the model will actually train on.
from datetime import date
from pyspark.sql.functions import (
    col, count, countDistinct, sum as _sum,
    avg, datediff, max as _max, min as _min,
    expr, lit, when
)
from pyspark.sql.window import Window
silver = spark.table('churn.silver.customers')
# Pin the run date as a plain string. A Column such as current_date() cannot be
# interpolated into the replaceWhere predicate at the end of this block.
run_date = date.today().isoformat()
today = lit(run_date).cast('date')
# Per-customer activity aggregates
agg_features = silver \
    .withColumn('days_since_event', datediff(today, col('event_ts'))) \
    .groupBy('customer_id') \
    .agg(
        count('event_id').alias('total_events'),
        countDistinct('session_id').alias('total_sessions'),
        countDistinct('product_id').alias('distinct_products'),
        _sum(when(col('days_since_event') <= 30, 1).otherwise(0)).alias('events_last_30d'),
        _sum(when(col('days_since_event') <= 90, 1).otherwise(0)).alias('events_last_90d'),
        _max('event_ts').alias('last_event_ts'),
        _min('event_ts').alias('first_event_ts'),
    ) \
    .withColumn('days_since_last_event', datediff(today, col('last_event_ts'))) \
    .withColumn('customer_tenure_days', datediff(today, col('first_event_ts'))) \
    .withColumn('avg_events_per_day',
                col('total_events') / (col('customer_tenure_days') + 1))
# Derived features: a recency tier and a tenure-normalized engagement score
feature_df = agg_features \
    .withColumn('recency_tier',
        when(col('days_since_last_event') <= 7, lit(3))      # active
        .when(col('days_since_last_event') <= 30, lit(2))    # at risk
        .otherwise(lit(1))                                   # churned
    ) \
    .withColumn('engagement_score',
        (col('events_last_30d') * 0.6 + col('events_last_90d') * 0.4) /
        (col('customer_tenure_days') + 1)
    )
# Replace only the rows for this run date so same-day reruns do not duplicate features
feature_df \
    .withColumn('feature_date', today) \
    .write \
    .format('delta') \
    .mode('overwrite') \
    .option('replaceWhere', f"feature_date = '{run_date}'") \
    .saveAsTable('churn.gold.features')
print(f"Gold features written: {feature_df.count()} customers")
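As a sanity check on the derived columns, the same arithmetic can be worked by hand for a single customer. The sketch below mirrors the `recency_tier`, `avg_events_per_day`, and `engagement_score` expressions in plain Python; the function names and the sample numbers are made up for illustration.

```python
def recency_tier(days_since_last_event: int) -> int:
    """3 = active (<= 7 days), 2 = at risk (<= 30 days), 1 = churned."""
    if days_since_last_event <= 7:
        return 3
    if days_since_last_event <= 30:
        return 2
    return 1


def avg_events_per_day(total_events: int, tenure_days: int) -> float:
    # The +1 avoids division by zero for customers first seen today
    return total_events / (tenure_days + 1)


def engagement_score(events_30d: int, events_90d: int, tenure_days: int) -> float:
    # Weighted recent activity, normalized by tenure
    return (events_30d * 0.6 + events_90d * 0.4) / (tenure_days + 1)


# Hypothetical customer: 99 days of tenure, last seen 12 days ago
tier = recency_tier(12)
rate = avg_events_per_day(total_events=50, tenure_days=99)
score = engagement_score(events_30d=10, events_90d=25, tenure_days=99)
print(tier, rate, round(score, 2))  # 2 0.5 0.16
```

Because the 90-day window contains the 30-day window, an event from the last month contributes to both terms of the score. Its effective weight is therefore 1.0, against 0.4 for an event that is 31 to 90 days old.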
With features in Gold, we hand off to MLflow to train, track, and register the model. Notice we log the Delta table version so we can always reproduce exactly which feature snapshot trained which model.
import mlflow
import mlflow.sklearn
from delta.tables import DeltaTable
from mlflow.models.signature import infer_signature
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_auc_score, f1_score
import pandas as pd
mlflow.set_experiment('/churn-prediction/feature-pipeline')
# Capture the current Delta version, then read that exact version for training
gold_table = DeltaTable.forName(spark, 'churn.gold.features')
delta_version = gold_table.history(1).select('version').collect()[0][0]
features_pdf = spark.read \
    .format('delta') \
    .option('versionAsOf', delta_version) \
    .table('churn.gold.features') \
    .toPandas()
FEATURE_COLS = [
    'total_events', 'total_sessions', 'distinct_products',
    'events_last_30d', 'events_last_90d', 'days_since_last_event',
    'customer_tenure_days', 'avg_events_per_day',
    'recency_tier', 'engagement_score',
]
# NOTE: the Gold table built above does not contain a 'churned' column.
# Join your label source onto the features before this step.
TARGET = 'churned'
X = features_pdf[FEATURE_COLS]
y = features_pdf[TARGET]
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
with mlflow.start_run(run_name=f'gbm-features-v{delta_version}') as run:
    params = {'n_estimators': 200, 'max_depth': 5, 'learning_rate': 0.05}
    model = GradientBoostingClassifier(**params, random_state=42)
    model.fit(X_train, y_train)
    y_pred = model.predict(X_test)
    y_prob = model.predict_proba(X_test)[:, 1]
    roc_auc = roc_auc_score(y_test, y_prob)
    mlflow.log_params(params)
    mlflow.log_metric('roc_auc', roc_auc)
    mlflow.log_metric('f1_score', f1_score(y_test, y_pred))
    # Tie the run to the feature snapshot it was trained on
    mlflow.log_param('delta_feature_version', delta_version)
    mlflow.log_param('feature_columns', FEATURE_COLS)
    mlflow.log_param('training_rows', len(X_train))
    # The signature is inferred from example inputs and the matching outputs
    signature = infer_signature(X_train, model.predict(X_train))
    mlflow.sklearn.log_model(
        model,
        artifact_path='churn-gbm',
        signature=signature,
        registered_model_name='churn-prediction-gbm',
    )
print(f"Run ID: {run.info.run_id}")
print(f"ROC-AUC: {roc_auc:.4f}")
print(f"Feature Delta version logged: {delta_version}")
One of the best things about Delta Lake is time travel. If a model behaves unexpectedly in production, you can reload the exact feature snapshot it was trained on.
import mlflow
run = mlflow.get_run('your-run-id-here')
feature_version = int(run.data.params['delta_feature_version'])
historical_features = spark.read \
    .format('delta') \
    .option('versionAsOf', feature_version) \
    .table('churn.gold.features')
print(f"Loaded feature snapshot from Delta version {feature_version}")
print(f"Row count: {historical_features.count()}")
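If the idea of reading "as of" a version is new, a toy in-memory model may help. The `VersionedTable` class below is a hypothetical illustration of the concept only: every write commits a new immutable snapshot and old snapshots stay readable. It is not how Delta Lake is implemented, and it ignores retention and vacuuming.

```python
from typing import Optional


class VersionedTable:
    """Toy stand-in for a versioned table: each write commits a new snapshot."""

    def __init__(self) -> None:
        self._snapshots: list[tuple[dict, ...]] = []

    def overwrite(self, rows: list[dict]) -> int:
        """Commit rows as the new table contents and return the new version number."""
        self._snapshots.append(tuple(dict(r) for r in rows))
        return len(self._snapshots) - 1

    def read(self, version_as_of: Optional[int] = None) -> list[dict]:
        """Return the latest snapshot, or the one at version_as_of if given."""
        if not self._snapshots:
            raise ValueError("table has no commits yet")
        index = len(self._snapshots) - 1 if version_as_of is None else version_as_of
        if not 0 <= index < len(self._snapshots):
            raise ValueError(f"version {index} does not exist")
        return [dict(r) for r in self._snapshots[index]]


gold = VersionedTable()
# The version returned here plays the role of the value logged to MLflow
trained_on = gold.overwrite([{"customer_id": "c1", "total_events": 40}])
# A later pipeline run changes the features
gold.overwrite([{"customer_id": "c1", "total_events": 55}])

latest = gold.read()
snapshot = gold.read(version_as_of=trained_on)
print(latest[0]["total_events"], snapshot[0]["total_events"])  # 55 40
```

The later write does not disturb what version 0 returns. That is why a single integer logged with the run is enough to pin the training data.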
| Tool | Role in pipeline | Why not the alternative |
|---|---|---|
| Apache Spark | Distributed feature computation | Pandas (single node, OOM at scale), Dask (less native Databricks integration) |
| Delta Lake | Feature storage with versioning | Parquet (no ACID, no time travel), Hive tables (no merge support) |
| MLflow Tracking | Experiment and param logging | Manual logging (not reproducible), W&B (extra cost, less native on Databricks) |
| MLflow Registry | Model versioning and promotion | Custom model store (more ops overhead) |
| Medallion Architecture | Pipeline layer separation | Flat pipelines (hard to debug, no replay capability) |
| Delta MERGE | Idempotent Silver upserts | Overwrite (destroys history), append (creates duplicates) |
Shuffle partitions matter. Spark defaults to 200 shuffle partitions, which is fine for small data but becomes a bottleneck at scale. Set `spark.conf.set("spark.sql.shuffle.partitions", "auto")` on Databricks Runtime 10+, or tune it manually to 2-3x your core count.
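For the manual route, the 2-3x rule of thumb is easy to turn into a number. This is a rough sketch: `suggest_shuffle_partitions` is a hypothetical helper, and the worker and core counts are example values you would replace with your own cluster's.

```python
def suggest_shuffle_partitions(workers: int, cores_per_worker: int, factor: int = 3) -> int:
    """Return factor x total worker cores, following the 2-3x rule of thumb."""
    if workers < 1 or cores_per_worker < 1 or factor < 1:
        raise ValueError("workers, cores_per_worker and factor must be positive")
    return workers * cores_per_worker * factor


# Example: 8 workers with 16 cores each (assumed values)
partitions = suggest_shuffle_partitions(workers=8, cores_per_worker=16)
print(partitions)  # 384

# In a notebook you would then apply it with:
# spark.conf.set("spark.sql.shuffle.partitions", str(partitions))
```

Treat the result as a starting point and adjust it after looking at task sizes in the Spark UI.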
Z-ordering on Gold features. If you're querying Gold by `customer_id` frequently, run `OPTIMIZE churn.gold.features ZORDER BY (customer_id)` after the write. This co-locates rows with similar `customer_id` values in the same files, so lookups on that column can skip files that cannot contain a match instead of scanning the whole table.
Log Delta version in every MLflow run. This is non-negotiable for reproducibility. Without it you can't prove which feature snapshot trained which model, which becomes a compliance problem in regulated industries.
Cluster autoscaling for feature jobs. Feature engineering jobs tend to have spiky resource needs (big during aggregation, small during writes). Enable autoscaling on your Databricks cluster and set a min/max node count rather than a fixed size.
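In a job definition, autoscaling comes down to giving the cluster a worker range instead of a fixed `num_workers`. The dictionary below sketches the relevant part of a cluster spec in the shape used by the Databricks Clusters and Jobs APIs. The runtime version, node type, and worker counts are placeholder values, so check them against what your workspace offers.

```python
import json

# Placeholder values: replace with a runtime and VM size available in your workspace
new_cluster = {
    "spark_version": "13.3.x-scala2.12",
    "node_type_id": "Standard_DS4_v2",
    "autoscale": {
        "min_workers": 2,   # floor for the light write phase
        "max_workers": 10,  # ceiling for the heavy aggregation phase
    },
}

# A fixed-size cluster would set "num_workers" instead of "autoscale"
print(json.dumps(new_cluster, indent=2))
```

A low minimum keeps the cluster cheap during writes, while the maximum caps what the aggregation stage can cost.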
The combination of Spark, Delta Lake, and MLflow on Databricks gives you a feature engineering pipeline that is reproducible (Delta time travel + MLflow param logging), scalable (Spark handles billions of rows), and auditable (every run is tracked, every feature version is stored).
The Medallion Architecture keeps the pipeline modular — you can rerun just the Gold layer if you change a feature definition without touching Bronze or Silver, and MLflow ties model performance back to the exact feature version that produced it.