Introduce write_at/write_all_at/read_at/read_exact_at on Windows

I wasn't talking about the cost of kernel operations, but the hardware-level cost of changing page mappings repeatedly. In my head, the "map and unmap for each read" approach ought to be eating the cost of two partial TLB invalidations and a full page table walk for the new mapping every single time you do it, and that all by itself ought to be more expensive than the in-kernel locking cost for the seek. Clearly I do not understand something about how Windows does these things, and I'm still interested in seeing a complete benchmark program that I can tinker with myself.

When I have time I'll put something together that is roughly representative of the workload I have. The things mentioned above are what is actually being used, but I understand that is not quite a benchmark, so it is less useful because of that.

1 Like

Here is a demo app:

//! ```cargo
//! [dependencies]
//! fs2 = "0.4.3"
//! libc = "0.2.151"
//! rand = "0.8.5"
//! rayon = "1.8.0"
//!
//! [target.'cfg(windows)'.dependencies]
//! winapi = "0.3.9"
//! ```

use rand::prelude::*;
use rayon::prelude::*;
use std::fs::{File, OpenOptions};
use std::hint::black_box;
use std::io::{Seek, SeekFrom};
use std::path::Path;
use std::time::Instant;
use std::{env, io};

// Size of one "sector" of the benchmark file: 1 GiB. One random read is issued per sector.
const SECTOR_SIZE: usize = 1024 * 1024 * 1024;
// Number of bytes fetched by each random read.
const CHUNK_SIZE: usize = 32;

/// Read one random CHUNK_SIZE-byte chunk from every sector in parallel and
/// return the chunks in sector order.
fn test<F>(file: F, sectors: usize) -> io::Result<Vec<[u8; CHUNK_SIZE]>>
where
    F: ReadAtSync,
{
    let mut chunks = vec![[0u8; CHUNK_SIZE]; sectors];

    chunks
        .par_iter_mut()
        .enumerate()
        .try_for_each(|(sector_index, chunk)| {
            // Pick a random CHUNK_SIZE-aligned position inside this sector.
            let chunk_index = thread_rng().gen_range(0..SECTOR_SIZE / CHUNK_SIZE);
            let read_offset = sector_index * SECTOR_SIZE + chunk_index * CHUNK_SIZE;

            file.read_at(chunk, read_offset)
        })?;

    Ok(chunks)
}

fn main() -> io::Result<()> {
    // Path to the benchmark file is the sole CLI argument.
    let path = env::args().nth(1).unwrap();

    // Number of whole sectors in the file, derived from its length.
    let sectors = (File::open(&path)?.seek(SeekFrom::End(0))? / SECTOR_SIZE as u64) as usize;

    // Variant 1: all rayon worker threads share a single file handle.
    {
        let file = OpenOptions::new()
            .read(true)
            .advise_random_access()
            .open(&path)?;
        file.advise_random_access()?;

        let start = Instant::now();
        black_box(test(file, sectors)?);
        println!(
            "Single file read {sectors} sectors in {:?}",
            start.elapsed()
        );
    }

    // Variant 2: one dedicated file handle per rayon worker thread.
    {
        let file = RayonFiles::open(&path)?;

        let start = Instant::now();
        black_box(test(file, sectors)?);
        println!(
            "Rayon files read {sectors} sectors in {:?}",
            start.elapsed()
        );
    }

    Ok(())
}

/// Strategy for filling a buffer from an absolute file offset; implementations
/// must be shareable across rayon worker threads (hence `Send + Sync`).
trait ReadAtSync: Send + Sync {
    /// Fill the buffer by reading bytes at a specific offset
    ///
    /// Errors if the read fails or the file is too short to fill `buf` completely.
    fn read_at(&self, buf: &mut [u8], offset: usize) -> io::Result<()>;
}

impl ReadAtSync for File {
    fn read_at(&self, buf: &mut [u8], offset: usize) -> io::Result<()> {
        self.read_exact_at(buf, offset as u64)
    }
}

/// One open handle per rayon worker thread, so concurrent reads never share a
/// single OS-level file object.
struct RayonFiles {
    // Indexed by `rayon::current_thread_index()`; populated in `open`.
    files: Vec<File>,
}

impl ReadAtSync for RayonFiles {
    fn read_at(&self, buf: &mut [u8], offset: usize) -> io::Result<()> {
        // Each rayon worker thread reads through the handle at its own index.
        let thread_index = match rayon::current_thread_index() {
            Some(thread_index) => thread_index,
            None => {
                return Err(io::Error::new(
                    io::ErrorKind::Other,
                    "Reads must be called from rayon worker thread",
                ));
            }
        };

        match self.files.get(thread_index) {
            Some(file) => file.read_at(buf, offset),
            None => Err(io::Error::new(
                io::ErrorKind::Other,
                "No files entry for this rayon thread",
            )),
        }
    }
}

