Kafka

Kafka Connect REST API: Connector Management and CI/CD Integration

2026-04-19
NicheeLab Editorial Team

Kafka Connect runs connectors on distributed workers and exposes a REST API as its control plane. Its real strength is that you can drive it declaratively from CI/CD.

Based on the stable features documented in the official docs, this article walks through how to create, update, and stop connectors, monitor status, manage offsets, and use the validation API in practice.

Connect REST API Overview and Deployment Prerequisites

The Kafka Connect REST API listens on every worker and lets you manage connectors declaratively through endpoints such as /connectors and /connector-plugins. In distributed mode, connector configs, tasks, and status are stored in internal topics, so state survives worker failures and stays in Kafka.

For CI/CD integration, use POST to create connectors and PUT for declarative (idempotent) configuration updates. An update restarts the connector's tasks, so changes are not applied with zero downtime, but the disruption is usually limited to a short rebalance.

  • Typical port: 8083 (configurable via listeners). It is standard to protect it with TLS and/or a reverse proxy.
  • Assumes distributed mode: provision config.storage.topic, offset.storage.topic, and status.storage.topic appropriately.
  • Key points for the job and the exam: POST to create, PUT to update, GET /status for state, and POST /tasks/{id}/restart to restart a single task.
EndpointPrimary purposeNotes
/connectorsList or create connectorsPOST is for creation only; an existing name typically returns 409
/connectors/{name}Existence check or deletionDELETE also stops the connector's tasks
/connectors/{name}/configDeclarative update (idempotent)Tasks are restarted whenever the config changes
/connectors/{name}/statusConnector and task stateEssential for diagnosing FAILED tasks
/connector-pluginsList of available pluginsUseful as a pre-CI sanity check

Conceptual flow from CI/CD to the Connect REST API

PUT/POST/GETGit (configs)CI pipelineKafka Connect REST8083Internal Topicsconfig/offset/statusConnector TasksExternal Source/SinkApache Kafka

Basic listing and existence checks (examples)

# コネクタ一覧
curl -s http://connect.example.com:8083/connectors | jq .

# 個別ステータス
curl -s http://connect.example.com:8083/connectors/my-sink/status | jq .

# プラグイン一覧
curl -s http://connect.example.com:8083/connector-plugins | jq '.[] | .class'

Connector Lifecycle: Create, Update, Pause, Restart, Delete

Create a new connector by sending name and config to POST /connectors. If the name already exists, you usually get a 409-class response. Declarative updates go through PUT /connectors/{name}/config, and centering your CI/CD on this PUT call gives you idempotent deployments.

Pause and resume are useful when you need to make safe pipeline changes or perform downstream maintenance. Restart is for failure recovery or applying configuration changes. Delete cleanly removes the connector and its internal state, but offsets are managed separately (covered below).

  • The exam frequently tests the distinction: POST = create, PUT = update (idempotent).
  • Remember that a PUT update normally restarts the connector's tasks, so design for a short blip in throughput.
  • pause/resume halts and resumes data flow. delete removes the configuration, and you must re-register the connector to bring it back.
OperationHTTP / PathEffect and notes
CreatePOST /connectorsname and config are required. Existing names return an error.
Declarative updatePUT /connectors/{name}/configIdempotent. Tasks are restarted.
PausePUT /connectors/{name}/pause (some implementations use POST)Temporarily halts ingestion or delivery.
ResumePUT /connectors/{name}/resume (some implementations use POST)Resumes from a paused state.
RestartPOST /connectors/{name}/restartRestarts the connector and all of its tasks (for failure recovery or applying config changes).
DeleteDELETE /connectors/{name}Deletes the connector definition and its tasks (offsets are managed via a separate API).

Update sequence (simplified)

ClientWorkerPUT /connectors/{name}/configReconfigureRestart tasksRunning

Examples of create and idempotent update

# 新規作成(POST)
curl -s -X POST http://connect:8083/connectors \
  -H 'Content-Type: application/json' \
  -d '{
        "name": "orders-sink",
        "config": {
          "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
          "topics": "orders",
          "connection.url": "jdbc:postgresql://db/orders",
          "auto.create": "true"
        }
      }' | jq .

# 宣言的更新(PUT) - 同じボディをPUTすれば冪等
curl -s -X PUT http://connect:8083/connectors/orders-sink/config \
  -H 'Content-Type: application/json' \
  -d '{
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "topics": "orders",
        "connection.url": "jdbc:postgresql://db/orders",
        "auto.create": "true",
        "insert.mode": "upsert"
      }' | jq .

# 一時停止・再開・再起動
curl -s -X PUT  http://connect:8083/connectors/orders-sink/pause
curl -s -X PUT  http://connect:8083/connectors/orders-sink/resume
curl -s -X POST http://connect:8083/connectors/orders-sink/restart

Offset Management, Task Control, and Error Handling

Offsets are stored in internal topics and can be viewed and reset via REST. Recent versions of Kafka Connect support viewing offsets via GET /connectors/{name}/offsets and resetting them (fully or partially) via DELETE. For partial resets, specify the target partitions (or the source-side partition representation) in the request body.

