From 9ac8a326412b04e4873b883c2f2a056ca0b22480 Mon Sep 17 00:00:00 2001 From: Hector Fernandez Date: Thu, 25 Jun 2020 14:42:42 +0200 Subject: Modified USB test and the way it finds USB memory storages. --- test-cli/setup.xml | 8 ++ test-cli/test/helpers/setup_xml.py | 48 +++++-- test-cli/test/helpers/syscmd.py | 88 ++++++++---- test-cli/test/helpers/usb.py | 270 +++++++++++++++++++++++++++++++++++++ test-cli/test/tests/qusb.py | 26 ++-- test-cli/test_main.py | 2 +- 6 files changed, 387 insertions(+), 55 deletions(-) create mode 100644 test-cli/test/helpers/usb.py (limited to 'test-cli') diff --git a/test-cli/setup.xml b/test-cli/setup.xml index 42c4be9..5220887 100644 --- a/test-cli/setup.xml +++ b/test-cli/setup.xml @@ -3,5 +3,13 @@ + + + + + + + + diff --git a/test-cli/test/helpers/setup_xml.py b/test-cli/test/helpers/setup_xml.py index eb8d73c..603a2ac 100644 --- a/test-cli/test/helpers/setup_xml.py +++ b/test-cli/test/helpers/setup_xml.py @@ -1,22 +1,23 @@ import xml.etree.ElementTree as XMLParser + class XMLSetup (object): - """aaaaa""" + """XML Setup Parser""" __tree = None # Parser __dbType = None # database connection required: PgSQLConnection __dbConnectionRaw = None # Connection string in raw __dbConnectionStr = None # Connection string to use in sql object connection def __init__(self, filename): - """aaaaa""" + """Parse the file in the constructor""" self.__tree = XMLParser.parse(filename) def __del__(self): - """aaaaa""" + """Destructor do nothing""" pass - def getdbConnectionStr (self): - """aaaaa""" + def getdbConnectionStr(self): + """XML to database connection string""" if self.__dbConnectionRaw is not None: return self.__dbConnectionRaw @@ -29,19 +30,44 @@ class XMLSetup (object): return None - def getPostgresConnectionStr (self): - """aaaaa""" + def getPostgresConnectionStr(self): + """Get Connection string """ str = self.__dbConnectionRaw del str['type'] return str - def getMysqlConnectionStr (self): - """aaaaa""" - pass + def getBoard(self, key, default): + for element in self.__tree.iter('board'): + if key in element.attrib: + return element.attrib[key] + return default + + def getTempPath(self, key, default): + for element in self.__tree.iter('tmpPath'): + if key in element.attrib: + return element.attrib[key] + return default + + def getKeyVal(self, tag, key, default): + for element in self.__tree.iter(tag): + if key in element.attrib: + return element.attrib[key] + return default def gettagKey (self, xmltag, xmlkey): """aaaaa""" for element in self.__tree.iter(xmltag): return element.attrib[xmlkey] - return None \ No newline at end of file + return None + + def getUSBlist(self): + list = {} + count = 0 + for elements in self.__tree.iter("usbdev"): + sublist = {} + for key,val in elements.items(): + sublist[key] = val + list[count] = sublist + count += 1 + return list \ No newline at end of file diff --git a/test-cli/test/helpers/syscmd.py b/test-cli/test/helpers/syscmd.py index 6114449..7223774 100644 --- a/test-cli/test/helpers/syscmd.py +++ b/test-cli/test/helpers/syscmd.py @@ -1,5 +1,6 @@ import unittest import subprocess +import threading class TestSysCommand(unittest.TestCase): @@ -9,24 +10,15 @@ class TestSysCommand(unittest.TestCase): __outdata = None __outtofile = False - #varlist: str_cmd, outtofile - def __init__(self, testname, testfunc, varlist): + def __init__(self, testname, testfunc, str_cmd, outtofile=False): """ init """ super(TestSysCommand, self).__init__(testfunc) - if "str_cmd" in varlist: - self.__str_cmd = varlist["str_cmd"] - else: - raise Exception('str_cmd param inside TestSysCommand have been be defined') + self.__str_cmd = str_cmd self.__testname = testname - if "outtofile" in varlist: - self.__outtofile = varlist["outtofile"] - if self.__outtofile is True: - self.__outfilename = '/tmp/{}.txt'.format(testname) - else: - self.__outtofile = None - self.__outfilename = None + self.__outtofile = outtofile self._testMethodDoc = testname - + if self.__outtofile is True: + self.__outfilename = '/tmp/{}.txt'.format(testname) def getName(self): return self.__testname @@ -50,7 +42,7 @@ class TestSysCommand(unittest.TestCase): else: res = -3 outdata = completed.stdout - self.longMessage=str(outdata).replace("'","") + self.longMessage = str(outdata).replace("'", "") self.assertTrue(True) except subprocess.CalledProcessError as err: self.assertTrue(False) @@ -62,11 +54,14 @@ class TestSysCommand(unittest.TestCase): def remove_file(self): pass + class SysCommand(object): __str_cmd = None __cmdname = None __outdata = None __errdata = None + __returnCode = None + __exception = None def __init__(self, cmdname, str_cmd): """ init """ @@ -74,31 +69,37 @@ class SysCommand(object): self.__cmdname = cmdname def getName(self): - return self.__testname + return self.__cmdname + + def setName(self, cmdName): + self.__cmdname = cmdName + + def getParams(self): + return self.__str_cmd + + def setParam(self, params): + self.__str_cmd = params def execute(self): - res = -1 + # try: + self.__outdata = None + self.__errdata = None + self.__exception = None try: - self.__outdata = None - self.__errdata = None completed = subprocess.run( self.__str_cmd, - check=True, + check=False, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE ) + self.__returnCode = completed.returncode self.__outdata = completed.stdout - if completed.returncode is 0: - res = 0 - if completed.stderr.decode('ascii') != "": - res = -1 - self.__errdata = completed.stderr + self.__errdata = completed.stderr.decode('ascii') except subprocess.CalledProcessError as err: - res = -2 - except Exception as t: - res = -3 - return res + self.__exception = err + self.__returnCode = -1 + return self.__returnCode def getOutput(self): return self.__outdata @@ -106,6 +107,12 @@ class SysCommand(object): def getOutErr(self): return self.__errdata + def getException(self): + return self.__exception + + def IsException(self): + return self.__exception is not None + def getOutputlines(self): return self.__outdata.splitlines() @@ -114,3 +121,26 @@ class SysCommand(object): f.write(self.__outdata) f.close() + +class AsyncSys(threading.Thread): + sysObject = None + sysres = None + + def __init__(self, threadID, name, counter): + threading.Thread.__init__(self) + self.threadID = threadID + self.name = name + self.counter = counter + self.sysObject = SysCommand(name, None) + + def setParams(self, Params): + self.sysObject.setParam(Params) + + def getRes(self): + return self.sysres + + def getData(self): + return self.sysObject.getOutput() + + def run(self): + self.sysres = self.sysObject.execute() diff --git a/test-cli/test/helpers/usb.py b/test-cli/test/helpers/usb.py new file mode 100644 index 0000000..8f8848d --- /dev/null +++ b/test-cli/test/helpers/usb.py @@ -0,0 +1,270 @@ +from test.helpers.syscmd import SysCommand +from test.helpers.amper import Amper +import scanf +import os +import json + + +class USBDevices(object): + __dev_list = {} + __serial_converter = {} + __usb_disk = {} + __usb_amper = {} + __dev_list_filtered = {} + __usb_mstorage = {} + __xml_config = None + + def __init__(self, xml): + self.__xml_config = xml + self.__getusblist() + self.__getSerialDevices() + self.__getAmper() + self.__getMassStorage() + + def print_usb(self): + print("------ usb list -------") + print(json.dumps(self.__dev_list)) + print("------ serial converter list -------") + print(self.__serial_converter) + print("------ usb disk list -------") + print(self.__usb_disk) + print("------ usb amper list -------") + print(self.__usb_amper) + print("------ usb mass storage list -------") + print(self.__usb_mstorage) + print("------ filtered list -------") + print(json.dumps(self.__dev_list_filtered)) + + def refresh(self): + self.__dev_list = {} + self.__serial_converter = {} + self.__usb_disk = {} + self.__usb_amper = {} + self.__usb_mstorage = {} + self.__dev_list_filtered = {} + self.__getusblist() + self.__getSerialDevices() + self.__getAmper() + self.__getMassStorage() + + def getMassStorage(self): + return self.__usb_mstorage + + def getAmper(self): + return self.__usb_amper + + def __getAmper(self): + for c, s_items in self.__serial_converter.items(): + for k,v in s_items.items(): + if k.find("USB") != -1 and v == "Prolific_Technology_Inc._USB-Serial_Controller": + testAmper = Amper(port=k) + testAmper.open() + res = testAmper.hello() + if res: + self.__usb_amper = {'port': k, 'ver': testAmper.getVersion()} + testAmper.close() + if res: + return + + def __getMassStorage(self): + for c, s_items in self.__usb_disk.items(): + for k,v in s_items.items(): + if k == "udev": + if v.get('ID_MODEL', None) == "File-Stor_Gadget" or v.get('ID_FS_LABEL', None) != "NODE_L031K6": + self.__usb_mstorage = {'disk': v.get('DEVNAME', ''), 'id': v.get('ID_FS_LABEL_ENC', '')} + + + def __check_ignore_usb_list(self, usb_list, dev_list): + count = 0 + ignore = "0" + for _, l in usb_list.items(): + for k, v in l.items(): + if k == "ignore": + ignore = v + else: + value = dev_list.get(k, None) + if v == value: + count += 1 + else: + count = 0 + break + if count >= len(l) -1: + break + + if count > 0: + if ignore == "1": + return True + return False + + + def __create_ignore_list(self, usb_list): + count = 1 + for _, dev in self.__dev_list.items(): + if not self.__check_ignore_usb_list(usb_list, dev): + self.__dev_list_filtered[count] = dev + count += 1 + + def __getusblist(self): + count = 0 + res = open('/sys/kernel/debug/usb/devices', 'rb').read().decode() + list = {} + for i in res.split('\n'): + if i.find('T:') != -1: + if len(list) > 0: + count += 1 + self.__dev_list[count] = list + list = self.__parse_T(i) + elif i.find('D:') != -1: + list = {**list, **self.__parse_D(i)} + elif i.find('P:') != -1: + list = {**list, **self.__parse_P(i)} + elif i.find('S:') != -1: + list = {**list, **self.__parse_S(i)} + else: + """ Ignore all rest""" + pass + + if count > 0: + count += 1 + self.__dev_list[count] = list + + self.__create_ignore_list(self.__xml_config.getUSBlist()) + + + def __parse_T(self, str): + data = {} + res = scanf.scanf("T: Bus=%2c Lev=%2c Prnt=%2c Port=%2c Cnt=%2c Dev#=%3c Spd=%s MxCh=%2c", str) + data["BUS"] = res[0] + data["LEV"] = res[1] + data["PRNT"] = res[2] + data["PORT"] = res[3] + data["CNT"] = res[4] + data["DEV"] = res[5].lstrip() + data["SPEED"] = res[6].rstrip() + data["MXCH"] = res[7].lstrip() + return data + + def __parse_D(self, str): + data = {} + str += ' ' + res = scanf.scanf("D: Ver=%5c Cls=%2c(%s ) %s ", str) + data["VER"] = res[0].lstrip() + data["CLASS"] = res[1] + return data + + def __parse_P(self, str): + data = {} + str += ' ' + res = scanf.scanf("P: Vendor=%4c ProdID=%4c %s", str) + data["VENDOR"] = res[0].lstrip() + data["PRODID"] = res[1] + return data + + def __parse_S(self, str): + data = {} + if str.find('Manufacturer') != -1: + pos = str.find('Manufacturer=') + data["MANUFACTURER"] = str[pos + len('Manufacturer='):].rstrip() + elif str.find('Product') != -1: + pos = str.find('Product=') + data["PRODUCT"] = str[pos + len('Product='):].rstrip() + elif str.find('SerialNumber') != -1: + pos = str.find('SerialNumber=') + data["SERIAL"] = str[pos + len('SerialNumber='):].rstrip() + else: + pass + return data + + def finddevicebytag(self, tag, str): + t = {} + count = 0 + for i, k in self.__dev_list.items(): + if tag in k: + if k[tag].find(str) != -1: + t[count] = k + count += 1 + return t + + def __getSerialDevices(self): + self.__serial_converter = {} + self.__usb_disk = {} + self.__usb_amper = {} + test_myPath = os.path.dirname(os.path.abspath(__file__)) + test_myPath = test_myPath + "/../" + s = SysCommand('usb', '{}/scripts/usb.sh'.format(test_myPath)) + s.execute() + data_list = s.getOutput().decode() + for i in data_list.split('\n'): + if len(i) > 0: + pos_div = i.find('-') + dev = i[0:pos_div - 1] + devName = i[pos_div + 2:] + # print("dev: {}".format(dev)) + # print("devName: {}".format(devName)) + res = scanf.scanf("/dev/sd%c", dev) + if res is not None: + dev_id = res[0][0] + # Now verify if is Disk or Partition + res = scanf.scanf("/dev/sd%c%c", dev) + if res is not None: + # this is a partition and we can ignore it + pass + else: + self.__usb_disk[dev_id] = {dev: devName, 'udev': self.__getudevquery(dev)} + else: + res = scanf.scanf("/dev/ttyUSB%c", dev) + if res is not None: + serial_id = res[0][0] + self.__serial_converter[serial_id] = {dev: devName, 'udev': self.__getudevquery(dev)} + + def __getudevquery(self, device): + list = {} + s = SysCommand('query-udev', 'udevadm info --query=all -n {}'.format(device)) + res = s.execute() + if res != 0: + return None + for i in s.getOutput().decode().split('\n'): + if i.find('P:') != -1: + list = self.__parse_udev_P(i) + elif i.find('S:') != -1: + list = {**list, **self.__parse_udev_S(i)} + elif i.find('N:') != -1: + list = {**list, **self.__parse_udev_N(i)} + elif i.find('S:') != -1: + list = {**list, **self.__parse_udev_S(i)} + elif i.find('E:') != -1: + list = {**list, **self.__parse_udev_E(i)} + return list + + def __parse_udev_P(self, str): + pos = str.find('P: ') + str = str[len('P: ') + pos:] + return {'PATH': str} + + def __parse_udev_S(self, str): + pos = str.find('P: ') + str = str[len('P: ') + pos:] + pos = str.find('disk/by-id/') + if pos != -1: + return {'DISK_BY_ID': str[len('disk/by-id/') + pos:]} + pos = str.find('disk/by-path/') + if pos != -1: + return {'DISK_BY_PATH': str[len('disk/by-path/') + pos:]} + pos = str.find('disk/by-uuid/') + if pos != -1: + return {'DISK_BY_UUID': str[len('disk/by-uuid/') + pos:]} + else: + return {} + + def __parse_udev_N(self, str): + pos = str.find('N: ') + str = str[len('N: ') + pos:] + return {'DEV_NAME': str} + + def __parse_udev_E(self, str): + pos = str.find('E: ') + str = str[len('E: ') + pos:] + pos = str.find('=') + str_key = str[:pos] + str_value = str[pos + 1:] + return {str_key: str_value} diff --git a/test-cli/test/tests/qusb.py b/test-cli/test/tests/qusb.py index d952afc..32d99ef 100644 --- a/test-cli/test/tests/qusb.py +++ b/test-cli/test/tests/qusb.py @@ -2,6 +2,7 @@ import sh import unittest import re import os +from test.helpers.usb import USBDevices from test.helpers.changedir import changedir @@ -16,24 +17,20 @@ class Qusb(unittest.TestCase): self.__resultlist = [] def execute(self): - # Execute script usb.sh - test_abspath = os.path.dirname(os.path.abspath(__file__)) - test_abspath = os.path.join(test_abspath, "..", "..") - p = sh.bash(os.path.join(test_abspath, 'test/scripts/usb.sh')) - # Search in the stdout a pattern "/dev/sd + {letter} + {number} - match = re.search("/dev/sd\w\d", p.stdout.decode('ascii')) - # get the first device which matches the pattern - if match: - device = match.group(0) + # get usb device + dev_obj = USBDevices(self.params["xml"]) + if dev_obj.getMassStorage(): + device = dev_obj.getMassStorage()['disk'] + "1" else: self.__resultlist.append( { "desc": "Test result", - "data": "FAILED: No pendrive found", + "data": "FAILED: No USB memory found", "type": "string" } ) - self.fail("failed: No pendrive found.") + self.fail("failed: No USB memory found.") + # create a new folder where the pendrive is going to be mounted sh.mkdir("-p", "/mnt/pendrive") # check if the device is mounted, and umount it @@ -48,11 +45,12 @@ class Qusb(unittest.TestCase): p = sh.mount(device, "/mnt/pendrive") if p.exit_code == 0: # copy files - p = sh.cp(os.path.join(test_abspath, "test/files/usbtest/usbdatatest.bin"), - os.path.join(test_abspath, "test/files/usbtest/usbdatatest.md5"), + test_abspath = os.path.dirname(os.path.abspath(__file__)) + "/../" + p = sh.cp(os.path.join(test_abspath, "files/usbtest/usbdatatest.bin"), + os.path.join(test_abspath, "files/usbtest/usbdatatest.md5"), "/mnt/pendrive") if p.exit_code == 0: - # check md5 + # check with changedir("/mnt/pendrive/"): p = sh.md5sum("-c", "usbdatatest.md5") q = re.search("OK", p.stdout.decode('ascii')) diff --git a/test-cli/test_main.py b/test-cli/test_main.py index 0df9bd4..63d8dad 100644 --- a/test-cli/test_main.py +++ b/test-cli/test_main.py @@ -92,6 +92,7 @@ def create_testsuite(): paramlist["testid"] = testid paramlist["boarduuid"] = globalVar.g_uuid paramlist["testidctl"] = globalVar.testid_ctl + paramlist["xml"] = xmlObj # add test to TestSuite add_test_task(suite, testdefname, paramlist) @@ -232,7 +233,6 @@ if __name__ == "__main__": # Clear the shell screen clear() test_abspath = os.path.dirname(os.path.abspath(__file__)) - print(test_abspath) # create logger loggerObj = ISEE_Logger(logging.INFO) -- cgit v1.1