Kafka

Kafka Connect REST APIで実現するコネクタ管理とCI/CD連携

2026-04-19
NicheeLab編集部

Kafka Connectは分散ワーカー上でコネクタを実行し、その制御面(API)としてRESTを提供します。CI/CDから宣言的に操作できる点が強みです。

本稿は公式ドキュメントの安定機能に基づき、コネクタ作成・更新・停止、ステータス監視、オフセット操作、検証APIの使い分けを具体的に解説します。

Connect REST APIの全体像とデプロイ前提

Kafka ConnectのREST APIは各ワーカーでリッスンし、/connectorsや/connector-plugins等のエンドポイントを介してコネクタを宣言的に管理します。分散モードではコネクタ設定・タスク・ステータスが内部トピックに保存されるため、ワーカー障害時も状態はKafkaに残ります。

CI/CD連携では、POSTで新規作成、PUTで構成の宣言的更新(基本的に冪等)を行います。更新により該当コネクタのタスクは再起動されるため、変更が無停止で反映されるわけではありませんが、通常は短時間の再均衡で収まります。

  • 典型ポート: 8083 (listenersで変更可)。TLS/リバースプロキシでの保護が一般的。
  • 分散モード前提: config.storage.topic / offset.storage.topic / status.storage.topic を適切にプロビジョニング。
  • 実務/試験の要点: 作成はPOST、更新はPUT、状態はGET /status、タスク単位の再起動はPOST /tasks/{id}/restart。
エンドポイント主な目的注意点
/connectors一覧取得・新規作成POSTは新規のみ(既存名では409が一般的)
/connectors/{name}存在確認・削除DELETEは内在タスクも停止
/connectors/{name}/config宣言的更新(冪等)変更時はタスク再起動が発生
/connectors/{name}/statusコネクタ/タスク状態FAILEDタスクの原因把握に必須
/connector-plugins利用可能プラグイン一覧CI前の前提確認に有用

CI/CDからConnect RESTへのフロー(概念図)

PUT/POST/GETGit (configs)CI pipelineKafka Connect REST8083Internal Topicsconfig/offset/statusConnector TasksExternal Source/SinkApache Kafka

基本の一覧・存在確認(例)

# コネクタ一覧
curl -s http://connect.example.com:8083/connectors | jq .

# 個別ステータス
curl -s http://connect.example.com:8083/connectors/my-sink/status | jq .

# プラグイン一覧
curl -s http://connect.example.com:8083/connector-plugins | jq '.[] | .class'

コネクタのライフサイクル: 作成・更新・一時停止・再起動・削除

新規作成はPOST /connectorsにnameとconfigを送ります。すでに同名が存在する場合は409系応答が一般的です。宣言的更新はPUT /connectors/{name}/configで行い、CI/CDではこのPUTを中心に据えることで冪等なデプロイが可能になります。

一時停止/再開はパイプラインの安全な変更や下流メンテ時に有用です。再起動は失敗回復や構成変更反映に使用します。削除は内部状態も含めてクリーンに落としますが、オフセットは別管理(後述)です。

  • 試験では「POST=新規、PUT=更新(冪等)」の区別が頻出。
  • PUT更新は通常、該当コネクタのタスク再起動を伴う点に注意(短時間のブリップを許容する設計に)。
  • pause/resumeはデータフローの一時停止/再開。deleteは構成を削除し、復旧には再登録が必要。
操作HTTP/パス効果・備考
新規作成POST /connectorsnameとconfig必須。既存名はエラー。
宣言的更新PUT /connectors/{name}/config冪等。タスクは再起動される。
一時停止PUT /connectors/{name}/pause (実装によりPOSTの場合あり)取り込み/配送を一時停止。
再開PUT /connectors/{name}/resume (実装によりPOSTの場合あり)一時停止からの再開。
再起動POST /connectors/{name}/restartコネクタと全タスク再起動(失敗回復・構成反映)。
削除DELETE /connectors/{name}コネクタ定義とタスクを削除(オフセットは別API)。

更新時のシーケンス(簡略)

