Python
機械学習
scikit-learn
python3

Python でのデータ分析作業をスマートにするために

はじめに

機械学習を使った分析業務といっても、データの取り込み、モデリング、プロダクト化といった様々なステージに分けられ、それぞれのステージ特有のボトルネックがあるかと思います。私はその全てに対する処方箋を提示できるわけではありませんが、今回は特に、Pythonを使った機械学習モデリングのステージについてご意見しようという魂胆です。

Python は scikit-learn など機械学習用のモジュールが充実しているので、予測モデルの作成によく使われていると思いますが、予測モデルの作成というのは試行錯誤の繰り返しで、頻繁に出戻りが発生します。あるいはモデル作成のパートを分業せず、一人で試行錯誤という場面も多いと思いますが、多くの人は Jupyter notebook のような対話型エディタで作業していると思います。これも、作業が長引くにつれて、相当見づらいコードが出来上がってしまうことが予想されます。

Python で機械学習する際には scikit-learn モジュールが半ばデファクトスタンダードになっています。XGBoost とか LightGBMKeras のようなモジュールも、 scikit-learn の構文に準じたAPIを用意しています。そこで、scikit-learn の構文で統一し、分析作業当事者にも第三者にも見やすくしてしまおうというのが今回の話です。

具体的には、以下のトピックについて言及します
1. scikit-learn の transformer, estimator, Pipeline について
2. scikit-learn 0.20 で追加予定のクラス
3. transformer/estimator 自作のすすめ

機械学習をフローでとらえる

機械学習モデルは、全体的に見れば、データを入力し、予測値を出力するというシンプルな関数です。その流れはフローチャートで表わせます。scikit-learn
では、教師あり学習モデルを作るのに必要なクラスは以下の2つに分けられます。

  • transformer (変換器) は X (と y) を入力して、変換された X’ を出力するクラスです。標準化、カテゴリカル変数の変換などいわゆる「前処理」の処理をします1。パラメータを決定する(例えば、標準化なら平均・分散値を保持しておく必要があります) fit, 入力を変換する transform, 両者の処理を一度に行う fit_transformというメソッドを持っています。また、オプションで inverse_transform という逆変換をかけるメソッドのあるものもあります。
  • estimator (推定器), は (X, y) を入力して y’ を出力するクラスです。いわゆる狭義の学習アルゴリズムです。もう少し細かいことをいうと、 推定器はさらに regressor (回帰器) と classifier (分類器) の2種類に分類されます2

変換器、推定器のデータの流れを図で表すとそれぞれ以下のようになります。変換器については、 y の入力が必須ではないので破線で表しています。
transformer.png
estimator.png

整形され、訓練・テストに分けられたデータ X_train, y_train, X_test, y_test (例えば、 numpy 配列または pandas のデータフレームとします) がある状態から予測値を出力するまでは、 基本的に以上の変換器と推定器だけを使うため、

(X, y) -> 変換器 -> 変換器 -> ... -> 推定器 -> 予測値

という流れで表現できるはずです。例えば、「入力データを標準化し、elastic net で学習させ、テストデータの予測値を出力する」という処理は、データのフローが図のようになります。
nopipeline1.png

図での transformer が標準化変換器に、 estimator が elastic net 推定器に対応します。これは次のように書けます。

from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import ElasticNet
from sklearn.metrics import mean_squared_error
from sklearn.datasets import make_regression  # 疑似データ作成用
from sklearn.model_selection import train_test_split  # 疑似データ作成用


# 疑似データ作成
X, y = make_regression(n_samples=100, random_state=42)
X_train, X_test, y_train, y_test = train_test_split(X, y)

# 変換器・推定器オブジェクト作成
standardizer = StandardScaler()
reg = ElasticNet(random_state=42)

# 学習
standardizer.fit(X_train)
reg.fit(standardizer.transform(X_train), y_train)

# テストデータに対して予測
prediction = reg.predict(standardizer.transform(X_test))

# 評価
mean_squared_error(y_test, prediction)

となり、標準化変換器 standardizer とelastic net 推定器 reg について、それぞれ順に .fit() メソッドを呼び出すだけで学習が完了します。
さらに、elastic net で学習する際に、正則化パラメータをグリッドサーチ交差検証 (GSCV) で求めたいならば、GridSearchCV クラスが使えます。このクラスは、推定器をコンストラクタの引数にとり、このオブジェクト自体が同時に推定器でもあります。よって、データのフローが図のようになります。
noPipelineGSCV.png

