diff options
Diffstat (limited to 'test-cli/test')
-rw-r--r-- | test-cli/test/helpers/psqldb.py | 29 | ||||
-rw-r--r-- | test-cli/test/helpers/testsrv_db.py | 37 | ||||
-rw-r--r-- | test-cli/test/runners/simple.py | 11 | ||||
-rw-r--r-- | test-cli/test/tasks/__init__.py | 0 | ||||
-rw-r--r-- | test-cli/test/tasks/flasheeprom.py | 56 | ||||
-rw-r--r-- | test-cli/test/tasks/flashmemory.py | 12 | ||||
-rw-r--r-- | test-cli/test/tasks/generatedmesg.py | 17 | ||||
-rw-r--r-- | test-cli/test/tests/qamper.py | 23 | ||||
-rw-r--r-- | test-cli/test/tests/qduplex_ser.py | 5 | ||||
-rw-r--r-- | test-cli/test/tests/qeeprom.py | 4 | ||||
-rw-r--r-- | test-cli/test/tests/qethernet.py | 38 | ||||
-rw-r--r-- | test-cli/test/tests/qi2c.py | 14 | ||||
-rw-r--r-- | test-cli/test/tests/qnand.py | 17 | ||||
-rw-r--r-- | test-cli/test/tests/qram.py | 8 | ||||
-rw-r--r-- | test-cli/test/tests/qrtc.py | 4 | ||||
-rw-r--r-- | test-cli/test/tests/qserial.py | 5 | ||||
-rw-r--r-- | test-cli/test/tests/qusb.py | 5 | ||||
-rw-r--r-- | test-cli/test/tests/qwifi.py | 34 |
18 files changed, 270 insertions, 49 deletions
diff --git a/test-cli/test/helpers/psqldb.py b/test-cli/test/helpers/psqldb.py index 1d0e422..c98338a 100644 --- a/test-cli/test/helpers/psqldb.py +++ b/test-cli/test/helpers/psqldb.py @@ -1,29 +1,30 @@ import psycopg2 + class PgSQLConnection(object): """aaaaaaa""" __conection_object = None __db_config = {'dbname': 'testsrv', 'host': '192.168.2.171', - 'password': 'Idkfa2009', 'port': 5432, 'user': 'admin'} + 'password': 'Idkfa2009', 'port': 5432, 'user': 'admin'} - def __init__ (self, connect_str = None): + def __init__(self, connect_str=None): self.__conection_object = None if connect_str is not None: self.__db_config = connect_str else: self.__db_config = {'dbname': 'testsrv', 'host': '192.168.2.171', - 'password': 'Idkfa2009', 'port': 5432, 'user': 'admin'} + 'password': 'Idkfa2009', 'port': 5432, 'user': 'admin'} - def db_connect (self, connect_str = None): + def db_connect(self, connect_str=None): result = False try: if connect_str == None: self.__conection_object = psycopg2.connect(**self.__db_config) else: - self.__db_config = connect_str; + self.__db_config = connect_str self.__conection_object = psycopg2.connect(**self.__db_config) - + self.__conection_object.autocommit = True result = True except Exception as error: @@ -37,20 +38,26 @@ class PgSQLConnection(object): cur.close() return data + def db_upload_large_file(self, filepath): + # a new connection must be created to use large objects + __conn = psycopg2.connect(**self.__db_config) + lobj = __conn.lobject(oid=0, mode="rw", new_oid=0, new_file=filepath) + fileoid = lobj.oid + lobj.close() + __conn.commit() + __conn.close() + return fileoid + def commit(self): self.__conection_object.commit() - def db_close(self): if self.__conection_object is not None: self.__conection_object.close() del self.__conection_object self.__conection_object = None - def __del__(self): if self.__conection_object is not None: self.__conection_object.close() - #print("disconnected") - - + # print("disconnected") diff --git a/test-cli/test/helpers/testsrv_db.py b/test-cli/test/helpers/testsrv_db.py index dc4fb55..dfb4d15 100644 --- a/test-cli/test/helpers/testsrv_db.py +++ b/test-cli/test/helpers/testsrv_db.py @@ -101,6 +101,42 @@ class TestSrv_Database(object): # print(r) return None + def upload_result_string(self, testid_ctl, testid, descrip, strdata): + sql = "SELECT isee.f_upload_result_string({},{},'{}','{}')".format(testid_ctl, testid, descrip, strdata) + # print('>>>' + sql) + try: + self.__sqlObject.db_execute_query(sql) + except Exception as err: + r = find_between(str(err), '#', '#') + # print(r) + return None + + def upload_result_file(self, testid_ctl, testid, descrip, filepath): + try: + # generate a new oid + fileoid = self.__sqlObject.db_upload_large_file(filepath) + # insert into a table + sql = "SELECT isee.f_upload_result_file({},{},'{}','{}')".format(testid_ctl, testid, descrip, fileoid) + # print('>>>' + sql) + self.__sqlObject.db_execute_query(sql) + except Exception as err: + r = find_between(str(err), '#', '#') + # print(r) + return None + + def upload_dmesg(self, testid_ctl, filepath): + try: + # generate a new oid + fileoid = self.__sqlObject.db_upload_large_file(filepath) + # insert into a table + sql = "SELECT isee.f_upload_dmesg_file({},'{}')".format(testid_ctl, fileoid) + # print('>>>' + sql) + self.__sqlObject.db_execute_query(sql) + except Exception as err: + r = find_between(str(err), '#', '#') + # print(r) + return None + def get_task_variables_list(self, board_uuid): sql = "SELECT * FROM isee.f_get_task_variables_list('{}')".format(board_uuid) # print('>>>' + sql) @@ -176,4 +212,3 @@ class TestSrv_Database(object): r = find_between(str(err), '#', '#') # print(r) return None - diff --git a/test-cli/test/runners/simple.py b/test-cli/test/runners/simple.py index a165406..e0418d2 100644 --- a/test-cli/test/runners/simple.py +++ b/test-cli/test/runners/simple.py @@ -71,10 +71,19 @@ class TextTestResult(unittest.TestResult): unittest.TestResult.stopTest(self, test) # display: print test result self.runner.writeUpdate(self.result) + # save result data + for result in test.getresults(): + if "desc" in result.keys() and result["desc"]: + if "type" in result.keys() and "data" in result.keys(): + if result["type"] == "string": + self.__pgObj.upload_result_string(test.params["testidctl"], test.params["testid"], + result["desc"], result["data"]) + elif result["type"] == "file": + self.__pgObj.upload_result_file(test.params["testidctl"], test.params["testid"], result["desc"], + result["data"]) # SEND TO DB THE RESULT OF THE TEST if self.result == self.PASS: status = "TEST_COMPLETE" else: status = "TEST_FAILED" self.__pgObj.finish_test(test.params["testidctl"], test.params["testid"], status) - diff --git a/test-cli/test/tasks/__init__.py b/test-cli/test/tasks/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/test-cli/test/tasks/__init__.py diff --git a/test-cli/test/tasks/flasheeprom.py b/test-cli/test/tasks/flasheeprom.py new file mode 100644 index 0000000..71c1bdd --- /dev/null +++ b/test-cli/test/tasks/flasheeprom.py @@ -0,0 +1,56 @@ +import os +import binascii +from test.helpers.globalVariables import globalVar + + +def _generate_data_bytes(boarduuid, mac0, mac1): + data = bytearray() + data += (2029785358).to_bytes(4, 'big') # magicid --> 0x78FC110E + data += bytearray([0, 0, 0, 0]) # crc32 = 0 + data += bytearray(boarduuid + "\0", + 'ascii') # uuid --> 'c0846c8a-5fa5-11ea-8576-f8b156ac62d7' and \0 at the end + data += binascii.unhexlify(mac0.replace(':', '')) # mac0 --> 'f8:b1:56:ac:62:d7' + if mac1 is not None: + data += binascii.unhexlify(mac1.replace(':', '')) # mac1 --> 'f8:b1:56:ac:62:d7' + else: + data += bytearray([0, 0, 0, 0, 0, 0]) # mac1 --> 0:0:0:0:0:0 + # calculate crc + crc = (binascii.crc32(data, 0)).to_bytes(4, 'big') + data[4:8] = crc + return data + + +def _generate_output_data(data_rx): + boarduuid = data_rx[8:44].decode('ascii') # no 8:45 porque el \0 final hace que no funcione postgresql + mac0 = binascii.hexlify(data_rx[45:51]).decode('ascii') + mac1 = binascii.hexlify(data_rx[51:57]).decode('ascii') + smac0 = ':'.join(mac0[i:i + 2] for i in range(0, len(mac0), 2)) + smac1 = ':'.join(mac1[i:i + 2] for i in range(0, len(mac1), 2)) + eepromdata = "UUID " + boarduuid + " , MAC0 " + smac0 + " , MAC1 " + smac1 + + return eepromdata + + +# returns exitcode and data saved into eeprom +def flash_eeprom(eeprompath, boarduuid, mac0, mac1=None): + print("Start programming Eeprom...") + # check if eeprompath is correct + if os.path.isfile(eeprompath): + # create u-boot data struct + data = _generate_data_bytes(boarduuid, mac0, mac1) + # write into eeprom and read back + f = open(eeprompath, "r+b") + f.write(data) + f.seek(0) + data_rx = f.read(57) + for i in range(57): + if data_rx[i] != data[i]: + print("Error while programming eeprom memory.") + return 1, None + print("Eeprom programmed succesfully.") + # generated eeprom read data + eepromdata = _generate_output_data(data_rx) + return 0, eepromdata + else: + print("Eeprom memory not found.") + return 1, None diff --git a/test-cli/test/tasks/flashmemory.py b/test-cli/test/tasks/flashmemory.py new file mode 100644 index 0000000..3be56d7 --- /dev/null +++ b/test-cli/test/tasks/flashmemory.py @@ -0,0 +1,12 @@ +import sh + + +def flash_memory(imagepath): + print("Start programming Nand memory...") + p = sh.bash("/usr/bin/igep-flash", "--skip-nandtest", "--image", imagepath) + if p.exit_code != 0: + print("Flasher: Could not complete flashing task.") + return 1 + else: + print("Flasher: NAND memory flashed succesfully.") + return 0 diff --git a/test-cli/test/tasks/generatedmesg.py b/test-cli/test/tasks/generatedmesg.py new file mode 100644 index 0000000..f0a3418 --- /dev/null +++ b/test-cli/test/tasks/generatedmesg.py @@ -0,0 +1,17 @@ +import sh + + +def generate_dmesg(tidctl, pgObj): + print("Start getting DMESG...") + p = sh.dmesg() + if p.exit_code != 0: + print("DMESG: Unknown error.") + return 1 + else: + # save dmesg in a file + with open('/tmp/dmesg.txt', 'w') as outfile: + n = outfile.write(p.stdout.decode('ascii')) + # save dmesg result in DB + pgObj.upload_dmesg(tidctl, '/tmp/dmesg.txt') + print("DMESG: saved succesfully.") + return 0 diff --git a/test-cli/test/tests/qamper.py b/test-cli/test/tests/qamper.py index c611fdb..51aa469 100644 --- a/test-cli/test/tests/qamper.py +++ b/test-cli/test/tests/qamper.py @@ -4,6 +4,7 @@ from test.helpers.amper import Amper class Qamper(unittest.TestCase): params = None + __current = None # varlist: undercurrent, overcurrent def __init__(self, testname, testfunc, varlist): @@ -33,12 +34,22 @@ class Qamper(unittest.TestCase): self.fail("failed: can not communicate") return # get current value (in Amperes) - result = amp.getCurrent() + self.__current = amp.getCurrent() # close serial connection amp.close() - # # In order to give a valid result it is importarnt to define an under current value - if result > float(self._overcurrent): - self.fail("failed: Overcurrent detected ( {} )".format(result)) + # Check current range + if float(self.__current) > float(self._overcurrent): + self.fail("failed: Overcurrent detected ( {} )".format(self.__current)) + if float(self.__current) < float(self._undercurrent): + self.fail("failed: Undercurrent detected ( {} )".format(self.__current)) - if result < float(self._undercurrent): - self.fail("failed: Undercurrent detected ( {} )".format(result)) + def getresults(self): + # resultlist is a python list of python dictionaries + resultlist = [ + { + "desc": "current (Ampers)", + "data": self.__current, + "type": "string" + } + ] + return resultlist diff --git a/test-cli/test/tests/qduplex_ser.py b/test-cli/test/tests/qduplex_ser.py index cb690cb..c7231c2 100644 --- a/test-cli/test/tests/qduplex_ser.py +++ b/test-cli/test/tests/qduplex_ser.py @@ -69,3 +69,8 @@ class Qduplex(unittest.TestCase): else: if self.__serial1.readline() != test_uuid: self.fail("failed: port {} write/read mismatch".format(self.__port1)) + + def getresults(self): + # resultlist is a python list of python dictionaries + resultlist = [] + return resultlist diff --git a/test-cli/test/tests/qeeprom.py b/test-cli/test/tests/qeeprom.py index 1921bf7..a65ca97 100644 --- a/test-cli/test/tests/qeeprom.py +++ b/test-cli/test/tests/qeeprom.py @@ -39,3 +39,7 @@ class Qeeprom(unittest.TestCase): else: self.fail("failed: Unable to write on the EEPROM device.") + def getresults(self): + # resultlist is a python list of python dictionaries + resultlist = [] + return resultlist diff --git a/test-cli/test/tests/qethernet.py b/test-cli/test/tests/qethernet.py index 2246029..da085d8 100644 --- a/test-cli/test/tests/qethernet.py +++ b/test-cli/test/tests/qethernet.py @@ -1,6 +1,7 @@ import unittest import sh import re +import json class Qethernet(unittest.TestCase): @@ -9,11 +10,16 @@ class Qethernet(unittest.TestCase): __bwexpected = None __port = None params = None + __bwreal = None # varlist content: serverip, bwexpected, port def __init__(self, testname, testfunc, varlist): + # save the parameters list self.params = varlist + # configure the function to be executed when the test runs. "testfunc" is a name of a method inside this + # class, that in this situation, it can only be "execute". super(Qethernet, self).__init__(testfunc) + # validate and get the parameters if "serverip" in varlist: self.__serverip = varlist["serverip"] else: @@ -32,8 +38,8 @@ class Qethernet(unittest.TestCase): def execute(self): # execute iperf command against the server try: - p = sh.iperf("-c", self.__serverip, "-x", "CMSV", "-n", self.__numbytestx, "-f", "m", "-p", self.__port, - _timeout=20) + p = sh.iperf3("-c", self.__serverip, "-n", self.__numbytestx, "-f", "m", "-p", self.__port, "-J", + _timeout=20) except sh.TimeoutException: self.fail("failed: iperf timeout reached") @@ -42,17 +48,25 @@ class Qethernet(unittest.TestCase): if p.stdout == "": self.fail("failed: error executing iperf command") # analyze output string - # split by lines - lines = p.stdout.splitlines() - # get first line - dat = lines[1].decode('ascii') - # search for the BW value - a = re.search("\d+(\.\d)? Mbits/sec", dat) - b = a.group().split() - bwreal = b[0] + data = json.loads(p.stdout.decode('ascii')) + self.__bwreal = float(data['end']['sum_received']['bits_per_second'])/1024/1024 + # save result file + with open('/tmp/ethernet-iperf.json', 'w') as outfile: + json.dump(data, outfile) # check if BW is in the expected range - self.failUnless(float(bwreal) > float(self.__bwexpected) * 0.9, - "failed: speed is lower than spected. Speed(MB/s)" + str(bwreal)) + self.failUnless(self.__bwreal > float(self.__bwexpected) * 0.9, + "failed: speed is lower than spected. Speed(Mbits/s)" + str(self.__bwreal)) else: self.fail("failed: could not complete iperf command") + + def getresults(self): + # resultlist is a python list of python dictionaries + resultlist = [ + { + "desc": "iperf3 output", + "data": "/tmp/ethernet-iperf.json", + "type": "file" + } + ] + return resultlist diff --git a/test-cli/test/tests/qi2c.py b/test-cli/test/tests/qi2c.py index c59975e..ad7ddf0 100644 --- a/test-cli/test/tests/qi2c.py +++ b/test-cli/test/tests/qi2c.py @@ -16,7 +16,7 @@ class Qi2c(unittest.TestCase): self.__register = varlist["register"].split("/") else: raise Exception('register param inside Qi2c must be defined') - self.__devices=[] + self.__devices = [] self._testMethodDoc = testname def execute(self): @@ -26,13 +26,19 @@ class Qi2c(unittest.TestCase): self.__raw_out = i2c_command.getOutput() if self.__raw_out == "": return -1 - lines=self.__raw_out.decode('ascii').splitlines() + lines = self.__raw_out.decode('ascii').splitlines() for i in range(len(lines)): if lines[i].count('UU'): if lines[i].find("UU"): - self.__devices.append("0x{}{}".format((i - 1), hex(int((lines[i].find("UU") - 4) / 3)).split('x')[-1])) + self.__devices.append( + "0x{}{}".format((i - 1), hex(int((lines[i].find("UU") - 4) / 3)).split('x')[-1])) for i in range(len(self.__register)): - if not(self.__register[i] in self.__devices): + if not (self.__register[i] in self.__devices): self.fail("failed: device {} not found in bus i2c-{}".format(self.__register[i], self.__busnum)) else: self.fail("failed: could not complete i2cdedtect command") + + def getresults(self): + # resultlist is a python list of python dictionaries + resultlist = [] + return resultlist diff --git a/test-cli/test/tests/qnand.py b/test-cli/test/tests/qnand.py index 10a68f2..7ff7cb2 100644 --- a/test-cli/test/tests/qnand.py +++ b/test-cli/test/tests/qnand.py @@ -18,6 +18,21 @@ class Qnand(unittest.TestCase): def execute(self): try: - sh.nandtest("-m", self.__device, _out="/dev/null") + p = sh.nandtest("-m", self.__device) + # save result + with open('/tmp/nand-nandtest.txt', 'w') as outfile: + n = outfile.write(p.stdout.decode('ascii')) + except sh.ErrorReturnCode as e: self.fail("failed: could not complete nandtest command") + + def getresults(self): + # resultlist is a python list of python dictionaries + resultlist = [ + { + "desc": "nandtest output", + "data": "/tmp/nand-nandtest.txt", + "type": "file" + } + ] + return resultlist diff --git a/test-cli/test/tests/qram.py b/test-cli/test/tests/qram.py index 5a7734a..561e980 100644 --- a/test-cli/test/tests/qram.py +++ b/test-cli/test/tests/qram.py @@ -23,6 +23,12 @@ class Qram(unittest.TestCase): def execute(self): try: - sh.memtester(self.__memsize, "1", _out="/dev/null") + p = sh.memtester(self.__memsize, "1", _out="/dev/null") + except sh.ErrorReturnCode as e: self.fail("failed: could not complete memtester command") + + def getresults(self): + # resultlist is a python list of python dictionaries + resultlist = [] + return resultlist diff --git a/test-cli/test/tests/qrtc.py b/test-cli/test/tests/qrtc.py index 0be0f99..715bcb7 100644 --- a/test-cli/test/tests/qrtc.py +++ b/test-cli/test/tests/qrtc.py @@ -37,3 +37,7 @@ class Qrtc(unittest.TestCase): else: self.fail("failed: couldn't execute hwclock command") + def getresults(self): + # resultlist is a python list of python dictionaries + resultlist = [] + return resultlist diff --git a/test-cli/test/tests/qserial.py b/test-cli/test/tests/qserial.py index 45c9d03..0cb5563 100644 --- a/test-cli/test/tests/qserial.py +++ b/test-cli/test/tests/qserial.py @@ -45,3 +45,8 @@ class Qserial(unittest.TestCase): # check if what it was sent is equal to what has been received if self.__serial.readline() != test_uuid: self.fail("failed: port {} write/read mismatch".format(self.__port)) + + def getresults(self): + # resultlist is a python list of python dictionaries + resultlist = [] + return resultlist diff --git a/test-cli/test/tests/qusb.py b/test-cli/test/tests/qusb.py index 4c177d9..316cef5 100644 --- a/test-cli/test/tests/qusb.py +++ b/test-cli/test/tests/qusb.py @@ -57,3 +57,8 @@ class Qusb(unittest.TestCase): self.fail("failed: unable to copy files.") else: self.fail("failed: unable to mount the device.") + + def getresults(self): + # resultlist is a python list of python dictionaries + resultlist = [] + return resultlist diff --git a/test-cli/test/tests/qwifi.py b/test-cli/test/tests/qwifi.py index 684cb34..0944dd7 100644 --- a/test-cli/test/tests/qwifi.py +++ b/test-cli/test/tests/qwifi.py @@ -1,6 +1,7 @@ import unittest import sh import re +import json class Qwifi(unittest.TestCase): @@ -9,6 +10,7 @@ class Qwifi(unittest.TestCase): __bwexpected = None __port = None params = None + __bwreal = None # varlist content: serverip, bwexpected, port def __init__(self, testname, testfunc, varlist): @@ -46,8 +48,8 @@ class Qwifi(unittest.TestCase): if result: # execute iperf command against the server try: - p = sh.iperf("-c", self.__serverip, "-x", "CMSV", "-n", self.__numbytestx, "-f", "m", "-p", - self.__port, _timeout=20) + p = sh.iperf3("-c", self.__serverip, "-n", self.__numbytestx, "-f", "m", "-p", self.__port, + "-J", _timeout=20) except sh.TimeoutException: self.fail("failed: iperf timeout reached") @@ -56,18 +58,15 @@ class Qwifi(unittest.TestCase): if p.stdout == "": self.fail("failed: error executing iperf command") # analyze output string - # split by lines - lines = p.stdout.splitlines() - # get first line - dat = lines[1].decode('ascii') - # search for the BW value - a = re.search("\d+(\.\d)? Mbits/sec", dat) - b = a.group().split() - bwreal = b[0] + data = json.loads(p.stdout.decode('ascii')) + self.__bwreal = float(data['end']['sum_received']['bits_per_second']) / 1024 / 1024 + # save result file + with open('/tmp/wifi-iperf.json', 'w') as outfile: + json.dump(data, outfile) # check if BW is in the expected range - self.failUnless(float(bwreal) > float(self.__bwexpected) * 0.9, - "failed: speed is lower than spected. Speed(MB/s)" + str(bwreal)) + self.failUnless(self.__bwreal > float(self.__bwexpected) * 0.9, + "failed: speed is lower than spected. Speed(Mbits/s)" + str(self.__bwreal)) else: self.fail("failed: could not complete iperf command") else: @@ -78,3 +77,14 @@ class Qwifi(unittest.TestCase): self.fail("failed: wifi module is not connected to the router.") else: self.fail("failed: couldn't execute iw command") + + def getresults(self): + # resultlist is a python list of python dictionaries + resultlist = [ + { + "desc": "iperf3 output", + "data": "/tmp/wifi-iperf.json", + "type": "file" + } + ] + return resultlist |