summaryrefslogtreecommitdiff
path: root/test-cli/test/tests
diff options
context:
space:
mode:
authorManel Caro <mcaro@iatec.biz>2021-11-06 16:28:38 +0100
committerManel Caro <mcaro@iatec.biz>2021-11-06 16:28:38 +0100
commitcf19bfe18cbd283b188a858ee1629f9909c924f4 (patch)
tree1efb23519727130058401df090ab1b5f4cc8ba99 /test-cli/test/tests
parentb6932fbaf898724ae87c29f8965621610f377084 (diff)
parentd5b273a3b58a250742049df4ca0ef0ba54f53d33 (diff)
downloadboard-cf19bfe18cbd283b188a858ee1629f9909c924f4.zip
board-cf19bfe18cbd283b188a858ee1629f9909c924f4.tar.gz
board-cf19bfe18cbd283b188a858ee1629f9909c924f4.tar.bz2
Merge branch 'sopa-test'rel.0.1sopa-test
Diffstat (limited to 'test-cli/test/tests')
-rw-r--r--test-cli/test/tests/qamp.py87
-rw-r--r--test-cli/test/tests/qamper.py74
-rw-r--r--test-cli/test/tests/qaudio.py119
-rw-r--r--test-cli/test/tests/qbutton.py54
-rw-r--r--test-cli/test/tests/qdmesg.py32
-rw-r--r--test-cli/test/tests/qduplex_ser.py69
-rw-r--r--test-cli/test/tests/qeeprom.py90
-rw-r--r--test-cli/test/tests/qethernet.py161
-rw-r--r--test-cli/test/tests/qflash.py77
-rw-r--r--test-cli/test/tests/qi2c.py29
-rw-r--r--test-cli/test/tests/qiperf.py53
-rw-r--r--test-cli/test/tests/qmmcflash.py55
-rw-r--r--test-cli/test/tests/qnand.py48
-rw-r--r--test-cli/test/tests/qplc.py121
-rw-r--r--test-cli/test/tests/qram.py68
-rw-r--r--test-cli/test/tests/qrtc.py44
-rw-r--r--test-cli/test/tests/qscreen.py31
-rw-r--r--test-cli/test/tests/qserial.py43
-rw-r--r--test-cli/test/tests/qtemplate.py51
-rw-r--r--test-cli/test/tests/qusb.py139
-rw-r--r--test-cli/test/tests/qvideo.py125
-rw-r--r--test-cli/test/tests/qwifi.py120
22 files changed, 1064 insertions, 626 deletions
diff --git a/test-cli/test/tests/qamp.py b/test-cli/test/tests/qamp.py
deleted file mode 100644
index 9fcd6d2..0000000
--- a/test-cli/test/tests/qamp.py
+++ /dev/null
@@ -1,87 +0,0 @@
-from test.helpers.syscmd import SysCommand
-import unittest
-import serial
-import time
-
-class Qamp(unittest.TestCase):
-
- def __init__(self, testname, testfunc, undercurrent=0.1, overcurrent=2):
- self._current = 0.0
- self._undercurrent=undercurrent
- self._overcurrent = overcurrent
- self._vshuntfactor=16384.0
- self._ser = serial.Serial()
- self._ser.port = '/dev/ttyUSB0'
- self._ser.baudrate = 115200
- self._ser.parity = serial.PARITY_NONE
- self._ser.timeout = 0
- self._ser.writeTimout = 0
- self._ser.xonxoff = False
- self._ser.rtscts = False
- self._ser.dsrdtr = False
- super(Qamp, self).__init__(testfunc)
- self._testMethodDoc = testname
-
- def __del__(self):
- self._ser.close()
-
- def execute(self):
- # Open Serial port ttyUSB0
- error=0
- try:
- self._ser.open()
- except:
- self.fail("failed: IMPOSSIBLE OPEN USB-SERIAL PORT ( {} )".format(self._ser.port))
- error=1
- return -1
- if error==0:
- # Clean input and output buffer of serial port
- self._ser.flushInput()
- self._ser.flushOutput()
- # Send command to read Voltage at Shunt resistor
- # Prepare cmd
- cmd = ('at\n\r')
- i=0
- while (i < len(cmd)):
- i = i + self._ser.write(cmd[i].encode('ascii'))
- time.sleep(0.05)
- self._ser.read(1)
- res0 = []
- while (self._ser.inWaiting() > 0): # if incoming bytes are waiting to be read from the serial input buffer
- res0.append(self._ser.read(1).decode('ascii')) # read the bytes and convert from binary array to ASCII
- print(res0)
- #CHECK COM FIRST
- cmd = ('at+in?\n\r')
- i = 0
-
- # Write, 1 by 1 byte at a time to avoid hanging of serial receiver code of listener, emphasis being made at sleep of 50 ms.
- while (i < len(cmd)):
- i = i + self._ser.write(cmd[i].encode('ascii'))
- time.sleep(0.05)
- self._ser.read(1)
-
- # Read, 1 by 1 byte
- res = []
- while (self._ser.inWaiting() > 0): # if incoming bytes are waiting to be read from the serial input buffer
- res.append(self._ser.read(1).decode('ascii')) # read the bytes and convert from binary array to ASCII
-
- string = ''.join(res)
- string = string.replace('\n', '')
- try:
- self._current = float(int(string, 0)) / self._vshuntfactor
- except:
- self.fail("failed: CAN'T READ CONSUMPTION (CURRENT=0?)")
- error=1
- return -1
- if error==0:
- print(self._current)
- # In order to give a valid result it is importarnt to define an under current value
- if (self._current > float(self._overcurrent)):
- self.fail("failed: OVERCURRENT DETECTED ( {} )".format(self._current))
- return -1
-
- if (self._current < float(self._undercurrent)):
- self.fail("failed: UNDERCURRENT DETECTED ( {} )".format(self._current))
- return -1
-
-
diff --git a/test-cli/test/tests/qamper.py b/test-cli/test/tests/qamper.py
new file mode 100644
index 0000000..96b673c
--- /dev/null
+++ b/test-cli/test/tests/qamper.py
@@ -0,0 +1,74 @@
+import unittest
+from test.helpers.amper import Amper
+
+
+class Qamper(unittest.TestCase):
+ params = None
+ __resultlist = None # resultlist is a python list of python dictionaries
+ dummytest = False
+
+ # varlist: undercurrent, overcurrent
+ def __init__(self, testname, testfunc, varlist):
+ self.params = varlist
+ super(Qamper, self).__init__(testfunc)
+
+ if "undercurrent" in varlist:
+ self._undercurrent = varlist["undercurrent"]
+ else:
+ raise Exception('undercurrent param inside Qamp must be defined')
+ if "overcurrent" in varlist:
+ self._overcurrent = varlist["overcurrent"]
+ else:
+ raise Exception('overcurrent param inside Qamp must be defined')
+ self._testMethodDoc = testname
+ self.__resultlist = []
+ if "dummytest" in varlist:
+ self.dummytest = varlist["dummytest"]
+
+ def saveresultinfile(self, currentvalue):
+ # save result in a file
+ with open('/mnt/station_ramdisk/amper.txt', 'w') as outfile:
+ n = outfile.write("Current: {} A".format(currentvalue))
+ outfile.close()
+ self.__resultlist.append(
+ {
+ "description": "Amperimeter values",
+ "filepath": "/mnt/station_ramdisk/amper.txt",
+ "mimetype": "text/plain"
+ }
+ )
+
+ def execute(self):
+ if self.dummytest == '1':
+ self.current = "0.0"
+ self.saveresultinfile(self.current)
+ return
+
+ amp = Amper()
+ # open serial connection
+ if not amp.open():
+ self.fail("Error: can not open a serial port")
+ # check if the amperimeter is connected and working
+ # 2 ATTEMTS
+ if not amp.hello():
+ if not amp.hello():
+ self.fail("Error: can not communicate")
+ # get current value (in Amperes)
+ self.current = amp.getCurrent()
+ # save result in a file
+ self.saveresultinfile(self.current)
+ # close serial connection
+ amp.close()
+ # Check current range
+ if float(self.current) > float(self._overcurrent):
+ # Overcurrent detected
+ self.fail("failed: Overcurrent detected ( {} )".format(self.current))
+ elif float(self.current) < float(self._undercurrent):
+ # Undercurrent detected
+ self.fail("failed: Undercurrent detected ( {} )".format(self.current))
+
+ def getresults(self):
+ return self.__resultlist
+
+ def gettextresult(self):
+ return "{} A".format(self.current)
diff --git a/test-cli/test/tests/qaudio.py b/test-cli/test/tests/qaudio.py
index a1572ca..a219a1a 100644
--- a/test-cli/test/tests/qaudio.py
+++ b/test-cli/test/tests/qaudio.py
@@ -1,37 +1,100 @@
-from test.helpers.syscmd import SysCommand
import unittest
-#class name
+import sh
+import wave
+import contextlib
+import uuid
+import time
+from test.helpers.utils import save_result
+from test.helpers.iseelogger import logObj
+
class Qaudio(unittest.TestCase):
- # Initialize the variables
+ params = None
+ __resultlist = None # resultlist is a python list of python dictionaries
+ __dtmf_secuence = ["1", "3", "5", "7", "9"]
+ __dtmf_file = None
+ __file_uuid = None
+ __QaudioName = None
- def __init__(self, testname, testfunc, dtmfFile):
- # Doing this we will initialize the class and later on perform a particular method inside this class
+ def __init__(self, testname, testfunc, varlist):
+ self.params = varlist
super(Qaudio, self).__init__(testfunc)
self._testMethodDoc = testname
- self._dtmfFile=dtmfFile
- self.__sum=0
- self.__refSum = 25 # 1+3+5+7+9
+ self.__resultlist = []
+ self.__xmlObj = varlist["xml"]
+ self.__QaudioName = varlist.get('name', 'qaudio')
+ self.__dtmf_file_play = varlist.get('play_dtmf_file', self.__xmlObj.getKeyVal(self.__QaudioName, "play_dtmf_file", "dtmf-13579.wav"))
+ self.__recordtime = varlist.get('recordtime', self.__xmlObj.getKeyVal(self.__QaudioName, "recordtime", "0.15"))
+ self.__dtmf_file_record = varlist.get('record_dtmf_file', self.__xmlObj.getKeyVal(self.__QaudioName, "record_dtmf_file", "record-{}.wav"))
+ self.__sampling_rate = varlist.get('rate', self.__xmlObj.getKeyVal(self.__QaudioName, "rate", "8000"))
+ self.__record_path = varlist.get('to', self.__xmlObj.getKeyVal(self.__QaudioName, "to", "/mnt/station_ramdisk"))
+ self.__run_delay = float(varlist.get('aplay_run_delay', self.__xmlObj.getKeyVal(self.__QaudioName, "aplay_run_delay", "0.0")))
+ self.__file_uuid = uuid.uuid4()
+ self.__dtmf_file_record = self.__dtmf_file_record.format(self.__file_uuid)
+
+ def restoreAlsaCtl(self):
+ try:
+ sh.alsactl('restore');
+ except Exception as E:
+ logObj.getlogger().error("Failed to Execite alsactl restore");
def execute(self):
- str_cmd = "aplay test/files/dtmf-13579.wav 2> /dev/null & arecord -r 8000 -d 1 recorded.wav 2> /dev/null" #.format(self.__dtmfFile)
- audio_loop = SysCommand("command-name", str_cmd)
- if audio_loop.execute() == 0:# BUG: Returns -1 but work
- lines = audio_loop.getOutput().splitlines()
- str_cmd = "multimon -t wav -a DTMF recorded.wav -q 2> /dev/null"
- dtmf_decoder = SysCommand("command-name", str_cmd)
- if dtmf_decoder.execute() == 0: # BUG: Returns -1 but work
- self.__raw_out = dtmf_decoder.getOutput()
- if self.__raw_out == "":
- return -1
- lines = dtmf_decoder.getOutput().splitlines()
- for i in range(0, 5):
- aux=[int(s) for s in lines[i].split() if s.isdigit()]
- self.__sum=self.__sum+aux[0]
- self.failUnless(self.__sum == self.__refSum), "failed: incorrect dtmf code" + str(self.__sum)
+ self.restoreAlsaCtl()
+ try:
+ # analize audio file
+ calcRecordTime = self.calc_audio_duration(self.__dtmf_file_play) + float(self.__recordtime) + float(0.1)
+ except TypeError as Error:
+ self.fail("failed: TypeError: {}.".Error)
+
+ # previous play to estabilise audio lines
+ p2 = sh.arecord("-r", self.__sampling_rate, "-d", calcRecordTime, "{}/{}".format(self.__record_path, self.__dtmf_file_record), _bg=True)
+ time.sleep(self.__run_delay)
+ p1 = sh.aplay(self.__dtmf_file_play)
+ p2.wait()
+
+ p2 = sh.arecord("-r", self.__sampling_rate, "-d", calcRecordTime, "{}/{}".format(self.__record_path, self.__dtmf_file_record), _bg=True)
+ time.sleep(self.__run_delay)
+ p1 = sh.aplay(self.__dtmf_file_play)
+ p2.wait()
+ if p1.exit_code == 0 and p2.exit_code == 0:
+ p = sh.multimon("-t", "wav", "-a", "DTMF", "{}/{}".format(self.__record_path, self.__dtmf_file_record), "-q")
+ if p.exit_code == 0:
+ lines = p.stdout.decode('ascii').splitlines()
+ self.__dtmf_secuence_result = []
+ for li in lines:
+ if li.split(" ")[0] == "DTMF:":
+ self.__dtmf_secuence_result.append(li.split(" ")[1])
+ self.__dtmf_secuence_result = sorted(list(set(self.__dtmf_secuence_result)))
+ # compare original and processed dtmf sequence
+ if len(self.__dtmf_secuence) == len(self.__dtmf_secuence_result):
+ for a, b in zip(self.__dtmf_secuence, self.__dtmf_secuence_result):
+ if a != b:
+ logObj.getlogger().debug(
+ "failed: sent and received DTMF sequence do not match.{} != {}".format(
+ self.__dtmf_secuence, self.__dtmf_secuence_result))
+ self.fail("failed: sent and received DTMF sequence do not match.")
+ save_result(filePath='{}/{}'.format(self.__record_path, self.__dtmf_file_record),
+ description='sound record',
+ mime='audio/x-wav',
+ result=self.__resultlist)
+ else:
+ logObj.getlogger().debug("received DTMF sequence is shorter than expected.{} {}".format(self.__dtmf_secuence,self.__dtmf_secuence_result))
+ self.fail("received DTMF sequence is shorter than expected")
else:
- self.fail("failed: fail reading recorded file")
- return -1
+ self.fail("failed: unable to use multimon command.")
else:
- self.fail("failed: fail playing/recording file")
- return -1
- return 0
+ self.fail("failed: unable to play/record audio.")
+
+ def getresults(self):
+ return self.__resultlist
+
+ def gettextresult(self):
+ return ""
+
+ def calc_audio_duration(self, fname):
+ duration = 0.0
+ with contextlib.closing(wave.open(fname, 'r')) as f:
+ frames = f.getnframes()
+ rate = f.getframerate()
+ duration = frames / float(rate)
+ f.close()
+ return duration
diff --git a/test-cli/test/tests/qbutton.py b/test-cli/test/tests/qbutton.py
deleted file mode 100644
index 72a897e..0000000
--- a/test-cli/test/tests/qbutton.py
+++ /dev/null
@@ -1,54 +0,0 @@
-from test.helpers.syscmd import SysCommand
-import unittest
-import uuid
-import time
-
-class Qbutton(unittest.TestCase):
-
- def __init__(self, testname, testfunc, gpio):
- if gpio == "SOPA":
- super(Qbutton, self).__init__("buttonSopa")
- else:
- super(Qbutton, self).__init__("buttonGpio")
- self._testMethodDoc = testname
-
- def buttonGpio(self):
- print("normal-button-test-using-gpio")
- self.fail("failed: GPIO BUTTON FAIL")
-
- def buttonSopa(self):
- str_cmd = "i2cset -f -y 1 0x2d 0x40 0x31"
- disable_pmic = SysCommand("disable_pmic", str_cmd)
- disable_pmic.execute()
- # BUG: REPEAT THIS EXECUTION TWICE BECAUSE FIRST TIME IT RETURNS AN ERROR
- led_on="echo 1 > /sys/class/leds/red\:usbhost/brightness"
- ledon = SysCommand("led_on", led_on)
- ledon.execute()
- time.sleep(0.1)
- disable_pmic.execute()
- if disable_pmic.execute() == 0:
- str_cmd = "i2cset -f -y 1 0x2d 0x50 0xff"
- reset_button = SysCommand("reset_button", str_cmd)
- if reset_button.execute() == 0:
- str_cmd = "i2cget -f -y 1 0x2d 0x50"
- get_button_val = SysCommand("get_button_val", str_cmd)
- print("\n\t --> PRESS button for 1 sec (TIMEOUT: 10s) \n")
- timeout = 0
- while timeout < 20:
- if get_button_val.execute() == 0:
- get_button_val.execute()
- button_value = get_button_val.getOutput()
- button_value=button_value.decode('ascii').split("x")
- if int(button_value[1]) == 4:
- timeout = 20
- time.sleep(0.5)
- timeout = timeout + 1
- if timeout==20 and int(button_value[1]) == 0:
- self.fail("failed: timeout exceeded")
- else:
- timeout = 20
- self.fail("failed: not button input")
- else:
- self.fail("failed: could not complete i2c reset button state")
- else:
- self.fail("failed: could not complete i2c disable PMIC")
diff --git a/test-cli/test/tests/qdmesg.py b/test-cli/test/tests/qdmesg.py
new file mode 100644
index 0000000..39a5047
--- /dev/null
+++ b/test-cli/test/tests/qdmesg.py
@@ -0,0 +1,32 @@
+import sh
+import os
+import os.path
+from os import path
+
+
+class Qdmesg:
+ params = None
+ __resultlist = None # resultlist is a python list of python dictionaries
+
+ def __init__(self, testname, testfunc, varlist):
+ self.params = varlist
+ self.__testMethodDoc = testname
+ self.__resultlist = []
+ self.pgObj = varlist["db"]
+ self.__xmlObj = varlist["xml"]
+ self.__QdmesgName = varlist.get('name', 'qdmesg')
+ self.__syslog_dmesg_file = varlist.get('syslog_dmesg',self.__xmlObj.getKeyVal(self.__QdmesgName, "syslog_dmesg",
+ "/var/log/kern.log"))
+
+ def getTestName(self):
+ return self.__testMethodDoc
+
+ def execute(self):
+ self.pgObj.run_test(self.params["testidctl"], self.params["testid"])
+ if not os.path.isfile('{}'.format(self.__syslog_dmesg_file)):
+ self.pgObj.finish_test(self.params["testidctl"], self.params["testid"], "TEST_FAILED", "")
+ return False
+ self.pgObj.upload_result_file(self.params["testidctl"], self.params["testid"], "{}".format(self.__syslog_dmesg_file),
+ "{}".format(self.__syslog_dmesg_file), "text/plain")
+ self.pgObj.finish_test(self.params["testidctl"], self.params["testid"], "TEST_COMPLETE", "")
+ return True
diff --git a/test-cli/test/tests/qduplex_ser.py b/test-cli/test/tests/qduplex_ser.py
index 98bda81..170039b 100644
--- a/test-cli/test/tests/qduplex_ser.py
+++ b/test-cli/test/tests/qduplex_ser.py
@@ -1,46 +1,79 @@
-from test.helpers.syscmd import SysCommand
import unittest
import uuid
import serial
import time
+
class Qduplex(unittest.TestCase):
+ params = None
+ __port1 = None
+ __port2 = None
+ __baudrate = None
+ __resultlist = None # resultlist is a python list of python dictionaries
- def __init__(self, testname, testfunc, port1, port2, baudrate):
+ # varlist: port1, port2, baudrate
+ def __init__(self, testname, testfunc, varlist):
+ self.params = varlist
super(Qduplex, self).__init__(testfunc)
- self.__port1 = port1
- self.__serial1 = serial.Serial(self.__port1, timeout=1)
- self.__serial1.baudrate = baudrate
-
- self.__port2 = port2
- self.__serial2 = serial.Serial(self.__port2, timeout=1)
- self.__serial2.baudrate = baudrate
+ if "port1" in varlist:
+ self.__port1 = varlist["port1"]
+ else:
+ raise Exception('port1 param inside Qduplex must be defined')
+ if "port2" in varlist:
+ self.__port2 = varlist["port2"]
+ else:
+ raise Exception('port2 param inside Qduplex must be defined')
+ if "baudrate" in varlist:
+ self.__baudrate = varlist["baudrate"]
+ else:
+ raise Exception('baudrate param inside Qduplex must be defined')
self._testMethodDoc = testname
+ self.__resultlist = []
+
+ # open serial connection
+ self.__serial1 = serial.Serial(self.__port1, self.__baudrate, timeout=1)
+ self.__serial2 = serial.Serial(self.__port2, self.__baudrate, timeout=1)
def __del__(self):
self.__serial1.close()
self.__serial2.close()
def execute(self):
+ # generate a random uuid
+ test_uuid = str(uuid.uuid4()).encode()
+
+ # clean serial ports
self.__serial1.flushInput()
self.__serial1.flushOutput()
self.__serial2.flushInput()
self.__serial2.flushOutput()
- test_uuid = str(uuid.uuid4()).encode()
+ # send the uuid through serial port
self.__serial1.write(test_uuid)
time.sleep(0.05) # there might be a small delay
if self.__serial2.inWaiting() == 0:
- self.fail("failed: PORT {} wait timeout exceded, wrong communication?".format(self.__port2))
+ self.fail("failed: port {} wait timeout exceded".format(self.__port2))
else:
- if (self.__serial2.readline() != test_uuid):
- self.fail("failed: PORT {} write/read mismatch".format(self.__port2))
+ if self.__serial2.readline() != test_uuid:
+ self.fail("failed: port {} write/read mismatch".format(self.__port2))
- test_uuid = str(uuid.uuid4()).encode()
+ time.sleep(0.05) # there might be a small delay
+
+ # clean serial ports
+ self.__serial1.flushInput()
+ self.__serial1.flushOutput()
+ self.__serial2.flushInput()
+ self.__serial2.flushOutput()
+ # send the uuid through serial port
self.__serial2.write(test_uuid)
time.sleep(0.05) # there might be a small delay
if self.__serial1.inWaiting() == 0:
- self.fail("failed: PORT {} wait timeout exceded, wrong communication?".format(self.__port1))
+ self.fail("failed: port {} wait timeout exceded".format(self.__port1))
else:
- if (self.__serial1.readline() != test_uuid):
- self.fail("failed: PORT {} write/read mismatch".format(self.__port1))
-
+ if self.__serial1.readline() != test_uuid:
+ self.fail("failed: port {} write/read mismatch".format(self.__port1))
+
+ def getresults(self):
+ return self.__resultlist
+
+ def gettextresult(self):
+ return ""
diff --git a/test-cli/test/tests/qeeprom.py b/test-cli/test/tests/qeeprom.py
index f72f78f..7900381 100644
--- a/test-cli/test/tests/qeeprom.py
+++ b/test-cli/test/tests/qeeprom.py
@@ -1,38 +1,68 @@
-from test.helpers.syscmd import SysCommand
import unittest
-import uuid
+import os
class Qeeprom(unittest.TestCase):
+ params = None
+ __resultlist = None # resultlist is a python list of python dictionaries
+ __QEepromName = None
+ __i2cBus = None
+ __device = None
- def __init__(self, testname, testfunc):
+ # varlist: position, eeprompath
+ def __init__(self, testname, testfunc, varlist):
+ self.params = varlist
super(Qeeprom, self).__init__(testfunc)
self._testMethodDoc = testname
+ self.__xmlObj = varlist["xml"]
+ self.__QEepromName = varlist.get('name', 'qeeprom')
+ self.__i2cAddress = varlist.get('i2c_address', self.__xmlObj.getKeyVal(self.__QEepromName, "i2c_address", "0050"))
+ self.__i2cBus = varlist.get('i2c_bus', self.__xmlObj.getKeyVal(self.__QEepromName, "i2c_bus", "0"))
+ self.__device = '/sys/bus/i2c/devices/{}-{}/eeprom'.format(self.__i2cBus, self.__i2cAddress)
+
+ self.__resultlist = []
+
+ def __check_device(self):
+ if os.path.isfile(self.__device):
+ return True
+ return False
+
+ def __eeprom_write(self, data):
+ try:
+ f = open(self.__device, "wb")
+ f.write(data)
+ f.close()
+ except IOError as Error:
+ return False, '{}'.format(Error.errno)
+ return True, ''
+
+ def __eeprom_read(self, size):
+ try:
+ f = open(self.__device, "rb")
+ data = f.read(size)
+ f.close()
+ except IOError as Error:
+ return False, '{}'.format(Error.errno)
+ return True, data
+
def execute(self):
- str_cmd = "find /sys/ -iname 'eeprom'"
- eeprom_location = SysCommand("eeprom_location", str_cmd)
- if eeprom_location.execute() == 0:
- self.__raw_out = eeprom_location.getOutput()
- if self.__raw_out == "":
- self.fail("Unable to get EEPROM location. IS EEPROM CONNECTED?")
- return -1
- eeprom=self.__raw_out.decode('ascii')
- test_uuid = uuid.uuid4()
- str_cmd="echo '{}' > {}".format(str(test_uuid), eeprom)
- eeprom_write = SysCommand("eeprom_write", str_cmd)
- if eeprom_write.execute() == 0:
- self.__raw_out = eeprom_write.getOutput()
- if self.__raw_out == "":
- self.fail("Unable to write on the EEPROM?")
- return -1
- str_cmd = "head -2 {}".format(eeprom)
- eeprom_read = SysCommand("eeprom_read", str_cmd)
- if eeprom_read.execute() == 0:
- self.__raw_out = eeprom_read.getOutput()
- if self.__raw_out == "":
- self.fail("Unable to read from the EEPROM?")
- return -1
- if(str(self.__raw_out).find(str(test_uuid)) == -1):
- self.fail("failed: READ/WRITE mismatch")
- else:
- self.fail("failed: could not complete find eeprom command")
+ if not self.__check_device():
+ self.fail("cannot open device {}".format(self.__device))
+
+ data = bytes('AbcDeFgHijK098521', 'utf-8')
+
+ v, w = self.__eeprom_write(data)
+ if not v:
+ self.fail("eeprom: {} write test failed".format(w))
+ v, r = self.__eeprom_read(len(data))
+ if not v:
+ self.fail("eeprom: {} read test failed".format(r))
+ if r != data:
+ self.fail("mismatch between write and read bytes")
+
+
+ def getresults(self):
+ return self.__resultlist
+
+ def gettextresult(self):
+ return ""
diff --git a/test-cli/test/tests/qethernet.py b/test-cli/test/tests/qethernet.py
index 2ac447c..7d081a1 100644
--- a/test-cli/test/tests/qethernet.py
+++ b/test-cli/test/tests/qethernet.py
@@ -1,57 +1,124 @@
-from test.helpers.syscmd import SysCommand
import unittest
-
+import sh
+import json
+import time
+import uuid
+import iperf3
+import netifaces
+from test.helpers.utils import save_json_to_disk
+from test.helpers.utils import station2Port
class Qethernet(unittest.TestCase):
- __sip = None
- __raw_out = None
- __MB_req = None
- __MB_real = None
- __BW_real = None
- __dat_list = None
- __bind = None
- __OKBW = None
-
- def __init__(self, testname, testfunc, sip = None, OKBW=100, bind=None):
+ __serverip = None
+ __numbytestx = None
+ __bwexpected = None
+ __port = None
+ params = None
+ __bwreal = None
+ __resultlist = None # resultlist is a python list of python dictionaries
+ timebetweenattempts = 5
+
+ # varlist content: serverip, bwexpected, port
+ def __init__(self, testname, testfunc, varlist):
+ self.params = varlist
super(Qethernet, self).__init__(testfunc)
- if sip is not None:
- self.__sip = sip
- if sip is not None:
- self.__bind = bind
- self.__MB_req = '10'
- self.__OKBW=OKBW
self._testMethodDoc = testname
+ self.__xmlObj = varlist["xml"]
+ self.__QEthName = varlist.get('name', 'qeth')
+ self.__loops = varlist.get('loops', self.__xmlObj.getKeyVal(self.__QEthName, "loops", "1"))
+ self.__port = varlist.get('port', self.__xmlObj.getKeyVal(self.__QEthName, "port", "5000"))
+ self.__bw = varlist.get('bwexpected', self.__xmlObj.getKeyVal(self.__QEthName, "bwexpected", "40.0"))
+ self.__serverip = varlist.get('serverip', self.__xmlObj.getKeyVal(self.__QEthName, "serverip", "localhost"))
+ self.__duration = varlist.get('duration', self.__xmlObj.getKeyVal(self.__QEthName, "duration", "10"))
+ self.__interface = varlist.get('interface', self.__xmlObj.getKeyVal(self.__QEthName, "interface", "eth0"))
+ self.__toPath = varlist.get('to', self.__xmlObj.getKeyVal(self.__QEthName, "to", "/mnt/station_ramdisk"))
+ self.__retriesCount = varlist.get('retries', self.__xmlObj.getKeyVal(self.__QEthName, "retries", "5"))
+ self.__waitRetryTime = varlist.get('wait_retry', self.__xmlObj.getKeyVal(self.__QEthName, "wait_retry", "10"))
+ self.__wifi_res_file = varlist.get('eth_res_file', self.__xmlObj.getKeyVal(self.__QEthName, "eth_res_file", "eth_test_{}.json"))
+ self.__wifi_st_file = varlist.get('eth_st_file', self.__xmlObj.getKeyVal(self.__QEthName, "eth_st_file", "eth_st_{}.json"))
+ self.__file_uuid = uuid.uuid4()
+ self.__wifi_res_file = self.__wifi_res_file.format(self.__file_uuid)
+ self.__wifi_st_file = self.__wifi_st_file.format(self.__file_uuid)
+
+ self.__resultlist = []
+
+ def checkInterface(self, reqInterface):
+ ifaces = netifaces.interfaces()
+ for t in ifaces:
+ if t == reqInterface:
+ return True
+ return False
def execute(self):
- print
- if self.__bind is None:
- str_cmd = "iperf -c {} -x CMSV -n {}M".format(self.__sip, self.__MB_req)
- else:
- str_cmd = "iperf -c {} -x CMSV -n {}M -B {}".format(self.__sip, self.__MB_req, self.__bind)
- iperf_command = SysCommand("iperf", str_cmd)
- if iperf_command.execute() == 0:
- self.__raw_out = iperf_command.getOutput()
- if self.__raw_out == "":
- return -1
- lines = iperf_command.getOutput().splitlines()
- dat = lines[1]
- dat = dat.decode('ascii')
- dat_list = dat.split( )
- for d in dat_list:
- a = dat_list.pop(0)
- if a == "sec":
- break
- self.__MB_real = dat_list[0]
- self.__BW_real = dat_list[2]
- self.__dat_list = dat_list
- #print(self.__MB_real)
- #print(self.__BW_real)
- self.failUnless(float(self.__BW_real)>float(self.__OKBW)*0.9,"failed: speed is lower than spected. Speed(MB/s)" + str(self.__BW_real))
+ self.__resultlist = []
+ # check interfaces
+ if not self.checkInterface(self.__interface):
+ self.fail('Interface not found: {}'.format(self.__interface))
+
+ # Start Iperf
+ client = iperf3.Client()
+ client.duration = int(self.__duration)
+ client.server_hostname = self.__serverip
+ client.port = station2Port(self.__port)
+ count = int(self.__retriesCount)
+ while count > 0:
+ result = client.run()
+ if result.error == None:
+ if result.received_Mbps >= float(self.__bw) and result.sent_Mbps >= float(self.__bw):
+ save_json_to_disk(filePath='{}/{}'.format(self.__toPath, self.__wifi_res_file),
+ description='eth {} iperf3'.format(self.__interface),
+ mime='application/json',
+ json_data=result.json,
+ result=self.__resultlist)
+ return
+ else:
+ self.fail('bw fail: rx:{} tx:{}'.format(result.received_Mbps, result.sent_Mbps))
+ else:
+ count = count - 1
+ time.sleep(int(self.__waitRetryTime))
+
+ self.fail('qEth (max tries) Execution error: {}'.format(result.error))
+
+ def execute2(self):
+ # execute iperf command against the server, but it implements attempts in case the server is busy
+ iperfdone = False
+ while not iperfdone:
+ try:
+ p = sh.iperf3("-c", self.__serverip, "-n", self.__numbytestx, "-f", "m", "-p", self.__port, "-J",
+ _timeout=20)
+ iperfdone = True
+ except sh.TimeoutException:
+ self.fail("failed: iperf timeout reached")
+ except sh.ErrorReturnCode:
+ time.sleep(self.timebetweenattempts)
+
+ # check if it was executed succesfully
+ if p.exit_code == 0:
+ if p.stdout == "":
+ self.fail("failed: error executing iperf command")
+ # analyze output string
+ data = json.loads(p.stdout.decode('ascii'))
+ self.__bwreal = float(data['end']['sum_received']['bits_per_second'])/1024/1024
+ # save result file
+ with open('/mnt/station_ramdisk/ethernet-iperf3.json', 'w') as outfile:
+ json.dump(data, outfile, indent=4)
+ outfile.close()
+ self.__resultlist.append(
+ {
+ "description": "iperf3 output",
+ "filepath": "/mnt/station_ramdisk/ethernet-iperf3.json",
+ "mimetype": "application/json"
+ }
+ )
+
+ # check if BW is in the expected range
+ if self.__bwreal < float(self.__bwexpected):
+ self.fail("failed: speed is lower than spected. Speed(Mbits/s): " + str(self.__bwreal))
else:
- self.fail("failed: could not complete iperf command")
+ self.fail("failed: could not complete iperf command.")
- def get_Total_MB(self):
- return self.__MB_real;
+ def getresults(self):
+ return self.__resultlist
- def get_Total_BW(self):
- return self.__MB_real;
+ def gettextresult(self):
+ return ""
diff --git a/test-cli/test/tests/qflash.py b/test-cli/test/tests/qflash.py
deleted file mode 100644
index cdc4109..0000000
--- a/test-cli/test/tests/qflash.py
+++ /dev/null
@@ -1,77 +0,0 @@
-from test.helpers.syscmd import SysCommand
-import unittest
-from test.helpers.globalVariables import globalVar
-
-class Qflasher(unittest.TestCase):
-
- def __init__(self, testname, testfunc):
- super(Qflasher, self).__init__(testfunc)
- self._testMethodDoc = testname
- model=globalVar.g_mid
- if model.find("IGEP0046") == 0:
- self._flash_method = "mx6"
- self._binlocation="/boot/"
- elif model.find("IGEP0000") == 0:
- self._flash_method = "mx6"
- self._binlocation="/boot/"
- elif model.find("IGEP0034") == 0:
- self._flash_method = "nandti"
- self._binlocation = "/boot/"
- elif model.find("SOPA") == 0:
- self._flash_method = "sopaflash"
- elif model.find("OMAP3") == 0:
- self._flash_method = "nandti"
- self._binlocation = "/boot/"
- elif model.find("OMAP5") == 0:
- self._flash_method = "nandti"
- self._binlocation = "/boot/"
-
-
- def execute(self):
- # CASE eMMC boards.
- if(self._flash_method == "mx6"):
- str_cmd= "dd if={}u-boot.imx of=/dev/mmcblk2 bs=512 seek=2 2>&1 >/dev/null".format(self._binlocation)
- flash_command = SysCommand("flash_command", str_cmd)
- if flash_command.execute() == 0:
- str_cmd = "sync"
- sync_command = SysCommand("sync_command", str_cmd)
- if sync_command.execute() != 0:
- self.fail("failed: could not sync")
- else:
- self.fail("failed: could not complete flash eMMC commands")
- # CASE of nandflash boards
- elif(self._flash_method == "nandti"):
- str_cmd = "nandwrite -p -s 0x0 /dev/mtd0 {}u-boot.img 2>&1 >/dev/null; " \
- "nandwrite -p -s 0x20000 /dev/mtd0 {}u-boot.img 2>&1 >/dev/null; " \
- "nandwrite -p -s 0x40000 /dev/mtd0 {}u-boot.img 2>&1 >/dev/null; " \
- "nandwrite -p -s 0x60000 /dev/mtd0 {}u-boot.img 2>&1 >/dev/null; " \
- "".format(self._binlocation, self._binlocation, self._binlocation, self._binlocation,)
- flash_MLO_command = SysCommand("flash_MLO_command", str_cmd)
- if flash_MLO_command.execute() == 0:
- str_cmd= "nandwrite -p /dev/mtd1 {}u-boot.img 2>&1 >/dev/null".format(self._binlocation)
- flash_uBoot_command = SysCommand("flash_command", str_cmd)
- if flash_uBoot_command.execute() == 0:
- str_cmd = "sync"
- sync_command = SysCommand("sync_command", str_cmd)
- if sync_command.execute() != 0:
- self.fail("failed: could not sync")
- else:
- self.fail("failed: could not complete flash u-boot commnds")
- else:
- self.fail("failed: could not complete flash MLO commnd")
-
-
- # CASE of SOPA0000 BOARD. FULL FLASH (u-boot+kernel+rootfs)
- elif (self._flash_method == "sopaflash"):
- str_cmd = "nandtest /dev/mtd0 >/dev/null"
- flash_command = SysCommand("flash_command", str_cmd)
- if flash_command.execute() == 0:
- str_cmd = "/usr/bin/igep-flash --skip-nandtest --image /opt/firmware/demo-ti-image-*-*.tar* >/dev/null 2>&1"
- sync_command = SysCommand("sync_command", str_cmd)
- if sync_command.execute() != 0:
- self.fail("failed: could not complete flashing procces")
- else:
- self.fail("failed: could not complete nandtest mtd0")
-
- else:
- self.fail("failed: could not find flash method")
diff --git a/test-cli/test/tests/qi2c.py b/test-cli/test/tests/qi2c.py
deleted file mode 100644
index 409005c..0000000
--- a/test-cli/test/tests/qi2c.py
+++ /dev/null
@@ -1,29 +0,0 @@
-from test.helpers.syscmd import SysCommand
-import unittest
-
-class Qi2c(unittest.TestCase):
-
- def __init__(self, testname, testfunc, busnum, register):
- super(Qi2c, self).__init__(testfunc)
- self.__busnum = busnum
- self.__register = register.split("/")
- self.__devices=[]
- self._testMethodDoc = testname
-
- def execute(self):
- str_cmd= "i2cdetect -a -y -r {}".format(self.__busnum)
- i2c_command = SysCommand("i2cdetect", str_cmd)
- if i2c_command.execute() == 0:
- self.__raw_out = i2c_command.getOutput()
- if self.__raw_out == "":
- return -1
- lines=self.__raw_out.decode('ascii').splitlines()
- for i in range(len(lines)):
- if (lines[i].count('UU')):
- if (lines[i].find("UU")):
- self.__devices.append("0x{}{}".format((i - 1), hex(int((lines[i].find("UU") - 4) / 3)).split('x')[-1]))
- for i in range(len(self.__register)):
- if not(self.__register[i] in self.__devices):
- self.fail("failed: device {} not found in bus i2c-{}".format(self.__register[i], self.__busnum))
- else:
- self.fail("failed: could not complete i2cdedtect command")
diff --git a/test-cli/test/tests/qiperf.py b/test-cli/test/tests/qiperf.py
deleted file mode 100644
index 126b6ee..0000000
--- a/test-cli/test/tests/qiperf.py
+++ /dev/null
@@ -1,53 +0,0 @@
-from test.helpers.syscmd import SysCommand
-
-
-class QIperf(object):
- __sip = None
- __raw_out = None
- __MB_req = None
- __MB_real = None
- __BW_real = None
- __dat_list = None
- __bind = None
-
- def __init__(self, sip = None):
- if sip is not None:
- self.__sip = sip
- self.__MB_req = '10'
-
- def execute(self, sip = None, bind = None):
- if sip is not None:
- self.__sip = sip
-
- if bind is None:
- str_cmd = "iperf -c {} -x CMSV -n {}M".format(self.__sip, self.__MB_req)
- else:
- self.__bind = bind
- str_cmd = "iperf -c {} -x CMSV -n {}M -B {}".format(self.__sip, self.__MB_req, self.__bind)
- t = SysCommand("iperf", str_cmd)
- if t.execute() == 0:
- self.__raw_out = t.getOutput()
- if self.__raw_out == "":
- return -1
- lines = t.getOutput().splitlines()
- dat = lines[1]
- dat = dat.decode('ascii')
- dat_list = dat.split( )
- for d in dat_list:
- a = dat_list.pop(0)
- if a == "sec":
- break
- self.__MB_real = dat_list[0]
- self.__BW_real = dat_list[2]
- self.__dat_list = dat_list
- print(self.__MB_real)
- print(self.__BW_real)
- else:
- return -1
- return 0
-
- def get_Total_MB(self):
- return self.__MB_real;
-
- def get_Total_BW(self):
- return self.__MB_real;
diff --git a/test-cli/test/tests/qmmcflash.py b/test-cli/test/tests/qmmcflash.py
new file mode 100644
index 0000000..0f5a0c1
--- /dev/null
+++ b/test-cli/test/tests/qmmcflash.py
@@ -0,0 +1,55 @@
+import unittest
+import scanf
+from scanf import scanf
+from sh import mmc
+from sh import ErrorReturnCode
+from test.helpers.utils import save_file_to_disk
+from test.helpers.utils import sys_read
+
+class Qmmcflash(unittest.TestCase):
+ params = None
+
+ def __init__(self, testname, testfunc, varlist):
+ self.params = varlist
+ super(Qmmcflash, self).__init__(testfunc)
+ self._testMethodDoc = testname
+ self.__xmlObj = varlist["xml"]
+
+ self.__QeMMCName = varlist.get('name', 'qemmc')
+ self.__emmcDevice = varlist.get('device', self.__xmlObj.getKeyVal(self.__QeMMCName, "device", "mmcblk0"))
+ self.__toPath = varlist.get('to', self.__xmlObj.getKeyVal(self.__QeMMCName, "to", "/mnt/station_ramdisk"))
+ self.__mmc_res_file = varlist.get('emmc_res_file', self.__xmlObj.getKeyVal(self.__QeMMCName, "emmc_res_file", "emmc_status.txt"))
+ self.__mmcPort = varlist.get('mmc_port', self.__xmlObj.getKeyVal(self.__QeMMCName, "mmc_port", "0"))
+ self.__mmcID = varlist.get('mmc_id', self.__xmlObj.getKeyVal(self.__QeMMCName, "mmc_id", "0001"))
+
+ self.__resultlist = []
+
+ def execute(self):
+ try:
+ dataOut = mmc('extcsd', 'read', '/dev/{}'.format(self.__emmcDevice))
+ save_file_to_disk(filePath='{}/{}'.format(self.__toPath, self.__mmc_res_file),
+ description='eMMC health test',
+ mime='text/plain',
+ data=dataOut.stdout.decode('utf-8'),
+ result=self.__resultlist)
+ sysDevice = "/sys/bus/mmc/drivers/mmcblk/mmc{}:{}".format(self.__mmcPort, self.__mmcID)
+ r, data = sys_read("{}/life_time".format(sysDevice))
+ if not r:
+ self.fail("emmc: life_time not found")
+ res = scanf("0x%d 0x%d", data)
+ if res[0] > 3 or res[1] > 3:
+ self.fail("emmc: review {} life_time > 3".format(sysDevice))
+ r, data = sys_read("{}/pre_eol_info".format(sysDevice))
+ if not r:
+ self.fail("emmc: pre_eol_info not found")
+ res = scanf("0x%d", data)
+ if res[0] != 1:
+ self.fail("emmc: review {} pre_eol_info != 1".format(sysDevice))
+ except ErrorReturnCode as Error:
+ self.fail("emmc: failed {} ".format(Error.exit_code))
+
+ def getresults(self):
+ return self.__resultlist
+
+ def gettextresult(self):
+ return "" \ No newline at end of file
diff --git a/test-cli/test/tests/qnand.py b/test-cli/test/tests/qnand.py
new file mode 100644
index 0000000..6de1eaf
--- /dev/null
+++ b/test-cli/test/tests/qnand.py
@@ -0,0 +1,48 @@
+import unittest
+import sh
+import uuid
+from test.helpers.utils import save_file_to_disk
+
+class Qnand(unittest.TestCase):
+ params = None
+ __device = "10M"
+ __resultlist = None # resultlist is a python list of python dictionaries
+
+ # varlist: device
+ def __init__(self, testname, testfunc, varlist):
+ self.params = varlist
+ super(Qnand, self).__init__(testfunc)
+ self._testMethodDoc = testname
+ self.__xmlObj = varlist["xml"]
+ self.__QNandName = varlist.get('name', 'qnand')
+ self.__device = varlist.get('device', self.__xmlObj.getKeyVal(self.__QNandName, "device", "/dev/mtd0"))
+ self.__nand_res_file = varlist.get('nand_res_file', self.__xmlObj.getKeyVal(self.__QNandName, "nand_res_file", "nand_test_{}.txt"))
+ self.__toPath = varlist.get('to', self.__xmlObj.getKeyVal(self.__QNandName, "to", "/mnt/station_ramdisk"))
+ self.__file_uuid = uuid.uuid4()
+ self.__nand_res_file = self.__nand_res_file.format(self.__file_uuid)
+ nandtestPath = self.__xmlObj.getKeyVal("qnand", "nandtestPath", '/root/hwtest-files/nandtest"')
+
+ if nandtestPath == '':
+ self.myNandTest = sh.nandtest
+ else:
+ self.myNandTest = sh.Command(nandtestPath)
+ self.__resultlist = []
+
+ def execute(self):
+ try:
+ res = self.myNandTest("-m", self.__device)
+ save_file_to_disk(filePath='{}/{}'.format(self.__toPath, self.__nand_res_file),
+ description='nand {} test'.format(self.__device),
+ mime='text/plain',
+ data=res.stdout.decode('utf-8'),
+ result=self.__resultlist)
+
+ except sh.ErrorReturnCode_1 as e:
+ self.fail("nandtest failed: {}".format(e.ErrorReturnCode.stdout))
+
+
+ def getresults(self):
+ return self.__resultlist
+
+ def gettextresult(self):
+ return ""
diff --git a/test-cli/test/tests/qplc.py b/test-cli/test/tests/qplc.py
new file mode 100644
index 0000000..28310c6
--- /dev/null
+++ b/test-cli/test/tests/qplc.py
@@ -0,0 +1,121 @@
+import shutil
+import unittest
+import os.path
+import os
+import sh
+import stat
+import time
+import ping3
+from netifaces import AF_INET
+import netifaces as ni
+
+from test.helpers.md5 import md5_file
+from test.helpers.plc import dcpPLC
+from test.helpers.iseelogger import logObj
+
+class Qplc(unittest.TestCase):
+ params = None
+ __resultlist = None # resultlist is a python list of python dictionaries
+ __QPLCName = None
+ __plc = None
+ __board_uuid = None
+
+ def __init__(self, testname, testfunc, varlist):
+ self.params = varlist
+ super(Qplc, self).__init__(testfunc)
+ self._testMethodDoc = testname
+ self.__xmlObj = varlist["xml"]
+ self.__psdbObj = varlist["db"]
+ self.__board_uuid = varlist["boarduuid"]
+
+ self.__QPLCName = varlist.get('name', 'qplc')
+ self.__firmware = varlist.get('firmware', self.__xmlObj.getKeyVal(self.__QPLCName, "firmware", ""))
+ self.__firmware_md5 = varlist.get('firmware_md5', self.__xmlObj.getKeyVal(self.__QPLCName, "firmware_md5", ""))
+ self.__factoryTool = varlist.get('factory_tool', self.__xmlObj.getKeyVal(self.__QPLCName, "factory_tool", ""))
+ self.__gen_ip = varlist.get('gen_ip', self.__xmlObj.getKeyVal(self.__QPLCName, "gen_ip", "0"))
+ self.__gen_mac = varlist.get('gen_mac', self.__xmlObj.getKeyVal(self.__QPLCName, "gen_mac", "0"))
+ self.__mtd_device = varlist.get('mtd_device', self.__xmlObj.getKeyVal(self.__QPLCName, "mtd_device", "/dev/mtd0"))
+ self.__firmware_Path = varlist.get('firmwarepath', self.__xmlObj.getKeyVal(self.__QPLCName, "firmwarepath", "/root/hwtest-files/firmware"))
+ self.__skipflash = varlist.get('skipflash', self.__xmlObj.getKeyVal(self.__QPLCName, "skipflash", "0"))
+ self.__plc = dcpPLC(self.__factoryTool, self.__mtd_device)
+ self.__factory_password = varlist.get('factory_password', self.__xmlObj.getKeyVal(self.__QPLCName, "factory_password", "0"))
+ self.__firmware_password = varlist.get('firmware_password', self.__xmlObj.getKeyVal(self.__QPLCName, "firmware_password", "0"))
+ self.__plc_reset_wait = varlist.get('plc_reset_wait', self.__xmlObj.getKeyVal(self.__QPLCName, "plc_reset_wait", "60"))
+ self.__resultlist = []
+
+
+ def checkfiles(self):
+ if not os.path.isfile(self.__factoryTool):
+ return False, 'Factory tool Not found {}/{}'.format(self.__factoryTool)
+ if not os.path.isfile('{}/{}'.format(self.__firmware_Path, self.__firmware)):
+ return False, 'Firmware Not found {}/{}'.format(self.__firmware_Path, self.__firmware)
+ if not os.path.isfile('{}/{}'.format(self.__firmware_Path, self.__firmware_md5)):
+ return False, 'Firmware md5 Not found {}/{}'.format(self.__firmware_Path, self.__firmware_md5)
+ try:
+ mode = os.stat(self.__mtd_device).st_mode;
+ if not stat.S_ISCHR(mode):
+ return False, 'No device {} found'.format(self.__mtd_device)
+ except:
+ return False, 'No device {} found'.format(self.__mtd_device)
+ return True, ''
+
+ def __flashMemory(self):
+ r, v = self.__plc.SaveFirmware('{}/{}'.format(self.__firmware_Path, self.__firmware))
+ if not r:
+ self.fail(v)
+
+ def execute(self):
+ # Check files
+ v, m = self.checkfiles()
+ if not v:
+ self.fail(m)
+ # do flash & verify
+ if self.__skipflash == '0':
+ self.__flashMemory()
+ # do Plc boot
+ self.__plc.setBootMode()
+ # Wait plc goes UP
+ if self.__gen_mac:
+ res, v = self.__psdbObj.get_plc_macaddr(self.__board_uuid)
+ if res == None and v == 'MAC limit by uuid':
+ self.fail('MAC limit by uuid')
+ elif res == None:
+ self.fail('BUG: ?????')
+ logObj.getlogger().info("MAC {}".format(res[0]))
+ plcMAC = res[0][0]
+ # wait for PLC to be turned on
+ attemptcounter = 0
+ plcfound = False
+ while attemptcounter < 5 and not plcfound:
+ if self.__plc.discover():
+ plcfound = True
+ else:
+ attemptcounter += 1
+ time.sleep(5)
+ # mandatory wait before configuring the PLC chip
+ time.sleep(2)
+ self.__plc.set_plc2('SYSTEM.PRODUCTION.SECTOR0_UNLOCK_PASSWORD',
+ self.__factory_password,
+ 'SYSTEM.PRODUCTION.MAC_ADDR',
+ '{}'.format(plcMAC),
+ self.__firmware_password)
+ # plc reset to save changes
+ self.__plc.setPLCReset()
+ time.sleep(int(self.__plc_reset_wait))
+ # calculate the IP of the GPLC0000 board to ping
+ ip_eth01 = ni.ifaddresses('eth0:1')[AF_INET][0]['addr'] # plc_test_ip = 10.10.1.113
+ ip_eth01_splited = ip_eth01.split('.')
+ ip_eth01_splited[3] = str(int(ip_eth01_splited[3]) + 100)
+ plc_test_ip = ".".join(ip_eth01_splited) # plc_test_ip = 10.10.1.213
+ # ping against the GPLC0000 board
+ ping3.EXCEPTIONS = True
+ try:
+ ping3.ping(plc_test_ip, timeout=0.1)
+ except:
+ self.fail('PLC ping timeout')
+
+ def getresults(self):
+ return self.__resultlist
+
+ def gettextresult(self):
+ return ""
diff --git a/test-cli/test/tests/qram.py b/test-cli/test/tests/qram.py
index 8ec0210..49d4d54 100644
--- a/test-cli/test/tests/qram.py
+++ b/test-cli/test/tests/qram.py
@@ -1,22 +1,62 @@
-from test.helpers.syscmd import SysCommand
import unittest
+import sh
+# from test.helpers.iseelogger import MeasureTime
+# from test.helpers.iseelogger import logObj
+from test.helpers.utils import save_file_to_disk
class Qram(unittest.TestCase):
+ params = None
+ __memsize = None
+ __loops = None
+ __resultlist = [] # resultlist is a python list of python dictionaries
- def __init__(self, testname, testfunc, memSize):
+ # varlist: memsize, loops
+ def __init__(self, testname, testfunc, varlist):
+ self.params = varlist
super(Qram, self).__init__(testfunc)
- self.__memSize = memSize
self._testMethodDoc = testname
+ self.__xmlObj = varlist["xml"]
- def execute(self):
- str_cmd= "free -m"
- free_command = SysCommand("free_ram", str_cmd)
- if free_command.execute() == 0:
- self.__raw_out = free_command.getOutput()
- if self.__raw_out == "":
- return -1
- lines = free_command.getOutput().splitlines()
- aux = [int(s) for s in lines[1].split() if s.isdigit()]
- self.failUnless(int(aux[0])>int(self.__memSize),"failed: total ram memory size lower than expected")
+ self.__QramName = varlist.get('name', 'qram')
+ self.__loops = varlist.get('loops', self.__xmlObj.getKeyVal(self.__QramName, "loops", "1"))
+ self.__memsize = varlist.get('memsize', self.__xmlObj.getKeyVal(self.__QramName, "memsize", "50M"))
+ self.__toPath = varlist.get('to', self.__xmlObj.getKeyVal(self.__QramName, "to", "/mnt/station_ramdisk"))
+ self.__ram_res_file = varlist.get('ram_res_file', self.__xmlObj.getKeyVal(self.__QramName, "ram_res_file", "ram_res.txt"))
+
+ self.__dummytest = varlist.get('dummytest', self.__xmlObj.getKeyVal(self.__QramName, "dummytest", "0"))
+ self.__dummyresult = varlist.get('dummytestresult', self.__xmlObj.getKeyVal(self.__QramName, "dummytest", "0"))
+
+ memtesterPath = self.__xmlObj.getKeyVal(self.__QramName, "memtesterPath", '')
+
+ if memtesterPath == '':
+ self.myMemtester = sh.memtester
else:
- self.fail("failed: could not complete iperf command")
+ self.myMemtester = sh.Command(memtesterPath)
+
+ def execute(self):
+ self.__resultlist = []
+ try:
+ # mytime = MeasureTime()
+ res = self.myMemtester(self.__memsize, '{}'.format(self.__loops))
+ save_file_to_disk(filePath='{}/{}'.format(self.__toPath, self.__ram_res_file),
+ description='Ram result test',
+ mime='text/plain',
+ data=res.stdout.decode('utf-8'),
+ result=self.__resultlist)
+ # mytime.stop()
+ except sh.ErrorReturnCode as e:
+ save_file_to_disk(filePath='{}/{}'.format(self.__toPath, self.__ram_res_file),
+ description='Ram result test',
+ mime='text/plain',
+ data=e.stdout.decode('utf-8'),
+ result=self.__resultlist)
+ self.fail("failed: memtester {}::{}".format(str(e.exit_code), e.stdout.decode('utf-8')))
+
+ except Exception as details:
+ self.fail('Error: {}'.format(details))
+
+ def getresults(self):
+ return self.__resultlist
+
+ def gettextresult(self):
+ return ""
diff --git a/test-cli/test/tests/qrtc.py b/test-cli/test/tests/qrtc.py
index 1d02f78..df493d2 100644
--- a/test-cli/test/tests/qrtc.py
+++ b/test-cli/test/tests/qrtc.py
@@ -1,28 +1,46 @@
-from test.helpers.syscmd import SysCommand
+import sh
import unittest
import time
+import re
+
class Qrtc(unittest.TestCase):
+ params = None
+ __rtc = None
+ __resultlist = None # resultlist is a python list of python dictionaries
- def __init__(self, testname, testfunc, rtc):
+ def __init__(self, testname, testfunc, varlist):
+ self.params = varlist
super(Qrtc, self).__init__(testfunc)
- self.__rtc = rtc
+ if "rtc" in varlist:
+ self.__rtc = varlist["rtc"]
+ else:
+ raise Exception('rtc param inside Qrtc must be defined')
self._testMethodDoc = testname
+ self.__resultlist = []
def execute(self):
- str_cmd = "hwclock -f {}".format(self.__rtc)
- rtc_set = SysCommand("rtc_set", str_cmd)
- if rtc_set.execute() == 0:
- curr_hour = rtc_set.getOutput().decode('ascii').split(" ")
- time1 = int((curr_hour[4].split(":"))[2])
+ # get time from RTC peripheral
+ p = sh.hwclock("-f", self.__rtc)
+ if p.exit_code == 0:
+ time1 = re.search("([01]?[0-9]{1}|2[0-3]{1})[:][0-5]{1}[0-9]{1}[:][0-5]{1}[0-9]{1}",
+ p.stdout.decode('ascii'))
time.sleep(1)
- if rtc_set.execute() == 0:
- curr_hour = rtc_set.getOutput().decode('ascii').split(" ")
- time2 = int((curr_hour[4].split(":"))[2])
- if time1==time2:
+ # get time from RTC another time
+ p = sh.hwclock("-f", self.__rtc)
+ if p.exit_code == 0:
+ time2 = re.search("([01]?[0-9]{1}|2[0-3]{1})[:][0-5]{1}[0-9]{1}[:][0-5]{1}[0-9]{1}",
+ p.stdout.decode('ascii'))
+ # check if the seconds of both times are different
+ if time1 == time2:
self.fail("failed: RTC is not running")
else:
- self.fail("failed: couldn't execute hwclock command 2nd time")
+ self.fail("failed: couldn't execute hwclock command for the second time")
else:
self.fail("failed: couldn't execute hwclock command")
+ def getresults(self):
+ return self.__resultlist
+
+ def gettextresult(self):
+ return ""
diff --git a/test-cli/test/tests/qscreen.py b/test-cli/test/tests/qscreen.py
deleted file mode 100644
index b8a5a48..0000000
--- a/test-cli/test/tests/qscreen.py
+++ /dev/null
@@ -1,31 +0,0 @@
-from test.helpers.syscmd import SysCommand
-import unittest
-import time
-from test.helpers.cv_display_test import pattern_detect
-
-class Qscreen(unittest.TestCase):
-
- def __init__(self, testname, testfunc, display):
- super(Qscreen, self).__init__(testfunc)
- self.__display = display
- self._testMethodDoc = testname
-
- def execute(self):
- str_cmd = "fbi -T 1 --noverbose -d /dev/{} test/files/test_pattern.png".format(self.__display)
- display_image = SysCommand("display_image", str_cmd)
- if display_image.execute() == -1:
- test_screen = pattern_detect(1)
- if not test_screen=="0":
- self.fail("failed: {}".format(test_screen))
- str_cmd= "fbi -T 1 /home/root/result_hdmi_img.jpg -d /dev/fb0 --noverbose -a"
- show_img = SysCommand("show-image", str_cmd)
- show_img.execute()
- else:
- self.fail("failed: could not display the image")
- try:
- str_cmd= "fbi -T 1 /home/root/result_hdmi_img.jpg -d /dev/fb0 --noverbose -a"
- show_img = SysCommand("show-image", str_cmd)
- show_img.execute()
- except ValueError:
- print("COULD NOT DISPLAY IMAGE")
-
diff --git a/test-cli/test/tests/qserial.py b/test-cli/test/tests/qserial.py
index 43ba3c8..402e33f 100644
--- a/test-cli/test/tests/qserial.py
+++ b/test-cli/test/tests/qserial.py
@@ -1,30 +1,55 @@
-from test.helpers.syscmd import SysCommand
import unittest
import uuid
import serial
import time
+
class Qserial(unittest.TestCase):
+ params = None
+ __port = None
+ __baudrate = None
+ __resultlist = None # resultlist is a python list of python dictionaries
- def __init__(self, testname, testfunc, port, baudrate):
+ # varlist: port, baudrate
+ def __init__(self, testname, testfunc, varlist):
+ self.params = varlist
super(Qserial, self).__init__(testfunc)
- self.__port = port
- self.__serial = serial.Serial(self.__port, timeout=1)
- self.__serial.baudrate = baudrate
+ if "port" in varlist:
+ self.__port = varlist["port"]
+ else:
+ raise Exception('port param inside Qserial must be defined')
+
+ if "baudrate" in varlist:
+ self.__baudrate = varlist["baudrate"]
+ else:
+ raise Exception('baudrate param inside Qserial must be defined')
self._testMethodDoc = testname
+ self.__resultlist = []
+
+ # open serial connection
+ self.__serial = serial.Serial(self.__port, self.__baudrate, timeout=1)
def __del__(self):
self.__serial.close()
def execute(self):
+ # generate a random uuid
+ test_uuid = str(uuid.uuid4()).encode()
+ # clean serial port
self.__serial.flushInput()
self.__serial.flushOutput()
- test_uuid = str(uuid.uuid4()).encode()
+ # send the uuid through serial port
self.__serial.write(test_uuid)
time.sleep(0.05) # there might be a small delay
if self.__serial.inWaiting() == 0:
- self.fail("failed: PORT {} wait timeout exceded, wrong communication?".format(self.__port))
+ self.fail("failed: port {} wait timeout exceded".format(self.__port))
else:
- if (self.__serial.readline() != test_uuid):
- self.fail("failed: PORT {} write/read mismatch".format(self.__port))
+ # check if what it was sent is equal to what has been received
+ if self.__serial.readline() != test_uuid:
+ self.fail("failed: port {} write/read mismatch".format(self.__port))
+
+ def getresults(self):
+ return self.__resultlist
+ def gettextresult(self):
+ return ""
diff --git a/test-cli/test/tests/qtemplate.py b/test-cli/test/tests/qtemplate.py
deleted file mode 100644
index 940cded..0000000
--- a/test-cli/test/tests/qtemplate.py
+++ /dev/null
@@ -1,51 +0,0 @@
-#IF COMMAND IS NEEDED
-from test.helpers.syscmd import SysCommand
-import unittest
-#class name
-class Qtemplate(unittest.TestCase):
- # Initialize the variables
- __variable1 = "Value-a"
- __variable2 = "Value-b"
- #....
- __variablen = "Value-n"
-
- def __init__(self, testname, testfunc, input1=None, inputn=None):
- # Doing this we will initialize the class and later on perform a particular method inside this class
- super(Qtemplate, self).__init__(testfunc)
- self.__testname = testname
- self.__input1 = input1
- self.__inputn = inputn
- self._testMethodDoc = testname
-
-
-
- def execute(self):
- str_cmd = "command"
- t = SysCommand("command-name", str_cmd)
- if t.execute() == 0:
- self.__raw_out = t.getOutput()
- if self.__raw_out == "":
- return -1
- lines = t.getOutput().splitlines()
- dat = lines[1]
- dat = dat.decode('ascii')
- dat_list = dat.split( )
- for d in dat_list:
- a = dat_list.pop(0)
- if a == "sec":
- break
- self.__MB_real = dat_list[0]
- self.__BW_real = dat_list[2]
- self.__dat_list = dat_list
- print(self.__MB_real)
- print(self.__BW_real)
- self.failUnless(int(self.__BW_real)>int(self.__OKBW)*0.9,"FAIL:BECAUSE...")
- else:
- return -1
- return 0
-
- def get_Total_MB(self):
- return self.__MB_real;
-
- def get_Total_BW(self):
- return self.__MB_real;
diff --git a/test-cli/test/tests/qusb.py b/test-cli/test/tests/qusb.py
index 44490bc..4f76d0c 100644
--- a/test-cli/test/tests/qusb.py
+++ b/test-cli/test/tests/qusb.py
@@ -1,60 +1,95 @@
-from test.helpers.syscmd import SysCommand
+import shutil
import unittest
+import os.path
+import os
+import sh
+
+from test.helpers.md5 import md5_file
class Qusb(unittest.TestCase):
+ params = None
+ __resultlist = None # resultlist is a python list of python dictionaries
+ __QusbName = None
- def __init__(self, testname, testfunc, devLabel, numPorts):
+ def __init__(self, testname, testfunc, varlist):
+ self.params = varlist
super(Qusb, self).__init__(testfunc)
- self.__numPorts = numPorts
self._testMethodDoc = testname
- self.__devLabel = devLabel
- if testname=="USBOTG":
- self.__usbFileName = "/this_is_an_usb_otg"
- self.__usbtext = "USBOTG"
- elif testname=="SATA":
- self.__usbFileName = "/this_is_a_sata"
- self.__usbtext = "SATA"
- else:
- self.__usbFileName = "/this_is_an_usb_host"
- self.__usbtext = "USBHOST"
- self.__numUsbFail=[]
+ self.__xmlObj = varlist["xml"]
+
+ self.__QusbName = varlist.get('name', 'qusb')
+ self.__loops = varlist.get('loops', self.__xmlObj.getKeyVal(self.__QusbName, "loops", "3"))
+ self.__filepath_from = varlist.get('path', self.__xmlObj.getKeyVal(self.__QusbName, "path", "/root/hwtest-files"))
+ self.__file_source = varlist.get('file', self.__xmlObj.getKeyVal(self.__QusbName, "file", "usb-test.bin"))
+ self.__file_md5 = varlist.get('file_md5', self.__xmlObj.getKeyVal(self.__QusbName, "file_md5", "usb-test.bin.md5"))
+ self.__path_to = varlist.get('pathto', self.__xmlObj.getKeyVal(self.__QusbName, "pathto", "/mnt/test/disk2"))
+ self.__check_mounted = varlist.get('check_mounted', self.__xmlObj.getKeyVal(self.__QusbName, "check_mounted", ""))
+
+ self.__resultlist = []
+
+ def removefileIfExist(self, fname):
+ if os.path.exists(fname):
+ try:
+ os.remove(fname)
+ except OSError as error:
+ return False, str(error)
+ return True, ''
+
+ def checkfiles(self):
+ if not os.path.isfile('{}/{}'.format(self.__filepath_from, self.__file_source)):
+ return False, 'File test binary Not found {}/{}'.format(self.__filepath_from, self.__file_source)
+ if not os.path.isfile('{}/{}'.format(self.__filepath_from, self.__file_md5)):
+ return False, 'File test binary md5 Not found {}/{}'.format(self.__filepath_from, self.__file_md5)
+ if self.__check_mounted != '':
+ if not os.path.ismount('{}'.format(self.__check_mounted)):
+ return False, 'Required mounted failed: {}'.format(self.__path_to)
+ r, s = self.removefileIfExist('{}/{}'.format(self.__path_to, self.__file_source))
+ if not r:
+ return r, s
+ return True, ''
+
+ def getTestFile_md5(self, fname):
+ t = ''
+ with open('{}'.format(fname), 'r') as outfile:
+ t = outfile.read().rstrip("\n")
+ outfile.close()
+ return t
+
+ def ExecuteTranfer(self):
+ try:
+ originalMD5 = self.getTestFile_md5('{}/{}'.format(self.__filepath_from, self.__file_md5))
+ if originalMD5 == '':
+ self.fail('MD5 file {}/{} Not contain any md5'.format(self.__filepath_from, self.__file_md5))
+ # shutil.copy2('{}/{}'.format(self.__filepath_from, self.__file_source), '{}'.format(self.__path_to))
+ sh.cp('{}/{}'.format(self.__filepath_from, self.__file_source), '{}'.format(self.__path_to))
+ sh.sync()
+ md5val = md5_file('{}/{}'.format(self.__path_to, self.__file_source))
+ self.removefileIfExist('{}/{}'.format(self.__path_to, self.__file_source))
+ if originalMD5 != md5val:
+ return False, 'md5 check failed {}<>{}'.format(originalMD5, md5val)
+ except OSError as why:
+ return False,'Error: {} tranfer failed'.format(why.errno)
+ except sh.ErrorReturnCode as shError:
+ return False, 'USB Exception: tranfer failed'
+ except Exception as details:
+ return False, 'USB Exception: {} tranfer failed'.format(details)
+ return True, ''
def execute(self):
- str_cmd= "lsblk -o LABEL"
- lsblk_command = SysCommand("lsblk", str_cmd)
- if lsblk_command.execute() == 0:
- self.__raw_out = lsblk_command.getOutput()
- if self.__raw_out == "":
- return -1
- lines = lsblk_command.getOutput().splitlines()
- host_list=[]
- for i in range(len(lines)):
- if str(lines[i].decode('ascii'))==self.__devLabel:
- host_list.append(i)
- if len(host_list)==int(self.__numPorts):
- str_cmd = "lsblk -o MOUNTPOINT"
- lsblk_command = SysCommand("lsblk", str_cmd)
- if lsblk_command.execute() == 0:
- self.__raw_out = lsblk_command.getOutput()
- if self.__raw_out == "":
- print("failed: no command output")
- self.fail("failed: no command output")
- else:
- lines = lsblk_command.getOutput().splitlines()
- for i in range(len(host_list)):
- file_path=str(lines[host_list[i]].decode('ascii')) + self.__usbFileName
- usb_file = open(file_path, 'r')
- read=usb_file.read()
- if read.find(self.__usbtext)!=-1:
- print(file_path + " --> OK!")
- else:
- self.fail("failed: could not read from usb {}".format(lines[host_list[i]].decode('ascii')))
- self.__numUsbFail.append(host_list[i])
- usb_file.close()
- else:
- self.fail("failed: couldn't execute lsblk command")
-
- else:
- self.fail("failed: reference and real usb host devices number mismatch")
- else:
- self.fail("failed: couldn't execute lsblk command")
+ v, m = self.checkfiles()
+ if not v:
+ self.fail(m)
+ countLoops = int(self.__loops)
+ while countLoops > 0:
+ res, msg = self.ExecuteTranfer()
+ if not res:
+ res, msg = self.ExecuteTranfer()
+ if not res:
+ self.fail(msg)
+ countLoops = countLoops - 1
+
+ def getresults(self):
+ return self.__resultlist
+
+ def gettextresult(self):
+ return ""
diff --git a/test-cli/test/tests/qvideo.py b/test-cli/test/tests/qvideo.py
new file mode 100644
index 0000000..225940b
--- /dev/null
+++ b/test-cli/test/tests/qvideo.py
@@ -0,0 +1,125 @@
+import unittest
+import sh
+import time
+from test.helpers.camara import Camara
+from test.helpers.detect import Detect_Color
+from test.helpers.sdl import SDL2_Test
+
+class Qvideo(unittest.TestCase):
+ params = None
+ __resultlist = None # resultlist is a python list of python dictionaries
+
+ def __init__(self, testname, testfunc, varlist):
+ self.params = varlist
+ super(Qvideo, self).__init__(testfunc)
+ self._testMethodDoc = testname
+ self.__xmlObj = varlist["xml"]
+ self.__QVideoName = varlist.get('name', 'qvideo')
+ self.__resultlist = []
+ self.__w = int(varlist.get('capture_size_w', self.__xmlObj.getKeyVal(self.__QVideoName, "capture_size_w", 1280)))
+ self.__h = int(varlist.get('capture_size_h', self.__xmlObj.getKeyVal(self.__QVideoName, "capture_size_h", 720)))
+ self.__discard_frames_Count = int(varlist.get('capture_discardframes',self.__xmlObj.getKeyVal(self.__QVideoName, "capture_discardframes", 3)))
+ self.__frame_mean = int(varlist.get('capture_framemean', self.__xmlObj.getKeyVal(self.__QVideoName, "capture_framemean", 3)))
+ self.__max_failed = int(varlist.get('capture_maxfailed', self.__xmlObj.getKeyVal(self.__QVideoName, "capture_maxfailed", 1)))
+ self.__cam_setupscript = varlist.get('cam_setupfile', self.__xmlObj.getKeyVal(self.__QVideoName, "cam_setupfile",
+ "/root/hwtest-files/board/scripts/v4l-cam.sh"))
+ self.__sdl_display = varlist.get('sdl_display', self.__xmlObj.getKeyVal(self.__QVideoName, "sdl_display", ":0"))
+ self.__sdl_driver = varlist.get('sdl_driver', self.__xmlObj.getKeyVal(self.__QVideoName, "sdl_driver", "x11"))
+ self.__camdevice = varlist.get('camdevice', self.__xmlObj.getKeyVal(self.__QVideoName, "camdevice", "video0"))
+
+ self.__Camara = Camara(setup_script_path=self.__cam_setupscript, device=self.__camdevice, width=self.__w, height=self.__h)
+ self.__SDL2_Test = SDL2_Test(driver=self.__sdl_driver, display=self.__sdl_display, w=self.__w, h=self.__h)
+ self.__SDL2_Test.Clear()
+
+ def __drop_frames(self, frame_count):
+ count = frame_count
+ while count > 0:
+ _ = self.__Camara.getFrame()
+ count -= 1
+
+ def Verify_Color(self, color):
+ self.paintColor(color)
+ count = self.__frame_mean
+ res_ok = 0
+ res_failed = 0
+ r_count = 0
+ c_mean = 0
+ while count > 0:
+ res, t = self.Capture(color)
+ if res:
+ res_ok += 1
+ else:
+ res_failed += 1
+ if t == "count":
+ r_count += 1
+ else:
+ c_mean +=1
+ count -= 1
+ self.unpaintColor(color)
+ if res_failed > self.__max_failed:
+ if r_count > 0:
+ return '{}_COLOR_FAILED'.format(color)
+ elif c_mean > 0:
+ return 'MEAN_{}_FAILED'.format(color)
+ return "ok"
+
+ def paintColor(self, color):
+ self.__SDL2_Test.Paint(color)
+ time.sleep(3)
+ self.__drop_frames(self.__Camara.getFrameCount())
+
+ def unpaintColor(self, color):
+ self.__SDL2_Test.Clear()
+ time.sleep(1)
+
+ def Capture(self, color):
+ image = self.__Camara.getFrame()
+ if image is None:
+ return None
+ dtObject = Detect_Color(image)
+ if color == 'RED':
+ _, _, mean, count = dtObject.getRed()
+ elif color == 'BLUE':
+ _, _, mean, count = dtObject.getBlue()
+ elif color == 'GREEN':
+ c_mask, croped, mean, count = dtObject.getGreen()
+
+ return self.checkThreshold(color, mean, count)
+
+ def checkThreshold(self, color, mean, count):
+ min_count = int(
+ self.params.get('{}_min_count'.format(color),
+ self.__xmlObj.getKeyVal(self.__QVideoName, '{}_min_count'.format(color), 50000)))
+
+ str = ""
+ # first verify count
+ if count >= min_count:
+ return True, ""
+ else:
+ str = "count"
+ return False, str
+
+ def execute(self):
+ self.__resultlist = []
+
+ # Open camara
+ if not self.__Camara.Open():
+ self.fail('Error: USB camera not found')
+ # discard initial frames
+ self.__drop_frames(self.__discard_frames_Count)
+ # Test
+ res = self.Verify_Color('RED')
+ if res != "ok":
+ self.fail("failed: RED verification failed")
+ res = self.Verify_Color('BLUE')
+ if res != "ok":
+ self.fail("failed: BLUE verification failed")
+ res = self.Verify_Color('GREEN')
+ if res != "ok":
+ self.fail("failed: GREEN verification failed")
+
+ def getresults(self):
+ return self.__resultlist
+
+ def gettextresult(self):
+ return ""
diff --git a/test-cli/test/tests/qwifi.py b/test-cli/test/tests/qwifi.py
index d08149b..4695041 100644
--- a/test-cli/test/tests/qwifi.py
+++ b/test-cli/test/tests/qwifi.py
@@ -1,45 +1,99 @@
-from test.helpers.syscmd import SysCommand
import unittest
-import subprocess
+import time
+import uuid
+import iperf3
+import netifaces
+from test.helpers.iw import iwScan
+from test.helpers.utils import save_json_to_disk
+from test.helpers.utils import station2Port
class Qwifi(unittest.TestCase):
+ __serverip = None
+ __numbytestx = None
+ __bwexpected = None
+ __port = None
+ params = None
+ __bwreal = None
+ __resultlist = None # resultlist is a python list of python dictionaries
+ timebetweenattempts = 5
- def __init__(self, testname, testfunc, signal):
+ # varlist content: serverip, bwexpected, port
+ def __init__(self, testname, testfunc, varlist):
+ self.params = varlist
super(Qwifi, self).__init__(testfunc)
- self.__signal = signal
self._testMethodDoc = testname
- # WiFi SERVERIP fixed at the moment.
- self._serverip = "192.168.5.1"
+ self.__xmlObj = varlist["xml"]
+ self.__QwifiName = varlist.get('name', 'qwifi')
+ self.__loops = varlist.get('loops', self.__xmlObj.getKeyVal(self.__QwifiName, "loops", "1"))
+ self.__port = varlist.get('port', self.__xmlObj.getKeyVal(self.__QwifiName, "port", "5000"))
+ self.__bw = varlist.get('bwexpected', self.__xmlObj.getKeyVal(self.__QwifiName, "bwexpected", "5.0"))
+ self.__serverip = varlist.get('serverip', self.__xmlObj.getKeyVal(self.__QwifiName, "serverip", "localhost"))
+ self.__duration = varlist.get('duration', self.__xmlObj.getKeyVal(self.__QwifiName, "duration", "10"))
+ self.__interface = varlist.get('interface', self.__xmlObj.getKeyVal(self.__QwifiName, "interface", "wlan0"))
+ self.__toPath = varlist.get('to', self.__xmlObj.getKeyVal(self.__QwifiName, "to", "/mnt/station_ramdisk"))
+ self.__retriesCount = varlist.get('retries', self.__xmlObj.getKeyVal(self.__QwifiName, "retries", "5"))
+ self.__waitRetryTime = varlist.get('wait_retry', self.__xmlObj.getKeyVal(self.__QwifiName, "wait_retry", "10"))
+ self.__wifi_res_file = varlist.get('wifi_res_file', self.__xmlObj.getKeyVal(self.__QwifiName, "wifi_res_file", "wifi_test_{}.json"))
+ self.__wifi_st_file = varlist.get('wifi_st_file', self.__xmlObj.getKeyVal(self.__QwifiName, "wifi_st_file", "wifi_st_{}.json"))
+ self.__file_uuid = uuid.uuid4()
+ self.__wifi_res_file = self.__wifi_res_file.format(self.__file_uuid)
+ self.__wifi_st_file = self.__wifi_st_file.format(self.__file_uuid)
+ self.__resultlist = []
+
+ def checkInterface(self, reqInterface):
+ ifaces = netifaces.interfaces()
+ for t in ifaces:
+ if t == reqInterface:
+ return True
+ return False
def execute(self):
- # First check connection with the wifi server using ping command
- #str_cmd = "ping -c 1 {} > /dev/null".format(self._serverip)
- #wifi_ping = SysCommand("wifi_ping", str_cmd)
- #wifi_ping.execute()
- #res = subprocess.call(['ping', '-c', '1', self._serverip])
- p = subprocess.Popen(['ping','-c','1',self._serverip,'-W','2'],stdout=subprocess.PIPE)
- p.wait()
- res=p.poll()
- if res == 0:
- str_cmd= "iw wlan0 link"
- wifi_stats = SysCommand("wifi_stats", str_cmd)
- if wifi_stats.execute() == 0:
- w_stats = wifi_stats.getOutput().decode('ascii').splitlines()
- #self._longMessage = str(self.__raw_out).replace("'", "")
- if not w_stats[0] == "Not connected.":
- signal_st = w_stats[5].split(" ")[1]
- try:
- int(signal_st)
- if -1*int(signal_st) > int(self.__signal):
- self.fail("failed: signal to server lower than expected")
- except ValueError:
- self.fail("failed: error output (Bad connection?)")
+ self.__resultlist = []
+ stationList = {}
+ # check interfaces
+ if not self.checkInterface(self.__interface):
+ self.fail('Interface not found: {}'.format(self.__interface))
+ # scan networks
+ scanner = iwScan(self.__interface)
+ if scanner.scan():
+ stationList = scanner.getStations()
+ # check if we are connected
+ if not scanner.findConnected():
+ self.fail('Not connected to test AP')
+ else:
+ strError = scanner.getLastError()
+ self.fail('qWifi scanner Execution error: {}'.format(strError))
+ # Start Iperf
+ client = iperf3.Client()
+ client.duration = int(self.__duration)
+ client.server_hostname = self.__serverip
+ client.port = station2Port(self.__port)
+ count = int(self.__retriesCount)
+ while count > 0:
+ result = client.run()
+ if result.error == None:
+ if result.received_Mbps >= float(self.__bw) and result.sent_Mbps >= float(self.__bw):
+ save_json_to_disk(filePath='{}/{}'.format(self.__toPath, self.__wifi_st_file),
+ description='wifi scan',
+ mime='application/json',
+ json_data=stationList,
+ result=self.__resultlist)
+ save_json_to_disk(filePath='{}/{}'.format(self.__toPath, self.__wifi_res_file),
+ description='wifi {} iperf3'.format(self.__interface),
+ mime='application/json',
+ json_data=result.json,
+ result=self.__resultlist)
+ return
else:
- self.fail("failed: error output (Bad connection?)")
- #tx_brate = float(w_stats[6].split(" ")[2])
+ self.fail('bw fail: rx:{} tx:{}'.format(result.received_Mbps, result.sent_Mbps))
else:
- self.fail("failed: couldn't execute iw command")
- else:
- self.fail("failed: ping to server {}".format(self._serverip))
+ count = count - 1
+ time.sleep(int(self.__waitRetryTime))
+
+ self.fail('qWifi (max tries) Execution error: {}'.format(result.error))
+ def getresults(self):
+ return self.__resultlist
+ def gettextresult(self):
+ return ""