
Kafka JDBC Connectors: A Practical Guide to Source/Sink and Tuning

2026-04-19
NicheeLab Editorial Team

JDBC Connectors are the shortest route between an RDBMS and Kafka, but getting ingest mode, primary key/schema design, polling interval, or batch size wrong gets you stuck fast.

This article covers the basics of both Source and Sink along with the key points on throughput, duplicate prevention, and error handling, in a form that pays off for both the exam and real-world operations.

JDBC Source Basics and Design Modes

JDBC Source continuously ingests data from an RDBMS into Kafka. The four representative modes are bulk, incrementing, timestamp, and timestamp+incrementing. Incremental ingest assumes you have indexes on a non-NULL, monotonically increasing column (such as an ID) or an update-time column.

You specify tables via include/exclude lists or custom SQL (query). Custom SQL is flexible, but for safety make the boundary conditions explicit on the SQL side too, so they stay consistent with the connector's offset management (the incremental boundary column).

  • Mode selection guide: timestamp(+incrementing) for tables with updates, incrementing for append-only, and bulk for the initial snapshot.
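  • Set the polling frequency with poll.interval.ms and the rows per batch with batch.max.rows. Use timestamp.delay.interval.ms to hold back rows with very recent timestamps, so transactions that commit late with earlier timestamps are not missed.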
  • Use topic.prefix for topic naming. Parallelism is controlled by tasks.max (table-level splitting is the default).
  • You can override the Source-side producer settings (acks, compression.type, linger.ms, etc.) via producer.override.*.
Layer | Main purpose | Key settings/examples
Source (JDBC) | RDB → Kafka ingest | mode=timestamp+incrementing, incrementing.column.name, timestamp.column.name, poll.interval.ms, batch.max.rows, topic.prefix, table.include.list
Sink (JDBC) | Kafka → RDB write | insert.mode, pk.mode, pk.fields, auto.create, auto.evolve, delete.enabled, batch.size, max.retries, retry.backoff.ms, table.name.format
Worker/Common | Runtime/error handling | tasks.max, errors.tolerance, errors.deadletterqueue.topic.name, producer.override.*, consumer.override.*

Flow of JDBC Source/Sink connecting RDB and Kafka

[Figure. Boxes: RDBMS (OLTP, Postgres); Kafka Connect (JDBC Source/Sink); Kafka Topics; RDBMS/DWH (OLAP, MySQL etc.). Arrows: poll() (incremental); topics; consume().]

Minimal configuration example for JDBC Source (incremental ingest)

name=jdbc-source-orders
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
connection.url=jdbc:postgresql://db.example.local:5432/app
connection.user=app
connection.password=***
mode=timestamp+incrementing
incrementing.column.name=id
timestamp.column.name=updated_at
topic.prefix=db_
table.include.list=public.orders,public.customers
poll.interval.ms=10000
batch.max.rows=5000
timestamp.delay.interval.ms=5000
producer.override.compression.type=snappy
producer.override.linger.ms=20
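
The incremental modes described above can be pictured with a small simulation. This is only a sketch of the idea behind timestamp+incrementing, not the connector's actual code: the in-memory SQLite table, the integer timestamps, and the poll() helper are all assumptions made for illustration. It tracks a (timestamp, id) offset and holds back rows newer than a delay window, which is the role timestamp.delay.interval.ms plays.

```python
import sqlite3

# Hypothetical in-memory table standing in for public.orders.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE orders (id INTEGER PRIMARY KEY, updated_at INTEGER NOT NULL, status TEXT)"
)
conn.executemany(
    "INSERT INTO orders VALUES (?, ?, ?)",
    [(1, 100, "new"), (2, 100, "new"), (3, 105, "new")],
)


def poll(conn, last_ts, last_id, now, delay=5):
    """Return rows changed since the stored offset (last_ts, last_id).

    Rows with updated_at >= now - delay are left for a later poll.
    """
    upper = now - delay
    return conn.execute(
        """
        SELECT id, updated_at, status FROM orders
        WHERE updated_at < ?
          AND ((updated_at = ? AND id > ?) OR updated_at > ?)
        ORDER BY updated_at, id
        """,
        (upper, last_ts, last_id, last_ts),
    ).fetchall()


# First poll from an empty offset picks up every row old enough to be safe.
first = poll(conn, last_ts=-1, last_id=-1, now=200)
# The new offset is the (updated_at, id) pair of the last row returned.
last_id, last_ts, _ = first[-1]

# An update bumps updated_at, so the changed row is picked up by the next poll.
conn.execute("UPDATE orders SET status = 'paid', updated_at = 150 WHERE id = 2")
second = poll(conn, last_ts=last_ts, last_id=last_id, now=200)

print(first)
print(second)
```

Because the offset is a pair, rows sharing one timestamp are ordered by id, which is why this mode tolerates restarts better than timestamp alone.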

JDBC Sink Basics and Key Options

JDBC Sink writes Kafka records into an RDBMS. The write modes are insert, upsert, and update. Using upsert/update requires primary key information (pk.mode, pk.fields). To convert Kafka tombstones (value=null) into physical deletes, set delete.enabled=true, which in turn requires pk.mode=record_key because a tombstone carries no value to read the key from.

You can use auto.create and auto.evolve to handle schema mismatches, but in operations it's safer to declare the table design up front and manage changes via migrations. Tune batch writes with batch.size.

  • With insert.mode=upsert, no PK means an error. The PK is taken from the record key (pk.mode=record_key), the record value (pk.mode=record_value), or the Kafka coordinates (pk.mode=kafka).
  • delete.enabled=true converts tombstones into DELETE statements. It requires pk.mode=record_key; watch out for foreign key constraints on the DB side.
  • You can control the number of records pulled per fetch via consumer.override.max.poll.records (for backpressure tuning).

Minimal configuration example for JDBC Sink (UPSERT + delete support)

name=jdbc-sink-orders
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
connection.url=jdbc:postgresql://dw.example.local:5432/warehouse
connection.user=dw
connection.password=***
topics=db_orders
insert.mode=upsert
# delete.enabled=true requires the PK to come from the record key
pk.mode=record_key
pk.fields=id
auto.create=false
auto.evolve=false
delete.enabled=true
batch.size=3000
max.retries=10
retry.backoff.ms=3000
table.name.format=${topic}_stg
consumer.override.max.poll.records=2000
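
To make the upsert and tombstone semantics concrete, here is a minimal sketch using Python's built-in sqlite3 module. It is not the connector's implementation: apply_record() and the sample records are hypothetical, and the ON CONFLICT syntax assumes SQLite 3.24 or later. The point is that the key alone must identify the row, since a tombstone has no value.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE db_orders_stg (id INTEGER PRIMARY KEY, status TEXT)")


def apply_record(conn, key, value):
    """Apply one (key, value) record the way an upsert sink with deletes would."""
    if value is None:
        # Tombstone: only the key is available to locate the row.
        conn.execute("DELETE FROM db_orders_stg WHERE id = ?", (key,))
    else:
        conn.execute(
            "INSERT INTO db_orders_stg (id, status) VALUES (?, ?) "
            "ON CONFLICT(id) DO UPDATE SET status = excluded.status",
            (key, value["status"]),
        )


records = [
    (1, {"status": "new"}),
    (2, {"status": "new"}),
    (1, {"status": "paid"}),  # same key again: updated in place, no second row
    (2, None),                # tombstone: row 2 is removed
    (1, {"status": "paid"}),  # redelivery after a restart changes nothing
]
for key, value in records:
    apply_record(conn, key, value)

rows = conn.execute("SELECT id, status FROM db_orders_stg ORDER BY id").fetchall()
print(rows)
```

Replaying the last record leaves the table unchanged, which is what makes upsert a reasonable choice when redelivery after a restart is possible.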

Tuning Essentials Shared by Source and Sink

Throughput is the product of three things: DB-side indexes and I/O, the connector's batching/polling, and Kafka-side buffering. First, set up appropriate indexes on the DB (incremental column, update-time column, PK), then size ingest/write batches sensibly.

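On Source, producer tuning matters most; on Sink, it's the consumer. Connect passes these settings to the underlying client through producer.override.* and consumer.override.*, provided the worker's connector.client.config.override.policy allows overrides.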

  • DB: B-tree indexes on the incremental column, update-time column, and PK. Avoid projecting unnecessary columns.
  • Source: balance latency and load via the combination of poll.interval.ms and batch.max.rows. For large catch-up loads, combine with producer.override.linger.ms / compression.
  • Sink: size batch.size to the DB's transaction capacity. Control fetches with consumer.override.max.poll.records.
  • Parallelism: pushing tasks.max too high causes DB lock contention and connection exhaustion. Plan it together with your connection pool and the DB's max_connections.
  • Network: when RTT to the Kafka broker is high, absorb it with linger.ms and batching.
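
The parallelism point can be checked with simple arithmetic before raising tasks.max. The helper below is a rough planning sketch, not something the connector provides: connection_budget() and its parameters are hypothetical, and it assumes roughly one JDBC connection per task plus a configurable overhead per connector.

```python
def connection_budget(connectors, max_connections, reserved_for_apps,
                      overhead_per_connector=1):
    """Compare worst-case Connect connections with what the DB can spare.

    connectors maps a connector name to its tasks.max value. One connection
    per task plus overhead_per_connector is an assumption for planning only.
    """
    needed = sum(tasks_max + overhead_per_connector
                 for tasks_max in connectors.values())
    available = max_connections - reserved_for_apps
    return needed, available, needed <= available


needed, available, fits = connection_budget(
    {"jdbc-source-orders": 2, "jdbc-sink-orders": 6},
    max_connections=100,    # the DB's max_connections
    reserved_for_apps=80,   # connections the OLTP applications already use
)
print(needed, available, fits)
```

If the result does not fit, lowering tasks.max is usually cheaper than raising the DB limit.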

Common overrides (Source=Producer / Sink=Consumer)

# Source (JDBC Source) side: producer settings
producer.override.acks=all
producer.override.compression.type=zstd
producer.override.linger.ms=30
producer.override.batch.size=131072

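# Sink (JDBC Sink) side: consumer settings
# delete.enabled is a connector setting, not a consumer override; it requires pk.mode=record_key
delete.enabled=true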
consumer.override.max.poll.records=1000
consumer.override.max.partition.fetch.bytes=5242880
consumer.override.fetch.max.bytes=52428800

Schema and Type Mapping Considerations

JDBC Source maps RDB types to Kafka Connect schemas; JDBC Sink maps them in reverse. Understanding the conversion rules for DECIMAL/NUMERIC, TIMESTAMP/TIME/DATE, and BLOB/CLOB helps prevent incidents. When using a Schema Registry, plan compatibility settings and field evolution (nullable conversions, default values).

Numeric mapping is controlled via numeric.mapping (best_fit, precision_only, etc.). Watch out for timezone representation inconsistencies in date/time, and on the Source side confirm the timestamp column's precision (seconds/milliseconds/microseconds) and DB driver behavior.

  • DECIMAL/NUMERIC: watch for precision/scale loss. Consider falling back to string as needed.
  • TIMESTAMP: millisecond precision is typical. Document the application's timezone conventions clearly.
  • Schema evolution is safest when governed via migrations + compatibility rules, not auto.evolve.
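
The precision-loss risk for DECIMAL/NUMERIC is easy to reproduce outside Kafka. The sketch below uses only Python's decimal module; the sample value is made up, and the float conversion stands in for what a 64-bit floating-point mapping would carry.

```python
from decimal import Decimal

# A value that fits NUMERIC(30, 10) but cannot be held exactly in a 64-bit float.
original = Decimal("12345678901234567890.1234567890")

as_float = float(original)               # what a FLOAT64 field would carry
round_tripped = Decimal(repr(as_float))  # what the sink side would see
as_string = str(original)                # string fallback keeps every digit

print(round_tripped == original)
print(Decimal(as_string) == original)
```

The first comparison fails and the second succeeds, which is the trade-off behind falling back to string for high-precision columns.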

Type mapping/SMT example (field-name normalization and stripping unneeded fields)

# SMT example for JDBC Source (drop unneeded columns + normalize a field name)
transforms=dropCols,renameTs
# ReplaceField drops fields; MaskField would only blank out their values.
# Older Kafka versions name this option blacklist instead of exclude.
transforms.dropCols.type=org.apache.kafka.connect.transforms.ReplaceField$Value
transforms.dropCols.exclude=internal_note,temp_flag
transforms.renameTs.type=org.apache.kafka.connect.transforms.ReplaceField$Value
transforms.renameTs.renames=updatedat:updated_at

# Numeric mapping (Source)
numeric.mapping=best_fit

Error Handling and Reprocessing Design

Connect's standard error handling combines errors.tolerance with a Dead Letter Queue (DLQ); the DLQ is available to sink connectors only. Even with errors.tolerance=all, which skips failed records, the operational standard is to route them to a DLQ with context headers attached so you can trace root causes later.

Design for transient DB outages and lock contention with retries/backoff, then explicitly fail and escalate to operations once the threshold is exceeded.

  • errors.tolerance=all + errors.deadletterqueue.topic.name=dlq.topic + errors.deadletterqueue.context.headers.enable=true
  • Retries: controlled via max.retries / retry.backoff.ms (JDBC Sink) or errors.retry.* (Connect-wide).
  • Recommend a dedicated DLQ topic with compaction disabled. Combine it with monitoring and observability (Connect REST status/metrics).

DLQ and retry configuration example (Sink)

errors.tolerance=all
errors.deadletterqueue.topic.name=dlq.jdbc.sink.orders
errors.deadletterqueue.context.headers.enable=true
max.retries=12
retry.backoff.ms=5000
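
When context headers are enabled, each DLQ record carries headers prefixed with __connect.errors. that describe where the failure happened. The helper below is a small sketch for reading them; summarize_dlq_headers() and the sample values are hypothetical, and it expects headers as a list of (key, bytes) pairs, the shape most Kafka client libraries return.

```python
def summarize_dlq_headers(headers):
    """Pick out the Connect error-context headers from one DLQ record.

    Only keys with the __connect.errors. prefix are kept; the prefix is
    stripped and the byte values are decoded as UTF-8 text.
    """
    prefix = "__connect.errors."
    return {
        key[len(prefix):]: value.decode("utf-8", errors="replace")
        for key, value in headers
        if key.startswith(prefix) and value is not None
    }


# Hypothetical header values for one failed record.
sample = [
    ("__connect.errors.topic", b"db_orders"),
    ("__connect.errors.partition", b"3"),
    ("__connect.errors.offset", b"1042"),
    ("__connect.errors.connector.name", b"jdbc-sink-orders"),
    ("__connect.errors.stage", b"VALUE_CONVERTER"),
    ("__connect.errors.exception.message", b"Unknown magic byte!"),
    ("trace-id", b"abc"),  # unrelated application header, ignored
]
summary = summarize_dlq_headers(sample)
print(summary["topic"], summary["partition"], summary["offset"], summary["stage"])
```

The topic, partition, and offset fields point back to the original record, which is what makes reprocessing after a fix possible.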

Operational Testing and Troubleshooting (from a CCDAK Perspective)

Always run the migration test from snapshot (initial full load) → incremental switch, and verify the absence of duplicates and missed records via SQL and offsets. On restart, timestamp+incrementing mode is the more duplicate-resistant choice.

Increase parallelism gradually via tasks.max. Without table count or query partitioning, Source has scaling limits. Sink tops out at the topic partition count and DB locks/indexes. Automate status monitoring through the Connect REST API and cross-reference DLQ with logs on failure.

  • Run the initial load during an offline window, with DB backups and a rollback procedure ready.
  • Protect offsets: Source offsets live in the Connect internal offsets topic (offset.storage.topic), so set offset.storage.replication.factor high enough to survive a broker failure.
  • Common failures: upsert without a PK set, NULL values in the incremental column, time precision mismatches, and connection exhaustion from too many tasks.
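
The duplicate and missed-record check after a snapshot-to-incremental switch can be as simple as comparing primary keys from both sides. The sketch below assumes the keys have already been fetched with plain SQL; reconcile() and the sample ids are hypothetical.

```python
from collections import Counter


def reconcile(source_ids, sink_ids):
    """Compare primary keys read from the source table and the sink table.

    Returns (missing, duplicated): keys absent from the sink, and keys that
    appear more than once in the sink.
    """
    sink_counts = Counter(sink_ids)
    missing = sorted(set(source_ids) - set(sink_counts))
    duplicated = sorted(key for key, count in sink_counts.items() if count > 1)
    return missing, duplicated


missing, duplicated = reconcile(
    source_ids=[1, 2, 3, 4, 5],
    sink_ids=[1, 2, 2, 3, 5],
)
print(missing, duplicated)
```

For large tables, the same comparison is better pushed into the database with GROUP BY and anti-join queries rather than done in memory.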

Connect REST status/restart examples

# Check status
GET /connectors/jdbc-sink-orders/status
# Restart a task
POST /connectors/jdbc-sink-orders/tasks/0/restart
# Fetch configuration
GET /connectors/jdbc-source-orders/config
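
The status and restart calls above can be automated with the standard library alone. This is a sketch under assumptions: the worker address in CONNECT_URL is hypothetical, the helper names are made up, and sample_status is a hand-written document following the shape of a /status response. The network functions are defined but not called here.

```python
import json
import urllib.request

CONNECT_URL = "http://localhost:8083"  # assumed Connect worker address


def failed_task_ids(status):
    """Return the ids of tasks reported as FAILED in a /status response body."""
    return [t["id"] for t in status.get("tasks", []) if t.get("state") == "FAILED"]


def fetch_status(name):
    """GET /connectors/{name}/status and parse the JSON body."""
    with urllib.request.urlopen(f"{CONNECT_URL}/connectors/{name}/status") as resp:
        return json.load(resp)


def restart_task(name, task_id):
    """POST /connectors/{name}/tasks/{id}/restart and return the HTTP status code."""
    req = urllib.request.Request(
        f"{CONNECT_URL}/connectors/{name}/tasks/{task_id}/restart", method="POST"
    )
    with urllib.request.urlopen(req) as resp:
        return resp.status


# Offline example with a hand-written status document.
sample_status = {
    "name": "jdbc-sink-orders",
    "connector": {"state": "RUNNING", "worker_id": "10.0.0.1:8083"},
    "tasks": [
        {"id": 0, "state": "RUNNING", "worker_id": "10.0.0.1:8083"},
        {"id": 1, "state": "FAILED", "worker_id": "10.0.0.2:8083", "trace": "..."},
    ],
    "type": "sink",
}
print(failed_task_ids(sample_status))
```

In a monitoring job, the ids returned by failed_task_ids(fetch_status(name)) would be passed to restart_task(), ideally with a cap on restart attempts so a persistent failure still escalates.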

Check with a Question

CCDAK

Question 1

You want to incrementally ingest a table with updates via Confluent JDBC Source and minimize duplicates on restart. Which combination of assumptions/settings is correct?

  A. Use mode=timestamp+incrementing and put indexes on the non-NULL incremental column and the update-time column
  B. With mode=incrementing, NULL values in the incremental column are automatically corrected
  C. As long as mode=timestamp is set, delta detection is unaffected even without an index on the update-time column
  D. Setting mode=bulk with a shorter poll.interval.ms behaves equivalently to incremental

Correct answer: A

For tables with updates, timestamp+incrementing effectively suppresses duplicates on restart. The incremental column must be non-NULL and monotonically increasing, and both columns should be indexed so each poll stays cheap. B/C/D each contradict the official requirements or recommendations.

Frequently Asked Questions

How should I choose between bulk and incrementing/timestamp modes?

Use bulk for the initial full snapshot, then switch to incrementing (append-only) or timestamp/timestamp+incrementing (when updates occur). When switching, validate for duplicates and missing records by reconciling offsets and row counts.

I get errors with insert.mode=upsert. What do I need?

You need to specify pk.mode and pk.fields (from record_key or record_value). Upsert cannot work against tables without a primary key or unique constraint.

How can I skip errors while still being able to trace root causes?

Combine errors.tolerance=all with a DLQ (errors.deadletterqueue.topic.name). Enable context headers and add the DLQ to your monitoring scope.

Author

NicheeLab Editorial Team

NicheeLab editorial team focused on data engineering and cloud certification learning. Content is structured around practical study needs and official exam domains.

