Runtime-agnostic cooperative task scheduling budget

Yes; an executor would then define its spawn function as something like

fn spawn<O>(&self, fut: impl Future<SomeConcreteType, Output = O>) -> JoinHandle<O>

where SomeConcreteType is the actual context type that the executor will be passing to poll later. The compiler will then reject any attempt to spawn an incompatible Future into the executor. Boxed futures are still possible, but they would need to know the context type that will be used at boxing-time.
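
As a rough illustration of that constraint (this is not today's Future trait; ContextFuture, RichContext, and SomeExecutor are names invented for the example), the same shape can be emulated with a helper trait that is generic over the context type:

use std::pin::Pin;
use std::task::Poll;

// Hypothetical stand-in for "a Future that is generic over its context type".
trait ContextFuture<Cx> {
    type Output;
    fn poll(self: Pin<&mut Self>, cx: &mut Cx) -> Poll<Self::Output>;
}

// The concrete context this executor threads through `poll`.
struct RichContext { /* waker, cooperation budget, ... */ }

struct SomeExecutor;

impl SomeExecutor {
    // Only futures that accept `RichContext` can be spawned here; a future
    // written against a different context type is rejected at compile time.
    fn spawn<O>(&self, fut: impl ContextFuture<RichContext, Output = O> + 'static) {
        let _ = fut; // a real executor would queue this for polling
    }
}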


A nice-to-have extension of this would be some way to get a reference to the context from an async block, but that might be trickier: The borrow would need to end before the next await point, and there might be some trouble around type inference.

Even without that, hand-written futures could take advantage of the more-specific context and be called from regular async blocks, which would then pick up the extra requirements via the generic template I wrote above.

A question of ordering: is it better for cooperation to take a "yield before" shape or a "yield after" shape?

// yield before
async fn cooperative<F: IntoFuture>(inner: F) -> F::Output {
    let mut inner = pin!(inner.into_future());
    poll_fn(move |cx| {
        // fn Context::poll_cooperate(&mut self) -> Poll<Cooperation<'_>>;
        let coop = ready!(cx.poll_cooperate());
        let output = ready!(inner.as_mut().poll(cx));
        // fn Cooperation::record_progress(self);
        coop.record_progress();
        Poll::Ready(output)
    }).await
}

// yield after
async fn cooperative<F: Future>(inner: F) -> F::Output {
    let output = inner.await;
    // fn Context::yield_cooperatively(&mut self) -> bool;
    poll_fn(|cx| {
        if cx.yield_cooperatively() { Poll::Ready(()) } else { Poll::Pending }
    }).await;
    output
}

Having written that down, I think the former is probably preferable; yield after imposes an extra state (made progress but yielding cooperatively), and you can shim from yield before to yield after but not the other way around.

async fn yield_cooperatively() {
    poll_fn(|cx| {
        let coop = ready!(cx.poll_cooperate());
        coop.record_progress();
        Poll::Ready(())
    }).await
}

// useful (abusable?) for back-edge yielding, e.g.
while let Some(task) = tasks.next() {
    make_progress(task); // sync
    yield_cooperatively().await;
}
// kinda like yield_now(), but yielding less frequently

I'd still suggest some minor tweaks, though:

  • I have a slight preference for record_progress over made_progress, because the former is an active verb, whereas the latter "feels" more like a fn(&self) -> bool query due to being past tense.
  • Cooperation::record_progress should probably take self by value, because it's only intended to attempt and then record a single instance of progress after acquiring a cooperation handle. If you want to make more progress, you should poll for another.
  • Cooperation should capture the lifetime of the context, since it's not intended to hold onto an instance across multiple polls.
    • Doing so is almost certainly an error, so we should make it an error to write poll_fn(|cx| cx.poll_cooperate()). I originally saw this was possible and thought[1] it was a correct way to implement cooperation, and thus that record_progress would also need to return Poll, to get "yield after" behavior from repeated progress on the same Cooperation.
    • In the extreme, Cooperation could even just be &dyn Cooperate rather than going through a manual vtable like Waker. Though it's probably realistic to consider that it might want to hold onto some state which the Context doesn't have.

Before stabilization, it'll be strongly desirable to provide a reasonably usable definition of when you should guard a poll with cooperation. The starting position is probably roughly "any nonguaranteed[2] progress by means other than polling another future" and let people use their best judgement from there.


The simplest possible implementation would I think be

use core::task::{Poll, Waker};
use core::marker::PhantomData;
use core::mem::align_of;

pub struct Context<'a> {
    waker: &'a Waker,
    cooperation: &'a mut (dyn coop::Cooperate + 'a),
    // plus the PhantomData markers
}

#[derive(Default)]
#[non_exhaustive]
pub struct ExtraContext<'a> {
    pub cooperation: Option<&'a mut (dyn coop::Cooperate + 'a)>,
    _marker: PhantomData<Context<'a>>,
}

impl<'a> Context<'a> {
    pub fn from_waker_and(waker: &'a Waker, extra: ExtraContext<'a>) -> Self {
        let cooperation = extra.cooperation.unwrap_or_else(|| unsafe {
            // SAFETY: coop::Never is a unit-struct ZST, so any non-null,
            // well-aligned pointer to it is valid; this is effectively
            // `Box::leak(Box::new(coop::Never))` without needing alloc.
            &mut *(align_of::<coop::Never>() as *mut coop::Never)
        });
        Self { waker, cooperation, .. }
    }

    pub fn with_waker<'new>(&'new mut self, waker: &'new Waker) -> Context<'new> {
        Context { waker, cooperation: self.cooperation, .. }
    }

    pub fn poll_cooperate(&mut self) -> Poll<coop::Cooperation<'_>> {
        if self.cooperation.should_yield() {
            Poll::Pending
        } else {
            Poll::Ready(coop::Cooperation(self.cooperation))
        }
    }
}

pub mod coop {

pub trait Cooperate {
    fn should_yield(&self) -> bool;
    fn record_progress(&mut self);
}

pub struct Never;
impl Cooperate for Never {
    fn should_yield(&self) -> bool { false }
    fn record_progress(&mut self) {}
}

pub struct Cooperation<'a>(pub(super) &'a mut (dyn Cooperate + 'a));
impl Cooperation<'_> {
    pub fn record_progress(self) {
        self.0.record_progress()
    }
}

}

If we do much more than this, I'd want to see some small example of where more flexibility might be desirable. I've failed to construct an artificial scenario where this isn't sufficient.
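
As a usage sketch against the definitions above (BudgetedCoop and the budget of 128 are invented for illustration), an executor could thread a simple per-poll budget through ExtraContext like this:

struct BudgetedCoop {
    remaining: u32,
}

impl coop::Cooperate for BudgetedCoop {
    fn should_yield(&self) -> bool {
        self.remaining == 0
    }
    fn record_progress(&mut self) {
        // Each recorded unit of progress consumes one unit of budget.
        self.remaining = self.remaining.saturating_sub(1);
    }
}

// Executor side, once per top-level poll of a task (waker/task elided):
//
//     let mut budget = BudgetedCoop { remaining: 128 };
//     let mut extra = ExtraContext::default();
//     extra.cooperation = Some(&mut budget);
//     let mut cx = Context::from_waker_and(&waker, extra);
//     let _ = task.as_mut().poll(&mut cx);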

The reason Waker uses a manual vtable is that it needs to be cloneable and owned. Cooperation I think doesn't, and in fact wants to not last beyond a single poll's context.


I do personally think that ultimately task::Context should be a Provider, such that runtimes can dynamically thread extra context through, which futures can then make use of if available. Cooperative yielding is actually a good application of this IMHO; if a task doesn't cooperate, it's an unfortunate pessimization, but it won't produce incorrect results.

Rough "could look like" sketch
pub struct Context<'a> {
    provider: &'a dyn Provider + 'a,
    waker: &'a Waker,
}

impl<'_> Provider for Context<'_> {
    fn provide<'a>(&'a self, demand: &mut Demand<'a>) {
        demand.provide_ref(waker);
        demand.provide_value_with(|| waker.clone());
        self.provider.provide(demand);
    }
}

impl<'a> Context<'a> {
    pub fn from_waker(waker: &'a Waker) -> Self {
        Self::from_waker_and_provider(waker, &NullProvider)
    }

    #[track_caller]
    pub fn from_provider(provider: &'a (dyn Provider + 'a)) -> Self {
        let waker = request_ref::<Waker>(provider)
            .expect("context provider should provide a waker");
        Self::from_waker_and_provider(waker, provider)
    }

    pub fn from_waker_and_provider(waker: &'a Waker, provider: &'a (dyn Provider + 'a)) -> Self {
        Context { provider, waker }
    }

    pub fn with_waker<'b>(&mut self, waker: &'b Waker) -> Context<'b>
    where
        'a: 'b,
    {
        Context::from_waker_and_provider(waker, self.provider)
    }
}

impl<'a> Context<'a> {
    pub fn waker(&self) -> &'a Waker { self.waker }

    pub fn poll_cooperate(&self) -> Poll<impl FnOnce() + '_> {
        let coop = request_ref::<dyn Cooperate>(self);
        if coop.is_some_and(|coop| coop.should_yield()) {
            Poll::Pending
        } else {
            Poll::Ready(move || {
                if let Some(coop) = coop {
                    coop.record_progress();
                }
            })
        }
    }
}

pub trait Cooperate {
    fn should_yield(&self) -> bool;
    fn record_progress(&self);
}

// NB: there's no mut-capable version of Provider currently.
// Provider could support demanding `&mut` references (to 'static types),
// it just doesn't today. If it did, I'd use it, so recording progress
// could take `&mut self`, and that way the budget could be owned directly
// instead of sitting behind interior mutability.

The waker is kept outside the provider for two main reasons:

  • Roughly every future will access the waker, so keeping cx.waker() nondynamic is probably beneficial.
  • Creating a new context with a replaced waker is common (e.g. for join/select combinators tracking which future got awoken). The waker reference being separated allows easy replacement, whereas it being behind Provider would require more work to chain Providers up the context stack.
    • If futures want to add things accessible via the provider API, they can still wrap and delegate up the context stack themselves, e.g. with Context::from_waker_and_provider(&new_waker, cx), since Context itself implements Provider.

The !Send + !Sync now comes from containing a dyn trait object. I relaxed the invariance to covariance because cx.with_waker(cx.waker()) basically does the same as covariance. (It could just drop any invariant context, but doing so is at best a footgun, since replacing wakers is a reasonably common thing to do. The current use of Context::from_waker to do so already drops any extra context we might attach in the future, which is unfortunate since that means any currently existing code which does that won't play well with that extra context.)


But even if arbitrary context can be provided, it still makes sense for std to provide dedicated types/accessors for widely applicable context, such as for cooperative yielding.

Where it becomes more interesting is when non-std context is required (e.g. Tokio IO futures, which require being in a scoped Tokio context with an active reactor/runtime). Then you do want some marker with which to record your needed additional context. With the Provider API, that maps somewhat nicely onto a const-generic set of TypeIds (for the lifetime-erased type tags, not the demanded types, which could be lifetime-covariant), but it still carries a lot of questions about allowing "too much" guaranteed context to be provided (and propagated).
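
For illustration, the dynamic (unchecked) version of that opt-in could look like the sketch below under the Provider-based Context above; ReactorHandle and its method are invented placeholders rather than a real Tokio API, and the panic is exactly the failure mode a static marker would rule out:

// Invented placeholder for runtime-specific context a reactor might provide.
trait ReactorHandle {
    fn register_read_interest(&self, fd: i32);
}

fn poll_io_op(cx: &Context<'_>) -> Poll<()> {
    // Dynamically discover the runtime-specific context, if the executor
    // provided one.
    let Some(reactor) = request_ref::<dyn ReactorHandle>(cx) else {
        // No reactor in this executor's context: fail at runtime.
        panic!("this future requires an executor that provides a ReactorHandle");
    };
    reactor.register_read_interest(0 /* placeholder fd */);
    Poll::Pending
}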


  1. Because Cooperation is owned, I initially thought usage would look like

    let coop = poll_fn(|cx| cx.poll_cooperation()).await;
    async_work().await;
    coop.made_progress();
    

    to record that this future made progress. ↩︎

  2. i.e. ready or yield_now shouldn't poll cooperation or record progress, since they're "pure" and what polling them returns only depends on how many times they've been polled before. A sync function masquerading as a future shouldn't be recording progress. ↩︎


How do you envision async functions will access the cooperation mechanism? If I have a normal CPU-intensive function in my thread-per-core application, it's trivial to transform it into an async function that periodically yields (that's how we're solving the cooperation problem now), but less so to transform it into a manual future implementation.

I guess it's possible to provide some combinators (e.g., a coop_fn that exposes the Cooperation object), but I wonder if there's a more ergonomic solution.

I think you may be talking about a different problem here.

The cooperation mechanism solves the problem of yield points that never actually return Poll::Pending (e.g. an MPSC channel's recv() that's always ready because the senders are adding items faster than the receiver can read them out). By asking for budget before recv() checks the channel for a new item, and returning Poll::Pending if there's no budget available, you ensure that a task cannot hold onto the thread indefinitely - eventually, the cooperation mechanism will say "you've used up all your budget, Poll::Pending time", and you yield.
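
Concretely, the shape looks something like this sketch (using the poll_cooperate/Cooperation API proposed above; Receiver and poll_recv_inner are stand-ins, not a real channel API):

fn poll_recv<T>(rx: &mut Receiver<T>, cx: &mut Context<'_>) -> Poll<Option<T>> {
    // Ask for budget *before* checking the channel; when the task is out of
    // budget this returns Pending, so an always-ready channel can no longer
    // monopolize the thread.
    let coop = ready!(cx.poll_cooperate());
    let item = ready!(rx.poll_recv_inner(cx));
    // An item was actually dequeued: count it against the budget.
    coop.record_progress();
    Poll::Ready(item)
}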

If I've read you correctly, you're talking about the problem where I have fn foo() -> T which takes a lot of CPU time, and I want to translate it to fn foo() -> impl Future<Output = T> that has calls to tokio::task::yield_now().await dotted throughout it. This is trivial if you use async fn foo() -> T syntax, but not so easy if you want to write your own impl Future<Output = T> for block, and isn't solved by the coop mechanism.
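
For concreteness, that transformation looks roughly like this, assuming a Tokio runtime (Chunk, Output, and process_chunk are stand-ins for the CPU-heavy work; tokio::task::yield_now is the real Tokio primitive):

// Synchronous original: fn foo(chunks: Vec<Chunk>) -> Vec<Output>
async fn foo(chunks: Vec<Chunk>) -> Vec<Output> {
    let mut results = Vec::with_capacity(chunks.len());
    for chunk in chunks {
        results.push(process_chunk(chunk)); // still synchronous, CPU-heavy
        // Unconditionally hand the thread back to the executor.
        tokio::task::yield_now().await;
    }
    results
}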

In both cases there is code in a cooperative system that's hogging a hardware resource, namely the CPU, so it seems natural for the same solution to be used.

In my scenario I can indeed sprinkle calls to yield_now(), but that is both runtime specific and unconditional, whereas with Cooperation I can know whether I'm over budget and should yield or if I can keep going (in Seastar I had maybe_yield).

Even in impl Future<Output = T> for blocks, I can unconditionally return Poll::Pending in certain places, which is what I would be doing in an async fn foo() context with yield_now().

(NOT A CONTRIBUTION)

You should use a yield_now primitive which consumes some cooperation budget when it doesn't yield if you want to use this feature with that pattern. That's basically the answer for everything: every primitive can opt into this feature, and then your async functions built on top of them get the feature by using those primitives.

EDIT: something like Seastar's maybe_yield (which yields if you've consumed too much budget) would look like this:

async fn maybe_yield() {
    poll_fn(|cx| {
        ready!(cx.poll_cooperate()).record_progress();
        Poll::Ready(())
    }).await;
}

Easy!


(NOT A CONTRIBUTION)

Anyway, I can't drive this idea any further than this thread. If anyone reading is excited by this idea, please do take it on.


Propagating the preemption across join!() or select!() calls will require every level to call into ready!(cx.poll_cooperate()), right? This adds a function call to every single branch of these macros. And this needs to be done regardless of whether cooperation is enabled or not.

Some event dispatchers on Linux solve this by extending Poll with Poll::Preempted, so a caller immediately knows whether the call was pending due to missing progress or pending because it was preempted. Sadly, I don't see how to extend the existing Poll enum with this information in a backwards-compatible way.


Could you explain how the Poll::Preempted return enables a leaf future inside a select! or a join! to avoid calling into the cooperation mechanism? At first glance, Poll::Preempted is the return value of poll_cooperate when you want to use this mechanism, and thus it's not going to avoid the need to call into poll_cooperate.

(NOT A CONTRIBUTION)

It's true that when a tokio task runs out of budget, each branch of the task has to check for budget, without being able to short-circuit, because the combinator doesn't know whether a branch returned pending for its own reasons or because the task ran out of budget. This is how it works already, and I'm not aware of anyone finding it prohibitive.

I wouldn't say so:

  • If the primitives aren't using cooperation, of course it isn't done at all.
  • If the executor doesn't provide cooperation but you're using primitives which handle cooperation, this can be implemented as a null check rather than actually making a virtual call. And since it will always return success (an executor that doesn't provide cooperation is functionally giving the task an infinite budget), it will never cause that branch to return pending anyway.

The first time a branch returns Poll::Preempted, the select or join could forward it up and not check other branches, because it knows they will all return Poll::Preempted, since the entire task is out of budget. But you can't add a variant to Poll without breakage.


Maybe there should be a way to signal that this pending was due to running out of budget?

(NOT A CONTRIBUTION)

That's exactly the point of @dvdhrm's suggestion, which as we've mentioned is not backwards compatible.


Right, tail primitives could decide to opt out, I think. But generic combinators like join!() had better support it, shouldn't they? Otherwise, a join would needlessly poll all remaining futures if the first is preempted.

True, I somehow missed that we can just guard the vtable-call with a null-check, sorry.

For tokio you mean? Fair enough. I just thought I'd share our experience and how we decided to solve the problem outside of tokio (and outside of Rust, for now).

The thing is, if you opt into the feature, these vtable-calls cannot be optimized away. And once you start using "trivial futures" like future::ready a lot, each of them suddenly pulls in a vtable-call in combinators.

I thought I'd just raise the idea in case someone has a clever idea for how to tie preempted: bool to Poll::Pending in a backwards-compatible way. Alternatively, how about a trait like PreemptableFuture, which returns a tri-state PreemptablePoll rather than Poll and comes with a blanket Future impl that maps Preempted to Pending? This is definitely more disruptive (and thus maybe a non-starter for some?), but I wouldn't say the idea is not backwards-compatible per se.


Could we put a flag into Context? If I understand correctly, the same Context is used throughout the task, and thus if I set a boolean flag out_of_budget to true, select!, join! and other primitives can return immediately without polling the other leaves.

The default would be for out_of_budget to always be false, forcing you to make poll_cooperate calls to find out if there's any budget left; but a smart implementation could set out_of_budget to true before returning Poll::Pending, thus telling the combinators that all calls to poll_cooperate will return Poll::Pending.

Idea is that out_of_budget is only true if there is a guarantee that all further calls to poll_cooperate on this specific Context will return Poll::Pending.

Isn't out_of_budget just cx.poll_cooperate().is_pending()? Combinators like select! could call poll_cooperate themselves in-between polling their sub-futures to decide whether to continue or short-circuit.

Yes it is, but without requiring a virtual function call; the flag is likely already in the data cache, so the check should be cheaper than calling poll_cooperate and indirecting through the vtable to run the cooperation mechanism's check.

If poll_cooperate were inverse-fused, that would avoid that cost too: as soon as the registered function returns Poll::Pending, the Context can record that and start returning Poll::Pending without calling it again.

That only half-avoids it, and you still have to have the flag inside Context, so that Context has somewhere to register that poll_cooperate returned Poll::Pending in the past.

With a fused poll_cooperate that latches the Poll::Pending state, every time the combinator chooses a leaf to run, it'll pay the price of a virtual function call to claim budget, and then pay the cost of dropping the returned Cooperation (which also involves a virtual function call) to return that budget to the system, until such time as poll_cooperate returns Poll::Pending, whereupon it improves to the same as the flag.

But what we want is what the flag offers - a very cheap check to say "it's worth polling another leaf" or "no, when the leaf calls poll_cooperate, it'll get Poll::Pending, so short-circuit the whole process now". That means eliding both the virtual function call when poll_cooperate returns Poll::Pending, and the virtual function calls that happen when it returns Poll::Ready<Cooperation>.

(NOT A CONTRIBUTION)

It's an interesting idea to have the context contain a flag which flips to indicate that the task has been preempted. This would let us do better than tokio does now, because other branches would no longer need to make a virtual call. Potentially, this could have other use cases in the future, which maybe don't use cooperative task budgets but also have a notion of a task being "preempted."

There are a few different ways this could work:

  • When the runtime's poll_cooperate callback returns Pending, Context could set this flag and from then on always return Pending from poll_cooperate (as @Nemo157 suggests).
  • Context could have a new method for declaring that the task is preempted, and the runtime's poll_cooperate callback could take &mut Context as an argument.

The main advantage of the second would be that in the future other APIs could be added that can be called from poll_cooperate. The main disadvantage is that there are more ways to implement it incorrectly.


(NOT A CONTRIBUTION)

A particularly relevant use case for the preemption flag is FuturesUnordered.

In the past, FuturesUnordered could exhibit quadratic behavior in a scenario in which all of its futures begin returning Pending because the task is out of budget. It would continue polling them, because they had been awoken, receiving a Pending every time. See here: Quadratic complexity in FuturesUnordered · Issue #2526 · rust-lang/futures-rs · GitHub

This was solved by arbitrarily limiting the number of futures FuturesUnordered would poll before yielding to 32. See here: FuturesUnordered: Limit max value of yield_every by taiki-e · Pull Request #2527 · rust-lang/futures-rs · GitHub

With a preemption flag, FuturesUnordered could instead simply check this flag before polling the next ready future and yield if the task has been preempted. This would make the limit on the number of futures polled dynamically determined by the runtime according to the task's budget, rather than a fixed value of 32. That would avoid both unnecessary yields (when more than 32 futures could be polled without preemption) and unnecessary polls (when the task is preempted before 32 futures have been polled).
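
In sketch form, the poll loop would then look something like this (out_of_budget is the same hypothetical flag; the rest of FuturesUnordered's bookkeeping is elided):

// Inside a FuturesUnordered-style poll_next, replacing the fixed
// "poll at most 32 woken futures" limit:
loop {
    // Let the runtime's budget decide when to stop, not a hard-coded 32.
    if cx.out_of_budget() {
        // Re-wake ourselves so the remaining woken futures get drained on
        // the next poll of the task.
        cx.waker().wake_by_ref();
        return Poll::Pending;
    }
    let Some(mut fut) = woken_queue.pop() else { break };
    match fut.as_mut().poll(cx) {
        Poll::Ready(output) => return Poll::Ready(Some(output)),
        Poll::Pending => {} // it has re-registered itself with its waker
    }
}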
