Databricks

Lakeflow Declarative PipelinesのExpectations設計ガイド: 品質監視と無効データ処理

2026-03-21
更新: 2026-03-27
NicheeLab編集部

Lakeflow Declarative Pipelines(旧Delta Live Tables / DLT)のExpectationsは、パイプライン内のデータ品質を宣言的に検査・制御する仕組みです。 「このカラムはNULLであってはならない」「金額は0以上」といったルールをテーブル定義に埋め込み、違反レコードの記録・除外・パイプライン停止を選択できます。

本記事ではExpectationsの3つの動作モード、SQL・PySparkそれぞれの構文、quarantineパターン、イベントログでの品質モニタリング、 そしてDelta Table Constraints(NOT NULL / CHECK)との使い分けを整理します。

Expectationsの3つの動作モード

Expectationsには3つのモードがあり、違反レコードの扱いが異なります。 パイプラインの性質(バッチ/ストリーミング)やビジネス要件(データ欠損の許容度)に応じて使い分けます。

モード違反時の動作出力テーブルへの影響パイプライン継続主な用途
EXPECT(記録)違反レコードを記録するが出力に含める全レコードが書き込まれる継続品質モニタリング、初期段階の可視化
EXPECT OR DROP(ドロップ)違反レコードを出力から除外する合格レコードのみ書き込まれる継続不良データのフィルタリング、Silverテーブル生成
EXPECT OR FAIL(失敗)1件でも違反があればパイプラインを停止トランザクションがロールバックされる停止Goldテーブル、課金データなど欠損不可のケース

EXPECTモードは「違反があっても止めたくないが数を把握したい」場合に最適です。Bronzeレイヤーでの初期取り込み時に使い、 イベントログで違反傾向を監視するユースケースが典型です。

SQL構文:CONSTRAINT句によるExpectations定義

SQLでExpectationsを定義するには、CREATE OR REFRESH文のCONSTRAINT句を使います。 テーブル定義の一部として宣言的に品質ルールを記述できます。

-- EXPECT: 違反を記録するが全レコードを出力に含める
CREATE OR REFRESH MATERIALIZED VIEW silver_orders
(
  CONSTRAINT valid_amount EXPECT (amount > 0),
  CONSTRAINT not_null_id EXPECT (order_id IS NOT NULL)
)
AS SELECT * FROM LIVE.bronze_orders;

-- EXPECT OR DROP: 違反レコードを出力から除外
CREATE OR REFRESH MATERIALIZED VIEW silver_orders_clean
(
  CONSTRAINT valid_amount EXPECT (amount > 0) ON VIOLATION DROP ROW,
  CONSTRAINT not_null_id EXPECT (order_id IS NOT NULL) ON VIOLATION DROP ROW
)
AS SELECT * FROM LIVE.bronze_orders;

-- EXPECT OR FAIL: 1件でも違反があればパイプラインを停止
CREATE OR REFRESH MATERIALIZED VIEW gold_revenue
(
  CONSTRAINT valid_amount EXPECT (amount > 0) ON VIOLATION FAIL UPDATE
)
AS SELECT region, SUM(amount) AS total FROM LIVE.silver_orders_clean
GROUP BY region;

CONSTRAINT名はパイプライン内で一意にします。イベントログでこの名前をキーに違反数を追跡できるため、 業務的に意味のある命名(valid_amount, not_null_customer_idなど)が推奨されます。

PySpark構文:@dlt.expectデコレータ

PySparkではデコレータを使ってExpectationsを定義します。 複数の条件を辞書形式でまとめて指定できる@dlt.expect_all系が実務では多用されます。

import dlt
from pyspark.sql.functions import col

# 単一条件の EXPECT(記録のみ)
@dlt.table
@dlt.expect("valid_amount", "amount > 0")
@dlt.expect("not_null_id", "order_id IS NOT NULL")
def silver_orders():
    return dlt.read_stream("bronze_orders")

# 複数条件をまとめて DROP
quality_rules = {
    "valid_amount": "amount > 0",
    "not_null_id": "order_id IS NOT NULL",
    "valid_region": "region IN ('APAC','EMEA','NA')"
}

@dlt.table
@dlt.expect_all_or_drop(quality_rules)
def silver_orders_clean():
    return dlt.read_stream("bronze_orders")

