Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
109 commits
Select commit Hold shift + click to select a range
0c54386
Move TransactionManager to db4-storage
fabubaker Dec 1, 2025
92f2793
Create DurabilityOps
fabubaker Dec 1, 2025
cff7a1e
Remove wal from transaction manager
fabubaker Dec 2, 2025
6f24855
Simplify wal log and replay methods
fabubaker Dec 2, 2025
c192d64
Add sketch of correct logging to add_edge
fabubaker Dec 2, 2025
fd49e14
Add lsn to MemNodeSegment/MemEdgeSegment
fabubaker Dec 2, 2025
aa9f84b
Add set_lsn method for AtomicAddEdge
fabubaker Dec 2, 2025
c2c77fa
Simplify WriterPair to NodeWriters
fabubaker Dec 2, 2025
f263616
Remove lsn args
fabubaker Dec 3, 2025
5c0069c
Implement basic add_edge replay
fabubaker Dec 4, 2025
fa11c7a
Simplify GraphWal
fabubaker Dec 5, 2025
aa73139
Remove wrapper for graph replay
fabubaker Dec 10, 2025
df6679c
Merge branch 'db_v4' of github.com:Pometry/Raphtory into db_v4_/wal
fabubaker Dec 10, 2025
6944a6f
Fix leftover merge issues
fabubaker Dec 10, 2025
65be87a
Change mark_dirty to set_dirty
fabubaker Dec 10, 2025
9a80df7
Add replay tests
fabubaker Dec 11, 2025
c3f3352
Always set edge_writer in init in WriteSession
fabubaker Dec 11, 2025
bc54103
Return early from add_static_edge if edge exists
fabubaker Dec 12, 2025
f7ac76a
Move GraphReplay to WriteLockedGraph
fabubaker Dec 12, 2025
8e778ca
Implement add_edge replay for WriteLockedGraph
fabubaker Dec 16, 2025
110781b
Minor cleanup
fabubaker Dec 18, 2025
5b7532a
Check lsn before replaying wal entries
fabubaker Jan 6, 2026
987a17a
Implement take for node and edge segments
fabubaker Jan 6, 2026
c4d5d7c
Set lsn during replay
fabubaker Jan 6, 2026
01257e5
Rename wal sync to flush
fabubaker Jan 7, 2026
9221008
Modify flush to take an LSN
fabubaker Jan 7, 2026
0fddef3
Remove background wal flush
fabubaker Jan 8, 2026
612dc09
Rename to PersistenceStrategy
fabubaker Jan 8, 2026
c4e8525
Merge branch 'db_v4' of github.com:Pometry/Raphtory into db_v4_/wal
fabubaker Jan 8, 2026
417ce5f
Rename graph_config to persistence_config
fabubaker Jan 9, 2026
3528f71
Use config through PersistenceStrategy
fabubaker Jan 9, 2026
aea3450
Merge branch 'db_v4' of github.com:Pometry/Raphtory into db_v4_/wal
fabubaker Jan 9, 2026
9979c50
Fix parallel_flush
fabubaker Jan 9, 2026
aa77163
Remove defaults for Extension
fabubaker Jan 9, 2026
aa55404
Move read/write from dir methods to PersistenceConfig
fabubaker Jan 9, 2026
934b3a8
Add config_mut TODO
fabubaker Jan 12, 2026
eb459fd
Apply some more page -> segment rename
fabubaker Jan 12, 2026
427f2a8
Expose WalType through PersistenceStrategy
fabubaker Jan 12, 2026
7a13c79
Pass wal as argument to constructor
fabubaker Jan 12, 2026
00d4a3c
Expose wal from extension
fabubaker Jan 12, 2026
f78279c
Add more docs to graph paths
fabubaker Jan 12, 2026
7c375dc
Minor cleanup
fabubaker Jan 14, 2026
334fef4
Modify graph load to accept extension
fabubaker Jan 14, 2026
30c2744
Use PersistenceConfig::new instead of strategy constructors
fabubaker Jan 14, 2026
8ad9128
Run fmt
fabubaker Jan 14, 2026
529a2cd
Use &Path instead of PathBuf for wal
fabubaker Jan 14, 2026
9b9af6e
Set next_lsn after replay
fabubaker Jan 15, 2026
8334e83
Change replay to be an instance method
fabubaker Jan 15, 2026
252ded1
Add load method to Wal
fabubaker Jan 15, 2026
37bd656
Run fmt
fabubaker Jan 15, 2026
e306731
Run fmt
fabubaker Jan 16, 2026
bf3e0f7
Create new WAL file on materialize
fabubaker Jan 16, 2026
7f609f9
Merge branch 'db_v4' of github.com:Pometry/Raphtory into db_v4_/wal
fabubaker Jan 19, 2026
3ebf9f9
Fix some more merge conflicts
fabubaker Jan 19, 2026
0b83372
Merge branch 'db_v4' of github.com:Pometry/Raphtory into db_v4_/wal
fabubaker Jan 19, 2026
2e17df0
Fix all merge conflicts
fabubaker Jan 20, 2026
1504e82
Merge branch 'db_v4' of github.com:Pometry/Raphtory into db_v4_/wal
fabubaker Jan 20, 2026
412c4bb
Move merge.rs to db4-storage
fabubaker Jan 20, 2026
d080a26
Modify Graph::new to take merge_config
fabubaker Jan 20, 2026
d379b02
Merge branch 'db_v4' of github.com:Pometry/Raphtory into db_v4_/wal
fabubaker Jan 21, 2026
f30440a
Document re-exports
fabubaker Jan 21, 2026
41364f9
Rename Wal trait to WalOps
fabubaker Jan 21, 2026
dc45af9
Rename WalType to Wal
fabubaker Jan 21, 2026
18dad10
Cleanup configs in OS
fabubaker Jan 21, 2026
dc2f834
Use separate ConfigOps
fabubaker Jan 22, 2026
70b1d07
Use consistent re-exports
fabubaker Jan 22, 2026
94ffe23
Remove MergeConfig
fabubaker Jan 22, 2026
7755d28
Remove silly comment
fabubaker Jan 22, 2026
7aa753b
Use getter methods for PersistenceConfig
fabubaker Jan 22, 2026
7abc7c6
Run fmt
fabubaker Jan 23, 2026
e4feece
Merge branch 'db_v4' of github.com:Pometry/Raphtory into db_v4_/wal
fabubaker Jan 23, 2026
8153f57
Fix up defaults
fabubaker Jan 23, 2026
a304065
Run fmt
fabubaker Jan 23, 2026
eb13fe2
Rename bg_flush_enabled to bg_flush
fabubaker Jan 26, 2026
3269c40
Fix node_types
fabubaker Jan 27, 2026
b869ea6
Rename check_node to has_node
fabubaker Jan 27, 2026
acda4a8
Increment num_nodes for segment in wal replay
fabubaker Jan 27, 2026
cf65020
Run fmt
fabubaker Jan 27, 2026
4bf60cf
chore: apply tidy-public auto-fixes
github-actions[bot] Jan 27, 2026
9ff9ce4
Merge db_v4 into db_v4_/wal
ljeub-pometry Jan 27, 2026
6088eca
fix handling of additions with internal ids and avoid cloning the id
ljeub-pometry Jan 27, 2026
383bb9b
chore: apply tidy-public auto-fixes
github-actions[bot] Jan 27, 2026
c38b67d
Remove silly comment and use STATIC_GRAPH_LAYER_ID
fabubaker Jan 27, 2026
2f77e09
Add wal.has_entries
fabubaker Jan 27, 2026
b2f67ee
json output for GraphFixture
ljeub-pometry Jan 27, 2026
fbdff92
Restrict DurabilityOps impls
fabubaker Jan 27, 2026
384ef5b
Merge branch 'db_v4' of github.com:Pometry/Raphtory into db_v4_/wal
fabubaker Jan 27, 2026
802f851
Merge branch 'db_v4_/wal' of github.com:Pometry/Raphtory into db_v4_/wal
fabubaker Jan 27, 2026
32f2974
Run fmt
fabubaker Jan 27, 2026
30f10fd
Minor cleanup
fabubaker Jan 27, 2026
da23315
Use STATIC_GRAPH_LAYER_ID
fabubaker Jan 27, 2026
299c723
Run fmt
fabubaker Jan 27, 2026
d8a5c64
chore: apply tidy-public auto-fixes
github-actions[bot] Jan 27, 2026
5ac2088
Merge branch 'db_v4_/wal' of github.com:Pometry/Raphtory into db_v4_/wal
fabubaker Jan 27, 2026
85d1703
some cleanup for add_edge
ljeub-pometry Jan 28, 2026
02264a4
json output for GraphFixture
ljeub-pometry Jan 27, 2026
e991fe1
refactor the config
ljeub-pometry Jan 28, 2026
2edfb7e
check for self-loops in materialize
ljeub-pometry Jan 28, 2026
aa7dcc1
Merge fix/materialize into db_v4_/wal
ljeub-pometry Jan 28, 2026
66036dd
property types need to be unified
ljeub-pometry Jan 28, 2026
34115ca
move the property meta updates under the edge segment lsn
ljeub-pometry Jan 28, 2026
8e93a06
Fix borrow checker issues in replay
fabubaker Jan 28, 2026
af22b61
Move DEFAULT_MAX_MEMORY_BYTES to private
fabubaker Jan 28, 2026
cab894c
move the count checks to the end as the other ones are more helpful f…
ljeub-pometry Jan 29, 2026
714ad3f
serialize PropArray as ipc
ljeub-pometry Jan 29, 2026
e81f78d
postcard cannot handle the multi-deserialize in BigDecimal
ljeub-pometry Jan 29, 2026
fcfcdc3
Merge db_v4 into db_v4_/wal
ljeub-pometry Jan 29, 2026
88aaa05
fmt
ljeub-pometry Jan 29, 2026
f1cc3ee
cleanup
ljeub-pometry Jan 29, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 43 additions & 82 deletions db4-graph/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
use std::{
io,
path::{Path, PathBuf},
sync::{
atomic::{self, AtomicU64, AtomicUsize},
Arc,
},
sync::{atomic::AtomicUsize, Arc},
};

