Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions changelog.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
# Zepben Python SDK
## [1.1.0] - UNRELEASED
### Breaking Changes
* None.
* Updated `EwbDataFilePaths` to be an abstract class that supports variants. Added `LocalEwbDataFilePaths` which is a local file system implementation of
`EwbDataFilePaths`, and should be used in place of the old `EwbDataFilePaths`.

### New Features
* None.
Expand Down Expand Up @@ -176,7 +177,7 @@
* `RegulatingControl.ratedCurrent`
* `Sensor.relayFunctions`
* `UsagePoint.approvedInverterCapacity`
* using `EquipmentTreeBuilder` more then once per interpreter will no longer cause the `roots` to contain more objects then it should due to `_roots` being a
* using `EquipmentTreeBuilder` more then once per interpreter will no longer cause the `roots` to contain more objects then it should due to `_roots` being a
class var
* Errors when initiating gRPC connections will now properly be propagated to users.

Expand Down
1 change: 1 addition & 0 deletions src/zepben/ewb/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,7 @@

from zepben.ewb.database.paths.database_type import *
from zepben.ewb.database.paths.ewb_data_file_paths import *
from zepben.ewb.database.paths.local_ewb_data_file_paths import *

from zepben.ewb.database.sql.column import *
from zepben.ewb.database.sqlite.tables.sqlite_table import *
Expand Down
289 changes: 122 additions & 167 deletions src/zepben/ewb/database/paths/ewb_data_file_paths.py
Original file line number Diff line number Diff line change
@@ -1,237 +1,192 @@
# Copyright 2024 Zeppelin Bend Pty Ltd
# Copyright 2025 Zeppelin Bend Pty Ltd
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.

__all__ = ['EwbDataFilePaths']

from abc import ABC, abstractmethod
from datetime import date, timedelta
from pathlib import Path
from typing import Callable, Iterator, Optional, List
from typing import Optional, List, Generator

from zepben.ewb import require
from zepben.ewb.database.paths.database_type import DatabaseType


class EwbDataFilePaths:
class EwbDataFilePaths(ABC):
"""Provides paths to all the various data files / folders used by EWB."""

def __init__(self, base_dir: Path,
create_path: bool = False,
create_directories_func: Callable[[Path], None] = lambda it: it.mkdir(parents=True),
is_directory: Callable[[Path], bool] = Path.is_dir,
exists: Callable[[Path], bool] = Path.exists,
list_files: Callable[[Path], Iterator[Path]] = Path.iterdir):
"""
:param base_dir: The root directory of the EWB data structure.
:param create_path: Create the root directory (and any missing parent folders) if it does not exist.
"""
self.create_directories_func = create_directories_func
self.is_directory = is_directory
self.exists = exists
self.list_files = list_files
self._base_dir = base_dir

if create_path:
self.create_directories_func(base_dir)

require(self.is_directory(base_dir), lambda: f"base_dir must be a directory")

@property
def base_dir(self):
"""The root directory of the EWB data structure."""
return self._base_dir

def customer(self, database_date: date) -> Path:
"""
Determine the path to the "customers" database for the specified date.

:param database_date: The :class:`date` to use for the "customers" database.
:return: The :class:`path` to the "customers" database for the specified date.
"""
return self._to_dated_path(database_date, DatabaseType.CUSTOMER.file_descriptor)

def diagram(self, database_date: date) -> Path:
"""
Determine the path to the "diagrams" database for the specified date.

:param database_date: The :class:`date` to use for the "diagrams" database.
:return: The :class:`path` to the "diagrams" database for the specified date.
"""
return self._to_dated_path(database_date, DatabaseType.DIAGRAM.file_descriptor)

def measurement(self, database_date: date) -> Path:
"""
Determine the path to the "measurements" database for the specified date.

:param database_date: The :class:`date` to use for the "measurements" database.
:return: The :class:`path` to the "measurements" database for the specified date.
"""
return self._to_dated_path(database_date, DatabaseType.MEASUREMENT.file_descriptor)

def network_model(self, database_date: date) -> Path:
"""
Determine the path to the "network model" database for the specified date.

:param database_date: The :class:`date` to use for the "network model" database.
:return: The :class:`path` to the "network model" database for the specified date.
"""
return self._to_dated_path(database_date, DatabaseType.NETWORK_MODEL.file_descriptor)

def tile_cache(self, database_date: date) -> Path:
"""
Determine the path to the "tile cache" database for the specified date.

:param database_date: The :class:`date` to use for the "tile cache" database.
:return: The :class:`path` to the "tile cache" database for the specified date.
"""
return self._to_dated_path(database_date, DatabaseType.TILE_CACHE.file_descriptor)

