Databricks

Structured Streaming完全ガイド|Databricksリアルタイム処理入門

2026-03-21
更新: 2026-03-27
NicheeLab編集部

Structured Streamingは、Apache Sparkのストリーム処理フレームワークです。バッチ処理と同じDataFrame APIでリアルタイムデータを処理でき、exactly-onceのデータ処理保証を提供します。Databricksでは、Auto Loader・Delta Live Tables・イベントドリブンパイプラインの基盤として使われており、Data Engineer Associate / Professionalの両試験で頻出です。

この記事では、readStream/writeStreamの基本構文、3つの出力モード、トリガーモードの選び方、ウォーターマークによる遅延データ制御、チェックポイントの設計、foreachBatchパターンまで、コード例付きで網羅的に解説します。

基本概念:無限に追加されるテーブル

Structured Streamingでは、ストリームデータを「無限に行が追加されるテーブル(Unbounded Table)」として扱います。Kafkaトピックやクラウドストレージに新しいデータが到着するたびに、このテーブルに行が追加され、クエリの結果が増分更新されます。

開発者はバッチ処理とほぼ同じDataFrame API(select、filter、groupBy、joinなど)でストリームクエリを記述でき、Sparkエンジンが内部的にマイクロバッチ処理として実行します。

# バッチ読み込み
batch_df = spark.read.format("delta").load("/data/events")

# ストリーム読み込み(readStream に変えるだけ)
stream_df = spark.readStream.format("delta").load("/data/events")

# どちらも同じ変換を適用できる
result = stream_df.filter("event_type = 'purchase'") \
    .groupBy("region") \
    .count()

バッチとストリームの違いは入口(read vs readStream)と出口(write vs writeStream)だけです。中間の変換ロジックは共通化できるため、バッチで検証してからストリームに切り替える開発フローが実務の定番です。

readStream と writeStream の基本構文

ストリーム処理のパイプラインは、ソース読み込み(readStream)→ 変換 → シンク書き込み(writeStream)の3ステップで構成されます。

# 1. ソースからの読み込み(Auto Loader例)
raw = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", "/checkpoint/schema")
    .load("/data/landing/")
)

# 2. 変換
from pyspark.sql.functions import current_timestamp, col
cleaned = (raw
    .filter(col("amount") > 0)
    .withColumn("processed_at", current_timestamp())
)

# 3. シンクへの書き込み
query = (cleaned.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/checkpoint/bronze")
    .trigger(availableNow=True)
    .toTable("bronze.events")
)

主なソースとシンク

種別ソース(readStream)シンク(writeStream)
Delta Lakeformat("delta")format("delta") / toTable()
Auto Loaderformat("cloudFiles")
Kafkaformat("kafka")format("kafka")
カスタム処理foreachBatch() / foreach()
デバッグ用format("rate")format("console")

3つの出力モード(Output Mode)

outputMode は、結果テーブルのどの部分をシンクに書き出すかを制御します。クエリの種類によって使えるモードが異なるため、試験では「このクエリにこのモードは使えるか?」という形式で問われます。

モード挙動集約クエリ非集約クエリ代表的な用途
append新しく追加された行のみ書き出すウォーターマーク必須使用可ログ取り込み、Bronze/Silver ETL
complete結果テーブル全体を毎回上書き使用可使用不可ダッシュボード用の全件集計
update変更された行のみ書き出す使用可使用可増分アップデートが必要な集計

appendモードと集約クエリの関係は試験で最もよく問われるポイントです。集約クエリでappendモードを使うには、必ずwithWatermark()を設定する必要があります。ウォーターマークにより結果が確定したタイミングでのみ行が出力されるためです。

completeモードは非集約クエリ(単純なfilterやselectのみ)では使えません。結果テーブル全体を保持する必要があるため、集約がないと無限にデータが蓄積されてしまうからです。

トリガーモードの比較

トリガーは「どのタイミングでマイクロバッチを実行するか」を決定します。コスト・レイテンシ・運用パターンに直結するため、実務でも試験でも頻出です。

トリガー挙動バッチ数コスト特性ユースケース
未指定(デフォルト)前バッチ完了後すぐに次バッチを開始無限(継続稼働)クラスタ常時稼働(最高コスト)最低レイテンシが必要な場合
processingTime("10 seconds")指定間隔ごとにバッチを実行無限(継続稼働)クラスタ常時稼働(アイドル時間あり)ニアリアルタイム処理
availableNow=True到着済みデータを複数バッチで処理し停止複数バッチ → 停止ジョブ実行時のみ課金(低コスト)定期バッチ増分処理(最も一般的)
once=True(非推奨)到着済みデータを1バッチで処理し停止1バッチ → 停止ジョブ実行時のみ課金availableNowへの移行を推奨
# 実務で最も一般的: availableNow(Workflowsで定期実行)
query = (df.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/checkpoint/bronze")
    .trigger(availableNow=True)
    .toTable("bronze.events")
)

