참고: 이 내용은 2021. 11. 23에 게시된 컨텐츠(Migrating Our Events Warehouse from Athena to Snowflake)에서 번역되었습니다.

Singular에는 전 세계 수백만 대의 모바일 장치에서 광고 조회, 광고 클릭 및 앱 설치에 관한 데이터를 수집하는 파이프라인이 있습니다. 이 엄청난 양의 데이터는 매시간 그리고 매일 집계됩니다. 우리는 다양한 마케팅 메트릭을 통해 이를 강화하고 고객에게 제공하여 캠페인의 성과를 분석하고 ROI를 확인할 수 있도록 지원합니다.

결과적으로 초당 수만 개의 이벤트를 수신하고 매일 수십 테라바이트의 데이터를 처리하여 수 페타바이트의 데이터 세트를 관리합니다.

이 방대하고 복잡한 데이터 웨어하우스를 Athena에서 Snowflake로 마이그레이션하는 것은 복잡한 프로세스였습니다. 이 게시물에서는 왜 우리가 이 어려운 단계를 밟기로 결정했는지, 어떻게 했는지, 그리고 그 과정에서 우리가 배운 몇 가지 교훈에 대해 논의하려고 합니다.

Snowflake 대 Athena

우리는 2018년에 Athena를 사용자 수준 이벤트 웨어하우스로 사용하기 시작했습니다. 그것은 훌륭한 솔루션처럼 보였는데, 컴퓨팅을 스토리지에서 분리하고, S3와 쉽게 통합하고, 적은 노력만 있으면 되기 때문입니다. 약간 마법 같은 느낌이었습니다.

우리는 Kinesis Streams를 통해 사용자 이벤트를 스트리밍하고 고객과 요일별로 분할된 데이터를 S3에 업로드했습니다. 파일은 Athena의 모범 사례를 사용하여 S3에 가능한 한 많이 저장했습니다.

그러나 프로덕션 1년 후, 문제가 발생하기 시작하여 다른 솔루션을 모색하게 되었습니다.

온디맨드 컴퓨팅 리소스의 부족

첫 번째 골치 아픈 문제는 크기 확장이 필요할 뿐만 아니라 온디맨드 식의 높은 컴퓨팅 파워가 필요하다는 점이었습니다. Athena는 다중 테넌트 서비스로, 쿼리가 받을 컴퓨팅 파워를 제어할 수 없습니다. 우리의 프로덕션 파이프라인은 시간당 약 4,000개의 쿼리를 실행하고 고객당 하루에 한 번 리소스 사용량이 많은 쿼리를 실행하기 때문에 컴퓨팅 리소스 부족으로 피크 시간에 장애가 발생했습니다.

불충분한 리소스 관리

사용량에 따라 컴퓨팅 리소스를 분할하고 제어할 수 없습니다. 이를테면 일부 데이터 세트를 다른 데이터 세트보다 빠르게 쿼리해야 하는 경우(시간별 쿼리 대 큰 일일 쿼리)말이죠. 우리는 고객에게 어느 정도 성능을 보장하고 싶었지만 컴퓨팅 리소스를 제어할 수 없었습니다.

즉각적인 클러스터링 부족

고객 수준 데이터를 S3에 업로드하기 전에 일정 시간(또는 크기별로) 버퍼링해야 했습니다. 버퍼링은 스트리밍 파이프라인에 대한 좋은 방법이 아니며 데이터 처리량이 많을 때는 제대로 수행되지 않았습니다. 버퍼링은 집계된 보고의 데이터 업데이트 주기에도 영향을 미쳤습니다.

특정 레코드의 지루한 삭제 및 업데이트

우리는 EU의 개인 정보 보호법(GDPR)을 준수하기 위해 최종 사용자가 특정 기록의 삭제를 요구할 때 이를 지원할 의무가 있습니다. Athena에서 파일의 특정 레코드를 삭제하는 것은 길고 복잡한 프로세스입니다.

왜 Snowflake일까요?

