Prefect 触ってみた

Naofumi Yamada

Naofumi Yamada

Data Engineer

Prefect 触ってみた

Airflow のようなワークフローツールで、手軽にタスクやフローが定義できて好いです。 Docker サーバと通信したり、Slack 通知するタスクがあったり、動的 DAG が作れたり、ローカルテストしやすかったりします。

Kubernetes Agent から GKE デプロイする方法は要調査ですが、Runner として実装はされています。

本稿では、Prefect / First steps を実施してみます。 Colab を用意したので、そちらで触りながら、読むことができます。

Prefect とは

Airflow よりデータフローを強化しているワークフローツールです。 気になる場合には、Why Not Airflow?を読むのがおすすめです。

以下のようなことが Airflow よりお手軽にできるのもポイントです。

触ってみよう

Prefect のインストール

2020-05-11 時点最新の 0.10.7 をシュッと動かすことができなかったので、prefect==0.10.6 を使います。0.10.6 も tornado のバージョンによって動かないので tornado==4.5.3 に固定してます。

pip install -q prefect==0.10.6 tornado==4.5.3

タスクの定義

もっともお手軽なのが、関数に @task デコレータをつけてタスクにするものです。

from prefect import task
@task
def add(x, y=1):
return x + y

単一の関数より複雑なタスクはクラスで定義することもできます。

# from prefect import Task
# class AddTask(Task):
# def __init__(self, default: int, *args, **kwargs):
# super().__init__(*args, **kwargs)
# self.default = default
# def run(self, x: int, y: int=None) -> int:
# if y is None:
# y = self.default
# return x + y
# # initialize the task instance
# add = AddTask(default=1)

フローの定義

Flow コンテキストの中で、通常の関数のようにタスクを呼び出すと、Prefectがワークフローを表す計算グラフを作成します。この時点では実行されません。

from prefect import Flow, Parameter
with Flow("My first flow!") as flow:
y = Parameter("y")
first_result = add(1, y=y)
second_result = add(x=first_result, y=100)

フローを実行する

あとは定義したフローインスタンスを実行するだけです。

state = flow.run(y=2)
assert state.is_successful()
first_task_state = state.result[first_result]
assert first_task_state.is_successful()
assert first_task_state.result == 3
second_task_state = state.result[second_result]
assert second_task_state.is_successful()
assert second_task_state.result == 103