[Pre-RFC] Transformations on channels

Each time I use channels, I am frustrated by a lack of a map function that would let me hand over a Sender for one type and rcv stuff in another type. Right now, the only way of doing this is to either break abstraction and design with callbacks, or create one thread per channel(), which is extremely annoying.

The problem at hand

Consider an event loop, defined as follows:

let (tx, rx) = channel();
thread::spawn(move || {
  for event in rx {
    match event {
      UIEvent::KeyboardEvent(ev) => { ... },
      UIEvent::MouseEvent(ev) => { ... },
      ...
    }
  }
});

Now, imagine a system library that can watch for keyboard events, with the following signature:

impl ThirdPartyLibrary {
  fn register_watch(&self, on_event: Sender<PrimitiveKeyboardEvent>) -> ...;
}

How can I interact with this library? Well, at the moment, my only way is to fire another thread, as follows:

let (tx2, rx2) = channel();
let tx = tx.clone(); // That's the tx for my event loop, see above.
thread::spawn(move || {
  for ev in rx {
    match tx.send(UIEvent::KeyboardEvent(ev) {
      Ok(_) => {},
      Err(_) => return, // Cleanup if nobody is listening anymore.
    }
  }
});

third_party_library.register_watch(tx2);

Instead, I would have preferred writing

third_party_library.register_watch(tx.clone().map(|ev| UIEvent::KeyboardEvent(ev)));

Now, let’s assume that the situation is slightly more complicated and that our system needs to handle several keyboards. Now, we need to label each keyboard with a unique key.

For the moment, the only solution is to fire one thread per keyboard, i.e.

let key = ...;
let (tx3, rx3) = channel();
let tx = tx.clone(); // That's the tx for my event loop, see above.
thread::spawn(move || {
  for ev in rx {
    match tx.send(UIEvent::KeyboardEvent(key, ev) {
      Ok(_) => {},
      Err(_) => return, // Cleanup if nobody is listening anymore.
    }
  }
});

third_party_library.register_watch(tx3);

Instead, I would have preferred to write

let key = ...;
third_party_library.register_watch(tx.clone().map(move |ev| UIEvent::KeyboardEvent(key, ev)));

Why is this a problem?

  1. Threads are not free. In a large system, we can easily end up with hundreds of threads fired just for the sake of adapting or annotating data.
  2. This is pretty non-intuitive and verbose.

Redesigning the use of Sender

At this stage, one can imagine that the problem comes from the user of Sender. Perhaps it would be better to use Fn(T)?

Surely, we could write

third_party_library.register_watch(move |event| {
  /* somehow dispatch to target*/
});

Let’s redesign the library accordingly:

impl ThirdPartyLibrary {
  fn register_watch_2<F>(&self, on_event: F) -> ...
    where F: Fn(PrimitiveKeyboardEvent) + Send;
}

In simple cases, this works nicely. However, we have introduced a major limitation: closures are pretty hard to share between threads or even between further closures. In our example, for instance, maybe ThirdPartyLibrary relies upon several system callbacks, one that is triggered when the user presses a key and one when the user releases the key, and perhaps one that is triggered when the toolkit decides to start auto-repeating the keypress.

How can we implement this? Well, since our system callbacks cannot share a handle to on_event, there is a single solution: we need to fire an event loop in charge of firing on_event, and make sure that the system callbacks dispatch messages to this event loop.

By doing this, we have repeated our original problem. It’s actually slightly worse, since in our original snippets, the developer could decide when to launch threads, while in the new version, the third-party library takes the decision, hence removing control from the developer.

Redesigning the use of Sender, v2

Fair enough, in this case, let’s make it possible to either share the closure between threads and/or callbacks. For this purpose, we need to be able to clone it. So we need to put the closure behind a Rc or a Arc.

  • If we pick Rc, we lose the ability to dispatch the closure to other threads, so we have made the situation even worse.
  • If we pick Arc, we also need the closure to be Sync. Suddenly, we have made our closure extremely weak. In particular, the closure cannot contain any Sender, so we lose the ability to talk to threads.

In either case, we have reached game over.

Does this really happen?

Yes, I am confronted to this all the time in Project Link. This piece of software is watching for inputs from any number of devices, on behalf of any number of clients.

In Project Link, just waiting for an alarm clock to fire requires traversing two abstraction barriers (high-level API and driver-level API). Prior to writing transformable_channels (which is a prototype for this pre-rfc), each call to the corresponding method caused us to fire one thread per watching client. Multiply this by 50 or 100 scripts running on the system and we will quickly end up with thousands of threads, exhausting the limited resources of embedded devices.

Suggestion

I would like to suggest the following extension to std::sync::channel::mpsc.

/// A sender that supports `map`, `filter`, ...
///
/// This trait cannot be converted into an object but we recover the ability to
/// manipulate trait objects below.
pub trait TransformableSender<T>: Clone {
  /// Apply a transformation to all values prior to sending them.
  ///
  /// Function `f` is executed at the Sender. Any panic in `f` will
  /// panic the sending thread.
  fn map<F, U>(&self, f: F) -> MappedSender<U, F, Self>
    where F: Fn(U) -> T + Send + Sync {
    // ...
  }

  /// Before sending a value, apply a transformation or discard
  /// the value.
  ///
  /// Function `f` is executed at the Sender. Any panic in `f` will
  /// panic the sending thread.
  fn filter_map<F, U>(&self, f: F) -> FilterMappedSender<U, F, Self>
    where F: Fn(U) -> Option<T> + Send + Sync {
    // ...
  }

  /// Accept or discard values prior to sending them.
  ///
  /// Function `f` is executed at the Sender. Any panic in `f` will
  /// panic the sending thread.
  fn filter<F>(&self, f: F) -> FilteredSender<F, Self>
    where F: Fn(&T) -> bool + Send + Sync {
    // ...
  }
}

/// Utility trait used to allow defining trait objects for `TransformableSender`
pub trait SenderTrait<T> {
  fn send(&self)  -> Result<(), ()>;
}

/// Any `Sender` is both `TransformableSender` and `SenderTrait`.
impl<T> TransformableSender<T> for Sender<T> { }
impl<T> SenderTrait<T> for Sender<T> {
  // ...
}

/// Extend trait objects for `SenderTrait<T>` into `TransformableSender<T>`.
impl<T> TransformableSender<T> for Box<SenderTrait<T>> { }

// Similarly, instances of `MappedSender`, `FilterMappedSender`, `FilteredSender` should
// implement `SenderTrait` and `TransformableSender`.

A prototype crate for this is available here.

Can this be implemented in a lib?

More or less. With a large dose of unsafe code, one could implement something like Receiver and Sender that supports map and transform.

edit Added an example.

edit “Can this be implemented in a lib?”

edit Extended discussion based on remarks by @eddyb, revised API based on prototype.

1 Like

This is easy to do outside of std:

use std::sync::mpsc;

struct MapRecv<T,U,F: Fn(T) -> U> {
    recv: mpsc::Receiver<T>,
    f: F,
}

impl<T,U,F: Fn(T) -> U> MapRecv<T,U,F> {
    fn recv(&self) -> Result<U, mpsc::RecvError> {
        self.recv.recv().map(|v|(self.f)(v))
    }
}

fn main() {
    use std::thread;
    
    let (send, recv) = mpsc::channel();
    let recv=MapRecv{recv:recv,f:|v|v+1};
    let handle = thread::spawn(move || {
        println!("{}",recv.recv().unwrap());
    });
    
    send.send(1u8).unwrap();

    handle.join().unwrap();
}

Receiver can also be converted to an Iterator, which can be mapped, filtered, and so on. This isn’t perfect, because you lose the ability to dispatch over the error value or perform a try_recv(), but it seems to cover many of these use cases.

1 Like

@jethrogb and @withoutboats, the solutions you give me work if and only if I have a dedicated thread specifically for the rx. Having to fire a thread just for a map or filter-like transformation is pretty harsh. That’s exactly what my pre-RFC intends to solve.

Receiver can only be owned by a single thread, so I’m not sure how my implementation above puts extra restrictions on that.

1 Like

Receiver can only be owned by a single thread, so I'm not sure how my implementation above puts extra restrictions on that.

It doesn't, but it requires spawning a thread. And threads aren't free.

I've wanted this aswell. I think a better solution would be to make Sender<T> implement Extend<T> and add some methods to Extend that mirror Iterator's methods. eg.

pub struct ExtendMap<'a, E, F> {
    extending: &'a mut E,
    f: F,
}

trait Extend<A> {
    fn map<'a, B, F>(&'a mut self, f: F) -> ExtendMap<'a, Self, F>
        where F: FnMut(B) -> A
    {
        ExtendMap {
            extending: self,
            f: f,
        }
    }
}

