From 09de774dcc1a5abc1c8f3a00fdb039aa3c522f52 Mon Sep 17 00:00:00 2001 From: Manel Caro Date: Wed, 4 Mar 2020 17:46:36 +0100 Subject: SOPA Initial Commit --- test-cli/test_main.py | 125 ++++++++++++++++++++++++++------------------------ 1 file changed, 66 insertions(+), 59 deletions(-) (limited to 'test-cli/test_main.py') diff --git a/test-cli/test_main.py b/test-cli/test_main.py index 3c4d1cb..a44b7d1 100644 --- a/test-cli/test_main.py +++ b/test-cli/test_main.py @@ -6,6 +6,7 @@ import sys import os import unittest from test.helpers.testsrv_db import TestSrv_Database +from test.helpers.setup_xml import XMLSetup from test.runners.simple import SimpleTestRunner from test.tests.qbutton import Qbutton from test.helpers.syscmd import TestSysCommand @@ -24,8 +25,17 @@ from test.tests.qrtc import Qrtc from test.tests.qduplex_ser import Qduplex from test.tests.qamp import Qamp from test.tests.qflash import Qflasher -from test.helpers.finisher import Finisher +from test.tests.qnand import Qnand from test.helpers.globalVariables import globalVar +import socket +import re +from test.helpers.iseelogger import ISEE_Logger +import logging + + +psdbObj = TestSrv_Database() +xmlObj = None +loggerObj = None # define clear function def clear(): @@ -34,85 +44,82 @@ def clear(): def create_board(): - psdb = TestSrv_Database() - psdb.open("setup.xml") - tree = XMLParser.parse('setup.xml') - root = tree.getroot() - suite = unittest.TestSuite() - for element in root.iter('board'): - # print(str(element.tag) + str(element.attrib)) - model_id = element.attrib['model'] - variant = element.attrib['variant'] - nstation = element.attrib['station'] + model_id = xmlObj.gettagKey('board', 'model') + variant = xmlObj.gettagKey('board', 'variant') + + # get model id globalVar.g_mid=model_id + "-" + variant - globalVar.station=nstation - processor_id=genDieid(globalVar.g_mid) + + # get station number + hstname = socket.gethostname() + if (re.search("^Station\d{2}$", hstname)): + globalVar.station = int(re.search("\d{2}", hstname).group(0)) + else: + raise Exception('Station number not configured') + exit(3) + + processor_id = genDieid(globalVar.g_mid) print(globalVar.g_mid) print(processor_id) - globalVar.g_uuid = psdb.create_board(processor_id, model_id, variant, bmac = None) + s = globalVar.g_uuid = psdbObj.create_board(processor_id, model_id, variant, bmac = None) + +def createvarlist(testvars): + varlist = {} + for row in testvars: + varname,testparam = row + varlist[varname] = testparam + return varlist + def testsuite(): - psdb=TestSrv_Database() - psdb.open("setup.xml") suite = unittest.TestSuite() - tests=psdb.getboard_comp_test_list(globalVar.g_uuid) - for i in range(len(tests)): - #newstr = oldstr.replace("M", "") - variables=str(tests[i][0]).split(",") - testname=variables[0].replace('(', '') - testdes=variables[1] - testfunc=variables[2] - if len(tests)>2: - testparam=variables[3].replace(')', '') - testparam = testparam.replace('"', '') - testparam = testparam.replace(';', "','") - if testparam == "": - command = "suite.addTest({}('{}','execute'))".format(testfunc, testname) - else: - command="suite.addTest({}('{}','execute','{}'))".format(testfunc,testname,testparam) - else: - print(testname) - command = "suite.addTest({}('{}','execute'))".format(testfunc, testname) + tests = psdbObj.getboard_comp_test_list(globalVar.g_uuid) + for row in tests: + testdefname,testfunc = row + testvars = psdbObj.getboard_test_variables(globalVar.g_uuid, testdefname) + varlist = createvarlist(testvars) + command = "suite.addTest({}('{}','execute', varlist))".format(testfunc, testdefname) exec(command) - globalVar.testid_ctl=psdb.open_testbatch(globalVar.g_uuid) + + globalVar.testid_ctl=psdbObj.open_testbatch(globalVar.g_uuid) return suite def finish_test(): - psdb = TestSrv_Database() - psdb.open("setup.xml") - auxs = psdb.close_testbatch(globalVar.g_uuid, globalVar.testid_ctl) - globalVar.fstatus = auxs[0][0] - # Burn eeprom struct - psdb = TestSrv_Database() - psdb.open("setup.xml") - # We should call getboard_eeprom only if test was ok - if globalVar.fstatus: - aux = psdb.getboard_eeprom(globalVar.g_uuid) - finish = Finisher(aux) - finish.end_ok() - else: - finish = Finisher(globalVar.g_uuid) - finish.end_fail() - # Update set_test current_test with 'END' so that it finally gets painted in green - psdb = TestSrv_Database() - psdb.open("setup.xml") - psdb.update_set_test_row(globalVar.station, globalVar.testid_ctl, globalVar.g_uuid, "END","FINISH") + psdbObj.close_testbatch(globalVar.g_uuid, globalVar.testid_ctl) + # Update Set Test status with FINISH so that status column is not modified because close_testbatch already did it. + psdbObj.update_set_test_row(globalVar.station, globalVar.testid_ctl, globalVar.g_uuid, 'END', 'FINISH') def main(): - #addtesttomodel() - #addtestdef() create_board() - #globalVar.g_uuid = "1f59c654-0cc6-11e8-8d51-e644f56b8edd" try: os.remove("test_results.dat") except: pass - runner = SimpleTestRunner() + runner = SimpleTestRunner(psdbObj) runner.run(testsuite()) finish_test() if __name__ == "__main__": + # Clear the shell screen clear() - main() + + # create logger + loggerObj = ISEE_Logger(logging.INFO) + # logger = loggerObj.getlogger().info("Starting test script...") + + # Try to parse the setup.xml file + try: + xmlObj = XMLSetup("setup.xml") + except: + print("Error: Cannot parse setup.xml file") + exit(1) + + # Try to connect to the DB, according to setup.xml configuration + if psdbObj.open(xmlObj): + main() + else: + print("Error: Cannot open DB connection") + exit(2) + -- cgit v1.1 From 7490324bc98248fc82be814920e2deff4114acc8 Mon Sep 17 00:00:00 2001 From: Manel Caro Date: Wed, 4 Mar 2020 19:38:10 +0100 Subject: Added station --- test-cli/test_main.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'test-cli/test_main.py') diff --git a/test-cli/test_main.py b/test-cli/test_main.py index a44b7d1..6680082 100644 --- a/test-cli/test_main.py +++ b/test-cli/test_main.py @@ -61,7 +61,7 @@ def create_board(): processor_id = genDieid(globalVar.g_mid) print(globalVar.g_mid) print(processor_id) - s = globalVar.g_uuid = psdbObj.create_board(processor_id, model_id, variant, bmac = None) + globalVar.g_uuid = psdbObj.create_board(processor_id, model_id, variant, bmac = None) def createvarlist(testvars): varlist = {} @@ -81,7 +81,7 @@ def testsuite(): command = "suite.addTest({}('{}','execute', varlist))".format(testfunc, testdefname) exec(command) - globalVar.testid_ctl=psdbObj.open_testbatch(globalVar.g_uuid) + globalVar.testid_ctl=psdbObj.open_testbatch(globalVar.g_uuid, globalVar.station) return suite -- cgit v1.1 From a615dac03a9ea7897bf3947ba8d31278b2cea121 Mon Sep 17 00:00:00 2001 From: Hector Fernandez Date: Thu, 5 Mar 2020 16:29:31 +0100 Subject: Modify tests. --- test-cli/test_main.py | 30 +++++++++++++----------------- 1 file changed, 13 insertions(+), 17 deletions(-) (limited to 'test-cli/test_main.py') diff --git a/test-cli/test_main.py b/test-cli/test_main.py index 6680082..b657f00 100644 --- a/test-cli/test_main.py +++ b/test-cli/test_main.py @@ -32,15 +32,15 @@ import re from test.helpers.iseelogger import ISEE_Logger import logging - psdbObj = TestSrv_Database() xmlObj = None loggerObj = None + # define clear function def clear(): # check and make call for specific operating system - _ = call('clear' if os.name =='posix' else 'cls') + _ = call('clear' if os.name == 'posix' else 'cls') def create_board(): @@ -48,25 +48,21 @@ def create_board(): variant = xmlObj.gettagKey('board', 'variant') # get model id - globalVar.g_mid=model_id + "-" + variant + globalVar.g_mid = model_id + "-" + variant - # get station number - hstname = socket.gethostname() - if (re.search("^Station\d{2}$", hstname)): - globalVar.station = int(re.search("\d{2}", hstname).group(0)) - else: - raise Exception('Station number not configured') - exit(3) + # get station + globalVar.station = socket.gethostname() processor_id = genDieid(globalVar.g_mid) print(globalVar.g_mid) print(processor_id) - globalVar.g_uuid = psdbObj.create_board(processor_id, model_id, variant, bmac = None) + globalVar.g_uuid = psdbObj.create_board(processor_id, model_id, variant, bmac=None) + def createvarlist(testvars): varlist = {} for row in testvars: - varname,testparam = row + varname, testparam = row varlist[varname] = testparam return varlist @@ -75,13 +71,13 @@ def testsuite(): suite = unittest.TestSuite() tests = psdbObj.getboard_comp_test_list(globalVar.g_uuid) for row in tests: - testdefname,testfunc = row + testdefname, testfunc = row testvars = psdbObj.getboard_test_variables(globalVar.g_uuid, testdefname) varlist = createvarlist(testvars) command = "suite.addTest({}('{}','execute', varlist))".format(testfunc, testdefname) exec(command) - - globalVar.testid_ctl=psdbObj.open_testbatch(globalVar.g_uuid, globalVar.station) + + globalVar.testid_ctl = psdbObj.open_testbatch(globalVar.g_uuid, globalVar.station) return suite @@ -90,6 +86,7 @@ def finish_test(): # Update Set Test status with FINISH so that status column is not modified because close_testbatch already did it. psdbObj.update_set_test_row(globalVar.station, globalVar.testid_ctl, globalVar.g_uuid, 'END', 'FINISH') + def main(): create_board() try: @@ -120,6 +117,5 @@ if __name__ == "__main__": if psdbObj.open(xmlObj): main() else: - print("Error: Cannot open DB connection") + print("Error: Cannot open DB connection") exit(2) - -- cgit v1.1 From dd9ffc52507c391271d0821636c683fd7562b46a Mon Sep 17 00:00:00 2001 From: Hector Fernandez Date: Thu, 5 Mar 2020 18:50:47 +0100 Subject: Modified postgre functions. --- test-cli/test_main.py | 106 ++++++++++++++++++++++++++++++++------------------ 1 file changed, 68 insertions(+), 38 deletions(-) (limited to 'test-cli/test_main.py') diff --git a/test-cli/test_main.py b/test-cli/test_main.py index b657f00..8648b6b 100644 --- a/test-cli/test_main.py +++ b/test-cli/test_main.py @@ -10,8 +10,6 @@ from test.helpers.setup_xml import XMLSetup from test.runners.simple import SimpleTestRunner from test.tests.qbutton import Qbutton from test.helpers.syscmd import TestSysCommand -from test.helpers.syscmd import SysCommand -from test.tests.qiperf import QIperf from test.tests.qethernet import Qethernet from test.tests.qaudio import Qaudio from test.tests.qram import Qram @@ -28,7 +26,6 @@ from test.tests.qflash import Qflasher from test.tests.qnand import Qnand from test.helpers.globalVariables import globalVar import socket -import re from test.helpers.iseelogger import ISEE_Logger import logging @@ -43,6 +40,72 @@ def clear(): _ = call('clear' if os.name == 'posix' else 'cls') +def create_paramslist(params): + paramlist = {} + for row in params: + varname, varvalue = row + paramlist[varname] = varvalue + return paramlist + + +def add_test_task(suite, testdefname, paramlist): + testfunc = None + + if testdefname == "AUDIO": + suite.addTest(Qaudio(testdefname, "execute", paramlist)) + elif testdefname == "RAM": + suite.addTest(Qram(testdefname, "execute", paramlist)) + elif testdefname == "SERIALDUAL": + suite.addTest(Qduplex(testdefname, "execute", paramlist)) + elif testdefname == "EEPROM": + suite.addTest(Qeeprom(testdefname, "execute", paramlist)) + elif testdefname == "SERIAL": + suite.addTest(Qserial(testdefname, "execute", paramlist)) + # elif testdefname == "HDMI": + # suite.addTest(Qhdmi(testdefname, "execute", paramlist)) + elif testdefname == "BUTTON": + suite.addTest(Qbutton(testdefname, "execute", paramlist)) + elif testdefname == "RTC": + suite.addTest(Qrtc(testdefname, "execute", paramlist)) + elif testdefname == "CONSUMPTION": + suite.addTest(Qamp(testdefname, "execute", paramlist)) + # elif testdefname == "SATA": + # suite.addTest(Qsata(testdefname, "execute", paramlist)) + elif testdefname == "DMESG": + suite.addTest(TestSysCommand(testdefname, "execute", paramlist)) + elif testdefname == "ETHERNET": + suite.addTest(Qethernet(testdefname, "execute", paramlist)) + elif testdefname == "NAND": + suite.addTest(Qnand(testdefname, "execute", paramlist)) + elif testdefname == "I2C": + suite.addTest(Qi2c(testdefname, "execute", paramlist)) + elif testdefname == "WIFI": + suite.addTest(Qwifi(testdefname, "execute", paramlist)) + elif testdefname == "USB": + suite.addTest(Qusb(testdefname, "execute", paramlist)) + else: + raise Exception("Wrong testdefname") + + +def create_testsuite(): + # create an object TestSuite + suite = unittest.TestSuite() + # get list of tests for this board + tests = psdbObj.get_tests_list(globalVar.g_uuid) + # loop in every test for this board + for row in tests: + testid, testdefname = row + # get params for this test + params = psdbObj.get_test_params_list(testid) + paramlist = create_paramslist(params) + # add test to TestSuite + add_test_task(suite, testdefname, paramlist) + + globalVar.testid_ctl = psdbObj.open_test(globalVar.g_uuid) + + return suite + + def create_board(): model_id = xmlObj.gettagKey('board', 'model') variant = xmlObj.gettagKey('board', 'variant') @@ -56,46 +119,13 @@ def create_board(): processor_id = genDieid(globalVar.g_mid) print(globalVar.g_mid) print(processor_id) - globalVar.g_uuid = psdbObj.create_board(processor_id, model_id, variant, bmac=None) - - -def createvarlist(testvars): - varlist = {} - for row in testvars: - varname, testparam = row - varlist[varname] = testparam - return varlist - - -def testsuite(): - suite = unittest.TestSuite() - tests = psdbObj.getboard_comp_test_list(globalVar.g_uuid) - for row in tests: - testdefname, testfunc = row - testvars = psdbObj.getboard_test_variables(globalVar.g_uuid, testdefname) - varlist = createvarlist(testvars) - command = "suite.addTest({}('{}','execute', varlist))".format(testfunc, testdefname) - exec(command) - - globalVar.testid_ctl = psdbObj.open_testbatch(globalVar.g_uuid, globalVar.station) - return suite - - -def finish_test(): - psdbObj.close_testbatch(globalVar.g_uuid, globalVar.testid_ctl) - # Update Set Test status with FINISH so that status column is not modified because close_testbatch already did it. - psdbObj.update_set_test_row(globalVar.station, globalVar.testid_ctl, globalVar.g_uuid, 'END', 'FINISH') + globalVar.g_uuid = psdbObj.create_board(processor_id, model_id, variant, globalVar.station, bmac=None) def main(): create_board() - try: - os.remove("test_results.dat") - except: - pass runner = SimpleTestRunner(psdbObj) - runner.run(testsuite()) - finish_test() + runner.run(create_testsuite()) if __name__ == "__main__": -- cgit v1.1 From 98d40cecc9818360984188755e455aa53933aab0 Mon Sep 17 00:00:00 2001 From: Hector Fernandez Date: Fri, 6 Mar 2020 09:38:18 +0100 Subject: changed parameter names of tests --- test-cli/test_main.py | 2 ++ 1 file changed, 2 insertions(+) (limited to 'test-cli/test_main.py') diff --git a/test-cli/test_main.py b/test-cli/test_main.py index 8648b6b..c09d703 100644 --- a/test-cli/test_main.py +++ b/test-cli/test_main.py @@ -63,6 +63,8 @@ def add_test_task(suite, testdefname, paramlist): suite.addTest(Qserial(testdefname, "execute", paramlist)) # elif testdefname == "HDMI": # suite.addTest(Qhdmi(testdefname, "execute", paramlist)) + # elif testdefname == "SCREEN": + # suite.addTest(Qscreen(testdefname, "execute", paramlist)) elif testdefname == "BUTTON": suite.addTest(Qbutton(testdefname, "execute", paramlist)) elif testdefname == "RTC": -- cgit v1.1 From 9f07a57d89a927aa9b172c1bf20c7ab563658c73 Mon Sep 17 00:00:00 2001 From: Hector Fernandez Date: Fri, 6 Mar 2020 12:46:27 +0100 Subject: Fixed multiple errors. --- test-cli/test_main.py | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) (limited to 'test-cli/test_main.py') diff --git a/test-cli/test_main.py b/test-cli/test_main.py index c09d703..48ec534 100644 --- a/test-cli/test_main.py +++ b/test-cli/test_main.py @@ -1,4 +1,5 @@ -from test.helpers.get_dieid import genDieid +from test.helpers.int_registers import get_die_id +from test.helpers.int_registers import get_mac from subprocess import call import xml.etree.ElementTree as XMLParser import errno @@ -92,6 +93,8 @@ def add_test_task(suite, testdefname, paramlist): def create_testsuite(): # create an object TestSuite suite = unittest.TestSuite() + # get id of the full test for this board + globalVar.testid_ctl = psdbObj.open_test(globalVar.g_uuid) # get list of tests for this board tests = psdbObj.get_tests_list(globalVar.g_uuid) # loop in every test for this board @@ -100,11 +103,13 @@ def create_testsuite(): # get params for this test params = psdbObj.get_test_params_list(testid) paramlist = create_paramslist(params) + # add the testid as parameter + paramlist["testid"] = testid + paramlist["boarduuid"] = globalVar.g_uuid + paramlist["testidctl"] = globalVar.testid_ctl # add test to TestSuite add_test_task(suite, testdefname, paramlist) - globalVar.testid_ctl = psdbObj.open_test(globalVar.g_uuid) - return suite @@ -118,10 +123,11 @@ def create_board(): # get station globalVar.station = socket.gethostname() - processor_id = genDieid(globalVar.g_mid) + processor_id = get_die_id(globalVar.g_mid) print(globalVar.g_mid) print(processor_id) - globalVar.g_uuid = psdbObj.create_board(processor_id, model_id, variant, globalVar.station, bmac=None) + globalVar.g_uuid = psdbObj.create_board(processor_id, model_id, variant, globalVar.station, + get_mac(globalVar.g_mid)) def main(): -- cgit v1.1 From a03055f657d2e970e45e7ea2a3e66857f821eabd Mon Sep 17 00:00:00 2001 From: Hector Fernandez Date: Mon, 9 Mar 2020 09:03:28 +0100 Subject: Solved problems with consumption test. Fixed error when getting mac address. --- test-cli/test_main.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'test-cli/test_main.py') diff --git a/test-cli/test_main.py b/test-cli/test_main.py index 48ec534..0e863dc 100644 --- a/test-cli/test_main.py +++ b/test-cli/test_main.py @@ -22,7 +22,7 @@ from test.tests.qscreen import Qscreen from test.tests.qwifi import Qwifi from test.tests.qrtc import Qrtc from test.tests.qduplex_ser import Qduplex -from test.tests.qamp import Qamp +from test.tests.qamper import Qamper from test.tests.qflash import Qflasher from test.tests.qnand import Qnand from test.helpers.globalVariables import globalVar @@ -71,7 +71,7 @@ def add_test_task(suite, testdefname, paramlist): elif testdefname == "RTC": suite.addTest(Qrtc(testdefname, "execute", paramlist)) elif testdefname == "CONSUMPTION": - suite.addTest(Qamp(testdefname, "execute", paramlist)) + suite.addTest(Qamper(testdefname, "execute", paramlist)) # elif testdefname == "SATA": # suite.addTest(Qsata(testdefname, "execute", paramlist)) elif testdefname == "DMESG": -- cgit v1.1 From d38c92bfd7b6abe3a52b51b87b1a2949b857d9b4 Mon Sep 17 00:00:00 2001 From: Hector Fernandez Date: Mon, 9 Mar 2020 19:16:08 +0100 Subject: Created function to flash the eeprom memory. --- test-cli/test_main.py | 62 ++++++++++++++++++++++++++++++++++++++------------- 1 file changed, 46 insertions(+), 16 deletions(-) (limited to 'test-cli/test_main.py') diff --git a/test-cli/test_main.py b/test-cli/test_main.py index 0e863dc..f8e3c1a 100644 --- a/test-cli/test_main.py +++ b/test-cli/test_main.py @@ -1,15 +1,11 @@ from test.helpers.int_registers import get_die_id from test.helpers.int_registers import get_mac from subprocess import call -import xml.etree.ElementTree as XMLParser -import errno -import sys import os import unittest from test.helpers.testsrv_db import TestSrv_Database from test.helpers.setup_xml import XMLSetup from test.runners.simple import SimpleTestRunner -from test.tests.qbutton import Qbutton from test.helpers.syscmd import TestSysCommand from test.tests.qethernet import Qethernet from test.tests.qaudio import Qaudio @@ -18,17 +14,16 @@ from test.tests.qusb import Qusb from test.tests.qi2c import Qi2c from test.tests.qeeprom import Qeeprom from test.tests.qserial import Qserial -from test.tests.qscreen import Qscreen from test.tests.qwifi import Qwifi from test.tests.qrtc import Qrtc from test.tests.qduplex_ser import Qduplex from test.tests.qamper import Qamper -from test.tests.qflash import Qflasher from test.tests.qnand import Qnand from test.helpers.globalVariables import globalVar import socket from test.helpers.iseelogger import ISEE_Logger import logging +import binascii psdbObj = TestSrv_Database() xmlObj = None @@ -50,8 +45,6 @@ def create_paramslist(params): def add_test_task(suite, testdefname, paramlist): - testfunc = None - if testdefname == "AUDIO": suite.addTest(Qaudio(testdefname, "execute", paramlist)) elif testdefname == "RAM": @@ -62,18 +55,10 @@ def add_test_task(suite, testdefname, paramlist): suite.addTest(Qeeprom(testdefname, "execute", paramlist)) elif testdefname == "SERIAL": suite.addTest(Qserial(testdefname, "execute", paramlist)) - # elif testdefname == "HDMI": - # suite.addTest(Qhdmi(testdefname, "execute", paramlist)) - # elif testdefname == "SCREEN": - # suite.addTest(Qscreen(testdefname, "execute", paramlist)) - elif testdefname == "BUTTON": - suite.addTest(Qbutton(testdefname, "execute", paramlist)) elif testdefname == "RTC": suite.addTest(Qrtc(testdefname, "execute", paramlist)) elif testdefname == "CONSUMPTION": suite.addTest(Qamper(testdefname, "execute", paramlist)) - # elif testdefname == "SATA": - # suite.addTest(Qsata(testdefname, "execute", paramlist)) elif testdefname == "DMESG": suite.addTest(TestSysCommand(testdefname, "execute", paramlist)) elif testdefname == "ETHERNET": @@ -130,10 +115,55 @@ def create_board(): get_mac(globalVar.g_mid)) +def program_eeprom(eeprompath): + # check if eeprompath is correct + if os.path.isfile(eeprompath): + # create u-boot data struct + data = bytearray() + data += (2029785358).to_bytes(4, 'big') # magicid --> 0x78FC110E + data += bytearray([0, 0, 0, 0]) # crc32 = 0 + data += bytearray(globalVar.g_uuid + "\0", + 'ascii') # uuid --> 'c0846c8a-5fa5-11ea-8576-f8b156ac62d7' and \0 at the end + data += binascii.unhexlify((get_mac(globalVar.g_mid)).replace(':', '')) # mac0 --> 'f8:b1:56:ac:62:d7' + data += bytearray([0, 0, 0, 0, 0, 0]) # mac1 --> 0:0:0:0:0:0 + # calculate crc + crc = (binascii.crc32(data, 0)).to_bytes(4, 'big') + data[4:8] = crc + # write into eeprom an validate + f = open(eeprompath, "r+b") + f.write(data) + f.seek(0) + data_rx = f.read(57) + for i in range(57): + if data_rx[i] != data[i]: + print("Error while programming eeprom memory.") + break + print("Eeprom programmed succesfully.") + + +def flash_nvmemory(flashscript): + if os.path.isfile(flashscript): + print("a") + + +def create_boardvariables_list(uuid): + varlist = {} + for row in psdbObj.get_board_variables(globalVar.g_uuid): + varname, varvalue = row + varlist[varname] = varvalue + return varlist + + def main(): + # initialize the board create_board() + # create and run tests according to the board type runner = SimpleTestRunner(psdbObj) runner.run(create_testsuite()) + # execute aditional tasks + varlist = create_boardvariables_list(globalVar.g_uuid) + program_eeprom(varlist["eeprompath"]) + flash_nvmemory(varlist["flashscript"]) if __name__ == "__main__": -- cgit v1.1 From 41ffba6a76a80a7ef4553cb8856393dd209d172e Mon Sep 17 00:00:00 2001 From: Hector Fernandez Date: Tue, 10 Mar 2020 17:37:16 +0100 Subject: First release of the test. All the tests work correctly for SOPA0000. --- test-cli/test_main.py | 30 +++++++++++++++++------------- 1 file changed, 17 insertions(+), 13 deletions(-) (limited to 'test-cli/test_main.py') diff --git a/test-cli/test_main.py b/test-cli/test_main.py index f8e3c1a..6e56214 100644 --- a/test-cli/test_main.py +++ b/test-cli/test_main.py @@ -24,7 +24,9 @@ import socket from test.helpers.iseelogger import ISEE_Logger import logging import binascii +from test.flashers.flasher import flash +# global variables psdbObj = TestSrv_Database() xmlObj = None loggerObj = None @@ -116,6 +118,7 @@ def create_board(): def program_eeprom(eeprompath): + print("Start programming Eeprom...") # check if eeprompath is correct if os.path.isfile(eeprompath): # create u-boot data struct @@ -124,12 +127,13 @@ def program_eeprom(eeprompath): data += bytearray([0, 0, 0, 0]) # crc32 = 0 data += bytearray(globalVar.g_uuid + "\0", 'ascii') # uuid --> 'c0846c8a-5fa5-11ea-8576-f8b156ac62d7' and \0 at the end - data += binascii.unhexlify((get_mac(globalVar.g_mid)).replace(':', '')) # mac0 --> 'f8:b1:56:ac:62:d7' + mac0 = get_mac(globalVar.g_mid) + data += binascii.unhexlify(mac0.replace(':', '')) # mac0 --> 'f8:b1:56:ac:62:d7' data += bytearray([0, 0, 0, 0, 0, 0]) # mac1 --> 0:0:0:0:0:0 # calculate crc crc = (binascii.crc32(data, 0)).to_bytes(4, 'big') data[4:8] = crc - # write into eeprom an validate + # write into eeprom and read back f = open(eeprompath, "r+b") f.write(data) f.seek(0) @@ -137,13 +141,12 @@ def program_eeprom(eeprompath): for i in range(57): if data_rx[i] != data[i]: print("Error while programming eeprom memory.") - break + return 1 print("Eeprom programmed succesfully.") - - -def flash_nvmemory(flashscript): - if os.path.isfile(flashscript): - print("a") + return 0 + else: + print("eeprom memory not found.") + return 1 def create_boardvariables_list(uuid): @@ -159,11 +162,12 @@ def main(): create_board() # create and run tests according to the board type runner = SimpleTestRunner(psdbObj) - runner.run(create_testsuite()) - # execute aditional tasks - varlist = create_boardvariables_list(globalVar.g_uuid) - program_eeprom(varlist["eeprompath"]) - flash_nvmemory(varlist["flashscript"]) + testresult = runner.run(create_testsuite()) + # execute aditional tasks, only if the test was succesfull + if testresult.wasSuccessful(): + varlist = create_boardvariables_list(globalVar.g_uuid) + program_eeprom(varlist["eeprompath"]) + flash(globalVar.g_mid) if __name__ == "__main__": -- cgit v1.1 From e74e0a36d9ad6a01c04500f3a24cb0ef5dd0b283 Mon Sep 17 00:00:00 2001 From: Hector Fernandez Date: Fri, 13 Mar 2020 12:17:51 +0100 Subject: Added final flashing tasks: eeprom and nand. --- test-cli/test_main.py | 77 +++++++++++++++++++++++++-------------------------- 1 file changed, 38 insertions(+), 39 deletions(-) (limited to 'test-cli/test_main.py') diff --git a/test-cli/test_main.py b/test-cli/test_main.py index 6e56214..6681d4e 100644 --- a/test-cli/test_main.py +++ b/test-cli/test_main.py @@ -23,8 +23,8 @@ from test.helpers.globalVariables import globalVar import socket from test.helpers.iseelogger import ISEE_Logger import logging -import binascii -from test.flashers.flasher import flash +from test.flashers.flasheeprom import flash_eeprom +from test.flashers.flashmemory import flash_memory # global variables psdbObj = TestSrv_Database() @@ -80,8 +80,6 @@ def add_test_task(suite, testdefname, paramlist): def create_testsuite(): # create an object TestSuite suite = unittest.TestSuite() - # get id of the full test for this board - globalVar.testid_ctl = psdbObj.open_test(globalVar.g_uuid) # get list of tests for this board tests = psdbObj.get_tests_list(globalVar.g_uuid) # loop in every test for this board @@ -117,41 +115,9 @@ def create_board(): get_mac(globalVar.g_mid)) -def program_eeprom(eeprompath): - print("Start programming Eeprom...") - # check if eeprompath is correct - if os.path.isfile(eeprompath): - # create u-boot data struct - data = bytearray() - data += (2029785358).to_bytes(4, 'big') # magicid --> 0x78FC110E - data += bytearray([0, 0, 0, 0]) # crc32 = 0 - data += bytearray(globalVar.g_uuid + "\0", - 'ascii') # uuid --> 'c0846c8a-5fa5-11ea-8576-f8b156ac62d7' and \0 at the end - mac0 = get_mac(globalVar.g_mid) - data += binascii.unhexlify(mac0.replace(':', '')) # mac0 --> 'f8:b1:56:ac:62:d7' - data += bytearray([0, 0, 0, 0, 0, 0]) # mac1 --> 0:0:0:0:0:0 - # calculate crc - crc = (binascii.crc32(data, 0)).to_bytes(4, 'big') - data[4:8] = crc - # write into eeprom and read back - f = open(eeprompath, "r+b") - f.write(data) - f.seek(0) - data_rx = f.read(57) - for i in range(57): - if data_rx[i] != data[i]: - print("Error while programming eeprom memory.") - return 1 - print("Eeprom programmed succesfully.") - return 0 - else: - print("eeprom memory not found.") - return 1 - - def create_boardvariables_list(uuid): varlist = {} - for row in psdbObj.get_board_variables(globalVar.g_uuid): + for row in psdbObj.get_board_variables(uuid): varname, varvalue = row varlist[varname] = varvalue return varlist @@ -160,14 +126,47 @@ def create_boardvariables_list(uuid): def main(): # initialize the board create_board() + # create a process + globalVar.testid_ctl = psdbObj.open_test(globalVar.g_uuid) + globalVar.taskid_ctl = psdbObj.create_process(globalVar.testid_ctl) # create and run tests according to the board type runner = SimpleTestRunner(psdbObj) testresult = runner.run(create_testsuite()) # execute aditional tasks, only if the test was succesfull if testresult.wasSuccessful(): + # change status to "running tasks" + psdbObj.update_taskctl_status(globalVar.taskid_ctl, "TASK_BOARD_RUNNING") + # get extra variables varlist = create_boardvariables_list(globalVar.g_uuid) - program_eeprom(varlist["eeprompath"]) - flash(globalVar.g_mid) + + resultmemory = 0 + # flash eeprom + if "eeprompath" in varlist and len(varlist["eeprompath"]) > 0: + mac0 = psdbObj.get_board_macaddr(globalVar.g_uuid) + resulteeprom = flash_eeprom(varlist["eeprompath"], globalVar.g_uuid, mac0) + psdbObj.create_task_result(globalVar.taskid_ctl, "FLASHEEPROM", + "TASK_OK" if resulteeprom == 0 else "TASK_FAIL") + else: + resulteeprom = 0 + psdbObj.create_task_result(globalVar.taskid_ctl, "FLASHEEPROM", "TASK_SKIP") + + # flash non-volatile memory + if "image" in varlist and len(varlist["image"]) > 0: + resultmemory = flash_memory(varlist["image"]) + psdbObj.create_task_result(globalVar.taskid_ctl, "FLASHMEMORY", + "TASK_OK" if resultmemory == 0 else "TASK_FAIL") + else: + resultmemory = 0 + psdbObj.create_task_result(globalVar.taskid_ctl, "FLASHMEMORY", "TASK_SKIP") + + # update status with the result + if resulteeprom == 0 and resultmemory == 0: + psdbObj.update_taskctl_status(globalVar.taskid_ctl, "TASK_BOARD_OK") + else: + psdbObj.update_taskctl_status(globalVar.taskid_ctl, "TASK_BOARD_FAIL") + else: + # change status to "not done" + psdbObj.update_taskctl_status(globalVar.taskid_ctl, "TASK_BOARD_NOT_DONE") if __name__ == "__main__": -- cgit v1.1 From 71d9a1f0b6bb4389cb9b0760ce4e847574fdcd45 Mon Sep 17 00:00:00 2001 From: Hector Fernandez Date: Fri, 13 Mar 2020 12:36:14 +0100 Subject: SOPA0000 test: Saved the variable data used during the eeprom and nand flashing. --- test-cli/test_main.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'test-cli/test_main.py') diff --git a/test-cli/test_main.py b/test-cli/test_main.py index 6681d4e..43a5b67 100644 --- a/test-cli/test_main.py +++ b/test-cli/test_main.py @@ -145,7 +145,7 @@ def main(): mac0 = psdbObj.get_board_macaddr(globalVar.g_uuid) resulteeprom = flash_eeprom(varlist["eeprompath"], globalVar.g_uuid, mac0) psdbObj.create_task_result(globalVar.taskid_ctl, "FLASHEEPROM", - "TASK_OK" if resulteeprom == 0 else "TASK_FAIL") + "TASK_OK" if resulteeprom == 0 else "TASK_FAIL", varlist["eeprompath"]) else: resulteeprom = 0 psdbObj.create_task_result(globalVar.taskid_ctl, "FLASHEEPROM", "TASK_SKIP") @@ -154,7 +154,7 @@ def main(): if "image" in varlist and len(varlist["image"]) > 0: resultmemory = flash_memory(varlist["image"]) psdbObj.create_task_result(globalVar.taskid_ctl, "FLASHMEMORY", - "TASK_OK" if resultmemory == 0 else "TASK_FAIL") + "TASK_OK" if resultmemory == 0 else "TASK_FAIL", varlist["image"]) else: resultmemory = 0 psdbObj.create_task_result(globalVar.taskid_ctl, "FLASHMEMORY", "TASK_SKIP") -- cgit v1.1 From a85534544fad3c51d8af8d65e7fde6cb5d07978b Mon Sep 17 00:00:00 2001 From: Hector Fernandez Date: Fri, 13 Mar 2020 14:30:21 +0100 Subject: Changed tasks to be independent of tests. --- test-cli/test_main.py | 23 +++++++---------------- 1 file changed, 7 insertions(+), 16 deletions(-) (limited to 'test-cli/test_main.py') diff --git a/test-cli/test_main.py b/test-cli/test_main.py index 43a5b67..37207cc 100644 --- a/test-cli/test_main.py +++ b/test-cli/test_main.py @@ -115,9 +115,9 @@ def create_board(): get_mac(globalVar.g_mid)) -def create_boardvariables_list(uuid): +def get_taskvars_list(uuid): varlist = {} - for row in psdbObj.get_board_variables(uuid): + for row in psdbObj.get_task_variables_list(uuid): varname, varvalue = row varlist[varname] = varvalue return varlist @@ -128,45 +128,36 @@ def main(): create_board() # create a process globalVar.testid_ctl = psdbObj.open_test(globalVar.g_uuid) - globalVar.taskid_ctl = psdbObj.create_process(globalVar.testid_ctl) # create and run tests according to the board type runner = SimpleTestRunner(psdbObj) testresult = runner.run(create_testsuite()) # execute aditional tasks, only if the test was succesfull if testresult.wasSuccessful(): - # change status to "running tasks" - psdbObj.update_taskctl_status(globalVar.taskid_ctl, "TASK_BOARD_RUNNING") + # create task control + globalVar.taskid_ctl = psdbObj.open_task(globalVar.g_uuid) # get extra variables - varlist = create_boardvariables_list(globalVar.g_uuid) + varlist = get_taskvars_list(globalVar.g_uuid) - resultmemory = 0 # flash eeprom + resulteeprom = 0 if "eeprompath" in varlist and len(varlist["eeprompath"]) > 0: mac0 = psdbObj.get_board_macaddr(globalVar.g_uuid) resulteeprom = flash_eeprom(varlist["eeprompath"], globalVar.g_uuid, mac0) psdbObj.create_task_result(globalVar.taskid_ctl, "FLASHEEPROM", "TASK_OK" if resulteeprom == 0 else "TASK_FAIL", varlist["eeprompath"]) - else: - resulteeprom = 0 - psdbObj.create_task_result(globalVar.taskid_ctl, "FLASHEEPROM", "TASK_SKIP") # flash non-volatile memory + resultmemory = 0 if "image" in varlist and len(varlist["image"]) > 0: resultmemory = flash_memory(varlist["image"]) psdbObj.create_task_result(globalVar.taskid_ctl, "FLASHMEMORY", "TASK_OK" if resultmemory == 0 else "TASK_FAIL", varlist["image"]) - else: - resultmemory = 0 - psdbObj.create_task_result(globalVar.taskid_ctl, "FLASHMEMORY", "TASK_SKIP") # update status with the result if resulteeprom == 0 and resultmemory == 0: psdbObj.update_taskctl_status(globalVar.taskid_ctl, "TASK_BOARD_OK") else: psdbObj.update_taskctl_status(globalVar.taskid_ctl, "TASK_BOARD_FAIL") - else: - # change status to "not done" - psdbObj.update_taskctl_status(globalVar.taskid_ctl, "TASK_BOARD_NOT_DONE") if __name__ == "__main__": -- cgit v1.1 From f013d0a76976a52fce979036df404826dcbf385b Mon Sep 17 00:00:00 2001 From: Hector Fernandez Date: Tue, 17 Mar 2020 18:27:59 +0100 Subject: Added return values after flashing, to be saved in the DB. --- test-cli/test_main.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'test-cli/test_main.py') diff --git a/test-cli/test_main.py b/test-cli/test_main.py index 37207cc..ddd62aa 100644 --- a/test-cli/test_main.py +++ b/test-cli/test_main.py @@ -142,9 +142,9 @@ def main(): resulteeprom = 0 if "eeprompath" in varlist and len(varlist["eeprompath"]) > 0: mac0 = psdbObj.get_board_macaddr(globalVar.g_uuid) - resulteeprom = flash_eeprom(varlist["eeprompath"], globalVar.g_uuid, mac0) + resulteeprom, eepromdata = flash_eeprom(varlist["eeprompath"], globalVar.g_uuid, mac0) psdbObj.create_task_result(globalVar.taskid_ctl, "FLASHEEPROM", - "TASK_OK" if resulteeprom == 0 else "TASK_FAIL", varlist["eeprompath"]) + "TASK_OK" if resulteeprom == 0 else "TASK_FAIL", eepromdata) # flash non-volatile memory resultmemory = 0 -- cgit v1.1 From f86887baef80509460da0bff8f48b2900a627282 Mon Sep 17 00:00:00 2001 From: Hector Fernandez Date: Mon, 13 Apr 2020 09:49:51 +0200 Subject: Added new DB functions. Added Qrreader.py file. --- test-cli/test_main.py | 34 ++++++++++++++++++++++++++-------- 1 file changed, 26 insertions(+), 8 deletions(-) (limited to 'test-cli/test_main.py') diff --git a/test-cli/test_main.py b/test-cli/test_main.py index ddd62aa..5abbe85 100644 --- a/test-cli/test_main.py +++ b/test-cli/test_main.py @@ -113,6 +113,8 @@ def create_board(): print(processor_id) globalVar.g_uuid = psdbObj.create_board(processor_id, model_id, variant, globalVar.station, get_mac(globalVar.g_mid)) + print(globalVar.g_uuid) + psdbObj.bond_to_station(globalVar.g_uuid, globalVar.station) def get_taskvars_list(uuid): @@ -130,14 +132,19 @@ def main(): globalVar.testid_ctl = psdbObj.open_test(globalVar.g_uuid) # create and run tests according to the board type runner = SimpleTestRunner(psdbObj) + loggerObj.getlogger().info("Tests running") testresult = runner.run(create_testsuite()) # execute aditional tasks, only if the test was succesfull - if testresult.wasSuccessful(): + # if testresult.wasSuccessful(): + if True: + loggerObj.getlogger().info("Extra tasks running") # create task control globalVar.taskid_ctl = psdbObj.open_task(globalVar.g_uuid) # get extra variables varlist = get_taskvars_list(globalVar.g_uuid) + alltasksok = False; + # flash eeprom resulteeprom = 0 if "eeprompath" in varlist and len(varlist["eeprompath"]) > 0: @@ -147,18 +154,29 @@ def main(): "TASK_OK" if resulteeprom == 0 else "TASK_FAIL", eepromdata) # flash non-volatile memory - resultmemory = 0 - if "image" in varlist and len(varlist["image"]) > 0: - resultmemory = flash_memory(varlist["image"]) - psdbObj.create_task_result(globalVar.taskid_ctl, "FLASHMEMORY", - "TASK_OK" if resultmemory == 0 else "TASK_FAIL", varlist["image"]) + # resultmemory = 0 + # if "image" in varlist and len(varlist["image"]) > 0: + # resultmemory = flash_memory(varlist["image"]) + # psdbObj.create_task_result(globalVar.taskid_ctl, "FLASHMEMORY", + # "TASK_OK" if resultmemory == 0 else "TASK_FAIL", varlist["image"]) # update status with the result - if resulteeprom == 0 and resultmemory == 0: + # if resulteeprom == 0 and resultmemory == 0: + if resulteeprom == 0: + alltasksok = True; psdbObj.update_taskctl_status(globalVar.taskid_ctl, "TASK_BOARD_OK") else: psdbObj.update_taskctl_status(globalVar.taskid_ctl, "TASK_BOARD_FAIL") + if alltasksok: + # get barcode using the scanner + loggerObj.getlogger().info("Waiting for barcode scanner") + # TODO ----- + factorycode = "XXXXX-XXXXX" + psdbObj.set_factorycode(globalVar.g_uuid, factorycode) + + loggerObj.getlogger().info("Python program finished") + if __name__ == "__main__": # Clear the shell screen @@ -166,7 +184,7 @@ if __name__ == "__main__": # create logger loggerObj = ISEE_Logger(logging.INFO) - # logger = loggerObj.getlogger().info("Starting test script...") + loggerObj.getlogger().info("Python program started") # Try to parse the setup.xml file try: -- cgit v1.1 From 99d74b02c7d3ff7eea51f59a2d67651aeab1297f Mon Sep 17 00:00:00 2001 From: Hector Fernandez Date: Wed, 15 Apr 2020 10:33:56 +0200 Subject: Added code scanner code after doing all tests. It waits for a valid code from the code scanner. --- test-cli/test_main.py | 27 ++++++++++++++++----------- 1 file changed, 16 insertions(+), 11 deletions(-) (limited to 'test-cli/test_main.py') diff --git a/test-cli/test_main.py b/test-cli/test_main.py index 5abbe85..dacc05a 100644 --- a/test-cli/test_main.py +++ b/test-cli/test_main.py @@ -25,6 +25,7 @@ from test.helpers.iseelogger import ISEE_Logger import logging from test.flashers.flasheeprom import flash_eeprom from test.flashers.flashmemory import flash_memory +from test.helpers.qrreader import QRReader # global variables psdbObj = TestSrv_Database() @@ -146,12 +147,12 @@ def main(): alltasksok = False; # flash eeprom - resulteeprom = 0 - if "eeprompath" in varlist and len(varlist["eeprompath"]) > 0: - mac0 = psdbObj.get_board_macaddr(globalVar.g_uuid) - resulteeprom, eepromdata = flash_eeprom(varlist["eeprompath"], globalVar.g_uuid, mac0) - psdbObj.create_task_result(globalVar.taskid_ctl, "FLASHEEPROM", - "TASK_OK" if resulteeprom == 0 else "TASK_FAIL", eepromdata) + # resulteeprom = 0 + # if "eeprompath" in varlist and len(varlist["eeprompath"]) > 0: + # mac0 = psdbObj.get_board_macaddr(globalVar.g_uuid) + # resulteeprom, eepromdata = flash_eeprom(varlist["eeprompath"], globalVar.g_uuid, mac0) + # psdbObj.create_task_result(globalVar.taskid_ctl, "FLASHEEPROM", + # "TASK_OK" if resulteeprom == 0 else "TASK_FAIL", eepromdata) # flash non-volatile memory # resultmemory = 0 @@ -162,8 +163,8 @@ def main(): # update status with the result # if resulteeprom == 0 and resultmemory == 0: - if resulteeprom == 0: - alltasksok = True; + if True: + alltasksok = True psdbObj.update_taskctl_status(globalVar.taskid_ctl, "TASK_BOARD_OK") else: psdbObj.update_taskctl_status(globalVar.taskid_ctl, "TASK_BOARD_FAIL") @@ -171,9 +172,13 @@ def main(): if alltasksok: # get barcode using the scanner loggerObj.getlogger().info("Waiting for barcode scanner") - # TODO ----- - factorycode = "XXXXX-XXXXX" - psdbObj.set_factorycode(globalVar.g_uuid, factorycode) + qr = QRReader() + if qr.openQR(): + # program remains here until gets a succesful code from the scanner + if qr.readQR(): + factorycode = qr.getQRNumber() + psdbObj.set_factorycode(globalVar.g_uuid, factorycode) + qr.closeQR() loggerObj.getlogger().info("Python program finished") -- cgit v1.1 From 47b65a71d5db142c73a0566f132af6eee3b0c2a9 Mon Sep 17 00:00:00 2001 From: Hector Fernandez Date: Wed, 15 Apr 2020 17:22:21 +0200 Subject: Every 5s teh qr scanner is refreshed to detect a new devices in case it is unplugged and plugged again. --- test-cli/test_main.py | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) (limited to 'test-cli/test_main.py') diff --git a/test-cli/test_main.py b/test-cli/test_main.py index dacc05a..8798cb3 100644 --- a/test-cli/test_main.py +++ b/test-cli/test_main.py @@ -172,13 +172,16 @@ def main(): if alltasksok: # get barcode using the scanner loggerObj.getlogger().info("Waiting for barcode scanner") - qr = QRReader() - if qr.openQR(): - # program remains here until gets a succesful code from the scanner - if qr.readQR(): - factorycode = qr.getQRNumber() - psdbObj.set_factorycode(globalVar.g_uuid, factorycode) - qr.closeQR() + qrreceived = False + while not qrreceived: + qr = QRReader() + if qr.openQR(): + # waits 5s to receive a valid code + if qr.readQRasync(5): + qrreceived = True + factorycode = qr.getQRNumber() + psdbObj.set_factorycode(globalVar.g_uuid, factorycode) + qr.closeQR() loggerObj.getlogger().info("Python program finished") -- cgit v1.1 From 6a680137694233008005415e22cf889b85baa292 Mon Sep 17 00:00:00 2001 From: Hector Fernandez Date: Thu, 16 Apr 2020 17:09:19 +0200 Subject: First release --- test-cli/test_main.py | 35 ++++++++++++++++++----------------- 1 file changed, 18 insertions(+), 17 deletions(-) (limited to 'test-cli/test_main.py') diff --git a/test-cli/test_main.py b/test-cli/test_main.py index 8798cb3..23250ca 100644 --- a/test-cli/test_main.py +++ b/test-cli/test_main.py @@ -31,6 +31,7 @@ from test.helpers.qrreader import QRReader psdbObj = TestSrv_Database() xmlObj = None loggerObj = None +test_abspath = None # define clear function @@ -136,34 +137,32 @@ def main(): loggerObj.getlogger().info("Tests running") testresult = runner.run(create_testsuite()) # execute aditional tasks, only if the test was succesfull - # if testresult.wasSuccessful(): - if True: + if testresult.wasSuccessful(): loggerObj.getlogger().info("Extra tasks running") # create task control globalVar.taskid_ctl = psdbObj.open_task(globalVar.g_uuid) # get extra variables varlist = get_taskvars_list(globalVar.g_uuid) - alltasksok = False; + alltasksok = False # flash eeprom - # resulteeprom = 0 - # if "eeprompath" in varlist and len(varlist["eeprompath"]) > 0: - # mac0 = psdbObj.get_board_macaddr(globalVar.g_uuid) - # resulteeprom, eepromdata = flash_eeprom(varlist["eeprompath"], globalVar.g_uuid, mac0) - # psdbObj.create_task_result(globalVar.taskid_ctl, "FLASHEEPROM", - # "TASK_OK" if resulteeprom == 0 else "TASK_FAIL", eepromdata) + resulteeprom = 0 + if "eeprompath" in varlist and len(varlist["eeprompath"]) > 0: + mac0 = psdbObj.get_board_macaddr(globalVar.g_uuid) + resulteeprom, eepromdata = flash_eeprom(varlist["eeprompath"], globalVar.g_uuid, mac0) + psdbObj.create_task_result(globalVar.taskid_ctl, "FLASHEEPROM", + "TASK_OK" if resulteeprom == 0 else "TASK_FAIL", eepromdata) # flash non-volatile memory - # resultmemory = 0 - # if "image" in varlist and len(varlist["image"]) > 0: - # resultmemory = flash_memory(varlist["image"]) - # psdbObj.create_task_result(globalVar.taskid_ctl, "FLASHMEMORY", - # "TASK_OK" if resultmemory == 0 else "TASK_FAIL", varlist["image"]) + resultmemory = 0 + if "image" in varlist and len(varlist["image"]) > 0: + resultmemory = flash_memory(varlist["image"]) + psdbObj.create_task_result(globalVar.taskid_ctl, "FLASHMEMORY", + "TASK_OK" if resultmemory == 0 else "TASK_FAIL", varlist["image"]) # update status with the result - # if resulteeprom == 0 and resultmemory == 0: - if True: + if resulteeprom == 0 and resultmemory == 0: alltasksok = True psdbObj.update_taskctl_status(globalVar.taskid_ctl, "TASK_BOARD_OK") else: @@ -189,6 +188,8 @@ def main(): if __name__ == "__main__": # Clear the shell screen clear() + test_abspath = os.path.dirname(os.path.abspath(__file__)) + print(test_abspath) # create logger loggerObj = ISEE_Logger(logging.INFO) @@ -196,7 +197,7 @@ if __name__ == "__main__": # Try to parse the setup.xml file try: - xmlObj = XMLSetup("setup.xml") + xmlObj = XMLSetup(os.path.join(test_abspath, "setup.xml")) except: print("Error: Cannot parse setup.xml file") exit(1) -- cgit v1.1 From d4020f0bce8c7841879736cd00ec64d617111c80 Mon Sep 17 00:00:00 2001 From: Hector Fernandez Date: Thu, 23 Apr 2020 14:14:02 +0200 Subject: Modified nand memory flasher --- test-cli/test_main.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) (limited to 'test-cli/test_main.py') diff --git a/test-cli/test_main.py b/test-cli/test_main.py index 23250ca..3034e77 100644 --- a/test-cli/test_main.py +++ b/test-cli/test_main.py @@ -156,10 +156,11 @@ def main(): # flash non-volatile memory resultmemory = 0 - if "image" in varlist and len(varlist["image"]) > 0: - resultmemory = flash_memory(varlist["image"]) + if ("flashimagepath" in varlist and len(varlist["flashimagepath"]) > 0: + resultmemory = flash_memory(varlist["flashimagepath"]) psdbObj.create_task_result(globalVar.taskid_ctl, "FLASHMEMORY", - "TASK_OK" if resultmemory == 0 else "TASK_FAIL", varlist["image"]) + "TASK_OK" if resultmemory == 0 else "TASK_FAIL", + varlist["flashimagepath"]) # update status with the result if resulteeprom == 0 and resultmemory == 0: -- cgit v1.1 From 0dd2613b88d937e71b274d68924088b1bad65de4 Mon Sep 17 00:00:00 2001 From: Hector Fernandez Date: Thu, 11 Jun 2020 18:10:27 +0200 Subject: Solved syntax error in test_main.py. --- test-cli/test_main.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'test-cli/test_main.py') diff --git a/test-cli/test_main.py b/test-cli/test_main.py index 3034e77..1f2ff90 100644 --- a/test-cli/test_main.py +++ b/test-cli/test_main.py @@ -156,7 +156,7 @@ def main(): # flash non-volatile memory resultmemory = 0 - if ("flashimagepath" in varlist and len(varlist["flashimagepath"]) > 0: + if "flashimagepath" in varlist and len(varlist["flashimagepath"]) > 0: resultmemory = flash_memory(varlist["flashimagepath"]) psdbObj.create_task_result(globalVar.taskid_ctl, "FLASHMEMORY", "TASK_OK" if resultmemory == 0 else "TASK_FAIL", -- cgit v1.1 From 09b3bb38fc7305c9b47c29bc90ebc9c636827307 Mon Sep 17 00:00:00 2001 From: Hector Fernandez Date: Wed, 17 Jun 2020 09:44:56 +0200 Subject: SOPA0000: Added support for saving results (strings or files) in the DB. It also saves a final DMESG file. --- test-cli/test_main.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) (limited to 'test-cli/test_main.py') diff --git a/test-cli/test_main.py b/test-cli/test_main.py index 1f2ff90..ea20cee 100644 --- a/test-cli/test_main.py +++ b/test-cli/test_main.py @@ -8,7 +8,6 @@ from test.helpers.setup_xml import XMLSetup from test.runners.simple import SimpleTestRunner from test.helpers.syscmd import TestSysCommand from test.tests.qethernet import Qethernet -from test.tests.qaudio import Qaudio from test.tests.qram import Qram from test.tests.qusb import Qusb from test.tests.qi2c import Qi2c @@ -23,9 +22,10 @@ from test.helpers.globalVariables import globalVar import socket from test.helpers.iseelogger import ISEE_Logger import logging -from test.flashers.flasheeprom import flash_eeprom -from test.flashers.flashmemory import flash_memory +from test.tasks.flasheeprom import flash_eeprom +from test.tasks.flashmemory import flash_memory from test.helpers.qrreader import QRReader +from test.tasks.generatedmesg import generate_dmesg # global variables psdbObj = TestSrv_Database() @@ -49,9 +49,7 @@ def create_paramslist(params): def add_test_task(suite, testdefname, paramlist): - if testdefname == "AUDIO": - suite.addTest(Qaudio(testdefname, "execute", paramlist)) - elif testdefname == "RAM": + if testdefname == "RAM": suite.addTest(Qram(testdefname, "execute", paramlist)) elif testdefname == "SERIALDUAL": suite.addTest(Qduplex(testdefname, "execute", paramlist)) @@ -136,6 +134,9 @@ def main(): runner = SimpleTestRunner(psdbObj) loggerObj.getlogger().info("Tests running") testresult = runner.run(create_testsuite()) + # save dmesg + generate_dmesg(globalVar.testid_ctl, psdbObj) + # execute aditional tasks, only if the test was succesfull if testresult.wasSuccessful(): loggerObj.getlogger().info("Extra tasks running") -- cgit v1.1 From 278b5729a44837e37fe13611518c1babc8de00df Mon Sep 17 00:00:00 2001 From: Hector Fernandez Date: Fri, 19 Jun 2020 13:15:56 +0200 Subject: Model and variant type are obtained from kernel cmdline instead of setup.xml. Python checks if they exist and can generate an error message to the DB if they are missing. --- test-cli/test_main.py | 131 ++++++++++++++++++++++++++------------------------ 1 file changed, 69 insertions(+), 62 deletions(-) (limited to 'test-cli/test_main.py') diff --git a/test-cli/test_main.py b/test-cli/test_main.py index ea20cee..3345f53 100644 --- a/test-cli/test_main.py +++ b/test-cli/test_main.py @@ -26,6 +26,7 @@ from test.tasks.flasheeprom import flash_eeprom from test.tasks.flashmemory import flash_memory from test.helpers.qrreader import QRReader from test.tasks.generatedmesg import generate_dmesg +from test.helpers.cmdline import LinuxKernel # global variables psdbObj = TestSrv_Database() @@ -61,8 +62,6 @@ def add_test_task(suite, testdefname, paramlist): suite.addTest(Qrtc(testdefname, "execute", paramlist)) elif testdefname == "CONSUMPTION": suite.addTest(Qamper(testdefname, "execute", paramlist)) - elif testdefname == "DMESG": - suite.addTest(TestSysCommand(testdefname, "execute", paramlist)) elif testdefname == "ETHERNET": suite.addTest(Qethernet(testdefname, "execute", paramlist)) elif testdefname == "NAND": @@ -99,8 +98,12 @@ def create_testsuite(): def create_board(): - model_id = xmlObj.gettagKey('board', 'model') - variant = xmlObj.gettagKey('board', 'variant') + cmd = LinuxKernel() + model_id = cmd.getkvar("bmodel", "none") + variant = cmd.getkvar("bvariant", "none") + + if model_id == "none" or variant == "none": + return 1 # get model id globalVar.g_mid = model_id + "-" + variant @@ -115,6 +118,7 @@ def create_board(): get_mac(globalVar.g_mid)) print(globalVar.g_uuid) psdbObj.bond_to_station(globalVar.g_uuid, globalVar.station) + return 0 def get_taskvars_list(uuid): @@ -127,64 +131,67 @@ def get_taskvars_list(uuid): def main(): # initialize the board - create_board() - # create a process - globalVar.testid_ctl = psdbObj.open_test(globalVar.g_uuid) - # create and run tests according to the board type - runner = SimpleTestRunner(psdbObj) - loggerObj.getlogger().info("Tests running") - testresult = runner.run(create_testsuite()) - # save dmesg - generate_dmesg(globalVar.testid_ctl, psdbObj) - - # execute aditional tasks, only if the test was succesfull - if testresult.wasSuccessful(): - loggerObj.getlogger().info("Extra tasks running") - # create task control - globalVar.taskid_ctl = psdbObj.open_task(globalVar.g_uuid) - # get extra variables - varlist = get_taskvars_list(globalVar.g_uuid) - - alltasksok = False - - # flash eeprom - resulteeprom = 0 - if "eeprompath" in varlist and len(varlist["eeprompath"]) > 0: - mac0 = psdbObj.get_board_macaddr(globalVar.g_uuid) - resulteeprom, eepromdata = flash_eeprom(varlist["eeprompath"], globalVar.g_uuid, mac0) - psdbObj.create_task_result(globalVar.taskid_ctl, "FLASHEEPROM", - "TASK_OK" if resulteeprom == 0 else "TASK_FAIL", eepromdata) - - # flash non-volatile memory - resultmemory = 0 - if "flashimagepath" in varlist and len(varlist["flashimagepath"]) > 0: - resultmemory = flash_memory(varlist["flashimagepath"]) - psdbObj.create_task_result(globalVar.taskid_ctl, "FLASHMEMORY", - "TASK_OK" if resultmemory == 0 else "TASK_FAIL", - varlist["flashimagepath"]) - - # update status with the result - if resulteeprom == 0 and resultmemory == 0: - alltasksok = True - psdbObj.update_taskctl_status(globalVar.taskid_ctl, "TASK_BOARD_OK") - else: - psdbObj.update_taskctl_status(globalVar.taskid_ctl, "TASK_BOARD_FAIL") - - if alltasksok: - # get barcode using the scanner - loggerObj.getlogger().info("Waiting for barcode scanner") - qrreceived = False - while not qrreceived: - qr = QRReader() - if qr.openQR(): - # waits 5s to receive a valid code - if qr.readQRasync(5): - qrreceived = True - factorycode = qr.getQRNumber() - psdbObj.set_factorycode(globalVar.g_uuid, factorycode) - qr.closeQR() - - loggerObj.getlogger().info("Python program finished") + res = create_board() + if res: + loggerObj.getlogger().info("Python program error") + else: + # create a process + globalVar.testid_ctl = psdbObj.open_test(globalVar.g_uuid) + # create and run tests according to the board type + runner = SimpleTestRunner(psdbObj) + loggerObj.getlogger().info("Tests running") + testresult = runner.run(create_testsuite()) + # save dmesg + generate_dmesg(globalVar.testid_ctl, psdbObj) + + # execute aditional tasks, only if the test was succesfull + if testresult.wasSuccessful(): + loggerObj.getlogger().info("Extra tasks running") + # create task control + globalVar.taskid_ctl = psdbObj.open_task(globalVar.g_uuid) + # get extra variables + varlist = get_taskvars_list(globalVar.g_uuid) + + alltasksok = False + + # flash eeprom + resulteeprom = 0 + if "eeprompath" in varlist and len(varlist["eeprompath"]) > 0: + mac0 = psdbObj.get_board_macaddr(globalVar.g_uuid) + resulteeprom, eepromdata = flash_eeprom(varlist["eeprompath"], globalVar.g_uuid, mac0) + psdbObj.create_task_result(globalVar.taskid_ctl, "FLASHEEPROM", + "TASK_OK" if resulteeprom == 0 else "TASK_FAIL", eepromdata) + + # flash non-volatile memory + resultmemory = 0 + if "flashimagepath" in varlist and len(varlist["flashimagepath"]) > 0: + resultmemory = flash_memory(varlist["flashimagepath"]) + psdbObj.create_task_result(globalVar.taskid_ctl, "FLASHMEMORY", + "TASK_OK" if resultmemory == 0 else "TASK_FAIL", + varlist["flashimagepath"]) + + # update status with the result + if resulteeprom == 0 and resultmemory == 0: + alltasksok = True + psdbObj.update_taskctl_status(globalVar.taskid_ctl, "TASK_BOARD_OK") + else: + psdbObj.update_taskctl_status(globalVar.taskid_ctl, "TASK_BOARD_FAIL") + + if alltasksok: + # get barcode using the scanner + loggerObj.getlogger().info("Waiting for barcode scanner") + qrreceived = False + while not qrreceived: + qr = QRReader() + if qr.openQR(): + # waits 5s to receive a valid code + if qr.readQRasync(5): + qrreceived = True + factorycode = qr.getQRNumber() + psdbObj.set_factorycode(globalVar.g_uuid, factorycode) + qr.closeQR() + + loggerObj.getlogger().info("Python program finished") if __name__ == "__main__": -- cgit v1.1 From db3b1e45c47a1ef23c1ad67114a09cbec0976681 Mon Sep 17 00:00:00 2001 From: Hector Fernandez Date: Thu, 25 Jun 2020 11:45:31 +0200 Subject: Solved bugs. Adapted to DB changes. --- test-cli/test_main.py | 226 ++++++++++++++++++++++++++++++++++---------------- 1 file changed, 155 insertions(+), 71 deletions(-) (limited to 'test-cli/test_main.py') diff --git a/test-cli/test_main.py b/test-cli/test_main.py index 3345f53..0df9bd4 100644 --- a/test-cli/test_main.py +++ b/test-cli/test_main.py @@ -6,7 +6,6 @@ import unittest from test.helpers.testsrv_db import TestSrv_Database from test.helpers.setup_xml import XMLSetup from test.runners.simple import SimpleTestRunner -from test.helpers.syscmd import TestSysCommand from test.tests.qethernet import Qethernet from test.tests.qram import Qram from test.tests.qusb import Qusb @@ -27,6 +26,8 @@ from test.tasks.flashmemory import flash_memory from test.helpers.qrreader import QRReader from test.tasks.generatedmesg import generate_dmesg from test.helpers.cmdline import LinuxKernel +from test.helpers.amper import Amper +from test.enums.StationStates import StationStates # global variables psdbObj = TestSrv_Database() @@ -103,13 +104,18 @@ def create_board(): variant = cmd.getkvar("bvariant", "none") if model_id == "none" or variant == "none": - return 1 + print("Error: Cannot open DB connection") + loggerObj.getlogger().info("Station error: #Cannot get model and variant information#") + exit(0) # get model id globalVar.g_mid = model_id + "-" + variant # get station - globalVar.station = socket.gethostname() + if "Station" not in globalVar.station: + print("Error: Wrong Station name") + loggerObj.getlogger().info("Station error: #Wrong station name#") + exit(0) processor_id = get_die_id(globalVar.g_mid) print(globalVar.g_mid) @@ -117,8 +123,6 @@ def create_board(): globalVar.g_uuid = psdbObj.create_board(processor_id, model_id, variant, globalVar.station, get_mac(globalVar.g_mid)) print(globalVar.g_uuid) - psdbObj.bond_to_station(globalVar.g_uuid, globalVar.station) - return 0 def get_taskvars_list(uuid): @@ -131,67 +135,97 @@ def get_taskvars_list(uuid): def main(): # initialize the board - res = create_board() - if res: - loggerObj.getlogger().info("Python program error") + create_board() + + # create a process + globalVar.testid_ctl = psdbObj.open_test(globalVar.g_uuid) + # create and run tests according to the board type + runner = SimpleTestRunner(psdbObj) + # Change state to "TESTS_RUNNING" + if psdbObj.read_station_state(globalVar.station) == StationStates.STATION_ERROR.name: + # Abort + print("Error: Wrong previous station state.") + exit(0) + psdbObj.change_station_state(globalVar.station, StationStates.TESTS_RUNNING.name) + # Execute tests + testresult = runner.run(create_testsuite()) + # save dmesg at the end of the tests + generate_dmesg(globalVar.testid_ctl, psdbObj) + + # execute aditional tasks, only if the test was succesfull + if testresult.wasSuccessful(): + # Change state to "TESTS_OK" + if psdbObj.read_station_state(globalVar.station) == StationStates.STATION_ERROR.name: + # Abort + print("Error: Wrong previous station state.") + exit(0) + psdbObj.change_station_state(globalVar.station, StationStates.TESTS_OK.name) + # Change state to "EXTRATASKS_RUNNING" + if psdbObj.read_station_state(globalVar.station) == StationStates.STATION_ERROR.name: + # Abort + print("Error: Wrong previous station state.") + exit(0) + psdbObj.change_station_state(globalVar.station, StationStates.EXTRATASKS_RUNNING.name) + # create task control + globalVar.taskid_ctl = psdbObj.open_task(globalVar.g_uuid) + # get extra variables + varlist = get_taskvars_list(globalVar.g_uuid) + + alltasksok = False + + # flash eeprom + resulteeprom = 0 + if "eeprompath" in varlist and len(varlist["eeprompath"]) > 0: + mac0 = psdbObj.get_board_macaddr(globalVar.g_uuid) + resulteeprom, eepromdata = flash_eeprom(varlist["eeprompath"], globalVar.g_uuid, mac0) + psdbObj.create_task_result(globalVar.taskid_ctl, "FLASHEEPROM", + "TASK_OK" if resulteeprom == 0 else "TASK_FAIL", eepromdata) + + # flash non-volatile memory + resultmemory = 0 + if "flashimagepath" in varlist and len(varlist["flashimagepath"]) > 0: + resultmemory = flash_memory(varlist["flashimagepath"]) + psdbObj.create_task_result(globalVar.taskid_ctl, "FLASHMEMORY", + "TASK_OK" if resultmemory == 0 else "TASK_FAIL", + varlist["flashimagepath"]) + + # update status with the result + if resulteeprom == 0 and resultmemory == 0: + psdbObj.update_taskctl_status(globalVar.taskid_ctl, "TASK_BOARD_OK") + # Change state to "WAITING_FOR_SCANNER" + if psdbObj.read_station_state(globalVar.station) == StationStates.STATION_ERROR.name: + # Abort + print("Error: Wrong previous station state.") + exit(0) + psdbObj.change_station_state(globalVar.station, StationStates.WAITING_FOR_SCANNER.name) + # get barcode using the scanner + qrreceived = False + while not qrreceived: + qr = QRReader() + if qr.openQR(): + # waits 5s to receive a valid code + if qr.readQRasync(5): + qrreceived = True + factorycode = qr.getQRNumber() + psdbObj.set_factorycode(globalVar.g_uuid, factorycode) + qr.closeQR() + # Change state to "FINISHED" + if psdbObj.read_station_state(globalVar.station) == StationStates.STATION_ERROR.name: + # Abort + print("Error: Wrong previous station state.") + exit(0) + psdbObj.change_station_state(globalVar.station, StationStates.FINISHED.name) + else: + psdbObj.update_taskctl_status(globalVar.taskid_ctl, "TASK_BOARD_FAIL") + loggerObj.getlogger().info("Station error: #Unable to complete extra tasks#") + else: - # create a process - globalVar.testid_ctl = psdbObj.open_test(globalVar.g_uuid) - # create and run tests according to the board type - runner = SimpleTestRunner(psdbObj) - loggerObj.getlogger().info("Tests running") - testresult = runner.run(create_testsuite()) - # save dmesg - generate_dmesg(globalVar.testid_ctl, psdbObj) - - # execute aditional tasks, only if the test was succesfull - if testresult.wasSuccessful(): - loggerObj.getlogger().info("Extra tasks running") - # create task control - globalVar.taskid_ctl = psdbObj.open_task(globalVar.g_uuid) - # get extra variables - varlist = get_taskvars_list(globalVar.g_uuid) - - alltasksok = False - - # flash eeprom - resulteeprom = 0 - if "eeprompath" in varlist and len(varlist["eeprompath"]) > 0: - mac0 = psdbObj.get_board_macaddr(globalVar.g_uuid) - resulteeprom, eepromdata = flash_eeprom(varlist["eeprompath"], globalVar.g_uuid, mac0) - psdbObj.create_task_result(globalVar.taskid_ctl, "FLASHEEPROM", - "TASK_OK" if resulteeprom == 0 else "TASK_FAIL", eepromdata) - - # flash non-volatile memory - resultmemory = 0 - if "flashimagepath" in varlist and len(varlist["flashimagepath"]) > 0: - resultmemory = flash_memory(varlist["flashimagepath"]) - psdbObj.create_task_result(globalVar.taskid_ctl, "FLASHMEMORY", - "TASK_OK" if resultmemory == 0 else "TASK_FAIL", - varlist["flashimagepath"]) - - # update status with the result - if resulteeprom == 0 and resultmemory == 0: - alltasksok = True - psdbObj.update_taskctl_status(globalVar.taskid_ctl, "TASK_BOARD_OK") - else: - psdbObj.update_taskctl_status(globalVar.taskid_ctl, "TASK_BOARD_FAIL") - - if alltasksok: - # get barcode using the scanner - loggerObj.getlogger().info("Waiting for barcode scanner") - qrreceived = False - while not qrreceived: - qr = QRReader() - if qr.openQR(): - # waits 5s to receive a valid code - if qr.readQRasync(5): - qrreceived = True - factorycode = qr.getQRNumber() - psdbObj.set_factorycode(globalVar.g_uuid, factorycode) - qr.closeQR() - - loggerObj.getlogger().info("Python program finished") + # Change state to "TESTS_FAILED" + if psdbObj.read_station_state(globalVar.station) == StationStates.STATION_ERROR.name: + # Abort + print("Error: Wrong previous station state.") + exit(0) + psdbObj.change_station_state(globalVar.station, StationStates.TESTS_FAILED.name) if __name__ == "__main__": @@ -202,18 +236,68 @@ if __name__ == "__main__": # create logger loggerObj = ISEE_Logger(logging.INFO) - loggerObj.getlogger().info("Python program started") # Try to parse the setup.xml file try: xmlObj = XMLSetup(os.path.join(test_abspath, "setup.xml")) except: + # Abort print("Error: Cannot parse setup.xml file") - exit(1) + loggerObj.getlogger().info("Station error: #Cannot parse XML configuration file#") + exit(0) # Try to connect to the DB, according to setup.xml configuration - if psdbObj.open(xmlObj): - main() - else: + if not psdbObj.open(xmlObj): print("Error: Cannot open DB connection") - exit(2) + loggerObj.getlogger().info("Station error: #Cannot open DB connection#") + exit(0) + + # get station name + globalVar.station = socket.gethostname() + + # Check if current state is "WAIT_TEST_START" + currentstate = psdbObj.read_station_state(globalVar.station) + if currentstate != StationStates.WAIT_TEST_START.name: + # Abort + print("Error: Wrong previous station state.") + exit(0) + + # Change state to "TESTS_CHECKING_ENV" + psdbObj.change_station_state(globalVar.station, StationStates.TESTS_CHECKING_ENV.name) + + # Look for a sd or pendrive + # TODO + #if not disk_exists("/dev/mmcblk0") and not : + # abort + #print("error: cannot find a barcode scanner.") + #loggerObj.getlogger().info("station error: #cannot find a sd card or memory usb to load the test environment.#") + #exit(0) + + # Look for a barcode scanner + qr = QRReader() + if qr.openQR(): + qr.closeQR() + else: + # Abort + print("Error: Cannot find a barcode scanner.") + loggerObj.getlogger().info("Station error: #Cannot find a barcode scanner#") + exit(0) + + # Look for an amperimeter + amp = Amper() + if not amp.open(): + # Abort + print("Error: Cannot open an amperimeter COM port.") + loggerObj.getlogger().info("Station error: #Cannot open an amperimeter COM port#") + exit(0) + if not amp.hello(): + if not amp.hello(): + # Abort + print("Error: Cannot open find an amperimeter.") + loggerObj.getlogger().info("Station error: #Cannot open find an amperimeter#") + exit(0) + + # Execute main + main() + + exit(0) -- cgit v1.1 From 9ac8a326412b04e4873b883c2f2a056ca0b22480 Mon Sep 17 00:00:00 2001 From: Hector Fernandez Date: Thu, 25 Jun 2020 14:42:42 +0200 Subject: Modified USB test and the way it finds USB memory storages. --- test-cli/test_main.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'test-cli/test_main.py') diff --git a/test-cli/test_main.py b/test-cli/test_main.py index 0df9bd4..63d8dad 100644 --- a/test-cli/test_main.py +++ b/test-cli/test_main.py @@ -92,6 +92,7 @@ def create_testsuite(): paramlist["testid"] = testid paramlist["boarduuid"] = globalVar.g_uuid paramlist["testidctl"] = globalVar.testid_ctl + paramlist["xml"] = xmlObj # add test to TestSuite add_test_task(suite, testdefname, paramlist) @@ -232,7 +233,6 @@ if __name__ == "__main__": # Clear the shell screen clear() test_abspath = os.path.dirname(os.path.abspath(__file__)) - print(test_abspath) # create logger loggerObj = ISEE_Logger(logging.INFO) -- cgit v1.1 From 34df86b37d6838b115e65e5f3a332344afeb86b8 Mon Sep 17 00:00:00 2001 From: Hector Fernandez Date: Wed, 1 Jul 2020 10:45:34 +0200 Subject: Changes to adapt to new way to save results in DB. Created audio test. Added protections against unexpected status in DB. --- test-cli/test_main.py | 129 ++++++++++++++++++++++++++++++++------------------ 1 file changed, 82 insertions(+), 47 deletions(-) (limited to 'test-cli/test_main.py') diff --git a/test-cli/test_main.py b/test-cli/test_main.py index 63d8dad..b32ad26 100644 --- a/test-cli/test_main.py +++ b/test-cli/test_main.py @@ -1,5 +1,5 @@ from test.helpers.int_registers import get_die_id -from test.helpers.int_registers import get_mac +from test.helpers.int_registers import get_internal_mac from subprocess import call import os import unittest @@ -9,7 +9,6 @@ from test.runners.simple import SimpleTestRunner from test.tests.qethernet import Qethernet from test.tests.qram import Qram from test.tests.qusb import Qusb -from test.tests.qi2c import Qi2c from test.tests.qeeprom import Qeeprom from test.tests.qserial import Qserial from test.tests.qwifi import Qwifi @@ -17,6 +16,8 @@ from test.tests.qrtc import Qrtc from test.tests.qduplex_ser import Qduplex from test.tests.qamper import Qamper from test.tests.qnand import Qnand +from test.tests.qaudio import Qaudio +from test.tests.qdmesg import Qdmesg from test.helpers.globalVariables import globalVar import socket from test.helpers.iseelogger import ISEE_Logger @@ -24,16 +25,17 @@ import logging from test.tasks.flasheeprom import flash_eeprom from test.tasks.flashmemory import flash_memory from test.helpers.qrreader import QRReader -from test.tasks.generatedmesg import generate_dmesg from test.helpers.cmdline import LinuxKernel from test.helpers.amper import Amper from test.enums.StationStates import StationStates +import time # global variables psdbObj = TestSrv_Database() xmlObj = None loggerObj = None test_abspath = None +tests_manually_executed = [] # define clear function @@ -50,36 +52,50 @@ def create_paramslist(params): return paramlist -def add_test_task(suite, testdefname, paramlist): +def add_test(suite, testdefname, paramlist): if testdefname == "RAM": suite.addTest(Qram(testdefname, "execute", paramlist)) + return 0 elif testdefname == "SERIALDUAL": suite.addTest(Qduplex(testdefname, "execute", paramlist)) + return 0 elif testdefname == "EEPROM": suite.addTest(Qeeprom(testdefname, "execute", paramlist)) + return 0 elif testdefname == "SERIAL": suite.addTest(Qserial(testdefname, "execute", paramlist)) + return 0 elif testdefname == "RTC": suite.addTest(Qrtc(testdefname, "execute", paramlist)) + return 0 elif testdefname == "CONSUMPTION": suite.addTest(Qamper(testdefname, "execute", paramlist)) + return 0 elif testdefname == "ETHERNET": suite.addTest(Qethernet(testdefname, "execute", paramlist)) + return 0 elif testdefname == "NAND": suite.addTest(Qnand(testdefname, "execute", paramlist)) - elif testdefname == "I2C": - suite.addTest(Qi2c(testdefname, "execute", paramlist)) + return 0 elif testdefname == "WIFI": suite.addTest(Qwifi(testdefname, "execute", paramlist)) + return 0 elif testdefname == "USB": suite.addTest(Qusb(testdefname, "execute", paramlist)) + return 0 + elif testdefname == "AUDIO": + suite.addTest(Qaudio(testdefname, "execute", paramlist)) + return 0 + elif testdefname == "DMESG": + tests_manually_executed.append(Qdmesg(testdefname, "execute", paramlist)) + return 0 else: - raise Exception("Wrong testdefname") + return 1 def create_testsuite(): # create an object TestSuite - suite = unittest.TestSuite() + suite1 = unittest.TestSuite() # get list of tests for this board tests = psdbObj.get_tests_list(globalVar.g_uuid) # loop in every test for this board @@ -93,10 +109,11 @@ def create_testsuite(): paramlist["boarduuid"] = globalVar.g_uuid paramlist["testidctl"] = globalVar.testid_ctl paramlist["xml"] = xmlObj + paramlist["db"] = psdbObj # add test to TestSuite - add_test_task(suite, testdefname, paramlist) + rescode = add_test(suite1, testdefname, paramlist) - return suite + return suite1 def create_board(): @@ -122,7 +139,7 @@ def create_board(): print(globalVar.g_mid) print(processor_id) globalVar.g_uuid = psdbObj.create_board(processor_id, model_id, variant, globalVar.station, - get_mac(globalVar.g_mid)) + get_internal_mac(globalVar.g_mid)) print(globalVar.g_uuid) @@ -140,31 +157,38 @@ def main(): # create a process globalVar.testid_ctl = psdbObj.open_test(globalVar.g_uuid) - # create and run tests according to the board type - runner = SimpleTestRunner(psdbObj) # Change state to "TESTS_RUNNING" if psdbObj.read_station_state(globalVar.station) == StationStates.STATION_ERROR.name: # Abort - print("Error: Wrong previous station state.") + print( + "Error: Wrong previous station state before changing to TESTS_RUNNING state. STATION_ERROR state unexpected.") exit(0) psdbObj.change_station_state(globalVar.station, StationStates.TESTS_RUNNING.name) - # Execute tests - testresult = runner.run(create_testsuite()) - # save dmesg at the end of the tests - generate_dmesg(globalVar.testid_ctl, psdbObj) - # execute aditional tasks, only if the test was succesfull + # generate suits + suite1 = create_testsuite() + # Execute tests (round 1) + runner1 = SimpleTestRunner(psdbObj) + testresult = runner1.run(suite1) + + #Execute manually tests + for test in tests_manually_executed: + test.execute() + + # execute aditional tasks, only if the test was successful if testresult.wasSuccessful(): # Change state to "TESTS_OK" if psdbObj.read_station_state(globalVar.station) == StationStates.STATION_ERROR.name: # Abort - print("Error: Wrong previous station state.") + print( + "Error: Wrong previous station state before changing to TESTS_OK state. STATION_ERROR state unexpected.") exit(0) psdbObj.change_station_state(globalVar.station, StationStates.TESTS_OK.name) # Change state to "EXTRATASKS_RUNNING" if psdbObj.read_station_state(globalVar.station) == StationStates.STATION_ERROR.name: # Abort - print("Error: Wrong previous station state.") + print( + "Error: Wrong previous station state before changing to EXTRATASKS_RUNNING state. STATION_ERROR state unexpected.") exit(0) psdbObj.change_station_state(globalVar.station, StationStates.EXTRATASKS_RUNNING.name) # create task control @@ -192,28 +216,38 @@ def main(): # update status with the result if resulteeprom == 0 and resultmemory == 0: + # finish tasks psdbObj.update_taskctl_status(globalVar.taskid_ctl, "TASK_BOARD_OK") + # Change state to "WAITING_FOR_SCANNER" if psdbObj.read_station_state(globalVar.station) == StationStates.STATION_ERROR.name: # Abort - print("Error: Wrong previous station state.") + print( + "Error: Wrong previous station state before changing to WAITING_FOR_SCANNER state. STATION_ERROR state unexpected.") exit(0) psdbObj.change_station_state(globalVar.station, StationStates.WAITING_FOR_SCANNER.name) - # get barcode using the scanner - qrreceived = False - while not qrreceived: - qr = QRReader() - if qr.openQR(): - # waits 5s to receive a valid code - if qr.readQRasync(5): - qrreceived = True - factorycode = qr.getQRNumber() - psdbObj.set_factorycode(globalVar.g_uuid, factorycode) - qr.closeQR() + + # get barcode using the scanner, only if autotest is disabled + autotest = psdbObj.get_setup_variable("AUTOTEST_" + globalVar.station) + if autotest == "True": + psdbObj.set_factorycode(globalVar.g_uuid, "1234567890") # fake factory code + else: + qrreceived = False + while not qrreceived: + qr = QRReader() + if qr.openQR(): + # waits 5s to receive a valid code + if qr.readQRasync(5): + qrreceived = True + factorycode = qr.getQRNumber() + psdbObj.set_factorycode(globalVar.g_uuid, factorycode) + qr.closeQR() + # Change state to "FINISHED" if psdbObj.read_station_state(globalVar.station) == StationStates.STATION_ERROR.name: # Abort - print("Error: Wrong previous station state.") + print( + "Error: Wrong previous station state before changing to FINISHED state. STATION_ERROR state unexpected.") exit(0) psdbObj.change_station_state(globalVar.station, StationStates.FINISHED.name) else: @@ -224,10 +258,16 @@ def main(): # Change state to "TESTS_FAILED" if psdbObj.read_station_state(globalVar.station) == StationStates.STATION_ERROR.name: # Abort - print("Error: Wrong previous station state.") + print( + "Error: Wrong previous station state before changing to TESTS_FAILED state. STATION_ERROR state unexpected.") exit(0) psdbObj.change_station_state(globalVar.station, StationStates.TESTS_FAILED.name) + # reset board if AUTOTEST is enabled + autotest = psdbObj.get_setup_variable("AUTOTEST_" + globalVar.station) + if autotest == "True": + os.system('reboot') + if __name__ == "__main__": # Clear the shell screen @@ -257,22 +297,17 @@ if __name__ == "__main__": # Check if current state is "WAIT_TEST_START" currentstate = psdbObj.read_station_state(globalVar.station) - if currentstate != StationStates.WAIT_TEST_START.name: - # Abort - print("Error: Wrong previous station state.") - exit(0) + while currentstate != StationStates.WAIT_TEST_START.name: + currentstate = psdbObj.read_station_state(globalVar.station) + if currentstate != StationStates.WAIT_TEST_START.name: + # Wait + print("Error: Wrong previous station state. WAIT_TEST_START state expected. Current state is: {}".format(currentstate)) + print("Waiting for WAIT_TEST_START state") + time.sleep(5) # Change state to "TESTS_CHECKING_ENV" psdbObj.change_station_state(globalVar.station, StationStates.TESTS_CHECKING_ENV.name) - # Look for a sd or pendrive - # TODO - #if not disk_exists("/dev/mmcblk0") and not : - # abort - #print("error: cannot find a barcode scanner.") - #loggerObj.getlogger().info("station error: #cannot find a sd card or memory usb to load the test environment.#") - #exit(0) - # Look for a barcode scanner qr = QRReader() if qr.openQR(): -- cgit v1.1 From 5dcd213c28451ac210703dc5bf9bf538671a0682 Mon Sep 17 00:00:00 2001 From: Hector Fernandez Date: Fri, 3 Jul 2020 13:55:45 +0200 Subject: Created new USB LOOP test. Moved all the test files to /var/lib/hwtest-files/ folder. --- test-cli/test_main.py | 3 +++ 1 file changed, 3 insertions(+) (limited to 'test-cli/test_main.py') diff --git a/test-cli/test_main.py b/test-cli/test_main.py index b32ad26..36d5fb1 100644 --- a/test-cli/test_main.py +++ b/test-cli/test_main.py @@ -86,6 +86,9 @@ def add_test(suite, testdefname, paramlist): elif testdefname == "AUDIO": suite.addTest(Qaudio(testdefname, "execute", paramlist)) return 0 + elif testdefname == "USBDUAL": + suite.addTest(Qaudio(testdefname, "execute", paramlist)) + return 0 elif testdefname == "DMESG": tests_manually_executed.append(Qdmesg(testdefname, "execute", paramlist)) return 0 -- cgit v1.1 From 0e8e3ecd4b9be71c41008b95950d089819622dff Mon Sep 17 00:00:00 2001 From: Hector Fernandez Date: Mon, 6 Jul 2020 14:08:27 +0200 Subject: Corrected some errors in USB and USBDUAL tests. --- test-cli/test_main.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'test-cli/test_main.py') diff --git a/test-cli/test_main.py b/test-cli/test_main.py index 36d5fb1..a8eff91 100644 --- a/test-cli/test_main.py +++ b/test-cli/test_main.py @@ -174,7 +174,7 @@ def main(): runner1 = SimpleTestRunner(psdbObj) testresult = runner1.run(suite1) - #Execute manually tests + # Execute manually tests for test in tests_manually_executed: test.execute() @@ -232,7 +232,7 @@ def main(): # get barcode using the scanner, only if autotest is disabled autotest = psdbObj.get_setup_variable("AUTOTEST_" + globalVar.station) - if autotest == "True": + if int(autotest) == 1: psdbObj.set_factorycode(globalVar.g_uuid, "1234567890") # fake factory code else: qrreceived = False @@ -268,7 +268,7 @@ def main(): # reset board if AUTOTEST is enabled autotest = psdbObj.get_setup_variable("AUTOTEST_" + globalVar.station) - if autotest == "True": + if int(autotest) == 1: os.system('reboot') -- cgit v1.1 From 43f688b414ea182cd9baf06ee83df072c3f563f5 Mon Sep 17 00:00:00 2001 From: Hector Fernandez Date: Tue, 7 Jul 2020 09:57:58 +0200 Subject: Added full support for USBLOOP test. Reboot board always if autotest is enabled. --- test-cli/test_main.py | 31 +++++++++++++++++++++++++------ 1 file changed, 25 insertions(+), 6 deletions(-) (limited to 'test-cli/test_main.py') diff --git a/test-cli/test_main.py b/test-cli/test_main.py index a8eff91..8f82398 100644 --- a/test-cli/test_main.py +++ b/test-cli/test_main.py @@ -11,6 +11,7 @@ from test.tests.qram import Qram from test.tests.qusb import Qusb from test.tests.qeeprom import Qeeprom from test.tests.qserial import Qserial +from test.tests.qusbdual import Qusbdual from test.tests.qwifi import Qwifi from test.tests.qrtc import Qrtc from test.tests.qduplex_ser import Qduplex @@ -44,6 +45,12 @@ def clear(): _ = call('clear' if os.name == 'posix' else 'cls') +def reboot_if_autotest(): + # reset board if AUTOTEST is enabled + autotest = psdbObj.get_setup_variable("AUTOTEST_" + globalVar.station) + if int(autotest) == 1: + os.system('reboot') + def create_paramslist(params): paramlist = {} for row in params: @@ -86,8 +93,8 @@ def add_test(suite, testdefname, paramlist): elif testdefname == "AUDIO": suite.addTest(Qaudio(testdefname, "execute", paramlist)) return 0 - elif testdefname == "USBDUAL": - suite.addTest(Qaudio(testdefname, "execute", paramlist)) + elif testdefname == "USBLOOP": + suite.addTest(Qusbdual(testdefname, "execute", paramlist)) return 0 elif testdefname == "DMESG": tests_manually_executed.append(Qdmesg(testdefname, "execute", paramlist)) @@ -127,6 +134,7 @@ def create_board(): if model_id == "none" or variant == "none": print("Error: Cannot open DB connection") loggerObj.getlogger().info("Station error: #Cannot get model and variant information#") + reboot_if_autotest() exit(0) # get model id @@ -136,6 +144,7 @@ def create_board(): if "Station" not in globalVar.station: print("Error: Wrong Station name") loggerObj.getlogger().info("Station error: #Wrong station name#") + reboot_if_autotest() exit(0) processor_id = get_die_id(globalVar.g_mid) @@ -165,6 +174,7 @@ def main(): # Abort print( "Error: Wrong previous station state before changing to TESTS_RUNNING state. STATION_ERROR state unexpected.") + reboot_if_autotest() exit(0) psdbObj.change_station_state(globalVar.station, StationStates.TESTS_RUNNING.name) @@ -185,6 +195,7 @@ def main(): # Abort print( "Error: Wrong previous station state before changing to TESTS_OK state. STATION_ERROR state unexpected.") + reboot_if_autotest() exit(0) psdbObj.change_station_state(globalVar.station, StationStates.TESTS_OK.name) # Change state to "EXTRATASKS_RUNNING" @@ -192,6 +203,7 @@ def main(): # Abort print( "Error: Wrong previous station state before changing to EXTRATASKS_RUNNING state. STATION_ERROR state unexpected.") + reboot_if_autotest() exit(0) psdbObj.change_station_state(globalVar.station, StationStates.EXTRATASKS_RUNNING.name) # create task control @@ -227,6 +239,7 @@ def main(): # Abort print( "Error: Wrong previous station state before changing to WAITING_FOR_SCANNER state. STATION_ERROR state unexpected.") + reboot_if_autotest() exit(0) psdbObj.change_station_state(globalVar.station, StationStates.WAITING_FOR_SCANNER.name) @@ -256,20 +269,18 @@ def main(): else: psdbObj.update_taskctl_status(globalVar.taskid_ctl, "TASK_BOARD_FAIL") loggerObj.getlogger().info("Station error: #Unable to complete extra tasks#") - else: # Change state to "TESTS_FAILED" if psdbObj.read_station_state(globalVar.station) == StationStates.STATION_ERROR.name: # Abort print( "Error: Wrong previous station state before changing to TESTS_FAILED state. STATION_ERROR state unexpected.") + reboot_if_autotest() exit(0) psdbObj.change_station_state(globalVar.station, StationStates.TESTS_FAILED.name) # reset board if AUTOTEST is enabled - autotest = psdbObj.get_setup_variable("AUTOTEST_" + globalVar.station) - if int(autotest) == 1: - os.system('reboot') + reboot_if_autotest() if __name__ == "__main__": @@ -287,12 +298,14 @@ if __name__ == "__main__": # Abort print("Error: Cannot parse setup.xml file") loggerObj.getlogger().info("Station error: #Cannot parse XML configuration file#") + reboot_if_autotest() exit(0) # Try to connect to the DB, according to setup.xml configuration if not psdbObj.open(xmlObj): print("Error: Cannot open DB connection") loggerObj.getlogger().info("Station error: #Cannot open DB connection#") + reboot_if_autotest() exit(0) # get station name @@ -311,6 +324,9 @@ if __name__ == "__main__": # Change state to "TESTS_CHECKING_ENV" psdbObj.change_station_state(globalVar.station, StationStates.TESTS_CHECKING_ENV.name) + # Wait before beginning + time.sleep(2) + # Look for a barcode scanner qr = QRReader() if qr.openQR(): @@ -319,6 +335,7 @@ if __name__ == "__main__": # Abort print("Error: Cannot find a barcode scanner.") loggerObj.getlogger().info("Station error: #Cannot find a barcode scanner#") + reboot_if_autotest() exit(0) # Look for an amperimeter @@ -327,12 +344,14 @@ if __name__ == "__main__": # Abort print("Error: Cannot open an amperimeter COM port.") loggerObj.getlogger().info("Station error: #Cannot open an amperimeter COM port#") + reboot_if_autotest() exit(0) if not amp.hello(): if not amp.hello(): # Abort print("Error: Cannot open find an amperimeter.") loggerObj.getlogger().info("Station error: #Cannot open find an amperimeter#") + reboot_if_autotest() exit(0) # Execute main -- cgit v1.1 From 9f89f02e7d6e2a3208b0b85d2567ebffd2515e09 Mon Sep 17 00:00:00 2001 From: Hector Fernandez Date: Tue, 7 Jul 2020 14:09:59 +0200 Subject: New way to check preivous state before changing to a new one. Solved some problems with audio test. --- test-cli/test_main.py | 160 ++++++++++++++++++++++++++------------------------ 1 file changed, 82 insertions(+), 78 deletions(-) (limited to 'test-cli/test_main.py') diff --git a/test-cli/test_main.py b/test-cli/test_main.py index 8f82398..3da79f7 100644 --- a/test-cli/test_main.py +++ b/test-cli/test_main.py @@ -45,12 +45,71 @@ def clear(): _ = call('clear' if os.name == 'posix' else 'cls') +def execute_station_error(text): + print("Error: {}".format(text)) + loggerObj.getlogger().info("Station error: #{}#".format(text)) + while True: + currentstate = psdbObj.read_station_state(globalVar.station) + if currentstate == StationStates.STATION_REBOOT.name: + os.system("reboot") + exit(1) + time.sleep(1) + + +def check_first_state(): + currentstate = psdbObj.read_station_state(globalVar.station) + if currentstate == StationStates.STATION_ERROR.name: + while True: + currentstate = psdbObj.read_station_state(globalVar.station) + if currentstate == StationStates.STATION_REBOOT.name: + os.system("reboot") + exit(1) + time.sleep(1) + elif currentstate == StationStates.STATION_REBOOT.name: + os.system("reboot") + exit(1) + else: + starttime = time.time() + while currentstate != StationStates.WAIT_TEST_START.name: + if time.time() - starttime >= 30.0: + # Error + execute_station_error("Timeout while waiting for WAIT_TEST_START state") + else: + time.sleep(5) + currentstate = psdbObj.read_station_state(globalVar.station) + if currentstate == StationStates.STATION_ERROR.name: + while True: + currentstate = psdbObj.read_station_state(globalVar.station) + if currentstate == StationStates.STATION_REBOOT.name: + os.system("reboot") + exit(1) + time.sleep(1) + elif currentstate == StationStates.STATION_REBOOT.name: + os.system("reboot") + exit(1) + + +def set_next_state(newstate): + statewritten = psdbObj.change_station_state(globalVar.station, newstate) + if statewritten == StationStates.STATION_REBOOT.name: + os.system("reboot") + exit(1) + elif statewritten == StationStates.STATION_ERROR.name: + while True: + currentstate = psdbObj.read_station_state(globalVar.station) + if currentstate == StationStates.STATION_REBOOT.name: + os.system("reboot") + exit(1) + time.sleep(1) + + def reboot_if_autotest(): # reset board if AUTOTEST is enabled autotest = psdbObj.get_setup_variable("AUTOTEST_" + globalVar.station) if int(autotest) == 1: os.system('reboot') + def create_paramslist(params): paramlist = {} for row in params: @@ -121,7 +180,7 @@ def create_testsuite(): paramlist["xml"] = xmlObj paramlist["db"] = psdbObj # add test to TestSuite - rescode = add_test(suite1, testdefname, paramlist) + add_test(suite1, testdefname, paramlist) return suite1 @@ -132,20 +191,16 @@ def create_board(): variant = cmd.getkvar("bvariant", "none") if model_id == "none" or variant == "none": - print("Error: Cannot open DB connection") - loggerObj.getlogger().info("Station error: #Cannot get model and variant information#") - reboot_if_autotest() - exit(0) + # Error + execute_station_error("Cannot get model and variant information") # get model id globalVar.g_mid = model_id + "-" + variant # get station if "Station" not in globalVar.station: - print("Error: Wrong Station name") - loggerObj.getlogger().info("Station error: #Wrong station name#") - reboot_if_autotest() - exit(0) + # Error + execute_station_error("Wrong station name") processor_id = get_die_id(globalVar.g_mid) print(globalVar.g_mid) @@ -170,13 +225,7 @@ def main(): # create a process globalVar.testid_ctl = psdbObj.open_test(globalVar.g_uuid) # Change state to "TESTS_RUNNING" - if psdbObj.read_station_state(globalVar.station) == StationStates.STATION_ERROR.name: - # Abort - print( - "Error: Wrong previous station state before changing to TESTS_RUNNING state. STATION_ERROR state unexpected.") - reboot_if_autotest() - exit(0) - psdbObj.change_station_state(globalVar.station, StationStates.TESTS_RUNNING.name) + set_next_state(StationStates.TESTS_RUNNING.name) # generate suits suite1 = create_testsuite() @@ -191,21 +240,9 @@ def main(): # execute aditional tasks, only if the test was successful if testresult.wasSuccessful(): # Change state to "TESTS_OK" - if psdbObj.read_station_state(globalVar.station) == StationStates.STATION_ERROR.name: - # Abort - print( - "Error: Wrong previous station state before changing to TESTS_OK state. STATION_ERROR state unexpected.") - reboot_if_autotest() - exit(0) - psdbObj.change_station_state(globalVar.station, StationStates.TESTS_OK.name) + set_next_state(StationStates.TESTS_OK.name) # Change state to "EXTRATASKS_RUNNING" - if psdbObj.read_station_state(globalVar.station) == StationStates.STATION_ERROR.name: - # Abort - print( - "Error: Wrong previous station state before changing to EXTRATASKS_RUNNING state. STATION_ERROR state unexpected.") - reboot_if_autotest() - exit(0) - psdbObj.change_station_state(globalVar.station, StationStates.EXTRATASKS_RUNNING.name) + set_next_state(StationStates.EXTRATASKS_RUNNING.name) # create task control globalVar.taskid_ctl = psdbObj.open_task(globalVar.g_uuid) # get extra variables @@ -235,13 +272,7 @@ def main(): psdbObj.update_taskctl_status(globalVar.taskid_ctl, "TASK_BOARD_OK") # Change state to "WAITING_FOR_SCANNER" - if psdbObj.read_station_state(globalVar.station) == StationStates.STATION_ERROR.name: - # Abort - print( - "Error: Wrong previous station state before changing to WAITING_FOR_SCANNER state. STATION_ERROR state unexpected.") - reboot_if_autotest() - exit(0) - psdbObj.change_station_state(globalVar.station, StationStates.WAITING_FOR_SCANNER.name) + set_next_state(StationStates.WAITING_FOR_SCANNER.name) # get barcode using the scanner, only if autotest is disabled autotest = psdbObj.get_setup_variable("AUTOTEST_" + globalVar.station) @@ -271,13 +302,7 @@ def main(): loggerObj.getlogger().info("Station error: #Unable to complete extra tasks#") else: # Change state to "TESTS_FAILED" - if psdbObj.read_station_state(globalVar.station) == StationStates.STATION_ERROR.name: - # Abort - print( - "Error: Wrong previous station state before changing to TESTS_FAILED state. STATION_ERROR state unexpected.") - reboot_if_autotest() - exit(0) - psdbObj.change_station_state(globalVar.station, StationStates.TESTS_FAILED.name) + set_next_state(StationStates.TESTS_FAILED.name) # reset board if AUTOTEST is enabled reboot_if_autotest() @@ -295,34 +320,22 @@ if __name__ == "__main__": try: xmlObj = XMLSetup(os.path.join(test_abspath, "setup.xml")) except: - # Abort - print("Error: Cannot parse setup.xml file") - loggerObj.getlogger().info("Station error: #Cannot parse XML configuration file#") - reboot_if_autotest() - exit(0) + # Error + execute_station_error("Cannot parse setup.xml file") # Try to connect to the DB, according to setup.xml configuration if not psdbObj.open(xmlObj): - print("Error: Cannot open DB connection") - loggerObj.getlogger().info("Station error: #Cannot open DB connection#") - reboot_if_autotest() - exit(0) + # Error + execute_station_error("Cannot open DB connection") # get station name globalVar.station = socket.gethostname() - # Check if current state is "WAIT_TEST_START" - currentstate = psdbObj.read_station_state(globalVar.station) - while currentstate != StationStates.WAIT_TEST_START.name: - currentstate = psdbObj.read_station_state(globalVar.station) - if currentstate != StationStates.WAIT_TEST_START.name: - # Wait - print("Error: Wrong previous station state. WAIT_TEST_START state expected. Current state is: {}".format(currentstate)) - print("Waiting for WAIT_TEST_START state") - time.sleep(5) + # Check if current state is "WAIT_TEST_START". if not, waits here + check_first_state() # Change state to "TESTS_CHECKING_ENV" - psdbObj.change_station_state(globalVar.station, StationStates.TESTS_CHECKING_ENV.name) + set_next_state(StationStates.TESTS_CHECKING_ENV.name) # Wait before beginning time.sleep(2) @@ -332,27 +345,18 @@ if __name__ == "__main__": if qr.openQR(): qr.closeQR() else: - # Abort - print("Error: Cannot find a barcode scanner.") - loggerObj.getlogger().info("Station error: #Cannot find a barcode scanner#") - reboot_if_autotest() - exit(0) + # Error + execute_station_error("Cannot find a barcode scanner") # Look for an amperimeter amp = Amper() if not amp.open(): - # Abort - print("Error: Cannot open an amperimeter COM port.") - loggerObj.getlogger().info("Station error: #Cannot open an amperimeter COM port#") - reboot_if_autotest() - exit(0) + # Error + execute_station_error("Cannot open an amperimeter COM port") if not amp.hello(): if not amp.hello(): - # Abort - print("Error: Cannot open find an amperimeter.") - loggerObj.getlogger().info("Station error: #Cannot open find an amperimeter#") - reboot_if_autotest() - exit(0) + # Error + execute_station_error("Cannot open find an amperimeter") # Execute main main() -- cgit v1.1 From a0b900477098a435b5e434512c491f6bf0365b80 Mon Sep 17 00:00:00 2001 From: Hector Fernandez Date: Tue, 7 Jul 2020 14:23:25 +0200 Subject: Solved minor errors --- test-cli/test_main.py | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) (limited to 'test-cli/test_main.py') diff --git a/test-cli/test_main.py b/test-cli/test_main.py index 3da79f7..80dfbe2 100644 --- a/test-cli/test_main.py +++ b/test-cli/test_main.py @@ -107,7 +107,8 @@ def reboot_if_autotest(): # reset board if AUTOTEST is enabled autotest = psdbObj.get_setup_variable("AUTOTEST_" + globalVar.station) if int(autotest) == 1: - os.system('reboot') + os.system("reboot") + exit(0) def create_paramslist(params): @@ -291,12 +292,7 @@ def main(): qr.closeQR() # Change state to "FINISHED" - if psdbObj.read_station_state(globalVar.station) == StationStates.STATION_ERROR.name: - # Abort - print( - "Error: Wrong previous station state before changing to FINISHED state. STATION_ERROR state unexpected.") - exit(0) - psdbObj.change_station_state(globalVar.station, StationStates.FINISHED.name) + set_next_state(StationStates.FINISHED.name) else: psdbObj.update_taskctl_status(globalVar.taskid_ctl, "TASK_BOARD_FAIL") loggerObj.getlogger().info("Station error: #Unable to complete extra tasks#") -- cgit v1.1 From 907b96801230e04d02575a3732a73e452089637b Mon Sep 17 00:00:00 2001 From: Hector Fernandez Date: Tue, 7 Jul 2020 17:27:33 +0200 Subject: After USBLOOP test, g_mass_storage must be stopped so the board can be rebooted correctly. --- test-cli/test_main.py | 30 ++++++++++++++---------------- 1 file changed, 14 insertions(+), 16 deletions(-) (limited to 'test-cli/test_main.py') diff --git a/test-cli/test_main.py b/test-cli/test_main.py index 80dfbe2..942df61 100644 --- a/test-cli/test_main.py +++ b/test-cli/test_main.py @@ -30,6 +30,7 @@ from test.helpers.cmdline import LinuxKernel from test.helpers.amper import Amper from test.enums.StationStates import StationStates import time +import sh # global variables psdbObj = TestSrv_Database() @@ -51,8 +52,7 @@ def execute_station_error(text): while True: currentstate = psdbObj.read_station_state(globalVar.station) if currentstate == StationStates.STATION_REBOOT.name: - os.system("reboot") - exit(1) + reboot_board() time.sleep(1) @@ -62,12 +62,10 @@ def check_first_state(): while True: currentstate = psdbObj.read_station_state(globalVar.station) if currentstate == StationStates.STATION_REBOOT.name: - os.system("reboot") - exit(1) + reboot_board() time.sleep(1) elif currentstate == StationStates.STATION_REBOOT.name: - os.system("reboot") - exit(1) + reboot_board() else: starttime = time.time() while currentstate != StationStates.WAIT_TEST_START.name: @@ -81,25 +79,26 @@ def check_first_state(): while True: currentstate = psdbObj.read_station_state(globalVar.station) if currentstate == StationStates.STATION_REBOOT.name: - os.system("reboot") - exit(1) + reboot_board() time.sleep(1) elif currentstate == StationStates.STATION_REBOOT.name: - os.system("reboot") - exit(1) + reboot_board() + + +def reboot_board(): + sh.reboot() + exit(0) def set_next_state(newstate): statewritten = psdbObj.change_station_state(globalVar.station, newstate) if statewritten == StationStates.STATION_REBOOT.name: - os.system("reboot") - exit(1) + reboot_board() elif statewritten == StationStates.STATION_ERROR.name: while True: currentstate = psdbObj.read_station_state(globalVar.station) if currentstate == StationStates.STATION_REBOOT.name: - os.system("reboot") - exit(1) + reboot_board() time.sleep(1) @@ -107,8 +106,7 @@ def reboot_if_autotest(): # reset board if AUTOTEST is enabled autotest = psdbObj.get_setup_variable("AUTOTEST_" + globalVar.station) if int(autotest) == 1: - os.system("reboot") - exit(0) + reboot_board() def create_paramslist(params): -- cgit v1.1 From d46bce422fd03cd57d1ba336361da17d6efb48db Mon Sep 17 00:00:00 2001 From: Manel Caro Date: Fri, 31 Jul 2020 02:07:37 +0200 Subject: TEST restructure --- test-cli/test_main.py | 372 +++++++++++++++++++++++++++++--------------------- 1 file changed, 220 insertions(+), 152 deletions(-) (limited to 'test-cli/test_main.py') diff --git a/test-cli/test_main.py b/test-cli/test_main.py index 942df61..3f52152 100644 --- a/test-cli/test_main.py +++ b/test-cli/test_main.py @@ -1,8 +1,17 @@ +import time +import sh +import os +import unittest +import socket +import logging +import sys +import getopt +import random + +from datetime import datetime from test.helpers.int_registers import get_die_id from test.helpers.int_registers import get_internal_mac from subprocess import call -import os -import unittest from test.helpers.testsrv_db import TestSrv_Database from test.helpers.setup_xml import XMLSetup from test.runners.simple import SimpleTestRunner @@ -11,7 +20,6 @@ from test.tests.qram import Qram from test.tests.qusb import Qusb from test.tests.qeeprom import Qeeprom from test.tests.qserial import Qserial -from test.tests.qusbdual import Qusbdual from test.tests.qwifi import Qwifi from test.tests.qrtc import Qrtc from test.tests.qduplex_ser import Qduplex @@ -19,25 +27,28 @@ from test.tests.qamper import Qamper from test.tests.qnand import Qnand from test.tests.qaudio import Qaudio from test.tests.qdmesg import Qdmesg +from test.tests.qmmcflash import Qmmcflash +from test.tests.qplc import Qplc from test.helpers.globalVariables import globalVar -import socket -from test.helpers.iseelogger import ISEE_Logger -import logging -from test.tasks.flasheeprom import flash_eeprom -from test.tasks.flashmemory import flash_memory +from test.tasks.flasheeprom import EEprom_Flasher +from test.tasks.flashmemory import NAND_Flasher from test.helpers.qrreader import QRReader -from test.helpers.cmdline import LinuxKernel +from test.helpers.cmdline import LinuxKernelCmd from test.helpers.amper import Amper from test.enums.StationStates import StationStates -import time -import sh +from test.helpers.mydebugger import sentdebugmessage +import xml.etree.ElementTree as XMLParser +from test.helpers.iseelogger import logObj +from test.helpers.utils import str2bool # global variables psdbObj = TestSrv_Database() xmlObj = None -loggerObj = None test_abspath = None tests_manually_executed = [] +AutoTest = False +OverrideAutoReboot = False +delay_reboot = '0' # define clear function @@ -45,74 +56,62 @@ def clear(): # check and make call for specific operating system _ = call('clear' if os.name == 'posix' else 'cls') +def reboot_board(): + logObj.getlogger().debug("#reboot board#") + if not OverrideAutoReboot: + set_next_state(StationStates.STATION_WAIT_SHUTDOWN.name) + if delay_reboot == '0': + time.sleep(5) + sh.shutdown('-r', '+{}'.format(delay_reboot), _bg=True) + # sh.reboot(_bg=True) + print("REBOOT...") + exit(0) -def execute_station_error(text): - print("Error: {}".format(text)) - loggerObj.getlogger().info("Station error: #{}#".format(text)) +def wait_for_reboot(): while True: currentstate = psdbObj.read_station_state(globalVar.station) if currentstate == StationStates.STATION_REBOOT.name: reboot_board() - time.sleep(1) + time.sleep(5) +def execute_station_error(text): + logObj.getlogger().error("Station error: #{}#".format(text)) + wait_for_reboot() def check_first_state(): currentstate = psdbObj.read_station_state(globalVar.station) - if currentstate == StationStates.STATION_ERROR.name: - while True: - currentstate = psdbObj.read_station_state(globalVar.station) - if currentstate == StationStates.STATION_REBOOT.name: - reboot_board() - time.sleep(1) - elif currentstate == StationStates.STATION_REBOOT.name: - reboot_board() + if currentstate == StationStates.STATION_ERROR.name or currentstate == StationStates.STATION_REBOOT.name: + logObj.getlogger().debug('station state during the first check is {}'.format(currentstate)) + wait_for_reboot() else: - starttime = time.time() + count_t = int(xmlObj.getKeyVal("test_main", "wait_test_start", "240")) while currentstate != StationStates.WAIT_TEST_START.name: - if time.time() - starttime >= 30.0: + logObj.getlogger().debug('wait for WAIT_TEST_START but read: {}'.format(currentstate)) + if count_t <= 0: # Error - execute_station_error("Timeout while waiting for WAIT_TEST_START state") + execute_station_error("Timeout while waiting for WAIT_TEST_START state actual STATE = {}".format(currentstate)) else: + count_t = count_t - 5 time.sleep(5) currentstate = psdbObj.read_station_state(globalVar.station) - if currentstate == StationStates.STATION_ERROR.name: - while True: - currentstate = psdbObj.read_station_state(globalVar.station) - if currentstate == StationStates.STATION_REBOOT.name: - reboot_board() - time.sleep(1) - elif currentstate == StationStates.STATION_REBOOT.name: - reboot_board() - - -def reboot_board(): - sh.reboot() - exit(0) + if currentstate == StationStates.STATION_ERROR.name or currentstate == StationStates.STATION_REBOOT.name: + wait_for_reboot() def set_next_state(newstate): statewritten = psdbObj.change_station_state(globalVar.station, newstate) - if statewritten == StationStates.STATION_REBOOT.name: - reboot_board() - elif statewritten == StationStates.STATION_ERROR.name: - while True: - currentstate = psdbObj.read_station_state(globalVar.station) - if currentstate == StationStates.STATION_REBOOT.name: - reboot_board() - time.sleep(1) - + if statewritten == StationStates.STATION_REBOOT.name or statewritten == StationStates.STATION_ERROR.name: + wait_for_reboot() def reboot_if_autotest(): # reset board if AUTOTEST is enabled - autotest = psdbObj.get_setup_variable("AUTOTEST_" + globalVar.station) - if int(autotest) == 1: + if AutoTest: reboot_board() def create_paramslist(params): paramlist = {} - for row in params: - varname, varvalue = row + for varname, varvalue in params: paramlist[varname] = varvalue return paramlist @@ -151,13 +150,17 @@ def add_test(suite, testdefname, paramlist): elif testdefname == "AUDIO": suite.addTest(Qaudio(testdefname, "execute", paramlist)) return 0 - elif testdefname == "USBLOOP": - suite.addTest(Qusbdual(testdefname, "execute", paramlist)) - return 0 elif testdefname == "DMESG": tests_manually_executed.append(Qdmesg(testdefname, "execute", paramlist)) return 0 + elif testdefname == "MMCFLASH": + suite.addTest(Qmmcflash(testdefname, "execute", paramlist)) + return 0 + elif testdefname == "PLC_MODEM": + suite.addTest(Qplc(testdefname, "execute", paramlist)) + return 0 else: + logObj.getlogger().error("Params for test: {} not implemented (ignore it), ".format(testdefname)) return 1 @@ -166,9 +169,9 @@ def create_testsuite(): suite1 = unittest.TestSuite() # get list of tests for this board tests = psdbObj.get_tests_list(globalVar.g_uuid) + logObj.getlogger().debug("Create TestSuite: {}".format(tests)) # loop in every test for this board - for row in tests: - testid, testdefname = row + for testid, testdefname in tests: # get params for this test params = psdbObj.get_test_params_list(testid) paramlist = create_paramslist(params) @@ -176,6 +179,8 @@ def create_testsuite(): paramlist["testid"] = testid paramlist["boarduuid"] = globalVar.g_uuid paramlist["testidctl"] = globalVar.testid_ctl + paramlist['testPath'] = test_abspath + logObj.getlogger().debug("Params for test: {}:{}-{}, ".format(testid, testdefname, paramlist)) paramlist["xml"] = xmlObj paramlist["db"] = psdbObj # add test to TestSuite @@ -185,44 +190,123 @@ def create_testsuite(): def create_board(): - cmd = LinuxKernel() + + cmd = LinuxKernelCmd() model_id = cmd.getkvar("bmodel", "none") variant = cmd.getkvar("bvariant", "none") + logObj.getlogger().debug("Kernel Command line vars: bmodel: {} bvariant: {}".format(model_id, variant)) if model_id == "none" or variant == "none": # Error - execute_station_error("Cannot get model and variant information") + execute_station_error("Cannot get model {} and variant {} information".format(model_id, variant)) # get model id globalVar.g_mid = model_id + "-" + variant + logObj.getlogger().debug("Model-Variant -> {}".format(globalVar.g_mid)) # get station if "Station" not in globalVar.station: # Error - execute_station_error("Wrong station name") + execute_station_error("Hostname not defined") processor_id = get_die_id(globalVar.g_mid) - print(globalVar.g_mid) - print(processor_id) globalVar.g_uuid = psdbObj.create_board(processor_id, model_id, variant, globalVar.station, get_internal_mac(globalVar.g_mid)) - print(globalVar.g_uuid) + + logObj.getlogger().info("processor ID: {} uuid: {}".format(processor_id, globalVar.g_uuid)) def get_taskvars_list(uuid): varlist = {} - for row in psdbObj.get_task_variables_list(uuid): - varname, varvalue = row + for varname, varvalue in psdbObj.get_task_variables_list(uuid): varlist[varname] = varvalue return varlist +def execute_extra_task(): + # list of task + myTask = [] + # Change state to "EXTRATASKS_RUNNING" + set_next_state(StationStates.EXTRATASKS_RUNNING.name) + # create task control + globalVar.taskid_ctl = psdbObj.open_task(globalVar.g_uuid) + # get extra variables + varlist = get_taskvars_list(globalVar.g_uuid) + + # Put default Vars + varlist["boarduuid"] = globalVar.g_uuid + varlist["testidctl"] = globalVar.testid_ctl + varlist['testPath'] = test_abspath + varlist["xml"] = xmlObj + varlist["db"] = psdbObj + + # Get list Extra Task and activation + eeprom = varlist.get('eeprom', '0') + flashNand = varlist.get('flashNAND', '0') + + if eeprom == '1': + myEEprom = EEprom_Flasher(varlist) + myTask.append(myEEprom) + if flashNand == '1': + myNAND = NAND_Flasher(varlist) + myTask.append(myNAND) + + taskresult = True + for task in myTask: + res, data = task.Execute() + if res: + psdbObj.create_task_result(globalVar.taskid_ctl, task.getTaskName(), "TASK_OK", data) + logObj.getlogger().info("task: {} ok".format(task.getTaskName())) + else: + psdbObj.create_task_result(globalVar.taskid_ctl, task.getTaskName(), "TASK_FAIL", data) + logObj.getlogger().info("task: {} failed".format(task.getTaskName())) + taskresult = False + + # check final taskresult => ok or fail + if taskresult: + psdbObj.update_taskctl_status(globalVar.taskid_ctl, "TASK_BOARD_OK") + else: + psdbObj.update_taskctl_status(globalVar.taskid_ctl, "TASK_BOARD_FAIL") + for task in myTask: + if task.getError() != '': + execute_station_error("Unable to complete extra tasks: {}".format(task.getError())) + # Not emit a error message + execute_station_error("Unable to complete extra tasks") + +def getBarCode(fake): + + set_next_state(StationStates.WAITING_FOR_SCANNER.name) + if fake: + psdbObj.set_factorycode(globalVar.g_uuid, str(random.randint(1, 1000001))) + return + qrreceived = False + wait_count = psdbObj.get_setup_variable("QRWAIT_SEC", xmlObj.getKeyVal("test_main", "qr_wait_time", "120")) + while not qrreceived or int(wait_count) > 0: + qr = QRReader() + if qr.openQR(): + # waits 5s to receive a valid code + if qr.readQRasync(10): + qrreceived = True + factorycode = qr.getQRNumber() + psdbObj.set_factorycode(globalVar.g_uuid, factorycode) + return + qr.closeQR() + else: + qrreceived = True + del qr + time.sleep(1) + wait_count = wait_count - 1 + if int(wait_count) == 0: + execute_station_error("Cannot find a barcode scanner") + -def main(): +def execute_test (): # initialize the board create_board() # create a process globalVar.testid_ctl = psdbObj.open_test(globalVar.g_uuid) + logObj.getlogger().info("Create new Test: {}".format(globalVar.testid_ctl)) + # Change state to "TESTS_RUNNING" set_next_state(StationStates.TESTS_RUNNING.name) @@ -230,71 +314,29 @@ def main(): suite1 = create_testsuite() # Execute tests (round 1) runner1 = SimpleTestRunner(psdbObj) + logObj.getlogger().info("Runner run id: {}".format(globalVar.testid_ctl)) testresult = runner1.run(suite1) + logObj.getlogger().info("Runner run id: {} finish {}".format(globalVar.testid_ctl, testresult)) # Execute manually tests for test in tests_manually_executed: - test.execute() + logObj.getlogger().info("Execute manual test id: {}:{}".format(globalVar.testid_ctl, test.getTestName())) + mtestResult = test.execute() + logObj.getlogger().info("Execute manual test id: {}:{} Result: {}".format(globalVar.testid_ctl, test.getTestName(), + 'ok' if mtestResult else 'fail')) # execute aditional tasks, only if the test was successful if testresult.wasSuccessful(): - # Change state to "TESTS_OK" + logObj.getlogger().info("Execute test id: {} finished succesfully".format(globalVar.testid_ctl)) set_next_state(StationStates.TESTS_OK.name) - # Change state to "EXTRATASKS_RUNNING" - set_next_state(StationStates.EXTRATASKS_RUNNING.name) - # create task control - globalVar.taskid_ctl = psdbObj.open_task(globalVar.g_uuid) - # get extra variables - varlist = get_taskvars_list(globalVar.g_uuid) - - alltasksok = False - - # flash eeprom - resulteeprom = 0 - if "eeprompath" in varlist and len(varlist["eeprompath"]) > 0: - mac0 = psdbObj.get_board_macaddr(globalVar.g_uuid) - resulteeprom, eepromdata = flash_eeprom(varlist["eeprompath"], globalVar.g_uuid, mac0) - psdbObj.create_task_result(globalVar.taskid_ctl, "FLASHEEPROM", - "TASK_OK" if resulteeprom == 0 else "TASK_FAIL", eepromdata) - - # flash non-volatile memory - resultmemory = 0 - if "flashimagepath" in varlist and len(varlist["flashimagepath"]) > 0: - resultmemory = flash_memory(varlist["flashimagepath"]) - psdbObj.create_task_result(globalVar.taskid_ctl, "FLASHMEMORY", - "TASK_OK" if resultmemory == 0 else "TASK_FAIL", - varlist["flashimagepath"]) - - # update status with the result - if resulteeprom == 0 and resultmemory == 0: - # finish tasks - psdbObj.update_taskctl_status(globalVar.taskid_ctl, "TASK_BOARD_OK") - - # Change state to "WAITING_FOR_SCANNER" - set_next_state(StationStates.WAITING_FOR_SCANNER.name) - - # get barcode using the scanner, only if autotest is disabled - autotest = psdbObj.get_setup_variable("AUTOTEST_" + globalVar.station) - if int(autotest) == 1: - psdbObj.set_factorycode(globalVar.g_uuid, "1234567890") # fake factory code - else: - qrreceived = False - while not qrreceived: - qr = QRReader() - if qr.openQR(): - # waits 5s to receive a valid code - if qr.readQRasync(5): - qrreceived = True - factorycode = qr.getQRNumber() - psdbObj.set_factorycode(globalVar.g_uuid, factorycode) - qr.closeQR() - - # Change state to "FINISHED" - set_next_state(StationStates.FINISHED.name) - else: - psdbObj.update_taskctl_status(globalVar.taskid_ctl, "TASK_BOARD_FAIL") - loggerObj.getlogger().info("Station error: #Unable to complete extra tasks#") + logObj.getlogger().info("Execute test id: {} extra task".format(globalVar.testid_ctl)) + execute_extra_task() + logObj.getlogger().info("Execute test id: {} wait read scanner".format(globalVar.testid_ctl)) + getBarCode(AutoTest) + set_next_state(StationStates.FINISHED.name) + logObj.getlogger().info("Execute test id: {} wait read scanner finish".format(globalVar.testid_ctl)) else: + logObj.getlogger().info("Execute test id: {} finished with one or more fails".format(globalVar.testid_ctl)) # Change state to "TESTS_FAILED" set_next_state(StationStates.TESTS_FAILED.name) @@ -302,28 +344,75 @@ def main(): reboot_if_autotest() +def enableFaulthandler(): + """ Enable faulthandler for all threads. + + If the faulthandler package is available, this function disables and then + re-enables fault handling for all threads (this is necessary to ensure any + new threads are handled correctly), and returns True. + + If faulthandler is not available, then returns False. + """ + try: + import faulthandler + # necessary to disable first or else new threads may not be handled. + faulthandler.disable() + faulthandler.enable(all_threads=True) + return True + except ImportError: + return False + if __name__ == "__main__": # Clear the shell screen clear() - test_abspath = os.path.dirname(os.path.abspath(__file__)) - # create logger - loggerObj = ISEE_Logger(logging.INFO) + enableFaulthandler() + + test_abspath = os.path.dirname(os.path.abspath(__file__)) + configFile = 'setup.xml' + loglevel = 'INFO' + logObj.getlogger().info("Test program Started : {}".format(datetime.now())) + try: + argv = sys.argv[1:] + opts, args = getopt.getopt(argv, 'c:l:', ['cfg', 'loglevel']) + for opt, args in opts: + if opt in ('-c', '--setup'): + configFile = args + if opt in ('-l', '--loglevel'): + loglevel = args + except getopt.GetoptError as err: + logObj.getlogger().error('#{}#'.format(err)) + + if loglevel != 'INFO': + logObj.setLogLevel(loglevel) # Try to parse the setup.xml file + logObj.getlogger().debug("parse config file: {}". format(configFile)) try: - xmlObj = XMLSetup(os.path.join(test_abspath, "setup.xml")) - except: + xmlObj = XMLSetup(os.path.join(test_abspath, configFile)) + except XMLParser.ParseError as error: # Error - execute_station_error("Cannot parse setup.xml file") + execute_station_error("Error parsing {} file {}".format(configFile, error.msg)) # Try to connect to the DB, according to setup.xml configuration if not psdbObj.open(xmlObj): # Error execute_station_error("Cannot open DB connection") + else: + logObj.getlogger().debug("database connection failed: {}".format(psdbObj.getConfig())) # get station name globalVar.station = socket.gethostname() + logObj.getlogger().info("Test Station: {}".format(globalVar.station)) + + if psdbObj.get_setup_variable("AUTOTEST_" + globalVar.station, xmlObj.getKeyVal("test_main", "autotest", "0")) == '1': + AutoTest = True + + OverrideAutoReboot = str2bool(xmlObj.getKeyVal("test_main", "OverrideAutoReboot", "0")) + delay_reboot = xmlObj.getKeyVal("test_main", "delayReboot", "0") + + logObj.getlogger().info("AutoTest: {} OverrideAutoReboot: {} delay Reboot: {}".format(AutoTest, OverrideAutoReboot, delay_reboot)) + # Check if current state is "WAIT_TEST_START". if not, waits here check_first_state() @@ -331,28 +420,7 @@ if __name__ == "__main__": # Change state to "TESTS_CHECKING_ENV" set_next_state(StationStates.TESTS_CHECKING_ENV.name) - # Wait before beginning - time.sleep(2) - - # Look for a barcode scanner - qr = QRReader() - if qr.openQR(): - qr.closeQR() - else: - # Error - execute_station_error("Cannot find a barcode scanner") - - # Look for an amperimeter - amp = Amper() - if not amp.open(): - # Error - execute_station_error("Cannot open an amperimeter COM port") - if not amp.hello(): - if not amp.hello(): - # Error - execute_station_error("Cannot open find an amperimeter") - - # Execute main - main() + # execute main program + execute_test() exit(0) -- cgit v1.1 From 227d9783fe8327b84ac3b0e032f012ba144816f8 Mon Sep 17 00:00:00 2001 From: Manel Caro Date: Fri, 31 Jul 2020 13:58:41 +0200 Subject: IGEP0048: added plc test --- test-cli/test_main.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'test-cli/test_main.py') diff --git a/test-cli/test_main.py b/test-cli/test_main.py index 3f52152..bce9de9 100644 --- a/test-cli/test_main.py +++ b/test-cli/test_main.py @@ -279,7 +279,7 @@ def getBarCode(fake): psdbObj.set_factorycode(globalVar.g_uuid, str(random.randint(1, 1000001))) return qrreceived = False - wait_count = psdbObj.get_setup_variable("QRWAIT_SEC", xmlObj.getKeyVal("test_main", "qr_wait_time", "120")) + wait_count = psdbObj.get_setup_variable("test_main", xmlObj.getKeyVal("test_main", "qr_wait_time", "120")) while not qrreceived or int(wait_count) > 0: qr = QRReader() if qr.openQR(): -- cgit v1.1 From acc0bfecb73a2ef47620aabb8234f507aed69085 Mon Sep 17 00:00:00 2001 From: Hector Fernandez Date: Tue, 29 Sep 2020 13:49:33 +0200 Subject: Added VIDEO test to the pool. --- test-cli/test_main.py | 4 ++++ 1 file changed, 4 insertions(+) (limited to 'test-cli/test_main.py') diff --git a/test-cli/test_main.py b/test-cli/test_main.py index bce9de9..ebaf0d9 100644 --- a/test-cli/test_main.py +++ b/test-cli/test_main.py @@ -29,6 +29,7 @@ from test.tests.qaudio import Qaudio from test.tests.qdmesg import Qdmesg from test.tests.qmmcflash import Qmmcflash from test.tests.qplc import Qplc +from test.tests.qvideo import Qvideo from test.helpers.globalVariables import globalVar from test.tasks.flasheeprom import EEprom_Flasher from test.tasks.flashmemory import NAND_Flasher @@ -159,6 +160,9 @@ def add_test(suite, testdefname, paramlist): elif testdefname == "PLC_MODEM": suite.addTest(Qplc(testdefname, "execute", paramlist)) return 0 + elif testdefname == "VIDEO": + suite.addTest(Qvideo(testdefname, "execute", paramlist)) + return 0 else: logObj.getlogger().error("Params for test: {} not implemented (ignore it), ".format(testdefname)) return 1 -- cgit v1.1 From ce7885cf9cb5b4e232ac8659eaaf82591d14eaa9 Mon Sep 17 00:00:00 2001 From: Manel Caro Date: Fri, 2 Oct 2020 14:10:48 +0200 Subject: fix database connection message --- test-cli/test_main.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'test-cli/test_main.py') diff --git a/test-cli/test_main.py b/test-cli/test_main.py index ebaf0d9..7bf0c34 100644 --- a/test-cli/test_main.py +++ b/test-cli/test_main.py @@ -37,7 +37,6 @@ from test.helpers.qrreader import QRReader from test.helpers.cmdline import LinuxKernelCmd from test.helpers.amper import Amper from test.enums.StationStates import StationStates -from test.helpers.mydebugger import sentdebugmessage import xml.etree.ElementTree as XMLParser from test.helpers.iseelogger import logObj from test.helpers.utils import str2bool @@ -403,7 +402,7 @@ if __name__ == "__main__": # Error execute_station_error("Cannot open DB connection") else: - logObj.getlogger().debug("database connection failed: {}".format(psdbObj.getConfig())) + logObj.getlogger().debug("database connection ok: {}".format(psdbObj.getConfig())) # get station name globalVar.station = socket.gethostname() @@ -419,6 +418,7 @@ if __name__ == "__main__": # Check if current state is "WAIT_TEST_START". if not, waits here + # if xmlObj.getKeyVal("test_main", "overrideCheckWaitStart", "0") == "1": check_first_state() # Change state to "TESTS_CHECKING_ENV" -- cgit v1.1 From a67f3fc7c10720135361ef99a8bd3110731c0011 Mon Sep 17 00:00:00 2001 From: Manel Caro Date: Mon, 5 Oct 2020 11:45:04 +0200 Subject: Override wait start test state --- test-cli/test_main.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) (limited to 'test-cli/test_main.py') diff --git a/test-cli/test_main.py b/test-cli/test_main.py index 7bf0c34..1f5d1e4 100644 --- a/test-cli/test_main.py +++ b/test-cli/test_main.py @@ -416,10 +416,11 @@ if __name__ == "__main__": logObj.getlogger().info("AutoTest: {} OverrideAutoReboot: {} delay Reboot: {}".format(AutoTest, OverrideAutoReboot, delay_reboot)) - # Check if current state is "WAIT_TEST_START". if not, waits here - # if xmlObj.getKeyVal("test_main", "overrideCheckWaitStart", "0") == "1": - check_first_state() + if xmlObj.getKeyVal("test_main", "overrideCheckWaitStart", "0") == "0": + check_first_state() + else: + psdbObj.setDevelStationState(globalVar.station, "WAIT_TEST_START") # Change state to "TESTS_CHECKING_ENV" set_next_state(StationStates.TESTS_CHECKING_ENV.name) -- cgit v1.1