From c685367cbd6abf1c6ae442df759e39b25a907d3b Mon Sep 17 00:00:00 2001 From: Hector Fernandez Date: Mon, 9 Mar 2020 12:39:50 +0100 Subject: Fixed several test procedures. --- test-cli/.idea/workspace.xml | 37 +++++++--------- test-cli/test/helpers/amper.py | 1 - test-cli/test/helpers/changedir.py | 14 ++++++ test-cli/test/helpers/usb.sh | 2 +- test-cli/test/tests/qamper.py | 27 ++++-------- test-cli/test/tests/qduplex_ser.py | 40 ++++++++++-------- test-cli/test/tests/qeeprom.py | 1 - test-cli/test/tests/qi2c.py | 6 +-- test-cli/test/tests/qrtc.py | 26 +++++++----- test-cli/test/tests/qserial.py | 22 ++++++---- test-cli/test/tests/qusb.py | 87 ++++++++++++++------------------------ 11 files changed, 125 insertions(+), 138 deletions(-) create mode 100644 test-cli/test/helpers/changedir.py (limited to 'test-cli') diff --git a/test-cli/.idea/workspace.xml b/test-cli/.idea/workspace.xml index 7e0aea1..872ab45 100644 --- a/test-cli/.idea/workspace.xml +++ b/test-cli/.idea/workspace.xml @@ -2,12 +2,17 @@ - - + - - - + + + + + + + + + @@ -148,25 +153,13 @@ + + + + - - - - - file://$PROJECT_DIR$/test/runners/simple.py - 83 - - - file://$PROJECT_DIR$/test/runners/simple.py - 58 - - - - \ No newline at end of file diff --git a/test-cli/test/helpers/amper.py b/test-cli/test/helpers/amper.py index 45ec7db..ea719f6 100644 --- a/test-cli/test/helpers/amper.py +++ b/test-cli/test/helpers/amper.py @@ -1,6 +1,5 @@ import serial import scanf -import time class Amper(object): diff --git a/test-cli/test/helpers/changedir.py b/test-cli/test/helpers/changedir.py new file mode 100644 index 0000000..fad9ade --- /dev/null +++ b/test-cli/test/helpers/changedir.py @@ -0,0 +1,14 @@ +import os + + +class changedir: + """Context manager for changing the current working directory""" + def __init__(self, newPath): + self.newPath = os.path.expanduser(newPath) + + def __enter__(self): + self.savedPath = os.getcwd() + os.chdir(self.newPath) + + def __exit__(self, etype, value, traceback): + os.chdir(self.savedPath) \ No newline at end of file diff --git a/test-cli/test/helpers/usb.sh b/test-cli/test/helpers/usb.sh index c8e6924..52ea4b2 100755 --- a/test-cli/test/helpers/usb.sh +++ b/test-cli/test/helpers/usb.sh @@ -11,4 +11,4 @@ for sysdevpath in $(find /sys/bus/usb/devices/usb*/ -name dev); do ) done -return 0 +exit 0 diff --git a/test-cli/test/tests/qamper.py b/test-cli/test/tests/qamper.py index 2b02302..b3ab8b1 100644 --- a/test-cli/test/tests/qamper.py +++ b/test-cli/test/tests/qamper.py @@ -22,30 +22,21 @@ class Qamper(unittest.TestCase): def execute(self): amp = Amper() - print(amp) + # open serial connection if not amp.open(): - print("1") self.fail("failed: can not open serial port") return - + # check if the amperimeter is connected and working if not amp.hello(): - print("2") self.fail("failed: can not communicate") return - + # get current value (in Amperes) result = amp.getCurrent() - print(result) - + # close serial connection amp.close() + # # In order to give a valid result it is importarnt to define an under current value + if result > float(self._overcurrent): + self.fail("failed: Overcurrent detected ( {} )".format(result)) - - - - - - # # In order to give a valid result it is importarnt to define an under current value - # if self._current > float(self._overcurrent): - # self.fail("failed: OVERCURRENT DETECTED ( {} )".format(self._current)) - # - # if self._current < float(self._undercurrent): - # self.fail("failed: UNDERCURRENT DETECTED ( {} )".format(self._current)) + if result < float(self._undercurrent): + self.fail("failed: Undercurrent detected ( {} )".format(result)) diff --git a/test-cli/test/tests/qduplex_ser.py b/test-cli/test/tests/qduplex_ser.py index 46fb5db..0666363 100644 --- a/test-cli/test/tests/qduplex_ser.py +++ b/test-cli/test/tests/qduplex_ser.py @@ -6,7 +6,11 @@ import time class Qduplex(unittest.TestCase): params = None + __port1 = None + __port2 = None + __baudrate = None + # varlist: port1, port2, baudrate def __init__(self, testname, testfunc, varlist): self.params = varlist super(Qduplex, self).__init__(testfunc) @@ -14,46 +18,46 @@ class Qduplex(unittest.TestCase): self.__port1 = varlist["port1"] else: raise Exception('port1 param inside Qduplex must be defined') - self.__serial1 = serial.Serial(self.__port1, timeout=1) - if "port2" in varlist: self.__port2 = varlist["port2"] else: raise Exception('port2 param inside Qduplex must be defined') - self.__serial2 = serial.Serial(self.__port2, timeout=1) - if "baudrate" in varlist: - self.__serial1.baudrate = varlist["baudrate"] - self.__serial2.baudrate = varlist["baudrate"] + self.__baudrate = varlist["baudrate"] else: raise Exception('baudrate param inside Qduplex must be defined') - self._testMethodDoc = testname + # open serial connection + self.__serial1 = serial.Serial(self.__port1, self.__baudrate, timeout=1) + self.__serial1.flushInput() + self.__serial1.flushOutput() + self.__serial2 = serial.Serial(self.__port2, self.__baudrate, timeout=1) + self.__serial2.flushInput() + self.__serial2.flushOutput() + def __del__(self): self.__serial1.close() self.__serial2.close() def execute(self): - self.__serial1.flushInput() - self.__serial1.flushOutput() - self.__serial2.flushInput() - self.__serial2.flushOutput() + # generate a random uuid test_uuid = str(uuid.uuid4()).encode() + # send the uuid through serial port self.__serial1.write(test_uuid) time.sleep(0.05) # there might be a small delay if self.__serial2.inWaiting() == 0: - self.fail("failed: PORT {} wait timeout exceded, wrong communication?".format(self.__port2)) + self.fail("failed: port {} wait timeout exceded".format(self.__port2)) else: - if (self.__serial2.readline() != test_uuid): - self.fail("failed: PORT {} write/read mismatch".format(self.__port2)) + if self.__serial2.readline() != test_uuid: + self.fail("failed: port {} write/read mismatch".format(self.__port2)) test_uuid = str(uuid.uuid4()).encode() + # send the uuid through serial port self.__serial2.write(test_uuid) time.sleep(0.05) # there might be a small delay if self.__serial1.inWaiting() == 0: - self.fail("failed: PORT {} wait timeout exceded, wrong communication?".format(self.__port1)) + self.fail("failed: port {} wait timeout exceded".format(self.__port1)) else: - if (self.__serial1.readline() != test_uuid): - self.fail("failed: PORT {} write/read mismatch".format(self.__port1)) - + if self.__serial1.readline() != test_uuid: + self.fail("failed: port {} write/read mismatch".format(self.__port1)) diff --git a/test-cli/test/tests/qeeprom.py b/test-cli/test/tests/qeeprom.py index d2c55ef..1921bf7 100644 --- a/test-cli/test/tests/qeeprom.py +++ b/test-cli/test/tests/qeeprom.py @@ -34,7 +34,6 @@ class Qeeprom(unittest.TestCase): # compare both values if data_rx != data_tx: self.fail("failed: mismatch between written and received values.") - else: self.fail("failed: Unable to read from the EEPROM device.") else: diff --git a/test-cli/test/tests/qi2c.py b/test-cli/test/tests/qi2c.py index 71eb64e..c59975e 100644 --- a/test-cli/test/tests/qi2c.py +++ b/test-cli/test/tests/qi2c.py @@ -20,7 +20,7 @@ class Qi2c(unittest.TestCase): self._testMethodDoc = testname def execute(self): - str_cmd= "i2cdetect -a -y -r {}".format(self.__busnum) + str_cmd = "i2cdetect -a -y -r {}".format(self.__busnum) i2c_command = SysCommand("i2cdetect", str_cmd) if i2c_command.execute() == 0: self.__raw_out = i2c_command.getOutput() @@ -28,8 +28,8 @@ class Qi2c(unittest.TestCase): return -1 lines=self.__raw_out.decode('ascii').splitlines() for i in range(len(lines)): - if (lines[i].count('UU')): - if (lines[i].find("UU")): + if lines[i].count('UU'): + if lines[i].find("UU"): self.__devices.append("0x{}{}".format((i - 1), hex(int((lines[i].find("UU") - 4) / 3)).split('x')[-1])) for i in range(len(self.__register)): if not(self.__register[i] in self.__devices): diff --git a/test-cli/test/tests/qrtc.py b/test-cli/test/tests/qrtc.py index 884a290..0be0f99 100644 --- a/test-cli/test/tests/qrtc.py +++ b/test-cli/test/tests/qrtc.py @@ -1,11 +1,12 @@ -from test.helpers.syscmd import SysCommand +import sh import unittest import time +import re class Qrtc(unittest.TestCase): params = None - __rtc = "/dev/rtc0" + __rtc = None def __init__(self, testname, testfunc, varlist): self.params = varlist @@ -17,16 +18,19 @@ class Qrtc(unittest.TestCase): self._testMethodDoc = testname def execute(self): - str_cmd = "hwclock -f {}".format(self.__rtc) - rtc_set = SysCommand("rtc_set", str_cmd) - if rtc_set.execute() == 0: - curr_hour = rtc_set.getOutput().decode('ascii').split(" ") - time1 = int((curr_hour[4].split(":"))[2]) + # get time from RTC peripheral + p = sh.hwclock("-f", self.__rtc) + if p.exit_code == 0: + time1 = re.search("([01]?[0-9]{1}|2[0-3]{1})[:][0-5]{1}[0-9]{1}[:][0-5]{1}[0-9]{1}", + p.stdout.decode('ascii')) time.sleep(1) - if rtc_set.execute() == 0: - curr_hour = rtc_set.getOutput().decode('ascii').split(" ") - time2 = int((curr_hour[4].split(":"))[2]) - if time1==time2: + # get time from RTC another time + p = sh.hwclock("-f", self.__rtc) + if p.exit_code == 0: + time2 = re.search("([01]?[0-9]{1}|2[0-3]{1})[:][0-5]{1}[0-9]{1}[:][0-5]{1}[0-9]{1}", + p.stdout.decode('ascii')) + # check if the seconds of both times are different + if time1 == time2: self.fail("failed: RTC is not running") else: self.fail("failed: couldn't execute hwclock command 2nd time") diff --git a/test-cli/test/tests/qserial.py b/test-cli/test/tests/qserial.py index d798578..67e3af1 100644 --- a/test-cli/test/tests/qserial.py +++ b/test-cli/test/tests/qserial.py @@ -6,7 +6,10 @@ import time class Qserial(unittest.TestCase): params = None + __port = None + __baudrate = None + #varlist: port, baudrate def __init__(self, testname, testfunc, varlist): self.params = varlist super(Qserial, self).__init__(testfunc) @@ -14,28 +17,31 @@ class Qserial(unittest.TestCase): self.__port = varlist["port"] else: raise Exception('port param inside Qserial must be defined') - self.__serial = serial.Serial(self.__port, timeout=1) + if "baudrate" in varlist: self.__baudrate = varlist["baudrate"] else: raise Exception('baudrate param inside Qserial must be defined') self._testMethodDoc = testname + # open serial connection + self.__serial = serial.Serial(self.__port, self.__baudrate, timeout=1) + self.__serial.flushInput() + self.__serial.flushOutput() + def __del__(self): self.__serial.close() def execute(self): - self.__serial.flushInput() - self.__serial.flushOutput() - #generate a random uuid + # generate a random uuid test_uuid = str(uuid.uuid4()).encode() - #send the uuid through serial port + # send the uuid through serial port self.__serial.write(test_uuid) time.sleep(0.05) # there might be a small delay if self.__serial.inWaiting() == 0: - self.fail("failed: PORT {} wait timeout exceded, wrong communication?".format(self.__port)) + self.fail("failed: port {} wait timeout exceded".format(self.__port)) else: # check if what it was sent is equal to what has been received - if (self.__serial.readline() != test_uuid): - self.fail("failed: PORT {} write/read mismatch".format(self.__port)) + if self.__serial.readline() != test_uuid: + self.fail("failed: port {} write/read mismatch".format(self.__port)) diff --git a/test-cli/test/tests/qusb.py b/test-cli/test/tests/qusb.py index 70265a5..7ecf31e 100644 --- a/test-cli/test/tests/qusb.py +++ b/test-cli/test/tests/qusb.py @@ -1,69 +1,46 @@ -from test.helpers.syscmd import SysCommand +import sh import unittest +import re +from test.helpers.changedir import changedir class Qusb(unittest.TestCase): - __numPorts = None - __devLabel = None params = None def __init__(self, testname, testfunc, varlist): self.params = varlist super(Qusb, self).__init__(testfunc) - if "numPorts" in varlist: - self.__numPorts = varlist["numPorts"] - else: - raise Exception('numPorts param inside Qusb must be defined') self._testMethodDoc = testname - if "devLabel" in varlist: - self.__devLabel = varlist["devLabel"] - else: - raise Exception('devLabel param inside Qusb must be defined') - if testname == "USBOTG": - self.__usbFileName = "/this_is_an_usb_otg" - self.__usbtext = "USBOTG" - else: - self.__usbFileName = "/this_is_an_usb_host" - self.__usbtext = "USBHOST" - self.__numUsbFail = [] def execute(self): - str_cmd = "lsblk -o LABEL" - lsblk_command = SysCommand("lsblk", str_cmd) - if lsblk_command.execute() == 0: - self.__raw_out = lsblk_command.getOutput() - if self.__raw_out == "": - return -1 - lines = lsblk_command.getOutput().splitlines() - host_list = [] - for i in range(len(lines)): - if str(lines[i].decode('ascii')) == self.__devLabel: - host_list.append(i) - if len(host_list) == int(self.__numPorts): - str_cmd = "lsblk -o MOUNTPOINT" - lsblk_command = SysCommand("lsblk", str_cmd) - if lsblk_command.execute() == 0: - self.__raw_out = lsblk_command.getOutput() - if self.__raw_out == "": - print("failed: no command output") - self.fail("failed: no command output") - else: - lines = lsblk_command.getOutput().splitlines() - for i in range(len(host_list)): - file_path = str(lines[host_list[i]].decode('ascii')) + self.__usbFileName - usb_file = open(file_path, 'r') - read = usb_file.read() - if read.find(self.__usbtext) != -1: - print(file_path + " --> OK!") - else: - self.fail( - "failed: could not read from usb {}".format(lines[host_list[i]].decode('ascii'))) - self.__numUsbFail.append(host_list[i]) - usb_file.close() - else: - self.fail("failed: couldn't execute lsblk command") - + # Execute script usb.sh + p = sh.bash('test/helpers/usb.sh') + # Search in the stdout a pattern "/dev/sd + {letter} + {number} + q = re.search("/dev/sd\w\d", p.stdout.decode('ascii')) + # get the first device which matches the pattern + device = q.group(0) + # create a new folder where the pendrive is going to be mounted + sh.mkdir("-p", "/mnt/pendrive") + # mount the device + p = sh.mount(device, "/mnt/pendrive") + if p.exit_code == 0: + # copy files + p = sh.cp("/root/usbtest/usbdatatest.bin", "/root/usbtest/usbdatatest.md5", "/mnt/pendrive") + if p.exit_code == 0: + # check md5 + with changedir("/mnt/pendrive/"): + p = sh.md5sum("-c", "usbdatatest.md5") + q = re.search("OK", p.stdout.decode('ascii')) + # delete files + sh.rm("-f", "/mnt/pendrive/usbdatatest.bin", "/mnt/pendrive/usbdatatest.md5") + # umount + sh.umount("/mnt/pendrive") + # check result + if q is None: + self.fail("failed: wrong md5 result.") else: - self.fail("failed: reference and real usb host devices number mismatch") + # umount + sh.umount("/mnt/pendrive") + self.fail("failed: unable to copy files.") else: - self.fail("failed: couldn't execute lsblk command") + self.fail("failed: unable to mount the device.") -- cgit v1.1