Java

同期・非同期・ノンブロッキング・並列・リアクティブを区別する

Play Framework や Akka, RxJava, Spring5 での WebFlux などここ数年は非同期処理だとかリアクティブだとかが流行っていますが、
いまいち各処理ごとの特性とかが整理できなかったり、整理しても忘れてしまうことがあるので
自分向けのメモとして書いておこうと思います。

これらが指すものは OS・プログラム・IO などの文脈によっても変わりますので、おかしい部分があるかもしれません。間違いがあれば突っ込んでください。

非同期が重要なのは IOバウンドな処理です。
IO はファイルやネットワークの通信先の準備が処理できないと処理が開始できません。
それをどうやって扱うのかがポイントになります。

同期(ブロッキング)

普通に IO を開始したら準備できるまで待ちます。

HTTP で適当なホストのレスポンスを取得するコードは次の通りです。

public static String httpGet(String url) throws Exception {
    URL u = new URL(url);
    HttpURLConnection con = (HttpURLConnection)u.openConnection();
    con.connect();
    try(InputStream is = con.getInputStream()) {
        BufferedReader br = new BufferedReader(new InputStreamReader(is, "utf-8"));

        StringBuilder sb = new StringBuilder();
        String line;
        while((line = br.readLine()) != null) {
            sb.append(line + "\n");
        }
        return sb.toString();
    }
}

相手との通信に時間がかかったり取得するデータ量が多いと、同期処理だと今の処理が終わるまで次の処理を行えません(ブロッキング)

これを解消しようというのが非同期処理です。

非同期処理

IO を非同期化する方法はマルチスレッドとノンブロッキングIO の2通りです。

マルチスレッド

同期処理を別のスレッドで処理します。
次は前述の同期処理をスレッドプール上で実行するサンプルです。
(Javaには非同期IO を扱う AsynchronousSocketChannel もありますが割愛します)

ExecutorService pool = Executors.newCachedThreadPool();
Future<String> result = pool.submit(() -> httpGet("sample"));

submit で記述したタスクが別スレッドで開始されますが、結果を待たず Future が呼び出し元に返されます。
呼び出し元は後で Future から結果を取得できます。

非同期計算の結果については Future で将来の計算結果を取得する方法や、コールバックを設定して今の計算が終わった後に続けて行う処理を設定したります。
後者の方式は Java だと CompletableFuture でできます。

ノンブロッキング

そもそも IO 処理などを同期にしない方法で OS の命令(select/pool)やライブラリ(libuvなど)を使って実現します。
イベントループと呼ぶ無限ループを回してIO を受け付けます。
IO を受け付けてから開始できる状態でないなら、それを置いておいて次の IO を受け付けます。
IO が受付可能な状態になっていることを、イベントループで検知したらその IO を開始します。

