import time
import sh
import os
import unittest
import socket
import logging
import sys
import getopt
import random
from datetime import datetime
from test.helpers.int_registers import get_die_id
from test.helpers.int_registers import get_internal_mac
from subprocess import call
from test.helpers.testsrv_db import TestSrv_Database
from test.helpers.setup_xml import XMLSetup
from test.runners.simple import SimpleTestRunner
from test.tests.qethernet import Qethernet
from test.tests.qram import Qram
from test.tests.qusb import Qusb
from test.tests.qeeprom import Qeeprom
from test.tests.qserial import Qserial
from test.tests.qwifi import Qwifi
from test.tests.qrtc import Qrtc
from test.tests.qduplex_ser import Qduplex
from test.tests.qamper import Qamper
from test.tests.qnand import Qnand
from test.tests.qaudio import Qaudio
from test.tests.qdmesg import Qdmesg
from test.tests.qmmcflash import Qmmcflash
from test.tests.qplc import Qplc
from test.tests.qvideo import Qvideo
from test.helpers.globalVariables import globalVar
from test.tasks.flasheeprom import EEprom_Flasher
from test.tasks.flashmemory import NAND_Flasher
from test.helpers.qrreader import QRReader
from test.helpers.cmdline import LinuxKernelCmd
from test.helpers.amper import Amper
from test.enums.StationStates import StationStates
from test.helpers.mydebugger import sentdebugmessage
import xml.etree.ElementTree as XMLParser
from test.helpers.iseelogger import logObj
from test.helpers.utils import str2bool

# global variables
psdbObj = TestSrv_Database()
xmlObj = None
test_abspath = None
tests_manually_executed = []
AutoTest = False
OverrideAutoReboot = False
delay_reboot = '0'

# Test definition names that are added to the unittest suite, mapped to the
# class that implements them. "DMESG" is intentionally absent: add_test()
# queues it in tests_manually_executed instead of the suite.
_SUITE_TESTS = {
    "RAM": Qram,
    "SERIALDUAL": Qduplex,
    "EEPROM": Qeeprom,
    "SERIAL": Qserial,
    "RTC": Qrtc,
    "CONSUMPTION": Qamper,
    "ETHERNET": Qethernet,
    "NAND": Qnand,
    "WIFI": Qwifi,
    "USB": Qusb,
    "AUDIO": Qaudio,
    "MMCFLASH": Qmmcflash,
    "PLC_MODEM": Qplc,
    "VIDEO": Qvideo,
}


# define clear function
def clear():
    """Clear the terminal by calling 'clear' on posix systems, 'cls' otherwise."""
    # check and make call for specific operating system
    command = 'clear' if os.name == 'posix' else 'cls'
    _ = call(command)


def reboot_board():
    """Schedule a reboot of the board and terminate this process.

    When OverrideAutoReboot is set, only the debug line is logged and the
    function returns to the caller.
    """
    # NOTE(review): the source lost its indentation; everything below the
    # debug line is assumed to be guarded by OverrideAutoReboot - confirm.
    logObj.getlogger().debug("#reboot board#")
    if not OverrideAutoReboot:
        set_next_state(StationStates.STATION_WAIT_SHUTDOWN.name)
        if delay_reboot == '0':
            # no shutdown delay configured: pause briefly before rebooting
            time.sleep(5)
        sh.shutdown('-r', '+{}'.format(delay_reboot), _bg=True)
        # sh.reboot(_bg=True)
        print("REBOOT...")
        # sys.exit instead of the site-provided exit() builtin
        sys.exit(0)


def wait_for_reboot():
    """Poll the station state every 5 seconds; reboot once it is STATION_REBOOT.

    The loop has no exit condition of its own: it only ends when
    reboot_board() terminates the process.
    """
    while True:
        currentstate = psdbObj.read_station_state(globalVar.station)
        if currentstate == StationStates.STATION_REBOOT.name:
            reboot_board()
        time.sleep(5)


def execute_station_error(text):
    """Log `text` as a station error and then wait for a reboot request."""
    logObj.getlogger().error("Station error: #{}#".format(text))
    wait_for_reboot()


