Databricks

Databricksで押さえるSchema Registryパターン: Avro/JSON・Auto Loader・スキーマ進化

2026-03-26
更新: 2026-03-27
NicheeLab編集部

ストリーミングやバッチ取り込みにおいて、上流のスキーマ変更はパイプライン障害の主要因です。 Databricksでは「Confluent Schema Registryとの連携」「Auto Loaderのスキーマ推論・進化」「Delta Lakeのスキーマ進化」を組み合わせてスキーマの変更を安全に吸収します。 本稿ではこれらのパターンを実務と試験の両面から整理します。

スキーマレジストリとは何か

スキーマレジストリは、メッセージのスキーマ定義(フィールド名・型・デフォルト値)を一元管理するサービスです。 Kafka等のメッセージングシステムと組み合わせて使い、プロデューサーとコンシューマー間のスキーマ互換性を保証します。

  • スキーマのバージョン管理: 新フィールド追加やフィールド削除がいつ行われたか追跡可能
  • 互換性チェック: BACKWARD / FORWARD / FULL の3モードで互換性を自動検証
  • シリアライズ/デシリアライズ: スキーマIDをメッセージに埋め込み、コンシューマー側でスキーマを取得してデシリアライズ
互換性モード許可される変更ユースケース
BACKWARDフィールド削除、デフォルト付きフィールド追加コンシューマーが先に更新される場合
FORWARDフィールド追加、デフォルト付きフィールド削除プロデューサーが先に更新される場合
FULLデフォルト付きフィールドの追加/削除のみプロデューサー・コンシューマーの更新順序が不定の場合

Confluent Schema Registryとの連携

DatabricksからConfluentのKafkaトピックをAvroで読む場合、from_avroとschema registry URLを組み合わせてデシリアライズします。

from pyspark.sql.functions import col
from pyspark.sql.avro.functions import from_avro

schema_registry_options = {
    "schema.registry.url": "https://your-registry.confluent.cloud",
    "confluent.schema.registry.basic.auth.credentials.source": "USER_INFO",
    "confluent.schema.registry.basic.auth.user.info": "<API_KEY>:<API_SECRET>"
}

df = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092")
    .option("subscribe", "orders-topic")
    .option("startingOffsets", "latest")
    .load()
    .select(
        from_avro(
            col("value"),
            "orders-value",
            schema_registry_options
        ).alias("data")
    )
    .select("data.*")
)

from_avroの第2引数はSchema Registry上のSubject名(通常は topic名-value)です。 スキーマIDがメッセージヘッダーに含まれるため、プロデューサー側でスキーマが更新されても、レジストリからスキーマを動的に取得してデシリアライズできます。

Auto Loaderのスキーマ推論と進化

Auto Loader(cloudFiles)はクラウドストレージ上のファイルを増分取り込みする仕組みで、スキーマの自動推論と進化に対応しています。 Kafkaではなくファイルベースの取り込みにおいて、スキーマ管理の主力となる機能です。

オプション動作
cloudFiles.inferColumnTypestrueサンプリングで型を推論(デフォルトはすべてstring)
cloudFiles.schemaEvolutionModeaddNewColumns新カラムを検出するとスキーマに自動追加
cloudFiles.schemaEvolutionModerescue不明カラムを _rescued_data に退避
cloudFiles.schemaEvolutionModefailOnNewColumns新カラム検出でストリーム停止
cloudFiles.schemaEvolutionModenone初回推論スキーマを固定し、変更を無視
cloudFiles.schemaLocationパス指定推論済みスキーマの保存先(チェックポイントと分離可能)
df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.inferColumnTypes", "true")
    .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
    .option("cloudFiles.schemaLocation", "/mnt/schema/orders")
    .load("/mnt/landing/orders/")
)

schemaLocationに保存されたスキーマファイルは、次回ストリーム起動時に読み込まれます。 addNewColumnsモードでは新カラム検出時にストリームが一時的にrestartされ、スキーマファイルが更新されます。

Delta Lakeのスキーマ進化

Deltaテーブルへの書き込み時にスキーマの差分をどう処理するかは、mergeSchemaとoverwriteSchemaで制御します。

# mergeSchema: 新カラムを追加(既存カラムは保持)
(df.writeStream
    .format("delta")
    .option("mergeSchema", "true")
    .option("checkpointLocation", "/mnt/checkpoints/orders")
    .outputMode("append")
    .toTable("silver.orders")
)

# overwriteSchema: スキーマ全体を置換(破壊的変更)
(df.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("silver.orders_v2")
)
  • mergeSchema: カラム追加のみ。型変更やカラム削除は不可。ストリーミングでもバッチでも使用可能
  • overwriteSchema: スキーマ全体を置換。バッチのoverwriteモード限定。既存データの型と整合しなくなるリスクがあるため、テーブル再構築時のみ使用

Avro vs JSON: フォーマット選択基準

比較軸AvroJSON
シリアライズ形式バイナリ(コンパクト)テキスト(人間可読)
スキーマ埋め込みスキーマIDをヘッダーに含むスキーマなし(自己記述型)
Schema Registry連携ネイティブ対応任意(json-schema連携も可)
互換性チェックRegistry側で自動検証自前で管理
メッセージサイズ小さい(30-50%削減)大きい(フィールド名を毎回含む)
デバッグ容易性専用ツールが必要そのまま読める
推奨ユースケース高スループット本番パイプライン開発・プロトタイプ・ログ収集

エンドツーエンドのスキーマ管理パターン

Kafkaソース+Auto Loader+Delta Lakeを組み合わせた典型的なスキーマ管理フローを示します。