impl<A, B, E, F> Extend<B> for ExtendMap<E, F>
    where E: Extend<A>,
          F: FnMut(B) -> A
{
    fn extend<I>(&mut self, iterator: I)
        where I: IntoIterator<Item=B>
    {
        self.extending.extend(iterator.into_iter().map(self.f))
    }
}

And then similarly for filter_map, filter, or any of the other Iterator methods that return another Iterator.

@jethrogb I have added a large example in my OP. This should make the problem clearer.

If you can make this work, I would be fine with it, too. I’m pretty sure that F needs to implement Send, though.

Ah, I see. From your OP it wasn’t clear to me that you wanted to have multiple Senders that accept different types mapping to a receiver with the same type.

I think you can make this work by having the library use the following as a parameter:

trait Sendable<T>: Send + 'static + Clone {
    fn send(&self, t: T) -> Result<(), mpsc::SendError<()>>;
}

Working example:

use std::sync::{Arc,mpsc};
use std::marker::PhantomData;
use std::thread;

struct MapSend<T,U,F: Fn(T) -> U + Sync> {
    send: mpsc::Sender<U>,
    f: Arc<F>,
    _t: PhantomData<T>,
}

impl<T,U,F: Fn(T) -> U + Sync> Clone for MapSend<T,U,F> {
    fn clone(&self) -> Self {
        MapSend {
            send: self.send.clone(),
            f: self.f.clone(),
            _t: PhantomData
        }
    }
}

