注:本記事は(2021年10月19日)に公開された(Bringing More to the Table: Azure and UDTF Support with Snowpark)を翻訳して公開したものです。
6月、SnowflakeはAWSでJava UDFとSnowpark APIがプレビュー版で提供開始されたことを発表しましたが、そのプレビュー版にいくつかの追加事項があります。本日はその件についてお知らせいたします。Snowparkの提供地域と機能が拡大します。
新しいクラウドプロバイダー
Snowflakeでは、すべてのクラウドや当社が展開しているリージョン全体で一様に優れたサポートを提供することを目指しています。クラウドプロバイダーはそれぞれ大きな違いがありますが、それはクラウドプロバイダー側の問題であって、カスタマーの問題ではありません。
本日の発表は、その目標に向けた当社の新たな一歩となる内容です。Java UDFとSnowpark APIがプレビュー版としてAzure全体でご利用いただけるようになりました。
ビッグニュースですが、非常にシンプルで多くを語る必要はありません。これらの機能は両クラウド全体で同じように機能するため、いったん構築したあとは好きなところにデプロイできます。GCPでも、全く同じサポートが近日中に提供可能となる予定です。
テーブル関数
Java UDFへのサポートも大きく拡大し、AWSとAzureの両方でテーブル関数のプレビュー版が提供されることになります。
テーブル関数について、ざっとおさらいしておきましょう。これまで、Snowflakeはスカラー関数のみをサポートしていました。この関数は行ごとに動作し、単数(場合によっては複素数)の結果を返します。スカラー関数の長所は、書きやすく、Snowflakeでクエリの一環として自動的にスケールアウトでき、さらにさまざまな問題に対応している点です。
しかし、スカラー関数ではうまく対処できないこともあります。次のような処理にスカラー関数を使用すると、行き詰まってしまうでしょう。
- 各入力行に対して複数行を返す
- 複数行に渡って状態を維持する
- 行のグループに対して単一の結果を返す
スカラー関数を使用して対処できないこれらの処理は、テーブル関数で実行できます。ただしテーブル関数は、記述と使用に少しだけ手間がかかります。
詳しく見ていきましょう。
例:ワードカウント
行の集まり全体で、各ワードが何回登場するかを算出する場合を考えてみましょう。たとえば、次のような本に関するデータがあるとします。
books
title | author | first_line |
Moby Dick | Herman Melville | Call me Ishmael. |
Billy Budd, Sailor | Herman Melville | In the time before steamships, or then more frequently than now, a stroller along the docks of any considerable seaport would occasionally have his attention arrested by a group of bronzed mariners, man-of-war’s men or merchant sailors in holiday attire ashore on liberty. |
Gravity’s Rainbow | Thomas Pynchon | A screaming comes across the sky. |
Inherent Vice | Thomas Pynchon | She came along the alley and up the back steps the way she always used to. |
A Portrait of the Artist as a Young Man | James Joyce | Once upon a time and a very good time it was there was a moocow coming down along the road and this moocow that was coming down along the road met a nicens little boy named baby tuckoo. |
Ulysses | James Joyce | Stately, plump Buck Mulligan came from the stairhead, bearing a bowl of lather on which a mirror and a razor lay crossed. |
… | … | … |
これらの行内のワードの登場回数を分析するとします。これを行ごとに処理する場合、スカラーJava関数で実行すれば、バリアントディクショナリを使用してカウント数が返されます。
しかし作者ごとなど、一度に複数行をカウントする場合はテーブル関数が必要となります。テーブル関数を構築するには、以下を表示するクラスを指定する必要があります。
- システムがテーブル関数のスキーマを発見するために使用する静的なgetOutputClass()メソッド。(これが必要な理由は、Javaの型消去に起因します。つまり Stream<OutputRow>は事実上、Stream<Object>にコンパイルされます。これについては別の機会に取り上げたいと思います)。
- クエリ内の各パーティションに関してシステムが呼び出すデフォルトのコンストラクタ。スカラー関数では、ハンドラメソッドが同時に呼び出される場合があり、クラスの状態(state)が変更されることはありませんが、テーブル関数のprocess()メソッドは、パーティション内の各行に対して連続的に呼び出され、状態を累積する可能性があります。
- process()メソッド。1行を受け取り、それに対して必要な処理を実行し、潜在的に空の結果のストリームを返します。その後システムがこれを入力行と関連付けます。
- endPartition()メソッド。パーティションの最後で呼び出され、行のストリームを返します。システムは、それらの行をパーティション内の特定の行ではなく、パーティション全体と関連付けます。
例に当てはめてみましょう。
コード
いくつかのJavaフレームと、出力タイプの定義OutputRowから始めます。
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Stream;
public class WordCount {
// A class to define the schema of our output: pairings of words
// and counts.
static class OutputRow {
public final String word;
public final int count;
public OutputRow(String word, int count) {
this.word = word;
this.count = count;
}
}
// The getOutputClass routine is what actually sets this class as
// the return type for the table function.
public static Class getOutputClass() {
return OutputRow.class;
}
出力のためのテーブルは、OutputRowクラスによって定義された2つの列を持ちます。
続いて、Map、カウントの状態を維持するwordCounts、および各パーティションに対して正確なカウントを開始するコンストラクタがあります。
// Keeps partition-wide counts for each word seen.
private final Map wordCounts;
// Each partition will have a new instance of WordCount
// constructed, and we are allowed to maintain state over the
// partition. In this case, we're going to start the partition with
// an empty map, and keep running counts as we go.
public WordCount() {
wordCounts = new HashMap<>();
}
プロセスメソッドは文字列を構成ワードに分解し、見つかったワード数で状態を更新します。
// The process method is called on each record. In this case, we'll
// split up the line and add the words to our partition-wide count.
// This method could return per-row values if we wished, but in our
// case, we'll just return an empty Stream because our results are
// really for the partition as a whole.
public Stream process(String text) {
// Update the counts with the words in this line.
for (String word : text.toLowerCase().split("[.,!\"\\s]+")) {
// If we don't have an entry for the word, set the count to 1.
// Otherwise, increment the count.
wordCounts.compute(word, (key, value) ->
(value == null) ? 1 : value + 1);
}
// We're waiting until the end of the partition to return per-
// word counts and return nothing here.
return Stream.empty();
}
実際は、ラインを処理する際に行を返すことはありませんが、この例では、パーティションあたりのカウント数を計算することにします。実際にやってみましょう。
// The endPartition routine is called at the end of the partition.
// In our case, this will return the total counts across all lines
// in the input.
public Stream endPartition() {
// Stream back the word counts for the whole partition. Calling
// stream() on the keySet enables us to iterate lazily over the
// contents of the map.
return wordCounts.keySet().stream().map(word ->
new OutputRow(word, wordCounts.get(word)));
}
}
こうなりました。これをjarファイルにして、ステージに保存し、そこから登録することも、関数登録全体をインラインで書くこともできます。
create or replace function wordcount(s string)
returns table(word string, count int)
language java
handler = 'WordCount'
as
$$
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Stream;
public class WordCount {
// A class to define the schema of our output: pairings of words
// and counts.
static class OutputRow {
public final String word;
public final int count;
public OutputRow(String word, int count) {
this.word = word;
this.count = count;
}
}
// The getOutputClass routine is what actually sets this class as
// the return type for the table function.
public static Class getOutputClass() {
return OutputRow.class;
}
// Keeps partition-wide counts for each word seen.
private final Map wordCounts;
// Each partition will have a new instance of WordCount
// constructed, and we are allowed to maintain state over the
// partition. In this case, we're going to start the partition with
// an empty map, and keep running counts as we go.
public WordCount() {
wordCounts = new HashMap<>();
}
// The process method is called on each record. In this case, we'll
// split up the line and add the words to our partition-wide count.
// This method could return per-row values if we wished, but in our
// case, we'll just return an empty Stream because our results are
// really for the partition as a whole.
public Stream process(String text) {
// Update the counts with the words in this line.
for (String word : text.toLowerCase().split("[.,!\"\\s]+")) {
// If we don't have an entry for the word, set the count to 1.
// Otherwise, increment the count.
wordCounts.compute(word, (key, value) ->
(value == null) ? 1 : value + 1);
}
// We're waiting until the end of the partition to return per-
// word counts and return nothing here.
return Stream.empty();
}
// The endPartition routine is called at the end of the partition.
// In our case, this will return the total counts across all lines
// in the input.
public Stream endPartition() {
// Stream back the word counts for the whole partition. Calling
// stream() on the keySet enables us to iterate lazily over the
// contents of the map.
return wordCounts.keySet().stream().map(word ->
new OutputRow(word, wordCounts.get(word)));
}
}
$$;
これにより、新しいテーブル関数をクエリで使用できるようになります。
select author, word, count
from books,
table(wordcount(books.first_line) over (partition by author))
order by count desc;
それでは、カウントを取得しましょう。
ご覧ください。ストップワードを処理すれば、より的確な結果が得られるはずですし、より大規模なテキストコーパスを分析することで、より有用なインサイトが得られるでしょう。
結論
Snowparkは、Snowflakeに期待されるシンプルさ、拡張性、セキュリティを維持しながら、データを使って面白いことを簡単に実行できるようにすることを目指しています。これらの拡張機能により、皆さんがSnowparkをさらに便利に活用する機会が増えることを願っています。何か面白いアイデアを思いついた方がいれば、ぜひお聞かせください。
Javaテーブル関数の使用を開始するには、ドキュメンテーションをご確認ください。また、Java UDFやSnowpark APIに関するドキュメンテーションも併せてご確認ください。
Happy hacking!