From 972c957a8b6accffb0a4c244749fad8d58bf55e4 Mon Sep 17 00:00:00 2001 From: jiacai2050 Date: Sun, 27 Apr 2025 10:27:47 +0800 Subject: [PATCH 1/3] refactor: rename columnar storage --- Cargo.lock | 58 +++++++++---------- Cargo.toml | 4 +- src/benchmarks/Cargo.toml | 2 +- src/benchmarks/src/encoding_bench.rs | 2 +- src/{storage => columnar_storage}/Cargo.toml | 2 +- .../src/compaction/executor.rs | 0 .../src/compaction/mod.rs | 0 .../src/compaction/picker.rs | 0 .../src/compaction/scheduler.rs | 0 .../src/config.rs | 0 src/{storage => columnar_storage}/src/lib.rs | 0 .../src/macros.rs | 0 .../src/manifest/encoding.rs | 0 .../src/manifest/mod.rs | 0 .../src/operator.rs | 0 src/{storage => columnar_storage}/src/read.rs | 0 src/{storage => columnar_storage}/src/sst.rs | 0 .../src/storage.rs | 0 .../src/test_util.rs | 0 .../src/types.rs | 0 src/metric_engine/Cargo.toml | 2 +- src/metric_engine/src/data/mod.rs | 2 +- src/metric_engine/src/index/mod.rs | 2 +- src/metric_engine/src/metric/mod.rs | 2 +- src/server/Cargo.toml | 2 +- src/server/src/config.rs | 2 +- src/server/src/main.rs | 4 +- 27 files changed, 42 insertions(+), 42 deletions(-) rename src/{storage => columnar_storage}/Cargo.toml (98%) rename src/{storage => columnar_storage}/src/compaction/executor.rs (100%) rename src/{storage => columnar_storage}/src/compaction/mod.rs (100%) rename src/{storage => columnar_storage}/src/compaction/picker.rs (100%) rename src/{storage => columnar_storage}/src/compaction/scheduler.rs (100%) rename src/{storage => columnar_storage}/src/config.rs (100%) rename src/{storage => columnar_storage}/src/lib.rs (100%) rename src/{storage => columnar_storage}/src/macros.rs (100%) rename src/{storage => columnar_storage}/src/manifest/encoding.rs (100%) rename src/{storage => columnar_storage}/src/manifest/mod.rs (100%) rename src/{storage => columnar_storage}/src/operator.rs (100%) rename src/{storage => columnar_storage}/src/read.rs (100%) rename src/{storage => columnar_storage}/src/sst.rs (100%) rename src/{storage => columnar_storage}/src/storage.rs (100%) rename src/{storage => columnar_storage}/src/test_util.rs (100%) rename src/{storage => columnar_storage}/src/types.rs (100%) diff --git a/Cargo.lock b/Cargo.lock index f92b955d32..6671600616 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -630,12 +630,12 @@ name = "benchmarks" version = "2.2.0-alpha" dependencies = [ "bytes", + "columnar_storage", "common", "criterion", "pb_types", "prost", "serde", - "storage", "toml", "tracing", "tracing-subscriber", @@ -893,6 +893,32 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5b63caa9aa9397e2d9480a9b13673856c78d8ac123288526c37d7839f2a86990" +[[package]] +name = "columnar_storage" +version = "2.2.0-alpha" +dependencies = [ + "anyhow", + "arrow", + "async-scoped", + "async-trait", + "byteorder", + "bytes", + "common", + "datafusion", + "futures", + "itertools 0.3.25", + "lazy_static", + "object_store", + "parquet", + "pb_types", + "prost", + "serde", + "temp-dir", + "test-log", + "tokio", + "tracing", +] + [[package]] name = "comfy-table" version = "7.1.1" @@ -2154,9 +2180,9 @@ name = "metric_engine" version = "2.2.0-alpha" dependencies = [ "anyhow", + "columnar_storage", "common", "seahash", - "storage", "temp-dir", "test-log", "thiserror", @@ -2904,12 +2930,12 @@ dependencies = [ "actix-web", "arrow", "clap", + "columnar_storage", "common", "futures", "object_store", "rand", "serde", - "storage", "tokio", "toml", "tracing", @@ -3047,32 +3073,6 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" -[[package]] -name = "storage" -version = "2.2.0-alpha" -dependencies = [ - "anyhow", - "arrow", - "async-scoped", - "async-trait", - "byteorder", - "bytes", - "common", - "datafusion", - "futures", - "itertools 0.3.25", - "lazy_static", - "object_store", - "parquet", - "pb_types", - "prost", - "serde", - "temp-dir", - "test-log", - "tokio", - "tracing", -] - [[package]] name = "strsim" version = "0.11.1" diff --git a/Cargo.toml b/Cargo.toml index 84ae73c6e3..9a8b797379 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,14 +32,14 @@ members = [ "src/metric_engine", "src/pb_types", "src/server", - "src/storage" + "src/columnar_storage" ] [workspace.dependencies] anyhow = { version = "1.0" } seahash = { version = "4" } metric_engine = { path = "src/metric_engine" } -horaedb_storage = { package = "storage", path = "src/storage" } +columnar_storage = { path = "src/columnar_storage" } common = { path = "src/common" } thiserror = "1" bytes = "1" diff --git a/src/benchmarks/Cargo.toml b/src/benchmarks/Cargo.toml index 7db01ea0da..2797e5c7d7 100644 --- a/src/benchmarks/Cargo.toml +++ b/src/benchmarks/Cargo.toml @@ -28,7 +28,7 @@ description.workspace = true [dependencies] bytes = { workspace = true } common = { workspace = true } -horaedb_storage = { workspace = true } +columnar_storage = { workspace = true } pb_types = { workspace = true } prost = { workspace = true } serde = { workspace = true } diff --git a/src/benchmarks/src/encoding_bench.rs b/src/benchmarks/src/encoding_bench.rs index ff5e3551cb..4045b5bcc4 100644 --- a/src/benchmarks/src/encoding_bench.rs +++ b/src/benchmarks/src/encoding_bench.rs @@ -18,7 +18,7 @@ //! encoding bench. use bytes::Bytes; -use horaedb_storage::{ +use columnar_storage::{ manifest::Snapshot, sst::{FileMeta, SstFile}, }; diff --git a/src/storage/Cargo.toml b/src/columnar_storage/Cargo.toml similarity index 98% rename from src/storage/Cargo.toml rename to src/columnar_storage/Cargo.toml index a6ee671d42..fae087e391 100644 --- a/src/storage/Cargo.toml +++ b/src/columnar_storage/Cargo.toml @@ -16,7 +16,7 @@ # under the License. [package] -name = "storage" +name = "columnar_storage" version.workspace = true authors.workspace = true edition.workspace = true diff --git a/src/storage/src/compaction/executor.rs b/src/columnar_storage/src/compaction/executor.rs similarity index 100% rename from src/storage/src/compaction/executor.rs rename to src/columnar_storage/src/compaction/executor.rs diff --git a/src/storage/src/compaction/mod.rs b/src/columnar_storage/src/compaction/mod.rs similarity index 100% rename from src/storage/src/compaction/mod.rs rename to src/columnar_storage/src/compaction/mod.rs diff --git a/src/storage/src/compaction/picker.rs b/src/columnar_storage/src/compaction/picker.rs similarity index 100% rename from src/storage/src/compaction/picker.rs rename to src/columnar_storage/src/compaction/picker.rs diff --git a/src/storage/src/compaction/scheduler.rs b/src/columnar_storage/src/compaction/scheduler.rs similarity index 100% rename from src/storage/src/compaction/scheduler.rs rename to src/columnar_storage/src/compaction/scheduler.rs diff --git a/src/storage/src/config.rs b/src/columnar_storage/src/config.rs similarity index 100% rename from src/storage/src/config.rs rename to src/columnar_storage/src/config.rs diff --git a/src/storage/src/lib.rs b/src/columnar_storage/src/lib.rs similarity index 100% rename from src/storage/src/lib.rs rename to src/columnar_storage/src/lib.rs diff --git a/src/storage/src/macros.rs b/src/columnar_storage/src/macros.rs similarity index 100% rename from src/storage/src/macros.rs rename to src/columnar_storage/src/macros.rs diff --git a/src/storage/src/manifest/encoding.rs b/src/columnar_storage/src/manifest/encoding.rs similarity index 100% rename from src/storage/src/manifest/encoding.rs rename to src/columnar_storage/src/manifest/encoding.rs diff --git a/src/storage/src/manifest/mod.rs b/src/columnar_storage/src/manifest/mod.rs similarity index 100% rename from src/storage/src/manifest/mod.rs rename to src/columnar_storage/src/manifest/mod.rs diff --git a/src/storage/src/operator.rs b/src/columnar_storage/src/operator.rs similarity index 100% rename from src/storage/src/operator.rs rename to src/columnar_storage/src/operator.rs diff --git a/src/storage/src/read.rs b/src/columnar_storage/src/read.rs similarity index 100% rename from src/storage/src/read.rs rename to src/columnar_storage/src/read.rs diff --git a/src/storage/src/sst.rs b/src/columnar_storage/src/sst.rs similarity index 100% rename from src/storage/src/sst.rs rename to src/columnar_storage/src/sst.rs diff --git a/src/storage/src/storage.rs b/src/columnar_storage/src/storage.rs similarity index 100% rename from src/storage/src/storage.rs rename to src/columnar_storage/src/storage.rs diff --git a/src/storage/src/test_util.rs b/src/columnar_storage/src/test_util.rs similarity index 100% rename from src/storage/src/test_util.rs rename to src/columnar_storage/src/test_util.rs diff --git a/src/storage/src/types.rs b/src/columnar_storage/src/types.rs similarity index 100% rename from src/storage/src/types.rs rename to src/columnar_storage/src/types.rs diff --git a/src/metric_engine/Cargo.toml b/src/metric_engine/Cargo.toml index ceaf134241..57a1d9f48d 100644 --- a/src/metric_engine/Cargo.toml +++ b/src/metric_engine/Cargo.toml @@ -28,7 +28,7 @@ description.workspace = true [dependencies] anyhow = { workspace = true } common = { workspace = true } -horaedb_storage = { workspace = true } +columnar_storage = { workspace = true } seahash = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true } diff --git a/src/metric_engine/src/data/mod.rs b/src/metric_engine/src/data/mod.rs index 96d1a9d78f..a76b9613cd 100644 --- a/src/metric_engine/src/data/mod.rs +++ b/src/metric_engine/src/data/mod.rs @@ -17,7 +17,7 @@ use std::sync::Arc; -use horaedb_storage::storage::TimeMergeStorageRef; +use columnar_storage::storage::TimeMergeStorageRef; use crate::{types::Sample, Result}; diff --git a/src/metric_engine/src/index/mod.rs b/src/metric_engine/src/index/mod.rs index 4d98db4f5b..5c31a54617 100644 --- a/src/metric_engine/src/index/mod.rs +++ b/src/metric_engine/src/index/mod.rs @@ -17,7 +17,7 @@ use std::sync::Arc; -use horaedb_storage::storage::TimeMergeStorageRef; +use columnar_storage::storage::TimeMergeStorageRef; use crate::{types::Sample, Result}; diff --git a/src/metric_engine/src/metric/mod.rs b/src/metric_engine/src/metric/mod.rs index bdd1d6623c..a38af06a3a 100644 --- a/src/metric_engine/src/metric/mod.rs +++ b/src/metric_engine/src/metric/mod.rs @@ -17,7 +17,7 @@ use std::sync::Arc; -use horaedb_storage::storage::TimeMergeStorageRef; +use columnar_storage::storage::TimeMergeStorageRef; use crate::{types::Sample, Result}; diff --git a/src/server/Cargo.toml b/src/server/Cargo.toml index c696497880..536bc6a4a1 100644 --- a/src/server/Cargo.toml +++ b/src/server/Cargo.toml @@ -31,7 +31,7 @@ arrow = { workspace = true } clap = { workspace = true, features = ["derive"] } common = { workspace = true } futures = { workspace = true } -horaedb_storage = { workspace = true } +columnar_storage = { workspace = true } object_store = { workspace = true } rand = "0.8" serde = { workspace = true } diff --git a/src/server/src/config.rs b/src/server/src/config.rs index 668a895e4b..87951de910 100644 --- a/src/server/src/config.rs +++ b/src/server/src/config.rs @@ -83,7 +83,7 @@ impl Default for ThreadConfig { #[serde(default, deny_unknown_fields)] pub struct StorageConfig { pub object_store: ObjectStorageConfig, - pub time_merge_storage: horaedb_storage::config::StorageConfig, + pub time_merge_storage: columnar_storage::config::StorageConfig, } #[derive(Debug, Clone, Deserialize, Serialize)] diff --git a/src/server/src/main.rs b/src/server/src/main.rs index df154bc3fd..a0647d4754 100644 --- a/src/server/src/main.rs +++ b/src/server/src/main.rs @@ -37,13 +37,13 @@ use arrow::{ datatypes::{DataType, Field, Schema, SchemaRef}, }; use clap::Parser; -use config::{Config, ObjectStorageConfig}; -use horaedb_storage::{ +use columnar_storage::{ storage::{ CloudObjectStorage, CompactRequest, StorageRuntimes, TimeMergeStorageRef, WriteRequest, }, types::RuntimeRef, }; +use config::{Config, ObjectStorageConfig}; use object_store::local::LocalFileSystem; use tracing::{error, info}; use tracing_subscriber::EnvFilter; From a33bd6bd1d8d06a3efc864b25d72488f91adbd51 Mon Sep 17 00:00:00 2001 From: jiacai2050 Date: Sun, 27 Apr 2025 10:35:09 +0800 Subject: [PATCH 2/3] fix ci --- Cargo.toml | 5 +++-- src/benchmarks/Cargo.toml | 2 +- src/metric_engine/Cargo.toml | 2 +- src/server/Cargo.toml | 2 +- 4 files changed, 6 insertions(+), 5 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 9a8b797379..35e3a56b99 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,11 +28,12 @@ description = "A high-performance, distributed, cloud native time-series databas resolver = "2" members = [ "src/benchmarks", + "src/columnar_storage" +, "src/common", "src/metric_engine", "src/pb_types", - "src/server", - "src/columnar_storage" + "src/server" ] [workspace.dependencies] diff --git a/src/benchmarks/Cargo.toml b/src/benchmarks/Cargo.toml index 2797e5c7d7..a6c017bf07 100644 --- a/src/benchmarks/Cargo.toml +++ b/src/benchmarks/Cargo.toml @@ -27,8 +27,8 @@ description.workspace = true [dependencies] bytes = { workspace = true } -common = { workspace = true } columnar_storage = { workspace = true } +common = { workspace = true } pb_types = { workspace = true } prost = { workspace = true } serde = { workspace = true } diff --git a/src/metric_engine/Cargo.toml b/src/metric_engine/Cargo.toml index 57a1d9f48d..18247a0f14 100644 --- a/src/metric_engine/Cargo.toml +++ b/src/metric_engine/Cargo.toml @@ -27,8 +27,8 @@ description.workspace = true [dependencies] anyhow = { workspace = true } -common = { workspace = true } columnar_storage = { workspace = true } +common = { workspace = true } seahash = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true } diff --git a/src/server/Cargo.toml b/src/server/Cargo.toml index 536bc6a4a1..7d32a48975 100644 --- a/src/server/Cargo.toml +++ b/src/server/Cargo.toml @@ -29,9 +29,9 @@ description.workspace = true actix-web = "4" arrow = { workspace = true } clap = { workspace = true, features = ["derive"] } +columnar_storage = { workspace = true } common = { workspace = true } futures = { workspace = true } -columnar_storage = { workspace = true } object_store = { workspace = true } rand = "0.8" serde = { workspace = true } From abfa1ea95accd0b3dcee454d43e508c4351e3495 Mon Sep 17 00:00:00 2001 From: jiacai2050 Date: Sun, 27 Apr 2025 14:37:03 +0800 Subject: [PATCH 3/3] rename ObjectBasedStorage --- src/columnar_storage/src/storage.rs | 23 ++++++++++++----------- src/metric_engine/src/metric/mod.rs | 6 +++--- src/server/src/main.rs | 8 ++++---- 3 files changed, 19 insertions(+), 18 deletions(-) diff --git a/src/columnar_storage/src/storage.rs b/src/columnar_storage/src/storage.rs index 14e3f1b217..119fa9dbe9 100644 --- a/src/columnar_storage/src/storage.rs +++ b/src/columnar_storage/src/storage.rs @@ -74,7 +74,7 @@ pub struct CompactRequest {} /// Time-aware merge storage interface. #[async_trait] -pub trait TimeMergeStorage { +pub trait ColumnarStorage { fn schema(&self) -> &SchemaRef; async fn write(&self, req: WriteRequest) -> Result<()>; @@ -86,7 +86,7 @@ pub trait TimeMergeStorage { async fn compact(&self, req: CompactRequest) -> Result<()>; } -pub type TimeMergeStorageRef = Arc<(dyn TimeMergeStorage + Send + Sync)>; +pub type ColumnarStorageRef = Arc<(dyn ColumnarStorage + Send + Sync)>; #[derive(Clone)] pub struct StorageRuntimes { @@ -103,13 +103,14 @@ impl StorageRuntimes { } } -/// `TimeMergeStorage` implementation using cloud object storage, it will split -/// data into different segments(aka `segment_duration`) based time range. +/// `ObjectBasedStorage` implements `ColumnarStorage` utilizing cloud object +/// storage (e.g., S3), dividing data into distinct segments based on time +/// ranges(often referred to as `segment_duration`) . /// -/// Compaction will be done by merging segments within a segment, and segment -/// will make it easy to support expiration. +/// Compaction is facilitated through the merging of SST files within a single +/// segment, making it simple to support expiration. #[allow(dead_code)] -pub struct CloudObjectStorage { +pub struct ObjectBasedStorage { segment_duration: Duration, path: String, store: ObjectStoreRef, @@ -133,7 +134,7 @@ pub struct CloudObjectStorage { /// {root_path}/data/... /// ``` /// `root_path` is composed of `path` and `segment_duration`. -impl CloudObjectStorage { +impl ObjectBasedStorage { pub async fn try_new( path: String, segment_duration: Duration, @@ -298,7 +299,7 @@ impl CloudObjectStorage { } #[async_trait] -impl TimeMergeStorage for CloudObjectStorage { +impl ColumnarStorage for ObjectBasedStorage { fn schema(&self) -> &SchemaRef { &self.schema.arrow_schema } @@ -394,7 +395,7 @@ mod tests { let store = Arc::new(LocalFileSystem::new()); let runtimes = build_runtimes(); runtimes.sst_compact_runtime.clone().block_on(async move { - let storage = CloudObjectStorage::try_new( + let storage = ObjectBasedStorage::try_new( root_dir.path().to_string_lossy().to_string(), Duration::from_hours(2), store, @@ -496,7 +497,7 @@ mod tests { let store = Arc::new(LocalFileSystem::new()); let runtimes = build_runtimes(); runtimes.sst_compact_runtime.clone().block_on(async move { - let storage = CloudObjectStorage::try_new( + let storage = ObjectBasedStorage::try_new( root_dir.path().to_string_lossy().to_string(), Duration::from_hours(2), store, diff --git a/src/metric_engine/src/metric/mod.rs b/src/metric_engine/src/metric/mod.rs index a38af06a3a..535854ecf7 100644 --- a/src/metric_engine/src/metric/mod.rs +++ b/src/metric_engine/src/metric/mod.rs @@ -17,7 +17,7 @@ use std::sync::Arc; -use columnar_storage::storage::TimeMergeStorageRef; +use columnar_storage::storage::ColumnarStorageRef; use crate::{types::Sample, Result}; @@ -26,7 +26,7 @@ pub struct MetricManager { } impl MetricManager { - pub fn new(storage: TimeMergeStorageRef) -> Self { + pub fn new(storage: ColumnarStorageRef) -> Self { Self { inner: Arc::new(Inner { storage }), } @@ -40,7 +40,7 @@ impl MetricManager { } struct Inner { - storage: TimeMergeStorageRef, + storage: ColumnarStorageRef, } impl Inner { diff --git a/src/server/src/main.rs b/src/server/src/main.rs index a0647d4754..593cc0d167 100644 --- a/src/server/src/main.rs +++ b/src/server/src/main.rs @@ -39,7 +39,7 @@ use arrow::{ use clap::Parser; use columnar_storage::{ storage::{ - CloudObjectStorage, CompactRequest, StorageRuntimes, TimeMergeStorageRef, WriteRequest, + ColumnarStorageRef, CompactRequest, ObjectBasedStorage, StorageRuntimes, WriteRequest, }, types::RuntimeRef, }; @@ -80,7 +80,7 @@ async fn compact(data: web::Data) -> impl Responder { } struct AppState { - storage: TimeMergeStorageRef, + storage: ColumnarStorageRef, keep_writing: Arc, } @@ -121,7 +121,7 @@ pub fn main() { let _ = rt.block_on(async move { let store = Arc::new(LocalFileSystem::new()); let storage = Arc::new( - CloudObjectStorage::try_new( + ObjectBasedStorage::try_new( object_store_config.data_dir, segment_duration, store, @@ -185,7 +185,7 @@ fn build_schema() -> SchemaRef { } fn bench_write( - storage: TimeMergeStorageRef, + storage: ColumnarStorageRef, rt: RuntimeRef, workers: usize, interval: Duration,