Hi folks,
I’m one of those people who can never properly grok something until I go and implement it myself. I’ve spent quite a bit of my free time over the last few weeks trying to understand the whole asynchronous IO situation. One part that’s been pretty rough going is working out how to actually use the experimental async/await from @alexcrichton effectively with tokio to decode protocols. So, naturally, I decided to implement async/await myself to see if that would help me understand what’s going on.
To my surprise, the result seems to actually work! And it has some interesting ideas, so I thought I’d share it and see if any of them are useful to you.
At this point I need to throw in a big disclaimer: My implementation does not actually interop with mio, tokio or the standard Future/Stream traits. This wasn’t really important for the learning experience.
Interesting Idea #1 - Separate traits for a taxonomy of stream types
I provide 4 separate types of streams, based on what they are allowed to do. I think that having this distinction in the type system is useful. Maybe the usefulness even outweighs the overhead of the complexity… not sure yet.
The 4 types of streams are:
- VoidStream: This is a bit of an odd one. A stream which must never yield an item and must never complete. Each time it’s polled it runs until it becomes ‘not ready’. It could be useful to implement a background process.
- UnitStream<C>: This is your standard ‘Future’. A stream which may eventually complete with a single terminal value.
- InfiniteStream<Y>: A stream which may continue to yield non-terminal values but must never complete.
- FiniteStream<Y, C>: This is the closest one to your standard ‘Stream’. This stream may yield multiple non-terminal values and then may at some point complete with a value. Importantly the type of the completion value may be different to the type of the items yielded by the stream.
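To make the taxonomy concrete, here is a minimal sketch of how the four traits could be laid out. Only the `FiniteStreamPoll` variants (`NotReady`, `Yielded`, `Complete`) appear in the example further down; the other three poll enums, the `poll` signatures and the `Counter` type are assumptions for illustration and may not match the playground code.

```rust
// Each stream kind gets its own poll result, so the type system rules out
// the outcomes that kind is not allowed to produce.
// Only FiniteStreamPoll's variants are taken from the post; the rest are assumed.
#[derive(Debug, PartialEq)]
pub enum VoidStreamPoll { NotReady }

#[derive(Debug, PartialEq)]
pub enum UnitStreamPoll<C> { NotReady, Complete(C) }

#[derive(Debug, PartialEq)]
pub enum InfiniteStreamPoll<Y> { NotReady, Yielded(Y) }

#[derive(Debug, PartialEq)]
pub enum FiniteStreamPoll<Y, C> { NotReady, Yielded(Y), Complete(C) }

// Never yields, never completes: can only report "not ready".
pub trait VoidStream { fn poll(&mut self) -> VoidStreamPoll; }

// The 'Future' shape: at most one terminal value.
pub trait UnitStream<C> { fn poll(&mut self) -> UnitStreamPoll<C>; }

// Yields items forever, never completes.
pub trait InfiniteStream<Y> { fn poll(&mut self) -> InfiniteStreamPoll<Y>; }

// The 'Stream' shape: items of type Y, then a completion value of type C.
pub trait FiniteStream<Y, C> { fn poll(&mut self) -> FiniteStreamPoll<Y, C>; }

// Hypothetical hand-written InfiniteStream: a counter that is always ready.
pub struct Counter { pub next: u32 }

impl InfiniteStream<u32> for Counter {
    fn poll(&mut self) -> InfiniteStreamPoll<u32> {
        let value = self.next;
        self.next += 1;
        InfiniteStreamPoll::Yielded(value)
    }
}
```

With this layout, a `match` on the result of polling a `Counter` has no `Complete` arm to handle, which is the kind of guarantee the separate traits are meant to buy.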
Interesting Idea #2 - await_item vs await_complete
Sometimes you want to wait for a terminal value, e.g. a UnitStream (Future), or the last value of a FiniteStream. Other times you just want to wait for the next non-terminal value in a FiniteStream or InfiniteStream. So I provide 2 separate macros for these different operations. I don’t believe the futures-await crate yet has the equivalent for the latter.
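The difference between the two operations can be sketched with plain functions. This is not how the macros are implemented: a real `await_item!` or `await_complete!` has to suspend the enclosing stream on "not ready", whereas these hypothetical helpers (`next_item`, `run_to_completion`) simply poll again. The sketch also assumes that waiting for an item on a finite stream gives a `Result` whose error is the completion value, and that waiting for completion discards any items along the way. `Scripted` is a made-up test stream.

```rust
use std::collections::VecDeque;

#[derive(Debug, PartialEq)]
pub enum FiniteStreamPoll<Y, C> { NotReady, Yielded(Y), Complete(C) }

pub trait FiniteStream<Y, C> { fn poll(&mut self) -> FiniteStreamPoll<Y, C>; }

/// Stand-in for await_item!: the next item, or Err(completion value)
/// if the stream finishes before yielding one (an assumption).
pub fn next_item<Y, C, S: FiniteStream<Y, C>>(stream: &mut S) -> Result<Y, C> {
    loop {
        match stream.poll() {
            // A real await would suspend here instead of spinning.
            FiniteStreamPoll::NotReady => continue,
            FiniteStreamPoll::Yielded(item) => return Ok(item),
            FiniteStreamPoll::Complete(value) => return Err(value),
        }
    }
}

/// Stand-in for await_complete!: skip items until the terminal value
/// (discarding items is an assumption).
pub fn run_to_completion<Y, C, S: FiniteStream<Y, C>>(stream: &mut S) -> C {
    loop {
        if let FiniteStreamPoll::Complete(value) = stream.poll() {
            return value;
        }
    }
}

/// Hypothetical stream that replays a fixed script, then completes.
pub struct Scripted { pub steps: VecDeque<FiniteStreamPoll<u8, String>> }

impl FiniteStream<u8, String> for Scripted {
    fn poll(&mut self) -> FiniteStreamPoll<u8, String> {
        self.steps
            .pop_front()
            .unwrap_or_else(|| FiniteStreamPoll::Complete(String::from("end of stream")))
    }
}
```

Calling `next_item` on a script of `NotReady, Yielded(3), ...` returns `Ok(3)` and leaves the rest of the script untouched, while `run_to_completion` drains whatever remains and returns the completion string.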
A motivating example usage
println!("### Testing a protocol decoder");

let mock_input = finite_stream!({
    not_ready!();
    yield_item!(3);
    not_ready!();
    yield_item!(21);
    yield_item!(22);
    not_ready!();
    not_ready!();
    yield_item!(23);
    not_ready!();
    not_ready!();
    yield_item!(0);
    yield_item!(0);
    not_ready!();
    yield_item!(1);
    not_ready!();
    yield_item!(5);
    yield_item!(5);
    yield_item!(5);
    String::from("end of stream")
});

fn decode<S: FiniteStream<u8, String>>(mut input: S) -> impl FiniteStream<Vec<u8>, Result<(), String>> {
    finite_stream!({
        loop {
            // Each message is a length byte followed by that many payload bytes
            let length = await_item!(input)? as usize;
            let mut message = Vec::with_capacity(length);
            for _ in 0..length {
                let byte = await_item!(input)?;
                message.push(byte);
            }
            yield_item!(message);
        }
    })
}

let mut decoder = decode(mock_input);

// NOTE this loop is just to print out the results
// usually a global event loop would be used for your application
loop {
    let p = decoder.poll();
    match p {
        FiniteStreamPoll::NotReady => {
            println!("not ready");
        }
        FiniteStreamPoll::Yielded(x) => {
            println!("yielded: {:?}", x);
        }
        FiniteStreamPoll::Complete(x) => {
            println!("complete: {:?}", x);
            break;
        }
    }
}
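For comparison, here is roughly what the same length-prefixed decoder looks like written by hand as a state machine, without any macros. This is a sketch of the idea only, not the actual expansion of `finite_stream!`; the `Decoder` and `Scripted` types and their fields are hypothetical, and the trait and enum are redeclared so the snippet stands alone.

```rust
use std::collections::VecDeque;

#[derive(Debug, PartialEq)]
pub enum FiniteStreamPoll<Y, C> { NotReady, Yielded(Y), Complete(C) }

pub trait FiniteStream<Y, C> { fn poll(&mut self) -> FiniteStreamPoll<Y, C>; }

/// Hand-rolled decoder (hypothetical). `partial` is None while waiting for a
/// length byte, and Some((length, body so far)) while reading a message body.
pub struct Decoder<S> {
    input: S,
    partial: Option<(usize, Vec<u8>)>,
}

impl<S> Decoder<S> {
    pub fn new(input: S) -> Self {
        Decoder { input, partial: None }
    }
}

impl<S: FiniteStream<u8, String>> FiniteStream<Vec<u8>, Result<(), String>> for Decoder<S> {
    fn poll(&mut self) -> FiniteStreamPoll<Vec<u8>, Result<(), String>> {
        loop {
            // Emit the message once its body is full (covers length 0 too).
            let full = matches!(&self.partial, Some((length, body)) if body.len() == *length);
            if full {
                if let Some((_, body)) = self.partial.take() {
                    return FiniteStreamPoll::Yielded(body);
                }
            }
            match self.input.poll() {
                // State is kept in `self`, so returning here loses nothing.
                FiniteStreamPoll::NotReady => return FiniteStreamPoll::NotReady,
                // Input ended: pass its completion value on as an error,
                // like the `?` in the macro version.
                FiniteStreamPoll::Complete(reason) => {
                    return FiniteStreamPoll::Complete(Err(reason));
                }
                FiniteStreamPoll::Yielded(byte) => {
                    if let Some((_, body)) = self.partial.as_mut() {
                        body.push(byte);
                    } else {
                        let length = byte as usize;
                        self.partial = Some((length, Vec::with_capacity(length)));
                    }
                }
            }
        }
    }
}

/// Hypothetical input that replays a fixed script, then completes.
pub struct Scripted { pub steps: VecDeque<FiniteStreamPoll<u8, String>> }

impl FiniteStream<u8, String> for Scripted {
    fn poll(&mut self) -> FiniteStreamPoll<u8, String> {
        self.steps
            .pop_front()
            .unwrap_or_else(|| FiniteStreamPoll::Complete(String::from("end of stream")))
    }
}
```

The bookkeeping in `partial` is exactly what the macro version gets for free from local variables surviving across `await_item!`, which is the main argument for the async/await style in protocol decoders.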
Enough talk
The full code with plenty of comments and example usage can be found here:
https://play.rust-lang.org/?gist=ba2affe775d0adc4e50c962fe7c1c740&version=nightly