JDBC Connectors は RDBMS と Kafka をつなぐ最短ルートですが、取り込みモードや主キー/スキーマ設計、ポーリング間隔やバッチサイズなどの設定を外すとすぐに詰まります。
本稿は Source/Sink 双方の基本から、スループット・重複防止・エラー処理の勘所までを、試験と実務の両面で役に立つ形でまとめました。
JDBC Source は RDBMS から Kafka へ継続的にデータを取り込みます。代表的なモードは bulk、incrementing、timestamp、timestamp+incrementing の4種。増分取り込みでは、非NULLかつ単調増加する列(IDなど)や更新時刻列にインデックスを張るのが前提です。
テーブル指定は include/exclude リスト、またはカスタム SQL(query)で行います。カスタム SQL を使うと柔軟ですが、コネクタのオフセット管理(増分境界列)と整合が取れるように SQL 側でも境界条件を明示するのが安全です。
| レイヤ | 主目的 | 主なキー設定/例 |
|---|---|---|
| Source(JDBC) | RDB → Kafka 取り込み | mode=timestamp+incrementing, incrementing.column.name, timestamp.column.name, poll.interval.ms, batch.max.rows, topic.prefix, table.include.list |
| Sink(JDBC) | Kafka → RDB 書き込み | insert.mode, pk.mode, pk.fields, auto.create, auto.evolve, delete.enabled, batch.size, max.retries, retry.backoff.ms, table.name.format |
| Worker/共通 | 実行基盤・エラー処理 | tasks.max, errors.tolerance, errors.deadletterqueue.topic.name, producer.override.*, consumer.override.* |
RDB と Kafka をつなぐ JDBC Source/Sink の流れ
JDBC Source(増分取り込み)の最小構成例
name=jdbc-source-orders
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
connection.url=jdbc:postgresql://db.example.local:5432/app
connection.user=app
connection.password=***
mode=timestamp+incrementing
incrementing.column.name=id
timestamp.column.name=updated_at
topic.prefix=db_
table.include.list=public.orders,public.customers
poll.interval.ms=10000
batch.max.rows=5000
timestamp.delay.interval.ms=5000
producer.override.compression.type=snappy
producer.override.linger.ms=20JDBC Sink は Kafka のレコードを RDBMS に書き込みます。書き込みモードは insert、upsert、update。upsert/update を使う場合は主キー情報(pk.mode、pk.fields)が必須です。Kafka の tombstone(value=null)を物理削除に変換したい場合は delete.enabled を有効にします。
スキーマ不一致に備えて auto.create と auto.evolve を使う選択肢はありますが、運用ではテーブル設計を明示し、変更はマイグレーションで管理するのが安全です。バッチ書き込みは batch.size で調整します。
JDBC Sink(UPSERT + 削除対応)の最小構成例
name=jdbc-sink-orders
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
connection.url=jdbc:postgresql://dw.example.local:5432/warehouse
connection.user=dw
connection.password=***
topics=db_orders
insert.mode=upsert
pk.mode=record_value
pk.fields=id
auto.create=false
auto.evolve=false
delete.enabled=true
batch.size=3000
max.retries=10
retry.backoff.ms=3000
table.name.format=${topic}_stg
consumer.override.max.poll.records=2000スループットは「DB 側のインデックスとI/O」「コネクタのバッチ/ポーリング」「Kafka 側のバッファリング」の掛け算で決まります。まずは DB に適切なインデックス(増分列・更新時刻列・PK)を用意し、取り込み/書き込みのバッチを無理なくまとめます。
Source はプロデューサ、Sink はコンシューマのチューニングが重要です。Connect は override 機構で下層クライアントへ委譲できます。
よく使う override(Source=Producer / Sink=Consumer)
# Source(JDBC Source)側: プロデューサ設定
producer.override.acks=all
producer.override.compression.type=zstd
producer.override.linger.ms=30
producer.override.batch.size=131072
# Sink(JDBC Sink)側: コンシューマ設定
delete.enabled=true
consumer.override.max.poll.records=1000
consumer.override.max.partition.fetch.bytes=5242880
consumer.override.fetch.max.bytes=52428800JDBC Source は RDB の型を Kafka Connect のスキーマへ、JDBC Sink はその逆方向にマッピングします。DECIMAL/NUMERIC、TIMESTAMP/TIME/DATE、BLOB/CLOB などは変換規則を理解しておくと事故を防げます。スキーマレジストリを使う場合は、互換性設定とフィールドの進化(nullable 化、デフォルト付与)を計画します。
数値のマッピングは numeric.mapping(best_fit, precision_only など)で制御できます。日時はタイムゾーン表現の揺れに注意し、Source ではタイムスタンプ列の精度(秒/ミリ/マイクロ)と DB ドライバの挙動を確認します。
型マッピング/SMT の例(日時正規化と不要フィールド除去)
# JDBC Source の SMT 例(不要列除去 + フィールド名正規化)
transforms=dropCols,renameTs
transforms.dropCols.type=org.apache.kafka.connect.transforms.MaskField$Value
transforms.dropCols.blacklist=internal_note,temp_flag
transforms.renameTs.type=org.apache.kafka.connect.transforms.ReplaceField$Value
transforms.renameTs.renames=updatedat:updated_at
# 数値マッピング(Source)
numeric.mapping=best_fitConnect の標準エラー処理は errors.tolerance と Dead Letter Queue(DLQ)で構成します。全スキップ(all)を選ぶ場合でも、後で原因追跡できるように DLQ へ流し、ヘッダにコンテキストを付けるのが実務定石です。
一時的な DB 停止やロック競合にはリトライ/バックオフで耐える設計にし、上限を超えたら明示的に失敗させてオペレーションへエスカレーションします。
DLQ とリトライの設定例(Sink)
errors.tolerance=all
errors.deadletterqueue.topic.name=dlq.jdbc.sink.orders
errors.deadletterqueue.context.headers.enable=true
max.retries=12
retry.backoff.ms=5000スナップショット(初回全件)→増分切替の移行テストを必ず実施し、重複/取りこぼしがないことを SQL とオフセットで検証します。再起動時は timestamp+incrementing モードが重複に強い選択肢です。
並列度は tasks.max で段階増加。Source はテーブル数やクエリ分割なしではスケールに限界があります。Sink はトピックパーティション数と DB ロック/インデックスで頭打ちになります。Connect REST API で状態監視を自動化し、失敗時は DLQ とログを突き合わせます。
Connect REST でのヘルス/再配置例
# 状態確認
GET /connectors/jdbc-sink-orders/status
# タスク再起動
POST /connectors/jdbc-sink-orders/tasks/0/restart
# 設定取得
GET /connectors/jdbc-source-orders/configCCDAK
問題 1
Confluent JDBC Source を用いて更新のあるテーブルを増分取り込みし、再起動時の重複を最小化したい。正しい前提/設定の組み合わせはどれか。
正解: A
更新のあるテーブルでは timestamp+incrementing が再起動時の重複抑制に有効。増分列は非NULLかつ単調増加で、両列にインデックスが必要。B/C/D はいずれも公式の要件や推奨に反する。
bulk と incrementing/timestamp はどう使い分けますか?
初回の全件取り込みは bulk、その後は incrementing(追記のみ)か timestamp/timestamp+incrementing(更新あり)へ切り替えます。切替時はオフセットと件数照合で重複/欠損検証を実施します。
insert.mode=upsert でエラーになります。何が必要ですか?
pk.mode と pk.fields の指定が必要です(record_key か record_value 由来)。PK や一意制約のないテーブルでは upsert は成立しません。
エラーをスキップしつつ原因追跡したいのですが?
errors.tolerance=all と DLQ(errors.deadletterqueue.topic.name)を併用します。コンテキストヘッダも有効化し、DLQ を監視対象に加えます。
NicheeLab編集部
データエンジニアリング・クラウド資格の専門家。Databricks・Snowflake等の認定資格を保有し、実務経験に基づいた問題作成・解説を行っています。NicheeLab運営。
Kafka Topic と Partition の基礎: 分散とスケーラビリティの要
CCDAK 対策と実務の両立を意識し、Topic/Partition/Replica/Consumer Group の役...
CCDAK 試験ガイド:出題範囲・配点・申込み・対策
Confluent Certified Developer for Apache Kafka (CCDAK) の出題範囲...
Confluent Certified Administrator (CCAAK) 対策: 出題範囲・配点の考え方・運用観点の要点
CCAAKに向けて、試験領域の押さえどころを運用目線で整理。プロダクションで通用する設定・監視・セキュリティの実践知を、...
Kafka の Replica と In-Sync Replicas を正しく設計する: 耐障害性と一貫性
レプリカとISRの仕組みを起点に、acks と min.insync.replicas、クリーン/アンクリーンリーダー選...
Kafka の Offset とコミット: ポジション管理と at-least-once の基礎
CCDAK 対策と実務の両立を意識して、Kafka コンシューマのオフセット管理とコミット戦略を整理。at-least-...