diff --git a/new_regression.py b/new_regression.py
new file mode 100644
index 0000000..001baae
--- /dev/null
+++ b/new_regression.py
@@ -0,0 +1,169 @@
+import os
+import argparse
+import sys
+import numpy as np
+
+from tqdm import tqdm
+
+if "SOFA_ROOT" not in os.environ:
+    print('SOFA_ROOT environment variable has not been detected, quitting.')
+    sys.exit(1)
+else:
+    sofapython3_path = os.environ["SOFA_ROOT"] + "/lib/python3/site-packages"
+    sys.path.append(sofapython3_path)
+
+import Sofa
+import SofaRuntime  # importing SofaRuntime adds the py3 loader to the scene loaders
+import tools.RegressionSceneParsing as RegressionSceneParsing
+
+
+class RegressionProgram:
+    def __init__(self, input_folder, disable_progress_bar=False, verbose=False):
+        self.scene_sets = []  # list of RegressionSceneList
+        self.disable_progress_bar = disable_progress_bar
+        self.verbose = verbose
+
+        for root, dirs, files in os.walk(input_folder):
+            for file in files:
+                if file.endswith(".regression-tests"):
+                    file_path = os.path.join(root, file)
+
+                    scene_list = RegressionSceneParsing.RegressionSceneList(file_path, self.disable_progress_bar, verbose)
+
+                    scene_list.process_file()
+                    self.scene_sets.append(scene_list)
+
+    def nbr_error_in_sets(self):
+        nbr_errors = 0
+        for scene_list in self.scene_sets:
+            nbr_errors = nbr_errors + scene_list.get_nbr_errors()
+        return nbr_errors
+
+    def log_errors_in_sets(self):
+        for scene_list in self.scene_sets:
+            scene_list.log_scenes_errors()
+
+    def write_sets_references(self, id_set=0):
+        scene_list = self.scene_sets[id_set]
+        nbr_scenes = scene_list.write_all_references()
+        return nbr_scenes
+
+    def write_all_sets_references(self):
+        nbr_sets = len(self.scene_sets)
+        pbar_sets = tqdm(total=nbr_sets, disable=self.disable_progress_bar)
+        pbar_sets.set_description("Write All sets")
+
+        nbr_scenes = 0
+        for i in range(0, nbr_sets):
+            nbr_scenes = nbr_scenes + self.write_sets_references(i)
+            pbar_sets.update(1)
+
+        pbar_sets.close()
+
+        return nbr_scenes
+
+    def compare_sets_references(self, id_set=0):
+        scene_list = self.scene_sets[id_set]
+        nbr_scenes = scene_list.compare_all_references()
+        return nbr_scenes
+
+    def compare_all_sets_references(self):
+        nbr_sets = len(self.scene_sets)
+        pbar_sets = tqdm(total=nbr_sets, disable=self.disable_progress_bar)
+        pbar_sets.set_description("Compare All sets")
+
+        nbr_scenes = 0
+        for i in range(0, nbr_sets):
+            nbr_scenes = nbr_scenes + self.compare_sets_references(i)
+            pbar_sets.update(1)
+
+        pbar_sets.close()
+
+        return nbr_scenes
+
+    def replay_references(self, id_set=0):
+        scene_list = self.scene_sets[id_set]
+        scene_list.replay_references(0)
+
+
+def parse_args():
+    """
+    Parse input arguments
+    """
+    parser = argparse.ArgumentParser(
+        description='Regression arguments')
+    parser.add_argument('--input',
+                        dest='input',
+                        help='Directory in which to search for .regression-tests files',
+                        type=str)
+
+    parser.add_argument('--output',
+                        dest='output',
+                        help="Directory where to export data preprocessed",
+                        type=str)
+
+    parser.add_argument('--replay',
+                        dest='replay',
+                        help="Test option to replay a reference scene in the GUI",
+                        type=int)
+
+    parser.add_argument(
+        "--write-references",
+        dest="write_mode",
+        help='If set, will generate new reference files',
+        action='store_true'
+    )
+    parser.add_argument(
+        "--disable-progress-bar",
+        dest="progress_bar_is_disabled",
+        help='If set, will disable progress bars',
+        action='store_true'
+    )
+    parser.add_argument(
+        "--verbose",
+        dest="verbose",
+        help='If set, will display more information',
+        action='store_true'
+    )
+
+    cmdline_args = parser.parse_args()
+
+    return cmdline_args
+
+
+if __name__ == '__main__':
+    # 1- Parse arguments to get the folder path
+    args = parse_args()
+    # 2- Process the files
+    if args.input is not None:
+        reg_prog = RegressionProgram(args.input, args.progress_bar_is_disabled, args.verbose)
+    else:
+        sys.exit("Error: the --input argument is required. Quitting.")
+
+    nbr_scenes = 0
+
+    replay = bool(args.replay)
+    if replay:
+        reg_prog.replay_references()
+        sys.exit()
+
+    if args.write_mode:
+        nbr_scenes = reg_prog.write_all_sets_references()
+    else:
+        nbr_scenes = reg_prog.compare_all_sets_references()
+
+    np.set_printoptions(legacy='1.25')  # revert numpy scalar printing (avoid np.float64(...) wrappers when displaying lists of floats)
+
+    print("### Number of sets Done: " + str(len(reg_prog.scene_sets)))
+    print("### Number of scenes Done: " + str(nbr_scenes))
+    if args.write_mode is False:
+        print("### Number of scenes failed: " + str(reg_prog.nbr_error_in_sets()))
+        reg_prog.log_errors_in_sets()
+        if reg_prog.nbr_error_in_sets() > 0:
+            sys.exit(1)  # exit with error(s)
+
+    sys.exit(0)  # exit without error
+
+
diff --git a/tools/RegressionSceneData.py b/tools/RegressionSceneData.py
new file mode 100644
index 0000000..ad797c5
--- /dev/null
+++ b/tools/RegressionSceneData.py
@@ -0,0 +1,249 @@
+from tqdm import tqdm
+import json
+from json import JSONEncoder
+import numpy as np
+import gzip
+import pathlib
+
+import Sofa
+import Sofa.Gui
+
+debug_info = False
+
+def is_simulated(node):
+    if node.hasODESolver():
+        return True
+
+    # if there is no solver in the current node, check the parent nodes
+    for parent in node.parents:
+        solver_found = is_simulated(parent)
+        if solver_found:
+            return True
+
+    return False
+
+
+class NumpyArrayEncoder(JSONEncoder):
+    def default(self, obj):
+        if isinstance(obj, np.ndarray):
+            return obj.tolist()
+        return JSONEncoder.default(self, obj)
+
+
+class RegressionSceneData:
+    def __init__(self, file_scene_path: str = None, file_ref_path: str = None, steps=1000,
+                 epsilon=0.0001, meca_in_mapping=True, dump_number_step=1, disable_progress_bar=False):
+        """
+        file_scene_path: path to the scene file to test
+        file_ref_path: path to the reference file corresponding to the scene to test
+        steps: number of simulation steps to perform
+        epsilon: threshold value for the dof position comparison
+        meca_in_mapping: option to test MechanicalObjects in nodes containing a Mapping (True will test them)
+        dump_number_step: number of intermediate key frames to dump/compare during the simulation
+        """
+        self.file_scene_path = file_scene_path
+        self.file_ref_path = file_ref_path
+        self.steps = int(steps)
+        self.epsilon = float(epsilon)
+        self.meca_in_mapping = meca_in_mapping
+        self.dump_number_step = int(dump_number_step)
+        self.meca_objs = []
+        self.filenames = []
+        self.mins = []
+        self.maxs = []
+        self.total_error = []
+        self.error_by_dof = []
+        self.nbr_tested_frame = 0
+        self.regression_failed = False
+        self.root_node = None
+        self.disable_progress_bar = disable_progress_bar
+
+    def print_info(self):
+        print("Test scene: " + self.file_scene_path + " vs " + self.file_ref_path + " using: " + str(self.steps)
+              + " " + str(self.epsilon))
+
+    def log_errors(self):
+        if self.regression_failed is True:
+            print("### Failed: " + self.file_scene_path)
+            print("    ### Total Error per MechanicalObject: " + str(self.total_error))
+            print("    ### Error by Dofs: " + str(self.error_by_dof))
+        else:
+            print("### Success: " + self.file_scene_path + " | Number of key frames compared without error: " + str(self.nbr_tested_frame))
+
+
+    def print_meca_objs(self):
+        print("# Nbr Meca: " + str(len(self.meca_objs)))
+        # print the reference file attached to each MechanicalObject (filenames are built in load_scene)
+        for filename in self.filenames:
+            print("# File attached: " + filename)
+
+
+    def parse_node(self, node, level=0):
+        for child in node.children:
+            mstate = child.getMechanicalState()
+            if mstate:
+                if is_simulated(child):
+                    self.meca_objs.append(mstate)
+
+            self.parse_node(child, level + 1)
+
+
+    def add_compare_state(self):
+        counter = 0
+        for meca_obj in self.meca_objs:
+            _filename = self.file_ref_path + ".reference_" + str(counter) + "_" + meca_obj.name.value + "_mstate" + ".txt.gz"
+
+            meca_obj.getContext().addObject('CompareState', filename=_filename)
+            counter = counter + 1
+
+
+    def add_write_state(self):
+        counter = 0
+        for meca_obj in self.meca_objs:
+            _filename = self.file_ref_path + ".reference_" + str(counter) + "_" + meca_obj.name.value + "_mstate" + ".txt.gz"
+
+            meca_obj.getContext().addObject('WriteState', filename=_filename)
+            counter = counter + 1
+
+
+    def load_scene(self):
+        self.root_node = Sofa.Simulation.load(self.file_scene_path)
+        if not self.root_node:  # error while loading
+            print(f'Error while trying to load {self.file_scene_path}')
+            raise RuntimeError(f'Could not load {self.file_scene_path}')
+        else:
+            Sofa.Simulation.init(self.root_node)
+
+        # prepare one reference file per MechanicalObject:
+        self.parse_node(self.root_node, 0)
+        counter = 0
+        for mecaObj in self.meca_objs:
+            _filename = self.file_ref_path + ".reference_mstate_" + str(counter) + "_" + mecaObj.name.value + ".json.gz"
+            self.filenames.append(_filename)
+            counter = counter + 1
+
+
+    def write_references(self):
+        pbar_simu = tqdm(total=self.steps, disable=self.disable_progress_bar)
+        pbar_simu.set_description("Simulate: " + self.file_scene_path)
+
+        nbr_meca = len(self.meca_objs)
+        numpy_data = []  # one dict of {time: positions} per MechanicalObject
+        for meca_id in range(0, nbr_meca):
+            meca_dofs = {}
+            numpy_data.append(meca_dofs)
+
+        counter_step = 0
+        modulo_step = self.steps / self.dump_number_step
+
+        for step in range(0, self.steps + 1):
+            # export rest position, final position + modulo steps:
+            if step == 0 or counter_step >= modulo_step or step == self.steps:
+                for meca_id in range(0, nbr_meca):
+                    numpy_data[meca_id][self.root_node.dt.value * step] = np.copy(self.meca_objs[meca_id].position.value)
+                counter_step = 0
+
+            Sofa.Simulation.animate(self.root_node, self.root_node.dt.value)
+            counter_step = counter_step + 1
+
+            pbar_simu.update(1)
+        pbar_simu.close()
+
+        for meca_id in range(0, nbr_meca):
+            # make sure the parent directory of the references exists
+            output_file = pathlib.Path(self.filenames[meca_id])
+            output_file.parent.mkdir(exist_ok=True, parents=True)
+
+            with gzip.open(self.filenames[meca_id], 'wb') as write_file:
+                write_file.write(json.dumps(numpy_data[meca_id], cls=NumpyArrayEncoder).encode('utf-8'))
+
+        Sofa.Simulation.unload(self.root_node)
+
+
+    def compare_references(self):
+        pbar_simu = tqdm(total=float(self.steps), disable=self.disable_progress_bar)
+        pbar_simu.set_description("compare_references: " + self.file_scene_path)
+
+        nbr_meca = len(self.meca_objs)
+        numpy_data = []  # one dict of {time: positions} per MechanicalObject
+        keyframes = []
+        self.total_error = []
+        self.error_by_dof = []
+
+        try:
+            for meca_id in range(0, nbr_meca):
+                with gzip.open(self.filenames[meca_id], 'r') as zipfile:
+                    decoded_array = json.loads(zipfile.read().decode('utf-8'))
+                    numpy_data.append(decoded_array)
+
+                    if meca_id == 0:
+                        for key in decoded_array:
+                            keyframes.append(float(key))
+
+                    self.total_error.append(0.0)
+                    self.error_by_dof.append(0.0)
+        except FileNotFoundError as e:
+            print(f'Error while reading references: {str(e)}')
+            return False
+
+        frame_step = 0
+        nbr_frames = len(keyframes)
+        self.nbr_tested_frame = 0
+        for step in range(0, self.steps + 1):
+            simu_time = self.root_node.dt.value * step
+
+            if simu_time == keyframes[frame_step]:
+                for meca_id in range(0, nbr_meca):
+                    meca_dofs = np.copy(self.meca_objs[meca_id].position.value)
+                    data_ref = np.asarray(numpy_data[meca_id][str(keyframes[frame_step])]) - meca_dofs
+
+                    # Compute the total distance between the 2 sets
+                    full_dist = np.linalg.norm(data_ref)
+                    error_by_dof = full_dist / float(data_ref.size)
+
+                    if debug_info:
+                        print(str(step) + "| " + self.meca_objs[meca_id].name.value + " | full_dist: " + str(full_dist) + " | error_by_dof: " + str(error_by_dof) + " | nbrDofs: " + str(data_ref.size))
+
+                    self.total_error[meca_id] = self.total_error[meca_id] + full_dist
+                    self.error_by_dof[meca_id] = self.error_by_dof[meca_id] + error_by_dof
+
+                frame_step = frame_step + 1
+                self.nbr_tested_frame = self.nbr_tested_frame + 1
+
+                # security exit if simulation steps exceed nbr_frames
+                if frame_step == nbr_frames:
+                    break
+
+            Sofa.Simulation.animate(self.root_node, self.root_node.dt.value)
+
+            pbar_simu.update(1)
+        pbar_simu.close()
+
+        for meca_id in range(0, nbr_meca):
+            if self.total_error[meca_id] > self.epsilon:
+                self.regression_failed = True
+                return False
+
+        return True
+
+
+    def replay_references(self):
+        Sofa.Gui.GUIManager.Init("myscene", "qglviewer")
+        Sofa.Gui.GUIManager.createGUI(self.root_node, __file__)
+        Sofa.Gui.GUIManager.SetDimension(1080, 1080)
+        Sofa.Gui.GUIManager.MainLoop(self.root_node)
+        Sofa.Gui.GUIManager.closeGUI()
+
+
diff --git a/tools/RegressionSceneParsing.py b/tools/RegressionSceneParsing.py
new file mode 100644
index 0000000..f37dccc
--- /dev/null
+++ b/tools/RegressionSceneParsing.py
@@ -0,0 +1,124 @@
+import os
+import tools.RegressionSceneData as RegressionSceneData
+from tqdm import tqdm
+
+
+## This class is responsible for loading a *.regression-tests file and gathering the list of scenes to test with all their arguments.
+## It provides the API to launch the tests or write the references for all scenes contained in this file.
+class RegressionSceneList:
+    def __init__(self, file_path, disable_progress_bar=False, verbose=False):
+        """
+        file_path: path to the *.regression-tests file containing the list of scenes to test with all their arguments
+        """
+        self.file_path = file_path
+        self.file_dir = os.path.dirname(file_path)
+        self.scenes = []  # list of RegressionSceneData
+        self.nbr_errors = 0
+        self.ref_dir_path = None
+        self.disable_progress_bar = disable_progress_bar
+        self.verbose = verbose
+
+
+    def get_nbr_scenes(self):
+        return len(self.scenes)
+
+    def get_nbr_errors(self):
+        return self.nbr_errors
+
+    def log_scenes_errors(self):
+        for scene in self.scenes:
+            scene.log_errors()
+
+    def process_file(self):
+        with open(self.file_path, 'r') as the_file:
+            data = the_file.readlines()
+
+        count = 0
+        for idx, line in enumerate(data):
+            if line[0] == "#":
+                continue
+
+            values = line.split()
+            if len(values) == 0:
+                continue
+
+            # the first non-comment line gives the directory holding the reference files
+            if count == 0:
+                self.ref_dir_path = os.path.join(self.file_dir, values[0])
+                self.ref_dir_path = os.path.abspath(self.ref_dir_path)
+                count = count + 1
+                continue
+
+            if len(values) != 5:
+                print("line read does not have 5 arguments: " + str(len(values)) + " -> " + line)
+                continue
+
+            full_file_path = os.path.join(self.file_dir, values[0])
+            full_ref_file_path = os.path.join(self.ref_dir_path, values[0])
+
+            if len(values) == 5:
+                scene_data = RegressionSceneData.RegressionSceneData(full_file_path, full_ref_file_path,
+                                                                     values[1], values[2], values[3], values[4],
+                                                                     self.disable_progress_bar)
+
+                #scene_data.print_info()
+                self.scenes.append(scene_data)
+
+
+    def write_references(self, id_scene, print_log=False):
+        if self.verbose:
+            print(f'Writing reference files for {self.scenes[id_scene].file_scene_path}.')
+
+        self.scenes[id_scene].load_scene()
+        if print_log is True:
+            self.scenes[id_scene].print_meca_objs()
+
+        self.scenes[id_scene].write_references()
+
+    def write_all_references(self):
+        nbr_scenes = len(self.scenes)
+        pbar_scenes = tqdm(total=nbr_scenes, disable=self.disable_progress_bar)
+        pbar_scenes.set_description("Write all scenes from: " + self.file_path)
+
+        for i in range(0, nbr_scenes):
+            self.write_references(i)
+            pbar_scenes.update(1)
+        pbar_scenes.close()
+
+        return nbr_scenes
+
+
+    def compare_references(self, id_scene):
+        if self.verbose:
+            self.scenes[id_scene].print_info()
+
+        try:
+            self.scenes[id_scene].load_scene()
+        except Exception as e:
+            self.nbr_errors = self.nbr_errors + 1
+            print(f'Error while trying to load: {str(e)}')
+        else:
+            result = self.scenes[id_scene].compare_references()
+            if not result:
+                self.nbr_errors = self.nbr_errors + 1
+
+    def compare_all_references(self):
+        nbr_scenes = len(self.scenes)
+        pbar_scenes = tqdm(total=nbr_scenes, disable=self.disable_progress_bar)
+        pbar_scenes.set_description("Compare all scenes from: " + self.file_path)
+
+        for i in range(0, nbr_scenes):
+            self.compare_references(i)
+            pbar_scenes.update(1)
+        pbar_scenes.close()
+
+        return nbr_scenes
+
+
+    def replay_references(self, id_scene):
+        self.scenes[id_scene].load_scene()
+        self.scenes[id_scene].replay_references()
+
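
For reference, RegressionSceneList.process_file() expects each *.regression-tests file to start with a non-comment line giving the directory that holds the reference files (resolved relative to the file itself), followed by one line per scene with 5 whitespace-separated values: scene path, steps, epsilon, meca_in_mapping and dump_number_step. A minimal sketch of such a file (the directory and scene names below are hypothetical, only the column layout is taken from the parser):

    # directory containing the reference files
    ./references
    # scene path           steps  epsilon  meca_in_mapping  dump_number_step
    scenes/Caduceus.scn    100    1e-4     1                10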
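
The entry point can then be driven as sketched below, assuming SOFA_ROOT points to a SOFA installation shipping SofaPython3 and that a folder of *.regression-tests files exists (the examples/ path is hypothetical):

    python new_regression.py --input examples/ --write-references   # generate the reference files
    python new_regression.py --input examples/                      # compare scenes against the references
    python new_regression.py --input examples/ --replay 1           # replay a reference scene in the GUI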