Kafka

Kafkaエラーハンドリングパターン:リトライ・DLQ・補償処理

2026-04-19
NicheeLab編集部

Kafkaで“失敗”は珍しくありません。ネットワーク揺らぎ、スキーマ不一致、下流の一時停止など、原因は多様です。本稿では公式ドキュメントで安定している概念を土台に、リトライ、DLQ、補償処理をどう設計・運用するかを整理します。

CCDAK対策として、配信保証、トランザクション(EOS)、オフセットコミット、パーティション順序性の要点も同時に押さえます。

失敗の分類と設計原則

まずは失敗を分類します。短時間で解消する一時的エラー(ネットワーク断、スロットリング)と、データ自体が原因の恒久的エラー(スキーマ不一致、検証NG、想定外フォーマット)で対処は異なります。

次に、ビジネス上のエラーと技術的エラーを分けます。技術的エラーはリトライやバックオフで回復可能なことが多く、ビジネスエラーはDLQや補償処理で明示的に扱うのが基本です。

Kafkaはメッセージ順序をパーティション単位で提供します。エラーハンドリングは順序性への影響を常に評価し、必要に応じて同一キーを同一パーティションに送る、in-flightを制限する、リトライトピックも同じパーティション数とパーティショナーを使うといった設計を行います。

  • まず原因を可観測化(例:例外種別、元トピック/パーティション/オフセットをヘッダーに格納)
  • 一時的エラーはバックオフ付きで再試行、恒久的エラーはDLQへ早期退避
  • 順序性が重要なキーは同一パーティションへ、リトライトピックも同じパーティショニング戦略を適用
  • 下流が冪等でないなら、プロデューサはenable.idempotence=true、必要に応じてトランザクションでExactly-onceを検討
  • オフセットコミットのタイミングは少なくとも処理結果が“永続化された後”に(at-least-once)。EOSではsendOffsetsToTransactionを利用

リトライ戦略の使い分け

Kafkaのプロデューサリトライはブローカーやネットワークの一時的失敗に有効です。enable.idempotence=trueと組み合わせれば、再送による重複生成をクライアント側で防げます。順序性を重視する場合はmax.in.flight.requests.per.connectionを1に制限するのが定石です。

コンシューマ側の再試行はアプリケーション責務です。インラインでリトライすると、その間パーティションがブロックされてスループットが落ちがちです。実運用では、N段のリトライトピック(retry-1, retry-2, ...)を用い、段階的にバックオフを伸ばすパターンが安定します。最終的に処理不能なレコードはDLQへ送ります。Kafka自体は“遅延キュー”をネイティブ提供していないため、遅延は専用のリトライトピックと消費間隔の制御で表現します。

  • プロデューサ推奨設定の一例: acks=all, enable.idempotence=true, max.in.flight.requests.per.connection=1, delivery.timeout.msはビジネスSLAに合わせて設定
  • コンシューマのインラインリトライは回数を小さく、恒久的エラーは速やかにDLQへ
  • リトライトピックは元トピックと同じパーティション数・パーティショナーを使い、キー順序を維持
  • バックオフは段階的(指数/線形)に増やし、無限リトライは避ける
パターン適用対象主目的長所
プロデューサリトライ送信失敗(技術的)配信成功率の向上idempotenceと併用で重複最小化
コンシューマインラインリトライ処理失敗(技術/一部ビジネス)即時の再試行実装が容易
リトライトピック処理失敗(主に一時的)段階的バックオフと切り分け本流のスループット維持
DLQ処理不能(恒久的)損傷レコードの隔離本流を詰まらせない
補償処理ビジネス不整合後追い是正複数サービス間の整合性回復

リトライトピックとDLQの流れ(キー順序維持)

main-topicConsumerok → 下流処理 / on error ↓retry-1-topic同Partition設計Retry G1遅延/間引きretry-2-topicRetry G2DLQ

DLQの設計と実装の要点

DLQはKafkaの組み込み機能ではなく、通常のトピックで実装します。命名は<元トピック>.DLQや<ドメイン>.dead-letterなど一目で分かる規約にします。リテンションは本流より長めに設定し、監視と再処理ツール(手動/バッチ/専用UI)を用意します。圧縮は通常deleteポリシーで良く、キー削除の意味づけがない限りcompactionは避けます。

順序性を保つには、DLQやリトライトピックのパーティション数・パーティショナーを元トピックと揃えます。ヘッダーに元メタデータとエラー情報を付与し、再処理時のコンテキストを復元可能にします。個人情報や機密はヘッダー/値に不用意に載せないようにし、監査とアクセス制御を設計します。

  • 推奨ヘッダー例: x-original-topic, x-original-partition, x-original-offset, x-exception-class, x-ex-message, x-attempt
  • スキーマはDLQ専用の“エンベロープ”を定義(元ペイロード+メタデータ)
  • 再処理ポリシーを明文化(再投入先、最大回数、手動承認の要否)
  • DLQボリュームの監視としきい値アラートを必ず用意

