
Spark SQL: Running SQL Queries on DataFrames

Goal: Learn how to combine multiple datasets in Spark, register DataFrames as temporary tables, and run SQL queries — bridging the gap between DataFrame operations and SQL.


1. Why Spark SQL?

There are dedicated tools for running SQL on data lakes (Hive, Presto, Athena). But if you already have a Spark cluster, Spark SQL lets you run SQL queries without setting up additional infrastructure.

Spark SQL is especially useful when:

  • You’re mixing SQL and Python logic in the same pipeline.
  • You want to reuse existing SQL queries (e.g., from a dbt project) inside Spark.
  • You need to prototype quickly before moving to a dedicated SQL engine.

2. Combining Two Datasets

Let’s work with the green and yellow NYC taxi datasets. Since their pickup/dropoff column names differ, we first need to align them:

df_green = spark.read.parquet('data/pq/green/*/*')
df_green = df_green \
    .withColumnRenamed('lpep_pickup_datetime', 'pickup_datetime') \
    .withColumnRenamed('lpep_dropoff_datetime', 'dropoff_datetime')

df_yellow = spark.read.parquet('data/pq/yellow/*/*')
df_yellow = df_yellow \
    .withColumnRenamed('tpep_pickup_datetime', 'pickup_datetime') \
    .withColumnRenamed('tpep_dropoff_datetime', 'dropoff_datetime')

Finding Common Columns

common_columns = []
yellow_columns = set(df_yellow.columns)

for col in df_green.columns:
    if col in yellow_columns:
        common_columns.append(col)
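The same list can be built with an order-preserving list comprehension. A plain set intersection would also find the shared columns, but it loses green's column order, which matters because the union later matches columns by position. A minimal sketch, with hypothetical column lists standing in for df_green.columns and df_yellow.columns:

```python
# Hypothetical column lists standing in for df_green.columns / df_yellow.columns
green_columns = ['VendorID', 'pickup_datetime', 'dropoff_datetime', 'ehail_fee']
yellow_columns = ['VendorID', 'pickup_datetime', 'dropoff_datetime', 'airport_fee']

yellow_set = set(yellow_columns)  # O(1) membership checks

# Order-preserving intersection: keep green's ordering, drop green-only columns
common_columns = [col for col in green_columns if col in yellow_set]

print(common_columns)  # ['VendorID', 'pickup_datetime', 'dropoff_datetime']
```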

Adding a Service Type Identifier

Before merging, we add a column to track which dataset each record came from:

from pyspark.sql import functions as F

df_green_sel = df_green \
    .select(common_columns) \
    .withColumn('service_type', F.lit('green'))

df_yellow_sel = df_yellow \
    .select(common_columns) \
    .withColumn('service_type', F.lit('yellow'))

  • F.lit() adds a literal (constant) value to every row.

Merging with unionAll

df_trips_data = df_green_sel.unionAll(df_yellow_sel)

# Quick sanity check
df_trips_data.groupBy('service_type').count().show()

  • Since Spark 2.0, unionAll() is an alias for union(). Both match columns by position, not by name, which is why we aligned the schemas and column order first.

3. Querying with Temporary Tables

Spark SQL needs a table to query. We can register a DataFrame as a temporary table:

df_trips_data.registerTempTable('trips_data')

Note: registerTempTable() is deprecated in newer Spark versions. Use createOrReplaceTempView() instead.

Now we can run standard SQL:

spark.sql("""
SELECT
    service_type,
    count(1)
FROM
    trips_data
GROUP BY
    service_type
""").show()

This produces the same result as df_trips_data.groupBy('service_type').count().show() — but expressed in SQL.


4. A Real-World Revenue Query

Let’s replicate a revenue aggregation model in Spark SQL:

df_result = spark.sql("""
SELECT
    -- Revenue grouping
    PULocationID AS revenue_zone,
    date_trunc('month', pickup_datetime) AS revenue_month,
    service_type,

    -- Revenue calculation
    SUM(fare_amount) AS revenue_monthly_fare,
    SUM(extra) AS revenue_monthly_extra,
    SUM(mta_tax) AS revenue_monthly_mta_tax,
    SUM(tip_amount) AS revenue_monthly_tip_amount,
    SUM(tolls_amount) AS revenue_monthly_tolls_amount,
    SUM(improvement_surcharge) AS revenue_monthly_improvement_surcharge,
    SUM(total_amount) AS revenue_monthly_total_amount,
    SUM(congestion_surcharge) AS revenue_monthly_congestion_surcharge,

    -- Additional calculations
    AVG(passenger_count) AS avg_monthly_passenger_count,
    AVG(trip_distance) AS avg_monthly_trip_distance
FROM
    trips_data
GROUP BY
    1, 2, 3
""")

  • SQL queries in Spark are transformations (lazy). You need an action like show() or write() to trigger execution.
  • GROUP BY 1, 2, 3 refers positionally to the first three columns of the SELECT list, so long expressions like date_trunc('month', pickup_datetime) don't have to be repeated.

5. Writing Results and Controlling Partitions

Writing the result directly:

df_result.write.parquet('data/report/revenue/')

With our dataset, this could create 200+ tiny Parquet files — Spark's shuffle produces spark.sql.shuffle.partitions output partitions (200 by default) — which is not ideal. Use coalesce() to reduce the partition count:

df_result.coalesce(1).write.parquet('data/report/revenue/', mode='overwrite')

  • repartition(n): increases (or otherwise changes) the number of partitions; involves a full shuffle.
  • coalesce(n): decreases the number of partitions; no shuffle, so it's cheaper.

Rule of thumb: Use coalesce() when reducing partitions and repartition() when increasing them. coalesce() avoids the expensive shuffle operation.


In the next post, we’ll look under the hood at Spark internals — how clusters work, how GROUP BY and JOINs are executed across distributed partitions, and what RDDs are.