Kafka Sink Connectors Practical Guide: Writing from Kafka to External Systems

2026-04-19
NicheeLab Editorial Team

Kafka Connect Sink Connectors consume records from Kafka topics and write them to external systems such as databases, search engines, and object storage.

This article summarizes delivery guarantees, duplicate handling, error handling, scaling, and operational best practices grounded in the official Confluent/Apache Kafka documentation, alongside the points most often tested on the CCDAK exam.

Sink Connectors: Overview and Architecture

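Kafka Connect is a framework that runs connectors and tasks on workers (standalone or distributed mode). Sink Connectors read from Kafka and write to external systems. A sink connector's progress is tracked as ordinary consumer-group offsets (by default the group is named connect-<connector name>) and committed only after the task has flushed the corresponding records. The connect-offsets internal topic is used by source connectors, not sinks.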

Data transformation is handled by converters (Avro/JSON Schema/Protobuf, etc.) and SMTs (Single Message Transforms). On error, you can configure retries, skipping, or forwarding to a DLQ (Dead Letter Queue).

  • A connector is the logical definition; tasks are the units of parallel execution. Use tasks.max to control parallelism.
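  • In distributed mode, the internal topics (commonly named connect-configs / connect-offsets / connect-status, set via config.storage.topic, offset.storage.topic, and status.storage.topic) are stored in Kafka.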
  • Ordering is guaranteed per partition. A single partition is processed sequentially within one task.
  • Converters handle serialization and schema management; SMTs perform lightweight preprocessing and shaping.
  • DLQs are effective for isolating problematic records. Control behavior via the errors.* settings.
Connector Type | Typical Use Case | Dedup / Consistency | Schema Evolution
--- | --- | --- | ---
JDBC Sink | Writes to RDBMS (aggregates / ODS) | Idempotent via UPSERT (e.g. pk.mode=record_key). Deletes use delete.enabled + tombstones. | auto.create/auto.evolve can follow column additions (requires validation).
Elasticsearch Sink | Full-text search / log analytics | Idempotent by using the Kafka key as the document _id. External versioning is also an option. | With schema.ignore=true, Elasticsearch's dynamic mapping infers field types; watch out for type mismatches.
S3 Sink | Raw data storage for a data lake | Object names include topic, partition, and starting offset, so records can be identified. Confluent documents exactly-once output only with a deterministic partitioner; otherwise assume duplicates. | Optimize operations with format choice (Avro/Parquet/JSON) and rotation settings.
Snowflake Sink | DWH ingestion | Typically relies on MERGE or similar on the target side to ensure dedup and consistency. | Be careful about how schemas are applied on staging vs. target tables.

Figure: Logical architecture of a Kafka Sink Connector. A Connect distributed worker runs the logical Sink Connector as Task #1 (P0, P1) and Task #2 (P2, P3), which read topics t1, t2, ... from Kafka and write to External System A and External System B. Internal topics hold configs / offsets / status. Tasks parallelize per partition; failed records are routed to a DLQ topic.

Delivery Semantics and Consistency Model: Handling Duplicates and Ordering

Sink Connectors are fundamentally at-least-once. Retries on failure and rebalances can cause the same record to be written more than once. Designing without assuming end-to-end Exactly-once is the safer approach.

Ordering is preserved within a Kafka partition. Tasks process records in order within their assigned partitions, but ordering across partitions is not guaranteed. Meet ordering requirements through topic and key design.

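  • Default guarantee: at-least-once. Offsets are committed periodically (the worker's offset.flush.interval.ms) and only for records the task has flushed, so a crash or rebalance replays whatever was written after the last commit.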
  • Dedup: target-side UPSERT/primary keys, using the key as document ID, downstream dedup.
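  • Read consistency: to skip records from aborted or still-open transactions, set consumer.override.isolation.level=read_committed. The worker's connector.client.config.override.policy must allow client overrides (All is the default since Kafka 3.0).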
  • Delete propagation: use tombstones (null value) and enable the connector's delete feature (supported connectors only).
  • Order within a partition is preserved. Achieve throughput via parallelism and distribution.
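A minimal sketch, with a Python dict standing in for the target table, of why at-least-once delivery plus a key-based UPSERT converges: replaying records from the last committed offset rewrites the same rows, and a tombstone deletes. The `Record` tuple and `apply_upsert` are hypothetical helpers, not Connect APIs.

```python
from typing import Dict, List, Optional, Tuple

# A Kafka record reduced to what this sketch needs: (partition, offset, key, value).
# value=None models a tombstone.
Record = Tuple[int, int, str, Optional[dict]]


def apply_upsert(table: Dict[str, dict], records: List[Record]) -> Dict[str, dict]:
    """Apply records to an in-memory 'table' keyed by the Kafka record key.

    UPSERT semantics: the key is the primary key, so writing the same record
    twice leaves the same row. A tombstone (value None) deletes the row.
    """
    for _partition, _offset, key, value in records:
        if value is None:
            table.pop(key, None)  # delete; deleting a missing row is a no-op
        else:
            table[key] = value  # insert or overwrite
    return table


# One partition's records, in offset order.
batch: List[Record] = [
    (0, 10, "c1", {"name": "Alice"}),
    (0, 11, "c2", {"name": "Bob"}),
    (0, 12, "c1", {"name": "Alice B."}),
    (0, 13, "c2", None),  # tombstone for c2
]

# The first delivery succeeds but the offset commit is lost (e.g. a rebalance),
# so the task re-reads from offset 11 and those records are written again.
once = apply_upsert({}, batch)
replayed = apply_upsert(apply_upsert({}, batch), batch[1:])
print(once == replayed)  # True
print(once)
```

The replay is harmless only because records for the same key stay in one partition and are re-applied in offset order; that is the practical reason to keep keyed topics and per-partition ordering in the design.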

Design and Configuration Tips by Major Sink

Connector-specific settings directly affect delivery guarantees, schema evolution, and throughput. Build on the defaults and constraints in the official documentation, and choose the safer design.

Here we list key points using the commonly deployed JDBC / Elasticsearch / S3 / Snowflake connectors as examples.

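  • JDBC Sink: set insert.mode=upsert, pk.mode=record_key, and explicitly declare pk.fields. For delete propagation, set delete.enabled=true, which requires pk.mode=record_key so that the tombstone's key identifies the row to delete (the examples here also set behavior.on.null.values=delete; check that your connector version recognizes it). Validate auto.create/auto.evolve in staging before production.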
  • Elasticsearch Sink: set key.ignore=false to use the key as the document _id for idempotency. Tune batch.size and linger.ms to the workload. Watch for mapping conflicts and type conversions.
  • S3 Sink: organize rotation (flush.size, rotate.interval.ms) and the object layout (topics.dir, partitioner.class). Unless the partitioner and rotation settings are deterministic, tolerate duplicates and design dedup/MERGE downstream.
  • Snowflake Sink: operationalize post-ingest MERGE using the target table's keys/unique constraints. Watch out for cost from very small batches. Apply schema changes incrementally for safety.
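For append-only destinations such as object storage or a staging table, deduplication moves downstream. The sketch below assumes, hypothetically, that each landed row carries its Kafka topic, partition, and offset: exact redeliveries are dropped by those coordinates, then a MERGE-like step keeps the highest-offset value per key. `LandedRow`, `dedup_coordinates`, and `merge_latest` are illustrative names; in practice the same logic would usually be a SQL MERGE on the warehouse side.

```python
from typing import Dict, Iterable, List, NamedTuple, Optional


class LandedRow(NamedTuple):
    """A row as it might land in an append-only sink (hypothetical layout)."""
    topic: str
    partition: int
    offset: int
    key: str
    value: Optional[dict]  # None = tombstone


def dedup_coordinates(rows: Iterable[LandedRow]) -> List[LandedRow]:
    """Drop exact redeliveries: (topic, partition, offset) identifies a Kafka record."""
    seen = set()
    unique = []
    for row in rows:
        coord = (row.topic, row.partition, row.offset)
        if coord not in seen:
            seen.add(coord)
            unique.append(row)
    return unique


def merge_latest(target: Dict[str, dict], rows: Iterable[LandedRow]) -> Dict[str, dict]:
    """MERGE-like step: for each key keep the value with the highest offset.

    Assumes all rows for a key come from the same partition (keyed
    partitioning), so offset order equals write order for that key.
    """
    latest: Dict[str, LandedRow] = {}
    for row in rows:
        current = latest.get(row.key)
        if current is None or row.offset > current.offset:
            latest[row.key] = row
    for key, row in latest.items():
        if row.value is None:
            target.pop(key, None)  # tombstone -> delete
        else:
            target[key] = row.value
    return target


landed = [
    LandedRow("orders", 0, 5, "o1", {"status": "NEW"}),
    LandedRow("orders", 0, 6, "o1", {"status": "PAID"}),
    LandedRow("orders", 0, 5, "o1", {"status": "NEW"}),  # duplicate from a retried write
    LandedRow("orders", 1, 3, "o2", {"status": "NEW"}),
]
unique = dedup_coordinates(landed)
print(len(unique))               # 3
print(merge_latest({}, unique))  # {'o1': {'status': 'PAID'}, 'o2': {'status': 'NEW'}}
```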

Error Handling, Retries, and DLQ Implementation Guidelines

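Kafka Connect lets you control error handling via configuration. The errors.* settings cover failures in the converter and SMT stages; failures while writing to the destination go through the same mechanism only when the connector reports them via the errant record reporter (KIP-610, Kafka 2.6+), and otherwise fail the task. Designing for progress via retries, skipping, or DLQ forwarding is the realistic approach.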

Always provision a DLQ in production, and build monitoring and reprocessing workflows for DLQ records (such as re-ingest after fixes or one-off corrections) into operations.

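  • errors.tolerance=all skips failed records and keeps processing; none (the default) fails the task at the first error.
  • Specify the DLQ topic (sink connectors only) via errors.deadletterqueue.topic.name, and set errors.deadletterqueue.context.headers.enable=true to attach the failure cause as record headers. Match errors.deadletterqueue.topic.replication.factor (default 3) to the cluster.
  • Tune persistence against transient errors with errors.retry.timeout (default 0, meaning no retries; -1 retries indefinitely) and errors.retry.delay.max.ms (the cap on a single backoff).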
  • Conversion errors: revisit SMT/converter settings (e.g. value.converter.schemas.enable, types, required fields).
  • Target-specific limits (constraint violations, timeouts): adjust batch size, concurrency, and retries.
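The interplay of errors.tolerance, retries, and the DLQ can be modeled in a few lines. This is a simplified, hypothetical model: `run_stage`, `ErrorPolicy`, `TransientError`, the 300 ms initial backoff, and the `error` header name are assumptions for illustration, not Connect's internal implementation or its real context header names.

```python
import json
from dataclasses import dataclass, field
from typing import Callable, Dict, List, Optional


class TransientError(Exception):
    """Stands in for a retriable failure such as a timeout."""


@dataclass
class ErrorPolicy:
    # Field names mirror Connect settings; the behaviour is a simplified model.
    tolerance: str = "none"          # errors.tolerance: "none" (fail) or "all" (skip)
    retry_timeout_ms: int = 0        # errors.retry.timeout: 0 disables retries
    retry_delay_max_ms: int = 60000  # errors.retry.delay.max.ms: cap on one backoff
    dlq: List[Dict] = field(default_factory=list)  # stands in for the DLQ topic


def run_stage(policy: ErrorPolicy, stage: Callable[[bytes], dict], raw: bytes) -> Optional[dict]:
    """Run one stage (e.g. conversion) for one record under the error policy."""
    elapsed_ms, delay_ms = 0, 300  # hypothetical initial backoff
    while True:
        try:
            return stage(raw)
        except TransientError:
            if elapsed_ms + delay_ms <= policy.retry_timeout_ms:
                elapsed_ms += delay_ms  # a real worker would sleep here
                delay_ms = min(delay_ms * 2, policy.retry_delay_max_ms)
                continue
            error = "retries exhausted"
        except ValueError as exc:  # non-retriable, e.g. malformed JSON
            error = f"{type(exc).__name__}: {exc}"
        if policy.tolerance != "all":
            raise RuntimeError(f"task fails: {error}")
        # errors.tolerance=all: skip the record, keep the raw bytes plus a cause header.
        policy.dlq.append({"value": raw, "headers": {"error": error}})
        return None


def convert(raw: bytes) -> dict:
    return json.loads(raw)  # json.JSONDecodeError subclasses ValueError


policy = ErrorPolicy(tolerance="all")
good = run_stage(policy, convert, b'{"id": 1}')
bad = run_stage(policy, convert, b"not json")
print(good, bad, len(policy.dlq))  # {'id': 1} None 1

calls = {"n": 0}


def flaky(raw: bytes) -> dict:
    calls["n"] += 1
    if calls["n"] < 3:
        raise TransientError("timeout")
    return {"ok": True}


print(run_stage(ErrorPolicy(retry_timeout_ms=5000), flaky, b"x"), calls["n"])  # {'ok': True} 3
```

With the default policy (tolerance none, no retry budget) the same bad payload would raise instead of landing in the DLQ, which matches the "stop immediately" behavior described above.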

Scaling and Performance: Tasks and Partition Design

Parallelism is bounded by the smaller of tasks.max and the total number of partitions across the subscribed topics. A task can own multiple partitions, but a single partition is always processed sequentially within one task.

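Connector-specific batch settings (e.g. the JDBC Sink's batch.size) and consumer tuning via Connect's consumer.override.* are also effective. For backpressure, a sink task can pause its partitions through its task context, or throw a RetriableException from put(), in which case the framework pauses consumption and redelivers the same batch on a later attempt.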

  • Setting tasks.max higher than the total partition count adds no parallelism: the extra tasks receive no partitions and sit idle.
  • When scaling up, plan partition count increases and verify key distribution.
  • For bulk writes such as JDBC, tune batch.size and check the database's connection and transaction limits, since each task writes independently.
  • Tune records-per-poll with consumer.override.max.poll.records (balance against processing time).
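  • Frequent rebalances stall consumption and build lag. Keep the worker count stable and rely on incremental cooperative rebalancing (the default for Connect workers in recent Kafka versions); scheduled.rebalance.max.delay.ms controls how long workers wait for a departed member before reassigning its tasks.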
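A sketch of how partitions map onto tasks, showing why tasks.max above the partition count leaves tasks idle. Round-robin is assumed for readability; in a real sink the consumer group assignor decides the exact mapping. `assign_round_robin` and `effective_parallelism` are hypothetical helpers.

```python
from typing import Dict, List, Tuple

TopicPartition = Tuple[str, int]


def assign_round_robin(partitions: List[TopicPartition], tasks_max: int) -> Dict[int, List[TopicPartition]]:
    """Spread partitions over task ids 0..tasks_max-1 round-robin.

    Illustration only: real sink tasks get partitions through the consumer
    group protocol, so the exact mapping can differ. What holds in both cases
    is that each partition is owned by exactly one task at a time.
    """
    assignment: Dict[int, List[TopicPartition]] = {t: [] for t in range(tasks_max)}
    for i, tp in enumerate(sorted(partitions)):
        assignment[i % tasks_max].append(tp)
    return assignment


def effective_parallelism(num_partitions: int, tasks_max: int) -> int:
    """Tasks that actually receive work: never more than the partition count."""
    return min(num_partitions, tasks_max)


parts = [("t1", 0), ("t1", 1), ("t2", 0), ("t2", 1)]
plan = assign_round_robin(parts, tasks_max=6)
idle = [task for task, tps in plan.items() if not tps]
print(effective_parallelism(len(parts), 6), idle)  # 4 [4, 5]
```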

Operations, Monitoring, and CCDAK Exam Points

Distributed mode is standard for production. The internal topics must use cleanup.policy=compact and a production-grade replication factor (e.g. 3). Monitor using both the Connect REST API and JMX metrics.

On CCDAK, Connect's components, internal topics, delivery guarantees, DLQ, the roles of SMTs/converters, and representative connector settings are frequently tested. Build on the defaults and constraints from the official documentation and pay close attention to the wording of answer choices.

  • REST: GET /connectors, GET /connectors/<name>/status, GET /connectors/<name>/tasks, PUT /connectors/<name>/config, POST /connectors/<name>/tasks/<id>/restart
  • Internal topics: connect-configs (compact), connect-offsets (compact), connect-status (compact).
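  • Security: use SSL/SASL for Kafka connections; keep destination credentials out of connector configs with a ConfigProvider (config.providers, e.g. FileConfigProvider with placeholders of the form ${file:<path>:<key>}).
  • Change management: restart tasks in a rolling fashion; keep connector configs under version control and apply them with PUT /connectors/<name>/config.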
  • Exam tips: at-least-once is the baseline, duplicates handled on the target side. Memorize the combination of DLQ and errors.* settings.
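A small operational check against the Connect REST API, using only Python's standard library. `fetch_status` calls GET /connectors/<name>/status on a live worker; the parsing and the POST /connectors/<name>/tasks/<id>/restart request are shown against a trimmed, illustrative status response so they run offline. The helper names and the worker URL http://connect:8083 are assumptions.

```python
import json
import urllib.request
from typing import Dict, List


def fetch_status(base_url: str, name: str) -> Dict:
    """GET /connectors/<name>/status from a Connect worker (network call)."""
    with urllib.request.urlopen(f"{base_url}/connectors/{name}/status", timeout=10) as resp:
        return json.load(resp)


def failed_task_ids(status: Dict) -> List[int]:
    """Return the ids of tasks whose state is FAILED in a status response."""
    return [t["id"] for t in status.get("tasks", []) if t.get("state") == "FAILED"]


def restart_task_request(base_url: str, name: str, task_id: int) -> urllib.request.Request:
    """Build (but do not send) POST /connectors/<name>/tasks/<id>/restart."""
    return urllib.request.Request(
        f"{base_url}/connectors/{name}/tasks/{task_id}/restart", method="POST"
    )


# Trimmed shape of a status response; the values are illustrative.
sample = {
    "name": "inventory-jdbc-sink",
    "connector": {"state": "RUNNING", "worker_id": "connect-1:8083"},
    "tasks": [
        {"id": 0, "state": "RUNNING", "worker_id": "connect-1:8083"},
        {"id": 1, "state": "FAILED", "worker_id": "connect-2:8083", "trace": "stack trace here"},
    ],
    "type": "sink",
}
for task_id in failed_task_ids(sample):
    req = restart_task_request("http://connect:8083", sample["name"], task_id)
    print(req.get_method(), req.full_url)
    # urllib.request.urlopen(req) would send it to a live worker.
```

Restarting only FAILED tasks, rather than the whole connector, keeps healthy tasks consuming while the failure is investigated.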

Example: creating a JDBC Sink Connector via REST (UPSERT + deletes + DLQ)

POST /connectors HTTP/1.1
Host: connect:8083
Content-Type: application/json

{
  "name": "inventory-jdbc-sink",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "3",
    "topics": "inventory.customers,inventory.orders",
    "connection.url": "jdbc:postgresql://db:5432/warehouse",
    "connection.user": "app",
    "connection.password": "******",
    "auto.create": "true",
    "auto.evolve": "true",
    "insert.mode": "upsert",
    "pk.mode": "record_key",
    "pk.fields": "id",
    "delete.enabled": "true",
    "behavior.on.null.values": "delete",
    "batch.size": "3000",
    "max.retries": "10",
    "retry.backoff.ms": "5000",
    "errors.tolerance": "all",
    "errors.deadletterqueue.topic.name": "dlq.inventory",
    "errors.deadletterqueue.context.headers.enable": "true",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "consumer.override.isolation.level": "read_committed"
  }
}
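Before submitting a config like the one above, a pre-flight check can catch contradictory JDBC Sink settings. The two rules below follow the connector's documented requirements (deletes need pk.mode=record_key; UPSERT/UPDATE need a primary key); `check_jdbc_sink` and `put_config_request` are hypothetical helpers. PUT /connectors/<name>/config takes the bare config map (no name/config wrapper) and creates or updates the connector, which suits configs kept in version control.

```python
import json
import urllib.request
from typing import Dict, List


def check_jdbc_sink(config: Dict[str, str]) -> List[str]:
    """Flag JDBC Sink settings that contradict each other (hypothetical pre-flight check)."""
    problems = []
    pk_mode = config.get("pk.mode", "none")              # connector default: none
    insert_mode = config.get("insert.mode", "insert")    # connector default: insert
    if config.get("delete.enabled", "false") == "true" and pk_mode != "record_key":
        problems.append("delete.enabled=true requires pk.mode=record_key")
    if insert_mode in ("upsert", "update") and pk_mode == "none":
        problems.append(f"insert.mode={insert_mode} needs a primary key (pk.mode is none)")
    return problems


def put_config_request(base_url: str, name: str, config: Dict[str, str]) -> urllib.request.Request:
    """Build (but do not send) PUT /connectors/<name>/config, which creates or updates."""
    return urllib.request.Request(
        f"{base_url}/connectors/{name}/config",
        data=json.dumps(config).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )


config = {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "topics": "inventory.customers,inventory.orders",
    "insert.mode": "upsert",
    "pk.mode": "record_key",
    "pk.fields": "id",
    "delete.enabled": "true",
}
print(check_jdbc_sink(config))  # []
print(check_jdbc_sink({**config, "pk.mode": "record_value"}))
req = put_config_request("http://connect:8083", "inventory-jdbc-sink", config)
print(req.get_method(), req.full_url)
```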

Check Your Understanding

CCDAK

Question 1

You want a Kafka Connect JDBC Sink to reflect Kafka tombstones (null value) as RDBMS deletes while maintaining UPSERT operations without duplicates. Which combination of settings is appropriate?

  A. insert.mode=upsert, pk.mode=record_key, delete.enabled=true, behavior.on.null.values=delete
  B. insert.mode=insert, pk.mode=none, delete.enabled=true, behavior.on.null.values=drop
  C. insert.mode=update, pk.mode=record_value, delete.enabled=false, behavior.on.null.values=ignore
  D. insert.mode=upsert, pk.mode=none, delete.enabled=true, behavior.on.null.values=rewrite

Answer: A

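Deletes in the JDBC Sink are enabled with delete.enabled=true, which requires pk.mode=record_key so that a tombstone's key identifies the row to delete (pk.fields names the key column(s)). insert.mode=upsert keeps writes idempotent, and of the behavior.on.null.values options offered, only delete treats tombstones as deletes rather than dropping or ignoring them. A is the only option that combines all of these: B and D have no usable primary key (pk.mode=none), and C disables deletes.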

Frequently Asked Questions

Can Sink connectors achieve Exactly-once delivery?

Kafka Connect Sink connectors are fundamentally at-least-once. Because the target is an external system, end-to-end Exactly-once cannot be generalized. In practice you combine UPSERT/primary keys, document IDs, and downstream deduplication or MERGE operations to maintain consistency.

Can I write out schemaless JSON as-is?

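Yes, for connectors that accept schemaless data. Use JsonConverter for value.converter and set value.converter.schemas.enable=false. Note that the JDBC Sink needs a schema to build its SQL statements, so it does not accept schemaless JSON. Since schema evolution and type validation are not enforced in schemaless mode, combining Avro/JSON Schema/Protobuf with a Schema Registry is recommended for long-term operations.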
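To see what schemas.enable changes, the sketch below builds the `{"schema", "payload"}` envelope that JsonConverter reads when value.converter.schemas.enable=true; with false, only the payload object is sent. The Python-to-Connect type mapping is a deliberately small assumption (flat objects, no nulls or nested values), and `with_schema_envelope` is a hypothetical helper.

```python
import json
from typing import Any, Dict

# Python type -> Connect schema type; a hypothetical, deliberately small mapping.
# type() is matched exactly, so True maps to "boolean", not "int64".
_CONNECT_TYPES = {bool: "boolean", int: "int64", float: "float64", str: "string"}


def with_schema_envelope(payload: Dict[str, Any], name: str) -> Dict[str, Any]:
    """Wrap a flat JSON object in the schema/payload envelope used by
    JsonConverter when schemas.enable=true."""
    fields = [
        {"field": key, "type": _CONNECT_TYPES[type(value)], "optional": False}
        for key, value in payload.items()
    ]
    return {
        "schema": {"type": "struct", "name": name, "optional": False, "fields": fields},
        "payload": payload,
    }


record = {"id": 42, "email": "a@example.com", "active": True}
print(json.dumps(with_schema_envelope(record, "customer"), indent=2))
```

Every message then carries its own schema, which is why Schema Registry-backed formats are usually preferred over embedded JSON schemas at scale.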

Can I merge multiple topics into a single table?

Yes, depending on connector and SMT configuration. The JDBC Sink lets you control the destination via table.name.format (default ${topic}), but schema compatibility and primary key consistency across the merged topics are mandatory. Conflicts and type mismatches are common, so consider an SMT that normalizes schemas or an intermediate topic for normalization before consolidating.
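When several topics are routed into one table, keys that appear in more than one topic will overwrite each other under UPSERT. The sketch below resolves a table.name.format string the way its ${topic} placeholder works and reports keys arriving from more than one topic for the same table. `table_for` and `colliding_keys` are hypothetical helpers, and the topic names are illustrative.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Set, Tuple


def table_for(topic: str, table_name_format: str = "${topic}") -> str:
    """Resolve a destination table name by substituting the ${topic} placeholder."""
    return table_name_format.replace("${topic}", topic)


def colliding_keys(records: Iterable[Tuple[str, str]], table_name_format: str) -> Dict[str, List[str]]:
    """Find primary keys that arrive from more than one topic into the same table.

    records: (topic, key) pairs. Under UPSERT such keys would overwrite each other.
    """
    sources: Dict[Tuple[str, str], Set[str]] = defaultdict(set)
    for topic, key in records:
        sources[(table_for(topic, table_name_format), key)].add(topic)
    return {
        f"{table}:{key}": sorted(topics)
        for (table, key), topics in sources.items()
        if len(topics) > 1
    }


seen = [("customers_eu", "1"), ("customers_us", "1"), ("customers_us", "2")]
print(table_for("customers_eu"))               # customers_eu
print(colliding_keys(seen, "customers_all"))   # {'customers_all:1': ['customers_eu', 'customers_us']}
```

Running such a check on a sample of keys before switching table.name.format to a fixed name shows whether the keys need namespacing first.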

Author

NicheeLab Editorial Team

NicheeLab editorial team focused on data engineering and cloud certification learning. Content is structured around practical study needs and official exam domains.

© 2026 NicheeLab All rights reserved.