ClientWorkerPUT /connectors/{name}/configReconfigureRestart tasksRunning

作成と冪等更新の例

# 新規作成(POST)
curl -s -X POST http://connect:8083/connectors \
  -H 'Content-Type: application/json' \
  -d '{
        "name": "orders-sink",
        "config": {
          "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
          "topics": "orders",
          "connection.url": "jdbc:postgresql://db/orders",
          "auto.create": "true"
        }
      }' | jq .

# 宣言的更新(PUT) - 同じボディをPUTすれば冪等
curl -s -X PUT http://connect:8083/connectors/orders-sink/config \
  -H 'Content-Type: application/json' \
  -d '{
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "topics": "orders",
        "connection.url": "jdbc:postgresql://db/orders",
        "auto.create": "true",
        "insert.mode": "upsert"
      }' | jq .

# 一時停止・再開・再起動
curl -s -X PUT  http://connect:8083/connectors/orders-sink/pause
curl -s -X PUT  http://connect:8083/connectors/orders-sink/resume
curl -s -X POST http://connect:8083/connectors/orders-sink/restart

オフセット管理・タスク制御・エラー対処

オフセットは内部トピックに保持され、RESTから参照・リセットが可能です。近年のKafka Connectでは/connectors/{name}/offsetsで参照、DELETEでリセット(一部/全体)がサポートされます。部分リセットは対象パーティション(もしくはソース側パーティション表現)をボディで指定します。

FAILEDタスクはPOST /connectors/{name}/tasks/{id}/restartで個別再起動できます。エラー処理はerrors.toleranceやDead Letter Queue(DLQ)関連設定で制御し、致命的エラーはFAILEDとなるため、ステータス監視と組み合わせて自動回復を設計します。

  • オフセットのリセットは再取り込み・再配送を引き起こすため、本番では明示的に範囲を限定。
  • FAILEDの原因はstatus出力のtraceやワーカーのログ(JMX/ログ)で確認。
  • DLQ設定(例: errors.deadletterqueue.topic.name)はシンク/ソースで挙動差あり。
場面API例影響
タスクのみ再起動POST /connectors/{name}/tasks/{id}/restart該当タスクが再生成。スループットに軽微な影響。
オフセット参照GET /connectors/{name}/offsets現行処理位置の把握に使用。
オフセット全体リセットDELETE /connectors/{name}/offsets全再処理の可能性。計画的に実施。
オフセット部分リセットDELETE /connectors/{name}/offsets (ボディ指定)対象限定の再処理。

エラー発生〜回復の流れ(概念)

Task FAILEDStatus checkTask restartIf persistentReview config/DLQFixRestart

ステータス/オフセット/再起動の例

# ステータス確認
curl -s http://connect:8083/connectors/orders-sink/status | jq .

# タスク0を再起動
curl -s -X POST http://connect:8083/connectors/orders-sink/tasks/0/restart

# オフセット参照
curl -s http://connect:8083/connectors/orders-sink/offsets | jq .

# オフセット部分リセット(例: パーティション指定)
curl -s -X DELETE http://connect:8083/connectors/orders-sink/offsets \
  -H 'Content-Type: application/json' \
  -d '{
        "partitions": [ { "topic": "orders", "partition": 0 } ]
      }'

CI/CD統合パターン: 宣言的運用とシークレット分離

Gitにコネクタ設定(JSON)を格納し、CIからPUTで適用するGitOpsが実務上安定です。環境差分は環境変数やテンプレートで埋め込み、同一の論理名を保つことでプロモーションを容易にします。

シークレットはConfigProvider(例: ${env:VAR}, ${file:/path:key} 等)や外部秘密管理プラグインを利用し、設定ファイルに平文を残さない方針が基本です。RBACやプロキシでREST面の認可を掛け、監査ログを残します。

  • PUT中心の宣言的適用により、同じパイプラインをDev/Stage/Prodで再現可能。
  • Blue/Green切替は-suffix(v2等)の並行稼働→切替→旧停止が安全。
  • ConfigProviderを使うとRESTボディや構成トピックにも秘匿値を露出しにくい。
