diff --git a/Cargo.lock b/Cargo.lock
index 8001be5e..db5e0e3b 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -707,6 +707,7 @@ dependencies = [
  "http-body-util",
  "humantime",
  "indexmap",
+ "md-5",
  "mime",
  "mime_guess",
  "mockall",
diff --git a/Cargo.toml b/Cargo.toml
index 8d793847..0960264a 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -100,6 +100,7 @@ humantime = "2"
 indexmap = "2"
 insta = { version = "1", features = ["filters"] }
 insta-cmd = "0.6"
+md5 = { package = "md-5", version = "0.10.6" }
 mime = "0.3"
 mime_guess = { version = "2", default-features = false }
 mockall = "0.14"
diff --git a/cot/Cargo.toml b/cot/Cargo.toml
index 1cb081b6..df24c714 100644
--- a/cot/Cargo.toml
+++ b/cot/Cargo.toml
@@ -42,6 +42,7 @@ http-body.workspace = true
 http.workspace = true
 humantime.workspace = true
 indexmap.workspace = true
+md5.workspace = true
 mime.workspace = true
 mime_guess.workspace = true
 multer.workspace = true
diff --git a/cot/src/cache/store.rs b/cot/src/cache/store.rs
index ef278cbb..f42c3f5f 100644
--- a/cot/src/cache/store.rs
+++ b/cot/src/cache/store.rs
@@ -5,6 +5,7 @@
 //! provide a simple asynchronous interface for putting, getting, and managing
 //! cached values, optionally with expiration policies.
 
+pub mod file;
 pub mod memory;
 #[cfg(feature = "redis")]
 pub mod redis;
diff --git a/cot/src/cache/store/file.rs b/cot/src/cache/store/file.rs
new file mode 100644
index 00000000..5583d107
--- /dev/null
+++ b/cot/src/cache/store/file.rs
@@ -0,0 +1,645 @@
+//! File-based cache store implementation.
+//!
+//! This store uses the local file system as the backend for caching. It
+//! provides atomic writes via a write-to-temp-file-then-rename scheme and
+//! validates TTL-based expiration on access.
+//!
+//! # Examples
+//!
+//! ```no_run
+//! # use cot::cache::store::file::FileStore;
+//! # use cot::cache::store::CacheStore;
+//! # use cot::config::Timeout;
+//! # use std::path::PathBuf;
+//! # #[tokio::main]
+//! # async fn main() {
+//! let path = PathBuf::from("./cache_data");
+//! let store = FileStore::new(path).expect("Failed to initialize store");
+//!
+//! let key = "example_key".to_string();
+//! let value = serde_json::json!({"data": "example_value"});
+//!
+//! store.insert(key.clone(), value.clone(), Default::default()).await.unwrap();
+//!
+//! let retrieved = store.get(&key).await.unwrap();
+//! assert_eq!(retrieved, Some(value));
+//!
+//! # }
+//! ```
+//!
+//! # Expiration Policy
+//!
+//! Expired cache files are evicted lazily, when they are next accessed via
+//! `contains_key` or `get`. No background collector is implemented.
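+//!
+//! The sketch below is illustrative only; it shows the lazy eviction described
+//! above by inserting an entry whose expiry has already passed and then looking
+//! it up again.
+//!
+//! ```no_run
+//! # use cot::cache::store::file::FileStore;
+//! # use cot::cache::store::CacheStore;
+//! # use cot::config::Timeout;
+//! # use std::path::PathBuf;
+//! # #[tokio::main]
+//! # async fn main() {
+//! let store = FileStore::new(PathBuf::from("./cache_data")).unwrap();
+//!
+//! // An expiry that is already in the past: the entry is written, but is stale.
+//! let past = chrono::Utc::now() - std::time::Duration::from_secs(60);
+//! let expiry = Timeout::AtDateTime(past.fixed_offset());
+//! store
+//!     .insert("ttl_key".to_string(), serde_json::json!(1), expiry)
+//!     .await
+//!     .unwrap();
+//!
+//! // The stale entry is evicted during this lookup.
+//! assert!(!store.contains_key("ttl_key").await.unwrap());
+//! # }
+//! ```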
+use std::borrow::Cow;
+use std::path::Path;
+
+use chrono::{DateTime, Utc};
+use md5::{Digest, Md5};
+use serde_json::Value;
+use thiserror::Error;
+use tokio::fs::OpenOptions;
+use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, SeekFrom};
+
+use crate::cache::store::{CacheStore, CacheStoreError, CacheStoreResult};
+use crate::config::Timeout;
+use crate::error::error_impl::impl_into_cot_error;
+
+const ERROR_PREFIX: &str = "file based cache store error:";
+const TEMPFILE_SUFFIX: &str = ".tmp";
+
+/// Errors specific to the file based cache store.
+#[derive(Debug, Error)]
+#[non_exhaustive]
+pub enum FileCacheStoreError {
+    /// An error occurred during directory creation.
+    #[error("{ERROR_PREFIX} file dir creation error: {0}")]
+    DirCreation(Box<dyn std::error::Error + Send + Sync>),
+
+    /// An error occurred during temporary file creation.
+    #[error("{ERROR_PREFIX} file temp file creation error: {0}")]
+    TempFileCreation(Box<dyn std::error::Error + Send + Sync>),
+
+    /// An error occurred while reading from or writing to a cache file.
+    #[error("{ERROR_PREFIX} file io error: {0}")]
+    Io(Box<dyn std::error::Error + Send + Sync>),
+
+    /// An error occurred during data serialization.
+    #[error("{ERROR_PREFIX} serialization error: {0}")]
+    Serialize(Box<dyn std::error::Error + Send + Sync>),
+
+    /// An error occurred during data deserialization.
+    #[error("{ERROR_PREFIX} deserialization error: {0}")]
+    Deserialize(Box<dyn std::error::Error + Send + Sync>),
+}
+
+impl_into_cot_error!(FileCacheStoreError);
+
+impl From<FileCacheStoreError> for CacheStoreError {
+    fn from(err: FileCacheStoreError) -> Self {
+        let full = err.to_string();
+
+        match err {
+            FileCacheStoreError::Serialize(_) => CacheStoreError::Serialize(full),
+            FileCacheStoreError::Deserialize(_) => CacheStoreError::Deserialize(full),
+            _ => CacheStoreError::Backend(full),
+        }
+    }
+}
+
+/// A file-backed cache store implementation.
+///
+/// This store uses the local file system for caching.
+///
+/// # Examples
+/// ```no_run
+/// use std::path::Path;
+///
+/// use cot::cache::store::file::FileStore;
+///
+/// let store = FileStore::new(Path::new("./cache_dir")).unwrap();
+/// ```
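+///
+/// Entries can be removed individually or all at once; this sketch is
+/// illustrative and simply exercises the `CacheStore` methods implemented
+/// further below:
+///
+/// ```no_run
+/// # use std::path::Path;
+/// # use cot::cache::store::CacheStore;
+/// # use cot::cache::store::file::FileStore;
+/// # #[tokio::main]
+/// # async fn main() {
+/// let store = FileStore::new(Path::new("./cache_dir")).unwrap();
+///
+/// // Remove a single entry; removing a missing key is not an error.
+/// store.remove("stale_key").await.unwrap();
+///
+/// // Drop every entry and recreate the (now empty) cache directory.
+/// store.clear().await.unwrap();
+/// # }
+/// ```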
+#[derive(Debug, Clone)]
+pub struct FileStore {
+    dir_path: Cow<'static, Path>,
+}
+
+impl FileStore {
+    /// Creates a new `FileStore` at the specified directory.
+    ///
+    /// This will attempt to create the directory and its parents if they do not
+    /// exist.
+    ///
+    /// # Errors
+    ///
+    /// Returns [`FileCacheStoreError::DirCreation`] if the directory cannot be
+    /// created due to permissions or other I/O issues.
+    ///
+    /// # Examples
+    ///
+    /// ```no_run
+    /// use std::path::PathBuf;
+    ///
+    /// use cot::cache::store::file::FileStore;
+    ///
+    /// // Using a relative path
+    /// let path = PathBuf::from("./cache");
+    /// let store = FileStore::new(path).unwrap();
+    ///
+    /// // Using an absolute path
+    /// let path = PathBuf::from("/var/lib/myapp/cache");
+    /// let store = FileStore::new(path).unwrap();
+    /// ```
+    pub fn new(dir: impl Into<Cow<'static, Path>>) -> CacheStoreResult<Self> {
+        let dir_path = dir.into();
+
+        let store = Self { dir_path };
+        store.create_dir_sync_root()?;
+
+        Ok(store)
+    }
+
+    fn create_dir_sync_root(&self) -> CacheStoreResult<()> {
+        std::fs::create_dir_all(&self.dir_path)
+            .map_err(|e| FileCacheStoreError::DirCreation(Box::new(e)))?;
+
+        Ok(())
+    }
+
+    async fn create_dir_root(&self) -> CacheStoreResult<()> {
+        tokio::fs::create_dir_all(&self.dir_path)
+            .await
+            .map_err(|e| FileCacheStoreError::DirCreation(Box::new(e)))?;
+
+        Ok(())
+    }
+
+    async fn write(&self, key: String, value: Value, expiry: Timeout) -> CacheStoreResult<()> {
+        self.create_dir_root().await?; // create the dir if it doesn't exist
+
+        let key_hash = FileStore::create_key_hash(&key);
+        let (mut file, file_path) = self.create_file_temp(&key_hash).await?;
+
+        let proc_result: CacheStoreResult<()> = async {
+            self.serialize_data(value, expiry, &mut file).await?;
+
+            Ok(())
+        }
+        .await;
+
+        if let Err(e) = proc_result {
+            let _ = tokio::fs::remove_file(&file_path).await;
+            return Err(e);
+        }
+
+        // flush to disk, then atomically move the temp file into place
+        file.sync_all()
+            .await
+            .map_err(|e| FileCacheStoreError::Io(Box::new(e)))?;
+        tokio::fs::rename(file_path, self.dir_path.join(&key_hash))
+            .await
+            .map_err(|e| FileCacheStoreError::Io(Box::new(e)))?;
+
+        Ok(())
+    }
+
+    async fn read(&self, key: &str) -> CacheStoreResult<Option<Value>> {
+        let Some((mut file, file_path)) = self.file_open(key).await? else {
+            return Ok(None);
+        };
+
+        if let Some(value) = self.deserialize_data(&mut file).await? {
+            Ok(Some(value))
+        } else {
+            // the entry has expired; delete it as part of the read
+            let _ = tokio::fs::remove_file(&file_path).await;
+            Ok(None)
+        }
+    }
+
+    fn create_key_hash(key: &str) -> String {
+        let mut hasher = Md5::new();
+        hasher.update(key.as_bytes());
+        let key_hash_hex = hasher.finalize();
+        format!("{key_hash_hex:x}")
+    }
+
+    async fn serialize_data(
+        &self,
+        value: Value,
+        expiry: Timeout,
+        file: &mut tokio::fs::File,
+    ) -> CacheStoreResult<Vec<u8>> {
+        let timeout = expiry.canonicalize();
+        let seconds: i64 = match timeout {
+            Timeout::Never => i64::MAX,
+            Timeout::AtDateTime(date_time) => date_time.timestamp(),
+            Timeout::After(_) => unreachable!("should've been converted by canonicalize"),
+        };
+        let timeout_header = seconds.to_le_bytes();
+
+        let data = serde_json::to_string(&value)
+            .map_err(|e| FileCacheStoreError::Serialize(Box::new(e)))?;
+
+        let mut buffer: Vec<u8> = Vec::with_capacity(8 + data.len());
+        buffer.extend_from_slice(&timeout_header);
+        buffer.extend_from_slice(data.as_bytes());
+
+        file.write_all(&buffer)
+            .await
+            .map_err(|e| FileCacheStoreError::Io(Box::new(e)))?;
+
+        Ok(buffer)
+    }
+
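+    /// Reads the expiry header of a cache file and reports whether the entry is
+    /// still valid.
+    ///
+    /// Each cache file starts with an 8-byte little-endian `i64`: `i64::MAX`
+    /// means the entry never expires, and any other value is interpreted as a
+    /// Unix timestamp in seconds. The JSON payload follows the header. The
+    /// cursor is rewound to the start of the file before returning so the
+    /// handle can be reused.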
+    async fn parse_expiry(&self, file: &mut tokio::fs::File) -> CacheStoreResult<bool> {
+        let mut header: [u8; 8] = [0; 8];
+
+        let _ = file
+            .read_exact(&mut header)
+            .await
+            .map_err(|e| FileCacheStoreError::Deserialize(Box::new(e)))?;
+        let seconds = i64::from_le_bytes(header);
+
+        let expiry = if seconds == i64::MAX {
+            Timeout::Never
+        } else {
+            let date_time = DateTime::from_timestamp(seconds, 0)
+                .ok_or_else(|| FileCacheStoreError::Deserialize("date time corrupted".into()))?
+                .with_timezone(&Utc)
+                .fixed_offset();
+            Timeout::AtDateTime(date_time)
+        };
+
+        if expiry.is_expired(None) {
+            return Ok(false);
+        }
+
+        // This may look inefficient, but having this method rewind the cursor
+        // itself makes the logic reusable without the risk of callers forgetting
+        // to reset the cursor.
+        file.seek(SeekFrom::Start(0))
+            .await
+            .map_err(|e| FileCacheStoreError::Io(Box::new(e)))?;
+
+        Ok(true)
+    }
+
+    async fn deserialize_data(
+        &self,
+        file: &mut tokio::fs::File,
+    ) -> CacheStoreResult<Option<Value>> {
+        if !self.parse_expiry(file).await? {
+            return Ok(None);
+        }
+
+        let mut buffer = Vec::new();
+
+        // skip past the 8-byte expiry header
+        file.seek(SeekFrom::Start(8))
+            .await
+            .map_err(|e| FileCacheStoreError::Io(Box::new(e)))?;
+        file.read_to_end(&mut buffer)
+            .await
+            .map_err(|e| FileCacheStoreError::Io(Box::new(e)))?;
+
+        let value: Value = serde_json::from_slice(&buffer)
+            .map_err(|e| FileCacheStoreError::Deserialize(Box::new(e)))?;
+
+        Ok(Some(value))
+    }
+
+    async fn create_file_temp(
+        &self,
+        key_hash: &str,
+    ) -> CacheStoreResult<(tokio::fs::File, std::path::PathBuf)> {
+        let temp_path = self.dir_path.join(format!("{key_hash}{TEMPFILE_SUFFIX}"));
+
+        let temp_file = OpenOptions::new()
+            .write(true)
+            .create(true)
+            .truncate(true)
+            .open(&temp_path)
+            .await
+            .map_err(|e| FileCacheStoreError::TempFileCreation(Box::new(e)))?;
+
+        Ok((temp_file, temp_path))
+    }
+
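+    /// Opens the cache file that backs `key` for reading.
+    ///
+    /// Returns `Ok(None)` when no file exists for the key, so callers can treat
+    /// a missing entry as a cache miss rather than an error.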
{ + tokio::fs::remove_file(file_path) + .await + .map_err(|e| FileCacheStoreError::Io(Box::new(e)))?; + } + + Ok(()) + } + + async fn clear(&self) -> CacheStoreResult<()> { + if let Err(e) = tokio::fs::remove_dir_all(&self.dir_path).await { + // if not found try to continue, don't dip + if e.kind() != std::io::ErrorKind::NotFound { + return Err(FileCacheStoreError::Io(Box::new(e)).into()); + } + } + // even though write is self healing, this minimizes result variants on other + // methods + tokio::fs::create_dir_all(&self.dir_path) + .await + .map_err(|e| FileCacheStoreError::DirCreation(Box::new(e)))?; + Ok(()) + } + + async fn approx_size(&self) -> CacheStoreResult { + let mut entries = match tokio::fs::read_dir(&self.dir_path).await { + Ok(e) => e, + Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(0), + Err(e) => return Err(FileCacheStoreError::Io(Box::new(e)).into()), + }; + + let mut total_size: u64 = 0; + + while let Ok(Some(entry)) = entries.next_entry().await { + if let Ok(meta) = entry.metadata().await + && meta.is_file() + { + total_size += meta.len(); + } + } + + // when error is triggered, this would be because capacity overflow + // of trying to wrap usize on a 32-bit system + let wrapped_size = + usize::try_from(total_size).map_err(|e| FileCacheStoreError::Io(Box::new(e)))?; + + Ok(wrapped_size) + } + + async fn contains_key(&self, key: &str) -> CacheStoreResult { + let Ok(Some(mut file_tuple)) = self.file_open(key).await else { + return Ok(false); + }; + + // cache eviction on contains_key() based on TTL + if self.parse_expiry(&mut file_tuple.0).await? { + return Ok(true); + } + + tokio::fs::remove_file(&file_tuple.1) + .await + .map_err(|e| FileCacheStoreError::Io(Box::new(e)))?; + + Ok(false) + } +} + +#[cfg(test)] +mod tests { + use std::time::Duration; + + use chrono::Utc; + use tempfile::tempdir; + + use crate::cache::store::file::{FileCacheStoreError, FileStore}; + use crate::cache::store::{CacheStore, CacheStoreError}; + use crate::config::Timeout; + + fn make_store_path() -> std::path::PathBuf { + tempdir().expect("failed to create dir").keep() + } + + #[cot::test] + async fn test_create_dir() { + let path = make_store_path(); + let _ = FileStore::new(path.clone()).expect("failed to init store"); + + assert!(path.exists()); + assert!(path.is_dir()); + + tokio::fs::remove_dir_all(path) + .await + .expect("failed to cleanup tempdir"); + } + + #[cot::test] + async fn test_create_dir_on_existing() { + let path = make_store_path(); + let _ = FileStore::new(path.clone()).expect("failed to init store"); + let _ = FileStore::new(path.clone()).expect("failed to init second store"); + + assert!(path.exists()); + assert!(path.is_dir()); + + tokio::fs::remove_dir_all(path) + .await + .expect("failed to cleanup tempdir"); + } + + #[cot::test] + async fn test_insert_and_read_single() { + let path = make_store_path(); + + let store = FileStore::new(path.clone()).expect("failed to init store"); + let key = "test_key".to_string(); + let value = serde_json::json!({ "id": 1, "message": "hello world" }); + + store + .insert(key.clone(), value.clone(), Timeout::Never) + .await + .expect("failed to insert data to store"); + + let retrieved = store.read(&key).await.expect("failed to read from store"); + + assert!(retrieved.is_some(), "retrieved value should not be None"); + assert_eq!( + retrieved.unwrap(), + value, + "retrieved value does not match inserted value" + ); + + let _ = tokio::fs::remove_dir_all(&path).await; + } + + #[cot::test] + async fn 
+    #[cot::test]
+    async fn test_insert_and_read_after_delete_single() {
+        let path = make_store_path();
+
+        let store = FileStore::new(path.clone()).expect("failed to init store");
+        let key = "test_key".to_string();
+        let value = serde_json::json!({ "id": 1, "message": "hello world" });
+
+        store
+            .insert(key.clone(), value.clone(), Timeout::Never)
+            .await
+            .expect("failed to insert data to store");
+
+        store.remove(&key).await.expect("failed to delete entry");
+
+        let retrieved = store.read(&key).await.expect("failed to read from store");
+        assert!(retrieved.is_none(), "retrieved value should be None");
+
+        let _ = tokio::fs::remove_dir_all(&path).await;
+    }
+
+    #[cot::test]
+    async fn test_clear_double_free() {
+        let path = make_store_path();
+
+        let store = FileStore::new(path.clone()).expect("failed to init store");
+        let key = "test_key".to_string();
+        let value = serde_json::json!({ "id": 1, "message": "hello world" });
+
+        store
+            .insert(key.clone(), value.clone(), Timeout::Never)
+            .await
+            .expect("failed to insert data to store");
+
+        store.clear().await.expect("failed to clear");
+        store
+            .clear()
+            .await
+            .expect("failed to clear the second time");
+
+        let retrieved = store.read(&key).await.expect("failed to read from store");
+
+        assert!(path.is_dir(), "path must be dir");
+        assert!(retrieved.is_none(), "retrieved value should be None");
+
+        let _ = tokio::fs::remove_dir_all(&path).await;
+    }
+
+    #[cot::test]
+    async fn test_approx_size() {
+        let path = make_store_path();
+
+        let store = FileStore::new(path.clone()).expect("failed to init store");
+        let key = "test_key".to_string();
+        let value = serde_json::json!({ "id": 1, "message": "hello world" });
+
+        store
+            .insert(key.clone(), value.clone(), Timeout::Never)
+            .await
+            .expect("failed to insert data to store");
+
+        let data_buffer: Vec<u8> =
+            serde_json::to_vec(&value).expect("failed to convert value into vector");
+        let data_length: usize = 8 + data_buffer.len(); // extra 8 for the expiry header
+
+        let entry_length = store
+            .approx_size()
+            .await
+            .expect("failed to get approx size");
+
+        assert_eq!(data_length, entry_length);
+
+        let _ = tokio::fs::remove_dir_all(&path).await;
+    }
+
+    #[cot::test]
+    async fn test_contains_key() {
+        let path = make_store_path();
+
+        let store = FileStore::new(path.clone()).expect("failed to init store");
+        let key = "test_key".to_string();
+        let value = serde_json::json!({ "id": 1, "message": "hello world" });
+
+        store
+            .insert(key.clone(), value.clone(), Timeout::Never)
+            .await
+            .expect("failed to insert data to store");
+
+        let exist = store
+            .contains_key(&key)
+            .await
+            .expect("failed to check key existence");
+
+        assert!(exist);
+
+        let _ = tokio::fs::remove_dir_all(&path).await;
+    }
+
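+    #[cot::test]
+    async fn test_get_missing_key() {
+        // Illustrative addition: looking up a key that was never inserted should
+        // yield `None` rather than an error, because `file_open` maps `NotFound`
+        // to `Ok(None)`.
+        let path = make_store_path();
+
+        let store = FileStore::new(path.clone()).expect("failed to init store");
+
+        let retrieved = store
+            .get("missing_key")
+            .await
+            .expect("failed to read from store");
+        assert!(retrieved.is_none());
+
+        let _ = tokio::fs::remove_dir_all(&path).await;
+    }
+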
+    #[cot::test]
+    async fn test_expiration_integrity() {
+        let path = make_store_path();
+
+        let store = FileStore::new(path.clone()).expect("failed to init store");
+        let key = "test_key".to_string();
+        let value = serde_json::json!({ "id": 1, "message": "hello world" });
+
+        let past = Utc::now() - Duration::from_secs(1);
+        let past_fixed = past.fixed_offset();
+        let expiry = Timeout::AtDateTime(past_fixed);
+
+        store
+            .insert(key.clone(), value.clone(), expiry)
+            .await
+            .expect("failed to insert data to store");
+
+        // the expired value is no longer returned
+        let retrieved = store.get(&key).await.expect("failed to read from store");
+        assert!(retrieved.is_none());
+
+        // the key no longer exists
+        let exist = store
+            .contains_key(&key)
+            .await
+            .expect("failed to check key existence");
+        assert!(!exist);
+
+        // the evicted file no longer contributes to the size, so it is 0
+        let size = store.approx_size().await.expect("failed to check size");
+        assert_eq!(size, 0);
+
+        let _ = tokio::fs::remove_dir_all(&path).await;
+    }
+
+    #[cot::test]
+    async fn test_from_file_cache_store_error_to_cache_store_error() {
+        let file_error = FileCacheStoreError::Io(Box::new(std::io::Error::other("disk failure")));
+        let cache_error: CacheStoreError = file_error.into();
+        assert_eq!(
+            cache_error.to_string(),
+            "cache store error: backend error: file based cache store error: file io error: disk failure"
+        );
+
+        let file_error =
+            FileCacheStoreError::Serialize(Box::new(std::io::Error::other("json fail")));
+        let cache_error: CacheStoreError = file_error.into();
+        assert_eq!(
+            cache_error.to_string(),
+            "cache store error: serialization error: file based cache store error: serialization error: json fail"
+        );
+
+        let file_error =
+            FileCacheStoreError::Deserialize(Box::new(std::io::Error::other("corrupt header")));
+        let cache_error: CacheStoreError = file_error.into();
+        assert_eq!(
+            cache_error.to_string(),
+            "cache store error: deserialization error: file based cache store error: deserialization error: corrupt header"
+        );
+
+        let file_error =
+            FileCacheStoreError::DirCreation(Box::new(std::io::Error::other("permission denied")));
+        let cache_error: CacheStoreError = file_error.into();
+        assert_eq!(
+            cache_error.to_string(),
+            "cache store error: backend error: file based cache store error: file dir creation error: permission denied"
+        );
+
+        let file_error =
+            FileCacheStoreError::TempFileCreation(Box::new(std::io::Error::other("no space left")));
+        let cache_error: CacheStoreError = file_error.into();
+        assert_eq!(
+            cache_error.to_string(),
+            "cache store error: backend error: file based cache store error: file temp file creation error: no space left"
+        );
+    }
+}