impl RayonFiles {
    /// Open one read-only, random-access handle for every rayon worker thread.
    fn open<P>(path: P) -> io::Result<Self>
    where
        P: AsRef<Path>,
    {
        let thread_count = rayon::current_num_threads();
        let mut files = Vec::with_capacity(thread_count);

        for _ in 0..thread_count {
            let file = OpenOptions::new()
                .read(true)
                .advise_random_access()
                .open(path.as_ref())?;
            file.advise_random_access()?;
            files.push(file);
        }

        Ok(Self { files })
    }
}

/// Extension convenience trait that allows setting some file opening options in cross-platform way
trait OpenOptionsExt {
    /// Advise OS/file system that file will use random access and read-ahead behavior is
    /// undesirable, only has impact on Windows, for other operating systems see [`FileExt`]
    ///
    /// Returns `&mut Self` so the call chains like the builder methods on `OpenOptions`.
    fn advise_random_access(&mut self) -> &mut Self;
}

impl OpenOptionsExt for OpenOptions {
    /// No open-time advice exists off Windows; this is a pass-through.
    ///
    /// A single non-Windows arm replaces the previous separate Linux and macOS
    /// stubs: they were identical no-ops, and enumerating OSes explicitly left
    /// this impl without the method on every other Unix (a compile error there).
    #[cfg(not(windows))]
    fn advise_random_access(&mut self) -> &mut Self {
        // Not supported
        self
    }

    /// On Windows, random access must be requested via a flag at open time.
    #[cfg(windows)]
    fn advise_random_access(&mut self) -> &mut Self {
        use std::os::windows::fs::OpenOptionsExt;
        self.custom_flags(winapi::um::winbase::FILE_FLAG_RANDOM_ACCESS)
    }
}

/// Extension convenience trait that allows pre-allocating files, suggesting random access pattern
/// and doing cross-platform exact reads/writes
trait FileExt {
    /// Advise OS/file system that file will use random access and read-ahead behavior is
    /// undesirable, on Windows this can only be set when file is opened, see [`OpenOptionsExt`]
    fn advise_random_access(&self) -> io::Result<()>;

    /// Read exact number of bytes at a specific offset
    ///
    /// Fails with [`io::ErrorKind::UnexpectedEof`] if fewer than `buf.len()` bytes are available.
    fn read_exact_at(&self, buf: &mut [u8], offset: u64) -> io::Result<()>;
}

impl FileExt for File {
    #[cfg(target_os = "linux")]
    fn advise_random_access(&self) -> io::Result<()> {
        use std::os::unix::io::AsRawFd;
        // posix_fadvise returns the error code directly rather than setting errno.
        let err = unsafe { libc::posix_fadvise(self.as_raw_fd(), 0, 0, libc::POSIX_FADV_RANDOM) };
        if err != 0 {
            Err(std::io::Error::from_raw_os_error(err))
        } else {
            Ok(())
        }
    }

    #[cfg(target_os = "macos")]
    fn advise_random_access(&self) -> io::Result<()> {
        use std::os::unix::io::AsRawFd;
        // F_RDAHEAD with a zero argument turns read-ahead off for this descriptor.
        if unsafe { libc::fcntl(self.as_raw_fd(), libc::F_RDAHEAD, 0) } != 0 {
            Err(std::io::Error::last_os_error())
        } else {
            Ok(())
        }
    }

    #[cfg(windows)]
    fn advise_random_access(&self) -> io::Result<()> {
        // Not supported (on Windows this is requested at open time instead).
        Ok(())
    }

    #[cfg(unix)]
    fn read_exact_at(&self, buf: &mut [u8], offset: u64) -> io::Result<()> {
        // pread(2)-style positional read; errors with UnexpectedEof on a short file.
        std::os::unix::fs::FileExt::read_exact_at(self, buf, offset)
    }

    #[cfg(windows)]
    fn read_exact_at(&self, mut buf: &mut [u8], mut offset: u64) -> io::Result<()> {
        // seek_read may return short reads, so loop until the buffer is full,
        // EOF is reached (Ok(0)), or a non-retryable error occurs.
        while !buf.is_empty() {
            match std::os::windows::fs::FileExt::seek_read(self, buf, offset) {
                Ok(0) => {
                    break;
                }
                Ok(n) => {
                    // Advance past the bytes already read.
                    buf = &mut buf[n..];
                    offset += n as u64;
                }
                Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {
                    // Try again
                }
                Err(e) => {
                    return Err(e);
                }
            }
        }

        // A non-empty remainder here means EOF was hit before `buf` was filled.
        if !buf.is_empty() {
            Err(std::io::Error::new(
                std::io::ErrorKind::UnexpectedEof,
                "failed to fill whole buffer",
            ))
        } else {
            Ok(())
        }
    }
}

Can be run as cargo -Zscript file.rs large-file.bin.

For testing purposes I ran this in a Windows 10 VM on a 512 GiB file:

Single file read 500 sectors in 62.9738ms
Rayon files read 500 sectors in 7.8492ms

VM has 20 logical cores, the disk is in raw format on NVMe SSD.

Was anyone able to reproduce the mentioned behavior on Windows?

