Kafka

Kafkaにおけるステートフル処理入門: 集約・Join・ステートストアの実務とCCDAK対策

2026-04-19
NicheeLab編集部

Kafkaはメッセージングの枠を超え、ストリーム処理プラットフォームとして確立しています。特にステートフル処理は、集約・Join・ステートストアを正しく理解してはじめて安定稼働します。

本稿ではKafka Streamsを軸に、公式ドキュメントの動作仕様に基づいて用語・アーキテクチャ・落とし穴を整理します。CCDAK(Confluent Certified Developer for Apache Kafka)の出題傾向にも触れながら、実務で迷わない判断材料を提示します。

ステートフル処理の全体像と用語整理

Kafkaのステートフル処理は、演算に必要な状態をタスクごとにローカルのステートストアに保持し、変更履歴をKafkaのチェンジログトピックへ書き出すことで耐障害性を確保します。復旧時はチェンジログから状態をリプレイしてストアを再構築します。

代表的なステートフル演算は、集約(count/reduce/aggregate)、ウィンドウ集約、Join(KStream-KStream、KStream-KTable、KTable-KTable)です。ウィンドウ処理では、時間境界(ウィンドウサイズ)と遅延許容量(grace)が結果の確定タイミングを決めます。Joinではコーパーティション(同じキー・同じパーティション数)と必要に応じたリパーティションが成立条件です。

  • ステートストア: RocksDB(デフォルト)またはインメモリでローカル保持
  • チェンジログトピック: ストア更新をKafkaへ非同期書き出しし、復旧・スケール時に再構築
  • スタンバイタスク: 予備のタスクがチェンジログを追従し、フェイルオーバーを短縮
  • ウィンドウのgrace: 許容遅延。過去ウィンドウへの遅延イベントを受け付ける時間
  • コーパーティション: Joinやグループ化を分散で正しく行うための必須条件
ストア種別主な用途保持とキー構造
KeyValueStoreKTableの現在値、ストリームの集約結果(非ウィンドウ)キー→最新値を保持。チェンジログはコンパクション主体
WindowStoreウィンドウ集約、ストリーム-ストリームJoinのウィンドウバッファキー+時間境界でセグメント化。保持期間はウィンドウサイズ+grace+α
SessionStoreセッションウィンドウ(ギャップ閉塞)の集約キー+セッション境界(可変長)。マージにより境界が変動

Kafka Streamsにおけるステートフル処理の流れ

uses WindowStorechangelogjoinsourceKStreamgroupBy/windowBycount/aggregateState StoreKafka topicchangelog, compact/retentionreference topicKTable

集約とウィンドウ: count/reduce/aggregateと遅延イベント

非ウィンドウ集約はKeyValueStoreに最新値を保持し、チェンジログはコンパクションで効率化されます。ウィンドウ集約はWindowStoreに時間セグメントごとの部分集計を保持し、保持期間はウィンドウサイズとgraceを含めて十分に設定します。

graceは遅延イベントをどこまで取り込むかを定義します。grace経過後のイベントはウィンドウ更新の対象外となるため、ビジネス要件(到着遅延の分布)に合わせて設定が必要です。

  • count: 件数集計。重複除去が不要なら軽量で高速
  • reduce: 同型の集約(例: 最新タイムスタンプで置換)。順序依存に注意
  • aggregate: 初期値+任意のアキュムレータで柔軟。状態サイズに注意
  • TimeWindows/SlidingWindows/SessionWindowsを使い分ける
  • 保持期間はウィンドウサイズ+grace+安全余裕(処理遅延)を含める

Kafka Streamsでのウィンドウ集約とJoinの例(Java)

StreamsBuilder builder = new StreamsBuilder();

Serde<String> stringSerde = Serdes.String();
Serde<Order> orderSerde = Serdes.serdeFrom(new OrderSerializer(), new OrderDeserializer());
Serde<Customer> customerSerde = Serdes.serdeFrom(new CustomerSerializer(), new CustomerDeserializer());

KStream<String, Order> orders = builder.stream("orders", Consumed.with(stringSerde, orderSerde));
KStream<String, Payment> payments = builder.stream("payments", Consumed.with(stringSerde, Serdes.serdeFrom(new PaymentSerializer(), new PaymentDeserializer())));

