
Spark Internals: Clusters, Shuffles, Joins, and RDDs

Goal: Understand how Spark executes jobs across a cluster, how operations like GROUP BY and JOIN are distributed, and what Resilient Distributed Datasets (RDDs) are.


1. Spark Cluster Architecture

When running beyond a single machine, Spark uses a master-worker architecture:

  • Master — the entry point that receives jobs and distributes work.
  • Driver — the program that submits a Spark job (an Airflow DAG, a Python script, etc.).
  • Executors — worker nodes that actually process the data.
flowchart LR
    A["Driver (Spark Job)"] -->|"spark-submit / port 4040"| Master
    subgraph Cluster["Spark Cluster"]
        Master([Master])
        Master --> E1{{Executor 1}}
        Master --> E2{{Executor 2}}
        Master --> E3{{Executor 3}}
    end
    subgraph Data["DataFrame Partitions (in GCS/S3)"]
        P1[Partition 1]
        P2[Partition 2]
        P3[Partition 3]
    end
    E1 <--> P1
    E2 <--> P2
    E3 <--> P3

Each executor fetches a partition from the Data Lake, processes it, and writes the output. If there are more partitions than executors, executors keep pulling partitions until all are processed.

Spark vs Hadoop: Unlike Hadoop, where data is stored locally on the worker nodes, Spark separates storage from compute. This works because modern cloud storage (S3, GCS) is cheap and fast enough that data locality is no longer a major constraint.
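
For reference, the driver connects to the cluster when it creates its SparkSession. Below is a minimal sketch; the master URL and app name are placeholders (7077 is the default standalone master port, while the driver's web UI is served on port 4040):

from pyspark.sql import SparkSession

# Placeholder master URL for a standalone cluster; adjust to your setup.
spark = (
    SparkSession.builder
    .master("spark://<master-host>:7077")
    .appName("taxi-revenue-job")
    .getOrCreate()
)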


2. How GROUP BY Works (Shuffling)

Consider this query that calculates hourly revenue per zone:

df_green_revenue = spark.sql("""
SELECT
    date_trunc('hour', lpep_pickup_datetime) AS hour,
    PULocationID AS zone,
    SUM(total_amount) AS amount,
    COUNT(1) AS number_records
FROM green
WHERE lpep_pickup_datetime >= '2020-01-01 00:00:00'
GROUP BY 1, 2
""")

Since data is split across partitions, records with the same key (hour, zone) may live in different partitions. Spark solves this in two stages:

Stage 1: Local Aggregation

Each executor groups the data within its own partition and produces intermediate results.

Executor   | Intermediate Result
Executor 1 | (hour 1, zone 1, $100, 5 trips), (hour 1, zone 2, $200, 10 trips)
Executor 2 | (hour 1, zone 1, $50, 2 trips), (hour 1, zone 2, $250, 11 trips)
Executor 3 | (hour 1, zone 1, $200, 10 trips), (hour 2, zone 1, $75, 3 trips)

Stage 2: Shuffle & Reduce

Spark shuffles (redistributes) the data so that all records with the same key end up in the same partition. Then it performs a final aggregation.

After Shuffle                                                  | Final Result
Partition A: all (hour 1, zone 1) records                      | (hour 1, zone 1, $350, 17 trips)
Partition B: all (hour 1, zone 2) and (hour 2, zone 1) records | (hour 1, zone 2, $450, 21 trips), (hour 2, zone 1, $75, 3 trips)

Under the hood, the shuffle relies on an external merge sort, which lets Spark sort data that does not fit in memory by spilling sorted runs to disk and merging them.

Performance note: Shuffling is expensive because it moves data across the network. Minimizing the data that needs to be shuffled is key to optimizing Spark jobs. By default, Spark creates 200 partitions after a shuffle.
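
The post-shuffle partition count is controlled by the spark.sql.shuffle.partitions setting; if the default of 200 is too many for a small dataset, you can lower it. A minimal sketch (the value 40 is only illustrative):

# Default is 200; tune to the size of your data and cluster.
spark.conf.set("spark.sql.shuffle.partitions", "40")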


3. How JOINs Work

Joining Two Large Tables (Sort-Merge Join)

When joining two large tables (e.g., green taxi revenue and yellow taxi revenue), Spark uses the same shuffle + merge strategy as GROUP BY:

  1. Shuffle both tables so that matching keys end up in the same partition.
  2. Merge (reduce) the records with matching keys into a single output.
df_join = df_green_revenue_tmp.join(
    df_yellow_revenue_tmp,
    on=['hour', 'zone'],
    how='outer'
)
  • on= — the join key(s); when multiple columns are given, they form a composite key.
  • how='outer' — includes records even if only one side has data (fills the other side with nulls).
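
To see which strategy Spark actually chose for the query above, you can print the physical plan; for two large, shuffled inputs it typically contains a SortMergeJoin operator:

# Prints the physical plan; look for SortMergeJoin (large-large joins)
# or BroadcastHashJoin (when one side is small enough to broadcast).
df_join.explain()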

Joining a Large Table with a Small Table (Broadcast Join)

When one table is small (e.g., a zones lookup table), shuffling is wasteful. Instead, Spark uses broadcasting:

graph LR
    Z[Zones Table] -.->|Broadcast| E1 & E2 & E3
    subgraph Executors
        E1{{Executor 1}} -.-> Z1["zones (local copy)"]
        E2{{Executor 2}} -.-> Z2["zones (local copy)"]
        E3{{Executor 3}} -.-> Z3["zones (local copy)"]
    end
    B1[Big Table P1] --> E1 --> R1[Result 1]
    B2[Big Table P2] --> E2 --> R2[Result 2]
    B3[Big Table P3] --> E3 --> R3[Result 3]

Spark sends a copy of the entire small table to every executor. Each executor then performs a local lookup, so no shuffle is needed and the join can be dramatically faster.

df_zones = spark.read.parquet('zones/')
df_result = df_join.join(df_zones, df_join.zone == df_zones.LocationID)
df_result.drop('LocationID', 'zone').write.parquet('tmp/revenue-zones')
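
Spark usually broadcasts small tables automatically (based on the spark.sql.autoBroadcastJoinThreshold setting), but you can also request it explicitly with the broadcast hint. A sketch of the same join with the hint applied:

from pyspark.sql.functions import broadcast

# Explicitly mark the small zones table for broadcasting to all executors.
df_result = df_join.join(broadcast(df_zones), df_join.zone == df_zones.LocationID)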

4. Resilient Distributed Datasets (RDDs)

RDDs are the low-level abstraction that Spark DataFrames are built on top of. While DataFrames have a schema, RDDs are simply distributed collections of objects.

From DataFrame to RDD

rdd = df_green \
    .select('lpep_pickup_datetime', 'PULocationID', 'total_amount') \
    .rdd
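
Each element of the resulting RDD is a Row object. A quick way to see what the executors will be working with is to pull a couple of records back to the driver:

# Returns a small list of Row objects to the driver for inspection.
rdd.take(2)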

Map, Filter, and ReduceByKey

You can re-implement SQL-like logic with RDD operations:

from datetime import datetime

start = datetime(year=2020, month=1, day=1)

# Filter
def filter_outliers(row):
    return row.lpep_pickup_datetime >= start

# Map: create (key, value) pairs
def prepare_for_grouping(row):
    hour = row.lpep_pickup_datetime.replace(minute=0, second=0, microsecond=0)
    zone = row.PULocationID
    key = (hour, zone)
    value = (row.total_amount, 1)
    return (key, value)

# Reduce: aggregate values by key
def calculate_revenue(left, right):
    return (left[0] + right[0], left[1] + right[1])

result_rdd = rdd \
    .filter(filter_outliers) \
    .map(prepare_for_grouping) \
    .reduceByKey(calculate_revenue)
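
After reduceByKey, each element is a nested pair of the form ((hour, zone), (revenue, count)), which you can confirm with a quick take:

# Each element: ((hour, zone), (total_amount_sum, trip_count))
result_rdd.take(1)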

Converting Back to a DataFrame

from pyspark.sql import types

result_schema = types.StructType([
    types.StructField('hour', types.TimestampType(), True),
    types.StructField('zone', types.IntegerType(), True),
    types.StructField('revenue', types.DoubleType(), True),
    types.StructField('count', types.IntegerType(), True)
])

df_result = result_rdd.map(unwrap).toDF(result_schema)
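
The unwrap helper referenced above is not shown; its job is to flatten the nested ((hour, zone), (revenue, count)) tuples into flat rows matching the schema. A minimal sketch, assuming that structure:

from collections import namedtuple

# Flat, named row matching result_schema.
RevenueRow = namedtuple('RevenueRow', ['hour', 'zone', 'revenue', 'count'])

def unwrap(row):
    # row is ((hour, zone), (revenue, count)) coming out of reduceByKey.
    return RevenueRow(
        hour=row[0][0],
        zone=row[0][1],
        revenue=row[1][0],
        count=row[1][1],
    )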

When to use RDDs? Almost never directly. DataFrames and Spark SQL are higher-level, more optimized, and easier to work with. But understanding RDDs helps you reason about how Spark works under the hood.


5. mapPartitions: Per-Partition Processing

The mapPartitions() method transforms entire partitions instead of individual elements — useful for batch operations like ML inference:

import pandas as pd

# NOTE: columns, model_predict, and duration_rdd are assumed to be defined
# elsewhere; a sketch of what they might look like follows the bullet list below.
def apply_model_in_batch(rows):
    # Materialize the whole partition as a Pandas DataFrame.
    df = pd.DataFrame(rows, columns=columns)
    predictions = model_predict(df)
    df['predicted_duration'] = predictions
    # Yield rows one at a time so Spark can rebuild an RDD from the generator.
    for row in df.itertuples():
        yield row

df_predicts = duration_rdd \
    .mapPartitions(apply_model_in_batch) \
    .toDF() \
    .drop('Index')
  • Each partition is converted to a Pandas DataFrame and passed to the model.
  • Using yield turns the function into a generator, which Spark consumes to build the output RDD incrementally.
  • Use case: when your model expects a batch of records (e.g., a NumPy array) rather than one row at a time.
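
As noted in the code comments, the snippet leans on a few names defined elsewhere. A minimal sketch of what they might look like, purely for illustration (the column list and the toy model are assumptions, not part of the original):

import pandas as pd

# Hypothetical feature columns fed to the model.
columns = ['PULocationID', 'DOLocationID', 'trip_distance']

# RDD of Row objects restricted to those columns.
duration_rdd = df_green.select(columns).rdd

def model_predict(df):
    # Toy stand-in for a real model: "predict" a duration from trip distance.
    return df.trip_distance * 5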

In the next post, we’ll move from local development to the cloud — connecting Spark to Google Cloud Storage, setting up standalone clusters, and running jobs on Dataproc.