# FDE Architecture Framework: Build Production ML Systems That Don't Break

The FDE (Feature-Decision-Execution) architecture framework separates ML prediction, business logic, and system action into distinct layers to prevent the production failures common in monolithic ML services. This tutorial shows how to implement each layer with clean interfaces, test the layers independently, and deploy them with different scaling strategies, using a fraud detection system as the running example.

Feature-Decision-Execution (FDE) is the layered architecture pattern that separates ML prediction from business logic from system action. It is the pattern that makes production ML systems maintainable, auditable, and safe to iterate on.

- Level: Intermediate
- Time to complete: 60–90 minutes
- Prerequisites: Python, basic familiarity with REST APIs and databases; no prior MLOps experience required

## Learning Objectives

By the end of this tutorial you will be able to:

- Explain the three layers of the FDE architecture and why they are separated
- Implement each layer with a clean interface contract in Python
- Wire the layers together into a working fraud detection system
- Test each layer independently and in combination
- Deploy layers independently with different scaling and release strategies

## Table of Contents

1. Why Monolithic ML Services Fail
2. The FDE Pattern: Three Layers, Three Concerns
3. The Feature Layer: Serving ML Inputs
4. The Decision Layer: Model + Business Logic
5. The Execution Layer: Taking Action Safely
6. Tutorial: FDE for Banking Fraud Detection
7. Layer Contracts: The API Between Layers
8. Testing Strategy for FDE Systems
9. Deployment Patterns: Independent Layer Operations
10. When to Apply FDE and When Not To
11. Exercises

## Part 1. Why Monolithic ML Services Fail

The typical lifecycle of a monolithic ML service looks like this:

**Month 1**: Data scientist trains a fraud model. Engineer wraps it in a Flask app. The app reads features from the database, runs the model, writes a decision to the decisions table, calls the fraud case management API. It works.

**Month 6**: The model needs to be retrained. The engineer updates the model artifact, redeploys the service, and the business rules that were hardcoded alongside the model in the same function break because the output schema changed. Hotfix deployed. Three other integration points also break.

**Month 12**: The fraud case management API is being replaced. The new API requires a different payload structure. The engineer modifies the service. But the service also calls a legacy audit system with the old format, and that system can't be changed without a 6-week change request. The team works around it by transforming the payload twice in the same function.

**Month 18**: Nobody understands the service anymore. Changing the model requires understanding the business rules. Changing the business rules requires understanding the API integrations. Changing the API integrations requires understanding the model output format. There is no test for any of it.

This is not a hypothetical. It is the standard lifecycle of 80% of production ML systems built without an explicit architecture pattern.

The root cause is the same in every case: **prediction, business logic, and system action are tangled in a single service**. When one changes, the others break. When something goes wrong in production, it's unclear which layer is the source.

FDE separates these concerns at the architecture level.

## Part 2. The FDE Pattern: Three Layers, Three Concerns

```
┌─────────────────────────────────────────────────────────┐
│                    INCOMING REQUEST                     │
│             (transaction, API call, event)              │
└────────────────────────┬────────────────────────────────┘
                         │
              ┌──────────▼──────────┐
              │    FEATURE LAYER    │  ← "What do we know?"
              │                     │
              │ • Feature serving   │
              │ • Signal retrieval  │
              │ • Feature groups    │
              │ • Freshness checks  │
              └──────────┬──────────┘
                         │ FeatureSet
              ┌──────────▼──────────┐
              │   DECISION LAYER    │  ← "What should we do?"
              │                     │
              │ • ML model(s)       │
              │ • Business rules    │
              │ • Output contract   │
              │ • Explainability    │
              └──────────┬──────────┘
                         │ DecisionResult
              ┌──────────▼──────────┐
              │   EXECUTION LAYER   │  ← "Do it safely"
              │                     │
              │ • Action dispatch   │
              │ • Idempotency       │
              │ • Rollback plan     │
              │ • Audit logging     │
              └──────────┬──────────┘
                         │ ExecutionResult
                    ┌────▼────┐
                    │ CALLER  │
                    └─────────┘
```

Each layer has a single responsibility and communicates with adjacent layers through a typed contract. The contract is the key: it means each layer can be developed, tested, deployed, and scaled independently.

**Feature Layer**: given a request context (customer ID, transaction data, session info), produce the feature vector the decision layer needs. Nothing else.

**Decision Layer**: given a feature set, produce a decision (approve/decline/review, score, recommended action, reason codes). Nothing else.

**Execution Layer**: given a decision, take the appropriate action in downstream systems safely and idempotently. Nothing else.
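Before the full implementations, the separation described above can be sketched in a few lines. This is a minimal illustration rather than the tutorial's actual contracts: the `Mini*` dataclasses, the three `Protocol` names, `run_pipeline`, and the toy implementations are all hypothetical names invented for this sketch. The point it tries to show is that each layer only sees the typed object produced by the layer before it, so any one implementation can be swapped without touching the others.

```python
from dataclasses import dataclass
from typing import Any, Dict, List, Protocol


# Hypothetical, stripped-down contracts (illustration only).
@dataclass(frozen=True)
class MiniFeatureSet:
    request_id: str
    features: Dict[str, Any]


@dataclass(frozen=True)
class MiniDecision:
    request_id: str
    action: str               # "approve" or "decline" in this sketch
    reason_codes: List[str]


@dataclass(frozen=True)
class MiniExecution:
    request_id: str
    action_taken: str
    success: bool


# One structural interface per layer: a single method, a single concern.
class FeatureProvider(Protocol):
    def get_features(self, request_id: str, transaction: Dict[str, Any]) -> MiniFeatureSet: ...


class Decider(Protocol):
    def decide(self, feature_set: MiniFeatureSet) -> MiniDecision: ...


class Executor(Protocol):
    def execute(self, decision: MiniDecision) -> MiniExecution: ...


def run_pipeline(features: FeatureProvider, decider: Decider, executor: Executor,
                 request_id: str, transaction: Dict[str, Any]) -> MiniExecution:
    """Each layer receives only the contract produced by the previous layer."""
    feature_set = features.get_features(request_id, transaction)
    decision = decider.decide(feature_set)
    return executor.execute(decision)


# Toy implementations, assumed purely for demonstration.
class AmountFeatures:
    def get_features(self, request_id, transaction):
        amount = float(transaction.get("amount", 0))
        return MiniFeatureSet(request_id, {"amount": amount})


class ThresholdDecider:
    def __init__(self, limit: float):
        self.limit = limit

    def decide(self, feature_set):
        if feature_set.features["amount"] > self.limit:
            return MiniDecision(feature_set.request_id, "decline", ["AMOUNT_OVER_LIMIT"])
        return MiniDecision(feature_set.request_id, "approve", ["WITHIN_LIMIT"])


class RecordingExecutor:
    def __init__(self):
        self.log = []  # stands in for a downstream system

    def execute(self, decision):
        self.log.append(decision)
        return MiniExecution(decision.request_id, decision.action, True)


result = run_pipeline(AmountFeatures(), ThresholdDecider(limit=1000.0),
                      RecordingExecutor(), "req-1", {"amount": 250.0})
print(result)
```

Replacing `ThresholdDecider` with a model-backed class would not require any change to `AmountFeatures` or `RecordingExecutor`, which is the property the rest of the tutorial builds on.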
## Part 3. The Feature Layer: Serving ML Inputs

### 3.1 Responsibilities

The Feature Layer is responsible for:

- Serving pre-computed features from a fast store (Redis, DynamoDB, Feast)
- Computing real-time features from the incoming request
- Joining online features with request-time signals
- Validating feature completeness and freshness
- Returning a typed `FeatureSet` object

### 3.2 The FeatureSet Contract

```python
from dataclasses import dataclass, field
from typing import Optional, Dict, Any
from datetime import datetime
from enum import Enum


class FeatureFreshness(Enum):
    FRESH = "fresh"          # within expected refresh window
    STALE = "stale"          # older than expected, but available
    MISSING = "missing"      # not available at all
    SYNTHETIC = "synthetic"  # imputed from fallback logic


@dataclass
class FeatureGroup:
    """A logical group of related features with metadata."""
    name: str
    features: Dict[str, Any]
    freshness: FeatureFreshness
    computed_at: datetime
    source: str  # "online_store", "real_time", "fallback"


@dataclass
class FeatureSet:
    """
    The contract from Feature Layer → Decision Layer.
    Contains all features the decision model needs, with provenance.
    """
    request_id: str
    customer_id: str
    timestamp: datetime
    groups: Dict[str, FeatureGroup]
    is_complete: bool  # False if any critical group is MISSING
    warnings: list = field(default_factory=list)

    def get(self, group: str, feature: str, default=None):
        """Safe feature access with default fallback."""
        grp = self.groups.get(group)
        if grp is None or grp.freshness == FeatureFreshness.MISSING:
            return default
        return grp.features.get(feature, default)

    def get_vector(self, group: str) -> Dict[str, Any]:
        """Get all features in a group as a flat dict."""
        grp = self.groups.get(group)
        if grp is None:
            return {}
        return grp.features
```

### 3.3 Implementing the Feature Layer

```python
import logging
from datetime import datetime, timedelta

import numpy as np
import redis

logger = logging.getLogger(__name__)


class FeatureLayer:
    """
    Feature serving layer for the FDE architecture.
    Combines online-stored pre-computed features with real-time signals.
    """

    def __init__(self, redis_url: str = "redis://localhost:6379",
                 freshness_threshold_minutes: int = 60):
        self.redis = redis.from_url(redis_url, decode_responses=True)
        self.freshness_threshold = timedelta(minutes=freshness_threshold_minutes)

    def get_features(self, request_id: str, customer_id: str,
                     transaction: dict) -> FeatureSet:
        """Main entry point: build a complete FeatureSet for a decisioning request."""
        # Naive UTC timestamps throughout; stored "_computed_at" values must match.
        timestamp = datetime.utcnow()
        groups = {}
        warnings = []

        # 1. Online-stored customer risk features (Redis)
        groups["customer_risk"] = self._get_customer_risk(customer_id, timestamp)

        # 2. Spend behaviour features (Redis, refreshed hourly)
        groups["spend_behaviour"] = self._get_spend_behaviour(customer_id, timestamp)

        # 3. Real-time transaction features (computed from the request itself)
        groups["transaction_context"] = self._compute_transaction_features(transaction, timestamp)

        # 4. Session risk features (real-time from request)
        groups["session_context"] = self._compute_session_features(transaction, timestamp)

        # Check completeness: "customer_risk" is always required
        is_complete = groups["customer_risk"].freshness != FeatureFreshness.MISSING
        if not is_complete:
            warnings.append("customer_risk features missing; decisioning may degrade")

        # Report staleness against the configured threshold, not a hardcoded 60
        stale_after = int(self.freshness_threshold.total_seconds() // 60)
        for name, group in groups.items():
            if group.freshness == FeatureFreshness.STALE:
                warnings.append(f"{name} features are stale (>{stale_after}min old)")

        return FeatureSet(
            request_id=request_id,
            customer_id=customer_id,
            timestamp=timestamp,
            groups=groups,
            is_complete=is_complete,
            warnings=warnings,
        )

    def _get_customer_risk(self, customer_id: str, now: datetime) -> FeatureGroup:
        """Retrieve pre-computed customer risk features from Redis."""
        key = f"features:customer_risk:{customer_id}"
        data = self.redis.hgetall(key)

        if not data:
            return FeatureGroup(
                name="customer_risk",
                features={},
                freshness=FeatureFreshness.MISSING,
                computed_at=now,
                source="online_store",
            )

        computed_at = datetime.fromisoformat(data.get("_computed_at", now.isoformat()))
        age = now - computed_at
        freshness = (FeatureFreshness.FRESH if age < self.freshness_threshold
                     else FeatureFreshness.STALE)

        features = {
            "credit_score": float(data.get("credit_score", 0)),
            "fraud_score_30d": float(data.get("fraud_score_30d", 0)),
            "account_age_months": int(data.get("account_age_months", 0)),
            "dispute_count_90d": int(data.get("dispute_count_90d", 0)),
            "velocity_1h": float(data.get("velocity_1h", 0)),    # $ in last hour
            "velocity_24h": float(data.get("velocity_24h", 0)),  # $ in last 24h
            "international_ratio": float(data.get("international_ratio", 0)),
        }

        return FeatureGroup(
            name="customer_risk",
            features=features,
            freshness=freshness,
            computed_at=computed_at,
            source="online_store",
        )

    def _get_spend_behaviour(self, customer_id: str, now: datetime) -> FeatureGroup:
        """Retrieve spend pattern features."""
        key = f"features:spend:{customer_id}"
        data = self.redis.hgetall(key)

        if not data:
            # Return synthetic defaults rather than MISSING for non-critical features
            return FeatureGroup(
                name="spend_behaviour",
                features={"avg_txn_amount_30d": 0.0, "top_mcc": "unknown"},
                freshness=FeatureFreshness.SYNTHETIC,
                computed_at=now,
                source="fallback",
            )

        return FeatureGroup(
            name="spend_behaviour",
            features={
                "avg_txn_amount_30d": float(data.get("avg_txn_amount_30d", 0)),
                "std_txn_amount_30d": float(data.get("std_txn_amount_30d", 0)),
                "top_mcc": data.get("top_mcc", "unknown"),
                "unique_merchants_7d": int(data.get("unique_merchants_7d", 0)),
                "weekend_spend_ratio": float(data.get("weekend_spend_ratio", 0.5)),
            },
            freshness=FeatureFreshness.FRESH,
            computed_at=now,
            source="online_store",
        )

    def _compute_transaction_features(self, txn: dict, now: datetime) -> FeatureGroup:
        """Compute features from the current transaction (always real-time)."""
        amount = float(txn.get("amount", 0))
        is_intl = txn.get("country", "US") != "US"
        is_online = txn.get("channel", "") in ("web", "mobile", "api")
        is_night = now.hour < 6 or now.hour >= 22

        return FeatureGroup(
            name="transaction_context",
            features={
                "amount": amount,
                "amount_log": float(np.log1p(amount)),
                "is_international": int(is_intl),
                "is_online": int(is_online),
                "is_night": int(is_night),
                "mcc": txn.get("mcc", "0000"),
                "merchant_country": txn.get("country", "US"),
            },
            freshness=FeatureFreshness.FRESH,
            computed_at=now,
            source="real_time",
        )

    def _compute_session_features(self, txn: dict, now: datetime) -> FeatureGroup:
        """Compute session-level risk signals."""
        return FeatureGroup(
            name="session_context",
            features={
                "device_fingerprint_match": int(txn.get("device_known", True)),
                "ip_country_match": int(txn.get("ip_matches_billing", True)),
                "auth_method": txn.get("auth_method", "pin"),
            },
            freshness=FeatureFreshness.FRESH,
            computed_at=now,
            source="real_time",
        )
```

## Part 4. The Decision Layer: Model + Business Logic

### 4.1 The DecisionResult Contract

```python
from dataclasses import dataclass, field
from typing import List, Optional
from enum import Enum


class DecisionAction(Enum):
    APPROVE = "approve"
    DECLINE = "decline"
    REVIEW = "review"        # send to human review queue
    CHALLENGE = "challenge"  # step-up authentication


@dataclass
class DecisionResult:
    """
    The contract from Decision Layer → Execution Layer.
    Encodes what to do, why, and how confident we are.
    """
    request_id: str
    customer_id: str
    action: DecisionAction
    fraud_score: float       # 0.0 (safe) to 1.0 (fraud)
    confidence: float        # model confidence in this decision
    reason_codes: List[str]  # human-readable reasons for regulatory use
    policy_applied: str      # which rule or model made this decision
    model_version: str
    is_model_driven: bool    # False if overridden by a hard rule
    metadata: dict = field(default_factory=dict)
```

### 4.2 Implementing the Decision Layer

```python
import numpy as np
from sklearn.ensemble import GradientBoostingClassifier  # type of the saved model
import joblib


class DecisionLayer:
    """
    Decision layer: combines ML model scores with business rules
    to produce a typed DecisionResult.

    Critically: business rules are explicit and auditable, not embedded in the model.
    """

    # Fraud score thresholds (easily tunable without model retrain)
    DECLINE_THRESHOLD = 0.85
    REVIEW_THRESHOLD = 0.60
    CHALLENGE_THRESHOLD = 0.40

    def __init__(self, model_path: str, model_version: str):
        self.model = joblib.load(model_path)
        self.model_version = model_version

    def decide(self, feature_set: FeatureSet) -> DecisionResult:
        """Core decision logic: run model, apply business rules, return result."""
        # Step 1: Check if any hard rule pre-empts the model
        hard_rule_result = self._check_hard_rules(feature_set)
        if hard_rule_result is not None:
            return hard_rule_result

        # Step 2: If feature set is incomplete, apply conservative policy
        if not feature_set.is_complete:
            return self._incomplete_features_policy(feature_set)

        # Step 3: Run the ML model
        fraud_score, confidence = self._score(feature_set)

        # Step 4: Apply threshold policy
        action, reason_codes = self._apply_thresholds(fraud_score, confidence, feature_set)

        return DecisionResult(
            request_id=feature_set.request_id,
            customer_id=feature_set.customer_id,
            action=action,
            fraud_score=fraud_score,
            confidence=confidence,
            reason_codes=reason_codes,
            policy_applied="gbm_v3_threshold_policy",
            model_version=self.model_version,
            is_model_driven=True,
        )

    def _check_hard_rules(self, fs: FeatureSet) -> Optional[DecisionResult]:
        """
        Hard rules that override the model.
        These encode: regulatory requirements, credit policy,
        known fraud patterns, and operational constraints.
        """
        customer_id = fs.customer_id

        # Rule 1: Immediate decline for known fraud lists (OFAC, internal blocklist)
        if self._is_on_blocklist(customer_id):
            return DecisionResult(
                request_id=fs.request_id,
                customer_id=customer_id,
                action=DecisionAction.DECLINE,
                fraud_score=1.0,
                confidence=1.0,
                reason_codes=["BLOCKED_ENTITY"],
                policy_applied="hard_rule:blocklist",
                model_version=self.model_version,
                is_model_driven=False,
            )

        # Rule 2: Extreme velocity, always decline
        velocity_1h = fs.get("customer_risk", "velocity_1h", 0)
        if velocity_1h > 10_000:
            return DecisionResult(
                request_id=fs.request_id,
                customer_id=customer_id,
                action=DecisionAction.DECLINE,
                fraud_score=0.95,
                confidence=1.0,
                reason_codes=["VELOCITY_EXCEEDED"],
                policy_applied="hard_rule:velocity_limit",
                model_version=self.model_version,
                is_model_driven=False,
            )

        # Rule 3: International transaction on account flagged as domestic-only
        if (fs.get("transaction_context", "is_international")
                and fs.get("customer_risk", "international_ratio", 0) < 0.01):
            # Step up to challenge rather than decline
            return DecisionResult(
                request_id=fs.request_id,
                customer_id=customer_id,
                action=DecisionAction.CHALLENGE,
                fraud_score=0.50,
                confidence=0.70,
                reason_codes=["UNUSUAL_GEO"],
                policy_applied="hard_rule:geo_challenge",
                model_version=self.model_version,
                is_model_driven=False,
            )

        return None  # no hard rule fired; proceed to model

    def _score(self, fs: FeatureSet) -> tuple:
        """Build feature vector and run GBM model."""
        risk = fs.get_vector("customer_risk")
        spend = fs.get_vector("spend_behaviour")
        txn = fs.get_vector("transaction_context")
        session = fs.get_vector("session_context")

        # Deviation of current amount from historical average
        avg = spend.get("avg_txn_amount_30d", txn.get("amount", 1))
        std = spend.get("std_txn_amount_30d", avg * 0.5) or 1.0  # avoid divide-by-zero
        amount_zscore = (txn.get("amount", 0) - avg) / std

        X = np.array([[
            risk.get("fraud_score_30d", 0),
            risk.get("velocity_1h", 0) / 1000,
            risk.get("velocity_24h", 0) / 10000,
            risk.get("dispute_count_90d", 0),
            risk.get("international_ratio", 0),
            txn.get("amount_log", 0),
            amount_zscore,
            txn.get("is_international", 0),
            txn.get("is_online", 0),
            txn.get("is_night", 0),
            session.get("device_fingerprint_match", 1),
            session.get("ip_country_match", 1),
        ]])

        proba = self.model.predict_proba(X)[0]
        fraud_prob = float(proba[1])
        # Probability of the more likely class: 0.5 = undecided, 1.0 = certain
        confidence = float(max(proba))
        return fraud_prob, confidence

    def _apply_thresholds(self, fraud_score: float, confidence: float,
                          fs: FeatureSet) -> tuple:
        """Map fraud score to action with reason codes."""
        if fraud_score >= self.DECLINE_THRESHOLD:
            action = DecisionAction.DECLINE
            reasons = self._get_reason_codes(fraud_score, fs)
        elif fraud_score >= self.REVIEW_THRESHOLD:
            action = DecisionAction.REVIEW
            reasons = self._get_reason_codes(fraud_score, fs)
        elif fraud_score >= self.CHALLENGE_THRESHOLD:
            action = DecisionAction.CHALLENGE
            reasons = ["ELEVATED_RISK"]
        else:
            action = DecisionAction.APPROVE
            reasons = ["WITHIN_NORMAL_PARAMETERS"]
        return action, reasons

    def _get_reason_codes(self, score: float, fs: FeatureSet) -> List[str]:
        """Generate regulatory-grade reason codes (FCRA compliant)."""
        codes = []
        if fs.get("customer_risk", "velocity_1h", 0) > 2000:
            codes.append("HIGH_VELOCITY")
        if fs.get("transaction_context", "is_international"):
            codes.append("INTERNATIONAL_TRANSACTION")
        if fs.get("transaction_context", "is_night"):
            codes.append("UNUSUAL_TIME")
        if not fs.get("session_context", "device_fingerprint_match", True):
            codes.append("UNRECOGNISED_DEVICE")
        if not codes:
            codes.append("MODEL_RISK_SCORE")
        return codes[:4]  # max 4 reason codes per FCRA

    def _incomplete_features_policy(self, fs: FeatureSet) -> DecisionResult:
        """Conservative policy when features are missing."""
        return DecisionResult(
            request_id=fs.request_id,
            customer_id=fs.customer_id,
            action=DecisionAction.REVIEW,
            fraud_score=0.50,
            confidence=0.10,
            reason_codes=["INSUFFICIENT_FEATURES"],
            policy_applied="fallback:incomplete_features",
            model_version=self.model_version,
            is_model_driven=False,
            metadata={"warnings": fs.warnings},
        )

    def _is_on_blocklist(self, customer_id: str) -> bool:
        # In production: call blocklist service or Redis set
        return False
```

## Part 5. The Execution Layer: Taking Action Safely

### 5.1 Responsibilities

The Execution Layer takes the `DecisionResult` and acts on it.
Its responsibilities are:

- **Routing**: send the decision to the right downstream system (approve → payment rails, decline → decline handler, review → case management queue)
- **Idempotency**: ensure that retried requests don't double-execute actions
- **Audit logging**: write an immutable record of every action taken
- **Rollback**: for reversible actions, maintain rollback capability

### 5.2 Implementing the Execution Layer

```python
import json
import time
import uuid
from dataclasses import dataclass
from typing import Optional

import redis


@dataclass
class ExecutionResult:
    """The response from the Execution Layer back to the caller."""
    request_id: str
    action_taken: str
    success: bool
    reference_id: Optional[str]  # downstream system reference
    is_idempotent: bool          # True if this was a duplicate request
    audit_trail_id: str
    error: Optional[str] = None


class ExecutionLayer:
    """
    Execution layer: safely dispatches decisions to downstream systems.
    Handles idempotency and audit logging. Rollback for reversible
    actions is not implemented in this example.
    """

    def __init__(self, redis_url: str, audit_logger, payment_client, review_queue_client):
        self.redis = redis.from_url(redis_url)
        self.audit = audit_logger
        self.payments = payment_client
        self.review_queue = review_queue_client
        self.idempotency_ttl = 86400  # 24 hours

    def execute(self, decision: DecisionResult, transaction: dict) -> ExecutionResult:
        """Main entry point: execute the decision safely."""
        # Step 1: Idempotency check.
        # Note: this GET followed by a later SETEX is not atomic, so two
        # concurrent duplicates can both pass the check.
        idempotency_key = f"exec:idempotent:{decision.request_id}"
        existing = self.redis.get(idempotency_key)
        if existing:
            # Request already processed: return cached result
            cached = json.loads(existing)
            return ExecutionResult(
                request_id=decision.request_id,
                action_taken=cached["action_taken"],
                success=True,
                reference_id=cached.get("reference_id"),
                is_idempotent=True,
                audit_trail_id=cached["audit_trail_id"],
            )

        # Step 2: Execute the action
        audit_id = str(uuid.uuid4())
        result = self._dispatch(decision, transaction, audit_id)

        # Step 3: Write to audit log for every returned result, including
        # failures. Only _handle_approve catches downstream exceptions; an
        # exception raised by another handler skips this step.
        self.audit.write({
            "audit_trail_id": audit_id,
            "request_id": decision.request_id,
            "customer_id": decision.customer_id,
            "decision": decision.action.value,
            "fraud_score": decision.fraud_score,
            "reason_codes": decision.reason_codes,
            "policy_applied": decision.policy_applied,
            "model_version": decision.model_version,
            "is_model_driven": decision.is_model_driven,
            "action_taken": result.action_taken,
            "success": result.success,
            "reference_id": result.reference_id,
            "timestamp": time.time(),
            "transaction": transaction,
        })

        # Step 4: Cache for idempotency
        if result.success:
            self.redis.setex(
                idempotency_key,
                self.idempotency_ttl,
                json.dumps({
                    "action_taken": result.action_taken,
                    "reference_id": result.reference_id,
                    "audit_trail_id": audit_id,
                }),
            )

        return result

    def _dispatch(self, decision: DecisionResult, txn: dict,
                  audit_id: str) -> ExecutionResult:
        """Route decision to the appropriate handler."""
        action = decision.action
        if action == DecisionAction.APPROVE:
            return self._handle_approve(decision, txn, audit_id)
        elif action == DecisionAction.DECLINE:
            return self._handle_decline(decision, txn, audit_id)
        elif action == DecisionAction.REVIEW:
            return self._handle_review(decision, txn, audit_id)
        elif action == DecisionAction.CHALLENGE:
            return self._handle_challenge(decision, txn, audit_id)
        else:
            raise ValueError(f"Unknown action: {action}")

    def _handle_approve(self, decision, txn, audit_id) -> ExecutionResult:
        """Approve: authorise the transaction on the payment rails."""
        try:
            ref = self.payments.authorise(
                transaction_id=txn["transaction_id"],
                amount=txn["amount"],
                merchant=txn["merchant_id"],
                auth_code=str(uuid.uuid4())[:8].upper(),
            )
            return ExecutionResult(
                request_id=decision.request_id,
                action_taken="approved",
                success=True,
                reference_id=ref,
                is_idempotent=False,
                audit_trail_id=audit_id,
            )
        except Exception as e:
            return ExecutionResult(
                request_id=decision.request_id,
                action_taken="approve_failed",
                success=False,
                reference_id=None,
                is_idempotent=False,
                audit_trail_id=audit_id,
                error=str(e),
            )

    def _handle_decline(self, decision, txn, audit_id) -> ExecutionResult:
        """Decline: reject the transaction with reason codes."""
        self.payments.decline(
            transaction_id=txn["transaction_id"],
            decline_codes=decision.reason_codes,
        )
        return ExecutionResult(
            request_id=decision.request_id,
            action_taken="declined",
            success=True,
            reference_id=None,
            is_idempotent=False,
            audit_trail_id=audit_id,
        )

    def _handle_review(self, decision, txn, audit_id) -> ExecutionResult:
        """Review: route to human fraud analyst queue."""
        case_id = self.review_queue.enqueue({
            "transaction": txn,
            "fraud_score": decision.fraud_score,
            "reason_codes": decision.reason_codes,
            "audit_trail_id": audit_id,
            "priority": "high" if decision.fraud_score > 0.75 else "normal",
        })
        return ExecutionResult(
            request_id=decision.request_id,
            action_taken="queued_for_review",
            success=True,
            reference_id=case_id,
            is_idempotent=False,
            audit_trail_id=audit_id,
        )

    def _handle_challenge(self, decision, txn, audit_id) -> ExecutionResult:
        """Challenge: trigger step-up authentication flow."""
        challenge_id = self.payments.initiate_challenge(
            transaction_id=txn["transaction_id"],
            challenge_type="otp_sms",
        )
        return ExecutionResult(
            request_id=decision.request_id,
            action_taken="challenge_initiated",
            success=True,
            reference_id=challenge_id,
            is_idempotent=False,
            audit_trail_id=audit_id,
        )
```

## Part 6. Tutorial: Wire It Together

### 6.1 The FDE Orchestrator

```python
import logging
import uuid
from datetime import datetime

logger = logging.getLogger(__name__)


class FraudDecisionService:
    """
    FDE orchestrator for the fraud decision system.
    Coordinates Feature → Decision → Execution, handles errors at each layer.
    """

    def __init__(self, feature_layer: FeatureLayer, decision_layer: DecisionLayer,
                 execution_layer: ExecutionLayer):
        self.features = feature_layer
        self.decision = decision_layer
        self.execution = execution_layer

    def process(self, transaction: dict) -> dict:
        """
        Full FDE pipeline for a single transaction.
        Returns a structured response for the calling payment system.
        """
        request_id = transaction.get("request_id") or str(uuid.uuid4())
        customer_id = transaction["customer_id"]

        # ── Layer 1: Features ──────────────────────────────────────────
        try:
            feature_set = self.features.get_features(
                request_id=request_id,
                customer_id=customer_id,
                transaction=transaction,
            )
        except Exception as e:
            logger.error(f"Feature layer failed for {request_id}: {e}")
            # Fail open with synthetic empty features or fail closed: your policy
            feature_set = FeatureSet(
                request_id=request_id,
                customer_id=customer_id,
                timestamp=datetime.utcnow(),
                groups={},
                is_complete=False,
                warnings=[f"Feature layer error: {e}"],
            )

        # ── Layer 2: Decision ──────────────────────────────────────────
        try:
            decision = self.decision.decide(feature_set)
        except Exception as e:
            logger.error(f"Decision layer failed for {request_id}: {e}")
            # Safe fallback: review rather than approve or decline
            decision = DecisionResult(
                request_id=request_id,
                customer_id=customer_id,
                action=DecisionAction.REVIEW,
                fraud_score=0.5,
                confidence=0.0,
                reason_codes=["DECISION_SYSTEM_ERROR"],
                policy_applied="fallback:system_error",
                model_version="unknown",
                is_model_driven=False,
            )

        # ── Layer 3: Execution ─────────────────────────────────────────
        try:
            result = self.execution.execute(decision, transaction)
        except Exception as e:
            logger.critical(f"Execution layer failed for {request_id}: {e}")
            return {
                "request_id": request_id,
                "status": "error",
                "action": "system_error",
                "error": str(e),
            }

        return {
            "request_id": request_id,
            "action": result.action_taken,
            "reference_id": result.reference_id,
            "fraud_score": decision.fraud_score,
            "reason_codes": decision.reason_codes,
            "audit_trail_id": result.audit_trail_id,
            "model_version": decision.model_version,
        }
```

### 6.2 Usage Example

```python
# Initialise the service (in production: inject via DI container).
# AuditLogger, PaymentClient and ReviewQueueClient are application-specific
# clients that are not defined in this tutorial.
service = FraudDecisionService(
    feature_layer=FeatureLayer(redis_url="redis://localhost:6379"),
    decision_layer=DecisionLayer(
        model_path="models/fraud_gbm_v3.joblib",
        model_version="v3.2.1",
    ),
    execution_layer=ExecutionLayer(
        redis_url="redis://localhost:6379",
        audit_logger=AuditLogger(),
        payment_client=PaymentClient(),
        review_queue_client=ReviewQueueClient(),
    ),
)

# Process a transaction
result = service.process({
    "request_id": "txn-20260612-001",
    "customer_id": "customer_8472",
    "transaction_id": "auth-0001-20260612",
    "amount": 1850.00,
    "merchant_id": "merchant_12345",
    "mcc": "5411",  # grocery store
    "country": "US",
    "channel": "mobile",
    "device_known": True,
    "ip_matches_billing": True,
    "auth_method": "biometric",
})

print(result)
# {'request_id': 'txn-20260612-001', 'action': 'approved',
#  'reference_id': 'AUTH-7F3A', 'fraud_score': 0.12,
#  'reason_codes': ['WITHIN_NORMAL_PARAMETERS'],
#  'audit_trail_id': '...', 'model_version': 'v3.2.1'}
```

## Part 7. Layer Contracts: The API Between Layers

The contracts (`FeatureSet`, `DecisionResult`, `ExecutionResult`) are the most important part of the FDE architecture. Here's what makes a good contract:

**Typed and validated.** Use Python dataclasses, Pydantic, or a schema registry. Untyped dicts between layers are the first step toward the monolith failure mode.

**Versioned.** When the Decision Layer adds a new field to `DecisionResult`, the Execution Layer should not break.
Design contracts with forward-compatibility in mind: add fields, don’t remove them; use Optional types for new fields. Observable. Every contract exchange should be logged at the INFO level with at minimum: request id, timestamp, layer, and key decision fields. This is what makes the system debuggable when something goes wrong. Documented. Every field should have a docstring. Future engineers reading the code shouldn’t need to trace through three layers to understand what confidence means in DecisionResult . Part 8 — Testing Strategy for FDE Systems One of the most powerful properties of FDE is testability. Each layer can be tested independently. python import pytest from unittest.mock import MagicMock, patch class TestDecisionLayer: """Unit tests for the Decision Layer — no Feature or Execution Layer needed.""" def setup method self : self.decision = DecisionLayer model path="tests/fixtures/mock model.joblib", model version="test-v1", def test hard rule velocity triggers decline self : """Velocity hard rule should decline before model runs.""" fs = FeatureSet request id="test-001", customer id="c001", timestamp=datetime.utcnow , groups={ "customer risk": FeatureGroup name="customer risk", features={"velocity 1h": 15000.0}, exceeds $10K limit freshness=FeatureFreshness.FRESH, computed at=datetime.utcnow , source="online store", , "transaction context": FeatureGroup name="transaction context", features={"is international": 0, "amount": 500.0, "amount log": 6.2}, freshness=FeatureFreshness.FRESH, computed at=datetime.utcnow , source="real time", , "session context": FeatureGroup name="session context", features={}, freshness=FeatureFreshness.FRESH, computed at=datetime.utcnow , source="real time", , "spend behaviour": FeatureGroup name="spend behaviour", features={}, freshness=FeatureFreshness.SYNTHETIC, computed at=datetime.utcnow , source="fallback", , }, is complete=True, result = self.decision.decide fs assert result.action == DecisionAction.DECLINE assert 
result.is model driven == False assert "VELOCITY EXCEEDED" in result.reason codes def test normal transaction approves self : """Low-risk transaction should approve.""" Build a low-risk feature set fs = build low risk feature set "test-002", "c002" result = self.decision.decide fs assert result.action == DecisionAction.APPROVE assert result.fraud score < 0.40 def test incomplete features route to review self : """Incomplete feature set should trigger review, not approve.""" fs = FeatureSet request id="test-003", customer id="c003", timestamp=datetime.utcnow , groups={}, is complete=False, warnings= "customer risk missing" , result = self.decision.decide fs assert result.action == DecisionAction.REVIEW assert "INSUFFICIENT FEATURES" in result.reason codes class TestFDEIntegration: """Integration tests: all three layers working together.""" def test full pipeline approve self : """End-to-end: normal transaction should be approved.""" service = build test service uses test doubles for external systems result = service.process build normal transaction assert result "action" == "approved" assert result "fraud score" < 0.40 def test idempotency self : """Same request id processed twice should return same result.""" service = build test service txn = build normal transaction result1 = service.process txn result2 = service.process txn same request id assert result1 "action" == result2 "action" assert result1 "audit trail id" == result2 "audit trail id" Second call should be idempotent Part 9 — Deployment: Independent Layer Operations The FDE architecture enables a deployment pattern that monolithic services can’t support: layer-level canary releases . ┌─────────────────────────────────────────────────────────────┐ │ CANARY DEPLOYMENT │ │ │ │ Feature Layer v1.2 ──→ Decision Layer v3 90% ──→ Exec│ │ └──→ Decision Layer v4 10% ──→ Exec│ │ │ │ This lets you A/B test a new model without │ │ touching the Feature or Execution layers. 
│ └─────────────────────────────────────────────────────────────┘ Canary pattern for the Decision Layer: python import random class CanaryDecisionLayer: """ Routes a percentage of traffic to a candidate Decision Layer while keeping the rest on the stable version. """ def init self, stable: DecisionLayer, candidate: DecisionLayer, candidate pct: float = 0.10 : self.stable = stable self.candidate = candidate self.pct = candidate pct def decide self, feature set: FeatureSet - DecisionResult: if random.random < self.pct: result = self.candidate.decide feature set result.metadata "canary" = True else: result = self.stable.decide feature set result.metadata "canary" = False return result Shadow mode — run the new layer in parallel but don’t use its output — is the safest way to validate a new Decision Layer before any traffic switches: class ShadowDecisionLayer: """ Runs the shadow layer in parallel for comparison, but returns stable output. Logs shadow vs stable divergence for analysis. """ def init self, stable: DecisionLayer, shadow: DecisionLayer, metrics client : self.stable = stable self.shadow = shadow self.metrics = metrics client def decide self, feature set: FeatureSet - DecisionResult: stable result = self.stable.decide feature set Run shadow asynchronously don't block on it try: shadow result = self.shadow.decide feature set agrees = stable result.action == shadow result.action self.metrics.record "shadow agreement", int agrees , tags={ "stable action": stable result.action.value, "shadow action": shadow result.action.value, } except Exception as e: self.metrics.record "shadow error", 1 return stable result always return stable Part 10 — When to Apply FDE and When Not To Apply FDE when: - The system takes actions in downstream systems not just returns predictions - Multiple teams own different parts of the pipeline data engineering owns Feature, ML owns Decision, platform engineering owns Execution - Regulatory audit trails are required - You need to be able 
to change the model without changing the action logic
- Latency budget allows for the overhead of layer boundaries (adds ~5–10ms for well-implemented serialisation)

Don't apply FDE when:
- You're building a pure prediction service that returns a score with no action
- The system is small enough that a single team maintains the entire pipeline
- Latency budget is so tight (< 10ms) that layer boundaries are prohibitive
- The model and business logic are fundamentally inseparable (e.g., the business logic is just a thin wrapper on the model output)

FDE is an architectural pattern for systems that act, not just systems that predict. If your system returns a score and a human or another system decides what to do with it, a simpler architecture is appropriate.

Part 11 — Exercises

Exercise 1: Add a Fourth Layer — Feedback

A production FDE system needs a fourth layer: Feedback, which observes the outcomes of executed decisions and routes them back to the Feature Layer (to update online features) and the Decision Layer (to trigger model retraining). Design and implement a `FeedbackLayer` class that:
- Accepts feedback events (transaction disputed, fraud confirmed, challenge passed)
- Updates the customer's `velocity_1h` and `fraud_score_30d` in Redis
- Logs feedback events to a stream for offline model retraining
- Closes the loop: the next transaction for this customer uses updated features

Exercise 2: Rate Limiting in the Execution Layer

The Execution Layer should rate-limit decline actions per customer: if a customer has been declined 3 times in 10 minutes, the 4th request should be escalated to a human review rather than auto-declined. Add this logic to `ExecutionLayer._handle_decline` using Redis counters with TTL. What should happen to the idempotency cache when a declined request is re-routed to review?

Exercise 3: Feature Layer Fallback Hierarchy

The Feature Layer currently falls back to a synthetic default when spend behaviour is missing.
Implement a tiered fallback hierarchy:
- First: try the online Redis store
- Second: try a warm cache (recent data from the last 24h, stored in a secondary Redis key)
- Third: compute an approximate feature from the incoming transaction itself
- Last resort: return `SYNTHETIC` with a global mean value

Add a `fallback_tier` field to `FeatureGroup` to track which tier served each feature.

Exercise 4: Explainability Endpoint

Add a `/explain/{request_id}` API endpoint to the FDE service that returns:
- The feature values that were used for this decision
- Which features were most influential (SHAP values from the GBM model)
- Whether a hard rule or the model made the decision
- The full audit trail for regulatory inquiry

Exercise 5: Multi-Model Decision Layer

Extend the Decision Layer to support a model ensemble: GBM as the primary scorer, a logistic regression as a second check, and a rule-based anomaly detector. Implement a `VotingDecisionLayer` that:
- Takes the maximum fraud score from all three
- Requires two-out-of-three agreement before approving a transaction above $5,000
- Logs which model(s) drove the decision

Summary

| Layer | Responsibility | Input | Output |
|---|---|---|---|
| Feature | "What do we know?" | `request_id`, `customer_id`, transaction | `FeatureSet` |
| Decision | "What should we do?" | `FeatureSet` | `DecisionResult` |
| Execution | "Do it safely" | `DecisionResult`, transaction | `ExecutionResult` |

The FDE pattern doesn't make ML easier. It makes production ML maintainable, which is harder and more important. The model you build today will be replaced in 18 months. The architecture you build today will outlast three model generations. Build it right.
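
As a rough illustration of the contracts in the summary table, the three layers can be typed as structural interfaces and chained by a thin pipeline function. This is a minimal sketch, not the tutorial's implementation: the dataclass fields, the `Protocol` classes, the method names `get_features` and `execute`, the `run_pipeline` helper, and the stub layers are all assumptions made for this example. Only `decide` and the type names `FeatureSet`, `DecisionResult`, and `ExecutionResult` come from the text, and the stand-in types here are deliberately simpler than the real ones.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Protocol


# Simplified stand-ins for the tutorial's types; the fields are assumptions.
@dataclass
class FeatureSet:
    request_id: str
    customer_id: str
    features: Dict[str, float] = field(default_factory=dict)


@dataclass
class DecisionResult:
    request_id: str
    action: str  # a plain string here; the tutorial's version uses an enum
    fraud_score: float


@dataclass
class ExecutionResult:
    request_id: str
    action_taken: str
    success: bool


# One Protocol per row of the summary table (method names are hypothetical,
# except `decide`).
class FeatureLayer(Protocol):
    def get_features(self, request_id: str, customer_id: str,
                     transaction: Dict[str, Any]) -> FeatureSet: ...


class DecisionLayer(Protocol):
    def decide(self, feature_set: FeatureSet) -> DecisionResult: ...


class ExecutionLayer(Protocol):
    def execute(self, decision: DecisionResult,
                transaction: Dict[str, Any]) -> ExecutionResult: ...


def run_pipeline(feature: FeatureLayer, decision: DecisionLayer,
                 execution: ExecutionLayer, request_id: str,
                 customer_id: str, transaction: Dict[str, Any]) -> ExecutionResult:
    """Chain the layers; each one consumes only the previous layer's output type."""
    feature_set = feature.get_features(request_id, customer_id, transaction)
    result = decision.decide(feature_set)
    return execution.execute(result, transaction)


# Stub layers, for illustration only.
class StubFeatureLayer:
    def get_features(self, request_id, customer_id, transaction):
        amount = float(transaction["amount"])
        return FeatureSet(request_id, customer_id, {"amount": amount})


class StubDecisionLayer:
    def decide(self, feature_set):
        # Toy rule standing in for model + business logic.
        score = 0.9 if feature_set.features["amount"] > 5000 else 0.1
        action = "decline" if score > 0.5 else "approve"
        return DecisionResult(feature_set.request_id, action, score)


class StubExecutionLayer:
    def execute(self, decision, transaction):
        # A real layer would call downstream systems here.
        return ExecutionResult(decision.request_id, decision.action, True)


outcome = run_pipeline(StubFeatureLayer(), StubDecisionLayer(), StubExecutionLayer(),
                       "req-1", "cust-1", {"amount": 120.0})
print(outcome)
```

Because each stub satisfies its protocol structurally, any one of them can be swapped for a real implementation, or wrapped in a canary or shadow layer, without touching the other two.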

Further Reading

- Designing ML Systems, Chip Huyen (2022): https://www.oreilly.com/library/view/designing-machine-learning/9781098107956/ (Chapter 7 covers serving patterns that align with FDE)
- Feast Feature Store Documentation: https://docs.feast.dev/
- Real-Time ML for Production, Made With ML: https://madewithml.com/courses/mlops/serving/
- The ML Test Score, Breck et al., Google (2017): https://research.google/pubs/the-ml-test-score-a-rubric-for-ml-production-readiness-and-technical-debt-reduction/
- Rules of Machine Learning, Google: https://developers.google.com/machine-learning/guides/rules-of-ml