summaryrefslogtreecommitdiff
path: root/test-cli
diff options
context:
space:
mode:
Diffstat (limited to 'test-cli')
-rw-r--r--test-cli/setup.xml2
-rw-r--r--test-cli/test/enums/StationStates.py20
-rw-r--r--test-cli/test/enums/__init__.py0
-rw-r--r--test-cli/test/helpers/testsrv_db.py18
-rw-r--r--test-cli/test/runners/simple.py4
-rw-r--r--test-cli/test/tests/qamp.py76
-rw-r--r--test-cli/test/tests/qamper.py64
-rw-r--r--test-cli/test/tests/qaudio.py39
-rw-r--r--test-cli/test/tests/qduplex_ser.py43
-rw-r--r--test-cli/test/tests/qeeprom.py37
-rw-r--r--test-cli/test/tests/qethernet.py47
-rw-r--r--test-cli/test/tests/qi2c.py31
-rw-r--r--test-cli/test/tests/qnand.py33
-rw-r--r--test-cli/test/tests/qram.py22
-rw-r--r--test-cli/test/tests/qrtc.py38
-rw-r--r--test-cli/test/tests/qscreen.py27
-rw-r--r--test-cli/test/tests/qserial.py29
-rw-r--r--test-cli/test/tests/qtemplate.py51
-rw-r--r--test-cli/test/tests/qusb.py56
-rw-r--r--test-cli/test/tests/qwifi.py76
-rw-r--r--test-cli/test_main.py226
21 files changed, 593 insertions, 346 deletions
diff --git a/test-cli/setup.xml b/test-cli/setup.xml
index 81f0919..42c4be9 100644
--- a/test-cli/setup.xml
+++ b/test-cli/setup.xml
@@ -1,7 +1,7 @@
<?xml version="1.0"?>
<data>
<setup>
- <db dbname="testsrv" type="PgSQLConnection" host="192.168.60.3" port="5432" user="admin" password="Idkfa2009" /> <!-- database setup -->
+ <db dbname="testsrv" type="PgSQLConnection" host="192.168.60.3" port="5432" user="admin" password="Idkfa2009" />
</setup>
</data>
diff --git a/test-cli/test/enums/StationStates.py b/test-cli/test/enums/StationStates.py
new file mode 100644
index 0000000..9de5e15
--- /dev/null
+++ b/test-cli/test/enums/StationStates.py
@@ -0,0 +1,20 @@
+from enum import Enum, unique
+
+
+@unique
+class StationStates(Enum):
+ UNDEFINED = 1000
+ FTP_KERNEL = 1
+ FTP_DTB = 2
+ MOUNT_NFS = 3
+ KERNEL_BOOT = 4
+ WAIT_TEST_START = 5
+ STATION_ERROR = 2000
+ TESTS_CHECKING_ENV = 6
+ TESTS_RUNNING = 10
+ TESTS_FAILED = 40
+ TESTS_OK = 30
+ EXTRATASKS_RUNNING = 100
+ WAITING_FOR_SCANNER = 50
+ FINISHED = 60
+
diff --git a/test-cli/test/enums/__init__.py b/test-cli/test/enums/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/test-cli/test/enums/__init__.py
diff --git a/test-cli/test/helpers/testsrv_db.py b/test-cli/test/helpers/testsrv_db.py
index dfb4d15..637d45c 100644
--- a/test-cli/test/helpers/testsrv_db.py
+++ b/test-cli/test/helpers/testsrv_db.py
@@ -202,9 +202,21 @@ class TestSrv_Database(object):
r = find_between(str(err), '#', '#')
# print(r)
return None
-
- def bond_to_station(self, uuid, station):
- sql = "SELECT station.bond_to_station('{}','{}')".format(uuid, station)
+
+ def read_station_state(self, station):
+ sql = "SELECT * FROM station.getmystate('{}');".format(station)
+ # print('>>>' + sql)
+ try:
+ res = self.__sqlObject.db_execute_query(sql)
+ # print(res)
+ return res[0][0]
+ except Exception as err:
+ r = find_between(str(err), '#', '#')
+ # print(r)
+ return None
+
+ def change_station_state(self, station, newstate):
+ sql = "SELECT station.setmystate('{}', '{}', NULL)".format(newstate, station)
# print('>>>' + sql)
try:
self.__sqlObject.db_execute_query(sql)
diff --git a/test-cli/test/runners/simple.py b/test-cli/test/runners/simple.py
index e0418d2..22fe449 100644
--- a/test-cli/test/runners/simple.py
+++ b/test-cli/test/runners/simple.py
@@ -79,8 +79,8 @@ class TextTestResult(unittest.TestResult):
self.__pgObj.upload_result_string(test.params["testidctl"], test.params["testid"],
result["desc"], result["data"])
elif result["type"] == "file":
- self.__pgObj.upload_result_file(test.params["testidctl"], test.params["testid"], result["desc"],
- result["data"])
+ self.__pgObj.upload_result_file(test.params["testidctl"], test.params["testid"],
+ result["desc"], result["data"])
# SEND TO DB THE RESULT OF THE TEST
if self.result == self.PASS:
status = "TEST_COMPLETE"
diff --git a/test-cli/test/tests/qamp.py b/test-cli/test/tests/qamp.py
deleted file mode 100644
index 56511c8..0000000
--- a/test-cli/test/tests/qamp.py
+++ /dev/null
@@ -1,76 +0,0 @@
-from test.helpers.syscmd import SysCommand
-import unittest
-import serial
-import time
-
-
-class Qamp(unittest.TestCase):
- params = None
-
- # varlist: undercurrent, overcurrent
- def __init__(self, testname, testfunc, varlist):
- self.params = varlist
- self._current = 0.0
- if "undercurrent" in varlist:
- self._undercurrent = varlist["undercurrent"]
- else:
- raise Exception('undercurrent param inside Qamp must be defined')
- if "overcurrent" in varlist:
- self._overcurrent = varlist["overcurrent"]
- else:
- raise Exception('overcurrent param inside Qamp must be defined')
- self._vshuntfactor = 16384.0
- self._ser = serial.Serial()
- self._ser.port = '/dev/ttyUSB0'
- self._ser.baudrate = 115200
- self._ser.parity = serial.PARITY_NONE
- self._ser.timeout = 0
- self._ser.writeTimout = 0
- self._ser.xonxoff = False
- self._ser.rtscts = False
- self._ser.dsrdtr = False
- super(Qamp, self).__init__(testfunc)
- self._testMethodDoc = testname
-
- def __del__(self):
- self._ser.close()
-
- def execute(self):
-
- # Open Serial port ttyUSB0
- error = 0
- try:
- self._ser.open()
- except:
- self.fail("failed: IMPOSSIBLE OPEN USB-SERIAL PORT ( {} )".format(self._ser.port))
- error = 1
- if error == 0:
- # Clean input and output buffer of serial port
- self._ser.flushInput()
- self._ser.flushOutput()
- # Send command to read Voltage at Shunt resistor
- # Prepare cmd
- cmd = 'at+in?\n\r'
- i = 0
-
- # Write, 1 by 1 byte at a time to avoid hanging of serial receiver code of listener, emphasis being made at sleep of 50 ms.
- while (i < len(cmd)):
- i = i + self._ser.write(cmd[i].encode('ascii'))
- time.sleep(0.05)
- self._ser.read(1)
-
- # Read, 1 by 1 byte
- res = []
- while self._ser.inWaiting() > 0: # if incoming bytes are waiting to be read from the serial input buffer
- res.append(self._ser.read(1).decode('ascii')) # read the bytes and convert from binary array to ASCII
-
- string = ''.join(res)
- string = string.replace('\n', '')
- self._current = float(int(string, 0)) / self._vshuntfactor
- print(self._current)
- # In order to give a valid result it is importarnt to define an under current value
- if self._current > float(self._overcurrent):
- self.fail("failed: OVERCURRENT DETECTED ( {} )".format(self._current))
-
- if self._current < float(self._undercurrent):
- self.fail("failed: UNDERCURRENT DETECTED ( {} )".format(self._current))
diff --git a/test-cli/test/tests/qamper.py b/test-cli/test/tests/qamper.py
index 51aa469..7a31615 100644
--- a/test-cli/test/tests/qamper.py
+++ b/test-cli/test/tests/qamper.py
@@ -4,7 +4,7 @@ from test.helpers.amper import Amper
class Qamper(unittest.TestCase):
params = None
- __current = None
+ __resultlist = None # resultlist is a python list of python dictionaries
# varlist: undercurrent, overcurrent
def __init__(self, testname, testfunc, varlist):
@@ -20,36 +20,66 @@ class Qamper(unittest.TestCase):
else:
raise Exception('overcurrent param inside Qamp must be defined')
self._testMethodDoc = testname
+ self.__resultlist = []
def execute(self):
amp = Amper()
# open serial connection
if not amp.open():
- self.fail("failed: can not open serial port")
- return
+ self.__resultlist.append(
+ {
+ "desc": "Test result",
+ "data": "FAILED: can not open a serial port",
+ "type": "string"
+ }
+ )
+ self.fail("Error: can not open a serial port")
# check if the amperimeter is connected and working
# 2 ATTEMTS
if not amp.hello():
if not amp.hello():
- self.fail("failed: can not communicate")
- return
+ self.__resultlist.append(
+ {
+ "desc": "Test result",
+ "data": "FAILED: can not communicate with the amperimeter",
+ "type": "string"
+ }
+ )
+ self.fail("Error: can not communicate")
# get current value (in Amperes)
- self.__current = amp.getCurrent()
+ current = amp.getCurrent()
# close serial connection
amp.close()
# Check current range
- if float(self.__current) > float(self._overcurrent):
- self.fail("failed: Overcurrent detected ( {} )".format(self.__current))
- if float(self.__current) < float(self._undercurrent):
- self.fail("failed: Undercurrent detected ( {} )".format(self.__current))
+ if float(current) > float(self._overcurrent):
+ # Overcurrent detected
+ self.__resultlist.append(
+ {
+ "desc": "Test result",
+ "data": "FAILED: Overcurrent detected ( {} A)".format(current),
+ "type": "string"
+ }
+ )
+ self.fail("failed: Overcurrent detected ( {} )".format(current))
+ elif float(current) < float(self._undercurrent):
+ # Undercurrent detected
+ self.__resultlist.append(
+ {
+ "desc": "Test result",
+ "data": "FAILED: Undercurrent detected ( {} A)".format(current),
+ "type": "string"
+ }
+ )
+ self.fail("failed: Undercurrent detected ( {} )".format(current))
- def getresults(self):
- # resultlist is a python list of python dictionaries
- resultlist = [
+ # Test successful
+ self.__resultlist.append(
{
- "desc": "current (Ampers)",
- "data": self.__current,
+ "desc": "Test result",
+ "data": "OK: Current {} A".format(current),
"type": "string"
}
- ]
- return resultlist
+ )
+
+ def getresults(self):
+ return self.__resultlist
diff --git a/test-cli/test/tests/qaudio.py b/test-cli/test/tests/qaudio.py
deleted file mode 100644
index ef4da67..0000000
--- a/test-cli/test/tests/qaudio.py
+++ /dev/null
@@ -1,39 +0,0 @@
-from test.helpers.syscmd import SysCommand
-import unittest
-
-
-class Qaudio(unittest.TestCase):
- params = None
-
- def __init__(self, testname, testfunc, varlist):
- self.params = varlist
- super(Qaudio, self).__init__(testfunc)
- self._testMethodDoc = testname
- if "dtmfFile" in varlist:
- self._dtmfFile = varlist["dtmfFile"]
- else:
- raise Exception('undercurrent param inside Qamp must be defined')
- self.__sum = 0
- self.__refSum = 25 # 1+3+5+7+9
-
- def execute(self):
- str_cmd = "aplay test/files/dtmf-13579.wav & arecord -r 8000 -d 1 recorded.wav" # .format(self.__dtmfFile)
- audio_loop = SysCommand("command-name", str_cmd)
- if audio_loop.execute() == -1: # BUG: Returns -1 but work
- lines = audio_loop.getOutput().splitlines()
- str_cmd = "multimon -t wav -a DTMF recorded.wav -q"
- dtmf_decoder = SysCommand("command-name", str_cmd)
- a = dtmf_decoder.execute()
- if dtmf_decoder.execute() == -1: # BUG: Returns -1 but work
- self.__raw_out = dtmf_decoder.getOutput()
- if self.__raw_out == "":
- self.fail("failed: can not execute multimon command")
- lines = dtmf_decoder.getOutput().splitlines()
- for i in range(0, 5):
- aux = [int(s) for s in lines[i].split() if s.isdigit()]
- self.__sum = self.__sum + aux[0]
- self.failUnless(self.__sum == self.__refSum), "failed: incorrect dtmf code" + str(self.__sum)
- else:
- self.fail("failed: fail reading recorded file")
- else:
- self.fail("failed: fail playing/recording file")
diff --git a/test-cli/test/tests/qduplex_ser.py b/test-cli/test/tests/qduplex_ser.py
index c7231c2..020844a 100644
--- a/test-cli/test/tests/qduplex_ser.py
+++ b/test-cli/test/tests/qduplex_ser.py
@@ -9,6 +9,7 @@ class Qduplex(unittest.TestCase):
__port1 = None
__port2 = None
__baudrate = None
+ __resultlist = None # resultlist is a python list of python dictionaries
# varlist: port1, port2, baudrate
def __init__(self, testname, testfunc, varlist):
@@ -27,6 +28,7 @@ class Qduplex(unittest.TestCase):
else:
raise Exception('baudrate param inside Qduplex must be defined')
self._testMethodDoc = testname
+ self.__resultlist = []
# open serial connection
self.__serial1 = serial.Serial(self.__port1, self.__baudrate, timeout=1)
@@ -49,9 +51,23 @@ class Qduplex(unittest.TestCase):
self.__serial1.write(test_uuid)
time.sleep(0.05) # there might be a small delay
if self.__serial2.inWaiting() == 0:
+ self.__resultlist.append(
+ {
+ "desc": "Test result",
+ "data": "FAILED: port {} wait timeout exceded".format(self.__port2),
+ "type": "string"
+ }
+ )
self.fail("failed: port {} wait timeout exceded".format(self.__port2))
else:
if self.__serial2.readline() != test_uuid:
+ self.__resultlist.append(
+ {
+ "desc": "Test result",
+ "data": "FAILED: port {} write/read mismatch".format(self.__port2),
+ "type": "string"
+ }
+ )
self.fail("failed: port {} write/read mismatch".format(self.__port2))
time.sleep(0.05) # there might be a small delay
@@ -65,12 +81,33 @@ class Qduplex(unittest.TestCase):
self.__serial2.write(test_uuid)
time.sleep(0.05) # there might be a small delay
if self.__serial1.inWaiting() == 0:
+ self.__resultlist.append(
+ {
+ "desc": "Test result",
+ "data": "FAILED: port {} wait timeout exceded".format(self.__port1),
+ "type": "string"
+ }
+ )
self.fail("failed: port {} wait timeout exceded".format(self.__port1))
else:
if self.__serial1.readline() != test_uuid:
+ self.__resultlist.append(
+ {
+ "desc": "Test result",
+ "data": "FAILED: port {} write/read mismatch".format(self.__port1),
+ "type": "string"
+ }
+ )
self.fail("failed: port {} write/read mismatch".format(self.__port1))
+ # Test successful
+ self.__resultlist.append(
+ {
+ "desc": "Test OK",
+ "data": "OK",
+ "type": "string"
+ }
+ )
+
def getresults(self):
- # resultlist is a python list of python dictionaries
- resultlist = []
- return resultlist
+ return self.__resultlist
diff --git a/test-cli/test/tests/qeeprom.py b/test-cli/test/tests/qeeprom.py
index a65ca97..cafbc7f 100644
--- a/test-cli/test/tests/qeeprom.py
+++ b/test-cli/test/tests/qeeprom.py
@@ -6,12 +6,14 @@ class Qeeprom(unittest.TestCase):
params = None
__position = None
__eeprompath = None
+ __resultlist = None # resultlist is a python list of python dictionaries
# varlist: position, eeprompath
def __init__(self, testname, testfunc, varlist):
self.params = varlist
super(Qeeprom, self).__init__(testfunc)
self._testMethodDoc = testname
+ self.__resultlist = []
if "position" in varlist:
self.__position = varlist["position"]
else:
@@ -33,13 +35,42 @@ class Qeeprom(unittest.TestCase):
data_rx = p.stdout.decode('ascii')
# compare both values
if data_rx != data_tx:
+ # Both data are different
+ self.__resultlist.append(
+ {
+ "desc": "Test result",
+ "data": "FAILED: mismatch between written and received values.",
+ "type": "string"
+ }
+ )
self.fail("failed: mismatch between written and received values.")
else:
+ self.__resultlist.append(
+ {
+ "desc": "Test result",
+ "data": "FAILED: Unable to read from the EEPROM device.",
+ "type": "string"
+ }
+ )
self.fail("failed: Unable to read from the EEPROM device.")
else:
+ self.__resultlist.append(
+ {
+ "desc": "Test result",
+ "data": "FAILED: Unable to write on the EEPROM device.",
+ "type": "string"
+ }
+ )
self.fail("failed: Unable to write on the EEPROM device.")
+ # Test successful
+ self.__resultlist.append(
+ {
+ "desc": "Test result",
+ "data": "OK",
+ "type": "string"
+ }
+ )
+
def getresults(self):
- # resultlist is a python list of python dictionaries
- resultlist = []
- return resultlist
+ return self.__resultlist
diff --git a/test-cli/test/tests/qethernet.py b/test-cli/test/tests/qethernet.py
index d7354bf..878c0a0 100644
--- a/test-cli/test/tests/qethernet.py
+++ b/test-cli/test/tests/qethernet.py
@@ -11,6 +11,7 @@ class Qethernet(unittest.TestCase):
__port = None
params = None
__bwreal = None
+ __resultlist = None # resultlist is a python list of python dictionaries
# varlist content: serverip, bwexpected, port
def __init__(self, testname, testfunc, varlist):
@@ -34,6 +35,7 @@ class Qethernet(unittest.TestCase):
raise Exception('port param inside Qethernet must be defined')
self.__numbytestx = "10M"
self._testMethodDoc = testname
+ self.__resultlist = []
def execute(self):
# execute iperf command against the server
@@ -53,20 +55,43 @@ class Qethernet(unittest.TestCase):
# save result file
with open('/tmp/ethernet-iperf.json', 'w') as outfile:
json.dump(data, outfile, indent=4)
+ self.__resultlist.append(
+ {
+ "desc": "iperf3 output",
+ "data": "/tmp/ethernet-iperf.json",
+ "type": "file"
+ }
+ )
# check if BW is in the expected range
- self.failUnless(self.__bwreal > float(self.__bwexpected) * 0.9,
- "failed: speed is lower than spected. Speed(Mbits/s)" + str(self.__bwreal))
+ if self.__bwreal < float(self.__bwexpected):
+ self.__resultlist.append(
+ {
+ "desc": "Test result",
+ "data": "FAILED: speed is lower than spected. Speed(Mbits/s): " + str(self.__bwreal),
+ "type": "string"
+ }
+ )
+ self.fail("failed: speed is lower than spected. Speed(Mbits/s): " + str(self.__bwreal))
else:
- self.fail("failed: could not complete iperf command")
+ self.__resultlist.append(
+ {
+ "desc": "Test result",
+ "data": "FAILED: could not complete iperf command.",
+ "type": "string"
+ }
+ )
+ self.fail("failed: could not complete iperf command.")
- def getresults(self):
- # resultlist is a python list of python dictionaries
- resultlist = [
+ # Test successful
+ self.__resultlist.append(
{
- "desc": "iperf3 output",
- "data": "/tmp/ethernet-iperf.json",
- "type": "file"
+ "desc": "Test result",
+ "data": "OK",
+ "type": "string"
}
- ]
- return resultlist
+ )
+
+ def getresults(self):
+
+ return self.__resultlist
diff --git a/test-cli/test/tests/qi2c.py b/test-cli/test/tests/qi2c.py
index ad7ddf0..3afedfa 100644
--- a/test-cli/test/tests/qi2c.py
+++ b/test-cli/test/tests/qi2c.py
@@ -4,6 +4,7 @@ import unittest
class Qi2c(unittest.TestCase):
params = None
+ __resultlist = None # resultlist is a python list of python dictionaries
def __init__(self, testname, testfunc, varlist):
self.params = varlist
@@ -18,6 +19,7 @@ class Qi2c(unittest.TestCase):
raise Exception('register param inside Qi2c must be defined')
self.__devices = []
self._testMethodDoc = testname
+ self.__resultlist = []
def execute(self):
str_cmd = "i2cdetect -a -y -r {}".format(self.__busnum)
@@ -34,11 +36,32 @@ class Qi2c(unittest.TestCase):
"0x{}{}".format((i - 1), hex(int((lines[i].find("UU") - 4) / 3)).split('x')[-1]))
for i in range(len(self.__register)):
if not (self.__register[i] in self.__devices):
+ self.__resultlist.append(
+ {
+ "desc": "Test result",
+ "data": "FAILED: device {} not found in bus i2c-{}".format(self.__register[i], self.__busnum),
+ "type": "string"
+ }
+ )
self.fail("failed: device {} not found in bus i2c-{}".format(self.__register[i], self.__busnum))
else:
- self.fail("failed: could not complete i2cdedtect command")
+ self.__resultlist.append(
+ {
+ "desc": "Test result",
+ "data": "FAILED: could not complete i2cdedtect command",
+ "type": "string"
+ }
+ )
+ self.fail("failed: could not complete i2cdedtect command.")
+
+ # Test successful
+ self.__resultlist.append(
+ {
+ "desc": "Test result",
+ "data": "OK",
+ "type": "string"
+ }
+ )
def getresults(self):
- # resultlist is a python list of python dictionaries
- resultlist = []
- return resultlist
+ return self.__resultlist
diff --git a/test-cli/test/tests/qnand.py b/test-cli/test/tests/qnand.py
index 7ff7cb2..d7c22b3 100644
--- a/test-cli/test/tests/qnand.py
+++ b/test-cli/test/tests/qnand.py
@@ -5,6 +5,7 @@ import sh
class Qnand(unittest.TestCase):
params = None
__device = "10M"
+ __resultlist = None # resultlist is a python list of python dictionaries
# varlist: device
def __init__(self, testname, testfunc, varlist):
@@ -15,6 +16,7 @@ class Qnand(unittest.TestCase):
else:
raise Exception('device param inside Qnand must be defined')
self._testMethodDoc = testname
+ self.__resultlist = []
def execute(self):
try:
@@ -22,17 +24,32 @@ class Qnand(unittest.TestCase):
# save result
with open('/tmp/nand-nandtest.txt', 'w') as outfile:
n = outfile.write(p.stdout.decode('ascii'))
+ self.__resultlist.append(
+ {
+ "desc": "nandtest output",
+ "data": "/tmp/nand-nandtest.txt",
+ "type": "file"
+ }
+ )
except sh.ErrorReturnCode as e:
+ self.__resultlist.append(
+ {
+ "desc": "Test result",
+ "data": "FAILED: could not complete nandtest command",
+ "type": "string"
+ }
+ )
self.fail("failed: could not complete nandtest command")
- def getresults(self):
- # resultlist is a python list of python dictionaries
- resultlist = [
+ # Test successful
+ self.__resultlist.append(
{
- "desc": "nandtest output",
- "data": "/tmp/nand-nandtest.txt",
- "type": "file"
+ "desc": "Test result",
+ "data": "OK",
+ "type": "string"
}
- ]
- return resultlist
+ )
+
+ def getresults(self):
+ return self.__resultlist
diff --git a/test-cli/test/tests/qram.py b/test-cli/test/tests/qram.py
index 561e980..a46424f 100644
--- a/test-cli/test/tests/qram.py
+++ b/test-cli/test/tests/qram.py
@@ -6,6 +6,7 @@ class Qram(unittest.TestCase):
params = None
__memsize = "10M"
__loops = "1"
+ __resultlist = None # resultlist is a python list of python dictionaries
# varlist: memsize, loops
def __init__(self, testname, testfunc, varlist):
@@ -20,15 +21,30 @@ class Qram(unittest.TestCase):
else:
raise Exception('loops param inside Qram must be defined')
self._testMethodDoc = testname
+ self.__resultlist = []
def execute(self):
try:
p = sh.memtester(self.__memsize, "1", _out="/dev/null")
except sh.ErrorReturnCode as e:
+ self.__resultlist.append(
+ {
+ "desc": "Test result",
+ "data": "FAILED: could not complete memtester command",
+ "type": "string"
+ }
+ )
self.fail("failed: could not complete memtester command")
+ # Test successful
+ self.__resultlist.append(
+ {
+ "desc": "Test result",
+ "data": "OK",
+ "type": "string"
+ }
+ )
+
def getresults(self):
- # resultlist is a python list of python dictionaries
- resultlist = []
- return resultlist
+ return self.__resultlist
diff --git a/test-cli/test/tests/qrtc.py b/test-cli/test/tests/qrtc.py
index 715bcb7..6d4ecf6 100644
--- a/test-cli/test/tests/qrtc.py
+++ b/test-cli/test/tests/qrtc.py
@@ -7,6 +7,7 @@ import re
class Qrtc(unittest.TestCase):
params = None
__rtc = None
+ __resultlist = None # resultlist is a python list of python dictionaries
def __init__(self, testname, testfunc, varlist):
self.params = varlist
@@ -16,6 +17,7 @@ class Qrtc(unittest.TestCase):
else:
raise Exception('rtc param inside Qrtc must be defined')
self._testMethodDoc = testname
+ self.__resultlist = []
def execute(self):
# get time from RTC peripheral
@@ -31,13 +33,41 @@ class Qrtc(unittest.TestCase):
p.stdout.decode('ascii'))
# check if the seconds of both times are different
if time1 == time2:
+ self.__resultlist.append(
+ {
+ "desc": "Test result",
+ "data": "FAILED: RTC is not running",
+ "type": "string"
+ }
+ )
self.fail("failed: RTC is not running")
else:
- self.fail("failed: couldn't execute hwclock command 2nd time")
+ self.__resultlist.append(
+ {
+ "desc": "Test result",
+ "data": "FAILED: couldn't execute hwclock command for the second time",
+ "type": "string"
+ }
+ )
+ self.fail("failed: couldn't execute hwclock command for the second time")
else:
+ self.__resultlist.append(
+ {
+ "desc": "Test result",
+ "data": "FAILED: couldn't execute hwclock command",
+ "type": "string"
+ }
+ )
self.fail("failed: couldn't execute hwclock command")
+ # Test successful
+ self.__resultlist.append(
+ {
+ "desc": "Test result",
+ "data": "OK",
+ "type": "string"
+ }
+ )
+
def getresults(self):
- # resultlist is a python list of python dictionaries
- resultlist = []
- return resultlist
+ return self.__resultlist
diff --git a/test-cli/test/tests/qscreen.py b/test-cli/test/tests/qscreen.py
deleted file mode 100644
index aaee628..0000000
--- a/test-cli/test/tests/qscreen.py
+++ /dev/null
@@ -1,27 +0,0 @@
-from test.helpers.syscmd import SysCommand
-import unittest
-from test.helpers.cv_display_test import pattern_detect
-
-
-class Qscreen(unittest.TestCase):
- params = None
-
- #varlist: display
- def __init__(self, testname, testfunc, varlist):
- self.params = varlist
- super(Qscreen, self).__init__(testfunc)
- if "display" in varlist:
- self.__display = varlist["display"]
- else:
- raise Exception('display param inside Qscreen have been be defined')
- self._testMethodDoc = testname
-
- def execute(self):
- str_cmd = "fbi -T 1 --noverbose -d /dev/{} test/files/test_pattern.png".format(self.__display)
- display_image = SysCommand("display_image", str_cmd)
- if display_image.execute() == -1:
- test_screen = pattern_detect(1)
- if not test_screen=="0":
- self.fail("failed: {}".format(test_screen))
- else:
- self.fail("failed: could not display the image")
diff --git a/test-cli/test/tests/qserial.py b/test-cli/test/tests/qserial.py
index 0cb5563..faca505 100644
--- a/test-cli/test/tests/qserial.py
+++ b/test-cli/test/tests/qserial.py
@@ -8,6 +8,7 @@ class Qserial(unittest.TestCase):
params = None
__port = None
__baudrate = None
+ __resultlist = None # resultlist is a python list of python dictionaries
# varlist: port, baudrate
def __init__(self, testname, testfunc, varlist):
@@ -23,6 +24,7 @@ class Qserial(unittest.TestCase):
else:
raise Exception('baudrate param inside Qserial must be defined')
self._testMethodDoc = testname
+ self.__resultlist = []
# open serial connection
self.__serial = serial.Serial(self.__port, self.__baudrate, timeout=1)
@@ -40,13 +42,34 @@ class Qserial(unittest.TestCase):
self.__serial.write(test_uuid)
time.sleep(0.05) # there might be a small delay
if self.__serial.inWaiting() == 0:
+ self.__resultlist.append(
+ {
+ "desc": "Test result",
+ "data": "FAILED: port {} wait timeout exceded".format(self.__port),
+ "type": "string"
+ }
+ )
self.fail("failed: port {} wait timeout exceded".format(self.__port))
else:
# check if what it was sent is equal to what has been received
if self.__serial.readline() != test_uuid:
+ self.__resultlist.append(
+ {
+ "desc": "Test result",
+ "data": "FAILED: port {} write/read mismatch".format(self.__port),
+ "type": "string"
+ }
+ )
self.fail("failed: port {} write/read mismatch".format(self.__port))
+ # Test successful
+ self.__resultlist.append(
+ {
+ "desc": "Test result",
+ "data": "OK",
+ "type": "string"
+ }
+ )
+
def getresults(self):
- # resultlist is a python list of python dictionaries
- resultlist = []
- return resultlist
+ return self.__resultlist
diff --git a/test-cli/test/tests/qtemplate.py b/test-cli/test/tests/qtemplate.py
deleted file mode 100644
index 940cded..0000000
--- a/test-cli/test/tests/qtemplate.py
+++ /dev/null
@@ -1,51 +0,0 @@
-#IF COMMAND IS NEEDED
-from test.helpers.syscmd import SysCommand
-import unittest
-#class name
-class Qtemplate(unittest.TestCase):
- # Initialize the variables
- __variable1 = "Value-a"
- __variable2 = "Value-b"
- #....
- __variablen = "Value-n"
-
- def __init__(self, testname, testfunc, input1=None, inputn=None):
- # Doing this we will initialize the class and later on perform a particular method inside this class
- super(Qtemplate, self).__init__(testfunc)
- self.__testname = testname
- self.__input1 = input1
- self.__inputn = inputn
- self._testMethodDoc = testname
-
-
-
- def execute(self):
- str_cmd = "command"
- t = SysCommand("command-name", str_cmd)
- if t.execute() == 0:
- self.__raw_out = t.getOutput()
- if self.__raw_out == "":
- return -1
- lines = t.getOutput().splitlines()
- dat = lines[1]
- dat = dat.decode('ascii')
- dat_list = dat.split( )
- for d in dat_list:
- a = dat_list.pop(0)
- if a == "sec":
- break
- self.__MB_real = dat_list[0]
- self.__BW_real = dat_list[2]
- self.__dat_list = dat_list
- print(self.__MB_real)
- print(self.__BW_real)
- self.failUnless(int(self.__BW_real)>int(self.__OKBW)*0.9,"FAIL:BECAUSE...")
- else:
- return -1
- return 0
-
- def get_Total_MB(self):
- return self.__MB_real;
-
- def get_Total_BW(self):
- return self.__MB_real;
diff --git a/test-cli/test/tests/qusb.py b/test-cli/test/tests/qusb.py
index 316cef5..d952afc 100644
--- a/test-cli/test/tests/qusb.py
+++ b/test-cli/test/tests/qusb.py
@@ -7,11 +7,13 @@ from test.helpers.changedir import changedir
class Qusb(unittest.TestCase):
params = None
+ __resultlist = None # resultlist is a python list of python dictionaries
def __init__(self, testname, testfunc, varlist):
self.params = varlist
super(Qusb, self).__init__(testfunc)
self._testMethodDoc = testname
+ self.__resultlist = []
def execute(self):
# Execute script usb.sh
@@ -19,9 +21,19 @@ class Qusb(unittest.TestCase):
test_abspath = os.path.join(test_abspath, "..", "..")
p = sh.bash(os.path.join(test_abspath, 'test/scripts/usb.sh'))
# Search in the stdout a pattern "/dev/sd + {letter} + {number}
- q = re.search("/dev/sd\w\d", p.stdout.decode('ascii'))
+ match = re.search("/dev/sd\w\d", p.stdout.decode('ascii'))
# get the first device which matches the pattern
- device = q.group(0)
+ if match:
+ device = match.group(0)
+ else:
+ self.__resultlist.append(
+ {
+ "desc": "Test result",
+ "data": "FAILED: No pendrive found",
+ "type": "string"
+ }
+ )
+ self.fail("failed: No pendrive found.")
# create a new folder where the pendrive is going to be mounted
sh.mkdir("-p", "/mnt/pendrive")
# check if the device is mounted, and umount it
@@ -50,15 +62,43 @@ class Qusb(unittest.TestCase):
sh.umount("/mnt/pendrive")
# check result
if q is None:
- self.fail("failed: wrong md5 result.")
+ self.__resultlist.append(
+ {
+ "desc": "Test result",
+ "data": "FAILED: Wrong md5 result",
+ "type": "string"
+ }
+ )
+ self.fail("failed: Wrong md5 result.")
else:
# umount
sh.umount("/mnt/pendrive")
- self.fail("failed: unable to copy files.")
+ self.__resultlist.append(
+ {
+ "desc": "Test result",
+ "data": "FAILED: Unable to copy files to the USB memory device",
+ "type": "string"
+ }
+ )
+ self.fail("failed: Unable to copy files to the USB memory device.")
else:
- self.fail("failed: unable to mount the device.")
+ self.__resultlist.append(
+ {
+ "desc": "Test result",
+ "data": "FAILED: Unable to mount the USB memory device",
+ "type": "string"
+ }
+ )
+ self.fail("failed: Unable to mount the USB memory device.")
+
+ # Test successful
+ self.__resultlist.append(
+ {
+ "desc": "Test result",
+ "data": "OK",
+ "type": "string"
+ }
+ )
def getresults(self):
- # resultlist is a python list of python dictionaries
- resultlist = []
- return resultlist
+ return self.__resultlist
diff --git a/test-cli/test/tests/qwifi.py b/test-cli/test/tests/qwifi.py
index a5b66e9..6f972d3 100644
--- a/test-cli/test/tests/qwifi.py
+++ b/test-cli/test/tests/qwifi.py
@@ -11,6 +11,7 @@ class Qwifi(unittest.TestCase):
__port = None
params = None
__bwreal = None
+ __resultlist = None # resultlist is a python list of python dictionaries
# varlist content: serverip, bwexpected, port
def __init__(self, testname, testfunc, varlist):
@@ -30,6 +31,7 @@ class Qwifi(unittest.TestCase):
raise Exception('port param inside Qwifi must be defined')
self.__numbytestx = "10M"
self._testMethodDoc = testname
+ self.__resultlist = []
def execute(self):
# check if the board is connected to the router by wifi
@@ -63,28 +65,78 @@ class Qwifi(unittest.TestCase):
# save result file
with open('/tmp/wifi-iperf.json', 'w') as outfile:
json.dump(data, outfile, indent=4)
+ self.__resultlist.append(
+ {
+ "desc": "iperf3 output",
+ "data": "/tmp/wifi-iperf.json",
+ "type": "file"
+ }
+ )
# check if BW is in the expected range
- self.failUnless(self.__bwreal > float(self.__bwexpected) * 0.9,
- "failed: speed is lower than spected. Speed(Mbits/s)" + str(self.__bwreal))
+ if self.__bwreal < float(self.__bwexpected):
+ self.__resultlist.append(
+ {
+ "desc": "Test result",
+ "data": "FAILED: speed is lower than expected. Speed(Mbits/s): " + str(self.__bwreal),
+ "type": "string"
+ }
+ )
+ self.fail("failed: speed is lower than expected. Speed(Mbits/s): " + str(self.__bwreal))
else:
- self.fail("failed: could not complete iperf command")
+ self.__resultlist.append(
+ {
+ "desc": "Test result",
+ "data": "FAILED: could not complete iperf3 command",
+ "type": "string"
+ }
+ )
+ self.fail("failed: could not complete iperf3 command")
else:
+ self.__resultlist.append(
+ {
+ "desc": "Test result",
+ "data": "FAILED: wlan0 interface doesn't have any ip address",
+ "type": "string"
+ }
+ )
self.fail("failed: wlan0 interface doesn't have any ip address.")
else:
+ self.__resultlist.append(
+ {
+ "desc": "Test result",
+ "data": "FAILED: could not complete ifconfig command",
+ "type": "string"
+ }
+ )
self.fail("failed: could not complete ifconfig command.")
else:
+ self.__resultlist.append(
+ {
+ "desc": "Test result",
+ "data": "FAILED: wifi module is not connected to the router",
+ "type": "string"
+ }
+ )
self.fail("failed: wifi module is not connected to the router.")
else:
- self.fail("failed: couldn't execute iw command")
+ self.__resultlist.append(
+ {
+ "desc": "Test result",
+ "data": "FAILED: could not execute iw command",
+ "type": "string"
+ }
+ )
+ self.fail("failed: could not execute iw command")
- def getresults(self):
- # resultlist is a python list of python dictionaries
- resultlist = [
+ # Test successful
+ self.__resultlist.append(
{
- "desc": "iperf3 output",
- "data": "/tmp/wifi-iperf.json",
- "type": "file"
+ "desc": "Test result",
+ "data": "OK",
+ "type": "string"
}
- ]
- return resultlist
+ )
+
+ def getresults(self):
+ return self.__resultlist
diff --git a/test-cli/test_main.py b/test-cli/test_main.py
index 3345f53..0df9bd4 100644
--- a/test-cli/test_main.py
+++ b/test-cli/test_main.py
@@ -6,7 +6,6 @@ import unittest
from test.helpers.testsrv_db import TestSrv_Database
from test.helpers.setup_xml import XMLSetup
from test.runners.simple import SimpleTestRunner
-from test.helpers.syscmd import TestSysCommand
from test.tests.qethernet import Qethernet
from test.tests.qram import Qram
from test.tests.qusb import Qusb
@@ -27,6 +26,8 @@ from test.tasks.flashmemory import flash_memory
from test.helpers.qrreader import QRReader
from test.tasks.generatedmesg import generate_dmesg
from test.helpers.cmdline import LinuxKernel
+from test.helpers.amper import Amper
+from test.enums.StationStates import StationStates
# global variables
psdbObj = TestSrv_Database()
@@ -103,13 +104,18 @@ def create_board():
variant = cmd.getkvar("bvariant", "none")
if model_id == "none" or variant == "none":
- return 1
+ print("Error: Cannot open DB connection")
+ loggerObj.getlogger().info("Station error: #Cannot get model and variant information#")
+ exit(0)
# get model id
globalVar.g_mid = model_id + "-" + variant
# get station
- globalVar.station = socket.gethostname()
+ if "Station" not in globalVar.station:
+ print("Error: Wrong Station name")
+ loggerObj.getlogger().info("Station error: #Wrong station name#")
+ exit(0)
processor_id = get_die_id(globalVar.g_mid)
print(globalVar.g_mid)
@@ -117,8 +123,6 @@ def create_board():
globalVar.g_uuid = psdbObj.create_board(processor_id, model_id, variant, globalVar.station,
get_mac(globalVar.g_mid))
print(globalVar.g_uuid)
- psdbObj.bond_to_station(globalVar.g_uuid, globalVar.station)
- return 0
def get_taskvars_list(uuid):
@@ -131,67 +135,97 @@ def get_taskvars_list(uuid):
def main():
# initialize the board
- res = create_board()
- if res:
- loggerObj.getlogger().info("Python program error")
+ create_board()
+
+ # create a process
+ globalVar.testid_ctl = psdbObj.open_test(globalVar.g_uuid)
+ # create and run tests according to the board type
+ runner = SimpleTestRunner(psdbObj)
+ # Change state to "TESTS_RUNNING"
+ if psdbObj.read_station_state(globalVar.station) == StationStates.STATION_ERROR.name:
+ # Abort
+ print("Error: Wrong previous station state.")
+ exit(0)
+ psdbObj.change_station_state(globalVar.station, StationStates.TESTS_RUNNING.name)
+ # Execute tests
+ testresult = runner.run(create_testsuite())
+ # save dmesg at the end of the tests
+ generate_dmesg(globalVar.testid_ctl, psdbObj)
+
+ # execute aditional tasks, only if the test was succesfull
+ if testresult.wasSuccessful():
+ # Change state to "TESTS_OK"
+ if psdbObj.read_station_state(globalVar.station) == StationStates.STATION_ERROR.name:
+ # Abort
+ print("Error: Wrong previous station state.")
+ exit(0)
+ psdbObj.change_station_state(globalVar.station, StationStates.TESTS_OK.name)
+ # Change state to "EXTRATASKS_RUNNING"
+ if psdbObj.read_station_state(globalVar.station) == StationStates.STATION_ERROR.name:
+ # Abort
+ print("Error: Wrong previous station state.")
+ exit(0)
+ psdbObj.change_station_state(globalVar.station, StationStates.EXTRATASKS_RUNNING.name)
+ # create task control
+ globalVar.taskid_ctl = psdbObj.open_task(globalVar.g_uuid)
+ # get extra variables
+ varlist = get_taskvars_list(globalVar.g_uuid)
+
+ alltasksok = False
+
+ # flash eeprom
+ resulteeprom = 0
+ if "eeprompath" in varlist and len(varlist["eeprompath"]) > 0:
+ mac0 = psdbObj.get_board_macaddr(globalVar.g_uuid)
+ resulteeprom, eepromdata = flash_eeprom(varlist["eeprompath"], globalVar.g_uuid, mac0)
+ psdbObj.create_task_result(globalVar.taskid_ctl, "FLASHEEPROM",
+ "TASK_OK" if resulteeprom == 0 else "TASK_FAIL", eepromdata)
+
+ # flash non-volatile memory
+ resultmemory = 0
+ if "flashimagepath" in varlist and len(varlist["flashimagepath"]) > 0:
+ resultmemory = flash_memory(varlist["flashimagepath"])
+ psdbObj.create_task_result(globalVar.taskid_ctl, "FLASHMEMORY",
+ "TASK_OK" if resultmemory == 0 else "TASK_FAIL",
+ varlist["flashimagepath"])
+
+ # update status with the result
+ if resulteeprom == 0 and resultmemory == 0:
+ psdbObj.update_taskctl_status(globalVar.taskid_ctl, "TASK_BOARD_OK")
+ # Change state to "WAITING_FOR_SCANNER"
+ if psdbObj.read_station_state(globalVar.station) == StationStates.STATION_ERROR.name:
+ # Abort
+ print("Error: Wrong previous station state.")
+ exit(0)
+ psdbObj.change_station_state(globalVar.station, StationStates.WAITING_FOR_SCANNER.name)
+ # get barcode using the scanner
+ qrreceived = False
+ while not qrreceived:
+ qr = QRReader()
+ if qr.openQR():
+ # waits 5s to receive a valid code
+ if qr.readQRasync(5):
+ qrreceived = True
+ factorycode = qr.getQRNumber()
+ psdbObj.set_factorycode(globalVar.g_uuid, factorycode)
+ qr.closeQR()
+ # Change state to "FINISHED"
+ if psdbObj.read_station_state(globalVar.station) == StationStates.STATION_ERROR.name:
+ # Abort
+ print("Error: Wrong previous station state.")
+ exit(0)
+ psdbObj.change_station_state(globalVar.station, StationStates.FINISHED.name)
+ else:
+ psdbObj.update_taskctl_status(globalVar.taskid_ctl, "TASK_BOARD_FAIL")
+ loggerObj.getlogger().info("Station error: #Unable to complete extra tasks#")
+
else:
- # create a process
- globalVar.testid_ctl = psdbObj.open_test(globalVar.g_uuid)
- # create and run tests according to the board type
- runner = SimpleTestRunner(psdbObj)
- loggerObj.getlogger().info("Tests running")
- testresult = runner.run(create_testsuite())
- # save dmesg
- generate_dmesg(globalVar.testid_ctl, psdbObj)
-
- # execute aditional tasks, only if the test was succesfull
- if testresult.wasSuccessful():
- loggerObj.getlogger().info("Extra tasks running")
- # create task control
- globalVar.taskid_ctl = psdbObj.open_task(globalVar.g_uuid)
- # get extra variables
- varlist = get_taskvars_list(globalVar.g_uuid)
-
- alltasksok = False
-
- # flash eeprom
- resulteeprom = 0
- if "eeprompath" in varlist and len(varlist["eeprompath"]) > 0:
- mac0 = psdbObj.get_board_macaddr(globalVar.g_uuid)
- resulteeprom, eepromdata = flash_eeprom(varlist["eeprompath"], globalVar.g_uuid, mac0)
- psdbObj.create_task_result(globalVar.taskid_ctl, "FLASHEEPROM",
- "TASK_OK" if resulteeprom == 0 else "TASK_FAIL", eepromdata)
-
- # flash non-volatile memory
- resultmemory = 0
- if "flashimagepath" in varlist and len(varlist["flashimagepath"]) > 0:
- resultmemory = flash_memory(varlist["flashimagepath"])
- psdbObj.create_task_result(globalVar.taskid_ctl, "FLASHMEMORY",
- "TASK_OK" if resultmemory == 0 else "TASK_FAIL",
- varlist["flashimagepath"])
-
- # update status with the result
- if resulteeprom == 0 and resultmemory == 0:
- alltasksok = True
- psdbObj.update_taskctl_status(globalVar.taskid_ctl, "TASK_BOARD_OK")
- else:
- psdbObj.update_taskctl_status(globalVar.taskid_ctl, "TASK_BOARD_FAIL")
-
- if alltasksok:
- # get barcode using the scanner
- loggerObj.getlogger().info("Waiting for barcode scanner")
- qrreceived = False
- while not qrreceived:
- qr = QRReader()
- if qr.openQR():
- # waits 5s to receive a valid code
- if qr.readQRasync(5):
- qrreceived = True
- factorycode = qr.getQRNumber()
- psdbObj.set_factorycode(globalVar.g_uuid, factorycode)
- qr.closeQR()
-
- loggerObj.getlogger().info("Python program finished")
+ # Change state to "TESTS_FAILED"
+ if psdbObj.read_station_state(globalVar.station) == StationStates.STATION_ERROR.name:
+ # Abort
+ print("Error: Wrong previous station state.")
+ exit(0)
+ psdbObj.change_station_state(globalVar.station, StationStates.TESTS_FAILED.name)
if __name__ == "__main__":
@@ -202,18 +236,68 @@ if __name__ == "__main__":
# create logger
loggerObj = ISEE_Logger(logging.INFO)
- loggerObj.getlogger().info("Python program started")
# Try to parse the setup.xml file
try:
xmlObj = XMLSetup(os.path.join(test_abspath, "setup.xml"))
except:
+ # Abort
print("Error: Cannot parse setup.xml file")
- exit(1)
+ loggerObj.getlogger().info("Station error: #Cannot parse XML configuration file#")
+ exit(0)
# Try to connect to the DB, according to setup.xml configuration
- if psdbObj.open(xmlObj):
- main()
- else:
+ if not psdbObj.open(xmlObj):
print("Error: Cannot open DB connection")
- exit(2)
+ loggerObj.getlogger().info("Station error: #Cannot open DB connection#")
+ exit(0)
+
+ # get station name
+ globalVar.station = socket.gethostname()
+
+ # Check if current state is "WAIT_TEST_START"
+ currentstate = psdbObj.read_station_state(globalVar.station)
+ if currentstate != StationStates.WAIT_TEST_START.name:
+ # Abort
+ print("Error: Wrong previous station state.")
+ exit(0)
+
+ # Change state to "TESTS_CHECKING_ENV"
+ psdbObj.change_station_state(globalVar.station, StationStates.TESTS_CHECKING_ENV.name)
+
+ # Look for a sd or pendrive
+ # TODO
+ #if not disk_exists("/dev/mmcblk0") and not :
+ # abort
+ #print("error: cannot find a barcode scanner.")
+ #loggerObj.getlogger().info("station error: #cannot find a sd card or memory usb to load the test environment.#")
+ #exit(0)
+
+ # Look for a barcode scanner
+ qr = QRReader()
+ if qr.openQR():
+ qr.closeQR()
+ else:
+ # Abort
+ print("Error: Cannot find a barcode scanner.")
+ loggerObj.getlogger().info("Station error: #Cannot find a barcode scanner#")
+ exit(0)
+
+ # Look for an amperimeter
+ amp = Amper()
+ if not amp.open():
+ # Abort
+ print("Error: Cannot open an amperimeter COM port.")
+ loggerObj.getlogger().info("Station error: #Cannot open an amperimeter COM port#")
+ exit(0)
+ if not amp.hello():
+ if not amp.hello():
+ # Abort
+ print("Error: Cannot open find an amperimeter.")
+ loggerObj.getlogger().info("Station error: #Cannot open find an amperimeter#")
+ exit(0)
+
+ # Execute main
+ main()
+
+ exit(0)