기쁘게도 Python용 Snowpark가 이제 모든 Snowflake 고객에게 공개 미리 보기로 제공됩니다. Python 개발자는 이제 Snowpark의 엔진에 기본적으로 통합된 Java 및 Scala를 포함해, 널리 사용되는 다른 Snowpark 언어들과 동일한 수준의 사용 편의성, 성능, 보안 이점을 누릴 수 있습니다. 

Snowpark Python API 및 Python Scalar 사용자 정의 함수(UDF)에 더해, 공개 미리 보기 제공의 일부로 Python UDF Batch API(벡터화된 UDF), 테이블 함수(UDTF), 저장 프로시저에 대한 지원을 도입하게 되어 매우 기쁩니다. Anaconda 통합과 결합된 이러한 기능은 데이터 과학자, 데이터 엔지니어, 개발자로 이루어진 발전하는 Python 커뮤니티에 다양하고 유연한 프로그래밍 계약과 오픈 소스 Python 패키지에 대한 손쉬운 액세스를 제공합니다. 그 덕분에 안전하고 확장 가능한 데이터 파이프라인 및 머신 러닝(ML) 워크플로우를 Snowflake 내에서 직접 구축할 수 있습니다.

기능에 대한 간단한 개요

  1. Snowpark Python API로 익숙한 구문을 사용해 데이터 쿼리 및 처리

    Snowpark는 개발자가 사용하길 원하는 언어에 긴밀히 통합된 DataFrame 스타일 프로그래밍을 제공하기 위해 사용되는 새로운 개발자 환경입니다.

    개발자는 Snowpark Python 클라이언트를 설치할 수 있고, 이를 SQL 문자열의 생성 및 전달 없이도 DataFrame을 사용해 쿼리를 구축하기 위해 원하는 IDE 및 개발 도구와 함께 사용할 수 있습니다.
from snowflake.snowpark import Session
from snowflake.snowpark.functions import col

# fetch snowflake connection information
from config import connection_parameters

# build connection to Snowflake
session = Session.builder.configs(connection_parameters).create()

# use Snowpark API to aggregate book sales by year
booksales_df = session.table("sales")
booksales_by_year_df = booksales_df.groupBy(year("sold_time_stamp")).agg([(col("qty"),"count")]).sort("count", ascending=False)
booksales_by_year_df.show()

내부적으로 DataFrame 작업은 푸시다운되는 동시에 이미 우리가 아는 고성능에 확장 가능한 그 동일한 Snowflake 엔진의 이점을 활용하는 SQL 쿼리로 투명하게 변환됩니다! 또한 API는 최고 수준의 언어 구조를 사용하기 때문에 개발 환경에서 유형 검사, IntelliSense, 오류 보고에 대한 지원도 받을 수 있습니다. 이번 공개 미리 보기에 대한 발표와 함께, Snowpark Python 클라이언트 API를 오픈 소싱한다는 점도 전해 드리게 되어 정말 기쁩니다. 당사는 API를 확장하고 궁극적으로 더 나은 API를 제공하는 데 Python 커뮤니티가 큰 역할을 해 주리라 기대합니다. Github 리포지토리를 통해 더 자세히 알아보고, 여러분도 기여해 보시기를 권합니다.

2. Anaconda 통합을 통해 인기 있는 오픈 소스 패키지에 원활하게 액세스

Snowpark를 사용하면 DataFrame 작업을 편리하게 작성할 수 있지만, 이는 단지 쿼리를 작성하는 좋은 방법에 불과한 것이 아닙니다. 사용자 지정 Python 로직을 UDF로 가져와(자세한 내용은 아래 참조), Snowflake에 미리 설치된 인기 있는 오픈 소스 패키지를 활용하는 것 또한 가능합니다.

Python의 힘은 오픈 소스 패키지의 풍부한 생태계에 있기 때문에, Anaconda 통합을 통해 데이터 클라우드에 원활한 엔터프라이즈급 오픈 소스 혁신을 Python용 Snowpark 제품의 일부로 제공하게 되어 정말 기쁘게 생각합니다. Anaconda의 포괄적인 오픈 소스 패키지 세트 및 원활한 종속성 관리를 통해, Python 기반 워크플로우의 속도를 높일 수 있습니다.

