Kafka

KStream と KTable: ストリームとテーブルの抽象を正しく使い分ける

2026-04-19
NicheeLab編集部

KStream はイベントの連続(追記ログ)、KTable はキーごとの最新状態(チェンジログ)を表します。両者の違いを理解すると、ジョインや集約、トピック設計で迷いません。

CCDAK では、KStream/KTable の選択、ジョインの時間セマンティクス、トピックのコンパクション、トゥームストーン(null 値)などが頻出論点です。

KStream と KTable の基礎

KStream は、トピックに到着する各レコードをそのまま時間順に扱う抽象です。各レコードは独立しており、ストリーム処理(map、filter、branch、windowed aggregation など)に向きます。KStream は「事実のイベント列」を表現します。

KTable は、キーごとの最新の値を持つテーブルの抽象で、チェンジログ(更新の流れ)から最新像をマテリアライズします。KTable の入力にはログコンパクション対象のトピックが適しています。null 値はトゥームストーンとして扱われ、そのキーの行を削除します。

GlobalKTable は全パーティションを各タスクに複製する読み取り専用のテーブルで、KStream とのキー以外での結合に使えます(全ノードに全データを展開するためサイズに注意)。

  • KStream: 追記ログ(append-only)、各レコードは不変のイベント
  • KTable: キーの最新値、アップサートと削除(null)で表現
  • トピックの cleanup.policy=compact は KTable の正確な最新像に有効
  • KTable の更新は同じキーのレコードを上書きし、下流には変更のみを伝播
抽象表すもの主な用途
KStreamイベントの連続(履歴)フィルタ、マップ、ウィンドウ集約、ストリーム-ストリーム結合
KTableキーの最新状態(現在形)アップサート、集約結果の保持、ストリーム-テーブル結合の参照側
GlobalKTable全ノードに複製された読み取りテーブルキー以外での参照結合(KStream-GlobalKTable)

KStream と KTable の概念的な位置づけ

集約/集計コンパクションTopic (ログ)append-onlyKStreamKTablecompacted topicchangelogGlobalKTable: 全パーティションを各タスクに複製

KStream/KTable の基本的な組み立て(Java, Kafka Streams DSL)

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-kstream-ktable");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> events = builder.stream("events");
// KTable のソースは通常 compacted topic を推奨
KTable<String, String> profiles = builder.table("user-profiles");

KStream<String, String> enriched = events.leftJoin(
    profiles,
    (eventVal, profileVal) -> profileVal == null ? eventVal : eventVal + "|" + profileVal
);

enriched.to("events-enriched");

KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();

ソース、シリアライズ、パーティション設計

KStream/KTable の正確な振る舞いはキーとパーティション整合性に依存します。ジョインや集約では、同一キーが同一パーティションに存在することが前提です。selectKey や groupBy を行うと、内部リパーティション・トピックが自動作成されます。

KTable を正しく最新像として保つには、入力トピックの cleanup.policy=compact を設定し、キーは安定(不変)であることが望まれます。値が null のレコードはトゥームストーンとなり、そのキーが削除されます。

Serdes(シリアライザ/デシリアライザ)はデータの互換性に直結します。デフォルト Serdes を設定しつつ、異なる型を扱う演算では明示的に Serdes を渡してください。

  • join/aggregate ではキー整合(同一ハッシュ -> 同一パーティション)が必須
  • selectKey/groupBy は内部リパーティションを生成(コストとスループットに影響)
  • KTable の入力には compacted topic を推奨(最新像が得やすく、ストレージ効率も良い)
  • null 値は削除の意味(トゥームストーン)。スキーマ進化時は null の扱いに注意
設計項目KStream の考え方KTable の考え方試験の着眼点
キーとパーティションイベント送出時に正しいキー付与。必要なら selectKey 後に再パーティション同一キーの最新値を同一パーティションで維持ジョインはキー整合が前提。GlobalKTable は例外的に全体複製
トピック方針通常は delete/時間保持。履歴を蓄積compact または compact,delete で最新像+一定履歴KTable で compact を理解。null=delete の意味
Serdesデフォルト Serdes と演算ごとの Serdes 指定null 値対応の Serde 注意(Avro/JSON Schema での null 許容)Serdes の不一致は試験の落とし穴

パーティション整合とタスク配置

Topic A (3 partitions)          Topic B (3 partitions)
P0 -----> Task 0 <----- P0       P0 -----> Task 0 <----- P0
P1 -----> Task 1 <----- P1       P1 -----> Task 1 <----- P1
P2 -----> Task 2 <----- P2       P2 -----> Task 2 <----- P2