마이그레이션하기로 한 우리의 결정은 Snowflake가 설계상 다음과 같이 이러한 중요한 기능을 지원한다는 사실에 기반한 것입니다.

  • 웨어하우스의 사용량을 올바르게 계획하여 쿼리 컴퓨팅 파워를 제어할 수 있습니다. 다양한 인스턴스가 있는 여러 웨어하우스에 서로 다른 쿼리를 분할하여 필요한 곳에 더 많은 리소스를 부여할 수 있습니다.
  • 수집할 때 클러스터 키를 정의하고 그에 따라 데이터를 정렬하여 Snowflake의 마이크로 파티션에서 데이터를 클러스터링할 수 있습니다. 또한 자동 클러스터링을 사용하여 클러스터 키에 따라 마이크로 파티션을 최적화할 수도 있습니다. 쿼리 사용량에 따라 데이터를 현명하게 클러스터링하면 쿼리를 크게 최적화할 수 있습니다.
  • 주로 클러스터 키를 기반으로 하는 경우, Snowflake에서 삭제하는 것이 간단합니다. 특정 레코드를 업데이트하는 것은 여전히 비용이 많이 들지만 컴퓨팅 파워를 분리하고 필요에 따라 크기를 확장할 수 있기 때문에 Athena보다 훨씬 효율적입니다.

마이그레이션 이전 단계: 개념 증명

우리가 Snowflake로 POC를 계획했을 때 시스템에 필요한 모든 유형의 사용을 테스트하고 싶었습니다. 우리는 POC를 읽기, 쓰기 및 삭제의 세 가지 주요 범주로 나누었습니다.

설치 요구 사항에는 모든 고객의 1일 데이터(약 100TB), 다중 테넌트 테이블 및 인덱싱(클러스터 키 없음, 클러스터 키 1개, 클러스터 키 2개)이 포함되었습니다.

다음과 같은 네 가지 성공 기준이 있었습니다.

  • 정시에 데이터 로드: 빠르고 저렴하다는 것을 증명하기 위해 데이터 수집 파이프라인 POC가 필요했습니다.
  • 기존 쿼리 지원: Snowflake에서 현재 쿼리를 실행할 수 있음을 증명해야 했습니다.
  • 쿼리 SLA 충족: 쿼리 지속 시간을 테스트하여 Snowflake로 마이그레이션하는 것이 유일한 개선 방법임을 확인했습니다.
  • 비용 요구 사항: 우리는 Snowflake에서 프로덕션 부하를 실행하는 것이 Athena보다 저렴하게 만들고 싶었습니다(이것이 주요 프로젝트 메트릭은 아니었지만 비용이 늘어나지 않는 것이 중요했음).

Snowflake POC는 모든 면에서 성공적이었습니다. 더욱이 Snowflake 팀이 협조적이고 도움이 되었습니다.

견고한 수집 파이프라인 구축

요구 사항

우리는 다음 요구 사항을 모두 충족하는 Snowflake 수집 메커니즘을 구축해야 한다는 것을 알고 있었습니다.

  1. 견고함과 내구성. Snowflake로 수집하는 데이터의 양이 짧은 시간에 극적으로 증가할 수 있으므로 수집 파이프라인이 그에 따라 확장되어야만 합니다. 고객 중 한 곳으로 가는 트래픽이 아주 갑자기 급격히 증가할 수 있으며 이는 계획된 이벤트와 계획되지 않은 이벤트 모두에서 발생할 수 있습니다. 예를 들어 중요한 뉴스로 소셜 미디어 활동이 급증할 수 있습니다. 과거에 이러한 종류의 갑작스러운 트래픽 과부하 발생했었습니다. 따라서 전체 데이터 양이 갑자기 두 배로 증가할 것을 예상하여 수집 파이프라인을 계획하고 구축해야 합니다. 쉽게 확장될 수 있어야 합니다.
  2. 빠른 속도. 로드 프로세스가 특정 SLA를 충족해야 합니다. 단 몇 분의 지연만으로 고객이 사용자 수준 데이터를 가져올 수 있어야 합니다. 우리는 Snowflake가 반드시 이러한 임계치를 초과하여 데이터 가용성을 지연시키지 않도록 해야 합니다.
  3. 높은 가용성. 우리는 가동 중지 시간을 감당할 수 없습니다. 새로운 데이터가 1시간 이상 지연되면 고객이 알아차리기 시작합니다.
  4. 가장 일반적인 쿼리 사용에 최적화. 우리는 집계된 보고를 최신 상태로 유지하기 위해 시간당 4,000번 이상 Snowflake를 쿼리하고 있습니다. 쿼리 계획을 최적화하려면 데이터 순서 지정 및 클러스터링이 해당 사용 시나리오에 대해 최대한 준비되도록 해야 합니다.
  5. 데이터 보존 및 미사용 데이터 분리와 같은 모든 개인 정보 보호 요구 사항 준수. 일부 파트너는 사용자 수준 데이터에 대해 다른 보존 규칙을 요구합니다. 일부 파트너는 마케팅 사용자 수준 미사용 데이터를 완전히 분리해야 합니다. 우리는 이러한 개인 정보 관련 기능을 지원해야 했습니다. Athena에서는 S3에 다른 파일을 생성하여 이를 처리했습니다(파일 크기가 줄어들어 Athena의 성능에 부정적인 영향을 줌). Snowflake에서는 필요한 경우 분리를 위해 다른 테이블을 사용하여 각 파트너에 대해 서로 다른 보존 규칙을 정의할 수 있습니다.

