A consumer is built around poll: group membership is kept alive by heartbeats, and processing is confirmed via offset commits.
Understanding how the key settings interact — max.poll.interval.ms, session.timeout.ms, heartbeat.interval.ms, enable.auto.commit, and friends — is critical both on the exam and in production.
The application calls KafkaConsumer.poll(Duration) on a regular cadence, and each call returns records that have already been fetched from the broker. The number of records returned per call is capped by max.poll.records, while how much data each fetch brings back is shaped by fetch.min.bytes and fetch.max.wait.ms (consumer settings that the broker honors when it answers a fetch request). poll also drives group management forward: rebalance callbacks run inside it, and it coordinates with the background heartbeat thread.
After subscribe, the consumer joins the group identified by group.id during its first poll calls, and records start flowing once partitions are assigned. Fetched data is buffered and the next fetch is pipelined while the application processes the current batch, so a tight poll loop costs little. The single most important practice for stable operation is to call poll on a short, regular cadence.
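The buffering can be made concrete with a toy, stdlib-only model. The class and method names are hypothetical, and this is not the Kafka client. Per the Kafka documentation, max.poll.records does not change fetching: the consumer caches each fetch response and hands it out incrementally, at most max.poll.records records per poll. The model below mimics only that hand-out step.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model (hypothetical, not the Kafka client): poll() drains a local buffer of
// already-fetched offsets and returns at most maxPollRecords per call.
final class PrefetchBufferModel {
    private final Deque<Long> buffer = new ArrayDeque<>();
    private final int maxPollRecords;
    private int fetchCount = 0;

    PrefetchBufferModel(int maxPollRecords) {
        this.maxPollRecords = maxPollRecords;
    }

    // Simulates one fetch response delivering offsets [from, from + count)
    void fetch(long from, int count) {
        fetchCount++;
        for (long o = from; o < from + count; o++) {
            buffer.addLast(o);
        }
    }

    // Returns up to maxPollRecords buffered offsets; no new fetch is issued here
    List<Long> poll() {
        List<Long> out = new ArrayList<>();
        while (!buffer.isEmpty() && out.size() < maxPollRecords) {
            out.add(buffer.pollFirst());
        }
        return out;
    }

    int fetchCount() {
        return fetchCount;
    }
}
```

Suppose one fetch delivers 500 records and max.poll.records is 200. Successive poll() calls then return 200, 200 and 100 records, and only a single fetch has happened.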
Consumer-to-broker interactions (poll / heartbeat / commit)
Minimal polling loop (Java)
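```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class MinimalPollLoop {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "g1");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 200);

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("orders"));
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> r : records) {
                    // Application processing
                    handle(r);
                }
                // Commit after the batch (sync for low-volume, high-reliability cases)
                consumer.commitSync();
            }
        } finally {
            consumer.close();
        }
    }

    // Placeholder for application logic
    private static void handle(ConsumerRecord<String, String> r) {
        System.out.printf("%s:%d %s%n", r.topic(), r.offset(), r.value());
    }
}
```

Consumers send periodic heartbeats to the group coordinator to keep group membership alive. In the Java client, heartbeats are sent from a background thread. However, if the application thread goes longer than max.poll.interval.ms between poll calls, the consumer leaves the group and a rebalance follows. session.timeout.ms is how long the coordinator waits without receiving a heartbeat before it removes the member. heartbeat.interval.ms is the target interval between heartbeats: it must be lower than session.timeout.ms and is typically set no higher than one third of it.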
A common production failure mode is when long processing or external I/O blocks the application thread: heartbeats keep flowing, but max.poll.interval.ms is exceeded and a rebalance is triggered anyway. Practical mitigations include shrinking the batch, using pause/resume to control the backlog, and bumping max.poll.interval.ms when the workload truly requires it.
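The arithmetic behind shrinking the batch can be checked up front. The helper below is a hypothetical, stdlib-only sketch, not part of any Kafka API. It does two things:

- It flags heartbeat.interval.ms values above one third of session.timeout.ms, the upper bound the Kafka documentation recommends.
- It estimates whether a worst-case batch (max.poll.records × worst-case time per record) fits inside max.poll.interval.ms.

The 80% headroom factor is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sanity checker for consumer timing settings (not a Kafka API).
final class PollBudget {
    // Assumption: keep the worst-case batch under 80% of max.poll.interval.ms
    static final long HEADROOM_PERCENT = 80;

    static List<String> check(long sessionTimeoutMs, long heartbeatIntervalMs,
                              long maxPollIntervalMs, int maxPollRecords,
                              long worstCasePerRecordMs) {
        List<String> problems = new ArrayList<>();
        if (heartbeatIntervalMs >= sessionTimeoutMs) {
            problems.add("heartbeat.interval.ms must be lower than session.timeout.ms");
        } else if (heartbeatIntervalMs * 3 > sessionTimeoutMs) {
            problems.add("heartbeat.interval.ms is above 1/3 of session.timeout.ms");
        }
        long worstBatchMs = maxPollRecords * worstCasePerRecordMs;
        if (worstBatchMs > budgetMs(maxPollIntervalMs)) {
            problems.add("worst-case batch of " + worstBatchMs
                    + " ms may exceed max.poll.interval.ms=" + maxPollIntervalMs);
        }
        return problems;
    }

    // Largest max.poll.records whose worst case still fits in the budget (at least 1)
    static int maxSafeRecords(long maxPollIntervalMs, long worstCasePerRecordMs) {
        if (worstCasePerRecordMs <= 0) {
            throw new IllegalArgumentException("worstCasePerRecordMs must be positive");
        }
        return (int) Math.max(1, budgetMs(maxPollIntervalMs) / worstCasePerRecordMs);
    }

    private static long budgetMs(long maxPollIntervalMs) {
        return maxPollIntervalMs * HEADROOM_PERCENT / 100;
    }
}
```

Consider a record that can block for 90 s. maxSafeRecords(300000, 90000) returns 2, which is why offloading slow calls to workers (with pause/resume) tends to scale better than shrinking the batch alone.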
Key heartbeat-related settings (Java)
```java
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 45000);     // upper bound for failure detection
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 3000);   // target heartbeat interval
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);  // max time between poll calls (tune to workload)
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 200);         // cap records returned per poll
```

A commit is a group-level write into the internal __consumer_offsets topic. With enable.auto.commit=true, the Java consumer commits the position just past the records returned by the previous poll, roughly every auto.commit.interval.ms; the check runs inside poll (and close). With manual commits, the two calls trade off differently:

- commitSync blocks until the commit succeeds, an unrecoverable error occurs, or default.api.timeout.ms expires. This is reliable but adds latency.
- commitAsync returns immediately and does not retry; failures are reported to its callback.

Committing after processing completes is the foundation of at-least-once delivery.
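A toy simulation makes the commit-placement argument concrete. It is stdlib-only and hypothetical, with no Kafka involved:

- A loop processes offsets 0 to 4.
- At a chosen offset it crashes once, between its two steps.
- It then restarts from the last committed offset. As in Kafka, that value is stored as the next offset to read.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of commit placement (no Kafka involved).
final class CommitPlacement {
    enum Mode { COMMIT_AFTER_PROCESSING, COMMIT_BEFORE_PROCESSING }

    // Processes offsets [0, end). At offset crashAt the "consumer" crashes once between its
    // two steps and restarts from the committed offset. Returns every processed offset.
    static List<Long> run(Mode mode, long end, long crashAt) {
        List<Long> processed = new ArrayList<>();
        long committed = 0;   // like Kafka: the committed value is the NEXT offset to read
        long position = 0;
        boolean crashed = false;
        while (position < end) {
            long o = position;
            // Step 1
            if (mode == Mode.COMMIT_BEFORE_PROCESSING) committed = o + 1;
            else processed.add(o);
            // Crash between step 1 and step 2, then restart from the committed offset
            if (!crashed && o == crashAt) {
                crashed = true;
                position = committed;
                continue;
            }
            // Step 2
            if (mode == Mode.COMMIT_BEFORE_PROCESSING) processed.add(o);
            else committed = o + 1;
            position = o + 1;
        }
        return processed;
    }
}
```

The two modes give opposite results:

- run(COMMIT_AFTER_PROCESSING, 5, 2) processes [0, 1, 2, 2, 3, 4]. Offset 2 is processed twice, which is at-least-once.
- run(COMMIT_BEFORE_PROCESSING, 5, 2) processes [0, 1, 3, 4]. Offset 2 is lost, which is at-most-once.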
A widely used field pattern is to call commitAsync at the end of each batch and seal things off with commitSync on shutdown or when partitions are revoked.
| Strategy | Latency / Reliability | Primary Use Case | Caveats |
|---|---|---|---|
| commitSync() | Higher latency / high reliability | Lower-throughput batches where certainty matters | Blocks until the coordinator responds, so latency spikes whenever the coordinator is slow |
| commitAsync() | Low latency / best-effort | High-throughput streaming workloads | Design retries via the callback; watch out for out-of-order commits |
| enable.auto.commit=true | Low latency / simple | Lightweight subscriptions and prototyping | Commits on a timer, not on completion; if records are handed to other threads, offsets can be committed before processing finishes (at-most-once) |
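The out-of-order risk in the commitAsync row comes from retries. Suppose commit A (offset 100) fails after commit B (offset 200) has succeeded. Blindly retrying A moves the group back to 100. One common remedy is a sequence-number guard. The class below is a hypothetical, stdlib-only sketch of that idea, not a Kafka API.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical guard for retrying failed async commits (not a Kafka API).
// Each commit attempt takes a sequence number; a failed commit is retried only
// if no newer commit has been issued since, so an old offset never overwrites a newer one.
final class AsyncCommitGuard {
    private final AtomicLong latestSeq = new AtomicLong();

    // Call right before issuing an async commit; capture the value for its callback
    long nextSeq() {
        return latestSeq.incrementAndGet();
    }

    // Call from the commit callback when that commit failed
    boolean shouldRetry(long seqOfFailedCommit) {
        return seqOfFailedCommit == latestSeq.get();
    }
}
```

In a real loop, nextSeq() would be captured just before each commitAsync call. shouldRetry(seq) would then be checked inside its OffsetCommitCallback before the commit is re-issued.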
Combining sync and async commits (rebalance-aware)
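```java
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;

// Assumes `consumer` is the KafkaConsumer<String, String> from the minimal loop;
// process(...) and `log` (an SLF4J-style logger) are placeholders.
AtomicBoolean running = new AtomicBoolean(true);
consumer.subscribe(Collections.singletonList("orders"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Commit synchronously right before partitions are taken away
        consumer.commitSync();
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) { }
});
try {
    while (running.get()) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(300));
        process(records);
        // Fast, non-retrying commit on the hot path
        consumer.commitAsync((offsets, ex) -> {
            if (ex != null) log.warn("async commit failed", ex);
        });
    }
} catch (WakeupException e) {
    // Shutdown signal: consumer.wakeup() was called from another thread
} finally {
    // Final blocking commit, then leave the group cleanly
    try { consumer.commitSync(); } finally { consumer.close(); }
}
```

On its own (without transactions), a consumer can deliver either at-most-once or at-least-once. Most workloads choose at-least-once by committing after processing. They absorb the resulting duplicates downstream through idempotency: unique-key updates, upserts, or naturally idempotent aggregations.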
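Upserts absorb duplicates because applying the same write twice leaves the same state. The minimal sketch below uses a HashMap to stand in for a table keyed by a hypothetical order ID (stdlib only). It contrasts an idempotent upsert with a plain append when one record is redelivered.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical downstream sink; a HashMap stands in for a table keyed by order ID.
final class IdempotentSink {
    private final Map<String, String> table = new HashMap<>();
    private long appendedRows = 0;

    // Idempotent: an upsert keyed by order ID, so a redelivered record overwrites itself
    void upsert(String orderId, String payload) {
        table.put(orderId, payload);
    }

    // Not idempotent: every delivery adds a row, so redeliveries become duplicates
    void append(String orderId, String payload) {
        appendedRows++;
    }

    int upsertedRows() {
        return table.size();
    }

    long appendedRows() {
        return appendedRows;
    }
}
```

Suppose o-1, o-2, o-2 (redelivered) and o-3 are delivered. The upsert path leaves 3 rows, while the append path leaves 4.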
You may also need to eliminate duplicates in a read-process-write loop that writes its results back to Kafka. In that case, use a transactional producer (which is also idempotent) and commit the output records and the consumed offsets in the same transaction via sendOffsetsToTransaction. This gives exactly-once semantics within Kafka, provided downstream consumers read with isolation.level=read_committed. Side effects on external systems are not covered.
Transactional integration snippet (Java, for reference)
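```java
// Assumes: the producer has transactional.id set; the consumer has enable.auto.commit=false
// (and isolation.level=read_committed if it reads transactional input).
// transform(...) is a placeholder; error handling (abortTransaction, fencing) is omitted.
producer.initTransactions();
while (running.get()) {
    ConsumerRecords<String, String> rs = consumer.poll(Duration.ofMillis(300));
    if (rs.isEmpty()) continue;                      // avoid empty transactions
    producer.beginTransaction();
    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
    for (TopicPartition tp : rs.partitions()) {
        List<ConsumerRecord<String, String>> part = rs.records(tp);
        for (ConsumerRecord<String, String> r : part) {
            producer.send(transform(r));
        }
        // Commit the next offset to read: last processed offset + 1
        offsets.put(tp, new OffsetAndMetadata(part.get(part.size() - 1).offset() + 1));
    }
    // groupMetadata() lets the broker fence zombie instances from older group generations
    producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
    producer.commitTransaction();
}
```

When processing is heavy, keep poll running on the consumer thread and hand work off to worker threads, pausing the affected partitions when their backlog grows. Records may finish out of order, so commit progressively. For each partition, commit one past the highest offset up to which every record has been processed (the contiguous prefix).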
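The per-partition "highest contiguously processed offset" bookkeeping can be sketched with a sorted set per partition. The class is hypothetical and stdlib-only, and partitions are plain strings rather than TopicPartition. In a real consumer:

- committable(p) would be wrapped in new OffsetAndMetadata(...) for the matching TopicPartition.
- forget(p) would be called when the partition is revoked.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeSet;

// Hypothetical per-partition commit tracker (stdlib only; partitions are plain strings).
// Workers may finish offsets out of order; only the contiguous prefix is safe to commit.
final class ContiguousOffsetTracker {
    private final Map<String, Long> nextToCommit = new HashMap<>();
    private final Map<String, TreeSet<Long>> doneAhead = new HashMap<>();

    // Call when a partition is assigned, with the first offset that will be processed
    void start(String partition, long firstOffset) {
        nextToCommit.put(partition, firstOffset);
        doneAhead.put(partition, new TreeSet<>());
    }

    // Mark one offset as processed, then advance over any run that is now contiguous
    void complete(String partition, long offset) {
        TreeSet<Long> ahead = doneAhead.get(partition);
        ahead.add(offset);
        long next = nextToCommit.get(partition);
        while (ahead.remove(next)) {   // remove(...) returns true if that offset was done
            next++;
        }
        nextToCommit.put(partition, next);
    }

    // Offset to commit: one past the highest contiguously processed offset
    long committable(String partition) {
        return nextToCommit.get(partition);
    }

    // Call when the partition is revoked so stale state is not reused
    void forget(String partition) {
        nextToCommit.remove(partition);
        doneAhead.remove(partition);
    }
}
```

For example, start at 100 and complete 101, 103, 100 and 102 in that order. committable then moves through 100, 100, 102 and finally 104: the gaps at 100 and 102 hold the commit back until they are filled.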
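The cooperative-sticky assignor can shorten the stop-the-world pause during a rebalance, because only the partitions that actually move are revoked. Either way, the key design choice is committing synchronously in onPartitionsRevoked, so that completed work is recorded before partitions move to another consumer.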
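Opting in to the cooperative-sticky assignor is a single consumer setting. The fragment below assumes the `props` object from the minimal loop. When switching an existing group away from an eager assignor, the Kafka upgrade notes describe a two-step rolling bounce: first list both assignors, then remove the old one. Treat this fragment as the end state rather than a one-shot change.

```java
// Fragment: `props` is the consumer Properties built earlier.
// ConsumerConfig and CooperativeStickyAssignor live in org.apache.kafka.clients.consumer.
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
        CooperativeStickyAssignor.class.getName());
```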
pause/resume example (Java)
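```java
// inFlightMap, doneQueue, commitMap (a Map<TopicPartition, OffsetAndMetadata>), `log` and
// the helper methods are placeholders for application code.
while (running.get()) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(200));
    Set<TopicPartition> busy = calcBusyPartitions(inFlightMap);
    if (!busy.isEmpty()) consumer.pause(busy);          // stop fetching, keep polling
    processAsync(records, inFlightMap, doneQueue);      // hand off to worker threads
    updateCommitsFrom(doneQueue, commitMap);            // contiguous done offset + 1
    Set<TopicPartition> paused = consumer.paused();
    if (!paused.isEmpty() && backlogLow(paused)) consumer.resume(paused);
    // Commit only completed work; a no-arg commitAsync() would commit the poll
    // position, including records still in flight
    consumer.commitAsync(commitMap, (offsets, ex) -> { if (ex != null) log.warn("commit failed", ex); });
}
```

Key metrics to watch:

- commit latency (commit-latency-avg, commit-latency-max)
- commit-rate
- heartbeat-rate
- rebalance counts
- consumer lag (records-lag-max in the Java client)

The exact names vary by client, but putting these on a dashboard pays off. Also check whether the time between poll calls is growing; in the Java client, look at last-poll-seconds-ago and time-between-poll-max.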
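To catch a slow loop before max.poll.interval.ms is exceeded, the application can also track poll gaps itself. This hypothetical, stdlib-only watchdog mirrors the idea behind last-poll-seconds-ago, and the 80% warning threshold is an assumption. Read nearLimit() from another thread (a health check, for example), since the polling thread is the one that would be stuck.

```java
import java.util.function.LongSupplier;

// Hypothetical watchdog (not a Kafka metric API): remembers when poll() was last called and
// reports when the gap approaches max.poll.interval.ms. The clock is injectable for testing.
final class PollGapWatchdog {
    private final long maxPollIntervalMs;
    private final int warnPercent;        // e.g. 80 = warn at 80% of the limit (assumption)
    private final LongSupplier clockMs;
    private volatile long lastPollMs;     // volatile: written by the poll thread, read elsewhere

    PollGapWatchdog(long maxPollIntervalMs, int warnPercent, LongSupplier clockMs) {
        this.maxPollIntervalMs = maxPollIntervalMs;
        this.warnPercent = warnPercent;
        this.clockMs = clockMs;
        this.lastPollMs = clockMs.getAsLong();
    }

    // Call from the poll loop right before each consumer.poll(...)
    void onPoll() {
        lastPollMs = clockMs.getAsLong();
    }

    long msSinceLastPoll() {
        return clockMs.getAsLong() - lastPollMs;
    }

    // True once the gap reaches warnPercent of max.poll.interval.ms
    boolean nearLimit() {
        return msSinceLastPoll() * 100 >= maxPollIntervalMs * warnPercent;
    }
}
```

In production the clock would be System::currentTimeMillis. A fake clock is what makes the behavior easy to test.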
Common exceptions:

- CommitFailedException: a commit attempted after the consumer lost its assignment in a rebalance.
- WakeupException: raised after wakeup() is called and used for safe shutdown.
- RebalanceInProgressException: a commit attempted while a rebalance is under way.

Treat these as expected errors and handle them cleanly instead of letting them flood the logs.
Safe shutdown (Java)
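```java
// Assumes `consumer` and `running` (AtomicBoolean) from the snippets above;
// process(...) is a placeholder.
final Thread mainThread = Thread.currentThread();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    running.set(false);
    consumer.wakeup();          // thread-safe; makes a blocked poll() throw WakeupException
    try {
        mainThread.join();      // keep the JVM alive until commit and close finish
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}));
try {
    while (running.get()) {
        ConsumerRecords<String, String> rs = consumer.poll(Duration.ofMillis(300));
        process(rs);
    }
} catch (WakeupException e) {
    // Normal stop trigger
} finally {
    try { consumer.commitSync(); } finally { consumer.close(); }
}
```

CCDAK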
Question 1
A high-throughput consumer occasionally blocks for up to 90 seconds on an external API call. Settings are max.poll.interval.ms=300000, session.timeout.ms=45000, heartbeat.interval.ms=3000, and enable.auto.commit=false. Which implementation best preserves at-least-once delivery while avoiding unnecessary rebalances?
Correct answer: A
Using pause/resume to throttle the backlog and committing only after processing completes preserves at-least-once delivery, while poll keeps being called well within max.poll.interval.ms. Enabling auto-commit can commit offsets before processing finishes, slipping into at-most-once. Reducing how often you poll invites a rebalance by exceeding max.poll.interval.ms. Increasing heartbeat frequency alone does nothing to fix application-side poll latency.
Is it safe to use enable.auto.commit in production?
It is useful for prototyping and lightweight use cases, but most production systems should commit explicitly after processing (commitSync/commitAsync). enable.auto.commit commits on a timer rather than on completion. It can therefore advance the offset before processing finishes (for example, when records are handed to other threads), which risks data loss (at-most-once) after a failure.
Are heartbeats independent from poll calls?
The Java consumer sends heartbeats from a background thread, so heartbeats keep flowing while your code processes a batch. They do not replace poll, though. If the application goes longer than max.poll.interval.ms without calling poll, the consumer leaves the group and a rebalance is triggered.
Is commitSync slow? How do you choose between commitSync and commitAsync?
commitSync waits for a response from the coordinator, so latency is higher, but reliability is its strength. The well-balanced pattern is to use commitAsync for most cycles and reserve commitSync for critical boundaries such as shutdown or right before a rebalance.
NicheeLab Editorial Team
NicheeLab editorial team focused on data engineering and cloud certification learning. Content is structured around practical study needs and official exam domains.