同じキーは同じハッシュ -> 同じパーティション -> 同じ Task でジョイン可能

トピック作成(コンパクション)とキー整形の例

AdminClient admin = AdminClient.create(Map.of("bootstrap.servers", "localhost:9092"));
NewTopic compacted = new NewTopic("user-profiles", 3, (short) 3)
    .configs(Map.of(
        "cleanup.policy", "compact",
        "min.cleanable.dirty.ratio", "0.1",
        "segment.ms", "604800000" // 例: 7日
    ));
admin.createTopics(List.of(compacted));

StreamsBuilder b = new StreamsBuilder();
KStream<String, String> raw = b.stream("pageviews-raw");
KStream<String, String> keyed = raw.selectKey((k, v) -> extractUserId(v));
// キー変更 -> 内部リパーティション(throughで明示も可能)
KStream<String, String> repartitioned = keyed.through("pageviews-by-user");

変換、集約、ウィンドウの基本

KStream はレコード単位の変換(map、filter、branch)に加え、ウィンドウ集約が可能です。ウィンドウ集約の結果は KTable(ウィンドウ付きの最新値)として表現されます。時間セマンティクスはイベント時刻を基本とし、遅延到着は許容期間(grace)内で取り込めます。

KTable は非ウィンドウのキー集約に向き、各更新で下流へ再計算の差分を流します(update 設定)。KTable 同士の結合・集約は、テーブルの変更イベントをトリガに再評価されます。

  • KStream の windowedBy は Tumbling/Hopping/Sliding/Sessions を選択可能
  • 遅延到着は grace で制御。閉じたウィンドウは更新されない
  • KTable の集約は常に最新像(非ウィンドウ)。再計算の差分が流れる
  • ウィンドウ集約の出力は KTable(Windowed<K> キー)として表現
操作KStream での可否KTable での可否ウィンドウ/状態
map/filter/branch不可(テーブルは更新駆動)非状態
groupByKey + windowed aggregation可(結果は KTable)不可ウィンドウ+状態ストア
groupBy/aggregate(非ウィンドウ)可(結果は KTable)可(再計算で最新像)状態ストア
suppress(確定後に出力)可(集約後)可(集約後)バッファ+境界確定

ウィンドウ時間線図(Tumbling の例)

time --->
|----[W1]----|----[W2]----|----[W3]----|
イベントは所属ウィンドウに集約され、ウィンドウ境界後に確定(grace を考慮)

イベント時刻ベースのウィンドウ集約(Tumbling)

StreamsBuilder builder = new StreamsBuilder();
KStream<String, Long> events = builder.stream("clicks", Consumed.with(Serdes.String(), Serdes.Long()));
TimeWindows windows = TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1)); // 例: grace なし
KTable<Windowed<String>, Long> counts = events
    .groupByKey()
    .windowedBy(windows)
    .count();

counts.toStream().to("clicks-per-minute", Produced.with(WindowedSerdes.timeWindowedSerdeFrom(String.class), Serdes.Long()));

ジョインの選択肢とセマンティクス

KStream-KTable は参照(ルックアップ)型の結合で、ストリーム側レコードの処理時点における KTable の最新値と結合します。時間ウィンドウは不要です。KTable の後続更新は過去の結合結果を遡及しません。

KStream-KStream はウィンドウ必須の結合で、同一キーのレコードがウィンドウ内に現れたときに結合します。左右どちらのストリームにも到着順序や遅延の影響を受けます。

KTable-KTable はテーブル同士の結合で、どちらかのテーブルでキーの値が更新されると結合結果が再計算され、差分が下流に流れます。

  • KStream-KTable: ウィンドウ不要、処理時点の最新像で参照
  • KStream-KStream: ウィンドウ必須、遅延・順序に敏感
  • KTable-KTable: 双方の更新イベントで再評価、差分伝播
  • leftJoin/rightJoin/outerJoin の可否と null の扱いを確認(特に CCDAK)
結合タイプ時間要件出力の性質
KStream-KTableウィンドウ不要(処理時点の参照)KStream(結合結果レコードが流れる)
KStream-KStreamウィンドウ必須(イベント時刻)KStream(ウィンドウ内マッチで出力)
KTable-KTableウィンドウ不要(更新駆動)KTable(差分更新が伝播)

3 種のジョインの直感図

KStream --lookup--> KTable   =>  参照時点の最新値
KStream <==window==> KStream =>  時間窓内で結合
KTable  <==update==> KTable  =>  更新のたびに再計算

KStream-KTable と KStream-KStream の例

