Moving machine learning models from Jupyter notebooks to production is where the real engineering challenge begins. I’ve learned this the hard way—what works perfectly in a notebook can fail spectacularly when exposed to real users, real data, and real constraints. In this comprehensive guide, I’ll share the patterns, tools, and lessons I’ve developed for deploying ML systems that scale.
The Production ML Gap
Here’s what separates notebook code from production code:
| Notebook Code | Production Code |
|---|---|
| Manual data loading | Automated data pipelines |
| Hardcoded paths | Configuration management |
| No testing | Comprehensive tests |
| Single version | Version control & rollback |
| No monitoring | Metrics, logging, alerting |
| Fixed resources | Auto-scaling |
| No security | Authentication, authorization |
The ML Production Pipeline
┌─────────────┐ ┌──────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Data │ -> │ Training │ -> │ Model │ -> │ Deploy │ -> │ Monitor │
│ Pipeline │ │ Pipeline │ │ Registry │ │ Service │ │ & Alert │
└─────────────┘ └──────────────┘ └─────────────┘ └─────────────┘ └─────────────┘
│ │ │ │ │
▼ ▼ ▼ ▼ ▼
Validation Validation Versioning Load Balancing Drift Detection
Schema checks Metrics Lineage Health checks Performance
Quality tests Cross-val Artifacts Scaling Logging
Step 1: From Notebook to Modular Code
Notebook Anti-Pattern
# ❌ DON'T: Notebook-style code
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
# Hardcoded paths — breaks on any other machine
df = pd.read_csv('/Users/me/data/customers.csv')
# Preprocessing mixed with logic
df['age'] = 2024 - df['birth_year']  # hardcoded year: silently wrong next January
df = df[df['age'] > 18]
df = pd.get_dummies(df, columns=['category'])  # dummy columns depend on whatever categories happen to be present
# Model training
X = df.drop('churn', axis=1)
y = df['churn']
model = RandomForestClassifier()  # all hyperparameters left at defaults, unrecorded
model.fit(X, y)  # fits on ALL rows: no held-out set, so no honest evaluation
# No validation, no tests, no config
Production-Ready Refactor
# ✅ DO: Modular, configurable code
# src/model_training/train.py
from pathlib import Path
from typing import Dict, Any
import joblib
import mlflow
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import cross_val_score
import pandas as pd
from src.config import Config
from src.data.loading import load_data
from src.features.preprocessing import Preprocessor
from src.validation.schema import validate_schema
class ModelTrainer:
    """Train, evaluate and persist a classification model.

    Wraps the full workflow: schema-validated data loading, preprocessing,
    cross-validated training, held-out evaluation, MLflow logging and
    artifact persistence.
    """

    def __init__(self, config: Config):
        """
        Args:
            config: Project configuration (paths, hyperparameters, schema).
        """
        self.config = config
        self.preprocessor = Preprocessor(config.feature_config)
        self.model = None   # set by train()
        self.metrics = {}   # populated by train()

    def prepare_data(self, data_path: str) -> tuple:
        """Load, schema-validate and split the training data.

        Returns:
            (X_train, X_test, y_train, y_test) with a stratified 80/20 split.
        """
        from sklearn.model_selection import train_test_split

        df = load_data(data_path)
        # Fail fast on unexpected columns/dtypes before any training work.
        validate_schema(df, self.config.expected_schema)

        X = df.drop(self.config.target_column, axis=1)
        y = df[self.config.target_column]

        # stratify=y keeps the class balance identical in both splits.
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, random_state=42, stratify=y
        )
        return X_train, X_test, y_train, y_test

    def train(self, X_train, y_train) -> None:
        """Fit preprocessor and model; store CV metrics in self.metrics."""
        X_processed = self.preprocessor.fit_transform(X_train)

        self.model = RandomForestClassifier(
            n_estimators=self.config.n_estimators,
            max_depth=self.config.max_depth,
            min_samples_split=self.config.min_samples_split,
            random_state=42,
            n_jobs=-1
        )

        # cross_val_score clones the estimator, so these CV folds do not
        # affect the final fit below.
        cv_scores = cross_val_score(
            self.model, X_processed, y_train,
            cv=5, scoring='roc_auc', n_jobs=-1
        )

        self.model.fit(X_processed, y_train)

        self.metrics = {
            'cv_mean_auc': cv_scores.mean(),
            'cv_std_auc': cv_scores.std(),
            # Classifier .score() returns mean ACCURACY, not AUC — the key
            # was previously mislabeled 'train_auc'.
            'train_accuracy': self.model.score(X_processed, y_train)
        }

    def evaluate(self, X_test, y_test) -> Dict[str, float]:
        """Evaluate on the held-out test set.

        Returns:
            Combined dict of training/CV metrics and test metrics.
        """
        from sklearn.metrics import (
            accuracy_score, precision_score, recall_score,
            f1_score, roc_auc_score
        )

        X_processed = self.preprocessor.transform(X_test)
        # BUG FIX: predict on the PREPROCESSED features; the original passed
        # the raw X_test, which the model was never fitted on.
        y_pred = self.model.predict(X_processed)
        y_pred_proba = self.model.predict_proba(X_processed)[:, 1]

        test_metrics = {
            'accuracy': accuracy_score(y_test, y_pred),
            'precision': precision_score(y_test, y_pred),
            'recall': recall_score(y_test, y_pred),
            'f1': f1_score(y_test, y_pred),
            'auc': roc_auc_score(y_test, y_pred_proba)
        }
        return {**self.metrics, **test_metrics}

    def log_experiment(self, run_name: str) -> None:
        """Log parameters, metrics and the fitted model to MLflow.

        Opens (and closes) its own MLflow run named `run_name`.
        """
        mlflow.set_experiment(self.config.experiment_name)
        with mlflow.start_run(run_name=run_name):
            mlflow.log_params({
                'n_estimators': self.config.n_estimators,
                'max_depth': self.config.max_depth,
                'min_samples_split': self.config.min_samples_split
            })
            mlflow.log_metrics(self.metrics)
            mlflow.sklearn.log_model(
                self.model,
                "model",
                registered_model_name=self.config.model_name
            )

    def save(self, path: str) -> None:
        """Persist the model and its paired preprocessor under `path`."""
        out_dir = Path(path)
        out_dir.mkdir(parents=True, exist_ok=True)
        joblib.dump(self.model, out_dir / 'model.pkl')
        joblib.dump(self.preprocessor, out_dir / 'preprocessor.pkl')
