Single Message Transforms(SMT)は、Kafka Connectのレコード単位で動作する軽量な変換機構です。重い処理は別レイヤーに任せ、接続定義の近くで最小限の整形・マスキング・ルーティングを実現します。
本稿は、公式ドキュメントの安定機能に基づき、試験(CCDAK)でも狙われやすい論点と、実務で失敗しがちな落とし穴をまとめました。
SMTはKafka ConnectのSourceRecord/SinkRecordに対して、レコードごとに適用されるステートレスな処理です。チェーン可能で、設定順に評価されます。ソース側ではコネクタが内部表現(Connectのデータ/スキーマ)を出した後、SMTで加工し、コンバータがシリアライズしてKafkaへ書き込みます。シンク側ではKafkaからデシリアライズした後にSMTが走り、コネクタが外部システムへ書き込みます。
SMTはあくまで軽量変換です。集計、結合、状態管理が必要な処理や大規模なエンリッチメントは、Kafka StreamsやksqlDBなど別レイヤーで行うのが設計の原則です。
Kafka ConnectにおけるSMTの流れ(ソースとシンク)
公式に提供されている代表的なSMTは、フィールド抽出・名称変更・マスキング・フラット化・タイムスタンプ変換・トピックリライティングなどです。運用要件に直結するため、挙動と前提(スキーマ要否、キー/値の別、無いフィールドの扱い)を把握しておきます。
よくあるパターンとして、ソース側でキーを設定(ValueToKey)、不要フィールドを削除(ReplaceField)、監査用のメタ情報をヘッダやフィールドに付与(InsertField)、PIIの簡易マスキング(MaskField)、環境やバージョンに応じたトピック名変更(RegexRouter)などがあります。
変換の重さ・状態性・展開コストを軸に、どの手段を選ぶかを決めます。CCDAKでも、SMTとKafka Streams/ksqlDBの境界を問う設問が定番です。
原則は「軽量な整形はSMT、集計/結合/ウィンドウはStreams/ksqlDB、外部API依存の重いエンリッチは別プロセス(またはコネクタ側の機能)」。
| 手段 | 主用途 | 状態保持 | 運用/パフォーマンスの特徴 |
|---|---|---|---|
| SMT(Kafka Connect) | 軽量整形・マスキング・トピック名変更 | なし(レコード単位) | 低オーバーヘッド・設定で完結・順序依存 |
| Kafka Streams | 結合・集計・ウィンドウ処理・状態機械 | あり(ストア使用) | アプリとしてデプロイ・スケール/監視が必要 |
| ksqlDB | SQLライクなストリーム処理 | あり(内部的に管理) | マネージドに近い体験・対話的運用 |
| カスタムコネクタ/プラグイン | 接続固有の拡張や重変換の内製 | ケース次第 | 再配布・互換性管理が必要 |
SMTはtransformsでエイリアスを列挙し、各エイリアスにtypeとプロパティを与えます。Key/Valueのどちらに適用するかは、クラス名末尾の$Key/$Valueで指定します。複数のSMTはカンマ区切りの順序で適用されます。
条件適用(predicates)は、条件に一致するレコードにだけ特定のSMTを有効化する仕組みです。トピック名やヘッダ有無などに基づく判定が可能です。利用可否やクラス名は環境のKafka Connectバージョンに依存する場合があるため、運用環境のドキュメントで確認してください。
サンプル: 複数SMTをチェーン(ReplaceField, InsertField, RegexRouter, ValueToKey)
name=jdbc-source-users
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:postgresql://db/app
mode=incrementing
incrementing.column.name=id
topic.prefix=src_
# SMT chain
transforms=rf,meta,route,v2k
# 不要フィールドの除去とリネーム
transforms.rf.type=org.apache.kafka.connect.transforms.ReplaceField$Value
transforms.rf.blacklist=password,sensitive_token
transforms.rf.renames=full_name:fullName
# メタ情報を値に付与
transforms.meta.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.meta.static.field=source
transforms.meta.static.value=postgres
# トピック名の環境リライティング
transforms.route.type=org.apache.kafka.connect.transforms.RegexRouter
transforms.route.regex=^src_(.*)$
transforms.route.replacement=dev.$1
# 値のidをキーへ
transforms.v2k.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.v2k.fields=id
# 条件適用(例: 特定トピックにだけrfを適用)
# (使用可否やクラス名は環境のバージョンに依存)
# transforms.rf.predicate=topicMatch
# predicates=topicMatch
# predicates.topicMatch.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
# predicates.topicMatch.pattern=^dev\.users$SMTはConnect内部データに対して動作します。スキーマレス(Map構造)かスキーマ付き(Struct)かで適用可否やフィールド解決の挙動が異なります。FlattenやSetSchemaMetadataなど、一部のSMTはスキーマが必須です。逆にReplaceFieldなどはスキーマレスでも動作します。
コンパクション用途のtombstone(値null)や存在しないフィールドに対する動作はSMTごとにオプションがあります(例: ignore.missing=true)。不整合や型変換エラーが起きた場合は、errors.toleranceやDLQ設定で処理継続と失敗レコードの退避を制御します。
エラー処理とDLQの例(ソース/シンク共通で有効な設定)
errors.tolerance=all
errors.log.enable=true
errors.log.include.messages=true
errors.deadletterqueue.topic.name=connect-errors.dlq
errors.deadletterqueue.context.headers.enable=true
errors.deadletterqueue.topic.replication.factor=3SMTは軽量ですが、チェーンが長いほどCPUとレイテンシは増えます。特にFlattenやRegex系はコストが上がりやすいため、必要十分な最小セットに絞ります。キー変更(ValueToKey/ExtractField$Key)はソース側のパーティション割当てに影響しうる点も含めて負荷を観測してください。
監視はConnectのタスク/ワーカー指標を確認します。エラー数、DLQレコード数、ポーリング遅延、投入/消費スループット、GC/CPUなどを継続観測し、変更時はローリングで反映・検証します。
CCDAK
問題 1
あるチームは、ソースDBから取り込んだレコードのpasswordフィールドを除去し、topic名に環境プレフィックス(dev.)を付け、さらに値のidをメッセージキーに設定したい。Kafka Connectの機能だけで最も適切に実現する方法はどれか。
正解: A
要件はいずれもSMTの典型ユースケース(フィールド除去=ReplaceField、トピック名変更=RegexRouter、キー設定=ValueToKey)であり、順序付きチェーンで接続設定として完結できる。重い処理や状態管理は不要なためStreamsやカスタムは過剰。
SMTはシリアライズ/デシリアライズのどのタイミングで動作しますか?
ソース側ではコネクタが生成したConnect内部表現に対してSMTを適用し、その後コンバータでシリアライズしてKafkaに書き込みます。シンク側ではKafkaからデシリアライズした後にSMTを適用し、最終的に外部システムへ書き込みます。
SMTでパーティションを制御できますか?
直接の制御機能はありません。ただしソース側でキーを変更するSMT(ValueToKeyやExtractField$Key)を使うと、プロデューサのパーティショナが新しいキーで割り当てを行うため、結果的にパーティションが変わる可能性はあります。トピック名の変更(RegexRouter)はできますが、再分散の調整自体は担いません。
どのくらいの数のSMTをチェーンしても大丈夫ですか?
上限は実装上の制約よりも性能・可読性の観点で決まります。目安としては数個に留め、複雑化する場合はStreams/ksqlDBや別プロセスに退避してください。変更時はステージングでレイテンシとスループットを計測し、本番で継続監視するのが安全です。
NicheeLab編集部
データエンジニアリング・クラウド資格の専門家。Databricks・Snowflake等の認定資格を保有し、実務経験に基づいた問題作成・解説を行っています。NicheeLab運営。
Kafka Topic と Partition の基礎: 分散とスケーラビリティの要
CCDAK 対策と実務の両立を意識し、Topic/Partition/Replica/Consumer Group の役...
CCDAK 試験ガイド:出題範囲・配点・申込み・対策
Confluent Certified Developer for Apache Kafka (CCDAK) の出題範囲...
Confluent Certified Administrator (CCAAK) 対策: 出題範囲・配点の考え方・運用観点の要点
CCAAKに向けて、試験領域の押さえどころを運用目線で整理。プロダクションで通用する設定・監視・セキュリティの実践知を、...
Kafka の Replica と In-Sync Replicas を正しく設計する: 耐障害性と一貫性
レプリカとISRの仕組みを起点に、acks と min.insync.replicas、クリーン/アンクリーンリーダー選...
Kafka の Offset とコミット: ポジション管理と at-least-once の基礎
CCDAK 対策と実務の両立を意識して、Kafka コンシューマのオフセット管理とコミット戦略を整理。at-least-...