참고: 이 내용은 2021.10.19 에 게시된 컨텐츠(Bringing More to the Table: Azure and UDTF Support with Snowpark)에서 번역되었습니다.

6월에 우리는 JAVA 함수와 Snowpark API를 AWS의 미리 보기용으로 사용할 수 있음을 발표했습니다. 오늘 우리는 그 미리 보기에 대한 몇 가지 추가 사항, 즉 Snowpark를 사용할 수 있는 곳과 Snowpark로 할 수 있는 작업 모두를 확장하고 있음을 발표합니다.

새로운 클라우드 공급자

Snowflake에서 우리는 운영하는 모든 클라우드와 지역에서 동일하고 훌륭한 지원을 제공하는 것을 목표로 합니다. 클라우드 공급자는 서로 상당히 다를 수 있지만 그것은 우리의 문제여야지 여러분의 문제가 되어서는 안 됩니다. 

오늘, Java 함수와 Snowpark API의 미리 보기가 이제 모든 Azure에서 제공된다는, 그 목표를 향한 또 다른 단계를 발표하게 되어 기쁩니다.

그리고 중요한 소식은 더 이상 할 말이 없다는 것입니다. 이러한 기능이 두 클라우드 모두에서 동일하게 작동하므로 한 번 개발해서 원하는 위치에 배포할 수 있습니다. GCP(Google Cloud Platform)를 사용하고 계신다면, 여러분을 잊지 않았습니다. 곧 동일한 지원이 제공될 예정입니다.

테이블 함수

또한 Java 함수 지원이 크게 확장되었음을 발표합니다. 테이블 함수의 미리 보기가 이제 AWS와 Azure 모두에서 제공됩니다.

테이블 함수를 이해하기 위해 잠시 뒤로 가보겠습니다. 이전에는 스칼라 함수만 지원했습니다. 즉, 각 행에서 개별적으로 작동하고 단일(복잡할 수 있음) 결과를 생성하는 함수입니다. 스칼라 함수는 훌륭합니다. 작성하기 쉽고 Snowflake가 쿼리의 일부로 확장할 수 있으며 광범위한 문제를 해결합니다.

하지만 스칼라 함수로 할 수 없는 일이 있습니다. 다음 중 하나를 수행해야 하는 경우 막히게 됩니다.

  • 각 입력 행에 대해 여러 행 반환
  • 여러 행에서 상태 유지
  • 행 그룹에 대한 단일 결과 반환

스칼라 함수로는 이러한 작업을 수행할 수 없지만 테이블 함수에서는 이 모든 것이 가능합니다. 단점은 작성 및 사용이 약간 더 번거롭다는 것입니다. 

한 번 보시죠.

예: 단어 수

행 모음에서 각 단어가 몇 번 발생하는지 계산하려 한다고 가정해 보겠습니다. 예를 들어 다음과 같은 책에 대한 데이터가 있다고 가정합니다.

제목저자첫_줄
모비 딕허먼 멜빌나를 이슈마엘이라 불러달라.
선원, 빌리 버드허먼 멜빌증기선이 생기기 전에는 지금보다 더 자주, 어떤 유명한 항구의 부두를 따라 유모차가 가다가 이따금 자유를 위해 해변에 상륙하던, 휴가 복장을 한 구릿빛으로 그을린 선원들, 전쟁 참전 용사들이나 상인 선원들에 주의를 사로잡히곤 했다.
중력의 무지개토머스 핀천절규는 하늘을 가로질러온다.
고유의 결함(Inherent Vice)토머스 핀천그녀는 늘 하던 대로 골목길을 따라 뒤 계단을 올라갔다.
젊은 예술가의 초상제임스 조이스옛날 옛적, 아주 살기 좋았던 시절에 길을 따라 내려오는 음매 소가 있었지. 그런데 길을 따라 내려오고 있던 이 음매 소는 베이비 터쿠라는 이름을 가진 어여쁜 사내아이를 만났어.
율리시스제임스 조이스몸집이 당당한 벅 멀리건이 비누거품이 이는 그릇을 들고 계단 꼭대기에서 나타났다. 그릇 위에는 거울과 면도날이 십자로 포개져 놓여 있었다.

