From 09de774dcc1a5abc1c8f3a00fdb039aa3c522f52 Mon Sep 17 00:00:00 2001 From: Manel Caro Date: Wed, 4 Mar 2020 17:46:36 +0100 Subject: SOPA Initial Commit --- test-cli/setup.xml | 4 +- test-cli/test/helpers/cv_display_test.py | 28 +++---- test-cli/test/helpers/get_dieid.py | 2 - test-cli/test/helpers/globalVariables.py | 2 - test-cli/test/helpers/iseelogger.py | 39 ++++++++++ test-cli/test/helpers/psqldb.py | 19 ++--- test-cli/test/helpers/setup_xml.py | 9 ++- test-cli/test/helpers/syscmd.py | 19 +++-- test-cli/test/helpers/testsrv_db.py | 37 +++++---- test-cli/test/helpers/usb.sh | 14 ++++ test-cli/test/runners/simple.py | 26 ++++--- test-cli/test/tests/qamp.py | 47 +++++------- test-cli/test/tests/qaudio.py | 11 +-- test-cli/test/tests/qbutton.py | 19 +++-- test-cli/test/tests/qduplex_ser.py | 21 ++++-- test-cli/test/tests/qeeprom.py | 2 +- test-cli/test/tests/qethernet.py | 78 +++++++++---------- test-cli/test/tests/qflash.py | 5 +- test-cli/test/tests/qi2c.py | 12 ++- test-cli/test/tests/qnand.py | 22 ++++++ test-cli/test/tests/qram.py | 33 ++++---- test-cli/test/tests/qrtc.py | 9 ++- test-cli/test/tests/qscreen.py | 18 ++--- test-cli/test/tests/qserial.py | 15 +++- test-cli/test/tests/qusb.py | 17 +++-- test-cli/test/tests/qwifi.py | 52 +++++-------- test-cli/test_main.py | 125 ++++++++++++++++--------------- 27 files changed, 394 insertions(+), 291 deletions(-) create mode 100644 test-cli/test/helpers/iseelogger.py create mode 100755 test-cli/test/helpers/usb.sh create mode 100644 test-cli/test/tests/qnand.py diff --git a/test-cli/setup.xml b/test-cli/setup.xml index 41295b5..3feffd0 100644 --- a/test-cli/setup.xml +++ b/test-cli/setup.xml @@ -2,8 +2,8 @@ - - + + diff --git a/test-cli/test/helpers/cv_display_test.py b/test-cli/test/helpers/cv_display_test.py index b54a698..3a3004f 100644 --- a/test-cli/test/helpers/cv_display_test.py +++ b/test-cli/test/helpers/cv_display_test.py @@ -14,8 +14,7 @@ def pattern_detect(cam_device=0): # RETURN 0 only if the test is ok msg="0" # Capture the corresponding camera device [0,1] - #capture = cv2.VideoCapture(0) - capture = cv2.VideoCapture("/dev/v4l/by-id/usb-Creative_Technology_Ltd._Live__Cam_Sync_HD_VF0770-video-index0") + capture = cv2.VideoCapture(0) try: _, image = capture.read() except: @@ -64,12 +63,10 @@ def pattern_detect(cam_device=0): # After testing the gamma factor correction is computed with the green color # gamma = gamma_factor / blue+red into green part # gamma factor = 50-100 - #gamma = (100 / (avg_color_rawg[0] + avg_color_rawg[2] + 1)) - gamma=1 + gamma = (100 / (avg_color_rawg[0] + avg_color_rawg[2] + 1)) # Adjust the image acording to this gamma value adjusted = adjust_gamma(image, gamma=gamma) -# adjusted=image - cv2.imwrite( "/home/root/result_hdmi_img.jpg", adjusted); + # Calculate again the average color using the gamma adjusted image # Crop the gamma adjusted image for wach color section red_cal = adjusted[y1:y2, xr1:xr2] @@ -109,19 +106,14 @@ def pattern_detect(cam_device=0): mask_r = mask0 + mask1 # mask_r = cv2.inRange(hsv, lower_red, upper_red) # Perform a morphological open to expand -# kernel = np.ones((5, 5), np.uint8) -# closing_r = cv2.morphologyEx(mask_r, cv2.MORPH_OPEN, kernel) -# closing_b = cv2.morphologyEx(mask_b, cv2.MORPH_OPEN, kernel) -# closing_g = cv2.morphologyEx(mask_g, cv2.MORPH_OPEN, kernel) + kernel = np.ones((15, 15), np.uint8) + closing_r = cv2.morphologyEx(mask_r, cv2.MORPH_OPEN, kernel) + closing_b = cv2.morphologyEx(mask_b, cv2.MORPH_OPEN, kernel) + closing_g = cv2.morphologyEx(mask_g, cv2.MORPH_OPEN, kernel) # Count the number of pixels that are not of the corresponding color (black) -# count_r = cv2.countNonZero(closing_r) -# count_b = cv2.countNonZero(closing_b) -# count_g = cv2.countNonZero(closing_g) - #----------- - count_r = cv2.countNonZero(mask_r) - count_b = cv2.countNonZero(mask_b) - count_g = cv2.countNonZero(mask_g) - #------- + count_r = cv2.countNonZero(closing_r) + count_b = cv2.countNonZero(closing_b) + count_g = cv2.countNonZero(closing_g) if (count_r < 5): msg = "RED COUNT FAIL" return msg diff --git a/test-cli/test/helpers/get_dieid.py b/test-cli/test/helpers/get_dieid.py index b20f143..029ddb5 100644 --- a/test-cli/test/helpers/get_dieid.py +++ b/test-cli/test/helpers/get_dieid.py @@ -21,8 +21,6 @@ def read(addr): def getRegisters(model): if model.find("IGEP0046") == 0: registers = [0x021BC420, 0x021BC410] - elif model.find("IGEP0000") == 0: - registers = [0x021BC420, 0x021BC410] elif model.find("IGEP0034") == 0 or model.find("SOPA0000") == 0: registers = [0x44e10630, 0x44e10634, 0x44e10638, 0x44e1063C] elif model.find("OMAP3") == 0: diff --git a/test-cli/test/helpers/globalVariables.py b/test-cli/test/helpers/globalVariables.py index c4d8358..6b89f4d 100644 --- a/test-cli/test/helpers/globalVariables.py +++ b/test-cli/test/helpers/globalVariables.py @@ -6,5 +6,3 @@ def globalVar(): g_mid = "" outdata = "NONE" station = "" - fstatus = "" - gdisplay = None \ No newline at end of file diff --git a/test-cli/test/helpers/iseelogger.py b/test-cli/test/helpers/iseelogger.py new file mode 100644 index 0000000..785a78c --- /dev/null +++ b/test-cli/test/helpers/iseelogger.py @@ -0,0 +1,39 @@ +import logging +import logging.handlers + + +class ISEE_Logger(object): + __logger = None + __logHandler = None + __formater = None + __logHandlerConsole = None + + def __init__(self, level=logging.INFO): + # Create syslog logger + self.__logger = logging.getLogger('ISEE_logger') + self.__logger.setLevel(level) + self.__logHandler = logging.handlers.SysLogHandler('/dev/log') + self.__logHandlerConsole = logging.StreamHandler() + self.__formater = logging.Formatter('Python: { "loggerName":"%(name)s", "timestamp":"%(asctime)s", "pathName":"%(pathname)s", "logRecordCreationTime":"%(created)f", "functionName":"%(funcName)s", "levelNo":"%(levelno)s", "lineNo":"%(lineno)d", "time":"%(msecs)d", "levelName":"%(levelname)s", "message":"%(message)s"}') + self.__logHandler.formatter = self.__formater + self.__logHandlerConsole.formatter = self.__formater + self.__logger.addHandler(self.__logHandler) + self.__logger.addHandler(self.__logHandlerConsole) + + def setLogLevel(self, level): + if level.upper() == "DEBUG": + nlevel = logging.DEBUG + elif level.upper() == "INFO": + nlevel = logging.INFO + elif level.upper() == "ERROR": + nlevel = logging.ERROR + elif level.upper() == "WARNING": + nlevel = logging.WARNING + else: + nlevel = logging.DEBUG + + self.__logger.setLevel(nlevel) + + def getlogger(self): + return self.__logger + diff --git a/test-cli/test/helpers/psqldb.py b/test-cli/test/helpers/psqldb.py index 26dd03d..1d0e422 100644 --- a/test-cli/test/helpers/psqldb.py +++ b/test-cli/test/helpers/psqldb.py @@ -7,15 +7,15 @@ class PgSQLConnection(object): __db_config = {'dbname': 'testsrv', 'host': '192.168.2.171', 'password': 'Idkfa2009', 'port': 5432, 'user': 'admin'} - def __init__ (self): -# self.__conection_object = None -# if connect_str is not None: -# self.__db_config = connect_str -# else: - self.__db_config = {'dbname': 'testsrv', 'host': '192.168.2.171', - 'password': 'Idkfa2009', 'port': 5432, 'user': 'admin'} - - def db_connect (self, connect_str): + def __init__ (self, connect_str = None): + self.__conection_object = None + if connect_str is not None: + self.__db_config = connect_str + else: + self.__db_config = {'dbname': 'testsrv', 'host': '192.168.2.171', + 'password': 'Idkfa2009', 'port': 5432, 'user': 'admin'} + + def db_connect (self, connect_str = None): result = False try: if connect_str == None: @@ -23,6 +23,7 @@ class PgSQLConnection(object): else: self.__db_config = connect_str; self.__conection_object = psycopg2.connect(**self.__db_config) + self.__conection_object.autocommit = True result = True except Exception as error: diff --git a/test-cli/test/helpers/setup_xml.py b/test-cli/test/helpers/setup_xml.py index 3fd9fd5..eb8d73c 100644 --- a/test-cli/test/helpers/setup_xml.py +++ b/test-cli/test/helpers/setup_xml.py @@ -37,4 +37,11 @@ class XMLSetup (object): def getMysqlConnectionStr (self): """aaaaa""" - pass \ No newline at end of file + pass + + def gettagKey (self, xmltag, xmlkey): + """aaaaa""" + for element in self.__tree.iter(xmltag): + return element.attrib[xmlkey] + + return None \ No newline at end of file diff --git a/test-cli/test/helpers/syscmd.py b/test-cli/test/helpers/syscmd.py index b579e39..a869bd7 100644 --- a/test-cli/test/helpers/syscmd.py +++ b/test-cli/test/helpers/syscmd.py @@ -10,15 +10,24 @@ class TestSysCommand(unittest.TestCase): __outdata = None __outtofile = False - def __init__(self, testname, testfunc, str_cmd, outtofile = False): + #varlist: str_cmd, outtofile + def __init__(self, testname, testfunc, varlist): """ init """ super(TestSysCommand, self).__init__(testfunc) - self.__str_cmd = str_cmd + if "str_cmd" in varlist: + self.__str_cmd = varlist["str_cmd"] + else: + raise Exception('str_cmd param inside TestSysCommand have been be defined') self.__testname = testname - self.__outtofile = outtofile + if "outtofile" in varlist: + self.__outtofile = varlist["outtofile"] + if self.__outtofile is True: + self.__outfilename = '/tmp/{}.txt'.format(testname) + else: + self.__outtofile = None + self.__outfilename = None self._testMethodDoc = testname - if self.__outtofile is True: - self.__outfilename = '/tmp/{}.txt'.format(testname) + def getName(self): return self.__testname diff --git a/test-cli/test/helpers/testsrv_db.py b/test-cli/test/helpers/testsrv_db.py index 94181f9..f08cb17 100644 --- a/test-cli/test/helpers/testsrv_db.py +++ b/test-cli/test/helpers/testsrv_db.py @@ -19,12 +19,12 @@ class TestSrv_Database(object): def __init__(self): pass - def open (self, filename): - '''Open database connection''' - self.__xml_setup = XMLSetup(filename) + def open (self, xmlObj): + self.__xml_setup = xmlObj; self.__sqlObject = PgSQLConnection() return self.__sqlObject.db_connect(self.__xml_setup.getdbConnectionStr()) + def create_board(self, processor_id, model_id, variant, bmac = None): '''create a new board''' if bmac is None: @@ -95,7 +95,20 @@ class TestSrv_Database(object): def getboard_comp_test_list(self, board_uuid): '''get the board test list''' - sql = "SELECT isee.gettestcompletelist('{}')".format(board_uuid) + sql = "SELECT * FROM isee.gettestcompletelist('{}')".format(board_uuid) + #print('>>>' + sql) + try: + res = self.__sqlObject.db_execute_query(sql) + #print(res) + return res; + except Exception as err: + r = find_between(str(err), '#', '#') + #print(r) + return None + + def getboard_test_variables(self, board_uuid, testdefid): + '''get the board test list''' + sql = "SELECT * FROM isee.getboardtestvariables('{}', '{}')".format(board_uuid, testdefid) #print('>>>' + sql) try: res = self.__sqlObject.db_execute_query(sql) @@ -159,18 +172,4 @@ class TestSrv_Database(object): except Exception as err: r = find_between(str(err), '#', '#') #print(r) - return None - - def getboard_eeprom(self, board_uuid): - '''get the board eeprom struct ''' - sql = "SELECT isee.getboard_eeprom('{}')".format(board_uuid) - #print('>>>' + sql) - try: - res = self.__sqlObject.db_execute_query(sql) - #print(res) - return res; - except Exception as err: - r = find_between(str(err), '#', '#') - #print(r) - return None - \ No newline at end of file + return None \ No newline at end of file diff --git a/test-cli/test/helpers/usb.sh b/test-cli/test/helpers/usb.sh new file mode 100755 index 0000000..c8e6924 --- /dev/null +++ b/test-cli/test/helpers/usb.sh @@ -0,0 +1,14 @@ +#!/bin/bash + +for sysdevpath in $(find /sys/bus/usb/devices/usb*/ -name dev); do + ( + syspath="${sysdevpath%/dev}" + devname="$(udevadm info -q name -p $syspath)" + [[ "$devname" == "bus/"* ]] && continue + eval "$(udevadm info -q property --export -p $syspath)" + [[ -z "$ID_SERIAL" ]] && continue + echo "/dev/$devname - $ID_SERIAL" + ) +done + +return 0 diff --git a/test-cli/test/runners/simple.py b/test-cli/test/runners/simple.py index 5b2da5a..3537d0e 100644 --- a/test-cli/test/runners/simple.py +++ b/test-cli/test/runners/simple.py @@ -22,9 +22,12 @@ class SimpleTestRunner: --------------------------------------------- """ - def __init__(self, stream=sys.stderr, verbosity=0): + __pgObj = None + + def __init__(self, pgObj, stream=sys.stderr, verbosity=0): self.stream = stream self.verbosity = verbosity + self.__pgObj = pgObj def writeUpdate(self, message): self.stream.write(message) @@ -33,7 +36,7 @@ class SimpleTestRunner: """ Run the given test case or Test Suite. """ - result = TextTestResult(self) + result = TextTestResult(self, self.__pgObj) test(result) result.testsRun # self.writeUpdate("---------------------------------------------\n") @@ -45,18 +48,22 @@ class TextTestResult(unittest.TestResult): FAIL = '\033[31mFAIL\033[0m\n' ERROR = '\033[31mERROR\033[0m\n' - def __init__(self, runner): + __pgObj = None + + def __init__(self, runner, pgObj): unittest.TestResult.__init__(self) self.runner = runner self.result = self.ERROR + self.__pgObj = pgObj def startTest(self, test): unittest.TestResult.startTest(self, test) self.runner.writeUpdate("%s : " % test.shortDescription()) # SEND TO DB THE UPDATE THAT WE RUN EACH TEST - psdb = TestSrv_Database() - psdb.open("setup.xml") - psdb.update_set_test_row(globalVar.station, globalVar.testid_ctl, globalVar.g_uuid, test.shortDescription(), "RUNNING") + self.__pgObj.update_set_test_row(globalVar.station, globalVar.testid_ctl, globalVar.g_uuid, test.shortDescription(), "RUNNING") + # psdb = TestSrv_Database() + # psdb.open("setup.xml") + # psdb.update_set_test_row(globalVar.station, globalVar.testid_ctl, globalVar.g_uuid, test.shortDescription(), "RUNNING") def addSuccess(self, test): unittest.TestResult.addSuccess(self, test) @@ -93,6 +100,7 @@ class TextTestResult(unittest.TestResult): if self.result == self.FAIL: simple_result = "FALSE" elif self.result == self.ERROR: simple_result = "FALSE" #SEND TO DB THE RESULT OF THE TEST - psdb = TestSrv_Database() - psdb.open("setup.xml") - psdb.add_test_to_batch(globalVar.g_uuid, test.shortDescription(), globalVar.testid_ctl, simple_result, globalVar.g_mid, test.longMessage) + # psdb = TestSrv_Database() + # psdb.open("setup.xml") + self.__pgObj.add_test_to_batch(globalVar.g_uuid, test.shortDescription(), globalVar.testid_ctl, simple_result, globalVar.g_mid, test.longMessage) + # psdb.add_test_to_batch(globalVar.g_uuid, test.shortDescription(), globalVar.testid_ctl, simple_result, globalVar.g_mid, test.longMessage) diff --git a/test-cli/test/tests/qamp.py b/test-cli/test/tests/qamp.py index 9fcd6d2..3e38ce9 100644 --- a/test-cli/test/tests/qamp.py +++ b/test-cli/test/tests/qamp.py @@ -5,10 +5,17 @@ import time class Qamp(unittest.TestCase): - def __init__(self, testname, testfunc, undercurrent=0.1, overcurrent=2): + #varlist: undercurrent, overcurrent + def __init__(self, testname, testfunc, varlist): self._current = 0.0 - self._undercurrent=undercurrent - self._overcurrent = overcurrent + if "undercurrent" in varlist: + self._undercurrent = varlist["undercurrent"] + else: + raise Exception('undercurrent param inside Qamp must be defined') + if "overcurrent" in varlist: + self._overcurrent = varlist["overcurrent"] + else: + raise Exception('overcurrent param inside Qamp must be defined') self._vshuntfactor=16384.0 self._ser = serial.Serial() self._ser.port = '/dev/ttyUSB0' @@ -33,24 +40,12 @@ class Qamp(unittest.TestCase): except: self.fail("failed: IMPOSSIBLE OPEN USB-SERIAL PORT ( {} )".format(self._ser.port)) error=1 - return -1 if error==0: # Clean input and output buffer of serial port self._ser.flushInput() self._ser.flushOutput() # Send command to read Voltage at Shunt resistor # Prepare cmd - cmd = ('at\n\r') - i=0 - while (i < len(cmd)): - i = i + self._ser.write(cmd[i].encode('ascii')) - time.sleep(0.05) - self._ser.read(1) - res0 = [] - while (self._ser.inWaiting() > 0): # if incoming bytes are waiting to be read from the serial input buffer - res0.append(self._ser.read(1).decode('ascii')) # read the bytes and convert from binary array to ASCII - print(res0) - #CHECK COM FIRST cmd = ('at+in?\n\r') i = 0 @@ -67,21 +62,15 @@ class Qamp(unittest.TestCase): string = ''.join(res) string = string.replace('\n', '') - try: - self._current = float(int(string, 0)) / self._vshuntfactor - except: - self.fail("failed: CAN'T READ CONSUMPTION (CURRENT=0?)") - error=1 - return -1 - if error==0: - print(self._current) + self._current = float(int(string, 0)) / self._vshuntfactor + print(self._current) # In order to give a valid result it is importarnt to define an under current value - if (self._current > float(self._overcurrent)): - self.fail("failed: OVERCURRENT DETECTED ( {} )".format(self._current)) - return -1 + if (self._current > float(self._overcurrent)): + self.fail("failed: OVERCURRENT DETECTED ( {} )".format(self._current)) + + if (self._current < float(self._undercurrent)): + self.fail("failed: UNDERCURRENT DETECTED ( {} )".format(self._current)) + - if (self._current < float(self._undercurrent)): - self.fail("failed: UNDERCURRENT DETECTED ( {} )".format(self._current)) - return -1 diff --git a/test-cli/test/tests/qaudio.py b/test-cli/test/tests/qaudio.py index a1572ca..fe57be2 100644 --- a/test-cli/test/tests/qaudio.py +++ b/test-cli/test/tests/qaudio.py @@ -13,13 +13,14 @@ class Qaudio(unittest.TestCase): self.__refSum = 25 # 1+3+5+7+9 def execute(self): - str_cmd = "aplay test/files/dtmf-13579.wav 2> /dev/null & arecord -r 8000 -d 1 recorded.wav 2> /dev/null" #.format(self.__dtmfFile) + str_cmd = "aplay test/files/dtmf-13579.wav & arecord -r 8000 -d 1 recorded.wav" #.format(self.__dtmfFile) audio_loop = SysCommand("command-name", str_cmd) - if audio_loop.execute() == 0:# BUG: Returns -1 but work + if audio_loop.execute() == -1:# BUG: Returns -1 but work lines = audio_loop.getOutput().splitlines() - str_cmd = "multimon -t wav -a DTMF recorded.wav -q 2> /dev/null" + str_cmd = "multimon -t wav -a DTMF recorded.wav -q" dtmf_decoder = SysCommand("command-name", str_cmd) - if dtmf_decoder.execute() == 0: # BUG: Returns -1 but work + a=dtmf_decoder.execute() + if dtmf_decoder.execute() == -1: # BUG: Returns -1 but work self.__raw_out = dtmf_decoder.getOutput() if self.__raw_out == "": return -1 @@ -34,4 +35,4 @@ class Qaudio(unittest.TestCase): else: self.fail("failed: fail playing/recording file") return -1 - return 0 + return 0 \ No newline at end of file diff --git a/test-cli/test/tests/qbutton.py b/test-cli/test/tests/qbutton.py index 72a897e..4718924 100644 --- a/test-cli/test/tests/qbutton.py +++ b/test-cli/test/tests/qbutton.py @@ -5,8 +5,12 @@ import time class Qbutton(unittest.TestCase): - def __init__(self, testname, testfunc, gpio): - if gpio == "SOPA": + def __init__(self, testname, testfunc, varlist): + if "gpio" in varlist: + self.__gpio = varlist["gpio"] + else: + raise Exception('gpio param inside Qbutton must be defined') + if self.__gpio == "SOPA": super(Qbutton, self).__init__("buttonSopa") else: super(Qbutton, self).__init__("buttonGpio") @@ -34,19 +38,22 @@ class Qbutton(unittest.TestCase): get_button_val = SysCommand("get_button_val", str_cmd) print("\n\t --> PRESS button for 1 sec (TIMEOUT: 10s) \n") timeout = 0 - while timeout < 20: + while timeout < 7200: if get_button_val.execute() == 0: get_button_val.execute() button_value = get_button_val.getOutput() button_value=button_value.decode('ascii').split("x") if int(button_value[1]) == 4: - timeout = 20 + timeout = 7200 + led_off="echo 0 > /sys/class/leds/red\:usbhost/brightness" + ledoff = SysCommand("led_off", led_off) + ledoff.execute() time.sleep(0.5) timeout = timeout + 1 - if timeout==20 and int(button_value[1]) == 0: + if timeout==7200 and int(button_value[1]) == 0: self.fail("failed: timeout exceeded") else: - timeout = 20 + timeout = 7200 self.fail("failed: not button input") else: self.fail("failed: could not complete i2c reset button state") diff --git a/test-cli/test/tests/qduplex_ser.py b/test-cli/test/tests/qduplex_ser.py index 98bda81..837b4d0 100644 --- a/test-cli/test/tests/qduplex_ser.py +++ b/test-cli/test/tests/qduplex_ser.py @@ -6,15 +6,26 @@ import time class Qduplex(unittest.TestCase): - def __init__(self, testname, testfunc, port1, port2, baudrate): + def __init__(self, testname, testfunc, varlist): super(Qduplex, self).__init__(testfunc) - self.__port1 = port1 + if "port1" in varlist: + self.__port1 = varlist["port1"] + else: + raise Exception('port1 param inside Qduplex must be defined') self.__serial1 = serial.Serial(self.__port1, timeout=1) - self.__serial1.baudrate = baudrate - self.__port2 = port2 + if "port2" in varlist: + self.__port2 = varlist["port2"] + else: + raise Exception('port2 param inside Qduplex must be defined') self.__serial2 = serial.Serial(self.__port2, timeout=1) - self.__serial2.baudrate = baudrate + + if "baudrate" in varlist: + self.__serial1.baudrate = varlist["baudrate"] + self.__serial2.baudrate = varlist["baudrate"] + else: + raise Exception('baudrate param inside Qduplex must be defined') + self._testMethodDoc = testname def __del__(self): diff --git a/test-cli/test/tests/qeeprom.py b/test-cli/test/tests/qeeprom.py index f72f78f..2bab248 100644 --- a/test-cli/test/tests/qeeprom.py +++ b/test-cli/test/tests/qeeprom.py @@ -4,7 +4,7 @@ import uuid class Qeeprom(unittest.TestCase): - def __init__(self, testname, testfunc): + def __init__(self, testname, testfunc, varlist): super(Qeeprom, self).__init__(testfunc) self._testMethodDoc = testname diff --git a/test-cli/test/tests/qethernet.py b/test-cli/test/tests/qethernet.py index 2ac447c..be984f5 100644 --- a/test-cli/test/tests/qethernet.py +++ b/test-cli/test/tests/qethernet.py @@ -1,57 +1,51 @@ -from test.helpers.syscmd import SysCommand import unittest - +import sh +import ex class Qethernet(unittest.TestCase): __sip = None - __raw_out = None - __MB_req = None - __MB_real = None - __BW_real = None - __dat_list = None + __numbytestx = None __bind = None __OKBW = None - def __init__(self, testname, testfunc, sip = None, OKBW=100, bind=None): + def __init__(self, testname, testfunc, varlist): super(Qethernet, self).__init__(testfunc) - if sip is not None: - self.__sip = sip - if sip is not None: - self.__bind = bind - self.__MB_req = '10' - self.__OKBW=OKBW + if "sip" in varlist: + self.__sip = varlist["sip"] + else: + raise Exception('sip param inside Qethernet have been be defined') + if "bind" in varlist: + self.__bind = varlist["bind"] + else: + self.__bind = None + if "OKBW" in varlist: + self.__OKBW = varlist["OKBW"] + else: + raise Exception('OKBW param inside Qethernet must be defined') + self.__numbytestx = "10M" self._testMethodDoc = testname def execute(self): - print + # execute iperf command against the server if self.__bind is None: - str_cmd = "iperf -c {} -x CMSV -n {}M".format(self.__sip, self.__MB_req) + p = sh.iperf("-c", self.__sip, "-x", "CMSV", "-n", self.__numbytestx, "-f", "m") else: - str_cmd = "iperf -c {} -x CMSV -n {}M -B {}".format(self.__sip, self.__MB_req, self.__bind) - iperf_command = SysCommand("iperf", str_cmd) - if iperf_command.execute() == 0: - self.__raw_out = iperf_command.getOutput() - if self.__raw_out == "": - return -1 - lines = iperf_command.getOutput().splitlines() - dat = lines[1] - dat = dat.decode('ascii') - dat_list = dat.split( ) - for d in dat_list: - a = dat_list.pop(0) - if a == "sec": - break - self.__MB_real = dat_list[0] - self.__BW_real = dat_list[2] - self.__dat_list = dat_list - #print(self.__MB_real) - #print(self.__BW_real) - self.failUnless(float(self.__BW_real)>float(self.__OKBW)*0.9,"failed: speed is lower than spected. Speed(MB/s)" + str(self.__BW_real)) + p = sh.iperf("-c", self.__sip, "-x", "CMSV", "-n", self.__numbytestx, "-f", "m", "-B", self.__bind) + #check if it was executed succesfully + if p.exit_code == 0: + if p.stdout == "": + self.fail("failed: error executing iperf command") + # analyze output string + # split by lines + lines = p.stdout.splitlines() + # get first line + dat = lines[1].decode('ascii') + # search for the BW value + a = re.search("\d+(\.\d)? Mbits/sec",dat) + b = a.group().split( ) + BWreal = b[0] + + # check if BW is in the expected range + self.failUnless(float(BWreal)>float(self.__OKBW)*0.9,"failed: speed is lower than spected. Speed(MB/s)" + str(BWreal)) else: self.fail("failed: could not complete iperf command") - - def get_Total_MB(self): - return self.__MB_real; - - def get_Total_BW(self): - return self.__MB_real; diff --git a/test-cli/test/tests/qflash.py b/test-cli/test/tests/qflash.py index cdc4109..0c3fcb5 100644 --- a/test-cli/test/tests/qflash.py +++ b/test-cli/test/tests/qflash.py @@ -4,16 +4,13 @@ from test.helpers.globalVariables import globalVar class Qflasher(unittest.TestCase): - def __init__(self, testname, testfunc): + def __init__(self, testname, testfunc, varlist): super(Qflasher, self).__init__(testfunc) self._testMethodDoc = testname model=globalVar.g_mid if model.find("IGEP0046") == 0: self._flash_method = "mx6" self._binlocation="/boot/" - elif model.find("IGEP0000") == 0: - self._flash_method = "mx6" - self._binlocation="/boot/" elif model.find("IGEP0034") == 0: self._flash_method = "nandti" self._binlocation = "/boot/" diff --git a/test-cli/test/tests/qi2c.py b/test-cli/test/tests/qi2c.py index 409005c..2f14424 100644 --- a/test-cli/test/tests/qi2c.py +++ b/test-cli/test/tests/qi2c.py @@ -3,10 +3,16 @@ import unittest class Qi2c(unittest.TestCase): - def __init__(self, testname, testfunc, busnum, register): + def __init__(self, testname, testfunc, varlist): super(Qi2c, self).__init__(testfunc) - self.__busnum = busnum - self.__register = register.split("/") + if "busnum" in varlist: + self.__busnum = varlist["busnum"] + else: + raise Exception('busnum param inside Qi2c must be defined') + if "register" in varlist: + self.__register = varlist["register"].split("/") + else: + raise Exception('register param inside Qi2c must be defined') self.__devices=[] self._testMethodDoc = testname diff --git a/test-cli/test/tests/qnand.py b/test-cli/test/tests/qnand.py new file mode 100644 index 0000000..9f2cd47 --- /dev/null +++ b/test-cli/test/tests/qnand.py @@ -0,0 +1,22 @@ +import unittest +import sh + +class Qnand(unittest.TestCase): + + __device = "10M" + + # varlist: device + def __init__(self, testname, testfunc, varlist): + super(Qnand, self).__init__(testfunc) + + if "device" in varlist: + self.__device = varlist["device"] + else: + raise Exception('device param inside Qnand must be defined') + self._testMethodDoc = testname + + def execute(self): + try: + sh.nandtest("-m", self.__device) + except sh.ErrorReturnCode as e: + self.fail("failed: could not complete nandtest command") diff --git a/test-cli/test/tests/qram.py b/test-cli/test/tests/qram.py index 8ec0210..1b66e5c 100644 --- a/test-cli/test/tests/qram.py +++ b/test-cli/test/tests/qram.py @@ -1,22 +1,27 @@ -from test.helpers.syscmd import SysCommand import unittest +import sh class Qram(unittest.TestCase): - def __init__(self, testname, testfunc, memSize): + __memSize = "10M" + __loops = "1" + + # varlist: memSize, loops + def __init__(self, testname, testfunc, varlist): super(Qram, self).__init__(testfunc) - self.__memSize = memSize + + if "memSize" in varlist: + self.__memSize = varlist["memSize"] + else: + raise Exception('memSize param inside Qram must be defined') + if "loops" in varlist: + self.__loops = varlist["loops"] + else: + raise Exception('loops param inside Qram must be defined') self._testMethodDoc = testname def execute(self): - str_cmd= "free -m" - free_command = SysCommand("free_ram", str_cmd) - if free_command.execute() == 0: - self.__raw_out = free_command.getOutput() - if self.__raw_out == "": - return -1 - lines = free_command.getOutput().splitlines() - aux = [int(s) for s in lines[1].split() if s.isdigit()] - self.failUnless(int(aux[0])>int(self.__memSize),"failed: total ram memory size lower than expected") - else: - self.fail("failed: could not complete iperf command") + try: + sh.memtester(self.__memSize, "1") + except sh.ErrorReturnCode as e: + self.fail("failed: could not complete memtester command") diff --git a/test-cli/test/tests/qrtc.py b/test-cli/test/tests/qrtc.py index 1d02f78..8e31572 100644 --- a/test-cli/test/tests/qrtc.py +++ b/test-cli/test/tests/qrtc.py @@ -4,9 +4,14 @@ import time class Qrtc(unittest.TestCase): - def __init__(self, testname, testfunc, rtc): + __rtc = "/dev/rtc0" + + def __init__(self, testname, testfunc, varlist): super(Qrtc, self).__init__(testfunc) - self.__rtc = rtc + if "rtc" in varlist: + self.__rtc = varlist["rtc"] + else: + raise Exception('rtc param inside Qrtc must be defined') self._testMethodDoc = testname def execute(self): diff --git a/test-cli/test/tests/qscreen.py b/test-cli/test/tests/qscreen.py index b8a5a48..87e53b2 100644 --- a/test-cli/test/tests/qscreen.py +++ b/test-cli/test/tests/qscreen.py @@ -5,9 +5,13 @@ from test.helpers.cv_display_test import pattern_detect class Qscreen(unittest.TestCase): - def __init__(self, testname, testfunc, display): + #varlist: display + def __init__(self, testname, testfunc, varlist): super(Qscreen, self).__init__(testfunc) - self.__display = display + if "display" in varlist: + self.__display = varlist["display"] + else: + raise Exception('display param inside Qscreen have been be defined') self._testMethodDoc = testname def execute(self): @@ -17,15 +21,5 @@ class Qscreen(unittest.TestCase): test_screen = pattern_detect(1) if not test_screen=="0": self.fail("failed: {}".format(test_screen)) - str_cmd= "fbi -T 1 /home/root/result_hdmi_img.jpg -d /dev/fb0 --noverbose -a" - show_img = SysCommand("show-image", str_cmd) - show_img.execute() else: self.fail("failed: could not display the image") - try: - str_cmd= "fbi -T 1 /home/root/result_hdmi_img.jpg -d /dev/fb0 --noverbose -a" - show_img = SysCommand("show-image", str_cmd) - show_img.execute() - except ValueError: - print("COULD NOT DISPLAY IMAGE") - diff --git a/test-cli/test/tests/qserial.py b/test-cli/test/tests/qserial.py index 43ba3c8..d3e2ee6 100644 --- a/test-cli/test/tests/qserial.py +++ b/test-cli/test/tests/qserial.py @@ -6,11 +6,17 @@ import time class Qserial(unittest.TestCase): - def __init__(self, testname, testfunc, port, baudrate): + def __init__(self, testname, testfunc, varlist): super(Qserial, self).__init__(testfunc) - self.__port = port + if "port" in varlist: + self.__port = varlist["port"] + else: + raise Exception('port param inside Qserial must be defined') self.__serial = serial.Serial(self.__port, timeout=1) - self.__serial.baudrate = baudrate + if "baudrate" in varlist: + self.__baudrate = varlist["baudrate"] + else: + raise Exception('baudrate param inside Qserial must be defined') self._testMethodDoc = testname def __del__(self): @@ -19,12 +25,15 @@ class Qserial(unittest.TestCase): def execute(self): self.__serial.flushInput() self.__serial.flushOutput() + #generate a random uuid test_uuid = str(uuid.uuid4()).encode() + #send the uuid through serial port self.__serial.write(test_uuid) time.sleep(0.05) # there might be a small delay if self.__serial.inWaiting() == 0: self.fail("failed: PORT {} wait timeout exceded, wrong communication?".format(self.__port)) else: + # check if what it was sent is equal to what has been received if (self.__serial.readline() != test_uuid): self.fail("failed: PORT {} write/read mismatch".format(self.__port)) diff --git a/test-cli/test/tests/qusb.py b/test-cli/test/tests/qusb.py index 44490bc..0390143 100644 --- a/test-cli/test/tests/qusb.py +++ b/test-cli/test/tests/qusb.py @@ -3,17 +3,20 @@ import unittest class Qusb(unittest.TestCase): - def __init__(self, testname, testfunc, devLabel, numPorts): + def __init__(self, testname, testfunc, varlist): super(Qusb, self).__init__(testfunc) - self.__numPorts = numPorts + if "numPorts" in varlist: + self.__numPorts = varlist["numPorts"] + else: + raise Exception('numPorts param inside Qusb must be defined') self._testMethodDoc = testname - self.__devLabel = devLabel + if "devLabel" in varlist: + self.__devLabel = varlist["devLabel"] + else: + raise Exception('devLabel param inside Qusb must be defined') if testname=="USBOTG": self.__usbFileName = "/this_is_an_usb_otg" self.__usbtext = "USBOTG" - elif testname=="SATA": - self.__usbFileName = "/this_is_a_sata" - self.__usbtext = "SATA" else: self.__usbFileName = "/this_is_an_usb_host" self.__usbtext = "USBHOST" @@ -57,4 +60,4 @@ class Qusb(unittest.TestCase): else: self.fail("failed: reference and real usb host devices number mismatch") else: - self.fail("failed: couldn't execute lsblk command") + self.fail("failed: couldn't execute lsblk command") \ No newline at end of file diff --git a/test-cli/test/tests/qwifi.py b/test-cli/test/tests/qwifi.py index d08149b..188e300 100644 --- a/test-cli/test/tests/qwifi.py +++ b/test-cli/test/tests/qwifi.py @@ -1,45 +1,33 @@ from test.helpers.syscmd import SysCommand import unittest -import subprocess class Qwifi(unittest.TestCase): - def __init__(self, testname, testfunc, signal): + def __init__(self, testname, testfunc, varlist): super(Qwifi, self).__init__(testfunc) - self.__signal = signal + if "signal" in varlist: + self.__signal = varlist["signal"] + else: + raise Exception('signal param inside Qwifi must be defined') self._testMethodDoc = testname - # WiFi SERVERIP fixed at the moment. - self._serverip = "192.168.5.1" def execute(self): - # First check connection with the wifi server using ping command - #str_cmd = "ping -c 1 {} > /dev/null".format(self._serverip) - #wifi_ping = SysCommand("wifi_ping", str_cmd) - #wifi_ping.execute() - #res = subprocess.call(['ping', '-c', '1', self._serverip]) - p = subprocess.Popen(['ping','-c','1',self._serverip,'-W','2'],stdout=subprocess.PIPE) - p.wait() - res=p.poll() - if res == 0: - str_cmd= "iw wlan0 link" - wifi_stats = SysCommand("wifi_stats", str_cmd) - if wifi_stats.execute() == 0: - w_stats = wifi_stats.getOutput().decode('ascii').splitlines() - #self._longMessage = str(self.__raw_out).replace("'", "") - if not w_stats[0] == "Not connected.": - signal_st = w_stats[5].split(" ")[1] - try: - int(signal_st) - if -1*int(signal_st) > int(self.__signal): - self.fail("failed: signal to server lower than expected") - except ValueError: - self.fail("failed: error output (Bad connection?)") - else: + str_cmd= "iw wlan0 link" + wifi_stats = SysCommand("wifi_stats", str_cmd) + if wifi_stats.execute() == 0: + w_stats = wifi_stats.getOutput().decode('ascii').splitlines() + #self._longMessage = str(self.__raw_out).replace("'", "") + if not w_stats[0] == "Not connected.": + signal_st = w_stats[5].split(" ")[1] + try: + int(signal_st) + if -1*int(signal_st) > int(self.__signal): + self.fail("failed: signal to server lower than expected") + except ValueError: self.fail("failed: error output (Bad connection?)") - #tx_brate = float(w_stats[6].split(" ")[2]) else: - self.fail("failed: couldn't execute iw command") + self.fail("failed: error output (Bad connection?)") + #tx_brate = float(w_stats[6].split(" ")[2]) else: - self.fail("failed: ping to server {}".format(self._serverip)) - + self.fail("failed: couldn't execute iw command") diff --git a/test-cli/test_main.py b/test-cli/test_main.py index 3c4d1cb..a44b7d1 100644 --- a/test-cli/test_main.py +++ b/test-cli/test_main.py @@ -6,6 +6,7 @@ import sys import os import unittest from test.helpers.testsrv_db import TestSrv_Database +from test.helpers.setup_xml import XMLSetup from test.runners.simple import SimpleTestRunner from test.tests.qbutton import Qbutton from test.helpers.syscmd import TestSysCommand @@ -24,8 +25,17 @@ from test.tests.qrtc import Qrtc from test.tests.qduplex_ser import Qduplex from test.tests.qamp import Qamp from test.tests.qflash import Qflasher -from test.helpers.finisher import Finisher +from test.tests.qnand import Qnand from test.helpers.globalVariables import globalVar +import socket +import re +from test.helpers.iseelogger import ISEE_Logger +import logging + + +psdbObj = TestSrv_Database() +xmlObj = None +loggerObj = None # define clear function def clear(): @@ -34,85 +44,82 @@ def clear(): def create_board(): - psdb = TestSrv_Database() - psdb.open("setup.xml") - tree = XMLParser.parse('setup.xml') - root = tree.getroot() - suite = unittest.TestSuite() - for element in root.iter('board'): - # print(str(element.tag) + str(element.attrib)) - model_id = element.attrib['model'] - variant = element.attrib['variant'] - nstation = element.attrib['station'] + model_id = xmlObj.gettagKey('board', 'model') + variant = xmlObj.gettagKey('board', 'variant') + + # get model id globalVar.g_mid=model_id + "-" + variant - globalVar.station=nstation - processor_id=genDieid(globalVar.g_mid) + + # get station number + hstname = socket.gethostname() + if (re.search("^Station\d{2}$", hstname)): + globalVar.station = int(re.search("\d{2}", hstname).group(0)) + else: + raise Exception('Station number not configured') + exit(3) + + processor_id = genDieid(globalVar.g_mid) print(globalVar.g_mid) print(processor_id) - globalVar.g_uuid = psdb.create_board(processor_id, model_id, variant, bmac = None) + s = globalVar.g_uuid = psdbObj.create_board(processor_id, model_id, variant, bmac = None) + +def createvarlist(testvars): + varlist = {} + for row in testvars: + varname,testparam = row + varlist[varname] = testparam + return varlist + def testsuite(): - psdb=TestSrv_Database() - psdb.open("setup.xml") suite = unittest.TestSuite() - tests=psdb.getboard_comp_test_list(globalVar.g_uuid) - for i in range(len(tests)): - #newstr = oldstr.replace("M", "") - variables=str(tests[i][0]).split(",") - testname=variables[0].replace('(', '') - testdes=variables[1] - testfunc=variables[2] - if len(tests)>2: - testparam=variables[3].replace(')', '') - testparam = testparam.replace('"', '') - testparam = testparam.replace(';', "','") - if testparam == "": - command = "suite.addTest({}('{}','execute'))".format(testfunc, testname) - else: - command="suite.addTest({}('{}','execute','{}'))".format(testfunc,testname,testparam) - else: - print(testname) - command = "suite.addTest({}('{}','execute'))".format(testfunc, testname) + tests = psdbObj.getboard_comp_test_list(globalVar.g_uuid) + for row in tests: + testdefname,testfunc = row + testvars = psdbObj.getboard_test_variables(globalVar.g_uuid, testdefname) + varlist = createvarlist(testvars) + command = "suite.addTest({}('{}','execute', varlist))".format(testfunc, testdefname) exec(command) - globalVar.testid_ctl=psdb.open_testbatch(globalVar.g_uuid) + + globalVar.testid_ctl=psdbObj.open_testbatch(globalVar.g_uuid) return suite def finish_test(): - psdb = TestSrv_Database() - psdb.open("setup.xml") - auxs = psdb.close_testbatch(globalVar.g_uuid, globalVar.testid_ctl) - globalVar.fstatus = auxs[0][0] - # Burn eeprom struct - psdb = TestSrv_Database() - psdb.open("setup.xml") - # We should call getboard_eeprom only if test was ok - if globalVar.fstatus: - aux = psdb.getboard_eeprom(globalVar.g_uuid) - finish = Finisher(aux) - finish.end_ok() - else: - finish = Finisher(globalVar.g_uuid) - finish.end_fail() - # Update set_test current_test with 'END' so that it finally gets painted in green - psdb = TestSrv_Database() - psdb.open("setup.xml") - psdb.update_set_test_row(globalVar.station, globalVar.testid_ctl, globalVar.g_uuid, "END","FINISH") + psdbObj.close_testbatch(globalVar.g_uuid, globalVar.testid_ctl) + # Update Set Test status with FINISH so that status column is not modified because close_testbatch already did it. + psdbObj.update_set_test_row(globalVar.station, globalVar.testid_ctl, globalVar.g_uuid, 'END', 'FINISH') def main(): - #addtesttomodel() - #addtestdef() create_board() - #globalVar.g_uuid = "1f59c654-0cc6-11e8-8d51-e644f56b8edd" try: os.remove("test_results.dat") except: pass - runner = SimpleTestRunner() + runner = SimpleTestRunner(psdbObj) runner.run(testsuite()) finish_test() if __name__ == "__main__": + # Clear the shell screen clear() - main() + + # create logger + loggerObj = ISEE_Logger(logging.INFO) + # logger = loggerObj.getlogger().info("Starting test script...") + + # Try to parse the setup.xml file + try: + xmlObj = XMLSetup("setup.xml") + except: + print("Error: Cannot parse setup.xml file") + exit(1) + + # Try to connect to the DB, according to setup.xml configuration + if psdbObj.open(xmlObj): + main() + else: + print("Error: Cannot open DB connection") + exit(2) + -- cgit v1.1 From a4d7e279731b4b2aa6fb825b096305556d1e825a Mon Sep 17 00:00:00 2001 From: Manel Caro Date: Wed, 4 Mar 2020 19:37:53 +0100 Subject: pycharm project --- .../.idea/inspectionProfiles/profiles_settings.xml | 6 +++ test-cli/.idea/misc.xml | 7 ++++ test-cli/.idea/modules.xml | 8 ++++ test-cli/.idea/test-cli.iml | 12 ++++++ test-cli/.idea/vcs.xml | 6 +++ test-cli/.idea/workspace.xml | 46 ++++++++++++++++++++++ 6 files changed, 85 insertions(+) create mode 100644 test-cli/.idea/inspectionProfiles/profiles_settings.xml create mode 100644 test-cli/.idea/misc.xml create mode 100644 test-cli/.idea/modules.xml create mode 100644 test-cli/.idea/test-cli.iml create mode 100644 test-cli/.idea/vcs.xml create mode 100644 test-cli/.idea/workspace.xml diff --git a/test-cli/.idea/inspectionProfiles/profiles_settings.xml b/test-cli/.idea/inspectionProfiles/profiles_settings.xml new file mode 100644 index 0000000..105ce2d --- /dev/null +++ b/test-cli/.idea/inspectionProfiles/profiles_settings.xml @@ -0,0 +1,6 @@ + + + + \ No newline at end of file diff --git a/test-cli/.idea/misc.xml b/test-cli/.idea/misc.xml new file mode 100644 index 0000000..2012045 --- /dev/null +++ b/test-cli/.idea/misc.xml @@ -0,0 +1,7 @@ + + + + + + \ No newline at end of file diff --git a/test-cli/.idea/modules.xml b/test-cli/.idea/modules.xml new file mode 100644 index 0000000..fc2e93a --- /dev/null +++ b/test-cli/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/test-cli/.idea/test-cli.iml b/test-cli/.idea/test-cli.iml new file mode 100644 index 0000000..8a05c6e --- /dev/null +++ b/test-cli/.idea/test-cli.iml @@ -0,0 +1,12 @@ + + + + + + + + + + \ No newline at end of file diff --git a/test-cli/.idea/vcs.xml b/test-cli/.idea/vcs.xml new file mode 100644 index 0000000..6c0b863 --- /dev/null +++ b/test-cli/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/test-cli/.idea/workspace.xml b/test-cli/.idea/workspace.xml new file mode 100644 index 0000000..25fe244 --- /dev/null +++ b/test-cli/.idea/workspace.xml @@ -0,0 +1,46 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + 1583340556211 + + + + + + \ No newline at end of file -- cgit v1.1 From 7490324bc98248fc82be814920e2deff4114acc8 Mon Sep 17 00:00:00 2001 From: Manel Caro Date: Wed, 4 Mar 2020 19:38:10 +0100 Subject: Added station --- test-cli/test/helpers/testsrv_db.py | 4 ++-- test-cli/test_main.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/test-cli/test/helpers/testsrv_db.py b/test-cli/test/helpers/testsrv_db.py index f08cb17..7ed02f3 100644 --- a/test-cli/test/helpers/testsrv_db.py +++ b/test-cli/test/helpers/testsrv_db.py @@ -119,9 +119,9 @@ class TestSrv_Database(object): #print(r) return None - def open_testbatch(self, board_uuid): + def open_testbatch(self, board_uuid, station): '''get the board test list''' - sql = "SELECT isee.open_testbatch('{}')".format(board_uuid) + sql = "SELECT * FROM isee.open_testbatch('{}','{}')".format(board_uuid, station) #print('>>>' + sql) try: res = str(self.__sqlObject.db_execute_query(sql)[0]) diff --git a/test-cli/test_main.py b/test-cli/test_main.py index a44b7d1..6680082 100644 --- a/test-cli/test_main.py +++ b/test-cli/test_main.py @@ -61,7 +61,7 @@ def create_board(): processor_id = genDieid(globalVar.g_mid) print(globalVar.g_mid) print(processor_id) - s = globalVar.g_uuid = psdbObj.create_board(processor_id, model_id, variant, bmac = None) + globalVar.g_uuid = psdbObj.create_board(processor_id, model_id, variant, bmac = None) def createvarlist(testvars): varlist = {} @@ -81,7 +81,7 @@ def testsuite(): command = "suite.addTest({}('{}','execute', varlist))".format(testfunc, testdefname) exec(command) - globalVar.testid_ctl=psdbObj.open_testbatch(globalVar.g_uuid) + globalVar.testid_ctl=psdbObj.open_testbatch(globalVar.g_uuid, globalVar.station) return suite -- cgit v1.1 From 23e0cea58ba6ac5357db507b0ac7f8dfcf223326 Mon Sep 17 00:00:00 2001 From: Manel Caro Date: Wed, 4 Mar 2020 19:38:24 +0100 Subject: Modify typos --- test-cli/test/helpers/get_dieid.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/test-cli/test/helpers/get_dieid.py b/test-cli/test/helpers/get_dieid.py index 029ddb5..48e724e 100644 --- a/test-cli/test/helpers/get_dieid.py +++ b/test-cli/test/helpers/get_dieid.py @@ -1,8 +1,11 @@ import mmap import os import struct + MAP_MASK = mmap.PAGESIZE - 1 WORD = 4 + + def read(addr): """ Read from any location in memory Returns the readed value in hexadecimal format @@ -30,8 +33,8 @@ def getRegisters(model): return registers def genDieid(modelid): - registers=getRegisters(modelid) - id="" + registers = getRegisters(modelid) + id = "" for i in range(len(registers)): id=id+(read(registers[i])) return id -- cgit v1.1 From a615dac03a9ea7897bf3947ba8d31278b2cea121 Mon Sep 17 00:00:00 2001 From: Hector Fernandez Date: Thu, 5 Mar 2020 16:29:31 +0100 Subject: Modify tests. --- test-cli/setup.xml | 2 +- test-cli/test/tests/qeeprom.py | 62 +++++++++++++++------------- test-cli/test/tests/qethernet.py | 24 +++++++---- test-cli/test/tests/qwifi.py | 88 +++++++++++++++++++++++++++++++--------- test-cli/test_main.py | 30 ++++++-------- 5 files changed, 132 insertions(+), 74 deletions(-) diff --git a/test-cli/setup.xml b/test-cli/setup.xml index 3feffd0..db9e7db 100644 --- a/test-cli/setup.xml +++ b/test-cli/setup.xml @@ -2,7 +2,7 @@ - + diff --git a/test-cli/test/tests/qeeprom.py b/test-cli/test/tests/qeeprom.py index 2bab248..5bb0755 100644 --- a/test-cli/test/tests/qeeprom.py +++ b/test-cli/test/tests/qeeprom.py @@ -1,38 +1,44 @@ -from test.helpers.syscmd import SysCommand +import sh import unittest -import uuid + class Qeeprom(unittest.TestCase): + __position = None + __uuid = None + __eeprompath = None + + # varlist: position, uuid, eeprompath def __init__(self, testname, testfunc, varlist): super(Qeeprom, self).__init__(testfunc) self._testMethodDoc = testname + if "position" in varlist: + self.__position = varlist["position"] + else: + raise Exception('position param inside Qeeprom must be defined') + if "uuid" in varlist: + self.__uuid = varlist["uuid"] + else: + raise Exception('uuid param inside Qeeprom must be defined') + if "eeprompath" in varlist: + self.__eeprompath = varlist["eeprompath"] + else: + raise Exception('eeprompath param inside Qeeprom must be defined') def execute(self): - str_cmd = "find /sys/ -iname 'eeprom'" - eeprom_location = SysCommand("eeprom_location", str_cmd) - if eeprom_location.execute() == 0: - self.__raw_out = eeprom_location.getOutput() - if self.__raw_out == "": - self.fail("Unable to get EEPROM location. IS EEPROM CONNECTED?") - return -1 - eeprom=self.__raw_out.decode('ascii') - test_uuid = uuid.uuid4() - str_cmd="echo '{}' > {}".format(str(test_uuid), eeprom) - eeprom_write = SysCommand("eeprom_write", str_cmd) - if eeprom_write.execute() == 0: - self.__raw_out = eeprom_write.getOutput() - if self.__raw_out == "": - self.fail("Unable to write on the EEPROM?") - return -1 - str_cmd = "head -2 {}".format(eeprom) - eeprom_read = SysCommand("eeprom_read", str_cmd) - if eeprom_read.execute() == 0: - self.__raw_out = eeprom_read.getOutput() - if self.__raw_out == "": - self.fail("Unable to read from the EEPROM?") - return -1 - if(str(self.__raw_out).find(str(test_uuid)) == -1): - self.fail("failed: READ/WRITE mismatch") + # write data into the eeprom + p = sh.dd(sh.echo(self._uuid), "of=" + self.__eeprompath, "bs=1", "seek=" + self.__position) + if p.exit_code == 0: + # read data from the eeprom + p = sh.dd("if=" + self.__eeprompath, "bs=1", "skip=" + self.__position, "count=" + len(self.__uuid)) + if p.exit_code == 0: + uuid_rx = p.stdout.decode('ascii') + # compare both values + if (uuid_rx != self.__uuid): + self.fail("failed: mismatch between written and received values.") + + else: + self.fail("failed: Unable to read from the EEPROM device.") else: - self.fail("failed: could not complete find eeprom command") + self.fail("failed: Unable to write on the EEPROM device.") + diff --git a/test-cli/test/tests/qethernet.py b/test-cli/test/tests/qethernet.py index be984f5..22d796c 100644 --- a/test-cli/test/tests/qethernet.py +++ b/test-cli/test/tests/qethernet.py @@ -1,12 +1,14 @@ import unittest import sh -import ex +import re + class Qethernet(unittest.TestCase): __sip = None __numbytestx = None __bind = None __OKBW = None + __port = None def __init__(self, testname, testfunc, varlist): super(Qethernet, self).__init__(testfunc) @@ -22,16 +24,21 @@ class Qethernet(unittest.TestCase): self.__OKBW = varlist["OKBW"] else: raise Exception('OKBW param inside Qethernet must be defined') + if "port" in varlist: + self.__port = varlist["port"] + else: + raise Exception('port param inside Qethernet must be defined') self.__numbytestx = "10M" self._testMethodDoc = testname def execute(self): # execute iperf command against the server if self.__bind is None: - p = sh.iperf("-c", self.__sip, "-x", "CMSV", "-n", self.__numbytestx, "-f", "m") + p = sh.iperf("-c", self.__sip, "-x", "CMSV", "-n", self.__numbytestx, "-f", "m", "-p", self.__port) else: - p = sh.iperf("-c", self.__sip, "-x", "CMSV", "-n", self.__numbytestx, "-f", "m", "-B", self.__bind) - #check if it was executed succesfully + p = sh.iperf("-c", self.__sip, "-x", "CMSV", "-n", self.__numbytestx, "-f", "m", "-p", self.__port, "-B", + self.__bind) + # check if it was executed succesfully if p.exit_code == 0: if p.stdout == "": self.fail("failed: error executing iperf command") @@ -41,11 +48,12 @@ class Qethernet(unittest.TestCase): # get first line dat = lines[1].decode('ascii') # search for the BW value - a = re.search("\d+(\.\d)? Mbits/sec",dat) - b = a.group().split( ) - BWreal = b[0] + a = re.search("\d+(\.\d)? Mbits/sec", dat) + b = a.group().split() + bwreal = b[0] # check if BW is in the expected range - self.failUnless(float(BWreal)>float(self.__OKBW)*0.9,"failed: speed is lower than spected. Speed(MB/s)" + str(BWreal)) + self.failUnless(float(bwreal) > float(self.__OKBW) * 0.9, + "failed: speed is lower than spected. Speed(MB/s)" + str(bwreal)) else: self.fail("failed: could not complete iperf command") diff --git a/test-cli/test/tests/qwifi.py b/test-cli/test/tests/qwifi.py index 188e300..154dd52 100644 --- a/test-cli/test/tests/qwifi.py +++ b/test-cli/test/tests/qwifi.py @@ -1,33 +1,81 @@ -from test.helpers.syscmd import SysCommand import unittest +import sh +import re + class Qwifi(unittest.TestCase): + __sip = None + __numbytestx = None + __bind = None + __OKBW = None + __port = None + #varlist: sip, bind, OKBW, port def __init__(self, testname, testfunc, varlist): super(Qwifi, self).__init__(testfunc) - if "signal" in varlist: - self.__signal = varlist["signal"] + if "sip" in varlist: + self.__sip = varlist["sip"] + else: + raise Exception('sip param inside Qwifi have been be defined') + if "OKBW" in varlist: + self.__OKBW = varlist["OKBW"] + else: + raise Exception('OKBW param inside Qwifi must be defined') + if "port" in varlist: + self.__port = varlist["port"] else: - raise Exception('signal param inside Qwifi must be defined') + raise Exception('port param inside Qwifi must be defined') + if "bind" in varlist: + self.__bind = varlist["bind"] + else: + self.__bind = None + self.__numbytestx = "10M" self._testMethodDoc = testname def execute(self): - str_cmd= "iw wlan0 link" - wifi_stats = SysCommand("wifi_stats", str_cmd) - if wifi_stats.execute() == 0: - w_stats = wifi_stats.getOutput().decode('ascii').splitlines() - #self._longMessage = str(self.__raw_out).replace("'", "") - if not w_stats[0] == "Not connected.": - signal_st = w_stats[5].split(" ")[1] - try: - int(signal_st) - if -1*int(signal_st) > int(self.__signal): - self.fail("failed: signal to server lower than expected") - except ValueError: - self.fail("failed: error output (Bad connection?)") + # check if the board is connected to the router by wifi + p = sh.iw("wlan0", "link") + if p.exit_code == 0: + # get the first line of the output stream + out1 = p.stdout.decode('ascii').splitlines()[0] + if out1 != "Not connected.": + #check if the board has ip in the wlan0 interface + p = sh.ifconfig("wlan0") + if p.exit_code == 0: + result = re.search("inet addr:(?!127\.0{1,3}\.0{1,3}\.0{0,2}1$)((25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)", + p.stdout) + if result: + # execute iperf command against the server + if self.__bind is None: + p = sh.iperf("-c", self.__sip, "-x", "CMSV", "-n", self.__numbytestx, "-f", "m", "-p", + self.__port) + else: + p = sh.iperf("-c", self.__sip, "-x", "CMSV", "-n", self.__numbytestx, "-f", "m", "-p", + self.__port, "-B", self.__bind) + # check if it was executed succesfully + if p.exit_code == 0: + if p.stdout == "": + self.fail("failed: error executing iperf command") + # analyze output string + # split by lines + lines = p.stdout.splitlines() + # get first line + dat = lines[1].decode('ascii') + # search for the BW value + a = re.search("\d+(\.\d)? Mbits/sec", dat) + b = a.group().split() + bwreal = b[0] + + # check if BW is in the expected range + self.failUnless(float(bwreal) > float(self.__OKBW) * 0.9, + "failed: speed is lower than spected. Speed(MB/s)" + str(bwreal)) + else: + self.fail("failed: could not complete iperf command") + else: + self.fail("failed: wlan0 interface doesn't have any ip address.") + else: + self.fail("failed: could not complete ifconfig command.") else: - self.fail("failed: error output (Bad connection?)") - #tx_brate = float(w_stats[6].split(" ")[2]) + self.fail("failed: wifi module is not connected to the router.") else: self.fail("failed: couldn't execute iw command") - diff --git a/test-cli/test_main.py b/test-cli/test_main.py index 6680082..b657f00 100644 --- a/test-cli/test_main.py +++ b/test-cli/test_main.py @@ -32,15 +32,15 @@ import re from test.helpers.iseelogger import ISEE_Logger import logging - psdbObj = TestSrv_Database() xmlObj = None loggerObj = None + # define clear function def clear(): # check and make call for specific operating system - _ = call('clear' if os.name =='posix' else 'cls') + _ = call('clear' if os.name == 'posix' else 'cls') def create_board(): @@ -48,25 +48,21 @@ def create_board(): variant = xmlObj.gettagKey('board', 'variant') # get model id - globalVar.g_mid=model_id + "-" + variant + globalVar.g_mid = model_id + "-" + variant - # get station number - hstname = socket.gethostname() - if (re.search("^Station\d{2}$", hstname)): - globalVar.station = int(re.search("\d{2}", hstname).group(0)) - else: - raise Exception('Station number not configured') - exit(3) + # get station + globalVar.station = socket.gethostname() processor_id = genDieid(globalVar.g_mid) print(globalVar.g_mid) print(processor_id) - globalVar.g_uuid = psdbObj.create_board(processor_id, model_id, variant, bmac = None) + globalVar.g_uuid = psdbObj.create_board(processor_id, model_id, variant, bmac=None) + def createvarlist(testvars): varlist = {} for row in testvars: - varname,testparam = row + varname, testparam = row varlist[varname] = testparam return varlist @@ -75,13 +71,13 @@ def testsuite(): suite = unittest.TestSuite() tests = psdbObj.getboard_comp_test_list(globalVar.g_uuid) for row in tests: - testdefname,testfunc = row + testdefname, testfunc = row testvars = psdbObj.getboard_test_variables(globalVar.g_uuid, testdefname) varlist = createvarlist(testvars) command = "suite.addTest({}('{}','execute', varlist))".format(testfunc, testdefname) exec(command) - - globalVar.testid_ctl=psdbObj.open_testbatch(globalVar.g_uuid, globalVar.station) + + globalVar.testid_ctl = psdbObj.open_testbatch(globalVar.g_uuid, globalVar.station) return suite @@ -90,6 +86,7 @@ def finish_test(): # Update Set Test status with FINISH so that status column is not modified because close_testbatch already did it. psdbObj.update_set_test_row(globalVar.station, globalVar.testid_ctl, globalVar.g_uuid, 'END', 'FINISH') + def main(): create_board() try: @@ -120,6 +117,5 @@ if __name__ == "__main__": if psdbObj.open(xmlObj): main() else: - print("Error: Cannot open DB connection") + print("Error: Cannot open DB connection") exit(2) - -- cgit v1.1 From dd9ffc52507c391271d0821636c683fd7562b46a Mon Sep 17 00:00:00 2001 From: Hector Fernandez Date: Thu, 5 Mar 2020 18:50:47 +0100 Subject: Modified postgre functions. --- test-cli/test/helpers/testsrv_db.py | 155 ++++++++++-------------------------- test-cli/test/runners/simple.py | 6 -- test-cli/test/tests/qeeprom.py | 17 ++-- test-cli/test_main.py | 106 +++++++++++++++--------- 4 files changed, 119 insertions(+), 165 deletions(-) diff --git a/test-cli/test/helpers/testsrv_db.py b/test-cli/test/helpers/testsrv_db.py index 7ed02f3..b7b75b1 100644 --- a/test-cli/test/helpers/testsrv_db.py +++ b/test-cli/test/helpers/testsrv_db.py @@ -1,10 +1,10 @@ from test.helpers.psqldb import PgSQLConnection -from test.helpers.setup_xml import XMLSetup -def find_between( s, first, last ): + +def find_between(s, first, last): try: - start = s.index( first ) + len( first ) - end = s.index( last, start ) + start = s.index(first) + len(first) + end = s.index(last, start) return s[start:end] except ValueError: return "" @@ -19,157 +19,90 @@ class TestSrv_Database(object): def __init__(self): pass - def open (self, xmlObj): - self.__xml_setup = xmlObj; + def open(self, xmlObj): + self.__xml_setup = xmlObj self.__sqlObject = PgSQLConnection() return self.__sqlObject.db_connect(self.__xml_setup.getdbConnectionStr()) - - def create_board(self, processor_id, model_id, variant, bmac = None): + def create_board(self, processor_id, model_id, variant, station, bmac=None): '''create a new board''' if bmac is None: - sql = "SELECT isee.create_board('{}', '{}', '{}', NULL);".format(processor_id, model_id, variant) + sql = "SELECT isee.f_create_board('{}', '{}', '{}', NULL, '{}');".format(processor_id, model_id, variant, + station) else: - sql = "SELECT isee.create_board('{}', '{}', '{}', '{}');".format(processor_id, model_id, variant, bmac) - #print('>>>' + sql) - try: - res = self.__sqlObject.db_execute_query(sql) - #print(res) - return res[0][0]; - except Exception as err: - r = find_between(str(err), '#', '#') - #print(r) - return None - - def create_model(self, modid, variant, descr, tgid): - '''create new model''' - sql = "SELECT isee.create_model('{}', '{}', '{}', '{}')".format(modid, variant, descr, tgid) - #print('>>>' + sql) - try: - res = self.__sqlObject.db_execute_query(sql) - #print(res) - return res[0][0]; - except Exception as err: - r = find_between(str(err), '#', '#') - #print(r) - return None - - def create_test_definition(self, testname, testdesc, testfunc): - '''Create a new definition and return definition id on fail (testname already exist) return -1''' - sql = "SELECT isee.define_test('{}', '{}', '{}')".format(testname, testdesc, testfunc) - #print('>>>' + sql) - try: - res = self.__sqlObject.db_execute_query(sql) - #print(res) - return res[0][0]; - except Exception as err: - r = find_between(str(err), '#', '#') - #print(r) - return None - - def add_testdef_to_group(self, testgroupid, testname, testparam): - '''Assign definition to group test return true on success or false if it fails''' - sql = "SELECT isee.add_test_to_group('{}', '{}', '{}')".format(testgroupid, testname, testparam) - #print('>>>' + sql) + sql = "SELECT isee.f_create_board('{}', '{}', '{}', '{}', '{}');".format(processor_id, model_id, variant, + bmac, station) + # print('>>>' + sql) try: res = self.__sqlObject.db_execute_query(sql) - #print(res) - return res[0][0]; + # print(res) + return res[0][0] except Exception as err: r = find_between(str(err), '#', '#') - #print(r) + # print(r) return None - def getboard_test_list(self, board_uuid): + def get_tests_list(self, board_uuid): '''get the board test list''' - sql = "SELECT isee.gettestlist('{}')".format(board_uuid) - #print('>>>' + sql) + sql = "SELECT * FROM isee.f_get_tests_list('{}')".format(board_uuid) + # print('>>>' + sql) try: res = self.__sqlObject.db_execute_query(sql) - #print(res) + # print(res) return res; except Exception as err: r = find_between(str(err), '#', '#') - #print(r) + # print(r) return None - def getboard_comp_test_list(self, board_uuid): + def get_test_params_list(self, testid): '''get the board test list''' - sql = "SELECT * FROM isee.gettestcompletelist('{}')".format(board_uuid) - #print('>>>' + sql) + sql = "SELECT * FROM isee.f_get_test_params_list('{}')".format(testid) + # print('>>>' + sql) try: res = self.__sqlObject.db_execute_query(sql) - #print(res) - return res; + # print(res) + return res except Exception as err: r = find_between(str(err), '#', '#') - #print(r) + # print(r) return None - def getboard_test_variables(self, board_uuid, testdefid): + def open_test(self, board_uuid): '''get the board test list''' - sql = "SELECT * FROM isee.getboardtestvariables('{}', '{}')".format(board_uuid, testdefid) - #print('>>>' + sql) + sql = "SELECT * FROM isee.f_open_test('{}')".format(board_uuid) + # print('>>>' + sql) try: res = self.__sqlObject.db_execute_query(sql) - #print(res) - return res; - except Exception as err: - r = find_between(str(err), '#', '#') - #print(r) - return None - - def open_testbatch(self, board_uuid, station): - '''get the board test list''' - sql = "SELECT * FROM isee.open_testbatch('{}','{}')".format(board_uuid, station) - #print('>>>' + sql) - try: - res = str(self.__sqlObject.db_execute_query(sql)[0]) - res = res.replace('(', '') - res = res.replace(')', '') - res = res.replace(',', '') - #print(res) - return res; + # print(res) + return res except Exception as err: r = find_between(str(err), '#', '#') - #print(r) + # print(r) return None - def add_test_to_batch(self, board_uuid, testid, testid_ctl, result, groupid, data): + def run_test(self, testid_ctl, testid): '''get the board test list''' - sql = "SELECT isee.add_test_to_batch_c('{}','{}','{}','{}','{}','{}')".format(board_uuid, testid, testid_ctl, result, groupid, data) - #print('>>>' + sql) + sql = "SELECT * FROM isee.f_run_test('{}','{}')".format(testid_ctl, testid) + # print('>>>' + sql) try: res = self.__sqlObject.db_execute_query(sql) - #print(res) - return res; + # print(res) + return res except Exception as err: r = find_between(str(err), '#', '#') - #print(r) + # print(r) return None - def close_testbatch(self, board_uuid, testid_ctl): + def finish_test(self, testid_ctl, testid, newstatus): '''get the board test list''' - sql = "SELECT isee.close_testbatch('{}','{}')".format(board_uuid, testid_ctl) - #print('>>>' + sql) + sql = "SELECT * FROM isee.f_finish_test('{}','{}','{}')".format(testid_ctl, testid, newstatus) + # print('>>>' + sql) try: res = self.__sqlObject.db_execute_query(sql) - #print(res) - return res; + # print(res) + return res except Exception as err: r = find_between(str(err), '#', '#') - #print(r) + # print(r) return None - - def update_set_test_row(self, nstation, testid_ctl, board_uuid, ctest, cstatus): - '''get the board test list''' - sql = "SELECT isee.update_set_test_row('{}','{}','{}','{}','{}')".format(nstation, testid_ctl ,board_uuid, ctest, cstatus) - #print('>>>' + sql) - try: - res = self.__sqlObject.db_execute_query(sql) - #print(res) - return res; - except Exception as err: - r = find_between(str(err), '#', '#') - #print(r) - return None \ No newline at end of file diff --git a/test-cli/test/runners/simple.py b/test-cli/test/runners/simple.py index 3537d0e..084a5b8 100644 --- a/test-cli/test/runners/simple.py +++ b/test-cli/test/runners/simple.py @@ -89,12 +89,6 @@ class TextTestResult(unittest.TestResult): dbdata['name'] = test.shortDescription() dbdata['result'] = self.result dbdata['msg'] = str(test.longMessage) - #DB: INSERT IN THE DATABASE - filename='test_results.dat' - testResult = open(filename, 'a') - testResult.write(str(dbdata)) - testResult.write("\n") - testResult.close() #CONVERT FANCY FAIL AND PASS if self.result==self.PASS: simple_result="TRUE" if self.result == self.FAIL: simple_result = "FALSE" diff --git a/test-cli/test/tests/qeeprom.py b/test-cli/test/tests/qeeprom.py index 5bb0755..c2293c2 100644 --- a/test-cli/test/tests/qeeprom.py +++ b/test-cli/test/tests/qeeprom.py @@ -5,10 +5,9 @@ import unittest class Qeeprom(unittest.TestCase): __position = None - __uuid = None __eeprompath = None - # varlist: position, uuid, eeprompath + # varlist: position, eeprompath def __init__(self, testname, testfunc, varlist): super(Qeeprom, self).__init__(testfunc) self._testMethodDoc = testname @@ -16,25 +15,23 @@ class Qeeprom(unittest.TestCase): self.__position = varlist["position"] else: raise Exception('position param inside Qeeprom must be defined') - if "uuid" in varlist: - self.__uuid = varlist["uuid"] - else: - raise Exception('uuid param inside Qeeprom must be defined') if "eeprompath" in varlist: self.__eeprompath = varlist["eeprompath"] else: raise Exception('eeprompath param inside Qeeprom must be defined') def execute(self): + # generate some data + data_tx = "isee_test" # write data into the eeprom - p = sh.dd(sh.echo(self._uuid), "of=" + self.__eeprompath, "bs=1", "seek=" + self.__position) + p = sh.dd(sh.echo(data_tx), "of=" + self.__eeprompath, "bs=1", "seek=" + self.__position) if p.exit_code == 0: # read data from the eeprom - p = sh.dd("if=" + self.__eeprompath, "bs=1", "skip=" + self.__position, "count=" + len(self.__uuid)) + p = sh.dd("if=" + self.__eeprompath, "bs=1", "skip=" + self.__position, "count=" + len(data_tx)) if p.exit_code == 0: - uuid_rx = p.stdout.decode('ascii') + data_rx = p.stdout.decode('ascii') # compare both values - if (uuid_rx != self.__uuid): + if data_rx != data_tx: self.fail("failed: mismatch between written and received values.") else: diff --git a/test-cli/test_main.py b/test-cli/test_main.py index b657f00..8648b6b 100644 --- a/test-cli/test_main.py +++ b/test-cli/test_main.py @@ -10,8 +10,6 @@ from test.helpers.setup_xml import XMLSetup from test.runners.simple import SimpleTestRunner from test.tests.qbutton import Qbutton from test.helpers.syscmd import TestSysCommand -from test.helpers.syscmd import SysCommand -from test.tests.qiperf import QIperf from test.tests.qethernet import Qethernet from test.tests.qaudio import Qaudio from test.tests.qram import Qram @@ -28,7 +26,6 @@ from test.tests.qflash import Qflasher from test.tests.qnand import Qnand from test.helpers.globalVariables import globalVar import socket -import re from test.helpers.iseelogger import ISEE_Logger import logging @@ -43,6 +40,72 @@ def clear(): _ = call('clear' if os.name == 'posix' else 'cls') +def create_paramslist(params): + paramlist = {} + for row in params: + varname, varvalue = row + paramlist[varname] = varvalue + return paramlist + + +def add_test_task(suite, testdefname, paramlist): + testfunc = None + + if testdefname == "AUDIO": + suite.addTest(Qaudio(testdefname, "execute", paramlist)) + elif testdefname == "RAM": + suite.addTest(Qram(testdefname, "execute", paramlist)) + elif testdefname == "SERIALDUAL": + suite.addTest(Qduplex(testdefname, "execute", paramlist)) + elif testdefname == "EEPROM": + suite.addTest(Qeeprom(testdefname, "execute", paramlist)) + elif testdefname == "SERIAL": + suite.addTest(Qserial(testdefname, "execute", paramlist)) + # elif testdefname == "HDMI": + # suite.addTest(Qhdmi(testdefname, "execute", paramlist)) + elif testdefname == "BUTTON": + suite.addTest(Qbutton(testdefname, "execute", paramlist)) + elif testdefname == "RTC": + suite.addTest(Qrtc(testdefname, "execute", paramlist)) + elif testdefname == "CONSUMPTION": + suite.addTest(Qamp(testdefname, "execute", paramlist)) + # elif testdefname == "SATA": + # suite.addTest(Qsata(testdefname, "execute", paramlist)) + elif testdefname == "DMESG": + suite.addTest(TestSysCommand(testdefname, "execute", paramlist)) + elif testdefname == "ETHERNET": + suite.addTest(Qethernet(testdefname, "execute", paramlist)) + elif testdefname == "NAND": + suite.addTest(Qnand(testdefname, "execute", paramlist)) + elif testdefname == "I2C": + suite.addTest(Qi2c(testdefname, "execute", paramlist)) + elif testdefname == "WIFI": + suite.addTest(Qwifi(testdefname, "execute", paramlist)) + elif testdefname == "USB": + suite.addTest(Qusb(testdefname, "execute", paramlist)) + else: + raise Exception("Wrong testdefname") + + +def create_testsuite(): + # create an object TestSuite + suite = unittest.TestSuite() + # get list of tests for this board + tests = psdbObj.get_tests_list(globalVar.g_uuid) + # loop in every test for this board + for row in tests: + testid, testdefname = row + # get params for this test + params = psdbObj.get_test_params_list(testid) + paramlist = create_paramslist(params) + # add test to TestSuite + add_test_task(suite, testdefname, paramlist) + + globalVar.testid_ctl = psdbObj.open_test(globalVar.g_uuid) + + return suite + + def create_board(): model_id = xmlObj.gettagKey('board', 'model') variant = xmlObj.gettagKey('board', 'variant') @@ -56,46 +119,13 @@ def create_board(): processor_id = genDieid(globalVar.g_mid) print(globalVar.g_mid) print(processor_id) - globalVar.g_uuid = psdbObj.create_board(processor_id, model_id, variant, bmac=None) - - -def createvarlist(testvars): - varlist = {} - for row in testvars: - varname, testparam = row - varlist[varname] = testparam - return varlist - - -def testsuite(): - suite = unittest.TestSuite() - tests = psdbObj.getboard_comp_test_list(globalVar.g_uuid) - for row in tests: - testdefname, testfunc = row - testvars = psdbObj.getboard_test_variables(globalVar.g_uuid, testdefname) - varlist = createvarlist(testvars) - command = "suite.addTest({}('{}','execute', varlist))".format(testfunc, testdefname) - exec(command) - - globalVar.testid_ctl = psdbObj.open_testbatch(globalVar.g_uuid, globalVar.station) - return suite - - -def finish_test(): - psdbObj.close_testbatch(globalVar.g_uuid, globalVar.testid_ctl) - # Update Set Test status with FINISH so that status column is not modified because close_testbatch already did it. - psdbObj.update_set_test_row(globalVar.station, globalVar.testid_ctl, globalVar.g_uuid, 'END', 'FINISH') + globalVar.g_uuid = psdbObj.create_board(processor_id, model_id, variant, globalVar.station, bmac=None) def main(): create_board() - try: - os.remove("test_results.dat") - except: - pass runner = SimpleTestRunner(psdbObj) - runner.run(testsuite()) - finish_test() + runner.run(create_testsuite()) if __name__ == "__main__": -- cgit v1.1 From 98d40cecc9818360984188755e455aa53933aab0 Mon Sep 17 00:00:00 2001 From: Hector Fernandez Date: Fri, 6 Mar 2020 09:38:18 +0100 Subject: changed parameter names of tests --- test-cli/.idea/misc.xml | 2 +- test-cli/.idea/test-cli.iml | 2 +- test-cli/.idea/workspace.xml | 19 +++++++++++++++++-- test-cli/test/tests/qeeprom.py | 2 +- test-cli/test/tests/qethernet.py | 20 ++++++++++---------- test-cli/test/tests/qram.py | 12 ++++++------ test-cli/test_main.py | 2 ++ 7 files changed, 38 insertions(+), 21 deletions(-) diff --git a/test-cli/.idea/misc.xml b/test-cli/.idea/misc.xml index 2012045..3999087 100644 --- a/test-cli/.idea/misc.xml +++ b/test-cli/.idea/misc.xml @@ -3,5 +3,5 @@ - + \ No newline at end of file diff --git a/test-cli/.idea/test-cli.iml b/test-cli/.idea/test-cli.iml index 8a05c6e..218fb9e 100644 --- a/test-cli/.idea/test-cli.iml +++ b/test-cli/.idea/test-cli.iml @@ -2,7 +2,7 @@ - + diff --git a/test-cli/.idea/workspace.xml b/test-cli/.idea/workspace.xml index 25fe244..42f0f8f 100644 --- a/test-cli/.idea/workspace.xml +++ b/test-cli/.idea/workspace.xml @@ -2,8 +2,12 @@ - - + + + + + + @@ -43,4 +48,14 @@ + + + + + + + + + + \ No newline at end of file diff --git a/test-cli/test/tests/qeeprom.py b/test-cli/test/tests/qeeprom.py index c2293c2..acdc1f6 100644 --- a/test-cli/test/tests/qeeprom.py +++ b/test-cli/test/tests/qeeprom.py @@ -27,7 +27,7 @@ class Qeeprom(unittest.TestCase): p = sh.dd(sh.echo(data_tx), "of=" + self.__eeprompath, "bs=1", "seek=" + self.__position) if p.exit_code == 0: # read data from the eeprom - p = sh.dd("if=" + self.__eeprompath, "bs=1", "skip=" + self.__position, "count=" + len(data_tx)) + p = sh.dd("if=" + self.__eeprompath, "bs=1", "skip=" + self.__position, "count=" + str(len(data_tx))) if p.exit_code == 0: data_rx = p.stdout.decode('ascii') # compare both values diff --git a/test-cli/test/tests/qethernet.py b/test-cli/test/tests/qethernet.py index 22d796c..adee67f 100644 --- a/test-cli/test/tests/qethernet.py +++ b/test-cli/test/tests/qethernet.py @@ -4,26 +4,26 @@ import re class Qethernet(unittest.TestCase): - __sip = None + __serverip = None __numbytestx = None __bind = None - __OKBW = None + __bwexpected = None __port = None def __init__(self, testname, testfunc, varlist): super(Qethernet, self).__init__(testfunc) - if "sip" in varlist: - self.__sip = varlist["sip"] + if "serverip" in varlist: + self.__serverip = varlist["serverip"] else: raise Exception('sip param inside Qethernet have been be defined') if "bind" in varlist: self.__bind = varlist["bind"] else: self.__bind = None - if "OKBW" in varlist: - self.__OKBW = varlist["OKBW"] + if "bwexpected" in varlist: + self.__bwexpected = varlist["bwexpected"] else: - raise Exception('OKBW param inside Qethernet must be defined') + raise Exception('bwexpected param inside Qethernet must be defined') if "port" in varlist: self.__port = varlist["port"] else: @@ -34,9 +34,9 @@ class Qethernet(unittest.TestCase): def execute(self): # execute iperf command against the server if self.__bind is None: - p = sh.iperf("-c", self.__sip, "-x", "CMSV", "-n", self.__numbytestx, "-f", "m", "-p", self.__port) + p = sh.iperf("-c", self.__serverip, "-x", "CMSV", "-n", self.__numbytestx, "-f", "m", "-p", self.__port) else: - p = sh.iperf("-c", self.__sip, "-x", "CMSV", "-n", self.__numbytestx, "-f", "m", "-p", self.__port, "-B", + p = sh.iperf("-c", self.__serverip, "-x", "CMSV", "-n", self.__numbytestx, "-f", "m", "-p", self.__port, "-B", self.__bind) # check if it was executed succesfully if p.exit_code == 0: @@ -53,7 +53,7 @@ class Qethernet(unittest.TestCase): bwreal = b[0] # check if BW is in the expected range - self.failUnless(float(bwreal) > float(self.__OKBW) * 0.9, + self.failUnless(float(bwreal) > float(self.__bwexpected) * 0.9, "failed: speed is lower than spected. Speed(MB/s)" + str(bwreal)) else: self.fail("failed: could not complete iperf command") diff --git a/test-cli/test/tests/qram.py b/test-cli/test/tests/qram.py index 1b66e5c..ade7aeb 100644 --- a/test-cli/test/tests/qram.py +++ b/test-cli/test/tests/qram.py @@ -3,17 +3,17 @@ import sh class Qram(unittest.TestCase): - __memSize = "10M" + __memsize = "10M" __loops = "1" - # varlist: memSize, loops + # varlist: memsize, loops def __init__(self, testname, testfunc, varlist): super(Qram, self).__init__(testfunc) - if "memSize" in varlist: - self.__memSize = varlist["memSize"] + if "memsize" in varlist: + self.__memsize = varlist["memsize"] else: - raise Exception('memSize param inside Qram must be defined') + raise Exception('memsize param inside Qram must be defined') if "loops" in varlist: self.__loops = varlist["loops"] else: @@ -22,6 +22,6 @@ class Qram(unittest.TestCase): def execute(self): try: - sh.memtester(self.__memSize, "1") + sh.memtester(self.__memsize, "1") except sh.ErrorReturnCode as e: self.fail("failed: could not complete memtester command") diff --git a/test-cli/test_main.py b/test-cli/test_main.py index 8648b6b..c09d703 100644 --- a/test-cli/test_main.py +++ b/test-cli/test_main.py @@ -63,6 +63,8 @@ def add_test_task(suite, testdefname, paramlist): suite.addTest(Qserial(testdefname, "execute", paramlist)) # elif testdefname == "HDMI": # suite.addTest(Qhdmi(testdefname, "execute", paramlist)) + # elif testdefname == "SCREEN": + # suite.addTest(Qscreen(testdefname, "execute", paramlist)) elif testdefname == "BUTTON": suite.addTest(Qbutton(testdefname, "execute", paramlist)) elif testdefname == "RTC": -- cgit v1.1 From 9f07a57d89a927aa9b172c1bf20c7ab563658c73 Mon Sep 17 00:00:00 2001 From: Hector Fernandez Date: Fri, 6 Mar 2020 12:46:27 +0100 Subject: Fixed multiple errors. --- test-cli/.idea/dictionaries/hfernandez.xml | 3 + .../.idea/inspectionProfiles/profiles_settings.xml | 1 + test-cli/.idea/workspace.xml | 126 ++++++++++++++++++++- test-cli/test/helpers/get_dieid.py | 44 ------- test-cli/test/helpers/int_registers.py | 56 +++++++++ test-cli/test/helpers/testsrv_db.py | 26 ++--- test-cli/test/runners/simple.py | 39 ++----- test-cli/test/tests/qamp.py | 25 ++-- test-cli/test/tests/qaudio.py | 35 +++--- test-cli/test/tests/qbutton.py | 5 +- test-cli/test/tests/qduplex_ser.py | 4 +- test-cli/test/tests/qeeprom.py | 3 +- test-cli/test/tests/qethernet.py | 3 + test-cli/test/tests/qflash.py | 3 + test-cli/test/tests/qi2c.py | 3 + test-cli/test/tests/qiperf.py | 53 --------- test-cli/test/tests/qnand.py | 5 +- test-cli/test/tests/qram.py | 5 +- test-cli/test/tests/qrtc.py | 4 +- test-cli/test/tests/qscreen.py | 4 +- test-cli/test/tests/qserial.py | 4 +- test-cli/test/tests/qusb.py | 28 +++-- test-cli/test/tests/qwifi.py | 14 ++- test-cli/test_main.py | 16 ++- 24 files changed, 303 insertions(+), 206 deletions(-) create mode 100644 test-cli/.idea/dictionaries/hfernandez.xml delete mode 100644 test-cli/test/helpers/get_dieid.py create mode 100644 test-cli/test/helpers/int_registers.py delete mode 100644 test-cli/test/tests/qiperf.py diff --git a/test-cli/.idea/dictionaries/hfernandez.xml b/test-cli/.idea/dictionaries/hfernandez.xml new file mode 100644 index 0000000..c85feeb --- /dev/null +++ b/test-cli/.idea/dictionaries/hfernandez.xml @@ -0,0 +1,3 @@ + + + \ No newline at end of file diff --git a/test-cli/.idea/inspectionProfiles/profiles_settings.xml b/test-cli/.idea/inspectionProfiles/profiles_settings.xml index 105ce2d..dd4c951 100644 --- a/test-cli/.idea/inspectionProfiles/profiles_settings.xml +++ b/test-cli/.idea/inspectionProfiles/profiles_settings.xml @@ -1,5 +1,6 @@ + diff --git a/test-cli/.idea/workspace.xml b/test-cli/.idea/workspace.xml index 42f0f8f..cd587e0 100644 --- a/test-cli/.idea/workspace.xml +++ b/test-cli/.idea/workspace.xml @@ -2,12 +2,26 @@ - - + + + + + + + + + + + + + + + + + - - + + + + + + + @@ -49,13 +87,89 @@ - + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + - + + + + + + file://$PROJECT_DIR$/test/runners/simple.py + 83 + + + file://$PROJECT_DIR$/test/runners/simple.py + 58 + + + + \ No newline at end of file diff --git a/test-cli/test/helpers/get_dieid.py b/test-cli/test/helpers/get_dieid.py deleted file mode 100644 index 48e724e..0000000 --- a/test-cli/test/helpers/get_dieid.py +++ /dev/null @@ -1,44 +0,0 @@ -import mmap -import os -import struct - -MAP_MASK = mmap.PAGESIZE - 1 -WORD = 4 - - -def read(addr): - """ Read from any location in memory - Returns the readed value in hexadecimal format - Keyword arguments: - - addr: The memory address to be readed. - """ - fd = os.open("/dev/mem", os.O_RDWR | os.O_SYNC) - # Map one page - mm = mmap.mmap(fd, mmap.PAGESIZE, mmap.MAP_SHARED, mmap.PROT_WRITE | mmap.PROT_READ, offset=addr & ~MAP_MASK) - mm.seek(addr & MAP_MASK) - retval = struct.unpack('I', mm.read(WORD)) - mm.close() - os.close(fd) - return "%08X" % retval[0] - -def getRegisters(model): - if model.find("IGEP0046") == 0: - registers = [0x021BC420, 0x021BC410] - elif model.find("IGEP0034") == 0 or model.find("SOPA0000") == 0: - registers = [0x44e10630, 0x44e10634, 0x44e10638, 0x44e1063C] - elif model.find("OMAP3") == 0: - registers = [0x4830A224, 0x4830A220, 0x4830A21C, 0x4830A218] - elif model.find("OMAP5") == 0: - registers = [0x4A002210, 0x4A00220C, 0x4A002208, 0x4A002200] - return registers - -def genDieid(modelid): - registers = getRegisters(modelid) - id = "" - for i in range(len(registers)): - id=id+(read(registers[i])) - return id - -#if __name__ == "__main__": - #registers = [0x021BC420, 0x021BC410] - #print(main(registers)) diff --git a/test-cli/test/helpers/int_registers.py b/test-cli/test/helpers/int_registers.py new file mode 100644 index 0000000..cf2e35b --- /dev/null +++ b/test-cli/test/helpers/int_registers.py @@ -0,0 +1,56 @@ +import mmap +import os +import struct + +MAP_MASK = mmap.PAGESIZE - 1 +WORD = 4 + + +def read(addr): + """ Read from any location in memory + Returns the readed value in hexadecimal format + Keyword arguments: + - addr: The memory address to be readed. + """ + fd = os.open("/dev/mem", os.O_RDWR | os.O_SYNC) + # Map one page + mm = mmap.mmap(fd, mmap.PAGESIZE, mmap.MAP_SHARED, mmap.PROT_WRITE | mmap.PROT_READ, offset=addr & ~MAP_MASK) + mm.seek(addr & MAP_MASK) + retval = struct.unpack('I', mm.read(WORD)) + mm.close() + os.close(fd) + return "%08X" % retval[0] + + +def get_die_id(modelid): + dieid = "" + + # get registers + if modelid.find("IGEP0046") == 0: + registers = [0x021BC420, 0x021BC410] + elif modelid.find("IGEP0034") == 0 or modelid.find("SOPA0000") == 0: + # registers: mac_id0_lo, mac_id0_hi, mac_id1_lo, mac_id1_hi + registers = [0x44e10630, 0x44e10634, 0x44e10638, 0x44e1063C] + elif modelid.find("OMAP3") == 0: + registers = [0x4830A224, 0x4830A220, 0x4830A21C, 0x4830A218] + elif modelid.find("OMAP5") == 0: + registers = [0x4A002210, 0x4A00220C, 0x4A002208, 0x4A002200] + else: + raise Exception('modelid not defined') + + for rg in registers: + dieid = dieid + read(rg) + return dieid + + +def get_mac(modelid): + mac = None + + if modelid.find("IGEP0034") == 0 or modelid.find("SOPA0000") == 0: + # registers: mac_id0_lo, mac_id0_hi + registers = [0x44e10630, 0x44e10634] + mac = "" + for rg in registers: + mac = mac + read(rg) + + return mac diff --git a/test-cli/test/helpers/testsrv_db.py b/test-cli/test/helpers/testsrv_db.py index b7b75b1..d937d3e 100644 --- a/test-cli/test/helpers/testsrv_db.py +++ b/test-cli/test/helpers/testsrv_db.py @@ -27,11 +27,13 @@ class TestSrv_Database(object): def create_board(self, processor_id, model_id, variant, station, bmac=None): '''create a new board''' if bmac is None: - sql = "SELECT isee.f_create_board('{}', '{}', '{}', NULL, '{}');".format(processor_id, model_id, variant, - station) + sql = "SELECT * FROM isee.f_create_board('{}', '{}', '{}', NULL, '{}');".format(processor_id, model_id, + variant, + station) else: - sql = "SELECT isee.f_create_board('{}', '{}', '{}', '{}', '{}');".format(processor_id, model_id, variant, - bmac, station) + sql = "SELECT * FROM isee.f_create_board('{}', '{}', '{}', '{}', '{}');".format(processor_id, model_id, + variant, + bmac, station) # print('>>>' + sql) try: res = self.__sqlObject.db_execute_query(sql) @@ -57,7 +59,7 @@ class TestSrv_Database(object): def get_test_params_list(self, testid): '''get the board test list''' - sql = "SELECT * FROM isee.f_get_test_params_list('{}')".format(testid) + sql = "SELECT * FROM isee.f_get_test_params_list({})".format(testid) # print('>>>' + sql) try: res = self.__sqlObject.db_execute_query(sql) @@ -75,7 +77,7 @@ class TestSrv_Database(object): try: res = self.__sqlObject.db_execute_query(sql) # print(res) - return res + return res[0][0] except Exception as err: r = find_between(str(err), '#', '#') # print(r) @@ -83,12 +85,10 @@ class TestSrv_Database(object): def run_test(self, testid_ctl, testid): '''get the board test list''' - sql = "SELECT * FROM isee.f_run_test('{}','{}')".format(testid_ctl, testid) + sql = "SELECT isee.f_run_test({},{})".format(testid_ctl, testid) # print('>>>' + sql) try: - res = self.__sqlObject.db_execute_query(sql) - # print(res) - return res + self.__sqlObject.db_execute_query(sql) except Exception as err: r = find_between(str(err), '#', '#') # print(r) @@ -96,12 +96,10 @@ class TestSrv_Database(object): def finish_test(self, testid_ctl, testid, newstatus): '''get the board test list''' - sql = "SELECT * FROM isee.f_finish_test('{}','{}','{}')".format(testid_ctl, testid, newstatus) + sql = "SELECT isee.f_finish_test({},{},'{}')".format(testid_ctl, testid, newstatus) # print('>>>' + sql) try: - res = self.__sqlObject.db_execute_query(sql) - # print(res) - return res + self.__sqlObject.db_execute_query(sql) except Exception as err: r = find_between(str(err), '#', '#') # print(r) diff --git a/test-cli/test/runners/simple.py b/test-cli/test/runners/simple.py index 084a5b8..dd7c74d 100644 --- a/test-cli/test/runners/simple.py +++ b/test-cli/test/runners/simple.py @@ -12,7 +12,6 @@ from test.helpers.globalVariables import globalVar from test.helpers.testsrv_db import TestSrv_Database - class SimpleTestRunner: """ A Test Runner that shows results in a simple human-readable format. @@ -33,15 +32,12 @@ class SimpleTestRunner: self.stream.write(message) def run(self, test): - """ Run the given test case or Test Suite. - - """ + """ Run the given test case or Test Suite. """ result = TextTestResult(self, self.__pgObj) test(result) - result.testsRun - # self.writeUpdate("---------------------------------------------\n") return result + class TextTestResult(unittest.TestResult): # Print in terminal with colors PASS = '\033[32mPASS\033[0m\n' @@ -60,14 +56,11 @@ class TextTestResult(unittest.TestResult): unittest.TestResult.startTest(self, test) self.runner.writeUpdate("%s : " % test.shortDescription()) # SEND TO DB THE UPDATE THAT WE RUN EACH TEST - self.__pgObj.update_set_test_row(globalVar.station, globalVar.testid_ctl, globalVar.g_uuid, test.shortDescription(), "RUNNING") - # psdb = TestSrv_Database() - # psdb.open("setup.xml") - # psdb.update_set_test_row(globalVar.station, globalVar.testid_ctl, globalVar.g_uuid, test.shortDescription(), "RUNNING") + self.__pgObj.run_test(test.params["testidctl"], test.params["testid"]) def addSuccess(self, test): unittest.TestResult.addSuccess(self, test) - self.result=self.PASS + self.result = self.PASS def addError(self, test, err): unittest.TestResult.addError(self, test, err) @@ -76,25 +69,17 @@ class TextTestResult(unittest.TestResult): def addFailure(self, test, err): unittest.TestResult.addFailure(self, test, err) - test.longMessage=err[1] + test.longMessage = err[1] self.result = self.FAIL def stopTest(self, test): unittest.TestResult.stopTest(self, test) # display: print test result self.runner.writeUpdate(self.result) - # DB: PREPARE THE DATA TO BE INSERTED - dbdata = {} - dbdata['uuid'] = globalVar.g_uuid - dbdata['name'] = test.shortDescription() - dbdata['result'] = self.result - dbdata['msg'] = str(test.longMessage) - #CONVERT FANCY FAIL AND PASS - if self.result==self.PASS: simple_result="TRUE" - if self.result == self.FAIL: simple_result = "FALSE" - elif self.result == self.ERROR: simple_result = "FALSE" - #SEND TO DB THE RESULT OF THE TEST - # psdb = TestSrv_Database() - # psdb.open("setup.xml") - self.__pgObj.add_test_to_batch(globalVar.g_uuid, test.shortDescription(), globalVar.testid_ctl, simple_result, globalVar.g_mid, test.longMessage) - # psdb.add_test_to_batch(globalVar.g_uuid, test.shortDescription(), globalVar.testid_ctl, simple_result, globalVar.g_mid, test.longMessage) + # SEND TO DB THE RESULT OF THE TEST + if self.result == self.PASS: + status = "TEST_COMPLETE" + else: + status = "TEST_FAILED" + self.__pgObj.finish_test(test.params["testidctl"], test.params["testid"], status) + diff --git a/test-cli/test/tests/qamp.py b/test-cli/test/tests/qamp.py index 3e38ce9..cf18fc5 100644 --- a/test-cli/test/tests/qamp.py +++ b/test-cli/test/tests/qamp.py @@ -3,10 +3,13 @@ import unittest import serial import time + class Qamp(unittest.TestCase): + params = None - #varlist: undercurrent, overcurrent + # varlist: undercurrent, overcurrent def __init__(self, testname, testfunc, varlist): + self.params = varlist self._current = 0.0 if "undercurrent" in varlist: self._undercurrent = varlist["undercurrent"] @@ -16,7 +19,7 @@ class Qamp(unittest.TestCase): self._overcurrent = varlist["overcurrent"] else: raise Exception('overcurrent param inside Qamp must be defined') - self._vshuntfactor=16384.0 + self._vshuntfactor = 16384.0 self._ser = serial.Serial() self._ser.port = '/dev/ttyUSB0' self._ser.baudrate = 115200 @@ -34,19 +37,19 @@ class Qamp(unittest.TestCase): def execute(self): # Open Serial port ttyUSB0 - error=0 + error = 0 try: self._ser.open() except: self.fail("failed: IMPOSSIBLE OPEN USB-SERIAL PORT ( {} )".format(self._ser.port)) - error=1 - if error==0: + error = 1 + if error == 0: # Clean input and output buffer of serial port self._ser.flushInput() self._ser.flushOutput() # Send command to read Voltage at Shunt resistor # Prepare cmd - cmd = ('at+in?\n\r') + cmd = 'at+in?\n\r' i = 0 # Write, 1 by 1 byte at a time to avoid hanging of serial receiver code of listener, emphasis being made at sleep of 50 ms. @@ -57,7 +60,7 @@ class Qamp(unittest.TestCase): # Read, 1 by 1 byte res = [] - while (self._ser.inWaiting() > 0): # if incoming bytes are waiting to be read from the serial input buffer + while self._ser.inWaiting() > 0: # if incoming bytes are waiting to be read from the serial input buffer res.append(self._ser.read(1).decode('ascii')) # read the bytes and convert from binary array to ASCII string = ''.join(res) @@ -65,12 +68,8 @@ class Qamp(unittest.TestCase): self._current = float(int(string, 0)) / self._vshuntfactor print(self._current) # In order to give a valid result it is importarnt to define an under current value - if (self._current > float(self._overcurrent)): + if self._current > float(self._overcurrent): self.fail("failed: OVERCURRENT DETECTED ( {} )".format(self._current)) - if (self._current < float(self._undercurrent)): + if self._current < float(self._undercurrent): self.fail("failed: UNDERCURRENT DETECTED ( {} )".format(self._current)) - - - - diff --git a/test-cli/test/tests/qaudio.py b/test-cli/test/tests/qaudio.py index fe57be2..ef4da67 100644 --- a/test-cli/test/tests/qaudio.py +++ b/test-cli/test/tests/qaudio.py @@ -1,38 +1,39 @@ from test.helpers.syscmd import SysCommand import unittest -#class name + + class Qaudio(unittest.TestCase): - # Initialize the variables + params = None - def __init__(self, testname, testfunc, dtmfFile): - # Doing this we will initialize the class and later on perform a particular method inside this class + def __init__(self, testname, testfunc, varlist): + self.params = varlist super(Qaudio, self).__init__(testfunc) self._testMethodDoc = testname - self._dtmfFile=dtmfFile - self.__sum=0 - self.__refSum = 25 # 1+3+5+7+9 + if "dtmfFile" in varlist: + self._dtmfFile = varlist["dtmfFile"] + else: + raise Exception('undercurrent param inside Qamp must be defined') + self.__sum = 0 + self.__refSum = 25 # 1+3+5+7+9 def execute(self): - str_cmd = "aplay test/files/dtmf-13579.wav & arecord -r 8000 -d 1 recorded.wav" #.format(self.__dtmfFile) + str_cmd = "aplay test/files/dtmf-13579.wav & arecord -r 8000 -d 1 recorded.wav" # .format(self.__dtmfFile) audio_loop = SysCommand("command-name", str_cmd) - if audio_loop.execute() == -1:# BUG: Returns -1 but work + if audio_loop.execute() == -1: # BUG: Returns -1 but work lines = audio_loop.getOutput().splitlines() str_cmd = "multimon -t wav -a DTMF recorded.wav -q" dtmf_decoder = SysCommand("command-name", str_cmd) - a=dtmf_decoder.execute() - if dtmf_decoder.execute() == -1: # BUG: Returns -1 but work + a = dtmf_decoder.execute() + if dtmf_decoder.execute() == -1: # BUG: Returns -1 but work self.__raw_out = dtmf_decoder.getOutput() if self.__raw_out == "": - return -1 + self.fail("failed: can not execute multimon command") lines = dtmf_decoder.getOutput().splitlines() for i in range(0, 5): - aux=[int(s) for s in lines[i].split() if s.isdigit()] - self.__sum=self.__sum+aux[0] + aux = [int(s) for s in lines[i].split() if s.isdigit()] + self.__sum = self.__sum + aux[0] self.failUnless(self.__sum == self.__refSum), "failed: incorrect dtmf code" + str(self.__sum) else: self.fail("failed: fail reading recorded file") - return -1 else: self.fail("failed: fail playing/recording file") - return -1 - return 0 \ No newline at end of file diff --git a/test-cli/test/tests/qbutton.py b/test-cli/test/tests/qbutton.py index 4718924..46ddde0 100644 --- a/test-cli/test/tests/qbutton.py +++ b/test-cli/test/tests/qbutton.py @@ -1,11 +1,14 @@ from test.helpers.syscmd import SysCommand import unittest -import uuid import time + class Qbutton(unittest.TestCase): + params = None def __init__(self, testname, testfunc, varlist): + self.params = varlist + super(Qbutton, self).__init__(testfunc) if "gpio" in varlist: self.__gpio = varlist["gpio"] else: diff --git a/test-cli/test/tests/qduplex_ser.py b/test-cli/test/tests/qduplex_ser.py index 837b4d0..46fb5db 100644 --- a/test-cli/test/tests/qduplex_ser.py +++ b/test-cli/test/tests/qduplex_ser.py @@ -1,12 +1,14 @@ -from test.helpers.syscmd import SysCommand import unittest import uuid import serial import time + class Qduplex(unittest.TestCase): + params = None def __init__(self, testname, testfunc, varlist): + self.params = varlist super(Qduplex, self).__init__(testfunc) if "port1" in varlist: self.__port1 = varlist["port1"] diff --git a/test-cli/test/tests/qeeprom.py b/test-cli/test/tests/qeeprom.py index acdc1f6..d2c55ef 100644 --- a/test-cli/test/tests/qeeprom.py +++ b/test-cli/test/tests/qeeprom.py @@ -3,12 +3,13 @@ import unittest class Qeeprom(unittest.TestCase): - + params = None __position = None __eeprompath = None # varlist: position, eeprompath def __init__(self, testname, testfunc, varlist): + self.params = varlist super(Qeeprom, self).__init__(testfunc) self._testMethodDoc = testname if "position" in varlist: diff --git a/test-cli/test/tests/qethernet.py b/test-cli/test/tests/qethernet.py index adee67f..1d4c9bc 100644 --- a/test-cli/test/tests/qethernet.py +++ b/test-cli/test/tests/qethernet.py @@ -9,8 +9,11 @@ class Qethernet(unittest.TestCase): __bind = None __bwexpected = None __port = None + params = None + #varlist content: serverip, bwexpected, port, (optional)bind def __init__(self, testname, testfunc, varlist): + self.params = varlist super(Qethernet, self).__init__(testfunc) if "serverip" in varlist: self.__serverip = varlist["serverip"] diff --git a/test-cli/test/tests/qflash.py b/test-cli/test/tests/qflash.py index 0c3fcb5..59ed13d 100644 --- a/test-cli/test/tests/qflash.py +++ b/test-cli/test/tests/qflash.py @@ -2,9 +2,12 @@ from test.helpers.syscmd import SysCommand import unittest from test.helpers.globalVariables import globalVar + class Qflasher(unittest.TestCase): + params = None def __init__(self, testname, testfunc, varlist): + self.params = varlist super(Qflasher, self).__init__(testfunc) self._testMethodDoc = testname model=globalVar.g_mid diff --git a/test-cli/test/tests/qi2c.py b/test-cli/test/tests/qi2c.py index 2f14424..71eb64e 100644 --- a/test-cli/test/tests/qi2c.py +++ b/test-cli/test/tests/qi2c.py @@ -1,9 +1,12 @@ from test.helpers.syscmd import SysCommand import unittest + class Qi2c(unittest.TestCase): + params = None def __init__(self, testname, testfunc, varlist): + self.params = varlist super(Qi2c, self).__init__(testfunc) if "busnum" in varlist: self.__busnum = varlist["busnum"] diff --git a/test-cli/test/tests/qiperf.py b/test-cli/test/tests/qiperf.py deleted file mode 100644 index 126b6ee..0000000 --- a/test-cli/test/tests/qiperf.py +++ /dev/null @@ -1,53 +0,0 @@ -from test.helpers.syscmd import SysCommand - - -class QIperf(object): - __sip = None - __raw_out = None - __MB_req = None - __MB_real = None - __BW_real = None - __dat_list = None - __bind = None - - def __init__(self, sip = None): - if sip is not None: - self.__sip = sip - self.__MB_req = '10' - - def execute(self, sip = None, bind = None): - if sip is not None: - self.__sip = sip - - if bind is None: - str_cmd = "iperf -c {} -x CMSV -n {}M".format(self.__sip, self.__MB_req) - else: - self.__bind = bind - str_cmd = "iperf -c {} -x CMSV -n {}M -B {}".format(self.__sip, self.__MB_req, self.__bind) - t = SysCommand("iperf", str_cmd) - if t.execute() == 0: - self.__raw_out = t.getOutput() - if self.__raw_out == "": - return -1 - lines = t.getOutput().splitlines() - dat = lines[1] - dat = dat.decode('ascii') - dat_list = dat.split( ) - for d in dat_list: - a = dat_list.pop(0) - if a == "sec": - break - self.__MB_real = dat_list[0] - self.__BW_real = dat_list[2] - self.__dat_list = dat_list - print(self.__MB_real) - print(self.__BW_real) - else: - return -1 - return 0 - - def get_Total_MB(self): - return self.__MB_real; - - def get_Total_BW(self): - return self.__MB_real; diff --git a/test-cli/test/tests/qnand.py b/test-cli/test/tests/qnand.py index 9f2cd47..5125c1c 100644 --- a/test-cli/test/tests/qnand.py +++ b/test-cli/test/tests/qnand.py @@ -1,14 +1,15 @@ import unittest import sh -class Qnand(unittest.TestCase): +class Qnand(unittest.TestCase): + params = None __device = "10M" # varlist: device def __init__(self, testname, testfunc, varlist): + self.params = varlist super(Qnand, self).__init__(testfunc) - if "device" in varlist: self.__device = varlist["device"] else: diff --git a/test-cli/test/tests/qram.py b/test-cli/test/tests/qram.py index ade7aeb..cc75b68 100644 --- a/test-cli/test/tests/qram.py +++ b/test-cli/test/tests/qram.py @@ -1,15 +1,16 @@ import unittest import sh -class Qram(unittest.TestCase): +class Qram(unittest.TestCase): + params = None __memsize = "10M" __loops = "1" # varlist: memsize, loops def __init__(self, testname, testfunc, varlist): + self.params = varlist super(Qram, self).__init__(testfunc) - if "memsize" in varlist: self.__memsize = varlist["memsize"] else: diff --git a/test-cli/test/tests/qrtc.py b/test-cli/test/tests/qrtc.py index 8e31572..884a290 100644 --- a/test-cli/test/tests/qrtc.py +++ b/test-cli/test/tests/qrtc.py @@ -2,11 +2,13 @@ from test.helpers.syscmd import SysCommand import unittest import time -class Qrtc(unittest.TestCase): +class Qrtc(unittest.TestCase): + params = None __rtc = "/dev/rtc0" def __init__(self, testname, testfunc, varlist): + self.params = varlist super(Qrtc, self).__init__(testfunc) if "rtc" in varlist: self.__rtc = varlist["rtc"] diff --git a/test-cli/test/tests/qscreen.py b/test-cli/test/tests/qscreen.py index 87e53b2..aaee628 100644 --- a/test-cli/test/tests/qscreen.py +++ b/test-cli/test/tests/qscreen.py @@ -1,12 +1,14 @@ from test.helpers.syscmd import SysCommand import unittest -import time from test.helpers.cv_display_test import pattern_detect + class Qscreen(unittest.TestCase): + params = None #varlist: display def __init__(self, testname, testfunc, varlist): + self.params = varlist super(Qscreen, self).__init__(testfunc) if "display" in varlist: self.__display = varlist["display"] diff --git a/test-cli/test/tests/qserial.py b/test-cli/test/tests/qserial.py index d3e2ee6..d798578 100644 --- a/test-cli/test/tests/qserial.py +++ b/test-cli/test/tests/qserial.py @@ -1,12 +1,14 @@ -from test.helpers.syscmd import SysCommand import unittest import uuid import serial import time + class Qserial(unittest.TestCase): + params = None def __init__(self, testname, testfunc, varlist): + self.params = varlist super(Qserial, self).__init__(testfunc) if "port" in varlist: self.__port = varlist["port"] diff --git a/test-cli/test/tests/qusb.py b/test-cli/test/tests/qusb.py index 0390143..70265a5 100644 --- a/test-cli/test/tests/qusb.py +++ b/test-cli/test/tests/qusb.py @@ -1,9 +1,14 @@ from test.helpers.syscmd import SysCommand import unittest + class Qusb(unittest.TestCase): + __numPorts = None + __devLabel = None + params = None def __init__(self, testname, testfunc, varlist): + self.params = varlist super(Qusb, self).__init__(testfunc) if "numPorts" in varlist: self.__numPorts = varlist["numPorts"] @@ -14,27 +19,27 @@ class Qusb(unittest.TestCase): self.__devLabel = varlist["devLabel"] else: raise Exception('devLabel param inside Qusb must be defined') - if testname=="USBOTG": + if testname == "USBOTG": self.__usbFileName = "/this_is_an_usb_otg" self.__usbtext = "USBOTG" else: self.__usbFileName = "/this_is_an_usb_host" self.__usbtext = "USBHOST" - self.__numUsbFail=[] + self.__numUsbFail = [] def execute(self): - str_cmd= "lsblk -o LABEL" + str_cmd = "lsblk -o LABEL" lsblk_command = SysCommand("lsblk", str_cmd) if lsblk_command.execute() == 0: self.__raw_out = lsblk_command.getOutput() if self.__raw_out == "": return -1 lines = lsblk_command.getOutput().splitlines() - host_list=[] + host_list = [] for i in range(len(lines)): - if str(lines[i].decode('ascii'))==self.__devLabel: + if str(lines[i].decode('ascii')) == self.__devLabel: host_list.append(i) - if len(host_list)==int(self.__numPorts): + if len(host_list) == int(self.__numPorts): str_cmd = "lsblk -o MOUNTPOINT" lsblk_command = SysCommand("lsblk", str_cmd) if lsblk_command.execute() == 0: @@ -45,13 +50,14 @@ class Qusb(unittest.TestCase): else: lines = lsblk_command.getOutput().splitlines() for i in range(len(host_list)): - file_path=str(lines[host_list[i]].decode('ascii')) + self.__usbFileName + file_path = str(lines[host_list[i]].decode('ascii')) + self.__usbFileName usb_file = open(file_path, 'r') - read=usb_file.read() - if read.find(self.__usbtext)!=-1: + read = usb_file.read() + if read.find(self.__usbtext) != -1: print(file_path + " --> OK!") else: - self.fail("failed: could not read from usb {}".format(lines[host_list[i]].decode('ascii'))) + self.fail( + "failed: could not read from usb {}".format(lines[host_list[i]].decode('ascii'))) self.__numUsbFail.append(host_list[i]) usb_file.close() else: @@ -60,4 +66,4 @@ class Qusb(unittest.TestCase): else: self.fail("failed: reference and real usb host devices number mismatch") else: - self.fail("failed: couldn't execute lsblk command") \ No newline at end of file + self.fail("failed: couldn't execute lsblk command") diff --git a/test-cli/test/tests/qwifi.py b/test-cli/test/tests/qwifi.py index 154dd52..2a5fa0c 100644 --- a/test-cli/test/tests/qwifi.py +++ b/test-cli/test/tests/qwifi.py @@ -9,14 +9,16 @@ class Qwifi(unittest.TestCase): __bind = None __OKBW = None __port = None + params = None - #varlist: sip, bind, OKBW, port + #varlist: serverip, bind, OKBW, port def __init__(self, testname, testfunc, varlist): + self.params = varlist super(Qwifi, self).__init__(testfunc) - if "sip" in varlist: - self.__sip = varlist["sip"] + if "serverip" in varlist: + self.__serverip = varlist["serverip"] else: - raise Exception('sip param inside Qwifi have been be defined') + raise Exception('serverip param inside Qwifi have been be defined') if "OKBW" in varlist: self.__OKBW = varlist["OKBW"] else: @@ -47,10 +49,10 @@ class Qwifi(unittest.TestCase): if result: # execute iperf command against the server if self.__bind is None: - p = sh.iperf("-c", self.__sip, "-x", "CMSV", "-n", self.__numbytestx, "-f", "m", "-p", + p = sh.iperf("-c", self.__serverip, "-x", "CMSV", "-n", self.__numbytestx, "-f", "m", "-p", self.__port) else: - p = sh.iperf("-c", self.__sip, "-x", "CMSV", "-n", self.__numbytestx, "-f", "m", "-p", + p = sh.iperf("-c", self.__serverip, "-x", "CMSV", "-n", self.__numbytestx, "-f", "m", "-p", self.__port, "-B", self.__bind) # check if it was executed succesfully if p.exit_code == 0: diff --git a/test-cli/test_main.py b/test-cli/test_main.py index c09d703..48ec534 100644 --- a/test-cli/test_main.py +++ b/test-cli/test_main.py @@ -1,4 +1,5 @@ -from test.helpers.get_dieid import genDieid +from test.helpers.int_registers import get_die_id +from test.helpers.int_registers import get_mac from subprocess import call import xml.etree.ElementTree as XMLParser import errno @@ -92,6 +93,8 @@ def add_test_task(suite, testdefname, paramlist): def create_testsuite(): # create an object TestSuite suite = unittest.TestSuite() + # get id of the full test for this board + globalVar.testid_ctl = psdbObj.open_test(globalVar.g_uuid) # get list of tests for this board tests = psdbObj.get_tests_list(globalVar.g_uuid) # loop in every test for this board @@ -100,11 +103,13 @@ def create_testsuite(): # get params for this test params = psdbObj.get_test_params_list(testid) paramlist = create_paramslist(params) + # add the testid as parameter + paramlist["testid"] = testid + paramlist["boarduuid"] = globalVar.g_uuid + paramlist["testidctl"] = globalVar.testid_ctl # add test to TestSuite add_test_task(suite, testdefname, paramlist) - globalVar.testid_ctl = psdbObj.open_test(globalVar.g_uuid) - return suite @@ -118,10 +123,11 @@ def create_board(): # get station globalVar.station = socket.gethostname() - processor_id = genDieid(globalVar.g_mid) + processor_id = get_die_id(globalVar.g_mid) print(globalVar.g_mid) print(processor_id) - globalVar.g_uuid = psdbObj.create_board(processor_id, model_id, variant, globalVar.station, bmac=None) + globalVar.g_uuid = psdbObj.create_board(processor_id, model_id, variant, globalVar.station, + get_mac(globalVar.g_mid)) def main(): -- cgit v1.1 From a03055f657d2e970e45e7ea2a3e66857f821eabd Mon Sep 17 00:00:00 2001 From: Hector Fernandez Date: Mon, 9 Mar 2020 09:03:28 +0100 Subject: Solved problems with consumption test. Fixed error when getting mac address. --- test-cli/.idea/workspace.xml | 43 +++++---- test-cli/test/helpers/amper.py | 155 +++++++++++++++++++++++++++++++++ test-cli/test/helpers/int_registers.py | 15 ++-- test-cli/test/tests/qamp.py | 1 + test-cli/test/tests/qamper.py | 51 +++++++++++ test-cli/test_main.py | 4 +- 6 files changed, 239 insertions(+), 30 deletions(-) create mode 100644 test-cli/test/helpers/amper.py create mode 100644 test-cli/test/tests/qamper.py diff --git a/test-cli/.idea/workspace.xml b/test-cli/.idea/workspace.xml index cd587e0..7e0aea1 100644 --- a/test-cli/.idea/workspace.xml +++ b/test-cli/.idea/workspace.xml @@ -2,26 +2,11 @@ + + - - - + - - - - - - - - - - - - - - - + + + @@ -43,9 +35,14 @@ - + + + + + + @@ -87,18 +84,18 @@ - + - + - + - + diff --git a/test-cli/test/helpers/amper.py b/test-cli/test/helpers/amper.py new file mode 100644 index 0000000..45ec7db --- /dev/null +++ b/test-cli/test/helpers/amper.py @@ -0,0 +1,155 @@ +import serial +import scanf +import time + + +class Amper(object): + __ser = None + __port = '/dev/ttyUSB0' + __speed = 115200 + __parity = None + __rtscts = 0 + __timeout = 1 + __isThere = False + __version = None + __voltage = 0.0 + __current = 0.0 + __alarm_condition = 0 + + def __init__(self, port='/dev/ttyUSB0', serial_speed=115200, parity=serial.PARITY_NONE, rtscts=0): + self.__port = port + self.__speed = serial_speed + self.__parity = parity + self.__rtscts = rtscts + + def open(self): + try: + if self.__ser is not None: + self.close() + self.__ser = serial.Serial(port=self.__port, + baudrate=self.__speed, + timeout=self.__timeout, + parity=self.__parity, + rtscts=self.__rtscts, + bytesize=serial.EIGHTBITS, + stopbits=serial.STOPBITS_ONE, + xonxoff=0) + + self.__ser.flushInput() + self.__ser.flushOutput() + self.__ser.break_condition = False + return True + except serial.SerialException as err: + print(err) + return False + + def close(self): + if self.__ser is not None: + self.__ser.close() + self.__ser = None + self.__isThere = False + self.__version = None + self.__voltage = 0.0 + self.__current = 0.0 + self.__alarm_condition = 0 + + def __write(self, data): + data += "\n\r" + dat = data.encode('ascii') + self.__ser.write(dat) + + def __read(self): + return self.__ser.readline().decode().rstrip() + + def __set_voltage_underlimit(self, under_limit): + if self.__isThere and self.__ser is not None: + self.__write('AT+VIN_UV_W_LIM={}'.format(under_limit)) + dat = self.__read() + print(dat) + + def __get_voltage_underlimit(self): + self.__write('AT+VIN_UV_W_LIM=?') + dat = self.__read() + res = scanf.scanf("%f", dat) + return res[0] + + def __set_voltage_overlimit(self, over_limit): + if self.__isThere and self.__ser is not None: + self.__write('AT+VIN_OV_W_LIM={}'.format(over_limit)) + dat = self.__read() + print(dat) + + def __get_voltage_overlimit(self): + if self.__isThere and self.__ser is not None: + self.__write('AT+VIN_OV_W_LIM?') + dat = self.__read() + res = scanf.scanf("%f", dat) + return res[0] + + def __set_current_overlimit(self, over_limit): + if self.__isThere and self.__ser is not None: + self.__write('AT+IOUT_W_LIM={}'.format(over_limit)) + dat = self.__read() + print(dat) + + def __get_current_overlimit(self): + if self.__isThere and self.__ser is not None: + self.__write('AT+IOUT_W_LIM?') + dat = self.__read() + dat = self.__read() + res = scanf.scanf("%f", dat) + return res[0] + + def hello(self): + self.__isThere = False + if self.__ser is None: + return False + self.__write('at+ver?') + dat = self.__read() + res = scanf.scanf("ISEE Amper v%3c", dat) + if res is None: + return False + self.__isThere = True + self.__version = res[0] + return True + + def getVersion(self): + return self.__version + + def getVoltage(self): + if self.__isThere and self.__ser is not None: + self.__write('at+vin?') + dat = self.__read() + res = scanf.scanf("%f", dat) + self.__voltage = res[0] + # print(self.__voltage) + return self.__voltage + else: + return None + + def getCurrent(self): + if self.__isThere and self.__ser is not None: + self.__write('at+in?') + dat = self.__read() + res = scanf.scanf("%f", dat) + self.__current = res[0] + # print(self.__current) + else: + return None + return self.__current + + def getAlarm(self): + if self.__isThere and self.__ser is not None: + self.__write('at+st_mfr?') + dat = self.__read() + # print(dat) + self.__alarm_condition = (int(dat) & 0x0F) + # print(self.__alarm_condition) + return self.__alarm_condition + + def clearAlarm(self): + if self.__isThere and self.__ser is not None: + self.__write('at+CLRFAULT') + dat = self.__read() + # print(dat) + diff --git a/test-cli/test/helpers/int_registers.py b/test-cli/test/helpers/int_registers.py index cf2e35b..030035d 100644 --- a/test-cli/test/helpers/int_registers.py +++ b/test-cli/test/helpers/int_registers.py @@ -1,6 +1,7 @@ import mmap import os import struct +import sh MAP_MASK = mmap.PAGESIZE - 1 WORD = 4 @@ -47,10 +48,14 @@ def get_mac(modelid): mac = None if modelid.find("IGEP0034") == 0 or modelid.find("SOPA0000") == 0: - # registers: mac_id0_lo, mac_id0_hi - registers = [0x44e10630, 0x44e10634] - mac = "" - for rg in registers: - mac = mac + read(rg) + # # registers: mac_id0_lo, mac_id0_hi + # registers = [0x44e10630, 0x44e10634] + # mac = "" + # for rg in registers: + # mac = mac + read(rg) + # #erase trailing zeros + # mac = mac[4::1] + mac = sh.cat("/sys/class/net/eth0/address") + return mac diff --git a/test-cli/test/tests/qamp.py b/test-cli/test/tests/qamp.py index cf18fc5..56511c8 100644 --- a/test-cli/test/tests/qamp.py +++ b/test-cli/test/tests/qamp.py @@ -36,6 +36,7 @@ class Qamp(unittest.TestCase): self._ser.close() def execute(self): + # Open Serial port ttyUSB0 error = 0 try: diff --git a/test-cli/test/tests/qamper.py b/test-cli/test/tests/qamper.py new file mode 100644 index 0000000..2b02302 --- /dev/null +++ b/test-cli/test/tests/qamper.py @@ -0,0 +1,51 @@ +import unittest +from test.helpers.amper import Amper + + +class Qamper(unittest.TestCase): + params = None + + # varlist: undercurrent, overcurrent + def __init__(self, testname, testfunc, varlist): + self.params = varlist + super(Qamper, self).__init__(testfunc) + + if "undercurrent" in varlist: + self._undercurrent = varlist["undercurrent"] + else: + raise Exception('undercurrent param inside Qamp must be defined') + if "overcurrent" in varlist: + self._overcurrent = varlist["overcurrent"] + else: + raise Exception('overcurrent param inside Qamp must be defined') + self._testMethodDoc = testname + + def execute(self): + amp = Amper() + print(amp) + if not amp.open(): + print("1") + self.fail("failed: can not open serial port") + return + + if not amp.hello(): + print("2") + self.fail("failed: can not communicate") + return + + result = amp.getCurrent() + print(result) + + amp.close() + + + + + + + # # In order to give a valid result it is importarnt to define an under current value + # if self._current > float(self._overcurrent): + # self.fail("failed: OVERCURRENT DETECTED ( {} )".format(self._current)) + # + # if self._current < float(self._undercurrent): + # self.fail("failed: UNDERCURRENT DETECTED ( {} )".format(self._current)) diff --git a/test-cli/test_main.py b/test-cli/test_main.py index 48ec534..0e863dc 100644 --- a/test-cli/test_main.py +++ b/test-cli/test_main.py @@ -22,7 +22,7 @@ from test.tests.qscreen import Qscreen from test.tests.qwifi import Qwifi from test.tests.qrtc import Qrtc from test.tests.qduplex_ser import Qduplex -from test.tests.qamp import Qamp +from test.tests.qamper import Qamper from test.tests.qflash import Qflasher from test.tests.qnand import Qnand from test.helpers.globalVariables import globalVar @@ -71,7 +71,7 @@ def add_test_task(suite, testdefname, paramlist): elif testdefname == "RTC": suite.addTest(Qrtc(testdefname, "execute", paramlist)) elif testdefname == "CONSUMPTION": - suite.addTest(Qamp(testdefname, "execute", paramlist)) + suite.addTest(Qamper(testdefname, "execute", paramlist)) # elif testdefname == "SATA": # suite.addTest(Qsata(testdefname, "execute", paramlist)) elif testdefname == "DMESG": -- cgit v1.1 From c685367cbd6abf1c6ae442df759e39b25a907d3b Mon Sep 17 00:00:00 2001 From: Hector Fernandez Date: Mon, 9 Mar 2020 12:39:50 +0100 Subject: Fixed several test procedures. --- test-cli/.idea/workspace.xml | 37 +++++++--------- test-cli/test/helpers/amper.py | 1 - test-cli/test/helpers/changedir.py | 14 ++++++ test-cli/test/helpers/usb.sh | 2 +- test-cli/test/tests/qamper.py | 27 ++++-------- test-cli/test/tests/qduplex_ser.py | 40 ++++++++++-------- test-cli/test/tests/qeeprom.py | 1 - test-cli/test/tests/qi2c.py | 6 +-- test-cli/test/tests/qrtc.py | 26 +++++++----- test-cli/test/tests/qserial.py | 22 ++++++---- test-cli/test/tests/qusb.py | 87 ++++++++++++++------------------------ 11 files changed, 125 insertions(+), 138 deletions(-) create mode 100644 test-cli/test/helpers/changedir.py diff --git a/test-cli/.idea/workspace.xml b/test-cli/.idea/workspace.xml index 7e0aea1..872ab45 100644 --- a/test-cli/.idea/workspace.xml +++ b/test-cli/.idea/workspace.xml @@ -2,12 +2,17 @@ - - + - - - + + + + + + + + + @@ -148,25 +153,13 @@ + + + + - - - - - file://$PROJECT_DIR$/test/runners/simple.py - 83 - - - file://$PROJECT_DIR$/test/runners/simple.py - 58 - - - - \ No newline at end of file diff --git a/test-cli/test/helpers/amper.py b/test-cli/test/helpers/amper.py index 45ec7db..ea719f6 100644 --- a/test-cli/test/helpers/amper.py +++ b/test-cli/test/helpers/amper.py @@ -1,6 +1,5 @@ import serial import scanf -import time class Amper(object): diff --git a/test-cli/test/helpers/changedir.py b/test-cli/test/helpers/changedir.py new file mode 100644 index 0000000..fad9ade --- /dev/null +++ b/test-cli/test/helpers/changedir.py @@ -0,0 +1,14 @@ +import os + + +class changedir: + """Context manager for changing the current working directory""" + def __init__(self, newPath): + self.newPath = os.path.expanduser(newPath) + + def __enter__(self): + self.savedPath = os.getcwd() + os.chdir(self.newPath) + + def __exit__(self, etype, value, traceback): + os.chdir(self.savedPath) \ No newline at end of file diff --git a/test-cli/test/helpers/usb.sh b/test-cli/test/helpers/usb.sh index c8e6924..52ea4b2 100755 --- a/test-cli/test/helpers/usb.sh +++ b/test-cli/test/helpers/usb.sh @@ -11,4 +11,4 @@ for sysdevpath in $(find /sys/bus/usb/devices/usb*/ -name dev); do ) done -return 0 +exit 0 diff --git a/test-cli/test/tests/qamper.py b/test-cli/test/tests/qamper.py index 2b02302..b3ab8b1 100644 --- a/test-cli/test/tests/qamper.py +++ b/test-cli/test/tests/qamper.py @@ -22,30 +22,21 @@ class Qamper(unittest.TestCase): def execute(self): amp = Amper() - print(amp) + # open serial connection if not amp.open(): - print("1") self.fail("failed: can not open serial port") return - + # check if the amperimeter is connected and working if not amp.hello(): - print("2") self.fail("failed: can not communicate") return - + # get current value (in Amperes) result = amp.getCurrent() - print(result) - + # close serial connection amp.close() + # # In order to give a valid result it is importarnt to define an under current value + if result > float(self._overcurrent): + self.fail("failed: Overcurrent detected ( {} )".format(result)) - - - - - - # # In order to give a valid result it is importarnt to define an under current value - # if self._current > float(self._overcurrent): - # self.fail("failed: OVERCURRENT DETECTED ( {} )".format(self._current)) - # - # if self._current < float(self._undercurrent): - # self.fail("failed: UNDERCURRENT DETECTED ( {} )".format(self._current)) + if result < float(self._undercurrent): + self.fail("failed: Undercurrent detected ( {} )".format(result)) diff --git a/test-cli/test/tests/qduplex_ser.py b/test-cli/test/tests/qduplex_ser.py index 46fb5db..0666363 100644 --- a/test-cli/test/tests/qduplex_ser.py +++ b/test-cli/test/tests/qduplex_ser.py @@ -6,7 +6,11 @@ import time class Qduplex(unittest.TestCase): params = None + __port1 = None + __port2 = None + __baudrate = None + # varlist: port1, port2, baudrate def __init__(self, testname, testfunc, varlist): self.params = varlist super(Qduplex, self).__init__(testfunc) @@ -14,46 +18,46 @@ class Qduplex(unittest.TestCase): self.__port1 = varlist["port1"] else: raise Exception('port1 param inside Qduplex must be defined') - self.__serial1 = serial.Serial(self.__port1, timeout=1) - if "port2" in varlist: self.__port2 = varlist["port2"] else: raise Exception('port2 param inside Qduplex must be defined') - self.__serial2 = serial.Serial(self.__port2, timeout=1) - if "baudrate" in varlist: - self.__serial1.baudrate = varlist["baudrate"] - self.__serial2.baudrate = varlist["baudrate"] + self.__baudrate = varlist["baudrate"] else: raise Exception('baudrate param inside Qduplex must be defined') - self._testMethodDoc = testname + # open serial connection + self.__serial1 = serial.Serial(self.__port1, self.__baudrate, timeout=1) + self.__serial1.flushInput() + self.__serial1.flushOutput() + self.__serial2 = serial.Serial(self.__port2, self.__baudrate, timeout=1) + self.__serial2.flushInput() + self.__serial2.flushOutput() + def __del__(self): self.__serial1.close() self.__serial2.close() def execute(self): - self.__serial1.flushInput() - self.__serial1.flushOutput() - self.__serial2.flushInput() - self.__serial2.flushOutput() + # generate a random uuid test_uuid = str(uuid.uuid4()).encode() + # send the uuid through serial port self.__serial1.write(test_uuid) time.sleep(0.05) # there might be a small delay if self.__serial2.inWaiting() == 0: - self.fail("failed: PORT {} wait timeout exceded, wrong communication?".format(self.__port2)) + self.fail("failed: port {} wait timeout exceded".format(self.__port2)) else: - if (self.__serial2.readline() != test_uuid): - self.fail("failed: PORT {} write/read mismatch".format(self.__port2)) + if self.__serial2.readline() != test_uuid: + self.fail("failed: port {} write/read mismatch".format(self.__port2)) test_uuid = str(uuid.uuid4()).encode() + # send the uuid through serial port self.__serial2.write(test_uuid) time.sleep(0.05) # there might be a small delay if self.__serial1.inWaiting() == 0: - self.fail("failed: PORT {} wait timeout exceded, wrong communication?".format(self.__port1)) + self.fail("failed: port {} wait timeout exceded".format(self.__port1)) else: - if (self.__serial1.readline() != test_uuid): - self.fail("failed: PORT {} write/read mismatch".format(self.__port1)) - + if self.__serial1.readline() != test_uuid: + self.fail("failed: port {} write/read mismatch".format(self.__port1)) diff --git a/test-cli/test/tests/qeeprom.py b/test-cli/test/tests/qeeprom.py index d2c55ef..1921bf7 100644 --- a/test-cli/test/tests/qeeprom.py +++ b/test-cli/test/tests/qeeprom.py @@ -34,7 +34,6 @@ class Qeeprom(unittest.TestCase): # compare both values if data_rx != data_tx: self.fail("failed: mismatch between written and received values.") - else: self.fail("failed: Unable to read from the EEPROM device.") else: diff --git a/test-cli/test/tests/qi2c.py b/test-cli/test/tests/qi2c.py index 71eb64e..c59975e 100644 --- a/test-cli/test/tests/qi2c.py +++ b/test-cli/test/tests/qi2c.py @@ -20,7 +20,7 @@ class Qi2c(unittest.TestCase): self._testMethodDoc = testname def execute(self): - str_cmd= "i2cdetect -a -y -r {}".format(self.__busnum) + str_cmd = "i2cdetect -a -y -r {}".format(self.__busnum) i2c_command = SysCommand("i2cdetect", str_cmd) if i2c_command.execute() == 0: self.__raw_out = i2c_command.getOutput() @@ -28,8 +28,8 @@ class Qi2c(unittest.TestCase): return -1 lines=self.__raw_out.decode('ascii').splitlines() for i in range(len(lines)): - if (lines[i].count('UU')): - if (lines[i].find("UU")): + if lines[i].count('UU'): + if lines[i].find("UU"): self.__devices.append("0x{}{}".format((i - 1), hex(int((lines[i].find("UU") - 4) / 3)).split('x')[-1])) for i in range(len(self.__register)): if not(self.__register[i] in self.__devices): diff --git a/test-cli/test/tests/qrtc.py b/test-cli/test/tests/qrtc.py index 884a290..0be0f99 100644 --- a/test-cli/test/tests/qrtc.py +++ b/test-cli/test/tests/qrtc.py @@ -1,11 +1,12 @@ -from test.helpers.syscmd import SysCommand +import sh import unittest import time +import re class Qrtc(unittest.TestCase): params = None - __rtc = "/dev/rtc0" + __rtc = None def __init__(self, testname, testfunc, varlist): self.params = varlist @@ -17,16 +18,19 @@ class Qrtc(unittest.TestCase): self._testMethodDoc = testname def execute(self): - str_cmd = "hwclock -f {}".format(self.__rtc) - rtc_set = SysCommand("rtc_set", str_cmd) - if rtc_set.execute() == 0: - curr_hour = rtc_set.getOutput().decode('ascii').split(" ") - time1 = int((curr_hour[4].split(":"))[2]) + # get time from RTC peripheral + p = sh.hwclock("-f", self.__rtc) + if p.exit_code == 0: + time1 = re.search("([01]?[0-9]{1}|2[0-3]{1})[:][0-5]{1}[0-9]{1}[:][0-5]{1}[0-9]{1}", + p.stdout.decode('ascii')) time.sleep(1) - if rtc_set.execute() == 0: - curr_hour = rtc_set.getOutput().decode('ascii').split(" ") - time2 = int((curr_hour[4].split(":"))[2]) - if time1==time2: + # get time from RTC another time + p = sh.hwclock("-f", self.__rtc) + if p.exit_code == 0: + time2 = re.search("([01]?[0-9]{1}|2[0-3]{1})[:][0-5]{1}[0-9]{1}[:][0-5]{1}[0-9]{1}", + p.stdout.decode('ascii')) + # check if the seconds of both times are different + if time1 == time2: self.fail("failed: RTC is not running") else: self.fail("failed: couldn't execute hwclock command 2nd time") diff --git a/test-cli/test/tests/qserial.py b/test-cli/test/tests/qserial.py index d798578..67e3af1 100644 --- a/test-cli/test/tests/qserial.py +++ b/test-cli/test/tests/qserial.py @@ -6,7 +6,10 @@ import time class Qserial(unittest.TestCase): params = None + __port = None + __baudrate = None + #varlist: port, baudrate def __init__(self, testname, testfunc, varlist): self.params = varlist super(Qserial, self).__init__(testfunc) @@ -14,28 +17,31 @@ class Qserial(unittest.TestCase): self.__port = varlist["port"] else: raise Exception('port param inside Qserial must be defined') - self.__serial = serial.Serial(self.__port, timeout=1) + if "baudrate" in varlist: self.__baudrate = varlist["baudrate"] else: raise Exception('baudrate param inside Qserial must be defined') self._testMethodDoc = testname + # open serial connection + self.__serial = serial.Serial(self.__port, self.__baudrate, timeout=1) + self.__serial.flushInput() + self.__serial.flushOutput() + def __del__(self): self.__serial.close() def execute(self): - self.__serial.flushInput() - self.__serial.flushOutput() - #generate a random uuid + # generate a random uuid test_uuid = str(uuid.uuid4()).encode() - #send the uuid through serial port + # send the uuid through serial port self.__serial.write(test_uuid) time.sleep(0.05) # there might be a small delay if self.__serial.inWaiting() == 0: - self.fail("failed: PORT {} wait timeout exceded, wrong communication?".format(self.__port)) + self.fail("failed: port {} wait timeout exceded".format(self.__port)) else: # check if what it was sent is equal to what has been received - if (self.__serial.readline() != test_uuid): - self.fail("failed: PORT {} write/read mismatch".format(self.__port)) + if self.__serial.readline() != test_uuid: + self.fail("failed: port {} write/read mismatch".format(self.__port)) diff --git a/test-cli/test/tests/qusb.py b/test-cli/test/tests/qusb.py index 70265a5..7ecf31e 100644 --- a/test-cli/test/tests/qusb.py +++ b/test-cli/test/tests/qusb.py @@ -1,69 +1,46 @@ -from test.helpers.syscmd import SysCommand +import sh import unittest +import re +from test.helpers.changedir import changedir class Qusb(unittest.TestCase): - __numPorts = None - __devLabel = None params = None def __init__(self, testname, testfunc, varlist): self.params = varlist super(Qusb, self).__init__(testfunc) - if "numPorts" in varlist: - self.__numPorts = varlist["numPorts"] - else: - raise Exception('numPorts param inside Qusb must be defined') self._testMethodDoc = testname - if "devLabel" in varlist: - self.__devLabel = varlist["devLabel"] - else: - raise Exception('devLabel param inside Qusb must be defined') - if testname == "USBOTG": - self.__usbFileName = "/this_is_an_usb_otg" - self.__usbtext = "USBOTG" - else: - self.__usbFileName = "/this_is_an_usb_host" - self.__usbtext = "USBHOST" - self.__numUsbFail = [] def execute(self): - str_cmd = "lsblk -o LABEL" - lsblk_command = SysCommand("lsblk", str_cmd) - if lsblk_command.execute() == 0: - self.__raw_out = lsblk_command.getOutput() - if self.__raw_out == "": - return -1 - lines = lsblk_command.getOutput().splitlines() - host_list = [] - for i in range(len(lines)): - if str(lines[i].decode('ascii')) == self.__devLabel: - host_list.append(i) - if len(host_list) == int(self.__numPorts): - str_cmd = "lsblk -o MOUNTPOINT" - lsblk_command = SysCommand("lsblk", str_cmd) - if lsblk_command.execute() == 0: - self.__raw_out = lsblk_command.getOutput() - if self.__raw_out == "": - print("failed: no command output") - self.fail("failed: no command output") - else: - lines = lsblk_command.getOutput().splitlines() - for i in range(len(host_list)): - file_path = str(lines[host_list[i]].decode('ascii')) + self.__usbFileName - usb_file = open(file_path, 'r') - read = usb_file.read() - if read.find(self.__usbtext) != -1: - print(file_path + " --> OK!") - else: - self.fail( - "failed: could not read from usb {}".format(lines[host_list[i]].decode('ascii'))) - self.__numUsbFail.append(host_list[i]) - usb_file.close() - else: - self.fail("failed: couldn't execute lsblk command") - + # Execute script usb.sh + p = sh.bash('test/helpers/usb.sh') + # Search in the stdout a pattern "/dev/sd + {letter} + {number} + q = re.search("/dev/sd\w\d", p.stdout.decode('ascii')) + # get the first device which matches the pattern + device = q.group(0) + # create a new folder where the pendrive is going to be mounted + sh.mkdir("-p", "/mnt/pendrive") + # mount the device + p = sh.mount(device, "/mnt/pendrive") + if p.exit_code == 0: + # copy files + p = sh.cp("/root/usbtest/usbdatatest.bin", "/root/usbtest/usbdatatest.md5", "/mnt/pendrive") + if p.exit_code == 0: + # check md5 + with changedir("/mnt/pendrive/"): + p = sh.md5sum("-c", "usbdatatest.md5") + q = re.search("OK", p.stdout.decode('ascii')) + # delete files + sh.rm("-f", "/mnt/pendrive/usbdatatest.bin", "/mnt/pendrive/usbdatatest.md5") + # umount + sh.umount("/mnt/pendrive") + # check result + if q is None: + self.fail("failed: wrong md5 result.") else: - self.fail("failed: reference and real usb host devices number mismatch") + # umount + sh.umount("/mnt/pendrive") + self.fail("failed: unable to copy files.") else: - self.fail("failed: couldn't execute lsblk command") + self.fail("failed: unable to mount the device.") -- cgit v1.1 From d38c92bfd7b6abe3a52b51b87b1a2949b857d9b4 Mon Sep 17 00:00:00 2001 From: Hector Fernandez Date: Mon, 9 Mar 2020 19:16:08 +0100 Subject: Created function to flash the eeprom memory. --- test-cli/.idea/workspace.xml | 32 +++++++++++------ test-cli/test/helpers/button_script.sh | 4 --- test-cli/test/helpers/testsrv_db.py | 13 +++++++ test-cli/test/helpers/usb.sh | 14 -------- test-cli/test/scripts/__init__.py | 0 test-cli/test/scripts/usb.sh | 14 ++++++++ test-cli/test/tests/qbutton.py | 64 ---------------------------------- test-cli/test/tests/qusb.py | 2 +- test-cli/test/tests/qwifi.py | 19 +++++----- test-cli/test_main.py | 62 +++++++++++++++++++++++--------- 10 files changed, 105 insertions(+), 119 deletions(-) delete mode 100644 test-cli/test/helpers/button_script.sh delete mode 100755 test-cli/test/helpers/usb.sh create mode 100644 test-cli/test/scripts/__init__.py create mode 100755 test-cli/test/scripts/usb.sh delete mode 100644 test-cli/test/tests/qbutton.py diff --git a/test-cli/.idea/workspace.xml b/test-cli/.idea/workspace.xml index 872ab45..c2653e0 100644 --- a/test-cli/.idea/workspace.xml +++ b/test-cli/.idea/workspace.xml @@ -2,17 +2,16 @@ - + + - - - - - - - - + + + + + + + + + @@ -93,6 +95,14 @@ + + + + + + + + @@ -157,9 +167,9 @@ - + - + \ No newline at end of file diff --git a/test-cli/test/helpers/button_script.sh b/test-cli/test/helpers/button_script.sh deleted file mode 100644 index 6908f22..0000000 --- a/test-cli/test/helpers/button_script.sh +++ /dev/null @@ -1,4 +0,0 @@ -#!/usr/bin/env bash -i2cset -f -y 1 0x2d 0x40 0x31 -i2cset -f -y 1 0x2d 0x50 0xff -i2cget -f -y 1 0x2d 0x50 diff --git a/test-cli/test/helpers/testsrv_db.py b/test-cli/test/helpers/testsrv_db.py index d937d3e..9fb61fd 100644 --- a/test-cli/test/helpers/testsrv_db.py +++ b/test-cli/test/helpers/testsrv_db.py @@ -104,3 +104,16 @@ class TestSrv_Database(object): r = find_between(str(err), '#', '#') # print(r) return None + + def get_board_variables(self, board_uuid): + sql = "SELECT * FROM isee.f_get_boardvariables('{}')".format(board_uuid) + # print('>>>' + sql) + try: + res = self.__sqlObject.db_execute_query(sql) + # print(res) + return res + except Exception as err: + r = find_between(str(err), '#', '#') + # print(r) + return None + diff --git a/test-cli/test/helpers/usb.sh b/test-cli/test/helpers/usb.sh deleted file mode 100755 index 52ea4b2..0000000 --- a/test-cli/test/helpers/usb.sh +++ /dev/null @@ -1,14 +0,0 @@ -#!/bin/bash - -for sysdevpath in $(find /sys/bus/usb/devices/usb*/ -name dev); do - ( - syspath="${sysdevpath%/dev}" - devname="$(udevadm info -q name -p $syspath)" - [[ "$devname" == "bus/"* ]] && continue - eval "$(udevadm info -q property --export -p $syspath)" - [[ -z "$ID_SERIAL" ]] && continue - echo "/dev/$devname - $ID_SERIAL" - ) -done - -exit 0 diff --git a/test-cli/test/scripts/__init__.py b/test-cli/test/scripts/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/test-cli/test/scripts/usb.sh b/test-cli/test/scripts/usb.sh new file mode 100755 index 0000000..52ea4b2 --- /dev/null +++ b/test-cli/test/scripts/usb.sh @@ -0,0 +1,14 @@ +#!/bin/bash + +for sysdevpath in $(find /sys/bus/usb/devices/usb*/ -name dev); do + ( + syspath="${sysdevpath%/dev}" + devname="$(udevadm info -q name -p $syspath)" + [[ "$devname" == "bus/"* ]] && continue + eval "$(udevadm info -q property --export -p $syspath)" + [[ -z "$ID_SERIAL" ]] && continue + echo "/dev/$devname - $ID_SERIAL" + ) +done + +exit 0 diff --git a/test-cli/test/tests/qbutton.py b/test-cli/test/tests/qbutton.py deleted file mode 100644 index 46ddde0..0000000 --- a/test-cli/test/tests/qbutton.py +++ /dev/null @@ -1,64 +0,0 @@ -from test.helpers.syscmd import SysCommand -import unittest -import time - - -class Qbutton(unittest.TestCase): - params = None - - def __init__(self, testname, testfunc, varlist): - self.params = varlist - super(Qbutton, self).__init__(testfunc) - if "gpio" in varlist: - self.__gpio = varlist["gpio"] - else: - raise Exception('gpio param inside Qbutton must be defined') - if self.__gpio == "SOPA": - super(Qbutton, self).__init__("buttonSopa") - else: - super(Qbutton, self).__init__("buttonGpio") - self._testMethodDoc = testname - - def buttonGpio(self): - print("normal-button-test-using-gpio") - self.fail("failed: GPIO BUTTON FAIL") - - def buttonSopa(self): - str_cmd = "i2cset -f -y 1 0x2d 0x40 0x31" - disable_pmic = SysCommand("disable_pmic", str_cmd) - disable_pmic.execute() - # BUG: REPEAT THIS EXECUTION TWICE BECAUSE FIRST TIME IT RETURNS AN ERROR - led_on="echo 1 > /sys/class/leds/red\:usbhost/brightness" - ledon = SysCommand("led_on", led_on) - ledon.execute() - time.sleep(0.1) - disable_pmic.execute() - if disable_pmic.execute() == 0: - str_cmd = "i2cset -f -y 1 0x2d 0x50 0xff" - reset_button = SysCommand("reset_button", str_cmd) - if reset_button.execute() == 0: - str_cmd = "i2cget -f -y 1 0x2d 0x50" - get_button_val = SysCommand("get_button_val", str_cmd) - print("\n\t --> PRESS button for 1 sec (TIMEOUT: 10s) \n") - timeout = 0 - while timeout < 7200: - if get_button_val.execute() == 0: - get_button_val.execute() - button_value = get_button_val.getOutput() - button_value=button_value.decode('ascii').split("x") - if int(button_value[1]) == 4: - timeout = 7200 - led_off="echo 0 > /sys/class/leds/red\:usbhost/brightness" - ledoff = SysCommand("led_off", led_off) - ledoff.execute() - time.sleep(0.5) - timeout = timeout + 1 - if timeout==7200 and int(button_value[1]) == 0: - self.fail("failed: timeout exceeded") - else: - timeout = 7200 - self.fail("failed: not button input") - else: - self.fail("failed: could not complete i2c reset button state") - else: - self.fail("failed: could not complete i2c disable PMIC") diff --git a/test-cli/test/tests/qusb.py b/test-cli/test/tests/qusb.py index 7ecf31e..6a004f0 100644 --- a/test-cli/test/tests/qusb.py +++ b/test-cli/test/tests/qusb.py @@ -14,7 +14,7 @@ class Qusb(unittest.TestCase): def execute(self): # Execute script usb.sh - p = sh.bash('test/helpers/usb.sh') + p = sh.bash('test/scripts/usb.sh') # Search in the stdout a pattern "/dev/sd + {letter} + {number} q = re.search("/dev/sd\w\d", p.stdout.decode('ascii')) # get the first device which matches the pattern diff --git a/test-cli/test/tests/qwifi.py b/test-cli/test/tests/qwifi.py index 2a5fa0c..8daf069 100644 --- a/test-cli/test/tests/qwifi.py +++ b/test-cli/test/tests/qwifi.py @@ -4,14 +4,14 @@ import re class Qwifi(unittest.TestCase): - __sip = None + __serverip = None __numbytestx = None __bind = None - __OKBW = None + __bwexpected = None __port = None params = None - #varlist: serverip, bind, OKBW, port + # varlist content: serverip, bwexpected, port, (optional)bind def __init__(self, testname, testfunc, varlist): self.params = varlist super(Qwifi, self).__init__(testfunc) @@ -19,8 +19,8 @@ class Qwifi(unittest.TestCase): self.__serverip = varlist["serverip"] else: raise Exception('serverip param inside Qwifi have been be defined') - if "OKBW" in varlist: - self.__OKBW = varlist["OKBW"] + if "bwexpected" in varlist: + self.__bwexpected = varlist["bwexpected"] else: raise Exception('OKBW param inside Qwifi must be defined') if "port" in varlist: @@ -41,11 +41,12 @@ class Qwifi(unittest.TestCase): # get the first line of the output stream out1 = p.stdout.decode('ascii').splitlines()[0] if out1 != "Not connected.": - #check if the board has ip in the wlan0 interface + # check if the board has ip in the wlan0 interface p = sh.ifconfig("wlan0") if p.exit_code == 0: - result = re.search("inet addr:(?!127\.0{1,3}\.0{1,3}\.0{0,2}1$)((25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)", - p.stdout) + result = re.search( + 'inet addr:(?!127\.0{1,3}\.0{1,3}\.0{0,2}1$)((25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)', + p.stdout.decode('ascii')) if result: # execute iperf command against the server if self.__bind is None: @@ -69,7 +70,7 @@ class Qwifi(unittest.TestCase): bwreal = b[0] # check if BW is in the expected range - self.failUnless(float(bwreal) > float(self.__OKBW) * 0.9, + self.failUnless(float(bwreal) > float(self.__bwexpected) * 0.9, "failed: speed is lower than spected. Speed(MB/s)" + str(bwreal)) else: self.fail("failed: could not complete iperf command") diff --git a/test-cli/test_main.py b/test-cli/test_main.py index 0e863dc..f8e3c1a 100644 --- a/test-cli/test_main.py +++ b/test-cli/test_main.py @@ -1,15 +1,11 @@ from test.helpers.int_registers import get_die_id from test.helpers.int_registers import get_mac from subprocess import call -import xml.etree.ElementTree as XMLParser -import errno -import sys import os import unittest from test.helpers.testsrv_db import TestSrv_Database from test.helpers.setup_xml import XMLSetup from test.runners.simple import SimpleTestRunner -from test.tests.qbutton import Qbutton from test.helpers.syscmd import TestSysCommand from test.tests.qethernet import Qethernet from test.tests.qaudio import Qaudio @@ -18,17 +14,16 @@ from test.tests.qusb import Qusb from test.tests.qi2c import Qi2c from test.tests.qeeprom import Qeeprom from test.tests.qserial import Qserial -from test.tests.qscreen import Qscreen from test.tests.qwifi import Qwifi from test.tests.qrtc import Qrtc from test.tests.qduplex_ser import Qduplex from test.tests.qamper import Qamper -from test.tests.qflash import Qflasher from test.tests.qnand import Qnand from test.helpers.globalVariables import globalVar import socket from test.helpers.iseelogger import ISEE_Logger import logging +import binascii psdbObj = TestSrv_Database() xmlObj = None @@ -50,8 +45,6 @@ def create_paramslist(params): def add_test_task(suite, testdefname, paramlist): - testfunc = None - if testdefname == "AUDIO": suite.addTest(Qaudio(testdefname, "execute", paramlist)) elif testdefname == "RAM": @@ -62,18 +55,10 @@ def add_test_task(suite, testdefname, paramlist): suite.addTest(Qeeprom(testdefname, "execute", paramlist)) elif testdefname == "SERIAL": suite.addTest(Qserial(testdefname, "execute", paramlist)) - # elif testdefname == "HDMI": - # suite.addTest(Qhdmi(testdefname, "execute", paramlist)) - # elif testdefname == "SCREEN": - # suite.addTest(Qscreen(testdefname, "execute", paramlist)) - elif testdefname == "BUTTON": - suite.addTest(Qbutton(testdefname, "execute", paramlist)) elif testdefname == "RTC": suite.addTest(Qrtc(testdefname, "execute", paramlist)) elif testdefname == "CONSUMPTION": suite.addTest(Qamper(testdefname, "execute", paramlist)) - # elif testdefname == "SATA": - # suite.addTest(Qsata(testdefname, "execute", paramlist)) elif testdefname == "DMESG": suite.addTest(TestSysCommand(testdefname, "execute", paramlist)) elif testdefname == "ETHERNET": @@ -130,10 +115,55 @@ def create_board(): get_mac(globalVar.g_mid)) +def program_eeprom(eeprompath): + # check if eeprompath is correct + if os.path.isfile(eeprompath): + # create u-boot data struct + data = bytearray() + data += (2029785358).to_bytes(4, 'big') # magicid --> 0x78FC110E + data += bytearray([0, 0, 0, 0]) # crc32 = 0 + data += bytearray(globalVar.g_uuid + "\0", + 'ascii') # uuid --> 'c0846c8a-5fa5-11ea-8576-f8b156ac62d7' and \0 at the end + data += binascii.unhexlify((get_mac(globalVar.g_mid)).replace(':', '')) # mac0 --> 'f8:b1:56:ac:62:d7' + data += bytearray([0, 0, 0, 0, 0, 0]) # mac1 --> 0:0:0:0:0:0 + # calculate crc + crc = (binascii.crc32(data, 0)).to_bytes(4, 'big') + data[4:8] = crc + # write into eeprom an validate + f = open(eeprompath, "r+b") + f.write(data) + f.seek(0) + data_rx = f.read(57) + for i in range(57): + if data_rx[i] != data[i]: + print("Error while programming eeprom memory.") + break + print("Eeprom programmed succesfully.") + + +def flash_nvmemory(flashscript): + if os.path.isfile(flashscript): + print("a") + + +def create_boardvariables_list(uuid): + varlist = {} + for row in psdbObj.get_board_variables(globalVar.g_uuid): + varname, varvalue = row + varlist[varname] = varvalue + return varlist + + def main(): + # initialize the board create_board() + # create and run tests according to the board type runner = SimpleTestRunner(psdbObj) runner.run(create_testsuite()) + # execute aditional tasks + varlist = create_boardvariables_list(globalVar.g_uuid) + program_eeprom(varlist["eeprompath"]) + flash_nvmemory(varlist["flashscript"]) if __name__ == "__main__": -- cgit v1.1 From 41ffba6a76a80a7ef4553cb8856393dd209d172e Mon Sep 17 00:00:00 2001 From: Hector Fernandez Date: Tue, 10 Mar 2020 17:37:16 +0100 Subject: First release of the test. All the tests work correctly for SOPA0000. --- test-cli/.idea/workspace.xml | 21 ++++---- test-cli/setup.xml | 2 +- test-cli/test/files/test_ok.png | Bin 18150 -> 0 bytes test-cli/test/files/usbtest/usbdatatest.bin | Bin 0 -> 33554431 bytes test-cli/test/files/usbtest/usbdatatest.md5 | 1 + test-cli/test/flashers/__init__.py | 0 test-cli/test/flashers/flasher.py | 10 ++++ test-cli/test/flashers/flasher_sopa0000.py | 13 +++++ test-cli/test/helpers/int_registers.py | 3 +- test-cli/test/helpers/uboot_flasher.py | 0 test-cli/test/runners/simple.py | 1 + test-cli/test/suites/__init__.py | 0 test-cli/test/tests/qduplex_ser.py | 18 +++++-- test-cli/test/tests/qflash.py | 77 ---------------------------- test-cli/test/tests/qserial.py | 8 +-- test-cli/test/tests/qusb.py | 10 +++- test-cli/test_main.py | 30 ++++++----- 17 files changed, 82 insertions(+), 112 deletions(-) delete mode 100644 test-cli/test/files/test_ok.png create mode 100644 test-cli/test/files/usbtest/usbdatatest.bin create mode 100644 test-cli/test/files/usbtest/usbdatatest.md5 create mode 100644 test-cli/test/flashers/__init__.py create mode 100644 test-cli/test/flashers/flasher.py create mode 100644 test-cli/test/flashers/flasher_sopa0000.py delete mode 100644 test-cli/test/helpers/uboot_flasher.py delete mode 100644 test-cli/test/suites/__init__.py delete mode 100644 test-cli/test/tests/qflash.py diff --git a/test-cli/.idea/workspace.xml b/test-cli/.idea/workspace.xml index c2653e0..6b65c5c 100644 --- a/test-cli/.idea/workspace.xml +++ b/test-cli/.idea/workspace.xml @@ -2,15 +2,18 @@ - - + - - - - + + + + + + + + + - \ No newline at end of file diff --git a/test-cli/test/flashers/flasheeprom.py b/test-cli/test/flashers/flasheeprom.py new file mode 100644 index 0000000..e427b87 --- /dev/null +++ b/test-cli/test/flashers/flasheeprom.py @@ -0,0 +1,36 @@ +import os +import binascii + + +def flash_eeprom(eeprompath, boarduuid, mac0, mac1=None): + print("Start programming Eeprom...") + # check if eeprompath is correct + if os.path.isfile(eeprompath): + # create u-boot data struct + data = bytearray() + data += (2029785358).to_bytes(4, 'big') # magicid --> 0x78FC110E + data += bytearray([0, 0, 0, 0]) # crc32 = 0 + data += bytearray(boarduuid + "\0", + 'ascii') # uuid --> 'c0846c8a-5fa5-11ea-8576-f8b156ac62d7' and \0 at the end + data += binascii.unhexlify(mac0.replace(':', '')) # mac0 --> 'f8:b1:56:ac:62:d7' + if mac1 is not None: + data += binascii.unhexlify(mac1.replace(':', '')) # mac1 --> 'f8:b1:56:ac:62:d7' + else: + data += bytearray([0, 0, 0, 0, 0, 0]) # mac1 --> 0:0:0:0:0:0 + # calculate crc + crc = (binascii.crc32(data, 0)).to_bytes(4, 'big') + data[4:8] = crc + # write into eeprom and read back + f = open(eeprompath, "r+b") + f.write(data) + f.seek(0) + data_rx = f.read(57) + for i in range(57): + if data_rx[i] != data[i]: + print("Error while programming eeprom memory.") + return 1 + print("Eeprom programmed succesfully.") + return 0 + else: + print("eeprom memory not found.") + return 1 diff --git a/test-cli/test/flashers/flasher.py b/test-cli/test/flashers/flasher.py deleted file mode 100644 index d962fca..0000000 --- a/test-cli/test/flashers/flasher.py +++ /dev/null @@ -1,10 +0,0 @@ -from test.flashers.flasher_sopa0000 import flash_sopa0000 - - -def flash(modelid): - result = None - - if modelid.find("SOPA0000") == 0: - result = flash_sopa0000() - - return result diff --git a/test-cli/test/flashers/flasher_sopa0000.py b/test-cli/test/flashers/flasher_sopa0000.py deleted file mode 100644 index 818be91..0000000 --- a/test-cli/test/flashers/flasher_sopa0000.py +++ /dev/null @@ -1,13 +0,0 @@ -from test.helpers.syscmd import SysCommand - - -def flash_sopa0000(): - print("Sart programming Nand memory...") - str_cmd = "/usr/bin/igep-flash --skip-nandtest --image /opt/firmware/demo-ti-image-*-*.tar* >/dev/null 2>&1" - sync_command = SysCommand("sync_command", str_cmd) - if sync_command.execute() != 0: - print("Flasher: Could not complete flashing task.") - return 1 - else: - print("Flasher: NAND memory flashed succesfully.") - return 0 diff --git a/test-cli/test/flashers/flashmemory.py b/test-cli/test/flashers/flashmemory.py new file mode 100644 index 0000000..ac59be5 --- /dev/null +++ b/test-cli/test/flashers/flashmemory.py @@ -0,0 +1,12 @@ +import sh + + +def flash_memory(imagefile): + print("Sart programming Nand memory...") + p = sh.bash("/usr/bin/igep-flash", "--skip-nandtest", "--image", "/opt/firmware/" + imagefile) + if p.exit_code != 0: + print("Flasher: Could not complete flashing task.") + return 1 + else: + print("Flasher: NAND memory flashed succesfully.") + return 0 diff --git a/test-cli/test/helpers/finisher.py b/test-cli/test/helpers/finisher.py deleted file mode 100644 index 73142d9..0000000 --- a/test-cli/test/helpers/finisher.py +++ /dev/null @@ -1,151 +0,0 @@ -from test.helpers.syscmd import SysCommand -from test.helpers.globalVariables import globalVar -import binascii -import uuid -import subprocess -from PIL import Image, ImageDraw, ImageFont -import qrcode -import PIL - -class Finisher(object): - __muhb = None - __final_to_burn_to_eeprom = None - __qr_image = None - - def __init__(self, muhb = None): - self.__muhb = muhb - self.__final_to_burn_to_eeprom = bytearray() - self.__qr_image = None - pass - - def eeprom_burn(self): - ## Create binary file - str_cmd2 = "find /sys/ -iname 'eeprom'" - eeprom_location = SysCommand("eeprom_location", str_cmd2) - if eeprom_location.execute() == 0: - raw_out = eeprom_location.getOutput() - if raw_out == "": - self.fail("Unable to get EEPROM location. IS EEPROM CONNECTED?") - eeprom = raw_out.decode('ascii') - eeprom = eeprom.strip('\n') - ## push binary data to eeprom like if working with files - file2 = open(eeprom, "w+b") - file2.write(self.__final_to_burn_to_eeprom) - else: - self.fail("failed: could not complete find eeprom command") - - def generate_qr_stamp(self): - # Generate QR to put in a stamp - qr = qrcode.QRCode( - version=1, - error_correction=qrcode.constants.ERROR_CORRECT_L, - box_size=10, - border=4, - ) - # Use board_uuid to generate the QR code - qr.add_data(globalVar.g_uuid) - qr.make(fit=True) - # Save QR as image stamp.png - img = qr.make_image() - # Store QR as a class atrib - self.__qr_image = img - img.save('/home/root/stamp.png') - - def print_stamp(self): - # Print stamp by sending a cmd to the printer - str_cmd3 = "lpr -o scaling=4 stamp.png".format(self.__display) - printstamp = SysCommand("printstamp", str_cmd3) - if printstamp.execute() != 0: - self.fail("failed: could not print stamp") - - def generate_screen(self): - # Generate green image with board uuid and QR maybe pasted on top of green image - # Define image size - W = 1250 - H = 703 - # Create blank rectangle to write on - image = Image.new('RGB', (W, H), (46, 204, 113, 0)) - draw = ImageDraw.Draw(image) - message = "TEST OK\n\n" + "Board UUID: "+ globalVar.g_uuid - font = ImageFont.truetype('/usr/share/fonts/truetype/dejavuu/DejaVuSans.ttf', 40) - # Calculate the width and height of the text to be drawn, given font size - w, h = draw.textsize(message, font=font) - # Write the text to the image, where (x,y) is the top left corner of the text - draw.text(((W-w)/2,(H-h)/2), message, align='center', font=font) - #draw.image(self.__qr_image) - image.save('/home/root/test/files/test_ok.png') - - def show_result_screen(self): - # If test OK show test_OK.png located in /home/root, If test fail show test_fail.png, located in /home/root/test/files/test_KO.png - if globalVar.fstatus: - str_cmd4 = "fbi -T 1 --noverbose -d /dev/{} test/files/test_ok.png".format('fb0') - else: - str_cmd4 = "fbi -T 1 --noverbose -d /dev/{} test/files/test_ko.png".format('fb0') - - display_image = SysCommand("display_image", str_cmd4) - #print(display_image.execute()) - if display_image.execute() != -1: - self.fail("failed: could not display the image") - - - def end_ok(self): - # Burn retrieved igep eeprom struct - new = self.__muhb[0][0] - - # Convert from string to hex - hnew = new.encode() - # Magic_id and default crc32 0x6d, 0x6a, 0x6d, 0xe4 with endianess changed so that u-boot loads it correctly - # IF magic ever changes this magic_id should be changed. At the end always magic_id of test and u-boot should - # be the same - magic_id = bytes([0xe4, 0x6d, 0x6a, 0x6d]) - default_igep_crc32 = bytes([0x00, 0x00, 0x00, 0x00]) - - # Create bytearray - to_calculate = bytearray() - # Build the final hex binary for crc32 operation - to_calculate.extend(magic_id) - to_calculate.extend(default_igep_crc32) - to_calculate.extend(hnew) - - # Calculate crc32! - new_crc32 = binascii.crc32(to_calculate) - hnew_crc32 = new_crc32.to_bytes(4, byteorder="little") - - # Recreate final eeprom struct in bytearray - self.__final_to_burn_to_eeprom = bytearray() - self.__final_to_burn_to_eeprom.extend(magic_id) - self.__final_to_burn_to_eeprom.extend(hnew_crc32) - self.__final_to_burn_to_eeprom.extend(hnew) - self.eeprom_burn() - - # Generate QR stamp - self.generate_qr_stamp() - # Send print stamp command - #self.print_stamp() - # Generate green image with board uuid and QR maybe pasted on top of green image - self.generate_screen() - # Show test_ok.png image in screen - self.show_result_screen() - - def end_fail(self): - # Burn igep eeprom bug struct - hnew = self.__muhb.encode() - default_fail = bytes([0xD0, 0xBA, 0xD0, 0xBA]) - self.__final_burn_to_eeprom = bytearray() - self.__final_to_burn_to_eeprom.extend(default_fail) - self.__final_to_burn_to_eeprom.extend(default_fail) - self.__final_to_burn_to_eeprom.extend(hnew) - self.__final_to_burn_to_eeprom.extend(default_fail) - self.__final_to_burn_to_eeprom.extend(default_fail) - self.__final_to_burn_to_eeprom.extend(default_fail) - self.__final_to_burn_to_eeprom.extend(default_fail) - self.__final_to_burn_to_eeprom.extend(default_fail) - self.__final_to_burn_to_eeprom.extend(default_fail) - self.__final_to_burn_to_eeprom.extend(default_fail) - self.__final_to_burn_to_eeprom.extend(default_fail) - self.__final_to_burn_to_eeprom.extend(default_fail) - self.__final_to_burn_to_eeprom.extend(default_fail) - self.__final_to_burn_to_eeprom.extend(default_fail) - self.eeprom_burn() - # Show test_ko.png image in screen - self.show_result_screen() \ No newline at end of file diff --git a/test-cli/test/helpers/globalVariables.py b/test-cli/test/helpers/globalVariables.py index 6b89f4d..baaae57 100644 --- a/test-cli/test/helpers/globalVariables.py +++ b/test-cli/test/helpers/globalVariables.py @@ -6,3 +6,4 @@ def globalVar(): g_mid = "" outdata = "NONE" station = "" + taskid_ctl = "" diff --git a/test-cli/test/helpers/int_registers.py b/test-cli/test/helpers/int_registers.py index 22a3d9b..0feb8da 100644 --- a/test-cli/test/helpers/int_registers.py +++ b/test-cli/test/helpers/int_registers.py @@ -55,6 +55,7 @@ def get_mac(modelid): # mac = mac + read(rg) # #erase trailing zeros # mac = mac[4::1] + # # To be finished... mac = sh.cat("/sys/class/net/eth0/address").strip() return mac diff --git a/test-cli/test/helpers/syscmd.py b/test-cli/test/helpers/syscmd.py index a869bd7..6114449 100644 --- a/test-cli/test/helpers/syscmd.py +++ b/test-cli/test/helpers/syscmd.py @@ -1,6 +1,5 @@ import unittest import subprocess -from test.helpers.globalVariables import globalVar class TestSysCommand(unittest.TestCase): diff --git a/test-cli/test/helpers/testsrv_db.py b/test-cli/test/helpers/testsrv_db.py index 9fb61fd..c9372fc 100644 --- a/test-cli/test/helpers/testsrv_db.py +++ b/test-cli/test/helpers/testsrv_db.py @@ -51,14 +51,13 @@ class TestSrv_Database(object): try: res = self.__sqlObject.db_execute_query(sql) # print(res) - return res; + return res except Exception as err: r = find_between(str(err), '#', '#') # print(r) return None def get_test_params_list(self, testid): - '''get the board test list''' sql = "SELECT * FROM isee.f_get_test_params_list({})".format(testid) # print('>>>' + sql) try: @@ -71,7 +70,6 @@ class TestSrv_Database(object): return None def open_test(self, board_uuid): - '''get the board test list''' sql = "SELECT * FROM isee.f_open_test('{}')".format(board_uuid) # print('>>>' + sql) try: @@ -84,7 +82,6 @@ class TestSrv_Database(object): return None def run_test(self, testid_ctl, testid): - '''get the board test list''' sql = "SELECT isee.f_run_test({},{})".format(testid_ctl, testid) # print('>>>' + sql) try: @@ -95,7 +92,6 @@ class TestSrv_Database(object): return None def finish_test(self, testid_ctl, testid, newstatus): - '''get the board test list''' sql = "SELECT isee.f_finish_test({},{},'{}')".format(testid_ctl, testid, newstatus) # print('>>>' + sql) try: @@ -117,3 +113,46 @@ class TestSrv_Database(object): # print(r) return None + def get_board_macaddr(self, board_uuid): + sql = "SELECT * FROM isee.f_get_board_macaddr('{}')".format(board_uuid) + # print('>>>' + sql) + try: + res = self.__sqlObject.db_execute_query(sql) + # print(res) + return res[0][0] + except Exception as err: + r = find_between(str(err), '#', '#') + # print(r) + return None + + def create_process(self, testid_ctl): + sql = "SELECT * FROM isee.f_create_process({})".format(testid_ctl) + # print('>>>' + sql) + try: + res = self.__sqlObject.db_execute_query(sql) + # print(res) + return res[0][0] + except Exception as err: + r = find_between(str(err), '#', '#') + # print(r) + return None + + def create_task_result(self, taskid_ctl, name, newstatus): + sql = "SELECT isee.f_create_task_result({},'{}','{}')".format(taskid_ctl, name, newstatus) + # print('>>>' + sql) + try: + self.__sqlObject.db_execute_query(sql) + except Exception as err: + r = find_between(str(err), '#', '#') + # print(r) + return None + + def update_taskctl_status(self, taskid_ctl, newstatus): + sql = "SELECT isee.f_update_taskctl_status({},'{}')".format(taskid_ctl, newstatus) + # print('>>>' + sql) + try: + self.__sqlObject.db_execute_query(sql) + except Exception as err: + r = find_between(str(err), '#', '#') + # print(r) + return None diff --git a/test-cli/test/runners/simple.py b/test-cli/test/runners/simple.py index f8b4dd4..a165406 100644 --- a/test-cli/test/runners/simple.py +++ b/test-cli/test/runners/simple.py @@ -1,5 +1,3 @@ -#!/usr/bin/env python - """ Simple Test Runner for unittest module @@ -7,9 +5,6 @@ Simple Test Runner for unittest module import sys import unittest -import os -from test.helpers.globalVariables import globalVar -from test.helpers.testsrv_db import TestSrv_Database class SimpleTestRunner: @@ -65,7 +60,6 @@ class TextTestResult(unittest.TestResult): def addError(self, test, err): unittest.TestResult.addError(self, test, err) test.longMessage = err[1] - print(err[1]) self.result = self.ERROR def addFailure(self, test, err): diff --git a/test-cli/test_main.py b/test-cli/test_main.py index 6e56214..6681d4e 100644 --- a/test-cli/test_main.py +++ b/test-cli/test_main.py @@ -23,8 +23,8 @@ from test.helpers.globalVariables import globalVar import socket from test.helpers.iseelogger import ISEE_Logger import logging -import binascii -from test.flashers.flasher import flash +from test.flashers.flasheeprom import flash_eeprom +from test.flashers.flashmemory import flash_memory # global variables psdbObj = TestSrv_Database() @@ -80,8 +80,6 @@ def add_test_task(suite, testdefname, paramlist): def create_testsuite(): # create an object TestSuite suite = unittest.TestSuite() - # get id of the full test for this board - globalVar.testid_ctl = psdbObj.open_test(globalVar.g_uuid) # get list of tests for this board tests = psdbObj.get_tests_list(globalVar.g_uuid) # loop in every test for this board @@ -117,41 +115,9 @@ def create_board(): get_mac(globalVar.g_mid)) -def program_eeprom(eeprompath): - print("Start programming Eeprom...") - # check if eeprompath is correct - if os.path.isfile(eeprompath): - # create u-boot data struct - data = bytearray() - data += (2029785358).to_bytes(4, 'big') # magicid --> 0x78FC110E - data += bytearray([0, 0, 0, 0]) # crc32 = 0 - data += bytearray(globalVar.g_uuid + "\0", - 'ascii') # uuid --> 'c0846c8a-5fa5-11ea-8576-f8b156ac62d7' and \0 at the end - mac0 = get_mac(globalVar.g_mid) - data += binascii.unhexlify(mac0.replace(':', '')) # mac0 --> 'f8:b1:56:ac:62:d7' - data += bytearray([0, 0, 0, 0, 0, 0]) # mac1 --> 0:0:0:0:0:0 - # calculate crc - crc = (binascii.crc32(data, 0)).to_bytes(4, 'big') - data[4:8] = crc - # write into eeprom and read back - f = open(eeprompath, "r+b") - f.write(data) - f.seek(0) - data_rx = f.read(57) - for i in range(57): - if data_rx[i] != data[i]: - print("Error while programming eeprom memory.") - return 1 - print("Eeprom programmed succesfully.") - return 0 - else: - print("eeprom memory not found.") - return 1 - - def create_boardvariables_list(uuid): varlist = {} - for row in psdbObj.get_board_variables(globalVar.g_uuid): + for row in psdbObj.get_board_variables(uuid): varname, varvalue = row varlist[varname] = varvalue return varlist @@ -160,14 +126,47 @@ def create_boardvariables_list(uuid): def main(): # initialize the board create_board() + # create a process + globalVar.testid_ctl = psdbObj.open_test(globalVar.g_uuid) + globalVar.taskid_ctl = psdbObj.create_process(globalVar.testid_ctl) # create and run tests according to the board type runner = SimpleTestRunner(psdbObj) testresult = runner.run(create_testsuite()) # execute aditional tasks, only if the test was succesfull if testresult.wasSuccessful(): + # change status to "running tasks" + psdbObj.update_taskctl_status(globalVar.taskid_ctl, "TASK_BOARD_RUNNING") + # get extra variables varlist = create_boardvariables_list(globalVar.g_uuid) - program_eeprom(varlist["eeprompath"]) - flash(globalVar.g_mid) + + resultmemory = 0 + # flash eeprom + if "eeprompath" in varlist and len(varlist["eeprompath"]) > 0: + mac0 = psdbObj.get_board_macaddr(globalVar.g_uuid) + resulteeprom = flash_eeprom(varlist["eeprompath"], globalVar.g_uuid, mac0) + psdbObj.create_task_result(globalVar.taskid_ctl, "FLASHEEPROM", + "TASK_OK" if resulteeprom == 0 else "TASK_FAIL") + else: + resulteeprom = 0 + psdbObj.create_task_result(globalVar.taskid_ctl, "FLASHEEPROM", "TASK_SKIP") + + # flash non-volatile memory + if "image" in varlist and len(varlist["image"]) > 0: + resultmemory = flash_memory(varlist["image"]) + psdbObj.create_task_result(globalVar.taskid_ctl, "FLASHMEMORY", + "TASK_OK" if resultmemory == 0 else "TASK_FAIL") + else: + resultmemory = 0 + psdbObj.create_task_result(globalVar.taskid_ctl, "FLASHMEMORY", "TASK_SKIP") + + # update status with the result + if resulteeprom == 0 and resultmemory == 0: + psdbObj.update_taskctl_status(globalVar.taskid_ctl, "TASK_BOARD_OK") + else: + psdbObj.update_taskctl_status(globalVar.taskid_ctl, "TASK_BOARD_FAIL") + else: + # change status to "not done" + psdbObj.update_taskctl_status(globalVar.taskid_ctl, "TASK_BOARD_NOT_DONE") if __name__ == "__main__": -- cgit v1.1 From 71d9a1f0b6bb4389cb9b0760ce4e847574fdcd45 Mon Sep 17 00:00:00 2001 From: Hector Fernandez Date: Fri, 13 Mar 2020 12:36:14 +0100 Subject: SOPA0000 test: Saved the variable data used during the eeprom and nand flashing. --- test-cli/.idea/workspace.xml | 8 +++++++- test-cli/test/helpers/testsrv_db.py | 4 ++-- test-cli/test/tests/qnand.py | 2 +- test-cli/test/tests/qram.py | 2 +- test-cli/test_main.py | 4 ++-- 5 files changed, 13 insertions(+), 7 deletions(-) diff --git a/test-cli/.idea/workspace.xml b/test-cli/.idea/workspace.xml index 6167250..9b9e803 100644 --- a/test-cli/.idea/workspace.xml +++ b/test-cli/.idea/workspace.xml @@ -1,7 +1,13 @@ - + + + + + + + \ No newline at end of file diff --git a/test-cli/test/files/dtmf-13579.wav b/test-cli/test/files/dtmf-13579.wav index 1ca5b93..c5b416a 100644 Binary files a/test-cli/test/files/dtmf-13579.wav and b/test-cli/test/files/dtmf-13579.wav differ diff --git a/test-cli/test/files/test_pattern.png b/test-cli/test/files/test_pattern.png deleted file mode 100644 index 353aab5..0000000 Binary files a/test-cli/test/files/test_pattern.png and /dev/null differ diff --git a/test-cli/test/helpers/DiskHelpers.py b/test-cli/test/helpers/DiskHelpers.py new file mode 100644 index 0000000..79b661e --- /dev/null +++ b/test-cli/test/helpers/DiskHelpers.py @@ -0,0 +1,9 @@ +import stat +import os + + +def disk_exists(path): + try: + return stat.S_ISBLK(os.stat(path).st_mode) + except: + return False diff --git a/test-cli/test/helpers/int_registers.py b/test-cli/test/helpers/int_registers.py index 0feb8da..9de503b 100644 --- a/test-cli/test/helpers/int_registers.py +++ b/test-cli/test/helpers/int_registers.py @@ -44,7 +44,7 @@ def get_die_id(modelid): return dieid -def get_mac(modelid): +def get_internal_mac(modelid): mac = None if modelid.find("IGEP0034") == 0 or modelid.find("SOPA0000") == 0: diff --git a/test-cli/test/helpers/testsrv_db.py b/test-cli/test/helpers/testsrv_db.py index 637d45c..7eeefb3 100644 --- a/test-cli/test/helpers/testsrv_db.py +++ b/test-cli/test/helpers/testsrv_db.py @@ -91,8 +91,8 @@ class TestSrv_Database(object): # print(r) return None - def finish_test(self, testid_ctl, testid, newstatus): - sql = "SELECT isee.f_finish_test({},{},'{}')".format(testid_ctl, testid, newstatus) + def finish_test(self, testid_ctl, testid, newstatus, textresult): + sql = "SELECT isee.f_finish_test({},{},'{}','{}')".format(testid_ctl, testid, newstatus, textresult) # print('>>>' + sql) try: self.__sqlObject.db_execute_query(sql) @@ -101,35 +101,13 @@ class TestSrv_Database(object): # print(r) return None - def upload_result_string(self, testid_ctl, testid, descrip, strdata): - sql = "SELECT isee.f_upload_result_string({},{},'{}','{}')".format(testid_ctl, testid, descrip, strdata) - # print('>>>' + sql) - try: - self.__sqlObject.db_execute_query(sql) - except Exception as err: - r = find_between(str(err), '#', '#') - # print(r) - return None - - def upload_result_file(self, testid_ctl, testid, descrip, filepath): + def upload_result_file(self, testid_ctl, testid, desc, filepath, mimetype): try: # generate a new oid fileoid = self.__sqlObject.db_upload_large_file(filepath) # insert into a table - sql = "SELECT isee.f_upload_result_file({},{},'{}','{}')".format(testid_ctl, testid, descrip, fileoid) - # print('>>>' + sql) - self.__sqlObject.db_execute_query(sql) - except Exception as err: - r = find_between(str(err), '#', '#') - # print(r) - return None - - def upload_dmesg(self, testid_ctl, filepath): - try: - # generate a new oid - fileoid = self.__sqlObject.db_upload_large_file(filepath) - # insert into a table - sql = "SELECT isee.f_upload_dmesg_file({},'{}')".format(testid_ctl, fileoid) + sql = "SELECT isee.f_upload_result_file({},{},'{}','{}','{}')".format(testid_ctl, testid, desc, fileoid, + mimetype) # print('>>>' + sql) self.__sqlObject.db_execute_query(sql) except Exception as err: @@ -224,3 +202,15 @@ class TestSrv_Database(object): r = find_between(str(err), '#', '#') # print(r) return None + + def get_setup_variable(self, skey): + sql = "SELECT * FROM admin.get_setupvar('{}')".format(skey) + # print('>>>' + sql) + try: + res = self.__sqlObject.db_execute_query(sql) + # print(res) + return res[0][0] + except Exception as err: + r = find_between(str(err), '#', '#') + # print(r) + return None diff --git a/test-cli/test/runners/simple.py b/test-cli/test/runners/simple.py index 22fe449..2eb61a4 100644 --- a/test-cli/test/runners/simple.py +++ b/test-cli/test/runners/simple.py @@ -59,8 +59,9 @@ class TextTestResult(unittest.TestResult): def addError(self, test, err): unittest.TestResult.addError(self, test, err) - test.longMessage = err[1] + # test.longMessage = err[1] self.result = self.ERROR + print(err[1]) def addFailure(self, test, err): unittest.TestResult.addFailure(self, test, err) @@ -73,17 +74,13 @@ class TextTestResult(unittest.TestResult): self.runner.writeUpdate(self.result) # save result data for result in test.getresults(): - if "desc" in result.keys() and result["desc"]: - if "type" in result.keys() and "data" in result.keys(): - if result["type"] == "string": - self.__pgObj.upload_result_string(test.params["testidctl"], test.params["testid"], - result["desc"], result["data"]) - elif result["type"] == "file": - self.__pgObj.upload_result_file(test.params["testidctl"], test.params["testid"], - result["desc"], result["data"]) + self.__pgObj.upload_result_file(test.params["testidctl"], test.params["testid"], + result["description"], result["filepath"], result["mimetype"]) # SEND TO DB THE RESULT OF THE TEST if self.result == self.PASS: status = "TEST_COMPLETE" + resulttext = test.gettextresult() else: status = "TEST_FAILED" - self.__pgObj.finish_test(test.params["testidctl"], test.params["testid"], status) + resulttext = test.longMessage + self.__pgObj.finish_test(test.params["testidctl"], test.params["testid"], status, resulttext) diff --git a/test-cli/test/tasks/flasheeprom.py b/test-cli/test/tasks/flasheeprom.py index 71c1bdd..feeb04c 100644 --- a/test-cli/test/tasks/flasheeprom.py +++ b/test-cli/test/tasks/flasheeprom.py @@ -21,7 +21,7 @@ def _generate_data_bytes(boarduuid, mac0, mac1): def _generate_output_data(data_rx): - boarduuid = data_rx[8:44].decode('ascii') # no 8:45 porque el \0 final hace que no funcione postgresql + boarduuid = data_rx[8:44].decode('ascii') # no 8:45 porque el \0 final hace que no funcione postgresql mac0 = binascii.hexlify(data_rx[45:51]).decode('ascii') mac1 = binascii.hexlify(data_rx[51:57]).decode('ascii') smac0 = ':'.join(mac0[i:i + 2] for i in range(0, len(mac0), 2)) diff --git a/test-cli/test/tasks/generatedmesg.py b/test-cli/test/tasks/generatedmesg.py deleted file mode 100644 index aa9d9ce..0000000 --- a/test-cli/test/tasks/generatedmesg.py +++ /dev/null @@ -1,17 +0,0 @@ -import sh - - -def generate_dmesg(tidctl, pgObj): - print("Start getting DMESG...") - p = sh.dmesg("--color=never") - if p.exit_code != 0: - print("DMESG: Unknown error.") - return 1 - else: - # save dmesg in a file - with open('/tmp/dmesg.txt', 'w') as outfile: - n = outfile.write(p.stdout.decode('ascii')) - # save dmesg result in DB - pgObj.upload_dmesg(tidctl, '/tmp/dmesg.txt') - print("DMESG: saved succesfully.") - return 0 diff --git a/test-cli/test/tests/qamper.py b/test-cli/test/tests/qamper.py index 7a31615..b4d57e3 100644 --- a/test-cli/test/tests/qamper.py +++ b/test-cli/test/tests/qamper.py @@ -26,60 +26,37 @@ class Qamper(unittest.TestCase): amp = Amper() # open serial connection if not amp.open(): - self.__resultlist.append( - { - "desc": "Test result", - "data": "FAILED: can not open a serial port", - "type": "string" - } - ) self.fail("Error: can not open a serial port") # check if the amperimeter is connected and working # 2 ATTEMTS if not amp.hello(): if not amp.hello(): - self.__resultlist.append( - { - "desc": "Test result", - "data": "FAILED: can not communicate with the amperimeter", - "type": "string" - } - ) self.fail("Error: can not communicate") # get current value (in Amperes) - current = amp.getCurrent() + self.current = amp.getCurrent() + # save result in a file + with open('/tmp/station/amper.txt', 'w') as outfile: + n = outfile.write("Current: {} A".format(self.current)) + outfile.close() + self.__resultlist.append( + { + "description": "Amperimeter values", + "filepath": "/tmp/station/amper.txt", + "mimetype": "text/plain" + } + ) # close serial connection amp.close() # Check current range - if float(current) > float(self._overcurrent): + if float(self.current) > float(self._overcurrent): # Overcurrent detected - self.__resultlist.append( - { - "desc": "Test result", - "data": "FAILED: Overcurrent detected ( {} A)".format(current), - "type": "string" - } - ) - self.fail("failed: Overcurrent detected ( {} )".format(current)) - elif float(current) < float(self._undercurrent): + self.fail("failed: Overcurrent detected ( {} )".format(self.current)) + elif float(self.current) < float(self._undercurrent): # Undercurrent detected - self.__resultlist.append( - { - "desc": "Test result", - "data": "FAILED: Undercurrent detected ( {} A)".format(current), - "type": "string" - } - ) - self.fail("failed: Undercurrent detected ( {} )".format(current)) - - # Test successful - self.__resultlist.append( - { - "desc": "Test result", - "data": "OK: Current {} A".format(current), - "type": "string" - } - ) + self.fail("failed: Undercurrent detected ( {} )".format(self.current)) def getresults(self): return self.__resultlist + + def gettextresult(self): + return "{} A".format(self.current) diff --git a/test-cli/test/tests/qaudio.py b/test-cli/test/tests/qaudio.py new file mode 100644 index 0000000..5baffe4 --- /dev/null +++ b/test-cli/test/tests/qaudio.py @@ -0,0 +1,59 @@ +import unittest +import sh +import wave +import contextlib + + +def calc_audio_duration(fname): + with contextlib.closing(wave.open(fname, 'r')) as f: + frames = f.getnframes() + rate = f.getframerate() + duration = frames / float(rate) + f.close() + return duration + + +class Qaudio(unittest.TestCase): + params = None + __resultlist = None # resultlist is a python list of python dictionaries + __dtmf_secuence = ["1", "3", "5", "7", "9"] + + def __init__(self, testname, testfunc, varlist): + self.params = varlist + super(Qaudio, self).__init__(testfunc) + self._testMethodDoc = testname + self.__resultlist = [] + + def execute(self): + # analize audio file + recordtime = calc_audio_duration("test/files/dtmf-13579.wav") + 0.15 + # play and record + p1 = sh.aplay("test/files/dtmf-13579.wav", _bg=True) + p2 = sh.arecord("-r", 8000, "-d", recordtime, "/tmp/station/recorded.wav", _bg=True) + p1.wait() + p2.wait() + if p1.exit_code == 0 and p2.exit_code == 0: + p = sh.multimon("-t", "wav", "-a", "DTMF", "/tmp/station/recorded.wav", "-q") + if p.exit_code == 0: + lines = p.stdout.decode('ascii').splitlines() + self.__dtmf_secuence_result = [] + for li in lines: + if li.split(" ")[0] == "DTMF:": + self.__dtmf_secuence_result.append(li.split(" ")[1]) + # compare original and processed dtmf sequence + if len(self.__dtmf_secuence) == len(self.__dtmf_secuence_result): + for i in range(len(self.__dtmf_secuence)): + if self.__dtmf_secuence[i] != self.__dtmf_secuence_result[i]: + self.fail("failed: sent and received DTMF sequence don't match.") + else: + self.fail("failed: received DTMF sequence is shorter than expected.") + else: + self.fail("failed: unable to use multimon command.") + else: + self.fail("failed: unable to play/record audio.") + + def getresults(self): + return self.__resultlist + + def gettextresult(self): + return "" diff --git a/test-cli/test/tests/qdmesg.py b/test-cli/test/tests/qdmesg.py new file mode 100644 index 0000000..45dd4eb --- /dev/null +++ b/test-cli/test/tests/qdmesg.py @@ -0,0 +1,38 @@ +import sh +import os.path +from os import path + +class Qdmesg: + params = None + __resultlist = None # resultlist is a python list of python dictionaries + + def __init__(self, testname, testfunc, varlist): + self.params = varlist + self._testMethodDoc = testname + self.__resultlist = [] + self.pgObj = varlist["db"] + + def execute(self): + print("running dmesg test") + self.pgObj.run_test(self.params["testidctl"], self.params["testid"]) + # delete previous file + if path.exists("/tmp/station/dmesg.txt"): + print("dmesg remove") + os.remove("/tmp/station/dmesg.txt") + # generate file + p = sh.dmesg("--color=never", _out="/tmp/station/dmesg.txt") + print("Exit code: {}".format(p.exit_code)) + if p.exit_code == 0: + # save result + # with open('/tmp/station/dmesg.txt', 'w') as outfile: + # n = outfile.write(p.stdout.decode('ascii')) + # outfile.close() + + # save dmesg result in DB + self.pgObj.upload_result_file(self.params["testidctl"], self.params["testid"], "dmesg output", + "/tmp/station/dmesg.txt", "text/plain") + self.pgObj.finish_test(self.params["testidctl"], self.params["testid"], "TEST_COMPLETE", "") + print("success dmesg test") + else: + self.pgObj.finish_test(self.params["testidctl"], self.params["testid"], "TEST_FAILED", "") + print("fail dmesg test") diff --git a/test-cli/test/tests/qduplex_ser.py b/test-cli/test/tests/qduplex_ser.py index 020844a..170039b 100644 --- a/test-cli/test/tests/qduplex_ser.py +++ b/test-cli/test/tests/qduplex_ser.py @@ -51,23 +51,9 @@ class Qduplex(unittest.TestCase): self.__serial1.write(test_uuid) time.sleep(0.05) # there might be a small delay if self.__serial2.inWaiting() == 0: - self.__resultlist.append( - { - "desc": "Test result", - "data": "FAILED: port {} wait timeout exceded".format(self.__port2), - "type": "string" - } - ) self.fail("failed: port {} wait timeout exceded".format(self.__port2)) else: if self.__serial2.readline() != test_uuid: - self.__resultlist.append( - { - "desc": "Test result", - "data": "FAILED: port {} write/read mismatch".format(self.__port2), - "type": "string" - } - ) self.fail("failed: port {} write/read mismatch".format(self.__port2)) time.sleep(0.05) # there might be a small delay @@ -81,33 +67,13 @@ class Qduplex(unittest.TestCase): self.__serial2.write(test_uuid) time.sleep(0.05) # there might be a small delay if self.__serial1.inWaiting() == 0: - self.__resultlist.append( - { - "desc": "Test result", - "data": "FAILED: port {} wait timeout exceded".format(self.__port1), - "type": "string" - } - ) self.fail("failed: port {} wait timeout exceded".format(self.__port1)) else: if self.__serial1.readline() != test_uuid: - self.__resultlist.append( - { - "desc": "Test result", - "data": "FAILED: port {} write/read mismatch".format(self.__port1), - "type": "string" - } - ) self.fail("failed: port {} write/read mismatch".format(self.__port1)) - # Test successful - self.__resultlist.append( - { - "desc": "Test OK", - "data": "OK", - "type": "string" - } - ) - def getresults(self): return self.__resultlist + + def gettextresult(self): + return "" diff --git a/test-cli/test/tests/qeeprom.py b/test-cli/test/tests/qeeprom.py index cafbc7f..7fc9fcd 100644 --- a/test-cli/test/tests/qeeprom.py +++ b/test-cli/test/tests/qeeprom.py @@ -36,41 +36,14 @@ class Qeeprom(unittest.TestCase): # compare both values if data_rx != data_tx: # Both data are different - self.__resultlist.append( - { - "desc": "Test result", - "data": "FAILED: mismatch between written and received values.", - "type": "string" - } - ) self.fail("failed: mismatch between written and received values.") else: - self.__resultlist.append( - { - "desc": "Test result", - "data": "FAILED: Unable to read from the EEPROM device.", - "type": "string" - } - ) self.fail("failed: Unable to read from the EEPROM device.") else: - self.__resultlist.append( - { - "desc": "Test result", - "data": "FAILED: Unable to write on the EEPROM device.", - "type": "string" - } - ) self.fail("failed: Unable to write on the EEPROM device.") - # Test successful - self.__resultlist.append( - { - "desc": "Test result", - "data": "OK", - "type": "string" - } - ) - def getresults(self): return self.__resultlist + + def gettextresult(self): + return "" diff --git a/test-cli/test/tests/qethernet.py b/test-cli/test/tests/qethernet.py index 878c0a0..3b2f197 100644 --- a/test-cli/test/tests/qethernet.py +++ b/test-cli/test/tests/qethernet.py @@ -1,6 +1,5 @@ import unittest import sh -import re import json @@ -53,45 +52,25 @@ class Qethernet(unittest.TestCase): data = json.loads(p.stdout.decode('ascii')) self.__bwreal = float(data['end']['sum_received']['bits_per_second'])/1024/1024 # save result file - with open('/tmp/ethernet-iperf.json', 'w') as outfile: + with open('/tmp/station/ethernet-iperf3.json', 'w') as outfile: json.dump(data, outfile, indent=4) + outfile.close() self.__resultlist.append( { - "desc": "iperf3 output", - "data": "/tmp/ethernet-iperf.json", - "type": "file" + "description": "iperf3 output", + "filepath": "/tmp/station/ethernet-iperf3.json", + "mimetype": "application/json" } ) # check if BW is in the expected range if self.__bwreal < float(self.__bwexpected): - self.__resultlist.append( - { - "desc": "Test result", - "data": "FAILED: speed is lower than spected. Speed(Mbits/s): " + str(self.__bwreal), - "type": "string" - } - ) self.fail("failed: speed is lower than spected. Speed(Mbits/s): " + str(self.__bwreal)) else: - self.__resultlist.append( - { - "desc": "Test result", - "data": "FAILED: could not complete iperf command.", - "type": "string" - } - ) self.fail("failed: could not complete iperf command.") - # Test successful - self.__resultlist.append( - { - "desc": "Test result", - "data": "OK", - "type": "string" - } - ) - def getresults(self): - return self.__resultlist + + def gettextresult(self): + return "" diff --git a/test-cli/test/tests/qi2c.py b/test-cli/test/tests/qi2c.py deleted file mode 100644 index 3afedfa..0000000 --- a/test-cli/test/tests/qi2c.py +++ /dev/null @@ -1,67 +0,0 @@ -from test.helpers.syscmd import SysCommand -import unittest - - -class Qi2c(unittest.TestCase): - params = None - __resultlist = None # resultlist is a python list of python dictionaries - - def __init__(self, testname, testfunc, varlist): - self.params = varlist - super(Qi2c, self).__init__(testfunc) - if "busnum" in varlist: - self.__busnum = varlist["busnum"] - else: - raise Exception('busnum param inside Qi2c must be defined') - if "register" in varlist: - self.__register = varlist["register"].split("/") - else: - raise Exception('register param inside Qi2c must be defined') - self.__devices = [] - self._testMethodDoc = testname - self.__resultlist = [] - - def execute(self): - str_cmd = "i2cdetect -a -y -r {}".format(self.__busnum) - i2c_command = SysCommand("i2cdetect", str_cmd) - if i2c_command.execute() == 0: - self.__raw_out = i2c_command.getOutput() - if self.__raw_out == "": - return -1 - lines = self.__raw_out.decode('ascii').splitlines() - for i in range(len(lines)): - if lines[i].count('UU'): - if lines[i].find("UU"): - self.__devices.append( - "0x{}{}".format((i - 1), hex(int((lines[i].find("UU") - 4) / 3)).split('x')[-1])) - for i in range(len(self.__register)): - if not (self.__register[i] in self.__devices): - self.__resultlist.append( - { - "desc": "Test result", - "data": "FAILED: device {} not found in bus i2c-{}".format(self.__register[i], self.__busnum), - "type": "string" - } - ) - self.fail("failed: device {} not found in bus i2c-{}".format(self.__register[i], self.__busnum)) - else: - self.__resultlist.append( - { - "desc": "Test result", - "data": "FAILED: could not complete i2cdedtect command", - "type": "string" - } - ) - self.fail("failed: could not complete i2cdedtect command.") - - # Test successful - self.__resultlist.append( - { - "desc": "Test result", - "data": "OK", - "type": "string" - } - ) - - def getresults(self): - return self.__resultlist diff --git a/test-cli/test/tests/qmmcflash.py b/test-cli/test/tests/qmmcflash.py new file mode 100644 index 0000000..f10757e --- /dev/null +++ b/test-cli/test/tests/qmmcflash.py @@ -0,0 +1,15 @@ +import unittest +import sh + +class Qmmcflash(unittest.TestCase): + params = None + + def __init__(self, testname, testfunc, varlist): + self.params = varlist + super(Qmmcflash, self).__init__(testfunc) + self._testMethodDoc = testname + # TODO + + + def execute(self): + # TODO \ No newline at end of file diff --git a/test-cli/test/tests/qnand.py b/test-cli/test/tests/qnand.py index d7c22b3..8c78e2b 100644 --- a/test-cli/test/tests/qnand.py +++ b/test-cli/test/tests/qnand.py @@ -22,34 +22,21 @@ class Qnand(unittest.TestCase): try: p = sh.nandtest("-m", self.__device) # save result - with open('/tmp/nand-nandtest.txt', 'w') as outfile: + with open('/tmp/station/nand-nandtest.txt', 'w') as outfile: n = outfile.write(p.stdout.decode('ascii')) + outfile.close() self.__resultlist.append( { - "desc": "nandtest output", - "data": "/tmp/nand-nandtest.txt", - "type": "file" + "description": "nandtest output", + "filepath": "/tmp/station/nand-nandtest.txt", + "mimetype": "text/plain" } ) - except sh.ErrorReturnCode as e: - self.__resultlist.append( - { - "desc": "Test result", - "data": "FAILED: could not complete nandtest command", - "type": "string" - } - ) self.fail("failed: could not complete nandtest command") - # Test successful - self.__resultlist.append( - { - "desc": "Test result", - "data": "OK", - "type": "string" - } - ) - def getresults(self): return self.__resultlist + + def gettextresult(self): + return "" diff --git a/test-cli/test/tests/qram.py b/test-cli/test/tests/qram.py index a46424f..21a01c8 100644 --- a/test-cli/test/tests/qram.py +++ b/test-cli/test/tests/qram.py @@ -26,25 +26,11 @@ class Qram(unittest.TestCase): def execute(self): try: p = sh.memtester(self.__memsize, "1", _out="/dev/null") - except sh.ErrorReturnCode as e: - self.__resultlist.append( - { - "desc": "Test result", - "data": "FAILED: could not complete memtester command", - "type": "string" - } - ) self.fail("failed: could not complete memtester command") - # Test successful - self.__resultlist.append( - { - "desc": "Test result", - "data": "OK", - "type": "string" - } - ) - def getresults(self): return self.__resultlist + + def gettextresult(self): + return "" diff --git a/test-cli/test/tests/qrtc.py b/test-cli/test/tests/qrtc.py index 6d4ecf6..df493d2 100644 --- a/test-cli/test/tests/qrtc.py +++ b/test-cli/test/tests/qrtc.py @@ -33,41 +33,14 @@ class Qrtc(unittest.TestCase): p.stdout.decode('ascii')) # check if the seconds of both times are different if time1 == time2: - self.__resultlist.append( - { - "desc": "Test result", - "data": "FAILED: RTC is not running", - "type": "string" - } - ) self.fail("failed: RTC is not running") else: - self.__resultlist.append( - { - "desc": "Test result", - "data": "FAILED: couldn't execute hwclock command for the second time", - "type": "string" - } - ) self.fail("failed: couldn't execute hwclock command for the second time") else: - self.__resultlist.append( - { - "desc": "Test result", - "data": "FAILED: couldn't execute hwclock command", - "type": "string" - } - ) self.fail("failed: couldn't execute hwclock command") - # Test successful - self.__resultlist.append( - { - "desc": "Test result", - "data": "OK", - "type": "string" - } - ) - def getresults(self): return self.__resultlist + + def gettextresult(self): + return "" diff --git a/test-cli/test/tests/qserial.py b/test-cli/test/tests/qserial.py index faca505..402e33f 100644 --- a/test-cli/test/tests/qserial.py +++ b/test-cli/test/tests/qserial.py @@ -42,34 +42,14 @@ class Qserial(unittest.TestCase): self.__serial.write(test_uuid) time.sleep(0.05) # there might be a small delay if self.__serial.inWaiting() == 0: - self.__resultlist.append( - { - "desc": "Test result", - "data": "FAILED: port {} wait timeout exceded".format(self.__port), - "type": "string" - } - ) self.fail("failed: port {} wait timeout exceded".format(self.__port)) else: # check if what it was sent is equal to what has been received if self.__serial.readline() != test_uuid: - self.__resultlist.append( - { - "desc": "Test result", - "data": "FAILED: port {} write/read mismatch".format(self.__port), - "type": "string" - } - ) self.fail("failed: port {} write/read mismatch".format(self.__port)) - # Test successful - self.__resultlist.append( - { - "desc": "Test result", - "data": "OK", - "type": "string" - } - ) - def getresults(self): return self.__resultlist + + def gettextresult(self): + return "" diff --git a/test-cli/test/tests/qusb.py b/test-cli/test/tests/qusb.py index 32d99ef..9b0cad3 100644 --- a/test-cli/test/tests/qusb.py +++ b/test-cli/test/tests/qusb.py @@ -22,13 +22,6 @@ class Qusb(unittest.TestCase): if dev_obj.getMassStorage(): device = dev_obj.getMassStorage()['disk'] + "1" else: - self.__resultlist.append( - { - "desc": "Test result", - "data": "FAILED: No USB memory found", - "type": "string" - } - ) self.fail("failed: No USB memory found.") # create a new folder where the pendrive is going to be mounted @@ -60,43 +53,16 @@ class Qusb(unittest.TestCase): sh.umount("/mnt/pendrive") # check result if q is None: - self.__resultlist.append( - { - "desc": "Test result", - "data": "FAILED: Wrong md5 result", - "type": "string" - } - ) self.fail("failed: Wrong md5 result.") else: # umount sh.umount("/mnt/pendrive") - self.__resultlist.append( - { - "desc": "Test result", - "data": "FAILED: Unable to copy files to the USB memory device", - "type": "string" - } - ) self.fail("failed: Unable to copy files to the USB memory device.") else: - self.__resultlist.append( - { - "desc": "Test result", - "data": "FAILED: Unable to mount the USB memory device", - "type": "string" - } - ) self.fail("failed: Unable to mount the USB memory device.") - # Test successful - self.__resultlist.append( - { - "desc": "Test result", - "data": "OK", - "type": "string" - } - ) - def getresults(self): return self.__resultlist + + def gettextresult(self): + return "" diff --git a/test-cli/test/tests/qwifi.py b/test-cli/test/tests/qwifi.py index 6f972d3..eb1f0bf 100644 --- a/test-cli/test/tests/qwifi.py +++ b/test-cli/test/tests/qwifi.py @@ -63,80 +63,33 @@ class Qwifi(unittest.TestCase): data = json.loads(p.stdout.decode('ascii')) self.__bwreal = float(data['end']['sum_received']['bits_per_second']) / 1024 / 1024 # save result file - with open('/tmp/wifi-iperf.json', 'w') as outfile: + with open('/tmp/station/wifi-iperf3.json', 'w') as outfile: json.dump(data, outfile, indent=4) + outfile.close() self.__resultlist.append( { - "desc": "iperf3 output", - "data": "/tmp/wifi-iperf.json", - "type": "file" + "description": "iperf3 output", + "filepath": "/tmp/station/wifi-iperf3.json", + "mimetype": "application/json" } ) # check if BW is in the expected range if self.__bwreal < float(self.__bwexpected): - self.__resultlist.append( - { - "desc": "Test result", - "data": "FAILED: speed is lower than expected. Speed(Mbits/s): " + str(self.__bwreal), - "type": "string" - } - ) self.fail("failed: speed is lower than expected. Speed(Mbits/s): " + str(self.__bwreal)) else: - self.__resultlist.append( - { - "desc": "Test result", - "data": "FAILED: could not complete iperf3 command", - "type": "string" - } - ) self.fail("failed: could not complete iperf3 command") else: - self.__resultlist.append( - { - "desc": "Test result", - "data": "FAILED: wlan0 interface doesn't have any ip address", - "type": "string" - } - ) self.fail("failed: wlan0 interface doesn't have any ip address.") else: - self.__resultlist.append( - { - "desc": "Test result", - "data": "FAILED: could not complete ifconfig command", - "type": "string" - } - ) self.fail("failed: could not complete ifconfig command.") else: - self.__resultlist.append( - { - "desc": "Test result", - "data": "FAILED: wifi module is not connected to the router", - "type": "string" - } - ) self.fail("failed: wifi module is not connected to the router.") else: - self.__resultlist.append( - { - "desc": "Test result", - "data": "FAILED: could not execute iw command", - "type": "string" - } - ) self.fail("failed: could not execute iw command") - # Test successful - self.__resultlist.append( - { - "desc": "Test result", - "data": "OK", - "type": "string" - } - ) - def getresults(self): return self.__resultlist + + def gettextresult(self): + return "" diff --git a/test-cli/test_main.py b/test-cli/test_main.py index 63d8dad..b32ad26 100644 --- a/test-cli/test_main.py +++ b/test-cli/test_main.py @@ -1,5 +1,5 @@ from test.helpers.int_registers import get_die_id -from test.helpers.int_registers import get_mac +from test.helpers.int_registers import get_internal_mac from subprocess import call import os import unittest @@ -9,7 +9,6 @@ from test.runners.simple import SimpleTestRunner from test.tests.qethernet import Qethernet from test.tests.qram import Qram from test.tests.qusb import Qusb -from test.tests.qi2c import Qi2c from test.tests.qeeprom import Qeeprom from test.tests.qserial import Qserial from test.tests.qwifi import Qwifi @@ -17,6 +16,8 @@ from test.tests.qrtc import Qrtc from test.tests.qduplex_ser import Qduplex from test.tests.qamper import Qamper from test.tests.qnand import Qnand +from test.tests.qaudio import Qaudio +from test.tests.qdmesg import Qdmesg from test.helpers.globalVariables import globalVar import socket from test.helpers.iseelogger import ISEE_Logger @@ -24,16 +25,17 @@ import logging from test.tasks.flasheeprom import flash_eeprom from test.tasks.flashmemory import flash_memory from test.helpers.qrreader import QRReader -from test.tasks.generatedmesg import generate_dmesg from test.helpers.cmdline import LinuxKernel from test.helpers.amper import Amper from test.enums.StationStates import StationStates +import time # global variables psdbObj = TestSrv_Database() xmlObj = None loggerObj = None test_abspath = None +tests_manually_executed = [] # define clear function @@ -50,36 +52,50 @@ def create_paramslist(params): return paramlist -def add_test_task(suite, testdefname, paramlist): +def add_test(suite, testdefname, paramlist): if testdefname == "RAM": suite.addTest(Qram(testdefname, "execute", paramlist)) + return 0 elif testdefname == "SERIALDUAL": suite.addTest(Qduplex(testdefname, "execute", paramlist)) + return 0 elif testdefname == "EEPROM": suite.addTest(Qeeprom(testdefname, "execute", paramlist)) + return 0 elif testdefname == "SERIAL": suite.addTest(Qserial(testdefname, "execute", paramlist)) + return 0 elif testdefname == "RTC": suite.addTest(Qrtc(testdefname, "execute", paramlist)) + return 0 elif testdefname == "CONSUMPTION": suite.addTest(Qamper(testdefname, "execute", paramlist)) + return 0 elif testdefname == "ETHERNET": suite.addTest(Qethernet(testdefname, "execute", paramlist)) + return 0 elif testdefname == "NAND": suite.addTest(Qnand(testdefname, "execute", paramlist)) - elif testdefname == "I2C": - suite.addTest(Qi2c(testdefname, "execute", paramlist)) + return 0 elif testdefname == "WIFI": suite.addTest(Qwifi(testdefname, "execute", paramlist)) + return 0 elif testdefname == "USB": suite.addTest(Qusb(testdefname, "execute", paramlist)) + return 0 + elif testdefname == "AUDIO": + suite.addTest(Qaudio(testdefname, "execute", paramlist)) + return 0 + elif testdefname == "DMESG": + tests_manually_executed.append(Qdmesg(testdefname, "execute", paramlist)) + return 0 else: - raise Exception("Wrong testdefname") + return 1 def create_testsuite(): # create an object TestSuite - suite = unittest.TestSuite() + suite1 = unittest.TestSuite() # get list of tests for this board tests = psdbObj.get_tests_list(globalVar.g_uuid) # loop in every test for this board @@ -93,10 +109,11 @@ def create_testsuite(): paramlist["boarduuid"] = globalVar.g_uuid paramlist["testidctl"] = globalVar.testid_ctl paramlist["xml"] = xmlObj + paramlist["db"] = psdbObj # add test to TestSuite - add_test_task(suite, testdefname, paramlist) + rescode = add_test(suite1, testdefname, paramlist) - return suite + return suite1 def create_board(): @@ -122,7 +139,7 @@ def create_board(): print(globalVar.g_mid) print(processor_id) globalVar.g_uuid = psdbObj.create_board(processor_id, model_id, variant, globalVar.station, - get_mac(globalVar.g_mid)) + get_internal_mac(globalVar.g_mid)) print(globalVar.g_uuid) @@ -140,31 +157,38 @@ def main(): # create a process globalVar.testid_ctl = psdbObj.open_test(globalVar.g_uuid) - # create and run tests according to the board type - runner = SimpleTestRunner(psdbObj) # Change state to "TESTS_RUNNING" if psdbObj.read_station_state(globalVar.station) == StationStates.STATION_ERROR.name: # Abort - print("Error: Wrong previous station state.") + print( + "Error: Wrong previous station state before changing to TESTS_RUNNING state. STATION_ERROR state unexpected.") exit(0) psdbObj.change_station_state(globalVar.station, StationStates.TESTS_RUNNING.name) - # Execute tests - testresult = runner.run(create_testsuite()) - # save dmesg at the end of the tests - generate_dmesg(globalVar.testid_ctl, psdbObj) - # execute aditional tasks, only if the test was succesfull + # generate suits + suite1 = create_testsuite() + # Execute tests (round 1) + runner1 = SimpleTestRunner(psdbObj) + testresult = runner1.run(suite1) + + #Execute manually tests + for test in tests_manually_executed: + test.execute() + + # execute aditional tasks, only if the test was successful if testresult.wasSuccessful(): # Change state to "TESTS_OK" if psdbObj.read_station_state(globalVar.station) == StationStates.STATION_ERROR.name: # Abort - print("Error: Wrong previous station state.") + print( + "Error: Wrong previous station state before changing to TESTS_OK state. STATION_ERROR state unexpected.") exit(0) psdbObj.change_station_state(globalVar.station, StationStates.TESTS_OK.name) # Change state to "EXTRATASKS_RUNNING" if psdbObj.read_station_state(globalVar.station) == StationStates.STATION_ERROR.name: # Abort - print("Error: Wrong previous station state.") + print( + "Error: Wrong previous station state before changing to EXTRATASKS_RUNNING state. STATION_ERROR state unexpected.") exit(0) psdbObj.change_station_state(globalVar.station, StationStates.EXTRATASKS_RUNNING.name) # create task control @@ -192,28 +216,38 @@ def main(): # update status with the result if resulteeprom == 0 and resultmemory == 0: + # finish tasks psdbObj.update_taskctl_status(globalVar.taskid_ctl, "TASK_BOARD_OK") + # Change state to "WAITING_FOR_SCANNER" if psdbObj.read_station_state(globalVar.station) == StationStates.STATION_ERROR.name: # Abort - print("Error: Wrong previous station state.") + print( + "Error: Wrong previous station state before changing to WAITING_FOR_SCANNER state. STATION_ERROR state unexpected.") exit(0) psdbObj.change_station_state(globalVar.station, StationStates.WAITING_FOR_SCANNER.name) - # get barcode using the scanner - qrreceived = False - while not qrreceived: - qr = QRReader() - if qr.openQR(): - # waits 5s to receive a valid code - if qr.readQRasync(5): - qrreceived = True - factorycode = qr.getQRNumber() - psdbObj.set_factorycode(globalVar.g_uuid, factorycode) - qr.closeQR() + + # get barcode using the scanner, only if autotest is disabled + autotest = psdbObj.get_setup_variable("AUTOTEST_" + globalVar.station) + if autotest == "True": + psdbObj.set_factorycode(globalVar.g_uuid, "1234567890") # fake factory code + else: + qrreceived = False + while not qrreceived: + qr = QRReader() + if qr.openQR(): + # waits 5s to receive a valid code + if qr.readQRasync(5): + qrreceived = True + factorycode = qr.getQRNumber() + psdbObj.set_factorycode(globalVar.g_uuid, factorycode) + qr.closeQR() + # Change state to "FINISHED" if psdbObj.read_station_state(globalVar.station) == StationStates.STATION_ERROR.name: # Abort - print("Error: Wrong previous station state.") + print( + "Error: Wrong previous station state before changing to FINISHED state. STATION_ERROR state unexpected.") exit(0) psdbObj.change_station_state(globalVar.station, StationStates.FINISHED.name) else: @@ -224,10 +258,16 @@ def main(): # Change state to "TESTS_FAILED" if psdbObj.read_station_state(globalVar.station) == StationStates.STATION_ERROR.name: # Abort - print("Error: Wrong previous station state.") + print( + "Error: Wrong previous station state before changing to TESTS_FAILED state. STATION_ERROR state unexpected.") exit(0) psdbObj.change_station_state(globalVar.station, StationStates.TESTS_FAILED.name) + # reset board if AUTOTEST is enabled + autotest = psdbObj.get_setup_variable("AUTOTEST_" + globalVar.station) + if autotest == "True": + os.system('reboot') + if __name__ == "__main__": # Clear the shell screen @@ -257,22 +297,17 @@ if __name__ == "__main__": # Check if current state is "WAIT_TEST_START" currentstate = psdbObj.read_station_state(globalVar.station) - if currentstate != StationStates.WAIT_TEST_START.name: - # Abort - print("Error: Wrong previous station state.") - exit(0) + while currentstate != StationStates.WAIT_TEST_START.name: + currentstate = psdbObj.read_station_state(globalVar.station) + if currentstate != StationStates.WAIT_TEST_START.name: + # Wait + print("Error: Wrong previous station state. WAIT_TEST_START state expected. Current state is: {}".format(currentstate)) + print("Waiting for WAIT_TEST_START state") + time.sleep(5) # Change state to "TESTS_CHECKING_ENV" psdbObj.change_station_state(globalVar.station, StationStates.TESTS_CHECKING_ENV.name) - # Look for a sd or pendrive - # TODO - #if not disk_exists("/dev/mmcblk0") and not : - # abort - #print("error: cannot find a barcode scanner.") - #loggerObj.getlogger().info("station error: #cannot find a sd card or memory usb to load the test environment.#") - #exit(0) - # Look for a barcode scanner qr = QRReader() if qr.openQR(): -- cgit v1.1 From b58a64c6993ab60f18a1a0ca8c90f167c7e6656b Mon Sep 17 00:00:00 2001 From: Hector Fernandez Date: Thu, 2 Jul 2020 14:45:48 +0200 Subject: Added support for IGEP0020 and IGEP0030. Fixed problem in Audio test. --- test-cli/test/helpers/int_registers.py | 2 +- test-cli/test/tests/qaudio.py | 6 ++++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/test-cli/test/helpers/int_registers.py b/test-cli/test/helpers/int_registers.py index 9de503b..d081853 100644 --- a/test-cli/test/helpers/int_registers.py +++ b/test-cli/test/helpers/int_registers.py @@ -32,7 +32,7 @@ def get_die_id(modelid): elif modelid.find("IGEP0034") == 0 or modelid.find("SOPA0000") == 0: # registers: mac_id0_lo, mac_id0_hi, mac_id1_lo, mac_id1_hi registers = [0x44e10630, 0x44e10634, 0x44e10638, 0x44e1063C] - elif modelid.find("OMAP3") == 0: + elif modelid.find("OMAP3") == 0 or modelid.find("IGEP0020") == 0 or modelid.find("IGEP0030") == 0: registers = [0x4830A224, 0x4830A220, 0x4830A21C, 0x4830A218] elif modelid.find("OMAP5") == 0: registers = [0x4A002210, 0x4A00220C, 0x4A002208, 0x4A002200] diff --git a/test-cli/test/tests/qaudio.py b/test-cli/test/tests/qaudio.py index 5baffe4..ef0cf53 100644 --- a/test-cli/test/tests/qaudio.py +++ b/test-cli/test/tests/qaudio.py @@ -2,6 +2,7 @@ import unittest import sh import wave import contextlib +import os def calc_audio_duration(fname): @@ -25,10 +26,11 @@ class Qaudio(unittest.TestCase): self.__resultlist = [] def execute(self): + test_abspath = os.path.dirname(os.path.abspath(__file__)) + "/../" # analize audio file - recordtime = calc_audio_duration("test/files/dtmf-13579.wav") + 0.15 + recordtime = calc_audio_duration(os.path.join(test_abspath, "files/dtmf-13579.wav")) + 0.15 # play and record - p1 = sh.aplay("test/files/dtmf-13579.wav", _bg=True) + p1 = sh.aplay(os.path.join(test_abspath, "files/dtmf-13579.wav"), _bg=True) p2 = sh.arecord("-r", 8000, "-d", recordtime, "/tmp/station/recorded.wav", _bg=True) p1.wait() p2.wait() -- cgit v1.1 From 5dcd213c28451ac210703dc5bf9bf538671a0682 Mon Sep 17 00:00:00 2001 From: Hector Fernandez Date: Fri, 3 Jul 2020 13:55:45 +0200 Subject: Created new USB LOOP test. Moved all the test files to /var/lib/hwtest-files/ folder. --- test-cli/test/files/dtmf-13579.wav | Bin 17722 -> 0 bytes test-cli/test/files/usbtest/usbdatatest.bin | Bin 33554431 -> 0 bytes test-cli/test/files/usbtest/usbdatatest.md5 | 1 - test-cli/test/tests/qaudio.py | 6 +-- test-cli/test/tests/qusb.py | 64 +++++++++++++----------- test-cli/test/tests/qusbdual.py | 74 ++++++++++++++++++++++++++++ test-cli/test_main.py | 3 ++ 7 files changed, 115 insertions(+), 33 deletions(-) delete mode 100644 test-cli/test/files/dtmf-13579.wav delete mode 100644 test-cli/test/files/usbtest/usbdatatest.bin delete mode 100644 test-cli/test/files/usbtest/usbdatatest.md5 create mode 100644 test-cli/test/tests/qusbdual.py diff --git a/test-cli/test/files/dtmf-13579.wav b/test-cli/test/files/dtmf-13579.wav deleted file mode 100644 index c5b416a..0000000 Binary files a/test-cli/test/files/dtmf-13579.wav and /dev/null differ diff --git a/test-cli/test/files/usbtest/usbdatatest.bin b/test-cli/test/files/usbtest/usbdatatest.bin deleted file mode 100644 index 423eec3..0000000 Binary files a/test-cli/test/files/usbtest/usbdatatest.bin and /dev/null differ diff --git a/test-cli/test/files/usbtest/usbdatatest.md5 b/test-cli/test/files/usbtest/usbdatatest.md5 deleted file mode 100644 index 4358fb7..0000000 --- a/test-cli/test/files/usbtest/usbdatatest.md5 +++ /dev/null @@ -1 +0,0 @@ -6ef9717ac968b592803d0026f662a85e usbdatatest.bin diff --git a/test-cli/test/tests/qaudio.py b/test-cli/test/tests/qaudio.py index ef0cf53..acad4a9 100644 --- a/test-cli/test/tests/qaudio.py +++ b/test-cli/test/tests/qaudio.py @@ -2,7 +2,6 @@ import unittest import sh import wave import contextlib -import os def calc_audio_duration(fname): @@ -26,11 +25,10 @@ class Qaudio(unittest.TestCase): self.__resultlist = [] def execute(self): - test_abspath = os.path.dirname(os.path.abspath(__file__)) + "/../" # analize audio file - recordtime = calc_audio_duration(os.path.join(test_abspath, "files/dtmf-13579.wav")) + 0.15 + recordtime = calc_audio_duration("/var/lib/hwtest-files/dtmf-13579.wav") + 0.15 # play and record - p1 = sh.aplay(os.path.join(test_abspath, "files/dtmf-13579.wav"), _bg=True) + p1 = sh.aplay("/var/lib/hwtest-files/dtmf-13579.wav", _bg=True) p2 = sh.arecord("-r", 8000, "-d", recordtime, "/tmp/station/recorded.wav", _bg=True) p1.wait() p2.wait() diff --git a/test-cli/test/tests/qusb.py b/test-cli/test/tests/qusb.py index 9b0cad3..df1248d 100644 --- a/test-cli/test/tests/qusb.py +++ b/test-cli/test/tests/qusb.py @@ -1,9 +1,6 @@ import sh import unittest -import re -import os from test.helpers.usb import USBDevices -from test.helpers.changedir import changedir class Qusb(unittest.TestCase): @@ -14,6 +11,10 @@ class Qusb(unittest.TestCase): self.params = varlist super(Qusb, self).__init__(testfunc) self._testMethodDoc = testname + if "repetitions" in varlist: + self.__repetitions = varlist["repetitions"] + else: + raise Exception('repetitions param inside Qusb must be defined') self.__resultlist = [] def execute(self): @@ -23,43 +24,50 @@ class Qusb(unittest.TestCase): device = dev_obj.getMassStorage()['disk'] + "1" else: self.fail("failed: No USB memory found.") - - # create a new folder where the pendrive is going to be mounted - sh.mkdir("-p", "/mnt/pendrive") # check if the device is mounted, and umount it try: p = sh.findmnt("-n", device) if p.exit_code == 0: sh.umount(device) - except sh.ErrorReturnCode_1: + except sh.ErrorReturnCode as e: # error = 1 means "no found" pass # mount the device + sh.mkdir("-p", "/mnt/pendrive") p = sh.mount(device, "/mnt/pendrive") - if p.exit_code == 0: + if p.exit_code != 0: + self.fail("failed: Unable to mount the USB memory device.") + # execute test + for i in range(self.__repetitions): # copy files - test_abspath = os.path.dirname(os.path.abspath(__file__)) + "/../" - p = sh.cp(os.path.join(test_abspath, "files/usbtest/usbdatatest.bin"), - os.path.join(test_abspath, "files/usbtest/usbdatatest.md5"), - "/mnt/pendrive") - if p.exit_code == 0: - # check - with changedir("/mnt/pendrive/"): - p = sh.md5sum("-c", "usbdatatest.md5") - q = re.search("OK", p.stdout.decode('ascii')) - # delete files - sh.rm("-f", "/mnt/pendrive/usbdatatest.bin", "/mnt/pendrive/usbdatatest.md5") - # umount - sh.umount("/mnt/pendrive") - # check result - if q is None: - self.fail("failed: Wrong md5 result.") - else: - # umount + try: + p = sh.cp("/var/lib/hwtest-files/usbdatatest.bin", + "/var/lib/hwtest-files/usbdatatest.bin.md5", + "/mnt/pendrive") + except sh.ErrorReturnCode as e: sh.umount("/mnt/pendrive") + sh.rmdir("/mnt/pendrive") self.fail("failed: Unable to copy files to the USB memory device.") - else: - self.fail("failed: Unable to mount the USB memory device.") + # check MD5 + try: + p = sh.md5sum("/mnt/pendrive/usbdatatest.bin") + except sh.ErrorReturnCode as e: + sh.umount("/mnt/pendrive") + sh.rmdir("/mnt/pendrive") + self.fail("failed: Unable to calculate MD5 of the copied file.") + newmd5 = p.stdout.decode().split(" ")[0] + with open('/mnt/pendrive/usbdatatest.bin.md5', 'r') as outfile: + oldmd5 = outfile.read() + outfile.close() + if newmd5 != oldmd5: + sh.umount("/mnt/pendrive") + sh.rmdir("/mnt/pendrive") + self.fail("failed: MD5 check failed.") + # delete copied files + sh.rm("-f", "/mnt/pendrive/usbdatatest.bin", "/mnt/pendrive/usbdatatest.bin.md5") + # Finish + sh.umount("/mnt/pendrive") + sh.rmdir("/mnt/pendrive") def getresults(self): return self.__resultlist diff --git a/test-cli/test/tests/qusbdual.py b/test-cli/test/tests/qusbdual.py new file mode 100644 index 0000000..bb295b8 --- /dev/null +++ b/test-cli/test/tests/qusbdual.py @@ -0,0 +1,74 @@ +import sh +import unittest +import os.path + + +class Qusbdual(unittest.TestCase): + params = None + __resultlist = None # resultlist is a python list of python dictionaries + + def __init__(self, testname, testfunc, varlist): + self.params = varlist + super(Qusbdual, self).__init__(testfunc) + self._testMethodDoc = testname + if "repetitions" in varlist: + self.__repetitions = varlist["repetitions"] + else: + raise Exception('repetitions param inside Qusbdual must be defined') + self.__resultlist = [] + + def execute(self): + # check if file-as-filesystem exists + if not os.path.isfile('/var/lib/hwtest-files/mass-otg-test.img'): + self.fail("failed: Unable to find file-as-filesystem image.") + # generate mass storage gadget + p = sh.modprobe("g_mass_storage", "file=/var/lib/hwtest-files/mass-otg-test.img") + if p.exit_code != 0: + self.fail("failed: Unable to create a mass storage gadget.") + # find the mass storage device + try: + p = sh.grep(sh.lsblk("-So", "NAME,VENDOR"), "Linux") + except sh.ErrorReturnCode as e: + self.fail("failed: could not find any mass storage gadget") + device = p.stdout.decode().split(" ")[0] + # mount the mass storage gadget + sh.mkdir("-p", "/tmp/station/hdd_gadget") + p = sh.mount("-o", "ro", "/dev/" + device, "/tmp/station/hdd_gadget") + if p.exit_code != 0: + self.fail("failed: Unable to mount the mass storage gadget.") + # execute test + for i in range(self.__repetitions): + # copy files + try: + p = sh.cp("/tmp/station/hdd_gadget/usb-test.bin", "/tmp/station/hdd_gadget/usb-test.bin.md5", + "/tmp/stationnfs") + except sh.ErrorReturnCode as e: + sh.umount("/tmp/station/hdd_gadget") + sh.rmdir("/tmp/station/hdd_gadget") + self.fail("failed: Unable to copy files through USB.") + # Check md5 + try: + p = sh.md5sum("/tmp/stationnfs/usb-test.bin") + except sh.ErrorReturnCode as e: + sh.umount("/tmp/station/hdd_gadget") + sh.rmdir("/tmp/station/hdd_gadget") + self.fail("failed: Unable to calculate MD5 of the copied file.") + newmd5 = p.stdout.decode().split(" ")[0] + with open('/tmp/station/hdd_gadget/usb-test.bin.md5', 'r') as outfile: + oldmd5 = outfile.read() + outfile.close() + if newmd5 != oldmd5: + sh.umount("/tmp/station/hdd_gadget") + sh.rmdir("/tmp/station/hdd_gadget") + self.fail("failed: MD5 check failed.") + # delete copied files + sh.rm("-f", "/tmp/stationnfs/usb-test.bin", "/tmp/stationnfs/usb-test.bin.md5") + # Finish + sh.umount("/tmp/station/hdd_gadget") + sh.rmdir("/tmp/station/hdd_gadget") + + def getresults(self): + return self.__resultlist + + def gettextresult(self): + return "" diff --git a/test-cli/test_main.py b/test-cli/test_main.py index b32ad26..36d5fb1 100644 --- a/test-cli/test_main.py +++ b/test-cli/test_main.py @@ -86,6 +86,9 @@ def add_test(suite, testdefname, paramlist): elif testdefname == "AUDIO": suite.addTest(Qaudio(testdefname, "execute", paramlist)) return 0 + elif testdefname == "USBDUAL": + suite.addTest(Qaudio(testdefname, "execute", paramlist)) + return 0 elif testdefname == "DMESG": tests_manually_executed.append(Qdmesg(testdefname, "execute", paramlist)) return 0 -- cgit v1.1 From 0e8e3ecd4b9be71c41008b95950d089819622dff Mon Sep 17 00:00:00 2001 From: Hector Fernandez Date: Mon, 6 Jul 2020 14:08:27 +0200 Subject: Corrected some errors in USB and USBDUAL tests. --- test-cli/.idea/workspace.xml | 54 +++++++++++++-------------------------- test-cli/test/helpers/qrreader.py | 40 ++++++++++++++--------------- test-cli/test/tests/qdmesg.py | 3 +-- test-cli/test/tests/qusb.py | 50 ++++++++++++++++++++++++------------ test-cli/test/tests/qusbdual.py | 7 ++++- test-cli/test_main.py | 6 ++--- 6 files changed, 81 insertions(+), 79 deletions(-) diff --git a/test-cli/.idea/workspace.xml b/test-cli/.idea/workspace.xml index 887bf9b..edde041 100644 --- a/test-cli/.idea/workspace.xml +++ b/test-cli/.idea/workspace.xml @@ -2,39 +2,11 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + + - + - + + + + - + - - - + + + @@ -116,6 +91,9 @@ + + @@ -125,6 +103,10 @@ + + + + diff --git a/test-cli/test/helpers/qrreader.py b/test-cli/test/helpers/qrreader.py index 908c9db..744db54 100644 --- a/test-cli/test/helpers/qrreader.py +++ b/test-cli/test/helpers/qrreader.py @@ -13,18 +13,17 @@ qrdevice_list = [ "SM SM-2D PRODUCT HID KBW" ] - -qrkey_list = { 'KEY_0':'0', - 'KEY_1':'1', - 'KEY_2':'2', - 'KEY_3':'3', - 'KEY_4':'4', - 'KEY_5':'5', - 'KEY_6':'6', - 'KEY_7':'7', - 'KEY_8':'8', - 'KEY_9':'9' - } +qrkey_list = {'KEY_0': '0', + 'KEY_1': '1', + 'KEY_2': '2', + 'KEY_3': '3', + 'KEY_4': '4', + 'KEY_5': '5', + 'KEY_6': '6', + 'KEY_7': '7', + 'KEY_8': '8', + 'KEY_9': '9' + } class QRReader: @@ -41,13 +40,13 @@ class QRReader: for device in devices: if device.name in qrdevice_list: self.__myReader['NAME'] = device.name - #print(self.__myReader['NAME']) + # print(self.__myReader['NAME']) self.__myReader['PATH'] = device.path - #print(self.__myReader['PATH']) + # print(self.__myReader['PATH']) self.__myReader['PHYS'] = device.phys - #print(self.__myReader['PHYS']) + # print(self.__myReader['PHYS']) - def IsQR (self): + def IsQR(self): return 'NAME' in self.__myReader def getQRNumber(self): @@ -84,7 +83,7 @@ class QRReader: return True return False - def wait_event (self, timeout): + def wait_event(self, timeout): selector.register(self.__dev, selectors.EVENT_READ) while True: events = selector.select(timeout); @@ -114,8 +113,7 @@ class QRReader: return r return False - -#qr = QRReader() -#if qr.openQR(): +# qr = QRReader() +# if qr.openQR(): # print(qr.readQRasync(2)) -#qr.closeQR() +# qr.closeQR() diff --git a/test-cli/test/tests/qdmesg.py b/test-cli/test/tests/qdmesg.py index 45dd4eb..8117deb 100644 --- a/test-cli/test/tests/qdmesg.py +++ b/test-cli/test/tests/qdmesg.py @@ -2,6 +2,7 @@ import sh import os.path from os import path + class Qdmesg: params = None __resultlist = None # resultlist is a python list of python dictionaries @@ -17,11 +18,9 @@ class Qdmesg: self.pgObj.run_test(self.params["testidctl"], self.params["testid"]) # delete previous file if path.exists("/tmp/station/dmesg.txt"): - print("dmesg remove") os.remove("/tmp/station/dmesg.txt") # generate file p = sh.dmesg("--color=never", _out="/tmp/station/dmesg.txt") - print("Exit code: {}".format(p.exit_code)) if p.exit_code == 0: # save result # with open('/tmp/station/dmesg.txt', 'w') as outfile: diff --git a/test-cli/test/tests/qusb.py b/test-cli/test/tests/qusb.py index df1248d..b5d152e 100644 --- a/test-cli/test/tests/qusb.py +++ b/test-cli/test/tests/qusb.py @@ -1,6 +1,7 @@ import sh import unittest from test.helpers.usb import USBDevices +import os.path class Qusb(unittest.TestCase): @@ -33,41 +34,58 @@ class Qusb(unittest.TestCase): # error = 1 means "no found" pass # mount the device - sh.mkdir("-p", "/mnt/pendrive") - p = sh.mount(device, "/mnt/pendrive") + sh.mkdir("-p", "/tmp/station/pendrive") + p = sh.mount(device, "/tmp/station/pendrive") if p.exit_code != 0: self.fail("failed: Unable to mount the USB memory device.") # execute test - for i in range(self.__repetitions): + for i in range(int(self.__repetitions)): # copy files try: p = sh.cp("/var/lib/hwtest-files/usbdatatest.bin", "/var/lib/hwtest-files/usbdatatest.bin.md5", - "/mnt/pendrive") + "/tmp/station/pendrive") except sh.ErrorReturnCode as e: - sh.umount("/mnt/pendrive") - sh.rmdir("/mnt/pendrive") + try: + sh.umount("/tmp/station/pendrive") + except sh.ErrorReturnCode: + pass + sh.rmdir("/tmp/station/pendrive") self.fail("failed: Unable to copy files to the USB memory device.") + # check if the device is still mounted + if not os.path.ismount("/tmp/station/pendrive"): + sh.rm("/tmp/station/pendrive/*") + sh.rmdir("/tmp/station/pendrive") + self.fail("failed: USB device unmounted during/after copying files.") # check MD5 try: - p = sh.md5sum("/mnt/pendrive/usbdatatest.bin") + p = sh.md5sum("/tmp/station/pendrive/usbdatatest.bin") except sh.ErrorReturnCode as e: - sh.umount("/mnt/pendrive") - sh.rmdir("/mnt/pendrive") + try: + sh.umount("/tmp/station/pendrive") + except sh.ErrorReturnCode: + pass + sh.rmdir("/tmp/station/pendrive") self.fail("failed: Unable to calculate MD5 of the copied file.") newmd5 = p.stdout.decode().split(" ")[0] - with open('/mnt/pendrive/usbdatatest.bin.md5', 'r') as outfile: - oldmd5 = outfile.read() + with open('/tmp/station/pendrive/usbdatatest.bin.md5', 'r') as outfile: + oldmd5 = outfile.read().rstrip("\n") outfile.close() if newmd5 != oldmd5: - sh.umount("/mnt/pendrive") - sh.rmdir("/mnt/pendrive") + try: + sh.umount("/tmp/station/pendrive") + except sh.ErrorReturnCode: + pass + sh.rmdir("/tmp/station/pendrive") self.fail("failed: MD5 check failed.") # delete copied files - sh.rm("-f", "/mnt/pendrive/usbdatatest.bin", "/mnt/pendrive/usbdatatest.bin.md5") + sh.rm("-f", "/tmp/station/pendrive/usbdatatest.bin", "/tmp/station/pendrive/usbdatatest.bin.md5") # Finish - sh.umount("/mnt/pendrive") - sh.rmdir("/mnt/pendrive") + try: + sh.umount("/tmp/station/pendrive") + except sh.ErrorReturnCode: + pass + sh.rmdir("/tmp/station/pendrive") def getresults(self): return self.__resultlist diff --git a/test-cli/test/tests/qusbdual.py b/test-cli/test/tests/qusbdual.py index bb295b8..ad95b84 100644 --- a/test-cli/test/tests/qusbdual.py +++ b/test-cli/test/tests/qusbdual.py @@ -37,7 +37,7 @@ class Qusbdual(unittest.TestCase): if p.exit_code != 0: self.fail("failed: Unable to mount the mass storage gadget.") # execute test - for i in range(self.__repetitions): + for i in range(int(self.__repetitions)): # copy files try: p = sh.cp("/tmp/station/hdd_gadget/usb-test.bin", "/tmp/station/hdd_gadget/usb-test.bin.md5", @@ -46,6 +46,11 @@ class Qusbdual(unittest.TestCase): sh.umount("/tmp/station/hdd_gadget") sh.rmdir("/tmp/station/hdd_gadget") self.fail("failed: Unable to copy files through USB.") + # check if the device is still mounted + if not os.path.ismount("/tmp/station/hdd_gadget"): + sh.rm("/tmp/station/hdd_gadget/*") + sh.rmdir("/tmp/station/hdd_gadget") + self.fail("failed: USB device unmounted during/after copying files.") # Check md5 try: p = sh.md5sum("/tmp/stationnfs/usb-test.bin") diff --git a/test-cli/test_main.py b/test-cli/test_main.py index 36d5fb1..a8eff91 100644 --- a/test-cli/test_main.py +++ b/test-cli/test_main.py @@ -174,7 +174,7 @@ def main(): runner1 = SimpleTestRunner(psdbObj) testresult = runner1.run(suite1) - #Execute manually tests + # Execute manually tests for test in tests_manually_executed: test.execute() @@ -232,7 +232,7 @@ def main(): # get barcode using the scanner, only if autotest is disabled autotest = psdbObj.get_setup_variable("AUTOTEST_" + globalVar.station) - if autotest == "True": + if int(autotest) == 1: psdbObj.set_factorycode(globalVar.g_uuid, "1234567890") # fake factory code else: qrreceived = False @@ -268,7 +268,7 @@ def main(): # reset board if AUTOTEST is enabled autotest = psdbObj.get_setup_variable("AUTOTEST_" + globalVar.station) - if autotest == "True": + if int(autotest) == 1: os.system('reboot') -- cgit v1.1 From cd3c8dd78e3bcdd442967583e3814db3701871c0 Mon Sep 17 00:00:00 2001 From: Hector Fernandez Date: Mon, 6 Jul 2020 16:01:08 +0200 Subject: Solved minor errors with audio and usbloop tests. --- test-cli/.idea/workspace.xml | 5 +---- test-cli/test/tests/qaudio.py | 7 +++++-- test-cli/test/tests/qusbdual.py | 2 +- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/test-cli/.idea/workspace.xml b/test-cli/.idea/workspace.xml index edde041..4e10f5c 100644 --- a/test-cli/.idea/workspace.xml +++ b/test-cli/.idea/workspace.xml @@ -3,11 +3,8 @@ - - - + - + + + + diff --git a/test-cli/test/tests/qamper.py b/test-cli/test/tests/qamper.py index b4d57e3..58054c9 100644 --- a/test-cli/test/tests/qamper.py +++ b/test-cli/test/tests/qamper.py @@ -35,13 +35,13 @@ class Qamper(unittest.TestCase): # get current value (in Amperes) self.current = amp.getCurrent() # save result in a file - with open('/tmp/station/amper.txt', 'w') as outfile: + with open('/mnt/station_ramdisk/amper.txt', 'w') as outfile: n = outfile.write("Current: {} A".format(self.current)) outfile.close() self.__resultlist.append( { "description": "Amperimeter values", - "filepath": "/tmp/station/amper.txt", + "filepath": "/mnt/station_ramdisk/amper.txt", "mimetype": "text/plain" } ) diff --git a/test-cli/test/tests/qaudio.py b/test-cli/test/tests/qaudio.py index f7ae33c..bfbb051 100644 --- a/test-cli/test/tests/qaudio.py +++ b/test-cli/test/tests/qaudio.py @@ -2,7 +2,6 @@ import unittest import sh import wave import contextlib -import time def calc_audio_duration(fname): @@ -28,15 +27,15 @@ class Qaudio(unittest.TestCase): def execute(self): # analize audio file recordtime = calc_audio_duration("/var/lib/hwtest-files/dtmf-13579.wav") + 0.15 - # wait time before playing - time.sleep(1) + # previous play to estabilise audio lines + sh.aplay("/var/lib/hwtest-files/dtmf-13579.wav") # play and record p1 = sh.aplay("/var/lib/hwtest-files/dtmf-13579.wav", _bg=True) - p2 = sh.arecord("-r", 8000, "-d", recordtime, "/tmp/station/recorded.wav", _bg=True) + p2 = sh.arecord("-r", 8000, "-d", recordtime, "/mnt/station_ramdisk/recorded.wav", _bg=True) p1.wait() p2.wait() if p1.exit_code == 0 and p2.exit_code == 0: - p = sh.multimon("-t", "wav", "-a", "DTMF", "/tmp/station/recorded.wav", "-q") + p = sh.multimon("-t", "wav", "-a", "DTMF", "/mnt/station_ramdisk/recorded.wav", "-q") if p.exit_code == 0: lines = p.stdout.decode('ascii').splitlines() self.__dtmf_secuence_result = [] @@ -49,6 +48,7 @@ class Qaudio(unittest.TestCase): if a != b: self.fail("failed: sent and received DTMF sequence don't match.") else: + print(self.__dtmf_secuence_result) self.fail("failed: received DTMF sequence is shorter than expected.") else: self.fail("failed: unable to use multimon command.") diff --git a/test-cli/test/tests/qdmesg.py b/test-cli/test/tests/qdmesg.py index 8117deb..b35f1ff 100644 --- a/test-cli/test/tests/qdmesg.py +++ b/test-cli/test/tests/qdmesg.py @@ -17,19 +17,14 @@ class Qdmesg: print("running dmesg test") self.pgObj.run_test(self.params["testidctl"], self.params["testid"]) # delete previous file - if path.exists("/tmp/station/dmesg.txt"): - os.remove("/tmp/station/dmesg.txt") + if path.exists("/mnt/station_ramdisk/dmesg.txt"): + os.remove("/mnt/station_ramdisk/dmesg.txt") # generate file - p = sh.dmesg("--color=never", _out="/tmp/station/dmesg.txt") + p = sh.dmesg("--color=never", _out="/mnt/station_ramdisk/dmesg.txt") if p.exit_code == 0: - # save result - # with open('/tmp/station/dmesg.txt', 'w') as outfile: - # n = outfile.write(p.stdout.decode('ascii')) - # outfile.close() - # save dmesg result in DB self.pgObj.upload_result_file(self.params["testidctl"], self.params["testid"], "dmesg output", - "/tmp/station/dmesg.txt", "text/plain") + "/mnt/station_ramdisk/dmesg.txt", "text/plain") self.pgObj.finish_test(self.params["testidctl"], self.params["testid"], "TEST_COMPLETE", "") print("success dmesg test") else: diff --git a/test-cli/test/tests/qethernet.py b/test-cli/test/tests/qethernet.py index 3b2f197..d38de34 100644 --- a/test-cli/test/tests/qethernet.py +++ b/test-cli/test/tests/qethernet.py @@ -52,13 +52,13 @@ class Qethernet(unittest.TestCase): data = json.loads(p.stdout.decode('ascii')) self.__bwreal = float(data['end']['sum_received']['bits_per_second'])/1024/1024 # save result file - with open('/tmp/station/ethernet-iperf3.json', 'w') as outfile: + with open('/mnt/station_ramdisk/ethernet-iperf3.json', 'w') as outfile: json.dump(data, outfile, indent=4) outfile.close() self.__resultlist.append( { "description": "iperf3 output", - "filepath": "/tmp/station/ethernet-iperf3.json", + "filepath": "/mnt/station_ramdisk/ethernet-iperf3.json", "mimetype": "application/json" } ) diff --git a/test-cli/test/tests/qnand.py b/test-cli/test/tests/qnand.py index 8c78e2b..988ab60 100644 --- a/test-cli/test/tests/qnand.py +++ b/test-cli/test/tests/qnand.py @@ -22,13 +22,13 @@ class Qnand(unittest.TestCase): try: p = sh.nandtest("-m", self.__device) # save result - with open('/tmp/station/nand-nandtest.txt', 'w') as outfile: + with open('/mnt/station_ramdisk/nand-nandtest.txt', 'w') as outfile: n = outfile.write(p.stdout.decode('ascii')) outfile.close() self.__resultlist.append( { "description": "nandtest output", - "filepath": "/tmp/station/nand-nandtest.txt", + "filepath": "/mnt/station_ramdisk/nand-nandtest.txt", "mimetype": "text/plain" } ) diff --git a/test-cli/test/tests/qusb.py b/test-cli/test/tests/qusb.py index b5d152e..6302012 100644 --- a/test-cli/test/tests/qusb.py +++ b/test-cli/test/tests/qusb.py @@ -34,8 +34,8 @@ class Qusb(unittest.TestCase): # error = 1 means "no found" pass # mount the device - sh.mkdir("-p", "/tmp/station/pendrive") - p = sh.mount(device, "/tmp/station/pendrive") + sh.mkdir("-p", "/mnt/station_ramdisk/pendrive") + p = sh.mount(device, "/mnt/station_ramdisk/pendrive") if p.exit_code != 0: self.fail("failed: Unable to mount the USB memory device.") # execute test @@ -44,48 +44,48 @@ class Qusb(unittest.TestCase): try: p = sh.cp("/var/lib/hwtest-files/usbdatatest.bin", "/var/lib/hwtest-files/usbdatatest.bin.md5", - "/tmp/station/pendrive") + "/mnt/station_ramdisk/pendrive") except sh.ErrorReturnCode as e: try: - sh.umount("/tmp/station/pendrive") + sh.umount("/mnt/station_ramdisk/pendrive") except sh.ErrorReturnCode: pass - sh.rmdir("/tmp/station/pendrive") + sh.rmdir("/mnt/station_ramdisk/pendrive") self.fail("failed: Unable to copy files to the USB memory device.") # check if the device is still mounted - if not os.path.ismount("/tmp/station/pendrive"): - sh.rm("/tmp/station/pendrive/*") - sh.rmdir("/tmp/station/pendrive") + if not os.path.ismount("/mnt/station_ramdisk/pendrive"): + sh.rm("/mnt/station_ramdisk/pendrive/*") + sh.rmdir("/mnt/station_ramdisk/pendrive") self.fail("failed: USB device unmounted during/after copying files.") # check MD5 try: - p = sh.md5sum("/tmp/station/pendrive/usbdatatest.bin") + p = sh.md5sum("/mnt/station_ramdisk/pendrive/usbdatatest.bin") except sh.ErrorReturnCode as e: try: - sh.umount("/tmp/station/pendrive") + sh.umount("/mnt/station_ramdisk/pendrive") except sh.ErrorReturnCode: pass - sh.rmdir("/tmp/station/pendrive") + sh.rmdir("/mnt/station_ramdisk/pendrive") self.fail("failed: Unable to calculate MD5 of the copied file.") newmd5 = p.stdout.decode().split(" ")[0] - with open('/tmp/station/pendrive/usbdatatest.bin.md5', 'r') as outfile: + with open('/mnt/station_ramdisk/pendrive/usbdatatest.bin.md5', 'r') as outfile: oldmd5 = outfile.read().rstrip("\n") outfile.close() if newmd5 != oldmd5: try: - sh.umount("/tmp/station/pendrive") + sh.umount("/mnt/station_ramdisk/pendrive") except sh.ErrorReturnCode: pass - sh.rmdir("/tmp/station/pendrive") + sh.rmdir("/mnt/station_ramdisk/pendrive") self.fail("failed: MD5 check failed.") # delete copied files - sh.rm("-f", "/tmp/station/pendrive/usbdatatest.bin", "/tmp/station/pendrive/usbdatatest.bin.md5") + sh.rm("-f", "/mnt/station_ramdisk/pendrive/usbdatatest.bin", "/mnt/station_ramdisk/pendrive/usbdatatest.bin.md5") # Finish try: - sh.umount("/tmp/station/pendrive") + sh.umount("/mnt/station_ramdisk/pendrive") except sh.ErrorReturnCode: pass - sh.rmdir("/tmp/station/pendrive") + sh.rmdir("/mnt/station_ramdisk/pendrive") def getresults(self): return self.__resultlist diff --git a/test-cli/test/tests/qusbdual.py b/test-cli/test/tests/qusbdual.py index 842d514..054d7f4 100644 --- a/test-cli/test/tests/qusbdual.py +++ b/test-cli/test/tests/qusbdual.py @@ -32,45 +32,45 @@ class Qusbdual(unittest.TestCase): self.fail("failed: could not find any mass storage gadget") device = p.stdout.decode().split(" ")[0] # mount the mass storage gadget - sh.mkdir("-p", "/tmp/station/hdd_gadget") - p = sh.mount("-o", "ro", "/dev/" + device, "/tmp/station/hdd_gadget") + sh.mkdir("-p", "/mnt/station_ramdisk/hdd_gadget") + p = sh.mount("-o", "ro", "/dev/" + device, "/mnt/station_ramdisk/hdd_gadget") if p.exit_code != 0: self.fail("failed: Unable to mount the mass storage gadget.") # execute test for i in range(int(self.__repetitions)): # copy files try: - p = sh.cp("/tmp/station/hdd_gadget/usb-test.bin", "/tmp/station/hdd_gadget/usb-test.bin.md5", + p = sh.cp("/mnt/station_ramdisk/hdd_gadget/usb-test.bin", "/mnt/station_ramdisk/hdd_gadget/usb-test.bin.md5", "/tmp/stationnfs") except sh.ErrorReturnCode as e: - sh.umount("/tmp/station/hdd_gadget") - sh.rmdir("/tmp/station/hdd_gadget") + sh.umount("/mnt/station_ramdisk/hdd_gadget") + sh.rmdir("/mnt/station_ramdisk/hdd_gadget") self.fail("failed: Unable to copy files through USB.") # check if the device is still mounted - if not os.path.ismount("/tmp/station/hdd_gadget"): - sh.rm("/tmp/station/hdd_gadget/*") - sh.rmdir("/tmp/station/hdd_gadget") + if not os.path.ismount("/mnt/station_ramdisk/hdd_gadget"): + sh.rm("/mnt/station_ramdisk/hdd_gadget/*") + sh.rmdir("/mnt/station_ramdisk/hdd_gadget") self.fail("failed: USB device unmounted during/after copying files.") # Check md5 try: - p = sh.md5sum("/tmp/stationnfs/usb-test.bin") + p = sh.md5sum("/mnt/station_nfsdisk/usb-test.bin") except sh.ErrorReturnCode as e: - sh.umount("/tmp/station/hdd_gadget") - sh.rmdir("/tmp/station/hdd_gadget") + sh.umount("/mnt/station_ramdisk/hdd_gadget") + sh.rmdir("/mnt/station_ramdisk/hdd_gadget") self.fail("failed: Unable to calculate MD5 of the copied file.") newmd5 = p.stdout.decode().split(" ")[0] - with open('/tmp/station/hdd_gadget/usb-test.bin.md5', 'r') as outfile: + with open('/mnt/station_ramdisk/hdd_gadget/usb-test.bin.md5', 'r') as outfile: oldmd5 = outfile.read().rstrip("\n") outfile.close() if newmd5 != oldmd5: - sh.umount("/tmp/station/hdd_gadget") - sh.rmdir("/tmp/station/hdd_gadget") + sh.umount("/mnt/station_ramdisk/hdd_gadget") + sh.rmdir("/mnt/station_ramdisk/hdd_gadget") self.fail("failed: MD5 check failed.") # delete copied files - sh.rm("-f", "/tmp/stationnfs/usb-test.bin", "/tmp/stationnfs/usb-test.bin.md5") + sh.rm("-f", "/mnt/station_nfsdisk/usb-test.bin", "/mnt/station_nfsdisk/usb-test.bin.md5") # Finish - sh.umount("/tmp/station/hdd_gadget") - sh.rmdir("/tmp/station/hdd_gadget") + sh.umount("/mnt/station_ramdisk/hdd_gadget") + sh.rmdir("/mnt/station_ramdisk/hdd_gadget") def getresults(self): return self.__resultlist diff --git a/test-cli/test/tests/qwifi.py b/test-cli/test/tests/qwifi.py index eb1f0bf..4522057 100644 --- a/test-cli/test/tests/qwifi.py +++ b/test-cli/test/tests/qwifi.py @@ -51,7 +51,7 @@ class Qwifi(unittest.TestCase): # execute iperf command against the server try: p = sh.iperf3("-c", self.__serverip, "-n", self.__numbytestx, "-f", "m", "-p", self.__port, - "-J", _timeout=20) + "-J", _timeout=30) except sh.TimeoutException: self.fail("failed: iperf timeout reached") @@ -63,13 +63,13 @@ class Qwifi(unittest.TestCase): data = json.loads(p.stdout.decode('ascii')) self.__bwreal = float(data['end']['sum_received']['bits_per_second']) / 1024 / 1024 # save result file - with open('/tmp/station/wifi-iperf3.json', 'w') as outfile: + with open('/mnt/station_ramdisk/wifi-iperf3.json', 'w') as outfile: json.dump(data, outfile, indent=4) outfile.close() self.__resultlist.append( { "description": "iperf3 output", - "filepath": "/tmp/station/wifi-iperf3.json", + "filepath": "/mnt/station_ramdisk/wifi-iperf3.json", "mimetype": "application/json" } ) -- cgit v1.1 From 47827e40f71696e04ad805296dedc44a03451db3 Mon Sep 17 00:00:00 2001 From: Hector Fernandez Date: Mon, 6 Jul 2020 17:51:09 +0200 Subject: In case iperf3 server is busy, clients wait and retry. --- test-cli/.idea/workspace.xml | 5 ----- test-cli/test/tests/qaudio.py | 1 - test-cli/test/tests/qethernet.py | 19 +++++++++++++------ test-cli/test/tests/qwifi.py | 19 +++++++++++++------ 4 files changed, 26 insertions(+), 18 deletions(-) diff --git a/test-cli/.idea/workspace.xml b/test-cli/.idea/workspace.xml index 5d804c5..3d15030 100644 --- a/test-cli/.idea/workspace.xml +++ b/test-cli/.idea/workspace.xml @@ -3,13 +3,8 @@ - - - - -