diff --git a/pyproject.toml b/pyproject.toml index bd75e82..2e5f3a8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -23,14 +23,15 @@ authors = [ {name = "Zeppelin Bend", email = "oss@zepben.com"} ] dependencies = [ - "zepben.eas==0.19.0", - "zepben.ewb==1.0.0b7", + "zepben.eas==0.23.0", + "zepben.ewb==1.0.3", "numba==0.60.0", "geojson==2.5.0", "gql[requests]==3.4.1", "geopandas", "pandas", - "shapely" + "shapely", + "tqdm" ] classifiers = [ "Programming Language :: Python :: 3", diff --git a/src/zepben/examples/energy_consumer_device_hierarchy.py b/src/zepben/examples/energy_consumer_device_hierarchy.py index 7d2b4fa..d39dfe0 100644 --- a/src/zepben/examples/energy_consumer_device_hierarchy.py +++ b/src/zepben/examples/energy_consumer_device_hierarchy.py @@ -7,14 +7,13 @@ import asyncio import json import os -from dataclasses import dataclass -from multiprocessing import Pool -from typing import Union - import pandas as pd -from zepben.ewb import NetworkConsumerClient, connect_with_token, Tracing, upstream, EnergyConsumer, NetworkTraceStep, StepContext, PowerTransformer, \ - TransformerFunctionKind, Breaker, ConductingEquipment, Fuse, IdentifiedObject, NetworkTrace, Feeder -from zepben.protobuf.nc.nc_requests_pb2 import IncludedEnergizedContainers + +from typing import Dict, Callable, List +from dataclasses import dataclass +from zepben.ewb import connect_with_token, NetworkConsumerClient, Feeder, Tracing, downstream, StepActionWithContextValue, \ + NetworkTraceStep, EnergyConsumer, StepContext, IdentifiedObject, Breaker, Fuse, PowerTransformer, TransformerFunctionKind, TreeNode, EquipmentTreeBuilder, \ + ConductingEquipment, NetworkTrace, upstream, IncludedEnergizedContainers @dataclass @@ -35,19 +34,80 @@ def _get_client(): with open('config.json') as f: config = json.load(f) - # Connect to server - channel = connect_with_token( - host=config["host"], - access_token=config["access_token"], - rpc_port=config['rpc_port'], - ca_filename=config['ca_path'] - ) + # Connect to server + channel = connect_with_token(**config) return NetworkConsumerClient(channel) -def _get_equipment_tree_trace(up_data: dict) -> NetworkTrace: - def step_action(step: NetworkTraceStep, _: StepContext): - to_equip: ConductingEquipment = step.path.to_equipment +async def get_feeders(_client=None) -> Dict[str, Feeder]: + _feeders = (await (_client or _get_client()).get_network_hierarchy()).result.feeders + return _feeders + + +async def get_feeder_equipment(client: NetworkConsumerClient, feeder_mrid: str) -> None: + """Get all objects under the feeder, including LV Feeders""" + (await client.get_equipment_container( + feeder_mrid, + include_energized_containers=IncludedEnergizedContainers.LV_FEEDERS + )).throw_on_error() + + +async def trace_from_energy_consumers(feeder_mrid: str, client=None): + """ + Least efficient/the slowest + Inefficient upstream tracing example. + Trace upstream from every EnergyConsumer. + """ + client = client or _get_client() + await get_feeder_equipment(client, feeder_mrid) + + def _get_equipment_tree_trace(up_data: dict) -> NetworkTrace: + def step_action(step: NetworkTraceStep, _: StepContext): + to_equip: ConductingEquipment = step.path.to_equipment + + if isinstance(to_equip, Breaker): + if not up_data.get('breaker'): + up_data['breaker'] = to_equip + elif isinstance(to_equip, Fuse): + if not up_data.get('upstream_switch'): + up_data['upstream_switch'] = to_equip + elif isinstance(to_equip, PowerTransformer): + if not up_data.get('distribution_power_transformer'): + up_data['distribution_power_transformer'] = to_equip + elif not up_data.get('regulator') and to_equip.function == TransformerFunctionKind.voltageRegulator: + up_data['regulator'] = to_equip + + return ( + Tracing.network_trace() + .add_condition(upstream()) + .add_step_action(step_action) + ) + + feeder = client.service.get(feeder_mrid, Feeder) + + energy_consumers = [] + for lvf in feeder.normal_energized_lv_feeders: + for ce in lvf.equipment: + if isinstance(ce, EnergyConsumer): + up_data = {'feeder': feeder_mrid, 'energy_consumer_mrid': ce.mrid} + + # Trace upstream from EnergyConsumer. + await _get_equipment_tree_trace(up_data).run(ce) + energy_consumers.append(_build_row(up_data)) + + write_csv(energy_consumers, feeder.mrid) + + +async def trace_from_feeder_downstream(feeder_mrid: str, client=None): + """ + More memory use than `trace_from_feeder_context`, more efficient/faster than `trace_from_energy_consumers` + Build an equipment tree of everything downstream of the feeder. + Use the Equipment tree to recurse through parent equipment of all EC's and get the equipment we are interested in. + """ + + def process_leaf(up_data: dict, leaf: TreeNode): + to_equip: IdentifiedObject = leaf.identified_object + if isinstance(to_equip, Breaker): if not up_data.get('breaker'): up_data['breaker'] = to_equip @@ -60,46 +120,89 @@ def step_action(step: NetworkTraceStep, _: StepContext): elif not up_data.get('regulator') and to_equip.function == TransformerFunctionKind.voltageRegulator: up_data['regulator'] = to_equip - return ( + client = client or _get_client() + await get_feeder_equipment(client, feeder_mrid) + + builder = EquipmentTreeBuilder() + + feeder = client.service.get(feeder_mrid, Feeder) + await ( Tracing.network_trace() - .add_condition(upstream()) - .add_step_action(step_action) - ) + .add_condition(downstream()) + .add_step_action(builder) + ).run(getattr(feeder, 'normal_head_terminal')) + + energy_consumers = [] + for leaf in (l for l in builder.leaves if isinstance((ec := l.identified_object), EnergyConsumer)): + ec_data = {'feeder': feeder.mrid, 'energy_consumer_mrid': ec.mrid} -async def get_feeders(): - client = _get_client() + def _process(_leaf): + process_leaf(ec_data, _leaf) + if _leaf.parent: + _process(_leaf.parent) - _feeders = (await client.get_network_hierarchy()).result.feeders - return _feeders + _process(leaf) + row = _build_row(ec_data) + energy_consumers.append(row) -async def trace_from_energy_consumers(feeder): + write_csv(energy_consumers, feeder.mrid) + + +async def trace_from_feeder_context(feeder_mrid: str, client=None): """ - Fetch the equipment container from the given feeder, then trace upstream from every EnergyConsumer - and create a CSV with the relevant information. + Most efficient/fastest. + trace downstream from the feeder recording relevant information using `NetworkTrace` `StepContext`. """ - client = _get_client() - print(f'processing feeder {feeder}') + client = client or _get_client() # Get all objects under the feeder, including Substations and LV Feeders - (await client.get_equipment_container(feeder, include_energized_containers=IncludedEnergizedContainers.INCLUDE_ENERGIZED_LV_FEEDERS)).throw_on_error() - network = client.service - f = network.get(feeder, Feeder) + await get_feeder_equipment(client, feeder_mrid) energy_consumers = [] - for lvf in f.normal_energized_lv_feeders: - for ce in lvf.equipment: - if isinstance(ce, EnergyConsumer): - up_data = {'feeder': feeder, 'energy_consumer_mrid': ce.mrid} - # Trace upstream from EnergyConsumer. - await _get_equipment_tree_trace(up_data).run(ce) - energy_consumers.append(_build_row(up_data)) + feeder = client.service.get(feeder_mrid, Feeder) - csv_sfx = "energy_consumers.csv" + class StepActionWithContext(StepActionWithContextValue): + def _apply(self, item: NetworkTraceStep, context: StepContext): + if isinstance((ec := item.path.to_equipment), EnergyConsumer): + nonlocal energy_consumers + data = self.get_context_value(context) + data.update({'feeder': feeder.mrid, 'energy_consumer_mrid': ec.mrid}) + + row = _build_row(data) + energy_consumers.append(row) + + def compute_next_value(self, next_item: NetworkTraceStep, current_item: NetworkTraceStep, current_value: Dict[str, IdentifiedObject]): + data = dict(current_value) + equip = next_item.path.to_equipment + if isinstance(equip, Breaker): + data['breaker'] = equip + elif isinstance(equip, Fuse): + data['upstream_switch'] = equip + elif isinstance(equip, PowerTransformer): + if equip.function == TransformerFunctionKind.distributionTransformer: + data['distribution_power_transformer'] = equip + elif equip.function == TransformerFunctionKind.voltageRegulator: + data['regulator'] = equip + return data + + def compute_initial_value(self, item: NetworkTraceStep): + return {} + + await ( + Tracing.network_trace() + .add_condition(downstream()) + .add_step_action(StepActionWithContext('key')) + ).run(getattr(feeder, 'normal_head_terminal')) + + write_csv(energy_consumers, feeder.mrid) + + +def write_csv(energy_consumers: List[EnergyConsumerDeviceHierarchy], feeder_mrid: str): network_objects = pd.DataFrame(energy_consumers) os.makedirs("csvs", exist_ok=True) - network_objects.to_csv(f"csvs/{f.mrid}_{csv_sfx}", index=False) + network_objects.to_csv(f"csvs/{feeder_mrid}_energy_consumers.csv", index=False) class NullEquipment: @@ -108,7 +211,7 @@ class NullEquipment: name = None -def _build_row(up_data: dict[str, Union[IdentifiedObject, str]]) -> EnergyConsumerDeviceHierarchy: +def _build_row(up_data: dict[str, IdentifiedObject | str]) -> EnergyConsumerDeviceHierarchy: return EnergyConsumerDeviceHierarchy( energy_consumer_mrid=up_data['energy_consumer_mrid'], upstream_switch_mrid=(up_data.get('upstream_switch') or NullEquipment).mrid, @@ -122,22 +225,45 @@ def _build_row(up_data: dict[str, Union[IdentifiedObject, str]]) -> EnergyConsum ) -def process_target(feeder): - asyncio.run(trace_from_energy_consumers(feeder)) +def process_feeders_sequentially(): + async def main_async(trace_type: Callable): + """ + Fetch the equipment container from the given feeder and create a CSV with the relevant information. + Differences between the functions passable as `trace_type` are documented in the relevant docstrings. + `trace_type` must be one of the following: + - `trace_from_energy_consumers` + - `trace_from_energy_consumers_with_context` + - `trace_from_feeder_downstream` + + """ + from tqdm import tqdm + client = _get_client() + feeders = [""] + # feeders = list(await get_feeders(client)) # Uncomment to process all feeders + for _feeder in tqdm(feeders): + await trace_type(_feeder, client) + + # Uncomment to run other trace functions + asyncio.run(main_async(trace_from_feeder_context)) + # asyncio.run(main_async(trace_from_feeder_downstream)) + # asyncio.run(main_async(trace_from_energy_consumers)) + + +def process_feeders_concurrently(): + def multi_proc(_feeder): + # Uncomment to run other trace functions + asyncio.run(trace_from_feeder_context(_feeder)) + # asyncio.run(trace_from_feeder_downstream(_feeder)) + # asyncio.run(trace_from_energy_consumers(_feeder)) -if __name__ == "__main__": # Get a list of feeders before entering main compute section of script. - feeders = asyncio.run(get_feeders()) + feeders = list(asyncio.run(get_feeders())) - # Spin up a multiprocess pool of $CPU_COUNT processes to handle the workload, otherwise we saturate a single cpu core and it's slow. - cpus = os.cpu_count() - print(f'Spawning {cpus} processes') - pool = Pool(cpus) + from tqdm.contrib.concurrent import process_map + process_map(multi_proc, feeders, max_workers=int(os.cpu_count() / 2)) - print(f'mapping to process pool') - pool.map(process_target, feeders) - print('finishing remaining processes') - pool.close() - pool.join() +if __name__ == "__main__": + process_feeders_sequentially() + # process_feeders_concurrently() # Uncomment and comment sequentially above to multi-process, note this is resource intensive and may cause issues. diff --git a/src/zepben/examples/energy_consumer_device_hierarchy_downstream.py b/src/zepben/examples/energy_consumer_device_hierarchy_downstream.py deleted file mode 100644 index aa745af..0000000 --- a/src/zepben/examples/energy_consumer_device_hierarchy_downstream.py +++ /dev/null @@ -1,142 +0,0 @@ -# Copyright 2025 Zeppelin Bend Pty Ltd -# -# This Source Code Form is subject to the terms of the Mozilla Public -# License, v. 2.0. If a copy of the MPL was not distributed with this -# file, You can obtain one at https://mozilla.org/MPL/2.0/. - -import asyncio -import json -import os -from dataclasses import dataclass -from typing import Dict - -import pandas as pd -from zepben.ewb import NetworkConsumerClient, connect_with_token, Tracing, EnergyConsumer, PowerTransformer, \ - TransformerFunctionKind, Breaker, Fuse, IdentifiedObject, EquipmentTreeBuilder, downstream, TreeNode, \ - Feeder, IncludedEnergizedContainers - - -@dataclass -class EnergyConsumerDeviceHierarchy: - energy_consumer_mrid: str - lv_circuit_name: str - upstream_switch_mrid: str - lv_circuit_name: str - upstream_switch_class: str - distribution_power_transformer_mrid: str - distribution_power_transformer_name: str - regulator_mrid: str - breaker_mrid: str - feeder_mrid: str - - -def _get_client(): - with open('config.json') as f: - config = json.load(f) - - # Connect to server - channel = connect_with_token( - host=config["host"], - access_token=config["access_token"], - rpc_port=config['rpc_port'], - ca_filename=config['ca_path'] - ) - return NetworkConsumerClient(channel) - - -async def get_feeders() -> Dict[str, Feeder]: - _feeders = (await _get_client().get_network_hierarchy()).result.feeders - return _feeders - - -def process_leaf(up_data: dict, leaf: TreeNode): - to_equip: IdentifiedObject = leaf.identified_object - - if isinstance(to_equip, Breaker): - if not up_data.get('breaker'): - up_data['breaker'] = to_equip - elif isinstance(to_equip, Fuse): - if not up_data.get('upstream_switch'): - up_data['upstream_switch'] = to_equip - elif isinstance(to_equip, PowerTransformer): - if not up_data.get('distribution_power_transformer'): - up_data['distribution_power_transformer'] = to_equip - elif not up_data.get('regulator') and to_equip.function == TransformerFunctionKind.voltageRegulator: - up_data['regulator'] = to_equip - - -async def trace_from_feeder(feeder_mrid: str): - """ - Fetch the equipment container from the given feeder and build an equipment tree of everything downstream of the feeder. - Use the Equipment tree to traverse upstream of all EC's and get the equipment we are interested in. - Finally, create a CSV with the relevant information. - """ - client = _get_client() - print(f'processing feeder {feeder_mrid}') - - # Get all objects under the feeder, including Substations and LV Feeders - await client.get_equipment_container( - feeder_mrid, - include_energized_containers = IncludedEnergizedContainers.INCLUDE_ENERGIZED_LV_FEEDERS - ) - - feeder = client.service.get(feeder_mrid, Feeder) - - builder = EquipmentTreeBuilder() - - await ( - Tracing.network_trace() - .add_condition(downstream()) - .add_step_action(builder) - ).run(getattr(feeder, 'normal_head_terminal')) - - energy_consumers = [] - for up in client.service.objects(EnergyConsumer): - # iterate up tree from EC. - up_data = {'feeder': feeder.mrid, 'energy_consumer_mrid': up.mrid} - def _process(leaf): - process_leaf(up_data, leaf) - if leaf.parent: - _process(leaf.parent) - try: - _process(builder.leaves[up.mrid]) - except KeyError: - # If the up is not in the Equipment tree builders leaves, skip it - continue - - row = _build_row(up_data) - energy_consumers.append(row) - - csv_sfx = "energy_consumers.csv" - network_objects = pd.DataFrame(energy_consumers) - os.makedirs("csvs", exist_ok=True) - network_objects.to_csv(f"csvs/{feeder.mrid}_{csv_sfx}", index=False) - - -class NullEquipment: - """empty class to simplify code below in the case of an equipment not existing in that position of the network""" - mrid = None - name = None - - -def _build_row(up_data: dict[str, IdentifiedObject | str]) -> EnergyConsumerDeviceHierarchy: - return EnergyConsumerDeviceHierarchy( - energy_consumer_mrid = up_data['energy_consumer_mrid'], - upstream_switch_mrid = (up_data.get('upstream_switch') or NullEquipment).mrid, - lv_circuit_name = (up_data.get('upstream_switch') or NullEquipment).name, - upstream_switch_class = type(up_data.get('upstream_switch')).__name__, - distribution_power_transformer_mrid = (up_data.get('distribution_power_transformer') or NullEquipment).mrid, - distribution_power_transformer_name = (up_data.get('distribution_power_transformer') or NullEquipment).name, - regulator_mrid = (up_data.get('regulator') or NullEquipment).mrid, - breaker_mrid = (up_data.get('breaker') or NullEquipment).mrid, - feeder_mrid = up_data.get('feeder'), - ) - - -if __name__ == "__main__": - # Get a list of feeders before entering main compute section of script. - feeders = asyncio.run(get_feeders()) - - print('processing feeders') - for _feeder in feeders: - asyncio.run(trace_from_feeder(_feeder))