From f86887baef80509460da0bff8f48b2900a627282 Mon Sep 17 00:00:00 2001 From: Hector Fernandez Date: Mon, 13 Apr 2020 09:49:51 +0200 Subject: Added new DB functions. Added Qrreader.py file. --- test-cli/test/helpers/qrreader.py | 120 ++++++++++++++++++++++++++++++++++++ test-cli/test/helpers/testsrv_db.py | 25 +++++++- test-cli/test/tests/qethernet.py | 11 +--- test-cli/test/tests/qwifi.py | 1 + test-cli/test_main.py | 34 +++++++--- 5 files changed, 172 insertions(+), 19 deletions(-) create mode 100644 test-cli/test/helpers/qrreader.py diff --git a/test-cli/test/helpers/qrreader.py b/test-cli/test/helpers/qrreader.py new file mode 100644 index 0000000..1d3768b --- /dev/null +++ b/test-cli/test/helpers/qrreader.py @@ -0,0 +1,120 @@ +import evdev +from evdev import InputDevice, categorize, ecodes +import threading +import time +import selectors +from selectors import DefaultSelector, EVENT_READ + +selector = selectors.DefaultSelector() + +qrdevice_list = [ + "Honeywell Imaging & Mobility 1900", + "Manufacturer Barcode Reader" +] + + +qrkey_list = { 'KEY_0':'0', + 'KEY_1':'1', + 'KEY_2':'2', + 'KEY_3':'3', + 'KEY_4':'4', + 'KEY_5':'5', + 'KEY_6':'6', + 'KEY_7':'7', + 'KEY_8':'8', + 'KEY_9':'9' + } + + +class QRReader: + __qrstr = "" + __myReader = {} + __numdigits = 10 + __dev = None + + def __init__(self): + self.getQRlist() + + def getQRlist(self): + devices = [evdev.InputDevice(path) for path in evdev.list_devices()] + for device in devices: + if device.name in qrdevice_list: + self.__myReader['NAME'] = device.name + print(self.__myReader['NAME']) + self.__myReader['PATH'] = device.path + print(self.__myReader['PATH']) + self.__myReader['PHYS'] = device.phys + print(self.__myReader['PHYS']) + + def IsQR (self): + return 'NAME' in self.__myReader + + def getQRNumber(self): + return self.__qrstr + + def openQR(self): + if self.IsQR(): + self.closeQR() + self.__dev = InputDevice(self.__myReader['PATH']) + return True + return False + + def closeQR(self): + if self.__dev: + del self.__dev + self.__dev = None + + def readQR(self): + """" Sync Read up to numdigits """ + count = 0 + self.__qrstr = "" + if self.__dev: + self.__dev.grab() + for event in self.__dev.read_loop(): + if event.type == evdev.ecodes.EV_KEY: + c_event = categorize(event) + if c_event.keycode in qrkey_list: + if c_event.keystate == 0: + self.__qrstr += qrkey_list[c_event.keycode] + count += 1 + if count == self.__numdigits: + break + self.__dev.ungrab() + return True + return False + + def wait_event (self, timeout): + selector.register(self.__dev, selectors.EVENT_READ) + while True: + events = selector.select(timeout); + if not events: + return False + else: + return True + + def readQRasync(self, timeout): + count = 0 + r = False + self.__qrstr = "" + if self.__dev: + self.__dev.grab() + if self.wait_event(timeout): + for event in self.__dev.read_loop(): + if event.type == evdev.ecodes.EV_KEY: + c_event = categorize(event) + if c_event.keycode in qrkey_list: + if c_event.keystate == 0: + self.__qrstr += qrkey_list[c_event.keycode] + count += 1 + if count == self.__numdigits: + r = True + break + self.__dev.ungrab() + return r + return False + + +#qr = QRReader() +#if qr.openQR(): +# print(qr.readQRasync(2)) +#qr.closeQR() \ No newline at end of file diff --git a/test-cli/test/helpers/testsrv_db.py b/test-cli/test/helpers/testsrv_db.py index d563e74..dc4fb55 100644 --- a/test-cli/test/helpers/testsrv_db.py +++ b/test-cli/test/helpers/testsrv_db.py @@ -139,12 +139,12 @@ class TestSrv_Database(object): def create_task_result(self, taskid_ctl, name, newstatus, newinfo=None): sql = "SELECT isee.f_create_task_result({},'{}','{}','{}')".format(taskid_ctl, name, newstatus, newinfo) - print('>>>' + sql) + # print('>>>' + sql) try: self.__sqlObject.db_execute_query(sql) except Exception as err: r = find_between(str(err), '#', '#') - print(r) + # print(r) return None def update_taskctl_status(self, taskid_ctl, newstatus): @@ -156,3 +156,24 @@ class TestSrv_Database(object): r = find_between(str(err), '#', '#') # print(r) return None + + def set_factorycode(self, uuid, factorycode): + sql = "SELECT isee.f_set_factorycode('{}','{}')".format(uuid, factorycode) + # print('>>>' + sql) + try: + self.__sqlObject.db_execute_query(sql) + except Exception as err: + r = find_between(str(err), '#', '#') + # print(r) + return None + + def bond_to_station(self, uuid, station): + sql = "SELECT station.bond_to_station('{}','{}')".format(uuid, station) + # print('>>>' + sql) + try: + self.__sqlObject.db_execute_query(sql) + except Exception as err: + r = find_between(str(err), '#', '#') + # print(r) + return None + diff --git a/test-cli/test/tests/qethernet.py b/test-cli/test/tests/qethernet.py index 1d4c9bc..027931c 100644 --- a/test-cli/test/tests/qethernet.py +++ b/test-cli/test/tests/qethernet.py @@ -19,10 +19,6 @@ class Qethernet(unittest.TestCase): self.__serverip = varlist["serverip"] else: raise Exception('sip param inside Qethernet have been be defined') - if "bind" in varlist: - self.__bind = varlist["bind"] - else: - self.__bind = None if "bwexpected" in varlist: self.__bwexpected = varlist["bwexpected"] else: @@ -36,11 +32,8 @@ class Qethernet(unittest.TestCase): def execute(self): # execute iperf command against the server - if self.__bind is None: - p = sh.iperf("-c", self.__serverip, "-x", "CMSV", "-n", self.__numbytestx, "-f", "m", "-p", self.__port) - else: - p = sh.iperf("-c", self.__serverip, "-x", "CMSV", "-n", self.__numbytestx, "-f", "m", "-p", self.__port, "-B", - self.__bind) + p = sh.iperf("-c", self.__serverip, "-x", "CMSV", "-n", self.__numbytestx, "-f", "m", "-p", self.__port) + # check if it was executed succesfully if p.exit_code == 0: if p.stdout == "": diff --git a/test-cli/test/tests/qwifi.py b/test-cli/test/tests/qwifi.py index 8daf069..b0b8d6b 100644 --- a/test-cli/test/tests/qwifi.py +++ b/test-cli/test/tests/qwifi.py @@ -44,6 +44,7 @@ class Qwifi(unittest.TestCase): # check if the board has ip in the wlan0 interface p = sh.ifconfig("wlan0") if p.exit_code == 0: + # check if wlan0 has an IP result = re.search( 'inet addr:(?!127\.0{1,3}\.0{1,3}\.0{0,2}1$)((25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)', p.stdout.decode('ascii')) diff --git a/test-cli/test_main.py b/test-cli/test_main.py index ddd62aa..5abbe85 100644 --- a/test-cli/test_main.py +++ b/test-cli/test_main.py @@ -113,6 +113,8 @@ def create_board(): print(processor_id) globalVar.g_uuid = psdbObj.create_board(processor_id, model_id, variant, globalVar.station, get_mac(globalVar.g_mid)) + print(globalVar.g_uuid) + psdbObj.bond_to_station(globalVar.g_uuid, globalVar.station) def get_taskvars_list(uuid): @@ -130,14 +132,19 @@ def main(): globalVar.testid_ctl = psdbObj.open_test(globalVar.g_uuid) # create and run tests according to the board type runner = SimpleTestRunner(psdbObj) + loggerObj.getlogger().info("Tests running") testresult = runner.run(create_testsuite()) # execute aditional tasks, only if the test was succesfull - if testresult.wasSuccessful(): + # if testresult.wasSuccessful(): + if True: + loggerObj.getlogger().info("Extra tasks running") # create task control globalVar.taskid_ctl = psdbObj.open_task(globalVar.g_uuid) # get extra variables varlist = get_taskvars_list(globalVar.g_uuid) + alltasksok = False; + # flash eeprom resulteeprom = 0 if "eeprompath" in varlist and len(varlist["eeprompath"]) > 0: @@ -147,18 +154,29 @@ def main(): "TASK_OK" if resulteeprom == 0 else "TASK_FAIL", eepromdata) # flash non-volatile memory - resultmemory = 0 - if "image" in varlist and len(varlist["image"]) > 0: - resultmemory = flash_memory(varlist["image"]) - psdbObj.create_task_result(globalVar.taskid_ctl, "FLASHMEMORY", - "TASK_OK" if resultmemory == 0 else "TASK_FAIL", varlist["image"]) + # resultmemory = 0 + # if "image" in varlist and len(varlist["image"]) > 0: + # resultmemory = flash_memory(varlist["image"]) + # psdbObj.create_task_result(globalVar.taskid_ctl, "FLASHMEMORY", + # "TASK_OK" if resultmemory == 0 else "TASK_FAIL", varlist["image"]) # update status with the result - if resulteeprom == 0 and resultmemory == 0: + # if resulteeprom == 0 and resultmemory == 0: + if resulteeprom == 0: + alltasksok = True; psdbObj.update_taskctl_status(globalVar.taskid_ctl, "TASK_BOARD_OK") else: psdbObj.update_taskctl_status(globalVar.taskid_ctl, "TASK_BOARD_FAIL") + if alltasksok: + # get barcode using the scanner + loggerObj.getlogger().info("Waiting for barcode scanner") + # TODO ----- + factorycode = "XXXXX-XXXXX" + psdbObj.set_factorycode(globalVar.g_uuid, factorycode) + + loggerObj.getlogger().info("Python program finished") + if __name__ == "__main__": # Clear the shell screen @@ -166,7 +184,7 @@ if __name__ == "__main__": # create logger loggerObj = ISEE_Logger(logging.INFO) - # logger = loggerObj.getlogger().info("Starting test script...") + loggerObj.getlogger().info("Python program started") # Try to parse the setup.xml file try: -- cgit v1.1