CQRSは、書き込み(コマンド)と読み取り(クエリ)を独立設計することでスケールと変更容易性を高めるパターンです。Kafkaは高スループットのログ、厳格なパーティション内順序、コンパクション、トランザクションを備え、CQRSを現実的に組み立てるための部品が揃っています。
本記事では、CCDAKで問われやすい公式機能(トピックのコンパクション、Idempotent/Transactional Producer、Kafka Streams/KTable、processing.guarantee、isolation.levelなど)に寄せて、設計・設定・落とし穴を具体的に解説します。
CQRSは、書き込みモデル(状態遷移ロジック)と読み取りモデル(クエリ最適化ビュー)を分離します。Kafkaではコマンド(あるいはイベント)を受け取るトピックと、読み取り用に最適化されたビュー(コンパクションされたトピックや外部ストア)を分けるのが基本です。
Streamsアプリでコマンドを集計してKTableにマテリアライズし、クエリ層はそのKTableまたはビュー用トピックを参照します。最終的整合性を前提に、キー設計とパーティション設計で整合性境界を明確にします。
| 設計スタイル | 書き込み/読み取りモデル | 一貫性/遅延 | 複雑さ |
|---|---|---|---|
| CRUD単一モデル | 同一スキーマで兼用 | 強整合/低遅延だがスケール制約 | 低 |
| CQRS | 分離(書き込み=正規化, 読み取り=最適化) | 最終的整合/遅延発生 | 中 |
| Event Sourcing + CQRS | イベントから状態再構築+ビュー | 最終的整合/柔軟だが設計負荷 | 高 |
Kafkaを用いたCQRSの全体像
論理名と役割のサンプル(設計メモ)
commands.orders # 書き込み用(append-only, retentionベース)
views.orders # 読み取り用(コンパクション, 最新状態)
internal.orders-agg # Streamsのchangelog/repartition(内部トピック)コマンド用トピックは事実の不可逆な追記ログとして設計し、キーは集約ID(例: orderId)で固定します。読み取り用トピックは最新状態を取り出しやすいようにコンパクションを有効化します。これによりKTableや外部シンクで最新スナップショットを効率的に再構築できます。
パーティション数は書き込みスループットと並列性で決めます。順序保証はパーティション単位なので、順序が必要な単位でキーを設計します。ビュー側のトピックは同一キーで上書きされるため、値は完全スナップショット(ソースオブトゥルース)が望ましく、差分更新なら必ずキーとスキーマ進化を管理します。
| トピック種別 | 主な設定 | 用途/注意点 |
|---|---|---|
| commands.* | cleanup.policy=delete, min.insync.replicas>=2 | 追記ログ。順序はパーティション内のみ。保持期間を業務要件で設定 |
| views.* | cleanup.policy=compact(またはcompact,delete), min.compaction.lag.ms | 最新状態を維持。tombstoneで削除を表現 |
| internal.* | Streams自動管理, replication.factor>=3推奨 | 障害時の再構築とEOSに関与。容量・パフォーマンス監視が必要 |
トピック分離のイメージ
トピック作成例(kafka-topics)
kafka-topics --create --topic commands.orders --partitions 12 --replication-factor 3 \
--config cleanup.policy=delete --config min.insync.replicas=2
kafka-topics --create --topic views.orders --partitions 12 --replication-factor 3 \
--config cleanup.policy=compact --config min.cleanable.dirty.ratio=0.5プロデューサのIdempotenceは、再試行時の重複を書き込み経路上で抑止します。acks=all、適切なin-flight設定と併用します。これにより少なくとも一度配送で生じがちな重複キーを軽減できます。
トランザクションは、複数トピック(例: commands.* と views.* への同一境界内更新や、処理済みオフセットのコミット)を原子的に公開するために用います。Kafka Streamsではprocessing.guarantee=exactly_once_v2を有効化すると、内部的にトランザクションとidempotentな書き込みが使われます。
| 送達保証/機能 | 特性 | オーバーヘッド/適用 |
|---|---|---|
| At-least-once | 重複の可能性あり | 低/大半の用途。下流でデデュープが必要 |
| Idempotent Producer | 同一セッション内の重複抑止 | 中/キー上書きに強い |
| Transactions(EOS) | 多出力の原子的公開+重複抑止 | 中〜高/ビュー更新やexactly-once処理 |
Idempotence/Transactionsの適用イメージ
Producer(retries) -> [commands.orders]
| enable.idempotence=true | acks=all
+--> Duplicates suppressed (per partition sequence)
[Transactional boundary]
begin -> send commands + send processed offsets -> commitJava Producer設定例(Idempotence/Transactions)
Properties p = new Properties();
p.put("bootstrap.servers", "broker:9092");
p.put("acks", "all");
p.put("enable.idempotence", "true");
p.put("max.in.flight.requests.per.connection", "5");
p.put("retries", Integer.toString(Integer.MAX_VALUE));
p.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
p.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
// トランザクションを使う場合
p.put("transactional.id", "orders-writer-001");
KafkaProducer<String, byte[]> producer = new KafkaProducer<>(p);
producer.initTransactions();
producer.beginTransaction();
producer.send(new ProducerRecord<>("commands.orders", orderId, payload));
// 必要なら追加のトピックにも送信
producer.commitTransaction();KTableはキーごとの最新値を保持する論理テーブルで、背後ではchangelogトピックにより耐障害性が担保されます。コマンドイベントを集約して最新状態を作る場合、groupByKey→aggregateまたはreduceでKTableを構築し、必要に応じてviews.*へ書き出します。
exactly_once_v2の有効化により、出力と内部状態更新、消費オフセットのコミットが原子的になります。再パーティショニングが発生するjoinやgroupByでは、内部トピックの設定と容量監視が重要です。
| 型 | 主用途 | 注意点 |
|---|---|---|
| KStream | イベント逐次処理/集計前段 | 順序はパーティション内のみ。再分散が必要なことが多い |
| KTable | 最新状態の保持・クエリ | changelog容量と復旧時間に注意 |
| GlobalKTable | 参照データの結合 | メモリ/ディスクフットプリントが増加 |
Streamsによるマテリアライズ
Kafka Streams(Exactly-once + KTable出力)の例
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "orders-aggregate-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass());
StreamsBuilder b = new StreamsBuilder();
KStream<String, OrderEvent> events = b.stream("commands.orders");
KGroupedStream<String, OrderEvent> g = events.groupByKey();
KTable<String, OrderState> state = g.aggregate(
() -> OrderState.empty(),
(key, ev, agg) -> agg.apply(ev),
Materialized.<String, OrderState, KeyValueStore<Bytes, byte[]>>as("orders-state-store")
);
// ビュー用トピックへ最新状態を出力
state.toStream().to("views.orders", Produced.with(Serdes.String(), orderStateSerde));
KafkaStreams streams = new KafkaStreams(b.build(), props);
streams.start();読み取りは大きく2通り: 1) Streamsのローカルステート(Interactive Queries)をREST経由で引く、2) views.*を外部ストアへシンクしてクエリする。前者は最小レイテンシだがルーティングやディスカバリが必要。後者は運用しやすいが整合と遅延が加わります。
キーに基づくルーティングを行えば、該当キーを保持するタスク(ホットスタンバイ含む)に直接問い合わせできます。外部シンクはKafka Connectの公式シンクコネクタ(JDBC/Elasticsearch/Redisなど)を用い、read_committedを有効化して未コミット・中断トランザクションを避けます。
| クエリ手法 | 一貫性/遅延 | 運用コスト | 向いているケース |
|---|---|---|---|
| Interactive Queries | 最小遅延/プロセス内整合 | 中(ルーティング/HA) | 低レイテンシAPI |
| Connect Sink→RDB/検索基盤 | 最終的整合/遅延+再試行 | 低〜中(マネージド可) | 複雑検索/レポート |
| 直接views.*を消費 | 中(コンパクション依存) | 低(単純な消費) | 単純Key-Value参照 |
クエリパスの選択肢
Client -> REST(Query Service) -> Streams Instance -> [State Store]
\
-> [views.orders] -> Connect Sink -> [External DB]Kafka Connect Sink(Elasticsearch例)
{
"name": "views-orders-es",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"topics": "views.orders",
"connection.url": "http://es:9200",
"type.name": "_doc",
"key.ignore": false,
"behavior.on.malformed.documents": "ignore",
"write.method": "upsert",
"errors.tolerance": "all",
"consumer.override.isolation.level": "read_committed"
}
}CQRS+Kafkaは最終的整合が基本です。順序はパーティション内でのみ保証されるため、集約境界をキーとパーティションに一致させます。マテリアライズ遅延をSLOに織り込み、必要に応じてウォーターマークやタイムアウトを設計します。
トランザクションを利用する場合、read_committedを設定したコンシューマ/コネクタのみがコミット済みデータを読みます。コンパクションを用いるビューでは、tombstoneにより論理削除を表現し、スナップショット再構築時に欠落がないことを確認します。スキーマ進化には後方互換を基本とし、Schema Registryの互換性ルールを保守します。
| 課題 | 対策 | 関連Kafka機能 |
|---|---|---|
| 順序と重複 | キー設計/Idempotence/EOS | Idempotent Producer, Transactions |
| 再構築時間 | コンパクション調整/スタンバイレプリカ | KTable changelog, standby replicas |
| 読み取り整合 | read_committed/ビュー分離 | Consumer isolation, compacted topics |
最終的整合の時間関係(概念図)
Consumer/Streamsの整合関連設定
# Consumerがトランザクション整合を尊重
isolation.level=read_committed
# StreamsのExactly-once
processing.guarantee=exactly_once_v2
# コンパクションを前提にする場合のビュー消費(例)
auto.offset.reset=earliestCCDAK
問題 1
注文サービスでCQRSをKafka上に実装する。コマンドはcommands.ordersに書き込み、読み取りは最新状態を低遅延で返したい。重複挿入を避け、ビューの最新性と原子性を高めるために最も適切な組合せはどれか。
正解: A
ビューは最新状態を保つためcompactが適切。Streamsのexactly_once_v2は出力・状態更新・オフセットコミットの原子性を提供。Idempotent Producerで重複の抑止を強化し、read_committedで未コミットデータを読み飛ばす。B/C/Dはいずれも整合や重複に弱い。
Event Sourcingは必須か?CQRSとどう関係する?
必須ではありません。CQRSは読み書きの分離設計であり、イベントそのものを唯一のソースにするか(ES)は別決定です。Kafkaではコマンド/イベントいずれでもログに蓄積し、Streamsでビューを組み立てられます。ESを採るほど再現性と履歴は豊かになりますが、設計と運用の複雑さが増します。
単一トピックでコマンドとビューを兼用してもよい?
推奨しません。書き込み最適(append-only, deleteポリシー)と読み取り最適(最新状態, compact)では要件と設定が異なります。役割ごとにトピックを分離し、スキーマも用途に合わせて独立に進化させた方が安全です。
パーティションを増やしても順序は保てる?スケール時の注意点は?
順序はパーティション内でのみ保証されます。順序が必要な粒度(例: orderId)でキーを固定し、再分散が起きないようキー関数を不変に保ってください。Streamsでは再パーティション内部トピックが増えうるため、replication.factorとクリーンアップの監視、standbyレプリカの有効化を検討します。
NicheeLab編集部
データエンジニアリング・クラウド資格の専門家。Databricks・Snowflake等の認定資格を保有し、実務経験に基づいた問題作成・解説を行っています。NicheeLab運営。
Kafka Topic と Partition の基礎: 分散とスケーラビリティの要
CCDAK 対策と実務の両立を意識し、Topic/Partition/Replica/Consumer Group の役...
CCDAK 試験ガイド:出題範囲・配点・申込み・対策
Confluent Certified Developer for Apache Kafka (CCDAK) の出題範囲...
Confluent Certified Administrator (CCAAK) 対策: 出題範囲・配点の考え方・運用観点の要点
CCAAKに向けて、試験領域の押さえどころを運用目線で整理。プロダクションで通用する設定・監視・セキュリティの実践知を、...
Kafka の Replica と In-Sync Replicas を正しく設計する: 耐障害性と一貫性
レプリカとISRの仕組みを起点に、acks と min.insync.replicas、クリーン/アンクリーンリーダー選...
Kafka の Offset とコミット: ポジション管理と at-least-once の基礎
CCDAK 対策と実務の両立を意識して、Kafka コンシューマのオフセット管理とコミット戦略を整理。at-least-...