Python 워크플로우를 구축하기 위해 Anaconda에서 제공하는 타사 라이브러리를 어떻게 활용할 수 있는지 소개해 드리겠습니다.

  • 로컬 개발: Conda 리포지토리에서 호스팅되는 Snowflake 채널에서도 Snowflake 환경에서 사전 구축된 동일한 라이브러리 및 버전 세트를 사용할 수 있습니다. 로컬에서 개발할 때 로컬 Anaconda 설치를 Snowflake 채널로 지정하기만 하면 Snowflake 서버 측에서 사용 가능한 것과 동일한 패키지 및 버전을 사용할 수 있습니다.
  • 패키지 관리: Python 함수가 타사 라이브러리에 의존하는 경우 UDF 선언 중에 필요한 Python 패키지를 지정하기만 하면 됩니다. Snowpark는 자동으로 서버 측의 종속성을 선택하고 해결하며 통합 Conda 패키지 관리자를 사용해 관련 패키지를 Python UDF 실행 환경에 설치합니다. 이를 통해 서로 다른 패키지 간의 종속성을 해결하는 데 더 이상 시간을 할애할 필요가 없어지고, 그 결과 엄청난 시간 낭비가 될 수 있는 ‘종속성 지옥’이 제거됩니다. 
  • 안전하고 확장 가능한 처리: 마지막으로 UDF가 실행되면 이는 Snowflake 컴퓨팅 전체에 분산되고 Snowflake 처리 엔진에서 제공하는 규모와 보안 처리로부터 이점을 얻게 됩니다. 

이 모든 것이 Snowflake의 표준 소비 기반 가격으로 제공되며, Snowflake와 함께 사용할 경우 어떠한 부가 기능 및 제품에 대해서도 추가 비용이 발생하지 않습니다!

“NumPy 및 SciPy와 같은 가장 인기 있는 Python 라이브러리를 사용하면 개발 프로세스에서 또 다른 관리 계층을 제거할 수 있습니다. 또 통합 Conda 패키지 관리자를 사용하면 종속성 관리에 대해서도 걱정할 필요가 없습니다.”

Jim Gradwell, HyperGroup, 데이터 엔지니어링 및 머신 러닝 책임자


HyperGroup의 블로그를 통해 HyperFinity가 Python용 Snowpark로 서버리스 아키텍처를 간소화하는 방법에 대해 더 자세히 알아보십시오!

3. Python 사용자 정의 함수로 사용자 지정 코드 가져오기 및 실행

위에서 언급한 바와 같이 Snowpark는 사용자 지정 Python 로직을 Snowflake에 푸시할 수 있고, 그것을 데이터 바로 옆에서 실행할 수 있습니다. 이는 Snowflake의 컴퓨팅 리소스 내부에 호스팅되는 안전한 샌드박스 처리된 Python 런타임을 통해 가능합니다. 함수가 UDF임을 선언하고 코드가 종속된 타사 패키지를 지정하기만 하면 됩니다.

#Given geo-coordinates, UDF to calculate distance between warehouse and shipping locations

from snowflake.snowpark.functions import udf
import geopandas as gpd
from shapely.geometry import Point

@udf(packages=['geopandas'])
def calculate_distance(lat1: float, long1: float, lat2: float, long2: float)-> float:
  points_df = gpd.GeoDataFrame({'geometry': [Point(long1, lat1), Point(long2, lat2)]}, crs='EPSG:4326').to_crs('EPSG:3310')
  return points_df.distance(points_df.shift()).iloc[1]

# Call function on dataframe containing location coordinates
distance_df = loc_df.select(loc_df.sale_id, loc_df.warehouse_address, loc_df.shipping_address, \
   calculate_distance(loc_df.warehouse_lat, loc_df.warehouse_lng, loc_df.shipping_lat, loc_df.shipping_lng) \
   .alias('warehouse_to_shipping_distance'))

Snowpark는 사용자 지정 코드를 Python 바이트 코드로 직렬화하고, 필요한 종속성을 해결 및 설치한 다음 모든 로직을 Snowflake로 푸시합니다. 따라서 데이터 바로 옆에 있는 보안 샌드박스에서 실행됩니다. DataFrame 작업에서 일반 Python 함수와 마찬가지로 UDF를 호출하고 사용할 수 있습니다.

SQL과 Python 통합

