Each time I use channels, I am frustrated by the lack of a `map` function that would let me hand over a `Sender` for one type and receive values of another type. Right now, the only ways of doing this are to break abstraction and design with callbacks, or to create one thread per `channel()`, which is extremely annoying.
The problem at hand
Consider an event loop, defined as follows:
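```rust
use std::sync::mpsc::channel;
use std::thread;

let (tx, rx) = channel();
thread::spawn(move || {
    // The event loop runs until every clone of `tx` has been dropped.
    for event in rx {
        match event {
            UIEvent::KeyboardEvent(ev) => { /* ... */ },
            UIEvent::MouseEvent(ev) => { /* ... */ },
            // ...
        }
    }
});
```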
Now, imagine a system library that can watch for keyboard events, with the following signature:
```rust
impl ThirdPartyLibrary {
    fn register_watch(&self, on_event: Sender<PrimitiveKeyboardEvent>) -> ...;
}
```
How can I interact with this library? At the moment, my only option is to spawn another thread, as follows:
```rust
let (tx2, rx2) = channel();
let tx = tx.clone(); // That's the tx for my event loop, see above.
thread::spawn(move || {
    // Forward every raw event into the event loop, wrapped as a `UIEvent`.
    for ev in rx2 {
        match tx.send(UIEvent::KeyboardEvent(ev)) {
            Ok(_) => {},
            Err(_) => return, // Clean up if nobody is listening anymore.
        }
    }
});
third_party_library.register_watch(tx2);
```
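The workaround above can be packaged into a generic helper, which makes its cost explicit: every adapted `Sender` owns one forwarding thread for its whole lifetime. This is only a sketch; `spawn_adapter` is a hypothetical name, and `char` stands in for `PrimitiveKeyboardEvent`.

```rust
use std::sync::mpsc::{channel, Sender};
use std::thread;

/// Hypothetical helper packaging the workaround: returns a `Sender<U>` whose
/// values are converted with `f` and forwarded to `target` by a dedicated
/// thread. Every call spawns one more OS thread.
fn spawn_adapter<U, T, F>(target: Sender<T>, f: F) -> Sender<U>
where
    U: Send + 'static,
    T: Send + 'static,
    F: Fn(U) -> T + Send + 'static,
{
    let (tx, rx) = channel::<U>();
    thread::spawn(move || {
        // Runs until every clone of the returned `Sender<U>` is dropped...
        for value in rx {
            // ...or until the downstream receiver has hung up.
            if target.send(f(value)).is_err() {
                return;
            }
        }
    });
    tx
}

#[derive(Debug, PartialEq)]
enum UIEvent {
    KeyboardEvent(char), // `char` stands in for `PrimitiveKeyboardEvent`
}

fn main() {
    let (tx, rx) = channel();
    let keyboard_tx = spawn_adapter(tx, UIEvent::KeyboardEvent);
    keyboard_tx.send('a').unwrap();
    drop(keyboard_tx); // lets the adapter thread exit
    assert_eq!(rx.recv().unwrap(), UIEvent::KeyboardEvent('a'));
}
```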
Instead, I would have preferred writing:
```rust
third_party_library.register_watch(tx.clone().map(|ev| UIEvent::KeyboardEvent(ev)));
```
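No such `map` exists on `std::sync::mpsc::Sender`. Below is a minimal sketch of what it could compute, written as a hypothetical extension trait (`SenderExt`) and wrapper (`Mapped`): the conversion runs on the sending thread, so no extra thread is needed. Such a wrapper is not a `Sender<PrimitiveKeyboardEvent>`, so it still could not be passed to `register_watch` as declared above; that gap is what the proposal below addresses.

```rust
use std::sync::mpsc::{channel, SendError, Sender};

/// Hypothetical wrapper: converts each value with `f` on the sending thread,
/// then pushes the result into the wrapped channel. No extra thread is used.
struct Mapped<T, F> {
    inner: Sender<T>,
    f: F,
}

// Manual impl: `Sender<T>` is `Clone` for every `T`, so only `F` needs `Clone`.
impl<T, F: Clone> Clone for Mapped<T, F> {
    fn clone(&self) -> Self {
        Mapped { inner: self.inner.clone(), f: self.f.clone() }
    }
}

impl<T, F> Mapped<T, F> {
    fn send<U>(&self, value: U) -> Result<(), SendError<T>>
    where
        F: Fn(U) -> T,
    {
        self.inner.send((self.f)(value))
    }
}

/// Hypothetical extension trait adding `map` to std's `Sender`.
trait SenderExt<T> {
    fn map<U, F: Fn(U) -> T>(self, f: F) -> Mapped<T, F>;
}

impl<T> SenderExt<T> for Sender<T> {
    fn map<U, F: Fn(U) -> T>(self, f: F) -> Mapped<T, F> {
        Mapped { inner: self, f }
    }
}

#[derive(Debug, PartialEq)]
enum UIEvent {
    KeyboardEvent(char), // `char` stands in for `PrimitiveKeyboardEvent`
}

fn main() {
    let (tx, rx) = channel::<UIEvent>();
    // Roughly what `tx.clone().map(...)` would hand to `register_watch`.
    let keyboard_tx = tx.clone().map(|ev: char| UIEvent::KeyboardEvent(ev));
    keyboard_tx.send('q').unwrap();
    assert_eq!(rx.recv().unwrap(), UIEvent::KeyboardEvent('q'));
}
```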
Now, let’s assume that the situation is slightly more complicated and that our system needs to handle several keyboards. In that case, we need to label the events of each keyboard with a unique key.
For the moment, the only solution is to spawn one thread per keyboard:
```rust
let key = ...;
let (tx3, rx3) = channel();
let tx = tx.clone(); // That's the tx for my event loop, see above.
thread::spawn(move || {
    // Forward every raw event, labelled with this keyboard's key.
    for ev in rx3 {
        match tx.send(UIEvent::KeyboardEvent(key, ev)) {
            Ok(_) => {},
            Err(_) => return, // Clean up if nobody is listening anymore.
        }
    }
});
third_party_library.register_watch(tx3);
```
Instead, I would have preferred to write:
```rust
let key = ...;
third_party_library.register_watch(tx.clone().map(move |ev| UIEvent::KeyboardEvent(key, ev)));
```
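For comparison, the effect of the keyed `map` call can be sketched without any extra thread as a closure that tags each event before sending it. `KeyboardId` and `keyed_sender` are hypothetical, and `char` stands in for `PrimitiveKeyboardEvent`. Because a mapping function like this is `Fn` (it may be called many times), the captured `key` must be `Copy` or be cloned on each call.

```rust
use std::sync::mpsc::{channel, SendError, Sender};

/// Hypothetical keyboard identifier; `Copy` so the `Fn` closure below can
/// reuse it on every call.
#[derive(Clone, Copy, Debug, PartialEq)]
struct KeyboardId(u32);

#[derive(Debug, PartialEq)]
enum UIEvent {
    KeyboardEvent(KeyboardId, char), // `char` stands in for `PrimitiveKeyboardEvent`
}

/// Returns a sending function that labels every event with `key` and forwards
/// it to the event loop, on the caller's thread.
fn keyed_sender(
    tx: Sender<UIEvent>,
    key: KeyboardId,
) -> impl Fn(char) -> Result<(), SendError<UIEvent>> + Clone {
    move |ev: char| tx.send(UIEvent::KeyboardEvent(key, ev))
}

fn main() {
    let (tx, rx) = channel::<UIEvent>();
    // One labelled sender per keyboard, all feeding the same event loop.
    let left = keyed_sender(tx.clone(), KeyboardId(1));
    let right = keyed_sender(tx, KeyboardId(2));
    left('a').unwrap();
    right('b').unwrap();
    assert_eq!(rx.recv().unwrap(), UIEvent::KeyboardEvent(KeyboardId(1), 'a'));
    assert_eq!(rx.recv().unwrap(), UIEvent::KeyboardEvent(KeyboardId(2), 'b'));
}
```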
Why is this a problem?
- Threads are not free. In a large system, we can easily end up with hundreds of threads fired just for the sake of adapting or annotating data.
- This is pretty non-intuitive and verbose.
Redesigning the use of `Sender`
At this stage, one might suspect that the problem comes from the use of `Sender`. Perhaps it would be better to use `Fn(T)`?
With a closure, we could write:
```rust
third_party_library.register_watch(move |event| {
    /* somehow dispatch to target */
});
```
Let’s redesign the library accordingly:
```rust
impl ThirdPartyLibrary {
    fn register_watch_2<F>(&self, on_event: F) -> ...
        where F: Fn(PrimitiveKeyboardEvent) + Send;
}
```
In simple cases, this works nicely. However, we have introduced a major limitation: closures are pretty hard to share between threads, or even between other closures. In our example, `ThirdPartyLibrary` might rely upon several system callbacks: one triggered when the user presses a key, one when the user releases it, and perhaps one triggered when the toolkit decides to start auto-repeating the keypress.
How can we implement this? Since our system callbacks cannot share a handle to `on_event`, there is only one solution: spawn an event loop in charge of calling `on_event`, and make sure that the system callbacks dispatch messages to this event loop.
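Here is a sketch of what `ThirdPartyLibrary` might have to do internally to offer a closure-based `register_watch_2` while serving several system callbacks. The free function form, the `Watch` struct with its `key_down`/`key_up` fields, and the placeholder definition of `PrimitiveKeyboardEvent` are all hypothetical; sending on `key_down` and `key_up` stands in for the operating system invoking the press and release callbacks. Since the callbacks cannot share `on_event`, they share clones of a `Sender`, and the library spawns one thread per watch to own the closure.

```rust
use std::sync::mpsc::{channel, Sender};
use std::thread::{self, JoinHandle};

/// Placeholder for the library's event type (hypothetical definition).
#[derive(Debug, PartialEq)]
enum PrimitiveKeyboardEvent {
    Pressed(char),
    Released(char),
}

/// Hypothetical per-watch state kept by the library.
struct Watch {
    // Each system callback owns its own `Sender` clone, because the closure
    // itself cannot be shared with them.
    key_down: Sender<PrimitiveKeyboardEvent>,
    key_up: Sender<PrimitiveKeyboardEvent>,
    // The extra thread that owns `on_event` and is its only caller.
    worker: JoinHandle<()>,
}

fn register_watch_2<F>(on_event: F) -> Watch
where
    F: Fn(PrimitiveKeyboardEvent) + Send + 'static,
{
    let (tx, rx) = channel::<PrimitiveKeyboardEvent>();
    let worker = thread::spawn(move || {
        // Runs until every callback has dropped its `Sender`.
        for ev in rx {
            on_event(ev);
        }
    });
    Watch { key_down: tx.clone(), key_up: tx, worker }
}

fn main() {
    let (ui_tx, ui_rx) = channel();
    // The client passes a closure, which still has to capture a `Sender`.
    let watch = register_watch_2(move |ev| {
        let _ = ui_tx.send(ev);
    });
    // Simulate the system invoking the two callbacks.
    watch.key_down.send(PrimitiveKeyboardEvent::Pressed('a')).unwrap();
    watch.key_up.send(PrimitiveKeyboardEvent::Released('a')).unwrap();
    // Dropping both senders lets the library's thread finish.
    let Watch { key_down, key_up, worker } = watch;
    drop((key_down, key_up));
    worker.join().unwrap();
    assert_eq!(ui_rx.recv().unwrap(), PrimitiveKeyboardEvent::Pressed('a'));
    assert_eq!(ui_rx.recv().unwrap(), PrimitiveKeyboardEvent::Released('a'));
}
```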
By doing this, we have reintroduced our original problem. It’s actually slightly worse: in our original snippets, the developer could decide when to launch threads, while in the new version, the third-party library makes that decision, removing control from the developer.
Redesigning the use of `Sender`, v2
Fair enough. In this case, let’s make it possible to share the closure between threads and/or callbacks. For this purpose, we need to be able to clone it, so we need to put the closure behind an `Rc` or an `Arc`.
- If we pick `Rc`, we lose the ability to dispatch the closure to other threads, so we have made the situation even worse.
- If we pick `Arc`, we also need the closure to be `Sync`. Suddenly, we have made our closure extremely weak. In particular, the closure cannot contain any `Sender`, so we lose the ability to talk to threads.
Either way, it’s game over.
Does this really happen?
Yes, I am confronted with this all the time in Project Link. This piece of software watches for inputs from any number of devices, on behalf of any number of clients.
In Project Link, just waiting for an alarm clock to fire requires traversing two abstraction barriers (high-level API and driver-level API). Prior to writing `transformable_channels` (which is a prototype for this pre-RFC), each call to the corresponding method caused us to spawn one thread per watching client. Multiply this by 50 or 100 scripts running on the system and we quickly end up with thousands of threads, exhausting the limited resources of embedded devices.
Suggestion
I would like to suggest the following extension to `std::sync::mpsc`.
```rust
use std::sync::mpsc::Sender;

/// A sender that supports `map`, `filter`, ...
///
/// This trait cannot be made into a trait object, but we recover the ability
/// to manipulate trait objects below.
pub trait TransformableSender<T>: Clone {
    /// Apply a transformation to all values prior to sending them.
    ///
    /// Function `f` is executed at the Sender. Any panic in `f` will
    /// panic the sending thread.
    fn map<F, U>(&self, f: F) -> MappedSender<U, F, Self>
        where F: Fn(U) -> T + Send + Sync {
        // ...
    }

    /// Before sending a value, apply a transformation or discard
    /// the value.
    ///
    /// Function `f` is executed at the Sender. Any panic in `f` will
    /// panic the sending thread.
    fn filter_map<F, U>(&self, f: F) -> FilterMappedSender<U, F, Self>
        where F: Fn(U) -> Option<T> + Send + Sync {
        // ...
    }

    /// Accept or discard values prior to sending them.
    ///
    /// Function `f` is executed at the Sender. Any panic in `f` will
    /// panic the sending thread.
    fn filter<F>(&self, f: F) -> FilteredSender<F, Self>
        where F: Fn(&T) -> bool + Send + Sync {
        // ...
    }
}

/// Utility trait used to allow defining trait objects for `TransformableSender`.
pub trait SenderTrait<T> {
    fn send(&self, value: T) -> Result<(), ()>;
}

/// Any `Sender` is both `TransformableSender` and `SenderTrait`.
impl<T> TransformableSender<T> for Sender<T> { }
impl<T> SenderTrait<T> for Sender<T> {
    // ...
}

/// Extend trait objects for `SenderTrait<T>` into `TransformableSender<T>`.
// Note: this requires `Box<dyn SenderTrait<T>>: Clone`, e.g. through a
// boxed-clone method on `SenderTrait`.
impl<T> TransformableSender<T> for Box<dyn SenderTrait<T>> { }

// Similarly, instances of `MappedSender`, `FilterMappedSender` and `FilteredSender`
// should implement `SenderTrait` and `TransformableSender`.
```
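To experiment with the shape of this API outside std, its core can be approximated in safe Rust, provided that consumers accept the trait rather than a concrete `Sender`. This is a sketch under my own assumptions, not the prototype crate’s code: it keeps the proposal’s names (`SenderTrait`, `TransformableSender`, `MappedSender`, `FilteredSender`) and `send`’s `Result<(), ()>`, but simplifies the generics, makes `SenderTrait` a supertrait of `TransformableSender`, omits `filter_map` and the `Send + Sync` bounds, and leaves out the boxed `TransformableSender` impl.

```rust
use std::sync::mpsc::{channel, Sender};

/// Object-safe part of the proposal: something values can be sent into.
pub trait SenderTrait<T> {
    fn send(&self, value: T) -> Result<(), ()>;
}

impl<T> SenderTrait<T> for Sender<T> {
    fn send(&self, value: T) -> Result<(), ()> {
        // Call the inherent `Sender::send`, discarding the error payload.
        Sender::send(self, value).map_err(|_| ())
    }
}

/// Converts each value with `f` (on the sending thread) before forwarding it.
#[derive(Clone)]
pub struct MappedSender<F, S> {
    f: F,
    inner: S,
}

impl<U, T, F: Fn(U) -> T, S: SenderTrait<T>> SenderTrait<U> for MappedSender<F, S> {
    fn send(&self, value: U) -> Result<(), ()> {
        self.inner.send((self.f)(value))
    }
}

/// Forwards only the values accepted by `f`; rejected values are dropped.
#[derive(Clone)]
pub struct FilteredSender<F, S> {
    f: F,
    inner: S,
}

impl<T, F: Fn(&T) -> bool, S: SenderTrait<T>> SenderTrait<T> for FilteredSender<F, S> {
    fn send(&self, value: T) -> Result<(), ()> {
        if (self.f)(&value) {
            self.inner.send(value)
        } else {
            Ok(()) // in this sketch, a filtered-out value is not an error
        }
    }
}

/// Adapter methods, available on any cloneable `SenderTrait`.
pub trait TransformableSender<T>: SenderTrait<T> + Clone {
    fn map<U, F: Fn(U) -> T>(&self, f: F) -> MappedSender<F, Self> {
        MappedSender { f, inner: self.clone() }
    }
    fn filter<F: Fn(&T) -> bool>(&self, f: F) -> FilteredSender<F, Self> {
        FilteredSender { f, inner: self.clone() }
    }
}

impl<T> TransformableSender<T> for Sender<T> {}
impl<U, T, F, S> TransformableSender<U> for MappedSender<F, S>
where
    F: Fn(U) -> T + Clone,
    S: SenderTrait<T> + Clone,
{
}
impl<T, F, S> TransformableSender<T> for FilteredSender<F, S>
where
    F: Fn(&T) -> bool + Clone,
    S: SenderTrait<T> + Clone,
{
}

fn main() {
    let (tx, rx) = channel::<String>();
    // Each adapter wraps the previous one, so `filter` (added last) runs first
    // on the raw `u32`, and `map` then formats the accepted values.
    let evens = tx.map(|n: u32| format!("#{}", n)).filter(|n: &u32| n % 2 == 0);
    for n in 1..=4 {
        evens.send(n).unwrap();
    }
    // Adapted senders can also be used behind a trait object.
    let boxed: Box<dyn SenderTrait<u32>> = Box::new(evens.clone());
    boxed.send(6).unwrap();
    drop((tx, evens, boxed)); // close every sender so `rx.iter()` terminates
    let got: Vec<String> = rx.iter().collect();
    assert_eq!(got, ["#2", "#4", "#6"]);
}
```

Two design points show up even in this small sketch: because each adapter wraps the sender it was built from, adapters added later in a chain run first (above, `filter` sees the raw `u32` before `map` formats it), and here a value rejected by `filter` is reported as successfully sent.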
A prototype crate for this is available here.
Can this be implemented in a lib?
More or less. With a large dose of `unsafe` code, one could implement something like `Receiver` and `Sender` that support `map` and `transform`.
Edit: Added an example.
Edit: Added “Can this be implemented in a lib?”
Edit: Extended the discussion based on remarks by @eddyb, revised the API based on the prototype.