우리는 Snowflake 수집 파이프라인을 가능한 한 군더더기 없이(lean) 자동 복구 방식으로 구축하기로 결정했습니다.

설계

다음은 우리의 설계를 상세히 살펴본 것입니다.

Singular의 스택(1)에서 Kinesis Streams의 사용자 수준 이벤트를 스트리밍합니다.

이벤트(예: 광고 클릭, 광고 조회, 앱 설치 또는 인앱 이벤트)를 일괄 처리 방식으로 스트림에서 읽어 .csv 파일(zstd로 압축)로 저장 후 Kubernetes에서 관리하는 Python 작업자(2)를 사용하여 S3 파일에 업로드합니다.

모든 파일 생성은 Snowpipe(4)가 구독하는 SNS 주제(3)에 알림을 생성합니다.

그런 다음 Snowpipe가 COPY INTO 명령을 사용하여 버킷에 추가된 모든 파일에 대해 버퍼 테이블(5)에 파일 콘텐츠를 삽입합니다.

버퍼 테이블에 저장된 데이터는 클러스터링되거나 순서대로 정렬되지 않으므로 집계된 쿼리용으로 활용하는 것은 효율적이지 않습니다.

데이터가 쿼리를 위해 조직되고 최적화되었는지 확인하기 위해 버퍼 테이블에 스트림(6)을 정의합니다.

그런 다음 Snowflake 정기적 작업(7)을 사용하여 스트림을 쿼리하고 데이터를 정렬하고 최종 테이블에 이를 삽입합니다(8).

Snowflake 서비스

이것이 우리가 사용한 주요 Snowflake 서비스입니다.

  • Snowpipe: S3와 같은 외부 소스에서 Snowflake 테이블로 데이터를 복사하는 Snowflake 관리형 대기열 서비스입니다. SNS 주제가 Snowpipe를 트리거할 수 있으므로 S3 버킷에 저장하는 모든 파일은 Snowflake의 테이블에 복사됩니다. Snowpipe의 컴퓨팅 비용은 Snowflake에서 제공하며 사용자 정의된 웨어하우스를 사용하지 않습니다.
  • 스트림: 데이터베이스의 테이블에 정의할 수 있는 Snowflake 서비스입니다. 스트림에서 읽을 때마다 마지막으로 읽은 이후의 테이블 데이터를 읽고 있으므로 변경된 데이터를 사용하여 조치를 취할 수 있습니다. (Kafka/Kinesis와 같은) 스트리밍 서비스이지만 Snowflake 테이블에서 구현됩니다.
  • 작업: Snowflake 정기적인 작업은 필요한 경우 스트림의 데이터를 다른 테이블로 넣어 사용하기 위한 훌륭한 도구입니다. 우리는 마이크로 파티션 깊이를 최적화하기 위한 일괄 처리 메커니즘으로 Snowflake 작업을 활용하고 있습니다.

모니터링, 자동 복구 및 보존 처리

