When you write df.filter(...).groupBy(...).agg(...) with Spark's DataFrame API, the code does not execute in that literal order. The Catalyst Optimizer analyzes the query's meaning, pushes filters down, prunes unused columns, can reorder JOINs, picks a JOIN algorithm, and finally generates code that is compiled to JVM bytecode for execution. Catalyst is the heart of Spark SQL: you declare "what to compute," and it figures out "how to compute it" efficiently.
This article dissects Catalyst's four phases (Analysis → Logical Optimization → Physical Planning → Code Generation) at the internals level, covering logical optimization rules, physical plan selection, AQE, Tungsten, and how to debug with EXPLAIN.
A query written in SQL or with the DataFrame API goes through the four phases shown below. Each phase takes the previous phase's output as input and makes the plan progressively more concrete.
┌──────────────────────────────────────────────────────────────┐
│ SQL / DataFrame API                                          │
│   SELECT name, SUM(amount)                                   │
│   FROM sales WHERE region = 'APAC' GROUP BY name             │
└────────────────────────┬─────────────────────────────────────┘
                         ▼
┌──────────────────────────────────────────────────────────────┐
│ Phase 1: Analysis                                            │
│ - Receive Unresolved Logical Plan                            │
│ - Resolve table/column/type via Catalog lookup               │
│ - Missing columns / type mismatches → AnalysisException      │
│ Output: Resolved Logical Plan                                │
└────────────────────────┬─────────────────────────────────────┘
                         ▼
┌──────────────────────────────────────────────────────────────┐
│ Phase 2: Logical Optimization                                │
│ - Iteratively apply rule-based equivalence transforms        │
│ - Predicate Pushdown / Column Pruning / Constant Folding     │
│ - Physical execution details are NOT considered              │
│ Output: Optimized Logical Plan                               │
└────────────────────────┬─────────────────────────────────────┘
                         ▼
┌──────────────────────────────────────────────────────────────┐
│ Phase 3: Physical Planning                                   │
│ - Generate multiple physical plan candidates                 │
│ - Pick the best plan via cost model                          │
│ - Decide JOIN strategy / shuffle method / aggregation mode   │
│ Output: Selected Physical Plan                               │
└────────────────────────┬─────────────────────────────────────┘
                         ▼
┌──────────────────────────────────────────────────────────────┐
│ Phase 4: Code Generation (Tungsten)                          │
│ - Compile multiple operators into a single Java method       │
│ - Eliminate virtual calls; maximize CPU cache efficiency     │
│ Output: JVM Bytecode → execute                               │
└──────────────────────────────────────────────────────────────┘
The parser produces an Unresolved Logical Plan. In the Analysis phase, Catalyst resolves table, column, and data type references by looking them up in Spark's catalog, which may be backed by the Hive Metastore or Unity Catalog. If you reference a column that does not exist, as in SELECT unknown_col FROM sales, an AnalysisException is thrown at this stage. DataFrame API calls go through the same analysis path internally, so df.select("unknown_col") raises the same error.
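To make name resolution concrete, here is a minimal Python sketch of what the Analysis phase checks. It is a toy model, not Spark code: CATALOG, resolve_select, and AnalysisError are hypothetical stand-ins for the session catalog and AnalysisException, and real resolution also handles aliases, nested fields, functions, and type coercion.

```python
from dataclasses import dataclass


class AnalysisError(Exception):
    """Toy stand-in for Spark's AnalysisException (not the real class)."""


# Hypothetical catalog: table name -> {column name: data type}
CATALOG = {
    "sales": {"name": "string", "amount": "double", "region": "string"},
}


@dataclass(frozen=True)
class ResolvedColumn:
    table: str
    name: str
    data_type: str


def resolve_select(table, columns):
    """Resolve the column references of `SELECT <columns> FROM <table>`."""
    schema = CATALOG.get(table)
    if schema is None:
        raise AnalysisError(f"Table or view not found: {table}")
    resolved = []
    for column in columns:
        if column not in schema:
            # Fail before any data is read, like Spark's analyzer does.
            raise AnalysisError(
                f"Column '{column}' cannot be resolved. "
                f"Available columns: {sorted(schema)}"
            )
        resolved.append(ResolvedColumn(table, column, schema[column]))
    return resolved


if __name__ == "__main__":
    print(resolve_select("sales", ["name", "amount"]))
    try:
        resolve_select("sales", ["unknown_col"])
    except AnalysisError as err:
        print("AnalysisError:", err)
```

The key design point the sketch mirrors is that resolution happens against metadata only, so the error surfaces before any file is opened.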
Dozens of rule-based transformations are applied to the resolved logical plan in batches; most batches repeat until the plan stops changing (a fixed point) or an iteration limit is reached. The three representative rules are Predicate Pushdown, Column Pruning, and Constant Folding, which we cover in detail later. This phase ignores physical execution details (JOIN implementation, shuffle strategy, memory allocation); it is purely about relational-algebra equivalence transformations.
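The "apply rules until nothing changes" loop can be sketched in a few lines of Python. This is a toy model, not Spark's Optimizer: the plan classes and the two rules (push a Filter below a Project, merge adjacent Filters) are hypothetical simplifications loosely modeled on Catalyst's predicate-pushdown and filter-combining rules, and conditions are opaque strings assumed to reference only columns available below the Project. In Spark the iteration cap is configurable (spark.sql.optimizer.maxIterations, 100 by default).

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class Scan:
    table: str


@dataclass(frozen=True)
class Project:
    columns: tuple
    child: object


@dataclass(frozen=True)
class Filter:
    condition: str  # opaque in this toy; assumed to use only columns available below
    child: object


def push_filter_through_project(plan):
    """Filter(Project(x)) -> Project(Filter(x)): evaluate the filter earlier."""
    if isinstance(plan, Filter) and isinstance(plan.child, Project):
        project = plan.child
        return Project(project.columns, Filter(plan.condition, project.child))
    return None  # rule does not apply to this node


def combine_filters(plan):
    """Filter(Filter(x)) -> Filter(inner AND outer, x)."""
    if isinstance(plan, Filter) and isinstance(plan.child, Filter):
        inner = plan.child
        return Filter(f"({inner.condition}) AND ({plan.condition})", inner.child)
    return None


def transform_down(plan, rule):
    """Apply `rule` to a node, then recurse into its (possibly new) child."""
    rewritten = rule(plan)
    if rewritten is not None:
        plan = rewritten
    if isinstance(plan, (Project, Filter)):
        plan = replace(plan, child=transform_down(plan.child, rule))
    return plan


def run_to_fixed_point(plan, rules, max_iterations=100):
    """Re-run every rule until the plan stops changing or the cap is hit."""
    for _ in range(max_iterations):
        new_plan = plan
        for rule in rules:
            new_plan = transform_down(new_plan, rule)
        if new_plan == plan:
            break
        plan = new_plan
    return plan


if __name__ == "__main__":
    plan = Filter("amount > 100",
                  Project(("name", "amount"),
                          Filter("region = 'APAC'", Scan("sales"))))
    print(run_to_fixed_point(plan, [push_filter_through_project, combine_filters]))
```

Running the rules repeatedly matters because one rewrite can enable another: pushing the upper Filter down makes it adjacent to the lower one, which only then can be combined.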
From the optimized logical plan, Catalyst derives physical plan candidates and selects one, relying mainly on rules plus size estimates (for example, whether one side of a JOIN is small enough to broadcast). This is where the JOIN algorithm (Broadcast Hash Join / Sort Merge Join / Shuffle Hash Join), the aggregation method (HashAggregate / SortAggregate), and the placement of Exchange (shuffle) are determined. When Cost-Based Optimization (CBO) is enabled (spark.sql.cbo.enabled = true; off by default), table and column statistics collected via ANALYZE TABLE are used for more accurate size and cardinality estimates.
Spark then generates Java source code for each codegen stage of the physical plan and compiles it to JVM bytecode at runtime (using the Janino compiler). Whole-Stage Code Generation (part of Project Tungsten) fuses consecutive operators such as Filter → Project → HashAggregate into a single Java method, eliminating the virtual call overhead of the Volcano model. This is covered in more detail in the Whole-Stage Code Generation discussion below.
Catalyst's logical optimization phase ships more than 50 rules, but we focus on the 3 most important ones for both exams and real-world work.
WHERE filter conditions are moved as close to the data source as possible in the query tree, reducing the amount of data read. When a filter is handed to the Parquet reader, it is matched against the Row Group footer statistics (min/max), and non-matching Row Groups are skipped entirely. Delta Lake adds file-level data skipping on top of that, avoiding the read of non-matching Parquet files altogether.
-- Logical plan before optimization (conceptual)
Aggregate [name], [name, sum(amount)]
+- Filter (region = 'APAC')
   +- Scan sales [name, amount, region, ...]
-- After Predicate Pushdown (and Column Pruning)
Aggregate [name], [name, sum(amount)]
+- Filter (region = 'APAC')
   +- Scan sales [name, amount, region]
        PushedFilters: [EqualTo(region, APAC)]
-- For Parquet/Delta, the filter is also handed to the scan, which skips
-- non-matching Row Groups / files entirely. Spark keeps the Filter above
-- the scan because min/max skipping is coarse: Row Groups that survive
-- can still contain non-matching rows.
Column Pruning narrows the scan operator's projection list so that only the columns actually referenced by SELECT, WHERE, or JOIN conditions are read. Because Parquet and Delta Lake are columnar formats, I/O for unread columns is skipped entirely. If you SELECT just 3 columns from a 100-column table, you cut disk I/O by roughly 97% (assuming the columns are of similar size).
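The column set a pruned scan needs, and the resulting I/O, can be computed mechanically. The Python sketch below is a hypothetical toy: the 100-column table with 1 MB per column is an assumption chosen to reproduce the 97% figure, not a measurement.

```python
def required_columns(select_cols, filter_cols=(), join_cols=()):
    """Columns the scan must produce: everything referenced above it, deduplicated in order."""
    needed = []
    for col in (*select_cols, *filter_cols, *join_cols):
        if col not in needed:
            needed.append(col)
    return needed


def pruned_scan_bytes(column_bytes, select_cols, filter_cols=(), join_cols=()):
    """Return (columns read, bytes read) for a columnar scan after pruning."""
    needed = required_columns(select_cols, filter_cols, join_cols)
    unknown = [c for c in needed if c not in column_bytes]
    if unknown:
        raise KeyError(f"unknown columns: {unknown}")
    return needed, sum(column_bytes[c] for c in needed)


# Hypothetical 100-column table where every column occupies 1 MB on disk
sales_columns = ["name", "amount", "region"] + [f"c{i}" for i in range(3, 100)]
column_bytes = {c: 1_000_000 for c in sales_columns}

if __name__ == "__main__":
    cols, read = pruned_scan_bytes(column_bytes, ["name", "amount"], filter_cols=["region"])
    total = sum(column_bytes.values())
    print(cols, f"{read / total:.0%} of the table is read")
```

Note that region is read even though it is not in the SELECT list, because the filter needs it.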
-- Original query
SELECT name, amount FROM sales WHERE region = 'APAC'
-- Scan after Column Pruning
Scan sales [name, amount, region]   -- all other columns are not read
  PushedFilters: [EqualTo(region, APAC)]
-- Thanks to Parquet's columnar layout, physical I/O for any column
-- other than name/amount/region is zero
Constant expressions that can be evaluated during query optimization, before any data is read, are pre-computed and replaced with literals. This avoids evaluating the same expression once per row at runtime, cutting CPU load, and the simplified comparisons are more often eligible for Predicate Pushdown.
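Constant folding is a bottom-up rewrite over the expression tree: fold the children first, then evaluate any call whose arguments are all literals. The Python sketch below is a toy with hypothetical Lit/Col/Call classes and a tiny function table; unlike Spark, it takes the start date as a Python date rather than implicitly casting the string '2026-01-01'.

```python
import datetime as dt
import operator
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class Lit:
    value: Any


@dataclass(frozen=True)
class Col:
    name: str


@dataclass(frozen=True)
class Call:
    fn: str
    args: tuple


# Hypothetical function table for this toy; each entry evaluates literal arguments.
FUNCTIONS = {
    "*": operator.mul,
    "/": operator.truediv,
    ">": operator.gt,
    "date_add": lambda start, days: start + dt.timedelta(days=days),
}


def fold_constants(expr):
    """Fold children first, then evaluate a call whose arguments are all literals."""
    if isinstance(expr, Call):
        args = tuple(fold_constants(a) for a in expr.args)
        if all(isinstance(a, Lit) for a in args):
            return Lit(FUNCTIONS[expr.fn](*(a.value for a in args)))
        return Call(expr.fn, args)
    return expr  # Lit and Col cannot be simplified further


if __name__ == "__main__":
    # sale_date > date_add(DATE '2026-01-01', 30)
    print(fold_constants(Call(">", (Col("sale_date"),
                                    Call("date_add", (Lit(dt.date(2026, 1, 1)), Lit(30)))))))
    # tax_rate > 8 / 100 folds; tax_rate * 100 > 8 does not (it contains a column)
    print(fold_constants(Call(">", (Col("tax_rate"), Call("/", (Lit(8), Lit(100)))))))
    print(fold_constants(Call(">", (Call("*", (Col("tax_rate"), Lit(100))), Lit(8)))))
```

The last case shows the limit of the technique: any sub-expression that touches a column is left alone.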
-- Before optimization
WHERE sale_date > date_add('2026-01-01', 30)
  AND tax_rate > 8 / 100
-- After Constant Folding
WHERE sale_date > DATE '2026-01-31'
  AND tax_rate > 0.08
-- Constant expressions are pre-computed, turning into simple comparisons
-- eligible for pushdown. Only column-free sub-expressions are folded:
-- tax_rate * 100 > 8 is NOT rewritten to tax_rate > 0.08.
JOIN is often the most expensive operation in a Spark query. For equi-joins, Catalyst picks among three main JOIN algorithms in the physical planning phase, deciding automatically based on input table sizes, statistics, hints, and configuration parameters. (Non-equi joins use other strategies, such as Broadcast Nested Loop Join.)
| Algorithm | How it works | Selection criteria | Shuffle | Typical use case |
|---|---|---|---|---|
| Broadcast Hash Join | Broadcast the small table to all Executors and join via hash table | One side is below autoBroadcastJoinThreshold (default 10MB) | None | Star-schema JOIN: dimension x fact |
| Sort Merge Join | Shuffle and sort both tables by the JOIN key, then merge | Both tables are large and the JOIN is an equi-join | Yes (both sides) | Large-large JOIN (default strategy) |
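| Shuffle Hash Join | Shuffle both sides on the JOIN key, then build a hash table from the smaller side in each partition | spark.sql.join.preferSortMergeJoin = false (default true) and the smaller side's per-partition hash table fits in memory, or a SHUFFLE_HASH hint | Yes (both sides) | Mid-sized JOIN where sorting is unnecessary |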
-- Broadcast Hash Join threshold
SET spark.sql.autoBroadcastJoinThreshold = 10485760; -- 10MB (default)
-- Force Broadcast Join via hint
SELECT /*+ BROADCAST(dim_product) */
f.order_id, f.amount, d.product_name
FROM fact_orders f
JOIN dim_product d ON f.product_id = d.product_id
# Broadcast hint in the PySpark API
from pyspark.sql.functions import broadcast
result = fact_orders.join(broadcast(dim_product), "product_id")
Setting spark.sql.autoBroadcastJoinThreshold = -1 disables automatic Broadcast Join, so equi-joins fall back to Sort Merge Join by default; explicit BROADCAST hints are still honored. Raising the threshold widens the range where Broadcast Join applies, but the broadcast table is collected on the driver and copied to every Executor, which increases the risk of OOM on both. As a rule of thumb, keep it below roughly 10% of Executor memory.
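The decision order described above can be summarized as a small function. This is a simplified sketch loosely modeled on Spark's join selection, not its actual code: it ignores join type, which side is hinted, and non-equi joins, and the Shuffle Hash Join conditions (small side below threshold × shuffle partitions, and at least 3x smaller than the other side) are assumed heuristics for illustration.

```python
MB = 1024 * 1024
GB = 1024 * MB
DEFAULT_BROADCAST_THRESHOLD = 10 * MB  # spark.sql.autoBroadcastJoinThreshold default


def choose_join(left_bytes, right_bytes, *, broadcast_hint=False,
                threshold=DEFAULT_BROADCAST_THRESHOLD,
                prefer_sort_merge=True, shuffle_partitions=200):
    """Simplified equi-join strategy choice (a sketch, not Spark's JoinSelection)."""
    small, big = sorted((left_bytes, right_bytes))
    # 1. An explicit hint wins, even when the threshold is -1.
    if broadcast_hint:
        return "Broadcast Hash Join"
    # 2. Automatic broadcast; a threshold of -1 disables this branch.
    if threshold >= 0 and small <= threshold:
        return "Broadcast Hash Join"
    # 3. Shuffle Hash Join only if sort-merge is not preferred and the small side
    #    is small per partition and much smaller than the big side (assumed heuristics).
    if (not prefer_sort_merge
            and small < threshold * shuffle_partitions
            and small * 3 <= big):
        return "Shuffle Hash Join"
    # 4. Default for large-large equi-joins.
    return "Sort Merge Join"


if __name__ == "__main__":
    print(choose_join(5 * MB, 50 * GB))                # dimension x fact
    print(choose_join(5 * MB, 50 * GB, threshold=-1))  # auto-broadcast disabled
    print(choose_join(1 * GB, 50 * GB, prefer_sort_merge=False))
```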
Classic Catalyst is a static optimizer that finalizes all decisions before execution, so stale or missing table statistics can lead it to a suboptimal plan. Adaptive Query Execution (AQE), introduced in Spark 3.0, collects runtime statistics at query stage boundaries (shuffle and broadcast exchanges) and re-optimizes the remaining plan dynamically.
-- Key AQE settings
SET spark.sql.adaptive.enabled = true; -- Enable AQE (default in Spark 3.2+)
SET spark.sql.adaptive.coalescePartitions.enabled = true; -- Auto-coalesce small partitions
SET spark.sql.adaptive.skewJoin.enabled = true; -- Skew Join optimization
SET spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes = 256m; -- Skew detection threshold (default 256MB)
spark.sql.shuffle.partitions (default 200) is a fixed value applied to every shuffle, regardless of how much data actually flows through it. When the actual data volume is much smaller, many tiny tasks are spawned and scheduling overhead balloons. AQE measures each partition's size once a shuffle stage has finished and coalesces small adjacent partitions toward a target size (spark.sql.adaptive.advisoryPartitionSizeInBytes, 64MB by default), keeping the task count at an appropriate level. Coalescing can only merge partitions, never add more, so leaving the initial count at 200 works well for small and mid-sized data, but very large shuffles may still need a higher starting value.
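The coalescing idea is a greedy merge of adjacent partitions up to a target size. The Python sketch below is a simplified version with sizes in MB and a 64MB target matching the advisory default; Spark's real implementation adds further constraints (for example, keeping enough partitions for parallelism and coalescing all shuffles that feed a stage consistently).

```python
def coalesce_partitions(sizes_mb, target_mb=64):
    """Greedily merge adjacent shuffle partitions up to a target size.

    Returns a list of groups; each group lists the original partition
    indices that one post-coalesce task will read.
    """
    groups, current, current_size = [], [], 0
    for index, size in enumerate(sizes_mb):
        # Start a new group if adding this partition would overshoot the target.
        if current and current_size + size > target_mb:
            groups.append(current)
            current, current_size = [], 0
        current.append(index)
        current_size += size
    if current:
        groups.append(current)
    return groups


if __name__ == "__main__":
    # 200 shuffle partitions of 10MB each (hypothetical): 6 fit into one 64MB task.
    groups = coalesce_partitions([10] * 200)
    print(len(groups), "tasks instead of 200")
```

Only adjacent partitions are merged, which keeps each task's input a contiguous range of reducer IDs.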
Even if static statistics led Catalyst to pick Sort Merge Join, AQE can detect at runtime that one side's actual size, measured after its shuffle stage completes, is below autoBroadcastJoinThreshold and switch dynamically to Broadcast Hash Join. This is especially effective when filters have drastically reduced the data volume.
When JOIN key values are skewed, data piles up in specific partitions and those tasks alone become extremely slow (the straggler problem). AQE's Skew Join optimization treats a partition as skewed when it is larger than both spark.sql.adaptive.skewJoin.skewedPartitionFactor (default 5) times the median partition size and skewedPartitionThresholdInBytes. It splits each skewed partition into multiple sub-partitions and processes them in parallel as separate tasks, duplicating the corresponding partition on the other side so it can be joined against each sub-partition.
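The detection rule and the split can be sketched in Python. The factor of 5 and the 256MB threshold mirror the defaults mentioned above; the target sub-partition size (the larger of a 64MB advisory size and the average non-skewed partition) and the even split are simplifying assumptions, since Spark splits along map-output boundaries and its pieces are only approximately equal.

```python
import math
from statistics import median


def find_skewed(sizes_mb, factor=5, threshold_mb=256):
    """Indices of partitions larger than factor x median AND larger than the threshold."""
    med = median(sizes_mb)
    return [i for i, s in enumerate(sizes_mb) if s > factor * med and s > threshold_mb]


def plan_skew_splits(sizes_mb, factor=5, threshold_mb=256, advisory_mb=64):
    """Return {partition index: number of sub-partitions} for skewed partitions.

    Each sub-partition is later joined with a full copy of the matching
    partition from the other side of the JOIN.
    """
    skewed = find_skewed(sizes_mb, factor, threshold_mb)
    normal = [s for i, s in enumerate(sizes_mb) if i not in skewed]
    target = max(advisory_mb, sum(normal) / len(normal)) if normal else advisory_mb
    return {i: math.ceil(sizes_mb[i] / target) for i in skewed}


if __name__ == "__main__":
    # Hypothetical sizes: one 1GB partition, nine 100MB partitions.
    sizes = [1024] + [100] * 9
    print(find_skewed(sizes), plan_skew_splits(sizes))
```

Requiring both conditions avoids splitting partitions that are merely "large relative to tiny neighbors" but small in absolute terms.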
-- Conceptual view of Skew Join optimization
-- Normal Sort Merge Join (with skew)
Partition 0: 1GB       <- the other 9 partitions are 100MB each
-> Partition 0's task is about 10x slower
-- After AQE Skew Join optimization
-- (1GB > 5 x median of 100MB, and > 256MB, so Partition 0 counts as skewed)
Partition 0-0: ~100MB  |
Partition 0-1: ~100MB  |  Original Partition 0 split into ~10 sub-partitions;
...                    |  each is joined with a copy of the other side's Partition 0
Partition 0-9: ~100MB  |
Partition 1: 100MB
...
-> All tasks finish in roughly equal time
Before Whole-Stage Code Generation arrived in Spark 2.0, Spark SQL executed physical operators with the Volcano (iterator) model: each operator (Filter → Project → Aggregate) pulled rows from its child one at a time by calling next(). That approach incurs several virtual calls per row, which causes frequent CPU branch mispredictions and cache misses.
Whole-Stage Code Generation (part of the Tungsten project) compiles consecutive operators into a single Java method that processes rows inside a tight while loop. Virtual calls are eliminated, and JIT optimizations (loop unrolling, SIMD-friendly code) become much easier to apply.
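A toy Python comparison of the two execution styles follows. Python cannot reproduce the CPU-level effects (JIT compilation, registers, branch prediction); it only shows the structural difference: in the iterator model every row travels through a chain of next() calls, while the fused version does the same work in one loop, which is roughly the shape of code Whole-Stage Code Generation aims for. The operator classes and sample rows are hypothetical.

```python
from collections import defaultdict

# Hypothetical rows of the sales table: (name, amount, region)
ROWS = [
    ("alice", 10.0, "APAC"),
    ("bob", 5.0, "EMEA"),
    ("alice", 7.5, "APAC"),
    ("carol", 3.0, "APAC"),
]


class Operator:
    next_calls = 0  # counts next() invocations across all operators


class ScanOp(Operator):
    def __init__(self, rows):
        self._it = iter(rows)

    def next(self):
        Operator.next_calls += 1
        return next(self._it, None)  # None signals end of input


class FilterOp(Operator):
    def __init__(self, child, predicate):
        self.child, self.predicate = child, predicate

    def next(self):
        Operator.next_calls += 1
        while (row := self.child.next()) is not None:
            if self.predicate(row):
                return row
        return None


class ProjectOp(Operator):
    def __init__(self, child, fn):
        self.child, self.fn = child, fn

    def next(self):
        Operator.next_calls += 1
        row = self.child.next()
        return None if row is None else self.fn(row)


def volcano_sum_by_name(rows):
    """Iterator model: the aggregate pulls one row at a time via next() calls."""
    Operator.next_calls = 0
    plan = ProjectOp(FilterOp(ScanOp(rows), lambda r: r[2] == "APAC"),
                     lambda r: (r[0], r[1]))
    totals = defaultdict(float)
    while (row := plan.next()) is not None:
        totals[row[0]] += row[1]
    return dict(totals), Operator.next_calls


def fused_sum_by_name(rows):
    """Fused style: scan, filter, project, and aggregate inside one loop."""
    totals = defaultdict(float)
    for name, amount, region in rows:
        if region == "APAC":
            totals[name] += amount
    return dict(totals)


if __name__ == "__main__":
    print(volcano_sum_by_name(ROWS))
    print(fused_sum_by_name(ROWS))
```

For just 4 input rows the iterator version already makes 13 next() calls; the fused loop makes none.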
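-- Inspect WholeStageCodegen via EXPLAIN
EXPLAIN SELECT name, SUM(amount)
FROM sales WHERE region = 'APAC' GROUP BY name
-- Sample output (simplified excerpt; exact text varies by Spark version and AQE settings)
-- *(2) HashAggregate(keys=[name], functions=[sum(amount)])
-- +- Exchange hashpartitioning(name, 200)
--    +- *(1) HashAggregate(keys=[name], functions=[partial_sum(amount)])
--       +- *(1) Project [name, amount]
--          +- *(1) Filter (isnotnull(region) AND (region = APAC))
--             +- *(1) ColumnarToRow
--                +- FileScan parquet sales[name,amount,region]
--                     PushedFilters: [IsNotNull(region), EqualTo(region,APAC)]
--
-- *(1) = WholeStageCodegen stage 1: ColumnarToRow, Filter, Project, and the
--        partial HashAggregate are fused into a single Java method
-- *(2) = WholeStageCodegen stage 2: the final HashAggregate after the shuffle
Operators marked with * (asterisk) are part of WholeStageCodegen, and the number in parentheses is the codegen stage ID. Operators without it, such as FileScan and Exchange (shuffle), run outside the generated code. An Exchange is a stage boundary, so the operators above it start a new codegen stage.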
To inspect Catalyst's optimization output, use the EXPLAIN command. It is the foundational tool for debugging and performance tuning.
| Command | Output | Main use case |
|---|---|---|
| EXPLAIN | Physical plan only | Quickly verify JOIN type and presence of shuffles |
| EXPLAIN EXTENDED | All 4 plans: Parsed → Analyzed → Optimized → Physical | Confirm that logical optimizations like Predicate Pushdown applied |
| EXPLAIN FORMATTED | Physical plan outline plus a separate details section per operator | Inspect per-operator details (output columns, pushed filters, arguments) |
| EXPLAIN COST | Optimized logical plan with statistics (sizeInBytes, rowCount) plus the physical plan | Verify CBO (cost-based optimization) estimates |
-- SQL: inspect the physical plan
EXPLAIN
SELECT d.product_name, SUM(f.amount)
FROM fact_orders f
JOIN dim_product d ON f.product_id = d.product_id
WHERE f.order_date >= '2026-01-01'
GROUP BY d.product_name
# PySpark: inspect via DataFrame API
from pyspark.sql.functions import broadcast, sum as spark_sum
df_result = (
    fact_orders
    .filter("order_date >= '2026-01-01'")
    .join(broadcast(dim_product), "product_id")
    .groupBy("product_name")
    .agg(spark_sum("amount"))
)
df_result.explain(True)    # True = EXTENDED (all 4 plans)
df_result.explain("cost")  # Optimized logical plan with statistics
The Catalyst Optimizer is tested directly on the Databricks Certified Associate Developer for Apache Spark exam, and indirectly on the Data Engineer Associate (DEA) exam through query tuning scenarios. Make sure you have these points down:
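- Broadcast Hash Join applies when one side is below autoBroadcastJoinThreshold (10MB by default). Force it via the hint /*+ BROADCAST(t) */.
- Operators marked with * are part of Whole-Stage Code Generation. FileScan and Exchange are outside it, and an Exchange starts a new codegen stage.
- EXPLAIN (physical plan only) vs EXPLAIN EXTENDED (all 4 plans).

Question 1 (Spark Developer Associate)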
When the following PySpark code runs, which statement about Catalyst Optimizer's behavior is correct?
from pyspark.sql.functions import col, sum
df = spark.read.parquet('/data/sales')
result = (
    df.filter(col('region') == 'APAC')
    .select('name', 'amount', 'region')
    .groupBy('name')
    .agg(sum('amount').alias('total'))
)
Correct answer: B
Catalyst applies Predicate Pushdown (pushing filter down to the scan) and Column Pruning (reading only the 3 columns named in select) in the logical optimization phase. Because Parquet is columnar, physical I/O for unused columns drops to zero. Option A is wrong because it reverses the direction (moving filter later). Filters move as early as possible, toward the data source. Option C is wrong: DataFrame API also goes through Catalyst's automatic optimization and does not execute in literal source order. Option D is wrong: groupBy alone does not introduce a JOIN, so no Broadcast Join is inserted.
Where is the boundary between Catalyst Optimizer's logical optimization and physical planning?
The logical optimization phase ignores physical data layout and execution engine specifics, applying only equivalence transformations of relational algebra (Predicate Pushdown, Column Pruning, Constant Folding, etc.). The physical planning phase, by contrast, takes execution details into account (size estimates, table statistics, and configuration thresholds such as autoBroadcastJoinThreshold) to choose JOIN algorithms (Broadcast Hash Join / Sort Merge Join / Shuffle Hash Join), decide between HashAggregate and SortAggregate, and place shuffles. Comparing the == Optimized Logical Plan == and == Physical Plan == sections of EXPLAIN EXTENDED output makes this boundary visually clear.
How effective is Predicate Pushdown with Parquet / Delta Lake?
Parquet stores per-Row-Group min/max statistics in the footer, so when Predicate Pushdown hands a filter to the Parquet reader, entire non-matching Row Groups are skipped. Delta Lake adds file-level data skipping (min/max in add.stats inside _delta_log), letting it skip reading whole non-matching files. Combined with Z-Order or Liquid Clustering, related data is physically co-located, dramatically boosting skipping rates. Point queries against multi-TB tables have been reported to cut scan volume by 100x or more.
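Min/max skipping boils down to an interval test per Row Group (or per Delta file). The Python sketch below uses hypothetical statistics for the region column; it also shows why clustering matters: a chunk with a wide min/max range (rg2) cannot be skipped, even if it holds few or no matching rows.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ChunkStats:
    """Min/max statistics for one column in a Row Group (or one Delta file)."""
    chunk_id: str
    min_value: str
    max_value: str


def might_match_equals(stats, value):
    # The chunk can only contain `value` if it lies inside [min, max].
    return stats.min_value <= value <= stats.max_value


def prune_chunks(chunks, value):
    """Split chunks into (must read, can skip) for the predicate column = value."""
    read = [c.chunk_id for c in chunks if might_match_equals(c, value)]
    skipped = [c.chunk_id for c in chunks if not might_match_equals(c, value)]
    return read, skipped


# Hypothetical statistics for the `region` column across four Row Groups
REGION_STATS = [
    ChunkStats("rg0", "AMER", "AMER"),
    ChunkStats("rg1", "APAC", "APAC"),
    ChunkStats("rg2", "AMER", "EMEA"),  # poorly clustered: wide range
    ChunkStats("rg3", "EMEA", "EMEA"),
]

if __name__ == "__main__":
    print(prune_chunks(REGION_STATS, "APAC"))
```

Z-Order or Liquid Clustering helps precisely by narrowing these ranges, turning chunks like rg2 into ones that can be skipped.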
Are there any downsides to enabling Adaptive Query Execution (AQE)?
AQE adds a small planning overhead at query stage boundaries because it collects runtime statistics and re-optimizes. The overhead is typically on the order of milliseconds, which is negligible for queries that run for seconds or longer. Spark 3.2+ enables it by default (spark.sql.adaptive.enabled = true), and Databricks recommends keeping it on for all workloads. Disabling it makes sense only in narrow cases, such as very short (<100ms), latency-critical queries run at high volume. (Structured Streaming queries do not use AQE in open-source Spark 3.x, so streaming micro-batches are unaffected either way.) In practice, you almost never need to worry about the downside.
NicheeLab Editorial Team
NicheeLab editorial team focused on data engineering and cloud certification learning. Content is structured around practical study needs and official exam domains.