Producer はアプリスレッドでレコードを直列化し、パーティショナーで宛先を決め、RecordAccumulator に蓄えて Sender スレッドがバッチ化してブローカーへ送ります。この一連の流れと各設定の役割を理解すると、試験対策にも現場のチューニングにも強くなれます。
本稿は公式ドキュメントに沿った安定概念を中心にまとめ、バージョン差分に依存しやすい既定値には踏み込みすぎず、効果とトレードオフに焦点を当てます。
Producer の主要パスは「直列化 → パーティショニング → 蓄積(RecordAccumulator) → 送信(Sender スレッド) → 応答(ACK) → コールバック」です。アプリ側の send は多くの場合ノンブロッキングで、実送信はバックグラウンドで進みます。
バッチ単位はトピックパーティションごとに作られ、batch.size と linger.ms によりサイズ駆動・時間駆動のどちらでもフラッシュされます。圧縮はバッチ単位で効くため、バッチングと compression.type はセットで考えるとスループット効率が上がります。
応答は acks 設定に従って返り、失敗時は retries と backoff、そして delivery.timeout.ms までの間で再送されます。順序性の維持は max.in.flight.requests.per.connection と idempotence の有無に強く影響されます。
Producer 送信パス(概念)
最小サンプル(同期送信で挙動を可視化)
Properties p = new Properties();
p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer.class.getName());
p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer.class.getName());
try (KafkaProducer<String, String> producer = new KafkaProducer<>(p)) {
ProducerRecord<String, String> rec = new ProducerRecord<>("orders", "order-1", "payload");
RecordMetadata md = producer.send(rec).get(); // 確実に送れたかを同期確認(学習用途)
System.out.printf("topic=%s partition=%d offset=%d%n", md.topic(), md.partition(), md.offset());
}
Producer はトピックパーティション単位でバッチを構築します。batch.size は1バッチの上限(バイト)で、linger.ms は十分なレコードが溜まらない場合に待機してバッチを太らせます。高スループットを狙うなら、ある程度の linger.ms と適切な batch.size を組み合わせ、compression.type を有効にするのが定石です。
ただし待機時間を増やすとエンドツーエンドの遅延は伸びます。リアルタイム性が重要なワークロードでは linger.ms を小さくし、レートが低いときのみ控えめに待機するなど、SLA と回線/CPU コストのバランスを取りましょう。
典型的なバッチ向け設定例(抜粋)
Properties p = new Properties();
p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.ByteArraySerializer.class.getName());
p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.ByteArraySerializer.class.getName());
// バッチを太らせる
p.put(ProducerConfig.BATCH_SIZE_CONFIG, 64 * 1024); // 64 KiB 程度から検証
p.put(ProducerConfig.LINGER_MS_CONFIG, 5); // 5ms 程度からSLAに合わせて調整
p.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4"); // 低レイテンシと圧縮率のバランス
// バッファ管理
p.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 64L * 1024 * 1024); // 総バッファ上限
p.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 30_000); // send が待てる最大時間
acks は応答条件を決めます。acks=0 は送信即完了、acks=1 はリーダー書き込みまで、acks=all はISRの追随を待ちます。耐久性を重視するなら acks=all とし、ブローカー/トピック側の min.insync.replicas と組み合わせるのが定石です(ISR が閾値未満だとエラー応答)。
retries は失敗時の再送回数、retry.backoff.ms は再送の待機。delivery.timeout.ms は「最初の送信試行から成功/失敗が確定するまでの上限」で、linger や再送時間も含む包括的な上限です。request.timeout.ms は単一リクエストの待機上限で、delivery.timeout.ms はそれらの合算上限と捉えると整理できます。
max.in.flight.requests.per.connection が大きいと、再送が絡むと順序が入れ替わることがあります。順序重視なら idempotence を有効化するか、保守的に max.in.flight を小さく設定します。
| acks | ブローカー応答条件 | 耐久性の目安 | レイテンシ/スループット傾向 |
|---|---|---|---|
| 0 | 送信直後に成功扱い(応答待ちなし) | 低(ブローカー未到達でも成功と見なす) | 最小レイテンシ・最大スループット |
| 1 | リーダーへの書き込み成功で応答 | 中(リーダー障害時は喪失の可能性) | 低レイテンシ・高スループット |
| all | ISR 全員(min.insync.replicas を満たす)からの確認で応答 | 高(ISR が十分な場合) | やや高レイテンシ・中〜高スループット(バッチと圧縮で緩和可) |
失敗時のリトライとタイムアウト(要点)
Properties p = new Properties();
// 信頼性重視
p.put(ProducerConfig.ACKS_CONFIG, "all");
// 十分な再送回数(具体値はSLAと失敗パターンで設計)
p.put(ProducerConfig.RETRIES_CONFIG, 100);
p.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 200); // ブローカー回復を待つ間隔
// 包括的な送達上限(linger や再送を含む)
p.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120_000);
// リクエスト単体の待機上限(小さすぎると誤検知が増える)
p.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30_000);
// 順序性の確保(idempotence 未使用なら1を推奨)
p.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
キーが同じレコードは同じパーティションに入り、順序が保たれます。キー未指定の場合は実装上のデフォルトパーティショナーがラウンドロビン/スティッキーな戦略で分散します。高カーディナリティのキーは偏りにくい一方、ホットキーがあると一部パーティションに負荷が集中します。
ログコンパクションを使う場合、キーは最新値で上書きされる単位になります。順序性・圧縮効率・ホットパーティション回避を総合してキーを設計します。必要に応じてカスタムパーティショナーでビジネス上の分散規則を実装します。
カスタムパーティショナーの骨子(Java)
public class ModPartitioner implements org.apache.kafka.clients.producer.Partitioner {
@Override
public void configure(java.util.Map<String, ?> configs) {}
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, org.apache.kafka.common.Cluster cluster) {
int partitions = cluster.partitionCountForTopic(topic);
if (keyBytes == null || partitions <= 0) return 0;
// 例: 数値IDを想定し軽いモジュロ分散
int k = java.nio.ByteBuffer.wrap(keyBytes).getInt();
return Math.floorMod(k, partitions);
}
@Override
public void close() {}
}
// 使用時: ProducerConfig.PARTITIONER_CLASS_CONFIG にクラス名を設定
enable.idempotence=true は重複排除を可能にし、再送で二重書き込みにならないようにします。Producer は内部的にプロデューサIDとシーケンス番号を用い、ブローカー側で重複を無視できます。これにより acks=all、適切な max.in.flight(一般に5以下)など互換条件が満たされます。
トランザクションは複数トピック/パーティションにまたがる一貫性を提供します。transactional.id を安定させ、initTransactions → beginTransaction → send... → commitTransaction(失敗時は abort)という流れです。消費側で read_committed を指定すればコミット済みのみを可視化できます。
Transactional Producer の最小パターン(Java)
Properties p = new Properties();
p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer.class.getName());
p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer.class.getName());
p.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
p.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "orders-tx-01");
try (KafkaProducer<String, String> producer = new KafkaProducer<>(p)) {
producer.initTransactions();
try {
producer.beginTransaction();
producer.send(new ProducerRecord<>("orders", "k1", "v1"));
producer.send(new ProducerRecord<>("audit", "k1", "v1"));
producer.commitTransaction();
} catch (Exception e) {
producer.abortTransaction();
throw e;
}
}
実務ではメトリクス監視とバックプレッシャの扱いが重要です。record-error-rate、request-latency-avg、outgoing-byte-rate、bufferpool-wait-time-total などの指標をダッシュボード化し、配信エラーとタイムアウトの傾向を早期に掴みます。
障害注入(ブローカー停止、ネットワーク遅延、ISR 減少)で acks、retries、delivery.timeout.ms、idempotence の組み合わせが期待通りに効くかを検証してから本番適用すると安全です。
実務向け Producer 実装例(コールバック付き)
Properties p = new Properties();
p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer.class.getName());
p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer.class.getName());
// 信頼性と順序のバランス
p.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
p.put(ProducerConfig.ACKS_CONFIG, "all");
p.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120_000);
p.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30_000);
p.put(ProducerConfig.LINGER_MS_CONFIG, 5);
p.put(ProducerConfig.BATCH_SIZE_CONFIG, 64 * 1024);
p.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
try (KafkaProducer<String, String> producer = new KafkaProducer<>(p)) {
Callback cb = (md, ex) -> {
if (ex != null) {
System.err.println("send failed: " + ex.getClass().getSimpleName() + " - " + ex.getMessage());
// ログ/メトリクス連携やDLQ送出などをここで行う
} else {
System.out.printf("ok %s-%d@%d%n", md.topic(), md.partition(), md.offset());
}
};
for (int i = 0; i < 1000; i++) {
ProducerRecord<String, String> rec = new ProducerRecord<>("orders", "cust-" + (i % 10), "payload-" + i);
producer.send(rec, cb);
}
producer.flush();
}
CCDAK
問題 1
要件: レプリカ追随を待ってから成功とし、失敗時は自動再送。再送に伴う重複を避け、各キー内の順序を維持したい。Producer 側の設定として最も適切なのはどれか。
正解: A
耐久性は acks=all が前提。再送に伴う重複排除と順序維持には enable.idempotence=true が最も効果的で、互換の max.in.flight 設定(一般に5以下)が求められます。B は耐久性/再送なし、C は耐久性要件未満、D は再送しないため delivery.timeout.ms を伸ばしても要件を満たしません。
acks=all は全レプリカからの応答を待つのですか?
全レプリカではなく ISR(In-Sync Replica)集合の応答を待ちます。min.insync.replicas を満たせない場合はエラー応答となり、Producer は例外を受け取ります。
linger.ms を大きくすると何が起こりますか?
低負荷時に待機してバッチを太らせ、圧縮効率とスループットが上がる一方、エンドツーエンドの遅延が増えます。SLA とコストに応じて数ms〜数十msの範囲で計測しながら調整します。
delivery.timeout.ms と request.timeout.ms の違いは?
request.timeout.ms は単一リクエストの待機上限、delivery.timeout.ms は最初の送信試行から再送・待機を含めた包括的な上限です。後者を超えると送達失敗(TimeoutException)として確定します。
NicheeLab編集部
データエンジニアリング・クラウド資格の専門家。Databricks・Snowflake等の認定資格を保有し、実務経験に基づいた問題作成・解説を行っています。NicheeLab運営。
Kafka Topic と Partition の基礎: 分散とスケーラビリティの要
CCDAK 対策と実務の両立を意識し、Topic/Partition/Replica/Consumer Group の役...
CCDAK 試験ガイド:出題範囲・配点・申込み・対策
Confluent Certified Developer for Apache Kafka (CCDAK) の出題範囲...
Confluent Certified Administrator (CCAAK) 対策: 出題範囲・配点の考え方・運用観点の要点
CCAAKに向けて、試験領域の押さえどころを運用目線で整理。プロダクションで通用する設定・監視・セキュリティの実践知を、...
Kafka の Replica と In-Sync Replicas を正しく設計する: 耐障害性と一貫性
レプリカとISRの仕組みを起点に、acks と min.insync.replicas、クリーン/アンクリーンリーダー選...
Kafka の Offset とコミット: ポジション管理と at-least-once の基礎
CCDAK 対策と実務の両立を意識して、Kafka コンシューマのオフセット管理とコミット戦略を整理。at-least-...