プロダクションのRustコードを async / await に移行した話
κeenです。日本時間の 11/8 日に Rust 1.39.0 が リリースされ、Rustでもいよいよ async / await が利用できるようになりました。
async / await は面倒な Future の記述をすっきり書けるようにするシンタックスシュガーであると共に、 Future をまたいだ値のライフタイムもよしなに扱ってくれるので視認性以上のメリットがあります。
可能な限り使った方が良いでしょう。
Ideinのプロダクションコードもすぐさま async / await に移行しました。
IdeinのActcastのプロジェクトにはいくつかRustのコードベースがありますが、そのうちのAPIサーバの部分を async / await に移行しました。元々非同期なWAFを使っていたこともあり、ほとんどのコードで Future を使っています。規模としてはRustだけで23パッケージ、4万行程あるようです。
$ ls */Cargo.toml | wc -l
23
$ git ls-files | grep '\.rs$' | xargs wc -l
...
39673 合計
何がどう変わるの?
移行に際してはOPTiMさんのTECH BLOGに大変お世話になりました。素晴しいブログをありがとうございます。
Rustの非同期プログラミングをマスターする - OPTiM TECH BLOG
さて、async / await に移行と言っていますが、async / await を使うには標準ライブラリの Future も必要なので厳密には同時に2つのことをやっています。さらに、Ideinのコードベースでは Future の and_then の連鎖を少しでも読みやすくするために mdoと mdo-future も利用していたのでこれらの移行を行います。
つまり、以下の3つの作業を同時に行いました
- futures 0.1からfutures 0.3に移行
- mdoをやめて
async/awaitに移行 - その他の
Futureを使うコードもasync/awaitに移行
futures 0.1からfutures 0.3に移行
元々RustでFutureデザインパターンを書くにはfuturesクレート (futures 0.1) がデファクトスタンダード的立ち位置でした。
Rust 1.39.0で入った async / await は言語機能なので外部のクレートに依存することはなくて、標準ライブラリにある std::future::Future トレイトを使っています(async / await のために標準ライブラリに Future が入りました)。ここでよく使われる Future が2種類あることになります。
std::future::Future は本当に async / await の実現に必要な最低限の機能しか持っていません。
std::future::Future をベースとしつつfutures 0.1と同等の拡張機能を提供するのがfutures 0.3です。
同時に std::future::Future も再エクスポートしているので今までと同じような感覚でfuturesクレートを使うと、自然と async / await に対応した Future になる訳です。
とはいっても Future の定義がまるっと変わってしまっているのでそのまま自然には移行できません。
両者の定義を比べてみましょう。
futures 0.1
pub trait Future {
type Item;
type Error;
fn poll(&mut self) -> Poll;
fn wait(self) -> Result
where
Self: Sized,
{ ... }
fn map(self, f: F) -> Map
where
F: FnOnce(Self::Item) -> U,
Self: Sized,
{ ... }
fn map_err(self, f: F) -> MapErr
where
F: FnOnce(Self::Error) -> E,
Self: Sized,
{ ... }
// その他19個のメソッド
}
std::future::Future
pub trait Future {
type Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll;
}
一番の違いは futures 0.1 の Future は関連型が Item と Error の2つあるのに対して標準ライブラリの Future は Output 1つしかありません。
型を impl Future<...> などと書いている箇所は全て変更が必要になります。
futures 0.1 のときに impl Future と書いている箇所は Result を使って impl Future<output>> と書くと綺麗に対応が取れるでしょう。
また、futures 0.1 にはメソッドが生えているのに対して標準ライブラリのものにはメソッドが生えていません。そこはfutures 0.3がカバーしており、futures 0.3の FutureExt と TryFutureExt を使えばほとんど同じメソッドが揃うはずです。
mdoをやめて async / await に移行
mdo については貧者の async / await といったところなので省略します。
Future を使うコードを async / await に移行
まず、 Future を返していた関数は async fn に移行できます。
以下のように書かれたfutures 0.1のコードがあったとします。
fn do_async() -> impl Future {
// ...
return future;
}
これは先程紹介したように標準ライブラリの Future と Result 使ったコードに移行できます。
fn do_async() -> impl Future<output>> {
// ...
return future;
}
これはさらに async fn に書き換えられます
async fn do_async() -> Result {
// ...
return Ok(value);
}
また、futures 0.1の and_then や then などを使っているコードは(async ブロックと) await で書き換えられます。
以下のように書かれたfutures 0.1のコードがあったとします。
do_async().and_then(|value| {
// do something with value ...
return future;
}).then(|result| match result {
Ok(v) => {
// ...
}
Err(e) => {
// ...
}
})
これは以下のように async ブロックと await を使って書き換えられます。
async {
let value = do_async().await?;
// do something with value ...
let result = future.await;
match result {
Ok(v) => {
// ...
}
Err(e) => {
// ...
}
}
}
このコードが async fn の中にある場合は async ブロックは不要です。
async fn run_async() {
let value = do_async().await?;
// do something with value ...
let result = future.await;
match result {
Ok(v) => {
// ...
}
Err(e) => {
// ...
}
}
}
コードの大部分に影響はありますが、内容としてはほとんど頭を使わずにパターンに沿って書き換えていくだけです。
ところで残念ながらトレイトのメソッドは async fn にできません。つまり、以下のようなトレイト定義は書けないということです。
trait Trait {
async fn method(&self) -> Return;
}
これを可能にするにはいくつかの機能をRustに追加しないと実現できないそうなので気長に待ちましょう。
ということでメソッドのシグネチャには async fn (= fn () -> impl Future 相当)ではなく、 dyn Future を使うことになります。
標準ライブラリの Future は実用するには Pin も必要になるので現実的には fn () -> Pin> + 'static + Send>> などの型を使うことになるでしょう。
async fn(&self)-> Return と書いたら裏でfn (&self) -> Pin + 'static + Send>> に変換してくれるasync-traitなどのライブラリもありますが、今回は使っていません。
まだトレイトのメソッドの async fn がどうなるか分からないことから不安定な仕様を先取りするよりは確実に動く方を選びました。
あと元のコードが後程紹介する BoxFut で書かれていたので BoxFut の中身を差し替えるだけの方が変更が少なくて済むという理由もあります。
方針
方針としてはビジネスロジックを記述する部分を互換性を補うコードなしに async / await にすることを目的に移行することにしました。
とはいってもビジネスロジックを中心に外郭のパッケージがあるのでそれはほとんどのパッケージを移行することを意味します。
外部のクレートまでは制御できないので、外郭クレートが外部のクレートを呼ぶときにfutures 0.1からfutures 0.3への互換をはさみ、自社のクレート内ではfutures 0.3しか使わない方針にしました。
ただし、1つだけ例外があって、Actcastで使っているWAF、actix-web 1.0はfutures 0.1ベースで動いているのでこれに関連するコードはfutures 0.1のまま使っています。
こういう大きな変更はタイミングを見て一気にやってしまうのがよさそうですが、 async / await がリリースされた今が一番の好機でしょう。
全体設計
方針が定まったらあとは書き換えていくだけなのですが、1つだけ重要な要素があります。 我々が使っているWAFがactix-webという点です。
冒頭で紹介したOPTiMさんのテックブログにはタスクの実行モデルがいくつか紹介されていますが、actix-webはそのうちのスレッドプールのモデルに近いものです。 クライアントからのリクエストを各スレッドに振り分けたら、レスポンスを返すまでそのスレッドが処理を担当します。
このモデルの大きな特徴の1つとして、タスクがスレッド間を移動しない点が挙げられます。Rust的にいうと + Send が必要なくなります。
+ Send の有無が全体に効いてくるのでまず最初に確認しておきましょう。さもないと全部変換し終わったあとに型が合わずに動かない、のような悲劇が起きます。作業の前に「我々のタスクはスレッドを跨ぐような設計か?」と一度見直してみるとよいでしょう。
余談ですがスレッドプールモデルだと1つ1つのタスクの粒度にバラつきがあった際にスケジューリングが平等にならないという問題が知られています。 actix-webはWebアプリケーションフレームワークということもありタスクの大きさにさほどバラつきはありませんし、そもそも非同期がタスクを細切れにして実行する仕組みなのでデメリットは薄いでしょう。 もうちょっと言うとスレッドプールモデルと対比されているワークスティーリングにも色々種類があり、スレッド毎にキューを持ちつつタスクがなくなったら他のスレッドのキューを奪うようにすることでロックを減らして効率化しているようなものもあります。興味のある方は調べてみて下さい。
先程 Pin> + 'static + Send>> という型を紹介しましたが、我々の場合ここの + Send が不要になる訳です。
現実的にはこの型を毎回書くのはしんどいので以下のように BoxFut という型を定義しておいて各所でそれを使うことになるでしょう。
pub type BoxFut = Pin> + 'static>>;
これはfutures 0.1の頃からあったプラクティスなので既に(futures 0.1版の) BoxFut を定義しているプロダクトも多いかと思います。そういう場合は定義を上記のように差し替えるだけで置換できます。
実作業
おおむね、Cargo.toml にある futures = "0.1" と書かれている箇所を futures = "0.3" と書き換えて、コンパイルエラーを取りつつ async / await を導入していく作業が続きます。
futures 0.1との互換
外部クレートでfutures 0.1を使っていた箇所は互換コードを挟むことになります。
互換コードは compat フィーチャを有効にすると使えるようになります。
[dependencies]
futures = { version = "0.3", features = ["compat"] }
compat を有効にした上で Future01CompatExt をインポートすると Future に compat メソッドが生えてくるのでそれでfutures 0.1の Future をfutures 0.3 の Future に変換できます。
少し実例を紹介しましょう。Reqwestを使っている部分です。
use futures::compat::{Future01CompatExt, Stream01CompatExt};
// `async fn` !!
async fn get_hogehoge(arg: Arg) -> Result, Error> {
// ...
let resp = client
.request(req)
// `compat` を呼んでいる
.compat()
// `compat` したので `await` が呼べる
.await
.map_err(|err| {
error!("get_hogehoge error: {:?}", err);
error!("get_hogehoge cause: {:?}", err.source());
err
})
.map_err(|err| err.context(ErrorKind::Http))?;
debug!("get_hogehoge resp: {:?}", resp);
let status = resp.status();
let body = resp
.into_body()
// `compat` を呼んでいる
.compat()
// futures 0.1にあった `concat2` は `try_concat` に置き換え
.try_concat()
// `compat` したので `await` が呼べる
.await
.map_err(|err| err.context(ErrorKind::Http))?;
// ...
}
コメントで書いた部分が互換コードを呼んでいる部分です。
このようにfutures 0.1を使っている外部クレートを呼ぶコードでも async / await を使ったコードと共存できます。
ところで、 await の位置が気になった方もいるかもしれません。
future.await.map_err(|e| ...)? の部分です。
futures 0.1のコードで future.map_err(|e| ...) と書いていた箇所なので、自然に書き換えるなら TryFutureExt::map_err を使って future.map_err(|e| ...).await? と書くこともできます。しかしここでは future.await で Result にしたあとに Result::map_err を呼んでいます。
元も子もないことを言えばどちらでもいいのですが、一応ここでは可能な限り標準ライブラリのAPIを使うために早めに await しています。futuresもまだ0.3で安定版ではありませんからね。
とはいえ .await? がイディオムのようになっているので .await と ? 繋げたい方もいるでしょうからあまりにするほどのことでもないです。
余談ですがfutures 0.1の map 相当のメソッドはfutures 0.3では map_ok になっているので注意して下さい。
futures 0.1との共存
我々の使っているactix-webはfutures 0.1で動いているのでサーバのエントリーポイント付近ではfutures 0.1とfutures 0.3が共存する汽水域が存在します。0.1と0.3を共存させないといけません。
そういう用途のために crates.io にfutures01 という名前でfutures 0.1のコードが登録されています。これを使って0.1系と0.3系を共存させます。
[dependencies]
futures01 = "0.1"
futures03 = { package = "futures", version = "0.3", features = ["compat"] }
片方がfuturesという名前だと混乱しそうなので平等に0.3の方もfutures03という名前にリネームして導入しました。
どちらともほとんど同じAPIの prelude を公開しているので名前が正面から衝突します。
Rustはグロブインポート同士で名前が衝突すると両方とも名前が見えなくなり、ユーザに名前解決を促す仕組みになっています。
prelude でよく使うのはトレイトなので名前が見えなくても多くのケースでは困らないのですが必要になったら手で解決します。
use futures01::prelude::{Future, *};
use futures03::prelude::*;
futures 0.3から0.1へ
最終的にはactix-webがfutures 0.1を要求するのでfutures 0.3から変換してあげる必要があります。
これはセオリー通りに .boxed_local().compat() で変換可能です。
mdo! {
opt: Option =<< self.0.get(key)
.boxed_local().compat()
.map_err(ErrorInternalServerError);
}
wait
ほとんどの箇所では Future から中身を取り出すコードはないのですが、テストなど一部のコードで wait() を呼んでいました。
多くの場合は futures 0.3の .now_or_never().unwrap() を使うと同等のことができるのですが、名前の通り即時に返ってくる Future 以外に呼ぶとパニックしてしまいます。そういう箇所では仕方なしに一旦futures 0.1に変換してから wait() を呼んでいます。
future
.boxed_local()
.compat()
.wait()
しかし今なら tokio 0.2 がリリースされているので test を使えばテスト内で await できるようになって不要になりそうです。
細かな点や落とし穴
boxed と boxed_local
先程紹介した通り、トレイトからの返り値には BoxFut を使うことになるでしょう。
pub type BoxFut<T> = Pin> + 'static>>;
すると、メソッドはだいたいこういう見た目になるはずです。
fn method(&self, arg: Arg) -> BoxFut {
async {
// ...
Ok(ret)
}.boxed()
}
ここで使っているのはOPTiMさんのテックブログに紹介されている、 FutureExt のboxedです。
これで問題なくコンパイルは通るのですが、微妙に罠があります。
boxed は Self: Send を要求しているのです。なので async ブロックの内側に Send が要求されてしまいます。我々のアプリケーションでは Send が必要ないのでこれは過剰な要求です。
そこで boxed の Self: Send を要求しないバージョンとして boxed_localというAPIがあります。
これを使って以下のように書き換えると全体が破綻しなくて済みます。
fn method(&self, arg: Arg) -> BoxFut {
async {
// ...
Ok(ret)
}.boxed_local()
}
async fn とライフタイム
async fn にはライフタイム関連の罠があります。
下記のような仮想的な関数を考えましょう。
async fn reqest_data(path: &str) -> Result<data> {
let url = format!("https://example.com{}", path);
let data = Client::new()
.request(url)
.await?
.response_to_data();
Ok(data)
}
このタスクは理想的にはリクエストを投げてしまえばサーバからのレスポンスにしか依存しないのでライフタイムは 'static がついてほしいです。
しかし async fn の脱糖ルールに従うと以下のように変換されます。
fn reqest_data(path: &str) -> impl Future<output>> + '_ {
async {
let url = format!("https://example.com{}", path);
let data = Client::new()
.request(url)
.await?
.response_to_data();
Ok(data)
}
}
ここで + '_ の部分は最初に紹介しませんでしたが、Rust 2018 editionで導入された匿名ライフタイムで、ライフタイムが省略されていることを明記する記法です。
関数の型のライフタイムは省略した場合は推論ではなくライフタイムの省略のルールに従って自動で決められます。
これに基いて先程のコードのライフタイムを明示すると以下のようになります。
fn reqest_data<'a>(path: &'a str) -> impl Future<output>> + 'a {
async {
let url = format!("https://example.com{}", path);
let data = Client::new()
.request(url)
.await?
.response_to_data();
Ok(data)
}
}
ということで残念ながら返り値のライフタイムは 'static になってくれません。
自動にまかせるとダメということが分かったので、手動で頑張ります。
結局こういうコードを書くことになります。
fn reqest_data(path: &str) -> impl Future<output>> + 'static {
async {
let url = format!("https://example.com{}", path);
let data = Client::new()
.request(url)
.await?
.response_to_data();
Ok(data)
}
}
'static はなくても変わりませんが明示しておいた方が意図が分かりやすいでしょう。
async / await が目の前にあるのに冗長なコードを書くのは少し歯痒いですね。
async ブロックとライフタイム
実は、先程のコードはコンパイルが通りません。コンパイルしようとすると以下のようなエラーが出ます。
--> async.rs:26:11
|
25 | fn reqest_data(path: &str) -> impl Future<output>> {
| ----------------------------------------- this return type evaluates to the `'static` lifetime...
26 | async {
| ___________^
27 | | let url = format!("https://example.com{}", path);
28 | | let data = Client::new().request(url).await?.response_to_data();
29 | | Ok(data)
30 | | }
| |_____^ ...but this borrow...
|
note: ...can't outlive the anonymous lifetime #1 defined on the function body at 25:1
--> async.rs:25:1
|
25 | / fn reqest_data(path: &str) -> impl Future<output>> {
26 | | async {
27 | | let url = format!("https://example.com{}", path);
28 | | let data = Client::new().request(url).await?.response_to_data();
29 | | Ok(data)
30 | | }
31 | | }
| |_^
help: you can add a constraint to the return type to make it last less than `'static` and match the anonymous lifetime #1 defined on the function body at 25:1
|
25 | fn reqest_data(path: &str) -> impl Future<output>> + '_ {
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
これは async ブロック内にあるデータがタスクの中に放り込まれてしまうためです。
コードをよくみると async の中で path を使っていますよね?そのために async ブロック全体のライフタイムが path のライフタイムに制限されているのです。
これはシンプルに let url = ... の文を async ブロックの外に出してあげると解決します。
fn reqest_data(path: &str) -> impl Future<output>> {
let url = format!("https://example.com{}", path);
async {
let data = Client::new().request(url).await?.response_to_data();
Ok(data)
}
}
同様に、 Client を外部から受け取る設計の場合は client.request(...) の式を async の外に出してしまえば解決します。
fn reqest_data(client: &Client, path: &str) -> impl Future<output>> + 'static {
let url = format!("https://example.com{}", path);
let f = client.request(url);
async {
let data = f.await?.response_to_data();
Ok(data)
}
}
一時変数が必要になるのがもどかしいですね。
これくらいなら TryFutureExt::map_ok を用いて以下のように書いた方が簡潔になります。
fn reqest_data(client: &Client, path: &str) -> impl Future<output>> + 'static {
let url = format!("https://example.com{}", path);
client.request(url).map_ok(Response::response_to_data)
}
私が今回やった移行ではあえて async ブロックを残すようにしました。
async / await への移行というのは必ずしもコードだけでなく、普段からそのコードをメンテナンスしている開発者の意識も移行していく必要があるので多めに async / await を使う判断をしました。
async move ブロック
ほとんどのケースでは async ブロックで問題ないのですが、たまにライフタイムの問題が起きることがあります。
例えば先程の例の拡張で、一度リクエストを送ってURLを取得したあとにそのURLに再度データを投げるような処理を考えます。
fn put_data(path: &str, data: &str) -> impl Future<output>> + 'static {
let url = format!("https://example.com{}", path);
// 関数の引数からリクエストデータを作成
let body = Body::from_str(data);
async {
let loc = Client::new().request(url).await?.location();
let data = Client::new()
// ここで `async` ブロックの外側にあるデータを参照
.body(&body)
.request(loc)
.await?
.response_to_data();
Ok(data)
}
}
これは async ブロックの外側にあるデータ body を参照しているのでライフタイムエラーでコンパイルが通りません。
しかし落ち着いて考えると body は関数内で生成したデータなので自分の都合で置き場所を変更しても構いません。例えば async ブロック内に移動してあげるとタスクの中に含まれるのでライフタイムの問題がなくなります。
fn put_data(path: &str, data: &str) -> impl Future<output>> + 'static {
let url = format!("https://example.com{}", path);
// 関数の引数からリクエストデータを作成
let body = Body::from_str(data);
async {
let loc = Client::new().request(url).await?.location();
// `body` を一旦 `async` block内に移動
let body = body;
let data = Client::new()
// この参照は `async` ブロック内なので問題ない
.body(&body)
.request(loc)
.await?
.response_to_data();
Ok(data)
}
}
しかしこれは少し面倒ですね。 async ブロック外への参照がある度に1行増えますし、どこで参照しているかを1つ1つ把握しないといません。
そういうときに async move ブロックを使うと、 async ブロック外のデータの参照を一気にタスク内に移動できます。
fn put_data(path: &str, data: &str) -> impl Future<output>> + 'static {
let url = format!("https://example.com{}", path);
// 関数の引数からリクエストデータを作成
let body = Body::from_str(data);
// `async move` を使う
async move {
let loc = Client::new().request(url).await?.location();
let data = Client::new()
// この参照は `async` ブロック外だが move しているので問題ない
.body(&body)
.request(loc)
.await?
.response_to_data();
Ok(data)
}
}
因みにActcastのレポジトリでは async ブロックが45回、 async move ブロックが10回使われているようです。
$ git grep 'async {' | wc -l
45
$ git grep 'async move {' | wc -l
10
5、6回に1回くらいは必要になる機能なようなので是非覚えておいて下さい。
コンパイルの通るコードはこちらに置いておきます。
Early return
async ブロックのおかげで型テトリスが多くの場合不要になり、条件分岐がぐっと楽になりました。
しかし、やはりどうしても型合わせをする必要があるケースがあります。そのうちの1つが async ブロックの外で行なうearly returnです。
先程のコードを変更して、JSONデータを投げてみましょう。但し Body::from_json は Result を返すとします。素直にやると ? を使えばよさそうです。
fn put_json(path: &str, json: &Json) -> impl Future<output>> + 'static {
let url = format!("https://example.com{}", path);
// from_json を呼んだあとに `?` でエラーなら即座に帰る
let body = Body::from_json(json)?;
async move {
let loc = Client::new().request(url).await?.location();
let data = Client::new()
.body(&body)
.request(loc)
.await?
.response_to_data();
Ok(data)
}
}
これはあえなくコンパイルエラーになってしまいます。
返り値が impl Future なのに対して ? を使って Result を返そうとしているために起きるエラーです。
error[E0277]: the `?` operator can only be used in a function that returns `Result` or `Option` (or another type that implements `std::ops::Try`)
--> async.rs:79:16
|
79 | let body = Body::from_json(json)?;
| ^^^^^^^^^^^^^^^^^^^^^^ cannot use the `?` operator in a function that returns `impl std::future::Future`
|
= help: the trait `std::ops::Try` is not implemented for `impl std::future::Future`
= note: required by `std::ops::Try::from_error`
error: aborting due to previous error
For more information about this error, try `rustc --explain E0277`.
これの解決策の1つには futures の Either を使う手があります。futures 0.1ではお馴染の方法ですね。
fn put_json(path: &str, json: &Json) -> impl Future<output>> + 'static {
// futures 0.3の `Either` と `err` をインポート
use futures::future::{err, Either};
let url = format!("https://example.com{}", path);
// 関数の引数からリクエストデータを作成
let body = match Body::from_json(json) {
Ok(body) => body,
// エラーだった場合は `Left` として帰る。
// `Err(e)` を `Future` にするには
// `err(e)` や `async { Err(e) }` などがある
Err(e) => return Either::Left(err(e)),
};
Either::Right(async move {
let loc = Client::new().request(url).await?.location();
let data = Client::new()
.body(&body)
.request(loc)
.await?
.response_to_data();
Ok(data)
})
}
ただしこれはちょと不恰好です。状況が許すなら、 async ブロック内にデータを持ち込んでから ? という手もあります。
fn put_json(path: &str, json: &Json) -> impl Future<output>> + 'static {
let url = format!("https://example.com{}", path);
// 関数の引数からリクエストデータを作成
// ここではまだ `?` しない
let body = Body::from_json(json);
async {
// `async` ブロックの中で `?` すると問題なくなる
let body = body?;
let loc = Client::new().request(url).await?.location();
let data = Client::new()
.body(&body)
.request(loc)
.await?
.response_to_data();
Ok(data)
}
}
状況判断しながら使い分けて下さい。
まとめ
「async / await に移行する」といったときに必要になる実作業を示しました。
また、多くの場合は機械的に変換できるものの、いくつか注意点があることも紹介しました。
余談ですが、この作業は足掛け2日かかりました。とはいっても1日中 async と await を書き続けた訳ではなくて(そうだとしたら苦行ですね)、他の作業と並行して行っていたので実作業でいうと1人/日といったところじゃないでしょうか。
今回書いた async / await は1800個を数えます。
$ git grep -o -e 'async' -e 'await' | wc -l
1810
恐らく後にも先にも人生で一番 async / await と書いた2日間だったと思います。功徳も溜まったことでしょうし来世は期待できそうですね。
今のActcastは既に async / await を使って動いています。
皆さんも移行の折は今回のケースを参考にしてみて下さい。