Probably last ping here, but was hoping for this thread to go somewhere, even if confirming that Windows APIs are bad performance-wise and it works like this by design.

Thanks for the reminder, I had completely forgotten about this thread.

I did a very simple experiment on Linux (kernel 6.6, x86-64, AMD Ryzen 9 3950X) with a modified version of your program (at end of this post). The modifications do two things: they let me run each test as a separate invocation of the program, which is important because that lets me capture a perf profile for each test separately, and they add a second independent test mode in which the test program uses the mmap-for-read trick that positioned-io uses on Windows.

Using a 2TB (decimal :stuck_out_tongue: ), completely blank NVMe storage device as the test file (this should mean that the controller just returns zeros for all reads without bothering to access the actual storage, thus minimizing the overhead of the actual I/O) I get these topline statistics:

Rayon files read 1863 sectors in 22.731573ms
Single file read 1863 sectors in 24.384357ms
Rayon files mmap 1863 sectors in 73.304096ms
Single file mmap 1863 sectors in 75.016002ms

This is what I expected to see: mmap+copy+unmap has substantial overhead relative to pread, and the effect of the number of open OS-level file handles is small but measurable. Now let's see what perf can tell us about where the time is being spent...

rayon-read
## CPU cycles
     2.37%  [kernel.kallsyms]     [k] queued_spin_lock_slowpath

## Stalled cycles (frontend)
     3.92%  [kernel.kallsyms]     [k] queued_spin_lock_slowpath

## Stalled cycles (backend)
     5.86%  [kernel.kallsyms]     [k] xas_find
     2.26%  [kernel.kallsyms]     [k] __noinstr_text_start
     1.24%  [kernel.kallsyms]     [k] queued_spin_lock_slowpath
     1.18%  [kernel.kallsyms]     [k] rwsem_spin_on_owner
     1.05%  read-at-benchmark     [.] crossbeam_epoch::default::with_handle

## L1 dcache misses
     2.27%  [kernel.kallsyms]     [k] clear_page_rep
     1.69%  [kernel.kallsyms]     [k] __noinstr_text_start
     1.54%  [kernel.kallsyms]     [k] queued_spin_lock_slowpath
     1.47%  [kernel.kallsyms]     [k] xas_find
     1.24%  read-at-benchmark     [.] crossbeam_epoch::default::with_handle
     1.09%  [kernel.kallsyms]     [k] xas_start

## L1 dTLB misses
     2.48%  read-at-benchmark     [.] crossbeam_epoch::default::with_handle
     2.40%  [kernel.kallsyms]     [k] __noinstr_text_start
     1.10%  [kernel.kallsyms]     [k] queued_spin_lock_slowpath
single-read
## CPU cycles
     2.77%  [kernel.kallsyms]     [k] queued_spin_lock_slowpath

## Stalled cycles (frontend)
     2.98%  [kernel.kallsyms]     [k] queued_spin_lock_slowpath

## Stalled cycles (backend)
     7.48%  [kernel.kallsyms]     [k] xas_find
     2.06%  [kernel.kallsyms]     [k] clear_page_rep
     1.51%  [kernel.kallsyms]     [k] __noinstr_text_start
     1.13%  read-at-benchmark     [.] crossbeam_epoch::default::with_handle
     1.00%  [kernel.kallsyms]     [k] queued_spin_lock_slowpath

## L1 dcache misses
     3.77%  [kernel.kallsyms]     [k] clear_page_rep
     2.08%  [kernel.kallsyms]     [k] __noinstr_text_start
     1.94%  read-at-benchmark     [.] crossbeam_epoch::default::with_handle
     1.44%  [kernel.kallsyms]     [k] queued_spin_lock_slowpath
     1.13%  [kernel.kallsyms]     [k] iommu_v1_map_pages
     1.04%  [kernel.kallsyms]     [k] update_irq_load_avg

## L1 dTLB misses
     5.15%  read-at-benchmark     [.] crossbeam_epoch::default::with_handle
     3.60%  [kernel.kallsyms]     [k] __noinstr_text_start
     1.55%  [kernel.kallsyms]     [k] up_read
     1.17%  [kernel.kallsyms]     [k] queued_spin_lock_slowpath
     1.13%  read-at-benchmark     [.] crossbeam_epoch::internal::Global::try_advance
     1.11%  read-at-benchmark     [.] crossbeam_deque::deque::Stealer<T>::steal
rayon-mmap
## CPU cycles
    38.05%  [kernel.kallsyms]     [k] queued_spin_lock_slowpath
     4.33%  [kernel.kallsyms]     [k] osq_lock
     3.56%  [kernel.kallsyms]     [k] clear_page_rep
     2.15%  [kernel.kallsyms]     [k] __filemap_add_folio
     2.12%  [kernel.kallsyms]     [k] xas_start
     1.85%  [kernel.kallsyms]     [k] smp_call_function_many_cond
     1.00%  [kernel.kallsyms]     [k] _raw_spin_lock_irq

