First experience and thoughts about `Pin` API


#21

While implementing IntoIterator for Generator will be a path of least friction, it will discard return value, which I believe is a really bad design decision. Viewing Iterator<T> as Generator<T, ()> is very natural, and doing it the other way around feels simply wrong. Also the proposal quite neatly solves question of for loop return value and issues around Iterator<Result<T, E>>, which can be replaced with Generator<T, Result<(), E>>.


#22

I don’t know how you expect to receive the return value from a generator that is being used in a for loop, it must be consumed by the Iterator::next implementation and discarded since it can only return None. Unless you’re referring to actually change the for loop expansion to use something other than Iterator, in which case you can just do whatever is necessary to make it work nicely. If you know how long the iterator is beforehand I guess you can use take to stop before you consume the return value, but that seems unlikely in most cases.

I should have noted that as well as that IntoIterator it would probably use impl<G> Iterator for G where G: Generator + Unpin as well which (along with some extension methods) could allow for pretty complex uses without much overhead in the logic like:

trait GeneratorExt: Generator {
    fn by_ref(&mut self) -> &mut Self;

    fn map_complete<U, F>(self, f: F) -> impl Generator<Yield = Self::Yield, Return = U>
    where
        F: FnOnce(Self::Return) -> U;

    fn resume_unpin(&mut self) -> GeneratorState<Self::Yield, Self::Return>
    where
        Self: Unpin;
}

let gen = || { yield 1; yield 2; return "foo"; };

for one in gen.by_mut().map_return(drop).take(1) {}
if let Some(two) = gen.next() {}
if let GeneratorState::Complete(final) = gen.resume_unpin() {}

#23

Please, read the linked proposal (and ideally discussion). In addition to motivation and explanations why I consider implementing iterator traits for generators a wrong move (but you still can use a wrapper which will transform Generator into Iterator by discarding return value), it also includes a rough description how for loop expansion can be changed in a backwards compatible way (though it relies on negative trait bounds).


#24

Ok, sorry for assuming that it would be based around converting Generator -> Iterator. If you’re changing the for loop expansion then it should be possible to make it so that pinning is integrated into it better anyway, I doubt it would be necessary to manually wrap the Generator in a pinned reference (presumably by just swapping generator.resume() with Pin::new(&mut generator).resume() in the expansion so that users must supply a IntoGenerator<Generator: Unpin> value).


#25

Hm, you are right, pinning as part of expansion could indeed work. Hopefully manual resume calls on non-self-referential generators will be rare enough for unwieldness of Pin::new(&mut generator).resume() to matter.


#26

The new std::future::Future common combinators are mush more complicated than I thought. Here is another one join_on_ok will run two Result returning Futures in parallel, and stop if either of then results in Err, or both of them results in Ok.

With the EitherOr defined before, I wrote