# ニアリアルタイムが必要な場合: processingTime
query = (df.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/checkpoint/bronze")
    .trigger(processingTime="10 seconds")
    .toTable("bronze.events")
)

availableNow=True は、Databricks Workflowsで1時間ごとに起動し、到着済みファイルを全て処理して停止するパターンで使います。常時稼働クラスタが不要なためコスト効率が最も高く、Databricksが推奨するデフォルトのトリガーです。

once=True との決定的な違いは「バッチ分割」です。onceは全データを1バッチに詰め込むため、データ量が大きいとOOMの原因になります。availableNowはmaxFilesPerTrigger等に基づいて複数バッチに分割するため、メモリ効率が高く安定します。

ウォーターマーク(withWatermark)

ウォーターマークは、イベントタイムベースのウィンドウ集約で「遅延データをどこまで待つか」の閾値を定義する仕組みです。Sparkは受信した全イベントの最大イベントタイムを追跡し、そこから閾値を引いた時刻を「ウォーターマーク」として設定します。ウォーターマークより古いイベントは無視され、対応する状態はメモリから破棄されます。

from pyspark.sql.functions import window, col

# 10分以上遅延したイベントは処理対象外
result = (stream_df
    .withWatermark("event_time", "10 minutes")
    .groupBy(
        window("event_time", "5 minutes"),
        "region"
    )
    .agg({"amount": "sum", "*": "count"})
)

# append モードで書き込む場合、ウォーターマークが必須
query = (result.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/checkpoint/windowed_agg")
    .trigger(processingTime="30 seconds")
    .toTable("gold.regional_sales")
)

ウォーターマークの動作例

  • 最大イベントタイムが 12:30 でウォーターマーク閾値が10分 → ウォーターマーク = 12:20
  • 12:20 より古いイベント(例: 12:15)が到着しても無視される
  • 12:20 以前のウィンドウ(例: 12:10-12:15)の状態がメモリから解放される

閾値を大きくするほど遅延データを拾えますが、保持する状態量が増えてメモリ消費が上がります。逆に小さすぎると正当なデータを取りこぼします。実務では、データの遅延特性を計測してから閾値を決定します。

チェックポイント

チェックポイントは、ストリームクエリの進捗状態(処理済みオフセット・集約の中間結果・メタデータ)をクラウドストレージに永続化する仕組みです。障害発生時にチェックポイントから再開することで、exactly-once処理を保証します。

# checkpointLocation はwriteStreamに必須
query = (df.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "dbfs:/checkpoint/pipeline_bronze")
    .trigger(availableNow=True)
    .toTable("bronze.events")
)

チェックポイントの設計ルール

ルール理由
1クエリにつき1つのチェックポイントディレクトリ複数クエリでの共有はオフセット競合が発生し、データ重複・欠損の原因になる
クラウドストレージに保存する(S3/ADLS/GCS)ローカルディスクではノード障害時にチェックポイントが失われる
チェックポイントを削除すると全データ再処理が発生する処理済みオフセット情報が消失するため、ソースの先頭から再読み込みされる
クエリのスキーマや集約ロジックを変更する場合は新しいチェックポイントが必要互換性のない状態をロードするとクエリ起動時にエラーが発生する

Auto Loaderとの組み合わせ

Auto Loader(cloudFilesフォーマット)は、Structured Streamingの上に構築されたファイル取り込み機能です。クラウドストレージに到着したファイルを自動検知し、増分処理でDelta Tableに取り込みます。readStreamのフォーマットに"cloudFiles"を指定するだけで利用できます。

# Auto Loader + Structured Streaming の典型パターン
raw = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", "/checkpoint/schema")
    .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
    .load("s3://bucket/landing/events/")
)

# Bronze Tableへの書き込み
query = (raw.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/checkpoint/bronze_events")
    .option("mergeSchema", "true")
    .trigger(availableNow=True)
    .toTable("bronze.raw_events")
)

Auto Loaderのチェックポイントには「どのファイルを処理済みか」が記録されるため、同じファイルが二重に処理されることはありません。スキーマ推論結果はcloudFiles.schemaLocationに保存され、再起動時にはこの保存済みスキーマが使われます。

