Kafka

Kafka Connect の key/value.converter と Serializer の実務とCCDAK対策

2026-04-19
NicheeLab編集部

Kafka では「バイト列で書く/読む」ことが本質ですが、現場では人やシステムが扱いやすい形式へ変換する層が重要になります。それがクライアントの Serializer/Deserializer と、Kafka Connect の Converter です。

この記事では、特に Connect の key.converter / value.converter の役割を軸に、選定基準、Schema Registry 連携、トラブル回避ポイントをまとめ、CCDAK で問われやすい差分を明確にします。

Converter と Serializer の違いと位置づけ

Kafka クライアント(Producer/Consumer)はアプリ内でオブジェクトをバイト列へ変換するために Serializer/Deserializer を使います。一方、Kafka Connect はコネクタとKafkaの境界でデータとスキーマを扱うために Converter(key.converter / value.converter)を使います。

実装や設定場所、スキーマの意識レベルが異なります。Converter は Connect の内部データ表現(Schema + Struct など)と Kafka 上のバイト列を相互変換し、SMT(Single Message Transform)やリトライ、エラーハンドリングと整合的に動作します。

  • Serializer/Deserializer: アプリの境界で動作。Kafka クライアント API の設定項目。
  • Converter: Connect ワーカー/コネクタの境界で動作。key/value 単位で個別設定可能。
  • スキーマ対応: Converter はスキーマ同伴や Schema Registry 連携を前提に設計された実装が豊富。
対象適用層代表クラス/設定例
Converter(Connect)Kafka Connect のコネクタ境界org.apache.kafka.connect.storage.StringConverter / io.confluent.connect.avro.AvroConverter (key.converter, value.converter)
Serializer/Deserializer(クライアント)Producer/Consumer のアプリ境界org.apache.kafka.common.serialization.StringSerializer / StringDeserializer (key.serializer, value.deserializer)
SerDe(Streams)Kafka Streams(型とペア)org.apache.kafka.common.serialization.Serdes.String()(builder へ指定)

Kafka における変換の位置づけ(アプリ vs Connect)

App ProducerSerializer: obj -> bytesKafka Connectkey/value.converter: ConnectRecord <-> bytesApp ConsumerDeserializer: bytes -> objKafka Topic

Serializer と Converter の設定スニペット(対比)

# Producer(アプリ側)
bootstrap.servers=broker:9092
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer

# Consumer(アプリ側)
bootstrap.servers=broker:9092
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer

auto.offset.reset=earliest

# Kafka Connect(ワーカー/グローバル既定)
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
# (Schema Registry を使う場合)
# key.converter=io.confluent.connect.avro.AvroConverter
# value.converter=io.confluent.connect.avro.AvroConverter
# key.converter.schema.registry.url=http://schema-registry:8081
# value.converter.schema.registry.url=http://schema-registry:8081

key.converter / value.converter の役割と分離設定

Kafka のレコードは key と value を別々に持ち、Connect でも key.converter と value.converter を独立に設定します。これは実務で重要です。例えば、パーティショニングは通常 key のバイト列をハッシュするため、key の表現を安定させることがクラスタのデータ配置やキー集約に直結します。

一方、value はスキーマ進化を伴いがちで、Avro/Protobuf/JSON Schema と Schema Registry 連携を選ぶことが多いです。key はシンプルな String/Long、value は Avro/Protobuf といった組み合わせが現場の定番です。

  • key を StringConverter、value を AvroConverter にする組み合わせは一般的。
  • ByteArrayConverter は変換を行わずバイト列を透過。外部システムがバイナリを期待する場合に有効。
  • JsonConverter は schemas.enable=true でスキーマ同伴の JSON、false でスキーマなしのプレーン JSON。

コネクタ単位でのオーバーライド(ワーカー既定を上書き)

