Kafka

Kafkaで実装するCQRSパターン: コマンドとクエリの分離設計

2026-04-19
NicheeLab編集部

CQRSは、書き込み(コマンド)と読み取り(クエリ)を独立設計することでスケールと変更容易性を高めるパターンです。Kafkaは高スループットのログ、厳格なパーティション内順序、コンパクション、トランザクションを備え、CQRSを現実的に組み立てるための部品が揃っています。

本記事では、CCDAKで問われやすい公式機能(トピックのコンパクション、Idempotent/Transactional Producer、Kafka Streams/KTable、processing.guarantee、isolation.levelなど)に寄せて、設計・設定・落とし穴を具体的に解説します。

CQRSの要点とKafkaでの落とし込み

CQRSは、書き込みモデル(状態遷移ロジック)と読み取りモデル(クエリ最適化ビュー)を分離します。Kafkaではコマンド(あるいはイベント)を受け取るトピックと、読み取り用に最適化されたビュー(コンパクションされたトピックや外部ストア)を分けるのが基本です。

Streamsアプリでコマンドを集計してKTableにマテリアライズし、クエリ層はそのKTableまたはビュー用トピックを参照します。最終的整合性を前提に、キー設計とパーティション設計で整合性境界を明確にします。

  • コマンド: 状態変更の要求(例: OrderCreated, OrderApproved)
  • クエリ: 読み取り最適化されたビュー(例: 現在の注文サマリ)
  • Kafkaの適合点: パーティション順序、ログによる再処理、コンパクション、トランザクション
設計スタイル書き込み/読み取りモデル一貫性/遅延複雑さ
CRUD単一モデル同一スキーマで兼用強整合/低遅延だがスケール制約
CQRS分離(書き込み=正規化, 読み取り=最適化)最終的整合/遅延発生
Event Sourcing + CQRSイベントから状態再構築+ビュー最終的整合/柔軟だが設計負荷

Kafkaを用いたCQRSの全体像

ClientsHTTPCommand APIProducertopic: commands.ordersKafka Streams (aggregate)KTable: orders_statetopic: views.orders(compact)Query ServiceConsumersKafkaを用いたCQRSの全体像

論理名と役割のサンプル(設計メモ)

commands.orders        # 書き込み用(append-only, retentionベース)
views.orders           # 読み取り用(コンパクション, 最新状態)
internal.orders-agg    # Streamsのchangelog/repartition(内部トピック)

トピック設計: コマンドとクエリの分離

コマンド用トピックは事実の不可逆な追記ログとして設計し、キーは集約ID(例: orderId)で固定します。読み取り用トピックは最新状態を取り出しやすいようにコンパクションを有効化します。これによりKTableや外部シンクで最新スナップショットを効率的に再構築できます。

パーティション数は書き込みスループットと並列性で決めます。順序保証はパーティション単位なので、順序が必要な単位でキーを設計します。ビュー側のトピックは同一キーで上書きされるため、値は完全スナップショット(ソースオブトゥルース)が望ましく、差分更新なら必ずキーとスキーマ進化を管理します。

  • コマンド: cleanup.policy=delete、適切なretention.ms/bytes、キー=aggregateId
  • ビュー: cleanup.policy=compact(場合によりcompact,delete)、値は最新完全状態
  • 内部トピック: Streamsが自動作成(changelog/repartition)。運用上の可視化と監視を行う
トピック種別主な設定用途/注意点
commands.*cleanup.policy=delete, min.insync.replicas>=2追記ログ。順序はパーティション内のみ。保持期間を業務要件で設定
views.*cleanup.policy=compact(またはcompact,delete), min.compaction.lag.ms最新状態を維持。tombstoneで削除を表現
internal.*Streams自動管理, replication.factor>=3推奨障害時の再構築とEOSに関与。容量・パフォーマンス監視が必要

トピック分離のイメージ

Producercommands.ordersappend-onlyStreams Aggregationviews.orderscompactトピック分離のイメージ

