summaryrefslogtreecommitdiff
path: root/test-cli/test/tests
diff options
context:
space:
mode:
Diffstat (limited to 'test-cli/test/tests')
-rw-r--r--test-cli/test/tests/qamper.py32
-rw-r--r--test-cli/test/tests/qaudio.py77
-rw-r--r--test-cli/test/tests/qdmesg.py30
-rw-r--r--test-cli/test/tests/qeeprom.py77
-rw-r--r--test-cli/test/tests/qethernet.py77
-rw-r--r--test-cli/test/tests/qmmcflash.py46
-rw-r--r--test-cli/test/tests/qnand.py44
-rw-r--r--test-cli/test/tests/qplc.py70
-rw-r--r--test-cli/test/tests/qram.py56
-rw-r--r--test-cli/test/tests/qusb.py143
-rw-r--r--test-cli/test/tests/qusbdual.py90
-rw-r--r--test-cli/test/tests/qwifi.py143
12 files changed, 518 insertions, 367 deletions
diff --git a/test-cli/test/tests/qamper.py b/test-cli/test/tests/qamper.py
index 58054c9..df340eb 100644
--- a/test-cli/test/tests/qamper.py
+++ b/test-cli/test/tests/qamper.py
@@ -5,6 +5,7 @@ from test.helpers.amper import Amper
class Qamper(unittest.TestCase):
params = None
__resultlist = None # resultlist is a python list of python dictionaries
+ dummytest = False
# varlist: undercurrent, overcurrent
def __init__(self, testname, testfunc, varlist):
@@ -21,8 +22,28 @@ class Qamper(unittest.TestCase):
raise Exception('overcurrent param inside Qamp must be defined')
self._testMethodDoc = testname
self.__resultlist = []
+ if "dummytest" in varlist:
+ self.dummytest = varlist["dummytest"]
+
+ def saveresultinfile(self, currentvalue):
+ # save result in a file
+ with open('/mnt/station_ramdisk/amper.txt', 'w') as outfile:
+ n = outfile.write("Current: {} A".format(currentvalue))
+ outfile.close()
+ self.__resultlist.append(
+ {
+ "description": "Amperimeter values",
+ "filepath": "/mnt/station_ramdisk/amper.txt",
+ "mimetype": "text/plain"
+ }
+ )
def execute(self):
+ if self.dummytest:
+ self.current = "0.0"
+ self.saveresultinfile(self.current)
+ return
+
amp = Amper()
# open serial connection
if not amp.open():
@@ -35,16 +56,7 @@ class Qamper(unittest.TestCase):
# get current value (in Amperes)
self.current = amp.getCurrent()
# save result in a file
- with open('/mnt/station_ramdisk/amper.txt', 'w') as outfile:
- n = outfile.write("Current: {} A".format(self.current))
- outfile.close()
- self.__resultlist.append(
- {
- "description": "Amperimeter values",
- "filepath": "/mnt/station_ramdisk/amper.txt",
- "mimetype": "text/plain"
- }
- )
+ self.saveresultinfile(self.current)
# close serial connection
amp.close()
# Check current range
diff --git a/test-cli/test/tests/qaudio.py b/test-cli/test/tests/qaudio.py
index 3d2113a..2c86809 100644
--- a/test-cli/test/tests/qaudio.py
+++ b/test-cli/test/tests/qaudio.py
@@ -2,56 +2,78 @@ import unittest
import sh
import wave
import contextlib
-
-
-def calc_audio_duration(fname):
- with contextlib.closing(wave.open(fname, 'r')) as f:
- frames = f.getnframes()
- rate = f.getframerate()
- duration = frames / float(rate)
- f.close()
- return duration
-
+import uuid
+import time
+from test.helpers.utils import save_result
+from test.helpers.iseelogger import logObj
class Qaudio(unittest.TestCase):
params = None
__resultlist = None # resultlist is a python list of python dictionaries
__dtmf_secuence = ["1", "3", "5", "7", "9"]
+ __dtmf_file = None
+ __file_uuid = None
+ __QaudioName = None
def __init__(self, testname, testfunc, varlist):
self.params = varlist
super(Qaudio, self).__init__(testfunc)
self._testMethodDoc = testname
self.__resultlist = []
+ self.__xmlObj = varlist["xml"]
+ self.__QaudioName = varlist.get('name', 'qaudio')
+ self.__dtmf_file_play = varlist.get('play_dtmf_file', self.__xmlObj.getKeyVal(self.__QaudioName, "play_dtmf_file", "dtmf-13579.wav"))
+ self.__recordtime = varlist.get('recordtime', self.__xmlObj.getKeyVal(self.__QaudioName, "recordtime", "0.15"))
+ self.__dtmf_file_record = varlist.get('record_dtmf_file', self.__xmlObj.getKeyVal(self.__QaudioName, "record_dtmf_file", "record-{}.wav"))
+ self.__sampling_rate = varlist.get('rate', self.__xmlObj.getKeyVal(self.__QaudioName, "rate", "8000"))
+ self.__record_path = varlist.get('to', self.__xmlObj.getKeyVal(self.__QaudioName, "to", "/mnt/station_ramdisk"))
+ self.__run_delay = float(varlist.get('aplay_run_delay', self.__xmlObj.getKeyVal(self.__QaudioName, "aplay_run_delay", "0.0")))
+ self.__file_uuid = uuid.uuid4()
+ self.__dtmf_file_record = self.__dtmf_file_record.format(self.__file_uuid)
+
+ def restoreAlsaCtl(self):
+ try:
+ sh.alsactl('restore');
+ except Exception as E:
+ logObj.getlogger().error("Failed to Execite alsactl restore");
def execute(self):
- # analize audio file
- recordtime = calc_audio_duration("/var/lib/hwtest-files/dtmf-13579.wav") + 0.15
+ self.restoreAlsaCtl()
+ try:
+ # analize audio file
+ calcRecordTime = self.calc_audio_duration(self.__dtmf_file_play) + float(self.__recordtime) + float(0.1)
+ except TypeError as Error:
+ self.fail("failed: TypeError: {}.".Error)
+
# previous play to estabilise audio lines
- p1 = sh.aplay("/var/lib/hwtest-files/dtmf-13579.wav", _bg=True)
- p2 = sh.arecord("-r", 8000, "-d", recordtime, "/mnt/station_ramdisk/recorded.wav", _bg=True)
- p1.wait()
- p2.wait()
- # play and record
- p1 = sh.aplay("/var/lib/hwtest-files/dtmf-13579.wav", _bg=True)
- p2 = sh.arecord("-r", 8000, "-d", recordtime, "/mnt/station_ramdisk/recorded.wav", _bg=True)
- p1.wait()
+ p2 = sh.arecord("-r", self.__sampling_rate, "-d", calcRecordTime, "{}/{}".format(self.__record_path, self.__dtmf_file_record), _bg=True)
+ time.sleep(self.__run_delay)
+ p1 = sh.aplay(self.__dtmf_file_play)
p2.wait()
if p1.exit_code == 0 and p2.exit_code == 0:
- p = sh.multimon("-t", "wav", "-a", "DTMF", "/mnt/station_ramdisk/recorded.wav", "-q")
+ p = sh.multimon("-t", "wav", "-a", "DTMF", "{}/{}".format(self.__record_path, self.__dtmf_file_record), "-q")
if p.exit_code == 0:
lines = p.stdout.decode('ascii').splitlines()
self.__dtmf_secuence_result = []
for li in lines:
if li.split(" ")[0] == "DTMF:":
self.__dtmf_secuence_result.append(li.split(" ")[1])
+ self.__dtmf_secuence_result = sorted(list(set(self.__dtmf_secuence_result)))
# compare original and processed dtmf sequence
if len(self.__dtmf_secuence) == len(self.__dtmf_secuence_result):
for a, b in zip(self.__dtmf_secuence, self.__dtmf_secuence_result):
if a != b:
- self.fail("failed: sent and received DTMF sequence don't match.")
+ logObj.getlogger().debug(
+ "failed: sent and received DTMF sequence do not match.{} != {}".format(
+ self.__dtmf_secuence, self.__dtmf_secuence_result))
+ self.fail("failed: sent and received DTMF sequence do not match.")
+ save_result(filePath='{}/{}'.format(self.__record_path, self.__dtmf_file_record),
+ description='sound record',
+ mime='audio/x-wav',
+ result=self.__resultlist)
else:
- self.fail("failed: received DTMF sequence is shorter than expected.")
+ logObj.getlogger().debug("received DTMF sequence is shorter than expected.{} {}".format(self.__dtmf_secuence,self.__dtmf_secuence_result))
+ self.fail("received DTMF sequence is shorter than expected")
else:
self.fail("failed: unable to use multimon command.")
else:
@@ -62,3 +84,12 @@ class Qaudio(unittest.TestCase):
def gettextresult(self):
return ""
+
+ def calc_audio_duration(self, fname):
+ duration = 0.0
+ with contextlib.closing(wave.open(fname, 'r')) as f:
+ frames = f.getnframes()
+ rate = f.getframerate()
+ duration = frames / float(rate)
+ f.close()
+ return duration
diff --git a/test-cli/test/tests/qdmesg.py b/test-cli/test/tests/qdmesg.py
index b35f1ff..39a5047 100644
--- a/test-cli/test/tests/qdmesg.py
+++ b/test-cli/test/tests/qdmesg.py
@@ -1,4 +1,5 @@
import sh
+import os
import os.path
from os import path
@@ -9,24 +10,23 @@ class Qdmesg:
def __init__(self, testname, testfunc, varlist):
self.params = varlist
- self._testMethodDoc = testname
+ self.__testMethodDoc = testname
self.__resultlist = []
self.pgObj = varlist["db"]
+ self.__xmlObj = varlist["xml"]
+ self.__QdmesgName = varlist.get('name', 'qdmesg')
+ self.__syslog_dmesg_file = varlist.get('syslog_dmesg',self.__xmlObj.getKeyVal(self.__QdmesgName, "syslog_dmesg",
+ "/var/log/kern.log"))
+
+ def getTestName(self):
+ return self.__testMethodDoc
def execute(self):
- print("running dmesg test")
self.pgObj.run_test(self.params["testidctl"], self.params["testid"])
- # delete previous file
- if path.exists("/mnt/station_ramdisk/dmesg.txt"):
- os.remove("/mnt/station_ramdisk/dmesg.txt")
- # generate file
- p = sh.dmesg("--color=never", _out="/mnt/station_ramdisk/dmesg.txt")
- if p.exit_code == 0:
- # save dmesg result in DB
- self.pgObj.upload_result_file(self.params["testidctl"], self.params["testid"], "dmesg output",
- "/mnt/station_ramdisk/dmesg.txt", "text/plain")
- self.pgObj.finish_test(self.params["testidctl"], self.params["testid"], "TEST_COMPLETE", "")
- print("success dmesg test")
- else:
+ if not os.path.isfile('{}'.format(self.__syslog_dmesg_file)):
self.pgObj.finish_test(self.params["testidctl"], self.params["testid"], "TEST_FAILED", "")
- print("fail dmesg test")
+ return False
+ self.pgObj.upload_result_file(self.params["testidctl"], self.params["testid"], "{}".format(self.__syslog_dmesg_file),
+ "{}".format(self.__syslog_dmesg_file), "text/plain")
+ self.pgObj.finish_test(self.params["testidctl"], self.params["testid"], "TEST_COMPLETE", "")
+ return True
diff --git a/test-cli/test/tests/qeeprom.py b/test-cli/test/tests/qeeprom.py
index 7fc9fcd..7900381 100644
--- a/test-cli/test/tests/qeeprom.py
+++ b/test-cli/test/tests/qeeprom.py
@@ -1,46 +1,65 @@
-import sh
import unittest
-
+import os
class Qeeprom(unittest.TestCase):
params = None
- __position = None
- __eeprompath = None
__resultlist = None # resultlist is a python list of python dictionaries
+ __QEepromName = None
+ __i2cBus = None
+ __device = None
# varlist: position, eeprompath
def __init__(self, testname, testfunc, varlist):
self.params = varlist
super(Qeeprom, self).__init__(testfunc)
self._testMethodDoc = testname
+ self.__xmlObj = varlist["xml"]
+ self.__QEepromName = varlist.get('name', 'qeeprom')
+ self.__i2cAddress = varlist.get('i2c_address', self.__xmlObj.getKeyVal(self.__QEepromName, "i2c_address", "0050"))
+ self.__i2cBus = varlist.get('i2c_bus', self.__xmlObj.getKeyVal(self.__QEepromName, "i2c_bus", "0"))
+ self.__device = '/sys/bus/i2c/devices/{}-{}/eeprom'.format(self.__i2cBus, self.__i2cAddress)
+
self.__resultlist = []
- if "position" in varlist:
- self.__position = varlist["position"]
- else:
- raise Exception('position param inside Qeeprom must be defined')
- if "eeprompath" in varlist:
- self.__eeprompath = varlist["eeprompath"]
- else:
- raise Exception('eeprompath param inside Qeeprom must be defined')
+
+ def __check_device(self):
+ if os.path.isfile(self.__device):
+ return True
+ return False
+
+ def __eeprom_write(self, data):
+ try:
+ f = open(self.__device, "wb")
+ f.write(data)
+ f.close()
+ except IOError as Error:
+ return False, '{}'.format(Error.errno)
+ return True, ''
+
+ def __eeprom_read(self, size):
+ try:
+ f = open(self.__device, "rb")
+ data = f.read(size)
+ f.close()
+ except IOError as Error:
+ return False, '{}'.format(Error.errno)
+ return True, data
+
def execute(self):
- # generate some data
- data_tx = "isee_test"
- # write data into the eeprom
- p = sh.dd(sh.echo(data_tx), "of=" + self.__eeprompath, "bs=1", "seek=" + self.__position)
- if p.exit_code == 0:
- # read data from the eeprom
- p = sh.dd("if=" + self.__eeprompath, "bs=1", "skip=" + self.__position, "count=" + str(len(data_tx)))
- if p.exit_code == 0:
- data_rx = p.stdout.decode('ascii')
- # compare both values
- if data_rx != data_tx:
- # Both data are different
- self.fail("failed: mismatch between written and received values.")
- else:
- self.fail("failed: Unable to read from the EEPROM device.")
- else:
- self.fail("failed: Unable to write on the EEPROM device.")
+ if not self.__check_device():
+ self.fail("cannot open device {}".format(self.__device))
+
+ data = bytes('AbcDeFgHijK098521', 'utf-8')
+
+ v, w = self.__eeprom_write(data)
+ if not v:
+ self.fail("eeprom: {} write test failed".format(w))
+ v, r = self.__eeprom_read(len(data))
+ if not v:
+ self.fail("eeprom: {} read test failed".format(r))
+ if r != data:
+ self.fail("mismatch between write and read bytes")
+
def getresults(self):
return self.__resultlist
diff --git a/test-cli/test/tests/qethernet.py b/test-cli/test/tests/qethernet.py
index 81acef1..7d081a1 100644
--- a/test-cli/test/tests/qethernet.py
+++ b/test-cli/test/tests/qethernet.py
@@ -2,7 +2,11 @@ import unittest
import sh
import json
import time
-
+import uuid
+import iperf3
+import netifaces
+from test.helpers.utils import save_json_to_disk
+from test.helpers.utils import station2Port
class Qethernet(unittest.TestCase):
__serverip = None
@@ -16,29 +20,66 @@ class Qethernet(unittest.TestCase):
# varlist content: serverip, bwexpected, port
def __init__(self, testname, testfunc, varlist):
- # save the parameters list
self.params = varlist
- # configure the function to be executed when the test runs. "testfunc" is a name of a method inside this
- # class, that in this situation, it can only be "execute".
super(Qethernet, self).__init__(testfunc)
- # validate and get the parameters
- if "serverip" in varlist:
- self.__serverip = varlist["serverip"]
- else:
- raise Exception('sip param inside Qethernet have been be defined')
- if "bwexpected" in varlist:
- self.__bwexpected = varlist["bwexpected"]
- else:
- raise Exception('bwexpected param inside Qethernet must be defined')
- if "port" in varlist:
- self.__port = varlist["port"]
- else:
- raise Exception('port param inside Qethernet must be defined')
- self.__numbytestx = "10M"
self._testMethodDoc = testname
+ self.__xmlObj = varlist["xml"]
+ self.__QEthName = varlist.get('name', 'qeth')
+ self.__loops = varlist.get('loops', self.__xmlObj.getKeyVal(self.__QEthName, "loops", "1"))
+ self.__port = varlist.get('port', self.__xmlObj.getKeyVal(self.__QEthName, "port", "5000"))
+ self.__bw = varlist.get('bwexpected', self.__xmlObj.getKeyVal(self.__QEthName, "bwexpected", "40.0"))
+ self.__serverip = varlist.get('serverip', self.__xmlObj.getKeyVal(self.__QEthName, "serverip", "localhost"))
+ self.__duration = varlist.get('duration', self.__xmlObj.getKeyVal(self.__QEthName, "duration", "10"))
+ self.__interface = varlist.get('interface', self.__xmlObj.getKeyVal(self.__QEthName, "interface", "eth0"))
+ self.__toPath = varlist.get('to', self.__xmlObj.getKeyVal(self.__QEthName, "to", "/mnt/station_ramdisk"))
+ self.__retriesCount = varlist.get('retries', self.__xmlObj.getKeyVal(self.__QEthName, "retries", "5"))
+ self.__waitRetryTime = varlist.get('wait_retry', self.__xmlObj.getKeyVal(self.__QEthName, "wait_retry", "10"))
+ self.__wifi_res_file = varlist.get('eth_res_file', self.__xmlObj.getKeyVal(self.__QEthName, "eth_res_file", "eth_test_{}.json"))
+ self.__wifi_st_file = varlist.get('eth_st_file', self.__xmlObj.getKeyVal(self.__QEthName, "eth_st_file", "eth_st_{}.json"))
+ self.__file_uuid = uuid.uuid4()
+ self.__wifi_res_file = self.__wifi_res_file.format(self.__file_uuid)
+ self.__wifi_st_file = self.__wifi_st_file.format(self.__file_uuid)
+
self.__resultlist = []
+ def checkInterface(self, reqInterface):
+ ifaces = netifaces.interfaces()
+ for t in ifaces:
+ if t == reqInterface:
+ return True
+ return False
+
def execute(self):
+ self.__resultlist = []
+ # check interfaces
+ if not self.checkInterface(self.__interface):
+ self.fail('Interface not found: {}'.format(self.__interface))
+
+ # Start Iperf
+ client = iperf3.Client()
+ client.duration = int(self.__duration)
+ client.server_hostname = self.__serverip
+ client.port = station2Port(self.__port)
+ count = int(self.__retriesCount)
+ while count > 0:
+ result = client.run()
+ if result.error == None:
+ if result.received_Mbps >= float(self.__bw) and result.sent_Mbps >= float(self.__bw):
+ save_json_to_disk(filePath='{}/{}'.format(self.__toPath, self.__wifi_res_file),
+ description='eth {} iperf3'.format(self.__interface),
+ mime='application/json',
+ json_data=result.json,
+ result=self.__resultlist)
+ return
+ else:
+ self.fail('bw fail: rx:{} tx:{}'.format(result.received_Mbps, result.sent_Mbps))
+ else:
+ count = count - 1
+ time.sleep(int(self.__waitRetryTime))
+
+ self.fail('qEth (max tries) Execution error: {}'.format(result.error))
+
+ def execute2(self):
# execute iperf command against the server, but it implements attempts in case the server is busy
iperfdone = False
while not iperfdone:
diff --git a/test-cli/test/tests/qmmcflash.py b/test-cli/test/tests/qmmcflash.py
index f10757e..0f5a0c1 100644
--- a/test-cli/test/tests/qmmcflash.py
+++ b/test-cli/test/tests/qmmcflash.py
@@ -1,5 +1,10 @@
import unittest
-import sh
+import scanf
+from scanf import scanf
+from sh import mmc
+from sh import ErrorReturnCode
+from test.helpers.utils import save_file_to_disk
+from test.helpers.utils import sys_read
class Qmmcflash(unittest.TestCase):
params = None
@@ -8,8 +13,43 @@ class Qmmcflash(unittest.TestCase):
self.params = varlist
super(Qmmcflash, self).__init__(testfunc)
self._testMethodDoc = testname
- # TODO
+ self.__xmlObj = varlist["xml"]
+ self.__QeMMCName = varlist.get('name', 'qemmc')
+ self.__emmcDevice = varlist.get('device', self.__xmlObj.getKeyVal(self.__QeMMCName, "device", "mmcblk0"))
+ self.__toPath = varlist.get('to', self.__xmlObj.getKeyVal(self.__QeMMCName, "to", "/mnt/station_ramdisk"))
+ self.__mmc_res_file = varlist.get('emmc_res_file', self.__xmlObj.getKeyVal(self.__QeMMCName, "emmc_res_file", "emmc_status.txt"))
+ self.__mmcPort = varlist.get('mmc_port', self.__xmlObj.getKeyVal(self.__QeMMCName, "mmc_port", "0"))
+ self.__mmcID = varlist.get('mmc_id', self.__xmlObj.getKeyVal(self.__QeMMCName, "mmc_id", "0001"))
+
+ self.__resultlist = []
def execute(self):
- # TODO \ No newline at end of file
+ try:
+ dataOut = mmc('extcsd', 'read', '/dev/{}'.format(self.__emmcDevice))
+ save_file_to_disk(filePath='{}/{}'.format(self.__toPath, self.__mmc_res_file),
+ description='eMMC health test',
+ mime='text/plain',
+ data=dataOut.stdout.decode('utf-8'),
+ result=self.__resultlist)
+ sysDevice = "/sys/bus/mmc/drivers/mmcblk/mmc{}:{}".format(self.__mmcPort, self.__mmcID)
+ r, data = sys_read("{}/life_time".format(sysDevice))
+ if not r:
+ self.fail("emmc: life_time not found")
+ res = scanf("0x%d 0x%d", data)
+ if res[0] > 3 or res[1] > 3:
+ self.fail("emmc: review {} life_time > 3".format(sysDevice))
+ r, data = sys_read("{}/pre_eol_info".format(sysDevice))
+ if not r:
+ self.fail("emmc: pre_eol_info not found")
+ res = scanf("0x%d", data)
+ if res[0] != 1:
+ self.fail("emmc: review {} pre_eol_info != 1".format(sysDevice))
+ except ErrorReturnCode as Error:
+ self.fail("emmc: failed {} ".format(Error.exit_code))
+
+ def getresults(self):
+ return self.__resultlist
+
+ def gettextresult(self):
+ return "" \ No newline at end of file
diff --git a/test-cli/test/tests/qnand.py b/test-cli/test/tests/qnand.py
index 988ab60..6de1eaf 100644
--- a/test-cli/test/tests/qnand.py
+++ b/test-cli/test/tests/qnand.py
@@ -1,6 +1,7 @@
import unittest
import sh
-
+import uuid
+from test.helpers.utils import save_file_to_disk
class Qnand(unittest.TestCase):
params = None
@@ -11,29 +12,34 @@ class Qnand(unittest.TestCase):
def __init__(self, testname, testfunc, varlist):
self.params = varlist
super(Qnand, self).__init__(testfunc)
- if "device" in varlist:
- self.__device = varlist["device"]
- else:
- raise Exception('device param inside Qnand must be defined')
self._testMethodDoc = testname
+ self.__xmlObj = varlist["xml"]
+ self.__QNandName = varlist.get('name', 'qnand')
+ self.__device = varlist.get('device', self.__xmlObj.getKeyVal(self.__QNandName, "device", "/dev/mtd0"))
+ self.__nand_res_file = varlist.get('nand_res_file', self.__xmlObj.getKeyVal(self.__QNandName, "nand_res_file", "nand_test_{}.txt"))
+ self.__toPath = varlist.get('to', self.__xmlObj.getKeyVal(self.__QNandName, "to", "/mnt/station_ramdisk"))
+ self.__file_uuid = uuid.uuid4()
+ self.__nand_res_file = self.__nand_res_file.format(self.__file_uuid)
+ nandtestPath = self.__xmlObj.getKeyVal("qnand", "nandtestPath", '/root/hwtest-files/nandtest"')
+
+ if nandtestPath == '':
+ self.myNandTest = sh.nandtest
+ else:
+ self.myNandTest = sh.Command(nandtestPath)
self.__resultlist = []
def execute(self):
try:
- p = sh.nandtest("-m", self.__device)
- # save result
- with open('/mnt/station_ramdisk/nand-nandtest.txt', 'w') as outfile:
- n = outfile.write(p.stdout.decode('ascii'))
- outfile.close()
- self.__resultlist.append(
- {
- "description": "nandtest output",
- "filepath": "/mnt/station_ramdisk/nand-nandtest.txt",
- "mimetype": "text/plain"
- }
- )
- except sh.ErrorReturnCode as e:
- self.fail("failed: could not complete nandtest command")
+ res = self.myNandTest("-m", self.__device)
+ save_file_to_disk(filePath='{}/{}'.format(self.__toPath, self.__nand_res_file),
+ description='nand {} test'.format(self.__device),
+ mime='text/plain',
+ data=res.stdout.decode('utf-8'),
+ result=self.__resultlist)
+
+ except sh.ErrorReturnCode_1 as e:
+ self.fail("nandtest failed: {}".format(e.ErrorReturnCode.stdout))
+
def getresults(self):
return self.__resultlist
diff --git a/test-cli/test/tests/qplc.py b/test-cli/test/tests/qplc.py
new file mode 100644
index 0000000..01258a9
--- /dev/null
+++ b/test-cli/test/tests/qplc.py
@@ -0,0 +1,70 @@
+import shutil
+import unittest
+import os.path
+import os
+import sh
+
+from test.helpers.md5 import md5_file
+from test.helpers.plc import dcpPLC
+
+class Qplc(unittest.TestCase):
+ params = None
+ __resultlist = None # resultlist is a python list of python dictionaries
+ __QPLCName = None
+ __plc = None
+
+ def __init__(self, testname, testfunc, varlist):
+ self.params = varlist
+ super(Qplc, self).__init__(testfunc)
+ self._testMethodDoc = testname
+ self.__xmlObj = varlist["xml"]
+
+ self.__QPLCName = varlist.get('name', 'qplc')
+ self.__firmware = varlist.get('firmware', self.__xmlObj.getKeyVal(self.__QPLCName, "firmware", ""))
+ self.__firmware_md5 = varlist.get('firmware_md5', self.__xmlObj.getKeyVal(self.__QPLCName, "firmware_md5", ""))
+ self.__factoryTool = varlist.get('factory_tool', self.__xmlObj.getKeyVal(self.__QPLCName, "factory_tool", ""))
+ self.__gen_ip = varlist.get('gen_ip', self.__xmlObj.getKeyVal(self.__QPLCName, "gen_ip", "0"))
+ self.__gen_mac = varlist.get('gen_mac', self.__xmlObj.getKeyVal(self.__QPLCName, "gen_mac", "0"))
+ self.__mtd_device = varlist.get('mtd_device', self.__xmlObj.getKeyVal(self.__QPLCName, "mtd_device", "/dev/mtd0"))
+ self.__plc = dcpPLC(self.__factoryTool, self.__mtd_device)
+
+ self.__resultlist = []
+
+
+ def checkfiles(self):
+ if not os.path.isfile('{}/{}'.format(self.__filepath_from, self.__file_source)):
+ return False, 'File test binary Not found {}/{}'.format(self.__filepath_from, self.__file_source)
+ if not os.path.isfile('{}/{}'.format(self.__filepath_from, self.__file_md5)):
+ return False, 'File test binary md5 Not found {}/{}'.format(self.__filepath_from, self.__file_md5)
+ if self.__check_mounted != '':
+ if not os.path.ismount('{}'.format(self.__check_mounted)):
+ return False, 'Required mounted failed: {}'.format(self.__path_to)
+ r, s = self.removefileIfExist('{}/{}'.format(self.__path_to, self.__file_source))
+ if not r:
+ return r, s
+ return True, ''
+
+ def __flashMemory(self):
+ self.__plc.setSaveFirmwareMode()
+ r, v = self.__plc.SaveFirmware(self.__firmware)
+ if not r:
+ self.fail(v)
+
+ def execute(self):
+ # Check files
+ v, m = self.checkfiles()
+ if not v:
+ self.fail(m)
+ # do flash & verify
+ self.__flashMemory()
+ # do Plc boot
+ self.__plc.setBootMode()
+ # Wait plc goes UP
+
+
+
+ def getresults(self):
+ return self.__resultlist
+
+ def gettextresult(self):
+ return ""
diff --git a/test-cli/test/tests/qram.py b/test-cli/test/tests/qram.py
index 21a01c8..49d4d54 100644
--- a/test-cli/test/tests/qram.py
+++ b/test-cli/test/tests/qram.py
@@ -1,33 +1,59 @@
import unittest
import sh
-
+# from test.helpers.iseelogger import MeasureTime
+# from test.helpers.iseelogger import logObj
+from test.helpers.utils import save_file_to_disk
class Qram(unittest.TestCase):
params = None
- __memsize = "10M"
- __loops = "1"
- __resultlist = None # resultlist is a python list of python dictionaries
+ __memsize = None
+ __loops = None
+ __resultlist = [] # resultlist is a python list of python dictionaries
# varlist: memsize, loops
def __init__(self, testname, testfunc, varlist):
self.params = varlist
super(Qram, self).__init__(testfunc)
- if "memsize" in varlist:
- self.__memsize = varlist["memsize"]
- else:
- raise Exception('memsize param inside Qram must be defined')
- if "loops" in varlist:
- self.__loops = varlist["loops"]
- else:
- raise Exception('loops param inside Qram must be defined')
self._testMethodDoc = testname
- self.__resultlist = []
+ self.__xmlObj = varlist["xml"]
+
+ self.__QramName = varlist.get('name', 'qram')
+ self.__loops = varlist.get('loops', self.__xmlObj.getKeyVal(self.__QramName, "loops", "1"))
+ self.__memsize = varlist.get('memsize', self.__xmlObj.getKeyVal(self.__QramName, "memsize", "50M"))
+ self.__toPath = varlist.get('to', self.__xmlObj.getKeyVal(self.__QramName, "to", "/mnt/station_ramdisk"))
+ self.__ram_res_file = varlist.get('ram_res_file', self.__xmlObj.getKeyVal(self.__QramName, "ram_res_file", "ram_res.txt"))
+
+ self.__dummytest = varlist.get('dummytest', self.__xmlObj.getKeyVal(self.__QramName, "dummytest", "0"))
+ self.__dummyresult = varlist.get('dummytestresult', self.__xmlObj.getKeyVal(self.__QramName, "dummytest", "0"))
+
+ memtesterPath = self.__xmlObj.getKeyVal(self.__QramName, "memtesterPath", '')
+
+ if memtesterPath == '':
+ self.myMemtester = sh.memtester
+ else:
+ self.myMemtester = sh.Command(memtesterPath)
def execute(self):
+ self.__resultlist = []
try:
- p = sh.memtester(self.__memsize, "1", _out="/dev/null")
+ # mytime = MeasureTime()
+ res = self.myMemtester(self.__memsize, '{}'.format(self.__loops))
+ save_file_to_disk(filePath='{}/{}'.format(self.__toPath, self.__ram_res_file),
+ description='Ram result test',
+ mime='text/plain',
+ data=res.stdout.decode('utf-8'),
+ result=self.__resultlist)
+ # mytime.stop()
except sh.ErrorReturnCode as e:
- self.fail("failed: could not complete memtester command")
+ save_file_to_disk(filePath='{}/{}'.format(self.__toPath, self.__ram_res_file),
+ description='Ram result test',
+ mime='text/plain',
+ data=e.stdout.decode('utf-8'),
+ result=self.__resultlist)
+ self.fail("failed: memtester {}::{}".format(str(e.exit_code), e.stdout.decode('utf-8')))
+
+ except Exception as details:
+ self.fail('Error: {}'.format(details))
def getresults(self):
return self.__resultlist
diff --git a/test-cli/test/tests/qusb.py b/test-cli/test/tests/qusb.py
index 6302012..3182685 100644
--- a/test-cli/test/tests/qusb.py
+++ b/test-cli/test/tests/qusb.py
@@ -1,91 +1,90 @@
-import sh
+import shutil
import unittest
-from test.helpers.usb import USBDevices
import os.path
+import os
+import sh
+from test.helpers.md5 import md5_file
class Qusb(unittest.TestCase):
params = None
__resultlist = None # resultlist is a python list of python dictionaries
+ __QusbName = None
def __init__(self, testname, testfunc, varlist):
self.params = varlist
super(Qusb, self).__init__(testfunc)
self._testMethodDoc = testname
- if "repetitions" in varlist:
- self.__repetitions = varlist["repetitions"]
- else:
- raise Exception('repetitions param inside Qusb must be defined')
+ self.__xmlObj = varlist["xml"]
+
+ self.__QusbName = varlist.get('name', 'qusb')
+ self.__loops = varlist.get('loops', self.__xmlObj.getKeyVal(self.__QusbName, "loops", "3"))
+ self.__filepath_from = varlist.get('path', self.__xmlObj.getKeyVal(self.__QusbName, "path", "/root/hwtest-files"))
+ self.__file_source = varlist.get('file', self.__xmlObj.getKeyVal(self.__QusbName, "file", "usb-test.bin"))
+ self.__file_md5 = varlist.get('file_md5', self.__xmlObj.getKeyVal(self.__QusbName, "file_md5", "usb-test.bin.md5"))
+ self.__path_to = varlist.get('pathto', self.__xmlObj.getKeyVal(self.__QusbName, "pathto", "/mnt/test/disk2"))
+ self.__check_mounted = varlist.get('check_mounted', self.__xmlObj.getKeyVal(self.__QusbName, "check_mounted", ""))
+
self.__resultlist = []
- def execute(self):
- # get usb device
- dev_obj = USBDevices(self.params["xml"])
- if dev_obj.getMassStorage():
- device = dev_obj.getMassStorage()['disk'] + "1"
- else:
- self.fail("failed: No USB memory found.")
- # check if the device is mounted, and umount it
- try:
- p = sh.findmnt("-n", device)
- if p.exit_code == 0:
- sh.umount(device)
- except sh.ErrorReturnCode as e:
- # error = 1 means "no found"
- pass
- # mount the device
- sh.mkdir("-p", "/mnt/station_ramdisk/pendrive")
- p = sh.mount(device, "/mnt/station_ramdisk/pendrive")
- if p.exit_code != 0:
- self.fail("failed: Unable to mount the USB memory device.")
- # execute test
- for i in range(int(self.__repetitions)):
- # copy files
+ def removefileIfExist(self, fname):
+ if os.path.exists(fname):
try:
- p = sh.cp("/var/lib/hwtest-files/usbdatatest.bin",
- "/var/lib/hwtest-files/usbdatatest.bin.md5",
- "/mnt/station_ramdisk/pendrive")
- except sh.ErrorReturnCode as e:
- try:
- sh.umount("/mnt/station_ramdisk/pendrive")
- except sh.ErrorReturnCode:
- pass
- sh.rmdir("/mnt/station_ramdisk/pendrive")
- self.fail("failed: Unable to copy files to the USB memory device.")
- # check if the device is still mounted
- if not os.path.ismount("/mnt/station_ramdisk/pendrive"):
- sh.rm("/mnt/station_ramdisk/pendrive/*")
- sh.rmdir("/mnt/station_ramdisk/pendrive")
- self.fail("failed: USB device unmounted during/after copying files.")
- # check MD5
- try:
- p = sh.md5sum("/mnt/station_ramdisk/pendrive/usbdatatest.bin")
- except sh.ErrorReturnCode as e:
- try:
- sh.umount("/mnt/station_ramdisk/pendrive")
- except sh.ErrorReturnCode:
- pass
- sh.rmdir("/mnt/station_ramdisk/pendrive")
- self.fail("failed: Unable to calculate MD5 of the copied file.")
- newmd5 = p.stdout.decode().split(" ")[0]
- with open('/mnt/station_ramdisk/pendrive/usbdatatest.bin.md5', 'r') as outfile:
- oldmd5 = outfile.read().rstrip("\n")
- outfile.close()
- if newmd5 != oldmd5:
- try:
- sh.umount("/mnt/station_ramdisk/pendrive")
- except sh.ErrorReturnCode:
- pass
- sh.rmdir("/mnt/station_ramdisk/pendrive")
- self.fail("failed: MD5 check failed.")
- # delete copied files
- sh.rm("-f", "/mnt/station_ramdisk/pendrive/usbdatatest.bin", "/mnt/station_ramdisk/pendrive/usbdatatest.bin.md5")
- # Finish
+ os.remove(fname)
+ except OSError as error:
+ return False, str(error)
+ return True, ''
+
+ def checkfiles(self):
+ if not os.path.isfile('{}/{}'.format(self.__filepath_from, self.__file_source)):
+ return False, 'File test binary Not found {}/{}'.format(self.__filepath_from, self.__file_source)
+ if not os.path.isfile('{}/{}'.format(self.__filepath_from, self.__file_md5)):
+ return False, 'File test binary md5 Not found {}/{}'.format(self.__filepath_from, self.__file_md5)
+ if self.__check_mounted != '':
+ if not os.path.ismount('{}'.format(self.__check_mounted)):
+ return False, 'Required mounted failed: {}'.format(self.__path_to)
+ r, s = self.removefileIfExist('{}/{}'.format(self.__path_to, self.__file_source))
+ if not r:
+ return r, s
+ return True, ''
+
+ def getTestFile_md5(self, fname):
+ t = ''
+ with open('{}'.format(fname), 'r') as outfile:
+ t = outfile.read().rstrip("\n")
+ outfile.close()
+ return t
+
+ def ExecuteTranfer(self):
try:
- sh.umount("/mnt/station_ramdisk/pendrive")
- except sh.ErrorReturnCode:
- pass
- sh.rmdir("/mnt/station_ramdisk/pendrive")
+ originalMD5 = self.getTestFile_md5('{}/{}'.format(self.__filepath_from, self.__file_md5))
+ if originalMD5 == '':
+ self.fail('MD5 file {}/{} Not contain any md5'.format(self.__filepath_from, self.__file_md5))
+ # shutil.copy2('{}/{}'.format(self.__filepath_from, self.__file_source), '{}'.format(self.__path_to))
+ sh.cp('{}/{}'.format(self.__filepath_from, self.__file_source), '{}'.format(self.__path_to))
+ sh.sync()
+ md5val = md5_file('{}/{}'.format(self.__path_to, self.__file_source))
+ self.removefileIfExist('{}/{}'.format(self.__path_to, self.__file_source))
+ if originalMD5 != md5val:
+ return False, 'md5 check failed {}<>{}'.format(originalMD5, md5val)
+ except OSError as why:
+ return False,'Error: {} tranfer failed'.format(why.errno)
+ except Exception as details:
+ return False, 'USB Exception: {} tranfer failed'.format(details)
+ return True, ''
+
+ def execute(self):
+ v, m = self.checkfiles()
+ if not v:
+ self.fail(m)
+ countLoops = int(self.__loops)
+ while countLoops > 0:
+ res, msg = self.ExecuteTranfer()
+ if not res:
+ res, msg = self.ExecuteTranfer()
+ if not res:
+ self.fail(msg)
+ countLoops = countLoops - 1
def getresults(self):
return self.__resultlist
diff --git a/test-cli/test/tests/qusbdual.py b/test-cli/test/tests/qusbdual.py
deleted file mode 100644
index 05b22c3..0000000
--- a/test-cli/test/tests/qusbdual.py
+++ /dev/null
@@ -1,90 +0,0 @@
-import sh
-import unittest
-import os.path
-import time
-
-
-class Qusbdual(unittest.TestCase):
- params = None
- __resultlist = None # resultlist is a python list of python dictionaries
-
- def __init__(self, testname, testfunc, varlist):
- self.params = varlist
- super(Qusbdual, self).__init__(testfunc)
- self._testMethodDoc = testname
- if "repetitions" in varlist:
- self.__repetitions = varlist["repetitions"]
- else:
- raise Exception('repetitions param inside Qusbdual must be defined')
- self.__resultlist = []
-
- def execute(self):
- # check if file-as-filesystem exists
- if not os.path.isfile('/var/lib/hwtest-files/mass-otg-test.img'):
- self.fail("failed: Unable to find file-as-filesystem image.")
- # generate mass storage gadget
- p = sh.modprobe("g_mass_storage", "file=/var/lib/hwtest-files/mass-otg-test.img")
- if p.exit_code != 0:
- self.fail("failed: Unable to create a mass storage gadget.")
- # wait to detect the new device
- time.sleep(3)
- # find the mass storage device
- try:
- p = sh.grep(sh.lsblk("-So", "NAME,VENDOR"), "Linux")
- except sh.ErrorReturnCode:
- sh.modprobe("-r", "g_mass_storage")
- self.fail("failed: could not find any mass storage gadget")
- device = p.stdout.decode().split(" ")[0]
- # mount the mass storage gadget
- sh.mkdir("-p", "/mnt/station_ramdisk/hdd_gadget")
- p = sh.mount("-o", "ro", "/dev/" + device, "/mnt/station_ramdisk/hdd_gadget")
- if p.exit_code != 0:
- sh.modprobe("-r", "g_mass_storage")
- self.fail("failed: Unable to mount the mass storage gadget.")
- # execute test
- for i in range(int(self.__repetitions)):
- # copy files
- try:
- p = sh.cp("/mnt/station_ramdisk/hdd_gadget/usb-test.bin",
- "/mnt/station_ramdisk/hdd_gadget/usb-test.bin.md5",
- "/mnt/station_nfsdisk/")
- except sh.ErrorReturnCode as e:
- sh.umount("/mnt/station_ramdisk/hdd_gadget")
- sh.rmdir("/mnt/station_ramdisk/hdd_gadget")
- sh.modprobe("-r", "g_mass_storage")
- self.fail("failed: Unable to copy files through USB.")
- # check if the device is still mounted
- if not os.path.ismount("/mnt/station_ramdisk/hdd_gadget"):
- sh.rm("/mnt/station_ramdisk/hdd_gadget/*")
- sh.rmdir("/mnt/station_ramdisk/hdd_gadget")
- sh.modprobe("-r", "g_mass_storage")
- self.fail("failed: USB device unmounted during/after copying files.")
- # Check md5
- try:
- p = sh.md5sum("/mnt/station_nfsdisk/usb-test.bin")
- except sh.ErrorReturnCode as e:
- sh.umount("/mnt/station_ramdisk/hdd_gadget")
- sh.rmdir("/mnt/station_ramdisk/hdd_gadget")
- sh.modprobe("-r", "g_mass_storage")
- self.fail("failed: Unable to calculate MD5 of the copied file.")
- newmd5 = p.stdout.decode().split(" ")[0]
- with open('/mnt/station_ramdisk/hdd_gadget/usb-test.bin.md5', 'r') as outfile:
- oldmd5 = outfile.read().rstrip("\n")
- outfile.close()
- if newmd5 != oldmd5:
- sh.umount("/mnt/station_ramdisk/hdd_gadget")
- sh.rmdir("/mnt/station_ramdisk/hdd_gadget")
- sh.modprobe("-r", "g_mass_storage")
- self.fail("failed: MD5 check failed.")
- # delete copied files
- sh.rm("-f", "/mnt/station_nfsdisk/usb-test.bin", "/mnt/station_nfsdisk/usb-test.bin.md5")
- # Finish
- sh.umount("/mnt/station_ramdisk/hdd_gadget")
- sh.rmdir("/mnt/station_ramdisk/hdd_gadget")
- sh.modprobe("-r", "g_mass_storage")
-
- def getresults(self):
- return self.__resultlist
-
- def gettextresult(self):
- return ""
diff --git a/test-cli/test/tests/qwifi.py b/test-cli/test/tests/qwifi.py
index 3dd9e38..4695041 100644
--- a/test-cli/test/tests/qwifi.py
+++ b/test-cli/test/tests/qwifi.py
@@ -1,9 +1,11 @@
import unittest
-import sh
-import re
-import json
import time
-
+import uuid
+import iperf3
+import netifaces
+from test.helpers.iw import iwScan
+from test.helpers.utils import save_json_to_disk
+from test.helpers.utils import station2Port
class Qwifi(unittest.TestCase):
__serverip = None
@@ -19,81 +21,76 @@ class Qwifi(unittest.TestCase):
def __init__(self, testname, testfunc, varlist):
self.params = varlist
super(Qwifi, self).__init__(testfunc)
- if "serverip" in varlist:
- self.__serverip = varlist["serverip"]
- else:
- raise Exception('serverip param inside Qwifi have been be defined')
- if "bwexpected" in varlist:
- self.__bwexpected = varlist["bwexpected"]
- else:
- raise Exception('OKBW param inside Qwifi must be defined')
- if "port" in varlist:
- self.__port = varlist["port"]
- else:
- raise Exception('port param inside Qwifi must be defined')
- self.__numbytestx = "10M"
self._testMethodDoc = testname
+ self.__xmlObj = varlist["xml"]
+ self.__QwifiName = varlist.get('name', 'qwifi')
+ self.__loops = varlist.get('loops', self.__xmlObj.getKeyVal(self.__QwifiName, "loops", "1"))
+ self.__port = varlist.get('port', self.__xmlObj.getKeyVal(self.__QwifiName, "port", "5000"))
+ self.__bw = varlist.get('bwexpected', self.__xmlObj.getKeyVal(self.__QwifiName, "bwexpected", "5.0"))
+ self.__serverip = varlist.get('serverip', self.__xmlObj.getKeyVal(self.__QwifiName, "serverip", "localhost"))
+ self.__duration = varlist.get('duration', self.__xmlObj.getKeyVal(self.__QwifiName, "duration", "10"))
+ self.__interface = varlist.get('interface', self.__xmlObj.getKeyVal(self.__QwifiName, "interface", "wlan0"))
+ self.__toPath = varlist.get('to', self.__xmlObj.getKeyVal(self.__QwifiName, "to", "/mnt/station_ramdisk"))
+ self.__retriesCount = varlist.get('retries', self.__xmlObj.getKeyVal(self.__QwifiName, "retries", "5"))
+ self.__waitRetryTime = varlist.get('wait_retry', self.__xmlObj.getKeyVal(self.__QwifiName, "wait_retry", "10"))
+ self.__wifi_res_file = varlist.get('wifi_res_file', self.__xmlObj.getKeyVal(self.__QwifiName, "wifi_res_file", "wifi_test_{}.json"))
+ self.__wifi_st_file = varlist.get('wifi_st_file', self.__xmlObj.getKeyVal(self.__QwifiName, "wifi_st_file", "wifi_st_{}.json"))
+ self.__file_uuid = uuid.uuid4()
+ self.__wifi_res_file = self.__wifi_res_file.format(self.__file_uuid)
+ self.__wifi_st_file = self.__wifi_st_file.format(self.__file_uuid)
self.__resultlist = []
- def execute(self):
- # check if the board is connected to the router by wifi
- p = sh.iw("wlan0", "link")
- if p.exit_code == 0:
- # get the first line of the output stream
- out1 = p.stdout.decode('ascii').splitlines()[0]
- if out1 != "Not connected.":
- # check if the board has ip in the wlan0 interface
- p = sh.ifconfig("wlan0")
- if p.exit_code == 0:
- # check if wlan0 has an IP
- result = re.search(
- 'inet addr:(?!127\.0{1,3}\.0{1,3}\.0{0,2}1$)((25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)',
- p.stdout.decode('ascii'))
- if result:
- # execute iperf command against the server, but it implements attempts in case the server is busy
- iperfdone = False
- while not iperfdone:
- try:
- p = sh.iperf3("-c", self.__serverip, "-n", self.__numbytestx, "-f", "m", "-p", self.__port,
- "-J", _timeout=20)
- iperfdone = True
- except sh.TimeoutException:
- self.fail("failed: iperf timeout reached")
- except sh.ErrorReturnCode:
- time.sleep(self.timebetweenattempts)
-
- # check if it was executed succesfully
- if p.exit_code == 0:
- if p.stdout == "":
- self.fail("failed: error executing iperf command")
- # analyze output string
- data = json.loads(p.stdout.decode('ascii'))
- self.__bwreal = float(data['end']['sum_received']['bits_per_second']) / 1024 / 1024
- # save result file
- with open('/mnt/station_ramdisk/wifi-iperf3.json', 'w') as outfile:
- json.dump(data, outfile, indent=4)
- outfile.close()
- self.__resultlist.append(
- {
- "description": "iperf3 output",
- "filepath": "/mnt/station_ramdisk/wifi-iperf3.json",
- "mimetype": "application/json"
- }
- )
+ def checkInterface(self, reqInterface):
+ ifaces = netifaces.interfaces()
+ for t in ifaces:
+ if t == reqInterface:
+ return True
+ return False
- # check if BW is in the expected range
- if self.__bwreal < float(self.__bwexpected):
- self.fail("failed: speed is lower than expected. Speed(Mbits/s): " + str(self.__bwreal))
- else:
- self.fail("failed: could not complete iperf3 command")
- else:
- self.fail("failed: wlan0 interface doesn't have any ip address.")
+ def execute(self):
+ self.__resultlist = []
+ stationList = {}
+ # check interfaces
+ if not self.checkInterface(self.__interface):
+ self.fail('Interface not found: {}'.format(self.__interface))
+ # scan networks
+ scanner = iwScan(self.__interface)
+ if scanner.scan():
+ stationList = scanner.getStations()
+ # check if we are connected
+ if not scanner.findConnected():
+ self.fail('Not connected to test AP')
+ else:
+ strError = scanner.getLastError()
+ self.fail('qWifi scanner Execution error: {}'.format(strError))
+ # Start Iperf
+ client = iperf3.Client()
+ client.duration = int(self.__duration)
+ client.server_hostname = self.__serverip
+ client.port = station2Port(self.__port)
+ count = int(self.__retriesCount)
+ while count > 0:
+ result = client.run()
+ if result.error == None:
+ if result.received_Mbps >= float(self.__bw) and result.sent_Mbps >= float(self.__bw):
+ save_json_to_disk(filePath='{}/{}'.format(self.__toPath, self.__wifi_st_file),
+ description='wifi scan',
+ mime='application/json',
+ json_data=stationList,
+ result=self.__resultlist)
+ save_json_to_disk(filePath='{}/{}'.format(self.__toPath, self.__wifi_res_file),
+ description='wifi {} iperf3'.format(self.__interface),
+ mime='application/json',
+ json_data=result.json,
+ result=self.__resultlist)
+ return
else:
- self.fail("failed: could not complete ifconfig command.")
+ self.fail('bw fail: rx:{} tx:{}'.format(result.received_Mbps, result.sent_Mbps))
else:
- self.fail("failed: wifi module is not connected to the router.")
- else:
- self.fail("failed: could not execute iw command")
+ count = count - 1
+ time.sleep(int(self.__waitRetryTime))
+
+ self.fail('qWifi (max tries) Execution error: {}'.format(result.error))
def getresults(self):
return self.__resultlist