Machine Learning DEC 28, 2024 8 min read

Building Production ML Pipelines: Lessons from Processing 100K Messages Daily

The 3AM Wake-Up Call

It started with a Slack message at 2:47 AM. Our Kafka-based ML pipeline had crashed, and 47,000 messages were backing up in the queue. By the time I got to my laptop, it was closer to 100K. The incident report would later show that a simple memory leak in our message consumer—something that should’ve been caught in staging—was causing the entire system to grind to a halt every 6 hours.

That was three years into running production ML infrastructure at scale. It taught me something valuable: the difference between a system that works and a system that doesn’t isn’t usually complexity or architecture choices. It’s the unglamorous details. The monitoring. The incremental improvements. The lessons nobody wants to write papers about.

This is what actually works at 100K messages per day.


The Setup We Inherited (And Had To Fix)

When I took over the ML infrastructure team, we were running a Kafka cluster feeding into a consumer group that processed messages through a TensorFlow model. Simple enough on paper. The system handled about 100K messages daily, split across customer requests that ranged from 10 to 1000+ instances per batch.

Our latency was all over the place—p50 was 200ms, which was fine, but p99 was hitting 8+ seconds. Customers were complaining about timeouts on the slower predictions. And our error handling? Let’s just say “retry and hope” isn’t a strategy that scales.

Here’s what a basic consumer looked like:

from kafka import KafkaConsumer
import json

# `model` is the preloaded TensorFlow model (loaded elsewhere at startup)
consumer = KafkaConsumer(
    'ml-predictions',
    bootstrap_servers=['kafka:9092'],
    group_id='ml-processor-v1',
)

# One message at a time: every record costs a full model forward pass
for message in consumer:
    data = json.loads(message.value)
    prediction = model.predict(data['features'])
    # Send result somewhere...

This works until it doesn’t. And it doesn’t at scale.


Lesson 1: Batch Processing Beats Single-Message Processing

The biggest win came from a deceptively simple change: instead of processing messages one at a time, we started batching them.

Our model was actually built to handle batch inference—it was way more efficient that way—but we were feeding it single instances. This meant every message triggered a full model forward pass, wasting compute and killing our throughput.

from kafka import KafkaConsumer
import json
import time

consumer = KafkaConsumer(
    'ml-predictions',
    bootstrap_servers=['kafka:9092'],
    group_id='ml-processor-v1',
    max_poll_records=100,        # Important: pull multiple messages per poll
    session_timeout_ms=30000,
    enable_auto_commit=False,    # we commit manually, after results are sent
)

def process_batch(messages, batch_timeout=0.5):
    """Process a batch with a timeout fallback."""
    batch = []
    features_list = []
    metadata = []

    start_time = time.time()

    for message in messages:
        if time.time() - start_time > batch_timeout:
            break

        try:
            data = json.loads(message.value)
            batch.append(message)
            features_list.append(data['features'])
            metadata.append({'message': message, 'request_id': data.get('id')})
        except json.JSONDecodeError as e:
            # Dead letter queue handling (critical!)
            send_to_dlq(message, 'invalid_json', e)

    if not batch:
        return []

    # Single batch inference call
    predictions = model.predict(features_list)

    # Map results back to original messages
    results = []
    for pred, meta in zip(predictions, metadata):
        results.append({
            'request_id': meta['request_id'],
            'prediction': pred.tolist(),
            'message': meta['message']
        })

    return results

while True:
    # poll() hands back {TopicPartition: [messages]}; iterating the consumer
    # directly would give us one message at a time and defeat the batching
    records = consumer.poll(timeout_ms=500)
    if not records:
        continue
    messages = [msg for msgs in records.values() for msg in msgs]
    results = process_batch(messages)
    send_results(results)
    consumer.commit()

The difference? Processing time dropped by 65%. Our sustainable throughput went from ~15K messages/day to 100K+ because we were finally using the hardware efficiently. The p99 latency dropped from 8 seconds to about 1.2 seconds.


Lesson 2: Monitoring Latency Means Understanding Latency

We had metrics, but they were useless. We tracked average latency (misleading) and error rates (only useful if they’re non-zero), but nothing told us where time was actually being spent.

I added percentile tracking across the pipeline stages:

import json
import time
from dataclasses import dataclass
from typing import Dict, List, Tuple

@dataclass
class LatencyBreakdown:
    kafka_fetch_ms: float
    preprocessing_ms: float
    model_inference_ms: float
    postprocessing_ms: float
    result_sending_ms: float

    @property
    def total_ms(self) -> float:
        return (self.kafka_fetch_ms + self.preprocessing_ms +
                self.model_inference_ms + self.postprocessing_ms +
                self.result_sending_ms)

def process_batch_with_timing(batch: List) -> Tuple[List[Dict], LatencyBreakdown]:
    """Process batch while tracking timing at each stage."""
    timings = LatencyBreakdown(0, 0, 0, 0, 0)

    # Stage 1: Kafka fetch happens inside consumer.poll(); time that call
    # in the outer loop if you want it in the breakdown
    timings.kafka_fetch_ms = 0.0

    # Stage 2: Preprocessing
    start = time.perf_counter()
    features_list = []
    for msg in batch:
        data = json.loads(msg.value)
        features_list.append(preprocess(data['features']))
    timings.preprocessing_ms = (time.perf_counter() - start) * 1000

    # Stage 3: Model inference
    start = time.perf_counter()
    predictions = model.predict(features_list)
    timings.model_inference_ms = (time.perf_counter() - start) * 1000

    # Stage 4: Postprocessing
    start = time.perf_counter()
    results = [format_result(pred) for pred in predictions]
    timings.postprocessing_ms = (time.perf_counter() - start) * 1000

    # Stage 5: Send results
    start = time.perf_counter()
    send_batch_results(results)
    timings.result_sending_ms = (time.perf_counter() - start) * 1000

    return results, timings

# In your monitoring (the metrics backend computes p50/p99 across many
# observations; we just emit per-batch stage timings):
results, timings = process_batch_with_timing(batch)
log_metrics({
    'latency_total_ms': timings.total_ms,
    'stage_model_inference_ms': timings.model_inference_ms,
    'stage_preprocessing_ms': timings.preprocessing_ms,
})

This was eye-opening. The model inference wasn’t the bottleneck—it was preprocessing. We were doing feature normalization that should’ve been cached. That fix alone cut p99 by another 200ms.
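
The caching fix itself was boring. Roughly: load the normalization statistics once per process instead of rederiving them for every message. Here's a minimal sketch of the idea, with a hypothetical feature_stats.json artifact standing in for wherever your training pipeline writes those statistics:

import json
import numpy as np
from functools import lru_cache

@lru_cache(maxsize=1)
def load_feature_stats():
    """Load precomputed per-feature means/stds once per process."""
    with open('feature_stats.json') as f:  # artifact written at training time (hypothetical path)
        stats = json.load(f)
    return np.array(stats['mean']), np.array(stats['std'])

def preprocess(features):
    """Normalize with cached statistics instead of recomputing them per message."""
    mean, std = load_feature_stats()
    return (np.asarray(features, dtype=np.float32) - mean) / std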


Lesson 3: Dead Letter Queues Are Non-Negotiable

We lost data. Not a ton, but enough that customers noticed. Messages with malformed JSON or unexpected feature dimensions would crash the consumer, get retried infinitely, and eventually just disappear.

Every message that fails needs to go somewhere you can debug it:

def send_to_dlq(message, reason: str, error: Exception = None):
    """Send failed messages to a dead letter queue for debugging."""
    dlq_producer.send('ml-predictions-dlq',
        value=json.dumps({
            'original_message': message.value.decode('utf-8'),
            'offset': message.offset,
            'partition': message.partition,
            'timestamp': message.timestamp,
            'failure_reason': reason,
            'error_message': str(error) if error else None,
        }).encode('utf-8')
    )

# Usage (inside the consumer's per-message loop):
try:
    data = json.loads(message.value)
    features = extract_features(data)
except json.JSONDecodeError as e:
    send_to_dlq(message, 'invalid_json', e)
    continue
except FeatureExtractionError as e:
    send_to_dlq(message, 'feature_extraction_failed', e)
    continue

Having that DLQ meant we could identify problematic customers (one was sending NaN values), fix data pipelines upstream, and actually learn from failures instead of just retrying blindly.
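
Inspecting the DLQ doesn't need special tooling, either; another consumer is enough. A rough sketch (reusing the topic name and payload layout from the producer above) that tallies failure reasons so you can see what dominates:

from collections import Counter
from kafka import KafkaConsumer
import json

dlq_consumer = KafkaConsumer(
    'ml-predictions-dlq',
    bootstrap_servers=['kafka:9092'],
    auto_offset_reset='earliest',  # read the DLQ from the beginning
    consumer_timeout_ms=5000,      # stop iterating once the topic is drained
)

reason_counts = Counter()
for record in dlq_consumer:
    entry = json.loads(record.value)
    reason_counts[entry['failure_reason']] += 1

print(reason_counts.most_common(10))

Grouping on fields parsed out of original_message gets you from failure reasons to a per-customer view.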


Lesson 4: Graceful Degradation Under Load

The pipeline was fragile. If the model inference slowed down even slightly, the consumer would fall behind, memory would spike, and everything would crash.

We implemented request timeouts and a fallback mechanism:

from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeoutError

class RobustMLConsumer:
    def __init__(self, model, fallback_model, timeout_seconds=2.0):
        self.model = model
        self.fallback_model = fallback_model  # Lighter, faster model
        self.timeout = timeout_seconds
        self.executor = ThreadPoolExecutor(max_workers=1)

    def predict_with_fallback(self, features_batch):
        """Try the primary model, fall back to the lighter one on timeout or error.

        Note: a timed-out primary call keeps running in the worker thread,
        so keep the timeout generous enough that this stays rare.
        """
        try:
            future = self.executor.submit(self.model.predict, features_batch)
            return future.result(timeout=self.timeout)
        except FutureTimeoutError:
            print("Primary model timeout, using fallback")
            return self.fallback_model.predict(features_batch)
        except Exception as e:
            print(f"Model error: {e}, using fallback")
            return self.fallback_model.predict(features_batch)

The fallback wasn’t perfect, but 70% accuracy served immediately beats 95% accuracy that arrives too late. Customers would rather have a slightly worse prediction than a timeout error.
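
Wiring the wrapper into the batch path is a one-line change. A sketch, assuming the process_batch() from Lesson 1 and a fallback_model loaded alongside the primary one:

# Built once at startup
robust = RobustMLConsumer(model=model, fallback_model=fallback_model, timeout_seconds=2.0)

# Inside process_batch, replace the direct call:
#     predictions = model.predict(features_list)
# with:
predictions = robust.predict_with_fallback(features_list)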


Lesson 5: Commit Strategy Matters

We were losing messages. Not often, but our commit strategy was wrong—we committed offsets before actually sending results. If the consumer crashed between commit and send, those messages were gone.

# BAD: Commit before processing is complete
consumer.commit()
predictions = model.predict(batch)
send_results(predictions)  # If this fails, no retry

# GOOD: Process completely, then commit
predictions = model.predict(batch)
send_results(predictions)  # Retry logic here
consumer.commit()

# EVEN BETTER: Use idempotent result sending
def send_results_idempotent(results):
    """Key results by request ID so downstream consumers (or a compacted
    results topic) can deduplicate if a crash makes us send a batch twice."""
    for result in results:
        producer.send('results',
            key=result['request_id'].encode('utf-8'),
            value=json.dumps({
                'request_id': result['request_id'],
                'prediction': result['prediction'],
            }).encode('utf-8'),
        )

This sounds obvious in hindsight, but running with wrong assumptions about what “commit” means will haunt you at 3 AM.
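
For completeness, the "retry logic here" comment above hides the least glamorous part. A rough sketch of what we mean, using plain exponential backoff on top of the idempotent sender: if the send still fails after the retries, the exception propagates, the commit never happens, and the batch gets re-delivered instead of silently lost.

import time

def send_results_with_retry(results, attempts=3, base_delay=0.5):
    """Retry transient send failures with exponential backoff."""
    for attempt in range(attempts):
        try:
            send_results_idempotent(results)  # idempotent sender from above
            return
        except Exception:
            if attempt == attempts - 1:
                raise  # no commit happens, so the batch will be re-delivered
            time.sleep(base_delay * (2 ** attempt))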


What Actually Matters

Looking back at the three years of running this pipeline, the big wins weren’t architectural. They were:

  1. Batch processing → 65% latency reduction
  2. Understanding where time goes → 200ms p99 improvement
  3. Handling failures explicitly → 5 fewer incidents per month
  4. Graceful degradation → 99.8% uptime (was 96%)
  5. Correct commit semantics → Zero message loss

All of these are things you learn by shipping, breaking things, and being woken up at 3 AM. There’s no shortcut here.

The next time someone asks about building production ML systems, I don’t talk about fancy algorithms or cutting-edge architectures. I talk about batching, monitoring, and dead letter queues. Boring wins at scale.


Have experience with production ML pipelines? I’d love to hear what your 3 AM lessons were. Reach out on Twitter or LinkedIn.