## Stalled cycles (frontend)
    79.22%  [kernel.kallsyms]     [k] queued_spin_lock_slowpath
     1.90%  [kernel.kallsyms]     [k] smp_call_function_many_cond
     1.81%  [kernel.kallsyms]     [k] __filemap_add_folio
     1.63%  [kernel.kallsyms]     [k] clear_page_rep
     1.25%  [kernel.kallsyms]     [k] folio_add_lru

## Stalled cycles (backend)
    14.33%  [kernel.kallsyms]     [k] queued_spin_lock_slowpath
     8.83%  [kernel.kallsyms]     [k] osq_lock
     5.99%  [kernel.kallsyms]     [k] clear_page_rep
     3.97%  [kernel.kallsyms]     [k] xas_start
     2.83%  [kernel.kallsyms]     [k] _raw_spin_lock_irq
     2.71%  [kernel.kallsyms]     [k] __filemap_add_folio
     1.62%  [kernel.kallsyms]     [k] xas_descend
     1.45%  [kernel.kallsyms]     [k] smp_call_function_many_cond
     1.38%  [kernel.kallsyms]     [k] rwsem_spin_on_owner
     1.26%  [kernel.kallsyms]     [k] __noinstr_text_start
     1.17%  [kernel.kallsyms]     [k] do_mpage_readpage
     1.12%  [kernel.kallsyms]     [k] default_send_IPI_mask_sequence_phys
     1.08%  [kernel.kallsyms]     [k] folio_add_lru

## L1 dcache misses
    16.76%  [kernel.kallsyms]     [k] queued_spin_lock_slowpath
     6.31%  [kernel.kallsyms]     [k] clear_page_rep
     3.08%  [kernel.kallsyms]     [k] xas_start
     2.57%  [kernel.kallsyms]     [k] _raw_spin_lock_irq
     1.79%  [kernel.kallsyms]     [k] __filemap_add_folio
     1.53%  [kernel.kallsyms]     [k] xas_descend
     1.25%  [kernel.kallsyms]     [k] __noinstr_text_start

## L1 dTLB misses
     9.05%  [kernel.kallsyms]     [k] queued_spin_lock_slowpath
     3.39%  [kernel.kallsyms]     [k] clear_page_rep
     2.06%  [kernel.kallsyms]     [k] xas_start
     1.69%  [kernel.kallsyms]     [k] __noinstr_text_start
     1.64%  read-at-benchmark     [.] crossbeam_epoch::default::with_handle
     1.41%  [kernel.kallsyms]     [k] __filemap_add_folio
     1.23%  [kernel.kallsyms]     [k] osq_lock
     1.12%  [kernel.kallsyms]     [k] __schedule
     1.09%  [kernel.kallsyms]     [k] smp_call_function_many_cond
     1.01%  [kernel.kallsyms]     [k] default_send_IPI_mask_sequence_phys
     1.00%  [kernel.kallsyms]     [k] _raw_spin_lock_irq
single-mmap
## CPU cycles
    36.25%  [kernel.kallsyms]  [k] queued_spin_lock_slowpath
     4.86%  [kernel.kallsyms]  [k] osq_lock
     3.23%  [kernel.kallsyms]  [k] clear_page_rep
     1.98%  [kernel.kallsyms]  [k] xas_start
     1.58%  [kernel.kallsyms]  [k] smp_call_function_many_cond
     1.53%  [kernel.kallsyms]  [k] __filemap_add_folio
     1.25%  [kernel.kallsyms]  [k] _raw_spin_lock_irq

## Stalled cycles (frontend)
    77.35%  [kernel.kallsyms]  [k] queued_spin_lock_slowpath
     2.22%  [kernel.kallsyms]  [k] clear_page_rep
     2.13%  [kernel.kallsyms]  [k] smp_call_function_many_cond
     2.01%  [kernel.kallsyms]  [k] __filemap_add_folio
     1.38%  [kernel.kallsyms]  [k] rwsem_down_write_slowpath

## Stalled cycles (backend)
    13.27%  [kernel.kallsyms]     [k] queued_spin_lock_slowpath
     8.75%  [kernel.kallsyms]     [k] osq_lock
     5.74%  [kernel.kallsyms]     [k] clear_page_rep
     3.47%  [kernel.kallsyms]     [k] xas_start
     2.79%  [kernel.kallsyms]     [k] __filemap_add_folio
     2.19%  [kernel.kallsyms]     [k] _raw_spin_lock_irq
     1.97%  [kernel.kallsyms]     [k] rwsem_spin_on_owner
     1.65%  [kernel.kallsyms]     [k] xas_descend
     1.56%  [kernel.kallsyms]     [k] smp_call_function_many_cond
     1.22%  [kernel.kallsyms]     [k] folio_add_lru
     1.16%  [kernel.kallsyms]     [k] __rmqueue_pcplist
     1.16%  [kernel.kallsyms]     [k] default_send_IPI_mask_sequence_phys
     1.11%  [kernel.kallsyms]     [k] do_mpage_readpage
     1.08%  [kernel.kallsyms]     [k] xas_find

