Source code for pyfar.classes.filter

import deepdiff
import warnings

import numpy as np
import scipy.signal as spsignal

import pyfar
from copy import deepcopy


def atleast_3d_first_dim(arr):
    arr = np.asarray(arr)
    ndim = np.ndim(arr)

    if ndim < 2:
        arr = np.atleast_2d(arr)
    if ndim < 3:
        return arr[np.newaxis]
    else:
        return arr


def pop_state_from_kwargs(**kwargs):
    kwargs.pop('zi', None)
    warnings.warn(
        "This filter function does not support saving the filter state")
    return kwargs


def lfilter(coefficients, signal, zi):
    return spsignal.lfilter(coefficients[0], coefficients[1], signal, zi=zi)


def filtfilt(coefficients, signal, **kwargs):
    kwargs = pop_state_from_kwargs(kwargs)
    return spsignal.filtfilt(
        coefficients[0], coefficients[1], signal, **kwargs)


def sosfilt(sos, signal, zi):
    return spsignal.sosfilt(sos, signal, zi=zi)


def sosfiltfilt(sos, signal, **kwargs):
    kwargs = pop_state_from_kwargs(kwargs)
    return spsignal.sosfiltfilt(sos, signal, **kwargs)


[docs]def extend_sos_coefficients(sos, order): """Extend a set of SOS filter coefficients to match a required filter order by adding sections with coefficients resulting in an ideal frequency response. Parameters ---------- sos : array-like The second order section filter coefficients. order : int The order to which the coefficients are to be extended. Returns ------- sos_ext : array-like The extended second order section coefficients. """ sos_order = sos.shape[0] if sos_order == order: return sos pad_len = order-sos_order sos_ext = np.zeros((pad_len, 6)) sos_ext[:, 3] = 1. sos_ext[:, 0] = 1. return np.vstack((sos, sos_ext))
class Filter(object): """ Container class for digital filters. This is an abstract class method, only used for the shared processing method used for the application of a filter on a signal. """ def __init__( self, coefficients=None, filter_func=None, sampling_rate=None, state=None, comment=None): """ Initialize a general Filter object. Parameters ---------- coefficients : array, double The filter coefficients as an array. filter_func : ``'default'``, ``'zerophase'`` ``'default'`` applies a direct form II transposed time domain filter based on the standard difference equation. ``'zerophase'`` uses the same filter twice, first forward, then backwards resulting in zero phase. state : array, optional The state of the filter from a priory knowledge. Returns ------- Filter The filter object. """ super().__init__() if coefficients is not None: coefficients = atleast_3d_first_dim(coefficients) self._coefficients = coefficients if state is not None: if coefficients is None: raise ValueError( "Cannot set a state without filter coefficients") state = atleast_3d_first_dim(state) self._initialized = True else: self._initialized = False self._state = state self._filter_func = None self._FILTER_FUNCS = { 'default': None, 'zerophase': None} self._sampling_rate = sampling_rate self._comment = comment def initialize(self): raise NotImplementedError("Abstract class method") @property def sampling_rate(self): """Sampling rate of the filter in Hz. The sampling rate is set upon initialization and cannot be changed after the object has been created. """ return self._sampling_rate @property def shape(self): """ The shape of the filter. """ return self._coefficients.shape[:-2] @property def size(self): """ The size of the filter, that is all elements in the filter object. """ return np.prod(self.shape) @property def state(self): """ The current state of the filter as an array with dimensions corresponding to the order of the filter and number of filter channels. """ return self._state @property def filter_func(self): raise NotImplementedError("Abstract class method") @filter_func.setter def filter_func(self, filter_func): raise NotImplementedError("Abstract class method") def process(self, signal, reset=True): """Apply the filter to a signal. Parameters ---------- signal : Signal The data to be filtered as Signal object. reset : bool, optional If set to ``True``, the filter state will be reset to zero before the filter is applied to the signal. The default is ``'True'``. Returns ------- filtered : Signal A filtered copy of the input signal. """ if not isinstance(signal, pyfar.Signal): raise ValueError("The input needs to be a haiopy.Signal object.") if self.sampling_rate != signal.sampling_rate: raise ValueError( "The sampling rates of filter and signal do not match") if reset is True: self.reset() filtered_signal_data = np.zeros( (self.size, *signal.time.shape), dtype=signal.time.dtype) if self.state is not None: for idx, (coeff, state) in enumerate( zip(self._coefficients, self._state)): filtered_signal_data[idx, ...], new_state = self.filter_func( coeff, filtered_signal_data[idx, ...], state) else: for idx, coeff in enumerate(self._coefficients): filtered_signal_data[idx, ...] = self.filter_func( coeff, signal.time, zi=None) filtered_signal = deepcopy(signal) filtered_signal.time = np.squeeze(filtered_signal_data) return filtered_signal def reset(self): """Reset the filter state by filling it with zeros.""" if self._state is not None: self._state = np.zeros_like(self._state) else: self._state = None @property def comment(self): """Get comment.""" return self._comment @comment.setter def comment(self, value): """Set comment.""" self._comment = str(value) def copy(self): """Return a copy of the Filter object.""" return deepcopy(self) def _encode(self): """Return dictionary for the encoding.""" return self.copy().__dict__ @classmethod def _decode(cls, obj_dict): """Decode object based on its respective object dictionary.""" obj = cls() obj.__dict__.update(obj_dict) return obj def __eq__(self, other): """Check for equality of two objects.""" return not deepdiff.DeepDiff(self, other)
[docs]class FilterFIR(Filter): """ Filter object for FIR filters. """
[docs] def __init__( self, coefficients, sampling_rate, filter_func=lfilter): """ Initialize an finite impulse response (FIR) Filter object. Parameters ---------- coefficients : array, double The filter coefficients as an array with dimensions (number of channels, number of filter coefficients) sampling_rate : number The sampling rate of the filter in Hz. filter_func : ``'default'``, ``'zerophase'`` ``'default'`` applies a direct form II transposed time domain filter based on the standard difference equation. ``'zerophase'`` uses the same filter twice, first forward, then backwards resulting in zero phase. state : array, optional The state of the filter from a priory knowledge. Returns ------- FilterFIR The FIR filter object. """ b = np.atleast_2d(coefficients) a = np.zeros_like(b) a[..., 0] = 1 coeff = np.stack((b, a), axis=-2) super().__init__(coefficients=coeff, sampling_rate=sampling_rate) self._FILTER_FUNCS = { 'default': lfilter, 'zerophase': filtfilt} self._filter_func = filter_func
@property def filter_func(self): return self._filter_func @filter_func.setter def filter_func(self, filter_func): if type('filter_func') == str: filter_func = self._FILTER_FUNCS[filter_func] self._filter_func = filter_func
[docs]class FilterIIR(Filter): """ Filter object for IIR filters. For IIR filters with high orders, second order section IIR filters using FilterSOS should be considered. """
[docs] def __init__( self, coefficients, sampling_rate, filter_func=lfilter): """IIR filter Initialize an infinite impulse response (IIR) Filter object. Parameters ---------- coefficients : array, double The filter coefficients as an array, with shape (number of channels, number of coefficients in the nominator, number of coefficients in the denominator) sampling_rate : number The sampling rate of the filter in Hz. filter_func : ``'default'``, ``'zerophase'`` ``'default'`` applies a direct form II transposed time domain filter based on the standard difference equation. ``'zerophase'`` uses the same filter twice, first forward, then backwards resulting in zero phase. state : array, optional The state of the filter from a priory knowledge. Returns ------- FilterIIR The IIR filter object. """ coeff = np.atleast_2d(coefficients) super().__init__(coefficients=coeff, sampling_rate=sampling_rate) self._FILTER_FUNCS = { 'default': lfilter, 'zerophase': filtfilt} self._filter_func = filter_func
@property def filter_func(self): return self._filter_func @filter_func.setter def filter_func(self, filter_func): if type('filter_func') == str: filter_func = self._FILTER_FUNCS[filter_func] self._filter_func = filter_func
[docs]class FilterSOS(Filter): """ Filter object for IIR filters as second order sections (SOS). """
[docs] def __init__( self, coefficients, sampling_rate, filter_func=sosfilt): """ Initialize a second order sections (SOS) Filter object. Parameters ---------- coefficients : array, double The filter coefficients as an array with dimensions (n_filter_chan, n_sections, 6) sampling_rate : number The sampling rate of the filter in Hz. filter_func : ``'default'``, ``'zerophase'`` ```'default'``` applies a direct form II transposed time domain filter based on the standard difference equation. ``'zerophase'`` uses the same filter twice, first forward, then backwards resulting in zero phase. state : array, optional The state of the filter from a priory knowledge. Returns ------- FilterSOS The SOS filter object. """ coeff = np.atleast_2d(coefficients) if coeff.shape[-1] != 6: raise ValueError( "The coefficients are not in line with a second order", "section filter structure.") super().__init__( coefficients=coeff, sampling_rate=sampling_rate) self._FILTER_FUNCS = { 'default': sosfilt, 'zerophase': sosfiltfilt } self._filter_func = filter_func
@property def filter_func(self): return self._filter_func