R&D チームの奥村です。最近シェルスクリプトばっかり書いてます。Beating C with 70 Lines of Go - Ajeet D'Souza を見て気になったので、Python によるテキストファイルの高速な行数カウントを試しました。実験の結果、いくつかの簡単な処理を加えるとことで、ナイーブな実装に比べて、コアの数× 0.9 程度倍の高速化が達成できました。私の環境だと 8 コアなので、処理時間はおおよそ 1/7 で済むようになりました。残念ながら wc -l
には一歩届きませんでした。
https://cdn-ak.f.st-hatena.com/images/fotolife/o/optim-tech/20191207/20191207092332.png
背景
Python で大きなテキストファイルを扱いたいときがあります。CSV などであれば pandas を使えば高速ですし読み込みの進捗も出せますが、pandas が未対応だとそうできません。例えば点群のファイルは、巨大なテキストファイル形式として書き出されていることがあります。私が見たものだと 1,500 万行という巨大なものもありました。このような場合、読み込みの進捗を出さねばアプリがフリーズしたようになってしまいます。
読み込みの進捗を出すには、行数を知っておく必要があります。しかし、巨大なファイルだとそれだけでもかなりの時間がかかります。今回は、これを高速に実行したい、という話です。
実験
以下のマシンで実験しました。
- MacBook Pro (Retina, 15-inch, Mid 2015)
- 2.5 GHz Intel Core i7
- 16 GB 1600 MHz DDR3
Python 2.7 の環境を Docker 19.03 で実験しました*1。
docker run --rm -i -t python:2.7 /bin/bash
時間とメモリ使用量の計測のため time
ユーティリティを使いました。
apt-get update && apt-get install -y time
ベンチマーク用のファイルを用意しました。
dd if=/dev/zero of=large-file-1mb.txt count=1024 bs=1024 dd if=/dev/zero of=large-file-10mb.txt count=1024 bs=10240 dd if=/dev/zero of=large-file-100mb.txt count=1024 bs=102400 dd if=/dev/zero of=large-file-1gb.txt count=1024 bs=1048576
wc -l
をベースラインとし、Python でいくつかの実装を試しました。
a: ベースライン
wc -l
を使います。
b: Python によるナイーブな実装
#!/usr/bin/env python | |
# -*- coding: utf-8 -*- | |
import sys | |
def wc_naive(name): | |
with open(name, 'r') as f: | |
return sum(1 for line in f) | |
print(wc_naive(sys.argv[1])) |
c: Python でブロックサイズを大きくした実装
wc
コマンドの高速化の工夫として参考になるのは「ファイルの読み込みブロックを大きくとる」というものです。64 KiB が効率がいいようです。これで少し速くなります。
#!/usr/bin/env python | |
# -*- coding: utf-8 -*- | |
import sys | |
def wc_block_64k(name, blocksize=65536): | |
def blocks(f): | |
while True: | |
b = f.read(blocksize) | |
if b: | |
yield b | |
else: | |
break | |
with open(name, 'r') as f: | |
return sum(bl.count('\n') for bl in blocks(f)) | |
print(wc_block_64k(sys.argv[1])) |
d: Python で並列化 (multiprocessing.Pool)
マルチコア時代の恩恵にあやかります。ファイルサイズを並列実行数で分割し、各スレッド(今回の実装ではプロセス)にファイルのどこからどこまでを担当させるかを決めます。各スレッドは担当範囲分の行数を数えて、最後にそれを足し合わせます。
#!/usr/bin/env python | |
# -*- coding: utf-8 -*- | |
import multiprocessing | |
import os | |
import sys | |
import time | |
import threading | |
def get_chunk_line_count((name, start, stop, blocksize)): | |
left = stop - start | |
def blocks(f, left): | |
while left > 0: | |
b = f.read(min(left, blocksize)) | |
if b: | |
yield b | |
else: | |
break | |
left -= len(b) | |
with open(name, 'r') as f: | |
f.seek(start) | |
return sum(bl.count('\n') for bl in blocks(f, left)) | |
def get_file_offset_ranges(name, blocksize=65536, m=1): | |
fsize = os.stat(name).st_size | |
chunksize = (fsize // multiprocessing.cpu_count()) * m | |
n = fsize // chunksize | |
ranges = [] | |
for i in range(0, n * chunksize, chunksize): | |
ranges.append((name, i, i + chunksize, blocksize)) | |
if fsize % chunksize != 0: | |
ranges.append((name, ranges[-1][2], fsize, blocksize)) | |
return ranges | |
def wc_mp_pool(name, blocksize=65536): | |
ranges = get_file_offset_ranges(name, blocksize) | |
pool = multiprocessing.Pool(processes=len(ranges)) | |
pool_outputs = pool.map(get_chunk_line_count, ranges) | |
pool.close() | |
pool.join() | |
return sum(pool_outputs) | |
print(wc_mp_pool(sys.argv[1])) |
e: Python で並列化 (ProcessPoolExecuter)
d と同じ方針ですが、futures
パッケージで提供される ProcessPoolExecuter
を使います。pip install futures
でインストールしました。
#!/usr/bin/env python | |
# -*- coding: utf-8 -*- | |
import multiprocessing | |
import os | |
import sys | |
import time | |
import threading | |
import concurrent.futures as futures | |
from concurrent.futures import ProcessPoolExecutor | |
def get_chunk_line_count((name, start, stop, blocksize)): | |
left = stop - start | |
def blocks(f, left): | |
while left > 0: | |
b = f.read(min(left, blocksize)) | |
if b: | |
yield b | |
else: | |
break | |
left -= len(b) | |
with open(name, 'r') as f: | |
f.seek(start) | |
return sum(bl.count('\n') for bl in blocks(f, left)) | |
def get_file_offset_ranges(name, blocksize=65536, m=1): | |
fsize = os.stat(name).st_size | |
chunksize = (fsize // multiprocessing.cpu_count()) * m | |
n = fsize // chunksize | |
ranges = [] | |
for i in range(0, n * chunksize, chunksize): | |
ranges.append((name, i, i + chunksize, blocksize)) | |
if fsize % chunksize != 0: | |
ranges.append((name, ranges[-1][2], fsize, blocksize)) | |
return ranges | |
def wc_proc_pool_exec(name, blocksize=65536): | |
ranges = get_file_offset_ranges(name, blocksize) | |
with ProcessPoolExecutor(max_workers=len(ranges)) as executor: | |
results = [executor.submit(get_chunk_line_count, param) for param in ranges] | |
return sum([future.result() for future in futures.as_completed(results)]) | |
print(wc_proc_pool_exec(sys.argv[1])) |
実行結果
/usr/bin/time -f "%es %MKB" ~
のようにして実行した結果、以下の通りでした。
大きなファイルでの処理時間はいいところまでいきましたが、メモリ使用量は惨敗です。
(a) time | (b) time | (c) time | (d) time | (e) time | (a) mem | (b) mem | (c) mem | (d) mem | (e) mem | ||
---|---|---|---|---|---|---|---|---|---|---|---|
large-file-1mb.txt | 0 | 0.01 | 0.01 | 0.14 | 0.04 | 1868 | 7376 | 6220 | 11832 | 12552 | |
large-file-10mb.txt | 0 | 0.02 | 0.01 | 0.14 | 0.04 | 1824 | 18124 | 6316 | 11828 | 12744 | |
large-file-100mb.txt | 0.02 | 0.12 | 0.06 | 0.14 | 0.07 | 1780 | 128160 | 6356 | 11776 | 12816 | |
large-file-1gb.txt | 0.19 | 1.75 | 0.54 | 0.24 | 0.2 | 1872 | 1235448 | 6360 | 11684 | 12732 |
実験手法の実行速度を相対的に見ると以下のとおりでした。
手法 | ブロックサイズ調整 | 並列化 A | 並列化 B | 処理時間 [秒] | 相対処理時間 |
---|---|---|---|---|---|
a | 0.19 | 1 | |||
b | 1.75 | 9.21052632 | |||
c | ✓ | 0.54 | 2.84210526 | ||
d | ✓ | ✓ | 0.24 | 1.26315789 | |
e | ✓ | ✓ | 0.2 | 1.05263158 |
- 並列化 A: multiprocessing.Pool
- 並列化 B: concurrent.futures.ProcessPoolExecutor
相対的に見ると、ブロックサイズ調整の寄与が大きいですね。並列化はコアが増えればさらに寄与が大きくなるかも知れません。
ちなみに、10 GiB のファイルだと (a) で 13 秒、(e) で 15 秒ほどかかりました。
まとめ
完全に Rust が強そうなジャンルだなと思いました。とはいえ、データ解析で多用される Python でもちょっとしたコードでそこそこの速度になったので御の字かな、という印象です。
オプティムでは、こうした技術に興味がある・作ってみたい・既に作っている、というエンジニアを募集しています。興味のある方は、こちらをご覧ください。
*1:今更 Python 2.7 ... と、我ながら思いますが、この記事の元ネタとなったコードは三年ほど前に点群を扱っているときに書いたコードで、そのときの環境が Python 2.7 だった、という話です