Beating C with 60 Lines of Python: wc -l (failed) ~Python での行数カウントを手軽に高速化~

follow us in feedly

R&D チームの奥村です。最近シェルスクリプトばっかり書いてます。Beating C with 70 Lines of Go - Ajeet D'Souza を見て気になったので、Python によるテキストファイルの高速な行数カウントを試しました。実験の結果、いくつかの簡単な処理を加えるとことで、ナイーブな実装に比べて、コアの数× 0.9 程度倍の高速化が達成できました。私の環境だと 8 コアなので、処理時間はおおよそ 1/7 で済むようになりました。残念ながら wc -l には一歩届きませんでした。

https://cdn-ak.f.st-hatena.com/images/fotolife/o/optim-tech/20191207/20191207092332.png

背景

Python で大きなテキストファイルを扱いたいときがあります。CSV などであれば pandas を使えば高速ですし読み込みの進捗も出せますが、pandas が未対応だとそうできません。例えば点群のファイルは、巨大なテキストファイル形式として書き出されていることがあります。私が見たものだと 1,500 万行という巨大なものもありました。このような場合、読み込みの進捗を出さねばアプリがフリーズしたようになってしまいます。

読み込みの進捗を出すには、行数を知っておく必要があります。しかし、巨大なファイルだとそれだけでもかなりの時間がかかります。今回は、これを高速に実行したい、という話です。

実験

以下のマシンで実験しました。

  • MacBook Pro (Retina, 15-inch, Mid 2015)
    • 2.5 GHz Intel Core i7
    • 16 GB 1600 MHz DDR3

Python 2.7 の環境を Docker 19.03 で実験しました*1

docker run --rm -i -t python:2.7 /bin/bash

時間とメモリ使用量の計測のため time ユーティリティを使いました。

apt-get update && apt-get install -y time

ベンチマーク用のファイルを用意しました。

dd if=/dev/zero of=large-file-1mb.txt count=1024 bs=1024
dd if=/dev/zero of=large-file-10mb.txt count=1024 bs=10240
dd if=/dev/zero of=large-file-100mb.txt count=1024 bs=102400
dd if=/dev/zero of=large-file-1gb.txt count=1024 bs=1048576

wc -l をベースラインとし、Python でいくつかの実装を試しました。

a: ベースライン

wc -l を使います。

b: Python によるナイーブな実装

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import sys
def wc_naive(name):
with open(name, 'r') as f:
return sum(1 for line in f)
print(wc_naive(sys.argv[1]))
view raw wc-naive.py hosted with ❤ by GitHub

c: Python でブロックサイズを大きくした実装

wc コマンドの高速化の工夫として参考になるのは「ファイルの読み込みブロックを大きくとる」というものです。64 KiB が効率がいいようです。これで少し速くなります。

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import sys
def wc_block_64k(name, blocksize=65536):
def blocks(f):
while True:
b = f.read(blocksize)
if b:
yield b
else:
break
with open(name, 'r') as f:
return sum(bl.count('\n') for bl in blocks(f))
print(wc_block_64k(sys.argv[1]))
view raw wc-block-64k.py hosted with ❤ by GitHub

d: Python で並列化 (multiprocessing.Pool)