トピック作成例(kafka-topics)

kafka-topics --create --topic commands.orders --partitions 12 --replication-factor 3 \
  --config cleanup.policy=delete --config min.insync.replicas=2

kafka-topics --create --topic views.orders --partitions 12 --replication-factor 3 \
  --config cleanup.policy=compact --config min.cleanable.dirty.ratio=0.5

書き込みモデル: IdempotenceとTransactionsで重複と原子性を制御

プロデューサのIdempotenceは、再試行時の重複を書き込み経路上で抑止します。acks=all、適切なin-flight設定と併用します。これにより少なくとも一度配送で生じがちな重複キーを軽減できます。

トランザクションは、複数トピック(例: commands.* と views.* への同一境界内更新や、処理済みオフセットのコミット)を原子的に公開するために用います。Kafka Streamsではprocessing.guarantee=exactly_once_v2を有効化すると、内部的にトランザクションとidempotentな書き込みが使われます。

  • Idempotent Producer: enable.idempotence=true, acks=all, max.in.flight.requests.per.connection<=5
  • Transactions: transactional.idを安定化、read_committedで読み手を保護
  • Streams: processing.guarantee=exactly_once_v2を選択(安定版)。外部システムと2PCは使わない
送達保証/機能特性オーバーヘッド/適用
At-least-once重複の可能性あり低/大半の用途。下流でデデュープが必要
Idempotent Producer同一セッション内の重複抑止中/キー上書きに強い
Transactions(EOS)多出力の原子的公開+重複抑止中〜高/ビュー更新やexactly-once処理

Idempotence/Transactionsの適用イメージ

Producer(retries) -> [commands.orders]
   | enable.idempotence=true  | acks=all
   +--> Duplicates suppressed (per partition sequence)

[Transactional boundary]
  begin -> send commands + send processed offsets -> commit

Java Producer設定例(Idempotence/Transactions)

Properties p = new Properties();
p.put("bootstrap.servers", "broker:9092");
p.put("acks", "all");
p.put("enable.idempotence", "true");
p.put("max.in.flight.requests.per.connection", "5");
p.put("retries", Integer.toString(Integer.MAX_VALUE));
p.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
p.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
// トランザクションを使う場合
p.put("transactional.id", "orders-writer-001");
KafkaProducer<String, byte[]> producer = new KafkaProducer<>(p);
producer.initTransactions();
producer.beginTransaction();
producer.send(new ProducerRecord<>("commands.orders", orderId, payload));
// 必要なら追加のトピックにも送信
producer.commitTransaction();

読み取りモデル: Kafka StreamsとKTableのマテリアライズ

KTableはキーごとの最新値を保持する論理テーブルで、背後ではchangelogトピックにより耐障害性が担保されます。コマンドイベントを集約して最新状態を作る場合、groupByKey→aggregateまたはreduceでKTableを構築し、必要に応じてviews.*へ書き出します。

exactly_once_v2の有効化により、出力と内部状態更新、消費オフセットのコミットが原子的になります。再パーティショニングが発生するjoinやgroupByでは、内部トピックの設定と容量監視が重要です。

  • KStream: 逐次イベント処理/重複許容
  • KTable: 最新状態/コンパクションに親和
  • GlobalKTable: 全パーティションを各タスクに複製(参照用)
主用途注意点
KStreamイベント逐次処理/集計前段順序はパーティション内のみ。再分散が必要なことが多い
KTable最新状態の保持・クエリchangelog容量と復旧時間に注意
GlobalKTable参照データの結合メモリ/ディスクフットプリントが増加

Streamsによるマテリアライズ

commands.ordersKStreamgroupByKey → aggregateKTable(orders_state)views.orderscompactStreamsによるマテリアライズ

Kafka Streams(Exactly-once + KTable出力)の例

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "orders-aggregate-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass());

