Databricks

Databricks Structured Streamingのチェックポイント運用: 再開性・Exactly-once・障害復旧

2026-03-26
更新: 2026-03-27
NicheeLab編集部

Structured Streamingのチェックポイントは、ストリーミングクエリの進捗状態を永続化し、障害時の再開とExactly-once処理を実現する基盤です。 設計を誤るとデータの重複・欠損・ステート肥大化によるOOMが発生します。 本稿ではチェックポイントの内部構造、設計ルール、障害復旧手順、Exactly-onceの達成条件を実務に即して解説します。

チェックポイントの内部構造

チェックポイントディレクトリには4つのサブディレクトリが生成されます。 それぞれがストリーミングクエリの異なる状態を管理しています。

ディレクトリ内容役割
offsets/バッチごとの読み取りオフセット「どこまで読んだか」を記録。バッチID→オフセットのマッピング
commits/コミット済みバッチID「どのバッチまで書き込み完了したか」を記録
sources/ソース固有のメタデータファイルソースなら処理済みファイルリスト、Kafkaなら不要
state/ステートフル操作の中間状態ウォーターマーク、集約、ジョイン等のステートを永続化
/mnt/checkpoints/orders-pipeline/
├── offsets/
│   ├── 0    # バッチ0: {"kafka-topic": {"0": 100, "1": 150}}
│   ├── 1    # バッチ1: {"kafka-topic": {"0": 200, "1": 300}}
│   └── 2
├── commits/
│   ├── 0    # バッチ0のシンク書き込み完了
│   └── 1    # バッチ1のシンク書き込み完了
├── sources/
│   └── 0/
└── state/
    └── 0/
        └── delta/  # ステートストアのスナップショット

再起動時のリカバリロジックは次の通りです。offsetsに記録があるがcommitsに対応するバッチIDがない場合、そのバッチは「読み取り済み・未コミット」と判断され、再処理されます。 commitsに記録がある場合はスキップされるため、二重書き込みが防止されます。

設計ルール: 1クエリ1チェックポイント

チェックポイントの最重要ルールは「1つのストリーミングクエリに1つの固有のチェックポイントパス」を割り当てることです。

  • 複数クエリで同一パスを共有すると、offsetsファイルが競合してデータ欠損・重複が発生する
  • 同一クエリでもロジック変更時に古いチェックポイントを再利用すると、ステートの不整合でエラーになる場合がある
  • パス命名は /mnt/checkpoints/{pipeline-name}/{query-name}/ のような階層構造を推奨
# クエリごとに固有のcheckpointLocationを指定
orders_query = (
    orders_df.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/mnt/checkpoints/orders-pipeline/orders-to-silver")
    .toTable("silver.orders")
)

payments_query = (
    payments_df.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/mnt/checkpoints/orders-pipeline/payments-to-silver")
    .toTable("silver.payments")
)

チェックポイントの保存先選定

保存先メリット注意点
DBFSDatabricks環境で簡易に使えるワークスペース削除時に消失。本番非推奨
クラウドストレージ(S3/ADLS/GCS)永続性・耐久性が高い。推奨ネットワークレイテンシ(通常無視可能)
Unity Catalog Volumesガバナンス統合、アクセス制御UCが有効な環境限定

本番環境ではクラウドストレージまたはUC Volumesを使用し、DBFS上のチェックポイントは開発・検証用に限定します。

Exactly-once処理の達成条件

Structured StreamingのExactly-onceセマンティクスは、ソースのリプレイ可能性とシンクの冪等性の両方が揃って初めて成立します。

要素条件Exactly-once保証
ソースリプレイ可能(Kafka、ファイル、Delta)チェックポイントでオフセットを管理
シンク冪等書き込み(Delta、foreachBatch+トランザクション)再処理時の重複を防止
チェックポイント永続ストレージに保存、1クエリ1パス進捗の正確な追跡

Deltaテーブルをシンクにする場合、DeltaのトランザクションログとStructured Streamingのチェックポイントが協調してExactly-onceを実現します。 foreachBatchで外部システムに書き込む場合は、バッチIDを使った冪等キーの管理が必要です。

def process_batch(batch_df, batch_id):
    # バッチIDを使った冪等処理の例
    batch_df.write \
        .format("delta") \
        .mode("append") \
        .saveAsTable("silver.events")

(source_df.writeStream
    .foreachBatch(process_batch)
    .option("checkpointLocation", "/mnt/checkpoints/events-pipeline/events-to-silver")
    .start()
)

障害復旧手順

ストリーミングジョブが障害で停止した場合の復旧パターンを整理します。

パターン1: 単純な再起動(最も一般的)

同じcheckpointLocationを指定してジョブを再起動します。チェックポイントから最終コミット済みバッチの次から再開されます。 コードのロジックに変更がない場合はこれで十分です。

パターン2: ロジック変更を伴う復旧

  • 出力カラムの追加・削除: mergeSchemaで対応可能な場合はチェックポイント継続可
  • ステートフル操作の変更(ウォーターマーク期間変更、集約キー変更): チェックポイントをリセットし、シンクテーブルも再構築
  • ソースの変更(トピック追加等): チェックポイントをリセットして新規開始

パターン3: チェックポイント破損時の復旧

# 1. 破損したチェックポイントを退避
dbutils.fs.mv(
    "/mnt/checkpoints/orders-pipeline/orders-to-silver",
    "/mnt/checkpoints/_archived/orders-to-silver-corrupted-20260327"
)

# 2. シンクテーブルの最終タイムスタンプを確認
last_ts = spark.sql("""
    SELECT MAX(event_timestamp) FROM silver.orders
""").collect()[0][0]

