DigdagをHA構成にしてみた

こんにちは、最近のマイブームはマヌルネコ動画な新事業創造部バックエンドエンジニアの塩崎です。 今回のテックブログでは、以前にDigdagを紹介した記事の続編として、DigdagをHA構成にするためのTipsなどを紹介します。

Digdagとは

Digdagはワークフローエンジンと呼ばれるソフトウェアです。 複数個のタスク間の依存関係からなるワークフローを定義し、そのワークフローの実行及び管理を行います。 この説明だけですと、何が便利なのかいまいちピンとこない方が多いかと思います。 ですが、かゆいところに手が届く便利ソフトウェアです。 具体的なかゆいところの紹介は以前にDigdagを紹介した記事の前半部分に書かれています。 Digdagを使用したことのない方はこちらを読んでから本記事を読み進めると理解しやすいかと思います。

tech.starttoday-tech.com

さて、前回の記事ではDigdagを使うメリットの1つとしてHA構成を紹介しましたが、それを実現するための具体的な設定などについては紹介していませんでした。 今回はDigdagでHA構成を実現するために必要な構成要素や設定ファイルなどを紹介します。

HA構成にするための知識

DigdagをHA構成にするためには digdag server コマンドで起動しているプロセスが担っているいくつかの役割を理解することから始めると分かりやすいです。 以下でそれぞれのコンポーネントの説明をします。

API server

このコンポーネントはHTTPサーバーとして動作し、以下の機能を提供します。

  • タスクの状態をブラウザから確認できる機能
  • digdag push などのdigdagのクライアントモードのコマンドを受け付けるためのREST API

これはdigdag serverを起動した時に必ず有効化されます。

Agent

Agentは実際にタスクの実行を行う部分です。タスクキューからタスクを取り出し、実行をします。 信用できない環境で動かすことも考慮されているため、この処理を行うスレッドはワークフロー情報が格納されているDBと直接通信をしません。

Workflow executor

Workflow executorはワークフローの状態を監視し、次に実行するべきタスクをタスクキューにプッシュします。

Schedule executor

Schedule executorはスケジュール実行が設定されているワークフローを監視し、設定された時刻になったらワークフローを実行状態にします。

一部の機能の無効化

これらの機能の一部は digdag server 起動時にコマンドにオプションを渡すことによって無効化できます。

指定するオプション API server Agent Workflow executor Schedule executor
なし
--disable-executor-loop
--disable-local-agent
--disable-executor-loop --disable-local-agent

なお、これらのより詳細な説明はDigdag公式ドキュメントの中の以下の部分にかかれています。

Internal architecture

構成

HA構成のときの典型的なシステム構成図を以下に示します。 上で紹介した4つの機能のすべてがいずれかのサーバーで有効になっています。 以下ではそれぞれの構成要素について説明します。

f:id:vasilyjp:20180622171706p:plain

PestgreSQL

ワークフロー定義やタスクキューなどはPostgreSQLに保存されます。 システム全体をHA構成にする場合には当然ここもHA構成にする必要があります。 今回構築したシステムではAmazon AuroraのPostgreSQL互換モードをMultiAZ構成にすることによって、HA構成としました。

API Server

API Serverはそれ専用のインスタンスを用意し、ロードバランサーの後段に2台を配置しました。 これらのサーバー上でのタスクの実行を止めるために、--disable-executor-loop --disable-local-agentのオプションを指定しています。 また、ここではタスクの実行を行わないため、小さなEC2インスタンスを用いています。

Agent + Workflow executor + Schedule executor

Agent、Workflow executor、Schedule executorの機能は同一のサーバーに載せています。 digdag server を起動する時に、--disable-* オプションを指定せずに起動しています。 タスクの実行はこれらのサーバーのみで行われます。 実際には、これらもAPI Serverとしての機能も有していますが、HTTPリクエストを受けることはありません。

タスクの実行ログの場所について

digdag serverを1つのサーバーだけで運用する場合、タスクの実行ログ(タスクが標準出力に書き出した内容)をサーバー上のローカルストレージに保存することが多いかと思います。 しかし、上で紹介したような構成を取る場合、全てのサーバーが読み書きできる場所に実行ログを配置する必要があります。 その1つの方法は、NFSなどの方法を使いネットワーク内の全てのDigdagで同じディレクトリを共有する事です。

f:id:vasilyjp:20180622171718p:plain

また、Digdagはaws S3に実行ログを保存することも出来るので、今回はこちらを採用しました。 以下のような設定ファイルを全てのDigdag serverに読み込ませることによって、実行ログがS3に保存されます。 direct_downloadオプションはログをS3から直接ダウンロードするか否かを指定するためのオプションです。

log-server.type=s3
log-server.s3.bucket=<バケット名>
log-server.s3.path=<ログを配置する場所のパス>
log-server.s3.direct_download=false # S3から直にダウンロードする場合はtrueにする

HA構成にしたときにハマったこと

構成例の次に、HA構成にすることによって発生した問題とその対処法を紹介します。

ローカルストレージのファイルの扱い

DigdagをHA構成すると一般的にサーバー台数が2台以上になるため、1つのワークフローに属するタスクたちが複数台のサーバーで実行されることがあります。 そのため、タスク間でのファイルの受け渡しにサーバーのローカルストレージを使用すると、ファイルが見つからずエラーになることがあります。