StreamsBuilder b = new StreamsBuilder();
KStream<String, OrderEvent> events = b.stream("commands.orders");
KGroupedStream<String, OrderEvent> g = events.groupByKey();
KTable<String, OrderState> state = g.aggregate(
  () -> OrderState.empty(),
  (key, ev, agg) -> agg.apply(ev),
  Materialized.<String, OrderState, KeyValueStore<Bytes, byte[]>>as("orders-state-store")
);
// ビュー用トピックへ最新状態を出力
state.toStream().to("views.orders", Produced.with(Serdes.String(), orderStateSerde));

KafkaStreams streams = new KafkaStreams(b.build(), props);
streams.start();

クエリパス設計: 低レイテンシ読み取りの選択肢

読み取りは大きく2通り: 1) Streamsのローカルステート(Interactive Queries)をREST経由で引く、2) views.*を外部ストアへシンクしてクエリする。前者は最小レイテンシだがルーティングやディスカバリが必要。後者は運用しやすいが整合と遅延が加わります。

キーに基づくルーティングを行えば、該当キーを保持するタスク(ホットスタンバイ含む)に直接問い合わせできます。外部シンクはKafka Connectの公式シンクコネクタ(JDBC/Elasticsearch/Redisなど)を用い、read_committedを有効化して未コミット・中断トランザクションを避けます。

  • Interactive Queries: キールーティング/メタデータAPIでインスタンス発見
  • 外部ストア: Connect Sinkでviews.*を反映。遅延とスキーマ進化を管理
  • どちらでも、キー不変・スキーマ後方互換を徹底
クエリ手法一貫性/遅延運用コスト向いているケース
Interactive Queries最小遅延/プロセス内整合中(ルーティング/HA)低レイテンシAPI
Connect Sink→RDB/検索基盤最終的整合/遅延+再試行低〜中(マネージド可)複雑検索/レポート
直接views.*を消費中(コンパクション依存)低(単純な消費)単純Key-Value参照

クエリパスの選択肢

Client -> REST(Query Service) -> Streams Instance -> [State Store]
                               \
                                -> [views.orders] -> Connect Sink -> [External DB]

Kafka Connect Sink(Elasticsearch例)

{
  "name": "views-orders-es",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "topics": "views.orders",
    "connection.url": "http://es:9200",
    "type.name": "_doc",
    "key.ignore": false,
    "behavior.on.malformed.documents": "ignore",
    "write.method": "upsert",
    "errors.tolerance": "all",
    "consumer.override.isolation.level": "read_committed"
  }
}

一貫性とスナップショット: 最終的整合の扱い方

CQRS+Kafkaは最終的整合が基本です。順序はパーティション内でのみ保証されるため、集約境界をキーとパーティションに一致させます。マテリアライズ遅延をSLOに織り込み、必要に応じてウォーターマークやタイムアウトを設計します。

トランザクションを利用する場合、read_committedを設定したコンシューマ/コネクタのみがコミット済みデータを読みます。コンパクションを用いるビューでは、tombstoneにより論理削除を表現し、スナップショット再構築時に欠落がないことを確認します。スキーマ進化には後方互換を基本とし、Schema Registryの互換性ルールを保守します。

  • ordering: パーティション内のみ。クロス集約の同期は避ける
  • isolation.level=read_committedで未コミット/中断Txを回避
  • コンパクション: tombstone(keyのみ)で削除、min.compaction.lag.msで早すぎる圧縮を防止
課題対策関連Kafka機能
順序と重複キー設計/Idempotence/EOSIdempotent Producer, Transactions
再構築時間コンパクション調整/スタンバイレプリカKTable changelog, standby replicas
読み取り整合read_committed/ビュー分離Consumer isolation, compacted topics

最終的整合の時間関係(概念図)

t0: command appendedcommands.orderst1: Streams consumesstate updatedt2: state publishedviews.orderst3: Query reads latest vieweventual consistency satisfied最終的整合の時間関係

Consumer/Streamsの整合関連設定

# Consumerがトランザクション整合を尊重
isolation.level=read_committed

# StreamsのExactly-once
processing.guarantee=exactly_once_v2

