Kafka

Kafka Streams 入門: アプリ内ストリーム処理の実践

2026-04-19
NicheeLab編集部

Kafka Streams は、Kafka クラスタの外部に専用の処理基盤を追加せず、アプリケーションのプロセス内でストリーム処理を実現するライブラリです。Kafka の Consumer/Producer と同じ信頼性を土台に、DSL と状態管理を備え、集計・結合・ウィンドウ処理を簡潔に記述できます。

本稿では、KStream/KTable などのコア概念、トポロジ設計、状態と障害復旧、時間セマンティクス、運用の勘所を整理し、CCDAK(Confluent Certified Developer for Apache Kafka)で問われやすいポイントも明示します。

Kafka Streams とは: 位置づけと適用領域

Kafka Streams は Apache Kafka の公式ライブラリで、アプリ内でストリーム処理を行うための軽量ランタイムです。外部の分散処理クラスターを不要とし、各インスタンスがタスクとしてスケールします。Kafka のトピックを入力および出力とし、オフセット管理、再試行、フェイルオーバーを Kafka の仕組みと連携して提供します。

CCDAK の観点では、Kafka Streams と Consumer/Producer の自前実装、ksqlDB の違いを明確にし、どのユースケースで何を選ぶべきかを説明できることが重要です。

  • ライブラリ実行型: アプリに依存関係を追加して埋め込む
  • スケール: パーティション数に応じてタスクを分散
  • 状態管理: ローカルステート + チェンジログで耐障害性を確保
  • 処理保証: 少なくとも 1 回、厳密に 1 回相当の構成が可能(Exactly-Once 処理保証はトランザクションと冪等プロデューサに依存)
  • DSL と Processor API: 宣言的にグラフを構築、あるいは低レベルで制御
観点Kafka StreamsConsumer/Producer 自前ksqlDB
モデルアプリ内ライブラリ + DSL/状態低レベル API を自作サーバサイド SQL-Like
実行形態各アプリプロセスがタスクを実行アプリ任せ(設計次第)専用サーバ上で実行
状態管理ローカルストア + チェンジログ自前で実装・外部DB併用が多いサーバが管理
障害復旧タスクが別インスタンスへ移動しステート復元自前実装の責務サーバ側で処理再開
学習/運用コスト中程度(コードで柔軟)実装負荷高い学習容易・運用はサーバ管理

Kafka Streams の実行モデル(アプリ内タスクと状態の流れ)

assign by group.idassign by group.idKafka Clusterinput-topic [P0][P1][P2] / output-topicStreams App Instance ATask T0 (P0) / Task T1 (P1) / State(RDB)ChangelogRocksDB writesStreams App Instance BTask T2 (P2) / State(RDB)Kafka Streams の実行モデル

最小の Kafka Streams アプリ(Java)

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.Stores;
import java.time.Duration;
import java.util.Properties;

public class WordCountApp {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-app");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    // Exactly-once 対応の設定(サポートされる方式を指定)
    props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);

    StreamsBuilder builder = new StreamsBuilder();

    KStream<String, String> textLines = builder.stream("text-input");

    KTable<Windowed<String>, Long> counts = textLines
        .flatMapValues(value -> java.util.Arrays.asList(value.toLowerCase().split("\\W+")))
        .selectKey((k, word) -> word)
        .groupByKey()
        .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1)))
        .count(Materialized.as("word-counts-store"));

    counts.toStream()
        .map((windowedKey, count) -> new KeyValue<>(windowedKey.key(), count.toString()))
        .to("wordcount-output", Produced.with(Serdes.String(), Serdes.String()));

    KafkaStreams streams = new KafkaStreams(builder.build(), props);
    streams.start();

    Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
  }
}

コア概念: KStream/KTable/GlobalKTable と Serdes

KStream は無限レコード列を表すストリームです。各レコードは独立で、フィルタ、マップ、集計前のグループ化などを行います。KTable はキーごとの最新値のテーブル表現で、チェンジログから再構築可能です。GlobalKTable は全パーティションを各インスタンスに全複製し、読み取り中心の参照結合に向きます。

シリアライズ/デシリアライズ(Serdes)は、トピック入出力と状態ストアの型安全性を支えます。CCDAK では、キー/バリュー Serde の設定漏れや、Join 時の Serde 誤設定によるランタイムエラーが典型的な出題ポイントです。

  • KStream: イベント逐次処理、集計前の grouping、Join のストリーム側
  • KTable: 最終結果表現、アップサート的更新、テーブル結合の相手
  • GlobalKTable: 参照データの全複製、ストリームとの外部キー結合
  • Serde: トピック/状態ストア/中間状態の型を合致させる

KStream-KTable Join の最小例(Java)

StreamsBuilder b = new StreamsBuilder();
KStream<String, String> orders = b.stream("orders");
KTable<String, String> users = b.table("users");
KStream<String, String> enriched = orders.join(
  users,
  (order, user) -> user + ":" + order
);
enriched.to("orders-enriched");