## L1 dcache misses
    15.96%  [kernel.kallsyms]  [k] queued_spin_lock_slowpath
     6.34%  [kernel.kallsyms]  [k] clear_page_rep
     3.00%  [kernel.kallsyms]  [k] _raw_spin_lock_irq
     2.45%  [kernel.kallsyms]  [k] xas_start
     1.96%  [kernel.kallsyms]  [k] __filemap_add_folio
     1.52%  [kernel.kallsyms]  [k] __noinstr_text_start
     1.14%  [kernel.kallsyms]  [k] xas_descend

## L1 dTLB misses
     9.47%  [kernel.kallsyms]  [k] queued_spin_lock_slowpath
     3.26%  [kernel.kallsyms]  [k] clear_page_rep
     1.99%  read-at-benchmark  [.] crossbeam_epoch::default::with_handle
     1.94%  [kernel.kallsyms]  [k] __noinstr_text_start
     1.73%  [kernel.kallsyms]  [k] xas_start
     1.53%  [kernel.kallsyms]  [k] __filemap_add_folio
     1.43%  [kernel.kallsyms]  [k] smp_call_function_many_cond
     1.32%  [kernel.kallsyms]  [k] osq_lock
     1.12%  [kernel.kallsyms]  [k] xas_descend
     1.06%  [kernel.kallsyms]  [k] default_send_IPI_mask_sequence_phys