JavaクライアントによるDLQ/リトライトピック送信例(シンプル版)

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.*;

public class RetryAndDlqExample {
  static String MAIN = "orders";
  static String RETRY_PREFIX = MAIN + ".retry."; // retry.1, retry.2 など
  static String DLQ = MAIN + ".DLQ";

  public static void main(String[] args) {
    Properties cp = new Properties();
    cp.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
    cp.put(ConsumerConfig.GROUP_ID_CONFIG, "orders-worker");
    cp.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(cp, new ByteArrayDeserializer(), new ByteArrayDeserializer());
    consumer.subscribe(Collections.singletonList(MAIN));

    Properties pp = new Properties();
    pp.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
    pp.put(ProducerConfig.ACKS_CONFIG, "all");
    pp.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
    pp.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
    pp.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "120000");
    KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(pp, new ByteArraySerializer(), new ByteArraySerializer());

    while (true) {
      ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(1));
      for (ConsumerRecord<byte[], byte[]> r : records) {
        try {
          // ここで業務処理(例:検証/下流呼び出し/DB upsert)
          process(r);
          // 正常時のみオフセットを明示コミット
          consumer.commitSync(Collections.singletonMap(new TopicPartition(r.topic(), r.partition()), new OffsetAndMetadata(r.offset() + 1)));
        } catch (Exception e) {
          int attempt = headerInt(r, "x-attempt", 0) + 1;
          String nextTopic = attempt <= 2 ? RETRY_PREFIX + attempt : DLQ;
          ProducerRecord<byte[], byte[]> pr = new ProducerRecord<>(nextTopic, r.key(), r.value());
          pr.headers().add(new RecordHeader("x-original-topic", r.topic().getBytes(StandardCharsets.UTF_8)));
          pr.headers().add(new RecordHeader("x-original-partition", Integer.toString(r.partition()).getBytes(StandardCharsets.UTF_8)));
          pr.headers().add(new RecordHeader("x-original-offset", Long.toString(r.offset()).getBytes(StandardCharsets.UTF_8)));
          pr.headers().add(new RecordHeader("x-exception-class", e.getClass().getName().getBytes(StandardCharsets.UTF_8)));
          pr.headers().add(new RecordHeader("x-ex-message", Optional.ofNullable(e.getMessage()).orElse("").getBytes(StandardCharsets.UTF_8)));
          pr.headers().add(new RecordHeader("x-attempt", Integer.toString(attempt).getBytes(StandardCharsets.UTF_8)));
          producer.send(pr).get(); // シンプルに同期送信
          // 失敗レコードのオフセットは“処理済み”として進め、本流を詰まらせない
          consumer.commitSync(Collections.singletonMap(new TopicPartition(r.topic(), r.partition()), new OffsetAndMetadata(r.offset() + 1)));
        }
      }
    }
  }

  static void process(ConsumerRecord<byte[], byte[]> r) {
    // ダミー:例外を投げるかもしれない業務処理
  }

  static int headerInt(ConsumerRecord<byte[], byte[]> r, String key, int dflt) {
    var h = r.headers().lastHeader(key);
    if (h == null) return dflt;
    try { return Integer.parseInt(new String(h.value(), StandardCharsets.UTF_8)); } catch (Exception e) { return dflt; }
  }
}

補償処理とSAGA/アウトボックス

補償処理は、ビジネス不整合を後から打ち消すステップ(取り消し/反転)を設計するものです。分散トランザクションを使わずに整合性を保つSAGAパターンでは、各ステップが成功/補償イベントを非同期で発行します。

DB更新とイベント発行の原子性にはアウトボックスパターンが有効です。アプリはDBに業務データとアウトボックス行を同一トランザクションで書き込み、CDCコネクタがその行をKafkaへ確実に流します。補償が必要になったら、補償イベントを新たに発行し下流に反映させます。Kafkaのトランザクション(EOS)はKafka内の読み書き原子性を提供しますが、外部システムとの全体原子性は提供しない点に注意します。

  • 補償は“別イベント”として扱い、下流は冪等に適用できるようにする
  • アウトボックスで“書いてから発行”の順序を固定化。二重書き問題を回避
  • 補償イベントは順序と一意性を担保(キー設計、重複抑止)

モニタリング/アラートと運用の落とし穴

リトライとDLQは“流量の健全性”で監視します。突然のDLQスパイクやリトライトピック滞留は本流の劣化サインです。メトリクスはプロデューサ/コンシューマのクライアントメトリクスに加え、トピックごとのレイテンシや滞留も見ると原因切り分けが速くなります。

アラートは“長時間の滞留”“DLQ比率上昇”“再試行段の増大”などトレンド系に寄せ、即時断を避けるために段階通知にします。再処理ジョブは冪等・スロットリング可能にして、再投入で本流を圧迫しないようにします。

  • Producer: record-error-rate, record-retry-rate, request-latency-avg
  • Consumer: records-lag-max, commit-latency, poll間隔のばらつき
  • トピック: リトライトピック/ DLQのメッセージ数・到達レート・滞留時間
  • アラート例: DLQ比率 > X%、retry-2以上の比率上昇、lagがSLA超過

