diff --git a/improver/calibration/load_and_train_quantile_regression_random_forest.py b/improver/calibration/load_and_train_quantile_regression_random_forest.py index 8c6ff23683..db8845b948 100644 --- a/improver/calibration/load_and_train_quantile_regression_random_forest.py +++ b/improver/calibration/load_and_train_quantile_regression_random_forest.py @@ -322,8 +322,8 @@ def process( if len(missing_features) > 0: msg = ( "The features requested in the feature_config are absent from " - "the forecast parquet file and the input cubes. The missing fields are: " - f"{missing_features}." + "the forecast parquet file and the input cubes. " + f"The missing fields are: {missing_features}." ) raise ValueError(msg) return forecast_df, truth_df, cube_inputs @@ -421,9 +421,12 @@ def _add_static_features_from_cubes_to_df( constr = iris.Constraint(name=feature_name) # Static features can be provided either as a cube or as a column # in the forecast DataFrame. - if not cube_inputs.extract(constr): + try: + feature_cube = cube_inputs.extract_cube(constr) + except iris.exceptions.ConstraintMismatchError: + feature_cube = None + if not feature_cube: continue - feature_cube = cube_inputs.extract_cube(constr) feature_df = as_data_frame(feature_cube, add_aux_coords=True) forecast_df = forecast_df.merge( feature_df[[*self.unique_site_id_keys, feature_name]], @@ -448,14 +451,18 @@ def filter_bad_sites( - DataFrame containing the forecast data with bad sites removed. - DataFrame containing the truth data with bad sites removed. """ - truth_df.dropna(subset=["ob_value"], inplace=True) + truth_df.dropna(subset=["ob_value"] + [*self.unique_site_id_keys], inplace=True) if truth_df.empty: msg = "Empty truth DataFrame after removing NaNs." raise ValueError(msg) - forecast_index = forecast_df.set_index([*self.unique_site_id_keys]).index - truth_index = truth_df.set_index([*self.unique_site_id_keys]).index + # Include time in the index, so that forecasts will be dropped if they + # correspond to a site and time that is not in the truth data. + forecast_index = forecast_df.set_index( + [*self.unique_site_id_keys] + ["time"] + ).index + truth_index = truth_df.set_index([*self.unique_site_id_keys] + ["time"]).index forecast_df = forecast_df[forecast_index.isin(truth_index)] truth_df = truth_df[truth_index.isin(forecast_index)] diff --git a/improver/calibration/quantile_regression_random_forest.py b/improver/calibration/quantile_regression_random_forest.py index 67cc09ebac..2c8e90b352 100644 --- a/improver/calibration/quantile_regression_random_forest.py +++ b/improver/calibration/quantile_regression_random_forest.py @@ -96,14 +96,9 @@ def prep_feature( # For a subset of the input DataFrame compute the mean or standard deviation # over the representation column, grouped by the groupby columns. 
- if feature_name == "mean": - subset_df = df[subset_cols].groupby(groupby_cols).mean() - elif feature_name == "std": - subset_df = df[subset_cols].groupby(groupby_cols).std() - elif feature_name == "min": - subset_df = df[subset_cols].groupby(groupby_cols).min() - elif feature_name == "max": - subset_df = df[subset_cols].groupby(groupby_cols).max() + if feature_name in ["min", "max", "mean", "std"]: + subset_grouped = df[subset_cols].groupby(groupby_cols) + subset_df = getattr(subset_grouped, feature_name)() elif feature_name.startswith("members_below"): threshold = float(feature_name.split("_")[2]) if transformation is not None: @@ -119,7 +114,6 @@ def prep_feature( ) subset_df.rename(variable_name, inplace=True) subset_df = subset_df.astype(orig_dtype) - # subset_df[variable_name].astype(orig_dtype) elif feature_name.startswith("members_above"): threshold = float(feature_name.split("_")[2]) if transformation is not None: diff --git a/improver/cli/apply_quantile_regression_random_forest.py b/improver/cli/apply_quantile_regression_random_forest.py index cb0bd00407..765406d6ca 100644 --- a/improver/cli/apply_quantile_regression_random_forest.py +++ b/improver/cli/apply_quantile_regression_random_forest.py @@ -26,8 +26,11 @@ def process( Args: file_paths (cli.inputpaths): A list of input paths containing: - - The path to a QRF trained model in pickle file format to be used - for calibration. + - The path to the pickle file produced by training the QRF model. + The pickle file contains the QRF model and the transformation and + pre_transform_addition values if a transformation was applied. If no + transformation was applied then the transformation and + pre_transform_addition values will be None and 0, respectively. - The path to a NetCDF file containing the forecast to be calibrated. - Optionally, paths to NetCDF files containing additional predictors. feature_config (dict): @@ -50,8 +53,6 @@ def process( A string containing the CF name of the forecast to be calibrated e.g. air_temperature. This will be used to separate it from the rest of the feature cubes, if present. - The names of the coordinates that uniquely identify each site, - e.g. "wmo_id" or "latitude,longitude". unique_site_id_keys (str): The names of the coordinates that uniquely identify each site, e.g. "wmo_id" or "latitude,longitude". diff --git a/improver/cli/train_quantile_regression_random_forest.py b/improver/cli/train_quantile_regression_random_forest.py index f09a5f4153..182e8e09c6 100644 --- a/improver/cli/train_quantile_regression_random_forest.py +++ b/improver/cli/train_quantile_regression_random_forest.py @@ -153,5 +153,7 @@ def process( unique_site_id_keys=unique_site_id_keys, **kwargs, )(forecast_df, truth_df, cube_inputs) + if result == (None, None, None): + return None return result diff --git a/improver_tests/acceptance/test_train_quantile_regression_random_forest.py b/improver_tests/acceptance/test_train_quantile_regression_random_forest.py index 97669d4878..40205a6f3a 100644 --- a/improver_tests/acceptance/test_train_quantile_regression_random_forest.py +++ b/improver_tests/acceptance/test_train_quantile_regression_random_forest.py @@ -114,3 +114,51 @@ def test_missing_inputs( ] assert run_cli(compulsory_args + named_args) is None + # Check no file has been written to disk. 
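+    # run_cli returns None when no model can be trained, so nothing is saved.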
+    assert not output_path.exists()
+
+
+def test_invalid_cycletime(
+    tmp_path,
+):
+    """
+    Test the train-quantile-regression-random-forest CLI when no training can
+    occur because the parquet files contain no valid data for the cycletime
+    provided.
+    """
+    kgo_dir = acc.kgo_root() / CLI
+    config_path = kgo_dir / "config.json"
+    output_path = tmp_path / "output.pickle"
+    history_path = kgo_dir / "spot_calibration_tables"
+    truth_path = kgo_dir / "spot_observation_tables"
+    compulsory_args = [history_path, truth_path]
+    named_args = [
+        "--feature-config",
+        config_path,
+        "--parquet-diagnostic-names",
+        "temperature_at_screen_level",
+        "--target-cf-name",
+        "air_temperature",
+        "--forecast-periods",
+        "6:18:6",
+        "--cycletime",
+        "20250704T0000Z",
+        "--training-length",
+        "2",
+        "--experiment",
+        "mix-latestblend",
+        "--n-estimators",
+        "10",
+        "--max-depth",
+        "5",
+        "--random-state",
+        "42",
+        "--compression-level",
+        "5",
+        "--output",
+        output_path,
+    ]
+
+    assert run_cli(compulsory_args + named_args) is None
+    # Check no file has been written to disk.
+    assert not output_path.exists()
diff --git a/improver_tests/calibration/quantile_regression_random_forests_calibration/test_load_and_apply_quantile_regression_random_forest.py b/improver_tests/calibration/quantile_regression_random_forests_calibration/test_load_and_apply_quantile_regression_random_forest.py
index f64f7aacd8..e4417122d5 100644
--- a/improver_tests/calibration/quantile_regression_random_forests_calibration/test_load_and_apply_quantile_regression_random_forest.py
+++ b/improver_tests/calibration/quantile_regression_random_forests_calibration/test_load_and_apply_quantile_regression_random_forest.py
@@ -43,6 +43,68 @@ def _add_day_of_training_period_to_cube(cube, day_of_training_period, secondary_
     return cube
 
 
+@pytest.fixture
+def set_up_for_unexpected():
+    """Set up common elements shared by the tests of unexpected situations."""
+    feature_config = {"wind_speed_at_10m": ["mean", "std", "latitude", "longitude"]}
+
+    n_estimators = 2
+    max_depth = 2
+    random_state = 55
+    transformation = None
+    pre_transform_addition = 0
+    extra_kwargs = {}
+    include_static = False
+    quantiles = [0.5]
+
+    qrf_model = _run_train_qrf(
+        feature_config,
+        n_estimators,
+        max_depth,
+        random_state,
+        transformation,
+        pre_transform_addition,
+        extra_kwargs,
+        include_static,
+        forecast_reference_times=[
+            "20170101T0000Z",
+            "20170102T0000Z",
+        ],
+        validity_times=[
+            "20170101T1200Z",
+            "20170102T1200Z",
+        ],
+        realization_data=[2, 6, 10],
+        truth_data=[4.2, 6.2, 4.1, 5.1],
+    )
+
+    frt = "20170103T0000Z"
+    vt = "20170103T1200Z"
+    data = np.arange(6, (len(quantiles) * 6) + 1, 6)
+    day_of_training_period = 2
+    forecast_cube = _create_forecasts(frt, vt, data, return_cube=True)
+    forecast_cube = _add_day_of_training_period_to_cube(
+        forecast_cube, day_of_training_period, "forecast_reference_time"
+    )
+    cube_inputs = CubeList([forecast_cube])
+    ancil_cube = _create_ancil_file(return_cube=True)
+    cube_inputs.append(ancil_cube)
+
+    plugin = PrepareAndApplyQRF(
+        feature_config,
+        "wind_speed_at_10m",
+    )
+    return (
+        qrf_model,
+        transformation,
+        pre_transform_addition,
+        cube_inputs,
+        forecast_cube,
+        ancil_cube,
+        plugin,
+    )
+
+
+# Disable ruff formatting to keep the parameter combinations aligned for readability.
# fmt: off @pytest.mark.parametrize("percentile_input", [True, False]) @@ -97,10 +159,6 @@ def test_prepare_and_apply_qrf( if include_static: feature_config["distance_to_water"] = ["static"] - unique_site_id_key = site_id[0] - if site_id == ["latitude", "longitude", "altitude"]: - unique_site_id_key = "station_id" - qrf_model = _run_train_qrf( feature_config, n_estimators, @@ -120,7 +178,7 @@ def test_prepare_and_apply_qrf( ], realization_data=[2, 6, 10], truth_data=[4.2, 6.2, 4.1, 5.1], - site_id=unique_site_id_key, + site_id=site_id, ) frt = "20170103T0000Z" @@ -198,116 +256,122 @@ def test_prepare_and_apply_qrf( assert np.allclose(result.coord("realization").points, range(len(quantiles))) -@pytest.mark.parametrize( - "exception", - [ - "no_model_output", - "no_features", - "missing_target_feature", - "missing_static_feature", - "missing_dynamic_feature", - "no_quantile_forest_package", - ], -) -def test_unexpected( - exception, -): - """Test PrepareAndApplyQRF plugin behaviour in atypical situations.""" - feature_config = {"wind_speed_at_10m": ["mean", "std", "latitude", "longitude"]} +def test_no_model_output(set_up_for_unexpected): + """Test PrepareAndApplyQRF plugin behaviour when no model is provided.""" + ( + qrf_model, + transformation, + pre_transform_addition, + cube_inputs, + forecast_cube, + ancil_cube, + plugin, + ) = set_up_for_unexpected - n_estimators = 2 - max_depth = 2 - random_state = 55 - transformation = None - pre_transform_addition = 0 - extra_kwargs = {} - include_static = False - quantiles = [0.5] + with pytest.warns(UserWarning, match="Unable to apply Quantile Regression Random Forest model."): + result = plugin(cube_inputs, qrf_descriptors=None) + assert isinstance(result, Cube) + assert result.name() == "wind_speed_at_10m" + assert result.units == "m s-1" + assert result.data.shape == forecast_cube.data.shape + assert np.allclose(result.data, forecast_cube.data) - qrf_model = _run_train_qrf( - feature_config, - n_estimators, - max_depth, - random_state, + +def test_no_features(set_up_for_unexpected): + """Test PrepareAndApplyQRF plugin behaviour when no features are provided.""" + ( + qrf_model, transformation, pre_transform_addition, - extra_kwargs, - include_static, - forecast_reference_times=[ - "20170101T0000Z", - "20170102T0000Z", - ], - validity_times=[ - "20170101T1200Z", - "20170102T1200Z", - ], - realization_data=[2, 6, 10], - truth_data=[4.2, 6.2, 4.1, 5.1], - ) + cube_inputs, + forecast_cube, + ancil_cube, + plugin, + ) = set_up_for_unexpected - frt = "20170103T0000Z" - vt = "20170103T1200Z" - data = np.arange(6, (len(quantiles) * 6) + 1, 6) - day_of_training_period = 2 - forecast_cube = _create_forecasts(frt, vt, data, return_cube=True) - forecast_cube = _add_day_of_training_period_to_cube( - forecast_cube, day_of_training_period, "forecast_reference_time" - ) - cube_inputs = CubeList([forecast_cube]) - ancil_cube = _create_ancil_file(return_cube=True) - cube_inputs.append(ancil_cube) + qrf_descriptors = (qrf_model, transformation, pre_transform_addition) + with pytest.raises(ValueError, match="No target forecast provided."): + plugin(CubeList(), qrf_descriptors=qrf_descriptors) - plugin = PrepareAndApplyQRF( - feature_config, - "wind_speed_at_10m", - ) +def test_missing_target_feature(set_up_for_unexpected): + """Test PrepareAndApplyQRF plugin behaviour when the target feature is missing.""" + ( + qrf_model, + transformation, + pre_transform_addition, + cube_inputs, + forecast_cube, + ancil_cube, + plugin, + ) = set_up_for_unexpected - if exception 
== "no_model_output": - with pytest.warns(UserWarning, match="Unable to apply Quantile Regression Random Forest model."): - result = plugin(cube_inputs, qrf_descriptors=None) - assert isinstance(result, Cube) - assert result.name() == "wind_speed_at_10m" - assert result.units == "m s-1" - assert result.data.shape == forecast_cube.data.shape - assert np.allclose(result.data, forecast_cube.data) - elif exception == "no_features": - qrf_descriptors = (qrf_model, transformation, pre_transform_addition) - with pytest.raises(ValueError, match="No target forecast provided."): - plugin(CubeList(), qrf_descriptors=qrf_descriptors) - elif exception == "missing_target_feature": - qrf_descriptors = (qrf_model, transformation, pre_transform_addition) - with pytest.raises(ValueError, match="No target forecast provided."): - plugin( - CubeList([ancil_cube]), - qrf_descriptors=qrf_descriptors, - ) - elif exception == "missing_static_feature": - qrf_descriptors = (qrf_model, transformation, pre_transform_addition) - feature_config = { - "wind_speed_at_10m": ["mean", "std"], - "distance_to_water": ["static"], - } - plugin.feature_config = feature_config - with pytest.raises(ValueError, match="The number of cubes loaded."): - plugin(CubeList([forecast_cube]), qrf_descriptors=qrf_descriptors) - elif exception == "missing_dynamic_feature": - qrf_descriptors = (qrf_model, transformation, pre_transform_addition) - feature_config = { - "wind_speed_at_10m": ["mean", "std"], - "air_temperature": ["mean", "std"], - } - plugin.feature_config = feature_config - with pytest.raises(ValueError, match="The number of cubes loaded."): - plugin(CubeList([forecast_cube]), qrf_descriptors=qrf_descriptors) - elif exception == "no_quantile_forest_package": - qrf_descriptors = (qrf_model, transformation, pre_transform_addition) - plugin.quantile_forest_installed = False - with pytest.warns(UserWarning, match="Unable to apply Quantile Regression Random Forest model."): - result = plugin(CubeList([forecast_cube]), qrf_descriptors=qrf_descriptors) - assert isinstance(result, Cube) - assert result.name() == "wind_speed_at_10m" - assert result.units == "m s-1" - assert result.data.shape == forecast_cube.data.shape - assert np.allclose(result.data, forecast_cube.data) - else: - raise ValueError(f"Unknown exception type: {exception}") + qrf_descriptors = (qrf_model, transformation, pre_transform_addition) + with pytest.raises(ValueError, match="No target forecast provided."): + plugin( + CubeList([ancil_cube]), + qrf_descriptors=qrf_descriptors, + ) + +def test_missing_static_feature(set_up_for_unexpected): + """Test PrepareAndApplyQRF plugin behaviour when a static feature is missing.""" + ( + qrf_model, + transformation, + pre_transform_addition, + cube_inputs, + forecast_cube, + ancil_cube, + plugin, + ) = set_up_for_unexpected + + qrf_descriptors = (qrf_model, transformation, pre_transform_addition) + feature_config = { + "wind_speed_at_10m": ["mean", "std"], + "distance_to_water": ["static"], + } + plugin.feature_config = feature_config + with pytest.raises(ValueError, match="The number of cubes loaded."): + plugin(CubeList([forecast_cube]), qrf_descriptors=qrf_descriptors) + +def test_missing_dynamic_feature(set_up_for_unexpected): + """Test PrepareAndApplyQRF plugin behaviour when a dynamic feature is missing.""" + ( + qrf_model, + transformation, + pre_transform_addition, + cube_inputs, + forecast_cube, + ancil_cube, + plugin, + ) = set_up_for_unexpected + + qrf_descriptors = (qrf_model, transformation, pre_transform_addition) + 
feature_config = { + "wind_speed_at_10m": ["mean", "std"], + "air_temperature": ["mean", "std"], + } + plugin.feature_config = feature_config + with pytest.raises(ValueError, match="The number of cubes loaded."): + plugin(CubeList([forecast_cube]), qrf_descriptors=qrf_descriptors) + +def test_no_quantile_forest_package(set_up_for_unexpected): + """Test PrepareAndApplyQRF plugin behaviour when the quantile_forest package is not installed.""" + ( + qrf_model, + transformation, + pre_transform_addition, + cube_inputs, + forecast_cube, + ancil_cube, + plugin, + ) = set_up_for_unexpected + + qrf_descriptors = (qrf_model, transformation, pre_transform_addition) + plugin.quantile_forest_installed = False + with pytest.warns(UserWarning, match="Unable to apply Quantile Regression Random Forest model."): + result = plugin(CubeList([forecast_cube]), qrf_descriptors=qrf_descriptors) + assert isinstance(result, Cube) + assert result.name() == "wind_speed_at_10m" + assert result.units == "m s-1" + assert result.data.shape == forecast_cube.data.shape + assert np.allclose(result.data, forecast_cube.data) diff --git a/improver_tests/calibration/quantile_regression_random_forests_calibration/test_load_and_train_quantile_regression_random_forest.py b/improver_tests/calibration/quantile_regression_random_forests_calibration/test_load_and_train_quantile_regression_random_forest.py index 5b34a52ab8..46d81e910a 100644 --- a/improver_tests/calibration/quantile_regression_random_forests_calibration/test_load_and_train_quantile_regression_random_forest.py +++ b/improver_tests/calibration/quantile_regression_random_forests_calibration/test_load_and_train_quantile_regression_random_forest.py @@ -24,6 +24,93 @@ SITE_ID = ["03001", "03002", "03003", "03004", "03005"] +def write_to_parquet(df, tmp_path, dirname, filename): + """Write a DataFrame to a parquet file. + + Args: + df: DataFrame to write to a parquet file. + tmp_path: Temporary path to save the parquet file. + dirname: Directory name. + filename: Name of parquet file. + + Returns: + Output directory containing the parquet file. + """ + output_dir = tmp_path / dirname + output_dir.mkdir(parents=True, exist_ok=True) + output_path = output_dir / filename + df.to_parquet(output_path, index=False, engine="pyarrow") + return output_dir + + +def add_wind_data_to_forecasts(data_dict, wind_speed_values, wind_dir_values): + """Add wind speed and direction data to a data dictionary. + + Args: + data_dict: Dictionary containing the data. + wind_speed_values: List of wind speed values to add. + wind_dir_values: List of wind direction values to add. + Returns: + A DataFrame containing the original data along with wind speed and + direction data. + """ + # Add wind speed to demonstrate filtering. + wind_speed_dict = data_dict.copy() + wind_speed_dict["forecast"] = wind_speed_values + wind_speed_dict["cf_name"] = "wind_speed" + wind_speed_dict["units"] = "m s-1" + wind_speed_dict["diagnostic"] = "wind_speed_at_10m" + wind_dir_dict = data_dict.copy() + wind_dir_dict["forecast"] = wind_dir_values + wind_dir_dict["cf_name"] = "wind_direction" + wind_dir_dict["units"] = "degrees" + wind_dir_dict["diagnostic"] = "wind_from_direction" + data_df = pd.DataFrame(data_dict) + wind_speed_df = pd.DataFrame(wind_speed_dict) + wind_dir_df = pd.DataFrame(wind_dir_dict) + return pd.concat([data_df, wind_speed_df, wind_dir_df], ignore_index=True) + + +def add_wind_data_to_truth(data_dict, wind_speed_values): + """Add wind speed data to a data dictionary. 
+ + Args: + data_dict: Dictionary containing the data. + wind_speed_values: List of wind speed values to add. + + Returns: + A DataFrame containing the original data along with wind speed data. + """ + wind_speed_dict = data_dict.copy() + wind_speed_dict["ob_value"] = wind_speed_values + wind_speed_dict["diagnostic"] = "wind_speed_at_10m" + data_df = pd.DataFrame(data_dict) + wind_speed_df = pd.DataFrame(wind_speed_dict) + return pd.concat([data_df, wind_speed_df], ignore_index=True) + + +def modify_representation(data_dict, representation, values): + """Modify the ensemble representation in a data dictionary. + + Args: + data_dict: Dictionary containing the data. + representation: The type of ensemble representation to use. Options are + "percentile", "realization" or "kittens". "kittens" is just used for testing + that the code works with a non-standard name. + values: List of values to use for the new representation. + + Returns: + Amended dictionary. + """ + if representation == "realization": + data_dict["realization"] = values + data_dict.pop("percentile") + elif representation == "kittens": + data_dict["kittens"] = values + data_dict.pop("percentile") + return data_dict + + def _create_multi_site_forecast_parquet_file(tmp_path, representation=None): """Create a parquet file with multi-site forecast data. @@ -54,34 +141,15 @@ def _create_multi_site_forecast_parquet_file(tmp_path, representation=None): "diagnostic": ["temperature_at_screen_level"] * 5, } # Other representations used for testing. - if representation == "realization": - data_dict["realization"] = list(range(len(data_dict["percentile"]))) - data_dict.pop("percentile") - elif representation == "kittens": - data_dict["kittens"] = list(range(len(data_dict["percentile"]))) - data_dict.pop("percentile") - - # Add wind speed to demonstrate filtering. - wind_speed_dict = data_dict.copy() - wind_speed_dict["forecast"] = [8, 19, 16, 12, 10] - wind_speed_dict["cf_name"] = "wind_speed" - wind_speed_dict["units"] = "m s-1" - wind_speed_dict["diagnostic"] = "wind_speed_at_10m" - wind_dir_dict = data_dict.copy() - wind_dir_dict["forecast"] = [90, 100, 110, 120, 130] - wind_dir_dict["cf_name"] = "wind_direction" - wind_dir_dict["units"] = "degrees" - wind_dir_dict["diagnostic"] = "wind_from_direction" - data_df = pd.DataFrame(data_dict) - wind_speed_df = pd.DataFrame(wind_speed_dict) - wind_dir_df = pd.DataFrame(wind_dir_dict) - joined_df = pd.concat([data_df, wind_speed_df, wind_dir_df], ignore_index=True) - - output_dir = tmp_path / "forecast_parquet_files" - output_dir.mkdir(parents=True, exist_ok=True) - output_path = output_dir / "forecast.parquet" - joined_df.to_parquet(output_path, index=False, engine="pyarrow") - + data_dict = modify_representation( + data_dict, representation, list(range(len(data_dict["percentile"]))) + ) + joined_df = add_wind_data_to_forecasts( + data_dict, [8, 19, 16, 12, 10], [90, 100, 110, 120, 130] + ) + output_dir = write_to_parquet( + joined_df, tmp_path, "forecast_parquet_files", "forecast.parquet" + ) return output_dir, joined_df, data_dict["wmo_id"] @@ -115,32 +183,15 @@ def _create_multi_percentile_forecast_parquet_file(tmp_path, representation=None "diagnostic": ["temperature_at_screen_level"] * 5, } # Other representations used for testing. 
- if representation == "realization": - data_dict["realization"] = list(range(len(data_dict["percentile"]))) - data_dict.pop("percentile") - elif representation == "kittens": - data_dict["kittens"] = list(range(len(data_dict["percentile"]))) - data_dict.pop("percentile") - # Add wind speed to demonstrate filtering. - wind_speed_dict = data_dict.copy() - wind_speed_dict["forecast"] = [6, 10, 11, 12, 15] - wind_speed_dict["cf_name"] = "wind_speed" - wind_speed_dict["units"] = "m s-1" - wind_speed_dict["diagnostic"] = "wind_speed_at_10m" - wind_dir_dict = data_dict.copy() - wind_dir_dict["forecast"] = [90, 100, 110, 120, 130] - wind_dir_dict["cf_name"] = "wind_direction" - wind_dir_dict["units"] = "degrees" - wind_dir_dict["diagnostic"] = "wind_from_direction" - data_df = pd.DataFrame(data_dict) - wind_speed_df = pd.DataFrame(wind_speed_dict) - wind_dir_df = pd.DataFrame(wind_dir_dict) - joined_df = pd.concat([data_df, wind_speed_df, wind_dir_df], ignore_index=True) - - output_dir = tmp_path / "forecast_parquet_files" - output_dir.mkdir(parents=True, exist_ok=True) - output_path = output_dir / "forecast.parquet" - joined_df.to_parquet(output_path, index=False, engine="pyarrow") + data_dict = modify_representation( + data_dict, representation, list(range(len(data_dict["percentile"]))) + ) + joined_df = add_wind_data_to_forecasts( + data_dict, [6, 10, 11, 12, 15], [90, 100, 110, 120, 130] + ) + output_dir = write_to_parquet( + joined_df, tmp_path, "forecast_parquet_files", "forecast.parquet" + ) return output_dir, joined_df, data_dict["wmo_id"] @@ -182,33 +233,13 @@ def _create_multi_forecast_period_forecast_parquet_file(tmp_path, representation "height": [1.5] * 4, "diagnostic": ["temperature_at_screen_level"] * 4, } - # Other representations used for testing. - if representation == "realization": - data_dict["realization"] = [0, 1, 0, 1] - data_dict.pop("percentile") - elif representation == "kittens": - data_dict["kittens"] = [0, 1, 0, 1] - data_dict.pop("percentile") - # Add wind speed to demonstrate filtering. 
- wind_speed_dict = data_dict.copy() - wind_speed_dict["forecast"] = [6, 16, 12, 15] - wind_speed_dict["cf_name"] = "wind_speed" - wind_speed_dict["units"] = "m s-1" - wind_speed_dict["diagnostic"] = "wind_speed_at_10m" - wind_dir_dict = data_dict.copy() - wind_dir_dict["forecast"] = [180, 190, 200, 210] - wind_dir_dict["cf_name"] = "wind_from_direction" - wind_dir_dict["units"] = "degrees" - wind_dir_dict["diagnostic"] = "wind_direction" - data_df = pd.DataFrame(data_dict) - wind_speed_df = pd.DataFrame(wind_speed_dict) - wind_dir_df = pd.DataFrame(wind_dir_dict) - joined_df = pd.concat([data_df, wind_speed_df, wind_dir_df], ignore_index=True) - - output_dir = tmp_path / "forecast_parquet_files" - output_dir.mkdir(parents=True, exist_ok=True) - output_path = output_dir / "forecast.parquet" - joined_df.to_parquet(output_path, index=False, engine="pyarrow") + data_dict = modify_representation(data_dict, representation, [0, 1, 0, 1]) + joined_df = add_wind_data_to_forecasts( + data_dict, [6, 16, 12, 15], [180, 190, 200, 210] + ) + output_dir = write_to_parquet( + joined_df, tmp_path, "forecast_parquet_files", "forecast.parquet" + ) return output_dir, joined_df, data_dict["wmo_id"] @@ -224,17 +255,10 @@ def _create_multi_site_truth_parquet_file(tmp_path): "station_id": ["03001", "03002", "03003", "03004", "03005"], "ob_value": [276.0, 270.0, 289.0, 290.0, 301.0], } - wind_speed_dict = data_dict.copy() - wind_speed_dict["ob_value"] = [3, 22, 24, 11, 9] - wind_speed_dict["diagnostic"] = "wind_speed_at_10m" - data_df = pd.DataFrame(data_dict) - wind_speed_df = pd.DataFrame(wind_speed_dict) - joined_df = pd.concat([data_df, wind_speed_df], ignore_index=True) - - output_dir = tmp_path / "truth_parquet_files" - output_dir.mkdir(parents=True, exist_ok=True) - output_path = output_dir / "truth.parquet" - joined_df.to_parquet(output_path, index=False, engine="pyarrow") + joined_df = add_wind_data_to_truth(data_dict, [3, 22, 24, 11, 9]) + output_dir = write_to_parquet( + joined_df, tmp_path, "truth_parquet_files", "truth.parquet" + ) return output_dir, joined_df @@ -250,17 +274,10 @@ def _create_multi_percentile_truth_parquet_file(tmp_path): "station_id": ["03001"], "ob_value": [276.0], } - wind_speed_dict = data_dict.copy() - wind_speed_dict["ob_value"] = [9] - wind_speed_dict["diagnostic"] = "wind_speed_at_10m" - data_df = pd.DataFrame(data_dict) - wind_speed_df = pd.DataFrame(wind_speed_dict) - joined_df = pd.concat([data_df, wind_speed_df], ignore_index=True) - - output_dir = tmp_path / "truth_parquet_files" - output_dir.mkdir(parents=True, exist_ok=True) - output_path = output_dir / "truth.parquet" - joined_df.to_parquet(output_path, index=False, engine="pyarrow") + joined_df = add_wind_data_to_truth(data_dict, [9]) + output_dir = write_to_parquet( + joined_df, tmp_path, "truth_parquet_files", "truth.parquet" + ) return output_dir, joined_df @@ -279,17 +296,10 @@ def _create_multi_forecast_period_truth_parquet_file(tmp_path): "station_id": ["03001", "03002", "03001", "03002"], "ob_value": [280, 273, 284, 275], } - wind_speed_dict = data_dict.copy() - wind_speed_dict["ob_value"] = [2.0, 11.0, 10.0, 14.0] - wind_speed_dict["diagnostic"] = "wind_speed_at_10m" - data_df = pd.DataFrame(data_dict) - wind_speed_df = pd.DataFrame(wind_speed_dict) - joined_df = pd.concat([data_df, wind_speed_df], ignore_index=True) - - output_dir = tmp_path / "truth_parquet_files" - output_dir.mkdir(parents=True, exist_ok=True) - output_path = output_dir / "truth.parquet" - joined_df.to_parquet(output_path, index=False, 
engine="pyarrow") + joined_df = add_wind_data_to_truth(data_dict, [2.0, 11.0, 10.0, 14.0]) + output_dir = write_to_parquet( + joined_df, tmp_path, "truth_parquet_files", "truth.parquet" + ) return output_dir, joined_df @@ -305,17 +315,10 @@ def _create_multi_site_truth_parquet_file_alt(tmp_path, site_id="wmo_id"): "station_id": ["03001", "03002", "03003", "03004", "03005"], "ob_value": [10.0, 25.0, 4.0, 3.0, 11.0], } - wind_speed_dict = data_dict.copy() - wind_speed_dict["ob_value"] = [3.0, 22.0, 24.0, 11.0, 9.0] - wind_speed_dict["diagnostic"] = "wind_speed_at_10m" - data_df = pd.DataFrame(data_dict) - wind_speed_df = pd.DataFrame(wind_speed_dict) - joined_df = pd.concat([data_df, wind_speed_df], ignore_index=True) - - output_dir = tmp_path / "truth_parquet_files" - output_dir.mkdir(parents=True, exist_ok=True) - output_path = output_dir / "truth.parquet" - joined_df.to_parquet(output_path, index=False, engine="pyarrow") + joined_df = add_wind_data_to_truth(data_dict, [3.0, 22.0, 24.0, 11.0, 9.0]) + output_dir = write_to_parquet( + joined_df, tmp_path, "truth_parquet_files", "truth.parquet" + ) return output_dir, joined_df @@ -756,7 +759,7 @@ def test_unexpected_loading( ( _create_multi_forecast_period_forecast_parquet_file, _create_multi_forecast_period_truth_parquet_file, - "12", + "6", ), ], ) @@ -822,7 +825,10 @@ def test_prepare_and_train_qrf( if include_latlon_nans: # As latitude is not a feature, this NaN should be ignored. - truth_df.loc[1, "latitude"] = pd.NA + if len(truth_df) == 1: + truth_df.loc[0, "latitude"] = pd.NA + else: + truth_df.loc[1, "latitude"] = pd.NA if add_kwargs: kwargs = {"min_samples_leaf": 2} @@ -842,7 +848,7 @@ def test_prepare_and_train_qrf( unique_site_id_keys=site_id, **(kwargs if add_kwargs else {}), ) - if truth_df["ob_value"].isna().all(): + if truth_df["ob_value"].isna().all() or truth_df["latitude"].isna().all(): with pytest.raises(ValueError, match="Empty truth DataFrame"): plugin(forecast_df, truth_df) return @@ -882,6 +888,113 @@ def test_prepare_and_train_qrf( np.testing.assert_almost_equal(result, expected, decimal=1) +@pytest.mark.parametrize("include_nans", [True, False]) +@pytest.mark.parametrize("include_latlon_nans", [True, False]) +@pytest.mark.parametrize("site_id", [["wmo_id"], ["latitude", "longitude"]]) +@pytest.mark.parametrize( + "forecast_creation,truth_creation", + [ + ( + _create_multi_site_forecast_parquet_file, + _create_multi_site_truth_parquet_file, + ), + ( + _create_multi_percentile_forecast_parquet_file, + _create_multi_percentile_truth_parquet_file, + ), + ( + _create_multi_forecast_period_forecast_parquet_file, + _create_multi_forecast_period_truth_parquet_file, + ), + ], +) +def test_filter_bad_sites( + tmp_path, + include_nans, + include_latlon_nans, + site_id, + forecast_creation, + truth_creation, +): + """Check that the filtering of bad sites results in rows with observed values of + NaN or NaNs in the site_id e.g. 
['wmo_id'] or ['latitude', 'longitude'] being
+    filtered out of both the truth and forecast DataFrames."""
+    feature_config = {"air_temperature": ["mean", "std", "altitude"]}
+    n_estimators = 2
+    max_depth = 5
+    random_state = 46
+    target_cf_name = "air_temperature"
+    forecast_periods = "6:18:6"
+    representation = "percentile"
+
+    _, forecast_df, _ = forecast_creation(tmp_path, representation)
+    forecast_df = amend_expected_forecast_df(
+        forecast_df,
+        forecast_periods,
+        ["temperature_at_screen_level"],
+        representation,
+        site_id,
+    )
+    _, truth_df = truth_creation(tmp_path)
+
+    truth_df = amend_expected_truth_df(truth_df, "temperature_at_screen_level")
+
+    if include_nans:
+        # Inserting a NaN will result in this row being dropped.
+        truth_df.loc[0, "ob_value"] = pd.NA
+
+    if include_latlon_nans:
+        # This row is only dropped when latitude is one of the site ID keys.
+        if len(truth_df) == 1:
+            truth_df.loc[0, "latitude"] = pd.NA
+        else:
+            truth_df.loc[1, "latitude"] = pd.NA
+
+    # Create an instance of PrepareAndTrainQRF with the required parameters.
+    plugin = PrepareAndTrainQRF(
+        feature_config=feature_config,
+        target_cf_name=target_cf_name,
+        n_estimators=n_estimators,
+        max_depth=max_depth,
+        random_state=random_state,
+        transformation="log",
+        pre_transform_addition=1,
+        unique_site_id_keys=site_id,
+    )
+    if include_nans and truth_df["ob_value"].isna().all():
+        with pytest.raises(ValueError, match="Empty truth DataFrame"):
+            plugin.filter_bad_sites(forecast_df.copy(), truth_df.copy())
+        return
+    elif (
+        include_latlon_nans
+        and truth_df["latitude"].isna().all()
+        and site_id
+        == [
+            "latitude",
+            "longitude",
+        ]
+    ):
+        with pytest.raises(ValueError, match="Empty truth DataFrame"):
+            plugin.filter_bad_sites(forecast_df.copy(), truth_df.copy())
+        return
+    result_forecast_df, result_truth_df = plugin.filter_bad_sites(
+        forecast_df.copy(),
+        truth_df.copy(),
+    )
+
+    expected_truth_len = len(truth_df)
+    expected_forecast_len = len(forecast_df)
+    if include_nans:
+        expected_truth_len -= 1
+        expected_forecast_len -= 1
+    if include_latlon_nans and "latitude" in site_id:
+        expected_truth_len -= 1
+        expected_forecast_len -= 1
+
+    assert len(result_truth_df) == expected_truth_len
+    assert len(result_forecast_df) == expected_forecast_len
+
+
 def test_unexpected_preparation(
     tmp_path,
 ):
diff --git a/improver_tests/calibration/quantile_regression_random_forests_calibration/test_quantile_regression_random_forest.py b/improver_tests/calibration/quantile_regression_random_forests_calibration/test_quantile_regression_random_forest.py
index 1dabc77b24..4ddd71fc14 100644
--- a/improver_tests/calibration/quantile_regression_random_forests_calibration/test_quantile_regression_random_forest.py
+++ b/improver_tests/calibration/quantile_regression_random_forests_calibration/test_quantile_regression_random_forest.py
@@ -46,7 +46,8 @@ def _create_forecasts(
     representation: str = "realization",
     return_cube: bool = False,
 ) -> Cube | pd.DataFrame:
-    """Create site forecast cube with realizations.
+    """Create site forecast cube or DataFrame with the specified representation,
+    either realization or percentile.
 
     Args:
         forecast_reference_time: Timestamp e.g. "20170101T0000Z".
@@ -58,7 +59,8 @@
         return_cube: If True, return a cube. If False, return a DataFrame.
 
     Returns:
-        Forecast cube containing three percentiles and two sites.
+        Forecast cube or DataFrame containing three realizations or three percentiles
+        for two sites.
""" data = np.array([data, data + 2], dtype=np.float32).T cube = set_up_spot_variable_cube( @@ -107,7 +109,6 @@ def _add_day_of_training_period(df: pd.DataFrame) -> pd.DataFrame: Args: df: DataFrame to which the day of training period coordinate will be added. - day_of_training_period: Day of training period to be added. Returns: Cube with the day of training period coordinate added. @@ -178,8 +179,7 @@ def _run_train_qrf( realization_data=[2, 6, 10], truth_data=[4.2, 3.8, 5.8, 6, 7, 7.3, 9.1, 9.5], tmp_path=None, - compression=5, - site_id="wmo_id", + site_id=["wmo_id"], ): realization_data = np.array(realization_data, dtype=np.float32) forecast_dfs = [] @@ -206,7 +206,7 @@ def _run_train_qrf( if include_static: ancil_df = _create_ancil_file() forecast_df = forecast_df.merge( - ancil_df[[site_id, "distance_to_water"]], on=[site_id], how="left" + ancil_df[[*site_id, "distance_to_water"]], on=[*site_id], how="left" ) feature_config["distance_to_water"] = ["static"] if "air_temperature" in feature_config.keys(): @@ -228,7 +228,7 @@ def _run_train_qrf( result = plugin.process(forecast_df, truth_df) if tmp_path is not None: model_output = tmp_path / "qrf_model.pickle" - joblib.dump(result, model_output, compress=compression) + joblib.dump(result, model_output) return model_output return result @@ -743,22 +743,34 @@ def test_train_qrf_multiple_lead_times( @pytest.mark.parametrize( "feature_config,data,include_static,site_id,expected", [ - ({"wind_speed_at_10m": ["mean"]}, [5], False, "wmo_id", [5]), # One feature - ({"wind_speed_at_10m": ["mean"]}, [5], False, "station_id", [5]), # One feature - ({"wind_speed_at_10m": ["latitude"]}, [61], False, "wmo_id", [7.75]), # noqa Without the target - ({"wind_speed_at_10m": ["mean"]}, [5], True, "wmo_id", [4]), # With static data + ({"wind_speed_at_10m": ["mean"]}, [5], False, ["wmo_id"], [5]), # One feature + ( + {"wind_speed_at_10m": ["mean"]}, + [5], + False, + ["station_id"], + [5], + ), # One feature + ({"wind_speed_at_10m": ["latitude"]}, [61], False, ["wmo_id"], [7.75]), # noqa Without the target + ( + {"wind_speed_at_10m": ["mean"]}, + [5], + True, + ["wmo_id"], + [4], + ), # With static data ( {"wind_speed_at_10m": ["mean"], "air_temperature": ["mean"]}, [5], False, - "wmo_id", + ["wmo_id"], [5], ), # Multiple dynamic features ( {"wind_speed_at_10m": ["mean"], "pressure_at_mean_sea_level": ["mean"]}, [5], False, - "wmo_id", + ["wmo_id"], "Feature 'pressure_at_mean_sea_level' is not present", ), # Multiple dynamic features ],