1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
|
import unittest
import sh
import wave
import contextlib
import uuid
import time
from test.helpers.utils import save_result
from test.helpers.iseelogger import logObj
class Qaudio(unittest.TestCase):
    """Audio loopback test based on a DTMF recording.

    The test plays a WAV file with ``aplay`` while ``arecord`` captures audio
    in the background, runs ``multimon`` (``-a DTMF``) on the capture, and
    compares the sorted, de-duplicated digits it reports against the expected
    sequence ``1 3 5 7 9``.

    Settings are taken from ``varlist`` first and fall back to the XML
    configuration object passed in ``varlist["xml"]``.
    """

    params = None
    __resultlist = None  # resultlist is a python list of python dictionaries
    __dtmf_secuence = ["1", "3", "5", "7", "9"]
    __dtmf_file = None
    __file_uuid = None
    __QaudioName = None

    def __init__(self, testname, testfunc, varlist):
        """Build the test case.

        Args:
            testname: Text stored as the test method documentation.
            testfunc: Name of the method unittest should run (e.g. ``execute``).
            varlist: Mapping of test parameters; must contain the key ``xml``
                holding an object that provides ``getKeyVal(section, key, default)``.
        """
        self.params = varlist
        super(Qaudio, self).__init__(testfunc)
        self._testMethodDoc = testname
        self.__resultlist = []
        self.__xmlObj = varlist["xml"]
        self.__QaudioName = varlist.get('name', 'qaudio')
        self.__dtmf_file_play = self._option(varlist, "play_dtmf_file", "dtmf-13579.wav")
        self.__recordtime = self._option(varlist, "recordtime", "0.15")
        self.__dtmf_file_record = self._option(varlist, "record_dtmf_file", "record-{}.wav")
        self.__sampling_rate = self._option(varlist, "rate", "8000")
        self.__record_path = self._option(varlist, "to", "/mnt/station_ramdisk")
        self.__run_delay = float(self._option(varlist, "aplay_run_delay", "0.0"))
        # A unique id is substituted into the record file name template so
        # separate test instances do not share a capture file name.
        self.__file_uuid = uuid.uuid4()
        self.__dtmf_file_record = self.__dtmf_file_record.format(self.__file_uuid)

    def _option(self, varlist, key, default):
        """Return ``varlist[key]`` or, when absent, the XML value for ``key``."""
        # The XML lookup is always performed, exactly as the inline
        # ``varlist.get(key, xml.getKeyVal(...))`` form did.
        fallback = self.__xmlObj.getKeyVal(self.__QaudioName, key, default)
        return varlist.get(key, fallback)

    def restoreAlsaCtl(self):
        """Run ``alsactl restore``; a failure is logged but never raised."""
        try:
            sh.alsactl('restore')
        except Exception:
            # Best effort: the test continues even if the restore fails.
            logObj.getlogger().error("Failed to Execite alsactl restore")

    def _record_duration(self):
        """Return the capture length: file duration + configured extra time + 0.1 s.

        Fails the test (``self.fail``) when a ``TypeError`` occurs while
        computing the value.
        """
        try:
            # analyze audio file
            return self.calc_audio_duration(self.__dtmf_file_play) + float(self.__recordtime) + float(0.1)
        except TypeError as Error:
            self.fail("failed: TypeError: {}.".format(Error))

    def _record_while_playing(self, record_target, record_seconds):
        """Start a background ``arecord``, play the DTMF file, wait for the recorder.

        Returns:
            Tuple ``(player, recorder)`` of the finished ``sh`` commands.
        """
        recorder = sh.arecord("-r", self.__sampling_rate, "-d", record_seconds, record_target, _bg=True)
        time.sleep(self.__run_delay)
        player = sh.aplay(self.__dtmf_file_play)
        recorder.wait()
        return player, recorder

    @staticmethod
    def _parse_dtmf_digits(decoder_output):
        """Return the sorted unique digits from ``DTMF: <digit>`` lines of *decoder_output*."""
        digits = set()
        for line in decoder_output.splitlines():
            fields = line.split(" ")
            # Ignore lines that carry the tag but no digit field.
            if len(fields) > 1 and fields[0] == "DTMF:":
                digits.add(fields[1])
        return sorted(digits)

    def execute(self):
        """Play, record and decode the DTMF file; fail unless the digits match.

        On success the recorded file is registered through ``save_result``.
        """
        self.restoreAlsaCtl()
        record_seconds = self._record_duration()
        record_target = "{}/{}".format(self.__record_path, self.__dtmf_file_record)

        try:
            # previous play to stabilise audio lines; its capture is overwritten below
            self._record_while_playing(record_target, record_seconds)
            player, recorder = self._record_while_playing(record_target, record_seconds)
        except sh.ErrorReturnCode:
            # sh raises on a non-zero exit status instead of returning it.
            self.fail("failed: unable to play/record audio.")
        if player.exit_code != 0 or recorder.exit_code != 0:
            self.fail("failed: unable to play/record audio.")

        try:
            decoder = sh.multimon("-t", "wav", "-a", "DTMF", record_target, "-q")
        except sh.ErrorReturnCode:
            self.fail("failed: unable to use multimon command.")
        if decoder.exit_code != 0:
            self.fail("failed: unable to use multimon command.")

        self.__dtmf_secuence_result = self._parse_dtmf_digits(decoder.stdout.decode('ascii'))

        # compare original and processed dtmf sequence
        expected = self.__dtmf_secuence
        received = self.__dtmf_secuence_result
        if len(expected) != len(received):
            logObj.getlogger().debug(
                "received DTMF sequence is shorter than expected.{} {}".format(expected, received))
            self.fail("received DTMF sequence is shorter than expected")
        if expected != received:
            logObj.getlogger().debug(
                "failed: sent and received DTMF sequence do not match.{} != {}".format(
                    expected, received))
            self.fail("failed: sent and received DTMF sequence do not match.")

        save_result(filePath=record_target,
                    description='sound record',
                    mime='audio/x-wav',
                    result=self.__resultlist)

    def getresults(self):
        """Return the list of result entries collected by this test."""
        return self.__resultlist

    def gettextresult(self):
        """Return the textual result, which is always an empty string."""
        return ""

    def calc_audio_duration(self, fname):
        """Return the duration of the WAV file *fname* as frames / frame rate."""
        # contextlib.closing closes the reader on exit; no explicit close needed.
        with contextlib.closing(wave.open(fname, 'r')) as wav_file:
            return wav_file.getnframes() / float(wav_file.getframerate())
|