use raphtory_api::core::{
Expand All @@ -25,17 +22,20 @@ use storage::{
nodes::WriteLockedNodePages,
},
},
persist::strategy::PersistentStrategy,
persist::strategy::PersistenceStrategy,
resolver::GIDResolverOps,
wal::{GraphWal, TransactionID, Wal},
Extension, GIDResolver, Layer, ReadLockedLayer, WalImpl, ES, GS, NS,
transaction::TransactionManager,
wal::WalOps,
Config, Extension, GIDResolver, Layer, ReadLockedLayer, Wal, ES, GS, NS,
};
use tempfile::TempDir;

mod replay;

#[derive(Debug)]
pub struct TemporalGraph<EXT = Extension>
where
EXT: PersistentStrategy<NS = NS<EXT>, ES = ES<EXT>, GS = GS<EXT>>,
EXT: PersistenceStrategy<NS = NS<EXT>, ES = ES<EXT>, GS = GS<EXT>>,
NS<EXT>: NodeSegmentOps<Extension = EXT>,
ES<EXT>: EdgeSegmentOps<Extension = EXT>,
GS<EXT>: GraphPropSegmentOps<Extension = EXT>,
Expand All @@ -46,7 +46,6 @@ where
storage: Arc<Layer<EXT>>,
graph_dir: Option<GraphDir>,
pub transaction_manager: Arc<TransactionManager>,
pub wal: Arc<WalImpl>,
}

