Menu

© 2026 Furkanul Islam

}
{
</>

Deploying Machine Learning Models to Production: A Complete Guide

Learn how to take ML models from Jupyter notebooks to production-ready systems. Covers containerization, model versioning, A/B testing, monitoring, and MLOps best practices with real examples.

Moving machine learning models from Jupyter notebooks to production is where the real engineering challenge begins. I’ve learned this the hard way—what works perfectly in a notebook can fail spectacularly when exposed to real users, real data, and real constraints. In this comprehensive guide, I’ll share the patterns, tools, and lessons I’ve developed for deploying ML systems that scale.

The Production ML Gap

Here’s what separates notebook code from production code:

| Notebook Code       | Production Code               |
|---------------------|-------------------------------|
| Manual data loading | Automated data pipelines      |
| Hardcoded paths     | Configuration management      |
| No testing          | Comprehensive tests           |
| Single version      | Version control & rollback    |
| No monitoring       | Metrics, logging, alerting    |
| Fixed resources     | Auto-scaling                  |
| No security         | Authentication, authorization |

The ML Production Pipeline

┌─────────────┐    ┌──────────────┐    ┌─────────────┐    ┌─────────────┐    ┌─────────────┐
│   Data      │ -> │   Training   │ -> │   Model     │ -> │   Deploy    │ -> │   Monitor   │
│   Pipeline  │    │   Pipeline   │    │   Registry  │    │   Service   │    │   & Alert   │
└─────────────┘    └──────────────┘    └─────────────┘    └─────────────┘    └─────────────┘
     │                    │                    │                    │                    │
     ▼                    ▼                    ▼                    ▼                    ▼
  Validation           Validation         Versioning          Load Balancing     Drift Detection
  Schema checks        Metrics            Lineage             Health checks      Performance
  Quality tests        Cross-val          Artifacts           Scaling            Logging

Step 1: From Notebook to Modular Code

Notebook Anti-Pattern

# ❌ DON'T: Notebook-style code
import pandas as pd
from sklearn.ensemble import RandomForestClassifier

# Hardcoded paths — breaks on any machine but the author's
df = pd.read_csv('/Users/me/data/customers.csv')

# Preprocessing mixed with modeling logic — cannot be reused at serving time
df['age'] = 2024 - df['birth_year']  # hardcoded year will silently rot
df = df[df['age'] > 18]
df = pd.get_dummies(df, columns=['category'])

# Model training on ALL the data — no held-out test set, no CV
X = df.drop('churn', axis=1)
y = df['churn']
model = RandomForestClassifier()
model.fit(X, y)

# No validation, no tests, no config

Production-Ready Refactor

# ✅ DO: Modular, configurable code
# src/model_training/train.py

from pathlib import Path
from typing import Dict, Any
import joblib
import mlflow
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import cross_val_score
import pandas as pd

from src.config import Config
from src.data.loading import load_data
from src.features.preprocessing import Preprocessor
from src.validation.schema import validate_schema

class ModelTrainer:
    """Train ML models with proper validation and logging.

    Wraps the full train/evaluate/log/save lifecycle so a pipeline can
    drive it with a single Config object.
    """

    def __init__(self, config: Config):
        self.config = config
        self.preprocessor = Preprocessor(config.feature_config)
        self.model = None   # set by train()
        self.metrics = {}   # populated by train()

    def prepare_data(self, data_path: str) -> tuple:
        """Load, validate, and split the training data.

        Returns (X_train, X_test, y_train, y_test), stratified on the target.
        """
        df = load_data(data_path)

        # Fail fast on schema mismatches before any training work happens.
        validate_schema(df, self.config.expected_schema)

        X = df.drop(self.config.target_column, axis=1)
        y = df[self.config.target_column]

        from sklearn.model_selection import train_test_split
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, random_state=42, stratify=y
        )

        return X_train, X_test, y_train, y_test

    def train(self, X_train, y_train) -> None:
        """Fit preprocessor and model; record cross-validated metrics."""
        X_processed = self.preprocessor.fit_transform(X_train)

        self.model = RandomForestClassifier(
            n_estimators=self.config.n_estimators,
            max_depth=self.config.max_depth,
            min_samples_split=self.config.min_samples_split,
            random_state=42,
            n_jobs=-1
        )

        # cross_val_score clones the (still unfitted) estimator per fold.
        cv_scores = cross_val_score(
            self.model, X_processed, y_train,
            cv=5, scoring='roc_auc', n_jobs=-1
        )

        # Fit final model on the full training split
        self.model.fit(X_processed, y_train)

        # BUG FIX: RandomForestClassifier.score() returns *accuracy*, not
        # AUC — the original stored accuracy under the key 'train_auc'.
        # Compute the training AUC explicitly so the metric name is honest.
        from sklearn.metrics import roc_auc_score
        train_auc = roc_auc_score(
            y_train, self.model.predict_proba(X_processed)[:, 1]
        )

        self.metrics = {
            'cv_mean_auc': cv_scores.mean(),
            'cv_std_auc': cv_scores.std(),
            'train_auc': train_auc
        }

    def evaluate(self, X_test, y_test) -> Dict[str, float]:
        """Evaluate on the held-out test set; returns train + test metrics."""
        X_processed = self.preprocessor.transform(X_test)

        # BUG FIX: predict on the *preprocessed* features — the original
        # passed raw X_test to a model trained on transformed data.
        y_pred = self.model.predict(X_processed)
        y_pred_proba = self.model.predict_proba(X_processed)[:, 1]

        from sklearn.metrics import (
            accuracy_score, precision_score, recall_score,
            f1_score, roc_auc_score
        )

        test_metrics = {
            'accuracy': accuracy_score(y_test, y_pred),
            'precision': precision_score(y_test, y_pred),
            'recall': recall_score(y_test, y_pred),
            'f1': f1_score(y_test, y_pred),
            'auc': roc_auc_score(y_test, y_pred_proba)
        }

        return {**self.metrics, **test_metrics}

    def log_experiment(self, run_name: str) -> None:
        """Log params, metrics, and the model artifact to MLflow."""
        mlflow.set_experiment(self.config.experiment_name)

        with mlflow.start_run(run_name=run_name):
            mlflow.log_params({
                'n_estimators': self.config.n_estimators,
                'max_depth': self.config.max_depth,
                'min_samples_split': self.config.min_samples_split
            })

            mlflow.log_metrics(self.metrics)

            mlflow.sklearn.log_model(
                self.model,
                "model",
                registered_model_name=self.config.model_name
            )

    def save(self, path: str) -> None:
        """Persist model and preprocessor side by side under `path`."""
        out_dir = Path(path)
        out_dir.mkdir(parents=True, exist_ok=True)
        joblib.dump(self.model, out_dir / 'model.pkl')
        joblib.dump(self.preprocessor, out_dir / 'preprocessor.pkl')