def energy_reading(self, database_date: date) -> Path:
"""
Determine the path to the "energy readings" database for the specified date.

:param database_date: The :class:`date` to use for the "energy readings" database.
:return: The :class:`path` to the "energy readings" database for the specified date.
"""
return self._to_dated_path(database_date, DatabaseType.ENERGY_READING.file_descriptor)

def energy_readings_index(self) -> Path:
"""
Determine the path to the "energy readings index" database.

:return: The :class:`path` to the "energy readings index" database.
"""
return self._base_dir.joinpath(f"{DatabaseType.ENERGY_READINGS_INDEX.file_descriptor}.sqlite")
VARIANTS_PATH: str = "variants"
"""
The folder containing the variants. Will be placed under the dated folder alongside the network model database.
"""

def load_aggregator_meters_by_date(self) -> Path:
def resolve(self, database_type: DatabaseType, database_date: Optional[date] = None, variant: Optional[str] = None) -> Path:
"""
Determine the path to the "load aggregator meters-by-date" database.
Resolves the :class:`Path` to the database file for the specified :class:`DatabaseType`, within the specified `database_date`
and optional `variant` when `DatabaseType.per_date` is set to true.

:return: The :class:`path` to the "load aggregator meters-by-date" database.
"""
return self._base_dir.joinpath(f"{DatabaseType.LOAD_AGGREGATOR_METERS_BY_DATE.file_descriptor}.sqlite")

def weather_reading(self) -> Path:
"""
Determine the path to the "weather readings" database.
:param database_type: The :class:`DatabaseType` to use for the database :class:`Path`.
:param database_date: The :class:`date` to use for the database :class:`Path`. Required when `database_type.per_date` is true, otherwise must be `None`.
:param variant: The optional name of the variant containing the database.

:return: The :class:`path` to the "weather readings" database.
:return: The :class:`Path` to the :class:`DatabaseType` database file.
"""
return self._base_dir.joinpath(f"{DatabaseType.WEATHER_READING.file_descriptor}.sqlite")

def results_cache(self) -> Path:
"""
Determine the path to the "results cache" database.

:return: The :class:`path` to the "results cache" database.
"""
return self._base_dir.joinpath(f"{DatabaseType.RESULTS_CACHE.file_descriptor}.sqlite")
if database_date is not None:
require(database_type.per_date, lambda: "database_type must have its per_date set to True to use this method with a database_date.")
if variant is not None:
return self.resolve_database(self._to_dated_variant_path(database_type, database_date, variant))
else:
return self.resolve_database(self._to_dated_path(database_type, database_date))
else:
require(not database_type.per_date, lambda: "database_type must have its per_date set to False to use this method without a database_date.")
return self.resolve_database(Path(self._database_name(database_type)))

@abstractmethod
def create_directories(self, database_date: date) -> Path:
"""
Create the directories required to have a valid path for the specified date.

:param database_date: The :class:`date` required in the path.
:return: The :class:`path` to the directory for the `database_date`.
"""
date_path = self._base_dir.joinpath(str(database_date))
if self.exists(date_path):
return date_path
else:
self.create_directories_func(date_path)
return date_path

def _to_dated_path(self, database_date: date, file: str) -> Path:
return self._base_dir.joinpath(str(database_date), f"{database_date}-{file}.sqlite")

def _check_exists(self, database_type: DatabaseType, database_date: date) -> bool:
:return: The :class:`Path` to the directory for the `database_date`.
"""
Check if a database of the specified type and date exists.
raise NotImplemented

:param database_type: The type of database to search for.
:param database_date: The date to check.
:return: `True` if a database of the specified `database_type` and `database_date` exists in the date path.
"""
if not database_type.per_date:
raise ValueError("INTERNAL ERROR: Should only be calling `checkExists` for `perDate` files.")

if database_type == DatabaseType.CUSTOMER:
model_path = self.customer(database_date)
elif database_type == DatabaseType.DIAGRAM:
model_path = self.diagram(database_date)
elif database_type == DatabaseType.MEASUREMENT:
model_path = self.measurement(database_date)
elif database_type == DatabaseType.NETWORK_MODEL:
model_path = self.network_model(database_date)
elif database_type == DatabaseType.TILE_CACHE:
model_path = self.tile_cache(database_date)
elif database_type == DatabaseType.ENERGY_READING:
model_path = self.energy_reading(database_date)
else:
raise ValueError(
"INTERNAL ERROR: Should only be calling `check_exists` for `per_date` files, which should all be covered above, so go ahead and add it.")
return self.exists(model_path)