from sklearn.model_selection import GridSearchCV

# 変換器・推定器オブジェクト作成
standardizer = StandardScaler()
reg = ElasticNet(random_state=42)
gscv = GridSearchCV(reg,
                    param_grid={'alpha': [1.0, 0.5, 0.1]},
                    cv=2)

# 学習
standardizer.fit(X_train)
gscv.fit(standardizer.transform(X_train), y_train)

# テストデータに対して予測
prediction = gscv.predict(standardizer.transform(X_test))

# 評価
mean_squared_error(y_test, prediction)

しかし、実際の作業はもっと複雑な前処理や試行錯誤を伴います。たとえば、入力データの中にカテゴリカル変数がある場合は直接標準化できないので、one-hot encoding したり、あるいは不要な列を除去したり、主成分分析 (PCA) で次元削減をしたり、といった操作が必要になります。そして、推定器の出力する結果いかんで、この前処理のフローは何度も変更されることでしょう。また、前処理の工程が増え、複数の変換器を使うようになると、予測値を出力する際にも変換器を同じ順で呼び出さねばなりません。
Pipeline クラスは

変換器 -> 変換器 -> ... -> 学習器

のフローを一括して1つのクラスオブジェクトでまとめられます3
PipelineGSCV.png

これを使って、より複雑な処理フローをまとめてみます (結果として誤差が大きくなっていますが動作確認なので気にしないでください)。

from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import ElasticNet
from sklearn.model_selection import GridSearchCV
from sklearn.metrics import mean_squared_error
from sklearn.decomposition import PCA
from sklearn.pipeline import Pipeline

# 変換器・推定器オブジェクト作成
standardizer = StandardScaler()
pca = PCA()
reg = ElasticNet(random_state=42)
gscv = GridSearchCV(reg, param_grid={'alpha': [1.0, 0.5, 0.1]}, cv=2)

pl = Pipeline([
        ('standardize', standardizer),
        ('pca', pca),
        ('gs_elastic_net', gscv)
        ])

pl.fit(X_train, y_train)

# テストデータに対して予測
prediction = pl.predict(X_test)

# 評価
mean_squared_error(y_test, prediction)

この他にも、変換器を並列化できる FeatureUnion というクラスがあります。
featureunion.png

Pipeline でだいぶプログラムの見通しが良くなりました。このように、前処理をステップごとに変換器クラスにまとめてしまえば、コードは比較的シンプルになり、変更もしやすくなります。一方で、より実践的な使用法を考えると、次のような問題が残されています。

  1. 従属変数 y の変換に対応していない。ベクトルなので自分で書くことができるが、見づらくなる

  2. 列ごとの変換ができない (数値変数なら対数、文字変数なら count encoding といったふうに列ごとに変換器を使い分けられない)

  3. Pipelineではスタッキング (アンサンブル学習) ができない

(1) については、 LabelBinarizerLabelEncoder
という、クラス分類タスク用のクラスがありますが、従属変数の変換が必要になるのはむしろ回帰器タスクのことが多いです。対数をとったり、平方根をとったり、割合に変換したりと、タスクに応じていろいろな操作が要求されるはずです。

y = np.log(y)
y=y**2
y = f(y)
...

def f_inv(y):
    ...

prediction = f_inv(np.log(np.sqrt(pl.predict(X_test))))

このような処理を何度も書き直すと煩雑になります。
(2) は、列ごとに異なる処理をする場合、 Pipeline など現行のモジュールでは不十分です。
(3) については、Pipeline は直列的な流れしか構築できないためです。バギングはクラスがすでに用意されているので可能ですが、異なる学習アルゴリズムを並列化する異質アンサンブルはできません。

現行バージョン 0.19.1 ではまだ存在しませんが、0.20 開発版ではこれらを解決するクラスが登場しています。(1) については、TransformedTargetRegressor4、 (2) に対しては ColumnTransformer があります5。以下の図のように、X の列ごとに異なる変換器を充てがうことができます。

col_transformer.png

(3) については、mlxtend モジュールで、 scikit-learn 対応のクラス StackingClassifier, StackingCVClassifier, StackingRegressor, StackingCVRegressor が提供されています6。そのため、このクラスに複数の推定器オブジェクトや Pipeline オブジェクトを与えれば、sklearn のクラスとして扱えます7

