Kafka

Kafka Producer API 基礎: 送信フローと主要設定の意味

2026-04-19
NicheeLab編集部

Producer はアプリスレッドでレコードを直列化し、パーティショナーで宛先を決め、RecordAccumulator に蓄えて Sender スレッドがバッチ化してブローカーへ送ります。この一連の流れと各設定の役割を理解すると、試験対策にも現場のチューニングにも強くなれます。

本稿は公式ドキュメントに沿った安定概念を中心にまとめ、バージョン差分に依存しやすい既定値には踏み込みすぎず、効果とトレードオフに焦点を当てます。

送信フローの全体像

Producer の主要パスは「直列化 → パーティショニング → 蓄積(RecordAccumulator) → 送信(Sender スレッド) → 応答(ACK) → コールバック」です。アプリ側の send は多くの場合ノンブロッキングで、実送信はバックグラウンドで進みます。

バッチ単位はトピックパーティションごとに作られ、batch.size と linger.ms によりサイズ駆動・時間駆動のどちらでもフラッシュされます。圧縮はバッチ単位で効くため、バッチングと compression.type はセットで考えるとスループット効率が上がります。

応答は acks 設定に従って返り、失敗時は retries と backoff、そして delivery.timeout.ms までの間で再送されます。順序性の維持は max.in.flight.requests.per.connection と idempotence の有無に強く影響されます。

  • シリアライザ: key.serializer と value.serializer でバイト列へ変換
  • パーティショナー: キーに基づき宛先パーティションを決定(キー無しは負荷分散)
  • RecordAccumulator: 同一トピックパーティションのレコードをまとめてバッチ化
  • Sender スレッド: バッチを Produce リクエストに載せてブローカーへ送信
  • ACK とコールバック: 成功/失敗を onCompletion に通知

Producer 送信パス(概念)

sendkey/hashtpflushcallbackACKISR acks (acks=all)App Threadyour codeSerializerkey/valuePartitionerkey->partitionAccumulatorper topic-pFuture/CallbackSender Threadbuild batches, sendBroker LeaderReplicas

最小サンプル(同期送信で挙動を可視化)

Properties p = new Properties();
p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer.class.getName());
p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer.class.getName());
try (KafkaProducer<String, String> producer = new KafkaProducer<>(p)) {
    ProducerRecord<String, String> rec = new ProducerRecord<>("orders", "order-1", "payload");
    RecordMetadata md = producer.send(rec).get(); // 確実に送れたかを同期確認(学習用途)
    System.out.printf("topic=%s partition=%d offset=%d%n", md.topic(), md.partition(), md.offset());
}

バッチングとスループット調整

Producer はトピックパーティション単位でバッチを構築します。batch.size は1バッチの上限(バイト)で、linger.ms は十分なレコードが溜まらない場合に待機してバッチを太らせます。高スループットを狙うなら、ある程度の linger.ms と適切な batch.size を組み合わせ、compression.type を有効にするのが定石です。

ただし待機時間を増やすとエンドツーエンドの遅延は伸びます。リアルタイム性が重要なワークロードでは linger.ms を小さくし、レートが低いときのみ控えめに待機するなど、SLA と回線/CPU コストのバランスを取りましょう。

  • batch.size は「1パーティションあたり」なので、パーティション数が多いとメモリ消費も増える点に注意
  • compression.type は gzip/snappy/lz4/zstd など。CPU と圧縮率のトレードオフを理解して選択
  • buffer.memory は送信待ち全体の上限。満杯時は send が最大 max.block.ms までブロックし、超過で TimeoutException
  • small messages + linger.ms 数ms + 圧縮 でネットワーク効率を大幅に改善できるケースが多い

典型的なバッチ向け設定例(抜粋)

Properties p = new Properties();
p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.ByteArraySerializer.class.getName());
p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.ByteArraySerializer.class.getName());
// バッチを太らせる
p.put(ProducerConfig.BATCH_SIZE_CONFIG, 64 * 1024);       // 64 KiB 程度から検証
p.put(ProducerConfig.LINGER_MS_CONFIG, 5);                // 5ms 程度からSLAに合わせて調整
p.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");    // 低レイテンシと圧縮率のバランス
// バッファ管理
p.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 64L * 1024 * 1024); // 総バッファ上限
p.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 30_000);             // send が待てる最大時間

信頼性と再送の制御(acks, retries, delivery.timeout.ms)

acks は応答条件を決めます。acks=0 は送信即完了、acks=1 はリーダー書き込みまで、acks=all はISRの追随を待ちます。耐久性を重視するなら acks=all とし、ブローカー/トピック側の min.insync.replicas と組み合わせるのが定石です(ISR が閾値未満だとエラー応答)。

retries は失敗時の再送回数、retry.backoff.ms は再送の待機。delivery.timeout.ms は「最初の送信試行から成功/失敗が確定するまでの上限」で、linger や再送時間も含む包括的な上限です。request.timeout.ms は単一リクエストの待機上限で、delivery.timeout.ms はそれらの合算上限と捉えると整理できます。

