YC Medical

Streaming Foundations II: Python Consumers and PostgreSQL

Goal: Write a Python consumer to read and deserialize Kafka messages, set up a target PostgreSQL database, and use psycopg2 to persist streaming events in real time, while exploring the bottlenecks of native consumption architectures.


1. The Native Python Consumer

In the previous post, we wrote 1000 taxi ride records into Redpanda’s rides topic. On the broker, those records exist only as raw byte streams.

Now, we need to write a Python script to consume and interpret those bytes.

Configuring the Deserializer

A consumer doesn’t just pull data from Kafka; it also needs to reconstruct the raw JSON bytes back into our expected Python data structure — the Ride dataclass.

This process is the exact inverse of production:

  1. Receive UTF-8 encoded bytes and decode them into a string.
  2. Use json.loads to turn the JSON string into a dictionary.
  3. Unpack the dictionary to instantiate a Ride object.
import json
from dataclasses import dataclass

@dataclass
class Ride:
    PULocationID: int
    DOLocationID: int
    trip_distance: float
    total_amount: float
    tpep_pickup_datetime: int  # epoch milliseconds

def ride_deserializer(data):
    json_str = data.decode('utf-8')
    ride_dict = json.loads(json_str)
    return Ride(**ride_dict)  # ** kwargs mapping to instantiate the object

We pass ride_deserializer as the value_deserializer. By doing this, every message.value yielded by the consumer loop is automatically converted into a Ride instance.
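To see the round trip concretely, here is a self-contained sanity check that re-states the dataclass and deserializer from above and feeds them a hand-crafted payload (the sample values are made up for illustration):

```python
import json
from dataclasses import dataclass

@dataclass
class Ride:
    PULocationID: int
    DOLocationID: int
    trip_distance: float
    total_amount: float
    tpep_pickup_datetime: int  # epoch milliseconds

def ride_deserializer(data):
    return Ride(**json.loads(data.decode('utf-8')))

# What the producer puts on the wire: UTF-8 encoded JSON bytes
raw = json.dumps({
    'PULocationID': 142, 'DOLocationID': 236,
    'trip_distance': 1.9, 'total_amount': 14.3,
    'tpep_pickup_datetime': 1674000000000,
}).encode('utf-8')

ride = ride_deserializer(raw)
assert ride.PULocationID == 142 and ride.trip_distance == 1.9
```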

Connecting and Listening to Kafka

from kafka import KafkaConsumer

server = 'localhost:9092'
topic_name = 'rides'

# Instantiate the Consumer
consumer = KafkaConsumer(
    topic_name,
    bootstrap_servers=[server],
    auto_offset_reset='earliest', # Crucial
    group_id='rides-console',     # Crucial
    value_deserializer=ride_deserializer
)

print(f"Listening to {topic_name}...")

Key Parameter Breakdown:

  • auto_offset_reset='earliest': Determines where to start reading when the consumer group has no committed offset (for example, the first time it connects). earliest means reading from the absolute beginning of the topic; the default, latest, means only picking up new messages that arrive after the script boots up.
  • group_id='rides-console': This is the consumer group identifier. Kafka relies on group IDs to independently track and manage read offsets for every unique group. If you restart a script with the same group_id, it resumes right where it left off. If you change the group_id (and hit the earliest reset flag), it will reread the entire topic from scratch.
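The interplay between these two settings can be sketched with a toy offset store (purely illustrative, not Kafka's real API): the broker keeps one committed offset per consumer group, and auto_offset_reset only kicks in when no such offset exists.

```python
# Toy model of consumer-group offset semantics (not the real Kafka API):
# the broker stores one committed offset per (group_id, topic).
committed = {}  # (group_id, topic) -> next offset to read

def start_position(group_id, topic, log_end, auto_offset_reset='latest'):
    """Where a consumer begins reading when it (re)connects."""
    key = (group_id, topic)
    if key in committed:
        return committed[key]          # resume where the group left off
    return 0 if auto_offset_reset == 'earliest' else log_end

log_end = 1000  # 1000 messages already sit in the topic

# First run of 'rides-console' with earliest: read from the very beginning.
assert start_position('rides-console', 'rides', log_end, 'earliest') == 0
committed[('rides-console', 'rides')] = 1000  # group processed everything

# Restart with the same group_id: resumes at the committed offset.
assert start_position('rides-console', 'rides', log_end, 'earliest') == 1000

# New group_id with earliest: rereads the whole topic from offset 0.
assert start_position('rides-v2', 'rides', log_end, 'earliest') == 0
```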

2. Writing Stream Data to a Relational Database

Printing to the console is great for debugging, but in reality, streaming pipelines usually feed data into systems like PostgreSQL for business querying and dashboards.

Since our script acts as a direct passthrough pipe, we have to wire everything manually. First, spin up the database via Docker:

  postgres:
    image: postgres:18
    restart: on-failure
    environment:
      - POSTGRES_DB=postgres
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
    ports:
      - "5432:5432"

Drop into the Postgres environment and create the event storage table. Because native Python doesn’t provide automatic stream-to-table mappings, the columns must match our schema exactly:

CREATE TABLE processed_events (
    PULocationID INTEGER,
    DOLocationID INTEGER,
    trip_distance DOUBLE PRECISION,
    total_amount DOUBLE PRECISION,
    pickup_datetime TIMESTAMP
);
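Note the type mismatch we have to bridge: tpep_pickup_datetime arrives as epoch milliseconds, while pickup_datetime is a TIMESTAMP column. The conversion is a division by 1000 plus datetime.fromtimestamp; shown here with an explicit UTC zone so the result is deterministic (the consumer script below uses the local-time variant):

```python
from datetime import datetime, timezone

# Epoch milliseconds -> datetime for the TIMESTAMP column
ms = 1_672_531_200_000  # hypothetical pickup time
dt = datetime.fromtimestamp(ms / 1000, tz=timezone.utc)
assert dt.isoformat() == '2023-01-01T00:00:00+00:00'
```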

Persisting Records

We extend our script with the Postgres driver (install the psycopg2-binary package; the import name is psycopg2):

import psycopg2
from datetime import datetime

# Enable autocommit; no need to manually conn.commit() in batches
conn = psycopg2.connect(
    host='localhost', port=5432, database='postgres',
    user='postgres', password='postgres'
)
conn.autocommit = True
cur = conn.cursor()

# Consumer blocks forever on the endless stream; stop with Ctrl+C
count = 0
try:
    for message in consumer:
        ride = message.value
        # Convert epoch milliseconds back to datetime for the timestamp column
        pickup_dt = datetime.fromtimestamp(ride.tpep_pickup_datetime / 1000)

        # Insert row by row
        cur.execute(
            """INSERT INTO processed_events
               (PULocationID, DOLocationID, trip_distance, total_amount, pickup_datetime)
               VALUES (%s, %s, %s, %s, %s)""",
            (ride.PULocationID, ride.DOLocationID,
             ride.trip_distance, ride.total_amount, pickup_dt)
        )

        count += 1
        if count % 100 == 0:
            print(f"Inserted {count} rows...")
except KeyboardInterrupt:
    pass
finally:
    # Graceful shutdown: the loop never exits on its own
    consumer.close()
    cur.close()
    conn.close()
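A word on the row-by-row cur.execute: one round trip per message is a classic throughput bottleneck. A common mitigation is micro-batching, i.e. buffering rows and flushing them in bulk (with psycopg2 you might flush via psycopg2.extras.execute_values; here the flush function is a stand-in so the buffering logic itself is visible):

```python
# Minimal micro-batching sketch: buffer rows, flush every batch_size rows.
# `flush` is a stand-in for a real bulk insert.
def make_batcher(flush, batch_size=100):
    buffer = []
    def add(row):
        buffer.append(row)
        if len(buffer) >= batch_size:
            flush(list(buffer))
            buffer.clear()
    def close():
        if buffer:  # flush any trailing partial batch on shutdown
            flush(list(buffer))
            buffer.clear()
    return add, close

# Usage with a fake flush that just collects the batches:
flushed = []
add, close = make_batcher(flushed.append, batch_size=100)
for i in range(250):
    add((i, i))
close()
# 250 rows -> two full batches of 100 plus a final batch of 50
assert [len(b) for b in flushed] == [100, 100, 50]
```

The trade-off: larger batches mean fewer round trips but more messages lost or replayed if the process dies mid-batch.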

And just like that, we’ve built a complete end-to-end streaming pipeline: Python Taxi Script -> Redpanda Broker -> Python Postgres Script -> PostgreSQL Persistence.


3. Why Native Python Consumers Aren’t Enough for Production

The infrastructure is functional. We moved data from the sender to the database in milliseconds. However, deploying this in the real world introduces massive engineering pitfalls. Let’s look at the limitations:

  1. Manual State and Offset Management (Checkpoints): What if the Python Postgres service crashes halfway through a chunk? Can we rely solely on Kafka’s underlying offsets to recover? What if it crashes in the instant after successfully writing to Postgres but before acknowledging Kafka? That gap makes exactly-once processing extremely painful; you’d have to build robust retry logic and idempotency checks manually.
  2. Windowing and Aggregations: What if you now need the “Total Revenue Per Hour”? And what if, at 10 AM, you suddenly receive delayed messages generated at 8 AM inside a tunnel? Event-time windows and late-data handling would all have to be built by hand.
  3. Cluttered Connector Code: You are forced to spend enormous amounts of energy writing low-level networking and serialization boilerplate to communicate with Postgres, S3, or Elasticsearch.
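To make pitfall 1 concrete, here is a toy sketch (illustrative only) of the idempotency check you would have to hand-roll: deduplicate writes on the message's (partition, offset) so that a replay after a crash doesn't double-insert. In a real system this set would live in the database itself, e.g. as a unique constraint.

```python
# Toy at-least-once + idempotency sketch: remember which (partition, offset)
# pairs were already persisted, so crash-driven replays are skipped.
seen = set()
rows = []

def persist(partition, offset, row):
    key = (partition, offset)
    if key in seen:  # duplicate delivery after a crash or rebalance
        return False
    rows.append(row)
    seen.add(key)
    return True

assert persist(0, 41, 'ride-a') is True
assert persist(0, 42, 'ride-b') is True
# Kafka redelivers offset 42 after an unacknowledged crash: write is skipped.
assert persist(0, 42, 'ride-b') is False
assert rows == ['ride-a', 'ride-b']
```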

This is exactly why nobody uses raw Python loop scripts to build large-scale streaming pipelines. In the next post, we introduce the professional heavy-hitter, Apache Flink, and use it to elegantly replace this fragile native consumer code.