우리는 이 줄에 있는 단어의 빈도를 분석하는 데 관심이 있습니다. 행별로 이 작업을 수행하려는 경우 갖가지 사전을 사용하여 그 개수를 반환하는 스칼라 Java 함수로 그렇게 할 수 있습니다.

그러나 우리는 여러 행에 걸쳐(예: 한 번에 각 저자에 대한) 그 개수를 구하고 싶었습니다. 그러기 위해서는 테이블 함수가 필요합니다. 테이블 함수를 구축하기 위해서는 다음을 드러내는 클래스를 제공해야 합니다.

  • 시스템이 테이블 함수의 스키마를 검색하는 데 사용할 수 있는 정적 getOutputClass() 메서드. (Java의 유형 삭제때문에 이것이 필요합니다. 즉, Stream<OutputRow>는 효과적으로 Stream<Object>로 컴파일되지만 이건 따로 할 이야기입니다.) 
  • 시스템이 쿼리의 각 파티션에 대해 호출하는 기본 생성자. 핸들러 메서드가 병렬로 호출될 수 있으므로 클래스 상태를 수정해서는 안 되는 스칼라 함수와 달리 테이블 함수에 대한 process() 메서드는 파티션의 각 행에 대해 순차적으로 호출되며 상태를 축적할 수 있습니다.
  • 단일 행을 사용하는 process() 메서드는 필요한 모든 작업을 수행하고 시스템이 입력 행과 연관시킬 잠재적으로 빈 결과 Stream을 반환합니다.
  • 파티션의 끝에서 호출되는 endPartition() 메서드는 파티션의 특정 행보다는 시스템이 전체 파티션과 연결하는 행의 Stream을 반환할 수 있습니다.

우리의 예시에 이를 합쳐 봅시다.

코드

먼저 일부 Java 프레이밍과 우리의 출력 유형인 OutputRow의 정의부터 살펴보겠습니다.

import java.util.HashMap;
import java.util.Map;
import java.util.stream.Stream;
 
public class WordCount {
 // A class to define the schema of our output: pairings of words
 // and counts.
 static class OutputRow {
   public final String word;
   public final int count;
 
   public OutputRow(String word, int count) {
     this.word = word;
     this.count = count;
   }
 }
 
 // The getOutputClass routine is what actually sets this class as
 // the return type for the table function.
 public static Class getOutputClass() {
   return OutputRow.class;
 }

출력 테이블에는 OutputRow 클래스에 의해 정의된 두 개의 열(단어 및 발생 횟수)이 있습니다. 

계속해서, 우리에게는 카운트 상태를 유지하는 Map, wordCounts와 각 파티션에 대해 클린 카운트를 초기화하는 생성자가 있습니다.

 // Keeps partition-wide counts for each word seen.
 private final Map wordCounts;
 
 // Each partition will have a new instance of WordCount
 // constructed, and we are allowed to maintain state over the
 // partition. In this case, we're going to start the partition with
 // an empty map, and keep running counts as we go.
 public WordCount() {
   wordCounts = new HashMap<>();
 }

우리의 process 메서드는 문자열을 가져와 구성 단어로 나누고 찾은 단어로 상태를 업데이트합니다.

 // The process method is called on each record. In this case, we'll
 // split up the line and add the words to our partition-wide count.
 // This method could return per-row values if we wished, but in our
 // case, we'll just return an empty Stream because our results are
 // really for the partition as a whole.
 public Stream process(String text) {
   // Update the counts with the words in this line.
   for (String word : text.toLowerCase().split("[.,!\"\\s]+")) {
     // If we don't have an entry for the word, set the count to 1.
     // Otherwise, increment the count.
     wordCounts.compute(word, (key, value) ->
       (value == null) ? 1 : value + 1);
   }
 
   // We're waiting until the end of the partition to return per-
   // word counts and return nothing here.
   return Stream.empty();
 }

한 줄을 처리할 때 실제로 어떤 행을 반환하는 것은 아니라는 점에 유의하십시오. 이 예에서는 파티션당 개수만 계산합니다. 다음과 같이 해봅시다.

