Joining and cooperative interruptible threads, for Rust?

This is about C++'s std::jthread (planned for C++ 20 I think), which we've discussed a while back.

jthread provides three improvements for writing blocking code on top of threads:

  • guarantee that threads are always joined when the thread handle is dropped (which improves robustness and predictability of threaded applications quite a bit)
  • a C#-style CancellationToken, which can be used to request thread's cancellation in cooperative way (in particular, dropping a thread's handle requests cancellation before calling a blocking join).
  • a way to register callbacks with cancellation tokens, which makes them play nice with select style API.

The last point I think was added since we've discussed this last time, and significantly improves the feature. While previously you couldn't really rely on cancellation in case of a syscall like read, with callbacks it is now possible to implement proper cancellation.

It seems to me that this new API is really nice, and markedly better than current std::thread unconstrained concurrency. Moreover, in Rust we have std::future::Future which hopefully provides a better interface for selectable things than callbacks. I've tried this out in the jthread-rs and I must say I like the result:

use jthread;

// Note: this is synchronous, blocking code.
fn main() {
    let reader = jthread::spawn(|stop_token| {
        let mut stdin = jthread::io::stdin();

        // `stop_token` allows to break free from a blocking read call
        while let Ok(line) = stdin.read_line(&stop_token) {
            let line = line.unwrap();
            println!("> {:?}", line)
        }
        println!("exiting") // this will be printed to the screen
    });

    std::thread::sleep(std::time::Duration::from_secs(3));
    drop(reader); // cancels and joins the reader thread
}

That's just a proof of concept and, in particular, there's no counterpart to "stop_callback is deregistered in destructor" feature of C++ impl. I have neither capacity nor expertise to pursue these ideas further, but if someone runs with it and publishes jthread crate in Rust, I'd be delighted!

This discussion on structured concurrency forum is relevant: https://trio.discourse.group/t/structured-concurrency-proposal-for-c-20/125

14 Likes

@matklad Are jthreads intended to make ordinary threads more ergonomic, or are they green threads or fibers?

Basically, I'm trying to figure out how many live jthreads I can have at once; there are a lot of cases where it would be handy to spawn a more or less infinite number of threads, canceling them selectively once you discover that they are useless for one reason or another. In your code I see that you're using std::thread, but I don't know if that is a requirement of jthreads.

jthreads are OS threads. Though, the similar pattern should be applicable to any task system.

OK, thank you for the clarification.

As a huge fan of structured concurrency I really like the jthreads idea! Over the years I discovered that threads which outlive their parents are almost always a code smell, and that leaving out cancellation will introduce a lot of work if it tends to be required later on. So both are great to have!

I also think that one of the biggest benefits that the futures/async-await ecosystem has is cancellation support. By having cancellation support via [cancelation | stop]_token added for plain synchronous IO APIs (and things like synchronization primitives and crossbeam channels) those might get suitable for a new set of scenarios without requiring the [mental] overhead of going async/futures.

One idea that I particularly like is how jthread uses cancellation callbacks: CancellationTokens are easy and cheap (no heap allocations) to implement if only polled by the child thread. But that doesn't allow for some usage scenarios, e.g. there is no way to unblock a blocked syscall if the polling for cancellation happens outside of it. Originally I thought adding callbacks requires heap allocations to store the callback - like how C# CancellationToken.Register works. However storing the callback outside of the Cancellation/StopToken - potentially on the stack - and only linking it to the token via a pointer allows to avoid the allocation and thereby can make the mechanism cheaper and suitable for more use-cases (e.g. no-std).

I guess one question will come up if really makes sense to add and use those APIs if we already have futures/async-await which also provide cancellation capabilities and parent/child relations. I am leaning towards yes here, since not every application benefits from futures - for certain tasks a normal cancellable thread (or task running in a thread-pool) might be the better choice. Semantics are also slightly different, since futures only works if synchronous cancellation is supported whereas jthread is cooperative.

Regarding the implementation and interoperability with futures: I think interoperability is always great to have! But I think directly building on top of futures/wakers like done in the example might waste some potential. Futures/Wakers mostly require some heap allocations to store the task state - often in the form of refcounted objects so that it's safe to call the Waker even after the Future had been completed. This might not be necessary with cooperative cancellation and run-to-completion code - it might be possible to allocate all necessary data (incl. cancellation data and callbacks) on the stack, since blocking function calls are not allowed to refer to anything after they returned. futures-intrusive experiments with the idea of storing as much data as possible on the callers stack for futures based APIs - but I think we could potentially do even better for non-futures code.

1 Like

Completely agree here. In rust-analyzer, there's some tricky concurrency, because naturally concurrent user requests interact with the shared state. However, there are at most a dozen of requests in flight, so a thread pool makes much more sense, as threads have a real stack which you can see in debugger and which works for recursion.

This topic was automatically closed 90 days after the last reply. New replies are no longer allowed.