# Database Infusions: The Postgres Orchestration Protocol
> **Important**
> SYSTEM ALERT: Unstructured data detected in external storage. High risk of schema drift and duplicate records. Initiate database synchronization protocols immediately.
We have all been there. You download a CSV file, manually open a database client, and run an `INSERT` command. It works. The data lands.
But then, the inevitable happens. Next month’s file arrives. And the one after that. You’re manually running the same procedure, hoping you don’t accidentally re-insert the same records.
Here is the hard truth: Manual data loading is not a pipeline. It is a liability. To build production-grade ingestion, we need idempotency, staging tables, and scheduled triggers.
At the Yellow Capsule, we treat data ingestion like a blood transfusion: carefully staged, type-matched, and continuously monitored. Enter the Postgres Orchestration Protocol.
## 🧬 System Scan: The ETL Architecture

The `04_postgres_taxi.yaml` and `05_postgres_taxi_scheduled.yaml` workflows implement a robust ETL pattern for NYC taxi trip data.
### The Core Design Principles

- **Parameterized Inputs:** The workflow accepts `taxi` (yellow/green), `year`, and `month` as inputs, making it reusable for any dataset (see the sketch after this list).
- **Staging Table Pattern:** Data is first loaded into a temporary staging table, cleaned, and then merged into the final production table.
- **Idempotent Merge:** Uses a `MERGE` statement to insert only new records, preventing duplicates on re-runs.
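Here is a minimal sketch of how those inputs might be declared (the `SELECT` type is Kestra's dropdown input; the specific values and defaults shown are assumptions for illustration, not copied from the course files):

```yaml
inputs:
  - id: taxi
    type: SELECT
    displayName: Select taxi type
    values: [yellow, green]
    defaults: yellow

  - id: year
    type: SELECT
    displayName: Select year
    values: ["2019", "2020"]
    defaults: "2019"

  - id: month
    type: SELECT
    displayName: Select month
    values: ["01", "02", "03", "04", "05", "06", "07", "08", "09", "10", "11", "12"]
    defaults: "01"
```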
## 🛠️ Neural Decomposition: The Surgical Procedure

Let’s trace the data flow through the organs of `04_postgres_taxi.yaml`.

### Phase 1: Extraction (The Biopsy)
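A sketch of what this task could look like, using Kestra’s Shell `Commands` plugin (the DataTalksClub release URL and the `file` variable name are assumptions):

```yaml
variables:
  file: "{{ inputs.taxi }}_tripdata_{{ inputs.year }}-{{ inputs.month }}.csv"

tasks:
  - id: extract
    type: io.kestra.plugin.scripts.shell.Commands
    outputFiles:
      - "*.csv"                 # capture the decompressed CSV for downstream tasks
    taskRunner:
      type: io.kestra.plugin.core.runner.Process
    commands:
      # Download the gzipped CSV from the GitHub release and decompress it
      - wget -qO- https://github.com/DataTalksClub/nyc-tlc-data/releases/download/{{inputs.taxi}}/{{render(vars.file)}}.gz | gunzip > {{render(vars.file)}}
```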
- Downloads a gzipped CSV from GitHub releases.
- Decompresses and outputs the raw `.csv` file.
- The `outputFiles` declaration captures the file for downstream tasks.
### Phase 2: Conditional Logic (The Diagnosis)

Yellow and green taxi datasets have different schemas. The workflow uses `If` conditions to branch:
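A shape sketch of the branching (the `Log` tasks are stand-ins; in the real flow each branch holds the schema-specific tasks shown in the later phases):

```yaml
- id: if_yellow_taxi
  type: io.kestra.plugin.core.flow.If
  condition: "{{ inputs.taxi == 'yellow' }}"
  then:
    - id: yellow_branch
      type: io.kestra.plugin.core.log.Log
      message: Running the yellow-schema tasks

- id: if_green_taxi
  type: io.kestra.plugin.core.flow.If
  condition: "{{ inputs.taxi == 'green' }}"
  then:
    - id: green_branch
      type: io.kestra.plugin.core.log.Log
      message: Running the green-schema tasks
```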
> **System Note:** This is a powerful pattern. Kestra supports flow control tasks (`If`, `Switch`, `EachParallel`) to handle complex branching logic declaratively.
### Phase 3: Table Preparation (The Prep Room)
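A trimmed sketch for the yellow branch (connection details are assumed to live in `pluginDefaults`; the column names and types are assumptions based on the public yellow-taxi schema, and in the real flow the two tasks can run in parallel):

```yaml
- id: yellow_create_table
  type: io.kestra.plugin.jdbc.postgresql.Queries
  # url/username/password assumed to be set via pluginDefaults
  sql: |
    CREATE TABLE IF NOT EXISTS public.yellow_tripdata (
      unique_row_id          text,
      filename               text,
      VendorID               text,
      tpep_pickup_datetime   timestamp,
      tpep_dropoff_datetime  timestamp,
      passenger_count        integer,
      trip_distance          double precision
      -- remaining trip columns omitted for brevity
    );

- id: yellow_create_staging_table
  type: io.kestra.plugin.jdbc.postgresql.Queries
  sql: |
    CREATE TABLE IF NOT EXISTS public.yellow_tripdata_staging
      (LIKE public.yellow_tripdata INCLUDING ALL);
```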
- Creates the final destination table if it doesn’t exist.
- A parallel task creates the `_staging` table with an identical schema.
### Phase 4: Staging & Transformation (The Transfusion)
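Sketched with Kestra’s Postgres `Queries` and `CopyIn` tasks (the staging table name and column list are assumptions):

```yaml
- id: yellow_truncate_staging
  type: io.kestra.plugin.jdbc.postgresql.Queries
  sql: TRUNCATE TABLE public.yellow_tripdata_staging;

- id: yellow_copy_to_staging
  type: io.kestra.plugin.jdbc.postgresql.CopyIn
  table: public.yellow_tripdata_staging
  from: "{{ outputs.extract.outputFiles[render(vars.file)] }}"  # file captured in Phase 1
  format: CSV
  header: true
  columns: [VendorID, tpep_pickup_datetime, tpep_dropoff_datetime, passenger_count, trip_distance]
```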
- **Truncate First:** Clears the staging table to ensure a clean load.
- **CopyIn:** Uses Postgres’s efficient `COPY` command to bulk-load CSV data.
### Phase 5: Data Enrichment (The Genetic Marker)
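A sketch of the enrichment query (exactly which columns feed the hash is an assumption; any combination that uniquely identifies a trip works):

```yaml
- id: yellow_add_unique_id_and_filename
  type: io.kestra.plugin.jdbc.postgresql.Queries
  sql: |
    UPDATE public.yellow_tripdata_staging
    SET unique_row_id = md5(
          COALESCE(CAST(VendorID AS text), '') ||
          COALESCE(CAST(tpep_pickup_datetime AS text), '') ||
          COALESCE(CAST(tpep_dropoff_datetime AS text), '') ||
          COALESCE(CAST(passenger_count AS text), '') ||
          COALESCE(CAST(trip_distance AS text), '')
        ),
        filename = '{{ render(vars.file) }}';
```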
- Generates a unique row ID by hashing key columns.
- Tags each row with the source `filename` for data lineage.
### Phase 6: The Merge (The Graft)
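A sketch of the merge (note that `MERGE` requires Postgres 15+; the column list is abbreviated):

```yaml
- id: yellow_merge_data
  type: io.kestra.plugin.jdbc.postgresql.Queries
  sql: |
    MERGE INTO public.yellow_tripdata AS t
    USING public.yellow_tripdata_staging AS s
      ON t.unique_row_id = s.unique_row_id
    WHEN NOT MATCHED THEN
      INSERT (unique_row_id, filename, VendorID,
              tpep_pickup_datetime, tpep_dropoff_datetime)
      VALUES (s.unique_row_id, s.filename, s.VendorID,
              s.tpep_pickup_datetime, s.tpep_dropoff_datetime);
```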
This is the critical operation. The `MERGE` statement compares the staging table against the production table. It only inserts rows where the `unique_row_id` does not already exist.
> **System Note:** This guarantees idempotency. You can run this workflow 100 times with the same input, and the data will only be inserted once.
## ⏰ The Automation Layer: Scheduled Triggers

The `05_postgres_taxi_scheduled.yaml` takes the manual workflow and puts it on autopilot.

### The Trigger Mechanism
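A sketch of the two schedules (the trigger ids are assumptions):

```yaml
triggers:
  - id: green_schedule
    type: io.kestra.plugin.core.trigger.Schedule
    cron: "0 9 1 * *"    # 09:00 on day 1 of every month
    inputs:
      taxi: green

  - id: yellow_schedule
    type: io.kestra.plugin.core.trigger.Schedule
    cron: "0 10 1 * *"   # 10:00, staggered an hour after green
    inputs:
      taxi: yellow
```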
- **cron:** Uses standard cron syntax. `0 9 1 * *` means “At 09:00 on day 1 of every month.”
- **inputs:** Automatically passes predefined inputs when the trigger fires.
- **Dual Schedules:** Green taxi runs at 9 AM, Yellow at 10 AM, staggering the load.
### Dynamic Date Handling

The scheduled version uses `trigger.date` to dynamically determine the file to process:
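For example, a `file` variable built from the trigger date might look like this (the exact variable name is an assumption):

```yaml
variables:
  # trigger.date is the scheduled execution date; the date filter
  # formats it to match the monthly file-naming convention
  file: "{{ inputs.taxi }}_tripdata_{{ trigger.date | date('yyyy-MM') }}.csv"
```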
> **System Note:** The `trigger.date` variable is automatically populated by the scheduler. Combined with the `date` filter, it ensures the workflow always processes the correct monthly file.
### Concurrency Control (The Quarantine)
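In Kestra this is a flow-level setting:

```yaml
concurrency:
  limit: 1   # at most one execution at a time; additional runs queue by default
```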
This ensures only one instance of the workflow can run at a time. Critical for preventing race conditions on database writes.
## 🧪 Post-Op Analysis
Why is this pattern so powerful?
| Component | Purpose |
|---|---|
| `If` Conditions | Handle schema variations between datasets. |
| Staging Tables | Isolate raw data before transformation. |
| `unique_row_id` (MD5 Hash) | Enable idempotent merges, preventing duplicates. |
| `MERGE` Statement | Upsert logic for incremental data loading. |
| Scheduled Triggers | Fully automated, hands-off execution. |
| Concurrency Limits | Prevent parallel runs from corrupting data. |
When you master the Staging -> Transform -> Merge pattern, you have the blueprint for every production ETL pipeline.
This is the transition from “running scripts” to “operating a data platform.”
Close the database client. Open the Kestra scheduler. Let the system run itself.
Happy building, initiate.