注:本記事は(2021年11月23日)に公開された(Migrating Our Events Warehouse from Athena to Snowflake)を翻訳して公開したものです。

Singularには、世界中の何百万というモバイルデバイスから、アドビュー、アドクリック、アプリインストールに関するデータを取り込むパイプラインがあります。この非常に膨大なデータは時間別、日別に集計されています。私たちは、このデータにさまざまなマーケティングメトリクスを付与し、キャンペーンのパフォーマンスやROIを確認できるよう、顧客に提供しています。

結果的に、私たちは毎秒何万というイベントを受信し、毎日、数十テラバイトのデータを処理し、数ペタバイトのデータセットを管理していることになります。 

この膨大かつ複雑なデータウェアハウスをAthenaからSnowflakeへ移行することは、非常に込み入ったプロセスでした。この投稿では、私たちがなぜこの難しい手段を選択したか、どのように実行したか、そして実際の移行で何を学んだかをお伝えしたいと思います。

SnowflakeとAthenaの比較

私たちは、2018年に当社のユーザーレベルのイベントウェアハウスとしてAthenaの使用を開始しました。ストレージからコンピュートを切り離すことができ、S3との統合も容易で、手間もそれほどかからないため、素晴らしいソリューションのように思えました。魔法のようにさえ感じました。

私たちは、Kinesisストリームを通じてユーザーイベントを送信し、顧客や日付ごとに分割してデータをS3にアップロードしました。ファイルは、可能な限りAthenaのベストプラクティスを使用してS3に保存しました。

本番稼働による1年経過後、問題が出始め、他のソリューションの検討を余儀なくされました。

オンデマンドのコンピューティングリソース不足

最初の厄介な問題は、スケーリングの必要性とオンデマンドの高いコンピューティング能力でした。Athenaはマルチテナントサービスであり、個々のクエリが受けるコンピューティング能力を管理することはできません。当社の本番稼働のパイプラインでは毎時約4,000クエリを実行しており、顧客ごとに1日1回大量のクエリが送信されます。そのため、ピーク時間にはコンピューティングリソース不足による不具合が生じました。

お粗末なリソース管理

使用量に応じて、コンピューティング能力を分割して管理することはできません。例えば、データセットの中には他よりも早くクエリを処理する必要があるものもあります(毎時のクエリと日時の大量のクエリ)。私たちは顧客に対し、パフォーマンスに関してある程度の保証をしたかったのですが、コンピューティング能力の管理権限がないため、それが叶いませんでした。

すぐに使えるクラスタリングの不足

カスタマーレベルのデータは、S3へアップロードする前にしばらく(またはサイズで)バッファリングしなければなりませんでした。ストリーミングパイプラインにとって、バッファリングはあまり推奨されることではなく、データスループットが高いときのパフォーマンスもよくありませんでした。またバッファリングにより、集計レポーティングにおけるデータのアップデート頻度も影響を受けました。

特定のレコードに対する面倒な削除およびアップデート処理

EUのプライバシー法令(EU一般データ保護規則:GDPR)に従い、私たちには、エンドユーザーが望めば特定のレコードの削除をサポートする義務があります。Athenaでは、ファイルから特定のレコードを削除することは時間のかかる、複雑なプロセスとなっていました。

なぜSnowflakeなのか?

私たちが移行を決定したのは、Snowflakeであればこの重要な点が仕様ですべてサポートされている、という点に基づいています。

  • ウェアハウスの使用を正しく計画することによってクエリのコンピューティング能力を管理することが可能となります。異なるクエリを、さまざまなインスタンスを持つ異なるウェアハウスに分割することができるため、必要なところにより多くのリソースを付与することができます。
  • クラスタキーを定義し、取り込みの際にこれに基づいてデータをソートすることで、Snowflakeのマイクロパーティション内でのデータクラスタリングが可能です。また、自動クラスタリングを使用してマイクロパーティションをクラスタキーに従って最適化することもできます。クエリの使用に従ってデータを適切にクラスタリングすることで、クエリの大幅な最適化が実現できます。
  • クラスタキーに基づいていれば、Snowflake内での削除処理はシンプルです。特定レコードの更新は高額なままですが、必要に応じたコンピューティング能力の分割やスケーリングが可能なため、Athena使用時よりもはるかに効率が良くなりました。

移行前の手順:PoC(概念実証)

