Databricks

PySpark Guide: Core Syntax for Databricks Exam Prep

2026-03-21
NicheeLab Editorial Team

PySpark is one of the most frequently tested technologies on Databricks exams. Data Engineer Associate, Spark Developer Associate, and ML Associate all include PySpark code reading questions. This article covers the PySpark syntax you need for the exams end-to-end: DataFrame API, filtering, aggregations, joins, UDFs, and Structured Streaming.

What is PySpark?

PySpark is the Python API for Apache Spark. Spark is a distributed processing engine for large-scale data, and PySpark lets Python developers tap into Spark's powerful distributed processing capabilities.

PySpark is available by default in Databricks environments, and you can process data interactively in notebooks. On the exam you are expected to read and understand PySpark code. You do not need to write code, but you do need to know exactly how each API behaves.

  • SparkSession: the entry point for PySpark. Available by default in Databricks as the spark variable
  • DataFrame: Spark's primary data structure. Made of rows and columns, with operations similar to a SQL table
  • Transformation: lazily evaluated transformations (select, filter, groupBy, etc.)
  • Action: operations that actually trigger computation (show, collect, count, write, etc.)

DataFrame API Basics

DataFrame is the core data structure of PySpark. Creating, displaying, and selecting columns from DataFrames are frequent topics on Databricks exams.

Creating a DataFrame

There are several ways to create a DataFrame. The most commonly tested approach is reading files with spark.read.

# Read from a CSV file
df = spark.read.csv("/path/to/file.csv", header=True, inferSchema=True)

# Read from a JSON file
df = spark.read.json("/path/to/file.json")

# Read from a Delta table
df = spark.read.format("delta").load("/path/to/delta")
# or
df = spark.table("my_catalog.my_schema.my_table")

# Create from a Python list
data = [("Alice", 30), ("Bob", 25)]
df = spark.createDataFrame(data, ["name", "age"])

Displaying a DataFrame (show)

show() displays the contents of a DataFrame. By default it shows the first 20 rows.

# Show the first 20 rows
df.show()

# Show the first 5 rows
df.show(5)

# Show without truncating columns
df.show(truncate=False)

# Show the schema (column names and types)
df.printSchema()

# Get the row count
df.count()

Selecting Columns (select)

select() picks just the columns you need. There are several ways to reference columns, but exams most often test the col() function approach.

from pyspark.sql.functions import col, upper, lit

# Reference columns by string
df.select("name", "age")

# Reference columns with col()
df.select(col("name"), col("age"))

# Apply a transformation to a column
df.select(col("name"), (col("age") + 1).alias("age_next_year"))

# Add a new column with withColumn
df.withColumn("name_upper", upper(col("name")))

# Add a constant column
df.withColumn("country", lit("Japan"))

Filtering (where / filter)

where() and filter() behave identically. Both keep only the rows that match the condition. Exams use both forms.

from pyspark.sql.functions import col

# Basic filtering
df.filter(col("age") > 25)
df.where(col("age") > 25)  # same as filter

# String condition (SQL expression)
df.filter("age > 25 AND city = 'Tokyo'")

# Multiple conditions (AND)
df.filter((col("age") > 25) & (col("city") == "Tokyo"))

# Multiple conditions (OR)
df.filter((col("age") > 30) | (col("city") == "Osaka"))

# NOT condition
df.filter(~(col("status") == "inactive"))

# NULL checks
df.filter(col("email").isNotNull())
df.filter(col("email").isNull())

# IN condition
df.filter(col("city").isin("Tokyo", "Osaka", "Nagoya"))

# LIKE condition
df.filter(col("name").like("A%"))
df.filter(col("name").contains("田"))

# BETWEEN condition
df.filter(col("age").between(20, 30))

On exam code questions, a common gotcha is needing to wrap each condition in parentheses when combining them with & (AND) or | (OR). Without parentheses, Python's operator precedence will cause errors.

Aggregation (groupBy / agg)

groupBy() and agg() combine to aggregate by group. This is equivalent to SQL's GROUP BY clause.

from pyspark.sql.functions import count, sum, avg, max, min, round

# Basic groupBy
df.groupBy("department").count()

