Source code for directdemod.comm

'''
Signal utilities
'''

import directdemod.constants as constants
import numpy as np
import scipy.signal as signal


'''
This is an object used to store a signal and its properties
e.g. To use this to store a audio signal: audioSig = commSignal(ArrayWithSignalValues, SamplingRate)
Refer: Experiment 1 for testing memory effeciency of the object
'''
[docs]class commSignal: ''' This is an object used to store a signal and its properties '''
[docs] def __init__(self, sampRate, sig = np.array([]), chunker = None): '''Initialize the object Args: sampRate (:obj:`int`): sampling rate in Hz, will be forced to be an integer sig (:obj:`numpy array`, optional): must be one dimentional, will be forced to be a numpy array chunker (:obj:`chunker`, optional): Chunking object, if this signal is going to be processed in chunks ''' self.__chunker = chunker self.__len = len(sig) self.__sampRate = int(sampRate) if self.__sampRate <= 0: raise ValueError("The sampling rate must be greater than zero") self.__sig = np.array(sig) if not self.__sig.size == self.__sig.shape[0]: raise TypeError("The signal array must be 1-D")
@property def length(self): ''':obj:`int`: get length of signal''' return self.__len @property def sampRate(self): ''':obj:`int`: get sampling rate of signal''' return self.__sampRate @property def signal(self): ''':obj:`numpy array`: get signal''' return self.__sig
[docs] def offsetFreq(self, freqOffset): '''Offset signal by a frequency by multiplying a complex envelope Args: freqOffset (:obj:`float`): offset frequency in Hz Returns: :obj:`commSignal`: Signal offset by given frequency (self) ''' offset = 0 if not self.__chunker is None: offset = self.__chunker.get(constants.CHUNK_FREQOFFSET, 0) self.__chunker.set(constants.CHUNK_FREQOFFSET, offset + self.length) self.__sig *= np.exp(-1.0j*2.0*np.pi*freqOffset*np.arange(offset, offset + self.length)/self.sampRate) return self
[docs] def filter(self, filt): '''Apply a filter to the signal Args: filt (:obj:`filter`): filter object Returns: :obj:`commSignal`: Updated signal (self) ''' self.updateSignal(filt.applyOn(self.signal)) return self
[docs] def bwLim(self, tsampRate, strict = False, uniq = "abcd"): '''Limit the bandwidth by downsampling Args: tsampRate (:obj:`int`): target sample rate strict (:obj:`bool`, optional): if true, the target sample rate will be matched exactly uniq (:obj:`str`, optional): in case chunked signal, uniq is to differentiate different bwLim funcs Returns: :obj:`commSignal`: Updated signal (self) ''' if self.__sampRate < tsampRate: raise ValueError("The target sampling rate must be less than current sampling rate") if strict: # will be depreciated later on, try not to use self.__sig = signal.resample(self.signal, int(tsampRate * self.length/self.sampRate)) self.__sampRate = tsampRate self.__len = len(self.signal) else: jumpIndex = int(self.sampRate / tsampRate) offset = 0 if not self.__chunker is None: offset = self.__chunker.get(constants.CHUNK_BWLIM + uniq, 0) nextOff = (jumpIndex - (self.length - offset)%jumpIndex)%jumpIndex self.__chunker.set(constants.CHUNK_BWLIM + uniq, nextOff) self.__sig = self.signal[offset::jumpIndex] self.__sampRate = int(self.sampRate/jumpIndex) self.__len = len(self.signal) return self
[docs] def funcApply(self, func): ''' Applies a function to the signal Args: func (function): function to be applied Returns: :obj:`commSignal`: Updated signal (self) ''' self.updateSignal(func(self.signal)) return self
[docs] def extend(self, sig): ''' Adds another signal to this one at the tail end Args: sig (:obj:`commSignal`): Signal to be added Returns: :obj:`commSignal`: Updated signal (self) ''' if self.length == 0: self.__sampRate = sig.sampRate if not self.__sampRate == sig.sampRate: raise TypeError("Signals must have same sampling rate to be extended") self.updateSignal(np.concatenate([self.signal, sig.signal])) return self
[docs] def updateSignal(self, sig): ''' Updates the signal Args: sig (:obj:`numpy array`): New signal array Returns: :obj:`commSignal`: Updated signal (self) ''' self.__sig = np.array(sig) if not self.__sig.size <= self.__sig.shape[0]: raise TypeError("The signal array must be 1-D") self.__len = len(self.signal) return self