Source code for directdemod.decode_afsk1200

'''
AFSK1200
'''
from directdemod import source, sink, chunker, comm, constants, filters, demod_am, demod_fm, peakdetect, \
    framechecksequence
import numpy as np
import logging
import scipy.signal as signal
import matplotlib.pylab as plt

'''
Object to decode AFSK1200
'''

[docs]class decode_afsk1200: ''' Object to decode AFSK1200 '''
[docs] def __init__(self, sigsrc, offset, bw): '''Initialize the object Args: sigsrc (:obj:`commSignal`): IQ data source offset (:obj:`float`): Frequency offset of source in Hz bw (:obj:`int`, optional): Bandwidth ''' self.__BAUDRATE = 1200 self.__mark_frequency = 1200 self.__space_frequency = 2200 self.__bw = bw if self.__bw is None: self.__bw = 22050 self.__sigsrc = sigsrc self.__offset = offset self.__msg = None self.__graphs = 0 self.__useful = 0
@property def useful(self): '''See if atleast one message was found or not Returns: :obj:`int`: 0 if not found, 1 if found ''' #if self.__msg is None: # self.getMsg return self.__useful @property def getMsg(self): '''Get the message from data Returns: :string: A string of message data ''' if self.__msg is None: sig = comm.commSignal(self.__sigsrc.sampFreq) chunkerObj = chunker.chunker(self.__sigsrc) bhFilter = filters.blackmanHarris(151) fmDemodObj = demod_fm.demod_fm() for i in chunkerObj.getChunks: logging.info('Processing chunk %d of %d chunks', chunkerObj.getChunks.index(i)+1, len(chunkerObj.getChunks)) # get the signal chunkSig = comm.commSignal(self.__sigsrc.sampFreq, self.__sigsrc.read(*i), chunkerObj) ## Offset the frequency if required, not needed here chunkSig.offsetFreq(self.__offset) ## Apply a blackman harris filter to get rid of noise chunkSig.filter(bhFilter) ## Limit bandwidth chunkSig.bwLim(self.__bw) # store signal sig.extend(chunkSig) ## FM demodulate sig.funcApply(fmDemodObj.demod) logging.info('FM demod complete') ## APRS has two freqs 1200 and 2200, hence create a butter band pass filter from 1200-500 to 2200+500 sig.filter(filters.butter(sig.sampRate, 1200 - 500, 2200 + 500, typeFlt=constants.FLT_BP)) logging.info('Filtering complete') ## plot the signal if self.__graphs == 1: plt.plot(sig.signal) plt.show() buffer_size = int(np.round(self.__bw / self.__BAUDRATE)) SAMPLE_PER_BAUD = self.__bw // self.__BAUDRATE # creating the “correlation list" for the comparison frequencies of the digital frequency filers corr_mark_i = np.zeros(buffer_size) corr_mark_q = np.zeros(buffer_size) corr_space_i = np.zeros(buffer_size) corr_space_q = np.zeros(buffer_size) # filling the "correlation list" with sampled waveform for the two frequencies. for i in range(buffer_size): mark_angle = (i * 1.0 / self.__bw) / (1 / self.__mark_frequency) * 2 * np.pi corr_mark_i[i] = np.cos(mark_angle) corr_mark_q[i] = np.sin(mark_angle) space_angle = (i * 1.0 / self.__bw) / (1 / self.__space_frequency) * 2 * np.pi corr_space_i[i] = np.cos(space_angle) corr_space_q[i] = np.sin(space_angle) # now we check the full signal for the binary states, whether it is closer to 1200 hz or closer to 2200 Hz binary_filter = np.zeros(len(sig.signal)) for sample in range(len(sig.signal) - buffer_size): corr_mi = 0 corr_mq = 0 corr_si = 0 corr_sq = 0 for sub in range(buffer_size): corr_mi = corr_mi + sig.signal[sample + sub] * corr_mark_i[sub] corr_mq = corr_mq + sig.signal[sample + sub] * corr_mark_q[sub] corr_si = corr_si + sig.signal[sample + sub] * corr_space_i[sub] corr_sq = corr_sq + sig.signal[sample + sub] * corr_space_q[sub] binary_filter[sample] = (corr_mi ** 2 + corr_mq ** 2 - corr_si ** 2 - corr_sq ** 2) logging.info('Binary filter complete') if self.__graphs == 1: plt.plot(sig.signal / np.max(sig.signal)) plt.plot(np.sign(binary_filter)) plt.show() # now trying to find the raising or falling edges of the bits # generating the edge detection kernel kernel = np.zeros(SAMPLE_PER_BAUD) for i in range(len(kernel)): if i < SAMPLE_PER_BAUD // 2: kernel[i] = -1 else: kernel[i] = 1 changes = np.correlate(np.sign(binary_filter), kernel, mode="same") / SAMPLE_PER_BAUD if self.__graphs == 1: plt.plot(np.sign(binary_filter)) plt.plot(changes) plt.title("bit starts") plt.show() # by using the edges of the bits for synching the sampling to the transmitted bits, the algo is # self synchronizing. # but sometimes the crossing areas between the bits can be uncertain. for that, a peak detection defines # only one solution in close vicinity and defining the edges further. peaks = peakdetect.peakdetect(np.abs(changes), lookahead=int(SAMPLE_PER_BAUD * 0.65)) peaks1_x = [] peaks1_y = [] # positive peaks for i in range(len(peaks[0])): peaks1_x.append(peaks[0][i][0]) peaks1_y.append(peaks[0][i][1]) if self.__graphs == 1: plt.plot(peaks1_x, peaks1_y, "o") plt.plot(np.abs(changes)) plt.plot(np.sign(binary_filter)) plt.plot(sig.signal / np.max(sig.signal)) plt.show() bit_repeated = np.round(np.diff(peaks1_x) / (self.__bw / self.__BAUDRATE)) logging.info('Bit repeat complete') if self.__graphs == 1: plt.plot(np.sign(binary_filter)) plt.plot(peaks1_x[:-1], bit_repeated, "*") plt.grid() plt.title("where frequency shifts") plt.show() # making the bits for nrzi bitstream_nrzi = [] c = 0 for i in range(len(bit_repeated)): # print(c, i, x1[i], "p", int(bit_repeated[i])) for repeats in range(int(bit_repeated[i])): bitstream_nrzi.append((np.mean(binary_filter[ peaks1_x[i] + repeats * SAMPLE_PER_BAUD: peaks1_x[i] + ( repeats + 1) * SAMPLE_PER_BAUD]))) # print(c, bitstream_nrzi[-1]) c += 1 # here we convert the nrzi bits to normal bits bitstream = decode_afsk1200.decode_nrzi(np.sign(bitstream_nrzi)) logging.info('Decoding NRZI complete') if self.__graphs == 1: plt.plot(np.sign(bitstream_nrzi)) plt.plot(bitstream_nrzi, "o-") plt.plot(bitstream, "*") plt.show() bit_startflag = [] bit_startflag_marker = [] for bit in range(len(bitstream) - 8): out = "" for i in range(8): out += str(bitstream[bit + i]) if out == "01111110": # print(bit) bit_startflag.append(bit) bit_startflag_marker.append(1) length = np.diff(bit_startflag) # there are still the stuffed bits inside the bit stream, so we need to find them... bitstream_stuffed = decode_afsk1200.find_bit_stuffing(bitstream) if self.__graphs == 1: plt.plot(bitstream_nrzi) plt.plot(bitstream, "o-") plt.plot(bit_startflag[:-1], length, "o") plt.plot(bit_startflag, bit_startflag_marker, "o") plt.plot(bitstream_stuffed, "*") plt.title("test1") plt.show() logging.info('Stuffed bit removal complete') # checking at each possible start flag, if the bit stream was received correctly. # this is done by checking the crc16 at the end of the msg with the msg body. for flag in range(len(bit_startflag) - 1): # and firstly, we need to get rid of the stuffed bits, that are still inside the bit stream bits = decode_afsk1200.reduce_stuffed_bit(bitstream[bit_startflag[flag] + 8: bit_startflag[flag + 1]], bitstream_stuffed[bit_startflag[flag] + 8: bit_startflag[flag + 1]]) msg = bits[:-16] if len(bits) % 8 == 0 and len(msg) > 16 * 8: out = "" for i in range(len(msg)): out += str(msg[i]) crc = framechecksequence.fcs_crc16(out) crc_received = "" msg_rest = bits[-16:] for i in range(len(msg_rest)): crc_received += str(msg_rest[i]) if crc_received == crc: msg_text = decode_afsk1200.bits_to_msg(msg) print("one aprs msg with correct crc is found. #", flag, "starts at", bit_startflag[flag], "length is", len(bits) / 8) msg_text if self.__graphs == 1: plt.plot(bitstream[bit_startflag[flag] + 8: bit_startflag[flag + 1] + 8], "o-") plt.plot(bits, "*-") plt.show() # there can be several messages per stream, so for now only the last is stored. # to-do self.__msg = "template: space rocks!" self.__useful = 1 logging.info('Message extraction complete') return self.__msg def bits_to_msg(bits): msg_text = "" header_text = "" aprs_header = 1 for byte in range(0, len(bits), 8): tmp = bits[byte: byte + 8] character = "" for bit in range(len(tmp)): character += str(tmp[7 - bit]) if aprs_header == 1: header_text += chr(int("0" + character[:7], 2)) else: msg_text += chr(int(character, 2)) if character[-1] == "1" and aprs_header == 1: # header is ending here! aprs_header = 0 DESTINATION_ADDRESS = header_text[:7] SOURCE_ADDRESS = header_text[7:14] PATH = header_text[14:] print("destination:\t", DESTINATION_ADDRESS) print("source:\t\t", SOURCE_ADDRESS) print("path:\t\t", PATH) CONTROL_FIELD = hex(ord(msg_text[0])) PROTOCOL_ID = hex(ord(msg_text[1])) INFORMATION_FIELD = msg_text[2:] print("control fields:\t", CONTROL_FIELD, PROTOCOL_ID) print("information:\t", INFORMATION_FIELD) return INFORMATION_FIELD
[docs] def decode_nrzi(nrzi): '''Decode NRZI Args: nrzi (:obj:`list`): the NRZI bits Returns: :obj:`list`: decoded NRZI bits ''' code_bit = [] # starting bit. with NRZI it doesn't matter, if 0 or 1 at the beginning code_bit.append(1) for bit in range(1, len(nrzi)): if nrzi[bit - 1] == nrzi[bit]: code_bit.append(1) elif nrzi[bit - 1] != nrzi[bit]: code_bit.append(0) return code_bit
[docs] def find_bit_stuffing(code_bit): '''To find bit stuffing Args: code_bit (:obj:`list`): the bits Returns: :obj:`list`: bit stuffing status ''' stuffed_bit = np.zeros(len(code_bit), dtype=np.int) counter = 0 for bit in range(len(code_bit)): if counter == 5 and code_bit[bit] == 1: # could be ending, because 6th bit is not 0 and could be intentially left 1, as expected for the frame end stuffed_bit[bit] = 2 if counter == 5 and code_bit[bit] == 0: # normal bit stuffing stuffed_bit[bit] = 1 #print(bit, code_bit[bit], stuffed_bit[bit], counter) if code_bit[bit] == 1: counter += 1 if code_bit[bit] == 0: counter = 0 return stuffed_bit
[docs] def reduce_stuffed_bit(code_bit, stuffed_bit): '''To remove stuffed bits Args: code_bit (:obj:`list`): the bits stuffed_bit (:obj:`list`): the result from find_bit_stuffing() Returns: :obj:`list`: bits free from stuffing ''' out = [] for i in range(len(code_bit)): if stuffed_bit[i] == 0: out.append(code_bit[i]) return out
if __name__== "__main__": # input file name fileName = "../samples/SDRSharp_20170830_073907Z_145825000Hz_IQ_autogain.wav" # create this as a signal source sigsrc = source.IQwav(fileName) freqOffset = 0 bandwidth = 22050 afskObj = decode_afsk1200(sigsrc, freqOffset, bandwidth) print(afskObj.getMsg)