#[derive(Debug)]
Expand Down Expand Up @@ -90,49 +89,17 @@ impl<'a> From<&'a Path> for GraphDir {
}
}

#[derive(Debug)]
pub struct TransactionManager {
last_transaction_id: AtomicU64,
wal: Arc<WalImpl>,
}

impl TransactionManager {
const STARTING_TRANSACTION_ID: TransactionID = 1;

pub fn new(wal: Arc<WalImpl>) -> Self {
Self {
last_transaction_id: AtomicU64::new(Self::STARTING_TRANSACTION_ID),
wal,
}
}

pub fn load(self, last_transaction_id: TransactionID) {
self.last_transaction_id
.store(last_transaction_id, atomic::Ordering::SeqCst)
}

pub fn begin_transaction(&self) -> TransactionID {
let transaction_id = self
.last_transaction_id
.fetch_add(1, atomic::Ordering::SeqCst);
self.wal.log_begin_transaction(transaction_id).unwrap();
transaction_id
}

pub fn end_transaction(&self, transaction_id: TransactionID) {
self.wal.log_end_transaction(transaction_id).unwrap();
}
}

impl Default for TemporalGraph<Extension> {
fn default() -> Self {
Self::new(Extension::default()).unwrap()
let config = Config::default();
let wal = Arc::new(Wal::new(None).unwrap());
Self::new(Extension::new(config, wal)).unwrap()
}
}

