Auto Loaderは、クラウドストレージ(S3・ADLS・GCS)に到着するファイルを自動検知し、Delta Lakeテーブルへ増分取り込みするDatabricks独自のデータ取り込み機能です。内部的にはStructured Streamingの上に構築されており、cloudFilesというデータソースフォーマットとして動作します。チェックポイントで処理済みファイルを追跡するため、同一ファイルの二重取り込みが起きず、数百万ファイル規模のストレージでも効率的にスケールします。
Data Engineer Associate試験ではAuto Loader関連の出題が全体の10〜15%を占め、最頻出トピックのひとつです。この記事では、基本構文・ファイル検知モード・スキーマ推論/エボリューション・COPY INTOとの比較・チェックポイント設計まで、コード例付きで解説します。
Auto Loaderは spark.readStream.format("cloudFiles") で起動します。通常のファイル読み込み(spark.read)ではなく、Structured StreamingのreadStream APIを使う点がポイントです。必須オプションは cloudFiles.format(ファイル形式)と cloudFiles.schemaLocation(推論済みスキーマの保存先)の2つです。
# Auto Loaderの基本構文(PySpark)
df = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", "/checkpoint/schema/events")
.load("s3://my-bucket/landing/events/")
)
# Delta Tableへの書き込み
query = (df.writeStream
.format("delta")
.option("checkpointLocation", "/checkpoint/bronze/events")
.option("mergeSchema", "true")
.trigger(availableNow=True)
.toTable("bronze.raw_events")
)readStream が返すDataFrameは通常のDataFrameと同じ変換操作(filter、withColumn、selectなど)を適用できます。writeStream でDelta Tableに書き込む際は checkpointLocation の指定が必須です。このチェックポイントに「どのファイルまで処理したか」が記録されるため、ジョブを再起動しても前回の続きから処理が再開されます。
| オプション | 説明 | デフォルト |
|---|---|---|
| cloudFiles.format | 読み込むファイル形式(json, csv, parquet, avro, orc, text, binaryFile) | なし(必須) |
| cloudFiles.schemaLocation | 推論済みスキーマの保存ディレクトリ | なし(推論時は必須) |
| cloudFiles.useNotifications | ファイル通知モードの有効化 | false |
| cloudFiles.schemaEvolutionMode | スキーマ変更時の挙動(addNewColumns / failOnNewColumns / rescue / none) | addNewColumns(JSON/CSV) |
| cloudFiles.schemaHints | 推論結果に対する型の上書き指定 | なし |
| cloudFiles.maxFilesPerTrigger | 1マイクロバッチあたりの処理ファイル数上限 | 1000 |
Auto Loaderは新しいファイルの検知方法として2つのモードを持ちます。デフォルトはディレクトリリスティングで、ストリームのトリガーごとにクラウドストレージのディレクトリをリスト操作でスキャンし、チェックポイントに記録済みのファイルと比較して新規ファイルを特定します。ファイル通知モードでは、クラウドプロバイダのイベント通知サービスを購読してファイル到着イベントをキューで受信します。
| 比較項目 | ディレクトリリスティング | ファイル通知 |
|---|---|---|
| 設定方法 | デフォルト(追加設定不要) | cloudFiles.useNotifications = true |
| 検知の仕組み | ディレクトリのlistオペレーションで差分を検出 | S3 Event→SQS / Event Grid→Queue / Pub/Sub からイベントを受信 |
| 検知レイテンシ | トリガー間隔に依存(数秒〜数分) | ファイル到着後ほぼ即時(秒以下) |
| スケーラビリティ | ファイル総数が増えるとlistコストが増大 | ファイル総数に無関係(イベント数に比例) |
| クラウドリソース | 不要 | SQS/Event Grid/Pub/Subを自動作成(IAM権限が必要) |
| コスト | list APIコール数に比例 | 通知リソースの維持費(極小) |
| 推奨シーン | ファイル数が少ない開発・検証環境 | 大規模本番環境(数百万ファイル以上) |
# ディレクトリリスティング(デフォルト)
df = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", "/checkpoint/schema")
.load("s3://bucket/landing/")
)
# ファイル通知モードに切り替え
df = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", "/checkpoint/schema")
.option("cloudFiles.useNotifications", "true")
.load("s3://bucket/landing/")
)ディレクトリリスティングからファイル通知モードへの切り替えは、オプションを1行追加するだけで完了します。既存のチェックポイントは引き継がれるため、切り替え時にファイルの再処理は発生しません。ただし、ファイル通知モードではAuto Loaderがクラウドリソース(SQSキュー等)を自動的に作成するため、クラスタやサービスプリンシパルに適切なIAM権限を付与する必要があります。
Auto LoaderはJSON・CSV形式のファイルに対してスキーマの自動推論を行います。初回実行時にサンプルファイルを読み取ってスキーマを決定し、cloudFiles.schemaLocation で指定したディレクトリに _schemas/ として永続化します。以降の実行では保存済みスキーマを使うため、毎回の推論コストはかかりません。
スキーマ推論ではJSONの数値フィールドがLongやDoubleに推論されることがありますが、業務上はTimestamp型やDecimal型にしたいケースがあります。cloudFiles.schemaHints を使うと、推論結果の特定カラムの型を上書きできます。
# schemaHints で推論結果を上書き
df = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", "/checkpoint/schema")
.option("cloudFiles.schemaHints",
"event_time TIMESTAMP, amount DECIMAL(10,2), user_id STRING")
.load("s3://bucket/landing/events/")
)データソースに新しいカラムが追加された場合、Auto Loaderはスキーマエボリューション機能で対応します。cloudFiles.schemaEvolutionMode で挙動を制御できます。
| モード | 新しいカラム検知時の挙動 | ユースケース |
|---|---|---|
| addNewColumns(JSON/CSVデフォルト) | ストリームを停止し、次回起動時に新カラムを追加して再開 | 新カラムを自動で取り込みたい場合 |
| failOnNewColumns | ストリームをエラーで停止 | スキーマ変更を検知してアラートを出したい場合 |
| rescue | 新カラムのデータを _rescued_data に退避(ストリーム継続) | ストリームを止めずにスキーマ不整合を記録したい場合 |
| none | 新カラムを無視(ストリーム継続) | 既知のカラムだけ取り込めればよい場合 |
addNewColumns モード使用時は、writeStream側にも .option("mergeSchema", "true") を指定することで、Delta Tableのスキーマに新カラムが自動追加されます。この設定がないと、書き込み時にスキーマ不整合エラーが発生します。
# addNewColumns + mergeSchema の組み合わせ
df = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", "/checkpoint/schema")
.option("cloudFiles.schemaEvolutionMode", "addNewColumns")
.load("s3://bucket/landing/")
)
query = (df.writeStream
.option("checkpointLocation", "/checkpoint/bronze")
.option("mergeSchema", "true")
.trigger(availableNow=True)
.toTable("bronze.raw_events")
)Auto Loaderはデフォルトで _rescued_data カラムを生成します。スキーマに合わないデータ(型不一致、未知のカラム、壊れたレコード)が到着した場合、そのデータは破棄されずに _rescued_data 列にJSON文字列として格納されます。正常に解析できたフィールドは通常のカラムに入り、残りが退避されるため、データの欠損を防ぎながら品質の問題を後から検証できます。
# _rescued_data の確認
df.select("user_id", "event_type", "amount", "_rescued_data").show(truncate=False)出力例:
| user_id | event_type | amount | _rescued_data |
|---|---|---|---|
| U001 | purchase | 29.99 | null |
| U002 | purchase | null | {"amount":"not_a_number","extra_field":"abc"} |
# スキーマ不整合レコードの件数を監視
bad_count = df.filter("_rescued_data IS NOT NULL").count()
print(f"スキーマ不整合レコード数: {bad_count}")Medallion ArchitectureのBronze層では _rescued_data を含めて保存し、Silver層への変換時に _rescued_data IS NOT NULL のレコードを隔離テーブル(quarantine table)に分離するパターンが実務の定石です。rescuedDataColumn オプションでカラム名を変更することも、false に設定して無効化することもできます。
Auto LoaderとCOPY INTOは、どちらもクラウドストレージからDelta Tableにファイルを取り込む機能ですが、設計思想と適用シーンが異なります。DEA試験では「このシナリオでどちらを使うべきか」が頻出です。
| 比較軸 | Auto Loader(cloudFiles) | COPY INTO |
|---|---|---|
| 増分処理 | チェックポイントで自動追跡。処理済みファイルは再読み込みしない | テーブルメタデータで追跡。ファイル数が多いと追跡コストが増大 |
| スケーラビリティ | ファイル通知モードで数百万ファイルに対応 | 数千ファイルを超えるとパフォーマンスが低下 |
| ストリーミング対応 | Structured Streamingベース。連続実行とavailableNowの両方に対応 | SQLバッチコマンドのみ。ストリーミング実行は不可 |
| スキーマ推論/エボリューション | 推論・エボリューション・schemaHints・rescuedDataColumnを完備 | 推論なし。テーブルスキーマの事前定義が必須 |
| セットアップの簡易さ | PySpark/Scala APIでの実装が必要 | SQL 1文で完結(COPY INTO target FROM source) |
| Exactly-once保証 | チェックポイント + Delta Tableトランザクションで保証 | テーブルメタデータで重複排除(冪等) |
| 推奨シーン | 継続的なファイル到着、大規模データ、スキーマが変わりうるソース | 少量のアドホックロード、SQLオンリー環境、スキーマ固定のソース |
# COPY INTO の基本構文(SQL)
COPY INTO bronze.raw_events
FROM 's3://bucket/landing/events/'
FILEFORMAT = JSON
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');
-- 比較: Auto Loader の場合(PySpark)
-- spark.readStream.format("cloudFiles")... → writeStream.toTable("bronze.raw_events")判断基準はシンプルです。「ファイルが継続的に到着するか?」「ファイル数が数千を超えるか?」「スキーマが変わる可能性があるか?」のいずれかがYesならAuto Loader、すべてNoなら COPY INTO が適切です。迷った場合はAuto Loaderを選べば、後から規模が拡大してもそのまま対応できます。
| フォーマット | cloudFiles.format値 | スキーマ推論 | 主な追加オプション |
|---|---|---|---|
| JSON | json | 対応 | multiLine, primitivesAsString |
| CSV | csv | 対応 | header, delimiter, quote, escape |
| Parquet | parquet | ファイル内蔵スキーマを使用 | mergeSchema |
| Avro | avro | ファイル内蔵スキーマを使用 | avroSchema(外部スキーマ指定) |
| ORC | orc | ファイル内蔵スキーマを使用 | mergeSchema |
| Text | text | 固定(value: STRING) | wholetext |
| Binary | binaryFile | 固定(path, modificationTime, length, content) | pathGlobFilter |
Parquet・Avro・ORCはファイル自体にスキーマが埋め込まれているため、Auto Loaderのスキーマ推論は使わずファイルのスキーマがそのまま使われます。ただし、異なるスキーマのファイルが混在する場合は cloudFiles.schemaEvolutionMode によるスキーマエボリューションが動作します。
Auto Loaderは、Structured Streamingのチェックポイント機構を利用して処理済みファイルの一覧とストリームのオフセット情報をクラウドストレージに永続化します。ジョブが失敗してもチェックポイントから再開するため、各ファイルは必ず1回だけ処理されます(Exactly-once保証)。この保証は Delta Lake のトランザクションログとの組み合わせで実現されています。
# チェックポイント設計の例
df = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation",
"dbfs:/checkpoint/autoloader/events/schema")
.load("s3://bucket/landing/events/")
)
query = (df.writeStream
.option("checkpointLocation",
"dbfs:/checkpoint/autoloader/events/stream")
.trigger(availableNow=True)
.toTable("bronze.raw_events")
)Auto LoaderはStructured Streamingベースのため、トリガーモードを選択できます。実務では trigger(availableNow=True) をDatabricks Workflowsで定期実行するパターンが最も一般的です。これは「到着済みの全ファイルを複数マイクロバッチに分割して処理し、完了したらジョブを停止する」という動作で、常時稼働クラスタが不要なためコスト効率に優れています。
# availableNow: 到着済みファイルを全て処理して停止(推奨)
query = (df.writeStream
.option("checkpointLocation", "/checkpoint/bronze")
.trigger(availableNow=True)
.toTable("bronze.events")
)
# processingTime: 一定間隔で継続処理(リアルタイムが必要な場合)
query = (df.writeStream
.option("checkpointLocation", "/checkpoint/bronze")
.trigger(processingTime="30 seconds")
.toTable("bronze.events")
)
# once=True: 非推奨(1バッチに全データを詰め込むためOOMリスクあり)
# → availableNow=True への移行をDatabricksが推奨Medallion Architecture(Bronze / Silver / Gold の3層構造)では、Auto LoaderはBronze層への取り込みで使います。Landing Zone(生ファイル)→ Bronze Table(Delta形式、_rescued_data含む生データ保存)を担当し、Silver以降の変換はDelta LiveTablesやforeachBatchによるMERGEで行います。
DEA試験ではAuto Loaderが全体の10〜15%を占め、最頻出です。DEPでもBronze層のデータ取り込みパターンとして問われます。以下のポイントを正確に整理しておきましょう。
Data Engineer Associate
問題 1
あるデータエンジニアは、S3バケットに毎時数千のJSONファイルが到着する環境でBronze Delta Tableへの取り込みパイプラインを構築している。ファイル数は今後数百万に達する見込みで、ソース側のJSONスキーマに新しいフィールドが追加される可能性がある。最も適切な取り込み方法はどれか。
正解: A
数百万ファイルが継続的に到着する環境では、Auto Loaderのファイル通知モードが最適です。イベント通知でファイル到着を即時検知するため、ファイル総数に関係なくスケールします。さらに、JSONスキーマに新フィールドが追加される可能性があるため、schemaEvolutionMode=addNewColumnsで新カラムを自動追加し、writeStream側のmergeSchema=trueでDelta Tableスキーマも自動拡張する構成が正解です。COPY INTO(B)はスキーマ推論/エボリューション機能がなく、数百万ファイルではパフォーマンスが劣化します。ディレクトリリスティング+none(C)ではファイル数増大時にlist APIコストが増大し、新カラムも無視されます。手動管理(D)はファイル追跡の仕組みがなく、大規模環境では運用不能です。
Auto LoaderとCOPY INTOの違いは何ですか?
Auto Loader(cloudFiles)はStructured Streamingベースで、チェックポイントを使ってファイルの処理状態を追跡し、新規ファイルだけを増分取り込みします。ファイル通知モードではS3 Event NotificationやAzure Event Gridでファイル到着を即時検知でき、数百万ファイル規模でもスケールします。一方、COPY INTOはSQLバッチコマンドで、テーブルメタデータにファイルの処理履歴を記録します。スキーマ推論やスキーマエボリューション機能はなく、事前にテーブルスキーマの定義が必要です。Databricksの公式ガイドラインでは、継続的なデータ取り込みにはAuto Loader、少量のアドホックロードにはCOPY INTOを推奨しています。
Auto Loaderはどのファイル形式に対応していますか?
JSON、CSV、Parquet、Avro、ORC、Text、バイナリファイル(binaryFile)の7形式に対応しています。cloudFiles.formatオプションで形式を指定し(例: .option('cloudFiles.format', 'json'))、各形式固有の解析オプションも利用可能です。JSONではmultiLine、CSVではheader・delimiter・quote、Parquet/Avro/ORCではmergeSchemaなど、Sparkの標準リーダーオプションがそのまま使えます。バイナリファイルモードでは画像やPDFなどの非構造化データをバイナリ列として取り込めます。
ディレクトリリスティングモードからファイル通知モードへ切り替える手順は?
cloudFiles.useNotificationsオプションをtrueに変更するだけで切り替えできます。既存のチェックポイントはそのまま引き継がれるため、データの再処理は発生しません。ファイル通知モードではAuto Loaderがクラウドプロバイダのイベント通知リソース(AWS: SQSキュー + S3イベント通知、Azure: Event Grid + Queue Storage、GCP: Pub/Sub)を自動作成します。Auto Loaderに適切なIAM権限(AWSの場合はs3:GetBucketNotificationConfiguration、sqs:CreateQueueなど)を付与する必要があります。ストリームを削除する際は cloudFiles.validateConfiguration=false を設定してクリーンアップ処理を行い、作成されたクラウドリソースを削除してください。
NicheeLab編集部
データエンジニアリング・クラウド資格の専門家。Databricks・Snowflake等の認定資格を保有し、実務経験に基づいた問題作成・解説を行っています。NicheeLab運営。
Databricks資格一覧|全7試験・難易度・勉強法
Databricks認定資格全7試験の一覧・難易度・出題範囲・合格ラインを徹底解説。2026年最新版の公式試験ガイドに準...
Databricks試験の難易度ランキング|全7資格を徹底比較
Databricks認定全7試験の難易度をランキング形式で徹底比較。合格率・学習時間・出題傾向から難易度を分析。...
Databricks資格の勉強方法|最短合格ルートと学習時間の目安
Databricks認定資格に最短で合格するための勉強方法を完全ガイド。公式リソース・問題集・学習スケジュールを徹底解説...
Databricks Data Engineer Associate完全解説|出題範囲・問題例・合格戦略
Databricks Certified Data Engineer Associate試験を徹底解説。5つの出題ドメイ...
Databricks Data Engineer Professional完全解説|上級試験の攻略法
Databricks Certified Data Engineer Professional試験を徹底解説。10の出題...