トポロジ設計とリパーティションの勘所

トポロジはソース、プロセッサ、シンクの有向グラフです。Kafka Streams DSL は、ビルダー上で演算をつなげると内部でトポロジが構築されます。グループ化や結合にはキーの整合が必須で、必要に応じてリパーティション・トピックが自動生成されます。

CCDAK では、keyBy/selectKey の有無でリパーティションが起きるか、結合の共通キーが整っているか、同一トピック内で null キーがどう扱われるか、などが問われがちです。

  • groupByKey/selectKey 後の集計はキーに基づくため、パーティション境界が重要
  • 異なるキーでの join は自動リパーティションが発生しうる(追加 I/O と遅延に注意)
  • null キーは多くの演算で除外される。join/aggregate 前に扱いを決める
  • 中間トピック名は明示的に Named や Materialized#withName で制御可能

リパーティションを伴う設計の例(Java)

KStream<String, Event> s = builder.stream("raw-events");
// userId をキーに再割当(selectKey によりリパーティションが必要になりうる)
KStream<String, Event> byUser = s.selectKey((k, v) -> v.userId());
KTable<String, Long> counts = byUser.groupByKey()
  .count(Materialized.as("by-user-counts"));

状態管理とフォールトトレランス

Kafka Streams の状態ストアは、デフォルトで RocksDB(ローカル)に保持され、更新は対応するチェンジログ・トピックへ書き込まれます。障害時は、同じアプリケーショングループ内の別インスタンスにタスクが移り、チェンジログから状態を復元します。必要に応じてスタンバイタスク(standby task)でウォームアップも可能です。

Exactly-Once の処理保証は、冪等プロデューサとトランザクションにより、読み取り・処理・書き込みを単一トランザクションでコミットすることで実現されます。状態更新と出力の原子的コミットにより、重複や欠落を避けます。

  • Materialized.as で名前を付けると、状態ストアとチェンジログ名が決まる
  • スタンバイレプリカ数はトポロジごとに調整可能(復元時間とコストのトレードオフ)
  • 復元は Kafka からのリプレイ。外部 DB 依存を減らすと一貫性を保ちやすい
  • EOS 有効時はプロデューサのトランザクションタイムアウトやバッファに留意

ローカル状態ストアの明示と読み取り(Processor API 併用)

StreamsBuilder builder = new StreamsBuilder();
var storeBuilder = Stores.keyValueStoreBuilder(
  Stores.persistentKeyValueStore("kv-store"),
  Serdes.String(), Serdes.Long()
);
builder.addStateStore(storeBuilder);
KStream<String, Long> s = builder.stream("metrics");
s.process(() -> new org.apache.kafka.streams.processor.api.Processor<>() {
  private org.apache.kafka.streams.state.KeyValueStore<String, Long> store;
  @Override public void init(org.apache.kafka.streams.processor.api.ProcessorContext<String, Long> ctx){
    store = ctx.getStateStore("kv-store");
  }
  @Override public void process(org.apache.kafka.streams.processor.api.Record<String, Long> r){
    Long cur = store.get(r.key());
    long next = (cur == null ? 0L : cur) + r.value();
    store.put(r.key(), next);
  }
}, "kv-store");

時間セマンティクスとウィンドウ処理

Kafka Streams は主にイベント時刻(レコードタイム)を用います。タイムスタンプはデフォルトでレコードのタイムスタンプを採用し、カスタム TimestampExtractor で制御できます。ウィンドウ処理は tumbling、hopping、session などに対応します。

遅延イベントの扱いはウィンドウ定義時の grace(許容遅延)で制御します。集計結果の最終化や下流への出力タイミングは、必要に応じて Suppress 演算で一時的に抑制する設計が有効です。

  • TimeWindows(固定長)、SessionWindows(アイドル間隔でバケット化)に対応
  • TimestampExtractor でイベント時刻を抽出。未設定時はデフォルトのレコード時刻
  • grace でウィンドウの遅延許容を調整(大きくするとメモリ/状態増)
  • Suppress を使うとウィンドウ確定まで中間更新を抑制可能

ウィンドウ集計と Suppress の例(Java)

KStream<String, Long> metrics = builder.stream("latency");
KTable<Windowed<String>, Long> agg = metrics
  .groupByKey()
  .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofSeconds(30)))
  .reduce(Long::sum, Materialized.as("latency-sum"));
KStream<String, Long> finalized = agg
  .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
  .toStream()
  .map((wKey, v) -> new KeyValue<>(wKey.key(), v));
finalized.to("latency-30s");

運用の勘所と CCDAK 対策ポイント

