Help test async/await/generators/coroutines!

I think using generators to implement iterators will be a huge benefit. For example, this playground shows an iterator that is very simple to implement with a generator, but otherwise somewhat complex and/or unsafe.

The eRFC discusses using wrapper types to convert generators to iterators. (That’s also the approach used in my playground above.) It mentions that wrapper types may be necessary because a generic impl<G: Generator> Iterator for G could cause coherence errors. But as an alternative to wrapper types, would it be possible for the compiler to automatically generate both Iterator and Generator implementations for each generator literal?
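To make the wrapper-type idea concrete, here is a minimal sketch of the pattern on today's stable Rust, using a `FnMut() -> Option<T>` closure as a stand-in for a generator (a real wrapper would resume a `Generator` and map `Yielded`/`Complete` onto `Some`/`None`; the names `GenIter` and `countdown` are invented for illustration):

```rust
// Wrapper type that turns a resumable state machine into an Iterator.
// Here the "generator" is approximated by a closure returning Option<T>.
struct GenIter<F>(F);

impl<T, F: FnMut() -> Option<T>> Iterator for GenIter<F> {
    type Item = T;
    fn next(&mut self) -> Option<T> {
        // Each call to next() "resumes" the state machine once.
        (self.0)()
    }
}

// An iterator yielding n, n-1, ..., 1, with all state captured
// in the closure's environment rather than a hand-written struct.
fn countdown(mut n: u32) -> impl Iterator<Item = u32> {
    GenIter(move || {
        if n == 0 {
            None
        } else {
            n -= 1;
            Some(n + 1)
        }
    })
}

fn main() {
    let v: Vec<u32> = countdown(3).collect();
    println!("{:?}", v); // [3, 2, 1]
}
```

The coherence concern in the quote is exactly why the blanket `impl<G: Generator> Iterator for G` is avoided: a newtype like `GenIter` sidesteps overlap with existing `Iterator` impls.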


One could likewise hope for impl<I: Iterator> Generator for I – could specialization allow us to implement a mutual relationship like this?

From my preliminary tests I’ve seen that a generator-converted-to-iterator in some cases is much faster (and leads to cleaner code) than Iterator::scan().

But I think performance of such iterators-from-generators still needs to be assessed.
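For comparison, here is what a simple stateful iterator looks like with `Iterator::scan`; the explicit threading of state through `acc` is the bookkeeping a generator would replace with a local variable and `yield` (the `prefix_sums` name is invented for illustration):

```rust
// Running sums via Iterator::scan: the accumulator must be threaded
// explicitly through the closure as &mut state on every step.
fn prefix_sums<I: Iterator<Item = u64>>(iter: I) -> impl Iterator<Item = u64> {
    iter.scan(0u64, |acc, x| {
        *acc += x;
        Some(*acc)
    })
}

fn main() {
    let sums: Vec<u64> = prefix_sums(vec![1, 2, 3, 4].into_iter()).collect();
    println!("{:?}", sums); // [1, 3, 6, 10]
}
```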

Can you use that on this function too?

// pairwise(1 2 3) => (1, 1) (1, 2) (1, 3) (2, 2) (2, 3) (3, 3)
fn pairwise<'a, T>(items: &'a [T]) -> impl Iterator<Item = (&'a T, &'a T)> + 'a {
    items
        .iter()
        .enumerate()
        .flat_map(move |(i, x1)| items[i..].iter().map(move |x2| (x1, x2)))
}
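For reference, a self-contained copy of the function above, with a quick check that it produces exactly the pairs listed in the comment:

```rust
// All ordered pairs (x1, x2) with x2 at or after x1 in the slice.
fn pairwise<'a, T>(items: &'a [T]) -> impl Iterator<Item = (&'a T, &'a T)> + 'a {
    items
        .iter()
        .enumerate()
        .flat_map(move |(i, x1)| items[i..].iter().map(move |x2| (x1, x2)))
}

fn main() {
    let v = [1, 2, 3];
    let pairs: Vec<(i32, i32)> = pairwise(&v).map(|(a, b)| (*a, *b)).collect();
    println!("{:?}", pairs);
    // [(1, 1), (1, 2), (1, 3), (2, 2), (2, 3), (3, 3)]
}
```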

I think having some mechanism for using generators as iterators and vice versa is a must-have in the long run. If people can’t transparently use a generator to implement basic iterators like they would in Python, they probably won’t use the feature to its full potential.

Being able to do away with all the superfluous iterator types (like std::slice::Iter), replacing them with generators and conservative_impl_trait, would also eliminate a lot of tedious boilerplate code.
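A small sketch of that idea with conservative_impl_trait as it exists today: the concrete adapter type stays hidden behind `impl Iterator`, so no named iterator type ever needs to be exported (the `evens` function is invented for illustration):

```rust
// The return type is some unnamed Filter<Range<u32>, ...> adapter,
// but callers only ever see `impl Iterator<Item = u32>`.
fn evens(limit: u32) -> impl Iterator<Item = u32> {
    (0..limit).filter(|n| n % 2 == 0)
}

fn main() {
    let v: Vec<u32> = evens(7).collect();
    println!("{:?}", v); // [0, 2, 4, 6]
}
```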


Probably can’t get rid of std::slice::Iter et al., because that would break any code that already uses those types.


Hi. I have an async thread that receives requests from the main thread and executes them (such as sending packets over the network).

#[async]
fn command_read(commands_rx: Receiver<NetworkRequest>, mut sender: SplitSink<Framed<TcpStream, MqttCodec>>) -> io::Result<()> {

    // check question 1 below
    let commands_rx = commands_rx.or_else(|_| {
        Err(io::Error::new(ErrorKind::Other, "Rx Error"))
    });

    
    #[async]
    for command in commands_rx {
        println!("command = {:?}", command);
        let packet = match command {
            NetworkRequest::Publish(publish) => {
                Packet::Publish(publish)
            }
            NetworkRequest::Ping => {
                packet::gen_pingreq_packet()
            }
            NetworkRequest::Drain => {
                // break and drain the channel
                break;
            }
            _ => unimplemented!()
        };

        sender = await!(sender.send(packet))?
    }

    // check question 2
    // code to drain the channel and save it

    Ok(())
}

I have a few questions w.r.t. the above code:

  1. Given that the idea of this crate is to minimize the use of combinators, is there any other way to make the commands_rx stream return IO errors without using the or_else combinator?

  2. Is there a way to drain the channel, given that the channel receiver commands_rx is moved into the async for loop?

Thanks for testing this all out!

A good question! Right now the #[async] for loop implementation sort of necessitates the use of combinators here because it doesn't allow customizing what happens on an error. That said, I wouldn't necessarily say the goal of async/await is to minimize the use of combinators, but rather to maximize the readability of code. In this case the or_else (for which I might recommend map_err instead) doesn't really degrade readability. It's sort of like how match isn't advocated over Option::and_then; it's just whatever's most ergonomic at the call site.

I'll also note that for Receiver specifically the error can't ever happen. Right now the error type is () and we'd like to change this so you basically don't have to worry about the error, which would ideally obviate the need for the or_else.

Hm, I'm not quite sure what you mean by this? The for loop there will already drain the commands_rx stream, so what else do you need from it? Otherwise you may be able to use Stream::by_ref?

Thanks @alexcrichton for the quick reply. Sorry for not being very clear in my 2nd question.

What I want to do is: when my Receiver receives a Drain request, it should break out of the loop and close the stream so that the Sender won’t be able to put any new requests into the channel. After closing, I’ll iterate over the stream again, read all the buffered messages, and exit.

let commands_rx = commands_rx.or_else(|_| {
    Err(io::Error::new(ErrorKind::Other, "Rx Error"))
});

#[async]
for command in commands_rx {
    match command {
        NetworkRequest::Drain => break,
        _ => unimplemented!(),
    }
}

commands_rx.close();

// tx is not allowed to send new requests anymore;
// drain all the buffered requests and exit
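As an analogy with std's synchronous channel: once the sending side is gone, no new messages can arrive, but the receiver can still drain whatever is buffered, which is the same shape the close-then-drain sketch aims for (std::sync::mpsc only; `drain_after_close` is an invented name):

```rust
use std::sync::mpsc::channel;

// Send a few messages, "close" the sending side, then drain what remains.
fn drain_after_close() -> Vec<i32> {
    let (tx, rx) = channel();
    for i in 0..3 {
        tx.send(i).unwrap();
    }
    drop(tx); // no further sends are possible; buffered messages survive

    // Iteration ends cleanly once the buffer is empty and tx is gone.
    rx.iter().collect()
}

fn main() {
    println!("{:?}", drain_after_close()); // [0, 1, 2]
}
```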

I’m facing 2 problems

P1. Unable to async-iterate over a stream reference (I think this isn’t supported yet?)

let commands_rx  = commands_rx.by_ref();
#[async]
for command in commands_rx {
    match command {
        NetworkRequest::Drain => break,
        _ => unimplemented!(),
    }
} 

Error

borrow may still be in use when generator yields
   --> src/client/connection.rs:124:32
    |
124 |             let commands_rx  = commands_rx.by_ref();        

P2. Getting the Receiver back from OrElse<Receiver<...>> to be able to call close

Is there a way to get the Receiver stream back from the combinator?

Ah right, yeah, excellent point! I don’t think by_ref will work here because of lifetimes right now.

So in thinking about what you’d do with iterators (if you weren’t allowed to use by_ref) I think you’d end up doing something like:

while let Some(a) = iter.next() {
    // maybe `break`
}

// continue using `iter`
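That plain-iterator pattern does work today via `Iterator::by_ref`, which is what makes the stream situation awkward by comparison; a minimal sketch (`split_at_marker` is an invented name):

```rust
// Consume a source iterator until a marker value, then keep using the
// same iterator afterwards - the pattern by_ref enables for iterators.
fn split_at_marker(data: &[u32], marker: u32) -> (Vec<u32>, Vec<u32>) {
    let mut iter = data.iter().cloned();
    let mut before = Vec::new();

    // by_ref() lets the for loop borrow the iterator instead of moving it.
    for x in iter.by_ref() {
        if x == marker {
            break;
        }
        before.push(x);
    }

    // `iter` is still usable here; collect whatever follows the marker.
    let after: Vec<u32> = iter.collect();
    (before, after)
}

fn main() {
    let (before, after) = split_at_marker(&[1, 2, 3, 4, 5], 3);
    println!("{:?} {:?}", before, after); // [1, 2] [4, 5]
}
```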

In that sense I think you may not be able to use #[async] for loops here. You could try using Stream::into_future, though that may not be the most ergonomic:

#[async]
fn foo(mut stream: Box<Stream<Item = u32, Error = i32>>) -> Result<(), i32> {
    loop {
        let item = match await!(stream.into_future().map_err(|e| e.0))? {
            (Some(item), s) => {
                stream = s;
                item
            }
            (None, s) => {
                stream = s;
                break
            }
        };

        if item == 3 {
            break
        }
    }


    // continue using `stream`
    drop(stream);
    Ok(())
}

@alexcrichton Awesome, this works! Thank you.

Coming to ergonomics, are there plans to make it possible to keep using a stream after the for loop (i.e. not consume the stream)?

#[async]
for command in commands_rx.iter() {

}

// use `commands_rx` here again

I’m facing a Broken MIR error.

error: internal compiler error: src/librustc_mir/transform/generator.rs:340: Broken MIR: generator contains type mqtt3::Publish in MIR, but typeck only knows about (std::result::Result<futures::stream::OrElse<futures::sync::mpsc::Receiver<client::connection::Request>

Is this a known bug? I’m using Rc<RefCell> inside the async_block (not sure if this is the issue).

I think this is the problematic part of the code inside async_block

// execute user requests
loop {
    let command = match await!(commands_rx.into_future().map_err(|e| e.0))? {
        (Some(item), s) => {
            commands_rx = s;
            item
        }
        (None, s) => {
            commands_rx = s;
            break
        }
    };

    info!("command = {:?}", command);

    let packet = match command {
        Request::Publish(publish) => mqtt_state.borrow_mut().handle_outgoing_publish(publish),
        Request::Ping => Packet::Pingreq,
        Request::Reconnect => break,
        _ => unimplemented!()
    };

    sender = await!(sender.send(packet))?
}

Perhaps related to https://github.com/rust-lang/rust/issues/44184?

Oh sorry meant to respond to this, but I’ve opened https://github.com/alexcrichton/futures-await/issues/17 to track this.

Has there been any discussion of allowing resume to take an argument, whose value is the result of the yield expression? (See https://docs.python.org/3/reference/expressions.html#generator.send for example.) I don’t think this needs to be in an initial implementation, but if any decisions now would impede doing that later I’d like there to be some thought put into it.


Another option would be to take “resume” arguments where a closure’s normal arguments would go, e.g. |a, b| { ... yield ... }, and have them update after each yield. The existing restrictions on borrowing across yield points make this safe, although I could see it being too implicit?

Skimming the merged eRFC and discussion thread I don't see any/much discussion of this, presumably because it's not important for the #[async] -> impl Future case that motivated this particular eRFC. There has been discussion of this in the past on the more fully-fledged stackless coroutine RFC and Post-RFC threads that preceded the current implementation.

I'm actually really curious whether it would be possible to use this to implement #[async] -> impl Sink and even #[async] -> impl Stream + Sink variants of the #[async] macro; these would be really useful to me for implementing async IO adapters.


Arguments to generators were in the initial implementation but ended up getting removed to make the implementation easier to review and also as the most conservative route. It’s definitely something we’d like to support!

