From db3b1e45c47a1ef23c1ad67114a09cbec0976681 Mon Sep 17 00:00:00 2001 From: Hector Fernandez Date: Thu, 25 Jun 2020 11:45:31 +0200 Subject: Solved bugs. Adapted to DB changes. --- test-cli/setup.xml | 2 +- test-cli/test/enums/StationStates.py | 20 ++++ test-cli/test/enums/__init__.py | 0 test-cli/test/helpers/testsrv_db.py | 18 ++- test-cli/test/runners/simple.py | 4 +- test-cli/test/tests/qamp.py | 76 ------------ test-cli/test/tests/qamper.py | 64 +++++++--- test-cli/test/tests/qaudio.py | 39 ------ test-cli/test/tests/qduplex_ser.py | 43 ++++++- test-cli/test/tests/qeeprom.py | 37 +++++- test-cli/test/tests/qethernet.py | 47 ++++++-- test-cli/test/tests/qi2c.py | 31 ++++- test-cli/test/tests/qnand.py | 33 +++-- test-cli/test/tests/qram.py | 22 +++- test-cli/test/tests/qrtc.py | 38 +++++- test-cli/test/tests/qscreen.py | 27 ----- test-cli/test/tests/qserial.py | 29 ++++- test-cli/test/tests/qtemplate.py | 51 -------- test-cli/test/tests/qusb.py | 56 +++++++-- test-cli/test/tests/qwifi.py | 76 ++++++++++-- test-cli/test_main.py | 226 ++++++++++++++++++++++++----------- 21 files changed, 593 insertions(+), 346 deletions(-) create mode 100644 test-cli/test/enums/StationStates.py create mode 100644 test-cli/test/enums/__init__.py delete mode 100644 test-cli/test/tests/qamp.py delete mode 100644 test-cli/test/tests/qaudio.py delete mode 100644 test-cli/test/tests/qscreen.py delete mode 100644 test-cli/test/tests/qtemplate.py diff --git a/test-cli/setup.xml b/test-cli/setup.xml index 81f0919..42c4be9 100644 --- a/test-cli/setup.xml +++ b/test-cli/setup.xml @@ -1,7 +1,7 @@ - + diff --git a/test-cli/test/enums/StationStates.py b/test-cli/test/enums/StationStates.py new file mode 100644 index 0000000..9de5e15 --- /dev/null +++ b/test-cli/test/enums/StationStates.py @@ -0,0 +1,20 @@ +from enum import Enum, unique + + +@unique +class StationStates(Enum): + UNDEFINED = 1000 + FTP_KERNEL = 1 + FTP_DTB = 2 + MOUNT_NFS = 3 + KERNEL_BOOT = 4 + WAIT_TEST_START = 5 + STATION_ERROR = 2000 + TESTS_CHECKING_ENV = 6 + TESTS_RUNNING = 10 + TESTS_FAILED = 40 + TESTS_OK = 30 + EXTRATASKS_RUNNING = 100 + WAITING_FOR_SCANNER = 50 + FINISHED = 60 + diff --git a/test-cli/test/enums/__init__.py b/test-cli/test/enums/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/test-cli/test/helpers/testsrv_db.py b/test-cli/test/helpers/testsrv_db.py index dfb4d15..637d45c 100644 --- a/test-cli/test/helpers/testsrv_db.py +++ b/test-cli/test/helpers/testsrv_db.py @@ -202,9 +202,21 @@ class TestSrv_Database(object): r = find_between(str(err), '#', '#') # print(r) return None - - def bond_to_station(self, uuid, station): - sql = "SELECT station.bond_to_station('{}','{}')".format(uuid, station) + + def read_station_state(self, station): + sql = "SELECT * FROM station.getmystate('{}');".format(station) + # print('>>>' + sql) + try: + res = self.__sqlObject.db_execute_query(sql) + # print(res) + return res[0][0] + except Exception as err: + r = find_between(str(err), '#', '#') + # print(r) + return None + + def change_station_state(self, station, newstate): + sql = "SELECT station.setmystate('{}', '{}', NULL)".format(newstate, station) # print('>>>' + sql) try: self.__sqlObject.db_execute_query(sql) diff --git a/test-cli/test/runners/simple.py b/test-cli/test/runners/simple.py index e0418d2..22fe449 100644 --- a/test-cli/test/runners/simple.py +++ b/test-cli/test/runners/simple.py @@ -79,8 +79,8 @@ class TextTestResult(unittest.TestResult): self.__pgObj.upload_result_string(test.params["testidctl"], test.params["testid"], result["desc"], result["data"]) elif result["type"] == "file": - self.__pgObj.upload_result_file(test.params["testidctl"], test.params["testid"], result["desc"], - result["data"]) + self.__pgObj.upload_result_file(test.params["testidctl"], test.params["testid"], + result["desc"], result["data"]) # SEND TO DB THE RESULT OF THE TEST if self.result == self.PASS: status = "TEST_COMPLETE" diff --git a/test-cli/test/tests/qamp.py b/test-cli/test/tests/qamp.py deleted file mode 100644 index 56511c8..0000000 --- a/test-cli/test/tests/qamp.py +++ /dev/null @@ -1,76 +0,0 @@ -from test.helpers.syscmd import SysCommand -import unittest -import serial -import time - - -class Qamp(unittest.TestCase): - params = None - - # varlist: undercurrent, overcurrent - def __init__(self, testname, testfunc, varlist): - self.params = varlist - self._current = 0.0 - if "undercurrent" in varlist: - self._undercurrent = varlist["undercurrent"] - else: - raise Exception('undercurrent param inside Qamp must be defined') - if "overcurrent" in varlist: - self._overcurrent = varlist["overcurrent"] - else: - raise Exception('overcurrent param inside Qamp must be defined') - self._vshuntfactor = 16384.0 - self._ser = serial.Serial() - self._ser.port = '/dev/ttyUSB0' - self._ser.baudrate = 115200 - self._ser.parity = serial.PARITY_NONE - self._ser.timeout = 0 - self._ser.writeTimout = 0 - self._ser.xonxoff = False - self._ser.rtscts = False - self._ser.dsrdtr = False - super(Qamp, self).__init__(testfunc) - self._testMethodDoc = testname - - def __del__(self): - self._ser.close() - - def execute(self): - - # Open Serial port ttyUSB0 - error = 0 - try: - self._ser.open() - except: - self.fail("failed: IMPOSSIBLE OPEN USB-SERIAL PORT ( {} )".format(self._ser.port)) - error = 1 - if error == 0: - # Clean input and output buffer of serial port - self._ser.flushInput() - self._ser.flushOutput() - # Send command to read Voltage at Shunt resistor - # Prepare cmd - cmd = 'at+in?\n\r' - i = 0 - - # Write, 1 by 1 byte at a time to avoid hanging of serial receiver code of listener, emphasis being made at sleep of 50 ms. - while (i < len(cmd)): - i = i + self._ser.write(cmd[i].encode('ascii')) - time.sleep(0.05) - self._ser.read(1) - - # Read, 1 by 1 byte - res = [] - while self._ser.inWaiting() > 0: # if incoming bytes are waiting to be read from the serial input buffer - res.append(self._ser.read(1).decode('ascii')) # read the bytes and convert from binary array to ASCII - - string = ''.join(res) - string = string.replace('\n', '') - self._current = float(int(string, 0)) / self._vshuntfactor - print(self._current) - # In order to give a valid result it is importarnt to define an under current value - if self._current > float(self._overcurrent): - self.fail("failed: OVERCURRENT DETECTED ( {} )".format(self._current)) - - if self._current < float(self._undercurrent): - self.fail("failed: UNDERCURRENT DETECTED ( {} )".format(self._current)) diff --git a/test-cli/test/tests/qamper.py b/test-cli/test/tests/qamper.py index 51aa469..7a31615 100644 --- a/test-cli/test/tests/qamper.py +++ b/test-cli/test/tests/qamper.py @@ -4,7 +4,7 @@ from test.helpers.amper import Amper class Qamper(unittest.TestCase): params = None - __current = None + __resultlist = None # resultlist is a python list of python dictionaries # varlist: undercurrent, overcurrent def __init__(self, testname, testfunc, varlist): @@ -20,36 +20,66 @@ class Qamper(unittest.TestCase): else: raise Exception('overcurrent param inside Qamp must be defined') self._testMethodDoc = testname + self.__resultlist = [] def execute(self): amp = Amper() # open serial connection if not amp.open(): - self.fail("failed: can not open serial port") - return + self.__resultlist.append( + { + "desc": "Test result", + "data": "FAILED: can not open a serial port", + "type": "string" + } + ) + self.fail("Error: can not open a serial port") # check if the amperimeter is connected and working # 2 ATTEMTS if not amp.hello(): if not amp.hello(): - self.fail("failed: can not communicate") - return + self.__resultlist.append( + { + "desc": "Test result", + "data": "FAILED: can not communicate with the amperimeter", + "type": "string" + } + ) + self.fail("Error: can not communicate") # get current value (in Amperes) - self.__current = amp.getCurrent() + current = amp.getCurrent() # close serial connection amp.close() # Check current range - if float(self.__current) > float(self._overcurrent): - self.fail("failed: Overcurrent detected ( {} )".format(self.__current)) - if float(self.__current) < float(self._undercurrent): - self.fail("failed: Undercurrent detected ( {} )".format(self.__current)) + if float(current) > float(self._overcurrent): + # Overcurrent detected + self.__resultlist.append( + { + "desc": "Test result", + "data": "FAILED: Overcurrent detected ( {} A)".format(current), + "type": "string" + } + ) + self.fail("failed: Overcurrent detected ( {} )".format(current)) + elif float(current) < float(self._undercurrent): + # Undercurrent detected + self.__resultlist.append( + { + "desc": "Test result", + "data": "FAILED: Undercurrent detected ( {} A)".format(current), + "type": "string" + } + ) + self.fail("failed: Undercurrent detected ( {} )".format(current)) - def getresults(self): - # resultlist is a python list of python dictionaries - resultlist = [ + # Test successful + self.__resultlist.append( { - "desc": "current (Ampers)", - "data": self.__current, + "desc": "Test result", + "data": "OK: Current {} A".format(current), "type": "string" } - ] - return resultlist + ) + + def getresults(self): + return self.__resultlist diff --git a/test-cli/test/tests/qaudio.py b/test-cli/test/tests/qaudio.py deleted file mode 100644 index ef4da67..0000000 --- a/test-cli/test/tests/qaudio.py +++ /dev/null @@ -1,39 +0,0 @@ -from test.helpers.syscmd import SysCommand -import unittest - - -class Qaudio(unittest.TestCase): - params = None - - def __init__(self, testname, testfunc, varlist): - self.params = varlist - super(Qaudio, self).__init__(testfunc) - self._testMethodDoc = testname - if "dtmfFile" in varlist: - self._dtmfFile = varlist["dtmfFile"] - else: - raise Exception('undercurrent param inside Qamp must be defined') - self.__sum = 0 - self.__refSum = 25 # 1+3+5+7+9 - - def execute(self): - str_cmd = "aplay test/files/dtmf-13579.wav & arecord -r 8000 -d 1 recorded.wav" # .format(self.__dtmfFile) - audio_loop = SysCommand("command-name", str_cmd) - if audio_loop.execute() == -1: # BUG: Returns -1 but work - lines = audio_loop.getOutput().splitlines() - str_cmd = "multimon -t wav -a DTMF recorded.wav -q" - dtmf_decoder = SysCommand("command-name", str_cmd) - a = dtmf_decoder.execute() - if dtmf_decoder.execute() == -1: # BUG: Returns -1 but work - self.__raw_out = dtmf_decoder.getOutput() - if self.__raw_out == "": - self.fail("failed: can not execute multimon command") - lines = dtmf_decoder.getOutput().splitlines() - for i in range(0, 5): - aux = [int(s) for s in lines[i].split() if s.isdigit()] - self.__sum = self.__sum + aux[0] - self.failUnless(self.__sum == self.__refSum), "failed: incorrect dtmf code" + str(self.__sum) - else: - self.fail("failed: fail reading recorded file") - else: - self.fail("failed: fail playing/recording file") diff --git a/test-cli/test/tests/qduplex_ser.py b/test-cli/test/tests/qduplex_ser.py index c7231c2..020844a 100644 --- a/test-cli/test/tests/qduplex_ser.py +++ b/test-cli/test/tests/qduplex_ser.py @@ -9,6 +9,7 @@ class Qduplex(unittest.TestCase): __port1 = None __port2 = None __baudrate = None + __resultlist = None # resultlist is a python list of python dictionaries # varlist: port1, port2, baudrate def __init__(self, testname, testfunc, varlist): @@ -27,6 +28,7 @@ class Qduplex(unittest.TestCase): else: raise Exception('baudrate param inside Qduplex must be defined') self._testMethodDoc = testname + self.__resultlist = [] # open serial connection self.__serial1 = serial.Serial(self.__port1, self.__baudrate, timeout=1) @@ -49,9 +51,23 @@ class Qduplex(unittest.TestCase): self.__serial1.write(test_uuid) time.sleep(0.05) # there might be a small delay if self.__serial2.inWaiting() == 0: + self.__resultlist.append( + { + "desc": "Test result", + "data": "FAILED: port {} wait timeout exceded".format(self.__port2), + "type": "string" + } + ) self.fail("failed: port {} wait timeout exceded".format(self.__port2)) else: if self.__serial2.readline() != test_uuid: + self.__resultlist.append( + { + "desc": "Test result", + "data": "FAILED: port {} write/read mismatch".format(self.__port2), + "type": "string" + } + ) self.fail("failed: port {} write/read mismatch".format(self.__port2)) time.sleep(0.05) # there might be a small delay @@ -65,12 +81,33 @@ class Qduplex(unittest.TestCase): self.__serial2.write(test_uuid) time.sleep(0.05) # there might be a small delay if self.__serial1.inWaiting() == 0: + self.__resultlist.append( + { + "desc": "Test result", + "data": "FAILED: port {} wait timeout exceded".format(self.__port1), + "type": "string" + } + ) self.fail("failed: port {} wait timeout exceded".format(self.__port1)) else: if self.__serial1.readline() != test_uuid: + self.__resultlist.append( + { + "desc": "Test result", + "data": "FAILED: port {} write/read mismatch".format(self.__port1), + "type": "string" + } + ) self.fail("failed: port {} write/read mismatch".format(self.__port1)) + # Test successful + self.__resultlist.append( + { + "desc": "Test OK", + "data": "OK", + "type": "string" + } + ) + def getresults(self): - # resultlist is a python list of python dictionaries - resultlist = [] - return resultlist + return self.__resultlist diff --git a/test-cli/test/tests/qeeprom.py b/test-cli/test/tests/qeeprom.py index a65ca97..cafbc7f 100644 --- a/test-cli/test/tests/qeeprom.py +++ b/test-cli/test/tests/qeeprom.py @@ -6,12 +6,14 @@ class Qeeprom(unittest.TestCase): params = None __position = None __eeprompath = None + __resultlist = None # resultlist is a python list of python dictionaries # varlist: position, eeprompath def __init__(self, testname, testfunc, varlist): self.params = varlist super(Qeeprom, self).__init__(testfunc) self._testMethodDoc = testname + self.__resultlist = [] if "position" in varlist: self.__position = varlist["position"] else: @@ -33,13 +35,42 @@ class Qeeprom(unittest.TestCase): data_rx = p.stdout.decode('ascii') # compare both values if data_rx != data_tx: + # Both data are different + self.__resultlist.append( + { + "desc": "Test result", + "data": "FAILED: mismatch between written and received values.", + "type": "string" + } + ) self.fail("failed: mismatch between written and received values.") else: + self.__resultlist.append( + { + "desc": "Test result", + "data": "FAILED: Unable to read from the EEPROM device.", + "type": "string" + } + ) self.fail("failed: Unable to read from the EEPROM device.") else: + self.__resultlist.append( + { + "desc": "Test result", + "data": "FAILED: Unable to write on the EEPROM device.", + "type": "string" + } + ) self.fail("failed: Unable to write on the EEPROM device.") + # Test successful + self.__resultlist.append( + { + "desc": "Test result", + "data": "OK", + "type": "string" + } + ) + def getresults(self): - # resultlist is a python list of python dictionaries - resultlist = [] - return resultlist + return self.__resultlist diff --git a/test-cli/test/tests/qethernet.py b/test-cli/test/tests/qethernet.py index d7354bf..878c0a0 100644 --- a/test-cli/test/tests/qethernet.py +++ b/test-cli/test/tests/qethernet.py @@ -11,6 +11,7 @@ class Qethernet(unittest.TestCase): __port = None params = None __bwreal = None + __resultlist = None # resultlist is a python list of python dictionaries # varlist content: serverip, bwexpected, port def __init__(self, testname, testfunc, varlist): @@ -34,6 +35,7 @@ class Qethernet(unittest.TestCase): raise Exception('port param inside Qethernet must be defined') self.__numbytestx = "10M" self._testMethodDoc = testname + self.__resultlist = [] def execute(self): # execute iperf command against the server @@ -53,20 +55,43 @@ class Qethernet(unittest.TestCase): # save result file with open('/tmp/ethernet-iperf.json', 'w') as outfile: json.dump(data, outfile, indent=4) + self.__resultlist.append( + { + "desc": "iperf3 output", + "data": "/tmp/ethernet-iperf.json", + "type": "file" + } + ) # check if BW is in the expected range - self.failUnless(self.__bwreal > float(self.__bwexpected) * 0.9, - "failed: speed is lower than spected. Speed(Mbits/s)" + str(self.__bwreal)) + if self.__bwreal < float(self.__bwexpected): + self.__resultlist.append( + { + "desc": "Test result", + "data": "FAILED: speed is lower than spected. Speed(Mbits/s): " + str(self.__bwreal), + "type": "string" + } + ) + self.fail("failed: speed is lower than spected. Speed(Mbits/s): " + str(self.__bwreal)) else: - self.fail("failed: could not complete iperf command") + self.__resultlist.append( + { + "desc": "Test result", + "data": "FAILED: could not complete iperf command.", + "type": "string" + } + ) + self.fail("failed: could not complete iperf command.") - def getresults(self): - # resultlist is a python list of python dictionaries - resultlist = [ + # Test successful + self.__resultlist.append( { - "desc": "iperf3 output", - "data": "/tmp/ethernet-iperf.json", - "type": "file" + "desc": "Test result", + "data": "OK", + "type": "string" } - ] - return resultlist + ) + + def getresults(self): + + return self.__resultlist diff --git a/test-cli/test/tests/qi2c.py b/test-cli/test/tests/qi2c.py index ad7ddf0..3afedfa 100644 --- a/test-cli/test/tests/qi2c.py +++ b/test-cli/test/tests/qi2c.py @@ -4,6 +4,7 @@ import unittest class Qi2c(unittest.TestCase): params = None + __resultlist = None # resultlist is a python list of python dictionaries def __init__(self, testname, testfunc, varlist): self.params = varlist @@ -18,6 +19,7 @@ class Qi2c(unittest.TestCase): raise Exception('register param inside Qi2c must be defined') self.__devices = [] self._testMethodDoc = testname + self.__resultlist = [] def execute(self): str_cmd = "i2cdetect -a -y -r {}".format(self.__busnum) @@ -34,11 +36,32 @@ class Qi2c(unittest.TestCase): "0x{}{}".format((i - 1), hex(int((lines[i].find("UU") - 4) / 3)).split('x')[-1])) for i in range(len(self.__register)): if not (self.__register[i] in self.__devices): + self.__resultlist.append( + { + "desc": "Test result", + "data": "FAILED: device {} not found in bus i2c-{}".format(self.__register[i], self.__busnum), + "type": "string" + } + ) self.fail("failed: device {} not found in bus i2c-{}".format(self.__register[i], self.__busnum)) else: - self.fail("failed: could not complete i2cdedtect command") + self.__resultlist.append( + { + "desc": "Test result", + "data": "FAILED: could not complete i2cdedtect command", + "type": "string" + } + ) + self.fail("failed: could not complete i2cdedtect command.") + + # Test successful + self.__resultlist.append( + { + "desc": "Test result", + "data": "OK", + "type": "string" + } + ) def getresults(self): - # resultlist is a python list of python dictionaries - resultlist = [] - return resultlist + return self.__resultlist diff --git a/test-cli/test/tests/qnand.py b/test-cli/test/tests/qnand.py index 7ff7cb2..d7c22b3 100644 --- a/test-cli/test/tests/qnand.py +++ b/test-cli/test/tests/qnand.py @@ -5,6 +5,7 @@ import sh class Qnand(unittest.TestCase): params = None __device = "10M" + __resultlist = None # resultlist is a python list of python dictionaries # varlist: device def __init__(self, testname, testfunc, varlist): @@ -15,6 +16,7 @@ class Qnand(unittest.TestCase): else: raise Exception('device param inside Qnand must be defined') self._testMethodDoc = testname + self.__resultlist = [] def execute(self): try: @@ -22,17 +24,32 @@ class Qnand(unittest.TestCase): # save result with open('/tmp/nand-nandtest.txt', 'w') as outfile: n = outfile.write(p.stdout.decode('ascii')) + self.__resultlist.append( + { + "desc": "nandtest output", + "data": "/tmp/nand-nandtest.txt", + "type": "file" + } + ) except sh.ErrorReturnCode as e: + self.__resultlist.append( + { + "desc": "Test result", + "data": "FAILED: could not complete nandtest command", + "type": "string" + } + ) self.fail("failed: could not complete nandtest command") - def getresults(self): - # resultlist is a python list of python dictionaries - resultlist = [ + # Test successful + self.__resultlist.append( { - "desc": "nandtest output", - "data": "/tmp/nand-nandtest.txt", - "type": "file" + "desc": "Test result", + "data": "OK", + "type": "string" } - ] - return resultlist + ) + + def getresults(self): + return self.__resultlist diff --git a/test-cli/test/tests/qram.py b/test-cli/test/tests/qram.py index 561e980..a46424f 100644 --- a/test-cli/test/tests/qram.py +++ b/test-cli/test/tests/qram.py @@ -6,6 +6,7 @@ class Qram(unittest.TestCase): params = None __memsize = "10M" __loops = "1" + __resultlist = None # resultlist is a python list of python dictionaries # varlist: memsize, loops def __init__(self, testname, testfunc, varlist): @@ -20,15 +21,30 @@ class Qram(unittest.TestCase): else: raise Exception('loops param inside Qram must be defined') self._testMethodDoc = testname + self.__resultlist = [] def execute(self): try: p = sh.memtester(self.__memsize, "1", _out="/dev/null") except sh.ErrorReturnCode as e: + self.__resultlist.append( + { + "desc": "Test result", + "data": "FAILED: could not complete memtester command", + "type": "string" + } + ) self.fail("failed: could not complete memtester command") + # Test successful + self.__resultlist.append( + { + "desc": "Test result", + "data": "OK", + "type": "string" + } + ) + def getresults(self): - # resultlist is a python list of python dictionaries - resultlist = [] - return resultlist + return self.__resultlist diff --git a/test-cli/test/tests/qrtc.py b/test-cli/test/tests/qrtc.py index 715bcb7..6d4ecf6 100644 --- a/test-cli/test/tests/qrtc.py +++ b/test-cli/test/tests/qrtc.py @@ -7,6 +7,7 @@ import re class Qrtc(unittest.TestCase): params = None __rtc = None + __resultlist = None # resultlist is a python list of python dictionaries def __init__(self, testname, testfunc, varlist): self.params = varlist @@ -16,6 +17,7 @@ class Qrtc(unittest.TestCase): else: raise Exception('rtc param inside Qrtc must be defined') self._testMethodDoc = testname + self.__resultlist = [] def execute(self): # get time from RTC peripheral @@ -31,13 +33,41 @@ class Qrtc(unittest.TestCase): p.stdout.decode('ascii')) # check if the seconds of both times are different if time1 == time2: + self.__resultlist.append( + { + "desc": "Test result", + "data": "FAILED: RTC is not running", + "type": "string" + } + ) self.fail("failed: RTC is not running") else: - self.fail("failed: couldn't execute hwclock command 2nd time") + self.__resultlist.append( + { + "desc": "Test result", + "data": "FAILED: couldn't execute hwclock command for the second time", + "type": "string" + } + ) + self.fail("failed: couldn't execute hwclock command for the second time") else: + self.__resultlist.append( + { + "desc": "Test result", + "data": "FAILED: couldn't execute hwclock command", + "type": "string" + } + ) self.fail("failed: couldn't execute hwclock command") + # Test successful + self.__resultlist.append( + { + "desc": "Test result", + "data": "OK", + "type": "string" + } + ) + def getresults(self): - # resultlist is a python list of python dictionaries - resultlist = [] - return resultlist + return self.__resultlist diff --git a/test-cli/test/tests/qscreen.py b/test-cli/test/tests/qscreen.py deleted file mode 100644 index aaee628..0000000 --- a/test-cli/test/tests/qscreen.py +++ /dev/null @@ -1,27 +0,0 @@ -from test.helpers.syscmd import SysCommand -import unittest -from test.helpers.cv_display_test import pattern_detect - - -class Qscreen(unittest.TestCase): - params = None - - #varlist: display - def __init__(self, testname, testfunc, varlist): - self.params = varlist - super(Qscreen, self).__init__(testfunc) - if "display" in varlist: - self.__display = varlist["display"] - else: - raise Exception('display param inside Qscreen have been be defined') - self._testMethodDoc = testname - - def execute(self): - str_cmd = "fbi -T 1 --noverbose -d /dev/{} test/files/test_pattern.png".format(self.__display) - display_image = SysCommand("display_image", str_cmd) - if display_image.execute() == -1: - test_screen = pattern_detect(1) - if not test_screen=="0": - self.fail("failed: {}".format(test_screen)) - else: - self.fail("failed: could not display the image") diff --git a/test-cli/test/tests/qserial.py b/test-cli/test/tests/qserial.py index 0cb5563..faca505 100644 --- a/test-cli/test/tests/qserial.py +++ b/test-cli/test/tests/qserial.py @@ -8,6 +8,7 @@ class Qserial(unittest.TestCase): params = None __port = None __baudrate = None + __resultlist = None # resultlist is a python list of python dictionaries # varlist: port, baudrate def __init__(self, testname, testfunc, varlist): @@ -23,6 +24,7 @@ class Qserial(unittest.TestCase): else: raise Exception('baudrate param inside Qserial must be defined') self._testMethodDoc = testname + self.__resultlist = [] # open serial connection self.__serial = serial.Serial(self.__port, self.__baudrate, timeout=1) @@ -40,13 +42,34 @@ class Qserial(unittest.TestCase): self.__serial.write(test_uuid) time.sleep(0.05) # there might be a small delay if self.__serial.inWaiting() == 0: + self.__resultlist.append( + { + "desc": "Test result", + "data": "FAILED: port {} wait timeout exceded".format(self.__port), + "type": "string" + } + ) self.fail("failed: port {} wait timeout exceded".format(self.__port)) else: # check if what it was sent is equal to what has been received if self.__serial.readline() != test_uuid: + self.__resultlist.append( + { + "desc": "Test result", + "data": "FAILED: port {} write/read mismatch".format(self.__port), + "type": "string" + } + ) self.fail("failed: port {} write/read mismatch".format(self.__port)) + # Test successful + self.__resultlist.append( + { + "desc": "Test result", + "data": "OK", + "type": "string" + } + ) + def getresults(self): - # resultlist is a python list of python dictionaries - resultlist = [] - return resultlist + return self.__resultlist diff --git a/test-cli/test/tests/qtemplate.py b/test-cli/test/tests/qtemplate.py deleted file mode 100644 index 940cded..0000000 --- a/test-cli/test/tests/qtemplate.py +++ /dev/null @@ -1,51 +0,0 @@ -#IF COMMAND IS NEEDED -from test.helpers.syscmd import SysCommand -import unittest -#class name -class Qtemplate(unittest.TestCase): - # Initialize the variables - __variable1 = "Value-a" - __variable2 = "Value-b" - #.... - __variablen = "Value-n" - - def __init__(self, testname, testfunc, input1=None, inputn=None): - # Doing this we will initialize the class and later on perform a particular method inside this class - super(Qtemplate, self).__init__(testfunc) - self.__testname = testname - self.__input1 = input1 - self.__inputn = inputn - self._testMethodDoc = testname - - - - def execute(self): - str_cmd = "command" - t = SysCommand("command-name", str_cmd) - if t.execute() == 0: - self.__raw_out = t.getOutput() - if self.__raw_out == "": - return -1 - lines = t.getOutput().splitlines() - dat = lines[1] - dat = dat.decode('ascii') - dat_list = dat.split( ) - for d in dat_list: - a = dat_list.pop(0) - if a == "sec": - break - self.__MB_real = dat_list[0] - self.__BW_real = dat_list[2] - self.__dat_list = dat_list - print(self.__MB_real) - print(self.__BW_real) - self.failUnless(int(self.__BW_real)>int(self.__OKBW)*0.9,"FAIL:BECAUSE...") - else: - return -1 - return 0 - - def get_Total_MB(self): - return self.__MB_real; - - def get_Total_BW(self): - return self.__MB_real; diff --git a/test-cli/test/tests/qusb.py b/test-cli/test/tests/qusb.py index 316cef5..d952afc 100644 --- a/test-cli/test/tests/qusb.py +++ b/test-cli/test/tests/qusb.py @@ -7,11 +7,13 @@ from test.helpers.changedir import changedir class Qusb(unittest.TestCase): params = None + __resultlist = None # resultlist is a python list of python dictionaries def __init__(self, testname, testfunc, varlist): self.params = varlist super(Qusb, self).__init__(testfunc) self._testMethodDoc = testname + self.__resultlist = [] def execute(self): # Execute script usb.sh @@ -19,9 +21,19 @@ class Qusb(unittest.TestCase): test_abspath = os.path.join(test_abspath, "..", "..") p = sh.bash(os.path.join(test_abspath, 'test/scripts/usb.sh')) # Search in the stdout a pattern "/dev/sd + {letter} + {number} - q = re.search("/dev/sd\w\d", p.stdout.decode('ascii')) + match = re.search("/dev/sd\w\d", p.stdout.decode('ascii')) # get the first device which matches the pattern - device = q.group(0) + if match: + device = match.group(0) + else: + self.__resultlist.append( + { + "desc": "Test result", + "data": "FAILED: No pendrive found", + "type": "string" + } + ) + self.fail("failed: No pendrive found.") # create a new folder where the pendrive is going to be mounted sh.mkdir("-p", "/mnt/pendrive") # check if the device is mounted, and umount it @@ -50,15 +62,43 @@ class Qusb(unittest.TestCase): sh.umount("/mnt/pendrive") # check result if q is None: - self.fail("failed: wrong md5 result.") + self.__resultlist.append( + { + "desc": "Test result", + "data": "FAILED: Wrong md5 result", + "type": "string" + } + ) + self.fail("failed: Wrong md5 result.") else: # umount sh.umount("/mnt/pendrive") - self.fail("failed: unable to copy files.") + self.__resultlist.append( + { + "desc": "Test result", + "data": "FAILED: Unable to copy files to the USB memory device", + "type": "string" + } + ) + self.fail("failed: Unable to copy files to the USB memory device.") else: - self.fail("failed: unable to mount the device.") + self.__resultlist.append( + { + "desc": "Test result", + "data": "FAILED: Unable to mount the USB memory device", + "type": "string" + } + ) + self.fail("failed: Unable to mount the USB memory device.") + + # Test successful + self.__resultlist.append( + { + "desc": "Test result", + "data": "OK", + "type": "string" + } + ) def getresults(self): - # resultlist is a python list of python dictionaries - resultlist = [] - return resultlist + return self.__resultlist diff --git a/test-cli/test/tests/qwifi.py b/test-cli/test/tests/qwifi.py index a5b66e9..6f972d3 100644 --- a/test-cli/test/tests/qwifi.py +++ b/test-cli/test/tests/qwifi.py @@ -11,6 +11,7 @@ class Qwifi(unittest.TestCase): __port = None params = None __bwreal = None + __resultlist = None # resultlist is a python list of python dictionaries # varlist content: serverip, bwexpected, port def __init__(self, testname, testfunc, varlist): @@ -30,6 +31,7 @@ class Qwifi(unittest.TestCase): raise Exception('port param inside Qwifi must be defined') self.__numbytestx = "10M" self._testMethodDoc = testname + self.__resultlist = [] def execute(self): # check if the board is connected to the router by wifi @@ -63,28 +65,78 @@ class Qwifi(unittest.TestCase): # save result file with open('/tmp/wifi-iperf.json', 'w') as outfile: json.dump(data, outfile, indent=4) + self.__resultlist.append( + { + "desc": "iperf3 output", + "data": "/tmp/wifi-iperf.json", + "type": "file" + } + ) # check if BW is in the expected range - self.failUnless(self.__bwreal > float(self.__bwexpected) * 0.9, - "failed: speed is lower than spected. Speed(Mbits/s)" + str(self.__bwreal)) + if self.__bwreal < float(self.__bwexpected): + self.__resultlist.append( + { + "desc": "Test result", + "data": "FAILED: speed is lower than expected. Speed(Mbits/s): " + str(self.__bwreal), + "type": "string" + } + ) + self.fail("failed: speed is lower than expected. Speed(Mbits/s): " + str(self.__bwreal)) else: - self.fail("failed: could not complete iperf command") + self.__resultlist.append( + { + "desc": "Test result", + "data": "FAILED: could not complete iperf3 command", + "type": "string" + } + ) + self.fail("failed: could not complete iperf3 command") else: + self.__resultlist.append( + { + "desc": "Test result", + "data": "FAILED: wlan0 interface doesn't have any ip address", + "type": "string" + } + ) self.fail("failed: wlan0 interface doesn't have any ip address.") else: + self.__resultlist.append( + { + "desc": "Test result", + "data": "FAILED: could not complete ifconfig command", + "type": "string" + } + ) self.fail("failed: could not complete ifconfig command.") else: + self.__resultlist.append( + { + "desc": "Test result", + "data": "FAILED: wifi module is not connected to the router", + "type": "string" + } + ) self.fail("failed: wifi module is not connected to the router.") else: - self.fail("failed: couldn't execute iw command") + self.__resultlist.append( + { + "desc": "Test result", + "data": "FAILED: could not execute iw command", + "type": "string" + } + ) + self.fail("failed: could not execute iw command") - def getresults(self): - # resultlist is a python list of python dictionaries - resultlist = [ + # Test successful + self.__resultlist.append( { - "desc": "iperf3 output", - "data": "/tmp/wifi-iperf.json", - "type": "file" + "desc": "Test result", + "data": "OK", + "type": "string" } - ] - return resultlist + ) + + def getresults(self): + return self.__resultlist diff --git a/test-cli/test_main.py b/test-cli/test_main.py index 3345f53..0df9bd4 100644 --- a/test-cli/test_main.py +++ b/test-cli/test_main.py @@ -6,7 +6,6 @@ import unittest from test.helpers.testsrv_db import TestSrv_Database from test.helpers.setup_xml import XMLSetup from test.runners.simple import SimpleTestRunner -from test.helpers.syscmd import TestSysCommand from test.tests.qethernet import Qethernet from test.tests.qram import Qram from test.tests.qusb import Qusb @@ -27,6 +26,8 @@ from test.tasks.flashmemory import flash_memory from test.helpers.qrreader import QRReader from test.tasks.generatedmesg import generate_dmesg from test.helpers.cmdline import LinuxKernel +from test.helpers.amper import Amper +from test.enums.StationStates import StationStates # global variables psdbObj = TestSrv_Database() @@ -103,13 +104,18 @@ def create_board(): variant = cmd.getkvar("bvariant", "none") if model_id == "none" or variant == "none": - return 1 + print("Error: Cannot open DB connection") + loggerObj.getlogger().info("Station error: #Cannot get model and variant information#") + exit(0) # get model id globalVar.g_mid = model_id + "-" + variant # get station - globalVar.station = socket.gethostname() + if "Station" not in globalVar.station: + print("Error: Wrong Station name") + loggerObj.getlogger().info("Station error: #Wrong station name#") + exit(0) processor_id = get_die_id(globalVar.g_mid) print(globalVar.g_mid) @@ -117,8 +123,6 @@ def create_board(): globalVar.g_uuid = psdbObj.create_board(processor_id, model_id, variant, globalVar.station, get_mac(globalVar.g_mid)) print(globalVar.g_uuid) - psdbObj.bond_to_station(globalVar.g_uuid, globalVar.station) - return 0 def get_taskvars_list(uuid): @@ -131,67 +135,97 @@ def get_taskvars_list(uuid): def main(): # initialize the board - res = create_board() - if res: - loggerObj.getlogger().info("Python program error") + create_board() + + # create a process + globalVar.testid_ctl = psdbObj.open_test(globalVar.g_uuid) + # create and run tests according to the board type + runner = SimpleTestRunner(psdbObj) + # Change state to "TESTS_RUNNING" + if psdbObj.read_station_state(globalVar.station) == StationStates.STATION_ERROR.name: + # Abort + print("Error: Wrong previous station state.") + exit(0) + psdbObj.change_station_state(globalVar.station, StationStates.TESTS_RUNNING.name) + # Execute tests + testresult = runner.run(create_testsuite()) + # save dmesg at the end of the tests + generate_dmesg(globalVar.testid_ctl, psdbObj) + + # execute aditional tasks, only if the test was succesfull + if testresult.wasSuccessful(): + # Change state to "TESTS_OK" + if psdbObj.read_station_state(globalVar.station) == StationStates.STATION_ERROR.name: + # Abort + print("Error: Wrong previous station state.") + exit(0) + psdbObj.change_station_state(globalVar.station, StationStates.TESTS_OK.name) + # Change state to "EXTRATASKS_RUNNING" + if psdbObj.read_station_state(globalVar.station) == StationStates.STATION_ERROR.name: + # Abort + print("Error: Wrong previous station state.") + exit(0) + psdbObj.change_station_state(globalVar.station, StationStates.EXTRATASKS_RUNNING.name) + # create task control + globalVar.taskid_ctl = psdbObj.open_task(globalVar.g_uuid) + # get extra variables + varlist = get_taskvars_list(globalVar.g_uuid) + + alltasksok = False + + # flash eeprom + resulteeprom = 0 + if "eeprompath" in varlist and len(varlist["eeprompath"]) > 0: + mac0 = psdbObj.get_board_macaddr(globalVar.g_uuid) + resulteeprom, eepromdata = flash_eeprom(varlist["eeprompath"], globalVar.g_uuid, mac0) + psdbObj.create_task_result(globalVar.taskid_ctl, "FLASHEEPROM", + "TASK_OK" if resulteeprom == 0 else "TASK_FAIL", eepromdata) + + # flash non-volatile memory + resultmemory = 0 + if "flashimagepath" in varlist and len(varlist["flashimagepath"]) > 0: + resultmemory = flash_memory(varlist["flashimagepath"]) + psdbObj.create_task_result(globalVar.taskid_ctl, "FLASHMEMORY", + "TASK_OK" if resultmemory == 0 else "TASK_FAIL", + varlist["flashimagepath"]) + + # update status with the result + if resulteeprom == 0 and resultmemory == 0: + psdbObj.update_taskctl_status(globalVar.taskid_ctl, "TASK_BOARD_OK") + # Change state to "WAITING_FOR_SCANNER" + if psdbObj.read_station_state(globalVar.station) == StationStates.STATION_ERROR.name: + # Abort + print("Error: Wrong previous station state.") + exit(0) + psdbObj.change_station_state(globalVar.station, StationStates.WAITING_FOR_SCANNER.name) + # get barcode using the scanner + qrreceived = False + while not qrreceived: + qr = QRReader() + if qr.openQR(): + # waits 5s to receive a valid code + if qr.readQRasync(5): + qrreceived = True + factorycode = qr.getQRNumber() + psdbObj.set_factorycode(globalVar.g_uuid, factorycode) + qr.closeQR() + # Change state to "FINISHED" + if psdbObj.read_station_state(globalVar.station) == StationStates.STATION_ERROR.name: + # Abort + print("Error: Wrong previous station state.") + exit(0) + psdbObj.change_station_state(globalVar.station, StationStates.FINISHED.name) + else: + psdbObj.update_taskctl_status(globalVar.taskid_ctl, "TASK_BOARD_FAIL") + loggerObj.getlogger().info("Station error: #Unable to complete extra tasks#") + else: - # create a process - globalVar.testid_ctl = psdbObj.open_test(globalVar.g_uuid) - # create and run tests according to the board type - runner = SimpleTestRunner(psdbObj) - loggerObj.getlogger().info("Tests running") - testresult = runner.run(create_testsuite()) - # save dmesg - generate_dmesg(globalVar.testid_ctl, psdbObj) - - # execute aditional tasks, only if the test was succesfull - if testresult.wasSuccessful(): - loggerObj.getlogger().info("Extra tasks running") - # create task control - globalVar.taskid_ctl = psdbObj.open_task(globalVar.g_uuid) - # get extra variables - varlist = get_taskvars_list(globalVar.g_uuid) - - alltasksok = False - - # flash eeprom - resulteeprom = 0 - if "eeprompath" in varlist and len(varlist["eeprompath"]) > 0: - mac0 = psdbObj.get_board_macaddr(globalVar.g_uuid) - resulteeprom, eepromdata = flash_eeprom(varlist["eeprompath"], globalVar.g_uuid, mac0) - psdbObj.create_task_result(globalVar.taskid_ctl, "FLASHEEPROM", - "TASK_OK" if resulteeprom == 0 else "TASK_FAIL", eepromdata) - - # flash non-volatile memory - resultmemory = 0 - if "flashimagepath" in varlist and len(varlist["flashimagepath"]) > 0: - resultmemory = flash_memory(varlist["flashimagepath"]) - psdbObj.create_task_result(globalVar.taskid_ctl, "FLASHMEMORY", - "TASK_OK" if resultmemory == 0 else "TASK_FAIL", - varlist["flashimagepath"]) - - # update status with the result - if resulteeprom == 0 and resultmemory == 0: - alltasksok = True - psdbObj.update_taskctl_status(globalVar.taskid_ctl, "TASK_BOARD_OK") - else: - psdbObj.update_taskctl_status(globalVar.taskid_ctl, "TASK_BOARD_FAIL") - - if alltasksok: - # get barcode using the scanner - loggerObj.getlogger().info("Waiting for barcode scanner") - qrreceived = False - while not qrreceived: - qr = QRReader() - if qr.openQR(): - # waits 5s to receive a valid code - if qr.readQRasync(5): - qrreceived = True - factorycode = qr.getQRNumber() - psdbObj.set_factorycode(globalVar.g_uuid, factorycode) - qr.closeQR() - - loggerObj.getlogger().info("Python program finished") + # Change state to "TESTS_FAILED" + if psdbObj.read_station_state(globalVar.station) == StationStates.STATION_ERROR.name: + # Abort + print("Error: Wrong previous station state.") + exit(0) + psdbObj.change_station_state(globalVar.station, StationStates.TESTS_FAILED.name) if __name__ == "__main__": @@ -202,18 +236,68 @@ if __name__ == "__main__": # create logger loggerObj = ISEE_Logger(logging.INFO) - loggerObj.getlogger().info("Python program started") # Try to parse the setup.xml file try: xmlObj = XMLSetup(os.path.join(test_abspath, "setup.xml")) except: + # Abort print("Error: Cannot parse setup.xml file") - exit(1) + loggerObj.getlogger().info("Station error: #Cannot parse XML configuration file#") + exit(0) # Try to connect to the DB, according to setup.xml configuration - if psdbObj.open(xmlObj): - main() - else: + if not psdbObj.open(xmlObj): print("Error: Cannot open DB connection") - exit(2) + loggerObj.getlogger().info("Station error: #Cannot open DB connection#") + exit(0) + + # get station name + globalVar.station = socket.gethostname() + + # Check if current state is "WAIT_TEST_START" + currentstate = psdbObj.read_station_state(globalVar.station) + if currentstate != StationStates.WAIT_TEST_START.name: + # Abort + print("Error: Wrong previous station state.") + exit(0) + + # Change state to "TESTS_CHECKING_ENV" + psdbObj.change_station_state(globalVar.station, StationStates.TESTS_CHECKING_ENV.name) + + # Look for a sd or pendrive + # TODO + #if not disk_exists("/dev/mmcblk0") and not : + # abort + #print("error: cannot find a barcode scanner.") + #loggerObj.getlogger().info("station error: #cannot find a sd card or memory usb to load the test environment.#") + #exit(0) + + # Look for a barcode scanner + qr = QRReader() + if qr.openQR(): + qr.closeQR() + else: + # Abort + print("Error: Cannot find a barcode scanner.") + loggerObj.getlogger().info("Station error: #Cannot find a barcode scanner#") + exit(0) + + # Look for an amperimeter + amp = Amper() + if not amp.open(): + # Abort + print("Error: Cannot open an amperimeter COM port.") + loggerObj.getlogger().info("Station error: #Cannot open an amperimeter COM port#") + exit(0) + if not amp.hello(): + if not amp.hello(): + # Abort + print("Error: Cannot open find an amperimeter.") + loggerObj.getlogger().info("Station error: #Cannot open find an amperimeter#") + exit(0) + + # Execute main + main() + + exit(0) -- cgit v1.1