From a615dac03a9ea7897bf3947ba8d31278b2cea121 Mon Sep 17 00:00:00 2001 From: Hector Fernandez Date: Thu, 5 Mar 2020 16:29:31 +0100 Subject: Modify tests. --- test-cli/setup.xml | 2 +- test-cli/test/tests/qeeprom.py | 62 +++++++++++++++------------- test-cli/test/tests/qethernet.py | 24 +++++++---- test-cli/test/tests/qwifi.py | 88 +++++++++++++++++++++++++++++++--------- test-cli/test_main.py | 30 ++++++-------- 5 files changed, 132 insertions(+), 74 deletions(-) diff --git a/test-cli/setup.xml b/test-cli/setup.xml index 3feffd0..db9e7db 100644 --- a/test-cli/setup.xml +++ b/test-cli/setup.xml @@ -2,7 +2,7 @@ - + diff --git a/test-cli/test/tests/qeeprom.py b/test-cli/test/tests/qeeprom.py index 2bab248..5bb0755 100644 --- a/test-cli/test/tests/qeeprom.py +++ b/test-cli/test/tests/qeeprom.py @@ -1,38 +1,44 @@ -from test.helpers.syscmd import SysCommand +import sh import unittest -import uuid + class Qeeprom(unittest.TestCase): + __position = None + __uuid = None + __eeprompath = None + + # varlist: position, uuid, eeprompath def __init__(self, testname, testfunc, varlist): super(Qeeprom, self).__init__(testfunc) self._testMethodDoc = testname + if "position" in varlist: + self.__position = varlist["position"] + else: + raise Exception('position param inside Qeeprom must be defined') + if "uuid" in varlist: + self.__uuid = varlist["uuid"] + else: + raise Exception('uuid param inside Qeeprom must be defined') + if "eeprompath" in varlist: + self.__eeprompath = varlist["eeprompath"] + else: + raise Exception('eeprompath param inside Qeeprom must be defined') def execute(self): - str_cmd = "find /sys/ -iname 'eeprom'" - eeprom_location = SysCommand("eeprom_location", str_cmd) - if eeprom_location.execute() == 0: - self.__raw_out = eeprom_location.getOutput() - if self.__raw_out == "": - self.fail("Unable to get EEPROM location. IS EEPROM CONNECTED?") - return -1 - eeprom=self.__raw_out.decode('ascii') - test_uuid = uuid.uuid4() - str_cmd="echo '{}' > {}".format(str(test_uuid), eeprom) - eeprom_write = SysCommand("eeprom_write", str_cmd) - if eeprom_write.execute() == 0: - self.__raw_out = eeprom_write.getOutput() - if self.__raw_out == "": - self.fail("Unable to write on the EEPROM?") - return -1 - str_cmd = "head -2 {}".format(eeprom) - eeprom_read = SysCommand("eeprom_read", str_cmd) - if eeprom_read.execute() == 0: - self.__raw_out = eeprom_read.getOutput() - if self.__raw_out == "": - self.fail("Unable to read from the EEPROM?") - return -1 - if(str(self.__raw_out).find(str(test_uuid)) == -1): - self.fail("failed: READ/WRITE mismatch") + # write data into the eeprom + p = sh.dd(sh.echo(self._uuid), "of=" + self.__eeprompath, "bs=1", "seek=" + self.__position) + if p.exit_code == 0: + # read data from the eeprom + p = sh.dd("if=" + self.__eeprompath, "bs=1", "skip=" + self.__position, "count=" + len(self.__uuid)) + if p.exit_code == 0: + uuid_rx = p.stdout.decode('ascii') + # compare both values + if (uuid_rx != self.__uuid): + self.fail("failed: mismatch between written and received values.") + + else: + self.fail("failed: Unable to read from the EEPROM device.") else: - self.fail("failed: could not complete find eeprom command") + self.fail("failed: Unable to write on the EEPROM device.") + diff --git a/test-cli/test/tests/qethernet.py b/test-cli/test/tests/qethernet.py index be984f5..22d796c 100644 --- a/test-cli/test/tests/qethernet.py +++ b/test-cli/test/tests/qethernet.py @@ -1,12 +1,14 @@ import unittest import sh -import ex +import re + class Qethernet(unittest.TestCase): __sip = None __numbytestx = None __bind = None __OKBW = None + __port = None def __init__(self, testname, testfunc, varlist): super(Qethernet, self).__init__(testfunc) @@ -22,16 +24,21 @@ class Qethernet(unittest.TestCase): self.__OKBW = varlist["OKBW"] else: raise Exception('OKBW param inside Qethernet must be defined') + if "port" in varlist: + self.__port = varlist["port"] + else: + raise Exception('port param inside Qethernet must be defined') self.__numbytestx = "10M" self._testMethodDoc = testname def execute(self): # execute iperf command against the server if self.__bind is None: - p = sh.iperf("-c", self.__sip, "-x", "CMSV", "-n", self.__numbytestx, "-f", "m") + p = sh.iperf("-c", self.__sip, "-x", "CMSV", "-n", self.__numbytestx, "-f", "m", "-p", self.__port) else: - p = sh.iperf("-c", self.__sip, "-x", "CMSV", "-n", self.__numbytestx, "-f", "m", "-B", self.__bind) - #check if it was executed succesfully + p = sh.iperf("-c", self.__sip, "-x", "CMSV", "-n", self.__numbytestx, "-f", "m", "-p", self.__port, "-B", + self.__bind) + # check if it was executed succesfully if p.exit_code == 0: if p.stdout == "": self.fail("failed: error executing iperf command") @@ -41,11 +48,12 @@ class Qethernet(unittest.TestCase): # get first line dat = lines[1].decode('ascii') # search for the BW value - a = re.search("\d+(\.\d)? Mbits/sec",dat) - b = a.group().split( ) - BWreal = b[0] + a = re.search("\d+(\.\d)? Mbits/sec", dat) + b = a.group().split() + bwreal = b[0] # check if BW is in the expected range - self.failUnless(float(BWreal)>float(self.__OKBW)*0.9,"failed: speed is lower than spected. Speed(MB/s)" + str(BWreal)) + self.failUnless(float(bwreal) > float(self.__OKBW) * 0.9, + "failed: speed is lower than spected. Speed(MB/s)" + str(bwreal)) else: self.fail("failed: could not complete iperf command") diff --git a/test-cli/test/tests/qwifi.py b/test-cli/test/tests/qwifi.py index 188e300..154dd52 100644 --- a/test-cli/test/tests/qwifi.py +++ b/test-cli/test/tests/qwifi.py @@ -1,33 +1,81 @@ -from test.helpers.syscmd import SysCommand import unittest +import sh +import re + class Qwifi(unittest.TestCase): + __sip = None + __numbytestx = None + __bind = None + __OKBW = None + __port = None + #varlist: sip, bind, OKBW, port def __init__(self, testname, testfunc, varlist): super(Qwifi, self).__init__(testfunc) - if "signal" in varlist: - self.__signal = varlist["signal"] + if "sip" in varlist: + self.__sip = varlist["sip"] + else: + raise Exception('sip param inside Qwifi have been be defined') + if "OKBW" in varlist: + self.__OKBW = varlist["OKBW"] + else: + raise Exception('OKBW param inside Qwifi must be defined') + if "port" in varlist: + self.__port = varlist["port"] else: - raise Exception('signal param inside Qwifi must be defined') + raise Exception('port param inside Qwifi must be defined') + if "bind" in varlist: + self.__bind = varlist["bind"] + else: + self.__bind = None + self.__numbytestx = "10M" self._testMethodDoc = testname def execute(self): - str_cmd= "iw wlan0 link" - wifi_stats = SysCommand("wifi_stats", str_cmd) - if wifi_stats.execute() == 0: - w_stats = wifi_stats.getOutput().decode('ascii').splitlines() - #self._longMessage = str(self.__raw_out).replace("'", "") - if not w_stats[0] == "Not connected.": - signal_st = w_stats[5].split(" ")[1] - try: - int(signal_st) - if -1*int(signal_st) > int(self.__signal): - self.fail("failed: signal to server lower than expected") - except ValueError: - self.fail("failed: error output (Bad connection?)") + # check if the board is connected to the router by wifi + p = sh.iw("wlan0", "link") + if p.exit_code == 0: + # get the first line of the output stream + out1 = p.stdout.decode('ascii').splitlines()[0] + if out1 != "Not connected.": + #check if the board has ip in the wlan0 interface + p = sh.ifconfig("wlan0") + if p.exit_code == 0: + result = re.search("inet addr:(?!127\.0{1,3}\.0{1,3}\.0{0,2}1$)((25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)", + p.stdout) + if result: + # execute iperf command against the server + if self.__bind is None: + p = sh.iperf("-c", self.__sip, "-x", "CMSV", "-n", self.__numbytestx, "-f", "m", "-p", + self.__port) + else: + p = sh.iperf("-c", self.__sip, "-x", "CMSV", "-n", self.__numbytestx, "-f", "m", "-p", + self.__port, "-B", self.__bind) + # check if it was executed succesfully + if p.exit_code == 0: + if p.stdout == "": + self.fail("failed: error executing iperf command") + # analyze output string + # split by lines + lines = p.stdout.splitlines() + # get first line + dat = lines[1].decode('ascii') + # search for the BW value + a = re.search("\d+(\.\d)? Mbits/sec", dat) + b = a.group().split() + bwreal = b[0] + + # check if BW is in the expected range + self.failUnless(float(bwreal) > float(self.__OKBW) * 0.9, + "failed: speed is lower than spected. Speed(MB/s)" + str(bwreal)) + else: + self.fail("failed: could not complete iperf command") + else: + self.fail("failed: wlan0 interface doesn't have any ip address.") + else: + self.fail("failed: could not complete ifconfig command.") else: - self.fail("failed: error output (Bad connection?)") - #tx_brate = float(w_stats[6].split(" ")[2]) + self.fail("failed: wifi module is not connected to the router.") else: self.fail("failed: couldn't execute iw command") - diff --git a/test-cli/test_main.py b/test-cli/test_main.py index 6680082..b657f00 100644 --- a/test-cli/test_main.py +++ b/test-cli/test_main.py @@ -32,15 +32,15 @@ import re from test.helpers.iseelogger import ISEE_Logger import logging - psdbObj = TestSrv_Database() xmlObj = None loggerObj = None + # define clear function def clear(): # check and make call for specific operating system - _ = call('clear' if os.name =='posix' else 'cls') + _ = call('clear' if os.name == 'posix' else 'cls') def create_board(): @@ -48,25 +48,21 @@ def create_board(): variant = xmlObj.gettagKey('board', 'variant') # get model id - globalVar.g_mid=model_id + "-" + variant + globalVar.g_mid = model_id + "-" + variant - # get station number - hstname = socket.gethostname() - if (re.search("^Station\d{2}$", hstname)): - globalVar.station = int(re.search("\d{2}", hstname).group(0)) - else: - raise Exception('Station number not configured') - exit(3) + # get station + globalVar.station = socket.gethostname() processor_id = genDieid(globalVar.g_mid) print(globalVar.g_mid) print(processor_id) - globalVar.g_uuid = psdbObj.create_board(processor_id, model_id, variant, bmac = None) + globalVar.g_uuid = psdbObj.create_board(processor_id, model_id, variant, bmac=None) + def createvarlist(testvars): varlist = {} for row in testvars: - varname,testparam = row + varname, testparam = row varlist[varname] = testparam return varlist @@ -75,13 +71,13 @@ def testsuite(): suite = unittest.TestSuite() tests = psdbObj.getboard_comp_test_list(globalVar.g_uuid) for row in tests: - testdefname,testfunc = row + testdefname, testfunc = row testvars = psdbObj.getboard_test_variables(globalVar.g_uuid, testdefname) varlist = createvarlist(testvars) command = "suite.addTest({}('{}','execute', varlist))".format(testfunc, testdefname) exec(command) - - globalVar.testid_ctl=psdbObj.open_testbatch(globalVar.g_uuid, globalVar.station) + + globalVar.testid_ctl = psdbObj.open_testbatch(globalVar.g_uuid, globalVar.station) return suite @@ -90,6 +86,7 @@ def finish_test(): # Update Set Test status with FINISH so that status column is not modified because close_testbatch already did it. psdbObj.update_set_test_row(globalVar.station, globalVar.testid_ctl, globalVar.g_uuid, 'END', 'FINISH') + def main(): create_board() try: @@ -120,6 +117,5 @@ if __name__ == "__main__": if psdbObj.open(xmlObj): main() else: - print("Error: Cannot open DB connection") + print("Error: Cannot open DB connection") exit(2) - -- cgit v1.1