impl<EXT> TemporalGraph<EXT>
where
EXT: PersistentStrategy<NS = NS<EXT>, ES = ES<EXT>, GS = GS<EXT>>,
EXT: PersistenceStrategy<NS = NS<EXT>, ES = ES<EXT>, GS = GS<EXT>>,
NS<EXT>: NodeSegmentOps<Extension = EXT>,
ES<EXT>: EdgeSegmentOps<Extension = EXT>,
GS<EXT>: GraphPropSegmentOps<Extension = EXT>,
Expand All @@ -145,7 +112,7 @@ where
Self::new_with_meta(None, node_meta, edge_meta, graph_props_meta, ext)
}

pub fn new_with_path(path: impl AsRef<Path>, ext: EXT) -> Result<Self, StorageError> {
pub fn new_at_path_with_ext(path: impl AsRef<Path>, ext: EXT) -> Result<Self, StorageError> {
let node_meta = Meta::new_for_nodes();
let edge_meta = Meta::new_for_edges();
let graph_props_meta = Meta::new_for_graph_props();
Expand All @@ -159,26 +126,6 @@ where
)
}

pub fn load_from_path(path: impl AsRef<Path>) -> Result<Self, StorageError> {
let path = path.as_ref();
let storage = Layer::load(path)?;
let id_type = storage.nodes().id_type();

let gid_resolver_dir = path.join("gid_resolver");
let resolver = GIDResolver::new_with_path(&gid_resolver_dir, id_type)?;
let wal_dir = path.join("wal");
let wal = Arc::new(WalImpl::new(Some(wal_dir))?);

Ok(Self {
graph_dir: Some(path.into()),
event_counter: AtomicUsize::new(resolver.len()),
logical_to_physical: resolver.into(),
storage: Arc::new(storage),
transaction_manager: Arc::new(TransactionManager::new(wal.clone())),
wal,
})
}

pub fn new_with_meta(
graph_dir: Option<GraphDir>,
node_meta: Meta,
Expand Down Expand Up @@ -218,16 +165,29 @@ where
ext,
);

let wal_dir = graph_dir.as_ref().map(|dir| dir.wal_dir());
let wal = Arc::new(WalImpl::new(wal_dir)?);

