Rust中的状态机实现
我概括一下,引入 Future 必须解决自引用被swap的问题
Pin通过 提供 &Self 来让编译器进行入参检查,mem::swap(& mut self) 需要 mut 类型,所以编译器类型检查失败而退出
https://rust-lang.github.io/async-book/04_pinning/01_chapter.html
Future 任务会跨越线程执行,我们知道Future可以编译成状态机yield执行,那在从Future Pending 到 Done 的过程中需要将全部的状态统一保存起来。
这里最好的办法是 struct,也即 async 中所有的变量都在 struct 上分配,并通过self引用。
整个 Future 执行期间全部的变量都通过 enum 进行保存。
Future 是如何保存执行状态的?
编译之前
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 use std::future::Future;use std::pin::Pin;use std::task::{Context, Poll};use std::time::{Duration, Instant};struct Delay { when: Instant, } impl Future for Delay { type Output = &'static str ; fn poll (self : Pin<&mut Self >, cx: &mut Context<'_ >) -> Poll<&'static str > { if Instant::now () >= self .when { println! ("Hello world" ); Poll::Ready ("done" ) } else { cx.waker ().wake_by_ref (); Poll::Pending } } } #[tokio::main] async fn main () { let when = Instant::now () + Duration::from_millis (10 ); let future = Delay { when }; let out = future.await ; assert_eq! (out, "done" ); }
编译之后
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 use std::future::Future;use std::pin::Pin;use std::task::{Context, Poll};use std::time::{Duration, Instant};enum MainFuture { State0, State1 (Delay), Terminated, } impl Future for MainFuture { type Output = (); fn poll (mut self : Pin<&mut Self >, cx: &mut Context<'_ >) -> Poll<()> { use MainFuture::*; loop { match *self { State0 => { let when = Instant::now () + Duration::from_millis (10 ); let future = Delay { when }; *self = State1 (future); } State1 (ref mut my_future) => { match Pin::new (my_future).poll (cx) { Poll::Ready (out) => { assert_eq! (out, "done" ); *self = Terminated; return Poll::Ready (()); } Poll::Pending => { return Poll::Pending; } } } Terminated => { panic! ("future polled after completion" ) } } } } }
比如如下async代码块
1 2 3 4 5 6 async { let mut x = [0 ; 128 ]; let read_into_buf_fut = read_into_buf (&mut x); read_into_buf_fut.await ; println! ("{:?}" , x); }
下面是desugar的代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 struct AsyncFuture { x: [u8 ; 128 ], read_into_buf_fut: ReadIntoBuf state: State, } enum State { Start, AwaitingReadIntoBuf, Done } struct ReadIntoBuf { buf: &'a mut [u8 ] }
会被rustc编译成Future对象,并且跨.await的变量被封装到struct保存,通过生成state维护状态机的状态
并不是全部的local variable 都会被保存,rustc只会保存await需要的变量,这些变量特点是await返回后就会被销毁,所以需要保存到struct
https://discord.com/channels/500028886025895936/500336333500448798/854955299522084914
由于保存变量状态, poll 中获得这状态通过变量引用访问,所以struct往往会保存变量的引用,这就使得 future 不能被移动,所以编译器会为生成的 future 状态机增加 PhantomPinned,使得 future 是 !Unpin
https://os.phil-opp.com/async-await/#pinning-and-futures
https://www.reddit.com/r/learnrust/comments/fkvr15/why_cant_an_async_block_or_fn_implement_unpin/
也正是由于 future !Unpin,为了再次 poll 这个future,必须重新 Pin 住
所以 shadowsocks-rust 这里重新Pin住
https://github.com/willdeeper/shadowsocks-rust/blob/6ff4f5b04b8e22d36cd54477cfcadf8cb403de21/bin/sslocal.rs#L463
虽然看着 server 是 stack local,但 async 会将其保存到 struct,确保pin的整个过程中server始终存在。
https://os.phil-opp.com/async-await/#stack-pinning-and-pin-mut-t
Future 可能会出现自引用结构,比如 https://os.phil-opp.com/async-await/#self-referential-structs
,在await
之后获取了array的引用。这种情况下future move会出问题
https://os.phil-opp.com/async-await/#pinning-and-futures
The reason that this method takes self: Pin<&mut Self> instead of the normal &mut self is that future instances created from async/await are often self-referential, as we saw above. By wrapping Self into Pin and letting the compiler opt-out of Unpin for self-referential futures generated from async/await, it is guaranteed that the futures are not moved in memory between poll calls
https://os.phil-opp.com/async-await/#saving-state-1
Language-supported implementations of cooperative tasks are often even able to backup up the required parts of the call stack before pausing. As an example, Rust’s async/await implementation stores all local variables that are still needed in an automatically generated struct
Rust没找到在线编辑器看编译后的代码,但stackless理论中,js和rust最接近,后面不不明白,可以看js编译后是如何利用函数闭包保存状态的
js async编译后的状态机
为什么说 Pin 是零开销?
Pin! 的内容会分配到堆上防止移动,对于自引用结构为了防止移动一定会内存分配到堆上 ,零开销的意思是不会进行多余的 堆分配。
mem::swap 要求 参数为 &mut T
,只要我们保证不泄漏出 mut 就可保证用户代码不会 move 内存造成不安全编程
除非显式 !Unpin, 否则编译器会自动实现Unpin,也就是Rust里默认变量是可以移动的
比如下面这代码
https://play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=636b71d4acef346a9ce810b735908cb1
https://stackoverflow.com/a/49917903/7529562
Javascript Promise 和 Rust Future 的不同
JavaScript 的 Promise 调用可能会返回新的 Promise,而Rust的Future只会返回 Pending 或 Ready,并不会返回新的 Future
Future在Rust中是块写法
1 2 3 thread::spawn (async { });
这 async 编译为 Future,Future::poll返回 Pending / Ready
,想孵化新的异步任务需调用运行时提供的孵化方法,而不是通过返回 Future
1 2 3 4 5 6 7 8 tokio::spawn (async { let join_hander = tokio::spawn (async { }); join_hander.await ?; });
JS中的状态机
我将用 Javascript 来演示 async 如何实现。 async 原理类似 C# 的 async Task,Rust中的 ?. Future
async 本质是依赖 回调函数
1 2 3 4 5 6 7 8 async function fetchDataFromRemote (path ) { const url = 'https://' + path; const r1 = await api.get (url) const r2 = await api.getFrom (url) return [r1,r2] } const data = await fetchDataFromRemote ('api.github.com/repos/tokio-rs/mini-redis/languages' )
1 2 3 4 5 6 7 8 9 10 11 12 fetchDateFromRemote ( ) { const url = 'https://' + path; let state = 0 ; let finalState = 2 ; let waitFor = null switch (state) { case 0 : { waitFor = api.get (url); return waitFor; } } }
又或者下面从babel拷贝过来的代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 async function getActive (keyId: string ) { if (typeof itemId === 'string' ) { const r = await (await api.items .get (keyId, itemId)).data ; setDefault (r.value ); setFields ({ key : 'value' , value : r.value }); return ; } const k = await (await api.keys .get (keyId)).data ; if (Number .parseInt (k?.active ) !== -1 ) { const active_item = await (await api.items .get (keyId, itemId)).data setDefault (active_item.value ); setFields ({ key : 'value' , value : active_item.value }); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 function getActive (keyId ) { return __awaiter (this , void 0 , void 0 , function ( ) { var r, k, active_item; return __generator (this , function (_a ) { switch (_a.label ) { case 0 : if (!(typeof itemId === 'string' )) return [3 , 3 ]; return [4 , _api__WEBPACK_IMPORTED_MODULE_5__["default" ].items .get (keyId, itemId)]; case 1 : return [4 , (_a.sent ()).data ]; case 2 : r = _a.sent (); setDefault (r.value ); setFields ({ key : 'value' , value : r.value }); return [2 ]; case 3 : return [4 , _api__WEBPACK_IMPORTED_MODULE_5__["default" ].keys .get (keyId)]; case 4 : return [4 , (_a.sent ()).data ]; case 5 : k = _a.sent (); debugger ; if (!(Number .parseInt (k === null || k === void 0 ? void 0 : k.active ) !== -1 )) return [3 , 8 ]; return [4 , _api__WEBPACK_IMPORTED_MODULE_5__["default" ].items .get (keyId, itemId)]; case 6 : return [4 , (_a.sent ()).data ]; case 7 : active_item = _a.sent (); setDefault (active_item.value ); setFields ({ key : 'value' , value : active_item.value }); _a.label = 8 ; case 8 : return [2 ]; } }); }); }