YC Medical
ENTER

PyFlink I: Architecture, Checkpoints, and Pass-Through Jobs

Goal: Understand Flink’s internal architecture (JobManager and TaskManager). Build a custom Docker environment to run PyFlink. Write a Flink SQL “Pass-Through” job to experience how Flink automatically handles Kafka reading, Checkpointing, and PostgreSQL writing.


In the previous two posts, we manually wired up a streaming pipeline: Python -> Kafka -> Python -> PostgreSQL. However, we also saw that relying on native code to handle offset management, fault recovery, multi-threading, and window aggregations is an engineering disaster.

Apache Flink exists to solve these hardcore foundation engineering problems. Flink takes over the heaviest distributed computing tasks for us:

  • Windowing — Built-in tumbling, sliding, and session windows (no need to hand-write complex timing logic).
  • Checkpointing — Automatic fault recovery! Say goodbye to tracking manual offsets.
  • Connectors Ecosystem — Out-of-the-box interfaces for Kafka, JDBC, Filesystems, and more (no more writing boilerplate psycopg2 or complex serialization logic).
  • Parallelism — Automatically scale data processing from a single worker out to an entire server cluster.
  • Flink SQL — An elegant, highly expressive way to manage and define data pipelines using native SQL queries!

However, powerful engines require proper infrastructure. We must maintain a Flink cluster somewhat similarly to a 24/7 server.


A Flink service is driven by two main process types:

  1. JobManager (The Brain): The master node, acting as the coordinator and manager. It accepts all Flink Job Submissions and oversees state recovery (Checkpoints) during execution. It also exposes port 8081 for the Web UI, visualizing all running details.
  2. TaskManager (The Brawn): The worker node. It holds a finite number of “Task Slots”. This is the actual physical vessel that performs computations, merges data, and handles I/O.

Task Slots: Think of a slot as a dedicated highway lane (a heavily isolated slice of CPU/Memory). If you define a Parallelism = 3, it means your job securely claims three lanes simultaneously, splitting the workload into 3 independent parallel pipes for maximum throughput. If a cluster provides 15 available task slots, you could simultaneously run 5 of these degree-3 micro-pipelines.

The native Flink Docker image only natively supports Java. We need to extend its ecosystem by injecting Python support, PyFlink packages, and third-party connector JARs (like kafka-connector and postgres-jdbc-driver).

In our project, we can build a customized Dockerfile.flink containing these modules, generating a pyflink-workshop image.

Then, we deploy it in our docker-compose.yml:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
  jobmanager:
    image: pyflink-workshop # Custom base image
    ports:
      - "8081:8081" # UI Access
    command: jobmanager
    # We use FLINK_PROPERTIES to configure memory and JobManager addresses

  taskmanager:
    image: pyflink-workshop
    depends_on:
      - jobmanager
    command: taskmanager --taskmanager.registration.timeout 5 min
    # Configure Task Slots (e.g., 15) and default parallelism here

Run docker compose up --build -d, navigate to localhost:8081, and you’ll see a healthy TaskManager node with 15 fully-loaded slots ready for action!


3. Writing the First “Pass-Through” Streaming Job

Our first Flink streaming job will mimic our old native Python consumer: fetch from Kafka -> save directly to PostgreSQL. But this time, we aren’t writing low-level APIs. Flink will handle all fault tolerance and wiring.

We need two Flink Dynamic Tables.

Step 1: Create the Source Virtual Table (Kafka)

Here, we use Flink DDL to map the data directly (no separate deserializer needed):

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
def create_events_source_kafka(t_env):
    table_name = "events"
    source_ddl = f"""
        CREATE TABLE {table_name} (
            PULocationID INTEGER,
            DOLocationID INTEGER,
            trip_distance DOUBLE,
            total_amount DOUBLE,
            tpep_pickup_datetime BIGINT
        ) WITH (
            'connector' = 'kafka',
            'properties.bootstrap.servers' = 'redpanda:29092',
            'topic' = 'rides',
            'scan.startup.mode' = 'latest-offset', -- Crucial!
            'format' = 'json'  -- Flink deserializes automatically
        );
        """
    t_env.execute_sql(source_ddl)
    return table_name

latest-offset vs earliest-offset:

  • 'latest-offset' is perfect for general deployment (only processing streams arriving after the job boots).
  • If you want to replay historical data or perform Backfilling, use 'earliest-offset' so Flink treats Kafka like a batch database to sweep.
  • To seamlessly resume from a crash, Flink relies on clearly defined Checkpoint Recoveries or the precise 'timestamp' boot mode.

Step 2: Create the Sink Virtual Table (PostgreSQL)

We use the official JDBC Sink connector to pipe the water into a new pool.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
def create_processed_events_sink_postgres(t_env):
    table_name = 'processed_events'
    sink_ddl = f"""
        CREATE TABLE {table_name} (
            PULocationID INTEGER,
            DOLocationID INTEGER,
            trip_distance DOUBLE,
            total_amount DOUBLE,
            pickup_datetime TIMESTAMP(3)
        ) WITH (
            'connector' = 'jdbc',
            'url' = 'jdbc:postgresql://postgres:5432/postgres',
            'table-name' = '{table_name}',
            'username' = 'postgres',
            'password' = 'postgres',
            'driver' = 'org.postgresql.Driver'
        );
        """
    t_env.execute_sql(sink_ddl)
    return table_name

Step 3: Wire the Pipes and Enable Checkpoints

Now we write the execution body: enabling checkpoints. With this, if a task violently crashes and restarts, Flink won’t blindly read offsets. It strictly resumes from a high-throughput snapshot it securely stored internally.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import EnvironmentSettings, StreamTableEnvironment

def log_processing():
    env = StreamExecutionEnvironment.get_execution_environment()
    # Execute distributed snapshots every 10 seconds
    env.enable_checkpointing(10 * 1000) 

    settings = EnvironmentSettings.new_instance().in_streaming_mode().build()
    t_env = StreamTableEnvironment.create(env, environment_settings=settings)

    source_table = create_events_source_kafka(t_env)
    postgres_sink = create_processed_events_sink_postgres(t_env)

    # Use pure SQL to trigger the streaming engine
    t_env.execute_sql(
        f"""
        INSERT INTO {postgres_sink}
        SELECT
            PULocationID,
            DOLocationID,
            trip_distance,
            total_amount,
            # Native function TO_TIMESTAMP_LTZ handles epoch conversion
            TO_TIMESTAMP_LTZ(tpep_pickup_datetime, 3) as pickup_datetime
        FROM {source_table}
        """
    ).wait() # .wait() blocks the script, ensuring continuous stream listening

if __name__ == '__main__':
    log_processing()

Submitting the Job

Submit the task to the JobManager:

1
2
3
docker compose exec jobmanager ./bin/flink run \
    -py /opt/src/job/pass_through_job.py \
    --pyFiles /opt/src -d

Once submitted, you’ll see the Job suspending into the background. Checking the localhost:8081 dashboard will reveal not only the DAG graph of the computation but also the precise details of the 10-second Checkpoint persistence.

Through declarative configurations, we achieved enterprise-grade disaster recovery without writing hundreds of lines of fragile communication code.

Next up: Stream processing is never just a straight line. Now that we have a distributed engine, let’s put it under pressure. In the next chapter, we unpack the very soul of Flink: Windows and using Watermarks to handle delayed data.