You can restart a FAILED task individually with POST /connectors/{name}/tasks/{id}/restart. Error handling is controlled by errors.tolerance and Dead Letter Queue (DLQ) settings. Fatal errors mark the task FAILED, so combine status monitoring with these settings to design automated recovery.

  • Resetting offsets triggers re-ingestion or redelivery, so in production explicitly limit the scope.
  • Diagnose the root cause of FAILED tasks via the trace in the status output and the worker's logs (JMX and log files).
  • DLQ settings (such as errors.deadletterqueue.topic.name) behave differently for sink versus source connectors.
ScenarioExample APIImpact
Restart a single taskPOST /connectors/{name}/tasks/{id}/restartThe target task is recreated, with a minor impact on throughput.
View offsetsGET /connectors/{name}/offsetsUsed to understand the current processing position.
Full offset resetDELETE /connectors/{name}/offsetsPotential full reprocessing. Plan carefully before running.
Partial offset resetDELETE /connectors/{name}/offsets (with a body)Reprocesses only the targeted partitions.

Conceptual flow from error to recovery

Task FAILEDStatus checkTask restartIf persistentReview config/DLQFixRestart

Examples for status, offsets, and restart

# ステータス確認
curl -s http://connect:8083/connectors/orders-sink/status | jq .

# タスク0を再起動
curl -s -X POST http://connect:8083/connectors/orders-sink/tasks/0/restart

# オフセット参照
curl -s http://connect:8083/connectors/orders-sink/offsets | jq .

# オフセット部分リセット(例: パーティション指定)
curl -s -X DELETE http://connect:8083/connectors/orders-sink/offsets \
  -H 'Content-Type: application/json' \
  -d '{
        "partitions": [ { "topic": "orders", "partition": 0 } ]
      }'

CI/CD Integration Patterns: Declarative Ops and Secret Separation

Storing connector configs (as JSON) in Git and applying them via PUT from CI — i.e., GitOps — is the most reliable pattern in practice. Bake environment differences in through environment variables or templates, and keep the logical names consistent across environments to make promotion easy.

For secrets, use ConfigProvider (for example ${env:VAR} or ${file:/path:key}) or an external secret management plugin so plaintext never lives in config files. Apply authorization to the REST surface via RBAC or a proxy, and keep audit logs.

  • Centering deployments on declarative PUTs lets you replay the same pipeline across Dev, Stage, and Prod.
  • For Blue/Green cutovers, run a suffixed version (e.g., -v2) in parallel, switch traffic, then stop the old version — that sequence is the safest.
  • ConfigProvider helps keep secrets out of REST request bodies and the config storage topic.
PatternBenefitsCaveats
GitOps (declarative PUT)Idempotent, easy diff review, simple rollbackUpdates restart tasks, so plan a deployment window
Blue/Green connectorsGets you close to a zero-downtime cutoverWatch for double writes and duplicates (especially in sinks)
Using ConfigProviderImproves secret hygiene and config reusabilityRequires the plugin to be deployed and configured

GitOps application (simplified)

GitpushCI/CDPUTConnect REST 8083

Example declarative deployment in GitHub Actions

name: deploy-connector
on: [push]
jobs:
  apply:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Render config
        run: |
          envsubst < config/orders-sink.json > rendered.json
      - name: Upsert via PUT
        run: |
          set -e
          CONNECT_BASE=${{ secrets.CONNECT_URL }}
          curl -sS -X PUT "$CONNECT_BASE/connectors/orders-sink/config" \
            -H 'Content-Type: application/json' \
            --data-binary @rendered.json | jq .
      - name: Verify status
        run: |
          CONNECT_BASE=${{ secrets.CONNECT_URL }}
          for i in {1..20}; do
            st=$(curl -s "$CONNECT_BASE/connectors/orders-sink/status" | jq -r .connector.state)
            [ "$st" = "RUNNING" ] && exit 0
            sleep 3
          done
          echo "Connector not RUNNING" >&2; exit 1

Plugin Discovery and the Config Validation API

Before deploying, check /connector-plugins to confirm that the target class is present. Visibility depends on plugin.path and class loader scope, so bake this check into your CI pre-validation.

Some versions expose a config validation endpoint (e.g., /connector-plugins/{class}/config/validate) that lets you catch missing required fields or type mismatches via REST. The HTTP method varies by implementation and version, so always check the docs for your environment.

  • Always specify connector.class (the fully qualified class name) in config. name is a separate identifier used at creation time.
  • If the validation API is missing or behaves differently in your environment, lean on static validation (schemas) as a backup so you do not have to create dummy connectors.
  • Verify serialization and schema compatibility (especially Schema Registry integration) separately, as preconditions outside the connector itself.
CheckAPI / MethodBehavior on failure
Plugin presenceGET /connector-pluginsIf the target class is missing, creation fails
Config validation/connector-plugins/{class}/config/validateDetects missing required keys and type mismatches up front
Existence checkGET /connectors/{name}404 means create path; 200 means update path

