summaryrefslogtreecommitdiff
path: root/test-cli/test/helpers/usb.py
diff options
context:
space:
mode:
authorManel Caro <mcaro@iatec.biz>2021-11-06 16:28:38 +0100
committerManel Caro <mcaro@iatec.biz>2021-11-06 16:28:38 +0100
commitcf19bfe18cbd283b188a858ee1629f9909c924f4 (patch)
tree1efb23519727130058401df090ab1b5f4cc8ba99 /test-cli/test/helpers/usb.py
parentb6932fbaf898724ae87c29f8965621610f377084 (diff)
parentd5b273a3b58a250742049df4ca0ef0ba54f53d33 (diff)
downloadboard-cf19bfe18cbd283b188a858ee1629f9909c924f4.zip
board-cf19bfe18cbd283b188a858ee1629f9909c924f4.tar.gz
board-cf19bfe18cbd283b188a858ee1629f9909c924f4.tar.bz2
Merge branch 'sopa-test'rel.0.1sopa-test
Diffstat (limited to 'test-cli/test/helpers/usb.py')
-rw-r--r--test-cli/test/helpers/usb.py270
1 files changed, 270 insertions, 0 deletions
diff --git a/test-cli/test/helpers/usb.py b/test-cli/test/helpers/usb.py
new file mode 100644
index 0000000..8f8848d
--- /dev/null
+++ b/test-cli/test/helpers/usb.py
@@ -0,0 +1,270 @@
+from test.helpers.syscmd import SysCommand
+from test.helpers.amper import Amper
+import scanf
+import os
+import json
+
+
+class USBDevices(object):
+ __dev_list = {}
+ __serial_converter = {}
+ __usb_disk = {}
+ __usb_amper = {}
+ __dev_list_filtered = {}
+ __usb_mstorage = {}
+ __xml_config = None
+
+ def __init__(self, xml):
+ self.__xml_config = xml
+ self.__getusblist()
+ self.__getSerialDevices()
+ self.__getAmper()
+ self.__getMassStorage()
+
+ def print_usb(self):
+ print("------ usb list -------")
+ print(json.dumps(self.__dev_list))
+ print("------ serial converter list -------")
+ print(self.__serial_converter)
+ print("------ usb disk list -------")
+ print(self.__usb_disk)
+ print("------ usb amper list -------")
+ print(self.__usb_amper)
+ print("------ usb mass storage list -------")
+ print(self.__usb_mstorage)
+ print("------ filtered list -------")
+ print(json.dumps(self.__dev_list_filtered))
+
+ def refresh(self):
+ self.__dev_list = {}
+ self.__serial_converter = {}
+ self.__usb_disk = {}
+ self.__usb_amper = {}
+ self.__usb_mstorage = {}
+ self.__dev_list_filtered = {}
+ self.__getusblist()
+ self.__getSerialDevices()
+ self.__getAmper()
+ self.__getMassStorage()
+
+ def getMassStorage(self):
+ return self.__usb_mstorage
+
+ def getAmper(self):
+ return self.__usb_amper
+
+ def __getAmper(self):
+ for c, s_items in self.__serial_converter.items():
+ for k,v in s_items.items():
+ if k.find("USB") != -1 and v == "Prolific_Technology_Inc._USB-Serial_Controller":
+ testAmper = Amper(port=k)
+ testAmper.open()
+ res = testAmper.hello()
+ if res:
+ self.__usb_amper = {'port': k, 'ver': testAmper.getVersion()}
+ testAmper.close()
+ if res:
+ return
+
+ def __getMassStorage(self):
+ for c, s_items in self.__usb_disk.items():
+ for k,v in s_items.items():
+ if k == "udev":
+ if v.get('ID_MODEL', None) == "File-Stor_Gadget" or v.get('ID_FS_LABEL', None) != "NODE_L031K6":
+ self.__usb_mstorage = {'disk': v.get('DEVNAME', ''), 'id': v.get('ID_FS_LABEL_ENC', '')}
+
+
+ def __check_ignore_usb_list(self, usb_list, dev_list):
+ count = 0
+ ignore = "0"
+ for _, l in usb_list.items():
+ for k, v in l.items():
+ if k == "ignore":
+ ignore = v
+ else:
+ value = dev_list.get(k, None)
+ if v == value:
+ count += 1
+ else:
+ count = 0
+ break
+ if count >= len(l) -1:
+ break
+
+ if count > 0:
+ if ignore == "1":
+ return True
+ return False
+
+
+ def __create_ignore_list(self, usb_list):
+ count = 1
+ for _, dev in self.__dev_list.items():
+ if not self.__check_ignore_usb_list(usb_list, dev):
+ self.__dev_list_filtered[count] = dev
+ count += 1
+
+ def __getusblist(self):
+ count = 0
+ res = open('/sys/kernel/debug/usb/devices', 'rb').read().decode()
+ list = {}
+ for i in res.split('\n'):
+ if i.find('T:') != -1:
+ if len(list) > 0:
+ count += 1
+ self.__dev_list[count] = list
+ list = self.__parse_T(i)
+ elif i.find('D:') != -1:
+ list = {**list, **self.__parse_D(i)}
+ elif i.find('P:') != -1:
+ list = {**list, **self.__parse_P(i)}
+ elif i.find('S:') != -1:
+ list = {**list, **self.__parse_S(i)}
+ else:
+ """ Ignore all rest"""
+ pass
+
+ if count > 0:
+ count += 1
+ self.__dev_list[count] = list
+
+ self.__create_ignore_list(self.__xml_config.getUSBlist())
+
+
+ def __parse_T(self, str):
+ data = {}
+ res = scanf.scanf("T: Bus=%2c Lev=%2c Prnt=%2c Port=%2c Cnt=%2c Dev#=%3c Spd=%s MxCh=%2c", str)
+ data["BUS"] = res[0]
+ data["LEV"] = res[1]
+ data["PRNT"] = res[2]
+ data["PORT"] = res[3]
+ data["CNT"] = res[4]
+ data["DEV"] = res[5].lstrip()
+ data["SPEED"] = res[6].rstrip()
+ data["MXCH"] = res[7].lstrip()
+ return data
+
+ def __parse_D(self, str):
+ data = {}
+ str += ' '
+ res = scanf.scanf("D: Ver=%5c Cls=%2c(%s ) %s ", str)
+ data["VER"] = res[0].lstrip()
+ data["CLASS"] = res[1]
+ return data
+
+ def __parse_P(self, str):
+ data = {}
+ str += ' '
+ res = scanf.scanf("P: Vendor=%4c ProdID=%4c %s", str)
+ data["VENDOR"] = res[0].lstrip()
+ data["PRODID"] = res[1]
+ return data
+
+ def __parse_S(self, str):
+ data = {}
+ if str.find('Manufacturer') != -1:
+ pos = str.find('Manufacturer=')
+ data["MANUFACTURER"] = str[pos + len('Manufacturer='):].rstrip()
+ elif str.find('Product') != -1:
+ pos = str.find('Product=')
+ data["PRODUCT"] = str[pos + len('Product='):].rstrip()
+ elif str.find('SerialNumber') != -1:
+ pos = str.find('SerialNumber=')
+ data["SERIAL"] = str[pos + len('SerialNumber='):].rstrip()
+ else:
+ pass
+ return data
+
+ def finddevicebytag(self, tag, str):
+ t = {}
+ count = 0
+ for i, k in self.__dev_list.items():
+ if tag in k:
+ if k[tag].find(str) != -1:
+ t[count] = k
+ count += 1
+ return t
+
+ def __getSerialDevices(self):
+ self.__serial_converter = {}
+ self.__usb_disk = {}
+ self.__usb_amper = {}
+ test_myPath = os.path.dirname(os.path.abspath(__file__))
+ test_myPath = test_myPath + "/../"
+ s = SysCommand('usb', '{}/scripts/usb.sh'.format(test_myPath))
+ s.execute()
+ data_list = s.getOutput().decode()
+ for i in data_list.split('\n'):
+ if len(i) > 0:
+ pos_div = i.find('-')
+ dev = i[0:pos_div - 1]
+ devName = i[pos_div + 2:]
+ # print("dev: {}".format(dev))
+ # print("devName: {}".format(devName))
+ res = scanf.scanf("/dev/sd%c", dev)
+ if res is not None:
+ dev_id = res[0][0]
+ # Now verify if is Disk or Partition
+ res = scanf.scanf("/dev/sd%c%c", dev)
+ if res is not None:
+ # this is a partition and we can ignore it
+ pass
+ else:
+ self.__usb_disk[dev_id] = {dev: devName, 'udev': self.__getudevquery(dev)}
+ else:
+ res = scanf.scanf("/dev/ttyUSB%c", dev)
+ if res is not None:
+ serial_id = res[0][0]
+ self.__serial_converter[serial_id] = {dev: devName, 'udev': self.__getudevquery(dev)}
+
+ def __getudevquery(self, device):
+ list = {}
+ s = SysCommand('query-udev', 'udevadm info --query=all -n {}'.format(device))
+ res = s.execute()
+ if res != 0:
+ return None
+ for i in s.getOutput().decode().split('\n'):
+ if i.find('P:') != -1:
+ list = self.__parse_udev_P(i)
+ elif i.find('S:') != -1:
+ list = {**list, **self.__parse_udev_S(i)}
+ elif i.find('N:') != -1:
+ list = {**list, **self.__parse_udev_N(i)}
+ elif i.find('S:') != -1:
+ list = {**list, **self.__parse_udev_S(i)}
+ elif i.find('E:') != -1:
+ list = {**list, **self.__parse_udev_E(i)}
+ return list
+
+ def __parse_udev_P(self, str):
+ pos = str.find('P: ')
+ str = str[len('P: ') + pos:]
+ return {'PATH': str}
+
+ def __parse_udev_S(self, str):
+ pos = str.find('P: ')
+ str = str[len('P: ') + pos:]
+ pos = str.find('disk/by-id/')
+ if pos != -1:
+ return {'DISK_BY_ID': str[len('disk/by-id/') + pos:]}
+ pos = str.find('disk/by-path/')
+ if pos != -1:
+ return {'DISK_BY_PATH': str[len('disk/by-path/') + pos:]}
+ pos = str.find('disk/by-uuid/')
+ if pos != -1:
+ return {'DISK_BY_UUID': str[len('disk/by-uuid/') + pos:]}
+ else:
+ return {}
+
+ def __parse_udev_N(self, str):
+ pos = str.find('N: ')
+ str = str[len('N: ') + pos:]
+ return {'DEV_NAME': str}
+
+ def __parse_udev_E(self, str):
+ pos = str.find('E: ')
+ str = str[len('E: ') + pos:]
+ pos = str.find('=')
+ str_key = str[:pos]
+ str_value = str[pos + 1:]
+ return {str_key: str_value}