注:本記事は(2021年9月9日)に公開された(Migrating Airflow from Amazon EC2 to Kubernetes)を翻訳して公開したものです。

Snowflakeのデータエンジニア、データサイエンティスト、およびデータアナリストは、情報に基づくより効果的な意思決定を下すため、また会社の戦略を推進するアナリティカルインサイトを提供するため、さまざまに設定されたスケジュールで実行される多様なETLおよびELTデータパイプラインを構築し、情報を集めています。これらのパイプラインをうまく稼働させ、設定されたスケジュールどおりにデータを提供するには、信頼性が高く、効率的で、確実なワークフロー管理システムが不可欠です。

当社のデータチームは非常に部門横断的で、財務、セールス、マーケティングIT、プロダクトといったさまざまな分野に属する開発者が、パイプラインに関して単一のモノリポジトリ上で協働しています。

Apache Airflowを使う理由

ワークフローでは、繰り返しのタスクを自動化、合理化することで、プロジェクトやビジネスの生産性と効率性が高まりますが、Apache Airflowのようなツールなしでこれらのワークフローを作成することは困難であり、タスクの数が増えてタスク間の依存関係が複雑になれば徐々に管理しきれない状況が生じます。

SnowflakeでAirflowを使う方法

Airflowは、私たちのソフトウェアスタックの中でも重要な部分です。Snowflakeのデータパイプラインと機械学習モデルは、Snowhouse(当社固有のSnowflakeインスタンスで、SaaSベースのプラットフォーム)、dbt1、およびモデリングと変換のためのAirflowによって動作しています。

Airflowは、当社のワークフロー内のステップが特定の順番かつ反復ベースで実行されているかどうかを確認するための重要な部分です。

当社の旧Airflowセットアップ:Amazon EC2

私たちがAirflow(v1.10.12)をSnowflakeで使い始めたのは2020年の初めです。下図で示す最初の実装では、Airflowを単一のAmazon Elastic Compute Cloud(EC2)インスタンス、r5.xlargeで実行していました。これで対応可能な有向非巡回グラフ(DAG)は15以下、ユーザー数は10以下でした。

私たちが使用していたAirflowサービスは、Worker、Webserver、Scheduler、Flower、Celery Executorの5つでした。加えて、Amazon ElastiCache内のRedisクラスタと、さらにAmazon Relational Database Service(RDS)でホストされるPostgreSQLデータベースを有していました。

私たちはAirflow構成とアプリケーションロジックを単一のDockerイメージにビルドしてEC2に実装してから、Airflowサービスをローンチしていました。下図は、これら旧セットアップのリリースプロセスを示しています。

EC2でのAirflow実行の課題

Snowflakeが成長するにつれてチームの数も増え、さらには探索的データアナリティクスの実施やビジネス関連の質問に回答するニーズも増えました。わずか1年の間に、Airflowクラスタ内のDAGの数は、15以下から270にまで増大しました。

結果として私たちは、DAGはいずれうまく稼働しなくなるか、スケジューラーに問題が生じるだろうと考えました。これらの問題は下記のように波及し、SLAやプラットフォーム内のデータの信頼性に悪影響をもたらしました。

  • 拡張性:1つのEC2インスタンスでは、最大で46のタスクを同時実行できていました。この数は、ピーク時に予定されていたタスクの数を考えると非常に小さなものでした。ワーカーを追加したり既存ノードの容量を増やすと、クラウドインフラストラクチャチーム内でのルーピングやAirflowサービスの再起動が必要になりました。
  • 信頼性と可用性:キューにプッシュされるタスクが増えるごとに、スケジューラーに負担がかかり、最終的にスケジューラーが応答しなくなりました。こうなると、誰かが問題を特定し、Airflowサービスを再起動するまで、業務全体を止めざるを得ませんでした。
  • 分離:Airflowのインストレーションとアプリケーションロジックは単一のDockerイメージと緊密に結合されて組み込まれていました。アプリケーションロジックを本番環境に実装する必要があるたびに、Airflowサービスを停止して再起動しなければならなかったため、実行中のDAGが突然停止するということが起こりました。

SnowflakeのKubernetes環境におけるAirflow

大規模ワークロードの実現に関する需要の急増を支え、すべての関係者のニーズを満足させるためには、Airflowセットアップを単一のEC2インスタンスから、(ほぼ)無限の拡張性を持つKubernetesに移行する必要がありました。