// ノンブロッキングIOで、nums回 url にアクセスする。
public static void nonBlockingIO(int nums, String url) throws IOException {

    long start = System.currentTimeMillis();

    // セレクターは登録したリソースに状況の変化があったら知らせてくれるもの。
    Selector selector =  SelectorProvider.provider().openSelector();

    // 件数分、HTTP接続とGETリクエストの書き込みを行った後、セレクタに登録して、応答を待つ。
    for (int i = 0; i < nums; i++) {

        String host = url.substring(url.startsWith("https") ? 8 : 7);
        SocketChannel chan = SocketChannel.open(new InetSocketAddress(host, 80));
        chan.configureBlocking(false); // Non-Blocking

        chan.write(ByteBuffer.wrap(("GET " + "/" +  " HTTP/1.1\r\n").getBytes()));
        chan.write(ByteBuffer.wrap(("HOST: " + host + "\r\n").getBytes()));
        chan.write(ByteBuffer.wrap("Connection: close\r\n".getBytes()));
        chan.write(ByteBuffer.wrap("\r\n".getBytes()));
        chan.write(ByteBuffer.wrap("\r\n".getBytes()));

        chan.register(selector, SelectionKey.OP_READ);
    }

    List<String> list = new ArrayList<>();
    int selectCount = 0;

    // 準備ができたものから受け取リ続ける。。
    while(selector.select() > 0) {
        Iterator<SelectionKey> itr = selector.selectedKeys().iterator();
        while (itr.hasNext()) {
            SelectionKey key = itr.next();
            itr.remove(); // 取り出したキーは削除
            if (key.isReadable()) {
                System.out.println(key);
                SocketChannel chan = (SocketChannel)key.channel();

                ByteBuffer buf = ByteBuffer.allocate(4096);
                int read = chan.read(buf);
                StringBuilder sb = new StringBuilder();
                while(read >= 0) {
                    buf.flip();
                    sb.append(StandardCharsets.UTF_8.decode(buf).toString());
                    buf.clear();
                    read = chan.read(buf);
                }
                list.add(sb.toString());
                selectCount++;
                chan.close();
                key.cancel();
                if (selectCount == nums) {
                    //全てのチャネルを取得したら、終了。
                    selector.wakeup();
                }
            }
        }
    }


    System.out.println(list.get(0));
    System.out.println(list.get(3));
    System.out.println(list.get(9));
}

コンビニのレジを一人で担当する場合、一人目のお客さんが弁当を温めてと言ったら弁当を温めている間お客さんを横に立たせて、次のお客さんの対応すると思います。そして弁当が温め終わったら、最初のお客さんに渡します。
ノンブロッキングIO はこれに近いやり方で、マルチスレッドはお客さん一人ずつに店員を一人割り当てる感じです。

ノンブロッキングIO は Java では Netty という Webサーバーが採用しています。少ないスレッドで大量のアクセスをさばけます。 Java 以外では node.js が libuv によるノンブロッキングIO を全面的に採用してシングルスレッドでIOを多重化しています。

node.js はその設計上、IO 処理を使う場面ではコールバックで後続処理を指定する必要があります。
( xxxSync のようなブロッキング用メソッドも用意してあることが多いですが、プロダクションコードで使うなと書いてあることが多いです)
最近だと、 Promise や async/await の採用でコールバックのネストが少ないコードを書けるようになりました。

スレッドとノンブロッキングどっちがいいの?

冒頭の Play Framework, Akka, WebFlux を見ているとノンブロッキングを基本として、マルチスレッドを組み合わせるが現代の標準っぽく感じます。

Java並行処理プログラミング
を読み返したら、ノンブロッキングIO のプログラムは難しいしマルチスレッドでも性能は十分だからマルチスレッドにしたら的なことが書いてありましたが、すでに10年以上前の本です。
今でも並行処理の基本を知るための最適な一冊(ただし絶版)ですが、
その時にはスマートフォンや IoT の登場により、爆発的にネットワークアクセスが増えることは予想されていなかったのだろうなと。

マルチスレッドにして、同期処理を別スレッドにしたら呼び出し側から見ればブロッキングするコードは無くなりますが、アプリケーション全体で見れば結局どこかがブロッキングしていることに変わりはありません。
よってアプリケーションへの同時アクセス数が増大すればブロッキング箇所でスレッドプール内のスレッドが足りなくなり、パフォーマンス低下を招きます。

ノンブロッキングだとスレッドを有効活用するため大量アクセスに耐えられますが、ブロッキング処理が発生したらそこがネックになってパフォーマンスが落ちます。
そこで、ノンブロッキング処理をマルチスレッド化したり、どうしても避けられないブロッキング処理を切り離して行う場所としてスレッドプールを併用します。

どうしても避けれられないブロッキング処理の代表は JDBC です。
Play Framework は非同期・ノンブロッキングなフレームワークと言っていますが、DBアクセスを行う処理についてはスレッドプールを別にして実行せよという指針があります。