Snowflake 파이프라인이 갑작스러운 데이터 버스트를 견딜 수 있도록 하기 위해 몇 가지 내부 도구를 구축하는 것 외에도 몇 가지 멋진 Snowflake 기능을 사용했습니다.

  1. 수집을 위해 최적화된 웨어하우스: 사용량에 따라 ‘무거운’ 수집 소스와 ‘가벼운’ 수집 소스를 서로 다른 창고로 나눕니다.
  2. 수평형 자동 크기 조정: 우리의 모든 웨어하우스는 멀티 클러스터이며 필요한 경우 수평적으로 확장됩니다.
  3. 작업 실패 모니터: Snowflake의 작업 메커니즘은 산발적인 실패를 쉽게 처리합니다. 작업이 실패하면 실패한 바로 그 체크포인트에서 사용 중지됩니다. 우리의 문제는 작업이 실행될 때마다 모든 스트림 데이터를 끌어온다는 것입니다. 스트림에 데이터 행이 너무 많으면 작업 시간이 초과될 수 있습니다(또는 시간이 오래 걸릴 수 있음). 데이터 버스트로 인한 수집 지연은 관리 비용이 매우 많이 듭니다. Snowflake의 작업 기록을 읽고 작업이 너무 오래 걸리는지 확인하는 내부 모니터를 구축하여 그 문제를 해결했습니다. 그런 경우, 모니터가 이를 종료하고 웨어하우스를 확장한 다음 다시 실행합니다. 나중에 이 모니터는 웨어하우스를 원래 크기로 축소합니다. 이 모니터는 기존 서비스로는 신속하게 수행할 수 없었던 기능으로, 생명의 은인입니다.
  4. 데이터 보존: 우리는 매일 정기적인 작업을 통해 데이터 보존을 효율적으로 처리하고 x일 이상 지난 데이터는 삭제했습니다(마케팅 소스/고객별 변경사항). Snowflake의 DELETE 명령은 특히 이를 클러스터 키에서 실행할 때(예: 전체 날짜의 데이터 삭제) 효율적이며 차단하지 않습니다.

매시간 수집 파이프라인의 Python Celery 작업은 수집된 데이터에 대해 Snowflake를 쿼리합니다.

크기 조정 처리

Singular에서 Snowflake를 사용하는 다양한 방법은 다음과 같이 고유한 쿼리 패턴에 추가됩니다.

  • Singular의 집계 데이터 수집 파이프라인: 각 고객의 데이터에 대해 시간별 집계 쿼리를 실행합니다.
  • 고객 관련 ETL 프로세스: Singular의 ETL 솔루션이 매시간 원시 데이터를 쿼리하여 이를 고객의 데이터베이스로 푸시합니다.
  • 고객 관련 수동 내보내기: 원시 데이터의 주문형 내보내기
  • Singular의 내부 BI.
  • GDPR ‘사용자 잊기’ 쿼리: 매일 일괄적으로 실행되는 특정 행 업데이트가 GDPR 망각 요청을 처리합니다.

우리는 매시간 수천 개의 쿼리를 실행하며 다양한 데이터 볼륨에 대해 이를 신속하게 완료해야 합니다. 또한 데이터 클러스터링에 미치는 영향을 최소화하면서 특정 행을 업데이트하려면 Snowflake가 필요합니다. 쿼리 프로세스는 Python Celery 작업자에 대해 분산적인 방식으로 정기적으로 실행됩니다.

Snowflake로 마이그레이션을 시작할 때 우리는 용량과 규모에 따라 가상 웨어하우스 크기를 관리해야 할 것이라고 가정했습니다. 분산 쿼리 패턴을 사용하면 각 작업자가 Snowflake를 직접 쿼리하여 각 쿼리와 함께 새 세션을 여는 것이 당연합니다. 이는 곧 동시 세션(Snowflake가 실질적으로 제한 없이 처리하는 동시 쿼리와는 다름)에 대한 Snowflake의 소프트 제한에 직면했음을 의미합니다.

좋은 소식은 Snowflake의 세션에 의해 제한될 필요가 없다는 것입니다. 웨어하우스 대기열 크기로만 제한되는 비동기 쿼리를 실행할 수 있습니다. 우리는 이것을 활용하고 작업을 실행할 수 있는 라이브 세션 풀을 만들기로 결정했습니다. 설계상 우리는 이것을 완전히 비동기식이며 상태 비저장으로 계획했으므로 웨어하우스 사용량에 의해서만 제한됩니다. 연결을 관리하는 상태 비저장 Python ASGI 서버를 구축하는 것은 우리를 위한 다소 간단하고 우아한 솔루션이었습니다.

우리는 SqlAlchemy Snowflake 통합, UvicornFastAPI를 사용하여 Python Snowflake 연결 풀 서비스를 구현했습니다. 그리고 이를 곧 오픈 소스로 공개하여 커뮤니티와 공유할 계획이므로 Snowflake에서 애플리케이션을 구축하는 다른 사람들이 자신의 세션 풀을 생성할 때 시간과 노력을 절약할 수 있습니다.

