Kafka では「バイト列で書く/読む」ことが本質ですが、現場では人やシステムが扱いやすい形式へ変換する層が重要になります。それがクライアントの Serializer/Deserializer と、Kafka Connect の Converter です。
この記事では、特に Connect の key.converter / value.converter の役割を軸に、選定基準、Schema Registry 連携、トラブル回避ポイントをまとめ、CCDAK で問われやすい差分を明確にします。
Kafka クライアント(Producer/Consumer)はアプリ内でオブジェクトをバイト列へ変換するために Serializer/Deserializer を使います。一方、Kafka Connect はコネクタとKafkaの境界でデータとスキーマを扱うために Converter(key.converter / value.converter)を使います。
実装や設定場所、スキーマの意識レベルが異なります。Converter は Connect の内部データ表現(Schema + Struct など)と Kafka 上のバイト列を相互変換し、SMT(Single Message Transform)やリトライ、エラーハンドリングと整合的に動作します。
| 対象 | 適用層 | 代表クラス/設定例 |
|---|---|---|
| Converter(Connect) | Kafka Connect のコネクタ境界 | org.apache.kafka.connect.storage.StringConverter / io.confluent.connect.avro.AvroConverter (key.converter, value.converter) |
| Serializer/Deserializer(クライアント) | Producer/Consumer のアプリ境界 | org.apache.kafka.common.serialization.StringSerializer / StringDeserializer (key.serializer, value.deserializer) |
| SerDe(Streams) | Kafka Streams(型とペア) | org.apache.kafka.common.serialization.Serdes.String()(builder へ指定) |
Kafka における変換の位置づけ(アプリ vs Connect)
Serializer と Converter の設定スニペット(対比)
# Producer(アプリ側)
bootstrap.servers=broker:9092
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
# Consumer(アプリ側)
bootstrap.servers=broker:9092
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
auto.offset.reset=earliest
# Kafka Connect(ワーカー/グローバル既定)
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
# (Schema Registry を使う場合)
# key.converter=io.confluent.connect.avro.AvroConverter
# value.converter=io.confluent.connect.avro.AvroConverter
# key.converter.schema.registry.url=http://schema-registry:8081
# value.converter.schema.registry.url=http://schema-registry:8081Kafka のレコードは key と value を別々に持ち、Connect でも key.converter と value.converter を独立に設定します。これは実務で重要です。例えば、パーティショニングは通常 key のバイト列をハッシュするため、key の表現を安定させることがクラスタのデータ配置やキー集約に直結します。
一方、value はスキーマ進化を伴いがちで、Avro/Protobuf/JSON Schema と Schema Registry 連携を選ぶことが多いです。key はシンプルな String/Long、value は Avro/Protobuf といった組み合わせが現場の定番です。
コネクタ単位でのオーバーライド(ワーカー既定を上書き)
{
"name": "jdbc-source-custom",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"tasks.max": "1",
"connection.url": "jdbc:postgresql://db:5432/app",
"mode": "incrementing",
"incrementing.column.name": "id",
"topic.prefix": "db_",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081"
}
}Confluent の AvroConverter/ProtobufConverter/JsonSchemaConverter は Schema Registry と連携し、Connect の内部データ(Schema + Value)をレコードごとに適切なスキーマ ID でバイト列化します。これにより後続の Consumer は対応する Deserializer(KafkaAvroDeserializer など)で安全に復元できます。
JsonConverter は Schema Registry 非依存ですが、schemas.enable=true の場合、Kafka 上の JSON はスキーマ情報を含む構造になり、Connect 間では相互運用しやすくなります(ただし一般の Consumer で直接扱うには実装理解が必要)。
Consumer(Avro)の最小構成例
bootstrap.servers=broker:9092
group.id=orders-app
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
schema.registry.url=http://schema-registry:8081
auto.offset.reset=earliest可観測性、相互運用性、スキーマ進化、パフォーマンスの観点で形式を選びます。運用中の移行コストも重要です。
下記は現場でよく採用される方針です。
小規模パイプラインの例(キーは String、値は JSON Schema へ移行)
# ワーカー既定
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
# 後日、対象トピックのコネクタだけ JSON Schema へ移行
value.converter=io.confluent.connect.json.JsonSchemaConverter
value.converter.schema.registry.url=http://schema-registry:8081形式不一致やスキーマ設定漏れが典型です。バイト列を安全に確認し、Converter/Serializer の両面を点検します。
レコードキーはパーティショニングとロジックに直結します。キーの表現が変更されるとハッシュ値が変わり、古いデータと新しいデータが別パーティションに分散しうる点に注意してください。
kcat での観察例(キーは String、値は Avro)
# キーを文字列で、値は Avro をデコード
kcat -b broker:9092 -t orders -C -K: -f 'key=%k\tval=%s\n' \
-s key=string -s value=avro -r http://schema-registry:8081試験では、どこで何を設定するか、という層の違いが頻出です。システム図を思い浮かべて「この処理はどの境界で起きているか」を素早く判断できるようにしてください。
CCDAK
問題 1
Kafka Connect の Source Connector で、キーは文字列、値はスキーマ管理された形式で Kafka に書き出したい。適切な設定の組み合わせはどれか。
正解: A
Connect では Converter を設定する。キーは StringConverter、値は Avro/Protobuf/JSON Schema 系の Converter を選び、Schema Registry を使う形式では converter 側に schema.registry.url を与える。Serializer はクライアント(アプリ)設定であり、Connect の代替にはならない。schemas.enable=false の JSON はスキーマ進化の保証を提供しない。
key.converter と value.converter を同じクラスにする必要はありますか?
ありません。一般的に key は StringConverter/LongConverter、value は Avro/Protobuf/JSON Schema 系 Converter のように分けます。要件に応じて独立に選定してください。
JsonConverter の schemas.enable=true/false の違いは?
true はスキーマ同伴 JSON(Connect 間での相互運用や SMT に有利)、false はスキーマなしの素朴な JSON を書き出します。一般の Consumer と直接やり取りするなら false が扱いやすい一方、型情報や進化管理はアプリ側で担う必要があります。
Schema Registry の互換性設定はどこで行いますか?
Schema Registry サーバ側の互換性レベル(BACKWARD など)をサブジェクト単位/グローバルに設定します。Converter/Serializer はその設定に従い登録/検証を行います。
NicheeLab編集部
データエンジニアリング・クラウド資格の専門家。Databricks・Snowflake等の認定資格を保有し、実務経験に基づいた問題作成・解説を行っています。NicheeLab運営。
Kafka Topic と Partition の基礎: 分散とスケーラビリティの要
CCDAK 対策と実務の両立を意識し、Topic/Partition/Replica/Consumer Group の役...
CCDAK 試験ガイド:出題範囲・配点・申込み・対策
Confluent Certified Developer for Apache Kafka (CCDAK) の出題範囲...
Confluent Certified Administrator (CCAAK) 対策: 出題範囲・配点の考え方・運用観点の要点
CCAAKに向けて、試験領域の押さえどころを運用目線で整理。プロダクションで通用する設定・監視・セキュリティの実践知を、...
Kafka の Replica と In-Sync Replicas を正しく設計する: 耐障害性と一貫性
レプリカとISRの仕組みを起点に、acks と min.insync.replicas、クリーン/アンクリーンリーダー選...
Kafka の Offset とコミット: ポジション管理と at-least-once の基礎
CCDAK 対策と実務の両立を意識して、Kafka コンシューマのオフセット管理とコミット戦略を整理。at-least-...