Kafka Connect は、外部システムと Kafka を低コードでつなぐプラガブルなフレームワーク。Source/Sink コネクタとタスクをワーカーにデプロイしてスケールさせるだけで、信頼性のあるデータ連携基盤を構築できる。
本稿は CCDAK の出題観点(モード選択、内部トピック、タスク/コネクタ、コンバータ/SMT、エラーハンドリング)と、実務の運用手順を両立して解説する。
Kafka Connect は、外部システム(DB、オブジェクトストレージ、SaaS など)とのデータ連携を、コネクタというプラグインとして実装・運用するための公式フレームワーク。アプリケーションに専用の Producer/Consumer を書かずに、設定ベースで連携を行う。
ワーカー(Connect クラスタ)はタスクを並列実行し、分散モードでは構成・オフセット・ステータスを Kafka の内部トピックで耐障害的に管理する。操作は REST API 経由で行い、ローリングでの拡張・更新が可能。
Kafka Connect の論理アーキテクチャ
最小の Source コネクタを作成(REST)
curl -s -X POST http://localhost:8083/connectors \
-H 'Content-Type: application/json' \
-d '{
"name": "fs-source",
"config": {
"connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
"tasks.max": "1",
"file": "/tmp/input.txt",
"topic": "fs-input",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter"
}
}'Kafka Connect は Standalone と Distributed の2モードを持つ。前者は単一プロセスで簡易、後者はクラスタリングして耐障害・スケールアウトを実現する。マネージド(例: Confluent Cloud)の提供もあり、運用をアウトソースできる。
分散モードでは内部トピック(config/offsets/status)により状態が Kafka 上に保持され、ワーカーの増減や障害時も自動でリバランスされる。スタンドアロンは開発・単体用途に向く。
| 観点 | Standalone | Distributed | マネージド(例) |
|---|---|---|---|
| 運用単位 | 単一 JVM プロセス | 複数ワーカーでクラスタ | 提供事業者が管理 |
| フォールトトレランス | プロセス障害で停止 | ワーカー障害時に自動再配置 | サービス側で冗長化 |
| スケーリング | 手動で縦方向主体 | タスクをワーカー間で水平分散 | プラン変更や自動スケール |
| 状態管理 | ローカルファイル等 | 内部トピック(config/offsets/status) | サービス内メタデータ |
| 代表ユースケース | 開発/単体/簡易バッチ | 本番常時稼働の連携基盤 | フルマネージド要件 |
起動コマンドの対比(例)
# Standalone(単一プロセス)
connect-standalone worker.properties file-source.properties
# Distributed(ワーカークラスタ)
connect-distributed worker.properties # その後 REST でコネクタ作成コネクタは「外部システムとの接続と分割統治」の責務、タスクは「並列実行単位」の責務を持つ。tasks.max に応じてタスクが生成され、分散モードでは複数ワーカーに割り当てられる。
コンバータ(key/value.converter)は Connect と Kafka 間のシリアライズ形式を定義する。JSON/String/ByteArray が同梱され、Avro/Protobuf などは一般に別途プラグインとスキーマ管理(例: Schema Registry)を用いる。SMT は単一レコード単位の軽量変換チェーンで、配線前後のデータ整形に有効。
SMT とコンバータの設定例(抜粋)
{
"name": "jdbc-sink",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"topics": "orders",
"tasks.max": "4",
"connection.url": "jdbc:postgresql://db:5432/app",
"insert.mode": "upsert",
"pk.mode": "record_key",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"transforms": "route,mask",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "(.*)",
"transforms.route.replacement": "prod_$1",
"transforms.mask.type": "org.apache.kafka.connect.transforms.MaskField$Value",
"transforms.mask.fields": "card_number"
}
}分散モードでは、connect-configs・connect-offsets・connect-status の内部トピックが用意される。いずれもログコンパクション対象で、推奨は十分なレプリケーション係数(一般に 3)を設定して耐障害性を確保すること。
Source のオフセットは外部システム側の位置情報(ソースパーティション+位置)をマップとして保持し、Sink は Kafka Consumer のオフセットを保持する。これにより、ワーカー障害・再起動後も継続位置から再開できる。
トピック名はワーカー構成で指定し、事前作成しない場合は自動作成されることもあるが、本番ではポリシーどおりに手動作成し、コンパクションとレプリケーションを明示するのが安全。
分散ワーカー設定(抜粋)
# Kafka 接続
bootstrap.servers=broker1:9092,broker2:9092
group.id=connect-cluster
# 内部トピック
config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-status
config.storage.replication.factor=3
offset.storage.replication.factor=3
status.storage.replication.factor=3
# コンバータ(例: スキーマなし JSON)
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
# REST ポート
rest.port=8083
# (必要に応じて)セキュリティや Config Provider
# ssl.endpoint.identification.algorithm=https
# sasl.mechanism=PLAIN
# config.providers=env,file
# config.providers.env.class=org.apache.kafka.common.config.provider.EnvVarConfigProviderスケールは tasks.max の調整とワーカー台数の増減で行う。リバランスによりタスク割当が自動再配置される。入力トピックのパーティション数以上に Sink タスクを増やしても効果は薄いため、パーティション設計と併せ込む。
監視は REST と JMX を併用。/connectors、/connectors/{name}/status、/connectors/{name}/tasks などで稼働状態を確認できる。失敗時の挙動は errors.* 設定で制御でき、DLQ(デッドレタートピック)への退避も可能。
ローリング更新は 1 台ずつワーカーを再起動し、各再起動ごとにステータスを確認する。長時間のリバランスやタスク起動失敗が続く場合は、内部トピックの可用性と外部依存(DB 接続、認証)の健全性を優先確認する。
DLQ と監視 REST(例)
# 監視
curl -s http://localhost:8083/connectors | jq .
curl -s http://localhost:8083/connectors/my-conn/status | jq .
curl -s http://localhost:8083/connectors/my-conn/tasks | jq .
# 一時停止/再開
curl -s -X PUT http://localhost:8083/connectors/my-conn/pause
curl -s -X PUT http://localhost:8083/connectors/my-conn/resume
# エラー処理(コネクタ設定の一部)
"errors.tolerance": "all", # all または none
"errors.retry.timeout": "60000", # ms
"errors.retry.delay.max.ms": "5000",
"errors.deadletterqueue.topic.name": "dlq.my-conn",
"errors.deadletterqueue.context.headers.enable": "true"出題では、モード選択、内部トピックの役割、タスクとパーティションの関係、コンバータ/SMT の責務分離、エラーハンドリングの構成が頻出。設計問題では、少なくとも一度の配送前提で冪等な下流更新(Upsert/Idempotent API)を選ぶ、といった方針が問われやすい。
一般的なパターンは CDC Source → Kafka → 複数 Sink(DWH、検索、レイク)。スキーマ進化を考慮し、スキーマ管理と互換性戦略(後方互換など)を決める。性能面は、タスク並列化・適切なパーティション数・バッチサイズやフラッシュ間隔の調整で詰める。
スケーリング設計のメモ(概算)
# 目標スループットとパーティション数から tasks.max を見積もる
# P = 入力パーティション数, Rt = 目標総処理行/秒, Rc = 1タスクあたり処理行/秒
# tasks.max ≈ min(P, ceil(Rt / Rc))
# 例: P=24, Rt=48k rps, Rc=3k rps → tasks.max=min(24, ceil(48000/3000)=16)=16CCDAK
問題 1
本番向けの耐障害性とスケールを満たす Kafka Connect 構成を選びたい。ワーカー障害時も自動でタスクが再配置され、再起動後に継続位置から処理を再開できることが要件である。最も適切な選択はどれか。
正解: A
要求はフォールトトレランスと自動再配置、継続位置からの再開。これは Distributed モードと内部トピックの冗長化で満たせる。Standalone は単一プロセスで再配置されず、内部トピックのレプリカ 1 は単一障害点になる。タスク数の増加だけでは自動再配置は起きない。
Kafka Streams と Kafka Connect の違いは何か?
Connect は外部システムとの入出力(ETL の I/O)に特化したフレームワークで、設定ベース・プラガブルなコネクタが中心。Streams はアプリケーションコードでストリーム処理(集計、結合、状態管理など)を記述するライブラリ。用途が異なるため補完関係にある。
Connect で Exactly-Once は実現できるか?
Connect は一般に at-least-once セマンティクスで動作する。重複を許容できない場合は、下流での冪等化(主キーによる upsert、重複検知キー、トランザクション API など)を設計する。
Schema Registry は必須か?
必須ではない。Connect 同梱の JSON/String/ByteArray コンバータで運用できる。ただしスキーマ進化や型安全性が重要な場合は、Avro/Protobuf などのコンバータとスキーマ管理(例: Schema Registry)を併用するのが実務的。
NicheeLab編集部
データエンジニアリング・クラウド資格の専門家。Databricks・Snowflake等の認定資格を保有し、実務経験に基づいた問題作成・解説を行っています。NicheeLab運営。
Kafka Topic と Partition の基礎: 分散とスケーラビリティの要
CCDAK 対策と実務の両立を意識し、Topic/Partition/Replica/Consumer Group の役...
CCDAK 試験ガイド:出題範囲・配点・申込み・対策
Confluent Certified Developer for Apache Kafka (CCDAK) の出題範囲...
Confluent Certified Administrator (CCAAK) 対策: 出題範囲・配点の考え方・運用観点の要点
CCAAKに向けて、試験領域の押さえどころを運用目線で整理。プロダクションで通用する設定・監視・セキュリティの実践知を、...
Kafka の Replica と In-Sync Replicas を正しく設計する: 耐障害性と一貫性
レプリカとISRの仕組みを起点に、acks と min.insync.replicas、クリーン/アンクリーンリーダー選...
Kafka の Offset とコミット: ポジション管理と at-least-once の基礎
CCDAK 対策と実務の両立を意識して、Kafka コンシューマのオフセット管理とコミット戦略を整理。at-least-...