"""Entry point of the board test station.

When run as a script this module:

1. parses ``setup.xml`` and opens the test-server database,
2. waits until the station is in the ``WAIT_TEST_START`` state,
3. checks that a barcode scanner and an amperimeter are reachable,
4. registers the board, runs its test suite and, if every test passed,
   runs the extra tasks (EEPROM / memory flashing, factory code).

NOTE(review): this file was reconstructed from a copy whose line breaks and
indentation had been lost. Nesting was restored from the syntax; the two
places where that was ambiguous carry their own NOTE(review) comment.
"""
from test.helpers.int_registers import get_die_id
from test.helpers.int_registers import get_internal_mac
from subprocess import call
import os
import sys
import unittest
from test.helpers.testsrv_db import TestSrv_Database
from test.helpers.setup_xml import XMLSetup
from test.runners.simple import SimpleTestRunner
from test.tests.qethernet import Qethernet
from test.tests.qram import Qram
from test.tests.qusb import Qusb
from test.tests.qeeprom import Qeeprom
from test.tests.qserial import Qserial
from test.tests.qwifi import Qwifi
from test.tests.qrtc import Qrtc
from test.tests.qduplex_ser import Qduplex
from test.tests.qamper import Qamper
from test.tests.qnand import Qnand
from test.tests.qaudio import Qaudio
from test.tests.qdmesg import Qdmesg
from test.helpers.globalVariables import globalVar
import socket
from test.helpers.iseelogger import ISEE_Logger
import logging
from test.tasks.flasheeprom import flash_eeprom
from test.tasks.flashmemory import flash_memory
from test.helpers.qrreader import QRReader
from test.helpers.cmdline import LinuxKernel
from test.helpers.amper import Amper
from test.enums.StationStates import StationStates
import time

# global variables
psdbObj = TestSrv_Database()
xmlObj = None
loggerObj = None
test_abspath = None
# tests that are not handed to the unittest runner; main() calls their
# execute() method directly once the suite has finished
tests_manually_executed = []

# Test definition name -> test case class added to the unittest suite.
_SUITE_TESTS = {
    "RAM": Qram,
    "SERIALDUAL": Qduplex,
    "EEPROM": Qeeprom,
    "SERIAL": Qserial,
    "RTC": Qrtc,
    "CONSUMPTION": Qamper,
    "ETHERNET": Qethernet,
    "NAND": Qnand,
    "WIFI": Qwifi,
    "USB": Qusb,
    "AUDIO": Qaudio,
    # NOTE(review): USBDUAL is mapped to the audio test, exactly as before.
    # This looks like a copy-paste slip -- confirm which class is intended.
    "USBDUAL": Qaudio,
}

# Test definition name -> test case class executed manually after the suite.
_MANUAL_TESTS = {
    "DMESG": Qdmesg,
}


def clear():
    """Clear the terminal using the command of the current operating system."""
    command = 'clear' if os.name == 'posix' else 'cls'
    call(command)


def create_paramslist(params):
    """Build a ``{name: value}`` dictionary from ``(name, value)`` rows.

    Later rows overwrite earlier rows that carry the same name.
    """
    return {varname: varvalue for varname, varvalue in params}


def add_test(suite, testdefname, paramlist):
    """Instantiate the test registered under ``testdefname`` and queue it.

    Most tests are added to ``suite``; the ones listed in ``_MANUAL_TESTS``
    are appended to ``tests_manually_executed`` instead.

    Returns 0 when the name is known and 1 when it is not (nothing is queued
    in that case).
    """
    suite_test = _SUITE_TESTS.get(testdefname)
    if suite_test is not None:
        suite.addTest(suite_test(testdefname, "execute", paramlist))
        return 0

    manual_test = _MANUAL_TESTS.get(testdefname)
    if manual_test is not None:
        tests_manually_executed.append(manual_test(testdefname, "execute", paramlist))
        return 0

    return 1


def create_testsuite():
    """Return a ``unittest.TestSuite`` with the tests the DB lists for this board.

    Every test receives its own DB parameters plus the shared context
    (test id, board uuid, test control id, XML setup and DB handle).
    Test definitions unknown to ``add_test`` are skipped.
    """
    suite1 = unittest.TestSuite()
    for testid, testdefname in psdbObj.get_tests_list(globalVar.g_uuid):
        paramlist = create_paramslist(psdbObj.get_test_params_list(testid))
        # shared context, added on top of the per-test parameters
        paramlist["testid"] = testid
        paramlist["boarduuid"] = globalVar.g_uuid
        paramlist["testidctl"] = globalVar.testid_ctl
        paramlist["xml"] = xmlObj
        paramlist["db"] = psdbObj
        add_test(suite1, testdefname, paramlist)
    return suite1


