KStream represents a sequence of events (an append-only log), while KTable represents the latest state per key (a changelog). Understanding the difference removes the guesswork from joins, aggregations, and topic design.
On CCDAK, common topics include choosing between KStream and KTable, join time semantics, topic compaction, and tombstones (null values).
KStream is an abstraction that processes each record arriving on a topic as an independent event, in offset order within each partition. That makes it suitable for stream processing (map, filter, branch, windowed aggregation, etc.): a KStream is a sequence of facts that happened.
KTable is a table abstraction that holds the latest value per key, materializing the current view from a changelog (stream of updates). KTable inputs work best from log-compacted topics. A null value is treated as a tombstone and deletes the row for that key.
GlobalKTable is a read-only table whose full contents (all partitions) are replicated to every application instance, enabling joins with a KStream on non-key fields. Watch the size, since every instance holds all of the data.
| Abstraction | What it represents | Main uses |
|---|---|---|
| KStream | A sequence of events (history) | Filter, map, windowed aggregation, stream-stream joins |
| KTable | Latest state per key (current view) | Upserts, holding aggregation results, lookup side of stream-table joins |
| GlobalKTable | A read-only table replicated to every node | Lookup joins on non-key fields (KStream-GlobalKTable) |
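The difference between the two views can be sketched in plain Java. This is only a model of the semantics, not the Kafka Streams API; the class `StreamVsTableModel` and its `Rec` type are hypothetical names introduced for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model (not the Kafka Streams API): the same records read
// as an event stream versus as a table of latest values per key.
final class StreamVsTableModel {

    // Hypothetical record type used only for this illustration.
    static final class Rec {
        final String key;
        final String value; // null models a tombstone

        Rec(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    // Stream view: every record is kept, in arrival order.
    static List<Rec> asStream(List<Rec> log) {
        return new ArrayList<>(log);
    }

    // Table view: later values overwrite earlier ones; null deletes the key.
    static Map<String, String> asTable(List<Rec> log) {
        Map<String, String> table = new LinkedHashMap<>();
        for (Rec r : log) {
            if (r.value == null) {
                table.remove(r.key); // tombstone removes the row
            } else {
                table.put(r.key, r.value); // upsert
            }
        }
        return table;
    }
}
```

Feeding the records `(u1,a)`, `(u2,b)`, `(u1,c)`, `(u2,null)` through both methods leaves four entries in the stream view but only `u1=c` in the table view.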
Conceptual positioning of KStream and KTable
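Basic KStream/KTable assembly (Java, Kafka Streams DSL)

```java
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-kstream-ktable");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> events = builder.stream("events");
// A compacted topic is the recommended source for a KTable.
KTable<String, String> profiles = builder.table("user-profiles");
// Left join: events without a matching profile pass through unchanged.
KStream<String, String> enriched = events.leftJoin(
    profiles,
    (eventVal, profileVal) -> profileVal == null ? eventVal : eventVal + "|" + profileVal
);
enriched.to("events-enriched");

KafkaStreams streams = new KafkaStreams(builder.build(), props);
Runtime.getRuntime().addShutdownHook(new Thread(streams::close)); // close cleanly on exit
streams.start();
```

KStream/KTable behavior depends on keys and partition alignment. Joins and aggregations assume the same key lives on the same partition. A key-changing operation such as selectKey, map, or groupBy marks the stream for repartitioning, and Kafka Streams creates an internal repartition topic when a key-based operation (join or aggregation) follows.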
To keep a KTable accurate as a latest view, set cleanup.policy=compact on the input topic and keep keys stable (immutable). Records with null values become tombstones and delete the corresponding key.
Serdes (serializer/deserializer) directly affect data compatibility. Configure default Serdes but pass explicit Serdes for operations that handle different types.
| Design item | KStream perspective | KTable perspective | Exam focus |
|---|---|---|---|
| Keys and partitions | Assign correct keys at produce time. Repartition after selectKey if needed | Keep the latest value for the same key on the same partition | Joins require key alignment. GlobalKTable is the exception with full replication |
| Topic policy | Usually delete/time-based retention. Accumulates history | compact, or compact,delete for the latest view plus some history | Understand compact for KTable. null = delete |
| Serdes | Default Serdes plus per-operation Serdes | Watch for null-aware Serdes (Avro/JSON Schema nullability) | Serdes mismatches are a classic exam pitfall |
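Why key alignment depends on both the key and the partition count can be sketched in plain Java. This is a simplified model: Kafka's default partitioner hashes the serialized key with murmur2, while `Arrays.hashCode` stands in for it here, and `CoPartitioningModel` is a hypothetical name.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Plain-Java model of key-based partitioning (not Kafka's actual partitioner).
final class CoPartitioningModel {

    // Maps a key to a partition number in [0, numPartitions).
    // Assumption: Arrays.hashCode replaces Kafka's murmur2 hash for brevity.
    static int partitionFor(String key, int numPartitions) {
        int hash = Arrays.hashCode(key.getBytes(StandardCharsets.UTF_8));
        return Math.floorMod(hash, numPartitions);
    }

    // A key can be joined partition-by-partition only if it maps to the same
    // partition number in both topics, which requires equal partition counts
    // and the same partitioning function on both sides.
    static boolean sameTask(String key, int partitionsA, int partitionsB) {
        return partitionsA == partitionsB
                && partitionFor(key, partitionsA) == partitionFor(key, partitionsB);
    }
}
```

With three partitions on both topics, `sameTask("user-42", 3, 3)` is true; if one topic had four partitions, the same key would no longer be guaranteed to meet its counterpart, which is the case where repartitioning is needed.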
Partition alignment and task assignment

```text
Topic A (3 partitions)            Topic B (3 partitions)
P0 -----> Task 0 <----- P0
P1 -----> Task 1 <----- P1
P2 -----> Task 2 <----- P2
```

The same key produces the same hash, lands on the same partition number, and is therefore processed by the same task, where it can be joined.

Topic creation (compaction) and key shaping example
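```java
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Repartitioned;

// Create the compacted topic that backs the KTable.
// Replication factor 3 needs at least three brokers; lower it for a local single-broker setup.
try (Admin admin = Admin.create(Map.of("bootstrap.servers", "localhost:9092"))) {
    NewTopic compacted = new NewTopic("user-profiles", 3, (short) 3)
        .configs(Map.of(
            "cleanup.policy", "compact",
            "min.cleanable.dirty.ratio", "0.1",
            "segment.ms", "604800000" // example: 7 days
        ));
    admin.createTopics(List.of(compacted)).all().get(); // waits for creation; throws checked exceptions
}

StreamsBuilder b = new StreamsBuilder();
KStream<String, String> raw = b.stream("pageviews-raw");
// extractUserId is the application's own helper (not shown here).
KStream<String, String> keyed = raw.selectKey((k, v) -> extractUserId(v));
// Changing the key marks the stream for repartitioning. repartition() makes this explicit
// and replaces through(), which was deprecated in Kafka 2.6.
KStream<String, String> repartitioned = keyed.repartition(Repartitioned.as("pageviews-by-user"));
```

KStream supports per-record transformations (map, filter, branch) plus windowed aggregations. The result of a windowed aggregation is expressed as a KTable keyed by Windowed<K> (the latest value per key and window). Time semantics are event-time by default, and late arrivals can be absorbed within the grace period.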
KTable suits non-windowed, per-key aggregation: each update emits the revised result downstream (update mode). KTable-KTable joins and table aggregations are re-evaluated whenever either input table changes.
| Operation | Allowed on KStream | Allowed on KTable | Window/state |
|---|---|---|---|
| map/filter/branch | Yes | Partly: filter and mapValues only; use toStream() first for map/branch | Stateless |
| groupByKey + windowed aggregation | Yes (result is a KTable) | No | Window + state store |
| groupBy/aggregate (non-windowed) | Yes (result is a KTable) | Yes (recomputed to the latest view) | State store |
| suppress (emit after finalization) | Only on the KTable returned by an aggregation | Yes (suppress is a KTable operation) | Buffer + boundary finalization |
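How a record is assigned to a tumbling window, and when a late record is still accepted, can be sketched with a little arithmetic. This is a plain-Java model under the assumption that windows are aligned to the epoch and that a window closes once stream time reaches window end plus grace; `TumblingWindowModel` is a hypothetical name.

```java
// Plain-Java model of tumbling-window assignment and the grace period
// (not the Kafka Streams API).
final class TumblingWindowModel {

    // Start of the epoch-aligned tumbling window that contains the timestamp.
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - Math.floorMod(timestampMs, sizeMs);
    }

    // A record is accepted while stream time has not yet reached
    // the end of its window plus the grace period.
    static boolean accepted(long recordTsMs, long streamTimeMs, long sizeMs, long graceMs) {
        long windowEnd = windowStart(recordTsMs, sizeMs) + sizeMs;
        return streamTimeMs < windowEnd + graceMs;
    }
}
```

With one-minute windows, a record stamped 59,000 ms that arrives when stream time is already 61,000 ms is dropped with no grace, but accepted with a 5-second grace period.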
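Window timeline diagram (Tumbling example)

```text
time --->
|----[W1]----|----[W2]----|----[W3]----|
```

Events are aggregated into the window they belong to, and the result is finalized after the window boundary (taking grace into account).

Event-time-based windowed aggregation (Tumbling)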
```java
import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.*;

StreamsBuilder builder = new StreamsBuilder();
KStream<String, Long> events = builder.stream("clicks", Consumed.with(Serdes.String(), Serdes.Long()));
Duration size = Duration.ofMinutes(1);
TimeWindows windows = TimeWindows.ofSizeWithNoGrace(size); // example: no grace, late records are dropped
KTable<Windowed<String>, Long> counts = events
    .groupByKey()
    .windowedBy(windows)
    .count();
// Pass the window size to the serde; the overload without it is deprecated.
counts.toStream().to("clicks-per-minute",
    Produced.with(WindowedSerdes.timeWindowedSerdeFrom(String.class, size.toMillis()), Serdes.Long()));
```

KStream-KTable is a lookup-style join that combines a stream record with the latest KTable value at processing time. No time window is needed, and later KTable updates do not retroactively change past join results.
KStream-KStream requires a window and joins records with the same key when they appear within the window. Both streams are sensitive to arrival order and lateness.
KTable-KTable is a join between two tables. Whenever the value for a key in either table updates, the join result is recomputed and the delta flows downstream.
| Join type | Time requirement | Output nature |
|---|---|---|
| KStream-KTable | No window (lookup at processing time) | KStream (join result records flow through) |
| KStream-KStream | Window required (event time) | KStream (emits on matches inside the window) |
| KTable-KTable | No window (update-driven) | KTable (delta updates propagate) |
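The lookup-at-processing-time behavior of a KStream-KTable join can be modeled in plain Java. This sketch is not the Kafka Streams API; `StreamTableJoinModel` and `Input` are hypothetical names, and inputs are processed strictly in list order.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of a stream-table left join: table updates only change
// state, while stream records emit output using the table value seen so far.
final class StreamTableJoinModel {

    static final class Input {
        final boolean tableUpdate;
        final String key;
        final String value;

        private Input(boolean tableUpdate, String key, String value) {
            this.tableUpdate = tableUpdate;
            this.key = key;
            this.value = value;
        }

        static Input table(String key, String value) {
            return new Input(true, key, value);
        }

        static Input stream(String key, String value) {
            return new Input(false, key, value);
        }
    }

    static List<String> leftJoin(List<Input> inputs) {
        Map<String, String> table = new HashMap<>();
        List<String> out = new ArrayList<>();
        for (Input in : inputs) {
            if (in.tableUpdate) {
                if (in.value == null) {
                    table.remove(in.key); // tombstone
                } else {
                    table.put(in.key, in.value);
                }
            } else {
                String profile = table.get(in.key); // lookup at processing time
                out.add(profile == null ? in.value : in.value + "|" + profile);
            }
        }
        return out;
    }
}
```

A stream record processed before the first table update is emitted without enrichment, and that output stays as it is even after the table later changes.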
Intuitive picture of the three join types

```text
KStream --lookup--> KTable      => latest value at lookup time
KStream <==window==> KStream    => joined within the time window
KTable  <==update==> KTable     => recomputed on every update
```

Examples of KStream-KTable and KStream-KStream
```java
import java.time.Duration;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

StreamsBuilder b = new StreamsBuilder();
KStream<String, String> pageviews = b.stream("pageviews");
KTable<String, String> users = b.table("user-profiles");
// Lookup-style join (no window needed)
KStream<String, String> enriched = pageviews.leftJoin(users, (pv, u) -> u == null ? pv : pv + "|user=" + u);
enriched.to("pageviews-enriched");

// Stream-stream join (window required)
KStream<String, String> clicks = b.stream("clicks");
JoinWindows jw = JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofSeconds(30));
KStream<String, String> joined = pageviews.join(
    clicks,
    (pv, cl) -> pv + "&" + cl,
    jw
);
joined.to("pv-clicks-joined");
```

Kafka Streams holds aggregation and join state in local state stores (RocksDB by default), and every change is also written to an internal changelog topic. On failure, state is rebuilt from the changelog, and enabling standby replicas can shorten failover time.
Naming the store explicitly when materializing prepares you for interactive queries and external cache integration. How often results are emitted depends on the record cache size and commit.interval.ms. Transactions plus the idempotent producer keep state updates and output consistent (so-called exactly-once processing semantics).
| Store type | Use case | Persistence/recovery point | Notes |
|---|---|---|---|
| KeyValueStore | Aggregation results and KTable materialization | Changelog topic | Most common |
| WindowStore | Intermediate and final results of windowed aggregation | Changelog plus segmentation | Keyed by Windowed<Key> |
| SessionStore | Session window aggregation | Changelog | Manages start/end times internally |
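The relationship between a local store and its changelog can be modeled in plain Java. This is a sketch of the idea only: an in-memory list stands in for the internal changelog topic, and `ChangelogStoreModel` and `Change` are hypothetical names.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of a counting state store backed by a changelog
// (not the Kafka Streams API).
final class ChangelogStoreModel {

    static final class Change {
        final String key;
        final Long value; // null would model a delete

        Change(String key, Long value) {
            this.key = key;
            this.value = value;
        }
    }

    private final Map<String, Long> store = new HashMap<>();
    private final List<Change> changelog; // stands in for the changelog topic

    ChangelogStoreModel(List<Change> changelog) {
        this.changelog = changelog;
    }

    // Every local update is also appended to the changelog.
    void increment(String key) {
        long next = store.getOrDefault(key, 0L) + 1;
        store.put(key, next);
        changelog.add(new Change(key, next));
    }

    Long get(String key) {
        return store.get(key);
    }

    // After a failure, a fresh instance replays the changelog to rebuild state.
    static ChangelogStoreModel restore(List<Change> changelog) {
        ChangelogStoreModel restored = new ChangelogStoreModel(changelog);
        for (Change c : changelog) {
            if (c.value == null) {
                restored.store.remove(c.key);
            } else {
                restored.store.put(c.key, c.value);
            }
        }
        return restored;
    }
}
```

Replaying the changelog yields the same counts the original instance held, which is also why an uncompacted changelog makes restoration slow: every intermediate value has to be replayed.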
Relationship between state store and changelog
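Example: configuring materialization and EOS (mind environment compatibility)

```java
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.KeyValueStore;

Properties props = new Properties();
// Older environments may need the deprecated EXACTLY_ONCE setting instead.
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10485760); // 10 MB record cache (example)

StreamsBuilder b = new StreamsBuilder();
KTable<String, Long> counts = b.stream("events", Consumed.with(Serdes.String(), Serdes.String()))
    .groupByKey()
    .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("event-counts") // named store
        .withKeySerde(Serdes.String())
        .withValueSerde(Serdes.Long())
    );
// The value type is now Long, so pass explicit Serdes for the output topic.
counts.toStream().to("events-counts", Produced.with(Serdes.String(), Serdes.Long()));
```

CCDAK tests you on the roles of KStream/KTable, time semantics per join type, topic compaction, the meaning of tombstones, conditions that trigger repartition, and state management and processing guarantee settings. The same areas commonly cause failures, overload, and inconsistencies in production, so clarify them at design time.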
In particular, null values in a KTable mean delete, so be careful that schema evolution or deserialization failures do not accidentally produce nulls. Caching delays downstream output, so disabling cache temporarily for tests can help observability.
| Pitfall | Symptom | Mitigation / what to check |
|---|---|---|
| Topic is not compacted | KTable state grows huge; restart restoration takes a long time | Set cleanup.policy=compact (or compact,delete as needed) |
| Joining with misaligned keys | Low or zero join hit rate | Align with selectKey; match partition count and hashing |
| Unintended null values | KTable rows get deleted | Mark fields nullable in the schema; monitor for deserialization failures |
| Misunderstanding cache effects | Results appear not to flow downstream | For tests, set cache.max.bytes.buffering=0 (CACHE_MAX_BYTES_BUFFERING_CONFIG) and shorten commit.interval.ms |
| Unplanned repartitioning | Internal topics multiply, throughput drops | Fix key design up front; use selectKey/groupBy only when necessary |
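The cache pitfall can be illustrated with a plain-Java model. It assumes a simplified cache that holds only the latest value per key and flushes on commit (the real cache also flushes when it fills up); `RecordCacheModel` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of how a record cache delays and collapses downstream
// output (not the Kafka Streams API).
final class RecordCacheModel {

    private final boolean cachingEnabled;
    private final Map<String, Long> dirty = new LinkedHashMap<>();
    private final List<String> downstream = new ArrayList<>();

    RecordCacheModel(boolean cachingEnabled) {
        this.cachingEnabled = cachingEnabled;
    }

    // With caching, updates for the same key overwrite each other in the cache.
    // Without caching, every update is forwarded immediately.
    void update(String key, long value) {
        if (cachingEnabled) {
            dirty.put(key, value);
        } else {
            downstream.add(key + "=" + value);
        }
    }

    // A commit flushes the latest value per key downstream.
    void commit() {
        for (Map.Entry<String, Long> e : dirty.entrySet()) {
            downstream.add(e.getKey() + "=" + e.getValue());
        }
        dirty.clear();
    }

    List<String> downstream() {
        return new ArrayList<>(downstream);
    }
}
```

With caching on, three updates to the same key produce nothing until the commit and then a single record; with caching off, all three appear immediately, which is why disabling the cache makes tests easier to observe.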
Flow of deletion via tombstones (null)
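Emitting tombstones and boosting observability (for tests)

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.*;

Properties p = new Properties();
p.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); // flush immediately during tests

StreamsBuilder b = new StreamsBuilder();
KStream<String, String> updates = b.stream("user-updates");
// When the delete condition holds, emit null: a tombstone on the compacted topic.
// shouldDelete is the application's own predicate (not shown here).
updates
    .mapValues((v) -> shouldDelete(v) ? null : v)
    .to("user-profiles"); // cleanup.policy=compact recommended

// On the KTable side, detect nulls and forward them for monitoring.
KTable<String, String> profiles = b.table("user-profiles");
profiles.toStream()
    .filter((k, v) -> v == null)
    .mapValues(v -> "deleted")
    .to("audit-deletes");
```

CCDAK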
Question 1
You want to enrich a stream of page views with the latest user profile information. Profiles are upserted by userId, with deletes expressed as null values (tombstones). Which implementation is appropriate?
Correct answer: A
Lookup-style enrichment fits KStream-KTable, which looks up the latest KTable view at processing time. Profiles express upserts and deletes (null), so a compacted topic is the natural fit. KStream-KStream needs a window and does not match lookup-join requirements. KTable-KTable recomputes on table changes, which is not a good fit for event-driven enrichment.
What is the difference between KTable and GlobalKTable, and when should you use GlobalKTable?
KTable is a partitioned latest-value view that assumes key alignment (co-partitioning) for joins. GlobalKTable is a read-only table fully replicated to every application instance, so the join key can be derived from any field of the KStream record. It works well when the data is small and lookups are frequent, but watch the local storage and network cost of replicating everything.
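Deriving the join key from a field of the stream value can be modeled in plain Java. This is a sketch of the idea, not the Kafka Streams API; `GlobalTableJoinModel`, the `order:product` value format, and the sample data are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Plain-Java model of a lookup against a fully replicated table, where the
// lookup key is extracted from the stream value rather than the record key.
final class GlobalTableJoinModel {

    // Inner join: records without a match in the table are dropped.
    static List<String> join(List<String> streamValues,
                             Function<String, String> keySelector,
                             Map<String, String> globalTable) {
        List<String> out = new ArrayList<>();
        for (String value : streamValues) {
            String match = globalTable.get(keySelector.apply(value));
            if (match != null) {
                out.add(value + "|" + match);
            }
        }
        return out;
    }
}
```

Because every instance holds the whole table, no repartitioning by the extracted key is needed; the price is that each instance stores all of the data.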
What happens when a KTable receives a null value?
A null value is a tombstone, and the row for that key is deleted. On a compacted topic, compaction removes the older values for the key and keeps the tombstone itself for delete.retention.ms before purging it as well. To avoid unintended deletes, explicitly mark fields as nullable in the schema and monitor for deserialization failures.
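What compaction does to tombstones can be sketched in plain Java. This is a deliberately simplified model: it measures tombstone age from the record timestamp, whereas the broker's actual bookkeeping is more involved, and `CompactionModel` and `Rec` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of log compaction with tombstone retention
// (not the broker's actual log cleaner).
final class CompactionModel {

    static final class Rec {
        final String key;
        final String value; // null models a tombstone
        final long timestampMs;

        Rec(String key, String value, long timestampMs) {
            this.key = key;
            this.value = value;
            this.timestampMs = timestampMs;
        }
    }

    // Keeps only the latest record per key, then drops tombstones that are
    // older than the retention period.
    static List<Rec> compact(List<Rec> log, long nowMs, long deleteRetentionMs) {
        Map<String, Rec> latest = new LinkedHashMap<>();
        for (Rec r : log) {
            latest.remove(r.key); // re-insert so order follows the last occurrence
            latest.put(r.key, r);
        }
        List<Rec> out = new ArrayList<>();
        for (Rec r : latest.values()) {
            boolean expiredTombstone =
                    r.value == null && nowMs - r.timestampMs > deleteRetentionMs;
            if (!expiredTombstone) {
                out.add(r);
            }
        }
        return out;
    }
}
```

Shortly after a delete, the compacted log still carries the tombstone so that consumers can observe the deletion; once the retention period has passed, the key disappears entirely.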
How does exactly-once processing semantics relate to KStream/KTable?
State updates (KTable materialization or aggregations) and writes to output topics are committed in a single transaction to keep them consistent. The guarantee is configured via processing.guarantee and relies on Kafka transactions and the idempotent producer; exactly_once_v2 needs brokers at version 2.5 or newer. For the exam, understand the concept and the requirement of atomic commit of state and output.
NicheeLab Editorial Team
NicheeLab editorial team focused on data engineering and cloud certification learning. Content is structured around practical study needs and official exam domains.