Snowpipe Streamingは、アプリケーションやIoTデバイスから発生するイベントデータを、 ファイルに書き出すことなくサブ秒レイテンシでSnowflakeテーブルへ直接挿入する仕組みです。 従来のSnowpipeがファイル到着をトリガとする「プル型」であるのに対し、 Snowpipe Streamingはクライアントが能動的に行データを送信する「プッシュ型」のストリーミング取り込みを実現します。
Snowpipe Streamingの中核はSnowflake Ingest SDKが提供するストリーミングAPIです。 クライアントアプリケーションはSDKを通じてチャネル(Channel)と呼ばれる論理的な書き込み経路を開き、 insertRow APIで行データをMapとして送信します。 送信されたデータはSnowflakeのサーバーレスインフラストラクチャ上でマイクロパーティションへ変換され、 ターゲットテーブルに反映されます。ステージの作成もCOPY INTOの実行も不要です。
| 特性 | 詳細 |
|---|---|
| SDK | Snowflake Ingest SDK(Java) |
| プロトコル | HTTPS経由のREST通信 |
| 書き込み単位 | 行(Map<String, Object>) |
| レイテンシ | サブ秒〜数秒 |
| ステージ | 不要 |
| ウェアハウス | 不要(サーバーレス) |
両者は「自動取り込み」という点では共通しますが、 入力ソース・レイテンシ・課金モデルに根本的な違いがあります。
| 観点 | Snowpipe | Snowpipe Streaming |
|---|---|---|
| 取り込み方式 | ファイルベース(COPY INTO自動実行) | 行レベル直接挿入(InsertRow API) |
| トリガ | SQS / EventGrid / Pub Sub 通知またはREST API | クライアントSDKからの能動的送信 |
| レイテンシ | 1〜5分程度 | サブ秒〜数秒 |
| ステージ要否 | 必要(Internal / External Stage) | 不要 |
| SDK要否 | 不要(通知ベースで自動) | 必要(Ingest SDK / Kafka Connector) |
| COPY_HISTORY | 記録あり(ファイル単位) | 記録なし(行単位のため) |
| コストモデル | ファイルあたりのServerless Credit | 処理量ベースのServerless Credit |
| 向く用途 | CSVやParquetの継続バッチロード | Kafkaイベント・IoTセンサーデータ・リアルタイムログ |
Snowpipe Streamingのデータフローは「クライアント → チャネル → テーブル」の3層構造です。 1つのテーブルに対して複数のチャネルを並列に開くことができ、 チャネルごとに独立したoffsetTokenで進捗管理されます。
┌─────────────────────┐
│ Client App │ Snowflake Ingest SDK (Java)
│ / Kafka Connector │
└────────┬────────────┘
│ insertRow(Map)
▼
┌─────────────────────┐
│ Channel │ 論理的な書き込み経路
│ (offsetToken管理) │ テーブルごとに複数並列可
└────────┬────────────┘
│ HTTPS
▼
┌─────────────────────┐
│ Snowflake │ サーバーレスインフラ
│ Micro-partition │ 行データ → マイクロパーティション変換
│ ┌───────────────┐ │
│ │ Target Table │ │ 数秒以内にクエリ可能
│ └───────────────┘ │
└─────────────────────┘Snowflake Ingest SDKを使った基本的な取り込みフローです。 クライアントを生成し、チャネルを開いてinsertRowで行を送信します。
// 1. ストリーミングクライアントの生成
Properties props = new Properties();
props.put("url", "https://<account>.snowflakecomputing.com");
props.put("user", "INGEST_USER");
props.put("private_key", privateKey);
props.put("role", "INGEST_ROLE");
SnowflakeStreamingIngestClient client =
SnowflakeStreamingIngestClientFactory
.builder("MY_CLIENT")
.setProperties(props)
.build();
// 2. チャネルのオープン
OpenChannelRequest req = OpenChannelRequest.builder("CH_ORDERS")
.setDBName("PROD_DB")
.setSchemaName("PUBLIC")
.setTableName("ORDERS")
.setOnErrorOption(OpenChannelRequest.OnErrorOption.CONTINUE)
.build();
SnowflakeStreamingIngestChannel channel = client.openChannel(req);
// 3. 行の挿入
Map<String, Object> row = new HashMap<>();
row.put("ORDER_ID", 10001);
row.put("CUSTOMER", "Tanaka");
row.put("AMOUNT", 5800);
row.put("ORDER_TS", "2026-03-27T10:30:00Z");
InsertValidationResponse resp = channel.insertRow(row, "offset-10001");
if (resp.hasErrors()) {
// エラーハンドリング
resp.getInsertErrors().forEach(e ->
System.err.println(e.getMessage()));
}
// 4. コミット済みオフセットの確認
String committed = channel.getLatestCommittedOffsetToken();
System.out.println("Last committed: " + committed);チャネルはopenChannelで開き、不要になったらcloseします。 各チャネルはテーブルに対する独立した書き込みストリームを表します。
| 操作 | メソッド | 用途 |
|---|---|---|
| チャネルを開く | client.openChannel(request) | テーブルへの書き込み経路を確立 |
| 行を挿入 | channel.insertRow(row, offsetToken) | 1行ずつまたはinsertRowsでバッチ挿入 |
| コミット確認 | channel.getLatestCommittedOffsetToken() | 最後に永続化されたトークンを取得 |
| チャネルを閉じる | channel.close() | リソース解放 |
| 有効性チェック | channel.isValid() | チャネルが無効化されていないか確認 |
offsetTokenはクライアントが自由に設定できる文字列で、 Kafkaの場合はパーティションオフセット、独自アプリの場合は連番やUUIDを使います。 障害復旧時にgetLatestCommittedOffsetTokenで最後のコミット地点を取得し、 そこから再送することで重複や欠損のない取り込みを実現します。
Snowflake側はAt-least-onceを保証しますが、 アプリケーション側でoffsetTokenを正しく管理することでExactly-once semanticsを構築できます。
// 障害復旧時のリカバリパターン
SnowflakeStreamingIngestChannel channel = client.openChannel(req);
// 前回のコミット済みオフセットを取得
String lastCommitted = channel.getLatestCommittedOffsetToken();
// Kafkaの場合: lastCommittedオフセット以降からコンシュームを再開
long resumeOffset = Long.parseLong(lastCommitted) + 1;
consumer.seek(partition, resumeOffset);
// 再開地点から取り込みを継続
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
Map<String, Object> row = parseRecord(record);
channel.insertRow(row, String.valueOf(record.offset()));
}
}Snowflake Kafka ConnectorはSnowpipe Streamingモードをサポートしています。 Kafka ConnectのSink Connectorとして設定することで、 Kafkaトピックからのイベントを自動的にSnowpipe Streaming経由でテーブルへ取り込みます。
// Kafka Connector設定例(Snowpipe Streamingモード)
{
"connector.class": "com.snowflake.kafka.connector.SnowflakeSinkConnector",
"tasks.max": "4",
"topics": "orders_topic",
"snowflake.url.name": "<account>.snowflakecomputing.com",
"snowflake.user.name": "KAFKA_USER",
"snowflake.private.key": "${"quot;}{file:/secrets/sf_key.pem}",
"snowflake.role.name": "KAFKA_INGEST_ROLE",
"snowflake.database.name": "PROD_DB",
"snowflake.schema.name": "PUBLIC",
"snowflake.ingestion.method": "SNOWPIPE_STREAMING",
"snowflake.enable.schematization": "true",
"buffer.count.records": "10000",
"buffer.flush.time": "10",
"buffer.size.bytes": "5000000"
}| 設定パラメータ | 意味 |
|---|---|
| snowflake.ingestion.method | SNOWPIPE_STREAMINGを指定してストリーミングモードを有効化 |
| snowflake.enable.schematization | trueでJSON内のフィールドを自動的にテーブルカラムへマッピング |
| buffer.count.records | バッファに溜める最大レコード数(コスト効率に影響) |
| buffer.flush.time | バッファのフラッシュ間隔(秒) |
Snowpipe Streamingはサーバーレスコンピュートとして課金されます。 ユーザーがウェアハウスを起動する必要はありませんが、 取り込み処理に応じたServerless Creditが発生します。
| コスト要因 | 影響 | 最適化 |
|---|---|---|
| フラッシュ頻度 | 頻繁なフラッシュはクレジット消費増 | buffer設定でマイクロバッチ化 |
| チャネル数 | チャネルごとにオーバーヘッドあり | 必要最小限のチャネル数に抑制 |
| データ量 | 処理バイト数に比例 | 不要カラムの事前フィルタリング |
-- コスト確認クエリ
SELECT
START_TIME,
END_TIME,
CREDITS_USED,
SERVICE_TYPE
FROM SNOWFLAKE.ACCOUNT_USAGE.METERING_HISTORY
WHERE SERVICE_TYPE = 'SNOWPIPE_STREAMING'
ORDER BY START_TIME DESC
LIMIT 20;SnowPro Core / Data Engineerでは、 SnowpipeとSnowpipe Streamingの使い分けが頻出テーマです。
| 出題パターン | 押さえるポイント |
|---|---|
| 「低レイテンシ」「リアルタイム」キーワード | Snowpipe Streamingが正解候補 |
| 「ファイルベース」「CSV/Parquet到着」 | Snowpipe(Auto-ingest)が正解候補 |
| 「ステージ不要」「SDK使用」 | Snowpipe Streamingの特徴 |
| 「COPY_HISTORYで確認」 | Snowpipeのみ記録される(Streamingは対象外) |
| 「Exactly-once」「offsetToken」 | Snowpipe StreamingのoffsetToken管理 |
Data Engineer
問題 1
ある企業では、数千台のIoTセンサーから毎秒発生するテレメトリデータをSnowflakeへ取り込み、ダッシュボードで1秒以内の鮮度で可視化したいと考えています。データはJSON形式で、クラウドストレージへのファイル書き出しは避けたい要件です。Kafka経由でデータを受信するインフラは構築済みです。最も適切な取り込み方式はどれですか?
正解: A
サブ秒レイテンシが求められ、ファイル書き出しを避けたい要件にはSnowpipe Streamingが最適です。Kafka ConnectorでSNOWPIPE_STREAMINGモードを設定すれば、Kafkaトピックから直接Snowflakeテーブルへ行レベルで取り込めます。Bのsnowpipe Auto-ingestはファイルベースのため1〜5分のレイテンシが発生し要件を満たしません。CのCOPY INTO定期実行は5分間隔でさらに遅延します。DのINSERT INTO直接実行はウェアハウスが必要で大量センサーデータには非効率です。
Snowpipe StreamingはSnowpipeと何が違いますか?
Snowpipeはクラウドストレージ上のファイル到着をトリガに自動取り込みする仕組みで、SQS通知やREST APIで起動します。一方Snowpipe Streamingはファイルを介さず、Snowflake Ingest SDKまたはKafka Connectorを通じて行レベルのデータをサブ秒レイテンシでテーブルへ直接挿入します。ステージもCOPY INTOも不要で、チャネルと呼ばれる論理的な書き込み経路を通してInsertRow APIで行を送信します。Snowpipeはファイルベースの継続取り込み、Snowpipe StreamingはイベントストリームやIoTデータなどリアルタイム取り込みに適しています。
Snowpipe StreamingのExactly-once保証はどのように実現されていますか?
Snowpipe Streamingでは各チャネルにoffsetTokenという仕組みがあり、クライアントが行を挿入する際にトークンを付与できます。insertRow成功後にgetLatestCommittedOffsetTokenで最後にコミットされたトークンを取得し、障害復旧時にはそのトークン以降から再送することで重複なく再開できます。これによりアプリケーション側でExactly-once semanticsを実装できます。ただしSnowflake側はAt-least-onceを保証するため、Exactly-onceの担保にはクライアントロジックでのoffsetToken管理が必須です。
Snowpipe Streamingのコストはどのように計算されますか?
Snowpipe Streamingはサーバーレスコンピュートモデルで課金されます。ユーザーが仮想ウェアハウスを用意する必要はなく、Snowflake側が自動的にリソースを割り当てます。課金はServerless Credit単位で、取り込まれたデータ量と処理時間に基づきます。Snowpipeのファイルあたり課金とは異なり、行の挿入頻度とバッチサイズがコストに影響するため、細かすぎる単一行送信を避けてマイクロバッチにまとめるとコスト効率が向上します。ACCOUNT_USAGE.METERING_HISTORYビューでSnowpipe Streaming固有のクレジット消費を確認できます。
NicheeLab編集部
データエンジニアリング・クラウド資格の専門家。Databricks・Snowflake等の認定資格を保有し、実務経験に基づいた問題作成・解説を行っています。NicheeLab運営。
Snowflake資格一覧|全11試験(SnowPro)の難易度・費用
Snowflake認定資格(SnowPro)全11試験の一覧・難易度・費用・出題範囲を徹底解説。...
Snowflake試験の難易度ランキング|全11資格を徹底比較
Snowflake(SnowPro)認定全11試験の難易度をランキング形式で比較。学習時間・合格に必要なスキルから分析。...
Snowflake資格の勉強方法|効率的な学習ルートと合格のコツ
Snowflake認定資格(SnowPro)に最短で合格するための勉強方法。公式リソース・学習スケジュールを徹底ガイド。...
SnowPro Core試験完全解説|出題範囲・問題例・合格戦略
SnowPro Core Certification(COF-C03)を徹底解説。出題範囲・100問の試験形式・合格ライ...
SnowPro Platform Associate完全解説|入門試験の攻略
SnowPro Associate: Platform Certification(SOL-C01)を徹底解説。最も簡単...