Going from AsyncWrite to AsyncRead

futures_util::io::copy provides an efficient way to copy an AsyncRead into an AsyncWrite. I recently found myself in need of the other way around: I need the AsyncWrite interface to expose an AsyncRead.

Well, not exactly. I needed a way to go from AsyncWrite to Stream<Result> in axum, to stream big ZIP files. The corresponding issue on GitHub sparked interest from the axum developers, so I implemented a first draft. The usage currently looks like this:

```
// Stream implements: Stream<Item = Result<Bytes>>
let stream = Stream::new(|w: Writer| async move {
    let mut w = std::pin::pin!(w);
    write_all(&mut w, &[42]).await?;
    write_all(w, &[42]).await
});
```

Even though I still think this would be very valuable in the axum project, the abstraction level feels wrong. The implementation doesn't use any axum-related code, just Stream and Bytes, so it could be used on the client side too. IMO, such a fundamental building block shouldn't live in a small standalone project, as that contributes to projects having thousands of small dependencies, which makes supply-chain security a nightmare. Still, I struggled to find a project where it would fit.

I then asked myself whether the official futures-util crate could add the difficult (unsafe) part of this conversion by providing an AsyncRead built from a closure, which gets an AsyncWrite and returns a Future<Output = std::io::Result<()>> that resolves when writing finishes. This would fit, as it doesn't introduce any dependencies, and it is a zero-cost abstraction without any intermediate buffers.

There are several similar utilities in tokio-util.

I've seen and used ReaderStream successfully together with axum. Unfortunately, the tokio dependency is a showstopper, since the code is used in the browser too, for uploads... But an AsyncRead → Stream<Item = Bytes> adapter is rather simple and was already discussed in futures-util (issue).

I have not seen a helper in tokio-util which allows writing to an AsyncWrite in order to get an AsyncRead... I need that to write to AsyncZipWriter... In a first attempt, I used ReaderStream with a tokio-free alternative to tokio_util::io::simplex and spawned a background task to fill the writer... But this involves spawning a task and using a queue, and it's a mess to handle I/O errors in the spawned task correctly... Using ? there is almost certainly wrong, as we need to kill the connection so the receiver knows that things went south and doesn't just get EOF...

With the proposed util, it could be implemented runtime-independently, without allocations and even without locking or atomics, which should enable aggressive compiler optimizations... Such encoders (which is what they basically are) could even be used in no-std/no-alloc environments.

The cool thing about this abstraction is that polling the stream also drives the inner future whenever progress needs to be made...

It's certainly an interesting abstraction. I was going to say that we could add one to tokio-util, and that you could ask the futures-util maintainers about that too. But:

Thinking a bit more about it, it does suffer from one challenge: when you stop reading, the internal future just stops running. If the internal future holds a mutex or semaphore or similar, then you could get a deadlock. That would be another instance of the classic FutureLock bug. I'm not sure I want to add any more abstractions with this problem.

Spawning solves that problem, though.

Does the FutureLock bug really apply here? If the Stream is dropped, so is the future, along with any mutexes it currently holds... I think it's only a problem if we have clients that stop polling for new data...

But if an HTTP client could pause the polling of our abstraction's AsyncRead, that would be a problem in all async code that uses mutexes... It would be a weakness in the web server implementation.

If the user uses things like select! within our future, or outside of it (on our AsyncRead), they have to know what they're doing until we get undroppable types or a similar language solution. I don't see how this abstraction makes things worse. If the user holds a mutex in the inner future from start to end, that might be an actual requirement.

Or do you mean/think that it's not fundamentally wrong to poll the futures behind an AsyncRead only when poll_read is called, without obtaining the next future from the AsyncRead and polling it together with the rest?

Basically the problem is this scenario:

```
let stream = Stream::new(|w: Writer| async move {
    tokio::join!(async {
        let mut w = std::pin::pin!(w);
        write_all(&mut w, &[42]).await?;
        write_all(w, &[42]).await
    }, async {
        some_other_async().await;
    });
});
```

In this code, execution of the code inside some_other_async() may randomly stop at any .await point.

What do you mean by 'stop executing'? E.g. returning Poll::Pending because of a mutex? The future at position 1 can still continue, right? Or is the problem that some_other_async might not continue for some time until there is space?

Edit: Or did you mean that if future 1 returns an io::Error, future 2 doesn't continue? That is not possible with join!, since the error of future 1 can only be propagated once some_other_async has finished too...

Did you think about this?

```
let other = std::pin::pin!(some_other_async());
let stream = Stream::new(|w: Writer| async move {
    tokio::join!(async {
        let mut w = std::pin::pin!(w);
        write_all(&mut w, &[42]).await?;
        write_all(w, &[42]).await
    }, async {
        other.await;
    });
});
consume(stream).await;
// Here, `other` cannot be unfinished, even if there is still a ref available
```

If we just make sure that we always finish the future that is given to Stream::new, we should be good... In contrast, if it were a Sender with Item = Result<(), io::Error>, the future wouldn't have the possibility to send an Err and still not be done... It can only finish, via Err(io::Error), to propagate errors...