Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 3 additions & 15 deletions src/orcapod/databases/__init__.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,4 @@
# from .legacy.types import DataStore, ArrowDataStore
# from .legacy.legacy_arrow_data_stores import MockArrowDataStore, SimpleParquetDataStore
# from .legacy.dict_data_stores import DirDataStore, NoOpDataStore
# from .legacy.safe_dir_data_store import SafeDirDataStore

# __all__ = [
# "DataStore",
# "ArrowDataStore",
# "DirDataStore",
# "SafeDirDataStore",
# "NoOpDataStore",
# "MockArrowDataStore",
# "SimpleParquetDataStore",
# ]

from .no_op_database import NoOpDatabase
from .delta_lake_databases import DeltaTableDatabase

__all__ = ["NoOpDatabase", "DeltaTableDatabase"]
88 changes: 88 additions & 0 deletions src/orcapod/databases/no_op_database.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
import logging
from collections.abc import Collection, Mapping
from typing import TYPE_CHECKING, Any, Literal


from orcapod.utils.lazy_module import LazyModule

if TYPE_CHECKING:
import polars as pl
import pyarrow as pa
else:
pa = LazyModule("pyarrow")
pl = LazyModule("polars")

# Module-level logger
logger = logging.getLogger(__name__)


class NoOpDatabase:
def add_record(
self,
record_path: tuple[str, ...],
record_id: str,
record: "pa.Table",
skip_duplicates: bool = False,
flush: bool = False,
schema_handling: Literal["merge", "error", "coerce"] = "error",
) -> None:
return

def add_records(
self,
record_path: tuple[str, ...],
records: pa.Table,
record_id_column: str | None = None,
skip_duplicates: bool = False,
flush: bool = False,
schema_handling: Literal["merge", "error", "coerce"] = "error",
) -> None:
return

def get_all_records(
self,
record_path: tuple[str, ...],
record_id_column: str | None = None,
retrieve_pending: bool = True,
) -> pa.Table | None:
return None

def get_records_with_column_value(
self,
record_path: tuple[str, ...],
column_values: Collection[tuple[str, Any]] | Mapping[str, Any],
record_id_column: str | None = None,
flush: bool = False,
) -> "pa.Table | None":
return None

def get_record_by_id(
self,
record_path: tuple[str, ...],
record_id: str,
record_id_column: str | None = None,
flush: bool = False,
) -> "pa.Table | None":
return None

def get_records_by_ids(
self,
record_path: tuple[str, ...],
record_ids: "Collection[str] | pl.Series | pa.Array",
record_id_column: str | None = None,
flush: bool = False,
) -> "pa.Table | None":
return None

def flush(self) -> None:
"""Flush all pending batches."""
return None

def flush_batch(self, record_path: tuple[str, ...]) -> None:
"""
Flush pending batch for a specific source path.

Args:
record_path: Tuple of path components
"""
return None
16 changes: 11 additions & 5 deletions src/orcapod/pipeline/graph.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import warnings
from orcapod.core.trackers import GraphTracker, Invocation
from orcapod.pipeline.nodes import KernelNode, PodNode
from orcapod.protocols.pipeline_protocols import Node
from orcapod import contexts
from orcapod.protocols import core_protocols as cp
from orcapod.protocols import database_protocols as dbp
from orcapod.databases import NoOpDatabase
from typing import Any
from collections.abc import Collection
import os
Expand Down Expand Up @@ -73,7 +75,7 @@ class Pipeline(GraphTracker):
def __init__(
self,
name: str | tuple[str, ...],
pipeline_database: dbp.ArrowDatabase,
pipeline_database: dbp.ArrowDatabase | None = None,
results_database: dbp.ArrowDatabase | None = None,
tracker_manager: cp.TrackerManager | None = None,
data_context: str | contexts.DataContext | None = None,
Expand All @@ -84,14 +86,18 @@ def __init__(
name = (name,)
self.name = name
self.pipeline_store_path_prefix = self.name

if pipeline_database is None:
pipeline_database = NoOpDatabase()
warnings.warn(
"No database was specified. Pipeline results will not be saved"
)

self.results_store_path_prefix = ()
if results_database is None:
if pipeline_database is None:
raise ValueError(
"Either pipeline_database or results_database must be provided"
)
results_database = pipeline_database
self.results_store_path_prefix = self.name + ("_results",)

self.pipeline_database = pipeline_database
self.results_database = results_database
self.nodes: dict[str, Node] = {}
Expand Down
Loading