def check_first_state():
    """Block until the station reaches WAIT_TEST_START.

    If the station is in STATION_ERROR or STATION_REBOOT (initially or while
    polling) the function hands over to wait_for_reboot(). The polling budget
    in seconds comes from the "wait_test_start" key (default "240"); once it
    is used up a station error is raised.
    """
    abort_states = (StationStates.STATION_ERROR.name, StationStates.STATION_REBOOT.name)
    currentstate = psdbObj.read_station_state(globalVar.station)
    if currentstate in abort_states:
        logObj.getlogger().debug('station state during the first check is {}'.format(currentstate))
        wait_for_reboot()
        return

    count_t = int(xmlObj.getKeyVal("test_main", "wait_test_start", "240"))
    while currentstate != StationStates.WAIT_TEST_START.name:
        logObj.getlogger().debug('wait for WAIT_TEST_START but read: {}'.format(currentstate))
        if count_t <= 0:
            # Error
            execute_station_error(
                "Timeout while waiting for WAIT_TEST_START state actual STATE = {}".format(currentstate))
        else:
            count_t = count_t - 5
        time.sleep(5)
        currentstate = psdbObj.read_station_state(globalVar.station)
        if currentstate in abort_states:
            wait_for_reboot()


def set_next_state(newstate):
    """Write `newstate` as the station state.

    If the value reported back by the database is STATION_REBOOT or
    STATION_ERROR, wait for a reboot instead of returning normally.
    """
    statewritten = psdbObj.change_station_state(globalVar.station, newstate)
    if statewritten in (StationStates.STATION_REBOOT.name, StationStates.STATION_ERROR.name):
        wait_for_reboot()


def reboot_if_autotest():
    """Reboot the board when AutoTest is enabled."""
    # reset board if AUTOTEST is enabled
    if AutoTest:
        reboot_board()


def create_paramslist(params):
    """Return a dict built from an iterable of (name, value) pairs."""
    return {varname: varvalue for varname, varvalue in params}


def add_test(suite, testdefname, paramlist):
    """Instantiate the test named `testdefname` and register it.

    "DMESG" is appended to tests_manually_executed; every other known name is
    added to `suite`. Returns 0 when the test was registered and 1 when the
    name is unknown (an error is logged and the test is ignored).
    """
    if testdefname == "DMESG":
        tests_manually_executed.append(Qdmesg(testdefname, "execute", paramlist))
        return 0

    test_class = _SUITE_TESTS.get(testdefname)
    if test_class is None:
        logObj.getlogger().error("Params for test: {} not implemented (ignore it), ".format(testdefname))
        return 1

    suite.addTest(test_class(testdefname, "execute", paramlist))
    return 0


def create_testsuite():
    """Build a unittest.TestSuite with the tests the database lists for this board."""
    # create an object TestSuite
    suite1 = unittest.TestSuite()
    # get list of tests for this board
    tests = psdbObj.get_tests_list(globalVar.g_uuid)
    logObj.getlogger().debug("Create TestSuite: {}".format(tests))
    # loop in every test for this board
    for testid, testdefname in tests:
        # get params for this test
        paramlist = create_paramslist(psdbObj.get_test_params_list(testid))
        # add the testid as parameter
        paramlist["testid"] = testid
        paramlist["boarduuid"] = globalVar.g_uuid
        paramlist["testidctl"] = globalVar.testid_ctl
        paramlist['testPath'] = test_abspath
        # logged before the xml/db objects are added to the dict
        logObj.getlogger().debug("Params for test: {}:{}-{}, ".format(testid, testdefname, paramlist))
        paramlist["xml"] = xmlObj
        paramlist["db"] = psdbObj
        # add test to TestSuite
        add_test(suite1, testdefname, paramlist)
    return suite1


def create_board():
    """Register the board in the database and store its ids in globalVar.

    Model and variant are read from the kernel command line variables
    "bmodel" and "bvariant". A station error is raised when either is
    missing or when the station name does not contain "Station".
    """
    cmd = LinuxKernelCmd()
    model_id = cmd.getkvar("bmodel", "none")
    variant = cmd.getkvar("bvariant", "none")
    logObj.getlogger().debug("Kernel Command line vars: bmodel: {} bvariant: {}".format(model_id, variant))
    if model_id == "none" or variant == "none":
        # Error
        execute_station_error("Cannot get model {} and variant {} information".format(model_id, variant))
    # get model id
    globalVar.g_mid = model_id + "-" + variant
    logObj.getlogger().debug("Model-Variant -> {}".format(globalVar.g_mid))
    # get station
    if "Station" not in globalVar.station:
        # Error
        execute_station_error("Hostname not defined")
    processor_id = get_die_id(globalVar.g_mid)
    globalVar.g_uuid = psdbObj.create_board(processor_id, model_id, variant, globalVar.station,
                                            get_internal_mac(globalVar.g_mid))
    logObj.getlogger().info("processor ID: {} uuid: {}".format(processor_id, globalVar.g_uuid))


def get_taskvars_list(uuid):
    """Return the task variables stored for board `uuid` as a dict."""
    return {varname: varvalue for varname, varvalue in psdbObj.get_task_variables_list(uuid)}