// 5分タンブリング + 1分grace
TimeWindows windows = TimeWindows.ofSizeAndGrace(Duration.ofMinutes(5), Duration.ofMinutes(1));
KTable<Windowed<String>, Long> orderCount5m = orders
    .groupByKey()
    .windowedBy(windows)
    .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("order-count-5m")
           .withKeySerde(stringSerde)
           .withValueSerde(Serdes.Long())) ;

// KTableは現在値のスナップショットを保持
KTable<String, Customer> customers = builder.table(
    "customers",
    Consumed.with(stringSerde, customerSerde),
    Materialized.<String, Customer, KeyValueStore<Bytes, byte[]>>as("customers-store")
        .withKeySerde(stringSerde).withValueSerde(customerSerde)
);

// KStream-KTable join(同一キー)。コーパーティションが必要。必要に応じてリパーティションが自動作成される
KStream<String, EnrichedOrder> enriched = orders.join(
    customers,
    (order, customer) -> EnrichedOrder.of(order, customer),
    Joined.with(stringSerde, orderSerde, customerSerde)
);

// KStream-KStreamの時間制約付きJoin
KStream<String, JoinedEvent> joined = orders.join(
    payments,
    (o, p) -> JoinedEvent.of(o, p),
    JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5)),
    StreamJoined.with(stringSerde, orderSerde, Serdes.serdeFrom(new PaymentSerializer(), new PaymentDeserializer()))
);

KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();

Joinパターンの要点: Stream-Stream, Stream-Table, Table-Table

KStream-KStream Joinは時間制約が必須で、両側のレコードがウィンドウ内で出会う必要があります。演算内部では各側の一時バッファとしてWindowStoreが使われます。遅延イベントの扱いはgraceで制御されます。

KStream-KTable Joinは、ストリームのイベントを到着時点のKTableスナップショットで拡張します。KTable側はKeyValueStoreで現在値のみ参照され、ウィンドウは不要です。KTable-KTable Joinは両側の更新を伝播して結果テーブルを更新します。外部キーJoinなどの拡張はバージョン依存のため、実装前に公式ドキュメントで対応状況を確認してください。

  • 全Joinに共通: キー整合とコーパーティション(同一キー・同一パーティション数)が必要
  • KStream-KStream: JoinWindows必須。WindowStoreにより両側を一時保持
  • KStream-KTable: 片側のみ状態参照。更新トリガはストリーム側
  • KTable-KTable: 双方向伝播。チェンジログのコンパクションで効率化
  • 必要に応じて自動リパーティション(中間トピック)が生成されることがある

ステートストアと障害復旧: RocksDB・チェンジログ・スタンバイ

Kafka StreamsはデフォルトでRocksDBをローカルストアとして用います。書き込みはコミット境界でバッチ適用され、更新はチェンジログトピックへ非同期複製されます。タスク再配置時やノード障害時は、チェンジログをリプレイしてRocksDBを再構築します。

高可用性のために、同一アプリケーショングループ内でスタンバイタスクを有効化できます。スタンバイはチェンジログを平行追従しており、フェイルオーバー時のウォームスタートに寄与します。保持やコンパクション設定は、テーブルかウィンドウかによって最適値が異なります。

  • RocksDB: 大容量・ディスク常駐。withCachingEnabledでヒット率向上
  • チェンジログ: テーブル系はコンパクション、ウィンドウ系は保持期間+削除
  • スタンバイタスク数を増やすと復旧短縮と引き換えにネットワーク/ストレージ負荷増
  • 復旧時間はチェンジログサイズ、レイテンシ要件、I/O帯域で見積もる

パーティションとリパーティション: コーパーティション成立条件

グルーピングやJoinの前段でキーが変わる操作(map/flatMap/selectKeyなど)を行うと、既存のパーティション配置は不整合になります。この場合、Kafka Streamsは中間トピックを挟んで自動リパーティションします。試験では、コーパーティション成立の必須条件と、自動リパーティションが生じる代表ケースがよく問われます。

実務では、入口でキーを明示的に整える、トピック作成時にパーティション数を合わせる、必要な箇所だけselectKeyを行うといった設計で中間トピックの氾濫を防ぎます。

  • コーパーティション成立条件: 同一パーティション数、同一キー、同一パーティショナー
  • selectKeyやgroupByでキー変更→リパーティション用の中間トピックが生成され得る
  • KStream-KTable JoinではKTable側のキーが結合キーと一致している必要あり
  • パーティション数を後から増やすと、既存テーブルのコーパーティションが崩れる可能性

