ストリーミングやバッチ取り込みにおいて、上流のスキーマ変更はパイプライン障害の主要因です。 Databricksでは「Confluent Schema Registryとの連携」「Auto Loaderのスキーマ推論・進化」「Delta Lakeのスキーマ進化」を組み合わせてスキーマの変更を安全に吸収します。 本稿ではこれらのパターンを実務と試験の両面から整理します。
スキーマレジストリは、メッセージのスキーマ定義(フィールド名・型・デフォルト値)を一元管理するサービスです。 Kafka等のメッセージングシステムと組み合わせて使い、プロデューサーとコンシューマー間のスキーマ互換性を保証します。
| 互換性モード | 許可される変更 | ユースケース |
|---|---|---|
| BACKWARD | フィールド削除、デフォルト付きフィールド追加 | コンシューマーが先に更新される場合 |
| FORWARD | フィールド追加、デフォルト付きフィールド削除 | プロデューサーが先に更新される場合 |
| FULL | デフォルト付きフィールドの追加/削除のみ | プロデューサー・コンシューマーの更新順序が不定の場合 |
DatabricksからConfluentのKafkaトピックをAvroで読む場合、from_avroとschema registry URLを組み合わせてデシリアライズします。
from pyspark.sql.functions import col
from pyspark.sql.avro.functions import from_avro
schema_registry_options = {
"schema.registry.url": "https://your-registry.confluent.cloud",
"confluent.schema.registry.basic.auth.credentials.source": "USER_INFO",
"confluent.schema.registry.basic.auth.user.info": "<API_KEY>:<API_SECRET>"
}
df = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "broker:9092")
.option("subscribe", "orders-topic")
.option("startingOffsets", "latest")
.load()
.select(
from_avro(
col("value"),
"orders-value",
schema_registry_options
).alias("data")
)
.select("data.*")
)from_avroの第2引数はSchema Registry上のSubject名(通常は topic名-value)です。 スキーマIDがメッセージヘッダーに含まれるため、プロデューサー側でスキーマが更新されても、レジストリからスキーマを動的に取得してデシリアライズできます。
Auto Loader(cloudFiles)はクラウドストレージ上のファイルを増分取り込みする仕組みで、スキーマの自動推論と進化に対応しています。 Kafkaではなくファイルベースの取り込みにおいて、スキーマ管理の主力となる機能です。
| オプション | 値 | 動作 |
|---|---|---|
| cloudFiles.inferColumnTypes | true | サンプリングで型を推論(デフォルトはすべてstring) |
| cloudFiles.schemaEvolutionMode | addNewColumns | 新カラムを検出するとスキーマに自動追加 |
| cloudFiles.schemaEvolutionMode | rescue | 不明カラムを _rescued_data に退避 |
| cloudFiles.schemaEvolutionMode | failOnNewColumns | 新カラム検出でストリーム停止 |
| cloudFiles.schemaEvolutionMode | none | 初回推論スキーマを固定し、変更を無視 |
| cloudFiles.schemaLocation | パス指定 | 推論済みスキーマの保存先(チェックポイントと分離可能) |
df = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.inferColumnTypes", "true")
.option("cloudFiles.schemaEvolutionMode", "addNewColumns")
.option("cloudFiles.schemaLocation", "/mnt/schema/orders")
.load("/mnt/landing/orders/")
)schemaLocationに保存されたスキーマファイルは、次回ストリーム起動時に読み込まれます。 addNewColumnsモードでは新カラム検出時にストリームが一時的にrestartされ、スキーマファイルが更新されます。
Deltaテーブルへの書き込み時にスキーマの差分をどう処理するかは、mergeSchemaとoverwriteSchemaで制御します。
# mergeSchema: 新カラムを追加(既存カラムは保持)
(df.writeStream
.format("delta")
.option("mergeSchema", "true")
.option("checkpointLocation", "/mnt/checkpoints/orders")
.outputMode("append")
.toTable("silver.orders")
)
# overwriteSchema: スキーマ全体を置換(破壊的変更)
(df.write
.format("delta")
.mode("overwrite")
.option("overwriteSchema", "true")
.saveAsTable("silver.orders_v2")
)| 比較軸 | Avro | JSON |
|---|---|---|
| シリアライズ形式 | バイナリ(コンパクト) | テキスト(人間可読) |
| スキーマ埋め込み | スキーマIDをヘッダーに含む | スキーマなし(自己記述型) |
| Schema Registry連携 | ネイティブ対応 | 任意(json-schema連携も可) |
| 互換性チェック | Registry側で自動検証 | 自前で管理 |
| メッセージサイズ | 小さい(30-50%削減) | 大きい(フィールド名を毎回含む) |
| デバッグ容易性 | 専用ツールが必要 | そのまま読める |
| 推奨ユースケース | 高スループット本番パイプライン | 開発・プロトタイプ・ログ収集 |
Kafkaソース+Auto Loader+Delta Lakeを組み合わせた典型的なスキーマ管理フローを示します。
[Producer]
│ Avro + Schema Registry (BACKWARD互換)
v
[Kafka Topic]
│ from_avro + schema_registry_options
v
[Databricks Streaming Job]
│ スキーマ変更検出
├─ 新カラム → mergeSchema=true でDeltaに追加
└─ 型変更 → ストリーム停止 → 手動スキーマ移行
v
[Delta Table (Silver)]
│ NOT NULL / CHECK制約で最終検証
v
[Delta Table (Gold)][Cloud Storage (JSON/CSV/Parquet)]
│ Auto Loader (cloudFiles)
│ schemaEvolutionMode = addNewColumns
│ schemaLocation = /mnt/schema/xxx
v
[Databricks Streaming Job]
│ mergeSchema=true
v
[Delta Table (Bronze → Silver)]Data Engineer Associate / Professional
問題 1
Auto Loaderでクラウドストレージ上のJSONファイルを取り込んでいる。上流システムが新しいカラムを追加する可能性があるが、パイプラインを停止させず新カラムを自動的にDeltaテーブルに追加したい。最も適切な設定の組み合わせはどれか。
正解: A
addNewColumnsモードは新カラムを検出するとスキーマファイルを更新してストリームを再起動し、mergeSchema=trueでDeltaテーブルにカラムを追加します。failOnNewColumnsはストリームを停止するため要件に合いません。rescueは新カラムを_rescued_dataに退避するだけで自動追加はしません。noneモードはスキーマを固定するため新カラムは無視されます。
Confluent Schema RegistryなしでもDatabricksでスキーマ管理はできますか?
はい。Auto LoaderのschemaEvolutionModeとDelta Lakeのスキーマ進化(mergeSchema)を組み合わせれば、Databricks単体でスキーマ管理を完結できます。Confluent Schema Registryが必要になるのは、Kafkaエコシステム側で複数のプロデューサー/コンシューマー間のAvroスキーマ互換性を一元管理したい場合です。Databricksがコンシューマー専任であれば、Auto Loader+Deltaだけで十分なケースが多いです。
Auto LoaderのschemaEvolutionModeで 'rescue' を選ぶとどうなりますか?
rescueモードでは、既存スキーマに合致しないカラムのデータを _rescued_data という特別なカラムにJSON文字列として退避します。パイプラインは停止せず、既知のカラムは正常に処理されます。後から _rescued_data を解析して新カラムの追加判断を行えるため、スキーマ変更の影響を安全に評価したい運用に適しています。
AvroとJSONではどちらをKafkaメッセージのフォーマットに選ぶべきですか?
スキーマの厳密な互換性管理が必要ならAvro、柔軟性と可読性を優先するならJSONが適しています。AvroはSchema Registryとの親和性が高く、バイナリシリアライズでメッセージサイズも小さくなります。JSONはSchema Registryなしでも使え、デバッグしやすいですが、スキーマ進化の互換性チェックは自前で管理する必要があります。高スループットの本番パイプラインではAvroが一般的です。
NicheeLab編集部
データエンジニアリング・クラウド資格の専門家。Databricks・Snowflake等の認定資格を保有し、実務経験に基づいた問題作成・解説を行っています。NicheeLab運営。
Databricks資格一覧|全7試験・難易度・勉強法
Databricks認定資格全7試験の一覧・難易度・出題範囲・合格ラインを徹底解説。2026年最新版の公式試験ガイドに準...
Databricks試験の難易度ランキング|全7資格を徹底比較
Databricks認定全7試験の難易度をランキング形式で徹底比較。合格率・学習時間・出題傾向から難易度を分析。...
Databricks資格の勉強方法|最短合格ルートと学習時間の目安
Databricks認定資格に最短で合格するための勉強方法を完全ガイド。公式リソース・問題集・学習スケジュールを徹底解説...
Databricks Data Engineer Associate完全解説|出題範囲・問題例・合格戦略
Databricks Certified Data Engineer Associate試験を徹底解説。5つの出題ドメイ...
Databricks Data Engineer Professional完全解説|上級試験の攻略法
Databricks Certified Data Engineer Professional試験を徹底解説。10の出題...