Streaming Foundations II: Python Consumers and PostgreSQL
Goal: Write a Python consumer to read and deserialize Kafka messages. Set up a target PostgreSQL database and use psycopg2 to persist streaming events in real time, while exploring the bottlenecks of native consumption architectures.
1. The Native Python Consumer
In the previous post, we wrote 1000 taxi ride records into Redpanda’s rides topic. These data points are stored exclusively as raw byte streams on the broker.
Now, we need to write a Python script to consume and interpret those bytes.
Configuring the Deserializer
A consumer doesn’t just pull data from Kafka; it also needs to reconstruct the raw JSON bytes back into our expected Python data structure — the Ride dataclass.
This process is the exact inverse of production:
- Receive the UTF-8 encoded bytes and decode them into a string.
- Use `json.loads` to turn the JSON string into a dictionary.
- Unpack the dictionary to instantiate a `Ride` object.
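Putting the three steps together, a minimal sketch of the deserializer — the `Ride` fields here are an illustrative subset; use the actual dataclass from the previous post:

```python
import json
from dataclasses import dataclass, fields


@dataclass
class Ride:
    # Illustrative subset of the fields produced in the previous post.
    ride_id: str
    passenger_count: int
    total_amount: float


def ride_deserializer(raw: bytes) -> Ride:
    """Inverse of the producer's serializer: bytes -> str -> dict -> Ride."""
    payload = json.loads(raw.decode("utf-8"))
    # Keep only keys that match the dataclass fields, then unpack.
    names = {f.name for f in fields(Ride)}
    return Ride(**{k: v for k, v in payload.items() if k in names})
```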
We pass ride_deserializer as the value_deserializer. By doing this, every message.value yielded by the consumer loop is automatically converted into a Ride instance.
Connecting and Listening to Kafka
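A minimal consumer sketch using the kafka-python library — the broker address, topic name, and `Ride` fields follow the previous post's setup and are assumptions; adjust them to your environment:

```python
import json
from dataclasses import dataclass

from kafka import KafkaConsumer  # pip install kafka-python


@dataclass
class Ride:
    # Illustrative subset of the producer's schema.
    ride_id: str
    passenger_count: int
    total_amount: float


def ride_deserializer(raw: bytes) -> Ride:
    return Ride(**json.loads(raw.decode("utf-8")))


consumer = KafkaConsumer(
    "rides",
    bootstrap_servers="localhost:9092",  # Redpanda from the previous post
    auto_offset_reset="earliest",        # start from the beginning on first run
    group_id="rides-console",            # offsets are tracked per group ID
    value_deserializer=ride_deserializer,
)

for message in consumer:
    # message.value is already a Ride instance thanks to the deserializer.
    print(f"partition={message.partition} offset={message.offset} value={message.value}")
```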
Key Parameter Breakdown:
- `auto_offset_reset='earliest'`: Determines where to start reading when no committed offset exists for this group (e.g., on the first connection). `earliest` means reading from the absolute beginning of the topic; the other option, `latest` (the default), means only listening to new messages arriving after the script boots up.
- `group_id='rides-console'`: The consumer group identifier. Kafka relies on group IDs to independently track and manage read offsets for every unique group. If you restart a script with the same `group_id`, it resumes right where it left off. If you change the `group_id` (with `earliest` as the reset flag), it rereads the entire topic from scratch.
2. Writing Stream Data to a Relational Database
Printing to the console is great for debugging, but in reality, streaming pipelines usually feed data into systems like PostgreSQL for business querying and dashboards.
Since our script acts as a direct passthrough pipe, we have to wire everything manually. First, spin up the database via Docker:
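One way to do it — the container name, credentials, and database name below are placeholders, so pick your own:

```shell
docker run -d \
  --name rides-postgres \
  -p 5432:5432 \
  -e POSTGRES_USER=postgres \
  -e POSTGRES_PASSWORD=postgres \
  -e POSTGRES_DB=rides \
  postgres:15
```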
Drop into the Postgres environment and create the event storage table. Because native Python doesn’t provide automatic stream-to-table mappings, the columns must match our schema exactly:
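Inside `psql`, a table matching an illustrative subset of the `Ride` fields might look like this — adapt the columns to your actual dataclass:

```sql
CREATE TABLE rides (
    ride_id          TEXT PRIMARY KEY,
    passenger_count  INTEGER,
    total_amount     NUMERIC
);
```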
Persisting Records
We extend our script with the Postgres driver `psycopg2` (installed via the `psycopg2-binary` package):
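A sketch of the write loop, assuming a local Redpanda broker, a Postgres instance reachable with the illustrative credentials below, and a `rides` table whose columns match the illustrative `Ride` fields:

```python
import json
from dataclasses import dataclass

import psycopg2                  # pip install psycopg2-binary
from kafka import KafkaConsumer  # pip install kafka-python


@dataclass
class Ride:
    ride_id: str
    passenger_count: int
    total_amount: float


# Connection parameters are placeholders; match them to your container.
conn = psycopg2.connect(
    host="localhost", port=5432,
    user="postgres", password="postgres", dbname="rides",
)
cursor = conn.cursor()

consumer = KafkaConsumer(
    "rides",
    bootstrap_servers="localhost:9092",
    auto_offset_reset="earliest",
    group_id="rides-postgres",
    value_deserializer=lambda raw: Ride(**json.loads(raw.decode("utf-8"))),
)

for message in consumer:
    ride = message.value
    cursor.execute(
        "INSERT INTO rides (ride_id, passenger_count, total_amount) "
        "VALUES (%s, %s, %s)",
        (ride.ride_id, ride.passenger_count, ride.total_amount),
    )
    conn.commit()  # commit per message: simple, but slow at scale
```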
And just like that, we’ve built a complete end-to-end streaming pipeline: Python Taxi Script -> Redpanda Broker -> Python Postgres Script -> PostgreSQL Persistence.
3. Why Aren’t Native Python Consumers Enough for Production?
The infrastructure is functional. We moved data from the sender to the database in milliseconds. However, deploying this in the real world introduces massive engineering pitfalls. Let’s look at the limitations:
- Manual State and Offset Management (Checkpoints): What if the Python Postgres service crashes halfway through a batch? Can we rely solely on Kafka’s committed offsets to recover? What if it crashes in the microsecond after successfully writing to Postgres but before acknowledging Kafka (making exactly-once processing extremely painful)? You’d have to build robust retry logic and idempotency checks manually.
- Windowing and Aggregations: What if you now need the “Total Revenue Per Hour”? And what if, at 10 AM, you suddenly receive late messages generated at 8 AM while the taxi was in a tunnel? Handling event-time windows and late data by hand is notoriously error-prone.
- Cluttered Connector Code: You are forced to spend enormous amounts of energy writing low-level networking and serialization boilerplate to communicate with Postgres, S3, or Elasticsearch.
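The first pitfall above — crashing in the window between the database write and the Kafka offset commit — can be sketched as follows; the `rides` table and `ride_id` key are illustrative, not part of the original script:

```python
# The dangerous ordering in the naive consumer loop:
#
#   1. cursor.execute(INSERT ...)   # row persisted in Postgres
#   2. conn.commit()
#      <-- crash HERE: Kafka never records the offset
#   3. (offset committed to Kafka)
#
# On restart the same message is redelivered, so the insert must be
# idempotent. One common manual fix is deduplicating on a unique key:
UPSERT_SQL = """
INSERT INTO rides (ride_id, passenger_count, total_amount)
VALUES (%s, %s, %s)
ON CONFLICT (ride_id) DO NOTHING;
"""
# A redelivered message now hits the primary-key conflict and is
# silently skipped instead of creating a duplicate row.
```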
This is exactly why nobody builds large-scale streaming pipelines out of raw Python consumer loops. In the next post, we introduce the professional heavy-hitter, Apache Flink, and use it to elegantly replace this fragile native consumer code.