Auto Loader is a Databricks-native ingestion feature that automatically detects files arriving in cloud storage (S3, ADLS, GCS) and ingests them incrementally into Delta Lake tables. Under the hood it is built on top of Structured Streaming and runs as a data source format called cloudFiles. Because it tracks processed files via checkpoints, the same file is never ingested twice, and it scales efficiently to storage with millions of files.
Auto Loader makes up 10-15% of the Data Engineer Associate exam, making it one of the most frequently tested topics. This article walks through the basic syntax, file detection modes, schema inference and evolution, the comparison with COPY INTO, and checkpoint design, all with code examples.
Auto Loader is started with spark.readStream.format("cloudFiles"). The key point is that you use the Structured Streaming readStream API rather than the batch spark.read API. cloudFiles.format (the file format) is always required. cloudFiles.schemaLocation (where the inferred schema is stored) is required whenever Auto Loader infers the schema, that is, whenever you do not pass one explicitly with .schema().
```python
# Auto Loader basic syntax (PySpark)
# `spark` is the SparkSession that Databricks notebooks predefine.
df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", "/checkpoint/schema/events")
    .load("s3://my-bucket/landing/events/")
)

# Write to a Delta Table
query = (df.writeStream
    .format("delta")
    .option("checkpointLocation", "/checkpoint/bronze/events")
    .option("mergeSchema", "true")
    .trigger(availableNow=True)
    .toTable("bronze.raw_events")
)
```

The DataFrame returned by readStream supports the same transformations as a regular DataFrame (filter, withColumn, select, and so on). When writing to a Delta Table with writeStream, the checkpointLocation option is required. The checkpoint records which files have already been processed, so when the job restarts it resumes exactly where it left off.
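The bookkeeping behind that restart behavior can be modeled in a few lines of plain Python. This is a toy sketch, not Auto Loader's implementation: a Python set stands in for the state Auto Loader keeps under checkpointLocation, and the function name `run_incremental_load` is hypothetical.

```python
from typing import Iterable, List, Set


def run_incremental_load(listed_files: Iterable[str], checkpoint: Set[str]) -> List[str]:
    """Return the files not yet recorded in `checkpoint`, then record them.

    `checkpoint` is a hypothetical stand-in for the state kept under
    checkpointLocation: here it is just a set of already-processed paths.
    """
    new_files = sorted(set(listed_files) - checkpoint)
    # A real pipeline would write these files to Delta before recording them.
    checkpoint.update(new_files)
    return new_files


if __name__ == "__main__":
    state: Set[str] = set()
    print(run_incremental_load(["a.json", "b.json"], state))            # ['a.json', 'b.json']
    print(run_incremental_load(["a.json", "b.json", "c.json"], state))  # ['c.json'] after a "restart"
```

Running it twice with an overlapping file list returns only the file that was not seen before, which is the property the checkpoint gives the real stream.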
| Option | Description | Default |
|---|---|---|
| cloudFiles.format | File format to read (json, csv, parquet, avro, orc, text, binaryFile) | None (required) |
| cloudFiles.schemaLocation | Directory where the inferred schema is stored | None (required when inferring) |
| cloudFiles.useNotifications | Enable file notification mode | false |
| cloudFiles.schemaEvolutionMode | Behavior on schema change (addNewColumns / failOnNewColumns / rescue / none) | addNewColumns when no schema is provided; otherwise none |
| cloudFiles.schemaHints | Override types in the inferred schema | None |
| cloudFiles.maxFilesPerTrigger | Maximum number of files per micro-batch | 1000 |
Auto Loader has two modes for detecting new files. The default is directory listing: on every stream trigger it lists the cloud storage directory and compares the result against the files recorded in the checkpoint to identify new ones. In file notification mode, it subscribes to the cloud provider's event notification service and receives file arrival events through a queue.
| Aspect | Directory Listing | File Notification |
|---|---|---|
| Configuration | Default (no extra config required) | cloudFiles.useNotifications = true |
| Detection mechanism | Detect new files by listing the directory | Receive events via S3 events→SNS→SQS / Event Grid→Queue Storage / Pub/Sub |
| Detection latency | Depends on the trigger interval plus listing time, which grows with directory size | Depends on the trigger interval; new files are read from the queue without a listing |
| Scalability | List cost grows with total file count | Independent of total file count (scales with event volume) |
| Cloud resources | None | Auto-creates SNS+SQS / Event Grid+Queue Storage / Pub/Sub (IAM permissions required) |
| Cost | Proportional to the number of list API calls | Cost of running the notification resources (very small) |
| Recommended for | Dev / test environments with low file counts | Large-scale production (millions of files or more) |
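To see why listing cost grows with the directory while notification cost does not, here is a toy model in plain Python. The function names are hypothetical, the "paths examined" counter is only a proxy for list API calls, and the deque stands in for the cloud queue (SQS, Queue Storage, or Pub/Sub); none of this is Auto Loader's actual code.

```python
from collections import deque
from typing import Deque, List, Set, Tuple


def detect_by_listing(all_files: List[str], seen: Set[str]) -> Tuple[List[str], int]:
    """Directory-listing model: list every path, diff against `seen`.

    Returns (new files, paths examined). The work grows with the total
    number of files in the directory, not just with the new ones.
    """
    new = [f for f in all_files if f not in seen]
    seen.update(new)
    return new, len(all_files)


def detect_by_notification(queue: Deque[str], seen: Set[str]) -> Tuple[List[str], int]:
    """File-notification model: drain arrival events from a queue.

    Returns (new files, events examined). The work grows with the number of
    queued events only. Duplicate events are ignored via `seen`, because
    queues such as SQS standard queues deliver at least once.
    """
    new: List[str] = []
    examined = 0
    while queue:
        path = queue.popleft()
        examined += 1
        if path not in seen:
            seen.add(path)
            new.append(path)
    return new, examined


if __name__ == "__main__":
    old = [f"old/{i}.json" for i in range(1000)]
    print(detect_by_listing(old + ["n1.json", "n2.json"], set(old))[1])  # 1002
    events = deque(["n1.json", "n2.json", "n2.json"])
    print(detect_by_notification(events, set(old)))  # (['n1.json', 'n2.json'], 3)
```

With 1,000 old files and 2 new arrivals, the listing model examines 1,002 paths while the notification model examines only the three queued events (one of them a duplicate).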
```python
# Directory listing (default)
df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", "/checkpoint/schema")
    .load("s3://bucket/landing/")
)

# Switch to file notification mode
df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", "/checkpoint/schema")
    .option("cloudFiles.useNotifications", "true")
    .load("s3://bucket/landing/")
)
```

Switching from directory listing to file notification mode takes a single extra option line. The existing checkpoint carries over, so no files are reprocessed during the switch. However, in file notification mode Auto Loader automatically provisions cloud resources (on AWS, an SNS topic and an SQS queue), so the cluster or service principal must have the appropriate IAM permissions.
Auto Loader performs automatic schema inference for JSON and CSV files. On the first run it samples files to determine the schema and persists it under _schemas/ inside the directory specified by cloudFiles.schemaLocation. Subsequent runs use the stored schema, so there is no per-run inference cost.
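By default, Auto Loader infers every JSON and CSV column as STRING; with cloudFiles.inferColumnTypes set to true it infers types such as Long or Double instead. Neither result may match the types you need for business reasons, such as Timestamp or Decimal. cloudFiles.schemaHints lets you override the inferred type for specific columns.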
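The override step itself is easy to model. The plain-Python sketch below starts from an inferred schema in which every JSON column is STRING (Auto Loader's default when cloudFiles.inferColumnTypes is not enabled) and replaces the types of the columns named in a hints string. The helper names are hypothetical and this is not how Auto Loader parses hints internally; the one detail worth copying is that the split must ignore commas inside DECIMAL(10,2) or STRUCT<...>.

```python
from typing import Dict, List


def split_hints(hints: str) -> List[str]:
    """Split 'col TYPE, col TYPE' on top-level commas only.

    Commas inside (...) or <...> belong to the type, e.g. DECIMAL(10,2).
    """
    parts: List[str] = []
    current: List[str] = []
    depth = 0
    for ch in hints:
        if ch in "(<":
            depth += 1
        elif ch in ")>":
            depth -= 1
        if ch == "," and depth == 0:
            parts.append("".join(current).strip())
            current = []
        else:
            current.append(ch)
    tail = "".join(current).strip()
    if tail:
        parts.append(tail)
    return parts


def apply_schema_hints(inferred: Dict[str, str], hints: str) -> Dict[str, str]:
    """Return a copy of `inferred` with hinted columns set to the hinted type."""
    result = dict(inferred)
    for part in split_hints(hints):
        name, type_ = part.split(None, 1)
        result[name] = type_.strip()
    return result


if __name__ == "__main__":
    inferred = {"event_time": "STRING", "amount": "STRING",
                "user_id": "STRING", "event_type": "STRING"}
    hints = "event_time TIMESTAMP, amount DECIMAL(10,2), user_id STRING"
    print(apply_schema_hints(inferred, hints))
    # {'event_time': 'TIMESTAMP', 'amount': 'DECIMAL(10,2)', 'user_id': 'STRING', 'event_type': 'STRING'}
```

Columns without a hint (event_type here) keep their inferred type.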
```python
# Override inferred types with schemaHints
df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", "/checkpoint/schema")
    .option("cloudFiles.schemaHints",
            "event_time TIMESTAMP, amount DECIMAL(10,2), user_id STRING")
    .load("s3://bucket/landing/events/")
)
```

When the data source gains new columns, Auto Loader's schema evolution feature handles it. You control the behavior with cloudFiles.schemaEvolutionMode.
| Mode | Behavior when a new column is detected | Use case |
|---|---|---|
| addNewColumns (default when no schema is provided) | Stops the stream; on the next start it resumes with the new column added | When you want to ingest new columns automatically |
| failOnNewColumns | Stops the stream with an error | When you want to detect schema changes and alert on them |
| rescue | Routes new column data to _rescued_data (stream continues) | When you want to record schema mismatches without stopping the stream |
| none | Ignores new columns (stream continues) | When ingesting only known columns is sufficient |
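The four modes can be compared side by side with a small plain-Python model that handles only the new-column case. The function `apply_mode` and its status strings are hypothetical; it returns what happens to one record and what the stored schema looks like afterwards, and it models _rescued_data only for rescue mode.

```python
import json
from typing import Any, Dict, List, Optional, Tuple

MODES = ("addNewColumns", "failOnNewColumns", "rescue", "none")


def apply_mode(
    record: Dict[str, Any], schema: List[str], mode: str
) -> Tuple[str, Optional[Dict[str, Any]], List[str]]:
    """Model one record under a schemaEvolutionMode (new-column case only).

    Returns (status, output row or None, stored schema afterwards).
    """
    if mode not in MODES:
        raise ValueError(f"unknown mode: {mode}")
    new_cols = [c for c in record if c not in schema]
    row = {c: record.get(c) for c in schema}
    if new_cols and mode == "addNewColumns":
        # Stream stops, but the stored schema is widened for the next start.
        return "stopped", None, schema + new_cols
    if new_cols and mode == "failOnNewColumns":
        # Stream fails and the schema stays as it was.
        return "failed", None, list(schema)
    if mode == "rescue":
        # Schema never changes; unknown columns go to _rescued_data as JSON.
        extra = {c: record[c] for c in new_cols}
        row["_rescued_data"] = json.dumps(extra, separators=(",", ":")) if extra else None
    # Remaining cases (no new columns, or mode "none"): unknown columns are dropped.
    return "ok", row, list(schema)


if __name__ == "__main__":
    schema = ["user_id", "amount"]
    rec = {"user_id": "U1", "amount": "5", "coupon": "X"}
    for m in MODES:
        print(m, apply_mode(rec, schema, m))
```

In the model, addNewColumns returns the widened schema together with a stopped status, mirroring how the real stream stops, records the new schema, and picks the column up on the next start.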
When using addNewColumns mode, also specify .option("mergeSchema", "true") on the writeStream so that new columns are added to the Delta Table schema automatically. Without it, the write fails with a schema mismatch error.
```python
# Combining addNewColumns with mergeSchema
df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", "/checkpoint/schema")
    .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
    .load("s3://bucket/landing/")
)

query = (df.writeStream
    .option("checkpointLocation", "/checkpoint/bronze")
    .option("mergeSchema", "true")  # lets new columns extend the Delta Table schema
    .trigger(availableNow=True)
    .toTable("bronze.raw_events")
)
```

When Auto Loader infers the schema, it adds a _rescued_data column automatically. When data arrives that does not match the schema (type mismatches, unknown columns, malformed records), it is not discarded; instead, it is stored as a JSON string in the _rescued_data column. Fields that parsed correctly land in the regular columns and the rest is captured in _rescued_data, so you avoid data loss and can investigate quality issues after the fact.
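Per record, rescuing means: convert each known field, null out any field that fails conversion while keeping its raw value, and collect unknown fields too. The plain-Python sketch below does that for a hypothetical three-column schema and reproduces the second row of the example output in this section. It is a model, not Auto Loader's parser; the real rescued JSON can also carry metadata such as the source file path, which is omitted here.

```python
import json
from typing import Any, Callable, Dict

# Hypothetical target schema: column name -> converter that raises on bad input.
SCHEMA: Dict[str, Callable[[Any], Any]] = {
    "user_id": str,
    "event_type": str,
    "amount": float,
}


def parse_with_rescue(record: Dict[str, Any]) -> Dict[str, Any]:
    """Convert known fields; route mismatches and unknown fields to _rescued_data."""
    row: Dict[str, Any] = {}
    rescued: Dict[str, Any] = {}
    for name, convert in SCHEMA.items():
        value = record.get(name)
        if value is None:
            row[name] = None
            continue
        try:
            row[name] = convert(value)
        except (TypeError, ValueError):
            row[name] = None        # typed column becomes null...
            rescued[name] = value   # ...and the raw value is kept, not lost
    for name, value in record.items():
        if name not in SCHEMA:
            rescued[name] = value   # unknown field
    row["_rescued_data"] = json.dumps(rescued, separators=(",", ":")) if rescued else None
    return row


if __name__ == "__main__":
    print(parse_with_rescue({"user_id": "U002", "event_type": "purchase",
                             "amount": "not_a_number", "extra_field": "abc"}))
```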
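```python
# Inspect _rescued_data. A streaming DataFrame such as `df` above cannot call
# .show() (Spark raises an AnalysisException), so read the Bronze table as a batch.
bronze_df = spark.read.table("bronze.raw_events")
bronze_df.select("user_id", "event_type", "amount", "_rescued_data").show(truncate=False)
```

Example output: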
| user_id | event_type | amount | _rescued_data |
|---|---|---|---|
| U001 | purchase | 29.99 | null |
| U002 | purchase | null | {"amount":"not_a_number","extra_field":"abc"} |
```python
# Monitor the number of schema-mismatched records (batch read of the Bronze table;
# .count() is not allowed on a streaming DataFrame)
bronze_df = spark.read.table("bronze.raw_events")
bad_count = bronze_df.filter("_rescued_data IS NOT NULL").count()
print(f"Schema-mismatched record count: {bad_count}")
```

In the Bronze layer of the Medallion Architecture, a common practice is to store _rescued_data alongside the data, then route records where _rescued_data IS NOT NULL into a quarantine table when transforming to Silver. You can rename the column via the rescuedDataColumn option, or disable it by setting it to false.
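The Bronze-to-Silver split described above is two complementary filters. In PySpark these would be `filter("_rescued_data IS NULL")` and `filter("_rescued_data IS NOT NULL")`; the plain-Python sketch below shows the same routing on a list of rows. The helper name `split_for_silver` is hypothetical, and dropping _rescued_data from the clean rows is a design choice of this sketch rather than a requirement.

```python
from typing import Any, Dict, List, Tuple

Row = Dict[str, Any]


def split_for_silver(bronze_rows: List[Row]) -> Tuple[List[Row], List[Row]]:
    """Return (clean rows for Silver, rows for the quarantine table)."""
    clean: List[Row] = []
    quarantine: List[Row] = []
    for row in bronze_rows:
        if row.get("_rescued_data") is None:
            # Clean rows no longer need the (empty) rescue column.
            clean.append({k: v for k, v in row.items() if k != "_rescued_data"})
        else:
            # Keep the rescued JSON so the quarantine table can be investigated.
            quarantine.append(row)
    return clean, quarantine
```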
Auto Loader and COPY INTO both ingest files from cloud storage into Delta Tables, but they have different design philosophies and target scenarios. The DEA exam frequently asks which one to choose for a given scenario.
| Dimension | Auto Loader (cloudFiles) | COPY INTO |
|---|---|---|
| Incremental processing | Tracks state automatically via checkpoints; processed files are never re-read | Tracked via table metadata; tracking cost grows with file count |
| Scalability | Handles millions of files via file notification mode | Performance degrades past a few thousand files |
| Streaming support | Built on Structured Streaming; supports both continuous execution and availableNow | SQL batch command only; no streaming execution |
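| Schema inference / evolution | Full support for inference, evolution, schemaHints, and rescuedDataColumn | Basic: infers and merges the schema through the mergeSchema format and copy options (the target can start as an empty placeholder table); no schemaHints or schemaEvolutionMode |
| Setup simplicity | Written with the PySpark or Scala streaming API (or SQL inside Delta Live Tables) | One SQL statement (COPY INTO target FROM source) |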
| Exactly-once guarantee | Guaranteed via checkpoints plus Delta Table transactions | Deduplicates via table metadata (idempotent) |
| Recommended for | Continuous file arrival, large-scale data, sources with evolving schemas | Small ad-hoc loads, SQL-only environments, sources with a fixed schema |
```sql
-- COPY INTO basic syntax (SQL)
-- The target must exist; with mergeSchema, an empty placeholder table is enough.
CREATE TABLE IF NOT EXISTS bronze.raw_events;

COPY INTO bronze.raw_events
FROM 's3://bucket/landing/events/'
FILEFORMAT = JSON
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');

-- Compare: with Auto Loader (PySpark)
-- spark.readStream.format("cloudFiles")... -> writeStream.toTable("bronze.raw_events")
```

The decision criteria are simple. Ask: Do files arrive continuously? Will the file count exceed a few thousand? Could the schema change? If any answer is yes, Auto Loader is the right choice; if all are no, COPY INTO is appropriate. When in doubt, pick Auto Loader: it will continue to fit even if the workload grows later.
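Those three questions translate directly into a small decision helper. The function below is a hypothetical sketch of this article's rule of thumb, not an official Databricks rule; it returns the choice along with the reasons that triggered it.

```python
from typing import List, Tuple


def choose_ingestion(
    continuous_arrival: bool,
    over_few_thousand_files: bool,
    schema_may_change: bool,
) -> Tuple[str, List[str]]:
    """Apply the three-question rule of thumb; return (choice, reasons)."""
    reasons: List[str] = []
    if continuous_arrival:
        reasons.append("files arrive continuously")
    if over_few_thousand_files:
        reasons.append("file count will exceed a few thousand")
    if schema_may_change:
        reasons.append("schema may change")
    # Any "yes" points to Auto Loader; all "no" means COPY INTO is enough.
    return ("Auto Loader" if reasons else "COPY INTO"), reasons


if __name__ == "__main__":
    print(choose_ingestion(False, False, False))  # ('COPY INTO', [])
    print(choose_ingestion(True, True, True)[0])  # Auto Loader
```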
| Format | cloudFiles.format value | Schema inference | Key additional options |
|---|---|---|---|
| JSON | json | Supported | multiLine, primitivesAsString |
| CSV | csv | Supported | header, delimiter, quote, escape |
| Parquet | parquet | Uses the file's embedded schema | mergeSchema |
| Avro | avro | Uses the file's embedded schema | avroSchema (external schema) |
| ORC | orc | Uses the file's embedded schema | mergeSchema |
| Text | text | Fixed (value: STRING) | wholetext |
| Binary | binaryFile | Fixed (path, modificationTime, length, content) | pathGlobFilter |
Parquet, Avro, and ORC carry their schema inside the file, so Auto Loader does not infer column types from the values; it reads the embedded schemas from a sample of files and merges them. When files with differing schemas are mixed, schema evolution still applies via cloudFiles.schemaEvolutionMode.
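Merging embedded schemas can be pictured as a union of columns across files. The sketch below is a simplified model with hypothetical names: it unions column names and fails on any type mismatch, whereas real Spark and Auto Loader merging has its own type-compatibility rules that are not modeled here.

```python
from typing import Dict, List


class SchemaConflict(Exception):
    """Raised when two files disagree on a column's type (hypothetical)."""


def merge_file_schemas(schemas: List[Dict[str, str]]) -> Dict[str, str]:
    """Union column -> type mappings from several files' embedded schemas."""
    merged: Dict[str, str] = {}
    for schema in schemas:
        for col, type_ in schema.items():
            if col not in merged:
                merged[col] = type_  # new column: add it
            elif merged[col] != type_:
                raise SchemaConflict(f"{col}: {merged[col]} vs {type_}")
    return merged


if __name__ == "__main__":
    day1 = {"id": "BIGINT", "name": "STRING"}
    day2 = {"id": "BIGINT", "name": "STRING", "country": "STRING"}
    print(merge_file_schemas([day1, day2]))
    # {'id': 'BIGINT', 'name': 'STRING', 'country': 'STRING'}
```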
Auto Loader uses Structured Streaming's checkpoint mechanism to persist the list of processed files and stream offset information to cloud storage. When a job fails it resumes from the checkpoint, so every file is processed exactly once (the exactly-once guarantee). This guarantee is realized in combination with the Delta Lake transaction log.
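The two halves of that guarantee can be modeled in plain Python. In this toy sketch (all class and function names hypothetical), the checkpoint records the files planned for each batch before writing and marks the batch committed afterwards, and the table remembers the latest batch id it committed, much as the Delta streaming sink records the query's batch id in the transaction log. A crash between the table write and the checkpoint commit then causes a harmless replay instead of duplicate rows.

```python
from typing import Dict, List, Set


class Checkpoint:
    """Toy checkpoint: batches planned (write-ahead) and batches committed."""

    def __init__(self) -> None:
        self.offsets: Dict[int, List[str]] = {}  # batch id -> files planned
        self.commits: Set[int] = set()           # batch ids fully finished


class DeltaTable:
    """Toy table that remembers the latest batch id it committed."""

    def __init__(self) -> None:
        self.rows: List[str] = []
        self.last_batch_id = -1

    def write_batch(self, batch_id: int, files: List[str]) -> None:
        if batch_id <= self.last_batch_id:
            return  # replayed batch is already in the table: skip it
        self.rows.extend(files)
        self.last_batch_id = batch_id


def run_batch(cp: Checkpoint, table: DeltaTable, batch_id: int,
              new_files: List[str], crash_before_commit: bool = False) -> None:
    # 1. Record the plan first, so a restart replays exactly the same files.
    cp.offsets.setdefault(batch_id, list(new_files))
    # 2. Write to the table (idempotent per batch id).
    table.write_batch(batch_id, cp.offsets[batch_id])
    if crash_before_commit:
        raise RuntimeError("simulated crash after the table write")
    # 3. Mark the batch committed in the checkpoint.
    cp.commits.add(batch_id)


def restart(cp: Checkpoint, table: DeltaTable) -> None:
    """Replay every planned batch that never reached the commit step."""
    for batch_id in sorted(cp.offsets):
        if batch_id not in cp.commits:
            run_batch(cp, table, batch_id, [])


if __name__ == "__main__":
    cp, table = Checkpoint(), DeltaTable()
    run_batch(cp, table, 0, ["a.json"])
    try:
        run_batch(cp, table, 1, ["b.json"], crash_before_commit=True)
    except RuntimeError:
        pass
    restart(cp, table)
    print(table.rows)  # ['a.json', 'b.json']: no duplicate after the replay
```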
```python
# Example checkpoint layout
df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation",
            "dbfs:/checkpoint/autoloader/events/schema")
    .load("s3://bucket/landing/events/")
)

query = (df.writeStream
    .option("checkpointLocation",
            "dbfs:/checkpoint/autoloader/events/stream")
    .trigger(availableNow=True)
    .toTable("bronze.raw_events")
)
```

Because Auto Loader is built on Structured Streaming, you can choose the trigger mode. In practice the most common pattern is to run trigger(availableNow=True) on a schedule via Databricks Workflows. It processes all already-arrived files across multiple micro-batches and then stops the job, eliminating the need for an always-on cluster and making it very cost-efficient.
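What availableNow does with the backlog can be sketched as: take a snapshot of the files present when the query starts, cut it into micro-batches capped by cloudFiles.maxFilesPerTrigger, process them in order, and stop. The plain-Python function below is a hypothetical model of that batching only; files that arrive after the snapshot wait for the next scheduled run.

```python
from typing import List


def available_now(pending: List[str], max_files_per_trigger: int = 1000) -> List[List[str]]:
    """Cut the files present at start into micro-batches; the run then stops."""
    if max_files_per_trigger < 1:
        raise ValueError("max_files_per_trigger must be at least 1")
    snapshot = list(pending)  # later arrivals are not part of this run
    return [
        snapshot[i:i + max_files_per_trigger]
        for i in range(0, len(snapshot), max_files_per_trigger)
    ]


if __name__ == "__main__":
    files = [f"f{i}.json" for i in range(5)]
    print(available_now(files, max_files_per_trigger=2))
    # [['f0.json', 'f1.json'], ['f2.json', 'f3.json'], ['f4.json']]
```

Setting the cap to the full backlog collapses everything into one batch, which is roughly what the deprecated once=True did and why it risked running out of memory on large backlogs.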
```python
# `df` is the Auto Loader stream defined earlier. The two queries below are
# alternatives: run one of them, not both against the same checkpoint.

# availableNow: process all arrived files, then stop (recommended)
query = (df.writeStream
    .option("checkpointLocation", "/checkpoint/bronze")
    .trigger(availableNow=True)
    .toTable("bronze.events")
)

# processingTime: continuous processing on a fixed interval (when real-time is required)
query = (df.writeStream
    .option("checkpointLocation", "/checkpoint/bronze")
    .trigger(processingTime="30 seconds")
    .toTable("bronze.events")
)

# once=True: deprecated (loads all data into a single batch with OOM risk)
# -> Databricks recommends migrating to availableNow=True
```

In the Medallion Architecture (Bronze / Silver / Gold three-layer model), Auto Loader is used for ingestion into the Bronze layer. It handles Landing Zone (raw files) → Bronze Table (raw data stored in Delta format, including _rescued_data), while transformations from Silver onward are done with Delta Live Tables or MERGE via foreachBatch.
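The MERGE-in-foreachBatch step for Silver is an upsert per micro-batch. The plain-Python sketch below models its semantics with a dict keyed by a hypothetical `user_id` column: it first keeps only the newest row per key within the batch (Delta's MERGE fails when several source rows match the same target row), then updates matched keys and inserts new ones. The column names and the "newest by event_time" rule are assumptions for illustration.

```python
from typing import Any, Dict, List

Row = Dict[str, Any]


def merge_batch(target: Dict[Any, Row], batch: List[Row],
                key: str = "user_id", order_by: str = "event_time") -> Dict[Any, Row]:
    """Upsert one micro-batch into `target` (keyed by `key`), in place."""
    # Deduplicate first: keep only the newest row per key within the batch.
    latest: Dict[Any, Row] = {}
    for row in batch:
        k = row[key]
        if k not in latest or row[order_by] > latest[k][order_by]:
            latest[k] = row
    # WHEN MATCHED THEN UPDATE / WHEN NOT MATCHED THEN INSERT
    for k, row in latest.items():
        target[k] = dict(row)
    return target


if __name__ == "__main__":
    silver = {"U1": {"user_id": "U1", "event_time": 1, "tier": "free"}}
    batch = [
        {"user_id": "U1", "event_time": 3, "tier": "pro"},
        {"user_id": "U1", "event_time": 2, "tier": "basic"},
        {"user_id": "U2", "event_time": 1, "tier": "free"},
    ]
    print(merge_batch(silver, batch))
```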
Auto Loader makes up 10-15% of the DEA exam and is one of the most frequently tested topics. The DEP exam also tests it as a Bronze-layer ingestion pattern. Make sure the points covered in this article are clear.
Data Engineer Associate
Question 1
A data engineer is building an ingestion pipeline into a Bronze Delta Table for an S3 bucket that receives thousands of JSON files every hour. The file count is expected to reach millions, and the source-side JSON schema may gain new fields. Which is the most appropriate ingestion approach?
Correct answer: A
When millions of files arrive continuously, Auto Loader in file notification mode is the right choice. It learns about new files from event notifications instead of listing the bucket, so it scales regardless of total file count. Because the JSON schema may gain new fields, schemaEvolutionMode=addNewColumns adds the new columns to the stream's schema and mergeSchema=true on the writeStream extends the Delta Table schema; that combination is the correct answer. COPY INTO (B) offers only basic schema inference and evolution through its mergeSchema options and is not designed for millions of files. Directory listing + none (C) drives up list API costs as the file count grows and silently ignores new columns. Manual management (D) has no file tracking and is unworkable at scale.
What is the difference between Auto Loader and COPY INTO?
Auto Loader (cloudFiles) is built on Structured Streaming and uses checkpoints to track file processing state, ingesting only new files incrementally. In file notification mode it learns about new files from S3 event notifications (via SNS and SQS), Azure Event Grid, or GCP Pub/Sub instead of listing directories, and scales to millions of files. COPY INTO, in contrast, is a SQL batch command that records processed files in table metadata. Its schema inference and evolution are limited to the mergeSchema options, with no schemaHints or schemaEvolutionMode controls. Databricks recommends Auto Loader for continuous or large-scale ingestion and COPY INTO for small ad-hoc loads.
Which file formats does Auto Loader support?
Seven formats are covered here: JSON, CSV, Parquet, Avro, ORC, Text, and binary files (binaryFile); current Databricks documentation also lists XML. You specify the format with the cloudFiles.format option (e.g. .option('cloudFiles.format', 'json')), and format-specific parser options are also available. Standard Spark reader options apply directly: multiLine for JSON; header, delimiter, and quote for CSV; mergeSchema for Parquet/Avro/ORC. Binary file mode lets you ingest unstructured data such as images and PDFs as binary columns.
How do I switch from directory listing mode to file notification mode?
Just set the cloudFiles.useNotifications option to true. The existing checkpoint is carried over, so no data is reprocessed. In file notification mode, Auto Loader automatically provisions the cloud provider's event notification resources (AWS: SNS topic + SQS queue + S3 event notifications; Azure: Event Grid + Queue Storage; GCP: Pub/Sub). You must grant Auto Loader the appropriate permissions (on AWS these include s3:GetBucketNotification, s3:PutBucketNotification, sns:CreateTopic, sqs:CreateQueue, and so on). Auto Loader does not remove these resources when you delete the stream; Databricks provides a Scala resource-manager API for listing and tearing down the notification resources it created.
NicheeLab Editorial Team
NicheeLab editorial team focused on data engineering and cloud certification learning. Content is structured around practical study needs and official exam domains.