Kafka Streams は、Kafka クラスタの外部に専用の処理基盤を追加せず、アプリケーションのプロセス内でストリーム処理を実現するライブラリです。Kafka の Consumer/Producer と同じ信頼性を土台に、DSL と状態管理を備え、集計・結合・ウィンドウ処理を簡潔に記述できます。
本稿では、KStream/KTable などのコア概念、トポロジ設計、状態と障害復旧、時間セマンティクス、運用の勘所を整理し、CCDAK(Confluent Certified Developer for Apache Kafka)で問われやすいポイントも明示します。
Kafka Streams は Apache Kafka の公式ライブラリで、アプリ内でストリーム処理を行うための軽量ランタイムです。外部の分散処理クラスターを不要とし、各インスタンスがタスクとしてスケールします。Kafka のトピックを入力および出力とし、オフセット管理、再試行、フェイルオーバーを Kafka の仕組みと連携して提供します。
CCDAK の観点では、Kafka Streams と Consumer/Producer の自前実装、ksqlDB の違いを明確にし、どのユースケースで何を選ぶべきかを説明できることが重要です。
| 観点 | Kafka Streams | Consumer/Producer 自前 | ksqlDB |
|---|---|---|---|
| モデル | アプリ内ライブラリ + DSL/状態 | 低レベル API を自作 | サーバサイド SQL-Like |
| 実行形態 | 各アプリプロセスがタスクを実行 | アプリ任せ(設計次第) | 専用サーバ上で実行 |
| 状態管理 | ローカルストア + チェンジログ | 自前で実装・外部DB併用が多い | サーバが管理 |
| 障害復旧 | タスクが別インスタンスへ移動しステート復元 | 自前実装の責務 | サーバ側で処理再開 |
| 学習/運用コスト | 中程度(コードで柔軟) | 実装負荷高い | 学習容易・運用はサーバ管理 |
Kafka Streams の実行モデル(アプリ内タスクと状態の流れ)
最小の Kafka Streams アプリ(Java)
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.Stores;
import java.time.Duration;
import java.util.Properties;
public class WordCountApp {
public static void main(String[] args) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
// Exactly-once 対応の設定(サポートされる方式を指定)
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> textLines = builder.stream("text-input");
KTable<Windowed<String>, Long> counts = textLines
.flatMapValues(value -> java.util.Arrays.asList(value.toLowerCase().split("\\W+")))
.selectKey((k, word) -> word)
.groupByKey()
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1)))
.count(Materialized.as("word-counts-store"));
counts.toStream()
.map((windowedKey, count) -> new KeyValue<>(windowedKey.key(), count.toString()))
.to("wordcount-output", Produced.with(Serdes.String(), Serdes.String()));
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
}
KStream は無限レコード列を表すストリームです。各レコードは独立で、フィルタ、マップ、集計前のグループ化などを行います。KTable はキーごとの最新値のテーブル表現で、チェンジログから再構築可能です。GlobalKTable は全パーティションを各インスタンスに全複製し、読み取り中心の参照結合に向きます。
シリアライズ/デシリアライズ(Serdes)は、トピック入出力と状態ストアの型安全性を支えます。CCDAK では、キー/バリュー Serde の設定漏れや、Join 時の Serde 誤設定によるランタイムエラーが典型的な出題ポイントです。
KStream-KTable Join の最小例(Java)
StreamsBuilder b = new StreamsBuilder();
KStream<String, String> orders = b.stream("orders");
KTable<String, String> users = b.table("users");
KStream<String, String> enriched = orders.join(
users,
(order, user) -> user + ":" + order
);
enriched.to("orders-enriched");トポロジはソース、プロセッサ、シンクの有向グラフです。Kafka Streams DSL は、ビルダー上で演算をつなげると内部でトポロジが構築されます。グループ化や結合にはキーの整合が必須で、必要に応じてリパーティション・トピックが自動生成されます。
CCDAK では、keyBy/selectKey の有無でリパーティションが起きるか、結合の共通キーが整っているか、同一トピック内で null キーがどう扱われるか、などが問われがちです。
リパーティションを伴う設計の例(Java)
KStream<String, Event> s = builder.stream("raw-events");
// userId をキーに再割当(selectKey によりリパーティションが必要になりうる)
KStream<String, Event> byUser = s.selectKey((k, v) -> v.userId());
KTable<String, Long> counts = byUser.groupByKey()
.count(Materialized.as("by-user-counts"));Kafka Streams の状態ストアは、デフォルトで RocksDB(ローカル)に保持され、更新は対応するチェンジログ・トピックへ書き込まれます。障害時は、同じアプリケーショングループ内の別インスタンスにタスクが移り、チェンジログから状態を復元します。必要に応じてスタンバイタスク(standby task)でウォームアップも可能です。
Exactly-Once の処理保証は、冪等プロデューサとトランザクションにより、読み取り・処理・書き込みを単一トランザクションでコミットすることで実現されます。状態更新と出力の原子的コミットにより、重複や欠落を避けます。
ローカル状態ストアの明示と読み取り(Processor API 併用)
StreamsBuilder builder = new StreamsBuilder();
var storeBuilder = Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("kv-store"),
Serdes.String(), Serdes.Long()
);
builder.addStateStore(storeBuilder);
KStream<String, Long> s = builder.stream("metrics");
s.process(() -> new org.apache.kafka.streams.processor.api.Processor<>() {
private org.apache.kafka.streams.state.KeyValueStore<String, Long> store;
@Override public void init(org.apache.kafka.streams.processor.api.ProcessorContext<String, Long> ctx){
store = ctx.getStateStore("kv-store");
}
@Override public void process(org.apache.kafka.streams.processor.api.Record<String, Long> r){
Long cur = store.get(r.key());
long next = (cur == null ? 0L : cur) + r.value();
store.put(r.key(), next);
}
}, "kv-store");Kafka Streams は主にイベント時刻(レコードタイム)を用います。タイムスタンプはデフォルトでレコードのタイムスタンプを採用し、カスタム TimestampExtractor で制御できます。ウィンドウ処理は tumbling、hopping、session などに対応します。
遅延イベントの扱いはウィンドウ定義時の grace(許容遅延)で制御します。集計結果の最終化や下流への出力タイミングは、必要に応じて Suppress 演算で一時的に抑制する設計が有効です。
ウィンドウ集計と Suppress の例(Java)
KStream<String, Long> metrics = builder.stream("latency");
KTable<Windowed<String>, Long> agg = metrics
.groupByKey()
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofSeconds(30)))
.reduce(Long::sum, Materialized.as("latency-sum"));
KStream<String, Long> finalized = agg
.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
.toStream()
.map((wKey, v) -> new KeyValue<>(wKey.key(), v));
finalized.to("latency-30s");スケーリングはトピックのパーティション数に依存します。最大タスク数は入力トピックのパーティション数に概ね制約されます。バージョン互換の維持、Serde の進化(スキーマ互換性)、リバランス時の停止時間短縮(静的メンバーシップなど)に注意します。
CCDAK では、処理保証(at-least-once と exactly-once の違い)、状態ストアとチェンジログの関係、結合とリパーティションの発生条件、timestamp/grace の効果、スケール戦略(パーティション設計)が頻出です。
運用向け主要プロパティ例(Java・設定抜粋)
Properties p = new Properties();
p.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3); // 同一プロセス内の並列度
p.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3); // 内部トピックのRF
p.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 50 * 1024 * 1024L); // キャッシュ
p.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2); // 処理保証
p.put(StreamsConfig.STATE_DIR_CONFIG, "/var/lib/streams"); // 状態ディレクトリ
// 必要に応じて consumer/producer のタイムアウトやリトライも調整CCDAK
問題 1
Kafka Streams アプリで、KStream と KTable をキー join し、その後にウィンドウ集計を行う設計を検討している。リパーティション発生の有無と、遅延イベントの扱いについて正しい説明はどれか。
正解: A
KStream-KTable join はキー一致が前提で、KStream 側のキーが一致しない場合には内部でリパーティションが発生し得ます。遅延イベントはウィンドウ定義時の grace により許容範囲内なら集計へ取り込めます。Suppress は出力タイミングの制御であり、遅延イベントの受理/拒否そのものは grace とウィンドウ境界で決まります。
Kafka Streams と ksqlDB はどう使い分けるべきですか?
コードで細かな制御やアプリ固有のライブラリ統合が必要なら Kafka Streams、SQL で高速に開発し運用をサーバ側に寄せたい場合は ksqlDB が適します。チームのスキルセット、運用形態、レイテンシ/スループット要件で選びます。
Exactly-Once を有効化するとパフォーマンスは落ちますか?
トランザクション管理のオーバーヘッドにより、at-least-once より遅延やスループットが悪化する場合があります。必要なフローに限定、バッチサイズやトランザクション時間の調整、中間トピックの最適化で緩和します。
状態ストアは必ず RocksDB ですか?
デフォルトは RocksDB ですが、インメモリストアも選択できます。ただし耐障害性やメモリ使用量の観点から、永続ストア(RocksDB)とチェンジログの組み合わせが一般的です。
NicheeLab編集部
データエンジニアリング・クラウド資格の専門家。Databricks・Snowflake等の認定資格を保有し、実務経験に基づいた問題作成・解説を行っています。NicheeLab運営。
Kafka Topic と Partition の基礎: 分散とスケーラビリティの要
CCDAK 対策と実務の両立を意識し、Topic/Partition/Replica/Consumer Group の役...
CCDAK 試験ガイド:出題範囲・配点・申込み・対策
Confluent Certified Developer for Apache Kafka (CCDAK) の出題範囲...
Confluent Certified Administrator (CCAAK) 対策: 出題範囲・配点の考え方・運用観点の要点
CCAAKに向けて、試験領域の押さえどころを運用目線で整理。プロダクションで通用する設定・監視・セキュリティの実践知を、...
Kafka の Replica と In-Sync Replicas を正しく設計する: 耐障害性と一貫性
レプリカとISRの仕組みを起点に、acks と min.insync.replicas、クリーン/アンクリーンリーダー選...
Kafka の Offset とコミット: ポジション管理と at-least-once の基礎
CCDAK 対策と実務の両立を意識して、Kafka コンシューマのオフセット管理とコミット戦略を整理。at-least-...