SnowflakeでのPoCを計画した際、当社のシステムが必要とするであろうあらゆる使用タイプについて試験したいと考えました。そこで、PoCを3つの主なカテゴリー(読み取り、書き込み、削除)に分けました。

セットアップ要件には、すべての顧客から得られる1日分のデータ(約100TB)、マルチテナントテーブル、インデックス作成が含まれます(クラスタキーなし、1つのクラスタキー、2つのクラスタキー)。

4つの成功基準:

  • 時間通りのデータローディング: データインジェストパイプラインPoCでは、高速かつ安価であることを検証。
  • 既存クエリへのサポート: 既存のクエリSnowflake上で実行できることを検証。
  • クエリSLAの達成:クエリ時間を試験し、改善にはSnowflakeへの移行しか方法がないことを検証。
  • コスト要件: Snowflakeでの本番運用がAthenaより安価であることを検証(当初のプロジェクトメトリクスではないものの、コストの増加を防ぐために重要)。

私たちが実施したSnowflake POCは、すべての面で成功基準を満たしていました。さらに、Snowflakeチームは柔軟にサポートしてくれました。

堅牢なインジェストパイプラインの構築

要件

私たちは、次のニーズをすべて満たすSnowflakeインジェストメカニズムを構築する必要がある、と理解していました。

  1. 堅牢性および耐久性。 Snowflakeに取り込むデータ量は、短時間で急増する場合があるため、それに伴うインジェストパイプラインのスケーリングが必要です。クライアントの中には、非常に唐突にトラフィックが劇的に増加するところがあり、計画的または非計画的なイベントの両方で生じました。例えば、重大なニュースが生じれば、ソーシャルメディアの動きが急に活発になります。このような唐突なトラフィックオーバーロードは過去にもありました。そのため、データ量全体が急に2倍に増えるような状況を想定してインジェストパイプラインを構築する必要があります。それには容易なスケーリングが不可欠です。
  2. 高速性。 ローディングプロセスは特定のSLAに合致している必要があります。当社の顧客は、ユーザーレベルのデータを数分の遅れのみで引き出す必要があります。私たちは、Snowflakeがこの閾値を超えることなくデータを利用可能にできるかどうか検証する必要があります。
  3. 高い可用性。 私たちにはダウンタイムを受け入れる余裕はありません。新たなデータが1時間以上も遅れれば顧客は不具合に気付くでしょう。
  4. 最も頻度の高いクエリに合わせた最適化。集計レポートを最新に保つために、毎時間、4,000回を超えるクエリがSnowflakeに送信されています。クエリプランの最適化を確実にするために、データの順序付けやクラスタリングは、当該の使用シナリオに合わせ、できる限り準備されている必要があります。
  5. データ保持やデータ分離といったあらゆるプライバシー要件の順守。ユーザーレベルのデータに対し異なる保持ルールを必要とするパートナーもいます。また、マーケティングユーザーレベルのデータを残りから完全に分離する必要のあるパートナーもいます。このようなプライバシー関連の機能もサポートしなければなりません。Athenaを使用していた際は、S3内にさまざまなファイルを作成して処理していました(そのため、ファイルサイズが縮小され、Athenaのパフォーマンスにマイナス効果が出ました)。Snowflakeでは、他のテーブルを使用して分離し、必要であれば、パートナーごとに異なる保持ルールを定義することもできます。

私たちは、できるだけ無駄がなく、セルフヒーリング機能を持ったSnowflakeインジェストパイプラインを構築することを決定しました。

設計

設計の詳細は次の通りです。

Singularのスタック (1)からユーザーレベルのイベントをKinesisストリームにストリーミング。

イベント(アドクリック、アドビュー、アプリインストール、アプリ内イベントなど)はバッチのような方式でストリームから読み取られ、csvファイルとして保存(zstdで圧縮)、そしてKubernetesが管理するPythonワーカー(2)を使用してS3ファイルへアップロードされます。

ファイルの作成ごとにSNSトピック(3)内に通知が生成され、Snowpipe(4)により発行されます。

次にSnowpipeは、COPY INTOコマンドを使用し、バケットに追加された各ファイル上でファイル内のコンテンツをバッファテーブル(5)に挿入します。

バッファテーブルに保存されたデータは、クラスト化や順序付けが行われていないため、集計クエリでの使用は効率的ではありません。 

クエリ実行に向けてデータが整理され、最適化されているのを確認するために、バッファテーブルにストリーム(6)を定義します。

次に、Snowflakeの定期タスク(7)を使用してストリームをクエリし、データをソートし、ファイナルテーブル(8)に挿入します。

