From 8b5b57052a04f8ae1bf83d74d20ebe61a10075e2 Mon Sep 17 00:00:00 2001 From: Michael Tarnawa <18899420+mtar@users.noreply.github.com> Date: Mon, 9 Feb 2026 15:23:29 +0100 Subject: [PATCH 01/12] accept JSON files in MergeReader --- config/evaluate/eval_config.yml | 247 ++++++++++-------- .../weathergen/evaluate/io/wegen_reader.py | 36 ++- .../src/weathergen/evaluate/run_evaluation.py | 3 +- .../src/weathergen/evaluate/utils/utils.py | 22 +- 4 files changed, 177 insertions(+), 131 deletions(-) diff --git a/config/evaluate/eval_config.yml b/config/evaluate/eval_config.yml index 83e0a7b84..815ff68c6 100644 --- a/config/evaluate/eval_config.yml +++ b/config/evaluate/eval_config.yml @@ -18,9 +18,9 @@ # vmax: 40 evaluation: - metrics : ["rmse", "mae"] - regions: ["global", "nhem"] - summary_plots : true + metrics : ["rmse"] + regions: ["global"] + summary_plots : false ratio_plots : false heat_maps : false summary_dir: "./plots/" @@ -35,134 +35,167 @@ evaluation: # baseline: "ar40mckx" -default_streams: - ERA5: - channels: ["2t", "10u"] #, "10v", "z_500", "t_850", "u_850", "v_850", "q_850", ] - evaluation: - forecast_step: "all" - sample: "all" - ensemble: "all" #supported: "all", "mean", [0,1,2] - plotting: - sample: [1, 3] - forecast_step: [1,3, 2] #supported: "all", [1,2,3,...], "1-50" (equivalent of [1,2,3,...50]) - ensemble: [0,2,5] #supported: "all", "mean", [0,1,2] - plot_maps: true - plot_target: false - plot_histograms: true - plot_animations: true - CERRA: - channels: ["z_500", "t_850", "u_850"] #, "blah"] - evaluation: - forecast_step: "all" - sample: "all" - plotting: - sample: [2, 3, 0] - forecast_step: [1,3, 4, 5] - plot_maps: true - plot_target: false - plot_histograms: true - plot_animations: true +# default_streams: +# ERA5: +# channels: ["2t", "10u"] #, "10v", "z_500", "t_850", "u_850", "v_850", "q_850", ] +# evaluation: +# forecast_step: "all" +# sample: "all" +# ensemble: "all" #supported: "all", "mean", [0,1,2] +# plotting: +# sample: [1, 3] +# forecast_step: [1,3, 2] #supported: "all", [1,2,3,...], "1-50" (equivalent of [1,2,3,...50]) +# ensemble: [0,2,5] #supported: "all", "mean", [0,1,2] +# plot_maps: false +# plot_target: false +# plot_histograms: false +# plot_animations: false +# CERRA: +# channels: ["z_500", "t_850", "u_850"] #, "blah"] +# evaluation: +# forecast_step: "all" +# sample: "all" +# plotting: +# sample: [2, 3, 0] +# forecast_step: [1,3, 4, 5] +# plot_maps: false +# plot_target: false +# plot_histograms: false +# plot_animations: false run_ids : - ar40mckx: - label: "pretrained model ar40mckx" - results_base_dir : "./results/" - #NEW: if "streams" is not specified, the default streams are used - - - c8g5katp: - label: "2 steps window" - results_base_dir : "./results/" - #NEW: if "streams" is not specified, the default streams are used - +# ar40mckx: +# label: "pretrained model ar40mckx" +# results_base_dir : "./results/" +# #NEW: if "streams" is not specified, the default streams are used +# +# +# c8g5katp: +# label: "2 steps window" +# results_base_dir : "./results/" +# #NEW: if "streams" is not specified, the default streams are used +# ############################### #How to define custom variables different than default_streams (default_streams will be ignored for this run_id): - jjc9ym62: - label: "2 steps window" - results_base_dir : "./results/" - epoch: 1 #optional: if not specified epoch 0 (in inference it is always 0) is used - rank: 2 #optional: if not specified rank 0 is used - streams: - ERA5: - channels: ["2t", "10u", "10v"] - evaluation: - forecast_step: "all" - sample: "all" - ensemble: "mean" - plotting: - sample: [1, 3] - forecast_step: [1,3, 2] - ensemble: "mean" - plot_maps: true - plot_target: false - plot_histograms: true - plot_animations: true +# jjc9ym62: +# label: "2 steps window" +# results_base_dir : "./results/" +# epoch: 1 #optional: if not specified epoch 0 (in inference it is always 0) is used +# rank: 2 #optional: if not specified rank 0 is used +# streams: +# ERA5: +# channels: ["2t", "10u", "10v"] +# evaluation: +# forecast_step: "all" +# sample: "all" +# ensemble: "mean" +# plotting: +# sample: [1, 3] +# forecast_step: [1,3, 2] +# ensemble: "mean" +# plot_maps: true +# plot_target: false +# plot_histograms: true +# plot_animations: true - + # aaaaaaaa: + # label: "test" + # #results_base_dir : "/p/scratch/weatherai/shared_work/results/" + # #model_base_dir: "/p/scratch/weatherai/shared_work/models/" + # epoch: 0 #optional: if not specified epoch 0 (in inference it is always 0) is used + # rank: 0 #optional: if not specified rank 0 is used + # streams: + # ERA5: + # channels: ["2t"] + # evaluation: + # forecast_step: "1-3" + # sample: [0] + # plotting: + # sample: "all" + # forecast_step: "all" + # plot_maps: false + # plot_target: false + # plot_histograms: false + # plot_animations: false #WeatherGeneratorMerge example: ############################### #Example of syntax to stack multiple runs over the ensemble dimension + # merge_test: #<------ This is the new run_id name of the merged dataset. NB. you always need to specify one + # type: "merge" # <------- VERY IMPORTANT + # merge_run_ids: + # - so67dku4 + # - c9cg8ql3 + # metrics_dir: "./merge_test/metrics/" #<------- VERY IMPORTANT + # label: "Merged Results" + # results_base_dir : "./results/" + # streams: + # ERA5: + # channels: ["z_500", "t_850", "u_850", "v_850", "q_850"] + # evaluation: + # forecast_step: [2,4,6] + # sample: [0, 1, 2, 3] + # ensemble: "all" + merge_test: #<------ This is the new run_id name of the merged dataset. NB. you always need to specify one - type: "merge" # <------- VERY IMPORTANT - merge_run_ids: - - so67dku4 - - c9cg8ql3 - metrics_dir: "./merge_test/metrics/" #<------- VERY IMPORTANT - label: "Merged Results" - results_base_dir : "./results/" - streams: - ERA5: - channels: ["z_500", "t_850", "u_850", "v_850", "q_850"] - evaluation: - forecast_step: [2,4,6] - sample: [0, 1, 2, 3] - ensemble: "all" + type: "merge" # <------- VERY IMPORTANT + merge_run_ids: + - aaaaaaaa #yv0scz3w + - aaaaaaaa #jq63t9vj + metrics_dir: "./merge_test/metrics/" #<------- VERY IMPORTANT + label: "Merged Results" + #results_base_dir : "./results/" + streams: + ERA5: + channels: ["2t"] + evaluation: + forecast_step: "1-3" + sample: [0] #WeatherGeneratorJSON example: ############################## #Example of syntax to run over pre-computed scores when the .zarr output is not available anymore - so67dku1: - type: "json" # <------- VERY IMPORTANT - label: "WeatherGenerator" - results_base_dir : "./results/" - streams: - ERA5: - channels: ["z_500", "t_850", "u_850", "v_850", "q_850"] - evaluation: - forecast_step: [2,4,6] - sample: [0, 1, 2, 3] - ensemble: "all" #supported + # so67dku1: + # type: "json" # <------- VERY IMPORTANT + # label: "WeatherGenerator" + # results_base_dir : "./results/" + # streams: + # ERA5: + # channels: ["z_500", "t_850", "u_850", "v_850", "q_850"] + # evaluation: + # forecast_step: [2,4,6] + # sample: [0, 1, 2, 3] + # ensemble: "all" #supported # CSV Reader example: ############################### #ADVANCED (please handle with care): example of how to use the csv reader to Plot PanguWeather scores computed with quaver - pangu: - type: "csv" - label: "Pangu-Weather" - metrics_dir: "" - streams: - ERA5: - channels: ["2t", "q_850", "t_850", "z_500"] - evaluation: - forecast_step: "all" - sample: "all" + # pangu: + # type: "csv" + # label: "Pangu-Weather" + # metrics_dir: "" + # streams: + # ERA5: + # channels: ["2t", "q_850", "t_850", "z_500"] + # evaluation: + # forecast_step: "all" + # sample: "all" - graphcast: - type: "csv" - label: "GraphCast" - metrics_dir: "" - streams: - ERA5: - channels: ["2t", "q_850", "t_850", "z_500"] - evaluation: - forecast_step: "all" - sample: "all" + # graphcast: + # type: "csv" + # label: "GraphCast" + # metrics_dir: "" + # streams: + # ERA5: + # channels: ["2t", "q_850", "t_850", "z_500"] + # evaluation: + # forecast_step: "all" + # sample: "all" diff --git a/packages/evaluate/src/weathergen/evaluate/io/wegen_reader.py b/packages/evaluate/src/weathergen/evaluate/io/wegen_reader.py index 062031273..affc240c1 100644 --- a/packages/evaluate/src/weathergen/evaluate/io/wegen_reader.py +++ b/packages/evaluate/src/weathergen/evaluate/io/wegen_reader.py @@ -205,7 +205,7 @@ def load_single_score(self, stream: str, region: str, metric: str) -> xr.DataArr Load a single pre-computed score for a given run, stream and metric """ score_path = ( - Path(self.metrics_dir) + Path(self.results_base_dir) / "evaluation" / f"{self.run_id}_{stream}_{region}_{metric}_chkpt{self.mini_epoch:05d}.json" ) _logger.debug(f"Looking for: {score_path}") @@ -691,20 +691,34 @@ def _force_consistent_grids(ref: list[xr.DataArray]) -> xr.DataArray: class WeatherGenMergeReader(Reader): - def __init__(self, eval_cfg: dict, run_id: str, private_paths: dict | None = None): - """Data reader class for WeatherGenerator model outputs stored in Zarr format.""" - + def __init__(self, eval_cfg: dict, run_id: str, private_paths: dict | None = None,/, regions: list[str] | None = None, metrics: list[str] | None = None, *, reader_type: str = 'json'): + """ + Data reader class for merging WeatherGenerator model outputs stored in Zarr or JSON format. + + Parameters + ---------- + eval_cfg: dict + config with plotting and evaluation options for that run id + run_id: str + run id of the model + private_paths: dict + dictionary of private paths for the supported HPC + """ + super().__init__(eval_cfg, run_id, private_paths) self.run_ids = eval_cfg.get("merge_run_ids", []) self.metrics_dir = Path(eval_cfg.get("metrics_dir")) - self.mini_epoch = eval_cfg.get("mini_epoch", eval_cfg.get("epoch")) + self.mini_epoch = eval_cfg.get("mini_epoch", 0) - super().__init__(eval_cfg, run_id, private_paths) self.readers = [] _logger.info(f"MERGE READERS: {self.run_ids} ...") + # TODO: Decide which reader shall be used. for run_id in self.run_ids: - reader = WeatherGenZarrReader(self.eval_cfg, run_id, self.private_paths) + if reader_type == 'zarr': + reader = WeatherGenZarrReader(self.eval_cfg, run_id, self.private_paths) + else: + reader = WeatherGenJSONReader(self.eval_cfg, run_id, self.private_paths, regions, metrics) self.readers.append(reader) def get_data( @@ -834,8 +848,12 @@ def load_scores(self, stream: str, regions: str, metrics: str) -> xr.DataArray | missing_metrics: dictionary of missing regions and metrics that need to be recomputed. """ - # TODO: implement this properly. Not it is skipping loading scores + # TODO: merge the scores + if isinstance(self.readers[0], WeatherGenJSONReader): + return self.readers[0].load_scores(stream,regions,metrics) + + # TODO: implement this properly. Not it is skipping loading scores local_scores = {} missing_metrics = {} for region in regions: @@ -929,7 +947,7 @@ def get_ensemble(self, stream: str | None = None) -> list[str]: for reader in self.readers: all_ensembles.append(reader.get_ensemble(stream)) - if all(e == ["0"] or e == [0] for e in all_ensembles): + if all(e == ["0"] or e == [0] or e == {0} for e in all_ensembles): return set(range(len(self.readers))) else: raise NotImplementedError( diff --git a/packages/evaluate/src/weathergen/evaluate/run_evaluation.py b/packages/evaluate/src/weathergen/evaluate/run_evaluation.py index 98326444f..24d627562 100755 --- a/packages/evaluate/src/weathergen/evaluate/run_evaluation.py +++ b/packages/evaluate/src/weathergen/evaluate/run_evaluation.py @@ -172,7 +172,7 @@ def get_reader( elif reader_type == "json": reader = WeatherGenJSONReader(run, run_id, private_paths, region, metric) elif reader_type == "merge": - reader = WeatherGenMergeReader(run, run_id, private_paths) + reader = WeatherGenMergeReader(run, run_id, private_paths, region, metric) else: raise ValueError(f"Unknown reader type: {reader_type}") return reader @@ -272,6 +272,7 @@ def evaluate_from_config( Configuration input stored as dictionary. """ runs = cfg.run_ids + _logger.info(f"Detected {runs} runs") _logger.info(f"Detected {len(runs)} runs") private_paths = cfg.get("private_paths") summary_dir = Path(cfg.evaluation.get("summary_dir", _DEFAULT_PLOT_DIR)) diff --git a/packages/evaluate/src/weathergen/evaluate/utils/utils.py b/packages/evaluate/src/weathergen/evaluate/utils/utils.py index d911a370b..f075b8346 100644 --- a/packages/evaluate/src/weathergen/evaluate/utils/utils.py +++ b/packages/evaluate/src/weathergen/evaluate/utils/utils.py @@ -459,26 +459,20 @@ def metric_list_to_json( Parameters ---------- - reader: + reader: Reader Reader object containing all info about the run_id. - metrics_list : + stream: str + Stream name. + metrics_dict: list Metrics per stream. - npoints_sample_list : - Number of points per sample per stream. - streams : - Stream names. - region : - Region name. - metric_dir : - Output directory. - run_id : - Identifier of the inference run. - mini_epoch : - Mini_epoch number. + regions: list + Region names. """ # stream_loaded_scores['rmse']['nhem']['ERA5']['jjqce6x5'] reader.metrics_dir.mkdir(parents=True, exist_ok=True) + #_logger.info(f"{reader.mini_epoch}") + for metric, metric_stream in metrics_dict.items(): for region in regions: metric_now = metric_stream[region][stream] From fc1d670a959dfc7ca49412894363d265d27ea1ef Mon Sep 17 00:00:00 2001 From: Michael Tarnawa <18899420+mtar@users.noreply.github.com> Date: Mon, 9 Feb 2026 15:34:29 +0100 Subject: [PATCH 02/12] restore original config --- config/evaluate/eval_config.yml | 249 ++++++++++++++------------------ 1 file changed, 108 insertions(+), 141 deletions(-) diff --git a/config/evaluate/eval_config.yml b/config/evaluate/eval_config.yml index 815ff68c6..82c3d7525 100644 --- a/config/evaluate/eval_config.yml +++ b/config/evaluate/eval_config.yml @@ -18,9 +18,9 @@ # vmax: 40 evaluation: - metrics : ["rmse"] - regions: ["global"] - summary_plots : false + metrics : ["rmse", "mae"] + regions: ["global", "nhem"] + summary_plots : true ratio_plots : false heat_maps : false summary_dir: "./plots/" @@ -35,167 +35,134 @@ evaluation: # baseline: "ar40mckx" -# default_streams: -# ERA5: -# channels: ["2t", "10u"] #, "10v", "z_500", "t_850", "u_850", "v_850", "q_850", ] -# evaluation: -# forecast_step: "all" -# sample: "all" -# ensemble: "all" #supported: "all", "mean", [0,1,2] -# plotting: -# sample: [1, 3] -# forecast_step: [1,3, 2] #supported: "all", [1,2,3,...], "1-50" (equivalent of [1,2,3,...50]) -# ensemble: [0,2,5] #supported: "all", "mean", [0,1,2] -# plot_maps: false -# plot_target: false -# plot_histograms: false -# plot_animations: false -# CERRA: -# channels: ["z_500", "t_850", "u_850"] #, "blah"] -# evaluation: -# forecast_step: "all" -# sample: "all" -# plotting: -# sample: [2, 3, 0] -# forecast_step: [1,3, 4, 5] -# plot_maps: false -# plot_target: false -# plot_histograms: false -# plot_animations: false +default_streams: + ERA5: + channels: ["2t", "10u"] #, "10v", "z_500", "t_850", "u_850", "v_850", "q_850", ] + evaluation: + forecast_step: "all" + sample: "all" + ensemble: "all" #supported: "all", "mean", [0,1,2] + plotting: + sample: [1, 3] + forecast_step: [1,3, 2] #supported: "all", [1,2,3,...], "1-50" (equivalent of [1,2,3,...50]) + ensemble: [0,2,5] #supported: "all", "mean", [0,1,2] + plot_maps: true + plot_target: false + plot_histograms: true + plot_animations: true + CERRA: + channels: ["z_500", "t_850", "u_850"] #, "blah"] + evaluation: + forecast_step: "all" + sample: "all" + plotting: + sample: [2, 3, 0] + forecast_step: [1,3, 4, 5] + plot_maps: true + plot_target: false + plot_histograms: true + plot_animations: true run_ids : -# ar40mckx: -# label: "pretrained model ar40mckx" -# results_base_dir : "./results/" -# #NEW: if "streams" is not specified, the default streams are used -# -# -# c8g5katp: -# label: "2 steps window" -# results_base_dir : "./results/" -# #NEW: if "streams" is not specified, the default streams are used -# + ar40mckx: + label: "pretrained model ar40mckx" + results_base_dir : "./results/" + #NEW: if "streams" is not specified, the default streams are used + + + c8g5katp: + label: "2 steps window" + results_base_dir : "./results/" + #NEW: if "streams" is not specified, the default streams are used + ############################### #How to define custom variables different than default_streams (default_streams will be ignored for this run_id): -# jjc9ym62: -# label: "2 steps window" -# results_base_dir : "./results/" -# epoch: 1 #optional: if not specified epoch 0 (in inference it is always 0) is used -# rank: 2 #optional: if not specified rank 0 is used -# streams: -# ERA5: -# channels: ["2t", "10u", "10v"] -# evaluation: -# forecast_step: "all" -# sample: "all" -# ensemble: "mean" -# plotting: -# sample: [1, 3] -# forecast_step: [1,3, 2] -# ensemble: "mean" -# plot_maps: true -# plot_target: false -# plot_histograms: true -# plot_animations: true + jjc9ym62: + label: "2 steps window" + results_base_dir : "./results/" + epoch: 1 #optional: if not specified epoch 0 (in inference it is always 0) is used + rank: 2 #optional: if not specified rank 0 is used + streams: + ERA5: + channels: ["2t", "10u", "10v"] + evaluation: + forecast_step: "all" + sample: "all" + ensemble: "mean" + plotting: + sample: [1, 3] + forecast_step: [1,3, 2] + ensemble: "mean" + plot_maps: true + plot_target: false + plot_histograms: true + plot_animations: true - # aaaaaaaa: - # label: "test" - # #results_base_dir : "/p/scratch/weatherai/shared_work/results/" - # #model_base_dir: "/p/scratch/weatherai/shared_work/models/" - # epoch: 0 #optional: if not specified epoch 0 (in inference it is always 0) is used - # rank: 0 #optional: if not specified rank 0 is used - # streams: - # ERA5: - # channels: ["2t"] - # evaluation: - # forecast_step: "1-3" - # sample: [0] - # plotting: - # sample: "all" - # forecast_step: "all" - # plot_maps: false - # plot_target: false - # plot_histograms: false - # plot_animations: false + #WeatherGeneratorMerge example: ############################### #Example of syntax to stack multiple runs over the ensemble dimension - # merge_test: #<------ This is the new run_id name of the merged dataset. NB. you always need to specify one - # type: "merge" # <------- VERY IMPORTANT - # merge_run_ids: - # - so67dku4 - # - c9cg8ql3 - # metrics_dir: "./merge_test/metrics/" #<------- VERY IMPORTANT - # label: "Merged Results" - # results_base_dir : "./results/" - # streams: - # ERA5: - # channels: ["z_500", "t_850", "u_850", "v_850", "q_850"] - # evaluation: - # forecast_step: [2,4,6] - # sample: [0, 1, 2, 3] - # ensemble: "all" - merge_test: #<------ This is the new run_id name of the merged dataset. NB. you always need to specify one - type: "merge" # <------- VERY IMPORTANT - merge_run_ids: - - aaaaaaaa #yv0scz3w - - aaaaaaaa #jq63t9vj - metrics_dir: "./merge_test/metrics/" #<------- VERY IMPORTANT - label: "Merged Results" - #results_base_dir : "./results/" - streams: - ERA5: - channels: ["2t"] - evaluation: - forecast_step: "1-3" - sample: [0] + type: "merge" # <------- VERY IMPORTANT + merge_run_ids: + - so67dku4 + - c9cg8ql3 + metrics_dir: "./merge_test/metrics/" #<------- VERY IMPORTANT + label: "Merged Results" + results_base_dir : "./results/" + streams: + ERA5: + channels: ["z_500", "t_850", "u_850", "v_850", "q_850"] + evaluation: + forecast_step: [2,4,6] + sample: [0, 1, 2, 3] + ensemble: "all" #WeatherGeneratorJSON example: ############################## #Example of syntax to run over pre-computed scores when the .zarr output is not available anymore - # so67dku1: - # type: "json" # <------- VERY IMPORTANT - # label: "WeatherGenerator" - # results_base_dir : "./results/" - # streams: - # ERA5: - # channels: ["z_500", "t_850", "u_850", "v_850", "q_850"] - # evaluation: - # forecast_step: [2,4,6] - # sample: [0, 1, 2, 3] - # ensemble: "all" #supported + so67dku1: + type: "json" # <------- VERY IMPORTANT + label: "WeatherGenerator" + results_base_dir : "./results/" + streams: + ERA5: + channels: ["z_500", "t_850", "u_850", "v_850", "q_850"] + evaluation: + forecast_step: [2,4,6] + sample: [0, 1, 2, 3] + ensemble: "all" #supported # CSV Reader example: ############################### #ADVANCED (please handle with care): example of how to use the csv reader to Plot PanguWeather scores computed with quaver - # pangu: - # type: "csv" - # label: "Pangu-Weather" - # metrics_dir: "" - # streams: - # ERA5: - # channels: ["2t", "q_850", "t_850", "z_500"] - # evaluation: - # forecast_step: "all" - # sample: "all" + pangu: + type: "csv" + label: "Pangu-Weather" + metrics_dir: "" + streams: + ERA5: + channels: ["2t", "q_850", "t_850", "z_500"] + evaluation: + forecast_step: "all" + sample: "all" - # graphcast: - # type: "csv" - # label: "GraphCast" - # metrics_dir: "" - # streams: - # ERA5: - # channels: ["2t", "q_850", "t_850", "z_500"] - # evaluation: - # forecast_step: "all" - # sample: "all" + graphcast: + type: "csv" + label: "GraphCast" + metrics_dir: "" + streams: + ERA5: + channels: ["2t", "q_850", "t_850", "z_500"] + evaluation: + forecast_step: "all" + sample: "all" - + \ No newline at end of file From 5b762bee3c0ebd9ed9ca44c7189a8c6045c13cf4 Mon Sep 17 00:00:00 2001 From: Michael Tarnawa <18899420+mtar@users.noreply.github.com> Date: Tue, 10 Feb 2026 09:58:25 +0100 Subject: [PATCH 03/12] merge scores --- .../weathergen/evaluate/io/wegen_reader.py | 41 +++++++++++++++---- .../src/weathergen/evaluate/run_evaluation.py | 1 - .../src/weathergen/evaluate/utils/utils.py | 1 - 3 files changed, 32 insertions(+), 11 deletions(-) diff --git a/packages/evaluate/src/weathergen/evaluate/io/wegen_reader.py b/packages/evaluate/src/weathergen/evaluate/io/wegen_reader.py index affc240c1..cd9cd9d18 100644 --- a/packages/evaluate/src/weathergen/evaluate/io/wegen_reader.py +++ b/packages/evaluate/src/weathergen/evaluate/io/wegen_reader.py @@ -29,6 +29,7 @@ from weathergen.evaluate.io.io_reader import Reader, ReaderOutput from weathergen.evaluate.scores.score_utils import to_list from weathergen.evaluate.utils.derived_channels import DeriveChannels +from weathergen.evaluate.utils.utils import merge _logger = logging.getLogger(__name__) _logger.setLevel(logging.INFO) @@ -205,7 +206,8 @@ def load_single_score(self, stream: str, region: str, metric: str) -> xr.DataArr Load a single pre-computed score for a given run, stream and metric """ score_path = ( - Path(self.results_base_dir) / "evaluation" + Path(self.results_base_dir) + / "evaluation" / f"{self.run_id}_{stream}_{region}_{metric}_chkpt{self.mini_epoch:05d}.json" ) _logger.debug(f"Looking for: {score_path}") @@ -691,10 +693,20 @@ def _force_consistent_grids(ref: list[xr.DataArray]) -> xr.DataArray: class WeatherGenMergeReader(Reader): - def __init__(self, eval_cfg: dict, run_id: str, private_paths: dict | None = None,/, regions: list[str] | None = None, metrics: list[str] | None = None, *, reader_type: str = 'json'): + def __init__( + self, + eval_cfg: dict, + run_id: str, + private_paths: dict | None = None, + /, + regions: list[str] | None = None, + metrics: list[str] | None = None, + *, + reader_type: str = "json", + ): """ Data reader class for merging WeatherGenerator model outputs stored in Zarr or JSON format. - + Parameters ---------- eval_cfg: dict @@ -715,10 +727,12 @@ def __init__(self, eval_cfg: dict, run_id: str, private_paths: dict | None = Non # TODO: Decide which reader shall be used. for run_id in self.run_ids: - if reader_type == 'zarr': + if reader_type == "zarr": reader = WeatherGenZarrReader(self.eval_cfg, run_id, self.private_paths) else: - reader = WeatherGenJSONReader(self.eval_cfg, run_id, self.private_paths, regions, metrics) + reader = WeatherGenJSONReader( + self.eval_cfg, run_id, self.private_paths, regions, metrics + ) self.readers.append(reader) def get_data( @@ -848,11 +862,20 @@ def load_scores(self, stream: str, regions: str, metrics: str) -> xr.DataArray | missing_metrics: dictionary of missing regions and metrics that need to be recomputed. """ - # TODO: merge the scores + # Merge scores from all JsonReaders if isinstance(self.readers[0], WeatherGenJSONReader): - return self.readers[0].load_scores(stream,regions,metrics) - - + merged_scores = {} + merged_missing = {} + for reader in self.readers: + scores, missing = reader.load_scores(stream, regions, metrics) + merge(merged_scores, scores) + merge(merged_missing, missing) + + _logger.info(f"scores {merged_scores}") + _logger.info(f"missing {merged_missing}") + return merged_scores, merged_missing + + # ZarrReader # TODO: implement this properly. Not it is skipping loading scores local_scores = {} missing_metrics = {} diff --git a/packages/evaluate/src/weathergen/evaluate/run_evaluation.py b/packages/evaluate/src/weathergen/evaluate/run_evaluation.py index 24d627562..02bdde20d 100755 --- a/packages/evaluate/src/weathergen/evaluate/run_evaluation.py +++ b/packages/evaluate/src/weathergen/evaluate/run_evaluation.py @@ -272,7 +272,6 @@ def evaluate_from_config( Configuration input stored as dictionary. """ runs = cfg.run_ids - _logger.info(f"Detected {runs} runs") _logger.info(f"Detected {len(runs)} runs") private_paths = cfg.get("private_paths") summary_dir = Path(cfg.evaluation.get("summary_dir", _DEFAULT_PLOT_DIR)) diff --git a/packages/evaluate/src/weathergen/evaluate/utils/utils.py b/packages/evaluate/src/weathergen/evaluate/utils/utils.py index f075b8346..98c26b8cd 100644 --- a/packages/evaluate/src/weathergen/evaluate/utils/utils.py +++ b/packages/evaluate/src/weathergen/evaluate/utils/utils.py @@ -471,7 +471,6 @@ def metric_list_to_json( # stream_loaded_scores['rmse']['nhem']['ERA5']['jjqce6x5'] reader.metrics_dir.mkdir(parents=True, exist_ok=True) - #_logger.info(f"{reader.mini_epoch}") for metric, metric_stream in metrics_dict.items(): for region in regions: From 08f16f0ff4653ea8d7ef85bc31afb085ba127d0d Mon Sep 17 00:00:00 2001 From: Michael Tarnawa <18899420+mtar@users.noreply.github.com> Date: Tue, 10 Feb 2026 11:22:34 +0100 Subject: [PATCH 04/12] add reader_type jsonmerge --- packages/evaluate/src/weathergen/evaluate/io/wegen_reader.py | 5 +---- packages/evaluate/src/weathergen/evaluate/run_evaluation.py | 4 ++++ packages/evaluate/src/weathergen/evaluate/utils/utils.py | 1 - 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/packages/evaluate/src/weathergen/evaluate/io/wegen_reader.py b/packages/evaluate/src/weathergen/evaluate/io/wegen_reader.py index cd9cd9d18..5c3991b24 100644 --- a/packages/evaluate/src/weathergen/evaluate/io/wegen_reader.py +++ b/packages/evaluate/src/weathergen/evaluate/io/wegen_reader.py @@ -702,7 +702,7 @@ def __init__( regions: list[str] | None = None, metrics: list[str] | None = None, *, - reader_type: str = "json", + reader_type: str = "zarr", ): """ Data reader class for merging WeatherGenerator model outputs stored in Zarr or JSON format. @@ -870,9 +870,6 @@ def load_scores(self, stream: str, regions: str, metrics: str) -> xr.DataArray | scores, missing = reader.load_scores(stream, regions, metrics) merge(merged_scores, scores) merge(merged_missing, missing) - - _logger.info(f"scores {merged_scores}") - _logger.info(f"missing {merged_missing}") return merged_scores, merged_missing # ZarrReader diff --git a/packages/evaluate/src/weathergen/evaluate/run_evaluation.py b/packages/evaluate/src/weathergen/evaluate/run_evaluation.py index 02bdde20d..8b09314f3 100755 --- a/packages/evaluate/src/weathergen/evaluate/run_evaluation.py +++ b/packages/evaluate/src/weathergen/evaluate/run_evaluation.py @@ -173,6 +173,10 @@ def get_reader( reader = WeatherGenJSONReader(run, run_id, private_paths, region, metric) elif reader_type == "merge": reader = WeatherGenMergeReader(run, run_id, private_paths, region, metric) + elif reader_type == "jsonmerge": + reader = WeatherGenMergeReader( + run, run_id, private_paths, region, metric, reader_type="json" + ) else: raise ValueError(f"Unknown reader type: {reader_type}") return reader diff --git a/packages/evaluate/src/weathergen/evaluate/utils/utils.py b/packages/evaluate/src/weathergen/evaluate/utils/utils.py index 98c26b8cd..68796dcb7 100644 --- a/packages/evaluate/src/weathergen/evaluate/utils/utils.py +++ b/packages/evaluate/src/weathergen/evaluate/utils/utils.py @@ -471,7 +471,6 @@ def metric_list_to_json( # stream_loaded_scores['rmse']['nhem']['ERA5']['jjqce6x5'] reader.metrics_dir.mkdir(parents=True, exist_ok=True) - for metric, metric_stream in metrics_dict.items(): for region in regions: metric_now = metric_stream[region][stream] From 5ac2ac458bb1325e1ce88197b0b8d43ec22ae641 Mon Sep 17 00:00:00 2001 From: Michael Tarnawa <18899420+mtar@users.noreply.github.com> Date: Tue, 10 Feb 2026 14:08:58 +0100 Subject: [PATCH 05/12] merge underlying array --- .../evaluate/src/weathergen/evaluate/io/wegen_reader.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/packages/evaluate/src/weathergen/evaluate/io/wegen_reader.py b/packages/evaluate/src/weathergen/evaluate/io/wegen_reader.py index 5c3991b24..bb91f2792 100644 --- a/packages/evaluate/src/weathergen/evaluate/io/wegen_reader.py +++ b/packages/evaluate/src/weathergen/evaluate/io/wegen_reader.py @@ -870,6 +870,14 @@ def load_scores(self, stream: str, regions: str, metrics: str) -> xr.DataArray | scores, missing = reader.load_scores(stream, regions, metrics) merge(merged_scores, scores) merge(merged_missing, missing) + + for metric in merged_scores.keys(): + for region in merged_scores[metric].keys(): + for stream in merged_scores[metric][region].keys(): + scores = (merged_scores[metric][region][stream].pop(run_id, None) for run_id in self.run_ids) + _logger.info(f"scores {scores}") + merged_scores[metric][region][stream].setdefault('merge_test', xr.concat(scores,dim='ens').assign_coords(ens=range(len(self.readers)))) + return merged_scores, merged_missing # ZarrReader From 545fc3624d33c326f03616db5faeb925c51f2e7b Mon Sep 17 00:00:00 2001 From: Michael Tarnawa <18899420+mtar@users.noreply.github.com> Date: Tue, 10 Feb 2026 14:29:22 +0100 Subject: [PATCH 06/12] set run_id key --- packages/evaluate/src/weathergen/evaluate/io/wegen_reader.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/evaluate/src/weathergen/evaluate/io/wegen_reader.py b/packages/evaluate/src/weathergen/evaluate/io/wegen_reader.py index bb91f2792..e35443ca8 100644 --- a/packages/evaluate/src/weathergen/evaluate/io/wegen_reader.py +++ b/packages/evaluate/src/weathergen/evaluate/io/wegen_reader.py @@ -876,7 +876,7 @@ def load_scores(self, stream: str, regions: str, metrics: str) -> xr.DataArray | for stream in merged_scores[metric][region].keys(): scores = (merged_scores[metric][region][stream].pop(run_id, None) for run_id in self.run_ids) _logger.info(f"scores {scores}") - merged_scores[metric][region][stream].setdefault('merge_test', xr.concat(scores,dim='ens').assign_coords(ens=range(len(self.readers)))) + merged_scores[metric][region][stream].setdefault(self.run_id, xr.concat(scores,dim='ens').assign_coords(ens=range(len(self.readers)))) return merged_scores, merged_missing From 8b2a4e96f4cf4d17689cfda3a2afb7aa06cec230 Mon Sep 17 00:00:00 2001 From: Michael Tarnawa <18899420+mtar@users.noreply.github.com> Date: Tue, 10 Feb 2026 15:24:47 +0100 Subject: [PATCH 07/12] add comments --- packages/evaluate/src/weathergen/evaluate/io/wegen_reader.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/packages/evaluate/src/weathergen/evaluate/io/wegen_reader.py b/packages/evaluate/src/weathergen/evaluate/io/wegen_reader.py index e35443ca8..ec1350c2d 100644 --- a/packages/evaluate/src/weathergen/evaluate/io/wegen_reader.py +++ b/packages/evaluate/src/weathergen/evaluate/io/wegen_reader.py @@ -862,15 +862,18 @@ def load_scores(self, stream: str, regions: str, metrics: str) -> xr.DataArray | missing_metrics: dictionary of missing regions and metrics that need to be recomputed. """ - # Merge scores from all JsonReaders + # merge scores from all JsonReaders if isinstance(self.readers[0], WeatherGenJSONReader): merged_scores = {} merged_missing = {} + + # deep merge dicts for reader in self.readers: scores, missing = reader.load_scores(stream, regions, metrics) merge(merged_scores, scores) merge(merged_missing, missing) + # merge runs into one with all scores concatenated for metric in merged_scores.keys(): for region in merged_scores[metric].keys(): for stream in merged_scores[metric][region].keys(): From 54e064e0b3e0b861cf22bdf311e1cc8640ba6eaa Mon Sep 17 00:00:00 2001 From: Michael Tarnawa <18899420+mtar@users.noreply.github.com> Date: Tue, 10 Feb 2026 15:48:32 +0100 Subject: [PATCH 08/12] update docstring --- .../weathergen/evaluate/io/wegen_reader.py | 25 ++++++++++++++++--- 1 file changed, 21 insertions(+), 4 deletions(-) diff --git a/packages/evaluate/src/weathergen/evaluate/io/wegen_reader.py b/packages/evaluate/src/weathergen/evaluate/io/wegen_reader.py index ec1350c2d..12d6c3d73 100644 --- a/packages/evaluate/src/weathergen/evaluate/io/wegen_reader.py +++ b/packages/evaluate/src/weathergen/evaluate/io/wegen_reader.py @@ -715,6 +715,13 @@ def __init__( run id of the model private_paths: dict dictionary of private paths for the supported HPC + regions: list[str] + names of predefined bounding box for a region + metrics: list[str] + names of the metric scores to compute + reader_type: str + The type of the internal reader. If zarr, WeatherGenZarrReader is used, + WeatherGenJSONReader otherwise. Default: zarr """ super().__init__(eval_cfg, run_id, private_paths) self.run_ids = eval_cfg.get("merge_run_ids", []) @@ -841,7 +848,9 @@ def _concat_over_ens(self, da_merge, fsteps_merge): return da_ens - def load_scores(self, stream: str, regions: str, metrics: str) -> xr.DataArray | None: + def load_scores( + self, stream: str, regions: list[str], metrics: list[str] + ) -> xr.DataArray | None: """ Load the pre-computed scores for a given run, stream and metric and epoch. @@ -866,7 +875,7 @@ def load_scores(self, stream: str, regions: str, metrics: str) -> xr.DataArray | if isinstance(self.readers[0], WeatherGenJSONReader): merged_scores = {} merged_missing = {} - + # deep merge dicts for reader in self.readers: scores, missing = reader.load_scores(stream, regions, metrics) @@ -877,9 +886,17 @@ def load_scores(self, stream: str, regions: str, metrics: str) -> xr.DataArray | for metric in merged_scores.keys(): for region in merged_scores[metric].keys(): for stream in merged_scores[metric][region].keys(): - scores = (merged_scores[metric][region][stream].pop(run_id, None) for run_id in self.run_ids) + scores = ( + merged_scores[metric][region][stream].pop(run_id, None) + for run_id in self.run_ids + ) _logger.info(f"scores {scores}") - merged_scores[metric][region][stream].setdefault(self.run_id, xr.concat(scores,dim='ens').assign_coords(ens=range(len(self.readers)))) + merged_scores[metric][region][stream].setdefault( + self.run_id, + xr.concat(scores, dim="ens").assign_coords( + ens=range(len(self.readers)) + ), + ) return merged_scores, merged_missing From 16ed180575c31a4c7a11f7324a5217c2075e2ce5 Mon Sep 17 00:00:00 2001 From: Michael Tarnawa <18899420+mtar@users.noreply.github.com> Date: Tue, 10 Feb 2026 15:48:50 +0100 Subject: [PATCH 09/12] add config example --- config/evaluate/eval_config.yml | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/config/evaluate/eval_config.yml b/config/evaluate/eval_config.yml index 82c3d7525..407a73608 100644 --- a/config/evaluate/eval_config.yml +++ b/config/evaluate/eval_config.yml @@ -138,6 +138,25 @@ run_ids : sample: [0, 1, 2, 3] ensemble: "all" #supported + #WeatherGeneratorMerge JSON example: + ############################### + + #Example of syntax to stack multiple runs over the ensemble dimension + merge_test: #<------ This is the new run_id name of the merged dataset. NB. you always need to specify one + type: "jsonmerge" # <------- VERY IMPORTANT + merge_run_ids: + - so67dku4 + - c9cg8ql3 + metrics_dir: "./merge_test/metrics/" #<------- VERY IMPORTANT + label: "Merged Results" + results_base_dir : "./results/" + streams: + ERA5: + channels: ["z_500", "t_850", "u_850", "v_850", "q_850"] + evaluation: + forecast_step: [2,4,6] + sample: [0, 1, 2, 3] + ensemble: "all" # CSV Reader example: ############################### From e184c4c6bc9837806cbadd6edb7480d66b090b63 Mon Sep 17 00:00:00 2001 From: Michael Tarnawa <18899420+mtar@users.noreply.github.com> Date: Tue, 17 Feb 2026 09:43:12 +0100 Subject: [PATCH 10/12] remove opt arguments --- packages/evaluate/src/weathergen/evaluate/io/wegen_reader.py | 2 -- packages/evaluate/src/weathergen/evaluate/run_evaluation.py | 2 +- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/packages/evaluate/src/weathergen/evaluate/io/wegen_reader.py b/packages/evaluate/src/weathergen/evaluate/io/wegen_reader.py index 12d6c3d73..13b19fa10 100644 --- a/packages/evaluate/src/weathergen/evaluate/io/wegen_reader.py +++ b/packages/evaluate/src/weathergen/evaluate/io/wegen_reader.py @@ -698,10 +698,8 @@ def __init__( eval_cfg: dict, run_id: str, private_paths: dict | None = None, - /, regions: list[str] | None = None, metrics: list[str] | None = None, - *, reader_type: str = "zarr", ): """ diff --git a/packages/evaluate/src/weathergen/evaluate/run_evaluation.py b/packages/evaluate/src/weathergen/evaluate/run_evaluation.py index 8b09314f3..70dd18dd4 100755 --- a/packages/evaluate/src/weathergen/evaluate/run_evaluation.py +++ b/packages/evaluate/src/weathergen/evaluate/run_evaluation.py @@ -172,7 +172,7 @@ def get_reader( elif reader_type == "json": reader = WeatherGenJSONReader(run, run_id, private_paths, region, metric) elif reader_type == "merge": - reader = WeatherGenMergeReader(run, run_id, private_paths, region, metric) + reader = WeatherGenMergeReader(run, run_id, private_paths) elif reader_type == "jsonmerge": reader = WeatherGenMergeReader( run, run_id, private_paths, region, metric, reader_type="json" From 78bfeed52a86ff616f9de21ea66e7ba3ad2a03ea Mon Sep 17 00:00:00 2001 From: Michael Tarnawa <18899420+mtar@users.noreply.github.com> Date: Tue, 17 Feb 2026 10:42:19 +0100 Subject: [PATCH 11/12] same var names scores --- .../weathergen/evaluate/io/wegen_reader.py | 38 ++++++++----------- 1 file changed, 16 insertions(+), 22 deletions(-) diff --git a/packages/evaluate/src/weathergen/evaluate/io/wegen_reader.py b/packages/evaluate/src/weathergen/evaluate/io/wegen_reader.py index 13b19fa10..7782cab1e 100644 --- a/packages/evaluate/src/weathergen/evaluate/io/wegen_reader.py +++ b/packages/evaluate/src/weathergen/evaluate/io/wegen_reader.py @@ -869,44 +869,38 @@ def load_scores( missing_metrics: dictionary of missing regions and metrics that need to be recomputed. """ - # merge scores from all JsonReaders - if isinstance(self.readers[0], WeatherGenJSONReader): - merged_scores = {} - merged_missing = {} + local_scores = {} + missing_metrics = {} + if isinstance(self.readers[0], WeatherGenZarrReader): + # TODO: implement this properly. Not it is skipping loading scores + for region in regions: + for metric in metrics: + # all other cases: recompute scores + missing_metrics.setdefault(region, []).append(metric) + else: #JsonReader # deep merge dicts for reader in self.readers: scores, missing = reader.load_scores(stream, regions, metrics) - merge(merged_scores, scores) - merge(merged_missing, missing) + merge(local_scores, scores) + merge(missing_metrics, missing) # merge runs into one with all scores concatenated - for metric in merged_scores.keys(): - for region in merged_scores[metric].keys(): - for stream in merged_scores[metric][region].keys(): + for metric in local_scores.keys(): + for region in local_scores[metric].keys(): + for stream in local_scores[metric][region].keys(): scores = ( - merged_scores[metric][region][stream].pop(run_id, None) + local_scores[metric][region][stream].pop(run_id, None) for run_id in self.run_ids ) _logger.info(f"scores {scores}") - merged_scores[metric][region][stream].setdefault( + local_scores[metric][region][stream].setdefault( self.run_id, xr.concat(scores, dim="ens").assign_coords( ens=range(len(self.readers)) ), ) - return merged_scores, merged_missing - - # ZarrReader - # TODO: implement this properly. Not it is skipping loading scores - local_scores = {} - missing_metrics = {} - for region in regions: - for metric in metrics: - # all other cases: recompute scores - missing_metrics.setdefault(region, []).append(metric) - return local_scores, missing_metrics def get_climatology_filename(self, stream: str) -> str | None: From 33d1ec3c1cd16c0e6c129b23532d7e2e777471d8 Mon Sep 17 00:00:00 2001 From: Michael Tarnawa <18899420+mtar@users.noreply.github.com> Date: Tue, 17 Feb 2026 13:13:19 +0100 Subject: [PATCH 12/12] remove debug message --- .../evaluate/src/weathergen/evaluate/io/wegen_reader.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/packages/evaluate/src/weathergen/evaluate/io/wegen_reader.py b/packages/evaluate/src/weathergen/evaluate/io/wegen_reader.py index 7782cab1e..4da20fd0c 100644 --- a/packages/evaluate/src/weathergen/evaluate/io/wegen_reader.py +++ b/packages/evaluate/src/weathergen/evaluate/io/wegen_reader.py @@ -730,7 +730,8 @@ def __init__( _logger.info(f"MERGE READERS: {self.run_ids} ...") - # TODO: Decide which reader shall be used. + + for run_id in self.run_ids: if reader_type == "zarr": reader = WeatherGenZarrReader(self.eval_cfg, run_id, self.private_paths) @@ -890,10 +891,9 @@ def load_scores( for region in local_scores[metric].keys(): for stream in local_scores[metric][region].keys(): scores = ( - local_scores[metric][region][stream].pop(run_id, None) + local_scores[metric][region][stream].pop(run_id) for run_id in self.run_ids ) - _logger.info(f"scores {scores}") local_scores[metric][region][stream].setdefault( self.run_id, xr.concat(scores, dim="ens").assign_coords(