YC Medical

PyFlink II: Windows, Watermarks, and Late Events

Goal: Understand and apply the core mechanisms of Flink stream processing—Windows and Watermarks—to handle aggregations and delayed events. Implement PostgreSQL UPSERTs for late-arriving event corrections.


If a stream processing platform just moves data verbatim from left to right, a native Python Consumer running an infinite loop is probably sufficient (as we saw in the first post). But when the requirement becomes: “Give me the total revenue accumulated in the past 1 hour for each taxi pickup zone” — the real challenge begins.

This is the boundary between Stream Processing and Micro-batching: How do you define “1 hour”?

1. Chopping the Stream into Buckets: Windows

To aggregate this continuously flowing data, we need to “chop” it into buckets. Native Flink SQL offers three powerful time-window mechanisms that are hard to match with hand-rolled consumer code:

  1. Tumbling Windows:
    The most common form. Fixed-size and non-overlapping. Example: Calculating daily transaction volumes once a day, or reporting hourly trips. For traditional batch developers, this is the low-latency version of a batch job.

    TUMBLE(TABLE events, DESCRIPTOR(event_timestamp), INTERVAL '1' HOUR)
  2. Sliding Windows (Hopping):
    The spike radar. Fixed-size, but overlapping. A single taxi ride might fall into several sliding windows. Example: Finding the peak order volume in any 5-minute interval, reported every 1 minute (a sliding step of 1 minute).

    -- slide = 15 minutes, window size = 1 hour
    HOP(TABLE events, DESCRIPTOR(event_timestamp), INTERVAL '15' MINUTE, INTERVAL '1' HOUR)
  3. Session Windows:
    For user trajectories. The size is dynamic. It doesn’t look at “duration,” it looks at “idle time.” Example: If you play a game for 10 minutes, log off, and don’t return for 30 minutes, you are considered “out of combat.” The session ends and a settlement is triggered. Any actions occurring during that active period are scoped to a dedicated dynamic window.
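
The bucketing logic behind these three window types can be sketched in plain Python (an illustration of the semantics only — function names are mine, and this is not how Flink implements windows internally):

```python
from datetime import datetime, timedelta

def tumbling_window(ts: datetime, size: timedelta) -> datetime:
    """Start of the fixed, non-overlapping bucket that contains ts."""
    epoch = datetime(1970, 1, 1)
    return ts - (ts - epoch) % size

def hopping_windows(ts: datetime, slide: timedelta, size: timedelta):
    """All overlapping windows (advancing by `slide`, each `size` long) containing ts."""
    starts = []
    start = tumbling_window(ts, slide)  # latest window start on the slide grid
    while start > ts - size:            # window contains ts while start + size > ts
        starts.append(start)
        start -= slide
    return sorted(starts)

def session_windows(timestamps, gap: timedelta):
    """Group sorted event times into sessions separated by more than `gap` idle time."""
    sessions = []
    for ts in sorted(timestamps):
        if sessions and ts - sessions[-1][-1] <= gap:
            sessions[-1].append(ts)   # still "in combat": extend the current session
        else:
            sessions.append([ts])     # idle too long: open a new session
    return sessions

ride = datetime(2024, 1, 1, 13, 58)
print(tumbling_window(ride, timedelta(hours=1)))   # falls in exactly one 13:00 bucket
print(len(hopping_windows(ride, timedelta(minutes=15), timedelta(hours=1))))  # 4 windows
```

Note how the same 13:58 ride lands in exactly one tumbling bucket but in four overlapping hopping windows — that multiplicity is the point of the “spike radar.”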


2. Window Heartbeats and Triggers: Watermarks

Defining a 1-hour tumbling window seems fine. But real-world networks are terrible — a driver might pass through a long, signal-dead tunnel. A ride that occurred at 13:58 might not sync to our Kafka cluster until 14:05.

If we close the 13:00~14:00 window the moment the wall clock strikes 14:00, we will lose the payment record for that tunnel-delayed ride.

If we don’t settle the window immediately, the stream system just waits. But how long should it wait? 5 minutes? 1 week? Since results can’t be emitted until the window settles, the entire pipeline stalls indefinitely.

We need a clear yardstick and a “patience threshold” — this is the Watermark.

When constructing the Kafka events source table, we manually establish this yardstick in the DDL:

CREATE TABLE events (
    PULocationID INTEGER,
...
    -- 1. Convert the raw epoch milliseconds into a standard Flink TIMESTAMP column
    event_timestamp AS TO_TIMESTAMP_LTZ(tpep_pickup_datetime, 3),
...
    -- 2. Set the wait time (patience) to 5 seconds
    WATERMARK FOR event_timestamp AS event_timestamp - INTERVAL '5' SECOND
)

What this code means is: Flink tracks the maximum event_timestamp it has seen so far. Subtracting 5 seconds from that maximum gives the current water level (watermark) of the system.

  • As long as this Watermark has not reached the physical boundary of the current window (e.g., 14:00:00), the window will not close prematurely. It continues to accept taxi events belonging to the 13:00~14:00 cycle that arrive late due to network latency.
  • Only once Flink observes an event whose event_timestamp reaches 14:00:05 (watermark = 14:00:05 − 5s = 14:00:00) does the 13:00 window truly declare termination and commit the aggregated total to Postgres!
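
The trigger logic behind those two bullets can be sketched in a few lines of plain Python (a simplified illustration of bounded-out-of-orderness watermarking, not Flink's actual implementation; the class and its names are mine):

```python
from datetime import datetime, timedelta

class WatermarkTracker:
    """Watermark = (max event time seen so far) - (allowed delay)."""

    def __init__(self, max_delay: timedelta):
        self.max_delay = max_delay
        self.max_event_time = datetime(1970, 1, 1)

    def observe(self, event_time: datetime) -> None:
        # The watermark only ever moves forward, even if a late event arrives.
        self.max_event_time = max(self.max_event_time, event_time)

    @property
    def watermark(self) -> datetime:
        return self.max_event_time - self.max_delay

    def window_closed(self, window_end: datetime) -> bool:
        # A window fires once the watermark passes its end boundary.
        return self.watermark >= window_end

tracker = WatermarkTracker(timedelta(seconds=5))
tracker.observe(datetime(2024, 1, 1, 14, 0, 3))
print(tracker.window_closed(datetime(2024, 1, 1, 14, 0)))  # False: 14:00:03 - 5s < 14:00
tracker.observe(datetime(2024, 1, 1, 14, 0, 5))
print(tracker.window_closed(datetime(2024, 1, 1, 14, 0)))  # True: watermark hits 14:00:00
```

Until the 14:00:05 event shows up, the 13:00~14:00 window stays open, and the 13:58 tunnel ride arriving at 14:00:02 still lands in it.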

3. UPSERT: Insuring Against Severe Delays

Setting an INTERVAL '5' SECOND watermark handles minor tardiness. But what if the taxi is stuck in a dead zone for 20 seconds? By then, the window’s heartbeat has long flatlined! The Postgres database has already recorded the finalized numbers.

We insure against this extreme case by configuring the target PostgreSQL table to perform an UPSERT (UPDATE-or-INSERT):

  1. Source Aggregation: Ensure the window timestamp and the location ID together form a composite primary key (PRIMARY KEY).
  2. Explicitly declare this primary key in the Flink Postgres JDBC table DDL:
CREATE TABLE processed_events_aggregated (
    window_start TIMESTAMP(3),
    PULocationID INT,
    num_trips BIGINT,
    total_revenue DOUBLE,
    -- Declaring a primary key puts the JDBC sink into upsert mode;
    -- NOT ENFORCED means Flink itself won't validate uniqueness
    PRIMARY KEY (window_start, PULocationID) NOT ENFORCED
) WITH (
    'connector' = 'jdbc', ....
)

With this, when a severely overdue event as described above arrives, Flink won’t crash or append a duplicate num_trips=1 row to the end of the relational table. Instead, it issues an update to Postgres: the primary key is used to precisely locate the original [13:00, Total Revenue] row, which is silently overwritten with the corrected aggregate.
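
On the Postgres side, this self-healing overwrite boils down to an INSERT ... ON CONFLICT DO UPDATE keyed on the composite primary key. The sketch below demonstrates that statement using the stdlib sqlite3 module (standing in for Postgres so the example is self-contained; the ON CONFLICT syntax shown is shared by SQLite ≥ 3.24 and PostgreSQL ≥ 9.5, and the sample values are made up):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE processed_events_aggregated (
        window_start TEXT,
        PULocationID INTEGER,
        num_trips INTEGER,
        total_revenue REAL,
        PRIMARY KEY (window_start, PULocationID)
    )
""")

UPSERT = """
    INSERT INTO processed_events_aggregated VALUES (?, ?, ?, ?)
    ON CONFLICT (window_start, PULocationID) DO UPDATE SET
        num_trips = excluded.num_trips,
        total_revenue = excluded.total_revenue
"""

# First commit: the window fires when the watermark passes 14:00.
conn.execute(UPSERT, ("2024-01-01 13:00:00", 132, 41, 980.50))
# A severely late ride arrives; the corrected aggregate overwrites the row.
conn.execute(UPSERT, ("2024-01-01 13:00:00", 132, 42, 1003.25))

row = conn.execute(
    "SELECT num_trips, total_revenue FROM processed_events_aggregated"
).fetchone()
print(row)  # (42, 1003.25) -- still one row, silently corrected
```

The conflict target must match the declared primary key exactly — that is why the Flink DDL above has to name (window_start, PULocationID) explicitly.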

Conclusion

When building rigorous real-time computation pipelines, blindly handing data from left to right does not cross the threshold of true engineering. Watermarks dictate the balance between low latency and result accuracy, while primary-key UPSERTs in the downstream database provide a system-level safety net. Together, these are the core primitives Flink relies on for constructing robust real-time data architectures.