참고: 이 내용은 2022. 2. 17에 게시된 컨텐츠(Operationalizing Data Pipelines With Snowpark Stored Procedures, Now in Preview)에서 번역되었습니다.

AWS 고객용 Snowpark GA의 최근 출시에 뒤이어, 이제 모든 고객이 모든 클라우드에서 Snowpark Scala 저장 프로시저를 미리 보기를 통해 살펴볼 수 있게 되었음을 발표하게 되어 기쁩니다.

Snowpark는 DataFrames와 같이 강력한 추상화를 사용하여 데이터 파이프라인을 구축하고 실행하기 위해 언어가 통합된 방식을 제공합니다. Snowpark를 통해, 실행하고자 하는 파이프라인을 설명하는 클라이언트 측 프로그램을 작성할 수 있습니다. 또한 모든 무거운 작업은 Snowflake의 탄력적 컴퓨팅 엔진에서 수행합니다.

하지만 파이프라인을 구축하고 테스트를 완료했다면 다음 단계는 이를 운영하는 것입니다. 운영하려면 클라이언트 프로그램을 호스팅하고 예약하는 홈을 찾아야 합니다. 또한 지금까지 이러한 홈은 Snowflake 외부에 있어야 했습니다.

Snowpark 저장 프로시저로 인해 이러한 부분이 완전히 변했습니다. 이제 Snowflake 가상 웨어하우스를 컴퓨팅 프레임워크로 사용하여 파이프라인을 Snowflake 내부에서 바로 호스팅할 수 있습니다. 또한 예약을 위해 작업과 같은 Snowflake 기능과 자연스럽게 통합됩니다. 이는 관련된 시스템의 수를 줄이고 Snowflake 내부에 모든 것을 저장하여 엔드 투 엔드 스토리를 단순화합니다.

가장 좋은 점은 Snowpark 저장 프로시저가 매우 강력한 동시에 사용이 아주 쉽다는 것입니다.

예시를 살펴봅시다. Snowpark에서 엄청나게 복잡한 프로세스를 구축할 수 있지만 저장 프로시저에 집중하기 위해 단순한 예시를 들어 보겠습니다.

import com.snowflake.snowpark._
import com.snowflake.snowpark.functions._
 
object Main {
 def main(args: Array[String]): Unit = {
   // Connect
   val sess = Session.builder.configFile("config_demo.prop").create
 
   // run the pipeline
   println(processSales(sess))
 }
 
 def processSales(sess: Session): String = {
   import sess.implicits._
 
   // UDF to bucket sales by amount
   val qualifySale = udf((price:Float) =>
     if (price < 0.00) "ERROR"
     else if (price < 100.00) "Small"
     else if (price < 10000.00) "Medium"
     else "Large"
   )
 
   // Create and run the pipeline
   sess.table("raw_sales")
       .withColumn("size", qualifySale('sale_amount))
       .join(sess.table("catalog"), 'item_id === 'item)
       .drop('item)
       .write.saveAsTable("processed_sales")
 
   return "Success!"
 }
}

저희는 파이프라인을 두 부분으로 나눴습니다. 아주 간단한 main() 루틴은 파일에 저장된 구성으로부터 세션을 생성한 다음 파이프라인을 실질적으로 구현하는 processSales() 함수로 호출됩니다. 잠시 후에 이 부분을 다시 다루겠습니다.

processSales() 함수 또한 꽤나 단순합니다. 이 함수는 단순한 UDF를 생성하여 금액으로 영업을 충족합니다. 그런 다음 이것과 카탈로그와의 결합을 사용하여 어떠한 원시 영업을 처리된 테이블로 변환합니다. 예를 들어 다음을 변환합니다.

변환 결과:

하지만 이제 이 파이프라인을 운영하고 매일 밤 실행하고 싶습니다. 이를 위해 먼저 Snowpark 저장 프로시저를 생성합니다. 복잡한 코드가 있거나 라이브러리도 함께 가져오고 싶다면 JAR에서 이를 생성할 수 있습니다. 그러나 이와 같은 단순한 예시에서는 위에 있는 코드를 단순히 복사하여 단순한 인라인 SQL 구문에 붙여넣겠습니다.

create or replace procedure processSales() returns string
language scala
runtime_version=2.12
packages=('com.snowflake:snowpark:latest')
handler = 'Main.processSales'
target_path = '@jars/processSales.jar'
as
$$
import com.snowflake.snowpark._
import com.snowflake.snowpark.functions._

object Main {
  def main(args: Array[String]): Unit = {
    // Connect
    val sess = Session.builder.configFile("config_demo.prop").create

    // run the pipeline
    println(processSales(sess))
  }

  def processSales(sess: Session): String = {
    import sess.implicits._

    // UDF to bucket sales by amount
    val qualifySale = udf((price:Float) => 
      if (price < 0.00) "ERROR"
      else if (price < 100.00) "Small"
      else if (price < 10000.00) "Medium"
      else "Large"
    )

    // Create and run the pipeline
    sess.table("raw_sales")
        .withColumn("size", qualifySale('sale_amount))
        .join(sess.table("catalog"), 'item_id === 'item)
        .drop('item)
        .write.saveAsTable("processed_sales")

    return "Success!"
  }
}
$$;

이 예시에서 두 가지에 집중하시기 바랍니다. 먼저 이 프로시저를 위해 processSales()를 핸들러로 지정했습니다. main() 루틴이 실질적으로 사용되지 않으며 저장 프로시저에서 완전히 삭제할 수 있습니다. 두 번째로 Scala processSales()는 매개변수 또는 세션을 사용합니다. 반면 SQL에 노출된 프로시저는 매개변수를 사용하지 않습니다.

이는 일반적인 패턴입니다. Snowflake는 언제나 세션 객체를 여러분의 핸들러를 호출할 때 첫 번째 매개변수로 삽입합니다. 이 세션은 저장 프로시저가 호출된 세션으로 다시 라우팅됩니다. 이는 Snowflake로 되돌아가는 단순한 링크를 제공합니다. 또한 저희 예시에서처럼 Snowflake 외부에서 실행 및 테스트할 수 있는 코드를 쉽게 만들 수 있습니다. 그런 다음 저장 스토리지에서 사용하기 위해 Snowflake로 가져옵니다.

준비된 저장 프로시저로 다음을 호출할 수 있습니다.

call processed_sales();

또한 작업과 함께 이를 예약할 수 있습니다.

create or replace task process_sales_nightly
warehouse = 'small'
schedule = '1440 minute'
as
call processed_sales();

Snowpark 저장 프로시저에 대한 간략한 설명이 끝났습니다. 단순한 기능으로 Snowflake 내부에 바로 파이프라인을 호스팅하여 단순화할 수 있습니다. Snowpark 빠른 시작을 사용하여 테스트해 보시기 바랍니다. 자세한 내용은 설명서를 참조하십시오.

해피 해킹!