MERGE INTOは、Delta Lakeで最もよく使われるデータ操作のひとつです。 1つのステートメントで「マッチした行は更新、マッチしない行は挿入、不要な行は削除」ができるため、 UPSERT(Update or Insert)パターン、重複排除、SCD(Slowly Changing Dimension)の実装に広く使われます。 Databricks認定試験では全てのData Engineer系試験で頻出です。
MERGE INTOの構文は、ターゲットテーブル・ソースデータ・マッチ条件・マッチ時の処理・非マッチ時の処理で構成されます。
MERGE INTO silver.customers AS t
USING bronze.raw_customers AS s
ON t.customer_id = s.customer_id
-- マッチした行: 更新
WHEN MATCHED AND s.updated_at > t.updated_at THEN
UPDATE SET
t.name = s.name,
t.email = s.email,
t.updated_at = s.updated_at
-- マッチしない行: 挿入
WHEN NOT MATCHED THEN
INSERT (customer_id, name, email, created_at, updated_at)
VALUES (s.customer_id, s.name, s.email, s.created_at, s.updated_at)
-- ソースにない行: 削除(オプション)
WHEN NOT MATCHED BY SOURCE THEN
DELETE;WHEN NOT MATCHED BY SOURCE はDelta Lake 2.4以降で追加された構文で、 ターゲットにはあるがソースにはない行(つまり削除されたデータ)を処理できます。
最も基本的なパターンです。キーが一致すれば更新、一致しなければ挿入します。 Silver層へのデータ取り込みで最もよく使います。
-- シンプルなUPSERT
MERGE INTO silver.products t
USING staging.new_products s
ON t.product_id = s.product_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;SET * と INSERT * は「全カラム」を意味します。 カラム構造がソースとターゲットで同じ場合に使えるショートカットです。
Bronze層には同じレコードが複数回到着することがあります(リトライ、重複送信など)。 MERGE + ROW_NUMBERで最新の1件だけを取り込む重複排除パターンが定番です。
MERGE INTO silver.events t
USING (
SELECT *, ROW_NUMBER() OVER (
PARTITION BY event_id
ORDER BY event_timestamp DESC
) AS rn
FROM bronze.raw_events
WHERE _ingested_at > current_timestamp() - INTERVAL 1 HOUR
QUALIFY rn = 1
) s
ON t.event_id = s.event_id
WHEN MATCHED AND s.event_timestamp > t.event_timestamp
THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;ソース側で事前にROW_NUMBERで重複を排除してからMERGEするのがポイントです。 MERGEのソースに重複があると、同一キーに対して複数のマッチが発生してエラーになります。
SCD Type 1は、変更があった場合に既存レコードを単純に上書きするパターンです。 履歴は保持しません。最新の状態だけが重要な場合に使います。
-- SCD Type 1: 変更があれば上書き
MERGE INTO dim.customers t
USING staging.customer_updates s
ON t.customer_id = s.customer_id
WHEN MATCHED AND (
t.name != s.name OR
t.email != s.email OR
t.address != s.address
) THEN UPDATE SET
t.name = s.name,
t.email = s.email,
t.address = s.address,
t.updated_at = current_timestamp()
WHEN NOT MATCHED THEN INSERT *;SCD Type 2は、変更前のレコードを「期限切れ」にし、変更後のレコードを新しい行として挿入するパターンです。 変更履歴を完全に保持します。is_current フラグと有効期間(effective_from / effective_to)で管理します。
-- SCD Type 2: ステップ1 - 既存レコードの期限切れ処理
MERGE INTO dim.customers t
USING staging.customer_updates s
ON t.customer_id = s.customer_id AND t.is_current = true
WHEN MATCHED AND (t.name != s.name OR t.email != s.email) THEN
UPDATE SET
t.is_current = false,
t.effective_to = current_date();
-- SCD Type 2: ステップ2 - 新レコードの挿入
INSERT INTO dim.customers
SELECT
customer_id,
name,
email,
true AS is_current,
current_date() AS effective_from,
'9999-12-31' AS effective_to
FROM staging.customer_updates s
WHERE NOT EXISTS (
SELECT 1 FROM dim.customers t
WHERE t.customer_id = s.customer_id
AND t.is_current = true
AND t.name = s.name
AND t.email = s.email
);SCD Type 2はMERGEだけでは完結しにくいため、UPDATE + INSERT の2ステップで実装するのが一般的です。 Delta Live Tables(DLT)には APPLY CHANGES INTO というSCD Type 2専用の構文があり、 これを使うとかなり簡潔に書けます。試験ではこの構文も出題範囲です。
Structured Streamingで増分データをSilver層にMERGEするには、foreachBatch を使います。 各マイクロバッチの結果をDataFrameとして受け取り、MERGEを実行します。
def upsert_to_silver(batch_df, batch_id):
# ソース側で重複排除
deduped = batch_df.dropDuplicates(["customer_id"])
deduped.createOrReplaceTempView("updates")
spark.sql("""
MERGE INTO silver.customers t
USING updates s
ON t.customer_id = s.customer_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")
(spark.readStream
.format("delta")
.table("bronze.raw_customers")
.writeStream
.foreachBatch(upsert_to_silver)
.option("checkpointLocation", "/checkpoint/silver_customers")
.trigger(availableNow=True)
.start()
)| 課題 | 対策 | 効果 |
|---|---|---|
| マッチ条件のスキャンが遅い | マッチキーでZ-Order / Liquid Clustering | スキャンファイル数が大幅減少 |
| ソースに重複がある | ROW_NUMBER + QUALIFYで事前排除 | MERGEのエラー防止 |
| 大量データのMERGE | foreachBatchでバッチ分割 | メモリ安定、障害復旧容易 |
| 不要なファイルの蓄積 | MERGE後のOPTIMIZE + VACUUM | 読み取り性能の維持 |
Data Engineer Associate / Professional
問題 1
Bronze層に同一customer_idのレコードが複数回到着することがある。Silver層のcustomersテーブルにはcustomer_idごとに最新の1レコードだけを保持したい。最も適切な実装はどれか。
正解: A
MERGEのソースに同一キーの重複があるとエラーになるため、事前にROW_NUMBER + QUALIFYで最新1件に絞ってからMERGEを実行するのが正しいパターンです。BはMERGE内部でROW_NUMBERは使えません。CはUPSERTではなく全件挿入で重複が残ります。DはDelta LakeにUNIQUE制約はありません(CHECK制約のみ)。
MERGE INTOとINSERT OVERWRITEの違いは?
MERGE INTOは行単位でマッチ条件を評価し、マッチした行の更新・マッチしない行の挿入・条件付き削除を1ステートメントで行います。INSERT OVERWRITEはパーティション単位でデータを丸ごと上書きします。既存データとの差分処理が必要な場面ではMERGE、パーティション全体の洗い替えが目的ならINSERT OVERWRITEが適しています。
MERGEのパフォーマンスを改善するには?
最も効果的なのはマッチ条件に使うカラムでテーブルをZ-OrderまたはLiquid Clusteringすることです。これによりスキャン対象のファイル数が大幅に減ります。またソースデータの重複排除を事前に行い、MERGEの処理量自体を減らすのも重要です。大量データのMERGEでは、foreachBatchでマイクロバッチ化する方法も有効です。
試験でMERGEはどう出題されますか?
Data Engineer Associate/Professionalの両方で最頻出のSQL操作です。基本構文(USING / ON / WHEN MATCHED / WHEN NOT MATCHED)、SCD Type 1/2の実装パターン、重複排除への応用、foreachBatchとの組み合わせが主な出題範囲です。WHEN NOT MATCHED BY SOURCE(ソースにない行の処理)も近年追加された構文で出題対象です。
NicheeLab編集部
データエンジニアリング・クラウド資格の専門家。Databricks・Snowflake等の認定資格を保有し、実務経験に基づいた問題作成・解説を行っています。NicheeLab運営。
Databricks資格一覧|全7試験・難易度・勉強法
Databricks認定資格全7試験の一覧・難易度・出題範囲・合格ラインを徹底解説。2026年最新版の公式試験ガイドに準...
Databricks試験の難易度ランキング|全7資格を徹底比較
Databricks認定全7試験の難易度をランキング形式で徹底比較。合格率・学習時間・出題傾向から難易度を分析。...
Databricks資格の勉強方法|最短合格ルートと学習時間の目安
Databricks認定資格に最短で合格するための勉強方法を完全ガイド。公式リソース・問題集・学習スケジュールを徹底解説...
Databricks Data Engineer Associate完全解説|出題範囲・問題例・合格戦略
Databricks Certified Data Engineer Associate試験を徹底解説。5つの出題ドメイ...
Databricks Data Engineer Professional完全解説|上級試験の攻略法
Databricks Certified Data Engineer Professional試験を徹底解説。10の出題...