From 34df86b37d6838b115e65e5f3a332344afeb86b8 Mon Sep 17 00:00:00 2001 From: Hector Fernandez Date: Wed, 1 Jul 2020 10:45:34 +0200 Subject: Changes to adapt to new way to save results in DB. Created audio test. Added protections against unexpected status in DB. --- test-cli/.idea/workspace.xml | 121 +++++++++++-------------------- test-cli/test/files/dtmf-13579.wav | Bin 8045 -> 17722 bytes test-cli/test/files/test_pattern.png | Bin 4457 -> 0 bytes test-cli/test/helpers/DiskHelpers.py | 9 +++ test-cli/test/helpers/int_registers.py | 2 +- test-cli/test/helpers/testsrv_db.py | 44 +++++------ test-cli/test/runners/simple.py | 17 ++--- test-cli/test/tasks/flasheeprom.py | 2 +- test-cli/test/tasks/generatedmesg.py | 17 ----- test-cli/test/tests/qamper.py | 61 +++++----------- test-cli/test/tests/qaudio.py | 59 +++++++++++++++ test-cli/test/tests/qdmesg.py | 38 ++++++++++ test-cli/test/tests/qduplex_ser.py | 40 +--------- test-cli/test/tests/qeeprom.py | 33 +-------- test-cli/test/tests/qethernet.py | 37 ++-------- test-cli/test/tests/qi2c.py | 67 ----------------- test-cli/test/tests/qmmcflash.py | 15 ++++ test-cli/test/tests/qnand.py | 29 ++------ test-cli/test/tests/qram.py | 20 +---- test-cli/test/tests/qrtc.py | 33 +-------- test-cli/test/tests/qserial.py | 26 +------ test-cli/test/tests/qusb.py | 40 +--------- test-cli/test/tests/qwifi.py | 63 ++-------------- test-cli/test_main.py | 129 +++++++++++++++++++++------------ 24 files changed, 332 insertions(+), 570 deletions(-) delete mode 100644 test-cli/test/files/test_pattern.png create mode 100644 test-cli/test/helpers/DiskHelpers.py delete mode 100644 test-cli/test/tasks/generatedmesg.py create mode 100644 test-cli/test/tests/qaudio.py create mode 100644 test-cli/test/tests/qdmesg.py delete mode 100644 test-cli/test/tests/qi2c.py create mode 100644 test-cli/test/tests/qmmcflash.py diff --git a/test-cli/.idea/workspace.xml b/test-cli/.idea/workspace.xml index 2439fd2..887bf9b 100644 --- a/test-cli/.idea/workspace.xml +++ b/test-cli/.idea/workspace.xml @@ -2,9 +2,39 @@ + + + + + - + + + + + + + + + + + + + + + + + + + + + + + + + + @@ -42,6 +72,8 @@ + + @@ -85,90 +117,21 @@ - + - - + + - - + + - - + + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + \ No newline at end of file diff --git a/test-cli/test/files/dtmf-13579.wav b/test-cli/test/files/dtmf-13579.wav index 1ca5b93..c5b416a 100644 Binary files a/test-cli/test/files/dtmf-13579.wav and b/test-cli/test/files/dtmf-13579.wav differ diff --git a/test-cli/test/files/test_pattern.png b/test-cli/test/files/test_pattern.png deleted file mode 100644 index 353aab5..0000000 Binary files a/test-cli/test/files/test_pattern.png and /dev/null differ diff --git a/test-cli/test/helpers/DiskHelpers.py b/test-cli/test/helpers/DiskHelpers.py new file mode 100644 index 0000000..79b661e --- /dev/null +++ b/test-cli/test/helpers/DiskHelpers.py @@ -0,0 +1,9 @@ +import stat +import os + + +def disk_exists(path): + try: + return stat.S_ISBLK(os.stat(path).st_mode) + except: + return False diff --git a/test-cli/test/helpers/int_registers.py b/test-cli/test/helpers/int_registers.py index 0feb8da..9de503b 100644 --- a/test-cli/test/helpers/int_registers.py +++ b/test-cli/test/helpers/int_registers.py @@ -44,7 +44,7 @@ def get_die_id(modelid): return dieid -def get_mac(modelid): +def get_internal_mac(modelid): mac = None if modelid.find("IGEP0034") == 0 or modelid.find("SOPA0000") == 0: diff --git a/test-cli/test/helpers/testsrv_db.py b/test-cli/test/helpers/testsrv_db.py index 637d45c..7eeefb3 100644 --- a/test-cli/test/helpers/testsrv_db.py +++ b/test-cli/test/helpers/testsrv_db.py @@ -91,8 +91,8 @@ class TestSrv_Database(object): # print(r) return None - def finish_test(self, testid_ctl, testid, newstatus): - sql = "SELECT isee.f_finish_test({},{},'{}')".format(testid_ctl, testid, newstatus) + def finish_test(self, testid_ctl, testid, newstatus, textresult): + sql = "SELECT isee.f_finish_test({},{},'{}','{}')".format(testid_ctl, testid, newstatus, textresult) # print('>>>' + sql) try: self.__sqlObject.db_execute_query(sql) @@ -101,35 +101,13 @@ class TestSrv_Database(object): # print(r) return None - def upload_result_string(self, testid_ctl, testid, descrip, strdata): - sql = "SELECT isee.f_upload_result_string({},{},'{}','{}')".format(testid_ctl, testid, descrip, strdata) - # print('>>>' + sql) - try: - self.__sqlObject.db_execute_query(sql) - except Exception as err: - r = find_between(str(err), '#', '#') - # print(r) - return None - - def upload_result_file(self, testid_ctl, testid, descrip, filepath): + def upload_result_file(self, testid_ctl, testid, desc, filepath, mimetype): try: # generate a new oid fileoid = self.__sqlObject.db_upload_large_file(filepath) # insert into a table - sql = "SELECT isee.f_upload_result_file({},{},'{}','{}')".format(testid_ctl, testid, descrip, fileoid) - # print('>>>' + sql) - self.__sqlObject.db_execute_query(sql) - except Exception as err: - r = find_between(str(err), '#', '#') - # print(r) - return None - - def upload_dmesg(self, testid_ctl, filepath): - try: - # generate a new oid - fileoid = self.__sqlObject.db_upload_large_file(filepath) - # insert into a table - sql = "SELECT isee.f_upload_dmesg_file({},'{}')".format(testid_ctl, fileoid) + sql = "SELECT isee.f_upload_result_file({},{},'{}','{}','{}')".format(testid_ctl, testid, desc, fileoid, + mimetype) # print('>>>' + sql) self.__sqlObject.db_execute_query(sql) except Exception as err: @@ -224,3 +202,15 @@ class TestSrv_Database(object): r = find_between(str(err), '#', '#') # print(r) return None + + def get_setup_variable(self, skey): + sql = "SELECT * FROM admin.get_setupvar('{}')".format(skey) + # print('>>>' + sql) + try: + res = self.__sqlObject.db_execute_query(sql) + # print(res) + return res[0][0] + except Exception as err: + r = find_between(str(err), '#', '#') + # print(r) + return None diff --git a/test-cli/test/runners/simple.py b/test-cli/test/runners/simple.py index 22fe449..2eb61a4 100644 --- a/test-cli/test/runners/simple.py +++ b/test-cli/test/runners/simple.py @@ -59,8 +59,9 @@ class TextTestResult(unittest.TestResult): def addError(self, test, err): unittest.TestResult.addError(self, test, err) - test.longMessage = err[1] + # test.longMessage = err[1] self.result = self.ERROR + print(err[1]) def addFailure(self, test, err): unittest.TestResult.addFailure(self, test, err) @@ -73,17 +74,13 @@ class TextTestResult(unittest.TestResult): self.runner.writeUpdate(self.result) # save result data for result in test.getresults(): - if "desc" in result.keys() and result["desc"]: - if "type" in result.keys() and "data" in result.keys(): - if result["type"] == "string": - self.__pgObj.upload_result_string(test.params["testidctl"], test.params["testid"], - result["desc"], result["data"]) - elif result["type"] == "file": - self.__pgObj.upload_result_file(test.params["testidctl"], test.params["testid"], - result["desc"], result["data"]) + self.__pgObj.upload_result_file(test.params["testidctl"], test.params["testid"], + result["description"], result["filepath"], result["mimetype"]) # SEND TO DB THE RESULT OF THE TEST if self.result == self.PASS: status = "TEST_COMPLETE" + resulttext = test.gettextresult() else: status = "TEST_FAILED" - self.__pgObj.finish_test(test.params["testidctl"], test.params["testid"], status) + resulttext = test.longMessage + self.__pgObj.finish_test(test.params["testidctl"], test.params["testid"], status, resulttext) diff --git a/test-cli/test/tasks/flasheeprom.py b/test-cli/test/tasks/flasheeprom.py index 71c1bdd..feeb04c 100644 --- a/test-cli/test/tasks/flasheeprom.py +++ b/test-cli/test/tasks/flasheeprom.py @@ -21,7 +21,7 @@ def _generate_data_bytes(boarduuid, mac0, mac1): def _generate_output_data(data_rx): - boarduuid = data_rx[8:44].decode('ascii') # no 8:45 porque el \0 final hace que no funcione postgresql + boarduuid = data_rx[8:44].decode('ascii') # no 8:45 porque el \0 final hace que no funcione postgresql mac0 = binascii.hexlify(data_rx[45:51]).decode('ascii') mac1 = binascii.hexlify(data_rx[51:57]).decode('ascii') smac0 = ':'.join(mac0[i:i + 2] for i in range(0, len(mac0), 2)) diff --git a/test-cli/test/tasks/generatedmesg.py b/test-cli/test/tasks/generatedmesg.py deleted file mode 100644 index aa9d9ce..0000000 --- a/test-cli/test/tasks/generatedmesg.py +++ /dev/null @@ -1,17 +0,0 @@ -import sh - - -def generate_dmesg(tidctl, pgObj): - print("Start getting DMESG...") - p = sh.dmesg("--color=never") - if p.exit_code != 0: - print("DMESG: Unknown error.") - return 1 - else: - # save dmesg in a file - with open('/tmp/dmesg.txt', 'w') as outfile: - n = outfile.write(p.stdout.decode('ascii')) - # save dmesg result in DB - pgObj.upload_dmesg(tidctl, '/tmp/dmesg.txt') - print("DMESG: saved succesfully.") - return 0 diff --git a/test-cli/test/tests/qamper.py b/test-cli/test/tests/qamper.py index 7a31615..b4d57e3 100644 --- a/test-cli/test/tests/qamper.py +++ b/test-cli/test/tests/qamper.py @@ -26,60 +26,37 @@ class Qamper(unittest.TestCase): amp = Amper() # open serial connection if not amp.open(): - self.__resultlist.append( - { - "desc": "Test result", - "data": "FAILED: can not open a serial port", - "type": "string" - } - ) self.fail("Error: can not open a serial port") # check if the amperimeter is connected and working # 2 ATTEMTS if not amp.hello(): if not amp.hello(): - self.__resultlist.append( - { - "desc": "Test result", - "data": "FAILED: can not communicate with the amperimeter", - "type": "string" - } - ) self.fail("Error: can not communicate") # get current value (in Amperes) - current = amp.getCurrent() + self.current = amp.getCurrent() + # save result in a file + with open('/tmp/station/amper.txt', 'w') as outfile: + n = outfile.write("Current: {} A".format(self.current)) + outfile.close() + self.__resultlist.append( + { + "description": "Amperimeter values", + "filepath": "/tmp/station/amper.txt", + "mimetype": "text/plain" + } + ) # close serial connection amp.close() # Check current range - if float(current) > float(self._overcurrent): + if float(self.current) > float(self._overcurrent): # Overcurrent detected - self.__resultlist.append( - { - "desc": "Test result", - "data": "FAILED: Overcurrent detected ( {} A)".format(current), - "type": "string" - } - ) - self.fail("failed: Overcurrent detected ( {} )".format(current)) - elif float(current) < float(self._undercurrent): + self.fail("failed: Overcurrent detected ( {} )".format(self.current)) + elif float(self.current) < float(self._undercurrent): # Undercurrent detected - self.__resultlist.append( - { - "desc": "Test result", - "data": "FAILED: Undercurrent detected ( {} A)".format(current), - "type": "string" - } - ) - self.fail("failed: Undercurrent detected ( {} )".format(current)) - - # Test successful - self.__resultlist.append( - { - "desc": "Test result", - "data": "OK: Current {} A".format(current), - "type": "string" - } - ) + self.fail("failed: Undercurrent detected ( {} )".format(self.current)) def getresults(self): return self.__resultlist + + def gettextresult(self): + return "{} A".format(self.current) diff --git a/test-cli/test/tests/qaudio.py b/test-cli/test/tests/qaudio.py new file mode 100644 index 0000000..5baffe4 --- /dev/null +++ b/test-cli/test/tests/qaudio.py @@ -0,0 +1,59 @@ +import unittest +import sh +import wave +import contextlib + + +def calc_audio_duration(fname): + with contextlib.closing(wave.open(fname, 'r')) as f: + frames = f.getnframes() + rate = f.getframerate() + duration = frames / float(rate) + f.close() + return duration + + +class Qaudio(unittest.TestCase): + params = None + __resultlist = None # resultlist is a python list of python dictionaries + __dtmf_secuence = ["1", "3", "5", "7", "9"] + + def __init__(self, testname, testfunc, varlist): + self.params = varlist + super(Qaudio, self).__init__(testfunc) + self._testMethodDoc = testname + self.__resultlist = [] + + def execute(self): + # analize audio file + recordtime = calc_audio_duration("test/files/dtmf-13579.wav") + 0.15 + # play and record + p1 = sh.aplay("test/files/dtmf-13579.wav", _bg=True) + p2 = sh.arecord("-r", 8000, "-d", recordtime, "/tmp/station/recorded.wav", _bg=True) + p1.wait() + p2.wait() + if p1.exit_code == 0 and p2.exit_code == 0: + p = sh.multimon("-t", "wav", "-a", "DTMF", "/tmp/station/recorded.wav", "-q") + if p.exit_code == 0: + lines = p.stdout.decode('ascii').splitlines() + self.__dtmf_secuence_result = [] + for li in lines: + if li.split(" ")[0] == "DTMF:": + self.__dtmf_secuence_result.append(li.split(" ")[1]) + # compare original and processed dtmf sequence + if len(self.__dtmf_secuence) == len(self.__dtmf_secuence_result): + for i in range(len(self.__dtmf_secuence)): + if self.__dtmf_secuence[i] != self.__dtmf_secuence_result[i]: + self.fail("failed: sent and received DTMF sequence don't match.") + else: + self.fail("failed: received DTMF sequence is shorter than expected.") + else: + self.fail("failed: unable to use multimon command.") + else: + self.fail("failed: unable to play/record audio.") + + def getresults(self): + return self.__resultlist + + def gettextresult(self): + return "" diff --git a/test-cli/test/tests/qdmesg.py b/test-cli/test/tests/qdmesg.py new file mode 100644 index 0000000..45dd4eb --- /dev/null +++ b/test-cli/test/tests/qdmesg.py @@ -0,0 +1,38 @@ +import sh +import os.path +from os import path + +class Qdmesg: + params = None + __resultlist = None # resultlist is a python list of python dictionaries + + def __init__(self, testname, testfunc, varlist): + self.params = varlist + self._testMethodDoc = testname + self.__resultlist = [] + self.pgObj = varlist["db"] + + def execute(self): + print("running dmesg test") + self.pgObj.run_test(self.params["testidctl"], self.params["testid"]) + # delete previous file + if path.exists("/tmp/station/dmesg.txt"): + print("dmesg remove") + os.remove("/tmp/station/dmesg.txt") + # generate file + p = sh.dmesg("--color=never", _out="/tmp/station/dmesg.txt") + print("Exit code: {}".format(p.exit_code)) + if p.exit_code == 0: + # save result + # with open('/tmp/station/dmesg.txt', 'w') as outfile: + # n = outfile.write(p.stdout.decode('ascii')) + # outfile.close() + + # save dmesg result in DB + self.pgObj.upload_result_file(self.params["testidctl"], self.params["testid"], "dmesg output", + "/tmp/station/dmesg.txt", "text/plain") + self.pgObj.finish_test(self.params["testidctl"], self.params["testid"], "TEST_COMPLETE", "") + print("success dmesg test") + else: + self.pgObj.finish_test(self.params["testidctl"], self.params["testid"], "TEST_FAILED", "") + print("fail dmesg test") diff --git a/test-cli/test/tests/qduplex_ser.py b/test-cli/test/tests/qduplex_ser.py index 020844a..170039b 100644 --- a/test-cli/test/tests/qduplex_ser.py +++ b/test-cli/test/tests/qduplex_ser.py @@ -51,23 +51,9 @@ class Qduplex(unittest.TestCase): self.__serial1.write(test_uuid) time.sleep(0.05) # there might be a small delay if self.__serial2.inWaiting() == 0: - self.__resultlist.append( - { - "desc": "Test result", - "data": "FAILED: port {} wait timeout exceded".format(self.__port2), - "type": "string" - } - ) self.fail("failed: port {} wait timeout exceded".format(self.__port2)) else: if self.__serial2.readline() != test_uuid: - self.__resultlist.append( - { - "desc": "Test result", - "data": "FAILED: port {} write/read mismatch".format(self.__port2), - "type": "string" - } - ) self.fail("failed: port {} write/read mismatch".format(self.__port2)) time.sleep(0.05) # there might be a small delay @@ -81,33 +67,13 @@ class Qduplex(unittest.TestCase): self.__serial2.write(test_uuid) time.sleep(0.05) # there might be a small delay if self.__serial1.inWaiting() == 0: - self.__resultlist.append( - { - "desc": "Test result", - "data": "FAILED: port {} wait timeout exceded".format(self.__port1), - "type": "string" - } - ) self.fail("failed: port {} wait timeout exceded".format(self.__port1)) else: if self.__serial1.readline() != test_uuid: - self.__resultlist.append( - { - "desc": "Test result", - "data": "FAILED: port {} write/read mismatch".format(self.__port1), - "type": "string" - } - ) self.fail("failed: port {} write/read mismatch".format(self.__port1)) - # Test successful - self.__resultlist.append( - { - "desc": "Test OK", - "data": "OK", - "type": "string" - } - ) - def getresults(self): return self.__resultlist + + def gettextresult(self): + return "" diff --git a/test-cli/test/tests/qeeprom.py b/test-cli/test/tests/qeeprom.py index cafbc7f..7fc9fcd 100644 --- a/test-cli/test/tests/qeeprom.py +++ b/test-cli/test/tests/qeeprom.py @@ -36,41 +36,14 @@ class Qeeprom(unittest.TestCase): # compare both values if data_rx != data_tx: # Both data are different - self.__resultlist.append( - { - "desc": "Test result", - "data": "FAILED: mismatch between written and received values.", - "type": "string" - } - ) self.fail("failed: mismatch between written and received values.") else: - self.__resultlist.append( - { - "desc": "Test result", - "data": "FAILED: Unable to read from the EEPROM device.", - "type": "string" - } - ) self.fail("failed: Unable to read from the EEPROM device.") else: - self.__resultlist.append( - { - "desc": "Test result", - "data": "FAILED: Unable to write on the EEPROM device.", - "type": "string" - } - ) self.fail("failed: Unable to write on the EEPROM device.") - # Test successful - self.__resultlist.append( - { - "desc": "Test result", - "data": "OK", - "type": "string" - } - ) - def getresults(self): return self.__resultlist + + def gettextresult(self): + return "" diff --git a/test-cli/test/tests/qethernet.py b/test-cli/test/tests/qethernet.py index 878c0a0..3b2f197 100644 --- a/test-cli/test/tests/qethernet.py +++ b/test-cli/test/tests/qethernet.py @@ -1,6 +1,5 @@ import unittest import sh -import re import json @@ -53,45 +52,25 @@ class Qethernet(unittest.TestCase): data = json.loads(p.stdout.decode('ascii')) self.__bwreal = float(data['end']['sum_received']['bits_per_second'])/1024/1024 # save result file - with open('/tmp/ethernet-iperf.json', 'w') as outfile: + with open('/tmp/station/ethernet-iperf3.json', 'w') as outfile: json.dump(data, outfile, indent=4) + outfile.close() self.__resultlist.append( { - "desc": "iperf3 output", - "data": "/tmp/ethernet-iperf.json", - "type": "file" + "description": "iperf3 output", + "filepath": "/tmp/station/ethernet-iperf3.json", + "mimetype": "application/json" } ) # check if BW is in the expected range if self.__bwreal < float(self.__bwexpected): - self.__resultlist.append( - { - "desc": "Test result", - "data": "FAILED: speed is lower than spected. Speed(Mbits/s): " + str(self.__bwreal), - "type": "string" - } - ) self.fail("failed: speed is lower than spected. Speed(Mbits/s): " + str(self.__bwreal)) else: - self.__resultlist.append( - { - "desc": "Test result", - "data": "FAILED: could not complete iperf command.", - "type": "string" - } - ) self.fail("failed: could not complete iperf command.") - # Test successful - self.__resultlist.append( - { - "desc": "Test result", - "data": "OK", - "type": "string" - } - ) - def getresults(self): - return self.__resultlist + + def gettextresult(self): + return "" diff --git a/test-cli/test/tests/qi2c.py b/test-cli/test/tests/qi2c.py deleted file mode 100644 index 3afedfa..0000000 --- a/test-cli/test/tests/qi2c.py +++ /dev/null @@ -1,67 +0,0 @@ -from test.helpers.syscmd import SysCommand -import unittest - - -class Qi2c(unittest.TestCase): - params = None - __resultlist = None # resultlist is a python list of python dictionaries - - def __init__(self, testname, testfunc, varlist): - self.params = varlist - super(Qi2c, self).__init__(testfunc) - if "busnum" in varlist: - self.__busnum = varlist["busnum"] - else: - raise Exception('busnum param inside Qi2c must be defined') - if "register" in varlist: - self.__register = varlist["register"].split("/") - else: - raise Exception('register param inside Qi2c must be defined') - self.__devices = [] - self._testMethodDoc = testname - self.__resultlist = [] - - def execute(self): - str_cmd = "i2cdetect -a -y -r {}".format(self.__busnum) - i2c_command = SysCommand("i2cdetect", str_cmd) - if i2c_command.execute() == 0: - self.__raw_out = i2c_command.getOutput() - if self.__raw_out == "": - return -1 - lines = self.__raw_out.decode('ascii').splitlines() - for i in range(len(lines)): - if lines[i].count('UU'): - if lines[i].find("UU"): - self.__devices.append( - "0x{}{}".format((i - 1), hex(int((lines[i].find("UU") - 4) / 3)).split('x')[-1])) - for i in range(len(self.__register)): - if not (self.__register[i] in self.__devices): - self.__resultlist.append( - { - "desc": "Test result", - "data": "FAILED: device {} not found in bus i2c-{}".format(self.__register[i], self.__busnum), - "type": "string" - } - ) - self.fail("failed: device {} not found in bus i2c-{}".format(self.__register[i], self.__busnum)) - else: - self.__resultlist.append( - { - "desc": "Test result", - "data": "FAILED: could not complete i2cdedtect command", - "type": "string" - } - ) - self.fail("failed: could not complete i2cdedtect command.") - - # Test successful - self.__resultlist.append( - { - "desc": "Test result", - "data": "OK", - "type": "string" - } - ) - - def getresults(self): - return self.__resultlist diff --git a/test-cli/test/tests/qmmcflash.py b/test-cli/test/tests/qmmcflash.py new file mode 100644 index 0000000..f10757e --- /dev/null +++ b/test-cli/test/tests/qmmcflash.py @@ -0,0 +1,15 @@ +import unittest +import sh + +class Qmmcflash(unittest.TestCase): + params = None + + def __init__(self, testname, testfunc, varlist): + self.params = varlist + super(Qmmcflash, self).__init__(testfunc) + self._testMethodDoc = testname + # TODO + + + def execute(self): + # TODO \ No newline at end of file diff --git a/test-cli/test/tests/qnand.py b/test-cli/test/tests/qnand.py index d7c22b3..8c78e2b 100644 --- a/test-cli/test/tests/qnand.py +++ b/test-cli/test/tests/qnand.py @@ -22,34 +22,21 @@ class Qnand(unittest.TestCase): try: p = sh.nandtest("-m", self.__device) # save result - with open('/tmp/nand-nandtest.txt', 'w') as outfile: + with open('/tmp/station/nand-nandtest.txt', 'w') as outfile: n = outfile.write(p.stdout.decode('ascii')) + outfile.close() self.__resultlist.append( { - "desc": "nandtest output", - "data": "/tmp/nand-nandtest.txt", - "type": "file" + "description": "nandtest output", + "filepath": "/tmp/station/nand-nandtest.txt", + "mimetype": "text/plain" } ) - except sh.ErrorReturnCode as e: - self.__resultlist.append( - { - "desc": "Test result", - "data": "FAILED: could not complete nandtest command", - "type": "string" - } - ) self.fail("failed: could not complete nandtest command") - # Test successful - self.__resultlist.append( - { - "desc": "Test result", - "data": "OK", - "type": "string" - } - ) - def getresults(self): return self.__resultlist + + def gettextresult(self): + return "" diff --git a/test-cli/test/tests/qram.py b/test-cli/test/tests/qram.py index a46424f..21a01c8 100644 --- a/test-cli/test/tests/qram.py +++ b/test-cli/test/tests/qram.py @@ -26,25 +26,11 @@ class Qram(unittest.TestCase): def execute(self): try: p = sh.memtester(self.__memsize, "1", _out="/dev/null") - except sh.ErrorReturnCode as e: - self.__resultlist.append( - { - "desc": "Test result", - "data": "FAILED: could not complete memtester command", - "type": "string" - } - ) self.fail("failed: could not complete memtester command") - # Test successful - self.__resultlist.append( - { - "desc": "Test result", - "data": "OK", - "type": "string" - } - ) - def getresults(self): return self.__resultlist + + def gettextresult(self): + return "" diff --git a/test-cli/test/tests/qrtc.py b/test-cli/test/tests/qrtc.py index 6d4ecf6..df493d2 100644 --- a/test-cli/test/tests/qrtc.py +++ b/test-cli/test/tests/qrtc.py @@ -33,41 +33,14 @@ class Qrtc(unittest.TestCase): p.stdout.decode('ascii')) # check if the seconds of both times are different if time1 == time2: - self.__resultlist.append( - { - "desc": "Test result", - "data": "FAILED: RTC is not running", - "type": "string" - } - ) self.fail("failed: RTC is not running") else: - self.__resultlist.append( - { - "desc": "Test result", - "data": "FAILED: couldn't execute hwclock command for the second time", - "type": "string" - } - ) self.fail("failed: couldn't execute hwclock command for the second time") else: - self.__resultlist.append( - { - "desc": "Test result", - "data": "FAILED: couldn't execute hwclock command", - "type": "string" - } - ) self.fail("failed: couldn't execute hwclock command") - # Test successful - self.__resultlist.append( - { - "desc": "Test result", - "data": "OK", - "type": "string" - } - ) - def getresults(self): return self.__resultlist + + def gettextresult(self): + return "" diff --git a/test-cli/test/tests/qserial.py b/test-cli/test/tests/qserial.py index faca505..402e33f 100644 --- a/test-cli/test/tests/qserial.py +++ b/test-cli/test/tests/qserial.py @@ -42,34 +42,14 @@ class Qserial(unittest.TestCase): self.__serial.write(test_uuid) time.sleep(0.05) # there might be a small delay if self.__serial.inWaiting() == 0: - self.__resultlist.append( - { - "desc": "Test result", - "data": "FAILED: port {} wait timeout exceded".format(self.__port), - "type": "string" - } - ) self.fail("failed: port {} wait timeout exceded".format(self.__port)) else: # check if what it was sent is equal to what has been received if self.__serial.readline() != test_uuid: - self.__resultlist.append( - { - "desc": "Test result", - "data": "FAILED: port {} write/read mismatch".format(self.__port), - "type": "string" - } - ) self.fail("failed: port {} write/read mismatch".format(self.__port)) - # Test successful - self.__resultlist.append( - { - "desc": "Test result", - "data": "OK", - "type": "string" - } - ) - def getresults(self): return self.__resultlist + + def gettextresult(self): + return "" diff --git a/test-cli/test/tests/qusb.py b/test-cli/test/tests/qusb.py index 32d99ef..9b0cad3 100644 --- a/test-cli/test/tests/qusb.py +++ b/test-cli/test/tests/qusb.py @@ -22,13 +22,6 @@ class Qusb(unittest.TestCase): if dev_obj.getMassStorage(): device = dev_obj.getMassStorage()['disk'] + "1" else: - self.__resultlist.append( - { - "desc": "Test result", - "data": "FAILED: No USB memory found", - "type": "string" - } - ) self.fail("failed: No USB memory found.") # create a new folder where the pendrive is going to be mounted @@ -60,43 +53,16 @@ class Qusb(unittest.TestCase): sh.umount("/mnt/pendrive") # check result if q is None: - self.__resultlist.append( - { - "desc": "Test result", - "data": "FAILED: Wrong md5 result", - "type": "string" - } - ) self.fail("failed: Wrong md5 result.") else: # umount sh.umount("/mnt/pendrive") - self.__resultlist.append( - { - "desc": "Test result", - "data": "FAILED: Unable to copy files to the USB memory device", - "type": "string" - } - ) self.fail("failed: Unable to copy files to the USB memory device.") else: - self.__resultlist.append( - { - "desc": "Test result", - "data": "FAILED: Unable to mount the USB memory device", - "type": "string" - } - ) self.fail("failed: Unable to mount the USB memory device.") - # Test successful - self.__resultlist.append( - { - "desc": "Test result", - "data": "OK", - "type": "string" - } - ) - def getresults(self): return self.__resultlist + + def gettextresult(self): + return "" diff --git a/test-cli/test/tests/qwifi.py b/test-cli/test/tests/qwifi.py index 6f972d3..eb1f0bf 100644 --- a/test-cli/test/tests/qwifi.py +++ b/test-cli/test/tests/qwifi.py @@ -63,80 +63,33 @@ class Qwifi(unittest.TestCase): data = json.loads(p.stdout.decode('ascii')) self.__bwreal = float(data['end']['sum_received']['bits_per_second']) / 1024 / 1024 # save result file - with open('/tmp/wifi-iperf.json', 'w') as outfile: + with open('/tmp/station/wifi-iperf3.json', 'w') as outfile: json.dump(data, outfile, indent=4) + outfile.close() self.__resultlist.append( { - "desc": "iperf3 output", - "data": "/tmp/wifi-iperf.json", - "type": "file" + "description": "iperf3 output", + "filepath": "/tmp/station/wifi-iperf3.json", + "mimetype": "application/json" } ) # check if BW is in the expected range if self.__bwreal < float(self.__bwexpected): - self.__resultlist.append( - { - "desc": "Test result", - "data": "FAILED: speed is lower than expected. Speed(Mbits/s): " + str(self.__bwreal), - "type": "string" - } - ) self.fail("failed: speed is lower than expected. Speed(Mbits/s): " + str(self.__bwreal)) else: - self.__resultlist.append( - { - "desc": "Test result", - "data": "FAILED: could not complete iperf3 command", - "type": "string" - } - ) self.fail("failed: could not complete iperf3 command") else: - self.__resultlist.append( - { - "desc": "Test result", - "data": "FAILED: wlan0 interface doesn't have any ip address", - "type": "string" - } - ) self.fail("failed: wlan0 interface doesn't have any ip address.") else: - self.__resultlist.append( - { - "desc": "Test result", - "data": "FAILED: could not complete ifconfig command", - "type": "string" - } - ) self.fail("failed: could not complete ifconfig command.") else: - self.__resultlist.append( - { - "desc": "Test result", - "data": "FAILED: wifi module is not connected to the router", - "type": "string" - } - ) self.fail("failed: wifi module is not connected to the router.") else: - self.__resultlist.append( - { - "desc": "Test result", - "data": "FAILED: could not execute iw command", - "type": "string" - } - ) self.fail("failed: could not execute iw command") - # Test successful - self.__resultlist.append( - { - "desc": "Test result", - "data": "OK", - "type": "string" - } - ) - def getresults(self): return self.__resultlist + + def gettextresult(self): + return "" diff --git a/test-cli/test_main.py b/test-cli/test_main.py index 63d8dad..b32ad26 100644 --- a/test-cli/test_main.py +++ b/test-cli/test_main.py @@ -1,5 +1,5 @@ from test.helpers.int_registers import get_die_id -from test.helpers.int_registers import get_mac +from test.helpers.int_registers import get_internal_mac from subprocess import call import os import unittest @@ -9,7 +9,6 @@ from test.runners.simple import SimpleTestRunner from test.tests.qethernet import Qethernet from test.tests.qram import Qram from test.tests.qusb import Qusb -from test.tests.qi2c import Qi2c from test.tests.qeeprom import Qeeprom from test.tests.qserial import Qserial from test.tests.qwifi import Qwifi @@ -17,6 +16,8 @@ from test.tests.qrtc import Qrtc from test.tests.qduplex_ser import Qduplex from test.tests.qamper import Qamper from test.tests.qnand import Qnand +from test.tests.qaudio import Qaudio +from test.tests.qdmesg import Qdmesg from test.helpers.globalVariables import globalVar import socket from test.helpers.iseelogger import ISEE_Logger @@ -24,16 +25,17 @@ import logging from test.tasks.flasheeprom import flash_eeprom from test.tasks.flashmemory import flash_memory from test.helpers.qrreader import QRReader -from test.tasks.generatedmesg import generate_dmesg from test.helpers.cmdline import LinuxKernel from test.helpers.amper import Amper from test.enums.StationStates import StationStates +import time # global variables psdbObj = TestSrv_Database() xmlObj = None loggerObj = None test_abspath = None +tests_manually_executed = [] # define clear function @@ -50,36 +52,50 @@ def create_paramslist(params): return paramlist -def add_test_task(suite, testdefname, paramlist): +def add_test(suite, testdefname, paramlist): if testdefname == "RAM": suite.addTest(Qram(testdefname, "execute", paramlist)) + return 0 elif testdefname == "SERIALDUAL": suite.addTest(Qduplex(testdefname, "execute", paramlist)) + return 0 elif testdefname == "EEPROM": suite.addTest(Qeeprom(testdefname, "execute", paramlist)) + return 0 elif testdefname == "SERIAL": suite.addTest(Qserial(testdefname, "execute", paramlist)) + return 0 elif testdefname == "RTC": suite.addTest(Qrtc(testdefname, "execute", paramlist)) + return 0 elif testdefname == "CONSUMPTION": suite.addTest(Qamper(testdefname, "execute", paramlist)) + return 0 elif testdefname == "ETHERNET": suite.addTest(Qethernet(testdefname, "execute", paramlist)) + return 0 elif testdefname == "NAND": suite.addTest(Qnand(testdefname, "execute", paramlist)) - elif testdefname == "I2C": - suite.addTest(Qi2c(testdefname, "execute", paramlist)) + return 0 elif testdefname == "WIFI": suite.addTest(Qwifi(testdefname, "execute", paramlist)) + return 0 elif testdefname == "USB": suite.addTest(Qusb(testdefname, "execute", paramlist)) + return 0 + elif testdefname == "AUDIO": + suite.addTest(Qaudio(testdefname, "execute", paramlist)) + return 0 + elif testdefname == "DMESG": + tests_manually_executed.append(Qdmesg(testdefname, "execute", paramlist)) + return 0 else: - raise Exception("Wrong testdefname") + return 1 def create_testsuite(): # create an object TestSuite - suite = unittest.TestSuite() + suite1 = unittest.TestSuite() # get list of tests for this board tests = psdbObj.get_tests_list(globalVar.g_uuid) # loop in every test for this board @@ -93,10 +109,11 @@ def create_testsuite(): paramlist["boarduuid"] = globalVar.g_uuid paramlist["testidctl"] = globalVar.testid_ctl paramlist["xml"] = xmlObj + paramlist["db"] = psdbObj # add test to TestSuite - add_test_task(suite, testdefname, paramlist) + rescode = add_test(suite1, testdefname, paramlist) - return suite + return suite1 def create_board(): @@ -122,7 +139,7 @@ def create_board(): print(globalVar.g_mid) print(processor_id) globalVar.g_uuid = psdbObj.create_board(processor_id, model_id, variant, globalVar.station, - get_mac(globalVar.g_mid)) + get_internal_mac(globalVar.g_mid)) print(globalVar.g_uuid) @@ -140,31 +157,38 @@ def main(): # create a process globalVar.testid_ctl = psdbObj.open_test(globalVar.g_uuid) - # create and run tests according to the board type - runner = SimpleTestRunner(psdbObj) # Change state to "TESTS_RUNNING" if psdbObj.read_station_state(globalVar.station) == StationStates.STATION_ERROR.name: # Abort - print("Error: Wrong previous station state.") + print( + "Error: Wrong previous station state before changing to TESTS_RUNNING state. STATION_ERROR state unexpected.") exit(0) psdbObj.change_station_state(globalVar.station, StationStates.TESTS_RUNNING.name) - # Execute tests - testresult = runner.run(create_testsuite()) - # save dmesg at the end of the tests - generate_dmesg(globalVar.testid_ctl, psdbObj) - # execute aditional tasks, only if the test was succesfull + # generate suits + suite1 = create_testsuite() + # Execute tests (round 1) + runner1 = SimpleTestRunner(psdbObj) + testresult = runner1.run(suite1) + + #Execute manually tests + for test in tests_manually_executed: + test.execute() + + # execute aditional tasks, only if the test was successful if testresult.wasSuccessful(): # Change state to "TESTS_OK" if psdbObj.read_station_state(globalVar.station) == StationStates.STATION_ERROR.name: # Abort - print("Error: Wrong previous station state.") + print( + "Error: Wrong previous station state before changing to TESTS_OK state. STATION_ERROR state unexpected.") exit(0) psdbObj.change_station_state(globalVar.station, StationStates.TESTS_OK.name) # Change state to "EXTRATASKS_RUNNING" if psdbObj.read_station_state(globalVar.station) == StationStates.STATION_ERROR.name: # Abort - print("Error: Wrong previous station state.") + print( + "Error: Wrong previous station state before changing to EXTRATASKS_RUNNING state. STATION_ERROR state unexpected.") exit(0) psdbObj.change_station_state(globalVar.station, StationStates.EXTRATASKS_RUNNING.name) # create task control @@ -192,28 +216,38 @@ def main(): # update status with the result if resulteeprom == 0 and resultmemory == 0: + # finish tasks psdbObj.update_taskctl_status(globalVar.taskid_ctl, "TASK_BOARD_OK") + # Change state to "WAITING_FOR_SCANNER" if psdbObj.read_station_state(globalVar.station) == StationStates.STATION_ERROR.name: # Abort - print("Error: Wrong previous station state.") + print( + "Error: Wrong previous station state before changing to WAITING_FOR_SCANNER state. STATION_ERROR state unexpected.") exit(0) psdbObj.change_station_state(globalVar.station, StationStates.WAITING_FOR_SCANNER.name) - # get barcode using the scanner - qrreceived = False - while not qrreceived: - qr = QRReader() - if qr.openQR(): - # waits 5s to receive a valid code - if qr.readQRasync(5): - qrreceived = True - factorycode = qr.getQRNumber() - psdbObj.set_factorycode(globalVar.g_uuid, factorycode) - qr.closeQR() + + # get barcode using the scanner, only if autotest is disabled + autotest = psdbObj.get_setup_variable("AUTOTEST_" + globalVar.station) + if autotest == "True": + psdbObj.set_factorycode(globalVar.g_uuid, "1234567890") # fake factory code + else: + qrreceived = False + while not qrreceived: + qr = QRReader() + if qr.openQR(): + # waits 5s to receive a valid code + if qr.readQRasync(5): + qrreceived = True + factorycode = qr.getQRNumber() + psdbObj.set_factorycode(globalVar.g_uuid, factorycode) + qr.closeQR() + # Change state to "FINISHED" if psdbObj.read_station_state(globalVar.station) == StationStates.STATION_ERROR.name: # Abort - print("Error: Wrong previous station state.") + print( + "Error: Wrong previous station state before changing to FINISHED state. STATION_ERROR state unexpected.") exit(0) psdbObj.change_station_state(globalVar.station, StationStates.FINISHED.name) else: @@ -224,10 +258,16 @@ def main(): # Change state to "TESTS_FAILED" if psdbObj.read_station_state(globalVar.station) == StationStates.STATION_ERROR.name: # Abort - print("Error: Wrong previous station state.") + print( + "Error: Wrong previous station state before changing to TESTS_FAILED state. STATION_ERROR state unexpected.") exit(0) psdbObj.change_station_state(globalVar.station, StationStates.TESTS_FAILED.name) + # reset board if AUTOTEST is enabled + autotest = psdbObj.get_setup_variable("AUTOTEST_" + globalVar.station) + if autotest == "True": + os.system('reboot') + if __name__ == "__main__": # Clear the shell screen @@ -257,22 +297,17 @@ if __name__ == "__main__": # Check if current state is "WAIT_TEST_START" currentstate = psdbObj.read_station_state(globalVar.station) - if currentstate != StationStates.WAIT_TEST_START.name: - # Abort - print("Error: Wrong previous station state.") - exit(0) + while currentstate != StationStates.WAIT_TEST_START.name: + currentstate = psdbObj.read_station_state(globalVar.station) + if currentstate != StationStates.WAIT_TEST_START.name: + # Wait + print("Error: Wrong previous station state. WAIT_TEST_START state expected. Current state is: {}".format(currentstate)) + print("Waiting for WAIT_TEST_START state") + time.sleep(5) # Change state to "TESTS_CHECKING_ENV" psdbObj.change_station_state(globalVar.station, StationStates.TESTS_CHECKING_ENV.name) - # Look for a sd or pendrive - # TODO - #if not disk_exists("/dev/mmcblk0") and not : - # abort - #print("error: cannot find a barcode scanner.") - #loggerObj.getlogger().info("station error: #cannot find a sd card or memory usb to load the test environment.#") - #exit(0) - # Look for a barcode scanner qr = QRReader() if qr.openQR(): -- cgit v1.1