Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 29 additions & 29 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 4 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,19 @@ description = "A high-performance, distributed, cloud native time-series databas
resolver = "2"
members = [
"src/benchmarks",
"src/columnar_storage"
,
"src/common",
"src/metric_engine",
"src/pb_types",
"src/server",
"src/storage"
"src/server"
]

[workspace.dependencies]
anyhow = { version = "1.0" }
seahash = { version = "4" }
metric_engine = { path = "src/metric_engine" }
horaedb_storage = { package = "storage", path = "src/storage" }
columnar_storage = { path = "src/columnar_storage" }
common = { path = "src/common" }
thiserror = "1"
bytes = "1"
Expand Down
2 changes: 1 addition & 1 deletion src/benchmarks/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ description.workspace = true

[dependencies]
bytes = { workspace = true }
columnar_storage = { workspace = true }
common = { workspace = true }
horaedb_storage = { workspace = true }
pb_types = { workspace = true }
prost = { workspace = true }
serde = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion src/benchmarks/src/encoding_bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
//! encoding bench.

use bytes::Bytes;
use horaedb_storage::{
use columnar_storage::{
manifest::Snapshot,
sst::{FileMeta, SstFile},
};
Expand Down
2 changes: 1 addition & 1 deletion src/storage/Cargo.toml → src/columnar_storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
# under the License.

[package]
name = "storage"
name = "columnar_storage"
version.workspace = true
authors.workspace = true
edition.workspace = true
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
23 changes: 12 additions & 11 deletions src/storage/src/storage.rs → src/columnar_storage/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ pub struct CompactRequest {}

/// Time-aware merge storage interface.
#[async_trait]
pub trait TimeMergeStorage {
pub trait ColumnarStorage {
fn schema(&self) -> &SchemaRef;

async fn write(&self, req: WriteRequest) -> Result<()>;
Expand All @@ -86,7 +86,7 @@ pub trait TimeMergeStorage {
async fn compact(&self, req: CompactRequest) -> Result<()>;
}

pub type TimeMergeStorageRef = Arc<(dyn TimeMergeStorage + Send + Sync)>;
pub type ColumnarStorageRef = Arc<(dyn ColumnarStorage + Send + Sync)>;

#[derive(Clone)]
pub struct StorageRuntimes {
Expand All @@ -103,13 +103,14 @@ impl StorageRuntimes {
}
}

/// `TimeMergeStorage` implementation using cloud object storage, it will split
/// data into different segments(aka `segment_duration`) based time range.
/// `ObjectBasedStorage` implements `ColumnarStorage` utilizing cloud object
/// storage (e.g., S3), dividing data into distinct segments based on time
/// ranges(often referred to as `segment_duration`) .
///
/// Compaction will be done by merging segments within a segment, and segment
/// will make it easy to support expiration.
/// Compaction is facilitated through the merging of SST files within a single
/// segment, making it simple to support expiration.
#[allow(dead_code)]
pub struct CloudObjectStorage {
pub struct ObjectBasedStorage {
segment_duration: Duration,
path: String,
store: ObjectStoreRef,
Expand All @@ -133,7 +134,7 @@ pub struct CloudObjectStorage {
/// {root_path}/data/...
/// ```
/// `root_path` is composed of `path` and `segment_duration`.
impl CloudObjectStorage {
impl ObjectBasedStorage {
pub async fn try_new(
path: String,
segment_duration: Duration,
Expand Down Expand Up @@ -298,7 +299,7 @@ impl CloudObjectStorage {
}

#[async_trait]
impl TimeMergeStorage for CloudObjectStorage {
impl ColumnarStorage for ObjectBasedStorage {
fn schema(&self) -> &SchemaRef {
&self.schema.arrow_schema
}
Expand Down Expand Up @@ -394,7 +395,7 @@ mod tests {
let store = Arc::new(LocalFileSystem::new());
let runtimes = build_runtimes();
runtimes.sst_compact_runtime.clone().block_on(async move {
let storage = CloudObjectStorage::try_new(
let storage = ObjectBasedStorage::try_new(
root_dir.path().to_string_lossy().to_string(),
Duration::from_hours(2),
store,
Expand Down Expand Up @@ -496,7 +497,7 @@ mod tests {
let store = Arc::new(LocalFileSystem::new());
let runtimes = build_runtimes();
runtimes.sst_compact_runtime.clone().block_on(async move {
let storage = CloudObjectStorage::try_new(
let storage = ObjectBasedStorage::try_new(
root_dir.path().to_string_lossy().to_string(),
Duration::from_hours(2),
store,
Expand Down
File renamed without changes.
File renamed without changes.
2 changes: 1 addition & 1 deletion src/metric_engine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ description.workspace = true

[dependencies]
anyhow = { workspace = true }
columnar_storage = { workspace = true }
common = { workspace = true }
horaedb_storage = { workspace = true }
seahash = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion src/metric_engine/src/data/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

use std::sync::Arc;

use horaedb_storage::storage::TimeMergeStorageRef;
use columnar_storage::storage::TimeMergeStorageRef;

use crate::{types::Sample, Result};

Expand Down
2 changes: 1 addition & 1 deletion src/metric_engine/src/index/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

use std::sync::Arc;

use horaedb_storage::storage::TimeMergeStorageRef;
use columnar_storage::storage::TimeMergeStorageRef;

use crate::{types::Sample, Result};

Expand Down
6 changes: 3 additions & 3 deletions src/metric_engine/src/metric/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

use std::sync::Arc;

use horaedb_storage::storage::TimeMergeStorageRef;
use columnar_storage::storage::ColumnarStorageRef;

use crate::{types::Sample, Result};

Expand All @@ -26,7 +26,7 @@ pub struct MetricManager {
}

impl MetricManager {
pub fn new(storage: TimeMergeStorageRef) -> Self {
pub fn new(storage: ColumnarStorageRef) -> Self {
Self {
inner: Arc::new(Inner { storage }),
}
Expand All @@ -40,7 +40,7 @@ impl MetricManager {
}

struct Inner {
storage: TimeMergeStorageRef,
storage: ColumnarStorageRef,
}

impl Inner {
Expand Down
2 changes: 1 addition & 1 deletion src/server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ description.workspace = true
actix-web = "4"
arrow = { workspace = true }
clap = { workspace = true, features = ["derive"] }
columnar_storage = { workspace = true }
common = { workspace = true }
futures = { workspace = true }
horaedb_storage = { workspace = true }
object_store = { workspace = true }
rand = "0.8"
serde = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion src/server/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ impl Default for ThreadConfig {
#[serde(default, deny_unknown_fields)]
pub struct StorageConfig {
pub object_store: ObjectStorageConfig,
pub time_merge_storage: horaedb_storage::config::StorageConfig,
pub time_merge_storage: columnar_storage::config::StorageConfig,
}

#[derive(Debug, Clone, Deserialize, Serialize)]
Expand Down
12 changes: 6 additions & 6 deletions src/server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,13 @@ use arrow::{
datatypes::{DataType, Field, Schema, SchemaRef},
};
use clap::Parser;
use config::{Config, ObjectStorageConfig};
use horaedb_storage::{
use columnar_storage::{
storage::{
CloudObjectStorage, CompactRequest, StorageRuntimes, TimeMergeStorageRef, WriteRequest,
ColumnarStorageRef, CompactRequest, ObjectBasedStorage, StorageRuntimes, WriteRequest,
},
types::RuntimeRef,
};
use config::{Config, ObjectStorageConfig};
use object_store::local::LocalFileSystem;
use tracing::{error, info};
use tracing_subscriber::EnvFilter;
Expand Down Expand Up @@ -80,7 +80,7 @@ async fn compact(data: web::Data<AppState>) -> impl Responder {
}

struct AppState {
storage: TimeMergeStorageRef,
storage: ColumnarStorageRef,
keep_writing: Arc<AtomicBool>,
}

Expand Down Expand Up @@ -121,7 +121,7 @@ pub fn main() {
let _ = rt.block_on(async move {
let store = Arc::new(LocalFileSystem::new());
let storage = Arc::new(
CloudObjectStorage::try_new(
ObjectBasedStorage::try_new(
object_store_config.data_dir,
segment_duration,
store,
Expand Down Expand Up @@ -185,7 +185,7 @@ fn build_schema() -> SchemaRef {
}

fn bench_write(
storage: TimeMergeStorageRef,
storage: ColumnarStorageRef,
rt: RuntimeRef,
workers: usize,
interval: Duration,
Expand Down