Database Infusions: The Postgres Orchestration Protocol

> **Important**
>
> SYSTEM ALERT: Unstructured data detected in external storage. High risk of schema drift and duplicate records. Initiate database synchronization protocols immediately.

We have all been there. You download a CSV file, manually open a database client, and run an INSERT command. It works. The data lands.

But then, the inevitable happens. Next month’s file arrives. And the one after that. You’re manually running the same procedure, hoping you don’t accidentally re-insert the same records.

Here is the hard truth: Manual data loading is not a pipeline. It is a liability. To build production-grade ingestion, we need idempotency, staging tables, and scheduled triggers.

At the Yellow Capsule, we treat data ingestion like a blood transfusion: carefully staged, type-matched, and continuously monitored. Enter the Postgres Orchestration Protocol.


🧬 System Scan: The ETL Architecture

The 04_postgres_taxi.yaml and 05_postgres_taxi_scheduled.yaml workflows implement a robust ETL pattern for NYC taxi trip data.

The Core Design Principles

  1. Parameterized Inputs: The workflow accepts taxi (yellow/green), year, and month as inputs, making it reusable for any dataset.
  2. Staging Table Pattern: Data is first loaded into a temporary staging table, cleaned, and then merged into the final production table.
  3. Idempotent Merge: Uses a MERGE statement to only insert new records, preventing duplicates on re-runs.
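For orientation, a flow header for such a workflow might look like the sketch below. The input ids match the ones named in principle 1; the namespace, SELECT values, and defaults are illustrative assumptions, not copied from the file.

```yaml
id: postgres_taxi
namespace: zoomcamp  # assumed namespace, adjust to your setup

inputs:
  - id: taxi
    type: SELECT
    values: [yellow, green]
    defaults: yellow

  - id: year
    type: SELECT
    values: ["2019", "2020"]
    defaults: "2019"

  - id: month
    type: SELECT
    values: ["01", "02", "03", "04", "05", "06"]
    defaults: "01"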

🛠️ Neural Decomposition: The Surgical Procedure

Let’s trace the data flow through the organs of 04_postgres_taxi.yaml.

Phase 1: Extraction (The Biopsy)

```yaml
- id: extract
  type: io.kestra.plugin.scripts.shell.Commands
  outputFiles:
    - "*.csv"
  commands:
    - wget -qO- https://github.com/.../{{render(vars.file)}}.gz | gunzip > {{render(vars.file)}}
```

- Downloads a gzipped CSV from GitHub releases.
- Decompresses and outputs the raw .csv file.
- The outputFiles declaration captures the file for downstream tasks.
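The vars.file and similar references imply a variables block defined at the flow level. A plausible sketch is below; the exact definitions are reverse-engineered assumptions based on the vars.* names used in the tasks. The render() calls are needed because Kestra does not recursively evaluate expressions nested inside variables.

```yaml
# Assumed variables block, inferred from the vars.* references in the tasks;
# render() is required because these values themselves contain expressions.
variables:
  file: "{{inputs.taxi}}_tripdata_{{inputs.year}}-{{inputs.month}}.csv"
  staging_table: "public.{{inputs.taxi}}_tripdata_staging"
  table: "public.{{inputs.taxi}}_tripdata"
  data: "{{outputs.extract.outputFiles[render(vars.file)]}}"
```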

Phase 2: Conditional Logic (The Diagnosis)

Yellow and green taxi datasets have different schemas. The workflow uses If conditions to branch:

```yaml
- id: if_yellow_taxi
  type: io.kestra.plugin.core.flow.If
  condition: "{{inputs.taxi == 'yellow'}}"
  then:
    # ... yellow taxi specific tasks ...

- id: if_green_taxi
  type: io.kestra.plugin.core.flow.If
  condition: "{{inputs.taxi == 'green'}}"
  then:
    # ... green taxi specific tasks ...
```

System Note: This is a powerful pattern. Kestra supports flow control (If, Switch, EachParallel) to handle complex branching logic declaratively.
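If more taxi types were added later, the same branching could be collapsed into a single Switch task. A hedged sketch, with a made-up task id:

```yaml
# Hypothetical Switch equivalent of the two If tasks above:
- id: taxi_branch
  type: io.kestra.plugin.core.flow.Switch
  value: "{{inputs.taxi}}"
  cases:
    yellow:
      # ... yellow taxi specific tasks ...
    green:
      # ... green taxi specific tasks ...
```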

Phase 3: Table Preparation (The Prep Room)

```yaml
- id: yellow_create_table
  type: io.kestra.plugin.jdbc.postgresql.Queries
  sql: |
    CREATE TABLE IF NOT EXISTS {{render(vars.table)}} (
        unique_row_id          text,
        filename               text,
        VendorID               text,
        tpep_pickup_datetime   timestamp,
        ...
    );
```

- Creates the final destination table if it doesn't exist.
- A parallel task creates the _staging table with an identical schema, as sketched below.
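That companion task is not shown in the excerpt, but given the identical-schema requirement it would look nearly the same; only the id and target name change (task id assumed):

```yaml
# Sketch of the companion staging-table task described above (id assumed):
- id: yellow_create_staging_table
  type: io.kestra.plugin.jdbc.postgresql.Queries
  sql: |
    CREATE TABLE IF NOT EXISTS {{render(vars.staging_table)}} (
        unique_row_id          text,
        filename               text,
        VendorID               text,
        tpep_pickup_datetime   timestamp,
        ...
    );
```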

Phase 4: Staging & Transformation (The Transfusion)

```yaml
- id: yellow_truncate_staging_table
  type: io.kestra.plugin.jdbc.postgresql.Queries
  sql: |
    TRUNCATE TABLE {{render(vars.staging_table)}};

- id: yellow_copy_in_to_staging_table
  type: io.kestra.plugin.jdbc.postgresql.CopyIn
  format: CSV
  from: "{{render(vars.data)}}"
  table: "{{render(vars.staging_table)}}"
  header: true
```

- Truncate First: Clears the staging table to ensure a clean load.
- CopyIn: Uses Postgres's efficient COPY command to bulk-load CSV data.
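Under the hood, CopyIn drives Postgres's COPY protocol, which streams rows in bulk instead of issuing row-by-row INSERTs. Conceptually it amounts to something like the following (illustrative SQL with the table name rendered, not the plugin's literal output):

```sql
-- Roughly what the CopyIn task executes, with file data streamed over the wire;
-- HEADER true tells Postgres to skip the first line of the CSV.
COPY public.yellow_tripdata_staging
FROM STDIN WITH (FORMAT csv, HEADER true);
```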

Phase 5: Data Enrichment (The Genetic Marker)

```yaml
- id: yellow_add_unique_id_and_filename
  type: io.kestra.plugin.jdbc.postgresql.Queries
  sql: |
    UPDATE {{render(vars.staging_table)}}
    SET
      unique_row_id = md5(
        COALESCE(CAST(VendorID AS text), '') ||
        COALESCE(CAST(tpep_pickup_datetime AS text), '') ||
        ...
      ),
      filename = '{{render(vars.file)}}';
```

- Generates a unique row ID by hashing key columns.
- Tags each row with the source filename for data lineage.
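One caveat: if two rows share identical values in every hashed column, they collapse into one unique_row_id and the second row is silently dropped at merge time, so it is worth checking that the hash inputs are discriminating enough. A quick sanity check (production table name assumed):

```sql
-- Any unique_row_id appearing more than once in production signals
-- that the hashed columns are not sufficiently discriminating.
SELECT unique_row_id, COUNT(*)
FROM public.yellow_tripdata
GROUP BY unique_row_id
HAVING COUNT(*) > 1;
```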

Phase 6: The Merge (The Graft)

```yaml
- id: yellow_merge_data
  type: io.kestra.plugin.jdbc.postgresql.Queries
  sql: |
    MERGE INTO {{render(vars.table)}} AS T
    USING {{render(vars.staging_table)}} AS S
    ON T.unique_row_id = S.unique_row_id
    WHEN NOT MATCHED THEN
      INSERT (...) VALUES (...);
```

This is the critical operation. The MERGE statement compares the staging table against the production table. It only inserts rows where the unique_row_id does not already exist.

System Note: This guarantees idempotency. You can run this workflow 100 times with the same input, and the data will only be inserted once.
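One version caveat: MERGE only exists in PostgreSQL 15 and later. On older servers, a comparable idempotent load can be sketched with INSERT ... ON CONFLICT, though unlike MERGE it requires a unique constraint on unique_row_id, which the workflow above does not create (table names assumed):

```sql
-- Hypothetical pre-Postgres-15 alternative; needs a unique constraint first:
-- ALTER TABLE public.yellow_tripdata
--   ADD CONSTRAINT uq_unique_row_id UNIQUE (unique_row_id);
INSERT INTO public.yellow_tripdata
SELECT * FROM public.yellow_tripdata_staging
ON CONFLICT (unique_row_id) DO NOTHING;
```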


⏰ The Automation Layer: Scheduled Triggers

The 05_postgres_taxi_scheduled.yaml workflow takes the manual version and puts it on autopilot.

The Trigger Mechanism

```yaml
triggers:
  - id: green_schedule
    type: io.kestra.plugin.core.trigger.Schedule
    cron: "0 9 1 * *"
    inputs:
      taxi: green

  - id: yellow_schedule
    type: io.kestra.plugin.core.trigger.Schedule
    cron: "0 10 1 * *"
    inputs:
      taxi: yellow
```

- cron: Uses standard cron syntax. 0 9 1 * * means "At 09:00 on day 1 of every month."
- inputs: Automatically passes predefined inputs when the trigger fires.
- Dual Schedules: Green taxi runs at 9 AM, Yellow at 10 AM, staggering the load.

Dynamic Date Handling

The scheduled version uses trigger.date to dynamically determine the file to process:

```yaml
variables:
  file: "{{inputs.taxi}}_tripdata_{{trigger.date | date('yyyy-MM')}}.csv"
```

System Note: The trigger.date variable is automatically populated by the scheduler. Combined with the date filter, it ensures the workflow always processes the correct monthly file.
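For example, when the green schedule fires on 1 July 2021 at 09:00, trigger.date | date('yyyy-MM') renders to 2021-07, so the variable resolves to green_tripdata_2021-07.csv.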

Concurrency Control (The Quarantine)

```yaml
concurrency:
  limit: 1
```

This ensures only one instance of the workflow can run at a time. Critical for preventing race conditions on database writes.
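Kestra's flow-level concurrency also accepts a behavior property controlling what happens to executions beyond the limit. The sketch below assumes the documented default of queueing; verify the property against your Kestra version's docs:

```yaml
# Assumed behavior property (QUEUE is believed to be the default):
concurrency:
  limit: 1
  behavior: QUEUE   # excess executions wait rather than fail or cancel
```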


🧪 Post-Op Analysis

Why is this pattern so powerful?

| Component | Purpose |
| --- | --- |
| If Conditions | Handle schema variations between datasets. |
| Staging Tables | Isolate raw data before transformation. |
| unique_row_id (MD5 Hash) | Enable idempotent merges, preventing duplicates. |
| MERGE Statement | Upsert logic for incremental data loading. |
| Scheduled Triggers | Fully automated, hands-off execution. |
| Concurrency Limits | Prevent parallel runs from corrupting data. |

When you master the Staging -> Transform -> Merge pattern, you have the blueprint for every production ETL pipeline.

This is the transition from “running scripts” to “operating a data platform.”

Close the database client. Open the Kestra scheduler. Let the system run itself.

Happy building, initiate.