Kafka

Kafka JDBC Connectors: Source/Sink とチューニング実践

2026-04-19
NicheeLab編集部

JDBC Connectors は RDBMS と Kafka をつなぐ最短ルートですが、取り込みモードや主キー/スキーマ設計、ポーリング間隔やバッチサイズなどの設定を外すとすぐに詰まります。

本稿は Source/Sink 双方の基本から、スループット・重複防止・エラー処理の勘所までを、試験と実務の両面で役に立つ形でまとめました。

JDBC Source の基本と設計モード

JDBC Source は RDBMS から Kafka へ継続的にデータを取り込みます。代表的なモードは bulk、incrementing、timestamp、timestamp+incrementing の4種。増分取り込みでは、非NULLかつ単調増加する列(IDなど)や更新時刻列にインデックスを張るのが前提です。

テーブル指定は include/exclude リスト、またはカスタム SQL(query)で行います。カスタム SQL を使うと柔軟ですが、コネクタのオフセット管理(増分境界列)と整合が取れるように SQL 側でも境界条件を明示するのが安全です。

  • モード選択の指針: 書き換えがあるテーブルは timestamp(+incrementing)、追記のみは incrementing、初回スナップショットは bulk。
  • ポーリングは poll.interval.ms、1回の取得件数は batch.max.rows。オフセット遅延の吸収に timestamp.delay.interval.ms。
  • トピック名付与は topic.prefix。並列度は tasks.max(テーブル単位の分割が基本)。
  • producer.override.* で Source 側のプロデューサ設定(acks、compression.type、linger.ms など)を上書き可能。
レイヤ主目的主なキー設定/例
Source(JDBC)RDB → Kafka 取り込みmode=timestamp+incrementing, incrementing.column.name, timestamp.column.name, poll.interval.ms, batch.max.rows, topic.prefix, table.include.list
Sink(JDBC)Kafka → RDB 書き込みinsert.mode, pk.mode, pk.fields, auto.create, auto.evolve, delete.enabled, batch.size, max.retries, retry.backoff.ms, table.name.format
Worker/共通実行基盤・エラー処理tasks.max, errors.tolerance, errors.deadletterqueue.topic.name, producer.override.*, consumer.override.*

RDB と Kafka をつなぐ JDBC Source/Sink の流れ

poll() (incremental)consume()topicsRDBMS(OLTP, Postgres)Kafka Connect(JDBC Source/Sink)RDBMS/DWH(OLAP, MySQL etc.)Kafka Topics

JDBC Source(増分取り込み)の最小構成例

name=jdbc-source-orders
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
connection.url=jdbc:postgresql://db.example.local:5432/app
connection.user=app
connection.password=***
mode=timestamp+incrementing
incrementing.column.name=id
timestamp.column.name=updated_at
topic.prefix=db_
table.include.list=public.orders,public.customers
poll.interval.ms=10000
batch.max.rows=5000
timestamp.delay.interval.ms=5000
producer.override.compression.type=snappy
producer.override.linger.ms=20

JDBC Sink の基本と主なオプション

JDBC Sink は Kafka のレコードを RDBMS に書き込みます。書き込みモードは insert、upsert、update。upsert/update を使う場合は主キー情報(pk.mode、pk.fields)が必須です。Kafka の tombstone(value=null)を物理削除に変換したい場合は delete.enabled を有効にします。

スキーマ不一致に備えて auto.create と auto.evolve を使う選択肢はありますが、運用ではテーブル設計を明示し、変更はマイグレーションで管理するのが安全です。バッチ書き込みは batch.size で調整します。

  • insert.mode=upsert のとき、PK 未設定ではエラー。PK は record_key か record_value、または両方から指定。
  • delete.enabled=true は tombstone を DELETE 文に変換。PK 必須、DB 側の外部キー制約に注意。
  • consumer.override.max.poll.records で 1 回に引き当てるレコード数を制御可能(バックプレッシャ調整)。

JDBC Sink(UPSERT + 削除対応)の最小構成例

name=jdbc-sink-orders
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
connection.url=jdbc:postgresql://dw.example.local:5432/warehouse
connection.user=dw
connection.password=***
topics=db_orders
insert.mode=upsert
pk.mode=record_value
pk.fields=id
auto.create=false
auto.evolve=false
delete.enabled=true
batch.size=3000
max.retries=10
retry.backoff.ms=3000
table.name.format=${topic}_stg
consumer.override.max.poll.records=2000