My summary of this summary, along with some information gleaned from call graph mode: Using pread (aka read_exact_at), the biggest cost is locking overhead associated with the page cache (the leading caller of queued_spin_lock_slowpath is filemap_add_folio). When we switch to using mmap + user space copy + munmap, the locking overhead is much higher but it's still the page cache's locks that are contended (it's still filemap_add_folio calling queued_spin_lock_slowpath) and this is because we are eating lots more L1 d-cache and L1 d-TLB misses. Also, functions associated with TLB and cache invalidation (e.g. smp_call_function_many_cond) are showing up much higher in the profile than they were before.

So this confirms what I said last year: On Linux, the mmap+copy+unmap approach is significantly costlier (~3x) than the pread approach, because of additional hardware-level overhead: Creating and destroying file mappings invalidates the TLB and makes you eat cache misses. Therefore, for MapViewOfFile+copy+UnmapViewOfFile to not be slower than NtReadFile on Windows for this benchmark would require NtReadFile to be suffering from enormous amounts of software overhead, that isn't shared by the map/unmap primitives, and I still find this difficult to believe.

I do not know how to make Windows give me perf-style detailed profiles, tracing into the kernel, with hardware performance counting for things like pipeline stalls and cache and TLB misses, but I'm sure it's possible somehow. I'd like to see someone do the same test I did, on Windows, and produce the same kind of profiling report. (You will probably need to tweak the Windows version of mmap_read_at to get it to compile. I do not have a Windows development environment set up.)

It would also be interesting to see how the profile changes if one bypasses the page cache (O_DIRECT on Linux; I have read that there is a Windows equivalent but I don't remember any details). And someone who's better at wrangling perf than me might be able to say more specifically where the extra costs are coming from.

Modified benchmark program
//! ```cargo
//! [dependencies]
//! fs2 = "0.4.3"
//! libc = "0.2.151"
//! rand = "0.8.5"
//! rayon = "1.8.0"
//!
//! [target.'cfg(windows)'.dependencies]
//! winapi = "0.3.9"
//! ```

use rand::prelude::*;
use rayon::prelude::*;
use std::fs::{File, OpenOptions};
use std::hint::black_box;
use std::io::{Seek, SeekFrom};
use std::path::Path;
use std::time::Instant;
use std::{env, io};

// Size of one "sector" of the benchmark file: 1 GiB. One random read is issued per sector.
const SECTOR_SIZE: usize = 1024 * 1024 * 1024;
// Number of bytes fetched by each random read.
const CHUNK_SIZE: usize = 32;

//FIXME should call sysconf(_SC_PAGESIZE)
// Assumed system page size, used to align mmap offsets and lengths.
const PAGE_SIZE: usize = 4096;

/// Read one random CHUNK_SIZE-byte chunk from every sector in parallel using
/// the pread-style path, and return the chunks in sector order.
fn test_read<F>(file: F, sectors: usize) -> io::Result<Vec<[u8; CHUNK_SIZE]>>
where
    F: ReadAtSync,
{
    let mut chunks = vec![[0u8; CHUNK_SIZE]; sectors];

    chunks
        .par_iter_mut()
        .enumerate()
        .try_for_each(|(sector_index, chunk)| {
            // Pick a random CHUNK_SIZE-aligned position inside this sector.
            let chunk_index = thread_rng().gen_range(0..SECTOR_SIZE / CHUNK_SIZE);
            let read_offset = sector_index * SECTOR_SIZE + chunk_index * CHUNK_SIZE;

            file.read_at(chunk, read_offset)
        })?;

    Ok(chunks)
}

/// Read one random CHUNK_SIZE-byte chunk from every sector in parallel using
/// the mmap-based path, and return the chunks in sector order.
fn test_mmap<F>(file: F, sectors: usize) -> io::Result<Vec<[u8; CHUNK_SIZE]>>
where
    F: ReadAtSync,
{
    let mut chunks = vec![[0u8; CHUNK_SIZE]; sectors];

    chunks
        .par_iter_mut()
        .enumerate()
        .try_for_each(|(sector_index, chunk)| {
            // Pick a random CHUNK_SIZE-aligned position inside this sector.
            let chunk_index = thread_rng().gen_range(0..SECTOR_SIZE / CHUNK_SIZE);
            let read_offset = sector_index * SECTOR_SIZE + chunk_index * CHUNK_SIZE;

            file.mmap_read_at(chunk, read_offset)
        })?;

    Ok(chunks)
}

fn main() -> io::Result<()> {
    // Usage: <mode> <path>, where mode selects handle strategy and read primitive.
    let mode = env::args().nth(1).unwrap();
    let file = env::args().nth(2).unwrap();

    // Number of whole sectors in the file, derived from its length.
    let sectors = (File::open(&file)?.seek(SeekFrom::End(0))? / SECTOR_SIZE as u64) as usize;

    match mode.as_str() {
        "single-read" => {
            let file = open_single(&file)?;

            let start = Instant::now();
            black_box(test_read(file, sectors)?);
            println!(
                "Single file read {sectors} sectors in {:?}",
                start.elapsed()
            );
        }
        "single-mmap" => {
            let file = open_single(&file)?;

            let start = Instant::now();
            black_box(test_mmap(file, sectors)?);
            println!(
                "Single file mmap {sectors} sectors in {:?}",
                start.elapsed()
            );
        }
        "rayon-read" => {
            let file = RayonFiles::open(&file)?;

            let start = Instant::now();
            black_box(test_read(file, sectors)?);
            println!(
                "Rayon files read {sectors} sectors in {:?}",
                start.elapsed()
            );
        }
        "rayon-mmap" => {
            let file = RayonFiles::open(&file)?;

            let start = Instant::now();
            black_box(test_mmap(file, sectors)?);
            println!(
                "Rayon files mmap {sectors} sectors in {:?}",
                start.elapsed()
            );
        }
        badmode => {
            eprintln!("invalid mode argument {}", badmode);
            std::process::exit(2);
        }
    }

    Ok(())
}

/// Open `path` read-only with random-access advice applied both at open time
/// (Windows flag) and on the resulting handle (POSIX advice). Extracted because
/// the single-read and single-mmap arms previously duplicated this sequence.
fn open_single(path: &str) -> io::Result<File> {
    let file = OpenOptions::new()
        .read(true)
        .advise_random_access()
        .open(path)?;
    file.advise_random_access()?;
    Ok(file)
}

/// Strategy for filling a buffer from an absolute file offset; implementations
/// must be shareable across rayon worker threads (hence `Send + Sync`).
trait ReadAtSync: Send + Sync {
    /// Fill the buffer by reading bytes at a specific offset using pread
    fn read_at(&self, buf: &mut [u8], offset: usize) -> io::Result<()>;

    /// Fill the buffer by reading bytes at a specific offset using mmap
    fn mmap_read_at(&self, buf: &mut [u8], offset: usize) -> io::Result<()>;
}

impl ReadAtSync for File {
    fn read_at(&self, buf: &mut [u8], offset: usize) -> io::Result<()> {
        FileExt::read_exact_at(self, buf, offset as u64)
    }
    fn mmap_read_at(&self, buf: &mut [u8], offset: usize) -> io::Result<()> {
        FileExt::mmap_read_at(self, buf, offset as u64)
    }
}

/// One open handle per rayon worker thread, so concurrent reads never share a
/// single OS-level file object.
struct RayonFiles {
    // Indexed by `rayon::current_thread_index()`; populated in `open`.
    files: Vec<File>,
}
impl RayonFiles {
    /// Return the file handle belonging to the current rayon worker thread.
    fn get_file(&self) -> io::Result<&File> {
        // Guard: reads must come from inside the rayon pool, otherwise there is
        // no thread index to select a handle with.
        let thread_index = rayon::current_thread_index().ok_or_else(|| {
            io::Error::new(
                io::ErrorKind::Other,
                "Reads must be called from rayon worker thread",
            )
        })?;

        self.files.get(thread_index).ok_or_else(|| {
            io::Error::new(io::ErrorKind::Other, "No files entry for this rayon thread")
        })
    }
}

impl ReadAtSync for RayonFiles {
    fn read_at(&self, buf: &mut [u8], offset: usize) -> io::Result<()> {
        // Resolve this worker thread's dedicated handle, then do a pread-style read.
        let file = self.get_file()?;
        FileExt::read_exact_at(file, buf, offset as u64)
    }

    fn mmap_read_at(&self, buf: &mut [u8], offset: usize) -> io::Result<()> {
        // Same handle selection, mmap-based read path.
        let file = self.get_file()?;
        FileExt::mmap_read_at(file, buf, offset as u64)
    }
}

impl RayonFiles {
    /// Open one read-only, random-access handle for every rayon worker thread.
    fn open<P>(path: P) -> io::Result<Self>
    where
        P: AsRef<Path>,
    {
        let thread_count = rayon::current_num_threads();
        let mut files = Vec::with_capacity(thread_count);

        for _ in 0..thread_count {
            let file = OpenOptions::new()
                .read(true)
                .advise_random_access()
                .open(path.as_ref())?;
            file.advise_random_access()?;
            files.push(file);
        }

        Ok(Self { files })
    }
}

/// Extension convenience trait that allows setting some file opening options in cross-platform way
trait OpenOptionsExt {
    /// Advise OS/file system that file will use random access and read-ahead behavior is
    /// undesirable, only has impact on Windows, for other operating systems see [`FileExt`]
    ///
    /// Returns `&mut Self` so the call chains like the builder methods on `OpenOptions`.
    fn advise_random_access(&mut self) -> &mut Self;
}

impl OpenOptionsExt for OpenOptions {
    /// No open-time advice exists off Windows; this is a pass-through.
    ///
    /// A single non-Windows arm replaces the previous separate Linux and macOS
    /// stubs: they were identical no-ops, and enumerating OSes explicitly left
    /// this impl without the method on every other Unix (a compile error there).
    #[cfg(not(windows))]
    fn advise_random_access(&mut self) -> &mut Self {
        // Not supported
        self
    }

    /// On Windows, random access must be requested via a flag at open time.
    #[cfg(windows)]
    fn advise_random_access(&mut self) -> &mut Self {
        use std::os::windows::fs::OpenOptionsExt;
        self.custom_flags(winapi::um::winbase::FILE_FLAG_RANDOM_ACCESS)
    }
}

/// Extension convenience trait that allows pre-allocating files, suggesting random access pattern
/// and doing cross-platform exact reads/writes
trait FileExt {
    /// Advise OS/file system that file will use random access and read-ahead behavior is
    /// undesirable, on Windows this can only be set when file is opened, see [`OpenOptionsExt`]
    fn advise_random_access(&self) -> io::Result<()>;

    /// Read exact number of bytes at a specific offset using pread or equiv.
    fn read_exact_at(&self, buf: &mut [u8], offset: u64) -> io::Result<()>;

    /// Read exact number of bytes at a specific offset using mmap
    ///
    /// Maps the relevant region, copies into `buf`, then unmaps — used to
    /// compare against the pread-style path above.
    fn mmap_read_at(&self, buf: &mut [u8], offset: u64) -> io::Result<()>;
}

impl FileExt for File {
    #[cfg(target_os = "linux")]
    fn advise_random_access(&self) -> io::Result<()> {
        use std::os::unix::io::AsRawFd;
        // posix_fadvise returns the error code directly rather than setting errno.
        let err = unsafe { libc::posix_fadvise(self.as_raw_fd(), 0, 0, libc::POSIX_FADV_RANDOM) };
        if err != 0 {
            Err(std::io::Error::from_raw_os_error(err))
        } else {
            Ok(())
        }
    }

    #[cfg(target_os = "macos")]
    fn advise_random_access(&self) -> io::Result<()> {
        use std::os::unix::io::AsRawFd;
        // F_RDAHEAD with a zero argument turns read-ahead off for this descriptor.
        if unsafe { libc::fcntl(self.as_raw_fd(), libc::F_RDAHEAD, 0) } != 0 {
            Err(std::io::Error::last_os_error())
        } else {
            Ok(())
        }
    }

    #[cfg(windows)]
    fn advise_random_access(&self) -> io::Result<()> {
        // Not supported (on Windows this is requested at open time instead).
        Ok(())
    }

    #[cfg(unix)]
    fn read_exact_at(&self, buf: &mut [u8], offset: u64) -> io::Result<()> {
        // pread(2)-style positional read; errors with UnexpectedEof on a short file.
        std::os::unix::fs::FileExt::read_exact_at(self, buf, offset)
    }

    #[cfg(unix)]
    fn mmap_read_at(&self, buf: &mut [u8], offset: u64) -> io::Result<()> {
        use libc::{mmap, munmap, MAP_FAILED, MAP_SHARED, PROT_READ};
        use std::os::unix::io::AsRawFd;
        use std::ptr;

        // mmap requires the file offset to be page-aligned; round down and
        // remember how far into the first page the requested bytes start.
        let aligned_offset = offset & !((PAGE_SIZE as u64) - 1);
        let page_offset = (offset - aligned_offset) as usize;

        let copy_len = buf.len();
        // The mapping must cover the intra-page shift PLUS the bytes we copy,
        // rounded up to whole pages. The previous version rounded up only
        // `copy_len`, so an unaligned offset near a page boundary could leave
        // the tail of the copy outside the mapping (out-of-bounds read).
        let aligned_len = (page_offset + copy_len + PAGE_SIZE - 1) & !(PAGE_SIZE - 1);

        let addr = unsafe {
            mmap(
                ptr::null_mut(),
                aligned_len,
                PROT_READ,
                MAP_SHARED,
                self.as_raw_fd(),
                aligned_offset as libc::off_t,
            )
        };
        if addr == MAP_FAILED {
            return Err(std::io::Error::last_os_error());
        }

        // SAFETY: the mapping spans `aligned_len` bytes, which covers
        // `page_offset..page_offset + copy_len`, and `buf` is a valid,
        // non-overlapping destination of `copy_len` bytes.
        unsafe {
            ptr::copy_nonoverlapping(
                (addr as *const u8).add(page_offset),
                buf.as_mut_ptr(),
                copy_len,
            );
            // Unmap the full mapped length, not `buf.len()` (the previous
            // version could leak trailing pages of the mapping); ignore errors.
            munmap(addr, aligned_len);
        }
        Ok(())
    }

    #[cfg(windows)]
    fn read_exact_at(&self, mut buf: &mut [u8], mut offset: u64) -> io::Result<()> {
        // seek_read may return short reads, so loop until the buffer is full,
        // EOF is reached (Ok(0)), or a non-retryable error occurs.
        while !buf.is_empty() {
            match std::os::windows::fs::FileExt::seek_read(self, buf, offset) {
                Ok(0) => {
                    break;
                }
                Ok(n) => {
                    // Advance past the bytes already read.
                    buf = &mut buf[n..];
                    offset += n as u64;
                }
                Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {
                    // Try again
                }
                Err(e) => {
                    return Err(e);
                }
            }
        }

        // A non-empty remainder here means EOF was hit before `buf` was filled.
        if !buf.is_empty() {
            Err(std::io::Error::new(
                std::io::ErrorKind::UnexpectedEof,
                "failed to fill whole buffer",
            ))
        } else {
            Ok(())
        }
    }

    #[cfg(windows)]
    fn mmap_read_at(&self, buf: &mut [u8], offset: u64) -> io::Result<()> {
        // Adapted from https://github.com/vasi/positioned-io/blob/1cb70389b133f1e96c68279b04c6baf618e6ca22/src/windows.rs#L44C1-L92C6
        // with the compile errors fixed (missing imports, undefined
        // `allocation_granularity`, and `?` applied to a BOOL). Still untested
        // on a real Windows build. For a fair head-to-head comparison the
        // metadata()?.len() call should be removed.
        use std::cmp::min;
        use std::os::windows::io::AsRawHandle;
        use std::ptr;
        use winapi::shared::basetsd::SIZE_T;
        use winapi::shared::minwindef::DWORD;
        use winapi::um::handleapi::CloseHandle;
        use winapi::um::memoryapi::{
            CreateFileMappingW, MapViewOfFile, UnmapViewOfFile, FILE_MAP_READ,
        };
        use winapi::um::winnt::{HANDLE, PAGE_READONLY};

        // View offsets must be multiples of the system allocation granularity.
        fn allocation_granularity() -> usize {
            use winapi::um::sysinfoapi::{GetSystemInfo, SYSTEM_INFO};
            unsafe {
                let mut info: SYSTEM_INFO = std::mem::zeroed();
                GetSystemInfo(&mut info);
                info.dwAllocationGranularity as usize
            }
        }

        let file_len = self.metadata()?.len();
        if buf.is_empty() || offset >= file_len {
            return Ok(());
        }
        let len = min(file_len - offset, buf.len() as u64) as usize;

        unsafe {
            let alignment = offset % allocation_granularity() as u64;
            let aligned_offset = offset - alignment;
            let aligned_len = len + alignment as usize;

            let mapping = CreateFileMappingW(
                self.as_raw_handle() as HANDLE,
                ptr::null_mut(),
                PAGE_READONLY,
                0,
                0,
                ptr::null(),
            );
            if mapping.is_null() {
                return Err(io::Error::last_os_error());
            }

            let aligned_ptr = MapViewOfFile(
                mapping,
                FILE_MAP_READ,
                (aligned_offset >> 32) as DWORD,
                (aligned_offset & 0xffff_ffff) as DWORD,
                aligned_len as SIZE_T,
            );

            // The mapped view keeps the section alive; the extra handle can go.
            CloseHandle(mapping);

            if aligned_ptr.is_null() {
                return Err(io::Error::last_os_error());
            }

            let src = (aligned_ptr as *const u8).add(alignment as usize);
            ptr::copy_nonoverlapping(src, buf.as_mut_ptr(), len);

            // UnmapViewOfFile returns a BOOL (0 on failure); the original code
            // applied `?` to it, which does not compile.
            if UnmapViewOfFile(aligned_ptr) == 0 {
                return Err(io::Error::last_os_error());
            }
        }
        Ok(())
    }
}
2 Likes

To be honest, you're unlikely to get much in-depth discussion about specific Windows APIs on the rustc internals forum. You'd have better luck on more generalist or Windows-specific forums.