Ok(Self {
graph_dir,
logical_to_physical,
storage: Arc::new(storage),
transaction_manager: Arc::new(TransactionManager::new(wal.clone())),
transaction_manager: Arc::new(TransactionManager::new()),
event_counter: AtomicUsize::new(0),
wal,
})
}

pub fn load_from_path(path: impl AsRef<Path>, ext: EXT) -> Result<Self, StorageError> {
let path = path.as_ref();
let storage = Layer::load(path, ext)?;
let id_type = storage.nodes().id_type();

let gid_resolver_dir = path.join("gid_resolver");
let resolver = GIDResolver::new_with_path(&gid_resolver_dir, id_type)?;

Ok(Self {
graph_dir: Some(path.into()),
event_counter: AtomicUsize::new(resolver.len()),
logical_to_physical: resolver.into(),
storage: Arc::new(storage),
transaction_manager: Arc::new(TransactionManager::new()),
})
}

Expand Down Expand Up @@ -266,10 +226,12 @@ where
.get_str(string)
.or_else(|| self.logical_to_physical.get_u64(string.id())),
}?;

// VIDs in the resolver may not be initialised yet, need to double-check the node actually exists!
let nodes = self.storage().nodes();
let (page_id, pos) = nodes.resolve_pos(vid);
let node_page = nodes.segments().get(page_id)?;

if pos.0 < node_page.num_nodes() {
Some(vid)
} else {
Expand Down Expand Up @@ -404,9 +366,10 @@ where
}
}

/// Holds write locks across all segments in the graph for fast bulk ingestion.
pub struct WriteLockedGraph<'a, EXT>
where
EXT: PersistentStrategy<NS = NS<EXT>, ES = ES<EXT>, GS = GS<EXT>>,
EXT: PersistenceStrategy<NS = NS<EXT>, ES = ES<EXT>, GS = GS<EXT>>,
NS<EXT>: NodeSegmentOps<Extension = EXT>,
ES<EXT>: EdgeSegmentOps<Extension = EXT>,
GS<EXT>: GraphPropSegmentOps<Extension = EXT>,
Expand All @@ -419,7 +382,7 @@ where

impl<'a, EXT> WriteLockedGraph<'a, EXT>
where
EXT: PersistentStrategy<NS = NS<EXT>, ES = ES<EXT>, GS = GS<EXT>>,
EXT: PersistenceStrategy<NS = NS<EXT>, ES = ES<EXT>, GS = GS<EXT>>,
NS<EXT>: NodeSegmentOps<Extension = EXT>,
ES<EXT>: EdgeSegmentOps<Extension = EXT>,
GS<EXT>: GraphPropSegmentOps<Extension = EXT>,
Expand All @@ -437,17 +400,15 @@ where
self.graph
}

pub fn resize_chunks_to_num_nodes(&mut self, max_vid: Option<VID>) {
if let Some(max_vid) = max_vid {
let (chunks_needed, _) = self.graph.storage.nodes().resolve_pos(max_vid);
self.graph.storage().nodes().grow(chunks_needed + 1);
std::mem::take(&mut self.nodes);
self.nodes = self.graph.storage.nodes().write_locked();
}
pub fn resize_chunks_to_vid(&mut self, vid: VID) {
let (chunks_needed, _) = self.graph.storage.nodes().resolve_pos(vid);
self.graph.storage().nodes().grow(chunks_needed + 1);
std::mem::take(&mut self.nodes);
self.nodes = self.graph.storage.nodes().write_locked();
}

pub fn resize_chunks_to_num_edges(&mut self, max_eid: EID) {
let (chunks_needed, _) = self.graph.storage.edges().resolve_pos(max_eid);
pub fn resize_chunks_to_eid(&mut self, eid: EID) {
let (chunks_needed, _) = self.graph.storage.edges().resolve_pos(eid);
self.graph.storage().edges().grow(chunks_needed + 1);
std::mem::take(&mut self.edges);
self.edges = self.graph.storage.edges().write_locked();
Expand Down
Loading
Loading