diff --git a/mhkit/acoustics/analysis.py b/mhkit/acoustics/analysis.py index 145757fd..80e9030a 100644 --- a/mhkit/acoustics/analysis.py +++ b/mhkit/acoustics/analysis.py @@ -50,6 +50,17 @@ from mhkit.dolfyn.time import epoch2dt64, dt642epoch +def _check_numeric(value, name: str): + if np.issubdtype(type(value), np.ndarray): + value = value.item() + if not ( + isinstance(value, (int, float)) + or np.issubdtype(type(value), np.integer) + or np.issubdtype(type(value), np.floating) + ): + raise TypeError(f"{name} must be a numeric type (int or float).") + + def _fmax_warning( fn: Union[int, float, np.ndarray], fmax: Union[int, float, np.ndarray] ) -> Union[int, float, np.ndarray]: @@ -69,11 +80,6 @@ def _fmax_warning( The adjusted maximum frequency limit, ensuring it does not exceed the Nyquist frequency. """ - if not isinstance(fn, (int, float, np.ndarray)): - raise TypeError("'fn' must be a numeric type (int or float).") - if not isinstance(fmax, (int, float, np.ndarray)): - raise TypeError("'fmax' must be a numeric type (int or float).") - if fmax > fn: warnings.warn( f"`fmax` = {fmax} is greater than the Nyquist frequency. Setting" @@ -120,10 +126,8 @@ def minimum_frequency( if not np.issubdtype(water_depth.dtype, np.number): raise TypeError("'water_depth' must be a numeric type or array of numerics.") - if not isinstance(c, (int, float)): - raise TypeError("'c' must be a numeric type (int or float).") - if not isinstance(c_seabed, (int, float)): - raise TypeError("'c_seabed' must be a numeric type (int or float).") + _check_numeric(c, "c") + _check_numeric(c_seabed, "c_seabed") if np.any(water_depth <= 0): raise ValueError("All elements of 'water_depth' must be positive numbers.") @@ -143,6 +147,7 @@ def sound_pressure_spectral_density( pressure: xr.DataArray, fs: Union[int, float], bin_length: Union[int, float] = 1, + fft_length: Optional[Union[int, float]] = None, rms: bool = True, ) -> xr.DataArray: """ @@ -167,6 +172,8 @@ def sound_pressure_spectral_density( Data collection sampling rate [Hz] bin_length: int or float Length of time in seconds to create FFTs. Default: 1. + fft_length: int or float, optional + Length of FFT to use. If None, uses bin_length * fs. Default: None. rms: bool If True, calculates the mean-squared SPSD. Set to False to calculate standard SPSD. Default: True. @@ -180,10 +187,8 @@ def sound_pressure_spectral_density( # Type checks if not isinstance(pressure, xr.DataArray): raise TypeError("'pressure' must be an xarray.DataArray.") - if not isinstance(fs, (int, float)): - raise TypeError("'fs' must be a numeric type (int or float).") - if not isinstance(bin_length, (int, float)): - raise TypeError("'bin_length' must be a numeric type (int or float).") + _check_numeric(fs, "fs") + _check_numeric(bin_length, "bin_length") # Ensure that 'pressure' has a 'time' coordinate if "time" not in pressure.dims: @@ -191,9 +196,14 @@ def sound_pressure_spectral_density( # window length of each time series nbin = bin_length * fs + if fft_length is not None: + _check_numeric(fft_length, "fft_length") + nfft = fft_length + else: + nfft = nbin # Use dolfyn PSD functionality - binner = VelBinner(n_bin=nbin, fs=fs, n_fft=nbin) + binner = VelBinner(n_bin=nbin, fs=fs, n_fft=nfft) # Always 50% overlap if numbers reshape perfectly # Mean square sound pressure psd = binner.power_spectral_density(pressure, freq_units="Hz") @@ -221,9 +231,9 @@ def sound_pressure_spectral_density( "units": pressure.units + "^2/Hz", "long_name": long_name, "fs": fs, - "nbin": str(bin_length) + " s", + "bin_length": bin_length, "overlap": "50%", - "nfft": nbin, + "n_fft": nfft, }, ) @@ -234,6 +244,7 @@ def apply_calibration( spsd: xr.DataArray, sensitivity_curve: xr.DataArray, fill_value: Union[float, int, np.ndarray], + interp_method: str = "linear", ) -> xr.DataArray: """ Applies custom calibration to spectral density values. @@ -248,6 +259,9 @@ def apply_calibration( fill_value: float or int Value with which to fill missing values from the calibration curve, in units of dB rel 1 V^2/uPa^2. + interp_method: str + Interpolation method to use when interpolating the calibration curve + to the frequencies in 'spsd'. Default is 'linear'. Returns ------- @@ -259,8 +273,7 @@ def apply_calibration( raise TypeError("'spsd' must be an xarray.DataArray.") if not isinstance(sensitivity_curve, xr.DataArray): raise TypeError("'sensitivity_curve' must be an xarray.DataArray.") - if not isinstance(fill_value, (int, float, np.ndarray)): - raise TypeError("'fill_value' must be a numeric type (int or float).") + _check_numeric(fill_value, "fill_value") # Ensure 'freq' dimension exists in 'spsd' if "freq" not in spsd.dims: @@ -295,7 +308,7 @@ def apply_calibration( freq = sensitivity_curve.dims[0] # Interpolate calibration curve to desired value calibration = sensitivity_curve.interp( - {freq: spsd_calibrated["freq"]}, method="linear" + {freq: spsd_calibrated["freq"]}, method=interp_method ) # Fill missing with provided value calibration = calibration.fillna(fill_value) @@ -340,6 +353,7 @@ def sound_pressure_spectral_density_level(spsd: xr.DataArray) -> xr.DataArray: attrs={ "units": "dB re 1 uPa^2/Hz", "long_name": "Sound Pressure Spectral Density Level", + "time_resolution": spsd.attrs["bin_length"], }, ) @@ -571,8 +585,8 @@ def band_aggregate( for val in octave: if not isinstance(val, int) or (val <= 0): raise TypeError("'octave' must contain positive integers.") - if not isinstance(fmin, int) or (fmin <= 0): - raise TypeError("'fmin' must be a positive integer.") + _check_numeric(fmin, "fmax") + _check_numeric(fmax, "fmax") if fmax <= fmin: # also checks that fmax is positive raise ValueError("'fmax' must be greater than 'fmin'.") diff --git a/mhkit/acoustics/graphics.py b/mhkit/acoustics/graphics.py index fb61361f..17fb9e94 100644 --- a/mhkit/acoustics/graphics.py +++ b/mhkit/acoustics/graphics.py @@ -24,7 +24,7 @@ import xarray as xr import matplotlib.pyplot as plt -from .analysis import _fmax_warning +from .analysis import _fmax_warning, _check_numeric def plot_spectrogram( @@ -61,8 +61,9 @@ def plot_spectrogram( Figure plot axis """ - if not isinstance(fmin, (int, float)) or not isinstance(fmax, (int, float)): - raise TypeError("fmin and fmax must be numeric types.") + # Type checks + _check_numeric(fmin, "fmin") + _check_numeric(fmax, "fmax") if fmin >= fmax: raise ValueError("fmin must be less than fmax.") @@ -128,8 +129,8 @@ def plot_spectra( Figure plot axis """ - if not isinstance(fmin, (int, float)) or not isinstance(fmax, (int, float)): - raise TypeError("fmin and fmax must be numeric types.") + _check_numeric(fmin, "fmin") + _check_numeric(fmax, "fmax") if fmin >= fmax: raise ValueError("fmin must be less than fmax.") diff --git a/mhkit/acoustics/io.py b/mhkit/acoustics/io.py index c1743b0e..cf836e4f 100644 --- a/mhkit/acoustics/io.py +++ b/mhkit/acoustics/io.py @@ -49,6 +49,8 @@ import xarray as xr from scipy.io import wavfile +from mhkit.acoustics.analysis import _check_numeric + def _read_wav_metadata(f: BinaryIO) -> dict: """ @@ -132,10 +134,9 @@ def _calculate_voltage_and_time( raise TypeError("Raw audio data 'raw' must be a numpy.ndarray.") if not isinstance(bits_per_sample, int): raise TypeError("'bits_per_sample' must be an integer.") - if not isinstance(peak_voltage, (int, float)): - raise TypeError("'peak_voltage' must be numeric (int or float).") if not isinstance(start_time, (str, np.datetime64)): raise TypeError("'start_time' must be a string or np.datetime64.") + _check_numeric(peak_voltage, "peak_voltage") length = raw.shape[0] // fs # length of recording in seconds @@ -196,15 +197,12 @@ def read_hydrophone( if not isinstance(filename, (str, Path)): raise TypeError("Filename must be a string or a pathlib.Path object.") - if not isinstance(peak_voltage, (int, float)): - raise TypeError("'peak_voltage' must be numeric (int or float).") - if sensitivity is not None and not isinstance(sensitivity, (int, float)): - raise TypeError("'sensitivity' must be numeric (int, float) or None.") - if not isinstance(gain, (int, float)): - raise TypeError("'gain' must be numeric (int or float).") + if sensitivity is not None: + _check_numeric(sensitivity, "sensitivity") + _check_numeric(peak_voltage, "peak_voltage") + _check_numeric(gain, "gain") if not isinstance(start_time, (str, np.datetime64)): raise TypeError("'start_time' must be a string or np.datetime64") - if (sensitivity is not None) and (sensitivity > 0): raise ValueError( "Hydrophone calibrated sensitivity should be entered as a negative number." @@ -225,9 +223,9 @@ def read_hydrophone( # If sensitivity is provided, convert to sound pressure if sensitivity is not None: # Subtract gain - # Hydrophone with sensitivity of -177 dB and gain of -3 dB = sensitivity of -174 dB + # Hydrophone with sensitivity of -177 dB and gain of 3 dB = sensitivity of -174 dB if gain: - sensitivity -= gain + sensitivity += gain # Convert calibration from dB rel 1 V/uPa into ratio sensitivity = 10 ** (sensitivity / 20) # V/uPa @@ -511,25 +509,18 @@ def export_audio( ------- None """ + if not isinstance(filename, str): raise TypeError("'filename' must be a string.") - if not isinstance(pressure, xr.DataArray): raise TypeError("'pressure' must be an xarray.DataArray.") - if not hasattr(pressure, "values") or not isinstance(pressure.values, np.ndarray): raise TypeError("'pressure.values' must be a numpy.ndarray.") - - if not hasattr(pressure, "sensitivity") or not isinstance( - pressure.sensitivity, (int, float) - ): - raise TypeError("'pressure.sensitivity' must be a numeric type (int or float).") - - if not hasattr(pressure, "fs") or not isinstance(pressure.fs, (int, float)): - raise TypeError("'pressure.fs' must be a numeric type (int or float).") - - if not isinstance(gain, (int, float)): - raise TypeError("'gain' must be a numeric type (int or float).") + if not hasattr(pressure, "sensitivity"): + _check_numeric(pressure.sensitivity, "pressure.sensitivity") + if not hasattr(pressure, "fs"): + _check_numeric(pressure.fs, "pressure.fs") + _check_numeric(gain, "gain") # Convert from Pascals to UPa upa = pressure.values.T * 1e6 diff --git a/mhkit/acoustics/sel.py b/mhkit/acoustics/sel.py index 781db387..9083dc16 100644 --- a/mhkit/acoustics/sel.py +++ b/mhkit/acoustics/sel.py @@ -134,9 +134,7 @@ def sound_exposure_level( exposure = np.trapezoid(band * w, band["freq"]) # Sound exposure level (L_{E,p}) = (L_{p,rms} + 10log10(t)) - sel = 10 * np.log10(exposure / reference) + 10 * np.log10( - spsd.attrs["nfft"] / spsd.attrs["fs"] # n_points / (n_points/s) - ) + sel = 10 * np.log10(exposure / reference) + 10 * np.log10(spsd.attrs["bin_length"]) out = xr.DataArray( sel.astype(np.float32), @@ -145,7 +143,7 @@ def sound_exposure_level( "units": "dB re 1 uPa^2 s", "long_name": long_name, "weighting_group": group, - "integration_time": spsd.attrs["nbin"], + "integration_time": spsd.attrs["bin_length"], "freq_band_min": fmin, "freq_band_max": fmax, }, diff --git a/mhkit/acoustics/spl.py b/mhkit/acoustics/spl.py index 2e9ef86a..3be616c9 100644 --- a/mhkit/acoustics/spl.py +++ b/mhkit/acoustics/spl.py @@ -19,7 +19,7 @@ import numpy as np import xarray as xr -from .analysis import _fmax_warning, _create_frequency_bands +from .analysis import _check_numeric, _fmax_warning, _create_frequency_bands def _argument_check(spsd, fmin, fmax): @@ -45,10 +45,8 @@ def _argument_check(spsd, fmin, fmax): # Type checks if not isinstance(spsd, xr.DataArray): raise TypeError("'spsd' must be an xarray.DataArray.") - if not isinstance(fmin, int): - raise TypeError("'fmin' must be an integer.") - if not isinstance(fmax, int): - raise TypeError("'fmax' must be an integer.") + _check_numeric(fmin, "fmin") + _check_numeric(fmax, "fmax") # Ensure 'freq' and 'time' dimensions are present if ("freq" not in spsd.dims) or ("time" not in spsd.dims): @@ -59,9 +57,9 @@ def _argument_check(spsd, fmin, fmax): raise ValueError( "'spsd' must have 'fs' (sampling frequency) in its attributes." ) - if "nfft" not in spsd.attrs: + if "n_fft" not in spsd.attrs: raise ValueError( - "'spsd' must have 'nfft' (sampling frequency) in its attributes." + "'spsd' must have 'n_fft' (number of points in each FFT) in its attributes." ) # Value checks @@ -119,6 +117,7 @@ def sound_pressure_level( attrs={ "units": "dB re 1 uPa", "long_name": "Sound Pressure Level", + "time_resolution": spsd.attrs["bin_length"], "freq_band_min": fmin, "freq_band_max": fmax, }, @@ -235,6 +234,7 @@ def third_octave_sound_pressure_level( { "units": "dB re 1 uPa", "long_name": "Third Octave Sound Pressure Level", + "time_resolution": spsd.attrs["bin_length"], } ) @@ -272,6 +272,7 @@ def decidecade_sound_pressure_level( { "units": "dB re 1 uPa", "long_name": "Decidecade Sound Pressure Level", + "time_resolution": spsd.attrs["bin_length"], } ) diff --git a/mhkit/tests/acoustics/test_analysis.py b/mhkit/tests/acoustics/test_analysis.py index ce792fb3..b60f5f91 100644 --- a/mhkit/tests/acoustics/test_analysis.py +++ b/mhkit/tests/acoustics/test_analysis.py @@ -52,7 +52,8 @@ def test_sound_pressure_spectral_density(self): self.assertIn("freq", spsd.dims) self.assertIn("time", spsd.dims) self.assertEqual(spsd.attrs["units"], "Pa^2/Hz") - self.assertEqual(spsd.attrs["nbin"], f"{bin_length} s") + self.assertEqual(spsd.attrs["bin_length"], bin_length) + self.assertEqual(spsd.attrs["n_fft"], bin_length * fs) # Calculate expected number of segments overlap = 0.0