Structured Streamingは、Apache Sparkのストリーム処理フレームワークです。バッチ処理と同じDataFrame APIでリアルタイムデータを処理でき、exactly-onceのデータ処理保証を提供します。Databricksでは、Auto Loader・Delta Live Tables・イベントドリブンパイプラインの基盤として使われており、Data Engineer Associate / Professionalの両試験で頻出です。
この記事では、readStream/writeStreamの基本構文、3つの出力モード、トリガーモードの選び方、ウォーターマークによる遅延データ制御、チェックポイントの設計、foreachBatchパターンまで、コード例付きで網羅的に解説します。
Structured Streamingでは、ストリームデータを「無限に行が追加されるテーブル(Unbounded Table)」として扱います。Kafkaトピックやクラウドストレージに新しいデータが到着するたびに、このテーブルに行が追加され、クエリの結果が増分更新されます。
開発者はバッチ処理とほぼ同じDataFrame API(select、filter、groupBy、joinなど)でストリームクエリを記述でき、Sparkエンジンが内部的にマイクロバッチ処理として実行します。
# バッチ読み込み
batch_df = spark.read.format("delta").load("/data/events")
# ストリーム読み込み(readStream に変えるだけ)
stream_df = spark.readStream.format("delta").load("/data/events")
# どちらも同じ変換を適用できる
result = stream_df.filter("event_type = 'purchase'") \
.groupBy("region") \
.count()バッチとストリームの違いは入口(read vs readStream)と出口(write vs writeStream)だけです。中間の変換ロジックは共通化できるため、バッチで検証してからストリームに切り替える開発フローが実務の定番です。
ストリーム処理のパイプラインは、ソース読み込み(readStream)→ 変換 → シンク書き込み(writeStream)の3ステップで構成されます。
# 1. ソースからの読み込み(Auto Loader例)
raw = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", "/checkpoint/schema")
.load("/data/landing/")
)
# 2. 変換
from pyspark.sql.functions import current_timestamp, col
cleaned = (raw
.filter(col("amount") > 0)
.withColumn("processed_at", current_timestamp())
)
# 3. シンクへの書き込み
query = (cleaned.writeStream
.format("delta")
.outputMode("append")
.option("checkpointLocation", "/checkpoint/bronze")
.trigger(availableNow=True)
.toTable("bronze.events")
)| 種別 | ソース(readStream) | シンク(writeStream) |
|---|---|---|
| Delta Lake | format("delta") | format("delta") / toTable() |
| Auto Loader | format("cloudFiles") | — |
| Kafka | format("kafka") | format("kafka") |
| カスタム処理 | — | foreachBatch() / foreach() |
| デバッグ用 | format("rate") | format("console") |
outputMode は、結果テーブルのどの部分をシンクに書き出すかを制御します。クエリの種類によって使えるモードが異なるため、試験では「このクエリにこのモードは使えるか?」という形式で問われます。
| モード | 挙動 | 集約クエリ | 非集約クエリ | 代表的な用途 |
|---|---|---|---|---|
| append | 新しく追加された行のみ書き出す | ウォーターマーク必須 | 使用可 | ログ取り込み、Bronze/Silver ETL |
| complete | 結果テーブル全体を毎回上書き | 使用可 | 使用不可 | ダッシュボード用の全件集計 |
| update | 変更された行のみ書き出す | 使用可 | 使用可 | 増分アップデートが必要な集計 |
appendモードと集約クエリの関係は試験で最もよく問われるポイントです。集約クエリでappendモードを使うには、必ずwithWatermark()を設定する必要があります。ウォーターマークにより結果が確定したタイミングでのみ行が出力されるためです。
completeモードは非集約クエリ(単純なfilterやselectのみ)では使えません。結果テーブル全体を保持する必要があるため、集約がないと無限にデータが蓄積されてしまうからです。
トリガーは「どのタイミングでマイクロバッチを実行するか」を決定します。コスト・レイテンシ・運用パターンに直結するため、実務でも試験でも頻出です。
| トリガー | 挙動 | バッチ数 | コスト特性 | ユースケース |
|---|---|---|---|---|
| 未指定(デフォルト) | 前バッチ完了後すぐに次バッチを開始 | 無限(継続稼働) | クラスタ常時稼働(最高コスト) | 最低レイテンシが必要な場合 |
| processingTime("10 seconds") | 指定間隔ごとにバッチを実行 | 無限(継続稼働) | クラスタ常時稼働(アイドル時間あり) | ニアリアルタイム処理 |
| availableNow=True | 到着済みデータを複数バッチで処理し停止 | 複数バッチ → 停止 | ジョブ実行時のみ課金(低コスト) | 定期バッチ増分処理(最も一般的) |
| once=True(非推奨) | 到着済みデータを1バッチで処理し停止 | 1バッチ → 停止 | ジョブ実行時のみ課金 | availableNowへの移行を推奨 |
# 実務で最も一般的: availableNow(Workflowsで定期実行)
query = (df.writeStream
.format("delta")
.outputMode("append")
.option("checkpointLocation", "/checkpoint/bronze")
.trigger(availableNow=True)
.toTable("bronze.events")
)
# ニアリアルタイムが必要な場合: processingTime
query = (df.writeStream
.format("delta")
.outputMode("append")
.option("checkpointLocation", "/checkpoint/bronze")
.trigger(processingTime="10 seconds")
.toTable("bronze.events")
)availableNow=True は、Databricks Workflowsで1時間ごとに起動し、到着済みファイルを全て処理して停止するパターンで使います。常時稼働クラスタが不要なためコスト効率が最も高く、Databricksが推奨するデフォルトのトリガーです。
once=True との決定的な違いは「バッチ分割」です。onceは全データを1バッチに詰め込むため、データ量が大きいとOOMの原因になります。availableNowはmaxFilesPerTrigger等に基づいて複数バッチに分割するため、メモリ効率が高く安定します。
ウォーターマークは、イベントタイムベースのウィンドウ集約で「遅延データをどこまで待つか」の閾値を定義する仕組みです。Sparkは受信した全イベントの最大イベントタイムを追跡し、そこから閾値を引いた時刻を「ウォーターマーク」として設定します。ウォーターマークより古いイベントは無視され、対応する状態はメモリから破棄されます。
from pyspark.sql.functions import window, col
# 10分以上遅延したイベントは処理対象外
result = (stream_df
.withWatermark("event_time", "10 minutes")
.groupBy(
window("event_time", "5 minutes"),
"region"
)
.agg({"amount": "sum", "*": "count"})
)
# append モードで書き込む場合、ウォーターマークが必須
query = (result.writeStream
.format("delta")
.outputMode("append")
.option("checkpointLocation", "/checkpoint/windowed_agg")
.trigger(processingTime="30 seconds")
.toTable("gold.regional_sales")
)閾値を大きくするほど遅延データを拾えますが、保持する状態量が増えてメモリ消費が上がります。逆に小さすぎると正当なデータを取りこぼします。実務では、データの遅延特性を計測してから閾値を決定します。
チェックポイントは、ストリームクエリの進捗状態(処理済みオフセット・集約の中間結果・メタデータ)をクラウドストレージに永続化する仕組みです。障害発生時にチェックポイントから再開することで、exactly-once処理を保証します。
# checkpointLocation はwriteStreamに必須
query = (df.writeStream
.format("delta")
.outputMode("append")
.option("checkpointLocation", "dbfs:/checkpoint/pipeline_bronze")
.trigger(availableNow=True)
.toTable("bronze.events")
)| ルール | 理由 |
|---|---|
| 1クエリにつき1つのチェックポイントディレクトリ | 複数クエリでの共有はオフセット競合が発生し、データ重複・欠損の原因になる |
| クラウドストレージに保存する(S3/ADLS/GCS) | ローカルディスクではノード障害時にチェックポイントが失われる |
| チェックポイントを削除すると全データ再処理が発生する | 処理済みオフセット情報が消失するため、ソースの先頭から再読み込みされる |
| クエリのスキーマや集約ロジックを変更する場合は新しいチェックポイントが必要 | 互換性のない状態をロードするとクエリ起動時にエラーが発生する |
Auto Loader(cloudFilesフォーマット)は、Structured Streamingの上に構築されたファイル取り込み機能です。クラウドストレージに到着したファイルを自動検知し、増分処理でDelta Tableに取り込みます。readStreamのフォーマットに"cloudFiles"を指定するだけで利用できます。
# Auto Loader + Structured Streaming の典型パターン
raw = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", "/checkpoint/schema")
.option("cloudFiles.schemaEvolutionMode", "addNewColumns")
.load("s3://bucket/landing/events/")
)
# Bronze Tableへの書き込み
query = (raw.writeStream
.format("delta")
.outputMode("append")
.option("checkpointLocation", "/checkpoint/bronze_events")
.option("mergeSchema", "true")
.trigger(availableNow=True)
.toTable("bronze.raw_events")
)Auto Loaderのチェックポイントには「どのファイルを処理済みか」が記録されるため、同じファイルが二重に処理されることはありません。スキーマ推論結果はcloudFiles.schemaLocationに保存され、再起動時にはこの保存済みスキーマが使われます。
foreachBatchは、各マイクロバッチの結果をカスタム関数で処理する仕組みです。Delta TableへのMERGE(upsert)、複数テーブルへの同時書き込み、外部APIの呼び出しなど、標準のwriteStreamでは実現できない処理に使います。
from delta.tables import DeltaTable
def upsert_to_silver(batch_df, batch_id):
"""マイクロバッチごとにMERGE(upsert)を実行"""
silver = DeltaTable.forName(spark, "silver.customers")
(silver.alias("t")
.merge(batch_df.alias("s"), "t.customer_id = s.customer_id")
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.execute()
)
# foreachBatch でMERGEを実行
query = (df.writeStream
.foreachBatch(upsert_to_silver)
.option("checkpointLocation", "/checkpoint/silver_customers")
.trigger(availableNow=True)
.start()
)foreachBatch内では通常のDataFrame操作がすべて使えるため、batch_df.createOrReplaceTempView()でSparkSQL経由のMERGEも可能です。ただし、foreachBatch内でエラーが発生するとそのバッチ全体がリトライされる点に注意が必要です。冪等な処理を書くか、MERGEのように重複適用しても結果が変わらない操作を使うのが鉄則です。
Data Engineer Associate / Professional
問題 1
IoTデバイスからのイベントを5分ウィンドウで集計し、Delta Tableに書き込みたい。イベントは最大3分の遅延がありうる。appendモードで結果を書き出すための正しい実装はどれか。
正解: A
ウィンドウ集約でappendモードを使うにはウォーターマークの設定が必須です。選択肢Bはウォーターマークなしの集約にappendモードを指定しているためAnalysisExceptionが発生します。選択肢Cはcompleteモードで動作しますが「appendモードで書き出す」という要件を満たしません。選択肢Dはfilterによる時間制限であり、ウォーターマークの代替にはなりません。ウォーターマーク3分 + 5分ウィンドウ + appendが正しい組み合わせです。
Data Engineer Associate
問題 2
あるチームはAuto Loaderでクラウドストレージから Bronze Delta Table へデータを取り込んでいる。Databricks Workflowsで1時間ごとにジョブを実行し、到着済みファイルを全て処理して停止させたい。最も適切なトリガー設定はどれか。
正解: B
availableNow=Trueは到着済みデータを複数マイクロバッチに分割して処理し、全て完了したら停止します。ジョブスケジューラとの組み合わせに最適です。processingTime(A)はクラスタが常時稼働してしまいコスト非効率です。once=True(C)は1バッチに全データを詰め込むため大量データでOOMのリスクがあり、非推奨です。デフォルト(D)も常時稼働のため停止しません。
Data Engineer Professional
問題 3
ストリーミングパイプラインで、ソースから読んだ変更データをSilver Delta Tableに customer_id をキーとしてupsertしたい。以下のコードの空欄に入る最も適切な選択肢はどれか。 def process(batch_df, batch_id): silver = DeltaTable.forName(spark, 'silver.customers') silver.alias('t').merge(batch_df.alias('s'), 't.customer_id = s.customer_id') .whenMatchedUpdateAll().whenNotMatchedInsertAll().execute() query = df.writeStream._____
正解: A
MERGEによるupsertを行うにはforeachBatchが必要です。foreachBatch内でDeltaTable.mergeを実行し、チェックポイントを指定してstart()で起動します。選択肢Bはformat('delta')でwriteStreamするため単純なappend/overwriteしかできずMERGEは実行できません。選択肢CはforeachBatchにoutputModeを併用していますが、foreachBatch使用時にoutputModeは不要(指定するとエラー)。選択肢Dもformat('delta')のappendでありMERGEとは無関係です。
Structured StreamingとSpark Streaming(DStreams)の違いは何ですか?
Spark Streaming(DStreams)はRDDベースの旧APIで、処理時刻ベースのマイクロバッチのみをサポートしていました。Structured StreamingはDataFrame/Dataset APIベースで、イベントタイムウィンドウ、ウォーターマーク、exactly-onceセマンティクス、複数出力モードを提供します。DStreamsは2023年にメンテナンスモードに入っており、新規開発ではStructured Streamingが唯一の推奨選択肢です。試験でDStreamsが正解になることはありません。
trigger(once=True) と trigger(availableNow=True) の違いを教えてください。
onceは到着済みデータを1つのマイクロバッチで処理して停止します。データ量が大きい場合、単一バッチに収まらずOOMやタイムアウトの原因になります。availableNowは到着済みデータを複数のマイクロバッチに分割して処理し、全データ処理後に停止します。各バッチのサイズはmaxFilesPerTrigger等で制御でき、メモリ効率が高いです。Databricksはonce=Trueを非推奨としており、availableNow=Trueへの移行を推奨しています。
ウォーターマークなしの集約クエリでappendモードを指定するとどうなりますか?
AnalysisExceptionが発生し、クエリを開始できません。appendモードでは「結果が確定した行」だけを出力する必要がありますが、ウォーターマークがないとSparkは結果の確定タイミングを判断できません。解決策は3つあります。(1) withWatermarkを追加してappendモードを使う、(2) completeモードに切り替えて結果テーブル全体を毎回出力する、(3) updateモードに切り替えて変更行のみを出力する。試験ではこの制約が頻出です。
NicheeLab編集部
データエンジニアリング・クラウド資格の専門家。Databricks・Snowflake等の認定資格を保有し、実務経験に基づいた問題作成・解説を行っています。NicheeLab運営。
Databricks資格一覧|全7試験・難易度・勉強法
Databricks認定資格全7試験の一覧・難易度・出題範囲・合格ラインを徹底解説。2026年最新版の公式試験ガイドに準...
Databricks試験の難易度ランキング|全7資格を徹底比較
Databricks認定全7試験の難易度をランキング形式で徹底比較。合格率・学習時間・出題傾向から難易度を分析。...
Databricks資格の勉強方法|最短合格ルートと学習時間の目安
Databricks認定資格に最短で合格するための勉強方法を完全ガイド。公式リソース・問題集・学習スケジュールを徹底解説...
Databricks Data Engineer Associate完全解説|出題範囲・問題例・合格戦略
Databricks Certified Data Engineer Associate試験を徹底解説。5つの出題ドメイ...
Databricks Data Engineer Professional完全解説|上級試験の攻略法
Databricks Certified Data Engineer Professional試験を徹底解説。10の出題...