# コンパクションを前提にする場合のビュー消費(例)
auto.offset.reset=earliest

問題で確認

CCDAK

問題 1

注文サービスでCQRSをKafka上に実装する。コマンドはcommands.ordersに書き込み、読み取りは最新状態を低遅延で返したい。重複挿入を避け、ビューの最新性と原子性を高めるために最も適切な組合せはどれか。

  1. views.ordersをcleanup.policy=compactで作成し、Kafka Streamsでprocessing.guarantee=exactly_once_v2を有効化。プロデューサはenable.idempotence=trueを設定し、クエリはisolation.level=read_committedで消費する。
  2. commands.ordersをcleanup.policy=compactにし、コンシューマ側でデデュープロジックを自前実装。Streamsはat_least_onceで実行する。
  3. views.ordersはデフォルト設定(delete)のままにし、読み取りは常に最新を得るためにauto.offset.reset=latestのみ設定する。
  4. トランザクションは使わず、同じレコードを2回書く可能性をアプリ側で許容する代わりに再試行回数を無制限にする。

正解: A

ビューは最新状態を保つためcompactが適切。Streamsのexactly_once_v2は出力・状態更新・オフセットコミットの原子性を提供。Idempotent Producerで重複の抑止を強化し、read_committedで未コミットデータを読み飛ばす。B/C/Dはいずれも整合や重複に弱い。

よくある質問

Event Sourcingは必須か?CQRSとどう関係する?

必須ではありません。CQRSは読み書きの分離設計であり、イベントそのものを唯一のソースにするか(ES)は別決定です。Kafkaではコマンド/イベントいずれでもログに蓄積し、Streamsでビューを組み立てられます。ESを採るほど再現性と履歴は豊かになりますが、設計と運用の複雑さが増します。

単一トピックでコマンドとビューを兼用してもよい?

推奨しません。書き込み最適(append-only, deleteポリシー)と読み取り最適(最新状態, compact)では要件と設定が異なります。役割ごとにトピックを分離し、スキーマも用途に合わせて独立に進化させた方が安全です。

パーティションを増やしても順序は保てる?スケール時の注意点は?

順序はパーティション内でのみ保証されます。順序が必要な粒度(例: orderId)でキーを固定し、再分散が起きないようキー関数を不変に保ってください。Streamsでは再パーティション内部トピックが増えうるため、replication.factorとクリーンアップの監視、standbyレプリカの有効化を検討します。

この記事で学んだ内容を問題で確認しましょう

16,000問以上の問題で実力チェック

無料で問題を解いてみる
この記事の著者

NicheeLab編集部

データエンジニアリング・クラウド資格の専門家。Databricks・Snowflake等の認定資格を保有し、実務経験に基づいた問題作成・解説を行っています。NicheeLab運営。


関連記事
Kafka

Kafka Topic と Partition の基礎: 分散とスケーラビリティの要

CCDAK 対策と実務の両立を意識し、Topic/Partition/Replica/Consumer Group の役...

Kafka

CCDAK 試験ガイド:出題範囲・配点・申込み・対策

Confluent Certified Developer for Apache Kafka (CCDAK) の出題範囲...

Kafka

Confluent Certified Administrator (CCAAK) 対策: 出題範囲・配点の考え方・運用観点の要点

CCAAKに向けて、試験領域の押さえどころを運用目線で整理。プロダクションで通用する設定・監視・セキュリティの実践知を、...

Kafka

Kafka の Replica と In-Sync Replicas を正しく設計する: 耐障害性と一貫性

レプリカとISRの仕組みを起点に、acks と min.insync.replicas、クリーン/アンクリーンリーダー選...

Kafka

Kafka の Offset とコミット: ポジション管理と at-least-once の基礎

CCDAK 対策と実務の両立を意識して、Kafka コンシューマのオフセット管理とコミット戦略を整理。at-least-...

Kafkaの記事一覧 (101件)
© 2026 NicheeLab All rights reserved.