From 09de774dcc1a5abc1c8f3a00fdb039aa3c522f52 Mon Sep 17 00:00:00 2001 From: Manel Caro Date: Wed, 4 Mar 2020 17:46:36 +0100 Subject: SOPA Initial Commit --- test-cli/test/tests/qamp.py | 47 +++++++++-------------- test-cli/test/tests/qaudio.py | 11 +++--- test-cli/test/tests/qbutton.py | 19 +++++++--- test-cli/test/tests/qduplex_ser.py | 21 +++++++--- test-cli/test/tests/qeeprom.py | 2 +- test-cli/test/tests/qethernet.py | 78 ++++++++++++++++++-------------------- test-cli/test/tests/qflash.py | 5 +-- test-cli/test/tests/qi2c.py | 12 ++++-- test-cli/test/tests/qnand.py | 22 +++++++++++ test-cli/test/tests/qram.py | 33 +++++++++------- test-cli/test/tests/qrtc.py | 9 ++++- test-cli/test/tests/qscreen.py | 18 +++------ test-cli/test/tests/qserial.py | 15 ++++++-- test-cli/test/tests/qusb.py | 17 +++++---- test-cli/test/tests/qwifi.py | 52 ++++++++++--------------- 15 files changed, 196 insertions(+), 165 deletions(-) create mode 100644 test-cli/test/tests/qnand.py (limited to 'test-cli/test/tests') diff --git a/test-cli/test/tests/qamp.py b/test-cli/test/tests/qamp.py index 9fcd6d2..3e38ce9 100644 --- a/test-cli/test/tests/qamp.py +++ b/test-cli/test/tests/qamp.py @@ -5,10 +5,17 @@ import time class Qamp(unittest.TestCase): - def __init__(self, testname, testfunc, undercurrent=0.1, overcurrent=2): + #varlist: undercurrent, overcurrent + def __init__(self, testname, testfunc, varlist): self._current = 0.0 - self._undercurrent=undercurrent - self._overcurrent = overcurrent + if "undercurrent" in varlist: + self._undercurrent = varlist["undercurrent"] + else: + raise Exception('undercurrent param inside Qamp must be defined') + if "overcurrent" in varlist: + self._overcurrent = varlist["overcurrent"] + else: + raise Exception('overcurrent param inside Qamp must be defined') self._vshuntfactor=16384.0 self._ser = serial.Serial() self._ser.port = '/dev/ttyUSB0' @@ -33,24 +40,12 @@ class Qamp(unittest.TestCase): except: self.fail("failed: IMPOSSIBLE OPEN USB-SERIAL PORT ( {} )".format(self._ser.port)) error=1 - return -1 if error==0: # Clean input and output buffer of serial port self._ser.flushInput() self._ser.flushOutput() # Send command to read Voltage at Shunt resistor # Prepare cmd - cmd = ('at\n\r') - i=0 - while (i < len(cmd)): - i = i + self._ser.write(cmd[i].encode('ascii')) - time.sleep(0.05) - self._ser.read(1) - res0 = [] - while (self._ser.inWaiting() > 0): # if incoming bytes are waiting to be read from the serial input buffer - res0.append(self._ser.read(1).decode('ascii')) # read the bytes and convert from binary array to ASCII - print(res0) - #CHECK COM FIRST cmd = ('at+in?\n\r') i = 0 @@ -67,21 +62,15 @@ class Qamp(unittest.TestCase): string = ''.join(res) string = string.replace('\n', '') - try: - self._current = float(int(string, 0)) / self._vshuntfactor - except: - self.fail("failed: CAN'T READ CONSUMPTION (CURRENT=0?)") - error=1 - return -1 - if error==0: - print(self._current) + self._current = float(int(string, 0)) / self._vshuntfactor + print(self._current) # In order to give a valid result it is importarnt to define an under current value - if (self._current > float(self._overcurrent)): - self.fail("failed: OVERCURRENT DETECTED ( {} )".format(self._current)) - return -1 + if (self._current > float(self._overcurrent)): + self.fail("failed: OVERCURRENT DETECTED ( {} )".format(self._current)) + + if (self._current < float(self._undercurrent)): + self.fail("failed: UNDERCURRENT DETECTED ( {} )".format(self._current)) + - if (self._current < float(self._undercurrent)): - self.fail("failed: UNDERCURRENT DETECTED ( {} )".format(self._current)) - return -1 diff --git a/test-cli/test/tests/qaudio.py b/test-cli/test/tests/qaudio.py index a1572ca..fe57be2 100644 --- a/test-cli/test/tests/qaudio.py +++ b/test-cli/test/tests/qaudio.py @@ -13,13 +13,14 @@ class Qaudio(unittest.TestCase): self.__refSum = 25 # 1+3+5+7+9 def execute(self): - str_cmd = "aplay test/files/dtmf-13579.wav 2> /dev/null & arecord -r 8000 -d 1 recorded.wav 2> /dev/null" #.format(self.__dtmfFile) + str_cmd = "aplay test/files/dtmf-13579.wav & arecord -r 8000 -d 1 recorded.wav" #.format(self.__dtmfFile) audio_loop = SysCommand("command-name", str_cmd) - if audio_loop.execute() == 0:# BUG: Returns -1 but work + if audio_loop.execute() == -1:# BUG: Returns -1 but work lines = audio_loop.getOutput().splitlines() - str_cmd = "multimon -t wav -a DTMF recorded.wav -q 2> /dev/null" + str_cmd = "multimon -t wav -a DTMF recorded.wav -q" dtmf_decoder = SysCommand("command-name", str_cmd) - if dtmf_decoder.execute() == 0: # BUG: Returns -1 but work + a=dtmf_decoder.execute() + if dtmf_decoder.execute() == -1: # BUG: Returns -1 but work self.__raw_out = dtmf_decoder.getOutput() if self.__raw_out == "": return -1 @@ -34,4 +35,4 @@ class Qaudio(unittest.TestCase): else: self.fail("failed: fail playing/recording file") return -1 - return 0 + return 0 \ No newline at end of file diff --git a/test-cli/test/tests/qbutton.py b/test-cli/test/tests/qbutton.py index 72a897e..4718924 100644 --- a/test-cli/test/tests/qbutton.py +++ b/test-cli/test/tests/qbutton.py @@ -5,8 +5,12 @@ import time class Qbutton(unittest.TestCase): - def __init__(self, testname, testfunc, gpio): - if gpio == "SOPA": + def __init__(self, testname, testfunc, varlist): + if "gpio" in varlist: + self.__gpio = varlist["gpio"] + else: + raise Exception('gpio param inside Qbutton must be defined') + if self.__gpio == "SOPA": super(Qbutton, self).__init__("buttonSopa") else: super(Qbutton, self).__init__("buttonGpio") @@ -34,19 +38,22 @@ class Qbutton(unittest.TestCase): get_button_val = SysCommand("get_button_val", str_cmd) print("\n\t --> PRESS button for 1 sec (TIMEOUT: 10s) \n") timeout = 0 - while timeout < 20: + while timeout < 7200: if get_button_val.execute() == 0: get_button_val.execute() button_value = get_button_val.getOutput() button_value=button_value.decode('ascii').split("x") if int(button_value[1]) == 4: - timeout = 20 + timeout = 7200 + led_off="echo 0 > /sys/class/leds/red\:usbhost/brightness" + ledoff = SysCommand("led_off", led_off) + ledoff.execute() time.sleep(0.5) timeout = timeout + 1 - if timeout==20 and int(button_value[1]) == 0: + if timeout==7200 and int(button_value[1]) == 0: self.fail("failed: timeout exceeded") else: - timeout = 20 + timeout = 7200 self.fail("failed: not button input") else: self.fail("failed: could not complete i2c reset button state") diff --git a/test-cli/test/tests/qduplex_ser.py b/test-cli/test/tests/qduplex_ser.py index 98bda81..837b4d0 100644 --- a/test-cli/test/tests/qduplex_ser.py +++ b/test-cli/test/tests/qduplex_ser.py @@ -6,15 +6,26 @@ import time class Qduplex(unittest.TestCase): - def __init__(self, testname, testfunc, port1, port2, baudrate): + def __init__(self, testname, testfunc, varlist): super(Qduplex, self).__init__(testfunc) - self.__port1 = port1 + if "port1" in varlist: + self.__port1 = varlist["port1"] + else: + raise Exception('port1 param inside Qduplex must be defined') self.__serial1 = serial.Serial(self.__port1, timeout=1) - self.__serial1.baudrate = baudrate - self.__port2 = port2 + if "port2" in varlist: + self.__port2 = varlist["port2"] + else: + raise Exception('port2 param inside Qduplex must be defined') self.__serial2 = serial.Serial(self.__port2, timeout=1) - self.__serial2.baudrate = baudrate + + if "baudrate" in varlist: + self.__serial1.baudrate = varlist["baudrate"] + self.__serial2.baudrate = varlist["baudrate"] + else: + raise Exception('baudrate param inside Qduplex must be defined') + self._testMethodDoc = testname def __del__(self): diff --git a/test-cli/test/tests/qeeprom.py b/test-cli/test/tests/qeeprom.py index f72f78f..2bab248 100644 --- a/test-cli/test/tests/qeeprom.py +++ b/test-cli/test/tests/qeeprom.py @@ -4,7 +4,7 @@ import uuid class Qeeprom(unittest.TestCase): - def __init__(self, testname, testfunc): + def __init__(self, testname, testfunc, varlist): super(Qeeprom, self).__init__(testfunc) self._testMethodDoc = testname diff --git a/test-cli/test/tests/qethernet.py b/test-cli/test/tests/qethernet.py index 2ac447c..be984f5 100644 --- a/test-cli/test/tests/qethernet.py +++ b/test-cli/test/tests/qethernet.py @@ -1,57 +1,51 @@ -from test.helpers.syscmd import SysCommand import unittest - +import sh +import ex class Qethernet(unittest.TestCase): __sip = None - __raw_out = None - __MB_req = None - __MB_real = None - __BW_real = None - __dat_list = None + __numbytestx = None __bind = None __OKBW = None - def __init__(self, testname, testfunc, sip = None, OKBW=100, bind=None): + def __init__(self, testname, testfunc, varlist): super(Qethernet, self).__init__(testfunc) - if sip is not None: - self.__sip = sip - if sip is not None: - self.__bind = bind - self.__MB_req = '10' - self.__OKBW=OKBW + if "sip" in varlist: + self.__sip = varlist["sip"] + else: + raise Exception('sip param inside Qethernet have been be defined') + if "bind" in varlist: + self.__bind = varlist["bind"] + else: + self.__bind = None + if "OKBW" in varlist: + self.__OKBW = varlist["OKBW"] + else: + raise Exception('OKBW param inside Qethernet must be defined') + self.__numbytestx = "10M" self._testMethodDoc = testname def execute(self): - print + # execute iperf command against the server if self.__bind is None: - str_cmd = "iperf -c {} -x CMSV -n {}M".format(self.__sip, self.__MB_req) + p = sh.iperf("-c", self.__sip, "-x", "CMSV", "-n", self.__numbytestx, "-f", "m") else: - str_cmd = "iperf -c {} -x CMSV -n {}M -B {}".format(self.__sip, self.__MB_req, self.__bind) - iperf_command = SysCommand("iperf", str_cmd) - if iperf_command.execute() == 0: - self.__raw_out = iperf_command.getOutput() - if self.__raw_out == "": - return -1 - lines = iperf_command.getOutput().splitlines() - dat = lines[1] - dat = dat.decode('ascii') - dat_list = dat.split( ) - for d in dat_list: - a = dat_list.pop(0) - if a == "sec": - break - self.__MB_real = dat_list[0] - self.__BW_real = dat_list[2] - self.__dat_list = dat_list - #print(self.__MB_real) - #print(self.__BW_real) - self.failUnless(float(self.__BW_real)>float(self.__OKBW)*0.9,"failed: speed is lower than spected. Speed(MB/s)" + str(self.__BW_real)) + p = sh.iperf("-c", self.__sip, "-x", "CMSV", "-n", self.__numbytestx, "-f", "m", "-B", self.__bind) + #check if it was executed succesfully + if p.exit_code == 0: + if p.stdout == "": + self.fail("failed: error executing iperf command") + # analyze output string + # split by lines + lines = p.stdout.splitlines() + # get first line + dat = lines[1].decode('ascii') + # search for the BW value + a = re.search("\d+(\.\d)? Mbits/sec",dat) + b = a.group().split( ) + BWreal = b[0] + + # check if BW is in the expected range + self.failUnless(float(BWreal)>float(self.__OKBW)*0.9,"failed: speed is lower than spected. Speed(MB/s)" + str(BWreal)) else: self.fail("failed: could not complete iperf command") - - def get_Total_MB(self): - return self.__MB_real; - - def get_Total_BW(self): - return self.__MB_real; diff --git a/test-cli/test/tests/qflash.py b/test-cli/test/tests/qflash.py index cdc4109..0c3fcb5 100644 --- a/test-cli/test/tests/qflash.py +++ b/test-cli/test/tests/qflash.py @@ -4,16 +4,13 @@ from test.helpers.globalVariables import globalVar class Qflasher(unittest.TestCase): - def __init__(self, testname, testfunc): + def __init__(self, testname, testfunc, varlist): super(Qflasher, self).__init__(testfunc) self._testMethodDoc = testname model=globalVar.g_mid if model.find("IGEP0046") == 0: self._flash_method = "mx6" self._binlocation="/boot/" - elif model.find("IGEP0000") == 0: - self._flash_method = "mx6" - self._binlocation="/boot/" elif model.find("IGEP0034") == 0: self._flash_method = "nandti" self._binlocation = "/boot/" diff --git a/test-cli/test/tests/qi2c.py b/test-cli/test/tests/qi2c.py index 409005c..2f14424 100644 --- a/test-cli/test/tests/qi2c.py +++ b/test-cli/test/tests/qi2c.py @@ -3,10 +3,16 @@ import unittest class Qi2c(unittest.TestCase): - def __init__(self, testname, testfunc, busnum, register): + def __init__(self, testname, testfunc, varlist): super(Qi2c, self).__init__(testfunc) - self.__busnum = busnum - self.__register = register.split("/") + if "busnum" in varlist: + self.__busnum = varlist["busnum"] + else: + raise Exception('busnum param inside Qi2c must be defined') + if "register" in varlist: + self.__register = varlist["register"].split("/") + else: + raise Exception('register param inside Qi2c must be defined') self.__devices=[] self._testMethodDoc = testname diff --git a/test-cli/test/tests/qnand.py b/test-cli/test/tests/qnand.py new file mode 100644 index 0000000..9f2cd47 --- /dev/null +++ b/test-cli/test/tests/qnand.py @@ -0,0 +1,22 @@ +import unittest +import sh + +class Qnand(unittest.TestCase): + + __device = "10M" + + # varlist: device + def __init__(self, testname, testfunc, varlist): + super(Qnand, self).__init__(testfunc) + + if "device" in varlist: + self.__device = varlist["device"] + else: + raise Exception('device param inside Qnand must be defined') + self._testMethodDoc = testname + + def execute(self): + try: + sh.nandtest("-m", self.__device) + except sh.ErrorReturnCode as e: + self.fail("failed: could not complete nandtest command") diff --git a/test-cli/test/tests/qram.py b/test-cli/test/tests/qram.py index 8ec0210..1b66e5c 100644 --- a/test-cli/test/tests/qram.py +++ b/test-cli/test/tests/qram.py @@ -1,22 +1,27 @@ -from test.helpers.syscmd import SysCommand import unittest +import sh class Qram(unittest.TestCase): - def __init__(self, testname, testfunc, memSize): + __memSize = "10M" + __loops = "1" + + # varlist: memSize, loops + def __init__(self, testname, testfunc, varlist): super(Qram, self).__init__(testfunc) - self.__memSize = memSize + + if "memSize" in varlist: + self.__memSize = varlist["memSize"] + else: + raise Exception('memSize param inside Qram must be defined') + if "loops" in varlist: + self.__loops = varlist["loops"] + else: + raise Exception('loops param inside Qram must be defined') self._testMethodDoc = testname def execute(self): - str_cmd= "free -m" - free_command = SysCommand("free_ram", str_cmd) - if free_command.execute() == 0: - self.__raw_out = free_command.getOutput() - if self.__raw_out == "": - return -1 - lines = free_command.getOutput().splitlines() - aux = [int(s) for s in lines[1].split() if s.isdigit()] - self.failUnless(int(aux[0])>int(self.__memSize),"failed: total ram memory size lower than expected") - else: - self.fail("failed: could not complete iperf command") + try: + sh.memtester(self.__memSize, "1") + except sh.ErrorReturnCode as e: + self.fail("failed: could not complete memtester command") diff --git a/test-cli/test/tests/qrtc.py b/test-cli/test/tests/qrtc.py index 1d02f78..8e31572 100644 --- a/test-cli/test/tests/qrtc.py +++ b/test-cli/test/tests/qrtc.py @@ -4,9 +4,14 @@ import time class Qrtc(unittest.TestCase): - def __init__(self, testname, testfunc, rtc): + __rtc = "/dev/rtc0" + + def __init__(self, testname, testfunc, varlist): super(Qrtc, self).__init__(testfunc) - self.__rtc = rtc + if "rtc" in varlist: + self.__rtc = varlist["rtc"] + else: + raise Exception('rtc param inside Qrtc must be defined') self._testMethodDoc = testname def execute(self): diff --git a/test-cli/test/tests/qscreen.py b/test-cli/test/tests/qscreen.py index b8a5a48..87e53b2 100644 --- a/test-cli/test/tests/qscreen.py +++ b/test-cli/test/tests/qscreen.py @@ -5,9 +5,13 @@ from test.helpers.cv_display_test import pattern_detect class Qscreen(unittest.TestCase): - def __init__(self, testname, testfunc, display): + #varlist: display + def __init__(self, testname, testfunc, varlist): super(Qscreen, self).__init__(testfunc) - self.__display = display + if "display" in varlist: + self.__display = varlist["display"] + else: + raise Exception('display param inside Qscreen have been be defined') self._testMethodDoc = testname def execute(self): @@ -17,15 +21,5 @@ class Qscreen(unittest.TestCase): test_screen = pattern_detect(1) if not test_screen=="0": self.fail("failed: {}".format(test_screen)) - str_cmd= "fbi -T 1 /home/root/result_hdmi_img.jpg -d /dev/fb0 --noverbose -a" - show_img = SysCommand("show-image", str_cmd) - show_img.execute() else: self.fail("failed: could not display the image") - try: - str_cmd= "fbi -T 1 /home/root/result_hdmi_img.jpg -d /dev/fb0 --noverbose -a" - show_img = SysCommand("show-image", str_cmd) - show_img.execute() - except ValueError: - print("COULD NOT DISPLAY IMAGE") - diff --git a/test-cli/test/tests/qserial.py b/test-cli/test/tests/qserial.py index 43ba3c8..d3e2ee6 100644 --- a/test-cli/test/tests/qserial.py +++ b/test-cli/test/tests/qserial.py @@ -6,11 +6,17 @@ import time class Qserial(unittest.TestCase): - def __init__(self, testname, testfunc, port, baudrate): + def __init__(self, testname, testfunc, varlist): super(Qserial, self).__init__(testfunc) - self.__port = port + if "port" in varlist: + self.__port = varlist["port"] + else: + raise Exception('port param inside Qserial must be defined') self.__serial = serial.Serial(self.__port, timeout=1) - self.__serial.baudrate = baudrate + if "baudrate" in varlist: + self.__baudrate = varlist["baudrate"] + else: + raise Exception('baudrate param inside Qserial must be defined') self._testMethodDoc = testname def __del__(self): @@ -19,12 +25,15 @@ class Qserial(unittest.TestCase): def execute(self): self.__serial.flushInput() self.__serial.flushOutput() + #generate a random uuid test_uuid = str(uuid.uuid4()).encode() + #send the uuid through serial port self.__serial.write(test_uuid) time.sleep(0.05) # there might be a small delay if self.__serial.inWaiting() == 0: self.fail("failed: PORT {} wait timeout exceded, wrong communication?".format(self.__port)) else: + # check if what it was sent is equal to what has been received if (self.__serial.readline() != test_uuid): self.fail("failed: PORT {} write/read mismatch".format(self.__port)) diff --git a/test-cli/test/tests/qusb.py b/test-cli/test/tests/qusb.py index 44490bc..0390143 100644 --- a/test-cli/test/tests/qusb.py +++ b/test-cli/test/tests/qusb.py @@ -3,17 +3,20 @@ import unittest class Qusb(unittest.TestCase): - def __init__(self, testname, testfunc, devLabel, numPorts): + def __init__(self, testname, testfunc, varlist): super(Qusb, self).__init__(testfunc) - self.__numPorts = numPorts + if "numPorts" in varlist: + self.__numPorts = varlist["numPorts"] + else: + raise Exception('numPorts param inside Qusb must be defined') self._testMethodDoc = testname - self.__devLabel = devLabel + if "devLabel" in varlist: + self.__devLabel = varlist["devLabel"] + else: + raise Exception('devLabel param inside Qusb must be defined') if testname=="USBOTG": self.__usbFileName = "/this_is_an_usb_otg" self.__usbtext = "USBOTG" - elif testname=="SATA": - self.__usbFileName = "/this_is_a_sata" - self.__usbtext = "SATA" else: self.__usbFileName = "/this_is_an_usb_host" self.__usbtext = "USBHOST" @@ -57,4 +60,4 @@ class Qusb(unittest.TestCase): else: self.fail("failed: reference and real usb host devices number mismatch") else: - self.fail("failed: couldn't execute lsblk command") + self.fail("failed: couldn't execute lsblk command") \ No newline at end of file diff --git a/test-cli/test/tests/qwifi.py b/test-cli/test/tests/qwifi.py index d08149b..188e300 100644 --- a/test-cli/test/tests/qwifi.py +++ b/test-cli/test/tests/qwifi.py @@ -1,45 +1,33 @@ from test.helpers.syscmd import SysCommand import unittest -import subprocess class Qwifi(unittest.TestCase): - def __init__(self, testname, testfunc, signal): + def __init__(self, testname, testfunc, varlist): super(Qwifi, self).__init__(testfunc) - self.__signal = signal + if "signal" in varlist: + self.__signal = varlist["signal"] + else: + raise Exception('signal param inside Qwifi must be defined') self._testMethodDoc = testname - # WiFi SERVERIP fixed at the moment. - self._serverip = "192.168.5.1" def execute(self): - # First check connection with the wifi server using ping command - #str_cmd = "ping -c 1 {} > /dev/null".format(self._serverip) - #wifi_ping = SysCommand("wifi_ping", str_cmd) - #wifi_ping.execute() - #res = subprocess.call(['ping', '-c', '1', self._serverip]) - p = subprocess.Popen(['ping','-c','1',self._serverip,'-W','2'],stdout=subprocess.PIPE) - p.wait() - res=p.poll() - if res == 0: - str_cmd= "iw wlan0 link" - wifi_stats = SysCommand("wifi_stats", str_cmd) - if wifi_stats.execute() == 0: - w_stats = wifi_stats.getOutput().decode('ascii').splitlines() - #self._longMessage = str(self.__raw_out).replace("'", "") - if not w_stats[0] == "Not connected.": - signal_st = w_stats[5].split(" ")[1] - try: - int(signal_st) - if -1*int(signal_st) > int(self.__signal): - self.fail("failed: signal to server lower than expected") - except ValueError: - self.fail("failed: error output (Bad connection?)") - else: + str_cmd= "iw wlan0 link" + wifi_stats = SysCommand("wifi_stats", str_cmd) + if wifi_stats.execute() == 0: + w_stats = wifi_stats.getOutput().decode('ascii').splitlines() + #self._longMessage = str(self.__raw_out).replace("'", "") + if not w_stats[0] == "Not connected.": + signal_st = w_stats[5].split(" ")[1] + try: + int(signal_st) + if -1*int(signal_st) > int(self.__signal): + self.fail("failed: signal to server lower than expected") + except ValueError: self.fail("failed: error output (Bad connection?)") - #tx_brate = float(w_stats[6].split(" ")[2]) else: - self.fail("failed: couldn't execute iw command") + self.fail("failed: error output (Bad connection?)") + #tx_brate = float(w_stats[6].split(" ")[2]) else: - self.fail("failed: ping to server {}".format(self._serverip)) - + self.fail("failed: couldn't execute iw command") -- cgit v1.1 From a615dac03a9ea7897bf3947ba8d31278b2cea121 Mon Sep 17 00:00:00 2001 From: Hector Fernandez Date: Thu, 5 Mar 2020 16:29:31 +0100 Subject: Modify tests. --- test-cli/test/tests/qeeprom.py | 62 +++++++++++++++------------- test-cli/test/tests/qethernet.py | 24 +++++++---- test-cli/test/tests/qwifi.py | 88 +++++++++++++++++++++++++++++++--------- 3 files changed, 118 insertions(+), 56 deletions(-) (limited to 'test-cli/test/tests') diff --git a/test-cli/test/tests/qeeprom.py b/test-cli/test/tests/qeeprom.py index 2bab248..5bb0755 100644 --- a/test-cli/test/tests/qeeprom.py +++ b/test-cli/test/tests/qeeprom.py @@ -1,38 +1,44 @@ -from test.helpers.syscmd import SysCommand +import sh import unittest -import uuid + class Qeeprom(unittest.TestCase): + __position = None + __uuid = None + __eeprompath = None + + # varlist: position, uuid, eeprompath def __init__(self, testname, testfunc, varlist): super(Qeeprom, self).__init__(testfunc) self._testMethodDoc = testname + if "position" in varlist: + self.__position = varlist["position"] + else: + raise Exception('position param inside Qeeprom must be defined') + if "uuid" in varlist: + self.__uuid = varlist["uuid"] + else: + raise Exception('uuid param inside Qeeprom must be defined') + if "eeprompath" in varlist: + self.__eeprompath = varlist["eeprompath"] + else: + raise Exception('eeprompath param inside Qeeprom must be defined') def execute(self): - str_cmd = "find /sys/ -iname 'eeprom'" - eeprom_location = SysCommand("eeprom_location", str_cmd) - if eeprom_location.execute() == 0: - self.__raw_out = eeprom_location.getOutput() - if self.__raw_out == "": - self.fail("Unable to get EEPROM location. IS EEPROM CONNECTED?") - return -1 - eeprom=self.__raw_out.decode('ascii') - test_uuid = uuid.uuid4() - str_cmd="echo '{}' > {}".format(str(test_uuid), eeprom) - eeprom_write = SysCommand("eeprom_write", str_cmd) - if eeprom_write.execute() == 0: - self.__raw_out = eeprom_write.getOutput() - if self.__raw_out == "": - self.fail("Unable to write on the EEPROM?") - return -1 - str_cmd = "head -2 {}".format(eeprom) - eeprom_read = SysCommand("eeprom_read", str_cmd) - if eeprom_read.execute() == 0: - self.__raw_out = eeprom_read.getOutput() - if self.__raw_out == "": - self.fail("Unable to read from the EEPROM?") - return -1 - if(str(self.__raw_out).find(str(test_uuid)) == -1): - self.fail("failed: READ/WRITE mismatch") + # write data into the eeprom + p = sh.dd(sh.echo(self._uuid), "of=" + self.__eeprompath, "bs=1", "seek=" + self.__position) + if p.exit_code == 0: + # read data from the eeprom + p = sh.dd("if=" + self.__eeprompath, "bs=1", "skip=" + self.__position, "count=" + len(self.__uuid)) + if p.exit_code == 0: + uuid_rx = p.stdout.decode('ascii') + # compare both values + if (uuid_rx != self.__uuid): + self.fail("failed: mismatch between written and received values.") + + else: + self.fail("failed: Unable to read from the EEPROM device.") else: - self.fail("failed: could not complete find eeprom command") + self.fail("failed: Unable to write on the EEPROM device.") + diff --git a/test-cli/test/tests/qethernet.py b/test-cli/test/tests/qethernet.py index be984f5..22d796c 100644 --- a/test-cli/test/tests/qethernet.py +++ b/test-cli/test/tests/qethernet.py @@ -1,12 +1,14 @@ import unittest import sh -import ex +import re + class Qethernet(unittest.TestCase): __sip = None __numbytestx = None __bind = None __OKBW = None + __port = None def __init__(self, testname, testfunc, varlist): super(Qethernet, self).__init__(testfunc) @@ -22,16 +24,21 @@ class Qethernet(unittest.TestCase): self.__OKBW = varlist["OKBW"] else: raise Exception('OKBW param inside Qethernet must be defined') + if "port" in varlist: + self.__port = varlist["port"] + else: + raise Exception('port param inside Qethernet must be defined') self.__numbytestx = "10M" self._testMethodDoc = testname def execute(self): # execute iperf command against the server if self.__bind is None: - p = sh.iperf("-c", self.__sip, "-x", "CMSV", "-n", self.__numbytestx, "-f", "m") + p = sh.iperf("-c", self.__sip, "-x", "CMSV", "-n", self.__numbytestx, "-f", "m", "-p", self.__port) else: - p = sh.iperf("-c", self.__sip, "-x", "CMSV", "-n", self.__numbytestx, "-f", "m", "-B", self.__bind) - #check if it was executed succesfully + p = sh.iperf("-c", self.__sip, "-x", "CMSV", "-n", self.__numbytestx, "-f", "m", "-p", self.__port, "-B", + self.__bind) + # check if it was executed succesfully if p.exit_code == 0: if p.stdout == "": self.fail("failed: error executing iperf command") @@ -41,11 +48,12 @@ class Qethernet(unittest.TestCase): # get first line dat = lines[1].decode('ascii') # search for the BW value - a = re.search("\d+(\.\d)? Mbits/sec",dat) - b = a.group().split( ) - BWreal = b[0] + a = re.search("\d+(\.\d)? Mbits/sec", dat) + b = a.group().split() + bwreal = b[0] # check if BW is in the expected range - self.failUnless(float(BWreal)>float(self.__OKBW)*0.9,"failed: speed is lower than spected. Speed(MB/s)" + str(BWreal)) + self.failUnless(float(bwreal) > float(self.__OKBW) * 0.9, + "failed: speed is lower than spected. Speed(MB/s)" + str(bwreal)) else: self.fail("failed: could not complete iperf command") diff --git a/test-cli/test/tests/qwifi.py b/test-cli/test/tests/qwifi.py index 188e300..154dd52 100644 --- a/test-cli/test/tests/qwifi.py +++ b/test-cli/test/tests/qwifi.py @@ -1,33 +1,81 @@ -from test.helpers.syscmd import SysCommand import unittest +import sh +import re + class Qwifi(unittest.TestCase): + __sip = None + __numbytestx = None + __bind = None + __OKBW = None + __port = None + #varlist: sip, bind, OKBW, port def __init__(self, testname, testfunc, varlist): super(Qwifi, self).__init__(testfunc) - if "signal" in varlist: - self.__signal = varlist["signal"] + if "sip" in varlist: + self.__sip = varlist["sip"] + else: + raise Exception('sip param inside Qwifi have been be defined') + if "OKBW" in varlist: + self.__OKBW = varlist["OKBW"] + else: + raise Exception('OKBW param inside Qwifi must be defined') + if "port" in varlist: + self.__port = varlist["port"] else: - raise Exception('signal param inside Qwifi must be defined') + raise Exception('port param inside Qwifi must be defined') + if "bind" in varlist: + self.__bind = varlist["bind"] + else: + self.__bind = None + self.__numbytestx = "10M" self._testMethodDoc = testname def execute(self): - str_cmd= "iw wlan0 link" - wifi_stats = SysCommand("wifi_stats", str_cmd) - if wifi_stats.execute() == 0: - w_stats = wifi_stats.getOutput().decode('ascii').splitlines() - #self._longMessage = str(self.__raw_out).replace("'", "") - if not w_stats[0] == "Not connected.": - signal_st = w_stats[5].split(" ")[1] - try: - int(signal_st) - if -1*int(signal_st) > int(self.__signal): - self.fail("failed: signal to server lower than expected") - except ValueError: - self.fail("failed: error output (Bad connection?)") + # check if the board is connected to the router by wifi + p = sh.iw("wlan0", "link") + if p.exit_code == 0: + # get the first line of the output stream + out1 = p.stdout.decode('ascii').splitlines()[0] + if out1 != "Not connected.": + #check if the board has ip in the wlan0 interface + p = sh.ifconfig("wlan0") + if p.exit_code == 0: + result = re.search("inet addr:(?!127\.0{1,3}\.0{1,3}\.0{0,2}1$)((25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)", + p.stdout) + if result: + # execute iperf command against the server + if self.__bind is None: + p = sh.iperf("-c", self.__sip, "-x", "CMSV", "-n", self.__numbytestx, "-f", "m", "-p", + self.__port) + else: + p = sh.iperf("-c", self.__sip, "-x", "CMSV", "-n", self.__numbytestx, "-f", "m", "-p", + self.__port, "-B", self.__bind) + # check if it was executed succesfully + if p.exit_code == 0: + if p.stdout == "": + self.fail("failed: error executing iperf command") + # analyze output string + # split by lines + lines = p.stdout.splitlines() + # get first line + dat = lines[1].decode('ascii') + # search for the BW value + a = re.search("\d+(\.\d)? Mbits/sec", dat) + b = a.group().split() + bwreal = b[0] + + # check if BW is in the expected range + self.failUnless(float(bwreal) > float(self.__OKBW) * 0.9, + "failed: speed is lower than spected. Speed(MB/s)" + str(bwreal)) + else: + self.fail("failed: could not complete iperf command") + else: + self.fail("failed: wlan0 interface doesn't have any ip address.") + else: + self.fail("failed: could not complete ifconfig command.") else: - self.fail("failed: error output (Bad connection?)") - #tx_brate = float(w_stats[6].split(" ")[2]) + self.fail("failed: wifi module is not connected to the router.") else: self.fail("failed: couldn't execute iw command") - -- cgit v1.1 From dd9ffc52507c391271d0821636c683fd7562b46a Mon Sep 17 00:00:00 2001 From: Hector Fernandez Date: Thu, 5 Mar 2020 18:50:47 +0100 Subject: Modified postgre functions. --- test-cli/test/tests/qeeprom.py | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) (limited to 'test-cli/test/tests') diff --git a/test-cli/test/tests/qeeprom.py b/test-cli/test/tests/qeeprom.py index 5bb0755..c2293c2 100644 --- a/test-cli/test/tests/qeeprom.py +++ b/test-cli/test/tests/qeeprom.py @@ -5,10 +5,9 @@ import unittest class Qeeprom(unittest.TestCase): __position = None - __uuid = None __eeprompath = None - # varlist: position, uuid, eeprompath + # varlist: position, eeprompath def __init__(self, testname, testfunc, varlist): super(Qeeprom, self).__init__(testfunc) self._testMethodDoc = testname @@ -16,25 +15,23 @@ class Qeeprom(unittest.TestCase): self.__position = varlist["position"] else: raise Exception('position param inside Qeeprom must be defined') - if "uuid" in varlist: - self.__uuid = varlist["uuid"] - else: - raise Exception('uuid param inside Qeeprom must be defined') if "eeprompath" in varlist: self.__eeprompath = varlist["eeprompath"] else: raise Exception('eeprompath param inside Qeeprom must be defined') def execute(self): + # generate some data + data_tx = "isee_test" # write data into the eeprom - p = sh.dd(sh.echo(self._uuid), "of=" + self.__eeprompath, "bs=1", "seek=" + self.__position) + p = sh.dd(sh.echo(data_tx), "of=" + self.__eeprompath, "bs=1", "seek=" + self.__position) if p.exit_code == 0: # read data from the eeprom - p = sh.dd("if=" + self.__eeprompath, "bs=1", "skip=" + self.__position, "count=" + len(self.__uuid)) + p = sh.dd("if=" + self.__eeprompath, "bs=1", "skip=" + self.__position, "count=" + len(data_tx)) if p.exit_code == 0: - uuid_rx = p.stdout.decode('ascii') + data_rx = p.stdout.decode('ascii') # compare both values - if (uuid_rx != self.__uuid): + if data_rx != data_tx: self.fail("failed: mismatch between written and received values.") else: -- cgit v1.1 From 98d40cecc9818360984188755e455aa53933aab0 Mon Sep 17 00:00:00 2001 From: Hector Fernandez Date: Fri, 6 Mar 2020 09:38:18 +0100 Subject: changed parameter names of tests --- test-cli/test/tests/qeeprom.py | 2 +- test-cli/test/tests/qethernet.py | 20 ++++++++++---------- test-cli/test/tests/qram.py | 12 ++++++------ 3 files changed, 17 insertions(+), 17 deletions(-) (limited to 'test-cli/test/tests') diff --git a/test-cli/test/tests/qeeprom.py b/test-cli/test/tests/qeeprom.py index c2293c2..acdc1f6 100644 --- a/test-cli/test/tests/qeeprom.py +++ b/test-cli/test/tests/qeeprom.py @@ -27,7 +27,7 @@ class Qeeprom(unittest.TestCase): p = sh.dd(sh.echo(data_tx), "of=" + self.__eeprompath, "bs=1", "seek=" + self.__position) if p.exit_code == 0: # read data from the eeprom - p = sh.dd("if=" + self.__eeprompath, "bs=1", "skip=" + self.__position, "count=" + len(data_tx)) + p = sh.dd("if=" + self.__eeprompath, "bs=1", "skip=" + self.__position, "count=" + str(len(data_tx))) if p.exit_code == 0: data_rx = p.stdout.decode('ascii') # compare both values diff --git a/test-cli/test/tests/qethernet.py b/test-cli/test/tests/qethernet.py index 22d796c..adee67f 100644 --- a/test-cli/test/tests/qethernet.py +++ b/test-cli/test/tests/qethernet.py @@ -4,26 +4,26 @@ import re class Qethernet(unittest.TestCase): - __sip = None + __serverip = None __numbytestx = None __bind = None - __OKBW = None + __bwexpected = None __port = None def __init__(self, testname, testfunc, varlist): super(Qethernet, self).__init__(testfunc) - if "sip" in varlist: - self.__sip = varlist["sip"] + if "serverip" in varlist: + self.__serverip = varlist["serverip"] else: raise Exception('sip param inside Qethernet have been be defined') if "bind" in varlist: self.__bind = varlist["bind"] else: self.__bind = None - if "OKBW" in varlist: - self.__OKBW = varlist["OKBW"] + if "bwexpected" in varlist: + self.__bwexpected = varlist["bwexpected"] else: - raise Exception('OKBW param inside Qethernet must be defined') + raise Exception('bwexpected param inside Qethernet must be defined') if "port" in varlist: self.__port = varlist["port"] else: @@ -34,9 +34,9 @@ class Qethernet(unittest.TestCase): def execute(self): # execute iperf command against the server if self.__bind is None: - p = sh.iperf("-c", self.__sip, "-x", "CMSV", "-n", self.__numbytestx, "-f", "m", "-p", self.__port) + p = sh.iperf("-c", self.__serverip, "-x", "CMSV", "-n", self.__numbytestx, "-f", "m", "-p", self.__port) else: - p = sh.iperf("-c", self.__sip, "-x", "CMSV", "-n", self.__numbytestx, "-f", "m", "-p", self.__port, "-B", + p = sh.iperf("-c", self.__serverip, "-x", "CMSV", "-n", self.__numbytestx, "-f", "m", "-p", self.__port, "-B", self.__bind) # check if it was executed succesfully if p.exit_code == 0: @@ -53,7 +53,7 @@ class Qethernet(unittest.TestCase): bwreal = b[0] # check if BW is in the expected range - self.failUnless(float(bwreal) > float(self.__OKBW) * 0.9, + self.failUnless(float(bwreal) > float(self.__bwexpected) * 0.9, "failed: speed is lower than spected. Speed(MB/s)" + str(bwreal)) else: self.fail("failed: could not complete iperf command") diff --git a/test-cli/test/tests/qram.py b/test-cli/test/tests/qram.py index 1b66e5c..ade7aeb 100644 --- a/test-cli/test/tests/qram.py +++ b/test-cli/test/tests/qram.py @@ -3,17 +3,17 @@ import sh class Qram(unittest.TestCase): - __memSize = "10M" + __memsize = "10M" __loops = "1" - # varlist: memSize, loops + # varlist: memsize, loops def __init__(self, testname, testfunc, varlist): super(Qram, self).__init__(testfunc) - if "memSize" in varlist: - self.__memSize = varlist["memSize"] + if "memsize" in varlist: + self.__memsize = varlist["memsize"] else: - raise Exception('memSize param inside Qram must be defined') + raise Exception('memsize param inside Qram must be defined') if "loops" in varlist: self.__loops = varlist["loops"] else: @@ -22,6 +22,6 @@ class Qram(unittest.TestCase): def execute(self): try: - sh.memtester(self.__memSize, "1") + sh.memtester(self.__memsize, "1") except sh.ErrorReturnCode as e: self.fail("failed: could not complete memtester command") -- cgit v1.1 From 9f07a57d89a927aa9b172c1bf20c7ab563658c73 Mon Sep 17 00:00:00 2001 From: Hector Fernandez Date: Fri, 6 Mar 2020 12:46:27 +0100 Subject: Fixed multiple errors. --- test-cli/test/tests/qamp.py | 25 +++++++++--------- test-cli/test/tests/qaudio.py | 35 +++++++++++++------------ test-cli/test/tests/qbutton.py | 5 +++- test-cli/test/tests/qduplex_ser.py | 4 ++- test-cli/test/tests/qeeprom.py | 3 ++- test-cli/test/tests/qethernet.py | 3 +++ test-cli/test/tests/qflash.py | 3 +++ test-cli/test/tests/qi2c.py | 3 +++ test-cli/test/tests/qiperf.py | 53 -------------------------------------- test-cli/test/tests/qnand.py | 5 ++-- test-cli/test/tests/qram.py | 5 ++-- test-cli/test/tests/qrtc.py | 4 ++- test-cli/test/tests/qscreen.py | 4 ++- test-cli/test/tests/qserial.py | 4 ++- test-cli/test/tests/qusb.py | 28 ++++++++++++-------- test-cli/test/tests/qwifi.py | 14 +++++----- 16 files changed, 88 insertions(+), 110 deletions(-) delete mode 100644 test-cli/test/tests/qiperf.py (limited to 'test-cli/test/tests') diff --git a/test-cli/test/tests/qamp.py b/test-cli/test/tests/qamp.py index 3e38ce9..cf18fc5 100644 --- a/test-cli/test/tests/qamp.py +++ b/test-cli/test/tests/qamp.py @@ -3,10 +3,13 @@ import unittest import serial import time + class Qamp(unittest.TestCase): + params = None - #varlist: undercurrent, overcurrent + # varlist: undercurrent, overcurrent def __init__(self, testname, testfunc, varlist): + self.params = varlist self._current = 0.0 if "undercurrent" in varlist: self._undercurrent = varlist["undercurrent"] @@ -16,7 +19,7 @@ class Qamp(unittest.TestCase): self._overcurrent = varlist["overcurrent"] else: raise Exception('overcurrent param inside Qamp must be defined') - self._vshuntfactor=16384.0 + self._vshuntfactor = 16384.0 self._ser = serial.Serial() self._ser.port = '/dev/ttyUSB0' self._ser.baudrate = 115200 @@ -34,19 +37,19 @@ class Qamp(unittest.TestCase): def execute(self): # Open Serial port ttyUSB0 - error=0 + error = 0 try: self._ser.open() except: self.fail("failed: IMPOSSIBLE OPEN USB-SERIAL PORT ( {} )".format(self._ser.port)) - error=1 - if error==0: + error = 1 + if error == 0: # Clean input and output buffer of serial port self._ser.flushInput() self._ser.flushOutput() # Send command to read Voltage at Shunt resistor # Prepare cmd - cmd = ('at+in?\n\r') + cmd = 'at+in?\n\r' i = 0 # Write, 1 by 1 byte at a time to avoid hanging of serial receiver code of listener, emphasis being made at sleep of 50 ms. @@ -57,7 +60,7 @@ class Qamp(unittest.TestCase): # Read, 1 by 1 byte res = [] - while (self._ser.inWaiting() > 0): # if incoming bytes are waiting to be read from the serial input buffer + while self._ser.inWaiting() > 0: # if incoming bytes are waiting to be read from the serial input buffer res.append(self._ser.read(1).decode('ascii')) # read the bytes and convert from binary array to ASCII string = ''.join(res) @@ -65,12 +68,8 @@ class Qamp(unittest.TestCase): self._current = float(int(string, 0)) / self._vshuntfactor print(self._current) # In order to give a valid result it is importarnt to define an under current value - if (self._current > float(self._overcurrent)): + if self._current > float(self._overcurrent): self.fail("failed: OVERCURRENT DETECTED ( {} )".format(self._current)) - if (self._current < float(self._undercurrent)): + if self._current < float(self._undercurrent): self.fail("failed: UNDERCURRENT DETECTED ( {} )".format(self._current)) - - - - diff --git a/test-cli/test/tests/qaudio.py b/test-cli/test/tests/qaudio.py index fe57be2..ef4da67 100644 --- a/test-cli/test/tests/qaudio.py +++ b/test-cli/test/tests/qaudio.py @@ -1,38 +1,39 @@ from test.helpers.syscmd import SysCommand import unittest -#class name + + class Qaudio(unittest.TestCase): - # Initialize the variables + params = None - def __init__(self, testname, testfunc, dtmfFile): - # Doing this we will initialize the class and later on perform a particular method inside this class + def __init__(self, testname, testfunc, varlist): + self.params = varlist super(Qaudio, self).__init__(testfunc) self._testMethodDoc = testname - self._dtmfFile=dtmfFile - self.__sum=0 - self.__refSum = 25 # 1+3+5+7+9 + if "dtmfFile" in varlist: + self._dtmfFile = varlist["dtmfFile"] + else: + raise Exception('undercurrent param inside Qamp must be defined') + self.__sum = 0 + self.__refSum = 25 # 1+3+5+7+9 def execute(self): - str_cmd = "aplay test/files/dtmf-13579.wav & arecord -r 8000 -d 1 recorded.wav" #.format(self.__dtmfFile) + str_cmd = "aplay test/files/dtmf-13579.wav & arecord -r 8000 -d 1 recorded.wav" # .format(self.__dtmfFile) audio_loop = SysCommand("command-name", str_cmd) - if audio_loop.execute() == -1:# BUG: Returns -1 but work + if audio_loop.execute() == -1: # BUG: Returns -1 but work lines = audio_loop.getOutput().splitlines() str_cmd = "multimon -t wav -a DTMF recorded.wav -q" dtmf_decoder = SysCommand("command-name", str_cmd) - a=dtmf_decoder.execute() - if dtmf_decoder.execute() == -1: # BUG: Returns -1 but work + a = dtmf_decoder.execute() + if dtmf_decoder.execute() == -1: # BUG: Returns -1 but work self.__raw_out = dtmf_decoder.getOutput() if self.__raw_out == "": - return -1 + self.fail("failed: can not execute multimon command") lines = dtmf_decoder.getOutput().splitlines() for i in range(0, 5): - aux=[int(s) for s in lines[i].split() if s.isdigit()] - self.__sum=self.__sum+aux[0] + aux = [int(s) for s in lines[i].split() if s.isdigit()] + self.__sum = self.__sum + aux[0] self.failUnless(self.__sum == self.__refSum), "failed: incorrect dtmf code" + str(self.__sum) else: self.fail("failed: fail reading recorded file") - return -1 else: self.fail("failed: fail playing/recording file") - return -1 - return 0 \ No newline at end of file diff --git a/test-cli/test/tests/qbutton.py b/test-cli/test/tests/qbutton.py index 4718924..46ddde0 100644 --- a/test-cli/test/tests/qbutton.py +++ b/test-cli/test/tests/qbutton.py @@ -1,11 +1,14 @@ from test.helpers.syscmd import SysCommand import unittest -import uuid import time + class Qbutton(unittest.TestCase): + params = None def __init__(self, testname, testfunc, varlist): + self.params = varlist + super(Qbutton, self).__init__(testfunc) if "gpio" in varlist: self.__gpio = varlist["gpio"] else: diff --git a/test-cli/test/tests/qduplex_ser.py b/test-cli/test/tests/qduplex_ser.py index 837b4d0..46fb5db 100644 --- a/test-cli/test/tests/qduplex_ser.py +++ b/test-cli/test/tests/qduplex_ser.py @@ -1,12 +1,14 @@ -from test.helpers.syscmd import SysCommand import unittest import uuid import serial import time + class Qduplex(unittest.TestCase): + params = None def __init__(self, testname, testfunc, varlist): + self.params = varlist super(Qduplex, self).__init__(testfunc) if "port1" in varlist: self.__port1 = varlist["port1"] diff --git a/test-cli/test/tests/qeeprom.py b/test-cli/test/tests/qeeprom.py index acdc1f6..d2c55ef 100644 --- a/test-cli/test/tests/qeeprom.py +++ b/test-cli/test/tests/qeeprom.py @@ -3,12 +3,13 @@ import unittest class Qeeprom(unittest.TestCase): - + params = None __position = None __eeprompath = None # varlist: position, eeprompath def __init__(self, testname, testfunc, varlist): + self.params = varlist super(Qeeprom, self).__init__(testfunc) self._testMethodDoc = testname if "position" in varlist: diff --git a/test-cli/test/tests/qethernet.py b/test-cli/test/tests/qethernet.py index adee67f..1d4c9bc 100644 --- a/test-cli/test/tests/qethernet.py +++ b/test-cli/test/tests/qethernet.py @@ -9,8 +9,11 @@ class Qethernet(unittest.TestCase): __bind = None __bwexpected = None __port = None + params = None + #varlist content: serverip, bwexpected, port, (optional)bind def __init__(self, testname, testfunc, varlist): + self.params = varlist super(Qethernet, self).__init__(testfunc) if "serverip" in varlist: self.__serverip = varlist["serverip"] diff --git a/test-cli/test/tests/qflash.py b/test-cli/test/tests/qflash.py index 0c3fcb5..59ed13d 100644 --- a/test-cli/test/tests/qflash.py +++ b/test-cli/test/tests/qflash.py @@ -2,9 +2,12 @@ from test.helpers.syscmd import SysCommand import unittest from test.helpers.globalVariables import globalVar + class Qflasher(unittest.TestCase): + params = None def __init__(self, testname, testfunc, varlist): + self.params = varlist super(Qflasher, self).__init__(testfunc) self._testMethodDoc = testname model=globalVar.g_mid diff --git a/test-cli/test/tests/qi2c.py b/test-cli/test/tests/qi2c.py index 2f14424..71eb64e 100644 --- a/test-cli/test/tests/qi2c.py +++ b/test-cli/test/tests/qi2c.py @@ -1,9 +1,12 @@ from test.helpers.syscmd import SysCommand import unittest + class Qi2c(unittest.TestCase): + params = None def __init__(self, testname, testfunc, varlist): + self.params = varlist super(Qi2c, self).__init__(testfunc) if "busnum" in varlist: self.__busnum = varlist["busnum"] diff --git a/test-cli/test/tests/qiperf.py b/test-cli/test/tests/qiperf.py deleted file mode 100644 index 126b6ee..0000000 --- a/test-cli/test/tests/qiperf.py +++ /dev/null @@ -1,53 +0,0 @@ -from test.helpers.syscmd import SysCommand - - -class QIperf(object): - __sip = None - __raw_out = None - __MB_req = None - __MB_real = None - __BW_real = None - __dat_list = None - __bind = None - - def __init__(self, sip = None): - if sip is not None: - self.__sip = sip - self.__MB_req = '10' - - def execute(self, sip = None, bind = None): - if sip is not None: - self.__sip = sip - - if bind is None: - str_cmd = "iperf -c {} -x CMSV -n {}M".format(self.__sip, self.__MB_req) - else: - self.__bind = bind - str_cmd = "iperf -c {} -x CMSV -n {}M -B {}".format(self.__sip, self.__MB_req, self.__bind) - t = SysCommand("iperf", str_cmd) - if t.execute() == 0: - self.__raw_out = t.getOutput() - if self.__raw_out == "": - return -1 - lines = t.getOutput().splitlines() - dat = lines[1] - dat = dat.decode('ascii') - dat_list = dat.split( ) - for d in dat_list: - a = dat_list.pop(0) - if a == "sec": - break - self.__MB_real = dat_list[0] - self.__BW_real = dat_list[2] - self.__dat_list = dat_list - print(self.__MB_real) - print(self.__BW_real) - else: - return -1 - return 0 - - def get_Total_MB(self): - return self.__MB_real; - - def get_Total_BW(self): - return self.__MB_real; diff --git a/test-cli/test/tests/qnand.py b/test-cli/test/tests/qnand.py index 9f2cd47..5125c1c 100644 --- a/test-cli/test/tests/qnand.py +++ b/test-cli/test/tests/qnand.py @@ -1,14 +1,15 @@ import unittest import sh -class Qnand(unittest.TestCase): +class Qnand(unittest.TestCase): + params = None __device = "10M" # varlist: device def __init__(self, testname, testfunc, varlist): + self.params = varlist super(Qnand, self).__init__(testfunc) - if "device" in varlist: self.__device = varlist["device"] else: diff --git a/test-cli/test/tests/qram.py b/test-cli/test/tests/qram.py index ade7aeb..cc75b68 100644 --- a/test-cli/test/tests/qram.py +++ b/test-cli/test/tests/qram.py @@ -1,15 +1,16 @@ import unittest import sh -class Qram(unittest.TestCase): +class Qram(unittest.TestCase): + params = None __memsize = "10M" __loops = "1" # varlist: memsize, loops def __init__(self, testname, testfunc, varlist): + self.params = varlist super(Qram, self).__init__(testfunc) - if "memsize" in varlist: self.__memsize = varlist["memsize"] else: diff --git a/test-cli/test/tests/qrtc.py b/test-cli/test/tests/qrtc.py index 8e31572..884a290 100644 --- a/test-cli/test/tests/qrtc.py +++ b/test-cli/test/tests/qrtc.py @@ -2,11 +2,13 @@ from test.helpers.syscmd import SysCommand import unittest import time -class Qrtc(unittest.TestCase): +class Qrtc(unittest.TestCase): + params = None __rtc = "/dev/rtc0" def __init__(self, testname, testfunc, varlist): + self.params = varlist super(Qrtc, self).__init__(testfunc) if "rtc" in varlist: self.__rtc = varlist["rtc"] diff --git a/test-cli/test/tests/qscreen.py b/test-cli/test/tests/qscreen.py index 87e53b2..aaee628 100644 --- a/test-cli/test/tests/qscreen.py +++ b/test-cli/test/tests/qscreen.py @@ -1,12 +1,14 @@ from test.helpers.syscmd import SysCommand import unittest -import time from test.helpers.cv_display_test import pattern_detect + class Qscreen(unittest.TestCase): + params = None #varlist: display def __init__(self, testname, testfunc, varlist): + self.params = varlist super(Qscreen, self).__init__(testfunc) if "display" in varlist: self.__display = varlist["display"] diff --git a/test-cli/test/tests/qserial.py b/test-cli/test/tests/qserial.py index d3e2ee6..d798578 100644 --- a/test-cli/test/tests/qserial.py +++ b/test-cli/test/tests/qserial.py @@ -1,12 +1,14 @@ -from test.helpers.syscmd import SysCommand import unittest import uuid import serial import time + class Qserial(unittest.TestCase): + params = None def __init__(self, testname, testfunc, varlist): + self.params = varlist super(Qserial, self).__init__(testfunc) if "port" in varlist: self.__port = varlist["port"] diff --git a/test-cli/test/tests/qusb.py b/test-cli/test/tests/qusb.py index 0390143..70265a5 100644 --- a/test-cli/test/tests/qusb.py +++ b/test-cli/test/tests/qusb.py @@ -1,9 +1,14 @@ from test.helpers.syscmd import SysCommand import unittest + class Qusb(unittest.TestCase): + __numPorts = None + __devLabel = None + params = None def __init__(self, testname, testfunc, varlist): + self.params = varlist super(Qusb, self).__init__(testfunc) if "numPorts" in varlist: self.__numPorts = varlist["numPorts"] @@ -14,27 +19,27 @@ class Qusb(unittest.TestCase): self.__devLabel = varlist["devLabel"] else: raise Exception('devLabel param inside Qusb must be defined') - if testname=="USBOTG": + if testname == "USBOTG": self.__usbFileName = "/this_is_an_usb_otg" self.__usbtext = "USBOTG" else: self.__usbFileName = "/this_is_an_usb_host" self.__usbtext = "USBHOST" - self.__numUsbFail=[] + self.__numUsbFail = [] def execute(self): - str_cmd= "lsblk -o LABEL" + str_cmd = "lsblk -o LABEL" lsblk_command = SysCommand("lsblk", str_cmd) if lsblk_command.execute() == 0: self.__raw_out = lsblk_command.getOutput() if self.__raw_out == "": return -1 lines = lsblk_command.getOutput().splitlines() - host_list=[] + host_list = [] for i in range(len(lines)): - if str(lines[i].decode('ascii'))==self.__devLabel: + if str(lines[i].decode('ascii')) == self.__devLabel: host_list.append(i) - if len(host_list)==int(self.__numPorts): + if len(host_list) == int(self.__numPorts): str_cmd = "lsblk -o MOUNTPOINT" lsblk_command = SysCommand("lsblk", str_cmd) if lsblk_command.execute() == 0: @@ -45,13 +50,14 @@ class Qusb(unittest.TestCase): else: lines = lsblk_command.getOutput().splitlines() for i in range(len(host_list)): - file_path=str(lines[host_list[i]].decode('ascii')) + self.__usbFileName + file_path = str(lines[host_list[i]].decode('ascii')) + self.__usbFileName usb_file = open(file_path, 'r') - read=usb_file.read() - if read.find(self.__usbtext)!=-1: + read = usb_file.read() + if read.find(self.__usbtext) != -1: print(file_path + " --> OK!") else: - self.fail("failed: could not read from usb {}".format(lines[host_list[i]].decode('ascii'))) + self.fail( + "failed: could not read from usb {}".format(lines[host_list[i]].decode('ascii'))) self.__numUsbFail.append(host_list[i]) usb_file.close() else: @@ -60,4 +66,4 @@ class Qusb(unittest.TestCase): else: self.fail("failed: reference and real usb host devices number mismatch") else: - self.fail("failed: couldn't execute lsblk command") \ No newline at end of file + self.fail("failed: couldn't execute lsblk command") diff --git a/test-cli/test/tests/qwifi.py b/test-cli/test/tests/qwifi.py index 154dd52..2a5fa0c 100644 --- a/test-cli/test/tests/qwifi.py +++ b/test-cli/test/tests/qwifi.py @@ -9,14 +9,16 @@ class Qwifi(unittest.TestCase): __bind = None __OKBW = None __port = None + params = None - #varlist: sip, bind, OKBW, port + #varlist: serverip, bind, OKBW, port def __init__(self, testname, testfunc, varlist): + self.params = varlist super(Qwifi, self).__init__(testfunc) - if "sip" in varlist: - self.__sip = varlist["sip"] + if "serverip" in varlist: + self.__serverip = varlist["serverip"] else: - raise Exception('sip param inside Qwifi have been be defined') + raise Exception('serverip param inside Qwifi have been be defined') if "OKBW" in varlist: self.__OKBW = varlist["OKBW"] else: @@ -47,10 +49,10 @@ class Qwifi(unittest.TestCase): if result: # execute iperf command against the server if self.__bind is None: - p = sh.iperf("-c", self.__sip, "-x", "CMSV", "-n", self.__numbytestx, "-f", "m", "-p", + p = sh.iperf("-c", self.__serverip, "-x", "CMSV", "-n", self.__numbytestx, "-f", "m", "-p", self.__port) else: - p = sh.iperf("-c", self.__sip, "-x", "CMSV", "-n", self.__numbytestx, "-f", "m", "-p", + p = sh.iperf("-c", self.__serverip, "-x", "CMSV", "-n", self.__numbytestx, "-f", "m", "-p", self.__port, "-B", self.__bind) # check if it was executed succesfully if p.exit_code == 0: -- cgit v1.1 From a03055f657d2e970e45e7ea2a3e66857f821eabd Mon Sep 17 00:00:00 2001 From: Hector Fernandez Date: Mon, 9 Mar 2020 09:03:28 +0100 Subject: Solved problems with consumption test. Fixed error when getting mac address. --- test-cli/test/tests/qamp.py | 1 + test-cli/test/tests/qamper.py | 51 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 52 insertions(+) create mode 100644 test-cli/test/tests/qamper.py (limited to 'test-cli/test/tests') diff --git a/test-cli/test/tests/qamp.py b/test-cli/test/tests/qamp.py index cf18fc5..56511c8 100644 --- a/test-cli/test/tests/qamp.py +++ b/test-cli/test/tests/qamp.py @@ -36,6 +36,7 @@ class Qamp(unittest.TestCase): self._ser.close() def execute(self): + # Open Serial port ttyUSB0 error = 0 try: diff --git a/test-cli/test/tests/qamper.py b/test-cli/test/tests/qamper.py new file mode 100644 index 0000000..2b02302 --- /dev/null +++ b/test-cli/test/tests/qamper.py @@ -0,0 +1,51 @@ +import unittest +from test.helpers.amper import Amper + + +class Qamper(unittest.TestCase): + params = None + + # varlist: undercurrent, overcurrent + def __init__(self, testname, testfunc, varlist): + self.params = varlist + super(Qamper, self).__init__(testfunc) + + if "undercurrent" in varlist: + self._undercurrent = varlist["undercurrent"] + else: + raise Exception('undercurrent param inside Qamp must be defined') + if "overcurrent" in varlist: + self._overcurrent = varlist["overcurrent"] + else: + raise Exception('overcurrent param inside Qamp must be defined') + self._testMethodDoc = testname + + def execute(self): + amp = Amper() + print(amp) + if not amp.open(): + print("1") + self.fail("failed: can not open serial port") + return + + if not amp.hello(): + print("2") + self.fail("failed: can not communicate") + return + + result = amp.getCurrent() + print(result) + + amp.close() + + + + + + + # # In order to give a valid result it is importarnt to define an under current value + # if self._current > float(self._overcurrent): + # self.fail("failed: OVERCURRENT DETECTED ( {} )".format(self._current)) + # + # if self._current < float(self._undercurrent): + # self.fail("failed: UNDERCURRENT DETECTED ( {} )".format(self._current)) -- cgit v1.1 From c685367cbd6abf1c6ae442df759e39b25a907d3b Mon Sep 17 00:00:00 2001 From: Hector Fernandez Date: Mon, 9 Mar 2020 12:39:50 +0100 Subject: Fixed several test procedures. --- test-cli/test/tests/qamper.py | 27 ++++-------- test-cli/test/tests/qduplex_ser.py | 40 ++++++++++-------- test-cli/test/tests/qeeprom.py | 1 - test-cli/test/tests/qi2c.py | 6 +-- test-cli/test/tests/qrtc.py | 26 +++++++----- test-cli/test/tests/qserial.py | 22 ++++++---- test-cli/test/tests/qusb.py | 87 ++++++++++++++------------------------ 7 files changed, 95 insertions(+), 114 deletions(-) (limited to 'test-cli/test/tests') diff --git a/test-cli/test/tests/qamper.py b/test-cli/test/tests/qamper.py index 2b02302..b3ab8b1 100644 --- a/test-cli/test/tests/qamper.py +++ b/test-cli/test/tests/qamper.py @@ -22,30 +22,21 @@ class Qamper(unittest.TestCase): def execute(self): amp = Amper() - print(amp) + # open serial connection if not amp.open(): - print("1") self.fail("failed: can not open serial port") return - + # check if the amperimeter is connected and working if not amp.hello(): - print("2") self.fail("failed: can not communicate") return - + # get current value (in Amperes) result = amp.getCurrent() - print(result) - + # close serial connection amp.close() + # # In order to give a valid result it is importarnt to define an under current value + if result > float(self._overcurrent): + self.fail("failed: Overcurrent detected ( {} )".format(result)) - - - - - - # # In order to give a valid result it is importarnt to define an under current value - # if self._current > float(self._overcurrent): - # self.fail("failed: OVERCURRENT DETECTED ( {} )".format(self._current)) - # - # if self._current < float(self._undercurrent): - # self.fail("failed: UNDERCURRENT DETECTED ( {} )".format(self._current)) + if result < float(self._undercurrent): + self.fail("failed: Undercurrent detected ( {} )".format(result)) diff --git a/test-cli/test/tests/qduplex_ser.py b/test-cli/test/tests/qduplex_ser.py index 46fb5db..0666363 100644 --- a/test-cli/test/tests/qduplex_ser.py +++ b/test-cli/test/tests/qduplex_ser.py @@ -6,7 +6,11 @@ import time class Qduplex(unittest.TestCase): params = None + __port1 = None + __port2 = None + __baudrate = None + # varlist: port1, port2, baudrate def __init__(self, testname, testfunc, varlist): self.params = varlist super(Qduplex, self).__init__(testfunc) @@ -14,46 +18,46 @@ class Qduplex(unittest.TestCase): self.__port1 = varlist["port1"] else: raise Exception('port1 param inside Qduplex must be defined') - self.__serial1 = serial.Serial(self.__port1, timeout=1) - if "port2" in varlist: self.__port2 = varlist["port2"] else: raise Exception('port2 param inside Qduplex must be defined') - self.__serial2 = serial.Serial(self.__port2, timeout=1) - if "baudrate" in varlist: - self.__serial1.baudrate = varlist["baudrate"] - self.__serial2.baudrate = varlist["baudrate"] + self.__baudrate = varlist["baudrate"] else: raise Exception('baudrate param inside Qduplex must be defined') - self._testMethodDoc = testname + # open serial connection + self.__serial1 = serial.Serial(self.__port1, self.__baudrate, timeout=1) + self.__serial1.flushInput() + self.__serial1.flushOutput() + self.__serial2 = serial.Serial(self.__port2, self.__baudrate, timeout=1) + self.__serial2.flushInput() + self.__serial2.flushOutput() + def __del__(self): self.__serial1.close() self.__serial2.close() def execute(self): - self.__serial1.flushInput() - self.__serial1.flushOutput() - self.__serial2.flushInput() - self.__serial2.flushOutput() + # generate a random uuid test_uuid = str(uuid.uuid4()).encode() + # send the uuid through serial port self.__serial1.write(test_uuid) time.sleep(0.05) # there might be a small delay if self.__serial2.inWaiting() == 0: - self.fail("failed: PORT {} wait timeout exceded, wrong communication?".format(self.__port2)) + self.fail("failed: port {} wait timeout exceded".format(self.__port2)) else: - if (self.__serial2.readline() != test_uuid): - self.fail("failed: PORT {} write/read mismatch".format(self.__port2)) + if self.__serial2.readline() != test_uuid: + self.fail("failed: port {} write/read mismatch".format(self.__port2)) test_uuid = str(uuid.uuid4()).encode() + # send the uuid through serial port self.__serial2.write(test_uuid) time.sleep(0.05) # there might be a small delay if self.__serial1.inWaiting() == 0: - self.fail("failed: PORT {} wait timeout exceded, wrong communication?".format(self.__port1)) + self.fail("failed: port {} wait timeout exceded".format(self.__port1)) else: - if (self.__serial1.readline() != test_uuid): - self.fail("failed: PORT {} write/read mismatch".format(self.__port1)) - + if self.__serial1.readline() != test_uuid: + self.fail("failed: port {} write/read mismatch".format(self.__port1)) diff --git a/test-cli/test/tests/qeeprom.py b/test-cli/test/tests/qeeprom.py index d2c55ef..1921bf7 100644 --- a/test-cli/test/tests/qeeprom.py +++ b/test-cli/test/tests/qeeprom.py @@ -34,7 +34,6 @@ class Qeeprom(unittest.TestCase): # compare both values if data_rx != data_tx: self.fail("failed: mismatch between written and received values.") - else: self.fail("failed: Unable to read from the EEPROM device.") else: diff --git a/test-cli/test/tests/qi2c.py b/test-cli/test/tests/qi2c.py index 71eb64e..c59975e 100644 --- a/test-cli/test/tests/qi2c.py +++ b/test-cli/test/tests/qi2c.py @@ -20,7 +20,7 @@ class Qi2c(unittest.TestCase): self._testMethodDoc = testname def execute(self): - str_cmd= "i2cdetect -a -y -r {}".format(self.__busnum) + str_cmd = "i2cdetect -a -y -r {}".format(self.__busnum) i2c_command = SysCommand("i2cdetect", str_cmd) if i2c_command.execute() == 0: self.__raw_out = i2c_command.getOutput() @@ -28,8 +28,8 @@ class Qi2c(unittest.TestCase): return -1 lines=self.__raw_out.decode('ascii').splitlines() for i in range(len(lines)): - if (lines[i].count('UU')): - if (lines[i].find("UU")): + if lines[i].count('UU'): + if lines[i].find("UU"): self.__devices.append("0x{}{}".format((i - 1), hex(int((lines[i].find("UU") - 4) / 3)).split('x')[-1])) for i in range(len(self.__register)): if not(self.__register[i] in self.__devices): diff --git a/test-cli/test/tests/qrtc.py b/test-cli/test/tests/qrtc.py index 884a290..0be0f99 100644 --- a/test-cli/test/tests/qrtc.py +++ b/test-cli/test/tests/qrtc.py @@ -1,11 +1,12 @@ -from test.helpers.syscmd import SysCommand +import sh import unittest import time +import re class Qrtc(unittest.TestCase): params = None - __rtc = "/dev/rtc0" + __rtc = None def __init__(self, testname, testfunc, varlist): self.params = varlist @@ -17,16 +18,19 @@ class Qrtc(unittest.TestCase): self._testMethodDoc = testname def execute(self): - str_cmd = "hwclock -f {}".format(self.__rtc) - rtc_set = SysCommand("rtc_set", str_cmd) - if rtc_set.execute() == 0: - curr_hour = rtc_set.getOutput().decode('ascii').split(" ") - time1 = int((curr_hour[4].split(":"))[2]) + # get time from RTC peripheral + p = sh.hwclock("-f", self.__rtc) + if p.exit_code == 0: + time1 = re.search("([01]?[0-9]{1}|2[0-3]{1})[:][0-5]{1}[0-9]{1}[:][0-5]{1}[0-9]{1}", + p.stdout.decode('ascii')) time.sleep(1) - if rtc_set.execute() == 0: - curr_hour = rtc_set.getOutput().decode('ascii').split(" ") - time2 = int((curr_hour[4].split(":"))[2]) - if time1==time2: + # get time from RTC another time + p = sh.hwclock("-f", self.__rtc) + if p.exit_code == 0: + time2 = re.search("([01]?[0-9]{1}|2[0-3]{1})[:][0-5]{1}[0-9]{1}[:][0-5]{1}[0-9]{1}", + p.stdout.decode('ascii')) + # check if the seconds of both times are different + if time1 == time2: self.fail("failed: RTC is not running") else: self.fail("failed: couldn't execute hwclock command 2nd time") diff --git a/test-cli/test/tests/qserial.py b/test-cli/test/tests/qserial.py index d798578..67e3af1 100644 --- a/test-cli/test/tests/qserial.py +++ b/test-cli/test/tests/qserial.py @@ -6,7 +6,10 @@ import time class Qserial(unittest.TestCase): params = None + __port = None + __baudrate = None + #varlist: port, baudrate def __init__(self, testname, testfunc, varlist): self.params = varlist super(Qserial, self).__init__(testfunc) @@ -14,28 +17,31 @@ class Qserial(unittest.TestCase): self.__port = varlist["port"] else: raise Exception('port param inside Qserial must be defined') - self.__serial = serial.Serial(self.__port, timeout=1) + if "baudrate" in varlist: self.__baudrate = varlist["baudrate"] else: raise Exception('baudrate param inside Qserial must be defined') self._testMethodDoc = testname + # open serial connection + self.__serial = serial.Serial(self.__port, self.__baudrate, timeout=1) + self.__serial.flushInput() + self.__serial.flushOutput() + def __del__(self): self.__serial.close() def execute(self): - self.__serial.flushInput() - self.__serial.flushOutput() - #generate a random uuid + # generate a random uuid test_uuid = str(uuid.uuid4()).encode() - #send the uuid through serial port + # send the uuid through serial port self.__serial.write(test_uuid) time.sleep(0.05) # there might be a small delay if self.__serial.inWaiting() == 0: - self.fail("failed: PORT {} wait timeout exceded, wrong communication?".format(self.__port)) + self.fail("failed: port {} wait timeout exceded".format(self.__port)) else: # check if what it was sent is equal to what has been received - if (self.__serial.readline() != test_uuid): - self.fail("failed: PORT {} write/read mismatch".format(self.__port)) + if self.__serial.readline() != test_uuid: + self.fail("failed: port {} write/read mismatch".format(self.__port)) diff --git a/test-cli/test/tests/qusb.py b/test-cli/test/tests/qusb.py index 70265a5..7ecf31e 100644 --- a/test-cli/test/tests/qusb.py +++ b/test-cli/test/tests/qusb.py @@ -1,69 +1,46 @@ -from test.helpers.syscmd import SysCommand +import sh import unittest +import re +from test.helpers.changedir import changedir class Qusb(unittest.TestCase): - __numPorts = None - __devLabel = None params = None def __init__(self, testname, testfunc, varlist): self.params = varlist super(Qusb, self).__init__(testfunc) - if "numPorts" in varlist: - self.__numPorts = varlist["numPorts"] - else: - raise Exception('numPorts param inside Qusb must be defined') self._testMethodDoc = testname - if "devLabel" in varlist: - self.__devLabel = varlist["devLabel"] - else: - raise Exception('devLabel param inside Qusb must be defined') - if testname == "USBOTG": - self.__usbFileName = "/this_is_an_usb_otg" - self.__usbtext = "USBOTG" - else: - self.__usbFileName = "/this_is_an_usb_host" - self.__usbtext = "USBHOST" - self.__numUsbFail = [] def execute(self): - str_cmd = "lsblk -o LABEL" - lsblk_command = SysCommand("lsblk", str_cmd) - if lsblk_command.execute() == 0: - self.__raw_out = lsblk_command.getOutput() - if self.__raw_out == "": - return -1 - lines = lsblk_command.getOutput().splitlines() - host_list = [] - for i in range(len(lines)): - if str(lines[i].decode('ascii')) == self.__devLabel: - host_list.append(i) - if len(host_list) == int(self.__numPorts): - str_cmd = "lsblk -o MOUNTPOINT" - lsblk_command = SysCommand("lsblk", str_cmd) - if lsblk_command.execute() == 0: - self.__raw_out = lsblk_command.getOutput() - if self.__raw_out == "": - print("failed: no command output") - self.fail("failed: no command output") - else: - lines = lsblk_command.getOutput().splitlines() - for i in range(len(host_list)): - file_path = str(lines[host_list[i]].decode('ascii')) + self.__usbFileName - usb_file = open(file_path, 'r') - read = usb_file.read() - if read.find(self.__usbtext) != -1: - print(file_path + " --> OK!") - else: - self.fail( - "failed: could not read from usb {}".format(lines[host_list[i]].decode('ascii'))) - self.__numUsbFail.append(host_list[i]) - usb_file.close() - else: - self.fail("failed: couldn't execute lsblk command") - + # Execute script usb.sh + p = sh.bash('test/helpers/usb.sh') + # Search in the stdout a pattern "/dev/sd + {letter} + {number} + q = re.search("/dev/sd\w\d", p.stdout.decode('ascii')) + # get the first device which matches the pattern + device = q.group(0) + # create a new folder where the pendrive is going to be mounted + sh.mkdir("-p", "/mnt/pendrive") + # mount the device + p = sh.mount(device, "/mnt/pendrive") + if p.exit_code == 0: + # copy files + p = sh.cp("/root/usbtest/usbdatatest.bin", "/root/usbtest/usbdatatest.md5", "/mnt/pendrive") + if p.exit_code == 0: + # check md5 + with changedir("/mnt/pendrive/"): + p = sh.md5sum("-c", "usbdatatest.md5") + q = re.search("OK", p.stdout.decode('ascii')) + # delete files + sh.rm("-f", "/mnt/pendrive/usbdatatest.bin", "/mnt/pendrive/usbdatatest.md5") + # umount + sh.umount("/mnt/pendrive") + # check result + if q is None: + self.fail("failed: wrong md5 result.") else: - self.fail("failed: reference and real usb host devices number mismatch") + # umount + sh.umount("/mnt/pendrive") + self.fail("failed: unable to copy files.") else: - self.fail("failed: couldn't execute lsblk command") + self.fail("failed: unable to mount the device.") -- cgit v1.1 From d38c92bfd7b6abe3a52b51b87b1a2949b857d9b4 Mon Sep 17 00:00:00 2001 From: Hector Fernandez Date: Mon, 9 Mar 2020 19:16:08 +0100 Subject: Created function to flash the eeprom memory. --- test-cli/test/tests/qbutton.py | 64 ------------------------------------------ test-cli/test/tests/qusb.py | 2 +- test-cli/test/tests/qwifi.py | 19 +++++++------ 3 files changed, 11 insertions(+), 74 deletions(-) delete mode 100644 test-cli/test/tests/qbutton.py (limited to 'test-cli/test/tests') diff --git a/test-cli/test/tests/qbutton.py b/test-cli/test/tests/qbutton.py deleted file mode 100644 index 46ddde0..0000000 --- a/test-cli/test/tests/qbutton.py +++ /dev/null @@ -1,64 +0,0 @@ -from test.helpers.syscmd import SysCommand -import unittest -import time - - -class Qbutton(unittest.TestCase): - params = None - - def __init__(self, testname, testfunc, varlist): - self.params = varlist - super(Qbutton, self).__init__(testfunc) - if "gpio" in varlist: - self.__gpio = varlist["gpio"] - else: - raise Exception('gpio param inside Qbutton must be defined') - if self.__gpio == "SOPA": - super(Qbutton, self).__init__("buttonSopa") - else: - super(Qbutton, self).__init__("buttonGpio") - self._testMethodDoc = testname - - def buttonGpio(self): - print("normal-button-test-using-gpio") - self.fail("failed: GPIO BUTTON FAIL") - - def buttonSopa(self): - str_cmd = "i2cset -f -y 1 0x2d 0x40 0x31" - disable_pmic = SysCommand("disable_pmic", str_cmd) - disable_pmic.execute() - # BUG: REPEAT THIS EXECUTION TWICE BECAUSE FIRST TIME IT RETURNS AN ERROR - led_on="echo 1 > /sys/class/leds/red\:usbhost/brightness" - ledon = SysCommand("led_on", led_on) - ledon.execute() - time.sleep(0.1) - disable_pmic.execute() - if disable_pmic.execute() == 0: - str_cmd = "i2cset -f -y 1 0x2d 0x50 0xff" - reset_button = SysCommand("reset_button", str_cmd) - if reset_button.execute() == 0: - str_cmd = "i2cget -f -y 1 0x2d 0x50" - get_button_val = SysCommand("get_button_val", str_cmd) - print("\n\t --> PRESS button for 1 sec (TIMEOUT: 10s) \n") - timeout = 0 - while timeout < 7200: - if get_button_val.execute() == 0: - get_button_val.execute() - button_value = get_button_val.getOutput() - button_value=button_value.decode('ascii').split("x") - if int(button_value[1]) == 4: - timeout = 7200 - led_off="echo 0 > /sys/class/leds/red\:usbhost/brightness" - ledoff = SysCommand("led_off", led_off) - ledoff.execute() - time.sleep(0.5) - timeout = timeout + 1 - if timeout==7200 and int(button_value[1]) == 0: - self.fail("failed: timeout exceeded") - else: - timeout = 7200 - self.fail("failed: not button input") - else: - self.fail("failed: could not complete i2c reset button state") - else: - self.fail("failed: could not complete i2c disable PMIC") diff --git a/test-cli/test/tests/qusb.py b/test-cli/test/tests/qusb.py index 7ecf31e..6a004f0 100644 --- a/test-cli/test/tests/qusb.py +++ b/test-cli/test/tests/qusb.py @@ -14,7 +14,7 @@ class Qusb(unittest.TestCase): def execute(self): # Execute script usb.sh - p = sh.bash('test/helpers/usb.sh') + p = sh.bash('test/scripts/usb.sh') # Search in the stdout a pattern "/dev/sd + {letter} + {number} q = re.search("/dev/sd\w\d", p.stdout.decode('ascii')) # get the first device which matches the pattern diff --git a/test-cli/test/tests/qwifi.py b/test-cli/test/tests/qwifi.py index 2a5fa0c..8daf069 100644 --- a/test-cli/test/tests/qwifi.py +++ b/test-cli/test/tests/qwifi.py @@ -4,14 +4,14 @@ import re class Qwifi(unittest.TestCase): - __sip = None + __serverip = None __numbytestx = None __bind = None - __OKBW = None + __bwexpected = None __port = None params = None - #varlist: serverip, bind, OKBW, port + # varlist content: serverip, bwexpected, port, (optional)bind def __init__(self, testname, testfunc, varlist): self.params = varlist super(Qwifi, self).__init__(testfunc) @@ -19,8 +19,8 @@ class Qwifi(unittest.TestCase): self.__serverip = varlist["serverip"] else: raise Exception('serverip param inside Qwifi have been be defined') - if "OKBW" in varlist: - self.__OKBW = varlist["OKBW"] + if "bwexpected" in varlist: + self.__bwexpected = varlist["bwexpected"] else: raise Exception('OKBW param inside Qwifi must be defined') if "port" in varlist: @@ -41,11 +41,12 @@ class Qwifi(unittest.TestCase): # get the first line of the output stream out1 = p.stdout.decode('ascii').splitlines()[0] if out1 != "Not connected.": - #check if the board has ip in the wlan0 interface + # check if the board has ip in the wlan0 interface p = sh.ifconfig("wlan0") if p.exit_code == 0: - result = re.search("inet addr:(?!127\.0{1,3}\.0{1,3}\.0{0,2}1$)((25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)", - p.stdout) + result = re.search( + 'inet addr:(?!127\.0{1,3}\.0{1,3}\.0{0,2}1$)((25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)', + p.stdout.decode('ascii')) if result: # execute iperf command against the server if self.__bind is None: @@ -69,7 +70,7 @@ class Qwifi(unittest.TestCase): bwreal = b[0] # check if BW is in the expected range - self.failUnless(float(bwreal) > float(self.__OKBW) * 0.9, + self.failUnless(float(bwreal) > float(self.__bwexpected) * 0.9, "failed: speed is lower than spected. Speed(MB/s)" + str(bwreal)) else: self.fail("failed: could not complete iperf command") -- cgit v1.1 From 41ffba6a76a80a7ef4553cb8856393dd209d172e Mon Sep 17 00:00:00 2001 From: Hector Fernandez Date: Tue, 10 Mar 2020 17:37:16 +0100 Subject: First release of the test. All the tests work correctly for SOPA0000. --- test-cli/test/tests/qduplex_ser.py | 18 ++++++--- test-cli/test/tests/qflash.py | 77 -------------------------------------- test-cli/test/tests/qserial.py | 8 ++-- test-cli/test/tests/qusb.py | 10 ++++- 4 files changed, 26 insertions(+), 87 deletions(-) delete mode 100644 test-cli/test/tests/qflash.py (limited to 'test-cli/test/tests') diff --git a/test-cli/test/tests/qduplex_ser.py b/test-cli/test/tests/qduplex_ser.py index 0666363..cb690cb 100644 --- a/test-cli/test/tests/qduplex_ser.py +++ b/test-cli/test/tests/qduplex_ser.py @@ -30,11 +30,7 @@ class Qduplex(unittest.TestCase): # open serial connection self.__serial1 = serial.Serial(self.__port1, self.__baudrate, timeout=1) - self.__serial1.flushInput() - self.__serial1.flushOutput() self.__serial2 = serial.Serial(self.__port2, self.__baudrate, timeout=1) - self.__serial2.flushInput() - self.__serial2.flushOutput() def __del__(self): self.__serial1.close() @@ -43,6 +39,12 @@ class Qduplex(unittest.TestCase): def execute(self): # generate a random uuid test_uuid = str(uuid.uuid4()).encode() + + # clean serial ports + self.__serial1.flushInput() + self.__serial1.flushOutput() + self.__serial2.flushInput() + self.__serial2.flushOutput() # send the uuid through serial port self.__serial1.write(test_uuid) time.sleep(0.05) # there might be a small delay @@ -52,7 +54,13 @@ class Qduplex(unittest.TestCase): if self.__serial2.readline() != test_uuid: self.fail("failed: port {} write/read mismatch".format(self.__port2)) - test_uuid = str(uuid.uuid4()).encode() + time.sleep(0.05) # there might be a small delay + + # clean serial ports + self.__serial1.flushInput() + self.__serial1.flushOutput() + self.__serial2.flushInput() + self.__serial2.flushOutput() # send the uuid through serial port self.__serial2.write(test_uuid) time.sleep(0.05) # there might be a small delay diff --git a/test-cli/test/tests/qflash.py b/test-cli/test/tests/qflash.py deleted file mode 100644 index 59ed13d..0000000 --- a/test-cli/test/tests/qflash.py +++ /dev/null @@ -1,77 +0,0 @@ -from test.helpers.syscmd import SysCommand -import unittest -from test.helpers.globalVariables import globalVar - - -class Qflasher(unittest.TestCase): - params = None - - def __init__(self, testname, testfunc, varlist): - self.params = varlist - super(Qflasher, self).__init__(testfunc) - self._testMethodDoc = testname - model=globalVar.g_mid - if model.find("IGEP0046") == 0: - self._flash_method = "mx6" - self._binlocation="/boot/" - elif model.find("IGEP0034") == 0: - self._flash_method = "nandti" - self._binlocation = "/boot/" - elif model.find("SOPA") == 0: - self._flash_method = "sopaflash" - elif model.find("OMAP3") == 0: - self._flash_method = "nandti" - self._binlocation = "/boot/" - elif model.find("OMAP5") == 0: - self._flash_method = "nandti" - self._binlocation = "/boot/" - - - def execute(self): - # CASE eMMC boards. - if(self._flash_method == "mx6"): - str_cmd= "dd if={}u-boot.imx of=/dev/mmcblk2 bs=512 seek=2 2>&1 >/dev/null".format(self._binlocation) - flash_command = SysCommand("flash_command", str_cmd) - if flash_command.execute() == 0: - str_cmd = "sync" - sync_command = SysCommand("sync_command", str_cmd) - if sync_command.execute() != 0: - self.fail("failed: could not sync") - else: - self.fail("failed: could not complete flash eMMC commands") - # CASE of nandflash boards - elif(self._flash_method == "nandti"): - str_cmd = "nandwrite -p -s 0x0 /dev/mtd0 {}u-boot.img 2>&1 >/dev/null; " \ - "nandwrite -p -s 0x20000 /dev/mtd0 {}u-boot.img 2>&1 >/dev/null; " \ - "nandwrite -p -s 0x40000 /dev/mtd0 {}u-boot.img 2>&1 >/dev/null; " \ - "nandwrite -p -s 0x60000 /dev/mtd0 {}u-boot.img 2>&1 >/dev/null; " \ - "".format(self._binlocation, self._binlocation, self._binlocation, self._binlocation,) - flash_MLO_command = SysCommand("flash_MLO_command", str_cmd) - if flash_MLO_command.execute() == 0: - str_cmd= "nandwrite -p /dev/mtd1 {}u-boot.img 2>&1 >/dev/null".format(self._binlocation) - flash_uBoot_command = SysCommand("flash_command", str_cmd) - if flash_uBoot_command.execute() == 0: - str_cmd = "sync" - sync_command = SysCommand("sync_command", str_cmd) - if sync_command.execute() != 0: - self.fail("failed: could not sync") - else: - self.fail("failed: could not complete flash u-boot commnds") - else: - self.fail("failed: could not complete flash MLO commnd") - - - # CASE of SOPA0000 BOARD. FULL FLASH (u-boot+kernel+rootfs) - elif (self._flash_method == "sopaflash"): - str_cmd = "nandtest /dev/mtd0 >/dev/null" - flash_command = SysCommand("flash_command", str_cmd) - if flash_command.execute() == 0: - str_cmd = "/usr/bin/igep-flash --skip-nandtest --image /opt/firmware/demo-ti-image-*-*.tar* >/dev/null 2>&1" - sync_command = SysCommand("sync_command", str_cmd) - if sync_command.execute() != 0: - self.fail("failed: could not complete flashing procces") - else: - self.fail("failed: could not complete nandtest mtd0") - - else: - self.fail("failed: could not find flash method") diff --git a/test-cli/test/tests/qserial.py b/test-cli/test/tests/qserial.py index 67e3af1..45c9d03 100644 --- a/test-cli/test/tests/qserial.py +++ b/test-cli/test/tests/qserial.py @@ -9,7 +9,7 @@ class Qserial(unittest.TestCase): __port = None __baudrate = None - #varlist: port, baudrate + # varlist: port, baudrate def __init__(self, testname, testfunc, varlist): self.params = varlist super(Qserial, self).__init__(testfunc) @@ -26,8 +26,6 @@ class Qserial(unittest.TestCase): # open serial connection self.__serial = serial.Serial(self.__port, self.__baudrate, timeout=1) - self.__serial.flushInput() - self.__serial.flushOutput() def __del__(self): self.__serial.close() @@ -35,6 +33,9 @@ class Qserial(unittest.TestCase): def execute(self): # generate a random uuid test_uuid = str(uuid.uuid4()).encode() + # clean serial port + self.__serial.flushInput() + self.__serial.flushOutput() # send the uuid through serial port self.__serial.write(test_uuid) time.sleep(0.05) # there might be a small delay @@ -44,4 +45,3 @@ class Qserial(unittest.TestCase): # check if what it was sent is equal to what has been received if self.__serial.readline() != test_uuid: self.fail("failed: port {} write/read mismatch".format(self.__port)) - diff --git a/test-cli/test/tests/qusb.py b/test-cli/test/tests/qusb.py index 6a004f0..6c22c48 100644 --- a/test-cli/test/tests/qusb.py +++ b/test-cli/test/tests/qusb.py @@ -21,11 +21,19 @@ class Qusb(unittest.TestCase): device = q.group(0) # create a new folder where the pendrive is going to be mounted sh.mkdir("-p", "/mnt/pendrive") + # check if the device is mounted, and umount it + try: + p = sh.findmnt("-n", device) + if p.exit_code == 0: + sh.umount(device) + except sh.ErrorReturnCode_1: + # error = 1 means "no found" + pass # mount the device p = sh.mount(device, "/mnt/pendrive") if p.exit_code == 0: # copy files - p = sh.cp("/root/usbtest/usbdatatest.bin", "/root/usbtest/usbdatatest.md5", "/mnt/pendrive") + p = sh.cp("test/files/usbtest/usbdatatest.bin", "test/files/usbtest/usbdatatest.md5", "/mnt/pendrive") if p.exit_code == 0: # check md5 with changedir("/mnt/pendrive/"): -- cgit v1.1 From 71d9a1f0b6bb4389cb9b0760ce4e847574fdcd45 Mon Sep 17 00:00:00 2001 From: Hector Fernandez Date: Fri, 13 Mar 2020 12:36:14 +0100 Subject: SOPA0000 test: Saved the variable data used during the eeprom and nand flashing. --- test-cli/test/tests/qnand.py | 2 +- test-cli/test/tests/qram.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) (limited to 'test-cli/test/tests') diff --git a/test-cli/test/tests/qnand.py b/test-cli/test/tests/qnand.py index 5125c1c..10a68f2 100644 --- a/test-cli/test/tests/qnand.py +++ b/test-cli/test/tests/qnand.py @@ -18,6 +18,6 @@ class Qnand(unittest.TestCase): def execute(self): try: - sh.nandtest("-m", self.__device) + sh.nandtest("-m", self.__device, _out="/dev/null") except sh.ErrorReturnCode as e: self.fail("failed: could not complete nandtest command") diff --git a/test-cli/test/tests/qram.py b/test-cli/test/tests/qram.py index cc75b68..5a7734a 100644 --- a/test-cli/test/tests/qram.py +++ b/test-cli/test/tests/qram.py @@ -23,6 +23,6 @@ class Qram(unittest.TestCase): def execute(self): try: - sh.memtester(self.__memsize, "1") + sh.memtester(self.__memsize, "1", _out="/dev/null") except sh.ErrorReturnCode as e: self.fail("failed: could not complete memtester command") -- cgit v1.1 From f86887baef80509460da0bff8f48b2900a627282 Mon Sep 17 00:00:00 2001 From: Hector Fernandez Date: Mon, 13 Apr 2020 09:49:51 +0200 Subject: Added new DB functions. Added Qrreader.py file. --- test-cli/test/tests/qethernet.py | 11 ++--------- test-cli/test/tests/qwifi.py | 1 + 2 files changed, 3 insertions(+), 9 deletions(-) (limited to 'test-cli/test/tests') diff --git a/test-cli/test/tests/qethernet.py b/test-cli/test/tests/qethernet.py index 1d4c9bc..027931c 100644 --- a/test-cli/test/tests/qethernet.py +++ b/test-cli/test/tests/qethernet.py @@ -19,10 +19,6 @@ class Qethernet(unittest.TestCase): self.__serverip = varlist["serverip"] else: raise Exception('sip param inside Qethernet have been be defined') - if "bind" in varlist: - self.__bind = varlist["bind"] - else: - self.__bind = None if "bwexpected" in varlist: self.__bwexpected = varlist["bwexpected"] else: @@ -36,11 +32,8 @@ class Qethernet(unittest.TestCase): def execute(self): # execute iperf command against the server - if self.__bind is None: - p = sh.iperf("-c", self.__serverip, "-x", "CMSV", "-n", self.__numbytestx, "-f", "m", "-p", self.__port) - else: - p = sh.iperf("-c", self.__serverip, "-x", "CMSV", "-n", self.__numbytestx, "-f", "m", "-p", self.__port, "-B", - self.__bind) + p = sh.iperf("-c", self.__serverip, "-x", "CMSV", "-n", self.__numbytestx, "-f", "m", "-p", self.__port) + # check if it was executed succesfully if p.exit_code == 0: if p.stdout == "": diff --git a/test-cli/test/tests/qwifi.py b/test-cli/test/tests/qwifi.py index 8daf069..b0b8d6b 100644 --- a/test-cli/test/tests/qwifi.py +++ b/test-cli/test/tests/qwifi.py @@ -44,6 +44,7 @@ class Qwifi(unittest.TestCase): # check if the board has ip in the wlan0 interface p = sh.ifconfig("wlan0") if p.exit_code == 0: + # check if wlan0 has an IP result = re.search( 'inet addr:(?!127\.0{1,3}\.0{1,3}\.0{0,2}1$)((25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)', p.stdout.decode('ascii')) -- cgit v1.1 From 9f613e33f27f772682ab7d7bd2261a9c4bd014c5 Mon Sep 17 00:00:00 2001 From: Hector Fernandez Date: Wed, 15 Apr 2020 17:59:38 +0200 Subject: Allow 2 attempts to communicate to amperimeter. --- test-cli/test/tests/qamper.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) (limited to 'test-cli/test/tests') diff --git a/test-cli/test/tests/qamper.py b/test-cli/test/tests/qamper.py index b3ab8b1..c611fdb 100644 --- a/test-cli/test/tests/qamper.py +++ b/test-cli/test/tests/qamper.py @@ -27,9 +27,11 @@ class Qamper(unittest.TestCase): self.fail("failed: can not open serial port") return # check if the amperimeter is connected and working + # 2 ATTEMTS if not amp.hello(): - self.fail("failed: can not communicate") - return + if not amp.hello(): + self.fail("failed: can not communicate") + return # get current value (in Amperes) result = amp.getCurrent() # close serial connection -- cgit v1.1 From 6a680137694233008005415e22cf889b85baa292 Mon Sep 17 00:00:00 2001 From: Hector Fernandez Date: Thu, 16 Apr 2020 17:09:19 +0200 Subject: First release --- test-cli/test/tests/qethernet.py | 9 ++++++--- test-cli/test/tests/qusb.py | 9 +++++++-- test-cli/test/tests/qwifi.py | 17 ++++++----------- 3 files changed, 19 insertions(+), 16 deletions(-) (limited to 'test-cli/test/tests') diff --git a/test-cli/test/tests/qethernet.py b/test-cli/test/tests/qethernet.py index 027931c..2246029 100644 --- a/test-cli/test/tests/qethernet.py +++ b/test-cli/test/tests/qethernet.py @@ -6,12 +6,11 @@ import re class Qethernet(unittest.TestCase): __serverip = None __numbytestx = None - __bind = None __bwexpected = None __port = None params = None - #varlist content: serverip, bwexpected, port, (optional)bind + # varlist content: serverip, bwexpected, port def __init__(self, testname, testfunc, varlist): self.params = varlist super(Qethernet, self).__init__(testfunc) @@ -32,7 +31,11 @@ class Qethernet(unittest.TestCase): def execute(self): # execute iperf command against the server - p = sh.iperf("-c", self.__serverip, "-x", "CMSV", "-n", self.__numbytestx, "-f", "m", "-p", self.__port) + try: + p = sh.iperf("-c", self.__serverip, "-x", "CMSV", "-n", self.__numbytestx, "-f", "m", "-p", self.__port, + _timeout=20) + except sh.TimeoutException: + self.fail("failed: iperf timeout reached") # check if it was executed succesfully if p.exit_code == 0: diff --git a/test-cli/test/tests/qusb.py b/test-cli/test/tests/qusb.py index 6c22c48..4c177d9 100644 --- a/test-cli/test/tests/qusb.py +++ b/test-cli/test/tests/qusb.py @@ -1,6 +1,7 @@ import sh import unittest import re +import os from test.helpers.changedir import changedir @@ -14,7 +15,9 @@ class Qusb(unittest.TestCase): def execute(self): # Execute script usb.sh - p = sh.bash('test/scripts/usb.sh') + test_abspath = os.path.dirname(os.path.abspath(__file__)) + test_abspath = os.path.join(test_abspath, "..", "..") + p = sh.bash(os.path.join(test_abspath, 'test/scripts/usb.sh')) # Search in the stdout a pattern "/dev/sd + {letter} + {number} q = re.search("/dev/sd\w\d", p.stdout.decode('ascii')) # get the first device which matches the pattern @@ -33,7 +36,9 @@ class Qusb(unittest.TestCase): p = sh.mount(device, "/mnt/pendrive") if p.exit_code == 0: # copy files - p = sh.cp("test/files/usbtest/usbdatatest.bin", "test/files/usbtest/usbdatatest.md5", "/mnt/pendrive") + p = sh.cp(os.path.join(test_abspath, "test/files/usbtest/usbdatatest.bin"), + os.path.join(test_abspath, "test/files/usbtest/usbdatatest.md5"), + "/mnt/pendrive") if p.exit_code == 0: # check md5 with changedir("/mnt/pendrive/"): diff --git a/test-cli/test/tests/qwifi.py b/test-cli/test/tests/qwifi.py index b0b8d6b..684cb34 100644 --- a/test-cli/test/tests/qwifi.py +++ b/test-cli/test/tests/qwifi.py @@ -6,12 +6,11 @@ import re class Qwifi(unittest.TestCase): __serverip = None __numbytestx = None - __bind = None __bwexpected = None __port = None params = None - # varlist content: serverip, bwexpected, port, (optional)bind + # varlist content: serverip, bwexpected, port def __init__(self, testname, testfunc, varlist): self.params = varlist super(Qwifi, self).__init__(testfunc) @@ -27,10 +26,6 @@ class Qwifi(unittest.TestCase): self.__port = varlist["port"] else: raise Exception('port param inside Qwifi must be defined') - if "bind" in varlist: - self.__bind = varlist["bind"] - else: - self.__bind = None self.__numbytestx = "10M" self._testMethodDoc = testname @@ -50,12 +45,12 @@ class Qwifi(unittest.TestCase): p.stdout.decode('ascii')) if result: # execute iperf command against the server - if self.__bind is None: - p = sh.iperf("-c", self.__serverip, "-x", "CMSV", "-n", self.__numbytestx, "-f", "m", "-p", - self.__port) - else: + try: p = sh.iperf("-c", self.__serverip, "-x", "CMSV", "-n", self.__numbytestx, "-f", "m", "-p", - self.__port, "-B", self.__bind) + self.__port, _timeout=20) + except sh.TimeoutException: + self.fail("failed: iperf timeout reached") + # check if it was executed succesfully if p.exit_code == 0: if p.stdout == "": -- cgit v1.1 From 09b3bb38fc7305c9b47c29bc90ebc9c636827307 Mon Sep 17 00:00:00 2001 From: Hector Fernandez Date: Wed, 17 Jun 2020 09:44:56 +0200 Subject: SOPA0000: Added support for saving results (strings or files) in the DB. It also saves a final DMESG file. --- test-cli/test/tests/qamper.py | 23 +++++++++++++++++------ test-cli/test/tests/qduplex_ser.py | 5 +++++ test-cli/test/tests/qeeprom.py | 4 ++++ test-cli/test/tests/qethernet.py | 38 ++++++++++++++++++++++++++------------ test-cli/test/tests/qi2c.py | 14 ++++++++++---- test-cli/test/tests/qnand.py | 17 ++++++++++++++++- test-cli/test/tests/qram.py | 8 +++++++- test-cli/test/tests/qrtc.py | 4 ++++ test-cli/test/tests/qserial.py | 5 +++++ test-cli/test/tests/qusb.py | 5 +++++ test-cli/test/tests/qwifi.py | 34 ++++++++++++++++++++++------------ 11 files changed, 121 insertions(+), 36 deletions(-) (limited to 'test-cli/test/tests') diff --git a/test-cli/test/tests/qamper.py b/test-cli/test/tests/qamper.py index c611fdb..51aa469 100644 --- a/test-cli/test/tests/qamper.py +++ b/test-cli/test/tests/qamper.py @@ -4,6 +4,7 @@ from test.helpers.amper import Amper class Qamper(unittest.TestCase): params = None + __current = None # varlist: undercurrent, overcurrent def __init__(self, testname, testfunc, varlist): @@ -33,12 +34,22 @@ class Qamper(unittest.TestCase): self.fail("failed: can not communicate") return # get current value (in Amperes) - result = amp.getCurrent() + self.__current = amp.getCurrent() # close serial connection amp.close() - # # In order to give a valid result it is importarnt to define an under current value - if result > float(self._overcurrent): - self.fail("failed: Overcurrent detected ( {} )".format(result)) + # Check current range + if float(self.__current) > float(self._overcurrent): + self.fail("failed: Overcurrent detected ( {} )".format(self.__current)) + if float(self.__current) < float(self._undercurrent): + self.fail("failed: Undercurrent detected ( {} )".format(self.__current)) - if result < float(self._undercurrent): - self.fail("failed: Undercurrent detected ( {} )".format(result)) + def getresults(self): + # resultlist is a python list of python dictionaries + resultlist = [ + { + "desc": "current (Ampers)", + "data": self.__current, + "type": "string" + } + ] + return resultlist diff --git a/test-cli/test/tests/qduplex_ser.py b/test-cli/test/tests/qduplex_ser.py index cb690cb..c7231c2 100644 --- a/test-cli/test/tests/qduplex_ser.py +++ b/test-cli/test/tests/qduplex_ser.py @@ -69,3 +69,8 @@ class Qduplex(unittest.TestCase): else: if self.__serial1.readline() != test_uuid: self.fail("failed: port {} write/read mismatch".format(self.__port1)) + + def getresults(self): + # resultlist is a python list of python dictionaries + resultlist = [] + return resultlist diff --git a/test-cli/test/tests/qeeprom.py b/test-cli/test/tests/qeeprom.py index 1921bf7..a65ca97 100644 --- a/test-cli/test/tests/qeeprom.py +++ b/test-cli/test/tests/qeeprom.py @@ -39,3 +39,7 @@ class Qeeprom(unittest.TestCase): else: self.fail("failed: Unable to write on the EEPROM device.") + def getresults(self): + # resultlist is a python list of python dictionaries + resultlist = [] + return resultlist diff --git a/test-cli/test/tests/qethernet.py b/test-cli/test/tests/qethernet.py index 2246029..da085d8 100644 --- a/test-cli/test/tests/qethernet.py +++ b/test-cli/test/tests/qethernet.py @@ -1,6 +1,7 @@ import unittest import sh import re +import json class Qethernet(unittest.TestCase): @@ -9,11 +10,16 @@ class Qethernet(unittest.TestCase): __bwexpected = None __port = None params = None + __bwreal = None # varlist content: serverip, bwexpected, port def __init__(self, testname, testfunc, varlist): + # save the parameters list self.params = varlist + # configure the function to be executed when the test runs. "testfunc" is a name of a method inside this + # class, that in this situation, it can only be "execute". super(Qethernet, self).__init__(testfunc) + # validate and get the parameters if "serverip" in varlist: self.__serverip = varlist["serverip"] else: @@ -32,8 +38,8 @@ class Qethernet(unittest.TestCase): def execute(self): # execute iperf command against the server try: - p = sh.iperf("-c", self.__serverip, "-x", "CMSV", "-n", self.__numbytestx, "-f", "m", "-p", self.__port, - _timeout=20) + p = sh.iperf3("-c", self.__serverip, "-n", self.__numbytestx, "-f", "m", "-p", self.__port, "-J", + _timeout=20) except sh.TimeoutException: self.fail("failed: iperf timeout reached") @@ -42,17 +48,25 @@ class Qethernet(unittest.TestCase): if p.stdout == "": self.fail("failed: error executing iperf command") # analyze output string - # split by lines - lines = p.stdout.splitlines() - # get first line - dat = lines[1].decode('ascii') - # search for the BW value - a = re.search("\d+(\.\d)? Mbits/sec", dat) - b = a.group().split() - bwreal = b[0] + data = json.loads(p.stdout.decode('ascii')) + self.__bwreal = float(data['end']['sum_received']['bits_per_second'])/1024/1024 + # save result file + with open('/tmp/ethernet-iperf.json', 'w') as outfile: + json.dump(data, outfile) # check if BW is in the expected range - self.failUnless(float(bwreal) > float(self.__bwexpected) * 0.9, - "failed: speed is lower than spected. Speed(MB/s)" + str(bwreal)) + self.failUnless(self.__bwreal > float(self.__bwexpected) * 0.9, + "failed: speed is lower than spected. Speed(Mbits/s)" + str(self.__bwreal)) else: self.fail("failed: could not complete iperf command") + + def getresults(self): + # resultlist is a python list of python dictionaries + resultlist = [ + { + "desc": "iperf3 output", + "data": "/tmp/ethernet-iperf.json", + "type": "file" + } + ] + return resultlist diff --git a/test-cli/test/tests/qi2c.py b/test-cli/test/tests/qi2c.py index c59975e..ad7ddf0 100644 --- a/test-cli/test/tests/qi2c.py +++ b/test-cli/test/tests/qi2c.py @@ -16,7 +16,7 @@ class Qi2c(unittest.TestCase): self.__register = varlist["register"].split("/") else: raise Exception('register param inside Qi2c must be defined') - self.__devices=[] + self.__devices = [] self._testMethodDoc = testname def execute(self): @@ -26,13 +26,19 @@ class Qi2c(unittest.TestCase): self.__raw_out = i2c_command.getOutput() if self.__raw_out == "": return -1 - lines=self.__raw_out.decode('ascii').splitlines() + lines = self.__raw_out.decode('ascii').splitlines() for i in range(len(lines)): if lines[i].count('UU'): if lines[i].find("UU"): - self.__devices.append("0x{}{}".format((i - 1), hex(int((lines[i].find("UU") - 4) / 3)).split('x')[-1])) + self.__devices.append( + "0x{}{}".format((i - 1), hex(int((lines[i].find("UU") - 4) / 3)).split('x')[-1])) for i in range(len(self.__register)): - if not(self.__register[i] in self.__devices): + if not (self.__register[i] in self.__devices): self.fail("failed: device {} not found in bus i2c-{}".format(self.__register[i], self.__busnum)) else: self.fail("failed: could not complete i2cdedtect command") + + def getresults(self): + # resultlist is a python list of python dictionaries + resultlist = [] + return resultlist diff --git a/test-cli/test/tests/qnand.py b/test-cli/test/tests/qnand.py index 10a68f2..7ff7cb2 100644 --- a/test-cli/test/tests/qnand.py +++ b/test-cli/test/tests/qnand.py @@ -18,6 +18,21 @@ class Qnand(unittest.TestCase): def execute(self): try: - sh.nandtest("-m", self.__device, _out="/dev/null") + p = sh.nandtest("-m", self.__device) + # save result + with open('/tmp/nand-nandtest.txt', 'w') as outfile: + n = outfile.write(p.stdout.decode('ascii')) + except sh.ErrorReturnCode as e: self.fail("failed: could not complete nandtest command") + + def getresults(self): + # resultlist is a python list of python dictionaries + resultlist = [ + { + "desc": "nandtest output", + "data": "/tmp/nand-nandtest.txt", + "type": "file" + } + ] + return resultlist diff --git a/test-cli/test/tests/qram.py b/test-cli/test/tests/qram.py index 5a7734a..561e980 100644 --- a/test-cli/test/tests/qram.py +++ b/test-cli/test/tests/qram.py @@ -23,6 +23,12 @@ class Qram(unittest.TestCase): def execute(self): try: - sh.memtester(self.__memsize, "1", _out="/dev/null") + p = sh.memtester(self.__memsize, "1", _out="/dev/null") + except sh.ErrorReturnCode as e: self.fail("failed: could not complete memtester command") + + def getresults(self): + # resultlist is a python list of python dictionaries + resultlist = [] + return resultlist diff --git a/test-cli/test/tests/qrtc.py b/test-cli/test/tests/qrtc.py index 0be0f99..715bcb7 100644 --- a/test-cli/test/tests/qrtc.py +++ b/test-cli/test/tests/qrtc.py @@ -37,3 +37,7 @@ class Qrtc(unittest.TestCase): else: self.fail("failed: couldn't execute hwclock command") + def getresults(self): + # resultlist is a python list of python dictionaries + resultlist = [] + return resultlist diff --git a/test-cli/test/tests/qserial.py b/test-cli/test/tests/qserial.py index 45c9d03..0cb5563 100644 --- a/test-cli/test/tests/qserial.py +++ b/test-cli/test/tests/qserial.py @@ -45,3 +45,8 @@ class Qserial(unittest.TestCase): # check if what it was sent is equal to what has been received if self.__serial.readline() != test_uuid: self.fail("failed: port {} write/read mismatch".format(self.__port)) + + def getresults(self): + # resultlist is a python list of python dictionaries + resultlist = [] + return resultlist diff --git a/test-cli/test/tests/qusb.py b/test-cli/test/tests/qusb.py index 4c177d9..316cef5 100644 --- a/test-cli/test/tests/qusb.py +++ b/test-cli/test/tests/qusb.py @@ -57,3 +57,8 @@ class Qusb(unittest.TestCase): self.fail("failed: unable to copy files.") else: self.fail("failed: unable to mount the device.") + + def getresults(self): + # resultlist is a python list of python dictionaries + resultlist = [] + return resultlist diff --git a/test-cli/test/tests/qwifi.py b/test-cli/test/tests/qwifi.py index 684cb34..0944dd7 100644 --- a/test-cli/test/tests/qwifi.py +++ b/test-cli/test/tests/qwifi.py @@ -1,6 +1,7 @@ import unittest import sh import re +import json class Qwifi(unittest.TestCase): @@ -9,6 +10,7 @@ class Qwifi(unittest.TestCase): __bwexpected = None __port = None params = None + __bwreal = None # varlist content: serverip, bwexpected, port def __init__(self, testname, testfunc, varlist): @@ -46,8 +48,8 @@ class Qwifi(unittest.TestCase): if result: # execute iperf command against the server try: - p = sh.iperf("-c", self.__serverip, "-x", "CMSV", "-n", self.__numbytestx, "-f", "m", "-p", - self.__port, _timeout=20) + p = sh.iperf3("-c", self.__serverip, "-n", self.__numbytestx, "-f", "m", "-p", self.__port, + "-J", _timeout=20) except sh.TimeoutException: self.fail("failed: iperf timeout reached") @@ -56,18 +58,15 @@ class Qwifi(unittest.TestCase): if p.stdout == "": self.fail("failed: error executing iperf command") # analyze output string - # split by lines - lines = p.stdout.splitlines() - # get first line - dat = lines[1].decode('ascii') - # search for the BW value - a = re.search("\d+(\.\d)? Mbits/sec", dat) - b = a.group().split() - bwreal = b[0] + data = json.loads(p.stdout.decode('ascii')) + self.__bwreal = float(data['end']['sum_received']['bits_per_second']) / 1024 / 1024 + # save result file + with open('/tmp/wifi-iperf.json', 'w') as outfile: + json.dump(data, outfile) # check if BW is in the expected range - self.failUnless(float(bwreal) > float(self.__bwexpected) * 0.9, - "failed: speed is lower than spected. Speed(MB/s)" + str(bwreal)) + self.failUnless(self.__bwreal > float(self.__bwexpected) * 0.9, + "failed: speed is lower than spected. Speed(Mbits/s)" + str(self.__bwreal)) else: self.fail("failed: could not complete iperf command") else: @@ -78,3 +77,14 @@ class Qwifi(unittest.TestCase): self.fail("failed: wifi module is not connected to the router.") else: self.fail("failed: couldn't execute iw command") + + def getresults(self): + # resultlist is a python list of python dictionaries + resultlist = [ + { + "desc": "iperf3 output", + "data": "/tmp/wifi-iperf.json", + "type": "file" + } + ] + return resultlist -- cgit v1.1 From 736ddcfe6dc3b5edbab6773f1c6687f6597fa7a3 Mon Sep 17 00:00:00 2001 From: Hector Fernandez Date: Wed, 17 Jun 2020 10:50:02 +0200 Subject: SOPA0000: Beatified JSON files generated by tests which use iperf3. --- test-cli/test/tests/qethernet.py | 2 +- test-cli/test/tests/qwifi.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) (limited to 'test-cli/test/tests') diff --git a/test-cli/test/tests/qethernet.py b/test-cli/test/tests/qethernet.py index da085d8..d7354bf 100644 --- a/test-cli/test/tests/qethernet.py +++ b/test-cli/test/tests/qethernet.py @@ -52,7 +52,7 @@ class Qethernet(unittest.TestCase): self.__bwreal = float(data['end']['sum_received']['bits_per_second'])/1024/1024 # save result file with open('/tmp/ethernet-iperf.json', 'w') as outfile: - json.dump(data, outfile) + json.dump(data, outfile, indent=4) # check if BW is in the expected range self.failUnless(self.__bwreal > float(self.__bwexpected) * 0.9, diff --git a/test-cli/test/tests/qwifi.py b/test-cli/test/tests/qwifi.py index 0944dd7..a5b66e9 100644 --- a/test-cli/test/tests/qwifi.py +++ b/test-cli/test/tests/qwifi.py @@ -62,7 +62,7 @@ class Qwifi(unittest.TestCase): self.__bwreal = float(data['end']['sum_received']['bits_per_second']) / 1024 / 1024 # save result file with open('/tmp/wifi-iperf.json', 'w') as outfile: - json.dump(data, outfile) + json.dump(data, outfile, indent=4) # check if BW is in the expected range self.failUnless(self.__bwreal > float(self.__bwexpected) * 0.9, -- cgit v1.1 From db3b1e45c47a1ef23c1ad67114a09cbec0976681 Mon Sep 17 00:00:00 2001 From: Hector Fernandez Date: Thu, 25 Jun 2020 11:45:31 +0200 Subject: Solved bugs. Adapted to DB changes. --- test-cli/test/tests/qamp.py | 76 -------------------------------------- test-cli/test/tests/qamper.py | 64 +++++++++++++++++++++++--------- test-cli/test/tests/qaudio.py | 39 ------------------- test-cli/test/tests/qduplex_ser.py | 43 +++++++++++++++++++-- test-cli/test/tests/qeeprom.py | 37 +++++++++++++++++-- test-cli/test/tests/qethernet.py | 47 +++++++++++++++++------ test-cli/test/tests/qi2c.py | 31 ++++++++++++++-- test-cli/test/tests/qnand.py | 33 +++++++++++++---- test-cli/test/tests/qram.py | 22 +++++++++-- test-cli/test/tests/qrtc.py | 38 +++++++++++++++++-- test-cli/test/tests/qscreen.py | 27 -------------- test-cli/test/tests/qserial.py | 29 +++++++++++++-- test-cli/test/tests/qtemplate.py | 51 ------------------------- test-cli/test/tests/qusb.py | 56 ++++++++++++++++++++++++---- test-cli/test/tests/qwifi.py | 76 ++++++++++++++++++++++++++++++++------ 15 files changed, 400 insertions(+), 269 deletions(-) delete mode 100644 test-cli/test/tests/qamp.py delete mode 100644 test-cli/test/tests/qaudio.py delete mode 100644 test-cli/test/tests/qscreen.py delete mode 100644 test-cli/test/tests/qtemplate.py (limited to 'test-cli/test/tests') diff --git a/test-cli/test/tests/qamp.py b/test-cli/test/tests/qamp.py deleted file mode 100644 index 56511c8..0000000 --- a/test-cli/test/tests/qamp.py +++ /dev/null @@ -1,76 +0,0 @@ -from test.helpers.syscmd import SysCommand -import unittest -import serial -import time - - -class Qamp(unittest.TestCase): - params = None - - # varlist: undercurrent, overcurrent - def __init__(self, testname, testfunc, varlist): - self.params = varlist - self._current = 0.0 - if "undercurrent" in varlist: - self._undercurrent = varlist["undercurrent"] - else: - raise Exception('undercurrent param inside Qamp must be defined') - if "overcurrent" in varlist: - self._overcurrent = varlist["overcurrent"] - else: - raise Exception('overcurrent param inside Qamp must be defined') - self._vshuntfactor = 16384.0 - self._ser = serial.Serial() - self._ser.port = '/dev/ttyUSB0' - self._ser.baudrate = 115200 - self._ser.parity = serial.PARITY_NONE - self._ser.timeout = 0 - self._ser.writeTimout = 0 - self._ser.xonxoff = False - self._ser.rtscts = False - self._ser.dsrdtr = False - super(Qamp, self).__init__(testfunc) - self._testMethodDoc = testname - - def __del__(self): - self._ser.close() - - def execute(self): - - # Open Serial port ttyUSB0 - error = 0 - try: - self._ser.open() - except: - self.fail("failed: IMPOSSIBLE OPEN USB-SERIAL PORT ( {} )".format(self._ser.port)) - error = 1 - if error == 0: - # Clean input and output buffer of serial port - self._ser.flushInput() - self._ser.flushOutput() - # Send command to read Voltage at Shunt resistor - # Prepare cmd - cmd = 'at+in?\n\r' - i = 0 - - # Write, 1 by 1 byte at a time to avoid hanging of serial receiver code of listener, emphasis being made at sleep of 50 ms. - while (i < len(cmd)): - i = i + self._ser.write(cmd[i].encode('ascii')) - time.sleep(0.05) - self._ser.read(1) - - # Read, 1 by 1 byte - res = [] - while self._ser.inWaiting() > 0: # if incoming bytes are waiting to be read from the serial input buffer - res.append(self._ser.read(1).decode('ascii')) # read the bytes and convert from binary array to ASCII - - string = ''.join(res) - string = string.replace('\n', '') - self._current = float(int(string, 0)) / self._vshuntfactor - print(self._current) - # In order to give a valid result it is importarnt to define an under current value - if self._current > float(self._overcurrent): - self.fail("failed: OVERCURRENT DETECTED ( {} )".format(self._current)) - - if self._current < float(self._undercurrent): - self.fail("failed: UNDERCURRENT DETECTED ( {} )".format(self._current)) diff --git a/test-cli/test/tests/qamper.py b/test-cli/test/tests/qamper.py index 51aa469..7a31615 100644 --- a/test-cli/test/tests/qamper.py +++ b/test-cli/test/tests/qamper.py @@ -4,7 +4,7 @@ from test.helpers.amper import Amper class Qamper(unittest.TestCase): params = None - __current = None + __resultlist = None # resultlist is a python list of python dictionaries # varlist: undercurrent, overcurrent def __init__(self, testname, testfunc, varlist): @@ -20,36 +20,66 @@ class Qamper(unittest.TestCase): else: raise Exception('overcurrent param inside Qamp must be defined') self._testMethodDoc = testname + self.__resultlist = [] def execute(self): amp = Amper() # open serial connection if not amp.open(): - self.fail("failed: can not open serial port") - return + self.__resultlist.append( + { + "desc": "Test result", + "data": "FAILED: can not open a serial port", + "type": "string" + } + ) + self.fail("Error: can not open a serial port") # check if the amperimeter is connected and working # 2 ATTEMTS if not amp.hello(): if not amp.hello(): - self.fail("failed: can not communicate") - return + self.__resultlist.append( + { + "desc": "Test result", + "data": "FAILED: can not communicate with the amperimeter", + "type": "string" + } + ) + self.fail("Error: can not communicate") # get current value (in Amperes) - self.__current = amp.getCurrent() + current = amp.getCurrent() # close serial connection amp.close() # Check current range - if float(self.__current) > float(self._overcurrent): - self.fail("failed: Overcurrent detected ( {} )".format(self.__current)) - if float(self.__current) < float(self._undercurrent): - self.fail("failed: Undercurrent detected ( {} )".format(self.__current)) + if float(current) > float(self._overcurrent): + # Overcurrent detected + self.__resultlist.append( + { + "desc": "Test result", + "data": "FAILED: Overcurrent detected ( {} A)".format(current), + "type": "string" + } + ) + self.fail("failed: Overcurrent detected ( {} )".format(current)) + elif float(current) < float(self._undercurrent): + # Undercurrent detected + self.__resultlist.append( + { + "desc": "Test result", + "data": "FAILED: Undercurrent detected ( {} A)".format(current), + "type": "string" + } + ) + self.fail("failed: Undercurrent detected ( {} )".format(current)) - def getresults(self): - # resultlist is a python list of python dictionaries - resultlist = [ + # Test successful + self.__resultlist.append( { - "desc": "current (Ampers)", - "data": self.__current, + "desc": "Test result", + "data": "OK: Current {} A".format(current), "type": "string" } - ] - return resultlist + ) + + def getresults(self): + return self.__resultlist diff --git a/test-cli/test/tests/qaudio.py b/test-cli/test/tests/qaudio.py deleted file mode 100644 index ef4da67..0000000 --- a/test-cli/test/tests/qaudio.py +++ /dev/null @@ -1,39 +0,0 @@ -from test.helpers.syscmd import SysCommand -import unittest - - -class Qaudio(unittest.TestCase): - params = None - - def __init__(self, testname, testfunc, varlist): - self.params = varlist - super(Qaudio, self).__init__(testfunc) - self._testMethodDoc = testname - if "dtmfFile" in varlist: - self._dtmfFile = varlist["dtmfFile"] - else: - raise Exception('undercurrent param inside Qamp must be defined') - self.__sum = 0 - self.__refSum = 25 # 1+3+5+7+9 - - def execute(self): - str_cmd = "aplay test/files/dtmf-13579.wav & arecord -r 8000 -d 1 recorded.wav" # .format(self.__dtmfFile) - audio_loop = SysCommand("command-name", str_cmd) - if audio_loop.execute() == -1: # BUG: Returns -1 but work - lines = audio_loop.getOutput().splitlines() - str_cmd = "multimon -t wav -a DTMF recorded.wav -q" - dtmf_decoder = SysCommand("command-name", str_cmd) - a = dtmf_decoder.execute() - if dtmf_decoder.execute() == -1: # BUG: Returns -1 but work - self.__raw_out = dtmf_decoder.getOutput() - if self.__raw_out == "": - self.fail("failed: can not execute multimon command") - lines = dtmf_decoder.getOutput().splitlines() - for i in range(0, 5): - aux = [int(s) for s in lines[i].split() if s.isdigit()] - self.__sum = self.__sum + aux[0] - self.failUnless(self.__sum == self.__refSum), "failed: incorrect dtmf code" + str(self.__sum) - else: - self.fail("failed: fail reading recorded file") - else: - self.fail("failed: fail playing/recording file") diff --git a/test-cli/test/tests/qduplex_ser.py b/test-cli/test/tests/qduplex_ser.py index c7231c2..020844a 100644 --- a/test-cli/test/tests/qduplex_ser.py +++ b/test-cli/test/tests/qduplex_ser.py @@ -9,6 +9,7 @@ class Qduplex(unittest.TestCase): __port1 = None __port2 = None __baudrate = None + __resultlist = None # resultlist is a python list of python dictionaries # varlist: port1, port2, baudrate def __init__(self, testname, testfunc, varlist): @@ -27,6 +28,7 @@ class Qduplex(unittest.TestCase): else: raise Exception('baudrate param inside Qduplex must be defined') self._testMethodDoc = testname + self.__resultlist = [] # open serial connection self.__serial1 = serial.Serial(self.__port1, self.__baudrate, timeout=1) @@ -49,9 +51,23 @@ class Qduplex(unittest.TestCase): self.__serial1.write(test_uuid) time.sleep(0.05) # there might be a small delay if self.__serial2.inWaiting() == 0: + self.__resultlist.append( + { + "desc": "Test result", + "data": "FAILED: port {} wait timeout exceded".format(self.__port2), + "type": "string" + } + ) self.fail("failed: port {} wait timeout exceded".format(self.__port2)) else: if self.__serial2.readline() != test_uuid: + self.__resultlist.append( + { + "desc": "Test result", + "data": "FAILED: port {} write/read mismatch".format(self.__port2), + "type": "string" + } + ) self.fail("failed: port {} write/read mismatch".format(self.__port2)) time.sleep(0.05) # there might be a small delay @@ -65,12 +81,33 @@ class Qduplex(unittest.TestCase): self.__serial2.write(test_uuid) time.sleep(0.05) # there might be a small delay if self.__serial1.inWaiting() == 0: + self.__resultlist.append( + { + "desc": "Test result", + "data": "FAILED: port {} wait timeout exceded".format(self.__port1), + "type": "string" + } + ) self.fail("failed: port {} wait timeout exceded".format(self.__port1)) else: if self.__serial1.readline() != test_uuid: + self.__resultlist.append( + { + "desc": "Test result", + "data": "FAILED: port {} write/read mismatch".format(self.__port1), + "type": "string" + } + ) self.fail("failed: port {} write/read mismatch".format(self.__port1)) + # Test successful + self.__resultlist.append( + { + "desc": "Test OK", + "data": "OK", + "type": "string" + } + ) + def getresults(self): - # resultlist is a python list of python dictionaries - resultlist = [] - return resultlist + return self.__resultlist diff --git a/test-cli/test/tests/qeeprom.py b/test-cli/test/tests/qeeprom.py index a65ca97..cafbc7f 100644 --- a/test-cli/test/tests/qeeprom.py +++ b/test-cli/test/tests/qeeprom.py @@ -6,12 +6,14 @@ class Qeeprom(unittest.TestCase): params = None __position = None __eeprompath = None + __resultlist = None # resultlist is a python list of python dictionaries # varlist: position, eeprompath def __init__(self, testname, testfunc, varlist): self.params = varlist super(Qeeprom, self).__init__(testfunc) self._testMethodDoc = testname + self.__resultlist = [] if "position" in varlist: self.__position = varlist["position"] else: @@ -33,13 +35,42 @@ class Qeeprom(unittest.TestCase): data_rx = p.stdout.decode('ascii') # compare both values if data_rx != data_tx: + # Both data are different + self.__resultlist.append( + { + "desc": "Test result", + "data": "FAILED: mismatch between written and received values.", + "type": "string" + } + ) self.fail("failed: mismatch between written and received values.") else: + self.__resultlist.append( + { + "desc": "Test result", + "data": "FAILED: Unable to read from the EEPROM device.", + "type": "string" + } + ) self.fail("failed: Unable to read from the EEPROM device.") else: + self.__resultlist.append( + { + "desc": "Test result", + "data": "FAILED: Unable to write on the EEPROM device.", + "type": "string" + } + ) self.fail("failed: Unable to write on the EEPROM device.") + # Test successful + self.__resultlist.append( + { + "desc": "Test result", + "data": "OK", + "type": "string" + } + ) + def getresults(self): - # resultlist is a python list of python dictionaries - resultlist = [] - return resultlist + return self.__resultlist diff --git a/test-cli/test/tests/qethernet.py b/test-cli/test/tests/qethernet.py index d7354bf..878c0a0 100644 --- a/test-cli/test/tests/qethernet.py +++ b/test-cli/test/tests/qethernet.py @@ -11,6 +11,7 @@ class Qethernet(unittest.TestCase): __port = None params = None __bwreal = None + __resultlist = None # resultlist is a python list of python dictionaries # varlist content: serverip, bwexpected, port def __init__(self, testname, testfunc, varlist): @@ -34,6 +35,7 @@ class Qethernet(unittest.TestCase): raise Exception('port param inside Qethernet must be defined') self.__numbytestx = "10M" self._testMethodDoc = testname + self.__resultlist = [] def execute(self): # execute iperf command against the server @@ -53,20 +55,43 @@ class Qethernet(unittest.TestCase): # save result file with open('/tmp/ethernet-iperf.json', 'w') as outfile: json.dump(data, outfile, indent=4) + self.__resultlist.append( + { + "desc": "iperf3 output", + "data": "/tmp/ethernet-iperf.json", + "type": "file" + } + ) # check if BW is in the expected range - self.failUnless(self.__bwreal > float(self.__bwexpected) * 0.9, - "failed: speed is lower than spected. Speed(Mbits/s)" + str(self.__bwreal)) + if self.__bwreal < float(self.__bwexpected): + self.__resultlist.append( + { + "desc": "Test result", + "data": "FAILED: speed is lower than spected. Speed(Mbits/s): " + str(self.__bwreal), + "type": "string" + } + ) + self.fail("failed: speed is lower than spected. Speed(Mbits/s): " + str(self.__bwreal)) else: - self.fail("failed: could not complete iperf command") + self.__resultlist.append( + { + "desc": "Test result", + "data": "FAILED: could not complete iperf command.", + "type": "string" + } + ) + self.fail("failed: could not complete iperf command.") - def getresults(self): - # resultlist is a python list of python dictionaries - resultlist = [ + # Test successful + self.__resultlist.append( { - "desc": "iperf3 output", - "data": "/tmp/ethernet-iperf.json", - "type": "file" + "desc": "Test result", + "data": "OK", + "type": "string" } - ] - return resultlist + ) + + def getresults(self): + + return self.__resultlist diff --git a/test-cli/test/tests/qi2c.py b/test-cli/test/tests/qi2c.py index ad7ddf0..3afedfa 100644 --- a/test-cli/test/tests/qi2c.py +++ b/test-cli/test/tests/qi2c.py @@ -4,6 +4,7 @@ import unittest class Qi2c(unittest.TestCase): params = None + __resultlist = None # resultlist is a python list of python dictionaries def __init__(self, testname, testfunc, varlist): self.params = varlist @@ -18,6 +19,7 @@ class Qi2c(unittest.TestCase): raise Exception('register param inside Qi2c must be defined') self.__devices = [] self._testMethodDoc = testname + self.__resultlist = [] def execute(self): str_cmd = "i2cdetect -a -y -r {}".format(self.__busnum) @@ -34,11 +36,32 @@ class Qi2c(unittest.TestCase): "0x{}{}".format((i - 1), hex(int((lines[i].find("UU") - 4) / 3)).split('x')[-1])) for i in range(len(self.__register)): if not (self.__register[i] in self.__devices): + self.__resultlist.append( + { + "desc": "Test result", + "data": "FAILED: device {} not found in bus i2c-{}".format(self.__register[i], self.__busnum), + "type": "string" + } + ) self.fail("failed: device {} not found in bus i2c-{}".format(self.__register[i], self.__busnum)) else: - self.fail("failed: could not complete i2cdedtect command") + self.__resultlist.append( + { + "desc": "Test result", + "data": "FAILED: could not complete i2cdedtect command", + "type": "string" + } + ) + self.fail("failed: could not complete i2cdedtect command.") + + # Test successful + self.__resultlist.append( + { + "desc": "Test result", + "data": "OK", + "type": "string" + } + ) def getresults(self): - # resultlist is a python list of python dictionaries - resultlist = [] - return resultlist + return self.__resultlist diff --git a/test-cli/test/tests/qnand.py b/test-cli/test/tests/qnand.py index 7ff7cb2..d7c22b3 100644 --- a/test-cli/test/tests/qnand.py +++ b/test-cli/test/tests/qnand.py @@ -5,6 +5,7 @@ import sh class Qnand(unittest.TestCase): params = None __device = "10M" + __resultlist = None # resultlist is a python list of python dictionaries # varlist: device def __init__(self, testname, testfunc, varlist): @@ -15,6 +16,7 @@ class Qnand(unittest.TestCase): else: raise Exception('device param inside Qnand must be defined') self._testMethodDoc = testname + self.__resultlist = [] def execute(self): try: @@ -22,17 +24,32 @@ class Qnand(unittest.TestCase): # save result with open('/tmp/nand-nandtest.txt', 'w') as outfile: n = outfile.write(p.stdout.decode('ascii')) + self.__resultlist.append( + { + "desc": "nandtest output", + "data": "/tmp/nand-nandtest.txt", + "type": "file" + } + ) except sh.ErrorReturnCode as e: + self.__resultlist.append( + { + "desc": "Test result", + "data": "FAILED: could not complete nandtest command", + "type": "string" + } + ) self.fail("failed: could not complete nandtest command") - def getresults(self): - # resultlist is a python list of python dictionaries - resultlist = [ + # Test successful + self.__resultlist.append( { - "desc": "nandtest output", - "data": "/tmp/nand-nandtest.txt", - "type": "file" + "desc": "Test result", + "data": "OK", + "type": "string" } - ] - return resultlist + ) + + def getresults(self): + return self.__resultlist diff --git a/test-cli/test/tests/qram.py b/test-cli/test/tests/qram.py index 561e980..a46424f 100644 --- a/test-cli/test/tests/qram.py +++ b/test-cli/test/tests/qram.py @@ -6,6 +6,7 @@ class Qram(unittest.TestCase): params = None __memsize = "10M" __loops = "1" + __resultlist = None # resultlist is a python list of python dictionaries # varlist: memsize, loops def __init__(self, testname, testfunc, varlist): @@ -20,15 +21,30 @@ class Qram(unittest.TestCase): else: raise Exception('loops param inside Qram must be defined') self._testMethodDoc = testname + self.__resultlist = [] def execute(self): try: p = sh.memtester(self.__memsize, "1", _out="/dev/null") except sh.ErrorReturnCode as e: + self.__resultlist.append( + { + "desc": "Test result", + "data": "FAILED: could not complete memtester command", + "type": "string" + } + ) self.fail("failed: could not complete memtester command") + # Test successful + self.__resultlist.append( + { + "desc": "Test result", + "data": "OK", + "type": "string" + } + ) + def getresults(self): - # resultlist is a python list of python dictionaries - resultlist = [] - return resultlist + return self.__resultlist diff --git a/test-cli/test/tests/qrtc.py b/test-cli/test/tests/qrtc.py index 715bcb7..6d4ecf6 100644 --- a/test-cli/test/tests/qrtc.py +++ b/test-cli/test/tests/qrtc.py @@ -7,6 +7,7 @@ import re class Qrtc(unittest.TestCase): params = None __rtc = None + __resultlist = None # resultlist is a python list of python dictionaries def __init__(self, testname, testfunc, varlist): self.params = varlist @@ -16,6 +17,7 @@ class Qrtc(unittest.TestCase): else: raise Exception('rtc param inside Qrtc must be defined') self._testMethodDoc = testname + self.__resultlist = [] def execute(self): # get time from RTC peripheral @@ -31,13 +33,41 @@ class Qrtc(unittest.TestCase): p.stdout.decode('ascii')) # check if the seconds of both times are different if time1 == time2: + self.__resultlist.append( + { + "desc": "Test result", + "data": "FAILED: RTC is not running", + "type": "string" + } + ) self.fail("failed: RTC is not running") else: - self.fail("failed: couldn't execute hwclock command 2nd time") + self.__resultlist.append( + { + "desc": "Test result", + "data": "FAILED: couldn't execute hwclock command for the second time", + "type": "string" + } + ) + self.fail("failed: couldn't execute hwclock command for the second time") else: + self.__resultlist.append( + { + "desc": "Test result", + "data": "FAILED: couldn't execute hwclock command", + "type": "string" + } + ) self.fail("failed: couldn't execute hwclock command") + # Test successful + self.__resultlist.append( + { + "desc": "Test result", + "data": "OK", + "type": "string" + } + ) + def getresults(self): - # resultlist is a python list of python dictionaries - resultlist = [] - return resultlist + return self.__resultlist diff --git a/test-cli/test/tests/qscreen.py b/test-cli/test/tests/qscreen.py deleted file mode 100644 index aaee628..0000000 --- a/test-cli/test/tests/qscreen.py +++ /dev/null @@ -1,27 +0,0 @@ -from test.helpers.syscmd import SysCommand -import unittest -from test.helpers.cv_display_test import pattern_detect - - -class Qscreen(unittest.TestCase): - params = None - - #varlist: display - def __init__(self, testname, testfunc, varlist): - self.params = varlist - super(Qscreen, self).__init__(testfunc) - if "display" in varlist: - self.__display = varlist["display"] - else: - raise Exception('display param inside Qscreen have been be defined') - self._testMethodDoc = testname - - def execute(self): - str_cmd = "fbi -T 1 --noverbose -d /dev/{} test/files/test_pattern.png".format(self.__display) - display_image = SysCommand("display_image", str_cmd) - if display_image.execute() == -1: - test_screen = pattern_detect(1) - if not test_screen=="0": - self.fail("failed: {}".format(test_screen)) - else: - self.fail("failed: could not display the image") diff --git a/test-cli/test/tests/qserial.py b/test-cli/test/tests/qserial.py index 0cb5563..faca505 100644 --- a/test-cli/test/tests/qserial.py +++ b/test-cli/test/tests/qserial.py @@ -8,6 +8,7 @@ class Qserial(unittest.TestCase): params = None __port = None __baudrate = None + __resultlist = None # resultlist is a python list of python dictionaries # varlist: port, baudrate def __init__(self, testname, testfunc, varlist): @@ -23,6 +24,7 @@ class Qserial(unittest.TestCase): else: raise Exception('baudrate param inside Qserial must be defined') self._testMethodDoc = testname + self.__resultlist = [] # open serial connection self.__serial = serial.Serial(self.__port, self.__baudrate, timeout=1) @@ -40,13 +42,34 @@ class Qserial(unittest.TestCase): self.__serial.write(test_uuid) time.sleep(0.05) # there might be a small delay if self.__serial.inWaiting() == 0: + self.__resultlist.append( + { + "desc": "Test result", + "data": "FAILED: port {} wait timeout exceded".format(self.__port), + "type": "string" + } + ) self.fail("failed: port {} wait timeout exceded".format(self.__port)) else: # check if what it was sent is equal to what has been received if self.__serial.readline() != test_uuid: + self.__resultlist.append( + { + "desc": "Test result", + "data": "FAILED: port {} write/read mismatch".format(self.__port), + "type": "string" + } + ) self.fail("failed: port {} write/read mismatch".format(self.__port)) + # Test successful + self.__resultlist.append( + { + "desc": "Test result", + "data": "OK", + "type": "string" + } + ) + def getresults(self): - # resultlist is a python list of python dictionaries - resultlist = [] - return resultlist + return self.__resultlist diff --git a/test-cli/test/tests/qtemplate.py b/test-cli/test/tests/qtemplate.py deleted file mode 100644 index 940cded..0000000 --- a/test-cli/test/tests/qtemplate.py +++ /dev/null @@ -1,51 +0,0 @@ -#IF COMMAND IS NEEDED -from test.helpers.syscmd import SysCommand -import unittest -#class name -class Qtemplate(unittest.TestCase): - # Initialize the variables - __variable1 = "Value-a" - __variable2 = "Value-b" - #.... - __variablen = "Value-n" - - def __init__(self, testname, testfunc, input1=None, inputn=None): - # Doing this we will initialize the class and later on perform a particular method inside this class - super(Qtemplate, self).__init__(testfunc) - self.__testname = testname - self.__input1 = input1 - self.__inputn = inputn - self._testMethodDoc = testname - - - - def execute(self): - str_cmd = "command" - t = SysCommand("command-name", str_cmd) - if t.execute() == 0: - self.__raw_out = t.getOutput() - if self.__raw_out == "": - return -1 - lines = t.getOutput().splitlines() - dat = lines[1] - dat = dat.decode('ascii') - dat_list = dat.split( ) - for d in dat_list: - a = dat_list.pop(0) - if a == "sec": - break - self.__MB_real = dat_list[0] - self.__BW_real = dat_list[2] - self.__dat_list = dat_list - print(self.__MB_real) - print(self.__BW_real) - self.failUnless(int(self.__BW_real)>int(self.__OKBW)*0.9,"FAIL:BECAUSE...") - else: - return -1 - return 0 - - def get_Total_MB(self): - return self.__MB_real; - - def get_Total_BW(self): - return self.__MB_real; diff --git a/test-cli/test/tests/qusb.py b/test-cli/test/tests/qusb.py index 316cef5..d952afc 100644 --- a/test-cli/test/tests/qusb.py +++ b/test-cli/test/tests/qusb.py @@ -7,11 +7,13 @@ from test.helpers.changedir import changedir class Qusb(unittest.TestCase): params = None + __resultlist = None # resultlist is a python list of python dictionaries def __init__(self, testname, testfunc, varlist): self.params = varlist super(Qusb, self).__init__(testfunc) self._testMethodDoc = testname + self.__resultlist = [] def execute(self): # Execute script usb.sh @@ -19,9 +21,19 @@ class Qusb(unittest.TestCase): test_abspath = os.path.join(test_abspath, "..", "..") p = sh.bash(os.path.join(test_abspath, 'test/scripts/usb.sh')) # Search in the stdout a pattern "/dev/sd + {letter} + {number} - q = re.search("/dev/sd\w\d", p.stdout.decode('ascii')) + match = re.search("/dev/sd\w\d", p.stdout.decode('ascii')) # get the first device which matches the pattern - device = q.group(0) + if match: + device = match.group(0) + else: + self.__resultlist.append( + { + "desc": "Test result", + "data": "FAILED: No pendrive found", + "type": "string" + } + ) + self.fail("failed: No pendrive found.") # create a new folder where the pendrive is going to be mounted sh.mkdir("-p", "/mnt/pendrive") # check if the device is mounted, and umount it @@ -50,15 +62,43 @@ class Qusb(unittest.TestCase): sh.umount("/mnt/pendrive") # check result if q is None: - self.fail("failed: wrong md5 result.") + self.__resultlist.append( + { + "desc": "Test result", + "data": "FAILED: Wrong md5 result", + "type": "string" + } + ) + self.fail("failed: Wrong md5 result.") else: # umount sh.umount("/mnt/pendrive") - self.fail("failed: unable to copy files.") + self.__resultlist.append( + { + "desc": "Test result", + "data": "FAILED: Unable to copy files to the USB memory device", + "type": "string" + } + ) + self.fail("failed: Unable to copy files to the USB memory device.") else: - self.fail("failed: unable to mount the device.") + self.__resultlist.append( + { + "desc": "Test result", + "data": "FAILED: Unable to mount the USB memory device", + "type": "string" + } + ) + self.fail("failed: Unable to mount the USB memory device.") + + # Test successful + self.__resultlist.append( + { + "desc": "Test result", + "data": "OK", + "type": "string" + } + ) def getresults(self): - # resultlist is a python list of python dictionaries - resultlist = [] - return resultlist + return self.__resultlist diff --git a/test-cli/test/tests/qwifi.py b/test-cli/test/tests/qwifi.py index a5b66e9..6f972d3 100644 --- a/test-cli/test/tests/qwifi.py +++ b/test-cli/test/tests/qwifi.py @@ -11,6 +11,7 @@ class Qwifi(unittest.TestCase): __port = None params = None __bwreal = None + __resultlist = None # resultlist is a python list of python dictionaries # varlist content: serverip, bwexpected, port def __init__(self, testname, testfunc, varlist): @@ -30,6 +31,7 @@ class Qwifi(unittest.TestCase): raise Exception('port param inside Qwifi must be defined') self.__numbytestx = "10M" self._testMethodDoc = testname + self.__resultlist = [] def execute(self): # check if the board is connected to the router by wifi @@ -63,28 +65,78 @@ class Qwifi(unittest.TestCase): # save result file with open('/tmp/wifi-iperf.json', 'w') as outfile: json.dump(data, outfile, indent=4) + self.__resultlist.append( + { + "desc": "iperf3 output", + "data": "/tmp/wifi-iperf.json", + "type": "file" + } + ) # check if BW is in the expected range - self.failUnless(self.__bwreal > float(self.__bwexpected) * 0.9, - "failed: speed is lower than spected. Speed(Mbits/s)" + str(self.__bwreal)) + if self.__bwreal < float(self.__bwexpected): + self.__resultlist.append( + { + "desc": "Test result", + "data": "FAILED: speed is lower than expected. Speed(Mbits/s): " + str(self.__bwreal), + "type": "string" + } + ) + self.fail("failed: speed is lower than expected. Speed(Mbits/s): " + str(self.__bwreal)) else: - self.fail("failed: could not complete iperf command") + self.__resultlist.append( + { + "desc": "Test result", + "data": "FAILED: could not complete iperf3 command", + "type": "string" + } + ) + self.fail("failed: could not complete iperf3 command") else: + self.__resultlist.append( + { + "desc": "Test result", + "data": "FAILED: wlan0 interface doesn't have any ip address", + "type": "string" + } + ) self.fail("failed: wlan0 interface doesn't have any ip address.") else: + self.__resultlist.append( + { + "desc": "Test result", + "data": "FAILED: could not complete ifconfig command", + "type": "string" + } + ) self.fail("failed: could not complete ifconfig command.") else: + self.__resultlist.append( + { + "desc": "Test result", + "data": "FAILED: wifi module is not connected to the router", + "type": "string" + } + ) self.fail("failed: wifi module is not connected to the router.") else: - self.fail("failed: couldn't execute iw command") + self.__resultlist.append( + { + "desc": "Test result", + "data": "FAILED: could not execute iw command", + "type": "string" + } + ) + self.fail("failed: could not execute iw command") - def getresults(self): - # resultlist is a python list of python dictionaries - resultlist = [ + # Test successful + self.__resultlist.append( { - "desc": "iperf3 output", - "data": "/tmp/wifi-iperf.json", - "type": "file" + "desc": "Test result", + "data": "OK", + "type": "string" } - ] - return resultlist + ) + + def getresults(self): + return self.__resultlist -- cgit v1.1 From 9ac8a326412b04e4873b883c2f2a056ca0b22480 Mon Sep 17 00:00:00 2001 From: Hector Fernandez Date: Thu, 25 Jun 2020 14:42:42 +0200 Subject: Modified USB test and the way it finds USB memory storages. --- test-cli/test/tests/qusb.py | 26 ++++++++++++-------------- 1 file changed, 12 insertions(+), 14 deletions(-) (limited to 'test-cli/test/tests') diff --git a/test-cli/test/tests/qusb.py b/test-cli/test/tests/qusb.py index d952afc..32d99ef 100644 --- a/test-cli/test/tests/qusb.py +++ b/test-cli/test/tests/qusb.py @@ -2,6 +2,7 @@ import sh import unittest import re import os +from test.helpers.usb import USBDevices from test.helpers.changedir import changedir @@ -16,24 +17,20 @@ class Qusb(unittest.TestCase): self.__resultlist = [] def execute(self): - # Execute script usb.sh - test_abspath = os.path.dirname(os.path.abspath(__file__)) - test_abspath = os.path.join(test_abspath, "..", "..") - p = sh.bash(os.path.join(test_abspath, 'test/scripts/usb.sh')) - # Search in the stdout a pattern "/dev/sd + {letter} + {number} - match = re.search("/dev/sd\w\d", p.stdout.decode('ascii')) - # get the first device which matches the pattern - if match: - device = match.group(0) + # get usb device + dev_obj = USBDevices(self.params["xml"]) + if dev_obj.getMassStorage(): + device = dev_obj.getMassStorage()['disk'] + "1" else: self.__resultlist.append( { "desc": "Test result", - "data": "FAILED: No pendrive found", + "data": "FAILED: No USB memory found", "type": "string" } ) - self.fail("failed: No pendrive found.") + self.fail("failed: No USB memory found.") + # create a new folder where the pendrive is going to be mounted sh.mkdir("-p", "/mnt/pendrive") # check if the device is mounted, and umount it @@ -48,11 +45,12 @@ class Qusb(unittest.TestCase): p = sh.mount(device, "/mnt/pendrive") if p.exit_code == 0: # copy files - p = sh.cp(os.path.join(test_abspath, "test/files/usbtest/usbdatatest.bin"), - os.path.join(test_abspath, "test/files/usbtest/usbdatatest.md5"), + test_abspath = os.path.dirname(os.path.abspath(__file__)) + "/../" + p = sh.cp(os.path.join(test_abspath, "files/usbtest/usbdatatest.bin"), + os.path.join(test_abspath, "files/usbtest/usbdatatest.md5"), "/mnt/pendrive") if p.exit_code == 0: - # check md5 + # check with changedir("/mnt/pendrive/"): p = sh.md5sum("-c", "usbdatatest.md5") q = re.search("OK", p.stdout.decode('ascii')) -- cgit v1.1 From 34df86b37d6838b115e65e5f3a332344afeb86b8 Mon Sep 17 00:00:00 2001 From: Hector Fernandez Date: Wed, 1 Jul 2020 10:45:34 +0200 Subject: Changes to adapt to new way to save results in DB. Created audio test. Added protections against unexpected status in DB. --- test-cli/test/tests/qamper.py | 61 +++++++++++----------------------- test-cli/test/tests/qaudio.py | 59 +++++++++++++++++++++++++++++++++ test-cli/test/tests/qdmesg.py | 38 +++++++++++++++++++++ test-cli/test/tests/qduplex_ser.py | 40 ++--------------------- test-cli/test/tests/qeeprom.py | 33 ++----------------- test-cli/test/tests/qethernet.py | 37 +++++---------------- test-cli/test/tests/qi2c.py | 67 -------------------------------------- test-cli/test/tests/qmmcflash.py | 15 +++++++++ test-cli/test/tests/qnand.py | 29 +++++------------ test-cli/test/tests/qram.py | 20 ++---------- test-cli/test/tests/qrtc.py | 33 ++----------------- test-cli/test/tests/qserial.py | 26 ++------------- test-cli/test/tests/qusb.py | 40 ++--------------------- test-cli/test/tests/qwifi.py | 63 +++++------------------------------ 14 files changed, 173 insertions(+), 388 deletions(-) create mode 100644 test-cli/test/tests/qaudio.py create mode 100644 test-cli/test/tests/qdmesg.py delete mode 100644 test-cli/test/tests/qi2c.py create mode 100644 test-cli/test/tests/qmmcflash.py (limited to 'test-cli/test/tests') diff --git a/test-cli/test/tests/qamper.py b/test-cli/test/tests/qamper.py index 7a31615..b4d57e3 100644 --- a/test-cli/test/tests/qamper.py +++ b/test-cli/test/tests/qamper.py @@ -26,60 +26,37 @@ class Qamper(unittest.TestCase): amp = Amper() # open serial connection if not amp.open(): - self.__resultlist.append( - { - "desc": "Test result", - "data": "FAILED: can not open a serial port", - "type": "string" - } - ) self.fail("Error: can not open a serial port") # check if the amperimeter is connected and working # 2 ATTEMTS if not amp.hello(): if not amp.hello(): - self.__resultlist.append( - { - "desc": "Test result", - "data": "FAILED: can not communicate with the amperimeter", - "type": "string" - } - ) self.fail("Error: can not communicate") # get current value (in Amperes) - current = amp.getCurrent() + self.current = amp.getCurrent() + # save result in a file + with open('/tmp/station/amper.txt', 'w') as outfile: + n = outfile.write("Current: {} A".format(self.current)) + outfile.close() + self.__resultlist.append( + { + "description": "Amperimeter values", + "filepath": "/tmp/station/amper.txt", + "mimetype": "text/plain" + } + ) # close serial connection amp.close() # Check current range - if float(current) > float(self._overcurrent): + if float(self.current) > float(self._overcurrent): # Overcurrent detected - self.__resultlist.append( - { - "desc": "Test result", - "data": "FAILED: Overcurrent detected ( {} A)".format(current), - "type": "string" - } - ) - self.fail("failed: Overcurrent detected ( {} )".format(current)) - elif float(current) < float(self._undercurrent): + self.fail("failed: Overcurrent detected ( {} )".format(self.current)) + elif float(self.current) < float(self._undercurrent): # Undercurrent detected - self.__resultlist.append( - { - "desc": "Test result", - "data": "FAILED: Undercurrent detected ( {} A)".format(current), - "type": "string" - } - ) - self.fail("failed: Undercurrent detected ( {} )".format(current)) - - # Test successful - self.__resultlist.append( - { - "desc": "Test result", - "data": "OK: Current {} A".format(current), - "type": "string" - } - ) + self.fail("failed: Undercurrent detected ( {} )".format(self.current)) def getresults(self): return self.__resultlist + + def gettextresult(self): + return "{} A".format(self.current) diff --git a/test-cli/test/tests/qaudio.py b/test-cli/test/tests/qaudio.py new file mode 100644 index 0000000..5baffe4 --- /dev/null +++ b/test-cli/test/tests/qaudio.py @@ -0,0 +1,59 @@ +import unittest +import sh +import wave +import contextlib + + +def calc_audio_duration(fname): + with contextlib.closing(wave.open(fname, 'r')) as f: + frames = f.getnframes() + rate = f.getframerate() + duration = frames / float(rate) + f.close() + return duration + + +class Qaudio(unittest.TestCase): + params = None + __resultlist = None # resultlist is a python list of python dictionaries + __dtmf_secuence = ["1", "3", "5", "7", "9"] + + def __init__(self, testname, testfunc, varlist): + self.params = varlist + super(Qaudio, self).__init__(testfunc) + self._testMethodDoc = testname + self.__resultlist = [] + + def execute(self): + # analize audio file + recordtime = calc_audio_duration("test/files/dtmf-13579.wav") + 0.15 + # play and record + p1 = sh.aplay("test/files/dtmf-13579.wav", _bg=True) + p2 = sh.arecord("-r", 8000, "-d", recordtime, "/tmp/station/recorded.wav", _bg=True) + p1.wait() + p2.wait() + if p1.exit_code == 0 and p2.exit_code == 0: + p = sh.multimon("-t", "wav", "-a", "DTMF", "/tmp/station/recorded.wav", "-q") + if p.exit_code == 0: + lines = p.stdout.decode('ascii').splitlines() + self.__dtmf_secuence_result = [] + for li in lines: + if li.split(" ")[0] == "DTMF:": + self.__dtmf_secuence_result.append(li.split(" ")[1]) + # compare original and processed dtmf sequence + if len(self.__dtmf_secuence) == len(self.__dtmf_secuence_result): + for i in range(len(self.__dtmf_secuence)): + if self.__dtmf_secuence[i] != self.__dtmf_secuence_result[i]: + self.fail("failed: sent and received DTMF sequence don't match.") + else: + self.fail("failed: received DTMF sequence is shorter than expected.") + else: + self.fail("failed: unable to use multimon command.") + else: + self.fail("failed: unable to play/record audio.") + + def getresults(self): + return self.__resultlist + + def gettextresult(self): + return "" diff --git a/test-cli/test/tests/qdmesg.py b/test-cli/test/tests/qdmesg.py new file mode 100644 index 0000000..45dd4eb --- /dev/null +++ b/test-cli/test/tests/qdmesg.py @@ -0,0 +1,38 @@ +import sh +import os.path +from os import path + +class Qdmesg: + params = None + __resultlist = None # resultlist is a python list of python dictionaries + + def __init__(self, testname, testfunc, varlist): + self.params = varlist + self._testMethodDoc = testname + self.__resultlist = [] + self.pgObj = varlist["db"] + + def execute(self): + print("running dmesg test") + self.pgObj.run_test(self.params["testidctl"], self.params["testid"]) + # delete previous file + if path.exists("/tmp/station/dmesg.txt"): + print("dmesg remove") + os.remove("/tmp/station/dmesg.txt") + # generate file + p = sh.dmesg("--color=never", _out="/tmp/station/dmesg.txt") + print("Exit code: {}".format(p.exit_code)) + if p.exit_code == 0: + # save result + # with open('/tmp/station/dmesg.txt', 'w') as outfile: + # n = outfile.write(p.stdout.decode('ascii')) + # outfile.close() + + # save dmesg result in DB + self.pgObj.upload_result_file(self.params["testidctl"], self.params["testid"], "dmesg output", + "/tmp/station/dmesg.txt", "text/plain") + self.pgObj.finish_test(self.params["testidctl"], self.params["testid"], "TEST_COMPLETE", "") + print("success dmesg test") + else: + self.pgObj.finish_test(self.params["testidctl"], self.params["testid"], "TEST_FAILED", "") + print("fail dmesg test") diff --git a/test-cli/test/tests/qduplex_ser.py b/test-cli/test/tests/qduplex_ser.py index 020844a..170039b 100644 --- a/test-cli/test/tests/qduplex_ser.py +++ b/test-cli/test/tests/qduplex_ser.py @@ -51,23 +51,9 @@ class Qduplex(unittest.TestCase): self.__serial1.write(test_uuid) time.sleep(0.05) # there might be a small delay if self.__serial2.inWaiting() == 0: - self.__resultlist.append( - { - "desc": "Test result", - "data": "FAILED: port {} wait timeout exceded".format(self.__port2), - "type": "string" - } - ) self.fail("failed: port {} wait timeout exceded".format(self.__port2)) else: if self.__serial2.readline() != test_uuid: - self.__resultlist.append( - { - "desc": "Test result", - "data": "FAILED: port {} write/read mismatch".format(self.__port2), - "type": "string" - } - ) self.fail("failed: port {} write/read mismatch".format(self.__port2)) time.sleep(0.05) # there might be a small delay @@ -81,33 +67,13 @@ class Qduplex(unittest.TestCase): self.__serial2.write(test_uuid) time.sleep(0.05) # there might be a small delay if self.__serial1.inWaiting() == 0: - self.__resultlist.append( - { - "desc": "Test result", - "data": "FAILED: port {} wait timeout exceded".format(self.__port1), - "type": "string" - } - ) self.fail("failed: port {} wait timeout exceded".format(self.__port1)) else: if self.__serial1.readline() != test_uuid: - self.__resultlist.append( - { - "desc": "Test result", - "data": "FAILED: port {} write/read mismatch".format(self.__port1), - "type": "string" - } - ) self.fail("failed: port {} write/read mismatch".format(self.__port1)) - # Test successful - self.__resultlist.append( - { - "desc": "Test OK", - "data": "OK", - "type": "string" - } - ) - def getresults(self): return self.__resultlist + + def gettextresult(self): + return "" diff --git a/test-cli/test/tests/qeeprom.py b/test-cli/test/tests/qeeprom.py index cafbc7f..7fc9fcd 100644 --- a/test-cli/test/tests/qeeprom.py +++ b/test-cli/test/tests/qeeprom.py @@ -36,41 +36,14 @@ class Qeeprom(unittest.TestCase): # compare both values if data_rx != data_tx: # Both data are different - self.__resultlist.append( - { - "desc": "Test result", - "data": "FAILED: mismatch between written and received values.", - "type": "string" - } - ) self.fail("failed: mismatch between written and received values.") else: - self.__resultlist.append( - { - "desc": "Test result", - "data": "FAILED: Unable to read from the EEPROM device.", - "type": "string" - } - ) self.fail("failed: Unable to read from the EEPROM device.") else: - self.__resultlist.append( - { - "desc": "Test result", - "data": "FAILED: Unable to write on the EEPROM device.", - "type": "string" - } - ) self.fail("failed: Unable to write on the EEPROM device.") - # Test successful - self.__resultlist.append( - { - "desc": "Test result", - "data": "OK", - "type": "string" - } - ) - def getresults(self): return self.__resultlist + + def gettextresult(self): + return "" diff --git a/test-cli/test/tests/qethernet.py b/test-cli/test/tests/qethernet.py index 878c0a0..3b2f197 100644 --- a/test-cli/test/tests/qethernet.py +++ b/test-cli/test/tests/qethernet.py @@ -1,6 +1,5 @@ import unittest import sh -import re import json @@ -53,45 +52,25 @@ class Qethernet(unittest.TestCase): data = json.loads(p.stdout.decode('ascii')) self.__bwreal = float(data['end']['sum_received']['bits_per_second'])/1024/1024 # save result file - with open('/tmp/ethernet-iperf.json', 'w') as outfile: + with open('/tmp/station/ethernet-iperf3.json', 'w') as outfile: json.dump(data, outfile, indent=4) + outfile.close() self.__resultlist.append( { - "desc": "iperf3 output", - "data": "/tmp/ethernet-iperf.json", - "type": "file" + "description": "iperf3 output", + "filepath": "/tmp/station/ethernet-iperf3.json", + "mimetype": "application/json" } ) # check if BW is in the expected range if self.__bwreal < float(self.__bwexpected): - self.__resultlist.append( - { - "desc": "Test result", - "data": "FAILED: speed is lower than spected. Speed(Mbits/s): " + str(self.__bwreal), - "type": "string" - } - ) self.fail("failed: speed is lower than spected. Speed(Mbits/s): " + str(self.__bwreal)) else: - self.__resultlist.append( - { - "desc": "Test result", - "data": "FAILED: could not complete iperf command.", - "type": "string" - } - ) self.fail("failed: could not complete iperf command.") - # Test successful - self.__resultlist.append( - { - "desc": "Test result", - "data": "OK", - "type": "string" - } - ) - def getresults(self): - return self.__resultlist + + def gettextresult(self): + return "" diff --git a/test-cli/test/tests/qi2c.py b/test-cli/test/tests/qi2c.py deleted file mode 100644 index 3afedfa..0000000 --- a/test-cli/test/tests/qi2c.py +++ /dev/null @@ -1,67 +0,0 @@ -from test.helpers.syscmd import SysCommand -import unittest - - -class Qi2c(unittest.TestCase): - params = None - __resultlist = None # resultlist is a python list of python dictionaries - - def __init__(self, testname, testfunc, varlist): - self.params = varlist - super(Qi2c, self).__init__(testfunc) - if "busnum" in varlist: - self.__busnum = varlist["busnum"] - else: - raise Exception('busnum param inside Qi2c must be defined') - if "register" in varlist: - self.__register = varlist["register"].split("/") - else: - raise Exception('register param inside Qi2c must be defined') - self.__devices = [] - self._testMethodDoc = testname - self.__resultlist = [] - - def execute(self): - str_cmd = "i2cdetect -a -y -r {}".format(self.__busnum) - i2c_command = SysCommand("i2cdetect", str_cmd) - if i2c_command.execute() == 0: - self.__raw_out = i2c_command.getOutput() - if self.__raw_out == "": - return -1 - lines = self.__raw_out.decode('ascii').splitlines() - for i in range(len(lines)): - if lines[i].count('UU'): - if lines[i].find("UU"): - self.__devices.append( - "0x{}{}".format((i - 1), hex(int((lines[i].find("UU") - 4) / 3)).split('x')[-1])) - for i in range(len(self.__register)): - if not (self.__register[i] in self.__devices): - self.__resultlist.append( - { - "desc": "Test result", - "data": "FAILED: device {} not found in bus i2c-{}".format(self.__register[i], self.__busnum), - "type": "string" - } - ) - self.fail("failed: device {} not found in bus i2c-{}".format(self.__register[i], self.__busnum)) - else: - self.__resultlist.append( - { - "desc": "Test result", - "data": "FAILED: could not complete i2cdedtect command", - "type": "string" - } - ) - self.fail("failed: could not complete i2cdedtect command.") - - # Test successful - self.__resultlist.append( - { - "desc": "Test result", - "data": "OK", - "type": "string" - } - ) - - def getresults(self): - return self.__resultlist diff --git a/test-cli/test/tests/qmmcflash.py b/test-cli/test/tests/qmmcflash.py new file mode 100644 index 0000000..f10757e --- /dev/null +++ b/test-cli/test/tests/qmmcflash.py @@ -0,0 +1,15 @@ +import unittest +import sh + +class Qmmcflash(unittest.TestCase): + params = None + + def __init__(self, testname, testfunc, varlist): + self.params = varlist + super(Qmmcflash, self).__init__(testfunc) + self._testMethodDoc = testname + # TODO + + + def execute(self): + # TODO \ No newline at end of file diff --git a/test-cli/test/tests/qnand.py b/test-cli/test/tests/qnand.py index d7c22b3..8c78e2b 100644 --- a/test-cli/test/tests/qnand.py +++ b/test-cli/test/tests/qnand.py @@ -22,34 +22,21 @@ class Qnand(unittest.TestCase): try: p = sh.nandtest("-m", self.__device) # save result - with open('/tmp/nand-nandtest.txt', 'w') as outfile: + with open('/tmp/station/nand-nandtest.txt', 'w') as outfile: n = outfile.write(p.stdout.decode('ascii')) + outfile.close() self.__resultlist.append( { - "desc": "nandtest output", - "data": "/tmp/nand-nandtest.txt", - "type": "file" + "description": "nandtest output", + "filepath": "/tmp/station/nand-nandtest.txt", + "mimetype": "text/plain" } ) - except sh.ErrorReturnCode as e: - self.__resultlist.append( - { - "desc": "Test result", - "data": "FAILED: could not complete nandtest command", - "type": "string" - } - ) self.fail("failed: could not complete nandtest command") - # Test successful - self.__resultlist.append( - { - "desc": "Test result", - "data": "OK", - "type": "string" - } - ) - def getresults(self): return self.__resultlist + + def gettextresult(self): + return "" diff --git a/test-cli/test/tests/qram.py b/test-cli/test/tests/qram.py index a46424f..21a01c8 100644 --- a/test-cli/test/tests/qram.py +++ b/test-cli/test/tests/qram.py @@ -26,25 +26,11 @@ class Qram(unittest.TestCase): def execute(self): try: p = sh.memtester(self.__memsize, "1", _out="/dev/null") - except sh.ErrorReturnCode as e: - self.__resultlist.append( - { - "desc": "Test result", - "data": "FAILED: could not complete memtester command", - "type": "string" - } - ) self.fail("failed: could not complete memtester command") - # Test successful - self.__resultlist.append( - { - "desc": "Test result", - "data": "OK", - "type": "string" - } - ) - def getresults(self): return self.__resultlist + + def gettextresult(self): + return "" diff --git a/test-cli/test/tests/qrtc.py b/test-cli/test/tests/qrtc.py index 6d4ecf6..df493d2 100644 --- a/test-cli/test/tests/qrtc.py +++ b/test-cli/test/tests/qrtc.py @@ -33,41 +33,14 @@ class Qrtc(unittest.TestCase): p.stdout.decode('ascii')) # check if the seconds of both times are different if time1 == time2: - self.__resultlist.append( - { - "desc": "Test result", - "data": "FAILED: RTC is not running", - "type": "string" - } - ) self.fail("failed: RTC is not running") else: - self.__resultlist.append( - { - "desc": "Test result", - "data": "FAILED: couldn't execute hwclock command for the second time", - "type": "string" - } - ) self.fail("failed: couldn't execute hwclock command for the second time") else: - self.__resultlist.append( - { - "desc": "Test result", - "data": "FAILED: couldn't execute hwclock command", - "type": "string" - } - ) self.fail("failed: couldn't execute hwclock command") - # Test successful - self.__resultlist.append( - { - "desc": "Test result", - "data": "OK", - "type": "string" - } - ) - def getresults(self): return self.__resultlist + + def gettextresult(self): + return "" diff --git a/test-cli/test/tests/qserial.py b/test-cli/test/tests/qserial.py index faca505..402e33f 100644 --- a/test-cli/test/tests/qserial.py +++ b/test-cli/test/tests/qserial.py @@ -42,34 +42,14 @@ class Qserial(unittest.TestCase): self.__serial.write(test_uuid) time.sleep(0.05) # there might be a small delay if self.__serial.inWaiting() == 0: - self.__resultlist.append( - { - "desc": "Test result", - "data": "FAILED: port {} wait timeout exceded".format(self.__port), - "type": "string" - } - ) self.fail("failed: port {} wait timeout exceded".format(self.__port)) else: # check if what it was sent is equal to what has been received if self.__serial.readline() != test_uuid: - self.__resultlist.append( - { - "desc": "Test result", - "data": "FAILED: port {} write/read mismatch".format(self.__port), - "type": "string" - } - ) self.fail("failed: port {} write/read mismatch".format(self.__port)) - # Test successful - self.__resultlist.append( - { - "desc": "Test result", - "data": "OK", - "type": "string" - } - ) - def getresults(self): return self.__resultlist + + def gettextresult(self): + return "" diff --git a/test-cli/test/tests/qusb.py b/test-cli/test/tests/qusb.py index 32d99ef..9b0cad3 100644 --- a/test-cli/test/tests/qusb.py +++ b/test-cli/test/tests/qusb.py @@ -22,13 +22,6 @@ class Qusb(unittest.TestCase): if dev_obj.getMassStorage(): device = dev_obj.getMassStorage()['disk'] + "1" else: - self.__resultlist.append( - { - "desc": "Test result", - "data": "FAILED: No USB memory found", - "type": "string" - } - ) self.fail("failed: No USB memory found.") # create a new folder where the pendrive is going to be mounted @@ -60,43 +53,16 @@ class Qusb(unittest.TestCase): sh.umount("/mnt/pendrive") # check result if q is None: - self.__resultlist.append( - { - "desc": "Test result", - "data": "FAILED: Wrong md5 result", - "type": "string" - } - ) self.fail("failed: Wrong md5 result.") else: # umount sh.umount("/mnt/pendrive") - self.__resultlist.append( - { - "desc": "Test result", - "data": "FAILED: Unable to copy files to the USB memory device", - "type": "string" - } - ) self.fail("failed: Unable to copy files to the USB memory device.") else: - self.__resultlist.append( - { - "desc": "Test result", - "data": "FAILED: Unable to mount the USB memory device", - "type": "string" - } - ) self.fail("failed: Unable to mount the USB memory device.") - # Test successful - self.__resultlist.append( - { - "desc": "Test result", - "data": "OK", - "type": "string" - } - ) - def getresults(self): return self.__resultlist + + def gettextresult(self): + return "" diff --git a/test-cli/test/tests/qwifi.py b/test-cli/test/tests/qwifi.py index 6f972d3..eb1f0bf 100644 --- a/test-cli/test/tests/qwifi.py +++ b/test-cli/test/tests/qwifi.py @@ -63,80 +63,33 @@ class Qwifi(unittest.TestCase): data = json.loads(p.stdout.decode('ascii')) self.__bwreal = float(data['end']['sum_received']['bits_per_second']) / 1024 / 1024 # save result file - with open('/tmp/wifi-iperf.json', 'w') as outfile: + with open('/tmp/station/wifi-iperf3.json', 'w') as outfile: json.dump(data, outfile, indent=4) + outfile.close() self.__resultlist.append( { - "desc": "iperf3 output", - "data": "/tmp/wifi-iperf.json", - "type": "file" + "description": "iperf3 output", + "filepath": "/tmp/station/wifi-iperf3.json", + "mimetype": "application/json" } ) # check if BW is in the expected range if self.__bwreal < float(self.__bwexpected): - self.__resultlist.append( - { - "desc": "Test result", - "data": "FAILED: speed is lower than expected. Speed(Mbits/s): " + str(self.__bwreal), - "type": "string" - } - ) self.fail("failed: speed is lower than expected. Speed(Mbits/s): " + str(self.__bwreal)) else: - self.__resultlist.append( - { - "desc": "Test result", - "data": "FAILED: could not complete iperf3 command", - "type": "string" - } - ) self.fail("failed: could not complete iperf3 command") else: - self.__resultlist.append( - { - "desc": "Test result", - "data": "FAILED: wlan0 interface doesn't have any ip address", - "type": "string" - } - ) self.fail("failed: wlan0 interface doesn't have any ip address.") else: - self.__resultlist.append( - { - "desc": "Test result", - "data": "FAILED: could not complete ifconfig command", - "type": "string" - } - ) self.fail("failed: could not complete ifconfig command.") else: - self.__resultlist.append( - { - "desc": "Test result", - "data": "FAILED: wifi module is not connected to the router", - "type": "string" - } - ) self.fail("failed: wifi module is not connected to the router.") else: - self.__resultlist.append( - { - "desc": "Test result", - "data": "FAILED: could not execute iw command", - "type": "string" - } - ) self.fail("failed: could not execute iw command") - # Test successful - self.__resultlist.append( - { - "desc": "Test result", - "data": "OK", - "type": "string" - } - ) - def getresults(self): return self.__resultlist + + def gettextresult(self): + return "" -- cgit v1.1 From b58a64c6993ab60f18a1a0ca8c90f167c7e6656b Mon Sep 17 00:00:00 2001 From: Hector Fernandez Date: Thu, 2 Jul 2020 14:45:48 +0200 Subject: Added support for IGEP0020 and IGEP0030. Fixed problem in Audio test. --- test-cli/test/tests/qaudio.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) (limited to 'test-cli/test/tests') diff --git a/test-cli/test/tests/qaudio.py b/test-cli/test/tests/qaudio.py index 5baffe4..ef0cf53 100644 --- a/test-cli/test/tests/qaudio.py +++ b/test-cli/test/tests/qaudio.py @@ -2,6 +2,7 @@ import unittest import sh import wave import contextlib +import os def calc_audio_duration(fname): @@ -25,10 +26,11 @@ class Qaudio(unittest.TestCase): self.__resultlist = [] def execute(self): + test_abspath = os.path.dirname(os.path.abspath(__file__)) + "/../" # analize audio file - recordtime = calc_audio_duration("test/files/dtmf-13579.wav") + 0.15 + recordtime = calc_audio_duration(os.path.join(test_abspath, "files/dtmf-13579.wav")) + 0.15 # play and record - p1 = sh.aplay("test/files/dtmf-13579.wav", _bg=True) + p1 = sh.aplay(os.path.join(test_abspath, "files/dtmf-13579.wav"), _bg=True) p2 = sh.arecord("-r", 8000, "-d", recordtime, "/tmp/station/recorded.wav", _bg=True) p1.wait() p2.wait() -- cgit v1.1 From 5dcd213c28451ac210703dc5bf9bf538671a0682 Mon Sep 17 00:00:00 2001 From: Hector Fernandez Date: Fri, 3 Jul 2020 13:55:45 +0200 Subject: Created new USB LOOP test. Moved all the test files to /var/lib/hwtest-files/ folder. --- test-cli/test/tests/qaudio.py | 6 ++-- test-cli/test/tests/qusb.py | 64 +++++++++++++++++++---------------- test-cli/test/tests/qusbdual.py | 74 +++++++++++++++++++++++++++++++++++++++++ 3 files changed, 112 insertions(+), 32 deletions(-) create mode 100644 test-cli/test/tests/qusbdual.py (limited to 'test-cli/test/tests') diff --git a/test-cli/test/tests/qaudio.py b/test-cli/test/tests/qaudio.py index ef0cf53..acad4a9 100644 --- a/test-cli/test/tests/qaudio.py +++ b/test-cli/test/tests/qaudio.py @@ -2,7 +2,6 @@ import unittest import sh import wave import contextlib -import os def calc_audio_duration(fname): @@ -26,11 +25,10 @@ class Qaudio(unittest.TestCase): self.__resultlist = [] def execute(self): - test_abspath = os.path.dirname(os.path.abspath(__file__)) + "/../" # analize audio file - recordtime = calc_audio_duration(os.path.join(test_abspath, "files/dtmf-13579.wav")) + 0.15 + recordtime = calc_audio_duration("/var/lib/hwtest-files/dtmf-13579.wav") + 0.15 # play and record - p1 = sh.aplay(os.path.join(test_abspath, "files/dtmf-13579.wav"), _bg=True) + p1 = sh.aplay("/var/lib/hwtest-files/dtmf-13579.wav", _bg=True) p2 = sh.arecord("-r", 8000, "-d", recordtime, "/tmp/station/recorded.wav", _bg=True) p1.wait() p2.wait() diff --git a/test-cli/test/tests/qusb.py b/test-cli/test/tests/qusb.py index 9b0cad3..df1248d 100644 --- a/test-cli/test/tests/qusb.py +++ b/test-cli/test/tests/qusb.py @@ -1,9 +1,6 @@ import sh import unittest -import re -import os from test.helpers.usb import USBDevices -from test.helpers.changedir import changedir class Qusb(unittest.TestCase): @@ -14,6 +11,10 @@ class Qusb(unittest.TestCase): self.params = varlist super(Qusb, self).__init__(testfunc) self._testMethodDoc = testname + if "repetitions" in varlist: + self.__repetitions = varlist["repetitions"] + else: + raise Exception('repetitions param inside Qusb must be defined') self.__resultlist = [] def execute(self): @@ -23,43 +24,50 @@ class Qusb(unittest.TestCase): device = dev_obj.getMassStorage()['disk'] + "1" else: self.fail("failed: No USB memory found.") - - # create a new folder where the pendrive is going to be mounted - sh.mkdir("-p", "/mnt/pendrive") # check if the device is mounted, and umount it try: p = sh.findmnt("-n", device) if p.exit_code == 0: sh.umount(device) - except sh.ErrorReturnCode_1: + except sh.ErrorReturnCode as e: # error = 1 means "no found" pass # mount the device + sh.mkdir("-p", "/mnt/pendrive") p = sh.mount(device, "/mnt/pendrive") - if p.exit_code == 0: + if p.exit_code != 0: + self.fail("failed: Unable to mount the USB memory device.") + # execute test + for i in range(self.__repetitions): # copy files - test_abspath = os.path.dirname(os.path.abspath(__file__)) + "/../" - p = sh.cp(os.path.join(test_abspath, "files/usbtest/usbdatatest.bin"), - os.path.join(test_abspath, "files/usbtest/usbdatatest.md5"), - "/mnt/pendrive") - if p.exit_code == 0: - # check - with changedir("/mnt/pendrive/"): - p = sh.md5sum("-c", "usbdatatest.md5") - q = re.search("OK", p.stdout.decode('ascii')) - # delete files - sh.rm("-f", "/mnt/pendrive/usbdatatest.bin", "/mnt/pendrive/usbdatatest.md5") - # umount - sh.umount("/mnt/pendrive") - # check result - if q is None: - self.fail("failed: Wrong md5 result.") - else: - # umount + try: + p = sh.cp("/var/lib/hwtest-files/usbdatatest.bin", + "/var/lib/hwtest-files/usbdatatest.bin.md5", + "/mnt/pendrive") + except sh.ErrorReturnCode as e: sh.umount("/mnt/pendrive") + sh.rmdir("/mnt/pendrive") self.fail("failed: Unable to copy files to the USB memory device.") - else: - self.fail("failed: Unable to mount the USB memory device.") + # check MD5 + try: + p = sh.md5sum("/mnt/pendrive/usbdatatest.bin") + except sh.ErrorReturnCode as e: + sh.umount("/mnt/pendrive") + sh.rmdir("/mnt/pendrive") + self.fail("failed: Unable to calculate MD5 of the copied file.") + newmd5 = p.stdout.decode().split(" ")[0] + with open('/mnt/pendrive/usbdatatest.bin.md5', 'r') as outfile: + oldmd5 = outfile.read() + outfile.close() + if newmd5 != oldmd5: + sh.umount("/mnt/pendrive") + sh.rmdir("/mnt/pendrive") + self.fail("failed: MD5 check failed.") + # delete copied files + sh.rm("-f", "/mnt/pendrive/usbdatatest.bin", "/mnt/pendrive/usbdatatest.bin.md5") + # Finish + sh.umount("/mnt/pendrive") + sh.rmdir("/mnt/pendrive") def getresults(self): return self.__resultlist diff --git a/test-cli/test/tests/qusbdual.py b/test-cli/test/tests/qusbdual.py new file mode 100644 index 0000000..bb295b8 --- /dev/null +++ b/test-cli/test/tests/qusbdual.py @@ -0,0 +1,74 @@ +import sh +import unittest +import os.path + + +class Qusbdual(unittest.TestCase): + params = None + __resultlist = None # resultlist is a python list of python dictionaries + + def __init__(self, testname, testfunc, varlist): + self.params = varlist + super(Qusbdual, self).__init__(testfunc) + self._testMethodDoc = testname + if "repetitions" in varlist: + self.__repetitions = varlist["repetitions"] + else: + raise Exception('repetitions param inside Qusbdual must be defined') + self.__resultlist = [] + + def execute(self): + # check if file-as-filesystem exists + if not os.path.isfile('/var/lib/hwtest-files/mass-otg-test.img'): + self.fail("failed: Unable to find file-as-filesystem image.") + # generate mass storage gadget + p = sh.modprobe("g_mass_storage", "file=/var/lib/hwtest-files/mass-otg-test.img") + if p.exit_code != 0: + self.fail("failed: Unable to create a mass storage gadget.") + # find the mass storage device + try: + p = sh.grep(sh.lsblk("-So", "NAME,VENDOR"), "Linux") + except sh.ErrorReturnCode as e: + self.fail("failed: could not find any mass storage gadget") + device = p.stdout.decode().split(" ")[0] + # mount the mass storage gadget + sh.mkdir("-p", "/tmp/station/hdd_gadget") + p = sh.mount("-o", "ro", "/dev/" + device, "/tmp/station/hdd_gadget") + if p.exit_code != 0: + self.fail("failed: Unable to mount the mass storage gadget.") + # execute test + for i in range(self.__repetitions): + # copy files + try: + p = sh.cp("/tmp/station/hdd_gadget/usb-test.bin", "/tmp/station/hdd_gadget/usb-test.bin.md5", + "/tmp/stationnfs") + except sh.ErrorReturnCode as e: + sh.umount("/tmp/station/hdd_gadget") + sh.rmdir("/tmp/station/hdd_gadget") + self.fail("failed: Unable to copy files through USB.") + # Check md5 + try: + p = sh.md5sum("/tmp/stationnfs/usb-test.bin") + except sh.ErrorReturnCode as e: + sh.umount("/tmp/station/hdd_gadget") + sh.rmdir("/tmp/station/hdd_gadget") + self.fail("failed: Unable to calculate MD5 of the copied file.") + newmd5 = p.stdout.decode().split(" ")[0] + with open('/tmp/station/hdd_gadget/usb-test.bin.md5', 'r') as outfile: + oldmd5 = outfile.read() + outfile.close() + if newmd5 != oldmd5: + sh.umount("/tmp/station/hdd_gadget") + sh.rmdir("/tmp/station/hdd_gadget") + self.fail("failed: MD5 check failed.") + # delete copied files + sh.rm("-f", "/tmp/stationnfs/usb-test.bin", "/tmp/stationnfs/usb-test.bin.md5") + # Finish + sh.umount("/tmp/station/hdd_gadget") + sh.rmdir("/tmp/station/hdd_gadget") + + def getresults(self): + return self.__resultlist + + def gettextresult(self): + return "" -- cgit v1.1 From 0e8e3ecd4b9be71c41008b95950d089819622dff Mon Sep 17 00:00:00 2001 From: Hector Fernandez Date: Mon, 6 Jul 2020 14:08:27 +0200 Subject: Corrected some errors in USB and USBDUAL tests. --- test-cli/test/tests/qdmesg.py | 3 +-- test-cli/test/tests/qusb.py | 50 ++++++++++++++++++++++++++++------------- test-cli/test/tests/qusbdual.py | 7 +++++- 3 files changed, 41 insertions(+), 19 deletions(-) (limited to 'test-cli/test/tests') diff --git a/test-cli/test/tests/qdmesg.py b/test-cli/test/tests/qdmesg.py index 45dd4eb..8117deb 100644 --- a/test-cli/test/tests/qdmesg.py +++ b/test-cli/test/tests/qdmesg.py @@ -2,6 +2,7 @@ import sh import os.path from os import path + class Qdmesg: params = None __resultlist = None # resultlist is a python list of python dictionaries @@ -17,11 +18,9 @@ class Qdmesg: self.pgObj.run_test(self.params["testidctl"], self.params["testid"]) # delete previous file if path.exists("/tmp/station/dmesg.txt"): - print("dmesg remove") os.remove("/tmp/station/dmesg.txt") # generate file p = sh.dmesg("--color=never", _out="/tmp/station/dmesg.txt") - print("Exit code: {}".format(p.exit_code)) if p.exit_code == 0: # save result # with open('/tmp/station/dmesg.txt', 'w') as outfile: diff --git a/test-cli/test/tests/qusb.py b/test-cli/test/tests/qusb.py index df1248d..b5d152e 100644 --- a/test-cli/test/tests/qusb.py +++ b/test-cli/test/tests/qusb.py @@ -1,6 +1,7 @@ import sh import unittest from test.helpers.usb import USBDevices +import os.path class Qusb(unittest.TestCase): @@ -33,41 +34,58 @@ class Qusb(unittest.TestCase): # error = 1 means "no found" pass # mount the device - sh.mkdir("-p", "/mnt/pendrive") - p = sh.mount(device, "/mnt/pendrive") + sh.mkdir("-p", "/tmp/station/pendrive") + p = sh.mount(device, "/tmp/station/pendrive") if p.exit_code != 0: self.fail("failed: Unable to mount the USB memory device.") # execute test - for i in range(self.__repetitions): + for i in range(int(self.__repetitions)): # copy files try: p = sh.cp("/var/lib/hwtest-files/usbdatatest.bin", "/var/lib/hwtest-files/usbdatatest.bin.md5", - "/mnt/pendrive") + "/tmp/station/pendrive") except sh.ErrorReturnCode as e: - sh.umount("/mnt/pendrive") - sh.rmdir("/mnt/pendrive") + try: + sh.umount("/tmp/station/pendrive") + except sh.ErrorReturnCode: + pass + sh.rmdir("/tmp/station/pendrive") self.fail("failed: Unable to copy files to the USB memory device.") + # check if the device is still mounted + if not os.path.ismount("/tmp/station/pendrive"): + sh.rm("/tmp/station/pendrive/*") + sh.rmdir("/tmp/station/pendrive") + self.fail("failed: USB device unmounted during/after copying files.") # check MD5 try: - p = sh.md5sum("/mnt/pendrive/usbdatatest.bin") + p = sh.md5sum("/tmp/station/pendrive/usbdatatest.bin") except sh.ErrorReturnCode as e: - sh.umount("/mnt/pendrive") - sh.rmdir("/mnt/pendrive") + try: + sh.umount("/tmp/station/pendrive") + except sh.ErrorReturnCode: + pass + sh.rmdir("/tmp/station/pendrive") self.fail("failed: Unable to calculate MD5 of the copied file.") newmd5 = p.stdout.decode().split(" ")[0] - with open('/mnt/pendrive/usbdatatest.bin.md5', 'r') as outfile: - oldmd5 = outfile.read() + with open('/tmp/station/pendrive/usbdatatest.bin.md5', 'r') as outfile: + oldmd5 = outfile.read().rstrip("\n") outfile.close() if newmd5 != oldmd5: - sh.umount("/mnt/pendrive") - sh.rmdir("/mnt/pendrive") + try: + sh.umount("/tmp/station/pendrive") + except sh.ErrorReturnCode: + pass + sh.rmdir("/tmp/station/pendrive") self.fail("failed: MD5 check failed.") # delete copied files - sh.rm("-f", "/mnt/pendrive/usbdatatest.bin", "/mnt/pendrive/usbdatatest.bin.md5") + sh.rm("-f", "/tmp/station/pendrive/usbdatatest.bin", "/tmp/station/pendrive/usbdatatest.bin.md5") # Finish - sh.umount("/mnt/pendrive") - sh.rmdir("/mnt/pendrive") + try: + sh.umount("/tmp/station/pendrive") + except sh.ErrorReturnCode: + pass + sh.rmdir("/tmp/station/pendrive") def getresults(self): return self.__resultlist diff --git a/test-cli/test/tests/qusbdual.py b/test-cli/test/tests/qusbdual.py index bb295b8..ad95b84 100644 --- a/test-cli/test/tests/qusbdual.py +++ b/test-cli/test/tests/qusbdual.py @@ -37,7 +37,7 @@ class Qusbdual(unittest.TestCase): if p.exit_code != 0: self.fail("failed: Unable to mount the mass storage gadget.") # execute test - for i in range(self.__repetitions): + for i in range(int(self.__repetitions)): # copy files try: p = sh.cp("/tmp/station/hdd_gadget/usb-test.bin", "/tmp/station/hdd_gadget/usb-test.bin.md5", @@ -46,6 +46,11 @@ class Qusbdual(unittest.TestCase): sh.umount("/tmp/station/hdd_gadget") sh.rmdir("/tmp/station/hdd_gadget") self.fail("failed: Unable to copy files through USB.") + # check if the device is still mounted + if not os.path.ismount("/tmp/station/hdd_gadget"): + sh.rm("/tmp/station/hdd_gadget/*") + sh.rmdir("/tmp/station/hdd_gadget") + self.fail("failed: USB device unmounted during/after copying files.") # Check md5 try: p = sh.md5sum("/tmp/stationnfs/usb-test.bin") -- cgit v1.1 From cd3c8dd78e3bcdd442967583e3814db3701871c0 Mon Sep 17 00:00:00 2001 From: Hector Fernandez Date: Mon, 6 Jul 2020 16:01:08 +0200 Subject: Solved minor errors with audio and usbloop tests. --- test-cli/test/tests/qaudio.py | 7 +++++-- test-cli/test/tests/qusbdual.py | 2 +- 2 files changed, 6 insertions(+), 3 deletions(-) (limited to 'test-cli/test/tests') diff --git a/test-cli/test/tests/qaudio.py b/test-cli/test/tests/qaudio.py index acad4a9..f7ae33c 100644 --- a/test-cli/test/tests/qaudio.py +++ b/test-cli/test/tests/qaudio.py @@ -2,6 +2,7 @@ import unittest import sh import wave import contextlib +import time def calc_audio_duration(fname): @@ -27,6 +28,8 @@ class Qaudio(unittest.TestCase): def execute(self): # analize audio file recordtime = calc_audio_duration("/var/lib/hwtest-files/dtmf-13579.wav") + 0.15 + # wait time before playing + time.sleep(1) # play and record p1 = sh.aplay("/var/lib/hwtest-files/dtmf-13579.wav", _bg=True) p2 = sh.arecord("-r", 8000, "-d", recordtime, "/tmp/station/recorded.wav", _bg=True) @@ -42,8 +45,8 @@ class Qaudio(unittest.TestCase): self.__dtmf_secuence_result.append(li.split(" ")[1]) # compare original and processed dtmf sequence if len(self.__dtmf_secuence) == len(self.__dtmf_secuence_result): - for i in range(len(self.__dtmf_secuence)): - if self.__dtmf_secuence[i] != self.__dtmf_secuence_result[i]: + for a, b in zip(self.__dtmf_secuence, self.__dtmf_secuence_result): + if a != b: self.fail("failed: sent and received DTMF sequence don't match.") else: self.fail("failed: received DTMF sequence is shorter than expected.") diff --git a/test-cli/test/tests/qusbdual.py b/test-cli/test/tests/qusbdual.py index ad95b84..842d514 100644 --- a/test-cli/test/tests/qusbdual.py +++ b/test-cli/test/tests/qusbdual.py @@ -60,7 +60,7 @@ class Qusbdual(unittest.TestCase): self.fail("failed: Unable to calculate MD5 of the copied file.") newmd5 = p.stdout.decode().split(" ")[0] with open('/tmp/station/hdd_gadget/usb-test.bin.md5', 'r') as outfile: - oldmd5 = outfile.read() + oldmd5 = outfile.read().rstrip("\n") outfile.close() if newmd5 != oldmd5: sh.umount("/tmp/station/hdd_gadget") -- cgit v1.1 From 1d51a80b57cc8c80c78d67c85290503997060e9e Mon Sep 17 00:00:00 2001 From: Hector Fernandez Date: Mon, 6 Jul 2020 17:22:17 +0200 Subject: Modified paths of ram and nfs temporary folders. --- test-cli/test/tests/qamper.py | 4 ++-- test-cli/test/tests/qaudio.py | 10 +++++----- test-cli/test/tests/qdmesg.py | 13 ++++--------- test-cli/test/tests/qethernet.py | 4 ++-- test-cli/test/tests/qnand.py | 4 ++-- test-cli/test/tests/qusb.py | 34 +++++++++++++++++----------------- test-cli/test/tests/qusbdual.py | 34 +++++++++++++++++----------------- test-cli/test/tests/qwifi.py | 6 +++--- 8 files changed, 52 insertions(+), 57 deletions(-) (limited to 'test-cli/test/tests') diff --git a/test-cli/test/tests/qamper.py b/test-cli/test/tests/qamper.py index b4d57e3..58054c9 100644 --- a/test-cli/test/tests/qamper.py +++ b/test-cli/test/tests/qamper.py @@ -35,13 +35,13 @@ class Qamper(unittest.TestCase): # get current value (in Amperes) self.current = amp.getCurrent() # save result in a file - with open('/tmp/station/amper.txt', 'w') as outfile: + with open('/mnt/station_ramdisk/amper.txt', 'w') as outfile: n = outfile.write("Current: {} A".format(self.current)) outfile.close() self.__resultlist.append( { "description": "Amperimeter values", - "filepath": "/tmp/station/amper.txt", + "filepath": "/mnt/station_ramdisk/amper.txt", "mimetype": "text/plain" } ) diff --git a/test-cli/test/tests/qaudio.py b/test-cli/test/tests/qaudio.py index f7ae33c..bfbb051 100644 --- a/test-cli/test/tests/qaudio.py +++ b/test-cli/test/tests/qaudio.py @@ -2,7 +2,6 @@ import unittest import sh import wave import contextlib -import time def calc_audio_duration(fname): @@ -28,15 +27,15 @@ class Qaudio(unittest.TestCase): def execute(self): # analize audio file recordtime = calc_audio_duration("/var/lib/hwtest-files/dtmf-13579.wav") + 0.15 - # wait time before playing - time.sleep(1) + # previous play to estabilise audio lines + sh.aplay("/var/lib/hwtest-files/dtmf-13579.wav") # play and record p1 = sh.aplay("/var/lib/hwtest-files/dtmf-13579.wav", _bg=True) - p2 = sh.arecord("-r", 8000, "-d", recordtime, "/tmp/station/recorded.wav", _bg=True) + p2 = sh.arecord("-r", 8000, "-d", recordtime, "/mnt/station_ramdisk/recorded.wav", _bg=True) p1.wait() p2.wait() if p1.exit_code == 0 and p2.exit_code == 0: - p = sh.multimon("-t", "wav", "-a", "DTMF", "/tmp/station/recorded.wav", "-q") + p = sh.multimon("-t", "wav", "-a", "DTMF", "/mnt/station_ramdisk/recorded.wav", "-q") if p.exit_code == 0: lines = p.stdout.decode('ascii').splitlines() self.__dtmf_secuence_result = [] @@ -49,6 +48,7 @@ class Qaudio(unittest.TestCase): if a != b: self.fail("failed: sent and received DTMF sequence don't match.") else: + print(self.__dtmf_secuence_result) self.fail("failed: received DTMF sequence is shorter than expected.") else: self.fail("failed: unable to use multimon command.") diff --git a/test-cli/test/tests/qdmesg.py b/test-cli/test/tests/qdmesg.py index 8117deb..b35f1ff 100644 --- a/test-cli/test/tests/qdmesg.py +++ b/test-cli/test/tests/qdmesg.py @@ -17,19 +17,14 @@ class Qdmesg: print("running dmesg test") self.pgObj.run_test(self.params["testidctl"], self.params["testid"]) # delete previous file - if path.exists("/tmp/station/dmesg.txt"): - os.remove("/tmp/station/dmesg.txt") + if path.exists("/mnt/station_ramdisk/dmesg.txt"): + os.remove("/mnt/station_ramdisk/dmesg.txt") # generate file - p = sh.dmesg("--color=never", _out="/tmp/station/dmesg.txt") + p = sh.dmesg("--color=never", _out="/mnt/station_ramdisk/dmesg.txt") if p.exit_code == 0: - # save result - # with open('/tmp/station/dmesg.txt', 'w') as outfile: - # n = outfile.write(p.stdout.decode('ascii')) - # outfile.close() - # save dmesg result in DB self.pgObj.upload_result_file(self.params["testidctl"], self.params["testid"], "dmesg output", - "/tmp/station/dmesg.txt", "text/plain") + "/mnt/station_ramdisk/dmesg.txt", "text/plain") self.pgObj.finish_test(self.params["testidctl"], self.params["testid"], "TEST_COMPLETE", "") print("success dmesg test") else: diff --git a/test-cli/test/tests/qethernet.py b/test-cli/test/tests/qethernet.py index 3b2f197..d38de34 100644 --- a/test-cli/test/tests/qethernet.py +++ b/test-cli/test/tests/qethernet.py @@ -52,13 +52,13 @@ class Qethernet(unittest.TestCase): data = json.loads(p.stdout.decode('ascii')) self.__bwreal = float(data['end']['sum_received']['bits_per_second'])/1024/1024 # save result file - with open('/tmp/station/ethernet-iperf3.json', 'w') as outfile: + with open('/mnt/station_ramdisk/ethernet-iperf3.json', 'w') as outfile: json.dump(data, outfile, indent=4) outfile.close() self.__resultlist.append( { "description": "iperf3 output", - "filepath": "/tmp/station/ethernet-iperf3.json", + "filepath": "/mnt/station_ramdisk/ethernet-iperf3.json", "mimetype": "application/json" } ) diff --git a/test-cli/test/tests/qnand.py b/test-cli/test/tests/qnand.py index 8c78e2b..988ab60 100644 --- a/test-cli/test/tests/qnand.py +++ b/test-cli/test/tests/qnand.py @@ -22,13 +22,13 @@ class Qnand(unittest.TestCase): try: p = sh.nandtest("-m", self.__device) # save result - with open('/tmp/station/nand-nandtest.txt', 'w') as outfile: + with open('/mnt/station_ramdisk/nand-nandtest.txt', 'w') as outfile: n = outfile.write(p.stdout.decode('ascii')) outfile.close() self.__resultlist.append( { "description": "nandtest output", - "filepath": "/tmp/station/nand-nandtest.txt", + "filepath": "/mnt/station_ramdisk/nand-nandtest.txt", "mimetype": "text/plain" } ) diff --git a/test-cli/test/tests/qusb.py b/test-cli/test/tests/qusb.py index b5d152e..6302012 100644 --- a/test-cli/test/tests/qusb.py +++ b/test-cli/test/tests/qusb.py @@ -34,8 +34,8 @@ class Qusb(unittest.TestCase): # error = 1 means "no found" pass # mount the device - sh.mkdir("-p", "/tmp/station/pendrive") - p = sh.mount(device, "/tmp/station/pendrive") + sh.mkdir("-p", "/mnt/station_ramdisk/pendrive") + p = sh.mount(device, "/mnt/station_ramdisk/pendrive") if p.exit_code != 0: self.fail("failed: Unable to mount the USB memory device.") # execute test @@ -44,48 +44,48 @@ class Qusb(unittest.TestCase): try: p = sh.cp("/var/lib/hwtest-files/usbdatatest.bin", "/var/lib/hwtest-files/usbdatatest.bin.md5", - "/tmp/station/pendrive") + "/mnt/station_ramdisk/pendrive") except sh.ErrorReturnCode as e: try: - sh.umount("/tmp/station/pendrive") + sh.umount("/mnt/station_ramdisk/pendrive") except sh.ErrorReturnCode: pass - sh.rmdir("/tmp/station/pendrive") + sh.rmdir("/mnt/station_ramdisk/pendrive") self.fail("failed: Unable to copy files to the USB memory device.") # check if the device is still mounted - if not os.path.ismount("/tmp/station/pendrive"): - sh.rm("/tmp/station/pendrive/*") - sh.rmdir("/tmp/station/pendrive") + if not os.path.ismount("/mnt/station_ramdisk/pendrive"): + sh.rm("/mnt/station_ramdisk/pendrive/*") + sh.rmdir("/mnt/station_ramdisk/pendrive") self.fail("failed: USB device unmounted during/after copying files.") # check MD5 try: - p = sh.md5sum("/tmp/station/pendrive/usbdatatest.bin") + p = sh.md5sum("/mnt/station_ramdisk/pendrive/usbdatatest.bin") except sh.ErrorReturnCode as e: try: - sh.umount("/tmp/station/pendrive") + sh.umount("/mnt/station_ramdisk/pendrive") except sh.ErrorReturnCode: pass - sh.rmdir("/tmp/station/pendrive") + sh.rmdir("/mnt/station_ramdisk/pendrive") self.fail("failed: Unable to calculate MD5 of the copied file.") newmd5 = p.stdout.decode().split(" ")[0] - with open('/tmp/station/pendrive/usbdatatest.bin.md5', 'r') as outfile: + with open('/mnt/station_ramdisk/pendrive/usbdatatest.bin.md5', 'r') as outfile: oldmd5 = outfile.read().rstrip("\n") outfile.close() if newmd5 != oldmd5: try: - sh.umount("/tmp/station/pendrive") + sh.umount("/mnt/station_ramdisk/pendrive") except sh.ErrorReturnCode: pass - sh.rmdir("/tmp/station/pendrive") + sh.rmdir("/mnt/station_ramdisk/pendrive") self.fail("failed: MD5 check failed.") # delete copied files - sh.rm("-f", "/tmp/station/pendrive/usbdatatest.bin", "/tmp/station/pendrive/usbdatatest.bin.md5") + sh.rm("-f", "/mnt/station_ramdisk/pendrive/usbdatatest.bin", "/mnt/station_ramdisk/pendrive/usbdatatest.bin.md5") # Finish try: - sh.umount("/tmp/station/pendrive") + sh.umount("/mnt/station_ramdisk/pendrive") except sh.ErrorReturnCode: pass - sh.rmdir("/tmp/station/pendrive") + sh.rmdir("/mnt/station_ramdisk/pendrive") def getresults(self): return self.__resultlist diff --git a/test-cli/test/tests/qusbdual.py b/test-cli/test/tests/qusbdual.py index 842d514..054d7f4 100644 --- a/test-cli/test/tests/qusbdual.py +++ b/test-cli/test/tests/qusbdual.py @@ -32,45 +32,45 @@ class Qusbdual(unittest.TestCase): self.fail("failed: could not find any mass storage gadget") device = p.stdout.decode().split(" ")[0] # mount the mass storage gadget - sh.mkdir("-p", "/tmp/station/hdd_gadget") - p = sh.mount("-o", "ro", "/dev/" + device, "/tmp/station/hdd_gadget") + sh.mkdir("-p", "/mnt/station_ramdisk/hdd_gadget") + p = sh.mount("-o", "ro", "/dev/" + device, "/mnt/station_ramdisk/hdd_gadget") if p.exit_code != 0: self.fail("failed: Unable to mount the mass storage gadget.") # execute test for i in range(int(self.__repetitions)): # copy files try: - p = sh.cp("/tmp/station/hdd_gadget/usb-test.bin", "/tmp/station/hdd_gadget/usb-test.bin.md5", + p = sh.cp("/mnt/station_ramdisk/hdd_gadget/usb-test.bin", "/mnt/station_ramdisk/hdd_gadget/usb-test.bin.md5", "/tmp/stationnfs") except sh.ErrorReturnCode as e: - sh.umount("/tmp/station/hdd_gadget") - sh.rmdir("/tmp/station/hdd_gadget") + sh.umount("/mnt/station_ramdisk/hdd_gadget") + sh.rmdir("/mnt/station_ramdisk/hdd_gadget") self.fail("failed: Unable to copy files through USB.") # check if the device is still mounted - if not os.path.ismount("/tmp/station/hdd_gadget"): - sh.rm("/tmp/station/hdd_gadget/*") - sh.rmdir("/tmp/station/hdd_gadget") + if not os.path.ismount("/mnt/station_ramdisk/hdd_gadget"): + sh.rm("/mnt/station_ramdisk/hdd_gadget/*") + sh.rmdir("/mnt/station_ramdisk/hdd_gadget") self.fail("failed: USB device unmounted during/after copying files.") # Check md5 try: - p = sh.md5sum("/tmp/stationnfs/usb-test.bin") + p = sh.md5sum("/mnt/station_nfsdisk/usb-test.bin") except sh.ErrorReturnCode as e: - sh.umount("/tmp/station/hdd_gadget") - sh.rmdir("/tmp/station/hdd_gadget") + sh.umount("/mnt/station_ramdisk/hdd_gadget") + sh.rmdir("/mnt/station_ramdisk/hdd_gadget") self.fail("failed: Unable to calculate MD5 of the copied file.") newmd5 = p.stdout.decode().split(" ")[0] - with open('/tmp/station/hdd_gadget/usb-test.bin.md5', 'r') as outfile: + with open('/mnt/station_ramdisk/hdd_gadget/usb-test.bin.md5', 'r') as outfile: oldmd5 = outfile.read().rstrip("\n") outfile.close() if newmd5 != oldmd5: - sh.umount("/tmp/station/hdd_gadget") - sh.rmdir("/tmp/station/hdd_gadget") + sh.umount("/mnt/station_ramdisk/hdd_gadget") + sh.rmdir("/mnt/station_ramdisk/hdd_gadget") self.fail("failed: MD5 check failed.") # delete copied files - sh.rm("-f", "/tmp/stationnfs/usb-test.bin", "/tmp/stationnfs/usb-test.bin.md5") + sh.rm("-f", "/mnt/station_nfsdisk/usb-test.bin", "/mnt/station_nfsdisk/usb-test.bin.md5") # Finish - sh.umount("/tmp/station/hdd_gadget") - sh.rmdir("/tmp/station/hdd_gadget") + sh.umount("/mnt/station_ramdisk/hdd_gadget") + sh.rmdir("/mnt/station_ramdisk/hdd_gadget") def getresults(self): return self.__resultlist diff --git a/test-cli/test/tests/qwifi.py b/test-cli/test/tests/qwifi.py index eb1f0bf..4522057 100644 --- a/test-cli/test/tests/qwifi.py +++ b/test-cli/test/tests/qwifi.py @@ -51,7 +51,7 @@ class Qwifi(unittest.TestCase): # execute iperf command against the server try: p = sh.iperf3("-c", self.__serverip, "-n", self.__numbytestx, "-f", "m", "-p", self.__port, - "-J", _timeout=20) + "-J", _timeout=30) except sh.TimeoutException: self.fail("failed: iperf timeout reached") @@ -63,13 +63,13 @@ class Qwifi(unittest.TestCase): data = json.loads(p.stdout.decode('ascii')) self.__bwreal = float(data['end']['sum_received']['bits_per_second']) / 1024 / 1024 # save result file - with open('/tmp/station/wifi-iperf3.json', 'w') as outfile: + with open('/mnt/station_ramdisk/wifi-iperf3.json', 'w') as outfile: json.dump(data, outfile, indent=4) outfile.close() self.__resultlist.append( { "description": "iperf3 output", - "filepath": "/tmp/station/wifi-iperf3.json", + "filepath": "/mnt/station_ramdisk/wifi-iperf3.json", "mimetype": "application/json" } ) -- cgit v1.1 From 47827e40f71696e04ad805296dedc44a03451db3 Mon Sep 17 00:00:00 2001 From: Hector Fernandez Date: Mon, 6 Jul 2020 17:51:09 +0200 Subject: In case iperf3 server is busy, clients wait and retry. --- test-cli/test/tests/qaudio.py | 1 - test-cli/test/tests/qethernet.py | 19 +++++++++++++------ test-cli/test/tests/qwifi.py | 19 +++++++++++++------ 3 files changed, 26 insertions(+), 13 deletions(-) (limited to 'test-cli/test/tests') diff --git a/test-cli/test/tests/qaudio.py b/test-cli/test/tests/qaudio.py index bfbb051..364d8b2 100644 --- a/test-cli/test/tests/qaudio.py +++ b/test-cli/test/tests/qaudio.py @@ -48,7 +48,6 @@ class Qaudio(unittest.TestCase): if a != b: self.fail("failed: sent and received DTMF sequence don't match.") else: - print(self.__dtmf_secuence_result) self.fail("failed: received DTMF sequence is shorter than expected.") else: self.fail("failed: unable to use multimon command.") diff --git a/test-cli/test/tests/qethernet.py b/test-cli/test/tests/qethernet.py index d38de34..81acef1 100644 --- a/test-cli/test/tests/qethernet.py +++ b/test-cli/test/tests/qethernet.py @@ -1,6 +1,7 @@ import unittest import sh import json +import time class Qethernet(unittest.TestCase): @@ -11,6 +12,7 @@ class Qethernet(unittest.TestCase): params = None __bwreal = None __resultlist = None # resultlist is a python list of python dictionaries + timebetweenattempts = 5 # varlist content: serverip, bwexpected, port def __init__(self, testname, testfunc, varlist): @@ -37,12 +39,17 @@ class Qethernet(unittest.TestCase): self.__resultlist = [] def execute(self): - # execute iperf command against the server - try: - p = sh.iperf3("-c", self.__serverip, "-n", self.__numbytestx, "-f", "m", "-p", self.__port, "-J", - _timeout=20) - except sh.TimeoutException: - self.fail("failed: iperf timeout reached") + # execute iperf command against the server, but it implements attempts in case the server is busy + iperfdone = False + while not iperfdone: + try: + p = sh.iperf3("-c", self.__serverip, "-n", self.__numbytestx, "-f", "m", "-p", self.__port, "-J", + _timeout=20) + iperfdone = True + except sh.TimeoutException: + self.fail("failed: iperf timeout reached") + except sh.ErrorReturnCode: + time.sleep(self.timebetweenattempts) # check if it was executed succesfully if p.exit_code == 0: diff --git a/test-cli/test/tests/qwifi.py b/test-cli/test/tests/qwifi.py index 4522057..3dd9e38 100644 --- a/test-cli/test/tests/qwifi.py +++ b/test-cli/test/tests/qwifi.py @@ -2,6 +2,7 @@ import unittest import sh import re import json +import time class Qwifi(unittest.TestCase): @@ -12,6 +13,7 @@ class Qwifi(unittest.TestCase): params = None __bwreal = None __resultlist = None # resultlist is a python list of python dictionaries + timebetweenattempts = 5 # varlist content: serverip, bwexpected, port def __init__(self, testname, testfunc, varlist): @@ -48,12 +50,17 @@ class Qwifi(unittest.TestCase): 'inet addr:(?!127\.0{1,3}\.0{1,3}\.0{0,2}1$)((25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)', p.stdout.decode('ascii')) if result: - # execute iperf command against the server - try: - p = sh.iperf3("-c", self.__serverip, "-n", self.__numbytestx, "-f", "m", "-p", self.__port, - "-J", _timeout=30) - except sh.TimeoutException: - self.fail("failed: iperf timeout reached") + # execute iperf command against the server, but it implements attempts in case the server is busy + iperfdone = False + while not iperfdone: + try: + p = sh.iperf3("-c", self.__serverip, "-n", self.__numbytestx, "-f", "m", "-p", self.__port, + "-J", _timeout=20) + iperfdone = True + except sh.TimeoutException: + self.fail("failed: iperf timeout reached") + except sh.ErrorReturnCode: + time.sleep(self.timebetweenattempts) # check if it was executed succesfully if p.exit_code == 0: -- cgit v1.1 From 43f688b414ea182cd9baf06ee83df072c3f563f5 Mon Sep 17 00:00:00 2001 From: Hector Fernandez Date: Tue, 7 Jul 2020 09:57:58 +0200 Subject: Added full support for USBLOOP test. Reboot board always if autotest is enabled. --- test-cli/test/tests/qusbdual.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) (limited to 'test-cli/test/tests') diff --git a/test-cli/test/tests/qusbdual.py b/test-cli/test/tests/qusbdual.py index 054d7f4..fcbb9aa 100644 --- a/test-cli/test/tests/qusbdual.py +++ b/test-cli/test/tests/qusbdual.py @@ -1,6 +1,7 @@ import sh import unittest import os.path +import time class Qusbdual(unittest.TestCase): @@ -25,10 +26,12 @@ class Qusbdual(unittest.TestCase): p = sh.modprobe("g_mass_storage", "file=/var/lib/hwtest-files/mass-otg-test.img") if p.exit_code != 0: self.fail("failed: Unable to create a mass storage gadget.") + # wait to detect the new device + time.sleep(3) # find the mass storage device try: p = sh.grep(sh.lsblk("-So", "NAME,VENDOR"), "Linux") - except sh.ErrorReturnCode as e: + except sh.ErrorReturnCode: self.fail("failed: could not find any mass storage gadget") device = p.stdout.decode().split(" ")[0] # mount the mass storage gadget @@ -40,8 +43,9 @@ class Qusbdual(unittest.TestCase): for i in range(int(self.__repetitions)): # copy files try: - p = sh.cp("/mnt/station_ramdisk/hdd_gadget/usb-test.bin", "/mnt/station_ramdisk/hdd_gadget/usb-test.bin.md5", - "/tmp/stationnfs") + p = sh.cp("/mnt/station_ramdisk/hdd_gadget/usb-test.bin", + "/mnt/station_ramdisk/hdd_gadget/usb-test.bin.md5", + "/mnt/station_nfsdisk/") except sh.ErrorReturnCode as e: sh.umount("/mnt/station_ramdisk/hdd_gadget") sh.rmdir("/mnt/station_ramdisk/hdd_gadget") -- cgit v1.1 From 9f89f02e7d6e2a3208b0b85d2567ebffd2515e09 Mon Sep 17 00:00:00 2001 From: Hector Fernandez Date: Tue, 7 Jul 2020 14:09:59 +0200 Subject: New way to check preivous state before changing to a new one. Solved some problems with audio test. --- test-cli/test/tests/qaudio.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) (limited to 'test-cli/test/tests') diff --git a/test-cli/test/tests/qaudio.py b/test-cli/test/tests/qaudio.py index 364d8b2..3d2113a 100644 --- a/test-cli/test/tests/qaudio.py +++ b/test-cli/test/tests/qaudio.py @@ -28,7 +28,10 @@ class Qaudio(unittest.TestCase): # analize audio file recordtime = calc_audio_duration("/var/lib/hwtest-files/dtmf-13579.wav") + 0.15 # previous play to estabilise audio lines - sh.aplay("/var/lib/hwtest-files/dtmf-13579.wav") + p1 = sh.aplay("/var/lib/hwtest-files/dtmf-13579.wav", _bg=True) + p2 = sh.arecord("-r", 8000, "-d", recordtime, "/mnt/station_ramdisk/recorded.wav", _bg=True) + p1.wait() + p2.wait() # play and record p1 = sh.aplay("/var/lib/hwtest-files/dtmf-13579.wav", _bg=True) p2 = sh.arecord("-r", 8000, "-d", recordtime, "/mnt/station_ramdisk/recorded.wav", _bg=True) -- cgit v1.1 From 907b96801230e04d02575a3732a73e452089637b Mon Sep 17 00:00:00 2001 From: Hector Fernandez Date: Tue, 7 Jul 2020 17:27:33 +0200 Subject: After USBLOOP test, g_mass_storage must be stopped so the board can be rebooted correctly. --- test-cli/test/tests/qusbdual.py | 7 +++++++ 1 file changed, 7 insertions(+) (limited to 'test-cli/test/tests') diff --git a/test-cli/test/tests/qusbdual.py b/test-cli/test/tests/qusbdual.py index fcbb9aa..05b22c3 100644 --- a/test-cli/test/tests/qusbdual.py +++ b/test-cli/test/tests/qusbdual.py @@ -32,12 +32,14 @@ class Qusbdual(unittest.TestCase): try: p = sh.grep(sh.lsblk("-So", "NAME,VENDOR"), "Linux") except sh.ErrorReturnCode: + sh.modprobe("-r", "g_mass_storage") self.fail("failed: could not find any mass storage gadget") device = p.stdout.decode().split(" ")[0] # mount the mass storage gadget sh.mkdir("-p", "/mnt/station_ramdisk/hdd_gadget") p = sh.mount("-o", "ro", "/dev/" + device, "/mnt/station_ramdisk/hdd_gadget") if p.exit_code != 0: + sh.modprobe("-r", "g_mass_storage") self.fail("failed: Unable to mount the mass storage gadget.") # execute test for i in range(int(self.__repetitions)): @@ -49,11 +51,13 @@ class Qusbdual(unittest.TestCase): except sh.ErrorReturnCode as e: sh.umount("/mnt/station_ramdisk/hdd_gadget") sh.rmdir("/mnt/station_ramdisk/hdd_gadget") + sh.modprobe("-r", "g_mass_storage") self.fail("failed: Unable to copy files through USB.") # check if the device is still mounted if not os.path.ismount("/mnt/station_ramdisk/hdd_gadget"): sh.rm("/mnt/station_ramdisk/hdd_gadget/*") sh.rmdir("/mnt/station_ramdisk/hdd_gadget") + sh.modprobe("-r", "g_mass_storage") self.fail("failed: USB device unmounted during/after copying files.") # Check md5 try: @@ -61,6 +65,7 @@ class Qusbdual(unittest.TestCase): except sh.ErrorReturnCode as e: sh.umount("/mnt/station_ramdisk/hdd_gadget") sh.rmdir("/mnt/station_ramdisk/hdd_gadget") + sh.modprobe("-r", "g_mass_storage") self.fail("failed: Unable to calculate MD5 of the copied file.") newmd5 = p.stdout.decode().split(" ")[0] with open('/mnt/station_ramdisk/hdd_gadget/usb-test.bin.md5', 'r') as outfile: @@ -69,12 +74,14 @@ class Qusbdual(unittest.TestCase): if newmd5 != oldmd5: sh.umount("/mnt/station_ramdisk/hdd_gadget") sh.rmdir("/mnt/station_ramdisk/hdd_gadget") + sh.modprobe("-r", "g_mass_storage") self.fail("failed: MD5 check failed.") # delete copied files sh.rm("-f", "/mnt/station_nfsdisk/usb-test.bin", "/mnt/station_nfsdisk/usb-test.bin.md5") # Finish sh.umount("/mnt/station_ramdisk/hdd_gadget") sh.rmdir("/mnt/station_ramdisk/hdd_gadget") + sh.modprobe("-r", "g_mass_storage") def getresults(self): return self.__resultlist -- cgit v1.1 From d46bce422fd03cd57d1ba336361da17d6efb48db Mon Sep 17 00:00:00 2001 From: Manel Caro Date: Fri, 31 Jul 2020 02:07:37 +0200 Subject: TEST restructure --- test-cli/test/tests/qamper.py | 32 ++++++--- test-cli/test/tests/qaudio.py | 77 ++++++++++++++------- test-cli/test/tests/qdmesg.py | 30 ++++---- test-cli/test/tests/qeeprom.py | 77 +++++++++++++-------- test-cli/test/tests/qethernet.py | 77 ++++++++++++++++----- test-cli/test/tests/qmmcflash.py | 46 ++++++++++++- test-cli/test/tests/qnand.py | 44 ++++++------ test-cli/test/tests/qplc.py | 70 +++++++++++++++++++ test-cli/test/tests/qram.py | 56 +++++++++++---- test-cli/test/tests/qusb.py | 143 +++++++++++++++++++-------------------- test-cli/test/tests/qusbdual.py | 90 ------------------------ test-cli/test/tests/qwifi.py | 143 +++++++++++++++++++-------------------- 12 files changed, 518 insertions(+), 367 deletions(-) create mode 100644 test-cli/test/tests/qplc.py delete mode 100644 test-cli/test/tests/qusbdual.py (limited to 'test-cli/test/tests') diff --git a/test-cli/test/tests/qamper.py b/test-cli/test/tests/qamper.py index 58054c9..df340eb 100644 --- a/test-cli/test/tests/qamper.py +++ b/test-cli/test/tests/qamper.py @@ -5,6 +5,7 @@ from test.helpers.amper import Amper class Qamper(unittest.TestCase): params = None __resultlist = None # resultlist is a python list of python dictionaries + dummytest = False # varlist: undercurrent, overcurrent def __init__(self, testname, testfunc, varlist): @@ -21,8 +22,28 @@ class Qamper(unittest.TestCase): raise Exception('overcurrent param inside Qamp must be defined') self._testMethodDoc = testname self.__resultlist = [] + if "dummytest" in varlist: + self.dummytest = varlist["dummytest"] + + def saveresultinfile(self, currentvalue): + # save result in a file + with open('/mnt/station_ramdisk/amper.txt', 'w') as outfile: + n = outfile.write("Current: {} A".format(currentvalue)) + outfile.close() + self.__resultlist.append( + { + "description": "Amperimeter values", + "filepath": "/mnt/station_ramdisk/amper.txt", + "mimetype": "text/plain" + } + ) def execute(self): + if self.dummytest: + self.current = "0.0" + self.saveresultinfile(self.current) + return + amp = Amper() # open serial connection if not amp.open(): @@ -35,16 +56,7 @@ class Qamper(unittest.TestCase): # get current value (in Amperes) self.current = amp.getCurrent() # save result in a file - with open('/mnt/station_ramdisk/amper.txt', 'w') as outfile: - n = outfile.write("Current: {} A".format(self.current)) - outfile.close() - self.__resultlist.append( - { - "description": "Amperimeter values", - "filepath": "/mnt/station_ramdisk/amper.txt", - "mimetype": "text/plain" - } - ) + self.saveresultinfile(self.current) # close serial connection amp.close() # Check current range diff --git a/test-cli/test/tests/qaudio.py b/test-cli/test/tests/qaudio.py index 3d2113a..2c86809 100644 --- a/test-cli/test/tests/qaudio.py +++ b/test-cli/test/tests/qaudio.py @@ -2,56 +2,78 @@ import unittest import sh import wave import contextlib - - -def calc_audio_duration(fname): - with contextlib.closing(wave.open(fname, 'r')) as f: - frames = f.getnframes() - rate = f.getframerate() - duration = frames / float(rate) - f.close() - return duration - +import uuid +import time +from test.helpers.utils import save_result +from test.helpers.iseelogger import logObj class Qaudio(unittest.TestCase): params = None __resultlist = None # resultlist is a python list of python dictionaries __dtmf_secuence = ["1", "3", "5", "7", "9"] + __dtmf_file = None + __file_uuid = None + __QaudioName = None def __init__(self, testname, testfunc, varlist): self.params = varlist super(Qaudio, self).__init__(testfunc) self._testMethodDoc = testname self.__resultlist = [] + self.__xmlObj = varlist["xml"] + self.__QaudioName = varlist.get('name', 'qaudio') + self.__dtmf_file_play = varlist.get('play_dtmf_file', self.__xmlObj.getKeyVal(self.__QaudioName, "play_dtmf_file", "dtmf-13579.wav")) + self.__recordtime = varlist.get('recordtime', self.__xmlObj.getKeyVal(self.__QaudioName, "recordtime", "0.15")) + self.__dtmf_file_record = varlist.get('record_dtmf_file', self.__xmlObj.getKeyVal(self.__QaudioName, "record_dtmf_file", "record-{}.wav")) + self.__sampling_rate = varlist.get('rate', self.__xmlObj.getKeyVal(self.__QaudioName, "rate", "8000")) + self.__record_path = varlist.get('to', self.__xmlObj.getKeyVal(self.__QaudioName, "to", "/mnt/station_ramdisk")) + self.__run_delay = float(varlist.get('aplay_run_delay', self.__xmlObj.getKeyVal(self.__QaudioName, "aplay_run_delay", "0.0"))) + self.__file_uuid = uuid.uuid4() + self.__dtmf_file_record = self.__dtmf_file_record.format(self.__file_uuid) + + def restoreAlsaCtl(self): + try: + sh.alsactl('restore'); + except Exception as E: + logObj.getlogger().error("Failed to Execite alsactl restore"); def execute(self): - # analize audio file - recordtime = calc_audio_duration("/var/lib/hwtest-files/dtmf-13579.wav") + 0.15 + self.restoreAlsaCtl() + try: + # analize audio file + calcRecordTime = self.calc_audio_duration(self.__dtmf_file_play) + float(self.__recordtime) + float(0.1) + except TypeError as Error: + self.fail("failed: TypeError: {}.".Error) + # previous play to estabilise audio lines - p1 = sh.aplay("/var/lib/hwtest-files/dtmf-13579.wav", _bg=True) - p2 = sh.arecord("-r", 8000, "-d", recordtime, "/mnt/station_ramdisk/recorded.wav", _bg=True) - p1.wait() - p2.wait() - # play and record - p1 = sh.aplay("/var/lib/hwtest-files/dtmf-13579.wav", _bg=True) - p2 = sh.arecord("-r", 8000, "-d", recordtime, "/mnt/station_ramdisk/recorded.wav", _bg=True) - p1.wait() + p2 = sh.arecord("-r", self.__sampling_rate, "-d", calcRecordTime, "{}/{}".format(self.__record_path, self.__dtmf_file_record), _bg=True) + time.sleep(self.__run_delay) + p1 = sh.aplay(self.__dtmf_file_play) p2.wait() if p1.exit_code == 0 and p2.exit_code == 0: - p = sh.multimon("-t", "wav", "-a", "DTMF", "/mnt/station_ramdisk/recorded.wav", "-q") + p = sh.multimon("-t", "wav", "-a", "DTMF", "{}/{}".format(self.__record_path, self.__dtmf_file_record), "-q") if p.exit_code == 0: lines = p.stdout.decode('ascii').splitlines() self.__dtmf_secuence_result = [] for li in lines: if li.split(" ")[0] == "DTMF:": self.__dtmf_secuence_result.append(li.split(" ")[1]) + self.__dtmf_secuence_result = sorted(list(set(self.__dtmf_secuence_result))) # compare original and processed dtmf sequence if len(self.__dtmf_secuence) == len(self.__dtmf_secuence_result): for a, b in zip(self.__dtmf_secuence, self.__dtmf_secuence_result): if a != b: - self.fail("failed: sent and received DTMF sequence don't match.") + logObj.getlogger().debug( + "failed: sent and received DTMF sequence do not match.{} != {}".format( + self.__dtmf_secuence, self.__dtmf_secuence_result)) + self.fail("failed: sent and received DTMF sequence do not match.") + save_result(filePath='{}/{}'.format(self.__record_path, self.__dtmf_file_record), + description='sound record', + mime='audio/x-wav', + result=self.__resultlist) else: - self.fail("failed: received DTMF sequence is shorter than expected.") + logObj.getlogger().debug("received DTMF sequence is shorter than expected.{} {}".format(self.__dtmf_secuence,self.__dtmf_secuence_result)) + self.fail("received DTMF sequence is shorter than expected") else: self.fail("failed: unable to use multimon command.") else: @@ -62,3 +84,12 @@ class Qaudio(unittest.TestCase): def gettextresult(self): return "" + + def calc_audio_duration(self, fname): + duration = 0.0 + with contextlib.closing(wave.open(fname, 'r')) as f: + frames = f.getnframes() + rate = f.getframerate() + duration = frames / float(rate) + f.close() + return duration diff --git a/test-cli/test/tests/qdmesg.py b/test-cli/test/tests/qdmesg.py index b35f1ff..39a5047 100644 --- a/test-cli/test/tests/qdmesg.py +++ b/test-cli/test/tests/qdmesg.py @@ -1,4 +1,5 @@ import sh +import os import os.path from os import path @@ -9,24 +10,23 @@ class Qdmesg: def __init__(self, testname, testfunc, varlist): self.params = varlist - self._testMethodDoc = testname + self.__testMethodDoc = testname self.__resultlist = [] self.pgObj = varlist["db"] + self.__xmlObj = varlist["xml"] + self.__QdmesgName = varlist.get('name', 'qdmesg') + self.__syslog_dmesg_file = varlist.get('syslog_dmesg',self.__xmlObj.getKeyVal(self.__QdmesgName, "syslog_dmesg", + "/var/log/kern.log")) + + def getTestName(self): + return self.__testMethodDoc def execute(self): - print("running dmesg test") self.pgObj.run_test(self.params["testidctl"], self.params["testid"]) - # delete previous file - if path.exists("/mnt/station_ramdisk/dmesg.txt"): - os.remove("/mnt/station_ramdisk/dmesg.txt") - # generate file - p = sh.dmesg("--color=never", _out="/mnt/station_ramdisk/dmesg.txt") - if p.exit_code == 0: - # save dmesg result in DB - self.pgObj.upload_result_file(self.params["testidctl"], self.params["testid"], "dmesg output", - "/mnt/station_ramdisk/dmesg.txt", "text/plain") - self.pgObj.finish_test(self.params["testidctl"], self.params["testid"], "TEST_COMPLETE", "") - print("success dmesg test") - else: + if not os.path.isfile('{}'.format(self.__syslog_dmesg_file)): self.pgObj.finish_test(self.params["testidctl"], self.params["testid"], "TEST_FAILED", "") - print("fail dmesg test") + return False + self.pgObj.upload_result_file(self.params["testidctl"], self.params["testid"], "{}".format(self.__syslog_dmesg_file), + "{}".format(self.__syslog_dmesg_file), "text/plain") + self.pgObj.finish_test(self.params["testidctl"], self.params["testid"], "TEST_COMPLETE", "") + return True diff --git a/test-cli/test/tests/qeeprom.py b/test-cli/test/tests/qeeprom.py index 7fc9fcd..7900381 100644 --- a/test-cli/test/tests/qeeprom.py +++ b/test-cli/test/tests/qeeprom.py @@ -1,46 +1,65 @@ -import sh import unittest - +import os class Qeeprom(unittest.TestCase): params = None - __position = None - __eeprompath = None __resultlist = None # resultlist is a python list of python dictionaries + __QEepromName = None + __i2cBus = None + __device = None # varlist: position, eeprompath def __init__(self, testname, testfunc, varlist): self.params = varlist super(Qeeprom, self).__init__(testfunc) self._testMethodDoc = testname + self.__xmlObj = varlist["xml"] + self.__QEepromName = varlist.get('name', 'qeeprom') + self.__i2cAddress = varlist.get('i2c_address', self.__xmlObj.getKeyVal(self.__QEepromName, "i2c_address", "0050")) + self.__i2cBus = varlist.get('i2c_bus', self.__xmlObj.getKeyVal(self.__QEepromName, "i2c_bus", "0")) + self.__device = '/sys/bus/i2c/devices/{}-{}/eeprom'.format(self.__i2cBus, self.__i2cAddress) + self.__resultlist = [] - if "position" in varlist: - self.__position = varlist["position"] - else: - raise Exception('position param inside Qeeprom must be defined') - if "eeprompath" in varlist: - self.__eeprompath = varlist["eeprompath"] - else: - raise Exception('eeprompath param inside Qeeprom must be defined') + + def __check_device(self): + if os.path.isfile(self.__device): + return True + return False + + def __eeprom_write(self, data): + try: + f = open(self.__device, "wb") + f.write(data) + f.close() + except IOError as Error: + return False, '{}'.format(Error.errno) + return True, '' + + def __eeprom_read(self, size): + try: + f = open(self.__device, "rb") + data = f.read(size) + f.close() + except IOError as Error: + return False, '{}'.format(Error.errno) + return True, data + def execute(self): - # generate some data - data_tx = "isee_test" - # write data into the eeprom - p = sh.dd(sh.echo(data_tx), "of=" + self.__eeprompath, "bs=1", "seek=" + self.__position) - if p.exit_code == 0: - # read data from the eeprom - p = sh.dd("if=" + self.__eeprompath, "bs=1", "skip=" + self.__position, "count=" + str(len(data_tx))) - if p.exit_code == 0: - data_rx = p.stdout.decode('ascii') - # compare both values - if data_rx != data_tx: - # Both data are different - self.fail("failed: mismatch between written and received values.") - else: - self.fail("failed: Unable to read from the EEPROM device.") - else: - self.fail("failed: Unable to write on the EEPROM device.") + if not self.__check_device(): + self.fail("cannot open device {}".format(self.__device)) + + data = bytes('AbcDeFgHijK098521', 'utf-8') + + v, w = self.__eeprom_write(data) + if not v: + self.fail("eeprom: {} write test failed".format(w)) + v, r = self.__eeprom_read(len(data)) + if not v: + self.fail("eeprom: {} read test failed".format(r)) + if r != data: + self.fail("mismatch between write and read bytes") + def getresults(self): return self.__resultlist diff --git a/test-cli/test/tests/qethernet.py b/test-cli/test/tests/qethernet.py index 81acef1..7d081a1 100644 --- a/test-cli/test/tests/qethernet.py +++ b/test-cli/test/tests/qethernet.py @@ -2,7 +2,11 @@ import unittest import sh import json import time - +import uuid +import iperf3 +import netifaces +from test.helpers.utils import save_json_to_disk +from test.helpers.utils import station2Port class Qethernet(unittest.TestCase): __serverip = None @@ -16,29 +20,66 @@ class Qethernet(unittest.TestCase): # varlist content: serverip, bwexpected, port def __init__(self, testname, testfunc, varlist): - # save the parameters list self.params = varlist - # configure the function to be executed when the test runs. "testfunc" is a name of a method inside this - # class, that in this situation, it can only be "execute". super(Qethernet, self).__init__(testfunc) - # validate and get the parameters - if "serverip" in varlist: - self.__serverip = varlist["serverip"] - else: - raise Exception('sip param inside Qethernet have been be defined') - if "bwexpected" in varlist: - self.__bwexpected = varlist["bwexpected"] - else: - raise Exception('bwexpected param inside Qethernet must be defined') - if "port" in varlist: - self.__port = varlist["port"] - else: - raise Exception('port param inside Qethernet must be defined') - self.__numbytestx = "10M" self._testMethodDoc = testname + self.__xmlObj = varlist["xml"] + self.__QEthName = varlist.get('name', 'qeth') + self.__loops = varlist.get('loops', self.__xmlObj.getKeyVal(self.__QEthName, "loops", "1")) + self.__port = varlist.get('port', self.__xmlObj.getKeyVal(self.__QEthName, "port", "5000")) + self.__bw = varlist.get('bwexpected', self.__xmlObj.getKeyVal(self.__QEthName, "bwexpected", "40.0")) + self.__serverip = varlist.get('serverip', self.__xmlObj.getKeyVal(self.__QEthName, "serverip", "localhost")) + self.__duration = varlist.get('duration', self.__xmlObj.getKeyVal(self.__QEthName, "duration", "10")) + self.__interface = varlist.get('interface', self.__xmlObj.getKeyVal(self.__QEthName, "interface", "eth0")) + self.__toPath = varlist.get('to', self.__xmlObj.getKeyVal(self.__QEthName, "to", "/mnt/station_ramdisk")) + self.__retriesCount = varlist.get('retries', self.__xmlObj.getKeyVal(self.__QEthName, "retries", "5")) + self.__waitRetryTime = varlist.get('wait_retry', self.__xmlObj.getKeyVal(self.__QEthName, "wait_retry", "10")) + self.__wifi_res_file = varlist.get('eth_res_file', self.__xmlObj.getKeyVal(self.__QEthName, "eth_res_file", "eth_test_{}.json")) + self.__wifi_st_file = varlist.get('eth_st_file', self.__xmlObj.getKeyVal(self.__QEthName, "eth_st_file", "eth_st_{}.json")) + self.__file_uuid = uuid.uuid4() + self.__wifi_res_file = self.__wifi_res_file.format(self.__file_uuid) + self.__wifi_st_file = self.__wifi_st_file.format(self.__file_uuid) + self.__resultlist = [] + def checkInterface(self, reqInterface): + ifaces = netifaces.interfaces() + for t in ifaces: + if t == reqInterface: + return True + return False + def execute(self): + self.__resultlist = [] + # check interfaces + if not self.checkInterface(self.__interface): + self.fail('Interface not found: {}'.format(self.__interface)) + + # Start Iperf + client = iperf3.Client() + client.duration = int(self.__duration) + client.server_hostname = self.__serverip + client.port = station2Port(self.__port) + count = int(self.__retriesCount) + while count > 0: + result = client.run() + if result.error == None: + if result.received_Mbps >= float(self.__bw) and result.sent_Mbps >= float(self.__bw): + save_json_to_disk(filePath='{}/{}'.format(self.__toPath, self.__wifi_res_file), + description='eth {} iperf3'.format(self.__interface), + mime='application/json', + json_data=result.json, + result=self.__resultlist) + return + else: + self.fail('bw fail: rx:{} tx:{}'.format(result.received_Mbps, result.sent_Mbps)) + else: + count = count - 1 + time.sleep(int(self.__waitRetryTime)) + + self.fail('qEth (max tries) Execution error: {}'.format(result.error)) + + def execute2(self): # execute iperf command against the server, but it implements attempts in case the server is busy iperfdone = False while not iperfdone: diff --git a/test-cli/test/tests/qmmcflash.py b/test-cli/test/tests/qmmcflash.py index f10757e..0f5a0c1 100644 --- a/test-cli/test/tests/qmmcflash.py +++ b/test-cli/test/tests/qmmcflash.py @@ -1,5 +1,10 @@ import unittest -import sh +import scanf +from scanf import scanf +from sh import mmc +from sh import ErrorReturnCode +from test.helpers.utils import save_file_to_disk +from test.helpers.utils import sys_read class Qmmcflash(unittest.TestCase): params = None @@ -8,8 +13,43 @@ class Qmmcflash(unittest.TestCase): self.params = varlist super(Qmmcflash, self).__init__(testfunc) self._testMethodDoc = testname - # TODO + self.__xmlObj = varlist["xml"] + self.__QeMMCName = varlist.get('name', 'qemmc') + self.__emmcDevice = varlist.get('device', self.__xmlObj.getKeyVal(self.__QeMMCName, "device", "mmcblk0")) + self.__toPath = varlist.get('to', self.__xmlObj.getKeyVal(self.__QeMMCName, "to", "/mnt/station_ramdisk")) + self.__mmc_res_file = varlist.get('emmc_res_file', self.__xmlObj.getKeyVal(self.__QeMMCName, "emmc_res_file", "emmc_status.txt")) + self.__mmcPort = varlist.get('mmc_port', self.__xmlObj.getKeyVal(self.__QeMMCName, "mmc_port", "0")) + self.__mmcID = varlist.get('mmc_id', self.__xmlObj.getKeyVal(self.__QeMMCName, "mmc_id", "0001")) + + self.__resultlist = [] def execute(self): - # TODO \ No newline at end of file + try: + dataOut = mmc('extcsd', 'read', '/dev/{}'.format(self.__emmcDevice)) + save_file_to_disk(filePath='{}/{}'.format(self.__toPath, self.__mmc_res_file), + description='eMMC health test', + mime='text/plain', + data=dataOut.stdout.decode('utf-8'), + result=self.__resultlist) + sysDevice = "/sys/bus/mmc/drivers/mmcblk/mmc{}:{}".format(self.__mmcPort, self.__mmcID) + r, data = sys_read("{}/life_time".format(sysDevice)) + if not r: + self.fail("emmc: life_time not found") + res = scanf("0x%d 0x%d", data) + if res[0] > 3 or res[1] > 3: + self.fail("emmc: review {} life_time > 3".format(sysDevice)) + r, data = sys_read("{}/pre_eol_info".format(sysDevice)) + if not r: + self.fail("emmc: pre_eol_info not found") + res = scanf("0x%d", data) + if res[0] != 1: + self.fail("emmc: review {} pre_eol_info != 1".format(sysDevice)) + except ErrorReturnCode as Error: + self.fail("emmc: failed {} ".format(Error.exit_code)) + + def getresults(self): + return self.__resultlist + + def gettextresult(self): + return "" \ No newline at end of file diff --git a/test-cli/test/tests/qnand.py b/test-cli/test/tests/qnand.py index 988ab60..6de1eaf 100644 --- a/test-cli/test/tests/qnand.py +++ b/test-cli/test/tests/qnand.py @@ -1,6 +1,7 @@ import unittest import sh - +import uuid +from test.helpers.utils import save_file_to_disk class Qnand(unittest.TestCase): params = None @@ -11,29 +12,34 @@ class Qnand(unittest.TestCase): def __init__(self, testname, testfunc, varlist): self.params = varlist super(Qnand, self).__init__(testfunc) - if "device" in varlist: - self.__device = varlist["device"] - else: - raise Exception('device param inside Qnand must be defined') self._testMethodDoc = testname + self.__xmlObj = varlist["xml"] + self.__QNandName = varlist.get('name', 'qnand') + self.__device = varlist.get('device', self.__xmlObj.getKeyVal(self.__QNandName, "device", "/dev/mtd0")) + self.__nand_res_file = varlist.get('nand_res_file', self.__xmlObj.getKeyVal(self.__QNandName, "nand_res_file", "nand_test_{}.txt")) + self.__toPath = varlist.get('to', self.__xmlObj.getKeyVal(self.__QNandName, "to", "/mnt/station_ramdisk")) + self.__file_uuid = uuid.uuid4() + self.__nand_res_file = self.__nand_res_file.format(self.__file_uuid) + nandtestPath = self.__xmlObj.getKeyVal("qnand", "nandtestPath", '/root/hwtest-files/nandtest"') + + if nandtestPath == '': + self.myNandTest = sh.nandtest + else: + self.myNandTest = sh.Command(nandtestPath) self.__resultlist = [] def execute(self): try: - p = sh.nandtest("-m", self.__device) - # save result - with open('/mnt/station_ramdisk/nand-nandtest.txt', 'w') as outfile: - n = outfile.write(p.stdout.decode('ascii')) - outfile.close() - self.__resultlist.append( - { - "description": "nandtest output", - "filepath": "/mnt/station_ramdisk/nand-nandtest.txt", - "mimetype": "text/plain" - } - ) - except sh.ErrorReturnCode as e: - self.fail("failed: could not complete nandtest command") + res = self.myNandTest("-m", self.__device) + save_file_to_disk(filePath='{}/{}'.format(self.__toPath, self.__nand_res_file), + description='nand {} test'.format(self.__device), + mime='text/plain', + data=res.stdout.decode('utf-8'), + result=self.__resultlist) + + except sh.ErrorReturnCode_1 as e: + self.fail("nandtest failed: {}".format(e.ErrorReturnCode.stdout)) + def getresults(self): return self.__resultlist diff --git a/test-cli/test/tests/qplc.py b/test-cli/test/tests/qplc.py new file mode 100644 index 0000000..01258a9 --- /dev/null +++ b/test-cli/test/tests/qplc.py @@ -0,0 +1,70 @@ +import shutil +import unittest +import os.path +import os +import sh + +from test.helpers.md5 import md5_file +from test.helpers.plc import dcpPLC + +class Qplc(unittest.TestCase): + params = None + __resultlist = None # resultlist is a python list of python dictionaries + __QPLCName = None + __plc = None + + def __init__(self, testname, testfunc, varlist): + self.params = varlist + super(Qplc, self).__init__(testfunc) + self._testMethodDoc = testname + self.__xmlObj = varlist["xml"] + + self.__QPLCName = varlist.get('name', 'qplc') + self.__firmware = varlist.get('firmware', self.__xmlObj.getKeyVal(self.__QPLCName, "firmware", "")) + self.__firmware_md5 = varlist.get('firmware_md5', self.__xmlObj.getKeyVal(self.__QPLCName, "firmware_md5", "")) + self.__factoryTool = varlist.get('factory_tool', self.__xmlObj.getKeyVal(self.__QPLCName, "factory_tool", "")) + self.__gen_ip = varlist.get('gen_ip', self.__xmlObj.getKeyVal(self.__QPLCName, "gen_ip", "0")) + self.__gen_mac = varlist.get('gen_mac', self.__xmlObj.getKeyVal(self.__QPLCName, "gen_mac", "0")) + self.__mtd_device = varlist.get('mtd_device', self.__xmlObj.getKeyVal(self.__QPLCName, "mtd_device", "/dev/mtd0")) + self.__plc = dcpPLC(self.__factoryTool, self.__mtd_device) + + self.__resultlist = [] + + + def checkfiles(self): + if not os.path.isfile('{}/{}'.format(self.__filepath_from, self.__file_source)): + return False, 'File test binary Not found {}/{}'.format(self.__filepath_from, self.__file_source) + if not os.path.isfile('{}/{}'.format(self.__filepath_from, self.__file_md5)): + return False, 'File test binary md5 Not found {}/{}'.format(self.__filepath_from, self.__file_md5) + if self.__check_mounted != '': + if not os.path.ismount('{}'.format(self.__check_mounted)): + return False, 'Required mounted failed: {}'.format(self.__path_to) + r, s = self.removefileIfExist('{}/{}'.format(self.__path_to, self.__file_source)) + if not r: + return r, s + return True, '' + + def __flashMemory(self): + self.__plc.setSaveFirmwareMode() + r, v = self.__plc.SaveFirmware(self.__firmware) + if not r: + self.fail(v) + + def execute(self): + # Check files + v, m = self.checkfiles() + if not v: + self.fail(m) + # do flash & verify + self.__flashMemory() + # do Plc boot + self.__plc.setBootMode() + # Wait plc goes UP + + + + def getresults(self): + return self.__resultlist + + def gettextresult(self): + return "" diff --git a/test-cli/test/tests/qram.py b/test-cli/test/tests/qram.py index 21a01c8..49d4d54 100644 --- a/test-cli/test/tests/qram.py +++ b/test-cli/test/tests/qram.py @@ -1,33 +1,59 @@ import unittest import sh - +# from test.helpers.iseelogger import MeasureTime +# from test.helpers.iseelogger import logObj +from test.helpers.utils import save_file_to_disk class Qram(unittest.TestCase): params = None - __memsize = "10M" - __loops = "1" - __resultlist = None # resultlist is a python list of python dictionaries + __memsize = None + __loops = None + __resultlist = [] # resultlist is a python list of python dictionaries # varlist: memsize, loops def __init__(self, testname, testfunc, varlist): self.params = varlist super(Qram, self).__init__(testfunc) - if "memsize" in varlist: - self.__memsize = varlist["memsize"] - else: - raise Exception('memsize param inside Qram must be defined') - if "loops" in varlist: - self.__loops = varlist["loops"] - else: - raise Exception('loops param inside Qram must be defined') self._testMethodDoc = testname - self.__resultlist = [] + self.__xmlObj = varlist["xml"] + + self.__QramName = varlist.get('name', 'qram') + self.__loops = varlist.get('loops', self.__xmlObj.getKeyVal(self.__QramName, "loops", "1")) + self.__memsize = varlist.get('memsize', self.__xmlObj.getKeyVal(self.__QramName, "memsize", "50M")) + self.__toPath = varlist.get('to', self.__xmlObj.getKeyVal(self.__QramName, "to", "/mnt/station_ramdisk")) + self.__ram_res_file = varlist.get('ram_res_file', self.__xmlObj.getKeyVal(self.__QramName, "ram_res_file", "ram_res.txt")) + + self.__dummytest = varlist.get('dummytest', self.__xmlObj.getKeyVal(self.__QramName, "dummytest", "0")) + self.__dummyresult = varlist.get('dummytestresult', self.__xmlObj.getKeyVal(self.__QramName, "dummytest", "0")) + + memtesterPath = self.__xmlObj.getKeyVal(self.__QramName, "memtesterPath", '') + + if memtesterPath == '': + self.myMemtester = sh.memtester + else: + self.myMemtester = sh.Command(memtesterPath) def execute(self): + self.__resultlist = [] try: - p = sh.memtester(self.__memsize, "1", _out="/dev/null") + # mytime = MeasureTime() + res = self.myMemtester(self.__memsize, '{}'.format(self.__loops)) + save_file_to_disk(filePath='{}/{}'.format(self.__toPath, self.__ram_res_file), + description='Ram result test', + mime='text/plain', + data=res.stdout.decode('utf-8'), + result=self.__resultlist) + # mytime.stop() except sh.ErrorReturnCode as e: - self.fail("failed: could not complete memtester command") + save_file_to_disk(filePath='{}/{}'.format(self.__toPath, self.__ram_res_file), + description='Ram result test', + mime='text/plain', + data=e.stdout.decode('utf-8'), + result=self.__resultlist) + self.fail("failed: memtester {}::{}".format(str(e.exit_code), e.stdout.decode('utf-8'))) + + except Exception as details: + self.fail('Error: {}'.format(details)) def getresults(self): return self.__resultlist diff --git a/test-cli/test/tests/qusb.py b/test-cli/test/tests/qusb.py index 6302012..3182685 100644 --- a/test-cli/test/tests/qusb.py +++ b/test-cli/test/tests/qusb.py @@ -1,91 +1,90 @@ -import sh +import shutil import unittest -from test.helpers.usb import USBDevices import os.path +import os +import sh +from test.helpers.md5 import md5_file class Qusb(unittest.TestCase): params = None __resultlist = None # resultlist is a python list of python dictionaries + __QusbName = None def __init__(self, testname, testfunc, varlist): self.params = varlist super(Qusb, self).__init__(testfunc) self._testMethodDoc = testname - if "repetitions" in varlist: - self.__repetitions = varlist["repetitions"] - else: - raise Exception('repetitions param inside Qusb must be defined') + self.__xmlObj = varlist["xml"] + + self.__QusbName = varlist.get('name', 'qusb') + self.__loops = varlist.get('loops', self.__xmlObj.getKeyVal(self.__QusbName, "loops", "3")) + self.__filepath_from = varlist.get('path', self.__xmlObj.getKeyVal(self.__QusbName, "path", "/root/hwtest-files")) + self.__file_source = varlist.get('file', self.__xmlObj.getKeyVal(self.__QusbName, "file", "usb-test.bin")) + self.__file_md5 = varlist.get('file_md5', self.__xmlObj.getKeyVal(self.__QusbName, "file_md5", "usb-test.bin.md5")) + self.__path_to = varlist.get('pathto', self.__xmlObj.getKeyVal(self.__QusbName, "pathto", "/mnt/test/disk2")) + self.__check_mounted = varlist.get('check_mounted', self.__xmlObj.getKeyVal(self.__QusbName, "check_mounted", "")) + self.__resultlist = [] - def execute(self): - # get usb device - dev_obj = USBDevices(self.params["xml"]) - if dev_obj.getMassStorage(): - device = dev_obj.getMassStorage()['disk'] + "1" - else: - self.fail("failed: No USB memory found.") - # check if the device is mounted, and umount it - try: - p = sh.findmnt("-n", device) - if p.exit_code == 0: - sh.umount(device) - except sh.ErrorReturnCode as e: - # error = 1 means "no found" - pass - # mount the device - sh.mkdir("-p", "/mnt/station_ramdisk/pendrive") - p = sh.mount(device, "/mnt/station_ramdisk/pendrive") - if p.exit_code != 0: - self.fail("failed: Unable to mount the USB memory device.") - # execute test - for i in range(int(self.__repetitions)): - # copy files + def removefileIfExist(self, fname): + if os.path.exists(fname): try: - p = sh.cp("/var/lib/hwtest-files/usbdatatest.bin", - "/var/lib/hwtest-files/usbdatatest.bin.md5", - "/mnt/station_ramdisk/pendrive") - except sh.ErrorReturnCode as e: - try: - sh.umount("/mnt/station_ramdisk/pendrive") - except sh.ErrorReturnCode: - pass - sh.rmdir("/mnt/station_ramdisk/pendrive") - self.fail("failed: Unable to copy files to the USB memory device.") - # check if the device is still mounted - if not os.path.ismount("/mnt/station_ramdisk/pendrive"): - sh.rm("/mnt/station_ramdisk/pendrive/*") - sh.rmdir("/mnt/station_ramdisk/pendrive") - self.fail("failed: USB device unmounted during/after copying files.") - # check MD5 - try: - p = sh.md5sum("/mnt/station_ramdisk/pendrive/usbdatatest.bin") - except sh.ErrorReturnCode as e: - try: - sh.umount("/mnt/station_ramdisk/pendrive") - except sh.ErrorReturnCode: - pass - sh.rmdir("/mnt/station_ramdisk/pendrive") - self.fail("failed: Unable to calculate MD5 of the copied file.") - newmd5 = p.stdout.decode().split(" ")[0] - with open('/mnt/station_ramdisk/pendrive/usbdatatest.bin.md5', 'r') as outfile: - oldmd5 = outfile.read().rstrip("\n") - outfile.close() - if newmd5 != oldmd5: - try: - sh.umount("/mnt/station_ramdisk/pendrive") - except sh.ErrorReturnCode: - pass - sh.rmdir("/mnt/station_ramdisk/pendrive") - self.fail("failed: MD5 check failed.") - # delete copied files - sh.rm("-f", "/mnt/station_ramdisk/pendrive/usbdatatest.bin", "/mnt/station_ramdisk/pendrive/usbdatatest.bin.md5") - # Finish + os.remove(fname) + except OSError as error: + return False, str(error) + return True, '' + + def checkfiles(self): + if not os.path.isfile('{}/{}'.format(self.__filepath_from, self.__file_source)): + return False, 'File test binary Not found {}/{}'.format(self.__filepath_from, self.__file_source) + if not os.path.isfile('{}/{}'.format(self.__filepath_from, self.__file_md5)): + return False, 'File test binary md5 Not found {}/{}'.format(self.__filepath_from, self.__file_md5) + if self.__check_mounted != '': + if not os.path.ismount('{}'.format(self.__check_mounted)): + return False, 'Required mounted failed: {}'.format(self.__path_to) + r, s = self.removefileIfExist('{}/{}'.format(self.__path_to, self.__file_source)) + if not r: + return r, s + return True, '' + + def getTestFile_md5(self, fname): + t = '' + with open('{}'.format(fname), 'r') as outfile: + t = outfile.read().rstrip("\n") + outfile.close() + return t + + def ExecuteTranfer(self): try: - sh.umount("/mnt/station_ramdisk/pendrive") - except sh.ErrorReturnCode: - pass - sh.rmdir("/mnt/station_ramdisk/pendrive") + originalMD5 = self.getTestFile_md5('{}/{}'.format(self.__filepath_from, self.__file_md5)) + if originalMD5 == '': + self.fail('MD5 file {}/{} Not contain any md5'.format(self.__filepath_from, self.__file_md5)) + # shutil.copy2('{}/{}'.format(self.__filepath_from, self.__file_source), '{}'.format(self.__path_to)) + sh.cp('{}/{}'.format(self.__filepath_from, self.__file_source), '{}'.format(self.__path_to)) + sh.sync() + md5val = md5_file('{}/{}'.format(self.__path_to, self.__file_source)) + self.removefileIfExist('{}/{}'.format(self.__path_to, self.__file_source)) + if originalMD5 != md5val: + return False, 'md5 check failed {}<>{}'.format(originalMD5, md5val) + except OSError as why: + return False,'Error: {} tranfer failed'.format(why.errno) + except Exception as details: + return False, 'USB Exception: {} tranfer failed'.format(details) + return True, '' + + def execute(self): + v, m = self.checkfiles() + if not v: + self.fail(m) + countLoops = int(self.__loops) + while countLoops > 0: + res, msg = self.ExecuteTranfer() + if not res: + res, msg = self.ExecuteTranfer() + if not res: + self.fail(msg) + countLoops = countLoops - 1 def getresults(self): return self.__resultlist diff --git a/test-cli/test/tests/qusbdual.py b/test-cli/test/tests/qusbdual.py deleted file mode 100644 index 05b22c3..0000000 --- a/test-cli/test/tests/qusbdual.py +++ /dev/null @@ -1,90 +0,0 @@ -import sh -import unittest -import os.path -import time - - -class Qusbdual(unittest.TestCase): - params = None - __resultlist = None # resultlist is a python list of python dictionaries - - def __init__(self, testname, testfunc, varlist): - self.params = varlist - super(Qusbdual, self).__init__(testfunc) - self._testMethodDoc = testname - if "repetitions" in varlist: - self.__repetitions = varlist["repetitions"] - else: - raise Exception('repetitions param inside Qusbdual must be defined') - self.__resultlist = [] - - def execute(self): - # check if file-as-filesystem exists - if not os.path.isfile('/var/lib/hwtest-files/mass-otg-test.img'): - self.fail("failed: Unable to find file-as-filesystem image.") - # generate mass storage gadget - p = sh.modprobe("g_mass_storage", "file=/var/lib/hwtest-files/mass-otg-test.img") - if p.exit_code != 0: - self.fail("failed: Unable to create a mass storage gadget.") - # wait to detect the new device - time.sleep(3) - # find the mass storage device - try: - p = sh.grep(sh.lsblk("-So", "NAME,VENDOR"), "Linux") - except sh.ErrorReturnCode: - sh.modprobe("-r", "g_mass_storage") - self.fail("failed: could not find any mass storage gadget") - device = p.stdout.decode().split(" ")[0] - # mount the mass storage gadget - sh.mkdir("-p", "/mnt/station_ramdisk/hdd_gadget") - p = sh.mount("-o", "ro", "/dev/" + device, "/mnt/station_ramdisk/hdd_gadget") - if p.exit_code != 0: - sh.modprobe("-r", "g_mass_storage") - self.fail("failed: Unable to mount the mass storage gadget.") - # execute test - for i in range(int(self.__repetitions)): - # copy files - try: - p = sh.cp("/mnt/station_ramdisk/hdd_gadget/usb-test.bin", - "/mnt/station_ramdisk/hdd_gadget/usb-test.bin.md5", - "/mnt/station_nfsdisk/") - except sh.ErrorReturnCode as e: - sh.umount("/mnt/station_ramdisk/hdd_gadget") - sh.rmdir("/mnt/station_ramdisk/hdd_gadget") - sh.modprobe("-r", "g_mass_storage") - self.fail("failed: Unable to copy files through USB.") - # check if the device is still mounted - if not os.path.ismount("/mnt/station_ramdisk/hdd_gadget"): - sh.rm("/mnt/station_ramdisk/hdd_gadget/*") - sh.rmdir("/mnt/station_ramdisk/hdd_gadget") - sh.modprobe("-r", "g_mass_storage") - self.fail("failed: USB device unmounted during/after copying files.") - # Check md5 - try: - p = sh.md5sum("/mnt/station_nfsdisk/usb-test.bin") - except sh.ErrorReturnCode as e: - sh.umount("/mnt/station_ramdisk/hdd_gadget") - sh.rmdir("/mnt/station_ramdisk/hdd_gadget") - sh.modprobe("-r", "g_mass_storage") - self.fail("failed: Unable to calculate MD5 of the copied file.") - newmd5 = p.stdout.decode().split(" ")[0] - with open('/mnt/station_ramdisk/hdd_gadget/usb-test.bin.md5', 'r') as outfile: - oldmd5 = outfile.read().rstrip("\n") - outfile.close() - if newmd5 != oldmd5: - sh.umount("/mnt/station_ramdisk/hdd_gadget") - sh.rmdir("/mnt/station_ramdisk/hdd_gadget") - sh.modprobe("-r", "g_mass_storage") - self.fail("failed: MD5 check failed.") - # delete copied files - sh.rm("-f", "/mnt/station_nfsdisk/usb-test.bin", "/mnt/station_nfsdisk/usb-test.bin.md5") - # Finish - sh.umount("/mnt/station_ramdisk/hdd_gadget") - sh.rmdir("/mnt/station_ramdisk/hdd_gadget") - sh.modprobe("-r", "g_mass_storage") - - def getresults(self): - return self.__resultlist - - def gettextresult(self): - return "" diff --git a/test-cli/test/tests/qwifi.py b/test-cli/test/tests/qwifi.py index 3dd9e38..4695041 100644 --- a/test-cli/test/tests/qwifi.py +++ b/test-cli/test/tests/qwifi.py @@ -1,9 +1,11 @@ import unittest -import sh -import re -import json import time - +import uuid +import iperf3 +import netifaces +from test.helpers.iw import iwScan +from test.helpers.utils import save_json_to_disk +from test.helpers.utils import station2Port class Qwifi(unittest.TestCase): __serverip = None @@ -19,81 +21,76 @@ class Qwifi(unittest.TestCase): def __init__(self, testname, testfunc, varlist): self.params = varlist super(Qwifi, self).__init__(testfunc) - if "serverip" in varlist: - self.__serverip = varlist["serverip"] - else: - raise Exception('serverip param inside Qwifi have been be defined') - if "bwexpected" in varlist: - self.__bwexpected = varlist["bwexpected"] - else: - raise Exception('OKBW param inside Qwifi must be defined') - if "port" in varlist: - self.__port = varlist["port"] - else: - raise Exception('port param inside Qwifi must be defined') - self.__numbytestx = "10M" self._testMethodDoc = testname + self.__xmlObj = varlist["xml"] + self.__QwifiName = varlist.get('name', 'qwifi') + self.__loops = varlist.get('loops', self.__xmlObj.getKeyVal(self.__QwifiName, "loops", "1")) + self.__port = varlist.get('port', self.__xmlObj.getKeyVal(self.__QwifiName, "port", "5000")) + self.__bw = varlist.get('bwexpected', self.__xmlObj.getKeyVal(self.__QwifiName, "bwexpected", "5.0")) + self.__serverip = varlist.get('serverip', self.__xmlObj.getKeyVal(self.__QwifiName, "serverip", "localhost")) + self.__duration = varlist.get('duration', self.__xmlObj.getKeyVal(self.__QwifiName, "duration", "10")) + self.__interface = varlist.get('interface', self.__xmlObj.getKeyVal(self.__QwifiName, "interface", "wlan0")) + self.__toPath = varlist.get('to', self.__xmlObj.getKeyVal(self.__QwifiName, "to", "/mnt/station_ramdisk")) + self.__retriesCount = varlist.get('retries', self.__xmlObj.getKeyVal(self.__QwifiName, "retries", "5")) + self.__waitRetryTime = varlist.get('wait_retry', self.__xmlObj.getKeyVal(self.__QwifiName, "wait_retry", "10")) + self.__wifi_res_file = varlist.get('wifi_res_file', self.__xmlObj.getKeyVal(self.__QwifiName, "wifi_res_file", "wifi_test_{}.json")) + self.__wifi_st_file = varlist.get('wifi_st_file', self.__xmlObj.getKeyVal(self.__QwifiName, "wifi_st_file", "wifi_st_{}.json")) + self.__file_uuid = uuid.uuid4() + self.__wifi_res_file = self.__wifi_res_file.format(self.__file_uuid) + self.__wifi_st_file = self.__wifi_st_file.format(self.__file_uuid) self.__resultlist = [] - def execute(self): - # check if the board is connected to the router by wifi - p = sh.iw("wlan0", "link") - if p.exit_code == 0: - # get the first line of the output stream - out1 = p.stdout.decode('ascii').splitlines()[0] - if out1 != "Not connected.": - # check if the board has ip in the wlan0 interface - p = sh.ifconfig("wlan0") - if p.exit_code == 0: - # check if wlan0 has an IP - result = re.search( - 'inet addr:(?!127\.0{1,3}\.0{1,3}\.0{0,2}1$)((25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)', - p.stdout.decode('ascii')) - if result: - # execute iperf command against the server, but it implements attempts in case the server is busy - iperfdone = False - while not iperfdone: - try: - p = sh.iperf3("-c", self.__serverip, "-n", self.__numbytestx, "-f", "m", "-p", self.__port, - "-J", _timeout=20) - iperfdone = True - except sh.TimeoutException: - self.fail("failed: iperf timeout reached") - except sh.ErrorReturnCode: - time.sleep(self.timebetweenattempts) - - # check if it was executed succesfully - if p.exit_code == 0: - if p.stdout == "": - self.fail("failed: error executing iperf command") - # analyze output string - data = json.loads(p.stdout.decode('ascii')) - self.__bwreal = float(data['end']['sum_received']['bits_per_second']) / 1024 / 1024 - # save result file - with open('/mnt/station_ramdisk/wifi-iperf3.json', 'w') as outfile: - json.dump(data, outfile, indent=4) - outfile.close() - self.__resultlist.append( - { - "description": "iperf3 output", - "filepath": "/mnt/station_ramdisk/wifi-iperf3.json", - "mimetype": "application/json" - } - ) + def checkInterface(self, reqInterface): + ifaces = netifaces.interfaces() + for t in ifaces: + if t == reqInterface: + return True + return False - # check if BW is in the expected range - if self.__bwreal < float(self.__bwexpected): - self.fail("failed: speed is lower than expected. Speed(Mbits/s): " + str(self.__bwreal)) - else: - self.fail("failed: could not complete iperf3 command") - else: - self.fail("failed: wlan0 interface doesn't have any ip address.") + def execute(self): + self.__resultlist = [] + stationList = {} + # check interfaces + if not self.checkInterface(self.__interface): + self.fail('Interface not found: {}'.format(self.__interface)) + # scan networks + scanner = iwScan(self.__interface) + if scanner.scan(): + stationList = scanner.getStations() + # check if we are connected + if not scanner.findConnected(): + self.fail('Not connected to test AP') + else: + strError = scanner.getLastError() + self.fail('qWifi scanner Execution error: {}'.format(strError)) + # Start Iperf + client = iperf3.Client() + client.duration = int(self.__duration) + client.server_hostname = self.__serverip + client.port = station2Port(self.__port) + count = int(self.__retriesCount) + while count > 0: + result = client.run() + if result.error == None: + if result.received_Mbps >= float(self.__bw) and result.sent_Mbps >= float(self.__bw): + save_json_to_disk(filePath='{}/{}'.format(self.__toPath, self.__wifi_st_file), + description='wifi scan', + mime='application/json', + json_data=stationList, + result=self.__resultlist) + save_json_to_disk(filePath='{}/{}'.format(self.__toPath, self.__wifi_res_file), + description='wifi {} iperf3'.format(self.__interface), + mime='application/json', + json_data=result.json, + result=self.__resultlist) + return else: - self.fail("failed: could not complete ifconfig command.") + self.fail('bw fail: rx:{} tx:{}'.format(result.received_Mbps, result.sent_Mbps)) else: - self.fail("failed: wifi module is not connected to the router.") - else: - self.fail("failed: could not execute iw command") + count = count - 1 + time.sleep(int(self.__waitRetryTime)) + + self.fail('qWifi (max tries) Execution error: {}'.format(result.error)) def getresults(self): return self.__resultlist -- cgit v1.1 From 227d9783fe8327b84ac3b0e032f012ba144816f8 Mon Sep 17 00:00:00 2001 From: Manel Caro Date: Fri, 31 Jul 2020 13:58:41 +0200 Subject: IGEP0048: added plc test --- test-cli/test/tests/qamper.py | 2 +- test-cli/test/tests/qplc.py | 58 +++++++++++++++++++++++++++++++------------ 2 files changed, 43 insertions(+), 17 deletions(-) (limited to 'test-cli/test/tests') diff --git a/test-cli/test/tests/qamper.py b/test-cli/test/tests/qamper.py index df340eb..96b673c 100644 --- a/test-cli/test/tests/qamper.py +++ b/test-cli/test/tests/qamper.py @@ -39,7 +39,7 @@ class Qamper(unittest.TestCase): ) def execute(self): - if self.dummytest: + if self.dummytest == '1': self.current = "0.0" self.saveresultinfile(self.current) return diff --git a/test-cli/test/tests/qplc.py b/test-cli/test/tests/qplc.py index 01258a9..a70ae3a 100644 --- a/test-cli/test/tests/qplc.py +++ b/test-cli/test/tests/qplc.py @@ -3,21 +3,26 @@ import unittest import os.path import os import sh +import stat from test.helpers.md5 import md5_file from test.helpers.plc import dcpPLC +from test.helpers.iseelogger import logObj class Qplc(unittest.TestCase): params = None __resultlist = None # resultlist is a python list of python dictionaries __QPLCName = None __plc = None + __board_uuid = None def __init__(self, testname, testfunc, varlist): self.params = varlist super(Qplc, self).__init__(testfunc) self._testMethodDoc = testname self.__xmlObj = varlist["xml"] + self.__psdbObj = varlist["db"] + self.__board_uuid = varlist["boarduuid"] self.__QPLCName = varlist.get('name', 'qplc') self.__firmware = varlist.get('firmware', self.__xmlObj.getKeyVal(self.__QPLCName, "firmware", "")) @@ -26,27 +31,32 @@ class Qplc(unittest.TestCase): self.__gen_ip = varlist.get('gen_ip', self.__xmlObj.getKeyVal(self.__QPLCName, "gen_ip", "0")) self.__gen_mac = varlist.get('gen_mac', self.__xmlObj.getKeyVal(self.__QPLCName, "gen_mac", "0")) self.__mtd_device = varlist.get('mtd_device', self.__xmlObj.getKeyVal(self.__QPLCName, "mtd_device", "/dev/mtd0")) + self.__firmware_Path = varlist.get('firmwarepath', self.__xmlObj.getKeyVal(self.__QPLCName, "firmwarepath", "/root/hwtest-files/firmware")) + self.__plc_test_ip = varlist.get('plc_test_ip', self.__xmlObj.getKeyVal(self.__QPLCName, "plc_test_ip", "192.168.60.91")) + self.__skipflash = varlist.get('skipflash', self.__xmlObj.getKeyVal(self.__QPLCName, "skipflash", "0")) self.__plc = dcpPLC(self.__factoryTool, self.__mtd_device) - + self.__factory_password = varlist.get('factory_password', self.__xmlObj.getKeyVal(self.__QPLCName, "factory_password", "0")) + self.__firmware_password = varlist.get('firmware_password', self.__xmlObj.getKeyVal(self.__QPLCName, "firmware_password", "0")) self.__resultlist = [] def checkfiles(self): - if not os.path.isfile('{}/{}'.format(self.__filepath_from, self.__file_source)): - return False, 'File test binary Not found {}/{}'.format(self.__filepath_from, self.__file_source) - if not os.path.isfile('{}/{}'.format(self.__filepath_from, self.__file_md5)): - return False, 'File test binary md5 Not found {}/{}'.format(self.__filepath_from, self.__file_md5) - if self.__check_mounted != '': - if not os.path.ismount('{}'.format(self.__check_mounted)): - return False, 'Required mounted failed: {}'.format(self.__path_to) - r, s = self.removefileIfExist('{}/{}'.format(self.__path_to, self.__file_source)) - if not r: - return r, s + if not os.path.isfile(self.__factoryTool): + return False, 'Factory tool Not found {}/{}'.format(self.__factoryTool) + if not os.path.isfile('{}/{}'.format(self.__firmware_Path, self.__firmware)): + return False, 'Firmware Not found {}/{}'.format(self.__firmware_Path, self.__firmware) + if not os.path.isfile('{}/{}'.format(self.__firmware_Path, self.__firmware_md5)): + return False, 'Firmware md5 Not found {}/{}'.format(self.__firmware_Path, self.__firmware_md5) + try: + mode = os.stat(self.__mtd_device).st_mode; + if not stat.S_ISCHR(mode): + return False, 'No device {} found'.format(self.__mtd_device) + except: + return False, 'No device {} found'.format(self.__mtd_device) return True, '' def __flashMemory(self): - self.__plc.setSaveFirmwareMode() - r, v = self.__plc.SaveFirmware(self.__firmware) + r, v = self.__plc.SaveFirmware('{}/{}'.format(self.__firmware_Path, self.__firmware)) if not r: self.fail(v) @@ -56,12 +66,28 @@ class Qplc(unittest.TestCase): if not v: self.fail(m) # do flash & verify - self.__flashMemory() + if self.__skipflash == '0': + self.__flashMemory() # do Plc boot self.__plc.setBootMode() # Wait plc goes UP - - + if self.__gen_mac: + res, v = self.__psdbObj.get_plc_macaddr(self.__board_uuid) + if res == None and v == 'MAC limit by uuid': + self.fail('MAC limit by uuid') + elif res == None: + self.fail('BUG: ?????') + logObj.getlogger().info("MAC {}".format(res[0])) + plcMAC = res[0][0] + self.__plc.set_plc2('SYSTEM.PRODUCTION.SECTOR0_UNLOCK_PASSWORD', + self.__factory_password, + 'SYSTEM.PRODUCTION.MAC_ADDR', + '{}'.format(plcMAC), + self.__firmware_password) + #self.__plc.set_plc('SYSTEM.PRODUCTION.SECTOR0_UNLOCK_PASSWORD', self.__factory_password, self.__firmware_password) + #plcMAC = res[0][0] + #self.__plc.set_plc('SYSTEM.PRODUCTION.MAC_ADDR', '{}'.format(plcMAC), self.__firmware_password) + self.__plc.setPLCReset() def getresults(self): return self.__resultlist -- cgit v1.1 From f5f3398291c40bcf0a8fdedb48d4821e8e331c3c Mon Sep 17 00:00:00 2001 From: Hector Fernandez Date: Thu, 20 Aug 2020 14:17:49 +0200 Subject: Added ping check in PLC test. --- test-cli/test/tests/qplc.py | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) (limited to 'test-cli/test/tests') diff --git a/test-cli/test/tests/qplc.py b/test-cli/test/tests/qplc.py index a70ae3a..27bbbea 100644 --- a/test-cli/test/tests/qplc.py +++ b/test-cli/test/tests/qplc.py @@ -4,6 +4,8 @@ import os.path import os import sh import stat +import time +import ping3 from test.helpers.md5 import md5_file from test.helpers.plc import dcpPLC @@ -32,11 +34,12 @@ class Qplc(unittest.TestCase): self.__gen_mac = varlist.get('gen_mac', self.__xmlObj.getKeyVal(self.__QPLCName, "gen_mac", "0")) self.__mtd_device = varlist.get('mtd_device', self.__xmlObj.getKeyVal(self.__QPLCName, "mtd_device", "/dev/mtd0")) self.__firmware_Path = varlist.get('firmwarepath', self.__xmlObj.getKeyVal(self.__QPLCName, "firmwarepath", "/root/hwtest-files/firmware")) - self.__plc_test_ip = varlist.get('plc_test_ip', self.__xmlObj.getKeyVal(self.__QPLCName, "plc_test_ip", "192.168.60.91")) + self.__plc_test_ip = varlist.get('plc_test_ip', self.__xmlObj.getKeyVal(self.__QPLCName, "plc_test_ip", "10.10.1.254")) self.__skipflash = varlist.get('skipflash', self.__xmlObj.getKeyVal(self.__QPLCName, "skipflash", "0")) self.__plc = dcpPLC(self.__factoryTool, self.__mtd_device) self.__factory_password = varlist.get('factory_password', self.__xmlObj.getKeyVal(self.__QPLCName, "factory_password", "0")) self.__firmware_password = varlist.get('firmware_password', self.__xmlObj.getKeyVal(self.__QPLCName, "firmware_password", "0")) + self.__plc_reset_wait = varlist.get('plc_reset_wait', self.__xmlObj.getKeyVal(self.__QPLCName, "plc_reset_wait", "60")) self.__resultlist = [] @@ -88,6 +91,14 @@ class Qplc(unittest.TestCase): #plcMAC = res[0][0] #self.__plc.set_plc('SYSTEM.PRODUCTION.MAC_ADDR', '{}'.format(plcMAC), self.__firmware_password) self.__plc.setPLCReset() + time.sleep(int(self.__plc_reset_wait)) + # ping against the GPLC0000 board + ping3.EXCEPTIONS = True + try: + ping3.ping(self.__plc_test_ip, timeout=0.1) + except: + self.fail('PLC ping timeout') + def getresults(self): return self.__resultlist -- cgit v1.1 From 8765219910c95d0712ac0b2c27d33fd7a21fe3b8 Mon Sep 17 00:00:00 2001 From: Hector Fernandez Date: Mon, 14 Sep 2020 17:40:45 +0200 Subject: IGEP0048: Solved PLC test error. The PLC MAC address can be changed now. --- test-cli/test/tests/qplc.py | 1 + 1 file changed, 1 insertion(+) (limited to 'test-cli/test/tests') diff --git a/test-cli/test/tests/qplc.py b/test-cli/test/tests/qplc.py index 27bbbea..48d600a 100644 --- a/test-cli/test/tests/qplc.py +++ b/test-cli/test/tests/qplc.py @@ -82,6 +82,7 @@ class Qplc(unittest.TestCase): self.fail('BUG: ?????') logObj.getlogger().info("MAC {}".format(res[0])) plcMAC = res[0][0] + time.sleep(12) self.__plc.set_plc2('SYSTEM.PRODUCTION.SECTOR0_UNLOCK_PASSWORD', self.__factory_password, 'SYSTEM.PRODUCTION.MAC_ADDR', -- cgit v1.1 From 6550286c1f6ce5b1dbfc718b5566af3012ab538a Mon Sep 17 00:00:00 2001 From: Hector Fernandez Date: Tue, 15 Sep 2020 16:29:14 +0200 Subject: IGEP0048: PLC test: Variable IP of the connected PLC. Use discover before changing MAC address. --- test-cli/test/tests/qplc.py | 27 ++++++++++++++++++++------- 1 file changed, 20 insertions(+), 7 deletions(-) (limited to 'test-cli/test/tests') diff --git a/test-cli/test/tests/qplc.py b/test-cli/test/tests/qplc.py index 48d600a..28310c6 100644 --- a/test-cli/test/tests/qplc.py +++ b/test-cli/test/tests/qplc.py @@ -6,6 +6,8 @@ import sh import stat import time import ping3 +from netifaces import AF_INET +import netifaces as ni from test.helpers.md5 import md5_file from test.helpers.plc import dcpPLC @@ -34,7 +36,6 @@ class Qplc(unittest.TestCase): self.__gen_mac = varlist.get('gen_mac', self.__xmlObj.getKeyVal(self.__QPLCName, "gen_mac", "0")) self.__mtd_device = varlist.get('mtd_device', self.__xmlObj.getKeyVal(self.__QPLCName, "mtd_device", "/dev/mtd0")) self.__firmware_Path = varlist.get('firmwarepath', self.__xmlObj.getKeyVal(self.__QPLCName, "firmwarepath", "/root/hwtest-files/firmware")) - self.__plc_test_ip = varlist.get('plc_test_ip', self.__xmlObj.getKeyVal(self.__QPLCName, "plc_test_ip", "10.10.1.254")) self.__skipflash = varlist.get('skipflash', self.__xmlObj.getKeyVal(self.__QPLCName, "skipflash", "0")) self.__plc = dcpPLC(self.__factoryTool, self.__mtd_device) self.__factory_password = varlist.get('factory_password', self.__xmlObj.getKeyVal(self.__QPLCName, "factory_password", "0")) @@ -82,25 +83,37 @@ class Qplc(unittest.TestCase): self.fail('BUG: ?????') logObj.getlogger().info("MAC {}".format(res[0])) plcMAC = res[0][0] - time.sleep(12) + # wait for PLC to be turned on + attemptcounter = 0 + plcfound = False + while attemptcounter < 5 and not plcfound: + if self.__plc.discover(): + plcfound = True + else: + attemptcounter += 1 + time.sleep(5) + # mandatory wait before configuring the PLC chip + time.sleep(2) self.__plc.set_plc2('SYSTEM.PRODUCTION.SECTOR0_UNLOCK_PASSWORD', self.__factory_password, 'SYSTEM.PRODUCTION.MAC_ADDR', '{}'.format(plcMAC), self.__firmware_password) - #self.__plc.set_plc('SYSTEM.PRODUCTION.SECTOR0_UNLOCK_PASSWORD', self.__factory_password, self.__firmware_password) - #plcMAC = res[0][0] - #self.__plc.set_plc('SYSTEM.PRODUCTION.MAC_ADDR', '{}'.format(plcMAC), self.__firmware_password) + # plc reset to save changes self.__plc.setPLCReset() time.sleep(int(self.__plc_reset_wait)) + # calculate the IP of the GPLC0000 board to ping + ip_eth01 = ni.ifaddresses('eth0:1')[AF_INET][0]['addr'] # plc_test_ip = 10.10.1.113 + ip_eth01_splited = ip_eth01.split('.') + ip_eth01_splited[3] = str(int(ip_eth01_splited[3]) + 100) + plc_test_ip = ".".join(ip_eth01_splited) # plc_test_ip = 10.10.1.213 # ping against the GPLC0000 board ping3.EXCEPTIONS = True try: - ping3.ping(self.__plc_test_ip, timeout=0.1) + ping3.ping(plc_test_ip, timeout=0.1) except: self.fail('PLC ping timeout') - def getresults(self): return self.__resultlist -- cgit v1.1 From d8d4684c24a7c34334bb0b1d74dead69282e8c46 Mon Sep 17 00:00:00 2001 From: Josep Maso Date: Wed, 30 Sep 2020 10:32:16 +0200 Subject: Modified audio test to work in IGEP0046. --- test-cli/test/tests/qaudio.py | 5 +++++ 1 file changed, 5 insertions(+) (limited to 'test-cli/test/tests') diff --git a/test-cli/test/tests/qaudio.py b/test-cli/test/tests/qaudio.py index 2c86809..a219a1a 100644 --- a/test-cli/test/tests/qaudio.py +++ b/test-cli/test/tests/qaudio.py @@ -50,6 +50,11 @@ class Qaudio(unittest.TestCase): time.sleep(self.__run_delay) p1 = sh.aplay(self.__dtmf_file_play) p2.wait() + + p2 = sh.arecord("-r", self.__sampling_rate, "-d", calcRecordTime, "{}/{}".format(self.__record_path, self.__dtmf_file_record), _bg=True) + time.sleep(self.__run_delay) + p1 = sh.aplay(self.__dtmf_file_play) + p2.wait() if p1.exit_code == 0 and p2.exit_code == 0: p = sh.multimon("-t", "wav", "-a", "DTMF", "{}/{}".format(self.__record_path, self.__dtmf_file_record), "-q") if p.exit_code == 0: -- cgit v1.1 From ca73fcc336edc23db5750e93b6e1014d32a54ea5 Mon Sep 17 00:00:00 2001 From: Hector Fernandez Date: Tue, 29 Sep 2020 13:27:32 +0200 Subject: Added new test to validate video output with a webcam. --- test-cli/test/tests/qvideo.py | 124 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 124 insertions(+) create mode 100644 test-cli/test/tests/qvideo.py (limited to 'test-cli/test/tests') diff --git a/test-cli/test/tests/qvideo.py b/test-cli/test/tests/qvideo.py new file mode 100644 index 0000000..e2f413e --- /dev/null +++ b/test-cli/test/tests/qvideo.py @@ -0,0 +1,124 @@ +import unittest +import sh +import time +from test.helpers.camara import Camara +from test.helpers.detect import Detect_Color +from test.helpers.sdl import SDL2_Test + +class Qvideo(unittest.TestCase): + params = None + __resultlist = None # resultlist is a python list of python dictionaries + + def __init__(self, testname, testfunc, varlist): + self.params = varlist + super(Qvideo, self).__init__(testfunc) + self._testMethodDoc = testname + self.__xmlObj = varlist["xml"] + self.__QVideoName = varlist.get('name', 'qvideo') + self.__resultlist = [] + self.__Camara = Camara() + self.__SDL2_Test = SDL2_Test() + self.__SDL2_Test.Clear() + + def define_capture(self): + self.__Camara.setSize( + int(self.params.get('capture_size_w', self.__xmlObj.getKeyVal(self.__QVideoName, "capture_size_w", 1280))), + int(self.params.get('capture_size_h', self.__xmlObj.getKeyVal(self.__QVideoName, "capture_size_h", 720)))) + self.__discard_frames_Count = int(self.params.get('capture_discardframes', self.__xmlObj.getKeyVal(self.__QVideoName, "capture_discardframes", 3))) + self.__frame_mean = int(self.params.get('capture_framemean', self.__xmlObj.getKeyVal(self.__QVideoName, "capture_framemean", 3))) + self.__max_failed = int(self.params.get('capture_maxfailed', self.__xmlObj.getKeyVal(self.__QVideoName, "capture_maxfailed", 1))) + + def __drop_frames(self, frame_count): + count = frame_count + while count > 0: + _ = self.__Camara.getFrame() + count -= 1 + + def Verify_Color(self, color): + self.paintColor(color) + count = self.__frame_mean + res_ok = 0 + res_failed = 0 + r_count = 0 + c_mean = 0 + while count > 0: + res, t = self.Capture(color) + if res: + res_ok += 1 + else: + res_failed += 1 + if t == "count": + r_count += 1 + else: + c_mean +=1 + count -= 1 + self.unpaintColor(color) + if res_failed > self.__max_failed: + if r_count > 0: + return '{}_COLOR_FAILED'.format(color) + elif c_mean > 0: + return 'MEAN_{}_FAILED'.format(color) + return "ok" + + def paintColor(self, color): + self.__SDL2_Test.Paint(color) + time.sleep(3) + self.__drop_frames(self.__Camara.getFrameCount()) + + def unpaintColor(self, color): + self.__SDL2_Test.Clear() + time.sleep(1) + + def Capture(self, color): + image = self.__Camara.getFrame() + if image is None: + return None + dtObject = Detect_Color(image) + if color == 'RED': + _, _, mean, count = dtObject.getRed() + elif color == 'BLUE': + _, _, mean, count = dtObject.getBlue() + elif color == 'GREEN': + c_mask, croped, mean, count = dtObject.getGreen() + + return self.checkThreshold(color, mean, count) + + def checkThreshold(self, color, mean, count): + min_count = int( + self.params.get('{}_min_count'.format(color), + self.__xmlObj.getKeyVal(self.__QVideoName, '{}_min_count'.format(color), 50000))) + + str = "" + # first verify count + if count >= min_count: + return True, "" + else: + str = "count" + return False, str + + def execute(self): + self.__resultlist = [] + + # set image size + self.define_capture() + # Open camara + if not self.__Camara.Open(): + self.fail('Error: USB camera not found') + # discard initial frames + self.__drop_frames(self.__discard_frames_Count) + # Test + res = self.Verify_Color('RED') + if res != "ok": + self.fail("failed: RED verification failed") + res = self.Verify_Color('BLUE') + if res != "ok": + self.fail("failed: BLUE verification failed") + res = self.Verify_Color('GREEN') + if res != "ok": + self.fail("failed: GREEN verification failed") + + def getresults(self): + return self.__resultlist + + def gettextresult(self): + return "" -- cgit v1.1 From d4071b6579312b2fb265e379031f588354d35f3a Mon Sep 17 00:00:00 2001 From: Manel Caro Date: Fri, 2 Oct 2020 14:12:09 +0200 Subject: solve usb error bug with quotes message --- test-cli/test/tests/qusb.py | 2 ++ 1 file changed, 2 insertions(+) (limited to 'test-cli/test/tests') diff --git a/test-cli/test/tests/qusb.py b/test-cli/test/tests/qusb.py index 3182685..4f76d0c 100644 --- a/test-cli/test/tests/qusb.py +++ b/test-cli/test/tests/qusb.py @@ -69,6 +69,8 @@ class Qusb(unittest.TestCase): return False, 'md5 check failed {}<>{}'.format(originalMD5, md5val) except OSError as why: return False,'Error: {} tranfer failed'.format(why.errno) + except sh.ErrorReturnCode as shError: + return False, 'USB Exception: tranfer failed' except Exception as details: return False, 'USB Exception: {} tranfer failed'.format(details) return True, '' -- cgit v1.1 From 5771dcc8acabd2f4f560768cf45615e929409f6e Mon Sep 17 00:00:00 2001 From: Manel Caro Date: Fri, 2 Oct 2020 15:33:12 +0200 Subject: fix execution camera setup script path not correct --- test-cli/test/tests/qvideo.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'test-cli/test/tests') diff --git a/test-cli/test/tests/qvideo.py b/test-cli/test/tests/qvideo.py index e2f413e..b4133b6 100644 --- a/test-cli/test/tests/qvideo.py +++ b/test-cli/test/tests/qvideo.py @@ -16,7 +16,7 @@ class Qvideo(unittest.TestCase): self.__xmlObj = varlist["xml"] self.__QVideoName = varlist.get('name', 'qvideo') self.__resultlist = [] - self.__Camara = Camara() + self.__Camara = Camara(setup_script_path=varlist['testPath']) self.__SDL2_Test = SDL2_Test() self.__SDL2_Test.Clear() -- cgit v1.1 From e7631181c08c38c196558fc79b648d78ccaadcc8 Mon Sep 17 00:00:00 2001 From: Manel Caro Date: Mon, 5 Oct 2020 11:44:22 +0200 Subject: Add qvideo variables --- test-cli/test/tests/qvideo.py | 25 +++++++++++++------------ 1 file changed, 13 insertions(+), 12 deletions(-) (limited to 'test-cli/test/tests') diff --git a/test-cli/test/tests/qvideo.py b/test-cli/test/tests/qvideo.py index b4133b6..225940b 100644 --- a/test-cli/test/tests/qvideo.py +++ b/test-cli/test/tests/qvideo.py @@ -16,17 +16,20 @@ class Qvideo(unittest.TestCase): self.__xmlObj = varlist["xml"] self.__QVideoName = varlist.get('name', 'qvideo') self.__resultlist = [] - self.__Camara = Camara(setup_script_path=varlist['testPath']) - self.__SDL2_Test = SDL2_Test() - self.__SDL2_Test.Clear() + self.__w = int(varlist.get('capture_size_w', self.__xmlObj.getKeyVal(self.__QVideoName, "capture_size_w", 1280))) + self.__h = int(varlist.get('capture_size_h', self.__xmlObj.getKeyVal(self.__QVideoName, "capture_size_h", 720))) + self.__discard_frames_Count = int(varlist.get('capture_discardframes',self.__xmlObj.getKeyVal(self.__QVideoName, "capture_discardframes", 3))) + self.__frame_mean = int(varlist.get('capture_framemean', self.__xmlObj.getKeyVal(self.__QVideoName, "capture_framemean", 3))) + self.__max_failed = int(varlist.get('capture_maxfailed', self.__xmlObj.getKeyVal(self.__QVideoName, "capture_maxfailed", 1))) + self.__cam_setupscript = varlist.get('cam_setupfile', self.__xmlObj.getKeyVal(self.__QVideoName, "cam_setupfile", + "/root/hwtest-files/board/scripts/v4l-cam.sh")) + self.__sdl_display = varlist.get('sdl_display', self.__xmlObj.getKeyVal(self.__QVideoName, "sdl_display", ":0")) + self.__sdl_driver = varlist.get('sdl_driver', self.__xmlObj.getKeyVal(self.__QVideoName, "sdl_driver", "x11")) + self.__camdevice = varlist.get('camdevice', self.__xmlObj.getKeyVal(self.__QVideoName, "camdevice", "video0")) - def define_capture(self): - self.__Camara.setSize( - int(self.params.get('capture_size_w', self.__xmlObj.getKeyVal(self.__QVideoName, "capture_size_w", 1280))), - int(self.params.get('capture_size_h', self.__xmlObj.getKeyVal(self.__QVideoName, "capture_size_h", 720)))) - self.__discard_frames_Count = int(self.params.get('capture_discardframes', self.__xmlObj.getKeyVal(self.__QVideoName, "capture_discardframes", 3))) - self.__frame_mean = int(self.params.get('capture_framemean', self.__xmlObj.getKeyVal(self.__QVideoName, "capture_framemean", 3))) - self.__max_failed = int(self.params.get('capture_maxfailed', self.__xmlObj.getKeyVal(self.__QVideoName, "capture_maxfailed", 1))) + self.__Camara = Camara(setup_script_path=self.__cam_setupscript, device=self.__camdevice, width=self.__w, height=self.__h) + self.__SDL2_Test = SDL2_Test(driver=self.__sdl_driver, display=self.__sdl_display, w=self.__w, h=self.__h) + self.__SDL2_Test.Clear() def __drop_frames(self, frame_count): count = frame_count @@ -99,8 +102,6 @@ class Qvideo(unittest.TestCase): def execute(self): self.__resultlist = [] - # set image size - self.define_capture() # Open camara if not self.__Camara.Open(): self.fail('Error: USB camera not found') -- cgit v1.1