KedroでMLOps事始め

半年程度Kedroを使ってみたのでまとめていきます。バージョン0.18になりより使いやすくなっていたりするのでその辺を中心に書いていきます。

Kedroとは

LF AI & Data FoundationによってホストされているOSSであり、再現性、保守性、モジュール性に優れたデータサイエンスコードを作成するためのオープンソースPythonフレームワークです。意識せずともdata, config, srcが分離され、クリーンなコードを保ちやすくなります。また、Kedro PipelineをKubeflow PipelineやAWS Batchなどに簡単に変換が可能でMLOpsで活用しやすいのが特徴です。私はkedro-vertexaiを活用してVertex AI Pipelineと組み合わせて利用しています。

Kedroの構成要素

大まかな構成要素はNode, Pipeline, Data Catalog, Runnerです。以下個々の要素について概要を記載します。

Node

  • Python関数のラッパー
  • Pipelineの構成要素
  • あるNodeの出力が後続のNodeの入力である場合にPipelineをLinkできる
  • 関数の入力と出力をData Catalogで定義

Pipeline

  • PipelineはNodeの依存関係と実行順序を定義する構成要素
  • Pipelineは依存関係を解決することでNodeの実行順序を決定する

Data Catalog

  • Pipelineで使用するデータの定義をする構成要素
  • Nodeの入出力の名前をDataSetのKeyとしてマッピングするもの
  • 様々タイプのデータストレージに対応している
    • デフォルトで用意されているDataSetのClassはこちらの通り
    • 単にメモリに保存する場合はMemoryDataSetを使用する
    • Kedroに実装が無い場合は独自で実装可能
      • 私はrasterファイルのIO用にRasterioのクラスなどを独自に作成することが多いです
      • 独自のDataSet Classを作成する場合はKedro公式と同じようにpipelinesの下にextrasフォルダを作成し定義します
  • すべてのデータをData Catalogで定義する必要はない

Runner

  • Pipelineを実行するオブジェクト
  • Nodeの実行順序はNodeへのI/Oを基にKedroが解決
  • デフォルトではSequentialRunner、ParallelRunnerが用意されている
    • AWS Batchなどを利用する場合はRunnerを個別に定義する必要がある

動かし方

Install

以下のようにpipでインストールできます。

pip install kedro

Data Catalogでpandas経由でExcelファイルやParquetファイルを扱いたい場合は以下のようにインストールします。

pip install kedro[pandas.exceldataset,pandas.parquetdataset]

新規Kedroプロジェクトの作成

kedro newでCookiecutterベースのKedroのデフォルトtemplateからプロジェクトが作成できます。独自のtemplateを作成して、当該templateベースで作成したい場合は以下のようにstarterオプションを与えてコマンドを実行します。

kedro new --starter=templates/my_template1

Notebookでの実験からPipelineの構築

以下のコマンドでKedroの設定を事前に読み込んだNotebookを使用できます。

kedro jupyter notebook

通常のjupyter notebookと異なる点はKedroのcontextやcatalogオブジェクトを自前で定義することなく利用できることです。 Notebookでの実験が終わったら、公式ドキュメントの通りNotebookのセルにnodeタグを付与し、以下のコマンドを実行してPipelineの素となるNodeの定義を出力します。

kedro jupyter convert --all

src/<package_name>/nodesの下にnodeタグが付与されたNotebookのセルの内容が出力されるので、それを加工してsrc/<package_name>/pipeliesの下に配置します。

その後、conf/base/catalog.yml, conf/base/globals.ymlconf/base/parameters.ymlを設定し、必要があればsrc/<package_name>/pipeline_registry.pyでPipeline実行について定義します。定義したPipelineを全て使用する場合は以下のままで問題ありません。

pipelines = find_pipelines()
pipelines["__default__"] = sum(pipelines.values())

global変数を使う場合はsrc/<package_name>/pipeline_registry.pyでこちらのように設定します。 Pipelineの実行はkedro runコマンドを打つだけです。単一のNodeを実行する場合は--nodeオプション、単一Pipelineを実行する場合は--pipelineオプション、Node定義で付与されたtagに基づいて実行したい場合は--tagオプションを付けて実行します。

namespaceを使ったPipeline定義

Pipelineにnamespaceを定義することが可能であり、IOをnamespase単位で定義できます。これによりData Catalogの定義をサボれます。 例えばPipelineを以下のように定義することでPipelineの定義を抽象化できます。

    return pipeline(
        [
            node(
                func=split_data,
                inputs=["model_input_table", "params:model_options"],
                outputs=["X_train", "X_test", "y_train", "y_test"],
                name="split_data_node",
            ),
            node(
                func=train_model,
                inputs=["X_train", "y_train"],
                outputs="regressor",
                name="train_model_node",
            ),
            node(
                func=convert_model_to_onnx,
                inputs=["regressor", "X_train"],
                outputs="regressor_onnx",
                name="convert_model_node",
            ),
            node(
                func=evaluate_model,
                inputs=["regressor", "X_test", "y_test"],
                outputs=None,
                name="evaluate_model_node",
            ),
        ],
        namespace="data_science",
        inputs=["model_input_table"],
    )

Pipeline定義にnamespaceを使うとData Catalogの定義はこのように変化します。

namespace導入前

model_input_table:
  type: pandas.ParquetDataSet
  filepath: data/03_primary/model_input_table.pq
  layer: primary

X_train:
  type: pandas.xxx
  filepath: data/04_features/xxxxx

X_test:
  type: pandas.xxx
  filepath: data/04_features/xxxxx

y_train:
  type: pandas.xxx
  filepath: data/04_features/xxxxx

y_test:
  type: pandas.xxx
  filepath: data/04_features/xxxxx

regressor:
  type: pickle.PickleDataSet
  filepath: data/06_models/regressor.pickle
  versioned: true
  layer: models

regressor_onnx:
  type: pickle.PickleDataSet
  filepath: data/06_models/credit_model.pickle
  layer: models

namespace導入後

model_input_table:
  type: pandas.ParquetDataSet
  filepath: data/03_primary/model_input_table.pq
  layer: primary

data_science.regressor:
  type: pickle.PickleDataSet
  filepath: data/06_models/regressor.pickle
  versioned: true
  layer: models

data_science.regressor_onnx:
  type: pickle.PickleDataSet
  filepath: data/06_models/credit_model.pickle
  layer: models

MLOpsに向けて

AWS Batch, Kubeflow, Vertex AI PipelineやAirflowなどでデプロイが可能です。

AWS Batch

あくまでRunnerとしてAWS Batchを使用しているだけです。ローカルでも同じ時間Pythonプロセスが走るため非同期で実行したい場合には向きません。

Airflow

Kubeflow

公式ドキュメント通りの利用の仕方とサードパーティプラグインを使用した方法があります。個人的にはサードパーティプラグインを使用した方法がお薦めです。

Vertex AI Pipeline

こちらもKubeflowと同様ですので、以下同文です。サードパーティプラグインこちらです。

まとめ

今回は簡単にKedroについて紹介しました。次回以降はKedroで定義したパイプラインを活用してMLOpsパイプラインを構築する方法を紹介していければ良いなと思っています。