So I implemented my own async/await, just for fun

Hi folks,

I’m one of those people that can never properly grok something until I go and implement it myself. I’ve spent quite a bit of my free time over the last few weeks trying to grok the whole asynchronous IO situation. One part that’s been pretty rough going is trying to work out how to actually use the experimental async/await from @alexcrichton effectively with tokio to decode protocols. So, naturally I decided to implement async/await myself to see if that would help me understand what’s going on.

To my surprise, the result seems to actually work! And it has some interesting ideas, so I thought I’d share it and see if any of them are useful to you.

At this point I need to throw in a big disclaimer: My implementation does not actually interop with mio, tokio or the standard Future/Stream traits. This wasn’t really important for the learning experience.

Interesting Idea #1 - Separate traits for a taxonomy of stream types

I provide 4 separate types of streams, based on what they are allowed to do. I think that having this distinction in the type system is useful. Maybe the usefulness even outweighs the overhead of the complexity… not sure yet.

The 4 types of streams are:

  • VoidStream - This is a bit of an odd one. A stream which must never yield an item and must never complete. Each time it’s polled it runs until it becomes ‘not ready’. It could be useful to implement a background process.
  • UnitStream<C> - This is your standard ‘Future’. A stream which may eventually complete with a single terminal value.
  • InfiniteStream<Y> - A stream which may continue to yield non-terminal values but must never complete.
  • FiniteStream<Y, C> - This is the closest one to your standard ‘Stream’. This stream may yield multiple non-terminal values and then may at some point complete with a value. Importantly the type of the completion value may be different to the type of the items yielded by the stream.

Interesting Idea #2 - await_item vs await_complete

Sometimes you want to wait for a terminal value, e.g. a UnitStream (Future), or the last value of a FiniteStream. Other times you just want to wait for the next non-terminal value in a FiniteStream or InfiniteStream. So I provide 2 separate marcos for these different operations. I don’t believe the futures-await crate yet has the equivalent for the latter.

A motivating example usage

    println!("### Testing a protocol decoder");
    let mock_input = finite_stream!({
        not_ready!();
        yield_item!(3);
        not_ready!();
        yield_item!(21);
        yield_item!(22);
        not_ready!();
        not_ready!();
        yield_item!(23);
        not_ready!();
        not_ready!();
        yield_item!(0);
        yield_item!(0);
        not_ready!();
        yield_item!(1);
        not_ready!();
        yield_item!(5);
        yield_item!(5);
        yield_item!(5);
        String::from("end of stream")
    });

    fn decode<S: FiniteStream<u8, String>>(mut input: S) -> impl FiniteStream<Vec<u8>, Result<(), String>> {
        finite_stream!({
            loop {
                let length = await_item!(input)? as usize;
                let mut message = Vec::with_capacity(length);
                for _ in 0..length {
                    let byte = await_item!(input)?;
                    message.push(byte);
                }
                yield_item!(message);
            }
        })
    }

    let mut decoder = decode(mock_input);

    // NOTE these loops are just to print out the results
    // usually a global event loop would be use for your application
    loop {
        let p = decoder.poll();
        match p {
            FiniteStreamPoll::NotReady => {
                println!("not ready");
            }
            FiniteStreamPoll::Yielded(x) => {
                println!("yielded: {:?}", x);
            }
            FiniteStreamPoll::Complete(x) => {
                println!("complete: {:?}", x);
                break;
            }
        }
    }

Enough talk

The full code with plenty of comments and example usage can be found here:

https://play.rust-lang.org/?gist=ba2affe775d0adc4e50c962fe7c1c740&version=nightly

4 Likes

Just an observation but it almost feels like your 4 types of streams are variants of the generic “Generator” trait, just specialised to use different types. If we include the never type (!) to indicate something is impossible then you effectively have:

  • VoidStream = Generator<Yield=(), Return=!>
  • UnitStream<T> = Generator<Yield=!, Return=T>
  • InfiniteStream<Y> = Generator<Yield=Y, Return=!>
  • FiniteStream<Y, C> = Generator<Yield=Y, Return=C>