def execute_extra_task():
    """Run the extra tasks enabled for this board and record their results.

    The task variables 'eeprom' and 'flashNAND' (value '1') enable the EEPROM
    and NAND flashers. Each task result is stored in the database; if any
    task fails the control record is marked TASK_BOARD_FAIL and a station
    error is raised.
    """
    # list of task
    myTask = []
    # Change state to "EXTRATASKS_RUNNING"
    set_next_state(StationStates.EXTRATASKS_RUNNING.name)
    # create task control
    globalVar.taskid_ctl = psdbObj.open_task(globalVar.g_uuid)
    # get extra variables
    varlist = get_taskvars_list(globalVar.g_uuid)
    # Put default Vars
    varlist["boarduuid"] = globalVar.g_uuid
    varlist["testidctl"] = globalVar.testid_ctl
    varlist['testPath'] = test_abspath
    varlist["xml"] = xmlObj
    varlist["db"] = psdbObj
    # Get list Extra Task and activation
    if varlist.get('eeprom', '0') == '1':
        myTask.append(EEprom_Flasher(varlist))
    if varlist.get('flashNAND', '0') == '1':
        myTask.append(NAND_Flasher(varlist))

    taskresult = True
    for task in myTask:
        res, data = task.Execute()
        if res:
            psdbObj.create_task_result(globalVar.taskid_ctl, task.getTaskName(), "TASK_OK", data)
            logObj.getlogger().info("task: {} ok".format(task.getTaskName()))
        else:
            psdbObj.create_task_result(globalVar.taskid_ctl, task.getTaskName(), "TASK_FAIL", data)
            logObj.getlogger().info("task: {} failed".format(task.getTaskName()))
            taskresult = False

    # check final taskresult => ok or fail
    if taskresult:
        psdbObj.update_taskctl_status(globalVar.taskid_ctl, "TASK_BOARD_OK")
        return

    psdbObj.update_taskctl_status(globalVar.taskid_ctl, "TASK_BOARD_FAIL")
    for task in myTask:
        if task.getError() != '':
            execute_station_error("Unable to complete extra tasks: {}".format(task.getError()))
    # No task emitted an error message
    execute_station_error("Unable to complete extra tasks")


def getBarCode(fake):
    """Store the board factory code, read from the QR/barcode scanner.

    With `fake` true a random number is stored instead and no scanner is
    used. Otherwise the scanner is polled until a code is read. The
    "qr_wait_time" budget (default "120") only ends the wait once opening
    the scanner has failed at least once; in that case a station error is
    raised.
    """
    set_next_state(StationStates.WAITING_FOR_SCANNER.name)
    if fake:
        psdbObj.set_factorycode(globalVar.g_uuid, str(random.randint(1, 1000001)))
        return

    # The setup value is textual; convert it once so the countdown below is
    # plain integer arithmetic (the original subtracted 1 from the raw value).
    wait_count = int(psdbObj.get_setup_variable("test_main",
                                                xmlObj.getKeyVal("test_main", "qr_wait_time", "120")))
    # NOTE(review): despite its name, this flag is only set when the scanner
    # could NOT be opened; it is what allows the countdown to end the loop.
    qrreceived = False
    while not qrreceived or wait_count > 0:
        qr = QRReader()
        if qr.openQR():
            # NOTE(review): the original comment said "waits 5s" while the
            # argument is 10 - confirm the unit against QRReader.
            if qr.readQRasync(10):
                psdbObj.set_factorycode(globalVar.g_uuid, qr.getQRNumber())
                return
            qr.closeQR()
        else:
            qrreceived = True
        del qr
        time.sleep(1)
        wait_count = wait_count - 1

    # The counter may have gone below zero while a scanner was still present,
    # so test "<= 0" rather than "== 0" to avoid returning silently.
    if wait_count <= 0:
        execute_station_error("Cannot find a barcode scanner")


