Kafka

Kafka Connect Single Message Transformsを使いこなす: 軽量変換でパイプラインの柔軟性を高める

2026-04-19
NicheeLab編集部

Single Message Transforms(SMT)は、Kafka Connectのレコード単位で動作する軽量な変換機構です。重い処理は別レイヤーに任せ、接続定義の近くで最小限の整形・マスキング・ルーティングを実現します。

本稿は、公式ドキュメントの安定機能に基づき、試験(CCDAK)でも狙われやすい論点と、実務で失敗しがちな落とし穴をまとめました。

SMTの基本: 位置づけと制約

SMTはKafka ConnectのSourceRecord/SinkRecordに対して、レコードごとに適用されるステートレスな処理です。チェーン可能で、設定順に評価されます。ソース側ではコネクタが内部表現(Connectのデータ/スキーマ)を出した後、SMTで加工し、コンバータがシリアライズしてKafkaへ書き込みます。シンク側ではKafkaからデシリアライズした後にSMTが走り、コネクタが外部システムへ書き込みます。

SMTはあくまで軽量変換です。集計、結合、状態管理が必要な処理や大規模なエンリッチメントは、Kafka StreamsやksqlDBなど別レイヤーで行うのが設計の原則です。

  • ステートレス・レコード単位・順序適用
  • キーと値のどちらに適用するかを明示(例: ExtractField$Key / $Value)
  • トピック名変更(RegexRouter)は可能だが、パーティション数の再配置や再分散を直接制御する機能ではない
  • スキーマ付き/スキーマレスの両方を扱えるが、変換によってはスキーマ必須(例: Flatten)

Kafka ConnectにおけるSMTの流れ(ソースとシンク)

Source SystemSource ConnectorSMTssource-sideKafkaserialization / deserializationSMTssink-sideSink ConnectorSink System

代表的なSMTとユースケース

公式に提供されている代表的なSMTは、フィールド抽出・名称変更・マスキング・フラット化・タイムスタンプ変換・トピックリライティングなどです。運用要件に直結するため、挙動と前提(スキーマ要否、キー/値の別、無いフィールドの扱い)を把握しておきます。

よくあるパターンとして、ソース側でキーを設定(ValueToKey)、不要フィールドを削除(ReplaceField)、監査用のメタ情報をヘッダやフィールドに付与(InsertField)、PIIの簡易マスキング(MaskField)、環境やバージョンに応じたトピック名変更(RegexRouter)などがあります。

  • ExtractField: 構造体から特定フィールドを抽出(キー/値の双方に対応)
  • ReplaceField: フィールドのホワイト/ブラックリストやリネーム
  • MaskField: フィールド値の簡易マスキング(固定長・全置換など)
  • Flatten: ネスト構造をフラット化(スキーマ必須)
  • InsertField: 静的値やメタ(timestamp, topic, partitionなど)の追加
  • ValueToKey: 値の一部をキーへ昇格(後段のパーティション割当てに影響しうる)

SMTと他手段の比較: 軽量変換か、別レイヤーか

変換の重さ・状態性・展開コストを軸に、どの手段を選ぶかを決めます。CCDAKでも、SMTとKafka Streams/ksqlDBの境界を問う設問が定番です。

原則は「軽量な整形はSMT、集計/結合/ウィンドウはStreams/ksqlDB、外部API依存の重いエンリッチは別プロセス(またはコネクタ側の機能)」。

  • シンプル変換を接続設定に内包できるのがSMTの強み
  • 状態を持つ処理や高い再利用性が必要ならコードベース(Streams/ksqlDB)へ
  • トピック名変更はSMT、分岐/マルチ配信はコネクタ複製や路線設計で対処
手段主用途状態保持運用/パフォーマンスの特徴
SMT(Kafka Connect)軽量整形・マスキング・トピック名変更なし(レコード単位)低オーバーヘッド・設定で完結・順序依存
Kafka Streams結合・集計・ウィンドウ処理・状態機械あり(ストア使用)アプリとしてデプロイ・スケール/監視が必要
ksqlDBSQLライクなストリーム処理あり(内部的に管理)マネージドに近い体験・対話的運用
カスタムコネクタ/プラグイン接続固有の拡張や重変換の内製ケース次第再配布・互換性管理が必要

