
Getting Started With PySpark: Sessions, DataFrames, and Transformations

Goal: Set up a PySpark environment, read data into Spark DataFrames, understand partitions and the Parquet format, and learn the difference between lazy transformations and eager actions.


1. Creating a Spark Session

Everything in PySpark starts with a Spark Session — the entry point for interacting with Spark.

import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()
  • master("local[*]") — run Spark locally using all available CPU cores.
  • appName('test') — names the application (visible in the Spark UI).
  • getOrCreate() — creates a new session or reuses an existing one.

Once running, the Spark UI is available at localhost:4040, showing all current and past jobs.


2. Reading CSV Files

Spark can read CSV files into DataFrames — its tabular data structure, similar to a Pandas DataFrame.

df = spark.read \
    .option("header", "true") \
    .csv('fhvhv_tripdata_2021-01.csv')
  • option("header", "true") — treats the first row as column names.
  • Unlike Pandas, Spark does not infer data types from CSV by default — every column is read as a string.

The Pandas Schema Trick

To get proper data types, use this workflow:

  1. Create a small CSV with the first 1,000 rows.
  2. Read it into a Pandas DataFrame (which infers types automatically).
  3. Convert the Pandas DataFrame to Spark and inspect the schema:
    spark.createDataFrame(pandas_df).schema
  4. Use the output to build a StructType schema:
    from pyspark.sql import types
    
    schema = types.StructType([
        types.StructField("hvfhs_license_num", types.StringType(), True),
        types.StructField("dispatching_base_num", types.StringType(), True),
        types.StructField("pickup_datetime", types.TimestampType(), True),
        types.StructField("dropoff_datetime", types.TimestampType(), True),
        # ... more fields
    ])
  5. Re-read the full CSV with the explicit schema:
    df = spark.read \
        .option("header", "true") \
        .schema(schema) \
        .csv('fhvhv_tripdata_2021-01.csv')

3. Partitions and Parquet

Why Partition?

A Spark cluster has multiple executors. A single CSV file read as one piece keeps only one executor busy — no parallelism. To distribute the work, we partition the data.

# Split the DataFrame into 24 partitions
df = df.repartition(24)

# Write as Parquet files
df.write.parquet('fhvhv/2021/01/')

This produces 24 Parquet files and a _SUCCESS marker. Each executor processes its own partition in parallel.

Why Parquet?

  • Schema embedded — no need to specify types when reading.
  • Columnar storage — more compact than CSV (integers take less space than their string representations).
  • Better compression — significantly smaller file sizes.

Reading Parquet is simpler:

df = spark.read.parquet('fhvhv/2021/01/')
df.printSchema()

Tip: The opposite operation — reducing the number of partitions — is called coalescing, and it avoids a full shuffle: df.coalesce(1).


4. Working with Spark DataFrames

Spark DataFrames support many Pandas-like operations:

# Select specific columns
new_df = df.select('pickup_datetime', 'dropoff_datetime', 'PULocationID', 'DOLocationID')

# Filter by value
new_df = df.select('pickup_datetime', 'dropoff_datetime', 'PULocationID', 'DOLocationID') \
    .filter(df.hvfhs_license_num == 'HV0003')

The official quick guide covers many more operations.


5. Actions vs Transformations

This is one of the most important concepts in Spark.

  • Transformations (lazy) — define what to do, but don’t execute immediately: select(), filter(), join(), groupBy(), repartition()
  • Actions (eager) — trigger actual computation on the cluster: show(), take(), head(), write(), read()
# Nothing happens until show() is called
df.select('pickup_datetime').filter(df.hvfhs_license_num == 'HV0003').show()

Spark builds a DAG (Directed Acyclic Graph) of transformations. Computation only starts when an action triggers it. This lazy evaluation allows Spark to optimize the execution plan before running it.


6. Built-in Functions and UDFs

Built-in Functions

Spark provides a rich library of built-in functions:

from pyspark.sql import functions as F

df \
    .withColumn('pickup_date', F.to_date(df.pickup_datetime)) \
    .withColumn('dropoff_date', F.to_date(df.dropoff_datetime)) \
    .select('pickup_date', 'dropoff_date', 'PULocationID', 'DOLocationID') \
    .show()
  • withColumn() — adds or overwrites a column (a transformation).
  • F.to_date() — converts a timestamp to a date (year/month/day only).

A full list of functions is available in the Spark SQL documentation.

User Defined Functions (UDFs)

When built-in functions aren’t enough, you can define your own:

def crazy_stuff(base_num):
    num = int(base_num[1:])
    if num % 7 == 0:
        return f's/{num:03x}'
    elif num % 3 == 0:
        return f'a/{num:03x}'
    else:
        return f'e/{num:03x}'

# Register as a Spark UDF
crazy_stuff_udf = F.udf(crazy_stuff, returnType=types.StringType())
  • F.udf() wraps a regular Python function so Spark can use it across partitions.
  • UDFs are especially useful for ML preprocessing and complex business logic that’s hard to express in SQL.

Use it like any built-in function:

df \
    .withColumn('pickup_date', F.to_date(df.pickup_datetime)) \
    .withColumn('dropoff_date', F.to_date(df.dropoff_datetime)) \
    .withColumn('base_id', crazy_stuff_udf(df.dispatching_base_num)) \
    .select('base_id', 'pickup_date', 'dropoff_date', 'PULocationID', 'DOLocationID') \
    .show()
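Since a UDF wraps an ordinary Python function, its logic can be sanity-checked without a Spark cluster at all — here the same function is exercised directly on sample base numbers:

```python
# The same plain-Python function that was wrapped with F.udf() above
def crazy_stuff(base_num):
    num = int(base_num[1:])
    if num % 7 == 0:
        return f's/{num:03x}'
    elif num % 3 == 0:
        return f'a/{num:03x}'
    else:
        return f'e/{num:03x}'

print(crazy_stuff('B02510'))  # e/9ce  (2510 is divisible by neither 7 nor 3)
print(crazy_stuff('B02506'))  # s/9ca  (2506 = 7 * 358)
```

Testing the function in plain Python first is usually much faster than debugging it through Spark error traces, which bury the Python exception inside a long JVM stack trace.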

In the next post, we’ll explore Spark SQL — running SQL queries directly on DataFrames and combining multiple datasets.