Real-Time Data Streaming with Apache Kafka: The Complete Guide

Master event-driven architectures with Apache Kafka. Learn producers, consumers, brokers, topics, and stream processing patterns for building scalable real-time data platforms.

Apache Kafka has become the backbone of modern real-time data infrastructure, powering event-driven architectures at companies like Netflix, Uber, and LinkedIn. In this guide, I’ll share everything you need to know about Kafka, from core concepts to production deployment patterns.

Understanding Kafka: More Than Just a Message Queue

When I first started with Kafka, I thought it was just another message queue. I was wrong. Kafka is a distributed streaming platform designed for:

  • Publishing and subscribing to streams of records (like a message queue)
  • Storing streams of records durably and fault-tolerantly (like a database)
  • Processing streams of records as they occur (like a stream processor)

Key Differentiators

Feature             Traditional MQ          Kafka
Message retention   Deleted after consume   Days to forever
Throughput          Thousands/sec           Millions/sec
Scaling             Vertical                Horizontal
Order guarantee     Per queue               Per partition
Consumer model      Push                    Pull-based

Core Kafka Concepts Explained

Producers: Publishing Events

Producers are applications that publish (write) streams of records to Kafka topics:

from kafka import KafkaProducer
import json
from datetime import datetime

producer = KafkaProducer(
    bootstrap_servers=['kafka-1:9092', 'kafka-2:9092', 'kafka-3:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    key_serializer=lambda k: k.encode('utf-8') if k else None,
    acks='all',  # Wait for all replicas to acknowledge
    retries=3,
    retry_backoff_ms=100,
    compression_type='snappy',  # Reduce network bandwidth
    batch_size=16384,  # 16KB batches
    linger_ms=5  # Wait up to 5ms to batch messages
)

# Publish an event
event = {
    'event_id': 'evt_12345',
    'user_id': 'usr_67890',
    'event_type': 'page_view',
    'properties': {
        'page': '/products/laptop',
        'referrer': 'google.com'
    },
    'timestamp': datetime.utcnow().isoformat()
}

future = producer.send(
    topic='user-events',
    key='usr_67890',  # Same key = same partition = order preserved
    value=event
)

# Handle success/failure
def on_send_success(metadata):
    print(f"Sent to partition {metadata.partition}, offset {metadata.offset}")

def on_send_error(ex):
    print(f"Failed to send: {ex}")

future.add_callback(on_send_success)
future.add_errback(on_send_error)
producer.flush()

Consumers: Processing Events

Consumers subscribe to topics and process the published records:

from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    'user-events',
    bootstrap_servers=['kafka-1:9092', 'kafka-2:9092'],
    auto_offset_reset='latest',  # or 'earliest'
    enable_auto_commit=True,
    auto_commit_interval_ms=1000,
    group_id='analytics-consumer-group',
    consumer_timeout_ms=1000,
    value_deserializer=lambda v: json.loads(v.decode('utf-8')),
    key_deserializer=lambda k: k.decode('utf-8') if k else None,
    max_poll_records=500,
    session_timeout_ms=30000,
    heartbeat_interval_ms=10000
)

def process_event(event):
    # Your business logic here
    print(f"Processing: {event['event_id']}")

try:
    for message in consumer:
        try:
            process_event(message.value)
            # Auto-commit handles offset tracking
        except Exception as e:
            print(f"Error processing message: {e}")
            # Message will be reprocessed on next poll
finally:
    consumer.close()

Topics and Partitions: Organizing Data

Topics in Kafka are similar to tables in a database—they categorize your data. But here’s where Kafka gets powerful: partitions.

Topic: user-events
├── Partition 0 (Leader: Broker 1)
│   ├── Offset 0: {user_id: "A", event: "..."}
│   ├── Offset 1: {user_id: "D", event: "..."}
│   └── Offset 2: {user_id: "A", event: "..."}
├── Partition 1 (Leader: Broker 2)
│   ├── Offset 0: {user_id: "B", event: "..."}
│   └── Offset 1: {user_id: "E", event: "..."}
└── Partition 2 (Leader: Broker 3)
    ├── Offset 0: {user_id: "C", event: "..."}
    └── Offset 1: {user_id: "F", event: "..."}

Key partition insights:

  • Ordering is guaranteed within a partition, not across partitions (see the sketch below)
  • More partitions = more parallelism (one consumer per partition max)
  • Partitions are immutable—data is appended, never modified
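
To see the first insight in action, here is a minimal sketch (assuming the cluster and the user-events topic from the examples above): send a few messages with the same key and print where they land. All of them hit the same partition, and their offsets increase monotonically.

from kafka import KafkaProducer
import json

producer = KafkaProducer(
    bootstrap_servers=['kafka-1:9092'],
    key_serializer=lambda k: k.encode('utf-8'),
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# Three messages with the same key land on the same partition, in order
for i in range(3):
    metadata = producer.send('user-events', key='usr_67890', value={'seq': i}).get(timeout=10)
    print(f"message {i} -> partition {metadata.partition}, offset {metadata.offset}")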

Partitioning Strategy

# Custom partitioner for even distribution.
# kafka-python accepts any callable with this signature; the key arrives already serialized (bytes).

class UserIdPartitioner:
    def __init__(self):
        self.call_count = 0

    def __call__(self, key, all_partitions, available_partitions):
        if key is None:
            # Round-robin for keyless messages
            self.call_count += 1
            return all_partitions[self.call_count % len(all_partitions)]
        # Hash-based partitioning so the same key always maps to the same partition.
        # Use a stable hash (e.g. murmur2) in production; Python's hash() of bytes
        # is randomized per process.
        return all_partitions[hash(key) % len(all_partitions)]

producer = KafkaProducer(
    bootstrap_servers=['kafka-1:9092'],
    partitioner=UserIdPartitioner()
)

Brokers and Clusters: The Infrastructure

A Kafka cluster consists of multiple brokers (servers). Here’s my recommended setup for local development:

# docker-compose.yml for development
version: '3.8'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka-1:
    image: confluentinc/cp-kafka:7.5.0
    depends_on: [zookeeper]
    ports: ["9092:9092"]
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-1:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_MIN_INSYNC_REPLICAS: 2
      KAFKA_NUM_PARTITIONS: 3

  kafka-2:
    image: confluentinc/cp-kafka:7.5.0
    depends_on: [zookeeper]
    ports: ["9093:9093"]
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-2:9093

  kafka-3:
    image: confluentinc/cp-kafka:7.5.0
    depends_on: [zookeeper]
    ports: ["9094:9094"]
    environment:
      KAFKA_BROKER_ID: 3
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-3:9094

Architecture Patterns with Kafka

Event Sourcing

Store state as a sequence of events rather than current state:

# Event store pattern
class EventStore:
    def __init__(self, kafka_producer):
        self.producer = kafka_producer
        self._versions = {}  # stream_id -> last version (use a durable store in production)

    def append(self, stream_id, event_type, payload):
        event = {
            'stream_id': stream_id,
            'event_type': event_type,
            'payload': payload,
            'timestamp': datetime.utcnow().isoformat(),
            'version': self._get_next_version(stream_id)
        }
        self.producer.send('event-store', key=stream_id, value=event)
        return event

    def _get_next_version(self, stream_id):
        self._versions[stream_id] = self._versions.get(stream_id, 0) + 1
        return self._versions[stream_id]

# Rebuilding state from events
def rebuild_state(stream_id):
    consumer = KafkaConsumer(
        'event-store',
        bootstrap_servers=['kafka-1:9092'],
        auto_offset_reset='earliest',
        group_id=f'rebuild-{stream_id}',
        consumer_timeout_ms=5000,  # stop iterating once the log is drained
        value_deserializer=lambda v: json.loads(v.decode('utf-8'))
    )

    state = {}
    for message in consumer:
        event = message.value
        if event['stream_id'] == stream_id:
            state = apply_event(state, event)  # apply_event: your reducer that folds one event into state
    return state

CQRS (Command Query Responsibility Segregation)

Separate write and read models for scalability:

┌─────────────┐     ┌──────────────┐     ┌─────────────┐
│   Write     │ ──> │   Kafka      │ ──> │   Read      │
│   Service   │     │   Events     │     │   Models    │
│   (Command) │     │              │     │   (Query)   │
└─────────────┘     └──────────────┘     └─────────────┘


                    ┌──────────────┐
                    │  Projections │
                    │  - UserView  │
                    │  - OrderView │
                    └──────────────┘
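
On the query side, a projection is just a consumer that folds events into a view optimized for reads. Here is a minimal sketch (the event names and view shape are hypothetical) that builds an in-memory user view from the event-store topic used above:

from kafka import KafkaConsumer
import json

user_view = {}  # read model: stream_id -> summary used by queries

consumer = KafkaConsumer(
    'event-store',
    bootstrap_servers=['kafka-1:9092'],
    group_id='user-view-projection',
    auto_offset_reset='earliest',
    value_deserializer=lambda v: json.loads(v.decode('utf-8'))
)

for message in consumer:
    event = message.value
    view = user_view.setdefault(event['stream_id'], {'orders': 0})
    if event['event_type'] == 'UserRegistered':      # hypothetical event type
        view.update(event['payload'])                 # e.g. name, email
    elif event['event_type'] == 'OrderPlaced':        # hypothetical event type
        view['orders'] += 1
    # Queries read from user_view (or the database it is persisted to),
    # never from the write-side service.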

Stream Processing with Kafka Streams

Kafka Streams is the stream-processing library that ships with Apache Kafka itself, but it is a JVM (Java/Scala) API; there is no official Python port (Python-first libraries such as Faust offer similar semantics). Here is the same pipeline sketched with the Java DSL, assuming a JSON serde for JsonNode values is configured as the default value serde:

import com.fasterxml.jackson.databind.JsonNode;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.WindowStore;
import java.time.Duration;
import java.util.Properties;

public class RevenueProcessor {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();

        // Input stream: user-events, keyed by user_id, JSON values
        KStream<String, JsonNode> events = builder.stream("user-events");

        // Filter purchases and keep only the amount
        KStream<String, Double> purchases = events
            .filter((userId, event) -> "purchase".equals(event.get("event_type").asText()))
            .mapValues(event -> event.get("amount").asDouble());

        // Hourly revenue per user, materialized in a local state store
        KTable<Windowed<String>, Double> revenueByUser = purchases
            .groupByKey(Grouped.with(Serdes.String(), Serdes.Double()))
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofHours(1)))
            .aggregate(
                () -> 0.0,                                  // initializer
                (userId, amount, total) -> total + amount,  // aggregator
                Materialized.<String, Double, WindowStore<Bytes, byte[]>>as("revenue-store")
            );

        // Write results to an output topic
        revenueByUser.toStream()
            .map((windowedKey, total) -> KeyValue.pair(windowedKey.key(), total))
            .to("hourly-revenue", Produced.with(Serdes.String(), Serdes.Double()));

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "revenue-processor");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092,kafka-2:9092");
        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);
        // default key/value serde configuration (String key, JSON value) omitted for brevity

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
}

Schema Registry: Managing Data Contracts

Using Avro with Schema Registry ensures data compatibility:

from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer

# Define schema
value_schema_str = """
{
  "namespace": "com.example.events",
  "type": "record",
  "name": "UserEvent",
  "fields": [
    {"name": "event_id", "type": "string"},
    {"name": "user_id", "type": "string"},
    {"name": "event_type", "type": "string"},
    {"name": "timestamp", "type": "long"},
    {"name": "properties", "type": ["null", "string"], "default": null}
  ]
}
"""

value_schema = avro.loads(value_schema_str)

producer = AvroProducer({
    'bootstrap.servers': 'kafka-1:9092',
    'schema.registry.url': 'http://schema-registry:8081'
}, default_value_schema=value_schema)

producer.produce(
    topic='user-events',
    value={
        'event_id': 'evt_123',
        'user_id': 'usr_456',
        'event_type': 'login',
        'timestamp': 1234567890,
        'properties': None  # optional field; pass it explicitly so the record matches the writer schema
    }
)
producer.flush()
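
The compatibility guarantee comes from the registry itself: each subject has a compatibility mode, and new schema versions are checked against it before they are accepted. A sketch using the registry's REST API (the registry URL follows the example above, and user-events-value is the default subject name for the topic's value schema):

import requests

REGISTRY = 'http://schema-registry:8081'
SUBJECT = 'user-events-value'
HEADERS = {'Content-Type': 'application/vnd.schemaregistry.v1+json'}

# Enforce backward compatibility: new schemas must be readable by existing consumers
requests.put(f'{REGISTRY}/config/{SUBJECT}', headers=HEADERS,
             json={'compatibility': 'BACKWARD'}).raise_for_status()

# Test a candidate schema (e.g. one with a new optional field) before rolling it out
resp = requests.post(
    f'{REGISTRY}/compatibility/subjects/{SUBJECT}/versions/latest',
    headers=HEADERS,
    json={'schema': value_schema_str}  # value_schema_str from the producer example above
)
print(resp.json())  # {'is_compatible': True} if the change is safe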

Production Best Practices

1. Topic Configuration

from kafka.admin import KafkaAdminClient, NewTopic

admin_client = KafkaAdminClient(bootstrap_servers='kafka-1:9092')

topics = [
    NewTopic(
        name='user-events',
        num_partitions=12,
        replication_factor=3,
        topic_configs={
            'retention.ms': '604800000',  # 7 days
            'segment.bytes': '1073741824',  # 1GB segments
            'min.insync.replicas': '2',
            'compression.type': 'snappy'
        }
    )
]

admin_client.create_topics(topics)

2. Monitoring Consumer Lag

from kafka import KafkaConsumer, TopicPartition

def check_consumer_lag():
    # Attach to the group whose lag we want to inspect (we only read offsets, never poll)
    consumer = KafkaConsumer(
        bootstrap_servers='kafka-1:9092',
        group_id='analytics-consumer-group',
        enable_auto_commit=False
    )

    # Get all partitions for the topic
    partitions = consumer.partitions_for_topic('user-events')
    topic_partitions = [TopicPartition('user-events', p) for p in partitions]
    consumer.assign(topic_partitions)

    # Get end offsets (latest position on the brokers)
    end_offsets = consumer.end_offsets(topic_partitions)

    # Lag = latest offset minus the group's committed offset
    for tp in topic_partitions:
        committed = consumer.committed(tp) or 0
        lag = end_offsets[tp] - committed
        if lag > 10000:
            send_alert(f"High lag on partition {tp.partition}: {lag}")  # send_alert: your alerting hook

3. Handling Poison Pills

from kafka import KafkaConsumer, KafkaProducer
import json
import logging

log = logging.getLogger(__name__)

DLQ_TOPIC = 'dead-letter-queue'

def process_with_dlq():
    consumer = KafkaConsumer(
        'user-events',
        bootstrap_servers=['kafka-1:9092'],
        group_id='main-processor',
        enable_auto_commit=False,  # Manual commit for control
        value_deserializer=lambda v: json.loads(v.decode('utf-8'))
    )
    producer = KafkaProducer(
        bootstrap_servers=['kafka-1:9092'],
        value_serializer=lambda v: json.dumps(v).encode('utf-8')
    )

    for message in consumer:
        try:
            process_event(message.value)
            consumer.commit()
        except PoisonPillError as e:  # PoisonPillError: your "permanently unprocessable" exception
            # Send to DLQ and commit to avoid an infinite reprocessing loop
            producer.send(DLQ_TOPIC, value={
                'original_message': message.value,
                'error': str(e),
                'topic': message.topic,
                'partition': message.partition,
                'offset': message.offset
            })
            consumer.commit()
            log.error(f"Sent poison pill to DLQ: {message.offset}")
        except Exception as e:
            log.error(f"Processing error: {e}")
            # Don't commit - the message is reprocessed after a restart or rebalance
            # (seek back to message.offset to retry immediately)

4. Exactly-Once Semantics

The transactional API is most complete in the confluent-kafka client, so this sketch uses it. The pattern is consume, transform, produce, and commit the consumer offsets inside the same transaction:

from confluent_kafka import Producer, Consumer

# Transactional producer for an exactly-once consume-transform-produce loop
producer = Producer({
    'bootstrap.servers': 'kafka-1:9092',
    'transactional.id': 'my-transactional-producer',  # stable ID fences zombie producers
    'acks': 'all'
})

consumer = Consumer({
    'bootstrap.servers': 'kafka-1:9092',
    'group.id': 'eos-processor',
    'enable.auto.commit': False,
    'isolation.level': 'read_committed'  # only read data from committed transactions
})
consumer.subscribe(['user-events'])

producer.init_transactions()

while True:
    msg = consumer.poll(1.0)
    if msg is None or msg.error():
        continue
    try:
        producer.begin_transaction()

        # Multiple produces in a single transaction
        producer.produce('topic-1', value=msg.value())
        producer.produce('topic-2', value=msg.value())

        # Commit the consumed offsets atomically with the produced records
        producer.send_offsets_to_transaction(
            consumer.position(consumer.assignment()),
            consumer.consumer_group_metadata()
        )

        producer.commit_transaction()
    except Exception:
        producer.abort_transaction()
        raise

Performance Tuning Guide

Setting              Default     Production   Impact
batch.size           16384       65536        Larger batches = better throughput
linger.ms            0           5-20         Wait to batch more messages
compression.type     none        snappy/lz4   50-70% size reduction
buffer.memory        33554432    134217728    More buffering for bursts
num.io.threads       8           16+          More I/O parallelism
num.network.threads  3           8+           Handle more connections
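
Applied to the kafka-python producer from earlier, the client-side settings above look like this (num.io.threads and num.network.threads are broker settings and belong in server.properties, not in the client):

from kafka import KafkaProducer
import json

producer = KafkaProducer(
    bootstrap_servers=['kafka-1:9092', 'kafka-2:9092', 'kafka-3:9092'],
    batch_size=65536,           # 64KB batches for throughput
    linger_ms=20,               # wait up to 20ms to fill a batch
    compression_type='lz4',     # or 'snappy'
    buffer_memory=134217728,    # 128MB of buffering for bursts
    acks='all',
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)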

Common Pitfalls and Solutions

Problem: Consumer Rebalancing Storm

# Solution: Increase session timeout
consumer = KafkaConsumer(
    group_id='my-group',
    session_timeout_ms=45000,  # Default is 10000
    heartbeat_interval_ms=15000,  # Default is 3000
    max_poll_interval_ms=300000  # Give more time for processing
)

Problem: Message Ordering Issues

# Solution: Use same key for related messages
# All events for user_123 go to same partition
producer.send('events', key='user_123', value=event1)
producer.send('events', key='user_123', value=event2)

Problem: High Consumer Lag

# Solutions:
# 1. Increase partitions (possible in place, but keyed data then maps to new partitions)
# 2. Add more consumers to the group, up to one per partition (sketched below)
# 3. Optimize processing logic
# 4. Increase max_poll_records
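
A minimal sketch of solution 2: run several consumers in the same group, one per process, and Kafka spreads the topic's partitions across them (process_event is the handler defined earlier):

from multiprocessing import Process
from kafka import KafkaConsumer
import json

def run_worker(worker_id):
    consumer = KafkaConsumer(
        'user-events',
        bootstrap_servers=['kafka-1:9092'],
        group_id='analytics-consumer-group',  # same group => partitions are shared out
        max_poll_records=1000,                # larger batches per poll
        value_deserializer=lambda v: json.loads(v.decode('utf-8'))
    )
    for message in consumer:
        process_event(message.value)

if __name__ == '__main__':
    # No benefit to running more workers than the topic has partitions
    workers = [Process(target=run_worker, args=(i,)) for i in range(4)]
    for w in workers:
        w.start()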

Key Takeaways

Kafka is powerful but requires understanding of:

  1. Partitioning strategy for parallelism and ordering
  2. Consumer group management for scaling
  3. Schema evolution for data contracts
  4. Monitoring for lag, throughput, and errors
  5. Exactly-once semantics when data integrity is critical

Master these concepts, and you’ll be equipped to build robust, real-time data platforms that scale to millions of events per second.


Questions about Kafka implementation? Reach out through the contact page or connect on LinkedIn.

MD Furkanul Islam

Data Engineer & AI/ML Specialist

9+ years building intelligent data systems at scale. Passionate about bridging the gap between data engineering, AI, and robotics.