From d46bce422fd03cd57d1ba336361da17d6efb48db Mon Sep 17 00:00:00 2001 From: Manel Caro Date: Fri, 31 Jul 2020 02:07:37 +0200 Subject: TEST restructure --- test-cli/test/tests/qusb.py | 143 ++++++++++++++++++++++---------------------- 1 file changed, 71 insertions(+), 72 deletions(-) (limited to 'test-cli/test/tests/qusb.py') diff --git a/test-cli/test/tests/qusb.py b/test-cli/test/tests/qusb.py index 6302012..3182685 100644 --- a/test-cli/test/tests/qusb.py +++ b/test-cli/test/tests/qusb.py @@ -1,91 +1,90 @@ -import sh +import shutil import unittest -from test.helpers.usb import USBDevices import os.path +import os +import sh +from test.helpers.md5 import md5_file class Qusb(unittest.TestCase): params = None __resultlist = None # resultlist is a python list of python dictionaries + __QusbName = None def __init__(self, testname, testfunc, varlist): self.params = varlist super(Qusb, self).__init__(testfunc) self._testMethodDoc = testname - if "repetitions" in varlist: - self.__repetitions = varlist["repetitions"] - else: - raise Exception('repetitions param inside Qusb must be defined') + self.__xmlObj = varlist["xml"] + + self.__QusbName = varlist.get('name', 'qusb') + self.__loops = varlist.get('loops', self.__xmlObj.getKeyVal(self.__QusbName, "loops", "3")) + self.__filepath_from = varlist.get('path', self.__xmlObj.getKeyVal(self.__QusbName, "path", "/root/hwtest-files")) + self.__file_source = varlist.get('file', self.__xmlObj.getKeyVal(self.__QusbName, "file", "usb-test.bin")) + self.__file_md5 = varlist.get('file_md5', self.__xmlObj.getKeyVal(self.__QusbName, "file_md5", "usb-test.bin.md5")) + self.__path_to = varlist.get('pathto', self.__xmlObj.getKeyVal(self.__QusbName, "pathto", "/mnt/test/disk2")) + self.__check_mounted = varlist.get('check_mounted', self.__xmlObj.getKeyVal(self.__QusbName, "check_mounted", "")) + self.__resultlist = [] - def execute(self): - # get usb device - dev_obj = USBDevices(self.params["xml"]) - if dev_obj.getMassStorage(): - device = dev_obj.getMassStorage()['disk'] + "1" - else: - self.fail("failed: No USB memory found.") - # check if the device is mounted, and umount it - try: - p = sh.findmnt("-n", device) - if p.exit_code == 0: - sh.umount(device) - except sh.ErrorReturnCode as e: - # error = 1 means "no found" - pass - # mount the device - sh.mkdir("-p", "/mnt/station_ramdisk/pendrive") - p = sh.mount(device, "/mnt/station_ramdisk/pendrive") - if p.exit_code != 0: - self.fail("failed: Unable to mount the USB memory device.") - # execute test - for i in range(int(self.__repetitions)): - # copy files + def removefileIfExist(self, fname): + if os.path.exists(fname): try: - p = sh.cp("/var/lib/hwtest-files/usbdatatest.bin", - "/var/lib/hwtest-files/usbdatatest.bin.md5", - "/mnt/station_ramdisk/pendrive") - except sh.ErrorReturnCode as e: - try: - sh.umount("/mnt/station_ramdisk/pendrive") - except sh.ErrorReturnCode: - pass - sh.rmdir("/mnt/station_ramdisk/pendrive") - self.fail("failed: Unable to copy files to the USB memory device.") - # check if the device is still mounted - if not os.path.ismount("/mnt/station_ramdisk/pendrive"): - sh.rm("/mnt/station_ramdisk/pendrive/*") - sh.rmdir("/mnt/station_ramdisk/pendrive") - self.fail("failed: USB device unmounted during/after copying files.") - # check MD5 - try: - p = sh.md5sum("/mnt/station_ramdisk/pendrive/usbdatatest.bin") - except sh.ErrorReturnCode as e: - try: - sh.umount("/mnt/station_ramdisk/pendrive") - except sh.ErrorReturnCode: - pass - sh.rmdir("/mnt/station_ramdisk/pendrive") - self.fail("failed: Unable to calculate MD5 of the copied file.") - newmd5 = p.stdout.decode().split(" ")[0] - with open('/mnt/station_ramdisk/pendrive/usbdatatest.bin.md5', 'r') as outfile: - oldmd5 = outfile.read().rstrip("\n") - outfile.close() - if newmd5 != oldmd5: - try: - sh.umount("/mnt/station_ramdisk/pendrive") - except sh.ErrorReturnCode: - pass - sh.rmdir("/mnt/station_ramdisk/pendrive") - self.fail("failed: MD5 check failed.") - # delete copied files - sh.rm("-f", "/mnt/station_ramdisk/pendrive/usbdatatest.bin", "/mnt/station_ramdisk/pendrive/usbdatatest.bin.md5") - # Finish + os.remove(fname) + except OSError as error: + return False, str(error) + return True, '' + + def checkfiles(self): + if not os.path.isfile('{}/{}'.format(self.__filepath_from, self.__file_source)): + return False, 'File test binary Not found {}/{}'.format(self.__filepath_from, self.__file_source) + if not os.path.isfile('{}/{}'.format(self.__filepath_from, self.__file_md5)): + return False, 'File test binary md5 Not found {}/{}'.format(self.__filepath_from, self.__file_md5) + if self.__check_mounted != '': + if not os.path.ismount('{}'.format(self.__check_mounted)): + return False, 'Required mounted failed: {}'.format(self.__path_to) + r, s = self.removefileIfExist('{}/{}'.format(self.__path_to, self.__file_source)) + if not r: + return r, s + return True, '' + + def getTestFile_md5(self, fname): + t = '' + with open('{}'.format(fname), 'r') as outfile: + t = outfile.read().rstrip("\n") + outfile.close() + return t + + def ExecuteTranfer(self): try: - sh.umount("/mnt/station_ramdisk/pendrive") - except sh.ErrorReturnCode: - pass - sh.rmdir("/mnt/station_ramdisk/pendrive") + originalMD5 = self.getTestFile_md5('{}/{}'.format(self.__filepath_from, self.__file_md5)) + if originalMD5 == '': + self.fail('MD5 file {}/{} Not contain any md5'.format(self.__filepath_from, self.__file_md5)) + # shutil.copy2('{}/{}'.format(self.__filepath_from, self.__file_source), '{}'.format(self.__path_to)) + sh.cp('{}/{}'.format(self.__filepath_from, self.__file_source), '{}'.format(self.__path_to)) + sh.sync() + md5val = md5_file('{}/{}'.format(self.__path_to, self.__file_source)) + self.removefileIfExist('{}/{}'.format(self.__path_to, self.__file_source)) + if originalMD5 != md5val: + return False, 'md5 check failed {}<>{}'.format(originalMD5, md5val) + except OSError as why: + return False,'Error: {} tranfer failed'.format(why.errno) + except Exception as details: + return False, 'USB Exception: {} tranfer failed'.format(details) + return True, '' + + def execute(self): + v, m = self.checkfiles() + if not v: + self.fail(m) + countLoops = int(self.__loops) + while countLoops > 0: + res, msg = self.ExecuteTranfer() + if not res: + res, msg = self.ExecuteTranfer() + if not res: + self.fail(msg) + countLoops = countLoops - 1 def getresults(self): return self.__resultlist -- cgit v1.1