지금까지 살펴본 것처럼, Snowpark API에는 DataFrame 쿼리와 사용자 지정 로직 모두를 Snowflkae에 푸시할 수 있는 기능이 있습니다. 하지만 저희는 SQL 사용자도 Python 함수의 이점을 온전히 활용할 수 있게 하기를 원합니다. 이러한 함수를 구축하려면 기본적인 작업을 수행할 때 코드를 간단한 인라인 정의로 작성하고 Snowflake 워크시트에서 직접 SQL을 사용해 UDF를 등록합니다. 

보다 복잡한 사용 사례의 경우에는 소스 제어, 개발 환경, 디버깅 도구와 같은 기존에 있는 도구 세트를 최대한 활용하고 로컬에서 개발 및 테스트할 수 있습니다. 그런 다음 해당 코드를 Snowflake로 가져올 수 있습니다. 코드를 SQL로 가져오려면 코드를 zip 파일로 패키징하고, Snowflake 단계에 로드하고, Python 함수로 등록할 때 가져오기로 지정하기만 하면 됩니다. Python UDF가 생성되면 모든 SQL 사용자는 다른 함수와 마찬가지로 로직을 일반 쿼리의 일부로 사용할 수 있습니다. 또한 스트림 및 작업을 사용해 ELT, ML 및 CI/CD 워크플로우를 자동화할 수 있는데, Python 함수는 다른 함수가 사용되는 거의 모든 곳에서 사용할 수 있기 때문입니다. 

기타 UDF 계약


이전 예에서는 Scalar UDF를 사용했는데, 이 UDF는 각 행에서 개별적으로 작동하고 단일 결과를 생성합니다. Scalar 함수는 작성하기가 쉽고 다양한 범위의 문제를 해결할 수 있지만 Scalar 계약이 부족한 상황에서는 추가적인 UDF 계약이 유용할 수 있습니다.
Python UDF Batch API를 사용하면, Pandas DataFrame으로 입력 행의 배치를 수신하고 Pandas 배열 또는 시리즈로 결과 배치를 반환하는 Python 함수를 정의할 수 있습니다. Batch API는 Scalar UDF를 사용한 행별 처리 패턴에 비해 더 나은 성능을 제공할 수 있는데, 특히 Python 코드가 행 배치에서 효율적으로 작동하는 경우에 그러합니다. Batch API를 사용하면 성능 개선에 더해 Pandas DataFrames 또는 Pandas 배열에서 작동하는 라이브러리를 호출할 때 필요한 변환 로직이 줄어듭니다. 그러한 예로 Batch API(스테이지에서 사전 학습된 XGBoost 모델 로드)를 사용하는 아래에 정의된 추론 UDF는 수백만 개의 행이 있는 대규모 테이블을 스코어링할 때 상당한 성능 향상을 보입니다.

–- Using SQL to register a UDF to predict customer propensity score based on historical sales data
create or replace function score_vec(category string, region_code string)
  returns double
  language python
  runtime_version = '3.8'
  imports=('@ml_demo/model/model_0_0_0.csv')
  packages = ('numpy','pandas','xgboost','scikit-learn')
  handler = 'score'
as $$
import pandas as pd
import xgboost as xgb
import sys, pickle, codecs
from _snowflake import vectorized

import_dir = sys._xoptions["snowflake_import_directory"]
with open(import_dir + 'model_0_0_0.csv', 'r') as file:
  file_contents = file.read()
pickle_jar = pickle.loads(codecs.decode(file_contents.encode(), "base64"))
bst = pickle_jar[0]
ohe = pickle_jar[1]
bst.set_param('nthread', 1)

@vectorized(input=pd.DataFrame)
def score(features):
  dscore = xgb.DMatrix(ohe.transform(features).toarray())
  return bst.predict(dscore)
$$;

-- Scoring 20M rows using the Batch API takes about ~8 sec vs ~5 mins for a Scalar UDF on an XL warehouse
select score_vec(category, region_code) from wholesale_customer_sales

Python 테이블 함수는 각 입력 행에 대해 여러 행을 반환하고, 행 그룹에 대한 단일 결과를 반환하고, Scalar 또는 Batch API로 수행할 수 없는 여러 행에 걸쳐 상태를 유지할 수 있게 해 줍니다. 다음은 UDTF를 사용하는 방법과 개체명 인식을 수행하기 위해 spaCy 라이브러리를 활용하는 방법의 간단한 예입니다. UDTF는 문자열을 입력으로 받아들이고 인식된 개체 및 해당 레이블의 테이블을 반환합니다.