def execute_test():
    """Run the complete test sequence for the board.

    Registers the board, opens a test record, runs the unittest suite and the
    manually executed tests, then either continues with the extra tasks and
    the barcode step (suite successful) or marks the station TESTS_FAILED.
    Finally reboots the board when AutoTest is enabled.
    """
    # initialize the board
    create_board()
    # create a process
    globalVar.testid_ctl = psdbObj.open_test(globalVar.g_uuid)
    logObj.getlogger().info("Create new Test: {}".format(globalVar.testid_ctl))
    # Change state to "TESTS_RUNNING"
    set_next_state(StationStates.TESTS_RUNNING.name)
    # generate suits
    suite1 = create_testsuite()
    # Execute tests (round 1)
    runner1 = SimpleTestRunner(psdbObj)
    logObj.getlogger().info("Runner run id: {}".format(globalVar.testid_ctl))
    testresult = runner1.run(suite1)
    logObj.getlogger().info("Runner run id: {} finish {}".format(globalVar.testid_ctl, testresult))

    # Execute manually tests; their result is logged but does not change
    # the outcome of the suite evaluated below
    for test in tests_manually_executed:
        logObj.getlogger().info("Execute manual test id: {}:{}".format(globalVar.testid_ctl, test.getTestName()))
        mtestResult = test.execute()
        logObj.getlogger().info("Execute manual test id: {}:{} Result: {}".format(
            globalVar.testid_ctl, test.getTestName(), 'ok' if mtestResult else 'fail'))

    # execute additional tasks, only if the test was successful
    if testresult.wasSuccessful():
        logObj.getlogger().info("Execute test id: {} finished succesfully".format(globalVar.testid_ctl))
        set_next_state(StationStates.TESTS_OK.name)
        logObj.getlogger().info("Execute test id: {} extra task".format(globalVar.testid_ctl))
        execute_extra_task()
        logObj.getlogger().info("Execute test id: {} wait read scanner".format(globalVar.testid_ctl))
        getBarCode(AutoTest)
        set_next_state(StationStates.FINISHED.name)
        logObj.getlogger().info("Execute test id: {} wait read scanner finish".format(globalVar.testid_ctl))
    else:
        logObj.getlogger().info("Execute test id: {} finished with one or more fails".format(globalVar.testid_ctl))
        # Change state to "TESTS_FAILED"
        set_next_state(StationStates.TESTS_FAILED.name)

    # reset board if AUTOTEST is enabled
    reboot_if_autotest()


def enableFaulthandler():
    """ Enable faulthandler for all threads.

    If the faulthandler package is available, this function disables and then
    re-enables fault handling for all threads (this is necessary to ensure any
    new threads are handled correctly), and returns True.

    If faulthandler is not available, then returns False.
    """
    try:
        import faulthandler
    except ImportError:
        return False
    # necessary to disable first or else new threads may not be handled.
    faulthandler.disable()
    faulthandler.enable(all_threads=True)
    return True


if __name__ == "__main__":
    # Clear the shell screen
    clear()
    enableFaulthandler()

    test_abspath = os.path.dirname(os.path.abspath(__file__))
    configFile = 'setup.xml'
    loglevel = 'INFO'
    logObj.getlogger().info("Test program Started : {}".format(datetime.now()))

    try:
        argv = sys.argv[1:]
        # Long options need a trailing '=' to accept a value. Both '--cfg'
        # (previously declared) and '--setup' (previously tested) are accepted.
        opts, args = getopt.getopt(argv, 'c:l:', ['cfg=', 'setup=', 'loglevel='])
        for opt, value in opts:
            if opt in ('-c', '--cfg', '--setup'):
                configFile = value
            elif opt in ('-l', '--loglevel'):
                loglevel = value
    except getopt.GetoptError as err:
        # invalid options are reported and the defaults are kept
        logObj.getlogger().error('#{}#'.format(err))

    if loglevel != 'INFO':
        logObj.setLogLevel(loglevel)

    # Try to parse the setup.xml file
    logObj.getlogger().debug("parse config file: {}".format(configFile))
    try:
        xmlObj = XMLSetup(os.path.join(test_abspath, configFile))
    except XMLParser.ParseError as error:
        # Error
        execute_station_error("Error parsing {} file {}".format(configFile, error.msg))

    # Try to connect to the DB, according to setup.xml configuration
    if not psdbObj.open(xmlObj):
        # Error
        execute_station_error("Cannot open DB connection")
    else:
        # success branch: the connection is open (the message used to say "failed")
        logObj.getlogger().debug("database connection opened: {}".format(psdbObj.getConfig()))

    # get station name
    globalVar.station = socket.gethostname()
    logObj.getlogger().info("Test Station: {}".format(globalVar.station))

    if psdbObj.get_setup_variable("AUTOTEST_" + globalVar.station,
                                  xmlObj.getKeyVal("test_main", "autotest", "0")) == '1':
        AutoTest = True
    OverrideAutoReboot = str2bool(xmlObj.getKeyVal("test_main", "OverrideAutoReboot", "0"))
    delay_reboot = xmlObj.getKeyVal("test_main", "delayReboot", "0")
    logObj.getlogger().info("AutoTest: {} OverrideAutoReboot: {} delay Reboot: {}".format(
        AutoTest, OverrideAutoReboot, delay_reboot))

    # Check if current state is "WAIT_TEST_START". if not, waits here
    check_first_state()
    # Change state to "TESTS_CHECKING_ENV"
    set_next_state(StationStates.TESTS_CHECKING_ENV.name)
    # execute main program
    execute_test()
    sys.exit(0)