foreachBatch パターン

foreachBatchは、各マイクロバッチの結果をカスタム関数で処理する仕組みです。Delta TableへのMERGE(upsert)、複数テーブルへの同時書き込み、外部APIの呼び出しなど、標準のwriteStreamでは実現できない処理に使います。

from delta.tables import DeltaTable

def upsert_to_silver(batch_df, batch_id):
    """マイクロバッチごとにMERGE(upsert)を実行"""
    silver = DeltaTable.forName(spark, "silver.customers")

    (silver.alias("t")
        .merge(batch_df.alias("s"), "t.customer_id = s.customer_id")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )

# foreachBatch でMERGEを実行
query = (df.writeStream
    .foreachBatch(upsert_to_silver)
    .option("checkpointLocation", "/checkpoint/silver_customers")
    .trigger(availableNow=True)
    .start()
)

foreachBatch の典型ユースケース

  • MERGEによるupsert: ストリームデータで既存レコードを更新しつつ新規レコードを挿入。CDC(Change Data Capture)パイプラインで定番
  • 複数テーブルへの書き込み: 1つのマイクロバッチを複数のDeltaテーブルやKafkaトピックに同時出力
  • 外部システム連携: バッチ単位でREST APIに送信したり、データベースにJDBC書き込みしたりする

foreachBatch内では通常のDataFrame操作がすべて使えるため、batch_df.createOrReplaceTempView()でSparkSQL経由のMERGEも可能です。ただし、foreachBatch内でエラーが発生するとそのバッチ全体がリトライされる点に注意が必要です。冪等な処理を書くか、MERGEのように重複適用しても結果が変わらない操作を使うのが鉄則です。

試験で問われるポイント

  • readStream / writeStream の構文: バッチとの違いは入口・出口だけ。中間の変換ロジックは共通
  • 出力モードの制約: 集約クエリでappendモード → ウォーターマーク必須。非集約でcompleteモード → 使用不可
  • availableNow vs once: availableNowは複数バッチに分割(推奨)、onceは単一バッチ(非推奨)
  • ウォーターマーク: 遅延データの許容閾値を定義。閾値超過のデータは無視され、状態がメモリから解放される
  • チェックポイント: exactly-once保証の基盤。1クエリ1ディレクトリ。削除すると全件再処理
  • foreachBatch: MERGEや複数シンクへの書き込みに使う。冪等性の確保が必須
  • Auto Loader連携: cloudFilesフォーマットはStructured Streamingの上に構築されている

サンプル問題

Data Engineer Associate / Professional

問題 1

IoTデバイスからのイベントを5分ウィンドウで集計し、Delta Tableに書き込みたい。イベントは最大3分の遅延がありうる。appendモードで結果を書き出すための正しい実装はどれか。

  1. df.withWatermark("event_time", "3 minutes").groupBy(window("event_time", "5 minutes")).count() を outputMode("append") で書き込む
  2. df.groupBy(window("event_time", "5 minutes")).count() を outputMode("append") で書き込む
  3. df.withWatermark("event_time", "3 minutes").groupBy(window("event_time", "5 minutes")).count() を outputMode("complete") で書き込む
  4. df.filter("event_time > current_timestamp() - interval 3 minutes").groupBy("device_id").count() を outputMode("append") で書き込む

正解: A

ウィンドウ集約でappendモードを使うにはウォーターマークの設定が必須です。選択肢Bはウォーターマークなしの集約にappendモードを指定しているためAnalysisExceptionが発生します。選択肢Cはcompleteモードで動作しますが「appendモードで書き出す」という要件を満たしません。選択肢Dはfilterによる時間制限であり、ウォーターマークの代替にはなりません。ウォーターマーク3分 + 5分ウィンドウ + appendが正しい組み合わせです。

Data Engineer Associate

問題 2

あるチームはAuto Loaderでクラウドストレージから Bronze Delta Table へデータを取り込んでいる。Databricks Workflowsで1時間ごとにジョブを実行し、到着済みファイルを全て処理して停止させたい。最も適切なトリガー設定はどれか。

  1. trigger(processingTime="1 hour")
  2. trigger(availableNow=True)
  3. trigger(once=True)
  4. トリガーを指定しない(デフォルト)

正解: B

availableNow=Trueは到着済みデータを複数マイクロバッチに分割して処理し、全て完了したら停止します。ジョブスケジューラとの組み合わせに最適です。processingTime(A)はクラスタが常時稼働してしまいコスト非効率です。once=True(C)は1バッチに全データを詰め込むため大量データでOOMのリスクがあり、非推奨です。デフォルト(D)も常時稼働のため停止しません。

