Kafka Connect runs connectors on distributed workers and exposes a REST API as its control plane. Its real strength is that you can drive it declaratively from CI/CD.
Based on the stable features documented in the official docs, this article walks through how to create, update, and stop connectors, monitor status, manage offsets, and use the validation API in practice.
The Kafka Connect REST API listens on every worker and lets you manage connectors declaratively through endpoints such as /connectors and /connector-plugins. In distributed mode, connector configs, tasks, and status are stored in internal topics, so state survives worker failures and stays in Kafka.
For CI/CD integration, use POST to create connectors and PUT for declarative (idempotent) configuration updates. An update restarts the connector's tasks, so changes are not applied with zero downtime, but the disruption is usually limited to a short rebalance.
| Endpoint | Primary purpose | Notes |
|---|---|---|
| /connectors | List or create connectors | POST is for creation only; an existing name typically returns 409 |
| /connectors/{name} | Existence check or deletion | DELETE also stops the connector's tasks |
| /connectors/{name}/config | Declarative update (idempotent) | Tasks are restarted whenever the config changes |
| /connectors/{name}/status | Connector and task state | Essential for diagnosing FAILED tasks |
| /connector-plugins | List of available plugins | Useful as a pre-CI sanity check |
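As a rough illustration of how the endpoints in the table map onto HTTP calls, the following Python sketch builds (but does not send) requests using only the standard library. The helper name `connect_request`, the worker address, and the connector name `orders-sink` are assumptions made for this example; they are not part of Kafka Connect itself.

```python
import json
import urllib.request


def connect_request(base_url, method, path, body=None):
    """Build an HTTP request for a Connect worker without sending it."""
    data = None
    headers = {"Accept": "application/json"}
    if body is not None:
        # Connect expects JSON request bodies.
        data = json.dumps(body).encode("utf-8")
        headers["Content-Type"] = "application/json"
    url = base_url.rstrip("/") + path
    return urllib.request.Request(url, data=data, headers=headers, method=method)


# Hypothetical worker address and connector name.
BASE = "http://connect.example.com:8083"
list_req = connect_request(BASE, "GET", "/connectors")
update_req = connect_request(
    BASE,
    "PUT",
    "/connectors/orders-sink/config",
    {"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector", "topics": "orders"},
)
print(list_req.get_method(), list_req.full_url)
print(update_req.get_method(), update_req.full_url)
```

Passing one of these objects to `urllib.request.urlopen` would actually send it; that step is left out so the sketch runs without a cluster.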
Conceptual flow from CI/CD to the Connect REST API
Basic listing and existence checks (examples)
# List connectors
curl -s http://connect.example.com:8083/connectors | jq .
# Status of a single connector
curl -s http://connect.example.com:8083/connectors/my-sink/status | jq .
# List installed plugins
curl -s http://connect.example.com:8083/connector-plugins | jq '.[] | .class'

Create a new connector by sending name and config to POST /connectors. If the name already exists, the worker typically responds with 409 Conflict. Declarative updates go through PUT /connectors/{name}/config, which also creates the connector if it does not exist yet, so centering your CI/CD on this PUT call gives you idempotent deployments.
Pause and resume are useful when you need to make safe pipeline changes or perform downstream maintenance. Restart is mainly for failure recovery: it re-runs the connector with its stored configuration and does not change that configuration. Delete removes the connector and its tasks, but offsets are managed separately (covered below).
| Operation | HTTP / Path | Effect and notes |
|---|---|---|
| Create | POST /connectors | name and config are required. Existing names return an error. |
| Declarative update | PUT /connectors/{name}/config | Idempotent. Tasks are restarted. |
| Pause | PUT /connectors/{name}/pause (some implementations use POST) | Temporarily halts ingestion or delivery. |
| Resume | PUT /connectors/{name}/resume (some implementations use POST) | Resumes from a paused state. |
| Restart | POST /connectors/{name}/restart | Restarts the connector instance (for failure recovery). Since Kafka 3.0, ?includeTasks=true also restarts its tasks, and &onlyFailed=true limits the restart to failed instances. |
| Delete | DELETE /connectors/{name} | Deletes the connector definition and its tasks (offsets are managed via a separate API). |
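Because a config update restarts tasks, a deploy job may want to skip the PUT when nothing has changed. The sketch below shows one way to make that decision. It assumes the stored config comes from GET /connectors/{name}/config, that Connect returns all values as strings, and that the stored config includes a `name` key; the function names are hypothetical.

```python
def config_differs(current, desired):
    """Return True when the desired config differs from the stored one.

    Values are compared as strings and the "name" key is ignored, on the
    assumption that the worker stores string values plus the connector name.
    """
    def normalize(cfg):
        return {k: str(v) for k, v in cfg.items() if k != "name"}

    return normalize(current) != normalize(desired)


def plan_action(exists, current, desired):
    """Pick the action a deploy job would take for one connector."""
    if not exists:
        return "create"  # PUT .../config (or POST /connectors) creates it
    if config_differs(current, desired):
        return "update"  # PUT .../config, tasks are restarted
    return "noop"  # nothing to send


stored = {"name": "orders-sink", "topics": "orders", "tasks.max": "2"}
wanted = {"topics": "orders", "tasks.max": 2}
print(plan_action(True, stored, wanted))
print(plan_action(True, stored, {**wanted, "insert.mode": "upsert"}))
print(plan_action(False, {}, wanted))
```

The three calls print `noop`, `update`, and `create` in that order. Sending the PUT unconditionally remains valid; the diff only avoids a restart that changes nothing.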
Update sequence (simplified)
Examples of create and idempotent update
# Create (POST)
curl -s -X POST http://connect:8083/connectors \
  -H 'Content-Type: application/json' \
  -d '{
    "name": "orders-sink",
    "config": {
      "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
      "topics": "orders",
      "connection.url": "jdbc:postgresql://db/orders",
      "auto.create": "true"
    }
  }' | jq .
# Declarative update (PUT): sending the same body again is idempotent
curl -s -X PUT http://connect:8083/connectors/orders-sink/config \
  -H 'Content-Type: application/json' \
  -d '{
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "topics": "orders",
    "connection.url": "jdbc:postgresql://db/orders",
    "auto.create": "true",
    "insert.mode": "upsert"
  }' | jq .
# Pause, resume, restart
curl -s -X PUT http://connect:8083/connectors/orders-sink/pause
curl -s -X PUT http://connect:8083/connectors/orders-sink/resume
curl -s -X POST http://connect:8083/connectors/orders-sink/restart

Source connector offsets are stored in an internal topic, while sink connectors track their position as consumer group offsets; in recent versions both can be viewed and changed via REST. Apache Kafka 3.5 added GET /connectors/{name}/offsets for viewing, and 3.6 added DELETE (reset all offsets) and PATCH (alter or reset selected partitions) on the same path. Both write operations require the connector to be stopped first with PUT /connectors/{name}/stop. For a partial reset, list the target partitions in the PATCH body with a null offset (source connectors use their own partition representation).
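To make the shape of an offset change request concrete, here is a minimal sketch that builds a body for PATCH /connectors/{name}/offsets. It assumes Apache Kafka 3.6 or later and a sink connector, whose partitions are described with `kafka_topic` and `kafka_partition` and whose offsets use `kafka_offset`; source connectors define their own keys. The helper name is hypothetical.

```python
import json


def sink_offsets_patch(topic, partitions, offset=None):
    """Build a PATCH /connectors/{name}/offsets body for a sink connector.

    offset=None is serialized as JSON null, which asks Connect to reset that
    partition; an integer sets the position explicitly.
    """
    value = None if offset is None else {"kafka_offset": int(offset)}
    return {
        "offsets": [
            {
                "partition": {"kafka_topic": topic, "kafka_partition": p},
                "offset": value,
            }
            for p in partitions
        ]
    }


# Reset partitions 0 and 1 of the "orders" topic.
body = sink_offsets_patch("orders", [0, 1])
print(json.dumps(body, indent=2))
```

The connector has to be in the STOPPED state when the request is sent, so a real script would call the stop endpoint before the PATCH and resume afterwards.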
You can restart a FAILED task individually with POST /connectors/{name}/tasks/{id}/restart. Error handling is controlled by errors.tolerance and Dead Letter Queue (DLQ) settings. Fatal errors mark the task FAILED, so combine status monitoring with these settings to design automated recovery.
| Scenario | Example API | Impact |
|---|---|---|
| Restart a single task | POST /connectors/{name}/tasks/{id}/restart | The target task is recreated, with a minor impact on throughput. |
| View offsets | GET /connectors/{name}/offsets | Used to understand the current processing position. |
| Full offset reset | DELETE /connectors/{name}/offsets | Potential full reprocessing. The connector must be stopped first. Plan carefully before running. |
| Partial offset reset | PATCH /connectors/{name}/offsets (with a body) | Reprocesses only the targeted partitions. The connector must be stopped first. |
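Automated recovery usually starts by finding which tasks are FAILED. The following sketch parses a status document and lists the restart path for each failed task. The sample status is hand-written in the shape of a GET /connectors/{name}/status response, and the function names are hypothetical.

```python
def failed_task_ids(status):
    """Return the ids of tasks reported as FAILED in a /status response."""
    return [t["id"] for t in status.get("tasks", []) if t.get("state") == "FAILED"]


def restart_paths(status):
    """Map each FAILED task to the REST path that restarts only that task."""
    name = status["name"]
    return [f"/connectors/{name}/tasks/{task_id}/restart" for task_id in failed_task_ids(status)]


# Hand-written sample; worker ids and trace text are placeholders.
sample_status = {
    "name": "orders-sink",
    "connector": {"state": "RUNNING", "worker_id": "10.0.0.1:8083"},
    "tasks": [
        {"id": 0, "state": "RUNNING", "worker_id": "10.0.0.1:8083"},
        {"id": 1, "state": "FAILED", "worker_id": "10.0.0.2:8083", "trace": "sample stack trace"},
    ],
}
print(restart_paths(sample_status))
```

Each returned path would be sent as a POST. Note that the connector itself reports RUNNING in this sample even though one task has failed, which is why checking only the connector state is not enough.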
Conceptual flow from error to recovery
Examples for status, offsets, and restart
# Check status
curl -s http://connect:8083/connectors/orders-sink/status | jq .
# Restart task 0
curl -s -X POST http://connect:8083/connectors/orders-sink/tasks/0/restart
# View offsets
curl -s http://connect:8083/connectors/orders-sink/offsets | jq .
# Partial offset reset (Kafka 3.6+): stop the connector, then PATCH a null offset for the target partition
curl -s -X PUT http://connect:8083/connectors/orders-sink/stop
curl -s -X PATCH http://connect:8083/connectors/orders-sink/offsets \
  -H 'Content-Type: application/json' \
  -d '{
    "offsets": [
      { "partition": { "kafka_topic": "orders", "kafka_partition": 0 }, "offset": null }
    ]
  }'
# Resume once the offsets have been changed
curl -s -X PUT http://connect:8083/connectors/orders-sink/resume

Storing connector configs (as JSON) in Git and applying them via PUT from CI (that is, GitOps) is the most reliable pattern in practice. Inject environment differences through environment variables or templates, and keep the logical names consistent across environments to make promotion easy.
For secrets, use ConfigProvider (for example ${env:VAR} or ${file:/path:key}) or an external secret management plugin so plaintext never lives in config files. Apply authorization to the REST surface via RBAC or a proxy, and keep audit logs.
| Pattern | Benefits | Caveats |
|---|---|---|
| GitOps (declarative PUT) | Idempotent, easy diff review, simple rollback | Updates restart tasks, so plan a deployment window |
| Blue/Green connectors | Gets you close to a zero-downtime cutover | Watch for double writes and duplicates (especially in sinks) |
| Using ConfigProvider | Improves secret hygiene and config reusability | Requires the plugin to be deployed and configured |
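One way to enforce the "no plaintext secrets" rule in CI is a small lint step over each config before it is applied. The sketch below flags secret-looking keys whose values are not ConfigProvider references such as `${env:VAR}` or `${file:/path:key}`. The list of key hints is a heuristic assumed for this example, not something Kafka Connect defines.

```python
import re

# A value such as ${env:DB_PASSWORD} or ${file:/path:key} is a provider reference.
PROVIDER_REF = re.compile(r"^\$\{[^:}]+:[^}]+\}$")
# Heuristic (assumption): keys containing these words are treated as secrets.
SECRET_HINTS = ("password", "secret", "token")


def plaintext_secret_keys(config):
    """Return secret-looking keys whose values are not ConfigProvider references."""
    flagged = []
    for key, value in config.items():
        looks_secret = any(hint in key.lower() for hint in SECRET_HINTS)
        if looks_secret and not PROVIDER_REF.match(str(value)):
            flagged.append(key)
    return sorted(flagged)


# Hypothetical config: one secret is externalized, one is inlined by mistake.
config = {
    "connection.url": "jdbc:postgresql://db/orders",
    "connection.password": "${file:/opt/secrets/db.properties:password}",
    "api.token": "abc123",
}
print(plaintext_secret_keys(config))
```

Here the check prints `['api.token']`. A CI job could fail the build whenever the returned list is non-empty.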
GitOps application (simplified)
Example declarative deployment in GitHub Actions
name: deploy-connector
on: [push]
jobs:
  apply:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Render config
        run: |
          envsubst < config/orders-sink.json > rendered.json
      - name: Upsert via PUT
        run: |
          # pipefail makes a curl failure fail the step even though jq runs last
          set -euo pipefail
          CONNECT_BASE=${{ secrets.CONNECT_URL }}
          # -f makes curl exit non-zero on HTTP 4xx/5xx responses
          curl -fsS -X PUT "$CONNECT_BASE/connectors/orders-sink/config" \
            -H 'Content-Type: application/json' \
            --data-binary @rendered.json | jq .
      - name: Verify status
        run: |
          CONNECT_BASE=${{ secrets.CONNECT_URL }}
          # Poll up to 20 times, 3 seconds apart, until the connector is RUNNING
          for i in {1..20}; do
            st=$(curl -s "$CONNECT_BASE/connectors/orders-sink/status" | jq -r .connector.state)
            [ "$st" = "RUNNING" ] && exit 0
            sleep 3
          done
          echo "Connector not RUNNING" >&2; exit 1

Before deploying, check /connector-plugins to confirm that the target class is present. Visibility depends on plugin.path and class loader scope, so bake this check into your CI pre-validation.
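Kafka Connect also exposes a config validation endpoint, /connector-plugins/{class}/config/validate, that lets you catch missing required fields or type mismatches via REST without creating anything. In Apache Kafka the method is PUT and the request body is the flat config map (the same shape as PUT /connectors/{name}/config), including connector.class. Managed or vendor distributions may differ, so always check the docs for your environment.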
| Check | API / Method | Behavior on failure |
|---|---|---|
| Plugin presence | GET /connector-plugins | If the target class is missing, creation fails |
| Config validation | PUT /connector-plugins/{class}/config/validate | Detects missing required keys and type mismatches up front |
| Existence check | GET /connectors/{name} | 404 means create path; 200 means update path |
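The checks in the table can be turned into small pre-deploy gates. The sketch below inspects a plugin list and a validation result; both inputs are hand-written samples shaped like the corresponding REST responses (a list of objects with a `class` field, and a `configs` array whose entries carry a `value.errors` list). The function names are hypothetical.

```python
def plugin_available(plugins, connector_class):
    """Check a GET /connector-plugins response for the target class."""
    return any(p.get("class") == connector_class for p in plugins)


def validation_errors(result):
    """Collect per-field error messages from a config validation response."""
    errors = {}
    for entry in result.get("configs", []):
        value = entry.get("value", {})
        if value.get("errors"):
            errors[value["name"]] = list(value["errors"])
    return errors


CLASS = "io.confluent.connect.jdbc.JdbcSinkConnector"
# Hand-written samples, not captured from a real worker.
plugins = [{"class": CLASS, "type": "sink"}]
result = {
    "name": CLASS,
    "error_count": 1,
    "configs": [
        {"value": {"name": "topics", "value": "orders", "errors": []}},
        {"value": {"name": "connection.url", "value": None,
                   "errors": ["Missing required configuration (sample message)"]}},
    ],
}
print(plugin_available(plugins, CLASS))
print(validation_errors(result))
```

A CI step could stop before the PUT when the plugin is missing or when the error dictionary is non-empty.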
Flow: validate → apply
Plugin check and validation (example; watch for method differences across environments)
# List installed plugins
curl -s http://connect:8083/connector-plugins | jq '.[] | .class'
# Validate a config (Apache Kafka uses PUT with the flat config map; check your environment)
CLASS=io.confluent.connect.jdbc.JdbcSinkConnector
curl -s -X PUT "http://connect:8083/connector-plugins/$CLASS/config/validate" \
  -H 'Content-Type: application/json' \
  -d '{
    "name": "validate-only",
    "connector.class": "'"$CLASS"'",
    "topics": "orders"
  }' | jq .

Combine the RUNNING/FAILED state from /status with the worker's JMX metrics (throughput, error rate, running task count, etc.) to assess health. Use a lightweight GET /connectors or a specific connector's /status for health checks and wire them into your post-deploy validation in CI/CD.
Protect the REST surface with TLS termination plus authentication and authorization. For self-managed Kafka Connect, the typical setup is to put a reverse proxy in front of the HTTP listener and apply Basic auth, mTLS, and IP restrictions. On Confluent Platform you can also use RBAC for fine-grained control.
| Target | Method | Key metrics / targets |
|---|---|---|
| Task health | GET /connectors/{name}/status + JMX | RUNNING ratio, FAILED count = 0 |
| Latency and throughput | JMX (Connector and Task metrics) | Record rates such as sink-record-read-rate and source-record-poll-rate |
| API availability | Health check (GET /connectors) | p95 response time, error rate |
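The "RUNNING ratio, FAILED count = 0" target from the table can be computed directly from a status response. This is a minimal sketch with a hand-written sample; treating a connector that has no tasks as unhealthy is an assumption of the example, and the function name is hypothetical.

```python
def health_summary(status):
    """Summarize a /status response as running ratio and failed-task count."""
    tasks = status.get("tasks", [])
    running = sum(1 for t in tasks if t.get("state") == "RUNNING")
    failed = sum(1 for t in tasks if t.get("state") == "FAILED")
    connector_ok = status.get("connector", {}).get("state") == "RUNNING"
    # Assumption: a connector with no tasks counts as unhealthy (ratio 0.0).
    ratio = running / len(tasks) if tasks else 0.0
    return {
        "running_ratio": ratio,
        "failed_tasks": failed,
        "healthy": connector_ok and failed == 0 and ratio == 1.0,
    }


# Hand-written sample: three tasks, one of them FAILED.
degraded = {
    "name": "orders-sink",
    "connector": {"state": "RUNNING"},
    "tasks": [
        {"id": 0, "state": "RUNNING"},
        {"id": 1, "state": "RUNNING"},
        {"id": 2, "state": "FAILED"},
    ],
}
print(health_summary(degraded))
```

The summary only covers the Status API side; throughput and error-rate thresholds would still come from JMX.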
Monitoring surface (conceptual)
Status API -> Health
JMX -> Metrics
Logs -> Root cause

Simple health check and secure invocation examples
# Treat HTTP 200 with a JSON body as reachable
curl -fsS https://connect.example.com:8443/connectors \
  --cacert /etc/ssl/certs/ca.pem \
  -u "ci-user:${CONNECT_REST_PASSWORD}" | jq '.' >/dev/null
# Poll the status with increasing (Fibonacci-style) backoff
for i in 1 2 3 5 8 13; do
  state=$(curl -s https://connect.example.com:8443/connectors/orders-sink/status \
    --cacert /etc/ssl/certs/ca.pem -u "ci-user:${CONNECT_REST_PASSWORD}" | jq -r .connector.state || echo "")
  [ "$state" = "RUNNING" ] && break
  sleep $i
done

CCDAK / CCAAK
Question 1
You want to safely re-apply (idempotently) an existing Kafka Connect connector configuration from CI/CD. Which operation is most appropriate?
Correct answer: A
Declarative idempotent updates go through PUT /connectors/{name}/config. POST is for creation and tends to fail on an existing name, while restart does not change the configuration. Delete-then-recreate adds unnecessary downtime and risk.
Can connector updates be performed without downtime?
PUT updates normally trigger a task restart, so fully zero-downtime updates are hard to achieve. The common approach is to minimize impact with a Blue/Green deployment (run a new connector in parallel, then cut over) or a planned pause/resume window.
What is the difference between pause/resume and restart?
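pause/resume temporarily halts and resumes data processing without changing the configuration. restart recreates the connector instance (and, where requested, its tasks) with the same configuration and is used mainly for failure recovery; configuration changes themselves are applied with PUT /connectors/{name}/config.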
How do you handle connector offsets during a cluster migration?
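Source connector offsets live in an internal topic of the Connect cluster, so they do not migrate automatically to a new cluster. The safest approach is a planned migration: read the offsets with GET /connectors/{name}/offsets on the old cluster, then use the offset alter/reset endpoints (or related tools) on the new cluster to set only the partitions you need while the connector is stopped.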
NicheeLab Editorial Team
NicheeLab editorial team focused on data engineering and cloud certification learning. Content is structured around practical study needs and official exam domains.