# FAIL: 1件でも違反でパイプライン停止
@dlt.table
@dlt.expect_all_or_fail({"positive_revenue": "total > 0"})
def gold_revenue():
    return (
        dlt.read("silver_orders_clean")
        .groupBy("region")
        .agg({"amount": "sum"})
        .withColumnRenamed("sum(amount)", "total")
    )

@dlt.expect_allは辞書を受け取り、すべてのルールをEXPECT(記録)モードで適用します。 @dlt.expect_all_or_dropと@dlt.expect_all_or_failはそれぞれDROPモード・FAILモードに対応します。 辞書を外部変数に切り出すと、ルール管理やテスト時の差し替えが容易になります。

Quarantineパターン:違反データを隔離して後処理する

EXPECT OR DROPではレコードが消えてしまい、EXPECTでは不良データが下流に流れます。 「違反データは残すが下流に流さない」場合には、quarantine(隔離)テーブルパターンを使います。

import dlt
from pyspark.sql.functions import expr

@dlt.table(comment="品質条件を満たすレコードのみ")
def silver_orders_valid():
    return (
        dlt.read_stream("bronze_orders")
        .filter("amount > 0 AND order_id IS NOT NULL")
    )

@dlt.table(comment="品質違反レコードを隔離")
def silver_orders_quarantine():
    return (
        dlt.read_stream("bronze_orders")
        .filter("amount <= 0 OR order_id IS NULL")
        .withColumn("quarantine_reason",
            expr("""CASE
                WHEN amount <= 0 THEN 'invalid_amount'
                WHEN order_id IS NULL THEN 'null_order_id'
                ELSE 'unknown'
            END"""))
    )

quarantineテーブルに違反理由カラムを付与しておくと、データ品質レポートの作成や 運用チームによる原因調査・修正後の再投入ワークフローに活用できます。

イベントログでの品質メトリクス確認

Expectationsの違反数はパイプラインのイベントログに自動記録されます。 UIのパイプライン詳細画面で確認できるほか、SQLでevent_logテーブルを直接クエリできます。

SELECT
  details:flow_progress.data_quality.expectations.name AS expectation_name,
  details:flow_progress.data_quality.expectations.dataset AS dataset,
  details:flow_progress.data_quality.expectations.passed_records AS passed,
  details:flow_progress.data_quality.expectations.failed_records AS failed,
  timestamp
FROM event_log(TABLE(my_catalog.my_schema.my_pipeline))
WHERE event_type = 'flow_progress'
  AND details:flow_progress.data_quality IS NOT NULL
ORDER BY timestamp DESC;

このクエリで各Expectationの合格数・違反数を時系列で取得できます。 違反率が急増した場合にアラートを設定する運用と組み合わせると、上流データの品質劣化を早期に検知できます。

Delta Constraints(NOT NULL / CHECK)との使い分け

DLT ExpectationsとDelta Table Constraintsはどちらもデータ品質に関わりますが、適用レイヤーと挙動が異なります。

観点DLT ExpectationsDelta Constraints(NOT NULL / CHECK)
適用レイヤーDLTパイプライン内のデータフローDeltaテーブルへの書き込み時
定義方法CONSTRAINT句 / @dlt.expectデコレータALTER TABLE ADD CONSTRAINT
違反時の選択肢記録 / ドロップ / 失敗の3段階書き込みエラー(拒否)のみ
メトリクス記録イベントログに自動記録なし(エラーログのみ)
DLT外での利用不可(DLTパイプライン専用)可能(通常のSpark書き込みにも適用)
用途の典型パイプライン中間段階の品質監視・フィルタ最終テーブルのスキーマガード

実務では、DLT Expectationsでパイプライン途中の品質を段階的にチェックし、最終的なGoldテーブルにDelta Constraintsを設定して 「品質を通過したデータしか書き込めない」構造にするのが堅牢です。

設計ガイドライン:Medallionアーキテクチャでの配置

  • Bronze: EXPECT(記録のみ)で生データの品質傾向を可視化。ドロップや失敗は行わない
  • Silver: EXPECT OR DROPで不良レコードを除外。必要ならquarantineテーブルに隔離
  • Gold: EXPECT OR FAILで品質違反をゼロ保証。課金や集計用テーブルに適用