StreamsBuilder b = new StreamsBuilder();
KStream<String, String> pageviews = b.stream("pageviews");
KTable<String, String> users = b.table("user-profiles");

// 参照型(ウィンドウ不要)
KStream<String, String> enriched = pageviews.leftJoin(users, (pv, u) -> u == null ? pv : pv + "|user=" + u);
enriched.to("pageviews-enriched");

// ストリーム同士の結合(ウィンドウ必須)
KStream<String, String> clicks = b.stream("clicks");
JoinWindows jw = JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofSeconds(30));
KStream<String, String> joined = pageviews.join(
    clicks,
    (pv, cl) -> pv + "&" + cl,
    jw
);
joined.to("pv-clicks-joined");

状態管理とマテリアライズ

Kafka Streams は状態ストア(既定では RocksDB ローカル)に集約・結合などの状態を保持し、変更は内部チェンジログ・トピックに複製されます。障害時にはチェンジログから再構築され、スタンバイ・レプリカを有効化するとフェイルオーバ時間を短縮できます。

マテリアライズでストア名を明示すると、インタラクティブクエリや外部キャッシュ統合にも備えられます。発行頻度はキャッシュ設定と commit.interval に影響されます。トランザクションと冪等プロデューサにより、状態更新と出力の一貫性(いわゆる exactly-once 処理セマンティクス)が提供されます。

  • 状態ストアはローカルに保持、チェンジログに複製して復元可能
  • Materialized.as でストア名・Serdes・キャッシュを指定
  • スタンバイレプリカでより速い復旧(コストとトレードオフ)
  • processing.guarantee の設定で整合性レベルを選択(環境互換性に合わせる)
ストア種別用途永続化/復元ポイント補足
KeyValueStore集約結果や KTable のマテリアライズチェンジログ・トピック最も一般的
WindowStoreウィンドウ集約の中間・結果チェンジログ+セグメント化Windowed<Key> をキーに持つ
SessionStoreセッションウィンドウの集約チェンジログ開始・終了時刻を内部管理

状態ストアとチェンジログの関係

変更を複製TaskRocksDB State StorelocalChangelog TopicStandby Replica Node再起動時は Changelog から復元

マテリアライズと EOS(環境互換に注意)の設定例

Properties props = new Properties();
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2); // 環境により EXACTLY_ONCE を選択
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10485760); // キャッシュ 10MB(例)

StreamsBuilder b = new StreamsBuilder();
KTable<String, Long> counts = b.stream("events", Consumed.with(Serdes.String(), Serdes.String()))
    .groupByKey()
    .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("event-counts")
        .withKeySerde(Serdes.String())
        .withValueSerde(Serdes.Long())
    );

counts.toStream().to("events-counts");

試験対策チェックポイントと実務の落とし穴

CCDAK では、KStream/KTable の役割、結合ごとの時間セマンティクス、トピックのコンパクション、トゥームストーンの意味、リパーティションの発生条件、状態管理と処理保証の設定が問われます。実務でも同じ点で障害・過負荷・不整合が起きやすいため、設計時に明確化しておきます。

特に KTable での null 値は削除を意味するため、スキーマ進化やデシリアライズ失敗で意図せず null を生成しないように注意してください。キャッシュが有効だと下流への出力が遅延するため、テストでは一時的にキャッシュを無効化して観測性を高める手もあります。

  • KStream-KTable はウィンドウ不要、処理時点の参照。KStream-KStream はウィンドウ必須
  • KTable 入力には compacted topic を推奨。null は削除(トゥームストーン)
  • selectKey/groupBy 後は内部リパーティションが発生しうる
  • キャッシュと commit.interval が下流出力頻度に影響
  • processing.guarantee を環境に合わせて設定(EOS 利用時はブローカ設定も要件)
  • GlobalKTable はサイズ注意(全ノードに複製)
落とし穴症状回避策/確認ポイント
トピックが非コンパクションKTable の状態が膨張・再起動で復元長時間cleanup.policy=compact(必要に応じて compact,delete)を設定
キー不整合のままジョイン結合ヒット率が低い/ゼロselectKey で整合、パーティション数とハッシュを合わせる
意図しない null 値KTable の行が削除されるスキーマで null 許容を明示、デシリアライズ失敗の監視
キャッシュの影響を誤解下流に結果が出ないように見えるテスト時は CACHE_MAX_BYTES=0、commit.interval を短縮
無計画な再パーティション内部トピック増加・スループット低下キー設計を事前に固定、必要最小限に selectKey/groupBy

トゥームストーン(null)による削除の流れ