変換器の自作

しかし、(1), (2) については、開発版のためかまだ問題があります。TransformedTargetRegressor
は、y に対して任意の変換器を指定できますが、例えば、「対数変換してから標準化する」というふうに、複数の変換器を噛ませる処理ができません。両方の処理を行う変換器クラスを1から作るのは、本末転倒感があります。また、そこで、今回は複数の変換器を噛ませられる簡易的なクラスを作成しました。(2) の ColumnTransformer についても、私が確認した時点ではサンプルプログラムですらエラーが発生し正常終了しないという状態でした。そこで、似たような挙動をする同名のクラスを自作しました。以下のコードには3つのクラスが定義されています。

長いので折りたたみ
#! /usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Sun Jun 17 01:19:29 2018
sklearn supplemental classes

あると便利な sklearn クラス
@author: ks
"""
import numpy as np
import warnings
from sklearn.base import BaseEstimator, RegressorMixin, TransformerMixin, clone
from sklearn.utils.metaestimators import _BaseComposition
from sklearn.utils.validation import check_is_fitted
from sklearn.utils import check_array


class ChainTransformedTargetRegressor(BaseEstimator, RegressorMixin):
    """
    transformers をリストで与える以外は TransformedTargetRegressor と同じ
    """
    def __init__(self, regressor=None, transformers=None):
        self.regressor = regressor
        self.transformers = transformers

    def _fit_transformers(self, y):
        self.transformers_ = self.transformers
        for i in range(len(self.transformers)):
            y = self.transformers_[i].fit_transform(y)
        # XXX: sample_weight is not currently passed to the
        # transformer. However, if transformer starts using sample_weight, the
        # code should be modified accordingly. At the time to consider the
        # sample_prop feature, it is also a good use case to be considered.

    def fit(self, X, y, sample_weight=None):
        """Fit the model according to the given training data.
        Parameters
        ----------
        X : {array-like, sparse matrix}, shape (n_samples, n_features)
            Training vector, where n_samples is the number of samples and
            n_features is the number of features.
        y : array-like, shape (n_samples,)
            Target values.
        sample_weight : array-like, shape (n_samples,) optional
            Array of weights that are assigned to individual samples.
            If not provided, then each sample is given unit weight.
        Returns
        -------
        self : object
        """
        y = check_array(y, accept_sparse=False, force_all_finite=True,
                        ensure_2d=False, dtype='numeric')

        # store the number of dimension of the target to predict an array of
        # similar shape at predict
        self._training_dim = y.ndim

        # transformers are designed to modify X which is 2d dimensional, we
        # need to modify y accordingly.
        if y.ndim == 1:
            y_2d = y.reshape(-1, 1)
        else:
            y_2d = y
        self._fit_transformers(y_2d)

        if self.regressor is None:
            from sklearn.linear_model import LinearRegression
            self.regressor_ = LinearRegression()
        else:
            self.regressor_ = clone(self.regressor)

        # transform y and convert back to 1d array if needed
        y_trans = y_2d
        for i in range(len(self.transformers_)):
            y_trans = self.transformers_[i].transform(y_trans)
        # FIXME: a FunctionTransformer can return a 1D array even when validate
        # is set to True. Therefore, we need to check the number of dimension
        # first.
        if y_trans.ndim == 2 and y_trans.shape[1] == 1:
            y_trans = y_trans.squeeze(axis=1)
        if sample_weight is None:
            self.regressor_.fit(X, y_trans)
        else:
            self.regressor_.fit(X, y_trans, sample_weight=sample_weight)

        return self

    def predict(self, X):
        """Predict using the base regressor, applying inverse.
        The regressor is used to predict and the ``inverse_func`` or
        ``inverse_transform`` is applied before returning the prediction.
        Parameters
        ----------
        X : {array-like, sparse matrix}, shape = (n_samples, n_features)
            Samples.
        Returns
        -------
        y_hat : array, shape = (n_samples,)
            Predicted values.
        """
        check_is_fitted(self, "regressor_")
        pred = self.regressor_.predict(X)
        if pred.ndim == 1:
            pred_trans = pred.reshape(-1, 1)
        else:
            pred_trans = pred
        for i in reversed(range(len(self.transformers_))):
            pred_trans = self.transformers_[i].inverse_transform(pred_trans)
        if (self._training_dim == 1 and
                pred_trans.ndim == 2 and pred_trans.shape[1] == 1):
            pred_trans = pred_trans.squeeze(axis=1)

        return pred_trans


class ColumnTransformer(_BaseComposition, TransformerMixin):
    """"
    sklearn 0.20dev の同名クラスが現時点で動かないので代用
    sklearn で実装されたら消す
    --------
    transformers: タプルリスト。タプルは (transformer 名, transformer, 対象列) で表す
    remainder: 一度も指定されなかった列をどうするか。 'patssrough' でそのまま出力。
              'drop' で出力しない。デフォルト: 'passthrough'
    """
    def __init__(self, transformers, remainder='passthrough'):
        self.transformers = transformers
        self.remainder = remainder

    def fit(self, X, y=None):
        X = X.copy()
        if type(X).__name__ == 'DataFrame':
            for n, t, c in self.transformers:
                t.fit(X=X[c].values, y=y)
        elif type(X).__name__ == 'ndarray':
            for n, t, c in self.transformers:
                t.fit(X=X[:, c], y=y)
        return self

    def transform(self, X, y=None):
        if type(X).__name__ == 'DataFrame':
            X_ = [t.transform(X[c].values, y) for n, t, c in self.transformers]
        elif type(X).__name__ == 'ndarray':
            X_ = [t.transform(X[:, c], y) for n, t, c in self.transformers]
        if self.remainder == 'passthrough':
            remainders = tuple(set(range(X.shape[1])) - set(
                    [c for _, _, cols in self.transformers for c in cols]))
            if remainders is not None:
                X_ += [X[:, remainders]]
        elif self.remainder == 'drop':
            pass
        else:
            raise ValueError('paththrough must be `passthrough` or `drop`.')
        print([x.shape for x in X_])
        X_ = np.hstack(X_)
        return X_

    def fit_transform(self, X, y=None):
        return self.fit(X, y).transform(X, y)


class LabelEncoderM(BaseEstimator, TransformerMixin):
    """
    sklearn.preprocessing.LabelEncoder の複数列バージョン
    ------
    col_index: 変換する列を指定。 None ならば全列が対象。デフォルト: None
    handle_unknown: transform 時に、fit の入力 X にないラベルがあった場合の対応。
                    'impute' で補完、 'raise' でエラーを, 'warn' で警告を出して補完
                    デフォルト: warn
    impute_value: transform 時の補完値。handle_unknown で
                  'warn', 'impute' のいずれかを指定したときに有効。default: 0
    """
    def __init__(self, col_index=None, handle_unknown='warn', impute_value=0):
        self.col_index = col_index
        self.handle_unknown = handle_unknown
        self.impute_value = impute_value

    def fit(self, X, y=None):
        X = X.copy()
        if type(X).__name__ == 'DataFrame':
            if self.col_index is not None:
                X = X[self.col_index].values
        elif type(X).__name__ == 'ndarray':
            if self.col_index is not None:
                X = X[:, self.col_index]
        else:
            X = np.array(X)
        self.labels_ = [
                {k: v for v, k in enumerate(np.unique(c))} for c in X.T]
        return self

    def transform(self, X, y=None):
        X = X.copy()
        if type(X).__name__ == 'DataFrame':
            if self.col_index is not None:
                X = X[self.col_index].values
        elif type(X).__name__ == 'ndarray':
            if self.col_index is not None:
                X = X[:, self.col_index]
        else:
            X = np.array(X)
        X = X.astype('object')
        if len(self.labels_) != X.shape[1]:
            raise IndexError('dimension of classes and X is wrong')
        for i in range(X.shape[1]):
            X[:, i] = [self.labels_[i].get(k) for k in X[:, i]]
            if self.handle_unknown in ['raise', 'warn'] and np.isin(
                    None, X[:, i]):
                if self.handle_unknown == 'raise':
                    raise ValueError("unknown label found at "
                                     "%s' th feature column" % i)
                else:
                    warnings.warn("unknown label found at "
                                  "%s' th feature column" % i)
            X[np.equal(X[:, i], None), i] = self.impute_value
        return X.astype(float)

    def fit_transform(self, X, y=None):
        return self.fit(X, y).transform(X, y)

まず、私家版 ColumnTransformer について説明します。将来正式リリースされる ColumnTransformer とは仕様が異なるので、注意してください。タプルのリストで、どの列にどの変換器を与えるかを指定できます。タプルの0番目の要素が変換器の名称で、任意の名前を付けられます。1番目が変換器のオブジェクトです。2番目がどの列に適用するかになります。突貫工事なので、iterable なオブジェクトでのみの指定になり、スライスは指定できません (remainder=’drop’ を指定した場合のみスライスでもエラーが出ません)。scipy の疎行列にも対応していません。

次に、今回新たに作成した、 yを変換する回帰器ChainTransformedTargetRegressor クラスは、ベースとなる回帰器と、y に噛ませる変換器のリストを与えることができます。scikit-learn で用意されている、 FunctionTransformer を使って、対数変換と、その逆変換である指数変換をする変換器オブジェクトを作ります。FunctionTransformer は、任意の関数を変換器オブジェクトにするクラスです。yは復元する必要があるので、逆変換も指定しています。これで、 y を対数変換 -> 標準化してから elastic net で学習するような回帰器を作成できます。
tanstargetestimator.png

最後に、LabelEnocderM ですが、 scikit-learn の LabelEnocder は y のラベルを別の数値に1対1に変換するものなので、複数列には対応していません。ここでは、特徴量のカテゴリカル変数を数値に変換するために作成しました。実際にはこういう特徴量抽出が意味を持つことはあまりないですが、たとえば OneHotEncoder は値が str 型だと変換できないため、LabelEncoderM をかませるとコードがシンプルになります8

from sklearn.preprocessing import FunctionTransformer

#  ---- 疑似データ作成 ----
from sklearn.model_selection import train_test_split
import numpy as np
X = np.random.normal(size=100*10).reshape(100, 10)
y = np.exp(np.dot(X, np.array([[1] * 10]).T)).ravel()
X_ = np.random.choice(['a', 'b', 'c'], size=100).reshape(-1, 1)
X_ = X_.astype('object')
X = np.hstack([X, X_])
X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=.3, random_state=42)
# ---- 疑似データ作成ここまで -----

# オブジェクト作成
featuring = ColumnTransformer([
        ('std', StandardScaler(), range(10)),
        ('label', LabelEncoderM(), (10, ))
        ])
log_trans = FunctionTransformer(func=np.log, inverse_func=np.exp)
log_elastic = ChainTransformedTargetRegressor(
        ElasticNet(random_state=42), [log_trans, StandardScaler()])
gscv = GridSearchCV(
        log_elastic,
        param_grid={'regressor__alpha': [1.0, 0.5, 0.1]},
        cv=2)
pl = Pipeline([('preprocess', featuring), ('regressor', gscv)])

# 学習
pl.fit(X_test, y_test)

機械学習の前処理やアルゴリズムはこれからも次々考案されることでしょうが、このように fit, predict, transform などのメソッドを持つ、 API として最低限の機能の簡単なクラスだけでも作成すれば、 scikit-learn の他のクラスと連携できるので見通しが良くなります。


  1. y が必要になる前処理として、たとえば target encoding のような変換処理があります。  

  2. https://qiita.com/roronya/items/fdf35d4f69ea62e1dd91 など日本語の解説もあるので細かい話は省略します。 

  3. Pipeline クラスの詳しい仕様は公式ドキュメント http://scikit-learn.org/stable/modules/generated/sklearn.pipeline.Pipeline.html か、 http://tkzs.hatenablog.com/entry/2016/06/26/093502, https://hayataka2049.hatenablog.jp/entry/2018/02/22/234011 などの日本語の紹介記事を読んでください。 

  4. http://scikit-learn.org/dev/modules/generated/sklearn.compose.TransformedTargetRegressor.html 

  5. http://scikit-learn.org/dev/modules/generated/sklearn.compose.ColumnTransformer.html 

  6. mlxtend のスタッキングについても、https://qiita.com/altescy/items/60a6def66f13267f6347 ですでに解説されているので省略します。 

  7. ただし、データのサイズが大きく、処理にリソースが要求されるようならば、推定器を並列にするようなクラスが必要になるかもしれません。 

  8. categorical-encoding というモジュールでは、scikit-learn に準拠した変換器がいくつも開発されていますが、このモジュールはデータが大きくなると非常に処理が遅くなります。そこで、カテゴリカル変数用のクラスを自作しました。