Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
38df93b
Start of example of sophisticated aggregating compute ensemble
markcoletti Nov 14, 2025
7c1d62f
Start of example of sophisticated aggregating compute ensemble. Many …
markcoletti Nov 14, 2025
479abd8
Added basic slurm script for new example
markcoletti Nov 14, 2025
6a9b350
Start of CSV input; this will go significant changes as the example i…
markcoletti Nov 14, 2025
8fad236
Checkpoint commit
markcoletti Dec 4, 2025
ca0676e
get_platform_info() now returns CPU affinity if that is supported on …
markcoletti Dec 4, 2025
6cbf491
Let's be more explicit about pusutil ops that may be unsupported on s…
markcoletti Dec 4, 2025
a2e66d6
Added variables to template.conf
markcoletti Dec 4, 2025
0299888
Added randomly generated input to CSV
markcoletti Dec 4, 2025
785bef2
Now write out instance parameter values to CSV
markcoletti Dec 4, 2025
8d24bcf
Correcting platform file
markcoletti Dec 5, 2025
e0ccf96
Entirely new synthetic data function
markcoletti Dec 5, 2025
b632ba4
Import params_from_csv utility in EnsembleDriver
markcoletti Dec 5, 2025
8b2b9e8
Repaired header
markcoletti Dec 5, 2025
8de6630
Fixed column header
markcoletti Dec 5, 2025
093366e
Ensuring types are correct for test func
markcoletti Dec 5, 2025
d7e2de7
Convert numpy arrays to lists in return statement
markcoletti Dec 5, 2025
cde2913
Enabling portal support
markcoletti Dec 5, 2025
c1d2aa8
Cleaning up dead comments and code.
markcoletti Dec 5, 2025
bcac255
Added matplotlib as an *optional* dependency since now at least one e…
markcoletti Dec 5, 2025
bdcae0b
Move synthetic data generation to standalone executable
markcoletti Dec 8, 2025
e23eeee
It helps to pass in the actual parameters
markcoletti Dec 8, 2025
8e9b73b
Expanding README.md signposts to include new examples dirs
markcoletti Dec 8, 2025
b3db297
Helps to specify the python interpreter if there is no executable set…
markcoletti Dec 8, 2025
92e8ca9
launching tasks expect single command string instead of suprocess-sty…
markcoletti Dec 8, 2025
8532988
Reducing resource allocation
markcoletti Dec 8, 2025
15fbdaf
Fixed signpost descriptions in README.md
markcoletti Dec 8, 2025
7fc1228
Add instance parameter to data generation and update output files
markcoletti Dec 8, 2025
5a14afa
Add error handling to data generation script
markcoletti Dec 9, 2025
39f7d0c
Repaired indentation
markcoletti Dec 9, 2025
c28e664
Fixed a number of errors.
markcoletti Dec 9, 2025
311ac85
Specifying path to data generator script.
markcoletti Dec 9, 2025
dbf1097
Name the output figure consistently with other output file names
markcoletti Dec 9, 2025
d72d230
Just a minor comment tweak
markcoletti Dec 10, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples-proposed/022-tasks-and-ensembles/ensemble.conf
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ SIMULATION_MODE = NORMAL
CLASS = DRIVER
SUB_CLASS =
NAME = EnsembleDriver
NPROC = 1
NPROC = 20
BIN_PATH =
INPUT_FILES =
OUTPUT_FILES =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@
class InstanceComponent(Component):

def step(self, timestamp: float = 0.0, **keywords):
if 'HWLOC_XMLFILE' in os.environ:
self.services.warning(f'HWLOC_XMLfile still set!')
else:
self.services.info('HWLOC_XMLFILE is not set')