いつになるのかわかりませんが async-JDBC (java.sql2) という提案があるので、それが登場したら全てがノンブロッキングになるアプリケーションが実現できそうです。
https://events.rainfocus.com/catalog/oracle/oow17/catalogjavaone17?search=CON1491&showEnrolled=false

並行処理 (Concurrency)

非同期と区別があまりついていませんが、並行とか複数の異なるタスクが同時に動いて1つの処理を作るものだそうです。 GUI アプリケーションのような、処理実行中にキャンセルで処理を打ち切るとかああいうものを指すんだと思います。
非同期は前述のような単発の処理をバックグラウンドでという感じですかね。

並列 (Parallel)

非同期は主に IO バウンドな処理向けの効率化のためでした。
純粋に長大な配列の各要素に1億回計算を行うというような CPU バウンドの処理に対して非同期処理では、
UXの改善はできても、処理時間を短縮することはできません。
並列処理は、タスクを複数のタスクに分割しそれをマルチ CPU で同時に処理する感じです。

百枚のはがきのあて名書きを何人かで手分けしてやる感じですね。

Java では並列処理は、ForkJoinPool や、 Stream API の parallel モードを使用します。
Stream API は内部で ForkJoinPool を使っているので、これが中心ですね。
ForkJoinPool は、work steal というアルゴリズムで実装したスレッドプールで、大量のタスクに対して各スレッドが自分でタスクを取りに行く動きをすることで、複数のスレッドが常に働き続けるような社畜根性にあふれた振る舞いをしてくれます。

リアクティブ

リアクティブ宣言、リアクティブシステム、リアクティブプログラムと色々ありますが、
ここではリアクティブプログラムについて触れます。
つまりは、 ReactiveExtention や RxJava などです。

リアクティブプログラムは 今までに出てきた技術要素の上に成り立つライブラリ・API という位置づけです。

大量・無限に発生するデータを、データの発生都度処理していくというスタイルを取ります。

例えば、ファイルの中身を取得して別のファイルに書き出すというプログラムを書くことを考えてみます。
ファイルの中身をすべてメモリにロードしてファイルに書き出すというプログラムは簡単ですが、ファイルサイズが 1G とかだったりするとファイルを開くまでに時間がかかるし、ヒープサイズが足りずにアプリケーションが死んだりします。
そこで、読み込みと書き出しのファイルを2つのも開き、片方から1行読みだしたら片方に1行書き出すという風にしてみます。
そうするとヒープを消費せずにファイルが書きだせます。

余談ですが私はバッチ処理でDBからCSVを書きだすという内容の処理で同様の改善を行ったことがあり、その時はヒープ使用量が 6GB から 200MB まで減りました。

リアクティブプログラムは上記のような、データの送信元(Publisher)から、データの送信先( Subscriber) にデータが発生する度に送る処理に対する汎用的な API です。

データの送信元・送信先をファイルやネットワークにしたり、処理をマルチスレッドにしたり、複数の送信元・送信先を混在させたり、データの加工をラムダ式で設定出来たりといった機能があります。
また、送信元・送信先の処理能力に合わせてデータの送信量を調整するといった機能もあります(バックプレッシャーという)

非同期処理を使ってデータを後から取得するのは簡単です。しかし、大量データの取得要求をするとデータが揃うまでの処理時間が長くなったり、大量データを取得するための処理でリソースが枯渇しかねません。
リアクティブにすることで、大量データの要求であっても、相手に少しづつデータを送れます。
無限に発生するデータにも対応できますし、大量アクセスでも安定して対応できます。

このような特徴は例えばログ送信のような大量・無限に発生し続けるデータを逐次処理するのに有効です。

リアクティブシステムが目差すのは、リアクティブプログラムによる非同期ノンブロッキングと分散処理で、拓さんのクライアントに素早く応答することだと言えそうです。

主にサーバーサイドの文脈で述べましたが、リアクティブの適用範囲は広く GUI アプリで発生するイベントをうまく扱うための手段としても用います。

私もリアクティブプログラムはよくわからないので、今後も勉強したいと思います。