From 9ac8a326412b04e4873b883c2f2a056ca0b22480 Mon Sep 17 00:00:00 2001 From: Hector Fernandez Date: Thu, 25 Jun 2020 14:42:42 +0200 Subject: Modified USB test and the way it finds USB memory storages. --- test-cli/test/helpers/usb.py | 270 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 270 insertions(+) create mode 100644 test-cli/test/helpers/usb.py (limited to 'test-cli/test/helpers/usb.py') diff --git a/test-cli/test/helpers/usb.py b/test-cli/test/helpers/usb.py new file mode 100644 index 0000000..8f8848d --- /dev/null +++ b/test-cli/test/helpers/usb.py @@ -0,0 +1,270 @@ +from test.helpers.syscmd import SysCommand +from test.helpers.amper import Amper +import scanf +import os +import json + + +class USBDevices(object): + __dev_list = {} + __serial_converter = {} + __usb_disk = {} + __usb_amper = {} + __dev_list_filtered = {} + __usb_mstorage = {} + __xml_config = None + + def __init__(self, xml): + self.__xml_config = xml + self.__getusblist() + self.__getSerialDevices() + self.__getAmper() + self.__getMassStorage() + + def print_usb(self): + print("------ usb list -------") + print(json.dumps(self.__dev_list)) + print("------ serial converter list -------") + print(self.__serial_converter) + print("------ usb disk list -------") + print(self.__usb_disk) + print("------ usb amper list -------") + print(self.__usb_amper) + print("------ usb mass storage list -------") + print(self.__usb_mstorage) + print("------ filtered list -------") + print(json.dumps(self.__dev_list_filtered)) + + def refresh(self): + self.__dev_list = {} + self.__serial_converter = {} + self.__usb_disk = {} + self.__usb_amper = {} + self.__usb_mstorage = {} + self.__dev_list_filtered = {} + self.__getusblist() + self.__getSerialDevices() + self.__getAmper() + self.__getMassStorage() + + def getMassStorage(self): + return self.__usb_mstorage + + def getAmper(self): + return self.__usb_amper + + def __getAmper(self): + for c, s_items in self.__serial_converter.items(): + for k,v in s_items.items(): + if k.find("USB") != -1 and v == "Prolific_Technology_Inc._USB-Serial_Controller": + testAmper = Amper(port=k) + testAmper.open() + res = testAmper.hello() + if res: + self.__usb_amper = {'port': k, 'ver': testAmper.getVersion()} + testAmper.close() + if res: + return + + def __getMassStorage(self): + for c, s_items in self.__usb_disk.items(): + for k,v in s_items.items(): + if k == "udev": + if v.get('ID_MODEL', None) == "File-Stor_Gadget" or v.get('ID_FS_LABEL', None) != "NODE_L031K6": + self.__usb_mstorage = {'disk': v.get('DEVNAME', ''), 'id': v.get('ID_FS_LABEL_ENC', '')} + + + def __check_ignore_usb_list(self, usb_list, dev_list): + count = 0 + ignore = "0" + for _, l in usb_list.items(): + for k, v in l.items(): + if k == "ignore": + ignore = v + else: + value = dev_list.get(k, None) + if v == value: + count += 1 + else: + count = 0 + break + if count >= len(l) -1: + break + + if count > 0: + if ignore == "1": + return True + return False + + + def __create_ignore_list(self, usb_list): + count = 1 + for _, dev in self.__dev_list.items(): + if not self.__check_ignore_usb_list(usb_list, dev): + self.__dev_list_filtered[count] = dev + count += 1 + + def __getusblist(self): + count = 0 + res = open('/sys/kernel/debug/usb/devices', 'rb').read().decode() + list = {} + for i in res.split('\n'): + if i.find('T:') != -1: + if len(list) > 0: + count += 1 + self.__dev_list[count] = list + list = self.__parse_T(i) + elif i.find('D:') != -1: + list = {**list, **self.__parse_D(i)} + elif i.find('P:') != -1: + list = {**list, **self.__parse_P(i)} + elif i.find('S:') != -1: + list = {**list, **self.__parse_S(i)} + else: + """ Ignore all rest""" + pass + + if count > 0: + count += 1 + self.__dev_list[count] = list + + self.__create_ignore_list(self.__xml_config.getUSBlist()) + + + def __parse_T(self, str): + data = {} + res = scanf.scanf("T: Bus=%2c Lev=%2c Prnt=%2c Port=%2c Cnt=%2c Dev#=%3c Spd=%s MxCh=%2c", str) + data["BUS"] = res[0] + data["LEV"] = res[1] + data["PRNT"] = res[2] + data["PORT"] = res[3] + data["CNT"] = res[4] + data["DEV"] = res[5].lstrip() + data["SPEED"] = res[6].rstrip() + data["MXCH"] = res[7].lstrip() + return data + + def __parse_D(self, str): + data = {} + str += ' ' + res = scanf.scanf("D: Ver=%5c Cls=%2c(%s ) %s ", str) + data["VER"] = res[0].lstrip() + data["CLASS"] = res[1] + return data + + def __parse_P(self, str): + data = {} + str += ' ' + res = scanf.scanf("P: Vendor=%4c ProdID=%4c %s", str) + data["VENDOR"] = res[0].lstrip() + data["PRODID"] = res[1] + return data + + def __parse_S(self, str): + data = {} + if str.find('Manufacturer') != -1: + pos = str.find('Manufacturer=') + data["MANUFACTURER"] = str[pos + len('Manufacturer='):].rstrip() + elif str.find('Product') != -1: + pos = str.find('Product=') + data["PRODUCT"] = str[pos + len('Product='):].rstrip() + elif str.find('SerialNumber') != -1: + pos = str.find('SerialNumber=') + data["SERIAL"] = str[pos + len('SerialNumber='):].rstrip() + else: + pass + return data + + def finddevicebytag(self, tag, str): + t = {} + count = 0 + for i, k in self.__dev_list.items(): + if tag in k: + if k[tag].find(str) != -1: + t[count] = k + count += 1 + return t + + def __getSerialDevices(self): + self.__serial_converter = {} + self.__usb_disk = {} + self.__usb_amper = {} + test_myPath = os.path.dirname(os.path.abspath(__file__)) + test_myPath = test_myPath + "/../" + s = SysCommand('usb', '{}/scripts/usb.sh'.format(test_myPath)) + s.execute() + data_list = s.getOutput().decode() + for i in data_list.split('\n'): + if len(i) > 0: + pos_div = i.find('-') + dev = i[0:pos_div - 1] + devName = i[pos_div + 2:] + # print("dev: {}".format(dev)) + # print("devName: {}".format(devName)) + res = scanf.scanf("/dev/sd%c", dev) + if res is not None: + dev_id = res[0][0] + # Now verify if is Disk or Partition + res = scanf.scanf("/dev/sd%c%c", dev) + if res is not None: + # this is a partition and we can ignore it + pass + else: + self.__usb_disk[dev_id] = {dev: devName, 'udev': self.__getudevquery(dev)} + else: + res = scanf.scanf("/dev/ttyUSB%c", dev) + if res is not None: + serial_id = res[0][0] + self.__serial_converter[serial_id] = {dev: devName, 'udev': self.__getudevquery(dev)} + + def __getudevquery(self, device): + list = {} + s = SysCommand('query-udev', 'udevadm info --query=all -n {}'.format(device)) + res = s.execute() + if res != 0: + return None + for i in s.getOutput().decode().split('\n'): + if i.find('P:') != -1: + list = self.__parse_udev_P(i) + elif i.find('S:') != -1: + list = {**list, **self.__parse_udev_S(i)} + elif i.find('N:') != -1: + list = {**list, **self.__parse_udev_N(i)} + elif i.find('S:') != -1: + list = {**list, **self.__parse_udev_S(i)} + elif i.find('E:') != -1: + list = {**list, **self.__parse_udev_E(i)} + return list + + def __parse_udev_P(self, str): + pos = str.find('P: ') + str = str[len('P: ') + pos:] + return {'PATH': str} + + def __parse_udev_S(self, str): + pos = str.find('P: ') + str = str[len('P: ') + pos:] + pos = str.find('disk/by-id/') + if pos != -1: + return {'DISK_BY_ID': str[len('disk/by-id/') + pos:]} + pos = str.find('disk/by-path/') + if pos != -1: + return {'DISK_BY_PATH': str[len('disk/by-path/') + pos:]} + pos = str.find('disk/by-uuid/') + if pos != -1: + return {'DISK_BY_UUID': str[len('disk/by-uuid/') + pos:]} + else: + return {} + + def __parse_udev_N(self, str): + pos = str.find('N: ') + str = str[len('N: ') + pos:] + return {'DEV_NAME': str} + + def __parse_udev_E(self, str): + pos = str.find('E: ') + str = str[len('E: ') + pos:] + pos = str.find('=') + str_key = str[:pos] + str_value = str[pos + 1:] + return {str_key: str_value} -- cgit v1.1