Poison Pillは、コンシューマが取り出したもののアプリやSerdeが処理できず、同じレコードで繰り返し失敗するメッセージを指します。単なる一時障害と違い、放置すると処理パイプラインが詰まり、オフセットコミットの戦略次第では全体を停止させます。
本稿では、公式ドキュメントに基づき、Kafka Connect・Kafka Streams・通常のConsumer APIそれぞれでの失敗隔離、DLQ(Dead Letter Queue)、リトライ設計、スキーマ互換性の設定、監視ポイントをコンパクトに整理します。CCDAK(Confluent Certified Developer for Apache Kafka)受験者が押さえるべき要点も章末にまとめます。
Poison Pillは、対象レコードが原因でデシリアライズやアプリケーションロジックが永続的に失敗する状態です。典型的にはスキーマ不一致、Serdeミスマッチ、破損データ、想定外の圧縮やエンコーディング、サイズ超過などが原因になります。Kafkaブローカ自体はメッセージを“そのまま”保持するだけなので、ほとんどはコンシューマ側で露呈します。
重要なのは、失敗を“隔離”し、健全なレコードの処理を継続しつつ、後で再処理できるように手がかり(オリジナルのヘッダや例外情報)を残すことです。無制限リトライや無批判なコミットは、無限ループやデータロスを招くため避けます。
値の型不一致によるデシリアライズ失敗(例)
// Avroスキーマ(ageはint)
{
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "age", "type": "int"}
]
}
// 実際のレコード(ageがstring) -> デシリアライズ時に失敗
{"name":"taro","age":"twenty"}
// こうした不一致はSchema Registryの互換性設定や、プロデューサ側のバリデーションで未然に防げます。Poison Pillが検出されたら、健全系を止めずに当該レコードを迂回させるのが基本です。代表的な方法は、DLQ(Dead Letter Queue)へ振り分ける、専用のリトライトピックで指数バックオフ再処理を行う、あるいは明示的に停止して一貫性を優先する、の3つです。
Kafka Connectは公式にエラー処理とDLQ設定を備えています。Kafka StreamsはDeserializationExceptionHandlerやトポロジ分岐でエラーパスを設けられます。素のConsumer APIでは、自前でDLQにプロデュースし、コミットポイントを制御します。
| 戦略 | 長所 | 注意点/向き不向き |
|---|---|---|
| DLQ(隔離) | 健全系を継続。後から人手/バッチ修正可 | DLQの運用・保管コスト。後処理設計が必要 |
| リトライトピック(遅延再処理) | 一時的不整合の自己回復。バックオフで負荷平準化 | 恒久的失敗は結局DLQへ。順序要件に注意 |
| 即時停止(フェイルファスト) | 一貫性最優先で異常を早期顕在化 | 可用性低下。復旧プレイブック必須 |
Poison Pill隔離の流れ(概念図)
Kafka ConnectでDLQを有効化する設定例(シンク/ソース共通)
# connector.properties(一部)
errors.tolerance=all
errors.deadletterqueue.topic.name=dlq.user-v1
errors.deadletterqueue.context.headers.enable=true
errors.log.enable=true
errors.log.include.messages=true
# レコードサイズが大きい場合の調整例(必要に応じて)
producer.max.request.size=1048576
consumer.max.partition.fetch.bytes=1048576Poison Pillの多くはスキーマ進化の不整合で発生します。Schema Registryで後方互換(BACKWARD/ BACKWARD_TRANSITIVE)を基本に、プロデューサは新旧クライアントの共存期間を確保します。必須化や型変更など“破壊的変更”は、デフォルト値の導入、フィールドの追加→旧削除の二段階などで段階移行します。
サブジェクト命名戦略(TopicNameStrategy/RecordNameStrategyなど)によって、互換性の適用範囲が変わります。実務でも試験でも、“どの単位で進化を管理するか”を明確に説明できることが重要です。
Schema Registryで互換性レベルを設定(例: 後方互換)
# グローバル互換性
curl -s -X PUT -H 'Content-Type: application/json' \
--data '{"compatibility":"BACKWARD"}' \
http://schema-registry:8081/config
# サブジェクト単位(topic-value など)
curl -s -X PUT -H 'Content-Type: application/json' \
--data '{"compatibility":"BACKWARD"}' \
http://schema-registry:8081/config/my-topic-value
# 互換性検証(登録前のdry-run)
curl -s -X POST -H 'Content-Type: application/vnd.schemaregistry.v1+json' \
--data '{"schema":"<avro schema string>"}' \
http://schema-registry:8081/compatibility/subjects/my-topic-value/versions/latest素のConsumer APIでは、DLQは自前実装になります。原則は“成功時にのみオフセットコミット”。失敗時は対象レコードをDLQへプロデュースし、同一レコードでの無限ループを避けるために、明示的に次へ進める(seek)か、失敗を例外としてスキップ処理に落とす設計を取ります。
ヘッダに原因(例外種別、スキーマID、元パーティション/オフセット)を付与すると、後続の修復や再処理が容易になります。最終的に修復したら、DLQから元トピックへ“再投入する”バッチを用意すると運用負荷が下がります。
Java(Kafka Consumer)でのDLQ転送と安全なコミット例
var consumer = new KafkaConsumer<String, byte[]>(consumerProps);
var dlqProducer = new KafkaProducer<String, byte[]>(producerProps);
consumer.subscribe(List.of("input"));
while (true) {
ConsumerRecords<String, byte[]> batch = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, byte[]> r : batch) {
try {
// 独自デコード(例: Avro/JSON検証)
var record = decode(r.value());
process(record);
// 成功時のみコミット(手動)
consumer.commitSync(Collections.singletonMap(
new TopicPartition(r.topic(), r.partition()),
new OffsetAndMetadata(r.offset() + 1)));
} catch (Exception e) {
// DLQへ転送(オリジナル情報をヘッダに付与)
Headers h = new RecordHeaders(r.headers());
h.add("error.class", e.getClass().getName().getBytes(StandardCharsets.UTF_8));
h.add("error.message", Optional.ofNullable(e.getMessage()).orElse("").getBytes(StandardCharsets.UTF_8));
h.add("source.topic", r.topic().getBytes(StandardCharsets.UTF_8));
h.add("source.partition", Integer.toString(r.partition()).getBytes(StandardCharsets.UTF_8));
h.add("source.offset", Long.toString(r.offset()).getBytes(StandardCharsets.UTF_8));
dlqProducer.send(new ProducerRecord<>("dlq.input", null, r.key(), r.value(), h));
// 次のレコードへ進める(無限ループを避ける)
consumer.seek(new TopicPartition(r.topic(), r.partition()), r.offset() + 1);
}
}
}Poison Pill対策は“見える化”が不可欠です。コンシューマラグ、DLQ流量・滞留、リトライトピックの再処理率、例外の種類分布を継続観測し、しきい値でアラートします。ConnectやStreamsはメトリクスが充実しており、JMX ExporterやConfluent Control Centerで可視化できます。
運用でハマるのは、DLQが静かに溜まり続けるケースです。滞留長(滞在時間)と件数をダッシュボード化し、一定閾値でオンコール。定期のリプレイ・修復手順(再投入、スキーマ修正、代替変換)を標準化します。
ラグ/滞留の簡易確認コマンド例
# コンシューマラグ
kafka-consumer-groups.sh \
--bootstrap-server broker:9092 \
--group my-group \
--describe
# トピック件数(概数)
kafka-run-class.sh kafka.tools.GetOffsetShell \
--broker-list broker:9092 \
--topic dlq.input --time -1 | awk -F: '{sum+=$3} END {print sum}'CCDAKでは、“どのレイヤで、どの戦略を選ぶか”を正しく説明できることが問われます。Kafka ConnectのDLQ設定、Kafka Streamsの例外ハンドラ、Schema Registryの互換性、Consumerのコミット戦略が頻出です。プロダクション設計のつもりで、可用性と一貫性のトレードオフを言語化できるようにしておきましょう。
また、単なる“リトライ”だけでなく、メッセージ順序やExactly-Once Semantics(EOS)、トランザクションの可視性(isolation.level=read_committed)が設問に絡むことがあります。Poison PillはEOSの対象外ですが、コミット位置の扱いが整合性に直結する点を押さえてください。
Kafka Streams: デシリアライズ例外の扱い(ハンドラ設定例)
Properties p = new Properties();
p.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
org.apache.kafka.streams.errors.LogAndContinueExceptionHandler.class);
// LogAndFail を選ぶと既定でアプリ停止(健全性優先)
StreamsBuilder b = new StreamsBuilder();
KStream<String, String> in = b.stream("input");
KStream<String, String>[] branches = in.branch(
(k, v) -> isValid(v), // 正常系
(k, v) -> true // 異常系
);
branches[0].to("output");
branches[1].to("dlq.input");CCDAK
問題 1
Kafka Connectシンクコネクタで、スキーマ不一致により一部レコードが書き込み不能になっています。健全なレコード処理は継続しつつ、失敗レコードを後から再処理できるようにしたい。最も適切な設定の組み合わせはどれか。
正解: B
健全系を継続しつつ失敗レコードを隔離するには、ConnectのDLQ機能を使います。errors.tolerance=all でエラー許容、errors.deadletterqueue.topic.name でDLQ宛先、さらに errors.deadletterqueue.context.headers.enable=true で原因情報をヘッダに含めるのが定石です。
Kafka自体にDLQ機能はありますか?
ブローカには“DLQ”という専用機能はありません。Kafka ConnectにはDLQ設定が、Kafka Streamsには例外ハンドラや分岐でのエラールート実装が用意されています。素のConsumer APIでは自前にDLQトピックへプロデュースする実装を行います。
Poison Pillがあるとオフセットはどう扱うべきですか?
成功時のみコミットが原則です。失敗レコードはDLQに転送し、consumer.seek などで次へ進めて無限ループを避けます。auto commitに頼ると未処理でもコミットされる場合があり、再処理不能のリスクが高まります。
スキーマ不一致を減らすには?
Schema Registryで後方互換を維持し、破壊的変更は段階移行を徹底します。プロデューサ側でスキーマ検証を入れ、CI/CDで互換性チェック(/compatibility API)を自動化すると効果的です。
NicheeLab編集部
データエンジニアリング・クラウド資格の専門家。Databricks・Snowflake等の認定資格を保有し、実務経験に基づいた問題作成・解説を行っています。NicheeLab運営。
Kafka Topic と Partition の基礎: 分散とスケーラビリティの要
CCDAK 対策と実務の両立を意識し、Topic/Partition/Replica/Consumer Group の役...
CCDAK 試験ガイド:出題範囲・配点・申込み・対策
Confluent Certified Developer for Apache Kafka (CCDAK) の出題範囲...
Confluent Certified Administrator (CCAAK) 対策: 出題範囲・配点の考え方・運用観点の要点
CCAAKに向けて、試験領域の押さえどころを運用目線で整理。プロダクションで通用する設定・監視・セキュリティの実践知を、...
Kafka の Replica と In-Sync Replicas を正しく設計する: 耐障害性と一貫性
レプリカとISRの仕組みを起点に、acks と min.insync.replicas、クリーン/アンクリーンリーダー選...
Kafka の Offset とコミット: ポジション管理と at-least-once の基礎
CCDAK 対策と実務の両立を意識して、Kafka コンシューマのオフセット管理とコミット戦略を整理。at-least-...