summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorHector Fernandez <hector@iatec.biz>2020-04-13 09:49:51 +0200
committerHector Fernandez <hector@iatec.biz>2020-04-13 10:06:37 +0200
commitf86887baef80509460da0bff8f48b2900a627282 (patch)
tree70276382e2bba14673f763e2dbcace998a8fe685
parent0c610ebbef5928f17ec8c14d872b4d4298d3c4fb (diff)
downloadboard-f86887baef80509460da0bff8f48b2900a627282.zip
board-f86887baef80509460da0bff8f48b2900a627282.tar.gz
board-f86887baef80509460da0bff8f48b2900a627282.tar.bz2
Added new DB functions. Added Qrreader.py file.
-rw-r--r--test-cli/test/helpers/qrreader.py120
-rw-r--r--test-cli/test/helpers/testsrv_db.py25
-rw-r--r--test-cli/test/tests/qethernet.py11
-rw-r--r--test-cli/test/tests/qwifi.py1
-rw-r--r--test-cli/test_main.py34
5 files changed, 172 insertions, 19 deletions
diff --git a/test-cli/test/helpers/qrreader.py b/test-cli/test/helpers/qrreader.py
new file mode 100644
index 0000000..1d3768b
--- /dev/null
+++ b/test-cli/test/helpers/qrreader.py
@@ -0,0 +1,120 @@
+import evdev
+from evdev import InputDevice, categorize, ecodes
+import threading
+import time
+import selectors
+from selectors import DefaultSelector, EVENT_READ
+
+selector = selectors.DefaultSelector()
+
+qrdevice_list = [
+ "Honeywell Imaging & Mobility 1900",
+ "Manufacturer Barcode Reader"
+]
+
+
+qrkey_list = { 'KEY_0':'0',
+ 'KEY_1':'1',
+ 'KEY_2':'2',
+ 'KEY_3':'3',
+ 'KEY_4':'4',
+ 'KEY_5':'5',
+ 'KEY_6':'6',
+ 'KEY_7':'7',
+ 'KEY_8':'8',
+ 'KEY_9':'9'
+ }
+
+
+class QRReader:
+ __qrstr = ""
+ __myReader = {}
+ __numdigits = 10
+ __dev = None
+
+ def __init__(self):
+ self.getQRlist()
+
+ def getQRlist(self):
+ devices = [evdev.InputDevice(path) for path in evdev.list_devices()]
+ for device in devices:
+ if device.name in qrdevice_list:
+ self.__myReader['NAME'] = device.name
+ print(self.__myReader['NAME'])
+ self.__myReader['PATH'] = device.path
+ print(self.__myReader['PATH'])
+ self.__myReader['PHYS'] = device.phys
+ print(self.__myReader['PHYS'])
+
+ def IsQR (self):
+ return 'NAME' in self.__myReader
+
+ def getQRNumber(self):
+ return self.__qrstr
+
+ def openQR(self):
+ if self.IsQR():
+ self.closeQR()
+ self.__dev = InputDevice(self.__myReader['PATH'])
+ return True
+ return False
+
+ def closeQR(self):
+ if self.__dev:
+ del self.__dev
+ self.__dev = None
+
+ def readQR(self):
+ """" Sync Read up to numdigits """
+ count = 0
+ self.__qrstr = ""
+ if self.__dev:
+ self.__dev.grab()
+ for event in self.__dev.read_loop():
+ if event.type == evdev.ecodes.EV_KEY:
+ c_event = categorize(event)
+ if c_event.keycode in qrkey_list:
+ if c_event.keystate == 0:
+ self.__qrstr += qrkey_list[c_event.keycode]
+ count += 1
+ if count == self.__numdigits:
+ break
+ self.__dev.ungrab()
+ return True
+ return False
+
+ def wait_event (self, timeout):
+ selector.register(self.__dev, selectors.EVENT_READ)
+ while True:
+ events = selector.select(timeout);
+ if not events:
+ return False
+ else:
+ return True
+
+ def readQRasync(self, timeout):
+ count = 0
+ r = False
+ self.__qrstr = ""
+ if self.__dev:
+ self.__dev.grab()
+ if self.wait_event(timeout):
+ for event in self.__dev.read_loop():
+ if event.type == evdev.ecodes.EV_KEY:
+ c_event = categorize(event)
+ if c_event.keycode in qrkey_list:
+ if c_event.keystate == 0:
+ self.__qrstr += qrkey_list[c_event.keycode]
+ count += 1
+ if count == self.__numdigits:
+ r = True
+ break
+ self.__dev.ungrab()
+ return r
+ return False
+
+
+#qr = QRReader()
+#if qr.openQR():
+# print(qr.readQRasync(2))
+#qr.closeQR() \ No newline at end of file
diff --git a/test-cli/test/helpers/testsrv_db.py b/test-cli/test/helpers/testsrv_db.py
index d563e74..dc4fb55 100644
--- a/test-cli/test/helpers/testsrv_db.py
+++ b/test-cli/test/helpers/testsrv_db.py
@@ -139,12 +139,12 @@ class TestSrv_Database(object):
def create_task_result(self, taskid_ctl, name, newstatus, newinfo=None):
sql = "SELECT isee.f_create_task_result({},'{}','{}','{}')".format(taskid_ctl, name, newstatus, newinfo)
- print('>>>' + sql)
+ # print('>>>' + sql)
try:
self.__sqlObject.db_execute_query(sql)
except Exception as err:
r = find_between(str(err), '#', '#')
- print(r)
+ # print(r)
return None
def update_taskctl_status(self, taskid_ctl, newstatus):
@@ -156,3 +156,24 @@ class TestSrv_Database(object):
r = find_between(str(err), '#', '#')
# print(r)
return None
+
+ def set_factorycode(self, uuid, factorycode):
+ sql = "SELECT isee.f_set_factorycode('{}','{}')".format(uuid, factorycode)
+ # print('>>>' + sql)
+ try:
+ self.__sqlObject.db_execute_query(sql)
+ except Exception as err:
+ r = find_between(str(err), '#', '#')
+ # print(r)
+ return None
+
+ def bond_to_station(self, uuid, station):
+ sql = "SELECT station.bond_to_station('{}','{}')".format(uuid, station)
+ # print('>>>' + sql)
+ try:
+ self.__sqlObject.db_execute_query(sql)
+ except Exception as err:
+ r = find_between(str(err), '#', '#')
+ # print(r)
+ return None
+
diff --git a/test-cli/test/tests/qethernet.py b/test-cli/test/tests/qethernet.py
index 1d4c9bc..027931c 100644
--- a/test-cli/test/tests/qethernet.py
+++ b/test-cli/test/tests/qethernet.py
@@ -19,10 +19,6 @@ class Qethernet(unittest.TestCase):
self.__serverip = varlist["serverip"]
else:
raise Exception('sip param inside Qethernet have been be defined')
- if "bind" in varlist:
- self.__bind = varlist["bind"]
- else:
- self.__bind = None
if "bwexpected" in varlist:
self.__bwexpected = varlist["bwexpected"]
else:
@@ -36,11 +32,8 @@ class Qethernet(unittest.TestCase):
def execute(self):
# execute iperf command against the server
- if self.__bind is None:
- p = sh.iperf("-c", self.__serverip, "-x", "CMSV", "-n", self.__numbytestx, "-f", "m", "-p", self.__port)
- else:
- p = sh.iperf("-c", self.__serverip, "-x", "CMSV", "-n", self.__numbytestx, "-f", "m", "-p", self.__port, "-B",
- self.__bind)
+ p = sh.iperf("-c", self.__serverip, "-x", "CMSV", "-n", self.__numbytestx, "-f", "m", "-p", self.__port)
+
# check if it was executed succesfully
if p.exit_code == 0:
if p.stdout == "":
diff --git a/test-cli/test/tests/qwifi.py b/test-cli/test/tests/qwifi.py
index 8daf069..b0b8d6b 100644
--- a/test-cli/test/tests/qwifi.py
+++ b/test-cli/test/tests/qwifi.py
@@ -44,6 +44,7 @@ class Qwifi(unittest.TestCase):
# check if the board has ip in the wlan0 interface
p = sh.ifconfig("wlan0")
if p.exit_code == 0:
+ # check if wlan0 has an IP
result = re.search(
'inet addr:(?!127\.0{1,3}\.0{1,3}\.0{0,2}1$)((25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)',
p.stdout.decode('ascii'))
diff --git a/test-cli/test_main.py b/test-cli/test_main.py
index ddd62aa..5abbe85 100644
--- a/test-cli/test_main.py
+++ b/test-cli/test_main.py
@@ -113,6 +113,8 @@ def create_board():
print(processor_id)
globalVar.g_uuid = psdbObj.create_board(processor_id, model_id, variant, globalVar.station,
get_mac(globalVar.g_mid))
+ print(globalVar.g_uuid)
+ psdbObj.bond_to_station(globalVar.g_uuid, globalVar.station)
def get_taskvars_list(uuid):
@@ -130,14 +132,19 @@ def main():
globalVar.testid_ctl = psdbObj.open_test(globalVar.g_uuid)
# create and run tests according to the board type
runner = SimpleTestRunner(psdbObj)
+ loggerObj.getlogger().info("Tests running")
testresult = runner.run(create_testsuite())
# execute aditional tasks, only if the test was succesfull
- if testresult.wasSuccessful():
+ # if testresult.wasSuccessful():
+ if True:
+ loggerObj.getlogger().info("Extra tasks running")
# create task control
globalVar.taskid_ctl = psdbObj.open_task(globalVar.g_uuid)
# get extra variables
varlist = get_taskvars_list(globalVar.g_uuid)
+ alltasksok = False;
+
# flash eeprom
resulteeprom = 0
if "eeprompath" in varlist and len(varlist["eeprompath"]) > 0:
@@ -147,18 +154,29 @@ def main():
"TASK_OK" if resulteeprom == 0 else "TASK_FAIL", eepromdata)
# flash non-volatile memory
- resultmemory = 0
- if "image" in varlist and len(varlist["image"]) > 0:
- resultmemory = flash_memory(varlist["image"])
- psdbObj.create_task_result(globalVar.taskid_ctl, "FLASHMEMORY",
- "TASK_OK" if resultmemory == 0 else "TASK_FAIL", varlist["image"])
+ # resultmemory = 0
+ # if "image" in varlist and len(varlist["image"]) > 0:
+ # resultmemory = flash_memory(varlist["image"])
+ # psdbObj.create_task_result(globalVar.taskid_ctl, "FLASHMEMORY",
+ # "TASK_OK" if resultmemory == 0 else "TASK_FAIL", varlist["image"])
# update status with the result
- if resulteeprom == 0 and resultmemory == 0:
+ # if resulteeprom == 0 and resultmemory == 0:
+ if resulteeprom == 0:
+ alltasksok = True;
psdbObj.update_taskctl_status(globalVar.taskid_ctl, "TASK_BOARD_OK")
else:
psdbObj.update_taskctl_status(globalVar.taskid_ctl, "TASK_BOARD_FAIL")
+ if alltasksok:
+ # get barcode using the scanner
+ loggerObj.getlogger().info("Waiting for barcode scanner")
+ # TODO -----
+ factorycode = "XXXXX-XXXXX"
+ psdbObj.set_factorycode(globalVar.g_uuid, factorycode)
+
+ loggerObj.getlogger().info("Python program finished")
+
if __name__ == "__main__":
# Clear the shell screen
@@ -166,7 +184,7 @@ if __name__ == "__main__":
# create logger
loggerObj = ISEE_Logger(logging.INFO)
- # logger = loggerObj.getlogger().info("Starting test script...")
+ loggerObj.getlogger().info("Python program started")
# Try to parse the setup.xml file
try: