Databricks

Auto Loaderとは?Databricksのデータ取り込み自動化

2026-03-21
更新: 2026-03-27
NicheeLab編集部

Auto Loaderは、クラウドストレージ(S3・ADLS・GCS)に到着するファイルを自動検知し、Delta Lakeテーブルへ増分取り込みするDatabricks独自のデータ取り込み機能です。内部的にはStructured Streamingの上に構築されており、cloudFilesというデータソースフォーマットとして動作します。チェックポイントで処理済みファイルを追跡するため、同一ファイルの二重取り込みが起きず、数百万ファイル規模のストレージでも効率的にスケールします。

Data Engineer Associate試験ではAuto Loader関連の出題が全体の10〜15%を占め、最頻出トピックのひとつです。この記事では、基本構文・ファイル検知モード・スキーマ推論/エボリューション・COPY INTOとの比較・チェックポイント設計まで、コード例付きで解説します。

Auto Loaderの基本構文(cloudFiles)

Auto Loaderは spark.readStream.format("cloudFiles") で起動します。通常のファイル読み込み(spark.read)ではなく、Structured StreamingのreadStream APIを使う点がポイントです。必須オプションは cloudFiles.format(ファイル形式)と cloudFiles.schemaLocation(推論済みスキーマの保存先)の2つです。

# Auto Loaderの基本構文(PySpark)
df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", "/checkpoint/schema/events")
    .load("s3://my-bucket/landing/events/")
)

# Delta Tableへの書き込み
query = (df.writeStream
    .format("delta")
    .option("checkpointLocation", "/checkpoint/bronze/events")
    .option("mergeSchema", "true")
    .trigger(availableNow=True)
    .toTable("bronze.raw_events")
)

readStream が返すDataFrameは通常のDataFrameと同じ変換操作(filter、withColumn、selectなど)を適用できます。writeStream でDelta Tableに書き込む際は checkpointLocation の指定が必須です。このチェックポイントに「どのファイルまで処理したか」が記録されるため、ジョブを再起動しても前回の続きから処理が再開されます。

よく使うcloudFilesオプション

オプション説明デフォルト
cloudFiles.format読み込むファイル形式(json, csv, parquet, avro, orc, text, binaryFile)なし(必須)
cloudFiles.schemaLocation推論済みスキーマの保存ディレクトリなし(推論時は必須)
cloudFiles.useNotificationsファイル通知モードの有効化false
cloudFiles.schemaEvolutionModeスキーマ変更時の挙動(addNewColumns / failOnNewColumns / rescue / none)addNewColumns(JSON/CSV)
cloudFiles.schemaHints推論結果に対する型の上書き指定なし
cloudFiles.maxFilesPerTrigger1マイクロバッチあたりの処理ファイル数上限1000

ファイル検知モード:Directory Listing vs File Notification

Auto Loaderは新しいファイルの検知方法として2つのモードを持ちます。デフォルトはディレクトリリスティングで、ストリームのトリガーごとにクラウドストレージのディレクトリをリスト操作でスキャンし、チェックポイントに記録済みのファイルと比較して新規ファイルを特定します。ファイル通知モードでは、クラウドプロバイダのイベント通知サービスを購読してファイル到着イベントをキューで受信します。

比較項目ディレクトリリスティングファイル通知
設定方法デフォルト(追加設定不要)cloudFiles.useNotifications = true
検知の仕組みディレクトリのlistオペレーションで差分を検出S3 Event→SQS / Event Grid→Queue / Pub/Sub からイベントを受信
検知レイテンシトリガー間隔に依存(数秒〜数分)ファイル到着後ほぼ即時(秒以下)
スケーラビリティファイル総数が増えるとlistコストが増大ファイル総数に無関係(イベント数に比例)
クラウドリソース不要SQS/Event Grid/Pub/Subを自動作成(IAM権限が必要)
コストlist APIコール数に比例通知リソースの維持費(極小)
推奨シーンファイル数が少ない開発・検証環境大規模本番環境(数百万ファイル以上)
# ディレクトリリスティング(デフォルト)
df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", "/checkpoint/schema")
    .load("s3://bucket/landing/")
)

# ファイル通知モードに切り替え
df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", "/checkpoint/schema")
    .option("cloudFiles.useNotifications", "true")
    .load("s3://bucket/landing/")
)

ディレクトリリスティングからファイル通知モードへの切り替えは、オプションを1行追加するだけで完了します。既存のチェックポイントは引き継がれるため、切り替え時にファイルの再処理は発生しません。ただし、ファイル通知モードではAuto Loaderがクラウドリソース(SQSキュー等)を自動的に作成するため、クラスタやサービスプリンシパルに適切なIAM権限を付与する必要があります。

スキーマ推論とスキーマエボリューション

Auto LoaderはJSON・CSV形式のファイルに対してスキーマの自動推論を行います。初回実行時にサンプルファイルを読み取ってスキーマを決定し、cloudFiles.schemaLocation で指定したディレクトリに _schemas/ として永続化します。以降の実行では保存済みスキーマを使うため、毎回の推論コストはかかりません。

schemaHints:推論結果の型を上書きする

スキーマ推論ではJSONの数値フィールドがLongやDoubleに推論されることがありますが、業務上はTimestamp型やDecimal型にしたいケースがあります。cloudFiles.schemaHints を使うと、推論結果の特定カラムの型を上書きできます。

# schemaHints で推論結果を上書き
df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", "/checkpoint/schema")
    .option("cloudFiles.schemaHints",
            "event_time TIMESTAMP, amount DECIMAL(10,2), user_id STRING")
    .load("s3://bucket/landing/events/")
)

スキーマエボリューション:新しいカラムへの対応

データソースに新しいカラムが追加された場合、Auto Loaderはスキーマエボリューション機能で対応します。cloudFiles.schemaEvolutionMode で挙動を制御できます。

モード新しいカラム検知時の挙動ユースケース
addNewColumns(JSON/CSVデフォルト)ストリームを停止し、次回起動時に新カラムを追加して再開新カラムを自動で取り込みたい場合
failOnNewColumnsストリームをエラーで停止スキーマ変更を検知してアラートを出したい場合
rescue新カラムのデータを _rescued_data に退避(ストリーム継続)ストリームを止めずにスキーマ不整合を記録したい場合
none新カラムを無視(ストリーム継続)既知のカラムだけ取り込めればよい場合

addNewColumns モード使用時は、writeStream側にも .option("mergeSchema", "true") を指定することで、Delta Tableのスキーマに新カラムが自動追加されます。この設定がないと、書き込み時にスキーマ不整合エラーが発生します。

# addNewColumns + mergeSchema の組み合わせ
df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", "/checkpoint/schema")
    .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
    .load("s3://bucket/landing/")
)

query = (df.writeStream
    .option("checkpointLocation", "/checkpoint/bronze")
    .option("mergeSchema", "true")
    .trigger(availableNow=True)
    .toTable("bronze.raw_events")
)

Rescued Data Column:スキーマ不整合データの退避

Auto Loaderはデフォルトで _rescued_data カラムを生成します。スキーマに合わないデータ(型不一致、未知のカラム、壊れたレコード)が到着した場合、そのデータは破棄されずに _rescued_data 列にJSON文字列として格納されます。正常に解析できたフィールドは通常のカラムに入り、残りが退避されるため、データの欠損を防ぎながら品質の問題を後から検証できます。

# _rescued_data の確認
df.select("user_id", "event_type", "amount", "_rescued_data").show(truncate=False)

出力例:

user_idevent_typeamount_rescued_data
U001purchase29.99null
U002purchasenull{"amount":"not_a_number","extra_field":"abc"}
# スキーマ不整合レコードの件数を監視
bad_count = df.filter("_rescued_data IS NOT NULL").count()
print(f"スキーマ不整合レコード数: {bad_count}")

Medallion ArchitectureのBronze層では _rescued_data を含めて保存し、Silver層への変換時に _rescued_data IS NOT NULL のレコードを隔離テーブル(quarantine table)に分離するパターンが実務の定石です。rescuedDataColumn オプションでカラム名を変更することも、false に設定して無効化することもできます。

Auto Loader vs COPY INTO 比較表

Auto LoaderとCOPY INTOは、どちらもクラウドストレージからDelta Tableにファイルを取り込む機能ですが、設計思想と適用シーンが異なります。DEA試験では「このシナリオでどちらを使うべきか」が頻出です。

比較軸Auto Loader(cloudFiles)COPY INTO
増分処理チェックポイントで自動追跡。処理済みファイルは再読み込みしないテーブルメタデータで追跡。ファイル数が多いと追跡コストが増大
スケーラビリティファイル通知モードで数百万ファイルに対応数千ファイルを超えるとパフォーマンスが低下
ストリーミング対応Structured Streamingベース。連続実行とavailableNowの両方に対応SQLバッチコマンドのみ。ストリーミング実行は不可
スキーマ推論/エボリューション推論・エボリューション・schemaHints・rescuedDataColumnを完備推論なし。テーブルスキーマの事前定義が必須
セットアップの簡易さPySpark/Scala APIでの実装が必要SQL 1文で完結(COPY INTO target FROM source)
Exactly-once保証チェックポイント + Delta Tableトランザクションで保証テーブルメタデータで重複排除(冪等)
推奨シーン継続的なファイル到着、大規模データ、スキーマが変わりうるソース少量のアドホックロード、SQLオンリー環境、スキーマ固定のソース
# COPY INTO の基本構文(SQL)
COPY INTO bronze.raw_events
FROM 's3://bucket/landing/events/'
FILEFORMAT = JSON
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');

-- 比較: Auto Loader の場合(PySpark)
-- spark.readStream.format("cloudFiles")... → writeStream.toTable("bronze.raw_events")

判断基準はシンプルです。「ファイルが継続的に到着するか?」「ファイル数が数千を超えるか?」「スキーマが変わる可能性があるか?」のいずれかがYesならAuto Loader、すべてNoなら COPY INTO が適切です。迷った場合はAuto Loaderを選べば、後から規模が拡大してもそのまま対応できます。

対応ファイルフォーマット

フォーマットcloudFiles.format値スキーマ推論主な追加オプション
JSONjson対応multiLine, primitivesAsString
CSVcsv対応header, delimiter, quote, escape
Parquetparquetファイル内蔵スキーマを使用mergeSchema
Avroavroファイル内蔵スキーマを使用avroSchema(外部スキーマ指定)
ORCorcファイル内蔵スキーマを使用mergeSchema
Texttext固定(value: STRING)wholetext
BinarybinaryFile固定(path, modificationTime, length, content)pathGlobFilter

Parquet・Avro・ORCはファイル自体にスキーマが埋め込まれているため、Auto Loaderのスキーマ推論は使わずファイルのスキーマがそのまま使われます。ただし、異なるスキーマのファイルが混在する場合は cloudFiles.schemaEvolutionMode によるスキーマエボリューションが動作します。

チェックポイントとExactly-once保証

Auto Loaderは、Structured Streamingのチェックポイント機構を利用して処理済みファイルの一覧とストリームのオフセット情報をクラウドストレージに永続化します。ジョブが失敗してもチェックポイントから再開するため、各ファイルは必ず1回だけ処理されます(Exactly-once保証)。この保証は Delta Lake のトランザクションログとの組み合わせで実現されています。

  • 1ストリームにつき1チェックポイントディレクトリを割り当てる。複数ストリームでの共有はオフセット競合の原因になる
  • チェックポイントを削除すると全ファイルが再処理される。処理履歴が失われ、ソースディレクトリの全ファイルを再度読み込む
  • チェックポイントはクラウドストレージに保存する(dbfs:/、s3://、abfss://など)。ローカルディスクではノード障害時に消失する
  • schemaLocationとcheckpointLocationは別ディレクトリにする。schemaLocationは推論済みスキーマの保存先、checkpointLocationはストリームの処理状態の保存先で、役割が異なる
# チェックポイント設計の例
df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation",
            "dbfs:/checkpoint/autoloader/events/schema")
    .load("s3://bucket/landing/events/")
)

query = (df.writeStream
    .option("checkpointLocation",
            "dbfs:/checkpoint/autoloader/events/stream")
    .trigger(availableNow=True)
    .toTable("bronze.raw_events")
)

トリガーモードとMedallion Architectureでの活用