max.in.flight.requests.per.connection が大きいと、再送が絡むと順序が入れ替わることがあります。順序重視なら idempotence を有効化するか、保守的に max.in.flight を小さく設定します。

  • 高耐久: acks=all + 適切な min.insync.replicas(ブローカー/トピック側)
  • 高スループット: acks=1 かつレプリカ数と可用性要件を考慮
  • delivery.timeout.ms までに成功しなければ TimeoutException を返す(再送を含む上限)
  • 順序性の担保: enable.idempotence=true を使う。未使用時は max.in.flight=1 で順序維持(スループット低下)
acksブローカー応答条件耐久性の目安レイテンシ/スループット傾向
0送信直後に成功扱い(応答待ちなし)低(ブローカー未到達でも成功と見なす)最小レイテンシ・最大スループット
1リーダーへの書き込み成功で応答中(リーダー障害時は喪失の可能性)低レイテンシ・高スループット
allISR 全員(min.insync.replicas を満たす)からの確認で応答高(ISR が十分な場合)やや高レイテンシ・中〜高スループット(バッチと圧縮で緩和可)

失敗時のリトライとタイムアウト(要点)

Properties p = new Properties();
// 信頼性重視
p.put(ProducerConfig.ACKS_CONFIG, "all");
// 十分な再送回数(具体値はSLAと失敗パターンで設計)
p.put(ProducerConfig.RETRIES_CONFIG, 100);
p.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 200); // ブローカー回復を待つ間隔
// 包括的な送達上限(linger や再送を含む)
p.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120_000);
// リクエスト単体の待機上限(小さすぎると誤検知が増える)
p.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30_000);
// 順序性の確保(idempotence 未使用なら1を推奨)
p.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);

パーティショニングとキー設計

キーが同じレコードは同じパーティションに入り、順序が保たれます。キー未指定の場合は実装上のデフォルトパーティショナーがラウンドロビン/スティッキーな戦略で分散します。高カーディナリティのキーは偏りにくい一方、ホットキーがあると一部パーティションに負荷が集中します。

ログコンパクションを使う場合、キーは最新値で上書きされる単位になります。順序性・圧縮効率・ホットパーティション回避を総合してキーを設計します。必要に応じてカスタムパーティショナーでビジネス上の分散規則を実装します。

  • 順序保証が必要な単位で key を付ける(例: customerId, orderId)
  • ホットパーティション対策: キーにサフィックスを足して擬似シャーディング、あるいはレート制御
  • カスタムパーティショナーはスループットに影響するため計測しながら導入

カスタムパーティショナーの骨子(Java)

public class ModPartitioner implements org.apache.kafka.clients.producer.Partitioner {
  @Override
  public void configure(java.util.Map<String, ?> configs) {}

  @Override
  public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, org.apache.kafka.common.Cluster cluster) {
    int partitions = cluster.partitionCountForTopic(topic);
    if (keyBytes == null || partitions <= 0) return 0;
    // 例: 数値IDを想定し軽いモジュロ分散
    int k = java.nio.ByteBuffer.wrap(keyBytes).getInt();
    return Math.floorMod(k, partitions);
  }

  @Override
  public void close() {}
}
// 使用時: ProducerConfig.PARTITIONER_CLASS_CONFIG にクラス名を設定

Idempotence とトランザクション(Exactly-Once の要点)

enable.idempotence=true は重複排除を可能にし、再送で二重書き込みにならないようにします。Producer は内部的にプロデューサIDとシーケンス番号を用い、ブローカー側で重複を無視できます。これにより acks=all、適切な max.in.flight(一般に5以下)など互換条件が満たされます。

トランザクションは複数トピック/パーティションにまたがる一貫性を提供します。transactional.id を安定させ、initTransactions → beginTransaction → send... → commitTransaction(失敗時は abort)という流れです。消費側で read_committed を指定すればコミット済みのみを可視化できます。

  • idempotence: 再送時の重複排除と順序の強化(実装とバージョンにより上限あり)
  • transactions: 複数パーティションに跨る原子性。重いが一貫性が必要なケースに有効
  • 同一 transactional.id の多重使用はフェンシングされる(片方が強制失効)

Transactional Producer の最小パターン(Java)

Properties p = new Properties();
p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer.class.getName());
p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer.class.getName());
p.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
p.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "orders-tx-01");
try (KafkaProducer<String, String> producer = new KafkaProducer<>(p)) {
  producer.initTransactions();
  try {
    producer.beginTransaction();
    producer.send(new ProducerRecord<>("orders", "k1", "v1"));
    producer.send(new ProducerRecord<>("audit", "k1", "v1"));
    producer.commitTransaction();
  } catch (Exception e) {
    producer.abortTransaction();
    throw e;
  }
}

実装例と運用チェックリスト

実務ではメトリクス監視とバックプレッシャの扱いが重要です。record-error-rate、request-latency-avg、outgoing-byte-rate、bufferpool-wait-time-total などの指標をダッシュボード化し、配信エラーとタイムアウトの傾向を早期に掴みます。

