import contextlib
import time
import unittest
import uuid
import wave

import sh

from test.helpers.iseelogger import logObj
from test.helpers.utils import save_result


class Qaudio(unittest.TestCase):
    """Audio loopback test based on a DTMF tone sequence.

    A WAV file containing DTMF tones is played with ``aplay`` while
    ``arecord`` captures the input in the background.  The capture is then
    decoded with ``multimon`` and the set of detected digits is compared
    with the expected sequence.

    Settings are read from ``varlist`` first and fall back to the XML
    configuration object supplied in ``varlist["xml"]``.
    """

    params = None
    __resultlist = None  # resultlist is a python list of python dictionaries
    __dtmf_secuence = ["1", "3", "5", "7", "9"]
    __dtmf_file = None
    __file_uuid = None
    __QaudioName = None

    def __init__(self, testname, testfunc, varlist):
        """Build the test case.

        Args:
            testname: Text stored as the test method documentation.
            testfunc: Name of the method that unittest will run.
            varlist: Dictionary of settings; must contain the key ``"xml"``
                holding an object that provides ``getKeyVal``.
        """
        self.params = varlist
        super(Qaudio, self).__init__(testfunc)
        self._testMethodDoc = testname
        self.__resultlist = []
        self.__xmlObj = varlist["xml"]
        self.__QaudioName = varlist.get('name', 'qaudio')

        def setting(key, default):
            # varlist takes precedence; the XML value (or the hard-coded
            # default) is the fallback.  The XML lookup is always performed,
            # exactly as a dict.get() default argument would be.
            fallback = self.__xmlObj.getKeyVal(self.__QaudioName, key, default)
            return varlist.get(key, fallback)

        self.__dtmf_file_play = setting('play_dtmf_file', "dtmf-13579.wav")
        self.__recordtime = setting('recordtime', "0.15")
        self.__dtmf_file_record = setting('record_dtmf_file', "record-{}.wav")
        self.__sampling_rate = setting('rate', "8000")
        self.__record_path = setting('to', "/mnt/station_ramdisk")
        self.__run_delay = float(setting('aplay_run_delay', "0.0"))

        # A unique id keeps recordings from different runs apart.
        self.__file_uuid = uuid.uuid4()
        self.__dtmf_file_record = self.__dtmf_file_record.format(self.__file_uuid)

    def restoreAlsaCtl(self):
        """Run ``alsactl restore``; a failure is logged and otherwise ignored."""
        try:
            sh.alsactl('restore')
        except Exception:
            # Best effort: the test continues with the current mixer state.
            logObj.getlogger().error("Failed to Execite alsactl restore")

    def _play_and_record(self, record_file, record_seconds):
        """Record to ``record_file`` in the background while playing the tones.

        Returns:
            Tuple ``(play, record)`` with the finished aplay and arecord
            command objects.
        """
        # NOTE(review): arecord's -d option is documented in seconds; confirm
        # that a fractional value is honoured on the target system.
        record = sh.arecord("-r", self.__sampling_rate, "-d", record_seconds,
                            record_file, _bg=True)
        # Optional delay so that the recorder is running before playback.
        time.sleep(self.__run_delay)
        play = sh.aplay(self.__dtmf_file_play)
        record.wait()
        return play, record

    def execute(self):
        """Play, record and decode the DTMF sequence and verify the result.

        On success the recorded file is registered through ``save_result``;
        every failure is reported through ``self.fail``.
        """
        self.restoreAlsaCtl()
        try:
            # analyse the audio file: playback length + configured extra
            # time + a fixed 0.1 s margin
            calcRecordTime = (self.calc_audio_duration(self.__dtmf_file_play)
                              + float(self.__recordtime) + float(0.1))
        except TypeError as Error:
            self.fail("failed: TypeError: {}.".format(Error))

        record_file = "{}/{}".format(self.__record_path, self.__dtmf_file_record)

        # First pass only stabilises the audio lines; its recording is
        # overwritten by the second pass.
        self._play_and_record(record_file, calcRecordTime)
        p1, p2 = self._play_and_record(record_file, calcRecordTime)

        # NOTE(review): sh usually raises on a non-zero exit status, so the
        # exit_code checks below may never trigger -- confirm.
        if p1.exit_code != 0 or p2.exit_code != 0:
            self.fail("failed: unable to play/record audio.")

        p = sh.multimon("-t", "wav", "-a", "DTMF", record_file, "-q")
        if p.exit_code != 0:
            self.fail("failed: unable to use multimon command.")

        detected = []
        for line in p.stdout.decode('ascii').splitlines():
            tokens = line.split(" ")
            # Guard against a bare "DTMF:" line without a digit.
            if len(tokens) > 1 and tokens[0] == "DTMF:":
                detected.append(tokens[1])
        # Only the set of detected digits matters; repeats are collapsed.
        self.__dtmf_secuence_result = sorted(set(detected))

        # compare original and processed dtmf sequence
        if len(self.__dtmf_secuence) != len(self.__dtmf_secuence_result):
            # The message says "shorter", but any length mismatch ends here.
            logObj.getlogger().debug(
                "received DTMF sequence is shorter than expected.{} {}".format(
                    self.__dtmf_secuence, self.__dtmf_secuence_result))
            self.fail("received DTMF sequence is shorter than expected")

        if list(self.__dtmf_secuence) != self.__dtmf_secuence_result:
            logObj.getlogger().debug(
                "failed: sent and received DTMF sequence do not match.{} != {}".format(
                    self.__dtmf_secuence, self.__dtmf_secuence_result))
            self.fail("failed: sent and received DTMF sequence do not match.")

        save_result(filePath=record_file,
                    description='sound record',
                    mime='audio/x-wav',
                    result=self.__resultlist)

    def getresults(self):
        """Return the list of result dictionaries collected by the test."""
        return self.__resultlist

    def gettextresult(self):
        """Return the textual result, which is always an empty string."""
        return ""

    def calc_audio_duration(self, fname):
        """Return the duration of the WAV file ``fname`` in seconds.

        The duration is the number of frames divided by the frame rate.
        """
        # closing() already closes the reader; no explicit close() is needed.
        with contextlib.closing(wave.open(fname, 'r')) as f:
            frames = f.getnframes()
            rate = f.getframerate()
        return frames / float(rate)