diff --git a/docs/changelog.md b/docs/changelog.md
index 1a74a30..93b67eb 100644
--- a/docs/changelog.md
+++ b/docs/changelog.md
@@ -1,5 +1,12 @@
 # Release Notes
 
+## Version 1.3.0 (not released yet)
+
+- Introduced the `pySWATPlus.Calibration` class for parameter calibration using multi-objective optimization, evolutionary algorithms, and parallel computing.
+
+- Added the `pySWATPlus.SensitivityAnalyzer.simulation_and_indices` method to compute sensitivity indices directly against observed data without saving detailed simulation results.
+
+
 ## Version 1.2.0 (October 13, 2025)
 
 - All SWAT+ simulations with modified parameters are now configured through the `calibration.cal` file, eliminating the need to read and modify individual input files.
@@ -31,7 +38,6 @@
 
 - Renamed `set_begin_and_end_date` to `set_simulation_period` for better consistency.
 
-
 ## Version 1.1.0 (August 26, 2025)
 
 - Added a new class `pySWATPlus.SensitivityAnalyzer` to support sensitivity simulations with two main methods:
diff --git a/docs/userguide/model_calibration.md b/docs/userguide/model_calibration.md
index 049029c..d09607c 100644
--- a/docs/userguide/model_calibration.md
+++ b/docs/userguide/model_calibration.md
@@ -33,7 +33,8 @@ the calibration interface offers flexible options for optimizing model parameter
 
 The interface provides a [`Calibration`](https://swat-model.github.io/pySWATPlus/api/calibration/) class that must be initialized with the required parameters.
-This class includes the `parameter_optimization` method, which performs parameter optimization using multi-objective algorithms, evolutionary strategies, and parallel computation.
+This class includes the [`parameter_optimization`](https://swat-model.github.io/pySWATPlus/api/calibration/#pySWATPlus.Calibration.parameter_optimization) method,
+which performs parameter optimization using multi-objective algorithms, evolutionary strategies, and parallel computation.
 
 The following code provides an example of optimizing flow discharge for both daily and monthly time-series data using multi-objective evolutionary
 computation. The use of both daily and monthly flow discharge is purely illustrative, showing how multi-objective optimization can be performed. Users should replace the monthly flow
 discharge with nitrogen or phosphorus concentration according to their needs.
 ```python
-# Calibration parameter space
+# Define parameter space
 parameters = [
     {
         'name': 'esco',
         'change_type': 'absval',
         'lower_bound': 0,
         'upper_bound': 1
     },
     {
         'name': 'perco',
         'change_type': 'absval',
         'lower_bound': 0,
         'upper_bound': 1
     }
 ]
@@ -84,7 +85,7 @@ observe_data = {
 }
 
 # Objective configuration
-objectives = {
+objective_config = {
     'channel_sd_day.txt': {
         'sim_col': 'flo_out',
         'obs_col': 'discharge',
@@ -105,7 +106,7 @@ if __name__ == '__main__':
         txtinout_dir=r"C:\Users\Username\project\Scenarios\Default\TxtInOut",
         extract_data=extract_data,
         observe_data=observe_data,
-        objectives=objectives,
+        objective_config=objective_config,
         algorithm='NSGA2',
         n_gen=2,
         pop_size=5
diff --git a/docs/userguide/sensitivity_interface.md b/docs/userguide/sensitivity_interface.md
index eccb5b5..5bd3d7a 100644
--- a/docs/userguide/sensitivity_interface.md
+++ b/docs/userguide/sensitivity_interface.md
@@ -130,4 +130,81 @@ output = pySWATPlus.SensitivityAnalyzer().parameter_sensitivity_indices(
     indicators=['KGE', 'RMSE'],
     json_file=r"C:\Users\Username\sensitivity_indices.json"
 )
+```
+
+
+## Integrated Simulation and Sensitivity
+
+To compute sensitivity indices for multiple outputs directly against observed data, without saving the detailed simulation results, use the following interface:
+
+
+```python
+# Define parameter space
+parameters = [
+    {
+        'name': 'esco',
+        'change_type': 'absval',
+        'lower_bound': 0,
+        'upper_bound': 1
+    },
+    {
+        'name': 'perco',
+        'change_type': 'absval',
+        'lower_bound': 0,
+        'upper_bound': 1
+    }
+]
+
+
+# Extract data configuration
+extract_data = {
+    'channel_sd_day.txt': {
+        'has_units': True,
+        'apply_filter': {'name': ['cha561']}
+    },
+    'channel_sd_mon.txt': {
+        'has_units': True,
+        'ref_day': 1,
+        'apply_filter': {'name': ['cha561']}
+    }
+}
+
+# Observe data configuration
+observe_data = {
+    'channel_sd_day.txt': {
+        'obs_file': r"C:\Users\Username\observed_folder\discharge_daily.csv",
+        'date_format': '%Y-%m-%d'
+    },
+    'channel_sd_mon.txt': {
+        'obs_file': r"C:\Users\Username\observed_folder\discharge_monthly.csv",
+        'date_format': '%Y-%m-%d'
+    }
+}
+
+# Metric configuration
+metric_config = {
+    'channel_sd_day.txt': {
+        'sim_col': 'flo_out',
+        'obs_col': 'discharge',
+        'indicator': 'NSE'
+    },
+    'channel_sd_mon.txt': {
+        'sim_col': 'flo_out',
+        'obs_col': 'mean',
+        'indicator': 'RMSE'
+    }
+}
+
+# Sensitivity indices
+if __name__ == '__main__':
+    output = pySWATPlus.SensitivityAnalyzer().simulation_and_indices(
+        parameters=parameters,
+        sample_number=1,
+        sensim_dir=r"C:\Users\Username\sensitivity_sim",
+        txtinout_dir=r"C:\Users\Username\project\Scenarios\Default\TxtInOut",
+        extract_data=extract_data,
+        observe_data=observe_data,
+        metric_config=metric_config
+    )
+    print(output)
 ```
\ No newline at end of file
diff --git a/pySWATPlus/calibration.py b/pySWATPlus/calibration.py
index f9f3ab9..95e1215 100644
--- a/pySWATPlus/calibration.py
+++ b/pySWATPlus/calibration.py
@@ -86,7 +86,7 @@ class Calibration(pymoo.core.problem.Problem):  # type: ignore[misc]
 
             !!! note
                 The sub-key `usecols` should **not** be included here. Although no error will be raised, it will be ignored during class initialization
-                because the `sim_col` sub-key from the `objectives` input is automatically used as `usecols`. Including it manually has no effect.
+                because the `sim_col` sub-key from the `objective_config` input is automatically used as `usecols`. Including it manually has no effect.
 ```python
 extract_data = {
     'channel_sd_mon.txt': {
         'has_units': True,
         'begin_date': '01-Jun-2014',
         'end_date': '01-Oct-2016',
         'apply_filter': {'gis_id': [561], 'yr': [2015, 2016]},
         'usecols': ['gis_id', 'flo_out']
     },
@@ -124,7 +124,7 @@ class Calibration(pymoo.core.problem.Problem):  # type: ignore[misc]
             }
             ```
 
-        objectives (dict[str, dict[str, str]]): A nested dictionary specifying objectives configuration. The top-level keys
+        objective_config (dict[str, dict[str, str]]): A nested dictionary specifying the objective configuration. The top-level keys
             are same as keys of `extract_data` (e.g., `channel_sd_day.txt`). Each key must map to a non-empty dictionary containing the following sub-keys:
 
             - `sim_col` (str): **Required.** Name of the column containing simulated values.
@@ -142,7 +142,7 @@ class Calibration(pymoo.core.problem.Problem):  # type: ignore[misc]
                 Avoid using `MARE` if `obs_col` contains zero values, as it will cause a division-by-zero error.
 
             ```python
-            objectives = {
+            objective_config = {
                 'channel_sd_day.txt': {
                     'sim_col': 'flo_out',
                     'obs_col': 'discharge',
@@ -186,7 +186,7 @@ def __init__(
         txtinout_dir: str | pathlib.Path,
         extract_data: dict[str, dict[str, typing.Any]],
         observe_data: dict[str, dict[str, str]],
-        objectives: dict[str, dict[str, str]],
+        objective_config: dict[str, dict[str, str]],
         algorithm: str,
         n_gen: int,
         pop_size: int,
@@ -205,11 +205,17 @@ def __init__(
         calsim_dir = pathlib.Path(calsim_dir).resolve()
         txtinout_dir = pathlib.Path(txtinout_dir).resolve()
 
-        # Check same top-level keys in dictionaries
-        if not (extract_data.keys() == observe_data.keys() == objectives.keys()):
-            raise KeyError(
-                'Mismatch of key names. Ensure extract_data, observe_data, and objectives have identical top-level keys.'
-            )
+        # Validate same top-level keys in dictionaries
+        validators._dict_key_equal(
+            extract_data=extract_data,
+            observe_data=observe_data,
+            objective_config=objective_config
+        )
+
+        # Dictionary of DataFrame key names
+        df_key = {
+            obj: obj.split('.')[0] + '_df' for obj in objective_config
+        }
 
         # Validate initialization of TxtinoutReader class
         tmp_reader = TxtinoutReader(
@@ -232,29 +238,31 @@ def __init__(
         )
 
         # Validate objectives configuration
-        self._validate_objectives_config(
-            objectives=objectives
+        validators._metric_config(
+            input_dict=objective_config,
+            var_name='objective_config'
         )
+        for obj in objective_config:
+            if objective_config[obj]['indicator'] == 'PBIAS':
+                raise ValueError(
+                    'Indicator "PBIAS" is invalid in objective_config; it lacks a defined optimization direction'
+                )
 
         # Validate observe_data configuration
-        self._validate_observe_data_config(
+        validators._observe_data_config(
             observe_data=observe_data
         )
 
         # Dictionary of observed DataFrames
-        observe_dict = {}
-        for key in observe_data:
-            key_df = utils._df_observe(
-                obs_file=pathlib.Path(observe_data[key]['obs_file']).resolve(),
-                date_format=observe_data[key]['date_format'],
-                obs_col=objectives[key]['obs_col']
-            )
-            key_df.columns = ['date', 'obs']
-            observe_dict[key.split('.')[0] + '_df'] = key_df
+        observe_dict = utils._observe_data_dict(
+            observe_data=observe_data,
+            metric_config=objective_config,
+            df_key=df_key
+        )
 
         # Validate extract_data configuration
         for key in extract_data:
-            extract_data[key]['usecols'] = [objectives[key]['sim_col']]
+            extract_data[key]['usecols'] = [objective_config[key]['sim_col']]
         validators._extract_data_config(
             extract_data=extract_data
         )
@@ -274,8 +282,9 @@ def __init__(
         # Initalize parameters
         self.params_bounds = params_bounds
         self.extract_data = extract_data
-        self.objectives = objectives
+        self.objective_config = objective_config
         self.var_names = var_names
+        self.df_key = df_key
         self.observe_dict = observe_dict
         self.calsim_dir = calsim_dir
         self.txtinout_dir = txtinout_dir
@@ -289,7 +298,7 @@ def __init__(
         # Access properties and methods from Problem class
         super().__init__(
             n_var=len(params_bounds),
-            n_obj=len(objectives),
+            n_obj=len(objective_config),
             xl=numpy.array(var_lb),
             xu=numpy.array(var_ub)
         )
@@ -350,16 +359,12 @@ def _evaluate(
             # Simulation output for the population
             pop_sim = cpu_dict[tuple(pop)]
             # Iterate objectives
-            for obj in self.objectives:
-                # Objective indicator
-                obj_ind = self.objectives[obj]['indicator']
-                # Objective key name to extract simulated and obseved DataFrames
-                obj_key = obj.split('.')[0] + '_df'
+            for obj in self.objective_config:
                 # Simulated DataFrame
-                sim_df = pop_sim[obj_key]
+                sim_df = pop_sim[self.df_key[obj]]
                 sim_df.columns = ['date', 'sim']
                 # Observed DataFrame
-                obs_df = self.observe_dict[obj_key]
+                obs_df = self.observe_dict[self.df_key[obj]]
                 # Merge simulated and observed DataFrames by 'date' column
                 merge_df = sim_df.merge(
                     right=obs_df,
@@ -372,6 +377,7 @@ def _evaluate(
                     norm_col='obs'
                 )
                 # Indicator method from abbreviation
+                obj_ind = self.objective_config[obj]['indicator']
                 indicator_method = getattr(
                     PerformanceMetrics(),
                     f'compute_{obj_ind.lower()}'
@@ -414,94 +420,6 @@ def _objectives_directions(
 
         return objs_dirs
 
-    def _validate_observe_data_config(
-        self,
-        observe_data: dict[str, dict[str, str]],
-    ) -> None:
-        '''
-        Validate `observe_data` configuration.
-        '''
-
-        # List of valid sub-keys of sub-dictionaries
-        valid_subkeys = [
-            'obs_file',
-            'date_format'
-        ]
-
-        # Iterate dictionary
-        for file_key, file_dict in observe_data.items():
-            # Check type of a sub-dictionary
-            if not isinstance(file_dict, dict):
-                raise TypeError(
-                    f'Expected "{file_key}" in observe_data must be a dictionary, '
-                    f'but got type "{type(file_dict).__name__}"'
-                )
-            # Check sub-dictionary length
-            if len(file_dict) != 2:
-                raise ValueError(
-                    f'Length of "{file_key}" sub-dictionary in observe_data must be 2, '
-                    f'but got {len(file_dict)}'
-                )
-            # Iterate sub-key
-            for sub_key in file_dict:
-                # Check valid sub-key
-                if sub_key not in valid_subkeys:
-                    raise KeyError(
-                        f'Invalid sub-key "{sub_key}" for "{file_key}" in observe_data; '
-                        f'expected sub-keys are {json.dumps(valid_subkeys)}'
-                    )
-
-        return None
-
-    def _validate_objectives_config(
-        self,
-        objectives: dict[str, dict[str, str]],
-    ) -> None:
-        '''
-        Validate `objectives` configuration.
-        '''
-
-        # List of valid sub-keys of sub-dictionaries
-        valid_subkeys = [
-            'sim_col',
-            'obs_col',
-            'indicator'
-        ]
-
-        valid_indicators = [
-            key for key in PerformanceMetrics().indicator_names if key != 'PBIAS'
-        ]
-
-        # Iterate dictionary
-        for file_key, file_dict in objectives.items():
-            # Check type of a sub-dictionary
-            if not isinstance(file_dict, dict):
-                raise TypeError(
-                    f'Expected "{file_key}" in "objectives" must be a dictionary, '
-                    f'but got type "{type(file_dict).__name__}"'
-                )
-            # Check sub-dictionary length
-            if len(file_dict) != 3:
-                raise ValueError(
-                    f'Length of "{file_key}" sub-dictionary in "objectives" must be 3, '
-                    f'but got {len(file_dict)}'
-                )
-            # Iterate sub-key
-            for sub_key in file_dict:
-                # Check valid sub-key
-                if sub_key not in valid_subkeys:
-                    raise KeyError(
-                        f'Invalid sub-key "{sub_key}" for "{file_key}" in "objectives"; '
-                        f'expected sub-keys are {json.dumps(valid_subkeys)}'
-                    )
-                if sub_key == 'indicator' and file_dict[sub_key] not in valid_indicators:
-                    raise ValueError(
-                        f'Invalid "indicator" value "{file_dict[sub_key]}" for "{file_key}" in "objectives"; '
-                        f'expected indicators are {valid_indicators}'
-                    )
-
-        return None
-
     def _algorithm_class(
         self,
         algorithm: str
@@ -510,6 +428,9 @@
         Retrieve the optimization algorithm class from the `pymoo` package.
         '''
 
+        single_obj = ['GA', 'DE']
+        multi_obj = ['NSGA2']
+
         # Dictionary mapping between algorithm name and module
         api_module = {
             'GA': importlib.import_module('pymoo.algorithms.soo.nonconvex.ga'),
@@ -523,6 +444,12 @@
             f'Invalid algorithm "{algorithm}"; valid names are {list(api_module.keys())}'
         )
 
+        # Check that a single-objective algorithm is not used for multiple objectives
+        if len(self.objective_config) >= 2 and algorithm in single_obj:
+            raise ValueError(
+                f'Algorithm "{algorithm}" cannot handle multiple objectives; use one of {multi_obj}'
+            )
+
         # Algorithm class
         alg_class = typing.cast(
             type,
@@ -540,15 +467,15 @@ def parameter_optimization(
         This method executes the optimization process and returns a dictionary containing the optimized parameters,
         corresponding objective values, and total execution time.
 
-        Two JSON files are saved in the input directory `calsim_dir`.
+        The following JSON files are saved in `calsim_dir`:
 
-        The file `optimization_history.json` stores the optimization history. Each key in this
-        file is an integer starting from 1, representing the generation number. The corresponding value
-        is a sub-dictionary with two keys: `pop` for the population data (decision variables) and `obj`
-        for the objective function values. This file is useful for analyzing optimization progress,
-        convergence trends, performance indicators, and visualization.
+        - `optimization_history.json`: A dictionary containing the optimization history. Each key in this
+          file is an integer starting from 1, representing the generation number. The corresponding value
+          is a sub-dictionary with two keys: `pop` for the population data (decision variables) and `obj`
+          for the objective function values. This file is useful for analyzing optimization progress,
+          convergence trends, performance indicators, and visualization.
 
-        The file `optimization_result.json` contains the final output dictionary described below.
+        - `optimization_result.json`: The final output dictionary described below.
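+
+        A minimal sketch of tracing convergence from the saved history is shown below
+        (a single objective that is minimized is assumed in this sketch):
+
+        ```python
+        import json
+
+        with open('optimization_history.json') as history_file:
+            history = json.load(history_file)
+
+        # Best (smallest) first-objective value in each generation
+        for gen, gen_data in history.items():
+            print(gen, min(member[0] for member in gen_data['obj']))
+        ```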
 
         Returns:
             Dictionary with the following keys:
 
@@ -599,7 +526,7 @@
         # Sign of objective directions
         objs_dirs = self._objectives_directions()
         dir_list = [
-            objs_dirs[v['indicator']] for k, v in self.objectives.items()
+            objs_dirs[v['indicator']] for k, v in self.objective_config.items()
         ]
         dir_sign = numpy.where(numpy.array(dir_list) == 'max', -1, 1)
 
@@ -610,18 +537,16 @@
                 'pop': gen.pop.get('X').tolist(),
                 'obj': (gen.pop.get('F') * dir_sign).tolist()
             }
-        json_file = self.calsim_dir / 'optimization_history.json'
-        with open(json_file, 'w') as output_write:
+        with open(self.calsim_dir / 'optimization_history.json', 'w') as output_write:
             json.dump(opt_hist, output_write, indent=4)
 
         # Optimized output of parameters, objectives, and execution times
-        required_time = round(result.exec_time)
         opt_output = {
             'algorithm': self.algorithm,
             'generation': self.n_gen,
             'population': self.pop_size,
             'total_simulation': self.pop_size * self.n_gen,
-            'time_sec': required_time,
+            'time_sec': round(result.exec_time),
             'variables': result.X,
             'objectives': result.F * dir_sign
         }
@@ -631,8 +556,7 @@
         save_output = {
             k: v.tolist() if k.startswith(('var', 'obj')) else v for k, v in save_output.items()
         }
-        json_file = self.calsim_dir / 'optimization_result.json'
-        with open(json_file, 'w') as output_write:
+        with open(self.calsim_dir / 'optimization_result.json', 'w') as output_write:
             json.dump(save_output, output_write, indent=4)
 
         return opt_output
diff --git a/pySWATPlus/sensitivity_analyzer.py b/pySWATPlus/sensitivity_analyzer.py
index 1d81443..7777621 100644
--- a/pySWATPlus/sensitivity_analyzer.py
+++ b/pySWATPlus/sensitivity_analyzer.py
@@ -48,7 +48,7 @@ def _create_sobol_problem(
 
         return problem
 
-    def _save_output_in_json(
+    def _write_simulation_in_json(
         self,
         sensim_dir: pathlib.Path,
         sensim_output: dict[str, typing.Any]
@@ -202,12 +202,12 @@ def simulation_by_sample_parameters(
             will be deleted dynamically after collecting the required data.
 
         Returns:
-            A dictionary with the follwoing keys:
+            Dictionary with the following keys:
 
             - `time`: A dictionary containing time-related statistics:
 
-                - `sample_length`: Total number of samples, including duplicates.
                 - `time_sec`: Total time in seconds for the simulation.
+                - `sample_length`: Total number of samples, including duplicates.
                 - `time_per_sample_sec`: Average simulation time per sample in seconds.
 
             - `problem`: The problem definition dictionary passed to `Sobol` sampling, containing:
@@ -251,7 +251,7 @@
             Otherwise, no error will be raised by the system, but simulation outputs may not be generated.
         '''
 
-        # start time
+        # Start time
         start_time = time.time()
 
         # Check input variables type
@@ -376,13 +376,36 @@
         # Write output to the file 'sensitivity_simulation.json' in simulation folder
         if save_output:
-            self._save_output_in_json(
+            self._write_simulation_in_json(
                 sensim_dir=sensim_dir,
                 sensim_output=sensim_output
             )
 
         return sensim_output
 
+    def _write_index_in_json(
+        self,
+        index_dict: dict[str, typing.Any],
+        json_file: pathlib.Path
+    ) -> None:
+        '''
+        Write sensitivity indices output to a JSON file.
+        '''
+
+        # Convert arrays to lists
+        copy_index = copy.deepcopy(index_dict)
+        write_index = {}
+        for i in copy_index:
+            write_index[i] = {
+                k: v.tolist() for k, v in copy_index[i].items()
+            }
+
+        # Write index data
+        with open(json_file, 'w') as output_json:
+            json.dump(write_index, output_json, indent=4)
+
+        return None
+
     def parameter_sensitivity_indices(
         self,
         sensim_file: str | pathlib.Path,
@@ -471,23 +494,16 @@
             )
             sensitivity_indices[indicator] = indicator_sensitivity
 
-        # Save the sensitivity indices
+        # Write the sensitivity indices
         if json_file is not None:
-            # Raise error for invalid JSON file extension
             json_file = pathlib.Path(json_file).resolve()
             validators._json_extension(
                 json_file=json_file
             )
-            # Modify sensitivity index to write in the JSON file
-            copy_indices = copy.deepcopy(sensitivity_indices)
-            write_indices = {}
-            for indicator in indicators:
-                write_indices[indicator] = {
-                    k: v.tolist() for k, v in copy_indices[indicator].items()
-                }
-            # saving output data
-            with open(json_file, 'w') as output_json:
-                json.dump(write_indices, output_json, indent=4)
+            self._write_index_in_json(
+                index_dict=sensitivity_indices,
+                json_file=json_file
+            )
 
         # Output dictionary
         output = {
@@ -496,3 +512,374 @@
         }
 
         return output
+
+    def simulation_and_indices(
+        self,
+        parameters: newtype.BoundType,
+        sample_number: int,
+        sensim_dir: str | pathlib.Path,
+        txtinout_dir: str | pathlib.Path,
+        extract_data: dict[str, dict[str, typing.Any]],
+        observe_data: dict[str, dict[str, str]],
+        metric_config: dict[str, dict[str, str]],
+        max_workers: typing.Optional[int] = None
+    ) -> dict[str, typing.Any]:
+        '''
+        Warning:
+            This method is currently under development.
+
+        Provide a high-level interface for directly computing sensitivity indices. Similar to the method
+        [`simulation_by_sample_parameters`](https://swat-model.github.io/pySWATPlus/api/sensitivity_analyzer/#pySWATPlus.SensitivityAnalyzer.simulation_by_sample_parameters),
+        it follows the same computational approach but skips saving the simulated data, instead computing sensitivity indices directly against the observed data.
+
+        The method returns a dictionary whose keys correspond to entries in `metric_config` and whose values are the computed sensitivity indices.
+
+        The following JSON files are saved in `sensim_dir`:
+
+        - `sensitivity_indices.json`: A dictionary where keys correspond to entries in `metric_config`, and values are the computed sensitivity indices.
+        - `time.json`: A dictionary containing the computation time details.
+
+        Args:
+            parameters (newtype.BoundType): List of dictionaries defining parameter configurations for sensitivity simulations.
+                Each dictionary contains the following keys:
+
+                - `name` (str): **Required.** Name of the parameter in the `cal_parms.cal` file.
+                - `change_type` (str): **Required.** Type of change to apply. Must be one of 'absval', 'abschg', or 'pctchg'.
+                - `lower_bound` (float): **Required.** Lower bound for the parameter.
+                - `upper_bound` (float): **Required.** Upper bound for the parameter.
+                - `units` (Iterable[int]): Optional. List of unit IDs to which the parameter change should be constrained.
+                - `conditions` (dict[str, list[str]]): Optional. Conditions to apply when changing the parameter.
+                  Supported keys include `'hsg'`, `'texture'`, `'plant'`, and `'landuse'`, each mapped to a list of allowed values.
+
+                ```python
+                parameters = [
+                    {
+                        'name': 'cn2',
+                        'change_type': 'pctchg',
+                        'lower_bound': 25,
+                        'upper_bound': 75,
+                    },
+                    {
+                        'name': 'perco',
+                        'change_type': 'absval',
+                        'lower_bound': 0,
+                        'upper_bound': 1,
+                        'conditions': {'hsg': ['A']}
+                    },
+                    {
+                        'name': 'bf_max',
+                        'change_type': 'absval',
+                        'lower_bound': 0.1,
+                        'upper_bound': 2.0,
+                        'units': range(1, 194)
+                    }
+                ]
+                ```
+
+            sample_number (int): Determines the number of samples.
+                Generates an array of length `2^N * (D + 1)`, where `D` is the number of parameter changes
+                and `N = sample_number + 1`. For example, when `sample_number` is 1 and `D` is 2, 12 samples will be generated.
+
+            sensim_dir (str | pathlib.Path): Path to the directory where individual simulations for each parameter set will be performed.
+                Raises an error if the folder is not empty. This precaution helps prevent data deletion, overwriting directories,
+                and issues with reading required data files not generated by the simulation.
+
+            txtinout_dir (str | pathlib.Path): Path to the `TxtInOut` directory containing the required files for SWAT+ simulation.
+
+            extract_data (dict[str, dict[str, typing.Any]]): A nested dictionary specifying how to extract data from SWAT+ simulation output files.
+                The top-level keys are filenames of the output files, without paths (e.g., `channel_sd_day.txt`). Each key must map to a non-empty dictionary
+                containing the following sub-keys, which correspond to the input variables within the method
+                [`simulated_timeseries_df`](https://swat-model.github.io/pySWATPlus/api/data_manager/#pySWATPlus.DataManager.simulated_timeseries_df):
+
+                - `has_units` (bool): **Required.** If `True`, the third line of the simulated file contains units for the columns.
+                - `begin_date` (str): Optional. Start date in `DD-Mon-YYYY` format (e.g., 01-Jan-2010). Defaults to the earliest date in the simulated file.
+                - `end_date` (str): Optional. End date in `DD-Mon-YYYY` format (e.g., 31-Dec-2013). Defaults to the latest date in the simulated file.
+                - `ref_day` (int): Optional. Reference day for monthly and yearly time series.
+                  If `None` (default), the last day of the month or year is used, obtained from simulation. Not applicable to daily time series files (ending with `_day`).
+                - `ref_month` (int): Optional. Reference month for yearly time series. If `None` (default), the last month of the year is used, obtained from simulation.
+                  Not applicable to monthly time series files (ending with `_mon`).
+                - `apply_filter` (dict[str, list[typing.Any]]): Optional. Each key is a column name and the corresponding value
+                  is a list of allowed values for filtering rows in the DataFrame. By default, no filtering is applied.
+                  An error is raised if filtering produces an empty DataFrame.
+
+                !!! note
+                    The sub-key `usecols` should **not** be included here. Although no error will be raised, it will be ignored
+                    because the `sim_col` sub-key from the `metric_config` input is automatically used as `usecols`. Including it manually has no effect.
+
+                ```python
+                extract_data = {
+                    'channel_sd_mon.txt': {
+                        'has_units': True,
+                        'begin_date': '01-Jun-2014',
+                        'end_date': '01-Oct-2016',
+                        'apply_filter': {'gis_id': [561], 'yr': [2015, 2016]},
+                        'usecols': ['gis_id', 'flo_out']
+                    },
+                    'channel_sd_yr.txt': {
+                        'has_units': True,
+                        'apply_filter': {'name': ['cha561'], 'yr': [2015, 2016]},
+                        'usecols': ['gis_id', 'flo_out']
+                    }
+                }
+                ```
+
+            observe_data (dict[str, dict[str, str]]): A nested dictionary specifying observed data configuration.
The top-level keys
+                are the same as the keys of `extract_data` (e.g., `channel_sd_day.txt`). Each key must map to a non-empty dictionary containing the following sub-keys:
+
+                - `obs_file` (str): **Required.** Path to the CSV file containing observed data. The file must be comma-separated and include a `date` column so that
+                  the `DataFrame` can be read by the [`pandas.read_csv`](https://pandas.pydata.org/docs/reference/api/pandas.read_csv.html#pandas-read-csv)
+                  method. The `date` column is used to merge simulated and observed data.
+                - `date_format` (str): **Required.** Date format of the `date` column in `obs_file`, used to parse `datetime.date` objects from date strings.
+
+                ```python
+                observe_data = {
+                    'channel_sd_day.txt': {
+                        'obs_file': "C:\\Users\\Username\\observed_data\\discharge_daily.csv",
+                        'date_format': '%Y-%m-%d'
+                    },
+                    'channel_sd_mon.txt': {
+                        'obs_file': "C:\\Users\\Username\\observed_data\\discharge_monthly.csv",
+                        'date_format': '%Y-%m-%d'
+                    }
+                }
+                ```
+
+            metric_config (dict[str, dict[str, str]]): A nested dictionary specifying metric configuration. The top-level keys
+                are the same as the keys of `extract_data` (e.g., `channel_sd_day.txt`). Each key must map to a non-empty dictionary containing the following sub-keys:
+
+                - `sim_col` (str): **Required.** Name of the column containing simulated values.
+                - `obs_col` (str): **Required.** Name of the column containing observed values.
+                - `indicator` (str): **Required.** Name of the performance indicator used to compute sensitivity indices.
+                  Available options are listed below:
+
+                    - `NSE`: Nash–Sutcliffe Efficiency.
+                    - `KGE`: Kling–Gupta Efficiency.
+                    - `MSE`: Mean Squared Error.
+                    - `RMSE`: Root Mean Squared Error.
+                    - `PBIAS`: Percent Bias.
+                    - `MARE`: Mean Absolute Relative Error.
+
+                !!! tip
+                    Avoid using `MARE` if `obs_col` contains zero values, as it will cause a division-by-zero error.
+
+                ```python
+                metric_config = {
+                    'channel_sd_day.txt': {
+                        'sim_col': 'flo_out',
+                        'obs_col': 'discharge',
+                        'indicator': 'NSE'
+                    },
+                    'channel_sd_mon.txt': {
+                        'sim_col': 'flo_out',
+                        'obs_col': 'discharge',
+                        'indicator': 'MSE'
+                    }
+                }
+                ```
+
+            max_workers (int): Number of logical CPUs to use for parallel processing. If `None` (default), all available logical CPUs are used.
+
+        Returns:
+            Dictionary where keys correspond to entries in `metric_config`, and values are the computed sensitivity indices.
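+
+            A minimal sketch of inspecting the returned dictionary is shown below;
+            the SALib Sobol output keys `S1` and `ST` are an assumption of this sketch:
+
+            ```python
+            for file_key, indices in output.items():
+                # First-order and total-order Sobol indices per parameter
+                print(file_key, indices['S1'], indices['ST'])
+            ```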
+        '''
+
+        # Start time
+        start_time = time.time()
+
+        # Check input variables type
+        validators._variable_origin_static_type(
+            vars_types=typing.get_type_hints(
+                obj=self.simulation_and_indices
+            ),
+            vars_values=locals()
+        )
+
+        # Absolute directory path
+        sensim_dir = pathlib.Path(sensim_dir).resolve()
+        txtinout_dir = pathlib.Path(txtinout_dir).resolve()
+
+        # Validate same top-level keys in dictionaries
+        validators._dict_key_equal(
+            extract_data=extract_data,
+            observe_data=observe_data,
+            metric_config=metric_config
+        )
+
+        # Dictionary of DataFrame key names
+        df_key = {
+            m: m.split('.')[0] + '_df' for m in metric_config
+        }
+
+        # Validate initialization of TxtinoutReader class
+        tmp_reader = TxtinoutReader(
+            tio_dir=txtinout_dir
+        )
+
+        # Disable CSV print to save time
+        tmp_reader.disable_csv_print()
+
+        # List of BoundDict objects
+        params_bounds = utils._parameters_bound_dict_list(
+            parameters=parameters
+        )
+
+        # Validate configuration of simulation parameters
+        validators._simulation_preliminary_setup(
+            sim_dir=sensim_dir,
+            tio_dir=txtinout_dir,
+            parameters=params_bounds
+        )
+
+        # Validate metric configuration
+        validators._metric_config(
+            input_dict=metric_config,
+            var_name='metric_config'
+        )
+
+        # Validate observe_data configuration
+        validators._observe_data_config(
+            observe_data=observe_data
+        )
+
+        # Dictionary of observed DataFrames
+        observe_dict = utils._observe_data_dict(
+            observe_data=observe_data,
+            metric_config=metric_config,
+            df_key=df_key
+        )
+
+        # Validate extract_data configuration
+        for key in extract_data:
+            extract_data[key]['usecols'] = [metric_config[key]['sim_col']]
+        validators._extract_data_config(
+            extract_data=extract_data
+        )
+
+        # Problem dictionary
+        problem = self._create_sobol_problem(
+            params_bounds=params_bounds
+        )
+        copy_problem = copy.deepcopy(
+            x=problem
+        )
+
+        # Generate sample array
+        sample_array = SALib.sample.sobol.sample(
+            problem=copy_problem,
+            N=pow(2, sample_number)
+        )
+
+        # Unique array to avoid duplicate computations
+        unique_array = numpy.unique(
+            ar=sample_array,
+            axis=0
+        )
+
+        # Number of unique simulations
+        num_sim = len(unique_array)
+
+        # Simulation in separate CPU
+        cpu_sim = functools.partial(
+            cpu._simulation_output,
+            num_sim=num_sim,
+            var_names=copy_problem['names'],
+            sim_dir=sensim_dir,
+            tio_dir=txtinout_dir,
+            params_bounds=params_bounds,
+            extract_data=extract_data,
+            clean_setup=True
+        )
+
+        # Assign model simulations to individual CPU cores and collect results
+        cpu_dict = {}
+        with concurrent.futures.ProcessPoolExecutor(max_workers=max_workers) as executor:
+            # Multicore simulation
+            futures = [
+                executor.submit(cpu_sim, idx, arr) for idx, arr in enumerate(unique_array, start=1)
+            ]
+            for count, future in enumerate(concurrent.futures.as_completed(futures), start=1):
+                # Message simulation completion for tracking
+                print(f'Completed simulation: {count}/{num_sim}', flush=True)
+                # Collect simulation results
+                future_result = future.result()
+                cpu_dict[tuple(future_result['array'])] = {
+                    k: v for k, v in future_result.items() if k != 'array'
+                }
+
+        # Empty dictionary to store metric values for each array
+        metric_dict = {}
+
+        # Iterate unique array
+        for arr in unique_array:
+            # Empty dictionary to store DataFrame metric values
+            arr_ind = {}
+            # Simulation output for the array
+            arr_sim = cpu_dict[tuple(arr)]
+            # Iterate metric
+            for m in metric_config:
+                # Simulated DataFrame
+                sim_df = arr_sim[df_key[m]]
+                sim_df.columns = ['date', 'sim']
+                # Observed DataFrame
+                obs_df = observe_dict[df_key[m]]
+                # Merge simulated and observed DataFrames by 'date' column
+                merge_df = sim_df.merge(
+                    right=obs_df,
+                    how='inner',
+                    on='date'
+                )
+                # Normalized DataFrame
+                norm_df = utils._df_normalize(
+                    df=merge_df[['sim', 'obs']],
+                    norm_col='obs'
+                )
+                # Indicator method from abbreviation
+                ind_abbr = metric_config[m]['indicator']
+                ind_method = getattr(
+                    PerformanceMetrics(),
+                    f'compute_{ind_abbr.lower()}'
+                )
+                # Indicator value computed from method
+                ind_val = ind_method(
+                    df=norm_df,
+                    sim_col='sim',
+                    obs_col='obs'
+                )
+                # Store DataFrame metric in dictionary
+                arr_ind[df_key[m]] = ind_val
+            # Store metric values for array
+            metric_dict[tuple(arr)] = arr_ind
+
+        # Sensitivity indices
+        sensitivity_indices = {}
+        for m in metric_config:
+            # List of metric values
+            m_list = [
+                metric_dict[tuple(arr)][df_key[m]] for arr in sample_array
+            ]
+            # Indicator sensitivity indices
+            m_index = SALib.analyze.sobol.analyze(
+                problem=copy.deepcopy(problem),
+                Y=numpy.array(m_list)
+            )
+            sensitivity_indices[m] = m_index
+
+        # Write the sensitivity indices
+        self._write_index_in_json(
+            index_dict=sensitivity_indices,
+            json_file=sensim_dir / 'sensitivity_indices.json'
+        )
+
+        # Time statistics
+        required_time = time.time() - start_time
+        time_stats = {
+            'time_sec': round(required_time),
+            'sample_length': len(sample_array),
+            'time_per_sample_sec': round(required_time / len(sample_array), 1)
+        }
+
+        # Write time statistics
+        with open(sensim_dir / 'time.json', 'w') as output_json:
+            json.dump(time_stats, output_json, indent=4)
+
+        return sensitivity_indices
diff --git a/pySWATPlus/utils.py b/pySWATPlus/utils.py
index d461155..2a7d32d 100644
--- a/pySWATPlus/utils.py
+++ b/pySWATPlus/utils.py
@@ -393,3 +393,25 @@ def _parameters_name_with_counter(
         name_counter.append(f'{p_name}|{current_count[p_name]}')
 
     return name_counter
+
+
+def _observe_data_dict(
+    observe_data: dict[str, dict[str, str]],
+    metric_config: dict[str, dict[str, str]],
+    df_key: dict[str, str]
+) -> dict[str, pandas.DataFrame]:
+    '''
+    Generate a dictionary mapping each entry in `observe_data` to its corresponding `DataFrame`.
+    '''
+
+    observe_dict = {}
+    for obs in observe_data:
+        obs_df = _df_observe(
+            obs_file=pathlib.Path(observe_data[obs]['obs_file']).resolve(),
+            date_format=observe_data[obs]['date_format'],
+            obs_col=metric_config[obs]['obs_col']
+        )
+        obs_df.columns = ['date', 'obs']
+        observe_dict[df_key[obs]] = obs_df
+
+    return observe_dict
diff --git a/pySWATPlus/validators.py b/pySWATPlus/validators.py
index a569751..949af50 100644
--- a/pySWATPlus/validators.py
+++ b/pySWATPlus/validators.py
@@ -414,3 +414,121 @@ def _extract_data_config(
     )
 
     return None
+
+
+def _metric_config(
+    input_dict: dict[str, dict[str, str]],
+    var_name: str
+) -> None:
+    '''
+    Validate metric dictionary configuration.
+    '''
+
+    # List of valid sub-keys of sub-dictionaries
+    valid_subkeys = [
+        'sim_col',
+        'obs_col',
+        'indicator'
+    ]
+
+    # List of valid indicators
+    valid_indicators = [
+        'NSE',
+        'KGE',
+        'MSE',
+        'RMSE',
+        'PBIAS',
+        'MARE'
+    ]
+
+    # Iterate dictionary
+    for file_key, file_dict in input_dict.items():
+        # Check type of a sub-dictionary
+        if not isinstance(file_dict, dict):
+            raise TypeError(
+                f'Expected "{file_key}" in {var_name} must be a dictionary, '
+                f'but got type "{type(file_dict).__name__}"'
+            )
+        # Check sub-dictionary length
+        if len(file_dict) != 3:
+            raise ValueError(
+                f'Length of "{file_key}" sub-dictionary in {var_name} must be 3, '
+                f'but got {len(file_dict)}'
+            )
+        # Iterate sub-key
+        for sub_key in file_dict:
+            # Check valid sub-key
+            if sub_key not in valid_subkeys:
+                raise KeyError(
+                    f'Invalid sub-key "{sub_key}" for "{file_key}" in {var_name}; '
+                    f'expected sub-keys are {json.dumps(valid_subkeys)}'
+                )
+            if sub_key == 'indicator' and file_dict[sub_key] not in valid_indicators:
+                raise ValueError(
+                    f'Invalid "indicator" value "{file_dict[sub_key]}" for "{file_key}" in {var_name}; '
+                    f'expected indicators are {valid_indicators}'
+                )
+
+    return None
+
+
+def _observe_data_config(
+    observe_data: dict[str, dict[str, str]],
+) -> None:
+    '''
+    Validate `observe_data` configuration.
+    '''
+
+    # List of valid sub-keys of sub-dictionaries
+    valid_subkeys = [
+        'obs_file',
+        'date_format'
+    ]
+
+    # Iterate dictionary
+    for file_key, file_dict in observe_data.items():
+        # Check type of a sub-dictionary
+        if not isinstance(file_dict, dict):
+            raise TypeError(
+                f'Expected "{file_key}" in observe_data must be a dictionary, '
+                f'but got type "{type(file_dict).__name__}"'
+            )
+        # Check sub-dictionary length
+        if len(file_dict) != 2:
+            raise ValueError(
+                f'Length of "{file_key}" sub-dictionary in observe_data must be 2, '
+                f'but got {len(file_dict)}'
+            )
+        # Iterate sub-key
+        for sub_key in file_dict:
+            # Check valid sub-key
+            if sub_key not in valid_subkeys:
+                raise KeyError(
+                    f'Invalid sub-key "{sub_key}" for "{file_key}" in observe_data; '
+                    f'expected sub-keys are {json.dumps(valid_subkeys)}'
+                )
+
+    return None
+
+
+def _dict_key_equal(
+    **dicts: dict[str, typing.Any]
+) -> None:
+    '''
+    Validate equal top-level keys across input dictionaries.
+    '''
+
+    # List of dictionary names and their values
+    dict_items = list(dicts.items())
+
+    # Get reference name and keys from the first dictionary
+    ref_name, ref_dict = dict_items[0]
+
+    # Compare with each subsequent dictionary
+    for c_name, c_dict in dict_items[1:]:
+        if c_dict.keys() != ref_dict.keys():
+            raise ValueError(
+                f'Top-level keys mismatch between "{ref_name}" and "{c_name}"'
+            )
+
+    return None
diff --git a/tests/test_calibration.py b/tests/test_calibration.py
index 4c7d499..dd2ca6f 100644
--- a/tests/test_calibration.py
+++ b/tests/test_calibration.py
@@ -41,7 +41,7 @@ def test_calibration():
     }
 
     # Objective configuration
-    objectives = {
+    objective_config = {
         'channel_sd_mon.txt': {
             'sim_col': 'flo_out',
             'obs_col': 'mean',
@@ -84,7 +84,7 @@ def test_calibration():
             txtinout_dir=tmp1_dir,
             extract_data=extract_data,
             observe_data=observe_data,
-            objectives=objectives,
+            objective_config=objective_config,
             algorithm='NSGA2',
             n_gen=2,
             pop_size=3
@@ -99,6 +99,23 @@ def test_calibration():
         assert os.path.exists(os.path.join(tmp2_dir, 'optimization_history.json'))
         assert os.path.exists(os.path.join(tmp2_dir, 'optimization_result.json'))
 
+        # Pass: compute sensitivity indices directly
+        with tempfile.TemporaryDirectory() as tmp3_dir:
+            output = pySWATPlus.SensitivityAnalyzer().simulation_and_indices(
+                parameters=parameters,
+                sample_number=1,
+                sensim_dir=tmp3_dir,
+                txtinout_dir=tmp1_dir,
+                extract_data=extract_data,
+                observe_data=observe_data,
+                metric_config=objective_config
+            )
+
+            assert isinstance(output, dict)
+            assert len(output) == 1
+            assert os.path.exists(os.path.join(tmp3_dir, 'sensitivity_indices.json'))
+            assert os.path.exists(os.path.join(tmp3_dir, 'time.json'))
+
 
 def test_error_calibration():
@@ -137,7 +154,7 @@ def test_error_calibration():
     }
 
     # Objective configuration
-    objectives = {
+    objective_config = {
         'channel_sd_mon.txt': {
             'sim_col': 'flo_out',
             'obs_col': 'mean',
@@ -161,7 +178,7 @@ def test_error_calibration():
                 txtinout_dir=tmp1_dir,
                 extract_data=extract_data,
                 observe_data=observe_data,
-                objectives={
+                objective_config={
                     'channel_sd_monn.txt': {
                         'sim_col': 'flo_out',
                         'obs_col': 'mean',
@@ -172,7 +189,28 @@ def test_error_calibration():
                 n_gen=2,
                 pop_size=5
             )
-        assert exc_info.value.args[0] == 'Mismatch of key names. Ensure extract_data, observe_data, and objectives have identical top-level keys.'
+        assert 'Top-level keys mismatch' in exc_info.value.args[0]
+
+        # Error: PBIAS is not allowed as an indicator name
+        with pytest.raises(Exception) as exc_info:
+            pySWATPlus.Calibration(
+                parameters=parameters,
+                calsim_dir=tmp2_dir,
+                txtinout_dir=tmp1_dir,
+                extract_data=extract_data,
+                observe_data=observe_data,
+                objective_config={
+                    'channel_sd_mon.txt': {
+                        'sim_col': 'flo_out',
+                        'obs_col': 'mean',
+                        'indicator': 'PBIAS'
+                    }
+                },
+                algorithm='NSGA2',
+                n_gen=2,
+                pop_size=5
+            )
+        assert exc_info.value.args[0] == 'Indicator "PBIAS" is invalid in objective_config; it lacks a defined optimization direction'
 
         # Error: invalid value type of top-key in observe_data
         with pytest.raises(Exception) as exc_info:
@@ -184,7 +222,7 @@ def test_error_calibration():
                 observe_data={
                     'channel_sd_mon.txt': []
                 },
-                objectives=objectives,
+                objective_config=objective_config,
                 algorithm='NSGA2',
                 n_gen=2,
                 pop_size=5
@@ -203,7 +241,7 @@ def test_error_calibration():
                         'date_format': '%Y-%m-%d'
                     }
                 },
-                objectives=objectives,
+                objective_config=objective_config,
                 algorithm='NSGA2',
                 n_gen=2,
                 pop_size=5
@@ -223,7 +261,7 @@ def test_error_calibration():
                         'date_formatt': '%Y-%m-%d'
                    }
                 },
-                objectives=objectives,
+                objective_config=objective_config,
                 algorithm='NSGA2',
                 n_gen=2,
                 pop_size=5
@@ -238,14 +276,14 @@ def test_error_calibration():
                 txtinout_dir=tmp1_dir,
                 extract_data=extract_data,
                 observe_data=observe_data,
-                objectives={
+                objective_config={
                     'channel_sd_mon.txt': []
                 },
                 algorithm='NSGA2',
                 n_gen=2,
                 pop_size=5
             )
-        assert 'Expected "channel_sd_mon.txt" in "objectives" must be a dictionary' in exc_info.value.args[0]
+        assert 'Expected "channel_sd_mon.txt" in objective_config must be a dictionary' in exc_info.value.args[0]
 
         # Error: invalid length of sub-dictionary in objectives
         with pytest.raises(Exception) as exc_info:
@@ -255,7 +293,7 @@ def test_error_calibration():
                 txtinout_dir=tmp1_dir,
                 extract_data=extract_data,
                 observe_data=observe_data,
-                objectives={
+                objective_config={
                     'channel_sd_mon.txt': {
                         'sim_col': 'flo_out',
                         'obs_col': 'mean'
@@ -265,7 +303,7 @@ def test_error_calibration():
                 n_gen=2,
                 pop_size=5
             )
-        assert 'Length of "channel_sd_mon.txt" sub-dictionary in "objectives" must be 3' in exc_info.value.args[0]
+        assert 'Length of "channel_sd_mon.txt" sub-dictionary in objective_config must be 3' in exc_info.value.args[0]
 
         # Error: invalid sub-key in objectives
         with pytest.raises(Exception) as exc_info:
@@ -275,7 +313,7 @@ def test_error_calibration():
                 txtinout_dir=tmp1_dir,
                 extract_data=extract_data,
                 observe_data=observe_data,
-                objectives={
+                objective_config={
                     'channel_sd_mon.txt': {
                         'sim_col': 'flo_out',
                         'obs_col': 'mean',
@@ -286,7 +324,7 @@ def test_error_calibration():
                 n_gen=2,
                 pop_size=5
             )
-        assert 'Invalid sub-key "indicatorr" for "channel_sd_mon.txt" in "objectives"' in exc_info.value.args[0]
+        assert 'Invalid sub-key "indicatorr" for "channel_sd_mon.txt" in objective_config' in exc_info.value.args[0]
 
         # Error: invalid indicator in objectives
        with pytest.raises(Exception) as exc_info:
@@ -296,7 +334,7 @@ def test_error_calibration():
                 txtinout_dir=tmp1_dir,
                 extract_data=extract_data,
                 observe_data=observe_data,
-                objectives={
+                objective_config={
                     'channel_sd_mon.txt': {
                         'sim_col': 'flo_out',
                         'obs_col': 'mean',
@@ -307,7 +345,7 @@ def test_error_calibration():
                 n_gen=2,
                 pop_size=5
             )
-        assert 'Invalid "indicator" value "RMSEE" for "channel_sd_mon.txt" in "objectives"' in exc_info.value.args[0]
+        assert 'Invalid "indicator" value "RMSEE" for "channel_sd_mon.txt" in objective_config' in exc_info.value.args[0]
 
         # Error: invalid algorithm name
         with pytest.raises(Exception) as exc_info:
@@ -317,9 +355,54 @@ def test_error_calibration():
                 txtinout_dir=tmp1_dir,
                 extract_data=extract_data,
                 observe_data=observe_data,
-                objectives=objectives,
+                objective_config=objective_config,
                 algorithm='NSGA',
                 n_gen=2,
                 pop_size=5
             ).parameter_optimization()
         assert 'Invalid algorithm "NSGA"' in exc_info.value.args[0]
+
+        # Error: invalid algorithm for multiple objectives
+        with pytest.raises(Exception) as exc_info:
+            pySWATPlus.Calibration(
+                parameters=parameters,
+                calsim_dir=tmp2_dir,
+                txtinout_dir=tmp1_dir,
+                extract_data={
+                    'channel_sd_day.txt': {
+                        'has_units': True,
+                        'apply_filter': {'name': ['cha561']}
+                    },
+                    'channel_sd_mon.txt': {
+                        'has_units': True,
+                        'ref_day': 1,
+                        'apply_filter': {'name': ['cha561']}
+                    }
+                },
+                observe_data={
+                    'channel_sd_day.txt': {
+                        'obs_file': os.path.join(txtinout_dir, 'a_observe_discharge_monthly.csv'),
+                        'date_format': '%Y-%m-%d'
+                    },
+                    'channel_sd_mon.txt': {
+                        'obs_file': os.path.join(txtinout_dir, 'a_observe_discharge_monthly.csv'),
+                        'date_format': '%Y-%m-%d'
+                    }
+                },
+                objective_config={
+                    'channel_sd_day.txt': {
+                        'sim_col': 'flo_out',
+                        'obs_col': 'mean',
+                        'indicator': 'NSE'
+                    },
+                    'channel_sd_mon.txt': {
+                        'sim_col': 'flo_out',
+                        'obs_col': 'mean',
+                        'indicator': 'RMSE'
+                    }
+                },
+                algorithm='DE',
+                n_gen=2,
+                pop_size=5
+            ).parameter_optimization()
+        assert 'Algorithm "DE" cannot handle multiple objectives' in exc_info.value.args[0]