{
  "name": "jdbc-source-custom",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "tasks.max": "1",
    "connection.url": "jdbc:postgresql://db:5432/app",
    "mode": "incrementing",
    "incrementing.column.name": "id",
    "topic.prefix": "db_",

    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081"
  }
}

Schema Registry 連携とスキーマ管理(Connect の視点)

Confluent の AvroConverter/ProtobufConverter/JsonSchemaConverter は Schema Registry と連携し、Connect の内部データ(Schema + Value)をレコードごとに適切なスキーマ ID でバイト列化します。これにより後続の Consumer は対応する Deserializer(KafkaAvroDeserializer など)で安全に復元できます。

JsonConverter は Schema Registry 非依存ですが、schemas.enable=true の場合、Kafka 上の JSON はスキーマ情報を含む構造になり、Connect 間では相互運用しやすくなります(ただし一般の Consumer で直接扱うには実装理解が必要)。

  • Schema Registry を使う場合は、key/value それぞれの Converter に対して schema.registry.url を設定する。
  • スキーマ進化ポリシー(BACKWARD など)は Schema Registry 側の互換性設定で管理する。
  • Consumer 側は対応する Deserializer を設定(KafkaAvroDeserializer など)。Connect Sink なら対応する Converter により自動復元される。

Consumer(Avro)の最小構成例

bootstrap.servers=broker:9092
group.id=orders-app
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
schema.registry.url=http://schema-registry:8081

auto.offset.reset=earliest

実務パターン:形式の使い分けと選定基準

可観測性、相互運用性、スキーマ進化、パフォーマンスの観点で形式を選びます。運用中の移行コストも重要です。

下記は現場でよく採用される方針です。

  • キーは人が読める安定表現(String/Long)。バイナリキーは極力避ける。
  • バリューは Avro/Protobuf/JSON Schema のいずれか。スキーマ進化と言語間互換を重視するなら Avro/Protobuf。
  • 一時的な可観測性や PoC なら JsonConverter(schemas.enable=true)で素早く開始、段階的に Avro/Protobuf へ移行。

小規模パイプラインの例(キーは String、値は JSON Schema へ移行)

# ワーカー既定
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true

# 後日、対象トピックのコネクタだけ JSON Schema へ移行
value.converter=io.confluent.connect.json.JsonSchemaConverter
value.converter.schema.registry.url=http://schema-registry:8081

よくある落とし穴とデバッグ方法

形式不一致やスキーマ設定漏れが典型です。バイト列を安全に確認し、Converter/Serializer の両面を点検します。

レコードキーはパーティショニングとロジックに直結します。キーの表現が変更されるとハッシュ値が変わり、古いデータと新しいデータが別パーティションに分散しうる点に注意してください。

  • 文字化け:Producer が StringSerializer、Connect が ByteArrayConverter などの食い違い。kcat で -f を使い表現を確認する。
  • JsonConverter で schemas.enable=false のまま SMT(ExtractField など)を適用しようとして失敗。Connect の内部表現にスキーマが必要な SMT がある。
  • Schema Registry URL の片側(key か value)のみ設定漏れ。key だけ String、value だけ Avro といった設計なら両方の Converter に個別設定が必要。
  • NULL 値(トゥームストーン)はフォーマットに関係なく現れる。Sink 側の Converter/Deser が NULL を扱える前提でロジックを組む。

kcat での観察例(キーは String、値は Avro)

# キーを文字列で、値は Avro をデコード
kcat -b broker:9092 -t orders -C -K: -f 'key=%k\tval=%s\n' \
  -s key=string -s value=avro -r http://schema-registry:8081

CCDAK 試験向けチェックポイント