# Apply multiple aggregation functions at once
df.groupBy("department").agg(
    count("*").alias("employee_count"),
    avg("salary").alias("avg_salary"),
    max("salary").alias("max_salary"),
    min("salary").alias("min_salary"),
    sum("salary").alias("total_salary")
)

# Group by multiple columns
df.groupBy("department", "job_title").agg(
    count("*").alias("count"),
    round(avg("salary"), 2).alias("avg_salary")
)

# Sort (orderBy)
df.groupBy("department").count().orderBy(col("count").desc())

# Get the top N rows
df.groupBy("department").count().orderBy(col("count").desc()).limit(5)

Exams often ask about naming aggregated columns with alias() and about asc/desc usage in orderBy().sort() is an alias for orderBy() and behaves identically.

Joins (join)

To join multiple DataFrames, use the join() method. It is equivalent to SQL's JOIN clause and supports join types: inner, left, right, full, cross, semi, and anti.

# inner join (default)
df1.join(df2, df1["id"] == df2["id"], "inner")

# left outer join
df1.join(df2, df1["id"] == df2["id"], "left")

# right outer join
df1.join(df2, df1["id"] == df2["id"], "right")

# full outer join
df1.join(df2, df1["id"] == df2["id"], "full")

# cross join (Cartesian product)
df1.crossJoin(df2)

# left semi join (only df1 rows that exist in df2)
df1.join(df2, df1["id"] == df2["id"], "left_semi")

# left anti join (only df1 rows that do not exist in df2)
df1.join(df2, df1["id"] == df2["id"], "left_anti")

# Join by matching column names (shortcut when column names match)
df1.join(df2, "id")         # inner join
df1.join(df2, ["id", "name"])  # join on multiple columns

Exams especially focus on the difference between left semi join and left anti join. A semi join "returns only the left rows whose join key matches", while an anti join "returns only the left rows whose join key does not match". In both cases, columns from the right table are not included in the result.

User Defined Functions (UDF)

When built-in functions cannot handle a custom transformation, you can define a UDF (User Defined Function). However, UDFs bypass Spark optimizations, so you should prefer built-in functions whenever possible.

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, IntegerType

# Approach 1: register via the udf() function
def classify_age(age):
    if age < 20:
        return "young"
    elif age < 60:
        return "adult"
    else:
        return "senior"

classify_age_udf = udf(classify_age, StringType())
df.withColumn("age_group", classify_age_udf(col("age")))

# Approach 2: register via decorator
@udf(returnType=StringType())
def format_name(first, last):
    return f"{last} {first}"

df.withColumn("full_name", format_name(col("first_name"), col("last_name")))

# Approach 3: Pandas UDF (fast, recommended)
from pyspark.sql.functions import pandas_udf
import pandas as pd

@pandas_udf(IntegerType())
def double_value(s: pd.Series) -> pd.Series:
    return s * 2

df.withColumn("doubled", double_value(col("value")))

Exams frequently ask about "the difference between UDF and Pandas UDF".Pandas UDF uses Apache Arrow for vectorized I/O, making it significantly faster than a regular Python UDF. For new development, Pandas UDF is the recommended choice.

Structured Streaming

Structured Streaming is an engine that processes streaming data using Spark's batch processing API. On Databricks exams, it is very frequently paired with Auto Loader (cloudFiles).

# Stream from cloud storage with Auto Loader
df_stream = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", "/schema/path")
    .load("/data/input/"))

# Transformations on the stream
df_transformed = (df_stream
    .select("id", "name", "timestamp")
    .filter(col("status") == "active"))

# Stream write to a Delta table
(df_transformed.writeStream
    .format("delta")
    .option("checkpointLocation", "/checkpoint/path")
    .outputMode("append")
    .trigger(availableNow=True)
    .toTable("my_catalog.my_schema.output_table"))

# Trigger mode options
# .trigger(processingTime="10 seconds")  # micro-batch (every 10 seconds)
# .trigger(once=True)                     # run once (deprecated)
# .trigger(availableNow=True)             # process all available data (recommended)

checkpointLocation is a required option. The checkpoint records the stream's progress and is used to recover from failures. Exams also test it via questions like "what happens if you do not specify checkpointLocation?".

outputMode has three modes:

  • append: Output only new rows. Used for non-aggregating workloads (most common).
  • complete: Output the entire result every time. Used for aggregations.
  • update: Output only updated rows. Used when you only need the changed portion of an aggregation.