Source/Sink 共通のチューニング要点

スループットは「DB 側のインデックスとI/O」「コネクタのバッチ/ポーリング」「Kafka 側のバッファリング」の掛け算で決まります。まずは DB に適切なインデックス(増分列・更新時刻列・PK)を用意し、取り込み/書き込みのバッチを無理なくまとめます。

Source はプロデューサ、Sink はコンシューマのチューニングが重要です。Connect は override 機構で下層クライアントへ委譲できます。

  • DB: 増分列・更新時刻列・PK に B-tree インデックス。不要なカラム投影を避ける。
  • Source: poll.interval.ms と batch.max.rows の組み合わせで遅延と負荷を両立。大量追いつき時は producer.override.linger.ms/ compression を併用。
  • Sink: batch.size を DB のトランザクション許容量に合わせる。consumer.override.max.poll.records でフェッチを制御。
  • 並列度: tasks.max の上げ過ぎは DB ロック競合や接続枯渇を招く。接続プールや DB の max_connections と合わせて計画。
  • ネットワーク: Kafka ブローカとの RTT が大きい場合は linger.ms とバッチで吸収。

よく使う override(Source=Producer / Sink=Consumer)

# Source(JDBC Source)側: プロデューサ設定
producer.override.acks=all
producer.override.compression.type=zstd
producer.override.linger.ms=30
producer.override.batch.size=131072

# Sink(JDBC Sink)側: コンシューマ設定
delete.enabled=true
consumer.override.max.poll.records=1000
consumer.override.max.partition.fetch.bytes=5242880
consumer.override.fetch.max.bytes=52428800

スキーマと型マッピングの注意点

JDBC Source は RDB の型を Kafka Connect のスキーマへ、JDBC Sink はその逆方向にマッピングします。DECIMAL/NUMERIC、TIMESTAMP/TIME/DATE、BLOB/CLOB などは変換規則を理解しておくと事故を防げます。スキーマレジストリを使う場合は、互換性設定とフィールドの進化(nullable 化、デフォルト付与)を計画します。

数値のマッピングは numeric.mapping(best_fit, precision_only など)で制御できます。日時はタイムゾーン表現の揺れに注意し、Source ではタイムスタンプ列の精度(秒/ミリ/マイクロ)と DB ドライバの挙動を確認します。

  • DECIMAL/NUMERIC は精度・スケール喪失に注意。必要に応じて string に逃がす設計も検討。
  • TIMESTAMP はミリ秒精度が一般的。アプリ側のタイムゾーン規約を明文化。
  • スキーマ進化は auto.evolve ではなく、マイグレーション + 互換性ルールで統制するのが安全。

型マッピング/SMT の例(日時正規化と不要フィールド除去)

# JDBC Source の SMT 例(不要列除去 + フィールド名正規化)
transforms=dropCols,renameTs
transforms.dropCols.type=org.apache.kafka.connect.transforms.MaskField$Value
transforms.dropCols.blacklist=internal_note,temp_flag
transforms.renameTs.type=org.apache.kafka.connect.transforms.ReplaceField$Value
transforms.renameTs.renames=updatedat:updated_at

# 数値マッピング(Source)
numeric.mapping=best_fit

エラーハンドリングと再処理設計

Connect の標準エラー処理は errors.tolerance と Dead Letter Queue(DLQ)で構成します。全スキップ(all)を選ぶ場合でも、後で原因追跡できるように DLQ へ流し、ヘッダにコンテキストを付けるのが実務定石です。

一時的な DB 停止やロック競合にはリトライ/バックオフで耐える設計にし、上限を超えたら明示的に失敗させてオペレーションへエスカレーションします。

  • errors.tolerance=all + errors.deadletterqueue.topic.name=dlq.topic + errors.deadletterqueue.context.headers.enable=true
  • 再試行: max.retries / retry.backoff.ms(JDBC Sink)や errors.retry.*(Connect 共通)で制御。
  • DLQ はコンパクション無効の専用トピックを推奨。監視と可観測性(Connect REST の status/metrics)を組み合わせる。

DLQ とリトライの設定例(Sink)

errors.tolerance=all
errors.deadletterqueue.topic.name=dlq.jdbc.sink.orders
errors.deadletterqueue.context.headers.enable=true
max.retries=12
retry.backoff.ms=5000

