import unittest import sh import re import json class Qethernet(unittest.TestCase): __serverip = None __numbytestx = None __bwexpected = None __port = None params = None __bwreal = None # varlist content: serverip, bwexpected, port def __init__(self, testname, testfunc, varlist): # save the parameters list self.params = varlist # configure the function to be executed when the test runs. "testfunc" is a name of a method inside this # class, that in this situation, it can only be "execute". super(Qethernet, self).__init__(testfunc) # validate and get the parameters if "serverip" in varlist: self.__serverip = varlist["serverip"] else: raise Exception('sip param inside Qethernet have been be defined') if "bwexpected" in varlist: self.__bwexpected = varlist["bwexpected"] else: raise Exception('bwexpected param inside Qethernet must be defined') if "port" in varlist: self.__port = varlist["port"] else: raise Exception('port param inside Qethernet must be defined') self.__numbytestx = "10M" self._testMethodDoc = testname def execute(self): # execute iperf command against the server try: p = sh.iperf3("-c", self.__serverip, "-n", self.__numbytestx, "-f", "m", "-p", self.__port, "-J", _timeout=20) except sh.TimeoutException: self.fail("failed: iperf timeout reached") # check if it was executed succesfully if p.exit_code == 0: if p.stdout == "": self.fail("failed: error executing iperf command") # analyze output string data = json.loads(p.stdout.decode('ascii')) self.__bwreal = float(data['end']['sum_received']['bits_per_second'])/1024/1024 # save result file with open('/tmp/ethernet-iperf.json', 'w') as outfile: json.dump(data, outfile, indent=4) # check if BW is in the expected range self.failUnless(self.__bwreal > float(self.__bwexpected) * 0.9, "failed: speed is lower than spected. Speed(Mbits/s)" + str(self.__bwreal)) else: self.fail("failed: could not complete iperf command") def getresults(self): # resultlist is a python list of python dictionaries resultlist = [ { "desc": "iperf3 output", "data": "/tmp/ethernet-iperf.json", "type": "file" } ] return resultlist