Kafka

KafkaのPoison Pillメッセージ実践ガイド:消費不能データの安全な扱い

2026-04-19
NicheeLab編集部

Poison Pillは、コンシューマが取り出したもののアプリやSerdeが処理できず、同じレコードで繰り返し失敗するメッセージを指します。単なる一時障害と違い、放置すると処理パイプラインが詰まり、オフセットコミットの戦略次第では全体を停止させます。

本稿では、公式ドキュメントに基づき、Kafka Connect・Kafka Streams・通常のConsumer APIそれぞれでの失敗隔離、DLQ(Dead Letter Queue)、リトライ設計、スキーマ互換性の設定、監視ポイントをコンパクトに整理します。CCDAK(Confluent Certified Developer for Apache Kafka)受験者が押さえるべき要点も章末にまとめます。

Poison Pillとは何か:症状と原因の切り分け

Poison Pillは、対象レコードが原因でデシリアライズやアプリケーションロジックが永続的に失敗する状態です。典型的にはスキーマ不一致、Serdeミスマッチ、破損データ、想定外の圧縮やエンコーディング、サイズ超過などが原因になります。Kafkaブローカ自体はメッセージを“そのまま”保持するだけなので、ほとんどはコンシューマ側で露呈します。

重要なのは、失敗を“隔離”し、健全なレコードの処理を継続しつつ、後で再処理できるように手がかり(オリジナルのヘッダや例外情報)を残すことです。無制限リトライや無批判なコミットは、無限ループやデータロスを招くため避けます。

  • よくある原因: スキーマ進化の不整合(例: 新フィールド必須化)、Serde設定誤り、レコード破損、文字コード不一致、最大メッセージサイズ超過
  • 初動の切り分け: コンシューマアプリのログ、DLQの有無、対象パーティションのラグ、同一レコードでの再現性
  • KafkaはDLQを“標準機能”としては持たない(ConnectやStreamsには専用の戦略あり)。通常のConsumer APIでは実装パターンで実現する

値の型不一致によるデシリアライズ失敗(例)

// Avroスキーマ(ageはint)
{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "age",  "type": "int"}
  ]
}

// 実際のレコード(ageがstring) -> デシリアライズ時に失敗
{"name":"taro","age":"twenty"}

// こうした不一致はSchema Registryの互換性設定や、プロデューサ側のバリデーションで未然に防げます。

失敗の隔離と迂回: DLQ・リトライトピック・停止戦略

Poison Pillが検出されたら、健全系を止めずに当該レコードを迂回させるのが基本です。代表的な方法は、DLQ(Dead Letter Queue)へ振り分ける、専用のリトライトピックで指数バックオフ再処理を行う、あるいは明示的に停止して一貫性を優先する、の3つです。

Kafka Connectは公式にエラー処理とDLQ設定を備えています。Kafka StreamsはDeserializationExceptionHandlerやトポロジ分岐でエラーパスを設けられます。素のConsumer APIでは、自前でDLQにプロデュースし、コミットポイントを制御します。

  • DLQにはオリジナルのヘッダ・キー・パーティション情報・例外メッセージを含める(後処理の手がかり)
  • リトライは“回数と待機”を制限し、無限リトライを避ける(バックオフ + DLQ最終落とし)
  • 停止戦略は金融等の厳格SLAで有効だが、運用復旧手順・監視とセットで
戦略長所注意点/向き不向き
DLQ(隔離)健全系を継続。後から人手/バッチ修正可DLQの運用・保管コスト。後処理設計が必要
リトライトピック(遅延再処理)一時的不整合の自己回復。バックオフで負荷平準化恒久的失敗は結局DLQへ。順序要件に注意
即時停止(フェイルファスト)一貫性最優先で異常を早期顕在化可用性低下。復旧プレイブック必須

Poison Pill隔離の流れ(概念図)

okerror失敗閾値超ProducerTopic P0..PnConsumer GroupValidate/SerdeBusiness sinkRetry Topicexponential backoffDLQ

Kafka ConnectでDLQを有効化する設定例(シンク/ソース共通)

# connector.properties(一部)
errors.tolerance=all
errors.deadletterqueue.topic.name=dlq.user-v1
errors.deadletterqueue.context.headers.enable=true
errors.log.enable=true
errors.log.include.messages=true

# レコードサイズが大きい場合の調整例(必要に応じて)
producer.max.request.size=1048576
consumer.max.partition.fetch.bytes=1048576

スキーマ互換性戦略:未然防止と段階的移行

Poison Pillの多くはスキーマ進化の不整合で発生します。Schema Registryで後方互換(BACKWARD/ BACKWARD_TRANSITIVE)を基本に、プロデューサは新旧クライアントの共存期間を確保します。必須化や型変更など“破壊的変更”は、デフォルト値の導入、フィールドの追加→旧削除の二段階などで段階移行します。

