from test.helpers.syscmd import SysCommand
from test.helpers.amper import Amper
import scanf
import os
import json


class USBDevices(object):
    """Snapshot of the USB devices currently visible on the host.

    On construction (and on every ``refresh()``) the object:

    * parses ``/sys/kernel/debug/usb/devices`` into a dict of device
      descriptions and filters it through the ignore rules returned by
      ``xml.getUSBlist()``;
    * runs ``scripts/usb.sh`` and classifies the reported ``/dev/sdX`` disks
      and ``/dev/ttyUSBn`` serial converters, attaching the
      ``udevadm info`` data of each one;
    * probes the Prolific serial converters for an ``Amper`` device;
    * selects the USB mass-storage disk.
    """

    # Class-level fallbacks kept for backward compatibility only.  They are
    # never mutated: every instance rebinds its own containers in __scan(),
    # so state is not shared between instances.
    __dev_list = {}
    __serial_converter = {}
    __usb_disk = {}
    __usb_amper = {}
    __dev_list_filtered = {}
    __usb_mstorage = {}
    __xml_config = None

    # "S:" record keys of the kernel USB listing -> keys used in __dev_list.
    __S_FIELDS = {
        'Manufacturer': 'MANUFACTURER',
        'Product': 'PRODUCT',
        'SerialNumber': 'SERIAL',
    }

    def __init__(self, xml):
        """Scan the host.

        :param xml: configuration object; only its ``getUSBlist()`` method is
            used here (expected to return a dict of dicts, see
            ``__check_ignore_usb_list``).
        """
        self.__xml_config = xml
        self.__scan()

    def print_usb(self):
        """Dump every collected table to stdout (debug helper)."""
        print("------ usb list -------")
        print(json.dumps(self.__dev_list))
        print("------ serial converter list -------")
        print(self.__serial_converter)
        print("------ usb disk list -------")
        print(self.__usb_disk)
        print("------ usb amper list -------")
        print(self.__usb_amper)
        print("------ usb mass storage list -------")
        print(self.__usb_mstorage)
        print("------ filtered list -------")
        print(json.dumps(self.__dev_list_filtered))

    def refresh(self):
        """Discard everything collected so far and scan the host again."""
        self.__scan()

    def getMassStorage(self):
        """Return ``{'disk': ..., 'id': ...}`` for the selected disk, or ``{}``."""
        return self.__usb_mstorage

    def getAmper(self):
        """Return ``{'port': ..., 'ver': ...}`` for the detected Amper, or ``{}``."""
        return self.__usb_amper

    def finddevicebytag(self, tag, str):
        """Return the devices whose ``tag`` field contains the substring ``str``.

        The search runs on the unfiltered device list.  The result is a dict
        keyed 0..n-1 in listing order; devices lacking ``tag`` are skipped.
        """
        matches = [
            dev for dev in self.__dev_list.values()
            if tag in dev and str in dev[tag]
        ]
        return dict(enumerate(matches))

    # ------------------------------------------------------------------
    # Scan orchestration
    # ------------------------------------------------------------------
    def __scan(self):
        """Reset all tables to fresh per-instance dicts and run every probe."""
        self.__dev_list = {}
        self.__serial_converter = {}
        self.__usb_disk = {}
        self.__usb_amper = {}
        self.__usb_mstorage = {}
        self.__dev_list_filtered = {}
        self.__getusblist()
        self.__getSerialDevices()
        self.__getAmper()
        self.__getMassStorage()

    def __getAmper(self):
        """Probe Prolific serial converters until one answers ``hello()``.

        The first port that answers is recorded together with the version it
        reports; the port is always closed again after probing.
        """
        for s_items in self.__serial_converter.values():
            for port, name in s_items.items():
                # The 'udev' entry is skipped here: its key has no "USB".
                if "USB" not in port:
                    continue
                if name != "Prolific_Technology_Inc._USB-Serial_Controller":
                    continue
                candidate = Amper(port=port)
                candidate.open()
                try:
                    answered = candidate.hello()
                    if answered:
                        self.__usb_amper = {
                            'port': port,
                            'ver': candidate.getVersion(),
                        }
                finally:
                    # Release the port even if the probe raised.
                    candidate.close()
                if answered:
                    return

    def __getMassStorage(self):
        """Pick the mass-storage disk from the udev data of the USB disks.

        A disk qualifies when its ``ID_MODEL`` is ``File-Stor_Gadget`` or its
        ``ID_FS_LABEL`` is anything other than ``NODE_L031K6``.  When several
        disks qualify the last one wins.
        """
        # NOTE(review): because of the `or`, every disk not labelled
        # NODE_L031K6 qualifies -- confirm this is the intended rule.
        for s_items in self.__usb_disk.values():
            udev = s_items.get('udev')
            if not udev:
                # __getudevquery() returns None when udevadm fails.
                continue
            if (udev.get('ID_MODEL', None) == "File-Stor_Gadget"
                    or udev.get('ID_FS_LABEL', None) != "NODE_L031K6"):
                self.__usb_mstorage = {
                    'disk': udev.get('DEVNAME', ''),
                    'id': udev.get('ID_FS_LABEL_ENC', ''),
                }

    # ------------------------------------------------------------------
    # Ignore-list filtering
    # ------------------------------------------------------------------
    def __check_ignore_usb_list(self, usb_list, dev_list):
        """Tell whether the configuration asks to ignore a device.

        :param usb_list: dict of rule dicts.  In each rule the ``"ignore"``
            key holds the flag (``"1"`` means ignore) and every other key is
            a criterion that must equal the same field of the device.
        :param dev_list: description of one device (a ``__dev_list`` value).
        :return: ``True`` when the first rule whose criteria all match has
            ``ignore == "1"``, otherwise ``False``.

        Each rule is evaluated on its own, so neither a partial match nor an
        ``ignore`` flag read from an earlier rule can leak into a later one.
        Rules without any criterion never match.
        """
        for rule in usb_list.values():
            criteria = {k: v for k, v in rule.items() if k != "ignore"}
            if not criteria:
                continue
            if all(dev_list.get(k, None) == v for k, v in criteria.items()):
                return rule.get("ignore", "0") == "1"
        return False

    def __create_ignore_list(self, usb_list):
        """Fill ``__dev_list_filtered`` (keys 1..n) with non-ignored devices."""
        kept = 1
        for dev in self.__dev_list.values():
            if self.__check_ignore_usb_list(usb_list, dev):
                continue
            self.__dev_list_filtered[kept] = dev
            kept += 1

    # ------------------------------------------------------------------
    # /sys/kernel/debug/usb/devices parsing
    # ------------------------------------------------------------------
    def __getusblist(self):
        """Parse the kernel USB listing into ``__dev_list`` (keys 1..n).

        A ``T:`` line starts a new device; the following ``D:``, ``P:`` and
        ``S:`` lines add fields to it.  All other lines are ignored.
        """
        with open('/sys/kernel/debug/usb/devices', 'rb') as listing:
            content = listing.read().decode()

        count = 0
        device = {}
        for line in content.split('\n'):
            # Match the record type at the start of the line only, so that a
            # descriptor string containing e.g. "D:" is not misrouted.
            if line.startswith('T:'):
                if device:
                    count += 1
                    self.__dev_list[count] = device
                device = self.__parse_T(line)
            elif line.startswith('D:'):
                device = {**device, **self.__parse_D(line)}
            elif line.startswith('P:'):
                device = {**device, **self.__parse_P(line)}
            elif line.startswith('S:'):
                device = {**device, **self.__parse_S(line)}

        # Store the trailing device too, including when it is the only one.
        if device:
            count += 1
            self.__dev_list[count] = device

        self.__create_ignore_list(self.__xml_config.getUSBlist())

    def __parse_T(self, line):
        """Extract the topology fields of a ``T:`` line."""
        res = scanf.scanf(
            "T: Bus=%2c Lev=%2c Prnt=%2c Port=%2c Cnt=%2c Dev#=%3c Spd=%s MxCh=%2c",
            line)
        return {
            "BUS": res[0],
            "LEV": res[1],
            "PRNT": res[2],
            "PORT": res[3],
            "CNT": res[4],
            "DEV": res[5].lstrip(),
            "SPEED": res[6].rstrip(),
            "MXCH": res[7].lstrip(),
        }

    def __parse_D(self, line):
        """Extract the USB version and device class of a ``D:`` line."""
        # The pattern ends with a blank, hence the padding.
        res = scanf.scanf("D: Ver=%5c Cls=%2c(%s ) %s ", line + ' ')
        return {"VER": res[0].lstrip(), "CLASS": res[1]}

    def __parse_P(self, line):
        """Extract the vendor and product ids of a ``P:`` line."""
        res = scanf.scanf("P: Vendor=%4c ProdID=%4c %s", line + ' ')
        return {"VENDOR": res[0].lstrip(), "PRODID": res[1]}

    def __parse_S(self, line):
        """Extract the string descriptor carried by an ``S:`` line.

        Returns a one-entry dict (``MANUFACTURER``, ``PRODUCT`` or
        ``SERIAL``), or ``{}`` for any other descriptor.
        """
        payload = line.partition('S:')[2].strip()
        # Split on the first '=' so the name is compared exactly and a value
        # containing '=' or another field name is kept intact.
        name, _, value = payload.partition('=')
        field = self.__S_FIELDS.get(name)
        if field is None:
            return {}
        return {field: value}

    # ------------------------------------------------------------------
    # Block / serial device discovery
    # ------------------------------------------------------------------
    def __getSerialDevices(self):
        """Run ``scripts/usb.sh`` and classify the devices it reports.

        Each non-empty output line is split at its first ``-`` into a device
        node and a name.  Whole disks (``/dev/sdX``) go to ``__usb_disk`` and
        ``/dev/ttyUSBn`` nodes to ``__serial_converter``, each keyed by the
        single character following the prefix; partitions are skipped.
        """
        self.__serial_converter = {}
        self.__usb_disk = {}
        self.__usb_amper = {}

        base_dir = os.path.dirname(os.path.abspath(__file__)) + "/../"
        cmd = SysCommand('usb', '{}/scripts/usb.sh'.format(base_dir))
        cmd.execute()

        for entry in cmd.getOutput().decode().split('\n'):
            if not entry:
                continue
            sep = entry.find('-')
            dev = entry[0:sep - 1]
            dev_name = entry[sep + 2:]

            disk = scanf.scanf("/dev/sd%c", dev)
            if disk is not None:
                # A second character after the drive letter means partition.
                if scanf.scanf("/dev/sd%c%c", dev) is None:
                    self.__usb_disk[disk[0][0]] = {
                        dev: dev_name,
                        'udev': self.__getudevquery(dev),
                    }
                continue

            tty = scanf.scanf("/dev/ttyUSB%c", dev)
            if tty is not None:
                self.__serial_converter[tty[0][0]] = {
                    dev: dev_name,
                    'udev': self.__getudevquery(dev),
                }

    def __getudevquery(self, device):
        """Return the ``udevadm info --query=all`` data of ``device``.

        :return: dict with ``PATH`` (``P:`` line), ``DEV_NAME`` (``N:``
            line), ``DISK_BY_*`` (``S:`` lines) and one entry per ``E:``
            property; ``None`` when the command exits with a non-zero status.
        """
        cmd = SysCommand('query-udev',
                         'udevadm info --query=all -n {}'.format(device))
        if cmd.execute() != 0:
            return None

        info = {}
        for line in cmd.getOutput().decode().split('\n'):
            if line.startswith('P:'):
                info.update(self.__parse_udev_P(line))
            elif line.startswith('S:'):
                info.update(self.__parse_udev_S(line))
            elif line.startswith('N:'):
                info.update(self.__parse_udev_N(line))
            elif line.startswith('E:'):
                info.update(self.__parse_udev_E(line))
        return info

    def __parse_udev_P(self, line):
        """Return the device path of a ``P:`` line."""
        return {'PATH': line[len('P:'):].lstrip()}

    def __parse_udev_S(self, line):
        """Return the disk symlink of an ``S:`` line, or ``{}`` if not a disk link."""
        link = line[len('S:'):].lstrip()
        for marker, key in (('disk/by-id/', 'DISK_BY_ID'),
                            ('disk/by-path/', 'DISK_BY_PATH'),
                            ('disk/by-uuid/', 'DISK_BY_UUID')):
            pos = link.find(marker)
            if pos != -1:
                return {key: link[pos + len(marker):]}
        return {}

    def __parse_udev_N(self, line):
        """Return the kernel device node name of an ``N:`` line."""
        return {'DEV_NAME': line[len('N:'):].lstrip()}

    def __parse_udev_E(self, line):
        """Return the ``KEY=value`` property of an ``E:`` line as ``{KEY: value}``."""
        key, _, value = line[len('E:'):].lstrip().partition('=')
        return {key: value}