Data Engineer Professional

問題 3

ストリーミングパイプラインで、ソースから読んだ変更データをSilver Delta Tableに customer_id をキーとしてupsertしたい。以下のコードの空欄に入る最も適切な選択肢はどれか。

def process(batch_df, batch_id):
 silver = DeltaTable.forName(spark, 'silver.customers')
 silver.alias('t').merge(batch_df.alias('s'), 't.customer_id = s.customer_id')
 .whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()

query = df.writeStream._____

  1. .foreachBatch(process).option("checkpointLocation", "/ckpt").start()
  2. .format("delta").outputMode("update").option("checkpointLocation", "/ckpt").toTable("silver.customers")
  3. .foreachBatch(process).outputMode("complete").start()
  4. .format("delta").outputMode("append").option("checkpointLocation", "/ckpt").toTable("silver.customers")

正解: A

MERGEによるupsertを行うにはforeachBatchが必要です。foreachBatch内でDeltaTable.mergeを実行し、チェックポイントを指定してstart()で起動します。選択肢Bはformat('delta')でwriteStreamするため単純なappend/overwriteしかできずMERGEは実行できません。選択肢CはforeachBatchにoutputModeを併用していますが、foreachBatch使用時にoutputModeは不要(指定するとエラー)。選択肢Dもformat('delta')のappendでありMERGEとは無関係です。

よくある質問

Structured StreamingとSpark Streaming(DStreams)の違いは何ですか?

Spark Streaming(DStreams)はRDDベースの旧APIで、処理時刻ベースのマイクロバッチのみをサポートしていました。Structured StreamingはDataFrame/Dataset APIベースで、イベントタイムウィンドウ、ウォーターマーク、exactly-onceセマンティクス、複数出力モードを提供します。DStreamsは2023年にメンテナンスモードに入っており、新規開発ではStructured Streamingが唯一の推奨選択肢です。試験でDStreamsが正解になることはありません。

trigger(once=True) と trigger(availableNow=True) の違いを教えてください。

onceは到着済みデータを1つのマイクロバッチで処理して停止します。データ量が大きい場合、単一バッチに収まらずOOMやタイムアウトの原因になります。availableNowは到着済みデータを複数のマイクロバッチに分割して処理し、全データ処理後に停止します。各バッチのサイズはmaxFilesPerTrigger等で制御でき、メモリ効率が高いです。Databricksはonce=Trueを非推奨としており、availableNow=Trueへの移行を推奨しています。

ウォーターマークなしの集約クエリでappendモードを指定するとどうなりますか?

AnalysisExceptionが発生し、クエリを開始できません。appendモードでは「結果が確定した行」だけを出力する必要がありますが、ウォーターマークがないとSparkは結果の確定タイミングを判断できません。解決策は3つあります。(1) withWatermarkを追加してappendモードを使う、(2) completeモードに切り替えて結果テーブル全体を毎回出力する、(3) updateモードに切り替えて変更行のみを出力する。試験ではこの制約が頻出です。

この記事で学んだ内容を問題で確認しましょう

16,000問以上の問題で実力チェック

無料で問題を解いてみる
この記事の著者

NicheeLab編集部

データエンジニアリング・クラウド資格の専門家。Databricks・Snowflake等の認定資格を保有し、実務経験に基づいた問題作成・解説を行っています。NicheeLab運営。


関連記事
Databricks

Databricks資格一覧|全7試験・難易度・勉強法

Databricks認定資格全7試験の一覧・難易度・出題範囲・合格ラインを徹底解説。2026年最新版の公式試験ガイドに準...

Databricks

Databricks試験の難易度ランキング|全7資格を徹底比較

Databricks認定全7試験の難易度をランキング形式で徹底比較。合格率・学習時間・出題傾向から難易度を分析。...

Databricks

Databricks資格の勉強方法|最短合格ルートと学習時間の目安

Databricks認定資格に最短で合格するための勉強方法を完全ガイド。公式リソース・問題集・学習スケジュールを徹底解説...

Databricks

Databricks Data Engineer Associate完全解説|出題範囲・問題例・合格戦略

Databricks Certified Data Engineer Associate試験を徹底解説。5つの出題ドメイ...

Databricks

Databricks Data Engineer Professional完全解説|上級試験の攻略法

Databricks Certified Data Engineer Professional試験を徹底解説。10の出題...

Databricksの記事一覧 (109件)
© 2026 NicheeLab All rights reserved.