運用テストとトラブルシューティング(CCDAK 観点)

スナップショット(初回全件)→増分切替の移行テストを必ず実施し、重複/取りこぼしがないことを SQL とオフセットで検証します。再起動時は timestamp+incrementing モードが重複に強い選択肢です。

並列度は tasks.max で段階増加。Source はテーブル数やクエリ分割なしではスケールに限界があります。Sink はトピックパーティション数と DB ロック/インデックスで頭打ちになります。Connect REST API で状態監視を自動化し、失敗時は DLQ とログを突き合わせます。

  • 初回投入はオフライン時間帯に実施し、DB バックアップとロールバック手順を用意。
  • オフセットバックアップ(Kafka 内部トピックの複製要件: replication.factor 適正化)。
  • よくある失敗: PK 未設定の upsert、増分列に NULL、時刻精度ミスマッチ、タスク過多による接続枯渇。

Connect REST でのヘルス/再配置例

# 状態確認
GET /connectors/jdbc-sink-orders/status
# タスク再起動
POST /connectors/jdbc-sink-orders/tasks/0/restart
# 設定取得
GET /connectors/jdbc-source-orders/config

問題で確認

CCDAK

問題 1

Confluent JDBC Source を用いて更新のあるテーブルを増分取り込みし、再起動時の重複を最小化したい。正しい前提/設定の組み合わせはどれか。

  1. A. mode=timestamp+incrementing を使い、非NULLの増分列と更新時刻列にインデックスを張る
  2. B. mode=incrementing のままでも、増分列がNULLを含んでいても自動的に補正される
  3. C. mode=timestamp にしておけば、更新時刻列にインデックスがなくても差分検出は影響を受けない
  4. D. mode=bulk にして poll.interval.ms を短くすれば増分と同等に動作する

正解: A

更新のあるテーブルでは timestamp+incrementing が再起動時の重複抑制に有効。増分列は非NULLかつ単調増加で、両列にインデックスが必要。B/C/D はいずれも公式の要件や推奨に反する。

よくある質問

bulk と incrementing/timestamp はどう使い分けますか?

初回の全件取り込みは bulk、その後は incrementing(追記のみ)か timestamp/timestamp+incrementing(更新あり)へ切り替えます。切替時はオフセットと件数照合で重複/欠損検証を実施します。

insert.mode=upsert でエラーになります。何が必要ですか?

pk.mode と pk.fields の指定が必要です(record_key か record_value 由来)。PK や一意制約のないテーブルでは upsert は成立しません。

エラーをスキップしつつ原因追跡したいのですが?

errors.tolerance=all と DLQ(errors.deadletterqueue.topic.name)を併用します。コンテキストヘッダも有効化し、DLQ を監視対象に加えます。

この記事で学んだ内容を問題で確認しましょう

16,000問以上の問題で実力チェック

無料で問題を解いてみる
この記事の著者

NicheeLab編集部

データエンジニアリング・クラウド資格の専門家。Databricks・Snowflake等の認定資格を保有し、実務経験に基づいた問題作成・解説を行っています。NicheeLab運営。


関連記事
Kafka

Kafka Topic と Partition の基礎: 分散とスケーラビリティの要

CCDAK 対策と実務の両立を意識し、Topic/Partition/Replica/Consumer Group の役...

Kafka

CCDAK 試験ガイド:出題範囲・配点・申込み・対策

Confluent Certified Developer for Apache Kafka (CCDAK) の出題範囲...

Kafka

Confluent Certified Administrator (CCAAK) 対策: 出題範囲・配点の考え方・運用観点の要点

CCAAKに向けて、試験領域の押さえどころを運用目線で整理。プロダクションで通用する設定・監視・セキュリティの実践知を、...

Kafka

Kafka の Replica と In-Sync Replicas を正しく設計する: 耐障害性と一貫性

レプリカとISRの仕組みを起点に、acks と min.insync.replicas、クリーン/アンクリーンリーダー選...

Kafka

Kafka の Offset とコミット: ポジション管理と at-least-once の基礎

CCDAK 対策と実務の両立を意識して、Kafka コンシューマのオフセット管理とコミット戦略を整理。at-least-...

Kafkaの記事一覧 (101件)
© 2026 NicheeLab All rights reserved.