diff options
author | Manel Caro <mcaro@iseebcn.com> | 2020-03-04 17:46:36 +0100 |
---|---|---|
committer | Manel Caro <mcaro@iseebcn.com> | 2020-03-04 17:46:36 +0100 |
commit | 09de774dcc1a5abc1c8f3a00fdb039aa3c522f52 (patch) | |
tree | c4bd3963d0df01d2e3a33732247388ed4651b186 | |
parent | b6932fbaf898724ae87c29f8965621610f377084 (diff) | |
download | board-09de774dcc1a5abc1c8f3a00fdb039aa3c522f52.zip board-09de774dcc1a5abc1c8f3a00fdb039aa3c522f52.tar.gz board-09de774dcc1a5abc1c8f3a00fdb039aa3c522f52.tar.bz2 |
SOPA Initial Commit
27 files changed, 394 insertions, 291 deletions
diff --git a/test-cli/setup.xml b/test-cli/setup.xml index 41295b5..3feffd0 100644 --- a/test-cli/setup.xml +++ b/test-cli/setup.xml @@ -2,8 +2,8 @@ <data> <setup> <test idline="1"/> <!-- Test line identify --> - <board model="IGEP0000" variant="TEST-TEST" station="1"/> - <db dbname="testsrv" type="PgSQLConnection" host="192.168.2.79" port="5432" user="admin" password="Idkfa2009" /> <!-- database setup --> + <board model="SOPA0000" variant="BW2Q-QXXX" station="1"/> + <db dbname="testsrv" type="PgSQLConnection" host="192.168.8.1" port="5432" user="admin" password="Idkfa2009" /> <!-- database setup --> </setup> </data> diff --git a/test-cli/test/helpers/cv_display_test.py b/test-cli/test/helpers/cv_display_test.py index b54a698..3a3004f 100644 --- a/test-cli/test/helpers/cv_display_test.py +++ b/test-cli/test/helpers/cv_display_test.py @@ -14,8 +14,7 @@ def pattern_detect(cam_device=0): # RETURN 0 only if the test is ok msg="0" # Capture the corresponding camera device [0,1] - #capture = cv2.VideoCapture(0) - capture = cv2.VideoCapture("/dev/v4l/by-id/usb-Creative_Technology_Ltd._Live__Cam_Sync_HD_VF0770-video-index0") + capture = cv2.VideoCapture(0) try: _, image = capture.read() except: @@ -64,12 +63,10 @@ def pattern_detect(cam_device=0): # After testing the gamma factor correction is computed with the green color # gamma = gamma_factor / blue+red into green part # gamma factor = 50-100 - #gamma = (100 / (avg_color_rawg[0] + avg_color_rawg[2] + 1)) - gamma=1 + gamma = (100 / (avg_color_rawg[0] + avg_color_rawg[2] + 1)) # Adjust the image acording to this gamma value adjusted = adjust_gamma(image, gamma=gamma) -# adjusted=image - cv2.imwrite( "/home/root/result_hdmi_img.jpg", adjusted); + # Calculate again the average color using the gamma adjusted image # Crop the gamma adjusted image for wach color section red_cal = adjusted[y1:y2, xr1:xr2] @@ -109,19 +106,14 @@ def pattern_detect(cam_device=0): mask_r = mask0 + mask1 # mask_r = cv2.inRange(hsv, lower_red, upper_red) # Perform a morphological open to expand -# kernel = np.ones((5, 5), np.uint8) -# closing_r = cv2.morphologyEx(mask_r, cv2.MORPH_OPEN, kernel) -# closing_b = cv2.morphologyEx(mask_b, cv2.MORPH_OPEN, kernel) -# closing_g = cv2.morphologyEx(mask_g, cv2.MORPH_OPEN, kernel) + kernel = np.ones((15, 15), np.uint8) + closing_r = cv2.morphologyEx(mask_r, cv2.MORPH_OPEN, kernel) + closing_b = cv2.morphologyEx(mask_b, cv2.MORPH_OPEN, kernel) + closing_g = cv2.morphologyEx(mask_g, cv2.MORPH_OPEN, kernel) # Count the number of pixels that are not of the corresponding color (black) -# count_r = cv2.countNonZero(closing_r) -# count_b = cv2.countNonZero(closing_b) -# count_g = cv2.countNonZero(closing_g) - #----------- - count_r = cv2.countNonZero(mask_r) - count_b = cv2.countNonZero(mask_b) - count_g = cv2.countNonZero(mask_g) - #------- + count_r = cv2.countNonZero(closing_r) + count_b = cv2.countNonZero(closing_b) + count_g = cv2.countNonZero(closing_g) if (count_r < 5): msg = "RED COUNT FAIL" return msg diff --git a/test-cli/test/helpers/get_dieid.py b/test-cli/test/helpers/get_dieid.py index b20f143..029ddb5 100644 --- a/test-cli/test/helpers/get_dieid.py +++ b/test-cli/test/helpers/get_dieid.py @@ -21,8 +21,6 @@ def read(addr): def getRegisters(model): if model.find("IGEP0046") == 0: registers = [0x021BC420, 0x021BC410] - elif model.find("IGEP0000") == 0: - registers = [0x021BC420, 0x021BC410] elif model.find("IGEP0034") == 0 or model.find("SOPA0000") == 0: registers = [0x44e10630, 0x44e10634, 0x44e10638, 0x44e1063C] elif model.find("OMAP3") == 0: diff --git a/test-cli/test/helpers/globalVariables.py b/test-cli/test/helpers/globalVariables.py index c4d8358..6b89f4d 100644 --- a/test-cli/test/helpers/globalVariables.py +++ b/test-cli/test/helpers/globalVariables.py @@ -6,5 +6,3 @@ def globalVar(): g_mid = "" outdata = "NONE" station = "" - fstatus = "" - gdisplay = None
\ No newline at end of file diff --git a/test-cli/test/helpers/iseelogger.py b/test-cli/test/helpers/iseelogger.py new file mode 100644 index 0000000..785a78c --- /dev/null +++ b/test-cli/test/helpers/iseelogger.py @@ -0,0 +1,39 @@ +import logging +import logging.handlers + + +class ISEE_Logger(object): + __logger = None + __logHandler = None + __formater = None + __logHandlerConsole = None + + def __init__(self, level=logging.INFO): + # Create syslog logger + self.__logger = logging.getLogger('ISEE_logger') + self.__logger.setLevel(level) + self.__logHandler = logging.handlers.SysLogHandler('/dev/log') + self.__logHandlerConsole = logging.StreamHandler() + self.__formater = logging.Formatter('Python: { "loggerName":"%(name)s", "timestamp":"%(asctime)s", "pathName":"%(pathname)s", "logRecordCreationTime":"%(created)f", "functionName":"%(funcName)s", "levelNo":"%(levelno)s", "lineNo":"%(lineno)d", "time":"%(msecs)d", "levelName":"%(levelname)s", "message":"%(message)s"}') + self.__logHandler.formatter = self.__formater + self.__logHandlerConsole.formatter = self.__formater + self.__logger.addHandler(self.__logHandler) + self.__logger.addHandler(self.__logHandlerConsole) + + def setLogLevel(self, level): + if level.upper() == "DEBUG": + nlevel = logging.DEBUG + elif level.upper() == "INFO": + nlevel = logging.INFO + elif level.upper() == "ERROR": + nlevel = logging.ERROR + elif level.upper() == "WARNING": + nlevel = logging.WARNING + else: + nlevel = logging.DEBUG + + self.__logger.setLevel(nlevel) + + def getlogger(self): + return self.__logger + diff --git a/test-cli/test/helpers/psqldb.py b/test-cli/test/helpers/psqldb.py index 26dd03d..1d0e422 100644 --- a/test-cli/test/helpers/psqldb.py +++ b/test-cli/test/helpers/psqldb.py @@ -7,15 +7,15 @@ class PgSQLConnection(object): __db_config = {'dbname': 'testsrv', 'host': '192.168.2.171', 'password': 'Idkfa2009', 'port': 5432, 'user': 'admin'} - def __init__ (self): -# self.__conection_object = None -# if connect_str is not None: -# self.__db_config = connect_str -# else: - self.__db_config = {'dbname': 'testsrv', 'host': '192.168.2.171', - 'password': 'Idkfa2009', 'port': 5432, 'user': 'admin'} - - def db_connect (self, connect_str): + def __init__ (self, connect_str = None): + self.__conection_object = None + if connect_str is not None: + self.__db_config = connect_str + else: + self.__db_config = {'dbname': 'testsrv', 'host': '192.168.2.171', + 'password': 'Idkfa2009', 'port': 5432, 'user': 'admin'} + + def db_connect (self, connect_str = None): result = False try: if connect_str == None: @@ -23,6 +23,7 @@ class PgSQLConnection(object): else: self.__db_config = connect_str; self.__conection_object = psycopg2.connect(**self.__db_config) + self.__conection_object.autocommit = True result = True except Exception as error: diff --git a/test-cli/test/helpers/setup_xml.py b/test-cli/test/helpers/setup_xml.py index 3fd9fd5..eb8d73c 100644 --- a/test-cli/test/helpers/setup_xml.py +++ b/test-cli/test/helpers/setup_xml.py @@ -37,4 +37,11 @@ class XMLSetup (object): def getMysqlConnectionStr (self): """aaaaa""" - pass
\ No newline at end of file + pass + + def gettagKey (self, xmltag, xmlkey): + """aaaaa""" + for element in self.__tree.iter(xmltag): + return element.attrib[xmlkey] + + return None
\ No newline at end of file diff --git a/test-cli/test/helpers/syscmd.py b/test-cli/test/helpers/syscmd.py index b579e39..a869bd7 100644 --- a/test-cli/test/helpers/syscmd.py +++ b/test-cli/test/helpers/syscmd.py @@ -10,15 +10,24 @@ class TestSysCommand(unittest.TestCase): __outdata = None __outtofile = False - def __init__(self, testname, testfunc, str_cmd, outtofile = False): + #varlist: str_cmd, outtofile + def __init__(self, testname, testfunc, varlist): """ init """ super(TestSysCommand, self).__init__(testfunc) - self.__str_cmd = str_cmd + if "str_cmd" in varlist: + self.__str_cmd = varlist["str_cmd"] + else: + raise Exception('str_cmd param inside TestSysCommand have been be defined') self.__testname = testname - self.__outtofile = outtofile + if "outtofile" in varlist: + self.__outtofile = varlist["outtofile"] + if self.__outtofile is True: + self.__outfilename = '/tmp/{}.txt'.format(testname) + else: + self.__outtofile = None + self.__outfilename = None self._testMethodDoc = testname - if self.__outtofile is True: - self.__outfilename = '/tmp/{}.txt'.format(testname) + def getName(self): return self.__testname diff --git a/test-cli/test/helpers/testsrv_db.py b/test-cli/test/helpers/testsrv_db.py index 94181f9..f08cb17 100644 --- a/test-cli/test/helpers/testsrv_db.py +++ b/test-cli/test/helpers/testsrv_db.py @@ -19,12 +19,12 @@ class TestSrv_Database(object): def __init__(self): pass - def open (self, filename): - '''Open database connection''' - self.__xml_setup = XMLSetup(filename) + def open (self, xmlObj): + self.__xml_setup = xmlObj; self.__sqlObject = PgSQLConnection() return self.__sqlObject.db_connect(self.__xml_setup.getdbConnectionStr()) + def create_board(self, processor_id, model_id, variant, bmac = None): '''create a new board''' if bmac is None: @@ -95,7 +95,20 @@ class TestSrv_Database(object): def getboard_comp_test_list(self, board_uuid): '''get the board test list''' - sql = "SELECT isee.gettestcompletelist('{}')".format(board_uuid) + sql = "SELECT * FROM isee.gettestcompletelist('{}')".format(board_uuid) + #print('>>>' + sql) + try: + res = self.__sqlObject.db_execute_query(sql) + #print(res) + return res; + except Exception as err: + r = find_between(str(err), '#', '#') + #print(r) + return None + + def getboard_test_variables(self, board_uuid, testdefid): + '''get the board test list''' + sql = "SELECT * FROM isee.getboardtestvariables('{}', '{}')".format(board_uuid, testdefid) #print('>>>' + sql) try: res = self.__sqlObject.db_execute_query(sql) @@ -159,18 +172,4 @@ class TestSrv_Database(object): except Exception as err: r = find_between(str(err), '#', '#') #print(r) - return None - - def getboard_eeprom(self, board_uuid): - '''get the board eeprom struct ''' - sql = "SELECT isee.getboard_eeprom('{}')".format(board_uuid) - #print('>>>' + sql) - try: - res = self.__sqlObject.db_execute_query(sql) - #print(res) - return res; - except Exception as err: - r = find_between(str(err), '#', '#') - #print(r) - return None -
\ No newline at end of file + return None
\ No newline at end of file diff --git a/test-cli/test/helpers/usb.sh b/test-cli/test/helpers/usb.sh new file mode 100755 index 0000000..c8e6924 --- /dev/null +++ b/test-cli/test/helpers/usb.sh @@ -0,0 +1,14 @@ +#!/bin/bash + +for sysdevpath in $(find /sys/bus/usb/devices/usb*/ -name dev); do + ( + syspath="${sysdevpath%/dev}" + devname="$(udevadm info -q name -p $syspath)" + [[ "$devname" == "bus/"* ]] && continue + eval "$(udevadm info -q property --export -p $syspath)" + [[ -z "$ID_SERIAL" ]] && continue + echo "/dev/$devname - $ID_SERIAL" + ) +done + +return 0 diff --git a/test-cli/test/runners/simple.py b/test-cli/test/runners/simple.py index 5b2da5a..3537d0e 100644 --- a/test-cli/test/runners/simple.py +++ b/test-cli/test/runners/simple.py @@ -22,9 +22,12 @@ class SimpleTestRunner: --------------------------------------------- """ - def __init__(self, stream=sys.stderr, verbosity=0): + __pgObj = None + + def __init__(self, pgObj, stream=sys.stderr, verbosity=0): self.stream = stream self.verbosity = verbosity + self.__pgObj = pgObj def writeUpdate(self, message): self.stream.write(message) @@ -33,7 +36,7 @@ class SimpleTestRunner: """ Run the given test case or Test Suite. """ - result = TextTestResult(self) + result = TextTestResult(self, self.__pgObj) test(result) result.testsRun # self.writeUpdate("---------------------------------------------\n") @@ -45,18 +48,22 @@ class TextTestResult(unittest.TestResult): FAIL = '\033[31mFAIL\033[0m\n' ERROR = '\033[31mERROR\033[0m\n' - def __init__(self, runner): + __pgObj = None + + def __init__(self, runner, pgObj): unittest.TestResult.__init__(self) self.runner = runner self.result = self.ERROR + self.__pgObj = pgObj def startTest(self, test): unittest.TestResult.startTest(self, test) self.runner.writeUpdate("%s : " % test.shortDescription()) # SEND TO DB THE UPDATE THAT WE RUN EACH TEST - psdb = TestSrv_Database() - psdb.open("setup.xml") - psdb.update_set_test_row(globalVar.station, globalVar.testid_ctl, globalVar.g_uuid, test.shortDescription(), "RUNNING") + self.__pgObj.update_set_test_row(globalVar.station, globalVar.testid_ctl, globalVar.g_uuid, test.shortDescription(), "RUNNING") + # psdb = TestSrv_Database() + # psdb.open("setup.xml") + # psdb.update_set_test_row(globalVar.station, globalVar.testid_ctl, globalVar.g_uuid, test.shortDescription(), "RUNNING") def addSuccess(self, test): unittest.TestResult.addSuccess(self, test) @@ -93,6 +100,7 @@ class TextTestResult(unittest.TestResult): if self.result == self.FAIL: simple_result = "FALSE" elif self.result == self.ERROR: simple_result = "FALSE" #SEND TO DB THE RESULT OF THE TEST - psdb = TestSrv_Database() - psdb.open("setup.xml") - psdb.add_test_to_batch(globalVar.g_uuid, test.shortDescription(), globalVar.testid_ctl, simple_result, globalVar.g_mid, test.longMessage) + # psdb = TestSrv_Database() + # psdb.open("setup.xml") + self.__pgObj.add_test_to_batch(globalVar.g_uuid, test.shortDescription(), globalVar.testid_ctl, simple_result, globalVar.g_mid, test.longMessage) + # psdb.add_test_to_batch(globalVar.g_uuid, test.shortDescription(), globalVar.testid_ctl, simple_result, globalVar.g_mid, test.longMessage) diff --git a/test-cli/test/tests/qamp.py b/test-cli/test/tests/qamp.py index 9fcd6d2..3e38ce9 100644 --- a/test-cli/test/tests/qamp.py +++ b/test-cli/test/tests/qamp.py @@ -5,10 +5,17 @@ import time class Qamp(unittest.TestCase): - def __init__(self, testname, testfunc, undercurrent=0.1, overcurrent=2): + #varlist: undercurrent, overcurrent + def __init__(self, testname, testfunc, varlist): self._current = 0.0 - self._undercurrent=undercurrent - self._overcurrent = overcurrent + if "undercurrent" in varlist: + self._undercurrent = varlist["undercurrent"] + else: + raise Exception('undercurrent param inside Qamp must be defined') + if "overcurrent" in varlist: + self._overcurrent = varlist["overcurrent"] + else: + raise Exception('overcurrent param inside Qamp must be defined') self._vshuntfactor=16384.0 self._ser = serial.Serial() self._ser.port = '/dev/ttyUSB0' @@ -33,24 +40,12 @@ class Qamp(unittest.TestCase): except: self.fail("failed: IMPOSSIBLE OPEN USB-SERIAL PORT ( {} )".format(self._ser.port)) error=1 - return -1 if error==0: # Clean input and output buffer of serial port self._ser.flushInput() self._ser.flushOutput() # Send command to read Voltage at Shunt resistor # Prepare cmd - cmd = ('at\n\r') - i=0 - while (i < len(cmd)): - i = i + self._ser.write(cmd[i].encode('ascii')) - time.sleep(0.05) - self._ser.read(1) - res0 = [] - while (self._ser.inWaiting() > 0): # if incoming bytes are waiting to be read from the serial input buffer - res0.append(self._ser.read(1).decode('ascii')) # read the bytes and convert from binary array to ASCII - print(res0) - #CHECK COM FIRST cmd = ('at+in?\n\r') i = 0 @@ -67,21 +62,15 @@ class Qamp(unittest.TestCase): string = ''.join(res) string = string.replace('\n', '') - try: - self._current = float(int(string, 0)) / self._vshuntfactor - except: - self.fail("failed: CAN'T READ CONSUMPTION (CURRENT=0?)") - error=1 - return -1 - if error==0: - print(self._current) + self._current = float(int(string, 0)) / self._vshuntfactor + print(self._current) # In order to give a valid result it is importarnt to define an under current value - if (self._current > float(self._overcurrent)): - self.fail("failed: OVERCURRENT DETECTED ( {} )".format(self._current)) - return -1 + if (self._current > float(self._overcurrent)): + self.fail("failed: OVERCURRENT DETECTED ( {} )".format(self._current)) + + if (self._current < float(self._undercurrent)): + self.fail("failed: UNDERCURRENT DETECTED ( {} )".format(self._current)) + - if (self._current < float(self._undercurrent)): - self.fail("failed: UNDERCURRENT DETECTED ( {} )".format(self._current)) - return -1 diff --git a/test-cli/test/tests/qaudio.py b/test-cli/test/tests/qaudio.py index a1572ca..fe57be2 100644 --- a/test-cli/test/tests/qaudio.py +++ b/test-cli/test/tests/qaudio.py @@ -13,13 +13,14 @@ class Qaudio(unittest.TestCase): self.__refSum = 25 # 1+3+5+7+9 def execute(self): - str_cmd = "aplay test/files/dtmf-13579.wav 2> /dev/null & arecord -r 8000 -d 1 recorded.wav 2> /dev/null" #.format(self.__dtmfFile) + str_cmd = "aplay test/files/dtmf-13579.wav & arecord -r 8000 -d 1 recorded.wav" #.format(self.__dtmfFile) audio_loop = SysCommand("command-name", str_cmd) - if audio_loop.execute() == 0:# BUG: Returns -1 but work + if audio_loop.execute() == -1:# BUG: Returns -1 but work lines = audio_loop.getOutput().splitlines() - str_cmd = "multimon -t wav -a DTMF recorded.wav -q 2> /dev/null" + str_cmd = "multimon -t wav -a DTMF recorded.wav -q" dtmf_decoder = SysCommand("command-name", str_cmd) - if dtmf_decoder.execute() == 0: # BUG: Returns -1 but work + a=dtmf_decoder.execute() + if dtmf_decoder.execute() == -1: # BUG: Returns -1 but work self.__raw_out = dtmf_decoder.getOutput() if self.__raw_out == "": return -1 @@ -34,4 +35,4 @@ class Qaudio(unittest.TestCase): else: self.fail("failed: fail playing/recording file") return -1 - return 0 + return 0
\ No newline at end of file diff --git a/test-cli/test/tests/qbutton.py b/test-cli/test/tests/qbutton.py index 72a897e..4718924 100644 --- a/test-cli/test/tests/qbutton.py +++ b/test-cli/test/tests/qbutton.py @@ -5,8 +5,12 @@ import time class Qbutton(unittest.TestCase): - def __init__(self, testname, testfunc, gpio): - if gpio == "SOPA": + def __init__(self, testname, testfunc, varlist): + if "gpio" in varlist: + self.__gpio = varlist["gpio"] + else: + raise Exception('gpio param inside Qbutton must be defined') + if self.__gpio == "SOPA": super(Qbutton, self).__init__("buttonSopa") else: super(Qbutton, self).__init__("buttonGpio") @@ -34,19 +38,22 @@ class Qbutton(unittest.TestCase): get_button_val = SysCommand("get_button_val", str_cmd) print("\n\t --> PRESS button for 1 sec (TIMEOUT: 10s) \n") timeout = 0 - while timeout < 20: + while timeout < 7200: if get_button_val.execute() == 0: get_button_val.execute() button_value = get_button_val.getOutput() button_value=button_value.decode('ascii').split("x") if int(button_value[1]) == 4: - timeout = 20 + timeout = 7200 + led_off="echo 0 > /sys/class/leds/red\:usbhost/brightness" + ledoff = SysCommand("led_off", led_off) + ledoff.execute() time.sleep(0.5) timeout = timeout + 1 - if timeout==20 and int(button_value[1]) == 0: + if timeout==7200 and int(button_value[1]) == 0: self.fail("failed: timeout exceeded") else: - timeout = 20 + timeout = 7200 self.fail("failed: not button input") else: self.fail("failed: could not complete i2c reset button state") diff --git a/test-cli/test/tests/qduplex_ser.py b/test-cli/test/tests/qduplex_ser.py index 98bda81..837b4d0 100644 --- a/test-cli/test/tests/qduplex_ser.py +++ b/test-cli/test/tests/qduplex_ser.py @@ -6,15 +6,26 @@ import time class Qduplex(unittest.TestCase): - def __init__(self, testname, testfunc, port1, port2, baudrate): + def __init__(self, testname, testfunc, varlist): super(Qduplex, self).__init__(testfunc) - self.__port1 = port1 + if "port1" in varlist: + self.__port1 = varlist["port1"] + else: + raise Exception('port1 param inside Qduplex must be defined') self.__serial1 = serial.Serial(self.__port1, timeout=1) - self.__serial1.baudrate = baudrate - self.__port2 = port2 + if "port2" in varlist: + self.__port2 = varlist["port2"] + else: + raise Exception('port2 param inside Qduplex must be defined') self.__serial2 = serial.Serial(self.__port2, timeout=1) - self.__serial2.baudrate = baudrate + + if "baudrate" in varlist: + self.__serial1.baudrate = varlist["baudrate"] + self.__serial2.baudrate = varlist["baudrate"] + else: + raise Exception('baudrate param inside Qduplex must be defined') + self._testMethodDoc = testname def __del__(self): diff --git a/test-cli/test/tests/qeeprom.py b/test-cli/test/tests/qeeprom.py index f72f78f..2bab248 100644 --- a/test-cli/test/tests/qeeprom.py +++ b/test-cli/test/tests/qeeprom.py @@ -4,7 +4,7 @@ import uuid class Qeeprom(unittest.TestCase): - def __init__(self, testname, testfunc): + def __init__(self, testname, testfunc, varlist): super(Qeeprom, self).__init__(testfunc) self._testMethodDoc = testname diff --git a/test-cli/test/tests/qethernet.py b/test-cli/test/tests/qethernet.py index 2ac447c..be984f5 100644 --- a/test-cli/test/tests/qethernet.py +++ b/test-cli/test/tests/qethernet.py @@ -1,57 +1,51 @@ -from test.helpers.syscmd import SysCommand import unittest - +import sh +import ex class Qethernet(unittest.TestCase): __sip = None - __raw_out = None - __MB_req = None - __MB_real = None - __BW_real = None - __dat_list = None + __numbytestx = None __bind = None __OKBW = None - def __init__(self, testname, testfunc, sip = None, OKBW=100, bind=None): + def __init__(self, testname, testfunc, varlist): super(Qethernet, self).__init__(testfunc) - if sip is not None: - self.__sip = sip - if sip is not None: - self.__bind = bind - self.__MB_req = '10' - self.__OKBW=OKBW + if "sip" in varlist: + self.__sip = varlist["sip"] + else: + raise Exception('sip param inside Qethernet have been be defined') + if "bind" in varlist: + self.__bind = varlist["bind"] + else: + self.__bind = None + if "OKBW" in varlist: + self.__OKBW = varlist["OKBW"] + else: + raise Exception('OKBW param inside Qethernet must be defined') + self.__numbytestx = "10M" self._testMethodDoc = testname def execute(self): - print + # execute iperf command against the server if self.__bind is None: - str_cmd = "iperf -c {} -x CMSV -n {}M".format(self.__sip, self.__MB_req) + p = sh.iperf("-c", self.__sip, "-x", "CMSV", "-n", self.__numbytestx, "-f", "m") else: - str_cmd = "iperf -c {} -x CMSV -n {}M -B {}".format(self.__sip, self.__MB_req, self.__bind) - iperf_command = SysCommand("iperf", str_cmd) - if iperf_command.execute() == 0: - self.__raw_out = iperf_command.getOutput() - if self.__raw_out == "": - return -1 - lines = iperf_command.getOutput().splitlines() - dat = lines[1] - dat = dat.decode('ascii') - dat_list = dat.split( ) - for d in dat_list: - a = dat_list.pop(0) - if a == "sec": - break - self.__MB_real = dat_list[0] - self.__BW_real = dat_list[2] - self.__dat_list = dat_list - #print(self.__MB_real) - #print(self.__BW_real) - self.failUnless(float(self.__BW_real)>float(self.__OKBW)*0.9,"failed: speed is lower than spected. Speed(MB/s)" + str(self.__BW_real)) + p = sh.iperf("-c", self.__sip, "-x", "CMSV", "-n", self.__numbytestx, "-f", "m", "-B", self.__bind) + #check if it was executed succesfully + if p.exit_code == 0: + if p.stdout == "": + self.fail("failed: error executing iperf command") + # analyze output string + # split by lines + lines = p.stdout.splitlines() + # get first line + dat = lines[1].decode('ascii') + # search for the BW value + a = re.search("\d+(\.\d)? Mbits/sec",dat) + b = a.group().split( ) + BWreal = b[0] + + # check if BW is in the expected range + self.failUnless(float(BWreal)>float(self.__OKBW)*0.9,"failed: speed is lower than spected. Speed(MB/s)" + str(BWreal)) else: self.fail("failed: could not complete iperf command") - - def get_Total_MB(self): - return self.__MB_real; - - def get_Total_BW(self): - return self.__MB_real; diff --git a/test-cli/test/tests/qflash.py b/test-cli/test/tests/qflash.py index cdc4109..0c3fcb5 100644 --- a/test-cli/test/tests/qflash.py +++ b/test-cli/test/tests/qflash.py @@ -4,16 +4,13 @@ from test.helpers.globalVariables import globalVar class Qflasher(unittest.TestCase): - def __init__(self, testname, testfunc): + def __init__(self, testname, testfunc, varlist): super(Qflasher, self).__init__(testfunc) self._testMethodDoc = testname model=globalVar.g_mid if model.find("IGEP0046") == 0: self._flash_method = "mx6" self._binlocation="/boot/" - elif model.find("IGEP0000") == 0: - self._flash_method = "mx6" - self._binlocation="/boot/" elif model.find("IGEP0034") == 0: self._flash_method = "nandti" self._binlocation = "/boot/" diff --git a/test-cli/test/tests/qi2c.py b/test-cli/test/tests/qi2c.py index 409005c..2f14424 100644 --- a/test-cli/test/tests/qi2c.py +++ b/test-cli/test/tests/qi2c.py @@ -3,10 +3,16 @@ import unittest class Qi2c(unittest.TestCase): - def __init__(self, testname, testfunc, busnum, register): + def __init__(self, testname, testfunc, varlist): super(Qi2c, self).__init__(testfunc) - self.__busnum = busnum - self.__register = register.split("/") + if "busnum" in varlist: + self.__busnum = varlist["busnum"] + else: + raise Exception('busnum param inside Qi2c must be defined') + if "register" in varlist: + self.__register = varlist["register"].split("/") + else: + raise Exception('register param inside Qi2c must be defined') self.__devices=[] self._testMethodDoc = testname diff --git a/test-cli/test/tests/qnand.py b/test-cli/test/tests/qnand.py new file mode 100644 index 0000000..9f2cd47 --- /dev/null +++ b/test-cli/test/tests/qnand.py @@ -0,0 +1,22 @@ +import unittest +import sh + +class Qnand(unittest.TestCase): + + __device = "10M" + + # varlist: device + def __init__(self, testname, testfunc, varlist): + super(Qnand, self).__init__(testfunc) + + if "device" in varlist: + self.__device = varlist["device"] + else: + raise Exception('device param inside Qnand must be defined') + self._testMethodDoc = testname + + def execute(self): + try: + sh.nandtest("-m", self.__device) + except sh.ErrorReturnCode as e: + self.fail("failed: could not complete nandtest command") diff --git a/test-cli/test/tests/qram.py b/test-cli/test/tests/qram.py index 8ec0210..1b66e5c 100644 --- a/test-cli/test/tests/qram.py +++ b/test-cli/test/tests/qram.py @@ -1,22 +1,27 @@ -from test.helpers.syscmd import SysCommand import unittest +import sh class Qram(unittest.TestCase): - def __init__(self, testname, testfunc, memSize): + __memSize = "10M" + __loops = "1" + + # varlist: memSize, loops + def __init__(self, testname, testfunc, varlist): super(Qram, self).__init__(testfunc) - self.__memSize = memSize + + if "memSize" in varlist: + self.__memSize = varlist["memSize"] + else: + raise Exception('memSize param inside Qram must be defined') + if "loops" in varlist: + self.__loops = varlist["loops"] + else: + raise Exception('loops param inside Qram must be defined') self._testMethodDoc = testname def execute(self): - str_cmd= "free -m" - free_command = SysCommand("free_ram", str_cmd) - if free_command.execute() == 0: - self.__raw_out = free_command.getOutput() - if self.__raw_out == "": - return -1 - lines = free_command.getOutput().splitlines() - aux = [int(s) for s in lines[1].split() if s.isdigit()] - self.failUnless(int(aux[0])>int(self.__memSize),"failed: total ram memory size lower than expected") - else: - self.fail("failed: could not complete iperf command") + try: + sh.memtester(self.__memSize, "1") + except sh.ErrorReturnCode as e: + self.fail("failed: could not complete memtester command") diff --git a/test-cli/test/tests/qrtc.py b/test-cli/test/tests/qrtc.py index 1d02f78..8e31572 100644 --- a/test-cli/test/tests/qrtc.py +++ b/test-cli/test/tests/qrtc.py @@ -4,9 +4,14 @@ import time class Qrtc(unittest.TestCase): - def __init__(self, testname, testfunc, rtc): + __rtc = "/dev/rtc0" + + def __init__(self, testname, testfunc, varlist): super(Qrtc, self).__init__(testfunc) - self.__rtc = rtc + if "rtc" in varlist: + self.__rtc = varlist["rtc"] + else: + raise Exception('rtc param inside Qrtc must be defined') self._testMethodDoc = testname def execute(self): diff --git a/test-cli/test/tests/qscreen.py b/test-cli/test/tests/qscreen.py index b8a5a48..87e53b2 100644 --- a/test-cli/test/tests/qscreen.py +++ b/test-cli/test/tests/qscreen.py @@ -5,9 +5,13 @@ from test.helpers.cv_display_test import pattern_detect class Qscreen(unittest.TestCase): - def __init__(self, testname, testfunc, display): + #varlist: display + def __init__(self, testname, testfunc, varlist): super(Qscreen, self).__init__(testfunc) - self.__display = display + if "display" in varlist: + self.__display = varlist["display"] + else: + raise Exception('display param inside Qscreen have been be defined') self._testMethodDoc = testname def execute(self): @@ -17,15 +21,5 @@ class Qscreen(unittest.TestCase): test_screen = pattern_detect(1) if not test_screen=="0": self.fail("failed: {}".format(test_screen)) - str_cmd= "fbi -T 1 /home/root/result_hdmi_img.jpg -d /dev/fb0 --noverbose -a" - show_img = SysCommand("show-image", str_cmd) - show_img.execute() else: self.fail("failed: could not display the image") - try: - str_cmd= "fbi -T 1 /home/root/result_hdmi_img.jpg -d /dev/fb0 --noverbose -a" - show_img = SysCommand("show-image", str_cmd) - show_img.execute() - except ValueError: - print("COULD NOT DISPLAY IMAGE") - diff --git a/test-cli/test/tests/qserial.py b/test-cli/test/tests/qserial.py index 43ba3c8..d3e2ee6 100644 --- a/test-cli/test/tests/qserial.py +++ b/test-cli/test/tests/qserial.py @@ -6,11 +6,17 @@ import time class Qserial(unittest.TestCase): - def __init__(self, testname, testfunc, port, baudrate): + def __init__(self, testname, testfunc, varlist): super(Qserial, self).__init__(testfunc) - self.__port = port + if "port" in varlist: + self.__port = varlist["port"] + else: + raise Exception('port param inside Qserial must be defined') self.__serial = serial.Serial(self.__port, timeout=1) - self.__serial.baudrate = baudrate + if "baudrate" in varlist: + self.__baudrate = varlist["baudrate"] + else: + raise Exception('baudrate param inside Qserial must be defined') self._testMethodDoc = testname def __del__(self): @@ -19,12 +25,15 @@ class Qserial(unittest.TestCase): def execute(self): self.__serial.flushInput() self.__serial.flushOutput() + #generate a random uuid test_uuid = str(uuid.uuid4()).encode() + #send the uuid through serial port self.__serial.write(test_uuid) time.sleep(0.05) # there might be a small delay if self.__serial.inWaiting() == 0: self.fail("failed: PORT {} wait timeout exceded, wrong communication?".format(self.__port)) else: + # check if what it was sent is equal to what has been received if (self.__serial.readline() != test_uuid): self.fail("failed: PORT {} write/read mismatch".format(self.__port)) diff --git a/test-cli/test/tests/qusb.py b/test-cli/test/tests/qusb.py index 44490bc..0390143 100644 --- a/test-cli/test/tests/qusb.py +++ b/test-cli/test/tests/qusb.py @@ -3,17 +3,20 @@ import unittest class Qusb(unittest.TestCase): - def __init__(self, testname, testfunc, devLabel, numPorts): + def __init__(self, testname, testfunc, varlist): super(Qusb, self).__init__(testfunc) - self.__numPorts = numPorts + if "numPorts" in varlist: + self.__numPorts = varlist["numPorts"] + else: + raise Exception('numPorts param inside Qusb must be defined') self._testMethodDoc = testname - self.__devLabel = devLabel + if "devLabel" in varlist: + self.__devLabel = varlist["devLabel"] + else: + raise Exception('devLabel param inside Qusb must be defined') if testname=="USBOTG": self.__usbFileName = "/this_is_an_usb_otg" self.__usbtext = "USBOTG" - elif testname=="SATA": - self.__usbFileName = "/this_is_a_sata" - self.__usbtext = "SATA" else: self.__usbFileName = "/this_is_an_usb_host" self.__usbtext = "USBHOST" @@ -57,4 +60,4 @@ class Qusb(unittest.TestCase): else: self.fail("failed: reference and real usb host devices number mismatch") else: - self.fail("failed: couldn't execute lsblk command") + self.fail("failed: couldn't execute lsblk command")
\ No newline at end of file diff --git a/test-cli/test/tests/qwifi.py b/test-cli/test/tests/qwifi.py index d08149b..188e300 100644 --- a/test-cli/test/tests/qwifi.py +++ b/test-cli/test/tests/qwifi.py @@ -1,45 +1,33 @@ from test.helpers.syscmd import SysCommand import unittest -import subprocess class Qwifi(unittest.TestCase): - def __init__(self, testname, testfunc, signal): + def __init__(self, testname, testfunc, varlist): super(Qwifi, self).__init__(testfunc) - self.__signal = signal + if "signal" in varlist: + self.__signal = varlist["signal"] + else: + raise Exception('signal param inside Qwifi must be defined') self._testMethodDoc = testname - # WiFi SERVERIP fixed at the moment. - self._serverip = "192.168.5.1" def execute(self): - # First check connection with the wifi server using ping command - #str_cmd = "ping -c 1 {} > /dev/null".format(self._serverip) - #wifi_ping = SysCommand("wifi_ping", str_cmd) - #wifi_ping.execute() - #res = subprocess.call(['ping', '-c', '1', self._serverip]) - p = subprocess.Popen(['ping','-c','1',self._serverip,'-W','2'],stdout=subprocess.PIPE) - p.wait() - res=p.poll() - if res == 0: - str_cmd= "iw wlan0 link" - wifi_stats = SysCommand("wifi_stats", str_cmd) - if wifi_stats.execute() == 0: - w_stats = wifi_stats.getOutput().decode('ascii').splitlines() - #self._longMessage = str(self.__raw_out).replace("'", "") - if not w_stats[0] == "Not connected.": - signal_st = w_stats[5].split(" ")[1] - try: - int(signal_st) - if -1*int(signal_st) > int(self.__signal): - self.fail("failed: signal to server lower than expected") - except ValueError: - self.fail("failed: error output (Bad connection?)") - else: + str_cmd= "iw wlan0 link" + wifi_stats = SysCommand("wifi_stats", str_cmd) + if wifi_stats.execute() == 0: + w_stats = wifi_stats.getOutput().decode('ascii').splitlines() + #self._longMessage = str(self.__raw_out).replace("'", "") + if not w_stats[0] == "Not connected.": + signal_st = w_stats[5].split(" ")[1] + try: + int(signal_st) + if -1*int(signal_st) > int(self.__signal): + self.fail("failed: signal to server lower than expected") + except ValueError: self.fail("failed: error output (Bad connection?)") - #tx_brate = float(w_stats[6].split(" ")[2]) else: - self.fail("failed: couldn't execute iw command") + self.fail("failed: error output (Bad connection?)") + #tx_brate = float(w_stats[6].split(" ")[2]) else: - self.fail("failed: ping to server {}".format(self._serverip)) - + self.fail("failed: couldn't execute iw command") diff --git a/test-cli/test_main.py b/test-cli/test_main.py index 3c4d1cb..a44b7d1 100644 --- a/test-cli/test_main.py +++ b/test-cli/test_main.py @@ -6,6 +6,7 @@ import sys import os import unittest from test.helpers.testsrv_db import TestSrv_Database +from test.helpers.setup_xml import XMLSetup from test.runners.simple import SimpleTestRunner from test.tests.qbutton import Qbutton from test.helpers.syscmd import TestSysCommand @@ -24,8 +25,17 @@ from test.tests.qrtc import Qrtc from test.tests.qduplex_ser import Qduplex from test.tests.qamp import Qamp from test.tests.qflash import Qflasher -from test.helpers.finisher import Finisher +from test.tests.qnand import Qnand from test.helpers.globalVariables import globalVar +import socket +import re +from test.helpers.iseelogger import ISEE_Logger +import logging + + +psdbObj = TestSrv_Database() +xmlObj = None +loggerObj = None # define clear function def clear(): @@ -34,85 +44,82 @@ def clear(): def create_board(): - psdb = TestSrv_Database() - psdb.open("setup.xml") - tree = XMLParser.parse('setup.xml') - root = tree.getroot() - suite = unittest.TestSuite() - for element in root.iter('board'): - # print(str(element.tag) + str(element.attrib)) - model_id = element.attrib['model'] - variant = element.attrib['variant'] - nstation = element.attrib['station'] + model_id = xmlObj.gettagKey('board', 'model') + variant = xmlObj.gettagKey('board', 'variant') + + # get model id globalVar.g_mid=model_id + "-" + variant - globalVar.station=nstation - processor_id=genDieid(globalVar.g_mid) + + # get station number + hstname = socket.gethostname() + if (re.search("^Station\d{2}$", hstname)): + globalVar.station = int(re.search("\d{2}", hstname).group(0)) + else: + raise Exception('Station number not configured') + exit(3) + + processor_id = genDieid(globalVar.g_mid) print(globalVar.g_mid) print(processor_id) - globalVar.g_uuid = psdb.create_board(processor_id, model_id, variant, bmac = None) + s = globalVar.g_uuid = psdbObj.create_board(processor_id, model_id, variant, bmac = None) + +def createvarlist(testvars): + varlist = {} + for row in testvars: + varname,testparam = row + varlist[varname] = testparam + return varlist + def testsuite(): - psdb=TestSrv_Database() - psdb.open("setup.xml") suite = unittest.TestSuite() - tests=psdb.getboard_comp_test_list(globalVar.g_uuid) - for i in range(len(tests)): - #newstr = oldstr.replace("M", "") - variables=str(tests[i][0]).split(",") - testname=variables[0].replace('(', '') - testdes=variables[1] - testfunc=variables[2] - if len(tests)>2: - testparam=variables[3].replace(')', '') - testparam = testparam.replace('"', '') - testparam = testparam.replace(';', "','") - if testparam == "": - command = "suite.addTest({}('{}','execute'))".format(testfunc, testname) - else: - command="suite.addTest({}('{}','execute','{}'))".format(testfunc,testname,testparam) - else: - print(testname) - command = "suite.addTest({}('{}','execute'))".format(testfunc, testname) + tests = psdbObj.getboard_comp_test_list(globalVar.g_uuid) + for row in tests: + testdefname,testfunc = row + testvars = psdbObj.getboard_test_variables(globalVar.g_uuid, testdefname) + varlist = createvarlist(testvars) + command = "suite.addTest({}('{}','execute', varlist))".format(testfunc, testdefname) exec(command) - globalVar.testid_ctl=psdb.open_testbatch(globalVar.g_uuid) + + globalVar.testid_ctl=psdbObj.open_testbatch(globalVar.g_uuid) return suite def finish_test(): - psdb = TestSrv_Database() - psdb.open("setup.xml") - auxs = psdb.close_testbatch(globalVar.g_uuid, globalVar.testid_ctl) - globalVar.fstatus = auxs[0][0] - # Burn eeprom struct - psdb = TestSrv_Database() - psdb.open("setup.xml") - # We should call getboard_eeprom only if test was ok - if globalVar.fstatus: - aux = psdb.getboard_eeprom(globalVar.g_uuid) - finish = Finisher(aux) - finish.end_ok() - else: - finish = Finisher(globalVar.g_uuid) - finish.end_fail() - # Update set_test current_test with 'END' so that it finally gets painted in green - psdb = TestSrv_Database() - psdb.open("setup.xml") - psdb.update_set_test_row(globalVar.station, globalVar.testid_ctl, globalVar.g_uuid, "END","FINISH") + psdbObj.close_testbatch(globalVar.g_uuid, globalVar.testid_ctl) + # Update Set Test status with FINISH so that status column is not modified because close_testbatch already did it. + psdbObj.update_set_test_row(globalVar.station, globalVar.testid_ctl, globalVar.g_uuid, 'END', 'FINISH') def main(): - #addtesttomodel() - #addtestdef() create_board() - #globalVar.g_uuid = "1f59c654-0cc6-11e8-8d51-e644f56b8edd" try: os.remove("test_results.dat") except: pass - runner = SimpleTestRunner() + runner = SimpleTestRunner(psdbObj) runner.run(testsuite()) finish_test() if __name__ == "__main__": + # Clear the shell screen clear() - main() + + # create logger + loggerObj = ISEE_Logger(logging.INFO) + # logger = loggerObj.getlogger().info("Starting test script...") + + # Try to parse the setup.xml file + try: + xmlObj = XMLSetup("setup.xml") + except: + print("Error: Cannot parse setup.xml file") + exit(1) + + # Try to connect to the DB, according to setup.xml configuration + if psdbObj.open(xmlObj): + main() + else: + print("Error: Cannot open DB connection") + exit(2) + |