import unittest

from test.helpers.syscmd import SysCommand


class Qethernet(unittest.TestCase):
    """Ethernet throughput check driven by an ``iperf`` client run.

    The test builds an ``iperf -c <sip>`` command line, runs it through
    ``SysCommand`` and parses the second line of the output.  The test
    passes when the parsed bandwidth figure is greater than 90% of the
    reference value ``OKBW``.
    """

    # Class-level defaults; instances overwrite them in __init__/execute.
    __sip = None       # address passed to ``iperf -c``
    __raw_out = None   # raw output of the last iperf run
    __MB_req = None    # amount of data requested through ``-n <N>M``
    __MB_real = None   # transfer figure parsed from the report line
    __BW_real = None   # bandwidth figure parsed from the report line
    __dat_list = None  # report tokens that follow the "sec" token
    __bind = None      # optional address passed to ``iperf -B``
    __OKBW = None      # reference bandwidth the measurement is compared to

    def __init__(self, testname, testfunc, sip=None, OKBW=100, bind=None):
        """Create the test case.

        Args:
            testname: Text stored as the test method documentation.
            testfunc: Name of the method unittest should run.
            sip: Address passed to ``iperf -c``.
            OKBW: Reference bandwidth; the run must exceed 90% of it.
            bind: Optional address passed to ``iperf -B``.
        """
        super(Qethernet, self).__init__(testfunc)
        if sip is not None:
            self.__sip = sip
        # The bind address is independent of the server address; it used to
        # be dropped whenever ``sip`` was omitted.
        if bind is not None:
            self.__bind = bind
        self.__MB_req = '10'
        self.__OKBW = OKBW
        self._testMethodDoc = testname

    def execute(self):
        """Run iperf and check the measured bandwidth against ``OKBW``.

        Fails the test when the iperf command does not return 0, when it
        produces no output, when the output cannot be parsed, or when the
        measured bandwidth is not above 90% of ``OKBW``.
        """
        # Emit a blank line before the run (works on Python 2 and 3).
        print("")
        if self.__bind is None:
            str_cmd = "iperf -c {} -x CMSV -n {}M".format(
                self.__sip, self.__MB_req)
        else:
            str_cmd = "iperf -c {} -x CMSV -n {}M -B {}".format(
                self.__sip, self.__MB_req, self.__bind)

        iperf_command = SysCommand("iperf", str_cmd)
        if iperf_command.execute() != 0:
            self.fail("failed: could not complete iperf command")

        self.__raw_out = iperf_command.getOutput()
        # Covers "", b"" and None: an empty run must not count as a pass.
        if not self.__raw_out:
            self.fail("failed: iperf produced no output")

        lines = self.__raw_out.splitlines()
        if len(lines) < 2:
            self.fail("failed: unexpected iperf output (missing report line)")

        # The second line is expected to hold the transfer report.
        report = lines[1]
        if isinstance(report, bytes):
            report = report.decode('ascii')
        tokens = report.split()

        # Keep only what follows the "sec" token: transfer value, transfer
        # unit, bandwidth value, bandwidth unit.
        if "sec" not in tokens:
            self.fail("failed: unexpected iperf report line: " + report)
        dat_list = tokens[tokens.index("sec") + 1:]
        if len(dat_list) < 3:
            self.fail("failed: unexpected iperf report line: " + report)

        self.__MB_real = dat_list[0]
        self.__BW_real = dat_list[2]
        self.__dat_list = dat_list

        # NOTE(review): the message labels the value "MB/s", but the figure
        # is whatever unit iperf printed -- confirm and align with OKBW.
        self.assertTrue(
            float(self.__BW_real) > float(self.__OKBW) * 0.9,
            "failed: speed is lower than spected. Speed(MB/s)"
            + str(self.__BW_real))

    def get_Total_MB(self):
        """Return the transfer figure parsed by the last ``execute`` run."""
        return self.__MB_real

    def get_Total_BW(self):
        """Return the bandwidth figure parsed by the last ``execute`` run."""
        return self.__BW_real