Building an End-to-End Data Pipeline With Bruin

Goal: Learn how to transition from theory to practice by building an end-to-end data pipeline locally using Bruin and DuckDB, processing NYC Taxi Data.


1. Getting Started: Installation & Setup

Before building our pipeline, we need to set up Bruin on our local machine.

Installation

Bruin provides a simple CLI that can be installed from the terminal:

curl -LsSf https://getbruin.com/install/cli | sh
bruin version

Tip: It is highly recommended to install the Bruin extension for VS Code or Cursor. This provides a render panel, syntax highlighting, and a visual lineage graph directly inside your IDE.

Initializing a Project

To start a new project, we use the bruin init command. Bruin requires projects to be git-initialized, which the init command handles automatically.

bruin init default my-taxi-pipeline
cd my-taxi-pipeline
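
The exact scaffold can vary between Bruin versions, but based on the paths used throughout this post, the project ends up looking roughly like this (the asset filenames are illustrative):

```
my-taxi-pipeline/
├── .bruin.yml              # environments & connections
└── pipeline/
    ├── pipeline.yml        # schedule, defaults, variables
    └── assets/
        ├── trips.py
        ├── payment_lookup.asset.yml
        ├── staging_trips.sql
        └── reports_trips_report.sql
```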

2. Pipeline Architecture & Configuration

We are going to build a three-layered pipeline using DuckDB as our local data warehouse:

  1. Ingestion Layer: Extract data from external sources and store it as raw data.
  2. Staging Layer: Clean, deduplicate, transform, and join the raw data.
  3. Reports Layer: Aggregate the clean data into business-ready metrics.

Project Configuration (.bruin.yml)

At the root of the project, the .bruin.yml file defines our environments and database connections. We will define a local DuckDB connection:

default_environment: default
environments:
  default:
    connections:
      duckdb:
        - name: duckdb-default
          path: duckdb.db

Pipeline Configuration (pipeline.yml)

Inside our pipeline directory, we define the schedule and default connection for our assets:

name: nyc_taxi
schedule: daily
start_date: "2022-01-01"
default_connections:
  duckdb: duckdb-default
variables:
  taxi_types:
    type: array
    default: ["yellow"]

3. Building the Pipeline Layers

Layer 1: Ingestion (Python & Seed Assets)

The Ingestion layer pulls data into our warehouse without applying complex business logic.

1. Python Asset (trips.py): We use Python to fetch NYC Taxi data from a remote URL. Bruin handles the execution and expects a Pandas DataFrame to be returned, which it automatically inserts into DuckDB using an append materialization strategy.

"""@bruin
name: ingestion.trips
type: python
materialization:
  type: table
  strategy: append
@bruin"""
import os
import pandas as pd

def materialize():
    # Bruin injects the run window via the BRUIN_START_DATE and
    # BRUIN_END_DATE environment variables
    start_date = os.environ["BRUIN_START_DATE"]
    end_date = os.environ["BRUIN_END_DATE"]
    # Fetch the NYC Taxi data for this window and build a DataFrame
    final_dataframe = pd.DataFrame()  # placeholder: populate from the fetched data
    return final_dataframe

2. Seed Asset (payment_lookup.asset.yml): We have a local CSV file mapping payment codes to readable names (e.g., 1 -> credit_card). Bruin’s Seed asset automatically loads this CSV into DuckDB. We also add Data Quality Checks directly in the YAML definition (e.g., not_null, unique).
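
As a sketch of what that definition could look like (the exact field names should be checked against the Bruin seed-asset docs), the YAML pairs the CSV path with column-level quality checks:

```yaml
name: ingestion.payment_lookup
type: duckdb.seed
parameters:
  path: payment_lookup.csv
columns:
  - name: payment_type_id
    type: integer
    checks:
      - name: not_null
      - name: unique
  - name: payment_type_name
    type: string
    checks:
      - name: not_null
```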

Layer 2: Staging (SQL Asset)

The Staging layer cleans and joins the data. This is where Bruin’s dependency magic happens. We explicitly state that this asset depends on the two ingestion assets.

/* @bruin
name: staging.trips
type: duckdb.sql
depends:
  - ingestion.trips
  - ingestion.payment_lookup
materialization:
  type: table
  strategy: time_interval
  incremental_key: pickup_datetime
@bruin */

SELECT
    t.pickup_datetime,
    t.fare_amount,
    p.payment_type_name
FROM ingestion.trips t
LEFT JOIN ingestion.payment_lookup p ON t.payment_type = p.payment_type_id
WHERE t.pickup_datetime >= '{{ start_datetime }}'
  AND t.pickup_datetime < '{{ end_datetime }}'

  • Time Interval Strategy: Bruin automatically deletes existing rows within the given date range, then inserts the new results, making the operation idempotent.
  • Jinja Templating: The {{ start_datetime }} and {{ end_datetime }} variables are automatically injected by Bruin during the run.
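
The delete-then-insert pattern behind the time_interval strategy is worth internalizing. Here is a minimal sketch of it using Python's built-in sqlite3 (not Bruin or DuckDB — just the pattern in isolation): re-running the same window twice leaves the table unchanged.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE trips (pickup_datetime TEXT, fare_amount REAL)")

def materialize_window(rows, start, end):
    # Delete existing rows in the window, then insert the new results:
    # running this twice with the same inputs leaves the table unchanged.
    conn.execute(
        "DELETE FROM trips WHERE pickup_datetime >= ? AND pickup_datetime < ?",
        (start, end),
    )
    conn.executemany("INSERT INTO trips VALUES (?, ?)", rows)

rows = [("2022-01-01 08:00", 12.5), ("2022-01-01 09:30", 7.0)]
materialize_window(rows, "2022-01-01", "2022-01-02")
materialize_window(rows, "2022-01-01", "2022-01-02")  # re-run: idempotent
count = conn.execute("SELECT COUNT(*) FROM trips").fetchone()[0]
print(count)  # 2, not 4
```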

Layer 3: Reports (SQL Asset)

Finally, we create an aggregated report summing up the fares grouped by day and payment type. This asset depends on staging.trips.

/* @bruin
name: reports.trips_report
type: duckdb.sql
depends:
  - staging.trips
@bruin */
-- Sum fares per day and payment type
SELECT
    DATE_TRUNC('day', t.pickup_datetime) AS trip_date,
    t.payment_type_name,
    SUM(t.fare_amount) AS total_fares
FROM staging.trips t
GROUP BY 1, 2

4. Running the Pipeline

With all assets defined, Bruin automatically understands the execution order (Ingestion -> Staging -> Reports).
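
To see why this works, note that the `depends` declarations alone form a DAG from which any orchestrator can derive a valid run order. A hypothetical illustration using Python's standard-library graphlib (this is not Bruin's internal implementation):

```python
from graphlib import TopologicalSorter

# Each asset maps to the assets it depends on, exactly as declared
# in the @bruin headers above.
depends = {
    "ingestion.trips": [],
    "ingestion.payment_lookup": [],
    "staging.trips": ["ingestion.trips", "ingestion.payment_lookup"],
    "reports.trips_report": ["staging.trips"],
}

# static_order() yields every asset after all of its dependencies.
order = list(TopologicalSorter(depends).static_order())
print(order)
```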

First, validate the pipeline to ensure there are no syntax errors or broken dependencies:

bruin validate ./pipeline/pipeline.yml

Then, trigger a run for a specific date range:

bruin run ./pipeline/pipeline.yml --start-date 2022-01-01 --end-date 2022-02-01

To verify the data, you can use the Bruin query command:

bruin query --connection duckdb-default --query "SELECT COUNT(*) FROM reports.trips_report"

In the final post of this series, we will explore advanced features: using Bruin MCP with AI Agents to build pipelines conversationally, and deploying our pipeline to Bruin Cloud for fully managed orchestration.