–-UDTF that uses Spacy for named entity recognition

create or replace function entities(input string)
  returns table(entity string, label string)
  language python
  runtime_version = 3.8
  handler = 'Entities'
  packages = ('spacy==2.3.5')
  imports = ('@spacy_stage/spacy_en_core_web_sm.zip')
as $$
import zipfile
import spacy
import os
import sys

import_dir = sys._xoptions["snowflake_import_directory"]
extracted = '/tmp/en_core_web_sm' + str(os.getpid())
with zipfile.ZipFile(import_dir + "spacy_en_core_web_sm.zip", 'r') as zip_ref:
    zip_ref.extractall(extracted)

nlp = spacy.load(extracted + "/en_core_web_sm/en_core_web_sm-2.3.1")

class Entities:
  def process(self, input):
    doc = nlp(input)
    for ent in doc.ents:
      yield (ent.text, ent.label_)
$$;
select * from table(entities('Hi this is Ryan from Colorado, calling about my missing book from order no 2689.'));

“고객 환경을 보호하기 위해서는 계속해서 혁신해야 합니다. Sophos는 머신 러닝 모델이 구축된 수십 개의 사이버 보안 솔루션을 보유하고 있고 항상 추가 모델을 출시하고 있습니다. Python용 Snowpark를 사용하면 초기 파이프라인에서 생산 중인 모델 추론에 이르기까지 머신 러닝 모델의 개발 및 운영을 획기적으로 간소화하고 성장시킬 수 있습니다.”

Konstantin Berlin, Sophos AI, 선임 이사

4. 저장 프로시저를 사용해 Snowflake에 코드를 직접 호스팅 및 실행

이전 예에서 살펴본 것처럼, Snowpark Python API는 DataFrame과 같은 강력한 추상화를 사용해 데이터 파이프라인을 구축 및 실행하는 클라이언트 측 프로그램을 작성할 수 있게 합니다. 하지만 파이프라인을 구축하고 테스트했다면 다음 단계는 파이프라인을 실행하는 것입니다. 그러기 위해서는 호스팅할 홈을 찾고 해당 클라이언트 프로그램을 예약해야 합니다. Snowpark Python 저장 프로시저를 사용하면 바로 그 작업을 수행할 수 있습니다! Snowflake 가상 웨어하우스를 컴퓨팅 프레임워크로 사용함으로써 Python 파이프라인을 Snowflake 내에 바로 호스팅할 수 있고, 스케줄링 작업과 같은 Snowflake 기능과 통합할 수 있습니다. 이렇게 하면 관련된 시스템의 수를 줄이고 모든 것을 Snowflake에 포함된 모든 것을 독립적으로 관리할 수 있고 엔드 투 엔드 스토리를 단순화합니다.

그리고 가장 좋은 점은 Snowpark 저장 프로시저가 매우 강력하면서도 사용하기에 아주 편리하다는 것입니다. 다음은 일 단위로 판매 보너스를 계산하고 적용하는 Snowpark Python 파이프라인을 실행하는 간단한 예입니다.

-- Create python stored procedure to host and run the snowpark pipeline to calculate and apply bonuses
create or replace procedure apply_bonuses(sales_table string, bonus_table string)
  returns string
  language python
  runtime_version = '3.8'
  packages = ('snowflake-snowpark-python')
  handler = 'apply_bonuses'
AS
$$
from snowflake.snowpark.functions import udf, col
from snowflake.snowpark.types import *

def apply_bonuses(session, sales_table, bonus_table):
  session.table(sales_table).select(col("rep_id"), col("sales_amount")*0.1).write.save_as_table(bonus_table)
  return "SUCCESS"
$$;

--Call stored procedure to apply bonuses
call apply_bonuses('wholesale_sales','bonuses');

– Query bonuses table to see newly applied bonuses
select * from bonuses;

– Create a task to run the pipeline on a daily basis
create or replace task bonus_task
warehouse = 'xs'
schedule = '1440 minuite'
as
call apply_bonuses('wholesale_sales','bonuses');