… Or am I just seeing patterns where they don’t exist?

1 Like

@Michael-F-Bryan those are definitely not imaginary patterns, there are similar exact correspondences between Generator and futures::{Future,Stream} as well. That’s how futures_await is actually implemented under the surface:

  • Future<Item=T, Error=E> = Generator<Yield=Async<!>, Return=Result<T, E>>
  • Stream<Item=T, Error=E> = Generator<Yield=Async<T>, Return=Result<(), E>>

(Note here one thing your correpondences are missing, Yield must be able to encode whether it’s yielding a value, or whether it’s yielding because it’s not ready to yield/return a value yet, that’s done via enum Async<T> { Ready(T), NotReady } in futures)

(and if generators took arguments, potentially something like Sink<SinkItem=T, SinkError=E> = Generator<Argument=Option<T>, Yield=AsyncSink<T>, Return=Result<(), E>>)

1 Like

Indeed, in fact that’s how they’re implemented (modulo the details of ‘not ready’, as @Nemo157 has pointed out) . You can see this in lines 432 488 of the source linked in the OP.

  • VoidStream = Generator<Yield=(), Return=Void>
  • UnitStream<C> = Generator<Yield=(), Return=C>
  • InfiniteStream<Y> = Generator<Yield=Option<Y>, Return=Void>
  • FiniteStream<Y, C> = Generator<Yield=Option<Y>, Return=C>

Note that I use () rather than Async<Void> and Option<Y> rather than Async<Y>, but these are basically equivalent.

What I did not know until I did some more research (prompted by your comment) is that the never_type feature is actually a thing. As you probably already know, it even allows you to have non-exhaustive patterns where the patterns involving ! can be omitted. For example:

#![feature(never_type)]

fn main() {
    let x: Option<!> = None;
    match x {
        None => println!("hello"),
    }
}

I remember testing this with my Void type and being disappointed that the compiler did not allow me to omit the cases with Void.

This is a bit of a game changer for me, and reduces the need for the different types significantly. I’m going to play around with it and see how it goes.

=== UPDATE ===

So I’m just about done refactoring due to the introduction of never_type. The result is so much cleaner! Deleting large sections of the code was very satisfying. Turns out you do only need to distinguish between future and stream to keep the compiler happy, the other types ‘just work’ without having to specify them. I’m also really glad to have only 1 Poll type and only 1 Stream trait now.

The only addition is that there are now 3 variations of await (before there were only 2):

  • await_item!(Stream<Y, !>) -> Y
  • await_complete!(Stream<Y, C>) -> C
  • await_either!(Stream<Y, C>) -> Result<Y, C>

I think using Result here is actually not a good choice. But I’m out of time to work on this for now. Will look at it again later on.

https://play.rust-lang.org/?gist=6ff453884fa4357e857dab1ee03bf38f&version=nightly

Interesting Idea #3

The futures-await readme suggests that async functions cannot take references as parameters due to borrowing rules. Yet, with my implementation this seems to compile and run as expected. Maybe I’m missing the point? Maybe this is unsafe?

    fn foo<'a>(s: &'a str) -> impl Stream<!, &'a str> + 'a {
        future!({
            not_ready!();
            println!("{}", s);
            not_ready!();
            s
        })
    }

    let some_str = String::from(">>> testing 123 <<<");
    let bar = foo(&some_str);
    poll_stream_to_completion(bar);

I also played a bit with futures today. Here is an Future trait and future combinators which accept immovable types using immovable generators.

It is a good idea to use as few result/poll types as possible. My code reuses the GeneratorState type which avoids the ugly translation between futures and generators, although you do need something more complex if you also want to support streams.. Using Result is completely unnecessary and just result in more complex code.

Only references to values stored in the generator are unsafe, as these would be invalidated if the generator moved. So your example is fine.

1 Like

This topic was automatically closed 90 days after the last reply. New replies are no longer allowed.