JDBC Connectors are the shortest route between an RDBMS and Kafka, but getting ingest mode, primary key/schema design, polling interval, or batch size wrong gets you stuck fast.
This article covers the basics of both Source and Sink along with the key points on throughput, duplicate prevention, and error handling, in a form that pays off for both the exam and real-world operations.
JDBC Source continuously ingests data from an RDBMS into Kafka by polling. It has four modes: bulk, incrementing, timestamp, and timestamp+incrementing. Incremental ingest assumes a non-NULL, strictly increasing column (such as an auto-increment ID) and/or a non-NULL update-time column that is set on every change, and both should be indexed.
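The boundary logic of timestamp+incrementing can be sketched with Python's built-in sqlite3. The WHERE clause follows the shape the connector is generally described as using (rows after the stored (timestamp, id) offset and before "now minus timestamp.delay.interval.ms"); the table, the epoch-millisecond timestamps, and the LIMIT standing in for batch.max.rows are assumptions of this sketch, not the connector's code.

```python
import sqlite3

# Boundary predicate modeled on the timestamp+incrementing query shape: rows after the
# stored (timestamp, id) offset and strictly before an upper bound of "now - delay".
# LIMIT stands in for batch.max.rows in this sketch.
BOUNDARY_SQL = """
SELECT id, updated_at, status FROM orders
WHERE updated_at < :upper
  AND ((updated_at = :last_ts AND id > :last_id) OR updated_at > :last_ts)
ORDER BY updated_at, id
LIMIT :batch
"""


def make_db():
    """Hypothetical orders table; updated_at is epoch milliseconds for simplicity."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE orders (id INTEGER PRIMARY KEY, updated_at INTEGER NOT NULL, status TEXT)"
    )
    # Composite index on the boundary columns, matching the ORDER BY.
    conn.execute("CREATE INDEX idx_orders_ts_id ON orders (updated_at, id)")
    return conn


def poll(conn, offset, now_ms, delay_ms, batch_max_rows):
    """Run one poll; offset is (last_ts, last_id). Returns (rows, new_offset)."""
    last_ts, last_id = offset
    rows = conn.execute(
        BOUNDARY_SQL,
        {
            "upper": now_ms - delay_ms,
            "last_ts": last_ts,
            "last_id": last_id,
            "batch": batch_max_rows,
        },
    ).fetchall()
    if rows:
        # The last row of the ordered batch becomes the new offset.
        offset = (rows[-1][1], rows[-1][0])
    return rows, offset
```

The `id > last_id` tie-breaker lets a poll resume in the middle of a group of rows that share one timestamp without re-reading or skipping them, and an updated row is picked up again because its new `updated_at` is greater than the stored offset.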
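You select tables with include/exclude lists, or supply custom SQL via query. Custom SQL is flexible, but in incremental modes the connector appends its own WHERE clause on the boundary columns, so the query must either contain no WHERE clause of its own (wrap joins and filters in a subquery) or handle the incremental condition itself. Either way, the boundary columns must appear in the result so the connector's offsets stay consistent.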
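Why a WHERE inside a custom query collides with the appended boundary clause can be shown with a small sqlite3 sketch. The appended suffix is a simplified timestamp-only predicate (an assumption for illustration; the connector builds its own clause), and the tables and query are hypothetical.

```python
import sqlite3

# Simplified incremental predicate appended after the user's query (assumption).
INCREMENTAL_SUFFIX = " WHERE updated_at > ? ORDER BY updated_at, id"

# Hypothetical custom query that filters with its own WHERE: a second WHERE breaks it.
RAW_QUERY = (
    "SELECT o.id AS id, o.updated_at AS updated_at, c.region AS region "
    "FROM orders o JOIN customers c ON o.customer_id = c.id WHERE c.region = 'jp'"
)

# The same logic wrapped in a subquery, so the appended WHERE applies to the outer SELECT.
WRAPPED_QUERY = f"SELECT * FROM ({RAW_QUERY}) AS src"


def make_db():
    conn = sqlite3.connect(":memory:")
    conn.executescript(
        """
        CREATE TABLE orders (id INTEGER PRIMARY KEY, customer_id INTEGER, updated_at INTEGER NOT NULL);
        CREATE TABLE customers (id INTEGER PRIMARY KEY, region TEXT);
        INSERT INTO customers VALUES (10, 'jp'), (11, 'us');
        INSERT INTO orders VALUES (1, 10, 1000), (2, 11, 1000), (3, 10, 2000);
        """
    )
    return conn


def run_incremental(conn, query, last_ts):
    """Append the boundary clause to the query and run it from the given offset."""
    return conn.execute(query + INCREMENTAL_SUFFIX, (last_ts,)).fetchall()
```

Running `RAW_QUERY` this way fails with a syntax error, while `WRAPPED_QUERY` returns only the rows past the offset.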
| Layer | Main purpose | Key settings/examples |
|---|---|---|
| Source (JDBC) | RDB → Kafka ingest | mode=timestamp+incrementing, incrementing.column.name, timestamp.column.name, poll.interval.ms, batch.max.rows, topic.prefix, table.include.list |
| Sink (JDBC) | Kafka → RDB write | insert.mode, pk.mode, pk.fields, auto.create, auto.evolve, delete.enabled, batch.size, max.retries, retry.backoff.ms, table.name.format |
| Worker/Common | Runtime/error handling | tasks.max, errors.tolerance, errors.deadletterqueue.topic.name, producer.override.*, consumer.override.* |
Figure: Flow of JDBC Source/Sink connecting an RDB and Kafka
Minimal configuration example for JDBC Source (incremental ingest)
```properties
name=jdbc-source-orders
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
connection.url=jdbc:postgresql://db.example.local:5432/app
connection.user=app
connection.password=***
mode=timestamp+incrementing
incrementing.column.name=id
timestamp.column.name=updated_at
topic.prefix=db_
table.include.list=public.orders,public.customers
poll.interval.ms=10000
batch.max.rows=5000
timestamp.delay.interval.ms=5000
producer.override.compression.type=snappy
producer.override.linger.ms=20
```
JDBC Sink writes Kafka records into an RDBMS. The write modes are insert, upsert, and update. Upsert and update need primary key information (pk.mode, pk.fields). To turn Kafka tombstones (value=null) into physical deletes, enable delete.enabled; this requires pk.mode=record_key, so the record key must carry the primary key.
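You can use auto.create and auto.evolve to absorb schema mismatches, but in production it is safer to create tables up front and manage changes through migrations. Tune write batches with batch.size; each put() call receives at most the consumer's max.poll.records records, so a batch.size above that value has no further effect.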
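What upsert plus delete.enabled does to the target table can be modeled with sqlite3. The sketch treats each record as a (key, value) pair with the primary key in the key, as pk.mode=record_key would; a None value is a tombstone that becomes a DELETE. SQLite's INSERT ... ON CONFLICT DO UPDATE stands in for the dialect-specific upsert the connector generates (PostgreSQL's syntax is similar), and the table and columns are hypothetical.

```python
import sqlite3

UPSERT_SQL = """
INSERT INTO orders_stg (id, status, amount) VALUES (:id, :status, :amount)
ON CONFLICT (id) DO UPDATE SET status = excluded.status, amount = excluded.amount
"""
DELETE_SQL = "DELETE FROM orders_stg WHERE id = :id"


def make_db():
    conn = sqlite3.connect(":memory:")
    # Target table created up front (auto.create=false); id is the primary key.
    conn.execute("CREATE TABLE orders_stg (id INTEGER PRIMARY KEY, status TEXT, amount REAL)")
    return conn


def write_batch(conn, records):
    """records: list of (key, value); key is {'id': ...}, value is a dict or None (tombstone)."""
    with conn:  # one transaction per batch; rolled back if any statement fails
        for key, value in records:
            if value is None:
                conn.execute(DELETE_SQL, key)
            else:
                # The key's id wins over any id in the value (record_key semantics).
                conn.execute(UPSERT_SQL, {**value, **key})
```

Because both statements are keyed on the primary key, replaying the same batch after a restart leaves the table unchanged, which is what makes upsert tolerant of redelivery.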
Minimal configuration example for JDBC Sink (upsert + delete support)
```properties
name=jdbc-sink-orders
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
connection.url=jdbc:postgresql://dw.example.local:5432/warehouse
connection.user=dw
connection.password=***
topics=db_orders
insert.mode=upsert
# delete.enabled=true requires pk.mode=record_key, so the PK must be in the record key.
# JDBC Source emits null keys by default; set one upstream (for example with the ValueToKey SMT).
pk.mode=record_key
pk.fields=id
auto.create=false
auto.evolve=false
delete.enabled=true
batch.size=3000
max.retries=10
retry.backoff.ms=3000
table.name.format=${topic}_stg
consumer.override.max.poll.records=2000
```
Throughput depends on three things: DB-side indexes and I/O, the connector's batching and polling, and Kafka-side buffering. Start by indexing the DB appropriately (incrementing column, update-time column, PK), then size the ingest and write batches sensibly.
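One way to confirm that an index actually serves the incremental lookup is to read the query plan. This sketch uses SQLite's EXPLAIN QUERY PLAN as a stand-in for your database's own EXPLAIN (PostgreSQL, MySQL, and others have different syntax and output); the table and index names are hypothetical.

```python
import sqlite3

# Simplified form of the lookup a timestamp-based poll performs (hypothetical table).
INCREMENTAL_SQL = "SELECT * FROM orders WHERE updated_at > ? ORDER BY updated_at, id"


def plan_details(conn, sql, params=()):
    """Return the detail column of SQLite's EXPLAIN QUERY PLAN output."""
    return [row[-1] for row in conn.execute("EXPLAIN QUERY PLAN " + sql, params)]


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, updated_at INTEGER NOT NULL, status TEXT)")

# Without an index: full table scan plus a temporary B-tree to sort the result.
before = plan_details(conn, INCREMENTAL_SQL, (0,))

# With a composite index on the boundary columns: an index range search on updated_at.
conn.execute("CREATE INDEX idx_orders_ts_id ON orders (updated_at, id)")
after = plan_details(conn, INCREMENTAL_SQL, (0,))

print("before:", before)
print("after: ", after)
```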
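On the Source side, producer tuning matters most; on the Sink side, consumer tuning does. Connect passes producer.override.* and consumer.override.* settings through to the underlying client, provided the worker's connector.client.config.override.policy allows them.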
Common overrides (Source = producer / Sink = consumer)
```properties
# Source (JDBC Source) side: producer settings
producer.override.acks=all
producer.override.compression.type=zstd
producer.override.linger.ms=30
producer.override.batch.size=131072
# Sink (JDBC Sink) side: consumer settings
consumer.override.max.poll.records=1000
consumer.override.max.partition.fetch.bytes=5242880
consumer.override.fetch.max.bytes=52428800
```
JDBC Source maps RDB types to Kafka Connect schemas, and JDBC Sink maps them back. Knowing the conversion rules for DECIMAL/NUMERIC, TIMESTAMP/TIME/DATE, and BLOB/CLOB prevents incidents. With a Schema Registry, plan compatibility settings and field evolution (changes to nullability, default values) in advance.
Numeric mapping is controlled with numeric.mapping (none by default, precision_only, best_fit, etc.). Watch for inconsistent timezone handling in date/time values, and on the Source side check the timestamp column's precision (seconds/milliseconds/microseconds) and the JDBC driver's behavior.
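With the default numeric.mapping=none, DECIMAL/NUMERIC columns typically travel as Connect's Decimal logical type: the unscaled integer as big-endian two's-complement bytes, with the scale kept in the schema rather than the bytes. Connect's Timestamp logical type carries epoch milliseconds. The Python sketch below mimics both encodings (it is an illustration, not the converter's code) to show how values round-trip and where microsecond precision disappears.

```python
from datetime import datetime, timedelta, timezone
from decimal import Decimal

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def encode_connect_decimal(value: Decimal, scale: int) -> bytes:
    """Encode like Connect's Decimal logical type: unscaled integer as minimal
    big-endian two's-complement bytes (the form Java's BigInteger.toByteArray() returns)."""
    unscaled = int(value.scaleb(scale))  # assumes value has at most `scale` fractional digits
    length = (unscaled + (unscaled < 0)).bit_length() // 8 + 1
    return unscaled.to_bytes(length, "big", signed=True)


def decode_connect_decimal(raw: bytes, scale: int) -> Decimal:
    """Inverse of encode_connect_decimal; the scale comes from the schema, not the bytes."""
    return Decimal(int.from_bytes(raw, "big", signed=True)).scaleb(-scale)


def to_connect_timestamp(ts: datetime) -> int:
    """Epoch milliseconds, as carried by Connect's Timestamp logical type.
    Sub-millisecond digits are dropped; ts must be timezone-aware."""
    return (ts - EPOCH) // timedelta(milliseconds=1)
```

Two rows whose `updated_at` differ only in microseconds map to the same millisecond value, which is why the column's precision matters when it doubles as the timestamp boundary.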
Type mapping/SMT example (dropping unneeded fields and normalizing a field name)
```properties
# SMT example for JDBC Source (drop unneeded columns + normalize a field name)
transforms=dropCols,renameTs
# ReplaceField drops fields; MaskField would only blank out their values
transforms.dropCols.type=org.apache.kafka.connect.transforms.ReplaceField$Value
# exclude requires Kafka 2.6+; older releases call this setting blacklist
transforms.dropCols.exclude=internal_note,temp_flag
transforms.renameTs.type=org.apache.kafka.connect.transforms.ReplaceField$Value
transforms.renameTs.renames=updatedat:updated_at
# Numeric mapping (Source)
numeric.mapping=best_fit
```
Connect's standard error handling combines errors.tolerance with a Dead Letter Queue (DLQ, available for sink connectors). Even with errors.tolerance=all (skip failing records), the operational norm is to route failures to a DLQ so root causes can be traced later, with error context attached in the headers.
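The difference between errors.tolerance=none and errors.tolerance=all with a DLQ can be modeled in a few lines of Python. This is a toy model, not Connect's implementation: the record position stands in for the Kafka offset, the exception class name is Python's rather than a Java class name, and the header names are modeled on Connect's `__connect.errors.*` context headers.

```python
import json
from dataclasses import dataclass, field


@dataclass
class DlqRecord:
    value: bytes
    headers: dict = field(default_factory=dict)


def process(records, convert=json.loads, tolerance="none", dlq=None,
            topic="db_orders", context_headers=True):
    """Toy model of errors.tolerance for a sink's conversion step.

    tolerance="none": the first failure propagates (the task would fail).
    tolerance="all": failures are skipped; if dlq is a list, the raw record is
    appended to it, with error context headers when context_headers is True.
    """
    converted = []
    for offset, raw in enumerate(records):  # position stands in for the Kafka offset
        try:
            converted.append(convert(raw))
        except Exception as exc:
            if tolerance == "none":
                raise
            if dlq is not None:
                headers = {}
                if context_headers:
                    # Header names modeled on the __connect.errors.* context headers.
                    headers = {
                        "__connect.errors.topic": topic,
                        "__connect.errors.offset": str(offset),
                        "__connect.errors.exception.class.name": type(exc).__name__,
                        "__connect.errors.exception.message": str(exc),
                    }
                dlq.append(DlqRecord(raw, headers))
    return converted
```

Without a DLQ, tolerance="all" silently drops the bad record; the headers are what let you trace it back to a topic and offset later.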
Handle transient DB outages and lock contention with retries and backoff; once the retry limit is exceeded, fail explicitly and escalate to operations.
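The retry-then-escalate policy can be sketched as a small Python helper. The names (`write_with_retries`, `RetriesExhausted`) and the choice of `TimeoutError`/`ConnectionError` as "transient" are assumptions for illustration; `max_retries` and `retry_backoff_ms` mirror the sink's max.retries and retry.backoff.ms, with a fixed backoff between attempts in this sketch.

```python
import time


class RetriesExhausted(Exception):
    """Raised once max_retries is exceeded so the failure is escalated, not swallowed."""


def write_with_retries(write, batch, max_retries=10, retry_backoff_ms=3000,
                       sleep=time.sleep, transient=(TimeoutError, ConnectionError)):
    """Call write(batch); retry transient errors up to max_retries times, then escalate.

    Non-transient exceptions propagate immediately. `sleep` is injectable for testing.
    """
    attempt = 0
    while True:
        try:
            return write(batch)
        except transient as exc:
            if attempt >= max_retries:
                raise RetriesExhausted(f"gave up after {attempt} retries") from exc
            attempt += 1
            sleep(retry_backoff_ms / 1000)  # fixed backoff between attempts
```

Raising a dedicated exception at the limit keeps the "fail explicitly" step visible to whatever monitoring wraps the writer.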
DLQ and retry configuration example (Sink)
```properties
errors.tolerance=all
errors.deadletterqueue.topic.name=dlq.jdbc.sink.orders
errors.deadletterqueue.context.headers.enable=true
max.retries=12
retry.backoff.ms=5000
```
Always rehearse the switch from snapshot (initial full load) to incremental ingest, and verify with SQL and offsets that no records are duplicated or missing. For restarts, timestamp+incrementing is more robust against duplicates and gaps than timestamp mode alone.
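The duplicate/missing check can be expressed as three SQL queries. The sketch runs them with sqlite3 against hypothetical `src` and `dst` tables in a single database; in practice the source and target live in different databases, so you would export the key columns (or use a database link) before comparing.

```python
import sqlite3

# Keyed comparisons between source and target (hypothetical table names src/dst).
RECONCILE_SQL = {
    # Rows in the source that never reached the target.
    "missing": "SELECT s.id FROM src s LEFT JOIN dst d ON d.id = s.id WHERE d.id IS NULL ORDER BY s.id",
    # Keys written more than once (possible when the target has no PK, e.g. insert mode).
    "duplicates": "SELECT id, COUNT(*) FROM dst GROUP BY id HAVING COUNT(*) > 1 ORDER BY id",
    # Rows in the target with no source counterpart (e.g. deletes that were not propagated).
    "extra": "SELECT d.id FROM dst d LEFT JOIN src s ON s.id = d.id WHERE s.id IS NULL ORDER BY d.id",
}


def reconcile(conn):
    """Run every check and return {check_name: rows}."""
    return {name: conn.execute(sql).fetchall() for name, sql in RECONCILE_SQL.items()}
```

All three result sets should be empty after a clean snapshot-to-incremental switch.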
Raise parallelism gradually with tasks.max. JDBC Source assigns whole tables to tasks, so it cannot use more tasks than there are tables, and a custom query runs in a single task. Sink parallelism is capped by the topic's partition count and, in practice, by DB locks and index maintenance. Automate status monitoring through the Connect REST API, and on failure cross-reference the DLQ with the logs.
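Automated monitoring could start along these lines: a minimal Python sketch using only the standard library, assuming the Connect REST API listens on http://localhost:8083 (the default port; adjust for your cluster). It reads `GET /connectors/{name}/status` and restarts tasks reported as FAILED; the helper names are hypothetical.

```python
import json
import urllib.request

CONNECT_URL = "http://localhost:8083"  # assumption: default Connect REST listener


def failed_task_ids(status: dict) -> list:
    """Task ids whose state is FAILED in a GET /connectors/{name}/status response."""
    return [t["id"] for t in status.get("tasks", []) if t.get("state") == "FAILED"]


def get_status(name: str, base: str = CONNECT_URL) -> dict:
    with urllib.request.urlopen(f"{base}/connectors/{name}/status") as resp:
        return json.load(resp)


def restart_failed_tasks(name: str, base: str = CONNECT_URL) -> list:
    """Restart each FAILED task via POST /connectors/{name}/tasks/{id}/restart."""
    ids = failed_task_ids(get_status(name, base))
    for task_id in ids:
        req = urllib.request.Request(
            f"{base}/connectors/{name}/tasks/{task_id}/restart", method="POST"
        )
        with urllib.request.urlopen(req):
            pass
    return ids
```

An automatic restart is only a first response; a task that keeps returning to FAILED still needs the DLQ and log review described above.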
Connect REST examples (status check, task restart, config retrieval)
```
# Check status
GET /connectors/jdbc-sink-orders/status
# Restart a task
POST /connectors/jdbc-sink-orders/tasks/0/restart
# Get config
GET /connectors/jdbc-source-orders/config
```
CCDAK
Question 1
You want to incrementally ingest a table with updates via Confluent JDBC Source and minimize duplicates on restart. Which combination of assumptions/settings is correct?
Answer: A
For tables with updates, timestamp+incrementing effectively suppresses duplicates on restart. The incremental column must be non-NULL and monotonically increasing, and indexes are required on both columns. B/C/D each contradict the official requirements or recommendations.
How should I choose between bulk and incrementing/timestamp modes?
Use bulk for the initial full snapshot, then switch to incrementing (append-only) or timestamp/timestamp+incrementing (when updates occur). When switching, validate for duplicates and missing records by reconciling offsets and row counts.
I get errors with insert.mode=upsert. What do I need?
You need to specify pk.mode and pk.fields (from record_key or record_value). Upsert cannot work against tables without a primary key or unique constraint.
How can I skip errors while still being able to trace root causes?
Combine errors.tolerance=all with a DLQ (errors.deadletterqueue.topic.name). Enable context headers and add the DLQ to your monitoring scope.
NicheeLab Editorial Team
NicheeLab editorial team focused on data engineering and cloud certification learning. Content is structured around practical study needs and official exam domains.