Kafka Connectは分散ワーカー上でコネクタを実行し、その制御面(API)としてRESTを提供します。CI/CDから宣言的に操作できる点が強みです。
本稿は公式ドキュメントの安定機能に基づき、コネクタ作成・更新・停止、ステータス監視、オフセット操作、検証APIの使い分けを具体的に解説します。
Kafka ConnectのREST APIは各ワーカーでリッスンし、/connectorsや/connector-plugins等のエンドポイントを介してコネクタを宣言的に管理します。分散モードではコネクタ設定・タスク・ステータスが内部トピックに保存されるため、ワーカー障害時も状態はKafkaに残ります。
CI/CD連携では、POSTで新規作成、PUTで構成の宣言的更新(基本的に冪等)を行います。更新により該当コネクタのタスクは再起動されるため、変更が無停止で反映されるわけではありませんが、通常は短時間の再均衡で収まります。
| エンドポイント | 主な目的 | 注意点 |
|---|---|---|
| /connectors | 一覧取得・新規作成 | POSTは新規のみ(既存名では409が一般的) |
| /connectors/{name} | 存在確認・削除 | DELETEは内在タスクも停止 |
| /connectors/{name}/config | 宣言的更新(冪等) | 変更時はタスク再起動が発生 |
| /connectors/{name}/status | コネクタ/タスク状態 | FAILEDタスクの原因把握に必須 |
| /connector-plugins | 利用可能プラグイン一覧 | CI前の前提確認に有用 |
CI/CDからConnect RESTへのフロー(概念図)
基本の一覧・存在確認(例)
# コネクタ一覧
curl -s http://connect.example.com:8083/connectors | jq .
# 個別ステータス
curl -s http://connect.example.com:8083/connectors/my-sink/status | jq .
# プラグイン一覧
curl -s http://connect.example.com:8083/connector-plugins | jq '.[] | .class'新規作成はPOST /connectorsにnameとconfigを送ります。すでに同名が存在する場合は409系応答が一般的です。宣言的更新はPUT /connectors/{name}/configで行い、CI/CDではこのPUTを中心に据えることで冪等なデプロイが可能になります。
一時停止/再開はパイプラインの安全な変更や下流メンテ時に有用です。再起動は失敗回復や構成変更反映に使用します。削除は内部状態も含めてクリーンに落としますが、オフセットは別管理(後述)です。
| 操作 | HTTP/パス | 効果・備考 |
|---|---|---|
| 新規作成 | POST /connectors | nameとconfig必須。既存名はエラー。 |
| 宣言的更新 | PUT /connectors/{name}/config | 冪等。タスクは再起動される。 |
| 一時停止 | PUT /connectors/{name}/pause (実装によりPOSTの場合あり) | 取り込み/配送を一時停止。 |
| 再開 | PUT /connectors/{name}/resume (実装によりPOSTの場合あり) | 一時停止からの再開。 |
| 再起動 | POST /connectors/{name}/restart | コネクタと全タスク再起動(失敗回復・構成反映)。 |
| 削除 | DELETE /connectors/{name} | コネクタ定義とタスクを削除(オフセットは別API)。 |
更新時のシーケンス(簡略)
作成と冪等更新の例
# 新規作成(POST)
curl -s -X POST http://connect:8083/connectors \
-H 'Content-Type: application/json' \
-d '{
"name": "orders-sink",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"topics": "orders",
"connection.url": "jdbc:postgresql://db/orders",
"auto.create": "true"
}
}' | jq .
# 宣言的更新(PUT) - 同じボディをPUTすれば冪等
curl -s -X PUT http://connect:8083/connectors/orders-sink/config \
-H 'Content-Type: application/json' \
-d '{
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"topics": "orders",
"connection.url": "jdbc:postgresql://db/orders",
"auto.create": "true",
"insert.mode": "upsert"
}' | jq .
# 一時停止・再開・再起動
curl -s -X PUT http://connect:8083/connectors/orders-sink/pause
curl -s -X PUT http://connect:8083/connectors/orders-sink/resume
curl -s -X POST http://connect:8083/connectors/orders-sink/restartオフセットは内部トピックに保持され、RESTから参照・リセットが可能です。近年のKafka Connectでは/connectors/{name}/offsetsで参照、DELETEでリセット(一部/全体)がサポートされます。部分リセットは対象パーティション(もしくはソース側パーティション表現)をボディで指定します。
FAILEDタスクはPOST /connectors/{name}/tasks/{id}/restartで個別再起動できます。エラー処理はerrors.toleranceやDead Letter Queue(DLQ)関連設定で制御し、致命的エラーはFAILEDとなるため、ステータス監視と組み合わせて自動回復を設計します。
| 場面 | API例 | 影響 |
|---|---|---|
| タスクのみ再起動 | POST /connectors/{name}/tasks/{id}/restart | 該当タスクが再生成。スループットに軽微な影響。 |
| オフセット参照 | GET /connectors/{name}/offsets | 現行処理位置の把握に使用。 |
| オフセット全体リセット | DELETE /connectors/{name}/offsets | 全再処理の可能性。計画的に実施。 |
| オフセット部分リセット | DELETE /connectors/{name}/offsets (ボディ指定) | 対象限定の再処理。 |
エラー発生〜回復の流れ(概念)
ステータス/オフセット/再起動の例
# ステータス確認
curl -s http://connect:8083/connectors/orders-sink/status | jq .
# タスク0を再起動
curl -s -X POST http://connect:8083/connectors/orders-sink/tasks/0/restart
# オフセット参照
curl -s http://connect:8083/connectors/orders-sink/offsets | jq .
# オフセット部分リセット(例: パーティション指定)
curl -s -X DELETE http://connect:8083/connectors/orders-sink/offsets \
-H 'Content-Type: application/json' \
-d '{
"partitions": [ { "topic": "orders", "partition": 0 } ]
}'Gitにコネクタ設定(JSON)を格納し、CIからPUTで適用するGitOpsが実務上安定です。環境差分は環境変数やテンプレートで埋め込み、同一の論理名を保つことでプロモーションを容易にします。
シークレットはConfigProvider(例: ${env:VAR}, ${file:/path:key} 等)や外部秘密管理プラグインを利用し、設定ファイルに平文を残さない方針が基本です。RBACやプロキシでREST面の認可を掛け、監査ログを残します。
| パターン | 利点 | 注意点 |
|---|---|---|
| GitOps(宣言的PUT) | 冪等・差分管理・ロールバック容易 | 更新でタスク再起動。ウィンドウ設計が必要 |
| Blue/Greenコネクタ | 無停止切替に近づける | 二重書き込み/重複に留意(特にシンク) |
| ConfigProvider活用 | 秘匿性・構成の再利用性向上 | プラグイン配備/設定が前提 |
GitOps適用(簡略)
GitHub Actionsでの宣言的デプロイ例
name: deploy-connector
on: [push]
jobs:
apply:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Render config
run: |
envsubst < config/orders-sink.json > rendered.json
- name: Upsert via PUT
run: |
set -e
CONNECT_BASE=${{ secrets.CONNECT_URL }}
curl -sS -X PUT "$CONNECT_BASE/connectors/orders-sink/config" \
-H 'Content-Type: application/json' \
--data-binary @rendered.json | jq .
- name: Verify status
run: |
CONNECT_BASE=${{ secrets.CONNECT_URL }}
for i in {1..20}; do
st=$(curl -s "$CONNECT_BASE/connectors/orders-sink/status" | jq -r .connector.state)
[ "$st" = "RUNNING" ] && exit 0
sleep 3
done
echo "Connector not RUNNING" >&2; exit 1デプロイ前に/connector-pluginsで対象クラスが存在するかを確認します。plugin.pathの設定やクラスローダーの範囲により見え方が変わるため、CIの事前検証に組み込みます。
一部のバージョンでは構成検証エンドポイント(例: /connector-plugins/{class}/config/validate)が提供され、要求項目の欠落や型不一致をRESTで確認できます。HTTPメソッドは実装・バージョンで差異があるため、環境のドキュメントを必ず確認してください。
| 確認項目 | API/手段 | 失敗時の挙動 |
|---|---|---|
| プラグイン有無 | GET /connector-plugins | 対象クラスが無ければ作成時に失敗 |
| 構成検証 | /connector-plugins/{class}/config/validate | 必須キー欠落・型不一致を事前検出 |
| 存在確認 | GET /connectors/{name} | 404なら新規、200なら更新パスへ |
検証→適用の流れ
プラグイン確認と検証(例。環境によってメソッド差異に注意)
# プラグイン一覧
curl -s http://connect:8083/connector-plugins | jq '.[] | .class'
# 構成検証(例: PUT/POSTは環境に従う)
CLASS=io.confluent.connect.jdbc.JdbcSinkConnector
curl -s -X PUT http://connect:8083/connector-plugins/$CLASS/config/validate \
-H 'Content-Type: application/json' \
-d '{
"name": "validate-only",
"config": {
"connector.class": "'$CLASS'",
"topics": "orders"
}
}' | jq .稼働確認は/statusのRUNNING/FAILEDと、ワーカーのJMXメトリクス(スループット、エラー率、タスク稼働数等)を併用します。ヘルスチェック用に軽量なGET /connectorsや特定コネクタの/statusを利用し、CI/CD後の検証ステップに組み込みます。
REST面はTLS終端と認証・認可で保護します。自己管理のKafka ConnectではHTTPの前段にリバースプロキシを置き、Basic認証やmTLS・IP制限を適用する構成が一般的です。Confluent Platform環境ではRBACを使った粒度の細かい制御も可能です。
| 監視対象 | 方法 | 代表指標/目安 |
|---|---|---|
| タスク健全性 | GET /connectors/{name}/status + JMX | RUNNING比率、FAILED数=0 |
| 遅延・処理量 | JMX(Connector/Taskメトリクス) | records-consumed/produced-rate |
| API可用性 | ヘルスチェック(GET /connectors) | p95応答時間、エラー率 |
監視の面(概念)
Status API -> Health
JMX -> Metrics
Logs -> Root cause簡易ヘルスチェックとセキュアな呼び出し例
# 200かつJSONなら疎通OKとみなす
curl -fsS https://connect.example.com:8443/connectors \
--cacert /etc/ssl/certs/ca.pem \
-u ci-user:${CONNECT_REST_PASSWORD} | jq '.' >/dev/null
# 指数バックオフでステータス確認
for i in 1 2 3 5 8 13; do
state=$(curl -s https://connect.example.com:8443/connectors/orders-sink/status \
--cacert /etc/ssl/certs/ca.pem -u ci-user:${CONNECT_REST_PASSWORD} | jq -r .connector.state || echo "")
[ "$state" = "RUNNING" ] && break
sleep $i
doneCCDAK / CCAAK
問題 1
CI/CDから既存のKafka Connectコネクタ設定を安全に再適用(冪等)したい。最も適切な操作はどれか。
正解: A
宣言的な冪等更新はPUT /connectors/{name}/configで行います。POSTは新規作成向けで既存名ではエラーになりやすく、restartは構成を変更しません。削除・再作成は不要なダウンタイムとリスクを増やします。
更新時の無停止化は可能ですか?
PUT更新は通常タスク再起動を伴います。完全無停止は困難なため、Blue/Green(新コネクタを並行稼働→切替)や一時停止/再開の計画実施で影響を最小化します。
pause/resumeとrestartの違いは?
pause/resumeはデータ処理の一時停止/再開で構成は変わりません。restartはコネクタ/タスクを再起動し、失敗回復や構成変更の反映に使います。
クラスタ移行時にコネクタのオフセットはどう扱いますか?
内部トピックに保存されるため、そのままでは新クラスタに移りません。RESTのオフセットエクスポート/リセット機能やツールを用いて必要なパーティションのみを計画的に移行・再設定するのが安全です。
NicheeLab編集部
データエンジニアリング・クラウド資格の専門家。Databricks・Snowflake等の認定資格を保有し、実務経験に基づいた問題作成・解説を行っています。NicheeLab運営。
Kafka Topic と Partition の基礎: 分散とスケーラビリティの要
CCDAK 対策と実務の両立を意識し、Topic/Partition/Replica/Consumer Group の役...
CCDAK 試験ガイド:出題範囲・配点・申込み・対策
Confluent Certified Developer for Apache Kafka (CCDAK) の出題範囲...
Confluent Certified Administrator (CCAAK) 対策: 出題範囲・配点の考え方・運用観点の要点
CCAAKに向けて、試験領域の押さえどころを運用目線で整理。プロダクションで通用する設定・監視・セキュリティの実践知を、...
Kafka の Replica と In-Sync Replicas を正しく設計する: 耐障害性と一貫性
レプリカとISRの仕組みを起点に、acks と min.insync.replicas、クリーン/アンクリーンリーダー選...
Kafka の Offset とコミット: ポジション管理と at-least-once の基礎
CCDAK 対策と実務の両立を意識して、Kafka コンシューマのオフセット管理とコミット戦略を整理。at-least-...