From d46bce422fd03cd57d1ba336361da17d6efb48db Mon Sep 17 00:00:00 2001 From: Manel Caro Date: Fri, 31 Jul 2020 02:07:37 +0200 Subject: TEST restructure --- test-cli/test/enums/StationStates.py | 1 + test-cli/test/helpers/camara.py | 124 ++++++++++++++++++++++++++++ test-cli/test/helpers/cmdline.py | 2 +- test-cli/test/helpers/gpio.py | 55 +++++++++++++ test-cli/test/helpers/int_registers.py | 11 ++- test-cli/test/helpers/iseelogger.py | 25 +++++- test-cli/test/helpers/iw.py | 124 ++++++++++++++++++++++++++++ test-cli/test/helpers/md5.py | 8 ++ test-cli/test/helpers/plc.py | 57 +++++++++++++ test-cli/test/helpers/psqldb.py | 3 + test-cli/test/helpers/qrreader.py | 3 + test-cli/test/helpers/sdl.py | 126 +++++++++++++++++++++++++++++ test-cli/test/helpers/setup_xml.py | 1 - test-cli/test/helpers/testsrv_db.py | 25 +++++- test-cli/test/helpers/utils.py | 68 ++++++++++++++++ test-cli/test/runners/simple.py | 41 ++++++---- test-cli/test/tasks/flasheeprom.py | 121 ++++++++++++++++------------ test-cli/test/tasks/flashmemory.py | 50 +++++++++--- test-cli/test/tests/qamper.py | 32 +++++--- test-cli/test/tests/qaudio.py | 77 ++++++++++++------ test-cli/test/tests/qdmesg.py | 30 +++---- test-cli/test/tests/qeeprom.py | 77 +++++++++++------- test-cli/test/tests/qethernet.py | 77 +++++++++++++----- test-cli/test/tests/qmmcflash.py | 46 ++++++++++- test-cli/test/tests/qnand.py | 44 +++++----- test-cli/test/tests/qplc.py | 70 ++++++++++++++++ test-cli/test/tests/qram.py | 56 +++++++++---- test-cli/test/tests/qusb.py | 143 ++++++++++++++++----------------- test-cli/test/tests/qusbdual.py | 90 --------------------- test-cli/test/tests/qwifi.py | 143 ++++++++++++++++----------------- 30 files changed, 1278 insertions(+), 452 deletions(-) create mode 100644 test-cli/test/helpers/camara.py create mode 100644 test-cli/test/helpers/gpio.py create mode 100644 test-cli/test/helpers/iw.py create mode 100644 test-cli/test/helpers/md5.py create mode 100644 test-cli/test/helpers/plc.py create mode 100644 test-cli/test/helpers/sdl.py create mode 100644 test-cli/test/helpers/utils.py create mode 100644 test-cli/test/tests/qplc.py delete mode 100644 test-cli/test/tests/qusbdual.py (limited to 'test-cli/test') diff --git a/test-cli/test/enums/StationStates.py b/test-cli/test/enums/StationStates.py index 040e6bc..88dba1f 100644 --- a/test-cli/test/enums/StationStates.py +++ b/test-cli/test/enums/StationStates.py @@ -18,4 +18,5 @@ class StationStates(Enum): WAITING_FOR_SCANNER = 50 FINISHED = 60 STATION_REBOOT = 2001 + STATION_WAIT_SHUTDOWN = 2002 diff --git a/test-cli/test/helpers/camara.py b/test-cli/test/helpers/camara.py new file mode 100644 index 0000000..9b2829c --- /dev/null +++ b/test-cli/test/helpers/camara.py @@ -0,0 +1,124 @@ +import cv2 +from test.helpers.syscmd import SysCommand + +class Camara(object): + __parent = None + __device_name = None + __device = None + __w = 1280 + __h = 720 + __contrast = 0.0 + __brightness = 0.0 + __saturation = 55.0 + __hue = 0.0 + __exposure = 166 + + def __init__(self, parent, device="video0", width=1280, height=720): + self.__parent = parent + self.__device_name = device + self.__w = width + self.__h = height + + def Close(self): + if self.__device is not None: + del self.__device + self.__device = None + + def Open(self): + self.Close() + self.__device = cv2.VideoCapture("/dev/{}".format(self.__device_name)) + if self.__device.isOpened(): + self.__configure() + return True + return False + + def getSize(self): + return self.__w, self.__h + + def setSize(self, w, h): + if self.__device is not None and self.__device.isOpened(): + self.__w = self.__setCamVar(cv2.CAP_PROP_FRAME_WIDTH, w) + self.__h = self.__setCamVar(cv2.CAP_PROP_FRAME_HEIGHT, h) + else: + self.__w = w + self.__h = h + + def setContrast(self, newVal): + if self.__device.isOpened(): + self.__contrast = self.__setCamVar(cv2.CAP_PROP_CONTRAST, newVal) + else: + self.__contrast = newVal + + def getContrast(self): + return self.__contrast + + def setBrightness(self, newVal): + if self.__device.isOpened(): + self.__brightness = self.__setCamVar(cv2.CAP_PROP_BRIGHTNESS, newVal) + else: + self.__brightness = newVal + + def getBrightness(self): + return self.__brightness + + def __configure(self): + self.__w = self.__setCamVar(cv2.CAP_PROP_FRAME_WIDTH, self.__w) + self.__h = self.__setCamVar(cv2.CAP_PROP_FRAME_HEIGHT, self.__h) + cam_setup = SysCommand('v4lsetup', '{}/scripts/v4l-cam.sh'.format(self.__parent.getAppPath())) + cam_setup.execute() + + #self.__contrast = self.__setCamVar(cv2.CAP_PROP_CONTRAST, self.__contrast) + #self.__brightness = self.__setCamVar(cv2.CAP_PROP_BRIGHTNESS, self.__brightness) + #self.__saturation = self.__setCamVar(cv2.CAP_PROP_SATURATION, self.__saturation) + #self.__hue = self.__setCamVar(cv2.CAP_PROP_HUE, self.__hue) + #self.__exposure = self.__setCamVar(cv2.CAP_PROP_EXPOSURE, self.__exposure) + + + def __setCamVar(self, key, val): + valold = cv2.VideoCapture.get(self.__device, key) + if valold != val: + cv2.VideoCapture.set(self.__device, key, val) + t = cv2.VideoCapture.get(self.__device, key) + return t + return val + + def __getCamVar(self, key): + return cv2.VideoCapture.get(self.__device, key); + + def getFrameCount(self): + return cv2.VideoCapture.get(self.__device, cv2.CAP_PROP_BUFFERSIZE); + + def getFrame(self): + if self.__device.isOpened(): + retval, image = self.__device.read() + if retval: + return image + return None + else: + return None + + def getImageSize(self, image): + if hasattr(image, 'shape'): + return image.shape[1], image.shape[0] + else: + return (0, 0, 0) + + def showFrame(self, name, frame, w=False, maxTime=3000): + cv2.imshow(name, frame) + if w: + if maxTime == -1: + cv2.waitKey() + else: + cv2.waitKey(maxTime) + + def saveFrame(self, fileName, Frame): + cv2.imwrite(fileName, Frame) + + def readImage(self, filename): + return cv2.imread('{}'.format(filename), 0) + + def destroyWindow(self, name): + cv2.destroyWindow(name) + + def closeWindows(self): + cv2.destroyAllWindows() \ No newline at end of file diff --git a/test-cli/test/helpers/cmdline.py b/test-cli/test/helpers/cmdline.py index fad81ac..db94a1c 100644 --- a/test-cli/test/helpers/cmdline.py +++ b/test-cli/test/helpers/cmdline.py @@ -1,5 +1,5 @@ -class LinuxKernel: +class LinuxKernelCmd: __kernel_vars = {} diff --git a/test-cli/test/helpers/gpio.py b/test-cli/test/helpers/gpio.py new file mode 100644 index 0000000..f127f2b --- /dev/null +++ b/test-cli/test/helpers/gpio.py @@ -0,0 +1,55 @@ +import os + +class gpio (object): + gpioNum = None + + def __init__(self, gpioNum, dir, val): + self.gpioNum = gpioNum + self.__export(gpioNum) + self.__set_dir(gpioNum, dir) + if dir == 'out': + self.__set_value(gpioNum, val) + + def set_val(self, value): + self.__set_value( self.gpioNum, value) + + + def __export(self, gpio_number): + if not os.path.isfile("/sys/class/gpio/gpio{}/value".format(gpio_number)): + try: + f = open("/sys/class/gpio/export", "w", newline="\n") + f.write(str(gpio_number)) + f.close() + except IOError: + return False, '{}'.format(IOError.errno) + return True + + def __unexport(self, gpio_number): + if os.path.isfile("/sys/class/gpio/gpio{}/value".format(gpio_number)): + try: + f = open("/sys/class/gpio/unexport", "w", newline="\n") + f.write(str(gpio_number)) + f.close() + except IOError: + return False, '{}'.format(IOError.errno) + return True + + def __set_dir(self, gpio_number, dir): + try: + f = open("/sys/class/gpio/gpio{}/direction".format(gpio_number), "r+", newline="\n") + val = f.readline() + if val != dir: + f.write(dir) + f.close() + except IOError: + return False, '{}'.format(IOError.errno) + return True + + def __set_value(self, gpio_number, value): + try: + f = open("/sys/class/gpio/gpio{}/value".format(gpio_number), "w", newline="\n") + f.write(str(value)) + f.close() + except IOError: + return False, '{}'.format(IOError.errno) + return True \ No newline at end of file diff --git a/test-cli/test/helpers/int_registers.py b/test-cli/test/helpers/int_registers.py index d081853..133387c 100644 --- a/test-cli/test/helpers/int_registers.py +++ b/test-cli/test/helpers/int_registers.py @@ -22,6 +22,12 @@ def read(addr): os.close(fd) return "%08X" % retval[0] +def imx8m_readid(): + f = open("/sys/bus/soc/devices/soc0/soc_uid", "r", newline="\n") + val = f.readline() + f.close() + return val.rstrip() + def get_die_id(modelid): dieid = "" @@ -36,9 +42,10 @@ def get_die_id(modelid): registers = [0x4830A224, 0x4830A220, 0x4830A21C, 0x4830A218] elif modelid.find("OMAP5") == 0: registers = [0x4A002210, 0x4A00220C, 0x4A002208, 0x4A002200] + elif modelid.find("IGEP0048") == 0: + return imx8m_readid() else: - raise Exception('modelid not defined') - + raise Exception('modelid not defined: {}, modelid') for rg in registers: dieid = dieid + read(rg) return dieid diff --git a/test-cli/test/helpers/iseelogger.py b/test-cli/test/helpers/iseelogger.py index 785a78c..4b1087c 100644 --- a/test-cli/test/helpers/iseelogger.py +++ b/test-cli/test/helpers/iseelogger.py @@ -1,6 +1,6 @@ import logging import logging.handlers - +import datetime class ISEE_Logger(object): __logger = None @@ -37,3 +37,26 @@ class ISEE_Logger(object): def getlogger(self): return self.__logger +class MeasureTime: + __difference = None + __first_time = None + __later_time = None + + def __init__(self): + self.__first_time = datetime.datetime.now() + + def start(self): + self.first_time = datetime.datetime.now() + + def stop(self): + self.__later_time = datetime.datetime.now() + self.__difference = self.__later_time - self.__first_time + return self.__difference.total_seconds() + + def getTime(self): + return self.__difference.total_seconds() + + +global logObj +logObj = ISEE_Logger(logging.INFO) + diff --git a/test-cli/test/helpers/iw.py b/test-cli/test/helpers/iw.py new file mode 100644 index 0000000..8e29448 --- /dev/null +++ b/test-cli/test/helpers/iw.py @@ -0,0 +1,124 @@ +from sh import iw +from scanf import scanf + +class iw_scan: + __interface = None + __raw_unparsed = [] + __raw = None + + def __init__(self, interface='wlan0'): + self.__interface = interface + + def scan(self): + self.__raw_unparsed = [] + self.__raw = iw('{}'.format(self.__interface), 'scan') + if self.__raw.exit_code == 0: + self.__raw_unparsed = self.__raw.stdout.decode("utf-8").split('\n') + return True + return False + + def getRawData (self): + return self.__raw_unparsed + + def getInterface(self): + return self.__interface + + def getRawObject(self): + return self.__raw + + +class iwScan: + __iw_scan = None + __station_list = {} + + def __init__(self, interface='wlan0'): + self.__iw_scan = iw_scan(interface) + + def scan(self): + if self.__iw_scan.scan(): + self.__parse_scan(self.__iw_scan.getRawData()) + return True + return False + + def getLastError(self): + rObject = self.__iw_scan.getRawObject() + if rObject.exit_code != 0: + return rObject.stderr + return '' + + def __Parse_BBS(self, line): + data = {} + res = scanf("BSS %s(on %s) -- %s", line) + if res == None: + res = scanf("BSS %s(on %s)", line) + data["MAC"] = res[0] + data["DEV"] = res[1] + if len(res) == 3: + data["ASSOCIATED"] = 'YES' + else: + data["ASSOCIATED"] = 'NO' + return data + + def __Parse_T(self, id, OnNotFoundAttr ,line): + data = {} + res = scanf("{}: %s".format(id), line) + if res == None: + data[id.upper()] = OnNotFoundAttr + else: + data[id.upper()] = res[0] + return data + + def __Parse_X(self, line): + data = {} + res = scanf("DS Parameter set: channel %s", line) + if res == None: + data['CHANNEL'] = '-1' + else: + data['CHANNEL'] = res[0] + return data + + def __block_stationlist(self, rawdata): + list = {} + count = 0 + for line in rawdata: + idx = line.find('BSS') + if idx != -1 and idx == 0: + if len(list) > 0: + count += 1 + self.__station_list[count] = list + list = self.__Parse_BBS(line) + elif line.find('SSID:') != -1: + list = {**list, **self.__Parse_T('SSID', 'HIDDEN', line)} + elif line.find('freq:') != -1: + list = {**list, **self.__Parse_T('freq', '', line)} + elif line.find('signal:') != -1: + list = {**list, **self.__Parse_T('signal', '', line)} + elif line.find('DS Parameter set:') != -1: + list = {**list, **self.__Parse_X(line)} + if count > 0: + count += 1 + self.__station_list[count] = list + + def __parse_scan(self, rawData): + self.__station_list = {} + self.__block_stationlist(rawData) + #print('{}'.format(self.__station_list)) + #for i in self.__station_list: + # print(self.__station_list[i]) + + def findConnected (self): + for i in self.__station_list: + st = self.__station_list[i] + if st.get('ASSOCIATED', 'NO') == 'YES': + return True, self.__station_list[i] + return False, None + + def findBySSID(self, ssid): + for i in self.__station_list: + st = self.__station_list[i] + if st.get('SSID', '') == ssid: + return True, self.__station_list[i] + return False, None + + def getStations(self): + return self.__station_list \ No newline at end of file diff --git a/test-cli/test/helpers/md5.py b/test-cli/test/helpers/md5.py new file mode 100644 index 0000000..9bc9e23 --- /dev/null +++ b/test-cli/test/helpers/md5.py @@ -0,0 +1,8 @@ +import hashlib + +def md5_file(fname): + hash_md5 = hashlib.md5() + with open(fname, "rb") as f: + for chunk in iter(lambda: f.read(4096), b""): + hash_md5.update(chunk) + return hash_md5.hexdigest() \ No newline at end of file diff --git a/test-cli/test/helpers/plc.py b/test-cli/test/helpers/plc.py new file mode 100644 index 0000000..3b00934 --- /dev/null +++ b/test-cli/test/helpers/plc.py @@ -0,0 +1,57 @@ +import sh +from sh import flashcp +from sh import flash_erase +from sh import ErrorReturnCode +from sh import Command +from test.helpers.gpio import gpio + + +class dcpPLC(object): + __nReset = None + __Phy = None + __Plc = None + __mtd_device = None + __factool = None + __myConfigTool = None + + def __init__(self, plcfactool, mtd_device): + # default save firmware + self.__nRest = gpio('75', 'out', '0') + self.__nRest = gpio('69', 'out', '0') + self.__nRest = gpio('78', 'out', '1') + self.__factool = plcfactool + self.__mtd_device = mtd_device + self.__myConfigTool = Command(self.__factool) + + def setSaveFirmwareMode(self): + self.__nRest = gpio('75', 'out', '0') + self.__nRest = gpio('69', 'out', '0') + self.__nRest = gpio('78', 'out', '1') + + def setBootMode(self): + self.__nRest = gpio('78', 'out', '0') + self.__nRest = gpio('75', 'out', '1') + self.__nRest = gpio('69', 'out', '1') + + def SaveFirmware(self, firmare): + self.setSaveFirmwareMode() + try: + flash_erase(self.__mtd_device) + flashcp(firmare, self.__mtd_device) + except ErrorReturnCode as Error: + return False, "plc flash firmware failed {} ".format(Error.exit_code) + return True, '' + + def set_plc (self, var, value, password): + try: + self.__myConfigTool("-o", "SET", "-p", "{}".format(var), '{}'.format(value), "-w", "{}".format(password)) + except ErrorReturnCode as Error: + return False, "set var failed {} {}".format(var,Error.exit_code) + return True, '' + + def get_plc (self, var, value, password): + try: + self.__myConfigTool("-o", "GET", "-p", "{}".format(var), '{}'.format(value), "-w", "{}".format(password)) + except ErrorReturnCode as Error: + return False, "set var failed {} {}".format(var,Error.exit_code) + return True, '' diff --git a/test-cli/test/helpers/psqldb.py b/test-cli/test/helpers/psqldb.py index c98338a..e703c3a 100644 --- a/test-cli/test/helpers/psqldb.py +++ b/test-cli/test/helpers/psqldb.py @@ -31,6 +31,9 @@ class PgSQLConnection(object): print(error) return result + def getConfig(self): + return self.__db_config + def db_execute_query(self, query): cur = self.__conection_object.cursor() cur.execute(query) diff --git a/test-cli/test/helpers/qrreader.py b/test-cli/test/helpers/qrreader.py index 744db54..f663e99 100644 --- a/test-cli/test/helpers/qrreader.py +++ b/test-cli/test/helpers/qrreader.py @@ -4,6 +4,7 @@ import threading import time import selectors from selectors import DefaultSelector, EVENT_READ +from test.helpers.iseelogger import logObj selector = selectors.DefaultSelector() @@ -37,7 +38,9 @@ class QRReader: def getQRlist(self): devices = [evdev.InputDevice(path) for path in evdev.list_devices()] + logObj.getlogger().debug("{}".format(devices)) for device in devices: + logObj.getlogger().debug("{}".format(device.name)) if device.name in qrdevice_list: self.__myReader['NAME'] = device.name # print(self.__myReader['NAME']) diff --git a/test-cli/test/helpers/sdl.py b/test-cli/test/helpers/sdl.py new file mode 100644 index 0000000..8f55d9f --- /dev/null +++ b/test-cli/test/helpers/sdl.py @@ -0,0 +1,126 @@ +import sdl2.ext +from sdl2 import * +import os +import time + +Colors = { + 'red': sdl2.ext.Color(255, 0, 0, 255), + 'green': sdl2.ext.Color(0, 255, 0, 255), + 'blue': sdl2.ext.Color(0, 0, 255, 255), + 'white': sdl2.ext.Color(255, 255, 255, 255), + 'black': sdl2.ext.Color(0, 0, 0, 255), +} + + +class SDL2(object): + __resources = None + __w = 1024 + __h = 768 + __winName = "noname" + __window = None + __factory = None + __renderer = None + __srender = None + + def __init__(self, parent): + os.environ['SDL_VIDEODRIVER'] = "x11" + os.environ['DISPLAY'] = ":0" + self.__resources = sdl2.ext.Resources(parent.getAppPath(), 'files') + sdl2.ext.init() + self.__createWindow() + + def __createWindow(self): + self.__window = sdl2.ext.Window(title='{}'.format(self.__winName), size=(self.__w, self.__h), position=(0, 0), + flags=( + SDL_WINDOW_HIDDEN | SDL_WINDOW_FULLSCREEN | SDL_WINDOW_BORDERLESS | SDL_WINDOW_MAXIMIZED)) + self.__renderer = sdl2.ext.Renderer(self.__window) + self.__factory = sdl2.ext.SpriteFactory(sdl2.ext.TEXTURE, renderer=self.__renderer) + self.__srender = self.__factory.create_sprite_render_system(self.__window) + self.__window.show() + SDL_ShowCursor(SDL_DISABLE) + + def createFont(self, fontName, fontSize, fontColor): + return sdl2.ext.FontManager(font_path=self.__resources.get_path(fontName), size=fontSize, color=fontColor) + + def WriteText(self, text, x, y, fontManager): + imText = self.__factory.from_text(text, fontmanager=fontManager) + self.__renderer.copy(imText, dstrect=(x, y, imText.size[0], imText.size[1])) + + def show(self): + self.__window.show() + + def hide(self): + self.__window.hide() + + def setColor(self, red, green, blue, alpha): + self.__renderer.color = sdl2.ext.Color(red, green, blue, alpha) + + def fillColor(self, red, green, blue, alpha): + self.setColor(red, green, blue, alpha) + self.__renderer.clear() + + def fillbgColor(self, color, update=False): + if color in Colors: + self.__renderer.color = Colors[color] + self.__renderer.clear() + if update: + self.update() + + def update(self): + self.__renderer.present() + + def showImage(self, filename): + im = self.__factory.from_image(self.__resources.get_path(filename)) + self.__srender.render(im) + + +class SDL2_Test(object): + __sdl2 = None + + def __init__(self, parent): + self.__sdl2 = SDL2(parent) + + def Clear(self): + self.__sdl2.fillbgColor('black', True) + + def Paint(self, dcolor): + self.Clear() + self.__sdl2.fillbgColor(dcolor.lower(), True) + + +class SDL2_Message(object): + __sdl2 = None + __fontManager_w = None + __fontManager_b = None + + def __init__(self, parent): + self.__sdl2 = SDL2(parent) + self.__fontManager_w = self.__sdl2.createFont('OpenSans-Regular.ttf', 24, Colors['white']) + self.__fontManager_b = self.__sdl2.createFont('OpenSans-Regular.ttf', 24, Colors['black']) + + def Clear(self): + self.__sdl2.fillbgColor('black', True) + + def Paint(self, dcolor): + self.Clear() + self.__sdl2.fillbgColor(dcolor.lower(), True) + + def __write_w(self, x, y, text): + self.__sdl2.WriteText(text, x, y, self.__fontManager_w) + + def __write_b(self, x, y, text): + self.__sdl2.WriteText(text, x, y, self.__fontManager_b) + + def setTestOK(self, board_uuid, qrdata): + self.Paint('GREEN') + self.__write_b(100, 50, 'TEST OK') + self.__write_b(100, 100, 'uuid={}'.format(board_uuid)) + self.__write_b(100, 150, 'qr={}'.format(qrdata)) + self.__sdl2.update() + + def setTestERROR(self, error): + self.Paint('RED') + self.__write_w(100, 50, 'TEST FAILED') + self.__write_w(100, 100, '{}'.format(error)) + self.__sdl2.update() + diff --git a/test-cli/test/helpers/setup_xml.py b/test-cli/test/helpers/setup_xml.py index 603a2ac..d61d1fb 100644 --- a/test-cli/test/helpers/setup_xml.py +++ b/test-cli/test/helpers/setup_xml.py @@ -1,6 +1,5 @@ import xml.etree.ElementTree as XMLParser - class XMLSetup (object): """XML Setup Parser""" __tree = None # Parser diff --git a/test-cli/test/helpers/testsrv_db.py b/test-cli/test/helpers/testsrv_db.py index 9579e7e..c2b2276 100644 --- a/test-cli/test/helpers/testsrv_db.py +++ b/test-cli/test/helpers/testsrv_db.py @@ -24,6 +24,9 @@ class TestSrv_Database(object): self.__sqlObject = PgSQLConnection() return self.__sqlObject.db_connect(self.__xml_setup.getdbConnectionStr()) + def getConfig(self): + return self.__sqlObject.getConfig() + def create_board(self, processor_id, model_id, variant, station, bmac=None): '''create a new board''' if bmac is None: @@ -41,7 +44,8 @@ class TestSrv_Database(object): return res[0][0] except Exception as err: r = find_between(str(err), '#', '#') - # print(r) + print(r) + print(str(err)) return None def get_tests_list(self, board_uuid): @@ -95,10 +99,11 @@ class TestSrv_Database(object): sql = "SELECT isee.f_finish_test({},{},'{}','{}')".format(testid_ctl, testid, newstatus, textresult) # print('>>>' + sql) try: + print('finish_test => SQL {}'.format(sql)) self.__sqlObject.db_execute_query(sql) except Exception as err: r = find_between(str(err), '#', '#') - # print(r) + print('finish_test => {}'.format(err)) return None def upload_result_file(self, testid_ctl, testid, desc, filepath, mimetype): @@ -195,6 +200,18 @@ class TestSrv_Database(object): def change_station_state(self, station, newstate): sql = "SELECT station.setmystate('{}', '{}', NULL)".format(newstate, station) + print('>>>' + sql) + try: + res = self.__sqlObject.db_execute_query(sql) + print(res) + return res[0][0] + except Exception as err: + r = find_between(str(err), '#', '#') + print(r) + return None + + def get_setup_variable(self, skey): + sql = "SELECT * FROM admin.get_setupvar('{}')".format(skey) # print('>>>' + sql) try: res = self.__sqlObject.db_execute_query(sql) @@ -205,7 +222,7 @@ class TestSrv_Database(object): # print(r) return None - def get_setup_variable(self, skey): + def get_setup_variable(self, skey , default): sql = "SELECT * FROM admin.get_setupvar('{}')".format(skey) # print('>>>' + sql) try: @@ -215,4 +232,4 @@ class TestSrv_Database(object): except Exception as err: r = find_between(str(err), '#', '#') # print(r) - return None + return default diff --git a/test-cli/test/helpers/utils.py b/test-cli/test/helpers/utils.py new file mode 100644 index 0000000..8d2c27b --- /dev/null +++ b/test-cli/test/helpers/utils.py @@ -0,0 +1,68 @@ +import json +import socket +import os +import scanf + +def save_json_to_disk(filePath, description, mime, json_data, result): + with open(filePath, 'w') as outfile: + json.dump(json_data, outfile, indent=4) + outfile.close() + result.append( + { + "description": description, + "filepath": filePath, + "mimetype": mime + } + ) + +def save_file_to_disk(filePath, description, mime, data, result): + with open(filePath, 'w') as outfile: + outfile.write(data) + outfile.close() + result.append( + { + "description": description, + "filepath": filePath, + "mimetype": mime + } + ) + +def save_result (filePath, description, mime, result): + result.append( + { + "description": description, + "filepath": filePath, + "mimetype": mime + } + ) + + +def str2bool(v): + return v.lower() in ("yes", "true", "t", "1") + +def station2Port(base): + Name = socket.gethostname() + res = scanf.scanf("Station%d", Name) + if res[0]: + NmBase = int(base) + res = int(res[0]) + return str(NmBase + res) + return base + + +def sys_read(sysnode): + try: + f = open(sysnode, "r") + data = f.readline() + f.close() + except IOError as Error: + return False, '{}'.format(Error.errno) + return True, data + +def removefileIfExist(fname): + if os.path.exists(fname): + try: + os.remove(fname) + except OSError as error: + return False, str(error) + return True, '' diff --git a/test-cli/test/runners/simple.py b/test-cli/test/runners/simple.py index 2eb61a4..ece5a4e 100644 --- a/test-cli/test/runners/simple.py +++ b/test-cli/test/runners/simple.py @@ -5,6 +5,7 @@ Simple Test Runner for unittest module import sys import unittest +from test.helpers.iseelogger import logObj class SimpleTestRunner: @@ -59,28 +60,34 @@ class TextTestResult(unittest.TestResult): def addError(self, test, err): unittest.TestResult.addError(self, test, err) - # test.longMessage = err[1] + test.longMessage = err[1] self.result = self.ERROR - print(err[1]) + print(err) def addFailure(self, test, err): unittest.TestResult.addFailure(self, test, err) test.longMessage = err[1] self.result = self.FAIL + logObj.getlogger().info('{}:{}'.format(test, err)) def stopTest(self, test): - unittest.TestResult.stopTest(self, test) - # display: print test result - self.runner.writeUpdate(self.result) - # save result data - for result in test.getresults(): - self.__pgObj.upload_result_file(test.params["testidctl"], test.params["testid"], - result["description"], result["filepath"], result["mimetype"]) - # SEND TO DB THE RESULT OF THE TEST - if self.result == self.PASS: - status = "TEST_COMPLETE" - resulttext = test.gettextresult() - else: - status = "TEST_FAILED" - resulttext = test.longMessage - self.__pgObj.finish_test(test.params["testidctl"], test.params["testid"], status, resulttext) + try: + unittest.TestResult.stopTest(self, test) + # display: print test result + self.runner.writeUpdate(self.result) + # save result data + for result in test.getresults(): + self.__pgObj.upload_result_file(test.params["testidctl"], test.params["testid"], + result["description"], result["filepath"], result["mimetype"]) + # SEND TO DB THE RESULT OF THE TEST + if self.result == self.PASS: + status = "TEST_COMPLETE" + resulttext = test.gettextresult() + logObj.getlogger().info('{}:{}:{}:{}'.format(test.params["testidctl"], test.params["testid"], status, resulttext)) + else: + status = "TEST_FAILED" + resulttext = test.longMessage + logObj.getlogger().info('{}:{}:{}:{}'.format(test.params["testidctl"], test.params["testid"], status, resulttext)) + self.__pgObj.finish_test(test.params["testidctl"], test.params["testid"], status, resulttext) + except Exception as E: + logObj.getlogger().error('Exception: [{}]{}:{}:{}:{}'.format(E, test.params["testidctl"], test.params["testid"], status, resulttext)) \ No newline at end of file diff --git a/test-cli/test/tasks/flasheeprom.py b/test-cli/test/tasks/flasheeprom.py index feeb04c..775c556 100644 --- a/test-cli/test/tasks/flasheeprom.py +++ b/test-cli/test/tasks/flasheeprom.py @@ -2,55 +2,76 @@ import os import binascii from test.helpers.globalVariables import globalVar +class EEprom_Flasher: + __Name = None + __varlist = None + __xmlObj = None + __lastError = '' -def _generate_data_bytes(boarduuid, mac0, mac1): - data = bytearray() - data += (2029785358).to_bytes(4, 'big') # magicid --> 0x78FC110E - data += bytearray([0, 0, 0, 0]) # crc32 = 0 - data += bytearray(boarduuid + "\0", + def __init__(self, varlist): + self.__varlist = varlist + self.__xmlObj = varlist['xml'] + self.__Name = varlist.get('name_eeprom', self.__xmlObj.getKeyVal('EEProm_Flasher', 'name', 'EEProm_Flasher')) + # self.__igepflashPath = self.__xmlObj.getKeyVal('NAND_Flasher', 'igep_flash', '/usr/bin/igep-flash') + # self.__ImagePath = varlist.get('flashimagepath', '') + # self.__SkipNandtest = varlist.get('skipnandtest', self.__xmlObj.getKeyVal('NAND_Flasher', 'skipnandtest', '1')) + + def getTaskName(self): + return self.__Name + + def getError(self): + return self.__lastError + + def Execute(self): + return True, None + + def __generate_data_bytes(self, boarduuid, mac0, mac1): + data = bytearray() + data += (2029785358).to_bytes(4, 'big') # magicid --> 0x78FC110E + data += bytearray([0, 0, 0, 0]) # crc32 = 0 + data += bytearray(boarduuid + "\0", 'ascii') # uuid --> 'c0846c8a-5fa5-11ea-8576-f8b156ac62d7' and \0 at the end - data += binascii.unhexlify(mac0.replace(':', '')) # mac0 --> 'f8:b1:56:ac:62:d7' - if mac1 is not None: - data += binascii.unhexlify(mac1.replace(':', '')) # mac1 --> 'f8:b1:56:ac:62:d7' - else: - data += bytearray([0, 0, 0, 0, 0, 0]) # mac1 --> 0:0:0:0:0:0 - # calculate crc - crc = (binascii.crc32(data, 0)).to_bytes(4, 'big') - data[4:8] = crc - return data - - -def _generate_output_data(data_rx): - boarduuid = data_rx[8:44].decode('ascii') # no 8:45 porque el \0 final hace que no funcione postgresql - mac0 = binascii.hexlify(data_rx[45:51]).decode('ascii') - mac1 = binascii.hexlify(data_rx[51:57]).decode('ascii') - smac0 = ':'.join(mac0[i:i + 2] for i in range(0, len(mac0), 2)) - smac1 = ':'.join(mac1[i:i + 2] for i in range(0, len(mac1), 2)) - eepromdata = "UUID " + boarduuid + " , MAC0 " + smac0 + " , MAC1 " + smac1 - - return eepromdata - - -# returns exitcode and data saved into eeprom -def flash_eeprom(eeprompath, boarduuid, mac0, mac1=None): - print("Start programming Eeprom...") - # check if eeprompath is correct - if os.path.isfile(eeprompath): - # create u-boot data struct - data = _generate_data_bytes(boarduuid, mac0, mac1) - # write into eeprom and read back - f = open(eeprompath, "r+b") - f.write(data) - f.seek(0) - data_rx = f.read(57) - for i in range(57): - if data_rx[i] != data[i]: - print("Error while programming eeprom memory.") - return 1, None - print("Eeprom programmed succesfully.") - # generated eeprom read data - eepromdata = _generate_output_data(data_rx) - return 0, eepromdata - else: - print("Eeprom memory not found.") - return 1, None + data += binascii.unhexlify(mac0.replace(':', '')) # mac0 --> 'f8:b1:56:ac:62:d7' + if mac1 is not None: + data += binascii.unhexlify(mac1.replace(':', '')) # mac1 --> 'f8:b1:56:ac:62:d7' + else: + data += bytearray([0, 0, 0, 0, 0, 0]) # mac1 --> 0:0:0:0:0:0 + # calculate crc + crc = (binascii.crc32(data, 0)).to_bytes(4, 'big') + data[4:8] = crc + return data + + + def __generate_output_data(self, data_rx): + boarduuid = data_rx[8:44].decode('ascii') # no 8:45 porque el \0 final hace que no funcione postgresql + mac0 = binascii.hexlify(data_rx[45:51]).decode('ascii') + mac1 = binascii.hexlify(data_rx[51:57]).decode('ascii') + smac0 = ':'.join(mac0[i:i + 2] for i in range(0, len(mac0), 2)) + smac1 = ':'.join(mac1[i:i + 2] for i in range(0, len(mac1), 2)) + eepromdata = "UUID " + boarduuid + " , MAC0 " + smac0 + " , MAC1 " + smac1 + return eepromdata + + + # returns exitcode and data saved into eeprom + def flash_eeprom(self, eeprompath, boarduuid, mac0, mac1=None): + print("Start programming Eeprom...") + # check if eeprompath is correct + if os.path.isfile(eeprompath): + # create u-boot data struct + data = self.__generate_data_bytes(boarduuid, mac0, mac1) + # write into eeprom and read back + f = open(eeprompath, "r+b") + f.write(data) + f.seek(0) + data_rx = f.read(57) + for i in range(57): + if data_rx[i] != data[i]: + print("Error while programming eeprom memory.") + return 1, None + print("Eeprom programmed succesfully.") + # generated eeprom read data + eepromdata = self.__generate_output_data(data_rx) + return 0, eepromdata + else: + print("Eeprom memory not found.") + return 1, None diff --git a/test-cli/test/tasks/flashmemory.py b/test-cli/test/tasks/flashmemory.py index 3be56d7..ad9d92a 100644 --- a/test-cli/test/tasks/flashmemory.py +++ b/test-cli/test/tasks/flashmemory.py @@ -1,12 +1,44 @@ import sh +import os +class NAND_Flasher: + __Name = None + __varlist = None + __xmlObj = None + __igepflashPath = None + __lastError = '' + + def __init__(self, varlist): + self.__varlist = varlist + self.__xmlObj = varlist['xml'] + self.__Name = varlist.get('name_flashnand', self.__xmlObj.getKeyVal('NAND_Flasher', 'name', 'NAND_Flasher')) + self.__igepflashPath = self.__xmlObj.getKeyVal('NAND_Flasher', 'igep_flash', '/usr/bin/igep-flash') + self.__ImagePath = varlist.get('flashimagepath', '') + self.__SkipNandtest = varlist.get('skipnandtest', self.__xmlObj.getKeyVal('NAND_Flasher', 'skipnandtest', '1')) + + def getTaskName(self): + return self.__Name + + def getError(self): + return self.__lastError + + def Execute(self): + try: + self.__lastError = '' + if not os.path.isfile(self.__ImagePath): + self.__lastError('Not Image Found: {}'.format(self.__ImagePath)) + return False, None + if self.__SkipNandtest == '1': + p = sh.bash('{}'.format(self.__igepflashPath), "--skip-nandtest", "--image", self.__ImagePath) + else: + p = sh.bash('{}'.format(self.__igepflashPath), "--image", self.__ImagePath) + if p.exit_code != 0: + return False, None + except OSError as e: + self.__lastError('Exception: {}'.format(e.strerror)) + return False, None + except Exception as Error: + self.__lastError('Exception: {}'.format(Error)) + return False, None + return True, None -def flash_memory(imagepath): - print("Start programming Nand memory...") - p = sh.bash("/usr/bin/igep-flash", "--skip-nandtest", "--image", imagepath) - if p.exit_code != 0: - print("Flasher: Could not complete flashing task.") - return 1 - else: - print("Flasher: NAND memory flashed succesfully.") - return 0 diff --git a/test-cli/test/tests/qamper.py b/test-cli/test/tests/qamper.py index 58054c9..df340eb 100644 --- a/test-cli/test/tests/qamper.py +++ b/test-cli/test/tests/qamper.py @@ -5,6 +5,7 @@ from test.helpers.amper import Amper class Qamper(unittest.TestCase): params = None __resultlist = None # resultlist is a python list of python dictionaries + dummytest = False # varlist: undercurrent, overcurrent def __init__(self, testname, testfunc, varlist): @@ -21,8 +22,28 @@ class Qamper(unittest.TestCase): raise Exception('overcurrent param inside Qamp must be defined') self._testMethodDoc = testname self.__resultlist = [] + if "dummytest" in varlist: + self.dummytest = varlist["dummytest"] + + def saveresultinfile(self, currentvalue): + # save result in a file + with open('/mnt/station_ramdisk/amper.txt', 'w') as outfile: + n = outfile.write("Current: {} A".format(currentvalue)) + outfile.close() + self.__resultlist.append( + { + "description": "Amperimeter values", + "filepath": "/mnt/station_ramdisk/amper.txt", + "mimetype": "text/plain" + } + ) def execute(self): + if self.dummytest: + self.current = "0.0" + self.saveresultinfile(self.current) + return + amp = Amper() # open serial connection if not amp.open(): @@ -35,16 +56,7 @@ class Qamper(unittest.TestCase): # get current value (in Amperes) self.current = amp.getCurrent() # save result in a file - with open('/mnt/station_ramdisk/amper.txt', 'w') as outfile: - n = outfile.write("Current: {} A".format(self.current)) - outfile.close() - self.__resultlist.append( - { - "description": "Amperimeter values", - "filepath": "/mnt/station_ramdisk/amper.txt", - "mimetype": "text/plain" - } - ) + self.saveresultinfile(self.current) # close serial connection amp.close() # Check current range diff --git a/test-cli/test/tests/qaudio.py b/test-cli/test/tests/qaudio.py index 3d2113a..2c86809 100644 --- a/test-cli/test/tests/qaudio.py +++ b/test-cli/test/tests/qaudio.py @@ -2,56 +2,78 @@ import unittest import sh import wave import contextlib - - -def calc_audio_duration(fname): - with contextlib.closing(wave.open(fname, 'r')) as f: - frames = f.getnframes() - rate = f.getframerate() - duration = frames / float(rate) - f.close() - return duration - +import uuid +import time +from test.helpers.utils import save_result +from test.helpers.iseelogger import logObj class Qaudio(unittest.TestCase): params = None __resultlist = None # resultlist is a python list of python dictionaries __dtmf_secuence = ["1", "3", "5", "7", "9"] + __dtmf_file = None + __file_uuid = None + __QaudioName = None def __init__(self, testname, testfunc, varlist): self.params = varlist super(Qaudio, self).__init__(testfunc) self._testMethodDoc = testname self.__resultlist = [] + self.__xmlObj = varlist["xml"] + self.__QaudioName = varlist.get('name', 'qaudio') + self.__dtmf_file_play = varlist.get('play_dtmf_file', self.__xmlObj.getKeyVal(self.__QaudioName, "play_dtmf_file", "dtmf-13579.wav")) + self.__recordtime = varlist.get('recordtime', self.__xmlObj.getKeyVal(self.__QaudioName, "recordtime", "0.15")) + self.__dtmf_file_record = varlist.get('record_dtmf_file', self.__xmlObj.getKeyVal(self.__QaudioName, "record_dtmf_file", "record-{}.wav")) + self.__sampling_rate = varlist.get('rate', self.__xmlObj.getKeyVal(self.__QaudioName, "rate", "8000")) + self.__record_path = varlist.get('to', self.__xmlObj.getKeyVal(self.__QaudioName, "to", "/mnt/station_ramdisk")) + self.__run_delay = float(varlist.get('aplay_run_delay', self.__xmlObj.getKeyVal(self.__QaudioName, "aplay_run_delay", "0.0"))) + self.__file_uuid = uuid.uuid4() + self.__dtmf_file_record = self.__dtmf_file_record.format(self.__file_uuid) + + def restoreAlsaCtl(self): + try: + sh.alsactl('restore'); + except Exception as E: + logObj.getlogger().error("Failed to Execite alsactl restore"); def execute(self): - # analize audio file - recordtime = calc_audio_duration("/var/lib/hwtest-files/dtmf-13579.wav") + 0.15 + self.restoreAlsaCtl() + try: + # analize audio file + calcRecordTime = self.calc_audio_duration(self.__dtmf_file_play) + float(self.__recordtime) + float(0.1) + except TypeError as Error: + self.fail("failed: TypeError: {}.".Error) + # previous play to estabilise audio lines - p1 = sh.aplay("/var/lib/hwtest-files/dtmf-13579.wav", _bg=True) - p2 = sh.arecord("-r", 8000, "-d", recordtime, "/mnt/station_ramdisk/recorded.wav", _bg=True) - p1.wait() - p2.wait() - # play and record - p1 = sh.aplay("/var/lib/hwtest-files/dtmf-13579.wav", _bg=True) - p2 = sh.arecord("-r", 8000, "-d", recordtime, "/mnt/station_ramdisk/recorded.wav", _bg=True) - p1.wait() + p2 = sh.arecord("-r", self.__sampling_rate, "-d", calcRecordTime, "{}/{}".format(self.__record_path, self.__dtmf_file_record), _bg=True) + time.sleep(self.__run_delay) + p1 = sh.aplay(self.__dtmf_file_play) p2.wait() if p1.exit_code == 0 and p2.exit_code == 0: - p = sh.multimon("-t", "wav", "-a", "DTMF", "/mnt/station_ramdisk/recorded.wav", "-q") + p = sh.multimon("-t", "wav", "-a", "DTMF", "{}/{}".format(self.__record_path, self.__dtmf_file_record), "-q") if p.exit_code == 0: lines = p.stdout.decode('ascii').splitlines() self.__dtmf_secuence_result = [] for li in lines: if li.split(" ")[0] == "DTMF:": self.__dtmf_secuence_result.append(li.split(" ")[1]) + self.__dtmf_secuence_result = sorted(list(set(self.__dtmf_secuence_result))) # compare original and processed dtmf sequence if len(self.__dtmf_secuence) == len(self.__dtmf_secuence_result): for a, b in zip(self.__dtmf_secuence, self.__dtmf_secuence_result): if a != b: - self.fail("failed: sent and received DTMF sequence don't match.") + logObj.getlogger().debug( + "failed: sent and received DTMF sequence do not match.{} != {}".format( + self.__dtmf_secuence, self.__dtmf_secuence_result)) + self.fail("failed: sent and received DTMF sequence do not match.") + save_result(filePath='{}/{}'.format(self.__record_path, self.__dtmf_file_record), + description='sound record', + mime='audio/x-wav', + result=self.__resultlist) else: - self.fail("failed: received DTMF sequence is shorter than expected.") + logObj.getlogger().debug("received DTMF sequence is shorter than expected.{} {}".format(self.__dtmf_secuence,self.__dtmf_secuence_result)) + self.fail("received DTMF sequence is shorter than expected") else: self.fail("failed: unable to use multimon command.") else: @@ -62,3 +84,12 @@ class Qaudio(unittest.TestCase): def gettextresult(self): return "" + + def calc_audio_duration(self, fname): + duration = 0.0 + with contextlib.closing(wave.open(fname, 'r')) as f: + frames = f.getnframes() + rate = f.getframerate() + duration = frames / float(rate) + f.close() + return duration diff --git a/test-cli/test/tests/qdmesg.py b/test-cli/test/tests/qdmesg.py index b35f1ff..39a5047 100644 --- a/test-cli/test/tests/qdmesg.py +++ b/test-cli/test/tests/qdmesg.py @@ -1,4 +1,5 @@ import sh +import os import os.path from os import path @@ -9,24 +10,23 @@ class Qdmesg: def __init__(self, testname, testfunc, varlist): self.params = varlist - self._testMethodDoc = testname + self.__testMethodDoc = testname self.__resultlist = [] self.pgObj = varlist["db"] + self.__xmlObj = varlist["xml"] + self.__QdmesgName = varlist.get('name', 'qdmesg') + self.__syslog_dmesg_file = varlist.get('syslog_dmesg',self.__xmlObj.getKeyVal(self.__QdmesgName, "syslog_dmesg", + "/var/log/kern.log")) + + def getTestName(self): + return self.__testMethodDoc def execute(self): - print("running dmesg test") self.pgObj.run_test(self.params["testidctl"], self.params["testid"]) - # delete previous file - if path.exists("/mnt/station_ramdisk/dmesg.txt"): - os.remove("/mnt/station_ramdisk/dmesg.txt") - # generate file - p = sh.dmesg("--color=never", _out="/mnt/station_ramdisk/dmesg.txt") - if p.exit_code == 0: - # save dmesg result in DB - self.pgObj.upload_result_file(self.params["testidctl"], self.params["testid"], "dmesg output", - "/mnt/station_ramdisk/dmesg.txt", "text/plain") - self.pgObj.finish_test(self.params["testidctl"], self.params["testid"], "TEST_COMPLETE", "") - print("success dmesg test") - else: + if not os.path.isfile('{}'.format(self.__syslog_dmesg_file)): self.pgObj.finish_test(self.params["testidctl"], self.params["testid"], "TEST_FAILED", "") - print("fail dmesg test") + return False + self.pgObj.upload_result_file(self.params["testidctl"], self.params["testid"], "{}".format(self.__syslog_dmesg_file), + "{}".format(self.__syslog_dmesg_file), "text/plain") + self.pgObj.finish_test(self.params["testidctl"], self.params["testid"], "TEST_COMPLETE", "") + return True diff --git a/test-cli/test/tests/qeeprom.py b/test-cli/test/tests/qeeprom.py index 7fc9fcd..7900381 100644 --- a/test-cli/test/tests/qeeprom.py +++ b/test-cli/test/tests/qeeprom.py @@ -1,46 +1,65 @@ -import sh import unittest - +import os class Qeeprom(unittest.TestCase): params = None - __position = None - __eeprompath = None __resultlist = None # resultlist is a python list of python dictionaries + __QEepromName = None + __i2cBus = None + __device = None # varlist: position, eeprompath def __init__(self, testname, testfunc, varlist): self.params = varlist super(Qeeprom, self).__init__(testfunc) self._testMethodDoc = testname + self.__xmlObj = varlist["xml"] + self.__QEepromName = varlist.get('name', 'qeeprom') + self.__i2cAddress = varlist.get('i2c_address', self.__xmlObj.getKeyVal(self.__QEepromName, "i2c_address", "0050")) + self.__i2cBus = varlist.get('i2c_bus', self.__xmlObj.getKeyVal(self.__QEepromName, "i2c_bus", "0")) + self.__device = '/sys/bus/i2c/devices/{}-{}/eeprom'.format(self.__i2cBus, self.__i2cAddress) + self.__resultlist = [] - if "position" in varlist: - self.__position = varlist["position"] - else: - raise Exception('position param inside Qeeprom must be defined') - if "eeprompath" in varlist: - self.__eeprompath = varlist["eeprompath"] - else: - raise Exception('eeprompath param inside Qeeprom must be defined') + + def __check_device(self): + if os.path.isfile(self.__device): + return True + return False + + def __eeprom_write(self, data): + try: + f = open(self.__device, "wb") + f.write(data) + f.close() + except IOError as Error: + return False, '{}'.format(Error.errno) + return True, '' + + def __eeprom_read(self, size): + try: + f = open(self.__device, "rb") + data = f.read(size) + f.close() + except IOError as Error: + return False, '{}'.format(Error.errno) + return True, data + def execute(self): - # generate some data - data_tx = "isee_test" - # write data into the eeprom - p = sh.dd(sh.echo(data_tx), "of=" + self.__eeprompath, "bs=1", "seek=" + self.__position) - if p.exit_code == 0: - # read data from the eeprom - p = sh.dd("if=" + self.__eeprompath, "bs=1", "skip=" + self.__position, "count=" + str(len(data_tx))) - if p.exit_code == 0: - data_rx = p.stdout.decode('ascii') - # compare both values - if data_rx != data_tx: - # Both data are different - self.fail("failed: mismatch between written and received values.") - else: - self.fail("failed: Unable to read from the EEPROM device.") - else: - self.fail("failed: Unable to write on the EEPROM device.") + if not self.__check_device(): + self.fail("cannot open device {}".format(self.__device)) + + data = bytes('AbcDeFgHijK098521', 'utf-8') + + v, w = self.__eeprom_write(data) + if not v: + self.fail("eeprom: {} write test failed".format(w)) + v, r = self.__eeprom_read(len(data)) + if not v: + self.fail("eeprom: {} read test failed".format(r)) + if r != data: + self.fail("mismatch between write and read bytes") + def getresults(self): return self.__resultlist diff --git a/test-cli/test/tests/qethernet.py b/test-cli/test/tests/qethernet.py index 81acef1..7d081a1 100644 --- a/test-cli/test/tests/qethernet.py +++ b/test-cli/test/tests/qethernet.py @@ -2,7 +2,11 @@ import unittest import sh import json import time - +import uuid +import iperf3 +import netifaces +from test.helpers.utils import save_json_to_disk +from test.helpers.utils import station2Port class Qethernet(unittest.TestCase): __serverip = None @@ -16,29 +20,66 @@ class Qethernet(unittest.TestCase): # varlist content: serverip, bwexpected, port def __init__(self, testname, testfunc, varlist): - # save the parameters list self.params = varlist - # configure the function to be executed when the test runs. "testfunc" is a name of a method inside this - # class, that in this situation, it can only be "execute". super(Qethernet, self).__init__(testfunc) - # validate and get the parameters - if "serverip" in varlist: - self.__serverip = varlist["serverip"] - else: - raise Exception('sip param inside Qethernet have been be defined') - if "bwexpected" in varlist: - self.__bwexpected = varlist["bwexpected"] - else: - raise Exception('bwexpected param inside Qethernet must be defined') - if "port" in varlist: - self.__port = varlist["port"] - else: - raise Exception('port param inside Qethernet must be defined') - self.__numbytestx = "10M" self._testMethodDoc = testname + self.__xmlObj = varlist["xml"] + self.__QEthName = varlist.get('name', 'qeth') + self.__loops = varlist.get('loops', self.__xmlObj.getKeyVal(self.__QEthName, "loops", "1")) + self.__port = varlist.get('port', self.__xmlObj.getKeyVal(self.__QEthName, "port", "5000")) + self.__bw = varlist.get('bwexpected', self.__xmlObj.getKeyVal(self.__QEthName, "bwexpected", "40.0")) + self.__serverip = varlist.get('serverip', self.__xmlObj.getKeyVal(self.__QEthName, "serverip", "localhost")) + self.__duration = varlist.get('duration', self.__xmlObj.getKeyVal(self.__QEthName, "duration", "10")) + self.__interface = varlist.get('interface', self.__xmlObj.getKeyVal(self.__QEthName, "interface", "eth0")) + self.__toPath = varlist.get('to', self.__xmlObj.getKeyVal(self.__QEthName, "to", "/mnt/station_ramdisk")) + self.__retriesCount = varlist.get('retries', self.__xmlObj.getKeyVal(self.__QEthName, "retries", "5")) + self.__waitRetryTime = varlist.get('wait_retry', self.__xmlObj.getKeyVal(self.__QEthName, "wait_retry", "10")) + self.__wifi_res_file = varlist.get('eth_res_file', self.__xmlObj.getKeyVal(self.__QEthName, "eth_res_file", "eth_test_{}.json")) + self.__wifi_st_file = varlist.get('eth_st_file', self.__xmlObj.getKeyVal(self.__QEthName, "eth_st_file", "eth_st_{}.json")) + self.__file_uuid = uuid.uuid4() + self.__wifi_res_file = self.__wifi_res_file.format(self.__file_uuid) + self.__wifi_st_file = self.__wifi_st_file.format(self.__file_uuid) + self.__resultlist = [] + def checkInterface(self, reqInterface): + ifaces = netifaces.interfaces() + for t in ifaces: + if t == reqInterface: + return True + return False + def execute(self): + self.__resultlist = [] + # check interfaces + if not self.checkInterface(self.__interface): + self.fail('Interface not found: {}'.format(self.__interface)) + + # Start Iperf + client = iperf3.Client() + client.duration = int(self.__duration) + client.server_hostname = self.__serverip + client.port = station2Port(self.__port) + count = int(self.__retriesCount) + while count > 0: + result = client.run() + if result.error == None: + if result.received_Mbps >= float(self.__bw) and result.sent_Mbps >= float(self.__bw): + save_json_to_disk(filePath='{}/{}'.format(self.__toPath, self.__wifi_res_file), + description='eth {} iperf3'.format(self.__interface), + mime='application/json', + json_data=result.json, + result=self.__resultlist) + return + else: + self.fail('bw fail: rx:{} tx:{}'.format(result.received_Mbps, result.sent_Mbps)) + else: + count = count - 1 + time.sleep(int(self.__waitRetryTime)) + + self.fail('qEth (max tries) Execution error: {}'.format(result.error)) + + def execute2(self): # execute iperf command against the server, but it implements attempts in case the server is busy iperfdone = False while not iperfdone: diff --git a/test-cli/test/tests/qmmcflash.py b/test-cli/test/tests/qmmcflash.py index f10757e..0f5a0c1 100644 --- a/test-cli/test/tests/qmmcflash.py +++ b/test-cli/test/tests/qmmcflash.py @@ -1,5 +1,10 @@ import unittest -import sh +import scanf +from scanf import scanf +from sh import mmc +from sh import ErrorReturnCode +from test.helpers.utils import save_file_to_disk +from test.helpers.utils import sys_read class Qmmcflash(unittest.TestCase): params = None @@ -8,8 +13,43 @@ class Qmmcflash(unittest.TestCase): self.params = varlist super(Qmmcflash, self).__init__(testfunc) self._testMethodDoc = testname - # TODO + self.__xmlObj = varlist["xml"] + self.__QeMMCName = varlist.get('name', 'qemmc') + self.__emmcDevice = varlist.get('device', self.__xmlObj.getKeyVal(self.__QeMMCName, "device", "mmcblk0")) + self.__toPath = varlist.get('to', self.__xmlObj.getKeyVal(self.__QeMMCName, "to", "/mnt/station_ramdisk")) + self.__mmc_res_file = varlist.get('emmc_res_file', self.__xmlObj.getKeyVal(self.__QeMMCName, "emmc_res_file", "emmc_status.txt")) + self.__mmcPort = varlist.get('mmc_port', self.__xmlObj.getKeyVal(self.__QeMMCName, "mmc_port", "0")) + self.__mmcID = varlist.get('mmc_id', self.__xmlObj.getKeyVal(self.__QeMMCName, "mmc_id", "0001")) + + self.__resultlist = [] def execute(self): - # TODO \ No newline at end of file + try: + dataOut = mmc('extcsd', 'read', '/dev/{}'.format(self.__emmcDevice)) + save_file_to_disk(filePath='{}/{}'.format(self.__toPath, self.__mmc_res_file), + description='eMMC health test', + mime='text/plain', + data=dataOut.stdout.decode('utf-8'), + result=self.__resultlist) + sysDevice = "/sys/bus/mmc/drivers/mmcblk/mmc{}:{}".format(self.__mmcPort, self.__mmcID) + r, data = sys_read("{}/life_time".format(sysDevice)) + if not r: + self.fail("emmc: life_time not found") + res = scanf("0x%d 0x%d", data) + if res[0] > 3 or res[1] > 3: + self.fail("emmc: review {} life_time > 3".format(sysDevice)) + r, data = sys_read("{}/pre_eol_info".format(sysDevice)) + if not r: + self.fail("emmc: pre_eol_info not found") + res = scanf("0x%d", data) + if res[0] != 1: + self.fail("emmc: review {} pre_eol_info != 1".format(sysDevice)) + except ErrorReturnCode as Error: + self.fail("emmc: failed {} ".format(Error.exit_code)) + + def getresults(self): + return self.__resultlist + + def gettextresult(self): + return "" \ No newline at end of file diff --git a/test-cli/test/tests/qnand.py b/test-cli/test/tests/qnand.py index 988ab60..6de1eaf 100644 --- a/test-cli/test/tests/qnand.py +++ b/test-cli/test/tests/qnand.py @@ -1,6 +1,7 @@ import unittest import sh - +import uuid +from test.helpers.utils import save_file_to_disk class Qnand(unittest.TestCase): params = None @@ -11,29 +12,34 @@ class Qnand(unittest.TestCase): def __init__(self, testname, testfunc, varlist): self.params = varlist super(Qnand, self).__init__(testfunc) - if "device" in varlist: - self.__device = varlist["device"] - else: - raise Exception('device param inside Qnand must be defined') self._testMethodDoc = testname + self.__xmlObj = varlist["xml"] + self.__QNandName = varlist.get('name', 'qnand') + self.__device = varlist.get('device', self.__xmlObj.getKeyVal(self.__QNandName, "device", "/dev/mtd0")) + self.__nand_res_file = varlist.get('nand_res_file', self.__xmlObj.getKeyVal(self.__QNandName, "nand_res_file", "nand_test_{}.txt")) + self.__toPath = varlist.get('to', self.__xmlObj.getKeyVal(self.__QNandName, "to", "/mnt/station_ramdisk")) + self.__file_uuid = uuid.uuid4() + self.__nand_res_file = self.__nand_res_file.format(self.__file_uuid) + nandtestPath = self.__xmlObj.getKeyVal("qnand", "nandtestPath", '/root/hwtest-files/nandtest"') + + if nandtestPath == '': + self.myNandTest = sh.nandtest + else: + self.myNandTest = sh.Command(nandtestPath) self.__resultlist = [] def execute(self): try: - p = sh.nandtest("-m", self.__device) - # save result - with open('/mnt/station_ramdisk/nand-nandtest.txt', 'w') as outfile: - n = outfile.write(p.stdout.decode('ascii')) - outfile.close() - self.__resultlist.append( - { - "description": "nandtest output", - "filepath": "/mnt/station_ramdisk/nand-nandtest.txt", - "mimetype": "text/plain" - } - ) - except sh.ErrorReturnCode as e: - self.fail("failed: could not complete nandtest command") + res = self.myNandTest("-m", self.__device) + save_file_to_disk(filePath='{}/{}'.format(self.__toPath, self.__nand_res_file), + description='nand {} test'.format(self.__device), + mime='text/plain', + data=res.stdout.decode('utf-8'), + result=self.__resultlist) + + except sh.ErrorReturnCode_1 as e: + self.fail("nandtest failed: {}".format(e.ErrorReturnCode.stdout)) + def getresults(self): return self.__resultlist diff --git a/test-cli/test/tests/qplc.py b/test-cli/test/tests/qplc.py new file mode 100644 index 0000000..01258a9 --- /dev/null +++ b/test-cli/test/tests/qplc.py @@ -0,0 +1,70 @@ +import shutil +import unittest +import os.path +import os +import sh + +from test.helpers.md5 import md5_file +from test.helpers.plc import dcpPLC + +class Qplc(unittest.TestCase): + params = None + __resultlist = None # resultlist is a python list of python dictionaries + __QPLCName = None + __plc = None + + def __init__(self, testname, testfunc, varlist): + self.params = varlist + super(Qplc, self).__init__(testfunc) + self._testMethodDoc = testname + self.__xmlObj = varlist["xml"] + + self.__QPLCName = varlist.get('name', 'qplc') + self.__firmware = varlist.get('firmware', self.__xmlObj.getKeyVal(self.__QPLCName, "firmware", "")) + self.__firmware_md5 = varlist.get('firmware_md5', self.__xmlObj.getKeyVal(self.__QPLCName, "firmware_md5", "")) + self.__factoryTool = varlist.get('factory_tool', self.__xmlObj.getKeyVal(self.__QPLCName, "factory_tool", "")) + self.__gen_ip = varlist.get('gen_ip', self.__xmlObj.getKeyVal(self.__QPLCName, "gen_ip", "0")) + self.__gen_mac = varlist.get('gen_mac', self.__xmlObj.getKeyVal(self.__QPLCName, "gen_mac", "0")) + self.__mtd_device = varlist.get('mtd_device', self.__xmlObj.getKeyVal(self.__QPLCName, "mtd_device", "/dev/mtd0")) + self.__plc = dcpPLC(self.__factoryTool, self.__mtd_device) + + self.__resultlist = [] + + + def checkfiles(self): + if not os.path.isfile('{}/{}'.format(self.__filepath_from, self.__file_source)): + return False, 'File test binary Not found {}/{}'.format(self.__filepath_from, self.__file_source) + if not os.path.isfile('{}/{}'.format(self.__filepath_from, self.__file_md5)): + return False, 'File test binary md5 Not found {}/{}'.format(self.__filepath_from, self.__file_md5) + if self.__check_mounted != '': + if not os.path.ismount('{}'.format(self.__check_mounted)): + return False, 'Required mounted failed: {}'.format(self.__path_to) + r, s = self.removefileIfExist('{}/{}'.format(self.__path_to, self.__file_source)) + if not r: + return r, s + return True, '' + + def __flashMemory(self): + self.__plc.setSaveFirmwareMode() + r, v = self.__plc.SaveFirmware(self.__firmware) + if not r: + self.fail(v) + + def execute(self): + # Check files + v, m = self.checkfiles() + if not v: + self.fail(m) + # do flash & verify + self.__flashMemory() + # do Plc boot + self.__plc.setBootMode() + # Wait plc goes UP + + + + def getresults(self): + return self.__resultlist + + def gettextresult(self): + return "" diff --git a/test-cli/test/tests/qram.py b/test-cli/test/tests/qram.py index 21a01c8..49d4d54 100644 --- a/test-cli/test/tests/qram.py +++ b/test-cli/test/tests/qram.py @@ -1,33 +1,59 @@ import unittest import sh - +# from test.helpers.iseelogger import MeasureTime +# from test.helpers.iseelogger import logObj +from test.helpers.utils import save_file_to_disk class Qram(unittest.TestCase): params = None - __memsize = "10M" - __loops = "1" - __resultlist = None # resultlist is a python list of python dictionaries + __memsize = None + __loops = None + __resultlist = [] # resultlist is a python list of python dictionaries # varlist: memsize, loops def __init__(self, testname, testfunc, varlist): self.params = varlist super(Qram, self).__init__(testfunc) - if "memsize" in varlist: - self.__memsize = varlist["memsize"] - else: - raise Exception('memsize param inside Qram must be defined') - if "loops" in varlist: - self.__loops = varlist["loops"] - else: - raise Exception('loops param inside Qram must be defined') self._testMethodDoc = testname - self.__resultlist = [] + self.__xmlObj = varlist["xml"] + + self.__QramName = varlist.get('name', 'qram') + self.__loops = varlist.get('loops', self.__xmlObj.getKeyVal(self.__QramName, "loops", "1")) + self.__memsize = varlist.get('memsize', self.__xmlObj.getKeyVal(self.__QramName, "memsize", "50M")) + self.__toPath = varlist.get('to', self.__xmlObj.getKeyVal(self.__QramName, "to", "/mnt/station_ramdisk")) + self.__ram_res_file = varlist.get('ram_res_file', self.__xmlObj.getKeyVal(self.__QramName, "ram_res_file", "ram_res.txt")) + + self.__dummytest = varlist.get('dummytest', self.__xmlObj.getKeyVal(self.__QramName, "dummytest", "0")) + self.__dummyresult = varlist.get('dummytestresult', self.__xmlObj.getKeyVal(self.__QramName, "dummytest", "0")) + + memtesterPath = self.__xmlObj.getKeyVal(self.__QramName, "memtesterPath", '') + + if memtesterPath == '': + self.myMemtester = sh.memtester + else: + self.myMemtester = sh.Command(memtesterPath) def execute(self): + self.__resultlist = [] try: - p = sh.memtester(self.__memsize, "1", _out="/dev/null") + # mytime = MeasureTime() + res = self.myMemtester(self.__memsize, '{}'.format(self.__loops)) + save_file_to_disk(filePath='{}/{}'.format(self.__toPath, self.__ram_res_file), + description='Ram result test', + mime='text/plain', + data=res.stdout.decode('utf-8'), + result=self.__resultlist) + # mytime.stop() except sh.ErrorReturnCode as e: - self.fail("failed: could not complete memtester command") + save_file_to_disk(filePath='{}/{}'.format(self.__toPath, self.__ram_res_file), + description='Ram result test', + mime='text/plain', + data=e.stdout.decode('utf-8'), + result=self.__resultlist) + self.fail("failed: memtester {}::{}".format(str(e.exit_code), e.stdout.decode('utf-8'))) + + except Exception as details: + self.fail('Error: {}'.format(details)) def getresults(self): return self.__resultlist diff --git a/test-cli/test/tests/qusb.py b/test-cli/test/tests/qusb.py index 6302012..3182685 100644 --- a/test-cli/test/tests/qusb.py +++ b/test-cli/test/tests/qusb.py @@ -1,91 +1,90 @@ -import sh +import shutil import unittest -from test.helpers.usb import USBDevices import os.path +import os +import sh +from test.helpers.md5 import md5_file class Qusb(unittest.TestCase): params = None __resultlist = None # resultlist is a python list of python dictionaries + __QusbName = None def __init__(self, testname, testfunc, varlist): self.params = varlist super(Qusb, self).__init__(testfunc) self._testMethodDoc = testname - if "repetitions" in varlist: - self.__repetitions = varlist["repetitions"] - else: - raise Exception('repetitions param inside Qusb must be defined') + self.__xmlObj = varlist["xml"] + + self.__QusbName = varlist.get('name', 'qusb') + self.__loops = varlist.get('loops', self.__xmlObj.getKeyVal(self.__QusbName, "loops", "3")) + self.__filepath_from = varlist.get('path', self.__xmlObj.getKeyVal(self.__QusbName, "path", "/root/hwtest-files")) + self.__file_source = varlist.get('file', self.__xmlObj.getKeyVal(self.__QusbName, "file", "usb-test.bin")) + self.__file_md5 = varlist.get('file_md5', self.__xmlObj.getKeyVal(self.__QusbName, "file_md5", "usb-test.bin.md5")) + self.__path_to = varlist.get('pathto', self.__xmlObj.getKeyVal(self.__QusbName, "pathto", "/mnt/test/disk2")) + self.__check_mounted = varlist.get('check_mounted', self.__xmlObj.getKeyVal(self.__QusbName, "check_mounted", "")) + self.__resultlist = [] - def execute(self): - # get usb device - dev_obj = USBDevices(self.params["xml"]) - if dev_obj.getMassStorage(): - device = dev_obj.getMassStorage()['disk'] + "1" - else: - self.fail("failed: No USB memory found.") - # check if the device is mounted, and umount it - try: - p = sh.findmnt("-n", device) - if p.exit_code == 0: - sh.umount(device) - except sh.ErrorReturnCode as e: - # error = 1 means "no found" - pass - # mount the device - sh.mkdir("-p", "/mnt/station_ramdisk/pendrive") - p = sh.mount(device, "/mnt/station_ramdisk/pendrive") - if p.exit_code != 0: - self.fail("failed: Unable to mount the USB memory device.") - # execute test - for i in range(int(self.__repetitions)): - # copy files + def removefileIfExist(self, fname): + if os.path.exists(fname): try: - p = sh.cp("/var/lib/hwtest-files/usbdatatest.bin", - "/var/lib/hwtest-files/usbdatatest.bin.md5", - "/mnt/station_ramdisk/pendrive") - except sh.ErrorReturnCode as e: - try: - sh.umount("/mnt/station_ramdisk/pendrive") - except sh.ErrorReturnCode: - pass - sh.rmdir("/mnt/station_ramdisk/pendrive") - self.fail("failed: Unable to copy files to the USB memory device.") - # check if the device is still mounted - if not os.path.ismount("/mnt/station_ramdisk/pendrive"): - sh.rm("/mnt/station_ramdisk/pendrive/*") - sh.rmdir("/mnt/station_ramdisk/pendrive") - self.fail("failed: USB device unmounted during/after copying files.") - # check MD5 - try: - p = sh.md5sum("/mnt/station_ramdisk/pendrive/usbdatatest.bin") - except sh.ErrorReturnCode as e: - try: - sh.umount("/mnt/station_ramdisk/pendrive") - except sh.ErrorReturnCode: - pass - sh.rmdir("/mnt/station_ramdisk/pendrive") - self.fail("failed: Unable to calculate MD5 of the copied file.") - newmd5 = p.stdout.decode().split(" ")[0] - with open('/mnt/station_ramdisk/pendrive/usbdatatest.bin.md5', 'r') as outfile: - oldmd5 = outfile.read().rstrip("\n") - outfile.close() - if newmd5 != oldmd5: - try: - sh.umount("/mnt/station_ramdisk/pendrive") - except sh.ErrorReturnCode: - pass - sh.rmdir("/mnt/station_ramdisk/pendrive") - self.fail("failed: MD5 check failed.") - # delete copied files - sh.rm("-f", "/mnt/station_ramdisk/pendrive/usbdatatest.bin", "/mnt/station_ramdisk/pendrive/usbdatatest.bin.md5") - # Finish + os.remove(fname) + except OSError as error: + return False, str(error) + return True, '' + + def checkfiles(self): + if not os.path.isfile('{}/{}'.format(self.__filepath_from, self.__file_source)): + return False, 'File test binary Not found {}/{}'.format(self.__filepath_from, self.__file_source) + if not os.path.isfile('{}/{}'.format(self.__filepath_from, self.__file_md5)): + return False, 'File test binary md5 Not found {}/{}'.format(self.__filepath_from, self.__file_md5) + if self.__check_mounted != '': + if not os.path.ismount('{}'.format(self.__check_mounted)): + return False, 'Required mounted failed: {}'.format(self.__path_to) + r, s = self.removefileIfExist('{}/{}'.format(self.__path_to, self.__file_source)) + if not r: + return r, s + return True, '' + + def getTestFile_md5(self, fname): + t = '' + with open('{}'.format(fname), 'r') as outfile: + t = outfile.read().rstrip("\n") + outfile.close() + return t + + def ExecuteTranfer(self): try: - sh.umount("/mnt/station_ramdisk/pendrive") - except sh.ErrorReturnCode: - pass - sh.rmdir("/mnt/station_ramdisk/pendrive") + originalMD5 = self.getTestFile_md5('{}/{}'.format(self.__filepath_from, self.__file_md5)) + if originalMD5 == '': + self.fail('MD5 file {}/{} Not contain any md5'.format(self.__filepath_from, self.__file_md5)) + # shutil.copy2('{}/{}'.format(self.__filepath_from, self.__file_source), '{}'.format(self.__path_to)) + sh.cp('{}/{}'.format(self.__filepath_from, self.__file_source), '{}'.format(self.__path_to)) + sh.sync() + md5val = md5_file('{}/{}'.format(self.__path_to, self.__file_source)) + self.removefileIfExist('{}/{}'.format(self.__path_to, self.__file_source)) + if originalMD5 != md5val: + return False, 'md5 check failed {}<>{}'.format(originalMD5, md5val) + except OSError as why: + return False,'Error: {} tranfer failed'.format(why.errno) + except Exception as details: + return False, 'USB Exception: {} tranfer failed'.format(details) + return True, '' + + def execute(self): + v, m = self.checkfiles() + if not v: + self.fail(m) + countLoops = int(self.__loops) + while countLoops > 0: + res, msg = self.ExecuteTranfer() + if not res: + res, msg = self.ExecuteTranfer() + if not res: + self.fail(msg) + countLoops = countLoops - 1 def getresults(self): return self.__resultlist diff --git a/test-cli/test/tests/qusbdual.py b/test-cli/test/tests/qusbdual.py deleted file mode 100644 index 05b22c3..0000000 --- a/test-cli/test/tests/qusbdual.py +++ /dev/null @@ -1,90 +0,0 @@ -import sh -import unittest -import os.path -import time - - -class Qusbdual(unittest.TestCase): - params = None - __resultlist = None # resultlist is a python list of python dictionaries - - def __init__(self, testname, testfunc, varlist): - self.params = varlist - super(Qusbdual, self).__init__(testfunc) - self._testMethodDoc = testname - if "repetitions" in varlist: - self.__repetitions = varlist["repetitions"] - else: - raise Exception('repetitions param inside Qusbdual must be defined') - self.__resultlist = [] - - def execute(self): - # check if file-as-filesystem exists - if not os.path.isfile('/var/lib/hwtest-files/mass-otg-test.img'): - self.fail("failed: Unable to find file-as-filesystem image.") - # generate mass storage gadget - p = sh.modprobe("g_mass_storage", "file=/var/lib/hwtest-files/mass-otg-test.img") - if p.exit_code != 0: - self.fail("failed: Unable to create a mass storage gadget.") - # wait to detect the new device - time.sleep(3) - # find the mass storage device - try: - p = sh.grep(sh.lsblk("-So", "NAME,VENDOR"), "Linux") - except sh.ErrorReturnCode: - sh.modprobe("-r", "g_mass_storage") - self.fail("failed: could not find any mass storage gadget") - device = p.stdout.decode().split(" ")[0] - # mount the mass storage gadget - sh.mkdir("-p", "/mnt/station_ramdisk/hdd_gadget") - p = sh.mount("-o", "ro", "/dev/" + device, "/mnt/station_ramdisk/hdd_gadget") - if p.exit_code != 0: - sh.modprobe("-r", "g_mass_storage") - self.fail("failed: Unable to mount the mass storage gadget.") - # execute test - for i in range(int(self.__repetitions)): - # copy files - try: - p = sh.cp("/mnt/station_ramdisk/hdd_gadget/usb-test.bin", - "/mnt/station_ramdisk/hdd_gadget/usb-test.bin.md5", - "/mnt/station_nfsdisk/") - except sh.ErrorReturnCode as e: - sh.umount("/mnt/station_ramdisk/hdd_gadget") - sh.rmdir("/mnt/station_ramdisk/hdd_gadget") - sh.modprobe("-r", "g_mass_storage") - self.fail("failed: Unable to copy files through USB.") - # check if the device is still mounted - if not os.path.ismount("/mnt/station_ramdisk/hdd_gadget"): - sh.rm("/mnt/station_ramdisk/hdd_gadget/*") - sh.rmdir("/mnt/station_ramdisk/hdd_gadget") - sh.modprobe("-r", "g_mass_storage") - self.fail("failed: USB device unmounted during/after copying files.") - # Check md5 - try: - p = sh.md5sum("/mnt/station_nfsdisk/usb-test.bin") - except sh.ErrorReturnCode as e: - sh.umount("/mnt/station_ramdisk/hdd_gadget") - sh.rmdir("/mnt/station_ramdisk/hdd_gadget") - sh.modprobe("-r", "g_mass_storage") - self.fail("failed: Unable to calculate MD5 of the copied file.") - newmd5 = p.stdout.decode().split(" ")[0] - with open('/mnt/station_ramdisk/hdd_gadget/usb-test.bin.md5', 'r') as outfile: - oldmd5 = outfile.read().rstrip("\n") - outfile.close() - if newmd5 != oldmd5: - sh.umount("/mnt/station_ramdisk/hdd_gadget") - sh.rmdir("/mnt/station_ramdisk/hdd_gadget") - sh.modprobe("-r", "g_mass_storage") - self.fail("failed: MD5 check failed.") - # delete copied files - sh.rm("-f", "/mnt/station_nfsdisk/usb-test.bin", "/mnt/station_nfsdisk/usb-test.bin.md5") - # Finish - sh.umount("/mnt/station_ramdisk/hdd_gadget") - sh.rmdir("/mnt/station_ramdisk/hdd_gadget") - sh.modprobe("-r", "g_mass_storage") - - def getresults(self): - return self.__resultlist - - def gettextresult(self): - return "" diff --git a/test-cli/test/tests/qwifi.py b/test-cli/test/tests/qwifi.py index 3dd9e38..4695041 100644 --- a/test-cli/test/tests/qwifi.py +++ b/test-cli/test/tests/qwifi.py @@ -1,9 +1,11 @@ import unittest -import sh -import re -import json import time - +import uuid +import iperf3 +import netifaces +from test.helpers.iw import iwScan +from test.helpers.utils import save_json_to_disk +from test.helpers.utils import station2Port class Qwifi(unittest.TestCase): __serverip = None @@ -19,81 +21,76 @@ class Qwifi(unittest.TestCase): def __init__(self, testname, testfunc, varlist): self.params = varlist super(Qwifi, self).__init__(testfunc) - if "serverip" in varlist: - self.__serverip = varlist["serverip"] - else: - raise Exception('serverip param inside Qwifi have been be defined') - if "bwexpected" in varlist: - self.__bwexpected = varlist["bwexpected"] - else: - raise Exception('OKBW param inside Qwifi must be defined') - if "port" in varlist: - self.__port = varlist["port"] - else: - raise Exception('port param inside Qwifi must be defined') - self.__numbytestx = "10M" self._testMethodDoc = testname + self.__xmlObj = varlist["xml"] + self.__QwifiName = varlist.get('name', 'qwifi') + self.__loops = varlist.get('loops', self.__xmlObj.getKeyVal(self.__QwifiName, "loops", "1")) + self.__port = varlist.get('port', self.__xmlObj.getKeyVal(self.__QwifiName, "port", "5000")) + self.__bw = varlist.get('bwexpected', self.__xmlObj.getKeyVal(self.__QwifiName, "bwexpected", "5.0")) + self.__serverip = varlist.get('serverip', self.__xmlObj.getKeyVal(self.__QwifiName, "serverip", "localhost")) + self.__duration = varlist.get('duration', self.__xmlObj.getKeyVal(self.__QwifiName, "duration", "10")) + self.__interface = varlist.get('interface', self.__xmlObj.getKeyVal(self.__QwifiName, "interface", "wlan0")) + self.__toPath = varlist.get('to', self.__xmlObj.getKeyVal(self.__QwifiName, "to", "/mnt/station_ramdisk")) + self.__retriesCount = varlist.get('retries', self.__xmlObj.getKeyVal(self.__QwifiName, "retries", "5")) + self.__waitRetryTime = varlist.get('wait_retry', self.__xmlObj.getKeyVal(self.__QwifiName, "wait_retry", "10")) + self.__wifi_res_file = varlist.get('wifi_res_file', self.__xmlObj.getKeyVal(self.__QwifiName, "wifi_res_file", "wifi_test_{}.json")) + self.__wifi_st_file = varlist.get('wifi_st_file', self.__xmlObj.getKeyVal(self.__QwifiName, "wifi_st_file", "wifi_st_{}.json")) + self.__file_uuid = uuid.uuid4() + self.__wifi_res_file = self.__wifi_res_file.format(self.__file_uuid) + self.__wifi_st_file = self.__wifi_st_file.format(self.__file_uuid) self.__resultlist = [] - def execute(self): - # check if the board is connected to the router by wifi - p = sh.iw("wlan0", "link") - if p.exit_code == 0: - # get the first line of the output stream - out1 = p.stdout.decode('ascii').splitlines()[0] - if out1 != "Not connected.": - # check if the board has ip in the wlan0 interface - p = sh.ifconfig("wlan0") - if p.exit_code == 0: - # check if wlan0 has an IP - result = re.search( - 'inet addr:(?!127\.0{1,3}\.0{1,3}\.0{0,2}1$)((25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)', - p.stdout.decode('ascii')) - if result: - # execute iperf command against the server, but it implements attempts in case the server is busy - iperfdone = False - while not iperfdone: - try: - p = sh.iperf3("-c", self.__serverip, "-n", self.__numbytestx, "-f", "m", "-p", self.__port, - "-J", _timeout=20) - iperfdone = True - except sh.TimeoutException: - self.fail("failed: iperf timeout reached") - except sh.ErrorReturnCode: - time.sleep(self.timebetweenattempts) - - # check if it was executed succesfully - if p.exit_code == 0: - if p.stdout == "": - self.fail("failed: error executing iperf command") - # analyze output string - data = json.loads(p.stdout.decode('ascii')) - self.__bwreal = float(data['end']['sum_received']['bits_per_second']) / 1024 / 1024 - # save result file - with open('/mnt/station_ramdisk/wifi-iperf3.json', 'w') as outfile: - json.dump(data, outfile, indent=4) - outfile.close() - self.__resultlist.append( - { - "description": "iperf3 output", - "filepath": "/mnt/station_ramdisk/wifi-iperf3.json", - "mimetype": "application/json" - } - ) + def checkInterface(self, reqInterface): + ifaces = netifaces.interfaces() + for t in ifaces: + if t == reqInterface: + return True + return False - # check if BW is in the expected range - if self.__bwreal < float(self.__bwexpected): - self.fail("failed: speed is lower than expected. Speed(Mbits/s): " + str(self.__bwreal)) - else: - self.fail("failed: could not complete iperf3 command") - else: - self.fail("failed: wlan0 interface doesn't have any ip address.") + def execute(self): + self.__resultlist = [] + stationList = {} + # check interfaces + if not self.checkInterface(self.__interface): + self.fail('Interface not found: {}'.format(self.__interface)) + # scan networks + scanner = iwScan(self.__interface) + if scanner.scan(): + stationList = scanner.getStations() + # check if we are connected + if not scanner.findConnected(): + self.fail('Not connected to test AP') + else: + strError = scanner.getLastError() + self.fail('qWifi scanner Execution error: {}'.format(strError)) + # Start Iperf + client = iperf3.Client() + client.duration = int(self.__duration) + client.server_hostname = self.__serverip + client.port = station2Port(self.__port) + count = int(self.__retriesCount) + while count > 0: + result = client.run() + if result.error == None: + if result.received_Mbps >= float(self.__bw) and result.sent_Mbps >= float(self.__bw): + save_json_to_disk(filePath='{}/{}'.format(self.__toPath, self.__wifi_st_file), + description='wifi scan', + mime='application/json', + json_data=stationList, + result=self.__resultlist) + save_json_to_disk(filePath='{}/{}'.format(self.__toPath, self.__wifi_res_file), + description='wifi {} iperf3'.format(self.__interface), + mime='application/json', + json_data=result.json, + result=self.__resultlist) + return else: - self.fail("failed: could not complete ifconfig command.") + self.fail('bw fail: rx:{} tx:{}'.format(result.received_Mbps, result.sent_Mbps)) else: - self.fail("failed: wifi module is not connected to the router.") - else: - self.fail("failed: could not execute iw command") + count = count - 1 + time.sleep(int(self.__waitRetryTime)) + + self.fail('qWifi (max tries) Execution error: {}'.format(result.error)) def getresults(self): return self.__resultlist -- cgit v1.1