def create_board():
    """Register the board under test in the DB and store its uuid.

    Reads the ``bmodel`` and ``bvariant`` variables through ``LinuxKernel``
    (presumably the kernel command line -- confirm), sets ``globalVar.g_mid``
    to ``"<model>-<variant>"`` and ``globalVar.g_uuid`` to the value returned
    by the DB. Exits the process when the model/variant are missing or when
    the station name does not contain ``"Station"``.
    """
    cmd = LinuxKernel()
    model_id = cmd.getkvar("bmodel", "none")
    variant = cmd.getkvar("bvariant", "none")
    if model_id == "none" or variant == "none":
        # The console used to report a DB connection error here, which did
        # not match the real cause (nor the logged message).
        print("Error: Cannot get model and variant information")
        loggerObj.getlogger().info("Station error: #Cannot get model and variant information#")
        sys.exit(0)

    # get model id
    globalVar.g_mid = model_id + "-" + variant

    # get station
    if "Station" not in globalVar.station:
        print("Error: Wrong Station name")
        loggerObj.getlogger().info("Station error: #Wrong station name#")
        sys.exit(0)

    processor_id = get_die_id(globalVar.g_mid)
    print(globalVar.g_mid)
    print(processor_id)
    globalVar.g_uuid = psdbObj.create_board(processor_id, model_id, variant, globalVar.station,
                                            get_internal_mac(globalVar.g_mid))
    print(globalVar.g_uuid)


def get_taskvars_list(uuid):
    """Return the task variables stored for board ``uuid`` as a dictionary."""
    return {varname: varvalue for varname, varvalue in psdbObj.get_task_variables_list(uuid)}


def _change_station_state(new_state):
    """Move the station to ``new_state`` unless it is in ``STATION_ERROR``.

    ``new_state`` is a ``StationStates`` member. If the station is currently
    in the ``STATION_ERROR`` state the transition is refused and the process
    exits.
    """
    if psdbObj.read_station_state(globalVar.station) == StationStates.STATION_ERROR.name:
        # Abort
        print("Error: Wrong previous station state before changing to {} state. "
              "STATION_ERROR state unexpected.".format(new_state.name))
        sys.exit(0)
    psdbObj.change_station_state(globalVar.station, new_state.name)


def _autotest_enabled():
    """Return True when the ``AUTOTEST_<station>`` setup variable equals 1."""
    return int(psdbObj.get_setup_variable("AUTOTEST_" + globalVar.station)) == 1


def _register_factory_code():
    """Store the board factory code: a fake one in autotest mode, else a scanned one."""
    if _autotest_enabled():
        psdbObj.set_factorycode(globalVar.g_uuid, "1234567890")  # fake factory code
        return

    qrreceived = False
    while not qrreceived:
        qr = QRReader()
        if qr.openQR():
            # waits 5s to receive a valid code
            if qr.readQRasync(5):
                qrreceived = True
                psdbObj.set_factorycode(globalVar.g_uuid, qr.getQRNumber())
            # NOTE(review): the original nesting of this call was lost; it is
            # placed so the reader is closed every time it was opened.
            qr.closeQR()


def _run_extra_tasks():
    """Flash the EEPROM / memory if configured; return True when all tasks succeed.

    A task only runs when its path variable (``eeprompath`` /
    ``flashimagepath``) is present and non-empty; each executed task gets a
    result row in the DB. A task that does not run counts as successful.
    """
    # create task control
    globalVar.taskid_ctl = psdbObj.open_task(globalVar.g_uuid)
    # get extra variables
    varlist = get_taskvars_list(globalVar.g_uuid)

    # flash eeprom
    resulteeprom = 0
    eeprompath = varlist.get("eeprompath")
    if eeprompath:
        mac0 = psdbObj.get_board_macaddr(globalVar.g_uuid)
        resulteeprom, eepromdata = flash_eeprom(eeprompath, globalVar.g_uuid, mac0)
        psdbObj.create_task_result(globalVar.taskid_ctl, "FLASHEEPROM",
                                   "TASK_OK" if resulteeprom == 0 else "TASK_FAIL", eepromdata)

    # flash non-volatile memory
    resultmemory = 0
    flashimagepath = varlist.get("flashimagepath")
    if flashimagepath:
        resultmemory = flash_memory(flashimagepath)
        psdbObj.create_task_result(globalVar.taskid_ctl, "FLASHMEMORY",
                                   "TASK_OK" if resultmemory == 0 else "TASK_FAIL", flashimagepath)

    return resulteeprom == 0 and resultmemory == 0


