注:本記事は(2022年2月17日)に公開された(Operationalizing Data Pipelines With Snowpark Stored Procedures, Now in Preview)を翻訳して公開したものです。

先日のAWS利用顧客へのSnowparkの一般提供に続き、クラウドを利用するすべての顧客に対し、Snowpark Scalaストアドプロシージャのプレビュー版の提供を開始いたします。

Snowparkは、強力な抽象化手法であるDataFrameを使用し、統合言語によるデータパイプラインの構築および実行を可能にします。Snowparkでは、実行したいパイプラインを記述するクライアント側のプログラムを書くと、手間のかかる仕事はすべて伸縮性のあるSnowflakeコンピューティングエンジンに任せることができます。

パイプラインの構築とテストが終わったら、次のステップはその運用です。そのためには、ホストやクライアントプログラムをスケジュールするホームを決定する必要があります。これまで、そのホームはSnowflake外部にある必要がありました。

Snowparkストアドプロシージャを使用することで、この点が完全に変わります。Snowflake仮想ウェアハウスをコンピューティングフレームワークとして使用し、必然的にSnowflake機能を統合(スケジューリングにタスクを使用するなど)することで、Snowflake内でパイプラインをホストすることが可能です。これにより関与するシステム数が減り、すべてがSnowflake内で自己完結するため、エンドツーエンドのストーリーが簡素化されます。

Snowparkのストアドプロシージャは非常に強力であるだけでなく、使い方もとてもシンプルです。

例を見てみましょう。Snowpark内ではかなり複雑なプロセスを構築することもできますが、ここではストアドプロシージャに注目するためにごくシンプルな例を取り上げています。

import com.snowflake.snowpark._
import com.snowflake.snowpark.functions._
 
object Main {
 def main(args: Array[String]): Unit = {
   // Connect
   val sess = Session.builder.configFile("config_demo.prop").create
 
   // run the pipeline
   println(processSales(sess))
 }
 
 def processSales(sess: Session): String = {
   import sess.implicits._
 
   // UDF to bucket sales by amount
   val qualifySale = udf((price:Float) =>
     if (price < 0.00) "ERROR"
     else if (price < 100.00) "Small"
     else if (price < 10000.00) "Medium"
     else "Large"
   )
 
   // Create and run the pipeline
   sess.table("raw_sales")
       .withColumn("size", qualifySale('sale_amount))
       .join(sess.table("catalog"), 'item_id === 'item)
       .drop('item)
       .write.saveAsTable("processed_sales")
 
   return "Success!"
 }
}

パイプラインを2つの部分に分けている点に注目してください。非常にシンプルなmain()ルーチンはファイルに保存された設定からSessionを作成するだけで、呼び出されたprocessSales()関数が実際のパイプラインの実装を行います。これについては後から説明します

processSales()も非常に明快な関数です。金額に基づいて営業を評価するシンプルなUFDを作成し、それをcatalogと結合し、raw_salesをprocessed_salesへと変換します。例示するとこれが、

このようになります。

次は、このパイプラインを運用し毎晩実行するようにしたいと思います。そのためにはまず、Snowparkストアドプロシージャを作成します。複雑なコードであればJARを利用したりライブラリから呼び出して作成することもできますが、今回のようにシンプルな例であれば、上述のコードをシンプルなインラインSQLシンタックスにコピー&ペーストするだけでOKです。

create or replace procedure processSales() returns string
language scala
runtime_version=2.12
packages=('com.snowflake:snowpark:latest')
handler = 'Main.processSales'
target_path = '@jars/processSales.jar'
as
$$
import com.snowflake.snowpark._
import com.snowflake.snowpark.functions._

object Main {
  def main(args: Array[String]): Unit = {
    // Connect
    val sess = Session.builder.configFile("config_demo.prop").create

    // run the pipeline
    println(processSales(sess))
  }

  def processSales(sess: Session): String = {
    import sess.implicits._

    // UDF to bucket sales by amount
    val qualifySale = udf((price:Float) => 
      if (price < 0.00) "ERROR"
      else if (price < 100.00) "Small"
      else if (price < 10000.00) "Medium"
      else "Large"
    )

    // Create and run the pipeline
    sess.table("raw_sales")
        .withColumn("size", qualifySale('sale_amount))
        .join(sess.table("catalog"), 'item_id === 'item)
        .drop('item)
        .write.saveAsTable("processed_sales")

    return "Success!"
  }
}
$$;

今回の例では、2つの点に注目してください。1つ目はプロシージャのハンドラーとしてprocessSales() を指定している点です。main()ルーチンは実際には使用されておらず、ストアドプロシージャから完全に削除することが可能です。2つ目はScala processSales()はパラメータとしてSessionをとる一方、SQLに呼び出されるプロシージャではパラメータをとらないという点です。

これは一般的なパターンです。Snowflakeは常に、ハンドラーへの呼び出しの最初のパラメータとしてSessionオブジェクトをインジェクトします。このSessionはストアドプロシージャが呼び出されたセッション上にルートバックします。これにより、Snowflakeに戻るシンプルなリンクを提供するだけでなく、Snowflake外部での実行とテストが可能であり、Snowflakeに戻すことでストアドプロシージャ内で使用できる今回の例のようなコードの構築の際非常に便利です。

ストアドプロシージャを使用可能にして、以下のように呼び出します。

call processed_sales();

次にtaskでスケジューリングを行います。

create or replace task process_sales_nightly
warehouse = 'small'
schedule = '1440 minute'
as
call processed_sales();

Snowflake内でホストすることでパイプラインの簡素化を実現するシンプルな機能。つまり、これがSnowparkストアドプロシージャです。Snowparkクイックスタートを使用してまずは試してみましょう。詳しくはドキュメンテーションをご覧ください。

以上、参考になれば幸いです。