Structured Streaming is Apache Spark's stream processing framework. It lets you process real-time data with the same DataFrame API used for batch jobs. Combined with a replayable source and an idempotent sink such as Delta Lake, it provides end-to-end exactly-once guarantees. On Databricks, it powers Auto Loader, Delta Live Tables, and event-driven pipelines, and it shows up frequently on both the Data Engineer Associate and Professional exams.
This article covers everything you need with worked code examples: the basic readStream/writeStream syntax, the three output modes, how to pick a trigger mode, late-data control via watermarks, checkpoint design, and the foreachBatch pattern.
Structured Streaming treats stream data as an unbounded table that keeps growing forever. Every time new data lands in a Kafka topic or cloud storage, rows are appended to that table and the query result is incrementally updated.
You write stream queries using essentially the same DataFrame API as batch (select, filter, groupBy, join, etc.), and the Spark engine executes them as micro-batches under the hood.
```python
# Batch read
batch_df = spark.read.format("delta").load("/data/events")

# Streaming read (the only change is readStream)
stream_df = spark.readStream.format("delta").load("/data/events")

# The same transformations work on both
result = (stream_df
    .filter("event_type = 'purchase'")
    .groupBy("region")
    .count()
)
```

The only difference between batch and stream is the entry point (read vs readStream) and the exit point (write vs writeStream). The transformations in the middle are essentially identical, although a few operations are not supported on streams (sorting a non-aggregated stream is one example). A common real-world workflow is therefore to validate logic in batch first and then flip it to streaming.
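The following is a minimal pure-Python model of the "unbounded table" idea. It shows why validating in batch first works. It is not Spark code. A hypothetical `RunningCount` class holds per-region counts as state and updates them one micro-batch at a time. Its final result matches a single batch pass over all rows. The row fields mirror the example above, and the sample data is invented.

```python
from collections import Counter


def batch_count(rows):
    """Batch style: filter purchases, then count per region in one pass."""
    return Counter(r["region"] for r in rows if r["event_type"] == "purchase")


class RunningCount:
    """Streaming style (hypothetical model, not a Spark API).

    Applies the same filter + groupBy/count incrementally. The Counter plays
    the role of the aggregation state Spark keeps between micro-batches.
    """

    def __init__(self):
        self.state = Counter()

    def process(self, micro_batch):
        # Only newly arrived rows are read; the state carries the history.
        self.state.update(
            r["region"] for r in micro_batch if r["event_type"] == "purchase"
        )
        return dict(self.state)


rows = [
    {"event_type": "purchase", "region": "us"},
    {"event_type": "view", "region": "us"},
    {"event_type": "purchase", "region": "eu"},
    {"event_type": "purchase", "region": "us"},
]

stream = RunningCount()
for micro_batch in (rows[:2], rows[2:3], rows[3:]):
    stream.process(micro_batch)

print(dict(stream.state))                 # {'us': 2, 'eu': 1}
print(stream.state == batch_count(rows))  # True
```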
A streaming pipeline has three stages: source read (readStream) → transformation → sink write (writeStream).
```python
from pyspark.sql.functions import current_timestamp, col

# 1. Read from the source (Auto Loader example)
raw = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", "/checkpoint/schema")
    .load("/data/landing/")
)

# 2. Transform
cleaned = (raw
    .filter(col("amount") > 0)
    .withColumn("processed_at", current_timestamp())
)

# 3. Write to the sink
query = (cleaned.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/checkpoint/bronze")
    .trigger(availableNow=True)
    .toTable("bronze.events")
)
```

| Type | Source (readStream) | Sink (writeStream) |
|---|---|---|
| Delta Lake | format("delta") | format("delta") / toTable() |
| Auto Loader | format("cloudFiles") | — |
| Kafka | format("kafka") | format("kafka") |
| Custom processing | — | foreachBatch() / foreach() |
| Debugging | format("rate") | format("console") |
outputMode controls which part of the result table is written to the sink. The valid modes depend on the type of query, so the exam often asks "can this mode be used with this query?"
| Mode | Behavior | Aggregation queries | Non-aggregation queries | Typical use case |
|---|---|---|---|---|
| append | Emit only newly appended rows | Watermark required | Supported | Log ingestion, Bronze/Silver ETL |
| complete | Overwrite the entire result table each trigger | Supported | Not supported | Full aggregations for dashboards |
| update | Emit only changed rows | Supported | Supported | Aggregations that need incremental updates |
The relationship between append mode and aggregation queries is the single most common exam question. To use append mode on an aggregation query, you must configure withWatermark(). The watermark is what tells Spark when a result is final and therefore safe to emit.
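Complete mode cannot be used with non-aggregation queries (plain filter or select). It rewrites the full result table on every trigger. Without an aggregation, that table would be the entire stream and would grow forever. Note also that the Delta Lake sink supports only append and complete. To apply update-style results to a Delta table, use foreachBatch with MERGE.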
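The difference between complete and update output can be modeled in a few lines of plain Python. In this hypothetical sketch (not the Spark API), a per-key count serves as the result table. After every trigger, the sketch records what each mode would hand to the sink. Append is left out because, without a watermark, no aggregate row ever becomes final.

```python
from collections import Counter


def run_triggers(micro_batches):
    """Hypothetical model: (complete_output, update_output) per trigger.

    complete: the whole result table, every trigger.
    update:   only the rows whose aggregate changed in this trigger.
    """
    result_table = Counter()
    outputs = []
    for batch in micro_batches:
        before = dict(result_table)
        result_table.update(batch)
        complete = dict(result_table)
        update = {k: v for k, v in complete.items() if before.get(k) != v}
        outputs.append((complete, update))
    return outputs


# Each micro-batch is a list of grouping keys (for example, the region of each event).
batches = [["us", "eu"], ["us"], ["apac"]]
for i, (complete, update) in enumerate(run_triggers(batches)):
    print(i, "complete:", complete, "update:", update)
```

The complete output grows with the number of keys on every trigger. That is why complete mode suits small dashboard aggregates. The update output stays proportional to what changed.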
The trigger decides when each micro-batch runs. It directly drives cost, latency, and operational pattern, so it shows up constantly in both real-world work and exams.
| Trigger | Behavior | Batch count | Cost profile | Use case |
|---|---|---|---|---|
| Unspecified (default) | Start the next batch as soon as the previous one finishes | Unbounded (always on) | Cluster always on (highest cost) | When the lowest possible latency is required |
| processingTime("10 seconds") | Run a batch at the specified interval | Unbounded (always on) | Cluster always on (with idle time) | Near-real-time processing |
| availableNow=True | Process all available data across multiple batches, then stop | Multiple batches → stop | Pay only while the job runs (low cost) | Scheduled incremental batches (most common) |
| once=True (deprecated) | Process all available data in a single batch, then stop | 1 batch → stop | Pay only while the job runs | Recommended to migrate to availableNow |
```python
# Most common in practice: availableNow, run on a schedule by Databricks Workflows
query = (df.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/checkpoint/bronze")
    .trigger(availableNow=True)
    .toTable("bronze.events")
)

# When near-real-time latency is needed: processingTime
# (an alternative to the query above; never run both on the same checkpoint)
query = (df.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/checkpoint/bronze")
    .trigger(processingTime="10 seconds")
    .toTable("bronze.events")
)
```

Use availableNow=True when Databricks Workflows starts the job every hour to process all newly arrived files and then stop. No always-on cluster is needed. That makes it the most cost-efficient option and the trigger Databricks recommends for scheduled incremental processing.
The critical difference from once=True is batch splitting. once crams all data into a single batch, which can cause OOM errors on large data sets. availableNow splits the work into multiple batches according to rate-limit options such as maxFilesPerTrigger. This keeps memory use bounded and makes runs far more stable.
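The batch-splitting difference can be sketched as a planning function. This is a simplified model. It assumes the only rate limit is a file count in the spirit of maxFilesPerTrigger, and it ignores the byte- and offset-based limits Spark also supports. `plan_batches`, its default of 1000, and the file names are hypothetical.

```python
def plan_batches(pending_files, trigger, max_files_per_trigger=1000):
    """Hypothetical planner for run-to-completion triggers.

    trigger="once":         everything in a single batch (the limit is ignored).
    trigger="availableNow": batches of at most max_files_per_trigger files.
    """
    if trigger == "once":
        return [list(pending_files)] if pending_files else []
    if trigger == "availableNow":
        return [
            list(pending_files[i:i + max_files_per_trigger])
            for i in range(0, len(pending_files), max_files_per_trigger)
        ]
    raise ValueError(f"unsupported trigger: {trigger}")


files = [f"part-{n:04d}.json" for n in range(2500)]
print([len(b) for b in plan_batches(files, "once")])          # [2500]
print([len(b) for b in plan_batches(files, "availableNow")])  # [1000, 1000, 500]
```

Both triggers process the same 2,500 files and then stop. The difference is that availableNow never holds more than one bounded batch in memory.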
A watermark sets how long event-time aggregations, such as windowed aggregations, wait for late data. Spark tracks the maximum event time seen so far and sets the watermark to that time minus the threshold. Data older than the watermark may be dropped. Spark guarantees only that data within the threshold is kept. State for windows that end before the watermark is finalized and evicted from memory.
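The watermark arithmetic is easy to check by hand. Below is a simplified pure-Python model, not Spark's implementation. As in Spark, it recomputes the watermark at the end of each micro-batch from the maximum event time seen, and applies it to the next batch. For simplicity, it treats every event strictly older than the current watermark as dropped. The event times are invented.

```python
from datetime import datetime, timedelta


def apply_watermark(micro_batches, delay):
    """Hypothetical model: yield (accepted_events, watermark_after_batch).

    Simplified: events strictly older than the current watermark are dropped,
    and the watermark only moves forward at the end of each batch.
    """
    max_event_time = None
    watermark = None  # no watermark until some data has been seen
    for batch in micro_batches:
        # The late-data check uses the watermark computed after the previous batch.
        accepted = [ts for ts in batch if watermark is None or ts >= watermark]
        if batch:
            batch_max = max(batch)
            if max_event_time is None or batch_max > max_event_time:
                max_event_time = batch_max
            watermark = max_event_time - delay
        yield accepted, watermark


def t(hh_mm):
    """Helper: a timestamp on an arbitrary sample day."""
    return datetime.strptime(f"2024-01-01 {hh_mm}", "%Y-%m-%d %H:%M")


batches = [
    [t("12:00"), t("12:07")],  # max 12:07 -> watermark 11:57
    [t("12:15"), t("11:58")],  # 11:58 is late but not below 11:57: kept; watermark -> 12:05
    [t("12:02"), t("12:16")],  # 12:02 is below 12:05: dropped; watermark -> 12:06
]
for accepted, wm in apply_watermark(batches, timedelta(minutes=10)):
    print([ts.strftime("%H:%M") for ts in accepted], "watermark:", wm.strftime("%H:%M"))
```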
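```python
from pyspark.sql.functions import window, count, sum as spark_sum

# Events more than 10 minutes behind the latest event time seen may be dropped
result = (stream_df
    .withWatermark("event_time", "10 minutes")
    .groupBy(
        window("event_time", "5 minutes"),
        "region"
    )
    # Alias the aggregates: without column mapping, Delta rejects
    # column names such as "sum(amount)" that contain parentheses
    .agg(
        spark_sum("amount").alias("total_amount"),
        count("*").alias("event_count")
    )
)

# Writing an aggregation in append mode requires the watermark above
query = (result.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/checkpoint/windowed_agg")
    .trigger(processingTime="30 seconds")
    .toTable("gold.regional_sales")
)
```

A larger threshold catches more late data. It also keeps more state in memory and, in append mode, delays when each window is emitted. Too small a threshold drops legitimate late data. In production, measure the actual lateness profile of your data before locking in a threshold.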
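Append mode holds each window back until the watermark passes the window's end. That is why it needs a watermark at all. Below is a simplified pure-Python model, not Spark code. It assumes tumbling 5-minute windows, a 10-minute delay, and end-of-batch watermark updates, and it drops events older than the watermark. It emits a window in the same step that the watermark passes the window's end. Real Spark applies the new watermark in the following micro-batch, which it normally runs even with no new data. All names are hypothetical.

```python
from collections import defaultdict
from datetime import datetime, timedelta


def window_start(ts, size):
    """Floor a timestamp to the start of its tumbling window."""
    epoch = datetime(1970, 1, 1)
    return ts - (ts - epoch) % size


def append_mode(micro_batches, size, delay):
    """Hypothetical model: yield the (window_start, count) rows emitted per batch."""
    state = defaultdict(int)  # open windows only: window start -> event count
    max_event_time = None
    watermark = None
    for batch in micro_batches:
        for ts in batch:
            # Simplified late-data rule: drop events older than the watermark.
            if watermark is None or ts >= watermark:
                state[window_start(ts, size)] += 1
        if batch:
            batch_max = max(batch)
            if max_event_time is None or batch_max > max_event_time:
                max_event_time = batch_max
            watermark = max_event_time - delay
        # A window is final (and its state can be evicted) once the watermark
        # has reached its end; only then does append mode emit it.
        final = sorted(
            w for w in state if watermark is not None and w + size <= watermark
        )
        yield [(w, state.pop(w)) for w in final]


def t(hh_mm):
    return datetime.strptime(f"2024-01-01 {hh_mm}", "%Y-%m-%d %H:%M")


batches = [
    [t("12:01"), t("12:03"), t("12:06")],  # watermark -> 11:56
    [t("12:12"), t("12:04")],              # watermark -> 12:02
    [t("12:18")],                          # watermark -> 12:08: [12:00, 12:05) closes
    [t("12:03"), t("12:26")],              # 12:03 dropped; watermark -> 12:16
]
five, ten = timedelta(minutes=5), timedelta(minutes=10)
for i, emitted in enumerate(append_mode(batches, five, ten), start=1):
    print(i, [(w.strftime("%H:%M"), n) for w, n in emitted])
```

The model emits nothing for the first two batches. The 12:00 window appears only after batch 3, with its late 12:04 event included. Without a watermark, no window would ever become final.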
A checkpoint durably persists the progress of a streaming query (processed offsets, intermediate aggregation state, metadata) to cloud storage. After a failure, the query restarts from the checkpoint and re-runs any batch that was not committed. Combined with a replayable source and an idempotent sink such as Delta Lake, this is how Structured Streaming delivers exactly-once processing.
```python
# Always set checkpointLocation on writeStream (required for sinks such as Delta)
query = (df.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "dbfs:/checkpoint/pipeline_bronze")
    .trigger(availableNow=True)
    .toTable("bronze.events")
)
```

| Rule | Reason |
|---|---|
| One checkpoint directory per query | Sharing across queries causes offset conflicts, leading to duplication or data loss |
| Store on cloud storage (S3/ADLS/GCS) | Local disk loses the checkpoint when a node fails |
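| Deleting the checkpoint resets progress | Without processed-offset metadata, the query restarts from its starting-position options: Delta and Auto Loader sources re-read everything, while Kafka defaults to startingOffsets = latest |
| Changes to stateful logic (aggregations, grouping keys, state schema) or to input sources require a new checkpoint | Restoring incompatible state fails when the query starts |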
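Restart recovery can be modeled in plain Python. The model uses a write-ahead offset log (which offsets each batch_id covers) and a commit log (which batch_ids finished), mirroring the `offsets` and `commits` records a checkpoint keeps. `Checkpoint`, `IdempotentSink`, and `run_query` are hypothetical stand-ins. The sink skips a batch_id it has already written, which resembles how the Delta sink records committed batches but is not its actual code. The sketch simulates a crash after batch 1 is written but before it is committed.

```python
class Checkpoint:
    """Hypothetical checkpoint: offset log and commit log."""

    def __init__(self):
        self.offsets = {}    # batch_id -> (start, end) planned before processing
        self.commits = set()  # batch_ids that finished


class IdempotentSink:
    """Hypothetical sink that ignores a batch_id it has already written."""

    def __init__(self):
        self.rows = []
        self.written_batches = set()

    def write(self, batch_id, rows):
        if batch_id in self.written_batches:
            return  # replay of an already-written batch: no duplicates
        self.rows.extend(rows)
        self.written_batches.add(batch_id)


def run_query(source, ckpt, sink, max_per_batch=2, crash_before_commit=None):
    batch_id = max(ckpt.commits, default=-1) + 1
    while True:
        if batch_id in ckpt.offsets:
            start, end = ckpt.offsets[batch_id]  # planned but uncommitted: replay it
        else:
            start = ckpt.offsets[batch_id - 1][1] if batch_id > 0 else 0
            end = min(start + max_per_batch, len(source))
            if start >= end:
                return  # nothing new to process
            ckpt.offsets[batch_id] = (start, end)  # write-ahead: record the plan first
        sink.write(batch_id, source[start:end])
        if batch_id == crash_before_commit:
            raise RuntimeError("simulated failure before the commit log entry")
        ckpt.commits.add(batch_id)
        batch_id += 1


source = ["a", "b", "c", "d", "e"]
ckpt, sink = Checkpoint(), IdempotentSink()
try:
    run_query(source, ckpt, sink, crash_before_commit=1)
except RuntimeError:
    pass  # the query died after writing batch 1 but before committing it
run_query(source, ckpt, sink)  # restart from the same checkpoint
print(sink.rows)             # ['a', 'b', 'c', 'd', 'e']
print(sorted(ckpt.commits))  # [0, 1, 2]
```

Batch 1 is replayed with exactly the same offsets. Because the sink recognizes its batch_id, no row is duplicated.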
Auto Loader (the cloudFiles format) is a file ingestion feature built on top of Structured Streaming. It auto-detects files arriving in cloud storage and incrementally loads them into Delta tables. To use it, simply set the readStream format to "cloudFiles".
```python
# Typical Auto Loader + Structured Streaming pattern
raw = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", "/checkpoint/schema")
    # On a new column the stream stops once, then resumes with the evolved schema after a restart
    .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
    .load("s3://bucket/landing/events/")
)

# Write to the Bronze table
query = (raw.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/checkpoint/bronze_events")
    .option("mergeSchema", "true")  # let the Delta table pick up new columns
    .trigger(availableNow=True)
    .toTable("bronze.raw_events")
)
```

The Auto Loader checkpoint records which files have already been processed. As long as the checkpoint is kept, the same file is not ingested twice. The inferred schema is saved at cloudFiles.schemaLocation and reused on restart.
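The file bookkeeping can be pictured as a set difference. This is a hypothetical pure-Python model, not Auto Loader's implementation. `FileTracker` stands in for the state Auto Loader keeps in its checkpoint, and `scheduled_run` for one availableNow run. The file paths are invented, and the model matches files by path only.

```python
class FileTracker:
    """Hypothetical model of Auto Loader's per-checkpoint file bookkeeping."""

    def __init__(self):
        self.processed = set()  # file paths already ingested

    def discover(self, listing):
        """Return files in the listing that have not been ingested yet."""
        return sorted(set(listing) - self.processed)

    def commit(self, files):
        self.processed.update(files)


def scheduled_run(tracker, listing):
    """One availableNow-style run: ingest every new file, then stop."""
    new_files = tracker.discover(listing)
    tracker.commit(new_files)
    return new_files


tracker = FileTracker()
print(scheduled_run(tracker, ["events/a.json", "events/b.json"]))
# ['events/a.json', 'events/b.json']
print(scheduled_run(tracker, ["events/a.json", "events/b.json", "events/c.json"]))
# ['events/c.json']
print(scheduled_run(tracker, ["events/a.json", "events/b.json", "events/c.json"]))
# []
# A fresh tracker (a deleted checkpoint) re-ingests everything:
print(scheduled_run(FileTracker(), ["events/a.json", "events/b.json", "events/c.json"]))
# ['events/a.json', 'events/b.json', 'events/c.json']
```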
foreachBatch lets you process each micro-batch result with a custom function. Use it for things that the standard writeStream cannot do: MERGE (upsert) into a Delta table, writing to multiple tables in one batch, or calling external APIs.
```python
from delta.tables import DeltaTable

def upsert_to_silver(batch_df, batch_id):
    """Run a MERGE (upsert) for each micro-batch."""
    # Use the batch's own session rather than relying on a global `spark`
    silver = DeltaTable.forName(batch_df.sparkSession, "silver.customers")
    # MERGE fails if several source rows match the same target row,
    # so deduplicate batch_df by customer_id first if that can happen
    (silver.alias("t")
        .merge(batch_df.alias("s"), "t.customer_id = s.customer_id")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )

# Run the MERGE through foreachBatch
query = (df.writeStream
    .foreachBatch(upsert_to_silver)
    .option("checkpointLocation", "/checkpoint/silver_customers")
    .trigger(availableNow=True)
    .start()
)
```

Inside foreachBatch you can use any normal DataFrame operation. For example, batch_df.createOrReplaceTempView() lets you run the MERGE through Spark SQL. Be aware that foreachBatch is at-least-once. If the query fails partway through a batch, that batch is re-executed with the same batch_id when the query restarts. The rule of thumb is to write idempotent logic, such as a MERGE that produces the same result when applied repeatedly.
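The idempotence rule can be checked with a plain-Python model that has no Spark dependency. A dict keyed by customer_id stands in for the Silver table. The hypothetical `merge_upsert` mimics whenMatchedUpdateAll/whenNotMatchedInsertAll, and the hypothetical `blind_append` stands in for a plain append. The data is invented. Replaying the same batch leaves the MERGE result unchanged but duplicates the appended rows. A replay is what happens when a failed batch is re-executed.

```python
def merge_upsert(target, batch_rows, key="customer_id"):
    """Hypothetical MERGE-like upsert: update matched keys, insert new ones."""
    for row in batch_rows:
        target[row[key]] = dict(row)  # whenMatchedUpdateAll / whenNotMatchedInsertAll
    return target


def blind_append(target_rows, batch_rows):
    """Hypothetical non-idempotent write: a replayed batch lands twice."""
    target_rows.extend(batch_rows)
    return target_rows


batch = [
    {"customer_id": 1, "tier": "gold"},
    {"customer_id": 2, "tier": "silver"},
]

silver = {1: {"customer_id": 1, "tier": "bronze"}}
merge_upsert(silver, batch)
merge_upsert(silver, batch)  # the same batch re-executed after a failure
print(len(silver), silver[1]["tier"])  # 2 gold

log = blind_append([], batch)
blind_append(log, batch)  # the same replay, without idempotence
print(len(log))  # 4
```

For writes that are not naturally idempotent, Delta Lake documents the txnAppId and txnVersion writer options for use inside foreachBatch. With them, a write for a batch_id that was already committed is skipped.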
Data Engineer Associate / Professional
Question 1
You need to aggregate IoT events in 5-minute windows and write the results to a Delta table. Events can be up to 3 minutes late. Which implementation correctly emits results in append mode?
Correct answer: A
Append mode on a windowed aggregation requires a watermark. Option B specifies append on an aggregation with no watermark, which throws an AnalysisException. Option C works but uses complete mode, so it fails the requirement of writing in append mode. Option D uses filter for time bounding, which is not a substitute for a watermark. The correct combination is 3-minute watermark + 5-minute window + append.
Data Engineer Associate
Question 2
A team uses Auto Loader to ingest data from cloud storage into a Bronze Delta table. They want Databricks Workflows to run the job hourly, process all arrived files, then stop. Which trigger setting is the most appropriate?
Correct answer: B
availableNow=True splits available data into multiple micro-batches, processes them all, and stops — perfect for use with a job scheduler. processingTime (A) leaves the cluster running continuously, which is cost-inefficient. once=True (C) crams all data into a single batch and risks OOM on large data sets; it is deprecated. The default (D) also runs continuously and never stops.
Data Engineer Professional
Question 3
In a streaming pipeline you want to upsert changes from a source into a Silver Delta table keyed by customer_id. Which option fills in the blank correctly?

```python
def process(batch_df, batch_id):
    silver = DeltaTable.forName(spark, 'silver.customers')
    (silver.alias('t')
        .merge(batch_df.alias('s'), 't.customer_id = s.customer_id')
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute())

query = df.writeStream._____
```

Correct answer: A
Upsert via MERGE requires foreachBatch. Run DeltaTable.merge inside foreachBatch, supply the checkpoint, and launch with start(). Option B uses format('delta') in writeStream, which supports only the append and complete output modes; MERGE cannot run there. Option C combines foreachBatch with outputMode, but outputMode is unnecessary (and errors out) when foreachBatch is used. Option D is also format('delta') append, unrelated to MERGE.
What is the difference between Structured Streaming and Spark Streaming (DStreams)?
Spark Streaming (DStreams) is the legacy RDD-based API that only supported processing-time micro-batches. Structured Streaming is built on the DataFrame/Dataset API and provides event-time windows, watermarks, exactly-once semantics, and multiple output modes. DStreams entered maintenance mode in 2023, so Structured Streaming is the only recommended option for new development. DStreams will never be the correct answer on the exam.
What is the difference between trigger(once=True) and trigger(availableNow=True)?
once processes all available data in a single micro-batch and then stops. If the data volume is large, it can blow past memory limits or time out because it cannot fit in one batch. availableNow splits the available data into multiple micro-batches, processes them, and stops once everything is done. Batch size is controlled by options like maxFilesPerTrigger, which is far more memory-efficient. Databricks has deprecated once=True and recommends migrating to availableNow=True.
What happens when you specify append mode on an aggregation query without a watermark?
An AnalysisException is thrown and the query will not start. Append mode requires emitting only finalized rows, but without a watermark Spark cannot determine when a result is final. There are three fixes: (1) add withWatermark and keep append mode, (2) switch to complete mode and emit the entire result table each trigger, or (3) switch to update mode and emit only changed rows. This constraint comes up frequently on the exam.
NicheeLab Editorial Team
NicheeLab editorial team focused on data engineering and cloud certification learning. Content is structured around practical study needs and official exam domains.