1. Qiita
  2. 投稿
  3. Python

デコレータを使ってデータ加工処理の再実行を防ぐ

  • 2
    いいね
  • 0
    コメント

概要

データを加工して,一度ディスクに保存し,それを再利用(二度目からはデータ加工をスキップ)するというのはよくある一般的な処理ですが,再利用時のパラメータの依存なども考えると意外と煩雑になりがちです.
そこで,Pythonのデコレータを使ってスキップ判定をして,同じ処理を繰り返さない実装を考えます.

今回の処理をライブラリとしてまとめたものが github.com/sotetsuk/memozo にあります:

モチベーション

例えば,今手元に膨大な量の文からなるデータ(一行につき一文)があるとします:

1. I have a pen.
2. I have an apple.
3. ah! Apple pen!

...

9999...

# PPAP (copyright belongs to Pikotaro)

今,このデータから特定のキーワードが書かれた文だけをフィルタしたいとします(例えば,キーワード pen が入っている文).

フィルタのナイーブな実装の一つは,次のように,条件を満たす文を見つかるたびにyieldするジェネレータを作ることでしょう:

def filter_data(keyword):
    path_to_raw_data = './data/sentences.txt'
    with codecs.open(path_to_raw_data, 'r', 'utf-8') as f:
        for line in f:
            if keyword in line:
                yield line

gen = filter_data('pen')
for line in gen:
    print(line, end='')

そして,もしこの加工されたデータ(フィルタされたデータ)を何度も再利用する場合,毎回全データをスキャンするのは必ずしも得策ではありません.
一度ディスクにフィルタされたデータをキャッシュして,次からはそのキャッシュされたデータを使いたいと思うことがあります.
また,このデータ加工処理はパラメータ(keyword)に依存しますから,もし異なるkeywordでこの処理が実行されれば,再度全データをチェックしてディスクにキャッシュを行いたいという側面もあります.
そしてこの処理をデコレータを使って関数をラップするだけで達成したい,という気持ちがあります.

まとめると,目標は,次のようにデコレータ awesome_decorator を使ってジェネレータからの出力をキャッシュして,もし同じパラメータでこの関数が実行されれば,キャッシュを使って出力を返すことです:

@awesome_decorator
def filter_data(keyword):
    path_to_raw_data = './data/sentences.txt'
    with codecs.open(path_to_raw_data, 'r', 'utf-8') as f:
        for line in f:
            if keyword in line:
                yield line


# 一度目は全データをスキャンし,結果を返します.
# このとき,フィルタされた文を './data/pen.txt' にキャッシュします.
gen_pen_sentences1 = filter_data('pen')
for line in gen_pen_sentences1:
    print(line, end='')

# 同じパラメータでの実行なので,キャッシュ './data/pen.txt' からデータを返します.
gen_pen_sentences2 = filter_data('pen')
for line in gen_pen_sentences2:
    print(line, end='')

# 新しいパラメータなので,生データからもう一度フィルタリングを行います.
gen_apple_sentences = filter_data('apple')
for line in gen_apple_sentences:
    print(line, end='')

またこの例はジェネレータを返す関数ですが,他にも pickle によってシリアライズ可能なオブジェクトを返す関数の実行結果を,ディスクにキャッシュしたい場面もあると思います(例えば前処理された ndarray や,パラメータに依存した学習済の機械学習モデル).

実装

awesome_decorator の実装は簡単で,既にキャッシュされたファイルがあるかないかを判定し,

  1. キャッシュがある場合は,キャッシュから値を返すジェネレータを新しく作って,元のジェネレータの代わりに返す
  2. キャッシュがない場合は元のジェネレータをラップし,値を返す毎にキャッシュに書き込むジェネレータを返す

だけです(pickle などを使う場合も同様です):

