Change Data Feed(CDF)は、Delta Tableに対する変更(INSERT / UPDATE / DELETE)を自動的に記録し、 下流のパイプラインやシステムに「何が変わったか」を効率的に伝播させるための機能です。 Silver → Gold 間の増分処理や、外部システムへのデータ連携に活用されます。
この記事では、CDFの有効化、_change_typeの種類と意味、バッチ/ストリーミングでの読み取り方法、 foreachBatch + MERGEでの下流反映パターン、DLT APPLY CHANGES INTOとの関係、 そして試験で問われるポイントを整理します。
CDFはテーブル単位で有効化します。新規テーブルの作成時、または既存テーブルのプロパティ変更で設定できます。 CDF有効化後の変更のみが記録されます。有効化前の変更履歴は参照できません。
-- 新規テーブル作成時に有効化
CREATE TABLE silver.orders (
order_id BIGINT,
customer_id BIGINT,
amount DECIMAL(10,2),
status STRING,
updated_at TIMESTAMP
)
TBLPROPERTIES (delta.enableChangeDataFeed = true);
-- 既存テーブルに対して有効化
ALTER TABLE silver.orders
SET TBLPROPERTIES (delta.enableChangeDataFeed = true);
-- ワークスペース全体でデフォルト有効化
SET spark.databricks.delta.properties.defaults.enableChangeDataFeed = true;CDFで読み取ったデータには、自動的に3つのメタカラムが追加されます。 最も重要なのは _change_type で、変更の種類を示します。
| _change_type | 意味 | 発生する操作 |
|---|---|---|
| insert | 新しい行が挿入された | INSERT、MERGE(NOT MATCHED THEN INSERT) |
| update_preimage | 更新前の行の状態 | UPDATE、MERGE(MATCHED THEN UPDATE) |
| update_postimage | 更新後の行の状態 | UPDATE、MERGE(MATCHED THEN UPDATE) |
| delete | 行が削除された | DELETE、MERGE(MATCHED THEN DELETE) |
UPDATEでは preimage(変更前)と postimage(変更後)の2行が記録されます。 「何から何に変わったか」を完全にトレースできるのがCDFの強みです。 追加のメタカラムとして _commit_version(Deltaバージョン番号)と_commit_timestamp(コミット時刻)も付与されます。
-- バージョン範囲で変更を取得
SELECT * FROM table_changes('silver.orders', 5, 10);
-- タイムスタンプ範囲で変更を取得
SELECT * FROM table_changes('silver.orders', '2026-03-01', '2026-03-27');
-- 特定の_change_typeだけフィルタ
SELECT * FROM table_changes('silver.orders', 5)
WHERE _change_type IN ('insert', 'update_postimage');# バージョン指定で開始
cdf_stream = (spark.readStream
.format("delta")
.option("readChangeFeed", "true")
.option("startingVersion", 5)
.table("silver.orders")
)
# タイムスタンプ指定で開始
cdf_stream = (spark.readStream
.format("delta")
.option("readChangeFeed", "true")
.option("startingTimestamp", "2026-03-01")
.table("silver.orders")
)CDFの最も一般的な使い方は、Silver → Gold 間の増分処理です。 Gold層の集約テーブルを毎回フルリビルドする代わりに、CDFで変更分だけを取得してMERGEで反映します。
def update_gold(batch_df, batch_id):
updates = batch_df.filter(
"_change_type IN ('insert', 'update_postimage')"
)
deletes = batch_df.filter("_change_type = 'delete'")
updates.createOrReplaceTempView("updates")
deletes.createOrReplaceTempView("deletes")
# UPSERT: 新規挿入と更新後の状態を反映
spark.sql("""
MERGE INTO gold.order_summary t
USING updates s ON t.order_id = s.order_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")
# DELETE: 削除された行をGoldからも削除
spark.sql("""
MERGE INTO gold.order_summary t
USING deletes s ON t.order_id = s.order_id
WHEN MATCHED THEN DELETE
""")
(spark.readStream
.format("delta")
.option("readChangeFeed", "true")
.table("silver.orders")
.writeStream
.foreachBatch(update_gold)
.option("checkpointLocation", "/checkpoint/gold_orders")
.trigger(availableNow=True)
.start()
)trigger(availableNow=True)はバッチ的にストリームを実行し、 利用可能なすべてのCDFデータを処理した後にストリームを自動停止します。 スケジュールジョブとの相性が良いパターンです。
CDFは外部システムへの増分データ連携にも使えます。全データの再送信ではなく、 変更分だけを送るのでネットワークと処理コストを大幅に削減できます。
Delta Live Tables(DLT)のAPPLY CHANGES INTOは、CDFと類似の概念ですが レイヤーが異なります。CDFはDelta Table側の「変更記録機能」であり、 APPLY CHANGES INTOはDLTパイプライン側の「CDC取り込み構文」です。
| 比較軸 | Change Data Feed | DLT APPLY CHANGES INTO |
|---|---|---|
| 役割 | Delta Tableの変更を記録・提供 | CDCソースからDLTテーブルへ変更を適用 |
| 有効化 | テーブルプロパティ | DLTパイプライン定義(SQL/Python) |
| キー指定 | 不要(全行の変更を記録) | KEYS句で主キーを指定 |
| シーケンス | _commit_versionで自動順序付け | SEQUENCE BY句で順序カラムを指定 |
| SCD Type 2 | ユーザーがロジックを実装 | STORED AS SCD TYPE 2で自動管理 |
-- DLT APPLY CHANGES INTO の例
APPLY CHANGES INTO LIVE.silver_orders
FROM STREAM(LIVE.bronze_orders_cdc)
KEYS (order_id)
SEQUENCE BY updated_at
COLUMNS * EXCEPT (_rescued_data)
STORED AS SCD TYPE 1;CDFデータはDelta Tableの _change_data ディレクトリに保存されます。 VACUUMを実行すると、保持期間を超えたCDFデータも削除されます。
-- VACUUM実行(7日間保持)
VACUUM silver.orders RETAIN 168 HOURS;
-- 下流パイプラインの処理遅延を考慮して保持期間を長めに設定
ALTER TABLE silver.orders
SET TBLPROPERTIES (
'delta.deletedFileRetentionDuration' = 'interval 30 days'
);Data Engineer Professional
問題 1
Silver層のordersテーブルでCDFが有効になっている。あるレコードのstatusが 'pending' から 'shipped' に更新された。CDFから読み取ったデータにはどのような行が含まれるか。
正解: A
CDFではUPDATE操作に対してupdate_preimage(変更前の状態)とupdate_postimage(変更後の状態)の2行が記録されます。これにより「何から何に変わったか」を完全にトレースできます。単純な'update'タイプは存在しません。deleteとinsertの組み合わせではなくpreimage/postimageペアです。
CDFとStreamの違いは?
CDF(Change Data Feed)はDelta Tableへの変更記録を自動的に保持する機能で、テーブルに対するINSERT/UPDATE/DELETEの操作結果を、_change_type、_commit_version、_commit_timestampのメタカラム付きで取得できます。Structured StreamingはSparkのストリーミングフレームワークで、CDFデータの「消費手段」の一つです。CDFが「変更を記録する仕組み」、Streamが「変更を読み取る手段」と理解してください。
CDFを有効にするとストレージコストは増えますか?
はい、わずかに増えます。CDFが有効なテーブルでUPDATEやDELETEが実行されると、変更前後の行データが_change_dataディレクトリに別途保存されます。INSERTのみのテーブルではほぼ追加コストはありません。VACUUMで古いCDFデータを削除できますが、下流パイプラインが必要とする保持期間は維持する必要があります。
試験でCDFはどう出題されますか?
Data Engineer Professionalで特に頻出です。CDFの有効化方法(テーブルプロパティ)、readChangeFeedオプションでの読み取り、_change_typeの値(insert/update_preimage/update_postimage/delete)の意味、下流テーブルへの増分反映パターンが主な出題範囲です。table_changes()関数のバッチ読み取りも重要です。
NicheeLab編集部
データエンジニアリング・クラウド資格の専門家。Databricks・Snowflake等の認定資格を保有し、実務経験に基づいた問題作成・解説を行っています。NicheeLab運営。
Databricks資格一覧|全7試験・難易度・勉強法
Databricks認定資格全7試験の一覧・難易度・出題範囲・合格ラインを徹底解説。2026年最新版の公式試験ガイドに準...
Databricks試験の難易度ランキング|全7資格を徹底比較
Databricks認定全7試験の難易度をランキング形式で徹底比較。合格率・学習時間・出題傾向から難易度を分析。...
Databricks資格の勉強方法|最短合格ルートと学習時間の目安
Databricks認定資格に最短で合格するための勉強方法を完全ガイド。公式リソース・問題集・学習スケジュールを徹底解説...
Databricks Data Engineer Associate完全解説|出題範囲・問題例・合格戦略
Databricks Certified Data Engineer Associate試験を徹底解説。5つの出題ドメイ...
Databricks Data Engineer Professional完全解説|上級試験の攻略法
Databricks Certified Data Engineer Professional試験を徹底解説。10の出題...