Producer のチューニングは、まず目標を明確にし、信頼性の境界条件を固定してからスループットを押し上げるのが定石です。特に acks、enable.idempotence、retries、max.in.flight.requests.per.connection はメッセージ損失や重複の有無を左右するため、先に方針を決めます。
本稿は Apache Kafka 公式ドキュメントと Confluent の動作仕様に基づく安定概念を前提に、CCDAK 出題傾向に沿って設定の意味を実務目線で解説します。
Producer は同一パーティション宛てのレコードをまとめて送ることでネットワーク往復回数を減らし、スループットを上げます。鍵は batch.size(1バッチの最大バイト数)と linger.ms(バッチを埋めるために待つ最大時間)です。batch.size を上げ、linger.ms を数〜数十ミリ秒に設定すると、レイテンシを少し犠牲にしてもスループットを大幅に改善できます。
buffer.memory は送信待ちの全体バッファ容量であり、生成レートが一時的にブローカー処理能力を上回る局面での安全弁になります。頻繁に BufferExhausted が出るなら、batch.size と linger.ms の見直しに加えて buffer.memory も合わせて拡張します。
Java Producer 推奨スニペット(スループット寄りのバッチ設定例)
Properties p = new Properties();
p.put("bootstrap.servers", "broker1:9092,broker2:9092");
p.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
p.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
// バッチング
p.put("batch.size", 131072); // 128KB
p.put("linger.ms", 20); // 20ms 待ってバッチを埋める
p.put("buffer.memory", 67108864); // 64MB
// 圧縮は次セクション参照
KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(p);compression.type は none、gzip、snappy、lz4、zstd などが選べます。一般的に lz4 や zstd は圧縮・伸長が高速で、ネットワーク帯域節約とスループットの両立に向きます。gzip は圧縮率は高いが CPU コストが高めです。クラスターが対応するアルゴリズムを確認したうえで選定します。
平均レコードが小さい場合でも、バッチ圧縮が効くため全体として帯域を節約できます。超大きいレコードを送る場合は、max.request.size と broker 側の受信上限、レコードヘッダやスキーマのオーバーヘッドも考慮してください。
Java Producer 圧縮設定例
Properties p = new Properties();
// ... 基本設定は省略
p.put("compression.type", "lz4"); // もしくは "zstd"
// zstd 利用時はクラスターの対応状況を事前確認
// 必要に応じてレコードヘッダやシリアライザの最適化も検討信頼性を規定するのは acks の設定です。acks=all はリーダーに加え ISR 全体への複製を確認してから応答するため、ブローカー障害時のデータ損失リスクを最小化します。高スループットを維持したい場合も、acks はまず all を基準に検討します。
retries を有効にすると一時的なネットワークやリーダー交代に耐えられますが、順序の乱れと重複の可能性が出ます。enable.idempotence=true を併用すると、Producer レベルの重複排除により少なくとも一度以上の配信で重複を防げます。idempotence 有効時は max.in.flight.requests.per.connection を高くしすぎないことが重要で、一般に 5 以下が推奨です。
| 設定 | スループットへの影響 | 信頼性/順序性 | 注意点 |
|---|---|---|---|
| acks=0/1 | 往復が軽く高速 | データ損失リスクが高い/中 | 試験用途や一時ログ以外では非推奨 |
| acks=all | 若干低下 | ISR コミットまで確認し堅牢 | broker の min.insync.replicas と対で運用 |
| enable.idempotence=true | わずかなオーバーヘッド | 重複を回避、順序の安定度向上 | max.in.flight は 5 以下が望ましい |
| retries を大きく | 輻輳時に遅延増 | 一時障害に強い | delivery.timeout.ms の上限内で再試行 |
| max.in.flight=1/5/10 | 小/中/大の順で高速化 | 順序安定/許容/乱れやすい | idempotence なしで高値は重複・順序乱れに注意 |
acks=all と ISR へのレプリケーション
信頼性とスループットの両立構成例
Properties p = new Properties();
// ... 基本設定は省略
p.put("acks", "all");
p.put("enable.idempotence", true);
p.put("retries", Integer.toString(Integer.MAX_VALUE));
p.put("max.in.flight.requests.per.connection", 5); // idempotence と両立
p.put("linger.ms", 20);
p.put("batch.size", 131072);
p.put("compression.type", "lz4");delivery.timeout.ms は 1 レコードの送信が成功または失敗で完了するまでの上限時間を定めます。再試行を含めた合計時間の上限であり、これを超えると送信失敗としてコールバックに返ります。スループット重視で retries を大きくするなら、delivery.timeout.ms も十分に長く設定してください。
request.timeout.ms は 1 リクエスト単位の応答待ち時間です。これが短すぎると誤検知による再試行が増え、逆に長すぎると障害検知が遅れます。ネットワーク RTT とブローカーの負荷を踏まえ、レイテンシ観測値の p95〜p99 に少し余裕を持たせると安定します。言語クライアントによって名称が異なる場合があり、Java 以外では message.timeout.ms などを用いる実装もある点に注意します。
タイムアウトとバックオフの設定例
Properties p = new Properties();
// ... 基本設定は省略
p.put("delivery.timeout.ms", 180000); // 約 3 分
p.put("request.timeout.ms", 30000); // 30 秒程度を起点に調整
p.put("retry.backoff.ms", 100); // ブローカー側負荷を考慮
p.put("reconnect.backoff.ms", 50);
p.put("reconnect.backoff.max.ms", 1000);パーティション数は Producer の並列度とスループット上限を決めます。Producer 側ではパーティション単位で送信が直列化されるため、より多くのパーティションに分散するほど同一トピックの総スループットは伸びます。一方で、パーティションを増やしすぎるとブローカーのメタデータやファイルディスクリプタのコストが増します。
順序保証が必要な単位でレコードキーを設計します。同一キーのレコードは同じパーティションにマッピングされ順序が保たれます。ホットキーで偏りが生じるとスループットが頭打ちになるため、キーのエンコーディングやハッシュの多様化で偏りを緩和します。
カスタムパーティショナの雛形(Java)
public class SaltedKeyPartitioner implements Partitioner {
@Override
public void configure(Map<String, ?> configs) {}
@Override
public int partition(String topic, Object key, byte[] keyBytes,
Object value, byte[] valueBytes, Cluster cluster) {
int partitions = cluster.partitionCountForTopic(topic);
// キーのホットスポット緩和のため簡易ソルト
int salt = ThreadLocalRandom.current().nextInt(4); // 0..3
return Utils.toPositive(Utils.murmur2(ByteBuffer.allocate(keyBytes.length+1)
.put(keyBytes).put((byte)salt).array())) % partitions;
}
@Override
public void close() {}
}Producer メトリクスを観測し、ボトルネックを定量化してから設定を動かします。record-error-rate、record-retry-rate、request-latency-avg、batch-size-avg、records-per-request-avg、compression-rate-avg、bufferpool-wait-ratio、outgoing-byte-rate などが有用です。
手順は、信頼性を先に固定し、そのうえでバッチングと圧縮を調整、最後にタイムアウトとバックオフで安定度を整える流れが無難です。各変更は 1 回に 1 点、観測窓を十分に取り、p95/p99 の悪化がないかを確認します。
JMX メトリクス名の例(フィルタ用)
# 例: kafka.producer:type=producer-metrics の一部
kafka.producer:type=producer-metrics,client-id=*,metric=record-error-rate
kafka.producer:type=producer-metrics,client-id=*,metric=record-retry-rate
kafka.producer:type=producer-metrics,client-id=*,metric=request-latency-avg
kafka.producer:type=producer-node-metrics,client-id=*,node-id=*,metric=outgoing-byte-rateCCDAK
問題 1
あなたは高スループットを求めつつ、少なくとも一度以上の配信を満たし、重複は極力避けたい。どの設定組み合わせが最も適切か。
正解: B
B は acks=all と retries により少なくとも一度以上の配信を満たし、enable.idempotence により再試行時の重複を抑制する。max.in.flight=5 はスループットと順序安定の妥協点で、適度な linger.ms と batch.size がバッチ効率を高める。A は安全だが max.in.flight=1 によりスループットを不必要に犠牲にする。C は信頼性要件を満たさない。D は重複リスクが残る。
enable.idempotence とトランザクションの違いは何ですか。
enable.idempotence は単一パーティション内での Producer レベルの重複排除を行い、再試行時の重複を書き込まないようにします。トランザクションは複数パーティション・複数トピック間での原子的な書き込みと、EOS 消費(read-process-write)の整合性を提供します。重複防止だけが目的なら idempotence で十分なことが多いです。
acks=all でもデータ損失は起こり得ますか。
ブローカー側の min.insync.replicas が小さすぎる場合や、ISR が 1 台だけの状態で障害が連続すると損失リスクが残ります。また、アプリ側が送信成功後にクラッシュし、下流で恒久保存される前に処理が失われるなど、エンドツーエンドでの設計次第で損失は起こり得ます。
Python や C/C++ クライアントでは delivery.timeout.ms が見当たりません。
Java Producer では delivery.timeout.ms が再試行を含む送信全体の上限です。librdkafka ベースのクライアントでは message.timeout.ms が同等の役割を担います。利用クライアントの公式設定名と既定値を確認して、意味が一致するように調整してください。
NicheeLab編集部
データエンジニアリング・クラウド資格の専門家。Databricks・Snowflake等の認定資格を保有し、実務経験に基づいた問題作成・解説を行っています。NicheeLab運営。
Kafka Topic と Partition の基礎: 分散とスケーラビリティの要
CCDAK 対策と実務の両立を意識し、Topic/Partition/Replica/Consumer Group の役...
CCDAK 試験ガイド:出題範囲・配点・申込み・対策
Confluent Certified Developer for Apache Kafka (CCDAK) の出題範囲...
Confluent Certified Administrator (CCAAK) 対策: 出題範囲・配点の考え方・運用観点の要点
CCAAKに向けて、試験領域の押さえどころを運用目線で整理。プロダクションで通用する設定・監視・セキュリティの実践知を、...
Kafka の Replica と In-Sync Replicas を正しく設計する: 耐障害性と一貫性
レプリカとISRの仕組みを起点に、acks と min.insync.replicas、クリーン/アンクリーンリーダー選...
Kafka の Offset とコミット: ポジション管理と at-least-once の基礎
CCDAK 対策と実務の両立を意識して、Kafka コンシューマのオフセット管理とコミット戦略を整理。at-least-...