3 changes: 3 additions & 0 deletions .gitignore
@@ -135,3 +135,6 @@ src/zepben/examples/config*.json
*.crt

src/zepben/examples/csvs/

config.json
src/zepben/examples/config.json
54 changes: 54 additions & 0 deletions src/zepben/examples/studies/README.md
@@ -0,0 +1,54 @@
# Studies

This folder contains runnable study scripts that fetch network data, generate GeoJSON overlays, and upload results to EAS.
Most scripts accept a zone code (e.g. `CPM`) and use the shared config at `src/zepben/examples/config.json`.

## Quick start

1. Ensure `src/zepben/examples/config.json` has valid `host`, `access_token`, and `rpc_port`.
2. Run a study from this folder, for example:

```bash
python transformer_utilisation_by_demand.py CPM
```
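
For reference, a minimal `config.json` sketch with placeholder values. The studies here require `host`, `access_token`, and `rpc_port`; `ca_filename`, `timeout_seconds`, `debug`, and `skip_connection_test` are optional and fall back to defaults in the scripts:

```json
{
  "host": "ewb.example.com",
  "access_token": "<your-access-token>",
  "rpc_port": 443,
  "timeout_seconds": 5,
  "debug": false,
  "skip_connection_test": false
}
```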

## Common patterns

- **Zones vs feeders**: Some scripts support explicit feeder MRIDs. For transformer utilisation, use:

```bash
python transformer_utilisation_by_demand.py --mode feeders CPM3B3
```

- **Config override**: Most scripts accept `--config` to point at a different config file; see the example after this list.
- **Styles**: Each study uses a companion `style_*.json` file to control map rendering.
- **Outputs**: Studies upload their results to EAS and log progress to the terminal.
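
For example, to run a study against a different config file (the path below is illustrative; argument order may vary per script, so check `--help`):

```bash
python transformer_utilisation_by_demand.py --config /path/to/other_config.json CPM
```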

## Data quality studies

Data quality scripts live in `data_quality_studies/`. See the dedicated README for usage:

- `src/zepben/examples/studies/data_quality_studies/README.md`

## Troubleshooting

- **Timeouts**: Large zones can take several minutes to fetch and process. Allow a longer shell timeout, or reduce concurrency where a script exposes that option.
- **404s from Load API**: Some assets may not have demand profiles; the scripts continue and mark those as missing.
- **No features uploaded**: If locations are missing, the study skips upload and logs a message.

## Scripts in this folder

Representative studies:
- `transformer_utilisation_by_demand.py`
- `pv_percent_by_transformer.py`
- `suspect_end_of_line.py`
- `transformer_downstream_density.py`
- `customer_distance_to_transformer.py`
- `loop_impedance_by_energy_consumer.py`
- `tap_changer_info_by_transformer.py`

See each script’s header and help output for specifics:

```bash
python <script>.py --help
```
284 changes: 284 additions & 0 deletions src/zepben/examples/studies/customer_distance_to_transformer.py
@@ -0,0 +1,284 @@
# Copyright 2025 Zeppelin Bend Pty Ltd
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.

import asyncio
import json
from datetime import datetime
from itertools import islice
from typing import List, Dict, Tuple, Callable, Any, Union, Type, Set

from geojson import FeatureCollection, Feature
from geojson.geometry import Geometry, LineString, Point
from zepben.eas.client.eas_client import EasClient
from zepben.eas.client.study import Study, Result, GeoJsonOverlay
from zepben.ewb import (
    AcLineSegment,
    EnergyConsumer,
    PowerTransformer,
    NetworkConsumerClient,
    PhaseCode,
    Feeder,
    PowerSystemResource,
    Location,
    connect_with_token,
    NetworkTraceStep,
    Tracing,
    upstream,
    stop_at_open,
    IncludedEnergizedContainers,
)
from zepben.ewb.services.network.tracing.networktrace.operators.network_state_operators import NetworkStateOperators


with open("../config.json") as f:
    c = json.load(f)


def chunk(it, size):
    """Split an iterable into tuples of at most `size` items; the final chunk may be smaller."""
    it = iter(it)
    return iter(lambda: tuple(islice(it, size)), ())


async def main():
    # Only process feeders in the following zones.
    zone_mrids = ["CPM"]
    print(f"Start time: {datetime.now()}")

    rpc_channel = connect_with_token(
        host=c["host"],
        access_token=c["access_token"],
        rpc_port=c["rpc_port"],
        ca_filename=c.get("ca_filename"),
        timeout_seconds=c.get("timeout_seconds", 5),
        debug=bool(c.get("debug", False)),
        skip_connection_test=bool(c.get("skip_connection_test", False)),
    )
    client = NetworkConsumerClient(rpc_channel)
    hierarchy = (await client.get_network_hierarchy()).throw_on_error()
    substations = hierarchy.value.substations

    print(f"Collecting feeders from zones {', '.join(zone_mrids)}.")
    feeder_mrids = []
    for zone_mrid in zone_mrids:
        if zone_mrid in substations:
            for feeder in substations[zone_mrid].feeders:
                feeder_mrids.append(feeder.mrid)

    print(f"Feeders to be processed: {', '.join(feeder_mrids)}")

    all_ecs: List[EnergyConsumer] = []
    ec_to_distance_m: Dict[str, float] = {}

    # Process feeders in batches of 3, using asyncio, for performance.
    batches = chunk(feeder_mrids, 3)
    for feeders in batches:
        futures = []
        # Use a fresh channel for each batch of concurrent fetches.
        rpc_channel = connect_with_token(
            host=c["host"],
            access_token=c["access_token"],
            rpc_port=c["rpc_port"],
            ca_filename=c.get("ca_filename"),
            timeout_seconds=c.get("timeout_seconds", 5),
            debug=bool(c.get("debug", False)),
            skip_connection_test=bool(c.get("skip_connection_test", False)),
        )
        print(f"Processing feeders {', '.join(feeders)}")
        for feeder_mrid in feeders:
            futures.append(asyncio.ensure_future(fetch_feeder_customer_distance(feeder_mrid, rpc_channel)))

        for future in futures:
            result = await future
            if result is None:
                continue
            ecs, distances = result
            all_ecs.extend(ecs)
            ec_to_distance_m.update(distances)

    print(f"Creating study for {len(all_ecs)} energy consumers")

    eas_client = EasClient(host=c["host"], port=c["rpc_port"], protocol="https", access_token=c["access_token"])
    print(f"Uploading Study for zones {', '.join(zone_mrids)} ...")
    with open("style_customer_distance.json") as style_file:
        styles = json.load(style_file)
    await upload_distance_study(
        eas_client,
        all_ecs,
        ec_to_distance_m,
        name=f"Customer distance to transformer ({', '.join(zone_mrids)})",
        description="Distance along the normal network path from each EnergyConsumer to its upstream transformer.",
        tags=["customer_distance", "-".join(zone_mrids)],
        styles=styles,
    )
    await eas_client.aclose()
    print("Uploaded Study")

    print(f"Finish time: {datetime.now()}")


async def fetch_feeder_customer_distance(
    feeder_mrid: str,
    rpc_channel,
) -> Union[Tuple[List[EnergyConsumer], Dict[str, float]], None]:
    print(f"Fetching Feeder {feeder_mrid}")
    client = NetworkConsumerClient(rpc_channel)

    result = (
        await client.get_equipment_container(
            mrid=feeder_mrid,
            expected_class=Feeder,
            include_energized_containers=IncludedEnergizedContainers.LV_FEEDERS,
        )
    )
    if result.was_failure:
        print(f"Failed: {result.thrown}")
        return None

    network = client.service
    print(f"Finished fetching Feeder {feeder_mrid}")

    # Required for directed traces (upstream/downstream).
    await Tracing.set_direction().run(network, network_state_operators=NetworkStateOperators.NORMAL)

    ecs = list(network.objects(EnergyConsumer))
    ec_to_distance_m = {}
    for ec in ecs:
        path_lines = await get_upstream_lines_to_transformer(ec)
        distance = sum(_line_length_m(line) for line in path_lines)
        ec_to_distance_m[ec.mrid] = distance

    return ecs, ec_to_distance_m


def _line_length_m(line: AcLineSegment) -> float:
    # Treat a missing length as zero so unmeasured segments do not break the sum.
    return float(line.length or 0.0)


def collect_upstream_lines_provider(lines: Set[AcLineSegment]):
    """Return a step action that accumulates every AcLineSegment encountered into `lines`."""

    async def collect_lines(ps: NetworkTraceStep, _):
        line = ps.path.traversed_ac_line_segment
        if line is not None:
            lines.add(line)
        if isinstance(ps.path.to_equipment, AcLineSegment):
            lines.add(ps.path.to_equipment)
        if isinstance(ps.path.from_equipment, AcLineSegment):
            lines.add(ps.path.from_equipment)

    return collect_lines


async def get_upstream_lines_to_transformer(ec: EnergyConsumer) -> Set[AcLineSegment]:
    lines = set()

    await (
        Tracing.network_trace()
        .add_condition(upstream())
        .add_condition(stop_at_open())
        .add_step_action(collect_upstream_lines_provider(lines))
        .add_stop_condition(_is_transformer)
    ).run(start=ec, phases=PhaseCode.ABCN, can_stop_on_start_item=False)

    return lines


def _is_transformer(ps: NetworkTraceStep, _context=None) -> bool:
    return isinstance(ps.path.to_equipment, PowerTransformer)


async def upload_distance_study(
    eas_client: EasClient,
    ecs: List[EnergyConsumer],
    ec_to_distance_m: Dict[str, float],
    name: str,
    description: str,
    tags: List[str],
    styles: List,
) -> None:
    class_to_properties = {
        EnergyConsumer: {
            "name": lambda ec: ec.name,
            "distance_m": _distance_from(ec_to_distance_m),
            "distance_label": _distance_label_from(ec_to_distance_m),
            "type": lambda x: "ec",
        },
    }
    feature_collection = to_geojson_feature_collection(ecs, class_to_properties)
    response = await eas_client.async_upload_study(
        Study(
            name=name,
            description=description,
            tags=tags,
            results=[
                Result(
                    name=name,
                    geo_json_overlay=GeoJsonOverlay(
                        data=feature_collection,
                        styles=[s["id"] for s in styles],
                    )
                )
            ],
            styles=styles,
        )
    )
    print(f"Study response: {response}")


def _distance_from(ec_to_distance_m: Dict[str, float]):
    def fun(ec: EnergyConsumer):
        return round(ec_to_distance_m.get(ec.mrid, 0.0))

    return fun


def _distance_label_from(ec_to_distance_m: Dict[str, float]):
    def fun(ec: EnergyConsumer):
        value = ec_to_distance_m.get(ec.mrid, 0.0)
        return f"{round(value)}m"

    return fun


def to_geojson_feature_collection(
    psrs: List[PowerSystemResource],
    class_to_properties: Dict[Type, Dict[str, Callable[[Any], Any]]]
) -> FeatureCollection:
    features = []
    for psr in psrs:
        properties_map = class_to_properties.get(type(psr))

        if properties_map is not None:
            feature = to_geojson_feature(psr, properties_map)
            if feature is not None:
                features.append(feature)

    return FeatureCollection(features)


def to_geojson_feature(
    psr: PowerSystemResource,
    property_map: Dict[str, Callable[[PowerSystemResource], Any]]
) -> Union[Feature, None]:
    geometry = to_geojson_geometry(psr.location)
    if geometry is None:
        return None

    properties = {k: f(psr) for (k, f) in property_map.items()}
    return Feature(psr.mrid, geometry, properties)


def to_geojson_geometry(location: Location) -> Union[Geometry, None]:
    # Multiple points become a LineString, a single point becomes a Point,
    # and a missing location yields no geometry (so the feature is skipped).
    points = list(location.points) if location is not None else []
    if len(points) > 1:
        return LineString([(point.x_position, point.y_position) for point in points])
    elif len(points) == 1:
        return Point((points[0].x_position, points[0].y_position))
    else:
        return None


if __name__ == "__main__":
    asyncio.run(main())
40 changes: 40 additions & 0 deletions src/zepben/examples/studies/data_quality_studies/README.md
@@ -0,0 +1,40 @@
# Data Quality Studies

These scripts generate data-quality analysis layers focused on connectivity and power-flow modeling issues.
They upload EAS studies using the credentials in `src/zepben/examples/config.json`.

## Usage

From the repository root:

```bash
python src/zepben/examples/studies/data_quality_studies/connectivity_gaps.py CPM
python src/zepben/examples/studies/data_quality_studies/consumer_mapping_issues.py CPM
python src/zepben/examples/studies/data_quality_studies/phase_conductor_issues.py CPM
python src/zepben/examples/studies/data_quality_studies/asset_attribute_inconsistencies.py CPM
python src/zepben/examples/studies/data_quality_studies/protection_directionality_anomalies.py CPM
python src/zepben/examples/studies/data_quality_studies/spatial_location_anomalies.py CPM
```

Use a comma-separated list for multiple zones:

```bash
python src/zepben/examples/studies/data_quality_studies/connectivity_gaps.py CPM,NSK
```

## Summary Study

`data_quality_summary.py` runs all checks for a zone and uploads a single study
containing only layers where anomalies are detected. The study description and
tags list only the tests that reported anomalies.

```bash
python src/zepben/examples/studies/data_quality_studies/data_quality_summary.py NSK
```

## Notes

- Each script accepts a zone code argument. If omitted, it defaults to `CPM`.
- Individual scripts always upload all layers; if no anomalies are found, the
layer name is prefixed with "No anomalies detected: ...".
- The summary script skips upload if no anomalies are detected at all.
@@ -0,0 +1 @@
# Data quality study examples.