Python Essentials for Data Engineering: Libraries, Patterns, and Best Practices

Master the Python libraries, design patterns, and performance techniques that every data engineer needs. Comprehensive guide with real-world examples for building robust data pipelines.

Python has become the lingua franca of data engineering, and for good reason. Its simplicity, rich ecosystem, and versatility make it the perfect tool for building data pipelines, ETL processes, and analytics platforms. In this guide, I’ll share the essential libraries, patterns, and techniques I use daily.

Why Python Dominates Data Engineering

Before diving into code, let’s understand why Python won:

  - Readability: pipelines are read more than written
  - Library ecosystem: one language for ETL, ML, and analytics
  - Community support: quick solutions to common problems
  - Integration: connects to every database and API
  - Talent pool: easier to hire Python developers

Essential Libraries Every Data Engineer Should Master

1. Pandas: Data Manipulation Foundation

Despite newer alternatives, Pandas remains indispensable for small to medium datasets:

import numpy as np
import pandas as pd

# Efficient data loading
df = pd.read_parquet('s3://data-lake/events/')  # Faster than CSV

# Chained operations for readability
result = (
    df
    .query('status == "active" and amount > 100')
    .assign(
        revenue=lambda x: x['amount'] * x['price'],
        date=lambda x: pd.to_datetime(x['timestamp']).dt.date
    )
    .groupby(['date', 'category'])
    .agg(
        total_revenue=('revenue', 'sum'),
        avg_amount=('amount', 'mean'),
        transaction_count=('revenue', 'count')
    )
    .reset_index()
    .sort_values('total_revenue', ascending=False)
)

# Memory optimization for large datasets
def optimize_dataframe(df):
    """Reduce memory usage by optimizing dtypes."""
    for col in df.columns:
        col_type = df[col].dtype

        if col_type in ['int8', 'int16', 'int32', 'int64']:
            c_min = df[col].min()
            c_max = df[col].max()
            if c_min > np.iinfo(np.int8).min and c_max < np.iinfo(np.int8).max:
                df[col] = df[col].astype(np.int8)
            elif c_min > np.iinfo(np.int16).min and c_max < np.iinfo(np.int16).max:
                df[col] = df[col].astype(np.int16)
            # ... continue for other types

        elif col_type in ['float32', 'float64']:
            if df[col].min() > np.finfo(np.float32).min and df[col].max() < np.finfo(np.float32).max:
                df[col] = df[col].astype(np.float32)

        elif col_type == 'object':
            if df[col].nunique() / len(df) < 0.5:  # Low cardinality
                df[col] = df[col].astype('category')

    return df

df_optimized = optimize_dataframe(df.copy())
print(f"Memory reduced: {df.memory_usage(deep=True).sum() / 1024**2:.2f} MB -> {df_optimized.memory_usage(deep=True).sum() / 1024**2:.2f} MB")

2. PySpark: Large-Scale Processing

When data exceeds memory, PySpark is your go-to:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType

# Initialize with optimal settings
spark = SparkSession.builder \
    .appName("DataEngineeringPipeline") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .config("spark.sql.shuffle.partitions", "200") \
    .getOrCreate()

# Define schema for faster parsing (schema inference is slow)
schema = StructType([
    StructField("user_id", StringType(), True),
    StructField("event_type", StringType(), True),
    StructField("amount", IntegerType(), True),
    StructField("timestamp", TimestampType(), True),
    StructField("country_code", StringType(), True),
    StructField("metadata", StringType(), True)
])

# Read with schema
df = spark.read.schema(schema).parquet("s3://data-lake/events/")

# Window functions for analytics
window_spec = Window.partitionBy("user_id").orderBy("timestamp")

enriched = df.withColumn(
    "running_total",
    F.sum("amount").over(window_spec.rowsBetween(Window.unboundedPreceding, Window.currentRow))
).withColumn(
    "previous_event",
    F.lag("event_type", 1).over(window_spec)
).withColumn(
    "days_since_last",
    F.datediff("timestamp", F.lag("timestamp", 1).over(window_spec))
)

# Efficient joins with broadcast (the reference table is assumed to carry
# `code` and `country` columns)
small_df = spark.read.parquet("s3://reference-data/countries/")

result = enriched.join(F.broadcast(small_df), enriched.country_code == small_df.code, "left")

# Derive the partition column, then write with partitioning
result = result.withColumn("event_date", F.to_date("timestamp"))

result.write \
    .mode("overwrite") \
    .partitionBy("event_date", "country") \
    .parquet("s3://data-lake/processed/events/")

3. SQLAlchemy: Database Abstraction

from sqlalchemy import create_engine, Column, Integer, String, DateTime, func
from sqlalchemy.orm import declarative_base, sessionmaker
from contextlib import contextmanager

Base = declarative_base()

class Event(Base):
    __tablename__ = 'events'

    id = Column(Integer, primary_key=True)
    user_id = Column(String, nullable=False)
    event_type = Column(String, nullable=False)
    timestamp = Column(DateTime, default=func.now())

    def __repr__(self):
        return f"<Event(user_id='{self.user_id}', event_type='{self.event_type}')>"

# Connection pool configuration
engine = create_engine(
    'postgresql://user:password@host:5432/analytics',
    pool_size=20,
    max_overflow=10,
    pool_recycle=3600,
    pool_pre_ping=True,  # Connection health check
    echo=False  # Set True for SQL logging
)

Session = sessionmaker(bind=engine)

@contextmanager
def session_scope():
    """Context manager for database sessions."""
    session = Session()
    try:
        yield session
        session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()

# Usage
with session_scope() as session:
    # Bulk insert for performance
    events = [
        Event(user_id=f"user_{i}", event_type="signup")
        for i in range(1000)
    ]
    session.bulk_save_objects(events)

    # Efficient queries
    results = session.query(
        Event.user_id,
        func.count(Event.id).label('event_count')
    ).group_by(Event.user_id).all()
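
For result sets too large to materialize at once, a streaming read keeps memory flat. A minimal sketch, where handle_event is a hypothetical per-row handler:

# Stream rows from the database in batches of 1000
with session_scope() as session:
    for event in session.query(Event).yield_per(1000):
        handle_event(event)  # hypothetical handler, processes one row at a time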

4. Apache Airflow: Workflow Orchestration

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'data-engineering',
    'depends_on_past': False,
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
}

def extract_data(**context):
    """Extract data from source API."""
    import requests

    response = requests.get('https://api.source.com/data', timeout=30)
    response.raise_for_status()
    data = response.json()

    # Push to XCom for downstream tasks
    context['ti'].xcom_push(key='extracted_records', value=len(data))
    return data

def transform_data(**context):
    """Transform extracted data."""
    ti = context['ti']
    raw_data = ti.xcom_pull(task_ids='extract', key='return_value')

    # Transformation logic
    transformed = [
        {
            'id': record['id'],
            'value': float(record['value']),
            'processed_at': datetime.now().isoformat()
        }
        for record in raw_data
    ]

    ti.xcom_push(key='transformed_data', value=transformed)
    return transformed

def load_data(**context):
    """Load transformed data to warehouse."""
    ti = context['ti']
    data = ti.xcom_pull(task_ids='transform', key='transformed_data')

    # Load to database
    # ... database insert logic

with DAG(
    'daily_etl_pipeline',
    default_args=default_args,
    description='Daily ETL pipeline for analytics',
    schedule_interval='0 2 * * *',  # Daily at 2 AM
    start_date=datetime(2026, 1, 1),
    catchup=False,
    tags=['etl', 'analytics'],
) as dag:

    extract = PythonOperator(
        task_id='extract',
        python_callable=extract_data,
    )

    transform = PythonOperator(
        task_id='transform',
        python_callable=transform_data,
    )

    load = PythonOperator(
        task_id='load',
        python_callable=load_data,
    )

    # Define task dependencies
    extract >> transform >> load
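
One caveat with the DAG above: XCom lives in the Airflow metadata database and is meant for small values, not full payloads. A common workaround, sketched here with an assumed local staging path, is to persist the data elsewhere and pass only a pointer through XCom:

import json
import os

import requests

def extract_to_staging(**context):
    """Stage the payload to disk and return only its path via XCom."""
    response = requests.get('https://api.source.com/data', timeout=30)
    response.raise_for_status()

    path = f"/tmp/staging/{context['ds']}.json"  # assumed staging location
    os.makedirs(os.path.dirname(path), exist_ok=True)
    with open(path, 'w') as f:
        json.dump(response.json(), f)
    return path  # downstream tasks pull the path, then read the file themselves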

5. Pydantic: Data Validation

from pydantic import BaseModel, Field, ValidationError, validator
from datetime import datetime
from typing import Optional, List

class EventSchema(BaseModel):
    event_id: str = Field(..., min_length=10, description="Unique event identifier")
    user_id: str = Field(..., regex=r"^usr_[0-9]+$")
    event_type: str = Field(..., description="Type of event")
    timestamp: datetime
    amount: Optional[float] = Field(None, ge=0, le=1000000)
    metadata: Optional[dict] = None

    @validator('event_id')
    def validate_event_id(cls, v):
        if not v.startswith('evt_'):
            raise ValueError('Event ID must start with evt_')
        return v

    @validator('timestamp')
    def validate_timestamp_not_future(cls, v):
        # Compare with the same timezone-awareness as the input value
        now = datetime.now(v.tzinfo) if v.tzinfo else datetime.utcnow()
        if v > now:
            raise ValueError('Timestamp cannot be in the future')
        return v

    class Config:
        schema_extra = {
            "example": {
                "event_id": "evt_12345678",
                "user_id": "usr_12345",
                "event_type": "purchase",
                "timestamp": "2026-03-09T10:00:00Z",
                "amount": 99.99
            }
        }

# Usage in pipeline (send_to_dlq is an application-specific dead-letter helper)
def process_event(raw_data: dict) -> dict:
    try:
        event = EventSchema(**raw_data)
        # Process validated event
        return event.dict()
    except ValidationError as e:
        logger.error(f"Validation failed: {e}")
        send_to_dlq(raw_data, str(e))
        raise
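
Feeding the schema a malformed record shows the fail-fast behavior at the pipeline boundary:

try:
    EventSchema(
        event_id="evt_12345678",
        user_id="bad-id",  # violates the usr_ regex
        event_type="purchase",
        timestamp="2026-03-08T10:00:00",
    )
except ValidationError as e:
    print(e)  # names the offending field and the violated constraint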

Essential Design Patterns for Data Engineering

1. ETL Pipeline Pattern

from abc import ABC, abstractmethod
from typing import Any, Dict, List
import logging

logger = logging.getLogger(__name__)

class Extractor(ABC):
    @abstractmethod
    def extract(self, **kwargs) -> List[Dict]:
        pass

class Transformer(ABC):
    @abstractmethod
    def transform(self, data: List[Dict], **kwargs) -> List[Dict]:
        pass

class Loader(ABC):
    @abstractmethod
    def load(self, data: List[Dict], **kwargs) -> int:
        pass

class ETLPipeline:
    def __init__(self, extractor: Extractor, transformer: Transformer, loader: Loader):
        self.extractor = extractor
        self.transformer = transformer
        self.loader = loader

    def run(self, **kwargs) -> Dict[str, Any]:
        """Execute the full pipeline with error handling."""
        try:
            logger.info(f"Starting pipeline with kwargs: {kwargs}")

            # Extract
            logger.info("Starting extraction...")
            raw_data = self.extractor.extract(**kwargs)
            logger.info(f"Extracted {len(raw_data)} records")

            # Transform
            logger.info("Starting transformation...")
            transformed_data = self.transformer.transform(raw_data, **kwargs)
            logger.info(f"Transformed {len(transformed_data)} records")

            # Load
            logger.info("Starting loading...")
            loaded_count = self.loader.load(transformed_data, **kwargs)
            logger.info(f"Loaded {loaded_count} records")

            return {
                'status': 'success',
                'extracted': len(raw_data),
                'transformed': len(transformed_data),
                'loaded': loaded_count
            }

        except Exception as e:
            logger.error(f"Pipeline failed: {e}")
            return {
                'status': 'failed',
                'error': str(e)
            }

# Concrete implementation
class APIExtractor(Extractor):
    def __init__(self, api_client):
        self.api_client = api_client

    def extract(self, endpoint: str, **kwargs) -> List[Dict]:
        response = self.api_client.get(endpoint, params=kwargs)
        return response.json()['data']
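
# A hedged sketch of the concrete Transformer and Loader the usage below
# assumes (table and column names are illustrative; pandas imported as pd)
class DataTransformer(Transformer):
    def transform(self, data: List[Dict], **kwargs) -> List[Dict]:
        # Drop records without an id; real business logic goes here
        return [record for record in data if record.get('id') is not None]

class DatabaseLoader(Loader):
    def __init__(self, engine):
        self.engine = engine

    def load(self, data: List[Dict], **kwargs) -> int:
        # Append validated records to the warehouse table
        df = pd.DataFrame(data)
        df.to_sql('events', self.engine, if_exists='append', index=False)
        return len(df)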

# Usage
pipeline = ETLPipeline(
    extractor=APIExtractor(api_client),
    transformer=DataTransformer(),
    loader=DatabaseLoader(engine)
)
result = pipeline.run(endpoint='/events', start_date='2026-01-01')

2. Incremental Load Pattern

from datetime import datetime
import logging

from sqlalchemy import text

logger = logging.getLogger(__name__)

class IncrementalLoader:
    def __init__(self, db_engine, state_table='etl_state'):
        self.engine = db_engine
        self.state_table = state_table

    def get_last_processed(self, source_name: str) -> datetime:
        """Get the last successfully processed timestamp."""
        query = text(f"""
            SELECT last_processed_at
            FROM {self.state_table}
            WHERE source_name = :source
        """)
        with self.engine.connect() as conn:
            result = conn.execute(query, {'source': source_name}).fetchone()
            return result[0] if result else datetime.min

    def update_state(self, source_name: str, timestamp: datetime):
        """Update the processing state."""
        upsert = text(f"""
            INSERT INTO {self.state_table} (source_name, last_processed_at, updated_at)
            VALUES (:source, :ts, NOW())
            ON CONFLICT (source_name)
            DO UPDATE SET last_processed_at = :ts, updated_at = NOW()
        """)
        with self.engine.begin() as conn:  # begin() commits on success
            conn.execute(upsert, {'source': source_name, 'ts': timestamp})

    def load_incremental(self, source_name: str, fetch_func, load_func, batch_size=1000):
        """Load only new/changed data since last run."""
        last_timestamp = self.get_last_processed(source_name)

        # Fetch only new records
        new_records = fetch_func(since=last_timestamp, batch_size=batch_size)

        if not new_records:
            logger.info(f"No new records for {source_name}")
            return 0

        # Load new records
        loaded = load_func(new_records)

        # Update state with max timestamp from loaded records
        max_timestamp = max(r['updated_at'] for r in new_records)
        self.update_state(source_name, max_timestamp)

        return loaded
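
A usage sketch, where fetch_recent_orders and insert_orders are hypothetical callables and the fetched records must carry an updated_at field:

loader = IncrementalLoader(engine)
loaded = loader.load_incremental(
    source_name='orders_api',
    fetch_func=fetch_recent_orders,  # hypothetical: returns records newer than a timestamp
    load_func=insert_orders,         # hypothetical: writes records, returns count loaded
)
logger.info(f"Incremental load for orders_api: {loaded} records")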

3. Retry with Exponential Backoff

import time
import random
from functools import wraps

def retry_with_backoff(
    max_retries=5,
    base_delay=1.0,
    max_delay=60.0,
    jitter=True,
    exceptions=(Exception,)
):
    """Decorator for retrying failed operations with exponential backoff."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            delay = base_delay
            last_exception = None

            for attempt in range(max_retries + 1):
                try:
                    return func(*args, **kwargs)
                except exceptions as e:
                    last_exception = e

                    if attempt == max_retries:
                        break

                    # Calculate delay with optional jitter
                    if jitter:
                        delay = delay * 2 + random.uniform(0, 1)
                    else:
                        delay = delay * 2

                    delay = min(delay, max_delay)

                    logger.warning(
                        f"Attempt {attempt + 1}/{max_retries + 1} failed: {e}. "
                        f"Retrying in {delay:.2f}s..."
                    )
                    time.sleep(delay)

            raise last_exception
        return wrapper
    return decorator

# Usage (note: requests raises its own exception types, not the built-ins)
import requests

@retry_with_backoff(
    max_retries=5,
    exceptions=(requests.ConnectionError, requests.Timeout)
)
def fetch_from_api(endpoint):
    response = requests.get(endpoint, timeout=30)
    response.raise_for_status()
    return response.json()

4. Data Quality Checks

from dataclasses import dataclass
from typing import Callable, List, Optional
from enum import Enum

class Severity(Enum):
    WARNING = "warning"
    ERROR = "error"
    CRITICAL = "critical"

@dataclass
class CheckResult:
    name: str
    passed: bool
    severity: Severity
    message: str
    value: Optional[float] = None

class DataQualityChecker:
    def __init__(self, df):
        self.df = df
        self.results: List[CheckResult] = []

    def check_not_null(self, column: str, threshold: float = 0.95) -> CheckResult:
        """Check that column has at least threshold % non-null values."""
        non_null_pct = self.df[column].notna().mean()
        passed = non_null_pct >= threshold

        result = CheckResult(
            name=f"not_null_{column}",
            passed=passed,
            severity=Severity.ERROR if threshold == 1.0 else Severity.WARNING,
            message=f"Column {column} has {non_null_pct*100:.1f}% non-null (expected >= {threshold*100}%)",
            value=non_null_pct
        )
        self.results.append(result)
        return result

    def check_unique(self, column: str) -> CheckResult:
        """Check that column values are unique."""
        duplicate_count = self.df[column].duplicated().sum()
        total_count = len(self.df)
        passed = duplicate_count == 0

        result = CheckResult(
            name=f"unique_{column}",
            passed=passed,
            severity=Severity.ERROR,
            message=f"Column {column} has {duplicate_count} duplicates out of {total_count} rows",
            value=1 - (duplicate_count / total_count)
        )
        self.results.append(result)
        return result

    def check_range(self, column: str, min_val: float, max_val: float) -> CheckResult:
        """Check that column values are within expected range."""
        in_range = ((self.df[column] >= min_val) & (self.df[column] <= max_val)).mean()
        passed = in_range >= 0.99  # Allow 1% outliers

        result = CheckResult(
            name=f"range_{column}",
            passed=passed,
            severity=Severity.WARNING,
            message=f"Column {column} has {in_range*100:.1f}% values in range [{min_val}, {max_val}]",
            value=in_range
        )
        self.results.append(result)
        return result

    def check_custom(self, name: str, check_func: Callable, threshold: float = 0.95) -> CheckResult:
        """Run a custom check function."""
        result_value = check_func(self.df)
        passed = result_value >= threshold

        result = CheckResult(
            name=f"custom_{name}",
            passed=passed,
            severity=Severity.WARNING,
            message=f"Custom check {name}: {result_value:.2f} (threshold: {threshold})",
            value=result_value
        )
        self.results.append(result)
        return result

    def summary(self) -> dict:
        """Return summary of all checks."""
        total = len(self.results)
        passed = sum(1 for r in self.results if r.passed)
        errors = sum(1 for r in self.results if r.severity == Severity.ERROR and not r.passed)
        critical = sum(1 for r in self.results if r.severity == Severity.CRITICAL and not r.passed)

        return {
            'total_checks': total,
            'passed': passed,
            'failed': total - passed,
            'errors': errors,
            'critical': critical,
            'success_rate': passed / total if total > 0 else 1.0
        }

# Usage
checker = DataQualityChecker(df)
checker.check_not_null('user_id', threshold=1.0)
checker.check_unique('event_id')
checker.check_range('amount', 0, 1000000)
checker.check_custom('valid_emails', lambda df: df['email'].str.contains('@').mean())

summary = checker.summary()
if summary['errors'] > 0 or summary['critical'] > 0:
    raise ValueError(f"Data quality check failed: {summary}")

Performance Optimization Tips

1. Vectorization Over Loops

# SLOW: Python loops
result = []
for idx, row in df.iterrows():
    result.append(row['amount'] * row['price'] * 1.08)
df['total'] = result

# FAST: Vectorized operations
df['total'] = df['amount'] * df['price'] * 1.08
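
Conditional logic does not force a fallback to loops either; numpy keeps branching vectorized:

import numpy as np

# Element-wise branch with no Python-level loop
df['tier'] = np.where(df['amount'] > 1000, 'high', 'standard')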

2. Use Appropriate File Formats

# File size comparison for 1M rows
df.to_csv('data.csv')           # 100 MB, slow read/write
df.to_json('data.json')         # 150 MB, slowest
df.to_parquet('data.parquet')   # 25 MB, fast read/write  <-- Best choice
df.to_feather('data.feather')   # 30 MB, fastest reads
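
Because Parquet is columnar, readers can also prune columns at load time, something the row-oriented formats cannot do:

# Read only the columns you need instead of the whole file
df_slim = pd.read_parquet('data.parquet', columns=['order_id', 'amount'])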

3. Parallel Processing

import multiprocessing
from multiprocessing import Pool
from concurrent.futures import ProcessPoolExecutor

import numpy as np
import pandas as pd

# Using multiprocessing
def process_chunk(chunk_df):
    # Heavy transformation on one chunk
    return chunk_df.assign(total=chunk_df['amount'] * chunk_df['price'])

chunks = np.array_split(df, multiprocessing.cpu_count())

with Pool() as pool:
    results = pool.map(process_chunk, chunks)

result_df = pd.concat(results)

# Using concurrent.futures (modern approach)
with ProcessPoolExecutor() as executor:
    results = list(executor.map(process_chunk, chunks))

Complete Pipeline Example

Here’s a production-ready pipeline combining all patterns:

import logging
from datetime import datetime

import pandas as pd
from sqlalchemy import create_engine

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class SalesDataPipeline:
    def __init__(self, db_url: str, api_client):
        self.db_engine = create_engine(db_url)
        self.api_client = api_client

    @retry_with_backoff(max_retries=3)
    def extract(self, start_date: str, end_date: str) -> pd.DataFrame:
        """Extract sales data from API."""
        data = self.api_client.get_sales(
            start_date=start_date,
            end_date=end_date
        )
        return pd.DataFrame(data)

    def transform(self, df: pd.DataFrame) -> pd.DataFrame:
        """Transform and validate data."""
        # Data quality checks
        checker = DataQualityChecker(df)
        checker.check_not_null('order_id')
        checker.check_unique('order_id')
        checker.check_range('amount', 0, 1000000)

        if checker.summary()['errors'] > 0:
            raise ValueError(f"Data quality issues: {checker.summary()}")

        # Transformations
        return (
            df
            .assign(
                order_date=lambda x: pd.to_datetime(x['order_timestamp']).dt.date,
                revenue_usd=lambda x: x['amount'] * x['exchange_rate'],
                loaded_at=lambda _: datetime.utcnow()
            )
            [['order_id', 'customer_id', 'order_date', 'revenue_usd', 'loaded_at']]
        )

    def load(self, df: pd.DataFrame, table_name: str):
        """Load data to database."""
        df.to_sql(
            table_name,
            self.db_engine,
            if_exists='append',
            index=False,
            method='multi',
            chunksize=1000
        )
        logger.info(f"Loaded {len(df)} records to {table_name}")

    def run(self, start_date: str, end_date: str):
        """Execute full pipeline."""
        logger.info(f"Starting pipeline for {start_date} to {end_date}")

        try:
            # Extract
            raw_df = self.extract(start_date, end_date)
            logger.info(f"Extracted {len(raw_df)} records")

            # Transform
            transformed_df = self.transform(raw_df)
            logger.info(f"Transformed to {len(transformed_df)} records")

            # Load
            self.load(transformed_df, 'fact_sales')

            logger.info("Pipeline completed successfully")
            return True

        except Exception as e:
            logger.error(f"Pipeline failed: {e}")
            raise

# Usage
pipeline = SalesDataPipeline(
    db_url='postgresql://user:pass@localhost/analytics',
    api_client=SalesAPIClient(api_key='xxx')
)
pipeline.run(start_date='2026-03-01', end_date='2026-03-08')

Key Takeaways

Mastering Python for data engineering means:

  1. Know your libraries: Pandas for small data, PySpark for big data
  2. Use patterns: ETL pipelines, incremental loads, retry logic
  3. Validate data: Schema validation, quality checks
  4. Optimize performance: Vectorization, parallel processing, proper file formats
  5. Handle errors gracefully: Logging, retries, alerting

Python’s ecosystem continues to evolve, but these fundamentals will serve you well in any data engineering role.


Questions about Python data engineering patterns? Reach out through the contact page or connect on LinkedIn.

MD Furkanul Islam

Data Engineer & AI/ML Specialist

9+ years building intelligent data systems at scale. Passionate about bridging the gap between data engineering, AI, and robotics.