# ENSEMBLE_INSTANCE is a special IPS variable that contains the
# string uniquely identifying this instance. Each instance will have
# the `run_ensemble()` `name` argument prepended to a unique number
Expand All @@ -37,7 +42,7 @@ def step(self, timestamp: float = 0.0, **keywords):
'-o', 'stats.csv']
cmd = str(mpi_executable) + ' ' + ' '.join(args)
try:
run_id = self.services.launch_task(nproc=1,
run_id = self.services.launch_task(nproc=5,
working_dir=working_dir,
binary=cmd)
except Exception as e:
Expand Down
2 changes: 1 addition & 1 deletion examples-proposed/022-tasks-and-ensembles/template.conf
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ SIMULATION_MODE = NORMAL
CLASS = WORKER
SUB_CLASS =
NAME = InstanceComponent
NPROC = 1
NPROC = 2
BIN_PATH =
INPUT_FILES =
OUTPUT_FILES =
Expand Down
33 changes: 33 additions & 0 deletions examples-proposed/024-aggregated-compute-ensemble/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# An example ensemble simulation for aggregated computing

This example demonstrates how to set up an ensemble simulation in IPS that
performs aggregated computing across multiple ensemble instances. Each ensemble
instance runs a component for ${COMPUTE} that reports values local to the
instance, but that is then aggregated at the top-level after the instances
have finished.

Note that there will be Dask related errors and warnings at the end that can be
ignored. These are due to Dask not having a clean shutdown.

## Contents

* `driver.py` -- top-level driver
* `instance_component.py` -- component worker code
* `instance_driver.py` -- component driver code

* `ensemble.conf` -- top-level configuration file
* `platform.conf` -- platform configuration file
* `template.conf` -- ensemble instance configuration file


## Instructions

To run the code, run:

```bash
PORTAL_API_KEY=changeme ips.py --platform platform.conf --simulation ensemble.conf
```

Depending on the web portal instance you want to connect to, you will need to
change `PORTAL_API_KEY` in the run command and `PORTAL_URL` in the
`ensemble.conf` file.
Empty file.
58 changes: 58 additions & 0 deletions examples-proposed/024-aggregated-compute-ensemble/driver.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
#!/usr/bin/env python3
"""
Simple ensemble driver that just dispatches an IPS ensemble for an example
compute application.
"""

from pathlib import Path

from ipsframework import Component
from ipsframework.ipsutil import params_from_csv


class EnsembleDriver(Component):
"""Kicks off a simple ensemble"""

def init(self, timestamp=0.0):
pass
# TODO temporarily commenting this out until the actual
# example is ready to consider adding a notebook to the portal.

# NOTEBOOK_TEMPLATE = 'notebook.ipynb'
# self.services.stage_input_files([NOTEBOOK_TEMPLATE])
# try:
# self.services.initialize_jupyter_notebook(NOTEBOOK_TEMPLATE)
# except Exception:
# print('did not add notebook to portal')

def step(self, timestamp=0.0):
# This CSV file contains the parameters used for the
# different instances.
variables = params_from_csv(self.config['PARAMETER_FILE'])

# This is the IPS configuration file for the instances that looks
# like a regular configuration file except there are slots for the
# variables (e.g., 'alpha', 'T_final', etc.). 'TEMPLATE' is
# specified in the config file section for this driver.
template = Path(self.config['TEMPLATE'])
self.services.info(f'Using template config file {template}')

if not template.exists():
raise RuntimeError(
f'{template} config template file does not exist')

# Now spin up and run the instances. This function will return a list
# with each list element corresponding to an instance. You can use
# this information to find the specific instance run directory for a
# given set of variables.
#
# The "name" parameter must be unique for each ensemble within a run,
# and will be used as an identifier on the Portal.
mapping = self.services.run_ensemble(template, variables,
run_dir=Path('.').absolute(),
name='INSTANCE_',
num_nodes=1, cores_per_instance=1)

# Print each mapping of instance name to what variable values were used.
for instance in mapping:
self.services.info(f'{instance!s}')
33 changes: 33 additions & 0 deletions examples-proposed/024-aggregated-compute-ensemble/ensemble.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
SIM_NAME = simpleensemble
SIM_ROOT = $PWD/ENSEMBLES
LOG_FILE = log
LOG_LEVEL = INFO
SIMULATION_MODE = NORMAL

INPUT_DIR = $PWD/input_dir/

USE_PORTAL = True
PORTAL_URL = https://lb.ipsportal.development.svc.spin.nersc.org
# do not commit actual PORTAL_API_KEY value to version control, best to set as an environment variable
#PORTAL_API_KEY=changeme

[PORTS]
NAMES = DRIVER
[[DRIVER]]
IMPLEMENTATION = ensemble_driver

[ensemble_driver]
CLASS = DRIVER
SUB_CLASS =
NAME = EnsembleDriver
NPROC = 1
BIN_PATH =
INPUT_FILES =
OUTPUT_FILES =
SCRIPT = $PWD/driver.py
MODULE =
# Specifies the template configuration file used for instances
TEMPLATE = $PWD/template.conf
# Specifies the parameter values for each instance
PARAMETER_FILE = $PWD/values.csv

118 changes: 118 additions & 0 deletions examples-proposed/024-aggregated-compute-ensemble/gen_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
#!/usr/bin/env python3
"""
Used to generate synthetic data as an example.
"""
import argparse
from typing import Any
import json
import csv
from time import time
from traceback import print_exc
import numpy as np
import matplotlib.pyplot as plt

from ipsframework.resourceHelper import get_platform_info

def main(instance: str,
alpha: float, L:float, T_final:float, Nx:int, Nt:int) -> dict[str, Any]:
""" Generate synthetic data to emulate an actual simulation or complex
calculation.

As a side-effect it will save a plot to the current working directory with
the name `solution.png`.

:param instance: instance name
:param alpha: thermal diffusivity
:param L: domain length
:param T_final: final time
:param Nx: number of spatial grid points
:param Nt: number of time steps
:returns: x, y, where x is the steps and u the corresponding values
"""
start = time()

# Discretization
dx = L / (Nx - 1)
dt = T_final / Nt
r = alpha * dt / (dx ** 2)

# # Check stability condition for explicit method
if r > 0.5:
print("Warning: Stability condition r <= 0.5 is not met. "
"Results may be inaccurate.")

# Initial condition (e.g., a sine wave)
x = np.linspace(0, L, Nx)
u = np.sin(np.pi * x)

# Boundary conditions (Dirichlet, e.g., u(0,t) = 0, u(L,t) = 0) These are
# already handled by the initial setup of u=0 at boundaries if the
# initial condition is 0 there. If non-zero, they would be set within the
# time loop.

# Time evolution
for n in range(Nt):
u_new = np.copy(u) # Create a copy for updating
for i in range(1, Nx - 1):
u_new[i] = u[i] + r * (u[i + 1] - 2 * u[i] + u[i - 1])
u = u_new

# Plotting the result
plt.plot(x, u)
plt.xlabel("Position (x)")
plt.ylabel("Temperature (u)")
plt.title("Solution of 1D Heat Equation")
plt.grid(True)
plt.savefig(f"{instance}_solution.png")

# Save some per-component stats
stats_fname = f'{instance}_stats.csv'
run_env = get_platform_info()

with open(stats_fname, 'w') as f:
# Write run-time stats to a CSV as well as the runtime parameters
# specific to this instance.
writer = csv.writer(f)
writer.writerow(
['instance', 'hostname', 'pid', 'core',
'affinity',
'alpha', 'L', 'T_final', 'Nx', 'Nt',
'start', 'end'])

writer.writerow([instance, run_env['hostname'],
run_env['pid'], run_env['core_id'],
run_env['affinity'],
alpha, L, T_final, Nx, Nt,
start, time()])

return {'x': x.tolist(), 'u': u.tolist()}



if __name__ == '__main__':
try:
parser = argparse.ArgumentParser(description='Generate synthetic data to '
'emulate an actual simulation '
'or complex')
parser.add_argument('--instance', type=str,
help='instance name')
parser.add_argument('--alpha', type=float, default=1.0,)
parser.add_argument('--L', type=float, default=1.0,)
parser.add_argument('--T_final', type=float, default=1.0,)
parser.add_argument('--Nx', type=int, default=100,)
parser.add_argument('--Nt', type=int, default=100,)

args = parser.parse_args()

data = main(args.instance,
args.alpha, args.L, args.T_final, args.Nx, args.Nt)

file_name = f'{args.instance}_solution.json'
print(f'Writing to {file_name}')
with open(file_name, 'w') as f:
json.dump(data, f)

except Exception as e:
print(f'Encountered error: {e}')
print(f'Encountered error: {e}', file='gen_data_error.txt')
print_exc()
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
#!/usr/bin/env python3
"""
Component to be stepped in instance.

This should generate a PNG image, a JSON file, and a CSV file. The first
two are from synthetic data generated from `gen_data.py`. The latter is
also generated from provenance data captured in `gen_data.py`, too.
"""
from pathlib import Path
from time import time
from typing import Any

from ipsframework import Component


def create_cmd(instance: str, path: Path, alpha: float, L:float, T_final:float,
Nx:int, Nt:int) -> list[Any]:
""" create the command to run the external data generator

:param instance: instance name
:param path: path to data generator script directory
:param alpha: thermal diffusivity
:param L: domain length
:param T_final: final time
:param Nx: number of spatial grid points
:param Nt: number of time steps
:returns: list of command line arguments to be executed in step()
"""
executable = f'{path!s}/gen_data.py'
cmd = ['python3', executable, '--instance', instance,
'--alpha', alpha, '--L', L, '--T_final', T_final,
'--Nx', Nx, '--Nt', Nt]
return cmd


class InstanceComponent(Component):
def step(self, timestamp: float = 0.0, **keywords):
start = time()

# ENSEMBLE_INSTANCE is a special IPS variable that contains the
# string uniquely identifying this instance. Each instance will have
# the `run_ensemble()` `name` argument prepended to a unique number
# for each instance. E.g., ENSEMBLE_INSTANCE might be "MY_INSTANCE_23".
instance_id = self.services.get_config_param('ENSEMBLE_INSTANCE')
self.services.info(f'{instance_id}: Start of step of instance component.')

# Echo the parameters we're expecting, A, B, and C
self.services.info(f'{instance_id}: instance component parameters: '
f'alpha={self.alpha}, L={self.L}, '
f'T_final={self.T_final}, Nx={self.Nx}, '
f'Nt={self.Nt}')

cmd = create_cmd(instance_id, Path(self.BIN_PATH),
self.alpha, self.L, self.T_final, self.Nx, self.Nt)

working_dir = str(Path('.').absolute())
self.services.info(f'{instance_id}: Launching executable '
f'in {working_dir}')
run_id = None
try:
cmd = ' '.join(cmd) # need one big ole string for executing tasks
run_id = self.services.launch_task(nproc=1,
working_dir=working_dir,
binary=cmd)
except Exception as e:
self.services.critical(f'{instance_id}: Unable to launch '
f'executable in {working_dir}')

return_value = self.services.wait_task(run_id) # block until done

self.services.info(f'{instance_id}: Completed MPI executable with '
f'return value: {return_value}.')

# TODO temporarily commenting this out until the actual
# example is ready to consider adding data files to the portal. This
# originally came from code Lance wrote in a previous example.
# try:
# self.services.add_analysis_data_files([data_fname, stats_fname], timestamp)
# except Exception:
# print('did not add data files to portal, check logs')

self.services.info(f'{instance_id}: End of step of instance component.')
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#!/usr/bin/env python3
"""
Driver component for instances
"""

from ipsframework import Component


class InstanceDriver(Component):
"""
Instance driver component that steps the main component
"""

def step(self, timestamp: float = 0.0, **keywords):
instance_component = self.services.get_port('WORKER')

self.services.call(instance_component, 'step', 0.0)
Loading
Loading