Source code for directdemod.decode_noaa

'''
noaa specific
'''
from directdemod import source, sink, chunker, comm, constants, filters, demod_am, demod_fm
import numpy as np
import logging, colorsys
import scipy.signal as signal
from scipy import stats
import scipy.ndimage
from datetime import datetime, timedelta
import matplotlib.pyplot as plt
from scipy import ndimage
from scipy import misc
from PIL import Image

'''
Object to decode NOAA APT
'''

[docs]class decode_noaa: ''' Object to decode NOAA APT '''
[docs] def __init__(self, sigsrc, offset, bw = None): '''Initialize the object Args: sigsrc (:obj:`commSignal`): IQ data source offset (:obj:`float`): Frequency offset of source in Hz bw (:obj:`int`, optional): Bandwidth ''' self.__bw = bw if self.__bw is None: self.__bw = constants.NOAA_FMBW self.__sigsrc = sigsrc self.__offset = offset self.__extractedAudio = None self.__image = None self.__syncA = None self.__syncB = None self.__asyncA = None self.__asyncB = None self.__audOut = None self.__asyncApk = None self.__asyncAtime = None self.__asyncBpk = None self.__asyncBtime = None self.__useNormCorrelate = None self.__color = None self.__useful = 0 self.__chIDA = None self.__chIDB = None
@property def channelID(self): '''get channel ID's Returns: :obj:`list`: [channelIDA, channelIDB] ''' if self.__image is None: self.getImage return [self.__chIDA, self.__chIDB] @property def useful(self): '''See if some data was found or not: 10 consecutive syncs apart by 0.5s+-error Returns: :obj:`int`: 0 if not found, 1 if found ''' if self.__syncA is None or self.__syncB is None: self.getCrudeSync() return self.__useful @property def getAudio(self): '''Get the audio from data Returns: :obj:`commSignal`: An audio signal ''' if self.__extractedAudio is None: self.__extractedAudio = self.__audio() return self.__extractedAudio
[docs] def getMapImage(self, cTime, destFileRot, destFileNoRot, satellite, tleFile = None): '''Get the map overlay of the image Args: cTime (:obj:`datetime`): Time of start of capture in UTC tleFile (:obj:`str`, optional): TLE file location, pulls latest from internet if not given destFile (:obj:`str`): location where to store the image satellite (:obj:`str`): Satellite name, ex: NOAA 19 etc. ''' try: from pyorbital.orbital import Orbital from pyorbital import tlefile except ImportError: logging.error('pyorbital not installed') return basemapPresent = False cartopyPresent = False try: from mpl_toolkits.basemap import Basemap basemapPresent = True except ImportError: logging.warning('basemap not installed') if not basemapPresent: try: import cartopy.crs as ccrs import cartopy.feature cartopyPresent = True except ImportError: logging.error('Both basemap and cartopy not installed. Please install either.') return def angleFromCoordinate(lat1, long1, lat2, long2): # source: https://stackoverflow.com/questions/3932502/calculate-angle-between-two-latitude-longitude-points lat1 = np.radians(lat1) long1 = np.radians(long1) lat2 = np.radians(lat2) long2 = np.radians(long2) dLon = (long2 - long1) y = np.sin(dLon) * np.cos(lat2) x = np.cos(lat1) * np.sin(lat2) - np.sin(lat1) * np.cos(lat2) * np.cos(dLon) brng = np.arctan2(y, x) brng = np.degrees(brng) brng = (brng + 360) % 360 brng = 360 - brng return brng if tleFile is None: orb = Orbital(satellite) else: orb = Orbital(satellite, tle_file=tleFile) im = self.getImageA im = im[:, 85:995] oim = im[:] tdelta = int(im.shape[0]/16) if tdelta < 10: tdelta = 10 top = orb.get_lonlatalt(cTime + timedelta(seconds=int(im.shape[0]/4) - tdelta))[:2][::-1] bot = orb.get_lonlatalt(cTime + timedelta(seconds=int(im.shape[0]/4) + tdelta))[:2][::-1] center = orb.get_lonlatalt(cTime + timedelta(seconds=int(im.shape[0]/4)))[:2][::-1] rot = angleFromCoordinate(*bot, *top) if basemapPresent: rotated_img = ndimage.rotate(im, rot) rimg = rotated_img[:] w = rotated_img.shape[1] h = rotated_img.shape[0] m = Basemap(projection='cass',lon_0 = center[1],lat_0 = center[0],width = w*4000*0.81,height = h*4000*0.81, resolution = "i") m.drawcoastlines(color='yellow') m.drawcountries(color='yellow') im = plt.imshow(rotated_img, cmap='gray', extent=(*plt.xlim(), *plt.ylim())) plt.savefig(destFileRot, bbox_inches='tight', dpi=1000) img = misc.imread(destFileRot) img = img[109:-109,109:-109,:] img = misc.imresize(img, rimg.shape) if 90 < (rot%360) < 270: img = ndimage.rotate(img, -1 * (rot%180)) else: img = ndimage.rotate(img, -1 * rot) rf = int((img.shape[0]/2) - oim.shape[0]/2) re = int((img.shape[0]/2) + oim.shape[0]/2) cf = int((img.shape[1]/2) - oim.shape[1]/2) ce = int((img.shape[1]/2) + oim.shape[1]/2) img = img[rf:re,cf:ce] img = Image.fromarray(img) try: img.save(destFileNoRot) except: logging.error('Image reverse rotation failed') elif cartopyPresent: def add_m(center, dx, dy): # source: https://stackoverflow.com/questions/7477003/calculating-new-longitude-latitude-from-old-n-meters new_latitude = center[0] + (dy / 6371000.0) * (180 / np.pi) new_longitude = center[1] + (dx / 6371000.0) * (180 / np.pi) / np.cos(center[0] * np.pi/180) return [new_latitude, new_longitude] fig = plt.figure() img = ndimage.rotate(im, rot) rimg = img[:] dx = img.shape[0]*4000/2*0.81 # in meters dy = img.shape[1]*4000/2*0.81 # in meters leftbot = add_m(center, -1*dx, -1*dy) righttop = add_m(center, dx, dy) img_extent = (leftbot[1], righttop[1], leftbot[0], righttop[0]) ax = plt.axes(projection=ccrs.PlateCarree()) ax.imshow(img, origin='upper', cmap='gray', extent=img_extent, transform=ccrs.PlateCarree()) ax.coastlines(resolution='50m', color='yellow', linewidth=1) ax.add_feature(cartopy.feature.BORDERS, linestyle='-', edgecolor='yellow') plt.savefig(destFileRot, bbox_inches='tight', dpi=1000) img = misc.imread(destFileRot) img = img[109:-109,109:-109,:] img = misc.imresize(img, rimg.shape) if 90 < (rot%360) < 270: img = ndimage.rotate(img, -1 * (rot%180)) else: img = ndimage.rotate(img, -1 * rot) rf = int((img.shape[0]/2) - oim.shape[0]/2) re = int((img.shape[0]/2) + oim.shape[0]/2) cf = int((img.shape[1]/2) - oim.shape[1]/2) ce = int((img.shape[1]/2) + oim.shape[1]/2) img = img[rf:re,cf:ce] img = Image.fromarray(img) try: img.save(destFileNoRot) except: logging.error('Image reverse rotation failed')
@property def getImage(self): '''Get the image from data Returns: :obj:`numpy array`: A matrix of pixel values ''' if self.__image is None: if self.__audOut is None or self.__syncA is None or self.__syncB is None: self.getCrudeSync() logging.info('Beginning image extraction') # get audio amSig = self.__audOut # apply a bandpass filter to remove any noise amSig.filter(filters.butter(amSig.sampRate, 400, 4400, typeFlt = constants.FLT_BP, zeroPhase = True)) # am demodulate amSig = self.__getAM(amSig) # convert sync from samples to time csyncA = self.__syncA / self.__syncCrudeSampRate csyncB = self.__syncB / self.__syncCrudeSampRate # convert back to sample number csyncA *= amSig.sampRate csyncB *= amSig.sampRate # store uncorrected csync ucsync = csyncA[:] # correct any missing syncs csyncA = self.__fillSync(csyncA, amSig.length) csyncB = self.__fillSync(csyncB, amSig.length) # we want channel A first always if csyncB[0] < csyncA[0]: csyncB.pop(0) if csyncB[-1] < csyncA[-1]: csyncA.pop(-1) if not len(csyncA) == len(csyncB): logging.error("Number of syncA and syncB unequal") csyncB = np.array(csyncA) + int(0.25 * amSig.sampRate) self.__image = [] imageBuffer = [] backupImage = [] numPixels = int(0.5/constants.NOAA_T) imgLine = amSig.signal[:int(len(amSig.signal)/numPixels) * numPixels] imgLine = np.reshape(imgLine, (numPixels, int(len(imgLine)/numPixels))) imgLine = np.median(imgLine, axis = -1) (self.__low, self.__high) = np.percentile(imgLine, (0.5, 99.5)) # vars for image correction lowFifo, highFifo = [], [] corrfifo = [] corrfifosig = [] corrfifosig2 = [] ncorrfifo = 3 lcorr = None lcorrsig = None statecorr = 0 valuesPixCorr = [] valuesSigCorr = [] self.__slope = None self.__intercept = None chidFifo1 = [] chidFifo2 = [] for syncIndex in range(len(csyncA)): logging.info('Decoding line %d of %d lines', syncIndex + 1, len(csyncA)) startIA = int(csyncA[syncIndex]) startIB = int(csyncB[syncIndex]) endIA = startIB endIB = startIB + int(0.25 * amSig.sampRate) if 1+syncIndex < len(csyncA): endIB = int(csyncA[syncIndex + 1]) if endIB > amSig.length or endIA > amSig.length or startIA < 0 or startIB < 0: continue imgLineA = amSig.signal[startIA:endIA] imgLineB = amSig.signal[startIB:endIB] imgLineA = signal.resample(imgLineA, int(int(len(imgLineA)/(numPixels*0.5)) * (numPixels*0.5))) imgLineB = signal.resample(imgLineB, int(int(len(imgLineB)/(numPixels*0.5)) * (numPixels*0.5))) imgLineA = np.reshape(imgLineA, (int(numPixels*0.5), int(len(imgLineA)/(numPixels*0.5)))) imgLineB = np.reshape(imgLineB, (int(numPixels*0.5), int(len(imgLineB)/(numPixels*0.5)))) # image color correction based on sync if csyncA[syncIndex] in ucsync: for j in range(len(constants.NOAA_SYNCA)): if constants.NOAA_SYNCA[j] == 0: lowFifo.extend(imgLineA[j]) else: highFifo.extend(imgLineA[j]) lowFifo = lowFifo[-1*constants.NOAA_COLORCORRECT_FIFOLEN:] highFifo = highFifo[-1*constants.NOAA_COLORCORRECT_FIFOLEN:] val11, val244 = np.median(lowFifo), np.median(highFifo) val0 = val11 - (val244 - val11)*(11 - 0)/(244 - 11) val255 = val11 - (val244 - val11)*(11 - 255)/(244 - 11) self.__low = val0 self.__high = val255 # image color correction based on calibration strip lengthOfStrip = int((len(constants.NOAA_SYNCA) * constants.NOAA_T) * amSig.sampRate) stripVal = np.median(amSig.signal[startIA - lengthOfStrip:startIA]) corrfifo.append(255 * (stripVal - self.__low) / (self.__high - self.__low)) corrfifo = corrfifo[-1*ncorrfifo:] outcorr = np.median(corrfifo) corrfifosig.append(stripVal) corrfifosig = corrfifosig[-1*ncorrfifo:] outcorrsig = np.median(corrfifosig) lengthOfStrip2 = int((len(constants.NOAA_SYNCB) * constants.NOAA_T) * amSig.sampRate) stripVal2 = np.median(amSig.signal[startIB - lengthOfStrip2:startIB]) corrfifosig2.append(stripVal2) corrfifosig2 = corrfifosig2[-1*ncorrfifo:] outcorrsig2 = np.median(corrfifosig2) chidFifo1.append(outcorrsig2) chidFifo1 = chidFifo1[-100:] chidFifo2.append(outcorrsig) chidFifo2 = chidFifo2[-100:] if lcorr is None or abs(outcorr - lcorr) > 255.0/16: logging.info('Color correction state: %d', statecorr) if statecorr == 0 and not lcorrsig is None: valuesPixCorr = [lcorr, outcorr] valuesSigCorr = [lcorrsig, outcorrsig] statecorr = 1 elif 1 <= statecorr <= 6: if outcorr - valuesPixCorr[-1] > 2*255.0/(8*3): valuesPixCorr.append(outcorr) valuesSigCorr.append(outcorrsig) statecorr += 1 else: statecorr = 0 elif statecorr == 7: if valuesPixCorr[-1] - outcorr > 2*255.0/3: valuesPixCorr = [outcorr] + valuesPixCorr valuesSigCorr = [outcorrsig] + valuesSigCorr self.__slope, self.__intercept, r_value, p_value, std_err = stats.linregress(valuesSigCorr,np.array([i for i in range(9)]) * 255.0/8) logging.info('Color correction bingo slope: %f intercept: %f', self.__slope, self.__intercept) if len(chidFifo1) > 1+64+8: self.__chIDA = int(np.round((self.__slope*np.median(chidFifo1[-1-64-8:-1-64]) + self.__intercept) / (255.0/8))) self.__chIDB = int(np.round((self.__slope*np.median(chidFifo2[-1-64-8:-1-64]) + self.__intercept) / (255.0/8))) chidFifo1 = [] chidFifo2 = [] statecorr = 0 else: statecorr = 0 lcorr = outcorr lcorrsig = outcorrsig imgLineA = np.median(imgLineA, axis = -1) imgLineB = np.median(imgLineB, axis = -1) imgLine = np.concatenate([imgLineA, imgLineB]) if self.__slope is None or self.__intercept is None: imageBuffer.append(imgLine[:]) imgLine = np.round(255 * (imgLine - self.__low) / (self.__high - self.__low)) imgLine[imgLine < 0] = 0 imgLine[imgLine > 255] = 255 imgLine = imgLine.astype(np.uint8) backupImage.append(imgLine) else: if len(imageBuffer) > 0: for i in imageBuffer: imgLineb = np.round(i*self.__slope + self.__intercept) imgLineb[imgLineb < 0] = 0 imgLineb[imgLineb > 255] = 255 imgLineb = imgLineb.astype(np.uint8) self.__image.append(imgLineb) imageBuffer = [] imgLine = np.round(imgLine*self.__slope + self.__intercept) imgLine[imgLine < 0] = 0 imgLine[imgLine > 255] = 255 imgLine = imgLine.astype(np.uint8) self.__image.append(imgLine) # use backup if color correction didnot work if len(self.__image) == 0: self.__image = backupImage # get mean lengths of lines lens = [len(i) for i in self.__image] acceptedLen = max(set(lens), key=lens.count) self.__image = np.array([i for i in self.__image if len(i) == acceptedLen]) logging.info('Image extraction complete') return self.__image def __fillSync(self, csync, maxLen): '''Filters and fills missed syncs to help generate image Args: csync (:obj:`list`): List of detected syncs maxLen (:obj:`int`): Frequency offset of source in Hz Returns: :obj:`list`: corrected syncs ''' syncDIff = np.diff(csync) modeSyncDIff = max(set(syncDIff), key=list(syncDIff).count) wiggleRoom = 200 validSyncs = [] for i in range(len(csync) - 1): if abs(csync[i+1] - csync[i] - modeSyncDIff) < wiggleRoom: if csync[i] not in validSyncs: validSyncs.append(csync[i]) if csync[i+1] not in validSyncs: validSyncs.append(csync[i+1]) correctedSyncs = validSyncs[:] # initial correction c = validSyncs[0] - modeSyncDIff while(c > wiggleRoom): correctedSyncs.append(c) c -= modeSyncDIff # later corrections anchor = 0 c = modeSyncDIff while(validSyncs[anchor] + c < maxLen): if (anchor + 1) < len(validSyncs) and (abs(validSyncs[anchor + 1] - c - validSyncs[anchor]) < wiggleRoom or c + validSyncs[anchor] > validSyncs[anchor + 1]): anchor += 1 c = modeSyncDIff else: correctedSyncs.append(validSyncs[anchor] + c) c += modeSyncDIff return list(np.sort(correctedSyncs)) @property def getImageA(self): '''Get Image A from the extracted image Returns: :obj:`numpy array`: A matrix list of pixel ''' if self.__image is None: self.getImage return self.__image[:,:1040] @property def getImageB(self): '''Get Image B from the extracted image Returns: :obj:`numpy array`: A matrix list of pixel ''' if self.__image is None: self.getImage return self.__image[:,1040:] @property def getColor(self): '''Get false color image (EXPERIMENTAL) Returns: :obj:`numpy array`: A matrix list of pixel ''' if self.__color is None: if self.__image is None: self.getImage imageA = self.getImageA imageB = self.getImageB #imageAb = scipy.ndimage.uniform_filter(self.getImageA, size=(3, 3)) #imageBb = scipy.ndimage.uniform_filter(self.getImageB, size=(3, 3)) # constants #tempLimit = 147.0 #seaLimit = 25.0 #landLimit = 90.0 #orig tempLimit = 155.0 seaLimit = 30.0 landLimit = 90.0 colorImg = [] for r in range(len(imageA)): colorRow = [] for c in range(1040): v, t = imageA[r,c], imageB[r,c] #vb, tb = imageAb[r,c], imageBb[r,c] maxColor, minColor = None, None scaleVisible, scaleTemp = None, None # change to > if t < tempLimit: # clouds minColor, maxColor = [230/360.0, 0.2, 0.3], [230/360.0, 0.0, 1.0] scaleVisible = v / 256.0 scaleTemp = (256.0 - t) / 256.0 else: if v < seaLimit: # sea minColor, maxColor = [200.0/360.0, 0.7, 0.6], [240.0/360.0, 0.6, 0.4] scaleVisible = v / seaLimit scaleTemp = (256.0-t) / (256.0 - tempLimit) else: # ground minColor, maxColor = [60.0/360.0, 0.6, 0.2], [100.0/360.0, 0.0, 0.5] scaleVisible = (v - seaLimit) / (landLimit - seaLimit) scaleTemp = (256.0 - t) / (256.0 - tempLimit); finalS = maxColor[1] + scaleTemp * (minColor[1] - maxColor[1]); finalV = maxColor[2] + scaleVisible * (minColor[2] - maxColor[2]); finalH = maxColor[0] + scaleVisible * scaleTemp * (minColor[0] - maxColor[0]); pix = tuple([int(k * 255.0) for k in colorsys.hsv_to_rgb(finalH, finalS, finalV)]) colorRow.append(pix) colorImg.append(colorRow) self.__color = np.uint8(np.array(colorImg)) return self.__color def __audio(self, audioFreq = constants.NOAA_AUDSAMPRATE, strictness = True): '''Get the audio from data at this sampling rate Args: audioFreq (:obj:`int`, optional): Target frequency of sampling of audio strictness (:obj:`bool`, optional): Strictness of sampling Returns: :obj:`commSignal`: An audio signal ''' logging.info('Beginning FM demodulation to get audio in chunks') audioOut = comm.commSignal(audioFreq) bhFilter = filters.blackmanHarris(151) fmDemdulator = demod_fm.demod_fm() chunkerObj = chunker.chunker(self.__sigsrc) for i in chunkerObj.getChunks: logging.info('Processing chunk %d of %d chunks', chunkerObj.getChunks.index(i)+1, len(chunkerObj.getChunks)) sig = comm.commSignal(self.__sigsrc.sampFreq, self.__sigsrc.read(*i), chunkerObj).offsetFreq(self.__offset).filter(bhFilter).bwLim(self.__bw, uniq = "First").funcApply(fmDemdulator.demod).bwLim(audioFreq, strictness) audioOut.extend(sig) logging.info('FM demodulation successfully complete') self.__audOut = audioOut return audioOut def __getAM(self, sig): '''Do AM demodulation in chunks of given signal Args: sig (:obj:`comm object`): Input signal Returns: :obj:`commSignal`: AM demodulated signal ''' logging.info('Beginning AM demodulation in chunks') amDemdulator = demod_am.demod_am() amOut = comm.commSignal(sig.sampRate) chunkerObj = chunker.chunker(sig, chunkSize = 60000*4) for i in chunkerObj.getChunks: logging.info('Processing chunk %d of %d chunks', chunkerObj.getChunks.index(i)+1, len(chunkerObj.getChunks)) demodSig = amDemdulator.demod(sig.signal[i[0]:i[1]]) amOut.extend(comm.commSignal(sig.sampRate, demodSig)) logging.info('AM demodulation completed') return amOut def __correlate(self, haystack, needle): '''Function to do normalised correlation Args: haystack (:obj:`numpy array`): Input signal needle (:obj:`numpy array`): Sync signal Returns: :obj:`numpy array`: correlation array ''' cor = signal.correlate(haystack, needle, mode = 'same') sums = np.convolve(haystack * haystack, [1]*len(needle), mode = 'same') norm = cor / (sums * np.sum(needle * needle))**0.5 return norm def __correlateAndFindPeaks(self, sig, sync, getExtraInfo = False, useNormCorrelate = True, useFilter = False, usePosNeedle = True, filterType = filters.hamming(492, zeroPhase = True)): '''Correlates given signal and sync signal to find location of syncs Args: sig (:obj:`comm object`): Input signal sync (:obj:`list`): Sync bits Returns: :obj:`list`: List of detected syncs ''' # create the sync signals, at required sampling frequency sampRateCorrection = round(sig.sampRate * constants.NOAA_T) if usePosNeedle: sync = ((np.repeat(sync, sampRateCorrection) * 233) + 11)/255 else: sync = np.repeat(sync, sampRateCorrection) - 0.5 # uncomment below if exact sampling frequency is desired #sync = signal.resample(sync, int(sig.sampRate * len(sync)/(sampRateCorrection*1.0/constants.NOAA_T))) # correlate signal with syncs cor = None if not useNormCorrelate: if useFilter: cor = signal.correlate(filterType.applyOn(sig.signal), sync, mode = 'same') else: cor = signal.correlate(sig.signal, sync, mode = 'same') else: if useFilter: cor = np.array(self.__correlate(filterType.applyOn(sig.signal), sync)) else: cor = np.array(self.__correlate(sig.signal, sync)) # now to find peaks # in a second long signal we will expect two peaks, similarly here expectedPeaks = int(2*(len(cor) / sig.sampRate)) + 2 # find indices top expectedPeak number of values maxk = np.argpartition(cor, -1*expectedPeaks)[-1*expectedPeaks:] # get average height of peaks avgpk = np.sum(cor[maxk]) / expectedPeaks # set minimum peak height, 25% less than average avgpk -= constants.NOAA_PEAKHEIGHTWIGGLE*(avgpk - (np.sum(cor[np.argpartition(cor, expectedPeaks)[:expectedPeaks]]) / expectedPeaks)) # get all signal locations where it is above this possiblePeaks = np.sort(np.argwhere(cor > avgpk).ravel()) # minimum distance between peaks is about 0.45 seconds i.e. 50 ms wiggle room minPkDist = constants.NOAA_MINPEAKDIST * sig.sampRate absolutePeaks = [] currentMax = None currentMaxIndex = None # go through the list of possible peaks abd pick the maximum one in each group for i in np.nditer(possiblePeaks): if not currentMaxIndex is None and (i - currentMaxIndex) >= minPkDist: absolutePeaks.append(currentMaxIndex) currentMax = None currentMaxIndex = None if currentMax is None or currentMax < cor[i]: currentMax = cor[i] currentMaxIndex = i absolutePeaks.append(currentMaxIndex) # offset it to the beginning of the sync absolutePeaks = [i - int(len(sync)/2) for i in absolutePeaks] absolutePeaks = np.sort(np.array(absolutePeaks).ravel()) # get time sync values timeSyncs = [] pkHeights = [] if getExtraInfo: for i in absolutePeaks: if i+2*int(len(sync)) < sig.length: timeSyncs.append(np.average(sig.signal[i+int(len(sync)):i+2*int(len(sync))])) else: timeSyncs.append(None) pkHeights.append(cor[i + int(len(sync)/2)]) if getExtraInfo: return absolutePeaks, pkHeights, timeSyncs return absolutePeaks
[docs] def getCrudeSync(self): '''Get the sync locations: at constants.NOAA_CRUDESYNCSAMPRATE sampling rate Returns: :obj:`list`: A list of locations of sync in sample number (start of sync) ''' if self.__syncA is None or self.__syncB is None: sig = self.__audio(constants.NOAA_CRUDESYNCSAMPRATE, False) # first get the AM demodulated signal at required sampling rate sig = self.__getAM(sig) self.__syncCrudeSampRate = sig.sampRate logging.info('Beginning SyncA detection') self.__syncA = self.__correlateAndFindPeaks(sig, constants.NOAA_SYNCA) logging.info('Done SyncA detection') logging.info('Beginning SyncB detection') self.__syncB = self.__correlateAndFindPeaks(sig, constants.NOAA_SYNCB) logging.info('Done SyncB detection') # determine if some data was found or not syncAdiff = np.abs(np.diff(self.__syncA) - (self.__syncCrudeSampRate*0.5)) syncAdiffTemp = np.array([np.max(syncAdiff[i:i+constants.NOAA_DETECTCONSSYNCSNUM]) for i in range(len(syncAdiff)-constants.NOAA_DETECTCONSSYNCSNUM+1)]) syncBdiff = np.abs(np.diff(self.__syncB) - (self.__syncCrudeSampRate*0.5)) syncBdiffTemp = np.array([np.max(syncBdiff[i:i+constants.NOAA_DETECTCONSSYNCSNUM]) for i in range(len(syncBdiff)-constants.NOAA_DETECTCONSSYNCSNUM+1)]) if syncAdiffTemp.size != 0 and syncBdiffTemp.size != 0: minSyncAdiff = np.min(syncAdiffTemp) minSyncBdiff = np.min(syncBdiffTemp) if minSyncAdiff < constants.NOAA_DETECTMAXCHANGE or minSyncBdiff < constants.NOAA_DETECTMAXCHANGE: logging.info('NOAA Signal was found') self.__useful = 1 else: logging.info('NOAA Signal was not found') else: pass return [self.__syncA, self.__syncB]
[docs] def getAccurateSync(self, useNormCorrelate = True): '''Get the sync locations: at highest sampling rate Args: useNormCorrelate (:obj:`bool`, optional): Whether to use normalized correlation or not Returns: :obj:`list`: A list of locations of sync in sample number (start of sync) ''' if self.__asyncA is None or self.__asyncB is None or self.__asyncBtime is None or self.__asyncAtime is None or self.__asyncBpk is None or self.__asyncApk is None or not self.__useNormCorrelate == useNormCorrelate: self.__useNormCorrelate = useNormCorrelate if self.__syncA is None or self.__syncB is None: self.getCrudeSync() # calculate the width of search window in sample numbers syncTime = constants.NOAA_T * len(constants.NOAA_SYNCA) searchTimeWidth = 3 * syncTime searchSampleWidth = int(searchTimeWidth * self.__sigsrc.sampFreq) # convert sync from samples to time csyncA = self.__syncA / self.__syncCrudeSampRate csyncB = self.__syncB / self.__syncCrudeSampRate # convert back to sample number csyncA *= self.__sigsrc.sampFreq csyncB *= self.__sigsrc.sampFreq ## Accurate syncA self.__asyncA = [] self.__asyncApk = [] self.__asyncAtime = [] logging.info('Beginning Accurate SyncA detection') for i in csyncA: logging.info('Detecting Sync %d of %d syncs', list(csyncA).index(i) + 1, len(csyncA)) startI = int(i) - int(searchSampleWidth) endI = int(i) + int(searchSampleWidth) if startI < 0 or endI > self.__sigsrc.length: continue sig = comm.commSignal(self.__sigsrc.sampFreq, self.__sigsrc.read(startI, endI)).offsetFreq(self.__offset).filter(filters.blackmanHarris(151, zeroPhase = True)).funcApply(demod_fm.demod_fm().demod).funcApply(demod_am.demod_am().demod) syncDet, PkHeights, TimeSync = self.__correlateAndFindPeaks(sig, constants.NOAA_SYNCA, getExtraInfo = True, useNormCorrelate = useNormCorrelate, usePosNeedle = useNormCorrelate, useFilter = True) self.__asyncA.append(syncDet[0] + startI) self.__asyncApk.append(PkHeights[0]) self.__asyncAtime.append(TimeSync[0]) logging.info('Accurate SyncA detection complete') ## Accurate syncB self.__asyncB = [] self.__asyncBpk = [] self.__asyncBtime = [] logging.info('Beginning Accurate SyncB detection') for i in csyncB: logging.info('Detecting Sync %d of %d syncs', list(csyncB).index(i) + 1, len(csyncB)) startI = int(i) - int(searchSampleWidth) endI = int(i) + int(searchSampleWidth) if startI < 0 or endI > self.__sigsrc.length: continue sig = comm.commSignal(self.__sigsrc.sampFreq, self.__sigsrc.read(startI, endI)).offsetFreq(self.__offset).filter(filters.blackmanHarris(151, zeroPhase = True)).funcApply(demod_fm.demod_fm().demod).funcApply(demod_am.demod_am().demod) syncDet, PkHeights, TimeSync = self.__correlateAndFindPeaks(sig, constants.NOAA_SYNCB, getExtraInfo = True, useNormCorrelate = useNormCorrelate, usePosNeedle = useNormCorrelate, useFilter = True) self.__asyncB.append(syncDet[0] + startI) self.__asyncBpk.append(PkHeights[0]) self.__asyncBtime.append(TimeSync[0]) logging.info('Accurate SyncB detection complete') return [self.__asyncA, np.diff(self.__asyncA), self.__asyncApk, self.__asyncAtime, self.__asyncB, np.diff(self.__asyncB), self.__asyncBpk, self.__asyncBtime]