Airflowで名前に規則性がないGCSのオブジェクトを検知したい
「Cloud ComposerでSecret変数を使う」でも紹介しておりますが、弊社ではワークフロー オーケストレーションツールとして、Airflowをベースにした、フルマネージドのCloud Composerを利用しております。
このAirflowですが、様々な処理をテンプレート化したオペレータを多数取り揃えております。
その中で、ワークフローの途中でGoogle Cloud Storage(GCS)バケット内のファイルの存在をチェックする処理を入れたいというのは、よくあるシチュエーションかと思います。
今回は、その中でも特に名前に規則性がないオブジェクトを検知する方法について紹介したいと思います。
AirflowでGCS内のオブジェクトを検知する方法
まず、どういったときにGCS内のオブジェクト検知が必要になるのか紹介したいと思います。
例えば、GCSからBigQueryにデータをロードするワークフローがあるとします。
しかし、いざデータロードを行うためにGCS内のオブジェクトを見に行ったときに、該当のオブジェクトが存在していない場合エラーになってしまい、ワークフローが中断されてしまいます。
そのため、GCS内のファイルを指定された期間監視して、ファイルの到着を検知したら後続のタスクを実行するようなタスクが必要になってきます。
ここで利用できるのが、センサと呼ばれるもので、Airflowで用意されている、何かアクションが起きるまで待機するオペレータの一種になります。
今回のGCS内のオブジェクトを監視するにはGCSObjectExistenceSensorというセンサが用意されているので、これが使えそうです。
GCSObjectExistenceSensorでワイルドカードを使う
GCSObjectExistenceSensor
GCSObjectExistenceSensor
とは読んで字の如く、GCS内のファイルの存在をチェックするセンサになっています。
基本的なパラメータには以下のようなものがあります。
-
bucket
: オブジェクトがあるGCSのバケット名 -
object
: GCS内のオブジェクト名(プレフィックスも含む) -
use_glob
: Trueに設定するとobject
パラメータがglobとして解釈される -
google_cloud_conn_id
: GCSに接続するためのコネクションID -
timeout
: タスクがタイムアウトして失敗するまでの経過時間
実はこのuse_glob
というパラメータですが、ライブラリのバージョン10.13.0から実装されているパラメータでリリースが2023年12月28日ということなので、比較的新しい機能になります。
では、実際にこのオプションを利用してオブジェクトの検知を行う方法を紹介したいと思います。
DAGの実装例
例えば、あるバケット内のオブジェクトを検知したいときに、オブジェクト名がlogs_[ランダム文字列].csv
のようになっており、[ランダム文字列]
の部分にはどのような文字列が入るか分からないとします。
こういったシチュエーションの際に、use_glob
というパラメータにTrue
を指定することで、object
パラメータの値にワイルドカードを利用できるようになります。
以下が、use_glob
を利用したDAGの実装例になります。(your_project_id
は各自のGoogle CloudのプロジェクトIDを入力)
import os from airflow.models import DAG from airflow.operators.empty import EmptyOperator from airflow.providers.google.cloud.sensors.gcs import GCSObjectExistenceSensor from airflow.utils.dates import days_ago PYTHON_FILENAME = os.path.basename(__file__)[:-3] PROJECT_ID = "your_project_id" GCS_BUCKET_NAME = "airflow_test_gcs_bucket" GCS_OBJECT_NAME = "foo/logs_*.csv" default_args = { "owner": "airflow", "project_id": PROJECT_ID, "retries": 3, } with DAG( dag_id=PYTHON_FILENAME, default_args=default_args, start_date=days_ago(1, hour=0, minute=0, second=0), schedule=None, catchup=False, ) as dag: # GCS内へのファイル到達を確認する exist_sensor = GCSObjectExistenceSensor( task_id="check_file_exists", bucket=GCS_BUCKET_NAME, object=GCS_OBJECT_NAME, use_glob=True, timeout=3600.0, ) # 空のタスク dummy_task = EmptyOperator(task_id="dummy_task") exist_sensor >> dummy_task
今回は、オブジェクトの検知を行い、見つかったら後続の空のタスクを実行するようなワークフローとします。
実行結果
デプロイ完了後、Cloud Composerのコンソール上で手動実行すると以下のような待機状態になります。
ここで、logs_[ランダム文字列].csv
を満たすファイルをGCSにアップロードしてみます。
無事にタスクが成功して、後続のタスクまで実行されました。
今回は後続のタスクをダミーの空タスクにしましたが、実際はGCSToBigQueryOperator
等を利用して、オブジェクトの存在を確認してからBigQueryに取り込むなどのワークフローが考えられるかと思います。
また、
GCS_OBJECT_NAME = "*/logs_*.csv"
のようにプレフィックスにもワイルドカードの指定は可能です。
その他の方法
ちなみに、今回利用したGCSObjectExistenceSensor
以外にもGCSObjectsWithPrefixExistenceSensorを利用することで、特定のプレフィックスを持ったオブジェクトを検知でき、検知したオブジェクトの情報をクロス・コミュニケーション(XComs)を用いて、後続タスクに渡すことができます。
一見、こちらの方が、汎用性は高そうなのですが、例えば「logs_qwertyuiop.csv」と「logs_qwertyuiop.json」のようにオブジェクトのサフィックスのみ異なっていると、片方のオブジェクトのみの検知ができないなど、GCSObjectExistenceSensor
でuse_glob
パラメータを利用した方がオブジェクト検知の観点だけでいうと柔軟性は高そうです。
このように、それぞれのセンサごとに特徴が異なっているため、ユースケースに応じた使い分けが重要になってきます。
まとめ
今回は、AirflowでGCSのオブジェクト検知を行うときに規則性がないオブジェクト名であっても検知ができる方法を紹介しました。
本文中にもあったように、今回の機能は昨年12月にリリースされたものであり、数か月前までできないと思っていたことが、できるようになっているアップデートの早さはOSSの強みかと思います。
今後もAirflow(Cloud Composer)に限らず、各種サービスのアップデートは定期的にチェックするようにしておきたいですね。