Step 2: Containerization with Docker
Production Dockerfile
# Dockerfile
FROM python:3.10-slim

# Set environment variables
ENV PYTHONDONTWRITEBYTECODE=1 \
    PYTHONUNBUFFERED=1 \
    PIP_NO_CACHE_DIR=1 \
    PIP_DISABLE_PIP_VERSION_CHECK=1

# Install system dependencies
RUN apt-get update && apt-get install -y --no-install-recommends \
    build-essential \
    && rm -rf /var/lib/apt/lists/*

# Create non-root user
RUN groupadd --gid 1000 appgroup && \
    useradd --uid 1000 --gid appgroup --shell /bin/bash --create-home appuser

WORKDIR /app

# Copy requirements first (layer caching)
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application code
COPY --chown=appuser:appgroup src/ ./src/
COPY --chown=appuser:appgroup models/ ./models/

# Create directories for logs and artifacts
RUN mkdir -p /app/logs /app/artifacts && \
    chown -R appuser:appgroup /app

# Switch to non-root user
USER appuser

# Expose port
EXPOSE 8000

# Health check — uses stdlib urllib: `requests` is NOT in requirements.txt,
# so the original check raised ImportError and marked the container unhealthy
# forever. urlopen raises (non-zero exit) on connection failure AND on
# HTTP >= 400, which `requests.get` without raise_for_status() did not.
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
    CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8000/health', timeout=5)"

# Run application
CMD ["uvicorn", "src.api.app:app", "--host", "0.0.0.0", "--port", "8000"]
Requirements.txt
# requirements.txt
# Core ML libraries
scikit-learn==1.3.2
pandas==2.1.3
numpy==1.26.2
# API server
fastapi==0.104.1
uvicorn[standard]==0.24.0
pydantic==2.5.0
# Model management
mlflow==2.9.2
boto3==1.34.0
# Monitoring
prometheus-client==0.19.0
structlog==23.2.0
# Testing
pytest==7.4.3
pytest-cov==4.1.0
httpx==0.25.2
# Utilities
python-dotenv==1.0.0
pyyaml==6.0.1
joblib==1.3.2
Step 3: Model Serving with FastAPI
Prediction Service
# src/api/app.py
# Standard library
import time
import uuid
from pathlib import Path
from typing import List, Optional, Dict, Any

# Third-party
import joblib
import structlog
from fastapi import FastAPI, HTTPException, Depends, BackgroundTasks, Response
from fastapi.middleware.cors import CORSMiddleware
from prometheus_client import Counter, Histogram, generate_latest, CONTENT_TYPE_LATEST
from pydantic import BaseModel, Field, field_validator, validator

# Local application
from src.model_training.train import ModelTrainer
from src.config import Config
# Initialize structured logging (key/value events)
logger = structlog.get_logger()

# Prometheus metrics — labeled by model version so dashboards can compare
# versions during rollouts and A/B tests.
PREDICTION_COUNTER = Counter(
    'predictions_total',
    'Total number of predictions',
    ['model_version', 'status']
)
PREDICTION_LATENCY = Histogram(
    'prediction_latency_seconds',
    'Prediction latency in seconds',
    ['model_version']
)

app = FastAPI(
    title="ML Prediction Service",
    description="Production ML model serving",
    version="1.0.0"
)

# CORS middleware
# NOTE(review): wildcard origins combined with allow_credentials=True is
# rejected by browsers under the CORS spec — lock origins down in production.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Configure appropriately for production
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Load configuration
config = Config()

# Global model cache: process-wide singleton holding the live artifacts;
# populated by load_model() at startup and swapped by /reload-model.
model_cache = {
    'model': None,
    'preprocessor': None,
    'version': None
}
class PredictionRequest(BaseModel):
    """Request schema for /predict: a flat feature dict plus optional tracking id."""
    features: Dict[str, Any] = Field(..., description="Feature dictionary")
    customer_id: Optional[str] = Field(None, description="Customer ID for tracking")

    # pydantic is pinned to 2.x (requirements: pydantic==2.5.0), where
    # `@validator` is deprecated — use the v2 `@field_validator` API.
    @field_validator('features')
    @classmethod
    def validate_features(cls, v):
        # Reject empty payloads early with a 422 instead of failing
        # downstream in the preprocessor.
        if not v:
            raise ValueError('Features cannot be empty')
        return v
class PredictionResponse(BaseModel):
    """Response schema for /predict; Field descriptions surface in OpenAPI docs."""
    prediction: float = Field(..., description="Prediction score")
    predicted_class: int = Field(..., description="Predicted class label")
    confidence: float = Field(..., description="Prediction confidence")
    model_version: str = Field(..., description="Model version used")
    prediction_id: str = Field(..., description="Unique prediction ID")
class ModelInfo(BaseModel):
    """Model metadata returned by GET /model."""
    model_name: str
    version: str
    # Training timestamp — sourced from config, not queried from the registry
    trained_at: str
    metrics: Dict[str, float]
    feature_names: List[str]
def load_model(model_path: str) -> None:
    """Load model + preprocessor artifacts into the process-wide cache.

    The version label is the artifact directory's name. Path.name is robust
    to a trailing slash, where the original `model_path.split('/')[-1]`
    silently produced an empty version string.
    """
    logger.info("loading_model", path=model_path)
    artifact_dir = Path(model_path)
    model_cache['model'] = joblib.load(artifact_dir / 'model.pkl')
    model_cache['preprocessor'] = joblib.load(artifact_dir / 'preprocessor.pkl')
    model_cache['version'] = artifact_dir.name
    logger.info("model_loaded", version=model_cache['version'])
@app.on_event("startup")
async def startup_event():
    """Load model artifacts once at process start so /predict is ready immediately.

    NOTE(review): `on_event` is deprecated in newer FastAPI releases in favor
    of lifespan handlers — confirm before upgrading the pinned version.
    """
    load_model(config.model_path)
@app.get("/health")
async def health_check():
    """Liveness/readiness probe: reports whether a model is in memory."""
    loaded = model_cache['model'] is not None
    return {
        "status": "healthy",
        "model_loaded": loaded,
        "model_version": model_cache['version'],
    }
@app.get("/metrics")
async def metrics():
    """Prometheus scrape endpoint.

    The exposition text must be returned verbatim with the Prometheus
    content type. The original returned the raw bytes through FastAPI's
    default JSON response, which quoted/escaped the payload and served it
    as application/json — breaking the scraper.
    """
    return Response(content=generate_latest(), media_type=CONTENT_TYPE_LATEST)
@app.get("/model", response_model=ModelInfo)
async def get_model_info():
    """Return metadata describing the currently loaded model."""
    # Static fields come from config; the version reflects the live cache.
    info = ModelInfo(
        model_name=config.model_name,
        version=model_cache['version'],
        trained_at=config.model_trained_at,
        metrics=config.model_metrics,
        feature_names=config.feature_names,
    )
    return info
@app.post("/predict", response_model=PredictionResponse)
async def predict(request: PredictionRequest):
    """Score a single observation with the cached model.

    Raises:
        HTTPException 503 if no model is loaded, 400 on a feature-schema
        mismatch, 500 (with a logged traceback) on unexpected failures.
    """
    start_time = time.time()
    # uuid4 is collision-safe under concurrent requests; the original
    # millisecond timestamp produced duplicate ids under load.
    prediction_id = f"pred_{uuid.uuid4().hex}"

    try:
        if model_cache['model'] is None:
            raise HTTPException(status_code=503, detail="Model not loaded")

        import pandas as pd
        features_df = pd.DataFrame([request.features])

        # Reject requests whose feature set differs from the training schema.
        expected_features = set(config.feature_names)
        actual_features = set(features_df.columns)
        if expected_features != actual_features:
            missing = expected_features - actual_features
            extra = actual_features - expected_features
            raise HTTPException(
                status_code=400,
                detail=f"Feature mismatch. Missing: {missing}, Extra: {extra}"
            )

        X_processed = model_cache['preprocessor'].transform(features_df)

        prediction_proba = model_cache['model'].predict_proba(X_processed)[0]
        prediction = model_cache['model'].predict(X_processed)[0]

        confidence = float(max(prediction_proba))
        # BUG FIX: report the positive-class probability as the score. The
        # original returned max(proba), which merely duplicated `confidence`
        # and inverted the score whenever class 0 was predicted.
        score = float(prediction_proba[1]) if len(prediction_proba) > 1 else confidence

        latency = time.time() - start_time

        logger.info(
            "prediction_made",
            prediction_id=prediction_id,
            customer_id=request.customer_id,
            prediction=prediction,
            confidence=confidence,
            latency_ms=latency * 1000
        )

        PREDICTION_COUNTER.labels(
            model_version=model_cache['version'],
            status='success'
        ).inc()
        PREDICTION_LATENCY.labels(
            model_version=model_cache['version']
        ).observe(latency)

        return PredictionResponse(
            prediction=score,
            predicted_class=int(prediction),
            confidence=confidence,
            model_version=model_cache['version'],
            prediction_id=prediction_id
        )
    except HTTPException:
        # Re-raise handled client/server errors untouched.
        raise
    except Exception as e:
        logger.error("prediction_error", error=str(e), exc_info=True)
        PREDICTION_COUNTER.labels(
            model_version=model_cache['version'],
            status='error'
        ).inc()
        raise HTTPException(status_code=500, detail=str(e))
@app.post("/predict/batch")
async def batch_predict(requests: List[PredictionRequest]):
    """Score a list of requests, preserving input order in the response."""
    batch_size = len(requests)
    if batch_size > config.max_batch_size:
        raise HTTPException(
            status_code=400,
            detail=f"Batch size exceeds maximum ({config.max_batch_size})"
        )

    results = []
    for item in requests:
        try:
            results.append(await predict(item))
        except Exception as exc:
            # A single bad record must not fail the whole batch; surface
            # the error inline at the record's position instead.
            results.append({"error": str(exc), "customer_id": item.customer_id})

    return {"predictions": results, "batch_size": batch_size}
@app.post("/reload-model")
async def reload_model():
    """Reload model artifacts from config.model_path (for rolling updates).

    NOTE(review): this endpoint is unauthenticated — anyone who can reach the
    service can trigger a reload. Gate it behind auth or network policy.
    """
    logger.info("reloading_model")
    load_model(config.model_path)
    return {"status": "success", "version": model_cache['version']}
Step 4: Model Versioning with MLflow
# src/model_registry.py
import mlflow
from mlflow.tracking import MlflowClient
from typing import Optional, Dict, Any
import logging
logger = logging.getLogger(__name__)
class ModelRegistry:
    """Manage model versions and stage transitions in the MLflow registry."""

    def __init__(self, tracking_uri: str, model_name: str):
        """
        Args:
            tracking_uri: MLflow tracking server URI.
            model_name: Registered-model name all operations act on.
        """
        self.client = MlflowClient(tracking_uri=tracking_uri)
        self.model_name = model_name

    def register_model(self, run_id: str) -> str:
        """Register the model logged by `run_id`; return the new version.

        The registered-model entry is created lazily: on every run after the
        first, create_registered_model raises RESOURCE_ALREADY_EXISTS (the
        original let this crash the pipeline), so that error is tolerated
        and only a new version is added.
        """
        from mlflow.exceptions import MlflowException

        model_uri = f"runs:/{run_id}/model"
        try:
            self.client.create_registered_model(
                name=self.model_name,
                description="Customer churn prediction model"
            )
        except MlflowException:
            # Name already registered — just add a new version below.
            pass
        model_version = self.client.create_model_version(
            name=self.model_name,
            source=model_uri,
            run_id=run_id
        )
        logger.info(f"Registered model version: {model_version.version}")
        return model_version.version

    def transition_to_staging(self, version: str) -> None:
        """Transition `version` to the Staging stage."""
        self.client.transition_model_version_stage(
            name=self.model_name,
            version=version,
            stage="Staging"
        )

    def transition_to_production(self, version: str) -> None:
        """Promote `version` to Production, archiving the incumbent first."""
        current_prod = self.get_production_model()
        if current_prod:
            self.client.transition_model_version_stage(
                name=self.model_name,
                version=current_prod['version'],
                stage="Archived"
            )
        self.client.transition_model_version_stage(
            name=self.model_name,
            version=version,
            stage="Production"
        )

    def _latest_in_stage(self, stage: str):
        """Return the latest model version object in `stage`, or None.

        Uses get_latest_versions: search_model_versions filter strings only
        support attributes like name/run_id — the original's "stage='...'"
        filter is rejected by the MLflow backend.
        """
        versions = self.client.get_latest_versions(self.model_name, stages=[stage])
        return versions[0] if versions else None

    def get_production_model(self) -> Optional[Dict[str, Any]]:
        """Get the current Production model's version, run id and source."""
        version = self._latest_in_stage("Production")
        if version:
            return {
                'version': version.version,
                'run_id': version.run_id,
                'source': version.source
            }
        return None

    def get_staging_model(self) -> Optional[Dict[str, Any]]:
        """Get the current Staging model's version, if any."""
        version = self._latest_in_stage("Staging")
        if version:
            return {'version': version.version}
        return None

    def compare_versions(self, version_a: str, version_b: str) -> Dict:
        """Compare the metrics and params of two registered versions."""
        run_a = self.client.get_run(self.client.get_model_version(
            self.model_name, version_a
        ).run_id)
        run_b = self.client.get_run(self.client.get_model_version(
            self.model_name, version_b
        ).run_id)
        return {
            'version_a': {
                'version': version_a,
                'metrics': run_a.data.metrics,
                'params': run_a.data.params
            },
            'version_b': {
                'version': version_b,
                'metrics': run_b.data.metrics,
                'params': run_b.data.params
            }
        }
Step 5: A/B Testing Infrastructure
# src/api/ab_testing.py
from typing import Dict, List
import random
from dataclasses import dataclass
@dataclass
class ModelConfig:
    """One model variant participating in an A/B test."""
    version: str
    endpoint: str
    traffic_percentage: float  # fraction of traffic in [0, 1]
    weight: float = 1.0        # analysis weight; not used for routing

class ABTestRouter:
    """Route traffic between model versions with sticky, reproducible assignment."""

    def __init__(self, model_configs: List[ModelConfig]):
        self.model_configs = model_configs
        self._validate_configs()

    def _validate_configs(self):
        """Validate that traffic fractions sum to 1.0 (within float tolerance)."""
        total = sum(m.traffic_percentage for m in self.model_configs)
        if abs(total - 1.0) > 0.001:
            raise ValueError(f"Traffic percentages must sum to 1.0, got {total}")

    def select_model(self, request_id: str) -> ModelConfig:
        """Select a model version based on the traffic split.

        Buckets by a stable MD5 digest of the request id so the same id maps
        to the same variant across processes and restarts. BUG FIX: the
        original used builtin hash(), which is randomized per process via
        PYTHONHASHSEED — silently breaking sticky routing between workers.
        """
        import hashlib
        digest = hashlib.md5(request_id.encode("utf-8")).hexdigest()
        bucket = int(digest, 16) % 100
        cumulative = 0.0
        for config in self.model_configs:
            cumulative += config.traffic_percentage * 100
            if bucket < cumulative:
                return config
        # Float rounding can leave `cumulative` fractionally under 100.
        return self.model_configs[-1]

    def get_weights(self) -> Dict[str, float]:
        """Get model weights keyed by version, for analysis."""
        return {m.version: m.weight for m in self.model_configs}

# Usage in FastAPI
router = ABTestRouter([
    ModelConfig(version="v1.2.0", endpoint="http://model-v1:8000", traffic_percentage=0.9),
    ModelConfig(version="v1.3.0", endpoint="http://model-v2:8000", traffic_percentage=0.1),
])
Step 6: Monitoring and Drift Detection
# src/monitoring/drift.py
import numpy as np
import pandas as pd
from scipy import stats
from typing import Dict, List
from prometheus_client import Gauge
# Prometheus gauges for drift metrics
PSI_GAUGE = Gauge('model_drift_psi', 'Population Stability Index', ['feature'])
KL_GAUGE = Gauge('model_drift_kl', 'KL Divergence', ['feature'])
class DriftDetector:
    """Detect data drift (feature distributions) and concept drift (accuracy decay)."""

    def __init__(self, reference_data: pd.DataFrame, threshold: float = 0.2):
        """
        Args:
            reference_data: Baseline (e.g. training) data to compare against.
            threshold: Score above which a feature is flagged as drifted
                (applied to both PSI and KL divergence).
        """
        self.reference_data = reference_data
        self.threshold = threshold
        self.reference_stats = self._compute_reference_stats()

    def _compute_reference_stats(self) -> Dict:
        """Precompute per-feature summaries of the reference data."""
        # Named `summary` (not `stats`) to avoid shadowing scipy.stats.
        summary = {}
        for column in self.reference_data.columns:
            if pd.api.types.is_numeric_dtype(self.reference_data[column]):
                # Numeric features (covers float32/int32 etc., not only the
                # 64-bit dtypes the original string comparison matched).
                summary[column] = {
                    'mean': self.reference_data[column].mean(),
                    'std': self.reference_data[column].std(),
                    'histogram': np.histogram(self.reference_data[column], bins=20)
                }
            else:
                # Categorical features: normalized frequency table.
                summary[column] = {
                    'value_counts': self.reference_data[column].value_counts(normalize=True)
                }
        return summary

    def calculate_psi(self, reference: np.ndarray, current: np.ndarray, bins: int = 20) -> float:
        """Calculate the Population Stability Index between two samples.

        Bins are fixed from the reference sample so both histograms share
        the same support.
        """
        hist_ref, bin_edges = np.histogram(reference, bins=bins)
        hist_curr, _ = np.histogram(current, bins=bin_edges)
        # +1 smoothing avoids division by zero / log(0) in empty bins.
        prop_ref = (hist_ref + 1) / (len(reference) + bins)
        prop_curr = (hist_curr + 1) / (len(current) + bins)
        return float(np.sum((prop_curr - prop_ref) * np.log(prop_curr / prop_ref)))

    def calculate_kl_divergence(self, p: np.ndarray, q: np.ndarray) -> float:
        """Calculate KL(p || q) between two (unnormalized) distributions."""
        # Small epsilon avoids log(0) for categories absent on one side.
        epsilon = 1e-10
        p = p + epsilon
        q = q + epsilon
        p = p / np.sum(p)
        q = q / np.sum(q)
        return float(np.sum(p * np.log(p / q)))

    def detect_drift(self, current_data: pd.DataFrame) -> Dict[str, object]:
        """Check every known feature in `current_data` for distribution drift.

        Numeric features are scored with PSI, categorical with KL divergence;
        both use self.threshold as the alarm level. (Return annotation fixed:
        the original used `Any`, which was never imported and made this
        module raise NameError at import time.)
        """
        drift_results = {
            'overall_drift': False,
            'features_drifted': [],
            'drift_scores': {}
        }

        for feature in current_data.columns:
            # Skip features that have no reference baseline.
            if feature not in self.reference_stats:
                continue

            if pd.api.types.is_numeric_dtype(current_data[feature]):
                psi = self.calculate_psi(
                    self.reference_data[feature].values,
                    current_data[feature].values
                )
                drift_results['drift_scores'][feature] = {
                    'psi': psi,
                    'drifted': psi > self.threshold
                }
                PSI_GAUGE.labels(feature=feature).set(psi)
            else:
                ref_counts = self.reference_stats[feature]['value_counts']
                curr_counts = current_data[feature].value_counts(normalize=True)

                # Align the category sets so both vectors share one support.
                all_categories = ref_counts.index.union(curr_counts.index)
                ref_aligned = ref_counts.reindex(all_categories, fill_value=0)
                curr_aligned = curr_counts.reindex(all_categories, fill_value=0)

                kl_div = self.calculate_kl_divergence(
                    ref_aligned.values,
                    curr_aligned.values
                )
                drift_results['drift_scores'][feature] = {
                    'kl_divergence': kl_div,
                    'drifted': kl_div > self.threshold
                }
                KL_GAUGE.labels(feature=feature).set(kl_div)

            if drift_results['drift_scores'][feature]['drifted']:
                drift_results['features_drifted'].append(feature)

        drift_results['overall_drift'] = len(drift_results['features_drifted']) > 0
        return drift_results

    def detect_concept_drift(
        self,
        predictions: np.ndarray,
        actuals: np.ndarray,
        window_size: int = 1000
    ) -> Dict[str, float]:
        """Flag concept drift when recent accuracy falls >5pts below overall.

        NOTE: the overall accuracy includes the recent window, which dampens
        the measured drop slightly — acceptable for an alerting heuristic.
        """
        if len(predictions) < window_size:
            return {'error': 'Insufficient data'}

        recent_accuracy = np.mean(predictions[-window_size:] == actuals[-window_size:])
        overall_accuracy = np.mean(predictions == actuals)
        accuracy_drop = overall_accuracy - recent_accuracy

        return {
            'recent_accuracy': recent_accuracy,
            'overall_accuracy': overall_accuracy,
            'accuracy_drop': accuracy_drop,
            'concept_drift_detected': accuracy_drop > 0.05
        }
Complete Production Pipeline Example
# pipeline.py
from src.config import Config
from src.data.loading import load_data
from src.model_training.train import ModelTrainer
from src.model_registry import ModelRegistry
from src.monitoring.drift import DriftDetector
import mlflow
def run_training_pipeline():
    """Run the complete train -> evaluate -> register -> promote pipeline.

    Returns:
        Dict of combined training and test metrics from ModelTrainer.evaluate().
    """
    config = Config()

    trainer = ModelTrainer(config)

    X_train, X_test, y_train, y_test = trainer.prepare_data(config.data_path)

    trainer.train(X_train, y_train)

    metrics = trainer.evaluate(X_test, y_test)
    print(f"Model metrics: {metrics}")

    # BUG FIX: log_experiment opens (and closes) its own MLflow run, so
    # wrapping it in another `with mlflow.start_run(...)` — as the original
    # did — raises "Run ... is already active". Retrieve the run id from the
    # just-finished run instead.
    run_name = f"training_{config.timestamp}"
    trainer.log_experiment(run_name)
    run_id = mlflow.last_active_run().info.run_id

    # Register the new version in the model registry.
    registry = ModelRegistry(config.mlflow_uri, config.model_name)
    version = registry.register_model(run_id)

    # Gate promotion on the held-out AUC.
    if metrics['auc'] > config.min_auc_threshold:
        registry.transition_to_staging(version)
        print(f"Model {version} promoted to staging")
    else:
        print(f"Model {version} did not meet threshold (AUC: {metrics['auc']})")

    return metrics

if __name__ == "__main__":
    metrics = run_training_pipeline()
Key Takeaways
Deploying ML models to production requires:
- Modular code: Separate data loading, preprocessing, training, and serving
- Containerization: Docker for consistent, reproducible deployments
- Model versioning: MLflow or similar for tracking and registry
- Monitoring: Drift detection, performance tracking, alerting
- Testing: Unit tests, integration tests, model validation
- A/B testing: Infrastructure for safe rollouts and comparisons
The journey from notebook to production is challenging but essential. Start with these patterns, and iterate based on your specific requirements.
Questions about ML deployment? Reach out through the contact page or connect on LinkedIn.
Related Posts
Computer Vision Projects That Changed My Perspective: Real-World Applications
Explore real-world computer vision applications from defect detection to medical imaging. Learn practical insights on YOLO, U-Net, model deployment, and lessons from production CV systems.
DevOps — Deploying Data Applications with Kubernetes: The Complete Guide
Master Kubernetes for data workloads. Learn pods, services, deployments, statefulsets, Helm, and production patterns for running scalable, reliable data applications.
AI/ML — AI Ethics and Responsible Development: A Practical Guide
Explore the ethical considerations every AI practitioner must understand. Learn about bias mitigation, privacy preservation, transparency, and accountability in AI systems with real-world examples.