Snowflake サービス

以下は、使用した主なSnowflakeサービスの一例です。

  • Snowpipe:S3のような外部ソースからSnowflakeテーブルにデータをコピーする、Snowflakeが管理するキューイングサービス。SNSトピックによりSnowpipeがトリガーされるため、S3バケットに保存される個々のファイルがSnowflake内のテーブルにコピーされます。
  • ストリーム:自社のデータベース内にあるテーブル上で定義できるSnowflakeサービス。ストリームから読み込みを行うたびに前回の読み込み以降のテーブル内のデータを読み込みます。そのため、変更のあったデータを利用して対策を講じることができます。これは(KafkaやKinesisのような)ストリーミングサービスですが、Snowflakeテーブル上に実装されます。
  • タスク:Snowflakeの周期的タスクは、ストリームから取得したデータを必要に応じて別のテーブルに取り込むための素晴らしいツールです。私たちはSnowflakeタスクを、マイクロパーティションの深さを最適化するバッチメカニズムとして使用しています。

モニタリング、セルフヒーリング、リテンション処理

Snowflakeパイプラインが突然のデータバーストにも耐えうることを確認するために、いくつかの社内ツールの構築に加えSnowflakeの素晴らしい機能を使用しました。

  1. データ取り込みに最適化されたウェアハウス: 私たちは使用量に応じ、「高負荷」と「軽め」のデータ取り込みソースを異なるウェアハウスに分割しました。
  2. 水平自動スケーリング: 当社のすべてのウェアハウスは、マルチクラスタで、必要に応じた水平スケールアップを行います。
  3. タスク失敗モニター: Snowflakeのタスクメカニズムは、散発的なタスクの失敗に容易に対応します。タスクが失敗すると、失敗した同じチェックポイントからタスクを再開します。私たちが問題視したのは、タスクが実行されるたびに、すべてのストリームデータをプルすることでした。ストリームにあまりにも多くの行のデータがあればタスクがタイムアウトする(または単に時間がかかりすぎる)可能性があります。データバーストによりデータ取り込みが長引くと、管理コストが非常に高くなります。私たちは、Snowflakeのタスク履歴を読み込み時間がかかりすぎていないかどうかをチェックする内部モニターを構築することで、この問題に対応しました。時間がかかっている場合、モニターがタスクの強制終了、ウェアハウスのスケールアップ、タスクの再実行を行います。その後、ウェアハウスを元のサイズにスケールダウンします。このモニターは、既存サービスでは迅速に達成できない機能を提供してくれる、まさに救世主と言えるものでした。
  4. データ保持:私たちは毎日、周期的タスクを使用してデータ保持を効率的に処理しており、X日(マーケティングソースや顧客により異なる)を経過したデータは削除していました。SnowflakeのDELETEコマンドは効率的かつノンブロッキングで、特にクラスタキー上(1日の全データを削除するなど)で実行しているときに効果を発揮してくれます。

毎時に、インジェストパイプライン内のPython CeleryタスクがSnowflakeに対し、取り込まれたデータのクエリを行います。

スケール処理

独自のクエリパターンを生み出すSingular内でのSnowflakeのさまざまな使用法:

  • Singularの集計データインジェストパイプライン:顧客別のデータに対し、毎時、アグリゲーションクエリを実行。
  • 顧客対応ETLプロセス: SingularのETLソリューションは毎時生データをクエリし、顧客のデータベースにプッシュします。
  • 顧客対応マニュアルエクスポート: 生データのオンデマンドエクスポート。
  • Singularの内部BI。
  • GDPR 「forget users」クエリ:特定行のアップデート、毎日のバッチでの実行、GDPR Forgetリクエスト処理。

私たちは、何千ものクエリも毎時間実行しており、多岐にわたるデータボリュームに対しこれらのクエリを短時間で完了させる必要があります。さらに、データクラスタリングへの影響を最小限に抑えつつSnowflakeに特定行を更新させる必要もあります。クエリプロセスは、Python Celeryワーカー上で分散的に周期的に実行されます。

Snowflakeへの移行を開始した際、使用やスケールに合わせた仮想ウェアハウスのサイジング管理が必要になると想定していました。当社の分散クエリパターンでは、各ワーカーが直接Snowflakeにクエリするのが自然で、クエリごとに新規セッションが開かれます。そのため、同時実行クエリにおけるSnowflakeのソフトリミットにすぐに達してしまいました(Snowflakeが事実上制限なしで処理する同時実行クエリとは異なる)。 