運用と試験対策の要点: EOS・コミット・ウィンドウ確定

Exactly-once処理はTransactional producerとprocessing.guarantee=exactly_once_v2の組み合わせで実現されます。ステート更新と出力の原子的コミットにより二重計上を防ぎます。コミット間隔は遅延と書き込み負荷のトレードオフです。

ウィンドウ結果の確定タイミングは、ストリーム時間の前進とgrace経過に依存します。結果の最終化・集計確定を外部に出す場合、ksqlDBやアプリ側でsuppression相当の制御を設けることがあります。試験では、graceの意味、遅延イベントの扱い、EOSの前提条件が頻出です。

  • processing.guarantee=exactly_once_v2でEOSを有効化
  • commit.interval.msはRocksDBフラッシュや出力バッチに影響
  • ウィンドウ結果はgrace経過までは更新され得る
  • 遅延イベントの分布に基づくgrace設定と保持期間見積りが実務の肝

問題で確認

CCDAK

問題 1

KStreamとKTableのJoinを行うアプリで、orders(KStream)のキーをselectKeyで変更した直後にcustomers(KTable)と結合したい。正しい設計として最も適切なのはどれか。

  1. ordersのselectKey後に自動生成されるリパーティショントピックを介してコーパーティションを成立させ、customersのキーと一致させる
  2. ordersのselectKey後にJoinWindowsを指定して時間制約を設ければ、KStream-KTableでもウィンドウにより結合できる
  3. customers側のキーは不一致でも、KStream-KTableは到着順でスキャンするため結合可能
  4. ordersとcustomersのパーティション数は無関係。Kafka Streamsが内部でブロードキャストして結合する

正解: A

KStream-KTable Joinは結合キーの一致とコーパーティションが前提。selectKeyでキーを変えた場合、必要に応じて中間トピックによるリパーティションが挿入され、そのトピックのキーとcustomersのキーが一致していれば結合可能。KStream-KTableにJoinWindowsは不要(時間制約は用いない)。ブロードキャストは行われない。

よくある質問

KTableのチェンジログはなぜコンパクションが推奨されるのか?

KTableはキーの現在値を表すため、古い更新は意味を持ちません。コンパクションにより最新値だけを残せば、復旧のリプレイ量を削減し、ストレージ消費も抑えられます。

RocksDBではなくインメモリストアを使うと何が変わる?

低レイテンシを得られますが、容量制限により状態を大きく持てません。プロセス再起動時はチェンジログからの再構築が必要なのは同じで、ウォームアップ時間やGCの影響を考慮します。

ウィンドウのgraceを0にするとどうなる?

遅延イベントは受け付けず、ウィンドウ境界を超えた時点で結果がほぼ確定します。遅延の多いデータ源では結果欠損のリスクが高まるため、実測に基づき適切なgraceを設定してください。

この記事で学んだ内容を問題で確認しましょう

16,000問以上の問題で実力チェック

無料で問題を解いてみる
この記事の著者

NicheeLab編集部

データエンジニアリング・クラウド資格の専門家。Databricks・Snowflake等の認定資格を保有し、実務経験に基づいた問題作成・解説を行っています。NicheeLab運営。


関連記事
Kafka

Kafka Topic と Partition の基礎: 分散とスケーラビリティの要

CCDAK 対策と実務の両立を意識し、Topic/Partition/Replica/Consumer Group の役...

Kafka

CCDAK 試験ガイド:出題範囲・配点・申込み・対策

Confluent Certified Developer for Apache Kafka (CCDAK) の出題範囲...

Kafka

Confluent Certified Administrator (CCAAK) 対策: 出題範囲・配点の考え方・運用観点の要点

CCAAKに向けて、試験領域の押さえどころを運用目線で整理。プロダクションで通用する設定・監視・セキュリティの実践知を、...

Kafka

Kafka の Replica と In-Sync Replicas を正しく設計する: 耐障害性と一貫性

レプリカとISRの仕組みを起点に、acks と min.insync.replicas、クリーン/アンクリーンリーダー選...

Kafka

Kafka の Offset とコミット: ポジション管理と at-least-once の基礎

CCDAK 対策と実務の両立を意識して、Kafka コンシューマのオフセット管理とコミット戦略を整理。at-least-...

Kafkaの記事一覧 (101件)
© 2026 NicheeLab All rights reserved.