def awesome_decorator(func):

    @functools.wraps(func)
    def _wrapper(keyword):
        # 今回は簡単のため関数の引数はkeyword一つだけを想定します.
        # 一般の(*args, **kwargs)を使う場合にはinspectなどを使って引数とその値を抽出します.
        file_path = './data/{}.txt'.format(keyword)

        # もしキャッシュされたデータがあれば,そこから文を読み込むジェネレータを返します.
        if os.path.exists(file_path):
            def gen_cached_data():
                with codecs.open(file_path, 'r', 'utf-8') as f:
                    for line in f:
                        yield line
            return gen_cached_data()

        # もしキャッシュしたデータがなければ,通常通り生データから文を返すデコレータを生成します.
        gen = func(keyword)

        # また,上記のジェネレータから返される値を逐次キャッシュします.
        def generator_with_cache(gen, file_path):
            with codecs.open(file_path, 'w', 'utf-8') as f:
                for e in gen:
                    f.write(e)
                    yield e

        return generator_with_cache(gen, file_path)

    return _wrapper

デコレータ自体についての解説は Pythonのデコレータを理解するための12Step の記事が分かりやすいです.

すべてまとめると,次のようになります(これは./data/sentence.txtを用意すればそのまま動きます):

awesome_generator.py
# -*- coding: utf-8 -*-

import os
import functools
import codecs


def awesome_decorator(func):

    @functools.wraps(func)
    def _wrapper(keyword):
        # 今回は簡単のため関数の引数はkeyword一つだけを想定します.
        # 一般の(*args, **kwargs)を使う場合にはinspectなどを使って引数とその値を抽出します.
        file_path = './data/{}.txt'.format(keyword)

        # もしキャッシュされたデータがあれば,そこから文を読み込むジェネレータを返します.
        if os.path.exists(file_path):
            def gen_cached_data():
                with codecs.open(file_path, 'r', 'utf-8') as f:
                    for line in f:
                        yield line
            return gen_cached_data()

        # もしキャッシュしたデータがなければ,通常通り生データから文を返すデコレータを生成します.
        gen = func(keyword)

        # また,上記のジェネレータから返される値を逐次キャッシュします.
        def generator_with_cache(gen, file_path):
            with codecs.open(file_path, 'w', 'utf-8') as f:
                for e in gen:
                    f.write(e)
                    yield e

        return generator_with_cache(gen, file_path)

    return _wrapper


@awesome_decorator
def filter_data(keyword):
    path_to_raw_data = './data/sentences.txt'
    with codecs.open(path_to_raw_data, 'r', 'utf-8') as f:
        for line in f:
            if keyword in line:
                yield line


if __name__ == '__main__':
    # 一度目は全データをスキャンし,結果を返します.
    # このとき,フィルタされた文を './data/pen.txt' にキャッシュします.
    gen_pen_sentences1 = filter_data('pen')
    for line in gen_pen_sentences1:
        print(line, end='')

    # 同じパラメータでの実行なので,キャッシュ './data/pen.txt' からデータを返します.
    gen_pen_sentences2 = filter_data('pen')
    for line in gen_pen_sentences2:
        print(line, end='')

    # 新しいパラメータなので,生データからもう一度フィルタリングを行います.
    gen_apple_sentences = filter_data('apple')
    for line in gen_apple_sentences:
        print(line, end='')

memozo

今回の実装は,パラメータの形やファイル名等を固定された形で扱っていましたが,任意の形に少し拡張したものをパッケージとしてgithub.com/sotetsuk/memozoにまとめました.
これを使うと今回の処理はこのように書けます:

from memozo import Memozo

m = Memozo('./data')

@m.generator(file_name='filtered_sentences', ext='txt')
def filter_data(keyword):
    path_to_raw_data = './data/sentences.txt'
    with codecs.open(path_to_raw_data, 'r', 'utf-8') as f:
        for line in f:
            if keyword in line:
                yield line

キャッシュファイルは './data/filtered_sentences_1fec01f.txt' に保存され, ./data/.memozo に使ったパラメータ等の履歴が書き込まれます.
hashは(ファイル名,関数名,パラメータ)から計算され,同じhashを使った履歴とキャッシュファイルが両方既に存在している場合には関数の実行はスキップされます.
つまり,同じ(ファイル名,関数名,パラメータ)で実行すればキャッシュから値が返されますし,どれか一つでも変えれば結果は異なります.

ジェネレータ以外にも picklecodecs, 普通の open に対応するバージョンの機能が用意してあります.

まだ実装は不完全だと思いますのでIssue/PR等挙げていただけると嬉しいです.

関連

タスク間に複雑な依存関係がある場合はDAGベースのワークフローツールを使った方がいいでしょう.一例として,github.com/spotify/luigiなどが挙げられます.

参考文献