From 6ca63f952f16dc76091133a4540be6579c133d9a Mon Sep 17 00:00:00 2001 From: David Rheinsberg Date: Fri, 9 Jan 2026 14:48:38 +0100 Subject: [PATCH] tmp: add I/O abstractions Add a version of `std::io::{Read,Write}` but specifically for memory mapped access, and streaming access. Unlike their counterparts, these versions are meant to operate on offline data. Rather than using `async`, they operate on buffers and propagate control-flow up to the caller whenever more data is required. Signed-off-by: David Rheinsberg --- lib/tmp/src/io/map.rs | 189 +++++++++++++++++++++++ lib/tmp/src/io/mod.rs | 4 + lib/tmp/src/io/stream.rs | 315 +++++++++++++++++++++++++++++++++++++++ lib/tmp/src/lib.rs | 1 + 4 files changed, 509 insertions(+) create mode 100644 lib/tmp/src/io/map.rs create mode 100644 lib/tmp/src/io/mod.rs create mode 100644 lib/tmp/src/io/stream.rs diff --git a/lib/tmp/src/io/map.rs b/lib/tmp/src/io/map.rs new file mode 100644 index 0000000..f14822a --- /dev/null +++ b/lib/tmp/src/io/map.rs @@ -0,0 +1,189 @@ +//! Mapped I/O Utilities +//! +//! This module provides utilities to work with memory-mapped data. The +//! [`Read`] and [`Write`] abstractions allow consumers and producers to +//! be written independently without requiring knowledge about each other. + +use core::mem::MaybeUninit as Uninit; +use core::ops::ControlFlow as Flow; + +#[derive(Clone, Copy, Debug, Hash)] +#[derive(Eq, Ord, PartialEq, PartialOrd)] +pub enum Error { + /// The offset calculation exceeds the limits of the implementation. + Overflow, + /// The data limits were exceeded (for reads it indicates an access past + /// the end, for writes it indicates a limit of available or reserved + /// memory). + Exceeded, +} + +/// `Read` allows chunked access to logically linear data. 
+pub trait Read { + fn map(&self, idx: usize, len: usize) -> Flow, &[u8]>; + + fn read_uninit( + &self, + idx: &mut usize, + mut data: &mut [Uninit], + ) -> Flow> { + while data.len() > 0 { + let map = self.map(*idx, data.len())?; + assert!(map.len() > 0); + let n = core::cmp::min(map.len(), data.len()); + + { + // SAFETY: `Uninit` is `repr(transparent)` and has no + // additional invariants on its own if read-only. + let map_u = unsafe { + core::mem::transmute::<&[u8], &[Uninit]>(map) + }; + data[..n].copy_from_slice(&map_u[..n]); + } + + *idx += n; + data = &mut data[n..]; + } + Flow::Continue(()) + } + + fn read( + &self, + idx: &mut usize, + data: &mut [u8], + ) -> Flow> { + // SAFETY: `Uninit` is `repr(transparent)` and `read_uninit()` will + // (re-)initialize the entire array properly. + let data_u = unsafe { + core::mem::transmute::<&mut [u8], &mut [Uninit]>(data) + }; + self.read_uninit(idx, data_u) + } +} + +/// `Write` allows chunked mutable access to logically linear data. +pub trait Write { + unsafe fn commit(&mut self, len: usize); + + fn map(&mut self, idx: usize, len: usize) -> Flow, &mut [Uninit]>; + + fn write<'data>( + &mut self, + idx: &mut usize, + data: &'data [u8], + ) -> Flow> { + // SAFETY: `Uninit` is `repr(transparent)` and allows down-casts. + let mut data_u = unsafe { + core::mem::transmute::<&'data [u8], &'data [Uninit]>(data) + }; + + while data_u.len() > 0 { + let map = self.map(*idx, data_u.len())?; + assert!(map.len() > 0); + let n = core::cmp::min(map.len(), data_u.len()); + map[..n].copy_from_slice(&data_u[..n]); + *idx += n; + data_u = &data_u[n..]; + } + Flow::Continue(()) + } + + fn write_iter( + &mut self, + idx: &mut usize, + data: &mut dyn ExactSizeIterator, + ) -> Flow> { + loop { + // We require `ExactSizeIterator`, since iterator hints are not + // reliable bounds. We thus avoid aggressive overallocation if the + // `Self`-implementation has no upper bounds (which it is + // definitely not required to). 
+ let mut n = data.size_hint().0; + if n == 0 { + assert!(data.next().is_none()); + return Flow::Continue(()); + } + + let map = self.map(*idx, n)?; + assert!(map.len() > 0); + n = core::cmp::min(n, map.len()); + + while n > 0 { + map[map.len().strict_sub(n)].write(data.next().unwrap()); + *idx += 1; + n -= 1; + } + } + } + + fn fill( + &mut self, + idx: &mut usize, + mut len: usize, + data: u8, + ) -> Flow> { + let data_u = Uninit::new(data); + while len > 0 { + let map = self.map(*idx, len)?; + assert!(map.len() > 0); + let n = core::cmp::min(map.len(), len); + map[..n].fill(data_u); + *idx += n; + len -= n; + } + Flow::Continue(()) + } + + fn zero(&mut self, idx: &mut usize, len: usize) -> Flow> { + self.fill(idx, len, 0) + } + + fn align_exp2(&mut self, idx: &mut usize, exp: u8) -> Flow> { + match idx.checked_next_multiple_of((1 << exp) as usize) { + None => Flow::Break(Some(Error::Overflow)), + Some(v) => self.zero(idx, v.strict_sub(*idx)), + } + } +} + +impl Read for [u8] { + fn map(&self, idx: usize, len: usize) -> Flow, &[u8]> { + let Some(end) = idx.checked_add(len) else { + return Flow::Break(Some(Error::Overflow)); + }; + if end > self.len() { + return Flow::Break(Some(Error::Exceeded)); + } + + Flow::Continue(&self[idx..end]) + } +} + +impl Write for alloc::vec::Vec { + unsafe fn commit(&mut self, len: usize) { + // SAFETY: Propagated to caller. 
+ unsafe { + self.set_len(self.len().strict_add(len)); + } + } + + fn map( + &mut self, + idx: usize, + len: usize, + ) -> Flow, &mut [Uninit]> { + let Some(end) = idx.checked_add(len) else { + return Flow::Break(Some(Error::Overflow)); + }; + if end > self.len() { + self.reserve(end - self.len()); + } + + let ptr: *mut u8 = self.as_mut_ptr(); + let ptr_u: *mut Uninit = ptr as _; + let cap: usize = self.capacity(); + let slice = unsafe { core::slice::from_raw_parts_mut(ptr_u, cap) }; + + Flow::Continue(&mut slice[idx..end]) + } +} diff --git a/lib/tmp/src/io/mod.rs b/lib/tmp/src/io/mod.rs new file mode 100644 index 0000000..15485a0 --- /dev/null +++ b/lib/tmp/src/io/mod.rs @@ -0,0 +1,4 @@ +//! Input/Output Utilities + +pub mod map; +pub mod stream; diff --git a/lib/tmp/src/io/stream.rs b/lib/tmp/src/io/stream.rs new file mode 100644 index 0000000..6f1d09c --- /dev/null +++ b/lib/tmp/src/io/stream.rs @@ -0,0 +1,315 @@ +//! # Streaming I/O Utilities +//! +//! This module provides utilities to work with data streams. The [`Read`] and +//! [`Write`] abstractions allow consumers and producers to be written +//! independently without requiring knowledge about each other. + +use core::mem::MaybeUninit as Uninit; +use core::ops::ControlFlow as Flow; + +/// `More` describes the data extents needed to serve a request. +/// +/// The main use is for [`Read::map()`] and its derivatives to signal how much +/// more data is needed to serve the request. +/// +/// The structure describes the absolute requirements, rather than the +/// relative delta. That is, a minimum of `16` means the buffer should be of +/// size 16, rather than 16 more bytes are needed on top of whatever the buffer +/// currently is. +#[derive(Clone, Copy, Debug, Hash)] +#[derive(Eq, Ord, PartialEq, PartialOrd)] +pub struct More { + /// The minimum number of bytes the buffer must have to serve the request. + pub min: usize, + /// An optional maximum number of bytes the request can make use of. 
+ pub max: Option, +} + +/// `Read` allows buffered reads from a data stream. +/// +/// This trait is a connection between protocol implementations and transport +/// layers. That is, it allows writing code that reads structured data from a +/// data stream without knowing the transport layer used to stream the data. +/// +/// The trait is similar to [`std::io::Read`] but is designed for buffered +/// streams that perform transport layer operations +/// +/// The actual transport layer operations are not part of this trait, but must +/// be handled separately. This trait is just an abstraction for the data +/// buffer. +pub trait Read { + /// Advance the stream by the specified number of bytes. + /// + /// This will irrevocably discard the specified number of bytes of the + /// underlying stream of data and advance the position. + /// + /// The underlying stream will buffer data until this function is called. + fn advance(&mut self, len: usize); + + /// Map the data of the stream at the current position. + /// + /// This will return a linear memory mapping of the data of the stream at + /// the current position with at least a length of `min`. The maximum + /// length is a hint and may be ignored by the implementation. A maximum + /// of `None` is equivalent to a maximum of `Some(usize::MAX)`. + /// + /// This function does not advance the position of the underlying stream. + /// Repeated calls to this function will operate on the same data. Use + /// [`Self::advance()`] to advance the position of the stream. + /// Furthermore, this function does not perform any I/O. This function + /// merely maps the available data buffers or rearranges the data to ensure + /// it is available as a linear mapping. + /// + /// This function cannot fail, except if insufficient data is available. If + /// the implementation fails for other reasons, it must be recoverable and + /// handled out of band. 
+ /// + /// If the underlying stream does not have sufficient data buffered, this + /// will return [`ControlFlow::Break()`](core::ops::ControlFlow::Break) + /// with information on how much data is needed. It is up to the caller to + /// pass this information to the stream operators to ensure more data is + /// made available. + fn map(&self, min: usize, max_hint: Option) -> Flow; +} + +/// `Write` allows buffered writes to a data stream. +/// +/// This trait is a connection between protocol implementations and transport +/// layers. That is, it allows writing code that writes structured data to a +/// data stream without knowing the transport layer used to stream the data. +/// +/// The trait is similar to [`std::io::Write`] but is designed for buffered +/// streams that perform transport layer operations +/// +/// The actual transport layer operations are not part of this trait, but must +/// be handled separately. This trait is just an abstraction for the data +/// buffer. +pub trait Write { + /// Commit the specified number of bytes to the stream. + /// + /// This will mark the given number of bytes as ready to be written and + /// advance the position of the stream. No data is actually written to the + /// transport layer, but merely marked to be ready. Transport layer + /// operations must be handled separately. + /// + /// ## Safety + /// + /// The caller must ensure that the first `len` bytes of the stream buffer + /// have been initialized via [`Self::map()`] or one of its derivatives. + unsafe fn commit(&mut self, len: usize); + + /// Map the data buffer of the stream for writing. + /// + /// This will return a linear memory mapping of the data buffer with at + /// least a length of `min`. The maximum length is a hint and may be + /// ignored by the implementation. A maximum of `None` is equivalent to a + /// maximum of `Some(usize::MAX)`. + /// + /// The data buffer can be repeatedly written to by the caller. 
No data is + /// written to the stream until [`Self::commit()`] is called. Repeated + /// calls must return the same mapping, unless the data is committed + /// between those calls. The mapping might be moved between two calls, but + /// the content of initialized cells must be retained, except if committed + /// in between. + /// + /// This function cannot fail, except if insufficient buffer space is + /// available. If the implementation fails for other reasons, it must be + /// recoverable and handled out of band. + /// + /// If the underlying stream does not have sufficient data buffers, this + /// will return [`ControlFlow::Break()`](core::ops::ControlFlow::Break) + /// with information on how much space is needed. It is up to the caller to + /// pass this information to the stream operators to ensure more data is + /// made available. + fn map(&mut self, min: usize, max_hint: Option) -> Flow]>; + + /// Map the initialized data buffer of the stream for writing. + /// + /// This works like [`Self::map()`] but returns an initialized reference. + /// This will always truncate the slice to the requested length. + /// + /// ## Safety + /// + /// The caller must ensure that the first `len` bytes of the stream buffer + /// have been initialized via [`Self::map()`] or one of its derivatives. + unsafe fn map_unchecked(&mut self, len: usize) -> Flow { + self.map(len, Some(len)).map_continue(|v| { + // SAFETY: Propagated to caller. + unsafe { + core::mem::transmute::<&mut [Uninit], &mut [u8]>( + &mut v[..len], + ) + } + }) + } + + /// Commit data directly to the stream. + /// + /// This takes a data buffer, writes it to the stream buffers at the + /// current position, and then commits the data. + /// + /// If insufficient buffer space is available to atomically write the + /// entire data blob, this will forward the return value from + /// [`Self::map()`]. + fn write(&mut self, data: &[u8]) -> Flow { + // SAFETY: `Uninit` is `repr(transparent)` and allows down-casts. 
+ let data_u = unsafe { core::mem::transmute::<&[u8], &[Uninit]>(data) }; + let map = self.map(data.len(), Some(data.len()))?; + map[..data.len()].copy_from_slice(data_u); + + // SAFETY: `data.len()` bytes were copied, so they must be initialized. + unsafe { self.commit(data.len()) }; + + Flow::Continue(()) + } +} + +/// Map data of the stream for as long as the predicate indicates. +/// +/// This extends [`Read::map()`] by mapping buffered data for as long as the +/// provided predicate returns `true`. The predicate will be called for each +/// byte given the index of the byte relative to the current stream position +/// and its value. Once the predicate returns `false`, the map up until this +/// position is returned. +/// +/// The behavior otherwise matches [`Read::map()`]. +/// +/// `n` is the offset relative to the current streaming position where to +/// start running the predicate. `n` is incremented each time the predicate +/// is run and returned `true`. This avoids calling the predicate multiple +/// times on the same values even if the mapping operation is interrupted +/// with [`ControlFlow::Break()`](core::ops::ControlFlow::Break). +/// +/// That is, on success, `n` will reflect the relative index (relative to the +/// current stream position) of the first byte that failed the predicate. +/// However, the returned map is guaranteed to be at least 1 byte bigger and +/// thus always includes this byte. The returned map might be arbitrarily big +/// and the caller must truncate it, if required. 
+pub fn read_map_while<'this, This, Predicate>( + this: &'this This, + n: &mut usize, + max: Option, + mut predicate: Predicate, +) -> Flow +where + This: ?Sized + Read, + Predicate: FnMut(usize, u8) -> bool, +{ + let max_v = max.unwrap_or(usize::MAX); + + loop { + let n1 = n.strict_add(1); + let map = this.map(n1, max)?; + let map = &map[..core::cmp::min(map.len(), max_v)]; + assert!(map.len() >= n1); + assert!(map.len() <= max_v); + + while *n < map.len() { + if !predicate(*n, map[*n]) { + return Flow::Continue(map); + } + *n += 1; + } + + if *n >= max_v { + return Flow::Continue(map); + } + } +} + +impl<'this> dyn Read + 'this { + /// Map data of the stream for as long as the predicate indicates. + /// + /// This is an alias for [`read_map_while()`]. + pub fn map_while( + &self, + n: &mut usize, + max: Option, + predicate: Predicate, + ) -> Flow + where + Predicate: FnMut(usize, u8) -> bool, + { + read_map_while(self, n, max, predicate) + } +} + +impl<'data> Read for &'data [u8] { + fn advance(&mut self, len: usize) { + let v = core::mem::take(self); + *self = &v[len..]; + } + + fn map(&self, min: usize, max: Option) -> Flow { + if min <= self.len() { + Flow::Continue(self) + } else { + Flow::Break(More { min: min, max: max }) + } + } +} + +impl Write for alloc::vec::Vec { + unsafe fn commit(&mut self, len: usize) { + // SAFETY: Propagated to caller. + unsafe { + self.set_len(self.len().strict_add(len)); + } + } + + fn map(&mut self, min: usize, max: Option) -> Flow]> { + self.reserve(max.unwrap_or(min)); + Flow::Continue(self.spare_capacity_mut()) + } +} + +#[cfg(test)] +mod test { + use super::*; + + // A basic test of the `Write` trait and its helpers, using the trivial + // `Vec`-based implementation. + #[test] + fn write_basic() { + let mut vec = alloc::vec::Vec::new(); + + let v = vec.map(0, None).continue_value(); + assert!(v.is_some()); + + // Initialize the vector with increasing values. 
+ let v = vec.map(128, None).continue_value().unwrap(); + assert!(v.len() >= 128); + for (i, e) in v.iter_mut().enumerate() { + e.write(i as u8); + } + + // Verify that a re-map correctly shows the values. + let v = unsafe { vec.map_unchecked(16).continue_value().unwrap() }; + assert_eq!(v.len(), 16); + for (i, e) in v.iter().enumerate() { + assert_eq!(*e, i as u8); + } + + // Commit the prefix. + unsafe { vec.commit(16) }; + + // Verify that the prefix was stripped and a re-map shows the tail. + let v = unsafe { vec.map_unchecked(16).continue_value().unwrap() }; + assert_eq!(v.len(), 16); + for (i, e) in v.iter().enumerate() { + assert_eq!(*e, (i + 16) as u8); + } + + // Discard the previous temporary values and rewrite the next 16 values + // again starting at 0 and immediately commit it. + let _ = vec.write( + &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15], + ).continue_value().unwrap(); + + // Verify that the final committed data is twice the numbers 0-15. + assert_eq!(vec.len(), 32); + for (i, e) in vec.iter().enumerate() { + assert_eq!(*e, (i % 16) as u8); + } + } +} diff --git a/lib/tmp/src/lib.rs b/lib/tmp/src/lib.rs index f2e765a..3dbddb6 100644 --- a/lib/tmp/src/lib.rs +++ b/lib/tmp/src/lib.rs @@ -11,5 +11,6 @@ extern crate core; #[cfg(test)] extern crate std; +pub mod io; pub mod msdosmz; pub mod pecoff;