Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 35 additions & 21 deletions mhkit/acoustics/analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,17 @@
from mhkit.dolfyn.time import epoch2dt64, dt642epoch


def _check_numeric(value, name: str):
if np.issubdtype(type(value), np.ndarray):
value = value.item()
if not (
isinstance(value, (int, float))
or np.issubdtype(type(value), np.integer)
or np.issubdtype(type(value), np.floating)
):
raise TypeError(f"{name} must be a numeric type (int or float).")


def _fmax_warning(
fn: Union[int, float, np.ndarray], fmax: Union[int, float, np.ndarray]
) -> Union[int, float, np.ndarray]:
Expand All @@ -69,11 +80,6 @@ def _fmax_warning(
The adjusted maximum frequency limit, ensuring it does not exceed the Nyquist frequency.
"""

if not isinstance(fn, (int, float, np.ndarray)):
raise TypeError("'fn' must be a numeric type (int or float).")
if not isinstance(fmax, (int, float, np.ndarray)):
raise TypeError("'fmax' must be a numeric type (int or float).")

if fmax > fn:
warnings.warn(
f"`fmax` = {fmax} is greater than the Nyquist frequency. Setting"
Expand Down Expand Up @@ -120,10 +126,8 @@ def minimum_frequency(
if not np.issubdtype(water_depth.dtype, np.number):
raise TypeError("'water_depth' must be a numeric type or array of numerics.")

if not isinstance(c, (int, float)):
raise TypeError("'c' must be a numeric type (int or float).")
if not isinstance(c_seabed, (int, float)):
raise TypeError("'c_seabed' must be a numeric type (int or float).")
_check_numeric(c, "c")
_check_numeric(c_seabed, "c_seabed")

if np.any(water_depth <= 0):
raise ValueError("All elements of 'water_depth' must be positive numbers.")
Expand All @@ -143,6 +147,7 @@ def sound_pressure_spectral_density(
pressure: xr.DataArray,
fs: Union[int, float],
bin_length: Union[int, float] = 1,
fft_length: Optional[Union[int, float]] = None,
rms: bool = True,
) -> xr.DataArray:
"""
Expand All @@ -167,6 +172,8 @@ def sound_pressure_spectral_density(
Data collection sampling rate [Hz]
bin_length: int or float
Length of time in seconds to create FFTs. Default: 1.
fft_length: int or float, optional
Length of FFT to use. If None, uses bin_length * fs. Default: None.
rms: bool
If True, calculates the mean-squared SPSD. Set to False to
calculate standard SPSD. Default: True.
Expand All @@ -180,20 +187,23 @@ def sound_pressure_spectral_density(
# Type checks
if not isinstance(pressure, xr.DataArray):
raise TypeError("'pressure' must be an xarray.DataArray.")
if not isinstance(fs, (int, float)):
raise TypeError("'fs' must be a numeric type (int or float).")
if not isinstance(bin_length, (int, float)):
raise TypeError("'bin_length' must be a numeric type (int or float).")
_check_numeric(fs, "fs")
_check_numeric(bin_length, "bin_length")

# Ensure that 'pressure' has a 'time' coordinate
if "time" not in pressure.dims:
raise ValueError("'pressure' must be indexed by 'time' dimension.")

# window length of each time series
nbin = bin_length * fs
if fft_length is not None:
_check_numeric(fft_length, "fft_length")
nfft = fft_length
else:
nfft = nbin

# Use dolfyn PSD functionality
binner = VelBinner(n_bin=nbin, fs=fs, n_fft=nbin)
binner = VelBinner(n_bin=nbin, fs=fs, n_fft=nfft)
# Always 50% overlap if numbers reshape perfectly
# Mean square sound pressure
psd = binner.power_spectral_density(pressure, freq_units="Hz")
Expand Down Expand Up @@ -221,9 +231,9 @@ def sound_pressure_spectral_density(
"units": pressure.units + "^2/Hz",
"long_name": long_name,
"fs": fs,
"nbin": str(bin_length) + " s",
"bin_length": bin_length,
"overlap": "50%",
"nfft": nbin,
"n_fft": nfft,
},
)

Expand All @@ -234,6 +244,7 @@ def apply_calibration(
spsd: xr.DataArray,
sensitivity_curve: xr.DataArray,
fill_value: Union[float, int, np.ndarray],
interp_method: str = "linear",
) -> xr.DataArray:
"""
Applies custom calibration to spectral density values.
Expand All @@ -248,6 +259,9 @@ def apply_calibration(
fill_value: float or int
Value with which to fill missing values from the calibration curve,
in units of dB rel 1 V^2/uPa^2.
interp_method: str
Interpolation method to use when interpolating the calibration curve
to the frequencies in 'spsd'. Default is 'linear'.

Returns
-------
Expand All @@ -259,8 +273,7 @@ def apply_calibration(
raise TypeError("'spsd' must be an xarray.DataArray.")
if not isinstance(sensitivity_curve, xr.DataArray):
raise TypeError("'sensitivity_curve' must be an xarray.DataArray.")
if not isinstance(fill_value, (int, float, np.ndarray)):
raise TypeError("'fill_value' must be a numeric type (int or float).")
_check_numeric(fill_value, "fill_value")

# Ensure 'freq' dimension exists in 'spsd'
if "freq" not in spsd.dims:
Expand Down Expand Up @@ -295,7 +308,7 @@ def apply_calibration(
freq = sensitivity_curve.dims[0]
# Interpolate calibration curve to desired value
calibration = sensitivity_curve.interp(
{freq: spsd_calibrated["freq"]}, method="linear"
{freq: spsd_calibrated["freq"]}, method=interp_method
)
# Fill missing with provided value
calibration = calibration.fillna(fill_value)
Expand Down Expand Up @@ -340,6 +353,7 @@ def sound_pressure_spectral_density_level(spsd: xr.DataArray) -> xr.DataArray:
attrs={
"units": "dB re 1 uPa^2/Hz",
"long_name": "Sound Pressure Spectral Density Level",
"time_resolution": spsd.attrs["bin_length"],
},
)

Expand Down Expand Up @@ -571,8 +585,8 @@ def band_aggregate(
for val in octave:
if not isinstance(val, int) or (val <= 0):
raise TypeError("'octave' must contain positive integers.")
if not isinstance(fmin, int) or (fmin <= 0):
raise TypeError("'fmin' must be a positive integer.")
_check_numeric(fmin, "fmax")
_check_numeric(fmax, "fmax")
if fmax <= fmin: # also checks that fmax is positive
raise ValueError("'fmax' must be greater than 'fmin'.")

Expand Down
11 changes: 6 additions & 5 deletions mhkit/acoustics/graphics.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import xarray as xr
import matplotlib.pyplot as plt

from .analysis import _fmax_warning
from .analysis import _fmax_warning, _check_numeric


def plot_spectrogram(
Expand Down Expand Up @@ -61,8 +61,9 @@ def plot_spectrogram(
Figure plot axis
"""

if not isinstance(fmin, (int, float)) or not isinstance(fmax, (int, float)):
raise TypeError("fmin and fmax must be numeric types.")
# Type checks
_check_numeric(fmin, "fmin")
_check_numeric(fmax, "fmax")
if fmin >= fmax:
raise ValueError("fmin must be less than fmax.")

Expand Down Expand Up @@ -128,8 +129,8 @@ def plot_spectra(
Figure plot axis
"""

if not isinstance(fmin, (int, float)) or not isinstance(fmax, (int, float)):
raise TypeError("fmin and fmax must be numeric types.")
_check_numeric(fmin, "fmin")
_check_numeric(fmax, "fmax")
if fmin >= fmax:
raise ValueError("fmin must be less than fmax.")

Expand Down
39 changes: 15 additions & 24 deletions mhkit/acoustics/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@
import xarray as xr
from scipy.io import wavfile

from mhkit.acoustics.analysis import _check_numeric


def _read_wav_metadata(f: BinaryIO) -> dict:
"""
Expand Down Expand Up @@ -132,10 +134,9 @@ def _calculate_voltage_and_time(
raise TypeError("Raw audio data 'raw' must be a numpy.ndarray.")
if not isinstance(bits_per_sample, int):
raise TypeError("'bits_per_sample' must be an integer.")
if not isinstance(peak_voltage, (int, float)):
raise TypeError("'peak_voltage' must be numeric (int or float).")
if not isinstance(start_time, (str, np.datetime64)):
raise TypeError("'start_time' must be a string or np.datetime64.")
_check_numeric(peak_voltage, "peak_voltage")

length = raw.shape[0] // fs # length of recording in seconds

Expand Down Expand Up @@ -196,15 +197,12 @@ def read_hydrophone(

if not isinstance(filename, (str, Path)):
raise TypeError("Filename must be a string or a pathlib.Path object.")
if not isinstance(peak_voltage, (int, float)):
raise TypeError("'peak_voltage' must be numeric (int or float).")
if sensitivity is not None and not isinstance(sensitivity, (int, float)):
raise TypeError("'sensitivity' must be numeric (int, float) or None.")
if not isinstance(gain, (int, float)):
raise TypeError("'gain' must be numeric (int or float).")
if sensitivity is not None:
_check_numeric(sensitivity, "sensitivity")
_check_numeric(peak_voltage, "peak_voltage")
_check_numeric(gain, "gain")
if not isinstance(start_time, (str, np.datetime64)):
raise TypeError("'start_time' must be a string or np.datetime64")

if (sensitivity is not None) and (sensitivity > 0):
raise ValueError(
"Hydrophone calibrated sensitivity should be entered as a negative number."
Expand All @@ -225,9 +223,9 @@ def read_hydrophone(
# If sensitivity is provided, convert to sound pressure
if sensitivity is not None:
# Subtract gain
# Hydrophone with sensitivity of -177 dB and gain of -3 dB = sensitivity of -174 dB
# Hydrophone with sensitivity of -177 dB and gain of 3 dB = sensitivity of -174 dB
if gain:
sensitivity -= gain
sensitivity += gain
# Convert calibration from dB rel 1 V/uPa into ratio
sensitivity = 10 ** (sensitivity / 20) # V/uPa

Expand Down Expand Up @@ -511,25 +509,18 @@ def export_audio(
-------
None
"""

if not isinstance(filename, str):
raise TypeError("'filename' must be a string.")

if not isinstance(pressure, xr.DataArray):
raise TypeError("'pressure' must be an xarray.DataArray.")

if not hasattr(pressure, "values") or not isinstance(pressure.values, np.ndarray):
raise TypeError("'pressure.values' must be a numpy.ndarray.")

if not hasattr(pressure, "sensitivity") or not isinstance(
pressure.sensitivity, (int, float)
):
raise TypeError("'pressure.sensitivity' must be a numeric type (int or float).")

if not hasattr(pressure, "fs") or not isinstance(pressure.fs, (int, float)):
raise TypeError("'pressure.fs' must be a numeric type (int or float).")

if not isinstance(gain, (int, float)):
raise TypeError("'gain' must be a numeric type (int or float).")
if not hasattr(pressure, "sensitivity"):
_check_numeric(pressure.sensitivity, "pressure.sensitivity")
if not hasattr(pressure, "fs"):
_check_numeric(pressure.fs, "pressure.fs")
_check_numeric(gain, "gain")

# Convert from Pascals to UPa
upa = pressure.values.T * 1e6
Expand Down
6 changes: 2 additions & 4 deletions mhkit/acoustics/sel.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,9 +134,7 @@ def sound_exposure_level(
exposure = np.trapezoid(band * w, band["freq"])

# Sound exposure level (L_{E,p}) = (L_{p,rms} + 10log10(t))
sel = 10 * np.log10(exposure / reference) + 10 * np.log10(
spsd.attrs["nfft"] / spsd.attrs["fs"] # n_points / (n_points/s)
)
sel = 10 * np.log10(exposure / reference) + 10 * np.log10(spsd.attrs["bin_length"])

out = xr.DataArray(
sel.astype(np.float32),
Expand All @@ -145,7 +143,7 @@ def sound_exposure_level(
"units": "dB re 1 uPa^2 s",
"long_name": long_name,
"weighting_group": group,
"integration_time": spsd.attrs["nbin"],
"integration_time": spsd.attrs["bin_length"],
"freq_band_min": fmin,
"freq_band_max": fmax,
},
Expand Down
15 changes: 8 additions & 7 deletions mhkit/acoustics/spl.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import numpy as np
import xarray as xr

from .analysis import _fmax_warning, _create_frequency_bands
from .analysis import _check_numeric, _fmax_warning, _create_frequency_bands


def _argument_check(spsd, fmin, fmax):
Expand All @@ -45,10 +45,8 @@ def _argument_check(spsd, fmin, fmax):
# Type checks
if not isinstance(spsd, xr.DataArray):
raise TypeError("'spsd' must be an xarray.DataArray.")
if not isinstance(fmin, int):
raise TypeError("'fmin' must be an integer.")
if not isinstance(fmax, int):
raise TypeError("'fmax' must be an integer.")
_check_numeric(fmin, "fmin")
_check_numeric(fmax, "fmax")

# Ensure 'freq' and 'time' dimensions are present
if ("freq" not in spsd.dims) or ("time" not in spsd.dims):
Expand All @@ -59,9 +57,9 @@ def _argument_check(spsd, fmin, fmax):
raise ValueError(
"'spsd' must have 'fs' (sampling frequency) in its attributes."
)
if "nfft" not in spsd.attrs:
if "n_fft" not in spsd.attrs:
raise ValueError(
"'spsd' must have 'nfft' (sampling frequency) in its attributes."
"'spsd' must have 'n_fft' (number of points in each FFT) in its attributes."
)

# Value checks
Expand Down Expand Up @@ -119,6 +117,7 @@ def sound_pressure_level(
attrs={
"units": "dB re 1 uPa",
"long_name": "Sound Pressure Level",
"time_resolution": spsd.attrs["bin_length"],
"freq_band_min": fmin,
"freq_band_max": fmax,
},
Expand Down Expand Up @@ -235,6 +234,7 @@ def third_octave_sound_pressure_level(
{
"units": "dB re 1 uPa",
"long_name": "Third Octave Sound Pressure Level",
"time_resolution": spsd.attrs["bin_length"],
}
)

Expand Down Expand Up @@ -272,6 +272,7 @@ def decidecade_sound_pressure_level(
{
"units": "dB re 1 uPa",
"long_name": "Decidecade Sound Pressure Level",
"time_resolution": spsd.attrs["bin_length"],
}
)

Expand Down
3 changes: 2 additions & 1 deletion mhkit/tests/acoustics/test_analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ def test_sound_pressure_spectral_density(self):
self.assertIn("freq", spsd.dims)
self.assertIn("time", spsd.dims)
self.assertEqual(spsd.attrs["units"], "Pa^2/Hz")
self.assertEqual(spsd.attrs["nbin"], f"{bin_length} s")
self.assertEqual(spsd.attrs["bin_length"], bin_length)
self.assertEqual(spsd.attrs["n_fft"], bin_length * fs)

# Calculate expected number of segments
overlap = 0.0
Expand Down
Loading