Kafkaで“失敗”は珍しくありません。ネットワーク揺らぎ、スキーマ不一致、下流の一時停止など、原因は多様です。本稿では公式ドキュメントで安定している概念を土台に、リトライ、DLQ、補償処理をどう設計・運用するかを整理します。
CCDAK対策として、配信保証、トランザクション(EOS)、オフセットコミット、パーティション順序性の要点も同時に押さえます。
まずは失敗を分類します。短時間で解消する一時的エラー(ネットワーク断、スロットリング)と、データ自体が原因の恒久的エラー(スキーマ不一致、検証NG、想定外フォーマット)で対処は異なります。
次に、ビジネス上のエラーと技術的エラーを分けます。技術的エラーはリトライやバックオフで回復可能なことが多く、ビジネスエラーはDLQや補償処理で明示的に扱うのが基本です。
Kafkaはメッセージ順序をパーティション単位で提供します。エラーハンドリングは順序性への影響を常に評価し、必要に応じて同一キーを同一パーティションに送る、in-flightを制限する、リトライトピックも同じパーティション数とパーティショナーを使うといった設計を行います。
Kafkaのプロデューサリトライはブローカーやネットワークの一時的失敗に有効です。enable.idempotence=trueと組み合わせれば、再送による重複生成をクライアント側で防げます。順序性を重視する場合はmax.in.flight.requests.per.connectionを1に制限するのが定石です。
コンシューマ側の再試行はアプリケーション責務です。インラインでリトライすると、その間パーティションがブロックされてスループットが落ちがちです。実運用では、N段のリトライトピック(retry-1, retry-2, ...)を用い、段階的にバックオフを伸ばすパターンが安定します。最終的に処理不能なレコードはDLQへ送ります。Kafka自体は“遅延キュー”をネイティブ提供していないため、遅延は専用のリトライトピックと消費間隔の制御で表現します。
| パターン | 適用対象 | 主目的 | 長所 |
|---|---|---|---|
| プロデューサリトライ | 送信失敗(技術的) | 配信成功率の向上 | idempotenceと併用で重複最小化 |
| コンシューマインラインリトライ | 処理失敗(技術/一部ビジネス) | 即時の再試行 | 実装が容易 |
| リトライトピック | 処理失敗(主に一時的) | 段階的バックオフと切り分け | 本流のスループット維持 |
| DLQ | 処理不能(恒久的) | 損傷レコードの隔離 | 本流を詰まらせない |
| 補償処理 | ビジネス不整合 | 後追い是正 | 複数サービス間の整合性回復 |
リトライトピックとDLQの流れ(キー順序維持)
DLQはKafkaの組み込み機能ではなく、通常のトピックで実装します。命名は<元トピック>.DLQや<ドメイン>.dead-letterなど一目で分かる規約にします。リテンションは本流より長めに設定し、監視と再処理ツール(手動/バッチ/専用UI)を用意します。圧縮は通常deleteポリシーで良く、キー削除の意味づけがない限りcompactionは避けます。
順序性を保つには、DLQやリトライトピックのパーティション数・パーティショナーを元トピックと揃えます。ヘッダーに元メタデータとエラー情報を付与し、再処理時のコンテキストを復元可能にします。個人情報や機密はヘッダー/値に不用意に載せないようにし、監査とアクセス制御を設計します。
JavaクライアントによるDLQ/リトライトピック送信例(シンプル版)
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.*;
public class RetryAndDlqExample {
static String MAIN = "orders";
static String RETRY_PREFIX = MAIN + ".retry."; // retry.1, retry.2 など
static String DLQ = MAIN + ".DLQ";
public static void main(String[] args) {
Properties cp = new Properties();
cp.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
cp.put(ConsumerConfig.GROUP_ID_CONFIG, "orders-worker");
cp.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(cp, new ByteArrayDeserializer(), new ByteArrayDeserializer());
consumer.subscribe(Collections.singletonList(MAIN));
Properties pp = new Properties();
pp.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
pp.put(ProducerConfig.ACKS_CONFIG, "all");
pp.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
pp.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
pp.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "120000");
KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(pp, new ByteArraySerializer(), new ByteArraySerializer());
while (true) {
ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<byte[], byte[]> r : records) {
try {
// ここで業務処理(例:検証/下流呼び出し/DB upsert)
process(r);
// 正常時のみオフセットを明示コミット
consumer.commitSync(Collections.singletonMap(new TopicPartition(r.topic(), r.partition()), new OffsetAndMetadata(r.offset() + 1)));
} catch (Exception e) {
int attempt = headerInt(r, "x-attempt", 0) + 1;
String nextTopic = attempt <= 2 ? RETRY_PREFIX + attempt : DLQ;
ProducerRecord<byte[], byte[]> pr = new ProducerRecord<>(nextTopic, r.key(), r.value());
pr.headers().add(new RecordHeader("x-original-topic", r.topic().getBytes(StandardCharsets.UTF_8)));
pr.headers().add(new RecordHeader("x-original-partition", Integer.toString(r.partition()).getBytes(StandardCharsets.UTF_8)));
pr.headers().add(new RecordHeader("x-original-offset", Long.toString(r.offset()).getBytes(StandardCharsets.UTF_8)));
pr.headers().add(new RecordHeader("x-exception-class", e.getClass().getName().getBytes(StandardCharsets.UTF_8)));
pr.headers().add(new RecordHeader("x-ex-message", Optional.ofNullable(e.getMessage()).orElse("").getBytes(StandardCharsets.UTF_8)));
pr.headers().add(new RecordHeader("x-attempt", Integer.toString(attempt).getBytes(StandardCharsets.UTF_8)));
producer.send(pr).get(); // シンプルに同期送信
// 失敗レコードのオフセットは“処理済み”として進め、本流を詰まらせない
consumer.commitSync(Collections.singletonMap(new TopicPartition(r.topic(), r.partition()), new OffsetAndMetadata(r.offset() + 1)));
}
}
}
}
static void process(ConsumerRecord<byte[], byte[]> r) {
// ダミー:例外を投げるかもしれない業務処理
}
static int headerInt(ConsumerRecord<byte[], byte[]> r, String key, int dflt) {
var h = r.headers().lastHeader(key);
if (h == null) return dflt;
try { return Integer.parseInt(new String(h.value(), StandardCharsets.UTF_8)); } catch (Exception e) { return dflt; }
}
}
補償処理は、ビジネス不整合を後から打ち消すステップ(取り消し/反転)を設計するものです。分散トランザクションを使わずに整合性を保つSAGAパターンでは、各ステップが成功/補償イベントを非同期で発行します。
DB更新とイベント発行の原子性にはアウトボックスパターンが有効です。アプリはDBに業務データとアウトボックス行を同一トランザクションで書き込み、CDCコネクタがその行をKafkaへ確実に流します。補償が必要になったら、補償イベントを新たに発行し下流に反映させます。Kafkaのトランザクション(EOS)はKafka内の読み書き原子性を提供しますが、外部システムとの全体原子性は提供しない点に注意します。
リトライとDLQは“流量の健全性”で監視します。突然のDLQスパイクやリトライトピック滞留は本流の劣化サインです。メトリクスはプロデューサ/コンシューマのクライアントメトリクスに加え、トピックごとのレイテンシや滞留も見ると原因切り分けが速くなります。
アラートは“長時間の滞留”“DLQ比率上昇”“再試行段の増大”などトレンド系に寄せ、即時断を避けるために段階通知にします。再処理ジョブは冪等・スロットリング可能にして、再投入で本流を圧迫しないようにします。
CCDAKでは配信保証(at-most-once/at-least-once/exactly-once)の違い、idempotent producer、トランザクションの制約(1つのトランザクションで複数パーティションへの書き込みとオフセットコミットは可能、外部システムとの原子性は保証しない)が頻出です。
アンチパターンは、無限リトライ、max.poll.interval.ms超過を招く長時間ブロック、DLQ未監視、リトライトピックのパーティション数不一致による順序崩れ、プロデューサのidempotence未設定での再送などです。
CCDAK
問題 1
コンシューマがトピックAから読み、処理結果をトピックBへ書き出す。恒久的エラーはDLQへ送る。重複を最小化し、同一キーの順序を保ちたい。最も適切な設計はどれか。
正解: A
Kafka内でのExactly-onceを実現するにはidempotent producerとトランザクションを併用し、書き込みとオフセットコミットを同一トランザクションに含める。順序性を重視する場合はin-flightを1に制限する。恒久的エラーは中止後にDLQへ隔離する。Bは重複/順序を保証しない誤り、Cはスループットと安定性を損なう、DはDLQ用途にcompactionのみを使うのは不適切。
DLQはKafkaの標準機能ですか?
いいえ。DLQは通常のトピックとして実装します。命名規約、メタデータ(元トピック/パーティション/オフセット等のヘッダー)、再処理手順、監視を合わせて“DLQ設計”と捉えます。
リトライトピックを使っても順序は保てますか?
単一トピック内の順序はパーティション単位で保たれます。リトライトピックでも元トピックと同じパーティション数・パーティショナー(キーに基づく)を使えば、同一キーの相対順序を実質的に維持できます。ただしトピックを跨ぐ絶対順序はKafkaの保証範囲外です。
プロデューサのretriesは増やせば増やすほど良いですか?
無制限に増やすとレイテンシが膨らみます。delivery.timeout.msの枠内で再送され、これを超えると失敗扱いになります。enable.idempotence=trueと組み合わせ、SLAに合うretries/バックオフを設定してください。順序重視ならin-flightも制限します。
NicheeLab編集部
データエンジニアリング・クラウド資格の専門家。Databricks・Snowflake等の認定資格を保有し、実務経験に基づいた問題作成・解説を行っています。NicheeLab運営。
Kafka Topic と Partition の基礎: 分散とスケーラビリティの要
CCDAK 対策と実務の両立を意識し、Topic/Partition/Replica/Consumer Group の役...
CCDAK 試験ガイド:出題範囲・配点・申込み・対策
Confluent Certified Developer for Apache Kafka (CCDAK) の出題範囲...
Confluent Certified Administrator (CCAAK) 対策: 出題範囲・配点の考え方・運用観点の要点
CCAAKに向けて、試験領域の押さえどころを運用目線で整理。プロダクションで通用する設定・監視・セキュリティの実践知を、...
Kafka の Replica と In-Sync Replicas を正しく設計する: 耐障害性と一貫性
レプリカとISRの仕組みを起点に、acks と min.insync.replicas、クリーン/アンクリーンリーダー選...
Kafka の Offset とコミット: ポジション管理と at-least-once の基礎
CCDAK 対策と実務の両立を意識して、Kafka コンシューマのオフセット管理とコミット戦略を整理。at-least-...