構成の実際: チェーン、Key/Value、条件適用と順序

SMTはtransformsでエイリアスを列挙し、各エイリアスにtypeとプロパティを与えます。Key/Valueのどちらに適用するかは、クラス名末尾の$Key/$Valueで指定します。複数のSMTはカンマ区切りの順序で適用されます。

条件適用(predicates)は、条件に一致するレコードにだけ特定のSMTを有効化する仕組みです。トピック名やヘッダ有無などに基づく判定が可能です。利用可否やクラス名は環境のKafka Connectバージョンに依存する場合があるため、運用環境のドキュメントで確認してください。

  • 順序は厳密。前段の出力が後段の前提(例: ValueToKey → RegexRouterでキーに依存したルーティングを想定しない)
  • Key/Valueの取り違えが頻出の不具合要因
  • スキーマレス入力に対しFlattenなどスキーマ必須のSMTを適用すると失敗する
  • 本番適用前に単体データでスタンドアロンConnectを使った検証を推奨

サンプル: 複数SMTをチェーン(ReplaceField, InsertField, RegexRouter, ValueToKey)

name=jdbc-source-users
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:postgresql://db/app
mode=incrementing
incrementing.column.name=id
topic.prefix=src_

# SMT chain
transforms=rf,meta,route,v2k

# 不要フィールドの除去とリネーム
transforms.rf.type=org.apache.kafka.connect.transforms.ReplaceField$Value
transforms.rf.blacklist=password,sensitive_token
transforms.rf.renames=full_name:fullName

# メタ情報を値に付与
transforms.meta.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.meta.static.field=source
transforms.meta.static.value=postgres

# トピック名の環境リライティング
transforms.route.type=org.apache.kafka.connect.transforms.RegexRouter
transforms.route.regex=^src_(.*)$
transforms.route.replacement=dev.$1

# 値のidをキーへ
transforms.v2k.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.v2k.fields=id

# 条件適用(例: 特定トピックにだけrfを適用)
# (使用可否やクラス名は環境のバージョンに依存)
# transforms.rf.predicate=topicMatch
# predicates=topicMatch
# predicates.topicMatch.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
# predicates.topicMatch.pattern=^dev\.users$

スキーマ/型とエラー処理: schemaless・tombstone・DLQ

SMTはConnect内部データに対して動作します。スキーマレス(Map構造)かスキーマ付き(Struct)かで適用可否やフィールド解決の挙動が異なります。FlattenやSetSchemaMetadataなど、一部のSMTはスキーマが必須です。逆にReplaceFieldなどはスキーマレスでも動作します。

コンパクション用途のtombstone(値null)や存在しないフィールドに対する動作はSMTごとにオプションがあります(例: ignore.missing=true)。不整合や型変換エラーが起きた場合は、errors.toleranceやDLQ設定で処理継続と失敗レコードの退避を制御します。

  • TimestampConverterでの型指定ミス(string→Timestamp/long→string)が典型的失敗
  • MaskFieldは数値/文字列の型違いに注意
  • tombstoneレコードにSMTを適用しない/スキップする設定があるかを確認
  • DLQは検証・再処理の起点として有用(非同期運用がしやすい)

エラー処理とDLQの例(ソース/シンク共通で有効な設定)

errors.tolerance=all
errors.log.enable=true
errors.log.include.messages=true
errors.deadletterqueue.topic.name=connect-errors.dlq
errors.deadletterqueue.context.headers.enable=true
errors.deadletterqueue.topic.replication.factor=3

運用の勘所: パフォーマンス、監視、テスト

SMTは軽量ですが、チェーンが長いほどCPUとレイテンシは増えます。特にFlattenやRegex系はコストが上がりやすいため、必要十分な最小セットに絞ります。キー変更(ValueToKey/ExtractField$Key)はソース側のパーティション割当てに影響しうる点も含めて負荷を観測してください。

