summaryrefslogtreecommitdiff
path: root/test-cli
diff options
context:
space:
mode:
Diffstat (limited to 'test-cli')
-rw-r--r--test-cli/setup.xml2
-rw-r--r--test-cli/test/tests/qeeprom.py62
-rw-r--r--test-cli/test/tests/qethernet.py24
-rw-r--r--test-cli/test/tests/qwifi.py88
-rw-r--r--test-cli/test_main.py30
5 files changed, 132 insertions, 74 deletions
diff --git a/test-cli/setup.xml b/test-cli/setup.xml
index 3feffd0..db9e7db 100644
--- a/test-cli/setup.xml
+++ b/test-cli/setup.xml
@@ -2,7 +2,7 @@
<data>
<setup>
<test idline="1"/> <!-- Test line identify -->
- <board model="SOPA0000" variant="BW2Q-QXXX" station="1"/>
+ <board model="SOPA0000" variant="BW2Q-QXXX"/>
<db dbname="testsrv" type="PgSQLConnection" host="192.168.8.1" port="5432" user="admin" password="Idkfa2009" /> <!-- database setup -->
</setup>
</data>
diff --git a/test-cli/test/tests/qeeprom.py b/test-cli/test/tests/qeeprom.py
index 2bab248..5bb0755 100644
--- a/test-cli/test/tests/qeeprom.py
+++ b/test-cli/test/tests/qeeprom.py
@@ -1,38 +1,44 @@
-from test.helpers.syscmd import SysCommand
+import sh
import unittest
-import uuid
+
class Qeeprom(unittest.TestCase):
+ __position = None
+ __uuid = None
+ __eeprompath = None
+
+ # varlist: position, uuid, eeprompath
def __init__(self, testname, testfunc, varlist):
super(Qeeprom, self).__init__(testfunc)
self._testMethodDoc = testname
+ if "position" in varlist:
+ self.__position = varlist["position"]
+ else:
+ raise Exception('position param inside Qeeprom must be defined')
+ if "uuid" in varlist:
+ self.__uuid = varlist["uuid"]
+ else:
+ raise Exception('uuid param inside Qeeprom must be defined')
+ if "eeprompath" in varlist:
+ self.__eeprompath = varlist["eeprompath"]
+ else:
+ raise Exception('eeprompath param inside Qeeprom must be defined')
def execute(self):
- str_cmd = "find /sys/ -iname 'eeprom'"
- eeprom_location = SysCommand("eeprom_location", str_cmd)
- if eeprom_location.execute() == 0:
- self.__raw_out = eeprom_location.getOutput()
- if self.__raw_out == "":
- self.fail("Unable to get EEPROM location. IS EEPROM CONNECTED?")
- return -1
- eeprom=self.__raw_out.decode('ascii')
- test_uuid = uuid.uuid4()
- str_cmd="echo '{}' > {}".format(str(test_uuid), eeprom)
- eeprom_write = SysCommand("eeprom_write", str_cmd)
- if eeprom_write.execute() == 0:
- self.__raw_out = eeprom_write.getOutput()
- if self.__raw_out == "":
- self.fail("Unable to write on the EEPROM?")
- return -1
- str_cmd = "head -2 {}".format(eeprom)
- eeprom_read = SysCommand("eeprom_read", str_cmd)
- if eeprom_read.execute() == 0:
- self.__raw_out = eeprom_read.getOutput()
- if self.__raw_out == "":
- self.fail("Unable to read from the EEPROM?")
- return -1
- if(str(self.__raw_out).find(str(test_uuid)) == -1):
- self.fail("failed: READ/WRITE mismatch")
+ # write data into the eeprom
+ p = sh.dd(sh.echo(self._uuid), "of=" + self.__eeprompath, "bs=1", "seek=" + self.__position)
+ if p.exit_code == 0:
+ # read data from the eeprom
+ p = sh.dd("if=" + self.__eeprompath, "bs=1", "skip=" + self.__position, "count=" + len(self.__uuid))
+ if p.exit_code == 0:
+ uuid_rx = p.stdout.decode('ascii')
+ # compare both values
+ if (uuid_rx != self.__uuid):
+ self.fail("failed: mismatch between written and received values.")
+
+ else:
+ self.fail("failed: Unable to read from the EEPROM device.")
else:
- self.fail("failed: could not complete find eeprom command")
+ self.fail("failed: Unable to write on the EEPROM device.")
+
diff --git a/test-cli/test/tests/qethernet.py b/test-cli/test/tests/qethernet.py
index be984f5..22d796c 100644
--- a/test-cli/test/tests/qethernet.py
+++ b/test-cli/test/tests/qethernet.py
@@ -1,12 +1,14 @@
import unittest
import sh
-import ex
+import re
+
class Qethernet(unittest.TestCase):
__sip = None
__numbytestx = None
__bind = None
__OKBW = None
+ __port = None
def __init__(self, testname, testfunc, varlist):
super(Qethernet, self).__init__(testfunc)
@@ -22,16 +24,21 @@ class Qethernet(unittest.TestCase):
self.__OKBW = varlist["OKBW"]
else:
raise Exception('OKBW param inside Qethernet must be defined')
+ if "port" in varlist:
+ self.__port = varlist["port"]
+ else:
+ raise Exception('port param inside Qethernet must be defined')
self.__numbytestx = "10M"
self._testMethodDoc = testname
def execute(self):
# execute iperf command against the server
if self.__bind is None:
- p = sh.iperf("-c", self.__sip, "-x", "CMSV", "-n", self.__numbytestx, "-f", "m")
+ p = sh.iperf("-c", self.__sip, "-x", "CMSV", "-n", self.__numbytestx, "-f", "m", "-p", self.__port)
else:
- p = sh.iperf("-c", self.__sip, "-x", "CMSV", "-n", self.__numbytestx, "-f", "m", "-B", self.__bind)
- #check if it was executed succesfully
+ p = sh.iperf("-c", self.__sip, "-x", "CMSV", "-n", self.__numbytestx, "-f", "m", "-p", self.__port, "-B",
+ self.__bind)
+ # check if it was executed succesfully
if p.exit_code == 0:
if p.stdout == "":
self.fail("failed: error executing iperf command")
@@ -41,11 +48,12 @@ class Qethernet(unittest.TestCase):
# get first line
dat = lines[1].decode('ascii')
# search for the BW value
- a = re.search("\d+(\.\d)? Mbits/sec",dat)
- b = a.group().split( )
- BWreal = b[0]
+ a = re.search("\d+(\.\d)? Mbits/sec", dat)
+ b = a.group().split()
+ bwreal = b[0]
# check if BW is in the expected range
- self.failUnless(float(BWreal)>float(self.__OKBW)*0.9,"failed: speed is lower than spected. Speed(MB/s)" + str(BWreal))
+ self.failUnless(float(bwreal) > float(self.__OKBW) * 0.9,
+ "failed: speed is lower than spected. Speed(MB/s)" + str(bwreal))
else:
self.fail("failed: could not complete iperf command")
diff --git a/test-cli/test/tests/qwifi.py b/test-cli/test/tests/qwifi.py
index 188e300..154dd52 100644
--- a/test-cli/test/tests/qwifi.py
+++ b/test-cli/test/tests/qwifi.py
@@ -1,33 +1,81 @@
-from test.helpers.syscmd import SysCommand
import unittest
+import sh
+import re
+
class Qwifi(unittest.TestCase):
+ __sip = None
+ __numbytestx = None
+ __bind = None
+ __OKBW = None
+ __port = None
+ #varlist: sip, bind, OKBW, port
def __init__(self, testname, testfunc, varlist):
super(Qwifi, self).__init__(testfunc)
- if "signal" in varlist:
- self.__signal = varlist["signal"]
+ if "sip" in varlist:
+ self.__sip = varlist["sip"]
+ else:
+ raise Exception('sip param inside Qwifi have been be defined')
+ if "OKBW" in varlist:
+ self.__OKBW = varlist["OKBW"]
+ else:
+ raise Exception('OKBW param inside Qwifi must be defined')
+ if "port" in varlist:
+ self.__port = varlist["port"]
else:
- raise Exception('signal param inside Qwifi must be defined')
+ raise Exception('port param inside Qwifi must be defined')
+ if "bind" in varlist:
+ self.__bind = varlist["bind"]
+ else:
+ self.__bind = None
+ self.__numbytestx = "10M"
self._testMethodDoc = testname
def execute(self):
- str_cmd= "iw wlan0 link"
- wifi_stats = SysCommand("wifi_stats", str_cmd)
- if wifi_stats.execute() == 0:
- w_stats = wifi_stats.getOutput().decode('ascii').splitlines()
- #self._longMessage = str(self.__raw_out).replace("'", "")
- if not w_stats[0] == "Not connected.":
- signal_st = w_stats[5].split(" ")[1]
- try:
- int(signal_st)
- if -1*int(signal_st) > int(self.__signal):
- self.fail("failed: signal to server lower than expected")
- except ValueError:
- self.fail("failed: error output (Bad connection?)")
+ # check if the board is connected to the router by wifi
+ p = sh.iw("wlan0", "link")
+ if p.exit_code == 0:
+ # get the first line of the output stream
+ out1 = p.stdout.decode('ascii').splitlines()[0]
+ if out1 != "Not connected.":
+ #check if the board has ip in the wlan0 interface
+ p = sh.ifconfig("wlan0")
+ if p.exit_code == 0:
+ result = re.search("inet addr:(?!127\.0{1,3}\.0{1,3}\.0{0,2}1$)((25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)",
+ p.stdout)
+ if result:
+ # execute iperf command against the server
+ if self.__bind is None:
+ p = sh.iperf("-c", self.__sip, "-x", "CMSV", "-n", self.__numbytestx, "-f", "m", "-p",
+ self.__port)
+ else:
+ p = sh.iperf("-c", self.__sip, "-x", "CMSV", "-n", self.__numbytestx, "-f", "m", "-p",
+ self.__port, "-B", self.__bind)
+ # check if it was executed succesfully
+ if p.exit_code == 0:
+ if p.stdout == "":
+ self.fail("failed: error executing iperf command")
+ # analyze output string
+ # split by lines
+ lines = p.stdout.splitlines()
+ # get first line
+ dat = lines[1].decode('ascii')
+ # search for the BW value
+ a = re.search("\d+(\.\d)? Mbits/sec", dat)
+ b = a.group().split()
+ bwreal = b[0]
+
+ # check if BW is in the expected range
+ self.failUnless(float(bwreal) > float(self.__OKBW) * 0.9,
+ "failed: speed is lower than spected. Speed(MB/s)" + str(bwreal))
+ else:
+ self.fail("failed: could not complete iperf command")
+ else:
+ self.fail("failed: wlan0 interface doesn't have any ip address.")
+ else:
+ self.fail("failed: could not complete ifconfig command.")
else:
- self.fail("failed: error output (Bad connection?)")
- #tx_brate = float(w_stats[6].split(" ")[2])
+ self.fail("failed: wifi module is not connected to the router.")
else:
self.fail("failed: couldn't execute iw command")
-
diff --git a/test-cli/test_main.py b/test-cli/test_main.py
index 6680082..b657f00 100644
--- a/test-cli/test_main.py
+++ b/test-cli/test_main.py
@@ -32,15 +32,15 @@ import re
from test.helpers.iseelogger import ISEE_Logger
import logging
-
psdbObj = TestSrv_Database()
xmlObj = None
loggerObj = None
+
# define clear function
def clear():
# check and make call for specific operating system
- _ = call('clear' if os.name =='posix' else 'cls')
+ _ = call('clear' if os.name == 'posix' else 'cls')
def create_board():
@@ -48,25 +48,21 @@ def create_board():
variant = xmlObj.gettagKey('board', 'variant')
# get model id
- globalVar.g_mid=model_id + "-" + variant
+ globalVar.g_mid = model_id + "-" + variant
- # get station number
- hstname = socket.gethostname()
- if (re.search("^Station\d{2}$", hstname)):
- globalVar.station = int(re.search("\d{2}", hstname).group(0))
- else:
- raise Exception('Station number not configured')
- exit(3)
+ # get station
+ globalVar.station = socket.gethostname()
processor_id = genDieid(globalVar.g_mid)
print(globalVar.g_mid)
print(processor_id)
- globalVar.g_uuid = psdbObj.create_board(processor_id, model_id, variant, bmac = None)
+ globalVar.g_uuid = psdbObj.create_board(processor_id, model_id, variant, bmac=None)
+
def createvarlist(testvars):
varlist = {}
for row in testvars:
- varname,testparam = row
+ varname, testparam = row
varlist[varname] = testparam
return varlist
@@ -75,13 +71,13 @@ def testsuite():
suite = unittest.TestSuite()
tests = psdbObj.getboard_comp_test_list(globalVar.g_uuid)
for row in tests:
- testdefname,testfunc = row
+ testdefname, testfunc = row
testvars = psdbObj.getboard_test_variables(globalVar.g_uuid, testdefname)
varlist = createvarlist(testvars)
command = "suite.addTest({}('{}','execute', varlist))".format(testfunc, testdefname)
exec(command)
-
- globalVar.testid_ctl=psdbObj.open_testbatch(globalVar.g_uuid, globalVar.station)
+
+ globalVar.testid_ctl = psdbObj.open_testbatch(globalVar.g_uuid, globalVar.station)
return suite
@@ -90,6 +86,7 @@ def finish_test():
# Update Set Test status with FINISH so that status column is not modified because close_testbatch already did it.
psdbObj.update_set_test_row(globalVar.station, globalVar.testid_ctl, globalVar.g_uuid, 'END', 'FINISH')
+
def main():
create_board()
try:
@@ -120,6 +117,5 @@ if __name__ == "__main__":
if psdbObj.open(xmlObj):
main()
else:
- print("Error: Cannot open DB connection")
+ print("Error: Cannot open DB connection")
exit(2)
-