Check your PySpark skills

Try questions on DataFrame API and Structured Streaming

Try free questions

Try PySpark Sample Questions

Now apply what we covered with questions modeled on the real exam.

Spark / Data Engineer

問題 1

Which is the correct result of running the following PySpark code?&#10;&#10;df = spark.createDataFrame([(1, 'A', 100), (2, 'B', 200), (3, 'A', 150)], ['id', 'group', 'value'])&#10;result = df.groupBy('group').agg(sum('value').alias('total')).orderBy('total')&#10;result.show()

  1. Two rows: group=A, total=250 / group=B, total=200 (ascending by total)
  2. Two rows: group=B, total=200 / group=A, total=250 (descending by total)
  3. Three rows: group=A, total=100 / group=A, total=150 / group=B, total=200
  4. An error occurs (sum requires col())

正解: A

groupBy('group') splits into two groups, A and B, and agg(sum('value')) computes the sum per group: A = 100+150 = 250, B = 200. orderBy('total') sorts by total ascending, so the actual display order is B=200 then A=250. Option A describes the two-row result sorted ascending by total and is correct. sum() accepts either a string column name or a col() reference.

Structured Streaming

問題 2

What is the main purpose of specifying a checkpoint location in Structured Streaming?

  1. To improve stream processing speed
  2. To recover stream progress on failure and guarantee exactly-once processing
  3. To store encryption keys for the stream data
  4. To automatically manage the schema of the output table

正解: B

checkpointLocation stores stream offset information (how far processing has progressed) and metadata. On failure, this information is used to resume processing and guarantee exactly-once semantics. It is not for performance, encryption, or schema management.

Frequently Asked Questions

What is the difference between PySpark and Pandas?

Pandas is a single-machine data processing library, while PySpark is a framework that runs distributed processing across a cluster. Pandas is fine for datasets up to a few GB, but PySpark is required once you reach tens of GB to TB scale. On Databricks you can also use Pandas API on Spark to keep Pandas syntax while running distributed processing on Spark.

How many PySpark code questions appear on Databricks exams?

It varies by exam. About 15-20% of Data Engineer Associate (DEA) questions are PySpark/SQL code reading. On Spark Developer Associate, code questions make up over 50%. You do not need to write code, but you must read code and determine the correct output or behavior.

What is Structured Streaming?

Structured Streaming is an engine that processes streaming data using the same DataFrame/Dataset API as Spark batch processing. You read streams with readStream and write them with writeStream. Databricks exams frequently combine it with Auto Loader (cloudFiles). Make sure you understand the differences between trigger modes (processingTime, once, availableNow).

Test your PySpark understanding

Covers DataFrame API, filtering, aggregation, and Streaming questions

Try free questions

Related Databricks Exam Articles

Databricks Spark Developer: Complete Guide

Covers the DataFrame API and Spark SQL exam scope

Databricks Data Engineer Associate: Complete Guide

DEA exam scope with sample questions

How to Study for Databricks Certifications

Fastest path to passing and study-time estimates

Free Databricks Question Bank

Over 6,800 practice questions available bilingually

Check what you learned with practice questions

Practice with certification-focused question sets

Try free practice questions
Author

NicheeLab Editorial Team

NicheeLab editorial team focused on data engineering and cloud certification learning. Content is structured around practical study needs and official exam domains.


Related articles
Databricks

Databricks Certifications: All 7 Exams, Difficulty & Study Plan (2026)

Complete guide to all 7 Databricks certifications — Data Eng...

Databricks

Databricks Exam Difficulty Ranking: All 7 Certs Compared (2026)

Every Databricks certification ranked by difficulty, with st...

Databricks

Databricks Study Guide: Fastest Pass Route & Time Estimates (2026)

How to pass Databricks certifications efficiently. Official ...

Databricks

Databricks Data Engineer Associate: Complete Guide (2026)

Domain-by-domain breakdown of the Databricks Certified Data ...

Databricks

Databricks Data Engineer Professional: Complete Guide (2026)

Tactics for the Databricks Certified Data Engineer Professio...

Browse all Databricks articles (110)
© 2026 NicheeLab All rights reserved.