Kafka Connect の Sink Connector は、Kafka のトピックからレコードを消費し、データベースや検索エンジン、オブジェクトストレージなどの外部システムへ書き出す仕組みです。
本稿では、配信保証、重複対策、エラーハンドリング、スケーリング、運用の要点を、Confluent/Apache Kafka の公式ドキュメントに基づく安定した振る舞いとしてまとめ、CCDAK で問われやすい観点も併記します。
Kafka Connect は、ワーカー(単体/分散)上でコネクタとタスクを動かすフレームワークです。Sink Connector は Kafka から読み出して外部へ書き込みます。オフセットは内部トピックに保存され、処理成功後にコミットされます。
データの変換はコンバータ(Avro/JSON Schema/Protobuf など)と SMT(Single Message Transform)で行います。エラー時は再試行、スキップ、DLQ(Dead Letter Queue)への転送が構成可能です。
| コネクタ種別 | 代表的な用途 | 重複対策/整合性 | スキーマ進化 |
|---|---|---|---|
| JDBC Sink | RDB への書き込み(集計/ODS) | UPSERT(pk.mode=record_key 等)で冪等化。削除は delete.enabled + tombstone | auto.create/auto.evolve で列追加に追随可(要検証) |
| Elasticsearch Sink | 全文検索/ログ分析 | document_id に Kafka キーを使い冪等化。外部 versioning も選択肢 | スキーマは動的マッピングだが型の不一致に注意 |
| S3 Sink | データレイク原本(ロウデータ)保管 | 重複は起こり得る前提。キーにトピック/パーティション/オフセットを含め識別 | フォーマット(Avro/Parquet/JSON)とローテーション設定で運用最適化 |
| Snowflake Sink | DWH 取り込み | ターゲット側で MERGE 等により去重/整合性を担保する運用が一般的 | ステージングとターゲットでのスキーマ適用に注意 |
Kafka Sink Connector の論理構成
Sink Connector の基本は少なくとも 1 回(at-least-once)配信です。失敗時の再試行やリバランスにより、同一レコードが複数回書き込まれる可能性があります。したがってエンドツーエンドでの Exactly-once を前提にしない設計が安全です。
順序は Kafka のパーティション内で維持されます。タスクは割り当てられたパーティションの順序を保って処理しますが、複数パーティション間の順序は保証されません。トピック設計とキー設計で順序要件を満たしてください。
コネクタ固有の設定は配信保証やスキーマ進化、スループットに直結します。公式ドキュメントの既定値と制約を前提に、安全側の設計を選びます。
ここでは現場利用が多い JDBC / Elasticsearch / S3 / Snowflake を例に要点を列挙します。
Kafka Connect はエラー処理を設定で制御できます。レコード変換/シリアライズ/宛先書き込みなど、どの段階で失敗しても、再試行、スキップ、DLQ 転送のいずれかで前進させる設計が現実的です。
本番では DLQ を必ず用意し、DLQ レコードの監視・再処理フロー(修正後の再投入や個別補正)を運用に組み込みます。
スループットは大きくトピックのパーティション数と tasks.max の積で決まります。1 タスクは複数パーティションを担当できますが、1 パーティションは 1 タスク内で順次処理されます。
コネクタ固有のバッチ設定(例: JDBC の batch.size)や、Connect の consumer.override.* による消費の微調整も有効です。過負荷時はコネクタがコンシューマを一時停止しバックプレッシャーをかけます。
分散モードでの本番運用が一般的です。内部トピックのレプリケーション係数、クリーンアップポリシー(compact など)を適切に設定します。監視は Connect REST API と JMX メトリクスを併用します。
CCDAK では、Connect の構成要素、内部トピック、配信保証、DLQ、SMT/コンバータの役割、代表的コネクタ設定が頻出です。公式ドキュメントにある既定値や制約を前提に、選択肢の言い回しに注意してください。
例: JDBC Sink Connector を REST で作成(UPSERT + 削除 + DLQ)
POST /connectors HTTP/1.1
Host: connect:8083
Content-Type: application/json
{
"name": "inventory-jdbc-sink",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "3",
"topics": "inventory.customers,inventory.orders",
"connection.url": "jdbc:postgresql://db:5432/warehouse",
"connection.user": "app",
"connection.password": "******",
"auto.create": "true",
"auto.evolve": "true",
"insert.mode": "upsert",
"pk.mode": "record_key",
"pk.fields": "id",
"delete.enabled": "true",
"behavior.on.null.values": "delete",
"batch.size": "3000",
"max.retries": "10",
"retry.backoff.ms": "5000",
"errors.tolerance": "all",
"errors.deadletterqueue.topic.name": "dlq.inventory",
"errors.deadletterqueue.context.headers.enable": "true",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"consumer.override.isolation.level": "read_committed"
}
}
CCDAK
問題 1
Kafka Connect JDBC Sink で、Kafka の tombstone(値が null)を RDB の削除として反映したい。重複を避けつつ UPSERT 運用を維持するための適切な設定の組み合わせはどれか。
正解: A
JDBC Sink で削除を有効化するには delete.enabled=true が必要。tombstone を削除として扱うには behavior.on.null.values=delete を指定する。UPSERT で冪等化するには insert.mode=upsert とし、主キーは通常 Kafka のキーを使うため pk.mode=record_key(および pk.fields)を設定する。
Sink で Exactly-once は実現できますか?
Kafka Connect の Sink は基本的に at-least-once です。ターゲットが外部システムであるため、エンドツーエンドでの Exactly-once は一般化できません。実務では UPSERT/主キー、ドキュメント ID、下流側の去重や MERGE を組み合わせて整合性を確保します。
スキーマ無しの JSON をそのまま書き出せますか?
可能です。value.converter に JsonConverter を用い、schemas.enable=false を指定すればスキーマレスで取り扱えます。ただしスキーマ進化や型検証が効かないため、長期運用では Avro/JSON Schema/Protobuf とスキーマレジストリの併用が推奨です。
複数トピックを 1 つのテーブルに統合できますか?
コネクタや SMT の設定次第で可能です。JDBC Sink では table.name.format で書き込み先を制御できますが、スキーマ互換性と主キーの一貫性が必須です。衝突や型不一致が出やすいため、統合前にスキーマ整形の SMT か中間トピックでの正規化を検討してください。
NicheeLab編集部
データエンジニアリング・クラウド資格の専門家。Databricks・Snowflake等の認定資格を保有し、実務経験に基づいた問題作成・解説を行っています。NicheeLab運営。
Kafka Topic と Partition の基礎: 分散とスケーラビリティの要
CCDAK 対策と実務の両立を意識し、Topic/Partition/Replica/Consumer Group の役...
CCDAK 試験ガイド:出題範囲・配点・申込み・対策
Confluent Certified Developer for Apache Kafka (CCDAK) の出題範囲...
Confluent Certified Administrator (CCAAK) 対策: 出題範囲・配点の考え方・運用観点の要点
CCAAKに向けて、試験領域の押さえどころを運用目線で整理。プロダクションで通用する設定・監視・セキュリティの実践知を、...
Kafka の Replica と In-Sync Replicas を正しく設計する: 耐障害性と一貫性
レプリカとISRの仕組みを起点に、acks と min.insync.replicas、クリーン/アンクリーンリーダー選...
Kafka の Offset とコミット: ポジション管理と at-least-once の基礎
CCDAK 対策と実務の両立を意識して、Kafka コンシューマのオフセット管理とコミット戦略を整理。at-least-...