障害注入(ブローカー停止、ネットワーク遅延、ISR 減少)で acks、retries、delivery.timeout.ms、idempotence の組み合わせが期待通りに効くかを検証してから本番適用すると安全です。

  • メトリクス: record-error-rate が上昇したらまずブローカー応答とリトライ設定を点検
  • バッファ枯渇: buffer.memory と max.block.ms、送信レートの整合を確認
  • 順序必須パス: enable.idempotence=true とし、max.in.flight を保守的に
  • SLA 切り分け: request.timeout.ms と delivery.timeout.ms の意図を混同しない

実務向け Producer 実装例(コールバック付き)

Properties p = new Properties();
p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer.class.getName());
p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer.class.getName());
// 信頼性と順序のバランス
p.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
p.put(ProducerConfig.ACKS_CONFIG, "all");
p.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120_000);
p.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30_000);
p.put(ProducerConfig.LINGER_MS_CONFIG, 5);
p.put(ProducerConfig.BATCH_SIZE_CONFIG, 64 * 1024);
p.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");

try (KafkaProducer<String, String> producer = new KafkaProducer<>(p)) {
  Callback cb = (md, ex) -> {
    if (ex != null) {
      System.err.println("send failed: " + ex.getClass().getSimpleName() + " - " + ex.getMessage());
      // ログ/メトリクス連携やDLQ送出などをここで行う
    } else {
      System.out.printf("ok %s-%d@%d%n", md.topic(), md.partition(), md.offset());
    }
  };
  for (int i = 0; i < 1000; i++) {
    ProducerRecord<String, String> rec = new ProducerRecord<>("orders", "cust-" + (i % 10), "payload-" + i);
    producer.send(rec, cb);
  }
  producer.flush();
}

問題で確認

CCDAK

問題 1

要件: レプリカ追随を待ってから成功とし、失敗時は自動再送。再送に伴う重複を避け、各キー内の順序を維持したい。Producer 側の設定として最も適切なのはどれか。

  1. acks=all, enable.idempotence=true, max.in.flight.requests.per.connection=5 以下
  2. acks=1, retries=0, max.in.flight.requests.per.connection=1
  3. acks=0, linger.ms=50, compression.type=zstd
  4. acks=all, retries=0, delivery.timeout.ms を大きくする

正解: A

耐久性は acks=all が前提。再送に伴う重複排除と順序維持には enable.idempotence=true が最も効果的で、互換の max.in.flight 設定(一般に5以下)が求められます。B は耐久性/再送なし、C は耐久性要件未満、D は再送しないため delivery.timeout.ms を伸ばしても要件を満たしません。

よくある質問

acks=all は全レプリカからの応答を待つのですか?

全レプリカではなく ISR(In-Sync Replica)集合の応答を待ちます。min.insync.replicas を満たせない場合はエラー応答となり、Producer は例外を受け取ります。

linger.ms を大きくすると何が起こりますか?

低負荷時に待機してバッチを太らせ、圧縮効率とスループットが上がる一方、エンドツーエンドの遅延が増えます。SLA とコストに応じて数ms〜数十msの範囲で計測しながら調整します。

delivery.timeout.ms と request.timeout.ms の違いは?

request.timeout.ms は単一リクエストの待機上限、delivery.timeout.ms は最初の送信試行から再送・待機を含めた包括的な上限です。後者を超えると送達失敗(TimeoutException)として確定します。

この記事で学んだ内容を問題で確認しましょう

16,000問以上の問題で実力チェック

無料で問題を解いてみる
この記事の著者

NicheeLab編集部

データエンジニアリング・クラウド資格の専門家。Databricks・Snowflake等の認定資格を保有し、実務経験に基づいた問題作成・解説を行っています。NicheeLab運営。


関連記事
Kafka

Kafka Topic と Partition の基礎: 分散とスケーラビリティの要

CCDAK 対策と実務の両立を意識し、Topic/Partition/Replica/Consumer Group の役...

Kafka

CCDAK 試験ガイド:出題範囲・配点・申込み・対策

Confluent Certified Developer for Apache Kafka (CCDAK) の出題範囲...

Kafka

Confluent Certified Administrator (CCAAK) 対策: 出題範囲・配点の考え方・運用観点の要点

CCAAKに向けて、試験領域の押さえどころを運用目線で整理。プロダクションで通用する設定・監視・セキュリティの実践知を、...

Kafka

Kafka の Replica と In-Sync Replicas を正しく設計する: 耐障害性と一貫性

レプリカとISRの仕組みを起点に、acks と min.insync.replicas、クリーン/アンクリーンリーダー選...

Kafka

Kafka の Offset とコミット: ポジション管理と at-least-once の基礎

CCDAK 対策と実務の両立を意識して、Kafka コンシューマのオフセット管理とコミット戦略を整理。at-least-...

Kafkaの記事一覧 (101件)
© 2026 NicheeLab All rights reserved.