summaryrefslogtreecommitdiff
path: root/test-cli/test
diff options
context:
space:
mode:
authorHector Fernandez <hector@iatec.biz>2020-06-25 14:42:42 +0200
committerHector Fernandez <hector@iatec.biz>2020-06-25 14:42:42 +0200
commit9ac8a326412b04e4873b883c2f2a056ca0b22480 (patch)
tree2fe1e5fd870df9147c8eacb19980148d752be268 /test-cli/test
parentdb3b1e45c47a1ef23c1ad67114a09cbec0976681 (diff)
downloadboard-9ac8a326412b04e4873b883c2f2a056ca0b22480.zip
board-9ac8a326412b04e4873b883c2f2a056ca0b22480.tar.gz
board-9ac8a326412b04e4873b883c2f2a056ca0b22480.tar.bz2
Modified USB test and the way it finds USB memory storages.
Diffstat (limited to 'test-cli/test')
-rw-r--r--test-cli/test/helpers/setup_xml.py48
-rw-r--r--test-cli/test/helpers/syscmd.py88
-rw-r--r--test-cli/test/helpers/usb.py270
-rw-r--r--test-cli/test/tests/qusb.py26
4 files changed, 378 insertions, 54 deletions
diff --git a/test-cli/test/helpers/setup_xml.py b/test-cli/test/helpers/setup_xml.py
index eb8d73c..603a2ac 100644
--- a/test-cli/test/helpers/setup_xml.py
+++ b/test-cli/test/helpers/setup_xml.py
@@ -1,22 +1,23 @@
import xml.etree.ElementTree as XMLParser
+
class XMLSetup (object):
- """aaaaa"""
+ """XML Setup Parser"""
__tree = None # Parser
__dbType = None # database connection required: PgSQLConnection
__dbConnectionRaw = None # Connection string in raw
__dbConnectionStr = None # Connection string to use in sql object connection
def __init__(self, filename):
- """aaaaa"""
+ """Parse the file in the constructor"""
self.__tree = XMLParser.parse(filename)
def __del__(self):
- """aaaaa"""
+ """Destructor do nothing"""
pass
- def getdbConnectionStr (self):
- """aaaaa"""
+ def getdbConnectionStr(self):
+ """XML to database connection string"""
if self.__dbConnectionRaw is not None:
return self.__dbConnectionRaw
@@ -29,19 +30,44 @@ class XMLSetup (object):
return None
- def getPostgresConnectionStr (self):
- """aaaaa"""
+ def getPostgresConnectionStr(self):
+ """Get Connection string """
str = self.__dbConnectionRaw
del str['type']
return str
- def getMysqlConnectionStr (self):
- """aaaaa"""
- pass
+ def getBoard(self, key, default):
+ for element in self.__tree.iter('board'):
+ if key in element.attrib:
+ return element.attrib[key]
+ return default
+
+ def getTempPath(self, key, default):
+ for element in self.__tree.iter('tmpPath'):
+ if key in element.attrib:
+ return element.attrib[key]
+ return default
+
+ def getKeyVal(self, tag, key, default):
+ for element in self.__tree.iter(tag):
+ if key in element.attrib:
+ return element.attrib[key]
+ return default
def gettagKey (self, xmltag, xmlkey):
"""aaaaa"""
for element in self.__tree.iter(xmltag):
return element.attrib[xmlkey]
- return None \ No newline at end of file
+ return None
+
+ def getUSBlist(self):
+ list = {}
+ count = 0
+ for elements in self.__tree.iter("usbdev"):
+ sublist = {}
+ for key,val in elements.items():
+ sublist[key] = val
+ list[count] = sublist
+ count += 1
+ return list \ No newline at end of file
diff --git a/test-cli/test/helpers/syscmd.py b/test-cli/test/helpers/syscmd.py
index 6114449..7223774 100644
--- a/test-cli/test/helpers/syscmd.py
+++ b/test-cli/test/helpers/syscmd.py
@@ -1,5 +1,6 @@
import unittest
import subprocess
+import threading
class TestSysCommand(unittest.TestCase):
@@ -9,24 +10,15 @@ class TestSysCommand(unittest.TestCase):
__outdata = None
__outtofile = False
- #varlist: str_cmd, outtofile
- def __init__(self, testname, testfunc, varlist):
+ def __init__(self, testname, testfunc, str_cmd, outtofile=False):
""" init """
super(TestSysCommand, self).__init__(testfunc)
- if "str_cmd" in varlist:
- self.__str_cmd = varlist["str_cmd"]
- else:
- raise Exception('str_cmd param inside TestSysCommand have been be defined')
+ self.__str_cmd = str_cmd
self.__testname = testname
- if "outtofile" in varlist:
- self.__outtofile = varlist["outtofile"]
- if self.__outtofile is True:
- self.__outfilename = '/tmp/{}.txt'.format(testname)
- else:
- self.__outtofile = None
- self.__outfilename = None
+ self.__outtofile = outtofile
self._testMethodDoc = testname
-
+ if self.__outtofile is True:
+ self.__outfilename = '/tmp/{}.txt'.format(testname)
def getName(self):
return self.__testname
@@ -50,7 +42,7 @@ class TestSysCommand(unittest.TestCase):
else:
res = -3
outdata = completed.stdout
- self.longMessage=str(outdata).replace("'","")
+ self.longMessage = str(outdata).replace("'", "")
self.assertTrue(True)
except subprocess.CalledProcessError as err:
self.assertTrue(False)
@@ -62,11 +54,14 @@ class TestSysCommand(unittest.TestCase):
def remove_file(self):
pass
+
class SysCommand(object):
__str_cmd = None
__cmdname = None
__outdata = None
__errdata = None
+ __returnCode = None
+ __exception = None
def __init__(self, cmdname, str_cmd):
""" init """
@@ -74,31 +69,37 @@ class SysCommand(object):
self.__cmdname = cmdname
def getName(self):
- return self.__testname
+ return self.__cmdname
+
+ def setName(self, cmdName):
+ self.__cmdname = cmdName
+
+ def getParams(self):
+ return self.__str_cmd
+
+ def setParam(self, params):
+ self.__str_cmd = params
def execute(self):
- res = -1
+ # try:
+ self.__outdata = None
+ self.__errdata = None
+ self.__exception = None
try:
- self.__outdata = None
- self.__errdata = None
completed = subprocess.run(
self.__str_cmd,
- check=True,
+ check=False,
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
+ self.__returnCode = completed.returncode
self.__outdata = completed.stdout
- if completed.returncode is 0:
- res = 0
- if completed.stderr.decode('ascii') != "":
- res = -1
- self.__errdata = completed.stderr
+ self.__errdata = completed.stderr.decode('ascii')
except subprocess.CalledProcessError as err:
- res = -2
- except Exception as t:
- res = -3
- return res
+ self.__exception = err
+ self.__returnCode = -1
+ return self.__returnCode
def getOutput(self):
return self.__outdata
@@ -106,6 +107,12 @@ class SysCommand(object):
def getOutErr(self):
return self.__errdata
+ def getException(self):
+ return self.__exception
+
+ def IsException(self):
+ return self.__exception is not None
+
def getOutputlines(self):
return self.__outdata.splitlines()
@@ -114,3 +121,26 @@ class SysCommand(object):
f.write(self.__outdata)
f.close()
+
+class AsyncSys(threading.Thread):
+ sysObject = None
+ sysres = None
+
+ def __init__(self, threadID, name, counter):
+ threading.Thread.__init__(self)
+ self.threadID = threadID
+ self.name = name
+ self.counter = counter
+ self.sysObject = SysCommand(name, None)
+
+ def setParams(self, Params):
+ self.sysObject.setParam(Params)
+
+ def getRes(self):
+ return self.sysres
+
+ def getData(self):
+ return self.sysObject.getOutput()
+
+ def run(self):
+ self.sysres = self.sysObject.execute()
diff --git a/test-cli/test/helpers/usb.py b/test-cli/test/helpers/usb.py
new file mode 100644
index 0000000..8f8848d
--- /dev/null
+++ b/test-cli/test/helpers/usb.py
@@ -0,0 +1,270 @@
+from test.helpers.syscmd import SysCommand
+from test.helpers.amper import Amper
+import scanf
+import os
+import json
+
+
+class USBDevices(object):
+ __dev_list = {}
+ __serial_converter = {}
+ __usb_disk = {}
+ __usb_amper = {}
+ __dev_list_filtered = {}
+ __usb_mstorage = {}
+ __xml_config = None
+
+ def __init__(self, xml):
+ self.__xml_config = xml
+ self.__getusblist()
+ self.__getSerialDevices()
+ self.__getAmper()
+ self.__getMassStorage()
+
+ def print_usb(self):
+ print("------ usb list -------")
+ print(json.dumps(self.__dev_list))
+ print("------ serial converter list -------")
+ print(self.__serial_converter)
+ print("------ usb disk list -------")
+ print(self.__usb_disk)
+ print("------ usb amper list -------")
+ print(self.__usb_amper)
+ print("------ usb mass storage list -------")
+ print(self.__usb_mstorage)
+ print("------ filtered list -------")
+ print(json.dumps(self.__dev_list_filtered))
+
+ def refresh(self):
+ self.__dev_list = {}
+ self.__serial_converter = {}
+ self.__usb_disk = {}
+ self.__usb_amper = {}
+ self.__usb_mstorage = {}
+ self.__dev_list_filtered = {}
+ self.__getusblist()
+ self.__getSerialDevices()
+ self.__getAmper()
+ self.__getMassStorage()
+
+ def getMassStorage(self):
+ return self.__usb_mstorage
+
+ def getAmper(self):
+ return self.__usb_amper
+
+ def __getAmper(self):
+ for c, s_items in self.__serial_converter.items():
+ for k,v in s_items.items():
+ if k.find("USB") != -1 and v == "Prolific_Technology_Inc._USB-Serial_Controller":
+ testAmper = Amper(port=k)
+ testAmper.open()
+ res = testAmper.hello()
+ if res:
+ self.__usb_amper = {'port': k, 'ver': testAmper.getVersion()}
+ testAmper.close()
+ if res:
+ return
+
+ def __getMassStorage(self):
+ for c, s_items in self.__usb_disk.items():
+ for k,v in s_items.items():
+ if k == "udev":
+ if v.get('ID_MODEL', None) == "File-Stor_Gadget" or v.get('ID_FS_LABEL', None) != "NODE_L031K6":
+ self.__usb_mstorage = {'disk': v.get('DEVNAME', ''), 'id': v.get('ID_FS_LABEL_ENC', '')}
+
+
+ def __check_ignore_usb_list(self, usb_list, dev_list):
+ count = 0
+ ignore = "0"
+ for _, l in usb_list.items():
+ for k, v in l.items():
+ if k == "ignore":
+ ignore = v
+ else:
+ value = dev_list.get(k, None)
+ if v == value:
+ count += 1
+ else:
+ count = 0
+ break
+ if count >= len(l) -1:
+ break
+
+ if count > 0:
+ if ignore == "1":
+ return True
+ return False
+
+
+ def __create_ignore_list(self, usb_list):
+ count = 1
+ for _, dev in self.__dev_list.items():
+ if not self.__check_ignore_usb_list(usb_list, dev):
+ self.__dev_list_filtered[count] = dev
+ count += 1
+
+ def __getusblist(self):
+ count = 0
+ res = open('/sys/kernel/debug/usb/devices', 'rb').read().decode()
+ list = {}
+ for i in res.split('\n'):
+ if i.find('T:') != -1:
+ if len(list) > 0:
+ count += 1
+ self.__dev_list[count] = list
+ list = self.__parse_T(i)
+ elif i.find('D:') != -1:
+ list = {**list, **self.__parse_D(i)}
+ elif i.find('P:') != -1:
+ list = {**list, **self.__parse_P(i)}
+ elif i.find('S:') != -1:
+ list = {**list, **self.__parse_S(i)}
+ else:
+ """ Ignore all rest"""
+ pass
+
+ if count > 0:
+ count += 1
+ self.__dev_list[count] = list
+
+ self.__create_ignore_list(self.__xml_config.getUSBlist())
+
+
+ def __parse_T(self, str):
+ data = {}
+ res = scanf.scanf("T: Bus=%2c Lev=%2c Prnt=%2c Port=%2c Cnt=%2c Dev#=%3c Spd=%s MxCh=%2c", str)
+ data["BUS"] = res[0]
+ data["LEV"] = res[1]
+ data["PRNT"] = res[2]
+ data["PORT"] = res[3]
+ data["CNT"] = res[4]
+ data["DEV"] = res[5].lstrip()
+ data["SPEED"] = res[6].rstrip()
+ data["MXCH"] = res[7].lstrip()
+ return data
+
+ def __parse_D(self, str):
+ data = {}
+ str += ' '
+ res = scanf.scanf("D: Ver=%5c Cls=%2c(%s ) %s ", str)
+ data["VER"] = res[0].lstrip()
+ data["CLASS"] = res[1]
+ return data
+
+ def __parse_P(self, str):
+ data = {}
+ str += ' '
+ res = scanf.scanf("P: Vendor=%4c ProdID=%4c %s", str)
+ data["VENDOR"] = res[0].lstrip()
+ data["PRODID"] = res[1]
+ return data
+
+ def __parse_S(self, str):
+ data = {}
+ if str.find('Manufacturer') != -1:
+ pos = str.find('Manufacturer=')
+ data["MANUFACTURER"] = str[pos + len('Manufacturer='):].rstrip()
+ elif str.find('Product') != -1:
+ pos = str.find('Product=')
+ data["PRODUCT"] = str[pos + len('Product='):].rstrip()
+ elif str.find('SerialNumber') != -1:
+ pos = str.find('SerialNumber=')
+ data["SERIAL"] = str[pos + len('SerialNumber='):].rstrip()
+ else:
+ pass
+ return data
+
+ def finddevicebytag(self, tag, str):
+ t = {}
+ count = 0
+ for i, k in self.__dev_list.items():
+ if tag in k:
+ if k[tag].find(str) != -1:
+ t[count] = k
+ count += 1
+ return t
+
+ def __getSerialDevices(self):
+ self.__serial_converter = {}
+ self.__usb_disk = {}
+ self.__usb_amper = {}
+ test_myPath = os.path.dirname(os.path.abspath(__file__))
+ test_myPath = test_myPath + "/../"
+ s = SysCommand('usb', '{}/scripts/usb.sh'.format(test_myPath))
+ s.execute()
+ data_list = s.getOutput().decode()
+ for i in data_list.split('\n'):
+ if len(i) > 0:
+ pos_div = i.find('-')
+ dev = i[0:pos_div - 1]
+ devName = i[pos_div + 2:]
+ # print("dev: {}".format(dev))
+ # print("devName: {}".format(devName))
+ res = scanf.scanf("/dev/sd%c", dev)
+ if res is not None:
+ dev_id = res[0][0]
+ # Now verify if is Disk or Partition
+ res = scanf.scanf("/dev/sd%c%c", dev)
+ if res is not None:
+ # this is a partition and we can ignore it
+ pass
+ else:
+ self.__usb_disk[dev_id] = {dev: devName, 'udev': self.__getudevquery(dev)}
+ else:
+ res = scanf.scanf("/dev/ttyUSB%c", dev)
+ if res is not None:
+ serial_id = res[0][0]
+ self.__serial_converter[serial_id] = {dev: devName, 'udev': self.__getudevquery(dev)}
+
+ def __getudevquery(self, device):
+ list = {}
+ s = SysCommand('query-udev', 'udevadm info --query=all -n {}'.format(device))
+ res = s.execute()
+ if res != 0:
+ return None
+ for i in s.getOutput().decode().split('\n'):
+ if i.find('P:') != -1:
+ list = self.__parse_udev_P(i)
+ elif i.find('S:') != -1:
+ list = {**list, **self.__parse_udev_S(i)}
+ elif i.find('N:') != -1:
+ list = {**list, **self.__parse_udev_N(i)}
+ elif i.find('S:') != -1:
+ list = {**list, **self.__parse_udev_S(i)}
+ elif i.find('E:') != -1:
+ list = {**list, **self.__parse_udev_E(i)}
+ return list
+
+ def __parse_udev_P(self, str):
+ pos = str.find('P: ')
+ str = str[len('P: ') + pos:]
+ return {'PATH': str}
+
+ def __parse_udev_S(self, str):
+ pos = str.find('P: ')
+ str = str[len('P: ') + pos:]
+ pos = str.find('disk/by-id/')
+ if pos != -1:
+ return {'DISK_BY_ID': str[len('disk/by-id/') + pos:]}
+ pos = str.find('disk/by-path/')
+ if pos != -1:
+ return {'DISK_BY_PATH': str[len('disk/by-path/') + pos:]}
+ pos = str.find('disk/by-uuid/')
+ if pos != -1:
+ return {'DISK_BY_UUID': str[len('disk/by-uuid/') + pos:]}
+ else:
+ return {}
+
+ def __parse_udev_N(self, str):
+ pos = str.find('N: ')
+ str = str[len('N: ') + pos:]
+ return {'DEV_NAME': str}
+
+ def __parse_udev_E(self, str):
+ pos = str.find('E: ')
+ str = str[len('E: ') + pos:]
+ pos = str.find('=')
+ str_key = str[:pos]
+ str_value = str[pos + 1:]
+ return {str_key: str_value}
diff --git a/test-cli/test/tests/qusb.py b/test-cli/test/tests/qusb.py
index d952afc..32d99ef 100644
--- a/test-cli/test/tests/qusb.py
+++ b/test-cli/test/tests/qusb.py
@@ -2,6 +2,7 @@ import sh
import unittest
import re
import os
+from test.helpers.usb import USBDevices
from test.helpers.changedir import changedir
@@ -16,24 +17,20 @@ class Qusb(unittest.TestCase):
self.__resultlist = []
def execute(self):
- # Execute script usb.sh
- test_abspath = os.path.dirname(os.path.abspath(__file__))
- test_abspath = os.path.join(test_abspath, "..", "..")
- p = sh.bash(os.path.join(test_abspath, 'test/scripts/usb.sh'))
- # Search in the stdout a pattern "/dev/sd + {letter} + {number}
- match = re.search("/dev/sd\w\d", p.stdout.decode('ascii'))
- # get the first device which matches the pattern
- if match:
- device = match.group(0)
+ # get usb device
+ dev_obj = USBDevices(self.params["xml"])
+ if dev_obj.getMassStorage():
+ device = dev_obj.getMassStorage()['disk'] + "1"
else:
self.__resultlist.append(
{
"desc": "Test result",
- "data": "FAILED: No pendrive found",
+ "data": "FAILED: No USB memory found",
"type": "string"
}
)
- self.fail("failed: No pendrive found.")
+ self.fail("failed: No USB memory found.")
+
# create a new folder where the pendrive is going to be mounted
sh.mkdir("-p", "/mnt/pendrive")
# check if the device is mounted, and umount it
@@ -48,11 +45,12 @@ class Qusb(unittest.TestCase):
p = sh.mount(device, "/mnt/pendrive")
if p.exit_code == 0:
# copy files
- p = sh.cp(os.path.join(test_abspath, "test/files/usbtest/usbdatatest.bin"),
- os.path.join(test_abspath, "test/files/usbtest/usbdatatest.md5"),
+ test_abspath = os.path.dirname(os.path.abspath(__file__)) + "/../"
+ p = sh.cp(os.path.join(test_abspath, "files/usbtest/usbdatatest.bin"),
+ os.path.join(test_abspath, "files/usbtest/usbdatatest.md5"),
"/mnt/pendrive")
if p.exit_code == 0:
- # check md5
+ # check
with changedir("/mnt/pendrive/"):
p = sh.md5sum("-c", "usbdatatest.md5")
q = re.search("OK", p.stdout.decode('ascii'))