pub fn join_on_ok<T1, T2, E1, E2>(
    f1: impl Future<Output = Result<T1, E1>>,
    f2: impl Future<Output = Result<T2, E2>>,
) -> impl Future<Output = EitherOr<Result<T1,E1>,Result<T2,E2>>> {
    struct JoinFuture<T1, T2, F1, F2>(F1, F2, Option<T1>, Option<T2>);
    impl<T1, T2, E1, E2, F1, F2> Future for JoinFuture<T1, T2, F1, F2>
    where
        F1: Future<Output = Result<T1, E1>>,
        F2: Future<Output = Result<T2, E2>>,
    {
        type Output = EitherOr<Result<T1,E1>,Result<T2,E2>>;
        fn poll(self: Pin<&mut Self>, lw: &LocalWaker) -> Poll<Self::Output> {
            let this = unsafe {
                let this = self.get_unchecked_mut();
                (
                    Pin::new_unchecked(&mut this.0),
                    Pin::new_unchecked(&mut this.1),
                    &mut this.2,
                    &mut this.3,
                )
            };
            match (&this.2, &this.3) {
                (Some(_), Some(_)) => unreachable!(),
                (Some(_), _) => match this.1.poll(lw) {
                    Ready(Ok(v2)) => Ready(EitherOr::Both(Ok(this.2.take().unwrap()), Ok(v2))),
                    Ready(Err(e2)) => Ready(EitherOr::Both(Ok(this.2.take().unwrap()), Err(e2))),
                    _ => Pending,
                },
                (_, Some(_)) => match this.0.poll(lw) {
                    Ready(Ok(v1)) => Ready(EitherOr::Both(Ok(v1), Ok(this.3.take().unwrap()))),
                    Ready(Err(e1)) => Ready(EitherOr::Both(Err(e1), Ok(this.3.take().unwrap()))),
                    _ => Pending,
                },
                _ => match (this.0.poll(lw), this.1.poll(lw)) {
                    (Ready(Ok(v1)), Ready(Ok(v2))) => Ready(EitherOr::Both(Ok(v1), Ok(v2))),
                    (Ready(Err(e1)), Ready(Ok(v2))) => Ready(EitherOr::Both(Err(e1), Ok(v2))),
                    (Ready(Ok(v1)), Ready(Err(e2))) => Ready(EitherOr::Both(Ok(v1), Err(e2))),
                    (Ready(Err(e1)), Ready(Err(e2))) => Ready(EitherOr::Both(Err(e1), Err(e2))),
                    (Ready(Ok(v1)), Pending) => {
                        *this.2 = Some(v1);
                        Pending
                    },
                    (Ready(Err(e1)), _) => Ready(EitherOr::This(Err(e1))),
                    (Pending, Ready(Ok(v2))) => {
                        *this.3 = Some(v2);
                        Pending
                    },
                    (_, Ready(Err(e2))) => Ready(EitherOr::That(Err(e2))),
                    _ => Pending,
                },
            }
        }
    }
    JoinFuture(f1, f2, None, None)
}

This combinator is natually appear when I have to proxy connection. In such a case I hold 2 seperate data transfering futures in the form of source -> proxy -> target, and if either futures are stopped I need to stop the other one as well, or they can both close peacefully.

Right now, I identified 4 combinators that cannot be written in await! form, and requires manual implementing:

  1. join: run two Futures in parallel, stop if both stops.
  2. alt: run two Futures in parrel, stop if either or both of them stops.
  3. select, a varition of alt, where will be bias to one of the Futures, such that other Futures wil not be polled in the same circle.
  4. join_on_ok: a variation of join such that the outputs are Results, and the stop criteria is either of the futures fail or both success.

#27

Compared to implementing the equivalent in futures(0.1) that appears to just have a single extra statement doing the pin-projection + passing the LocalWaker around?

One thing you’re missing here is an abstraction to allow dealing with futures that may have completed in an earlier poll. futures(0.3) uses MaybeDone for this, I’m not certain if this will help in this exact case, but I’m sure it’s possible to have something that would.

You’re currently over-matching a lot, I think almost all the Ok/Err patterns can be dropped and you can just bind the result directly, e.g.

(Some(_), _) => match this.1.poll(lw) {
    Ready(Ok(v2)) => Ready(EitherOr::Both(Ok(this.2.take().unwrap()), Ok(v2))),
    Ready(Err(e2)) => Ready(EitherOr::Both(Ok(this.2.take().unwrap()), Err(e2))),
    _ => Pending,
},

can be written

(Some(_), _) => match this.1.poll(lw) {
    Ready(v2) => Ready(EitherOr::Both(Ok(this.2.take().unwrap()), v2)),
    Pending => Pending,
},

and using the ready! macro

(Some(_), _) => {
    let v2 = ready!(this.1.poll(lw));
    Ready(EitherOr::Both(Ok(this.2.take().unwrap()), v2))
},

fully simplifying using those gives something that doesn’t seem too bad to me (definitely feels like it could be simplified a bit more with a targeted abstraction).