パターン利点注意点
GitOps(宣言的PUT)冪等・差分管理・ロールバック容易更新でタスク再起動。ウィンドウ設計が必要
Blue/Greenコネクタ無停止切替に近づける二重書き込み/重複に留意(特にシンク)
ConfigProvider活用秘匿性・構成の再利用性向上プラグイン配備/設定が前提

GitOps適用(簡略)

GitpushCI/CDPUTConnect REST 8083

GitHub Actionsでの宣言的デプロイ例

name: deploy-connector
on: [push]
jobs:
  apply:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Render config
        run: |
          envsubst < config/orders-sink.json > rendered.json
      - name: Upsert via PUT
        run: |
          set -e
          CONNECT_BASE=${{ secrets.CONNECT_URL }}
          curl -sS -X PUT "$CONNECT_BASE/connectors/orders-sink/config" \
            -H 'Content-Type: application/json' \
            --data-binary @rendered.json | jq .
      - name: Verify status
        run: |
          CONNECT_BASE=${{ secrets.CONNECT_URL }}
          for i in {1..20}; do
            st=$(curl -s "$CONNECT_BASE/connectors/orders-sink/status" | jq -r .connector.state)
            [ "$st" = "RUNNING" ] && exit 0
            sleep 3
          done
          echo "Connector not RUNNING" >&2; exit 1

プラグイン検出と構成検証APIの活用

デプロイ前に/connector-pluginsで対象クラスが存在するかを確認します。plugin.pathの設定やクラスローダーの範囲により見え方が変わるため、CIの事前検証に組み込みます。

一部のバージョンでは構成検証エンドポイント(例: /connector-plugins/{class}/config/validate)が提供され、要求項目の欠落や型不一致をRESTで確認できます。HTTPメソッドは実装・バージョンで差異があるため、環境のドキュメントを必ず確認してください。

  • configにはconnector.class(完全修飾クラス名)を必ず指定。nameは作成時の識別子で別物。
  • 検証APIがない/差異がある環境では、ダミー作成を避けるために静的バリデーション(スキーマ化)を補助的に利用。
  • シリアライズやスキーマ互換性(特にSchema Registry連携)はコネクタ外側の前提として別途検証。
確認項目API/手段失敗時の挙動
プラグイン有無GET /connector-plugins対象クラスが無ければ作成時に失敗
構成検証/connector-plugins/{class}/config/validate必須キー欠落・型不一致を事前検出
存在確認GET /connectors/{name}404なら新規、200なら更新パスへ

検証→適用の流れ

List pluginsValidate configPUT /configStatus check

プラグイン確認と検証(例。環境によってメソッド差異に注意)

# プラグイン一覧
curl -s http://connect:8083/connector-plugins | jq '.[] | .class'

# 構成検証(例: PUT/POSTは環境に従う)
CLASS=io.confluent.connect.jdbc.JdbcSinkConnector
curl -s -X PUT http://connect:8083/connector-plugins/$CLASS/config/validate \
  -H 'Content-Type: application/json' \
  -d '{
        "name": "validate-only",
        "config": {
          "connector.class": "'$CLASS'",
          "topics": "orders"
        }
      }' | jq .

運用監視とセキュリティ留意点

稼働確認は/statusのRUNNING/FAILEDと、ワーカーのJMXメトリクス(スループット、エラー率、タスク稼働数等)を併用します。ヘルスチェック用に軽量なGET /connectorsや特定コネクタの/statusを利用し、CI/CD後の検証ステップに組み込みます。

REST面はTLS終端と認証・認可で保護します。自己管理のKafka ConnectではHTTPの前段にリバースプロキシを置き、Basic認証やmTLS・IP制限を適用する構成が一般的です。Confluent Platform環境ではRBACを使った粒度の細かい制御も可能です。

  • ロールリング更新時はタスク・ワーカーの再均衡に伴う一時的な遅延を監視。
  • APIは失敗時に再試行(指数バックオフ)を実装。409/409相当は整合的ハンドリング。
  • RESTはパブリックに晒さない。最小権限・監査ログ・証明書ローテーションを徹底。