impl<T,U,F: Fn(T) -> U + Sync> MapSend<T,U,F> {
    fn send(&self, t: T) -> Result<(), mpsc::SendError<()>> {
        self.send.send((self.f)(t)).map_err(|_|mpsc::SendError(()))
    }
}

trait Sendable<T>: Send + 'static + Clone {
    fn send(&self, t: T) -> Result<(), mpsc::SendError<()>>;
}

impl<T: Send + 'static,U: Send + 'static,F: Fn(T) -> U + Sync + Send + 'static> Sendable<T> for MapSend<T,U,F> {
    fn send(&self, t: T) -> Result<(), mpsc::SendError<()>> {
        MapSend::send(self,t)
    }
}

impl<T: Send + 'static> Sendable<T> for mpsc::Sender<T> {
    fn send(&self, t: T) -> Result<(), mpsc::SendError<()>> {
        mpsc::Sender::send(self,t).map_err(|_|mpsc::SendError(()))
    }
}

fn send_thread<T: Send + 'static + Copy,S: Sendable<T>>(v: T, s: S) -> (thread::JoinHandle<()>,thread::JoinHandle<()>) {
    let s2=s.clone();
    (thread::spawn(move || {
        s2.send(v).unwrap();
    }),
    thread::spawn(move || {
        s.send(v).unwrap();
    }))
}

fn send_no_thread<T: Send + 'static,S: Sendable<T>>(v: T, s: S) {
    s.send(v).unwrap();
}

fn main() {
    let (send, recv) = mpsc::channel();

    let send_u8_1=MapSend{send:send.clone(),f:Arc::new(|v|(v as u32)|0x1000_0000),_t:PhantomData};
    let send_u8_2=send_u8_1.clone();
    let send_u32_map_1=MapSend{send:send.clone(),f:Arc::new(|v|v+1),_t:PhantomData};
    let send_u32_map_2=send_u32_map_1.clone();
    let send_u32_orig_1=send;
    let send_u32_orig_2=send_u32_orig_1.clone();
    
    send_thread(0x1u8,send_u8_1);
    send_thread(0x7u32,send_u32_map_1);
    send_thread(0xbu32,send_u32_orig_1);
    send_no_thread(0x1u8,send_u8_2);
    send_no_thread(0x7u32,send_u32_map_2);
    send_no_thread(0xbu32,send_u32_orig_2);

    for _ in 0..9 {
        println!("{:x}",recv.recv().unwrap());
    }
}

I had to fudge a little with the return type of send but I don’t think that’s a big deal. Note that while closures aren’t Clone, certain closures are Sync so you can share them with an Arc.

I don't understand this comment that you and @Yoric have both made. If you want to receive a value on a thread, that thread has to exist, nothing about the interface changes that. I think I am badly misunderstanding what you mean, could you restate?

Mmmmh… that looks good.

Your snippet won’t let me create trait objects, but I have derived from your idea a version that can and that seems to work nicely with Project Link. I’ll publish once I have finished documenting and writing tests.

Thanks a lot!

1 Like

I have added a large example in the OP. This should make things clearer.

Tests look good. Initial documentation may be found here. Thanks again for the help, @jethrogb.

Still, I think that this would make a pretty useful entry in the stdlib.

1 Like

I see. The key thing I was missing here is that map is inverted: to convert Sender<T> to Sender<U>, you map Fn(U) -> T instead of Fn(T) -> U.

The reason this is needed is that a dependency takes a sending end of a channel as an argument, instead of returning values and allowing you to process them however you like. Why would someone want to require using a channel as a part of their library’s public API?

1 Like

If that API takes a FnMut(T) instead, i.e. “internal iterators” (where Iterator is “external”), it can work with Sender<T> or Sender<U> or anything else you might want to use (like a single-threaded VecDeque<T>).

I realize that. However, but things get hairy as soon as the same callback needs to be shared between several threads and/or yet further callbacks (which is my use case).

For one thing, this means that you need a Arc<FnMut(T) + Sync>. I may be wrong, but I suspect that FnMut(T) + Sync doesn’t make much sense. Maybe we can fallback to something like Arc<Mutex<Box<FnMut(T)>>>, I’m not sure, but that’s much more complex, plus, well, we now have a Mutex, which is often annoying.

What if we restrict ourselves to Arc<Fn(T) + Sync>? Well, it’s a bit simpler, but we are still hit by Sync. If our closure needs to send a message to a Sender<?>, for instance, the closure cannot implement Sync. Unless I’m missing something, this is a dead-end for my use case (which does require dispatching stuff to an event loop through a Sender).

Oh, you just want to be able to clone it then - sadly you can’t do that with closures yet :(.

This topic was automatically closed 90 days after the last reply. New replies are no longer allowed.