マルチコア時代の恩恵にあやかります。ファイルサイズを並列実行数で分割し、各スレッド(今回の実装ではプロセス)にファイルのどこからどこまでを担当させるかを決めます。各スレッドは担当範囲分の行数を数えて、最後にそれを足し合わせます。

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import multiprocessing
import os
import sys
import time
import threading
def get_chunk_line_count((name, start, stop, blocksize)):
left = stop - start
def blocks(f, left):
while left > 0:
b = f.read(min(left, blocksize))
if b:
yield b
else:
break
left -= len(b)
with open(name, 'r') as f:
f.seek(start)
return sum(bl.count('\n') for bl in blocks(f, left))
def get_file_offset_ranges(name, blocksize=65536, m=1):
fsize = os.stat(name).st_size
chunksize = (fsize // multiprocessing.cpu_count()) * m
n = fsize // chunksize
ranges = []
for i in range(0, n * chunksize, chunksize):
ranges.append((name, i, i + chunksize, blocksize))
if fsize % chunksize != 0:
ranges.append((name, ranges[-1][2], fsize, blocksize))
return ranges
def wc_mp_pool(name, blocksize=65536):
ranges = get_file_offset_ranges(name, blocksize)
pool = multiprocessing.Pool(processes=len(ranges))
pool_outputs = pool.map(get_chunk_line_count, ranges)
pool.close()
pool.join()
return sum(pool_outputs)
print(wc_mp_pool(sys.argv[1]))
view raw wc-mp-pool.py hosted with ❤ by GitHub

e: Python で並列化 (ProcessPoolExecuter)

d と同じ方針ですが、futures パッケージで提供される ProcessPoolExecuter を使います。pip install futures でインストールしました。

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import multiprocessing
import os
import sys
import time
import threading
import concurrent.futures as futures
from concurrent.futures import ProcessPoolExecutor
def get_chunk_line_count((name, start, stop, blocksize)):
left = stop - start
def blocks(f, left):
while left > 0:
b = f.read(min(left, blocksize))
if b:
yield b
else:
break
left -= len(b)
with open(name, 'r') as f:
f.seek(start)
return sum(bl.count('\n') for bl in blocks(f, left))
def get_file_offset_ranges(name, blocksize=65536, m=1):
fsize = os.stat(name).st_size
chunksize = (fsize // multiprocessing.cpu_count()) * m
n = fsize // chunksize
ranges = []
for i in range(0, n * chunksize, chunksize):
ranges.append((name, i, i + chunksize, blocksize))
if fsize % chunksize != 0:
ranges.append((name, ranges[-1][2], fsize, blocksize))
return ranges
def wc_proc_pool_exec(name, blocksize=65536):
ranges = get_file_offset_ranges(name, blocksize)
with ProcessPoolExecutor(max_workers=len(ranges)) as executor:
results = [executor.submit(get_chunk_line_count, param) for param in ranges]
return sum([future.result() for future in futures.as_completed(results)])
print(wc_proc_pool_exec(sys.argv[1]))
view raw wc-proc-pool-exec.py hosted with ❤ by GitHub

実行結果

/usr/bin/time -f "%es %MKB" ~ のようにして実行した結果、以下の通りでした。 大きなファイルでの処理時間はいいところまでいきましたが、メモリ使用量は惨敗です。

(a) time (b) time (c) time (d) time (e) time (a) mem (b) mem (c) mem (d) mem (e) mem
large-file-1mb.txt 0 0.01 0.01 0.14 0.04 1868 7376 6220 11832 12552
large-file-10mb.txt 0 0.02 0.01 0.14 0.04 1824 18124 6316 11828 12744
large-file-100mb.txt 0.02 0.12 0.06 0.14 0.07 1780 128160 6356 11776 12816
large-file-1gb.txt 0.19 1.75 0.54 0.24 0.2 1872 1235448 6360 11684 12732
view raw wc-py-result.csv hosted with ❤ by GitHub

実験手法の実行速度を相対的に見ると以下のとおりでした。

手法 ブロックサイズ調整 並列化 A 並列化 B 処理時間 [秒] 相対処理時間
a 0.19 1
b 1.75 9.21052632
c 0.54 2.84210526
d 0.24 1.26315789
e 0.2 1.05263158
  • 並列化 A: multiprocessing.Pool
  • 並列化 B: concurrent.futures.ProcessPoolExecutor

相対的に見ると、ブロックサイズ調整の寄与が大きいですね。並列化はコアが増えればさらに寄与が大きくなるかも知れません。

ちなみに、10 GiB のファイルだと (a) で 13 秒、(e) で 15 秒ほどかかりました。

まとめ

完全に Rust が強そうなジャンルだなと思いました。とはいえ、データ解析で多用される Python でもちょっとしたコードでそこそこの速度になったので御の字かな、という印象です。

オプティムでは、こうした技術に興味がある・作ってみたい・既に作っている、というエンジニアを募集しています。興味のある方は、こちらをご覧ください。

www.optim.co.jp

*1:今更 Python 2.7 ... と、我ながら思いますが、この記事の元ネタとなったコードは三年ほど前に点群を扱っているときに書いたコードで、そのときの環境が Python 2.7 だった、という話です