[Producer]
  │  Avro + Schema Registry (BACKWARD互換)
  v
[Kafka Topic]
  │  from_avro + schema_registry_options
  v
[Databricks Streaming Job]
  │  スキーマ変更検出
  ├─ 新カラム → mergeSchema=true でDeltaに追加
  └─ 型変更  → ストリーム停止 → 手動スキーマ移行
       v
[Delta Table (Silver)]
  │  NOT NULL / CHECK制約で最終検証
  v
[Delta Table (Gold)]
[Cloud Storage (JSON/CSV/Parquet)]
  │  Auto Loader (cloudFiles)
  │  schemaEvolutionMode = addNewColumns
  │  schemaLocation = /mnt/schema/xxx
  v
[Databricks Streaming Job]
  │  mergeSchema=true
  v
[Delta Table (Bronze → Silver)]

運用上の注意点

  • schemaLocationとcheckpointLocationは別パスを推奨。チェックポイントをリセットしてもスキーマは保持される
  • addNewColumnsモードでは下流のテーブル・ビューにもカラム追加が波及するため、影響範囲を事前に把握する
  • Schema Registryの互換性モードをBACKWARDにしておけば、コンシューマー(Databricks)側は新スキーマで古いメッセージも読める
  • 型変更(int→string等)はmergeSchemaでは吸収できないため、overwriteSchemaまたはテーブル再構築が必要
  • _rescued_data カラムのサイズを定期監視し、想定外のスキーマ変更を早期検出する

問題で確認

Data Engineer Associate / Professional

問題 1

Auto Loaderでクラウドストレージ上のJSONファイルを取り込んでいる。上流システムが新しいカラムを追加する可能性があるが、パイプラインを停止させず新カラムを自動的にDeltaテーブルに追加したい。最も適切な設定の組み合わせはどれか。

  1. cloudFiles.schemaEvolutionMode = 'addNewColumns' を設定し、writeStream側で mergeSchema = true を指定する
  2. cloudFiles.schemaEvolutionMode = 'failOnNewColumns' を設定し、手動でALTER TABLEを実行する
  3. cloudFiles.schemaEvolutionMode = 'rescue' を設定し、_rescued_data カラムのまま運用する
  4. cloudFiles.schemaEvolutionMode = 'none' を設定し、overwriteSchema = true で毎回スキーマを上書きする

正解: A

addNewColumnsモードは新カラムを検出するとスキーマファイルを更新してストリームを再起動し、mergeSchema=trueでDeltaテーブルにカラムを追加します。failOnNewColumnsはストリームを停止するため要件に合いません。rescueは新カラムを_rescued_dataに退避するだけで自動追加はしません。noneモードはスキーマを固定するため新カラムは無視されます。

よくある質問

Confluent Schema RegistryなしでもDatabricksでスキーマ管理はできますか?

はい。Auto LoaderのschemaEvolutionModeとDelta Lakeのスキーマ進化(mergeSchema)を組み合わせれば、Databricks単体でスキーマ管理を完結できます。Confluent Schema Registryが必要になるのは、Kafkaエコシステム側で複数のプロデューサー/コンシューマー間のAvroスキーマ互換性を一元管理したい場合です。Databricksがコンシューマー専任であれば、Auto Loader+Deltaだけで十分なケースが多いです。

Auto LoaderのschemaEvolutionModeで 'rescue' を選ぶとどうなりますか?

rescueモードでは、既存スキーマに合致しないカラムのデータを _rescued_data という特別なカラムにJSON文字列として退避します。パイプラインは停止せず、既知のカラムは正常に処理されます。後から _rescued_data を解析して新カラムの追加判断を行えるため、スキーマ変更の影響を安全に評価したい運用に適しています。

AvroとJSONではどちらをKafkaメッセージのフォーマットに選ぶべきですか?

スキーマの厳密な互換性管理が必要ならAvro、柔軟性と可読性を優先するならJSONが適しています。AvroはSchema Registryとの親和性が高く、バイナリシリアライズでメッセージサイズも小さくなります。JSONはSchema Registryなしでも使え、デバッグしやすいですが、スキーマ進化の互換性チェックは自前で管理する必要があります。高スループットの本番パイプラインではAvroが一般的です。

この記事で学んだ内容を問題で確認しましょう

16,000問以上の問題で実力チェック

無料で問題を解いてみる
この記事の著者

NicheeLab編集部

データエンジニアリング・クラウド資格の専門家。Databricks・Snowflake等の認定資格を保有し、実務経験に基づいた問題作成・解説を行っています。NicheeLab運営。


関連記事
Databricks

Databricks資格一覧|全7試験・難易度・勉強法

Databricks認定資格全7試験の一覧・難易度・出題範囲・合格ラインを徹底解説。2026年最新版の公式試験ガイドに準...

Databricks

Databricks試験の難易度ランキング|全7資格を徹底比較

Databricks認定全7試験の難易度をランキング形式で徹底比較。合格率・学習時間・出題傾向から難易度を分析。...

Databricks

Databricks資格の勉強方法|最短合格ルートと学習時間の目安

Databricks認定資格に最短で合格するための勉強方法を完全ガイド。公式リソース・問題集・学習スケジュールを徹底解説...

Databricks

Databricks Data Engineer Associate完全解説|出題範囲・問題例・合格戦略

Databricks Certified Data Engineer Associate試験を徹底解説。5つの出題ドメイ...

Databricks

Databricks Data Engineer Professional完全解説|上級試験の攻略法

Databricks Certified Data Engineer Professional試験を徹底解説。10の出題...

Databricksの記事一覧 (105件)
© 2026 NicheeLab All rights reserved.