Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,15 @@ authors = [
{name = "Zeppelin Bend", email = "oss@zepben.com"}
]
dependencies = [
"zepben.eas==0.19.0",
"zepben.ewb==1.0.0b7",
"zepben.eas==0.23.0",
"zepben.ewb==1.0.3",
"numba==0.60.0",
"geojson==2.5.0",
"gql[requests]==3.4.1",
"geopandas",
"pandas",
"shapely"
"shapely",
"tqdm"
]
classifiers = [
"Programming Language :: Python :: 3",
Expand Down
238 changes: 182 additions & 56 deletions src/zepben/examples/energy_consumer_device_hierarchy.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,13 @@
import asyncio
import json
import os
from dataclasses import dataclass
from multiprocessing import Pool
from typing import Union

import pandas as pd
from zepben.ewb import NetworkConsumerClient, connect_with_token, Tracing, upstream, EnergyConsumer, NetworkTraceStep, StepContext, PowerTransformer, \
TransformerFunctionKind, Breaker, ConductingEquipment, Fuse, IdentifiedObject, NetworkTrace, Feeder
from zepben.protobuf.nc.nc_requests_pb2 import IncludedEnergizedContainers

from typing import Dict, Callable, List
from dataclasses import dataclass
from zepben.ewb import connect_with_token, NetworkConsumerClient, Feeder, Tracing, downstream, StepActionWithContextValue, \
NetworkTraceStep, EnergyConsumer, StepContext, IdentifiedObject, Breaker, Fuse, PowerTransformer, TransformerFunctionKind, TreeNode, EquipmentTreeBuilder, \
ConductingEquipment, NetworkTrace, upstream, IncludedEnergizedContainers


@dataclass
Expand All @@ -35,19 +34,80 @@ def _get_client():
with open('config.json') as f:
config = json.load(f)

# Connect to server
channel = connect_with_token(
host=config["host"],
access_token=config["access_token"],
rpc_port=config['rpc_port'],
ca_filename=config['ca_path']
)
# Connect to server
channel = connect_with_token(**config)
return NetworkConsumerClient(channel)


def _get_equipment_tree_trace(up_data: dict) -> NetworkTrace:
def step_action(step: NetworkTraceStep, _: StepContext):
to_equip: ConductingEquipment = step.path.to_equipment
async def get_feeders(_client=None) -> Dict[str, Feeder]:
_feeders = (await (_client or _get_client()).get_network_hierarchy()).result.feeders
return _feeders


async def get_feeder_equipment(client: NetworkConsumerClient, feeder_mrid: str) -> None:
"""Get all objects under the feeder, including LV Feeders"""
(await client.get_equipment_container(
feeder_mrid,
include_energized_containers=IncludedEnergizedContainers.LV_FEEDERS
)).throw_on_error()


async def trace_from_energy_consumers(feeder_mrid: str, client=None):
"""
Least efficient/the slowest
Inefficient upstream tracing example.
Trace upstream from every EnergyConsumer.
"""
client = client or _get_client()
await get_feeder_equipment(client, feeder_mrid)

def _get_equipment_tree_trace(up_data: dict) -> NetworkTrace:
def step_action(step: NetworkTraceStep, _: StepContext):
to_equip: ConductingEquipment = step.path.to_equipment

if isinstance(to_equip, Breaker):
if not up_data.get('breaker'):
up_data['breaker'] = to_equip
elif isinstance(to_equip, Fuse):
if not up_data.get('upstream_switch'):
up_data['upstream_switch'] = to_equip
elif isinstance(to_equip, PowerTransformer):
if not up_data.get('distribution_power_transformer'):
up_data['distribution_power_transformer'] = to_equip
elif not up_data.get('regulator') and to_equip.function == TransformerFunctionKind.voltageRegulator:
up_data['regulator'] = to_equip

return (
Tracing.network_trace()
.add_condition(upstream())
.add_step_action(step_action)
)

feeder = client.service.get(feeder_mrid, Feeder)

energy_consumers = []
for lvf in feeder.normal_energized_lv_feeders:
for ce in lvf.equipment:
if isinstance(ce, EnergyConsumer):
up_data = {'feeder': feeder_mrid, 'energy_consumer_mrid': ce.mrid}

# Trace upstream from EnergyConsumer.
await _get_equipment_tree_trace(up_data).run(ce)
energy_consumers.append(_build_row(up_data))

write_csv(energy_consumers, feeder.mrid)


async def trace_from_feeder_downstream(feeder_mrid: str, client=None):
"""
More memory use than `trace_from_feeder_context`, more efficient/faster than `trace_from_energy_consumers`
Build an equipment tree of everything downstream of the feeder.
Use the Equipment tree to recurse through parent equipment of all EC's and get the equipment we are interested in.
"""

def process_leaf(up_data: dict, leaf: TreeNode):
to_equip: IdentifiedObject = leaf.identified_object

if isinstance(to_equip, Breaker):
if not up_data.get('breaker'):
up_data['breaker'] = to_equip
Expand All @@ -60,46 +120,89 @@ def step_action(step: NetworkTraceStep, _: StepContext):
elif not up_data.get('regulator') and to_equip.function == TransformerFunctionKind.voltageRegulator:
up_data['regulator'] = to_equip

return (
client = client or _get_client()
await get_feeder_equipment(client, feeder_mrid)

builder = EquipmentTreeBuilder()

feeder = client.service.get(feeder_mrid, Feeder)
await (
Tracing.network_trace()
.add_condition(upstream())
.add_step_action(step_action)
)
.add_condition(downstream())
.add_step_action(builder)
).run(getattr(feeder, 'normal_head_terminal'))

energy_consumers = []

for leaf in (l for l in builder.leaves if isinstance((ec := l.identified_object), EnergyConsumer)):
ec_data = {'feeder': feeder.mrid, 'energy_consumer_mrid': ec.mrid}

async def get_feeders():
client = _get_client()
def _process(_leaf):
process_leaf(ec_data, _leaf)
if _leaf.parent:
_process(_leaf.parent)

_feeders = (await client.get_network_hierarchy()).result.feeders
return _feeders
_process(leaf)

row = _build_row(ec_data)
energy_consumers.append(row)

async def trace_from_energy_consumers(feeder):
write_csv(energy_consumers, feeder.mrid)


async def trace_from_feeder_context(feeder_mrid: str, client=None):
"""
Fetch the equipment container from the given feeder, then trace upstream from every EnergyConsumer
and create a CSV with the relevant information.
Most efficient/fastest.
trace downstream from the feeder recording relevant information using `NetworkTrace` `StepContext`.
"""
client = _get_client()
print(f'processing feeder {feeder}')
client = client or _get_client()
# Get all objects under the feeder, including Substations and LV Feeders
(await client.get_equipment_container(feeder, include_energized_containers=IncludedEnergizedContainers.INCLUDE_ENERGIZED_LV_FEEDERS)).throw_on_error()
network = client.service
f = network.get(feeder, Feeder)
await get_feeder_equipment(client, feeder_mrid)

energy_consumers = []
for lvf in f.normal_energized_lv_feeders:
for ce in lvf.equipment:
if isinstance(ce, EnergyConsumer):
up_data = {'feeder': feeder, 'energy_consumer_mrid': ce.mrid}

# Trace upstream from EnergyConsumer.
await _get_equipment_tree_trace(up_data).run(ce)
energy_consumers.append(_build_row(up_data))
feeder = client.service.get(feeder_mrid, Feeder)

csv_sfx = "energy_consumers.csv"
class StepActionWithContext(StepActionWithContextValue):
def _apply(self, item: NetworkTraceStep, context: StepContext):
if isinstance((ec := item.path.to_equipment), EnergyConsumer):
nonlocal energy_consumers
data = self.get_context_value(context)
data.update({'feeder': feeder.mrid, 'energy_consumer_mrid': ec.mrid})

row = _build_row(data)
energy_consumers.append(row)

def compute_next_value(self, next_item: NetworkTraceStep, current_item: NetworkTraceStep, current_value: Dict[str, IdentifiedObject]):
data = dict(current_value)
equip = next_item.path.to_equipment
if isinstance(equip, Breaker):
data['breaker'] = equip
elif isinstance(equip, Fuse):
data['upstream_switch'] = equip
elif isinstance(equip, PowerTransformer):
if equip.function == TransformerFunctionKind.distributionTransformer:
data['distribution_power_transformer'] = equip
elif equip.function == TransformerFunctionKind.voltageRegulator:
data['regulator'] = equip
return data

def compute_initial_value(self, item: NetworkTraceStep):
return {}

await (
Tracing.network_trace()
.add_condition(downstream())
.add_step_action(StepActionWithContext('key'))
).run(getattr(feeder, 'normal_head_terminal'))

write_csv(energy_consumers, feeder.mrid)


def write_csv(energy_consumers: List[EnergyConsumerDeviceHierarchy], feeder_mrid: str):
network_objects = pd.DataFrame(energy_consumers)
os.makedirs("csvs", exist_ok=True)
network_objects.to_csv(f"csvs/{f.mrid}_{csv_sfx}", index=False)
network_objects.to_csv(f"csvs/{feeder_mrid}_energy_consumers.csv", index=False)


class NullEquipment:
Expand All @@ -108,7 +211,7 @@ class NullEquipment:
name = None


def _build_row(up_data: dict[str, Union[IdentifiedObject, str]]) -> EnergyConsumerDeviceHierarchy:
def _build_row(up_data: dict[str, IdentifiedObject | str]) -> EnergyConsumerDeviceHierarchy:
return EnergyConsumerDeviceHierarchy(
energy_consumer_mrid=up_data['energy_consumer_mrid'],
upstream_switch_mrid=(up_data.get('upstream_switch') or NullEquipment).mrid,
Expand All @@ -122,22 +225,45 @@ def _build_row(up_data: dict[str, Union[IdentifiedObject, str]]) -> EnergyConsum
)


def process_target(feeder):
asyncio.run(trace_from_energy_consumers(feeder))
def process_feeders_sequentially():
async def main_async(trace_type: Callable):
"""
Fetch the equipment container from the given feeder and create a CSV with the relevant information.
Differences between the functions passable as `trace_type` are documented in the relevant docstrings.

`trace_type` must be one of the following:
- `trace_from_energy_consumers`
- `trace_from_energy_consumers_with_context`
- `trace_from_feeder_downstream`

"""
from tqdm import tqdm
client = _get_client()
feeders = ["<FEEDER_ID>"]
# feeders = list(await get_feeders(client)) # Uncomment to process all feeders
for _feeder in tqdm(feeders):
await trace_type(_feeder, client)

# Uncomment to run other trace functions
asyncio.run(main_async(trace_from_feeder_context))
# asyncio.run(main_async(trace_from_feeder_downstream))
# asyncio.run(main_async(trace_from_energy_consumers))


def process_feeders_concurrently():
def multi_proc(_feeder):
# Uncomment to run other trace functions
asyncio.run(trace_from_feeder_context(_feeder))
# asyncio.run(trace_from_feeder_downstream(_feeder))
# asyncio.run(trace_from_energy_consumers(_feeder))

if __name__ == "__main__":
# Get a list of feeders before entering main compute section of script.
feeders = asyncio.run(get_feeders())
feeders = list(asyncio.run(get_feeders()))

# Spin up a multiprocess pool of $CPU_COUNT processes to handle the workload, otherwise we saturate a single cpu core and it's slow.
cpus = os.cpu_count()
print(f'Spawning {cpus} processes')
pool = Pool(cpus)
from tqdm.contrib.concurrent import process_map
process_map(multi_proc, feeders, max_workers=int(os.cpu_count() / 2))

print(f'mapping to process pool')
pool.map(process_target, feeders)

print('finishing remaining processes')
pool.close()
pool.join()
if __name__ == "__main__":
process_feeders_sequentially()
# process_feeders_concurrently() # Uncomment and comment sequentially above to multi-process, note this is resource intensive and may cause issues.
Loading
Loading