blob: 2ac447c56e12cbd2e61e443357c56c7e9b1cf244 (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
|
from test.helpers.syscmd import SysCommand
import unittest
class Qethernet(unittest.TestCase):
    """Ethernet throughput check driven by an ``iperf`` client run.

    The test method (``execute``) runs ``iperf`` against a server address,
    parses the transfer size and bandwidth out of the second line of the
    command output, and fails unless the bandwidth is above 90% of the
    reference value given at construction time.
    """

    # Class-level defaults; the double underscore name-mangles them so they
    # stay private to this class.
    __sip = None       # server address passed to ``iperf -c``
    __raw_out = None   # raw output of the last iperf run
    __MB_req = None    # amount of data requested, used as ``-n <N>M``
    __MB_real = None   # transfer figure parsed from the report line
    __BW_real = None   # bandwidth figure parsed from the report line
    __dat_list = None  # report fields that follow the "sec" token
    __bind = None      # optional address passed to ``iperf -B``
    __OKBW = None      # reference bandwidth; 90% of it must be exceeded

    def __init__(self, testname, testfunc, sip=None, OKBW=100, bind=None):
        """Create the test case.

        Args:
            testname: Human-readable description, stored as the test's doc.
            testfunc: Name of the method unittest should run (e.g. "execute").
            sip: Address of the iperf server to connect to.
            OKBW: Reference bandwidth; the measured value must exceed 90% of it.
            bind: Optional address handed to iperf's ``-B`` option.
        """
        super(Qethernet, self).__init__(testfunc)
        if sip is not None:
            self.__sip = sip
        # The original guarded this assignment with ``sip``, which dropped
        # the bind address whenever ``sip`` was omitted.
        if bind is not None:
            self.__bind = bind
        self.__MB_req = '10'
        self.__OKBW = OKBW
        self._testMethodDoc = testname

    @staticmethod
    def _fields_after_sec(report_line):
        """Return the whitespace-separated fields following the first "sec" token.

        Returns an empty list when the line contains no "sec" token.
        """
        fields = report_line.split()
        if "sec" not in fields:
            return []
        return fields[fields.index("sec") + 1:]

    def execute(self):
        """Run iperf, parse its report and assert on the measured bandwidth.

        Fails the test when the command does not exit with 0, when it yields
        no output, when the report line cannot be parsed, or when the
        bandwidth is not above 90% of the reference value.
        """
        # Emit a blank line before the run; ``print("")`` behaves the same on
        # Python 2 and Python 3, unlike a bare ``print``.
        print("")
        if self.__bind is None:
            str_cmd = "iperf -c {} -x CMSV -n {}M".format(
                self.__sip, self.__MB_req)
        else:
            str_cmd = "iperf -c {} -x CMSV -n {}M -B {}".format(
                self.__sip, self.__MB_req, self.__bind)

        iperf_command = SysCommand("iperf", str_cmd)
        if iperf_command.execute() != 0:
            self.fail("failed: could not complete iperf command")

        self.__raw_out = iperf_command.getOutput()
        if not self.__raw_out:
            # Previously this returned -1, which unittest treats as a pass.
            self.fail("failed: iperf produced no output")

        lines = self.__raw_out.splitlines()
        if len(lines) < 2:
            self.fail("failed: unexpected iperf output: " + repr(self.__raw_out))

        # The measurement is expected on the second output line.
        report = lines[1]
        if isinstance(report, bytes):
            # Only decode raw bytes; text output is used as-is.
            report = report.decode('ascii')

        dat_list = self._fields_after_sec(report)
        if len(dat_list) < 3:
            self.fail("failed: could not parse iperf report line: " + repr(report))

        self.__MB_real = dat_list[0]
        self.__BW_real = dat_list[2]
        self.__dat_list = dat_list

        try:
            measured = float(self.__BW_real)
        except ValueError:
            self.fail("failed: could not parse iperf report line: " + repr(report))

        # NOTE(review): the figure is compared as reported, without looking
        # at the unit field that follows it -- confirm iperf always reports
        # in the unit OKBW is expressed in.
        self.assertTrue(
            measured > float(self.__OKBW) * 0.9,
            "failed: speed is lower than spected. Speed(MB/s)" + str(self.__BW_real))

    def get_Total_MB(self):
        """Return the transfer figure parsed by the last ``execute`` run."""
        return self.__MB_real

    def get_Total_BW(self):
        """Return the bandwidth figure parsed by the last ``execute`` run."""
        return self.__BW_real
|