Structured Streaming Complete Guide: Databricks Real-Time Processing

2026-03-21
Updated: 2026-03-27
NicheeLab Editorial Team

Structured Streaming is Apache Spark's stream processing framework. It lets you process real-time data with the same DataFrame API used for batch jobs and provides exactly-once processing guarantees. On Databricks, it powers Auto Loader, Delta Live Tables, and event-driven pipelines, and it shows up frequently on both the Data Engineer Associate and Professional exams.

This article covers everything you need with worked code examples: the basic readStream/writeStream syntax, the three output modes, how to pick a trigger mode, late-data control via watermarks, checkpoint design, and the foreachBatch pattern.

Core Concept: An Unbounded Table

Structured Streaming treats stream data as an unbounded table that keeps growing forever. Every time new data lands in a Kafka topic or cloud storage, rows are appended to that table and the query result is incrementally updated.

You write stream queries using essentially the same DataFrame API as batch (select, filter, groupBy, join, etc.), and the Spark engine executes them as micro-batches under the hood.

# Batch read
batch_df = spark.read.format("delta").load("/data/events")

# Streaming read (just switch to readStream)
stream_df = spark.readStream.format("delta").load("/data/events")

# The same transformations apply to both
result = stream_df.filter("event_type = 'purchase'") \
    .groupBy("region") \
    .count()

The only difference between batch and stream is the entry point (read vs readStream) and the exit point (write vs writeStream). The transformations in the middle are identical, so a common real-world workflow is to validate logic in batch first and then flip it to streaming.
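To make the unbounded-table model concrete, here is a minimal plain-Python sketch (no Spark involved) of how a result can be updated incrementally as micro-batches arrive. The function name apply_micro_batch and the sample rows are hypothetical, chosen only to mirror the filter/groupBy/count query above.

```python
from collections import Counter


def apply_micro_batch(result_table, new_rows):
    """Fold one micro-batch of newly arrived rows into the running result.

    Mirrors filter(event_type = 'purchase').groupBy('region').count().
    """
    for row in new_rows:
        if row["event_type"] == "purchase":
            result_table[row["region"]] += 1
    return result_table


# Hypothetical input: two micro-batches appended to the unbounded table
batch_1 = [
    {"event_type": "purchase", "region": "EU"},
    {"event_type": "view", "region": "EU"},
]
batch_2 = [
    {"event_type": "purchase", "region": "EU"},
    {"event_type": "purchase", "region": "US"},
]

result = Counter()
apply_micro_batch(result, batch_1)
after_first = dict(result)   # result after the first micro-batch
apply_micro_batch(result, batch_2)
after_second = dict(result)  # result after the second micro-batch
```

Only the new rows are touched on each call; the earlier rows are never re-read, which is the property that lets the same query logic run as either batch or stream.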

Basic readStream and writeStream Syntax

A streaming pipeline has three stages: source read (readStream) → transformation → sink write (writeStream).

# 1. Read from the source (Auto Loader example)
raw = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", "/checkpoint/schema")
    .load("/data/landing/")
)

# 2. Transform
from pyspark.sql.functions import current_timestamp, col
cleaned = (raw
    .filter(col("amount") > 0)
    .withColumn("processed_at", current_timestamp())
)

# 3. Write to the sink
query = (cleaned.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/checkpoint/bronze")
    .trigger(availableNow=True)
    .toTable("bronze.events")
)

Major Sources and Sinks

| Type | Source (readStream) | Sink (writeStream) |
|---|---|---|
| Delta Lake | format("delta") | format("delta") / toTable() |
| Auto Loader | format("cloudFiles") | n/a |
| Kafka | format("kafka") | format("kafka") |
| Custom processing | n/a | foreachBatch() / foreach() |
| Debugging | format("rate") | format("console") |

The Three Output Modes

outputMode controls which part of the result table is written to the sink. The valid modes depend on the type of query, so the exam often asks "can this mode be used with this query?"

| Mode | Behavior | Aggregation queries | Non-aggregation queries | Typical use case |
|---|---|---|---|---|
| append | Emit only newly appended rows | Watermark required | Supported | Log ingestion, Bronze/Silver ETL |
| complete | Overwrite the entire result table each trigger | Supported | Not supported | Full aggregations for dashboards |
| update | Emit only changed rows | Supported | Supported | Aggregations that need incremental updates |

The relationship between append mode and aggregation queries is the single most common exam question. To use append mode on an aggregation query, you must configure withWatermark(). The watermark is what tells Spark when a result is final and therefore safe to emit.

Complete mode cannot be used with non-aggregation queries (plain filter or select). It has to keep the full result table, so without an aggregation the state would grow forever.
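The compatibility rules in the table above can be written down as a small lookup. This is a plain-Python sketch for study purposes, not Spark's own validation logic; the function name allowed_output_modes is hypothetical, and it assumes the watermark is defined on the same event-time column the aggregation uses.

```python
def allowed_output_modes(has_aggregation, has_watermark):
    """Return the output modes the table lists as valid for a query shape."""
    if not has_aggregation:
        # complete mode needs an aggregation to bound the result table
        return {"append", "update"}
    modes = {"complete", "update"}
    if has_watermark:
        # a watermark lets Spark finalize windows, so append becomes valid
        modes.add("append")
    return modes


plain_filter = allowed_output_modes(has_aggregation=False, has_watermark=False)
agg_no_watermark = allowed_output_modes(has_aggregation=True, has_watermark=False)
agg_with_watermark = allowed_output_modes(has_aggregation=True, has_watermark=True)
```

Reading an exam question as "which of the two flags are set?" and then checking the returned set is a quick way to rule out invalid combinations.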

Trigger Mode Comparison

The trigger decides when each micro-batch runs. It directly drives cost, latency, and operational pattern, so it shows up constantly in both real-world work and exams.

| Trigger | Behavior | Batch count | Cost profile | Use case |
|---|---|---|---|---|
| Unspecified (default) | Start the next batch as soon as the previous one finishes | Unbounded (always on) | Cluster always on (highest cost) | When the lowest possible latency is required |
| processingTime("10 seconds") | Run a batch at the specified interval | Unbounded (always on) | Cluster always on (with idle time) | Near-real-time processing |
| availableNow=True | Process all available data across multiple batches, then stop | Multiple batches → stop | Pay only while the job runs (low cost) | Scheduled incremental batches (most common) |
| once=True (deprecated) | Process all available data in a single batch, then stop | 1 batch → stop | Pay only while the job runs | Recommended to migrate to availableNow |

# Most common in practice: availableNow (scheduled runs via Workflows)
query = (df.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/checkpoint/bronze")
    .trigger(availableNow=True)
    .toTable("bronze.events")
)

# When near-real-time is required: processingTime
query = (df.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/checkpoint/bronze")
    .trigger(processingTime="10 seconds")
    .toTable("bronze.events")
)

availableNow=True is what you use when Databricks Workflows kicks off the job every hour to process all arrived files and then stop. No always-on cluster is needed, which makes it the most cost-efficient option and the trigger Databricks recommends for scheduled incremental workloads.

The critical difference from once=True is batch splitting. once puts all available data into a single batch and ignores rate-limit options, which can cause OOM on large data sets. availableNow splits the work into multiple batches based on options like maxFilesPerTrigger, giving you much better memory efficiency and stability.
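The batch-splitting difference can be illustrated with a plain-Python sketch. It only models how a backlog is divided, not how Spark schedules work; plan_batches and the file names are hypothetical.

```python
def plan_batches(pending_files, max_files_per_trigger=None):
    """Split pending files into micro-batches.

    max_files_per_trigger=None models once=True (everything in one batch);
    an integer models availableNow=True with a rate limit.
    """
    if not pending_files:
        return []
    if max_files_per_trigger is None:
        return [list(pending_files)]
    return [
        list(pending_files[i:i + max_files_per_trigger])
        for i in range(0, len(pending_files), max_files_per_trigger)
    ]


files = [f"file_{n}.json" for n in range(10)]  # hypothetical backlog
once_plan = plan_batches(files)
available_now_plan = plan_batches(files, max_files_per_trigger=4)
available_now_sizes = [len(batch) for batch in available_now_plan]
```

With the rate limit in place, no single batch holds more than four files, so peak memory is bounded by the limit rather than by the size of the backlog.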

Watermarks (withWatermark)

A watermark defines the threshold for how long event-time windowed aggregations will wait for late data. Spark tracks the maximum event time across all received events and sets the watermark to that time minus the threshold. Events older than the watermark may be dropped, and the state for windows that have fallen behind the watermark is evicted from memory.

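from pyspark.sql.functions import window, count, sum as sum_

# Events more than 10 minutes behind the max event time are excluded
result = (stream_df
    .withWatermark("event_time", "10 minutes")
    .groupBy(
        window("event_time", "5 minutes"),
        "region"
    )
    # Alias the aggregates: default names such as sum(amount) contain
    # parentheses, which Delta rejects as column names by default
    .agg(
        sum_("amount").alias("total_amount"),
        count("*").alias("event_count")
    )
)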

# A watermark is required when writing an aggregation in append mode
query = (result.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/checkpoint/windowed_agg")
    .trigger(processingTime="30 seconds")
    .toTable("gold.regional_sales")
)

Watermark Behavior Example

  • Max event time is 12:30 and the watermark threshold is 10 minutes → watermark = 12:20
  • Events older than 12:20 (e.g., 12:15) are dropped on arrival
  • State for windows ending at or before 12:20 (e.g., 12:10-12:15) is evicted from memory

A larger threshold catches more late data but increases the state held in memory. Too small a threshold drops legitimate late data. In production, measure the actual lateness profile of your data before locking in a threshold.
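The arithmetic in the example above can be checked with a short plain-Python sketch. It is a simplified model, assuming tumbling windows aligned to the Unix epoch and a row being treated as too late once the end of its window is at or behind the watermark; the helper names are hypothetical and are not Spark APIs.

```python
from datetime import datetime, timedelta


def compute_watermark(max_event_time, threshold):
    """Watermark = latest event time seen so far minus the lateness threshold."""
    return max_event_time - threshold


def window_end(event_time, window_size):
    """End of the tumbling window containing event_time (epoch-aligned)."""
    epoch = datetime(1970, 1, 1)
    size = window_size.total_seconds()
    elapsed = (event_time - epoch).total_seconds()
    start = epoch + timedelta(seconds=(elapsed // size) * size)
    return start + window_size


def is_too_late(event_time, watermark, window_size):
    # A row is dropped once its whole window has fallen behind the watermark
    return window_end(event_time, window_size) <= watermark


five_minutes = timedelta(minutes=5)
max_seen = datetime(2026, 3, 21, 12, 30)
watermark = compute_watermark(max_seen, timedelta(minutes=10))

late_event = datetime(2026, 3, 21, 12, 15)     # window 12:15-12:20
on_time_event = datetime(2026, 3, 21, 12, 22)  # window 12:20-12:25
```

Here is_too_late(late_event, watermark, five_minutes) is true and the same call for on_time_event is false, matching the 12:20 watermark in the bullets above.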

Checkpoints

A checkpoint durably persists the progress of a streaming query (processed offsets, intermediate aggregation state, metadata) to cloud storage. When a failure happens, the query restarts from the checkpoint, which is how Structured Streaming delivers exactly-once processing.

# checkpointLocation is required on writeStream
query = (df.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "dbfs:/checkpoint/pipeline_bronze")
    .trigger(availableNow=True)
    .toTable("bronze.events")
)

Checkpoint Design Rules

| Rule | Reason |
|---|---|
| One checkpoint directory per query | Sharing across queries causes offset conflicts, leading to duplication or data loss |
| Store on cloud storage (S3/ADLS/GCS) | Local disk loses the checkpoint when a node fails |
| Deleting the checkpoint triggers full reprocessing | Once processed-offset metadata is gone, the source is re-read from the beginning |
| Schema or aggregation logic changes require a new checkpoint | Loading incompatible state errors out at query start |
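The restart and deletion rules can be demonstrated with a toy model in plain Python. This is a deliberately simplified sketch: it persists a single offset to a JSON file after each batch, whereas Spark's real checkpoint layout is more involved. run_query, the file name offset.json, and the sample events are all hypothetical.

```python
import json
import os
import tempfile


def run_query(source, sink, checkpoint_dir, batch_size=2, fail_after=None):
    """Process source rows in micro-batches, saving the offset after each one."""
    offset_file = os.path.join(checkpoint_dir, "offset.json")
    offset = 0
    if os.path.exists(offset_file):
        # Restart: resume from the last committed offset
        with open(offset_file) as f:
            offset = json.load(f)["next_offset"]
    batches_run = 0
    while offset < len(source):
        if fail_after is not None and batches_run == fail_after:
            raise RuntimeError("simulated node failure")
        batch = source[offset:offset + batch_size]
        sink.extend(batch)
        offset += len(batch)
        with open(offset_file, "w") as f:
            json.dump({"next_offset": offset}, f)
        batches_run += 1


source = ["e1", "e2", "e3", "e4", "e5"]
sink = []
checkpoint_dir = tempfile.mkdtemp()

try:
    run_query(source, sink, checkpoint_dir, fail_after=1)
except RuntimeError:
    pass  # the query died after one committed batch

run_query(source, sink, checkpoint_dir)  # restart resumes at the saved offset

# A fresh (or deleted) checkpoint directory means everything is re-read
reprocessed_sink = list(sink)
run_query(source, reprocessed_sink, tempfile.mkdtemp())
```

After the restart, sink holds each event exactly once. Pointing the same logic at an empty checkpoint directory re-reads the source from the start and, with an append-style sink, doubles every row.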

Combining with Auto Loader

Auto Loader (the cloudFiles format) is a file ingestion feature built on top of Structured Streaming. It auto-detects files arriving in cloud storage and incrementally loads them into Delta tables. To use it, simply set the readStream format to "cloudFiles".

# Typical Auto Loader + Structured Streaming pattern
raw = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", "/checkpoint/schema")
    .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
    .load("s3://bucket/landing/events/")
)

# Write to the Bronze table
query = (raw.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/checkpoint/bronze_events")
    .option("mergeSchema", "true")
    .trigger(availableNow=True)
    .toTable("bronze.raw_events")
)

The Auto Loader checkpoint records which files have already been processed, so the same file is never processed twice. The inferred schema is saved at cloudFiles.schemaLocation and reused on restart.
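As a rough illustration of what the addNewColumns setting implies for the saved schema, the following plain-Python sketch appends columns that have not been seen before and leaves existing column types alone. It is a simplified model under that assumption, not Auto Loader's implementation; evolve_schema and the sample record are hypothetical.

```python
def evolve_schema(stored_schema, record):
    """Return a schema with any columns unseen so far appended.

    stored_schema maps column name -> type name. Existing columns keep
    their stored type even if the record's value has a different one.
    """
    evolved = dict(stored_schema)
    for column, value in record.items():
        if column not in evolved:
            evolved[column] = type(value).__name__
    return evolved


stored = {"id": "int", "amount": "float"}  # hypothetical saved schema
incoming = {"id": 7, "amount": "12.5", "coupon": "SPRING"}
evolved = evolve_schema(stored, incoming)
```

The new coupon column is added, while amount stays float even though this particular record carried it as a string.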

The foreachBatch Pattern

foreachBatch lets you process each micro-batch result with a custom function. Use it for things that the standard writeStream cannot do: MERGE (upsert) into a Delta table, writing to multiple tables in one batch, or calling external APIs.

from delta.tables import DeltaTable

def upsert_to_silver(batch_df, batch_id):
    """Run a MERGE (upsert) for each micro-batch"""
    silver = DeltaTable.forName(spark, "silver.customers")

    (silver.alias("t")
        .merge(batch_df.alias("s"), "t.customer_id = s.customer_id")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )

# Run the MERGE via foreachBatch
query = (df.writeStream
    .foreachBatch(upsert_to_silver)
    .option("checkpointLocation", "/checkpoint/silver_customers")
    .trigger(availableNow=True)
    .start()
)

Typical foreachBatch Use Cases

  • Upsert via MERGE: Update existing records while inserting new ones from streamed data. A staple of CDC (Change Data Capture) pipelines.
  • Writing to multiple tables: Fan out a single micro-batch to multiple Delta tables or Kafka topics.
  • External system integration: Send each batch to a REST API or write it to a database via JDBC.

Inside foreachBatch you can use any normal DataFrame operation, so batch_df.createOrReplaceTempView() even lets you run MERGE through Spark SQL. Be aware that if an error happens inside foreachBatch, the entire batch is retried. The rule of thumb is to write idempotent logic or use operations like MERGE that produce the same result when applied repeatedly.
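Why idempotency matters on retry can be seen in a small plain-Python comparison of an append-style write and a keyed upsert. The lists stand in for target tables, and append_batch and merge_batch are hypothetical helpers, not Delta Lake APIs.

```python
def append_batch(table, batch):
    """Plain append: replaying the batch duplicates rows."""
    table.extend(dict(row) for row in batch)


def merge_batch(table, batch, key="customer_id"):
    """Upsert keyed on `key`: replaying the batch leaves the table unchanged."""
    index = {row[key]: i for i, row in enumerate(table)}
    for row in batch:
        if row[key] in index:
            table[index[row[key]]] = dict(row)  # matched: update in place
        else:
            index[row[key]] = len(table)
            table.append(dict(row))             # not matched: insert


batch = [
    {"customer_id": 1, "tier": "gold"},
    {"customer_id": 2, "tier": "silver"},
]

appended, merged = [], []
for _ in range(2):  # the second pass models a retry of the same micro-batch
    append_batch(appended, batch)
    merge_batch(merged, batch)
```

After the simulated retry, appended holds four rows while merged still holds two, which is the behavior you want from whatever runs inside foreachBatch.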

Key Exam Points

  • readStream / writeStream syntax: The only differences from batch are the entry and exit points. Transformation logic is identical.
  • Output mode constraints: Append mode on aggregations → watermark required. Complete mode on non-aggregations → not allowed.
  • availableNow vs once: availableNow splits into multiple batches (recommended); once uses a single batch (deprecated).
  • Watermarks: Define the late-data threshold. Data older than the threshold is dropped and the associated state is freed.
  • Checkpoints: The foundation of exactly-once guarantees. One directory per query. Deleting it triggers full reprocessing.
  • foreachBatch: Use for MERGE or multi-sink writes. Idempotency is mandatory.
  • Auto Loader integration: The cloudFiles format is built on top of Structured Streaming.

Sample Questions

Data Engineer Associate / Professional

Question 1

You need to aggregate IoT events in 5-minute windows and write the results to a Delta table. Events can be up to 3 minutes late. Which implementation correctly emits results in append mode?

  A. df.withWatermark("event_time", "3 minutes").groupBy(window("event_time", "5 minutes")).count() written with outputMode("append")
  B. df.groupBy(window("event_time", "5 minutes")).count() written with outputMode("append")
  C. df.withWatermark("event_time", "3 minutes").groupBy(window("event_time", "5 minutes")).count() written with outputMode("complete")
  D. df.filter("event_time > current_timestamp() - interval 3 minutes").groupBy("device_id").count() written with outputMode("append")

Correct answer: A

Append mode on a windowed aggregation requires a watermark. Option B specifies append on an aggregation with no watermark, which throws AnalysisException. Option C works but uses complete mode and so fails the requirement of writing in append mode. Option D uses filter for time bounding, which is not a substitute for a watermark. The correct combination is 3-minute watermark + 5-minute window + append.

Data Engineer Associate

Question 2

A team uses Auto Loader to ingest data from cloud storage into a Bronze Delta table. They want Databricks Workflows to run the job hourly, process all arrived files, then stop. Which trigger setting is the most appropriate?

  A. trigger(processingTime="1 hour")
  B. trigger(availableNow=True)
  C. trigger(once=True)
  D. No trigger specified (default)

Correct answer: B

availableNow=True splits available data into multiple micro-batches, processes them all, and stops — perfect for use with a job scheduler. processingTime (A) leaves the cluster running continuously, which is cost-inefficient. once=True (C) crams all data into a single batch and risks OOM on large data sets; it is deprecated. The default (D) also runs continuously and never stops.

Data Engineer Professional

Question 3

In a streaming pipeline you want to upsert changes from a source into a Silver Delta table keyed by customer_id. Which option fills in the blank correctly?

from delta.tables import DeltaTable

def process(batch_df, batch_id):
    silver = DeltaTable.forName(spark, 'silver.customers')
    (silver.alias('t')
        .merge(batch_df.alias('s'), 't.customer_id = s.customer_id')
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute())

query = df.writeStream_____

  A. .foreachBatch(process).option("checkpointLocation", "/ckpt").start()
  B. .format("delta").outputMode("update").option("checkpointLocation", "/ckpt").toTable("silver.customers")
  C. .foreachBatch(process).outputMode("complete").start()
  D. .format("delta").outputMode("append").option("checkpointLocation", "/ckpt").toTable("silver.customers")

Correct answer: A

Upsert via MERGE requires foreachBatch. Run DeltaTable.merge inside foreachBatch, supply the checkpoint, and launch with start(). Option B fails because the Delta sink supports only the append and complete output modes, and a plain format('delta') write cannot run MERGE in any case. Option C omits checkpointLocation, so the query cannot resume reliably after a restart, and it sets complete mode, which is valid only for aggregation queries; foreachBatch itself does accept outputMode. Option D is a format('delta') append, which adds every change as a new row instead of updating existing customers.

Frequently Asked Questions

What is the difference between Structured Streaming and Spark Streaming (DStreams)?

Spark Streaming (DStreams) is the legacy RDD-based API that only supported processing-time micro-batches. Structured Streaming is built on the DataFrame/Dataset API and provides event-time windows, watermarks, exactly-once semantics, and multiple output modes. DStreams entered maintenance mode in 2023, so Structured Streaming is the only recommended option for new development. DStreams will never be the correct answer on the exam.

What is the difference between trigger(once=True) and trigger(availableNow=True)?

once processes all available data in a single micro-batch and then stops. If the data volume is large, it can blow past memory limits or time out because it cannot fit in one batch. availableNow splits the available data into multiple micro-batches, processes them, and stops once everything is done. Batch size is controlled by options like maxFilesPerTrigger, which is far more memory-efficient. Databricks has deprecated once=True and recommends migrating to availableNow=True.

What happens when you specify append mode on an aggregation query without a watermark?

An AnalysisException is thrown and the query will not start. Append mode requires emitting only finalized rows, but without a watermark Spark cannot determine when a result is final. There are three fixes: (1) add withWatermark and keep append mode, (2) switch to complete mode and emit the entire result table each trigger, or (3) switch to update mode and emit only changed rows. This constraint comes up frequently on the exam.

Author

NicheeLab Editorial Team

NicheeLab editorial team focused on data engineering and cloud certification learning. Content is structured around practical study needs and official exam domains.