# 3. 新チェックポイントで再開(重複防止にMERGEを使用)
(source_df.writeStream
    .foreachBatch(lambda df, id: merge_deduplicate(df, id))
    .option("checkpointLocation", "/mnt/checkpoints/orders-pipeline/orders-to-silver")
    .start()
)

ステート管理とウォーターマーク

ウィンドウ集約やstream-streamジョインでは、ステートがチェックポイントのstate/ディレクトリに蓄積されます。 ウォーターマークを設定しないとステートが無限に肥大化し、OOMを引き起こします。

from pyspark.sql.functions import window, col

(orders_df
    .withWatermark("event_time", "10 minutes")
    .groupBy(window("event_time", "5 minutes"), "product_id")
    .agg({"amount": "sum"})
    .writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/mnt/checkpoints/orders-pipeline/windowed-agg")
    .toTable("gold.orders_5min_agg")
)
  • ウォーターマークは「この時間より古いデータはもう来ない」という宣言
  • ウォーターマーク期限を過ぎたステートは自動的にパージされ、メモリとチェックポイントサイズを抑制する
  • 期限が短すぎると遅延データが破棄され、長すぎるとステートが肥大化するトレードオフ

運用チェックリスト

  • checkpointLocationはクラウドストレージまたはUC Volumesに配置する(DBFSは開発のみ)
  • 1クエリ1チェックポイントパスの命名規則を統一する
  • ウォーターマークをすべてのステートフル操作に設定する
  • チェックポイントディレクトリのサイズを定期監視し、異常な肥大化を検出する
  • 障害復旧手順をRunbookに文書化し、チェックポイントリセット時のシンクテーブル処理も明記する
  • ジョブの再試行ポリシーで自動復旧を設定する(DatabricksジョブのRetries設定)

問題で確認

Data Engineer Associate / Professional

問題 1

データエンジニアがKafkaからStructured StreamingでDeltaテーブルに書き込むパイプラインを運用している。障害で停止後に同じcheckpointLocationで再起動したところ、重複なくデータが処理された。この動作を正しく説明しているのはどれか。

  1. チェックポイントのoffsets/とcommits/の差分から未コミットバッチを特定し、そのバッチのみ再処理する。Deltaシンクのトランザクションログにより冪等な書き込みが保証される
  2. Kafka側のコンシューマーグループのオフセットが進んでいるため、停止時点の次のメッセージから読み込みを再開する
  3. Deltaテーブルに書き込み済みのデータをSELECTで照合し、重複行をフィルタしてからINSERTする
  4. チェックポイントが存在する場合、ソースの全データを最初から読み直し、Deltaのmerge機能で重複を排除する

正解: A

Structured Streamingのチェックポイントはoffsets(読み取り済み)とcommits(書き込み完了済み)の差分から再処理範囲を特定します。Deltaシンクはトランザクショナルな書き込みにより冪等性を保証します。KafkaのコンシューマーグループオフセットはStructured Streamingでは使用せず、チェックポイントが代替します。SELECTによる照合やmergeによる重複排除は行われません。

よくある質問

チェックポイントを削除するとどうなりますか?

チェックポイントを削除すると、ストリーミングクエリはすべての進捗情報(処理済みオフセット、ステート)を失います。次回起動時は初期状態から再開され、ソースの全データを再処理します。Deltaシンクにappendモードで書いている場合はデータ重複が発生します。チェックポイントの削除は、ロジック変更に伴うストリーム再構築時にのみ計画的に行い、シンク側のテーブルも併せてリセットするのが原則です。

1つのチェックポイントを複数のストリーミングクエリで共有できますか?

できません。チェックポイントは1クエリ1パスが鉄則です。複数クエリが同じチェックポイントパスを参照すると、オフセット管理が競合し、データの欠損や重複、予期しないエラーが発生します。クエリごとに固有のcheckpointLocationを指定してください。

Exactly-onceが保証されないケースはどのような場合ですか?

Exactly-onceが崩れる主なケースは3つあります。(1) シンクが冪等書き込みに対応していない場合(外部API呼び出し、非トランザクショナルなDB等)。(2) foreachBatchで複数の書き込みを1トランザクションにまとめていない場合。(3) チェックポイントをリセットした後にシンクのデータをクリアせず再処理した場合。Deltaテーブルをシンクにするとトランザクショナルな書き込みが保証されるため、Exactly-onceを最も安全に実現できます。

この記事で学んだ内容を問題で確認しましょう

16,000問以上の問題で実力チェック

無料で問題を解いてみる
この記事の著者

NicheeLab編集部

データエンジニアリング・クラウド資格の専門家。Databricks・Snowflake等の認定資格を保有し、実務経験に基づいた問題作成・解説を行っています。NicheeLab運営。


関連記事
Databricks

Databricks資格一覧|全7試験・難易度・勉強法

Databricks認定資格全7試験の一覧・難易度・出題範囲・合格ラインを徹底解説。2026年最新版の公式試験ガイドに準...

Databricks

Databricks試験の難易度ランキング|全7資格を徹底比較

Databricks認定全7試験の難易度をランキング形式で徹底比較。合格率・学習時間・出題傾向から難易度を分析。...

Databricks

Databricks資格の勉強方法|最短合格ルートと学習時間の目安

Databricks認定資格に最短で合格するための勉強方法を完全ガイド。公式リソース・問題集・学習スケジュールを徹底解説...

Databricks

Databricks Data Engineer Associate完全解説|出題範囲・問題例・合格戦略

Databricks Certified Data Engineer Associate試験を徹底解説。5つの出題ドメイ...

Databricks

Databricks Data Engineer Professional完全解説|上級試験の攻略法

Databricks Certified Data Engineer Professional試験を徹底解説。10の出題...

Databricksの記事一覧 (105件)
© 2026 NicheeLab All rights reserved.