diff --git a/kwave/kmedium.py b/kwave/kmedium.py index 2886299d..89353088 100644 --- a/kwave/kmedium.py +++ b/kwave/kmedium.py @@ -1,6 +1,6 @@ import logging from dataclasses import dataclass -from typing import List +from typing import List, Optional, Sequence, Union import numpy as np @@ -9,42 +9,43 @@ @dataclass class kWaveMedium(object): + """ + Medium properties for k-Wave simulations. + + Note: For heterogeneous medium parameters, medium.sound_speed and medium.density + must be given in matrix form with the same dimensions as kgrid. For homogeneous + medium parameters, these can be given as single numeric values. If the medium is + homogeneous and velocity inputs or outputs are not required, it is not necessary + to specify medium.density. + """ # sound speed distribution within the acoustic medium [m/s] | required to be defined - sound_speed: np.array + sound_speed: Union[float, int, np.ndarray] # reference sound speed used within the k-space operator (phase correction term) [m/s] - sound_speed_ref: np.array = None + sound_speed_ref: Optional[Union[float, int, np.ndarray]] = None # density distribution within the acoustic medium [kg/m^3] - density: np.array = None + density: Optional[Union[float, int, np.ndarray]] = None # power law absorption coefficient [dB/(MHz^y cm)] - alpha_coeff: np.array = None + alpha_coeff: Optional[Union[float, int, np.ndarray]] = None # power law absorption exponent - alpha_power: np.array = None + alpha_power: Optional[Union[float, int, np.ndarray]] = None # optional input to force either the absorption or dispersion terms in the equation of state to be excluded; # valid inputs are 'no_absorption' or 'no_dispersion' - alpha_mode: np.array = None + alpha_mode: Optional[str] = None # frequency domain filter applied to the absorption and dispersion terms in the equation of state - alpha_filter: np.array = None + alpha_filter: Optional[np.ndarray] = None # two element array used to control the sign of absorption and dispersion terms in the equation of state - alpha_sign: np.array = None + alpha_sign: Optional[np.ndarray] = None # parameter of nonlinearity - BonA: np.array = None + BonA: Optional[Union[float, int, np.ndarray]] = None # is the medium absorbing? absorbing: bool = False # is the medium absorbing stokes? stokes: bool = False - # """ - # Note: For heterogeneous medium parameters, medium.sound_speed and - # medium.density must be given in matrix form with the same dimensions as - # kgrid. For homogeneous medium parameters, these can be given as single - # numeric values. If the medium is homogeneous and velocity inputs or - # outputs are not required, it is not necessary to specify medium.density. - # """ - def __post_init__(self): self.sound_speed = np.atleast_1d(self.sound_speed) - def check_fields(self, kgrid_shape: np.ndarray) -> None: + def check_fields(self, kgrid_shape: Sequence[int]) -> None: """ Check whether the given properties are valid @@ -63,17 +64,19 @@ def check_fields(self, kgrid_shape: np.ndarray) -> None: ], "medium.alpha_mode must be set to 'no_absorption', 'no_dispersion', or 'stokes'." # check the absorption filter input is valid - if self.alpha_filter is not None and not (self.alpha_filter.shape == kgrid_shape).all(): + if self.alpha_filter is not None and self.alpha_filter.shape != tuple(kgrid_shape): raise ValueError("medium.alpha_filter must be the same size as the computational grid.") # check the absorption sign input is valid - if self.alpha_sign is not None and (not kwave.utils.checkutils.is_number(self.alpha_sign) or (self.alpha_sign.size != 2)): - raise ValueError( - "medium.alpha_sign must be given as a " "2 element numerical array controlling absorption and dispersion, respectively." - ) + if self.alpha_sign is not None: + alpha_sign_arr = np.atleast_1d(self.alpha_sign) + if alpha_sign_arr.size != 2 or not np.issubdtype(alpha_sign_arr.dtype, np.number): + raise ValueError( + "medium.alpha_sign must be a 2 element numeric array controlling absorption and dispersion, respectively." + ) # check alpha_coeff is non-negative and real - if not np.all(np.isreal(self.alpha_coeff)) or np.any(self.alpha_coeff < 0): + if self.alpha_coeff is not None and (not np.all(np.isreal(self.alpha_coeff)) or np.any(self.alpha_coeff < 0)): raise ValueError("medium.alpha_coeff must be non-negative and real.") def is_defined(self, *fields) -> List[bool]: @@ -102,7 +105,7 @@ def ensure_defined(self, *fields) -> None: None """ for f in fields: - assert getattr(self, f) is not None, f"The field {f} must be not be None" + assert getattr(self, f) is not None, f"The field {f} must not be None" def is_nonlinear(self) -> bool: """ @@ -167,8 +170,10 @@ def _check_absorbing_with_stokes(self): self.ensure_defined("alpha_coeff") # give warning if y is specified - if self.alpha_power is not None and (self.alpha_power.size != 1 or self.alpha_power != 2): - logging.log(logging.WARN, "the axisymmetric code and stokes absorption assume alpha_power = 2, user value ignored.") + if self.alpha_power is not None: + ap = np.asarray(self.alpha_power) + if ap.size != 1 or not np.isclose(ap.item(), 2.0): + logging.warning("the axisymmetric code and stokes absorption assume alpha_power = 2, user value ignored.") # overwrite y value self.alpha_power = 2 diff --git a/tests/test_kmedium.py b/tests/test_kmedium.py new file mode 100644 index 00000000..5908c5ab --- /dev/null +++ b/tests/test_kmedium.py @@ -0,0 +1,247 @@ +""" +kWaveMedium comprehensive test suite + +Tests for code paths not covered by existing tests to improve code coverage. +""" + +from unittest.mock import patch + +import numpy as np +import pytest + +from kwave.data import Vector +from kwave.kgrid import kWaveGrid +from kwave.kmedium import kWaveMedium + + +def test_elastic_properties_access(): + """Test access to elastic code related properties (should raise NotImplementedError)""" + medium = kWaveMedium(sound_speed=1500) + + with pytest.raises(NotImplementedError, match="Elastic simulation"): + _ = medium.sound_speed_shear + + with pytest.raises(NotImplementedError, match="Elastic simulation"): + _ = medium.sound_speed_ref_shear + + with pytest.raises(NotImplementedError, match="Elastic simulation"): + _ = medium.sound_speed_compression + + with pytest.raises(NotImplementedError, match="Elastic simulation"): + _ = medium.sound_speed_ref_compression + + with pytest.raises(NotImplementedError, match="Elastic simulation"): + _ = medium.alpha_coeff_compression + + with pytest.raises(NotImplementedError, match="Elastic simulation"): + _ = medium.alpha_coeff_shear + + +def test_is_defined_method(): + """Test is_defined method with various scenarios""" + medium = kWaveMedium( + sound_speed=1500, + density=1000, + alpha_coeff=0.75 + ) + + # Test single field + assert medium.is_defined('sound_speed') == [True] + assert medium.is_defined('density') == [True] + assert medium.is_defined('alpha_coeff') == [True] + assert medium.is_defined('alpha_power') == [False] + assert medium.is_defined('BonA') == [False] + + # Test multiple fields + result = medium.is_defined('sound_speed', 'density', 'alpha_power', 'BonA') + assert result == [True, True, False, False] + + +def test_ensure_defined_method(): + """Test ensure_defined method""" + medium = kWaveMedium(sound_speed=1500, density=1000) + + # Test defined fields + medium.ensure_defined('sound_speed', 'density') # Should not raise exception + + # Test undefined fields + with pytest.raises(AssertionError, match="alpha_coeff must not be None"): + medium.ensure_defined('alpha_coeff') + + with pytest.raises(AssertionError, match="alpha_power must not be None"): + medium.ensure_defined('alpha_power') + + +def test_is_nonlinear_method(): + """Test is_nonlinear method""" + # Linear medium + medium1 = kWaveMedium(sound_speed=1500) + assert not medium1.is_nonlinear() + + # Nonlinear medium + medium2 = kWaveMedium(sound_speed=1500, BonA=6.0) + assert medium2.is_nonlinear() + + +def test_stokes_mode_alpha_power_none(): + """Test Stokes mode when alpha_power is None""" + medium = kWaveMedium(sound_speed=1500, alpha_coeff=0.75) + + # When alpha_power is None, setting Stokes mode should set it to 2 + medium.set_absorbing(is_absorbing=True, is_stokes=True) + assert medium.alpha_power == 2 + + +def test_stokes_mode_alpha_power_array(): + """Test Stokes mode when alpha_power is an array""" + # Test multi-element array + medium1 = kWaveMedium(sound_speed=1500, alpha_coeff=0.75, alpha_power=np.array([1.5, 1.8])) + + with patch('logging.warning') as mock_warning: + medium1.set_absorbing(is_absorbing=True, is_stokes=True) + mock_warning.assert_called_once() + assert "alpha_power = 2" in mock_warning.call_args[0][0] + + assert medium1.alpha_power == 2 + + +def test_absorbing_without_stokes_alpha_power_validation(): + """Test alpha_power validation in non-Stokes absorbing mode""" + # Test alpha_power must be scalar + medium1 = kWaveMedium(sound_speed=1500, alpha_coeff=0.75, alpha_power=np.array([1.5, 1.8])) + + with pytest.raises(AssertionError, match="must be scalar"): + medium1.set_absorbing(is_absorbing=True, is_stokes=False) + + # Test alpha_power must be in range 0-3 + medium2 = kWaveMedium(sound_speed=1500, alpha_coeff=0.75, alpha_power=-0.5) + + with pytest.raises(AssertionError, match="between 0 and 3"): + medium2.set_absorbing(is_absorbing=True, is_stokes=False) + + medium3 = kWaveMedium(sound_speed=1500, alpha_coeff=0.75, alpha_power=3.5) + + with pytest.raises(AssertionError, match="between 0 and 3"): + medium3.set_absorbing(is_absorbing=True, is_stokes=False) + + +def test_alpha_mode_validation_edge_cases(): + """Test alpha_mode validation edge cases""" + kgrid = kWaveGrid(Vector([64, 64]), Vector([0.1e-3, 0.1e-3])) + + # Test None value (should pass) + medium1 = kWaveMedium(sound_speed=1500, alpha_mode=None) + medium1.check_fields(kgrid.N) # Should not raise exception + + # Test empty string (should fail) + medium2 = kWaveMedium(sound_speed=1500, alpha_mode="") + with pytest.raises(AssertionError): + medium2.check_fields(kgrid.N) + + +def test_alpha_filter_none(): + """Test when alpha_filter is None""" + kgrid = kWaveGrid(Vector([64, 64]), Vector([0.1e-3, 0.1e-3])) + + medium = kWaveMedium(sound_speed=1500, alpha_filter=None) + medium.check_fields(kgrid.N) # Should not raise exception + + +def test_alpha_sign_none(): + """Test when alpha_sign is None""" + kgrid = kWaveGrid(Vector([64, 64]), Vector([0.1e-3, 0.1e-3])) + + medium = kWaveMedium(sound_speed=1500, alpha_sign=None) + medium.check_fields(kgrid.N) # Should not raise exception + +def test_alpha_sign_wrong_size_raises(): + kgrid = kWaveGrid(Vector([64, 64]), Vector([0.1e-3, 0.1e-3])) + medium = kWaveMedium(sound_speed=1500, alpha_sign=np.array([1.0])) + with pytest.raises(ValueError, match="2 element numeric array"): + medium.check_fields(kgrid.N) + +def test_alpha_sign_non_numeric_raises(): + kgrid = kWaveGrid(Vector([64, 64]), Vector([0.1e-3, 0.1e-3])) + medium = kWaveMedium(sound_speed=1500, alpha_sign=np.array(["a", "b"], dtype=object)) + with pytest.raises(ValueError, match="2 element numeric array"): + medium.check_fields(kgrid.N) + +def test_alpha_coeff_none(): + """Test when alpha_coeff is None""" + kgrid = kWaveGrid(Vector([64, 64]), Vector([0.1e-3, 0.1e-3])) + + medium = kWaveMedium(sound_speed=1500, alpha_coeff=None) + medium.check_fields(kgrid.N) # Should not raise exception + + +def test_alpha_coeff_array_validation(): + """Test alpha_coeff array validation""" + kgrid = kWaveGrid(Vector([64, 64]), Vector([0.1e-3, 0.1e-3])) + + # Valid array + medium1 = kWaveMedium(sound_speed=1500, alpha_coeff=np.array([0.5, 0.6, 0.7])) + medium1.check_fields(kgrid.N) # Should not raise exception + + # Array with negative values + medium2 = kWaveMedium(sound_speed=1500, alpha_coeff=np.array([0.5, -0.1, 0.7])) + with pytest.raises(ValueError, match="non-negative and real"): + medium2.check_fields(kgrid.N) + + # Array with complex values + medium3 = kWaveMedium(sound_speed=1500, alpha_coeff=np.array([0.5, 0.6 + 0.1j, 0.7])) + with pytest.raises(ValueError, match="non-negative and real"): + medium3.check_fields(kgrid.N) + + +def test_post_init_sound_speed_conversion(): + """Test sound_speed conversion in __post_init__""" + # Scalar input + medium1 = kWaveMedium(sound_speed=1500) + assert isinstance(medium1.sound_speed, np.ndarray) + assert medium1.sound_speed.shape == (1,) + + # Array input + medium2 = kWaveMedium(sound_speed=np.array([1500, 1600])) + assert isinstance(medium2.sound_speed, np.ndarray) + assert medium2.sound_speed.shape == (2,) + + +def test_stokes_mode_alpha_mode_restrictions(): + """Test alpha_mode restrictions in Stokes mode""" + # Test no_absorption mode + medium1 = kWaveMedium(sound_speed=1500, alpha_coeff=0.75, alpha_mode='no_absorption') + with pytest.raises(NotImplementedError, match="not supported with the axisymmetric code"): + medium1.set_absorbing(is_absorbing=True, is_stokes=True) + + # Test no_dispersion mode + medium2 = kWaveMedium(sound_speed=1500, alpha_coeff=0.75, alpha_mode='no_dispersion') + with pytest.raises(NotImplementedError, match="not supported with the axisymmetric code"): + medium2.set_absorbing(is_absorbing=True, is_stokes=True) + + +def test_absorbing_flags(): + """Test setting of absorbing and stokes flags""" + medium = kWaveMedium(sound_speed=1500, alpha_coeff=0.75, alpha_power=1.5) + + # Initial state + assert not medium.absorbing + assert not medium.stokes + + # Set to non-Stokes absorbing + medium.set_absorbing(is_absorbing=True, is_stokes=False) + assert medium.absorbing + assert not medium.stokes + + # Reset + medium.absorbing = False + medium.stokes = False + + # Set to Stokes absorbing + medium.set_absorbing(is_absorbing=True, is_stokes=True) + assert medium.absorbing + assert medium.stokes + + # Set to non-absorbing + medium.set_absorbing(is_absorbing=False, is_stokes=False) + assert not medium.absorbing + assert not medium.stokes