From d46bce422fd03cd57d1ba336361da17d6efb48db Mon Sep 17 00:00:00 2001 From: Manel Caro Date: Fri, 31 Jul 2020 02:07:37 +0200 Subject: TEST restructure --- test-cli/test_main.py | 372 +++++++++++++++++++++++++++++--------------------- 1 file changed, 220 insertions(+), 152 deletions(-) (limited to 'test-cli/test_main.py') diff --git a/test-cli/test_main.py b/test-cli/test_main.py index 942df61..3f52152 100644 --- a/test-cli/test_main.py +++ b/test-cli/test_main.py @@ -1,8 +1,17 @@ +import time +import sh +import os +import unittest +import socket +import logging +import sys +import getopt +import random + +from datetime import datetime from test.helpers.int_registers import get_die_id from test.helpers.int_registers import get_internal_mac from subprocess import call -import os -import unittest from test.helpers.testsrv_db import TestSrv_Database from test.helpers.setup_xml import XMLSetup from test.runners.simple import SimpleTestRunner @@ -11,7 +20,6 @@ from test.tests.qram import Qram from test.tests.qusb import Qusb from test.tests.qeeprom import Qeeprom from test.tests.qserial import Qserial -from test.tests.qusbdual import Qusbdual from test.tests.qwifi import Qwifi from test.tests.qrtc import Qrtc from test.tests.qduplex_ser import Qduplex @@ -19,25 +27,28 @@ from test.tests.qamper import Qamper from test.tests.qnand import Qnand from test.tests.qaudio import Qaudio from test.tests.qdmesg import Qdmesg +from test.tests.qmmcflash import Qmmcflash +from test.tests.qplc import Qplc from test.helpers.globalVariables import globalVar -import socket -from test.helpers.iseelogger import ISEE_Logger -import logging -from test.tasks.flasheeprom import flash_eeprom -from test.tasks.flashmemory import flash_memory +from test.tasks.flasheeprom import EEprom_Flasher +from test.tasks.flashmemory import NAND_Flasher from test.helpers.qrreader import QRReader -from test.helpers.cmdline import LinuxKernel +from test.helpers.cmdline import LinuxKernelCmd from test.helpers.amper import Amper from test.enums.StationStates import StationStates -import time -import sh +from test.helpers.mydebugger import sentdebugmessage +import xml.etree.ElementTree as XMLParser +from test.helpers.iseelogger import logObj +from test.helpers.utils import str2bool # global variables psdbObj = TestSrv_Database() xmlObj = None -loggerObj = None test_abspath = None tests_manually_executed = [] +AutoTest = False +OverrideAutoReboot = False +delay_reboot = '0' # define clear function @@ -45,74 +56,62 @@ def clear(): # check and make call for specific operating system _ = call('clear' if os.name == 'posix' else 'cls') +def reboot_board(): + logObj.getlogger().debug("#reboot board#") + if not OverrideAutoReboot: + set_next_state(StationStates.STATION_WAIT_SHUTDOWN.name) + if delay_reboot == '0': + time.sleep(5) + sh.shutdown('-r', '+{}'.format(delay_reboot), _bg=True) + # sh.reboot(_bg=True) + print("REBOOT...") + exit(0) -def execute_station_error(text): - print("Error: {}".format(text)) - loggerObj.getlogger().info("Station error: #{}#".format(text)) +def wait_for_reboot(): while True: currentstate = psdbObj.read_station_state(globalVar.station) if currentstate == StationStates.STATION_REBOOT.name: reboot_board() - time.sleep(1) + time.sleep(5) +def execute_station_error(text): + logObj.getlogger().error("Station error: #{}#".format(text)) + wait_for_reboot() def check_first_state(): currentstate = psdbObj.read_station_state(globalVar.station) - if currentstate == StationStates.STATION_ERROR.name: - while True: - currentstate = psdbObj.read_station_state(globalVar.station) - if currentstate == StationStates.STATION_REBOOT.name: - reboot_board() - time.sleep(1) - elif currentstate == StationStates.STATION_REBOOT.name: - reboot_board() + if currentstate == StationStates.STATION_ERROR.name or currentstate == StationStates.STATION_REBOOT.name: + logObj.getlogger().debug('station state during the first check is {}'.format(currentstate)) + wait_for_reboot() else: - starttime = time.time() + count_t = int(xmlObj.getKeyVal("test_main", "wait_test_start", "240")) while currentstate != StationStates.WAIT_TEST_START.name: - if time.time() - starttime >= 30.0: + logObj.getlogger().debug('wait for WAIT_TEST_START but read: {}'.format(currentstate)) + if count_t <= 0: # Error - execute_station_error("Timeout while waiting for WAIT_TEST_START state") + execute_station_error("Timeout while waiting for WAIT_TEST_START state actual STATE = {}".format(currentstate)) else: + count_t = count_t - 5 time.sleep(5) currentstate = psdbObj.read_station_state(globalVar.station) - if currentstate == StationStates.STATION_ERROR.name: - while True: - currentstate = psdbObj.read_station_state(globalVar.station) - if currentstate == StationStates.STATION_REBOOT.name: - reboot_board() - time.sleep(1) - elif currentstate == StationStates.STATION_REBOOT.name: - reboot_board() - - -def reboot_board(): - sh.reboot() - exit(0) + if currentstate == StationStates.STATION_ERROR.name or currentstate == StationStates.STATION_REBOOT.name: + wait_for_reboot() def set_next_state(newstate): statewritten = psdbObj.change_station_state(globalVar.station, newstate) - if statewritten == StationStates.STATION_REBOOT.name: - reboot_board() - elif statewritten == StationStates.STATION_ERROR.name: - while True: - currentstate = psdbObj.read_station_state(globalVar.station) - if currentstate == StationStates.STATION_REBOOT.name: - reboot_board() - time.sleep(1) - + if statewritten == StationStates.STATION_REBOOT.name or statewritten == StationStates.STATION_ERROR.name: + wait_for_reboot() def reboot_if_autotest(): # reset board if AUTOTEST is enabled - autotest = psdbObj.get_setup_variable("AUTOTEST_" + globalVar.station) - if int(autotest) == 1: + if AutoTest: reboot_board() def create_paramslist(params): paramlist = {} - for row in params: - varname, varvalue = row + for varname, varvalue in params: paramlist[varname] = varvalue return paramlist @@ -151,13 +150,17 @@ def add_test(suite, testdefname, paramlist): elif testdefname == "AUDIO": suite.addTest(Qaudio(testdefname, "execute", paramlist)) return 0 - elif testdefname == "USBLOOP": - suite.addTest(Qusbdual(testdefname, "execute", paramlist)) - return 0 elif testdefname == "DMESG": tests_manually_executed.append(Qdmesg(testdefname, "execute", paramlist)) return 0 + elif testdefname == "MMCFLASH": + suite.addTest(Qmmcflash(testdefname, "execute", paramlist)) + return 0 + elif testdefname == "PLC_MODEM": + suite.addTest(Qplc(testdefname, "execute", paramlist)) + return 0 else: + logObj.getlogger().error("Params for test: {} not implemented (ignore it), ".format(testdefname)) return 1 @@ -166,9 +169,9 @@ def create_testsuite(): suite1 = unittest.TestSuite() # get list of tests for this board tests = psdbObj.get_tests_list(globalVar.g_uuid) + logObj.getlogger().debug("Create TestSuite: {}".format(tests)) # loop in every test for this board - for row in tests: - testid, testdefname = row + for testid, testdefname in tests: # get params for this test params = psdbObj.get_test_params_list(testid) paramlist = create_paramslist(params) @@ -176,6 +179,8 @@ def create_testsuite(): paramlist["testid"] = testid paramlist["boarduuid"] = globalVar.g_uuid paramlist["testidctl"] = globalVar.testid_ctl + paramlist['testPath'] = test_abspath + logObj.getlogger().debug("Params for test: {}:{}-{}, ".format(testid, testdefname, paramlist)) paramlist["xml"] = xmlObj paramlist["db"] = psdbObj # add test to TestSuite @@ -185,44 +190,123 @@ def create_testsuite(): def create_board(): - cmd = LinuxKernel() + + cmd = LinuxKernelCmd() model_id = cmd.getkvar("bmodel", "none") variant = cmd.getkvar("bvariant", "none") + logObj.getlogger().debug("Kernel Command line vars: bmodel: {} bvariant: {}".format(model_id, variant)) if model_id == "none" or variant == "none": # Error - execute_station_error("Cannot get model and variant information") + execute_station_error("Cannot get model {} and variant {} information".format(model_id, variant)) # get model id globalVar.g_mid = model_id + "-" + variant + logObj.getlogger().debug("Model-Variant -> {}".format(globalVar.g_mid)) # get station if "Station" not in globalVar.station: # Error - execute_station_error("Wrong station name") + execute_station_error("Hostname not defined") processor_id = get_die_id(globalVar.g_mid) - print(globalVar.g_mid) - print(processor_id) globalVar.g_uuid = psdbObj.create_board(processor_id, model_id, variant, globalVar.station, get_internal_mac(globalVar.g_mid)) - print(globalVar.g_uuid) + + logObj.getlogger().info("processor ID: {} uuid: {}".format(processor_id, globalVar.g_uuid)) def get_taskvars_list(uuid): varlist = {} - for row in psdbObj.get_task_variables_list(uuid): - varname, varvalue = row + for varname, varvalue in psdbObj.get_task_variables_list(uuid): varlist[varname] = varvalue return varlist +def execute_extra_task(): + # list of task + myTask = [] + # Change state to "EXTRATASKS_RUNNING" + set_next_state(StationStates.EXTRATASKS_RUNNING.name) + # create task control + globalVar.taskid_ctl = psdbObj.open_task(globalVar.g_uuid) + # get extra variables + varlist = get_taskvars_list(globalVar.g_uuid) + + # Put default Vars + varlist["boarduuid"] = globalVar.g_uuid + varlist["testidctl"] = globalVar.testid_ctl + varlist['testPath'] = test_abspath + varlist["xml"] = xmlObj + varlist["db"] = psdbObj + + # Get list Extra Task and activation + eeprom = varlist.get('eeprom', '0') + flashNand = varlist.get('flashNAND', '0') + + if eeprom == '1': + myEEprom = EEprom_Flasher(varlist) + myTask.append(myEEprom) + if flashNand == '1': + myNAND = NAND_Flasher(varlist) + myTask.append(myNAND) + + taskresult = True + for task in myTask: + res, data = task.Execute() + if res: + psdbObj.create_task_result(globalVar.taskid_ctl, task.getTaskName(), "TASK_OK", data) + logObj.getlogger().info("task: {} ok".format(task.getTaskName())) + else: + psdbObj.create_task_result(globalVar.taskid_ctl, task.getTaskName(), "TASK_FAIL", data) + logObj.getlogger().info("task: {} failed".format(task.getTaskName())) + taskresult = False + + # check final taskresult => ok or fail + if taskresult: + psdbObj.update_taskctl_status(globalVar.taskid_ctl, "TASK_BOARD_OK") + else: + psdbObj.update_taskctl_status(globalVar.taskid_ctl, "TASK_BOARD_FAIL") + for task in myTask: + if task.getError() != '': + execute_station_error("Unable to complete extra tasks: {}".format(task.getError())) + # Not emit a error message + execute_station_error("Unable to complete extra tasks") + +def getBarCode(fake): + + set_next_state(StationStates.WAITING_FOR_SCANNER.name) + if fake: + psdbObj.set_factorycode(globalVar.g_uuid, str(random.randint(1, 1000001))) + return + qrreceived = False + wait_count = psdbObj.get_setup_variable("QRWAIT_SEC", xmlObj.getKeyVal("test_main", "qr_wait_time", "120")) + while not qrreceived or int(wait_count) > 0: + qr = QRReader() + if qr.openQR(): + # waits 5s to receive a valid code + if qr.readQRasync(10): + qrreceived = True + factorycode = qr.getQRNumber() + psdbObj.set_factorycode(globalVar.g_uuid, factorycode) + return + qr.closeQR() + else: + qrreceived = True + del qr + time.sleep(1) + wait_count = wait_count - 1 + if int(wait_count) == 0: + execute_station_error("Cannot find a barcode scanner") + -def main(): +def execute_test (): # initialize the board create_board() # create a process globalVar.testid_ctl = psdbObj.open_test(globalVar.g_uuid) + logObj.getlogger().info("Create new Test: {}".format(globalVar.testid_ctl)) + # Change state to "TESTS_RUNNING" set_next_state(StationStates.TESTS_RUNNING.name) @@ -230,71 +314,29 @@ def main(): suite1 = create_testsuite() # Execute tests (round 1) runner1 = SimpleTestRunner(psdbObj) + logObj.getlogger().info("Runner run id: {}".format(globalVar.testid_ctl)) testresult = runner1.run(suite1) + logObj.getlogger().info("Runner run id: {} finish {}".format(globalVar.testid_ctl, testresult)) # Execute manually tests for test in tests_manually_executed: - test.execute() + logObj.getlogger().info("Execute manual test id: {}:{}".format(globalVar.testid_ctl, test.getTestName())) + mtestResult = test.execute() + logObj.getlogger().info("Execute manual test id: {}:{} Result: {}".format(globalVar.testid_ctl, test.getTestName(), + 'ok' if mtestResult else 'fail')) # execute aditional tasks, only if the test was successful if testresult.wasSuccessful(): - # Change state to "TESTS_OK" + logObj.getlogger().info("Execute test id: {} finished succesfully".format(globalVar.testid_ctl)) set_next_state(StationStates.TESTS_OK.name) - # Change state to "EXTRATASKS_RUNNING" - set_next_state(StationStates.EXTRATASKS_RUNNING.name) - # create task control - globalVar.taskid_ctl = psdbObj.open_task(globalVar.g_uuid) - # get extra variables - varlist = get_taskvars_list(globalVar.g_uuid) - - alltasksok = False - - # flash eeprom - resulteeprom = 0 - if "eeprompath" in varlist and len(varlist["eeprompath"]) > 0: - mac0 = psdbObj.get_board_macaddr(globalVar.g_uuid) - resulteeprom, eepromdata = flash_eeprom(varlist["eeprompath"], globalVar.g_uuid, mac0) - psdbObj.create_task_result(globalVar.taskid_ctl, "FLASHEEPROM", - "TASK_OK" if resulteeprom == 0 else "TASK_FAIL", eepromdata) - - # flash non-volatile memory - resultmemory = 0 - if "flashimagepath" in varlist and len(varlist["flashimagepath"]) > 0: - resultmemory = flash_memory(varlist["flashimagepath"]) - psdbObj.create_task_result(globalVar.taskid_ctl, "FLASHMEMORY", - "TASK_OK" if resultmemory == 0 else "TASK_FAIL", - varlist["flashimagepath"]) - - # update status with the result - if resulteeprom == 0 and resultmemory == 0: - # finish tasks - psdbObj.update_taskctl_status(globalVar.taskid_ctl, "TASK_BOARD_OK") - - # Change state to "WAITING_FOR_SCANNER" - set_next_state(StationStates.WAITING_FOR_SCANNER.name) - - # get barcode using the scanner, only if autotest is disabled - autotest = psdbObj.get_setup_variable("AUTOTEST_" + globalVar.station) - if int(autotest) == 1: - psdbObj.set_factorycode(globalVar.g_uuid, "1234567890") # fake factory code - else: - qrreceived = False - while not qrreceived: - qr = QRReader() - if qr.openQR(): - # waits 5s to receive a valid code - if qr.readQRasync(5): - qrreceived = True - factorycode = qr.getQRNumber() - psdbObj.set_factorycode(globalVar.g_uuid, factorycode) - qr.closeQR() - - # Change state to "FINISHED" - set_next_state(StationStates.FINISHED.name) - else: - psdbObj.update_taskctl_status(globalVar.taskid_ctl, "TASK_BOARD_FAIL") - loggerObj.getlogger().info("Station error: #Unable to complete extra tasks#") + logObj.getlogger().info("Execute test id: {} extra task".format(globalVar.testid_ctl)) + execute_extra_task() + logObj.getlogger().info("Execute test id: {} wait read scanner".format(globalVar.testid_ctl)) + getBarCode(AutoTest) + set_next_state(StationStates.FINISHED.name) + logObj.getlogger().info("Execute test id: {} wait read scanner finish".format(globalVar.testid_ctl)) else: + logObj.getlogger().info("Execute test id: {} finished with one or more fails".format(globalVar.testid_ctl)) # Change state to "TESTS_FAILED" set_next_state(StationStates.TESTS_FAILED.name) @@ -302,28 +344,75 @@ def main(): reboot_if_autotest() +def enableFaulthandler(): + """ Enable faulthandler for all threads. + + If the faulthandler package is available, this function disables and then + re-enables fault handling for all threads (this is necessary to ensure any + new threads are handled correctly), and returns True. + + If faulthandler is not available, then returns False. + """ + try: + import faulthandler + # necessary to disable first or else new threads may not be handled. + faulthandler.disable() + faulthandler.enable(all_threads=True) + return True + except ImportError: + return False + if __name__ == "__main__": # Clear the shell screen clear() - test_abspath = os.path.dirname(os.path.abspath(__file__)) - # create logger - loggerObj = ISEE_Logger(logging.INFO) + enableFaulthandler() + + test_abspath = os.path.dirname(os.path.abspath(__file__)) + configFile = 'setup.xml' + loglevel = 'INFO' + logObj.getlogger().info("Test program Started : {}".format(datetime.now())) + try: + argv = sys.argv[1:] + opts, args = getopt.getopt(argv, 'c:l:', ['cfg', 'loglevel']) + for opt, args in opts: + if opt in ('-c', '--setup'): + configFile = args + if opt in ('-l', '--loglevel'): + loglevel = args + except getopt.GetoptError as err: + logObj.getlogger().error('#{}#'.format(err)) + + if loglevel != 'INFO': + logObj.setLogLevel(loglevel) # Try to parse the setup.xml file + logObj.getlogger().debug("parse config file: {}". format(configFile)) try: - xmlObj = XMLSetup(os.path.join(test_abspath, "setup.xml")) - except: + xmlObj = XMLSetup(os.path.join(test_abspath, configFile)) + except XMLParser.ParseError as error: # Error - execute_station_error("Cannot parse setup.xml file") + execute_station_error("Error parsing {} file {}".format(configFile, error.msg)) # Try to connect to the DB, according to setup.xml configuration if not psdbObj.open(xmlObj): # Error execute_station_error("Cannot open DB connection") + else: + logObj.getlogger().debug("database connection failed: {}".format(psdbObj.getConfig())) # get station name globalVar.station = socket.gethostname() + logObj.getlogger().info("Test Station: {}".format(globalVar.station)) + + if psdbObj.get_setup_variable("AUTOTEST_" + globalVar.station, xmlObj.getKeyVal("test_main", "autotest", "0")) == '1': + AutoTest = True + + OverrideAutoReboot = str2bool(xmlObj.getKeyVal("test_main", "OverrideAutoReboot", "0")) + delay_reboot = xmlObj.getKeyVal("test_main", "delayReboot", "0") + + logObj.getlogger().info("AutoTest: {} OverrideAutoReboot: {} delay Reboot: {}".format(AutoTest, OverrideAutoReboot, delay_reboot)) + # Check if current state is "WAIT_TEST_START". if not, waits here check_first_state() @@ -331,28 +420,7 @@ if __name__ == "__main__": # Change state to "TESTS_CHECKING_ENV" set_next_state(StationStates.TESTS_CHECKING_ENV.name) - # Wait before beginning - time.sleep(2) - - # Look for a barcode scanner - qr = QRReader() - if qr.openQR(): - qr.closeQR() - else: - # Error - execute_station_error("Cannot find a barcode scanner") - - # Look for an amperimeter - amp = Amper() - if not amp.open(): - # Error - execute_station_error("Cannot open an amperimeter COM port") - if not amp.hello(): - if not amp.hello(): - # Error - execute_station_error("Cannot open find an amperimeter") - - # Execute main - main() + # execute main program + execute_test() exit(0) -- cgit v1.1