1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
|
import re
import unittest

import ex
import sh
class Qethernet(unittest.TestCase):
    """Ethernet throughput test that runs an ``iperf`` client via ``sh``.

    The test transfers a fixed amount of data to an iperf server and fails
    unless the bandwidth reported by iperf exceeds 90% of the reference
    value supplied in ``varlist["OKBW"]``.
    """

    # Name-mangled per-test settings; filled in by __init__ from ``varlist``.
    __sip = None  # server address handed to iperf "-c"
    __numbytestx = None  # amount of data to transmit, handed to iperf "-n"
    __bind = None  # optional address handed to iperf "-B"
    __OKBW = None  # reference bandwidth the measurement is compared against

    # The measured bandwidth must exceed this fraction of OKBW to pass.
    _MIN_BW_RATIO = 0.9

    # Bandwidth field of a report produced with "-f m". Accepts any number of
    # decimals so that e.g. "9.41 Mbits/sec" is read as 9.41 and not as 41.
    _BW_PATTERN = re.compile(r"(\d+(?:\.\d+)?) Mbits/sec")

    def __init__(self, testname, testfunc, varlist):
        """Configure the test case from a dictionary of parameters.

        Args:
            testname: text stored as the test's ``_testMethodDoc``.
            testfunc: name of the method to run, forwarded to
                ``unittest.TestCase.__init__``.
            varlist: mapping of parameters. ``"sip"`` and ``"OKBW"`` are
                required; ``"bind"`` is optional.

        Raises:
            Exception: if ``"sip"`` or ``"OKBW"`` is missing from ``varlist``.
        """
        super(Qethernet, self).__init__(testfunc)
        if "sip" not in varlist:
            raise Exception('sip param inside Qethernet must be defined')
        if "OKBW" not in varlist:
            raise Exception('OKBW param inside Qethernet must be defined')
        self.__sip = varlist["sip"]
        self.__bind = varlist.get("bind")
        self.__OKBW = varlist["OKBW"]
        self.__numbytestx = "10M"
        # Set after the base initializer, which assigns the method's own
        # docstring to this attribute.
        self._testMethodDoc = testname

    @staticmethod
    def _parse_bandwidth(output):
        """Extract the bandwidth in Mbits/sec from iperf client output.

        Args:
            output: iperf standard output, as bytes or text.

        Returns:
            The value of the first "<number> Mbits/sec" field on the second
            output line as a float, or None if there is no second line or it
            carries no such field.
        """
        if isinstance(output, bytes):
            output = output.decode("ascii", "replace")
        lines = output.splitlines()
        # The report is read from the second line (index 1); presumably the
        # first line is the column header -- TODO confirm for the iperf
        # version in use.
        if len(lines) < 2:
            return None
        match = Qethernet._BW_PATTERN.search(lines[1])
        if match is None:
            return None
        return float(match.group(1))

    def execute(self):
        """Run the iperf client and check the reported bandwidth.

        Fails the test when iperf exits with a non-zero code, produces no
        output, produces output without a bandwidth figure, or reports a
        bandwidth that does not exceed 90% of the configured OKBW.
        """
        # execute iperf command against the server
        args = ["-c", self.__sip, "-x", "CMSV", "-n", self.__numbytestx,
                "-f", "m"]
        if self.__bind is not None:
            args += ["-B", self.__bind]
        p = sh.iperf(*args)

        # check if it was executed successfully
        if p.exit_code != 0:
            self.fail("failed: could not complete iperf command")
        if not p.stdout:
            self.fail("failed: error executing iperf command")

        bandwidth = self._parse_bandwidth(p.stdout)
        if bandwidth is None:
            self.fail("failed: could not find the bandwidth in iperf output")

        # check if BW is in the expected range
        self.assertTrue(
            bandwidth > float(self.__OKBW) * self._MIN_BW_RATIO,
            "failed: speed is lower than expected. Speed(Mbits/s): "
            + str(bandwidth),
        )
|