pub fn join_on_ok<T1, T2, E1, E2>(
    f1: impl Future<Output = Result<T1, E1>>,
    f2: impl Future<Output = Result<T2, E2>>,
) -> impl Future<Output = EitherOr<Result<T1,E1>,Result<T2,E2>>> {
    struct JoinFuture<T1, T2, F1, F2>(F1, F2, Option<Result<T1, E1>>, Option<Result<T2, E2>>);
    impl<T1, T2, E1, E2, F1, F2> Future for JoinFuture<T1, T2, F1, F2>
    where
        F1: Future<Output = Result<T1, E1>>,
        F2: Future<Output = Result<T2, E2>>,
    {
        type Output = EitherOr<Result<T1,E1>,Result<T2,E2>>;
        fn poll(self: Pin<&mut Self>, lw: &LocalWaker) -> Poll<Self::Output> {
            let this = unsafe {
                let this = self.get_unchecked_mut();
                (
                    Pin::new_unchecked(&mut this.0),
                    Pin::new_unchecked(&mut this.1),
                    &mut this.2,
                    &mut this.3,
                )
            };

            match (&this.2, &this.3) {
                (Some(_), Some(_)) => unreachable!(),
                (Some(_), _) => {
                    let v2 = ready!(this.1.poll(lw);
                    Ready(EitherOr::Both(this.2.take().unwrap(), v2))
                },
                (_, Some(_)) => {
                    Ready(EitherOr::Both(ready!(this.0.poll(lw), this.3.take().unwrap()))
                },
                _ => match (this.0.poll(lw), this.1.poll(lw)) {
                    (Ready(v1), Ready(v2)) => (v1, v2),
                    (Ready(Err(e1)), _) => Ready(EitherOr::This(Err(e1)))
                    (_, Ready(Err(e2))) => Ready(EitherOr::That(Err(e2)))
                    (Ready(v1), Pending) => {
                        *this.2 = Some(v1);
                        Pending
                    },
                    (Pending, Ready(v2)) => {
                        *this.3 = Some(v2);
                        Pending
                    },
                    (Pending, Pending) => Pending,
                },
            }
        }
    }
    JoinFuture(f1, f2, None, None)
}

I think the simple criteria for something that cannot be written using async fn is just anything that involves parallelism, that’s why I think these sorts of APIs are likely to be the first to migrate to std from futures once the core APIs are stabilized. But a lot of the more specialized parallel combinators like this can be built on top of the basic select! and join! macros, e.g.

pub async fn join_on_ok<T1, T2, E1, E2>(
    f1: impl Future<Output = Result<T1, E1>>,
    f2: impl Future<Output = Result<T2, E2>>,
) -> EitherOr<Result<T1,E1>,Result<T2,E2>> {
    let (f1, f2) = (f1.fuse(), f2.fuse());
    let (mut v1, mut v2) = (None, None)
    loop {
        select! {
            v = f1 => match v {
                Ok(v) => v1.replace(v),
                Err(e) => match v2.take() {
                    Some(v2) => break EitherOr::Both(Err(e), Ok(v2)),
                    None => break EitherOr::This(Err(e)),
                },
            },
            v = f2 => match v {
                Ok(v) => v2.replace(v),
                Err(e) => match v1.take() {
                    Some(v1) => break EitherOr::Both(Ok(v1), Err(e)),
                    None => break EitherOr::That(Err(e)),
                },
            },
            complete => {
                break EitherOr::Both(Ok(v1.take().unwrap()), Ok(v2.take().unwrap()))
            }
        }
    }
}

(I realise this doesn’t fully handle the case where one future completes with a value simultaneously with the other future completing with an error, but I don’t think that’s an important case to handle)


#28

You are right! I were just posting my first “workable” code, and later on I jump in to optimise, and did something like you just did for me.

I feel the samething.

I didn’t see the join! macro anywhere, but I saw select! were deprecated without refering any Rust issue and no indication on alternatives.

https://doc.rust-lang.org/std/macro.select.html


#29

These are part of futures(0.3), they haven’t been proposed for including in std yet (and will have issues because of that existing select macro, unless there’s a way to have it as std::future::select! somehow). Here’s the docs for select. Most of the API experimentation on top of the core futures APIs is happening as part of futures(0.3) so it has time to bake outside std (I’m not really sure what the overall plan with moving it into std is, I would expect at least some of the core extensions to move, but futures might remain as something akin to itertools for the less used extensions).


#30

Speaking of, is this try_join!?


#31

Kind of. But if both futures end in a same poll circle I keep all successful-or-not results. I don’t know try_join! will or will not. Also I don’t require futures to have the same error type and/or ok type.