Spark SQLで df.filter(...).groupBy(...).agg(...) と書いたコードは、 そのままの順序では実行されません。Catalyst Optimizerがクエリの意味を解析し、フィルタの押し下げ、不要カラムの除去、JOIN順序の並べ替え、 最適なJOINアルゴリズムの選択を行い、最終的にJVMバイトコードへコンパイルして実行します。 Catalystはユーザーが「何を計算したいか」だけを宣言すれば、 「どう計算するか」を自動で最適解に導くSparkの心臓部です。
本記事では、Catalystの4フェーズ(Analysis → Logical Optimization → Physical Planning → Code Generation)を内部動作レベルで解説し、 論理最適化ルール、物理プラン選択、AQE、Tungsten、 そしてEXPLAINによるデバッグ手法まで網羅します。
SQLやDataFrame APIで記述された処理は、以下の4段階を経て実行されます。 各フェーズは前のフェーズの出力を入力として受け取り、段階的に具体化していきます。
┌─────────────────────────────────────────────────────────────────┐
│ SQL / DataFrame API │
│ SELECT name, SUM(amount) │
│ FROM sales WHERE region = 'APAC' GROUP BY name │
└────────────────────────┬────────────────────────────────────────┘
▼
┌────────────────────────────────────────────────────────────────┐
│ Phase 1: Analysis(解析) │
│ ─ Unresolved Logical Plan を受け取る │
│ ─ Catalog参照でテーブル名・カラム名・データ型を解決 │
│ ─ 存在しないカラム・型不一致はここで AnalysisException │
│ 出力: Resolved Logical Plan │
└────────────────────────┬───────────────────────────────────────┘
▼
┌────────────────────────────────────────────────────────────────┐
│ Phase 2: Logical Optimization(論理最適化) │
│ ─ ルールベースの等価変換を繰り返し適用 │
│ ─ Predicate Pushdown / Column Pruning / Constant Folding │
│ ─ 物理的な実行方法は考慮しない │
│ 出力: Optimized Logical Plan │
└────────────────────────┬───────────────────────────────────────┘
▼
┌────────────────────────────────────────────────────────────────┐
│ Phase 3: Physical Planning(物理プランニング) │
│ ─ 複数の物理プラン候補を生成 │
│ ─ コストモデルで最適プランを選択 │
│ ─ JOIN戦略・シャッフル方式・集計方式を決定 │
│ 出力: Selected Physical Plan │
└────────────────────────┬───────────────────────────────────────┘
▼
┌────────────────────────────────────────────────────────────────┐
│ Phase 4: Code Generation(コード生成 / Tungsten) │
│ ─ 複数オペレータを単一Javaメソッドにコンパイル │
│ ─ 仮想関数呼び出しを排除しCPUキャッシュ効率を最大化 │
│ 出力: JVM Bytecode → 実行 │
└────────────────────────────────────────────────────────────────┘パーサーが生成した未解決論理プラン(Unresolved Logical Plan)を受け取り、 Sparkのメタストア(Hive Metastore / Unity Catalog)を参照してテーブル・カラム・データ型を解決します。SELECT unknown_col FROM sales のように存在しないカラムを参照していれば、 この段階で AnalysisException が送出されます。 DataFrameAPIの場合も内部的に同じ解析パスを通るため、df.select("unknown_col") も同様のエラーになります。
解決済み論理プランに対して、数十のルールベース変換を繰り返し適用します。 代表的なルールはPredicate Pushdown・Column Pruning・Constant Foldingの3つで、 これらは後のセクションで詳述します。このフェーズでは物理的な実行方法(JOINの実装方式、 シャッフル戦略、メモリ配分)は一切考慮しません。 純粋にリレーショナル代数の等価変換だけを行うフェーズです。
最適化された論理プランから、実行可能な物理プランの候補を複数生成し、 コストモデルに基づいて最もコストの低いプランを選択します。 ここでJOINアルゴリズム(Broadcast Hash Join / Sort Merge Join / Shuffle Hash Join)、 集計の方式(HashAggregate / SortAggregate)、Exchange(シャッフル)の挿入位置が決定されます。 コストベース最適化(CBO)が有効な場合(spark.sql.cbo.enabled = true)、ANALYZE TABLE で収集したテーブル統計情報が参照されます。
物理プランの各ステージをJavaバイトコードにコンパイルします。 Whole-Stage Code Generation(Tungsten)により、Filter → Project → HashAggregate のような連続オペレータをひとつのJavaメソッドにまとめ、 Volcanoモデルの仮想関数呼び出しオーバーヘッドを排除します。 詳細は後述の「Whole-Stage Code Generation」セクションで解説します。
Catalystの論理最適化フェーズには50以上のルールがありますが、 試験・実務の両面で最重要な3つを取り上げます。
WHERE句のフィルタ条件をクエリツリーの可能な限りデータソース側に移動させ、 読み込むデータ量を削減します。Parquetリーダーに渡されたフィルタは Row Groupのフッター統計情報(min/max)と照合され、 条件に該当しないRow Groupをスキャンせずに飛ばします。 Delta Lakeではさらにファイルレベルのデータスキッピングが働き、 該当しないParquetファイル自体の読み込みを回避します。
-- 最適化前の論理プラン(概念)
Aggregate [name], [name, sum(amount)]
Filter (region = 'APAC')
Scan sales [name, amount, region, ...]
-- Predicate Pushdown適用後
Aggregate [name], [name, sum(amount)]
Scan sales [name, amount, region]
PushedFilters: [EqualTo(region, APAC)]
-- Parquet/Deltaの場合、フィルタがスキャン内部に
-- 押し下げられ、該当しないRow Group/ファイルを丸ごとスキップSELECT句やJOIN条件で実際に参照されるカラムだけを読み込むよう、 スキャンオペレータのプロジェクションリストを絞り込みます。 Parquet/Delta Lakeはカラムナー形式のため、読み込まないカラムのI/Oを完全にスキップでき、 100カラムのテーブルから3カラムだけSELECTする場合、ディスクI/Oを97%削減できます。
-- 元のクエリ
SELECT name, amount FROM sales WHERE region = 'APAC'
-- Column Pruning適用後のスキャン
Scan sales [name, amount, region] -- 他の全カラムは読み込まない
PushedFilters: [EqualTo(region, APAC)]
-- Parquetのカラムナー特性により、
-- name, amount, region以外のカラムチャンクは物理I/Oゼロコンパイル時に計算可能な定数式を事前に評価して定数に置き換えます。 実行時に毎行計算する必要がなくなるため、CPU負荷を削減し、 さらにPredicate Pushdownの対象にできる条件が増えます。
-- 最適化前
WHERE sale_date > date_add('2026-01-01', 30)
AND tax_rate * 100 > 8
-- Constant Folding適用後
WHERE sale_date > '2026-01-31'
AND tax_rate > 0.08
-- 定数式が事前計算され、Pushdown可能なシンプルな比較に変換JOINはSparkクエリで最もコストの高い操作です。物理プランニングフェーズで Catalystが選択するJOINアルゴリズムは3種類あり、入力テーブルのサイズ・ 統計情報・設定パラメータに基づいて自動決定されます。
| アルゴリズム | 動作原理 | 選択条件 | シャッフル | 適用ケース |
|---|---|---|---|---|
| Broadcast Hash Join | 小テーブルを全Executorにブロードキャストし、ハッシュテーブルで結合 | 片方のテーブルが autoBroadcastJoinThreshold(デフォルト10MB)以下 | なし | ディメンション × ファクトの星型JOIN |
| Sort Merge Join | 両テーブルをJOINキーでシャッフル&ソート後、マージ | 両テーブルが大規模で等結合(equi-join) | あり(両側) | 大規模テーブル同士のJOIN(デフォルト戦略) |
| Shuffle Hash Join | JOINキーでシャッフル後、小さい側のハッシュテーブルを構築 | ソートコストが高く片側がメモリに収まる場合 | あり(両側) | ソート不要な中規模JOIN |
-- Broadcast Hash Joinの閾値設定
SET spark.sql.autoBroadcastJoinThreshold = 10485760; -- 10MB(デフォルト)
-- 強制的にBroadcast Joinを使用(ヒント)
SELECT /*+ BROADCAST(dim_product) */
f.order_id, f.amount, d.product_name
FROM fact_orders f
JOIN dim_product d ON f.product_id = d.product_id
-- PySpark APIでのBroadcastヒント
from pyspark.sql.functions import broadcast
result = fact_orders.join(broadcast(dim_product), "product_id")spark.sql.autoBroadcastJoinThreshold = -1 に設定すると Broadcast Joinが無効化され、常にSort Merge Joinが選択されます。 逆に閾値を引き上げるとBroadcast Joinの適用範囲が広がりますが、 ドライバメモリ不足(OOM)のリスクがあるため、 Executorメモリの10%以下を目安にしてください。
従来のCatalystは実行前に全ての最適化を確定する静的オプティマイザでしたが、 テーブル統計情報が古い・存在しないケースでは最適でないプランが選択されることがありました。 AQEはSpark 3.0で導入され、シャッフルステージの境界で実行時統計を収集し、 残りのプランを動的に再最適化します。
-- AQE関連の主要設定
SET spark.sql.adaptive.enabled = true; -- AQE有効化(Spark 3.2+でデフォルト)
SET spark.sql.adaptive.coalescePartitions.enabled = true; -- 小パーティション自動統合
SET spark.sql.adaptive.skewJoin.enabled = true; -- Skew Join最適化
SET spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes = 256m; -- スキュー検出閾値spark.sql.shuffle.partitions(デフォルト200)は全クエリ共通の固定値ですが、 実際のデータ量に対して過大な場合、大量の小さなタスクが生成されスケジューリングオーバーヘッドが増大します。 AQEはシャッフル後の各パーティションのデータ量を計測し、小さなパーティションを自動的に統合して 適切なタスク数に調整します。これにより、初期パーティション数を200のままにしていても、 実行時にクエリごとの最適値に自動調整されます。
静的な統計情報ではSort Merge Joinが選択されたケースでも、 シャッフル後の実データサイズが autoBroadcastJoinThreshold以下であることをAQEが検出すると、Broadcast Hash Joinに動的に切り替えます。 フィルタで大幅にデータが削減された場合に特に有効です。
JOINキーの値に偏り(Skew)がある場合、特定のパーティションにデータが集中し そのタスクだけが極端に遅くなります(ストラグラー問題)。 AQEのSkew Join最適化は、閾値を超えるパーティションを自動検出し、 複数のサブパーティションに分割して別々のタスクで並列処理します。 対応するもう一方のテーブルのパーティションは複製されて各サブパーティションとJOINされます。
-- Skew Join最適化のイメージ
-- 通常のSort Merge Join(Skewあり)
Partition 0: 100MB ← 他の9パーティションは各10MB
→ Partition 0のタスクだけ10倍遅い
-- AQE Skew Join最適化後
Partition 0-a: 50MB ┐
Partition 0-b: 50MB ┘ 元のPartition 0を分割
Partition 1: 10MB
...
→ 全タスクが均等な所要時間に従来のSparkはVolcanoモデルに基づき、各オペレータ(Filter → Project → Aggregate)が 1行ずつ next() メソッドで行を受け渡していました。 この方式では1行あたり複数回の仮想関数呼び出しが発生し、 CPUの分岐予測ミスやキャッシュミスが頻発します。
Whole-Stage Code Generation(Tungstenプロジェクトの一部)は、 連続する複数のオペレータをひとつのJavaメソッドにコンパイルし、 タイトなwhileループで行を処理します。仮想関数呼び出しが排除され、 JITコンパイラの最適化(ループアンローリング、SIMD命令の適用)が効きやすくなります。
-- EXPLAINでWholeStageCodegenを確認
EXPLAIN SELECT name, SUM(amount)
FROM sales WHERE region = 'APAC' GROUP BY name
-- 出力例(抜粋)
-- *(1) HashAggregate(keys=[name], functions=[sum(amount)])
-- +- *(1) Project [name, amount]
-- +- *(1) Filter (region = APAC)
-- +- *(1) ColumnarToRow
-- +- FileScan parquet sales[name,amount,region]
-- PushedFilters: [EqualTo(region,APAC)]
--
-- *(1) = WholeStageCodegen Stage 1
-- Filter, Project, HashAggregateが単一Javaメソッドに統合*(アスタリスク)付きのオペレータがWholeStageCodegenの対象です。 FileScanやExchange(シャッフル)はコード生成の境界となるため、 これらを挟んで新しいCodegenステージが始まります。
Catalystの最適化結果を確認するには EXPLAIN コマンドを使います。 デバッグやパフォーマンスチューニングの基本ツールです。
| コマンド | 出力内容 | 主な用途 |
|---|---|---|
EXPLAIN | 物理プランのみ | JOINの種類・シャッフルの有無を素早く確認 |
EXPLAIN EXTENDED | Parsed → Analyzed → Optimized → Physical の全4プラン | Predicate Pushdownなど論理最適化の適用を確認 |
EXPLAIN FORMATTED | 物理プラン(読みやすいフォーマット) | オペレータごとのメトリクス確認 |
EXPLAIN COST | 物理プラン+コスト見積もり | CBO(コストベース最適化)の結果確認 |
-- SQL: 物理プランの確認
EXPLAIN
SELECT d.product_name, SUM(f.amount)
FROM fact_orders f
JOIN dim_product d ON f.product_id = d.product_id
WHERE f.order_date >= '2026-01-01'
GROUP BY d.product_name
-- PySpark: DataFrame APIでの確認
df_result = (
fact_orders
.filter("order_date >= '2026-01-01'")
.join(broadcast(dim_product), "product_id")
.groupBy("product_name")
.agg(sum("amount"))
)
df_result.explain(True) # True = EXTENDED相当(全4プラン表示)
df_result.explain("cost") # コスト見積もり付きCatalyst OptimizerはDatabricks Certified Spark Developer試験で直接出題される領域です。 DEA(Data Engineer Associate)でもクエリチューニングの文脈で間接的に問われます。 以下の論点を確実に押さえてください。
autoBroadcastJoinThreshold(10MB)以下。 ヒント(/*+ BROADCAST(t) */)で強制可能* が付いたオペレータがコード生成対象。 FileScan・Exchangeは境界となるEXPLAIN(物理プランのみ)とEXPLAIN EXTENDED(全4プラン)の違いSpark Developer Associate
問題 1
以下のPySparkコードを実行したとき、Catalyst Optimizerの動作として正しい記述はどれですか? df = spark.read.parquet('/data/sales') result = ( df.filter(col('region') == 'APAC') .select('name', 'amount', 'region') .groupBy('name') .agg(sum('amount').alias('total')) )
正解: B
Catalystは論理最適化フェーズでPredicate Pushdown(filterのスキャンへの押し下げ)とColumn Pruning(select句で指定された3カラムだけの読み込み)を適用します。Parquetはカラムナー形式のため、不要なカラムの物理I/Oをゼロにできます。選択肢Aは逆方向(filterを後ろに移動)なので誤りです。フィルタは可能な限り前(データソース側)に移動します。選択肢Cは誤りです。DataFrame APIでもCatalystによる自動最適化が行われ、記述順序通りには実行されません。選択肢Dは誤りです。groupBy単体ではJOINは発生せず、Broadcast Joinが挿入されることはありません。
Catalyst Optimizerの論理最適化と物理プランニングの境界はどこですか?
論理最適化フェーズはデータの物理配置や実行エンジンの仕様を一切考慮せず、リレーショナル代数の等価変換(Predicate Pushdown、Column Pruning、Constant Foldingなど)だけを適用します。一方、物理プランニングフェーズでは実行エンジンの仕様(メモリ量、ネットワーク帯域、テーブル統計情報)を参照し、JOINアルゴリズムの選択(Broadcast Hash Join / Sort Merge Join / Shuffle Hash Join)、集計のHashAggregateかSortAggregateかの決定、シャッフル方式の選択などを行います。EXPLAINの出力で == Optimized Logical Plan == と == Physical Plan == を比較すると、この境界を明確に確認できます。
Predicate PushdownがParquet/Delta Lakeでどの程度の効果を発揮しますか?
ParquetはRow Groupごとにカラムの最小値・最大値をフッター統計情報として保持しており、Predicate PushdownによりフィルタがParquetリーダーに渡されると、条件に該当しないRow Groupを丸ごとスキップします。Delta Lakeではさらにファイルレベルのデータスキッピング(_delta_log内のadd.statsに格納されたmin/max)が機能するため、条件に該当しないファイル自体の読み込みを回避できます。Z-OrderやLiquid Clusteringと組み合わせると、関連するデータが物理的に近接配置されるため、スキッピング率が飛躍的に向上し、数TB規模のテーブルに対するポイントクエリで100倍以上のスキャン量削減が報告されています。
AQE(Adaptive Query Execution)を有効にするデメリットはありますか?
AQEはシャッフルステージの境界で実行統計を収集し再最適化するため、ステージ間のプランニングオーバーヘッドが若干増加します。ただし、このオーバーヘッドはミリ秒単位であり、実行時間が秒〜分以上のクエリでは無視できるレベルです。Spark 3.2以降ではデフォルト有効(spark.sql.adaptive.enabled = true)になっており、Databricksも全ワークロードでの有効化を推奨しています。無効化が有効なのは、極端に短い(< 100ms)マイクロバッチを大量に回すケースなど非常に限定的です。実務上デメリットを意識する必要はほぼありません。
NicheeLab編集部
データエンジニアリング・クラウド資格の専門家。Databricks・Snowflake等の認定資格を保有し、実務経験に基づいた問題作成・解説を行っています。NicheeLab運営。
Databricks資格一覧|全7試験・難易度・勉強法
Databricks認定資格全7試験の一覧・難易度・出題範囲・合格ラインを徹底解説。2026年最新版の公式試験ガイドに準...
Databricks試験の難易度ランキング|全7資格を徹底比較
Databricks認定全7試験の難易度をランキング形式で徹底比較。合格率・学習時間・出題傾向から難易度を分析。...
Databricks資格の勉強方法|最短合格ルートと学習時間の目安
Databricks認定資格に最短で合格するための勉強方法を完全ガイド。公式リソース・問題集・学習スケジュールを徹底解説...
Databricks Data Engineer Associate完全解説|出題範囲・問題例・合格戦略
Databricks Certified Data Engineer Associate試験を徹底解説。5つの出題ドメイ...
Databricks Data Engineer Professional完全解説|上級試験の攻略法
Databricks Certified Data Engineer Professional試験を徹底解説。10の出題...