スケーリングはトピックのパーティション数に依存します。最大タスク数は入力トピックのパーティション数に概ね制約されます。バージョン互換の維持、Serde の進化(スキーマ互換性)、リバランス時の停止時間短縮(静的メンバーシップなど)に注意します。

CCDAK では、処理保証(at-least-once と exactly-once の違い)、状態ストアとチェンジログの関係、結合とリパーティションの発生条件、timestamp/grace の効果、スケール戦略(パーティション設計)が頻出です。

  • アプリケーションIDは状態と内部トピック名に影響。変更は慎重に
  • 内部トピックの保存ポリシー/レプリケーション係数は運用 SLO に合わせて調整
  • スタンバイタスクで復元短縮、ただしブローカー負荷とストレージ増に留意
  • モニタリングは処理遅延(end-to-end lag と stream-time 差)を指標に

運用向け主要プロパティ例(Java・設定抜粋)

Properties p = new Properties();
p.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3); // 同一プロセス内の並列度
p.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3); // 内部トピックのRF
p.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 50 * 1024 * 1024L); // キャッシュ
p.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2); // 処理保証
p.put(StreamsConfig.STATE_DIR_CONFIG, "/var/lib/streams"); // 状態ディレクトリ
// 必要に応じて consumer/producer のタイムアウトやリトライも調整

問題で確認

CCDAK

問題 1

Kafka Streams アプリで、KStream と KTable をキー join し、その後にウィンドウ集計を行う設計を検討している。リパーティション発生の有無と、遅延イベントの扱いについて正しい説明はどれか。

  1. KStream 側で join キーを selectKey しておらず、KTable のキーと一致していない場合、自動でリパーティションが発生し得る。遅延イベントはウィンドウの grace を設定することで許容範囲内なら集計に反映できる。
  2. KStream-KTable の join では常にリパーティションは発生しない。遅延イベントは Suppress 設定に関係なく必ず破棄される。
  3. KTable 側のキーは無視され、KStream の元のキーで常に join される。遅延イベントは自動で最新ウィンドウに繰り上げられる。
  4. リパーティションは明示的に repartition() を呼ばない限り発生しない。遅延イベントはタイムスタンプ抽出に依存しない。

正解: A

KStream-KTable join はキー一致が前提で、KStream 側のキーが一致しない場合には内部でリパーティションが発生し得ます。遅延イベントはウィンドウ定義時の grace により許容範囲内なら集計へ取り込めます。Suppress は出力タイミングの制御であり、遅延イベントの受理/拒否そのものは grace とウィンドウ境界で決まります。

よくある質問

Kafka Streams と ksqlDB はどう使い分けるべきですか?

コードで細かな制御やアプリ固有のライブラリ統合が必要なら Kafka Streams、SQL で高速に開発し運用をサーバ側に寄せたい場合は ksqlDB が適します。チームのスキルセット、運用形態、レイテンシ/スループット要件で選びます。

Exactly-Once を有効化するとパフォーマンスは落ちますか?

トランザクション管理のオーバーヘッドにより、at-least-once より遅延やスループットが悪化する場合があります。必要なフローに限定、バッチサイズやトランザクション時間の調整、中間トピックの最適化で緩和します。

状態ストアは必ず RocksDB ですか?

デフォルトは RocksDB ですが、インメモリストアも選択できます。ただし耐障害性やメモリ使用量の観点から、永続ストア(RocksDB)とチェンジログの組み合わせが一般的です。

この記事で学んだ内容を問題で確認しましょう

16,000問以上の問題で実力チェック

無料で問題を解いてみる
この記事の著者

NicheeLab編集部

データエンジニアリング・クラウド資格の専門家。Databricks・Snowflake等の認定資格を保有し、実務経験に基づいた問題作成・解説を行っています。NicheeLab運営。


関連記事
Kafka

Kafka Topic と Partition の基礎: 分散とスケーラビリティの要

CCDAK 対策と実務の両立を意識し、Topic/Partition/Replica/Consumer Group の役...

Kafka

CCDAK 試験ガイド:出題範囲・配点・申込み・対策

Confluent Certified Developer for Apache Kafka (CCDAK) の出題範囲...

Kafka

Confluent Certified Administrator (CCAAK) 対策: 出題範囲・配点の考え方・運用観点の要点

CCAAKに向けて、試験領域の押さえどころを運用目線で整理。プロダクションで通用する設定・監視・セキュリティの実践知を、...

Kafka

Kafka の Replica と In-Sync Replicas を正しく設計する: 耐障害性と一貫性

レプリカとISRの仕組みを起点に、acks と min.insync.replicas、クリーン/アンクリーンリーダー選...

Kafka

Kafka の Offset とコミット: ポジション管理と at-least-once の基礎

CCDAK 対策と実務の両立を意識して、Kafka コンシューマのオフセット管理とコミット戦略を整理。at-least-...

Kafkaの記事一覧 (101件)
© 2026 NicheeLab All rights reserved.