diff options
Diffstat (limited to 'test-cli/test/tests')
-rw-r--r-- | test-cli/test/tests/qamp.py | 87 | ||||
-rw-r--r-- | test-cli/test/tests/qamper.py | 74 | ||||
-rw-r--r-- | test-cli/test/tests/qaudio.py | 119 | ||||
-rw-r--r-- | test-cli/test/tests/qbutton.py | 54 | ||||
-rw-r--r-- | test-cli/test/tests/qdmesg.py | 32 | ||||
-rw-r--r-- | test-cli/test/tests/qduplex_ser.py | 69 | ||||
-rw-r--r-- | test-cli/test/tests/qeeprom.py | 90 | ||||
-rw-r--r-- | test-cli/test/tests/qethernet.py | 161 | ||||
-rw-r--r-- | test-cli/test/tests/qflash.py | 77 | ||||
-rw-r--r-- | test-cli/test/tests/qi2c.py | 29 | ||||
-rw-r--r-- | test-cli/test/tests/qiperf.py | 53 | ||||
-rw-r--r-- | test-cli/test/tests/qmmcflash.py | 55 | ||||
-rw-r--r-- | test-cli/test/tests/qnand.py | 48 | ||||
-rw-r--r-- | test-cli/test/tests/qplc.py | 121 | ||||
-rw-r--r-- | test-cli/test/tests/qram.py | 68 | ||||
-rw-r--r-- | test-cli/test/tests/qrtc.py | 44 | ||||
-rw-r--r-- | test-cli/test/tests/qscreen.py | 31 | ||||
-rw-r--r-- | test-cli/test/tests/qserial.py | 43 | ||||
-rw-r--r-- | test-cli/test/tests/qtemplate.py | 51 | ||||
-rw-r--r-- | test-cli/test/tests/qusb.py | 139 | ||||
-rw-r--r-- | test-cli/test/tests/qvideo.py | 125 | ||||
-rw-r--r-- | test-cli/test/tests/qwifi.py | 120 |
22 files changed, 1064 insertions, 626 deletions
diff --git a/test-cli/test/tests/qamp.py b/test-cli/test/tests/qamp.py deleted file mode 100644 index 9fcd6d2..0000000 --- a/test-cli/test/tests/qamp.py +++ /dev/null @@ -1,87 +0,0 @@ -from test.helpers.syscmd import SysCommand -import unittest -import serial -import time - -class Qamp(unittest.TestCase): - - def __init__(self, testname, testfunc, undercurrent=0.1, overcurrent=2): - self._current = 0.0 - self._undercurrent=undercurrent - self._overcurrent = overcurrent - self._vshuntfactor=16384.0 - self._ser = serial.Serial() - self._ser.port = '/dev/ttyUSB0' - self._ser.baudrate = 115200 - self._ser.parity = serial.PARITY_NONE - self._ser.timeout = 0 - self._ser.writeTimout = 0 - self._ser.xonxoff = False - self._ser.rtscts = False - self._ser.dsrdtr = False - super(Qamp, self).__init__(testfunc) - self._testMethodDoc = testname - - def __del__(self): - self._ser.close() - - def execute(self): - # Open Serial port ttyUSB0 - error=0 - try: - self._ser.open() - except: - self.fail("failed: IMPOSSIBLE OPEN USB-SERIAL PORT ( {} )".format(self._ser.port)) - error=1 - return -1 - if error==0: - # Clean input and output buffer of serial port - self._ser.flushInput() - self._ser.flushOutput() - # Send command to read Voltage at Shunt resistor - # Prepare cmd - cmd = ('at\n\r') - i=0 - while (i < len(cmd)): - i = i + self._ser.write(cmd[i].encode('ascii')) - time.sleep(0.05) - self._ser.read(1) - res0 = [] - while (self._ser.inWaiting() > 0): # if incoming bytes are waiting to be read from the serial input buffer - res0.append(self._ser.read(1).decode('ascii')) # read the bytes and convert from binary array to ASCII - print(res0) - #CHECK COM FIRST - cmd = ('at+in?\n\r') - i = 0 - - # Write, 1 by 1 byte at a time to avoid hanging of serial receiver code of listener, emphasis being made at sleep of 50 ms. - while (i < len(cmd)): - i = i + self._ser.write(cmd[i].encode('ascii')) - time.sleep(0.05) - self._ser.read(1) - - # Read, 1 by 1 byte - res = [] - while (self._ser.inWaiting() > 0): # if incoming bytes are waiting to be read from the serial input buffer - res.append(self._ser.read(1).decode('ascii')) # read the bytes and convert from binary array to ASCII - - string = ''.join(res) - string = string.replace('\n', '') - try: - self._current = float(int(string, 0)) / self._vshuntfactor - except: - self.fail("failed: CAN'T READ CONSUMPTION (CURRENT=0?)") - error=1 - return -1 - if error==0: - print(self._current) - # In order to give a valid result it is importarnt to define an under current value - if (self._current > float(self._overcurrent)): - self.fail("failed: OVERCURRENT DETECTED ( {} )".format(self._current)) - return -1 - - if (self._current < float(self._undercurrent)): - self.fail("failed: UNDERCURRENT DETECTED ( {} )".format(self._current)) - return -1 - - diff --git a/test-cli/test/tests/qamper.py b/test-cli/test/tests/qamper.py new file mode 100644 index 0000000..96b673c --- /dev/null +++ b/test-cli/test/tests/qamper.py @@ -0,0 +1,74 @@ +import unittest +from test.helpers.amper import Amper + + +class Qamper(unittest.TestCase): + params = None + __resultlist = None # resultlist is a python list of python dictionaries + dummytest = False + + # varlist: undercurrent, overcurrent + def __init__(self, testname, testfunc, varlist): + self.params = varlist + super(Qamper, self).__init__(testfunc) + + if "undercurrent" in varlist: + self._undercurrent = varlist["undercurrent"] + else: + raise Exception('undercurrent param inside Qamp must be defined') + if "overcurrent" in varlist: + self._overcurrent = varlist["overcurrent"] + else: + raise Exception('overcurrent param inside Qamp must be defined') + self._testMethodDoc = testname + self.__resultlist = [] + if "dummytest" in varlist: + self.dummytest = varlist["dummytest"] + + def saveresultinfile(self, currentvalue): + # save result in a file + with open('/mnt/station_ramdisk/amper.txt', 'w') as outfile: + n = outfile.write("Current: {} A".format(currentvalue)) + outfile.close() + self.__resultlist.append( + { + "description": "Amperimeter values", + "filepath": "/mnt/station_ramdisk/amper.txt", + "mimetype": "text/plain" + } + ) + + def execute(self): + if self.dummytest == '1': + self.current = "0.0" + self.saveresultinfile(self.current) + return + + amp = Amper() + # open serial connection + if not amp.open(): + self.fail("Error: can not open a serial port") + # check if the amperimeter is connected and working + # 2 ATTEMTS + if not amp.hello(): + if not amp.hello(): + self.fail("Error: can not communicate") + # get current value (in Amperes) + self.current = amp.getCurrent() + # save result in a file + self.saveresultinfile(self.current) + # close serial connection + amp.close() + # Check current range + if float(self.current) > float(self._overcurrent): + # Overcurrent detected + self.fail("failed: Overcurrent detected ( {} )".format(self.current)) + elif float(self.current) < float(self._undercurrent): + # Undercurrent detected + self.fail("failed: Undercurrent detected ( {} )".format(self.current)) + + def getresults(self): + return self.__resultlist + + def gettextresult(self): + return "{} A".format(self.current) diff --git a/test-cli/test/tests/qaudio.py b/test-cli/test/tests/qaudio.py index a1572ca..a219a1a 100644 --- a/test-cli/test/tests/qaudio.py +++ b/test-cli/test/tests/qaudio.py @@ -1,37 +1,100 @@ -from test.helpers.syscmd import SysCommand import unittest -#class name +import sh +import wave +import contextlib +import uuid +import time +from test.helpers.utils import save_result +from test.helpers.iseelogger import logObj + class Qaudio(unittest.TestCase): - # Initialize the variables + params = None + __resultlist = None # resultlist is a python list of python dictionaries + __dtmf_secuence = ["1", "3", "5", "7", "9"] + __dtmf_file = None + __file_uuid = None + __QaudioName = None - def __init__(self, testname, testfunc, dtmfFile): - # Doing this we will initialize the class and later on perform a particular method inside this class + def __init__(self, testname, testfunc, varlist): + self.params = varlist super(Qaudio, self).__init__(testfunc) self._testMethodDoc = testname - self._dtmfFile=dtmfFile - self.__sum=0 - self.__refSum = 25 # 1+3+5+7+9 + self.__resultlist = [] + self.__xmlObj = varlist["xml"] + self.__QaudioName = varlist.get('name', 'qaudio') + self.__dtmf_file_play = varlist.get('play_dtmf_file', self.__xmlObj.getKeyVal(self.__QaudioName, "play_dtmf_file", "dtmf-13579.wav")) + self.__recordtime = varlist.get('recordtime', self.__xmlObj.getKeyVal(self.__QaudioName, "recordtime", "0.15")) + self.__dtmf_file_record = varlist.get('record_dtmf_file', self.__xmlObj.getKeyVal(self.__QaudioName, "record_dtmf_file", "record-{}.wav")) + self.__sampling_rate = varlist.get('rate', self.__xmlObj.getKeyVal(self.__QaudioName, "rate", "8000")) + self.__record_path = varlist.get('to', self.__xmlObj.getKeyVal(self.__QaudioName, "to", "/mnt/station_ramdisk")) + self.__run_delay = float(varlist.get('aplay_run_delay', self.__xmlObj.getKeyVal(self.__QaudioName, "aplay_run_delay", "0.0"))) + self.__file_uuid = uuid.uuid4() + self.__dtmf_file_record = self.__dtmf_file_record.format(self.__file_uuid) + + def restoreAlsaCtl(self): + try: + sh.alsactl('restore'); + except Exception as E: + logObj.getlogger().error("Failed to Execite alsactl restore"); def execute(self): - str_cmd = "aplay test/files/dtmf-13579.wav 2> /dev/null & arecord -r 8000 -d 1 recorded.wav 2> /dev/null" #.format(self.__dtmfFile) - audio_loop = SysCommand("command-name", str_cmd) - if audio_loop.execute() == 0:# BUG: Returns -1 but work - lines = audio_loop.getOutput().splitlines() - str_cmd = "multimon -t wav -a DTMF recorded.wav -q 2> /dev/null" - dtmf_decoder = SysCommand("command-name", str_cmd) - if dtmf_decoder.execute() == 0: # BUG: Returns -1 but work - self.__raw_out = dtmf_decoder.getOutput() - if self.__raw_out == "": - return -1 - lines = dtmf_decoder.getOutput().splitlines() - for i in range(0, 5): - aux=[int(s) for s in lines[i].split() if s.isdigit()] - self.__sum=self.__sum+aux[0] - self.failUnless(self.__sum == self.__refSum), "failed: incorrect dtmf code" + str(self.__sum) + self.restoreAlsaCtl() + try: + # analize audio file + calcRecordTime = self.calc_audio_duration(self.__dtmf_file_play) + float(self.__recordtime) + float(0.1) + except TypeError as Error: + self.fail("failed: TypeError: {}.".Error) + + # previous play to estabilise audio lines + p2 = sh.arecord("-r", self.__sampling_rate, "-d", calcRecordTime, "{}/{}".format(self.__record_path, self.__dtmf_file_record), _bg=True) + time.sleep(self.__run_delay) + p1 = sh.aplay(self.__dtmf_file_play) + p2.wait() + + p2 = sh.arecord("-r", self.__sampling_rate, "-d", calcRecordTime, "{}/{}".format(self.__record_path, self.__dtmf_file_record), _bg=True) + time.sleep(self.__run_delay) + p1 = sh.aplay(self.__dtmf_file_play) + p2.wait() + if p1.exit_code == 0 and p2.exit_code == 0: + p = sh.multimon("-t", "wav", "-a", "DTMF", "{}/{}".format(self.__record_path, self.__dtmf_file_record), "-q") + if p.exit_code == 0: + lines = p.stdout.decode('ascii').splitlines() + self.__dtmf_secuence_result = [] + for li in lines: + if li.split(" ")[0] == "DTMF:": + self.__dtmf_secuence_result.append(li.split(" ")[1]) + self.__dtmf_secuence_result = sorted(list(set(self.__dtmf_secuence_result))) + # compare original and processed dtmf sequence + if len(self.__dtmf_secuence) == len(self.__dtmf_secuence_result): + for a, b in zip(self.__dtmf_secuence, self.__dtmf_secuence_result): + if a != b: + logObj.getlogger().debug( + "failed: sent and received DTMF sequence do not match.{} != {}".format( + self.__dtmf_secuence, self.__dtmf_secuence_result)) + self.fail("failed: sent and received DTMF sequence do not match.") + save_result(filePath='{}/{}'.format(self.__record_path, self.__dtmf_file_record), + description='sound record', + mime='audio/x-wav', + result=self.__resultlist) + else: + logObj.getlogger().debug("received DTMF sequence is shorter than expected.{} {}".format(self.__dtmf_secuence,self.__dtmf_secuence_result)) + self.fail("received DTMF sequence is shorter than expected") else: - self.fail("failed: fail reading recorded file") - return -1 + self.fail("failed: unable to use multimon command.") else: - self.fail("failed: fail playing/recording file") - return -1 - return 0 + self.fail("failed: unable to play/record audio.") + + def getresults(self): + return self.__resultlist + + def gettextresult(self): + return "" + + def calc_audio_duration(self, fname): + duration = 0.0 + with contextlib.closing(wave.open(fname, 'r')) as f: + frames = f.getnframes() + rate = f.getframerate() + duration = frames / float(rate) + f.close() + return duration diff --git a/test-cli/test/tests/qbutton.py b/test-cli/test/tests/qbutton.py deleted file mode 100644 index 72a897e..0000000 --- a/test-cli/test/tests/qbutton.py +++ /dev/null @@ -1,54 +0,0 @@ -from test.helpers.syscmd import SysCommand -import unittest -import uuid -import time - -class Qbutton(unittest.TestCase): - - def __init__(self, testname, testfunc, gpio): - if gpio == "SOPA": - super(Qbutton, self).__init__("buttonSopa") - else: - super(Qbutton, self).__init__("buttonGpio") - self._testMethodDoc = testname - - def buttonGpio(self): - print("normal-button-test-using-gpio") - self.fail("failed: GPIO BUTTON FAIL") - - def buttonSopa(self): - str_cmd = "i2cset -f -y 1 0x2d 0x40 0x31" - disable_pmic = SysCommand("disable_pmic", str_cmd) - disable_pmic.execute() - # BUG: REPEAT THIS EXECUTION TWICE BECAUSE FIRST TIME IT RETURNS AN ERROR - led_on="echo 1 > /sys/class/leds/red\:usbhost/brightness" - ledon = SysCommand("led_on", led_on) - ledon.execute() - time.sleep(0.1) - disable_pmic.execute() - if disable_pmic.execute() == 0: - str_cmd = "i2cset -f -y 1 0x2d 0x50 0xff" - reset_button = SysCommand("reset_button", str_cmd) - if reset_button.execute() == 0: - str_cmd = "i2cget -f -y 1 0x2d 0x50" - get_button_val = SysCommand("get_button_val", str_cmd) - print("\n\t --> PRESS button for 1 sec (TIMEOUT: 10s) \n") - timeout = 0 - while timeout < 20: - if get_button_val.execute() == 0: - get_button_val.execute() - button_value = get_button_val.getOutput() - button_value=button_value.decode('ascii').split("x") - if int(button_value[1]) == 4: - timeout = 20 - time.sleep(0.5) - timeout = timeout + 1 - if timeout==20 and int(button_value[1]) == 0: - self.fail("failed: timeout exceeded") - else: - timeout = 20 - self.fail("failed: not button input") - else: - self.fail("failed: could not complete i2c reset button state") - else: - self.fail("failed: could not complete i2c disable PMIC") diff --git a/test-cli/test/tests/qdmesg.py b/test-cli/test/tests/qdmesg.py new file mode 100644 index 0000000..39a5047 --- /dev/null +++ b/test-cli/test/tests/qdmesg.py @@ -0,0 +1,32 @@ +import sh +import os +import os.path +from os import path + + +class Qdmesg: + params = None + __resultlist = None # resultlist is a python list of python dictionaries + + def __init__(self, testname, testfunc, varlist): + self.params = varlist + self.__testMethodDoc = testname + self.__resultlist = [] + self.pgObj = varlist["db"] + self.__xmlObj = varlist["xml"] + self.__QdmesgName = varlist.get('name', 'qdmesg') + self.__syslog_dmesg_file = varlist.get('syslog_dmesg',self.__xmlObj.getKeyVal(self.__QdmesgName, "syslog_dmesg", + "/var/log/kern.log")) + + def getTestName(self): + return self.__testMethodDoc + + def execute(self): + self.pgObj.run_test(self.params["testidctl"], self.params["testid"]) + if not os.path.isfile('{}'.format(self.__syslog_dmesg_file)): + self.pgObj.finish_test(self.params["testidctl"], self.params["testid"], "TEST_FAILED", "") + return False + self.pgObj.upload_result_file(self.params["testidctl"], self.params["testid"], "{}".format(self.__syslog_dmesg_file), + "{}".format(self.__syslog_dmesg_file), "text/plain") + self.pgObj.finish_test(self.params["testidctl"], self.params["testid"], "TEST_COMPLETE", "") + return True diff --git a/test-cli/test/tests/qduplex_ser.py b/test-cli/test/tests/qduplex_ser.py index 98bda81..170039b 100644 --- a/test-cli/test/tests/qduplex_ser.py +++ b/test-cli/test/tests/qduplex_ser.py @@ -1,46 +1,79 @@ -from test.helpers.syscmd import SysCommand import unittest import uuid import serial import time + class Qduplex(unittest.TestCase): + params = None + __port1 = None + __port2 = None + __baudrate = None + __resultlist = None # resultlist is a python list of python dictionaries - def __init__(self, testname, testfunc, port1, port2, baudrate): + # varlist: port1, port2, baudrate + def __init__(self, testname, testfunc, varlist): + self.params = varlist super(Qduplex, self).__init__(testfunc) - self.__port1 = port1 - self.__serial1 = serial.Serial(self.__port1, timeout=1) - self.__serial1.baudrate = baudrate - - self.__port2 = port2 - self.__serial2 = serial.Serial(self.__port2, timeout=1) - self.__serial2.baudrate = baudrate + if "port1" in varlist: + self.__port1 = varlist["port1"] + else: + raise Exception('port1 param inside Qduplex must be defined') + if "port2" in varlist: + self.__port2 = varlist["port2"] + else: + raise Exception('port2 param inside Qduplex must be defined') + if "baudrate" in varlist: + self.__baudrate = varlist["baudrate"] + else: + raise Exception('baudrate param inside Qduplex must be defined') self._testMethodDoc = testname + self.__resultlist = [] + + # open serial connection + self.__serial1 = serial.Serial(self.__port1, self.__baudrate, timeout=1) + self.__serial2 = serial.Serial(self.__port2, self.__baudrate, timeout=1) def __del__(self): self.__serial1.close() self.__serial2.close() def execute(self): + # generate a random uuid + test_uuid = str(uuid.uuid4()).encode() + + # clean serial ports self.__serial1.flushInput() self.__serial1.flushOutput() self.__serial2.flushInput() self.__serial2.flushOutput() - test_uuid = str(uuid.uuid4()).encode() + # send the uuid through serial port self.__serial1.write(test_uuid) time.sleep(0.05) # there might be a small delay if self.__serial2.inWaiting() == 0: - self.fail("failed: PORT {} wait timeout exceded, wrong communication?".format(self.__port2)) + self.fail("failed: port {} wait timeout exceded".format(self.__port2)) else: - if (self.__serial2.readline() != test_uuid): - self.fail("failed: PORT {} write/read mismatch".format(self.__port2)) + if self.__serial2.readline() != test_uuid: + self.fail("failed: port {} write/read mismatch".format(self.__port2)) - test_uuid = str(uuid.uuid4()).encode() + time.sleep(0.05) # there might be a small delay + + # clean serial ports + self.__serial1.flushInput() + self.__serial1.flushOutput() + self.__serial2.flushInput() + self.__serial2.flushOutput() + # send the uuid through serial port self.__serial2.write(test_uuid) time.sleep(0.05) # there might be a small delay if self.__serial1.inWaiting() == 0: - self.fail("failed: PORT {} wait timeout exceded, wrong communication?".format(self.__port1)) + self.fail("failed: port {} wait timeout exceded".format(self.__port1)) else: - if (self.__serial1.readline() != test_uuid): - self.fail("failed: PORT {} write/read mismatch".format(self.__port1)) - + if self.__serial1.readline() != test_uuid: + self.fail("failed: port {} write/read mismatch".format(self.__port1)) + + def getresults(self): + return self.__resultlist + + def gettextresult(self): + return "" diff --git a/test-cli/test/tests/qeeprom.py b/test-cli/test/tests/qeeprom.py index f72f78f..7900381 100644 --- a/test-cli/test/tests/qeeprom.py +++ b/test-cli/test/tests/qeeprom.py @@ -1,38 +1,68 @@ -from test.helpers.syscmd import SysCommand import unittest -import uuid +import os class Qeeprom(unittest.TestCase): + params = None + __resultlist = None # resultlist is a python list of python dictionaries + __QEepromName = None + __i2cBus = None + __device = None - def __init__(self, testname, testfunc): + # varlist: position, eeprompath + def __init__(self, testname, testfunc, varlist): + self.params = varlist super(Qeeprom, self).__init__(testfunc) self._testMethodDoc = testname + self.__xmlObj = varlist["xml"] + self.__QEepromName = varlist.get('name', 'qeeprom') + self.__i2cAddress = varlist.get('i2c_address', self.__xmlObj.getKeyVal(self.__QEepromName, "i2c_address", "0050")) + self.__i2cBus = varlist.get('i2c_bus', self.__xmlObj.getKeyVal(self.__QEepromName, "i2c_bus", "0")) + self.__device = '/sys/bus/i2c/devices/{}-{}/eeprom'.format(self.__i2cBus, self.__i2cAddress) + + self.__resultlist = [] + + def __check_device(self): + if os.path.isfile(self.__device): + return True + return False + + def __eeprom_write(self, data): + try: + f = open(self.__device, "wb") + f.write(data) + f.close() + except IOError as Error: + return False, '{}'.format(Error.errno) + return True, '' + + def __eeprom_read(self, size): + try: + f = open(self.__device, "rb") + data = f.read(size) + f.close() + except IOError as Error: + return False, '{}'.format(Error.errno) + return True, data + def execute(self): - str_cmd = "find /sys/ -iname 'eeprom'" - eeprom_location = SysCommand("eeprom_location", str_cmd) - if eeprom_location.execute() == 0: - self.__raw_out = eeprom_location.getOutput() - if self.__raw_out == "": - self.fail("Unable to get EEPROM location. IS EEPROM CONNECTED?") - return -1 - eeprom=self.__raw_out.decode('ascii') - test_uuid = uuid.uuid4() - str_cmd="echo '{}' > {}".format(str(test_uuid), eeprom) - eeprom_write = SysCommand("eeprom_write", str_cmd) - if eeprom_write.execute() == 0: - self.__raw_out = eeprom_write.getOutput() - if self.__raw_out == "": - self.fail("Unable to write on the EEPROM?") - return -1 - str_cmd = "head -2 {}".format(eeprom) - eeprom_read = SysCommand("eeprom_read", str_cmd) - if eeprom_read.execute() == 0: - self.__raw_out = eeprom_read.getOutput() - if self.__raw_out == "": - self.fail("Unable to read from the EEPROM?") - return -1 - if(str(self.__raw_out).find(str(test_uuid)) == -1): - self.fail("failed: READ/WRITE mismatch") - else: - self.fail("failed: could not complete find eeprom command") + if not self.__check_device(): + self.fail("cannot open device {}".format(self.__device)) + + data = bytes('AbcDeFgHijK098521', 'utf-8') + + v, w = self.__eeprom_write(data) + if not v: + self.fail("eeprom: {} write test failed".format(w)) + v, r = self.__eeprom_read(len(data)) + if not v: + self.fail("eeprom: {} read test failed".format(r)) + if r != data: + self.fail("mismatch between write and read bytes") + + + def getresults(self): + return self.__resultlist + + def gettextresult(self): + return "" diff --git a/test-cli/test/tests/qethernet.py b/test-cli/test/tests/qethernet.py index 2ac447c..7d081a1 100644 --- a/test-cli/test/tests/qethernet.py +++ b/test-cli/test/tests/qethernet.py @@ -1,57 +1,124 @@ -from test.helpers.syscmd import SysCommand import unittest - +import sh +import json +import time +import uuid +import iperf3 +import netifaces +from test.helpers.utils import save_json_to_disk +from test.helpers.utils import station2Port class Qethernet(unittest.TestCase): - __sip = None - __raw_out = None - __MB_req = None - __MB_real = None - __BW_real = None - __dat_list = None - __bind = None - __OKBW = None - - def __init__(self, testname, testfunc, sip = None, OKBW=100, bind=None): + __serverip = None + __numbytestx = None + __bwexpected = None + __port = None + params = None + __bwreal = None + __resultlist = None # resultlist is a python list of python dictionaries + timebetweenattempts = 5 + + # varlist content: serverip, bwexpected, port + def __init__(self, testname, testfunc, varlist): + self.params = varlist super(Qethernet, self).__init__(testfunc) - if sip is not None: - self.__sip = sip - if sip is not None: - self.__bind = bind - self.__MB_req = '10' - self.__OKBW=OKBW self._testMethodDoc = testname + self.__xmlObj = varlist["xml"] + self.__QEthName = varlist.get('name', 'qeth') + self.__loops = varlist.get('loops', self.__xmlObj.getKeyVal(self.__QEthName, "loops", "1")) + self.__port = varlist.get('port', self.__xmlObj.getKeyVal(self.__QEthName, "port", "5000")) + self.__bw = varlist.get('bwexpected', self.__xmlObj.getKeyVal(self.__QEthName, "bwexpected", "40.0")) + self.__serverip = varlist.get('serverip', self.__xmlObj.getKeyVal(self.__QEthName, "serverip", "localhost")) + self.__duration = varlist.get('duration', self.__xmlObj.getKeyVal(self.__QEthName, "duration", "10")) + self.__interface = varlist.get('interface', self.__xmlObj.getKeyVal(self.__QEthName, "interface", "eth0")) + self.__toPath = varlist.get('to', self.__xmlObj.getKeyVal(self.__QEthName, "to", "/mnt/station_ramdisk")) + self.__retriesCount = varlist.get('retries', self.__xmlObj.getKeyVal(self.__QEthName, "retries", "5")) + self.__waitRetryTime = varlist.get('wait_retry', self.__xmlObj.getKeyVal(self.__QEthName, "wait_retry", "10")) + self.__wifi_res_file = varlist.get('eth_res_file', self.__xmlObj.getKeyVal(self.__QEthName, "eth_res_file", "eth_test_{}.json")) + self.__wifi_st_file = varlist.get('eth_st_file', self.__xmlObj.getKeyVal(self.__QEthName, "eth_st_file", "eth_st_{}.json")) + self.__file_uuid = uuid.uuid4() + self.__wifi_res_file = self.__wifi_res_file.format(self.__file_uuid) + self.__wifi_st_file = self.__wifi_st_file.format(self.__file_uuid) + + self.__resultlist = [] + + def checkInterface(self, reqInterface): + ifaces = netifaces.interfaces() + for t in ifaces: + if t == reqInterface: + return True + return False def execute(self): - print - if self.__bind is None: - str_cmd = "iperf -c {} -x CMSV -n {}M".format(self.__sip, self.__MB_req) - else: - str_cmd = "iperf -c {} -x CMSV -n {}M -B {}".format(self.__sip, self.__MB_req, self.__bind) - iperf_command = SysCommand("iperf", str_cmd) - if iperf_command.execute() == 0: - self.__raw_out = iperf_command.getOutput() - if self.__raw_out == "": - return -1 - lines = iperf_command.getOutput().splitlines() - dat = lines[1] - dat = dat.decode('ascii') - dat_list = dat.split( ) - for d in dat_list: - a = dat_list.pop(0) - if a == "sec": - break - self.__MB_real = dat_list[0] - self.__BW_real = dat_list[2] - self.__dat_list = dat_list - #print(self.__MB_real) - #print(self.__BW_real) - self.failUnless(float(self.__BW_real)>float(self.__OKBW)*0.9,"failed: speed is lower than spected. Speed(MB/s)" + str(self.__BW_real)) + self.__resultlist = [] + # check interfaces + if not self.checkInterface(self.__interface): + self.fail('Interface not found: {}'.format(self.__interface)) + + # Start Iperf + client = iperf3.Client() + client.duration = int(self.__duration) + client.server_hostname = self.__serverip + client.port = station2Port(self.__port) + count = int(self.__retriesCount) + while count > 0: + result = client.run() + if result.error == None: + if result.received_Mbps >= float(self.__bw) and result.sent_Mbps >= float(self.__bw): + save_json_to_disk(filePath='{}/{}'.format(self.__toPath, self.__wifi_res_file), + description='eth {} iperf3'.format(self.__interface), + mime='application/json', + json_data=result.json, + result=self.__resultlist) + return + else: + self.fail('bw fail: rx:{} tx:{}'.format(result.received_Mbps, result.sent_Mbps)) + else: + count = count - 1 + time.sleep(int(self.__waitRetryTime)) + + self.fail('qEth (max tries) Execution error: {}'.format(result.error)) + + def execute2(self): + # execute iperf command against the server, but it implements attempts in case the server is busy + iperfdone = False + while not iperfdone: + try: + p = sh.iperf3("-c", self.__serverip, "-n", self.__numbytestx, "-f", "m", "-p", self.__port, "-J", + _timeout=20) + iperfdone = True + except sh.TimeoutException: + self.fail("failed: iperf timeout reached") + except sh.ErrorReturnCode: + time.sleep(self.timebetweenattempts) + + # check if it was executed succesfully + if p.exit_code == 0: + if p.stdout == "": + self.fail("failed: error executing iperf command") + # analyze output string + data = json.loads(p.stdout.decode('ascii')) + self.__bwreal = float(data['end']['sum_received']['bits_per_second'])/1024/1024 + # save result file + with open('/mnt/station_ramdisk/ethernet-iperf3.json', 'w') as outfile: + json.dump(data, outfile, indent=4) + outfile.close() + self.__resultlist.append( + { + "description": "iperf3 output", + "filepath": "/mnt/station_ramdisk/ethernet-iperf3.json", + "mimetype": "application/json" + } + ) + + # check if BW is in the expected range + if self.__bwreal < float(self.__bwexpected): + self.fail("failed: speed is lower than spected. Speed(Mbits/s): " + str(self.__bwreal)) else: - self.fail("failed: could not complete iperf command") + self.fail("failed: could not complete iperf command.") - def get_Total_MB(self): - return self.__MB_real; + def getresults(self): + return self.__resultlist - def get_Total_BW(self): - return self.__MB_real; + def gettextresult(self): + return "" diff --git a/test-cli/test/tests/qflash.py b/test-cli/test/tests/qflash.py deleted file mode 100644 index cdc4109..0000000 --- a/test-cli/test/tests/qflash.py +++ /dev/null @@ -1,77 +0,0 @@ -from test.helpers.syscmd import SysCommand -import unittest -from test.helpers.globalVariables import globalVar - -class Qflasher(unittest.TestCase): - - def __init__(self, testname, testfunc): - super(Qflasher, self).__init__(testfunc) - self._testMethodDoc = testname - model=globalVar.g_mid - if model.find("IGEP0046") == 0: - self._flash_method = "mx6" - self._binlocation="/boot/" - elif model.find("IGEP0000") == 0: - self._flash_method = "mx6" - self._binlocation="/boot/" - elif model.find("IGEP0034") == 0: - self._flash_method = "nandti" - self._binlocation = "/boot/" - elif model.find("SOPA") == 0: - self._flash_method = "sopaflash" - elif model.find("OMAP3") == 0: - self._flash_method = "nandti" - self._binlocation = "/boot/" - elif model.find("OMAP5") == 0: - self._flash_method = "nandti" - self._binlocation = "/boot/" - - - def execute(self): - # CASE eMMC boards. - if(self._flash_method == "mx6"): - str_cmd= "dd if={}u-boot.imx of=/dev/mmcblk2 bs=512 seek=2 2>&1 >/dev/null".format(self._binlocation) - flash_command = SysCommand("flash_command", str_cmd) - if flash_command.execute() == 0: - str_cmd = "sync" - sync_command = SysCommand("sync_command", str_cmd) - if sync_command.execute() != 0: - self.fail("failed: could not sync") - else: - self.fail("failed: could not complete flash eMMC commands") - # CASE of nandflash boards - elif(self._flash_method == "nandti"): - str_cmd = "nandwrite -p -s 0x0 /dev/mtd0 {}u-boot.img 2>&1 >/dev/null; " \ - "nandwrite -p -s 0x20000 /dev/mtd0 {}u-boot.img 2>&1 >/dev/null; " \ - "nandwrite -p -s 0x40000 /dev/mtd0 {}u-boot.img 2>&1 >/dev/null; " \ - "nandwrite -p -s 0x60000 /dev/mtd0 {}u-boot.img 2>&1 >/dev/null; " \ - "".format(self._binlocation, self._binlocation, self._binlocation, self._binlocation,) - flash_MLO_command = SysCommand("flash_MLO_command", str_cmd) - if flash_MLO_command.execute() == 0: - str_cmd= "nandwrite -p /dev/mtd1 {}u-boot.img 2>&1 >/dev/null".format(self._binlocation) - flash_uBoot_command = SysCommand("flash_command", str_cmd) - if flash_uBoot_command.execute() == 0: - str_cmd = "sync" - sync_command = SysCommand("sync_command", str_cmd) - if sync_command.execute() != 0: - self.fail("failed: could not sync") - else: - self.fail("failed: could not complete flash u-boot commnds") - else: - self.fail("failed: could not complete flash MLO commnd") - - - # CASE of SOPA0000 BOARD. FULL FLASH (u-boot+kernel+rootfs) - elif (self._flash_method == "sopaflash"): - str_cmd = "nandtest /dev/mtd0 >/dev/null" - flash_command = SysCommand("flash_command", str_cmd) - if flash_command.execute() == 0: - str_cmd = "/usr/bin/igep-flash --skip-nandtest --image /opt/firmware/demo-ti-image-*-*.tar* >/dev/null 2>&1" - sync_command = SysCommand("sync_command", str_cmd) - if sync_command.execute() != 0: - self.fail("failed: could not complete flashing procces") - else: - self.fail("failed: could not complete nandtest mtd0") - - else: - self.fail("failed: could not find flash method") diff --git a/test-cli/test/tests/qi2c.py b/test-cli/test/tests/qi2c.py deleted file mode 100644 index 409005c..0000000 --- a/test-cli/test/tests/qi2c.py +++ /dev/null @@ -1,29 +0,0 @@ -from test.helpers.syscmd import SysCommand -import unittest - -class Qi2c(unittest.TestCase): - - def __init__(self, testname, testfunc, busnum, register): - super(Qi2c, self).__init__(testfunc) - self.__busnum = busnum - self.__register = register.split("/") - self.__devices=[] - self._testMethodDoc = testname - - def execute(self): - str_cmd= "i2cdetect -a -y -r {}".format(self.__busnum) - i2c_command = SysCommand("i2cdetect", str_cmd) - if i2c_command.execute() == 0: - self.__raw_out = i2c_command.getOutput() - if self.__raw_out == "": - return -1 - lines=self.__raw_out.decode('ascii').splitlines() - for i in range(len(lines)): - if (lines[i].count('UU')): - if (lines[i].find("UU")): - self.__devices.append("0x{}{}".format((i - 1), hex(int((lines[i].find("UU") - 4) / 3)).split('x')[-1])) - for i in range(len(self.__register)): - if not(self.__register[i] in self.__devices): - self.fail("failed: device {} not found in bus i2c-{}".format(self.__register[i], self.__busnum)) - else: - self.fail("failed: could not complete i2cdedtect command") diff --git a/test-cli/test/tests/qiperf.py b/test-cli/test/tests/qiperf.py deleted file mode 100644 index 126b6ee..0000000 --- a/test-cli/test/tests/qiperf.py +++ /dev/null @@ -1,53 +0,0 @@ -from test.helpers.syscmd import SysCommand - - -class QIperf(object): - __sip = None - __raw_out = None - __MB_req = None - __MB_real = None - __BW_real = None - __dat_list = None - __bind = None - - def __init__(self, sip = None): - if sip is not None: - self.__sip = sip - self.__MB_req = '10' - - def execute(self, sip = None, bind = None): - if sip is not None: - self.__sip = sip - - if bind is None: - str_cmd = "iperf -c {} -x CMSV -n {}M".format(self.__sip, self.__MB_req) - else: - self.__bind = bind - str_cmd = "iperf -c {} -x CMSV -n {}M -B {}".format(self.__sip, self.__MB_req, self.__bind) - t = SysCommand("iperf", str_cmd) - if t.execute() == 0: - self.__raw_out = t.getOutput() - if self.__raw_out == "": - return -1 - lines = t.getOutput().splitlines() - dat = lines[1] - dat = dat.decode('ascii') - dat_list = dat.split( ) - for d in dat_list: - a = dat_list.pop(0) - if a == "sec": - break - self.__MB_real = dat_list[0] - self.__BW_real = dat_list[2] - self.__dat_list = dat_list - print(self.__MB_real) - print(self.__BW_real) - else: - return -1 - return 0 - - def get_Total_MB(self): - return self.__MB_real; - - def get_Total_BW(self): - return self.__MB_real; diff --git a/test-cli/test/tests/qmmcflash.py b/test-cli/test/tests/qmmcflash.py new file mode 100644 index 0000000..0f5a0c1 --- /dev/null +++ b/test-cli/test/tests/qmmcflash.py @@ -0,0 +1,55 @@ +import unittest +import scanf +from scanf import scanf +from sh import mmc +from sh import ErrorReturnCode +from test.helpers.utils import save_file_to_disk +from test.helpers.utils import sys_read + +class Qmmcflash(unittest.TestCase): + params = None + + def __init__(self, testname, testfunc, varlist): + self.params = varlist + super(Qmmcflash, self).__init__(testfunc) + self._testMethodDoc = testname + self.__xmlObj = varlist["xml"] + + self.__QeMMCName = varlist.get('name', 'qemmc') + self.__emmcDevice = varlist.get('device', self.__xmlObj.getKeyVal(self.__QeMMCName, "device", "mmcblk0")) + self.__toPath = varlist.get('to', self.__xmlObj.getKeyVal(self.__QeMMCName, "to", "/mnt/station_ramdisk")) + self.__mmc_res_file = varlist.get('emmc_res_file', self.__xmlObj.getKeyVal(self.__QeMMCName, "emmc_res_file", "emmc_status.txt")) + self.__mmcPort = varlist.get('mmc_port', self.__xmlObj.getKeyVal(self.__QeMMCName, "mmc_port", "0")) + self.__mmcID = varlist.get('mmc_id', self.__xmlObj.getKeyVal(self.__QeMMCName, "mmc_id", "0001")) + + self.__resultlist = [] + + def execute(self): + try: + dataOut = mmc('extcsd', 'read', '/dev/{}'.format(self.__emmcDevice)) + save_file_to_disk(filePath='{}/{}'.format(self.__toPath, self.__mmc_res_file), + description='eMMC health test', + mime='text/plain', + data=dataOut.stdout.decode('utf-8'), + result=self.__resultlist) + sysDevice = "/sys/bus/mmc/drivers/mmcblk/mmc{}:{}".format(self.__mmcPort, self.__mmcID) + r, data = sys_read("{}/life_time".format(sysDevice)) + if not r: + self.fail("emmc: life_time not found") + res = scanf("0x%d 0x%d", data) + if res[0] > 3 or res[1] > 3: + self.fail("emmc: review {} life_time > 3".format(sysDevice)) + r, data = sys_read("{}/pre_eol_info".format(sysDevice)) + if not r: + self.fail("emmc: pre_eol_info not found") + res = scanf("0x%d", data) + if res[0] != 1: + self.fail("emmc: review {} pre_eol_info != 1".format(sysDevice)) + except ErrorReturnCode as Error: + self.fail("emmc: failed {} ".format(Error.exit_code)) + + def getresults(self): + return self.__resultlist + + def gettextresult(self): + return ""
\ No newline at end of file diff --git a/test-cli/test/tests/qnand.py b/test-cli/test/tests/qnand.py new file mode 100644 index 0000000..6de1eaf --- /dev/null +++ b/test-cli/test/tests/qnand.py @@ -0,0 +1,48 @@ +import unittest +import sh +import uuid +from test.helpers.utils import save_file_to_disk + +class Qnand(unittest.TestCase): + params = None + __device = "10M" + __resultlist = None # resultlist is a python list of python dictionaries + + # varlist: device + def __init__(self, testname, testfunc, varlist): + self.params = varlist + super(Qnand, self).__init__(testfunc) + self._testMethodDoc = testname + self.__xmlObj = varlist["xml"] + self.__QNandName = varlist.get('name', 'qnand') + self.__device = varlist.get('device', self.__xmlObj.getKeyVal(self.__QNandName, "device", "/dev/mtd0")) + self.__nand_res_file = varlist.get('nand_res_file', self.__xmlObj.getKeyVal(self.__QNandName, "nand_res_file", "nand_test_{}.txt")) + self.__toPath = varlist.get('to', self.__xmlObj.getKeyVal(self.__QNandName, "to", "/mnt/station_ramdisk")) + self.__file_uuid = uuid.uuid4() + self.__nand_res_file = self.__nand_res_file.format(self.__file_uuid) + nandtestPath = self.__xmlObj.getKeyVal("qnand", "nandtestPath", '/root/hwtest-files/nandtest"') + + if nandtestPath == '': + self.myNandTest = sh.nandtest + else: + self.myNandTest = sh.Command(nandtestPath) + self.__resultlist = [] + + def execute(self): + try: + res = self.myNandTest("-m", self.__device) + save_file_to_disk(filePath='{}/{}'.format(self.__toPath, self.__nand_res_file), + description='nand {} test'.format(self.__device), + mime='text/plain', + data=res.stdout.decode('utf-8'), + result=self.__resultlist) + + except sh.ErrorReturnCode_1 as e: + self.fail("nandtest failed: {}".format(e.ErrorReturnCode.stdout)) + + + def getresults(self): + return self.__resultlist + + def gettextresult(self): + return "" diff --git a/test-cli/test/tests/qplc.py b/test-cli/test/tests/qplc.py new file mode 100644 index 0000000..28310c6 --- /dev/null +++ b/test-cli/test/tests/qplc.py @@ -0,0 +1,121 @@ +import shutil +import unittest +import os.path +import os +import sh +import stat +import time +import ping3 +from netifaces import AF_INET +import netifaces as ni + +from test.helpers.md5 import md5_file +from test.helpers.plc import dcpPLC +from test.helpers.iseelogger import logObj + +class Qplc(unittest.TestCase): + params = None + __resultlist = None # resultlist is a python list of python dictionaries + __QPLCName = None + __plc = None + __board_uuid = None + + def __init__(self, testname, testfunc, varlist): + self.params = varlist + super(Qplc, self).__init__(testfunc) + self._testMethodDoc = testname + self.__xmlObj = varlist["xml"] + self.__psdbObj = varlist["db"] + self.__board_uuid = varlist["boarduuid"] + + self.__QPLCName = varlist.get('name', 'qplc') + self.__firmware = varlist.get('firmware', self.__xmlObj.getKeyVal(self.__QPLCName, "firmware", "")) + self.__firmware_md5 = varlist.get('firmware_md5', self.__xmlObj.getKeyVal(self.__QPLCName, "firmware_md5", "")) + self.__factoryTool = varlist.get('factory_tool', self.__xmlObj.getKeyVal(self.__QPLCName, "factory_tool", "")) + self.__gen_ip = varlist.get('gen_ip', self.__xmlObj.getKeyVal(self.__QPLCName, "gen_ip", "0")) + self.__gen_mac = varlist.get('gen_mac', self.__xmlObj.getKeyVal(self.__QPLCName, "gen_mac", "0")) + self.__mtd_device = varlist.get('mtd_device', self.__xmlObj.getKeyVal(self.__QPLCName, "mtd_device", "/dev/mtd0")) + self.__firmware_Path = varlist.get('firmwarepath', self.__xmlObj.getKeyVal(self.__QPLCName, "firmwarepath", "/root/hwtest-files/firmware")) + self.__skipflash = varlist.get('skipflash', self.__xmlObj.getKeyVal(self.__QPLCName, "skipflash", "0")) + self.__plc = dcpPLC(self.__factoryTool, self.__mtd_device) + self.__factory_password = varlist.get('factory_password', self.__xmlObj.getKeyVal(self.__QPLCName, "factory_password", "0")) + self.__firmware_password = varlist.get('firmware_password', self.__xmlObj.getKeyVal(self.__QPLCName, "firmware_password", "0")) + self.__plc_reset_wait = varlist.get('plc_reset_wait', self.__xmlObj.getKeyVal(self.__QPLCName, "plc_reset_wait", "60")) + self.__resultlist = [] + + + def checkfiles(self): + if not os.path.isfile(self.__factoryTool): + return False, 'Factory tool Not found {}/{}'.format(self.__factoryTool) + if not os.path.isfile('{}/{}'.format(self.__firmware_Path, self.__firmware)): + return False, 'Firmware Not found {}/{}'.format(self.__firmware_Path, self.__firmware) + if not os.path.isfile('{}/{}'.format(self.__firmware_Path, self.__firmware_md5)): + return False, 'Firmware md5 Not found {}/{}'.format(self.__firmware_Path, self.__firmware_md5) + try: + mode = os.stat(self.__mtd_device).st_mode; + if not stat.S_ISCHR(mode): + return False, 'No device {} found'.format(self.__mtd_device) + except: + return False, 'No device {} found'.format(self.__mtd_device) + return True, '' + + def __flashMemory(self): + r, v = self.__plc.SaveFirmware('{}/{}'.format(self.__firmware_Path, self.__firmware)) + if not r: + self.fail(v) + + def execute(self): + # Check files + v, m = self.checkfiles() + if not v: + self.fail(m) + # do flash & verify + if self.__skipflash == '0': + self.__flashMemory() + # do Plc boot + self.__plc.setBootMode() + # Wait plc goes UP + if self.__gen_mac: + res, v = self.__psdbObj.get_plc_macaddr(self.__board_uuid) + if res == None and v == 'MAC limit by uuid': + self.fail('MAC limit by uuid') + elif res == None: + self.fail('BUG: ?????') + logObj.getlogger().info("MAC {}".format(res[0])) + plcMAC = res[0][0] + # wait for PLC to be turned on + attemptcounter = 0 + plcfound = False + while attemptcounter < 5 and not plcfound: + if self.__plc.discover(): + plcfound = True + else: + attemptcounter += 1 + time.sleep(5) + # mandatory wait before configuring the PLC chip + time.sleep(2) + self.__plc.set_plc2('SYSTEM.PRODUCTION.SECTOR0_UNLOCK_PASSWORD', + self.__factory_password, + 'SYSTEM.PRODUCTION.MAC_ADDR', + '{}'.format(plcMAC), + self.__firmware_password) + # plc reset to save changes + self.__plc.setPLCReset() + time.sleep(int(self.__plc_reset_wait)) + # calculate the IP of the GPLC0000 board to ping + ip_eth01 = ni.ifaddresses('eth0:1')[AF_INET][0]['addr'] # plc_test_ip = 10.10.1.113 + ip_eth01_splited = ip_eth01.split('.') + ip_eth01_splited[3] = str(int(ip_eth01_splited[3]) + 100) + plc_test_ip = ".".join(ip_eth01_splited) # plc_test_ip = 10.10.1.213 + # ping against the GPLC0000 board + ping3.EXCEPTIONS = True + try: + ping3.ping(plc_test_ip, timeout=0.1) + except: + self.fail('PLC ping timeout') + + def getresults(self): + return self.__resultlist + + def gettextresult(self): + return "" diff --git a/test-cli/test/tests/qram.py b/test-cli/test/tests/qram.py index 8ec0210..49d4d54 100644 --- a/test-cli/test/tests/qram.py +++ b/test-cli/test/tests/qram.py @@ -1,22 +1,62 @@ -from test.helpers.syscmd import SysCommand import unittest +import sh +# from test.helpers.iseelogger import MeasureTime +# from test.helpers.iseelogger import logObj +from test.helpers.utils import save_file_to_disk class Qram(unittest.TestCase): + params = None + __memsize = None + __loops = None + __resultlist = [] # resultlist is a python list of python dictionaries - def __init__(self, testname, testfunc, memSize): + # varlist: memsize, loops + def __init__(self, testname, testfunc, varlist): + self.params = varlist super(Qram, self).__init__(testfunc) - self.__memSize = memSize self._testMethodDoc = testname + self.__xmlObj = varlist["xml"] - def execute(self): - str_cmd= "free -m" - free_command = SysCommand("free_ram", str_cmd) - if free_command.execute() == 0: - self.__raw_out = free_command.getOutput() - if self.__raw_out == "": - return -1 - lines = free_command.getOutput().splitlines() - aux = [int(s) for s in lines[1].split() if s.isdigit()] - self.failUnless(int(aux[0])>int(self.__memSize),"failed: total ram memory size lower than expected") + self.__QramName = varlist.get('name', 'qram') + self.__loops = varlist.get('loops', self.__xmlObj.getKeyVal(self.__QramName, "loops", "1")) + self.__memsize = varlist.get('memsize', self.__xmlObj.getKeyVal(self.__QramName, "memsize", "50M")) + self.__toPath = varlist.get('to', self.__xmlObj.getKeyVal(self.__QramName, "to", "/mnt/station_ramdisk")) + self.__ram_res_file = varlist.get('ram_res_file', self.__xmlObj.getKeyVal(self.__QramName, "ram_res_file", "ram_res.txt")) + + self.__dummytest = varlist.get('dummytest', self.__xmlObj.getKeyVal(self.__QramName, "dummytest", "0")) + self.__dummyresult = varlist.get('dummytestresult', self.__xmlObj.getKeyVal(self.__QramName, "dummytest", "0")) + + memtesterPath = self.__xmlObj.getKeyVal(self.__QramName, "memtesterPath", '') + + if memtesterPath == '': + self.myMemtester = sh.memtester else: - self.fail("failed: could not complete iperf command") + self.myMemtester = sh.Command(memtesterPath) + + def execute(self): + self.__resultlist = [] + try: + # mytime = MeasureTime() + res = self.myMemtester(self.__memsize, '{}'.format(self.__loops)) + save_file_to_disk(filePath='{}/{}'.format(self.__toPath, self.__ram_res_file), + description='Ram result test', + mime='text/plain', + data=res.stdout.decode('utf-8'), + result=self.__resultlist) + # mytime.stop() + except sh.ErrorReturnCode as e: + save_file_to_disk(filePath='{}/{}'.format(self.__toPath, self.__ram_res_file), + description='Ram result test', + mime='text/plain', + data=e.stdout.decode('utf-8'), + result=self.__resultlist) + self.fail("failed: memtester {}::{}".format(str(e.exit_code), e.stdout.decode('utf-8'))) + + except Exception as details: + self.fail('Error: {}'.format(details)) + + def getresults(self): + return self.__resultlist + + def gettextresult(self): + return "" diff --git a/test-cli/test/tests/qrtc.py b/test-cli/test/tests/qrtc.py index 1d02f78..df493d2 100644 --- a/test-cli/test/tests/qrtc.py +++ b/test-cli/test/tests/qrtc.py @@ -1,28 +1,46 @@ -from test.helpers.syscmd import SysCommand +import sh import unittest import time +import re + class Qrtc(unittest.TestCase): + params = None + __rtc = None + __resultlist = None # resultlist is a python list of python dictionaries - def __init__(self, testname, testfunc, rtc): + def __init__(self, testname, testfunc, varlist): + self.params = varlist super(Qrtc, self).__init__(testfunc) - self.__rtc = rtc + if "rtc" in varlist: + self.__rtc = varlist["rtc"] + else: + raise Exception('rtc param inside Qrtc must be defined') self._testMethodDoc = testname + self.__resultlist = [] def execute(self): - str_cmd = "hwclock -f {}".format(self.__rtc) - rtc_set = SysCommand("rtc_set", str_cmd) - if rtc_set.execute() == 0: - curr_hour = rtc_set.getOutput().decode('ascii').split(" ") - time1 = int((curr_hour[4].split(":"))[2]) + # get time from RTC peripheral + p = sh.hwclock("-f", self.__rtc) + if p.exit_code == 0: + time1 = re.search("([01]?[0-9]{1}|2[0-3]{1})[:][0-5]{1}[0-9]{1}[:][0-5]{1}[0-9]{1}", + p.stdout.decode('ascii')) time.sleep(1) - if rtc_set.execute() == 0: - curr_hour = rtc_set.getOutput().decode('ascii').split(" ") - time2 = int((curr_hour[4].split(":"))[2]) - if time1==time2: + # get time from RTC another time + p = sh.hwclock("-f", self.__rtc) + if p.exit_code == 0: + time2 = re.search("([01]?[0-9]{1}|2[0-3]{1})[:][0-5]{1}[0-9]{1}[:][0-5]{1}[0-9]{1}", + p.stdout.decode('ascii')) + # check if the seconds of both times are different + if time1 == time2: self.fail("failed: RTC is not running") else: - self.fail("failed: couldn't execute hwclock command 2nd time") + self.fail("failed: couldn't execute hwclock command for the second time") else: self.fail("failed: couldn't execute hwclock command") + def getresults(self): + return self.__resultlist + + def gettextresult(self): + return "" diff --git a/test-cli/test/tests/qscreen.py b/test-cli/test/tests/qscreen.py deleted file mode 100644 index b8a5a48..0000000 --- a/test-cli/test/tests/qscreen.py +++ /dev/null @@ -1,31 +0,0 @@ -from test.helpers.syscmd import SysCommand -import unittest -import time -from test.helpers.cv_display_test import pattern_detect - -class Qscreen(unittest.TestCase): - - def __init__(self, testname, testfunc, display): - super(Qscreen, self).__init__(testfunc) - self.__display = display - self._testMethodDoc = testname - - def execute(self): - str_cmd = "fbi -T 1 --noverbose -d /dev/{} test/files/test_pattern.png".format(self.__display) - display_image = SysCommand("display_image", str_cmd) - if display_image.execute() == -1: - test_screen = pattern_detect(1) - if not test_screen=="0": - self.fail("failed: {}".format(test_screen)) - str_cmd= "fbi -T 1 /home/root/result_hdmi_img.jpg -d /dev/fb0 --noverbose -a" - show_img = SysCommand("show-image", str_cmd) - show_img.execute() - else: - self.fail("failed: could not display the image") - try: - str_cmd= "fbi -T 1 /home/root/result_hdmi_img.jpg -d /dev/fb0 --noverbose -a" - show_img = SysCommand("show-image", str_cmd) - show_img.execute() - except ValueError: - print("COULD NOT DISPLAY IMAGE") - diff --git a/test-cli/test/tests/qserial.py b/test-cli/test/tests/qserial.py index 43ba3c8..402e33f 100644 --- a/test-cli/test/tests/qserial.py +++ b/test-cli/test/tests/qserial.py @@ -1,30 +1,55 @@ -from test.helpers.syscmd import SysCommand import unittest import uuid import serial import time + class Qserial(unittest.TestCase): + params = None + __port = None + __baudrate = None + __resultlist = None # resultlist is a python list of python dictionaries - def __init__(self, testname, testfunc, port, baudrate): + # varlist: port, baudrate + def __init__(self, testname, testfunc, varlist): + self.params = varlist super(Qserial, self).__init__(testfunc) - self.__port = port - self.__serial = serial.Serial(self.__port, timeout=1) - self.__serial.baudrate = baudrate + if "port" in varlist: + self.__port = varlist["port"] + else: + raise Exception('port param inside Qserial must be defined') + + if "baudrate" in varlist: + self.__baudrate = varlist["baudrate"] + else: + raise Exception('baudrate param inside Qserial must be defined') self._testMethodDoc = testname + self.__resultlist = [] + + # open serial connection + self.__serial = serial.Serial(self.__port, self.__baudrate, timeout=1) def __del__(self): self.__serial.close() def execute(self): + # generate a random uuid + test_uuid = str(uuid.uuid4()).encode() + # clean serial port self.__serial.flushInput() self.__serial.flushOutput() - test_uuid = str(uuid.uuid4()).encode() + # send the uuid through serial port self.__serial.write(test_uuid) time.sleep(0.05) # there might be a small delay if self.__serial.inWaiting() == 0: - self.fail("failed: PORT {} wait timeout exceded, wrong communication?".format(self.__port)) + self.fail("failed: port {} wait timeout exceded".format(self.__port)) else: - if (self.__serial.readline() != test_uuid): - self.fail("failed: PORT {} write/read mismatch".format(self.__port)) + # check if what it was sent is equal to what has been received + if self.__serial.readline() != test_uuid: + self.fail("failed: port {} write/read mismatch".format(self.__port)) + + def getresults(self): + return self.__resultlist + def gettextresult(self): + return "" diff --git a/test-cli/test/tests/qtemplate.py b/test-cli/test/tests/qtemplate.py deleted file mode 100644 index 940cded..0000000 --- a/test-cli/test/tests/qtemplate.py +++ /dev/null @@ -1,51 +0,0 @@ -#IF COMMAND IS NEEDED -from test.helpers.syscmd import SysCommand -import unittest -#class name -class Qtemplate(unittest.TestCase): - # Initialize the variables - __variable1 = "Value-a" - __variable2 = "Value-b" - #.... - __variablen = "Value-n" - - def __init__(self, testname, testfunc, input1=None, inputn=None): - # Doing this we will initialize the class and later on perform a particular method inside this class - super(Qtemplate, self).__init__(testfunc) - self.__testname = testname - self.__input1 = input1 - self.__inputn = inputn - self._testMethodDoc = testname - - - - def execute(self): - str_cmd = "command" - t = SysCommand("command-name", str_cmd) - if t.execute() == 0: - self.__raw_out = t.getOutput() - if self.__raw_out == "": - return -1 - lines = t.getOutput().splitlines() - dat = lines[1] - dat = dat.decode('ascii') - dat_list = dat.split( ) - for d in dat_list: - a = dat_list.pop(0) - if a == "sec": - break - self.__MB_real = dat_list[0] - self.__BW_real = dat_list[2] - self.__dat_list = dat_list - print(self.__MB_real) - print(self.__BW_real) - self.failUnless(int(self.__BW_real)>int(self.__OKBW)*0.9,"FAIL:BECAUSE...") - else: - return -1 - return 0 - - def get_Total_MB(self): - return self.__MB_real; - - def get_Total_BW(self): - return self.__MB_real; diff --git a/test-cli/test/tests/qusb.py b/test-cli/test/tests/qusb.py index 44490bc..4f76d0c 100644 --- a/test-cli/test/tests/qusb.py +++ b/test-cli/test/tests/qusb.py @@ -1,60 +1,95 @@ -from test.helpers.syscmd import SysCommand +import shutil import unittest +import os.path +import os +import sh + +from test.helpers.md5 import md5_file class Qusb(unittest.TestCase): + params = None + __resultlist = None # resultlist is a python list of python dictionaries + __QusbName = None - def __init__(self, testname, testfunc, devLabel, numPorts): + def __init__(self, testname, testfunc, varlist): + self.params = varlist super(Qusb, self).__init__(testfunc) - self.__numPorts = numPorts self._testMethodDoc = testname - self.__devLabel = devLabel - if testname=="USBOTG": - self.__usbFileName = "/this_is_an_usb_otg" - self.__usbtext = "USBOTG" - elif testname=="SATA": - self.__usbFileName = "/this_is_a_sata" - self.__usbtext = "SATA" - else: - self.__usbFileName = "/this_is_an_usb_host" - self.__usbtext = "USBHOST" - self.__numUsbFail=[] + self.__xmlObj = varlist["xml"] + + self.__QusbName = varlist.get('name', 'qusb') + self.__loops = varlist.get('loops', self.__xmlObj.getKeyVal(self.__QusbName, "loops", "3")) + self.__filepath_from = varlist.get('path', self.__xmlObj.getKeyVal(self.__QusbName, "path", "/root/hwtest-files")) + self.__file_source = varlist.get('file', self.__xmlObj.getKeyVal(self.__QusbName, "file", "usb-test.bin")) + self.__file_md5 = varlist.get('file_md5', self.__xmlObj.getKeyVal(self.__QusbName, "file_md5", "usb-test.bin.md5")) + self.__path_to = varlist.get('pathto', self.__xmlObj.getKeyVal(self.__QusbName, "pathto", "/mnt/test/disk2")) + self.__check_mounted = varlist.get('check_mounted', self.__xmlObj.getKeyVal(self.__QusbName, "check_mounted", "")) + + self.__resultlist = [] + + def removefileIfExist(self, fname): + if os.path.exists(fname): + try: + os.remove(fname) + except OSError as error: + return False, str(error) + return True, '' + + def checkfiles(self): + if not os.path.isfile('{}/{}'.format(self.__filepath_from, self.__file_source)): + return False, 'File test binary Not found {}/{}'.format(self.__filepath_from, self.__file_source) + if not os.path.isfile('{}/{}'.format(self.__filepath_from, self.__file_md5)): + return False, 'File test binary md5 Not found {}/{}'.format(self.__filepath_from, self.__file_md5) + if self.__check_mounted != '': + if not os.path.ismount('{}'.format(self.__check_mounted)): + return False, 'Required mounted failed: {}'.format(self.__path_to) + r, s = self.removefileIfExist('{}/{}'.format(self.__path_to, self.__file_source)) + if not r: + return r, s + return True, '' + + def getTestFile_md5(self, fname): + t = '' + with open('{}'.format(fname), 'r') as outfile: + t = outfile.read().rstrip("\n") + outfile.close() + return t + + def ExecuteTranfer(self): + try: + originalMD5 = self.getTestFile_md5('{}/{}'.format(self.__filepath_from, self.__file_md5)) + if originalMD5 == '': + self.fail('MD5 file {}/{} Not contain any md5'.format(self.__filepath_from, self.__file_md5)) + # shutil.copy2('{}/{}'.format(self.__filepath_from, self.__file_source), '{}'.format(self.__path_to)) + sh.cp('{}/{}'.format(self.__filepath_from, self.__file_source), '{}'.format(self.__path_to)) + sh.sync() + md5val = md5_file('{}/{}'.format(self.__path_to, self.__file_source)) + self.removefileIfExist('{}/{}'.format(self.__path_to, self.__file_source)) + if originalMD5 != md5val: + return False, 'md5 check failed {}<>{}'.format(originalMD5, md5val) + except OSError as why: + return False,'Error: {} tranfer failed'.format(why.errno) + except sh.ErrorReturnCode as shError: + return False, 'USB Exception: tranfer failed' + except Exception as details: + return False, 'USB Exception: {} tranfer failed'.format(details) + return True, '' def execute(self): - str_cmd= "lsblk -o LABEL" - lsblk_command = SysCommand("lsblk", str_cmd) - if lsblk_command.execute() == 0: - self.__raw_out = lsblk_command.getOutput() - if self.__raw_out == "": - return -1 - lines = lsblk_command.getOutput().splitlines() - host_list=[] - for i in range(len(lines)): - if str(lines[i].decode('ascii'))==self.__devLabel: - host_list.append(i) - if len(host_list)==int(self.__numPorts): - str_cmd = "lsblk -o MOUNTPOINT" - lsblk_command = SysCommand("lsblk", str_cmd) - if lsblk_command.execute() == 0: - self.__raw_out = lsblk_command.getOutput() - if self.__raw_out == "": - print("failed: no command output") - self.fail("failed: no command output") - else: - lines = lsblk_command.getOutput().splitlines() - for i in range(len(host_list)): - file_path=str(lines[host_list[i]].decode('ascii')) + self.__usbFileName - usb_file = open(file_path, 'r') - read=usb_file.read() - if read.find(self.__usbtext)!=-1: - print(file_path + " --> OK!") - else: - self.fail("failed: could not read from usb {}".format(lines[host_list[i]].decode('ascii'))) - self.__numUsbFail.append(host_list[i]) - usb_file.close() - else: - self.fail("failed: couldn't execute lsblk command") - - else: - self.fail("failed: reference and real usb host devices number mismatch") - else: - self.fail("failed: couldn't execute lsblk command") + v, m = self.checkfiles() + if not v: + self.fail(m) + countLoops = int(self.__loops) + while countLoops > 0: + res, msg = self.ExecuteTranfer() + if not res: + res, msg = self.ExecuteTranfer() + if not res: + self.fail(msg) + countLoops = countLoops - 1 + + def getresults(self): + return self.__resultlist + + def gettextresult(self): + return "" diff --git a/test-cli/test/tests/qvideo.py b/test-cli/test/tests/qvideo.py new file mode 100644 index 0000000..225940b --- /dev/null +++ b/test-cli/test/tests/qvideo.py @@ -0,0 +1,125 @@ +import unittest +import sh +import time +from test.helpers.camara import Camara +from test.helpers.detect import Detect_Color +from test.helpers.sdl import SDL2_Test + +class Qvideo(unittest.TestCase): + params = None + __resultlist = None # resultlist is a python list of python dictionaries + + def __init__(self, testname, testfunc, varlist): + self.params = varlist + super(Qvideo, self).__init__(testfunc) + self._testMethodDoc = testname + self.__xmlObj = varlist["xml"] + self.__QVideoName = varlist.get('name', 'qvideo') + self.__resultlist = [] + self.__w = int(varlist.get('capture_size_w', self.__xmlObj.getKeyVal(self.__QVideoName, "capture_size_w", 1280))) + self.__h = int(varlist.get('capture_size_h', self.__xmlObj.getKeyVal(self.__QVideoName, "capture_size_h", 720))) + self.__discard_frames_Count = int(varlist.get('capture_discardframes',self.__xmlObj.getKeyVal(self.__QVideoName, "capture_discardframes", 3))) + self.__frame_mean = int(varlist.get('capture_framemean', self.__xmlObj.getKeyVal(self.__QVideoName, "capture_framemean", 3))) + self.__max_failed = int(varlist.get('capture_maxfailed', self.__xmlObj.getKeyVal(self.__QVideoName, "capture_maxfailed", 1))) + self.__cam_setupscript = varlist.get('cam_setupfile', self.__xmlObj.getKeyVal(self.__QVideoName, "cam_setupfile", + "/root/hwtest-files/board/scripts/v4l-cam.sh")) + self.__sdl_display = varlist.get('sdl_display', self.__xmlObj.getKeyVal(self.__QVideoName, "sdl_display", ":0")) + self.__sdl_driver = varlist.get('sdl_driver', self.__xmlObj.getKeyVal(self.__QVideoName, "sdl_driver", "x11")) + self.__camdevice = varlist.get('camdevice', self.__xmlObj.getKeyVal(self.__QVideoName, "camdevice", "video0")) + + self.__Camara = Camara(setup_script_path=self.__cam_setupscript, device=self.__camdevice, width=self.__w, height=self.__h) + self.__SDL2_Test = SDL2_Test(driver=self.__sdl_driver, display=self.__sdl_display, w=self.__w, h=self.__h) + self.__SDL2_Test.Clear() + + def __drop_frames(self, frame_count): + count = frame_count + while count > 0: + _ = self.__Camara.getFrame() + count -= 1 + + def Verify_Color(self, color): + self.paintColor(color) + count = self.__frame_mean + res_ok = 0 + res_failed = 0 + r_count = 0 + c_mean = 0 + while count > 0: + res, t = self.Capture(color) + if res: + res_ok += 1 + else: + res_failed += 1 + if t == "count": + r_count += 1 + else: + c_mean +=1 + count -= 1 + self.unpaintColor(color) + if res_failed > self.__max_failed: + if r_count > 0: + return '{}_COLOR_FAILED'.format(color) + elif c_mean > 0: + return 'MEAN_{}_FAILED'.format(color) + return "ok" + + def paintColor(self, color): + self.__SDL2_Test.Paint(color) + time.sleep(3) + self.__drop_frames(self.__Camara.getFrameCount()) + + def unpaintColor(self, color): + self.__SDL2_Test.Clear() + time.sleep(1) + + def Capture(self, color): + image = self.__Camara.getFrame() + if image is None: + return None + dtObject = Detect_Color(image) + if color == 'RED': + _, _, mean, count = dtObject.getRed() + elif color == 'BLUE': + _, _, mean, count = dtObject.getBlue() + elif color == 'GREEN': + c_mask, croped, mean, count = dtObject.getGreen() + + return self.checkThreshold(color, mean, count) + + def checkThreshold(self, color, mean, count): + min_count = int( + self.params.get('{}_min_count'.format(color), + self.__xmlObj.getKeyVal(self.__QVideoName, '{}_min_count'.format(color), 50000))) + + str = "" + # first verify count + if count >= min_count: + return True, "" + else: + str = "count" + return False, str + + def execute(self): + self.__resultlist = [] + + # Open camara + if not self.__Camara.Open(): + self.fail('Error: USB camera not found') + # discard initial frames + self.__drop_frames(self.__discard_frames_Count) + # Test + res = self.Verify_Color('RED') + if res != "ok": + self.fail("failed: RED verification failed") + res = self.Verify_Color('BLUE') + if res != "ok": + self.fail("failed: BLUE verification failed") + res = self.Verify_Color('GREEN') + if res != "ok": + self.fail("failed: GREEN verification failed") + + def getresults(self): + return self.__resultlist + + def gettextresult(self): + return "" diff --git a/test-cli/test/tests/qwifi.py b/test-cli/test/tests/qwifi.py index d08149b..4695041 100644 --- a/test-cli/test/tests/qwifi.py +++ b/test-cli/test/tests/qwifi.py @@ -1,45 +1,99 @@ -from test.helpers.syscmd import SysCommand import unittest -import subprocess +import time +import uuid +import iperf3 +import netifaces +from test.helpers.iw import iwScan +from test.helpers.utils import save_json_to_disk +from test.helpers.utils import station2Port class Qwifi(unittest.TestCase): + __serverip = None + __numbytestx = None + __bwexpected = None + __port = None + params = None + __bwreal = None + __resultlist = None # resultlist is a python list of python dictionaries + timebetweenattempts = 5 - def __init__(self, testname, testfunc, signal): + # varlist content: serverip, bwexpected, port + def __init__(self, testname, testfunc, varlist): + self.params = varlist super(Qwifi, self).__init__(testfunc) - self.__signal = signal self._testMethodDoc = testname - # WiFi SERVERIP fixed at the moment. - self._serverip = "192.168.5.1" + self.__xmlObj = varlist["xml"] + self.__QwifiName = varlist.get('name', 'qwifi') + self.__loops = varlist.get('loops', self.__xmlObj.getKeyVal(self.__QwifiName, "loops", "1")) + self.__port = varlist.get('port', self.__xmlObj.getKeyVal(self.__QwifiName, "port", "5000")) + self.__bw = varlist.get('bwexpected', self.__xmlObj.getKeyVal(self.__QwifiName, "bwexpected", "5.0")) + self.__serverip = varlist.get('serverip', self.__xmlObj.getKeyVal(self.__QwifiName, "serverip", "localhost")) + self.__duration = varlist.get('duration', self.__xmlObj.getKeyVal(self.__QwifiName, "duration", "10")) + self.__interface = varlist.get('interface', self.__xmlObj.getKeyVal(self.__QwifiName, "interface", "wlan0")) + self.__toPath = varlist.get('to', self.__xmlObj.getKeyVal(self.__QwifiName, "to", "/mnt/station_ramdisk")) + self.__retriesCount = varlist.get('retries', self.__xmlObj.getKeyVal(self.__QwifiName, "retries", "5")) + self.__waitRetryTime = varlist.get('wait_retry', self.__xmlObj.getKeyVal(self.__QwifiName, "wait_retry", "10")) + self.__wifi_res_file = varlist.get('wifi_res_file', self.__xmlObj.getKeyVal(self.__QwifiName, "wifi_res_file", "wifi_test_{}.json")) + self.__wifi_st_file = varlist.get('wifi_st_file', self.__xmlObj.getKeyVal(self.__QwifiName, "wifi_st_file", "wifi_st_{}.json")) + self.__file_uuid = uuid.uuid4() + self.__wifi_res_file = self.__wifi_res_file.format(self.__file_uuid) + self.__wifi_st_file = self.__wifi_st_file.format(self.__file_uuid) + self.__resultlist = [] + + def checkInterface(self, reqInterface): + ifaces = netifaces.interfaces() + for t in ifaces: + if t == reqInterface: + return True + return False def execute(self): - # First check connection with the wifi server using ping command - #str_cmd = "ping -c 1 {} > /dev/null".format(self._serverip) - #wifi_ping = SysCommand("wifi_ping", str_cmd) - #wifi_ping.execute() - #res = subprocess.call(['ping', '-c', '1', self._serverip]) - p = subprocess.Popen(['ping','-c','1',self._serverip,'-W','2'],stdout=subprocess.PIPE) - p.wait() - res=p.poll() - if res == 0: - str_cmd= "iw wlan0 link" - wifi_stats = SysCommand("wifi_stats", str_cmd) - if wifi_stats.execute() == 0: - w_stats = wifi_stats.getOutput().decode('ascii').splitlines() - #self._longMessage = str(self.__raw_out).replace("'", "") - if not w_stats[0] == "Not connected.": - signal_st = w_stats[5].split(" ")[1] - try: - int(signal_st) - if -1*int(signal_st) > int(self.__signal): - self.fail("failed: signal to server lower than expected") - except ValueError: - self.fail("failed: error output (Bad connection?)") + self.__resultlist = [] + stationList = {} + # check interfaces + if not self.checkInterface(self.__interface): + self.fail('Interface not found: {}'.format(self.__interface)) + # scan networks + scanner = iwScan(self.__interface) + if scanner.scan(): + stationList = scanner.getStations() + # check if we are connected + if not scanner.findConnected(): + self.fail('Not connected to test AP') + else: + strError = scanner.getLastError() + self.fail('qWifi scanner Execution error: {}'.format(strError)) + # Start Iperf + client = iperf3.Client() + client.duration = int(self.__duration) + client.server_hostname = self.__serverip + client.port = station2Port(self.__port) + count = int(self.__retriesCount) + while count > 0: + result = client.run() + if result.error == None: + if result.received_Mbps >= float(self.__bw) and result.sent_Mbps >= float(self.__bw): + save_json_to_disk(filePath='{}/{}'.format(self.__toPath, self.__wifi_st_file), + description='wifi scan', + mime='application/json', + json_data=stationList, + result=self.__resultlist) + save_json_to_disk(filePath='{}/{}'.format(self.__toPath, self.__wifi_res_file), + description='wifi {} iperf3'.format(self.__interface), + mime='application/json', + json_data=result.json, + result=self.__resultlist) + return else: - self.fail("failed: error output (Bad connection?)") - #tx_brate = float(w_stats[6].split(" ")[2]) + self.fail('bw fail: rx:{} tx:{}'.format(result.received_Mbps, result.sent_Mbps)) else: - self.fail("failed: couldn't execute iw command") - else: - self.fail("failed: ping to server {}".format(self._serverip)) + count = count - 1 + time.sleep(int(self.__waitRetryTime)) + + self.fail('qWifi (max tries) Execution error: {}'.format(result.error)) + def getresults(self): + return self.__resultlist + def gettextresult(self): + return "" |