diff --git a/src/common/io.py b/src/common/io.py index 3400c745..e889d2cb 100644 --- a/src/common/io.py +++ b/src/common/io.py @@ -3121,24 +3121,23 @@ def get_result(self) -> ResultDymolaBinary: return ResultDymolaBinary(self._get_file_name()) class _DelayedVarReader4Diags(DelayedVarReader4): + def _create_section_data(self, section_name, hdr): + return { + "section": section_name, + "file_position": self.mat_stream.tell(), + "sizeof_type": hdr.dtype.itemsize, + "nbr_points": hdr.dims[1], + "nbr_variables": hdr.dims[0] + } + def read_sub_array(self, hdr, copy=True): match hdr.name: case b"data_2": - return { - "section": "data_2", - "file_position": self.mat_stream.tell(), - "sizeof_type": hdr.dtype.itemsize, - "nbr_points": hdr.dims[1], - "nbr_variables": hdr.dims[0] - } + return self._create_section_data("data_2", hdr) case b"data_3": - return { - "section": "data_3", - "file_position": self.mat_stream.tell(), - "sizeof_type": hdr.dtype.itemsize, - "nbr_points": hdr.dims[1], - "nbr_variables": hdr.dims[0] - } + return self._create_section_data("data_3", hdr) + case b"data_4": + return self._create_section_data("data_4", hdr) case b"name": return { "section": "name", @@ -3177,11 +3176,7 @@ def __init__(self, fname): elif hasattr(fname, "name") and os.path.isfile(fname.name): self._fname = fname.name - data_sections = ["name", "dataInfo", "data_2", "data_3"] - with open(self._fname, "rb") as f: - delayed = _DelayedVariableLoadDiags(f, chars_as_strings=False) - self.raw: dict = delayed.get_variables(variable_names = data_sections) - + self.raw = self._load_raw_data_info() self._name_info: dict = self.raw["name"] self._dataInfo: dict = self.raw["dataInfo"] self._data_2_info: dict = self.raw["data_2"] @@ -3189,6 +3184,12 @@ def __init__(self, fname): if self._contains_diagnostic_data: self._data_3_info: dict = self.raw["data_3"] + def _load_raw_data_info(self) -> dict: + data_sections = ["name", "dataInfo", "data_2", "data_3"] + with open(self._fname, "rb") as f: + delayed = _DelayedVariableLoadDiags(f, chars_as_strings=False) + return delayed.get_variables(variable_names = data_sections) + @cache def _get_variable_name_to_index_dict(self) -> dict[str, int]: name_dict: dict = fmi_util.read_name_list( @@ -3283,6 +3284,62 @@ def _get_interpolated_trajectory(self, data_index: int) -> np.ndarray: f = scipy.interpolate.interp1d(time_vector, data, fill_value="extrapolate") return f(diag_time_vector) + +class ResultReaderBinaryMat(ResultReader): + def __init__(self, fname, allow_file_updates=False): + """ + Load a .mat result file. + + Parameters:: + + fname -- + Name of file or a file object supported by scipy.io.loadmat, + which the result is written to. + + allow_file_updates -- + If this is True, file updates (in terms of more + data points being added to the result file) is allowed. + The number of variables stored in the file needs to be + exactly the same and only the number of data points for + the continuous variables are allowed to change. + Default: False + """ + self._delegate = self._get_delegate(fname, allow_file_updates) + + + def _get_delegate(self, fname, allow_file_updates: bool): + """Determines what delegate to use based on input result data""" + try: + with open(fname, "rb") as f: + delayed = _DelayedVariableLoadDiags(f, chars_as_strings=False) + data_sections = ["data_3", "data_4"] + raw_data_info = delayed.get_variables(variable_names = data_sections) + except FileNotFoundError as e: + raise NoResultError(str(e)) from e + + if raw_data_info.get("data_3") is not None and raw_data_info.get("data_4") is None: + # The result is 'consolidated' if 'data_3' exists but not 'data_4', meaning + # the dynamic diagnostic variable data exists in 'data_3'. This reader also + # handle results only containing 'data_2' but for now 'ResultDymolaBinary' is used. + # NOTE: Argument 'allow_file_updates' is ignored here. Consolidated results cannot + # be updated so it should not matter what value is given. + return _ResultReaderBinaryMatConsolidated(fname) + else: + return ResultDymolaBinary( + fname, + allow_file_updates=allow_file_updates, + ) + + def get_variable_names(self) -> list[str]: + return self._delegate.get_variable_names() + + def get_trajectory(self, name: str) -> Trajectory: + return self._delegate.get_trajectory(name) + + def get_trajectories(self, names: list[str]) -> dict[str, Trajectory]: + return self._delegate.get_trajectories(names) + + def verify_result_size(file_name, first_point, current_size, previous_size, max_size, ncp, time): free_space = get_available_disk_space(file_name) diff --git a/tests/test_io.py b/tests/test_io.py index 2a3b44ed..28c15788 100644 --- a/tests/test_io.py +++ b/tests/test_io.py @@ -24,6 +24,7 @@ import re from io import StringIO, BytesIO from collections import OrderedDict +from typing import Protocol from pyfmi import load_fmu from scipy.io import savemat @@ -45,7 +46,7 @@ ResultCSVTextual, ResultHandlerBinaryFile, ResultHandlerFile, - _ResultReaderBinaryMatConsolidated, + ResultReaderBinaryMat, VariableNotFoundError, ResultHandlerMemory, Trajectory, @@ -576,6 +577,31 @@ def coupled_clutches_cs_2_0(): def coupled_clutches_me_1_0(): return Dummy_FMUModelME1([], os.path.join(file_path, "files", "FMUs", "XML", "ME1.0", "CoupledClutches.fmu"), _connect_dll=False) +class ResultReaderFactory(Protocol): + def __call__(self, fname, delayed_trajectory_loading=True, allow_file_updates=False) -> ResultDymolaBinary | ResultReaderBinaryMat: + ... + +def create_result_dymola_binary_reader(fname, delayed_trajectory_loading=True, allow_file_updates=False): + return ResultDymolaBinary( + fname, + delayed_trajectory_loading=delayed_trajectory_loading, + allow_file_updates=allow_file_updates, + ) + +def create_result_reader_binary_mat(fname, delayed_trajectory_loading=True, allow_file_updates=False): + return ResultReaderBinaryMat( + fname, + allow_file_updates=allow_file_updates, + ) + +@pytest.fixture(params=[create_result_dymola_binary_reader, create_result_reader_binary_mat]) +def result_reader_cls(request) -> ResultReaderFactory: + return request.param + +@pytest.fixture +def double_pendulum_mat_file(): + return os.path.join(file_path, "files", "Results", "DoublePendulum.mat") + @pytest.mark.assimulo class TestResultFileBinary: @@ -622,7 +648,7 @@ def test_get_description(self, coupled_clutches_me_1_0): assert res.description[res.get_variable_index("J1.phi")] == "Absolute rotation angle of component" - def test_modified_result_file_data_diagnostics(self, coupled_clutches_me_2_0): + def test_modified_result_file_data_diagnostics(self, result_reader_cls: ResultReaderFactory, coupled_clutches_me_2_0): """Verify that computed diagnostics can be retrieved from an updated result file""" model = coupled_clutches_me_2_0 model.setup_experiment() @@ -661,7 +687,7 @@ def test_modified_result_file_data_diagnostics(self, coupled_clutches_me_2_0): result_writer.integration_point() result_writer.diagnostics_point(diag_data) - res = ResultDymolaBinary('CoupledClutches_result.mat', allow_file_updates=True) + res = result_reader_cls('CoupledClutches_result.mat', allow_file_updates=True) assert len(res.get_trajectory("@Diagnostics.state_errors.clutch1.w_rel").x) == 2, res.get_trajectory("@Diagnostics.state_errors.clutch1.w_rel").x @@ -812,7 +838,7 @@ def test_modified_result_file_data_1_delayed(self, coupled_clutches_me_2_0): #Assert that no exception is raised res.get_trajectory("J2.J") - def test_modified_result_file_time(self, coupled_clutches_me_2_0): + def test_modified_result_file_time(self, result_reader_cls: ResultReaderFactory, coupled_clutches_me_2_0): """Verify that 'time' can be retrieved from an updated result file""" model = coupled_clutches_me_2_0 model.setup_experiment() @@ -824,7 +850,7 @@ def test_modified_result_file_time(self, coupled_clutches_me_2_0): result_writer.initialize_complete() result_writer.integration_point() - res = ResultDymolaBinary('CoupledClutches_result.mat', allow_file_updates=True) + res = result_reader_cls('CoupledClutches_result.mat', allow_file_updates=True) res.get_trajectory("time") @@ -880,16 +906,16 @@ def test_overwriting_results(self, coupled_clutches_me_1_0): with pytest.raises(JIOError): res.get_trajectory("J1.phi") - def test_read_all_variables(self): - res = ResultDymolaBinary(os.path.join(file_path, "files", "Results", "DoublePendulum.mat")) + def test_read_all_variables(self, double_pendulum_mat_file: str, result_reader_cls: ResultReaderFactory): + res = result_reader_cls(double_pendulum_mat_file) assert len(res.get_variable_names()) == 1097, "Incorrect number of variables found, should be 1097" for var in res.get_variable_names(): res.get_trajectory(var) - def test_data_matrix_delayed_loading(self): - res = ResultDymolaBinary(os.path.join(file_path, "files", "Results", "DoublePendulum.mat"), delayed_trajectory_loading=True) + def test_data_matrix_delayed_loading(self, double_pendulum_mat_file: str): + res = ResultDymolaBinary(double_pendulum_mat_file, delayed_trajectory_loading=True) data_matrix = res.get_data_matrix() @@ -898,8 +924,8 @@ def test_data_matrix_delayed_loading(self): assert nbr_continuous_variables == 68, "Number of variables is incorrect, should be 68" assert nbr_points == 502, "Number of points is incorrect, should be 502" - def test_data_matrix_loading(self): - res = ResultDymolaBinary(os.path.join(file_path, "files", "Results", "DoublePendulum.mat"), delayed_trajectory_loading=False) + def test_data_matrix_loading(self, double_pendulum_mat_file: str): + res = ResultDymolaBinary(double_pendulum_mat_file, delayed_trajectory_loading=False) data_matrix = res.get_data_matrix() @@ -908,9 +934,9 @@ def test_data_matrix_loading(self): assert nbr_continuous_variables == 68, "Number of variables is incorrect, should be 68" assert nbr_points == 502, "Number of points is incorrect, should be 502" - def test_read_all_variables_from_stream(self): + def test_read_all_variables_from_stream(self, double_pendulum_mat_file: str): - with open(os.path.join(file_path, "files", "Results", "DoublePendulum.mat"), "rb") as f: + with open(double_pendulum_mat_file, "rb") as f: res = ResultDymolaBinary(f) assert len(res.get_variable_names()) == 1097, "Incorrect number of variables found, should be 1097" @@ -918,12 +944,12 @@ def test_read_all_variables_from_stream(self): for var in res.get_variable_names(): res.get_trajectory(var) - def test_compare_all_variables_from_stream(self): - res_file = ResultDymolaBinary(os.path.join(file_path, "files", "Results", "DoublePendulum.mat")) + def test_compare_all_variables_from_stream(self, double_pendulum_mat_file: str): + res_file = ResultDymolaBinary(double_pendulum_mat_file) assert len(res_file.get_variable_names()) == 1097, "Incorrect number of variables found, should be 1097" - with open(os.path.join(file_path, "files", "Results", "DoublePendulum.mat"), "rb") as f: + with open(double_pendulum_mat_file, "rb") as f: res_stream = ResultDymolaBinary(f) assert len(res_stream.get_variable_names()) == 1097, "Incorrect number of variables found, should be 1097" @@ -933,9 +959,9 @@ def test_compare_all_variables_from_stream(self): np.testing.assert_array_equal(x_file.x, x_stream.x, err_msg="Mismatch in array values for var=%s"%var) - def test_on_demand_loading_32_bits(self): - res_demand = ResultDymolaBinary(os.path.join(file_path, "files", "Results", "DoublePendulum.mat")) - res_all = ResultDymolaBinary(os.path.join(file_path, "files", "Results", "DoublePendulum.mat")) + def test_on_demand_loading_32_bits(self, double_pendulum_mat_file: str, result_reader_cls: ResultReaderFactory): + res_demand = result_reader_cls(double_pendulum_mat_file, delayed_trajectory_loading=True) + res_all = result_reader_cls(double_pendulum_mat_file, delayed_trajectory_loading=False) t_demand = res_demand.get_trajectory('time').x t_all = res_all.get_trajectory('time').x np.testing.assert_array_equal(t_demand, t_all, "On demand loaded result and all loaded does not contain equal result.") @@ -2406,13 +2432,13 @@ def mat_file_singular_data(tmp_path): class TestResultReaderForBinaryMatConsolidated: def test_get_all_variable_names(self, mat_file): - result = _ResultReaderBinaryMatConsolidated(mat_file) + result = ResultReaderBinaryMat(mat_file) variables = result.get_variable_names() expected = {"spring.phi_nominal", "spring.k_constant", "time", "torque.flange.phi", "@Diagnostics.step_time", "@Diagnostics.error_code"} assert set(variables) == expected def test_get_values_assert_valid(self, mat_file): - result = _ResultReaderBinaryMatConsolidated(mat_file) + result = ResultReaderBinaryMat(mat_file) # Test spring.phi_nominal (data_1, constant value) traj_phi = result.get_trajectory("spring.phi_nominal") @@ -2443,7 +2469,7 @@ def test_get_values_assert_valid(self, mat_file): assert np.allclose(traj_error.x, [0.0, 1.0, 0.0]) def test_get_data_only_one_len(self, mat_file_singular_data): - result = _ResultReaderBinaryMatConsolidated(mat_file_singular_data) + result = ResultReaderBinaryMat(mat_file_singular_data) # Test time variable traj_time = result.get_trajectory("time") @@ -2458,25 +2484,35 @@ def test_get_data_only_one_len(self, mat_file_singular_data): assert np.allclose(traj_phi.x, [1.0]) def test_get_all_non_existing_variable_throws(self, mat_file): - result = _ResultReaderBinaryMatConsolidated(mat_file) + result = ResultReaderBinaryMat(mat_file) with pytest.raises(VariableNotFoundError): result.get_trajectory("does.not.exist") def test_get_trajectories_from_all_matrices(self, mat_file): - result = _ResultReaderBinaryMatConsolidated(mat_file) + result = ResultReaderBinaryMat(mat_file) for var in ["spring.phi_nominal", "torque.flange.phi", "@Diagnostics.step_time"]: assert result.get_trajectory(var) is not None def test_with_diagnostic_variable(self, mat_file): - result = _ResultReaderBinaryMatConsolidated(mat_file) + result = ResultReaderBinaryMat(mat_file) assert result.get_trajectory("@Diagnostics.step_time") is not None def test_without_diagnostic_variable(self, mat_file_no_diag): - result_no_diag = _ResultReaderBinaryMatConsolidated(mat_file_no_diag) + result_no_diag = ResultReaderBinaryMat(mat_file_no_diag) assert "@Diagnostics.step_time" not in result_no_diag.get_variable_names() + def test_without_diagnostic_variable_delegates(self, mat_file_no_diag): + result = ResultReaderBinaryMat(mat_file_no_diag) + assert result.get_trajectory("time") is not None + assert result.get_trajectories(["time"])["time"] is not None + + def test_result_does_not_exist_raises_error(self): + with pytest.raises(NoResultError): + ResultReaderBinaryMat("does-not-exists") + + def test_interpolation_between_points(mat_file_interpolation): - result = _ResultReaderBinaryMatConsolidated(mat_file_interpolation) + result = ResultReaderBinaryMat(mat_file_interpolation) traj = result.get_trajectory("spring.phi_nominal") assert np.allclose(traj.t, [0.0, 0.5, 1.0, 1.5, 2.0])