현재 우리는 우리의 Snowflake 연결 풀을 사용하여 여러 Snowflake 가상 웨어하우스에서 시간당 4,000개 이상의 쿼리를 실행하고 있습니다. 가상 웨어하우스는 필요한 경우 자동으로 수평으로 확장됩니다. GDPR 업데이트는 대규모 전용 웨어하우스에서 실행되므로 복잡한 업데이트 쿼리가 효율적으로 실행됩니다. 처리된 요청이 없을 떄에는 가상 웨어하우스가 자동으로 일시 중단됩니다.

데이터 유효성 검사

Singular 고객은 비즈니스 운영에 있어 우리의 데이터에 크게 의존합니다. Athena에서 Snowflake로 마이그레이션할 때 어떤 데이터 불일치도 용납할 수 없었습니다.

두 수집 파이프라인을 병렬로 실행하면서 S3에 업로드된 각 파일에 대해 Athena 및 Snowflake의 쿼리를 비교하는 지속적으로 실행되는 모니터를 구축했습니다. 이를 통해 데이터 수집에 문제가 있으면 즉시 알 수 있었습니다. 또한 Snowflake 쿼리를 시작하고 프로덕션에서 우리가 실행한 모든 Athena 쿼리의 결과를 비교했습니다.

이 모니터 구축은 데이터 마이그레이션이 불일치 없이 올바르게 수행되고 있는지 검증하는 데 도움이 되었습니다.

기타 교훈

마이그레이션이 완료되었을 때 크레딧 사용 및 설정을 더욱 면밀히 검토하여 다음과 같이 최적화할 수 있는 몇 가지 쉬운 방법을 찾았습니다.

  • 용도에 따라 창고 사용을 계획하십시오. 예를 들어 ‘무거운’ 쿼리는 자동 일시 중단이 설정된 대규모 웨어하우스에서 하루에 한 번(모두 동시에 시작됨) 실행되는 반면, 시간별 ‘가벼운’ 쿼리는 수평형 클러스터 확장성이 더 높은 소규모 웨어하우스에서 실행됩니다. 이렇게 하면 일일 사용량이 비용 최적화됩니다(무거운 쿼리를 빠르게 실행하고 대부분의 시간에 일시 중지됨). 시간별 쿼리는 동시성 규모에 필수적인 탄력성을 갖췄습니다.
  • 클라우드 서비스 비용을 이해하기 위해 Snowflake와 협력하십시오. 우리는 우리의 무거운 쿼리 패턴 때문에 테이블에서 NDV를 계산할 때 쿼리 컴파일 시간에 과도한 부하가 걸린다는 것을 발견했습니다. Snowflake가 기능 구성을 변경하여 우리를 도와주었습니다. 특정 사용 사례에는 우리의 쿼리 유형이 필요치 않아 이로 인해 쿼리 컴파일 시간을 많이 절약했습니다(많이 업데이트된 테이블에서만 새 클러스터 쿼리).
  • 쿼리 최적화: Snowflake 프로파일링 도구를 사용하여 많은 쿼리 기간을 최적화했습니다. GDPR 업데이트와 같은 복잡한 쿼리를 처리하는 데 도움이 되었습니다.

다음 단계는?

  • 데이터 공유: Snowflake와의 안전한 데이터 공유와 관련된 환상적인 기회가 있습니다. 우리는 안전한 데이터 공유를 위한 분산된 클린룸이라는 아이디어를 활용하는 것과 관련하여 Snowflake 팀과 협력하고 있습니다.
  • IP 난독 처리: 보안 직인 인증서를 유지하기 위해 IP 관련 필드에 30일 보존을 구현해야 했습니다. 이를 위해 Snowflake 마스킹 정책과 세션 변수를 활용하여 모든 IP 관련 필드를 암호화하여 저장하고 주문형으로만 암호를 해독하는 멋지고 효율적인 흐름을 구현했습니다. 우리의 솔루션에 대해 자세히 설명하는 또 다른 블로그 게시물을 게시할 계획입니다.
  • 연결 풀 오픈 소싱: 앞으로 몇 달 안에 Python Snowflake 연결 풀 서비스의 코드 소스를 공개할 계획입니다. 예를 들어 매시간 많은 수의 쿼리를 실행할 계획이어서 이 기능이 유용할 수 있다고 생각되면 연락하여 알려주세요!