Stateful Stream Processing: What Your Pipeline Actually Remembers

Note

COGNITIVE ARCHITECTURE SCAN: Pipeline memory banks online. State store initialised. Checkpoint daemon running. Warning: unbounded state growth detected on partition 7.

A stateless pipeline is a conveyor belt. It receives an item, applies a transformation, and passes it on. No item depends on any other. The belt has no memory.

A stateful pipeline is something closer to a human analyst. It receives data points over time, accumulates them into a running picture, and periodically answers the question: given everything I’ve seen up to this moment, what does the world look like?

The moment you want to answer “how many rides in the last hour”, “what is the running average fare per zone”, or “did this card make five transactions in three minutes”—you are in stateful territory. And stateful processing is where the real engineering begins.


🧠 What State Actually Is

In Flink, state is data that a task accumulates across multiple events and uses to compute future outputs.

The simplest example: a counter.

# Conceptually: what Flink is doing for a tumbling window aggregation
state = {}  # zone_id → {'count': ..., 'total': ...}

for event in stream:
    zone = event['PULocationID']
    if zone not in state:
        state[zone] = {'count': 0, 'total': 0.0}
    state[zone]['count'] += 1
    state[zone]['total'] += event['total_amount']

# When window closes: emit aggregated results, reset state

Of course, in production Flink, this is not a Python dictionary living in memory. It is a distributed, fault-tolerant, partitioned state store—but the conceptual model is the same.

Keyed State vs. Operator State

Flink distinguishes between two kinds of state:

Keyed state is partitioned by a key extracted from each event. All events with the same key are routed to the same task slot, where they share a common state partition.

-- In Flink SQL, the GROUP BY clause defines the key
SELECT
    PULocationID,               -- This is the key
    COUNT(*) AS num_trips,
    SUM(total_amount) AS revenue
FROM TABLE(TUMBLE(TABLE rides, DESCRIPTOR(pickup_time), INTERVAL '1' HOUR))
GROUP BY PULocationID, window_start, window_end;

Zone 132 goes to task slot A. Zone 48 goes to task slot B. Each maintains its own state independently. This is what makes Flink horizontally scalable: more partitions, more parallel tasks, more throughput.
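This routing can be sketched in plain Python. The sketch below is a simplified model, not Flink's actual code: Flink hashes each key (using MurmurHash over the key's hashCode) into one of `max_parallelism` "key groups" and assigns contiguous ranges of key groups to subtasks; here, Python's built-in `hash()` stands in for the real hash, and 128 is Flink's default maximum parallelism.

```python
# Simplified sketch of keyed-state routing (not Flink's actual code).
MAX_PARALLELISM = 128  # Flink's default maximum parallelism

def key_group_for(key) -> int:
    # Flink uses MurmurHash over the key's hashCode; hash() stands in here.
    return hash(key) % MAX_PARALLELISM

def task_for(key_group: int, parallelism: int) -> int:
    # Range assignment: key groups [0..31] -> task 0, [32..63] -> task 1, ...
    return key_group * parallelism // MAX_PARALLELISM

for zone in (132, 48):
    kg = key_group_for(zone)
    print(f"zone {zone} -> key group {kg} -> task {task_for(kg, parallelism=4)}")
```

Because the key-to-key-group mapping is fixed, changing the job's parallelism only changes which subtask owns each key-group range, never which group a key belongs to.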

Operator state is associated with an operator instance rather than a specific key—used primarily for source connectors tracking Kafka offsets.


🏗️ The State Backend: Where State Lives

By default, Flink stores keyed state in memory (the HashMapStateBackend). For small to moderate state sizes, this is fast and sufficient.

For larger state or stricter durability requirements, the RocksDB state backend is the standard production choice: a high-performance, embedded key-value store that spills state to local disk when it no longer fits in memory.

# flink-conf.yaml
state.backend: rocksdb
state.backend.rocksdb.localdir: /tmp/flink-state
state.checkpoints.dir: s3://my-bucket/flink-checkpoints

The choice of backend has significant operational implications:

Backend              | State Size       | Checkpoint Speed               | Use Case
---------------------|------------------|--------------------------------|---------------------------------
HashMapStateBackend  | Small–Medium     | Fast (in-memory snapshot)      | Low-latency jobs, limited state
RocksDBStateBackend  | Large–Very Large | Slower (incremental snapshots) | Production jobs with large state

🔄 Checkpoints: Making State Fault-Tolerant

State that lives only in memory dies with the process. Flink’s checkpointing mechanism takes periodic snapshots of all operator state and persists them to a durable store.

The checkpoint algorithm is a variant of Chandy-Lamport distributed snapshots. Flink injects special barrier records into the stream. When a task has received a barrier on all of its input channels, it serialises its current state and writes it to the checkpoint storage asynchronously—the job keeps processing events rather than pausing for a global stop-the-world snapshot.

// Checkpoint barriers flow through the stream alongside data:
[event_100] [event_101] [BARRIER_ck42] [event_102] [event_103]

// When task receives BARRIER_ck42:
// 1. Flush current state to: s3://bucket/checkpoints/ck42/task-0/
// 2. Acknowledge to JobManager
// 3. Continue processing event_102, 103...

When the JobManager confirms all tasks have written their state for checkpoint 42, that checkpoint is complete. If the job crashes at any point, it can restart from checkpoint 42 and resume processing from the corresponding Kafka offsets. No data is lost. No aggregations are recomputed from scratch.
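The recovery contract can be illustrated with a toy simulation (the event data and single-task setup are invented for illustration): state is snapshotted at a barrier together with the source offset, so after a crash the job restores the snapshot and replays only from that offset—and ends up with exactly the totals an uninterrupted run would produce.

```python
# Toy simulation of checkpoint-and-restore (illustrative only; real Flink
# coordinates this across many parallel tasks via the JobManager).
import copy

events = [("A", 1), ("A", 2), ("B", 5), ("A", 3), ("B", 7)]

def run(events, state, start_offset, barrier_at=None):
    checkpoint = None
    for offset in range(start_offset, len(events)):
        key, amount = events[offset]
        state[key] = state.get(key, 0) + amount
        if offset == barrier_at:
            # Barrier arrives: snapshot state together with the resume offset.
            checkpoint = (copy.deepcopy(state), offset + 1)
    return state, checkpoint

# Process with a barrier after offset 2, then "crash" and recover.
_, ckpt = run(events, {}, 0, barrier_at=2)
saved_state, resume_offset = ckpt

# Restore from a copy of the snapshot and replay from the saved offset.
recovered, _ = run(events, dict(saved_state), resume_offset)
print(recovered)  # {'A': 6, 'B': 12} — same totals as an uninterrupted run
```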

Configuring Checkpoints

from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()

# Checkpoint every 60 seconds
env.enable_checkpointing(60_000)

# Minimum time between checkpoints (prevent checkpoint overlap under slow conditions)
env.get_checkpoint_config().set_min_pause_between_checkpoints(30_000)

# Tolerate 3 consecutive checkpoint failures before failing the job
env.get_checkpoint_config().set_tolerable_checkpoint_failure_number(3)

💣 The State Growth Problem

State is not free. Every distinct key in a keyed state store consumes memory. If the cardinality of your key is high and the state is never cleaned up, you will eventually run out of resources.
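A back-of-envelope estimate makes the risk concrete. The numbers below are illustrative assumptions, not measurements: keyed-state memory scales roughly as distinct keys × serialized bytes per key.

```python
# Back-of-envelope state sizing (illustrative numbers, not measurements).
distinct_cards = 50_000_000   # e.g. per-card fraud-detection state
bytes_per_key = 200           # assumed size of counters + timestamps per key

total_gb = distinct_cards * bytes_per_key / 1e9
print(f"~{total_gb:.0f} GB of keyed state")  # ~10 GB
```

Fifty million keys at a modest 200 bytes each is already ~10 GB of state—comfortably past what a HashMapStateBackend heap should hold, and it only grows if keys are never expired.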

Time-to-Live (TTL): State Expiry

Flink provides a built-in TTL mechanism for state. State entries that have not been updated within the TTL window are automatically cleared.

from pyflink.common import Duration, Types
from pyflink.datastream.state import StateTtlConfig, ValueStateDescriptor

ttl_config = StateTtlConfig \
    .new_builder(Duration.of_hours(24)) \
    .set_update_type(StateTtlConfig.UpdateType.OnCreateAndWrite) \
    .set_state_visibility(StateTtlConfig.StateVisibility.NeverReturnExpired) \
    .build()

# Apply TTL to a value state descriptor
state_descriptor = ValueStateDescriptor("session_state", Types.STRING())
state_descriptor.enable_time_to_live(ttl_config)

This is critical for session-based processing: if a user has been inactive for 24 hours, their session state is stale and should be freed.
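The semantics of the two settings above can be modelled in a few lines of plain Python (a toy model, not Flink's implementation; a logical clock stands in for processing time): `OnCreateAndWrite` means every write refreshes the expiry timestamp, and `NeverReturnExpired` means an expired entry behaves as if it were absent.

```python
# Toy model of state TTL with OnCreateAndWrite + NeverReturnExpired semantics.
TTL = 24 * 3600  # seconds

class TtlState:
    def __init__(self, ttl):
        self.ttl = ttl
        self.entries = {}  # key -> (value, last_write_time)

    def update(self, key, value, now):
        self.entries[key] = (value, now)  # every write refreshes the TTL

    def value(self, key, now):
        item = self.entries.get(key)
        if item is None or now - item[1] >= self.ttl:
            return None  # expired entries behave as if absent
        return item[0]

s = TtlState(TTL)
s.update("user-1", "session-data", now=0)
print(s.value("user-1", now=3600))       # within TTL: 'session-data'
print(s.value("user-1", now=25 * 3600))  # past TTL: None
```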

Window State Cleanup

For window aggregations, Flink automatically cleans up window state after the window closes and results are emitted. You do not need to manage this manually for standard tumbling, sliding, or session windows.


🔀 State Redistribution During Scaling

One operational challenge unique to stateful streaming: what happens to state when you scale the job up or down?

If you add more task slots (to handle increased load), existing state must be redistributed across the new parallelism. Flink handles this via savepoints: manually triggered, self-contained state snapshots that can be used to restart a job with a different parallelism.

# Trigger a savepoint
flink savepoint <job-id> s3://my-bucket/savepoints/

# Restart the job with new parallelism from the savepoint
flink run -p 8 -s s3://my-bucket/savepoints/savepoint-xxx \
  -py /opt/src/job/aggregation_job.py

This is the streaming equivalent of resizing a database replica set—possible, but it requires coordinated care.
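What makes the redistribution tractable is that state is stored per key group, not per task: rescaling just re-ranges key groups over the new subtask count. A simplified sketch (assuming Flink's default maximum parallelism of 128; not Flink's actual code):

```python
# Simplified sketch of key-group reassignment during rescaling.
MAX_PARALLELISM = 128  # Flink's default maximum parallelism

def task_for(key_group: int, parallelism: int) -> int:
    # Contiguous ranges of key groups map to subtasks.
    return key_group * parallelism // MAX_PARALLELISM

kg = 100
print(task_for(kg, 4))  # before rescaling: task 3
print(task_for(kg, 8))  # after rescaling:  task 6
```

Because a key always maps to the same key group, a savepoint only has to move key-group ranges between subtasks—no per-key reshuffling is required.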


🧬 Stateful Processing in the Real World

The DEZoomcamp streaming exercises use stateful windowed aggregations over taxi ride data. In production data engineering roles, the same patterns appear in:

  • Fraud detection: maintaining per-card transaction velocity state over sliding 5-minute windows
  • Recommendation engines: aggregating per-user click streams into feature vectors updated in near real-time
  • IoT monitoring: tracking per-device rolling averages of sensor readings, alerting when thresholds are exceeded
  • Financial trading: calculating per-instrument moving averages and VWAP (volume-weighted average price) across millisecond-frequency event streams

The state management primitives—keyed state, checkpoints, TTL, savepoints—are the same across all these domains. The schemas and business logic differ. The infrastructure is identical.


🔬 What Makes Stateful Processing Hard to Debug

The most common failure modes in stateful streaming pipelines:

  1. State explosion: a key with unbounded cardinality causes the state backend to run out of memory. Solution: TTL, monitoring, cardinality limits.
  2. Checkpoint timeout: state is too large to snapshot within the configured interval. Solution: incremental checkpoints (RocksDB), reduce state size, increase interval.
  3. Reprocessing after failed checkpoint: job restarts from an older checkpoint, causing events to be processed twice. Solution: idempotent sinks (UPSERTs), exactly-once semantics (covered in the next post).
  4. Slow state access: high key cardinality with RocksDB leads to disk I/O bottlenecks. Solution: SSD storage for RocksDB, compaction tuning.

None of these appear in batch pipelines, because batch jobs hold no state that outlives the run—each dbt run starts fresh. This is the price of continuity.
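Failure mode 3 is worth a concrete sketch. An idempotent (upsert-style) sink tolerates reprocessing because writing the same aggregate twice leaves the result unchanged, where an append-only sink would double-count. The keyed overwrite below is a minimal stand-in for a real UPSERT; the zone/window values are invented for illustration:

```python
# Sketch of why an idempotent (upsert) sink is replay-safe: the window
# result is keyed by (zone, window), so re-emitting it overwrites rather
# than duplicates.
def upsert(table, key, value):
    table[key] = value  # keyed overwrite: replaying the write is harmless

results = {}
for _ in range(2):  # simulate the same window result being emitted twice
    upsert(results, ("zone-132", "2024-01-01T10:00"), {"trips": 42})

print(len(results))  # still exactly one row per (zone, window)
```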


← Previous: The Streaming Stack: Anatomy of a Pipeline That Never Sleeps

Next: The Late Data Problem: Watermarks, Dead Zones, and Time →