[ARTICLE · art-42204] src=dev.to ↗ pub= topic=machine-learning verified=true sentiment=↑ positive

Azure Databricks for MLOps and Feature Engineering at Scale with Apache Spark, Delta Lake, and MLflow

Azure Databricks provides a production-grade feature engineering pipeline for MLOps using Apache Spark, Delta Lake, and MLflow. The pipeline follows the Medallion Architecture with Bronze, Silver, and Gold layers to transform raw data into ML-ready features. A customer churn prediction use case demonstrates append-only ingestion, deduplication, schema enforcement, and aggregation for scalable feature engineering.

6 min read · Published Jun 28, 2026

Raw data doesn't win model competitions. Features do. And when your raw data is tens of billions of rows sitting across multiple sources, you can't afford to run pandas in a notebook and call it a day.

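In this tutorial I'll walk through building a production-grade feature engineering pipeline on Azure Databricks using Apache Spark for distributed feature computation, Delta Lake for versioned feature storage, and MLflow for experiment tracking and the model registry.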

The use case is a customer churn prediction system, but the patterns apply to any ML feature pipeline.

The pipeline follows the Medallion Architecture — a layered approach where data gets progressively cleaner and more feature-ready as it moves from Bronze to Silver to Gold. MLflow sits across all three layers tracking every run.

| Layer | Delta table | What happens here | Typical latency |
|---|---|---|---|
| Bronze | churn.bronze.events | Raw ingest, no transforms, append only | Minutes |
| Silver | churn.silver.customers | Deduplication, null handling, schema enforcement | Minutes |
| Gold | churn.gold.features | Aggregations, window functions, encoding | Minutes to hours |
| MLflow Run | N/A | Training, metric logging, artifact storage | Hours |
| Registry | N/A | Versioned model store, stage promotion | On demand |
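
As a rough mental model of the layers above, the following toy sketch runs the same three stages over plain Python lists instead of Delta tables. The function names `to_silver` and `to_gold` and the sample records are illustrative assumptions, not part of the pipeline; the real stages run on Spark and Delta Lake.

```python
from collections import defaultdict


def to_silver(bronze_rows):
    """Drop rows missing keys, dedupe on (customer_id, event_id), normalize casing."""
    seen = set()
    silver_rows = []
    for row in bronze_rows:
        if row.get("customer_id") is None or row.get("event_type") is None:
            continue  # null handling
        key = (row["customer_id"], row["event_id"])
        if key in seen:
            continue  # deduplication
        seen.add(key)
        silver_rows.append({**row, "event_type": row["event_type"].strip().upper()})
    return silver_rows


def to_gold(silver_rows):
    """Aggregate one feature row per customer."""
    totals = defaultdict(int)
    for row in silver_rows:
        totals[row["customer_id"]] += 1
    return {cid: {"total_events": n} for cid, n in totals.items()}


# Bronze keeps records exactly as they arrived, duplicates and nulls included.
bronze = [
    {"customer_id": "c1", "event_id": "e1", "event_type": " login "},
    {"customer_id": "c1", "event_id": "e1", "event_type": " login "},  # duplicate
    {"customer_id": None, "event_id": "e2", "event_type": "click"},    # missing key
    {"customer_id": "c2", "event_id": "e3", "event_type": "purchase"},
]

silver = to_silver(bronze)
gold = to_gold(silver)
print(len(bronze), len(silver), gold)
```

Because Bronze is never modified, changing `to_silver` or `to_gold` only requires rerunning the downstream functions on the same input.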

The Bronze layer is append-only. No transforms. No business logic. Just get the data in and preserve it exactly as it arrived so you can always replay from source.

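from pyspark.sql import SparkSession
from pyspark.sql.functions import current_timestamp, lit

# On Databricks a session already exists; getOrCreate() simply returns it.
spark = SparkSession.builder.getOrCreate()

# Read the raw JSON events exactly as they landed in storage.
raw_events = spark.read.format('json').load('abfss://raw@yourstorage.dfs.core.windows.net/events/')

# Add lineage columns only; no business logic at this layer.
bronze_df = raw_events.withColumn('_ingested_at', current_timestamp()) \
                       .withColumn('_source', lit('events_api'))

# Append-only write; mergeSchema lets new source fields extend the table schema.
bronze_df.write \
    .format('delta') \
    .mode('append') \
    .option('mergeSchema', 'true') \
    .saveAsTable('churn.bronze.events')

# Note: count() triggers a second read of the source files.
print(f"Bronze rows written: {bronze_df.count()}")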

Why append-only? If your downstream pipeline produces bad features, you want to replay from Bronze without re-ingesting from source. Overwriting Bronze breaks that ability.

Silver is where you enforce schema, handle nulls, deduplicate, and standardize. Think of it as your canonical, trusted dataset.

from pyspark.sql.functions import col, lit, to_timestamp, when, trim, upper
from delta.tables import DeltaTable

bronze = spark.table('churn.bronze.events')

# Clean and standardize: drop rows missing keys, dedupe, normalize types and casing.
silver_df = bronze \
    .filter(col('customer_id').isNotNull()) \
    .filter(col('event_type').isNotNull()) \
    .dropDuplicates(['customer_id', 'event_id']) \
    .withColumn('event_ts',     to_timestamp(col('event_timestamp'))) \
    .withColumn('event_type',   upper(trim(col('event_type')))) \
    .withColumn('country_code', when(col('country').isNull(), lit('UNKNOWN'))
                                .otherwise(upper(col('country')))) \
    .select(
        'customer_id',
        'event_id',
        'event_type',
        'event_ts',
        'country_code',
        'product_id',
        'session_id',
        '_ingested_at',
    )

# DeltaTable.isDeltaTable() expects a storage path, not a table name,
# so check the catalog instead (spark.catalog.tableExists, PySpark 3.3+).
if spark.catalog.tableExists('churn.silver.customers'):
    # Insert-only MERGE: rows whose (customer_id, event_id) already exist are
    # skipped, so re-running the job does not create duplicates.
    silver_table = DeltaTable.forName(spark, 'churn.silver.customers')
    silver_table.alias('tgt').merge(
        silver_df.alias('src'),
        'tgt.customer_id = src.customer_id AND tgt.event_id = src.event_id'
    ).whenNotMatchedInsertAll().execute()
else:
    # First run: create the table.
    silver_df.write.format('delta').saveAsTable('churn.silver.customers')

print(f"Silver table updated. Total rows: {spark.table('churn.silver.customers').count()}")
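
To see why an insert-only MERGE makes the Silver load safe to re-run, here is a minimal pure-Python sketch of the same matching rule applied to lists of dicts. The helper `merge_insert_only` and the sample batch are hypothetical; the sketch only mimics the semantics and is not how Delta executes a merge.

```python
def merge_insert_only(target, source, keys=("customer_id", "event_id")):
    """Mimic MERGE ... WHEN NOT MATCHED THEN INSERT * on lists of dicts."""
    # Keys already present in the target before this merge starts.
    existing = {tuple(row[k] for k in keys) for row in target}
    # Source rows are compared against the target only, never against each
    # other, which is why the batch should be deduplicated before merging.
    new_rows = [dict(row) for row in source
                if tuple(row[k] for k in keys) not in existing]
    target.extend(new_rows)
    return len(new_rows)


batch = [
    {"customer_id": "c1", "event_id": "e1", "event_type": "LOGIN"},
    {"customer_id": "c2", "event_id": "e2", "event_type": "PURCHASE"},
]

silver_table = []
inserted_first_run = merge_insert_only(silver_table, batch)
inserted_on_replay = merge_insert_only(silver_table, batch)  # same batch again
print(inserted_first_run, inserted_on_replay, len(silver_table))
```

Replaying the same batch inserts nothing the second time, which is the property that lets a failed Silver job be retried without manual cleanup.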

This is the heart of the pipeline. We compute aggregated, windowed, and encoded features that the model will actually train on.

from datetime import date

from pyspark.sql.functions import (
    col, count, countDistinct, sum as _sum,
    datediff, max as _max, min as _min,
    lit, when
)

silver = spark.table('churn.silver.customers')

# Fix the run date once on the driver so the feature_date column and the
# replaceWhere predicate below use the same literal value. (Formatting a
# current_date() Column into the predicate would not produce a date string.)
run_date = date.today().isoformat()
today = lit(run_date).cast('date')

# Per-customer aggregates over the full event history.
agg_features = silver \
    .withColumn('days_since_event', datediff(today, col('event_ts'))) \
    .groupBy('customer_id') \
    .agg(
        count('event_id')                                          .alias('total_events'),
        countDistinct('session_id')                                .alias('total_sessions'),
        countDistinct('product_id')                                .alias('distinct_products'),
        _sum(when(col('days_since_event') <= 30, 1).otherwise(0)) .alias('events_last_30d'),
        _sum(when(col('days_since_event') <= 90, 1).otherwise(0)) .alias('events_last_90d'),
        _max('event_ts')                                           .alias('last_event_ts'),
        _min('event_ts')                                           .alias('first_event_ts'),
    ) \
    .withColumn('days_since_last_event', datediff(today, col('last_event_ts'))) \
    .withColumn('customer_tenure_days',  datediff(today, col('first_event_ts'))) \
    .withColumn('avg_events_per_day',
        col('total_events') / (col('customer_tenure_days') + 1))

# Encoded and derived features.
feature_df = agg_features \
    .withColumn('recency_tier',
        when(col('days_since_last_event') <= 7,  lit(3))   # active
       .when(col('days_since_last_event') <= 30, lit(2))   # at risk
       .otherwise(lit(1))                                   # churned
    ) \
    .withColumn('engagement_score',
        (col('events_last_30d') * 0.6 + col('events_last_90d') * 0.4) /
        (col('customer_tenure_days') + 1)
    )

# Overwrite only today's snapshot; earlier feature_date values are left in place.
feature_df \
    .withColumn('feature_date', today) \
    .write \
    .format('delta') \
    .mode('overwrite') \
    .option('replaceWhere', f"feature_date = '{run_date}'") \
    .saveAsTable('churn.gold.features')

print(f"Gold features written: {feature_df.count()} customers")
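
To make the two derived features concrete, the sketch below recomputes `recency_tier` and `engagement_score` for a single customer in plain Python, using the same thresholds and weights as the Spark expressions above. The customer values are made up for illustration.

```python
def recency_tier(days_since_last_event):
    """3 = active (<= 7 days), 2 = at risk (<= 30 days), 1 = churned."""
    if days_since_last_event <= 7:
        return 3
    if days_since_last_event <= 30:
        return 2
    return 1


def engagement_score(events_last_30d, events_last_90d, customer_tenure_days):
    """Weighted recent activity, normalized by tenure (+1 avoids division by zero)."""
    weighted = events_last_30d * 0.6 + events_last_90d * 0.4
    return weighted / (customer_tenure_days + 1)


# Hypothetical customer: last seen 12 days ago, 10 events in the last 30 days,
# 25 in the last 90 days (this window includes the 30-day one), tenure 199 days.
tier = recency_tier(12)
score = engagement_score(10, 25, 199)  # (6 + 10) / 200, roughly 0.08
print(tier, round(score, 3))
```

Note that the 90-day count includes the 30-day events, so recent activity is effectively weighted twice; that may or may not be what you want from an engagement measure.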

With features in Gold, we hand off to MLflow to train, track, and register the model. Notice we log the Delta table version so we can always reproduce exactly which feature snapshot trained which model.

import mlflow
import mlflow.sklearn
from delta.tables import DeltaTable
from mlflow.models.signature import infer_signature
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_auc_score, f1_score

mlflow.set_experiment('/churn-prediction/feature-pipeline')

# Capture the current Delta version of the Gold table for reproducibility.
gold_table  = DeltaTable.forName(spark, 'churn.gold.features')
delta_version = gold_table.history(1).select('version').collect()[0][0]

# Read exactly that version so the logged version matches the training data.
# toPandas() collects to the driver, which suits per-customer aggregates, not raw events.
features_pdf = spark.read \
    .option('versionAsOf', delta_version) \
    .table('churn.gold.features') \
    .toPandas()

FEATURE_COLS = [
    'total_events', 'total_sessions', 'distinct_products',
    'events_last_30d', 'events_last_90d', 'days_since_last_event',
    'customer_tenure_days', 'avg_events_per_day',
    'recency_tier', 'engagement_score',
]
# Assumption: a binary 'churned' label column has been joined into the Gold
# table. The Gold job shown earlier does not create it.
TARGET = 'churned'

X = features_pdf[FEATURE_COLS]
y = features_pdf[TARGET]
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

with mlflow.start_run(run_name=f'gbm-features-v{delta_version}') as run:

    params = {'n_estimators': 200, 'max_depth': 5, 'learning_rate': 0.05}
    model  = GradientBoostingClassifier(**params, random_state=42)
    model.fit(X_train, y_train)

    y_pred = model.predict(X_test)
    y_prob = model.predict_proba(X_test)[:, 1]

    # Hyperparameters, metrics, and the lineage needed to rebuild this run.
    mlflow.log_params(params)
    mlflow.log_metric('roc_auc', roc_auc_score(y_test, y_prob))
    mlflow.log_metric('f1_score', f1_score(y_test, y_pred))
    mlflow.log_param('delta_feature_version', delta_version)
    mlflow.log_param('feature_columns', FEATURE_COLS)
    mlflow.log_param('training_rows', len(X_train))

    # Infer the input/output schema from training inputs and matching predictions.
    signature = infer_signature(X_train, model.predict(X_train))
    mlflow.sklearn.log_model(
        model,
        artifact_path='churn-gbm',
        signature=signature,
        registered_model_name='churn-prediction-gbm',
    )

    print(f"Run ID: {run.info.run_id}")
    print(f"ROC-AUC: {roc_auc_score(y_test, y_prob):.4f}")
    print(f"Feature Delta version logged: {delta_version}")
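
MLflow stores parameter values as strings, so a Python list passed to `log_param` comes back as its string representation. A minimal sketch, assuming you want the feature list to stay machine-readable, is to join the column names before logging and split them when reading the run back. The helper names below are hypothetical.

```python
def encode_feature_cols(cols):
    """Join column names into one comma-separated string for logging."""
    for name in cols:
        if "," in name:
            raise ValueError(f"column name contains a comma: {name!r}")
    return ",".join(cols)


def decode_feature_cols(value):
    """Recover the list of column names from a logged parameter value."""
    return [name for name in value.split(",") if name]


def build_run_params(delta_version, feature_cols, training_rows):
    """Lineage parameters for one training run, stringified explicitly."""
    return {
        "delta_feature_version": str(delta_version),
        "feature_columns": encode_feature_cols(feature_cols),
        "training_rows": str(training_rows),
    }


lineage = build_run_params(12, ["total_events", "recency_tier"], 8000)
restored = decode_feature_cols(lineage["feature_columns"])
print(lineage)
print(restored)
```

Inside a run, the dictionary could be passed to `mlflow.log_params(lineage)` in place of the three separate `log_param` calls.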

One of the best things about Delta Lake is time travel. If a model behaves unexpectedly in production, you can reload the exact feature snapshot it was trained on.

import mlflow

run = mlflow.get_run('your-run-id-here')
feature_version = int(run.data.params['delta_feature_version'])

historical_features = spark.read \
    .format('delta') \
    .option('versionAsOf', feature_version) \
    .table('churn.gold.features')

print(f"Loaded feature snapshot from Delta version {feature_version}")
print(f"Row count: {historical_features.count()}")
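
The same snapshot can also be read with Delta's SQL time travel syntax, `VERSION AS OF`. The sketch below only builds the statement so it can run anywhere; the helper name `time_travel_query` is an assumption, and on a cluster the result would be passed to `spark.sql(...)`.

```python
def time_travel_query(table, version):
    """Build a Delta time travel SELECT for a specific table version."""
    # bool is a subclass of int in Python, so reject it explicitly.
    if isinstance(version, bool) or not isinstance(version, int) or version < 0:
        raise ValueError("version must be a non-negative integer")
    return f"SELECT * FROM {table} VERSION AS OF {version}"


# MLflow returns params as strings, so convert before building the query.
logged_version = "12"  # stands in for run.data.params['delta_feature_version']
query = time_travel_query("churn.gold.features", int(logged_version))
print(query)
# On Databricks: historical_features = spark.sql(query)
```

Keep in mind that an old version is only readable while its data files still exist; running VACUUM with a short retention period can remove the files a past model depends on.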

| Tool | Role in pipeline | Why not the alternative |
|---|---|---|
| Apache Spark | Distributed feature computation | Pandas (single node, OOM at scale), Dask (less native Databricks integration) |
| Delta Lake | Feature storage with versioning | Parquet (no ACID, no time travel), Hive tables (no merge support) |
| MLflow Tracking | Experiment and param logging | Manual logging (not reproducible), W&B (extra cost, less native on Databricks) |
| MLflow Registry | Model versioning and promotion | Custom model store (more ops overhead) |
| Medallion Architecture | Pipeline layer separation | Flat pipelines (hard to debug, no replay capability) |
| Delta MERGE | Idempotent Silver upserts | Overwrite (destroys history), append (creates duplicates) |

Shuffle partitions matter. Spark defaults to 200 shuffle partitions, which is fine for small data but becomes a bottleneck at scale. Set spark.conf.set("spark.sql.shuffle.partitions", "auto") on Databricks Runtime 10+ or tune it manually to 2-3x your core count.
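
For the manual route, the rule of thumb is simple arithmetic. A small sketch, assuming a hypothetical cluster of 8 workers with 16 cores each:

```python
def suggest_shuffle_partitions(total_cores, factor=3):
    """Rule-of-thumb partition count: a small multiple of the cluster's cores."""
    if total_cores < 1 or factor < 1:
        raise ValueError("total_cores and factor must be positive")
    return total_cores * factor


workers, cores_per_worker = 8, 16  # hypothetical cluster size
partitions = suggest_shuffle_partitions(workers * cores_per_worker)
print(partitions)
# On the cluster:
# spark.conf.set("spark.sql.shuffle.partitions", str(partitions))
```

Treat the result as a starting point and adjust after checking shuffle partition sizes in the Spark UI.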

Z-ordering on Gold features. If you're querying Gold by customer_id frequently, run OPTIMIZE churn.gold.features ZORDER BY (customer_id) after the write. This co-locates rows for the same customer so Delta can skip files that cannot contain the requested IDs, which can cut query times substantially on large tables.
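
One way to wire this into the Gold job is a small helper that issues the statement through the session. The sketch below uses a stand-in object in place of a real SparkSession so it runs anywhere; `optimize_with_zorder` and `RecordingSession` are hypothetical names.

```python
def optimize_with_zorder(spark, table, zorder_cols):
    """Issue OPTIMIZE ... ZORDER BY through any object exposing .sql()."""
    if not zorder_cols:
        raise ValueError("at least one Z-order column is required")
    statement = f"OPTIMIZE {table} ZORDER BY ({', '.join(zorder_cols)})"
    spark.sql(statement)
    return statement


class RecordingSession:
    """Stand-in for SparkSession that records statements instead of running them."""

    def __init__(self):
        self.statements = []

    def sql(self, statement):
        self.statements.append(statement)


session = RecordingSession()
optimize_with_zorder(session, "churn.gold.features", ["customer_id"])
print(session.statements[0])
```

In a notebook you would pass the real `spark` session instead of `RecordingSession`.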

Log the Delta version in every MLflow run. This is non-negotiable for reproducibility. Without it, you can't prove which feature snapshot trained which model, which becomes a compliance problem in regulated industries.

Cluster autoscaling for feature jobs. Feature engineering jobs tend to have spiky resource needs (big during aggregation, small during writes). Enable autoscaling on your Databricks cluster and set a min/max node count rather than a fixed size.
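
In a Databricks job definition, autoscaling is expressed as an `autoscale` block with `min_workers` and `max_workers` in place of a fixed `num_workers`. The sketch below shows the shape of such a cluster spec as a Python dict with a basic sanity check; the runtime version, node type, and worker counts are placeholder values to replace with your own.

```python
# Placeholder values throughout: pick the runtime, VM size, and bounds
# that match your workspace and workload.
feature_job_cluster = {
    "spark_version": "14.3.x-scala2.12",
    "node_type_id": "Standard_DS3_v2",
    "autoscale": {"min_workers": 2, "max_workers": 8},
}


def validate_autoscale(cluster_spec):
    """Check that the spec uses a sensible autoscale range, not a fixed size."""
    if "num_workers" in cluster_spec:
        raise ValueError("use 'autoscale' instead of a fixed 'num_workers'")
    bounds = cluster_spec["autoscale"]
    if not 1 <= bounds["min_workers"] <= bounds["max_workers"]:
        raise ValueError("expected 1 <= min_workers <= max_workers")
    return bounds["min_workers"], bounds["max_workers"]


worker_range = validate_autoscale(feature_job_cluster)
print(worker_range)
```

A low minimum keeps the write phase cheap, while the maximum caps what the aggregation phase can consume.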

The combination of Spark, Delta Lake, and MLflow on Databricks gives you a feature engineering pipeline that is reproducible (Delta time travel + MLflow param logging), scalable (Spark handles billions of rows), and auditable (every run is tracked, every feature version is stored).

The Medallion Architecture keeps the pipeline modular — you can rerun just the Gold layer if you change a feature definition without touching Bronze or Silver, and MLflow ties model performance back to the exact feature version that produced it.
