From 4e528fdf24d309fe62d5c08413dcc20e99f2d922 Mon Sep 17 00:00:00 2001 From: Emil Fredriksson Date: Mon, 12 Jan 2026 15:09:07 +0000 Subject: [PATCH 1/3] feat: new result reader class 'ResultReaderBinaryMat' that can delegate either to 'default' or new consolidated implementation docs: fix missing args fix: remove delayed_trajectory_loading argument from ResultReaderBinaryMat chore: add clarifying comment on when using which result reader test: add check that the delegator works when simulating with dynamic diagnostics --- src/common/io.py | 56 ++++++++++++++++++++++++++++++ tests/test_io.py | 90 +++++++++++++++++++++++++++++++++--------------- 2 files changed, 119 insertions(+), 27 deletions(-) diff --git a/src/common/io.py b/src/common/io.py index 3400c745..cf884d8c 100644 --- a/src/common/io.py +++ b/src/common/io.py @@ -3283,6 +3283,62 @@ def _get_interpolated_trajectory(self, data_index: int) -> np.ndarray: f = scipy.interpolate.interp1d(time_vector, data, fill_value="extrapolate") return f(diag_time_vector) + +class ResultReaderBinaryMat(ResultReader): + def __init__(self, fname, allow_file_updates=False): + """ + Load a .mat result file. + + Parameters:: + + fname -- + Name of file or a file object supported by scipy.io.loadmat, + which the result is written to. + + allow_file_updates -- + If this is True, file updates (in terms of more + data points being added to the result file) is allowed. + The number of variables stored in the file needs to be + exactly the same and only the number of data points for + the continuous variables are allowed to change. 
+            Default: False
+        """
+        self._delegate = self._get_delegate(fname, allow_file_updates)
+
+
+    def _get_delegate(self, fname, allow_file_updates: bool):
+        """Determines what delegate to use based on input result data"""
+        try:
+            with open(fname, "rb") as f:
+                delayed = DelayedVariableLoad(f, chars_as_strings=False)
+                data_sections = ["name", "dataInfo", "data_2", "data_3", "data_4"]
+                raw_data_info = delayed.get_variables(variable_names = data_sections)
+        except FileNotFoundError as e:
+            raise NoResultError(str(e)) from e
+
+        if raw_data_info.get("data_3") is not None and raw_data_info.get("data_4") is None:
+            # The result is 'consolidated' if 'data_3' exists but not 'data_4', meaning
+            # the dynamic diagnostic variable data exists in 'data_3'. This reader also
+            # handles results only containing 'data_2' but for now 'ResultDymolaBinary' is used.
+            # NOTE: Argument 'allow_file_updates' is ignored here. Consolidated results cannot
+            # be updated so it should not matter what value is given.
+ return _ResultReaderBinaryMatConsolidated(fname) + else: + return ResultDymolaBinary( + fname, + allow_file_updates=allow_file_updates, + ) + + def get_variable_names(self) -> list[str]: + return self._delegate.get_variable_names() + + def get_trajectory(self, name: str) -> Trajectory: + return self._delegate.get_trajectory(name) + + def get_trajectories(self, names: list[str]) -> dict[str, Trajectory]: + return self._delegate.get_trajectories(names) + + def verify_result_size(file_name, first_point, current_size, previous_size, max_size, ncp, time): free_space = get_available_disk_space(file_name) diff --git a/tests/test_io.py b/tests/test_io.py index 2a3b44ed..28c15788 100644 --- a/tests/test_io.py +++ b/tests/test_io.py @@ -24,6 +24,7 @@ import re from io import StringIO, BytesIO from collections import OrderedDict +from typing import Protocol from pyfmi import load_fmu from scipy.io import savemat @@ -45,7 +46,7 @@ ResultCSVTextual, ResultHandlerBinaryFile, ResultHandlerFile, - _ResultReaderBinaryMatConsolidated, + ResultReaderBinaryMat, VariableNotFoundError, ResultHandlerMemory, Trajectory, @@ -576,6 +577,31 @@ def coupled_clutches_cs_2_0(): def coupled_clutches_me_1_0(): return Dummy_FMUModelME1([], os.path.join(file_path, "files", "FMUs", "XML", "ME1.0", "CoupledClutches.fmu"), _connect_dll=False) +class ResultReaderFactory(Protocol): + def __call__(self, fname, delayed_trajectory_loading=True, allow_file_updates=False) -> ResultDymolaBinary | ResultReaderBinaryMat: + ... 
+ +def create_result_dymola_binary_reader(fname, delayed_trajectory_loading=True, allow_file_updates=False): + return ResultDymolaBinary( + fname, + delayed_trajectory_loading=delayed_trajectory_loading, + allow_file_updates=allow_file_updates, + ) + +def create_result_reader_binary_mat(fname, delayed_trajectory_loading=True, allow_file_updates=False): + return ResultReaderBinaryMat( + fname, + allow_file_updates=allow_file_updates, + ) + +@pytest.fixture(params=[create_result_dymola_binary_reader, create_result_reader_binary_mat]) +def result_reader_cls(request) -> ResultReaderFactory: + return request.param + +@pytest.fixture +def double_pendulum_mat_file(): + return os.path.join(file_path, "files", "Results", "DoublePendulum.mat") + @pytest.mark.assimulo class TestResultFileBinary: @@ -622,7 +648,7 @@ def test_get_description(self, coupled_clutches_me_1_0): assert res.description[res.get_variable_index("J1.phi")] == "Absolute rotation angle of component" - def test_modified_result_file_data_diagnostics(self, coupled_clutches_me_2_0): + def test_modified_result_file_data_diagnostics(self, result_reader_cls: ResultReaderFactory, coupled_clutches_me_2_0): """Verify that computed diagnostics can be retrieved from an updated result file""" model = coupled_clutches_me_2_0 model.setup_experiment() @@ -661,7 +687,7 @@ def test_modified_result_file_data_diagnostics(self, coupled_clutches_me_2_0): result_writer.integration_point() result_writer.diagnostics_point(diag_data) - res = ResultDymolaBinary('CoupledClutches_result.mat', allow_file_updates=True) + res = result_reader_cls('CoupledClutches_result.mat', allow_file_updates=True) assert len(res.get_trajectory("@Diagnostics.state_errors.clutch1.w_rel").x) == 2, res.get_trajectory("@Diagnostics.state_errors.clutch1.w_rel").x @@ -812,7 +838,7 @@ def test_modified_result_file_data_1_delayed(self, coupled_clutches_me_2_0): #Assert that no exception is raised res.get_trajectory("J2.J") - def 
test_modified_result_file_time(self, coupled_clutches_me_2_0): + def test_modified_result_file_time(self, result_reader_cls: ResultReaderFactory, coupled_clutches_me_2_0): """Verify that 'time' can be retrieved from an updated result file""" model = coupled_clutches_me_2_0 model.setup_experiment() @@ -824,7 +850,7 @@ def test_modified_result_file_time(self, coupled_clutches_me_2_0): result_writer.initialize_complete() result_writer.integration_point() - res = ResultDymolaBinary('CoupledClutches_result.mat', allow_file_updates=True) + res = result_reader_cls('CoupledClutches_result.mat', allow_file_updates=True) res.get_trajectory("time") @@ -880,16 +906,16 @@ def test_overwriting_results(self, coupled_clutches_me_1_0): with pytest.raises(JIOError): res.get_trajectory("J1.phi") - def test_read_all_variables(self): - res = ResultDymolaBinary(os.path.join(file_path, "files", "Results", "DoublePendulum.mat")) + def test_read_all_variables(self, double_pendulum_mat_file: str, result_reader_cls: ResultReaderFactory): + res = result_reader_cls(double_pendulum_mat_file) assert len(res.get_variable_names()) == 1097, "Incorrect number of variables found, should be 1097" for var in res.get_variable_names(): res.get_trajectory(var) - def test_data_matrix_delayed_loading(self): - res = ResultDymolaBinary(os.path.join(file_path, "files", "Results", "DoublePendulum.mat"), delayed_trajectory_loading=True) + def test_data_matrix_delayed_loading(self, double_pendulum_mat_file: str): + res = ResultDymolaBinary(double_pendulum_mat_file, delayed_trajectory_loading=True) data_matrix = res.get_data_matrix() @@ -898,8 +924,8 @@ def test_data_matrix_delayed_loading(self): assert nbr_continuous_variables == 68, "Number of variables is incorrect, should be 68" assert nbr_points == 502, "Number of points is incorrect, should be 502" - def test_data_matrix_loading(self): - res = ResultDymolaBinary(os.path.join(file_path, "files", "Results", "DoublePendulum.mat"), 
delayed_trajectory_loading=False) + def test_data_matrix_loading(self, double_pendulum_mat_file: str): + res = ResultDymolaBinary(double_pendulum_mat_file, delayed_trajectory_loading=False) data_matrix = res.get_data_matrix() @@ -908,9 +934,9 @@ def test_data_matrix_loading(self): assert nbr_continuous_variables == 68, "Number of variables is incorrect, should be 68" assert nbr_points == 502, "Number of points is incorrect, should be 502" - def test_read_all_variables_from_stream(self): + def test_read_all_variables_from_stream(self, double_pendulum_mat_file: str): - with open(os.path.join(file_path, "files", "Results", "DoublePendulum.mat"), "rb") as f: + with open(double_pendulum_mat_file, "rb") as f: res = ResultDymolaBinary(f) assert len(res.get_variable_names()) == 1097, "Incorrect number of variables found, should be 1097" @@ -918,12 +944,12 @@ def test_read_all_variables_from_stream(self): for var in res.get_variable_names(): res.get_trajectory(var) - def test_compare_all_variables_from_stream(self): - res_file = ResultDymolaBinary(os.path.join(file_path, "files", "Results", "DoublePendulum.mat")) + def test_compare_all_variables_from_stream(self, double_pendulum_mat_file: str): + res_file = ResultDymolaBinary(double_pendulum_mat_file) assert len(res_file.get_variable_names()) == 1097, "Incorrect number of variables found, should be 1097" - with open(os.path.join(file_path, "files", "Results", "DoublePendulum.mat"), "rb") as f: + with open(double_pendulum_mat_file, "rb") as f: res_stream = ResultDymolaBinary(f) assert len(res_stream.get_variable_names()) == 1097, "Incorrect number of variables found, should be 1097" @@ -933,9 +959,9 @@ def test_compare_all_variables_from_stream(self): np.testing.assert_array_equal(x_file.x, x_stream.x, err_msg="Mismatch in array values for var=%s"%var) - def test_on_demand_loading_32_bits(self): - res_demand = ResultDymolaBinary(os.path.join(file_path, "files", "Results", "DoublePendulum.mat")) - res_all = 
ResultDymolaBinary(os.path.join(file_path, "files", "Results", "DoublePendulum.mat")) + def test_on_demand_loading_32_bits(self, double_pendulum_mat_file: str, result_reader_cls: ResultReaderFactory): + res_demand = result_reader_cls(double_pendulum_mat_file, delayed_trajectory_loading=True) + res_all = result_reader_cls(double_pendulum_mat_file, delayed_trajectory_loading=False) t_demand = res_demand.get_trajectory('time').x t_all = res_all.get_trajectory('time').x np.testing.assert_array_equal(t_demand, t_all, "On demand loaded result and all loaded does not contain equal result.") @@ -2406,13 +2432,13 @@ def mat_file_singular_data(tmp_path): class TestResultReaderForBinaryMatConsolidated: def test_get_all_variable_names(self, mat_file): - result = _ResultReaderBinaryMatConsolidated(mat_file) + result = ResultReaderBinaryMat(mat_file) variables = result.get_variable_names() expected = {"spring.phi_nominal", "spring.k_constant", "time", "torque.flange.phi", "@Diagnostics.step_time", "@Diagnostics.error_code"} assert set(variables) == expected def test_get_values_assert_valid(self, mat_file): - result = _ResultReaderBinaryMatConsolidated(mat_file) + result = ResultReaderBinaryMat(mat_file) # Test spring.phi_nominal (data_1, constant value) traj_phi = result.get_trajectory("spring.phi_nominal") @@ -2443,7 +2469,7 @@ def test_get_values_assert_valid(self, mat_file): assert np.allclose(traj_error.x, [0.0, 1.0, 0.0]) def test_get_data_only_one_len(self, mat_file_singular_data): - result = _ResultReaderBinaryMatConsolidated(mat_file_singular_data) + result = ResultReaderBinaryMat(mat_file_singular_data) # Test time variable traj_time = result.get_trajectory("time") @@ -2458,25 +2484,35 @@ def test_get_data_only_one_len(self, mat_file_singular_data): assert np.allclose(traj_phi.x, [1.0]) def test_get_all_non_existing_variable_throws(self, mat_file): - result = _ResultReaderBinaryMatConsolidated(mat_file) + result = ResultReaderBinaryMat(mat_file) with 
pytest.raises(VariableNotFoundError): result.get_trajectory("does.not.exist") def test_get_trajectories_from_all_matrices(self, mat_file): - result = _ResultReaderBinaryMatConsolidated(mat_file) + result = ResultReaderBinaryMat(mat_file) for var in ["spring.phi_nominal", "torque.flange.phi", "@Diagnostics.step_time"]: assert result.get_trajectory(var) is not None def test_with_diagnostic_variable(self, mat_file): - result = _ResultReaderBinaryMatConsolidated(mat_file) + result = ResultReaderBinaryMat(mat_file) assert result.get_trajectory("@Diagnostics.step_time") is not None def test_without_diagnostic_variable(self, mat_file_no_diag): - result_no_diag = _ResultReaderBinaryMatConsolidated(mat_file_no_diag) + result_no_diag = ResultReaderBinaryMat(mat_file_no_diag) assert "@Diagnostics.step_time" not in result_no_diag.get_variable_names() + def test_without_diagnostic_variable_delegates(self, mat_file_no_diag): + result = ResultReaderBinaryMat(mat_file_no_diag) + assert result.get_trajectory("time") is not None + assert result.get_trajectories(["time"])["time"] is not None + + def test_result_does_not_exist_raises_error(self): + with pytest.raises(NoResultError): + ResultReaderBinaryMat("does-not-exists") + + def test_interpolation_between_points(mat_file_interpolation): - result = _ResultReaderBinaryMatConsolidated(mat_file_interpolation) + result = ResultReaderBinaryMat(mat_file_interpolation) traj = result.get_trajectory("spring.phi_nominal") assert np.allclose(traj.t, [0.0, 0.5, 1.0, 1.5, 2.0]) From 52ef84e882a2d22b19f2fd2b0fab91e6c1f2d2c9 Mon Sep 17 00:00:00 2001 From: Emil Fredriksson Date: Mon, 12 Jan 2026 16:05:27 +0000 Subject: [PATCH 2/3] refactor: minor refactor, breaking functionality out into method --- src/common/io.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/src/common/io.py b/src/common/io.py index cf884d8c..0dcabc2e 100644 --- a/src/common/io.py +++ b/src/common/io.py @@ -3177,11 +3177,7 @@ def __init__(self, 
fname): elif hasattr(fname, "name") and os.path.isfile(fname.name): self._fname = fname.name - data_sections = ["name", "dataInfo", "data_2", "data_3"] - with open(self._fname, "rb") as f: - delayed = _DelayedVariableLoadDiags(f, chars_as_strings=False) - self.raw: dict = delayed.get_variables(variable_names = data_sections) - + self.raw = self._load_raw_data_info() self._name_info: dict = self.raw["name"] self._dataInfo: dict = self.raw["dataInfo"] self._data_2_info: dict = self.raw["data_2"] @@ -3189,6 +3185,12 @@ def __init__(self, fname): if self._contains_diagnostic_data: self._data_3_info: dict = self.raw["data_3"] + def _load_raw_data_info(self) -> dict: + data_sections = ["name", "dataInfo", "data_2", "data_3"] + with open(self._fname, "rb") as f: + delayed = _DelayedVariableLoadDiags(f, chars_as_strings=False) + return delayed.get_variables(variable_names = data_sections) + @cache def _get_variable_name_to_index_dict(self) -> dict[str, int]: name_dict: dict = fmi_util.read_name_list( From f5d554328595422808acdcf7a379dd8d2205f863 Mon Sep 17 00:00:00 2001 From: Emil Fredriksson Date: Mon, 12 Jan 2026 16:04:47 +0000 Subject: [PATCH 3/3] perf: ensure as little data as possible is read for .mat file delegation It is not possible to re-use the data loaded for both cases of delegation. The reason is they use different implementations of the delayed loading which returns different results, what is expected to be the full data vs a file position pointer. For example 'data_3' is different for the two implementations. So, instead of trying to re-use it for one of the implementations we can instead try to minimize the amount of data handling for doing the delegation. This is done by using the delayed loader for the consolidated result and add support to it to not load in any data_4. 
--- src/common/io.py | 31 +++++++++++++++---------------- 1 file changed, 15 insertions(+), 16 deletions(-) diff --git a/src/common/io.py b/src/common/io.py index 0dcabc2e..e889d2cb 100644 --- a/src/common/io.py +++ b/src/common/io.py @@ -3121,24 +3121,23 @@ def get_result(self) -> ResultDymolaBinary: return ResultDymolaBinary(self._get_file_name()) class _DelayedVarReader4Diags(DelayedVarReader4): + def _create_section_data(self, section_name, hdr): + return { + "section": section_name, + "file_position": self.mat_stream.tell(), + "sizeof_type": hdr.dtype.itemsize, + "nbr_points": hdr.dims[1], + "nbr_variables": hdr.dims[0] + } + def read_sub_array(self, hdr, copy=True): match hdr.name: case b"data_2": - return { - "section": "data_2", - "file_position": self.mat_stream.tell(), - "sizeof_type": hdr.dtype.itemsize, - "nbr_points": hdr.dims[1], - "nbr_variables": hdr.dims[0] - } + return self._create_section_data("data_2", hdr) case b"data_3": - return { - "section": "data_3", - "file_position": self.mat_stream.tell(), - "sizeof_type": hdr.dtype.itemsize, - "nbr_points": hdr.dims[1], - "nbr_variables": hdr.dims[0] - } + return self._create_section_data("data_3", hdr) + case b"data_4": + return self._create_section_data("data_4", hdr) case b"name": return { "section": "name", @@ -3312,8 +3311,8 @@ def _get_delegate(self, fname, allow_file_updates: bool): """Determines what delegate to use based on input result data""" try: with open(fname, "rb") as f: - delayed = DelayedVariableLoad(f, chars_as_strings=False) - data_sections = ["name", "dataInfo", "data_2", "data_3", "data_4"] + delayed = _DelayedVariableLoadDiags(f, chars_as_strings=False) + data_sections = ["data_3", "data_4"] raw_data_info = delayed.get_variables(variable_names = data_sections) except FileNotFoundError as e: raise NoResultError(str(e)) from e