ForEachAsync - 非同期の列挙の方法 Part2
- C# - 14.03/14
Part2って、Part1はあったのかというと、うーん、非同期時代のLINQ、かな……?さて、今回はForEachがテーマです。といってもそれってSelect+WhenAllでしょ!「Selectは非同期時代のForEach」って言ってたじゃない、というと、はい、言ってました。まだ他に言うことあるの?というと、例えば以下のシチュエーション。
var httpClient = new HttpClient(); var tasks = Enumerable.Range(1, 100000) .Select(async x => { var str = await httpClient.GetStringAsync("http://hogehoge?q=" + x); Console.WriteLine(str); }); await Task.WhenAll(tasks);
別に動きはしますが、制御不能に10万件、同時リクエスト走ります。これはまぁいくないですよね。もはや途中で死んだりしますので動くとも言えない……。というわけで、元シーケンスが巨大な時は、Select+WhenAllはForEachになりえないのです。
さて、この事態に手抜きで対抗すると?
var httpClient = new HttpClient(); Parallel.ForEach(Enumerable.Range(1, 100000), x => { var str = httpClient.GetStringAsync("http://hogehoge?q=" + x).Result; Console.WriteLine(str); });
みんな大好きParallel.ForEachです。CPUバウンドとかI/Oバウンドとか面倒くさいんですよ、動きゃあいいんですよ(ホジホジ。という楽さ。実際これは普通に機能します。ので、バッチとかはこんなんでもいーんじゃないでしょうか、マジで。でも、これ、序盤はじわじわと並列数が上がってくので、初速がイマイチに感じるかもしれません。最初はコア数分しか並列にならず、待ちが多いことを検出してからじわじわ上がっていくので。あと終盤の挙動をアレゲに感じたりするかもしれません。待ち時間が長いと、際限なく並列数が上がってっちゃうんですよ。でも別に極端に上がっても速くなるわけじゃなくて、逆にむしろ余計遅くなる。ので、ちょこっと調整を入れます。
// 最小スレッドプール数を最初に適当に伸ばしてやると初速に効く // 設定は一回でいいので、アプリケーションスタートアップのところにでも置いときましょう ThreadPool.SetMinThreads(200, 200); // 無尽蔵に伸び続けるのもいくないのでMaxDegreeOfParallelismを設定 var httpClient = new HttpClient(); Parallel.ForEach(Enumerable.Range(1, 100000), new ParallelOptions { MaxDegreeOfParallelism = 200 }, x => { var str = httpClient.GetStringAsync("http://hogehoge?q=" + x).Result; Console.WriteLine(str); });
SetMinThreadsとMaxDegreeOfParallelism、この2つはふとぅーに影響大きくて大事。なので適当に、とか書きましたがあんまり適当にやるのはよくない。
ForEachAsync
とはいえ、非同期は非同期として扱いたい!そりゃそーだ。で、つまり、ようするに、同時実行数を抑えながら非同期を走らせられればいい。それにうってつけのクラスがSemaphoreSlim。「リソースまたはリソースのプールに同時にアクセスできるスレッドの数を制限する Semaphore の軽量版です。SemaphoreSlim は、Windows カーネルのセマフォを使用しない、軽量セマフォ クラスを提供します。」。です。.NET 4.0からの登場。使うメソッドはWaitAsync(これは.NET 4.5から)とReleaseがほとんどかな。.NET 4.0の場合はWaitAsyncのかわりにWaitで。
内部にCountを持っていて、それをWaitAsyncで減らし、Releaseで増やします。Countが0に達すると、WaitAsyncは待機するようになります。これを用いてForEachAsyncを作ってみると?
public static class EnumerableExtensions { public static async Task ForEachAsync<T>(this IEnumerable<T> source, Func<T, Task> action, int concurrency, CancellationToken cancellationToken = default(CancellationToken), bool configureAwait = false) { if (source == null) throw new ArgumentNullException("source"); if (action == null) throw new ArgumentNullException("action"); if (concurrency <= 0) throw new ArgumentOutOfRangeException("concurrencyは1以上の必要があります"); using (var semaphore = new SemaphoreSlim(initialCount: concurrency, maxCount: concurrency)) { var exceptionCount = 0; var tasks = new List<Task>(); foreach (var item in source) { if (exceptionCount > 0) break; cancellationToken.ThrowIfCancellationRequested(); await semaphore.WaitAsync(cancellationToken).ConfigureAwait(configureAwait); var task = action(item).ContinueWith(t => { semaphore.Release(); if (t.IsFaulted) { Interlocked.Increment(ref exceptionCount); throw t.Exception; } }); tasks.Add(task); } await Task.WhenAll(tasks.ToArray()).ConfigureAwait(configureAwait); } } }
ほむ、わからん。ExceptionとかCancellationTokenとかでゴチャついてますが、よーわ、実行開始しようとするとWaitAsyncでカウントを減らして、実行完了したらReleaseでカウントを増やす。初期値の指定がそのまま並列実行数になる、って感じ。利用例を見ると
var httpClient = new HttpClient(); await Enumerable.Range(1, 100000) .ForEachAsync(async x => { var str = await httpClient.GetStringAsync("http://hogehoge?q=" + x); Console.WriteLine(str); }, concurrency: 200);
実に簡単にひどぅーきなForEachができました。これは、Taskの実行開始はシーケンシャルです。これも何気に有難かったりしますねえ。実行完了のほうは順不同です。まあ、そりゃそうだ、って話ですね。
まとめ
SemaphoreSlimかわいい。