サブジェクト命名戦略(TopicNameStrategy/RecordNameStrategyなど)によって、互換性の適用範囲が変わります。実務でも試験でも、“どの単位で進化を管理するか”を明確に説明できることが重要です。

  • 基本は後方互換を守る(古いコンシューマが新しいメッセージを読める)
  • 破壊的変更が必要な場合は、並行稼働期間を設けてローリング移行
  • Schema Registryでグローバルとサブジェクト単位の互換性を区別する

Schema Registryで互換性レベルを設定(例: 後方互換)

# グローバル互換性
curl -s -X PUT -H 'Content-Type: application/json' \
  --data '{"compatibility":"BACKWARD"}' \
  http://schema-registry:8081/config

# サブジェクト単位(topic-value など)
curl -s -X PUT -H 'Content-Type: application/json' \
  --data '{"compatibility":"BACKWARD"}' \
  http://schema-registry:8081/config/my-topic-value

# 互換性検証(登録前のdry-run)
curl -s -X POST -H 'Content-Type: application/vnd.schemaregistry.v1+json' \
  --data '{"schema":"<avro schema string>"}' \
  http://schema-registry:8081/compatibility/subjects/my-topic-value/versions/latest

Consumer APIでの堅牢設計:コミット制御と再送設計

素のConsumer APIでは、DLQは自前実装になります。原則は“成功時にのみオフセットコミット”。失敗時は対象レコードをDLQへプロデュースし、同一レコードでの無限ループを避けるために、明示的に次へ進める(seek)か、失敗を例外としてスキップ処理に落とす設計を取ります。

ヘッダに原因(例外種別、スキーマID、元パーティション/オフセット)を付与すると、後続の修復や再処理が容易になります。最終的に修復したら、DLQから元トピックへ“再投入する”バッチを用意すると運用負荷が下がります。

  • enable.auto.commit=false にして、処理成功後に commitSync/Async
  • デシリアライズ前にバイト列を扱うなら ByteArrayDeserializer で一次受け→検証→変換
  • max.poll.interval.ms と処理時間のバランス、backoffの実装、Rebalance時の再入を考慮

Java(Kafka Consumer)でのDLQ転送と安全なコミット例

var consumer = new KafkaConsumer<String, byte[]>(consumerProps);
var dlqProducer = new KafkaProducer<String, byte[]>(producerProps);
consumer.subscribe(List.of("input"));

while (true) {
  ConsumerRecords<String, byte[]> batch = consumer.poll(Duration.ofSeconds(1));
  for (ConsumerRecord<String, byte[]> r : batch) {
    try {
      // 独自デコード(例: Avro/JSON検証)
      var record = decode(r.value());
      process(record);
      // 成功時のみコミット(手動)
      consumer.commitSync(Collections.singletonMap(
        new TopicPartition(r.topic(), r.partition()),
        new OffsetAndMetadata(r.offset() + 1)));
    } catch (Exception e) {
      // DLQへ転送(オリジナル情報をヘッダに付与)
      Headers h = new RecordHeaders(r.headers());
      h.add("error.class", e.getClass().getName().getBytes(StandardCharsets.UTF_8));
      h.add("error.message", Optional.ofNullable(e.getMessage()).orElse("").getBytes(StandardCharsets.UTF_8));
      h.add("source.topic", r.topic().getBytes(StandardCharsets.UTF_8));
      h.add("source.partition", Integer.toString(r.partition()).getBytes(StandardCharsets.UTF_8));
      h.add("source.offset", Long.toString(r.offset()).getBytes(StandardCharsets.UTF_8));
      dlqProducer.send(new ProducerRecord<>("dlq.input", null, r.key(), r.value(), h));

      // 次のレコードへ進める(無限ループを避ける)
      consumer.seek(new TopicPartition(r.topic(), r.partition()), r.offset() + 1);
    }
  }
}

運用監視とSLA:ラグ、エラーレート、DLQ滞留

Poison Pill対策は“見える化”が不可欠です。コンシューマラグ、DLQ流量・滞留、リトライトピックの再処理率、例外の種類分布を継続観測し、しきい値でアラートします。ConnectやStreamsはメトリクスが充実しており、JMX ExporterやConfluent Control Centerで可視化できます。

運用でハマるのは、DLQが静かに溜まり続けるケースです。滞留長(滞在時間)と件数をダッシュボード化し、一定閾値でオンコール。定期のリプレイ・修復手順(再投入、スキーマ修正、代替変換)を標準化します。

  • ラグ確認: kafka-consumer-groups.sh --describe でグループ単位のラグを即時把握
  • DLQ/Retryトピックの件数・到着率・滞在時間(Processing latency)を監視
  • Connect: errors-total/ deadletterqueue-produce-failures-total 等のメトリクス活用

ラグ/滞留の簡易確認コマンド例

# コンシューマラグ
kafka-consumer-groups.sh \
  --bootstrap-server broker:9092 \
  --group my-group \
  --describe

# トピック件数(概数)
kafka-run-class.sh kafka.tools.GetOffsetShell \
  --broker-list broker:9092 \
  --topic dlq.input --time -1 | awk -F: '{sum+=$3} END {print sum}'

