'''
Object for filters
'''
import directdemod.constants as constants
import scipy.signal as signal
'''
Abstract model of a class, to keep the models consistent
Any filter must inherit this abstract class
The general structure is as follows
The goal of having such a parent abstract class is that, the children are forced to implement the required methods
'''
[docs]class filter:
'''
This is a parent object of all filters, it implements all the necessary properties. Refer to experiment 3 for details.
'''
[docs] def __init__(self, b, a, storeState = True, zeroPhase = False, initOut = None):
'''Initialize the object
Args:
b (:obj:`list`): list of 'b' constants of filter
a (:obj:`list`): list of 'a' constants of filter
storeState (:obj:`bool`, optional): Whether the filter state must be stored. Useful when filtering a chunked signal to avoid border effects.
zeroPhase (:obj:`bool`, optional): Whether the filter has to provide zero phase error to the input i.e. no delay in the output (Note: Enabling this will disable 'storeState' and 'initOut')
initOut (:obj:`list`, optional): Initial condition of the filter
'''
self.__storeState = storeState
self.__zeroPhase = zeroPhase
self.__initOut = initOut
if self.__storeState and self.__zeroPhase:
self.__storeState = False
if (not self.__initOut == None) and self.__zeroPhase:
self.__initOut = None
if self.__storeState:
self.__zi = signal.lfilter_zi(b, a)
if not self.__initOut == None:
self.__zi = None
self.__b = b
self.__a = a
[docs] def applyOn(self, x):
'''Apply the filter to a given array of signal
Args:
x (:obj:`numpy array`): The signal array on which the filter needs to be applied
Returns:
:obj:`numpy array`: Filtered signal array
'''
if self.__storeState:
if self.__zi is None:
self.__zi = signal.lfiltic(self.__b, self.__a, x, self.__initOut)
retDat, self.__zi = signal.lfilter(self.__b, self.__a, x, zi = self.__zi)
return retDat
else:
if self.__zeroPhase:
return signal.filtfilt(self.__b, self.__a, x)
else:
return signal.lfilter(self.__b, self.__a, x)
@property
def getA(self):
''':obj:`list`: Get 'a' of the filter'''
return self.__a
@property
def getB(self):
''':obj:`list`: Get 'b' of the filter'''
return self.__b
'''
A simple rolling average filter
'''
[docs]class rollingAverage(filter):
'''
A simple rolling average filter
'''
[docs] def __init__(self, n = 3, storeState = True, zeroPhase = False, initOut = None):
'''Initialize the object
Args:
n (:obj:`int`, optional): size of the rolling window
storeState (:obj:`bool`, optional): Whether the filter state must be stored. Useful when filtering a chunked signal to avoid border effects.
zeroPhase (:obj:`bool`, optional): Whether the filter has to provide zero phase error to the input i.e. no delay in the output (Note: Enabling this will disable 'storeState' and 'initOut')
initOut (:obj:`list`, optional): Initial condition of the filter
'''
self.__n = n
super(rollingAverage, self).__init__([1.0/n]*n, [1], storeState, zeroPhase, initOut)
'''
Blackman Harris filter
'''
[docs]class blackmanHarris(filter):
'''
Blackman Harris filter
'''
[docs] def __init__(self, n, storeState = True, zeroPhase = False, initOut = None):
'''Initialize the object
Args:
n (:obj:`int`): size of the window
storeState (:obj:`bool`, optional): Whether the filter state must be stored. Useful when filtering a chunked signal to avoid border effects.
zeroPhase (:obj:`bool`, optional): Whether the filter has to provide zero phase error to the input i.e. no delay in the output (Note: Enabling this will disable 'storeState' and 'initOut')
initOut (:obj:`list`, optional): Initial condition of the filter
'''
self.__n = n
super(blackmanHarris, self).__init__(signal.blackmanharris(self.__n), [1], storeState, zeroPhase, initOut)
'''
Blackman Harris filter (by convolving)
'''
[docs]class blackmanHarrisConv:
'''
Blackman Harris filter (by convolving, Not recommended for large signals)
'''
[docs] def __init__(self, n = 151):
'''Initialize the object
Args:
n (:obj:`int`, optional): size of the window
'''
self.__n = n
self.__window = signal.blackmanharris(self.__n)
[docs] def applyOn(self, sig):
'''Apply the filter to a given array of signal
Args:
x (:obj:`numpy array`): The signal array on which the filter needs to be applied
Returns:
:obj:`numpy array`: Filtered signal array
'''
return signal.convolve(sig, self.__window, mode='same')
'''
Hamming filter
'''
[docs]class hamming(filter):
'''
Hamming filter
'''
[docs] def __init__(self, n, storeState = True, zeroPhase = False, initOut = None):
'''Initialize the object
Args:
n (:obj:`int`): size of the window
storeState (:obj:`bool`, optional): Whether the filter state must be stored. Useful when filtering a chunked signal to avoid border effects.
zeroPhase (:obj:`bool`, optional): Whether the filter has to provide zero phase error to the input i.e. no delay in the output (Note: Enabling this will disable 'storeState' and 'initOut')
initOut (:obj:`list`, optional): Initial condition of the filter
'''
self.__n = n
super(hamming, self).__init__(signal.hamming(self.__n), [1], storeState, zeroPhase, initOut)
'''
Gaussian filter
'''
[docs]class gaussian(filter):
'''
Gaussian filter
'''
[docs] def __init__(self, n, sigma, storeState = True, zeroPhase = False, initOut = None):
'''Initialize the object
Args:
n (:obj:`int`): size of the window
sigma (:obj:`float`): The standard deviation
storeState (:obj:`bool`, optional): Whether the filter state must be stored. Useful when filtering a chunked signal to avoid border effects.
zeroPhase (:obj:`bool`, optional): Whether the filter has to provide zero phase error to the input i.e. no delay in the output (Note: Enabling this will disable 'storeState' and 'initOut')
initOut (:obj:`list`, optional): Initial condition of the filter
'''
self.__n = n
self.__sigma = sigma
super(gaussian, self).__init__(signal.gaussian(self.__n, self.__sigma), [1], storeState, zeroPhase, initOut)
'''
Butterworth filter
'''
[docs]class butter(filter):
'''
Butterworth filter
'''
[docs] def __init__(self, Fs, cutoffA, cutoffB = None, n = 6, typeFlt = constants.FLT_LP, storeState = True, zeroPhase = False, initOut = None):
'''Initialize the object
Args:
Fs (:obj:`int`): Sampling frequency of signal
cutoffA (:obj:`float`): desired cutoff A of filter in Hz
cutoffB (:obj:`float`, optional): desired cutoff B of filter in Hz
n (:obj:`int`, optional): Order of filter
type (:obj:`constant`, optional): constants.FLT_LP to constants.FLT_BS, see constants module
storeState (:obj:`bool`, optional): Whether the filter state must be stored. Useful when filtering a chunked signal to avoid border effects.
zeroPhase (:obj:`bool`, optional): Whether the filter has to provide zero phase error to the input i.e. no delay in the output (Note: Enabling this will disable 'storeState' and 'initOut')
initOut (:obj:`list`, optional): Initial condition of the filter
'''
self.__Fs = Fs
self.__cutoffA = cutoffA
self.__cutoffB = cutoffB
self.__n = n
self.__type = typeFlt
if (self.__type == constants.FLT_BP or self.__type == constants.FLT_BS) and self.__cutoffB is None:
raise ValueError("CutoffB must be given")
if self.__type == constants.FLT_LP:
self.__b, self.__a = signal.butter(self.__n, self.__cutoffA / (0.5 * self.__Fs), btype='lowpass')
elif self.__type == constants.FLT_HP:
self.__b, self.__a = signal.butter(self.__n, self.__cutoffA / (0.5 * self.__Fs), btype='highpass')
elif self.__type == constants.FLT_BP:
self.__b, self.__a = signal.butter(self.__n, [self.__cutoffA / (0.5 * self.__Fs), self.__cutoffB / (0.5 * self.__Fs)] , btype='bandpass')
elif self.__type == constants.FLT_BS:
self.__b, self.__a = signal.butter(self.__n, [self.__cutoffA / (0.5 * self.__Fs), self.__cutoffB / (0.5 * self.__Fs)] , btype='bandstop')
else:
raise ValueError("Invalid filter type")
super(butter, self).__init__(self.__b, self.__a, storeState, zeroPhase, initOut)
'''
Remez band filter
'''
[docs]class remez(filter):
'''
Remez band filter
'''
[docs] def __init__(self, Fs, bands, gains, ntaps = 128, storeState = True, zeroPhase = False, initOut = None):
'''Initialize the object
Args:
Fs (:obj:`int`): sampling frequency in Hz
bands (:obj:`list`): non-overlapping list of bands (in Hz) in increasing order. e.g [[0, 100], [400, 500], [600, 700]]
gains (:obj:`float`): Corresponding gains of the bands e.g. [0, 1, 0.5]
ntaps (:obj:`int`, optional): Number of taps of filter (number of terms in filter)
storeState (:obj:`bool`, optional): Whether the filter state must be stored. Useful when filtering a chunked signal to avoid border effects.
zeroPhase (:obj:`bool`, optional): Whether the filter has to provide zero phase error to the input i.e. no delay in the output (Note: Enabling this will disable 'storeState' and 'initOut')
initOut (:obj:`list`, optional): Initial condition of the filter
'''
if len(bands) == 0:
raise ValueError("Atleast one band must be given")
if bands[-1][1] >= (Fs/2):
raise ValueError("Last band must end before (Fs/2)Hz")
self.__bands = []
for i in bands:
self.__bands.extend(i)
self.__gains = gains
if not len(self.__bands) == 2*len(self.__gains):
raise ValueError("Invalid bands/gains values")
super(remez, self).__init__(signal.remez(ntaps, self.__bands, self.__gains, Hz = Fs), [1], storeState, zeroPhase, initOut)
'''
To be implemented later if needed
'''
class medianFilter:
def __init__(self, n = 5):
self.__n = n
def applyOn(self, sig):
return signal.medfilt(sig, self.__n)