summaryrefslogtreecommitdiff
path: root/test-cli/test/tests/qusb.py
diff options
context:
space:
mode:
Diffstat (limited to 'test-cli/test/tests/qusb.py')
-rw-r--r--test-cli/test/tests/qusb.py143
1 files changed, 71 insertions, 72 deletions
diff --git a/test-cli/test/tests/qusb.py b/test-cli/test/tests/qusb.py
index 6302012..3182685 100644
--- a/test-cli/test/tests/qusb.py
+++ b/test-cli/test/tests/qusb.py
@@ -1,91 +1,90 @@
-import sh
+import shutil
import unittest
-from test.helpers.usb import USBDevices
import os.path
+import os
+import sh
+from test.helpers.md5 import md5_file
class Qusb(unittest.TestCase):
params = None
__resultlist = None # resultlist is a python list of python dictionaries
+ __QusbName = None
def __init__(self, testname, testfunc, varlist):
self.params = varlist
super(Qusb, self).__init__(testfunc)
self._testMethodDoc = testname
- if "repetitions" in varlist:
- self.__repetitions = varlist["repetitions"]
- else:
- raise Exception('repetitions param inside Qusb must be defined')
+ self.__xmlObj = varlist["xml"]
+
+ self.__QusbName = varlist.get('name', 'qusb')
+ self.__loops = varlist.get('loops', self.__xmlObj.getKeyVal(self.__QusbName, "loops", "3"))
+ self.__filepath_from = varlist.get('path', self.__xmlObj.getKeyVal(self.__QusbName, "path", "/root/hwtest-files"))
+ self.__file_source = varlist.get('file', self.__xmlObj.getKeyVal(self.__QusbName, "file", "usb-test.bin"))
+ self.__file_md5 = varlist.get('file_md5', self.__xmlObj.getKeyVal(self.__QusbName, "file_md5", "usb-test.bin.md5"))
+ self.__path_to = varlist.get('pathto', self.__xmlObj.getKeyVal(self.__QusbName, "pathto", "/mnt/test/disk2"))
+ self.__check_mounted = varlist.get('check_mounted', self.__xmlObj.getKeyVal(self.__QusbName, "check_mounted", ""))
+
self.__resultlist = []
- def execute(self):
- # get usb device
- dev_obj = USBDevices(self.params["xml"])
- if dev_obj.getMassStorage():
- device = dev_obj.getMassStorage()['disk'] + "1"
- else:
- self.fail("failed: No USB memory found.")
- # check if the device is mounted, and umount it
- try:
- p = sh.findmnt("-n", device)
- if p.exit_code == 0:
- sh.umount(device)
- except sh.ErrorReturnCode as e:
- # error = 1 means "no found"
- pass
- # mount the device
- sh.mkdir("-p", "/mnt/station_ramdisk/pendrive")
- p = sh.mount(device, "/mnt/station_ramdisk/pendrive")
- if p.exit_code != 0:
- self.fail("failed: Unable to mount the USB memory device.")
- # execute test
- for i in range(int(self.__repetitions)):
- # copy files
+ def removefileIfExist(self, fname):
+ if os.path.exists(fname):
try:
- p = sh.cp("/var/lib/hwtest-files/usbdatatest.bin",
- "/var/lib/hwtest-files/usbdatatest.bin.md5",
- "/mnt/station_ramdisk/pendrive")
- except sh.ErrorReturnCode as e:
- try:
- sh.umount("/mnt/station_ramdisk/pendrive")
- except sh.ErrorReturnCode:
- pass
- sh.rmdir("/mnt/station_ramdisk/pendrive")
- self.fail("failed: Unable to copy files to the USB memory device.")
- # check if the device is still mounted
- if not os.path.ismount("/mnt/station_ramdisk/pendrive"):
- sh.rm("/mnt/station_ramdisk/pendrive/*")
- sh.rmdir("/mnt/station_ramdisk/pendrive")
- self.fail("failed: USB device unmounted during/after copying files.")
- # check MD5
- try:
- p = sh.md5sum("/mnt/station_ramdisk/pendrive/usbdatatest.bin")
- except sh.ErrorReturnCode as e:
- try:
- sh.umount("/mnt/station_ramdisk/pendrive")
- except sh.ErrorReturnCode:
- pass
- sh.rmdir("/mnt/station_ramdisk/pendrive")
- self.fail("failed: Unable to calculate MD5 of the copied file.")
- newmd5 = p.stdout.decode().split(" ")[0]
- with open('/mnt/station_ramdisk/pendrive/usbdatatest.bin.md5', 'r') as outfile:
- oldmd5 = outfile.read().rstrip("\n")
- outfile.close()
- if newmd5 != oldmd5:
- try:
- sh.umount("/mnt/station_ramdisk/pendrive")
- except sh.ErrorReturnCode:
- pass
- sh.rmdir("/mnt/station_ramdisk/pendrive")
- self.fail("failed: MD5 check failed.")
- # delete copied files
- sh.rm("-f", "/mnt/station_ramdisk/pendrive/usbdatatest.bin", "/mnt/station_ramdisk/pendrive/usbdatatest.bin.md5")
- # Finish
+ os.remove(fname)
+ except OSError as error:
+ return False, str(error)
+ return True, ''
+
+ def checkfiles(self):
+ if not os.path.isfile('{}/{}'.format(self.__filepath_from, self.__file_source)):
+ return False, 'File test binary Not found {}/{}'.format(self.__filepath_from, self.__file_source)
+ if not os.path.isfile('{}/{}'.format(self.__filepath_from, self.__file_md5)):
+ return False, 'File test binary md5 Not found {}/{}'.format(self.__filepath_from, self.__file_md5)
+ if self.__check_mounted != '':
+ if not os.path.ismount('{}'.format(self.__check_mounted)):
+ return False, 'Required mounted failed: {}'.format(self.__path_to)
+ r, s = self.removefileIfExist('{}/{}'.format(self.__path_to, self.__file_source))
+ if not r:
+ return r, s
+ return True, ''
+
+ def getTestFile_md5(self, fname):
+ t = ''
+ with open('{}'.format(fname), 'r') as outfile:
+ t = outfile.read().rstrip("\n")
+ outfile.close()
+ return t
+
+ def ExecuteTranfer(self):
try:
- sh.umount("/mnt/station_ramdisk/pendrive")
- except sh.ErrorReturnCode:
- pass
- sh.rmdir("/mnt/station_ramdisk/pendrive")
+ originalMD5 = self.getTestFile_md5('{}/{}'.format(self.__filepath_from, self.__file_md5))
+ if originalMD5 == '':
+ self.fail('MD5 file {}/{} Not contain any md5'.format(self.__filepath_from, self.__file_md5))
+ # shutil.copy2('{}/{}'.format(self.__filepath_from, self.__file_source), '{}'.format(self.__path_to))
+ sh.cp('{}/{}'.format(self.__filepath_from, self.__file_source), '{}'.format(self.__path_to))
+ sh.sync()
+ md5val = md5_file('{}/{}'.format(self.__path_to, self.__file_source))
+ self.removefileIfExist('{}/{}'.format(self.__path_to, self.__file_source))
+ if originalMD5 != md5val:
+ return False, 'md5 check failed {}<>{}'.format(originalMD5, md5val)
+ except OSError as why:
+ return False,'Error: {} tranfer failed'.format(why.errno)
+ except Exception as details:
+ return False, 'USB Exception: {} tranfer failed'.format(details)
+ return True, ''
+
+ def execute(self):
+ v, m = self.checkfiles()
+ if not v:
+ self.fail(m)
+ countLoops = int(self.__loops)
+ while countLoops > 0:
+ res, msg = self.ExecuteTranfer()
+ if not res:
+ res, msg = self.ExecuteTranfer()
+ if not res:
+ self.fail(msg)
+ countLoops = countLoops - 1
def getresults(self):
return self.__resultlist