 // The endPartition routine is called at the end of the partition.
 // In our case, this will return the total counts across all lines
 // in the input.
 public Stream endPartition() {
   // Stream back the word counts for the whole partition. Calling
   // stream() on the keySet enables us to iterate lazily over the
   // contents of the map.
   return wordCounts.keySet().stream().map(word ->
     new OutputRow(word, wordCounts.get(word)));
 }
}

바로 그겁니다. 우리는 이것을 병에 담아 무대에 올리고 거기에서 등록할 수 있습니다. 또는 전체 함수 등록을 다음과 같이 인라인으로 작성할 수도 있습니다.

create or replace function wordcount(s string)
returns table(word string, count int)
language java
handler = 'WordCount'
as
$$
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Stream;
 
public class WordCount {
 // A class to define the schema of our output: pairings of words
 // and counts.
 static class OutputRow {
   public final String word;
   public final int count;
 
   public OutputRow(String word, int count) {
     this.word = word;
     this.count = count;
   }
 }
 
 // The getOutputClass routine is what actually sets this class as
 // the return type for the table function.
 public static Class getOutputClass() {
   return OutputRow.class;
 }
 
 // Keeps partition-wide counts for each word seen.
 private final Map wordCounts;
 
 // Each partition will have a new instance of WordCount
 // constructed, and we are allowed to maintain state over the
 // partition. In this case, we're going to start the partition with
 // an empty map, and keep running counts as we go.
 public WordCount() {
   wordCounts = new HashMap<>();
 }
 
 // The process method is called on each record. In this case, we'll
 // split up the line and add the words to our partition-wide count.
 // This method could return per-row values if we wished, but in our
 // case, we'll just return an empty Stream because our results are
 // really for the partition as a whole.
 public Stream process(String text) {
   // Update the counts with the words in this line.
   for (String word : text.toLowerCase().split("[.,!\"\\s]+")) {
     // If we don't have an entry for the word, set the count to 1.
     // Otherwise, increment the count.
     wordCounts.compute(word, (key, value) ->
       (value == null) ? 1 : value + 1);
   }
 
   // We're waiting until the end of the partition to return per-
   // word counts and return nothing here.
   return Stream.empty();
 }
 
 // The endPartition routine is called at the end of the partition.
 // In our case, this will return the total counts across all lines
 // in the input.
 public Stream endPartition() {
   // Stream back the word counts for the whole partition. Calling
   // stream() on the keySet enables us to iterate lazily over the
   // contents of the map.
   return wordCounts.keySet().stream().map(word ->
     new OutputRow(word, wordCounts.get(word)));
 }
}
$$;

이를 통해 다음과 같이 쿼리에서 우리의 새 테이블 함수를 사용할 수 있습니다.

select author, word, count
from books,
   table(wordcount(books.first_line) over (partition by author))
order by count desc;

그러면 우리는 카운트를 얻게 됩니다.

잘하셨습니다! 정지 단어를 처리하면서 이를 개선할 수 있고, 더 큰 텍스트 말뭉치를 분석함으로써 더 나은 통찰력을 얻게 될 것입니다.

결론

Snowpark의 핵심은 Snowflake에서 기대하는 단순성, 확장성 및 보안을 유지하면서 데이터로 흥미로운 작업을 쉽게 수행할 수 있도록 하는 것입니다. 이러한 확장을 통해 Snowpark를 잘 사용할 수 있는 더 많은 기회가 있기를 바랍니다. 그리고 언제나처럼, 우리는 여러분이 생각해낸 흥미진진한 것들에 대한 소식을 들을 수 있기를 기대합니다.

Java 테이블 함수를 시작하려면 설명서를 확인하십시오. Java 함수Snowpark API에 대한 설명서도 놓치지 마십시오.

해피 해킹!