Using coroutines for a sans-io parser

I've been experimenting with sans-io parsers, and they all end up as hand-written state machines. So I decided to give the unstable coroutines feature a go, since it generates the state machines for me.
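For comparison, here is a minimal sketch of what the hand-written version tends to look like on stable Rust. `Step`, `U32Parser` and `parse_in_chunks` are names made up for this illustration and are not part of the code below; the point is only that the "where was I?" bookkeeping (`filled`) has to be tracked by hand.

```rust
/// What the parser wants the caller to do next.
#[derive(Debug, PartialEq)]
enum Step {
    /// Feed more input; this many bytes are still missing.
    NeedBytes(usize),
    /// Parsing finished with this value.
    Done(u32),
}

/// Hand-written state machine that parses a little-endian `u32`.
/// It never performs I/O itself; the caller pushes bytes in.
#[derive(Default)]
struct U32Parser {
    collected: [u8; 4],
    filled: usize,
}

impl U32Parser {
    /// Consume as much of `input` as is needed and report what happens next.
    fn feed(&mut self, input: &[u8]) -> Step {
        let take = input.len().min(4 - self.filled);
        self.collected[self.filled..self.filled + take].copy_from_slice(&input[..take]);
        self.filled += take;
        if self.filled == 4 {
            Step::Done(u32::from_le_bytes(self.collected))
        } else {
            Step::NeedBytes(4 - self.filled)
        }
    }
}

/// Drive the parser with chunks that arrive one at a time.
fn parse_in_chunks(chunks: &[&[u8]]) -> Option<u32> {
    let mut parser = U32Parser::default();
    for chunk in chunks {
        if let Step::Done(value) = parser.feed(chunk) {
            return Some(value);
        }
    }
    None
}
```

Every additional field or nested type adds more explicit state like this, which is the part the coroutine transform is supposed to take over.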

It works pretty well! However, using coroutines recursively is a bit painful. It might be useful to look into a .yield postfix that automatically yields from the outer coroutine whenever the inner coroutine returns CoroutineState::Yielded (provided the Resume and Yield types are compatible).

Feedback is very welcome!

(Also as a gist)

#![feature(coroutines, coroutine_trait)]

use futures_lite::{AsyncReadExt, AsyncSeekExt};
use std::{
    io::{Read, Seek},
    ops::{Coroutine, CoroutineState},
};

/// A type that can be parsed from a file.
pub trait Parser: Sized {
    /// Read this type at the given position.
    /// 
    /// Users should use the [`SyncParser`] and/or the [`AsyncParser`], instead
    /// of this trait. This trait is only for implementers.
    /// 
    /// After the coroutine is finished, `position` **must** be at the
    /// end of the type. It's not unsafe, but your parser will fail at some
    /// point.
    fn read_at_inner(
        position: &mut usize,
    ) -> impl Coroutine<IoBuffer, Yield = IoBuffer, Return = Self>;
}

/// A parser for the FLAC file format, currently only parses the magic number.
impl Parser for Flac {
    fn read_at_inner(position: &mut usize) -> impl Coroutine<IoBuffer, Yield = IoBuffer, Return = Self> {
        #[coroutine]
        static |mut buffer: IoBuffer| {
            // This would be much nicer:
            // let magic = u32::read_at_inner(position).yield;
            let mut cor = std::pin::pin!(u32::read_at_inner(position));
            let magic = loop {
                match cor.as_mut().resume(buffer) {
                    CoroutineState::Yielded(buf) => buffer = yield buf,
                    CoroutineState::Complete(t) => break t,
                }
            };

            // `u32` is parsed with `from_le_bytes`, so convert back the same way.
            assert_eq!(&magic.to_le_bytes(), b"fLaC", "Magic is wrong!");
            Flac { magic }
        }
    }
}

/// Trait that implements a synchronous interface for [`Parser`] types.
/// 
/// Automatically implemented for every type.
pub trait SyncParser: Parser {
    fn read_at(mut reader: impl Read + Seek, position: &mut usize) -> Self {
        let mut buffer = IoBuffer {
            buffer: vec![0; 16],
            buffer_start: *position,
            request: (*position, 0),
        };
        let mut cor = std::pin::pin!(Self::read_at_inner(position));
        loop {
            reader
                .seek(std::io::SeekFrom::Start(buffer.request.0 as u64))
                .unwrap();
            reader.read_exact(&mut buffer.buffer).unwrap();
            buffer.buffer_start = buffer.request.0;
            match cor.as_mut().resume(buffer) {
                CoroutineState::Yielded(buf) => buffer = buf,
                CoroutineState::Complete(t) => break t,
            }
        }
    }
}
impl<T: Parser> SyncParser for T {}

/// Trait that implements an asynchronous interface for [`Parser`] types.
/// 
/// Automatically implemented for every type.
pub trait AsyncParser: Parser {
    #[allow(async_fn_in_trait, reason = "Just a demo")]
    async fn read_at(
        mut reader: impl AsyncReadExt + AsyncSeekExt + Unpin,
        position: &mut usize,
    ) -> Self {
        let mut buffer = IoBuffer {
            buffer: vec![0; 16],
            buffer_start: *position,
            request: (*position, 0),
        };
        let mut cor = std::pin::pin!(Self::read_at_inner(position));
        loop {
            reader
                .seek(std::io::SeekFrom::Start(buffer.request.0 as u64))
                .await
                .unwrap();
            reader.read_exact(&mut buffer.buffer).await.unwrap();
            buffer.buffer_start = buffer.request.0;
            match cor.as_mut().resume(buffer) {
                CoroutineState::Yielded(buf) => buffer = buf,
                CoroutineState::Complete(t) => break t,
            }
        }
    }
}
impl<T: Parser> AsyncParser for T {}

// Supporting types

type AbsolutePosition = usize;
type BytesRequested = usize;

/// A read buffer used by the [`Parser`] coroutine to read bytes.
pub struct IoBuffer {
    /// The internal buffer.
    buffer: Vec<u8>,
    /// The absolute position in the file for which the buffer is a match.
    buffer_start: AbsolutePosition,
    /// When not enough bytes are available set the position and amount of bytes needed.
    pub request: (AbsolutePosition, BytesRequested),
}

impl IoBuffer {
    /// Read `N` bytes at position.
    /// 
    /// Will set the request field to the required amount if not enough bytes are available.
    pub fn read_bytes<const N: usize>(&mut self, position: &mut usize) -> Option<[u8; N]> {
        if *position < self.buffer_start {
            self.request.0 = *position;
            self.request.1 = N;
            return None;
        }
        let diff = *position - self.buffer_start;
        if diff + N > self.buffer.len() {
            self.request.0 = *position;
            self.request.1 = N;
            // Without this return the slice below would panic out of bounds.
            return None;
        }
        Some(<[u8; N]>::try_from(&self.buffer[diff..diff + N]).expect("Unreachable!"))
    }
}

/// Implementation for u32.
/// 
/// The library would contain implementations for all primitive (and some "complex") types.
/// So a user would almost never have to implement the loop themselves.
impl Parser for u32 {
    fn read_at_inner(position: &mut usize) -> impl Coroutine<IoBuffer, Yield = IoBuffer, Return = Self> {
        #[coroutine]
        |mut buffer: IoBuffer| {
            let bytes = loop {
                if let Some(bytes) = buffer.read_bytes::<4>(position) {
                    break bytes;
                }
                buffer = yield buffer;
            };
            *position += 4;
            u32::from_le_bytes(bytes)
        }
    }
}

pub struct Flac {
    pub magic: u32,
}

This is the operation that e.g. Python calls yield from, which is semantically the same operation as .await (specialized to the async resume and yield types).

Resume arguments make it a bit complicated. I prototyped a macro implementation years ago, which also has to take a binding so it can update the resume argument in place (playground). Python avoids some of the issue because the very first invocation of a Python generator does not take a resume argument.

EDIT: Actually, looking at that again I just realized it's only working because the resume arg was Copy. Fixing that means that it doesn't really need the updateable binding, it just needs a first-resume-arg and once the inner coroutine completes there is no resume arg available (unless the coroutine returns it as part of its completion) (updated playground).
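To make the shape of that loop concrete without nightly, here is a rough sketch on stable Rust. It is not the playground macro: `State`, `Resumable`, `yield_from` and `Collect` are stand-ins invented for this illustration (pinning is left out), and the closure `yield_to_caller` plays the role of the outer coroutine's `yield` expression.

```rust
/// Mirrors the shape of `std::ops::CoroutineState`, redefined here so the
/// sketch builds on stable.
enum State<Y, R> {
    Yielded(Y),
    Complete(R),
}

/// Hypothetical stand-in for the unstable `Coroutine` trait (no pinning).
trait Resumable<Arg> {
    type Yield;
    type Return;
    fn resume(&mut self, arg: Arg) -> State<Self::Yield, Self::Return>;
}

/// The loop a `yield from` would roughly expand to. `first_arg` is the resume
/// argument the outer coroutine currently holds; `yield_to_caller` hands a
/// value up and comes back with the next resume argument.
fn yield_from<Arg, C: Resumable<Arg>>(
    inner: &mut C,
    first_arg: Arg,
    mut yield_to_caller: impl FnMut(C::Yield) -> Arg,
) -> C::Return {
    let mut arg = first_arg;
    loop {
        match inner.resume(arg) {
            State::Yielded(y) => arg = yield_to_caller(y),
            // `arg` was moved into `resume`, so nothing is left over here
            // unless the inner computation hands it back in its return value.
            State::Complete(r) => return r,
        }
    }
}

/// Example inner computation: collects non-`Copy` chunks until it has seen
/// at least `want` bytes, yielding how many are still missing.
struct Collect {
    want: usize,
    seen: Vec<u8>,
}

impl Resumable<Vec<u8>> for Collect {
    type Yield = usize;
    type Return = Vec<u8>;

    fn resume(&mut self, chunk: Vec<u8>) -> State<usize, Vec<u8>> {
        self.seen.extend(chunk);
        if self.seen.len() >= self.want {
            State::Complete(std::mem::take(&mut self.seen))
        } else {
            State::Yielded(self.want - self.seen.len())
        }
    }
}
```

Because the resume argument here is a `Vec<u8>` and not `Copy`, the loop only type-checks in this form: one argument goes in per `resume`, and none remains after completion.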

That (updated) macro is really nice!

I agree that it gets complicated with resume arguments and it's definitely not something that needs to be in the mvp of coroutines, but would be a very nice addition later on.

So, without resume arguments, it's not really a (semi[1])coroutine; it's a generator instead[2].

I'm personally still fond of the "magic argument rebind" style of coroutines[3], which makes a yield from even more interesting. I also think coroutines are just "FnPin" shaped rather than something more involved. But that's a bikeshed far down the road. Generators which can yield but take a (maybe pinned?) Iterator are first on the agenda, due to being a simpler design space and something we'd want syntax for anyway.


  1. I'm the only person in the Rust space who really cares about this distinction: a semicoroutine can only yield back to its caller, but a full coroutine can yield control flow to any coroutine.

    • subroutine: single entry (call), single exit (return)
    • semicoroutine: multi entry (resume), single exit (yield)
    • coroutine: multi entry (resume), multi exit (yield to)

  2. Generators are a subset of coroutines, so it's a coroutine. But the useful distinction is that a generator doesn't have a resume argument, you just poll for its next output.

  3. Instead of |buf: _| loop { buf = yield buf } you could just write |buf: &mut _| loop { yield } and have the borrow of buf passed in each resume.
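As a rough approximation of footnote 3 on stable Rust, a plain `FnMut` closure already has the "fresh borrow on every resume" behaviour; what it lacks is the compiler-generated state, so the sketch below keeps its progress in captured variables. `u32_parser` is a name invented for this illustration, and returning `None` stands in for `yield`.

```rust
/// Build a resumable little-endian `u32` parser as a plain closure. Every
/// call (the "resume") receives a fresh `&mut` borrow of the caller's buffer,
/// so the buffer never has to be moved in and yielded back out.
fn u32_parser() -> impl FnMut(&mut Vec<u8>) -> Option<u32> {
    // State that a real coroutine would keep in its locals.
    let mut collected = [0u8; 4];
    let mut filled = 0;
    move |buf: &mut Vec<u8>| {
        // Take only what is still missing; leave the rest for the caller.
        let take = buf.len().min(4 - filled);
        for byte in buf.drain(..take) {
            collected[filled] = byte;
            filled += 1;
        }
        if filled == 4 {
            Some(u32::from_le_bytes(collected))
        } else {
            None // stand-in for `yield`: ask to be called again
        }
    }
}
```

A caller would refill the same `Vec<u8>` between calls and keep calling until it gets `Some`; ownership of the buffer stays with the caller the whole time.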

Just to clarify, I meant that yield from does not need to be in the mvp. I absolutely would like to have Resume arguments in the mvp. I personally also prefer the magic argument style, sadly that's not what's currently implemented.