この段階的な品質ゲートにより、上流の品質問題が下流に波及するリスクを最小化しつつ、 Bronze段階でのデータロスを防ぎます。

試験で問われるポイント

  • 3つのモード(EXPECT / EXPECT OR DROP / EXPECT OR FAIL)の違反時の挙動を正確に区別できるか
  • EXPECT OR DROPで除外されたレコードがどうなるか(自動保存されない=quarantineは自前実装)
  • イベントログからExpectationsの合格数・違反数を取得する方法
  • Delta Constraintsとの適用レイヤーの違い(パイプライン内 vs テーブル書き込み時)

問題で確認

Data Engineer Associate / Professional

問題 1

DLTパイプラインのSilverテーブルで、amount列が0以下のレコードを出力に含めず、かつ違反レコード数をイベントログで追跡したい。パイプラインは違反があっても停止させたくない。最も適切な方法はどれか。

  1. EXPECT制約でamount > 0を定義する(ON VIOLATION句なし)
  2. EXPECT制約でamount > 0を定義し、ON VIOLATION DROP ROWを指定する
  3. EXPECT制約でamount > 0を定義し、ON VIOLATION FAIL UPDATEを指定する
  4. ALTER TABLEでCHECK制約 (amount > 0) をSilverテーブルに追加する

正解: B

EXPECT ... ON VIOLATION DROP ROWは、違反レコードを出力から除外しつつパイプラインは継続し、違反数はイベントログに記録されます。EXPECTのみ(A)は全レコードを出力に含めてしまいます。FAIL UPDATE(C)はパイプラインを停止します。CHECK制約(D)はDLTパイプラインの機能ではなくDeltaテーブルの制約であり、イベントログへの品質メトリクス記録もされません。

よくある質問

EXPECT OR DROPで除外されたレコードはどこに行きますか?復元できますか?

EXPECT OR DROPで除外されたレコードは出力テーブルに書き込まれず、デフォルトでは自動保存されません。ただしイベントログにはドロップされた行数が記録されます。復元可能にするにはquarantineパターンを自前で実装し、違反レコードを別テーブルに書き出す設計が必要です。

Expectationsの違反メトリクスはどこで確認できますか?

DLTパイプラインのイベントログ(event_log)に記録されます。flow_progress.data_quality.expectationsフィールドから、各Expectation名・合格数・違反数を取得できます。UIのパイプライン詳細画面でもグラフで確認可能です。

ExpectationsとDelta Constraintsは併用できますか?

はい、併用可能です。Expectationsはパイプライン内のデータフロー段階で品質を検査し、Delta Constraints(NOT NULL / CHECK)はテーブル書き込み時に制約を強制します。Expectationsでソフトに監視しつつ、最終テーブルにDelta Constraintsでハードなガードを置く二段構えが堅牢な設計です。

この記事で学んだ内容を問題で確認しましょう

16,000問以上の問題で実力チェック

無料で問題を解いてみる
この記事の著者

NicheeLab編集部

データエンジニアリング・クラウド資格の専門家。Databricks・Snowflake等の認定資格を保有し、実務経験に基づいた問題作成・解説を行っています。NicheeLab運営。


関連記事
Databricks

Databricks資格一覧|全7試験・難易度・勉強法

Databricks認定資格全7試験の一覧・難易度・出題範囲・合格ラインを徹底解説。2026年最新版の公式試験ガイドに準...

Databricks

Databricks試験の難易度ランキング|全7資格を徹底比較

Databricks認定全7試験の難易度をランキング形式で徹底比較。合格率・学習時間・出題傾向から難易度を分析。...

Databricks

Databricks資格の勉強方法|最短合格ルートと学習時間の目安

Databricks認定資格に最短で合格するための勉強方法を完全ガイド。公式リソース・問題集・学習スケジュールを徹底解説...

Databricks

Databricks Data Engineer Associate完全解説|出題範囲・問題例・合格戦略

Databricks Certified Data Engineer Associate試験を徹底解説。5つの出題ドメイ...

Databricks

Databricks Data Engineer Professional完全解説|上級試験の攻略法

Databricks Certified Data Engineer Professional試験を徹底解説。10の出題...

Databricksの記事一覧 (105件)
© 2026 NicheeLab All rights reserved.