CCDAK対策の要点:設計選択を問う設問に強くなる

CCDAKでは、“どのレイヤで、どの戦略を選ぶか”を正しく説明できることが問われます。Kafka ConnectのDLQ設定、Kafka Streamsの例外ハンドラ、Schema Registryの互換性、Consumerのコミット戦略が頻出です。プロダクション設計のつもりで、可用性と一貫性のトレードオフを言語化できるようにしておきましょう。

また、単なる“リトライ”だけでなく、メッセージ順序やExactly-Once Semantics(EOS)、トランザクションの可視性(isolation.level=read_committed)が設問に絡むことがあります。Poison PillはEOSの対象外ですが、コミット位置の扱いが整合性に直結する点を押さえてください。

  • Connect: errors.tolerance/ errors.deadletterqueue.topic.name/ headers.enable の意味をセットで
  • Streams: LogAndContinue/LogAndFail の違い、分岐でのエラートピック出し
  • Consumer: 成功時のみコミット、DLQへのメタデータ付与、無限ループ防止(seek/skip)

Kafka Streams: デシリアライズ例外の扱い(ハンドラ設定例)

Properties p = new Properties();
p.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
      org.apache.kafka.streams.errors.LogAndContinueExceptionHandler.class);
// LogAndFail を選ぶと既定でアプリ停止(健全性優先)

StreamsBuilder b = new StreamsBuilder();
KStream<String, String> in = b.stream("input");
KStream<String, String>[] branches = in.branch(
  (k, v) -> isValid(v),    // 正常系
  (k, v) -> true           // 異常系
);
branches[0].to("output");
branches[1].to("dlq.input");

問題で確認

CCDAK

問題 1

Kafka Connectシンクコネクタで、スキーマ不一致により一部レコードが書き込み不能になっています。健全なレコード処理は継続しつつ、失敗レコードを後から再処理できるようにしたい。最も適切な設定の組み合わせはどれか。

  1. errors.tolerance=none を設定し、失敗時にコネクタを停止する
  2. errors.tolerance=all と errors.deadletterqueue.topic.name を設定し、ヘッダ出力も有効化する
  3. consumer.auto.offset.reset=earliest を設定して自動的に再試行させる
  4. プロデューサのacks=0 にしてスループットを優先する

正解: B

健全系を継続しつつ失敗レコードを隔離するには、ConnectのDLQ機能を使います。errors.tolerance=all でエラー許容、errors.deadletterqueue.topic.name でDLQ宛先、さらに errors.deadletterqueue.context.headers.enable=true で原因情報をヘッダに含めるのが定石です。

よくある質問

Kafka自体にDLQ機能はありますか?

ブローカには“DLQ”という専用機能はありません。Kafka ConnectにはDLQ設定が、Kafka Streamsには例外ハンドラや分岐でのエラールート実装が用意されています。素のConsumer APIでは自前にDLQトピックへプロデュースする実装を行います。

Poison Pillがあるとオフセットはどう扱うべきですか?

成功時のみコミットが原則です。失敗レコードはDLQに転送し、consumer.seek などで次へ進めて無限ループを避けます。auto commitに頼ると未処理でもコミットされる場合があり、再処理不能のリスクが高まります。

スキーマ不一致を減らすには?

Schema Registryで後方互換を維持し、破壊的変更は段階移行を徹底します。プロデューサ側でスキーマ検証を入れ、CI/CDで互換性チェック(/compatibility API)を自動化すると効果的です。

この記事で学んだ内容を問題で確認しましょう

16,000問以上の問題で実力チェック

無料で問題を解いてみる
この記事の著者

NicheeLab編集部

データエンジニアリング・クラウド資格の専門家。Databricks・Snowflake等の認定資格を保有し、実務経験に基づいた問題作成・解説を行っています。NicheeLab運営。


関連記事
Kafka

Kafka Topic と Partition の基礎: 分散とスケーラビリティの要

CCDAK 対策と実務の両立を意識し、Topic/Partition/Replica/Consumer Group の役...

Kafka

CCDAK 試験ガイド:出題範囲・配点・申込み・対策

Confluent Certified Developer for Apache Kafka (CCDAK) の出題範囲...

Kafka

Confluent Certified Administrator (CCAAK) 対策: 出題範囲・配点の考え方・運用観点の要点

CCAAKに向けて、試験領域の押さえどころを運用目線で整理。プロダクションで通用する設定・監視・セキュリティの実践知を、...

Kafka

Kafka の Replica と In-Sync Replicas を正しく設計する: 耐障害性と一貫性

レプリカとISRの仕組みを起点に、acks と min.insync.replicas、クリーン/アンクリーンリーダー選...

Kafka

Kafka の Offset とコミット: ポジション管理と at-least-once の基礎

CCDAK 対策と実務の両立を意識して、Kafka コンシューマのオフセット管理とコミット戦略を整理。at-least-...

Kafkaの記事一覧 (101件)
© 2026 NicheeLab All rights reserved.