Auto LoaderはStructured Streamingベースのため、トリガーモードを選択できます。実務では trigger(availableNow=True) をDatabricks Workflowsで定期実行するパターンが最も一般的です。これは「到着済みの全ファイルを複数マイクロバッチに分割して処理し、完了したらジョブを停止する」という動作で、常時稼働クラスタが不要なためコスト効率に優れています。

# availableNow: 到着済みファイルを全て処理して停止(推奨)
query = (df.writeStream
    .option("checkpointLocation", "/checkpoint/bronze")
    .trigger(availableNow=True)
    .toTable("bronze.events")
)

# processingTime: 一定間隔で継続処理(リアルタイムが必要な場合)
query = (df.writeStream
    .option("checkpointLocation", "/checkpoint/bronze")
    .trigger(processingTime="30 seconds")
    .toTable("bronze.events")
)

# once=True: 非推奨(1バッチに全データを詰め込むためOOMリスクあり)
# → availableNow=True への移行をDatabricksが推奨

Medallion Architecture(Bronze / Silver / Gold の3層構造)では、Auto LoaderはBronze層への取り込みで使います。Landing Zone(生ファイル)→ Bronze Table(Delta形式、_rescued_data含む生データ保存)を担当し、Silver以降の変換はDelta LiveTablesやforeachBatchによるMERGEで行います。

  • Landing Zone → Bronze: Auto Loader(cloudFiles)で増分取り込み。_rescued_data を含めて保存
  • Bronze → Silver: Delta Streaming / DLT / foreachBatch + MERGEでクレンジング・重複排除
  • Silver → Gold: バッチ集計 / Materialized Views / SQLによるビジネス指標の算出

試験で問われるポイント(DEA / DEP)

DEA試験ではAuto Loaderが全体の10〜15%を占め、最頻出です。DEPでもBronze層のデータ取り込みパターンとして問われます。以下のポイントを正確に整理しておきましょう。

  • Auto Loader vs COPY INTO の選択基準: 大量ファイルが継続的に到着 → Auto Loader。少量の1回限りロード → COPY INTO。「ファイルが毎時到着し、数百万に達する」という条件が出たら Auto Loader が正解
  • ディレクトリリスティング vs ファイル通知: デフォルトはディレクトリリスティング。大規模環境ではファイル通知モード(useNotifications=true)。切り替えはオプション変更のみで、チェックポイントの再作成は不要
  • スキーマエボリューションのデフォルト挙動: JSON/CSVではaddNewColumns(新カラム検知でストリーム停止→再開時に自動追加)。writeStream側に mergeSchema=true を付けないと書き込みエラーになる
  • _rescued_data カラム: 型不一致・未知カラムのデータをJSON文字列として退避。デフォルトで有効。Bronze層での品質チェックに活用
  • trigger(availableNow=True): 到着済み全ファイルを複数バッチで処理→停止。Workflows定期実行パターンの標準。once=Trueは非推奨(1バッチに詰め込むためOOMリスク)
  • cloudFiles.schemaLocation の役割: 推論済みスキーマの永続化先。checkpointLocationとは別ディレクトリで管理
  • チェックポイント削除の影響: 処理履歴が失われ、ソースの全ファイルが再処理される

問題で確認

Data Engineer Associate

問題 1

あるデータエンジニアは、S3バケットに毎時数千のJSONファイルが到着する環境でBronze Delta Tableへの取り込みパイプラインを構築している。ファイル数は今後数百万に達する見込みで、ソース側のJSONスキーマに新しいフィールドが追加される可能性がある。最も適切な取り込み方法はどれか。

  1. Auto Loaderのファイル通知モード(cloudFiles.useNotifications=true)を使い、cloudFiles.schemaEvolutionModeをaddNewColumnsに設定し、writeStream側でmergeSchema=trueを指定する
  2. COPY INTOコマンドを1時間ごとにWorkflowsジョブで実行し、FORMAT_OPTIONSでmergeSchemaを有効にする
  3. Auto Loaderのディレクトリリスティングモード(デフォルト)を使い、cloudFiles.schemaEvolutionModeをnoneに設定する
  4. spark.read.format('json')でバッチ読み込みを行い、重複ファイルはファイル名のリストで手動管理する

正解: A

