Structured Streamingのチェックポイントは、ストリーミングクエリの進捗状態を永続化し、障害時の再開とExactly-once処理を実現する基盤です。 設計を誤るとデータの重複・欠損・ステート肥大化によるOOMが発生します。 本稿ではチェックポイントの内部構造、設計ルール、障害復旧手順、Exactly-onceの達成条件を実務に即して解説します。
チェックポイントディレクトリには4つのサブディレクトリが生成されます。 それぞれがストリーミングクエリの異なる状態を管理しています。
| ディレクトリ | 内容 | 役割 |
|---|---|---|
| offsets/ | バッチごとの読み取りオフセット | 「どこまで読んだか」を記録。バッチID→オフセットのマッピング |
| commits/ | コミット済みバッチID | 「どのバッチまで書き込み完了したか」を記録 |
| sources/ | ソース固有のメタデータ | ファイルソースなら処理済みファイルリスト、Kafkaなら不要 |
| state/ | ステートフル操作の中間状態 | ウォーターマーク、集約、ジョイン等のステートを永続化 |
/mnt/checkpoints/orders-pipeline/
├── offsets/
│ ├── 0 # バッチ0: {"kafka-topic": {"0": 100, "1": 150}}
│ ├── 1 # バッチ1: {"kafka-topic": {"0": 200, "1": 300}}
│ └── 2
├── commits/
│ ├── 0 # バッチ0のシンク書き込み完了
│ └── 1 # バッチ1のシンク書き込み完了
├── sources/
│ └── 0/
└── state/
└── 0/
└── delta/ # ステートストアのスナップショット再起動時のリカバリロジックは次の通りです。offsetsに記録があるがcommitsに対応するバッチIDがない場合、そのバッチは「読み取り済み・未コミット」と判断され、再処理されます。 commitsに記録がある場合はスキップされるため、二重書き込みが防止されます。
チェックポイントの最重要ルールは「1つのストリーミングクエリに1つの固有のチェックポイントパス」を割り当てることです。
/mnt/checkpoints/{pipeline-name}/{query-name}/ のような階層構造を推奨# クエリごとに固有のcheckpointLocationを指定
orders_query = (
orders_df.writeStream
.format("delta")
.outputMode("append")
.option("checkpointLocation", "/mnt/checkpoints/orders-pipeline/orders-to-silver")
.toTable("silver.orders")
)
payments_query = (
payments_df.writeStream
.format("delta")
.outputMode("append")
.option("checkpointLocation", "/mnt/checkpoints/orders-pipeline/payments-to-silver")
.toTable("silver.payments")
)| 保存先 | メリット | 注意点 |
|---|---|---|
| DBFS | Databricks環境で簡易に使える | ワークスペース削除時に消失。本番非推奨 |
| クラウドストレージ(S3/ADLS/GCS) | 永続性・耐久性が高い。推奨 | ネットワークレイテンシ(通常無視可能) |
| Unity Catalog Volumes | ガバナンス統合、アクセス制御 | UCが有効な環境限定 |
本番環境ではクラウドストレージまたはUC Volumesを使用し、DBFS上のチェックポイントは開発・検証用に限定します。
Structured StreamingのExactly-onceセマンティクスは、ソースのリプレイ可能性とシンクの冪等性の両方が揃って初めて成立します。
| 要素 | 条件 | Exactly-once保証 |
|---|---|---|
| ソース | リプレイ可能(Kafka、ファイル、Delta) | チェックポイントでオフセットを管理 |
| シンク | 冪等書き込み(Delta、foreachBatch+トランザクション) | 再処理時の重複を防止 |
| チェックポイント | 永続ストレージに保存、1クエリ1パス | 進捗の正確な追跡 |
Deltaテーブルをシンクにする場合、DeltaのトランザクションログとStructured Streamingのチェックポイントが協調してExactly-onceを実現します。 foreachBatchで外部システムに書き込む場合は、バッチIDを使った冪等キーの管理が必要です。
def process_batch(batch_df, batch_id):
# バッチIDを使った冪等処理の例
batch_df.write \
.format("delta") \
.mode("append") \
.saveAsTable("silver.events")
(source_df.writeStream
.foreachBatch(process_batch)
.option("checkpointLocation", "/mnt/checkpoints/events-pipeline/events-to-silver")
.start()
)ストリーミングジョブが障害で停止した場合の復旧パターンを整理します。
同じcheckpointLocationを指定してジョブを再起動します。チェックポイントから最終コミット済みバッチの次から再開されます。 コードのロジックに変更がない場合はこれで十分です。
# 1. 破損したチェックポイントを退避
dbutils.fs.mv(
"/mnt/checkpoints/orders-pipeline/orders-to-silver",
"/mnt/checkpoints/_archived/orders-to-silver-corrupted-20260327"
)
# 2. シンクテーブルの最終タイムスタンプを確認
last_ts = spark.sql("""
SELECT MAX(event_timestamp) FROM silver.orders
""").collect()[0][0]
# 3. 新チェックポイントで再開(重複防止にMERGEを使用)
(source_df.writeStream
.foreachBatch(lambda df, id: merge_deduplicate(df, id))
.option("checkpointLocation", "/mnt/checkpoints/orders-pipeline/orders-to-silver")
.start()
)ウィンドウ集約やstream-streamジョインでは、ステートがチェックポイントのstate/ディレクトリに蓄積されます。 ウォーターマークを設定しないとステートが無限に肥大化し、OOMを引き起こします。
from pyspark.sql.functions import window, col
(orders_df
.withWatermark("event_time", "10 minutes")
.groupBy(window("event_time", "5 minutes"), "product_id")
.agg({"amount": "sum"})
.writeStream
.format("delta")
.outputMode("append")
.option("checkpointLocation", "/mnt/checkpoints/orders-pipeline/windowed-agg")
.toTable("gold.orders_5min_agg")
)Data Engineer Associate / Professional
問題 1
データエンジニアがKafkaからStructured StreamingでDeltaテーブルに書き込むパイプラインを運用している。障害で停止後に同じcheckpointLocationで再起動したところ、重複なくデータが処理された。この動作を正しく説明しているのはどれか。
正解: A
Structured Streamingのチェックポイントはoffsets(読み取り済み)とcommits(書き込み完了済み)の差分から再処理範囲を特定します。Deltaシンクはトランザクショナルな書き込みにより冪等性を保証します。KafkaのコンシューマーグループオフセットはStructured Streamingでは使用せず、チェックポイントが代替します。SELECTによる照合やmergeによる重複排除は行われません。
チェックポイントを削除するとどうなりますか?
チェックポイントを削除すると、ストリーミングクエリはすべての進捗情報(処理済みオフセット、ステート)を失います。次回起動時は初期状態から再開され、ソースの全データを再処理します。Deltaシンクにappendモードで書いている場合はデータ重複が発生します。チェックポイントの削除は、ロジック変更に伴うストリーム再構築時にのみ計画的に行い、シンク側のテーブルも併せてリセットするのが原則です。
1つのチェックポイントを複数のストリーミングクエリで共有できますか?
できません。チェックポイントは1クエリ1パスが鉄則です。複数クエリが同じチェックポイントパスを参照すると、オフセット管理が競合し、データの欠損や重複、予期しないエラーが発生します。クエリごとに固有のcheckpointLocationを指定してください。
Exactly-onceが保証されないケースはどのような場合ですか?
Exactly-onceが崩れる主なケースは3つあります。(1) シンクが冪等書き込みに対応していない場合(外部API呼び出し、非トランザクショナルなDB等)。(2) foreachBatchで複数の書き込みを1トランザクションにまとめていない場合。(3) チェックポイントをリセットした後にシンクのデータをクリアせず再処理した場合。Deltaテーブルをシンクにするとトランザクショナルな書き込みが保証されるため、Exactly-onceを最も安全に実現できます。
NicheeLab編集部
データエンジニアリング・クラウド資格の専門家。Databricks・Snowflake等の認定資格を保有し、実務経験に基づいた問題作成・解説を行っています。NicheeLab運営。
Databricks資格一覧|全7試験・難易度・勉強法
Databricks認定資格全7試験の一覧・難易度・出題範囲・合格ラインを徹底解説。2026年最新版の公式試験ガイドに準...
Databricks試験の難易度ランキング|全7資格を徹底比較
Databricks認定全7試験の難易度をランキング形式で徹底比較。合格率・学習時間・出題傾向から難易度を分析。...
Databricks資格の勉強方法|最短合格ルートと学習時間の目安
Databricks認定資格に最短で合格するための勉強方法を完全ガイド。公式リソース・問題集・学習スケジュールを徹底解説...
Databricks Data Engineer Associate完全解説|出題範囲・問題例・合格戦略
Databricks Certified Data Engineer Associate試験を徹底解説。5つの出題ドメイ...
Databricks Data Engineer Professional完全解説|上級試験の攻略法
Databricks Certified Data Engineer Professional試験を徹底解説。10の出題...