KedroでMLOps事始め
半年程度Kedroを使ってみたのでまとめていきます。バージョン0.18になりより使いやすくなっていたりするのでその辺を中心に書いていきます。
Kedroとは
LF AI & Data FoundationによってホストされているOSSであり、再現性、保守性、モジュール性に優れたデータサイエンスコードを作成するためのオープンソースのPythonフレームワークです。意識せずともdata, config, srcが分離され、クリーンなコードを保ちやすくなります。また、Kedro PipelineをKubeflow PipelineやAWS Batchなどに簡単に変換が可能でMLOpsで活用しやすいのが特徴です。私はkedro-vertexaiを活用してVertex AI Pipelineと組み合わせて利用しています。
Kedroの構成要素
大まかな構成要素はNode, Pipeline, Data Catalog, Runnerです。以下個々の要素について概要を記載します。
Node
- Python関数のラッパー
- Pipelineの構成要素
- あるNodeの出力が後続のNodeの入力である場合にPipelineをLinkできる
- 関数の入力と出力をData Catalogで定義
Pipeline
- PipelineはNodeの依存関係と実行順序を定義する構成要素
- Pipelineは依存関係を解決することでNodeの実行順序を決定する
Data Catalog
- Pipelineで使用するデータの定義をする構成要素
- Nodeの入出力の名前をDataSetのKeyとしてマッピングするもの
- 様々タイプのデータストレージに対応している
- デフォルトで用意されているDataSetのClassはこちらの通り
- 単にメモリに保存する場合はMemoryDataSetを使用する
- Kedroに実装が無い場合は独自で実装可能
- 私はrasterファイルのIO用にRasterioのクラスなどを独自に作成することが多いです
- 独自のDataSet Classを作成する場合はKedro公式と同じようにpipelinesの下にextrasフォルダを作成し定義します
- すべてのデータをData Catalogで定義する必要はない
- Pipelineでnamespaceを定義してあげれば中間NodeのIOはtempファイルとしてKedro側で処理してくれる
Runner
- Pipelineを実行するオブジェクト
- Nodeの実行順序はNodeへのI/Oを基にKedroが解決
- デフォルトではSequentialRunner、ParallelRunnerが用意されている
- AWS Batchなどを利用する場合はRunnerを個別に定義する必要がある
動かし方
Install
以下のようにpipでインストールできます。
pip install kedro
Data Catalogでpandas経由でExcelファイルやParquetファイルを扱いたい場合は以下のようにインストールします。
pip install kedro[pandas.exceldataset,pandas.parquetdataset]
新規Kedroプロジェクトの作成
kedro new
でCookiecutterベースのKedroのデフォルトtemplateからプロジェクトが作成できます。独自のtemplateを作成して、当該templateベースで作成したい場合は以下のようにstarterオプションを与えてコマンドを実行します。
kedro new --starter=templates/my_template1
Notebookでの実験からPipelineの構築
以下のコマンドでKedroの設定を事前に読み込んだNotebookを使用できます。
kedro jupyter notebook
通常のjupyter notebookと異なる点はKedroのcontextやcatalogオブジェクトを自前で定義することなく利用できることです。 Notebookでの実験が終わったら、公式ドキュメントの通りNotebookのセルにnodeタグを付与し、以下のコマンドを実行してPipelineの素となるNodeの定義を出力します。
kedro jupyter convert --all
src/<package_name>/nodes
の下にnodeタグが付与されたNotebookのセルの内容が出力されるので、それを加工してsrc/<package_name>/pipelies
の下に配置します。
その後、conf/base/catalog.yml
, conf/base/globals.yml
やconf/base/parameters.yml
を設定し、必要があればsrc/<package_name>/pipeline_registry.py
でPipeline実行について定義します。定義したPipelineを全て使用する場合は以下のままで問題ありません。
pipelines = find_pipelines() pipelines["__default__"] = sum(pipelines.values())
global変数を使う場合はsrc/<package_name>/pipeline_registry.py
でこちらのように設定します。
Pipelineの実行はkedro run
コマンドを打つだけです。単一のNodeを実行する場合は--nodeオプション、単一Pipelineを実行する場合は--pipeline
オプション、Node定義で付与されたtagに基づいて実行したい場合は--tag
オプションを付けて実行します。
namespaceを使ったPipeline定義
Pipelineにnamespaceを定義することが可能であり、IOをnamespase単位で定義できます。これによりData Catalogの定義をサボれます。 例えばPipelineを以下のように定義することでPipelineの定義を抽象化できます。
return pipeline( [ node( func=split_data, inputs=["model_input_table", "params:model_options"], outputs=["X_train", "X_test", "y_train", "y_test"], name="split_data_node", ), node( func=train_model, inputs=["X_train", "y_train"], outputs="regressor", name="train_model_node", ), node( func=convert_model_to_onnx, inputs=["regressor", "X_train"], outputs="regressor_onnx", name="convert_model_node", ), node( func=evaluate_model, inputs=["regressor", "X_test", "y_test"], outputs=None, name="evaluate_model_node", ), ], namespace="data_science", inputs=["model_input_table"], )
Pipeline定義にnamespaceを使うとData Catalogの定義はこのように変化します。
namespace導入前
model_input_table: type: pandas.ParquetDataSet filepath: data/03_primary/model_input_table.pq layer: primary X_train: type: pandas.xxx filepath: data/04_features/xxxxx X_test: type: pandas.xxx filepath: data/04_features/xxxxx y_train: type: pandas.xxx filepath: data/04_features/xxxxx y_test: type: pandas.xxx filepath: data/04_features/xxxxx regressor: type: pickle.PickleDataSet filepath: data/06_models/regressor.pickle versioned: true layer: models regressor_onnx: type: pickle.PickleDataSet filepath: data/06_models/credit_model.pickle layer: models
namespace導入後
model_input_table: type: pandas.ParquetDataSet filepath: data/03_primary/model_input_table.pq layer: primary data_science.regressor: type: pickle.PickleDataSet filepath: data/06_models/regressor.pickle versioned: true layer: models data_science.regressor_onnx: type: pickle.PickleDataSet filepath: data/06_models/credit_model.pickle layer: models
MLOpsに向けて
AWS Batch, Kubeflow, Vertex AI PipelineやAirflowなどでデプロイが可能です。
AWS Batch
あくまでRunnerとしてAWS Batchを使用しているだけです。ローカルでも同じ時間Pythonプロセスが走るため非同期で実行したい場合には向きません。
Airflow
Kubeflow
公式ドキュメント通りの利用の仕方とサードパーティプラグインを使用した方法があります。個人的にはサードパーティプラグインを使用した方法がお薦めです。
Vertex AI Pipeline
こちらもKubeflowと同様ですので、以下同文です。サードパーティプラグインはこちらです。
まとめ
今回は簡単にKedroについて紹介しました。次回以降はKedroで定義したパイプラインを活用してMLOpsパイプラインを構築する方法を紹介していければ良いなと思っています。