summaryrefslogtreecommitdiff
path: root/test-cli/test_main.py
diff options
context:
space:
mode:
authorManel Caro <mcaro@iseebcn.com>2020-07-31 02:07:37 +0200
committerManel Caro <mcaro@iseebcn.com>2020-07-31 02:07:37 +0200
commitd46bce422fd03cd57d1ba336361da17d6efb48db (patch)
treee5ec7aa1ee5d53a655ce121a7c2ddd95888fc989 /test-cli/test_main.py
parent907b96801230e04d02575a3732a73e452089637b (diff)
downloadboard-d46bce422fd03cd57d1ba336361da17d6efb48db.zip
board-d46bce422fd03cd57d1ba336361da17d6efb48db.tar.gz
board-d46bce422fd03cd57d1ba336361da17d6efb48db.tar.bz2
TEST restructure
Diffstat (limited to 'test-cli/test_main.py')
-rw-r--r--test-cli/test_main.py372
1 files changed, 220 insertions, 152 deletions
diff --git a/test-cli/test_main.py b/test-cli/test_main.py
index 942df61..3f52152 100644
--- a/test-cli/test_main.py
+++ b/test-cli/test_main.py
@@ -1,8 +1,17 @@
+import time
+import sh
+import os
+import unittest
+import socket
+import logging
+import sys
+import getopt
+import random
+
+from datetime import datetime
from test.helpers.int_registers import get_die_id
from test.helpers.int_registers import get_internal_mac
from subprocess import call
-import os
-import unittest
from test.helpers.testsrv_db import TestSrv_Database
from test.helpers.setup_xml import XMLSetup
from test.runners.simple import SimpleTestRunner
@@ -11,7 +20,6 @@ from test.tests.qram import Qram
from test.tests.qusb import Qusb
from test.tests.qeeprom import Qeeprom
from test.tests.qserial import Qserial
-from test.tests.qusbdual import Qusbdual
from test.tests.qwifi import Qwifi
from test.tests.qrtc import Qrtc
from test.tests.qduplex_ser import Qduplex
@@ -19,25 +27,28 @@ from test.tests.qamper import Qamper
from test.tests.qnand import Qnand
from test.tests.qaudio import Qaudio
from test.tests.qdmesg import Qdmesg
+from test.tests.qmmcflash import Qmmcflash
+from test.tests.qplc import Qplc
from test.helpers.globalVariables import globalVar
-import socket
-from test.helpers.iseelogger import ISEE_Logger
-import logging
-from test.tasks.flasheeprom import flash_eeprom
-from test.tasks.flashmemory import flash_memory
+from test.tasks.flasheeprom import EEprom_Flasher
+from test.tasks.flashmemory import NAND_Flasher
from test.helpers.qrreader import QRReader
-from test.helpers.cmdline import LinuxKernel
+from test.helpers.cmdline import LinuxKernelCmd
from test.helpers.amper import Amper
from test.enums.StationStates import StationStates
-import time
-import sh
+from test.helpers.mydebugger import sentdebugmessage
+import xml.etree.ElementTree as XMLParser
+from test.helpers.iseelogger import logObj
+from test.helpers.utils import str2bool
# global variables
psdbObj = TestSrv_Database()
xmlObj = None
-loggerObj = None
test_abspath = None
tests_manually_executed = []
+AutoTest = False
+OverrideAutoReboot = False
+delay_reboot = '0'
# define clear function
@@ -45,74 +56,62 @@ def clear():
# check and make call for specific operating system
_ = call('clear' if os.name == 'posix' else 'cls')
+def reboot_board():
+ logObj.getlogger().debug("#reboot board#")
+ if not OverrideAutoReboot:
+ set_next_state(StationStates.STATION_WAIT_SHUTDOWN.name)
+ if delay_reboot == '0':
+ time.sleep(5)
+ sh.shutdown('-r', '+{}'.format(delay_reboot), _bg=True)
+ # sh.reboot(_bg=True)
+ print("REBOOT...")
+ exit(0)
-def execute_station_error(text):
- print("Error: {}".format(text))
- loggerObj.getlogger().info("Station error: #{}#".format(text))
+def wait_for_reboot():
while True:
currentstate = psdbObj.read_station_state(globalVar.station)
if currentstate == StationStates.STATION_REBOOT.name:
reboot_board()
- time.sleep(1)
+ time.sleep(5)
+def execute_station_error(text):
+ logObj.getlogger().error("Station error: #{}#".format(text))
+ wait_for_reboot()
def check_first_state():
currentstate = psdbObj.read_station_state(globalVar.station)
- if currentstate == StationStates.STATION_ERROR.name:
- while True:
- currentstate = psdbObj.read_station_state(globalVar.station)
- if currentstate == StationStates.STATION_REBOOT.name:
- reboot_board()
- time.sleep(1)
- elif currentstate == StationStates.STATION_REBOOT.name:
- reboot_board()
+ if currentstate == StationStates.STATION_ERROR.name or currentstate == StationStates.STATION_REBOOT.name:
+ logObj.getlogger().debug('station state during the first check is {}'.format(currentstate))
+ wait_for_reboot()
else:
- starttime = time.time()
+ count_t = int(xmlObj.getKeyVal("test_main", "wait_test_start", "240"))
while currentstate != StationStates.WAIT_TEST_START.name:
- if time.time() - starttime >= 30.0:
+ logObj.getlogger().debug('wait for WAIT_TEST_START but read: {}'.format(currentstate))
+ if count_t <= 0:
# Error
- execute_station_error("Timeout while waiting for WAIT_TEST_START state")
+ execute_station_error("Timeout while waiting for WAIT_TEST_START state actual STATE = {}".format(currentstate))
else:
+ count_t = count_t - 5
time.sleep(5)
currentstate = psdbObj.read_station_state(globalVar.station)
- if currentstate == StationStates.STATION_ERROR.name:
- while True:
- currentstate = psdbObj.read_station_state(globalVar.station)
- if currentstate == StationStates.STATION_REBOOT.name:
- reboot_board()
- time.sleep(1)
- elif currentstate == StationStates.STATION_REBOOT.name:
- reboot_board()
-
-
-def reboot_board():
- sh.reboot()
- exit(0)
+ if currentstate == StationStates.STATION_ERROR.name or currentstate == StationStates.STATION_REBOOT.name:
+ wait_for_reboot()
def set_next_state(newstate):
statewritten = psdbObj.change_station_state(globalVar.station, newstate)
- if statewritten == StationStates.STATION_REBOOT.name:
- reboot_board()
- elif statewritten == StationStates.STATION_ERROR.name:
- while True:
- currentstate = psdbObj.read_station_state(globalVar.station)
- if currentstate == StationStates.STATION_REBOOT.name:
- reboot_board()
- time.sleep(1)
-
+ if statewritten == StationStates.STATION_REBOOT.name or statewritten == StationStates.STATION_ERROR.name:
+ wait_for_reboot()
def reboot_if_autotest():
# reset board if AUTOTEST is enabled
- autotest = psdbObj.get_setup_variable("AUTOTEST_" + globalVar.station)
- if int(autotest) == 1:
+ if AutoTest:
reboot_board()
def create_paramslist(params):
paramlist = {}
- for row in params:
- varname, varvalue = row
+ for varname, varvalue in params:
paramlist[varname] = varvalue
return paramlist
@@ -151,13 +150,17 @@ def add_test(suite, testdefname, paramlist):
elif testdefname == "AUDIO":
suite.addTest(Qaudio(testdefname, "execute", paramlist))
return 0
- elif testdefname == "USBLOOP":
- suite.addTest(Qusbdual(testdefname, "execute", paramlist))
- return 0
elif testdefname == "DMESG":
tests_manually_executed.append(Qdmesg(testdefname, "execute", paramlist))
return 0
+ elif testdefname == "MMCFLASH":
+ suite.addTest(Qmmcflash(testdefname, "execute", paramlist))
+ return 0
+ elif testdefname == "PLC_MODEM":
+ suite.addTest(Qplc(testdefname, "execute", paramlist))
+ return 0
else:
+ logObj.getlogger().error("Params for test: {} not implemented (ignore it), ".format(testdefname))
return 1
@@ -166,9 +169,9 @@ def create_testsuite():
suite1 = unittest.TestSuite()
# get list of tests for this board
tests = psdbObj.get_tests_list(globalVar.g_uuid)
+ logObj.getlogger().debug("Create TestSuite: {}".format(tests))
# loop in every test for this board
- for row in tests:
- testid, testdefname = row
+ for testid, testdefname in tests:
# get params for this test
params = psdbObj.get_test_params_list(testid)
paramlist = create_paramslist(params)
@@ -176,6 +179,8 @@ def create_testsuite():
paramlist["testid"] = testid
paramlist["boarduuid"] = globalVar.g_uuid
paramlist["testidctl"] = globalVar.testid_ctl
+ paramlist['testPath'] = test_abspath
+ logObj.getlogger().debug("Params for test: {}:{}-{}, ".format(testid, testdefname, paramlist))
paramlist["xml"] = xmlObj
paramlist["db"] = psdbObj
# add test to TestSuite
@@ -185,44 +190,123 @@ def create_testsuite():
def create_board():
- cmd = LinuxKernel()
+
+ cmd = LinuxKernelCmd()
model_id = cmd.getkvar("bmodel", "none")
variant = cmd.getkvar("bvariant", "none")
+ logObj.getlogger().debug("Kernel Command line vars: bmodel: {} bvariant: {}".format(model_id, variant))
if model_id == "none" or variant == "none":
# Error
- execute_station_error("Cannot get model and variant information")
+ execute_station_error("Cannot get model {} and variant {} information".format(model_id, variant))
# get model id
globalVar.g_mid = model_id + "-" + variant
+ logObj.getlogger().debug("Model-Variant -> {}".format(globalVar.g_mid))
# get station
if "Station" not in globalVar.station:
# Error
- execute_station_error("Wrong station name")
+ execute_station_error("Hostname not defined")
processor_id = get_die_id(globalVar.g_mid)
- print(globalVar.g_mid)
- print(processor_id)
globalVar.g_uuid = psdbObj.create_board(processor_id, model_id, variant, globalVar.station,
get_internal_mac(globalVar.g_mid))
- print(globalVar.g_uuid)
+
+ logObj.getlogger().info("processor ID: {} uuid: {}".format(processor_id, globalVar.g_uuid))
def get_taskvars_list(uuid):
varlist = {}
- for row in psdbObj.get_task_variables_list(uuid):
- varname, varvalue = row
+ for varname, varvalue in psdbObj.get_task_variables_list(uuid):
varlist[varname] = varvalue
return varlist
+def execute_extra_task():
+ # list of task
+ myTask = []
+ # Change state to "EXTRATASKS_RUNNING"
+ set_next_state(StationStates.EXTRATASKS_RUNNING.name)
+ # create task control
+ globalVar.taskid_ctl = psdbObj.open_task(globalVar.g_uuid)
+ # get extra variables
+ varlist = get_taskvars_list(globalVar.g_uuid)
+
+ # Put default Vars
+ varlist["boarduuid"] = globalVar.g_uuid
+ varlist["testidctl"] = globalVar.testid_ctl
+ varlist['testPath'] = test_abspath
+ varlist["xml"] = xmlObj
+ varlist["db"] = psdbObj
+
+ # Get list Extra Task and activation
+ eeprom = varlist.get('eeprom', '0')
+ flashNand = varlist.get('flashNAND', '0')
+
+ if eeprom == '1':
+ myEEprom = EEprom_Flasher(varlist)
+ myTask.append(myEEprom)
+ if flashNand == '1':
+ myNAND = NAND_Flasher(varlist)
+ myTask.append(myNAND)
+
+ taskresult = True
+ for task in myTask:
+ res, data = task.Execute()
+ if res:
+ psdbObj.create_task_result(globalVar.taskid_ctl, task.getTaskName(), "TASK_OK", data)
+ logObj.getlogger().info("task: {} ok".format(task.getTaskName()))
+ else:
+ psdbObj.create_task_result(globalVar.taskid_ctl, task.getTaskName(), "TASK_FAIL", data)
+ logObj.getlogger().info("task: {} failed".format(task.getTaskName()))
+ taskresult = False
+
+ # check final taskresult => ok or fail
+ if taskresult:
+ psdbObj.update_taskctl_status(globalVar.taskid_ctl, "TASK_BOARD_OK")
+ else:
+ psdbObj.update_taskctl_status(globalVar.taskid_ctl, "TASK_BOARD_FAIL")
+ for task in myTask:
+ if task.getError() != '':
+ execute_station_error("Unable to complete extra tasks: {}".format(task.getError()))
+ # Not emit a error message
+ execute_station_error("Unable to complete extra tasks")
+
+def getBarCode(fake):
+
+ set_next_state(StationStates.WAITING_FOR_SCANNER.name)
+ if fake:
+ psdbObj.set_factorycode(globalVar.g_uuid, str(random.randint(1, 1000001)))
+ return
+ qrreceived = False
+ wait_count = psdbObj.get_setup_variable("QRWAIT_SEC", xmlObj.getKeyVal("test_main", "qr_wait_time", "120"))
+ while not qrreceived or int(wait_count) > 0:
+ qr = QRReader()
+ if qr.openQR():
+ # waits 5s to receive a valid code
+ if qr.readQRasync(10):
+ qrreceived = True
+ factorycode = qr.getQRNumber()
+ psdbObj.set_factorycode(globalVar.g_uuid, factorycode)
+ return
+ qr.closeQR()
+ else:
+ qrreceived = True
+ del qr
+ time.sleep(1)
+ wait_count = wait_count - 1
+ if int(wait_count) == 0:
+ execute_station_error("Cannot find a barcode scanner")
+
-def main():
+def execute_test ():
# initialize the board
create_board()
# create a process
globalVar.testid_ctl = psdbObj.open_test(globalVar.g_uuid)
+ logObj.getlogger().info("Create new Test: {}".format(globalVar.testid_ctl))
+
# Change state to "TESTS_RUNNING"
set_next_state(StationStates.TESTS_RUNNING.name)
@@ -230,71 +314,29 @@ def main():
suite1 = create_testsuite()
# Execute tests (round 1)
runner1 = SimpleTestRunner(psdbObj)
+ logObj.getlogger().info("Runner run id: {}".format(globalVar.testid_ctl))
testresult = runner1.run(suite1)
+ logObj.getlogger().info("Runner run id: {} finish {}".format(globalVar.testid_ctl, testresult))
# Execute manually tests
for test in tests_manually_executed:
- test.execute()
+ logObj.getlogger().info("Execute manual test id: {}:{}".format(globalVar.testid_ctl, test.getTestName()))
+ mtestResult = test.execute()
+ logObj.getlogger().info("Execute manual test id: {}:{} Result: {}".format(globalVar.testid_ctl, test.getTestName(),
+ 'ok' if mtestResult else 'fail'))
# execute aditional tasks, only if the test was successful
if testresult.wasSuccessful():
- # Change state to "TESTS_OK"
+ logObj.getlogger().info("Execute test id: {} finished succesfully".format(globalVar.testid_ctl))
set_next_state(StationStates.TESTS_OK.name)
- # Change state to "EXTRATASKS_RUNNING"
- set_next_state(StationStates.EXTRATASKS_RUNNING.name)
- # create task control
- globalVar.taskid_ctl = psdbObj.open_task(globalVar.g_uuid)
- # get extra variables
- varlist = get_taskvars_list(globalVar.g_uuid)
-
- alltasksok = False
-
- # flash eeprom
- resulteeprom = 0
- if "eeprompath" in varlist and len(varlist["eeprompath"]) > 0:
- mac0 = psdbObj.get_board_macaddr(globalVar.g_uuid)
- resulteeprom, eepromdata = flash_eeprom(varlist["eeprompath"], globalVar.g_uuid, mac0)
- psdbObj.create_task_result(globalVar.taskid_ctl, "FLASHEEPROM",
- "TASK_OK" if resulteeprom == 0 else "TASK_FAIL", eepromdata)
-
- # flash non-volatile memory
- resultmemory = 0
- if "flashimagepath" in varlist and len(varlist["flashimagepath"]) > 0:
- resultmemory = flash_memory(varlist["flashimagepath"])
- psdbObj.create_task_result(globalVar.taskid_ctl, "FLASHMEMORY",
- "TASK_OK" if resultmemory == 0 else "TASK_FAIL",
- varlist["flashimagepath"])
-
- # update status with the result
- if resulteeprom == 0 and resultmemory == 0:
- # finish tasks
- psdbObj.update_taskctl_status(globalVar.taskid_ctl, "TASK_BOARD_OK")
-
- # Change state to "WAITING_FOR_SCANNER"
- set_next_state(StationStates.WAITING_FOR_SCANNER.name)
-
- # get barcode using the scanner, only if autotest is disabled
- autotest = psdbObj.get_setup_variable("AUTOTEST_" + globalVar.station)
- if int(autotest) == 1:
- psdbObj.set_factorycode(globalVar.g_uuid, "1234567890") # fake factory code
- else:
- qrreceived = False
- while not qrreceived:
- qr = QRReader()
- if qr.openQR():
- # waits 5s to receive a valid code
- if qr.readQRasync(5):
- qrreceived = True
- factorycode = qr.getQRNumber()
- psdbObj.set_factorycode(globalVar.g_uuid, factorycode)
- qr.closeQR()
-
- # Change state to "FINISHED"
- set_next_state(StationStates.FINISHED.name)
- else:
- psdbObj.update_taskctl_status(globalVar.taskid_ctl, "TASK_BOARD_FAIL")
- loggerObj.getlogger().info("Station error: #Unable to complete extra tasks#")
+ logObj.getlogger().info("Execute test id: {} extra task".format(globalVar.testid_ctl))
+ execute_extra_task()
+ logObj.getlogger().info("Execute test id: {} wait read scanner".format(globalVar.testid_ctl))
+ getBarCode(AutoTest)
+ set_next_state(StationStates.FINISHED.name)
+ logObj.getlogger().info("Execute test id: {} wait read scanner finish".format(globalVar.testid_ctl))
else:
+ logObj.getlogger().info("Execute test id: {} finished with one or more fails".format(globalVar.testid_ctl))
# Change state to "TESTS_FAILED"
set_next_state(StationStates.TESTS_FAILED.name)
@@ -302,28 +344,75 @@ def main():
reboot_if_autotest()
+def enableFaulthandler():
+ """ Enable faulthandler for all threads.
+
+ If the faulthandler package is available, this function disables and then
+ re-enables fault handling for all threads (this is necessary to ensure any
+ new threads are handled correctly), and returns True.
+
+ If faulthandler is not available, then returns False.
+ """
+ try:
+ import faulthandler
+ # necessary to disable first or else new threads may not be handled.
+ faulthandler.disable()
+ faulthandler.enable(all_threads=True)
+ return True
+ except ImportError:
+ return False
+
if __name__ == "__main__":
# Clear the shell screen
clear()
- test_abspath = os.path.dirname(os.path.abspath(__file__))
- # create logger
- loggerObj = ISEE_Logger(logging.INFO)
+ enableFaulthandler()
+
+ test_abspath = os.path.dirname(os.path.abspath(__file__))
+ configFile = 'setup.xml'
+ loglevel = 'INFO'
+ logObj.getlogger().info("Test program Started : {}".format(datetime.now()))
+ try:
+ argv = sys.argv[1:]
+ opts, args = getopt.getopt(argv, 'c:l:', ['cfg', 'loglevel'])
+ for opt, args in opts:
+ if opt in ('-c', '--setup'):
+ configFile = args
+ if opt in ('-l', '--loglevel'):
+ loglevel = args
+ except getopt.GetoptError as err:
+ logObj.getlogger().error('#{}#'.format(err))
+
+ if loglevel != 'INFO':
+ logObj.setLogLevel(loglevel)
# Try to parse the setup.xml file
+ logObj.getlogger().debug("parse config file: {}". format(configFile))
try:
- xmlObj = XMLSetup(os.path.join(test_abspath, "setup.xml"))
- except:
+ xmlObj = XMLSetup(os.path.join(test_abspath, configFile))
+ except XMLParser.ParseError as error:
# Error
- execute_station_error("Cannot parse setup.xml file")
+ execute_station_error("Error parsing {} file {}".format(configFile, error.msg))
# Try to connect to the DB, according to setup.xml configuration
if not psdbObj.open(xmlObj):
# Error
execute_station_error("Cannot open DB connection")
+ else:
+ logObj.getlogger().debug("database connection failed: {}".format(psdbObj.getConfig()))
# get station name
globalVar.station = socket.gethostname()
+ logObj.getlogger().info("Test Station: {}".format(globalVar.station))
+
+ if psdbObj.get_setup_variable("AUTOTEST_" + globalVar.station, xmlObj.getKeyVal("test_main", "autotest", "0")) == '1':
+ AutoTest = True
+
+ OverrideAutoReboot = str2bool(xmlObj.getKeyVal("test_main", "OverrideAutoReboot", "0"))
+ delay_reboot = xmlObj.getKeyVal("test_main", "delayReboot", "0")
+
+ logObj.getlogger().info("AutoTest: {} OverrideAutoReboot: {} delay Reboot: {}".format(AutoTest, OverrideAutoReboot, delay_reboot))
+
# Check if current state is "WAIT_TEST_START". if not, waits here
check_first_state()
@@ -331,28 +420,7 @@ if __name__ == "__main__":
# Change state to "TESTS_CHECKING_ENV"
set_next_state(StationStates.TESTS_CHECKING_ENV.name)
- # Wait before beginning
- time.sleep(2)
-
- # Look for a barcode scanner
- qr = QRReader()
- if qr.openQR():
- qr.closeQR()
- else:
- # Error
- execute_station_error("Cannot find a barcode scanner")
-
- # Look for an amperimeter
- amp = Amper()
- if not amp.open():
- # Error
- execute_station_error("Cannot open an amperimeter COM port")
- if not amp.hello():
- if not amp.hello():
- # Error
- execute_station_error("Cannot open find an amperimeter")
-
- # Execute main
- main()
+ # execute main program
+ execute_test()
exit(0)