私たちの目標は、外部チームをループに入れずにアクセスできる、セルフサービス式で拡張性の高いリソースを提供すること、可能な限りビジネスアプリのDocker/コンテナ化をサポートし、ソフトウェア依存関係管理をシンプル化し、衝突を解消すること、ワークフローの変更ごとにAirflowを実装し直す必要なしに、動的にワークフローを更新する能力を維持すること、Airflowの健全性に関するリアルタイムの監視および警告ソリューションを提供することでした。次の図は、私たちの新しい構成を示します。

これらの目標を達成するため、私たちはairflow-helm2チャートを使用して、Celery ExecutorによりAirflow 2.0.2をKubernetesに実装しています。私たちは、Kubernetesのネイティブな自動スケーリング機能を使用し、Celeryワーカーのインとアウトの規模を自動的に調整することで、驚きの結果を得ています。リソースがタイムリーに利用可能でない場合、タスクのほとんどはキューにプッシュされるのですが、今では次の図のとおり、リソースを枯渇させることなく、タスクはSLAを満たしています。

AirflowのKubernetesPodOperatorを使用することで、Docker化されたビジネスアプリケーションをタスクとしてDAG内で実行できます。カスタムDockerのイメージにより、ユーザーはタスクの環境、構成、依存関係が完全にベキ等元であることを確認できます。これにより、DAGオーナーは、他のアプリケーションとの衝突について心配することなく、アプリケーションとその依存関係を柔軟に定義し、コンテナ化することができます。

また、KubernetesPodOperatorには、DAGファイルを通じて各タスクに追加のコンピュートリソースを提供する機能もあります。これにより、さまざまなワークロードに対してリソースを割り当て、利用されたリソースをタスクレベルで監視し、測定できるというメリットが得られます。

新しい環境に移行することで、アプリケーションロジックをインフラストラクチャーコードから切り離すことも可能になりました。今や、Airflow Kubernetesマニフェストに対するリポジトリが別途であるため、アプリケーション関連の変更を、Airflowサービスの再起動なしで実装できます。Airflowポッドのサイドカーコンテナとしてgit-syncを実行することで、ユーザーはデータパイプラインに関するフィードバックを即時に取得できます。以前の環境では毎晩のビルドを待つ必要があったため、取得までに丸一日かかっていました。

SnowflakeでのAirflowの監視と警告

私たちは、移行の一環として、Airflowクラスタの監視性能も向上させたいとも考えました。Airflowにはすぐに使える最小限の指標が用意されています。以前、DAGの立案者は、自らのDAGに関してさまざまな問題にぶつかっていましたが、包括的な監視機能や指標がなかったため、問題を徹底解明することはできませんでした。そのためチームは、問題がアプリケーションコードにあるのかAirflow自体にあるのかをすぐには判別できず、より多くの時間を調査に費やすことになっていました。

新しい環境では、次のような問いに対し、簡単に答えが得られればよいと考えていました。

  • Airflowスケジューラの健全性は、DAGやタスクのスケジュールをする上で十分か。
  • プールスポットの可用性はどうか。また「QUEUED」(キュー)状態で詰まっているタスクはないか。
  • DAGやタスクのスケジューリング遅延はどうなっているか。
  • 一番遅れているDAGやタスクはどれか。
  • ロードが失敗したDAGはないか。

下図のとおり、現在はWavefrontを使用してSnowflake内の監視や警告を行っています。私たちは、StatsDクライアントとデータ収集エージェント(Telegraf Agent on Wavefront Proxy)にプラグインしています。これは指標を取り入れ、Wavefrontサービスに安全かつ迅速に、信頼性のある形で転送する仕事を担っています。

スケジューラーが5分以上機能停止している場合、オンコールエンジニアに警告が送信されます。これらの指標により、私たちはAirflowクラスタの状態をすばやく理解し、問題を特定できます。

まとめ

このブログでは、私たちがSnowflakeでAirflowの使用をどのようにスタートさせたか、AirflowセットアップのKubernetes環境への移行を私たちに決意させた問題点は何か、Airflowクラスタを本番環境にどのように移行したか、警告および監視フレームワークをAirflowクラスタにどうセットアップしたか、Airflowクラスタをどう大規模に運用しているかといった点をご紹介しました。

今後、私たちはAirflowメタデータに基づくアナリティクスの構築に注力していきます。これにより、Airflow Kubernetesバッチオーケストレーション動作への新しいインサイトを提供できます。このメタデータを利用すれば、過去のパターンからの逸脱を検出し、データパイプライン内の変動を監視することができます。さらに、ユーザーと協力しながらデータパイプラインや環境を有効活用し、パフォーマンスを向上させるための改善点の特定にも役立ちます。


1getdbt.com

2bit.ly/3sxIoyl