Contents
async fn がコンパイル時に生成する状態機械
何が起きているか
async fn は 呼び出し時に 実行されるコードではなく、コンパイル時に 以下の2つを自動で作ります。
- 匿名構造体 – 関数本体で使われるローカル変数や中間結果をフィールドとして保持。
Futureの実装 – その構造体がFutureトレイトを実装し、pollメソッドの中で.awaitが 「現在の状態から次の状態へ遷移する」 ロジックに変換される。
参考: Rust Async Book – https://doc.rust-lang.org/book/ch20-02-multithreaded.html
cargo expand で見えるコード例
|
1 2 3 4 5 6 7 |
// async fn の元になるコード async fn fetch() -> u32 { let a = async_op1().await; let b = async_op2(a).await; b + 1 } |
cargo expand(公式ツール)で得られる概形は次の通りです。
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 |
use std::pin::Pin; use std::task::{Context, Poll}; struct Fetch<'a> { state: u8, a: Option<u32>, // 省略された内部フィールド … } impl<'a> Future for Fetch<'a> { type Output = u32; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { // `self` は Pin されているので安全に参照できる let this = unsafe { self.get_unchecked_mut() }; loop { match this.state { 0 => { // async_op1 のポーリング match async_op1().poll(cx) { Poll::Ready(v) => { this.a = Some(v); this.state = 1; } Poll::Pending => return Poll::Pending, } } 1 => { let a = this.a.unwrap(); match async_op2(a).poll(cx) { Poll::Ready(v) => return Poll::Ready(v + 1), Poll::Pending => return Poll::Pending, } } _ => unreachable!(), } } } } |
stateが現在の制御フローを示す。- 各
awaitは内部で別々のFutureのpollを呼び出し、Pendingになった時点で外側のpollもPendingを返す。
この変換があるおかげで、ユーザーは 「.await」 と書くだけで非同期制御フローを記述できる。
基本的な async 関数と .await の使い方
|
1 2 3 4 5 6 7 8 9 10 11 12 13 |
use std::time::Duration; use async_std::task; // 例として async‑std のブロックオン async fn hello() -> &'static str { task::sleep(Duration::from_secs(1)).await; "Hello, async!" } fn main() { let msg = task::block_on(hello()); println!("{}", msg); // => Hello, async! } |
task::sleepは 非同期タイマー で、内部的に OS のタイマーやイベントループを利用してブロックしない。.awaitが現れるたびにコンパイラ生成コードのpollが呼ばれ、タイマーが完了すると再開される。
公式ドキュメント: https://doc.rust-lang.org/std/future/trait.Future.html
Future トレイトを手動で実装するポイント
3‑1️⃣ poll と Pin/Unpin の意味
| 用語 | 説明 |
|---|---|
Pin<&mut T> |
T がメモリ上で移動しないことを保証。自己参照構造体(例: 内部に &self を保持する)を安全に扱える。 |
Unpin |
デフォルトではすべての型は Unpin だが、#[pin_project] 等で「ピン留めできない」構造体を作ることも可能。Future::poll の引数は Pin<&mut Self> なので、Unpin が必要なときは Box::pin や Pin::new_unchecked を使う。 |
| 安全にピン留めする方法 |
|
ピン留めの実例(pin_project 使用)
|
1 2 3 |
# Cargo.toml pin-project = "1.1" |
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
use pin_project::pin_project; use std::{future::Future, pin::Pin, task::{Context, Poll}}; #[pin_project] struct MyFuture { #[pin] inner: async_std::task::Sleep, } impl Future for MyFuture { type Output = (); fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { // `self.project()` が安全にピン化されたフィールドへの可変参照を返す let this = self.project(); this.inner.poll(cx) } } |
3‑2️⃣ カウンタ Future の実装例
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 |
use std::{ future::Future, pin::Pin, task::{Context, Poll}, }; pub struct SimpleCounter { count: u8, } impl SimpleCounter { pub fn new() -> Self { Self { count: 0 } } } impl Future for SimpleCounter { type Output = u8; fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> { if self.count < 10 { self.count += 1; // 実際の非同期処理では `Waker` に通知させるが、デモなのですぐに Pending Poll::Pending } else { Poll::Ready(self.count) } } } |
- ポイント:
selfはPin<&mut Self>で受け取っているが、この構造体は自己参照を持たないのでUnpinでも問題なし。 - 実際に使うときは
futures::executor::block_on(SimpleCounter::new())のようにPin::newが自動的に行われる。
3‑3️⃣ 正しいタイマー Future(ブロッキングしない実装)
以下の例では スレッドを生成してブロック する代わりに、std::thread::spawn で 非同期 に Waker を呼び出すだけです。ランタイム外でも安全に利用でき、ブロッキング操作は一切行いません。
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 |
use std::{ future::Future, pin::Pin, task::{Context, Poll, Waker}, time::{Duration, Instant}, sync::{Arc, Mutex}, }; /// `SimpleTimer` は指定した時間が経過したら `Ready(())` を返す Future。 pub struct SimpleTimer { deadline: Instant, /// まだ完了していないときに保持する(共有)Waker shared_waker: Arc<Mutex<Option<Waker>>>, } impl SimpleTimer { pub fn after(dur: Duration) -> Self { let deadline = Instant::now() + dur; let timer = SimpleTimer { deadline, shared_waker: Arc::new(Mutex::new(None)), }; // タイマーが満了したら Waker を呼び出すスレッドを起動 let waker_clone = Arc::clone(&timer.shared_waker); std::thread::spawn(move || { let now = Instant::now(); if deadline > now { std::thread::sleep(deadline - now); } // Waker が登録されていれば呼び出す if let Some(w) = waker_clone.lock().unwrap().take() { w.wake(); } }); timer } } impl Future for SimpleTimer { type Output = (); fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { if Instant::now() >= self.deadline { // 時間経過 → 完了 Poll::Ready(()) } else { // まだ時間が来ていないので、現在の Waker を保存して次回呼び出しに備える let mut guard = self.shared_waker.lock().unwrap(); *guard = Some(cx.waker().clone()); Poll::Pending } } } |
- 安全性:
std::thread::sleepは別スレッドで実行され、ランタイムのスレッドをブロックしない。 - 注意点: 実際のプロダクションコードでは
tokio::time::Sleepやasync‑std::task::sleepのように 非同期 I/O タイマー を利用する方がオーバーヘッドが低く、スレッド数を増やす必要がない。
公式ドキュメント: https://doc.rust-lang.org/std/future/trait.Future.html#method.poll
最小限の executor を作る
4‑1️⃣ RawWaker と安全な Waker の構築例
RawWaker は 低レベル API で、正しく実装しないと未定義動作になる。以下は所有権を Arc に委ねた完全版です。
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 |
use std::{ sync::Arc, task::{RawWaker, RawWakerVTable, Waker}, }; /// `WakerData` は `wake` が呼ばれたときに実行したい処理(ここではキューへの再投入)を保持。 struct WakerData { // 例: タスクキューへのハンドル enqueue: Box<dyn Fn() + Send + Sync>, } fn raw_waker_clone(data: *const ()) -> RawWaker { // Arc の参照カウントを増やすだけで OK let arc = unsafe { Arc::<WakerData>::from_raw(data as *const WakerData) }; std::mem::forget(arc.clone()); // 増えた分を保持 RawWaker::new(data, &VTABLE) } fn raw_waker_wake(data: *const ()) { let arc = unsafe { Arc::<WakerData>::from_raw(data as *const WakerData) }; (arc.enqueue)(); // キューにタスクを入れる等の実装 // `wake` 後は所有権が移動したので drop が走る } fn raw_waker_wake_by_ref(data: *const ()) { let arc = unsafe { Arc::<WakerData>::from_raw(data as *const WakerData) }; (arc.enqueue)(); std::mem::forget(arc); // 参照カウントを減らさない } fn raw_waker_drop(data: *const ()) { // `Arc` のドロップで参照カウントがデクリメントされるだけ unsafe { Arc::<WakerData>::from_raw(data as *const WakerData) }; } // VTABLE は 4 つの関数ポインタを保持するだけ static VTABLE: RawWakerVTable = RawWakerVTable::new( raw_waker_clone, raw_waker_wake, raw_waker_wake_by_ref, raw_waker_drop, ); /// 任意の `enqueue` クロージャから安全な `Waker` を作るユーティリティ fn make_waker<F>(enqueue: F) -> Waker where F: Fn() + Send + Sync + 'static, { let data = Arc::new(WakerData { enqueue: Box::new(enqueue), }); let raw = RawWaker::new(Arc::into_raw(data) as *const (), &VTABLE); unsafe { Waker::from_raw(raw) } } |
cloneはArcの参照カウントを増やすだけ。wake/wake_by_refでは保存したクロージャを呼び出し、タスクキューへの再投入等の実装を行う。dropは単にArcを復元してスコープを抜けさせるだけで、参照カウントが減少する。
4‑2️⃣ futures::task::ArcWake とシングルスレッドキュー
futures クレートが提供する ArcWake は上記ロジックを簡潔に書くヘルパーです。以下は ミニ executor の核心部分です。
|
1 2 3 4 5 |
# Cargo.toml [dependencies] futures = "0.3" lazy_static = "1.4" |
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 |
use futures::task::{ArcWake, waker_ref}; use std::{ collections::VecDeque, future::Future, pin::Pin, sync::{Arc, Mutex}, task::{Context, Poll}, }; /// タスクは `Future` を保持し、`ArcWake` により自ら再スケジュールできる。 struct Task { // `Box<dyn Future>` で多態的に保持 future: Mutex<Option<Box<dyn Future<Output = ()> + Send>>>, } impl ArcWake for Task { fn wake_by_ref(arc_self: &Arc<Self>) { // キューへ再投入(シングルスレッドなのでロックだけで安全) let mut queue = GLOBAL_QUEUE.lock().unwrap(); queue.push_back(arc_self.clone()); } } // グローバルなタスクキュー lazy_static::lazy_static! { static ref GLOBAL_QUEUE: Mutex<VecDeque<Arc<Task>>> = Mutex::new(VecDeque::new()); } /// `spawn` 相当のユーティリティ fn spawn<F>(fut: F) where F: Future<Output = ()> + Send + 'static, { let task = Arc::new(Task { future: Mutex::new(Some(Box::new(fut))), }); GLOBAL_QUEUE.lock().unwrap().push_back(task); } |
4‑3️⃣ 実行ループ(シングルスレッド)
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
fn run_executor() { while let Some(task) = GLOBAL_QUEUE.lock().unwrap().pop_front() { // `ArcWake` が実装されたタスクから Waker を生成 let waker = waker_ref(&task); let mut ctx = Context::from_waker(&*waker); // Future の取り出しと poll let mut future_slot = task.future.lock().unwrap(); if let Some(mut fut) = future_slot.take() { match Pin::new(&mut *fut).poll(&mut ctx) { Poll::Ready(_) => { // 完了 → 何もしない(ドロップで解放される) } Poll::Pending => { // 再度キューへ入れる。`wake_by_ref` が呼ばれたら自動で再投入される *future_slot = Some(fut); } } } } } |
ポイント
-Wakerが呼び出された瞬間に同スレッド上でpollが再度実行できるようになる。
- 本コードは シングルスレッド 向けなので、Mutexだけで競合を防げます。マルチスレッド化する場合はArc<Mutex<_>>の代わりにcrossbeam::queue::SegQueue等を検討してください。
カスタムランタイムと既存ライブラリの比較・実務での選択基準
| 項目 | Tokio (公式) | async‑std (公式) | 本記事のミニ executor |
|---|---|---|---|
| スレッドモデル | ワーカープール(デフォルト=CPU 数) | シングルスレッドまたは task::block_on |
手動実装(シングル or カスタム) |
| I/O サポート | 完全非同期 TCP/UDP/ファイル、TLS 等多数 | 同様に非同期 I/O が標準装備 | 外部クレート (mio, tokio‑io) を自前で組み合わせる必要あり |
| エコシステム | #[tokio::main] マクロ、豊富なプラグイン |
標準ライブラリに近い API | 学習・プロトタイプ向き |
| オーバーヘッド | 高め(スケジューラ最適化が多い) | 中程度 | 最小限実装なら極低(ただし機能は限定的) |
| 推奨シーン | 大規模サーバ、マルチコア活用が必須 | CLI ツール、軽量サービス | ランタイム内部で独自スケジューリングが必要なライブラリ開発や教育目的 |
どちらを選ぶべきか?
| 条件 | 推奨 |
|---|---|
| 高スループット・マルチコアが必須 | Tokio |
| 依存を極力減らしたい、シンプルさ重視 | async‑std |
| ランタイムの内部構造を自分で制御したい、または学習目的 | 本ミニ executor |
公式情報: https://tokio.rs/ , https://docs.rs/async-std/latest/async_std/
典型的なコンパイルエラーと対処法
| エラーメッセージ例 | 原因 | 修正例 |
|---|---|---|
future cannot be sent between threads safely |
Future が Send を実装していない(スレッド間で共有) |
すべての内部型を Arc<Mutex<_>> + Send に置き換える。例: Box::pin(async move { … }) |
cannot move out of captured outer variable in an async block |
所有権を async ブロック内で移動しようとした |
変数を Arc::clone(&var) して共有、または move キーワードを外す |
the trait bound 'MyFuture: Unpin' is not satisfied |
Pin<&mut Self> が必要なのに Unpin がない |
Box::pin(my_future) でピン留め、もしくは #[pin_project] を使う |
実践的な対処フロー
- エラーメッセージを読む → 必要なトレイト (
Send,Sync,Unpin) が足りないことが多い。 - 型の所有権・共有方法を見直す →
Arc/Mutexの組み合わせでスレッド安全にする。 - ピン留めが必要か判定 → 自己参照構造体なら必ず
Pinが必要。pin_projectを導入すると安全にフィールドごとに#[pin]を付与できる。
デバッグ・ベンチマーク手法
1. cargo expand
|
1 2 3 |
cargo install cargo-expand # 初回だけ cargo expand --lib # 生成された状態機械を確認 |
- 目的:
async fnがどのような構造体・pollに変換されたかを可視化。
2. tokio::task::yield_now / async_std::task::yield_now
|
1 2 3 4 5 6 7 8 |
use async_std::task; async fn step() { println!("before yield"); task::yield_now().await; println!("after yield"); } |
- 活用例: 手動でスケジューラの切り替えタイミングを観測し、デッドロックやスタックオーバーフローを防止。
3. criterion でベンチマーク
|
1 2 3 4 |
# Cargo.toml [dev-dependencies] criterion = "0.5" |
|
1 2 3 4 5 6 7 8 9 10 11 12 |
use criterion::{criterion_group, criterion_main, Criterion}; use futures::executor::block_on; fn bench_simple_counter(c: &mut Criterion) { c.bench_function("simple_counter", |b| { b.iter(|| block_on(SimpleCounter::new())) }); } criterion_group!(benches, bench_simple_counter); criterion_main!(benches); |
- 比較対象:
SimpleCounter(自前実装) vs.tokio::time::sleep等。 - 結果の活用: どちらがレイテンシ/スループットに優れるかを数値で示す。
4. ロギングとトレーシング
| クレート | 用途 |
|---|---|
tracing |
非同期タスクの開始・終了を階層的に可視化。 |
env_logger |
手軽なログ出力(RUST_LOG=debug cargo run)。 |
|
1 2 3 4 5 6 7 8 |
use tracing::{info, instrument}; #[instrument] async fn do_work() { info!("work started"); // … } |
次のステップ
- コードをローカルにコピー →
cargo new async_demo && cd async_demo - 必要な依存 (
futures,lazy_static,pin-project) をCargo.tomlに追記。 cargo runでミニ executor がタスクを正しくスケジュールできるか確認。- 完成したら GitHub にリポジトリを作り、README に本記事へのリンクと実行手順を書いて公開。
- 定期的に 公式ドキュメント(Rust Async Book、Tokio Docs)やクレートの
CHANGELOGをチェックし、最新機能やベストプラクティスを取り入れる。
公式情報まとめ
- Rust async/await: https://doc.rust-lang.org/std/future/
- Tokio: https://tokio.rs/tokio/tutorial
- futures crate: https://docs.rs/futures/latest/futures/
この記事は、冗長な「Point (再提示)」を削除し、実装例の正確性と安全性に重点を置いた構成になっています。読者が自分で非同期コードを書き、必要に応じてカスタムランタイムまで作れるようになることを目指しています。