diff options
author | Manel Caro <mcaro@iatec.biz> | 2021-11-06 16:28:38 +0100 |
---|---|---|
committer | Manel Caro <mcaro@iatec.biz> | 2021-11-06 16:28:38 +0100 |
commit | cf19bfe18cbd283b188a858ee1629f9909c924f4 (patch) | |
tree | 1efb23519727130058401df090ab1b5f4cc8ba99 /test-cli/test_main.py | |
parent | b6932fbaf898724ae87c29f8965621610f377084 (diff) | |
parent | d5b273a3b58a250742049df4ca0ef0ba54f53d33 (diff) | |
download | board-rel.0.1.zip board-rel.0.1.tar.gz board-rel.0.1.tar.bz2 |
Diffstat (limited to 'test-cli/test_main.py')
-rw-r--r-- | test-cli/test_main.py | 495 |
1 files changed, 404 insertions, 91 deletions
diff --git a/test-cli/test_main.py b/test-cli/test_main.py index 3c4d1cb..1f5d1e4 100644 --- a/test-cli/test_main.py +++ b/test-cli/test_main.py @@ -1,118 +1,431 @@ -from test.helpers.get_dieid import genDieid -from subprocess import call -import xml.etree.ElementTree as XMLParser -import errno -import sys +import time +import sh import os import unittest +import socket +import logging +import sys +import getopt +import random + +from datetime import datetime +from test.helpers.int_registers import get_die_id +from test.helpers.int_registers import get_internal_mac +from subprocess import call from test.helpers.testsrv_db import TestSrv_Database +from test.helpers.setup_xml import XMLSetup from test.runners.simple import SimpleTestRunner -from test.tests.qbutton import Qbutton -from test.helpers.syscmd import TestSysCommand -from test.helpers.syscmd import SysCommand -from test.tests.qiperf import QIperf from test.tests.qethernet import Qethernet -from test.tests.qaudio import Qaudio from test.tests.qram import Qram from test.tests.qusb import Qusb -from test.tests.qi2c import Qi2c from test.tests.qeeprom import Qeeprom from test.tests.qserial import Qserial -from test.tests.qscreen import Qscreen from test.tests.qwifi import Qwifi from test.tests.qrtc import Qrtc from test.tests.qduplex_ser import Qduplex -from test.tests.qamp import Qamp -from test.tests.qflash import Qflasher -from test.helpers.finisher import Finisher +from test.tests.qamper import Qamper +from test.tests.qnand import Qnand +from test.tests.qaudio import Qaudio +from test.tests.qdmesg import Qdmesg +from test.tests.qmmcflash import Qmmcflash +from test.tests.qplc import Qplc +from test.tests.qvideo import Qvideo from test.helpers.globalVariables import globalVar +from test.tasks.flasheeprom import EEprom_Flasher +from test.tasks.flashmemory import NAND_Flasher +from test.helpers.qrreader import QRReader +from test.helpers.cmdline import LinuxKernelCmd +from test.helpers.amper import Amper +from test.enums.StationStates import StationStates +import xml.etree.ElementTree as XMLParser +from test.helpers.iseelogger import logObj +from test.helpers.utils import str2bool + +# global variables +psdbObj = TestSrv_Database() +xmlObj = None +test_abspath = None +tests_manually_executed = [] +AutoTest = False +OverrideAutoReboot = False +delay_reboot = '0' + # define clear function def clear(): # check and make call for specific operating system - _ = call('clear' if os.name =='posix' else 'cls') + _ = call('clear' if os.name == 'posix' else 'cls') +def reboot_board(): + logObj.getlogger().debug("#reboot board#") + if not OverrideAutoReboot: + set_next_state(StationStates.STATION_WAIT_SHUTDOWN.name) + if delay_reboot == '0': + time.sleep(5) + sh.shutdown('-r', '+{}'.format(delay_reboot), _bg=True) + # sh.reboot(_bg=True) + print("REBOOT...") + exit(0) -def create_board(): - psdb = TestSrv_Database() - psdb.open("setup.xml") - tree = XMLParser.parse('setup.xml') - root = tree.getroot() - suite = unittest.TestSuite() - for element in root.iter('board'): - # print(str(element.tag) + str(element.attrib)) - model_id = element.attrib['model'] - variant = element.attrib['variant'] - nstation = element.attrib['station'] - globalVar.g_mid=model_id + "-" + variant - globalVar.station=nstation - processor_id=genDieid(globalVar.g_mid) - print(globalVar.g_mid) - print(processor_id) - globalVar.g_uuid = psdb.create_board(processor_id, model_id, variant, bmac = None) - -def testsuite(): - psdb=TestSrv_Database() - psdb.open("setup.xml") - suite = unittest.TestSuite() - tests=psdb.getboard_comp_test_list(globalVar.g_uuid) - for i in range(len(tests)): - #newstr = oldstr.replace("M", "") - variables=str(tests[i][0]).split(",") - testname=variables[0].replace('(', '') - testdes=variables[1] - testfunc=variables[2] - if len(tests)>2: - testparam=variables[3].replace(')', '') - testparam = testparam.replace('"', '') - testparam = testparam.replace(';', "','") - if testparam == "": - command = "suite.addTest({}('{}','execute'))".format(testfunc, testname) +def wait_for_reboot(): + while True: + currentstate = psdbObj.read_station_state(globalVar.station) + if currentstate == StationStates.STATION_REBOOT.name: + reboot_board() + time.sleep(5) + +def execute_station_error(text): + logObj.getlogger().error("Station error: #{}#".format(text)) + wait_for_reboot() + +def check_first_state(): + currentstate = psdbObj.read_station_state(globalVar.station) + if currentstate == StationStates.STATION_ERROR.name or currentstate == StationStates.STATION_REBOOT.name: + logObj.getlogger().debug('station state during the first check is {}'.format(currentstate)) + wait_for_reboot() + else: + count_t = int(xmlObj.getKeyVal("test_main", "wait_test_start", "240")) + while currentstate != StationStates.WAIT_TEST_START.name: + logObj.getlogger().debug('wait for WAIT_TEST_START but read: {}'.format(currentstate)) + if count_t <= 0: + # Error + execute_station_error("Timeout while waiting for WAIT_TEST_START state actual STATE = {}".format(currentstate)) else: - command="suite.addTest({}('{}','execute','{}'))".format(testfunc,testname,testparam) + count_t = count_t - 5 + time.sleep(5) + currentstate = psdbObj.read_station_state(globalVar.station) + if currentstate == StationStates.STATION_ERROR.name or currentstate == StationStates.STATION_REBOOT.name: + wait_for_reboot() + + +def set_next_state(newstate): + statewritten = psdbObj.change_station_state(globalVar.station, newstate) + if statewritten == StationStates.STATION_REBOOT.name or statewritten == StationStates.STATION_ERROR.name: + wait_for_reboot() + +def reboot_if_autotest(): + # reset board if AUTOTEST is enabled + if AutoTest: + reboot_board() + + +def create_paramslist(params): + paramlist = {} + for varname, varvalue in params: + paramlist[varname] = varvalue + return paramlist + + +def add_test(suite, testdefname, paramlist): + if testdefname == "RAM": + suite.addTest(Qram(testdefname, "execute", paramlist)) + return 0 + elif testdefname == "SERIALDUAL": + suite.addTest(Qduplex(testdefname, "execute", paramlist)) + return 0 + elif testdefname == "EEPROM": + suite.addTest(Qeeprom(testdefname, "execute", paramlist)) + return 0 + elif testdefname == "SERIAL": + suite.addTest(Qserial(testdefname, "execute", paramlist)) + return 0 + elif testdefname == "RTC": + suite.addTest(Qrtc(testdefname, "execute", paramlist)) + return 0 + elif testdefname == "CONSUMPTION": + suite.addTest(Qamper(testdefname, "execute", paramlist)) + return 0 + elif testdefname == "ETHERNET": + suite.addTest(Qethernet(testdefname, "execute", paramlist)) + return 0 + elif testdefname == "NAND": + suite.addTest(Qnand(testdefname, "execute", paramlist)) + return 0 + elif testdefname == "WIFI": + suite.addTest(Qwifi(testdefname, "execute", paramlist)) + return 0 + elif testdefname == "USB": + suite.addTest(Qusb(testdefname, "execute", paramlist)) + return 0 + elif testdefname == "AUDIO": + suite.addTest(Qaudio(testdefname, "execute", paramlist)) + return 0 + elif testdefname == "DMESG": + tests_manually_executed.append(Qdmesg(testdefname, "execute", paramlist)) + return 0 + elif testdefname == "MMCFLASH": + suite.addTest(Qmmcflash(testdefname, "execute", paramlist)) + return 0 + elif testdefname == "PLC_MODEM": + suite.addTest(Qplc(testdefname, "execute", paramlist)) + return 0 + elif testdefname == "VIDEO": + suite.addTest(Qvideo(testdefname, "execute", paramlist)) + return 0 + else: + logObj.getlogger().error("Params for test: {} not implemented (ignore it), ".format(testdefname)) + return 1 + + +def create_testsuite(): + # create an object TestSuite + suite1 = unittest.TestSuite() + # get list of tests for this board + tests = psdbObj.get_tests_list(globalVar.g_uuid) + logObj.getlogger().debug("Create TestSuite: {}".format(tests)) + # loop in every test for this board + for testid, testdefname in tests: + # get params for this test + params = psdbObj.get_test_params_list(testid) + paramlist = create_paramslist(params) + # add the testid as parameter + paramlist["testid"] = testid + paramlist["boarduuid"] = globalVar.g_uuid + paramlist["testidctl"] = globalVar.testid_ctl + paramlist['testPath'] = test_abspath + logObj.getlogger().debug("Params for test: {}:{}-{}, ".format(testid, testdefname, paramlist)) + paramlist["xml"] = xmlObj + paramlist["db"] = psdbObj + # add test to TestSuite + add_test(suite1, testdefname, paramlist) + + return suite1 + + +def create_board(): + + cmd = LinuxKernelCmd() + model_id = cmd.getkvar("bmodel", "none") + variant = cmd.getkvar("bvariant", "none") + logObj.getlogger().debug("Kernel Command line vars: bmodel: {} bvariant: {}".format(model_id, variant)) + + if model_id == "none" or variant == "none": + # Error + execute_station_error("Cannot get model {} and variant {} information".format(model_id, variant)) + + # get model id + globalVar.g_mid = model_id + "-" + variant + logObj.getlogger().debug("Model-Variant -> {}".format(globalVar.g_mid)) + + # get station + if "Station" not in globalVar.station: + # Error + execute_station_error("Hostname not defined") + + processor_id = get_die_id(globalVar.g_mid) + globalVar.g_uuid = psdbObj.create_board(processor_id, model_id, variant, globalVar.station, + get_internal_mac(globalVar.g_mid)) + + logObj.getlogger().info("processor ID: {} uuid: {}".format(processor_id, globalVar.g_uuid)) + + +def get_taskvars_list(uuid): + varlist = {} + for varname, varvalue in psdbObj.get_task_variables_list(uuid): + varlist[varname] = varvalue + return varlist + +def execute_extra_task(): + # list of task + myTask = [] + # Change state to "EXTRATASKS_RUNNING" + set_next_state(StationStates.EXTRATASKS_RUNNING.name) + # create task control + globalVar.taskid_ctl = psdbObj.open_task(globalVar.g_uuid) + # get extra variables + varlist = get_taskvars_list(globalVar.g_uuid) + + # Put default Vars + varlist["boarduuid"] = globalVar.g_uuid + varlist["testidctl"] = globalVar.testid_ctl + varlist['testPath'] = test_abspath + varlist["xml"] = xmlObj + varlist["db"] = psdbObj + + # Get list Extra Task and activation + eeprom = varlist.get('eeprom', '0') + flashNand = varlist.get('flashNAND', '0') + + if eeprom == '1': + myEEprom = EEprom_Flasher(varlist) + myTask.append(myEEprom) + if flashNand == '1': + myNAND = NAND_Flasher(varlist) + myTask.append(myNAND) + + taskresult = True + for task in myTask: + res, data = task.Execute() + if res: + psdbObj.create_task_result(globalVar.taskid_ctl, task.getTaskName(), "TASK_OK", data) + logObj.getlogger().info("task: {} ok".format(task.getTaskName())) else: - print(testname) - command = "suite.addTest({}('{}','execute'))".format(testfunc, testname) - exec(command) - globalVar.testid_ctl=psdb.open_testbatch(globalVar.g_uuid) - return suite - - -def finish_test(): - psdb = TestSrv_Database() - psdb.open("setup.xml") - auxs = psdb.close_testbatch(globalVar.g_uuid, globalVar.testid_ctl) - globalVar.fstatus = auxs[0][0] - # Burn eeprom struct - psdb = TestSrv_Database() - psdb.open("setup.xml") - # We should call getboard_eeprom only if test was ok - if globalVar.fstatus: - aux = psdb.getboard_eeprom(globalVar.g_uuid) - finish = Finisher(aux) - finish.end_ok() + psdbObj.create_task_result(globalVar.taskid_ctl, task.getTaskName(), "TASK_FAIL", data) + logObj.getlogger().info("task: {} failed".format(task.getTaskName())) + taskresult = False + + # check final taskresult => ok or fail + if taskresult: + psdbObj.update_taskctl_status(globalVar.taskid_ctl, "TASK_BOARD_OK") else: - finish = Finisher(globalVar.g_uuid) - finish.end_fail() - # Update set_test current_test with 'END' so that it finally gets painted in green - psdb = TestSrv_Database() - psdb.open("setup.xml") - psdb.update_set_test_row(globalVar.station, globalVar.testid_ctl, globalVar.g_uuid, "END","FINISH") - -def main(): - #addtesttomodel() - #addtestdef() + psdbObj.update_taskctl_status(globalVar.taskid_ctl, "TASK_BOARD_FAIL") + for task in myTask: + if task.getError() != '': + execute_station_error("Unable to complete extra tasks: {}".format(task.getError())) + # Not emit a error message + execute_station_error("Unable to complete extra tasks") + +def getBarCode(fake): + + set_next_state(StationStates.WAITING_FOR_SCANNER.name) + if fake: + psdbObj.set_factorycode(globalVar.g_uuid, str(random.randint(1, 1000001))) + return + qrreceived = False + wait_count = psdbObj.get_setup_variable("test_main", xmlObj.getKeyVal("test_main", "qr_wait_time", "120")) + while not qrreceived or int(wait_count) > 0: + qr = QRReader() + if qr.openQR(): + # waits 5s to receive a valid code + if qr.readQRasync(10): + qrreceived = True + factorycode = qr.getQRNumber() + psdbObj.set_factorycode(globalVar.g_uuid, factorycode) + return + qr.closeQR() + else: + qrreceived = True + del qr + time.sleep(1) + wait_count = wait_count - 1 + if int(wait_count) == 0: + execute_station_error("Cannot find a barcode scanner") + + +def execute_test (): + # initialize the board create_board() - #globalVar.g_uuid = "1f59c654-0cc6-11e8-8d51-e644f56b8edd" - try: - os.remove("test_results.dat") - except: - pass - runner = SimpleTestRunner() - runner.run(testsuite()) - finish_test() + # create a process + globalVar.testid_ctl = psdbObj.open_test(globalVar.g_uuid) + logObj.getlogger().info("Create new Test: {}".format(globalVar.testid_ctl)) + + # Change state to "TESTS_RUNNING" + set_next_state(StationStates.TESTS_RUNNING.name) + + # generate suits + suite1 = create_testsuite() + # Execute tests (round 1) + runner1 = SimpleTestRunner(psdbObj) + logObj.getlogger().info("Runner run id: {}".format(globalVar.testid_ctl)) + testresult = runner1.run(suite1) + logObj.getlogger().info("Runner run id: {} finish {}".format(globalVar.testid_ctl, testresult)) + + # Execute manually tests + for test in tests_manually_executed: + logObj.getlogger().info("Execute manual test id: {}:{}".format(globalVar.testid_ctl, test.getTestName())) + mtestResult = test.execute() + logObj.getlogger().info("Execute manual test id: {}:{} Result: {}".format(globalVar.testid_ctl, test.getTestName(), + 'ok' if mtestResult else 'fail')) + + # execute aditional tasks, only if the test was successful + if testresult.wasSuccessful(): + logObj.getlogger().info("Execute test id: {} finished succesfully".format(globalVar.testid_ctl)) + set_next_state(StationStates.TESTS_OK.name) + logObj.getlogger().info("Execute test id: {} extra task".format(globalVar.testid_ctl)) + execute_extra_task() + logObj.getlogger().info("Execute test id: {} wait read scanner".format(globalVar.testid_ctl)) + getBarCode(AutoTest) + set_next_state(StationStates.FINISHED.name) + logObj.getlogger().info("Execute test id: {} wait read scanner finish".format(globalVar.testid_ctl)) + else: + logObj.getlogger().info("Execute test id: {} finished with one or more fails".format(globalVar.testid_ctl)) + # Change state to "TESTS_FAILED" + set_next_state(StationStates.TESTS_FAILED.name) + + # reset board if AUTOTEST is enabled + reboot_if_autotest() + + +def enableFaulthandler(): + """ Enable faulthandler for all threads. + + If the faulthandler package is available, this function disables and then + re-enables fault handling for all threads (this is necessary to ensure any + new threads are handled correctly), and returns True. + + If faulthandler is not available, then returns False. + """ + try: + import faulthandler + # necessary to disable first or else new threads may not be handled. + faulthandler.disable() + faulthandler.enable(all_threads=True) + return True + except ImportError: + return False if __name__ == "__main__": + # Clear the shell screen clear() - main() + + enableFaulthandler() + + test_abspath = os.path.dirname(os.path.abspath(__file__)) + configFile = 'setup.xml' + loglevel = 'INFO' + logObj.getlogger().info("Test program Started : {}".format(datetime.now())) + try: + argv = sys.argv[1:] + opts, args = getopt.getopt(argv, 'c:l:', ['cfg', 'loglevel']) + for opt, args in opts: + if opt in ('-c', '--setup'): + configFile = args + if opt in ('-l', '--loglevel'): + loglevel = args + except getopt.GetoptError as err: + logObj.getlogger().error('#{}#'.format(err)) + + if loglevel != 'INFO': + logObj.setLogLevel(loglevel) + + # Try to parse the setup.xml file + logObj.getlogger().debug("parse config file: {}". format(configFile)) + try: + xmlObj = XMLSetup(os.path.join(test_abspath, configFile)) + except XMLParser.ParseError as error: + # Error + execute_station_error("Error parsing {} file {}".format(configFile, error.msg)) + + # Try to connect to the DB, according to setup.xml configuration + if not psdbObj.open(xmlObj): + # Error + execute_station_error("Cannot open DB connection") + else: + logObj.getlogger().debug("database connection ok: {}".format(psdbObj.getConfig())) + + # get station name + globalVar.station = socket.gethostname() + logObj.getlogger().info("Test Station: {}".format(globalVar.station)) + + if psdbObj.get_setup_variable("AUTOTEST_" + globalVar.station, xmlObj.getKeyVal("test_main", "autotest", "0")) == '1': + AutoTest = True + + OverrideAutoReboot = str2bool(xmlObj.getKeyVal("test_main", "OverrideAutoReboot", "0")) + delay_reboot = xmlObj.getKeyVal("test_main", "delayReboot", "0") + + logObj.getlogger().info("AutoTest: {} OverrideAutoReboot: {} delay Reboot: {}".format(AutoTest, OverrideAutoReboot, delay_reboot)) + + # Check if current state is "WAIT_TEST_START". if not, waits here + if xmlObj.getKeyVal("test_main", "overrideCheckWaitStart", "0") == "0": + check_first_state() + else: + psdbObj.setDevelStationState(globalVar.station, "WAIT_TEST_START") + + # Change state to "TESTS_CHECKING_ENV" + set_next_state(StationStates.TESTS_CHECKING_ENV.name) + + # execute main program + execute_test() + + exit(0) |