Confluent Replicatorは、Kafka Connect上で動作する商用のレプリケーションコネクタです。データセンター間DRや段階的移行に強く、安定した運用とサポートを求める現場で使われます。
本稿は、公式ドキュメントの挙動に沿って、設計・設定・運用の実務要点をまとめ、CCAAK(Confluent Certified Administrator for Apache Kafka)で問われやすい論点も押さえます。
Replicatorは、ソースKafkaクラスターからデスティネーションクラスターへトピックを継続的に複製します。Kafka Connectのソースコネクタとして実装されており、スケールアウト、フォールトトレランス、再平衡といったConnectの枠組みをそのまま活用できます。
代表的な用途は、DRサイトへの一方向レプリケーション、データセンター統合時の段階的移行、分析系とプロダクション系の分離(読み取り負荷の隔離)です。スキーマやACLなどの周辺要素は別途ツールの役割分担があるため、データ本体の安定レプリケーションに集中させる設計が無難です。
| ユースケース | 狙い | 補助ツール・留意点 |
|---|---|---|
| DR/BCP | 遠隔地に低RPOでコピー | ネットワーク帯域と遅延、再同期の計画 |
| 段階的移行 | 新旧クラスターの並行稼働 | topic.rename.formatで安全に切替 |
| 分析系分離 | 本番負荷の遮断 | コンシューマグループの設計と遅延監視 |
Connector作成時のREST呼び出し(雰囲気)
POST /connectors
Content-Type: application/json
{
"name": "replicator-orders",
"config": {
"connector.class": "io.confluent.connect.replicator.ReplicatorSourceConnector",
"tasks.max": "4",
"src.kafka.bootstrap.servers": "src1:9092,src2:9092",
"dest.kafka.bootstrap.servers": "dst1:9092,dst2:9092",
"topic.regex": "^(orders|payments).*",
"topic.rename.format": "${topic}.dr",
"key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"provenance.header.enable": "true"
}
}Replicatorは、Kafka Connectワーカー上で動作し、ソースに対してはコンシューマ、デスティネーションに対してはプロデューサとして振る舞います。ネットワークは通常、Replicatorノードから両クラスターへ到達可能である必要があります。
運用上は、デスティネーションクラスター側にConnectクラスタを置く構成が一般的です。ソースからの送出規制が厳しい環境でも、Replicatorがソースに読み取り接続できれば一方向コピーを実現できます。
| パターン | 配置 | メリット/注意 |
|---|---|---|
| デスティネーション隣接 | Connectをデスティ側に設置 | 書き込み遅延を最小化しやすい |
| ソース隣接 | Connectをソース側に設置 | 読み取り遅延を最小化/送信路の制約に注意 |
| 中立ゾーン | 専用サブネットに設置 | 運用分離/往復レイテンシの影響に配慮 |
Replicatorの論理構成
Connectワーカーの基本設定(抜粋)
# worker.properties(抜粋)
bootstrap.servers=dst1:9092,dst2:9092
config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-status
config.storage.replication.factor=3
offset.storage.replication.factor=3
status.storage.replication.factor=3
# 冪等プロデューサで重複を最小化(ソース→デスティ間の書き込み側)
producer.enable.idempotence=true
# セキュリティ例(ワーカー→デスティ)
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="connect" password="secret";トピック選択は正規表現を使い、意図しない新規トピックの流入を防ぐために、命名規約を前提にしたパターンを明示します。段階的移行ではtopic.rename.formatで複製先トピック名を変え、コンシューマの切替を計画的に行います。
コンバータはByteArrayConverterを推奨します。ソースとデスティネーションの間でスキーマ変換を行わないことで、データそのものの忠実なコピーを担保できます。
| 設定キー | 目的 | 推奨・注意 |
|---|---|---|
| topic.regex / topic.whitelist | 複製対象の選択 | 明確な命名規約と併用 |
| topic.rename.format | トピック名の付け替え | 段階的移行や併走期間に有効 |
| provenance.header.enable | 出所メタ情報の付与 | トラブルシュートに有効 |
Replicatorコネクタ設定(セキュリティ込みの例)
{
"name": "replicator-secure",
"config": {
"connector.class": "io.confluent.connect.replicator.ReplicatorSourceConnector",
"tasks.max": "6",
"src.kafka.bootstrap.servers": "src-a:9093,src-b:9093",
"src.kafka.security.protocol": "SASL_SSL",
"src.kafka.sasl.mechanism": "PLAIN",
"src.kafka.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username='src-user' password='src-pass';",
"dest.kafka.bootstrap.servers": "dst-a:9093,dst-b:9093",
"dest.kafka.security.protocol": "SASL_SSL",
"dest.kafka.sasl.mechanism": "PLAIN",
"dest.kafka.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username='dst-user' password='dst-pass';",
"topic.regex": "^(orders_|inventory_).*
NicheeLab を読み込み中…
quot;, "topic.rename.format": "${topic}.dr", "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter", "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter", "provenance.header.enable": "true" } }
順序性はKafkaのパーティション内で保証されます。Replicatorも同様に、同一キーを同一パーティションにルーティングするため、ソースと同数パーティションを基本に設計します。
重複は障害時のリトライで発生し得ます。idempotent producerを有効化し、送信側のretriesやdelivery.timeout.msを適切に設定することで、実務上の重複を最小化できます。
| 設計レバー | 影響 | 目安・備考 |
|---|---|---|
| tasks.max | 平行度/スループット | パーティション総数≒上限の出発点 |
| linger/batch.size | 送信効率/遅延 | 遅延と帯域のトレードオフ |
| replication.factor(先) | 耐障害性 | DR先は3以上が無難 |
送信側プロデューサ調整(コネクタの追加プロパティ例)
{
"producer.linger.ms": "20",
"producer.batch.size": "131072",
"producer.compression.type": "lz4",
"producer.enable.idempotence": "true",
"producer.max.in.flight.requests.per.connection": "5"
}ソース/デスティネーションともにSASL/SSLでの相互認証を基本とし、最小権限のサービスアカウントを用意します。Replicatorはソース側で読み取り、デスティ側で書き込みの権限が必要です。
運用監視は、Connectのタスク状態、ラグ(ソースとデスティの時刻・オフセット差異)、エラーレート、再平衡頻度を押さえます。Confluent Control CenterやJMXメトリクスで可視化できます。
| 対象 | 推奨設定 | 観測メトリクス例 |
|---|---|---|
| 認証/暗号化 | SASL_SSL + 最小権限ACL | 認証失敗率、TLSハンドシェイク失敗 |
| 可観測性 | Control Center/JMX導入 | task-running/failed、lag、error-rate |
| 運用プロセス | Runbook/演習 | フェイルオーバー所要時間、RPO実測 |
ACL付与例(概念的、実環境に合わせて調整)
# ソース側: 読み取り(コンシューマ)
kafka-acls --authorizer-properties zookeeper.connect=zk-src:2181 \
--add --allow-principal User:replicator \
--operation Read --topic 'orders_*' --group replicator-orders
# デスティ側: 書き込みとトピック作成
auth-cli-or-kafka-acls --add --allow-principal User:replicator \
--operation Write --operation Create --topic 'orders_*.dr'Kafkaのレプリケーション手段は複数あります。Apache標準のMirrorMaker 2(MM2)はオープンソースで幅広く使われ、ConfluentのCluster Linkingはブローカー間で直接ミラーを形成する方式です。ReplicatorはConnectベースの枯れた運用と商用サポートを重視するケースに適します。
試験では、要件に応じた選定理由(ネットワーク制約、両端のバージョン/ディストリビューション混在、移行の段階的切替、運用ガバナンス)を言語化できることがポイントになります。
| 選択肢 | 長所 | 留意点 |
|---|---|---|
| Confluent Replicator | Connect基盤で運用容易、商用サポート、トピック名リネームが簡単 | Connectクラスタの運用が前提、処理はアプリ層(ブローカー直結ではない) |
| MirrorMaker 2 (OSS) | OSS標準、チェックポイントでグループ移行に対応 | 設計自由度が高い分、運用/監視の作り込みが必要 |
| Cluster Linking (Confluent) | ブローカー間直結、低オーバーヘッド、ミラートピック | 両端環境要件の整合とネットワーク要件に注意 |
Replicator停止・切替の最小Runbook(例)
# 1) 対象トピックのレプリケーションラグを監視し0近傍を確認
# 2) コンシューマを一時停止、最終バッチを待機
# 3) Replicatorを一時停止(/connectors/{name}/pause)
# 4) デスティ側の新トピック(例: *.dr)へコンシューマの接続先を切替
# 5) 段階的に旧系を廃止(監視を併走)CCAAK
問題 1
地理的に離れた2つのKafkaクラスター間で段階的にトピックを移行したい。新旧のコンシューマを安全に併走させ、切替時の混乱を避けたい。以下の選択肢のうち、最も適切な設計はどれか。
正解: A
段階的移行と併走運用には、複製先で別名トピックを作成し、コンシューマの接続先を段階切替するのが安全。Replicatorのtopic.rename.formatが適します。__consumer_offsetsの直接コピーは推奨されず、Cluster Linkingの双方向同時書き込みは衝突や設計複雑化の恐れがあります。一括停止移行はダウンタイムが大きい。
ReplicatorはSchema Registryのスキーマも複製しますか?
いいえ。ReplicatorはKafkaトピックのデータ複製に特化します。スキーマはSchema Registryの機能(例: エクスポート/インポートやSchema Linking)を利用するのが公式の推奨です。
コンシューマグループのオフセットはどう移行しますか?
Replicator自体は__consumer_offsetsを複製しません。段階的移行では新トピック名への切替を行い、必要に応じてチェックポイント/オフセット翻訳ツールを使います。最新の手順はConfluent公式ドキュメントの移行ガイドに従ってください。
Exactly-onceは担保されますか?
Kafkaの冪等プロデューサにより重複は最小化可能ですが、障害時を含むエンドツーエンドの完全なExactly-onceを常に保証するものではありません。少なくともパーティション内順序を維持し、重複を受け入れる設計(冪等なシンク等)を前提にするのが現実的です。
NicheeLab編集部
データエンジニアリング・クラウド資格の専門家。Databricks・Snowflake等の認定資格を保有し、実務経験に基づいた問題作成・解説を行っています。NicheeLab運営。
Kafka Topic と Partition の基礎: 分散とスケーラビリティの要
CCDAK 対策と実務の両立を意識し、Topic/Partition/Replica/Consumer Group の役...
CCDAK 試験ガイド:出題範囲・配点・申込み・対策
Confluent Certified Developer for Apache Kafka (CCDAK) の出題範囲...
Confluent Certified Administrator (CCAAK) 対策: 出題範囲・配点の考え方・運用観点の要点
CCAAKに向けて、試験領域の押さえどころを運用目線で整理。プロダクションで通用する設定・監視・セキュリティの実践知を、...
Kafka の Replica と In-Sync Replicas を正しく設計する: 耐障害性と一貫性
レプリカとISRの仕組みを起点に、acks と min.insync.replicas、クリーン/アンクリーンリーダー選...
Kafka の Offset とコミット: ポジション管理と at-least-once の基礎
CCDAK 対策と実務の両立を意識して、Kafka コンシューマのオフセット管理とコミット戦略を整理。at-least-...