
Implementing CQRS on Kafka: Separating Commands and Queries

2026-04-19
NicheeLab Editorial Team

CQRS is a pattern that designs writes (commands) and reads (queries) separately to improve scalability and changeability. Kafka provides a high-throughput log, strict per-partition ordering, compaction, and transactions: the building blocks you need to assemble CQRS in practice.

This article focuses on the official features most commonly tested on CCDAK — topic compaction, Idempotent/Transactional Producer, Kafka Streams/KTable, processing.guarantee, isolation.level — and walks through concrete design choices, configurations, and pitfalls.

CQRS Essentials and How They Map to Kafka

CQRS separates the write model (state-transition logic) from the read model (query-optimized views). On Kafka, the basic approach is to split topics that receive commands (or events) from views optimized for reads (compacted topics or external stores).

A Streams app aggregates commands and materializes them into a KTable, and the query layer reads from that KTable or the view topic. Assume eventual consistency, then use key design and partition design to draw clear consistency boundaries.

  • Commands: state-change requests (e.g. OrderCreated, OrderApproved)
  • Queries: read-optimized views (e.g. current order summary)
  • Where Kafka fits: partition ordering, replay via the log, compaction, transactions

Design Style | Write / Read Model | Consistency / Latency | Complexity
Single CRUD Model | Same schema for both | Strong consistency / low latency, but limited scalability | Low
CQRS | Separated (writes normalized, reads optimized) | Eventual consistency / introduces latency | Medium
Event Sourcing + CQRS | Rebuild state from events + views | Eventual consistency / flexible but heavy design load | High

Big picture of CQRS on Kafka

[Figure: Clients -> HTTP -> Command API (Producer) -> topic: commands.orders -> Kafka Streams (aggregate) -> KTable: orders_state -> topic: views.orders (compact) -> Query Service (Consumers)]

Sample logical names and roles (design notes)

commands.orders        # write side (append-only, retention-based)
views.orders           # read side (compacted, latest state)
internal.orders-agg    # Streams changelog/repartition (internal topics)

Topic Design: Separating Commands and Queries

Design the command topic as an immutable append-only log of facts, with the key fixed to the aggregate ID (e.g. orderId). Enable compaction on the read topic so the latest state is easy to retrieve. This lets KTables and external sinks rebuild the latest snapshot efficiently.

Pick the partition count based on write throughput and parallelism. Ordering is guaranteed per partition, so design keys at the granularity where ordering matters. View topics overwrite by key, so values should ideally be a complete snapshot (source of truth). If you must use delta updates, manage keys and schema evolution carefully.
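
To make the link between keys and ordering concrete, here is a minimal sketch of key-based partition selection. It is a simplification under stated assumptions: Kafka's default partitioner hashes the serialized key with murmur2, while this stand-in uses `String.hashCode()`. The class `KeyPartitioningSketch` and its methods are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Illustrative only: a stand-in for key-based partitioning, not Kafka's murmur2 partitioner. */
final class KeyPartitioningSketch {

    /** Maps a key to a partition; the same key always yields the same partition. */
    static int partitionFor(String key, int numPartitions) {
        // floorMod keeps the result in [0, numPartitions) even for negative hash codes.
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    /** Routes {key, event} pairs to partitions, preserving arrival order within each partition. */
    static Map<Integer, List<String>> route(List<String[]> keyedEvents, int numPartitions) {
        Map<Integer, List<String>> partitions = new TreeMap<>();
        for (String[] kv : keyedEvents) {
            int p = partitionFor(kv[0], numPartitions);
            partitions.computeIfAbsent(p, ignored -> new ArrayList<>()).add(kv[0] + ":" + kv[1]);
        }
        return partitions;
    }

    public static void main(String[] args) {
        List<String[]> events = new ArrayList<>();
        events.add(new String[] {"order-1", "OrderCreated"});
        events.add(new String[] {"order-2", "OrderCreated"});
        events.add(new String[] {"order-1", "OrderApproved"});
        // Both order-1 events land on one partition, so their relative order is kept.
        System.out.println(route(events, 12));
    }
}
```

Because the partition is derived from the key and the partition count, changing the partition count later changes where existing keys land. That is one reason to choose the partition count for keyed topics up front.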

  • Commands: cleanup.policy=delete, appropriate retention.ms/bytes, key=aggregateId
  • Views: cleanup.policy=compact (sometimes compact,delete), value carries the latest complete state
  • Internal: auto-created by Streams (changelog/repartition); ensure operational visibility and monitoring

Topic Type | Key Settings | Purpose / Caveats
commands.* | cleanup.policy=delete, min.insync.replicas>=2 | Append-only log. Order holds only within a partition. Set retention based on business needs.
views.* | cleanup.policy=compact (or compact,delete), min.compaction.lag.ms | Maintains the latest state. Deletes are expressed with tombstones.
internal.* | Auto-managed by Streams; replication.factor>=3 recommended | Involved in failure recovery and EOS. Capacity and performance monitoring are essential.
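
What a reader ultimately sees in a compacted view topic can be sketched as a replay that keeps the last value per key. This is a simplified model, not broker code: real compaction runs in the background, never touches the active segment, and keeps tombstones for a while (delete.retention.ms) before dropping them. The class `CompactionSketch` and its `Rec` type are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Illustrative only: models the state a reader rebuilds from a compacted topic. */
final class CompactionSketch {

    /** A log record; a null value marks a tombstone. */
    static final class Rec {
        final String key;
        final String value;
        Rec(String key, String value) { this.key = key; this.value = value; }
    }

    /** Replays the log, keeping only the latest value per key; tombstones remove the key. */
    static Map<String, String> latestState(List<Rec> log) {
        Map<String, String> state = new LinkedHashMap<>();
        for (Rec r : log) {
            if (r.value == null) {
                state.remove(r.key);        // tombstone: logical delete
            } else {
                state.put(r.key, r.value);  // later records overwrite earlier ones
            }
        }
        return state;
    }

    public static void main(String[] args) {
        List<Rec> log = new ArrayList<>();
        log.add(new Rec("order-1", "CREATED"));
        log.add(new Rec("order-2", "CREATED"));
        log.add(new Rec("order-1", "APPROVED"));
        log.add(new Rec("order-2", null)); // tombstone for order-2
        System.out.println(latestState(log)); // prints {order-1=APPROVED}
    }
}
```

The sketch also shows why view values work best as complete snapshots: a reader only ever sees the last record for a key, so a delta would lose everything that came before it.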

Topic separation, visualized

[Figure: Producer -> commands.orders (append-only) -> Streams Aggregation -> views.orders (compact)]

Topic creation examples (kafka-topics)

kafka-topics --bootstrap-server broker:9092 --create --topic commands.orders \
  --partitions 12 --replication-factor 3 \
  --config cleanup.policy=delete --config min.insync.replicas=2

kafka-topics --bootstrap-server broker:9092 --create --topic views.orders \
  --partitions 12 --replication-factor 3 \
  --config cleanup.policy=compact --config min.cleanable.dirty.ratio=0.5

Write Model: Controlling Duplicates and Atomicity with Idempotence and Transactions

Producer idempotence suppresses duplicates caused by retries on the write path. Use it together with acks=all and max.in.flight.requests.per.connection set to 5 or less. This removes the duplicate records that at-least-once delivery with retries would otherwise introduce.

Use transactions to atomically publish across multiple topics (e.g. updates within the same boundary to commands.* and views.*, or committing processed offsets). In Kafka Streams, enabling processing.guarantee=exactly_once_v2 uses transactions and idempotent writes under the hood.

  • Idempotent Producer: enable.idempotence=true, acks=all, max.in.flight.requests.per.connection<=5
  • Transactions: stabilize transactional.id and protect readers with read_committed
  • Streams: choose processing.guarantee=exactly_once_v2 (the stable version). Kafka transactions do not extend to external systems, so do not treat them as a two-phase commit (2PC) with an external database.

Delivery Guarantee / Feature | Characteristics | Overhead / Where to Use
At-least-once | Duplicates are possible | Low / fits most workloads. Requires downstream dedup.
Idempotent Producer | Suppresses duplicates within the same session | Medium / robust against key overwrites
Transactions (EOS) | Atomic publish across multiple outputs + duplicate suppression | Medium-high / view updates and exactly-once processing

How Idempotence and Transactions apply

Producer(retries) -> [commands.orders]
   | enable.idempotence=true  | acks=all
   +--> Duplicates suppressed (per partition sequence)

[Transactional boundary]
  begin -> send commands + send processed offsets -> commit
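
The "per partition sequence" check in the diagram above can be sketched as follows. This is a deliberately simplified model of one partition, assuming the broker remembers only the last accepted sequence number per producer id; the real broker works on record batches and also tracks producer epochs. `IdempotentLogSketch` and its members are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustrative only: a simplified per-partition sequence check for an idempotent producer. */
final class IdempotentLogSketch {

    enum Result { APPENDED, DUPLICATE, OUT_OF_ORDER }

    private final List<String> log = new ArrayList<>();
    // Last accepted sequence number per producer id (a single partition is modeled).
    private final Map<Long, Integer> lastSequence = new HashMap<>();

    Result append(long producerId, int sequence, String value) {
        int last = lastSequence.getOrDefault(producerId, -1);
        if (sequence <= last) {
            return Result.DUPLICATE;      // a retry of a record that was already written
        }
        if (sequence != last + 1) {
            return Result.OUT_OF_ORDER;   // a gap: an earlier record has not arrived
        }
        log.add(value);
        lastSequence.put(producerId, sequence);
        return Result.APPENDED;
    }

    List<String> records() {
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        IdempotentLogSketch partition = new IdempotentLogSketch();
        System.out.println(partition.append(1L, 0, "OrderCreated"));  // prints APPENDED
        System.out.println(partition.append(1L, 0, "OrderCreated"));  // prints DUPLICATE
        System.out.println(partition.append(1L, 1, "OrderApproved")); // prints APPENDED
        System.out.println(partition.records().size());               // prints 2
    }
}
```

A retried send carries the same sequence number as the original, so the second attempt is recognized and dropped instead of being appended twice.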

Java Producer configuration example (Idempotence / Transactions)

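import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.ProducerFencedException;

// Fragment: orderId (String) and payload (byte[]) come from the calling code.
Properties p = new Properties();
p.put("bootstrap.servers", "broker:9092");
p.put("acks", "all");
p.put("enable.idempotence", "true");
p.put("max.in.flight.requests.per.connection", "5");
p.put("retries", Integer.toString(Integer.MAX_VALUE));
p.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
p.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
// Only when using transactions; keep the id stable across restarts of this producer
p.put("transactional.id", "orders-writer-001");
KafkaProducer<String, byte[]> producer = new KafkaProducer<>(p);
producer.initTransactions();
try {
  producer.beginTransaction();
  producer.send(new ProducerRecord<>("commands.orders", orderId, payload));
  // Send to additional topics here if needed
  producer.commitTransaction();
} catch (ProducerFencedException e) {
  // Fatal: another producer with the same transactional.id took over, so close instead of aborting
  producer.close();
} catch (KafkaException e) {
  // Recoverable failure: discard everything sent in this transaction
  producer.abortTransaction();
}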

Read Model: Materialization with Kafka Streams and KTable

A KTable is a logical table that holds the latest value per key, backed by a changelog topic for fault tolerance. When aggregating command events into the latest state, build a KTable with groupByKey → aggregate or reduce, and write it to views.* as needed.

Enabling exactly_once_v2 makes outputs, internal state updates, and consumer offset commits atomic. For joins and groupBy operations that trigger repartitioning, internal topic configuration and capacity monitoring matter most.

  • KStream: per-event processing, tolerates duplicates
  • KTable: latest state, naturally aligned with compaction
  • GlobalKTable: replicates all partitions to every application instance (lookup data)

Type | Primary Use | Caveats
KStream | Per-event processing, pre-aggregation stage | Order holds only within a partition. Reshuffling is often required.
KTable | Holding and querying the latest state | Watch out for changelog size and recovery time.
GlobalKTable | Joins against lookup data | Memory and disk footprint grow.
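
The groupByKey → aggregate step can be pictured as a per-key fold: an initializer supplies the starting state for a new key, and an aggregator combines each event with the current state. The sketch below mimics that shape in plain Java without the Kafka Streams library. `AggregateSketch`, `applyEvent`, and the string-valued order states are hypothetical and chosen only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiFunction;
import java.util.function.Supplier;

/** Illustrative only: a plain-Java fold shaped like groupByKey followed by aggregate. */
final class AggregateSketch {

    /** Folds keyed events into one state per key, in arrival order. */
    static <E, S> Map<String, S> aggregate(List<Map.Entry<String, E>> events,
                                           Supplier<S> initializer,
                                           BiFunction<E, S, S> aggregator) {
        Map<String, S> table = new TreeMap<>();
        for (Map.Entry<String, E> event : events) {
            String key = event.getKey();
            S current = table.containsKey(key) ? table.get(key) : initializer.get();
            table.put(key, aggregator.apply(event.getValue(), current));
        }
        return table;
    }

    /** Hypothetical order state machine: approval only applies to a created order. */
    static String applyEvent(String event, String state) {
        if (event.equals("OrderCreated")) return "CREATED";
        if (event.equals("OrderApproved") && state.equals("CREATED")) return "APPROVED";
        return state; // ignore events that are not valid in the current state
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> events = new ArrayList<>();
        events.add(Map.entry("order-1", "OrderCreated"));
        events.add(Map.entry("order-2", "OrderApproved")); // no prior create: ignored
        events.add(Map.entry("order-1", "OrderApproved"));
        System.out.println(aggregate(events, () -> "NEW", AggregateSketch::applyEvent));
        // prints {order-1=APPROVED, order-2=NEW}
    }
}
```

The result depends on the order of events for each key, which is why the aggregate ID has to be the record key: only then do all events for one order arrive through a single partition in order.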

Materialization with Streams

[Figure: commands.orders -> KStream -> groupByKey → aggregate -> KTable (orders_state) -> views.orders (compact)]

Kafka Streams example (Exactly-once + KTable output)

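import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.KeyValueStore;

// Fragment: OrderEvent, OrderState, orderEventSerde, and orderStateSerde are application-defined.
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "orders-aggregate-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass());

StreamsBuilder b = new StreamsBuilder();
// Pass serdes explicitly: the default byte[] value serde cannot produce OrderEvent objects
KStream<String, OrderEvent> events =
    b.stream("commands.orders", Consumed.with(Serdes.String(), orderEventSerde));
KGroupedStream<String, OrderEvent> g = events.groupByKey();
KTable<String, OrderState> state = g.aggregate(
  () -> OrderState.empty(),
  (key, ev, agg) -> agg.apply(ev),
  Materialized.<String, OrderState, KeyValueStore<Bytes, byte[]>>as("orders-state-store")
      .withKeySerde(Serdes.String())
      .withValueSerde(orderStateSerde)
);
// Emit the latest state to the view topic
state.toStream().to("views.orders", Produced.with(Serdes.String(), orderStateSerde));

KafkaStreams streams = new KafkaStreams(b.build(), props);
streams.start();
// Close cleanly on JVM shutdown so in-flight transactions are completed or aborted
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));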

Query Path Design: Options for Low-Latency Reads

There are two broad options for reads: 1) pull Streams local state (Interactive Queries) over REST, or 2) sink views.* into an external store and query it. The first option gives minimal latency but requires routing and discovery; the second is easier to operate but adds consistency lag and delay.

With key-based routing you can directly query the instance that hosts the active task for a given key (or a standby replica, if slightly stale reads are acceptable). For external sinks, use existing Kafka Connect sink connectors (JDBC, Elasticsearch, Redis, etc.) and set the sink consumer's isolation.level to read_committed so that uncommitted and aborted transactional records are skipped.

  • Interactive Queries: key routing and instance discovery via the metadata API
  • External store: project views.* via Connect Sink; manage lag and schema evolution
  • Either way, enforce immutable keys and backward-compatible schema evolution

Query Approach | Consistency / Latency | Operational Cost | Where It Fits
Interactive Queries | Minimal latency / in-process consistency | Medium (routing / HA) | Low-latency APIs
Connect Sink → RDB / search platform | Eventual consistency / latency + retries | Low to medium (managed offerings available) | Complex search / reporting
Consume views.* directly | Medium (depends on compaction) | Low (simple consumption) | Simple key-value lookups
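
For the Interactive Queries option, the routing decision a query service makes can be sketched as "find the owner of the key's partition, then answer locally or forward". In Kafka Streams the owner lookup is provided by `KafkaStreams#queryMetadataForKey`; the plain-Java version below replaces it with a hypothetical partition-to-host map and a `String.hashCode()` stand-in for the partitioner, so treat `QueryRoutingSketch` as an illustration of the decision only.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative only: decides whether a key can be served locally or must be forwarded. */
final class QueryRoutingSketch {

    /**
     * Returns "local" when this instance owns the key's partition,
     * otherwise the host:port of the instance that does.
     */
    static String resolveHost(String key, int numPartitions,
                              Map<Integer, String> partitionOwners, String self) {
        // Stand-in for the real partitioner; Kafka hashes the serialized key with murmur2.
        int partition = Math.floorMod(key.hashCode(), numPartitions);
        String owner = partitionOwners.get(partition);
        if (owner == null) {
            // For example during a rebalance, when ownership is temporarily unknown.
            throw new IllegalStateException("No owner known for partition " + partition);
        }
        return owner.equals(self) ? "local" : owner;
    }

    public static void main(String[] args) {
        Map<Integer, String> owners = new HashMap<>();
        owners.put(0, "streams-a:8080");
        owners.put(1, "streams-b:8080");
        // The same key resolves differently depending on which instance receives the request.
        System.out.println(resolveHost("order-1", 2, owners, "streams-a:8080"));
        System.out.println(resolveHost("order-1", 2, owners, "streams-b:8080"));
    }
}
```

A query service would serve the "local" case from its own state store and forward the other case over REST to the returned host, retrying if ownership moved in the meantime.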

Query path options

Client -> REST(Query Service) -> Streams Instance -> [State Store]
                               \
                                -> [views.orders] -> Connect Sink -> [External DB]

Kafka Connect Sink (Elasticsearch example)

{
  "name": "views-orders-es",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "topics": "views.orders",
    "connection.url": "http://es:9200",
    "type.name": "_doc",
    "key.ignore": "false",
    "behavior.on.malformed.documents": "ignore",
    "write.method": "upsert",
    "errors.tolerance": "all",
    "consumer.override.isolation.level": "read_committed"
  }
}

Consistency and Snapshots: Working with Eventual Consistency

CQRS + Kafka is fundamentally eventually consistent. Ordering is only guaranteed within a partition, so align aggregate boundaries with keys and partitions. Bake materialization lag into your SLOs, and design watermarks or timeouts where needed.

When transactions are in play, consumers and connectors must set isolation.level=read_committed to see only committed data; with the default read_uncommitted they also read records from open and aborted transactions. In compacted views, express logical deletes with tombstones and verify there are no gaps when rebuilding snapshots. Use backward-compatible schema evolution as the default, and maintain Schema Registry compatibility rules.

  • Ordering: only within a partition. Avoid cross-aggregate synchronization.
  • isolation.level=read_committed avoids uncommitted/aborted transactions
  • Compaction: delete with tombstones (a key with a null value); use min.compaction.lag.ms to prevent premature compaction

Challenge | Mitigation | Related Kafka Features
Ordering and duplicates | Key design / Idempotence / EOS | Idempotent Producer, Transactions
Recovery time | Tune compaction / use standby replicas | KTable changelog, standby replicas
Read consistency | read_committed / separate views | Consumer isolation, compacted topics
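
The difference between the two isolation levels can be sketched with a small in-memory log. This is a simplified model under stated assumptions: each record carries a hypothetical transaction id, the sets of committed and aborted ids are given up front, and a read_committed reader stops at the first record of a still-open transaction (the role the last stable offset plays in Kafka). `IsolationSketch` is not a Kafka API.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Illustrative only: what read_committed and read_uncommitted readers would see. */
final class IsolationSketch {

    /** A log record; txnId == null means the record was written outside a transaction. */
    static final class Rec {
        final String value;
        final String txnId;
        Rec(String value, String txnId) { this.value = value; this.txnId = txnId; }
    }

    /** Skips aborted records and stops at the first record of an open transaction. */
    static List<String> readCommitted(List<Rec> log, Set<String> committed, Set<String> aborted) {
        List<String> visible = new ArrayList<>();
        for (Rec r : log) {
            if (r.txnId == null || committed.contains(r.txnId)) {
                visible.add(r.value);
            } else if (aborted.contains(r.txnId)) {
                continue; // aborted data is filtered out
            } else {
                break;    // open transaction: nothing at or past this offset is readable yet
            }
        }
        return visible;
    }

    /** Returns every record regardless of transaction outcome. */
    static List<String> readUncommitted(List<Rec> log) {
        List<String> visible = new ArrayList<>();
        for (Rec r : log) {
            visible.add(r.value);
        }
        return visible;
    }

    public static void main(String[] args) {
        List<Rec> log = new ArrayList<>();
        log.add(new Rec("a", "t1")); // committed
        log.add(new Rec("b", "t2")); // aborted
        log.add(new Rec("c", null)); // non-transactional
        log.add(new Rec("d", "t3")); // still open
        log.add(new Rec("e", null)); // blocked behind the open transaction
        Set<String> committed = new HashSet<>();
        committed.add("t1");
        Set<String> aborted = new HashSet<>();
        aborted.add("t2");
        System.out.println(readCommitted(log, committed, aborted)); // prints [a, c]
        System.out.println(readUncommitted(log));                   // prints [a, b, c, d, e]
    }
}
```

Note the side effect on latency: a long-running open transaction holds back every later record for read_committed readers, including non-transactional ones.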

Timing of eventual consistency (conceptual)

[Figure: t0: command appended (commands.orders) -> t1: Streams consumes (state updated) -> t2: state published (views.orders) -> t3: Query reads latest view (eventual consistency satisfied)]
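
One way to turn the gap between t0 and t3 into an explicit timeout is to poll the view until it reflects the write or a budget runs out. The sketch below assumes the view exposes some monotonically increasing version per key (for example a version field carried in the view record), which is an assumption and not something the view topic provides by itself. `ViewFreshnessSketch` and `awaitVersion` are hypothetical names.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

/** Illustrative only: bounded wait for a read model to catch up with a write. */
final class ViewFreshnessSketch {

    /**
     * Polls the view's version until it reaches expectedVersion.
     * Returns false if the attempts are exhausted or the thread is interrupted.
     */
    static boolean awaitVersion(LongSupplier viewVersion, long expectedVersion,
                                int maxAttempts, long pollMillis) {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            if (viewVersion.getAsLong() >= expectedVersion) {
                return true;
            }
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                return false;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Simulated view that advances by one version on every read.
        AtomicLong version = new AtomicLong(0);
        System.out.println(awaitVersion(version::incrementAndGet, 3, 5, 10)); // prints true
        // A view that never catches up exhausts the budget.
        System.out.println(awaitVersion(() -> 0L, 1, 3, 10));                 // prints false
    }
}
```

When the wait times out, the caller can return the stale view with a freshness hint or report the command as accepted but not yet visible, in line with whatever lag the SLO allows.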

Consistency-related settings for Consumer / Streams

# Consumer reads only committed transactional data
isolation.level=read_committed

# Exactly-once processing in Streams
processing.guarantee=exactly_once_v2

# Example: consuming a view topic that relies on compaction
auto.offset.reset=earliest

Check Yourself with a Practice Question

CCDAK

Question 1

You're implementing CQRS on Kafka for an order service. Commands are written to commands.orders, and reads must return the latest state with low latency. To avoid duplicate inserts and to strengthen view freshness and atomicity, which combination is most appropriate?

  1. Create views.orders with cleanup.policy=compact, enable processing.guarantee=exactly_once_v2 in Kafka Streams, set enable.idempotence=true on the producer, and consume with isolation.level=read_committed on the query side.
  2. Set commands.orders to cleanup.policy=compact, implement custom dedup logic on the consumer side, and run Streams with at_least_once.
  3. Leave views.orders at the default (delete) policy and just set auto.offset.reset=latest so reads always get the freshest data.
  4. Skip transactions; tolerate writing the same record twice at the application layer and set retries to unlimited.

Correct answer: 1

Compact is the right policy for views, since they must hold the latest state. Streams' exactly_once_v2 provides atomicity across outputs, state updates, and offset commits. The idempotent producer suppresses duplicates from retries, and read_committed skips uncommitted data. Options 2, 3, and 4 are all weak on consistency or duplicates.

Frequently Asked Questions

Is Event Sourcing required? How is it related to CQRS?

It is not required. CQRS is a pattern that separates writes and reads; deciding whether the event log itself is the only source of truth (ES) is a separate choice. With Kafka you can persist either commands or events in the log and build views with Streams. Adopting ES gives you richer replay and history, but it adds design and operational complexity.

Can a single topic serve both commands and views?

Not recommended. Write-optimized (append-only, delete policy) and read-optimized (latest state, compact) topics have different requirements and configurations. It is safer to separate topics by role and let each schema evolve independently for its specific purpose.

Does ordering hold when partitions increase? What should I watch out for when scaling?

Ordering is only guaranteed within a single partition. Fix the key at the granularity where order matters (e.g. orderId) and keep the key function immutable to avoid reshuffling. In Streams, internal repartition topics can grow, so monitor replication.factor and cleanup, and consider enabling standby replicas.