数百万ファイルが継続的に到着する環境では、Auto Loaderのファイル通知モードが最適です。イベント通知でファイル到着を即時検知するため、ファイル総数に関係なくスケールします。さらに、JSONスキーマに新フィールドが追加される可能性があるため、schemaEvolutionMode=addNewColumnsで新カラムを自動追加し、writeStream側のmergeSchema=trueでDelta Tableスキーマも自動拡張する構成が正解です。COPY INTO(B)はスキーマ推論/エボリューション機能がなく、数百万ファイルではパフォーマンスが劣化します。ディレクトリリスティング+none(C)ではファイル数増大時にlist APIコストが増大し、新カラムも無視されます。手動管理(D)はファイル追跡の仕組みがなく、大規模環境では運用不能です。

よくある質問

Auto LoaderとCOPY INTOの違いは何ですか?

Auto Loader(cloudFiles)はStructured Streamingベースで、チェックポイントを使ってファイルの処理状態を追跡し、新規ファイルだけを増分取り込みします。ファイル通知モードではS3 Event NotificationやAzure Event Gridでファイル到着を即時検知でき、数百万ファイル規模でもスケールします。一方、COPY INTOはSQLバッチコマンドで、テーブルメタデータにファイルの処理履歴を記録します。スキーマ推論やスキーマエボリューション機能はなく、事前にテーブルスキーマの定義が必要です。Databricksの公式ガイドラインでは、継続的なデータ取り込みにはAuto Loader、少量のアドホックロードにはCOPY INTOを推奨しています。

Auto Loaderはどのファイル形式に対応していますか?

JSON、CSV、Parquet、Avro、ORC、Text、バイナリファイル(binaryFile)の7形式に対応しています。cloudFiles.formatオプションで形式を指定し(例: .option('cloudFiles.format', 'json'))、各形式固有の解析オプションも利用可能です。JSONではmultiLine、CSVではheader・delimiter・quote、Parquet/Avro/ORCではmergeSchemaなど、Sparkの標準リーダーオプションがそのまま使えます。バイナリファイルモードでは画像やPDFなどの非構造化データをバイナリ列として取り込めます。

ディレクトリリスティングモードからファイル通知モードへ切り替える手順は?

cloudFiles.useNotificationsオプションをtrueに変更するだけで切り替えできます。既存のチェックポイントはそのまま引き継がれるため、データの再処理は発生しません。ファイル通知モードではAuto Loaderがクラウドプロバイダのイベント通知リソース(AWS: SQSキュー + S3イベント通知、Azure: Event Grid + Queue Storage、GCP: Pub/Sub)を自動作成します。Auto Loaderに適切なIAM権限(AWSの場合はs3:GetBucketNotificationConfiguration、sqs:CreateQueueなど)を付与する必要があります。ストリームを削除する際は cloudFiles.validateConfiguration=false を設定してクリーンアップ処理を行い、作成されたクラウドリソースを削除してください。

この記事で学んだ内容を問題で確認しましょう

16,000問以上の問題で実力チェック

無料で問題を解いてみる
この記事の著者

NicheeLab編集部

データエンジニアリング・クラウド資格の専門家。Databricks・Snowflake等の認定資格を保有し、実務経験に基づいた問題作成・解説を行っています。NicheeLab運営。


関連記事
Databricks

Databricks資格一覧|全7試験・難易度・勉強法

Databricks認定資格全7試験の一覧・難易度・出題範囲・合格ラインを徹底解説。2026年最新版の公式試験ガイドに準...

Databricks

Databricks試験の難易度ランキング|全7資格を徹底比較

Databricks認定全7試験の難易度をランキング形式で徹底比較。合格率・学習時間・出題傾向から難易度を分析。...

Databricks

Databricks資格の勉強方法|最短合格ルートと学習時間の目安

Databricks認定資格に最短で合格するための勉強方法を完全ガイド。公式リソース・問題集・学習スケジュールを徹底解説...

Databricks

Databricks Data Engineer Associate完全解説|出題範囲・問題例・合格戦略

Databricks Certified Data Engineer Associate試験を徹底解説。5つの出題ドメイ...

Databricks

Databricks Data Engineer Professional完全解説|上級試験の攻略法

Databricks Certified Data Engineer Professional試験を徹底解説。10の出題...

Databricksの記事一覧 (109件)
© 2026 NicheeLab All rights reserved.