Help test async/await/generators/coroutines!

<good-part>

This is really awesome. I wasn’t expecting this to be implemented for another year or so :smile: Hopefully this will make Rust/Tokio much more feasible for real-life production usage.

My favorite recursive Fibonacci sequence example works fine (yay!).

</good-part>

<bad-part>

But there seems to be some problem with generator object size calculation when the generator invokes itself. Naturally, a generator cannot store a copy of itself on the stack across yield points (that would make its size infinite), but it should work fine to store itself unboxed between yield points (this works) and to store itself boxed across yield points (this causes rustc to overflow its stack).

And when I try to cast the boxed-self to a boxed trait object, rustc reports a weird lifetime error.

Playground link

</bad-part>


But there seems to be some problem with generator object size calculation when the generator invokes itself.

Sounds related to https://github.com/rust-lang/rust/issues/35706

Oh dear sounds bad! Do you know if there’s a bug filed for this against rust-lang/rust?

Thanks for the report! I’ll make sure we track the bug @dwrensha cc’d

AFAIK #43787 is suspected, but I haven’t profiled to determine the cause.

An interesting thing about this is that the code compiles in 20 seconds for the lib, but insanely long for any binary that uses it (tests, CLI bin).

Probably https://github.com/rust-lang/rust/issues/38528.

I think using generators to implement iterators will be a huge benefit. For example this playground shows an example of an iterator that is very simple to implement with a generator, but otherwise somewhat complex and/or unsafe.

The eRFC discusses using wrapper types to convert generators to iterators. (That’s also the approach used in my playground above.) It mentions that wrapper types may be necessary because a generic impl<G: Generator> Iterator for G could cause coherence errors. But as an alternative to wrapper types, would it be possible for the compiler to automatically generate both Iterator and Generator implementations for each generator literal?


One could likewise hope for impl<I: Iterator> Generator for I – could specialization allow us to implement a mutual relationship like this?

From my preliminary tests I’ve seen that a generator-converted-to-iterator in some cases is much faster (and leads to cleaner code) than Iterator::scan().

But I think performance of such iterators-from-generators still needs to be assessed.

Can you use that on this function too?

// pairwise(1 2 3) => (1, 1) (1, 2) (1, 3) (2, 2) (2, 3) (3, 3)
fn pairwise<'a, T>(items: &'a [T]) -> impl Iterator<Item=(&'a T, &'a T)> + 'a {
    items
    .iter()
    .enumerate()
    .flat_map(move |(i, x1)| items[i ..]
                             .iter()
                             .map(move |x2| (x1, x2)))
}

I think having some mechanism for using generators as iterators and vice versa is a must-have in the long run. If people can’t transparently use a generator to implement basic iterators like they would in Python, they probably won’t use them to their full potential.

Being able to do away with all the superfluous iterator types (like std::slice::Iter) and replace them with generators and conservative_impl_trait would also eliminate a lot of tedious boilerplate code.


Probably can’t get rid of std::slice::Iter et al, because that would break any code that already uses those types.


Hi. I have an async thread which receives requests from the main thread and executes them (such as sending packets over the network).

#[async]
fn command_read(commands_rx: Receiver<NetworkRequest>, mut sender: SplitSink<Framed<TcpStream, MqttCodec>>) -> io::Result<()> {

    // check question 1 below
    let commands_rx = commands_rx.or_else(|_| {
            Err(io::Error::new(ErrorKind::Other, "Rx Error"))
    });

    
    #[async]
    for command in commands_rx {
        println!("command = {:?}", command);
        let packet = match command {
            NetworkRequest::Publish(publish) => {
                Packet::Publish(publish)
            }
            NetworkRequest::Ping => {
                packet::gen_pingreq_packet()
            }
            NetworkRequest::Drain => {
                // break and drain the channel
                break
            }
            _ => unimplemented!()
        };

        sender = await!(sender.send(packet))?
    }

    // check question 2
    // code to drain the channel and save it

    Ok(())
}

I have a few questions w.r.t. the above code:

  1. Given that the idea of this crate is to minimize the use of combinators, is there any other way to make the commands_rx stream return IO errors without using the or_else combinator?

  2. Is there a way to drain the channel, given that the channel receiver commands_rx is moved into the async for loop?

Thanks for testing this all out!

