From 48066658e62c4acc319fbdf154facdd52d610feb Mon Sep 17 00:00:00 2001 From: Max Chesterfield Date: Fri, 1 Aug 2025 18:54:18 +1000 Subject: [PATCH 1/6] needs a tidy up Signed-off-by: Max Chesterfield --- ...mer_device_hierarchy_downstream_context.py | 143 ++++++++++++++++++ ...gy_consumer_device_hierarchy_downstream.py | 58 +++---- 2 files changed, 174 insertions(+), 27 deletions(-) create mode 100644 src/zepben/examples/emergy_consumer_device_hierarchy_downstream_context.py diff --git a/src/zepben/examples/emergy_consumer_device_hierarchy_downstream_context.py b/src/zepben/examples/emergy_consumer_device_hierarchy_downstream_context.py new file mode 100644 index 0000000..4397144 --- /dev/null +++ b/src/zepben/examples/emergy_consumer_device_hierarchy_downstream_context.py @@ -0,0 +1,143 @@ +# Copyright 2025 Zeppelin Bend Pty Ltd +# +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at https://mozilla.org/MPL/2.0/. + +import asyncio +import json +import math +import os +from dataclasses import dataclass +import time +from typing import Dict + +import pandas as pd +from tqdm.contrib.concurrent import process_map +from zepben.evolve import connect_with_token, NetworkConsumerClient, Feeder, Tracing, downstream, StepActionWithContextValue, \ + NetworkTraceStep, EnergyConsumer, StepContext, IdentifiedObject, Breaker, Fuse, PowerTransformer, TransformerFunctionKind +from zepben.protobuf.nc.nc_requests_pb2 import IncludedEnergizedContainers + + +@dataclass +class EnergyConsumerDeviceHierarchy: + energy_consumer_mrid: str + lv_circuit_name: str + upstream_switch_mrid: str + lv_circuit_name: str + upstream_switch_class: str + distribution_power_transformer_mrid: str + distribution_power_transformer_name: str + regulator_mrid: str + breaker_mrid: str + feeder_mrid: str + + +def _get_client(): + with open('config.json') as f: + config = json.load(f) + + # Connect to server + channel = connect_with_token(**config) + return NetworkConsumerClient(channel) + + +async def get_feeders() -> Dict[str, Feeder]: + _feeders = (await _get_client().get_network_hierarchy()).result.feeders + return _feeders + + +async def trace_from_feeder(feeder_mrid: str): + """ + Fetch the equipment container from the given feeder and build an equipment tree of everything downstream of the feeder. + Use the Equipment tree to traverse upstream of all EC's and get the equipment we are interested in. + Finally, create a CSV with the relevant information. + """ + client = _get_client() + # Get all objects under the feeder, including Substations and LV Feeders + (await client.get_equipment_container( + feeder_mrid, + include_energized_containers = IncludedEnergizedContainers.INCLUDE_ENERGIZED_LV_FEEDERS + )).throw_on_error() + feeder = client.service.get(feeder_mrid, Feeder) + + energy_consumers = [] + + class StepActionWithContext(StepActionWithContextValue): + def _apply(self, item: NetworkTraceStep, context: StepContext): + if isinstance((ec := item.path.to_equipment), EnergyConsumer): + nonlocal energy_consumers + data = self.get_context_value(context) + data.update({'feeder': feeder.mrid, 'energy_consumer_mrid': ec.mrid}) + + row = _build_row(data) + energy_consumers.append(row) + + def compute_next_value(self, next_item: NetworkTraceStep, current_item: NetworkTraceStep, current_value: Dict[str, IdentifiedObject]): + data = dict(current_value) + equip = next_item.path.to_equipment + if isinstance(equip, Breaker): + data['breaker'] = equip + elif isinstance(equip, Fuse): + data['upstream_switch'] = equip + elif isinstance(equip, PowerTransformer): + if equip.function == TransformerFunctionKind.distributionTransformer: + data['distribution_power_transformer'] = equip + elif equip.function == TransformerFunctionKind.voltageRegulator: + data['regulator'] = equip + return data + + def compute_initial_value(self, item: NetworkTraceStep): + return {} + + await ( + Tracing.network_trace() + .add_condition(downstream()) + .add_step_action(StepActionWithContext('key')) + ).run(getattr(feeder, 'normal_head_terminal')) + + csv_sfx = "energy_consumers.csv" + network_objects = pd.DataFrame(energy_consumers) + os.makedirs("csvs", exist_ok=True) + network_objects.to_csv(f"csvs/{feeder.mrid}_{csv_sfx}", index=False) + + +def _build_row(up_data: dict[str, IdentifiedObject | str]) -> EnergyConsumerDeviceHierarchy: + return EnergyConsumerDeviceHierarchy( + energy_consumer_mrid = up_data['energy_consumer_mrid'], + upstream_switch_mrid = (up_data.get('upstream_switch') or NullEquipment).mrid, + lv_circuit_name = (up_data.get('upstream_switch') or NullEquipment).name, + upstream_switch_class = type(up_data.get('upstream_switch')).__name__, + distribution_power_transformer_mrid = (up_data.get('distribution_power_transformer') or NullEquipment).mrid, + distribution_power_transformer_name = (up_data.get('distribution_power_transformer') or NullEquipment).name, + regulator_mrid = (up_data.get('regulator') or NullEquipment).mrid, + breaker_mrid = (up_data.get('breaker') or NullEquipment).mrid, + feeder_mrid = up_data.get('feeder'), + ) + + +class NullEquipment: + """empty class to simplify code below in the case of an equipment not existing in that position of the network""" + mrid = None + name = None + + +def main(_feeder): + asyncio.run(trace_from_feeder(_feeder)) + + +if __name__ == "__main__": + start = time.time() + # Get a list of feeders before entering main compute section of script. + feeders = list(asyncio.run(get_feeders())) + + print('processing feeders') + + # Process feeders sequentially. + # for _feeder in tqdm(feeders): + # asyncio.run(trace_from_feeder(_feeder)) + + # Process feeders concurrently. + process_map(main, feeders, max_workers=math.floor(os.cpu_count()/2)) + + print(f'done in {time.time() - start} seconds') diff --git a/src/zepben/examples/energy_consumer_device_hierarchy_downstream.py b/src/zepben/examples/energy_consumer_device_hierarchy_downstream.py index aa745af..3f517a0 100644 --- a/src/zepben/examples/energy_consumer_device_hierarchy_downstream.py +++ b/src/zepben/examples/energy_consumer_device_hierarchy_downstream.py @@ -7,6 +7,8 @@ import asyncio import json import os +import time +import math from dataclasses import dataclass from typing import Dict @@ -14,6 +16,8 @@ from zepben.ewb import NetworkConsumerClient, connect_with_token, Tracing, EnergyConsumer, PowerTransformer, \ TransformerFunctionKind, Breaker, Fuse, IdentifiedObject, EquipmentTreeBuilder, downstream, TreeNode, \ Feeder, IncludedEnergizedContainers +import tqdm +from tqdm.contrib.concurrent import process_map @dataclass @@ -35,12 +39,7 @@ def _get_client(): config = json.load(f) # Connect to server - channel = connect_with_token( - host=config["host"], - access_token=config["access_token"], - rpc_port=config['rpc_port'], - ca_filename=config['ca_path'] - ) + channel = connect_with_token(**config) return NetworkConsumerClient(channel) @@ -72,14 +71,11 @@ async def trace_from_feeder(feeder_mrid: str): Finally, create a CSV with the relevant information. """ client = _get_client() - print(f'processing feeder {feeder_mrid}') - # Get all objects under the feeder, including Substations and LV Feeders - await client.get_equipment_container( + (await client.get_equipment_container( feeder_mrid, include_energized_containers = IncludedEnergizedContainers.INCLUDE_ENERGIZED_LV_FEEDERS - ) - + )).throw_on_error() feeder = client.service.get(feeder_mrid, Feeder) builder = EquipmentTreeBuilder() @@ -91,20 +87,16 @@ async def trace_from_feeder(feeder_mrid: str): ).run(getattr(feeder, 'normal_head_terminal')) energy_consumers = [] - for up in client.service.objects(EnergyConsumer): - # iterate up tree from EC. - up_data = {'feeder': feeder.mrid, 'energy_consumer_mrid': up.mrid} - def _process(leaf): - process_leaf(up_data, leaf) - if leaf.parent: - _process(leaf.parent) - try: - _process(builder.leaves[up.mrid]) - except KeyError: - # If the up is not in the Equipment tree builders leaves, skip it - continue - - row = _build_row(up_data) + + for leaf in (l for l in builder.leaves if isinstance((ec := l.identified_object), EnergyConsumer)): + ec_data = {'feeder': feeder.mrid, 'energy_consumer_mrid': ec.mrid} + def _process(_leaf): + process_leaf(ec_data, _leaf) + if _leaf.parent: + _process(_leaf.parent) + _process(leaf) + + row = _build_row(ec_data) energy_consumers.append(row) csv_sfx = "energy_consumers.csv" @@ -133,10 +125,22 @@ def _build_row(up_data: dict[str, IdentifiedObject | str]) -> EnergyConsumerDevi ) +def main(_feeder): + asyncio.run(trace_from_feeder(_feeder)) + + if __name__ == "__main__": + start = time.time() # Get a list of feeders before entering main compute section of script. - feeders = asyncio.run(get_feeders()) + feeders = list(asyncio.run(get_feeders())) print('processing feeders') - for _feeder in feeders: + + # Process feeders sequentially. + for _feeder in tqdm.tqdm(feeders): asyncio.run(trace_from_feeder(_feeder)) + + # Process feeders concurrently. + #process_map(main, feeders, max_workers=math.floor(os.cpu_count()/2)) + + print(f'done in {time.time() - start} seconds') From 853bd5f8a5c633ff05a461235988b58a384b1c56 Mon Sep 17 00:00:00 2001 From: Max Chesterfield Date: Mon, 4 Aug 2025 14:29:27 +1000 Subject: [PATCH 2/6] combined all different methods into 1 file Easier to maintain, less code dupe Signed-off-by: Max Chesterfield --- .../emergy_consumer_device_hierarchy.py | 266 ++++++++++++++++++ ...mer_device_hierarchy_downstream_context.py | 143 ---------- .../energy_consumer_device_hierarchy.py | 143 ---------- ...gy_consumer_device_hierarchy_downstream.py | 146 ---------- 4 files changed, 266 insertions(+), 432 deletions(-) create mode 100644 src/zepben/examples/emergy_consumer_device_hierarchy.py delete mode 100644 src/zepben/examples/emergy_consumer_device_hierarchy_downstream_context.py delete mode 100644 src/zepben/examples/energy_consumer_device_hierarchy.py delete mode 100644 src/zepben/examples/energy_consumer_device_hierarchy_downstream.py diff --git a/src/zepben/examples/emergy_consumer_device_hierarchy.py b/src/zepben/examples/emergy_consumer_device_hierarchy.py new file mode 100644 index 0000000..c704f82 --- /dev/null +++ b/src/zepben/examples/emergy_consumer_device_hierarchy.py @@ -0,0 +1,266 @@ +# Copyright 2025 Zeppelin Bend Pty Ltd +# +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at https://mozilla.org/MPL/2.0/. + +import asyncio +import json +import os +import pandas as pd + +from typing import Dict, Callable, List +from dataclasses import dataclass + +from zepben.evolve import connect_with_token, NetworkConsumerClient, Feeder, Tracing, downstream, StepActionWithContextValue, \ + NetworkTraceStep, EnergyConsumer, StepContext, IdentifiedObject, Breaker, Fuse, PowerTransformer, TransformerFunctionKind, TreeNode, EquipmentTreeBuilder, \ + ConductingEquipment, NetworkTrace, upstream +from zepben.protobuf.nc.nc_requests_pb2 import IncludedEnergizedContainers + + +@dataclass +class EnergyConsumerDeviceHierarchy: + energy_consumer_mrid: str + lv_circuit_name: str + upstream_switch_mrid: str + lv_circuit_name: str + upstream_switch_class: str + distribution_power_transformer_mrid: str + distribution_power_transformer_name: str + regulator_mrid: str + breaker_mrid: str + feeder_mrid: str + + +def _get_client(): + with open('config.json') as f: + config = json.load(f) + + # Connect to server + channel = connect_with_token(**config) + return NetworkConsumerClient(channel) + + +async def get_feeders(_client = None) -> Dict[str, Feeder]: + _feeders = (await (_client or _get_client()).get_network_hierarchy()).result.feeders + return _feeders + + +async def get_feeder_equipmet(client: NetworkConsumerClient, feeder_mrid: str) -> None: + """Get all objects under the feeder, including LV Feeders""" + (await client.get_equipment_container( + feeder_mrid, + include_energized_containers=IncludedEnergizedContainers.INCLUDE_ENERGIZED_LV_FEEDERS + )).throw_on_error() + + +async def trace_from_energy_consumers(feeder_mrid: str, client=None): + """ + Least efficient/the slowest + Inefficient upstream tracing example. + Trace upstream from every EnergyConsumer. + """ + client = client or _get_client() + await get_feeder_equipmet(client, feeder_mrid) + + def _get_equipment_tree_trace(up_data: dict) -> NetworkTrace: + def step_action(step: NetworkTraceStep, _: StepContext): + to_equip: ConductingEquipment = step.path.to_equipment + + if isinstance(to_equip, Breaker): + if not up_data.get('breaker'): + up_data['breaker'] = to_equip + elif isinstance(to_equip, Fuse): + if not up_data.get('upstream_switch'): + up_data['upstream_switch'] = to_equip + elif isinstance(to_equip, PowerTransformer): + if not up_data.get('distribution_power_transformer'): + up_data['distribution_power_transformer'] = to_equip + elif not up_data.get('regulator') and to_equip.function == TransformerFunctionKind.voltageRegulator: + up_data['regulator'] = to_equip + + return ( + Tracing.network_trace() + .add_condition(upstream()) + .add_step_action(step_action) + ) + + feeder = client.service.get(feeder_mrid, Feeder) + + energy_consumers = [] + for lvf in feeder.normal_energized_lv_feeders: + for ce in lvf.equipment: + if isinstance(ce, EnergyConsumer): + up_data = {'feeder': feeder_mrid, 'energy_consumer_mrid': ce.mrid} + + # Trace upstream from EnergyConsumer. + await _get_equipment_tree_trace(up_data).run(ce) + energy_consumers.append(_build_row(up_data)) + + write_csv(energy_consumers, feeder.mrid) + + +async def trace_from_feeder_downstream(feeder_mrid: str, client=None): + """ + More memory use than `trace_from_feeder_context`, more efficient/faster than `trace_from_energy_consumers` + Build an equipment tree of everything downstream of the feeder. + Use the Equipment tree to recurse through parent equipment of all EC's and get the equipment we are interested in. + """ + def process_leaf(up_data: dict, leaf: TreeNode): + to_equip: IdentifiedObject = leaf.identified_object + + if isinstance(to_equip, Breaker): + if not up_data.get('breaker'): + up_data['breaker'] = to_equip + elif isinstance(to_equip, Fuse): + if not up_data.get('upstream_switch'): + up_data['upstream_switch'] = to_equip + elif isinstance(to_equip, PowerTransformer): + if not up_data.get('distribution_power_transformer'): + up_data['distribution_power_transformer'] = to_equip + elif not up_data.get('regulator') and to_equip.function == TransformerFunctionKind.voltageRegulator: + up_data['regulator'] = to_equip + + client = client or _get_client() + await get_feeder_equipmet(client, feeder_mrid) + + builder = EquipmentTreeBuilder() + + feeder = client.service.get(feeder_mrid, Feeder) + await ( + Tracing.network_trace() + .add_condition(downstream()) + .add_step_action(builder) + ).run(getattr(feeder, 'normal_head_terminal')) + + energy_consumers = [] + + for leaf in (l for l in builder.leaves if isinstance((ec := l.identified_object), EnergyConsumer)): + ec_data = {'feeder': feeder.mrid, 'energy_consumer_mrid': ec.mrid} + def _process(_leaf): + process_leaf(ec_data, _leaf) + if _leaf.parent: + _process(_leaf.parent) + _process(leaf) + + row = _build_row(ec_data) + energy_consumers.append(row) + + write_csv(energy_consumers, feeder.mrid) + + +async def trace_from_feeder_context(feeder_mrid: str, client=None): + """ + Most efficient/fastest. + trace downstream from the feeder recording relevant information using `NetworkTrace` `StepContext`. + """ + client = client or _get_client() + # Get all objects under the feeder, including Substations and LV Feeders + await get_feeder_equipmet(client, feeder_mrid) + + energy_consumers = [] + + feeder = client.service.get(feeder_mrid, Feeder) + + class StepActionWithContext(StepActionWithContextValue): + def _apply(self, item: NetworkTraceStep, context: StepContext): + if isinstance((ec := item.path.to_equipment), EnergyConsumer): + nonlocal energy_consumers + data = self.get_context_value(context) + data.update({'feeder': feeder.mrid, 'energy_consumer_mrid': ec.mrid}) + + row = _build_row(data) + energy_consumers.append(row) + + def compute_next_value(self, next_item: NetworkTraceStep, current_item: NetworkTraceStep, current_value: Dict[str, IdentifiedObject]): + data = dict(current_value) + equip = next_item.path.to_equipment + if isinstance(equip, Breaker): + data['breaker'] = equip + elif isinstance(equip, Fuse): + data['upstream_switch'] = equip + elif isinstance(equip, PowerTransformer): + if equip.function == TransformerFunctionKind.distributionTransformer: + data['distribution_power_transformer'] = equip + elif equip.function == TransformerFunctionKind.voltageRegulator: + data['regulator'] = equip + return data + + def compute_initial_value(self, item: NetworkTraceStep): + return {} + + await ( + Tracing.network_trace() + .add_condition(downstream()) + .add_step_action(StepActionWithContext('key')) + ).run(getattr(feeder, 'normal_head_terminal')) + + write_csv(energy_consumers, feeder.mrid) + + +def write_csv(energy_consumers: List[EnergyConsumerDeviceHierarchy], feeder_mrid: str): + network_objects = pd.DataFrame(energy_consumers) + os.makedirs("csvs", exist_ok=True) + network_objects.to_csv(f"csvs/{feeder_mrid}_energy_consumers.csv", index=False) + + +class NullEquipment: + """empty class to simplify code below in the case of an equipment not existing in that position of the network""" + mrid = None + name = None + + +def _build_row(up_data: dict[str, IdentifiedObject | str]) -> EnergyConsumerDeviceHierarchy: + return EnergyConsumerDeviceHierarchy( + energy_consumer_mrid = up_data['energy_consumer_mrid'], + upstream_switch_mrid = (up_data.get('upstream_switch') or NullEquipment).mrid, + lv_circuit_name = (up_data.get('upstream_switch') or NullEquipment).name, + upstream_switch_class = type(up_data.get('upstream_switch')).__name__, + distribution_power_transformer_mrid = (up_data.get('distribution_power_transformer') or NullEquipment).mrid, + distribution_power_transformer_name = (up_data.get('distribution_power_transformer') or NullEquipment).name, + regulator_mrid = (up_data.get('regulator') or NullEquipment).mrid, + breaker_mrid = (up_data.get('breaker') or NullEquipment).mrid, + feeder_mrid = up_data.get('feeder'), + ) + + +def process_feeders_sequentially(): + async def main_async(trace_type: Callable): + """ + Fetch the equipment container from the given feeder and create a CSV with the relevant information. + Differences between the functions passable as `trace_type` are documented in the relevant docstrings. + + `trace_type` must be one of the following: + - `trace_from_energy_consumers` + - `trace_from_energy_consumers_with_context` + - `trace_from_feeder_downstream` + + """ + from tqdm import tqdm + client = _get_client() + feeders = list(await get_feeders(client)) + for _feeder in tqdm(feeders): + await trace_type(_feeder, client) + + # Uncomment to run other trace functions + asyncio.run(main_async(trace_from_feeder_context)) + #asyncio.run(main_async(trace_from_feeder_downstream)) + #asyncio.run(main_async(trace_from_energy_consumers)) + + +def process_feeders_concurrently(): + def multi_proc(_feeder): + # Uncomment to run other trace functions + asyncio.run(trace_from_feeder_context(_feeder)) + #asyncio.run(trace_from_feeder_downstream(_feeder)) + #asyncio.run(trace_from_energy_consumers(_feeder)) + + # Get a list of feeders before entering main compute section of script. + feeders = list(asyncio.run(get_feeders())) + + from tqdm.contrib.concurrent import process_map + process_map(multi_proc, feeders ,max_workers=int(os.cpu_count() / 2)) + + +if __name__ == "__main__": + process_feeders_sequentially() diff --git a/src/zepben/examples/emergy_consumer_device_hierarchy_downstream_context.py b/src/zepben/examples/emergy_consumer_device_hierarchy_downstream_context.py deleted file mode 100644 index 4397144..0000000 --- a/src/zepben/examples/emergy_consumer_device_hierarchy_downstream_context.py +++ /dev/null @@ -1,143 +0,0 @@ -# Copyright 2025 Zeppelin Bend Pty Ltd -# -# This Source Code Form is subject to the terms of the Mozilla Public -# License, v. 2.0. If a copy of the MPL was not distributed with this -# file, You can obtain one at https://mozilla.org/MPL/2.0/. - -import asyncio -import json -import math -import os -from dataclasses import dataclass -import time -from typing import Dict - -import pandas as pd -from tqdm.contrib.concurrent import process_map -from zepben.evolve import connect_with_token, NetworkConsumerClient, Feeder, Tracing, downstream, StepActionWithContextValue, \ - NetworkTraceStep, EnergyConsumer, StepContext, IdentifiedObject, Breaker, Fuse, PowerTransformer, TransformerFunctionKind -from zepben.protobuf.nc.nc_requests_pb2 import IncludedEnergizedContainers - - -@dataclass -class EnergyConsumerDeviceHierarchy: - energy_consumer_mrid: str - lv_circuit_name: str - upstream_switch_mrid: str - lv_circuit_name: str - upstream_switch_class: str - distribution_power_transformer_mrid: str - distribution_power_transformer_name: str - regulator_mrid: str - breaker_mrid: str - feeder_mrid: str - - -def _get_client(): - with open('config.json') as f: - config = json.load(f) - - # Connect to server - channel = connect_with_token(**config) - return NetworkConsumerClient(channel) - - -async def get_feeders() -> Dict[str, Feeder]: - _feeders = (await _get_client().get_network_hierarchy()).result.feeders - return _feeders - - -async def trace_from_feeder(feeder_mrid: str): - """ - Fetch the equipment container from the given feeder and build an equipment tree of everything downstream of the feeder. - Use the Equipment tree to traverse upstream of all EC's and get the equipment we are interested in. - Finally, create a CSV with the relevant information. - """ - client = _get_client() - # Get all objects under the feeder, including Substations and LV Feeders - (await client.get_equipment_container( - feeder_mrid, - include_energized_containers = IncludedEnergizedContainers.INCLUDE_ENERGIZED_LV_FEEDERS - )).throw_on_error() - feeder = client.service.get(feeder_mrid, Feeder) - - energy_consumers = [] - - class StepActionWithContext(StepActionWithContextValue): - def _apply(self, item: NetworkTraceStep, context: StepContext): - if isinstance((ec := item.path.to_equipment), EnergyConsumer): - nonlocal energy_consumers - data = self.get_context_value(context) - data.update({'feeder': feeder.mrid, 'energy_consumer_mrid': ec.mrid}) - - row = _build_row(data) - energy_consumers.append(row) - - def compute_next_value(self, next_item: NetworkTraceStep, current_item: NetworkTraceStep, current_value: Dict[str, IdentifiedObject]): - data = dict(current_value) - equip = next_item.path.to_equipment - if isinstance(equip, Breaker): - data['breaker'] = equip - elif isinstance(equip, Fuse): - data['upstream_switch'] = equip - elif isinstance(equip, PowerTransformer): - if equip.function == TransformerFunctionKind.distributionTransformer: - data['distribution_power_transformer'] = equip - elif equip.function == TransformerFunctionKind.voltageRegulator: - data['regulator'] = equip - return data - - def compute_initial_value(self, item: NetworkTraceStep): - return {} - - await ( - Tracing.network_trace() - .add_condition(downstream()) - .add_step_action(StepActionWithContext('key')) - ).run(getattr(feeder, 'normal_head_terminal')) - - csv_sfx = "energy_consumers.csv" - network_objects = pd.DataFrame(energy_consumers) - os.makedirs("csvs", exist_ok=True) - network_objects.to_csv(f"csvs/{feeder.mrid}_{csv_sfx}", index=False) - - -def _build_row(up_data: dict[str, IdentifiedObject | str]) -> EnergyConsumerDeviceHierarchy: - return EnergyConsumerDeviceHierarchy( - energy_consumer_mrid = up_data['energy_consumer_mrid'], - upstream_switch_mrid = (up_data.get('upstream_switch') or NullEquipment).mrid, - lv_circuit_name = (up_data.get('upstream_switch') or NullEquipment).name, - upstream_switch_class = type(up_data.get('upstream_switch')).__name__, - distribution_power_transformer_mrid = (up_data.get('distribution_power_transformer') or NullEquipment).mrid, - distribution_power_transformer_name = (up_data.get('distribution_power_transformer') or NullEquipment).name, - regulator_mrid = (up_data.get('regulator') or NullEquipment).mrid, - breaker_mrid = (up_data.get('breaker') or NullEquipment).mrid, - feeder_mrid = up_data.get('feeder'), - ) - - -class NullEquipment: - """empty class to simplify code below in the case of an equipment not existing in that position of the network""" - mrid = None - name = None - - -def main(_feeder): - asyncio.run(trace_from_feeder(_feeder)) - - -if __name__ == "__main__": - start = time.time() - # Get a list of feeders before entering main compute section of script. - feeders = list(asyncio.run(get_feeders())) - - print('processing feeders') - - # Process feeders sequentially. - # for _feeder in tqdm(feeders): - # asyncio.run(trace_from_feeder(_feeder)) - - # Process feeders concurrently. - process_map(main, feeders, max_workers=math.floor(os.cpu_count()/2)) - - print(f'done in {time.time() - start} seconds') diff --git a/src/zepben/examples/energy_consumer_device_hierarchy.py b/src/zepben/examples/energy_consumer_device_hierarchy.py deleted file mode 100644 index 7d2b4fa..0000000 --- a/src/zepben/examples/energy_consumer_device_hierarchy.py +++ /dev/null @@ -1,143 +0,0 @@ -# Copyright 2025 Zeppelin Bend Pty Ltd -# -# This Source Code Form is subject to the terms of the Mozilla Public -# License, v. 2.0. If a copy of the MPL was not distributed with this -# file, You can obtain one at https://mozilla.org/MPL/2.0/. - -import asyncio -import json -import os -from dataclasses import dataclass -from multiprocessing import Pool -from typing import Union - -import pandas as pd -from zepben.ewb import NetworkConsumerClient, connect_with_token, Tracing, upstream, EnergyConsumer, NetworkTraceStep, StepContext, PowerTransformer, \ - TransformerFunctionKind, Breaker, ConductingEquipment, Fuse, IdentifiedObject, NetworkTrace, Feeder -from zepben.protobuf.nc.nc_requests_pb2 import IncludedEnergizedContainers - - -@dataclass -class EnergyConsumerDeviceHierarchy: - energy_consumer_mrid: str - lv_circuit_name: str - upstream_switch_mrid: str - lv_circuit_name: str - upstream_switch_class: str - distribution_power_transformer_mrid: str - distribution_power_transformer_name: str - regulator_mrid: str - breaker_mrid: str - feeder_mrid: str - - -def _get_client(): - with open('config.json') as f: - config = json.load(f) - - # Connect to server - channel = connect_with_token( - host=config["host"], - access_token=config["access_token"], - rpc_port=config['rpc_port'], - ca_filename=config['ca_path'] - ) - return NetworkConsumerClient(channel) - - -def _get_equipment_tree_trace(up_data: dict) -> NetworkTrace: - def step_action(step: NetworkTraceStep, _: StepContext): - to_equip: ConductingEquipment = step.path.to_equipment - if isinstance(to_equip, Breaker): - if not up_data.get('breaker'): - up_data['breaker'] = to_equip - elif isinstance(to_equip, Fuse): - if not up_data.get('upstream_switch'): - up_data['upstream_switch'] = to_equip - elif isinstance(to_equip, PowerTransformer): - if not up_data.get('distribution_power_transformer'): - up_data['distribution_power_transformer'] = to_equip - elif not up_data.get('regulator') and to_equip.function == TransformerFunctionKind.voltageRegulator: - up_data['regulator'] = to_equip - - return ( - Tracing.network_trace() - .add_condition(upstream()) - .add_step_action(step_action) - ) - - -async def get_feeders(): - client = _get_client() - - _feeders = (await client.get_network_hierarchy()).result.feeders - return _feeders - - -async def trace_from_energy_consumers(feeder): - """ - Fetch the equipment container from the given feeder, then trace upstream from every EnergyConsumer - and create a CSV with the relevant information. - """ - client = _get_client() - print(f'processing feeder {feeder}') - # Get all objects under the feeder, including Substations and LV Feeders - (await client.get_equipment_container(feeder, include_energized_containers=IncludedEnergizedContainers.INCLUDE_ENERGIZED_LV_FEEDERS)).throw_on_error() - network = client.service - f = network.get(feeder, Feeder) - - energy_consumers = [] - for lvf in f.normal_energized_lv_feeders: - for ce in lvf.equipment: - if isinstance(ce, EnergyConsumer): - up_data = {'feeder': feeder, 'energy_consumer_mrid': ce.mrid} - - # Trace upstream from EnergyConsumer. - await _get_equipment_tree_trace(up_data).run(ce) - energy_consumers.append(_build_row(up_data)) - - csv_sfx = "energy_consumers.csv" - network_objects = pd.DataFrame(energy_consumers) - os.makedirs("csvs", exist_ok=True) - network_objects.to_csv(f"csvs/{f.mrid}_{csv_sfx}", index=False) - - -class NullEquipment: - """empty class to simplify code below in the case of an equipment not existing in that position of the network""" - mrid = None - name = None - - -def _build_row(up_data: dict[str, Union[IdentifiedObject, str]]) -> EnergyConsumerDeviceHierarchy: - return EnergyConsumerDeviceHierarchy( - energy_consumer_mrid=up_data['energy_consumer_mrid'], - upstream_switch_mrid=(up_data.get('upstream_switch') or NullEquipment).mrid, - lv_circuit_name=(up_data.get('upstream_switch') or NullEquipment).name, - upstream_switch_class=type(up_data.get('upstream_switch')).__name__, - distribution_power_transformer_mrid=(up_data.get('distribution_power_transformer') or NullEquipment).mrid, - distribution_power_transformer_name=(up_data.get('distribution_power_transformer') or NullEquipment).name, - regulator_mrid=(up_data.get('regulator') or NullEquipment).mrid, - breaker_mrid=(up_data.get('breaker') or NullEquipment).mrid, - feeder_mrid=up_data.get('feeder'), - ) - - -def process_target(feeder): - asyncio.run(trace_from_energy_consumers(feeder)) - - -if __name__ == "__main__": - # Get a list of feeders before entering main compute section of script. - feeders = asyncio.run(get_feeders()) - - # Spin up a multiprocess pool of $CPU_COUNT processes to handle the workload, otherwise we saturate a single cpu core and it's slow. - cpus = os.cpu_count() - print(f'Spawning {cpus} processes') - pool = Pool(cpus) - - print(f'mapping to process pool') - pool.map(process_target, feeders) - - print('finishing remaining processes') - pool.close() - pool.join() diff --git a/src/zepben/examples/energy_consumer_device_hierarchy_downstream.py b/src/zepben/examples/energy_consumer_device_hierarchy_downstream.py deleted file mode 100644 index 3f517a0..0000000 --- a/src/zepben/examples/energy_consumer_device_hierarchy_downstream.py +++ /dev/null @@ -1,146 +0,0 @@ -# Copyright 2025 Zeppelin Bend Pty Ltd -# -# This Source Code Form is subject to the terms of the Mozilla Public -# License, v. 2.0. If a copy of the MPL was not distributed with this -# file, You can obtain one at https://mozilla.org/MPL/2.0/. - -import asyncio -import json -import os -import time -import math -from dataclasses import dataclass -from typing import Dict - -import pandas as pd -from zepben.ewb import NetworkConsumerClient, connect_with_token, Tracing, EnergyConsumer, PowerTransformer, \ - TransformerFunctionKind, Breaker, Fuse, IdentifiedObject, EquipmentTreeBuilder, downstream, TreeNode, \ - Feeder, IncludedEnergizedContainers -import tqdm -from tqdm.contrib.concurrent import process_map - - -@dataclass -class EnergyConsumerDeviceHierarchy: - energy_consumer_mrid: str - lv_circuit_name: str - upstream_switch_mrid: str - lv_circuit_name: str - upstream_switch_class: str - distribution_power_transformer_mrid: str - distribution_power_transformer_name: str - regulator_mrid: str - breaker_mrid: str - feeder_mrid: str - - -def _get_client(): - with open('config.json') as f: - config = json.load(f) - - # Connect to server - channel = connect_with_token(**config) - return NetworkConsumerClient(channel) - - -async def get_feeders() -> Dict[str, Feeder]: - _feeders = (await _get_client().get_network_hierarchy()).result.feeders - return _feeders - - -def process_leaf(up_data: dict, leaf: TreeNode): - to_equip: IdentifiedObject = leaf.identified_object - - if isinstance(to_equip, Breaker): - if not up_data.get('breaker'): - up_data['breaker'] = to_equip - elif isinstance(to_equip, Fuse): - if not up_data.get('upstream_switch'): - up_data['upstream_switch'] = to_equip - elif isinstance(to_equip, PowerTransformer): - if not up_data.get('distribution_power_transformer'): - up_data['distribution_power_transformer'] = to_equip - elif not up_data.get('regulator') and to_equip.function == TransformerFunctionKind.voltageRegulator: - up_data['regulator'] = to_equip - - -async def trace_from_feeder(feeder_mrid: str): - """ - Fetch the equipment container from the given feeder and build an equipment tree of everything downstream of the feeder. - Use the Equipment tree to traverse upstream of all EC's and get the equipment we are interested in. - Finally, create a CSV with the relevant information. - """ - client = _get_client() - # Get all objects under the feeder, including Substations and LV Feeders - (await client.get_equipment_container( - feeder_mrid, - include_energized_containers = IncludedEnergizedContainers.INCLUDE_ENERGIZED_LV_FEEDERS - )).throw_on_error() - feeder = client.service.get(feeder_mrid, Feeder) - - builder = EquipmentTreeBuilder() - - await ( - Tracing.network_trace() - .add_condition(downstream()) - .add_step_action(builder) - ).run(getattr(feeder, 'normal_head_terminal')) - - energy_consumers = [] - - for leaf in (l for l in builder.leaves if isinstance((ec := l.identified_object), EnergyConsumer)): - ec_data = {'feeder': feeder.mrid, 'energy_consumer_mrid': ec.mrid} - def _process(_leaf): - process_leaf(ec_data, _leaf) - if _leaf.parent: - _process(_leaf.parent) - _process(leaf) - - row = _build_row(ec_data) - energy_consumers.append(row) - - csv_sfx = "energy_consumers.csv" - network_objects = pd.DataFrame(energy_consumers) - os.makedirs("csvs", exist_ok=True) - network_objects.to_csv(f"csvs/{feeder.mrid}_{csv_sfx}", index=False) - - -class NullEquipment: - """empty class to simplify code below in the case of an equipment not existing in that position of the network""" - mrid = None - name = None - - -def _build_row(up_data: dict[str, IdentifiedObject | str]) -> EnergyConsumerDeviceHierarchy: - return EnergyConsumerDeviceHierarchy( - energy_consumer_mrid = up_data['energy_consumer_mrid'], - upstream_switch_mrid = (up_data.get('upstream_switch') or NullEquipment).mrid, - lv_circuit_name = (up_data.get('upstream_switch') or NullEquipment).name, - upstream_switch_class = type(up_data.get('upstream_switch')).__name__, - distribution_power_transformer_mrid = (up_data.get('distribution_power_transformer') or NullEquipment).mrid, - distribution_power_transformer_name = (up_data.get('distribution_power_transformer') or NullEquipment).name, - regulator_mrid = (up_data.get('regulator') or NullEquipment).mrid, - breaker_mrid = (up_data.get('breaker') or NullEquipment).mrid, - feeder_mrid = up_data.get('feeder'), - ) - - -def main(_feeder): - asyncio.run(trace_from_feeder(_feeder)) - - -if __name__ == "__main__": - start = time.time() - # Get a list of feeders before entering main compute section of script. - feeders = list(asyncio.run(get_feeders())) - - print('processing feeders') - - # Process feeders sequentially. - for _feeder in tqdm.tqdm(feeders): - asyncio.run(trace_from_feeder(_feeder)) - - # Process feeders concurrently. - #process_map(main, feeders, max_workers=math.floor(os.cpu_count()/2)) - - print(f'done in {time.time() - start} seconds') From 511b6bc7ab9922ba4acd6edfcb14dfb35b2cdcd8 Mon Sep 17 00:00:00 2001 From: Kurt Greaves Date: Thu, 11 Sep 2025 12:22:05 +1000 Subject: [PATCH 3/6] Rename Signed-off-by: Kurt Greaves --- ...er_device_hierarchy.py => energy_consumer_device_hierarchy.py} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename src/zepben/examples/{emergy_consumer_device_hierarchy.py => energy_consumer_device_hierarchy.py} (100%) diff --git a/src/zepben/examples/emergy_consumer_device_hierarchy.py b/src/zepben/examples/energy_consumer_device_hierarchy.py similarity index 100% rename from src/zepben/examples/emergy_consumer_device_hierarchy.py rename to src/zepben/examples/energy_consumer_device_hierarchy.py From a14cacb5c2cd6c38df29495aecd927b5cbdcc0ae Mon Sep 17 00:00:00 2001 From: Kurt Greaves Date: Thu, 11 Sep 2025 12:22:17 +1000 Subject: [PATCH 4/6] Fix for version 1.0.0 Signed-off-by: Kurt Greaves --- src/zepben/examples/energy_consumer_device_hierarchy.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/src/zepben/examples/energy_consumer_device_hierarchy.py b/src/zepben/examples/energy_consumer_device_hierarchy.py index c704f82..5f7f638 100644 --- a/src/zepben/examples/energy_consumer_device_hierarchy.py +++ b/src/zepben/examples/energy_consumer_device_hierarchy.py @@ -11,11 +11,9 @@ from typing import Dict, Callable, List from dataclasses import dataclass - -from zepben.evolve import connect_with_token, NetworkConsumerClient, Feeder, Tracing, downstream, StepActionWithContextValue, \ +from zepben.ewb import connect_with_token, NetworkConsumerClient, Feeder, Tracing, downstream, StepActionWithContextValue, \ NetworkTraceStep, EnergyConsumer, StepContext, IdentifiedObject, Breaker, Fuse, PowerTransformer, TransformerFunctionKind, TreeNode, EquipmentTreeBuilder, \ - ConductingEquipment, NetworkTrace, upstream -from zepben.protobuf.nc.nc_requests_pb2 import IncludedEnergizedContainers + ConductingEquipment, NetworkTrace, upstream, IncludedEnergizedContainers @dataclass @@ -50,7 +48,7 @@ async def get_feeder_equipmet(client: NetworkConsumerClient, feeder_mrid: str) - """Get all objects under the feeder, including LV Feeders""" (await client.get_equipment_container( feeder_mrid, - include_energized_containers=IncludedEnergizedContainers.INCLUDE_ENERGIZED_LV_FEEDERS + include_energized_containers=IncludedEnergizedContainers.LV_FEEDERS )).throw_on_error() From 152fc14dda238f0669a02954f7f5c16995b4f264 Mon Sep 17 00:00:00 2001 From: Kurt Greaves Date: Thu, 11 Sep 2025 17:02:15 +1000 Subject: [PATCH 5/6] Update versions Signed-off-by: Kurt Greaves --- pyproject.toml | 7 ++++--- src/zepben/examples/energy_consumer_device_hierarchy.py | 2 +- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index bd75e82..2e5f3a8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -23,14 +23,15 @@ authors = [ {name = "Zeppelin Bend", email = "oss@zepben.com"} ] dependencies = [ - "zepben.eas==0.19.0", - "zepben.ewb==1.0.0b7", + "zepben.eas==0.23.0", + "zepben.ewb==1.0.3", "numba==0.60.0", "geojson==2.5.0", "gql[requests]==3.4.1", "geopandas", "pandas", - "shapely" + "shapely", + "tqdm" ] classifiers = [ "Programming Language :: Python :: 3", diff --git a/src/zepben/examples/energy_consumer_device_hierarchy.py b/src/zepben/examples/energy_consumer_device_hierarchy.py index 5f7f638..981f607 100644 --- a/src/zepben/examples/energy_consumer_device_hierarchy.py +++ b/src/zepben/examples/energy_consumer_device_hierarchy.py @@ -257,7 +257,7 @@ def multi_proc(_feeder): feeders = list(asyncio.run(get_feeders())) from tqdm.contrib.concurrent import process_map - process_map(multi_proc, feeders ,max_workers=int(os.cpu_count() / 2)) + process_map(multi_proc, feeders, max_workers=int(os.cpu_count() / 2)) if __name__ == "__main__": From 3c863177ca0d1e767491b78e6fe04c9bf3fc32c8 Mon Sep 17 00:00:00 2001 From: Kurt Greaves Date: Fri, 12 Sep 2025 13:56:46 +1000 Subject: [PATCH 6/6] Update example Signed-off-by: Kurt Greaves --- .../energy_consumer_device_hierarchy.py | 45 ++++++++++--------- 1 file changed, 25 insertions(+), 20 deletions(-) diff --git a/src/zepben/examples/energy_consumer_device_hierarchy.py b/src/zepben/examples/energy_consumer_device_hierarchy.py index 981f607..d39dfe0 100644 --- a/src/zepben/examples/energy_consumer_device_hierarchy.py +++ b/src/zepben/examples/energy_consumer_device_hierarchy.py @@ -39,12 +39,12 @@ def _get_client(): return NetworkConsumerClient(channel) -async def get_feeders(_client = None) -> Dict[str, Feeder]: +async def get_feeders(_client=None) -> Dict[str, Feeder]: _feeders = (await (_client or _get_client()).get_network_hierarchy()).result.feeders return _feeders -async def get_feeder_equipmet(client: NetworkConsumerClient, feeder_mrid: str) -> None: +async def get_feeder_equipment(client: NetworkConsumerClient, feeder_mrid: str) -> None: """Get all objects under the feeder, including LV Feeders""" (await client.get_equipment_container( feeder_mrid, @@ -59,7 +59,7 @@ async def trace_from_energy_consumers(feeder_mrid: str, client=None): Trace upstream from every EnergyConsumer. """ client = client or _get_client() - await get_feeder_equipmet(client, feeder_mrid) + await get_feeder_equipment(client, feeder_mrid) def _get_equipment_tree_trace(up_data: dict) -> NetworkTrace: def step_action(step: NetworkTraceStep, _: StepContext): @@ -104,6 +104,7 @@ async def trace_from_feeder_downstream(feeder_mrid: str, client=None): Build an equipment tree of everything downstream of the feeder. Use the Equipment tree to recurse through parent equipment of all EC's and get the equipment we are interested in. """ + def process_leaf(up_data: dict, leaf: TreeNode): to_equip: IdentifiedObject = leaf.identified_object @@ -119,8 +120,8 @@ def process_leaf(up_data: dict, leaf: TreeNode): elif not up_data.get('regulator') and to_equip.function == TransformerFunctionKind.voltageRegulator: up_data['regulator'] = to_equip - client = client or _get_client() - await get_feeder_equipmet(client, feeder_mrid) + client = client or _get_client() + await get_feeder_equipment(client, feeder_mrid) builder = EquipmentTreeBuilder() @@ -135,10 +136,12 @@ def process_leaf(up_data: dict, leaf: TreeNode): for leaf in (l for l in builder.leaves if isinstance((ec := l.identified_object), EnergyConsumer)): ec_data = {'feeder': feeder.mrid, 'energy_consumer_mrid': ec.mrid} + def _process(_leaf): process_leaf(ec_data, _leaf) if _leaf.parent: _process(_leaf.parent) + _process(leaf) row = _build_row(ec_data) @@ -154,7 +157,7 @@ async def trace_from_feeder_context(feeder_mrid: str, client=None): """ client = client or _get_client() # Get all objects under the feeder, including Substations and LV Feeders - await get_feeder_equipmet(client, feeder_mrid) + await get_feeder_equipment(client, feeder_mrid) energy_consumers = [] @@ -210,15 +213,15 @@ class NullEquipment: def _build_row(up_data: dict[str, IdentifiedObject | str]) -> EnergyConsumerDeviceHierarchy: return EnergyConsumerDeviceHierarchy( - energy_consumer_mrid = up_data['energy_consumer_mrid'], - upstream_switch_mrid = (up_data.get('upstream_switch') or NullEquipment).mrid, - lv_circuit_name = (up_data.get('upstream_switch') or NullEquipment).name, - upstream_switch_class = type(up_data.get('upstream_switch')).__name__, - distribution_power_transformer_mrid = (up_data.get('distribution_power_transformer') or NullEquipment).mrid, - distribution_power_transformer_name = (up_data.get('distribution_power_transformer') or NullEquipment).name, - regulator_mrid = (up_data.get('regulator') or NullEquipment).mrid, - breaker_mrid = (up_data.get('breaker') or NullEquipment).mrid, - feeder_mrid = up_data.get('feeder'), + energy_consumer_mrid=up_data['energy_consumer_mrid'], + upstream_switch_mrid=(up_data.get('upstream_switch') or NullEquipment).mrid, + lv_circuit_name=(up_data.get('upstream_switch') or NullEquipment).name, + upstream_switch_class=type(up_data.get('upstream_switch')).__name__, + distribution_power_transformer_mrid=(up_data.get('distribution_power_transformer') or NullEquipment).mrid, + distribution_power_transformer_name=(up_data.get('distribution_power_transformer') or NullEquipment).name, + regulator_mrid=(up_data.get('regulator') or NullEquipment).mrid, + breaker_mrid=(up_data.get('breaker') or NullEquipment).mrid, + feeder_mrid=up_data.get('feeder'), ) @@ -236,22 +239,23 @@ async def main_async(trace_type: Callable): """ from tqdm import tqdm client = _get_client() - feeders = list(await get_feeders(client)) + feeders = [""] + # feeders = list(await get_feeders(client)) # Uncomment to process all feeders for _feeder in tqdm(feeders): await trace_type(_feeder, client) # Uncomment to run other trace functions asyncio.run(main_async(trace_from_feeder_context)) - #asyncio.run(main_async(trace_from_feeder_downstream)) - #asyncio.run(main_async(trace_from_energy_consumers)) + # asyncio.run(main_async(trace_from_feeder_downstream)) + # asyncio.run(main_async(trace_from_energy_consumers)) def process_feeders_concurrently(): def multi_proc(_feeder): # Uncomment to run other trace functions asyncio.run(trace_from_feeder_context(_feeder)) - #asyncio.run(trace_from_feeder_downstream(_feeder)) - #asyncio.run(trace_from_energy_consumers(_feeder)) + # asyncio.run(trace_from_feeder_downstream(_feeder)) + # asyncio.run(trace_from_energy_consumers(_feeder)) # Get a list of feeders before entering main compute section of script. feeders = list(asyncio.run(get_feeders())) @@ -262,3 +266,4 @@ def multi_proc(_feeder): if __name__ == "__main__": process_feeders_sequentially() + # process_feeders_concurrently() # Uncomment and comment sequentially above to multi-process, note this is resource intensive and may cause issues.