Databricks Auto Loader: Complete Guide to Automated Data Ingestion

2026-03-21
Updated: 2026-03-27
NicheeLab Editorial Team

Auto Loader is a Databricks-native ingestion feature that automatically detects files arriving in cloud storage (S3, ADLS, GCS) and ingests them incrementally into Delta Lake tables. Under the hood it is built on top of Structured Streaming and runs as a data source format called cloudFiles. Because it tracks processed files via checkpoints, the same file is never ingested twice, and it scales efficiently to storage with millions of files.

Auto Loader makes up 10-15% of the Data Engineer Associate exam, making it one of the most frequently tested topics. This article walks through the basic syntax, file detection modes, schema inference and evolution, the comparison with COPY INTO, and checkpoint design, all with code examples.

Auto Loader Basic Syntax (cloudFiles)

Auto Loader is started with spark.readStream.format("cloudFiles"). The key point is that you use the Structured Streaming readStream API rather than the batch spark.read API. cloudFiles.format (the file format) is always required, and cloudFiles.schemaLocation (where the inferred schema is stored) is required whenever Auto Loader infers the schema instead of being given one.

# Auto Loader basic syntax (PySpark)
df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", "/checkpoint/schema/events")
    .load("s3://my-bucket/landing/events/")
)

# Write to a Delta Table
query = (df.writeStream
    .format("delta")
    .option("checkpointLocation", "/checkpoint/bronze/events")
    .option("mergeSchema", "true")
    .trigger(availableNow=True)
    .toTable("bronze.raw_events")
)

The DataFrame returned by readStream supports the same transformations as a regular DataFrame (filter, withColumn, select, and so on). When writing to a Delta Table with writeStream, the checkpointLocation option is required. The checkpoint records which files have already been processed, so when the job restarts it resumes exactly where it left off.
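
The resume behavior can be pictured with a toy model in plain Python. This is only a sketch of the idea (a persisted set of already-seen paths), not how Auto Loader actually stores its state; the ToyCheckpoint class, the run_once function, and the state.json file are hypothetical names invented for illustration.

```python
import json
import os
import tempfile


class ToyCheckpoint:
    """Hypothetical stand-in for a checkpoint: a JSON file of processed paths."""

    def __init__(self, path):
        self.path = path
        self.processed = set()
        if os.path.exists(path):
            with open(path) as f:
                self.processed = set(json.load(f))

    def new_files(self, listing):
        # Only files that are not yet recorded are candidates for ingestion.
        return sorted(set(listing) - self.processed)

    def commit(self, files):
        # Record the files as processed and persist the state to disk.
        self.processed.update(files)
        with open(self.path, "w") as f:
            json.dump(sorted(self.processed), f)


def run_once(checkpoint_path, listing):
    """Simulate one job run: load state, ingest new files, save state."""
    checkpoint = ToyCheckpoint(checkpoint_path)
    batch = checkpoint.new_files(listing)
    checkpoint.commit(batch)
    return batch


if __name__ == "__main__":
    state = os.path.join(tempfile.mkdtemp(), "state.json")
    print(run_once(state, ["a.json", "b.json"]))            # ['a.json', 'b.json']
    print(run_once(state, ["a.json", "b.json", "c.json"]))  # ['c.json']
```

The second run behaves like a restarted job: it reloads the saved state and picks up only c.json.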

Common cloudFiles Options

Option | Description | Default
cloudFiles.format | File format to read (json, csv, parquet, avro, orc, text, binaryFile) | None (required)
cloudFiles.schemaLocation | Directory where the inferred schema is stored | None (required when inferring)
cloudFiles.useNotifications | Enable file notification mode | false
cloudFiles.schemaEvolutionMode | Behavior on schema change (addNewColumns / failOnNewColumns / rescue / none) | addNewColumns when the schema is inferred (the usual JSON/CSV case); none when a schema is supplied
cloudFiles.schemaHints | Override types in the inferred schema | None
cloudFiles.maxFilesPerTrigger | Maximum number of files per micro-batch | 1000
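
When several streams share the same option set, it can help to assemble the options as a plain dict first. The helper below is a minimal sketch: build_autoloader_options is a hypothetical name, and it only prefixes the short option names from the table with cloudFiles. and converts the values to strings.

```python
def build_autoloader_options(file_format, schema_location=None, extra=None):
    """Return a dict of cloudFiles options (hypothetical helper, not a Databricks API)."""
    options = {"cloudFiles.format": file_format}
    if schema_location is not None:
        options["cloudFiles.schemaLocation"] = schema_location
    for name, value in (extra or {}).items():
        # Spark options are passed as strings; booleans become "true"/"false".
        if isinstance(value, bool):
            value = "true" if value else "false"
        options[f"cloudFiles.{name}"] = str(value)
    return options


opts = build_autoloader_options(
    "json",
    "/checkpoint/schema/events",
    extra={"useNotifications": True, "maxFilesPerTrigger": 500},
)
print(opts)
```

On a Databricks cluster the resulting dict can be passed to the reader with spark.readStream.format("cloudFiles").options(**opts).load(path).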

File Detection Modes: Directory Listing vs File Notification

Auto Loader has two modes for detecting new files. The default is directory listing: on every stream trigger it lists the cloud storage directory and compares the result against the files recorded in the checkpoint to identify new ones. In file notification mode, it subscribes to the cloud provider's event notification service and receives file arrival events through a queue.

Aspect | Directory Listing | File Notification
Configuration | Default (no extra config required) | cloudFiles.useNotifications = true
Detection mechanism | Detect new files by listing the directory | Receive events via S3 Event→SQS / Event Grid→Queue / Pub/Sub
Detection latency | Depends on the trigger interval (seconds to minutes) | Near-instant after file arrival (sub-second)
Scalability | List cost grows with total file count | Independent of total file count (scales with event volume)
Cloud resources | None | Auto-creates SQS / Event Grid / Pub/Sub (IAM permissions required)
Cost | Proportional to the number of list API calls | Cost of running the notification resources (very small)
Recommended for | Dev / test environments with low file counts | Large-scale production (millions of files or more)

# Directory listing (default)
df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", "/checkpoint/schema")
    .load("s3://bucket/landing/")
)

# Switch to file notification mode
df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", "/checkpoint/schema")
    .option("cloudFiles.useNotifications", "true")
    .load("s3://bucket/landing/")
)

Switching from directory listing to file notification mode takes a single extra option line. The existing checkpoint carries over, so no files are reprocessed during the switch. However, in file notification mode Auto Loader automatically provisions cloud resources (an SQS queue and so on), so the cluster or service principal must have the appropriate IAM permissions.
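
The scalability difference between the two modes can be illustrated with a toy cost model in plain Python. It is a deliberately simplified sketch: the function names are hypothetical, "cost" is just the number of entries inspected per trigger, and real directory listing is more optimized than a full scan.

```python
from collections import deque


def listing_trigger(all_files, processed):
    """Directory listing: inspect every file under the path, keep unseen ones."""
    inspected = len(all_files)
    new = [f for f in all_files if f not in processed]
    processed.update(new)
    return new, inspected


def notification_trigger(queue, processed):
    """File notification: drain only the queued arrival events."""
    new, inspected = [], 0
    while queue:
        f = queue.popleft()
        inspected += 1
        # Assumption: a queue may redeliver an event, so still deduplicate.
        if f not in processed:
            processed.add(f)
            new.append(f)
    return new, inspected


history = [f"f{i}.json" for i in range(1000)]  # already ingested
arrivals = ["f1000.json", "f1001.json"]         # arrived since the last trigger

new_a, cost_a = listing_trigger(history + arrivals, set(history))
new_b, cost_b = notification_trigger(deque(arrivals), set(history))
print(cost_a, cost_b)  # 1002 2
```

Both modes find the same two new files, but the listing cost tracks the total file count while the notification cost tracks only the new arrivals.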

Schema Inference and Schema Evolution

Auto Loader performs automatic schema inference for JSON and CSV files. On the first run it samples files to determine the schema and persists it under _schemas/ inside the directory specified by cloudFiles.schemaLocation. Subsequent runs use the stored schema, so there is no per-run inference cost.

schemaHints: Overriding Inferred Types

For JSON and CSV, Auto Loader infers every column as a string by default; with cloudFiles.inferColumnTypes enabled it infers numeric fields as Long or Double. Either way, you may want specific columns as Timestamp or Decimal for business reasons. cloudFiles.schemaHints lets you override the inferred type for those columns.

# Override inferred types with schemaHints
df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", "/checkpoint/schema")
    .option("cloudFiles.schemaHints",
            "event_time TIMESTAMP, amount DECIMAL(10,2), user_id STRING")
    .load("s3://bucket/landing/events/")
)
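
If the hints are maintained as a mapping in code or config, the option string can be generated rather than typed by hand. A small sketch, assuming a hypothetical helper named format_schema_hints that joins column/type pairs in insertion order:

```python
def format_schema_hints(hints):
    """Join {column: SQL type} pairs into a schemaHints string (hypothetical helper)."""
    parts = []
    for column, sql_type in hints.items():
        if not column.strip() or not sql_type.strip():
            raise ValueError("column names and types must be non-empty")
        parts.append(f"{column.strip()} {sql_type.strip()}")
    return ", ".join(parts)


hints = format_schema_hints({
    "event_time": "TIMESTAMP",
    "amount": "DECIMAL(10,2)",
    "user_id": "STRING",
})
print(hints)  # event_time TIMESTAMP, amount DECIMAL(10,2), user_id STRING
```

The resulting string is what gets passed as the value of cloudFiles.schemaHints.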

Schema Evolution: Handling New Columns

When the data source gains new columns, Auto Loader's schema evolution feature handles it. You control the behavior with cloudFiles.schemaEvolutionMode.

Mode | Behavior when a new column is detected | Use case
addNewColumns (default for JSON/CSV) | Stops the stream; on the next start it resumes with the new column added | When you want to ingest new columns automatically
failOnNewColumns | Stops the stream with an error | When you want to detect schema changes and alert on them
rescue | Routes new column data to _rescued_data (stream continues) | When you want to record schema mismatches without stopping the stream
none | Ignores new columns (stream continues) | When ingesting only known columns is sufficient

When using addNewColumns mode, also specify .option("mergeSchema", "true") on the writeStream so that new columns are added to the Delta Table schema automatically. Without it, the write fails with a schema mismatch error.

# Combining addNewColumns with mergeSchema
df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", "/checkpoint/schema")
    .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
    .load("s3://bucket/landing/")
)

query = (df.writeStream
    .option("checkpointLocation", "/checkpoint/bronze")
    .option("mergeSchema", "true")
    .trigger(availableNow=True)
    .toTable("bronze.raw_events")
)
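
To make the four modes easier to compare, here is a toy model in plain Python of what happens to a single record that carries an unknown column. It is a simplified sketch of the behavior described in the table, not Auto Loader's implementation; handle_record and its return shape (row, schema, stream_stops) are assumptions made for illustration.

```python
import json

MODES = ("addNewColumns", "failOnNewColumns", "rescue", "none")


def handle_record(mode, schema, record):
    """Return (row, schema, stream_stops) for one record under the given mode."""
    if mode not in MODES:
        raise ValueError(f"unknown mode: {mode}")
    unknown = {k: v for k, v in record.items() if k not in schema}
    row = {col: record.get(col) for col in schema}
    if mode == "rescue":
        # New columns are kept as a JSON string; the schema never changes.
        row["_rescued_data"] = json.dumps(unknown, sort_keys=True) if unknown else None
        return row, list(schema), False
    if not unknown or mode == "none":
        # Nothing new, or new columns are silently ignored.
        return row, list(schema), False
    if mode == "addNewColumns":
        # The stream stops; the evolved schema is used after the restart.
        return None, list(schema) + sorted(unknown), True
    # failOnNewColumns: the stream stops and the schema is left untouched.
    return None, list(schema), True


schema = ["user_id", "amount"]
record = {"user_id": "U1", "amount": 5, "coupon": "X"}
for mode in MODES:
    print(mode, handle_record(mode, schema, record))
```

In this model only addNewColumns changes the schema, and only addNewColumns and failOnNewColumns stop the stream.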

Rescued Data Column: Capturing Schema Mismatches

When it infers the schema, Auto Loader produces a _rescued_data column by default. When data arrives that does not match the schema (type mismatches, unknown columns, column names whose case differs from the schema), it is not discarded; instead, it is stored as a JSON string in the _rescued_data column. Fields that parsed correctly land in the regular columns and the rest is captured in _rescued_data, so you avoid data loss and can investigate quality issues after the fact.

# Inspect _rescued_data with a batch read of the Bronze table
# (show() and count() are not supported on a streaming DataFrame)
bronze_df = spark.read.table("bronze.raw_events")
bronze_df.select("user_id", "event_type", "amount", "_rescued_data").show(truncate=False)

Example output:

user_id | event_type | amount | _rescued_data
U001 | purchase | 29.99 | null
U002 | purchase | null | {"amount":"not_a_number","extra_field":"abc"}

# Monitor the number of schema-mismatched records
bad_count = bronze_df.filter("_rescued_data IS NOT NULL").count()
print(f"Schema-mismatched record count: {bad_count}")

In the Bronze layer of the Medallion Architecture, the standard practice is to store _rescued_data alongside the data, then route records where _rescued_data IS NOT NULL into a quarantine table when transforming to Silver. You can rename the column via the rescuedDataColumn option, or disable it by setting it to false.
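
The rescue-then-quarantine flow can be sketched without Spark. The code below is a toy model under stated assumptions: SCHEMA, parse_with_rescue, and split_for_silver are hypothetical names, Python casters stand in for column types, and the real rescued value may carry extra metadata that is omitted here.

```python
import json

# Assumed Bronze schema: column name -> Python caster standing in for the column type.
SCHEMA = {"user_id": str, "event_type": str, "amount": float}


def parse_with_rescue(record, schema=SCHEMA):
    """Parse one raw record; anything that does not fit goes to _rescued_data."""
    row = {col: None for col in schema}
    rescued = {}
    for key, value in record.items():
        caster = schema.get(key)
        if caster is None:
            rescued[key] = value  # unknown column
            continue
        try:
            row[key] = caster(value)
        except (TypeError, ValueError):
            rescued[key] = value  # type mismatch
    row["_rescued_data"] = json.dumps(rescued, separators=(",", ":")) if rescued else None
    return row


def split_for_silver(rows):
    """Route rows with rescued data to quarantine, the rest onward to Silver."""
    clean = [r for r in rows if r["_rescued_data"] is None]
    quarantine = [r for r in rows if r["_rescued_data"] is not None]
    return clean, quarantine


raw = [
    {"user_id": "U001", "event_type": "purchase", "amount": 29.99},
    {"user_id": "U002", "event_type": "purchase", "amount": "not_a_number", "extra_field": "abc"},
]
clean, quarantine = split_for_silver([parse_with_rescue(r) for r in raw])
print(len(clean), len(quarantine))  # 1 1
```

The second record keeps its valid fields, loses amount to a null, and lands in quarantine with both offending values preserved.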

Auto Loader vs COPY INTO: Comparison Table

Auto Loader and COPY INTO both ingest files from cloud storage into Delta Tables, but they have different design philosophies and target scenarios. The Data Engineer Associate (DEA) exam frequently asks which one to choose for a given scenario.

Dimension | Auto Loader (cloudFiles) | COPY INTO
Incremental processing | Tracks state automatically via checkpoints; processed files are never re-read | Tracked via table metadata; tracking cost grows with file count
Scalability | Handles millions of files via file notification mode | Suited to loads on the order of thousands of files; less efficient as counts approach millions
Streaming support | Built on Structured Streaming; supports both continuous execution and availableNow | SQL batch command only; no streaming execution
Schema inference / evolution | Full support for inference, evolution, schemaHints, and rescuedDataColumn | Available through format and copy options such as mergeSchema; otherwise the table schema is defined in advance
Setup simplicity | Requires implementation in the PySpark or Scala API | One SQL statement (COPY INTO target FROM source)
Exactly-once guarantee | Guaranteed via checkpoints plus Delta Table transactions | Deduplicates via table metadata (idempotent)
Recommended for | Continuous file arrival, large-scale data, sources with evolving schemas | Small ad-hoc loads, SQL-only environments, sources with a fixed schema

-- COPY INTO basic syntax (SQL)
COPY INTO bronze.raw_events
FROM 's3://bucket/landing/events/'
FILEFORMAT = JSON
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');

-- Compare: with Auto Loader (PySpark)
-- spark.readStream.format("cloudFiles")... -> writeStream.toTable("bronze.raw_events")

The decision criteria are simple. Ask: Do files arrive continuously? Will the file count grow well beyond the thousands, toward millions? Could the schema change? If any answer is yes, Auto Loader is the right choice; if all are no, COPY INTO is appropriate. When in doubt, pick Auto Loader, because it will continue to fit even if the workload grows later.
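
The three questions reduce to a single boolean rule, written out here as a small Python function. choose_ingestion and its parameter names are hypothetical and encode only this article's rule of thumb, not an official Databricks decision procedure.

```python
def choose_ingestion(files_arrive_continuously, file_count_grows_large, schema_may_change):
    """Apply the article's rule of thumb: any 'yes' points to Auto Loader."""
    if files_arrive_continuously or file_count_grows_large or schema_may_change:
        return "Auto Loader"
    return "COPY INTO"


print(choose_ingestion(True, True, True))     # Auto Loader
print(choose_ingestion(False, False, False))  # COPY INTO
```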

Supported File Formats

Format | cloudFiles.format value | Schema inference | Key additional options
JSON | json | Supported | multiLine, primitivesAsString
CSV | csv | Supported | header, delimiter, quote, escape
Parquet | parquet | Uses the file's embedded schema | mergeSchema
Avro | avro | Uses the file's embedded schema | avroSchema (external schema)
ORC | orc | Uses the file's embedded schema | mergeSchema
Text | text | Fixed (value: STRING) | wholetext
Binary | binaryFile | Fixed (path, modificationTime, length, content) | pathGlobFilter

Parquet, Avro, and ORC carry their schema inside the file, so Auto Loader uses the file's schema directly rather than running schema inference. However, when files with differing schemas are mixed, schema evolution still kicks in via cloudFiles.schemaEvolutionMode.

Checkpoints and the Exactly-once Guarantee

Auto Loader uses Structured Streaming's checkpoint mechanism to persist the list of processed files and stream offset information to cloud storage. When a job fails it resumes from the checkpoint, so every file is processed exactly once (the exactly-once guarantee). This guarantee is realized in combination with the Delta Lake transaction log.

  • One checkpoint directory per stream. Sharing a directory across streams causes offset conflicts.
  • Deleting the checkpoint reprocesses every file. The processing history is lost and every file in the source directory is read again.
  • Store checkpoints in cloud storage (dbfs:/, s3://, abfss://, and so on). On local disk they are lost when a node fails.
  • Keep schemaLocation and checkpointLocation in separate directories. schemaLocation stores the inferred schema; checkpointLocation stores the stream's processing state. They serve different purposes.

# Example checkpoint layout
df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation",
            "dbfs:/checkpoint/autoloader/events/schema")
    .load("s3://bucket/landing/events/")
)

query = (df.writeStream
    .option("checkpointLocation",
            "dbfs:/checkpoint/autoloader/events/stream")
    .trigger(availableNow=True)
    .toTable("bronze.raw_events")
)
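
One way to keep this layout consistent across many streams is to derive both paths from a base directory and a stream name. The helpers below are a sketch; stream_paths and assert_no_shared_checkpoints are hypothetical names, and the schema / stream subdirectory names simply mirror the example above.

```python
import posixpath


def stream_paths(base, stream_name):
    """Derive per-stream schema and checkpoint locations (hypothetical helper)."""
    if not stream_name or "/" in stream_name:
        raise ValueError("stream_name must be a single path segment")
    root = posixpath.join(base, stream_name)
    return {
        "cloudFiles.schemaLocation": posixpath.join(root, "schema"),
        "checkpointLocation": posixpath.join(root, "stream"),
    }


def assert_no_shared_checkpoints(all_paths):
    """Fail fast if two streams would share one checkpoint directory."""
    seen = set()
    for paths in all_paths:
        checkpoint = paths["checkpointLocation"]
        if checkpoint in seen:
            raise ValueError(f"checkpoint shared by two streams: {checkpoint}")
        seen.add(checkpoint)


events = stream_paths("dbfs:/checkpoint/autoloader", "events")
orders = stream_paths("dbfs:/checkpoint/autoloader", "orders")
assert_no_shared_checkpoints([events, orders])
print(events["checkpointLocation"])  # dbfs:/checkpoint/autoloader/events/stream
```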

Trigger Modes and Use in the Medallion Architecture

Because Auto Loader is built on Structured Streaming, you can choose the trigger mode. In practice the most common pattern is to run trigger(availableNow=True) on a schedule via Databricks Workflows. It processes all already-arrived files across multiple micro-batches and then stops the job, eliminating the need for an always-on cluster and making it very cost-efficient.

# availableNow: process all arrived files, then stop (recommended)
query = (df.writeStream
    .option("checkpointLocation", "/checkpoint/bronze")
    .trigger(availableNow=True)
    .toTable("bronze.events")
)

# processingTime: continuous processing on a fixed interval (when real-time is required)
query = (df.writeStream
    .option("checkpointLocation", "/checkpoint/bronze")
    .trigger(processingTime="30 seconds")
    .toTable("bronze.events")
)

# once=True: deprecated (loads all data into a single batch with OOM risk)
# -> Databricks recommends migrating to availableNow=True
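
The difference between availableNow and once comes down to how the backlog is cut into micro-batches. The following plain-Python sketch models only that batching step; plan_batches is a hypothetical function, and it assumes availableNow honors a per-batch file limit such as cloudFiles.maxFilesPerTrigger while once puts the whole backlog into one batch.

```python
def plan_batches(backlog, trigger, max_files_per_trigger=1000):
    """Return the list of micro-batches a trigger mode would produce (toy model)."""
    files = list(backlog)
    if trigger == "once":
        # Everything lands in a single batch, however large the backlog is.
        return [files] if files else []
    if trigger == "availableNow":
        # The backlog is drained in rate-limited chunks, then the job stops.
        return [
            files[i:i + max_files_per_trigger]
            for i in range(0, len(files), max_files_per_trigger)
        ]
    raise ValueError(f"unsupported trigger: {trigger}")


backlog = [f"f{i}.json" for i in range(2500)]
print([len(b) for b in plan_batches(backlog, "availableNow")])  # [1000, 1000, 500]
print([len(b) for b in plan_batches(backlog, "once")])          # [2500]
```

Smaller batches bound the memory needed per micro-batch, which is why the single oversized batch under once carries the OOM risk noted above.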

In the Medallion Architecture (Bronze / Silver / Gold three-layer model), Auto Loader is used for ingestion into the Bronze layer. It handles Landing Zone (raw files) → Bronze Table (raw data stored in Delta format, including _rescued_data), while transformations from Silver onward are done with Delta Live Tables or MERGE via foreachBatch.

  • Landing Zone → Bronze: Incremental ingestion via Auto Loader (cloudFiles), storing _rescued_data alongside the data
  • Bronze → Silver: Cleanse and deduplicate via Delta Streaming, DLT, or foreachBatch + MERGE
  • Silver → Gold: Compute business metrics via batch aggregation, materialized views, or SQL

What the Exam Tests (DEA / DEP)

Auto Loader makes up 10-15% of the DEA exam and is one of the most frequently tested topics. The DEP exam also tests it as a Bronze-layer ingestion pattern. Make sure the following points are crystal clear.

  • Choosing between Auto Loader and COPY INTO: Many files arriving continuously → Auto Loader. Small, one-off loads → COPY INTO. If the scenario says "files arrive hourly and the count reaches millions," Auto Loader is the answer.
  • Directory listing vs file notification: The default is directory listing. At scale, use file notification mode (useNotifications=true). Switching is a single option change — no checkpoint rebuild required.
  • Default schema evolution behavior: For JSON/CSV the default is addNewColumns (stop the stream on a new column, then add it automatically on restart). Without mergeSchema=true on the writeStream, the write fails.
  • _rescued_data column: Captures type mismatches and unknown columns as a JSON string. Enabled by default. Useful for Bronze-layer quality checks.
  • trigger(availableNow=True): Process all arrived files across multiple batches, then stop. The standard pattern for scheduled Workflows execution. once=True is deprecated (everything into one batch creates OOM risk).
  • Role of cloudFiles.schemaLocation: Where the inferred schema is persisted. Keep it in a separate directory from checkpointLocation.
  • Impact of deleting the checkpoint: The processing history is lost and every file in the source is reprocessed.

Check Your Understanding

Data Engineer Associate

Question 1

A data engineer is building an ingestion pipeline into a Bronze Delta Table for an S3 bucket that receives thousands of JSON files every hour. The file count is expected to reach millions, and the source-side JSON schema may gain new fields. Which is the most appropriate ingestion approach?

  A. Use Auto Loader in file notification mode (cloudFiles.useNotifications=true), set cloudFiles.schemaEvolutionMode to addNewColumns, and specify mergeSchema=true on the writeStream
  B. Run a COPY INTO command hourly as a Workflows job and enable mergeSchema via FORMAT_OPTIONS
  C. Use Auto Loader in directory listing mode (the default) and set cloudFiles.schemaEvolutionMode to none
  D. Use spark.read.format('json') for batch reads and manage duplicate files manually with a filename list

Correct answer: A

When millions of files arrive continuously, Auto Loader in file notification mode is the right choice. It learns of file arrivals through event notifications rather than by listing the directory, so it scales regardless of total file count. Because the JSON schema may gain new fields, schemaEvolutionMode=addNewColumns adds the new columns to the stream's schema and mergeSchema=true on the writeStream extends the Delta Table schema; that combination is the correct answer. COPY INTO (B) can merge schemas, but it is a SQL batch command whose file tracking becomes less efficient as the file count grows toward millions. Directory listing + none (C) drives up list API costs as the file count grows and silently ignores new columns. Manual management (D) has no built-in file tracking and is unworkable at scale.

Frequently Asked Questions

What is the difference between Auto Loader and COPY INTO?

Auto Loader (cloudFiles) is built on Structured Streaming and uses checkpoints to track file processing state, ingesting only new files incrementally. In file notification mode it detects file arrivals via S3 Event Notifications or Azure Event Grid and scales to millions of files. COPY INTO, in contrast, is a SQL batch command that records processed files in table metadata. It can merge schemas through options such as mergeSchema, but it has no streaming execution and is aimed at loads on the order of thousands of files. Databricks officially recommends Auto Loader for continuous ingestion and COPY INTO for small ad-hoc loads.

Which file formats does Auto Loader support?

Seven formats are supported: JSON, CSV, Parquet, Avro, ORC, Text, and binary files (binaryFile). You specify the format with the cloudFiles.format option (e.g. .option('cloudFiles.format', 'json')), and format-specific parser options are also available. Standard Spark reader options apply directly: multiLine for JSON; header, delimiter, and quote for CSV; mergeSchema for Parquet/Avro/ORC. Binary file mode lets you ingest unstructured data such as images and PDFs as binary columns.

How do I switch from directory listing mode to file notification mode?

Just set the cloudFiles.useNotifications option to true. The existing checkpoint is carried over, so no data is reprocessed. In file notification mode, Auto Loader automatically provisions the cloud provider's event notification resources (AWS: S3 event notifications delivered through SNS to an SQS queue; Azure: Event Grid + Queue Storage; GCP: Pub/Sub). You must grant Auto Loader the appropriate IAM permissions (on AWS that includes s3:GetBucketNotification, s3:PutBucketNotification, sqs:CreateQueue, and so on). Auto Loader does not remove these resources when a stream is deleted; tear them down explicitly, for example with the cloud resource manager API that Databricks provides or from the cloud provider's console.
