Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions .github/workflows/test-python.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,21 @@ jobs:
strategy:
fail-fast: false
matrix:
os: [ubuntu-latest, windows-latest]
os: ubuntu-latest

steps:
- uses: actions/checkout@v4

- name: Set up Python
uses: actions/setup-python@v5

- name: Install the latest version of uv
uses: astral-sh/setup-uv@v6

- name: Install dependencies
if: github.ref_name != 'main'
run: uv sync --all-extras
- name: "Set up Python"
uses: actions/setup-python@v5
with:
python-version-file: ".python-version"

- name: Install the project
run: uv sync --locked --all-extras --dev

- name: Run tests
if: github.ref_name != 'main'
run: uv run -m pytest --tb=short --disable-warnings
run: uv run pytest tests
1 change: 1 addition & 0 deletions .python-version
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
3.12
13 changes: 8 additions & 5 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,6 @@ dependencies = [
ai = ["huggingface-hub>=0.30.2", "ollama>=0.4.7"]
iceberg = ["polars>=1.27.1", "pyarrow>=19.0.1", "pyiceberg>=0.9.0"]
all = ["tower[ai,iceberg]"]
dev = [
"openapi-python-client>=0.12.1",
"pytest>=8.3.5",
"pytest-httpx>=0.35.0",
]

[tool.maturin]
bindings = "bin"
Expand All @@ -62,3 +57,11 @@ include = ["rust-toolchain.toml"]

[tool.uv.sources]
tower = { workspace = true }

[dependency-groups]
dev = [
"openapi-python-client>=0.12.1",
"pytest>=8.3.5",
"pytest-httpx>=0.35.0",
"pyiceberg[sql-sqlite]>=0.9.0",
]
147 changes: 139 additions & 8 deletions src/tower/_tables.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,46 @@
from typing import Optional
from typing import Optional, Generic, TypeVar, Union, List
from dataclasses import dataclass

TTable = TypeVar("TTable", bound="Table")

import polars as pl
import pyarrow as pa
import pyarrow.compute as pc

from pyiceberg.catalog import load_catalog
from pyiceberg.table import Table as IcebergTable
from pyiceberg.catalog import (
Catalog,
load_catalog,
)

from ._context import TowerContext
from .utils.pyarrow import convert_pyarrow_schema
from .utils.pyarrow import (
convert_pyarrow_schema,
convert_pyarrow_expressions,
)
from .utils.tables import (
make_table_name,
namespace_or_default,
)

@dataclass
class RowsAffectedInformation:
    """
    Running totals of rows written to a `Table` through this wrapper.
    """

    # Rows appended via insert() or added by upsert().
    inserts: int
    # Existing rows rewritten by upsert().
    updates: int


class Table:
"""
`Table` is a wrapper around an Iceberg table. It provides methods to read and
write data to the table.
"""

def __init__(self, context: TowerContext, table: IcebergTable):
    """
    Wraps the given Iceberg table, keeping the Tower context alongside it.
    """
    # Nothing has been written through this wrapper yet.
    self._stats = RowsAffectedInformation(0, 0)
    self._table = table
    self._context = context


def read(self) -> pl.DataFrame:
"""
Reads from the Iceberg tables. Returns the results as a Polars DataFrame.
Expand All @@ -31,28 +49,135 @@ def read(self) -> pl.DataFrame:
# the result as a DataFrame.
return pl.scan_iceberg(self._table).collect()

def insert(self, data: pa.Table):

def to_polars(self) -> pl.LazyFrame:
    """
    Exposes the table as a Polars LazyFrame, for callers who know Polars
    and want to compose more complicated queries themselves.
    """
    return pl.scan_iceberg(self._table)


def rows_affected(self) -> RowsAffectedInformation:
    """
    Returns the running write statistics for this table: how many rows
    have been inserted and updated through this wrapper so far.
    """
    return self._stats


def insert(self, data: pa.Table) -> "Table":
    """
    Inserts data into the Iceberg table. The data is expressed as a PyArrow table.

    Args:
        data (pa.Table): The data to insert into the table.

    Returns:
        Table: This table, allowing calls to be chained.
    """
    self._table.append(data)

    # Track how many rows we wrote so rows_affected() can report them.
    self._stats.inserts += data.num_rows
    return self


def upsert(self, data: pa.Table, join_cols: Optional[list[str]] = None) -> "Table":
    """
    Upserts data into the Iceberg table. The data is expressed as a PyArrow table.

    Rows that match on `join_cols` are updated; rows that do not match are
    inserted.

    Args:
        data (pa.Table): The data to upsert into the table.
        join_cols (Optional[list[str]]): The columns that form the key to match rows on.

    Returns:
        Table: This table, allowing calls to be chained.
    """
    res = self._table.upsert(
        data,
        join_cols=join_cols,

        # All upserts will always be case sensitive. Perhaps we'll add this
        # as a parameter in the future?
        case_sensitive=True,

        # These are the defaults, but we're including them to be complete.
        when_matched_update_all=True,
        when_not_matched_insert_all=True,
    )

    # Update the stats with the results of the relevant upsert.
    self._stats.updates += res.rows_updated
    self._stats.inserts += res.rows_inserted

    return self


def delete(self, filters: Union[str, List[pc.Expression]]) -> "Table":
    """
    Deletes data from the Iceberg table. The filters are applied to the
    table and the matching rows are deleted.

    Args:
        filters (Union[str, List[pc.Expression]]): The filters to apply to the table.
            This can be a string or a list of PyArrow expressions.

    Returns:
        Table: This table, allowing calls to be chained.
    """
    if isinstance(filters, list):
        # PyArrow compute expressions must be translated into PyIceberg's
        # expression language before we can filter with them.
        filters = convert_pyarrow_expressions(filters)

    self._table.delete(
        delete_filter=filters,

        # We want this to always be the case. Not sure why you wouldn't?
        case_sensitive=True,
    )

    # NOTE: There is, unfortunately, no way to get the number of rows
    # deleted besides comparing the two snapshots that were created.

    return self


def schema(self) -> pa.Schema:
    """
    Returns the table's schema as a PyArrow schema.

    The underlying table reports an Iceberg schema, so we translate it
    into its PyArrow equivalent before handing it back.
    """
    return self._table.schema().as_arrow()


def column(self, name: str) -> pa.compute.Expression:
    """
    Returns a column from the table as a PyArrow compute expression. This
    is useful when you want to perform some operations on the column.

    Args:
        name (str): The name of the column to reference.

    Returns:
        pa.compute.Expression: An expression referencing the column.

    Raises:
        ValueError: If the column is not present in the table schema.
    """
    try:
        # pa.Schema.field raises KeyError for an unknown name rather than
        # returning None, so translate it into the intended ValueError.
        self.schema().field(name)
    except KeyError:
        raise ValueError(f"Column {name} not found in table schema") from None

    # We need to convert the PyArrow field into pa.compute.Expression
    return pa.compute.field(name)


class TableReference:
def __init__(self, ctx: TowerContext, catalog_name: str, name: str, namespace: Optional[str] = None):
def __init__(self, ctx: TowerContext, catalog: Catalog, name: str, namespace: Optional[str] = None):
self._context = ctx
self._catalog = load_catalog(catalog_name)
self._catalog = catalog
self._name = name
self._namespace = namespace


def load(self) -> Table:
    """
    Resolves this reference by loading the underlying table from the
    catalog and wrapping it in a `Table`.
    """
    qualified = make_table_name(self._name, namespace_or_default(self._namespace))
    return Table(self._context, self._catalog.load_table(qualified))


def create(self, schema: pa.Schema) -> Table:
namespace = namespace_or_default(self._namespace)
table_name = make_table_name(self._name, namespace)
Expand All @@ -71,6 +196,7 @@ def create(self, schema: pa.Schema) -> Table:

return Table(self._context, table)


def create_if_not_exists(self, schema: pa.Schema) -> Table:
namespace = namespace_or_default(self._namespace)
table_name = make_table_name(self._name, namespace)
Expand All @@ -92,7 +218,7 @@ def create_if_not_exists(self, schema: pa.Schema) -> Table:

def tables(
name: str,
catalog: str = "default",
catalog: Union[str, Catalog] = "default",
namespace: Optional[str] = None
) -> TableReference:
"""
Expand All @@ -101,11 +227,16 @@ def tables(

Args:
`name` (str): The name of the table to load.
`catalog` (str): The name of the catalog to use. "default" by default.
`catalog` (Union[str, Catalog]): The name of the catalog or the actual
catalog to use. "default" is the default value. You can pass in an
actual catalog object for testing purposes.
`namespace` (Optional[str]): The namespace in which to load the table.

Returns:
TableReference: A reference to a table to be resolved with `create` or `load`
"""
if isinstance(catalog, str):
catalog = load_catalog(catalog)

ctx = TowerContext.build()
return TableReference(ctx, catalog, name, namespace)
10 changes: 10 additions & 0 deletions src/tower/polars.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
"""
Optional re-export shim for polars.

Lets callers import `tower.polars` even in environments where the
optional `polars` dependency is not installed; the failure is then
deferred to attribute-access time with a clear error message.
"""

try:
    import polars as _polars
    # Re-export everything from polars
    from polars import *
except ImportError:
    # Polars is an optional dependency; keep a marker so attribute
    # access below can raise a helpful error instead of a NameError.
    _polars = None


def __getattr__(name):
    """Forward attribute access (e.g. submodules) to the real module."""
    if _polars is None:
        raise AttributeError(
            f"module 'polars' is not installed; cannot access {name!r}"
        )
    return getattr(_polars, name)


def __dir__():
    """List the real module's attributes when it is available."""
    return dir(_polars) if _polars is not None else []
6 changes: 6 additions & 0 deletions src/tower/pyarrow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
"""
Optional re-export shim for pyarrow.

Lets callers import `tower.pyarrow` even in environments where the
optional `pyarrow` dependency is not installed; the failure is then
deferred to attribute-access time with a clear error message.
"""

try:
    import pyarrow as _pyarrow
    # Re-export everything
    from pyarrow import *
except ImportError:
    # PyArrow is an optional dependency; keep a marker so attribute
    # access below can raise a helpful error instead of a NameError.
    _pyarrow = None


def __getattr__(name):
    """Forward attribute access (e.g. submodules) to the real module."""
    if _pyarrow is None:
        raise AttributeError(
            f"module 'pyarrow' is not installed; cannot access {name!r}"
        )
    return getattr(_pyarrow, name)


def __dir__():
    """List the real module's attributes when it is available."""
    return dir(_pyarrow) if _pyarrow is not None else []
17 changes: 17 additions & 0 deletions src/tower/pyiceberg.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
"""
Optional re-export shim for pyiceberg.

Lets callers import `tower.pyiceberg` even in environments where the
optional `pyiceberg` dependency is not installed; the failure is then
deferred to attribute-access time with a clear error message.
"""

try:
    import pyiceberg as _pyiceberg
    # Re-export everything
    from pyiceberg import *
except ImportError:
    # PyIceberg is an optional dependency; keep a marker so attribute
    # access below can raise a helpful error instead of a NameError.
    _pyiceberg = None


# Dynamic dispatch for submodules, as relevant.
def __getattr__(name):
    """Forward attribute access to the original module."""
    if _pyiceberg is None:
        # Without this guard, getattr(None, ...) produces a confusing
        # "'NoneType' object has no attribute" error.
        raise AttributeError(
            f"module 'pyiceberg' is not installed; cannot access {name!r}"
        )
    return getattr(_pyiceberg, name)


def __dir__():
    """List the real module's attributes when it is available."""
    # dir(None) would list NoneType's attributes, which is misleading.
    return dir(_pyiceberg) if _pyiceberg is not None else []
Loading