A good question! Right now the #[async] for loop implementation more or less necessitates the use of combinators here because it doesn’t allow customizing what happens on an error. Despite that, I wouldn’t necessarily say that the goal of async/await is to minimize the use of combinators, but rather to maximize the readability of code. In this case the or_else (for which I might recommend map_err instead) doesn’t really degrade readability. It’s sort of like how match isn’t advocated over Option::and_then; it’s just whatever’s most ergonomic at the call site.

I’ll also note that for Receiver specifically the error can’t ever happen. Right now the error type is () and we’d like to change this so you basically don’t have to worry about the error, which would ideally obviate the need for the or_else.

Hm, I’m not quite sure what you mean by this? The for loop there will already drain the commands_rx stream, so what else do you need from it? Otherwise you may be able to use Stream::by_ref, perhaps?

Thanks @alexcrichton for the quick reply. Sorry for not being very clear in my 2nd question.

What I want to do is: when my Receiver receives a Drain request, break out of the loop and close the stream so that the Sender won’t be able to put any new requests into the channel. After closing, I’ll iterate over the stream again, read all the buffered messages, and exit.

let commands_rx = commands_rx.or_else(|_| {
    Err(io::Error::new(ErrorKind::Other, "Rx Error"))
});

#[async]
for command in commands_rx {
    match command {
        NetworkRequest::Drain => break,
        _ => unimplemented!(),
    }
}

commands_rx.close();

// tx not allowed send new requests anymore. drain
// all the buffered requests and exit

I’m facing two problems:

P1. Unable to async iterate over a stream reference (I think this isn’t supported yet?)

let commands_rx  = commands_rx.by_ref();
#[async]
for command in commands_rx {
    match command {
        NetworkRequest::Drain => break,
        _ => unimplemented!(),
    }
} 

Error

borrow may still be in use when generator yields
   --> src/client/connection.rs:124:32
    |
124 |             let commands_rx  = commands_rx.by_ref();        

P2. Getting the Receiver back from OrElse<Receiver<>> to be able to call close

Is there a way to get the Receiver stream back from the combinator?

Ah right yeah excellent point! I don’t think by_ref will work here b/c of lifetimes right now.

So in thinking about what you’d do with iterators (if you weren’t allowed to use by_ref) I think you’d end up doing something like:

while let Some(a) = iter.next() {
    // maybe `break`
}

// continue using `iter`

In that sense I think you may not be able to use #[async] for loops here. You could try using Stream::into_future, but that may not be the most ergonomic:

#[async]
fn foo(mut stream: Box<Stream<Item = u32, Error = i32>>) -> Result<(), i32> {
    loop {
        let item = match await!(stream.into_future().map_err(|e| e.0))? {
            (Some(item), s) => {
                stream = s;
                item
            }
            (None, s) => {
                stream = s;
                break
            }
        };

        if item == 3 {
            break
        }
    }


    // continue using `stream`
    drop(stream);
    Ok(())
}

@alexcrichton Awesome. This works :slight_smile:. Thank you

Coming to ergonomics, do we have plans to make it possible to keep using a stream after iterating over it (i.e., not consume the stream)?

#[async]
for command in commands_rx.iter() {

}

// use `commands_rx` here again

I’m facing a Broken MIR error.

error: internal compiler error: src/librustc_mir/transform/generator.rs:340: Broken MIR: generator contains type mqtt3::Publish in MIR, but typeck only knows about (std::result::Result<futures::stream::OrElse<futures::sync::mpsc::Receiver<client::connection::Request>

Is this a known bug? I’m using Rc<RefCell> inside async_block (not sure if this is the issue).

I think this is the problematic part of the code inside async_block

            // execute user requests
            loop {
                let command = match await!(commands_rx.into_future().map_err(|e| e.0))? {
                    (Some(item), s) => {
                        commands_rx = s;
                        item
                    }
                    (None, s) => {
                        commands_rx = s;
                        break

                    }
                };

                info!("command = {:?}", command);

                let packet = match command {
                    Request::Publish(publish) => mqtt_state.borrow_mut().handle_outgoing_publish(publish),
                    Request::Ping => Packet::Pingreq,
                    Request::Reconnect => break,
                    _ => unimplemented!()
                };

                sender = await!(sender.send(packet))?
            }

Perhaps related to https://github.com/rust-lang/rust/issues/44184?

Oh sorry meant to respond to this, but I’ve opened https://github.com/alexcrichton/futures-await/issues/17 to track this.