From ac8c022885d073af8c302dc01cb3c5ec25fdcf7d Mon Sep 17 00:00:00 2001 From: Yves-Gwenael Bourhis Date: Sat, 1 Apr 2023 01:17:17 +0200 Subject: [PATCH] Handling clipping --- README.md | 12 ++--- sigprocessutils/lib/integration.py | 27 +++++++++-- sigprocessutils/lib/wavelib.py | 54 +++++++++++++++++---- sigprocessutils/scripts/audio_downsample.py | 40 +++++++++++---- 4 files changed, 104 insertions(+), 29 deletions(-) diff --git a/README.md b/README.md index ba4787a..e54dd2b 100644 --- a/README.md +++ b/README.md @@ -73,8 +73,8 @@ Or: for i in range(nb_frames): paked_input = faded.readframes(1) input_g, input_d = struct.unpack(' self.output: + LOG.warning("Low Clipping during integration : %s", self.output) + self.output = self.min + elif self.max is not None and self.max < self.output: + LOG.warning("High Clipping during integration : %s", self.output) + self.output = self.max return self.output def reset(self, offset=None, coef=None): @@ -63,3 +76,7 @@ def reset(self, offset=None, coef=None): self.coef = coef self.output = self.rounded_offset self.integrator.reset(-self.offset, self.offset) + + def set_min_max(self, min_value=None, max_value=None): + self.min = min_value + self.max = max_value diff --git a/sigprocessutils/lib/wavelib.py b/sigprocessutils/lib/wavelib.py index 890ed5e..ce1b4f4 100644 --- a/sigprocessutils/lib/wavelib.py +++ b/sigprocessutils/lib/wavelib.py @@ -1,7 +1,13 @@ +from functools import cached_property +import logging import wave from struct import pack as struct_pack from struct import unpack as struct_unpack + +LOG = logging.getLogger(__name__) + + # Bit size struct mapping: # https://docs.python.org/3.6/library/struct.html#format-characters # https://stackoverflow.com/questions/3783677/how-to-read-integers-from-a-file-that-are-24bit-and-little-endian-using-python @@ -34,6 +40,13 @@ 4: 32, } +WIDTH_MIN_MAX = { + 1: (0, 255), + 2: (-32768, 32767), + 3: (-8388608, 8388607), + 4: (-2147483648, 2147483647), +} + class WaveMixin(object): _filename = None @@ -48,7 +61,7 @@ def close(self): def tell(self): return self._wfp.tell() - @property + @cached_property def struct_fmt(self): return '<%d%s' % (self.nchannels, SAMPLE_WIDTHS[self.sampwidth]) @@ -56,34 +69,42 @@ def struct_fmt(self): def wfp(self): return self._wfp - @property + @cached_property def nchannels(self): return self._wfp.getnchannels() - @property + @cached_property def sampwidth(self): return self._wfp.getsampwidth() - @property + @cached_property def framerate(self): return self._wfp.getframerate() - @property + @cached_property def nframes(self): return self._wfp.getnframes() - @property + @cached_property def comptype(self): return self._wfp.getcomptype() - @property + @cached_property def compname(self): return self._wfp.getcompname() - @property + @cached_property def params(self): return self._wfp.getparams() + @cached_property + def min_value(self): + return WIDTH_MIN_MAX[self.sampwidth][0] + + @cached_property + def max_value(self): + return WIDTH_MIN_MAX[self.sampwidth][1] + class WaveRead(WaveMixin): @@ -153,8 +174,23 @@ def setcomptype(self, comptype, compname): def setparams(self, params): self._wfp.setparams(params) + def constrain_value(self, value): + if self.min_value <= value <= self.max_value: + return value + elif value < self.min_value: + LOG.warning("Low Clipping : %s", value) + return self.min_value + elif value > self.max_value: + LOG.warning("High Clipping : %s", value) + return self.max_value + + def constrain_values(self, values): + return tuple( + self.constrain_value(value) for value in values + ) + def pack_data(self, *args): - return struct_pack(self.struct_fmt, *args) + return struct_pack(self.struct_fmt, *self.constrain_values(args)) def writeframesraw(self, data): self._wfp.writeframesraw(data) diff --git a/sigprocessutils/scripts/audio_downsample.py b/sigprocessutils/scripts/audio_downsample.py index c611823..d0f3b74 100644 --- a/sigprocessutils/scripts/audio_downsample.py +++ b/sigprocessutils/scripts/audio_downsample.py @@ -7,7 +7,7 @@ from ..conf.logconf import configure_logging from ..lib.integration import DownSamplingLSBIntegration, Integrator -from ..lib.wavelib import WaveRead, WaveWrite, BITS_WIDTHS, WIDTH_BITS +from ..lib.wavelib import WaveRead, WaveWrite, BITS_WIDTHS, WIDTH_BITS, WIDTH_MIN_MAX def create_argument_parser(parser=None): @@ -28,6 +28,15 @@ def create_argument_parser(parser=None): parser.add_argument( "-o", "--output", required=True ) + parser.add_argument( + "-c", "--clip", action="store_true", + help="Clip during integration, not only during output" + ) + parser.add_argument( + "--no-reduce", action="store_true", + help="Do not reduce of 1 LSB Amplitude. Since Integration produces 1 LSB amplitude noise, we reduce the level " + "of 1 LSB to avoid clipping by default. Set this parameter if you do not want this reduction" + ) return parser @@ -43,12 +52,12 @@ def main(): infile = WaveRead(options.input) input_bits = WIDTH_BITS[infile.sampwidth] - logger.debug('Input sample width = %s bits', input_bits) - logger.debug('Output sample width = %s bits', options.bits) + logger.info('Input sample width = %s bits', input_bits) + logger.info('Output sample width = %s bits', options.bits) input_offset = 0 if input_bits == 8: input_offset = 128 - logger.debug('Input offset = %s', input_offset) + logger.info('Input offset = %s', input_offset) outfile = WaveWrite(options.output, infile.params) outfile.setsampwidth(BITS_WIDTHS[options.bits]) @@ -56,13 +65,26 @@ def main(): output_offset = 0 if options.bits == 8: output_offset = 128 - logger.debug('Output offset = %s', output_offset) - divider = math.pow(2, input_bits) / math.pow(2, options.bits) - logger.debug('Divider = %s', divider) + logger.info('Output offset = %s', output_offset) + numerator = math.pow(2, input_bits)/2 - 1 + logger.info('Input MAX theoretical level = %s', numerator) + denominator = (math.pow(2, options.bits) / 2) - 1 + logger.info('Output MAX theoretical level = %s', denominator) + if not options.no_reduce: + denominator = denominator - 1 + logger.info('Reducing Output MAX level to %s to prevent clipping. Use --no-reduce if you do not want this 1 ' + 'LSB amplitude reduction.', denominator) + divider = numerator / denominator + if options.clip: + min_value, max_value = WIDTH_MIN_MAX[outfile.sampwidth] + logger.info('Clipping values = %s, %s', min_value, max_value) + else: + min_value = max_value = None + logger.info('Divider = %s', divider) try: downsample_chans = [ - DownSamplingLSBIntegration(Integrator()) + DownSamplingLSBIntegration(Integrator(), min_value=min_value, max_value=max_value) for _ in six.moves.xrange(infile.nchannels) ] @@ -71,7 +93,7 @@ def main(): for frame_nb in six.moves.xrange(total_frames): # noqa pylint: disable=W0612 inputs = infile.read_unpacked_frames(1) outputs = tuple( - integrator.transfert( + integrator.transfer( (inp - input_offset) / divider ) + output_offset for integrator, inp in zip(downsample_chans, inputs)