def find_closest(self, database_type: DatabaseType, max_days_to_search: int = 999, target_date: date = date.today(), search_forwards: bool = False) -> \
Optional[date]:
def find_closest(
self,
database_type: DatabaseType,
max_days_to_search: int = 999999,
target_date: date = date.today(),
search_forwards: bool = False
) -> Optional[date]:
"""
Find the closest date with a usable database of the specified type.

:param database_type: The type of database to search for.
:param max_days_to_search: The maximum number of days to search for a valid database.
:param target_date: The target :class:`date`. Defaults to today.
:param search_forwards: Indicates the search should also look forwards in time from `start_date` for a valid file. Defaults to reverse search only.
:return: The closest :class:`date` to `database_date` with a valid database of `database_type` within the search parameters, or `None` if no valid database was found.
:param target_date: The target date. Defaults to today.
:param search_forwards: Indicates the search should also look forwards in time from `target_date` for a valid file. Defaults to reverse search only.

:return: The closest :class:`date` to `target_date` with a valid database of `database_type` within the search parameters, or null if no valid database
was found.
"""
if not database_type.per_date:
return None

if self._check_exists(database_type, target_date):
descendants = list(self.enumerate_descendants())
if self._check_exists(descendants, database_type, target_date):
return target_date

offset = 1

while offset <= max_days_to_search:
offset_days = timedelta(offset)
try:
previous_date = target_date - offset_days
if self._check_exists(database_type, previous_date):
if self._check_exists(descendants, database_type, previous_date):
return previous_date
except OverflowError:
pass

if search_forwards:
try:
forward_date = target_date + offset_days
if self._check_exists(database_type, forward_date):
if self._check_exists(descendants, database_type, forward_date):
return forward_date
except OverflowError:
pass

offset += 1

return None

def _get_available_dates_for(self, database_type: DatabaseType) -> List[date]:
def get_available_dates_for(self, database_type: DatabaseType) -> List[date]:
"""
Find available databases specified by :class:`DatabaseType` in data path.

:param database_type: The type of database to search for.

:return: list of :class:`date`'s for which this specified :class:`DatabaseType` databases exist in the data path.
"""
if not database_type.per_date:
raise ValueError(
"INTERNAL ERROR: Should only be calling `_get_available_dates_for` for `per_date` files.")
"INTERNAL ERROR: Should only be calling `get_available_dates_for` for `per_date` files, "
"which should all be covered above, so go ahead and add it."
)

to_return = list()

for file in self.list_files(self._base_dir):
if self.is_directory(file):
for it in self.enumerate_descendants():
if it.name.endswith(self._database_name(database_type)):
try:
database_date = date.fromisoformat(file.name)
if self.exists(self._to_dated_path(database_date, database_type.file_descriptor)):
to_return.append(database_date)
to_return.append(date.fromisoformat(it.parent.name))
except ValueError:
pass

return sorted(to_return)

def get_available_variants_for(self, target_date: date = date.today()) -> List[str]:
"""
Find available variants for the specified `target_date` in data path.

:param target_date: The target date. Defaults to today.

:return: list of variant names that exist in the data path for the specified `target_date`.
"""
to_return = list()

for it in self.enumerate_descendants():
try:
if (str(it.parent.name).lower() == self.VARIANTS_PATH) and (str(it.parent.parent.name) == str(target_date)):
to_return.append(str(it.name))
except ValueError:
pass

return sorted(to_return)

def get_network_model_databases(self) -> List[date]:
@abstractmethod
def enumerate_descendants(self) -> Generator[Path, None, None]:
"""
Lists the child items of source location.

:return: generator of child items.
"""
raise NotImplemented

@abstractmethod
def resolve_database(self, path: Path) -> Path:
"""
Find available network-model databases in data path.
Resolves the database in the specified source :class:`Path`.

:return: A list of :class:`date`'s for which network-model databases exist in the data path.
:param path: :class:`Path` to the source database file.
:return: :class:`Path` to the local database file.
"""
return self._get_available_dates_for(DatabaseType.NETWORK_MODEL)
raise NotImplemented

def _check_exists(self, descendants: List[Path], database_type: DatabaseType, database_date: date) -> bool:
"""
Check if a database :class:`Path` of the specified :class:`DatabaseType` and :class:`date` exists.

:param descendants: A list of :class:`Path` representing the descendant paths.
:param database_type: The type of database to search for.
:param database_date: The date to check.

:return: True if a database of the specified `database_type` and `database_date` exits in the date path.
"""
for cp in descendants:
if cp.is_relative_to(self._to_dated_path(database_type, database_date)):
return True

return False

def _to_dated_path(self, database_type: DatabaseType, database_date: date) -> Path:
date_str = str(database_date)
return Path(date_str).joinpath(f"{date_str}-{self._database_name(database_type)}")

def _to_dated_variant_path(self, database_type: DatabaseType, database_date: date, variant: str) -> Path:
date_str = str(database_date)
return Path(date_str).joinpath(self.VARIANTS_PATH, variant, f"{date_str}-{self._database_name(database_type)}")

@staticmethod
def _database_name(database_type: DatabaseType) -> str:
return f"{database_type.file_descriptor}.sqlite"
Loading