Flow: validate → apply

List pluginsValidate configPUT /configStatus check

Plugin check and validation (example; watch for method differences across environments)

# プラグイン一覧
curl -s http://connect:8083/connector-plugins | jq '.[] | .class'

# 構成検証(例: PUT/POSTは環境に従う)
CLASS=io.confluent.connect.jdbc.JdbcSinkConnector
curl -s -X PUT http://connect:8083/connector-plugins/$CLASS/config/validate \
  -H 'Content-Type: application/json' \
  -d '{
        "name": "validate-only",
        "config": {
          "connector.class": "'$CLASS'",
          "topics": "orders"
        }
      }' | jq .

Operational Monitoring and Security Considerations

Combine the RUNNING/FAILED state from /status with the worker's JMX metrics (throughput, error rate, running task count, etc.) to assess health. Use a lightweight GET /connectors or a specific connector's /status for health checks and wire them into your post-deploy validation in CI/CD.

Protect the REST surface with TLS termination plus authentication and authorization. For self-managed Kafka Connect, the typical setup is to put a reverse proxy in front of the HTTP listener and apply Basic auth, mTLS, and IP restrictions. On Confluent Platform you can also use RBAC for fine-grained control.

  • During rolling updates, watch for temporary latency caused by task and worker rebalancing.
  • Implement retries (exponential backoff) for API failures. Handle 409 and 409-equivalent responses consistently.
  • Never expose REST publicly. Enforce least privilege, audit logging, and certificate rotation.
TargetMethodKey metrics / targets
Task healthGET /connectors/{name}/status + JMXRUNNING ratio, FAILED count = 0
Latency and throughputJMX (Connector and Task metrics)records-consumed/produced-rate
API availabilityHealth check (GET /connectors)p95 response time, error rate

Monitoring surface (conceptual)

Status API -> Health
JMX -> Metrics
Logs -> Root cause

Simple health check and secure invocation examples

# 200かつJSONなら疎通OKとみなす
curl -fsS https://connect.example.com:8443/connectors \
  --cacert /etc/ssl/certs/ca.pem \
  -u ci-user:${CONNECT_REST_PASSWORD} | jq '.' >/dev/null

# 指数バックオフでステータス確認
for i in 1 2 3 5 8 13; do
  state=$(curl -s https://connect.example.com:8443/connectors/orders-sink/status \
    --cacert /etc/ssl/certs/ca.pem -u ci-user:${CONNECT_REST_PASSWORD} | jq -r .connector.state || echo "")
  [ "$state" = "RUNNING" ] && break
  sleep $i
done

Check Your Understanding

CCDAK / CCAAK

問題 1

You want to safely re-apply (idempotently) an existing Kafka Connect connector configuration from CI/CD. Which operation is most appropriate?

  1. Send the full configuration to PUT /connectors/{name}/config
  2. POST /connectors again with the same name to recreate it
  3. Run only POST /connectors/{name}/restart
  4. DELETE /connectors/{name} and then POST /connectors to recreate it

正解: A

Declarative idempotent updates go through PUT /connectors/{name}/config. POST is for creation and tends to fail on an existing name, while restart does not change the configuration. Delete-then-recreate adds unnecessary downtime and risk.

Frequently Asked Questions

Can connector updates be performed without downtime?

PUT updates normally trigger a task restart, so fully zero-downtime updates are hard to achieve. The common approach is to minimize impact with a Blue/Green deployment (run a new connector in parallel, then cut over) or a planned pause/resume window.

What is the difference between pause/resume and restart?

pause/resume temporarily halts and resumes data processing without changing the configuration. restart restarts the connector or its tasks and is used for failure recovery or applying configuration changes.

How do you handle connector offsets during a cluster migration?

Offsets live in internal topics, so they do not migrate automatically to a new cluster. The safest approach is to use the REST offset export/reset endpoints (or related tools) to move and reconfigure only the partitions you need, as part of a planned migration.

Check what you learned with practice questions

Practice with certification-focused question sets

無料で問題を解いてみる
Author

NicheeLab Editorial Team

NicheeLab editorial team focused on data engineering and cloud certification learning. Content is structured around practical study needs and official exam domains.


Related articles
Kafka

Kafka Topics & Partitions: Distribution Fundamentals (2026)

How Kafka topics and partitions enable scale — ordering guar...

Kafka

CCDAK Exam Guide: Confluent Certified Developer (2026)

Complete prep for the CCDAK exam — Producer/Consumer API, St...

Kafka

CCAAK Exam Guide: Confluent Certified Administrator (2026)

Pass the CCAAK exam — cluster management, partitions, securi...

Kafka

Kafka Replicas & ISR: Fault Tolerance Explained (2026)

Replica placement, in-sync replicas (ISR), leader election. ...

Kafka

Kafka Offsets: Commit Modes & Consumer Position (2026)

Offset semantics — auto vs. manual commit, __consumer_offset...

Browse all Kafka articles (101)
© 2026 NicheeLab All rights reserved.