Kafka

Kafka Sink Connectors 実践ガイド: Kafka から外部システムへの書き出し

2026-04-19
NicheeLab編集部

Kafka Connect の Sink Connector は、Kafka のトピックからレコードを消費し、データベースや検索エンジン、オブジェクトストレージなどの外部システムへ書き出す仕組みです。

本稿では、配信保証、重複対策、エラーハンドリング、スケーリング、運用の要点を、Confluent/Apache Kafka の公式ドキュメントに基づく安定した振る舞いとしてまとめ、CCDAK で問われやすい観点も併記します。

Sink Connectors の全体像とアーキテクチャ

Kafka Connect は、ワーカー(単体/分散)上でコネクタとタスクを動かすフレームワークです。Sink Connector は Kafka から読み出して外部へ書き込みます。オフセットは内部トピックに保存され、処理成功後にコミットされます。

データの変換はコンバータ(Avro/JSON Schema/Protobuf など)と SMT(Single Message Transform)で行います。エラー時は再試行、スキップ、DLQ(Dead Letter Queue)への転送が構成可能です。

  • コネクタは論理定義、タスクは並列実行単位。tasks.max により並列度を制御
  • 内部トピック(connect-configs / connect-offsets / connect-status)は Kafka 上に保持
  • 順序保証はパーティション単位。1 パーティションは 1 タスク内で順次処理
  • コンバータはシリアライズ/スキーマ管理、SMT は軽量な前処理/整形
  • DLQ は問題レコードの隔離に有効。errors.* 設定で挙動を制御
コネクタ種別代表的な用途重複対策/整合性スキーマ進化
JDBC SinkRDB への書き込み(集計/ODS)UPSERT(pk.mode=record_key 等)で冪等化。削除は delete.enabled + tombstoneauto.create/auto.evolve で列追加に追随可(要検証)
Elasticsearch Sink全文検索/ログ分析document_id に Kafka キーを使い冪等化。外部 versioning も選択肢スキーマは動的マッピングだが型の不一致に注意
S3 Sinkデータレイク原本(ロウデータ)保管重複は起こり得る前提。キーにトピック/パーティション/オフセットを含め識別フォーマット(Avro/Parquet/JSON)とローテーション設定で運用最適化
Snowflake SinkDWH 取り込みターゲット側で MERGE 等により去重/整合性を担保する運用が一般的ステージングとターゲットでのスキーマ適用に注意

Kafka Sink Connector の論理構成

errorserrorsKafka Topicst1, t2, ...Internal Topicsconfigs / offsets / statusSink Connector (logical)Connect Distributed WorkerTask #1(P0, P1)Task #2(P2, P3)DLQ TopicKafkaExternal System AExternal System Bタスクはパーティション単位で並列化、失敗レコードはDLQへ退避

配信保証と整合性モデル:重複と順序の扱い

Sink Connector の基本は少なくとも 1 回(at-least-once)配信です。失敗時の再試行やリバランスにより、同一レコードが複数回書き込まれる可能性があります。したがってエンドツーエンドでの Exactly-once を前提にしない設計が安全です。

順序は Kafka のパーティション内で維持されます。タスクは割り当てられたパーティションの順序を保って処理しますが、複数パーティション間の順序は保証されません。トピック設計とキー設計で順序要件を満たしてください。

  • 既定の保証: at-least-once(成功確認後にオフセットコミット)
  • 重複対策: ターゲット側の UPSERT/主キー、ドキュメント ID にキー使用、下流で去重
  • 読み取り一貫性: 取込で未コミットを避ける場合は isolation.level=read_committed(consumer.override.isolation.level)
  • 削除伝播: tombstone(値 null)を使い、コネクタ側の delete 機能を有効化(対応コネクタのみ)
  • パーティション内の順序は保持。並列度と分散でスループットを確保

主要シンク別:設計・設定の勘所

コネクタ固有の設定は配信保証やスキーマ進化、スループットに直結します。公式ドキュメントの既定値と制約を前提に、安全側の設計を選びます。