監視はConnectのタスク/ワーカー指標を確認します。エラー数、DLQレコード数、ポーリング遅延、投入/消費スループット、GC/CPUなどを継続観測し、変更時はローリングで反映・検証します。

  • 変更は基本的にコネクタ再構成(再起動)を伴うため、同一設定をコード化(IaC)
  • ステージングで代表データによる再現テスト(スキーマ進化も含めて)
  • メトリクス監視(task-error-rate、sink-record-read-rate、source-record-write-rate 等)
  • SMTの配置は最小限。重い変換は別レイヤーに退避

問題で確認

CCDAK

問題 1

あるチームは、ソースDBから取り込んだレコードのpasswordフィールドを除去し、topic名に環境プレフィックス(dev.)を付け、さらに値のidをメッセージキーに設定したい。Kafka Connectの機能だけで最も適切に実現する方法はどれか。

  1. SMTのReplaceField、RegexRouter、ValueToKeyをこの順にチェーンする
  2. Kafka Streamsで変換し、結果を別トピックに書き戻す
  3. コネクタをフォークしてカスタム変換をJavaで実装する
  4. プロデューサーのInterceptorで加工してからConnectに渡す

正解: A

要件はいずれもSMTの典型ユースケース(フィールド除去=ReplaceField、トピック名変更=RegexRouter、キー設定=ValueToKey)であり、順序付きチェーンで接続設定として完結できる。重い処理や状態管理は不要なためStreamsやカスタムは過剰。

よくある質問

SMTはシリアライズ/デシリアライズのどのタイミングで動作しますか?

ソース側ではコネクタが生成したConnect内部表現に対してSMTを適用し、その後コンバータでシリアライズしてKafkaに書き込みます。シンク側ではKafkaからデシリアライズした後にSMTを適用し、最終的に外部システムへ書き込みます。

SMTでパーティションを制御できますか?

直接の制御機能はありません。ただしソース側でキーを変更するSMT(ValueToKeyやExtractField$Key)を使うと、プロデューサのパーティショナが新しいキーで割り当てを行うため、結果的にパーティションが変わる可能性はあります。トピック名の変更(RegexRouter)はできますが、再分散の調整自体は担いません。

どのくらいの数のSMTをチェーンしても大丈夫ですか?

上限は実装上の制約よりも性能・可読性の観点で決まります。目安としては数個に留め、複雑化する場合はStreams/ksqlDBや別プロセスに退避してください。変更時はステージングでレイテンシとスループットを計測し、本番で継続監視するのが安全です。

この記事で学んだ内容を問題で確認しましょう

16,000問以上の問題で実力チェック

無料で問題を解いてみる
この記事の著者

NicheeLab編集部

データエンジニアリング・クラウド資格の専門家。Databricks・Snowflake等の認定資格を保有し、実務経験に基づいた問題作成・解説を行っています。NicheeLab運営。


関連記事
Kafka

Kafka Topic と Partition の基礎: 分散とスケーラビリティの要

CCDAK 対策と実務の両立を意識し、Topic/Partition/Replica/Consumer Group の役...

Kafka

CCDAK 試験ガイド:出題範囲・配点・申込み・対策

Confluent Certified Developer for Apache Kafka (CCDAK) の出題範囲...

Kafka

Confluent Certified Administrator (CCAAK) 対策: 出題範囲・配点の考え方・運用観点の要点

CCAAKに向けて、試験領域の押さえどころを運用目線で整理。プロダクションで通用する設定・監視・セキュリティの実践知を、...

Kafka

Kafka の Replica と In-Sync Replicas を正しく設計する: 耐障害性と一貫性

レプリカとISRの仕組みを起点に、acks と min.insync.replicas、クリーン/アンクリーンリーダー選...

Kafka

Kafka の Offset とコミット: ポジション管理と at-least-once の基礎

CCDAK 対策と実務の両立を意識して、Kafka コンシューマのオフセット管理とコミット戦略を整理。at-least-...

Kafkaの記事一覧 (100件)
© 2026 NicheeLab All rights reserved.