summaryrefslogtreecommitdiff
path: root/test-cli/test/tests
diff options
context:
space:
mode:
Diffstat (limited to 'test-cli/test/tests')
-rw-r--r--test-cli/test/tests/qamper.py27
-rw-r--r--test-cli/test/tests/qduplex_ser.py40
-rw-r--r--test-cli/test/tests/qeeprom.py1
-rw-r--r--test-cli/test/tests/qi2c.py6
-rw-r--r--test-cli/test/tests/qrtc.py26
-rw-r--r--test-cli/test/tests/qserial.py22
-rw-r--r--test-cli/test/tests/qusb.py87
7 files changed, 95 insertions, 114 deletions
diff --git a/test-cli/test/tests/qamper.py b/test-cli/test/tests/qamper.py
index 2b02302..b3ab8b1 100644
--- a/test-cli/test/tests/qamper.py
+++ b/test-cli/test/tests/qamper.py
@@ -22,30 +22,21 @@ class Qamper(unittest.TestCase):
def execute(self):
amp = Amper()
- print(amp)
+ # open serial connection
if not amp.open():
- print("1")
self.fail("failed: can not open serial port")
return
-
+ # check if the amperimeter is connected and working
if not amp.hello():
- print("2")
self.fail("failed: can not communicate")
return
-
+ # get current value (in Amperes)
result = amp.getCurrent()
- print(result)
-
+ # close serial connection
amp.close()
+ # # In order to give a valid result it is importarnt to define an under current value
+ if result > float(self._overcurrent):
+ self.fail("failed: Overcurrent detected ( {} )".format(result))
-
-
-
-
-
- # # In order to give a valid result it is importarnt to define an under current value
- # if self._current > float(self._overcurrent):
- # self.fail("failed: OVERCURRENT DETECTED ( {} )".format(self._current))
- #
- # if self._current < float(self._undercurrent):
- # self.fail("failed: UNDERCURRENT DETECTED ( {} )".format(self._current))
+ if result < float(self._undercurrent):
+ self.fail("failed: Undercurrent detected ( {} )".format(result))
diff --git a/test-cli/test/tests/qduplex_ser.py b/test-cli/test/tests/qduplex_ser.py
index 46fb5db..0666363 100644
--- a/test-cli/test/tests/qduplex_ser.py
+++ b/test-cli/test/tests/qduplex_ser.py
@@ -6,7 +6,11 @@ import time
class Qduplex(unittest.TestCase):
params = None
+ __port1 = None
+ __port2 = None
+ __baudrate = None
+ # varlist: port1, port2, baudrate
def __init__(self, testname, testfunc, varlist):
self.params = varlist
super(Qduplex, self).__init__(testfunc)
@@ -14,46 +18,46 @@ class Qduplex(unittest.TestCase):
self.__port1 = varlist["port1"]
else:
raise Exception('port1 param inside Qduplex must be defined')
- self.__serial1 = serial.Serial(self.__port1, timeout=1)
-
if "port2" in varlist:
self.__port2 = varlist["port2"]
else:
raise Exception('port2 param inside Qduplex must be defined')
- self.__serial2 = serial.Serial(self.__port2, timeout=1)
-
if "baudrate" in varlist:
- self.__serial1.baudrate = varlist["baudrate"]
- self.__serial2.baudrate = varlist["baudrate"]
+ self.__baudrate = varlist["baudrate"]
else:
raise Exception('baudrate param inside Qduplex must be defined')
-
self._testMethodDoc = testname
+ # open serial connection
+ self.__serial1 = serial.Serial(self.__port1, self.__baudrate, timeout=1)
+ self.__serial1.flushInput()
+ self.__serial1.flushOutput()
+ self.__serial2 = serial.Serial(self.__port2, self.__baudrate, timeout=1)
+ self.__serial2.flushInput()
+ self.__serial2.flushOutput()
+
def __del__(self):
self.__serial1.close()
self.__serial2.close()
def execute(self):
- self.__serial1.flushInput()
- self.__serial1.flushOutput()
- self.__serial2.flushInput()
- self.__serial2.flushOutput()
+ # generate a random uuid
test_uuid = str(uuid.uuid4()).encode()
+ # send the uuid through serial port
self.__serial1.write(test_uuid)
time.sleep(0.05) # there might be a small delay
if self.__serial2.inWaiting() == 0:
- self.fail("failed: PORT {} wait timeout exceded, wrong communication?".format(self.__port2))
+ self.fail("failed: port {} wait timeout exceded".format(self.__port2))
else:
- if (self.__serial2.readline() != test_uuid):
- self.fail("failed: PORT {} write/read mismatch".format(self.__port2))
+ if self.__serial2.readline() != test_uuid:
+ self.fail("failed: port {} write/read mismatch".format(self.__port2))
test_uuid = str(uuid.uuid4()).encode()
+ # send the uuid through serial port
self.__serial2.write(test_uuid)
time.sleep(0.05) # there might be a small delay
if self.__serial1.inWaiting() == 0:
- self.fail("failed: PORT {} wait timeout exceded, wrong communication?".format(self.__port1))
+ self.fail("failed: port {} wait timeout exceded".format(self.__port1))
else:
- if (self.__serial1.readline() != test_uuid):
- self.fail("failed: PORT {} write/read mismatch".format(self.__port1))
-
+ if self.__serial1.readline() != test_uuid:
+ self.fail("failed: port {} write/read mismatch".format(self.__port1))
diff --git a/test-cli/test/tests/qeeprom.py b/test-cli/test/tests/qeeprom.py
index d2c55ef..1921bf7 100644
--- a/test-cli/test/tests/qeeprom.py
+++ b/test-cli/test/tests/qeeprom.py
@@ -34,7 +34,6 @@ class Qeeprom(unittest.TestCase):
# compare both values
if data_rx != data_tx:
self.fail("failed: mismatch between written and received values.")
-
else:
self.fail("failed: Unable to read from the EEPROM device.")
else:
diff --git a/test-cli/test/tests/qi2c.py b/test-cli/test/tests/qi2c.py
index 71eb64e..c59975e 100644
--- a/test-cli/test/tests/qi2c.py
+++ b/test-cli/test/tests/qi2c.py
@@ -20,7 +20,7 @@ class Qi2c(unittest.TestCase):
self._testMethodDoc = testname
def execute(self):
- str_cmd= "i2cdetect -a -y -r {}".format(self.__busnum)
+ str_cmd = "i2cdetect -a -y -r {}".format(self.__busnum)
i2c_command = SysCommand("i2cdetect", str_cmd)
if i2c_command.execute() == 0:
self.__raw_out = i2c_command.getOutput()
@@ -28,8 +28,8 @@ class Qi2c(unittest.TestCase):
return -1
lines=self.__raw_out.decode('ascii').splitlines()
for i in range(len(lines)):
- if (lines[i].count('UU')):
- if (lines[i].find("UU")):
+ if lines[i].count('UU'):
+ if lines[i].find("UU"):
self.__devices.append("0x{}{}".format((i - 1), hex(int((lines[i].find("UU") - 4) / 3)).split('x')[-1]))
for i in range(len(self.__register)):
if not(self.__register[i] in self.__devices):
diff --git a/test-cli/test/tests/qrtc.py b/test-cli/test/tests/qrtc.py
index 884a290..0be0f99 100644
--- a/test-cli/test/tests/qrtc.py
+++ b/test-cli/test/tests/qrtc.py
@@ -1,11 +1,12 @@
-from test.helpers.syscmd import SysCommand
+import sh
import unittest
import time
+import re
class Qrtc(unittest.TestCase):
params = None
- __rtc = "/dev/rtc0"
+ __rtc = None
def __init__(self, testname, testfunc, varlist):
self.params = varlist
@@ -17,16 +18,19 @@ class Qrtc(unittest.TestCase):
self._testMethodDoc = testname
def execute(self):
- str_cmd = "hwclock -f {}".format(self.__rtc)
- rtc_set = SysCommand("rtc_set", str_cmd)
- if rtc_set.execute() == 0:
- curr_hour = rtc_set.getOutput().decode('ascii').split(" ")
- time1 = int((curr_hour[4].split(":"))[2])
+ # get time from RTC peripheral
+ p = sh.hwclock("-f", self.__rtc)
+ if p.exit_code == 0:
+ time1 = re.search("([01]?[0-9]{1}|2[0-3]{1})[:][0-5]{1}[0-9]{1}[:][0-5]{1}[0-9]{1}",
+ p.stdout.decode('ascii'))
time.sleep(1)
- if rtc_set.execute() == 0:
- curr_hour = rtc_set.getOutput().decode('ascii').split(" ")
- time2 = int((curr_hour[4].split(":"))[2])
- if time1==time2:
+ # get time from RTC another time
+ p = sh.hwclock("-f", self.__rtc)
+ if p.exit_code == 0:
+ time2 = re.search("([01]?[0-9]{1}|2[0-3]{1})[:][0-5]{1}[0-9]{1}[:][0-5]{1}[0-9]{1}",
+ p.stdout.decode('ascii'))
+ # check if the seconds of both times are different
+ if time1 == time2:
self.fail("failed: RTC is not running")
else:
self.fail("failed: couldn't execute hwclock command 2nd time")
diff --git a/test-cli/test/tests/qserial.py b/test-cli/test/tests/qserial.py
index d798578..67e3af1 100644
--- a/test-cli/test/tests/qserial.py
+++ b/test-cli/test/tests/qserial.py
@@ -6,7 +6,10 @@ import time
class Qserial(unittest.TestCase):
params = None
+ __port = None
+ __baudrate = None
+ #varlist: port, baudrate
def __init__(self, testname, testfunc, varlist):
self.params = varlist
super(Qserial, self).__init__(testfunc)
@@ -14,28 +17,31 @@ class Qserial(unittest.TestCase):
self.__port = varlist["port"]
else:
raise Exception('port param inside Qserial must be defined')
- self.__serial = serial.Serial(self.__port, timeout=1)
+
if "baudrate" in varlist:
self.__baudrate = varlist["baudrate"]
else:
raise Exception('baudrate param inside Qserial must be defined')
self._testMethodDoc = testname
+ # open serial connection
+ self.__serial = serial.Serial(self.__port, self.__baudrate, timeout=1)
+ self.__serial.flushInput()
+ self.__serial.flushOutput()
+
def __del__(self):
self.__serial.close()
def execute(self):
- self.__serial.flushInput()
- self.__serial.flushOutput()
- #generate a random uuid
+ # generate a random uuid
test_uuid = str(uuid.uuid4()).encode()
- #send the uuid through serial port
+ # send the uuid through serial port
self.__serial.write(test_uuid)
time.sleep(0.05) # there might be a small delay
if self.__serial.inWaiting() == 0:
- self.fail("failed: PORT {} wait timeout exceded, wrong communication?".format(self.__port))
+ self.fail("failed: port {} wait timeout exceded".format(self.__port))
else:
# check if what it was sent is equal to what has been received
- if (self.__serial.readline() != test_uuid):
- self.fail("failed: PORT {} write/read mismatch".format(self.__port))
+ if self.__serial.readline() != test_uuid:
+ self.fail("failed: port {} write/read mismatch".format(self.__port))
diff --git a/test-cli/test/tests/qusb.py b/test-cli/test/tests/qusb.py
index 70265a5..7ecf31e 100644
--- a/test-cli/test/tests/qusb.py
+++ b/test-cli/test/tests/qusb.py
@@ -1,69 +1,46 @@
-from test.helpers.syscmd import SysCommand
+import sh
import unittest
+import re
+from test.helpers.changedir import changedir
class Qusb(unittest.TestCase):
- __numPorts = None
- __devLabel = None
params = None
def __init__(self, testname, testfunc, varlist):
self.params = varlist
super(Qusb, self).__init__(testfunc)
- if "numPorts" in varlist:
- self.__numPorts = varlist["numPorts"]
- else:
- raise Exception('numPorts param inside Qusb must be defined')
self._testMethodDoc = testname
- if "devLabel" in varlist:
- self.__devLabel = varlist["devLabel"]
- else:
- raise Exception('devLabel param inside Qusb must be defined')
- if testname == "USBOTG":
- self.__usbFileName = "/this_is_an_usb_otg"
- self.__usbtext = "USBOTG"
- else:
- self.__usbFileName = "/this_is_an_usb_host"
- self.__usbtext = "USBHOST"
- self.__numUsbFail = []
def execute(self):
- str_cmd = "lsblk -o LABEL"
- lsblk_command = SysCommand("lsblk", str_cmd)
- if lsblk_command.execute() == 0:
- self.__raw_out = lsblk_command.getOutput()
- if self.__raw_out == "":
- return -1
- lines = lsblk_command.getOutput().splitlines()
- host_list = []
- for i in range(len(lines)):
- if str(lines[i].decode('ascii')) == self.__devLabel:
- host_list.append(i)
- if len(host_list) == int(self.__numPorts):
- str_cmd = "lsblk -o MOUNTPOINT"
- lsblk_command = SysCommand("lsblk", str_cmd)
- if lsblk_command.execute() == 0:
- self.__raw_out = lsblk_command.getOutput()
- if self.__raw_out == "":
- print("failed: no command output")
- self.fail("failed: no command output")
- else:
- lines = lsblk_command.getOutput().splitlines()
- for i in range(len(host_list)):
- file_path = str(lines[host_list[i]].decode('ascii')) + self.__usbFileName
- usb_file = open(file_path, 'r')
- read = usb_file.read()
- if read.find(self.__usbtext) != -1:
- print(file_path + " --> OK!")
- else:
- self.fail(
- "failed: could not read from usb {}".format(lines[host_list[i]].decode('ascii')))
- self.__numUsbFail.append(host_list[i])
- usb_file.close()
- else:
- self.fail("failed: couldn't execute lsblk command")
-
+ # Execute script usb.sh
+ p = sh.bash('test/helpers/usb.sh')
+ # Search in the stdout a pattern "/dev/sd + {letter} + {number}
+ q = re.search("/dev/sd\w\d", p.stdout.decode('ascii'))
+ # get the first device which matches the pattern
+ device = q.group(0)
+ # create a new folder where the pendrive is going to be mounted
+ sh.mkdir("-p", "/mnt/pendrive")
+ # mount the device
+ p = sh.mount(device, "/mnt/pendrive")
+ if p.exit_code == 0:
+ # copy files
+ p = sh.cp("/root/usbtest/usbdatatest.bin", "/root/usbtest/usbdatatest.md5", "/mnt/pendrive")
+ if p.exit_code == 0:
+ # check md5
+ with changedir("/mnt/pendrive/"):
+ p = sh.md5sum("-c", "usbdatatest.md5")
+ q = re.search("OK", p.stdout.decode('ascii'))
+ # delete files
+ sh.rm("-f", "/mnt/pendrive/usbdatatest.bin", "/mnt/pendrive/usbdatatest.md5")
+ # umount
+ sh.umount("/mnt/pendrive")
+ # check result
+ if q is None:
+ self.fail("failed: wrong md5 result.")
else:
- self.fail("failed: reference and real usb host devices number mismatch")
+ # umount
+ sh.umount("/mnt/pendrive")
+ self.fail("failed: unable to copy files.")
else:
- self.fail("failed: couldn't execute lsblk command")
+ self.fail("failed: unable to mount the device.")