CCDAK視点の頻出ポイントとアンチパターン

CCDAKでは配信保証(at-most-once/at-least-once/exactly-once)の違い、idempotent producer、トランザクションの制約(1つのトランザクションで複数パーティションへの書き込みとオフセットコミットは可能、外部システムとの原子性は保証しない)が頻出です。

アンチパターンは、無限リトライ、max.poll.interval.ms超過を招く長時間ブロック、DLQ未監視、リトライトピックのパーティション数不一致による順序崩れ、プロデューサのidempotence未設定での再送などです。

  • 少なくとも一度届ける at-least-once: 処理後にオフセットコミット
  • Exactly-once in Kafka: enable.idempotence + トランザクション + sendOffsetsToTransaction
  • 順序性を守るならmax.in.flight.requests.per.connection=1(レイテンシとトレードオフ)
  • DLQは“機能”ではなく“設計”で実現。ヘッダーで再処理可能性を担保
  • リトライトピックは同じキー空間/パーティショナーで

問題で確認

CCDAK

問題 1

コンシューマがトピックAから読み、処理結果をトピックBへ書き出す。恒久的エラーはDLQへ送る。重複を最小化し、同一キーの順序を保ちたい。最も適切な設計はどれか。

  1. プロデューサでenable.idempotence=true、acks=all、max.in.flight.requests.per.connection=1を設定し、トランザクションを用いてBへの書き込みとAのオフセットをsendOffsetsToTransactionで同一トランザクションに含める。恒久的エラー時はトランザクションを中止し、DLQへの送信は別送にする。
  2. コンシューマは自動コミットを有効化し、プロデューサのretriesだけ多くすれば重複も順序問題も発生しない。
  3. コンシューマで無限インラインリトライを行い、エラーが収まるまでpollを止めれば順序は常に守られるため十分である。
  4. DLQトピックをcompactionのみに設定すれば、古い失敗は自動的に消え、順序性も強化される。

正解: A

Kafka内でのExactly-onceを実現するにはidempotent producerとトランザクションを併用し、書き込みとオフセットコミットを同一トランザクションに含める。順序性を重視する場合はin-flightを1に制限する。恒久的エラーは中止後にDLQへ隔離する。Bは重複/順序を保証しない誤り、Cはスループットと安定性を損なう、DはDLQ用途にcompactionのみを使うのは不適切。

よくある質問

DLQはKafkaの標準機能ですか?

いいえ。DLQは通常のトピックとして実装します。命名規約、メタデータ(元トピック/パーティション/オフセット等のヘッダー)、再処理手順、監視を合わせて“DLQ設計”と捉えます。

リトライトピックを使っても順序は保てますか?

単一トピック内の順序はパーティション単位で保たれます。リトライトピックでも元トピックと同じパーティション数・パーティショナー(キーに基づく)を使えば、同一キーの相対順序を実質的に維持できます。ただしトピックを跨ぐ絶対順序はKafkaの保証範囲外です。

プロデューサのretriesは増やせば増やすほど良いですか?

無制限に増やすとレイテンシが膨らみます。delivery.timeout.msの枠内で再送され、これを超えると失敗扱いになります。enable.idempotence=trueと組み合わせ、SLAに合うretries/バックオフを設定してください。順序重視ならin-flightも制限します。

この記事で学んだ内容を問題で確認しましょう

16,000問以上の問題で実力チェック

無料で問題を解いてみる
この記事の著者

NicheeLab編集部

データエンジニアリング・クラウド資格の専門家。Databricks・Snowflake等の認定資格を保有し、実務経験に基づいた問題作成・解説を行っています。NicheeLab運営。


関連記事
Kafka

Kafka Topic と Partition の基礎: 分散とスケーラビリティの要

CCDAK 対策と実務の両立を意識し、Topic/Partition/Replica/Consumer Group の役...

Kafka

CCDAK 試験ガイド:出題範囲・配点・申込み・対策

Confluent Certified Developer for Apache Kafka (CCDAK) の出題範囲...

Kafka

Confluent Certified Administrator (CCAAK) 対策: 出題範囲・配点の考え方・運用観点の要点

CCAAKに向けて、試験領域の押さえどころを運用目線で整理。プロダクションで通用する設定・監視・セキュリティの実践知を、...

Kafka

Kafka の Replica と In-Sync Replicas を正しく設計する: 耐障害性と一貫性

レプリカとISRの仕組みを起点に、acks と min.insync.replicas、クリーン/アンクリーンリーダー選...

Kafka

Kafka の Offset とコミット: ポジション管理と at-least-once の基礎

CCDAK 対策と実務の両立を意識して、Kafka コンシューマのオフセット管理とコミット戦略を整理。at-least-...

Kafkaの記事一覧 (100件)
© 2026 NicheeLab All rights reserved.