監視対象方法代表指標/目安
タスク健全性GET /connectors/{name}/status + JMXRUNNING比率、FAILED数=0
遅延・処理量JMX(Connector/Taskメトリクス)records-consumed/produced-rate
API可用性ヘルスチェック(GET /connectors)p95応答時間、エラー率

監視の面(概念)

Status API -> Health
JMX -> Metrics
Logs -> Root cause

簡易ヘルスチェックとセキュアな呼び出し例

# 200かつJSONなら疎通OKとみなす
curl -fsS https://connect.example.com:8443/connectors \
  --cacert /etc/ssl/certs/ca.pem \
  -u ci-user:${CONNECT_REST_PASSWORD} | jq '.' >/dev/null

# 指数バックオフでステータス確認
for i in 1 2 3 5 8 13; do
  state=$(curl -s https://connect.example.com:8443/connectors/orders-sink/status \
    --cacert /etc/ssl/certs/ca.pem -u ci-user:${CONNECT_REST_PASSWORD} | jq -r .connector.state || echo "")
  [ "$state" = "RUNNING" ] && break
  sleep $i
done

問題で確認

CCDAK / CCAAK

問題 1

CI/CDから既存のKafka Connectコネクタ設定を安全に再適用(冪等)したい。最も適切な操作はどれか。

  1. PUT /connectors/{name}/config に完全な設定を送る
  2. POST /connectors に同じnameで再度作成を試みる
  3. POST /connectors/{name}/restart だけを実行する
  4. DELETE /connectors/{name} の後にPOST /connectors で作り直す

正解: A

宣言的な冪等更新はPUT /connectors/{name}/configで行います。POSTは新規作成向けで既存名ではエラーになりやすく、restartは構成を変更しません。削除・再作成は不要なダウンタイムとリスクを増やします。

よくある質問

更新時の無停止化は可能ですか?

PUT更新は通常タスク再起動を伴います。完全無停止は困難なため、Blue/Green(新コネクタを並行稼働→切替)や一時停止/再開の計画実施で影響を最小化します。

pause/resumeとrestartの違いは?

pause/resumeはデータ処理の一時停止/再開で構成は変わりません。restartはコネクタ/タスクを再起動し、失敗回復や構成変更の反映に使います。

クラスタ移行時にコネクタのオフセットはどう扱いますか?

内部トピックに保存されるため、そのままでは新クラスタに移りません。RESTのオフセットエクスポート/リセット機能やツールを用いて必要なパーティションのみを計画的に移行・再設定するのが安全です。

この記事で学んだ内容を問題で確認しましょう

16,000問以上の問題で実力チェック

無料で問題を解いてみる
この記事の著者

NicheeLab編集部

データエンジニアリング・クラウド資格の専門家。Databricks・Snowflake等の認定資格を保有し、実務経験に基づいた問題作成・解説を行っています。NicheeLab運営。


関連記事
Kafka

Kafka Topic と Partition の基礎: 分散とスケーラビリティの要

CCDAK 対策と実務の両立を意識し、Topic/Partition/Replica/Consumer Group の役...

Kafka

CCDAK 試験ガイド:出題範囲・配点・申込み・対策

Confluent Certified Developer for Apache Kafka (CCDAK) の出題範囲...

Kafka

Confluent Certified Administrator (CCAAK) 対策: 出題範囲・配点の考え方・運用観点の要点

CCAAKに向けて、試験領域の押さえどころを運用目線で整理。プロダクションで通用する設定・監視・セキュリティの実践知を、...

Kafka

Kafka の Replica と In-Sync Replicas を正しく設計する: 耐障害性と一貫性

レプリカとISRの仕組みを起点に、acks と min.insync.replicas、クリーン/アンクリーンリーダー選...

Kafka

Kafka の Offset とコミット: ポジション管理と at-least-once の基礎

CCDAK 対策と実務の両立を意識して、Kafka コンシューマのオフセット管理とコミット戦略を整理。at-least-...

Kafkaの記事一覧 (101件)
© 2026 NicheeLab All rights reserved.