Lakeflow Declarative Pipelines(旧Delta Live Tables / DLT)のExpectationsは、パイプライン内のデータ品質を宣言的に検査・制御する仕組みです。 「このカラムはNULLであってはならない」「金額は0以上」といったルールをテーブル定義に埋め込み、違反レコードの記録・除外・パイプライン停止を選択できます。
本記事ではExpectationsの3つの動作モード、SQL・PySparkそれぞれの構文、quarantineパターン、イベントログでの品質モニタリング、 そしてDelta Table Constraints(NOT NULL / CHECK)との使い分けを整理します。
Expectationsには3つのモードがあり、違反レコードの扱いが異なります。 パイプラインの性質(バッチ/ストリーミング)やビジネス要件(データ欠損の許容度)に応じて使い分けます。
| モード | 違反時の動作 | 出力テーブルへの影響 | パイプライン継続 | 主な用途 |
|---|---|---|---|---|
| EXPECT(記録) | 違反レコードを記録するが出力に含める | 全レコードが書き込まれる | 継続 | 品質モニタリング、初期段階の可視化 |
| EXPECT OR DROP(ドロップ) | 違反レコードを出力から除外する | 合格レコードのみ書き込まれる | 継続 | 不良データのフィルタリング、Silverテーブル生成 |
| EXPECT OR FAIL(失敗) | 1件でも違反があればパイプラインを停止 | トランザクションがロールバックされる | 停止 | Goldテーブル、課金データなど欠損不可のケース |
EXPECTモードは「違反があっても止めたくないが数を把握したい」場合に最適です。Bronzeレイヤーでの初期取り込み時に使い、 イベントログで違反傾向を監視するユースケースが典型です。
SQLでExpectationsを定義するには、CREATE OR REFRESH文のCONSTRAINT句を使います。 テーブル定義の一部として宣言的に品質ルールを記述できます。
-- EXPECT: 違反を記録するが全レコードを出力に含める
CREATE OR REFRESH MATERIALIZED VIEW silver_orders
(
CONSTRAINT valid_amount EXPECT (amount > 0),
CONSTRAINT not_null_id EXPECT (order_id IS NOT NULL)
)
AS SELECT * FROM LIVE.bronze_orders;
-- EXPECT OR DROP: 違反レコードを出力から除外
CREATE OR REFRESH MATERIALIZED VIEW silver_orders_clean
(
CONSTRAINT valid_amount EXPECT (amount > 0) ON VIOLATION DROP ROW,
CONSTRAINT not_null_id EXPECT (order_id IS NOT NULL) ON VIOLATION DROP ROW
)
AS SELECT * FROM LIVE.bronze_orders;
-- EXPECT OR FAIL: 1件でも違反があればパイプラインを停止
CREATE OR REFRESH MATERIALIZED VIEW gold_revenue
(
CONSTRAINT valid_amount EXPECT (amount > 0) ON VIOLATION FAIL UPDATE
)
AS SELECT region, SUM(amount) AS total FROM LIVE.silver_orders_clean
GROUP BY region;CONSTRAINT名はパイプライン内で一意にします。イベントログでこの名前をキーに違反数を追跡できるため、 業務的に意味のある命名(valid_amount, not_null_customer_idなど)が推奨されます。
PySparkではデコレータを使ってExpectationsを定義します。 複数の条件を辞書形式でまとめて指定できる@dlt.expect_all系が実務では多用されます。
import dlt
from pyspark.sql.functions import col
# 単一条件の EXPECT(記録のみ)
@dlt.table
@dlt.expect("valid_amount", "amount > 0")
@dlt.expect("not_null_id", "order_id IS NOT NULL")
def silver_orders():
return dlt.read_stream("bronze_orders")
# 複数条件をまとめて DROP
quality_rules = {
"valid_amount": "amount > 0",
"not_null_id": "order_id IS NOT NULL",
"valid_region": "region IN ('APAC','EMEA','NA')"
}
@dlt.table
@dlt.expect_all_or_drop(quality_rules)
def silver_orders_clean():
return dlt.read_stream("bronze_orders")
# FAIL: 1件でも違反でパイプライン停止
@dlt.table
@dlt.expect_all_or_fail({"positive_revenue": "total > 0"})
def gold_revenue():
return (
dlt.read("silver_orders_clean")
.groupBy("region")
.agg({"amount": "sum"})
.withColumnRenamed("sum(amount)", "total")
)@dlt.expect_allは辞書を受け取り、すべてのルールをEXPECT(記録)モードで適用します。 @dlt.expect_all_or_dropと@dlt.expect_all_or_failはそれぞれDROPモード・FAILモードに対応します。 辞書を外部変数に切り出すと、ルール管理やテスト時の差し替えが容易になります。
EXPECT OR DROPではレコードが消えてしまい、EXPECTでは不良データが下流に流れます。 「違反データは残すが下流に流さない」場合には、quarantine(隔離)テーブルパターンを使います。
import dlt
from pyspark.sql.functions import expr
@dlt.table(comment="品質条件を満たすレコードのみ")
def silver_orders_valid():
return (
dlt.read_stream("bronze_orders")
.filter("amount > 0 AND order_id IS NOT NULL")
)
@dlt.table(comment="品質違反レコードを隔離")
def silver_orders_quarantine():
return (
dlt.read_stream("bronze_orders")
.filter("amount <= 0 OR order_id IS NULL")
.withColumn("quarantine_reason",
expr("""CASE
WHEN amount <= 0 THEN 'invalid_amount'
WHEN order_id IS NULL THEN 'null_order_id'
ELSE 'unknown'
END"""))
)quarantineテーブルに違反理由カラムを付与しておくと、データ品質レポートの作成や 運用チームによる原因調査・修正後の再投入ワークフローに活用できます。
Expectationsの違反数はパイプラインのイベントログに自動記録されます。 UIのパイプライン詳細画面で確認できるほか、SQLでevent_logテーブルを直接クエリできます。
SELECT
details:flow_progress.data_quality.expectations.name AS expectation_name,
details:flow_progress.data_quality.expectations.dataset AS dataset,
details:flow_progress.data_quality.expectations.passed_records AS passed,
details:flow_progress.data_quality.expectations.failed_records AS failed,
timestamp
FROM event_log(TABLE(my_catalog.my_schema.my_pipeline))
WHERE event_type = 'flow_progress'
AND details:flow_progress.data_quality IS NOT NULL
ORDER BY timestamp DESC;このクエリで各Expectationの合格数・違反数を時系列で取得できます。 違反率が急増した場合にアラートを設定する運用と組み合わせると、上流データの品質劣化を早期に検知できます。
DLT ExpectationsとDelta Table Constraintsはどちらもデータ品質に関わりますが、適用レイヤーと挙動が異なります。
| 観点 | DLT Expectations | Delta Constraints(NOT NULL / CHECK) |
|---|---|---|
| 適用レイヤー | DLTパイプライン内のデータフロー | Deltaテーブルへの書き込み時 |
| 定義方法 | CONSTRAINT句 / @dlt.expectデコレータ | ALTER TABLE ADD CONSTRAINT |
| 違反時の選択肢 | 記録 / ドロップ / 失敗の3段階 | 書き込みエラー(拒否)のみ |
| メトリクス記録 | イベントログに自動記録 | なし(エラーログのみ) |
| DLT外での利用 | 不可(DLTパイプライン専用) | 可能(通常のSpark書き込みにも適用) |
| 用途の典型 | パイプライン中間段階の品質監視・フィルタ | 最終テーブルのスキーマガード |
実務では、DLT Expectationsでパイプライン途中の品質を段階的にチェックし、最終的なGoldテーブルにDelta Constraintsを設定して 「品質を通過したデータしか書き込めない」構造にするのが堅牢です。
この段階的な品質ゲートにより、上流の品質問題が下流に波及するリスクを最小化しつつ、 Bronze段階でのデータロスを防ぎます。
Data Engineer Associate / Professional
問題 1
DLTパイプラインのSilverテーブルで、amount列が0以下のレコードを出力に含めず、かつ違反レコード数をイベントログで追跡したい。パイプラインは違反があっても停止させたくない。最も適切な方法はどれか。
正解: B
EXPECT ... ON VIOLATION DROP ROWは、違反レコードを出力から除外しつつパイプラインは継続し、違反数はイベントログに記録されます。EXPECTのみ(A)は全レコードを出力に含めてしまいます。FAIL UPDATE(C)はパイプラインを停止します。CHECK制約(D)はDLTパイプラインの機能ではなくDeltaテーブルの制約であり、イベントログへの品質メトリクス記録もされません。
EXPECT OR DROPで除外されたレコードはどこに行きますか?復元できますか?
EXPECT OR DROPで除外されたレコードは出力テーブルに書き込まれず、デフォルトでは自動保存されません。ただしイベントログにはドロップされた行数が記録されます。復元可能にするにはquarantineパターンを自前で実装し、違反レコードを別テーブルに書き出す設計が必要です。
Expectationsの違反メトリクスはどこで確認できますか?
DLTパイプラインのイベントログ(event_log)に記録されます。flow_progress.data_quality.expectationsフィールドから、各Expectation名・合格数・違反数を取得できます。UIのパイプライン詳細画面でもグラフで確認可能です。
ExpectationsとDelta Constraintsは併用できますか?
はい、併用可能です。Expectationsはパイプライン内のデータフロー段階で品質を検査し、Delta Constraints(NOT NULL / CHECK)はテーブル書き込み時に制約を強制します。Expectationsでソフトに監視しつつ、最終テーブルにDelta Constraintsでハードなガードを置く二段構えが堅牢な設計です。
NicheeLab編集部
データエンジニアリング・クラウド資格の専門家。Databricks・Snowflake等の認定資格を保有し、実務経験に基づいた問題作成・解説を行っています。NicheeLab運営。
Databricks資格一覧|全7試験・難易度・勉強法
Databricks認定資格全7試験の一覧・難易度・出題範囲・合格ラインを徹底解説。2026年最新版の公式試験ガイドに準...
Databricks試験の難易度ランキング|全7資格を徹底比較
Databricks認定全7試験の難易度をランキング形式で徹底比較。合格率・学習時間・出題傾向から難易度を分析。...
Databricks資格の勉強方法|最短合格ルートと学習時間の目安
Databricks認定資格に最短で合格するための勉強方法を完全ガイド。公式リソース・問題集・学習スケジュールを徹底解説...
Databricks Data Engineer Associate完全解説|出題範囲・問題例・合格戦略
Databricks Certified Data Engineer Associate試験を徹底解説。5つの出題ドメイ...
Databricks Data Engineer Professional完全解説|上級試験の攻略法
Databricks Certified Data Engineer Professional試験を徹底解説。10の出題...