試験では、どこで何を設定するか、という層の違いが頻出です。システム図を思い浮かべて「この処理はどの境界で起きているか」を素早く判断できるようにしてください。

  • Kafka Connect は key.converter と value.converter を個別設定。ワーカー既定とコネクタ単位の上書きが可能。
  • クライアント(Producer/Consumer)は key.serializer/value.serializer、key.deserializer/value.deserializer を設定。
  • Schema Registry を用いるフォーマット(Avro/Protobuf/JSON Schema)は、Connect では Converter 側に schema.registry.url を設定する。
  • JsonConverter の schemas.enable=true/false の挙動差を把握。true はスキーマ同伴の JSON で Connect 間連携向き。
  • キーの表現はパーティショニングと互換性に影響。安定したキー形式を保つ。

問題で確認

CCDAK

問題 1

Kafka Connect の Source Connector で、キーは文字列、値はスキーマ管理された形式で Kafka に書き出したい。適切な設定の組み合わせはどれか。

  1. key.converter=StringConverter、value.converter=Avro/Protobuf/JSON Schema のいずれか + 各 converter に schema.registry.url(必要に応じて)
  2. key.serializer=StringSerializer、value.serializer=KafkaAvroSerializer を Connect のワーカープロパティに設定
  3. Consumer 側で Avro Deserializer を設定すれば、Source 側の設定は不要
  4. value.converter=JsonConverter(schemas.enable=false)にしておけば Schema Registry は不要かつスキーマ進化も保証される

正解: A

Connect では Converter を設定する。キーは StringConverter、値は Avro/Protobuf/JSON Schema 系の Converter を選び、Schema Registry を使う形式では converter 側に schema.registry.url を与える。Serializer はクライアント(アプリ)設定であり、Connect の代替にはならない。schemas.enable=false の JSON はスキーマ進化の保証を提供しない。

よくある質問

key.converter と value.converter を同じクラスにする必要はありますか?

ありません。一般的に key は StringConverter/LongConverter、value は Avro/Protobuf/JSON Schema 系 Converter のように分けます。要件に応じて独立に選定してください。

JsonConverter の schemas.enable=true/false の違いは?

true はスキーマ同伴 JSON(Connect 間での相互運用や SMT に有利)、false はスキーマなしの素朴な JSON を書き出します。一般の Consumer と直接やり取りするなら false が扱いやすい一方、型情報や進化管理はアプリ側で担う必要があります。

Schema Registry の互換性設定はどこで行いますか?

Schema Registry サーバ側の互換性レベル(BACKWARD など)をサブジェクト単位/グローバルに設定します。Converter/Serializer はその設定に従い登録/検証を行います。

この記事で学んだ内容を問題で確認しましょう

16,000問以上の問題で実力チェック

無料で問題を解いてみる
この記事の著者

NicheeLab編集部

データエンジニアリング・クラウド資格の専門家。Databricks・Snowflake等の認定資格を保有し、実務経験に基づいた問題作成・解説を行っています。NicheeLab運営。


関連記事
Kafka

Kafka Topic と Partition の基礎: 分散とスケーラビリティの要

CCDAK 対策と実務の両立を意識し、Topic/Partition/Replica/Consumer Group の役...

Kafka

CCDAK 試験ガイド:出題範囲・配点・申込み・対策

Confluent Certified Developer for Apache Kafka (CCDAK) の出題範囲...

Kafka

Confluent Certified Administrator (CCAAK) 対策: 出題範囲・配点の考え方・運用観点の要点

CCAAKに向けて、試験領域の押さえどころを運用目線で整理。プロダクションで通用する設定・監視・セキュリティの実践知を、...

Kafka

Kafka の Replica と In-Sync Replicas を正しく設計する: 耐障害性と一貫性

レプリカとISRの仕組みを起点に、acks と min.insync.replicas、クリーン/アンクリーンリーダー選...

Kafka

Kafka の Offset とコミット: ポジション管理と at-least-once の基礎

CCDAK 対策と実務の両立を意識して、Kafka コンシューマのオフセット管理とコミット戦略を整理。at-least-...

Kafkaの記事一覧 (100件)
© 2026 NicheeLab All rights reserved.