Step 2: Containerization with Docker

Production Dockerfile

# Dockerfile
FROM python:3.10-slim

# No .pyc files, unbuffered logs, lean pip
ENV PYTHONDONTWRITEBYTECODE=1 \
    PYTHONUNBUFFERED=1 \
    PIP_NO_CACHE_DIR=1 \
    PIP_DISABLE_PIP_VERSION_CHECK=1

# Install system dependencies (build-essential for wheels that compile)
RUN apt-get update && apt-get install -y --no-install-recommends \
    build-essential \
    && rm -rf /var/lib/apt/lists/*

# Create non-root user
RUN groupadd --gid 1000 appgroup && \
    useradd --uid 1000 --gid appgroup --shell /bin/bash --create-home appuser

WORKDIR /app

# Copy requirements first (layer caching)
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application code
COPY --chown=appuser:appgroup src/ ./src/
COPY --chown=appuser:appgroup models/ ./models/

# Create directories for logs and artifacts
RUN mkdir -p /app/logs /app/artifacts && \
    chown -R appuser:appgroup /app

# Switch to non-root user
USER appuser

# Expose port
EXPOSE 8000

# Health check.
# BUG FIX: the original shelled out to `requests`, which is not listed in
# requirements.txt, and requests.get() does not fail on non-2xx responses
# anyway. urllib is stdlib and urlopen raises on HTTP errors, so the check
# actually fails when the service is unhealthy.
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
    CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8000/health', timeout=5)"

# Run application
CMD ["uvicorn", "src.api.app:app", "--host", "0.0.0.0", "--port", "8000"]

Requirements.txt

# requirements.txt
# Core ML libraries
scikit-learn==1.3.2
pandas==2.1.3
numpy==1.26.2

# API server
fastapi==0.104.1
uvicorn[standard]==0.24.0
pydantic==2.5.0

# Model management
mlflow==2.9.2
boto3==1.34.0

# Monitoring
prometheus-client==0.19.0
structlog==23.2.0

# Testing
pytest==7.4.3
pytest-cov==4.1.0
httpx==0.25.2

# Utilities
python-dotenv==1.0.0
pyyaml==6.0.1
joblib==1.3.2

Step 3: Model Serving with FastAPI

Prediction Service

# src/api/app.py

import time
import uuid
from pathlib import Path
from typing import List, Optional, Dict, Any

import joblib
import structlog
from fastapi import FastAPI, HTTPException, Depends, BackgroundTasks, Response
from fastapi.middleware.cors import CORSMiddleware
from prometheus_client import Counter, Histogram, generate_latest, CONTENT_TYPE_LATEST
from pydantic import BaseModel, Field, validator, field_validator

from src.model_training.train import ModelTrainer
from src.config import Config

# Initialize structured logging (key/value events, machine-parseable)
logger = structlog.get_logger()

# Prometheus metrics — labeled by model version so dashboards can
# compare variants during rollouts
PREDICTION_COUNTER = Counter(
    'predictions_total',
    'Total number of predictions',
    ['model_version', 'status']
)

PREDICTION_LATENCY = Histogram(
    'prediction_latency_seconds',
    'Prediction latency in seconds',
    ['model_version']
)

app = FastAPI(
    title="ML Prediction Service",
    description="Production ML model serving",
    version="1.0.0"
)

# CORS middleware
# SECURITY NOTE(review): wildcard origins combined with
# allow_credentials=True is unsafe — restrict origins before production.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Configure appropriately for production
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Load configuration
config = Config()

# Global model cache — mutated in place by load_model() on startup/reload
model_cache = {
    'model': None,
    'preprocessor': None,
    'version': None
}

class PredictionRequest(BaseModel):
    """Request schema for predictions.

    `features` maps feature name -> value; `customer_id` is optional and
    only used for log correlation.
    """

    features: Dict[str, Any] = Field(..., description="Feature dictionary")
    customer_id: Optional[str] = Field(None, description="Customer ID for tracking")

    # Pydantic v2 style: `@validator` is deprecated in the pinned
    # pydantic 2.x — use `@field_validator` with an explicit classmethod.
    @field_validator('features')
    @classmethod
    def validate_features(cls, v):
        if not v:
            raise ValueError('Features cannot be empty')
        return v

class PredictionResponse(BaseModel):
    """Response schema for predictions.

    `prediction` is the model score and `confidence` the probability of
    the predicted class as produced by /predict; `prediction_id` allows
    correlating the response with service logs.
    """

    prediction: float = Field(..., description="Prediction score")
    predicted_class: int = Field(..., description="Predicted class label")
    confidence: float = Field(..., description="Prediction confidence")
    model_version: str = Field(..., description="Model version used")
    prediction_id: str = Field(..., description="Unique prediction ID")

class ModelInfo(BaseModel):
    """Model metadata returned by GET /model (name, version, training
    timestamp, offline metrics, and the expected feature names)."""

    model_name: str
    version: str
    trained_at: str
    metrics: Dict[str, float]
    feature_names: List[str]

def load_model(model_path: str):
    """Load model and preprocessor from `model_path` into the global cache.

    The version label is derived from the final path component.
    """
    logger.info("loading_model", path=model_path)

    model_dir = Path(model_path)
    model_cache['model'] = joblib.load(model_dir / 'model.pkl')
    model_cache['preprocessor'] = joblib.load(model_dir / 'preprocessor.pkl')
    # BUG FIX: split('/')[-1] yields '' for paths with a trailing slash
    # and breaks on Windows separators; Path.name handles both.
    model_cache['version'] = model_dir.name

    logger.info("model_loaded", version=model_cache['version'])

@app.on_event("startup")
async def startup_event():
    """Load the model once at process start so /predict is ready.

    NOTE(review): @app.on_event is deprecated in newer FastAPI releases
    in favor of lifespan handlers — consider migrating.
    """
    load_model(config.model_path)

@app.get("/health")
async def health_check():
    """Liveness/readiness probe: reports whether a model is in memory."""
    loaded = model_cache['model'] is not None
    payload = {
        "status": "healthy",
        "model_loaded": loaded,
        "model_version": model_cache['version'],
    }
    return payload

@app.get("/metrics")
async def metrics():
    """Prometheus metrics endpoint (text exposition format).

    BUG FIX: returning the raw bytes from generate_latest() lets FastAPI
    JSON-encode them with the wrong content type; wrap them in a plain
    Response with the Prometheus exposition media type instead.
    """
    return Response(content=generate_latest(), media_type=CONTENT_TYPE_LATEST)

@app.get("/model", response_model=ModelInfo)
async def get_model_info():
    """Get model metadata.

    Static fields come from the service Config; the live version comes
    from the in-memory model cache.
    """
    # Load model metadata from MLflow or registry
    return ModelInfo(
        model_name=config.model_name,
        version=model_cache['version'],
        trained_at=config.model_trained_at,
        metrics=config.model_metrics,
        feature_names=config.feature_names
    )

@app.post("/predict", response_model=PredictionResponse)
async def predict(request: PredictionRequest):
    """Make a single prediction.

    Raises 503 if no model is loaded, 400 on a feature-schema mismatch,
    and 500 on unexpected errors (all outcomes recorded in Prometheus).
    """
    start_time = time.time()
    # uuid4 is collision-safe under concurrent requests; the original
    # millisecond-timestamp scheme could issue duplicate IDs.
    prediction_id = f"pred_{uuid.uuid4().hex}"

    try:
        if model_cache['model'] is None:
            raise HTTPException(status_code=503, detail="Model not loaded")

        # Prepare features as a single-row frame
        import pandas as pd
        features_df = pd.DataFrame([request.features])

        # Validate feature schema against the training-time feature set
        expected_features = set(config.feature_names)
        actual_features = set(features_df.columns)

        if expected_features != actual_features:
            missing = expected_features - actual_features
            extra = actual_features - expected_features
            raise HTTPException(
                status_code=400,
                detail=f"Feature mismatch. Missing: {missing}, Extra: {extra}"
            )

        # Preprocess
        X_processed = model_cache['preprocessor'].transform(features_df)

        # Predict
        prediction_proba = model_cache['model'].predict_proba(X_processed)[0]
        prediction = model_cache['model'].predict(X_processed)[0]

        latency = time.time() - start_time
        confidence = float(max(prediction_proba))
        # BUG FIX: report the positive-class probability as the score.
        # The original returned max(proba), which conflated the score with
        # the confidence (and inverted the score for negative predictions).
        positive_class_score = float(prediction_proba[1])

        # Log prediction
        logger.info(
            "prediction_made",
            prediction_id=prediction_id,
            customer_id=request.customer_id,
            prediction=prediction,
            confidence=confidence,
            latency_ms=latency * 1000
        )

        # Record metrics
        PREDICTION_COUNTER.labels(
            model_version=model_cache['version'],
            status='success'
        ).inc()

        PREDICTION_LATENCY.labels(
            model_version=model_cache['version']
        ).observe(latency)

        return PredictionResponse(
            prediction=positive_class_score,
            predicted_class=int(prediction),
            confidence=confidence,
            model_version=model_cache['version'],
            prediction_id=prediction_id
        )

    except HTTPException:
        raise
    except Exception as e:
        logger.error("prediction_error", error=str(e), exc_info=True)

        # 'unknown' guards against a None label before any model loads
        PREDICTION_COUNTER.labels(
            model_version=model_cache['version'] or 'unknown',
            status='error'
        ).inc()

        raise HTTPException(status_code=500, detail=str(e))

@app.post("/predict/batch")
async def batch_predict(requests: List[PredictionRequest]):
    """Run predictions for a list of requests, collecting per-item errors."""
    batch_size = len(requests)
    if batch_size > config.max_batch_size:
        raise HTTPException(
            status_code=400,
            detail=f"Batch size exceeds maximum ({config.max_batch_size})"
        )

    results = []
    for item in requests:
        try:
            results.append(await predict(item))
        except Exception as exc:
            # One bad row must not fail the whole batch
            results.append({"error": str(exc), "customer_id": item.customer_id})

    return {"predictions": results, "batch_size": batch_size}

@app.post("/reload-model")
async def reload_model():
    """Reload model (for rolling updates).

    SECURITY NOTE(review): this endpoint is unauthenticated — anyone who
    can reach the service can trigger a reload. Gate it before exposure.
    """
    logger.info("reloading_model")
    load_model(config.model_path)
    return {"status": "success", "version": model_cache['version']}

Step 4: Model Versioning with MLflow

# src/model_registry.py

import mlflow
from mlflow.tracking import MlflowClient
from typing import Optional, Dict, Any
import logging

logger = logging.getLogger(__name__)

class ModelRegistry:
    """Manage model versions and stage transitions in the MLflow registry."""

    def __init__(self, tracking_uri: str, model_name: str):
        self.client = MlflowClient(tracking_uri=tracking_uri)
        self.model_name = model_name

    def register_model(self, run_id: str) -> str:
        """Register the model logged under `run_id`; return the new version.

        Idempotent with respect to the registered-model *name*: creating
        the name a second time is a no-op instead of an error.
        """
        from mlflow.exceptions import MlflowException

        model_uri = f"runs:/{run_id}/model"

        # BUG FIX: create_registered_model raises RESOURCE_ALREADY_EXISTS
        # on every run after the first; only create the name if missing.
        try:
            self.client.create_registered_model(
                name=self.model_name,
                description="Customer churn prediction model"
            )
        except MlflowException:
            pass  # name already registered — just add a new version below

        model_version = self.client.create_model_version(
            name=self.model_name,
            source=model_uri,
            run_id=run_id
        )

        logger.info(f"Registered model version: {model_version.version}")
        return model_version.version

    def transition_to_staging(self, version: str) -> None:
        """Transition the given version to the Staging stage."""
        self.client.transition_model_version_stage(
            name=self.model_name,
            version=version,
            stage="Staging"
        )

    def transition_to_production(self, version: str) -> None:
        """Promote `version` to Production, archiving the current one."""
        current_prod = self.get_production_model()
        if current_prod:
            self.client.transition_model_version_stage(
                name=self.model_name,
                version=current_prod['version'],
                stage="Archived"
            )

        self.client.transition_model_version_stage(
            name=self.model_name,
            version=version,
            stage="Production"
        )

    def get_production_model(self) -> Optional[Dict[str, Any]]:
        """Return the current Production model, or None if there is none."""
        # BUG FIX: search_model_versions filter strings support name/run_id
        # only — 'stage' is not a valid filter attribute and the original
        # query errored. get_latest_versions filters by stage directly.
        versions = self.client.get_latest_versions(
            self.model_name, stages=["Production"]
        )

        if versions:
            version = versions[0]
            return {
                'version': version.version,
                'run_id': version.run_id,
                'source': version.source
            }
        return None

    def get_staging_model(self) -> Optional[Dict[str, Any]]:
        """Return the current Staging model version, or None."""
        versions = self.client.get_latest_versions(
            self.model_name, stages=["Staging"]
        )

        if versions:
            return {'version': versions[0].version}
        return None

    def compare_versions(self, version_a: str, version_b: str) -> Dict:
        """Compare the metrics and params of two registered versions."""
        run_a = self.client.get_run(self.client.get_model_version(
            self.model_name, version_a
        ).run_id)

        run_b = self.client.get_run(self.client.get_model_version(
            self.model_name, version_b
        ).run_id)

        return {
            'version_a': {
                'version': version_a,
                'metrics': run_a.data.metrics,
                'params': run_a.data.params
            },
            'version_b': {
                'version': version_b,
                'metrics': run_b.data.metrics,
                'params': run_b.data.params
            }
        }

Step 5: A/B Testing Infrastructure

# src/api/ab_testing.py

from typing import Dict, List
import random
from dataclasses import dataclass

@dataclass
class ModelConfig:
    """One model variant in an A/B test (traffic_percentage is a fraction in [0, 1])."""
    version: str
    endpoint: str
    traffic_percentage: float
    weight: float = 1.0

class ABTestRouter:
    """Route traffic between model versions with sticky per-request-id assignment."""

    def __init__(self, model_configs: List[ModelConfig]):
        self.model_configs = model_configs
        self._validate_configs()

    def _validate_configs(self):
        """Validate that traffic fractions sum to 1.0."""
        total = sum(m.traffic_percentage for m in self.model_configs)
        if abs(total - 1.0) > 0.001:
            raise ValueError(f"Traffic percentages must sum to 1.0, got {total}")

    def select_model(self, request_id: str) -> ModelConfig:
        """Select a model version based on the traffic split.

        BUG FIX: Python's built-in hash() of a str is salted per process
        (PYTHONHASHSEED), so the original routing was NOT consistent across
        restarts or replicas. Use a stable digest so the same request_id
        always maps to the same model everywhere.
        """
        import hashlib

        digest = hashlib.md5(request_id.encode('utf-8')).hexdigest()
        bucket = int(digest, 16) % 100

        cumulative = 0.0
        for config in self.model_configs:
            cumulative += config.traffic_percentage * 100
            if bucket < cumulative:
                return config

        # Floating-point slack: fall through to the last variant
        return self.model_configs[-1]

    def get_weights(self) -> Dict[str, float]:
        """Get per-version weights for downstream analysis."""
        return {m.version: m.weight for m in self.model_configs}

# Usage in FastAPI: 90/10 split between the incumbent and the candidate
router = ABTestRouter([
    ModelConfig(version="v1.2.0", endpoint="http://model-v1:8000", traffic_percentage=0.9),
    ModelConfig(version="v1.3.0", endpoint="http://model-v2:8000", traffic_percentage=0.1),
])

Step 6: Monitoring and Drift Detection

# src/monitoring/drift.py

import numpy as np
import pandas as pd
from scipy import stats
from typing import Dict, List
from prometheus_client import Gauge

# Prometheus gauges for drift metrics — one time series per feature label
PSI_GAUGE = Gauge('model_drift_psi', 'Population Stability Index', ['feature'])
KL_GAUGE = Gauge('model_drift_kl', 'KL Divergence', ['feature'])

class DriftDetector:
    """Detect data drift (feature distributions) and concept drift (accuracy)."""

    def __init__(self, reference_data: pd.DataFrame, threshold: float = 0.2):
        self.reference_data = reference_data
        self.threshold = threshold
        self.reference_stats = self._compute_reference_stats()

    def _compute_reference_stats(self) -> Dict:
        """Compute per-feature statistics from the reference data."""
        # Named `reference_stats` (not `stats`) to avoid shadowing the
        # module-level `from scipy import stats` import.
        reference_stats = {}

        for column in self.reference_data.columns:
            # is_numeric_dtype covers float32/int32 etc., not just 64-bit
            if pd.api.types.is_numeric_dtype(self.reference_data[column]):
                reference_stats[column] = {
                    'mean': self.reference_data[column].mean(),
                    'std': self.reference_data[column].std(),
                    'histogram': np.histogram(self.reference_data[column], bins=20)
                }
            else:
                # Categorical features: normalized frequency table
                reference_stats[column] = {
                    'value_counts': self.reference_data[column].value_counts(normalize=True)
                }

        return reference_stats

    def calculate_psi(self, reference: np.ndarray, current: np.ndarray, bins: int = 20) -> float:
        """Calculate the Population Stability Index between two samples."""
        # Bin edges come from the reference distribution
        hist_ref, bin_edges = np.histogram(reference, bins=bins)
        hist_curr, _ = np.histogram(current, bins=bin_edges)

        # +1 Laplace smoothing avoids division by zero / log(0)
        prop_ref = (hist_ref + 1) / (len(reference) + bins)
        prop_curr = (hist_curr + 1) / (len(current) + bins)

        psi = np.sum((prop_curr - prop_ref) * np.log(prop_curr / prop_ref))

        return psi

    def calculate_kl_divergence(self, p: np.ndarray, q: np.ndarray) -> float:
        """Calculate KL divergence KL(p || q) between two distributions."""
        # Add small epsilon to avoid log(0)
        epsilon = 1e-10
        p = p + epsilon
        q = q + epsilon

        # Renormalize after smoothing
        p = p / np.sum(p)
        q = q / np.sum(q)

        return np.sum(p * np.log(p / q))

    def detect_drift(self, current_data: pd.DataFrame) -> Dict[str, object]:
        """Detect per-feature drift in `current_data` vs the reference.

        BUG FIX: the original annotation used `Any`, which this module
        never imports — the def would raise NameError at import time.
        """
        drift_results = {
            'overall_drift': False,
            'features_drifted': [],
            'drift_scores': {}
        }

        for feature in current_data.columns:
            # Ignore features the detector was not fitted on
            if feature not in self.reference_stats:
                continue

            if pd.api.types.is_numeric_dtype(current_data[feature]):
                # Numerical features — use PSI
                psi = self.calculate_psi(
                    self.reference_data[feature].values,
                    current_data[feature].values
                )

                drift_results['drift_scores'][feature] = {
                    'psi': psi,
                    'drifted': psi > self.threshold
                }

                # Export to Prometheus
                PSI_GAUGE.labels(feature=feature).set(psi)

            else:
                # Categorical features — compare frequency distributions
                ref_counts = self.reference_stats[feature]['value_counts']
                curr_counts = current_data[feature].value_counts(normalize=True)

                # Align category indices (union of both sides)
                all_categories = ref_counts.index.union(curr_counts.index)
                ref_aligned = ref_counts.reindex(all_categories, fill_value=0)
                curr_aligned = curr_counts.reindex(all_categories, fill_value=0)

                kl_div = self.calculate_kl_divergence(
                    ref_aligned.values,
                    curr_aligned.values
                )

                drift_results['drift_scores'][feature] = {
                    'kl_divergence': kl_div,
                    'drifted': kl_div > self.threshold
                }

                KL_GAUGE.labels(feature=feature).set(kl_div)

            if drift_results['drift_scores'][feature]['drifted']:
                drift_results['features_drifted'].append(feature)

        drift_results['overall_drift'] = len(drift_results['features_drifted']) > 0

        return drift_results

    def detect_concept_drift(
        self,
        predictions: np.ndarray,
        actuals: np.ndarray,
        window_size: int = 1000
    ) -> Dict[str, float]:
        """Detect concept drift by comparing recent vs overall accuracy."""
        if len(predictions) < window_size:
            return {'error': 'Insufficient data'}

        # Accuracy in the most recent window vs the full history
        recent_accuracy = np.mean(predictions[-window_size:] == actuals[-window_size:])
        overall_accuracy = np.mean(predictions == actuals)

        accuracy_drop = overall_accuracy - recent_accuracy

        return {
            'recent_accuracy': recent_accuracy,
            'overall_accuracy': overall_accuracy,
            'accuracy_drop': accuracy_drop,
            'concept_drift_detected': accuracy_drop > 0.05
        }

Complete Production Pipeline Example

# pipeline.py

from src.config import Config
from src.data.loading import load_data
from src.model_training.train import ModelTrainer
from src.model_registry import ModelRegistry
from src.monitoring.drift import DriftDetector
import mlflow

def run_training_pipeline():
    """Run the complete train → evaluate → log → register pipeline.

    Returns the combined train/test metrics dict from the trainer.
    """
    config = Config()

    # Initialize trainer
    trainer = ModelTrainer(config)

    # Prepare data
    X_train, X_test, y_train, y_test = trainer.prepare_data(config.data_path)

    # Train model
    trainer.train(X_train, y_train)

    # Evaluate on the held-out split
    metrics = trainer.evaluate(X_test, y_test)
    print(f"Model metrics: {metrics}")

    # BUG FIX: log_experiment() starts its own MLflow run, so wrapping it
    # in another mlflow.start_run() raised "Run already active". Let it
    # manage the run and recover the run_id afterwards.
    trainer.log_experiment(f"training_{config.timestamp}")
    run_id = mlflow.last_active_run().info.run_id

    # Register model
    registry = ModelRegistry(config.mlflow_uri, config.model_name)
    version = registry.register_model(run_id)

    # Gate promotion on the held-out AUC
    if metrics['auc'] > config.min_auc_threshold:
        registry.transition_to_staging(version)
        print(f"Model {version} promoted to staging")
    else:
        print(f"Model {version} did not meet threshold (AUC: {metrics['auc']})")

    return metrics

# Script entry point — allows `python pipeline.py` to run the full pipeline
if __name__ == "__main__":
    metrics = run_training_pipeline()

Key Takeaways

Deploying ML models to production requires:

  1. Modular code: Separate data loading, preprocessing, training, and serving
  2. Containerization: Docker for consistent, reproducible deployments
  3. Model versioning: MLflow or similar for tracking and registry
  4. Monitoring: Drift detection, performance tracking, alerting
  5. Testing: Unit tests, integration tests, model validation
  6. A/B testing: Infrastructure for safe rollouts and comparisons

The journey from notebook to production is challenging but essential. Start with these patterns, and iterate based on your specific requirements.


Questions about ML deployment? Reach out through the contact page or connect on LinkedIn.

MD Furkanul Islam

MD Furkanul Islam

Data Engineer & AI/ML Specialist

9+ years building intelligent data systems at scale. Passionate about bridging the gap between data engineering, AI, and robotics.