summaryrefslogtreecommitdiff
path: root/test-cli/test/tests/qduplex_ser.py
diff options
context:
space:
mode:
Diffstat (limited to 'test-cli/test/tests/qduplex_ser.py')
-rw-r--r--test-cli/test/tests/qduplex_ser.py69
1 files changed, 51 insertions, 18 deletions
diff --git a/test-cli/test/tests/qduplex_ser.py b/test-cli/test/tests/qduplex_ser.py
index 98bda81..170039b 100644
--- a/test-cli/test/tests/qduplex_ser.py
+++ b/test-cli/test/tests/qduplex_ser.py
@@ -1,46 +1,79 @@
-from test.helpers.syscmd import SysCommand
import unittest
import uuid
import serial
import time
+
class Qduplex(unittest.TestCase):
+ params = None
+ __port1 = None
+ __port2 = None
+ __baudrate = None
+ __resultlist = None # resultlist is a python list of python dictionaries
- def __init__(self, testname, testfunc, port1, port2, baudrate):
+ # varlist: port1, port2, baudrate
+ def __init__(self, testname, testfunc, varlist):
+ self.params = varlist
super(Qduplex, self).__init__(testfunc)
- self.__port1 = port1
- self.__serial1 = serial.Serial(self.__port1, timeout=1)
- self.__serial1.baudrate = baudrate
-
- self.__port2 = port2
- self.__serial2 = serial.Serial(self.__port2, timeout=1)
- self.__serial2.baudrate = baudrate
+ if "port1" in varlist:
+ self.__port1 = varlist["port1"]
+ else:
+ raise Exception('port1 param inside Qduplex must be defined')
+ if "port2" in varlist:
+ self.__port2 = varlist["port2"]
+ else:
+ raise Exception('port2 param inside Qduplex must be defined')
+ if "baudrate" in varlist:
+ self.__baudrate = varlist["baudrate"]
+ else:
+ raise Exception('baudrate param inside Qduplex must be defined')
self._testMethodDoc = testname
+ self.__resultlist = []
+
+ # open serial connection
+ self.__serial1 = serial.Serial(self.__port1, self.__baudrate, timeout=1)
+ self.__serial2 = serial.Serial(self.__port2, self.__baudrate, timeout=1)
def __del__(self):
self.__serial1.close()
self.__serial2.close()
def execute(self):
+ # generate a random uuid
+ test_uuid = str(uuid.uuid4()).encode()
+
+ # clean serial ports
self.__serial1.flushInput()
self.__serial1.flushOutput()
self.__serial2.flushInput()
self.__serial2.flushOutput()
- test_uuid = str(uuid.uuid4()).encode()
+ # send the uuid through serial port
self.__serial1.write(test_uuid)
time.sleep(0.05) # there might be a small delay
if self.__serial2.inWaiting() == 0:
- self.fail("failed: PORT {} wait timeout exceded, wrong communication?".format(self.__port2))
+ self.fail("failed: port {} wait timeout exceded".format(self.__port2))
else:
- if (self.__serial2.readline() != test_uuid):
- self.fail("failed: PORT {} write/read mismatch".format(self.__port2))
+ if self.__serial2.readline() != test_uuid:
+ self.fail("failed: port {} write/read mismatch".format(self.__port2))
- test_uuid = str(uuid.uuid4()).encode()
+ time.sleep(0.05) # there might be a small delay
+
+ # clean serial ports
+ self.__serial1.flushInput()
+ self.__serial1.flushOutput()
+ self.__serial2.flushInput()
+ self.__serial2.flushOutput()
+ # send the uuid through serial port
self.__serial2.write(test_uuid)
time.sleep(0.05) # there might be a small delay
if self.__serial1.inWaiting() == 0:
- self.fail("failed: PORT {} wait timeout exceded, wrong communication?".format(self.__port1))
+ self.fail("failed: port {} wait timeout exceded".format(self.__port1))
else:
- if (self.__serial1.readline() != test_uuid):
- self.fail("failed: PORT {} write/read mismatch".format(self.__port1))
-
+ if self.__serial1.readline() != test_uuid:
+ self.fail("failed: port {} write/read mismatch".format(self.__port1))
+
+ def getresults(self):
+ return self.__resultlist
+
+ def gettextresult(self):
+ return ""