summaryrefslogtreecommitdiff
path: root/test-cli/test_main.py
diff options
context:
space:
mode:
authorManel Caro <mcaro@iatec.biz>2021-11-06 16:28:38 +0100
committerManel Caro <mcaro@iatec.biz>2021-11-06 16:28:38 +0100
commitcf19bfe18cbd283b188a858ee1629f9909c924f4 (patch)
tree1efb23519727130058401df090ab1b5f4cc8ba99 /test-cli/test_main.py
parentb6932fbaf898724ae87c29f8965621610f377084 (diff)
parentd5b273a3b58a250742049df4ca0ef0ba54f53d33 (diff)
downloadboard-sopa-test.zip
board-sopa-test.tar.gz
board-sopa-test.tar.bz2
Merge branch 'sopa-test'rel.0.1sopa-test
Diffstat (limited to 'test-cli/test_main.py')
-rw-r--r--test-cli/test_main.py495
1 files changed, 404 insertions, 91 deletions
diff --git a/test-cli/test_main.py b/test-cli/test_main.py
index 3c4d1cb..1f5d1e4 100644
--- a/test-cli/test_main.py
+++ b/test-cli/test_main.py
@@ -1,118 +1,431 @@
-from test.helpers.get_dieid import genDieid
-from subprocess import call
-import xml.etree.ElementTree as XMLParser
-import errno
-import sys
+import time
+import sh
import os
import unittest
+import socket
+import logging
+import sys
+import getopt
+import random
+
+from datetime import datetime
+from test.helpers.int_registers import get_die_id
+from test.helpers.int_registers import get_internal_mac
+from subprocess import call
from test.helpers.testsrv_db import TestSrv_Database
+from test.helpers.setup_xml import XMLSetup
from test.runners.simple import SimpleTestRunner
-from test.tests.qbutton import Qbutton
-from test.helpers.syscmd import TestSysCommand
-from test.helpers.syscmd import SysCommand
-from test.tests.qiperf import QIperf
from test.tests.qethernet import Qethernet
-from test.tests.qaudio import Qaudio
from test.tests.qram import Qram
from test.tests.qusb import Qusb
-from test.tests.qi2c import Qi2c
from test.tests.qeeprom import Qeeprom
from test.tests.qserial import Qserial
-from test.tests.qscreen import Qscreen
from test.tests.qwifi import Qwifi
from test.tests.qrtc import Qrtc
from test.tests.qduplex_ser import Qduplex
-from test.tests.qamp import Qamp
-from test.tests.qflash import Qflasher
-from test.helpers.finisher import Finisher
+from test.tests.qamper import Qamper
+from test.tests.qnand import Qnand
+from test.tests.qaudio import Qaudio
+from test.tests.qdmesg import Qdmesg
+from test.tests.qmmcflash import Qmmcflash
+from test.tests.qplc import Qplc
+from test.tests.qvideo import Qvideo
from test.helpers.globalVariables import globalVar
+from test.tasks.flasheeprom import EEprom_Flasher
+from test.tasks.flashmemory import NAND_Flasher
+from test.helpers.qrreader import QRReader
+from test.helpers.cmdline import LinuxKernelCmd
+from test.helpers.amper import Amper
+from test.enums.StationStates import StationStates
+import xml.etree.ElementTree as XMLParser
+from test.helpers.iseelogger import logObj
+from test.helpers.utils import str2bool
+
+# global variables
+psdbObj = TestSrv_Database()
+xmlObj = None
+test_abspath = None
+tests_manually_executed = []
+AutoTest = False
+OverrideAutoReboot = False
+delay_reboot = '0'
+
# define clear function
def clear():
# check and make call for specific operating system
- _ = call('clear' if os.name =='posix' else 'cls')
+ _ = call('clear' if os.name == 'posix' else 'cls')
+def reboot_board():
+ logObj.getlogger().debug("#reboot board#")
+ if not OverrideAutoReboot:
+ set_next_state(StationStates.STATION_WAIT_SHUTDOWN.name)
+ if delay_reboot == '0':
+ time.sleep(5)
+ sh.shutdown('-r', '+{}'.format(delay_reboot), _bg=True)
+ # sh.reboot(_bg=True)
+ print("REBOOT...")
+ exit(0)
-def create_board():
- psdb = TestSrv_Database()
- psdb.open("setup.xml")
- tree = XMLParser.parse('setup.xml')
- root = tree.getroot()
- suite = unittest.TestSuite()
- for element in root.iter('board'):
- # print(str(element.tag) + str(element.attrib))
- model_id = element.attrib['model']
- variant = element.attrib['variant']
- nstation = element.attrib['station']
- globalVar.g_mid=model_id + "-" + variant
- globalVar.station=nstation
- processor_id=genDieid(globalVar.g_mid)
- print(globalVar.g_mid)
- print(processor_id)
- globalVar.g_uuid = psdb.create_board(processor_id, model_id, variant, bmac = None)
-
-def testsuite():
- psdb=TestSrv_Database()
- psdb.open("setup.xml")
- suite = unittest.TestSuite()
- tests=psdb.getboard_comp_test_list(globalVar.g_uuid)
- for i in range(len(tests)):
- #newstr = oldstr.replace("M", "")
- variables=str(tests[i][0]).split(",")
- testname=variables[0].replace('(', '')
- testdes=variables[1]
- testfunc=variables[2]
- if len(tests)>2:
- testparam=variables[3].replace(')', '')
- testparam = testparam.replace('"', '')
- testparam = testparam.replace(';', "','")
- if testparam == "":
- command = "suite.addTest({}('{}','execute'))".format(testfunc, testname)
+def wait_for_reboot():
+ while True:
+ currentstate = psdbObj.read_station_state(globalVar.station)
+ if currentstate == StationStates.STATION_REBOOT.name:
+ reboot_board()
+ time.sleep(5)
+
+def execute_station_error(text):
+ logObj.getlogger().error("Station error: #{}#".format(text))
+ wait_for_reboot()
+
+def check_first_state():
+ currentstate = psdbObj.read_station_state(globalVar.station)
+ if currentstate == StationStates.STATION_ERROR.name or currentstate == StationStates.STATION_REBOOT.name:
+ logObj.getlogger().debug('station state during the first check is {}'.format(currentstate))
+ wait_for_reboot()
+ else:
+ count_t = int(xmlObj.getKeyVal("test_main", "wait_test_start", "240"))
+ while currentstate != StationStates.WAIT_TEST_START.name:
+ logObj.getlogger().debug('wait for WAIT_TEST_START but read: {}'.format(currentstate))
+ if count_t <= 0:
+ # Error
+ execute_station_error("Timeout while waiting for WAIT_TEST_START state actual STATE = {}".format(currentstate))
else:
- command="suite.addTest({}('{}','execute','{}'))".format(testfunc,testname,testparam)
+ count_t = count_t - 5
+ time.sleep(5)
+ currentstate = psdbObj.read_station_state(globalVar.station)
+ if currentstate == StationStates.STATION_ERROR.name or currentstate == StationStates.STATION_REBOOT.name:
+ wait_for_reboot()
+
+
+def set_next_state(newstate):
+ statewritten = psdbObj.change_station_state(globalVar.station, newstate)
+ if statewritten == StationStates.STATION_REBOOT.name or statewritten == StationStates.STATION_ERROR.name:
+ wait_for_reboot()
+
+def reboot_if_autotest():
+ # reset board if AUTOTEST is enabled
+ if AutoTest:
+ reboot_board()
+
+
+def create_paramslist(params):
+ paramlist = {}
+ for varname, varvalue in params:
+ paramlist[varname] = varvalue
+ return paramlist
+
+
+def add_test(suite, testdefname, paramlist):
+ if testdefname == "RAM":
+ suite.addTest(Qram(testdefname, "execute", paramlist))
+ return 0
+ elif testdefname == "SERIALDUAL":
+ suite.addTest(Qduplex(testdefname, "execute", paramlist))
+ return 0
+ elif testdefname == "EEPROM":
+ suite.addTest(Qeeprom(testdefname, "execute", paramlist))
+ return 0
+ elif testdefname == "SERIAL":
+ suite.addTest(Qserial(testdefname, "execute", paramlist))
+ return 0
+ elif testdefname == "RTC":
+ suite.addTest(Qrtc(testdefname, "execute", paramlist))
+ return 0
+ elif testdefname == "CONSUMPTION":
+ suite.addTest(Qamper(testdefname, "execute", paramlist))
+ return 0
+ elif testdefname == "ETHERNET":
+ suite.addTest(Qethernet(testdefname, "execute", paramlist))
+ return 0
+ elif testdefname == "NAND":
+ suite.addTest(Qnand(testdefname, "execute", paramlist))
+ return 0
+ elif testdefname == "WIFI":
+ suite.addTest(Qwifi(testdefname, "execute", paramlist))
+ return 0
+ elif testdefname == "USB":
+ suite.addTest(Qusb(testdefname, "execute", paramlist))
+ return 0
+ elif testdefname == "AUDIO":
+ suite.addTest(Qaudio(testdefname, "execute", paramlist))
+ return 0
+ elif testdefname == "DMESG":
+ tests_manually_executed.append(Qdmesg(testdefname, "execute", paramlist))
+ return 0
+ elif testdefname == "MMCFLASH":
+ suite.addTest(Qmmcflash(testdefname, "execute", paramlist))
+ return 0
+ elif testdefname == "PLC_MODEM":
+ suite.addTest(Qplc(testdefname, "execute", paramlist))
+ return 0
+ elif testdefname == "VIDEO":
+ suite.addTest(Qvideo(testdefname, "execute", paramlist))
+ return 0
+ else:
+ logObj.getlogger().error("Params for test: {} not implemented (ignore it), ".format(testdefname))
+ return 1
+
+
+def create_testsuite():
+ # create an object TestSuite
+ suite1 = unittest.TestSuite()
+ # get list of tests for this board
+ tests = psdbObj.get_tests_list(globalVar.g_uuid)
+ logObj.getlogger().debug("Create TestSuite: {}".format(tests))
+ # loop in every test for this board
+ for testid, testdefname in tests:
+ # get params for this test
+ params = psdbObj.get_test_params_list(testid)
+ paramlist = create_paramslist(params)
+ # add the testid as parameter
+ paramlist["testid"] = testid
+ paramlist["boarduuid"] = globalVar.g_uuid
+ paramlist["testidctl"] = globalVar.testid_ctl
+ paramlist['testPath'] = test_abspath
+ logObj.getlogger().debug("Params for test: {}:{}-{}, ".format(testid, testdefname, paramlist))
+ paramlist["xml"] = xmlObj
+ paramlist["db"] = psdbObj
+ # add test to TestSuite
+ add_test(suite1, testdefname, paramlist)
+
+ return suite1
+
+
+def create_board():
+
+ cmd = LinuxKernelCmd()
+ model_id = cmd.getkvar("bmodel", "none")
+ variant = cmd.getkvar("bvariant", "none")
+ logObj.getlogger().debug("Kernel Command line vars: bmodel: {} bvariant: {}".format(model_id, variant))
+
+ if model_id == "none" or variant == "none":
+ # Error
+ execute_station_error("Cannot get model {} and variant {} information".format(model_id, variant))
+
+ # get model id
+ globalVar.g_mid = model_id + "-" + variant
+ logObj.getlogger().debug("Model-Variant -> {}".format(globalVar.g_mid))
+
+ # get station
+ if "Station" not in globalVar.station:
+ # Error
+ execute_station_error("Hostname not defined")
+
+ processor_id = get_die_id(globalVar.g_mid)
+ globalVar.g_uuid = psdbObj.create_board(processor_id, model_id, variant, globalVar.station,
+ get_internal_mac(globalVar.g_mid))
+
+ logObj.getlogger().info("processor ID: {} uuid: {}".format(processor_id, globalVar.g_uuid))
+
+
+def get_taskvars_list(uuid):
+ varlist = {}
+ for varname, varvalue in psdbObj.get_task_variables_list(uuid):
+ varlist[varname] = varvalue
+ return varlist
+
+def execute_extra_task():
+ # list of task
+ myTask = []
+ # Change state to "EXTRATASKS_RUNNING"
+ set_next_state(StationStates.EXTRATASKS_RUNNING.name)
+ # create task control
+ globalVar.taskid_ctl = psdbObj.open_task(globalVar.g_uuid)
+ # get extra variables
+ varlist = get_taskvars_list(globalVar.g_uuid)
+
+ # Put default Vars
+ varlist["boarduuid"] = globalVar.g_uuid
+ varlist["testidctl"] = globalVar.testid_ctl
+ varlist['testPath'] = test_abspath
+ varlist["xml"] = xmlObj
+ varlist["db"] = psdbObj
+
+ # Get list Extra Task and activation
+ eeprom = varlist.get('eeprom', '0')
+ flashNand = varlist.get('flashNAND', '0')
+
+ if eeprom == '1':
+ myEEprom = EEprom_Flasher(varlist)
+ myTask.append(myEEprom)
+ if flashNand == '1':
+ myNAND = NAND_Flasher(varlist)
+ myTask.append(myNAND)
+
+ taskresult = True
+ for task in myTask:
+ res, data = task.Execute()
+ if res:
+ psdbObj.create_task_result(globalVar.taskid_ctl, task.getTaskName(), "TASK_OK", data)
+ logObj.getlogger().info("task: {} ok".format(task.getTaskName()))
else:
- print(testname)
- command = "suite.addTest({}('{}','execute'))".format(testfunc, testname)
- exec(command)
- globalVar.testid_ctl=psdb.open_testbatch(globalVar.g_uuid)
- return suite
-
-
-def finish_test():
- psdb = TestSrv_Database()
- psdb.open("setup.xml")
- auxs = psdb.close_testbatch(globalVar.g_uuid, globalVar.testid_ctl)
- globalVar.fstatus = auxs[0][0]
- # Burn eeprom struct
- psdb = TestSrv_Database()
- psdb.open("setup.xml")
- # We should call getboard_eeprom only if test was ok
- if globalVar.fstatus:
- aux = psdb.getboard_eeprom(globalVar.g_uuid)
- finish = Finisher(aux)
- finish.end_ok()
+ psdbObj.create_task_result(globalVar.taskid_ctl, task.getTaskName(), "TASK_FAIL", data)
+ logObj.getlogger().info("task: {} failed".format(task.getTaskName()))
+ taskresult = False
+
+ # check final taskresult => ok or fail
+ if taskresult:
+ psdbObj.update_taskctl_status(globalVar.taskid_ctl, "TASK_BOARD_OK")
else:
- finish = Finisher(globalVar.g_uuid)
- finish.end_fail()
- # Update set_test current_test with 'END' so that it finally gets painted in green
- psdb = TestSrv_Database()
- psdb.open("setup.xml")
- psdb.update_set_test_row(globalVar.station, globalVar.testid_ctl, globalVar.g_uuid, "END","FINISH")
-
-def main():
- #addtesttomodel()
- #addtestdef()
+ psdbObj.update_taskctl_status(globalVar.taskid_ctl, "TASK_BOARD_FAIL")
+ for task in myTask:
+ if task.getError() != '':
+ execute_station_error("Unable to complete extra tasks: {}".format(task.getError()))
+ # Not emit a error message
+ execute_station_error("Unable to complete extra tasks")
+
+def getBarCode(fake):
+
+ set_next_state(StationStates.WAITING_FOR_SCANNER.name)
+ if fake:
+ psdbObj.set_factorycode(globalVar.g_uuid, str(random.randint(1, 1000001)))
+ return
+ qrreceived = False
+ wait_count = psdbObj.get_setup_variable("test_main", xmlObj.getKeyVal("test_main", "qr_wait_time", "120"))
+ while not qrreceived or int(wait_count) > 0:
+ qr = QRReader()
+ if qr.openQR():
+ # waits 5s to receive a valid code
+ if qr.readQRasync(10):
+ qrreceived = True
+ factorycode = qr.getQRNumber()
+ psdbObj.set_factorycode(globalVar.g_uuid, factorycode)
+ return
+ qr.closeQR()
+ else:
+ qrreceived = True
+ del qr
+ time.sleep(1)
+ wait_count = wait_count - 1
+ if int(wait_count) == 0:
+ execute_station_error("Cannot find a barcode scanner")
+
+
+def execute_test ():
+ # initialize the board
create_board()
- #globalVar.g_uuid = "1f59c654-0cc6-11e8-8d51-e644f56b8edd"
- try:
- os.remove("test_results.dat")
- except:
- pass
- runner = SimpleTestRunner()
- runner.run(testsuite())
- finish_test()
+ # create a process
+ globalVar.testid_ctl = psdbObj.open_test(globalVar.g_uuid)
+ logObj.getlogger().info("Create new Test: {}".format(globalVar.testid_ctl))
+
+ # Change state to "TESTS_RUNNING"
+ set_next_state(StationStates.TESTS_RUNNING.name)
+
+ # generate suits
+ suite1 = create_testsuite()
+ # Execute tests (round 1)
+ runner1 = SimpleTestRunner(psdbObj)
+ logObj.getlogger().info("Runner run id: {}".format(globalVar.testid_ctl))
+ testresult = runner1.run(suite1)
+ logObj.getlogger().info("Runner run id: {} finish {}".format(globalVar.testid_ctl, testresult))
+
+ # Execute manually tests
+ for test in tests_manually_executed:
+ logObj.getlogger().info("Execute manual test id: {}:{}".format(globalVar.testid_ctl, test.getTestName()))
+ mtestResult = test.execute()
+ logObj.getlogger().info("Execute manual test id: {}:{} Result: {}".format(globalVar.testid_ctl, test.getTestName(),
+ 'ok' if mtestResult else 'fail'))
+
+ # execute aditional tasks, only if the test was successful
+ if testresult.wasSuccessful():
+ logObj.getlogger().info("Execute test id: {} finished succesfully".format(globalVar.testid_ctl))
+ set_next_state(StationStates.TESTS_OK.name)
+ logObj.getlogger().info("Execute test id: {} extra task".format(globalVar.testid_ctl))
+ execute_extra_task()
+ logObj.getlogger().info("Execute test id: {} wait read scanner".format(globalVar.testid_ctl))
+ getBarCode(AutoTest)
+ set_next_state(StationStates.FINISHED.name)
+ logObj.getlogger().info("Execute test id: {} wait read scanner finish".format(globalVar.testid_ctl))
+ else:
+ logObj.getlogger().info("Execute test id: {} finished with one or more fails".format(globalVar.testid_ctl))
+ # Change state to "TESTS_FAILED"
+ set_next_state(StationStates.TESTS_FAILED.name)
+
+ # reset board if AUTOTEST is enabled
+ reboot_if_autotest()
+
+
+def enableFaulthandler():
+ """ Enable faulthandler for all threads.
+
+ If the faulthandler package is available, this function disables and then
+ re-enables fault handling for all threads (this is necessary to ensure any
+ new threads are handled correctly), and returns True.
+
+ If faulthandler is not available, then returns False.
+ """
+ try:
+ import faulthandler
+ # necessary to disable first or else new threads may not be handled.
+ faulthandler.disable()
+ faulthandler.enable(all_threads=True)
+ return True
+ except ImportError:
+ return False
if __name__ == "__main__":
+ # Clear the shell screen
clear()
- main()
+
+ enableFaulthandler()
+
+ test_abspath = os.path.dirname(os.path.abspath(__file__))
+ configFile = 'setup.xml'
+ loglevel = 'INFO'
+ logObj.getlogger().info("Test program Started : {}".format(datetime.now()))
+ try:
+ argv = sys.argv[1:]
+ opts, args = getopt.getopt(argv, 'c:l:', ['cfg', 'loglevel'])
+ for opt, args in opts:
+ if opt in ('-c', '--setup'):
+ configFile = args
+ if opt in ('-l', '--loglevel'):
+ loglevel = args
+ except getopt.GetoptError as err:
+ logObj.getlogger().error('#{}#'.format(err))
+
+ if loglevel != 'INFO':
+ logObj.setLogLevel(loglevel)
+
+ # Try to parse the setup.xml file
+ logObj.getlogger().debug("parse config file: {}". format(configFile))
+ try:
+ xmlObj = XMLSetup(os.path.join(test_abspath, configFile))
+ except XMLParser.ParseError as error:
+ # Error
+ execute_station_error("Error parsing {} file {}".format(configFile, error.msg))
+
+ # Try to connect to the DB, according to setup.xml configuration
+ if not psdbObj.open(xmlObj):
+ # Error
+ execute_station_error("Cannot open DB connection")
+ else:
+ logObj.getlogger().debug("database connection ok: {}".format(psdbObj.getConfig()))
+
+ # get station name
+ globalVar.station = socket.gethostname()
+ logObj.getlogger().info("Test Station: {}".format(globalVar.station))
+
+ if psdbObj.get_setup_variable("AUTOTEST_" + globalVar.station, xmlObj.getKeyVal("test_main", "autotest", "0")) == '1':
+ AutoTest = True
+
+ OverrideAutoReboot = str2bool(xmlObj.getKeyVal("test_main", "OverrideAutoReboot", "0"))
+ delay_reboot = xmlObj.getKeyVal("test_main", "delayReboot", "0")
+
+ logObj.getlogger().info("AutoTest: {} OverrideAutoReboot: {} delay Reboot: {}".format(AutoTest, OverrideAutoReboot, delay_reboot))
+
+ # Check if current state is "WAIT_TEST_START". if not, waits here
+ if xmlObj.getKeyVal("test_main", "overrideCheckWaitStart", "0") == "0":
+ check_first_state()
+ else:
+ psdbObj.setDevelStationState(globalVar.station, "WAIT_TEST_START")
+
+ # Change state to "TESTS_CHECKING_ENV"
+ set_next_state(StationStates.TESTS_CHECKING_ENV.name)
+
+ # execute main program
+ execute_test()
+
+ exit(0)