“Python용 Snowpark은 Allegis가 아키텍처를 간소화하는 동시에 ML 기반 솔루션을 시장에 더 빨리 출시하는 데 도움이 됩니다. 데이터 과학자는 저장 프로시저와 미리 설치된 패키지를 사용해 Python 코드를 데이터에 더 가깝게 실행함으로써 Snowflake의 탄력적인 성능 엔진의 이점을 누릴 수 있습니다.”

Joe Nolte, Allegis, AI 및 MDM 도메인 아키텍트

Snowpark 가속화 프로그램

또한 저희는 파트너 생태계에서 통합 구축에 큰 관심을 두는 것을 보게 되어 정말 기쁩니다. 그러한 통합 구축은 Snowflake를 기반으로 Python용 Snowpark를 활용해 고객 경험을 향상합니다.

이 출시 시점에 Snowpark 가속화 프로그램에는 수십 개의 파트너들이 있으며, 이들은 이미 데이터 클라우드에서 Python의 기능을 고객에게 확장하는 제품 통합 및 전문 지식을 구축했습니다. 

다음 단계는?

당사는 또한 Summit의 오프닝 기조연설에서 여러 가지 흥미로운 기능을 발표했는데, 더 자세한 내용은 여기에서 읽어 볼 수 있습니다. 그러한 기능들을 통해 ML 학습을 위한 더 큰 메모리 웨어하우스, Snowflake 내의 기본 Streamlit 통합, 데이터 클라우드에서 애플리케이션을 구축 및 배포하기 위한 네이티브 애플리케이션 프레임워크를 포함한 Python용 Snowpark로 더 많은 작업을 수행할 수 있습니다. 

Snowpark를 통한 더 빠르고 스마트한 작업

Snowpark는 무엇보다도 Snowflake 플랫폼의 단순성, 확장성, 보안을 유지하면서 데이터로 매우 영향력 있는 작업을 쉽게 수행하기 위한 것입니다. 고객이 Python용 Snowpark으로 무엇을 구축할지 굉장히 기대가 됩니다. 

시작하는 데 도움을 얻으려면 다음의 자료를 참조하십시오.

Python용 Snowpark가 실제로 어떻게 작동하는지 보고 싶다면 Snowpark Day에 등록하세요.

해피 해킹!

Snowpark에서 최신 정보 보기

미래 전망 진술

이 포스트에는 명시적 및 묵시적인 미래 전망 진술이 포함되어 있습니다. 여기에는 (i) Snowflake의 비즈니스 전략, (ii) 개발 중이거나 일반적으로 사용할 수 없는 제품을 포함한 Snowflake 제품, 서비스, 기술 제품, (iii) 시장 성장, 추세 및 경쟁 고려 사항, (iv) 타사 플랫폼과 Snowflake 제품의 통합 및 상호 운용성, 타사 플랫폼에서의 Snowflake 제품 가용성과 관련된 진술이 포함됩니다. 이 미래 전망 진술은 ‘위험 요인’이라는 제목 아래 기술된 내용, Snowflake가 증권 거래 위원회에 제출하는 양식 10-Q의 분기별 보고서와 양식 10-K의 연간 보고서에 기술된 내용을 포함하여 수많은 위험, 불확실성 및 가정의 영향을 받습니다. 이러한 위험, 불확실성 및 가정에 비추어, 실제 결과는 미래 전망 진술에서 예상했거나 암시한 내용과 달라 실질적이고 불리한 영향을 미칠 수 있습니다. 그러므로, 미래 사건에 대한 예측으로서 작성된 미래 예측 진술에 의존해서는 안 됩니다. 

© 2022 Snowflake Inc. All rights reserved. 여기에 언급된 Snowflake, Snowflake 로고 및 기타 모든 Snowflake 제품, 기능 및 서비스 이름은 미국 및 기타 국가에서 Snowflake Inc.의 등록 상표 또는 상표입니다. 여기에 언급되거나 사용된 기타 모든 브랜드 이름 또는 로고는 식별 목적으로만 사용되며 해당 소유자의 상표일 수 있습니다. Snowflake는 그러한 소유자와 연관되거나 그러한 소유자로부터 후원 또는 보증을 받지 않습니다.

참고: 이 내용은 2022. 6. 14에 게시된 컨텐츠(Snowpark Python: Bringing Enterprise-Grade Python Innovation to the Data Cloud)에서 번역되었습니다.