+task1:
  sh>: echo 'hoge' > /tmp/hoge.txt

+task2:
  sh>: cat /tmp/hoge.txt # ファイルが見つからない場合がある

この問題を解消するためには、サーバーのローカルストレージを介したデータの受け渡しをなくす必要があります。 ローカルストレージの代わりにS3やDBなどの全サーバーから参照することのできるストレージを使うことによって、問題に対処をしました。

PostgreSQLのコネクション数

DigdagがPostgreSQLと接続する時に使用しているコネクションが多すぎると、Digdagの起動時にPostgreSQLとの接続を確立できないことがありました。 これはPosgreSQL側が受けることのできるコネクション数の上限に達すると発生する現象です。 コネクション数のデフォルトはCPUコアの数*32とかなり多めになっているので、以下のようにして、コネクション数を絞って問題に対処しました。

database.maximumPoolSize=32

サーバーが落ちた時の挙動

HA構成をとっているときにサーバーが突然死したときの挙動がどうなるのかの検証を行いました。 以下のようなタスクを実行し、sleep 10を実行している最中にDigdagプロセスをkill -9で落としてみました。

timezone: UTC

+task1:
  sh>: "echo task1 start && sleep 10 && echo task1 end"

kill -9を行ったdigdagプロセスのログは以下のようになり、task1の実行途中でサーバーが突然死をしたような挙動になっています。

2018-06-15 13:50:24 +0900: Digdag v0.9.25
2018-06-15 13:50:25 +0900 [INFO] (main): secret encryption engine: disabled
2018-06-15 13:50:25 +0900 [INFO] (main): XNIO version 3.3.6.Final
2018-06-15 13:50:25 +0900 [INFO] (main): XNIO NIO Implementation Version 3.3.6.Final
2018-06-15 13:50:25 +0900 [INFO] (main): Starting server on 0.0.0.0:65434
2018-06-15 13:50:25 +0900 [INFO] (main): Bound on 0:0:0:0:0:0:0:0:65434 (api)
2018-06-15 13:50:26 +0900 [INFO] (0042@[0:ha_sample]+sample^failure-alert): type: notify
2018-06-15 13:50:34 +0900 [INFO] (0042@[0:ha_sample]+sample+task1): sh>: echo task1 start && sleep 10 && echo task1 end
task1 start
# ここでdigdagプロセスに対してkill -9を行う。プロセスが突然死ぬので、これ以降のログは無い。

このワークフローの実行状況をweb UIで確認すると、タスクが未だ実行中という状態になっています。 そして、約5分後になると別のdigdagプロセスでタスクが実行され、ワークフローの実行が成功しました。

2018-06-15 13:55:37 +0900 [INFO] (0042@[0:ha_sample]+sample+task1): sh>: echo task1 start && sleep 10 && echo task1 end
task1 start
task1 end

Agentはキューからタスクを取り出すときにqueued_task_locksテーブルのlock_expire_timeに現在時刻から5分後のUNIX timeを書き込みます。 Agentは1分毎にこの値を現在時刻の5分後に設定するため、Agentが生きている限りは現在時刻 < lock_expire_timeになります。 一方、Agentが死んだ場合にはlock_expire_timeの更新がストップするため、約5分後になると現在時刻 > lock_expire_timeという状態になります。 この状態のタスクの存在を他のAgentが検知することによって、突然死したAgentが担当していたタスクを他のAgentが代わりに実行するという機能が実現されています。 この動作はタスクのリトライ機能とは関係ないため、リトライ回数を設定していないタスクに対しても行われます。

このような内部実装になっているため、Digdagのワークルフローを書くときには可能な限り各タスクを冪等にするべきです。

オートスケーリングとの組み合わせ

HA構成を組んだことによってオートスケーリングによるスケールアウトが簡単に行えるようになったので、その紹介もします。 構成図中でタスクを実行する役割を持っているEC2インスタンスたちに対してAutoScalingGroupの設定を行い、サーバーの起動時にDigdagが自動的に起動するよう設定します。 そして以下のようにすることによって、効率よく複数個のタスクを実行できます。 なお、Agentのサーバー台数をゼロにしてしまうとその後にサーバー台数を増やすためのリクエストを実行することすらできなくなりますので、注意が必要です。

+scale_out:
  sh>: autoscaling.sh 10

+load_tables:
  for_each>:
    table: ["table1", "table2", "table3", ... "table100"]
  _parallel: true
  _do:
    call>: load_table.dig

+scale_in:
  sh>: autoscaling.sh 1
#!/bin/bash

REGION='<region>'
AUTO_SCALING_GROUP_NAME='<auto scaling group name>'
aws --region $REGION autoscaling set-desired-capacity --auto-scaling-group-name $AUTO_SCALING_GROUP_NAME --desired-capacity $1

まとめ

今回はDigdagをHA構成で構築する方法を紹介しました。 ワークフロー管理ツールはDigdag以外にもAirflowやLuigiなどいろいろあります。 それら他のツールと比べるとDigdagはHA構成を簡単に組むことが出来るように最初から考えられている印象を受けます。 バッチをスケジュール実行するサーバーは多くのシステムでSPOFになりやすいので、HA構成にすることによって可用性をより高めましょう。

スタートトゥディテクノロジーズでは、一緒にサービスを作り上げてくれるエンジニアを大募集中です。 ご興味のある方は、以下のリンクからぜひご応募ください!