def main():
    """Register the board, run its tests and, on success, the extra tasks.

    The station state is moved through TESTS_RUNNING, then either
    TESTS_FAILED or TESTS_OK -> EXTRATASKS_RUNNING -> WAITING_FOR_SCANNER ->
    FINISHED. Any transition attempted while the station is in STATION_ERROR
    terminates the process.
    """
    # initialize the board
    create_board()
    # create a process
    globalVar.testid_ctl = psdbObj.open_test(globalVar.g_uuid)

    _change_station_state(StationStates.TESTS_RUNNING)

    # generate suite and execute tests (round 1)
    suite1 = create_testsuite()
    testresult = SimpleTestRunner(psdbObj).run(suite1)

    # execute the tests that do not go through the runner
    for test in tests_manually_executed:
        test.execute()

    # execute additional tasks, only if the tests were successful
    if testresult.wasSuccessful():
        _change_station_state(StationStates.TESTS_OK)
        _change_station_state(StationStates.EXTRATASKS_RUNNING)

        if _run_extra_tasks():
            # finish tasks
            psdbObj.update_taskctl_status(globalVar.taskid_ctl, "TASK_BOARD_OK")
            _change_station_state(StationStates.WAITING_FOR_SCANNER)
            # get barcode using the scanner, only if autotest is disabled
            _register_factory_code()
            _change_station_state(StationStates.FINISHED)
        else:
            psdbObj.update_taskctl_status(globalVar.taskid_ctl, "TASK_BOARD_FAIL")
            loggerObj.getlogger().info("Station error: #Unable to complete extra tasks#")
    else:
        _change_station_state(StationStates.TESTS_FAILED)

    # reset board if AUTOTEST is enabled
    # NOTE(review): the original indentation was lost; this is assumed to run
    # after both the success and the failure path -- confirm.
    if _autotest_enabled():
        os.system('reboot')


if __name__ == "__main__":
    # Clear the shell screen
    clear()
    test_abspath = os.path.dirname(os.path.abspath(__file__))

    # create logger
    loggerObj = ISEE_Logger(logging.INFO)

    # Try to parse the setup.xml file
    try:
        xmlObj = XMLSetup(os.path.join(test_abspath, "setup.xml"))
    except Exception:
        # Abort. A bare "except:" would also intercept SystemExit and
        # KeyboardInterrupt, which must keep propagating.
        print("Error: Cannot parse setup.xml file")
        loggerObj.getlogger().info("Station error: #Cannot parse XML configuration file#")
        sys.exit(0)

    # Try to connect to the DB, according to setup.xml configuration
    if not psdbObj.open(xmlObj):
        print("Error: Cannot open DB connection")
        loggerObj.getlogger().info("Station error: #Cannot open DB connection#")
        sys.exit(0)

    # get station name
    globalVar.station = socket.gethostname()

    # Poll every 5 seconds until the station reaches "WAIT_TEST_START"
    while True:
        currentstate = psdbObj.read_station_state(globalVar.station)
        if currentstate == StationStates.WAIT_TEST_START.name:
            break
        print("Error: Wrong previous station state. WAIT_TEST_START state expected. "
              "Current state is: {}".format(currentstate))
        print("Waiting for WAIT_TEST_START state")
        time.sleep(5)

    # Change state to "TESTS_CHECKING_ENV"
    psdbObj.change_station_state(globalVar.station, StationStates.TESTS_CHECKING_ENV.name)

    # Look for a barcode scanner
    qr = QRReader()
    if qr.openQR():
        qr.closeQR()
    else:
        # Abort
        print("Error: Cannot find a barcode scanner.")
        loggerObj.getlogger().info("Station error: #Cannot find a barcode scanner#")
        sys.exit(0)

    # Look for an amperimeter
    amp = Amper()
    if not amp.open():
        # Abort
        print("Error: Cannot open an amperimeter COM port.")
        loggerObj.getlogger().info("Station error: #Cannot open an amperimeter COM port#")
        sys.exit(0)
    # the handshake is attempted twice before giving up
    if not amp.hello() and not amp.hello():
        # Abort
        print("Error: Cannot open find an amperimeter.")
        loggerObj.getlogger().info("Station error: #Cannot open find an amperimeter#")
        sys.exit(0)

    # Execute main
    main()
    sys.exit(0)