ここでは現場利用が多い JDBC / Elasticsearch / S3 / Snowflake を例に要点を列挙します。

  • JDBC Sink: insert.mode=upsert、pk.mode=record_key、pk.fields を明示。delete.enabled と behavior.on.null.values=delete で削除連携。auto.create/auto.evolve は本番導入前にステージで検証
  • Elasticsearch Sink: key.ignore=false でキーを _id に利用し冪等化。bulk.size と linger をワークロードに合わせ最適化。mapping 衝突と型変換に注意
  • S3 Sink: ローテーション(flush.size, rotate.interval.ms)とパーティション(topics.dir, partitioner.class)を整理。重複は許容し、下流で MERGE/去重を設計
  • Snowflake Sink: ターゲットテーブルのキー/一意制約を基準に取り込み後の MERGE を運用化。小刻みなバッチはコスト増に注意。スキーマ変更は段階適用で安全に

エラー処理、再試行、DLQ の実装指針

Kafka Connect はエラー処理を設定で制御できます。レコード変換/シリアライズ/宛先書き込みなど、どの段階で失敗しても、再試行、スキップ、DLQ 転送のいずれかで前進させる設計が現実的です。

本番では DLQ を必ず用意し、DLQ レコードの監視・再処理フロー(修正後の再投入や個別補正)を運用に組み込みます。

  • errors.tolerance=all で失敗レコードを継続処理、none で即停止
  • errors.deadletterqueue.topic.name で DLQ 先を指定。context.headers.enable で原因情報をヘッダーに付与
  • errors.retry.timeout / errors.retry.delay.max.ms で一時的エラーへの粘りを調整
  • 変換系エラーは SMT/コンバータの設定見直し(schemas.enable、型/必須項目)
  • 宛先固有の制限(制約違反、タイムアウト)はバッチ/同時実行数/リトライを調整

スケーリングとパフォーマンス:tasks とパーティション設計

スループットは大きくトピックのパーティション数と tasks.max の積で決まります。1 タスクは複数パーティションを担当できますが、1 パーティションは 1 タスク内で順次処理されます。

コネクタ固有のバッチ設定(例: JDBC の batch.size)や、Connect の consumer.override.* による消費の微調整も有効です。過負荷時はコネクタがコンシューマを一時停止しバックプレッシャーをかけます。

  • tasks.max はパーティション数以上に設定しても効果が頭打ち(割当て上限に依存)
  • 大規模化時はトピックのパーティション数を計画的に増やし、キー分散を確認
  • JDBC 等のバルク書き込みは batch.size と接続プール/コミット間隔を調整
  • consumer.override.max.poll.records で 1 回の取得件数を調整(処理時間とバランス)
  • リバランス頻発は停止/遅延要因。安定したワーカー数と協調再均衡を運用で確保

運用・監視と CCDAK 出題ポイント

分散モードでの本番運用が一般的です。内部トピックのレプリケーション係数、クリーンアップポリシー(compact など)を適切に設定します。監視は Connect REST API と JMX メトリクスを併用します。

CCDAK では、Connect の構成要素、内部トピック、配信保証、DLQ、SMT/コンバータの役割、代表的コネクタ設定が頻出です。公式ドキュメントにある既定値や制約を前提に、選択肢の言い回しに注意してください。

  • REST: /connectors, /connectors/<name>/status, /connectors/<name>/tasks
  • 内部トピック: connect-configs(compact), connect-offsets(compact), connect-status(compact)
  • セキュリティ: Kafka 接続は SSL/SASL、宛先はコネクタ固有の認証を安全に保管
  • 変更管理: ローリングでタスク再起動、コンフィグは REST 経由でバージョン管理
  • 試験対策: at-least-once が基本、重複はターゲット側で処理。DLQ と errors.* の組み合わせを覚える

例: JDBC Sink Connector を REST で作成(UPSERT + 削除 + DLQ)

POST /connectors HTTP/1.1
Host: connect:8083
Content-Type: application/json