幸いだったのは、Snowflakeではセッションによる制限を受けないことです。非同期型クエリを、ウェアハウスのキューサイズによる制限を受けるのみで実行することができます。この点を活用し、タスクの実行が可能なライブセッションのプールを作成することにしました。完全な非同期型かつステートレスな仕様を計画し、ウェアハウス使用でのみ制限を受けるようにしました。コネクションを管理するステートレスなPython ASGIサーバーの構築は、私たちにとって非常にシンプルかつエレガントなソリューションとなりました。

私たちはSqlAlchemy Snowflake インテグレーションUvicornFastAPIを使用してPython Snowflakeコネクションプールサービスを実装しました。さらに、Snowflake上でアプリケーションを構築する他の人も、時間や手間をかけずに自身のセッションプールを作成できるよう、早期にオープンソース化しコミュニティで共有することを計画しています。

現在私たちは、Snowflakeコネクションプールを使用して、毎時4,000を超えるクエリを複数のSnowflake仮想ウェアハウス上で実行しています。仮想ウェアハウスは必要に応じ、自動的に水平スケールを実施します。GDPRアップデートは大規模の専用ウェアハウス上で実行されているため、複雑なUPDATEクエリが効率的に実行されています。処理されるリクエストがない場合、仮想ウェアハウスは自動的に停止します。

データ検証

Singularの顧客は、それぞれの事業運営において私たちのデータを非常に重要視しています。そのため、AthenaからSnowflakeへの移行に際し、いかなるデータの不一致も許容することはできませんでした。

双方のインジェストパイプラインを平行に実行しながら、S3にアップロードされた各ファイルに対して、AthenaとSnowflake上のクエリを比較する、継続に実行するモニターを構築しました。この方法で、データの取り込みに問題が発生すれば直ちに把握できるようにしました。さらに、Snowflakeクエリを開始し、その結果をプロダクションで実行した個々のAthenaクエリと比較しました。

本モニターの構築は、データ移行が適切に実行されているのか、また不一致が発生していないかといった検証に役立ちました。

その他気付いた点

移行が完了した時点で、クレジットの使用状況および設定をより詳細に試験し、最適化が可能ないくつかの簡単な方法を見出しました。

  • 使用状況に応じたウェアハウス使用計画を立てる。例えば、「負荷の高い」クエリは、自動停止機能付きの大規模なウェアハウスで1日1回実行(すべてを同時に開始)し、毎時の「負荷が軽め」のクエリは高い水平クラスタスケーリング機能付きの小規模のウェアハウスで実行させます。これにより、日々の使用のコスト最適化(負荷の高いクエリを高速で実行しほとんどの時間を停止させておく)が実現しました。毎時のクエリは、同時実行スケーリングに必要な伸縮性を確保しています。
  • クラウドサービスのコストを把握するためのSnowflakeとの連携。負荷の高いクエリパターンにより、テーブル上のNDV算出時のクエリコンパイル時間にかかる負荷も高くなることがわかりました。Snowflakeは、機能構成を変更することで、この問題に対応してくれました。これにより、当社のクエリタイプが特定のユースケースで必要としていなかったクエリコンパイル時間が大幅に短縮されました(新規クラスタのクエリは頻繁に更新されるテーブル上でのみ)。
  • クエリの最適化:多数のクエリについて、Snowflakeプロファイリングツールを使用した期間の最適化を行いました。これは、GDPR updatesのような複雑なクエリに役立ちました。

次の内容

  • データシェアリング:Snowflakeによる安全なデータシェアリングに関して、素晴らしい機会が予定されています。現在、安全なデータシェアリングのための分散型クリーンルームのアイディアの活用に向け、Snowflakeチームと連携しています。
  • IP難読化: 当社のセキュリティマーク認証を維持するためには、IP関連フィールドに対する30日間保持を実装する必要がありました。そのために、Snowflakeマスキングポリシーおよびセッション変数を使用して暗号化されたIP関連フィールドをすべて保存し、オンデマンドでのみ複合する効率的なフローを実装しました。別のブログでこのソリューションを取り上げ、深く掘り下げる予定です。
  • コネクションプールのオープンソース化:今後数か月内にPython Snowflakeコネクションプールサービスはオープンソース化される予定です。御社のビジネスに役立つようであれば(大量のクエリの毎時実行が予定されているなど)ぜひご連絡ください。