Producerkey=K, value=nullcompacted topicKTable行Kが削除されるProducer が key=K, value=null を送信 → KTable の行Kが削除される

トゥームストーンの送出と観測性向上(テスト用)

Properties p = new Properties();
p.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); // テスト時は即時フラッシュ

StreamsBuilder b = new StreamsBuilder();
KStream<String, String> updates = b.stream("user-updates");
// 削除条件を満たすと null を出力 -> compacted topic でトゥームストーン
updates
  .mapValues((v) -> shouldDelete(v) ? null : v)
  .to("user-profiles"); // cleanup.policy=compact 推奨

// KTable 側で null を検知して監視に送る
KTable<String, String> profiles = b.table("user-profiles");
profiles.toStream()
  .filter((k, v) -> v == null)
  .mapValues(v -> "deleted")
  .to("audit-deletes");

問題で確認

CCDAK

問題 1

ページビューのストリームに、ユーザープロファイルの最新情報を付与したい。プロファイルは userId キーでアップサートされ、削除は null 値(トゥームストーン)で表現される。適切な実装はどれか。

  1. プロファイルを compacted topic から KTable として読み、ページビューストリームと KStream-KTable の leftJoin を行う
  2. プロファイルを KStream として読み、KStream-KStream のウィンドウ結合で 24 時間ウィンドウを指定する
  3. プロファイルを GlobalKTable として読み、KTable-KTable の結合で付与する
  4. プロファイルを KTable として読み、ページビューを KTable に変換して KTable-KTable の結合を行う

正解: A

参照型の付与は KStream-KTable が適切で、処理時点の KTable 最新像をルックアップできる。プロファイルはアップサートと削除(null)を表現するため compacted topic が望ましい。KStream-KStream はウィンドウが必要で参照結合の要件に合わない。KTable-KTable はテーブル変更で再計算されるため、イベント駆動の付与には不向き。

よくある質問

KTable と GlobalKTable の違いは?いつ GlobalKTable を使うべき?

KTable はパーティション分割された最新像で、結合にはキー整合が前提。GlobalKTable は全パーティションを各タスクに複製する読み取り専用テーブルで、KStream 側のフィールドを任意に参照キーにマッピングして結合できる。データ量が小さく、頻繁な参照を行いたい場合に有効(ただしメモリ/ネットワークコストに注意)。

KTable で null 値を受け取ると何が起きる?

null 値はトゥームストーンであり、そのキーに対応する行が削除される。コンパクションにより履歴は圧縮され、最終的に削除情報のみが保持または除去される。意図しない削除を避けるため、スキーマで null 許容を明示し、デシリアライズ失敗の監視を行うこと。

exactly-once 処理セマンティクスは KStream/KTable にどう関係する?

状態更新(KTable のマテリアライズや集約)と出力トピックへの書き込みを単一トランザクションでコミットし、一貫性を保つ。処理保証は processing.guarantee で設定し、ブローカー側の冪等/トランザクション機能が有効であることが前提。試験では概念と要件(状態・出力の原子的コミット)を理解しておく。

この記事で学んだ内容を問題で確認しましょう

16,000問以上の問題で実力チェック

無料で問題を解いてみる
この記事の著者

NicheeLab編集部

データエンジニアリング・クラウド資格の専門家。Databricks・Snowflake等の認定資格を保有し、実務経験に基づいた問題作成・解説を行っています。NicheeLab運営。


関連記事
Kafka

Kafka Topic と Partition の基礎: 分散とスケーラビリティの要

CCDAK 対策と実務の両立を意識し、Topic/Partition/Replica/Consumer Group の役...

Kafka

CCDAK 試験ガイド:出題範囲・配点・申込み・対策

Confluent Certified Developer for Apache Kafka (CCDAK) の出題範囲...

Kafka

Confluent Certified Administrator (CCAAK) 対策: 出題範囲・配点の考え方・運用観点の要点

CCAAKに向けて、試験領域の押さえどころを運用目線で整理。プロダクションで通用する設定・監視・セキュリティの実践知を、...

Kafka

Kafka の Replica と In-Sync Replicas を正しく設計する: 耐障害性と一貫性

レプリカとISRの仕組みを起点に、acks と min.insync.replicas、クリーン/アンクリーンリーダー選...

Kafka

Kafka の Offset とコミット: ポジション管理と at-least-once の基礎

CCDAK 対策と実務の両立を意識して、Kafka コンシューマのオフセット管理とコミット戦略を整理。at-least-...

Kafkaの記事一覧 (101件)
© 2026 NicheeLab All rights reserved.