{
  "name": "inventory-jdbc-sink",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "3",
    "topics": "inventory.customers,inventory.orders",
    "connection.url": "jdbc:postgresql://db:5432/warehouse",
    "connection.user": "app",
    "connection.password": "******",
    "auto.create": "true",
    "auto.evolve": "true",
    "insert.mode": "upsert",
    "pk.mode": "record_key",
    "pk.fields": "id",
    "delete.enabled": "true",
    "behavior.on.null.values": "delete",
    "batch.size": "3000",
    "max.retries": "10",
    "retry.backoff.ms": "5000",
    "errors.tolerance": "all",
    "errors.deadletterqueue.topic.name": "dlq.inventory",
    "errors.deadletterqueue.context.headers.enable": "true",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "consumer.override.isolation.level": "read_committed"
  }
}

問題で確認

CCDAK

問題 1

Kafka Connect JDBC Sink で、Kafka の tombstone(値が null)を RDB の削除として反映したい。重複を避けつつ UPSERT 運用を維持するための適切な設定の組み合わせはどれか。

  1. A. insert.mode=upsert, pk.mode=record_key, delete.enabled=true, behavior.on.null.values=delete
  2. B. insert.mode=insert, pk.mode=none, delete.enabled=true, behavior.on.null.values=drop
  3. C. insert.mode=update, pk.mode=record_value, delete.enabled=false, behavior.on.null.values=ignore
  4. D. insert.mode=upsert, pk.mode=none, delete.enabled=true, behavior.on.null.values=rewrite

正解: A

JDBC Sink で削除を有効化するには delete.enabled=true が必要。tombstone を削除として扱うには behavior.on.null.values=delete を指定する。UPSERT で冪等化するには insert.mode=upsert とし、主キーは通常 Kafka のキーを使うため pk.mode=record_key(および pk.fields)を設定する。

よくある質問

Sink で Exactly-once は実現できますか?

Kafka Connect の Sink は基本的に at-least-once です。ターゲットが外部システムであるため、エンドツーエンドでの Exactly-once は一般化できません。実務では UPSERT/主キー、ドキュメント ID、下流側の去重や MERGE を組み合わせて整合性を確保します。

スキーマ無しの JSON をそのまま書き出せますか?

可能です。value.converter に JsonConverter を用い、schemas.enable=false を指定すればスキーマレスで取り扱えます。ただしスキーマ進化や型検証が効かないため、長期運用では Avro/JSON Schema/Protobuf とスキーマレジストリの併用が推奨です。

複数トピックを 1 つのテーブルに統合できますか?

コネクタや SMT の設定次第で可能です。JDBC Sink では table.name.format で書き込み先を制御できますが、スキーマ互換性と主キーの一貫性が必須です。衝突や型不一致が出やすいため、統合前にスキーマ整形の SMT か中間トピックでの正規化を検討してください。

この記事で学んだ内容を問題で確認しましょう

16,000問以上の問題で実力チェック

無料で問題を解いてみる
この記事の著者

NicheeLab編集部

データエンジニアリング・クラウド資格の専門家。Databricks・Snowflake等の認定資格を保有し、実務経験に基づいた問題作成・解説を行っています。NicheeLab運営。


関連記事
Kafka

Kafka Topic と Partition の基礎: 分散とスケーラビリティの要

CCDAK 対策と実務の両立を意識し、Topic/Partition/Replica/Consumer Group の役...

Kafka

CCDAK 試験ガイド:出題範囲・配点・申込み・対策

Confluent Certified Developer for Apache Kafka (CCDAK) の出題範囲...

Kafka

Confluent Certified Administrator (CCAAK) 対策: 出題範囲・配点の考え方・運用観点の要点

CCAAKに向けて、試験領域の押さえどころを運用目線で整理。プロダクションで通用する設定・監視・セキュリティの実践知を、...

Kafka

Kafka の Replica と In-Sync Replicas を正しく設計する: 耐障害性と一貫性

レプリカとISRの仕組みを起点に、acks と min.insync.replicas、クリーン/アンクリーンリーダー選...

Kafka

Kafka の Offset とコミット: ポジション管理と at-least-once の基礎

CCDAK 対策と実務の両立を意識して、Kafka コンシューマのオフセット管理とコミット戦略を整理。at-least-...

Kafkaの記事一覧 (100件)
© 2026 NicheeLab All rights reserved.