diff --git a/environment.yml b/environment.yml index da840c1ab4..82cd515c14 100644 --- a/environment.yml +++ b/environment.yml @@ -50,7 +50,7 @@ dependencies: # external software tools for chemistry - conda-forge::coolprop - - conda-forge::cantera =2.6 + - conda-forge::cantera >=3.0 - conda-forge::mopac # see https://github.com/ReactionMechanismGenerator/RMG-Py/pull/2639#issuecomment-2050292972 - conda-forge::cclib >=1.6.3,<1.9 diff --git a/rmgpy/cantera.py b/rmgpy/cantera.py new file mode 100644 index 0000000000..8daf87b4db --- /dev/null +++ b/rmgpy/cantera.py @@ -0,0 +1,598 @@ +############################################################################### +# # +# RMG - Reaction Mechanism Generator # +# # +# Copyright (c) 2002-2023 Prof. William H. Green (whgreen@mit.edu), # +# Prof. Richard H. West (r.west@neu.edu) and the RMG Team (rmg_dev@mit.edu) # +# # +# Permission is hereby granted, free of charge, to any person obtaining a # +# copy of this software and associated documentation files (the 'Software'), # +# to deal in the Software without restriction, including without limitation # +# the rights to use, copy, modify, merge, publish, distribute, sublicense, # +# and/or sell copies of the Software, and to permit persons to whom the # +# Software is furnished to do so, subject to the following conditions: # +# # +# The above copyright notice and this permission notice shall be included in # +# all copies or substantial portions of the Software. # +# # +# THE SOFTWARE IS PROVIDED 'AS IS', WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE # +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING # +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER # +# DEALINGS IN THE SOFTWARE. # +# # +############################################################################### + +""" +This module contains functions for writing of Cantera input files. +""" + +from typing import Union, TYPE_CHECKING + +import os +import shutil +import logging +import yaml + +from rmgpy.data.kinetics.family import TemplateReaction +from rmgpy.data.kinetics.library import LibraryReaction +from rmgpy.kinetics import ( + Arrhenius, PDepArrhenius, MultiArrhenius, MultiPDepArrhenius, + Chebyshev, Troe, Lindemann, ThirdBody, + StickingCoefficient, SurfaceArrhenius, +) +from rmgpy.reaction import Reaction +from rmgpy.rmg.pdep import PDepReaction +from rmgpy.util import make_output_subdirectory +import rmgpy.constants as constants + +if TYPE_CHECKING: + from rmgpy.species import Species + from rmgpy.molecule.molecule import Molecule + +SYMBOL_BY_NUMBER = {0: 'e', 1: 'H', 2: 'He', 3: 'Li', 4: 'Be', 5: 'B', 6: 'C', 7: 'N', 8: 'O', 9: 'F', 10: 'Ne', + 11: 'Na', 12: 'Mg', 13: 'Al', 14: 'Si', 15: 'P', 16: 'S', 17: 'Cl', 18: 'Ar', 19: 'K', 20: 'Ca', + 21: 'Sc', 22: 'Ti', 23: 'V', 24: 'Cr', 25: 'Mn', 26: 'Fe', 27: 'Co', 28: 'Ni', 29: 'Cu', 30: 'Zn', + 31: 'Ga', 32: 'Ge', 33: 'As', 34: 'Se', 35: 'Br', 36: 'Kr', 37: 'Rb', 38: 'Sr', 39: 'Y', 40: 'Zr', + 41: 'Nb', 42: 'Mo', 43: 'Tc', 44: 'Ru', 45: 'Rh', 46: 'Pd', 47: 'Ag', 48: 'Cd', 49: 'In', 50: 'Sn', + 51: 'Sb', 52: 'Te', 53: 'I', 54: 'Xe', 55: 'Cs', 56: 'Ba', 57: 'La', 58: 'Ce', 59: 'Pr', 60: 'Nd', + 61: 'Pm', 62: 'Sm', 63: 'Eu', 64: 'Gd', 65: 'Tb', 66: 'Dy', 67: 'Ho', 68: 'Er', 69: 'Tm', 70: 'Yb', + 71: 'Lu', 72: 'Hf', 73: 'Ta', 74: 'W', 75: 'Re', 76: 'Os', 77: 'Ir', 78: 'Pt', 79: 'Au', 80: 'Hg', + 81: 'Tl', 82: 'Pb', 83: 'Bi', 84: 'Po', 85: 'At', 86: 'Rn', 87: 'Fr', 88: 'Ra', 89: 'Ac', 90: 'Th', + 91: 'Pa', 92: 'U', 93: 'Np', 94: 'Pu', 95: 'Am', 96: 'Cm', 97: 'Bk', 98: 'Cf', 99: 'Es', 100: 'Fm', + 101: 'Md', 102: 'No', 103: 'Lr', 104: 'Rf', 105: 'Db', 106: 'Sg', 107: 'Bh', 108: 'Hs', 109: 'Mt', + 110: 'Ds', 111: 'Rg', 112: 'Cn', 113: 'Nh', 114: 'Fl', 115: 'Mc', 116: 'Lv', 117: 'Ts', 118: 'Og'} +NUMBER_BY_SYMBOL = {value: key for key, value in SYMBOL_BY_NUMBER.items()} + + +class CanteraWriter(object): + """ + This class listens to a RMG subject and writes a Cantera YAML file + with the current state of the RMG model at every iteration. + """ + + def __init__(self, output_directory=''): + self.output_directory = output_directory + make_output_subdirectory(output_directory, 'cantera') + + def update(self, rmg): + """ + Called whenever the RMG subject notifies listeners. + """ + save_cantera_files(rmg) + + +def save_cantera_files(rmg): + """ + Save the current reaction model to a set of Cantera YAML files. + + Creates: + 1. chem{N}.yaml (where N is num species) + 2. chem.yaml (latest copy) + """ + # Ensure subdirectory exists + cantera_dir = os.path.join(rmg.output_directory, 'cantera') + if not os.path.exists(cantera_dir): + os.mkdir(cantera_dir) + + try: + site_density = rmg.surface_site_density.value_si + except (AttributeError, KeyError, TypeError): + site_density = None + + # ------------------------------------------------------------------------- + # 1. Save Core Model + # ------------------------------------------------------------------------- + num_species = len(rmg.reaction_model.core.species) + + # Define paths + this_cantera_path = os.path.join(rmg.output_directory, 'cantera', + 'chem{0:04d}.yaml'.format(num_species)) + latest_cantera_path = os.path.join(rmg.output_directory, 'cantera', 'chem.yaml') + + logging.info(f"Saving current model core to Cantera file: {this_cantera_path}") + + # Write the YAML file + save_cantera_model(rmg.reaction_model.core, this_cantera_path, site_density=site_density) + + # Copy to 'chem.yaml' (The latest file) + if os.path.exists(latest_cantera_path): + os.unlink(latest_cantera_path) + shutil.copy2(this_cantera_path, latest_cantera_path) + + # ------------------------------------------------------------------------- + # 2. Save Edge Model (Optional, matching ChemkinWriter logic) + # ------------------------------------------------------------------------- + if rmg.save_edge_species: + logging.info('Saving current model core and edge to Cantera file...') + + this_edge_path = os.path.join(rmg.output_directory, 'cantera', + 'chem_edge{0:04d}.yaml'.format(num_species)) + latest_edge_path = os.path.join(rmg.output_directory, 'cantera', 'chem_edge.yaml') + + # Create a simple container object to pass to save_cantera_model + class MixedModel: + def __init__(self, species, reactions): + self.species = species + self.reactions = reactions + + edge_model = MixedModel( + rmg.reaction_model.core.species + rmg.reaction_model.edge.species, + rmg.reaction_model.core.reactions + rmg.reaction_model.edge.reactions + ) + + save_cantera_model(edge_model, this_edge_path, site_density=site_density) + + if os.path.exists(latest_edge_path): + os.unlink(latest_edge_path) + shutil.copy2(this_edge_path, latest_edge_path) + + +def save_cantera_model(model_container, path, site_density=None): + """ + Internal helper to generate the dictionary and write the YAML file. + model_container must have .species and .reactions attributes (lists). + """ + species_list = model_container.species + reaction_list = model_container.reactions + + is_plasma = False + for sp in species_list: + if sp.is_electron(): + is_plasma = True + break + + # Generate Data + yaml_data = generate_cantera_data(species_list, reaction_list, is_plasma=is_plasma, site_density=site_density) + + # Write + with open(path, 'w') as f: + # sort_keys=False ensures 'units' comes first, then 'phases', etc. + yaml.dump(yaml_data, f, sort_keys=False, default_flow_style=None) + + +def generate_cantera_data(species_list, + reaction_list, + is_plasma=False, + site_density=None, + search_for_additional_elements=False, + ): + """ + Converts RMG objects into a dictionary structure compatible with Cantera YAML. + """ + # --- 1. Header & Units --- + # We output everything in SI units. + data = { + 'description': 'RMG-Py Generated Mechanism', + 'generator': 'RMG-Py CanteraWriter', + 'cantera-version': '3.1', + 'units': { + 'length': 'm', + 'time': 's', + 'quantity': 'mol', + 'activation-energy': 'J/mol' + } + } + + # --- 2. Phase Segregation (Gas vs Surface) --- + gas_species, surface_species, gas_reactions, surface_reactions = list(), list(), list(), list() + + for spc in species_list: + if spc.contains_surface_site(): + surface_species.append(spc) + else: + gas_species.append(spc) + + for rxn in reaction_list: + if rxn.is_surface_reaction(): + surface_reactions.append(rxn) + else: + gas_reactions.append(rxn) + + # --- 3. Phase Definitions --- + base_elements = ['H', 'C', 'O', 'N', 'Ne', 'Ar', 'He', 'Si', 'S', 'F', 'Cl', 'Br', 'I', 'E'] + elements_set = set(base_elements) + + if search_for_additional_elements: + for spc in species_list: + if spc.molecule and len(spc.molecule) > 0: + if spc.is_electron(): + elements_set.add('E') + is_plasma = True + else: + for elem in spc.molecule[0].get_element_count().keys(): + if elem != 'X': + elements_set.add(elem) + + phases = list() + + gas_phase_def = { + 'name': 'gas', + 'thermo': 'plasma' if is_plasma else 'ideal-gas', + 'elements': sorted(list(elements_set)), + 'species': [get_label(spc, species_list) for spc in gas_species], + 'kinetics': 'gas', + 'reactions': 'declared-species', + } + + if is_plasma: + gas_phase_def['transport'] = 'ionized-gas' + # Plasma specific defaults + gas_phase_def['electron-energy-distribution'] = { + 'type': 'isotropic', + 'shape-factor': 2.0, + 'mean-electron-energy': 1.0 + } + else: + gas_phase_def['transport'] = 'mixture-averaged' + + phases.append(gas_phase_def) + + if surface_species: + default_site_density = 2.5e-5 # mol/m^2 + + surface_phase_def = { + 'name': 'surface', + 'thermo': 'ideal-surface', + 'adjacent-phases': ['gas'], + 'elements': sorted(list(elements_set)), + 'species': [get_label(sp, species_list) for sp in surface_species], + 'kinetics': 'surface', + 'reactions': 'declared-species', + 'site-density': site_density or default_site_density + } + phases.append(surface_phase_def) + + data['phases'] = phases + + species_data = list() + for sp in species_list: + species_data.append(species_to_dict(sp, species_list)) + data['species'] = species_data + + reaction_data = list() + for rxn in gas_reactions: + entries = reaction_to_dict_list(rxn, species_list) + if entries: + reaction_data.extend(entries) + for rxn in surface_reactions: + entries = reaction_to_dict_list(rxn, species_list) + if entries: + reaction_data.extend(entries) + data['reactions'] = reaction_data + + return data + + +def species_to_dict(species, species_list): + """Convert an RMG Species object to a Cantera YAML dictionary.""" + + notes = list() + try: + notes.append(species.to_smiles()) + except: + pass + + # Composition + mol = species.molecule[0] + atom_dict = dict(mol.get_element_count()) + + # --- FIX: Remove surface site marker 'X' --- + if 'X' in atom_dict: + del atom_dict['X'] + + # Calculate 'E' based on net charge: E = Z - charge + # --- FIX: Use .get() to avoid KeyError if 'X' or other unknown symbols are processed + Z_mol = sum(NUMBER_BY_SYMBOL.get(atom, 0) * count for atom, count in atom_dict.items()) + charge = mol.get_net_charge() + if 'E' not in atom_dict: # Don't double count if E is explicit + atom_dict['E'] = Z_mol - charge + + # Remove E if 0 to keep it clean + if atom_dict.get('E') == 0: + del atom_dict['E'] + + # Sort composition by atomic number + atom_dict = {k: atom_dict[k] for k in sorted(atom_dict.keys(), key=lambda x: NUMBER_BY_SYMBOL.get(x, 999))} + + # Thermo (NASA7) + thermo_data = species.get_thermo_data() + + # Sort polynomials by Tmin + sorted_polys = sorted(thermo_data.polynomials, key=lambda p: p.Tmin.value_si) + + polys = [] + for poly in sorted_polys: + polys.append({ + 'T-range': [poly.Tmin.value_si, poly.Tmax.value_si], + 'data': poly.coeffs.tolist() # a0..a6 + }) + + # Build the base dictionary + species_entry = { + 'name': get_label(species, species_list), + 'composition': atom_dict, + 'thermo': { + 'model': 'NASA7', + 'temperature-ranges': [sorted_polys[0].Tmin.value_si, sorted_polys[0].Tmax.value_si, + sorted_polys[1].Tmax.value_si], + 'data': [polys[0]['data'], polys[1]['data']] + }, + } + + # Transport (if available) - Only relevant for gas phase usually + if species.transport_data and not species.contains_surface_site(): + td = species.transport_data + + dipole = 0.0 + if td.dipoleMoment is not None: + dipole = td.dipoleMoment.value_si * 1e21 / constants.c # Debye + + polarizability = 0.0 + if hasattr(td, 'polarizability') and td.polarizability is not None: + polarizability = td.polarizability.value_si * 1e30 # Angstrom^3 + + rot_relax = 0.0 + if hasattr(td, 'rotrelaxcollnum') and td.rotrelaxcollnum is not None: + rot_relax = td.rotrelaxcollnum + + species_entry['transport'] = { + 'model': 'gas', + 'geometry': 'atom' if td.shapeIndex == 0 else 'linear' if td.shapeIndex == 1 else 'nonlinear', + 'well-depth': td.epsilon.value_si / constants.R, + 'diameter': td.sigma.value_si, + 'dipole': dipole, + 'rotational-relaxation': rot_relax + } + + if species.thermo and species.thermo.comment: + clean_comment = species.thermo.comment.replace('\n', '; ').strip() + notes.append(f"Thermo Source: {clean_comment}") + + if species.transport_data and species.transport_data.comment: + notes.append(f"Transport Source: {species.transport_data.comment.strip()}") + + if notes: + species_entry['note'] = " | ".join(notes) + + return species_entry + + +def reaction_to_dict_list(reaction, species_list=None): + """ + Convert an RMG Reaction object to a LIST of Cantera YAML dictionaries. + """ + # Check for MultiKinetics (duplicates grouped in one RMG object) + if isinstance(reaction.kinetics, (MultiArrhenius, MultiPDepArrhenius)): + entries = [] + sub_kinetics_list = reaction.kinetics.arrhenius + + for sub_kin in sub_kinetics_list: + sub_rxn = Reaction( + reactants=reaction.reactants, + products=reaction.products, + reversible=reaction.reversible, + kinetics=sub_kin, + duplicate=True + ) + sub_result = reaction_to_dict_list(sub_rxn, species_list) + if sub_result: + entries.extend(sub_result) + return entries + + kin = reaction.kinetics + + # Generate equation string + equation = get_reaction_equation(reaction, species_list) + entry = {'equation': equation} + + if reaction.duplicate: + entry['duplicate'] = True + + # --- Kinetics Serialization --- + + # 1. Surface Kinetics + if isinstance(kin, StickingCoefficient): + entry['type'] = 'sticking-Arrhenius' + entry['sticking-coefficient'] = {'A': kin.A.value_si, 'b': kin.n.value_si, 'Ea': kin.Ea.value_si} + + elif isinstance(kin, SurfaceArrhenius): + entry['type'] = 'interface-Arrhenius' + entry['rate-constant'] = {'A': kin.A.value_si, 'b': kin.n.value_si, 'Ea': kin.Ea.value_si} + + # 2. Gas Kinetics + elif isinstance(kin, Arrhenius): + entry['rate-constant'] = {'A': kin.A.value_si, 'b': kin.n.value_si, 'Ea': kin.Ea.value_si} + + elif isinstance(kin, Chebyshev): + entry['type'] = 'Chebyshev' + entry['temperature-range'] = [kin.Tmin.value_si, kin.Tmax.value_si] + entry['pressure-range'] = [kin.Pmin.value_si, kin.Pmax.value_si] + entry['data'] = kin.coeffs.value_si.tolist() + + elif isinstance(kin, ThirdBody): + entry['type'] = 'three-body' + entry['rate-constant'] = { + 'A': kin.arrheniusLow.A.value_si, + 'b': kin.arrheniusLow.n.value_si, + 'Ea': kin.arrheniusLow.Ea.value_si + } + entry['efficiencies'] = {lbl: v for m, v in kin.efficiencies.items() if + (lbl := get_label(m, species_list)) is not None} + + elif isinstance(kin, Troe): + entry['type'] = 'falloff' + entry['high-P-rate-constant'] = { + 'A': kin.arrheniusHigh.A.value_si, + 'b': kin.arrheniusHigh.n.value_si, + 'Ea': kin.arrheniusHigh.Ea.value_si + } + entry['low-P-rate-constant'] = { + 'A': kin.arrheniusLow.A.value_si, + 'b': kin.arrheniusLow.n.value_si, + 'Ea': kin.arrheniusLow.Ea.value_si + } + troe_p = {'A': kin.alpha, 'T3': kin.T3.value_si, 'T1': kin.T1.value_si} + if kin.T2: + troe_p['T2'] = kin.T2.value_si + entry['Troe'] = troe_p + entry['efficiencies'] = {lbl: v for m, v in kin.efficiencies.items() if + (lbl := get_label(m, species_list)) is not None} + + elif isinstance(kin, Lindemann): + entry['type'] = 'falloff' + entry['high-P-rate-constant'] = { + 'A': kin.arrheniusHigh.A.value_si, + 'b': kin.arrheniusHigh.n.value_si, + 'Ea': kin.arrheniusHigh.Ea.value_si + } + entry['low-P-rate-constant'] = { + 'A': kin.arrheniusLow.A.value_si, + 'b': kin.arrheniusLow.n.value_si, + 'Ea': kin.arrheniusLow.Ea.value_si + } + entry['efficiencies'] = {lbl: v for m, v in kin.efficiencies.items() if + (lbl := get_label(m, species_list)) is not None} + + elif isinstance(kin, PDepArrhenius): + # Check if any pressure point uses MultiArrhenius (sum of rates) + has_multi = any(isinstance(arr, MultiArrhenius) for arr in kin.arrhenius) + + if has_multi: + max_terms = 0 + for arr in kin.arrhenius: + if isinstance(arr, MultiArrhenius): + max_terms = max(max_terms, len(arr.arrhenius)) + else: + max_terms = max(max_terms, 1) + + entries = [] + for i in range(max_terms): + sub_entry = entry.copy() + sub_entry['type'] = 'pressure-dependent-Arrhenius' + sub_entry['duplicate'] = True + + rates = [] + for P, arr in zip(kin.pressures.value_si, kin.arrhenius): + current_arr = None + if isinstance(arr, MultiArrhenius): + if i < len(arr.arrhenius): + current_arr = arr.arrhenius[i] + elif isinstance(arr, Arrhenius): + if i == 0: + current_arr = arr + + if current_arr: + rates.append({ + 'P': P, + 'A': current_arr.A.value_si, + 'b': current_arr.n.value_si, + 'Ea': current_arr.Ea.value_si + }) + else: + rates.append({'P': P, 'A': 0.0, 'b': 0.0, 'Ea': 0.0}) + + sub_entry['rate-constants'] = rates + entries.append(sub_entry) + return entries + + else: + entry['type'] = 'pressure-dependent-Arrhenius' + rates = [] + for P, arr in zip(kin.pressures.value_si, kin.arrhenius): + rates.append({ + 'P': P, + 'A': arr.A.value_si, + 'b': arr.n.value_si, + 'Ea': arr.Ea.value_si + }) + entry['rate-constants'] = rates + + else: + logging.warning(f"Skipping reaction {equation}: Unknown kinetics type {type(kin)}") + return [] + + # --- Coverage Dependencies --- + if hasattr(kin, 'coverage_dependence') and kin.coverage_dependence: + cov_deps = {} + for sp, cov_params in kin.coverage_dependence.items(): + sp_label = get_label(sp, species_list) + if sp_label: + # Cantera YAML expects { a: ..., m: ..., E: ... } + cov_deps[sp_label] = { + 'a': cov_params.a.value_si, + 'm': cov_params.m.value_si, + 'E': cov_params.E.value_si + } + if cov_deps: + entry['coverage-dependencies'] = cov_deps + + # --- Metadata / Notes --- + note_parts = list() + if isinstance(reaction, TemplateReaction): + note_parts.append(f"Source: Template family {reaction.family}") + elif isinstance(reaction, LibraryReaction): + note_parts.append(f"Source: Library {reaction.library}") + elif isinstance(reaction, PDepReaction): + note_parts.append(f"Source: PDep Network #{reaction.network.index}") + elif isinstance(reaction, Reaction): + note_parts.append(f"Source: P{reaction.kinetics.comment}") + + if hasattr(kin, 'comment') and kin.comment: + clean_comment = kin.comment.replace('\n', '; ').strip() + if clean_comment: + note_parts.append(clean_comment) + + if reaction.specific_collider: + note_parts.append(f"Specific collider: {reaction.specific_collider.label}") + + if note_parts: + entry['note'] = " | ".join(note_parts) + + return [entry] + + +def get_reaction_equation(reaction, species_list): + """Helper to build reaction string""" + reactants_str = " + ".join([get_label(r, species_list) for r in reaction.reactants]) + products_str = " + ".join([get_label(p, species_list) for p in reaction.products]) + + suffix = "" + kin = reaction.kinetics + if isinstance(kin, (ThirdBody, Lindemann, Troe)): + if hasattr(reaction, 'specific_collider') and reaction.specific_collider: + suffix = " + " + get_label(reaction.specific_collider, species_list) + else: + suffix = " (+ M)" + + return reactants_str + suffix + " <=> " + products_str + suffix + + +def get_label(obj: Union['Species', 'Molecule'], species_list: list['Species']): + if species_list: + for sp in species_list: + if sp.is_isomorphic(obj): + return f'{sp.label}({sp.index})' if sp.index > 0 else sp.label + return None diff --git a/rmgpy/kinetics/diffusionLimited.py b/rmgpy/kinetics/diffusionLimited.py index 4805f2857e..fd96ff85e0 100644 --- a/rmgpy/kinetics/diffusionLimited.py +++ b/rmgpy/kinetics/diffusionLimited.py @@ -34,7 +34,7 @@ import rmgpy.constants as constants from rmgpy.molecule.fragment import Fragment -from rmgpy.species import Species + def _to_molecule(obj): """Return plain Molecule; accept Molecule or (Molecule, mapping) tuple.""" @@ -110,9 +110,10 @@ def get_diffusion_limit(self, T, reaction, forward=True): Return the diffusive limit on the rate coefficient, k_diff. This is the upper limit on the rate, in the specified direction. - (ie. forward direction if forward=True [default] or reverse if forward=False) + (i.e. forward direction if forward=True [default] or reverse if forward=False) Returns the rate coefficient k_diff in m3/mol/s. """ + from rmgpy.species import Species if forward: reacting = reaction.reactants else: diff --git a/rmgpy/kinetics/falloff.pyx b/rmgpy/kinetics/falloff.pyx index 12290667be..bc7de3650f 100644 --- a/rmgpy/kinetics/falloff.pyx +++ b/rmgpy/kinetics/falloff.pyx @@ -124,9 +124,7 @@ cdef class ThirdBody(PDepKineticsModel): """ Sets the kinetics and efficiencies for a cantera `ThreeBodyReaction` object """ - import cantera as ct - assert isinstance(ct_reaction, ct.ThreeBodyReaction), "Must be a Cantera ThreeBodyReaction object" - ct_reaction.efficiencies = PDepKineticsModel.get_cantera_efficiencies(self, species_list) + ct_reaction.third_body.efficiencies = PDepKineticsModel.get_cantera_efficiencies(self, species_list) self.arrheniusLow.set_cantera_kinetics(ct_reaction, species_list) ################################################################################ @@ -226,7 +224,7 @@ cdef class Lindemann(PDepKineticsModel): """ import cantera as ct assert isinstance(ct_reaction.rate, ct.LindemannRate), "Must have a Cantera LindemannRate attribute" - ct_reaction.efficiencies = PDepKineticsModel.get_cantera_efficiencies(self, species_list) + ct_reaction.third_body.efficiencies = PDepKineticsModel.get_cantera_efficiencies(self, species_list) ct_reaction.rate = self.to_cantera_kinetics() @@ -398,7 +396,7 @@ cdef class Troe(PDepKineticsModel): """ import cantera as ct assert isinstance(ct_reaction.rate, ct.TroeRate), "Must have a Cantera TroeRate attribute" - ct_reaction.efficiencies = PDepKineticsModel.get_cantera_efficiencies(self, species_list) + ct_reaction.third_body.efficiencies = PDepKineticsModel.get_cantera_efficiencies(self, species_list) ct_reaction.rate = self.to_cantera_kinetics() def to_cantera_kinetics(self): diff --git a/rmgpy/reaction.py b/rmgpy/reaction.py index 1997824afc..a8ae13f3d3 100644 --- a/rmgpy/reaction.py +++ b/rmgpy/reaction.py @@ -347,43 +347,53 @@ def to_cantera(self, species_list=None, use_chemkin_identifier=False): ct_reaction = ct.Reaction(reactants=ct_reactants, products=ct_products, rate=ct.ChebyshevRate()) elif isinstance(self.kinetics, ThirdBody): - if ct_collider is not None: - ct_reaction = ct.ThreeBodyReaction(reactants=ct_reactants, products=ct_products, third_body=ct_collider) + if ct_collider: + ct_reaction = ct.Reaction(reactants=ct_reactants, + products=ct_products, + third_body=ct_collider, + rate=ct.ArrheniusRate(), + ) else: - ct_reaction = ct.ThreeBodyReaction(reactants=ct_reactants, products=ct_products) + ct_reaction = ct.Reaction(reactants=ct_reactants, + products=ct_products, + third_body="M", + rate=ct.ArrheniusRate(), + ) elif isinstance(self.kinetics, Troe): - if ct_collider is not None: - ct_reaction = ct.FalloffReaction( + if ct_collider: + ct_reaction = ct.Reaction( reactants=ct_reactants, products=ct_products, - tbody=ct_collider, + third_body=ct_collider, rate=ct.TroeRate() ) else: - ct_reaction = ct.FalloffReaction( + ct_reaction = ct.Reaction( reactants=ct_reactants, products=ct_products, + third_body="M", rate=ct.TroeRate() ) elif isinstance(self.kinetics, Lindemann): - if ct_collider is not None: - ct_reaction = ct.FalloffReaction( + if ct_collider: + ct_reaction = ct.Reaction( reactants=ct_reactants, products=ct_products, - tbody=ct_collider, + third_body=ct_collider, rate=ct.LindemannRate() ) else: - ct_reaction = ct.FalloffReaction( + ct_reaction = ct.Reaction( reactants=ct_reactants, products=ct_products, + third_body="M", rate=ct.LindemannRate() ) elif isinstance(self.kinetics, SurfaceArrhenius): - ct_reaction = ct.InterfaceReaction( + ct_reaction = ct.Reaction( reactants=ct_reactants, products=ct_products, rate=ct.InterfaceArrheniusRate() @@ -417,8 +427,6 @@ def to_cantera(self, species_list=None, use_chemkin_identifier=False): return ct_reaction - - def get_url(self): """ Get a URL to search for this reaction in the rmg website. diff --git a/rmgpy/rmg/main.py b/rmgpy/rmg/main.py index 88ea75ff4c..2c9a9ade91 100644 --- a/rmgpy/rmg/main.py +++ b/rmgpy/rmg/main.py @@ -41,23 +41,21 @@ import shutil import sys import time -import warnings from copy import deepcopy import h5py import numpy as np import psutil import yaml -from cantera import ck2yaml from scipy.optimize import brute import rmgpy.util as util from rmgpy import settings +from rmgpy.cantera import CanteraWriter from rmgpy.chemkin import ChemkinWriter from rmgpy.constraints import fails_species_constraints from rmgpy.data.base import Entry -from rmgpy.data.kinetics.family import TemplateReaction -from rmgpy.data.kinetics.library import KineticsLibrary, LibraryReaction +from rmgpy.data.kinetics.library import KineticsLibrary from rmgpy.data.rmg import RMGDatabase from rmgpy.data.vaporLiquidMassTransfer import vapor_liquid_mass_transfer from rmgpy.exceptions import ( @@ -74,11 +72,10 @@ from rmgpy.rmg.listener import SimulationProfilePlotter, SimulationProfileWriter from rmgpy.rmg.model import CoreEdgeReactionModel, Species from rmgpy.rmg.output import OutputHTMLWriter -from rmgpy.rmg.pdep import PDepNetwork, PDepReaction +from rmgpy.rmg.pdep import PDepNetwork from rmgpy.rmg.reactionmechanismsimulator_reactors import Reactor as RMSReactor from rmgpy.rmg.settings import ModelSettings -from rmgpy.solver.base import TerminationConversion, TerminationTime -from rmgpy.solver.simple import SimpleReactor +from rmgpy.solver.base import TerminationTime from rmgpy.stats import ExecutionStatsWriter from rmgpy.thermo.thermoengine import submit from rmgpy.tools.plot import plot_sensitivity @@ -771,8 +768,8 @@ def register_listeners(self, requires_rms=False): """ self.attach(ChemkinWriter(self.output_directory)) - self.attach(RMSWriter(self.output_directory)) + self.attach(CanteraWriter(self.output_directory)) if self.generate_output_html: self.attach(OutputHTMLWriter(self.output_directory)) @@ -1221,25 +1218,6 @@ def execute(self, initialize=True, **kwargs): self.run_model_analysis() - # generate Cantera files chem.yaml & chem_annotated.yaml in a designated `cantera` output folder - try: - if any([s.contains_surface_site() for s in self.reaction_model.core.species]): - self.generate_cantera_files( - os.path.join(self.output_directory, "chemkin", "chem-gas.inp"), - surface_file=(os.path.join(self.output_directory, "chemkin", "chem-surface.inp")), - ) - self.generate_cantera_files( - os.path.join(self.output_directory, "chemkin", "chem_annotated-gas.inp"), - surface_file=(os.path.join(self.output_directory, "chemkin", "chem_annotated-surface.inp")), - ) - else: # gas phase only - self.generate_cantera_files(os.path.join(self.output_directory, "chemkin", "chem.inp")) - self.generate_cantera_files(os.path.join(self.output_directory, "chemkin", "chem_annotated.inp")) - except EnvironmentError: - logging.exception("Could not generate Cantera files due to EnvironmentError. Check read\\write privileges in output directory.") - except Exception: - logging.exception("Could not generate Cantera files for some reason.") - self.check_model() # Write output file logging.info("") @@ -1805,32 +1783,6 @@ def process_reactions_to_species(self, obj): raise TypeError("improper call, obj input was incorrect") return potential_spcs - def generate_cantera_files(self, chemkin_file, **kwargs): - """ - Convert a chemkin mechanism chem.inp file to a cantera mechanism file chem.yaml - and save it in the cantera directory - """ - transport_file = os.path.join(os.path.dirname(chemkin_file), "tran.dat") - file_name = os.path.splitext(os.path.basename(chemkin_file))[0] + ".yaml" - out_name = os.path.join(self.output_directory, "cantera", file_name) - if "surface_file" in kwargs: - out_name = out_name.replace("-gas.", ".") - cantera_dir = os.path.dirname(out_name) - try: - os.makedirs(cantera_dir) - except OSError: - if not os.path.isdir(cantera_dir): - raise - if os.path.exists(out_name): - os.remove(out_name) - parser = ck2yaml.Parser() - try: - parser.convert_mech(chemkin_file, transport_file=transport_file, out_name=out_name, quiet=True, permissive=True, **kwargs) - except ck2yaml.InputError: - logging.exception("Error converting to Cantera format.") - logging.info("Trying again without transport data file.") - parser.convert_mech(chemkin_file, out_name=out_name, quiet=True, permissive=True, **kwargs) - def initialize_reaction_threshold_and_react_flags(self): num_core_species = len(self.reaction_model.core.species) diff --git a/rmgpy/tools/canteramodel.py b/rmgpy/tools/canteramodel.py index 17f9809558..56ae45b372 100644 --- a/rmgpy/tools/canteramodel.py +++ b/rmgpy/tools/canteramodel.py @@ -817,13 +817,11 @@ def check_equivalent_falloff(fall1, fall2): if isinstance(ct_rxn1, ct.Reaction): # may not mean it is arrhenius, need to check if it is troe, if isinstance(ct_rxn1.rate, ct.ArrheniusRate): - assert ct_rxn1.allow_negative_pre_exponential_factor == ct_rxn2.allow_negative_pre_exponential_factor, \ - "Same allow_negative_pre_exponential_factor attribute" if ct_rxn1.rate or ct_rxn2.rate: check_equivalent_arrhenius(ct_rxn1.rate, ct_rxn2.rate) elif isinstance(ct_rxn1.rate, ct.PlogRate): if ct_rxn1.rate.rates or ct_rxn2.rate.rates: - assert len(ct_rxn1.rates) == len(ct_rxn2.rates), "Same number of rates in PLOG reaction" + assert len(ct_rxn1.rate.rates) == len(ct_rxn2.rate.rates), "Same number of rates in PLOG reaction" for i in range(len(ct_rxn1.rate.rates)): P1, arr1 = ct_rxn1.rate.rates[i] diff --git a/test/rmgpy/canteraTest.py b/test/rmgpy/canteraTest.py new file mode 100644 index 0000000000..23ad23eed1 --- /dev/null +++ b/test/rmgpy/canteraTest.py @@ -0,0 +1,401 @@ +#!/usr/bin/env python3 + +############################################################################### +# # +# RMG - Reaction Mechanism Generator # +# # +# Copyright (c) 2002-2023 Prof. William H. Green (whgreen@mit.edu), # +# Prof. Richard H. West (r.west@neu.edu) and the RMG Team (rmg_dev@mit.edu) # +# # +# Permission is hereby granted, free of charge, to any person obtaining a # +# copy of this software and associated documentation files (the 'Software'), # +# to deal in the Software without restriction, including without limitation # +# the rights to use, copy, modify, merge, publish, distribute, sublicense, # +# and/or sell copies of the Software, and to permit persons to whom the # +# Software is furnished to do so, subject to the following conditions: # +# # +# The above copyright notice and this permission notice shall be included in # +# all copies or substantial portions of the Software. # +# # +# THE SOFTWARE IS PROVIDED 'AS IS', WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE # +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING # +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER # +# DEALINGS IN THE SOFTWARE. # +# # +############################################################################### + + +import cantera as ct +import os +import shutil +import numpy as np +import pytest + +from rmgpy.species import Species +from rmgpy.reaction import Reaction +from rmgpy.kinetics import ( + Arrhenius, + PDepArrhenius, + MultiArrhenius, + Chebyshev, + Troe, + Lindemann, + ThirdBody, +) +from rmgpy.thermo import NASA, NASAPolynomial +from rmgpy.transport import TransportData +from rmgpy.cantera import ( + CanteraWriter, + save_cantera_files, + species_to_dict, + reaction_to_dict_list, + generate_cantera_data +) + + +class TestCanteraWriter: + + def setup_method(self): + """ + Create a temporary directory for file I/O tests. + """ + base_dir = os.path.dirname(os.path.abspath(__file__)) + self.tmp_dir = os.path.join(base_dir, 'tmp') + + # Ensure a clean start: delete if exists, then create + if os.path.exists(self.tmp_dir): + shutil.rmtree(self.tmp_dir) + os.makedirs(self.tmp_dir) + + def teardown_method(self): + """ + Clean up the temporary directory after tests. + """ + shutil.rmtree(self.tmp_dir) + + + def _create_dummy_species(self, label, formula, index=-1): + """Helper to create a functional RMG Species object with thermo/transport""" + sp = Species(label=label).from_smiles(formula) + sp.index = index + coeffs = [1.0, 0.0, 0.0, 0.0, 0.0, -100.0, 1.0] + poly_low = NASAPolynomial(coeffs=coeffs, Tmin=(200, 'K'), Tmax=(1000, 'K')) + poly_high = NASAPolynomial(coeffs=coeffs, Tmin=(1000, 'K'), Tmax=(6000, 'K')) + sp.thermo = NASA(polynomials=[poly_low, poly_high], Tmin=(200, 'K'), Tmax=(6000, 'K')) + num_atoms = len(sp.molecule[0].atoms) + if num_atoms == 1: + shape_idx = 0 + elif num_atoms == 2: + shape_idx = 1 + else: + shape_idx = 2 + sp.transport_data = TransportData( + shapeIndex=shape_idx, + sigma=(3.0, 'angstrom'), + epsilon=(100.0, 'K'), + dipoleMoment=(0.0, 'De'), + polarizability=(0.0, 'angstrom^3'), + rotrelaxcollnum=1.0 + ) + return sp + + def test_species_to_dict_standard(self): + """Test conversion of a standard gas species.""" + sp = self._create_dummy_species("H2", "[H][H]", index=1) + d = species_to_dict(sp, [sp]) + + assert d['name'] == "H2(1)" + assert 'composition' in d + assert d['thermo']['model'] == 'NASA7' + assert len(d['thermo']['data']) == 2 + + # Verify Transport + assert 'transport' in d + assert d['transport']['model'] == 'gas' + assert d['transport']['geometry'] == 'linear' + # Diameter should be in meters (SI) + assert np.isclose(d['transport']['diameter'], 3.0e-10) + + def test_reaction_to_dict_arrhenius(self): + """Test standard Arrhenius kinetics.""" + r = self._create_dummy_species("R", "[CH2]O", index=1) + p = self._create_dummy_species("P", "C[O]", index=2) + rxn = Reaction( + reactants=[r], products=[p], + kinetics=Arrhenius(A=(1e10, "s^-1"), n=0.5, Ea=(10, "kJ/mol"), T0=(1, "K")) + ) + + entries = reaction_to_dict_list(rxn, species_list=[r, p]) + assert len(entries) == 1 + data = entries[0] + + assert data['equation'] == "R(1) <=> P(2)" + assert 'rate-constant' in data + assert np.isclose(data['rate-constant']['A'], 1e10) + assert np.isclose(data['rate-constant']['b'], 0.5) + assert np.isclose(data['rate-constant']['Ea'], 10000.0) + + def test_reaction_to_dict_duplicates(self): + """Test that MultiKinetics objects result in multiple YAML entries.""" + r = self._create_dummy_species("R", "[H]", index=1) + k1 = Arrhenius(A=(1e10, "s^-1"), n=0, Ea=(0, "J/mol"), T0=(1, "K")) + k2 = Arrhenius(A=(2e10, "s^-1"), n=0, Ea=(0, "J/mol"), T0=(1, "K")) + + rxn = Reaction( + reactants=[r], products=[r], + kinetics=MultiArrhenius(arrhenius=[k1, k2]), + duplicate=True + ) + + entries = reaction_to_dict_list(rxn, species_list=[r]) + assert len(entries) == 2 + assert entries[0]['rate-constant']['A'] == 1e10 + assert entries[1]['rate-constant']['A'] == 2e10 + assert entries[0].get('duplicate') is True + + def test_reaction_to_dict_troe(self): + """Test Falloff/Troe serialization.""" + r = self._create_dummy_species("R", "[H]", index=1) + M = self._create_dummy_species("M", "[Ar]", index=-1) + + # Troe + k_high = Arrhenius(A=(1e14, "s^-1"), n=0, Ea=(10, "kJ/mol"), T0=(1, "K")) + k_low = Arrhenius(A=(1e20, "cm^3/(mol*s)"), n=0, Ea=(10, "kJ/mol"), T0=(1, "K")) + + troe = Troe( + arrheniusHigh=k_high, arrheniusLow=k_low, + alpha=0.5, T3=(100, "K"), T1=(200, "K"), T2=(300, "K"), + efficiencies={M.molecule[0]: 2.0} + ) + + rxn = Reaction(reactants=[r], products=[r], kinetics=troe) + entries = reaction_to_dict_list(rxn, species_list=[r, M]) + data = entries[0] + + assert data['type'] == 'falloff' + assert 'Troe' in data + assert data['Troe']['A'] == 0.5 + assert data['Troe']['T2'] == 300.0 + # Efficiencies should map label -> val + assert data['efficiencies'] == {"M": 2.0} + + def test_generate_cantera_data_detects_plasma(self): + """Test that the writer detects 'e' and sets thermo: plasma.""" + h2 = self._create_dummy_species("H2", "[H][H]", index=1) + + # Case 1: No Electron + data = generate_cantera_data([h2], [], is_plasma=False) + phase = data['phases'][0] + assert phase['thermo'] == 'ideal-gas' + assert phase['transport'] == 'mixture-averaged' + + def test_full_integration_plasma_model(self): + """ + Create a comprehensive RMG model, write it to disk, and load it in Cantera + to ensure all fields are valid and parsed correctly. + """ + + # 1. Create Model Components + h2 = self._create_dummy_species("H2", "[H][H]", index=1) + h = self._create_dummy_species("H", "[H]", index=2) + ch4 = self._create_dummy_species("CH4", "C", index=3) + oh = self._create_dummy_species("OH", "[OH]", index=4) + ar = self._create_dummy_species("Ar", "[Ar]", index=-1) + + species = [h2, h, ch4, oh, ar] + + r1 = Reaction( + reactants=[h2], products=[h, h], + kinetics=Arrhenius(A=(1e13, "s^-1"), n=0, Ea=(400, "kJ/mol"), T0=(1, "K")) + ) + + r2 = Reaction( + reactants=[h, h], products=[h2], + kinetics=ThirdBody( + arrheniusLow=Arrhenius(A=(1e18, "cm^6/(mol^2*s)"), n=-1, Ea=(0, "J/mol"), T0=(1, "K")), + efficiencies={ar.molecule[0]: 0.7} + ) + ) + + reactions = [r1, r2] + + # 2. Mock RMG Object Structure + # The writer expects: rmg.output_directory and rmg.reaction_model.core + class MockCore: + def __init__(self): + self.species = species + self.reactions = reactions + + class MockModel: + def __init__(self): + self.core = MockCore() + self.edge = MockCore() # Empty for now + + class MockRMG: + def __init__(self, out_dir): + self.output_directory = out_dir + self.reaction_model = MockModel() + self.save_edge_species = False + + mock_rmg = MockRMG(self.tmp_dir) + save_cantera_files(mock_rmg) + + yaml_file = os.path.join(self.tmp_dir, "cantera", "chem.yaml") + versioned_file = os.path.join(self.tmp_dir, "cantera", "chem0005.yaml") + assert os.path.exists(yaml_file) + assert os.path.exists(versioned_file) + + try: + sol = ct.Solution(yaml_file) + except Exception as e: + pytest.fail(f"Cantera failed to load the generated YAML: {e}") + + assert sol.n_species == 5 + + assert sol.n_reactions == 2 + + ct_r2 = sol.reaction(1) + assert "three-body" in ct_r2.reaction_type or "ThreeBody" in ct_r2.reaction_type + assert np.isclose(ct_r2.third_body.efficiencies["Ar"], 0.7) + + def test_reaction_to_dict_pdep_arrhenius(self): + """Test Pressure-Dependent Arrhenius (PLOG) structure.""" + r = self._create_dummy_species("R", "[CH2]O", index=1) + p = self._create_dummy_species("P", "C[O]", index=2) + + k_low = Arrhenius(A=(1e10, "s^-1"), n=0, Ea=(10, "kJ/mol"), T0=(1, "K")) + k_high = Arrhenius(A=(1e12, "s^-1"), n=0, Ea=(15, "kJ/mol"), T0=(1, "K")) + + pdep = PDepArrhenius( + pressures=([0.1, 1.0], "atm"), + arrhenius=[k_low, k_high], + ) + + rxn = Reaction(reactants=[r], products=[p], kinetics=pdep) + + entries = reaction_to_dict_list(rxn, species_list=[r, p]) + data = entries[0] + + assert data['type'] == 'pressure-dependent-Arrhenius' + rates = data['rate-constants'] + assert len(rates) == 2 + + assert np.isclose(rates[0]['P'], 0.1 * 101325.0) + assert np.isclose(rates[0]['A'], 1e10) + assert np.isclose(rates[0]['Ea'], 10000.0) + + assert np.isclose(rates[1]['P'], 1.0 * 101325.0) + assert np.isclose(rates[1]['A'], 1e12) + assert np.isclose(rates[1]['Ea'], 15000.0) + + def test_reaction_to_dict_chebyshev(self): + """Test Chebyshev kinetics structure.""" + r = self._create_dummy_species("R", "[H]", index=1) + + # 2x2 Coefficients matrix + coeffs = np.array([[1.0, 2.0], [3.0, 4.0]]) + cheb = Chebyshev( + Tmin=(300, "K"), Tmax=(2000, "K"), + Pmin=(0.01, "atm"), Pmax=(100, "atm"), + coeffs=coeffs, + kunits="s^-1" + ) + + rxn = Reaction(reactants=[r], products=[r], kinetics=cheb) + + entries = reaction_to_dict_list(rxn, species_list=[r]) + data = entries[0] + + assert data['type'] == 'Chebyshev' + + assert np.allclose(data['temperature-range'], [300.0, 2000.0]) + assert np.allclose(data['pressure-range'], [0.01 * 101325.0, 100 * 101325.0]) + assert np.allclose(data['data'], coeffs) + + def test_reaction_to_dict_lindemann(self): + """Test Lindemann (Falloff without Troe parameters).""" + r = self._create_dummy_species("R", "[H]", index=1) + M = self._create_dummy_species("M", "[Ar]", index=-1) + + k_high = Arrhenius(A=(1e14, "s^-1"), n=0, Ea=(10, "kJ/mol"), T0=(1, "K")) + k_low = Arrhenius(A=(1e21, "cm^3/(mol*s)"), n=0, Ea=(10, "kJ/mol"), T0=(1, "K")) + lind = Lindemann( + arrheniusHigh=k_high, + arrheniusLow=k_low, + efficiencies={M.molecule[0]: 5.0}, + ) + rxn = Reaction(reactants=[r], products=[r], kinetics=lind) + entries = reaction_to_dict_list(rxn, species_list=[r, M]) + data = entries[0] + + assert data['type'] == 'falloff' + assert 'high-P-rate-constant' in data + assert 'low-P-rate-constant' in data + assert np.isclose(data['high-P-rate-constant']['A'], 1e14) + assert np.isclose(data['low-P-rate-constant']['A'], 1e15) + assert data['efficiencies'] == {"M": 5.0} + assert 'Troe' not in data + + def test_cantera_writer_class_listener(self): + """ + Test the CanteraWriter class directly to ensure it correctly initializes + subdirectories and triggers the save on update(). + """ + writer = CanteraWriter(self.tmp_dir) + cantera_dir = os.path.join(self.tmp_dir, 'cantera') + assert os.path.exists(cantera_dir) + assert os.path.isdir(cantera_dir) + + mock_rmg = self._create_dummy_model() + writer.update(mock_rmg) + + versioned_file = os.path.join(cantera_dir, 'chem0002.yaml') + latest_file = os.path.join(cantera_dir, 'chem.yaml') + + assert os.path.exists(versioned_file) + assert os.path.exists(latest_file) + + with open(latest_file, 'r') as f: + content = f.read() + assert "generator: RMG-Py CanteraWriter" in content + assert "phases:" in content + assert "species:" in content + + def _create_dummy_model(self): + """Creates a mock object structure resembling RMG.reaction_model""" + + # 1. Species + sp_H2 = self._create_dummy_species("H2", "[H][H]", index=1) + sp_H = self._create_dummy_species("H", "[H]", index=2) + species_list = [sp_H2, sp_H] + + # 2. Reactions + rxn_arr = Reaction( + reactants=[sp_H2], products=[sp_H, sp_H], + kinetics=Arrhenius(A=(1e13, "s^-1"), n=0.0, Ea=(200, "kJ/mol"), T0=(1, "K")) + ) + reaction_list = [rxn_arr] + + # Mock Object Structure + class MockCore: + def __init__(self, s, r): + self.species = s + self.reactions = r + + class MockModel: + def __init__(self, core): + self.core = core + self.edge = MockCore([], []) + self.output_species_list = [] + self.output_reaction_list = [] + + class MockRMG: + def __init__(self, out_dir, model): + self.output_directory = out_dir + self.reaction_model = model + self.save_edge_species = False + + return MockRMG(self.tmp_dir, MockModel(MockCore(species_list, reaction_list))) diff --git a/test/rmgpy/reactionTest.py b/test/rmgpy/reactionTest.py index 4e1cc0cc50..c8539d4e38 100644 --- a/test/rmgpy/reactionTest.py +++ b/test/rmgpy/reactionTest.py @@ -35,7 +35,6 @@ import cantera as ct import numpy -import yaml from copy import deepcopy import pytest @@ -2948,34 +2947,58 @@ def test_pdep_arrhenius(self): ct_objects = [self.ct_pdepArrhenius] converted_ct_objects = [obj.to_cantera(self.species_list, use_chemkin_identifier=True) for obj in rmg_objects] + def assert_plog_rates_equal(rates1, rates2, rtol=1e-9, atol=0.0): + assert len(rates1) == len(rates2) + # Sort by pressure to avoid ordering differences (Plog stores a multimap internally) + rates1 = sorted(rates1, key=lambda x: x[0]) + rates2 = sorted(rates2, key=lambda x: x[0]) + for (p1, r1), (p2, r2) in zip(rates1, rates2): + assert math.isclose(float(p1), float(p2), rel_tol=rtol, abs_tol=atol) + A1, b1, Ea1 = r1.pre_exponential_factor, r1.temperature_exponent, r1.activation_energy + A2, b2, Ea2 = r2.pre_exponential_factor, r2.temperature_exponent, r2.activation_energy + assert math.isclose(A1, A2, rel_tol=rtol, abs_tol=atol) + assert math.isclose(b1, b2, rel_tol=rtol, abs_tol=atol) + assert math.isclose(Ea1, Ea2, rel_tol=rtol, abs_tol=atol) + for converted_obj, ct_obj in zip(converted_ct_objects, ct_objects): # Check that the reaction class is the same assert type(converted_obj) == type(ct_obj) # Check that the reaction string is the same assert repr(converted_obj) == repr(ct_obj) # Check that the Arrhenius rates are identical - assert str(converted_obj.rates) == str(ct_obj.rates) + assert_plog_rates_equal(converted_obj.rate.rates, ct_obj.rate.rates) def test_multi_pdep_arrhenius(self): """ Tests formation of cantera reactions with MultiPDepArrhenius kinetics. """ - rmg_objects = [self.multiPdepArrhenius] ct_objects = [self.ct_multiPdepArrhenius] converted_ct_objects = [obj.to_cantera(self.species_list, use_chemkin_identifier=True) for obj in rmg_objects] + def assert_plog_rates_equal(rates1, rates2, rtol=1e-9, atol=0.0): + assert len(rates1) == len(rates2) + # Sort by pressure to avoid ordering differences (Plog stores a multimap internally) + rates1 = sorted(rates1, key=lambda x: x[0]) + rates2 = sorted(rates2, key=lambda x: x[0]) + for (p1, r1), (p2, r2) in zip(rates1, rates2): + assert math.isclose(float(p1), float(p2), rel_tol=rtol, abs_tol=atol) + A1, b1, Ea1 = r1.pre_exponential_factor, r1.temperature_exponent, r1.activation_energy + A2, b2, Ea2 = r2.pre_exponential_factor, r2.temperature_exponent, r2.activation_energy + assert math.isclose(A1, A2, rel_tol=rtol, abs_tol=atol) + assert math.isclose(b1, b2, rel_tol=rtol, abs_tol=atol) + assert math.isclose(Ea1, Ea2, rel_tol=rtol, abs_tol=atol) + for converted_obj, ct_obj in zip(converted_ct_objects, ct_objects): # Check that the same number of reactions are produced assert len(converted_obj) == len(ct_obj) - for converted_rxn, ct_rxn in zip(converted_obj, ct_obj): # Check that the reaction has the same type assert type(converted_rxn) == type(ct_rxn) # Check that the reaction string is the same assert repr(converted_rxn) == repr(ct_rxn) # Check that the Arrhenius rates are identical - assert str(converted_rxn.rates) == str(ct_rxn.rates) + assert_plog_rates_equal(converted_rxn.rate.rates, ct_rxn.rate.rates) def test_chebyshev(self): """ @@ -2996,18 +3019,18 @@ def test_falloff(self): assert round(abs(ct_troe.rate.low_rate.pre_exponential_factor - self.ct_troe.rate.low_rate.pre_exponential_factor), 3) == 0 assert ct_troe.rate.low_rate.temperature_exponent == self.ct_troe.rate.low_rate.temperature_exponent assert ct_troe.rate.low_rate.activation_energy == self.ct_troe.rate.low_rate.activation_energy - assert ct_troe.efficiencies == self.ct_troe.efficiencies + assert ct_troe.third_body.efficiencies == self.ct_troe.third_body.efficiencies ct_third_body = self.thirdBody.to_cantera(self.species_list, use_chemkin_identifier=True) assert type(ct_third_body.rate) == type(self.ct_thirdBody.rate) assert round(abs(ct_third_body.rate.pre_exponential_factor - self.ct_thirdBody.rate.pre_exponential_factor), 3) == 0 assert ct_third_body.rate.temperature_exponent == self.ct_thirdBody.rate.temperature_exponent assert ct_third_body.rate.activation_energy == self.ct_thirdBody.rate.activation_energy - assert ct_third_body.efficiencies == self.ct_thirdBody.efficiencies + assert ct_third_body.third_body.efficiencies == self.ct_thirdBody.third_body.efficiencies ct_lindemann = self.lindemann.to_cantera(self.species_list, use_chemkin_identifier=True) assert type(ct_lindemann.rate) == type(self.ct_lindemann.rate) - assert ct_lindemann.efficiencies == self.ct_lindemann.efficiencies + assert ct_lindemann.third_body.efficiencies == self.ct_lindemann.third_body.efficiencies assert str(ct_lindemann.rate.low_rate) == str(self.ct_lindemann.rate.low_rate) assert str(ct_lindemann.rate.high_rate) == str(self.ct_lindemann.rate.high_rate) diff --git a/test/rmgpy/rmg/mainTest.py b/test/rmgpy/rmg/mainTest.py index c71f2bbb76..fdca40c680 100644 --- a/test/rmgpy/rmg/mainTest.py +++ b/test/rmgpy/rmg/mainTest.py @@ -55,7 +55,7 @@ def setup_class(cls): cls.seedKinetics = os.path.join(cls.databaseDirectory, "kinetics", "libraries", "testSeed") cls.seedKineticsEdge = os.path.join(cls.databaseDirectory, "kinetics", "libraries", "testSeed_edge") - os.mkdir(os.path.join(cls.testDir, cls.outputDir)) + os.makedirs(os.path.join(cls.testDir, cls.outputDir), exist_ok=True) cls.rmg = RMG( input_file=os.path.join(cls.testDir, "input.py"), @@ -174,7 +174,7 @@ def test_rmg_memory(self): def test_make_cantera_input_file(self): """ - This tests to ensure that a usable Cantera input file is created. + This test ensures that a usable Cantera input file is created. """ import cantera as ct @@ -274,7 +274,7 @@ def setup_class(cls): cls.outputDir = os.path.join(cls.testDir, "output") cls.databaseDirectory = settings["database.directory"] - os.mkdir(os.path.join(cls.testDir, cls.outputDir)) + os.makedirs(os.path.join(cls.testDir, cls.outputDir), exist_ok=True) cls.max_iter = 10 @@ -360,168 +360,3 @@ def teardown_class(cls): os.remove(os.path.join(cls.test_dir, "RMG.profile.dot")) os.remove(os.path.join(cls.test_dir, "RMG.profile.dot.ps2")) - -class TestCanteraOutput: - def setup_class(self): - self.chemkin_files = { - """ELEMENTS - H - D /2.014/ - T /3.016/ - C - CI /13.003/ - O - OI /18.000/ - N - -END - -SPECIES - ethane(1) - CH3(4) -END - -THERM ALL - 300.000 1000.000 5000.000 - -ethane(1) H 6 C 2 G100.000 5000.000 954.52 1 - 4.58987205E+00 1.41507042E-02-4.75958084E-06 8.60284590E-10-6.21708569E-14 2 --1.27217823E+04-3.61762003E+00 3.78032308E+00-3.24248354E-03 5.52375224E-05 3 --6.38573917E-08 2.28633835E-11-1.16203404E+04 5.21037799E+00 4 - -CH3(4) H 3 C 1 G100.000 5000.000 1337.62 1 - 3.54144859E+00 4.76788187E-03-1.82149144E-06 3.28878182E-10-2.22546856E-14 2 - 1.62239622E+04 1.66040083E+00 3.91546822E+00 1.84153688E-03 3.48743616E-06 3 --3.32749553E-09 8.49963443E-13 1.62856393E+04 3.51739246E-01 4 - -END - - - -REACTIONS KCAL/MOLE MOLES - -CH3(4)+CH3(4)=ethane(1) 8.260e+17 -1.400 1.000 - -END -""": True, - """ELEMENTS - CI /13.003/ - O - OI /18.000/ - N - -END - -SPECIES - ethane(1) - CH3(4) -END - -THERM ALL - 300.000 1000.000 5000.000 - -ethane(1) H 6 C 2 G100.000 5000.000 954.52 1 - 4.58987205E+00 1.41507042E-02-4.75958084E-06 8.60284590E-10-6.21708569E-14 2 --1.27217823E+04-3.61762003E+00 3.78032308E+00-3.24248354E-03 5.52375224E-05 3 --6.38573917E-08 2.28633835E-11-1.16203404E+04 5.21037799E+00 4 - -CH3(4) H 3 C 1 G100.000 5000.000 1337.62 1 - 3.54144859E+00 4.76788187E-03-1.82149144E-06 3.28878182E-10-2.22546856E-14 2 - 1.62239622E+04 1.66040083E+00 3.91546822E+00 1.84153688E-03 3.48743616E-06 3 --3.32749553E-09 8.49963443E-13 1.62856393E+04 3.51739246E-01 4 - -END - - - -REACTIONS KCAL/MOLE MOLES - -CH3(4)+CH3(4)=ethane(1) 8.260e+17 -1.400 1.000 - -END -""": False, - """ELEMENTS - H - D /2.014/ - T /3.016/ - C - CI /13.003/ - O - OI /18.000/ - N - -END - -SPECIES - ethane(1) - CH3(4) -END - -THERM ALL - 300.000 1000.000 5000.000 - -ethane(1) H 6 C 2 G100.000 5000.000 954.52 1 - 4.58987205E+00 1.41507042E-02-4.75958084E-06 8.60284590E-10-6.21708569E-14 2 --1.27217823E+04-3.61762003E+00 3.78032308E+00-3.24248354E-03 5.52375224E-05 3 --6.38573917E-08 2.28633835E-11-1.16203404E+04 5.21037799E+00 4 - -END - -REACTIONS KCAL/MOLE MOLES - -CH3(4)+CH3(4)=ethane(1) 8.260e+17 -1.400 1.000 - -END -""": False, - } - self.rmg = RMG() - self.dir_name = "temp_dir_for_testing" - self.rmg.output_directory = os.path.join(originalPath, "..", "test", "rmgpy", "test_data", self.dir_name) - - self.tran_dat = """ -! Species Shape LJ-depth LJ-diam DiplMom Polzblty RotRelaxNum Data -! Name Index epsilon/k_B sigma mu alpha Zrot Source -ethane(1) 2 252.301 4.302 0.000 0.000 1.500 ! GRI-Mech -CH3(4) 2 144.001 3.800 0.000 0.000 0.000 ! GRI-Mech - """ - - def teardown_class(self): - os.chdir(originalPath) - # try to remove the tree. If testChemkinToCanteraConversion properly - # ran, the files should already be removed. - try: - shutil.rmtree(self.dir_name) - except OSError: - pass - # go back to the main RMG-Py directory - os.chdir("..") - - def test_chemkin_to_cantera_conversion(self): - """ - Tests that good and bad chemkin files raise proper exceptions - """ - - from cantera.ck2yaml import InputError - - for ck_input, works in self.chemkin_files.items(): - os.chdir(originalPath) - os.mkdir(self.dir_name) - os.chdir(self.dir_name) - - f = open("chem001.inp", "w") - f.write(ck_input) - f.close() - - f = open("tran.dat", "w") - f.write(self.tran_dat) - f.close() - - if works: - self.rmg.generate_cantera_files(os.path.join(os.getcwd(), "chem001.inp")) - else: - with pytest.raises(InputError): - self.rmg.generate_cantera_files(os.path.join(os.getcwd(), "chem001.inp")) - - # clean up - os.chdir(originalPath) - shutil.rmtree(self.dir_name) diff --git a/test/rmgpy/speciesTest.py b/test/rmgpy/speciesTest.py index 6c2f558b21..ec97de5b9d 100644 --- a/test/rmgpy/speciesTest.py +++ b/test/rmgpy/speciesTest.py @@ -458,23 +458,25 @@ def test_cantera(self): rmg_ct_species = rmg_species.to_cantera(use_chemkin_identifier=True) - ct_species = ct.Species.fromCti( - """species(name=u'Ar', - atoms='Ar:1', - thermo=(NASA([200.00, 1000.00], - [ 2.50000000E+00, 0.00000000E+00, 0.00000000E+00, - 0.00000000E+00, 0.00000000E+00, -7.45375000E+02, - 4.37967000E+00]), - NASA([1000.00, 6000.00], - [ 2.50000000E+00, 0.00000000E+00, 0.00000000E+00, - 0.00000000E+00, 0.00000000E+00, -7.45375000E+02, - 4.37967000E+00])), - transport=gas_transport(geom='atom', - diam=3.33, - well_depth=136.501, - dipole=2.0, - polar=1.0, - rot_relax=15.0))""" + ct_species = ct.Species.from_yaml( + """ + name: Ar + composition: {Ar: 1} + thermo: + model: NASA7 + temperature-ranges: [200.00, 1000.00, 6000.00] + data: + - [2.50000000E+00, 0.00000000E+00, 0.00000000E+00, 0.00000000E+00, 0.00000000E+00, -7.45375000E+02, 4.37967000E+00] + - [2.50000000E+00, 0.00000000E+00, 0.00000000E+00, 0.00000000E+00, 0.00000000E+00, -7.45375000E+02, 4.37967000E+00] + transport: + model: gas + geometry: atom + diameter: 3.33 + well-depth: 136.501 + dipole: 2.0 + polarizability: 1.0 + rotational-relaxation: 15.0 + """ ) assert type(rmg_ct_species) == type(ct_species) assert rmg_ct_species.name == ct_species.name diff --git a/test/rmgpy/tools/canteramodelTest.py b/test/rmgpy/tools/canteramodelTest.py index 81b7d6844e..aeb2e372d5 100644 --- a/test/rmgpy/tools/canteramodelTest.py +++ b/test/rmgpy/tools/canteramodelTest.py @@ -28,9 +28,8 @@ ############################################################################### import os - - import numpy as np +import tempfile import rmgpy from rmgpy.quantity import Quantity @@ -42,7 +41,6 @@ def test_ignition_delay(self): """ Test that find_ignition_delay() works. """ - t = np.arange(0, 5, 0.5) P = np.array([0, 0.33, 0.5, 0.9, 2, 4, 15, 16, 16.1, 16.2]) OH = np.array([0, 0.33, 0.5, 0.9, 2, 4, 15, 16, 7, 2]) @@ -82,7 +80,6 @@ class RMGToCanteraTest: def setup_class(self): from rmgpy.chemkin import load_chemkin_file - folder = os.path.join(os.path.dirname(rmgpy.__file__), "tools/data/various_kinetics") chemkin_path = os.path.join(folder, "chem_annotated.inp") @@ -118,19 +115,32 @@ def setup_class(self): self.rmg_surface_ct_reactions.extend(converted_reactions) else: self.rmg_surface_ct_reactions.append(converted_reactions) + + with open(chemkin_surface_path, 'r') as f: + surface_content = f.read() + if "SITE SDEN" in surface_content: + # Inject a dummy phase name 'SURF' + surface_content = surface_content.replace("SITE SDEN", "SITE SURF SDEN") + + # Write to a temporary file to avoid modifying the repo's test data + tf = tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.inp') + tf.write(surface_content) + tf.close() + # Use the temp file for Cantera loading + chemkin_surface_path = tf.name + + job = Cantera() job.surface = True - job.load_chemkin_model(chemkin_path, surface_file=chemkin_surface_path, quiet=True) + job.load_chemkin_model(chemkin_path, surface_file=chemkin_surface_path, quiet=True, permissive=True) self.ct_surface_species = job.surface.species() self.ct_surface_reactions = job.surface.reactions() - def test_species_conversion(self): """ Test that species objects convert properly """ from rmgpy.tools.canteramodel import check_equivalent_cantera_species - for i in range(len(self.ctSpecies)): assert check_equivalent_cantera_species(self.ctSpecies[i], self.rmg_ctSpecies[i]) @@ -139,7 +149,6 @@ def test_reaction_conversion(self): Test that reaction objects convert properly """ from rmgpy.tools.canteramodel import check_equivalent_cantera_reaction - for i in range(len(self.ctReactions)): assert check_equivalent_cantera_reaction(self.ctReactions[i], self.rmg_ctReactions[i]) @@ -148,7 +157,6 @@ def test_surface_species_conversion(self): Test that surface species objects convert properly """ from rmgpy.tools.canteramodel import check_equivalent_cantera_species - for i in range(len(self.ct_surface_species)): #print("Chemkin-to-Cantera:", self.ct_surfaceSpecies[i].input_data) #print("Chemkin-to-RMG-to-Cantera:", self.rmg_surface_ctSpecies[i].input_data) @@ -159,8 +167,7 @@ def test_surface_reaction_conversion(self): Test that surface reaction objects convert properly """ from rmgpy.tools.canteramodel import check_equivalent_cantera_reaction - for i in range(len(self.ct_surface_reactions)): #print("Chemkin-to-Cantera:", self.ct_surfaceReactions[i].input_data) #print("Chemkin-to-RMG-to-Cantera:", self.rmg_surface_ctReactions[i].input_data) - assert check_equivalent_cantera_reaction(self.ct_surface_reactions[i], self.rmg_surface_ct_reactions[i]) \ No newline at end of file + assert check_equivalent_cantera_reaction(self.ct_surface_reactions[i], self.rmg_surface_ct_reactions[i]) diff --git a/test/rmgpy/transportDataTest.py b/test/rmgpy/transportDataTest.py index 59dee0a542..575d41e0f9 100644 --- a/test/rmgpy/transportDataTest.py +++ b/test/rmgpy/transportDataTest.py @@ -161,15 +161,19 @@ def test_to_cantera(self): rmg_ct_transport = transport.to_cantera() import cantera as ct - ct_species = ct.Species.fromCti( - """species(name=u'Ar', - atoms='Ar:1', - transport=gas_transport(geom='atom', - diam=3.33, - well_depth=136.501, - dipole=2.0, - polar=1.0, - rot_relax=15.0))""" + ct_species = ct.Species.from_yaml( + """ + name: Ar + composition: {Ar: 1} + transport: + model: gas + geometry: atom + diameter: 3.33 + well-depth: 136.501 + dipole: 2.0 + polarizability: 1.0 + rotational-relaxation: 15.0 + """ ) ct_transport = ct_species.transport